Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions src/main/java/com/clickhouse/kafka/connect/sink/ProxySinkTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Timer;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;

public class ProxySinkTask {
Expand Down Expand Up @@ -86,13 +87,24 @@ public void put(final Collection<SinkRecord> records) throws IOException, Execut
LOGGER.trace(String.format("Got %d records from put API.", records.size()));
ExecutionTimer processingTime = ExecutionTimer.start();

Function<Record, String> keyMapper;

if (clickHouseSinkConfig.isEnableDbTopicSplit()) {
// in this case topic == table name so it cannot be a grouping key because it will cause all records
// to be written to same database. Grouping by partition is not possible, too.
keyMapper = Record::getDatabase;
} else if (!clickHouseSinkConfig.isExactlyOnce() && clickHouseSinkConfig.isIgnorePartitionsWhenBatching()) {
keyMapper = Record::getTopic;
} else {
keyMapper = Record::getTopicAndPartition;
}

Map<String, List<Record>> dataRecords = records.stream()
.map(v -> Record.convert(v,
clickHouseSinkConfig.isEnableDbTopicSplit(),
clickHouseSinkConfig.getDbTopicSplitChar(),
clickHouseSinkConfig.getDatabase() ))
.collect(Collectors.groupingBy(!clickHouseSinkConfig.isExactlyOnce() && clickHouseSinkConfig.isIgnorePartitionsWhenBatching()
? Record::getTopic : Record::getTopicAndPartition));
.collect(Collectors.groupingBy(keyMapper));

statistics.recordProcessingTime(processingTime);
// TODO - Multi process???
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -427,11 +427,11 @@ public Table describeTableV2(String database, String tableName) {
}
return table;
}
public List<Table> extractTablesMapping(String database, Map<String, Table> cache) {
public List<Table> extractTablesMapping(String tableDatabase, Map<String, Table> cache) {
List<Table> tableList = new ArrayList<>();
for (Table table : showTables(database) ) {
for (Table table : showTables(tableDatabase) ) {
// (Full) Table names are escaped in the cache
String escapedTableName = Utils.escapeTableName(database, table.getCleanName());
String escapedTableName = Utils.escapeTableName(tableDatabase, table.getCleanName());

// Read from cache if we already described this table before
// This means we won't pick up edited table configs until the connector is restarted
Expand All @@ -444,7 +444,7 @@ public List<Table> extractTablesMapping(String database, Map<String, Table> cach
continue;
}
}
Table tableDescribed = describeTable(this.database, table.getCleanName());
Table tableDescribed = describeTable(tableDatabase, table.getCleanName());
if (tableDescribed != null) {
tableDescribed.setNumColumns(table.getNumColumns());
tableList.add(tableDescribed);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1100,4 +1100,45 @@ public void exactlyOnceStateMismatchTest(int split, int batch) {
assertEquals(sr.size(), ClickHouseTestHelpers.countRows(chc, topic));assertTrue(ClickHouseTestHelpers.validateRows(chc, topic, sr));
Copy link

Copilot AI Sep 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing semicolon or line break between the two assertions. This should be split into two separate statements for proper formatting and readability.

Suggested change
assertEquals(sr.size(), ClickHouseTestHelpers.countRows(chc, topic));assertTrue(ClickHouseTestHelpers.validateRows(chc, topic, sr));
assertEquals(sr.size(), ClickHouseTestHelpers.countRows(chc, topic));
assertTrue(ClickHouseTestHelpers.validateRows(chc, topic, sr));

Copilot uses AI. Check for mistakes.

}

@Test
public void splitDBTopicTest() throws Exception {
Map<String, String> props = createProps();
ClickHouseHelperClient chc = createClient(props);
String topic1 = "tenant_1__events";
String topic2 = "tenant_2__events";
ClickHouseTestHelpers.dropDatabase(chc, "tenant_1");
ClickHouseTestHelpers.dropDatabase(chc, "tenant_2");
ClickHouseTestHelpers.createDatabase(chc, "tenant_1");
ClickHouseTestHelpers.createDatabase(chc, "tenant_2");

ClickHouseTestHelpers.query(chc, "CREATE TABLE `tenant_1`.`events` (" +
"`off16` Int16," +
"`string` String" +
") Engine = MergeTree ORDER BY `off16`");
ClickHouseTestHelpers.query(chc, "CREATE TABLE `tenant_2`.`events` (" +
"`off16` Int16," +
"`string` String" +
") Engine = MergeTree ORDER BY `off16`");

Collection<SinkRecord> sr1 = SchemaTestData.createSimpleData(topic1, 1, 5);
Collection<SinkRecord> sr2 = SchemaTestData.createSimpleData(topic2, 1, 10);

List<SinkRecord> records = new ArrayList<>();
records.addAll(sr1);
records.addAll(sr2);
Collections.shuffle(records);

ClickHouseSinkTask chst = new ClickHouseSinkTask();
props.put(ClickHouseSinkConfig.DB_TOPIC_SPLIT_CHAR, "__");
props.put(ClickHouseSinkConfig.ENABLE_DB_TOPIC_SPLIT, "true");
props.put(ClickHouseSinkConfig.DATABASE, "tenant_1");
chst.start(props);
chst.put(records);
chst.stop();


assertEquals(sr1.size(), ClickHouseTestHelpers.countRows(chc, "events", "tenant_1"));
assertEquals(sr2.size(), ClickHouseTestHelpers.countRows(chc, "events", "tenant_2"));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package com.clickhouse.kafka.connect.sink.data.convert;

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

import java.util.regex.Pattern;

import static org.junit.Assert.assertEquals;

class RecordConvertorTest {

@ParameterizedTest
@MethodSource("splitDBTopicProvider")
void splitDBTopic(String topic, String dbTopicSeparatorChar, String database) {

String[] parts = topic.split(Pattern.quote(dbTopicSeparatorChar));
String actualDatabase = parts[0];
String actualTopic = parts[1];
System.out.println("actual_topic: " + actualTopic);
Copy link

Copilot AI Sep 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using System.out.println for debugging in tests is not recommended. Consider using a proper logger or removing this debug statement entirely.

Suggested change
System.out.println("actual_topic: " + actualTopic);

Copilot uses AI. Check for mistakes.

assertEquals(database, actualDatabase);
}

static Object[][] splitDBTopicProvider() {
return new Object[][] {
{ "tenant_A__telemetry", "__", "tenant_A" },
};
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@
import com.clickhouse.client.api.query.QuerySettings;
import com.clickhouse.client.api.query.Records;
import com.clickhouse.data.ClickHouseFormat;
import com.clickhouse.kafka.connect.sink.ClickHouseSinkConfig;
Copy link

Copilot AI Sep 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This import is unused. The ClickHouseSinkConfig class is not referenced anywhere in this file.

Suggested change
import com.clickhouse.kafka.connect.sink.ClickHouseSinkConfig;

Copilot uses AI. Check for mistakes.
import com.clickhouse.kafka.connect.sink.db.helper.ClickHouseFieldDescriptor;
import com.clickhouse.kafka.connect.sink.db.helper.ClickHouseHelperClient;
import com.clickhouse.kafka.connect.sink.db.mapping.Column;
import com.clickhouse.kafka.connect.sink.db.mapping.Type;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import jdk.dynalink.Operation;
Copy link

Copilot AI Sep 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This import is unused and appears to be accidentally added. The jdk.dynalink.Operation class is not used anywhere in this file.

Suggested change
import jdk.dynalink.Operation;

Copilot uses AI. Check for mistakes.
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;
import org.json.JSONObject;
Expand Down Expand Up @@ -65,13 +67,26 @@ public static boolean isCloud() {
}

public static void query(ClickHouseHelperClient chc, String query) {
if (chc.isUseClientV2()) {
chc.queryV2(query);
} else {
chc.queryV1(query);
try {
if (chc.isUseClientV2()) {
chc.queryV2(query).close();
} else {
chc.queryV1(query).close();
}
} catch (Exception e) {
LOGGER.error("Error while executing query", e);
throw new RuntimeException(e);
}
}

public static void createDatabase(ClickHouseHelperClient chc, String database) {
query(chc, "CREATE DATABASE " + database);
}

public static void dropDatabase(ClickHouseHelperClient chc, String database) {
query(chc, "DROP DATABASE IF EXISTS " + database);
}

public static OperationMetrics dropTable(ClickHouseHelperClient chc, String tableName) {
for (int i = 0; i < 5; i++) {
try {
Expand Down Expand Up @@ -185,7 +200,11 @@ public static OperationMetrics optimizeTable(ClickHouseHelperClient chc, String
}

public static int countRows(ClickHouseHelperClient chc, String tableName) {
String queryCount = String.format("SELECT COUNT(*) FROM `%s` SETTINGS select_sequential_consistency = 1", tableName);
return countRows(chc, tableName, chc.getDatabase());
}

public static int countRows(ClickHouseHelperClient chc, String tableName, String database) {
String queryCount = String.format("SELECT COUNT(*) FROM `%s`.`%s` SETTINGS select_sequential_consistency = 1", database, tableName);

try {
optimizeTable(chc, tableName);
Expand Down
Loading