
[AMORO-4089] Fix optimizing process stuck permanently after AMS restart#4090

Open
j1wonpark wants to merge 6 commits into apache:master from j1wonpark:fix/AMORO-4089-optimizing-process-stuck-after-ams-restart

Conversation

@j1wonpark
Contributor

@j1wonpark commented Feb 15, 2026

Why are the changes needed?

After AMS restarts, tables can become permanently stuck in an optimizing state and never recover.
Several edge cases cause this:

  1. Table in *_OPTIMIZING but process is FAILED in DB — table gets added to tableQueue but poll blocks forever on a dead process.
  2. Table in COMMITTING but process is FAILED — table ends up in neither scheduler nor tableQueue (ghost table).
  3. Table in PLANNING with an orphaned process — same ghost table problem.
  4. Process record missing from DB → NullPointerException during recovery, blocking other tables' initialization.
  5. SCHEDULED/ACKED tasks not re-queued — tasks stuck waiting for an optimizer that may have died during AMS downtime.
  6. OptimizerKeeper race condition — expired optimizer's tasks scanned before unregister, so the predicate doesn't match them.
  7. All tasks completed but beginCommitting() not called — AMS crashes after persisting the last task as SUCCESS but before updating the table status to COMMITTING. On recovery, the process is resumed into tableQueue but taskQueue is empty, so acceptResult() is never triggered and the table is stuck forever.

Closes #4089.

Recovery decision matrix (initTableRuntime)

The following matrix shows how each (process status, table status) combination is handled after AMS restart.
isProcessing = *_OPTIMIZING or COMMITTING.

Process \ Table                        IDLE   PLANNING   PENDING   *_OPTIMIZING   COMMITTING
null (processId=0 or record missing)   •      reset      •         reset          reset
RUNNING                                •      reset      •         resume         reset
FAILED                                 •      reset      •         reset          reset
SUCCESS                                •      reset      •         reset          reset
CLOSED / KILLED                        •      reset      •         reset          reset
  • •: no reset needed; the table is added back to the scheduler for re-evaluation.
  • reset: reset to IDLE via completeEmptyProcess(), then added to the scheduler.
  • resume: canResumeProcess() = true; the process is re-added to tableQueue and its tasks continue executing. If all tasks are already SUCCESS, beginCommitting() is triggered immediately via the allTasksPrepared() check.
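The matrix can be read as a pure decision function over the two statuses. A minimal sketch of that logic, using simplified stand-in enums rather than the actual Amoro types (OPTIMIZING here stands for any *_OPTIMIZING status):

```java
// Illustrative sketch of the recovery decision matrix. The enums and the
// Decision type are simplified stand-ins, not the actual Amoro classes.
enum ProcessStatus { NONE, RUNNING, FAILED, SUCCESS, CLOSED, KILLED }
enum TableStatus { IDLE, PLANNING, PENDING, OPTIMIZING, COMMITTING }
enum Decision { KEEP, RESET, RESUME }

final class RecoveryMatrix {
  // IDLE and PENDING are never reset; only RUNNING + *_OPTIMIZING resumes;
  // every other combination resets the table to IDLE.
  static Decision decide(ProcessStatus process, TableStatus table) {
    if (table == TableStatus.IDLE || table == TableStatus.PENDING) {
      return Decision.KEEP;   // re-add to scheduler as-is
    }
    if (process == ProcessStatus.RUNNING && table == TableStatus.OPTIMIZING) {
      return Decision.RESUME; // re-add process to tableQueue
    }
    return Decision.RESET;    // completeEmptyProcess() -> IDLE, then scheduler
  }

  public static void main(String[] args) {
    if (decide(ProcessStatus.FAILED, TableStatus.OPTIMIZING) != Decision.RESET
        || decide(ProcessStatus.RUNNING, TableStatus.OPTIMIZING) != Decision.RESUME
        || decide(ProcessStatus.RUNNING, TableStatus.PENDING) != Decision.KEEP) {
      throw new AssertionError("matrix mismatch");
    }
    System.out.println("matrix ok"); // prints: matrix ok
  }
}
```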

Brief change log

  • initTableRuntime() recovery logic: Refactored into loadProcess() / canResumeProcess() / resetTableForRecovery(). Only resumes when process is RUNNING and table is *_OPTIMIZING. All other combinations except IDLE and PENDING reset to IDLE. Per-table try-catch added to isolate failures.
  • PENDING tables preserved: PENDING is a normal pre-processing state (processId=0, waiting for planning). Resetting it would incorrectly mark existing snapshots as optimized via completeEmptyProcess(), causing pending data to be permanently skipped. These tables are simply re-added to scheduler without reset.
  • All-tasks-completed recovery: When a resumed process has all tasks already SUCCESS, beginCommitting() is triggered immediately to prevent the table from being stuck with an empty taskQueue.
  • SCHEDULED/ACKED task handling: Kept as-is during recovery (with logging) instead of silently falling through. Stale ACKED tasks are auto-reset to PLANNED when the same optimizer thread polls again (resetStaleTasksForThread).
  • OptimizerKeeper race condition: unregisterOptimizer() now runs before collectTasks(), so expired optimizer's tasks are immediately detected.
  • completeEmptyProcess(): Expanded to reset any non-IDLE state to IDLE (previously only PLANNING/PENDING). Used by resetTableForRecovery() for PLANNING, COMMITTING, and *_OPTIMIZING states.
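As a rough illustration of the stale-ACKED handling above: when an optimizer thread polls again after the restart, any task it had previously ACKED must be stale (the old execution was lost), so it is reset to PLANNED. The types and the method below are simplified stand-ins, not the actual AMS implementation:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of the stale-ACKED reset on re-poll. TaskStatus and
// resetStaleTasksForThread are simplified stand-ins for the AMS versions.
enum TaskStatus { PLANNED, SCHEDULED, ACKED, SUCCESS }

final class StaleTaskReset {
  // A poll from threadId means any task still ACKED by that thread is stale,
  // so it goes back to PLANNED; other statuses are left untouched.
  static void resetStaleTasksForThread(Map<String, TaskStatus> taskByThread, String threadId) {
    taskByThread.computeIfPresent(
        threadId, (id, status) -> status == TaskStatus.ACKED ? TaskStatus.PLANNED : status);
  }

  public static void main(String[] args) {
    Map<String, TaskStatus> taskByThread = new HashMap<>();
    taskByThread.put("optimizer-thread-1", TaskStatus.ACKED);

    // The same optimizer thread polls again after the AMS restart.
    resetStaleTasksForThread(taskByThread, "optimizer-thread-1");

    if (taskByThread.get("optimizer-thread-1") != TaskStatus.PLANNED) {
      throw new AssertionError();
    }
    System.out.println("stale ACKED task reset to PLANNED");
  }
}
```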

How was this patch tested?

  • Add some test cases that check the changes thoroughly including negative and positive cases if possible

    • testPollResetsStaleAckedTask: Verifies stale ACKED task is auto-reset when the same optimizer thread polls again.
    • testReloadPlanningWithOrphanedProcess: Table in PLANNING with orphaned process → reset to IDLE.
    • testReloadOptimizingWithFailedProcess: Table in *_OPTIMIZING with FAILED process → reset to IDLE.
    • testReloadCommittingWithFailedProcess: Table in COMMITTING with FAILED process → reset to IDLE.
    • testReloadOptimizingWithNoProcessRecord: Table in *_OPTIMIZING with missing process record → reset to IDLE.
    • Updated testOptimizerExpired to expect PLANNED (tasks are immediately reset since unregister now happens before task scan).
    • testReloadAllTasksCompletedNotYetCommitting: All tasks SUCCESS + table still in *_OPTIMIZING → beginCommitting() triggered on recovery.
    • Updated testReloadScheduledTask and testReloadAckTask with clarifying comments.
    • Updated testPollTaskWithOverQuotaDisabled and testPollTaskWithOverQuotaEnabled to use separate optimizer threads for quota-check polls, avoiding interference with resetStaleTasksForThread.
  • Add screenshots for manual tests if appropriate

  • Run test locally before making a pull request

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? not applicable

@github-actions bot added the module:ams-server (Ams server module) label Feb 15, 2026
@j1wonpark force-pushed the fix/AMORO-4089-optimizing-process-stuck-after-ams-restart branch from 807224d to ea63514 on February 19, 2026 05:48
@j1wonpark marked this pull request as draft February 19, 2026 07:36
@j1wonpark force-pushed the fix/AMORO-4089-optimizing-process-stuck-after-ams-restart branch from ea63514 to fe4b1a9 on February 19, 2026 12:34
Fix multiple scenarios where tables become permanently stuck after
AMS restart by refactoring initTableRuntime() recovery logic, fixing
OptimizerKeeper race condition, and adding defensive measures.

Signed-off-by: Jiwon Park <jpark92@outlook.kr>
@j1wonpark force-pushed the fix/AMORO-4089-optimizing-process-stuck-after-ams-restart branch from fe4b1a9 to fb3ce80 on February 20, 2026 08:44
@j1wonpark marked this pull request as ready for review February 20, 2026 09:57
@j1wonpark
Contributor Author

Hi, could you review this PR when you get a chance?

This builds on #4043 and handles some additional edge cases in the recovery logic.
@klion26 @lintingbin @baiyangtx

Thanks!

PENDING tables (processId=0) should not be reset via
completeEmptyProcess() as it incorrectly marks existing snapshots
as already optimized. Simply re-add them to the scheduler instead.

Also use separate optimizer threads in quota tests to avoid
interference with resetStaleTasksForThread.

Signed-off-by: Jiwon Park <jpark92@outlook.kr>
@codecov-commenter

codecov-commenter commented Feb 20, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 22.45%. Comparing base (fda105e) to head (f25600f).
⚠️ Report is 3 commits behind head on master.

Additional details and impacted files
@@             Coverage Diff              @@
##             master    #4090      +/-   ##
============================================
+ Coverage     22.39%   22.45%   +0.05%     
  Complexity     2552     2552              
============================================
  Files           458      458              
  Lines         42116    42022      -94     
  Branches       5917     5915       -2     
============================================
+ Hits           9433     9435       +2     
+ Misses        31871    31775      -96     
  Partials        812      812              
Flag    Coverage     Δ
trino   22.45% <ø>   +0.05% ⬆️


Contributor

@lintingbin left a comment


Thanks for the thorough work on this PR! The recovery decision matrix is well-thought-out, the refactoring into loadProcess() / canResumeProcess() / resetTableForRecovery() makes the logic much clearer, and the test coverage is excellent.

However, there is a critical bug: the tableRuntime.recover(process) call was removed during the refactoring, which means tableRuntime.getOptimizingProcess() returns null for all resumed processes. This directly breaks OptimizingCommitExecutor (throws IllegalStateException) and causes NPEs in DefaultTableMaintainerContext. Ironically, this means the "all tasks completed but not yet committing" recovery scenario (issue #7 in the description) will still result in a stuck table.

Summary of findings:

Issue                                               Severity   Description
Missing recover() call in resume path               Critical   optimizingProcess is never set → commit fails, NPE in maintainer context
Process left in tableQueue after beginCommitting()  Minor      Empty-shell process with no tasks sits in queue indefinitely
recover() guard relaxation is dead code             Minor      Becomes relevant only after the critical fix is applied
completeEmptyProcess() expanded scope               Minor      Public method now resets any non-IDLE state; may affect future callers

The OptimizerKeeper race fix, resetStaleTasksForThread, null-safe loadProcess, per-table try-catch, and PENDING table preservation all look correct and well-designed.

process.getProcessId(),
tableRuntime.getTableIdentifier());
process.close(false);
tableRuntime.beginCommitting();
Contributor


Nit: Process left in tableQueue with no remaining tasks

When allTasksPrepared() is true and beginCommitting() is called, the process has already been added to tableQueue (line above) but its taskQueue is empty — all tasks are SUCCESS. This "empty shell" process will sit in tableQueue indefinitely:

  • pollTask() will iterate over it on every poll but never get a task from it
  • It won't be cleaned up until clearProcess() is called after a commit

Consider either not adding it to tableQueue when all tasks are already completed, or removing it after triggering beginCommitting():

if (process.allTasksPrepared()) {
    tableQueue.remove(process);
    // ...
    tableRuntime.beginCommitting();
}

Contributor Author


Good catch. You're right that the process sits in tableQueue briefly with an empty taskQueue. In practice it gets cleaned up when the commit completes (persistAndSetCompleted() → clearProcess(this)), so it's short-lived.

That said, removing it proactively is cleaner — I'll update this to:

if (process.allTasksPrepared()) {
    tableQueue.remove(process);
    tableRuntime.beginCommitting();
}

"Close the committing process {} on table {}",

if (canResumeProcess(process, tableRuntime)) {
tableQueue.offer(process);
Contributor


Bug (Critical): Missing tableRuntime.recover(process) — will cause IllegalStateException and NPE at runtime

In the original code, tableRuntime.recover(process) was called to set the optimizingProcess field on DefaultTableRuntime. This call has been completely removed in the new code. As a result, when a process is resumed, tableRuntime.getOptimizingProcess() will return null.

This directly breaks the commit cycle. OptimizingCommitExecutor.execute() does:

Optional.of(tableRuntime)
    .map(t -> (DefaultTableRuntime) t)
    .map(DefaultTableRuntime::getOptimizingProcess)   // returns null!
    .orElseThrow(() -> new IllegalStateException(
        "OptimizingProcess is null while committing:" + tableRuntime))
    .commit();

So in the allTasksPrepared recovery path, after beginCommitting() is called, the commit executor will throw IllegalStateException because getOptimizingProcess() returns null. The table will remain stuck — the exact problem this PR aims to fix.

Additionally, this affects other callers:

  • DefaultTableMaintainerContext.getTargetSnapshotId() → NPE when isProcessing() is true
  • Cancel process via API → silently fails (returns false)
  • Disabling self-optimizing → running process is not properly closed

Suggested fix:

if (canResumeProcess(process, tableRuntime)) {
    tableRuntime.recover(process);  // <-- add this line
    tableQueue.offer(process);
    if (process.allTasksPrepared()) {
        // ...
    }
}

Contributor Author


Thanks for the thorough review, @lintingbin!

I believe recover() is actually still being called — the TableOptimizingProcess recovery constructor invokes tableRuntime.recover(this) internally when status != ProcessStatus.KILLED:

// OptimizingQueue.java L569-572
this.status = processMeta.getStatus();
if (this.status != ProcessStatus.KILLED) {
    tableRuntime.recover(this);
}

So the flow is: loadProcess() → new TableOptimizingProcess(tableRuntime, meta, state) → the constructor calls tableRuntime.recover(this) → the optimizingProcess field is set.

The explicit tableRuntime.recover(process) call in the old initTableRuntime() was actually redundant alongside the constructor call. This PR just removes the duplicate.

Contributor


if (this.status != ProcessStatus.KILLED) {
    tableRuntime.recover(this);
}

Perhaps we should remove the judgment to preserve the original meaning:
tableRuntime.recover(this)

I'm not quite sure why we need to add the condition this.status != ProcessStatus.KILLED either.

Contributor Author


Good question — this condition isn't new in this PR. It was introduced in commit 064f400 ([AMORO-3485] Introduce scheduler module and external resource container) as part of renaming ProcessStatus.CLOSED to ProcessStatus.KILLED.

The intent is that a KILLED process has already been fully terminated, so recovering it into tableRuntime.optimizingProcess would be unnecessary — and could cause issues since recover() also triggers completeProcess(true) when the process status is SUCCESS, which we wouldn't want for a KILLED process.

That said, in the current PR the loadProcess() → canResumeProcess() flow already filters out KILLED processes (canResumeProcess requires status == RUNNING), and resetTableForRecovery() handles the cleanup. So the guard is effectively redundant now, but I'd prefer to keep it as a defensive check since it's pre-existing behavior and doesn't hurt.

Contributor


If there are no side effects from removing it, I would prefer to remove this check. Since you're essentially refactoring the entire process, it's better to directly eliminate any unused code from before. Otherwise, it wouldn't feel like a complete refactoring.

Contributor Author


Agreed — removed the status != ProcessStatus.KILLED guard in f25600f. Since canResumeProcess() already filters out KILLED processes before reaching this path, the check was redundant. Cleaner to remove it as part of this refactoring.

public void recover(OptimizingProcess optimizingProcess) {
if (!getOptimizingStatus().isProcessing()
|| !Objects.equals(optimizingProcess.getProcessId(), getProcessId())) {
if (!Objects.equals(optimizingProcess.getProcessId(), getProcessId())) {
Contributor


Note: This change is currently dead code

Since recover() is no longer called anywhere in the new initTableRuntime() logic, this modification has no effect in the current PR.

However, if the missing recover() call is added back (see my comment on OptimizingQueue.java), then this change becomes necessary — because the resume path may call recover() on a table whose isProcessing() status is true (e.g., *_OPTIMIZING), so relaxing the guard condition here makes sense in that context.

Contributor Author


This change is actually active — the TableOptimizingProcess recovery constructor still calls tableRuntime.recover(this) (see my reply on the first comment).

The condition relaxation is also necessary: unlike the original code which guarded process creation with isProcessing(), the new loadProcess() creates a process whenever processId != 0 regardless of OptimizingStatus. This means recover() can now be called on a table in PENDING or PLANNING state (e.g., if AMS crashed between setting processId and updating the status). Without removing the !isProcessing() guard, that would throw IllegalStateException.

.updateState(PENDING_INPUT_KEY, any -> new AbstractOptimizingEvaluator.PendingInput())
.commit();
if (originalStatus == OptimizingStatus.IDLE) {
return;
Contributor


Caution: Expanded scope of completeEmptyProcess() may have unintended side effects

The original implementation only allowed PLANNING and PENDING states to be reset to IDLE. The new version resets any non-IDLE state (including *_OPTIMIZING and COMMITTING).

While this is needed by resetTableForRecovery() (which guards with its own IDLE/PENDING check), completeEmptyProcess() is a public method. Any existing or future caller that invokes it on a *_OPTIMIZING table will now silently reset it to IDLE and mark the current snapshot as "optimized" — this was previously a no-op.

Consider either:

  1. Adding a comment documenting the new broader contract, or
  2. Making resetTableForRecovery use a dedicated internal method, keeping completeEmptyProcess() scoped to its original states to avoid accidentally resetting actively-optimizing tables from other call sites.

Contributor Author


Valid concern. The expanded scope is intentional for resetTableForRecovery(), which needs to reset tables from any non-IDLE state during startup recovery.

In practice, the risk to existing callers is low — the only other call site is planInternal() which always runs in PLANNING state. But I agree it's worth making the contract explicit. I'll add a comment documenting the broader scope.

…very

When all tasks are already completed during recovery, skip adding the
process to tableQueue and directly trigger beginCommitting() to prevent
an empty shell process from lingering in the queue.

Signed-off-by: Jiwon Park <jpark92@outlook.kr>
…r scope

Document that completeEmptyProcess() resets the table from any non-IDLE
state, not just PLANNING/PENDING, as it is now also used during startup
recovery for tables with unrecoverable processes.

Signed-off-by: Jiwon Park <jpark92@outlook.kr>
…ctor

canResumeProcess() already filters out KILLED processes, so the
status != KILLED guard in the recovery constructor is unnecessary.
Remove it to clean up legacy code as part of the refactoring.

Signed-off-by: Jiwon Park <jpark92@outlook.kr>
Contributor

@xxubai left a comment


Thank you for submitting this fix. We’ve also experienced this issue internally, as it affects system stability. We’ll prioritize addressing it.

The overall implementation looks solid, particularly the added unit test coverage for this case.

toSequence = processState.getToSequence();
}
this.status = processMeta.getStatus();
if (this.status != ProcessStatus.KILLED) {
Contributor


Can you explain why this condition was removed?

Contributor Author


Removed per @lintingbin's earlier feedback above.

In the refactored flow, canResumeProcess() already filters out non-RUNNING processes (including KILLED), so a KILLED process never reaches this constructor path — resetTableForRecovery() handles it instead. The guard was redundant, so I cleaned it up.

OptimizerKeepingTask keepingTask = suspendingQueue.take();
String token = keepingTask.getToken();
boolean isExpired = !keepingTask.tryKeeping();
if (isExpired) {
Contributor


What would be the benefit of moving this code block to the front?

Contributor Author


It fixes an ordering bug. buildSuspendingPredication checks !activeTokens.contains(task.getToken()) to find tasks from dead optimizers, where activeTokens is authOptimizers.keySet().

In the original code, collectTasks ran before unregisterOptimizer — so the expired token was still in authOptimizers when the predicate evaluated, meaning !activeTokens.contains(token) was false. The expired optimizer's tasks got silently skipped. They'd only be picked up in a later iteration (triggered by a different optimizer's keepalive) or when the ack/exec timeout kicked in.

Moving unregisterOptimizer before collectTasks ensures the token is already gone by the time the predicate runs, so the tasks are correctly collected and retried immediately.
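That ordering dependency can be demonstrated with a small sketch, where authOptimizers and collectTasks() are simplified stand-ins for the actual AMS structures:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Illustrative sketch of the OptimizerKeeper ordering fix; the map and the
// predicate are simplified stand-ins, not the actual AMS classes.
final class KeeperOrdering {
  static final Map<String, Object> authOptimizers = new HashMap<>();
  static final List<String> taskTokens = new ArrayList<>();

  // Models buildSuspendingPredication: a task belongs to a dead optimizer
  // if its token is no longer among the registered (active) tokens.
  static List<String> collectTasks() {
    Set<String> activeTokens = authOptimizers.keySet();
    List<String> collected = new ArrayList<>();
    for (String token : taskTokens) {
      if (!activeTokens.contains(token)) {
        collected.add(token);
      }
    }
    return collected;
  }

  public static void main(String[] args) {
    authOptimizers.put("expired", new Object()); // expired optimizer, not yet unregistered
    taskTokens.add("expired");                   // its in-flight task

    // Old order: collectTasks() before unregisterOptimizer() -> the token
    // still looks active, so the task is silently skipped.
    if (!collectTasks().isEmpty()) throw new AssertionError();

    // New order: unregisterOptimizer() first, then collectTasks() -> the
    // task is detected and can be retried immediately.
    authOptimizers.remove("expired");
    if (!collectTasks().contains("expired")) throw new AssertionError();
    System.out.println("ordering fix demonstrated"); // prints: ordering fix demonstrated
  }
}
```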

// Don't reset — let the optimizer finish execution and report
// the result. If the optimizer is dead, OptimizerKeeper will
// detect the stale token and reset the task via retryTask().
LOG.info(
Contributor


nit: Recovery tasks kept in SCHEDULED/ACKED state are logged at INFO, but this is a transient recovery state that resolves itself. Consider using DEBUG to avoid noisy logs in clusters with many tables.

Contributor Author


Good point — changed to LOG.debug in ef33d27.

Transient recovery state logs can be noisy in clusters with many tables.

Signed-off-by: Jiwon Park <jpark92@outlook.kr>
Contributor

@xxubai left a comment


LGTM overall.


Labels

module:ams-server Ams server module


Development

Successfully merging this pull request may close these issues.

[Bug]: Optimizing process stuck permanently after AMS restart

4 participants