-
Notifications
You must be signed in to change notification settings - Fork 210
[AURON #1851] Introduce Arrow to Flink RowData reader #2063
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
weiqingy
wants to merge
8
commits into
apache:master
Choose a base branch
from
weiqingy:wiyang/AURON-1851-arrow-reader
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+2,763
−0
Open
Changes from all commits
Commits
Show all changes
8 commits
Select commit
Hold shift + click to select a range
18ad5af
Add AURON-1851-DESIGN.md
weiqingy c67a8d8
[AURON #1851] Add primitive Arrow ColumnVector wrappers
weiqingy ac5ab6f
[AURON #1851] Add string, binary, and decimal Arrow ColumnVector wrap…
weiqingy 553d42e
[AURON #1851] Add temporal Arrow ColumnVector wrappers
weiqingy 07521ce
[AURON #1851] Add nested type Arrow ColumnVector wrappers
weiqingy b51d7a6
[AURON #1851] Add FlinkArrowReader orchestrator
weiqingy ffcf6d6
[AURON #1851] Add FlinkArrowReader unit tests
weiqingy 68584da
[AURON #1851] Fix pre-epoch timestamp conversion with Math.floorDiv/f…
weiqingy File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
256 changes: 256 additions & 0 deletions
256
...sion/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowReader.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,256 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package org.apache.auron.flink.arrow; | ||
|
|
||
| import java.util.List; | ||
| import org.apache.arrow.vector.BigIntVector; | ||
| import org.apache.arrow.vector.BitVector; | ||
| import org.apache.arrow.vector.DateDayVector; | ||
| import org.apache.arrow.vector.DecimalVector; | ||
| import org.apache.arrow.vector.FieldVector; | ||
| import org.apache.arrow.vector.Float4Vector; | ||
| import org.apache.arrow.vector.Float8Vector; | ||
| import org.apache.arrow.vector.IntVector; | ||
| import org.apache.arrow.vector.SmallIntVector; | ||
| import org.apache.arrow.vector.TimeMicroVector; | ||
| import org.apache.arrow.vector.TimeStampVector; | ||
| import org.apache.arrow.vector.TinyIntVector; | ||
| import org.apache.arrow.vector.VarBinaryVector; | ||
| import org.apache.arrow.vector.VarCharVector; | ||
| import org.apache.arrow.vector.VectorSchemaRoot; | ||
| import org.apache.arrow.vector.complex.ListVector; | ||
| import org.apache.arrow.vector.complex.MapVector; | ||
| import org.apache.arrow.vector.complex.StructVector; | ||
| import org.apache.auron.flink.arrow.vectors.ArrowArrayColumnVector; | ||
| import org.apache.auron.flink.arrow.vectors.ArrowBigIntColumnVector; | ||
| import org.apache.auron.flink.arrow.vectors.ArrowBooleanColumnVector; | ||
| import org.apache.auron.flink.arrow.vectors.ArrowDateColumnVector; | ||
| import org.apache.auron.flink.arrow.vectors.ArrowDecimalColumnVector; | ||
| import org.apache.auron.flink.arrow.vectors.ArrowDoubleColumnVector; | ||
| import org.apache.auron.flink.arrow.vectors.ArrowFloatColumnVector; | ||
| import org.apache.auron.flink.arrow.vectors.ArrowIntColumnVector; | ||
| import org.apache.auron.flink.arrow.vectors.ArrowMapColumnVector; | ||
| import org.apache.auron.flink.arrow.vectors.ArrowRowColumnVector; | ||
| import org.apache.auron.flink.arrow.vectors.ArrowSmallIntColumnVector; | ||
| import org.apache.auron.flink.arrow.vectors.ArrowTimeColumnVector; | ||
| import org.apache.auron.flink.arrow.vectors.ArrowTimestampColumnVector; | ||
| import org.apache.auron.flink.arrow.vectors.ArrowTinyIntColumnVector; | ||
| import org.apache.auron.flink.arrow.vectors.ArrowVarBinaryColumnVector; | ||
| import org.apache.auron.flink.arrow.vectors.ArrowVarCharColumnVector; | ||
| import org.apache.flink.table.data.RowData; | ||
| import org.apache.flink.table.data.columnar.ColumnarRowData; | ||
| import org.apache.flink.table.data.columnar.vector.ColumnVector; | ||
| import org.apache.flink.table.data.columnar.vector.VectorizedColumnBatch; | ||
| import org.apache.flink.table.types.logical.ArrayType; | ||
| import org.apache.flink.table.types.logical.LogicalType; | ||
| import org.apache.flink.table.types.logical.MapType; | ||
| import org.apache.flink.table.types.logical.RowType; | ||
| import org.apache.flink.util.Preconditions; | ||
|
|
||
| /** | ||
| * Reads Arrow {@link VectorSchemaRoot} data as Flink {@link RowData}. | ||
| * | ||
| * <p>Uses Flink's columnar data structures ({@link ColumnarRowData} backed by {@link | ||
| * VectorizedColumnBatch}) for zero-copy access to Arrow vectors. Each Arrow {@link FieldVector} is | ||
| * wrapped in a Flink {@link ColumnVector} implementation that delegates reads directly to the | ||
| * underlying Arrow vector. | ||
| * | ||
| * <p>Object reuse: {@link #read(int)} returns the same {@link ColumnarRowData} instance with a | ||
| * different row ID. Callers must consume or copy the returned row before the next call. This is | ||
| * standard Flink practice for columnar readers. | ||
| * | ||
| * <p>Implements {@link AutoCloseable} for use in resource management blocks. Note: this does NOT | ||
| * close the underlying {@link VectorSchemaRoot}. The caller that created the root is responsible | ||
| * for its lifecycle. | ||
| */ | ||
| public class FlinkArrowReader implements AutoCloseable { | ||
|
|
||
| private final RowType rowType; | ||
| private ColumnVector[] columnVectors; | ||
| private VectorizedColumnBatch batch; | ||
| private ColumnarRowData reusableRow; | ||
| private VectorSchemaRoot root; | ||
|
|
||
| private FlinkArrowReader(ColumnVector[] columnVectors, VectorSchemaRoot root, RowType rowType) { | ||
| this.columnVectors = columnVectors; | ||
| this.batch = new VectorizedColumnBatch(columnVectors); | ||
| this.reusableRow = new ColumnarRowData(batch); | ||
| this.root = root; | ||
| this.rowType = rowType; | ||
| } | ||
|
|
||
| /** | ||
| * Creates a {@link FlinkArrowReader} from a {@link VectorSchemaRoot} and {@link RowType}. | ||
| * | ||
| * <p>The RowType must match the schema of the VectorSchemaRoot (same number of fields, matching | ||
| * types). Each Arrow field vector is wrapped in the appropriate Flink {@link ColumnVector} | ||
| * implementation based on the corresponding Flink {@link LogicalType}. | ||
| * | ||
| * @param root the Arrow VectorSchemaRoot containing the data | ||
| * @param rowType the Flink RowType describing the schema | ||
| * @return a new FlinkArrowReader | ||
| * @throws IllegalArgumentException if field counts do not match | ||
| * @throws UnsupportedOperationException if a LogicalType is not supported | ||
| */ | ||
| public static FlinkArrowReader create(VectorSchemaRoot root, RowType rowType) { | ||
| Preconditions.checkNotNull(root, "root must not be null"); | ||
| Preconditions.checkNotNull(rowType, "rowType must not be null"); | ||
| List<FieldVector> fieldVectors = root.getFieldVectors(); | ||
| List<RowType.RowField> fields = rowType.getFields(); | ||
| if (fieldVectors.size() != fields.size()) { | ||
| throw new IllegalArgumentException( | ||
| "VectorSchemaRoot has " + fieldVectors.size() + " fields but RowType has " + fields.size()); | ||
| } | ||
| ColumnVector[] columns = new ColumnVector[fieldVectors.size()]; | ||
| for (int i = 0; i < fieldVectors.size(); i++) { | ||
| columns[i] = createColumnVector(fieldVectors.get(i), fields.get(i).getType()); | ||
| } | ||
| return new FlinkArrowReader(columns, root, rowType); | ||
| } | ||
|
|
||
| /** | ||
| * Reads a row at the given position. Returns a reused {@link RowData} object — callers must not | ||
| * hold references across calls. | ||
| * | ||
| * @param rowId the row index within the current batch | ||
| * @return the row data at the given position | ||
| */ | ||
| public RowData read(int rowId) { | ||
| reusableRow.setRowId(rowId); | ||
| return reusableRow; | ||
| } | ||
|
|
||
| /** | ||
| * Returns the number of rows in the current batch. | ||
| * | ||
| * @return the row count | ||
| */ | ||
| public int getRowCount() { | ||
| return root.getRowCount(); | ||
| } | ||
|
|
||
| /** | ||
| * Resets the reader to use a new {@link VectorSchemaRoot} with the same schema. Recreates | ||
| * column vector wrappers for the new root's field vectors. | ||
| * | ||
| * <p>The new root must have the same schema (same number and types of fields) as the original. | ||
| * | ||
| * @param newRoot the new VectorSchemaRoot, must not be null | ||
| */ | ||
| public void reset(VectorSchemaRoot newRoot) { | ||
| Preconditions.checkNotNull(newRoot, "newRoot must not be null"); | ||
| this.root = newRoot; | ||
| List<FieldVector> newVectors = newRoot.getFieldVectors(); | ||
| Preconditions.checkArgument( | ||
| newVectors.size() == columnVectors.length, | ||
| "New root has %s fields but reader expects %s", | ||
| newVectors.size(), | ||
| columnVectors.length); | ||
| List<RowType.RowField> fields = rowType.getFields(); | ||
| for (int i = 0; i < columnVectors.length; i++) { | ||
| columnVectors[i] = | ||
| createColumnVector(newVectors.get(i), fields.get(i).getType()); | ||
| } | ||
| this.batch = new VectorizedColumnBatch(columnVectors); | ||
| this.reusableRow = new ColumnarRowData(batch); | ||
| } | ||
|
|
||
| /** | ||
| * Implements {@link AutoCloseable} for use in resource management blocks. Note: this does NOT | ||
| * close the underlying {@link VectorSchemaRoot}. The caller that created the root is responsible | ||
| * for its lifecycle. | ||
| */ | ||
| @Override | ||
| public void close() { | ||
| // Reader is a view; root lifecycle managed by caller. | ||
| } | ||
|
|
||
| /** | ||
| * Creates the appropriate Flink {@link ColumnVector} wrapper for the given Arrow {@link | ||
| * FieldVector} and Flink {@link LogicalType}. | ||
| */ | ||
| static ColumnVector createColumnVector(FieldVector vector, LogicalType type) { | ||
| switch (type.getTypeRoot()) { | ||
| case BOOLEAN: | ||
| return new ArrowBooleanColumnVector((BitVector) vector); | ||
| case TINYINT: | ||
| return new ArrowTinyIntColumnVector((TinyIntVector) vector); | ||
| case SMALLINT: | ||
| return new ArrowSmallIntColumnVector((SmallIntVector) vector); | ||
| case INTEGER: | ||
| return new ArrowIntColumnVector((IntVector) vector); | ||
| case BIGINT: | ||
| return new ArrowBigIntColumnVector((BigIntVector) vector); | ||
| case FLOAT: | ||
| return new ArrowFloatColumnVector((Float4Vector) vector); | ||
| case DOUBLE: | ||
| return new ArrowDoubleColumnVector((Float8Vector) vector); | ||
| case VARCHAR: | ||
| case CHAR: | ||
| return new ArrowVarCharColumnVector((VarCharVector) vector); | ||
| case VARBINARY: | ||
| case BINARY: | ||
| return new ArrowVarBinaryColumnVector((VarBinaryVector) vector); | ||
| case DECIMAL: | ||
| return new ArrowDecimalColumnVector((DecimalVector) vector); | ||
| case DATE: | ||
| return new ArrowDateColumnVector((DateDayVector) vector); | ||
| case TIME_WITHOUT_TIME_ZONE: | ||
| // The writer (FlinkArrowFieldWriter) normalizes all TIME values to microseconds | ||
| // in a TimeMicroVector, regardless of the declared Flink TIME precision. | ||
| return new ArrowTimeColumnVector((TimeMicroVector) vector); | ||
| case TIMESTAMP_WITHOUT_TIME_ZONE: | ||
| case TIMESTAMP_WITH_LOCAL_TIME_ZONE: | ||
| // The writer normalizes all timestamps to microseconds. TimeStampVector is the | ||
| // common parent of TimeStampMicroVector and TimeStampMicroTZVector. | ||
| return new ArrowTimestampColumnVector((TimeStampVector) vector); | ||
| case ARRAY: | ||
| return createArrayColumnVector((ListVector) vector, (ArrayType) type); | ||
| case MAP: | ||
| return createMapColumnVector((MapVector) vector, (MapType) type); | ||
| case ROW: | ||
| return createRowColumnVector((StructVector) vector, (RowType) type); | ||
| default: | ||
| throw new UnsupportedOperationException( | ||
| "Unsupported Flink type for Arrow reader: " + type.asSummaryString()); | ||
| } | ||
| } | ||
|
|
||
| private static ColumnVector createArrayColumnVector(ListVector vector, ArrayType arrayType) { | ||
| ColumnVector elementVector = createColumnVector(vector.getDataVector(), arrayType.getElementType()); | ||
| return new ArrowArrayColumnVector(vector, elementVector); | ||
| } | ||
|
|
||
| private static ColumnVector createMapColumnVector(MapVector vector, MapType mapType) { | ||
| StructVector entriesVector = (StructVector) vector.getDataVector(); | ||
| ColumnVector keyVector = createColumnVector(entriesVector.getChild(MapVector.KEY_NAME), mapType.getKeyType()); | ||
| ColumnVector valueVector = | ||
| createColumnVector(entriesVector.getChild(MapVector.VALUE_NAME), mapType.getValueType()); | ||
| return new ArrowMapColumnVector(vector, keyVector, valueVector); | ||
| } | ||
|
|
||
| private static ColumnVector createRowColumnVector(StructVector vector, RowType rowType) { | ||
| List<FieldVector> childVectors = vector.getChildrenFromFields(); | ||
| List<RowType.RowField> fields = rowType.getFields(); | ||
| ColumnVector[] childColumns = new ColumnVector[childVectors.size()]; | ||
| for (int i = 0; i < childVectors.size(); i++) { | ||
| childColumns[i] = | ||
| createColumnVector(childVectors.get(i), fields.get(i).getType()); | ||
| } | ||
| return new ArrowRowColumnVector(vector, childColumns); | ||
| } | ||
| } | ||
76 changes: 76 additions & 0 deletions
76
...nk-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowArrayColumnVector.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,76 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package org.apache.auron.flink.arrow.vectors; | ||
|
|
||
| import org.apache.arrow.vector.complex.ListVector; | ||
| import org.apache.flink.table.data.ArrayData; | ||
| import org.apache.flink.table.data.columnar.ColumnarArrayData; | ||
| import org.apache.flink.table.data.columnar.vector.ArrayColumnVector; | ||
| import org.apache.flink.table.data.columnar.vector.ColumnVector; | ||
| import org.apache.flink.util.Preconditions; | ||
|
|
||
| /** | ||
| * A Flink {@link ArrayColumnVector} backed by an Arrow {@link ListVector}. | ||
| * | ||
| * <p>This wrapper delegates all reads to the underlying Arrow vector, providing zero-copy access | ||
| * to Arrow list data from Flink's columnar batch execution engine. The child element vector is | ||
| * itself a Flink {@link ColumnVector} wrapping the corresponding Arrow child vector, enabling | ||
| * recursive nesting of complex types. | ||
| */ | ||
| public final class ArrowArrayColumnVector implements ArrayColumnVector { | ||
|
|
||
| private ListVector vector; | ||
| private ColumnVector elementColumnVector; | ||
|
|
||
| /** | ||
| * Creates a new wrapper around the given Arrow {@link ListVector}. | ||
| * | ||
| * @param vector the Arrow list vector to wrap, must not be null | ||
| * @param elementColumnVector the Flink column vector wrapping the Arrow child element vector, | ||
| * must not be null | ||
| */ | ||
| public ArrowArrayColumnVector(ListVector vector, ColumnVector elementColumnVector) { | ||
| this.vector = Preconditions.checkNotNull(vector); | ||
| this.elementColumnVector = Preconditions.checkNotNull(elementColumnVector); | ||
| } | ||
|
|
||
| /** {@inheritDoc} */ | ||
| @Override | ||
| public boolean isNullAt(int i) { | ||
| return vector.isNull(i); | ||
| } | ||
|
|
||
| /** {@inheritDoc} */ | ||
| @Override | ||
| public ArrayData getArray(int i) { | ||
| int offset = vector.getOffsetBuffer().getInt((long) i * ListVector.OFFSET_WIDTH); | ||
| int length = vector.getOffsetBuffer().getInt((long) (i + 1) * ListVector.OFFSET_WIDTH) - offset; | ||
| return new ColumnarArrayData(elementColumnVector, offset, length); | ||
| } | ||
|
|
||
| /** | ||
| * Replaces the underlying Arrow vector and child element vector. Used during reader reset to | ||
| * point at a new batch without allocating a new wrapper. | ||
| * | ||
| * @param vector the new Arrow list vector, must not be null | ||
| * @param elementColumnVector the new Flink column vector for child elements, must not be null | ||
| */ | ||
| void setVector(ListVector vector, ColumnVector elementColumnVector) { | ||
| this.vector = Preconditions.checkNotNull(vector); | ||
| this.elementColumnVector = Preconditions.checkNotNull(elementColumnVector); | ||
| } | ||
| } |
63 changes: 63 additions & 0 deletions
63
...k-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowBigIntColumnVector.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,63 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package org.apache.auron.flink.arrow.vectors; | ||
|
|
||
| import org.apache.arrow.vector.BigIntVector; | ||
| import org.apache.flink.table.data.columnar.vector.LongColumnVector; | ||
| import org.apache.flink.util.Preconditions; | ||
|
|
||
| /** | ||
| * A Flink {@link LongColumnVector} backed by an Arrow {@link BigIntVector}. | ||
| * | ||
| * <p>This wrapper delegates all reads to the underlying Arrow vector, providing zero-copy access | ||
| * to Arrow data from Flink's columnar batch execution engine. | ||
| */ | ||
| public final class ArrowBigIntColumnVector implements LongColumnVector { | ||
|
|
||
| private BigIntVector vector; | ||
|
|
||
| /** | ||
| * Creates a new wrapper around the given Arrow {@link BigIntVector}. | ||
| * | ||
| * @param vector the Arrow vector to wrap, must not be null | ||
| */ | ||
| public ArrowBigIntColumnVector(BigIntVector vector) { | ||
| this.vector = Preconditions.checkNotNull(vector); | ||
| } | ||
|
|
||
| /** {@inheritDoc} */ | ||
| @Override | ||
| public boolean isNullAt(int i) { | ||
| return vector.isNull(i); | ||
| } | ||
|
|
||
| /** {@inheritDoc} */ | ||
| @Override | ||
| public long getLong(int i) { | ||
| return vector.get(i); | ||
| } | ||
|
|
||
| /** | ||
| * Replaces the underlying Arrow vector. Used during reader reset to point at a new batch | ||
| * without allocating a new wrapper. | ||
| * | ||
| * @param vector the new Arrow vector, must not be null | ||
| */ | ||
| void setVector(BigIntVector vector) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Recommend removing the setVector method
|
||
| this.vector = Preconditions.checkNotNull(vector); | ||
| } | ||
| } | ||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The
createColumnVectormethod forTIME_WITHOUT_TIME_ZONEunconditionally casts the Arrow vector toTimeMicroVector, butFlinkArrowUtils.toArrowType()maps TIME to different Arrow time units based on precision:TimeSecVector(notTimeMicroVector) →ClassCastExceptionat runtimeTimeMilliVector(notTimeMicroVector) →ClassCastExceptionat runtimeTimeMicroVector← only this precision range works correctlyTimeNanoVector(notTimeMicroVector) →ClassCastExceptionat runtimeFor
TIMESTAMP_WITHOUT_TIME_ZONE/TIMESTAMP_WITH_LOCAL_TIME_ZONE, the cast toTimeStampVectorwill not fail (it is the parent class of all timestamp vectors), but the conversionmicros / 1000and% 1000is only correct for microsecond-precision vectors. For other precisions:TimeStampSecVector:vector.get(i)returns seconds, but is treated as microseconds → result is off by 10^6TimeStampMilliVector: treated as microseconds → off by 10^3TimeStampNanoVector: treated as microseconds → off by 10^3 (nanos ÷ 1000 for millis, nanos % 1000 * 1000 for nano)The
createColumnVectormethod needs to dispatch on the TIME/TIMESTAMP precision (via theLogicalType's precision) and apply the correct unit conversion and vector cast for each precision range, consistent withFlinkArrowUtils.toArrowType(). The comment claiming "writer normalizes all TIME/TIMESTAMP values to microseconds regardless of declared precision" is incorrect —FlinkArrowUtils.toArrowType()selects the Arrow time unit based on precision.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch on the FlinkArrowUtils.toArrowType() mapping. The reader is designed as the inverse of the
writer (PR #1930), which per the design discussion normalizes all TIME/TIMESTAMP values to microsecond
precision in Arrow, regardless of the declared Flink precision. This is documented in the design doc
(section 3.3) and in the code comments on the TIME/TIMESTAMP switch cases.
That said, this coupling to the writer's normalization behavior is worth highlighting. If the writer's
approach changes to preserve precision-dependent time units, the reader would need to be updated to
match. Will need to confirm with @x-tong on PR #1930 that the writer does indeed normalize to microseconds.