[AURON #1851] Introduce Arrow to Flink RowData reader #2063

weiqingy wants to merge 8 commits into apache:master
Conversation
Force-pushed ee74ab4 to d53cf08
Add 7 ColumnVector wrapper classes for primitive types (Boolean, TinyInt, SmallInt, Int, BigInt, Float, Double) that implement Flink's ColumnVector sub-interfaces backed by Arrow FieldVectors for zero-copy columnar access.
Add VarChar, VarBinary, and Decimal wrappers. Decimal uses the compact fromUnscaledLong path for precision <= 18 to avoid BigDecimal allocation, reading directly from Arrow's data buffer via ArrowBuf.getLong().
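The precision <= 18 cutoff for the compact path can be illustrated without Arrow or Flink on the classpath. A minimal, dependency-free sketch (class and helper names are hypothetical; the real wrapper reads the unscaled long via ArrowBuf.getLong() and builds a Flink DecimalData):

```java
import java.math.BigDecimal;

public class CompactDecimalSketch {
    // Why precision <= 18 is safe for the compact path: the largest 18-digit
    // unscaled value, 10^18 - 1, fits in a signed 64-bit long, so the decimal
    // can be reconstructed from a single long read with no BigDecimal on the
    // hot path.
    static final int MAX_COMPACT_PRECISION = 18;

    static boolean fitsCompactPath(int precision) {
        return precision <= MAX_COMPACT_PRECISION;
    }

    // Reconstruct the decimal from an unscaled long plus a scale, mirroring
    // the shape of DecimalData.fromUnscaledLong(unscaled, precision, scale).
    static BigDecimal fromUnscaledLong(long unscaled, int scale) {
        return BigDecimal.valueOf(unscaled, scale);
    }

    public static void main(String[] args) {
        System.out.println(fitsCompactPath(18));         // true
        System.out.println(fitsCompactPath(19));         // false
        System.out.println(fromUnscaledLong(12345L, 2)); // 123.45
    }
}
```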
Add Date (epoch days), Time (micros->millis conversion), and Timestamp (micros->TimestampData with millis + nanoOfMillisecond) wrappers. Timestamp wrapper uses TimeStampVector parent to handle both TIMESTAMP and TIMESTAMP_LTZ Arrow vectors.
Add Array (ListVector), Map (MapVector), and Row (StructVector) wrappers with recursive child vector wrapping. Array/Map use offset buffers for element access; Row uses VectorizedColumnBatch with reusable ColumnarRowData.
FlinkArrowReader creates ColumnVector wrappers from VectorSchemaRoot + RowType, provides read(int) returning ColumnarRowData, getRowCount(), reset(VectorSchemaRoot) for batch reuse, and AutoCloseable. Dispatches LogicalTypeRoot to the correct wrapper via switch on all 17 supported types.
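The dispatch described above can be sketched without Flink dependencies. The stand-in below uses plain strings where the real method switches on LogicalTypeRoot and returns wrapper instances backed by Arrow FieldVectors; only a few of the 17 cases are shown, and all names outside the PR's wrapper classes are illustrative:

```java
public class TypeDispatchSketch {
    // Dependency-free stand-in for FlinkArrowReader.createColumnVector:
    // one switch arm per supported type root, with an explicit failure for
    // anything unsupported (matching the PR's unsupported-type exception test).
    static String wrapperFor(String typeRoot) {
        switch (typeRoot) {
            case "BOOLEAN":  return "ArrowBooleanColumnVector";
            case "TINYINT":  return "ArrowTinyIntColumnVector";
            case "SMALLINT": return "ArrowSmallIntColumnVector";
            case "INTEGER":  return "ArrowIntColumnVector";
            case "BIGINT":   return "ArrowBigIntColumnVector";
            case "FLOAT":    return "ArrowFloatColumnVector";
            case "DOUBLE":   return "ArrowDoubleColumnVector";
            default:
                throw new UnsupportedOperationException("Unsupported type: " + typeRoot);
        }
    }

    public static void main(String[] args) {
        System.out.println(wrapperFor("BIGINT")); // ArrowBigIntColumnVector
    }
}
```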
Force-pushed b3d5562 to a1be6f6
@Tartarus0zm Could you please take a look at the PR when you have a chance? I split it into multiple commits and added a review_help.md file to make the review easier. Once the review is complete, I'll remove the helper file. Thank you!

@weiqingy Thanks for your contribution; waiting for CI to go green.
21 test methods covering all 17 supported types with values and null handling, plus integration tests for multi-column batch, empty batch, reader reset, and unsupported type exception.
Force-pushed a1be6f6 to ffcf6d6
Thank you @Tartarus0zm! I just pushed a fix: the CI failure was due to List.of(), which requires Java 9+, while CI runs on JDK 8. I replaced it with Collections.singletonList() in the test file. I force-pushed to keep the commit history clean for reviewers, so CI may need another workflow approval. Let me know if anything else is needed!
Pull request overview
This PR implements the Arrow-to-Flink RowData conversion layer (AURON #1851), the reverse of the Flink-to-Arrow writer from PR #1930. It enables native engine results (in Arrow format) to be read back into Flink's RowData for downstream operator processing, completing the second half of the Flink-native engine round-trip defined in AIP-1.
Changes:
- 16 `ArrowXxxColumnVector` wrapper classes providing zero-copy delegation from Arrow `FieldVector` types to Flink `ColumnVector` sub-interfaces
- `FlinkArrowReader` orchestrator that maps Arrow fields to the correct column vector wrappers via `LogicalType`, providing `read(int)`, `getRowCount()`, and `reset(VectorSchemaRoot)` APIs
- `FlinkArrowReaderTest` with 21 unit and integration tests covering all 17 supported types and batch-reset behavior
Reviewed changes
Copilot reviewed 25 out of 25 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| FlinkArrowReader.java | Orchestrator: creates wrappers from VectorSchemaRoot + RowType, returns ColumnarRowData via read(), supports batch reset() |
| ArrowBooleanColumnVector.java | Wraps BitVector → BooleanColumnVector (!= 0 conversion) |
| ArrowTinyIntColumnVector.java | Wraps TinyIntVector → ByteColumnVector |
| ArrowSmallIntColumnVector.java | Wraps SmallIntVector → ShortColumnVector |
| ArrowIntColumnVector.java | Wraps IntVector → IntColumnVector |
| ArrowBigIntColumnVector.java | Wraps BigIntVector → LongColumnVector |
| ArrowFloatColumnVector.java | Wraps Float4Vector → FloatColumnVector |
| ArrowDoubleColumnVector.java | Wraps Float8Vector → DoubleColumnVector |
| ArrowDecimalColumnVector.java | Wraps DecimalVector → DecimalColumnVector with compact/wide path optimization |
| ArrowVarCharColumnVector.java | Wraps VarCharVector → BytesColumnVector |
| ArrowVarBinaryColumnVector.java | Wraps VarBinaryVector → BytesColumnVector |
| ArrowDateColumnVector.java | Wraps DateDayVector → IntColumnVector (epoch days) |
| ArrowTimeColumnVector.java | Wraps TimeMicroVector → IntColumnVector (micros ÷ 1000) |
| ArrowTimestampColumnVector.java | Wraps TimeStampVector → TimestampColumnVector (micros → millis + nanos) |
| ArrowArrayColumnVector.java | Wraps ListVector → ArrayColumnVector using offset buffer |
| ArrowMapColumnVector.java | Wraps MapVector → MapColumnVector using key/value child vectors |
| ArrowRowColumnVector.java | Wraps StructVector → RowColumnVector with recursive child vectors |
| FlinkArrowReaderTest.java | 21 tests covering all 17 types, nulls, edge cases, multi-column, reset, and unsupported types |
| AURON-1851-DESIGN.md | Design document covering motivation, approach comparison, type table, and Q&A |
| docs/reviewhelper/01-06-*.md | Commit-by-commit review guides for the 6 commits in this PR |
```java
case TIME_WITHOUT_TIME_ZONE:
    // The writer (FlinkArrowFieldWriter) normalizes all TIME values to microseconds
    // in a TimeMicroVector, regardless of the declared Flink TIME precision.
    return new ArrowTimeColumnVector((TimeMicroVector) vector);
case TIMESTAMP_WITHOUT_TIME_ZONE:
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
    // The writer normalizes all timestamps to microseconds. TimeStampVector is the
    // common parent of TimeStampMicroVector and TimeStampMicroTZVector.
    return new ArrowTimestampColumnVector((TimeStampVector) vector);
```
The createColumnVector method for TIME_WITHOUT_TIME_ZONE unconditionally casts the Arrow vector to TimeMicroVector, but FlinkArrowUtils.toArrowType() maps TIME to different Arrow time units based on precision:
- TIME(0) → `TimeSecVector` (not `TimeMicroVector`) → `ClassCastException` at runtime
- TIME(1-3) → `TimeMilliVector` (not `TimeMicroVector`) → `ClassCastException` at runtime
- TIME(4-6) → `TimeMicroVector` ← only this precision range works correctly
- TIME(7+) → `TimeNanoVector` (not `TimeMicroVector`) → `ClassCastException` at runtime
For TIMESTAMP_WITHOUT_TIME_ZONE/TIMESTAMP_WITH_LOCAL_TIME_ZONE, the cast to TimeStampVector will not fail (it is the parent class of all timestamp vectors), but the conversion micros / 1000 and % 1000 is only correct for microsecond-precision vectors. For other precisions:
- TIMESTAMP(0) → `TimeStampSecVector`: `vector.get(i)` returns seconds, but is treated as microseconds → result is off by 10^6
- TIMESTAMP(1-3) → `TimeStampMilliVector`: treated as microseconds → off by 10^3
- TIMESTAMP(7+) → `TimeStampNanoVector`: treated as microseconds → off by 10^3 (nanos ÷ 1000 for millis, nanos % 1000 * 1000 for nanos)
The createColumnVector method needs to dispatch on the TIME/TIMESTAMP precision (via the LogicalType's precision) and apply the correct unit conversion and vector cast for each precision range, consistent with FlinkArrowUtils.toArrowType(). The comment claiming "writer normalizes all TIME/TIMESTAMP values to microseconds regardless of declared precision" is incorrect — FlinkArrowUtils.toArrowType() selects the Arrow time unit based on precision.
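Under the reviewer's assumption that the Arrow time unit follows the declared precision, the unit-correct conversion could look like the dependency-free sketch below. The class and helper names are hypothetical, and a real fix would also have to dispatch the vector casts per precision range:

```java
public class PrecisionAwareMillisSketch {
    // Hypothetical helper illustrating the suggested fix: convert a raw Arrow
    // timestamp value to epoch millis based on the declared Flink TIMESTAMP
    // precision, matching the precision ranges the review attributes to
    // FlinkArrowUtils.toArrowType() (0 → seconds, 1-3 → millis, 4-6 → micros,
    // 7+ → nanos). floorDiv keeps pre-epoch values correct.
    static long toEpochMillis(long rawValue, int precision) {
        if (precision == 0) {
            return rawValue * 1000L;                    // seconds → millis
        } else if (precision <= 3) {
            return rawValue;                            // already millis
        } else if (precision <= 6) {
            return Math.floorDiv(rawValue, 1000L);      // micros → millis
        } else {
            return Math.floorDiv(rawValue, 1_000_000L); // nanos → millis
        }
    }

    public static void main(String[] args) {
        System.out.println(toEpochMillis(1L, 0));         // 1000
        System.out.println(toEpochMillis(1_500L, 6));     // 1
        System.out.println(toEpochMillis(1_500_000L, 9)); // 1
    }
}
```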
Good catch on the FlinkArrowUtils.toArrowType() mapping. The reader is designed as the inverse of the
writer (PR #1930), which per the design discussion normalizes all TIME/TIMESTAMP values to microsecond
precision in Arrow, regardless of the declared Flink precision. This is documented in the design doc
(section 3.3) and in the code comments on the TIME/TIMESTAMP switch cases.
That said, this coupling to the writer's normalization behavior is worth highlighting. If the writer's
approach changes to preserve precision-dependent time units, the reader would need to be updated to
match. Will need to confirm with @x-tong on PR #1930 that the writer does indeed normalize to microseconds.
```java
long millis = micros / 1000;
// micros % 1000 yields the sub-millisecond remainder in microseconds; * 1000 converts to nanos.
// For negative micros (pre-epoch), Java's truncation-toward-zero produces a negative
// remainder, which is consistent with the writer's inverse conversion.
int nanoOfMillisecond = ((int) (micros % 1000)) * 1000;
```
For pre-epoch timestamps (negative micros values), Java's truncation-toward-zero in the conversion nanoOfMillisecond = ((int)(micros % 1000)) * 1000 produces a negative nanoOfMillisecond. For example, with micros = -1500: millis = -1, nanoOfMillisecond = -500000.
TimestampData.fromEpochMillis documents that nanoOfMillisecond must be in the range [0, 999999]. Passing a negative value creates a TimestampData that violates this contract and may produce incorrect behavior in downstream Flink operators that rely on this invariant.
The correct approach for pre-epoch timestamps is to use floor division and adjust the remainder to be non-negative. For micros = -1500: millis = Math.floorDiv(-1500, 1000) = -2, nanoOfMillisecond = (int)(Math.floorMod(-1500, 1000)) * 1000 = 500000.
```diff
-long millis = micros / 1000;
-// micros % 1000 yields the sub-millisecond remainder in microseconds; * 1000 converts to nanos.
-// For negative micros (pre-epoch), Java's truncation-toward-zero produces a negative
-// remainder, which is consistent with the writer's inverse conversion.
-int nanoOfMillisecond = ((int) (micros % 1000)) * 1000;
+// Use floor-based division so that for negative micros (pre-epoch), the remainder is
+// non-negative and nanoOfMillisecond stays within [0, 999_999], as required by
+// TimestampData.fromEpochMillis.
+long millis = Math.floorDiv(micros, 1000);
+int nanoOfMillisecond = (int) Math.floorMod(micros, 1000) * 1000;
```
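The floor-division arithmetic in this suggestion can be checked in isolation. A standalone demonstration (the class and method names are illustrative, not from the PR):

```java
public class FloorSplitDemo {
    // Split epoch micros into (epoch millis, nanoOfMillisecond) using floor
    // semantics so the nano component stays in [0, 999_999] even for
    // pre-epoch (negative) values.
    static long millisOf(long micros) {
        return Math.floorDiv(micros, 1000L);
    }

    static int nanoOfMillisecond(long micros) {
        return (int) Math.floorMod(micros, 1000L) * 1000;
    }

    public static void main(String[] args) {
        // Pre-epoch example from the review: micros = -1500
        System.out.println(millisOf(-1500L));          // -2
        System.out.println(nanoOfMillisecond(-1500L)); // 500000
        // Post-epoch sanity check: micros = 1500
        System.out.println(millisOf(1500L));           // 1
        System.out.println(nanoOfMillisecond(1500L));  // 500000
    }
}
```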
Thank you! Updated the PR. Changed to Math.floorDiv / Math.floorMod so
nanoOfMillisecond stays within [0, 999_999] for pre-epoch timestamps. Added a test for micros = -1500
verifying millis = -2, nanoOfMillisecond = 500_000.
```java
/**
 * Replaces the underlying Arrow vector. Used during reader reset to point at a new batch
 * without allocating a new wrapper.
 *
 * @param vector the new Arrow vector, must not be null
 */
void setVector(BitVector vector) {
    this.vector = Preconditions.checkNotNull(vector);
}
```
The setVector() method in ArrowBooleanColumnVector (and all other ArrowXxxColumnVector wrapper classes) is dead code. The FlinkArrowReader.reset() method does not call setVector() on the existing wrappers — instead it creates entirely new wrapper instances via createColumnVector(). Since these package-private methods are not called from anywhere and have no external callers, they add maintenance overhead without serving any purpose. They should either be removed, or FlinkArrowReader.reset() should be updated to use them (which would avoid the extra wrapper allocations per batch reset).
```diff
-/**
- * Replaces the underlying Arrow vector. Used during reader reset to point at a new batch
- * without allocating a new wrapper.
- *
- * @param vector the new Arrow vector, must not be null
- */
-void setVector(BitVector vector) {
-    this.vector = Preconditions.checkNotNull(vector);
-}
```
Correct observation — FlinkArrowReader.reset() currently recreates wrappers rather than calling
setVector(). The methods were added intentionally for a future optimization: in a streaming pipeline
processing many batches with the same schema, reusing wrapper instances via setVector() avoids per-batch allocations. This is a minor cost (one method per wrapper class) that keeps the optimization path
available. Happy to remove if reviewers prefer the leaner approach.
```java
@Test
public void testTimeVector() {
    try (BufferAllocator allocator =
            FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testTime", 0, Long.MAX_VALUE)) {
        TimeMicroVector vec = new TimeMicroVector("col", allocator);
        vec.allocateNew(3);
        vec.setSafe(0, 45_296_000_000L); // 45296000000 micros -> 45296000 millis
        vec.setNull(1);
        vec.setSafe(2, 0L);
        vec.setValueCount(3);

        VectorSchemaRoot root = new VectorSchemaRoot(Collections.singletonList(vec));
        RowType rowType = RowType.of(new TimeType(6));
        FlinkArrowReader reader = FlinkArrowReader.create(root, rowType);

        // ArrowTimeColumnVector divides micros by 1000 to get millis
        assertEquals(45_296_000, reader.read(0).getInt(0));
        assertTrue(reader.read(1).isNullAt(0));
        assertEquals(0, reader.read(2).getInt(0));

        reader.close();
        root.close();
    }
}
```
The test only covers TIME(6) and TIMESTAMP(6), which are the only precisions that happen to work correctly with the current reader implementation. Tests for TIME(0), TIME(3), and TIME(9) (and their TIMESTAMP equivalents) are missing. Adding such tests would immediately expose the critical ClassCastException / incorrect unit conversion bug in FlinkArrowReader.createColumnVector for TIME/TIMESTAMP of non-microsecond precision.
The tests cover microsecond precision because that's what the writer (PR #1930) produces. The reader is
the inverse of the writer, which normalizes all TIME/TIMESTAMP to microseconds. Testing other precisions
would require vectors that the writer never generates. If the writer's normalization approach changes,
both the reader and tests would be updated together.
The writer (PR #1930) normalizes all timestamps to **microsecond** precision in Arrow. The reader must reverse this:

```
Writer path: TimestampData → microseconds (long) stored in TimeStampMicroVector
Reader path: TimeStampMicroVector → microseconds (long) → TimestampData

Conversion:
long micros = vector.get(i);
long millis = micros / 1000;
int nanoOfMillisecond = (int) (micros % 1000) * 1000;
return TimestampData.fromEpochMillis(millis, nanoOfMillisecond);
```

Similarly for Time:

```
Writer path: int millis → micros (long) stored in TimeMicroVector
Reader path: TimeMicroVector → micros (long) → millis (int)

Conversion:
return (int) (vector.get(i) / 1000);
```

This matches the writer's conversions in `FlinkArrowFieldWriter.TimestampWriter` and `FlinkArrowFieldWriter.TimeWriter`.
The design document claims "The writer (PR #1930) normalizes all timestamps to microsecond precision in Arrow" and makes the same claim for TIME types. However, FlinkArrowUtils.toArrowType() (which is the existing type-mapping utility used by the writer) maps TIME and TIMESTAMP to different Arrow time units based on the declared Flink precision (SECOND for precision 0, MILLISECOND for 1-3, MICROSECOND for 4-6, NANOSECOND for 7+). The design document's claim is inaccurate and should be corrected to reflect the actual precision-dependent mapping.
FlinkArrowUtils.toArrowType() defines the Arrow schema type metadata, but the writer's FlinkArrowFieldWriter controls the actual data normalization. These are separate concerns — the schema
metadata says "this is a Timestamp(MICROSECOND)" and the writer ensures the values are indeed in
microseconds. The design doc's claim refers to the writer's data normalization, not toArrowType(). Will
confirm with @x-tong on PR #1930.
@weiqingy Given the large amount of code, I'll still need some time.
Use Math.floorDiv and Math.floorMod instead of / and % to ensure nanoOfMillisecond stays within [0, 999_999] for negative micros (pre-epoch timestamps). Add a test for negative timestamp values.
Which issue does this PR close?
Closes #1851
Rationale for this change
Per AIP-1, the Flink integration data path requires converting Arrow vectors returned by the native engine (DataFusion/Rust) back into Flink RowData so downstream Flink operators can process results.
What changes are included in this PR?
Are there any user-facing changes?
No. Internal API for Flink integration.
How was this patch tested?
21 tests: `./build/mvn test -pl auron-flink-extension/auron-flink-runtime -am -Pscala-2.12 -Pflink-1.18 -Pspark-3.5 -DskipBuildNative -Dtest=FlinkArrowReaderTest`
Result: 21 pass, 0 failures.