diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/StatementUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/StatementUtils.java
index e538c325d78..08d8ffd936f 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/StatementUtils.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/StatementUtils.java
@@ -22,6 +22,7 @@
 import io.debezium.jdbc.JdbcConnection;
 import io.debezium.relational.TableId;
 
+import java.math.BigInteger;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
@@ -76,6 +77,35 @@ public static long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId)
                 });
     }
 
+    /**
+     * Binds {@code value} to the parameter at {@code parameterIndex}, first widening a negative
+     * {@link Long} to the {@link BigInteger} holding the same 64 bits read as unsigned.
+     *
+     * <p>A negative {@code Long} is taken to be an unsigned BIGINT key above
+     * {@link Long#MAX_VALUE} that wrapped around; bound as-is it would reach the statement as
+     * that negative number. Any other value, including {@code null}, is passed to
+     * {@link PreparedStatement#setObject(int, Object)} unchanged.
+     *
+     * <p>NOTE(review): a truly negative key from a signed column looks the same here and would
+     * be converted too - confirm callers only pass unsigned-range keys, or pass the column type.
+     *
+     * @param ps statement whose parameter is set
+     * @param parameterIndex index of the parameter, as expected by {@code setObject}
+     * @param value value to bind; may be {@code null}
+     * @throws SQLException if the underlying {@code setObject} call fails
+     */
+    public static void setSafeObject(PreparedStatement ps, int parameterIndex, Object value)
+            throws SQLException {
+        Object bound = value;
+        if (value instanceof Long) {
+            long raw = (Long) value;
+            if (raw < 0L) {
+                bound = new BigInteger(Long.toUnsignedString(raw));
+            }
+        }
+        ps.setObject(parameterIndex, bound);
+    }
+
     public static Object queryMin(
             JdbcConnection jdbc, TableId tableId, String columnName, Object excludedLowerBound)
             throws SQLException {
@@ -85,7 +115,7 @@ public static Object queryMin(
                         quote(columnName), quote(tableId), quote(columnName));
         return jdbc.prepareQueryAndMap(
                 minQuery,
-                ps -> ps.setObject(1, excludedLowerBound),
+                ps -> setSafeObject(ps, 1, excludedLowerBound),
                 rs -> {
                     if (!rs.next()) {
                         // this should never happen
@@ -118,7 +148,7 @@ public static Object queryNextChunkMax(
                         chunkSize);
         return jdbc.prepareQueryAndMap(
                 query,
-                ps -> ps.setObject(1, includedLowerBound),
+                ps -> setSafeObject(ps, 1, includedLowerBound),
                 rs -> {
                     if (!rs.next()) {
                         // this should never happen
@@ -204,18 +234,18 @@ public static PreparedStatement readTableSplitDataStatement(
         }
         if (isFirstSplit) {
             for (int i = 0; i < primaryKeyNum; i++) {
-                statement.setObject(i + 1, splitEnd[i]);
-                statement.setObject(i + 1 + primaryKeyNum, splitEnd[i]);
+                setSafeObject(statement, i + 1, splitEnd[i]);
+                setSafeObject(statement, i + 1 + primaryKeyNum, splitEnd[i]);
             }
         } else if (isLastSplit) {
             for (int i = 0; i < primaryKeyNum; i++) {
-                statement.setObject(i + 1, splitStart[i]);
+                setSafeObject(statement, i + 1, splitStart[i]);
             }
         } else {
             for (int i = 0; i < primaryKeyNum; i++) {
-                statement.setObject(i + 1, splitStart[i]);
-                statement.setObject(i + 1 + primaryKeyNum, splitEnd[i]);
-                statement.setObject(i + 1 + 2 * primaryKeyNum, splitEnd[i]);
+                setSafeObject(statement, i + 1, splitStart[i]);
+                setSafeObject(statement, i + 1 + primaryKeyNum, splitEnd[i]);
+                setSafeObject(statement, i + 1 + 2 * primaryKeyNum, splitEnd[i]);
             }
         }
         return statement;
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceITCase.java
index 3c7f9d141ea..a5a9eb9d912 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceITCase.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceITCase.java
@@ -56,6 +56,7 @@
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.catalog.UniqueConstraint;
+import org.apache.flink.table.data.DecimalData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.types.DataType;
@@ -1447,6 +1448,105 @@ private boolean hasNextData(final CloseableIterator iterator)
         }
     }
 
+    /**
+     * Reads a table keyed by a {@code BIGINT UNSIGNED} column whose values sit at both ends of
+     * the unsigned range and checks that the snapshot returns the inserted boundary rows.
+     */
+    @Test
+    void testUnsignedBigintPrimaryKeyChunking() throws Exception {
+        customDatabase.createAndInitialize();
+
+        String db = customDatabase.getDatabaseName();
+        String table = "unsigned_bigint_pk";
+        try (MySqlConnection connection = getConnection()) {
+            connection.setAutoCommit(false);
+            String createSql =
+                    String.format(
+                            "CREATE TABLE %s.%s (\n"
+                                    + " `order_id` BIGINT UNSIGNED NOT NULL,\n"
+                                    + " `desc` VARCHAR(512) NOT NULL,\n"
+                                    + " PRIMARY KEY (`order_id`)\n"
+                                    + ") ENGINE=InnoDB DEFAULT CHARSET=utf8;",
+                            StatementUtils.quote(db), StatementUtils.quote(table));
+            // Insert sample data including values near UNSIGNED BIGINT max
+            String insertSql =
+                    String.format(
+                            "INSERT INTO %s.%s (`order_id`, `desc`) VALUES "
+                                    + "(1, 'flink'),(2, 'flink'),(3, 'flink'),(4, 'flink'),(5, 'flink'),"
+                                    + "(6, 'flink'),(7, 'flink'),(8, 'flink'),(9, 'flink'),(10, 'flink'),"
+                                    + "(11, 'flink'),(12, 'flink'),"
+                                    + "(18446744073709551604, 'flink'),(18446744073709551605, 'flink'),"
+                                    + "(18446744073709551606, 'flink'),(18446744073709551607, 'flink'),"
+                                    + "(18446744073709551608, 'flink'),(18446744073709551609, 'flink'),"
+                                    + "(18446744073709551610, 'flink'),(18446744073709551611, 'flink'),"
+                                    + "(18446744073709551612, 'flink'),(18446744073709551613, 'flink'),"
+                                    + "(18446744073709551614, 'flink'),(18446744073709551615, 'flink');",
+                            StatementUtils.quote(db), StatementUtils.quote(table));
+            // Drop if exists to be idempotent across runs, then create and insert
+            connection.execute(
+                    String.format(
+                            "DROP TABLE IF EXISTS %s.%s;",
+                            StatementUtils.quote(db), StatementUtils.quote(table)),
+                    createSql,
+                    insertSql);
+            connection.commit();
+        }
+
+        // Build a source reading only the unsigned_bigint_pk table
+        DataType dataType =
+                DataTypes.ROW(
+                        DataTypes.FIELD("order_id", DataTypes.DECIMAL(20, 0)),
+                        DataTypes.FIELD("desc", DataTypes.STRING()));
+        LogicalType logicalType = TypeConversions.fromDataToLogicalType(dataType);
+        InternalTypeInfo<RowData> typeInfo = InternalTypeInfo.of(logicalType);
+        RowDataDebeziumDeserializeSchema deserializer =
+                RowDataDebeziumDeserializeSchema.newBuilder()
+                        .setPhysicalRowType((RowType) dataType.getLogicalType())
+                        .setResultTypeInfo(typeInfo)
+                        .build();
+
+        // NOTE(review): no split size is configured, so these 24 rows may land in a single chunk
+        // and never bind a boundary above Long.MAX_VALUE - confirm, or set a small split size.
+        MySqlSource<RowData> source =
+                MySqlSource.<RowData>builder()
+                        .hostname(MYSQL_CONTAINER.getHost())
+                        .port(MYSQL_CONTAINER.getDatabasePort())
+                        .username(customDatabase.getUsername())
+                        .password(customDatabase.getPassword())
+                        .serverTimeZone("UTC")
+                        .databaseList(db)
+                        .tableList(db + "." + table)
+                        .deserializer(deserializer)
+                        .startupOptions(StartupOptions.initial())
+                        .chunkKeyColumn(new ObjectPath(db, table), "order_id")
+                        .build();
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        try (CloseableIterator<RowData> it =
+                env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL CDC Source")
+                        .executeAndCollect()) {
+            // Expect 24 records as inserted above
+            List<String> result = fetchRowData(it, 24, this::stringifyUnsignedPkRow);
+            // Validate a couple of boundary values exist to ensure chunking across unsigned range
+            // works
+            assertThat(result)
+                    .contains(
+                            "+I[1, flink]",
+                            "+I[12, flink]",
+                            "+I[18446744073709551604, flink]",
+                            "+I[18446744073709551615, flink]");
+        }
+    }
+
+    /** Renders a row as {@code +I[order_id, desc]}; the key is printed in plain notation. */
+    private String stringifyUnsignedPkRow(RowData row) {
+        DecimalData decimal = row.getDecimal(0, 20, 0);
+        String orderId = decimal.toBigDecimal().toPlainString();
+        String desc = row.getString(1).toString();
+        return "+I[" + orderId + ", " + desc + "]";
+    }
+
     /**
      * A {@link DebeziumDeserializationSchema} implementation which sleep given milliseconds after
      * deserialize per record, this class is designed for test.
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/utils/StatementUtilsTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/utils/StatementUtilsTest.java
new file mode 100644
index 00000000000..ab817a0b2f5
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/utils/StatementUtilsTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.mysql.source.utils;
+
+import org.junit.jupiter.api.Test;
+
+import java.lang.reflect.Proxy;
+import java.math.BigInteger;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit test for {@link org.apache.flink.cdc.connectors.mysql.source.utils.StatementUtils}. */
+class StatementUtilsTest {
+
+    @Test
+    void testSetSafeObjectCorrectlyHandlesOverflow() throws SQLException {
+        Map<String, Object> invocationDetails = new HashMap<>();
+        PreparedStatement psProxy = createPreparedStatementProxy(invocationDetails);
+
+        long overflowValue = Long.MAX_VALUE + 1L;
+        BigInteger expectedBigInteger = new BigInteger(Long.toUnsignedString(overflowValue));
+
+        // Use the safe method
+        StatementUtils.setSafeObject(psProxy, 1, overflowValue);
+
+        // Assert that it correctly used setObject with the converted BigInteger value
+        assertThat(invocationDetails.get("methodName")).isEqualTo("setObject");
+        assertThat(invocationDetails.get("value")).isInstanceOf(BigInteger.class);
+        assertThat(invocationDetails.get("value")).isEqualTo(expectedBigInteger);
+    }
+
+    @Test
+    void testDirectSetObjectFailsOnOverflow() throws SQLException {
+        Map<String, Object> invocationDetails = new HashMap<>();
+        PreparedStatement psProxy = createPreparedStatementProxy(invocationDetails);
+
+        long overflowValue = Long.MAX_VALUE + 1L;
+
+        // Baseline: call setObject on the recording proxy directly, bypassing setSafeObject
+        psProxy.setObject(1, overflowValue);
+
+        // The proxy records the wrapped-around negative long exactly as it was passed in
+        assertThat(invocationDetails.get("methodName")).isEqualTo("setObject");
+        assertThat(invocationDetails.get("value")).isInstanceOf(Long.class);
+        assertThat(invocationDetails.get("value")).isEqualTo(Long.MIN_VALUE);
+    }
+
+    @Test
+    void testSetSafeObjectHandlesRegularValues() throws SQLException {
+        Map<String, Object> invocationDetails = new HashMap<>();
+        PreparedStatement psProxy = createPreparedStatementProxy(invocationDetails);
+
+        // Test with a common Long
+        StatementUtils.setSafeObject(psProxy, 1, 123L);
+        assertThat(invocationDetails.get("methodName")).isEqualTo("setObject");
+        assertThat(invocationDetails.get("value")).isEqualTo(123L);
+        invocationDetails.clear();
+
+        // Test with a String
+        StatementUtils.setSafeObject(psProxy, 2, "test");
+        assertThat(invocationDetails.get("methodName")).isEqualTo("setObject");
+        assertThat(invocationDetails.get("value")).isEqualTo("test");
+        invocationDetails.clear();
+
+        // Test with null
+        StatementUtils.setSafeObject(psProxy, 3, null);
+        assertThat(invocationDetails.get("methodName")).isEqualTo("setObject");
+        assertThat(invocationDetails.get("value")).isNull();
+        invocationDetails.clear();
+    }
+
+    private PreparedStatement createPreparedStatementProxy(Map<String, Object> invocationDetails) {
+        return (PreparedStatement)
+                Proxy.newProxyInstance(
+                        StatementUtilsTest.class.getClassLoader(),
+                        new Class<?>[] {PreparedStatement.class},
+                        (proxy, method, args) -> {
+                            String methodName = method.getName();
+                            if (methodName.equals("setObject")) {
+                                invocationDetails.put("methodName", methodName);
+                                invocationDetails.put("parameterIndex", args[0]);
+                                invocationDetails.put("value", args[1]);
+                            }
+                            return null;
+                        });
+    }
+}