
Handle stale fetch streams after worker restart #3632

Closed
mzhou-oai wants to merge 10 commits into apache:main from
mzhou-oai:dev/mzhou/dpcdi-3260-stale-stream-reopen-upstream

Conversation


mzhou-oai commented Mar 17, 2026

Problem Statement

A worker restart can leave an in-flight reducer holding a stale streamId even though the worker comes back on the same stable hostname and still has the shuffle data on disk. In that case the worker correctly reports:

  • Stream <id> is not registered with worker. This can happen if the worker was restart recently.

That is a restart-specific condition, not necessarily a hard worker failure. Live stream registrations are process-local and are not reconstructed by recoverPath, so the old streamId cannot be used after the worker process comes back.

Before this change, CelebornInputStream treated that response like a generic fetch failure. It excluded the worker, consumed normal retry budget and backoff, and entered the usual peer-failover or retry path instead of reopening the stream on the same worker. As a result, a Celeborn worker restart can strand active Spark tasks in Celeborn fetch retry loops even when the worker is already back and the shuffle data is still available.

Proposal

This change makes stale-stream handling explicit in the client retry path:

  • mark the stale-stream case with a stable coded failure marker on the existing ChunkFetchFailure error string
  • parse that marker into ChunkFetchFailureException and treat it as the structured signal for same-worker reopen
  • keep a fallback to the legacy human-readable message so new clients still recover from older workers that do not send the code yet
  • do not classify that specific failure as a critical fetch cause
  • do not exclude the restarted worker before retrying
  • recreate the reader on the same PartitionLocation with pbStreamHandler = null so the client issues a fresh OPEN_STREAM
  • reuse checkpoint metadata so already returned chunks are skipped on the reopened stream
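The marker-and-fallback logic in the first three bullets can be sketched roughly as follows. This is a hedged illustration, not the PR's actual code: `StaleStreamMarker`, `STALE_STREAM_CODE`, `tagStaleStream`, `isStaleStream`, and the marker format are all hypothetical names.

```java
// Illustrative sketch of the coded-marker scheme described above (names and
// the marker format are assumptions, not Celeborn's actual identifiers).
class StaleStreamMarker {
  // Stable code the worker would append to the ChunkFetchFailure error string.
  static final String STALE_STREAM_CODE = "[STALE_STREAM]";
  // Legacy human-readable fragment emitted by older workers without the code.
  static final String LEGACY_FRAGMENT = "is not registered with worker";

  // Worker side: tag the failure message with the stable marker.
  static String tagStaleStream(String message) {
    return STALE_STREAM_CODE + " " + message;
  }

  // Client side: prefer the structured marker, fall back to the legacy
  // message so new clients still recover against older workers.
  static boolean isStaleStream(String errorString) {
    return errorString != null
        && (errorString.contains(STALE_STREAM_CODE)
            || errorString.contains(LEGACY_FRAGMENT));
  }
}
```

When the check matches, the remaining bullets apply: skip worker exclusion and recreate the reader on the same PartitionLocation with a null stream handle so a fresh OPEN_STREAM is issued.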

Validation

  • mvn -pl client -am -DskipTests compile
  • mvn -pl common -am -Dtest=ExceptionUtilsSuiteJ,TransportResponseHandlerSuiteJ test
  • Verified the fix end-to-end on a running Celeborn cluster in Kubernetes, including a TPC-DS 3 TB benchmark run.

(cherry picked from commit 8664706461c398ab48b541075a7ee11e2717a155)
Contributor

@eolivelli eolivelli left a comment


Can you please add an end-to-end test with a worker that is restarted?

```java
Throwable current = throwable;
while (current != null) {
  String message = current.getMessage();
  if (message != null && message.contains("is not registered with worker")) {
```
Contributor

This check is too fragile; we need a better way.
Can we add some error code?
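For comparison, a minimal sketch of what an error-code-based signal could look like if a code were carried on the exception type itself; all names here (`CodedFetchFailure`, `STALE_STREAM`) are hypothetical, and per the PR description the change ultimately encodes the code into the error string instead.

```java
// Illustrative alternative: carry a stable error code on the exception and
// walk the cause chain for it, instead of matching a free-form substring.
// CodedFetchFailure and STALE_STREAM are hypothetical names.
class CodedFetchFailure extends RuntimeException {
  static final int STALE_STREAM = 1;
  final int errorCode;

  CodedFetchFailure(int errorCode, String message) {
    super(message);
    this.errorCode = errorCode;
  }

  // Walk the cause chain looking for the stale-stream code.
  static boolean isStaleStream(Throwable throwable) {
    for (Throwable current = throwable; current != null; current = current.getCause()) {
      if (current instanceof CodedFetchFailure
          && ((CodedFetchFailure) current).errorCode == STALE_STREAM) {
        return true;
      }
    }
    return false;
  }
}
```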

Author

OK, I just made an attempt: ChunkFetchFailureException doesn't carry an error code, and it's hard to wire one through the RPC layer. See the commit history for the attempts.

Author

@eolivelli Please let me know which revision is preferred


mzhou-oai commented Mar 17, 2026

Can you please add an end-to-end test with a worker that is restarted?

Yes, it's verified end-to-end; let me update the PR body to mention that.

@mzhou-oai mzhou-oai closed this Mar 25, 2026

2 participants