Skip to content

Iceberg connector with multiple tables and composite primary keys #344

@mikunjrathwa

Description

@mikunjrathwa

We are trying to set up a Kafka connector with the configs below.

  • We have two topics, demo_tpc1 and demo_tpc2, to be ingested into Iceberg via Polaris.
  • They have different composite primary keys: "Primary1,Primary2" and "Primary1,Primary2,Primary3" respectively.
  • Creating the connector with the configs below results in the following error:
    Cannot invoke "org.apache.iceberg.types.Types$NestedField.fieldId()" because the return value of "org.apache.iceberg.Schema.findField(String)" is null
  • Connector:
{

  "name": "multi-test",

  "config": {

    "connector.class": "io.tabular.iceberg.connect.IcebergSinkConnector",
    "tasks.max": "1",

    "topics":"demo_tpc1,demo_tpc2",
	
	"iceberg.tables": "demo.demo_tpc1,demo.demo_tpc2",

    "iceberg.tables.demo_tpc1.name": "demo.demo_tpc1",
	"iceberg.tables.demo_tpc2.name": "demo.demo_tpc2",
	
	"iceberg.table.awf.demo_tpc1.id-columns": "Primary1,Primary2",
	"iceberg.table.awf.demo_tpc2.id-columns": "Primary1,Primary2,Primary3",
	
    "iceberg.tables.schema-case-insensitive": "true",
    "iceberg.tables.auto-create-enabled": "true",
    "iceberg.tables.evolve-schema-enabled": "true",
    "iceberg.tables.upsert-mode-enabled": "true",

    "iceberg.catalog": "polaris",
    "iceberg.catalog.type": "rest",

    "iceberg.catalog.uri": "http://<polaris_url>:8181/api/catalog",
    "iceberg.catalog.warehouse": "awf",
    "iceberg.catalog.credential": "root:polaris",

    "iceberg.catalog.scope": "PRINCIPAL_ROLE:ALL",
    "iceberg.catalog.token-refresh-enabled": "true",
    "iceberg.catalog.io-impl": "org.apache.iceberg.azure.adlsv2.ADLSFileIO",
    "iceberg.catalog.include-credentials": "true",
    
	"key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "https://testconf.azure.confluent.cloud",
    "value.converter.schema.registry.url": "https://testconf.azure.confluent.cloud",
    "key.converter.basic.auth.credentials.source": "USER_INFO",
    "key.converter.basic.auth.user.info": "test:test",
    "value.converter.basic.auth.credentials.source": "USER_INFO",
    "value.converter.basic.auth.user.info": "test:test",

    "key.converter.schemas.enable": "true",
    "value.converter.schemas.enable": "true",
    "iceberg.control.topic.enable": "true",

    "iceberg.control.group-id": "multi-g1"
  }

}

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions