Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,256 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.auron.flink.arrow;

import java.util.List;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.BitVector;
import org.apache.arrow.vector.DateDayVector;
import org.apache.arrow.vector.DecimalVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.Float4Vector;
import org.apache.arrow.vector.Float8Vector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.SmallIntVector;
import org.apache.arrow.vector.TimeMicroVector;
import org.apache.arrow.vector.TimeStampVector;
import org.apache.arrow.vector.TinyIntVector;
import org.apache.arrow.vector.VarBinaryVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.complex.ListVector;
import org.apache.arrow.vector.complex.MapVector;
import org.apache.arrow.vector.complex.StructVector;
import org.apache.auron.flink.arrow.vectors.ArrowArrayColumnVector;
import org.apache.auron.flink.arrow.vectors.ArrowBigIntColumnVector;
import org.apache.auron.flink.arrow.vectors.ArrowBooleanColumnVector;
import org.apache.auron.flink.arrow.vectors.ArrowDateColumnVector;
import org.apache.auron.flink.arrow.vectors.ArrowDecimalColumnVector;
import org.apache.auron.flink.arrow.vectors.ArrowDoubleColumnVector;
import org.apache.auron.flink.arrow.vectors.ArrowFloatColumnVector;
import org.apache.auron.flink.arrow.vectors.ArrowIntColumnVector;
import org.apache.auron.flink.arrow.vectors.ArrowMapColumnVector;
import org.apache.auron.flink.arrow.vectors.ArrowRowColumnVector;
import org.apache.auron.flink.arrow.vectors.ArrowSmallIntColumnVector;
import org.apache.auron.flink.arrow.vectors.ArrowTimeColumnVector;
import org.apache.auron.flink.arrow.vectors.ArrowTimestampColumnVector;
import org.apache.auron.flink.arrow.vectors.ArrowTinyIntColumnVector;
import org.apache.auron.flink.arrow.vectors.ArrowVarBinaryColumnVector;
import org.apache.auron.flink.arrow.vectors.ArrowVarCharColumnVector;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.columnar.ColumnarRowData;
import org.apache.flink.table.data.columnar.vector.ColumnVector;
import org.apache.flink.table.data.columnar.vector.VectorizedColumnBatch;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;

