Open
Labels: Bug (Something isn't working), Waiting for reply
Description
Search before asking
- I had searched in the issues and found no similar issues.
What happened
When synchronizing a whole PostgreSQL database to Doris, physical delete events are not synchronized.
What you expected to happen
Physically deleted rows should be synchronized to Doris as well.
How to reproduce
SET 'table.exec.state.ttl' = '1h';
EXECUTE CDCSOURCE demo_doris
WITH
(
'connector' = 'postgres-cdc',
'hostname' = '192.168.0.37',
'port' = '5432',
'database-name' = 'db',
'schema-name' = 'public',
'username' = 'root',
'password' = 'root',
'checkpoint' = '600000',
'scan.startup.mode' = 'initial',
'parallelism' = '1',
'table-name' = 'public\.sys_dict_data2',
'debezium.decimal.handling.mode' = 'string',
'source.decoding.plugin.name' = 'pgoutput',
'source.include.schema.changes' = 'true',
'debezium.slot.name' = 'slot_sys_4',
'sink.url' = 'jdbc:mysql://192.168.0.223:9030/ods_pg',
'sink.auto.create' = 'true',
'sink.connector' = 'doris',
'sink.fenodes' = '192.168.0.223:8030',
'sink.username' = 'root',
'sink.password' = 'root',
'sink.doris.batch.size' = '1000',
'sink.sink.max-retries' = '1',
'sink.sink.db' = 'ods_pg',
'sink.sink.properties.format' = 'json',
'sink.sink.properties.table.create' = 'true',
'sink.sink.properties.table.create.properties.light_schema_change' = 'true',
'sink.sink.properties.read_json_by_line' = 'true',
-- 'sink.sink.properties.timezone' = 'Etc/UTC',
'sink.sink.enable-delete' = 'true',
'sink.table.identifier' = '#{schemaName}.#{tableName}',
'sink.sink.label-prefix' = '#{schemaName}_#{tableName}_19'
);
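The whole-database sync job is shown above; so far I have only tested it with a single table. Inserts and updates are synchronized, but deletes are not. The PostgreSQL table's replica identity is DEFAULT.
I did some preliminary digging through the code.
In org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer#processElement, a DELETE event is handled as a retraction: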
public void processElement(StreamRecord<RowData> element) throws Exception {
    final RowData row = element.getValue();
    List<RowData> values = state.value();
    if (values == null) {
        values = new ArrayList<>(2);
    }
    switch (row.getRowKind()) {
        case INSERT:
        case UPDATE_AFTER:
            addRow(values, row);
            break;
        case UPDATE_BEFORE:
        case DELETE:
            retractRow(values, row);
            break;
    }
}
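In retractRow, an index is looked up first. If it equals -1, the method returns immediately and logs a message about state TTL. I added SET 'table.exec.state.ttl' = '1h'; to the sync job,
but it had no effect, so state TTL is not the root cause.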
private void retractRow(List<RowData> values, RowData retract) throws IOException {
    final int lastIndex = values.size() - 1;
    final int index = findFirst(values, retract);
    if (index == -1) {
        LOG.info(STATE_CLEARED_WARN_MSG);
        return;
    } else {
        // Remove first found row
        values.remove(index);
    }
    if (values.isEmpty()) {
        // Delete this row
        retract.setRowKind(DELETE);
        collector.collect(retract);
    } else if (index == lastIndex) {
        // Last row has been removed, update to the second last one
        final RowData latestRow = values.get(values.size() - 1);
        latestRow.setRowKind(UPDATE_AFTER);
        collector.collect(latestRow);
    }
    if (values.isEmpty()) {
        state.clear();
    } else {
        state.update(values);
    }
}
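The index lookup checks whether an upsert key (primary key) is available, and the two cases are handled differently. At runtime my job takes the no-primary-key branch, which compares entire rows. As a result, deletes are only synchronized after changing the PostgreSQL table's replica identity to FULL; with DEFAULT they are not.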
private boolean equalsIgnoreRowKind(RowData newRow, RowData oldRow) {
    newRow.setRowKind(oldRow.getRowKind());
    if (hasUpsertKey) {
        return equaliser.equals(
                upsertKeyProjectedRow1.replaceRow(newRow),
                upsertKeyProjectedRow2.replaceRow(oldRow));
    }
    return equaliser.equals(newRow, oldRow);
}
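To illustrate why the missing upsert key matters, here is a minimal plain-Java sketch of the lookup. It is a toy model, not Flink code: RetractLookupDemo, its findFirst, and matches are hypothetical names, and rows are modeled as Object[] instead of RowData. It also assumes that under REPLICA IDENTITY DEFAULT the delete event carries only the primary key column, with the remaining columns null.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

/** Toy model of the row lookup done before a retraction; not Flink code. */
class RetractLookupDemo {

    /** Returns the index of the first stored row matching the retract row, or -1. */
    static int findFirst(List<Object[]> state, Object[] retract, int[] upsertKey) {
        for (int i = 0; i < state.size(); i++) {
            if (matches(state.get(i), retract, upsertKey)) {
                return i;
            }
        }
        return -1;
    }

    /** Compares only the key columns when a key is known, otherwise the full row. */
    static boolean matches(Object[] stored, Object[] retract, int[] upsertKey) {
        if (upsertKey == null) {
            return Arrays.equals(stored, retract);
        }
        for (int col : upsertKey) {
            if (!Objects.equals(stored[col], retract[col])) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<Object[]> state = new ArrayList<>();
        state.add(new Object[] {1L, "label", "value"});

        // Assumed shape of a delete under REPLICA IDENTITY DEFAULT: only the key is set.
        Object[] deleteDefault = {1L, null, null};
        // Assumed shape of a delete under REPLICA IDENTITY FULL: the whole old row.
        Object[] deleteFull = {1L, "label", "value"};

        System.out.println(findFirst(state, deleteDefault, null));           // -1
        System.out.println(findFirst(state, deleteFull, null));              // 0
        System.out.println(findFirst(state, deleteDefault, new int[] {0}));  // 0
    }
}
```

Under these assumptions, the full-row comparison cannot find the key-only delete row, which corresponds to the index == -1 early return in retractRow. A key-only comparison, or a full old row from REPLICA IDENTITY FULL, does find it.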
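After a fair amount of debugging, my preliminary finding is that the primary key is lost when the source table view is created, even though the table actually has one. The method is org.dinky.cdc.sql.SQLSinkBuilder#addSourceTableView: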
private String addSourceTableView(DataStream<Row> rowDataDataStream, Table table) {
    // Because the name of the view on Flink is not allowed to have -, it needs to be replaced with _ here
    String viewName = replaceViewNameMiddleLineToUnderLine("VIEW_" + table.getSchemaTableNameWithUnderline());
    customTableEnvironment.createTemporaryView(
            viewName, customTableEnvironment.fromChangelogStream(rowDataDataStream));
    logger.info("Create {} temporaryView successful...", viewName);
    return viewName;
}
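I'm a beginner, so I asked an AI about this. According to it, when a DataStream is converted to a Table the primary key has to be specified manually; it is not inferred from the DataStream.
I have only just started learning Flink, so I'm not sure whether this is why deletes are not synchronized. I'd appreciate help from the maintainers in tracking this down.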
Anything else
No response
Version
1.2.0
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct