Skip to content

Commit 01ad4dd

Browse files
committed
[Fix][Connector-V2] Fix issue of out-of-order fields in cdc
1 parent 98c22ea commit 01ad4dd

File tree

1 file changed

+29
-15
lines changed
  • seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql

1 file changed

+29
-15
lines changed

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/AbstractMysqlCDCITBase.java

Lines changed: 29 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -300,7 +300,6 @@ public void testMysqlCdcCheckDataWithNoPrimaryKey(TestContainer container) {
300300
public void testMysqlCdcWithColumnIncludeList(TestContainer container) {
301301
// Clear related content to ensure that multiple operations are not affected
302302
clearTable(MYSQL_DATABASE, SOURCE_TABLE_1);
303-
clearTable(MYSQL_DATABASE, "mysql_cdc_e2e_sink_table_column_include");
304303

305304
CompletableFuture.supplyAsync(
306305
() -> {
@@ -315,20 +314,26 @@ public void testMysqlCdcWithColumnIncludeList(TestContainer container) {
315314
await().atMost(60000, TimeUnit.MILLISECONDS)
316315
.untilAsserted(
317316
() -> {
318-
log.info(
319-
query(
320-
getColumnIncludeQuerySQL(
321-
MYSQL_DATABASE,
322-
"mysql_cdc_e2e_sink_table_column_include"))
323-
.toString());
324-
Assertions.assertIterableEquals(
325-
query(
326-
getColumnIncludeSourceQuerySQL(
327-
MYSQL_DATABASE, SOURCE_TABLE_1)),
328-
query(
329-
getColumnIncludeQuerySQL(
330-
MYSQL_DATABASE,
331-
"mysql_cdc_e2e_sink_table_column_include")));
317+
// Check if table exists first
318+
if (tableExists(
319+
MYSQL_DATABASE, "mysql_cdc_e2e_sink_table_column_include")) {
320+
log.info(
321+
query(
322+
getColumnIncludeQuerySQL(
323+
MYSQL_DATABASE,
324+
"mysql_cdc_e2e_sink_table_column_include"))
325+
.toString());
326+
Assertions.assertIterableEquals(
327+
query(
328+
getColumnIncludeSourceQuerySQL(
329+
MYSQL_DATABASE, SOURCE_TABLE_1)),
330+
query(
331+
getColumnIncludeQuerySQL(
332+
MYSQL_DATABASE,
333+
"mysql_cdc_e2e_sink_table_column_include")));
334+
} else {
335+
Assertions.fail("Sink table not created yet");
336+
}
332337
});
333338

334339
// insert update delete
@@ -870,4 +875,13 @@ private void upsertDeleteSourceTableColumnInclude(String database, String tableN
870875
executeSql("DELETE FROM " + database + "." + tableName + " where id = 2");
871876
executeSql("UPDATE " + database + "." + tableName + " SET f_bigint = 10000 where id = 3");
872877
}
878+
879+
private boolean tableExists(String database, String tableName) {
880+
try {
881+
executeSql("SELECT 1 FROM " + database + "." + tableName + " LIMIT 1");
882+
return true;
883+
} catch (Exception e) {
884+
return false;
885+
}
886+
}
873887
}

0 commit comments

Comments
 (0)