From 18ad5af8dbd4d3bd43ab162a9cc0301dedf8ba7f Mon Sep 17 00:00:00 2001 From: Weiqing Yang Date: Tue, 24 Feb 2026 21:25:29 -0800 Subject: [PATCH 1/8] Add AURON-1851-DESIGN.md --- AURON-1851-DESIGN.md | 461 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 461 insertions(+) create mode 100644 AURON-1851-DESIGN.md diff --git a/AURON-1851-DESIGN.md b/AURON-1851-DESIGN.md new file mode 100644 index 000000000..a282dabe2 --- /dev/null +++ b/AURON-1851-DESIGN.md @@ -0,0 +1,461 @@ +# Design: Arrow to Flink RowData Conversion (AURON #1851) + +**Issue**: https://github.com/apache/auron/issues/1851 +**AIP**: AIP-1 — Introduce Flink integration of native engine +**Author**: @weiqingy +**Status**: Draft + +## 1. Motivation + +Per AIP-1, the Flink integration data path is: + +``` +Flink RowData → Arrow (Writer) → C Data Interface / JNI → Rust/DataFusion → Arrow → Flink RowData (Reader) + ^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + #1850 (x-tong) #1851 (this issue) +``` + +The writer side (#1850, PR #1930) converts Flink `RowData` into Arrow `VectorSchemaRoot` for export to the native engine. This issue implements the reverse: converting Arrow vectors returned by the native engine back into Flink `RowData` so downstream Flink operators can process the results. + +Without this component, the native execution results cannot flow back into the Flink pipeline. + +## 2. Design Approach + +### Two Candidate Approaches + +**Approach A — Columnar ColumnVector wrappers (recommended)** + +Follow Flink's own pattern from `flink-python`. Create thin `ColumnVector` wrappers around Arrow `FieldVector` types, then compose them into an `ArrowReader` that returns `ColumnarRowData`. This is how Flink itself reads Arrow data internally. 
+ +- Each wrapper implements a Flink `ColumnVector` sub-interface (e.g., `IntColumnVector`, `BytesColumnVector`) +- The wrapper delegates reads directly to the underlying Arrow vector — zero data copying +- `ColumnarRowData` provides a row view over the batch without materializing individual rows + +**Approach B — Row-at-a-time FlinkArrowFieldReader** + +Mirror the writer pattern (PR #1930). Create `FlinkArrowFieldReader` subclasses that read field-by-field into `GenericRowData`, iterating row-by-row over the batch. + +- Conceptually simpler, symmetric with the writer +- Copies data element-by-element — O(rows × columns) allocations per batch + +### Decision: Approach A + +Approach A is chosen for these reasons: + +1. **Performance**: Zero-copy columnar access. `ColumnarRowData` is just a view — no per-row object allocation. This aligns with AIP-1's goal of native vectorized execution. +2. **Flink precedent**: This is exactly how Flink's own `flink-python` module reads Arrow data. The pattern is proven and maintained upstream. +3. **Compatibility**: `ColumnarRowData` implements `RowData`, so it is transparent to all downstream Flink operators. +4. **Memory efficiency**: Arrow vectors remain the single source of truth. No duplicate data structures. + +The tradeoff is more classes (one wrapper per vector type), but each is ~20 lines of delegation code. + +## 3. Detailed Design + +### 3.1 Package Structure + +All classes in `org.apache.auron.flink.arrow` (same package as `FlinkArrowUtils`, `FlinkArrowWriter`). 
+ +``` +auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/ +├── FlinkArrowUtils.java (existing) +├── FlinkArrowWriter.java (PR #1930) +├── FlinkArrowFieldWriter.java (PR #1930) +├── FlinkArrowReader.java (NEW — orchestrator) +└── vectors/ (NEW — ColumnVector wrappers) + ├── ArrowBooleanColumnVector.java + ├── ArrowTinyIntColumnVector.java + ├── ArrowSmallIntColumnVector.java + ├── ArrowIntColumnVector.java + ├── ArrowBigIntColumnVector.java + ├── ArrowFloatColumnVector.java + ├── ArrowDoubleColumnVector.java + ├── ArrowDecimalColumnVector.java + ├── ArrowVarCharColumnVector.java + ├── ArrowVarBinaryColumnVector.java + ├── ArrowDateColumnVector.java + ├── ArrowTimeColumnVector.java + ├── ArrowTimestampColumnVector.java + ├── ArrowArrayColumnVector.java + ├── ArrowMapColumnVector.java + └── ArrowRowColumnVector.java +``` + +### 3.2 ColumnVector Wrappers + +Each wrapper implements the corresponding Flink `ColumnVector` sub-interface and delegates to the Arrow `FieldVector`. Example pattern: + +```java +/** + * Arrow-backed column vector for INT columns. + * Wraps an Arrow IntVector and implements Flink's IntColumnVector. + */ +public final class ArrowIntColumnVector implements IntColumnVector { + + private final IntVector vector; + + public ArrowIntColumnVector(IntVector vector) { + this.vector = Preconditions.checkNotNull(vector); + } + + @Override + public int getInt(int i) { + return vector.get(i); + } + + @Override + public boolean isNullAt(int i) { + return vector.isNull(i); + } +} +``` + +All wrappers follow this same pattern. 
Notable type-specific handling: + +| Flink ColumnVector Interface | Arrow Vector | Notes | +|---|---|---| +| `BooleanColumnVector` | `BitVector` | | +| `ByteColumnVector` | `TinyIntVector` | | +| `ShortColumnVector` | `SmallIntVector` | | +| `IntColumnVector` | `IntVector` | Also used for `DateDayVector` (date as epoch days) | +| `LongColumnVector` | `BigIntVector` | | +| `FloatColumnVector` | `Float4Vector` | | +| `DoubleColumnVector` | `Float8Vector` | | +| `DecimalColumnVector` | `DecimalVector` | `fromUnscaledLong` for precision ≤ 18, `fromBigDecimal` otherwise | +| `BytesColumnVector` | `VarCharVector` | `new Bytes(vector.get(i))` — matches Flink upstream pattern | +| `BytesColumnVector` | `VarBinaryVector` | `new Bytes(vector.get(i))` — matches Flink upstream pattern | +| `IntColumnVector` | `DateDayVector` | Date stored as int (epoch days), same as Flink internal | +| `IntColumnVector` | `TimeMicroVector` | Convert microseconds back to milliseconds (int) to match Flink's internal representation | +| `TimestampColumnVector` | `TimeStampMicroVector` | Convert micros → `TimestampData.fromEpochMillis(millis, nanoOfMillisecond)` | +| `TimestampColumnVector` | `TimeStampMicroTZVector` | Same conversion, for `TIMESTAMP WITH LOCAL TIME ZONE` | +| `ArrayColumnVector` | `ListVector` | Recursively wraps element vector | +| `MapColumnVector` | `MapVector` | Recursively wraps key/value vectors | +| `RowColumnVector` | `StructVector` | Recursively wraps child field vectors | + +### 3.3 Timestamp Precision Handling + +The writer (PR #1930) normalizes all timestamps to **microsecond** precision in Arrow. 
The reader must reverse this: + +``` +Writer path: TimestampData → microseconds (long) stored in TimeStampMicroVector +Reader path: TimeStampMicroVector → microseconds (long) → TimestampData + +Conversion: + long micros = vector.get(i); + long millis = micros / 1000; + int nanoOfMillisecond = (int) (micros % 1000) * 1000; + return TimestampData.fromEpochMillis(millis, nanoOfMillisecond); +``` + +Similarly for Time: +``` +Writer path: int millis → micros (long) stored in TimeMicroVector +Reader path: TimeMicroVector → micros (long) → millis (int) + +Conversion: + return (int) (vector.get(i) / 1000); +``` + +This matches the writer's conversions in `FlinkArrowFieldWriter.TimestampWriter` and `FlinkArrowFieldWriter.TimeWriter`. + +### 3.4 FlinkArrowReader (Orchestrator) + +```java +/** + * Reads Arrow VectorSchemaRoot data as Flink RowData. + * + *

Uses Flink's columnar data structures (ColumnarRowData backed by + * VectorizedColumnBatch) for zero-copy access to Arrow vectors. + */ +public class FlinkArrowReader { + + private final ColumnVector[] columnVectors; + private final ColumnarRowData reusableRow; + + private FlinkArrowReader(ColumnVector[] columnVectors) { + this.columnVectors = columnVectors; + VectorizedColumnBatch batch = new VectorizedColumnBatch(columnVectors); + this.reusableRow = new ColumnarRowData(batch); + } + + /** + * Creates a FlinkArrowReader from a VectorSchemaRoot and RowType. + */ + public static FlinkArrowReader create(VectorSchemaRoot root, RowType rowType) { + List fieldVectors = root.getFieldVectors(); + List fields = rowType.getFields(); + // validate sizes match + ColumnVector[] columns = new ColumnVector[fieldVectors.size()]; + for (int i = 0; i < fieldVectors.size(); i++) { + columns[i] = createColumnVector(fieldVectors.get(i), fields.get(i).getType()); + } + return new FlinkArrowReader(columns); + } + + /** + * Reads a row at the given position. Returns a reused RowData object + * (callers must not hold references across calls). + */ + public RowData read(int rowId) { + reusableRow.setRowId(rowId); + return reusableRow; + } + + /** + * Returns the number of rows in the current batch. + */ + public int getRowCount() { + // Delegate to the first vector's value count, or 0 if empty + } + + // Factory dispatching per LogicalType → ColumnVector wrapper + private static ColumnVector createColumnVector(FieldVector vector, LogicalType type) { + // instanceof dispatch matching FlinkArrowUtils.toArrowType + } +} +``` + +**Object reuse**: `read(int)` returns the same `ColumnarRowData` instance with a different `rowId`. This is standard Flink practice for columnar readers — callers must consume or copy before the next call. This matches Flink's own `ArrowReader` design. 
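
The §3.3 conversions that this reader performs are plain integer arithmetic. The following self-contained sketch restates them; the class and method names are illustrative only (the real logic lives inside the timestamp/time `ColumnVector` wrappers and `FlinkArrowFieldWriter`), and the plain `/` and `%` mirror the formulas in §3.3, which assume non-negative post-epoch values:

```java
/**
 * Illustrative sketch of the microsecond-precision conversions in section 3.3.
 * Names are hypothetical; the real code lives in the timestamp/time
 * ColumnVector wrappers and FlinkArrowFieldWriter.
 */
public class MicrosConversionSketch {

    /** Writer side: (epochMillis, nanoOfMillisecond) -> microseconds. */
    static long timestampToMicros(long millis, int nanoOfMillisecond) {
        return millis * 1000 + nanoOfMillisecond / 1000;
    }

    /** Reader side: microseconds -> epoch millis (assumes micros >= 0). */
    static long microsToMillis(long micros) {
        return micros / 1000;
    }

    /** Reader side: microseconds -> nano-of-millisecond component. */
    static int microsToNanoOfMillisecond(long micros) {
        return (int) (micros % 1000) * 1000;
    }

    /** Reader side for TIME(p): TimeMicroVector micros -> int millis. */
    static int timeMicrosToMillis(long micros) {
        return (int) (micros / 1000);
    }

    public static void main(String[] args) {
        long micros = 1_234_567L; // 1234 ms + 567 us past epoch
        long millis = microsToMillis(micros);
        int nanoOfMilli = microsToNanoOfMillisecond(micros);
        // Round trip through the writer-side formula is lossless
        // at microsecond precision.
        if (timestampToMicros(millis, nanoOfMilli) != micros) {
            throw new AssertionError("round trip lost precision");
        }
        System.out.println(millis + " ms, " + nanoOfMilli + " ns-of-ms");
    }
}
```

The reader then passes the two extracted components to `TimestampData.fromEpochMillis(millis, nanoOfMillisecond)`, reversing the writer's `TimestampWriter` step exactly.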
+ +### 3.5 Type Coverage + +Must match the types supported by `FlinkArrowUtils.toArrowType()` (merged in #1959) and `FlinkArrowFieldWriter` (PR #1930): + +| Flink LogicalType | Arrow Vector Type | Reader Wrapper | +|---|---|---| +| `BOOLEAN` | `BitVector` | `ArrowBooleanColumnVector` | +| `TINYINT` | `TinyIntVector` | `ArrowTinyIntColumnVector` | +| `SMALLINT` | `SmallIntVector` | `ArrowSmallIntColumnVector` | +| `INTEGER` | `IntVector` | `ArrowIntColumnVector` | +| `BIGINT` | `BigIntVector` | `ArrowBigIntColumnVector` | +| `FLOAT` | `Float4Vector` | `ArrowFloatColumnVector` | +| `DOUBLE` | `Float8Vector` | `ArrowDoubleColumnVector` | +| `DECIMAL(p, s)` | `DecimalVector` | `ArrowDecimalColumnVector` | +| `VARCHAR` / `CHAR` | `VarCharVector` | `ArrowVarCharColumnVector` | +| `VARBINARY` / `BINARY` | `VarBinaryVector` | `ArrowVarBinaryColumnVector` | +| `DATE` | `DateDayVector` | `ArrowDateColumnVector` | +| `TIME(p)` | `TimeMicroVector` | `ArrowTimeColumnVector` | +| `TIMESTAMP(p)` | `TimeStampMicroVector` | `ArrowTimestampColumnVector` | +| `TIMESTAMP_LTZ(p)` | `TimeStampMicroTZVector` | `ArrowTimestampColumnVector` (shared) | +| `ARRAY` | `ListVector` | `ArrowArrayColumnVector` | +| `MAP` | `MapVector` | `ArrowMapColumnVector` | +| `ROW` | `StructVector` | `ArrowRowColumnVector` | + +### 3.6 Relationship to FlinkArrowFFIExporter + +`FlinkArrowFFIExporter` (PR #1930) handles the **export** path (Flink → native). The import path (native → Flink) will eventually need a `FlinkArrowFFIImporter` that: + +1. Receives Arrow arrays from native via C Data Interface +2. Imports them into a `VectorSchemaRoot` +3. Wraps with `FlinkArrowReader` to produce `RowData` + +That importer is out of scope for this issue. This issue delivers the `FlinkArrowReader` layer only. The FFI import will be a separate follow-up that depends on both the reader (#1851) and the planner rule. + +## 4. 
Dependencies + +### Build Dependencies (already available in auron-flink-runtime) + +These were added by PR #1930: +- `arrow-vector` — Arrow vector types +- `arrow-memory-unsafe` — Arrow memory allocator +- `flink-table-common` — Flink ColumnVector interfaces, ColumnarRowData, VectorizedColumnBatch + +No additional Maven dependencies needed. + +### Code Dependencies + +- `FlinkArrowUtils.toArrowSchema()` / `toArrowType()` (merged in #1959) — for consistent type mapping +- Flink's `ColumnarRowData`, `VectorizedColumnBatch` from `flink-table-common` +- Flink's `ColumnVector` sub-interfaces (`IntColumnVector`, `DecimalColumnVector`, etc.) + +### Blocked By + +- None — can proceed independently of #1850 (writer PR). The reader reads Arrow vectors regardless of how they were produced. + +### Blocks + +- FFI importer (future) — needs reader to materialize RowData from imported Arrow batches +- Planner rule for `StreamExecCalc` — needs both writer + reader + FFI to complete the round-trip + +## 5. 
Test Plan + +### 5.1 Unit Tests per ColumnVector Wrapper + +For each `ArrowXxxColumnVector`: +- Write known values into an Arrow vector manually +- Read back via the wrapper, assert values match +- Test null handling (set some positions null, verify `isNullAt()`) +- Test edge cases (empty vectors, max/min values, zero-length strings) + +### 5.2 FlinkArrowReader Integration Tests + +- Construct a `VectorSchemaRoot` with multiple column types +- Populate via Arrow API directly +- Create `FlinkArrowReader`, iterate all rows, verify values +- Test schema with all supported types in a single batch +- Test empty batch (0 rows) + +### 5.3 Round-Trip Tests (Write → Read) + +Once #1850 (writer) is merged: +- Create `RowData` instances with known values +- Write via `FlinkArrowWriter` into `VectorSchemaRoot` +- Read back via `FlinkArrowReader` +- Assert input and output RowData are equivalent +- Cover all supported types including nested (ARRAY, MAP, ROW) + +These round-trip tests validate that the writer and reader are perfectly symmetric, which is the critical correctness property for the end-to-end pipeline. + +### 5.4 Test File Location + +`auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/` +- `FlinkArrowReaderTest.java` — unit + integration tests +- `FlinkArrowRoundTripTest.java` — round-trip tests (added after #1850 merges) + +### Build & Run + +```bash +# Build +./auron-build.sh --pre --sparkver 3.5 --scalaver 2.12 -Pflink,flink-1.18 -DskipBuildNative + +# Run reader tests +./build/mvn test -pl auron-flink-extension/auron-flink-runtime -am \ + -Pscala-2.12 -Pflink-1.18 -Pspark-3.5 -Prelease \ + -Dtest=FlinkArrowReaderTest +``` + +## 6. 
Implementation Plan + +The work is split into two PRs for reviewability: + +### PR 1: ColumnVector wrappers + FlinkArrowReader + unit tests + +Files: +- 16 `ArrowXxxColumnVector` classes in `vectors/` subpackage +- `FlinkArrowReader.java` +- `FlinkArrowReaderTest.java` + +### PR 2 (after #1850 merges): Round-trip tests + +Files: +- `FlinkArrowRoundTripTest.java` + +This split allows PR 1 to proceed immediately without waiting for the writer. + +## 7. Review Feedback & Responses + +### 7.1 String/VarChar Allocation (raised as critical) + +**Claim**: `VarCharVector.get(i)` allocates a new `byte[]` per row, defeating zero-copy. + +**Assessment**: Partially valid, but matches Flink's own approach. Flink's upstream `ArrowVarCharColumnVector` in `flink-python` does exactly `new Bytes(varCharVector.get(i), 0, bytes.length)`. The `BytesColumnVector` interface returns a `Bytes` object (which holds `byte[] data`, `int offset`, `int len`), so some allocation is inherent to the interface contract. + +True zero-copy would require bypassing `BytesColumnVector` entirely and accessing Arrow's `dataBuffer`/`offsetBuffer` via `MemorySegment` — but this would break the `ColumnarRowData` abstraction and is not how Flink's internal readers work. + +**Decision**: Match Flink's upstream implementation. The per-row `byte[]` allocation is bounded by the batch size (typically ~10K rows), and string processing is already dominated by downstream serialization costs. This is a valid optimization target for a future PR if profiling shows it as a bottleneck, but not a correctness issue and not a blocker for the initial implementation. + +### 7.2 Decimal Allocation Optimization (raised as critical) + +**Claim**: `DecimalVector.getObject(i)` allocates a `BigDecimal` per row; should use `fromUnscaledLong` for precision ≤ 18. + +**Assessment**: Valid optimization. Flink's own implementation also uses `fromBigDecimal(vector.getObject(i))`, so matching upstream is correct. 
However, Flink's `DecimalData` uses a compact `long` representation for precision ≤ 18. We can avoid the `BigDecimal` intermediate by reading the raw 128-bit value from Arrow and extracting the `long` directly. + +**Decision**: Implement with the optimization: +```java +@Override +public DecimalData getDecimal(int i, int precision, int scale) { + if (precision <= DecimalData.MAX_COMPACT_PRECISION) { + // Read raw bytes from Arrow's 128-bit storage, extract as long + byte[] bytes = decimalVector.get(i); + long unscaledLong = extractUnscaledLong(bytes); + return DecimalData.fromUnscaledLong(unscaledLong, precision, scale); + } + return DecimalData.fromBigDecimal(decimalVector.getObject(i), precision, scale); +} +``` +This avoids `BigDecimal` allocation for the common case (precision ≤ 18) while falling back to the safe path for large decimals. + +### 7.3 Dictionary Encoding Support (raised as architectural risk) + +**Claim**: DataFusion may return `DictionaryVector` for string columns, causing `ClassCastException`. + +**Assessment**: Not applicable for Phase 1. The Calc operator evaluates scalar expressions (projections, filters) — DataFusion processes the Arrow batch and returns vectors of the same types that were input. Dictionary encoding is an optimization used in scan/shuffle operations, not in expression evaluation output. The writer sends `VarCharVector`; the native engine will return `VarCharVector`. + +**Decision**: Out of scope for this PR. If a future operator (e.g., scan pushdown) introduces dictionary-encoded vectors, a `DictionaryVector` wrapper can be added then. The `createColumnVector` factory is extensible — adding a new branch for `DictionaryVector` is straightforward. + +### 7.4 Memory Management / AutoCloseable (raised as architectural risk) + +**Claim**: Arrow off-heap memory must be freed; `FlinkArrowReader` should implement `AutoCloseable`. + +**Assessment**: Valid. 
The reader is a view and does not own the `VectorSchemaRoot`, but implementing `AutoCloseable` makes ownership semantics explicit and enables use in try-with-resources. The `close()` method should document that it does NOT close the underlying root — the caller (FFI importer) manages that lifecycle. + +**Decision**: Accepted. `FlinkArrowReader` will implement `AutoCloseable` with a no-op `close()` and clear documentation: +```java +/** + * Implements AutoCloseable for use in resource management blocks. + * Note: this does NOT close the underlying VectorSchemaRoot. + * The caller that created the root is responsible for its lifecycle. + */ +@Override +public void close() { + // Reader is a view; root lifecycle managed by caller +} +``` + +### 7.5 Timestamp Timezone Handling (raised as minor) + +**Claim**: Should respect Flink's session timezone when converting `TIMESTAMP_LTZ`. + +**Assessment**: Not applicable at this layer. The writer normalizes all timestamps to UTC microseconds. The reader reverses the conversion — `micros → TimestampData.fromEpochMillis(millis, nanoOfMillisecond)`. `TimestampData` is timezone-agnostic (it stores epoch-based values). Session timezone handling is the responsibility of Flink's SQL layer when presenting results to users, not the Arrow serialization layer. + +**Decision**: No change needed. + +### 7.6 Nested Type Nullability (raised as minor) + +**Claim**: Must check `isNullAt` on parent before reading children in nested types. + +**Assessment**: Already handled by the `ColumnarRowData` contract. When the caller checks `isNullAt(i)` on the row (which delegates to the parent ColumnVector), it returns true, and the caller should not call `getArray`/`getMap`/`getRow`. This is the standard Flink contract for `RowData`. However, the wrapper implementations should be defensive — if the parent is null, the Arrow child vectors may contain garbage at those positions. + +**Decision**: Document the contract. 
The wrappers for `Array`, `Map`, and `Row` will follow Flink's upstream pattern where `isNullAt` delegates to the parent vector. Callers are expected to check nullability before accessing nested values per the `RowData` contract. + +### 7.7 Reader Reusability Across Batches (raised as minor) + +**Claim**: Allow swapping `VectorSchemaRoot` without reinstantiating wrappers. + +**Assessment**: Valid and useful. In a streaming pipeline, the same operator processes many batches with the same schema. Reinstantiating 16+ wrappers per batch is wasteful. + +**Decision**: Accepted. Add a `reset(VectorSchemaRoot newRoot)` method that swaps the underlying vectors in each wrapper without reallocating: +```java +/** + * Resets the reader to use a new VectorSchemaRoot with the same schema. + * Avoids reinstantiating column vector wrappers for each batch. + */ +public void reset(VectorSchemaRoot newRoot) { + List newVectors = newRoot.getFieldVectors(); + for (int i = 0; i < columnVectors.length; i++) { + ((ResettableColumnVector) columnVectors[i]).reset(newVectors.get(i)); + } +} +``` +Each wrapper will implement a package-private `ResettableColumnVector` interface with a `reset(FieldVector)` method. This is a minor addition to each wrapper class. + +## 8. Alternatives Considered + +### 8.1 Approach B: Row-at-a-time with FlinkArrowFieldReader + +Mirror the writer's `FlinkArrowFieldWriter` pattern with reader counterparts that extract field values into `GenericRowData`. + +**Rejected because**: +- Allocates one `GenericRowData` per row (or requires manual object reuse) +- Copies every value from Arrow vector into Java objects +- Flink's own Arrow integration uses the columnar approach for good reason +- Does not leverage the vectorized execution model that AIP-1 is built on + +### 8.2 Using Flink's built-in Arrow readers directly + +Flink's `flink-python` module contains Arrow column vector wrappers. We could reuse them. 
+ +**Rejected because**: +- They live in `flink-python` which is a large dependency with Python-specific code +- They are annotated `@Internal` — not part of Flink's public API +- The wrappers are simple enough that reimplementing them avoids the coupling +- Auron's timestamp precision handling (always microseconds) may differ from Flink's general-purpose implementation From c67a8d875b33807292d64901a9e467d4e08b6fe2 Mon Sep 17 00:00:00 2001 From: Weiqing Yang Date: Mon, 2 Mar 2026 19:04:13 -0800 Subject: [PATCH 2/8] [AURON #1851] Add primitive Arrow ColumnVector wrappers Add 7 ColumnVector wrapper classes for primitive types (Boolean, TinyInt, SmallInt, Int, BigInt, Float, Double) that implement Flink's ColumnVector sub-interfaces backed by Arrow FieldVectors for zero-copy columnar access. --- .../vectors/ArrowBigIntColumnVector.java | 63 +++++++++++++++++++ .../vectors/ArrowBooleanColumnVector.java | 63 +++++++++++++++++++ .../vectors/ArrowDoubleColumnVector.java | 63 +++++++++++++++++++ .../arrow/vectors/ArrowFloatColumnVector.java | 63 +++++++++++++++++++ .../arrow/vectors/ArrowIntColumnVector.java | 63 +++++++++++++++++++ .../vectors/ArrowSmallIntColumnVector.java | 63 +++++++++++++++++++ .../vectors/ArrowTinyIntColumnVector.java | 63 +++++++++++++++++++ docs/reviewhelper/01-primitive-wrappers.md | 29 +++++++++ 8 files changed, 470 insertions(+) create mode 100644 auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowBigIntColumnVector.java create mode 100644 auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowBooleanColumnVector.java create mode 100644 auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowDoubleColumnVector.java create mode 100644 auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowFloatColumnVector.java create mode 100644 
auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowIntColumnVector.java create mode 100644 auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowSmallIntColumnVector.java create mode 100644 auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowTinyIntColumnVector.java create mode 100644 docs/reviewhelper/01-primitive-wrappers.md diff --git a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowBigIntColumnVector.java b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowBigIntColumnVector.java new file mode 100644 index 000000000..10e157cc9 --- /dev/null +++ b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowBigIntColumnVector.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.auron.flink.arrow.vectors; + +import org.apache.arrow.vector.BigIntVector; +import org.apache.flink.table.data.columnar.vector.LongColumnVector; +import org.apache.flink.util.Preconditions; + +/** + * A Flink {@link LongColumnVector} backed by an Arrow {@link BigIntVector}. + * + *

This wrapper delegates all reads to the underlying Arrow vector, providing zero-copy access + * to Arrow data from Flink's columnar batch execution engine. + */ +public final class ArrowBigIntColumnVector implements LongColumnVector { + + private BigIntVector vector; + + /** + * Creates a new wrapper around the given Arrow {@link BigIntVector}. + * + * @param vector the Arrow vector to wrap, must not be null + */ + public ArrowBigIntColumnVector(BigIntVector vector) { + this.vector = Preconditions.checkNotNull(vector); + } + + /** {@inheritDoc} */ + @Override + public boolean isNullAt(int i) { + return vector.isNull(i); + } + + /** {@inheritDoc} */ + @Override + public long getLong(int i) { + return vector.get(i); + } + + /** + * Replaces the underlying Arrow vector. Used during reader reset to point at a new batch + * without allocating a new wrapper. + * + * @param vector the new Arrow vector, must not be null + */ + void setVector(BigIntVector vector) { + this.vector = Preconditions.checkNotNull(vector); + } +} diff --git a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowBooleanColumnVector.java b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowBooleanColumnVector.java new file mode 100644 index 000000000..b4e32056a --- /dev/null +++ b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowBooleanColumnVector.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.auron.flink.arrow.vectors; + +import org.apache.arrow.vector.BitVector; +import org.apache.flink.table.data.columnar.vector.BooleanColumnVector; +import org.apache.flink.util.Preconditions; + +/** + * A Flink {@link BooleanColumnVector} backed by an Arrow {@link BitVector}. + * + *

This wrapper delegates all reads to the underlying Arrow vector, providing zero-copy access + * to Arrow data from Flink's columnar batch execution engine. + */ +public final class ArrowBooleanColumnVector implements BooleanColumnVector { + + private BitVector vector; + + /** + * Creates a new wrapper around the given Arrow {@link BitVector}. + * + * @param vector the Arrow vector to wrap, must not be null + */ + public ArrowBooleanColumnVector(BitVector vector) { + this.vector = Preconditions.checkNotNull(vector); + } + + /** {@inheritDoc} */ + @Override + public boolean isNullAt(int i) { + return vector.isNull(i); + } + + /** {@inheritDoc} */ + @Override + public boolean getBoolean(int i) { + return vector.get(i) != 0; + } + + /** + * Replaces the underlying Arrow vector. Used during reader reset to point at a new batch + * without allocating a new wrapper. + * + * @param vector the new Arrow vector, must not be null + */ + void setVector(BitVector vector) { + this.vector = Preconditions.checkNotNull(vector); + } +} diff --git a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowDoubleColumnVector.java b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowDoubleColumnVector.java new file mode 100644 index 000000000..1590fd9aa --- /dev/null +++ b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowDoubleColumnVector.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.auron.flink.arrow.vectors; + +import org.apache.arrow.vector.Float8Vector; +import org.apache.flink.table.data.columnar.vector.DoubleColumnVector; +import org.apache.flink.util.Preconditions; + +/** + * A Flink {@link DoubleColumnVector} backed by an Arrow {@link Float8Vector}. + * + *

This wrapper delegates all reads to the underlying Arrow vector, providing zero-copy access + * to Arrow data from Flink's columnar batch execution engine. + */ +public final class ArrowDoubleColumnVector implements DoubleColumnVector { + + private Float8Vector vector; + + /** + * Creates a new wrapper around the given Arrow {@link Float8Vector}. + * + * @param vector the Arrow vector to wrap, must not be null + */ + public ArrowDoubleColumnVector(Float8Vector vector) { + this.vector = Preconditions.checkNotNull(vector); + } + + /** {@inheritDoc} */ + @Override + public boolean isNullAt(int i) { + return vector.isNull(i); + } + + /** {@inheritDoc} */ + @Override + public double getDouble(int i) { + return vector.get(i); + } + + /** + * Replaces the underlying Arrow vector. Used during reader reset to point at a new batch + * without allocating a new wrapper. + * + * @param vector the new Arrow vector, must not be null + */ + void setVector(Float8Vector vector) { + this.vector = Preconditions.checkNotNull(vector); + } +} diff --git a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowFloatColumnVector.java b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowFloatColumnVector.java new file mode 100644 index 000000000..bc198144b --- /dev/null +++ b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowFloatColumnVector.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.auron.flink.arrow.vectors; + +import org.apache.arrow.vector.Float4Vector; +import org.apache.flink.table.data.columnar.vector.FloatColumnVector; +import org.apache.flink.util.Preconditions; + +/** + * A Flink {@link FloatColumnVector} backed by an Arrow {@link Float4Vector}. + * + *

This wrapper delegates all reads to the underlying Arrow vector, providing zero-copy access + * to Arrow data from Flink's columnar batch execution engine. + */ +public final class ArrowFloatColumnVector implements FloatColumnVector { + + private Float4Vector vector; + + /** + * Creates a new wrapper around the given Arrow {@link Float4Vector}. + * + * @param vector the Arrow vector to wrap, must not be null + */ + public ArrowFloatColumnVector(Float4Vector vector) { + this.vector = Preconditions.checkNotNull(vector); + } + + /** {@inheritDoc} */ + @Override + public boolean isNullAt(int i) { + return vector.isNull(i); + } + + /** {@inheritDoc} */ + @Override + public float getFloat(int i) { + return vector.get(i); + } + + /** + * Replaces the underlying Arrow vector. Used during reader reset to point at a new batch + * without allocating a new wrapper. + * + * @param vector the new Arrow vector, must not be null + */ + void setVector(Float4Vector vector) { + this.vector = Preconditions.checkNotNull(vector); + } +} diff --git a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowIntColumnVector.java b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowIntColumnVector.java new file mode 100644 index 000000000..5e9ae846e --- /dev/null +++ b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowIntColumnVector.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.auron.flink.arrow.vectors; + +import org.apache.arrow.vector.IntVector; +import org.apache.flink.table.data.columnar.vector.IntColumnVector; +import org.apache.flink.util.Preconditions; + +/** + * A Flink {@link IntColumnVector} backed by an Arrow {@link IntVector}. + * + *

This wrapper delegates all reads to the underlying Arrow vector, providing zero-copy access + * to Arrow data from Flink's columnar batch execution engine. + */ +public final class ArrowIntColumnVector implements IntColumnVector { + + private IntVector vector; + + /** + * Creates a new wrapper around the given Arrow {@link IntVector}. + * + * @param vector the Arrow vector to wrap, must not be null + */ + public ArrowIntColumnVector(IntVector vector) { + this.vector = Preconditions.checkNotNull(vector); + } + + /** {@inheritDoc} */ + @Override + public boolean isNullAt(int i) { + return vector.isNull(i); + } + + /** {@inheritDoc} */ + @Override + public int getInt(int i) { + return vector.get(i); + } + + /** + * Replaces the underlying Arrow vector. Used during reader reset to point at a new batch + * without allocating a new wrapper. + * + * @param vector the new Arrow vector, must not be null + */ + void setVector(IntVector vector) { + this.vector = Preconditions.checkNotNull(vector); + } +} diff --git a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowSmallIntColumnVector.java b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowSmallIntColumnVector.java new file mode 100644 index 000000000..845346dab --- /dev/null +++ b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowSmallIntColumnVector.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.auron.flink.arrow.vectors; + +import org.apache.arrow.vector.SmallIntVector; +import org.apache.flink.table.data.columnar.vector.ShortColumnVector; +import org.apache.flink.util.Preconditions; + +/** + * A Flink {@link ShortColumnVector} backed by an Arrow {@link SmallIntVector}. + * + *

This wrapper delegates all reads to the underlying Arrow vector, providing zero-copy access + * to Arrow data from Flink's columnar batch execution engine. + */ +public final class ArrowSmallIntColumnVector implements ShortColumnVector { + + private SmallIntVector vector; + + /** + * Creates a new wrapper around the given Arrow {@link SmallIntVector}. + * + * @param vector the Arrow vector to wrap, must not be null + */ + public ArrowSmallIntColumnVector(SmallIntVector vector) { + this.vector = Preconditions.checkNotNull(vector); + } + + /** {@inheritDoc} */ + @Override + public boolean isNullAt(int i) { + return vector.isNull(i); + } + + /** {@inheritDoc} */ + @Override + public short getShort(int i) { + return vector.get(i); + } + + /** + * Replaces the underlying Arrow vector. Used during reader reset to point at a new batch + * without allocating a new wrapper. + * + * @param vector the new Arrow vector, must not be null + */ + void setVector(SmallIntVector vector) { + this.vector = Preconditions.checkNotNull(vector); + } +} diff --git a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowTinyIntColumnVector.java b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowTinyIntColumnVector.java new file mode 100644 index 000000000..634d75d87 --- /dev/null +++ b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowTinyIntColumnVector.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.auron.flink.arrow.vectors; + +import org.apache.arrow.vector.TinyIntVector; +import org.apache.flink.table.data.columnar.vector.ByteColumnVector; +import org.apache.flink.util.Preconditions; + +/** + * A Flink {@link ByteColumnVector} backed by an Arrow {@link TinyIntVector}. + * + *

This wrapper delegates all reads to the underlying Arrow vector, providing zero-copy access + * to Arrow data from Flink's columnar batch execution engine. + */ +public final class ArrowTinyIntColumnVector implements ByteColumnVector { + + private TinyIntVector vector; + + /** + * Creates a new wrapper around the given Arrow {@link TinyIntVector}. + * + * @param vector the Arrow vector to wrap, must not be null + */ + public ArrowTinyIntColumnVector(TinyIntVector vector) { + this.vector = Preconditions.checkNotNull(vector); + } + + /** {@inheritDoc} */ + @Override + public boolean isNullAt(int i) { + return vector.isNull(i); + } + + /** {@inheritDoc} */ + @Override + public byte getByte(int i) { + return vector.get(i); + } + + /** + * Replaces the underlying Arrow vector. Used during reader reset to point at a new batch + * without allocating a new wrapper. + * + * @param vector the new Arrow vector, must not be null + */ + void setVector(TinyIntVector vector) { + this.vector = Preconditions.checkNotNull(vector); + } +} diff --git a/docs/reviewhelper/01-primitive-wrappers.md b/docs/reviewhelper/01-primitive-wrappers.md new file mode 100644 index 000000000..b5f8d6c7a --- /dev/null +++ b/docs/reviewhelper/01-primitive-wrappers.md @@ -0,0 +1,29 @@ +# Commit 1: Primitive Arrow ColumnVector Wrappers + +**Commit**: `[AURON #1851] Add primitive Arrow ColumnVector wrappers` + +## What This Commit Does + +Adds 7 thin wrapper classes that implement Flink's primitive `ColumnVector` sub-interfaces by delegating to Arrow `FieldVector` types. Each wrapper is ~63 lines of straightforward delegation code. 
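The delegation pattern is small enough to sketch without the Arrow and Flink dependencies. The sketch below uses a hypothetical `FakeBitVector` stand-in for Arrow's `BitVector` and a local `BooleanColumnVectorSketch` interface in place of Flink's `BooleanColumnVector`; the wrapper body mirrors the real classes, including the `!= 0` conversion from `BitVector`'s 0/1 `int` values:

```java
// Dependency-free sketch of the wrapper pattern. FakeBitVector and
// BooleanColumnVectorSketch are hypothetical stand-ins, not part of this patch.
interface BooleanColumnVectorSketch {
    boolean isNullAt(int i);
    boolean getBoolean(int i);
}

class FakeBitVector {
    private final int[] values;     // 0 or 1, as BitVector.get(i) returns int
    private final boolean[] nulls;
    FakeBitVector(int[] values, boolean[] nulls) {
        this.values = values;
        this.nulls = nulls;
    }
    int get(int i) { return values[i]; }
    boolean isNull(int i) { return nulls[i]; }
}

public class WrapperPatternDemo implements BooleanColumnVectorSketch {
    private final FakeBitVector vector;

    public WrapperPatternDemo(FakeBitVector vector) { this.vector = vector; }

    // isNullAt delegates straight to the vector's validity bitmap lookup.
    @Override public boolean isNullAt(int i) { return vector.isNull(i); }

    // BitVector.get(i) yields 0/1; the != 0 comparison converts to boolean.
    @Override public boolean getBoolean(int i) { return vector.get(i) != 0; }

    public static void main(String[] args) {
        WrapperPatternDemo v = new WrapperPatternDemo(
                new FakeBitVector(new int[] {1, 0, 0}, new boolean[] {false, false, true}));
        if (!v.getBoolean(0) || v.getBoolean(1) || !v.isNullAt(2)) {
            throw new AssertionError("wrapper delegation is broken");
        }
        System.out.println("ok");
    }
}
```

No data is copied: the wrapper holds only a reference, and every read is a direct indexed lookup into the (stand-in) vector.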
+ +## Files to Review (7) + +All in `auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/`: + +| File | Arrow Vector | Flink Interface | Key Method | +|------|-------------|-----------------|------------| +| `ArrowBooleanColumnVector` | `BitVector` | `BooleanColumnVector` | `getBoolean(i)` → `vector.get(i) != 0` | +| `ArrowTinyIntColumnVector` | `TinyIntVector` | `ByteColumnVector` | `getByte(i)` → `vector.get(i)` | +| `ArrowSmallIntColumnVector` | `SmallIntVector` | `ShortColumnVector` | `getShort(i)` → `vector.get(i)` | +| `ArrowIntColumnVector` | `IntVector` | `IntColumnVector` | `getInt(i)` → `vector.get(i)` | +| `ArrowBigIntColumnVector` | `BigIntVector` | `LongColumnVector` | `getLong(i)` → `vector.get(i)` | +| `ArrowFloatColumnVector` | `Float4Vector` | `FloatColumnVector` | `getFloat(i)` → `vector.get(i)` | +| `ArrowDoubleColumnVector` | `Float8Vector` | `DoubleColumnVector` | `getDouble(i)` → `vector.get(i)` | + +## What to Look For + +- Each class is `public final`, implements one Flink interface +- Constructor validates with `Preconditions.checkNotNull` +- `isNullAt(i)` delegates to `vector.isNull(i)` +- `setVector()` is package-private — used by `FlinkArrowReader.reset()` (commit 5) +- `BitVector.get(i)` returns `int` (0 or 1), converted via `!= 0` From ac5ab6f7aa59ed24a559d089663fa98375989a1c Mon Sep 17 00:00:00 2001 From: Weiqing Yang Date: Mon, 2 Mar 2026 19:08:37 -0800 Subject: [PATCH 3/8] [AURON #1851] Add string, binary, and decimal Arrow ColumnVector wrappers Add VarChar, VarBinary, and Decimal wrappers. Decimal uses compact fromUnscaledLong path for precision <= 18 to avoid BigDecimal allocation, reading directly from Arrow's data buffer via ArrowBuf.getLong(). 
--- .../vectors/ArrowDecimalColumnVector.java | 94 +++++++++++++++++++ .../vectors/ArrowVarBinaryColumnVector.java | 64 +++++++++++++ .../vectors/ArrowVarCharColumnVector.java | 64 +++++++++++++ docs/reviewhelper/02-string-binary-decimal.md | 23 +++++ 4 files changed, 245 insertions(+) create mode 100644 auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowDecimalColumnVector.java create mode 100644 auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowVarBinaryColumnVector.java create mode 100644 auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowVarCharColumnVector.java create mode 100644 docs/reviewhelper/02-string-binary-decimal.md diff --git a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowDecimalColumnVector.java b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowDecimalColumnVector.java new file mode 100644 index 000000000..fe7001bb8 --- /dev/null +++ b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowDecimalColumnVector.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.auron.flink.arrow.vectors; + +import org.apache.arrow.memory.ArrowBuf; +import org.apache.arrow.vector.DecimalVector; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.columnar.vector.DecimalColumnVector; +import org.apache.flink.util.Preconditions; + +/** + * A Flink {@link DecimalColumnVector} backed by an Arrow {@link DecimalVector}. + * + *

This wrapper delegates all reads to the underlying Arrow vector, providing zero-copy access + * to Arrow data from Flink's columnar batch execution engine. + * + *

For compact decimals (precision <= 18), the implementation avoids BigDecimal allocation by + * reading the unscaled value directly from Arrow's little-endian byte representation. + */ +public final class ArrowDecimalColumnVector implements DecimalColumnVector { + + /** + * Maximum precision that fits in a long (same as Flink's internal compact decimal threshold). + * DecimalData.MAX_COMPACT_PRECISION is package-private, so we define our own constant. + */ + private static final int MAX_COMPACT_PRECISION = 18; + + private DecimalVector vector; + + /** + * Creates a new wrapper around the given Arrow {@link DecimalVector}. + * + * @param vector the Arrow vector to wrap, must not be null + */ + public ArrowDecimalColumnVector(DecimalVector vector) { + this.vector = Preconditions.checkNotNull(vector); + } + + /** {@inheritDoc} */ + @Override + public boolean isNullAt(int i) { + return vector.isNull(i); + } + + /** {@inheritDoc} */ + @Override + public DecimalData getDecimal(int i, int precision, int scale) { + if (precision <= MAX_COMPACT_PRECISION) { + // Compact path: avoid BigDecimal allocation for precision <= 18. + // Arrow stores decimals as 128-bit little-endian two's complement. + // For precision <= 18, the value fits in a long. + long unscaledLong = readLittleEndianLong(vector.getDataBuffer(), (long) i * 16); + return DecimalData.fromUnscaledLong(unscaledLong, precision, scale); + } + return DecimalData.fromBigDecimal(vector.getObject(i), precision, scale); + } + + /** + * Replaces the underlying Arrow vector. Used during reader reset to point at a new batch + * without allocating a new wrapper. + * + * @param vector the new Arrow vector, must not be null + */ + void setVector(DecimalVector vector) { + this.vector = Preconditions.checkNotNull(vector); + } + + /** + * Reads a little-endian long (8 bytes) from an Arrow buffer at the given byte offset. Arrow + * stores decimals as 128-bit little-endian two's complement. 
For values that fit in a long + * (precision <= 18), the lower 8 bytes are sufficient. + * + * @param buffer the Arrow data buffer + * @param offset byte offset into the buffer + * @return the unscaled value as a long + */ + private static long readLittleEndianLong(ArrowBuf buffer, long offset) { + return buffer.getLong(offset); + } +} diff --git a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowVarBinaryColumnVector.java b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowVarBinaryColumnVector.java new file mode 100644 index 000000000..a22261f24 --- /dev/null +++ b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowVarBinaryColumnVector.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.auron.flink.arrow.vectors; + +import org.apache.arrow.vector.VarBinaryVector; +import org.apache.flink.table.data.columnar.vector.BytesColumnVector; +import org.apache.flink.util.Preconditions; + +/** + * A Flink {@link BytesColumnVector} backed by an Arrow {@link VarBinaryVector}. + * + *

This wrapper delegates all reads to the underlying Arrow vector, providing zero-copy access + * to Arrow data from Flink's columnar batch execution engine. + */ +public final class ArrowVarBinaryColumnVector implements BytesColumnVector { + + private VarBinaryVector vector; + + /** + * Creates a new wrapper around the given Arrow {@link VarBinaryVector}. + * + * @param vector the Arrow vector to wrap, must not be null + */ + public ArrowVarBinaryColumnVector(VarBinaryVector vector) { + this.vector = Preconditions.checkNotNull(vector); + } + + /** {@inheritDoc} */ + @Override + public boolean isNullAt(int i) { + return vector.isNull(i); + } + + /** {@inheritDoc} */ + @Override + public Bytes getBytes(int i) { + byte[] bytes = vector.get(i); + return new Bytes(bytes, 0, bytes.length); + } + + /** + * Replaces the underlying Arrow vector. Used during reader reset to point at a new batch + * without allocating a new wrapper. + * + * @param vector the new Arrow vector, must not be null + */ + void setVector(VarBinaryVector vector) { + this.vector = Preconditions.checkNotNull(vector); + } +} diff --git a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowVarCharColumnVector.java b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowVarCharColumnVector.java new file mode 100644 index 000000000..b6ecf4aa9 --- /dev/null +++ b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowVarCharColumnVector.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.auron.flink.arrow.vectors; + +import org.apache.arrow.vector.VarCharVector; +import org.apache.flink.table.data.columnar.vector.BytesColumnVector; +import org.apache.flink.util.Preconditions; + +/** + * A Flink {@link BytesColumnVector} backed by an Arrow {@link VarCharVector}. + * + *

This wrapper delegates all reads to the underlying Arrow vector, providing zero-copy access + * to Arrow data from Flink's columnar batch execution engine. + */ +public final class ArrowVarCharColumnVector implements BytesColumnVector { + + private VarCharVector vector; + + /** + * Creates a new wrapper around the given Arrow {@link VarCharVector}. + * + * @param vector the Arrow vector to wrap, must not be null + */ + public ArrowVarCharColumnVector(VarCharVector vector) { + this.vector = Preconditions.checkNotNull(vector); + } + + /** {@inheritDoc} */ + @Override + public boolean isNullAt(int i) { + return vector.isNull(i); + } + + /** {@inheritDoc} */ + @Override + public Bytes getBytes(int i) { + byte[] bytes = vector.get(i); + return new Bytes(bytes, 0, bytes.length); + } + + /** + * Replaces the underlying Arrow vector. Used during reader reset to point at a new batch + * without allocating a new wrapper. + * + * @param vector the new Arrow vector, must not be null + */ + void setVector(VarCharVector vector) { + this.vector = Preconditions.checkNotNull(vector); + } +} diff --git a/docs/reviewhelper/02-string-binary-decimal.md b/docs/reviewhelper/02-string-binary-decimal.md new file mode 100644 index 000000000..b13efa78c --- /dev/null +++ b/docs/reviewhelper/02-string-binary-decimal.md @@ -0,0 +1,23 @@ +# Commit 2: String, Binary, and Decimal Wrappers + +**Commit**: `[AURON #1851] Add string, binary, and decimal Arrow ColumnVector wrappers` + +## What This Commit Does + +Adds 3 wrappers for variable-length and decimal types. The decimal wrapper includes an optimization to avoid `BigDecimal` allocation for common precision values. 
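The compact decimal read can be illustrated without Arrow. The sketch below (hypothetical helper names, not part of this patch) encodes a value the way Arrow stores Decimal128 — 16 bytes, little-endian, two's complement — and shows that for values fitting in a long (precision ≤ 18), reading the low 8 bytes little-endian recovers the unscaled value, including negatives:

```java
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;

public class CompactDecimalDemo {
    // Encode an unscaled value as a 16-byte little-endian two's-complement slot,
    // mirroring Arrow's Decimal128 storage layout (hypothetical helper).
    static byte[] toArrowDecimalBytes(BigInteger unscaled) {
        byte[] be = unscaled.toByteArray();               // minimal big-endian two's complement
        byte[] le = new byte[16];
        byte pad = (byte) (unscaled.signum() < 0 ? 0xFF : 0x00);
        Arrays.fill(le, pad);                             // sign-extend to 128 bits
        for (int i = 0; i < be.length; i++) {
            le[i] = be[be.length - 1 - i];                // reverse into little-endian order
        }
        return le;
    }

    // The compact read path: when the value fits in a long, the low 8
    // little-endian bytes are the complete two's-complement value.
    static long readCompact(byte[] slot16) {
        return ByteBuffer.wrap(slot16).order(ByteOrder.LITTLE_ENDIAN).getLong(0);
    }

    public static void main(String[] args) {
        long[] samples = {0L, 1L, -1L, 123456789012345678L, -123456789012345678L};
        for (long v : samples) {
            byte[] slot = toArrowDecimalBytes(BigInteger.valueOf(v));
            if (readCompact(slot) != v) {
                throw new AssertionError("round trip failed for " + v);
            }
        }
        System.out.println("ok");
    }
}
```

This is why `ArrowDecimalColumnVector` can call `ArrowBuf.getLong(i * 16)` and hand the result to `DecimalData.fromUnscaledLong` without touching `BigDecimal` for precision ≤ 18.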
+ +## Files to Review (3) + +| File | Arrow Vector | Flink Interface | Notes | +|------|-------------|-----------------|-------| +| `ArrowVarCharColumnVector` | `VarCharVector` | `BytesColumnVector` | `getBytes(i)` returns `new Bytes(vector.get(i), 0, len)` | +| `ArrowVarBinaryColumnVector` | `VarBinaryVector` | `BytesColumnVector` | Same pattern as VarChar | +| `ArrowDecimalColumnVector` | `DecimalVector` | `DecimalColumnVector` | Compact + wide path | + +## What to Look For + +- **VarChar/VarBinary**: `vector.get(i)` returns `byte[]` (allocation per call — inherent to Arrow API, matches Flink upstream) +- **Decimal optimization** (key review point): + - Precision ≤ 18: reads 8 bytes directly from `vector.getDataBuffer()` via `ArrowBuf.getLong(offset)`, avoids `BigDecimal` + - Precision > 18: falls back to `DecimalData.fromBigDecimal(vector.getObject(i), ...)` + - `MAX_COMPACT_PRECISION = 18` is a local constant because Flink's `DecimalData.MAX_COMPACT_PRECISION` is package-private From 553d42e845cf860a7fb6e8c8f60ec94fed37ed18 Mon Sep 17 00:00:00 2001 From: Weiqing Yang Date: Mon, 2 Mar 2026 19:12:41 -0800 Subject: [PATCH 4/8] [AURON #1851] Add temporal Arrow ColumnVector wrappers Add Date (epoch days), Time (micros->millis conversion), and Timestamp (micros->TimestampData with millis + nanoOfMillisecond) wrappers. Timestamp wrapper uses TimeStampVector parent to handle both TIMESTAMP and TIMESTAMP_LTZ Arrow vectors. 
--- .../arrow/vectors/ArrowDateColumnVector.java | 64 ++++++++++++++ .../arrow/vectors/ArrowTimeColumnVector.java | 74 ++++++++++++++++ .../vectors/ArrowTimestampColumnVector.java | 85 +++++++++++++++++++ docs/reviewhelper/03-temporal-wrappers.md | 26 ++++++ 4 files changed, 249 insertions(+) create mode 100644 auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowDateColumnVector.java create mode 100644 auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowTimeColumnVector.java create mode 100644 auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowTimestampColumnVector.java create mode 100644 docs/reviewhelper/03-temporal-wrappers.md diff --git a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowDateColumnVector.java b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowDateColumnVector.java new file mode 100644 index 000000000..cf356b90c --- /dev/null +++ b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowDateColumnVector.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.auron.flink.arrow.vectors; + +import org.apache.arrow.vector.DateDayVector; +import org.apache.flink.table.data.columnar.vector.IntColumnVector; +import org.apache.flink.util.Preconditions; + +/** + * A Flink {@link IntColumnVector} backed by an Arrow {@link DateDayVector}. + * + *

This wrapper delegates all reads to the underlying Arrow vector, providing zero-copy access + * to Arrow data from Flink's columnar batch execution engine. {@link DateDayVector} stores dates + * as epoch days ({@code int}), which maps directly to Flink's internal DATE representation. + */ +public final class ArrowDateColumnVector implements IntColumnVector { + + private DateDayVector vector; + + /** + * Creates a new wrapper around the given Arrow {@link DateDayVector}. + * + * @param vector the Arrow vector to wrap, must not be null + */ + public ArrowDateColumnVector(DateDayVector vector) { + this.vector = Preconditions.checkNotNull(vector); + } + + /** {@inheritDoc} */ + @Override + public boolean isNullAt(int i) { + return vector.isNull(i); + } + + /** {@inheritDoc} */ + @Override + public int getInt(int i) { + return vector.get(i); + } + + /** + * Replaces the underlying Arrow vector. Used during reader reset to point at a new batch + * without allocating a new wrapper. + * + * @param vector the new Arrow vector, must not be null + */ + void setVector(DateDayVector vector) { + this.vector = Preconditions.checkNotNull(vector); + } +} diff --git a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowTimeColumnVector.java b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowTimeColumnVector.java new file mode 100644 index 000000000..c4e9e2aa3 --- /dev/null +++ b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowTimeColumnVector.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.auron.flink.arrow.vectors; + +import org.apache.arrow.vector.TimeMicroVector; +import org.apache.flink.table.data.columnar.vector.IntColumnVector; +import org.apache.flink.util.Preconditions; + +/** + * A Flink {@link IntColumnVector} backed by an Arrow {@link TimeMicroVector}. + * + *

This wrapper delegates all reads to the underlying Arrow vector, providing zero-copy access + * to Arrow data from Flink's columnar batch execution engine. {@link TimeMicroVector} stores time + * values as microseconds since midnight ({@code long}), which are converted to milliseconds + * ({@code int}) to match Flink's internal TIME representation. This reverses the writer's + * conversion from milliseconds to microseconds. + */ +public final class ArrowTimeColumnVector implements IntColumnVector { + + private TimeMicroVector vector; + + /** + * Creates a new wrapper around the given Arrow {@link TimeMicroVector}. + * + * @param vector the Arrow vector to wrap, must not be null + */ + public ArrowTimeColumnVector(TimeMicroVector vector) { + this.vector = Preconditions.checkNotNull(vector); + } + + /** {@inheritDoc} */ + @Override + public boolean isNullAt(int i) { + return vector.isNull(i); + } + + /** + * Returns the time value at the given index as milliseconds since midnight. + * + *

The underlying Arrow vector stores microseconds; this method divides by 1000 to produce + * the millisecond value expected by Flink's internal TIME type. + * + * @param i the row index + * @return time of day in milliseconds since midnight + */ + @Override + public int getInt(int i) { + return (int) (vector.get(i) / 1000); + } + + /** + * Replaces the underlying Arrow vector. Used during reader reset to point at a new batch + * without allocating a new wrapper. + * + * @param vector the new Arrow vector, must not be null + */ + void setVector(TimeMicroVector vector) { + this.vector = Preconditions.checkNotNull(vector); + } +} diff --git a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowTimestampColumnVector.java b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowTimestampColumnVector.java new file mode 100644 index 000000000..41f70519a --- /dev/null +++ b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowTimestampColumnVector.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.auron.flink.arrow.vectors; + +import org.apache.arrow.vector.TimeStampVector; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.data.columnar.vector.TimestampColumnVector; +import org.apache.flink.util.Preconditions; + +/** + * A Flink {@link TimestampColumnVector} backed by an Arrow {@link TimeStampVector}. + * + *
<p>
This wrapper delegates all reads to the underlying Arrow vector, providing zero-copy access + * to Arrow data from Flink's columnar batch execution engine. It handles both {@code + * TimeStampMicroVector} (TIMESTAMP) and {@code TimeStampMicroTZVector} (TIMESTAMP_LTZ) by + * accepting their common parent type {@link TimeStampVector}. Microsecond values are converted to + * Flink's {@link TimestampData} representation (epoch millis + sub-millisecond nanos). + */ +public final class ArrowTimestampColumnVector implements TimestampColumnVector { + + private TimeStampVector vector; + + /** + * Creates a new wrapper around the given Arrow {@link TimeStampVector}. + * + *
<p>
Accepts both {@code TimeStampMicroVector} and {@code TimeStampMicroTZVector} since they + * share the same storage format and parent type. + * + * @param vector the Arrow vector to wrap, must not be null + */ + public ArrowTimestampColumnVector(TimeStampVector vector) { + this.vector = Preconditions.checkNotNull(vector); + } + + /** {@inheritDoc} */ + @Override + public boolean isNullAt(int i) { + return vector.isNull(i); + } + + /** + * Returns the timestamp at the given index as a {@link TimestampData}. + * + *
<p>
The underlying Arrow vector stores microseconds since epoch. This method splits the value + * into epoch milliseconds and sub-millisecond nanoseconds to construct a {@link TimestampData}. + * + * @param i the row index + * @param precision the timestamp precision (unused; conversion is always from microseconds) + * @return the timestamp value + */ + @Override + public TimestampData getTimestamp(int i, int precision) { + long micros = vector.get(i); + long millis = micros / 1000; + // micros % 1000 yields the sub-millisecond remainder in microseconds; * 1000 converts to nanos. + // For negative micros (pre-epoch), Java's truncation-toward-zero produces a negative + // remainder, which is consistent with the writer's inverse conversion. + int nanoOfMillisecond = ((int) (micros % 1000)) * 1000; + return TimestampData.fromEpochMillis(millis, nanoOfMillisecond); + } + + /** + * Replaces the underlying Arrow vector. Used during reader reset to point at a new batch + * without allocating a new wrapper. + * + * @param vector the new Arrow vector, must not be null + */ + void setVector(TimeStampVector vector) { + this.vector = Preconditions.checkNotNull(vector); + } +} diff --git a/docs/reviewhelper/03-temporal-wrappers.md b/docs/reviewhelper/03-temporal-wrappers.md new file mode 100644 index 000000000..39b3025c4 --- /dev/null +++ b/docs/reviewhelper/03-temporal-wrappers.md @@ -0,0 +1,26 @@ +# Commit 3: Temporal Wrappers + +**Commit**: `[AURON #1851] Add temporal Arrow ColumnVector wrappers` + +## What This Commit Does + +Adds 3 wrappers for date, time, and timestamp types. Time and timestamp require precision conversion (microseconds ↔ milliseconds) to match the writer's normalization. 
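The micros-to-millis arithmetic described above can be sanity-checked in plain Java, independent of the Arrow and Flink classes. This is an illustrative sketch: the class and method names below are not part of the PR, but the two conversions mirror `ArrowTimeColumnVector.getInt` and `ArrowTimestampColumnVector.getTimestamp`.

```java
public class TemporalConversionCheck {

    // TIME: writer stores microseconds since midnight; reader divides by 1000
    // to recover the millisecond value Flink's internal TIME type expects.
    static int timeMicrosToMillis(long micros) {
        return (int) (micros / 1000);
    }

    // TIMESTAMP: split microseconds since epoch into epoch millis plus
    // sub-millisecond nanos, mirroring ArrowTimestampColumnVector.getTimestamp.
    static long[] splitTimestampMicros(long micros) {
        long millis = micros / 1000;
        int nanoOfMillisecond = ((int) (micros % 1000)) * 1000;
        return new long[] {millis, nanoOfMillisecond};
    }

    public static void main(String[] args) {
        // Max TIME value 23:59:59.999999 -> 86_399_999 ms, which fits in an int.
        assert timeMicrosToMillis(86_399_999_999L) == 86_399_999;

        long[] parts = splitTimestampMicros(1_234_567L);
        assert parts[0] == 1_234;
        assert parts[1] == 567_000;

        // Pre-epoch value: Java truncation toward zero yields a negative remainder.
        long[] neg = splitTimestampMicros(-1_500L);
        assert neg[0] == -1;
        assert neg[1] == -500_000;

        System.out.println("ok");
    }
}
```

Run with `java -ea` to enable assertions; the pre-epoch case is the rounding behavior the review checklist below calls out.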
+ +## Files to Review (3) + +| File | Arrow Vector | Flink Interface | Conversion | +|------|-------------|-----------------|------------| +| `ArrowDateColumnVector` | `DateDayVector` | `IntColumnVector` | Direct passthrough (epoch days as int) | +| `ArrowTimeColumnVector` | `TimeMicroVector` | `IntColumnVector` | `(int)(vector.get(i) / 1000)` — micros → millis | +| `ArrowTimestampColumnVector` | `TimeStampVector` | `TimestampColumnVector` | Splits micros into millis + nanoOfMillisecond | + +## What to Look For + +- **Time conversion**: Writer (PR #1930) normalizes all TIME to microseconds. Reader reverses: `micros / 1000 = millis`. Max time value (86,399,999) fits in int. +- **Timestamp conversion** (key review point): + ``` + millis = micros / 1000 + nanoOfMillisecond = ((int)(micros % 1000)) * 1000 + ``` + Explicit parentheses added for clarity. Comment documents pre-epoch rounding behavior. +- **TimeStampVector**: Uses the abstract parent type to handle both `TimeStampMicroVector` (TIMESTAMP) and `TimeStampMicroTZVector` (TIMESTAMP_LTZ) with a single wrapper. From 07521ce3bfd2d6519973b9e62858b51b03963428 Mon Sep 17 00:00:00 2001 From: Weiqing Yang Date: Mon, 2 Mar 2026 19:18:56 -0800 Subject: [PATCH 5/8] [AURON #1851] Add nested type Arrow ColumnVector wrappers Add Array (ListVector), Map (MapVector), and Row (StructVector) wrappers with recursive child vector wrapping. Array/Map use offset buffers for element access; Row uses VectorizedColumnBatch with reusable ColumnarRowData. 
--- .../arrow/vectors/ArrowArrayColumnVector.java | 76 +++++++++++++++++ .../arrow/vectors/ArrowMapColumnVector.java | 83 +++++++++++++++++++ .../arrow/vectors/ArrowRowColumnVector.java | 79 ++++++++++++++++++ docs/reviewhelper/04-nested-types.md | 30 +++++++ 4 files changed, 268 insertions(+) create mode 100644 auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowArrayColumnVector.java create mode 100644 auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowMapColumnVector.java create mode 100644 auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowRowColumnVector.java create mode 100644 docs/reviewhelper/04-nested-types.md diff --git a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowArrayColumnVector.java b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowArrayColumnVector.java new file mode 100644 index 000000000..64d673513 --- /dev/null +++ b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowArrayColumnVector.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.auron.flink.arrow.vectors; + +import org.apache.arrow.vector.complex.ListVector; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.columnar.ColumnarArrayData; +import org.apache.flink.table.data.columnar.vector.ArrayColumnVector; +import org.apache.flink.table.data.columnar.vector.ColumnVector; +import org.apache.flink.util.Preconditions; + +/** + * A Flink {@link ArrayColumnVector} backed by an Arrow {@link ListVector}. + * + *
<p>
This wrapper delegates all reads to the underlying Arrow vector, providing zero-copy access + * to Arrow list data from Flink's columnar batch execution engine. The child element vector is + * itself a Flink {@link ColumnVector} wrapping the corresponding Arrow child vector, enabling + * recursive nesting of complex types. + */ +public final class ArrowArrayColumnVector implements ArrayColumnVector { + + private ListVector vector; + private ColumnVector elementColumnVector; + + /** + * Creates a new wrapper around the given Arrow {@link ListVector}. + * + * @param vector the Arrow list vector to wrap, must not be null + * @param elementColumnVector the Flink column vector wrapping the Arrow child element vector, + * must not be null + */ + public ArrowArrayColumnVector(ListVector vector, ColumnVector elementColumnVector) { + this.vector = Preconditions.checkNotNull(vector); + this.elementColumnVector = Preconditions.checkNotNull(elementColumnVector); + } + + /** {@inheritDoc} */ + @Override + public boolean isNullAt(int i) { + return vector.isNull(i); + } + + /** {@inheritDoc} */ + @Override + public ArrayData getArray(int i) { + int offset = vector.getOffsetBuffer().getInt((long) i * ListVector.OFFSET_WIDTH); + int length = vector.getOffsetBuffer().getInt((long) (i + 1) * ListVector.OFFSET_WIDTH) - offset; + return new ColumnarArrayData(elementColumnVector, offset, length); + } + + /** + * Replaces the underlying Arrow vector and child element vector. Used during reader reset to + * point at a new batch without allocating a new wrapper. 
+ * + * @param vector the new Arrow list vector, must not be null + * @param elementColumnVector the new Flink column vector for child elements, must not be null + */ + void setVector(ListVector vector, ColumnVector elementColumnVector) { + this.vector = Preconditions.checkNotNull(vector); + this.elementColumnVector = Preconditions.checkNotNull(elementColumnVector); + } +} diff --git a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowMapColumnVector.java b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowMapColumnVector.java new file mode 100644 index 000000000..c64232f37 --- /dev/null +++ b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowMapColumnVector.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.auron.flink.arrow.vectors; + +import org.apache.arrow.vector.complex.MapVector; +import org.apache.flink.table.data.MapData; +import org.apache.flink.table.data.columnar.ColumnarMapData; +import org.apache.flink.table.data.columnar.vector.ColumnVector; +import org.apache.flink.table.data.columnar.vector.MapColumnVector; +import org.apache.flink.util.Preconditions; + +/** + * A Flink {@link MapColumnVector} backed by an Arrow {@link MapVector}. + * + *
<p>
This wrapper delegates all reads to the underlying Arrow vector, providing zero-copy access + * to Arrow map data from Flink's columnar batch execution engine. Arrow represents maps as a list + * of struct entries, where each entry contains a "key" and "value" child vector. The key and value + * Flink {@link ColumnVector} instances wrap the corresponding Arrow child vectors, enabling + * recursive nesting of complex types. + */ +public final class ArrowMapColumnVector implements MapColumnVector { + + private MapVector vector; + private ColumnVector keyColumnVector; + private ColumnVector valueColumnVector; + + /** + * Creates a new wrapper around the given Arrow {@link MapVector}. + * + * @param vector the Arrow map vector to wrap, must not be null + * @param keyColumnVector the Flink column vector wrapping the Arrow key child vector, must not + * be null + * @param valueColumnVector the Flink column vector wrapping the Arrow value child vector, must + * not be null + */ + public ArrowMapColumnVector(MapVector vector, ColumnVector keyColumnVector, ColumnVector valueColumnVector) { + this.vector = Preconditions.checkNotNull(vector); + this.keyColumnVector = Preconditions.checkNotNull(keyColumnVector); + this.valueColumnVector = Preconditions.checkNotNull(valueColumnVector); + } + + /** {@inheritDoc} */ + @Override + public boolean isNullAt(int i) { + return vector.isNull(i); + } + + /** {@inheritDoc} */ + @Override + public MapData getMap(int i) { + int offset = vector.getOffsetBuffer().getInt((long) i * MapVector.OFFSET_WIDTH); + int length = vector.getOffsetBuffer().getInt((long) (i + 1) * MapVector.OFFSET_WIDTH) - offset; + return new ColumnarMapData(keyColumnVector, valueColumnVector, offset, length); + } + + /** + * Replaces the underlying Arrow vector and child key/value vectors. Used during reader reset + * to point at a new batch without allocating a new wrapper. 
+ * + * @param vector the new Arrow map vector, must not be null + * @param keyColumnVector the new Flink column vector for keys, must not be null + * @param valueColumnVector the new Flink column vector for values, must not be null + */ + void setVector(MapVector vector, ColumnVector keyColumnVector, ColumnVector valueColumnVector) { + this.vector = Preconditions.checkNotNull(vector); + this.keyColumnVector = Preconditions.checkNotNull(keyColumnVector); + this.valueColumnVector = Preconditions.checkNotNull(valueColumnVector); + } +} diff --git a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowRowColumnVector.java b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowRowColumnVector.java new file mode 100644 index 000000000..93db8dd26 --- /dev/null +++ b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowRowColumnVector.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.auron.flink.arrow.vectors; + +import org.apache.arrow.vector.complex.StructVector; +import org.apache.flink.table.data.columnar.ColumnarRowData; +import org.apache.flink.table.data.columnar.vector.ColumnVector; +import org.apache.flink.table.data.columnar.vector.RowColumnVector; +import org.apache.flink.table.data.columnar.vector.VectorizedColumnBatch; +import org.apache.flink.util.Preconditions; + +/** + * A Flink {@link RowColumnVector} backed by an Arrow {@link StructVector}. + * + *
<p>
This wrapper delegates all reads to the underlying Arrow vector, providing zero-copy access + * to Arrow struct data from Flink's columnar batch execution engine. Each struct field is + * represented as a Flink {@link ColumnVector} wrapping the corresponding Arrow child vector, + * enabling recursive nesting of complex types. The child vectors are bundled into a {@link + * VectorizedColumnBatch} and accessed through a reusable {@link ColumnarRowData} instance. + */ +public final class ArrowRowColumnVector implements RowColumnVector { + + private StructVector vector; + private VectorizedColumnBatch childBatch; + private ColumnarRowData reusableRow; + + /** + * Creates a new wrapper around the given Arrow {@link StructVector}. + * + * @param vector the Arrow struct vector to wrap, must not be null + * @param childColumnVectors the Flink column vectors wrapping each Arrow child field vector, + * must not be null + */ + public ArrowRowColumnVector(StructVector vector, ColumnVector[] childColumnVectors) { + this.vector = Preconditions.checkNotNull(vector); + this.childBatch = new VectorizedColumnBatch(Preconditions.checkNotNull(childColumnVectors)); + this.reusableRow = new ColumnarRowData(childBatch); + } + + /** {@inheritDoc} */ + @Override + public boolean isNullAt(int i) { + return vector.isNull(i); + } + + /** {@inheritDoc} */ + @Override + public ColumnarRowData getRow(int i) { + reusableRow.setRowId(i); + return reusableRow; + } + + /** + * Replaces the underlying Arrow vector and child field vectors. Used during reader reset to + * point at a new batch without allocating a new wrapper. 
+ * + * @param vector the new Arrow struct vector, must not be null + * @param childColumnVectors the new Flink column vectors for child fields, must not be null + */ + void setVector(StructVector vector, ColumnVector[] childColumnVectors) { + this.vector = Preconditions.checkNotNull(vector); + this.childBatch = new VectorizedColumnBatch(Preconditions.checkNotNull(childColumnVectors)); + this.reusableRow = new ColumnarRowData(childBatch); + } +} diff --git a/docs/reviewhelper/04-nested-types.md b/docs/reviewhelper/04-nested-types.md new file mode 100644 index 000000000..f5a040542 --- /dev/null +++ b/docs/reviewhelper/04-nested-types.md @@ -0,0 +1,30 @@ +# Commit 4: Nested Type Wrappers + +**Commit**: `[AURON #1851] Add nested type Arrow ColumnVector wrappers` + +## What This Commit Does + +Adds 3 wrappers for complex/nested types (Array, Map, Row). These recursively compose child `ColumnVector` wrappers, enabling arbitrarily nested schemas. + +## Files to Review (3) + +| File | Arrow Vector | Flink Interface | Pattern | +|------|-------------|-----------------|---------| +| `ArrowArrayColumnVector` | `ListVector` | `ArrayColumnVector` | Offset buffer + child element vector | +| `ArrowMapColumnVector` | `MapVector` | `MapColumnVector` | Offset buffer + key/value child vectors | +| `ArrowRowColumnVector` | `StructVector` | `RowColumnVector` | VectorizedColumnBatch + reusable ColumnarRowData | + +## What to Look For + +- **Offset calculation** (Array + Map): + ``` + offset = vector.getOffsetBuffer().getInt((long) i * OFFSET_WIDTH) + length = vector.getOffsetBuffer().getInt((long) (i + 1) * OFFSET_WIDTH) - offset + ``` + Standard Arrow list offset protocol. `OFFSET_WIDTH = 4` (inherited from `BaseRepeatedValueVector`). + +- **Map data model**: `ColumnarMapData(keyColumnVector, valueColumnVector, offset, length)` — takes ColumnVectors directly, not ArrayData. + +- **Row reuse**: `getRow(i)` returns the same `ColumnarRowData` instance with `setRowId(i)`. 
Standard Flink pattern — callers must consume before next call. + +- **Constructor parameters**: Array takes `(ListVector, ColumnVector)`, Map takes `(MapVector, ColumnVector, ColumnVector)`, Row takes `(StructVector, ColumnVector[])`. Child vectors are created by `FlinkArrowReader` (commit 5). From b51d7a6759664b94455aa59968921e4e7172a9f0 Mon Sep 17 00:00:00 2001 From: Weiqing Yang Date: Mon, 2 Mar 2026 19:26:08 -0800 Subject: [PATCH 6/8] [AURON #1851] Add FlinkArrowReader orchestrator FlinkArrowReader creates ColumnVector wrappers from VectorSchemaRoot + RowType, provides read(int) returning ColumnarRowData, getRowCount(), reset(VectorSchemaRoot) for batch reuse, and AutoCloseable. Dispatches LogicalTypeRoot to the correct wrapper via switch on all 17 supported types. --- .../auron/flink/arrow/FlinkArrowReader.java | 256 ++++++++++++++++++ docs/reviewhelper/05-reader-orchestrator.md | 29 ++ 2 files changed, 285 insertions(+) create mode 100644 auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowReader.java create mode 100644 docs/reviewhelper/05-reader-orchestrator.md diff --git a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowReader.java b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowReader.java new file mode 100644 index 000000000..ee416984f --- /dev/null +++ b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowReader.java @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.auron.flink.arrow; + +import java.util.List; +import org.apache.arrow.vector.BigIntVector; +import org.apache.arrow.vector.BitVector; +import org.apache.arrow.vector.DateDayVector; +import org.apache.arrow.vector.DecimalVector; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.Float4Vector; +import org.apache.arrow.vector.Float8Vector; +import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.SmallIntVector; +import org.apache.arrow.vector.TimeMicroVector; +import org.apache.arrow.vector.TimeStampVector; +import org.apache.arrow.vector.TinyIntVector; +import org.apache.arrow.vector.VarBinaryVector; +import org.apache.arrow.vector.VarCharVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.complex.ListVector; +import org.apache.arrow.vector.complex.MapVector; +import org.apache.arrow.vector.complex.StructVector; +import org.apache.auron.flink.arrow.vectors.ArrowArrayColumnVector; +import org.apache.auron.flink.arrow.vectors.ArrowBigIntColumnVector; +import org.apache.auron.flink.arrow.vectors.ArrowBooleanColumnVector; +import org.apache.auron.flink.arrow.vectors.ArrowDateColumnVector; +import org.apache.auron.flink.arrow.vectors.ArrowDecimalColumnVector; +import org.apache.auron.flink.arrow.vectors.ArrowDoubleColumnVector; +import org.apache.auron.flink.arrow.vectors.ArrowFloatColumnVector; +import org.apache.auron.flink.arrow.vectors.ArrowIntColumnVector; +import org.apache.auron.flink.arrow.vectors.ArrowMapColumnVector; +import 
org.apache.auron.flink.arrow.vectors.ArrowRowColumnVector; +import org.apache.auron.flink.arrow.vectors.ArrowSmallIntColumnVector; +import org.apache.auron.flink.arrow.vectors.ArrowTimeColumnVector; +import org.apache.auron.flink.arrow.vectors.ArrowTimestampColumnVector; +import org.apache.auron.flink.arrow.vectors.ArrowTinyIntColumnVector; +import org.apache.auron.flink.arrow.vectors.ArrowVarBinaryColumnVector; +import org.apache.auron.flink.arrow.vectors.ArrowVarCharColumnVector; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.columnar.ColumnarRowData; +import org.apache.flink.table.data.columnar.vector.ColumnVector; +import org.apache.flink.table.data.columnar.vector.VectorizedColumnBatch; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.util.Preconditions; + +/** + * Reads Arrow {@link VectorSchemaRoot} data as Flink {@link RowData}. + * + *
<p>
Uses Flink's columnar data structures ({@link ColumnarRowData} backed by {@link + * VectorizedColumnBatch}) for zero-copy access to Arrow vectors. Each Arrow {@link FieldVector} is + * wrapped in a Flink {@link ColumnVector} implementation that delegates reads directly to the + * underlying Arrow vector. + * + *
<p>
Object reuse: {@link #read(int)} returns the same {@link ColumnarRowData} instance with a + * different row ID. Callers must consume or copy the returned row before the next call. This is + * standard Flink practice for columnar readers. + * + *
<p>
Implements {@link AutoCloseable} for use in resource management blocks. Note: this does NOT + * close the underlying {@link VectorSchemaRoot}. The caller that created the root is responsible + * for its lifecycle. + */ +public class FlinkArrowReader implements AutoCloseable { + + private final RowType rowType; + private ColumnVector[] columnVectors; + private VectorizedColumnBatch batch; + private ColumnarRowData reusableRow; + private VectorSchemaRoot root; + + private FlinkArrowReader(ColumnVector[] columnVectors, VectorSchemaRoot root, RowType rowType) { + this.columnVectors = columnVectors; + this.batch = new VectorizedColumnBatch(columnVectors); + this.reusableRow = new ColumnarRowData(batch); + this.root = root; + this.rowType = rowType; + } + + /** + * Creates a {@link FlinkArrowReader} from a {@link VectorSchemaRoot} and {@link RowType}. + * + *
<p>
The RowType must match the schema of the VectorSchemaRoot (same number of fields, matching + * types). Each Arrow field vector is wrapped in the appropriate Flink {@link ColumnVector} + * implementation based on the corresponding Flink {@link LogicalType}. + * + * @param root the Arrow VectorSchemaRoot containing the data + * @param rowType the Flink RowType describing the schema + * @return a new FlinkArrowReader + * @throws IllegalArgumentException if field counts do not match + * @throws UnsupportedOperationException if a LogicalType is not supported + */ + public static FlinkArrowReader create(VectorSchemaRoot root, RowType rowType) { + Preconditions.checkNotNull(root, "root must not be null"); + Preconditions.checkNotNull(rowType, "rowType must not be null"); + List fieldVectors = root.getFieldVectors(); + List fields = rowType.getFields(); + if (fieldVectors.size() != fields.size()) { + throw new IllegalArgumentException( + "VectorSchemaRoot has " + fieldVectors.size() + " fields but RowType has " + fields.size()); + } + ColumnVector[] columns = new ColumnVector[fieldVectors.size()]; + for (int i = 0; i < fieldVectors.size(); i++) { + columns[i] = createColumnVector(fieldVectors.get(i), fields.get(i).getType()); + } + return new FlinkArrowReader(columns, root, rowType); + } + + /** + * Reads a row at the given position. Returns a reused {@link RowData} object — callers must not + * hold references across calls. + * + * @param rowId the row index within the current batch + * @return the row data at the given position + */ + public RowData read(int rowId) { + reusableRow.setRowId(rowId); + return reusableRow; + } + + /** + * Returns the number of rows in the current batch. + * + * @return the row count + */ + public int getRowCount() { + return root.getRowCount(); + } + + /** + * Resets the reader to use a new {@link VectorSchemaRoot} with the same schema. Recreates + * column vector wrappers for the new root's field vectors. + * + *
<p>
The new root must have the same schema (same number and types of fields) as the original. + * + * @param newRoot the new VectorSchemaRoot, must not be null + */ + public void reset(VectorSchemaRoot newRoot) { + Preconditions.checkNotNull(newRoot, "newRoot must not be null"); + this.root = newRoot; + List newVectors = newRoot.getFieldVectors(); + Preconditions.checkArgument( + newVectors.size() == columnVectors.length, + "New root has %s fields but reader expects %s", + newVectors.size(), + columnVectors.length); + List fields = rowType.getFields(); + for (int i = 0; i < columnVectors.length; i++) { + columnVectors[i] = + createColumnVector(newVectors.get(i), fields.get(i).getType()); + } + this.batch = new VectorizedColumnBatch(columnVectors); + this.reusableRow = new ColumnarRowData(batch); + } + + /** + * Implements {@link AutoCloseable} for use in resource management blocks. Note: this does NOT + * close the underlying {@link VectorSchemaRoot}. The caller that created the root is responsible + * for its lifecycle. + */ + @Override + public void close() { + // Reader is a view; root lifecycle managed by caller. + } + + /** + * Creates the appropriate Flink {@link ColumnVector} wrapper for the given Arrow {@link + * FieldVector} and Flink {@link LogicalType}. 
+ */ + static ColumnVector createColumnVector(FieldVector vector, LogicalType type) { + switch (type.getTypeRoot()) { + case BOOLEAN: + return new ArrowBooleanColumnVector((BitVector) vector); + case TINYINT: + return new ArrowTinyIntColumnVector((TinyIntVector) vector); + case SMALLINT: + return new ArrowSmallIntColumnVector((SmallIntVector) vector); + case INTEGER: + return new ArrowIntColumnVector((IntVector) vector); + case BIGINT: + return new ArrowBigIntColumnVector((BigIntVector) vector); + case FLOAT: + return new ArrowFloatColumnVector((Float4Vector) vector); + case DOUBLE: + return new ArrowDoubleColumnVector((Float8Vector) vector); + case VARCHAR: + case CHAR: + return new ArrowVarCharColumnVector((VarCharVector) vector); + case VARBINARY: + case BINARY: + return new ArrowVarBinaryColumnVector((VarBinaryVector) vector); + case DECIMAL: + return new ArrowDecimalColumnVector((DecimalVector) vector); + case DATE: + return new ArrowDateColumnVector((DateDayVector) vector); + case TIME_WITHOUT_TIME_ZONE: + // The writer (FlinkArrowFieldWriter) normalizes all TIME values to microseconds + // in a TimeMicroVector, regardless of the declared Flink TIME precision. + return new ArrowTimeColumnVector((TimeMicroVector) vector); + case TIMESTAMP_WITHOUT_TIME_ZONE: + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + // The writer normalizes all timestamps to microseconds. TimeStampVector is the + // common parent of TimeStampMicroVector and TimeStampMicroTZVector. 
+ return new ArrowTimestampColumnVector((TimeStampVector) vector); + case ARRAY: + return createArrayColumnVector((ListVector) vector, (ArrayType) type); + case MAP: + return createMapColumnVector((MapVector) vector, (MapType) type); + case ROW: + return createRowColumnVector((StructVector) vector, (RowType) type); + default: + throw new UnsupportedOperationException( + "Unsupported Flink type for Arrow reader: " + type.asSummaryString()); + } + } + + private static ColumnVector createArrayColumnVector(ListVector vector, ArrayType arrayType) { + ColumnVector elementVector = createColumnVector(vector.getDataVector(), arrayType.getElementType()); + return new ArrowArrayColumnVector(vector, elementVector); + } + + private static ColumnVector createMapColumnVector(MapVector vector, MapType mapType) { + StructVector entriesVector = (StructVector) vector.getDataVector(); + ColumnVector keyVector = createColumnVector(entriesVector.getChild(MapVector.KEY_NAME), mapType.getKeyType()); + ColumnVector valueVector = + createColumnVector(entriesVector.getChild(MapVector.VALUE_NAME), mapType.getValueType()); + return new ArrowMapColumnVector(vector, keyVector, valueVector); + } + + private static ColumnVector createRowColumnVector(StructVector vector, RowType rowType) { + List childVectors = vector.getChildrenFromFields(); + List fields = rowType.getFields(); + ColumnVector[] childColumns = new ColumnVector[childVectors.size()]; + for (int i = 0; i < childVectors.size(); i++) { + childColumns[i] = + createColumnVector(childVectors.get(i), fields.get(i).getType()); + } + return new ArrowRowColumnVector(vector, childColumns); + } +} diff --git a/docs/reviewhelper/05-reader-orchestrator.md b/docs/reviewhelper/05-reader-orchestrator.md new file mode 100644 index 000000000..3f42c6ae0 --- /dev/null +++ b/docs/reviewhelper/05-reader-orchestrator.md @@ -0,0 +1,29 @@ +# Commit 5: FlinkArrowReader Orchestrator + +**Commit**: `[AURON #1851] Add FlinkArrowReader orchestrator` + +## What 
This Commit Does + +Adds the central `FlinkArrowReader` class that ties all 16 wrappers together. It creates the appropriate `ColumnVector` wrapper for each field based on the Flink `LogicalType`, and provides the public API for reading rows. + +## File to Review (1) + +`auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowReader.java` + +## Public API + +| Method | Purpose | +|--------|---------| +| `create(VectorSchemaRoot, RowType)` | Factory — creates wrappers, validates field counts | +| `read(int rowId)` | Returns reusable `ColumnarRowData` at given row position | +| `getRowCount()` | Delegates to `root.getRowCount()` | +| `reset(VectorSchemaRoot)` | Swaps to new batch with same schema | +| `close()` | No-op — reader is a view, caller manages root lifecycle | + +## What to Look For + +- **`createColumnVector` switch** (key review point): Dispatches `LogicalTypeRoot` → correct wrapper + Arrow vector cast. Verify every type from `FlinkArrowUtils.toArrowType()` is covered. +- **TIME/TIMESTAMP comments**: Documents that the writer normalizes to microseconds, so the reader always expects `TimeMicroVector` / `TimeStampVector`. +- **Nested type factories**: `createArrayColumnVector`, `createMapColumnVector`, `createRowColumnVector` recursively call `createColumnVector` for child types. +- **Null checks**: `Preconditions.checkNotNull` on `create()` and `reset()` params. `reset()` also validates field count with `checkArgument`. +- **`reset()` approach**: Recreates all column vectors using stored `RowType` rather than per-type vector swapping. Simpler, runs once per batch (not per row). 
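As an aside for reviewers, the zero-copy delegation behind these wrappers (Approach A in the design doc) can be sketched in plain Java. `IntColumn` and `ArrayIntColumn` below are hypothetical stand-ins for Flink's `IntColumnVector` sub-interface and the `ArrowIntColumnVector` wrapper — not the actual Auron/Flink classes — with plain arrays standing in for Arrow buffers:

```java
// Minimal sketch of the Approach A wrapper pattern (hypothetical names):
// reads delegate straight to the backing column storage, so no per-row
// objects or data copies are created.
public class WrapperSketch {
    // Analogue of a Flink ColumnVector sub-interface such as IntColumnVector.
    interface IntColumn {
        boolean isNullAt(int row);
        int getInt(int row);
    }

    // Analogue of ArrowIntColumnVector: a view over the column, not a copy.
    static final class ArrayIntColumn implements IntColumn {
        private final int[] values;     // stands in for the Arrow data buffer
        private final boolean[] nulls;  // stands in for the validity bitmap

        ArrayIntColumn(int[] values, boolean[] nulls) {
            this.values = values;
            this.nulls = nulls;
        }

        @Override
        public boolean isNullAt(int row) {
            return nulls[row];
        }

        @Override
        public int getInt(int row) {
            return values[row]; // direct delegation — zero copying
        }
    }

    public static void main(String[] args) {
        IntColumn col = new ArrayIntColumn(
                new int[] {42, 0, 7}, new boolean[] {false, true, false});
        System.out.println(col.getInt(0) + " " + col.isNullAt(1) + " " + col.getInt(2));
        // prints "42 true 7"
    }
}
```

The real `ColumnarRowData` plays the same role one level up: a reusable row view over these column views, so `read(rowId)` allocates nothing per row.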
From ffcf6d6e5339a1c3a3866fb37e96adf7a1e135bb Mon Sep 17 00:00:00 2001 From: Weiqing Yang Date: Mon, 2 Mar 2026 19:34:22 -0800 Subject: [PATCH 7/8] [AURON #1851] Add FlinkArrowReader unit tests 21 test methods covering all 17 supported types with values and null handling, plus integration tests for multi-column batch, empty batch, reader reset, and unsupported type exception. --- .../flink/arrow/FlinkArrowReaderTest.java | 714 ++++++++++++++++++ .../PR-AURON-1851/AURON-1851-DESIGN.md | 0 docs/reviewhelper/06-unit-tests.md | 48 ++ 3 files changed, 762 insertions(+) create mode 100644 auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/FlinkArrowReaderTest.java rename AURON-1851-DESIGN.md => docs/PR-AURON-1851/AURON-1851-DESIGN.md (100%) create mode 100644 docs/reviewhelper/06-unit-tests.md diff --git a/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/FlinkArrowReaderTest.java b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/FlinkArrowReaderTest.java new file mode 100644 index 000000000..e468e4cb8 --- /dev/null +++ b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/FlinkArrowReaderTest.java @@ -0,0 +1,714 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.auron.flink.arrow; + +import static org.junit.jupiter.api.Assertions.*; + +import java.math.BigDecimal; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import org.apache.arrow.memory.ArrowBuf; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.BigIntVector; +import org.apache.arrow.vector.BitVector; +import org.apache.arrow.vector.DateDayVector; +import org.apache.arrow.vector.DecimalVector; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.Float4Vector; +import org.apache.arrow.vector.Float8Vector; +import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.SmallIntVector; +import org.apache.arrow.vector.TimeMicroVector; +import org.apache.arrow.vector.TimeStampMicroTZVector; +import org.apache.arrow.vector.TimeStampMicroVector; +import org.apache.arrow.vector.TinyIntVector; +import org.apache.arrow.vector.VarBinaryVector; +import org.apache.arrow.vector.VarCharVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.complex.ListVector; +import org.apache.arrow.vector.complex.MapVector; +import org.apache.arrow.vector.complex.StructVector; +import org.apache.arrow.vector.complex.impl.UnionListWriter; +import org.apache.arrow.vector.complex.writer.BaseWriter; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.FieldType; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.MapData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.logical.ArrayType; +import 
org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.table.types.logical.BooleanType; +import org.apache.flink.table.types.logical.DateType; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.DoubleType; +import org.apache.flink.table.types.logical.FloatType; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.LocalZonedTimestampType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.RawType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.SmallIntType; +import org.apache.flink.table.types.logical.TimeType; +import org.apache.flink.table.types.logical.TimestampType; +import org.apache.flink.table.types.logical.TinyIntType; +import org.apache.flink.table.types.logical.VarBinaryType; +import org.apache.flink.table.types.logical.VarCharType; +import org.junit.jupiter.api.Test; + +/** Unit tests for {@link FlinkArrowReader}. 
*/ +public class FlinkArrowReaderTest { + + @Test + public void testBooleanVector() { + try (BufferAllocator allocator = + FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testBoolean", 0, Long.MAX_VALUE)) { + BitVector bitVector = new BitVector("col", allocator); + bitVector.allocateNew(3); + bitVector.setSafe(0, 1); + bitVector.setNull(1); + bitVector.setSafe(2, 0); + bitVector.setValueCount(3); + + VectorSchemaRoot root = new VectorSchemaRoot(Collections.singletonList(bitVector)); + RowType rowType = RowType.of(new BooleanType()); + FlinkArrowReader reader = FlinkArrowReader.create(root, rowType); + + assertEquals(3, reader.getRowCount()); + assertTrue(reader.read(0).getBoolean(0)); + assertTrue(reader.read(1).isNullAt(0)); + assertFalse(reader.read(2).getBoolean(0)); + + reader.close(); + root.close(); + } + } + + @Test + public void testTinyIntVector() { + try (BufferAllocator allocator = + FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testTinyInt", 0, Long.MAX_VALUE)) { + TinyIntVector vec = new TinyIntVector("col", allocator); + vec.allocateNew(3); + vec.setSafe(0, 1); + vec.setNull(1); + vec.setSafe(2, -1); + vec.setValueCount(3); + + VectorSchemaRoot root = new VectorSchemaRoot(Collections.singletonList(vec)); + RowType rowType = RowType.of(new TinyIntType()); + FlinkArrowReader reader = FlinkArrowReader.create(root, rowType); + + assertEquals((byte) 1, reader.read(0).getByte(0)); + assertTrue(reader.read(1).isNullAt(0)); + assertEquals((byte) -1, reader.read(2).getByte(0)); + + reader.close(); + root.close(); + } + } + + @Test + public void testSmallIntVector() { + try (BufferAllocator allocator = + FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testSmallInt", 0, Long.MAX_VALUE)) { + SmallIntVector vec = new SmallIntVector("col", allocator); + vec.allocateNew(3); + vec.setSafe(0, 100); + vec.setNull(1); + vec.setSafe(2, -100); + vec.setValueCount(3); + + VectorSchemaRoot root = new VectorSchemaRoot(Collections.singletonList(vec)); + RowType 
rowType = RowType.of(new SmallIntType()); + FlinkArrowReader reader = FlinkArrowReader.create(root, rowType); + + assertEquals((short) 100, reader.read(0).getShort(0)); + assertTrue(reader.read(1).isNullAt(0)); + assertEquals((short) -100, reader.read(2).getShort(0)); + + reader.close(); + root.close(); + } + } + + @Test + public void testIntVector() { + try (BufferAllocator allocator = + FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testInt", 0, Long.MAX_VALUE)) { + IntVector vec = new IntVector("col", allocator); + vec.allocateNew(3); + vec.setSafe(0, 42); + vec.setNull(1); + vec.setSafe(2, Integer.MAX_VALUE); + vec.setValueCount(3); + + VectorSchemaRoot root = new VectorSchemaRoot(Collections.singletonList(vec)); + RowType rowType = RowType.of(new IntType()); + FlinkArrowReader reader = FlinkArrowReader.create(root, rowType); + + assertEquals(42, reader.read(0).getInt(0)); + assertTrue(reader.read(1).isNullAt(0)); + assertEquals(Integer.MAX_VALUE, reader.read(2).getInt(0)); + + reader.close(); + root.close(); + } + } + + @Test + public void testBigIntVector() { + try (BufferAllocator allocator = + FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testBigInt", 0, Long.MAX_VALUE)) { + BigIntVector vec = new BigIntVector("col", allocator); + vec.allocateNew(3); + vec.setSafe(0, Long.MAX_VALUE); + vec.setNull(1); + vec.setSafe(2, -1L); + vec.setValueCount(3); + + VectorSchemaRoot root = new VectorSchemaRoot(Collections.singletonList(vec)); + RowType rowType = RowType.of(new BigIntType()); + FlinkArrowReader reader = FlinkArrowReader.create(root, rowType); + + assertEquals(Long.MAX_VALUE, reader.read(0).getLong(0)); + assertTrue(reader.read(1).isNullAt(0)); + assertEquals(-1L, reader.read(2).getLong(0)); + + reader.close(); + root.close(); + } + } + + @Test + public void testFloatVector() { + try (BufferAllocator allocator = + FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testFloat", 0, Long.MAX_VALUE)) { + Float4Vector vec = new Float4Vector("col", 
allocator); + vec.allocateNew(3); + vec.setSafe(0, 3.14f); + vec.setNull(1); + vec.setSafe(2, Float.NaN); + vec.setValueCount(3); + + VectorSchemaRoot root = new VectorSchemaRoot(Collections.singletonList(vec)); + RowType rowType = RowType.of(new FloatType()); + FlinkArrowReader reader = FlinkArrowReader.create(root, rowType); + + assertEquals(3.14f, reader.read(0).getFloat(0), 0.001f); + assertTrue(reader.read(1).isNullAt(0)); + assertTrue(Float.isNaN(reader.read(2).getFloat(0))); + + reader.close(); + root.close(); + } + } + + @Test + public void testDoubleVector() { + try (BufferAllocator allocator = + FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testDouble", 0, Long.MAX_VALUE)) { + Float8Vector vec = new Float8Vector("col", allocator); + vec.allocateNew(3); + vec.setSafe(0, 2.718); + vec.setNull(1); + vec.setSafe(2, Double.MAX_VALUE); + vec.setValueCount(3); + + VectorSchemaRoot root = new VectorSchemaRoot(Collections.singletonList(vec)); + RowType rowType = RowType.of(new DoubleType()); + FlinkArrowReader reader = FlinkArrowReader.create(root, rowType); + + assertEquals(2.718, reader.read(0).getDouble(0), 0.001); + assertTrue(reader.read(1).isNullAt(0)); + assertEquals(Double.MAX_VALUE, reader.read(2).getDouble(0)); + + reader.close(); + root.close(); + } + } + + @Test + public void testVarCharVector() { + try (BufferAllocator allocator = + FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testVarChar", 0, Long.MAX_VALUE)) { + VarCharVector vec = new VarCharVector("col", allocator); + vec.allocateNew(3); + vec.setSafe(0, "hello".getBytes(StandardCharsets.UTF_8)); + vec.setNull(1); + vec.setSafe(2, "".getBytes(StandardCharsets.UTF_8)); + vec.setValueCount(3); + + VectorSchemaRoot root = new VectorSchemaRoot(Collections.singletonList(vec)); + RowType rowType = RowType.of(new VarCharType(100)); + FlinkArrowReader reader = FlinkArrowReader.create(root, rowType); + + RowData row0 = reader.read(0); + assertArrayEquals( + 
"hello".getBytes(StandardCharsets.UTF_8), row0.getString(0).toBytes()); + assertTrue(reader.read(1).isNullAt(0)); + assertEquals(0, reader.read(2).getString(0).toBytes().length); + + reader.close(); + root.close(); + } + } + + @Test + public void testVarBinaryVector() { + try (BufferAllocator allocator = + FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testVarBinary", 0, Long.MAX_VALUE)) { + VarBinaryVector vec = new VarBinaryVector("col", allocator); + vec.allocateNew(3); + vec.setSafe(0, new byte[] {0x01, 0x02}); + vec.setNull(1); + vec.setSafe(2, new byte[] {}); + vec.setValueCount(3); + + VectorSchemaRoot root = new VectorSchemaRoot(Collections.singletonList(vec)); + RowType rowType = RowType.of(new VarBinaryType(100)); + FlinkArrowReader reader = FlinkArrowReader.create(root, rowType); + + assertArrayEquals(new byte[] {0x01, 0x02}, reader.read(0).getBinary(0)); + assertTrue(reader.read(1).isNullAt(0)); + assertArrayEquals(new byte[] {}, reader.read(2).getBinary(0)); + + reader.close(); + root.close(); + } + } + + @Test + public void testDecimalVector() { + try (BufferAllocator allocator = + FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testDecimal", 0, Long.MAX_VALUE)) { + // Compact path: precision 10 (<= 18) + DecimalVector compactVec = new DecimalVector("compact", allocator, 10, 2); + compactVec.allocateNew(2); + compactVec.setSafe(0, new BigDecimal("123.45")); + compactVec.setNull(1); + compactVec.setValueCount(2); + + VectorSchemaRoot compactRoot = new VectorSchemaRoot(Collections.singletonList(compactVec)); + RowType compactType = RowType.of(new DecimalType(10, 2)); + FlinkArrowReader compactReader = FlinkArrowReader.create(compactRoot, compactType); + + DecimalData compactVal = compactReader.read(0).getDecimal(0, 10, 2); + assertEquals(new BigDecimal("123.45"), compactVal.toBigDecimal()); + assertTrue(compactReader.read(1).isNullAt(0)); + + compactReader.close(); + compactRoot.close(); + + // Wide path: precision 20 (> 18) + DecimalVector wideVec 
= new DecimalVector("wide", allocator, 20, 2); + wideVec.allocateNew(1); + wideVec.setSafe(0, new BigDecimal("123456789012345678.90")); + wideVec.setValueCount(1); + + VectorSchemaRoot wideRoot = new VectorSchemaRoot(Collections.singletonList(wideVec)); + RowType wideType = RowType.of(new DecimalType(20, 2)); + FlinkArrowReader wideReader = FlinkArrowReader.create(wideRoot, wideType); + + DecimalData wideVal = wideReader.read(0).getDecimal(0, 20, 2); + assertEquals(0, new BigDecimal("123456789012345678.90").compareTo(wideVal.toBigDecimal())); + + wideReader.close(); + wideRoot.close(); + } + } + + @Test + public void testDateVector() { + try (BufferAllocator allocator = + FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testDate", 0, Long.MAX_VALUE)) { + DateDayVector vec = new DateDayVector("col", allocator); + vec.allocateNew(3); + vec.setSafe(0, 18000); + vec.setNull(1); + vec.setSafe(2, 0); + vec.setValueCount(3); + + VectorSchemaRoot root = new VectorSchemaRoot(Collections.singletonList(vec)); + RowType rowType = RowType.of(new DateType()); + FlinkArrowReader reader = FlinkArrowReader.create(root, rowType); + + assertEquals(18000, reader.read(0).getInt(0)); + assertTrue(reader.read(1).isNullAt(0)); + assertEquals(0, reader.read(2).getInt(0)); + + reader.close(); + root.close(); + } + } + + @Test + public void testTimeVector() { + try (BufferAllocator allocator = + FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testTime", 0, Long.MAX_VALUE)) { + TimeMicroVector vec = new TimeMicroVector("col", allocator); + vec.allocateNew(3); + vec.setSafe(0, 45_296_000_000L); // 45296000000 micros -> 45296000 millis + vec.setNull(1); + vec.setSafe(2, 0L); + vec.setValueCount(3); + + VectorSchemaRoot root = new VectorSchemaRoot(Collections.singletonList(vec)); + RowType rowType = RowType.of(new TimeType(6)); + FlinkArrowReader reader = FlinkArrowReader.create(root, rowType); + + // ArrowTimeColumnVector divides micros by 1000 to get millis + assertEquals(45_296_000, 
reader.read(0).getInt(0)); + assertTrue(reader.read(1).isNullAt(0)); + assertEquals(0, reader.read(2).getInt(0)); + + reader.close(); + root.close(); + } + } + + @Test + public void testTimestampVector() { + try (BufferAllocator allocator = + FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testTimestamp", 0, Long.MAX_VALUE)) { + TimeStampMicroVector vec = new TimeStampMicroVector("col", allocator); + vec.allocateNew(2); + vec.setSafe(0, 1_672_531_200_000_123L); // 2023-01-01T00:00:00.000123 + vec.setNull(1); + vec.setValueCount(2); + + VectorSchemaRoot root = new VectorSchemaRoot(Collections.singletonList(vec)); + RowType rowType = RowType.of(new TimestampType(6)); + FlinkArrowReader reader = FlinkArrowReader.create(root, rowType); + + TimestampData ts = reader.read(0).getTimestamp(0, 6); + // millis = 1672531200000123 / 1000 = 1672531200000 + assertEquals(1_672_531_200_000L, ts.getMillisecond()); + // nanoOfMillisecond = (1672531200000123 % 1000) * 1000 = 123 * 1000 = 123000 + assertEquals(123_000, ts.getNanoOfMillisecond()); + assertTrue(reader.read(1).isNullAt(0)); + + reader.close(); + root.close(); + } + } + + @Test + public void testTimestampLtzVector() { + try (BufferAllocator allocator = + FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testTimestampLtz", 0, Long.MAX_VALUE)) { + TimeStampMicroTZVector vec = new TimeStampMicroTZVector("col", allocator, "UTC"); + vec.allocateNew(2); + vec.setSafe(0, 1_672_531_200_000_123L); + vec.setNull(1); + vec.setValueCount(2); + + VectorSchemaRoot root = new VectorSchemaRoot(Collections.singletonList(vec)); + RowType rowType = RowType.of(new LocalZonedTimestampType(6)); + FlinkArrowReader reader = FlinkArrowReader.create(root, rowType); + + TimestampData ts = reader.read(0).getTimestamp(0, 6); + assertEquals(1_672_531_200_000L, ts.getMillisecond()); + assertEquals(123_000, ts.getNanoOfMillisecond()); + assertTrue(reader.read(1).isNullAt(0)); + + reader.close(); + root.close(); + } + } + + @Test + public void 
testArrayVector() { + try (BufferAllocator allocator = + FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testArray", 0, Long.MAX_VALUE)) { + ListVector listVector = ListVector.empty("col", allocator); + listVector.addOrGetVector(FieldType.nullable(new ArrowType.Int(32, true))); + listVector.allocateNew(); + + UnionListWriter writer = listVector.getWriter(); + // Row 0: [1, 2, 3] + writer.setPosition(0); + writer.startList(); + writer.writeInt(1); + writer.writeInt(2); + writer.writeInt(3); + writer.endList(); + // Row 1: null (set after writer completes) + writer.setPosition(1); + writer.startList(); + writer.endList(); + // Row 2: [] + writer.setPosition(2); + writer.startList(); + writer.endList(); + writer.setValueCount(3); + listVector.setNull(1); + + VectorSchemaRoot root = new VectorSchemaRoot(Collections.singletonList(listVector)); + RowType rowType = RowType.of(new ArrayType(new IntType())); + FlinkArrowReader reader = FlinkArrowReader.create(root, rowType); + + // Row 0: [1, 2, 3] + ArrayData arr0 = reader.read(0).getArray(0); + assertEquals(3, arr0.size()); + assertEquals(1, arr0.getInt(0)); + assertEquals(2, arr0.getInt(1)); + assertEquals(3, arr0.getInt(2)); + + // Row 1: null + assertTrue(reader.read(1).isNullAt(0)); + + // Row 2: [] + ArrayData arr2 = reader.read(2).getArray(0); + assertEquals(0, arr2.size()); + + reader.close(); + root.close(); + } + } + + @Test + public void testMapVector() { + try (BufferAllocator allocator = + FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testMap", 0, Long.MAX_VALUE)) { + MapVector mapVector = MapVector.empty("col", allocator, false); + mapVector.allocateNew(); + + BaseWriter.MapWriter mapWriter = mapVector.getWriter(); + // Row 0: {"a" -> 1} + mapWriter.setPosition(0); + mapWriter.startMap(); + mapWriter.startEntry(); + byte[] keyBytes = "a".getBytes(StandardCharsets.UTF_8); + ArrowBuf keyBuf = allocator.buffer(keyBytes.length); + keyBuf.setBytes(0, keyBytes); + mapWriter.key().varChar().writeVarChar(0, 
keyBytes.length, keyBuf); + keyBuf.close(); + mapWriter.value().integer().writeInt(1); + mapWriter.endEntry(); + mapWriter.endMap(); + // Row 1: null (write empty map, then mark null) + mapWriter.setPosition(1); + mapWriter.startMap(); + mapWriter.endMap(); + // Row 2: {} (empty map) + mapWriter.setPosition(2); + mapWriter.startMap(); + mapWriter.endMap(); + mapVector.setValueCount(3); + mapVector.setNull(1); + + VectorSchemaRoot root = new VectorSchemaRoot(Collections.singletonList(mapVector)); + RowType rowType = RowType.of(new MapType(new VarCharType(100), new IntType())); + FlinkArrowReader reader = FlinkArrowReader.create(root, rowType); + + // Row 0: {"a" -> 1} + MapData map0 = reader.read(0).getMap(0); + assertEquals(1, map0.size()); + + // Row 1: null + assertTrue(reader.read(1).isNullAt(0)); + + // Row 2: empty + MapData map2 = reader.read(2).getMap(0); + assertEquals(0, map2.size()); + + reader.close(); + root.close(); + } + } + + @Test + public void testRowVector() { + try (BufferAllocator allocator = + FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testRow", 0, Long.MAX_VALUE)) { + FieldType intFieldType = FieldType.nullable(new ArrowType.Int(32, true)); + FieldType utf8FieldType = FieldType.nullable(ArrowType.Utf8.INSTANCE); + StructVector structVector = StructVector.empty("col", allocator); + structVector.addOrGet("f0", intFieldType, IntVector.class); + structVector.addOrGet("f1", utf8FieldType, VarCharVector.class); + + IntVector intChild = (IntVector) structVector.getChild("f0"); + VarCharVector strChild = (VarCharVector) structVector.getChild("f1"); + + structVector.allocateNew(); + structVector.setIndexDefined(0); + intChild.setSafe(0, 42); + strChild.setSafe(0, "hello".getBytes(StandardCharsets.UTF_8)); + structVector.setNull(1); + intChild.setNull(1); + strChild.setNull(1); + structVector.setValueCount(2); + intChild.setValueCount(2); + strChild.setValueCount(2); + + VectorSchemaRoot root = new 
VectorSchemaRoot(Collections.singletonList(structVector)); + RowType innerType = + RowType.of(new LogicalType[] {new IntType(), new VarCharType(100)}, new String[] {"f0", "f1"}); + RowType rowType = RowType.of(new LogicalType[] {innerType}, new String[] {"col"}); + FlinkArrowReader reader = FlinkArrowReader.create(root, rowType); + + // Row 0: struct(42, "hello") + RowData nested0 = reader.read(0).getRow(0, 2); + assertEquals(42, nested0.getInt(0)); + assertArrayEquals( + "hello".getBytes(StandardCharsets.UTF_8), + nested0.getString(1).toBytes()); + + // Row 1: null struct + assertTrue(reader.read(1).isNullAt(0)); + + reader.close(); + root.close(); + } + } + + @Test + public void testMultiColumnBatch() { + try (BufferAllocator allocator = + FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testMultiCol", 0, Long.MAX_VALUE)) { + IntVector intVec = new IntVector("id", allocator); + intVec.allocateNew(3); + intVec.setSafe(0, 1); + intVec.setSafe(1, 2); + intVec.setSafe(2, 3); + intVec.setValueCount(3); + + VarCharVector strVec = new VarCharVector("name", allocator); + strVec.allocateNew(3); + strVec.setSafe(0, "alice".getBytes(StandardCharsets.UTF_8)); + strVec.setSafe(1, "bob".getBytes(StandardCharsets.UTF_8)); + strVec.setNull(2); + strVec.setValueCount(3); + + BitVector boolVec = new BitVector("active", allocator); + boolVec.allocateNew(3); + boolVec.setSafe(0, 1); + boolVec.setSafe(1, 0); + boolVec.setSafe(2, 1); + boolVec.setValueCount(3); + + List vectors = Arrays.asList(intVec, strVec, boolVec); + VectorSchemaRoot root = new VectorSchemaRoot(vectors); + RowType rowType = RowType.of( + new LogicalType[] {new IntType(), new VarCharType(100), new BooleanType()}, + new String[] {"id", "name", "active"}); + FlinkArrowReader reader = FlinkArrowReader.create(root, rowType); + + assertEquals(3, reader.getRowCount()); + + RowData row0 = reader.read(0); + assertEquals(1, row0.getInt(0)); + assertArrayEquals( + "alice".getBytes(StandardCharsets.UTF_8), 
row0.getString(1).toBytes()); + assertTrue(row0.getBoolean(2)); + + RowData row1 = reader.read(1); + assertEquals(2, row1.getInt(0)); + assertArrayEquals( + "bob".getBytes(StandardCharsets.UTF_8), row1.getString(1).toBytes()); + assertFalse(row1.getBoolean(2)); + + RowData row2 = reader.read(2); + assertEquals(3, row2.getInt(0)); + assertTrue(row2.isNullAt(1)); + assertTrue(row2.getBoolean(2)); + + reader.close(); + root.close(); + } + } + + @Test + public void testEmptyBatch() { + try (BufferAllocator allocator = + FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testEmpty", 0, Long.MAX_VALUE)) { + IntVector vec = new IntVector("col", allocator); + vec.allocateNew(0); + vec.setValueCount(0); + + VectorSchemaRoot root = new VectorSchemaRoot(Collections.singletonList(vec)); + RowType rowType = RowType.of(new IntType()); + FlinkArrowReader reader = FlinkArrowReader.create(root, rowType); + + assertEquals(0, reader.getRowCount()); + + reader.close(); + root.close(); + } + } + + @Test + public void testResetWithNewRoot() { + try (BufferAllocator allocator = + FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testReset", 0, Long.MAX_VALUE)) { + // Batch 1 + IntVector vec1 = new IntVector("col", allocator); + vec1.allocateNew(2); + vec1.setSafe(0, 10); + vec1.setSafe(1, 20); + vec1.setValueCount(2); + VectorSchemaRoot root1 = new VectorSchemaRoot(Collections.singletonList(vec1)); + + RowType rowType = RowType.of(new IntType()); + FlinkArrowReader reader = FlinkArrowReader.create(root1, rowType); + + assertEquals(10, reader.read(0).getInt(0)); + assertEquals(20, reader.read(1).getInt(0)); + + // Batch 2 — different values, same schema + IntVector vec2 = new IntVector("col", allocator); + vec2.allocateNew(2); + vec2.setSafe(0, 99); + vec2.setSafe(1, 100); + vec2.setValueCount(2); + VectorSchemaRoot root2 = new VectorSchemaRoot(Collections.singletonList(vec2)); + + reader.reset(root2); + assertEquals(2, reader.getRowCount()); + assertEquals(99, reader.read(0).getInt(0)); + 
assertEquals(100, reader.read(1).getInt(0)); + + reader.close(); + root1.close(); + root2.close(); + } + } + + @Test + public void testUnsupportedTypeThrows() { + try (BufferAllocator allocator = + FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testUnsupported", 0, Long.MAX_VALUE)) { + IntVector vec = new IntVector("col", allocator); + vec.allocateNew(1); + vec.setSafe(0, 1); + vec.setValueCount(1); + + assertThrows( + UnsupportedOperationException.class, + () -> FlinkArrowReader.createColumnVector( + vec, new RawType<>(String.class, StringSerializer.INSTANCE))); + + vec.close(); + } + } +} diff --git a/AURON-1851-DESIGN.md b/docs/PR-AURON-1851/AURON-1851-DESIGN.md similarity index 100% rename from AURON-1851-DESIGN.md rename to docs/PR-AURON-1851/AURON-1851-DESIGN.md diff --git a/docs/reviewhelper/06-unit-tests.md b/docs/reviewhelper/06-unit-tests.md new file mode 100644 index 000000000..83e710e3d --- /dev/null +++ b/docs/reviewhelper/06-unit-tests.md @@ -0,0 +1,48 @@ +# Commit 6: Unit Tests + +**Commit**: `[AURON #1851] Add FlinkArrowReader unit tests` + +## What This Commit Does + +Adds `FlinkArrowReaderTest` with 21 test methods covering all 17 supported types, null handling, edge cases, and integration scenarios. 
+ +## File to Review (1) + +`auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/FlinkArrowReaderTest.java` + +## Test Coverage + +### Per-type tests (17 tests — values + nulls for each type) +| Test | Type | Key Assertions | +|------|------|----------------| +| `testBooleanVector` | BOOLEAN | true/false via `getBoolean`, null via `isNullAt` | +| `testTinyIntVector` | TINYINT | byte values + null | +| `testSmallIntVector` | SMALLINT | short values + null | +| `testIntVector` | INTEGER | int values including MAX_VALUE + null | +| `testBigIntVector` | BIGINT | long values including MAX_VALUE + null | +| `testFloatVector` | FLOAT | float values including NaN + null | +| `testDoubleVector` | DOUBLE | double values including MAX_VALUE + null | +| `testVarCharVector` | VARCHAR | string bytes + null + empty string | +| `testVarBinaryVector` | VARBINARY | binary bytes + null + empty | +| `testDecimalVector` | DECIMAL | compact path (p=10) + wide path (p=20) + null | +| `testDateVector` | DATE | epoch days + null | +| `testTimeVector` | TIME | micros→millis conversion + null | +| `testTimestampVector` | TIMESTAMP | micros→TimestampData (millis + nanos) + null | +| `testTimestampLtzVector` | TIMESTAMP_LTZ | same as above with timezone | +| `testArrayVector` | ARRAY | nested int array + null + empty | +| `testMapVector` | MAP | key-value pairs + null + empty | +| `testRowVector` | ROW | nested struct + null | + +### Integration tests (4 tests) +| Test | What It Validates | +|------|-------------------| +| `testMultiColumnBatch` | 3-column (int, varchar, boolean) batch, 3 rows | +| `testEmptyBatch` | 0-row batch → `getRowCount() == 0` | +| `testResetWithNewRoot` | `reset()` swaps batch, verifies new values | +| `testUnsupportedTypeThrows` | `RawType` → `UnsupportedOperationException` | + +## What to Look For + +- Each test creates Arrow vectors manually, populates values, wraps in `VectorSchemaRoot`, creates `FlinkArrowReader`, and asserts read values match +- 
Resource cleanup: `try-with-resources` for `BufferAllocator`, explicit `close()` for reader and root +- Decimal test verifies both compact (p≤18, uses `fromUnscaledLong`) and wide (p>18, uses `fromBigDecimal`) paths From 68584dacb097fbbcb9f28acfe45e336c6df275ee Mon Sep 17 00:00:00 2001 From: Weiqing Yang Date: Tue, 3 Mar 2026 09:55:50 -0800 Subject: [PATCH 8/8] [AURON #1851] Fix pre-epoch timestamp conversion with Math.floorDiv/floorMod Use Math.floorDiv and Math.floorMod instead of / and % to ensure nanoOfMillisecond stays within [0, 999_999] for negative micros (pre-epoch timestamps). Add test for negative timestamp values. --- .../vectors/ArrowTimestampColumnVector.java | 10 ++++---- .../flink/arrow/FlinkArrowReaderTest.java | 23 +++++++++++++++++++ 2 files changed, 28 insertions(+), 5 deletions(-) diff --git a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowTimestampColumnVector.java b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowTimestampColumnVector.java index 41f70519a..574e6df82 100644 --- a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowTimestampColumnVector.java +++ b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowTimestampColumnVector.java @@ -65,11 +65,11 @@ public boolean isNullAt(int i) { @Override public TimestampData getTimestamp(int i, int precision) { long micros = vector.get(i); - long millis = micros / 1000; - // micros % 1000 yields the sub-millisecond remainder in microseconds; * 1000 converts to nanos. - // For negative micros (pre-epoch), Java's truncation-toward-zero produces a negative - // remainder, which is consistent with the writer's inverse conversion. 
- int nanoOfMillisecond = ((int) (micros % 1000)) * 1000; + // Use floor-based division so that for negative micros (pre-epoch), the remainder is + // non-negative and nanoOfMillisecond stays within [0, 999_999], as required by + // TimestampData.fromEpochMillis. + long millis = Math.floorDiv(micros, 1000); + int nanoOfMillisecond = ((int) Math.floorMod(micros, 1000)) * 1000; return TimestampData.fromEpochMillis(millis, nanoOfMillisecond); } diff --git a/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/FlinkArrowReaderTest.java b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/FlinkArrowReaderTest.java index e468e4cb8..1816c6c87 100644 --- a/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/FlinkArrowReaderTest.java +++ b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/FlinkArrowReaderTest.java @@ -436,6 +436,29 @@ public void testTimestampLtzVector() { } } + @Test + public void testNegativeTimestamp() { + try (BufferAllocator allocator = + FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testNegTs", 0, Long.MAX_VALUE)) { + TimeStampMicroVector vec = new TimeStampMicroVector("col", allocator); + vec.allocateNew(1); + vec.setSafe(0, -1500L); // 1.5 millis before epoch + vec.setValueCount(1); + + VectorSchemaRoot root = new VectorSchemaRoot(Collections.singletonList(vec)); + RowType rowType = RowType.of(new TimestampType(6)); + FlinkArrowReader reader = FlinkArrowReader.create(root, rowType); + + TimestampData ts = reader.read(0).getTimestamp(0, 6); + // floorDiv(-1500, 1000) = -2, floorMod(-1500, 1000) = 500 + assertEquals(-2L, ts.getMillisecond()); + assertEquals(500_000, ts.getNanoOfMillisecond()); + + reader.close(); + root.close(); + } + } + @Test public void testArrayVector() { try (BufferAllocator allocator =
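The floor-based conversion introduced in this last patch can be checked independently of Arrow and Flink; this standalone sketch reproduces the patched `getTimestamp` arithmetic with plain `java.lang.Math` (method names here are illustrative, not the actual class API):

```java
// Standalone check of the micros -> (epochMillis, nanoOfMillisecond) split
// used by the patched ArrowTimestampColumnVector: floor-based division keeps
// the nano component in [0, 999_999] even for pre-epoch (negative) micros,
// where truncating '/' and '%' would yield (-1, -500_000) for -1500 micros.
public class TimestampSplit {
    static long epochMillis(long micros) {
        return Math.floorDiv(micros, 1000);
    }

    static int nanoOfMillisecond(long micros) {
        return ((int) Math.floorMod(micros, 1000)) * 1000;
    }

    public static void main(String[] args) {
        // 2023-01-01T00:00:00.000123 -> (1672531200000, 123000)
        System.out.println(epochMillis(1_672_531_200_000_123L));
        System.out.println(nanoOfMillisecond(1_672_531_200_000_123L));
        // 1.5 ms before epoch -> (-2, 500000), matching testNegativeTimestamp
        System.out.println(epochMillis(-1500L));
        System.out.println(nanoOfMillisecond(-1500L));
    }
}
```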