/**
* Reads Arrow {@link VectorSchemaRoot} data as Flink {@link RowData}.
*
* <p>Uses Flink's columnar data structures ({@link ColumnarRowData} backed by {@link
* VectorizedColumnBatch}) for zero-copy access to Arrow vectors. Each Arrow {@link FieldVector} is
* wrapped in a Flink {@link ColumnVector} implementation that delegates reads directly to the
* underlying Arrow vector.
*
* <p>Object reuse: {@link #read(int)} returns the same {@link ColumnarRowData} instance with a
* different row ID. Callers must consume or copy the returned row before the next call. This is
* standard Flink practice for columnar readers.
*
* <p>Implements {@link AutoCloseable} for use in resource management blocks. Note: this does NOT
* close the underlying {@link VectorSchemaRoot}. The caller that created the root is responsible
* for its lifecycle.
*/
public class FlinkArrowReader implements AutoCloseable {

private final RowType rowType;
private ColumnVector[] columnVectors;
private VectorizedColumnBatch batch;
private ColumnarRowData reusableRow;
private VectorSchemaRoot root;

private FlinkArrowReader(ColumnVector[] columnVectors, VectorSchemaRoot root, RowType rowType) {
this.columnVectors = columnVectors;
this.batch = new VectorizedColumnBatch(columnVectors);
this.reusableRow = new ColumnarRowData(batch);
this.root = root;
this.rowType = rowType;
}

/**
* Creates a {@link FlinkArrowReader} from a {@link VectorSchemaRoot} and {@link RowType}.
*
* <p>The RowType must match the schema of the VectorSchemaRoot (same number of fields, matching
* types). Each Arrow field vector is wrapped in the appropriate Flink {@link ColumnVector}
* implementation based on the corresponding Flink {@link LogicalType}.
*
* @param root the Arrow VectorSchemaRoot containing the data
* @param rowType the Flink RowType describing the schema
* @return a new FlinkArrowReader
* @throws IllegalArgumentException if field counts do not match
* @throws UnsupportedOperationException if a LogicalType is not supported
*/
public static FlinkArrowReader create(VectorSchemaRoot root, RowType rowType) {
Preconditions.checkNotNull(root, "root must not be null");
Preconditions.checkNotNull(rowType, "rowType must not be null");
List<FieldVector> fieldVectors = root.getFieldVectors();
List<RowType.RowField> fields = rowType.getFields();
if (fieldVectors.size() != fields.size()) {
throw new IllegalArgumentException(
"VectorSchemaRoot has " + fieldVectors.size() + " fields but RowType has " + fields.size());
}
ColumnVector[] columns = new ColumnVector[fieldVectors.size()];
for (int i = 0; i < fieldVectors.size(); i++) {
columns[i] = createColumnVector(fieldVectors.get(i), fields.get(i).getType());
}
return new FlinkArrowReader(columns, root, rowType);
}

/**
* Reads a row at the given position. Returns a reused {@link RowData} object — callers must not
* hold references across calls.
*
* @param rowId the row index within the current batch
* @return the row data at the given position
*/
public RowData read(int rowId) {
reusableRow.setRowId(rowId);
return reusableRow;
}

/**
* Returns the number of rows in the current batch.
*
* @return the row count
*/
public int getRowCount() {
return root.getRowCount();
}

/**
* Resets the reader to use a new {@link VectorSchemaRoot} with the same schema. Recreates
* column vector wrappers for the new root's field vectors.
*
* <p>The new root must have the same schema (same number and types of fields) as the original.
*
* @param newRoot the new VectorSchemaRoot, must not be null
*/
public void reset(VectorSchemaRoot newRoot) {
Preconditions.checkNotNull(newRoot, "newRoot must not be null");
this.root = newRoot;
List<FieldVector> newVectors = newRoot.getFieldVectors();
Preconditions.checkArgument(
newVectors.size() == columnVectors.length,
"New root has %s fields but reader expects %s",
newVectors.size(),
columnVectors.length);
List<RowType.RowField> fields = rowType.getFields();
for (int i = 0; i < columnVectors.length; i++) {
columnVectors[i] =
createColumnVector(newVectors.get(i), fields.get(i).getType());
}
this.batch = new VectorizedColumnBatch(columnVectors);
this.reusableRow = new ColumnarRowData(batch);
}

/**
* Implements {@link AutoCloseable} for use in resource management blocks. Note: this does NOT
* close the underlying {@link VectorSchemaRoot}. The caller that created the root is responsible
* for its lifecycle.
*/
@Override
public void close() {
// Reader is a view; root lifecycle managed by caller.
}

/**
* Creates the appropriate Flink {@link ColumnVector} wrapper for the given Arrow {@link
* FieldVector} and Flink {@link LogicalType}.
*/
static ColumnVector createColumnVector(FieldVector vector, LogicalType type) {
switch (type.getTypeRoot()) {
case BOOLEAN:
return new ArrowBooleanColumnVector((BitVector) vector);
case TINYINT:
return new ArrowTinyIntColumnVector((TinyIntVector) vector);
case SMALLINT:
return new ArrowSmallIntColumnVector((SmallIntVector) vector);
case INTEGER:
return new ArrowIntColumnVector((IntVector) vector);
case BIGINT:
return new ArrowBigIntColumnVector((BigIntVector) vector);
case FLOAT:
return new ArrowFloatColumnVector((Float4Vector) vector);
case DOUBLE:
return new ArrowDoubleColumnVector((Float8Vector) vector);
case VARCHAR:
case CHAR:
return new ArrowVarCharColumnVector((VarCharVector) vector);
case VARBINARY:
case BINARY:
return new ArrowVarBinaryColumnVector((VarBinaryVector) vector);
case DECIMAL:
return new ArrowDecimalColumnVector((DecimalVector) vector);
case DATE:
return new ArrowDateColumnVector((DateDayVector) vector);
case TIME_WITHOUT_TIME_ZONE:
// The writer (FlinkArrowFieldWriter) normalizes all TIME values to microseconds
// in a TimeMicroVector, regardless of the declared Flink TIME precision.
return new ArrowTimeColumnVector((TimeMicroVector) vector);
case TIMESTAMP_WITHOUT_TIME_ZONE:
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
// The writer normalizes all timestamps to microseconds. TimeStampVector is the
// common parent of TimeStampMicroVector and TimeStampMicroTZVector.
return new ArrowTimestampColumnVector((TimeStampVector) vector);
Comment on lines +212 to +220
Copy link

Copilot AI Mar 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The createColumnVector method for TIME_WITHOUT_TIME_ZONE unconditionally casts the Arrow vector to TimeMicroVector, but FlinkArrowUtils.toArrowType() maps TIME to different Arrow time units based on precision:

  • TIME(0) → TimeSecVector (not TimeMicroVector) → ClassCastException at runtime
  • TIME(1-3) → TimeMilliVector (not TimeMicroVector) → ClassCastException at runtime
  • TIME(4-6) → TimeMicroVector ← only this precision range works correctly
  • TIME(7+) → TimeNanoVector (not TimeMicroVector) → ClassCastException at runtime

For TIMESTAMP_WITHOUT_TIME_ZONE/TIMESTAMP_WITH_LOCAL_TIME_ZONE, the cast to TimeStampVector will not fail (it is the parent class of all timestamp vectors), but the conversion micros / 1000 and % 1000 is only correct for microsecond-precision vectors. For other precisions:

  • TIMESTAMP(0) → TimeStampSecVector: vector.get(i) returns seconds, but is treated as microseconds → result is off by 10^6
  • TIMESTAMP(1-3) → TimeStampMilliVector: treated as microseconds → off by 10^3
  • TIMESTAMP(7+) → TimeStampNanoVector: treated as microseconds → off by 10^3 (nanos ÷ 1000 for millis, nanos % 1000 * 1000 for nano)

The createColumnVector method needs to dispatch on the TIME/TIMESTAMP precision (via the LogicalType's precision) and apply the correct unit conversion and vector cast for each precision range, consistent with FlinkArrowUtils.toArrowType(). The comment claiming "writer normalizes all TIME/TIMESTAMP values to microseconds regardless of declared precision" is incorrect — FlinkArrowUtils.toArrowType() selects the Arrow time unit based on precision.

Copilot uses AI. Check for mistakes.
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch on the FlinkArrowUtils.toArrowType() mapping. The reader is designed as the inverse of the
writer (PR #1930), which per the design discussion normalizes all TIME/TIMESTAMP values to microsecond
precision in Arrow, regardless of the declared Flink precision. This is documented in the design doc
(section 3.3) and in the code comments on the TIME/TIMESTAMP switch cases.

That said, this coupling to the writer's normalization behavior is worth highlighting. If the writer's
approach changes to preserve precision-dependent time units, the reader would need to be updated to
match. Will need to confirm with @x-tong on PR #1930 that the writer does indeed normalize to microseconds.

case ARRAY:
return createArrayColumnVector((ListVector) vector, (ArrayType) type);
case MAP:
return createMapColumnVector((MapVector) vector, (MapType) type);
case ROW:
return createRowColumnVector((StructVector) vector, (RowType) type);
default:
throw new UnsupportedOperationException(
"Unsupported Flink type for Arrow reader: " + type.asSummaryString());
}
}

private static ColumnVector createArrayColumnVector(ListVector vector, ArrayType arrayType) {
ColumnVector elementVector = createColumnVector(vector.getDataVector(), arrayType.getElementType());
return new ArrowArrayColumnVector(vector, elementVector);
}

private static ColumnVector createMapColumnVector(MapVector vector, MapType mapType) {
StructVector entriesVector = (StructVector) vector.getDataVector();
ColumnVector keyVector = createColumnVector(entriesVector.getChild(MapVector.KEY_NAME), mapType.getKeyType());
ColumnVector valueVector =
createColumnVector(entriesVector.getChild(MapVector.VALUE_NAME), mapType.getValueType());
return new ArrowMapColumnVector(vector, keyVector, valueVector);
}

private static ColumnVector createRowColumnVector(StructVector vector, RowType rowType) {
List<FieldVector> childVectors = vector.getChildrenFromFields();
List<RowType.RowField> fields = rowType.getFields();
ColumnVector[] childColumns = new ColumnVector[childVectors.size()];
for (int i = 0; i < childVectors.size(); i++) {
childColumns[i] =
createColumnVector(childVectors.get(i), fields.get(i).getType());
}
return new ArrowRowColumnVector(vector, childColumns);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.auron.flink.arrow.vectors;

import org.apache.arrow.vector.complex.ListVector;
import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.columnar.ColumnarArrayData;
import org.apache.flink.table.data.columnar.vector.ArrayColumnVector;
import org.apache.flink.table.data.columnar.vector.ColumnVector;
import org.apache.flink.util.Preconditions;

/**
* A Flink {@link ArrayColumnVector} backed by an Arrow {@link ListVector}.
*
* <p>This wrapper delegates all reads to the underlying Arrow vector, providing zero-copy access
* to Arrow list data from Flink's columnar batch execution engine. The child element vector is
* itself a Flink {@link ColumnVector} wrapping the corresponding Arrow child vector, enabling
* recursive nesting of complex types.
*/
public final class ArrowArrayColumnVector implements ArrayColumnVector {

    private ListVector vector;
    private ColumnVector elementColumnVector;

    /**
     * Creates a new wrapper around the given Arrow {@link ListVector}.
     *
     * @param vector the Arrow list vector to wrap, must not be null
     * @param elementColumnVector the Flink column vector wrapping the Arrow child element vector,
     *     must not be null
     */
    public ArrowArrayColumnVector(ListVector vector, ColumnVector elementColumnVector) {
        this.vector = Preconditions.checkNotNull(vector);
        this.elementColumnVector = Preconditions.checkNotNull(elementColumnVector);
    }

    /** {@inheritDoc} */
    @Override
    public boolean isNullAt(int i) {
        return vector.isNull(i);
    }

    /** {@inheritDoc} */
    @Override
    public ArrayData getArray(int i) {
        // The Arrow offset buffer stores one 4-byte entry per row; entry i is the start of the
        // list and entry i+1 its exclusive end, so the difference is the element count.
        long offsetPos = (long) i * ListVector.OFFSET_WIDTH;
        int start = vector.getOffsetBuffer().getInt(offsetPos);
        int end = vector.getOffsetBuffer().getInt(offsetPos + ListVector.OFFSET_WIDTH);
        return new ColumnarArrayData(elementColumnVector, start, end - start);
    }

    /**
     * Replaces the underlying Arrow vector and child element vector. Used during reader reset to
     * point at a new batch without allocating a new wrapper.
     *
     * @param vector the new Arrow list vector, must not be null
     * @param elementColumnVector the new Flink column vector for child elements, must not be null
     */
    void setVector(ListVector vector, ColumnVector elementColumnVector) {
        this.vector = Preconditions.checkNotNull(vector);
        this.elementColumnVector = Preconditions.checkNotNull(elementColumnVector);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.auron.flink.arrow.vectors;

import org.apache.arrow.vector.BigIntVector;
import org.apache.flink.table.data.columnar.vector.LongColumnVector;
import org.apache.flink.util.Preconditions;

/**
* A Flink {@link LongColumnVector} backed by an Arrow {@link BigIntVector}.
*
* <p>This wrapper delegates all reads to the underlying Arrow vector, providing zero-copy access
* to Arrow data from Flink's columnar batch execution engine.
*/
public final class ArrowBigIntColumnVector implements LongColumnVector {

private BigIntVector vector;

/**
* Creates a new wrapper around the given Arrow {@link BigIntVector}.
*
* @param vector the Arrow vector to wrap, must not be null
*/
public ArrowBigIntColumnVector(BigIntVector vector) {
this.vector = Preconditions.checkNotNull(vector);
}

/** {@inheritDoc} */
@Override
public boolean isNullAt(int i) {
return vector.isNull(i);
}

/** {@inheritDoc} */
@Override
public long getLong(int i) {
return vector.get(i);
}

/**
* Replaces the underlying Arrow vector. Used during reader reset to point at a new batch
* without allocating a new wrapper.
*
* @param vector the new Arrow vector, must not be null
*/
void setVector(BigIntVector vector) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Recommend removing the setVector method

  1. Simplifies the code
  2. setVector merely eliminates a small amount of overhead from the wrapper class

this.vector = Preconditions.checkNotNull(vector);
}
}
Loading
Loading