```scala
)(ScioUtil.strippedPath(path), suffix)
val dynamicDestinations = DynamicFileDestinations
  .constant(fp, SerializableFunctions.identity[T])
val job = Job.getInstance(conf)
```
The Job API has known issues with Java 25; it triggers the Subject#getSubject call discussed here: apache/parquet-java#3328
We don't actually need to use Job; we were just using it as a convenient way to manage distinct Configurations, but I don't see a reason we can't drop Job and just use Configuration directly.
Calling Job.getInstance(conf) seems to copy the conf values into a new object while this would mutate the input conf ... could be an issue in e.g. tests or something?
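To illustrate the copy-vs-mutate distinction being raised here, a minimal sketch using `java.util.Properties` as a stand-in for Hadoop's `Configuration` (the class names and values are hypothetical; the Hadoop classes themselves aren't needed to show the semantics):

```java
import java.util.Properties;

public class CopyVsMutateDemo {
    // Applies a setting on a defensive copy, roughly what Job.getInstance(conf)
    // does when it clones the passed-in Configuration.
    static Properties withCopy(Properties conf) {
        Properties copy = new Properties();
        copy.putAll(conf);
        copy.setProperty("key", "changed");
        return copy;
    }

    public static void main(String[] args) {
        Properties conf = new Properties();
        conf.setProperty("key", "original");

        Properties copy = withCopy(conf);
        System.out.println(conf.getProperty("key")); // still "original"
        System.out.println(copy.getProperty("key")); // "changed"

        // Mutating the shared conf directly: every holder of the reference
        // sees the change, which can surprise tests that reuse one conf object.
        conf.setProperty("key", "changed");
        System.out.println(conf.getProperty("key")); // "changed"
    }
}
```

The copy approach keeps the caller's object untouched; mutating the input in place leaks the change to anyone holding the same reference.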
Codecov Report

```
@@            Coverage Diff             @@
##              main    #5883     +/-   ##
==========================================
+ Coverage    61.52%   61.54%   +0.01%
==========================================
  Files          317      317
  Lines        11663    11653      -10
  Branches       869      822      -47
==========================================
- Hits          7176     7172       -4
+ Misses        4487     4481       -6
```
The test "support options from optionsFile" was failing on Java 25 because ScioOptions was not explicitly registered with PipelineOptionsFactory, causing getOptionsFile to return null.

Changes:
- Register ScioOptions in parseArguments to ensure --optionsFile is recognized

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
```diff
@@ -90,7 +90,7 @@ final case class ParquetTypeIO[T: ClassTag: Coder: ParquetType](
   val cls = ScioUtil.classOf[T]
   val job = Job.getInstance(conf)
```
I was unable to remove this instance of the Job API because Magnolify's ParquetType#setupInput requires it: https://github.com/spotify/magnolify/blob/v0.9.3/parquet/src/main/scala/magnolify/parquet/ParquetType.scala#L66-L70
It references some private methods so I can't just inline the method logic. We can refactor this in a future Magnolify version.
Thus, legacy (non-SDF) typed Parquet reads are currently not supported on Java 25. However, based on the linked parquet-java thread, looks like the issue with the Job API is set to be fixed in the upcoming Hadoop 3.4.3 release (unfortunately the 3.4.3 release candidate resolver link expired so I wasn't able to validate that).
This is a stale comment after your magnolify PR, yes? We should just do a magnolify release to support both variants and remove the Job dep here
```scala
): (T, Args) = {
  val optClass = ScioUtil.classOf[T]
  PipelineOptionsFactory.register(optClass)
  PipelineOptionsFactory.register(classOf[ScioOptions])
```
```scala
FileInputFormat.setInputPaths(job, path)
GcsConnectorUtil.setCredentials(conf)
conf.set(FileInputFormat.INPUT_DIR, StringUtils.escapeString(path))
```
If we're not calling FileInputFormat.setInputPaths then the creds aren't required, no?

Also, the internals of setInputPaths are more like:

```scala
val p = StringUtils.stringToPath(path)
val inputDir = StringUtils.escapeString(p.getFileSystem(conf).makeQualified(p).toString)
conf.set(FileInputFormat.INPUT_DIR, inputDir)
```
yeah I saw that - but I don't think it's needed for our use case where we know path is either a single file path or glob. Beam already checks filesystem prefix no?
I thought the qualification part might differ between filesystems but tbh I didn't dig into that level
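For background on what the escaping step guards against: FileInputFormat.INPUT_DIR is stored in the conf as a comma-separated list of paths, so a comma inside a single path has to be backslash-escaped before being written. A simplified stand-in for the real logic (which lives in `org.apache.hadoop.util.StringUtils.escapeString`; this re-implementation is for illustration only):

```java
public class EscapeDemo {
    // Simplified stand-in for StringUtils.escapeString: escape the escape
    // character itself first, then the comma separator.
    static String escapeCommas(String path) {
        return path.replace("\\", "\\\\").replace(",", "\\,");
    }

    public static void main(String[] args) {
        // An unescaped comma in the path would otherwise be parsed as two
        // separate input directories when INPUT_DIR is split back apart.
        System.out.println(escapeCommas("gs://bucket/part,1/data.parquet"));
        // -> gs://bucket/part\,1/data.parquet
    }
}
```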
```scala
val rows = Seq(
  (() => createConfig(true), "splittable"),
  (() => ParquetAvroIO.ReadParam.DefaultConfiguration, "default")
) ++ (
```
Bump magnolify post-release and this should be OK, no?
Sadly no - that issue is solved, but a new GetSubject call popped up, coming from Beam's HadoopFormatIO internals
(the other Parquet[Type,Example]IO tests don't test real reads on the legacy pathway, so we only need it here...)
Validated that the latest Hadoop 3.4.3 RC fixes the outstanding issues with Parquet legacy reads. Not sure when the planned stable release date is, though.
Java 25 is supported as of Beam 2.69. Let's support it in Scio too.
Change summary for Java 25 upgrade:
- Updated the Java build matrix from [11, 17, 21] to [17, 21, 25]. Note that we're still compiling to 11 bytecode.
- Stopped using the `org.apache.hadoop.mapreduce.Job` API, which we used as a convenient shorthand for managing distinct Configurations, and now just use `org.apache.hadoop.conf.Configuration` objects directly. (See comment below.)
- Legacy (non-SDF) Parquet reads work for `parquetAvroFile[T](...)` and `.parquetExampleFile`, but not for `.typedParquetFile`, due to a limitation with the current magnolify-parquet implementation preventing us from totally tearing out uses of the `Job` API. However, SplittableDoFn-based reads (which are currently enabled by default) work across all Parquet read formats.
- Added the `-proc:full` java option (thanks Claude!)
- Fixed `--optionsFile=...` support by explicitly registering `ScioOptions`

Testing: