[AMORO-4089] Fix optimizing process stuck permanently after AMS restart #4090
j1wonpark wants to merge 6 commits into apache:master
Conversation
j1wonpark force-pushed from 807224d to ea63514
j1wonpark force-pushed from ea63514 to fe4b1a9
Fix multiple scenarios where tables become permanently stuck after AMS restart by refactoring initTableRuntime() recovery logic, fixing OptimizerKeeper race condition, and adding defensive measures. Signed-off-by: Jiwon Park <jpark92@outlook.kr>
j1wonpark force-pushed from fe4b1a9 to fb3ce80
Hi, could you review this PR when you get a chance? This builds on #4043 and handles some additional edge cases in the recovery logic. Thanks!
PENDING tables (processId=0) should not be reset via completeEmptyProcess() as it incorrectly marks existing snapshots as already optimized. Simply re-add them to the scheduler instead. Also use separate optimizer threads in quota tests to avoid interference with resetStaleTasksForThread. Signed-off-by: Jiwon Park <jpark92@outlook.kr>
Codecov Report: ✅ All modified and coverable lines are covered by tests.

```
@@             Coverage Diff              @@
##             master    #4090      +/-   ##
============================================
+ Coverage     22.39%   22.45%    +0.05%
  Complexity     2552     2552
============================================
  Files           458      458
  Lines         42116    42022       -94
  Branches       5917     5915        -2
============================================
+ Hits           9433     9435        +2
+ Misses        31871    31775       -96
  Partials        812      812
```
lintingbin left a comment
Thanks for the thorough work on this PR! The recovery decision matrix is well-thought-out, the refactoring into loadProcess() / canResumeProcess() / resetTableForRecovery() makes the logic much clearer, and the test coverage is excellent.
However, there is a critical bug: the tableRuntime.recover(process) call was removed during the refactoring, which means tableRuntime.getOptimizingProcess() returns null for all resumed processes. This directly breaks OptimizingCommitExecutor (throws IllegalStateException) and causes NPEs in DefaultTableMaintainerContext. Ironically, this means the "all tasks completed but not yet committing" recovery scenario (issue #7 in the description) will still result in a stuck table.
Summary of findings:
| Issue | Severity | Description |
|---|---|---|
| Missing `recover()` call in resume path | Critical | `optimizingProcess` is never set → commit fails, NPE in maintainer context |
| Process left in `tableQueue` after `beginCommitting()` | Minor | Empty-shell process with no tasks sits in queue indefinitely |
| `recover()` guard relaxation is dead code | Minor | Becomes relevant only after the critical fix is applied |
| `completeEmptyProcess()` expanded scope | Minor | Public method now resets any non-IDLE state; may affect future callers |
The OptimizerKeeper race fix, resetStaleTasksForThread, null-safe loadProcess, per-table try-catch, and PENDING table preservation all look correct and well-designed.
```java
    process.getProcessId(),
    tableRuntime.getTableIdentifier());
process.close(false);
tableRuntime.beginCommitting();
```
Nit: Process left in tableQueue with no remaining tasks
When `allTasksPrepared()` is true and `beginCommitting()` is called, the process has already been added to `tableQueue` (line above) but its `taskQueue` is empty — all tasks are SUCCESS. This "empty shell" process will sit in `tableQueue` indefinitely:
- `pollTask()` will iterate over it on every poll but never get a task from it
- It won't be cleaned up until `clearProcess()` is called after a commit
Consider either not adding it to `tableQueue` when all tasks are already completed, or removing it after triggering `beginCommitting()`:

```java
if (process.allTasksPrepared()) {
  tableQueue.remove(process);
  // ...
  tableRuntime.beginCommitting();
}
```
Good catch. You're right that the process sits in `tableQueue` briefly with an empty `taskQueue`. In practice it gets cleaned up when the commit completes (`persistAndSetCompleted()` → `clearProcess(this)`), so it's short-lived.
That said, removing it proactively is cleaner — I'll update this to:
```java
if (process.allTasksPrepared()) {
  tableQueue.remove(process);
  tableRuntime.beginCommitting();
}
```

```java
        "Close the committing process {} on table {}",
        // ...
    if (canResumeProcess(process, tableRuntime)) {
      tableQueue.offer(process);
```
Bug (Critical): Missing tableRuntime.recover(process) — will cause IllegalStateException and NPE at runtime
In the original code, tableRuntime.recover(process) was called to set the optimizingProcess field on DefaultTableRuntime. This call has been completely removed in the new code. As a result, when a process is resumed, tableRuntime.getOptimizingProcess() will return null.
This directly breaks the commit cycle. `OptimizingCommitExecutor.execute()` does:

```java
Optional.of(tableRuntime)
    .map(t -> (DefaultTableRuntime) t)
    .map(DefaultTableRuntime::getOptimizingProcess) // returns null!
    .orElseThrow(() -> new IllegalStateException(
        "OptimizingProcess is null while committing:" + tableRuntime))
    .commit();
```

So in the allTasksPrepared recovery path, after `beginCommitting()` is called, the commit executor will throw `IllegalStateException` because `getOptimizingProcess()` returns null. The table will remain stuck — the exact problem this PR aims to fix.
Additionally, this affects other callers:
- `DefaultTableMaintainerContext.getTargetSnapshotId()` → NPE when `isProcessing()` is true
- Cancel process via API → silently fails (returns `false`)
- Disabling self-optimizing → running process is not properly closed
Suggested fix:

```java
if (canResumeProcess(process, tableRuntime)) {
  tableRuntime.recover(process); // <-- add this line
  tableQueue.offer(process);
  if (process.allTasksPrepared()) {
    // ...
  }
}
```
Thanks for the thorough review, @lintingbin!
I believe recover() is actually still being called — the TableOptimizingProcess recovery constructor invokes tableRuntime.recover(this) internally when status != ProcessStatus.KILLED:
```java
// OptimizingQueue.java L569-572
this.status = processMeta.getStatus();
if (this.status != ProcessStatus.KILLED) {
  tableRuntime.recover(this);
}
```

So the flow is: `loadProcess()` → `new TableOptimizingProcess(tableRuntime, meta, state)` → constructor calls `tableRuntime.recover(this)` → `optimizingProcess` field is set.
The explicit tableRuntime.recover(process) call in the old initTableRuntime() was actually redundant alongside the constructor call. This PR just removes the duplicate.
```java
if (this.status != ProcessStatus.KILLED) {
  tableRuntime.recover(this);
}
```
Perhaps we should remove the check to preserve the original meaning: `tableRuntime.recover(this)`.
I'm not quite sure why the `this.status != ProcessStatus.KILLED` condition was added in the first place either.
Good question — this condition isn't new in this PR. It was introduced in commit 064f400 ([AMORO-3485] Introduce scheduler module and external resource container) as a rename from ProcessStatus.CLOSED → ProcessStatus.KILLED.
The intent is that a KILLED process has already been fully terminated, so recovering it into tableRuntime.optimizingProcess would be unnecessary — and could cause issues since recover() also triggers completeProcess(true) when the process status is SUCCESS, which we wouldn't want for a KILLED process.
That said, in the current PR the loadProcess() → canResumeProcess() flow already filters out KILLED processes (canResumeProcess requires status == RUNNING), and resetTableForRecovery() handles the cleanup. So the guard is effectively redundant now, but I'd prefer to keep it as a defensive check since it's pre-existing behavior and doesn't hurt.
If there are no side effects from removing it, I would prefer to remove this check. Since you're essentially refactoring the entire process, it's better to directly eliminate any unused code from before. Otherwise, it wouldn't feel like a complete refactoring.
Agreed — removed the status != ProcessStatus.KILLED guard in f25600f. Since canResumeProcess() already filters out KILLED processes before reaching this path, the check was redundant. Cleaner to remove it as part of this refactoring.
```diff
   public void recover(OptimizingProcess optimizingProcess) {
-    if (!getOptimizingStatus().isProcessing()
-        || !Objects.equals(optimizingProcess.getProcessId(), getProcessId())) {
+    if (!Objects.equals(optimizingProcess.getProcessId(), getProcessId())) {
```
Note: This change is currently dead code
Since recover() is no longer called anywhere in the new initTableRuntime() logic, this modification has no effect in the current PR.
However, if the missing recover() call is added back (see my comment on OptimizingQueue.java), then this change becomes necessary — because the resume path may call recover() on a table whose isProcessing() status is true (e.g., *_OPTIMIZING), so relaxing the guard condition here makes sense in that context.
This change is actually active — the TableOptimizingProcess recovery constructor still calls tableRuntime.recover(this) (see my reply on the first comment).
The condition relaxation is also necessary: unlike the original code which guarded process creation with isProcessing(), the new loadProcess() creates a process whenever processId != 0 regardless of OptimizingStatus. This means recover() can now be called on a table in PENDING or PLANNING state (e.g., if AMS crashed between setting processId and updating the status). Without removing the !isProcessing() guard, that would throw IllegalStateException.
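To make the relaxed-guard reasoning concrete, here is a minimal, hypothetical sketch of the processId-driven lookup described above. The names `ProcessMeta` and the in-memory `store` are stand-ins for Amoro's persisted process metadata; this is not the real `loadProcess()` implementation:

```java
import java.util.Map;

public class LoadProcessSketch {
    // Hypothetical stand-in for the persisted optimizing-process record.
    record ProcessMeta(long processId, String status) {}

    // Null-safe lookup: a process is loaded whenever processId != 0,
    // regardless of the table's OptimizingStatus; a missing record
    // returns null instead of throwing, so one broken table cannot
    // block recovery of the others.
    static ProcessMeta loadProcess(long processId, Map<Long, ProcessMeta> store) {
        if (processId == 0) {
            return null; // PENDING table: nothing was ever planned
        }
        return store.get(processId); // null if the DB record is missing (orphaned id)
    }

    public static void main(String[] args) {
        Map<Long, ProcessMeta> store = Map.of(7L, new ProcessMeta(7L, "RUNNING"));
        System.out.println(loadProcess(0, store)); // PENDING table: prints null
        System.out.println(loadProcess(7, store)); // resumable process record
        System.out.println(loadProcess(9, store)); // orphaned processId: prints null
    }
}
```

Because this lookup can yield a process for a table still in `PENDING` or `PLANNING`, the `recover()` guard must tolerate non-processing table statuses.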
```java
    .updateState(PENDING_INPUT_KEY, any -> new AbstractOptimizingEvaluator.PendingInput())
    .commit();
if (originalStatus == OptimizingStatus.IDLE) {
  return;
```
Caution: Expanded scope of completeEmptyProcess() may have unintended side effects
The original implementation only allowed PLANNING and PENDING states to be reset to IDLE. The new version resets any non-IDLE state (including *_OPTIMIZING and COMMITTING).
While this is needed by resetTableForRecovery() (which guards with its own IDLE/PENDING check), completeEmptyProcess() is a public method. Any existing or future caller that invokes it on a *_OPTIMIZING table will now silently reset it to IDLE and mark the current snapshot as "optimized" — this was previously a no-op.
Consider either:
- Adding a comment documenting the new broader contract, or
- Making `resetTableForRecovery` use a dedicated internal method, keeping `completeEmptyProcess()` scoped to its original states to avoid accidentally resetting actively-optimizing tables from other call sites.
Valid concern. The expanded scope is intentional for resetTableForRecovery(), which needs to reset tables from any non-IDLE state during startup recovery.
In practice, the risk to existing callers is low — the only other call site is planInternal() which always runs in PLANNING state. But I agree it's worth making the contract explicit. I'll add a comment documenting the broader scope.
…very When all tasks are already completed during recovery, skip adding the process to tableQueue and directly trigger beginCommitting() to prevent an empty shell process from lingering in the queue. Signed-off-by: Jiwon Park <jpark92@outlook.kr>
…r scope Document that completeEmptyProcess() resets the table from any non-IDLE state, not just PLANNING/PENDING, as it is now also used during startup recovery for tables with unrecoverable processes. Signed-off-by: Jiwon Park <jpark92@outlook.kr>
…ctor canResumeProcess() already filters out KILLED processes, so the status != KILLED guard in the recovery constructor is unnecessary. Remove it to clean up legacy code as part of the refactoring. Signed-off-by: Jiwon Park <jpark92@outlook.kr>
xxubai left a comment
Thank you for submitting this fix. We’ve also experienced this issue internally, as it affects system stability. We’ll prioritize addressing it.
The overall implementation looks solid, particularly the added unit test coverage for this case.
```java
  toSequence = processState.getToSequence();
}
this.status = processMeta.getStatus();
if (this.status != ProcessStatus.KILLED) {
```
Can you explain why you removed this condition?
Removed per @lintingbin's earlier feedback (comment).
In the refactored flow, canResumeProcess() already filters out non-RUNNING processes (including KILLED), so a KILLED process never reaches this constructor path — resetTableForRecovery() handles it instead. The guard was redundant, so I cleaned it up.
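The filtering contract described here can be sketched roughly as below. This is a hypothetical, simplified model (single `MINOR_OPTIMIZING` value standing in for all `*_OPTIMIZING` states), not the actual `canResumeProcess()` method:

```java
public class CanResumeSketch {
    enum ProcessStatus { RUNNING, SUCCESS, FAILED, KILLED }
    enum TableStatus { IDLE, PENDING, PLANNING, MINOR_OPTIMIZING, COMMITTING }

    // Only a RUNNING process on a table still in an optimizing state is
    // resumed; FAILED/KILLED/etc. fall through to resetTableForRecovery(),
    // so a KILLED process never reaches the recovery constructor.
    static boolean canResumeProcess(ProcessStatus process, TableStatus table) {
        return process == ProcessStatus.RUNNING && table == TableStatus.MINOR_OPTIMIZING;
    }

    public static void main(String[] args) {
        System.out.println(canResumeProcess(ProcessStatus.RUNNING, TableStatus.MINOR_OPTIMIZING)); // true
        System.out.println(canResumeProcess(ProcessStatus.KILLED, TableStatus.MINOR_OPTIMIZING));  // false
        System.out.println(canResumeProcess(ProcessStatus.FAILED, TableStatus.COMMITTING));        // false
    }
}
```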
```java
OptimizerKeepingTask keepingTask = suspendingQueue.take();
String token = keepingTask.getToken();
boolean isExpired = !keepingTask.tryKeeping();
if (isExpired) {
```
What would be the benefit of moving this code block to the front?
It fixes an ordering bug. `buildSuspendingPredication` checks `!activeTokens.contains(task.getToken())` to find tasks from dead optimizers, where `activeTokens` is `authOptimizers.keySet()`.

In the original code, `collectTasks` ran before `unregisterOptimizer` — so the expired token was still in `authOptimizers` when the predicate evaluated, meaning `!activeTokens.contains(token)` was false. The expired optimizer's tasks got silently skipped. They'd only be picked up in a later iteration (triggered by a different optimizer's keepalive) or when the ack/exec timeout kicked in.

Moving `unregisterOptimizer` before `collectTasks` ensures the token is already gone by the time the predicate runs, so the tasks are correctly collected and retried immediately.
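The ordering effect described above can be demonstrated with a small, self-contained sketch. The names here are hypothetical and merely mirror the discussion; this is not the real OptimizerKeeper code:

```java
import java.util.*;

public class KeeperOrderingSketch {
    // Dead-optimizer predicate from the discussion: a task is stale when
    // its token is no longer among the authenticated optimizers' tokens.
    static List<String> collectStaleTasks(Set<String> activeTokens,
                                          Map<String, String> taskToToken) {
        List<String> stale = new ArrayList<>();
        for (Map.Entry<String, String> e : taskToToken.entrySet()) {
            if (!activeTokens.contains(e.getValue())) {
                stale.add(e.getKey());
            }
        }
        return stale;
    }

    public static void main(String[] args) {
        Set<String> authOptimizers = new HashSet<>(Arrays.asList("expired-token", "live-token"));
        Map<String, String> tasks = Map.of("task-1", "expired-token", "task-2", "live-token");

        // Old order: scan before unregister -> the expired token still looks
        // active, so its task is silently skipped.
        System.out.println(collectStaleTasks(authOptimizers, tasks)); // prints []

        // Fixed order: unregister first, then scan -> the task is collected.
        authOptimizers.remove("expired-token");
        System.out.println(collectStaleTasks(authOptimizers, tasks)); // prints [task-1]
    }
}
```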
```java
// Don't reset — let the optimizer finish execution and report
// the result. If the optimizer is dead, OptimizerKeeper will
// detect the stale token and reset the task via retryTask().
LOG.info(
```
nit: Recovery tasks kept in SCHEDULED/ACKED state are logged at INFO, but this is a transient recovery state that resolves itself. Consider using DEBUG to avoid noisy logs in clusters with many tables.
There was a problem hiding this comment.
Good point — changed to LOG.debug in ef33d27.
Transient recovery state logs can be noisy in clusters with many tables. Signed-off-by: Jiwon Park <jpark92@outlook.kr>
Why are the changes needed?
After AMS restarts, tables can become permanently stuck in an optimizing state and never recover.
Several edge cases cause this:
- Table status `*_OPTIMIZING` but process is `FAILED` in DB — table gets added to `tableQueue` but poll blocks forever on a dead process.
- Table status `COMMITTING` but process is `FAILED` — table ends up in neither `scheduler` nor `tableQueue` (ghost table).
- Table status `PLANNING` with an orphaned process — same ghost table problem.
- `NullPointerException` during recovery, blocking other tables' initialization.
- `SCHEDULED`/`ACKED` tasks not re-queued — tasks stuck waiting for an optimizer that may have died during AMS downtime.
- `beginCommitting()` not called — AMS crashes after persisting the last task as `SUCCESS` but before updating the table status to `COMMITTING`. On recovery, the process is resumed into `tableQueue` but `taskQueue` is empty, so `acceptResult()` is never triggered and the table is stuck forever.

Close #4089.
Recovery decision matrix (`initTableRuntime`)

The following summarizes how each (process status, table status) combination is handled after AMS restart (`isProcessing` = `*_OPTIMIZING` or `COMMITTING`):
- Tables in `PENDING` are re-added to `scheduler` for re-evaluation.
- Unrecoverable combinations are reset via `completeEmptyProcess()` and added to `scheduler`.
- When `canResumeProcess()` = true, the process is re-added to `tableQueue` and tasks continue executing. If all tasks are already `SUCCESS`, `beginCommitting()` is triggered immediately via the `allTasksPrepared()` check.
Brief change log
- `initTableRuntime()` recovery logic: refactored into `loadProcess()` / `canResumeProcess()` / `resetTableForRecovery()`. Only resumes when the process is RUNNING and the table is `*_OPTIMIZING`. All other combinations except IDLE and PENDING reset to IDLE. Per-table try-catch added to isolate failures.
- `PENDING` is a normal pre-processing state (processId=0, waiting for planning). Resetting it would incorrectly mark existing snapshots as optimized via `completeEmptyProcess()`, causing pending data to be permanently skipped. These tables are simply re-added to `scheduler` without reset.
- If all tasks are already `SUCCESS`, `beginCommitting()` is triggered immediately to prevent the table from being stuck with an empty `taskQueue`.
- `SCHEDULED`/`ACKED` task handling: kept as-is during recovery (with logging) instead of silently falling through. Stale `ACKED` tasks are auto-reset to `PLANNED` when the same optimizer thread polls again (`resetStaleTasksForThread`).
- `unregisterOptimizer()` now runs before `collectTasks()`, so an expired optimizer's tasks are immediately detected.
- `completeEmptyProcess()`: expanded to reset any non-IDLE state to IDLE (previously only `PLANNING`/`PENDING`). Used by `resetTableForRecovery()` for PLANNING, COMMITTING, and `*_OPTIMIZING` states.

How was this patch tested?
Add some test cases that check the changes thoroughly including negative and positive cases if possible
- `testPollResetsStaleAckedTask`: verifies a stale ACKED task is auto-reset when the same optimizer thread polls again.
- `testReloadPlanningWithOrphanedProcess`: table in PLANNING with an orphaned process → reset to IDLE.
- `testReloadOptimizingWithFailedProcess`: table in `*_OPTIMIZING` with a FAILED process → reset to IDLE.
- `testReloadCommittingWithFailedProcess`: table in COMMITTING with a FAILED process → reset to IDLE.
- `testReloadOptimizingWithNoProcessRecord`: table in `*_OPTIMIZING` with a missing process record → reset to IDLE.
- Updated `testOptimizerExpired` to expect `PLANNED` (tasks are immediately reset since unregister now happens before the task scan).
- `testReloadAllTasksCompletedNotYetCommitting`: all tasks SUCCESS and table still in `*_OPTIMIZING` → `beginCommitting()` triggered on recovery.
- Updated `testReloadScheduledTask` and `testReloadAckTask` with clarifying comments.
- Updated `testPollTaskWithOverQuotaDisabled` and `testPollTaskWithOverQuotaEnabled` to use separate optimizer threads for quota-check polls, avoiding interference with `resetStaleTasksForThread`.

Add screenshots for manual tests if appropriate
Run test locally before making a pull request
Documentation