
Commit 066bb79

some-logging-improvements (#262)
* some-logging-improvements
  - log during table creation failures
  - some additional logging at critical states between coordinator/worker during commit phase
* comments
1 parent e942a59 commit 066bb79

4 files changed: +54 -45 lines changed

kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/CommitState.java

Lines changed: 5 additions & 4 deletions
@@ -106,8 +106,9 @@ public boolean isCommitTimedOut() {
       return false;
     }
 
-    if (System.currentTimeMillis() - startTime > config.commitTimeoutMs()) {
-      LOG.info("Commit timeout reached");
+    long currentTime = System.currentTimeMillis();
+    if (currentTime - startTime > config.commitTimeoutMs()) {
+      LOG.info("Commit timeout reached. Now: {}, start: {}, timeout: {}", currentTime, startTime, config.commitTimeoutMs());
       return true;
     }
     return false;
@@ -125,14 +126,14 @@ public boolean isCommitReady(int expectedPartitionCount) {
             .sum();
 
     if (receivedPartitionCount >= expectedPartitionCount) {
-      LOG.debug(
+      LOG.info(
           "Commit {} ready, received responses for all {} partitions",
           currentCommitId,
           receivedPartitionCount);
       return true;
     }
 
-    LOG.debug(
+    LOG.info(
        "Commit {} not ready, received responses for {} of {} partitions, waiting for more",
        currentCommitId,
        receivedPartitionCount,
kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Coordinator.java

Lines changed: 4 additions & 1 deletion
@@ -97,10 +97,12 @@ public void process() {
     if (commitState.isCommitIntervalReached()) {
       // send out begin commit
       commitState.startNewCommit();
+      LOG.info("Started new commit with commit-id={}", commitState.currentCommitId().toString());
       Event event =
           new Event(config.controlGroupId(), new StartCommit(commitState.currentCommitId()));
       send(event);
-      LOG.debug("Started new commit with commit-id={}", commitState.currentCommitId().toString());
+      LOG.info("Sent workers commit trigger with commit-id={}", commitState.currentCommitId().toString());
+
     }
 
     consumeAvailable(POLL_DURATION, this::receive);
@@ -127,6 +129,7 @@ private boolean receive(Envelope envelope) {
 
   private void commit(boolean partialCommit) {
     try {
+      LOG.info("Processing commit after responses for {}, isPartialCommit {}", commitState.currentCommitId(), partialCommit);
       doCommit(partialCommit);
     } catch (Exception e) {
       LOG.warn("Commit failed, will try again next cycle", e);
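
Taken together with the CommitState changes above, one healthy commit cycle now leaves an INFO-level trail roughly like the following. The message templates are the ones in these diffs; the <uuid> placeholder, the partition counts, and the line prefix (which depends on the logger configuration) are illustrative:

INFO Started new commit with commit-id=<uuid>
INFO Sent workers commit trigger with commit-id=<uuid>
INFO Commit <uuid> not ready, received responses for 2 of 4 partitions, waiting for more
INFO Commit <uuid> ready, received responses for all 4 partitions
INFO Processing commit after responses for <uuid>, isPartialCommit false
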

kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Worker.java

Lines changed: 1 addition & 1 deletion
@@ -134,7 +134,7 @@ private void routeRecordStatically(SinkRecord record) {
 
   private void routeRecordDynamically(SinkRecord record) {
     String routeField = config.tablesRouteField();
-    Preconditions.checkNotNull(routeField, "Route field cannot be null with dynamic routing");
+    Preconditions.checkNotNull(routeField, String.format("Route field cannot be null with dynamic routing at topic: %s, partition: %d, offset: %d", record.topic(), record.kafkaPartition(), record.kafkaOffset()));
 
     String routeValue = extractRouteValue(record.value(), routeField);
     if (routeValue != null) {
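
One detail worth noting about the new precondition: String.format builds the message eagerly, on every record, even when routeField is non-null. Guava's Preconditions.checkNotNull also has an overload that takes a %s template plus arguments and formats the message only when the check fails; a sketch of that variant (the helper class and method are hypothetical):

import com.google.common.base.Preconditions;
import org.apache.kafka.connect.sink.SinkRecord;

final class RouteFieldChecks {
  private RouteFieldChecks() {}

  // Sketch: Guava expands the %s template lazily, only if routeField is null.
  static String requireRouteField(String routeField, SinkRecord record) {
    return Preconditions.checkNotNull(
        routeField,
        "Route field cannot be null with dynamic routing at topic: %s, partition: %s, offset: %s",
        record.topic(), record.kafkaPartition(), record.kafkaOffset());
  }
}
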

kafka-connect/src/main/java/io/tabular/iceberg/connect/data/IcebergWriterFactory.java

Lines changed: 44 additions & 39 deletions
@@ -47,7 +47,7 @@ public IcebergWriterFactory(Catalog catalog, IcebergSinkConfig config) {
   }
 
   public RecordWriter createWriter(
-     String tableName, SinkRecord sample, boolean ignoreMissingTable) {
+      String tableName, SinkRecord sample, boolean ignoreMissingTable) {
     TableIdentifier identifier = TableIdentifier.parse(tableName);
     Table table;
     try {
@@ -67,47 +67,52 @@ public RecordWriter createWriter(
 
   @VisibleForTesting
   Table autoCreateTable(String tableName, SinkRecord sample) {
-    StructType structType;
-    if (sample.valueSchema() == null) {
-      structType =
-          SchemaUtils.inferIcebergType(sample.value(), config)
-              .orElseThrow(() -> new DataException("Unable to create table from empty object"))
-              .asStructType();
-    } else {
-      structType = SchemaUtils.toIcebergType(sample.valueSchema(), config).asStructType();
-    }
+    try {
+      StructType structType;
+      if (sample.valueSchema() == null) {
+        structType =
+            SchemaUtils.inferIcebergType(sample.value(), config)
+                .orElseThrow(() -> new DataException("Unable to create table from empty object"))
+                .asStructType();
+      } else {
+        structType = SchemaUtils.toIcebergType(sample.valueSchema(), config).asStructType();
+      }
 
-    org.apache.iceberg.Schema schema = new org.apache.iceberg.Schema(structType.fields());
-    TableIdentifier identifier = TableIdentifier.parse(tableName);
+      org.apache.iceberg.Schema schema = new org.apache.iceberg.Schema(structType.fields());
+      TableIdentifier identifier = TableIdentifier.parse(tableName);
 
-    List<String> partitionBy = config.tableConfig(tableName).partitionBy();
-    PartitionSpec spec;
-    try {
-      spec = SchemaUtils.createPartitionSpec(schema, partitionBy);
+      List<String> partitionBy = config.tableConfig(tableName).partitionBy();
+      PartitionSpec spec;
+      try {
+        spec = SchemaUtils.createPartitionSpec(schema, partitionBy);
+      } catch (Exception e) {
+        LOG.error(
+            "Unable to create partition spec {}, table {} will be unpartitioned",
+            partitionBy,
+            identifier,
+            e);
+        spec = PartitionSpec.unpartitioned();
+      }
+
+      PartitionSpec partitionSpec = spec;
+      AtomicReference<Table> result = new AtomicReference<>();
+      Tasks.range(1)
+          .retry(IcebergSinkConfig.CREATE_TABLE_RETRIES)
+          .run(
+              notUsed -> {
+                try {
+                  result.set(catalog.loadTable(identifier));
+                } catch (NoSuchTableException e) {
+                  result.set(
+                      catalog.createTable(
+                          identifier, schema, partitionSpec, config.autoCreateProps()));
+                  LOG.info("Created new table {} from record at topic: {}, partition: {}, offset: {}", identifier, sample.topic(), sample.kafkaPartition(), sample.kafkaOffset());
+                }
+              });
+      return result.get();
     } catch (Exception e) {
-      LOG.error(
-          "Unable to create partition spec {}, table {} will be unpartitioned",
-          partitionBy,
-          identifier,
-          e);
-      spec = PartitionSpec.unpartitioned();
+      LOG.error("Error creating new table {} from record at topic: {}, partition: {}, offset: {}", tableName, sample.topic(), sample.kafkaPartition(), sample.kafkaOffset());
+      throw e;
     }
-
-    PartitionSpec partitionSpec = spec;
-    AtomicReference<Table> result = new AtomicReference<>();
-    Tasks.range(1)
-        .retry(IcebergSinkConfig.CREATE_TABLE_RETRIES)
-        .run(
-            notUsed -> {
-              try {
-                result.set(catalog.loadTable(identifier));
-              } catch (NoSuchTableException e) {
-                result.set(
-                    catalog.createTable(
-                        identifier, schema, partitionSpec, config.autoCreateProps()));
-                LOG.info("Created new table {} from record at topic: {}, partition: {}, offset: {}", identifier, sample.topic(), sample.kafkaPartition(), sample.kafkaOffset());
-              }
-            });
-    return result.get();
   }
 }
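
The retry block itself is unchanged in shape: each attempt loads the table first and only creates it on NoSuchTableException, so concurrent workers racing to auto-create the same table converge on one winner. A simplified standalone sketch of that load-or-create pattern, using a plain loop in place of Iceberg's Tasks helper (the method name and retry bookkeeping are illustrative):

import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.NoSuchTableException;

final class LoadOrCreate {
  private LoadOrCreate() {}

  // Sketch: load first; create only if the table is missing; retry so that
  // losing a create race (another worker created it first) ends in a load.
  static Table loadOrCreate(
      Catalog catalog, TableIdentifier id, Schema schema, PartitionSpec spec, int retries) {
    RuntimeException last = new IllegalStateException("table create retries exhausted");
    for (int attempt = 0; attempt <= retries; attempt++) {
      try {
        return catalog.loadTable(id);
      } catch (NoSuchTableException missing) {
        try {
          return catalog.createTable(id, schema, spec);
        } catch (RuntimeException raced) {
          last = raced; // e.g. AlreadyExistsException; next attempt loads it
        }
      }
    }
    throw last;
  }
}
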
