
[Bug] [Module Name] Whole-database sync from PostgreSQL to Doris does not synchronize physical delete events #4486

@qzztf

Search before asking

  • I had searched in the issues and found no similar issues.

What happened

When an entire PostgreSQL database is synchronized to Doris, physical delete events are not synchronized.

What you expected to happen

Physically deleted rows should be synchronized to Doris as well.

How to reproduce

SET 'table.exec.state.ttl' = '1h';
EXECUTE CDCSOURCE demo_doris
WITH
  (
    'connector' = 'postgres-cdc',
    'hostname' = '192.168.0.37',
    'port' = '5432',
    'database-name' = 'db',
    'schema-name' = 'public',
    'username' = 'root',
    'password' = 'root',
    'checkpoint' = '600000',
    'scan.startup.mode' = 'initial',
    'parallelism' = '1',
    'table-name' = 'public\.sys_dict_data2',
    'debezium.decimal.handling.mode' = 'string',
    'source.decoding.plugin.name' = 'pgoutput',
    'source.include.schema.changes' = 'true',
    'debezium.slot.name' = 'slot_sys_4',
    'sink.url' = 'jdbc:mysql://192.168.0.223:9030/ods_pg',
    'sink.auto.create' = 'true',
    'sink.connector' = 'doris',
    'sink.fenodes' = '192.168.0.223:8030',
    'sink.username' = 'root',
    'sink.password' = 'root',
    'sink.doris.batch.size' = '1000',
    'sink.sink.max-retries' = '1',
    'sink.sink.db' = 'ods_pg',
    'sink.sink.properties.format' = 'json',
    'sink.sink.properties.table.create' = 'true',
    'sink.sink.properties.table.create.properties.light_schema_change' = 'true',
    'sink.sink.properties.read_json_by_line' = 'true',
    --  'sink.sink.properties.timezone' = 'Etc/UTC',
    'sink.sink.enable-delete' = 'true',
    'sink.table.identifier' = '#{schemaName}.#{tableName}',
    'sink.sink.label-prefix' = '#{schemaName}_#{tableName}_19'
  );

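The whole-database sync job is shown above; so far I have tested it with only one table. Inserts and updates are synchronized, but deletes are not. The replica identity of the PostgreSQL table is DEFAULT.
I did some initial digging through the code.
In org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer#processElement, a DELETE event is handled as a retraction: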

public void processElement(StreamRecord<RowData> element) throws Exception {
        final RowData row = element.getValue();
        List<RowData> values = state.value();
        if (values == null) {
            values = new ArrayList<>(2);
        }

        switch (row.getRowKind()) {
            case INSERT:
            case UPDATE_AFTER:
                addRow(values, row);
                break;

            case UPDATE_BEFORE:
            case DELETE:
                retractRow(values, row);
                break;
        }
    }

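The retractRow method looks up an index; if it is -1, the method returns immediately and logs a message about state TTL. I added SET 'table.exec.state.ttl' = '1h'; to the sync job,
but it had no effect, so state TTL is not the root cause.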

private void retractRow(List<RowData> values, RowData retract) throws IOException {
        final int lastIndex = values.size() - 1;
        final int index = findFirst(values, retract);
        if (index == -1) {
            LOG.info(STATE_CLEARED_WARN_MSG);
            return;
        } else {
            // Remove first found row
            values.remove(index);
        }
        if (values.isEmpty()) {
            // Delete this row
            retract.setRowKind(DELETE);
            collector.collect(retract);
        } else if (index == lastIndex) {
            // Last row has been removed, update to the second last one
            final RowData latestRow = values.get(values.size() - 1);
            latestRow.setRowKind(UPDATE_AFTER);
            collector.collect(latestRow);
        }

        if (values.isEmpty()) {
            state.clear();
        } else {
            state.update(values);
        }
    }
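
To make the early return in retractRow easier to follow, here is a minimal plain-Java sketch of the same control flow. It is not the Flink implementation: rows are plain strings, the class RetractSketch and its retract method are hypothetical names, and emitted changes are returned as a list of "KIND:row" strings instead of going through a collector.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the retract branch; no Flink classes are used.
final class RetractSketch {

    // Removes the first stored row equal to `row` and returns what would be emitted.
    static List<String> retract(List<String> values, String row) {
        List<String> emitted = new ArrayList<>();
        final int lastIndex = values.size() - 1;
        final int index = values.indexOf(row);
        if (index == -1) {
            // No matching row in state: nothing reaches the sink.
            return emitted;
        }
        values.remove(index);
        if (values.isEmpty()) {
            // The last remaining row was retracted, so a DELETE is emitted.
            emitted.add("DELETE:" + row);
        } else if (index == lastIndex) {
            // The newest row was retracted, so fall back to the previous one.
            emitted.add("UPDATE_AFTER:" + values.get(values.size() - 1));
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> state = new ArrayList<>(List.of("a"));
        System.out.println(retract(state, "b")); // prints []
        System.out.println(retract(state, "a")); // prints [DELETE:a]
    }
}
```

The first call shows the case seen in this issue: when the lookup finds no match, the DELETE is dropped without any error, which is why only the TTL log message hints at the problem.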

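The index lookup checks whether an upsert key (primary key) is present, and the two cases are handled differently. At runtime the job took the no-primary-key branch, so deletes are synchronized only after changing the PostgreSQL table's replica identity to FULL; with DEFAULT they are not.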

private boolean equalsIgnoreRowKind(RowData newRow, RowData oldRow) {
        newRow.setRowKind(oldRow.getRowKind());
        if (hasUpsertKey) {
            return equaliser.equals(
                    upsertKeyProjectedRow1.replaceRow(newRow),
                    upsertKeyProjectedRow2.replaceRow(oldRow));
        }
        return equaliser.equals(newRow, oldRow);
    }
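
The difference between the two branches can be sketched in plain Java. With REPLICA IDENTITY DEFAULT, PostgreSQL records only the primary-key columns of the old row, so this sketch assumes the DELETE row arrives with the key set and every other column null. The class ReplicaIdentityDemo, its methods, and the sample column values are hypothetical; rows are modeled as Object[] instead of RowData.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

// Hypothetical stand-in for the materializer's row lookup; no Flink classes are used.
final class ReplicaIdentityDemo {

    // Compares only the key columns when an upsert key is known,
    // otherwise compares every column of the two rows.
    static boolean sameRow(Object[] a, Object[] b, int[] keyIdx, boolean hasUpsertKey) {
        if (hasUpsertKey) {
            for (int i : keyIdx) {
                if (!Objects.equals(a[i], b[i])) {
                    return false;
                }
            }
            return true;
        }
        return Arrays.equals(a, b);
    }

    // Returns the index of the first stored row matching the retraction, or -1.
    static int findFirst(List<Object[]> state, Object[] retract, int[] keyIdx, boolean hasUpsertKey) {
        for (int i = 0; i < state.size(); i++) {
            if (sameRow(state.get(i), retract, keyIdx, hasUpsertKey)) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        List<Object[]> state = new ArrayList<>();
        state.add(new Object[] {1L, "label", "value"});
        int[] key = {0}; // assumption: column 0 is the primary key

        Object[] deleteDefault = {1L, null, null};       // assumed shape under REPLICA IDENTITY DEFAULT
        Object[] deleteFull = {1L, "label", "value"};    // assumed shape under REPLICA IDENTITY FULL

        System.out.println(findFirst(state, deleteDefault, key, false)); // prints -1
        System.out.println(findFirst(state, deleteFull, key, false));    // prints 0
        System.out.println(findFirst(state, deleteDefault, key, true));  // prints 0
    }
}
```

Under these assumptions, the full-row comparison matches only when the DELETE carries every column, which is consistent with deletes working under FULL and failing under DEFAULT when no upsert key is known.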

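After a fair amount of debugging, my initial finding is that the primary key is lost when the source table view is created, even though the table actually has one. The method is org.dinky.cdc.sql.SQLSinkBuilder#addSourceTableView: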

private String addSourceTableView(DataStream<Row> rowDataDataStream, Table table) {
        // Because a Flink view name must not contain '-', it is replaced with '_' here
        String viewName = replaceViewNameMiddleLineToUnderLine("VIEW_" + table.getSchemaTableNameWithUnderline());

        customTableEnvironment.createTemporaryView(
                viewName, customTableEnvironment.fromChangelogStream(rowDataDataStream));
        logger.info("Create {} temporaryView successful...", viewName);
        return viewName;
    }

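I am a beginner, so I asked an AI, which said that when a DataStream is converted to a Table the primary key has to be specified manually; it is not inferred from the DataStream.
I have only just started learning Flink and am not sure whether this is what prevents deletes from being synchronized. I would appreciate help from the experts in tracking this down.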

Anything else

No response

Version

1.2.0

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Labels

Bug, Waiting for reply
