-
Notifications
You must be signed in to change notification settings - Fork 731
Closed
Description
Expected behaviour
Writing a dataframe with a geometry column nested within an array to Parquet, can be read back without issues.
Actual behaviour
Py4JJavaError: An error occurred while calling o232.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 26.0 failed 1 times, most recent failure: Lost task 0.0 in stage 26.0 (TID 36) (17e1c21c3c4d executor driver): java.lang.IllegalArgumentException: Spark type: StructType(StructField(geometry,BinaryType,true)) doesn't match the type: StructType(StructField(geometry,org.apache.spark.sql.sedona_sql.UDT.GeometryUDT@1114c728,true)) in column vector
at org.apache.spark.sql.execution.datasources.parquet.ParquetColumnVector.<init>(ParquetColumnVector.java:70)
at org.apache.spark.sql.execution.datasources.parquet.ParquetColumnVector.<init>(ParquetColumnVector.java:117)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initBatch(VectorizedParquetRecordReader.java:286)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initBatch(VectorizedParquetRecordReader.java:306)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.$anonfun$buildReaderWithPartitionValues$2(ParquetFileFormat.scala:293)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:217)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:279)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:129)
at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:593)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
at org.apache.spark.scheduler.Task.run(Task.scala:141)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2844)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2780)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2779)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2779)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1242)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1242)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1242)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3048)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2982)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2971)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:984)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2419)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2438)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2463)
at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1046)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:407)
at org.apache.spark.rdd.RDD.collect(RDD.scala:1045)
at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:448)
at org.apache.spark.sql.Dataset.$anonfun$collectToPython$1(Dataset.scala:4160)
at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4334)
at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4332)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4332)
at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:4157)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:569)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.lang.IllegalArgumentException: Spark type: StructType(StructField(geometry,BinaryType,true)) doesn't match the type: StructType(StructField(geometry,org.apache.spark.sql.sedona_sql.UDT.GeometryUDT@1114c728,true)) in column vector
at org.apache.spark.sql.execution.datasources.parquet.ParquetColumnVector.<init>(ParquetColumnVector.java:70)
at org.apache.spark.sql.execution.datasources.parquet.ParquetColumnVector.<init>(ParquetColumnVector.java:117)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initBatch(VectorizedParquetRecordReader.java:286)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initBatch(VectorizedParquetRecordReader.java:306)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.$anonfun$buildReaderWithPartitionValues$2(ParquetFileFormat.scala:293)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:217)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:279)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:129)
at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:593)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
at org.apache.spark.scheduler.Task.run(Task.scala:141)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
... 1 more
Steps to reproduce
The following minimal script reproduces the issue:
# pip install apache-sedona==1.7.2 shapely pyproj pyspark==3.5.0
from sedona.spark import SedonaContext
from pyspark.sql.functions import array, struct, expr
config = (
SedonaContext.builder()
.config(
"spark.jars.packages",
"org.apache.sedona:sedona-spark-3.5_2.12:1.7.2,"
"org.datasyslab:geotools-wrapper:1.7.2-28.5",
)
.config(
"spark.jars.repositories",
"https://artifacts.unidata.ucar.edu/repository/unidata-all",
)
.getOrCreate()
)
spark = SedonaContext.create(config)
df = spark.createDataFrame([{'id': 'something'}]).withColumn("nested", array(struct(expr("ST_Point(0, 0)").alias("geometry"))))
df.write.mode("overwrite").parquet("/tmp/test")
print(spark.read.parquet("/tmp/test").collect())
Notes
- The issue only happens with that particular nested structure. e.g. nesting without the
array
works just fine. - The issue only happens when using pyspark 3.5. However, with pyspark 3.3 (and python 3.10) it works fine.
- When adding geometry column at the root and then using GeoParquet as the format, it works fine.
- In Databriks 15.4 with a similar setup, also works.
Environment
- Python 3.11.13
- Java version = 17