
Conversation

Contributor

@suhwan-cheon suhwan-cheon commented Sep 14, 2025

issue: https://issues.apache.org/jira/browse/FLINK-38247

Issue

An infinite loop occurred when using the MySqlChunkSplitter to split a table with a MySQL BIGINT UNSIGNED primary key. The problem happens when a primary key value exceeds Long.MAX_VALUE.


Solution

I added StatementUtils.setSafeObject, which detects the overflow and converts the value to a BigDecimal before setting it in the PreparedStatement.


Verification

  • I added StatementUtilsTest to verify the correctness of StatementUtils.setSafeObject.
  • I used Java's dynamic proxy (createPreparedStatementProxy) to avoid creating a verbose mock class, simplifying the test code.
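The dynamic-proxy idea mentioned above can be sketched as follows. This is a hypothetical illustration, not the PR's actual `createPreparedStatementProxy` helper: a JDK `Proxy` implements `PreparedStatement` and records every `setObject(index, value)` call into a map, so a unit test can assert on what was bound without writing a verbose mock class.

```java
import java.lang.reflect.Proxy;
import java.math.BigInteger;
import java.sql.PreparedStatement;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of a dynamic-proxy PreparedStatement for unit tests.
public class PreparedStatementProxySketch {

    public static PreparedStatement createPreparedStatementProxy(Map<Integer, Object> captured) {
        return (PreparedStatement)
                Proxy.newProxyInstance(
                        PreparedStatement.class.getClassLoader(),
                        new Class<?>[] {PreparedStatement.class},
                        (proxy, method, args) -> {
                            // Record only setObject(parameterIndex, value) calls.
                            if ("setObject".equals(method.getName())
                                    && args != null
                                    && args.length == 2) {
                                captured.put((Integer) args[0], args[1]);
                            }
                            // All other PreparedStatement methods are no-ops here.
                            return null;
                        });
    }

    public static void main(String[] args) throws Exception {
        Map<Integer, Object> captured = new HashMap<>();
        PreparedStatement ps = createPreparedStatementProxy(captured);
        ps.setObject(1, new BigInteger("18446744073709551615"));
        System.out.println(captured.get(1)); // prints 18446744073709551615
    }
}
```

A test then only needs to inspect the map after calling the code under test, instead of subclassing the large PreparedStatement interface.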

@lvyanquan
Contributor

Please add an itcase in MySqlSourceITCase.

// Therefore, we need to handle the overflow issue by converting the long value to BigDecimal.
public static void setSafeObject(PreparedStatement ps, int parameterIndex, Object value)
        throws SQLException {
    if (value instanceof Long && (Long) value < 0L) {
Contributor

It should be the BigInteger type; you can verify this in the itcase.

Contributor
@lvyanquan lvyanquan Sep 15, 2025

Example SQL for a possible test case:



CREATE TABLE `unsigned_bigint_pk`
(
    `order_id`  BIGINT UNSIGNED NOT NULL,
    `desc`      varchar(512) NOT NULL,
    PRIMARY KEY (`order_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
INSERT INTO unsigned_bigint_pk
VALUES (1,  'flink'),
(2,  'flink'),
(3,  'flink'),
(4,  'flink'),
(5,  'flink'),
(6,  'flink'),
(7,  'flink'),
(8,  'flink'),
(9,  'flink'),
(10,  'flink'),
(11,  'flink'),
(12,  'flink'),
(18446744073709551604,  'flink'),
(18446744073709551605,  'flink'),
(18446744073709551606,  'flink'),
(18446744073709551607,  'flink'),
(18446744073709551608,  'flink'),
(18446744073709551609,  'flink'),
(18446744073709551610,  'flink'),
(18446744073709551611,  'flink'),
(18446744073709551612,  'flink'),
(18446744073709551613,  'flink'),
(18446744073709551614,  'flink'),
(18446744073709551615,  'flink');


Contributor Author

@lvyanquan
Hello! I added an itcase in MySqlSourceITCase.

JDBC may surface BIGINT UNSIGNED as a Long, and values greater than Long.MAX_VALUE appear negative due to two's complement representation.
So in this code I 1) detect negative Longs and 2) bind them as BigInteger, ensuring values near 2^64 - 1 are handled correctly.
I added an IT in MySqlSourceITCase that creates unsigned_bigint_pk and verifies the boundary values, and I saw that the test worked well.
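The two's-complement behavior described above can be demonstrated in isolation. This is an illustration (not code from the PR) of why an unsigned 64-bit value above Long.MAX_VALUE surfaces as a negative long, and how the original unsigned value can be recovered as a BigInteger:

```java
import java.math.BigInteger;

// Demonstrates the BIGINT UNSIGNED -> negative long wraparound and its recovery.
public class UnsignedBigintDemo {

    // Recover the unsigned 64-bit value from a raw (possibly negative) long.
    static BigInteger toUnsigned(long raw) {
        return new BigInteger(Long.toUnsignedString(raw));
    }

    public static void main(String[] args) {
        long raw = Long.parseUnsignedLong("18446744073709551615"); // 2^64 - 1
        System.out.println(raw); // prints -1: the unsigned maximum wraps to -1L
        System.out.println(toUnsigned(raw)); // prints 18446744073709551615
    }
}
```

Binding the recovered BigInteger, rather than the raw negative long, is what keeps the chunk-boundary comparisons in the splitter consistent with MySQL's unsigned ordering.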

@lvyanquan
Contributor

Run 'mvn spotless:apply' to fix these violations.

@suhwan-cheon
Contributor Author

Run 'mvn spotless:apply' to fix these violations.

I've applied it. Can you run the test again?

@suhwan-cheon
Contributor Author

There's a Checkstyle error; I'm not used to this environment, so I made a mistake.
I've fixed the Checkstyle error and ran the MySQL connector unit tests with mvn -pl flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc -DskipITs=true test.
Please review.

.tableList(db + "." + table)
.deserializer(deserializer)
.startupOptions(StartupOptions.initial())
.chunkKeyColumn(new ObjectPath(db, table), "order_id")
Contributor

You can add .splitSize(2) here, and set rootLogger.level to INFO in log4j2-test.properties to see the actual split information, like:


gners.MySqlChunkSplitter - ChunkSplitter has split 2820 chunks for table customer_kgqlle.unsigned_bigint_pk
72864 [snapshot-splitting] INFO  org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlChunkSplitter - Use unevenly-sized chunks for table customer_kgqlle.unsigned_bigint_pk, the chunk size is 2 from 2
72869 [snapshot-splitting] INFO  org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlChunkSplitter - Use unevenly-sized chunks for table customer_kgqlle.unsigned_bigint_pk, the chunk size is 2 from 3
72871 [snapshot-splitting] INFO  org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlChunkSplitter - Use unevenly-sized chunks for table customer_kgqlle.unsigned_bigint_pk, the chunk size is 2 from 4
72875 [snapshot-splitting] INFO  org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlChunkSplitter - Use unevenly-sized chunks for table customer_kgqlle.unsigned_bigint_pk, the chunk size is 2 from 5
72878 [snapshot-splitting] INFO  org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlChunkSplitter - Use unevenly-sized chunks for table customer_kgqlle.unsigned_bigint_pk, the chunk size is 2 from 6
72879 [snapshot-splitting] INFO  org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlChunkSplitter - Use unevenly-sized chunks for table customer_kgqlle.unsigned_bigint_pk, the chunk size is 2 from 7
72881 [snapshot-splitting] INFO  org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlChunkSplitter - Use unevenly-sized chunks for table customer_kgqlle.unsigned_bigint_pk, the chunk size is 2 from 8
72882 [snapshot-splitting] INFO  org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlChunkSplitter - Use unevenly-sized chunks for table customer_kgqlle.unsigned_bigint_pk, the chunk size is 2 from 9
72884 [snapshot-splitting] INFO  org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlChunkSplitter - Use unevenly-sized chunks for table customer_kgqlle.unsigned_bigint_pk, the chunk size is 2 from 10
72885 [snapshot-splitting] INFO  org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlChunkSplitter - Use unevenly-sized chunks for table customer_kgqlle.unsigned_bigint_pk, the chunk size is 2 from 11
72990 [snapshot-splitting] INFO  org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlChunkSplitter - ChunkSplitter has split 2830 chunks for table customer_kgqlle.unsigned_bigint_pk
72990 [snapshot-splitting] INFO  org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlChunkSplitter - Use unevenly-sized chunks for table customer_kgqlle.unsigned_bigint_pk, the chunk size is 2 from 12
72992 [snapshot-splitting] INFO  org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlChunkSplitter - Use unevenly-sized chunks for table customer_kgqlle.unsigned_bigint_pk, the chunk size is 2 from 18446744073709551604

This means the problem was not resolved.
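For reference, a minimal log4j2-test.properties along the lines of the suggestion above might look like this. This is a hypothetical sketch of a standard Log4j 2 properties configuration, not the file from the Flink CDC test resources:

```properties
# Raise the root logger to INFO so MySqlChunkSplitter's split messages are visible.
rootLogger.level = INFO
rootLogger.appenderRef.console.ref = ConsoleAppender

appender.console.type = Console
appender.console.name = ConsoleAppender
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
```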

@lvyanquan lvyanquan added this to the V3.6.0 milestone Sep 24, 2025
