Skip to content

Commit f41cbde

Browse files
Allow users to specify trusted Avro serializable classes to Dataflow worker (#36809)
* [Proposal] Allow users to specify trusted Avro serializable classes to Dataflow worker
* Fixup boot.go
* Add default factory; add tests
* Set default options from boot.go; move PipelineOpt to SdkHarnessOptions
* Add check for empty list
1 parent eadbc6e commit f41cbde

File tree

2 files changed

+34
-0
lines changed

2 files changed

+34
-0
lines changed

sdks/java/container/boot.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,28 @@ func main() {
276276
args = append(args, "--add-modules="+module.GetStringValue())
277277
}
278278
}
279+
// Add trusted Avro serializable classes
280+
var serializableClassesList []string
281+
if serializableClasses, ok := pipelineOptions.GetStructValue().GetFields()["avroSerializableClasses"]; ok {
282+
for _, cls := range serializableClasses.GetListValue().GetValues() {
283+
// User can specify an empty list, which is serialized as a single, blank value
284+
if cls.GetStringValue() != "" {
285+
serializableClassesList = append(serializableClassesList, cls.GetStringValue())
286+
}
287+
}
288+
} else {
289+
serializableClassesList = []string{
290+
"java.math.BigDecimal",
291+
"java.math.BigInteger",
292+
"java.net.URI",
293+
"java.net.URL",
294+
"java.io.File",
295+
"java.lang.Integer",
296+
}
297+
}
298+
if len(serializableClassesList) > 0 {
299+
args = append(args, "-Dorg.apache.avro.SERIALIZABLE_CLASSES="+strings.Join(serializableClassesList, ","))
300+
}
279301
}
280302
// Automatically open modules for Java 11+
281303
openModuleAgentJar := "/opt/apache/beam/jars/open-module-agent.jar"

sdks/java/core/src/main/java/org/apache/beam/sdk/options/SdkHarnessOptions.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -440,4 +440,16 @@ public Duration create(PipelineOptions options) {
440440
int getElementProcessingTimeoutMinutes();
441441

442442
void setElementProcessingTimeoutMinutes(int value);
443+
444+
/**
445+
* The Avro spec supports the `java-class` schema annotation, which allows fields to be serialized
446+
* and deserialized via their toString method and String constructor. Since Avro 1.11.4, the
447+
* allowed Java classes must be explicitly specified via a JVM option. The values of this pipeline
448+
* option are joined with commas and passed to the Dataflow worker via the
449+
* -Dorg.apache.avro.SERIALIZABLE_CLASSES JVM option.
450+
*/
451+
@Description("Serializable classes required by java-class props in Avro 1.11.4+")
452+
List<String> getAvroSerializableClasses();
453+
454+
void setAvroSerializableClasses(List<String> options);
443455
}

0 commit comments

Comments
 (0)