
Conversation

@ministat
Contributor

@ministat ministat commented Feb 9, 2025

Set the repartition number via, for example, 'gremlin.spark.outputRepartition=500'. Only integer values larger than 0 are valid input; otherwise the repartition will be skipped silently.
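The validation rule described above (only an integer strictly greater than 0 enables repartitioning; anything else is skipped silently) can be sketched in plain Java. The class and method names here are illustrative, not the actual TinkerPop code:

```java
// Hypothetical helper mirroring the rule from the PR description: only an
// integer strictly greater than 0 enables repartitioning; any other value
// (null, zero, negative, non-numeric) yields -1, meaning "skip silently".
public class RepartitionConfig {
    public static int parseRepartition(final String value) {
        if (null == value) return -1;
        try {
            final int n = Integer.parseInt(value.trim());
            return n > 0 ? n : -1;
        } catch (final NumberFormatException e) {
            return -1; // not an integer: skip silently
        }
    }
}
```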

@codecov-commenter

codecov-commenter commented Feb 9, 2025

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 78.06%. Comparing base (cfd6889) to head (c28455a).
Report is 129 commits behind head on master.

Additional details and impacted files
@@             Coverage Diff              @@
##             master    #3026      +/-   ##
============================================
+ Coverage     77.87%   78.06%   +0.19%     
- Complexity    13578    13926     +348     
============================================
  Files          1015     1019       +4     
  Lines         59308    59933     +625     
  Branches       6835     6950     +115     
============================================
+ Hits          46184    46787     +603     
+ Misses        10817    10814       -3     
- Partials       2307     2332      +25     


public void writeGraphRDD(final Configuration configuration, final JavaPairRDD<Object, VertexWritable> graphRDD) {
final org.apache.hadoop.conf.Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(configuration);
final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION);
final String repartitionString = hadoopConfiguration.get(Constants.GREMLIN_SPARK_OUTPUT_REPARTITION);
Contributor
Nit: this int parsing can be moved inside the if block as it doesn't need to be done unless the outputLocation is non-null.

Contributor Author

Accepted.

graphRDD.mapToPair(tuple -> new Tuple2<>(NullWritable.get(), tuple._2()))
JavaPairRDD<Object, VertexWritable> javaPairRDD = graphRDD;
if (repartition > 0) {
javaPairRDD = javaPairRDD.repartition(repartition);
Contributor
Should this configuration also apply to the writeMemoryRDD method? If not should the configuration name be more specific to exclude memory?

Contributor Author

Yes, it should be applied to both GraphRDD and MemoryRDD.

*/
private <K, V> JavaPairRDD<K, V> repartitionJavaPairRDD(final org.apache.hadoop.conf.Configuration hadoopConfiguration, JavaPairRDD<K, V> graphRDD) {
JavaPairRDD<K, V> javaPairRDD = graphRDD;
final String repartitionString = hadoopConfiguration.get(Constants.GREMLIN_SPARK_OUTPUT_REPARTITION);
Contributor

Does PersistedOutputRDD also need to be aware of this configuration?

Contributor Author

PersistedOutputRDD persists the RDD, which is a little different from writing the RDD to HDFS and does not generate small files. But to keep things consistent, I applied the change there as well.
Here is the summary from ChatGPT:

In summary, persisting an RDD in Spark does not directly lead to the generation of small files in HDFS. The generation of small files in HDFS is more commonly associated with writing RDD data to HDFS using methods like saveAsTextFile, which can happen independently of persisting the RDD in memory or disk.

@andreachild
Contributor

VOTE +1

@Cole-Greer
Contributor

Cole-Greer commented Feb 13, 2025

What's the impact of this change if the users do not explicitly configure the spark output partitioning? Does this change impact the default behaviour in any meaningful way?

Also is this intended to be targeting the master branch or is it intended for 3.7-dev?

Could you also add a quick changelog entry and document the new configuration in https://github.com/apache/tinkerpop/blob/master/docs/src/reference/implementations-spark.asciidoc?

/**
* Allow users to customize the RDD partitions to reduce HDFS small files
*/
private static <K, V> JavaPairRDD<K, V> repartitionJavaPairRDD(final Configuration configuration, JavaPairRDD<K, V> graphRDD) {
Contributor

Nit: you can pull this common code into the shared interface OutputRDD as a default method.
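The suggested refactoring hoists the shared repartition logic into a default method on the common interface so that each `OutputRDD` implementation inherits it instead of duplicating it. A toy sketch of the pattern (the real interface works on Spark's `JavaPairRDD`; here the Spark types are dropped so the pattern itself is runnable):

```java
// Toy illustration of the reviewer's suggestion: shared logic lives in a
// default method on the common interface. Names are simplified stand-ins
// for the actual OutputRDD interface and its implementations.
interface Output {
    // Inherited by every implementation; returns the configured repartition
    // count, or 0 when the value is absent, non-numeric, or not positive.
    default int repartitionCount(final String configured) {
        if (null == configured) return 0;
        try {
            final int n = Integer.parseInt(configured.trim());
            return n > 0 ? n : 0;
        } catch (final NumberFormatException e) {
            return 0;
        }
    }
}

// Implementations pick up the default method without any extra code.
class FileOutput implements Output { }
```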

Contributor Author

ok

@ministat
Contributor Author

ministat commented Feb 14, 2025

What's the impact of this change if the users do not explicitly configure the spark output partitioning? Does this change impact the default behaviour in any meaningful way?

If this option is not explicitly configured, the number of output partitions is determined by the input dataset. If the input data contains many partitions under Spark's default partitioning policy, that causes the HDFS small-files problem, which is why I created this PR. If users can tolerate small files, it is fine to leave this option unset.
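For context, the new option would sit alongside the existing Hadoop Gremlin properties. A sketch of a properties file, with illustrative values:

```properties
# hadoop-graph configuration (values are illustrative)
gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
gremlin.hadoop.graphWriter=org.apache.tinkerpop.gremlin.spark.structure.io.OutputFormatRDD
gremlin.hadoop.outputLocation=output
# coalesce the output into 500 partitions to avoid many small HDFS files
gremlin.spark.outputRepartition=500
```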

Also is this intended to be targeting the master branch or is it intended for 3.7-dev?

In my previous PR I targeted 3.7-dev, so I followed that here. Shall I change the target to master?

Could you also add a quick changelog entry and document the new configuration in https://github.com/apache/tinkerpop/blob/master/docs/src/reference/implementations-spark.asciidoc?

Sure

@Cole-Greer
Contributor

Also is this intended to be targeting the master branch or is it intended for 3.7-dev?

In my previous PR, I targeted 3.7-dev, so I follow it here. Shall I change to target for master?

I'm not sure which previous PR you're referencing here. Our branching strategy is such that any change to an older development branch will be merged up into newer dev branches such that it gets included in all upcoming releases. A PR which targets 3.7-dev will also get merged up into master, but a PR which targets master will only be merged there. In other words, a PR which targets 3.7-dev will be included in the upcoming 3.7.4 and 4.0.0 releases, where a PR targeting master will only be included in the 4.0.0 release.

This is a non-breaking change so it is ok to target 3.7-dev if you would like it included in 3.7.4.

VOTE +1 (pending confirmation of desired target branch)

@ministat
Contributor Author

Thanks for the explanation. I hope this change can be included in the upcoming release.

@Cole-Greer Cole-Greer changed the base branch from master to 3.7-dev February 18, 2025 19:22
@Cole-Greer
Contributor

I hope this change can be included in the upcoming release.

I've re-targeted this PR for 3.7-dev

@Cole-Greer Cole-Greer merged commit ff6c5d4 into apache:3.7-dev Feb 24, 2025
21 checks passed
flora-jin pushed a commit to flora-jin/tinkerpop that referenced this pull request Feb 24, 2025

Co-authored-by: Hongjiang Zhang <[email protected]>