Iceberg Sink Connector Not Consuming from Topic #338

@SAITEJA245

Iceberg Sink Connector Not Consuming from Topic Even After Setting consumer.override.auto.offset.reset


Description:

We are using the Iceberg Sink Connector to consume from the Kafka topic emf. The connector does not consume any data, and CURRENT-OFFSET for its consumer group stays unset (-) even after the steps below.


Steps to Reproduce:

  1. Deployed the connector with this configuration (redacted for brevity):
{
  "name": "emf-iceberg-sink-v2",
  "config": {
    "connector.class": "io.tabular.iceberg.connect.IcebergSinkConnector",
    "type": "sink",

    "topics": "emf",
    "tasks.max": "1",

    "iceberg.tables.evolve-schema-enabled": "true",

    "iceberg.catalog": "local",
    "iceberg.catalog.warehouse": "s3://iceberg-dev/warehouse",
    "iceberg.catalog.uri": "jdbc:mariadb://mariadb.dev.local:3306/ICEBERG_CATALOG?user=******&password=******&useSSL=false&verifyServerCertificate=false",
    "iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
    "iceberg.catalog.catalog-impl": "org.apache.iceberg.jdbc.JdbcCatalog",
    "iceberg.catalog.client.region": "******",
    "iceberg.catalog.s3.access-key-id": "***********",
    "iceberg.catalog.s3.secret-access-key": "************",

    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",

    "iceberg.tables": "emf.event",
    "iceberg.tables.upsert.enabled": "false",
    "iceberg.tables.write.mode": "append",

    "consumer.override.bootstrap.servers": "kafka-ha1.dev.local:9092",
    "consumer.override.auto.offset.reset": "earliest"
  }
}

The connector and its task both report RUNNING:

curl -s http://localhost:8083/connectors/emf-iceberg-sink-v2/status | jq
{
  "name": "emf-iceberg-sink-v2",
  "connector": {
    "state": "RUNNING",
    "worker_id": "**.**.**.***:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "**.**.**.***:8083"
    }
  ],
  "type": "sink"
}
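
A RUNNING state only describes the task lifecycle; it does not show whether the task is actually fetching records. As a minimal sketch, the status payload can be checked programmatically so that a FAILED or PAUSED task is not missed when there are many tasks. The function name `non_running_parts` is hypothetical, and the payload is the one shown above.

```python
import json


def non_running_parts(status: dict) -> list[str]:
    """Return connector/task entries whose state is not RUNNING."""
    problems = []
    connector_state = status.get("connector", {}).get("state")
    if connector_state != "RUNNING":
        problems.append(f"connector: {connector_state}")
    for task in status.get("tasks", []):
        if task.get("state") != "RUNNING":
            problems.append(f"task {task.get('id')}: {task.get('state')}")
    return problems


# Status payload copied from the curl output above (worker IDs redacted).
STATUS_JSON = """
{
  "name": "emf-iceberg-sink-v2",
  "connector": {"state": "RUNNING", "worker_id": "**.**.**.***:8083"},
  "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "**.**.**.***:8083"}],
  "type": "sink"
}
"""

status = json.loads(STATUS_JSON)
print(non_running_parts(status))  # prints []
```

An empty list here matches what the issue reports: nothing has failed from the Connect worker's point of view, so the status endpoint alone does not explain the missing offsets.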

  2. Verified that the topic has data:
kafka-console-consumer --bootstrap-server kafka-ha1.dev.local:9092 \
  --topic emf --from-beginning --max-messages 10
  3. Deleted and recreated the connector to ensure a fresh start.

  4. Confirmed that the consumer group is active but has no committed offsets:

kafka-consumer-groups --bootstrap-server kafka-ha1.dev.local:9092 \
  --describe --group connect-emf-iceberg-sink-v2

CURRENT-OFFSET: -
LAG: -

Full output of the describe command:

kafka-consumer-groups \
  --bootstrap-server kafka-ha1.dev.local:9092 \
  --describe \
  --group connect-emf-iceberg-sink-v2

GROUP                       TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                                                   HOST            CLIENT-ID
connect-emf-iceberg-sink-v2 emf             0          -               19296458        -               connector-consumer-emf-iceberg-sink-v2-0-cb18745f-0736-4384-bdca-4c5be80e1912 /**.**.**.***   connector-consumer-emf-iceberg-sink-v2-0
connect-emf-iceberg-sink-v2 emf             1          -               17445703        -               connector-consumer-emf-iceberg-sink-v2-0-cb18745f-0736-4384-bdca-4c5be80e1912 /**.**.**.***   connector-consumer-emf-iceberg-sink-v2-0
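
In this output, a `-` in CURRENT-OFFSET means the group has no committed offset for that partition, so LAG cannot be computed either. The sketch below parses the text of `kafka-consumer-groups --describe` and lists such partitions; the function name `partitions_without_commit` is hypothetical, and it assumes the whitespace-separated column layout shown above.

```python
def partitions_without_commit(describe_output: str) -> list[tuple[str, int, int]]:
    """Return (topic, partition, log_end_offset) for rows with no committed offset."""
    header = None
    result = []
    for line in describe_output.splitlines():
        fields = line.split()
        if not fields:
            continue
        if fields[0] == "GROUP":
            header = fields  # column names from the header row
            continue
        if header is None:
            continue  # skip anything printed before the header
        row = dict(zip(header, fields))
        if row["CURRENT-OFFSET"] == "-":
            result.append((row["TOPIC"], int(row["PARTITION"]), int(row["LOG-END-OFFSET"])))
    return result


# Rows copied from the output above (host redacted as in the issue).
SAMPLE = """
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
connect-emf-iceberg-sink-v2 emf 0 - 19296458 - connector-consumer-emf-iceberg-sink-v2-0-cb18745f-0736-4384-bdca-4c5be80e1912 /**.**.**.*** connector-consumer-emf-iceberg-sink-v2-0
connect-emf-iceberg-sink-v2 emf 1 - 17445703 - connector-consumer-emf-iceberg-sink-v2-0-cb18745f-0736-4384-bdca-4c5be80e1912 /**.**.**.*** connector-consumer-emf-iceberg-sink-v2-0
"""

print(partitions_without_commit(SAMPLE))
# prints [('emf', 0, 19296458), ('emf', 1, 17445703)]
```

Both partitions are assigned to the same consumer (a CONSUMER-ID is present), which is consistent with "group is active, nothing committed" rather than "no consumer joined".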

Additional Info:

  • Other connectors, such as emf-s3-json-sink (which writes plain JSON to S3), consume from the same topic without issue.

Expected Behavior:

The connector should consume data from the beginning of the topic and commit offsets to __connect-offsets, setting the CURRENT-OFFSET appropriately.


Actual Behavior:

  • Connector remains idle.
  • CURRENT-OFFSET is not set.
  • No data is written to the Iceberg table.
  • No offset commits observed in __connect-offsets.

Environment:

  • Kafka: 2.8.2
  • Iceberg Sink Connector version: iceberg-kafka-connect-runtime-0.5.10
  • Number of topic partitions: 2

Request:

Please help investigate why the Iceberg sink connector is not consuming messages or committing offsets even though the configuration appears to be correct. Let me know if additional logs or debug output are needed.
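
If debug output is needed, one way to collect it without restarting the worker is the Connect REST endpoint `PUT /admin/loggers/{logger}`, available since Kafka 2.4. The sketch below only builds the request. The helper name `build_log_level_request` is hypothetical, the worker URL is the one used in the curl call above, and the logger name `io.tabular.iceberg.connect` is an assumption taken from the package of the connector class in the configuration.

```python
import json
import urllib.request


def build_log_level_request(worker_url: str, logger: str, level: str = "DEBUG") -> urllib.request.Request:
    """Build (but do not send) a request that changes a logger level on one Connect worker."""
    body = json.dumps({"level": level}).encode("utf-8")
    return urllib.request.Request(
        url=f"{worker_url.rstrip('/')}/admin/loggers/{logger}",
        data=body,
        method="PUT",
        headers={"Content-Type": "application/json"},
    )


req = build_log_level_request("http://localhost:8083", "io.tabular.iceberg.connect")
print(req.get_method(), req.full_url)
# prints PUT http://localhost:8083/admin/loggers/io.tabular.iceberg.connect

# To apply the change, send it to the worker:
# urllib.request.urlopen(req)
```

The level change applies only to the worker that receives the request and is lost on restart, so it would need to be sent to the worker that runs task 0.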
