@@ -99,6 +99,7 @@ public class IcebergSinkConfig extends AbstractConfig {
private static final String DEFAULT_CATALOG_NAME = "iceberg";
private static final String DEFAULT_CONTROL_TOPIC = "control-iceberg";
public static final String DEFAULT_CONTROL_GROUP_PREFIX = "cg-control-";
+ public static final String DEFAULT_COORDINATOR_CONTROL_GROUP_SUFFIX = "-coord";

public static final int SCHEMA_UPDATE_RETRIES = 2; // 3 total attempts
public static final int CREATE_TABLE_RETRIES = 2; // 3 total attempts
@@ -390,14 +391,14 @@ public String controlTopic() {
return getString(CONTROL_TOPIC_PROP);
}

- public String controlGroupId() {
+ public String controlCoordinatorGroupId() {
String result = getString(CONTROL_GROUP_ID_PROP);
if (result != null) {
- return result;
+ return result + DEFAULT_COORDINATOR_CONTROL_GROUP_SUFFIX;
}
String connectorName = connectorName();
Preconditions.checkNotNull(connectorName, "Connector name cannot be null");
- return DEFAULT_CONTROL_GROUP_PREFIX + connectorName;
+ return DEFAULT_CONTROL_GROUP_PREFIX + connectorName + DEFAULT_COORDINATOR_CONTROL_GROUP_SUFFIX;
}

public String connectGroupId() {
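Taken together, the coordinator's consumer group id is now always derived by suffixing `-coord`, whether the control group id was set explicitly or defaulted from the connector name. A minimal sketch of the derivation (the connector name `events-sink` is hypothetical):

```java
public class GroupIdExample {
  // Constants mirrored from the IcebergSinkConfig diff above.
  static final String DEFAULT_CONTROL_GROUP_PREFIX = "cg-control-";
  static final String DEFAULT_COORDINATOR_CONTROL_GROUP_SUFFIX = "-coord";

  // Mirrors controlCoordinatorGroupId(): suffix an explicit control group id
  // when one is configured, otherwise derive the id from the connector name.
  static String controlCoordinatorGroupId(String explicitGroupId, String connectorName) {
    if (explicitGroupId != null) {
      return explicitGroupId + DEFAULT_COORDINATOR_CONTROL_GROUP_SUFFIX;
    }
    return DEFAULT_CONTROL_GROUP_PREFIX + connectorName + DEFAULT_COORDINATOR_CONTROL_GROUP_SUFFIX;
  }

  public static void main(String[] args) {
    System.out.println(controlCoordinatorGroupId(null, "events-sink"));
    // -> cg-control-events-sink-coord
    System.out.println(controlCoordinatorGroupId("my-control-group", "events-sink"));
    // -> my-control-group-coord
  }
}
```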
@@ -50,7 +50,7 @@ public abstract class Channel {
private static final Logger LOG = LoggerFactory.getLogger(Channel.class);

private final String controlTopic;
- private final String groupId;
+ private final String connectGroupId;
private final Producer<String, byte[]> producer;
private final Consumer<String, byte[]> consumer;
private final Admin admin;
@@ -65,7 +65,7 @@ public Channel(
IcebergSinkConfig config,
KafkaClientFactory clientFactory) {
this.controlTopic = config.controlTopic();
- this.groupId = config.controlGroupId();
+ this.connectGroupId = config.connectGroupId();

String transactionalId = name + config.transactionalSuffix();
Pair<UUID, Producer<String, byte[]>> pair = clientFactory.createProducer(transactionalId);
@@ -130,7 +130,7 @@ record -> {

Event event = eventDecoder.decode(record.value());
if (event != null) {
- if (event.groupId().equals(groupId)) {
+ if (event.groupId().equals(connectGroupId)) {
LOG.debug("Received event of type: {}", event.type().name());
if (receiveFn.apply(new Envelope(event, record.partition(), record.offset()))) {
LOG.debug("Handled event of type: {}", event.type().name());
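With the field renamed, the channel now filters control-topic events against the Connect consumer group id, so workers and the coordinator agree on a single id. A toy sketch of that filtering rule, using plain strings in place of the connector's Event type (an assumption made for brevity):

```java
import java.util.List;
import java.util.function.Predicate;

public class EventFilterSketch {
  public static void main(String[] args) {
    String connectGroupId = "connect-events-sink"; // hypothetical Connect group id
    Predicate<String> handled = eventGroupId -> eventGroupId.equals(connectGroupId);

    // Events tagged with another connector's group id are skipped.
    for (String g : List.of("connect-events-sink", "connect-other-sink")) {
      System.out.println(g + " -> " + (handled.test(g) ? "handled" : "skipped"));
    }
  }
}
```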
@@ -44,7 +44,6 @@
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
- import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkTaskContext;
@@ -94,7 +93,7 @@ private CommitterImpl(

// The source-of-truth for source-topic offsets is the control-group-id
Map<TopicPartition, Long> stableConsumerOffsets =
- fetchStableConsumerOffsets(config.controlGroupId());
+ fetchStableConsumerOffsets(config.connectGroupId());
// Rewind kafka connect consumer to avoid duplicates
context.offset(stableConsumerOffsets);

@@ -148,7 +147,7 @@ private void sendCommitResponse(UUID commitId, CommittableSupplier committableSupplier) {
writerResult -> {
Event commitResponse =
new Event(
- config.controlGroupId(),
+ config.connectGroupId(),
new DataWritten(
writerResult.partitionStruct(),
commitId,
@@ -178,13 +177,12 @@ private void sendCommitResponse(UUID commitId, CommittableSupplier committableSupplier) {

Event commitReady =
new Event(
- config.controlGroupId(),
+ config.connectGroupId(),
new DataComplete(commitId, assignments));
events.add(commitReady);

Map<TopicPartition, Offset> offsets = committable.offsetsByTopicPartition();
- send(events, offsets, new ConsumerGroupMetadata(config.controlGroupId()));
- send(ImmutableList.of(), offsets, new ConsumerGroupMetadata(config.connectGroupId()));
+ send(events, offsets, KafkaUtils.getConsumerGroupMetadata(context, config.connectGroupId()));
}

@Override
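The committer previously issued two sends, committing offsets once under the control group and once under the Connect group; it now issues a single transactional send whose offsets carry the group metadata resolved from the task context. A sketch of why the full `ConsumerGroupMetadata` matters, assuming a transactional producer and already-gathered offsets (all names here are illustrative):

```java
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.TopicPartition;

public class TransactionalOffsetCommitSketch {
  // With full metadata (member id, generation) the broker can fence zombie
  // producers; a bare new ConsumerGroupMetadata(groupId) cannot convey that.
  static void commitWithinTransaction(
      KafkaProducer<String, byte[]> producer,
      Map<TopicPartition, OffsetAndMetadata> offsets,
      ConsumerGroupMetadata groupMetadata) {
    producer.beginTransaction();
    // ... control-topic events would be sent here ...
    producer.sendOffsetsToTransaction(offsets, groupMetadata);
    producer.commitTransaction();
  }
}
```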
@@ -78,14 +78,14 @@ public Coordinator(
Collection<MemberDescription> members,
KafkaClientFactory clientFactory) {
// pass consumer group ID to which we commit low watermark offsets
super("coordinator", config.controlGroupId() + "-coord", config, clientFactory);
super("coordinator", config.controlCoordinatorGroupId(), config, clientFactory);

this.catalog = catalog;
this.config = config;
this.totalPartitionCount =
members.stream().mapToInt(desc -> desc.assignment().topicPartitions().size()).sum();
this.snapshotOffsetsProp =
- String.format(OFFSETS_SNAPSHOT_PROP_FMT, config.controlTopic(), config.controlGroupId());
+ String.format(OFFSETS_SNAPSHOT_PROP_FMT, config.controlTopic(), config.connectGroupId());
this.exec = ThreadPools.newWorkerPool("iceberg-committer", config.commitThreads());
this.commitState = new CommitState(config);

@@ -99,7 +99,7 @@ public void process() {
commitState.startNewCommit();
LOG.info("Started new commit with commit-id={}", commitState.currentCommitId().toString());
Event event =
- new Event(config.controlGroupId(), new StartCommit(commitState.currentCommitId()));
+ new Event(config.connectGroupId(), new StartCommit(commitState.currentCommitId()));
send(event);
LOG.info("Sent workers commit trigger with commit-id={}", commitState.currentCommitId().toString());

@@ -157,7 +157,7 @@ private void doCommit(boolean partialCommit) {
commitState.clearResponses();

Event event =
- new Event(config.controlGroupId(), new CommitComplete(commitState.currentCommitId(), vtts));
+ new Event(config.connectGroupId(), new CommitComplete(commitState.currentCommitId(), vtts));
send(event);

LOG.info(
@@ -259,7 +259,7 @@ private void commitToTable(
Long snapshotId = latestSnapshot(table, branch.orElse(null)).snapshotId();
Event event =
new Event(
- config.controlGroupId(),
+ config.connectGroupId(),
new CommitToTable(
commitState.currentCommitId(),
TableReference.of(config.catalogName(), tableIdentifier),
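Net effect in the coordinator: it consumes the control topic under the dedicated `<control-group>-coord` id, while the events it emits and the offsets-snapshot table property are keyed by the Connect group id. A small sketch of the resulting snapshot property name, using the `kafka.connect.offsets.%s.%s` format visible in the tests below (the group id value is hypothetical):

```java
public class SnapshotPropExample {
  public static void main(String[] args) {
    String fmt = "kafka.connect.offsets.%s.%s"; // matches OFFSETS_SNAPSHOT_PROP in ChannelTestBase
    String controlTopic = "control-iceberg";    // default control topic from the config diff
    String connectGroupId = "connect-events-sink"; // hypothetical
    System.out.println(String.format(fmt, controlTopic, connectGroupId));
    // -> kafka.connect.offsets.control-iceberg.connect-events-sink
  }
}
```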
@@ -19,14 +19,21 @@
package io.tabular.iceberg.connect.channel;

import java.util.concurrent.ExecutionException;
+ import org.apache.iceberg.common.DynFields;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.DescribeConsumerGroupsResult;
+ import org.apache.kafka.clients.consumer.Consumer;
+ import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.connect.errors.ConnectException;
+ import org.apache.kafka.connect.sink.SinkTaskContext;

public class KafkaUtils {

+ private static final String CONTEXT_CLASS_NAME =
+ "org.apache.kafka.connect.runtime.WorkerSinkTaskContext";
+
public static ConsumerGroupDescription consumerGroupDescription(
String consumerGroupId, Admin admin) {
try {
@@ -40,5 +47,16 @@ public static ConsumerGroupDescription consumerGroupDescription(
}
}

@SuppressWarnings("unchecked")
public static ConsumerGroupMetadata getConsumerGroupMetadata(
SinkTaskContext context, String connectGroupId) {
if (CONTEXT_CLASS_NAME.equals(context.getClass().getName())) {
return ((Consumer<byte[], byte[]>)
DynFields.builder().hiddenImpl(CONTEXT_CLASS_NAME, "consumer").build(context).get())
.groupMetadata();
}
return new ConsumerGroupMetadata(connectGroupId);
}

private KafkaUtils() {}
}
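On the Connect runtime, `getConsumerGroupMetadata` uses `DynFields` to reflectively read the worker's hidden `consumer` field and returns that consumer's live group metadata (member id, generation); with any other `SinkTaskContext` implementation, such as a mock in tests, it falls back to metadata carrying only the group id. A usage sketch under those assumptions:

```java
import io.tabular.iceberg.connect.channel.KafkaUtils;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.connect.sink.SinkTaskContext;

public class GroupMetadataUsage {
  // Resolve the richest group metadata available for the transactional
  // offset commit performed in CommitterImpl above.
  static ConsumerGroupMetadata resolve(SinkTaskContext context, String connectGroupId) {
    return KafkaUtils.getConsumerGroupMetadata(context, connectGroupId);
  }
}
```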
@@ -56,7 +56,7 @@ public class ChannelTestBase {
protected static final String SRC_TOPIC_NAME = "src-topic";
protected static final String CTL_TOPIC_NAME = "ctl-topic";
protected static final TopicPartition CTL_TOPIC_PARTITION = new TopicPartition(CTL_TOPIC_NAME, 0);
- protected static final String CONTROL_CONSUMER_GROUP_ID = "cg-connector";
+ protected static final String CONNECT_CONSUMER_GROUP_ID = "cg-connector";
protected InMemoryCatalog catalog;
protected Table table;
protected IcebergSinkConfig config;
@@ -84,7 +84,7 @@ private InMemoryCatalog initInMemoryCatalog() {

protected static final String COMMIT_ID_SNAPSHOT_PROP = "kafka.connect.commit-id";
protected static final String OFFSETS_SNAPSHOT_PROP =
String.format("kafka.connect.offsets.%s.%s", CTL_TOPIC_NAME, CONTROL_CONSUMER_GROUP_ID);
String.format("kafka.connect.offsets.%s.%s", CTL_TOPIC_NAME, CONNECT_CONSUMER_GROUP_ID);
protected static final String VTTS_SNAPSHOT_PROP = "kafka.connect.vtts";

@BeforeEach
@@ -97,7 +97,7 @@ public void before() {
config = mock(IcebergSinkConfig.class);
when(config.controlTopic()).thenReturn(CTL_TOPIC_NAME);
when(config.commitThreads()).thenReturn(1);
- when(config.controlGroupId()).thenReturn(CONTROL_CONSUMER_GROUP_ID);
+ when(config.connectGroupId()).thenReturn(CONNECT_CONSUMER_GROUP_ID);
when(config.tableConfig(any())).thenReturn(mock(TableSinkConfig.class));
when(config.catalogName()).thenReturn("catalog");

@@ -300,7 +300,7 @@ private OffsetDateTime offsetDateTime(Long ms) {

whenAdminListConsumerGroupOffsetsThenReturn(
ImmutableMap.of(
- config.controlGroupId(), ImmutableMap.of(SOURCE_TP0, 110L, SOURCE_TP1, 100L),
+ config.connectGroupId(), ImmutableMap.of(SOURCE_TP0, 110L, SOURCE_TP1, 100L),
config.connectGroupId(), ImmutableMap.of(SOURCE_TP0, 90L, SOURCE_TP1, 80L)));

try (CommitterImpl ignored =
@@ -320,7 +320,7 @@ public void testCommitShouldThrowExceptionIfCoordinatorIsTerminated() throws IOException {

whenAdminListConsumerGroupOffsetsThenReturn(
ImmutableMap.of(
- config.controlGroupId(), ImmutableMap.of(SOURCE_TP0, 110L, SOURCE_TP1, 100L)));
+ CONFIG.connectGroupId(), ImmutableMap.of(SOURCE_TP0, 110L, SOURCE_TP1, 100L)));

TerminatedCoordinatorThreadFactory coordinatorThreadFactory =
new TerminatedCoordinatorThreadFactory();
@@ -352,7 +352,7 @@ public void testCommitShouldDoNothingIfThereAreNoMessages() throws IOException {

whenAdminListConsumerGroupOffsetsThenReturn(
ImmutableMap.of(
- CONFIG.controlGroupId(), ImmutableMap.of(SOURCE_TP0, 110L, SOURCE_TP1, 100L)));
+ CONFIG.connectGroupId(), ImmutableMap.of(SOURCE_TP0, 110L, SOURCE_TP1, 100L)));

CommittableSupplier committableSupplier =
() -> {
@@ -379,7 +379,7 @@ public void testCommitShouldDoNothingIfThereIsNoCommitRequestMessage() throws IOException {

whenAdminListConsumerGroupOffsetsThenReturn(
ImmutableMap.of(
- CONFIG.controlGroupId(), ImmutableMap.of(SOURCE_TP0, 110L, SOURCE_TP1, 100L)));
+ CONFIG.connectGroupId(), ImmutableMap.of(SOURCE_TP0, 110L, SOURCE_TP1, 100L)));

CommittableSupplier committableSupplier =
() -> {
@@ -399,7 +399,7 @@ public void testCommitShouldDoNothingIfThereIsNoCommitRequestMessage() throws IOException {
UUID.randomUUID().toString(),
AvroUtil.encode(
new Event(
- CONFIG.controlGroupId(),
+ CONFIG.connectGroupId(),
new CommitComplete(UUID.randomUUID(), offsetDateTime(100L))))));

committer.commit(committableSupplier);
@@ -418,7 +418,7 @@ public void testCommitShouldRespondToCommitRequest() throws IOException {

whenAdminListConsumerGroupOffsetsThenReturn(
ImmutableMap.of(
- CONFIG.controlGroupId(), ImmutableMap.of(SOURCE_TP0, 110L, SOURCE_TP1, 100L)));
+ CONFIG.connectGroupId(), ImmutableMap.of(SOURCE_TP0, 110L, SOURCE_TP1, 100L)));

List<DataFile> dataFiles = ImmutableList.of(createDataFile());
List<DeleteFile> deleteFiles = ImmutableList.of();
@@ -444,7 +444,7 @@ public void testCommitShouldRespondToCommitRequest() throws IOException {
UUID.randomUUID().toString(),
AvroUtil.encode(
new Event(
- CONFIG.controlGroupId(),
+ CONFIG.connectGroupId(),
new StartCommit(commitId)))));

committer.commit(committableSupplier);
@@ -468,7 +468,7 @@ public void testCommitShouldRespondToCommitRequest() throws IOException {
Map<TopicPartition, OffsetAndMetadata> expectedConsumerOffset =
ImmutableMap.of(SOURCE_TP0, new OffsetAndMetadata(100L));
assertThat(producer.consumerGroupOffsetsHistory().get(0))
- .isEqualTo(ImmutableMap.of(CONFIG.controlGroupId(), expectedConsumerOffset));
+ .isEqualTo(ImmutableMap.of(CONFIG.connectGroupId(), expectedConsumerOffset));
assertThat(producer.consumerGroupOffsetsHistory().get(1))
.isEqualTo(ImmutableMap.of(CONFIG.connectGroupId(), expectedConsumerOffset));
}
@@ -484,7 +484,7 @@ public void testCommitWhenCommittableIsEmpty() throws IOException {

whenAdminListConsumerGroupOffsetsThenReturn(
ImmutableMap.of(
- CONFIG.controlGroupId(), ImmutableMap.of(SOURCE_TP0, 110L, SOURCE_TP1, 100L)));
+ CONFIG.connectGroupId(), ImmutableMap.of(SOURCE_TP0, 110L, SOURCE_TP1, 100L)));

CommittableSupplier committableSupplier =
() -> new Committable(ImmutableMap.of(), ImmutableList.of());
@@ -502,7 +502,7 @@ public void testCommitWhenCommittableIsEmpty() throws IOException {
UUID.randomUUID().toString(),
AvroUtil.encode(
new Event(
- CONFIG.controlGroupId(),
+ CONFIG.connectGroupId(),
new StartCommit(commitId)))));


@@ -536,7 +536,7 @@ public void testCommitShouldCommitOffsetsOnlyForPartitionsWeMadeProgressOn() throws IOException {

whenAdminListConsumerGroupOffsetsThenReturn(
ImmutableMap.of(
- CONFIG.controlGroupId(), ImmutableMap.of(SOURCE_TP0, 110L, SOURCE_TP1, 100L)));
+ CONFIG.connectGroupId(), ImmutableMap.of(SOURCE_TP0, 110L, SOURCE_TP1, 100L)));

List<DataFile> dataFiles = ImmutableList.of(createDataFile());
List<DeleteFile> deleteFiles = ImmutableList.of();
@@ -561,7 +561,7 @@ public void testCommitShouldCommitOffsetsOnlyForPartitionsWeMadeProgressOn() throws IOException {
UUID.randomUUID().toString(),
AvroUtil.encode(
new Event(
- CONFIG.controlGroupId(),
+ CONFIG.connectGroupId(),
new StartCommit(commitId)))));

committer.commit(committableSupplier);
@@ -587,7 +587,7 @@ public void testCommitShouldCommitOffsetsOnlyForPartitionsWeMadeProgressOn() throws IOException {
Map<TopicPartition, OffsetAndMetadata> expectedConsumerOffset =
ImmutableMap.of(sourceTp1, new OffsetAndMetadata(100L));
assertThat(producer.consumerGroupOffsetsHistory().get(0))
- .isEqualTo(ImmutableMap.of(CONFIG.controlGroupId(), expectedConsumerOffset));
+ .isEqualTo(ImmutableMap.of(CONFIG.connectGroupId(), expectedConsumerOffset));
assertThat(producer.consumerGroupOffsetsHistory().get(1))
.isEqualTo(ImmutableMap.of(CONFIG.connectGroupId(), expectedConsumerOffset));
}
@@ -176,7 +176,7 @@ public void testShouldDeduplicateDataFilesBeforeAppending() {
currentCommitId -> {
Event commitResponse =
new Event(
- config.controlGroupId(),
+ config.connectGroupId(),
new DataWritten(
StructType.of(),
currentCommitId,
@@ -188,7 +188,7 @@ public void testShouldDeduplicateDataFilesBeforeAppending() {
commitResponse,
commitResponse, // duplicate commit response
new Event(
- config.controlGroupId(),
+ config.connectGroupId(),
new DataComplete(
currentCommitId,
ImmutableList.of(new TopicPartitionOffset("topic", 1, 1L, ts)))));
@@ -216,7 +216,7 @@ public void testShouldDeduplicateDeleteFilesBeforeAppending() {
currentCommitId -> {
Event duplicateCommitResponse =
new Event(
- config.controlGroupId(),
+ config.connectGroupId(),
new DataWritten(
StructType.of(),
currentCommitId,
@@ -228,7 +228,7 @@ public void testShouldDeduplicateDeleteFilesBeforeAppending() {
duplicateCommitResponse,
duplicateCommitResponse, // duplicate commit response
new Event(
- config.controlGroupId(),
+ config.connectGroupId(),
new DataComplete(
currentCommitId,
ImmutableList.of(new TopicPartitionOffset("topic", 1, 1L, ts)))));
@@ -338,7 +338,7 @@ public void testCommitMultiPartitionSpecAppendDataFiles() {
"key",
AvroUtil.encode(
new Event(
- config.controlGroupId(),
+ config.connectGroupId(),
new DataWritten(
spec.partitionType(),
commitId,
@@ -355,7 +355,7 @@ public void testCommitMultiPartitionSpecAppendDataFiles() {
"key",
AvroUtil.encode(
new Event(
- config.controlGroupId(),
+ config.connectGroupId(),
new DataComplete(
commitId,
ImmutableList.of(
@@ -433,7 +433,7 @@ private UUID coordinatorTest(
currentCommitId -> {
Event commitResponse =
new Event(
- config.controlGroupId(),
+ config.connectGroupId(),
new DataWritten(
StructType.of(),
currentCommitId,
@@ -443,7 +443,7 @@

Event commitReady =
new Event(
- config.controlGroupId(),
+ config.connectGroupId(),
new DataComplete(
currentCommitId,
ImmutableList.of(new TopicPartitionOffset("topic", 1, 1L, ts))));