[GH-2240] Fix write and read nested geometry array using vectorized parquet reader #2359
Conversation
Pull Request Overview
This PR addresses SPARK-48942, fixing a bug where reading nested geometry arrays from Parquet files fails when using Spark's vectorized reader. The solution implements compatibility checks for UserDefinedTypes (UDTs) in nested structures and adds workaround utilities for schema transformation.
- Adds UDT compatibility checking in Parquet column vector operations to handle type mismatches between logical and physical schemas
- Implements schema transformation utilities to convert nested GeometryUDT to BinaryType for Parquet compatibility
- Provides comprehensive test coverage for various nested geometry scenarios including arrays of structs and deeply nested structures
Reviewed Changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 3 comments.
File | Description |
---|---|
geoparquetIOTests.scala | Adds test cases for nested geometry array scenarios and GeoParquet format validation |
TransformNestedUDTForParquet.scala | Expression class for transforming nested GeometryUDT schemas to BinaryType |
TransformNestedUDTParquet.scala | Catalyst rule for automatic schema transformation in Parquet reading operations |
SedonaContext.scala | Registers the new transformation rule in the optimization pipeline |
ParquetColumnVector.java | Enhanced type compatibility checking with caching for UDT and nested type comparisons |
```scala
/**
 * Transform a schema to handle nested UDT by processing each top-level field. This preserves
 * top-level GeometryUDT fields while transforming nested ones to BinaryType.
 */
private def transformSchemaForNestedUDT(schema: StructType): StructType = {
  StructType(
    schema.fields.map(field => field.copy(dataType = transformTopLevelUDT(field.dataType))))
}

/**
 * Transform a top-level field's data type, preserving GeometryUDT at the top level but
 * converting nested GeometryUDT to BinaryType.
 */
private def transformTopLevelUDT(dataType: DataType): DataType = {
  dataType match {
    case ArrayType(elementType, containsNull) =>
      ArrayType(transformNestedUDTToBinary(elementType), containsNull)
    case MapType(keyType, valueType, valueContainsNull) =>
      MapType(
        transformNestedUDTToBinary(keyType),
        transformNestedUDTToBinary(valueType),
        valueContainsNull)
    case StructType(fields) =>
      StructType(
        fields.map(field => field.copy(dataType = transformNestedUDTToBinary(field.dataType))))
    case _: GeometryUDT => dataType // Preserve top-level GeometryUDT
    case other => other
  }
}

/**
 * Recursively transform nested data types, converting ALL GeometryUDT to BinaryType. This is
 * used for nested structures where GeometryUDT must be converted.
 */
private def transformNestedUDTToBinary(dataType: DataType): DataType = {
  dataType match {
    case _: GeometryUDT => BinaryType
    case ArrayType(elementType, containsNull) =>
      ArrayType(transformNestedUDTToBinary(elementType), containsNull)
    case MapType(keyType, valueType, valueContainsNull) =>
      MapType(
        transformNestedUDTToBinary(keyType),
        transformNestedUDTToBinary(valueType),
        valueContainsNull)
    case StructType(fields) =>
      StructType(
        fields.map(field => field.copy(dataType = transformNestedUDTToBinary(field.dataType))))
    case udt: UserDefinedType[_] => transformNestedUDTToBinary(udt.sqlType)
    case other => other
  }
}
```
I'm not sure if we can merge these 3 functions into one.
Yes, I think we can combine transformTopLevelUDT() and transformNestedUDTToBinary(), since they differ only in a boolean flag. I'd keep transformSchemaForNestedUDT separate as the entry point for the schema.
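The suggested merge could look roughly like the sketch below. The function name `transformUDT` and the `preserveTopLevel` flag are invented for illustration; this is not the final implementation:

```scala
// Sketch: merging transformTopLevelUDT and transformNestedUDTToBinary into a
// single recursive function driven by a flag. All recursive calls pass
// preserveTopLevel = false, so only a directly top-level GeometryUDT survives.
private def transformUDT(dataType: DataType, preserveTopLevel: Boolean): DataType =
  dataType match {
    case _: GeometryUDT if preserveTopLevel => dataType // keep top-level GeometryUDT
    case _: GeometryUDT => BinaryType                   // nested: convert to binary
    case ArrayType(elementType, containsNull) =>
      ArrayType(transformUDT(elementType, preserveTopLevel = false), containsNull)
    case MapType(keyType, valueType, valueContainsNull) =>
      MapType(
        transformUDT(keyType, preserveTopLevel = false),
        transformUDT(valueType, preserveTopLevel = false),
        valueContainsNull)
    case StructType(fields) =>
      StructType(fields.map(f =>
        f.copy(dataType = transformUDT(f.dataType, preserveTopLevel = false))))
    case udt: UserDefinedType[_] if !preserveTopLevel =>
      transformUDT(udt.sqlType, preserveTopLevel = false)
    case other => other
  }

// The entry point would then become:
// StructType(schema.fields.map(f =>
//   f.copy(dataType = transformUDT(f.dataType, preserveTopLevel = true))))
```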
```scala
val result = readDf.collect()
assert(result.length == 1)
val nestedArray = result(0).getSeq[Any](0)
assert(nestedArray.length == 1)
```
What is the expected type of the nested UDT values (binary or a geometry object)? According to TransformNestedUDTParquet.scala, I guess it is binary.
The type read back is actually geometry. The Parquet file stores the GeometryUDT information in the Spark schema metadata, and when the file is read back, Spark automatically restores it from the SPARK_METADATA_KEY.
The TransformNestedUDTParquet rule just ensures that nested GeometryUDT gets handled properly regardless of which metadata source is used.
I have added some tests after read-back to verify that regular geometry operations work.
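A round-trip check along those lines might look like the sketch below. It assumes a Sedona-enabled SparkSession named `sedona`; the output path and column names are invented for illustration:

```scala
// Sketch: write a nested geometry array to Parquet, read it back, and run a
// regular geometry operation on the values to confirm they come back as
// geometries rather than raw binary.
import org.apache.spark.sql.functions._

val df = sedona.sql(
  "SELECT array(ST_Point(1.0, 2.0), ST_Point(3.0, 4.0)) AS geoms")
df.write.mode("overwrite").parquet("/tmp/nested_geoms.parquet")

val readBack = sedona.read.parquet("/tmp/nested_geoms.parquet")
// Spark restores GeometryUDT from the schema metadata stored under
// SPARK_METADATA_KEY, so ST functions work on the read-back values.
readBack
  .select(explode(col("geoms")).as("g"))
  .selectExpr("ST_AsText(g)")
  .show()
```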
Did you read the Contributor Guide?
Is this PR related to a ticket?
[GH-XXX] my subject. Closes #<issue_number>

What changes were proposed in this PR?
This PR addresses SPARK-48942, a bug that occurs when reading nested geometry arrays from Parquet files using Spark's vectorized reader. The fix implements compatibility checks for UserDefinedTypes (UDTs) in nested structures and adds workaround utilities for schema transformation.
How was this patch tested?
New tests were added to geoparquetIOTests.scala.
Did this PR include necessary documentation updates?