-
Notifications
You must be signed in to change notification settings - Fork 11
Update status of disconnected and stale jobs #981
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
✅ Deploy Preview for antenna-preview ready!
To edit notification comments on pull requests, go to your Netlify project configuration. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR implements a comprehensive job status monitoring system to detect and handle disconnected and stale jobs automatically. The system introduces periodic status checking with configurable timeouts and automatic retry mechanisms for failed tasks.
- Adds a new
check_status()method to the Job model that validates job state against Celery task status - Implements a periodic task (
check_unfinished_jobs) that runs every 3 minutes to monitor all incomplete jobs - Includes extensive test coverage for various job failure scenarios including disappeared tasks, stale jobs, and timeout conditions
Reviewed Changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| ami/jobs/models.py | Core implementation of job status checking with timeout handling and auto-retry logic |
| ami/jobs/tasks.py | Periodic task for monitoring unfinished jobs with cache-based locking |
| ami/jobs/tests.py | Comprehensive test suite covering all status checking scenarios |
| ami/jobs/migrations/0018_add_last_checked_at_and_periodic_task.py | Database migration adding last_checked_at field and periodic task setup |
| ami/jobs/management/commands/update_stale_jobs.py | Updated management command to use new status checking system |
| ami/jobs/management/commands/debug_jobs.py | New debugging utility for testing job status scenarios |
| setup.cfg | Flake8 configuration to ignore F541 warnings |
| .vscode/settings.json | VS Code settings update for F541 ignore |
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
ami/jobs/models.py
Outdated
| # Only update the fields we changed to avoid concurrency issues | ||
| update_fields = ["last_checked_at"] | ||
| if status_changed: | ||
| update_fields.extend(["status", "progress", "finished_at"]) | ||
| self.save(update_fields=update_fields, update_progress=False) |
Copilot
AI
Oct 7, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] The comment mentions avoiding concurrency issues but the code pattern is repeated multiple times. Consider extracting this logic into a helper method to reduce duplication and ensure consistent handling of partial field updates.
| unfinished_jobs = unfinished_jobs[:MAX_JOBS_PER_RUN] | ||
|
|
||
| # Only check jobs that haven't been checked recently | ||
| now = datetime.datetime.now() |
Copilot
AI
Oct 7, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using datetime.datetime.now() instead of timezone.now() may cause timezone-related issues. Since Django's timezone.now() is already imported elsewhere in the codebase, consider using it here for consistency.
| STUCK_PENDING_NO_WORKERS_TIMEOUT_SECONDS = 3600 # 1 hour - max time in PENDING without workers | ||
| PENDING_LOG_INTERVAL_SECONDS = 300 # 5 minutes - how often to log waiting messages | ||
|
|
||
| now = datetime.datetime.now() |
Copilot
AI
Oct 7, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using datetime.datetime.now() instead of Django's timezone.now() may cause timezone-related issues, especially when comparing with database timestamps that are timezone-aware.
ami/jobs/models.py
Outdated
| # Configuration thresholds (TODO: make these configurable via settings) | ||
| NO_TASK_ID_TIMEOUT_SECONDS = 300 # 5 minutes - time to wait for task_id before marking as failed | ||
| DISAPPEARED_TASK_RETRY_THRESHOLD_SECONDS = 300 # 5 minutes - auto-retry if task disappeared within this time | ||
| MAX_JOB_RUNTIME_SECONDS = 2 * 24 * 60 * 60 # 2 days - max time a job can run before being marked stale | ||
| STUCK_PENDING_TIMEOUT_SECONDS = 600 # 10 minutes - max time in PENDING with workers available | ||
| STUCK_PENDING_NO_WORKERS_TIMEOUT_SECONDS = 3600 # 1 hour - max time in PENDING without workers | ||
| PENDING_LOG_INTERVAL_SECONDS = 300 # 5 minutes - how often to log waiting messages |
Copilot
AI
Oct 7, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] These configuration constants are hardcoded in the method. Consider moving them to Django settings or class-level constants to make them more maintainable and testable.
| # Configuration thresholds (TODO: make these configurable via settings) | ||
| LOCK_TIMEOUT_SECONDS = 300 # 5 minutes - how long the lock is held | ||
| MAX_JOBS_PER_RUN = 100 # Maximum number of jobs to check in one run | ||
| MIN_CHECK_INTERVAL_MINUTES = 2 # Minimum time between checks for the same job |
Copilot
AI
Oct 7, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] Similar to the job model, these configuration constants should be moved to Django settings to make them configurable without code changes.
|
|
Note Other AI code review bot(s) detectedCodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review. WalkthroughThis pull request introduces a comprehensive job status reconciliation system to resolve issues where job statuses remain outdated or "stuck" in STARTED state. It adds a periodic Celery task that continuously checks unfinished jobs against their actual Celery task states, implements atomic row-level locking for safe concurrent updates, adds a debugging management command, and includes extensive tests for status checking and concurrency scenarios. Changes
Sequence DiagramsequenceDiagram
participant Periodic as Periodic<br/>Task Scheduler
participant CeleryTask as check_unfinished_jobs<br/>(Celery Task)
participant Cache as Django Cache<br/>(Distributed Lock)
participant JobModel as Job Model<br/>(DB + Locking)
participant CeleryWorker as Celery AsyncResult<br/>(Remote Task State)
Periodic->>CeleryTask: Trigger every 3 minutes
CeleryTask->>Cache: Attempt to acquire lock
alt Lock acquired
Cache-->>CeleryTask: Lock successful
CeleryTask->>JobModel: Query unfinished jobs<br/>(not recently checked)
JobModel-->>CeleryTask: Job list
loop For each Job
CeleryTask->>JobModel: atomic_job_update(lock)
CeleryTask->>CeleryWorker: Fetch current task state
CeleryWorker-->>CeleryTask: Task status<br/>(PENDING/STARTED/SUCCESS/FAILED/etc.)
CeleryTask->>JobModel: Reconcile & update status<br/>if state mismatched
JobModel-->>CeleryTask: Updated (or no change)
CeleryTask->>JobModel: Release lock
end
CeleryTask->>JobModel: Update last_checked_at
CeleryTask->>Cache: Release distributed lock
else Lock already held
Cache-->>CeleryTask: Lock unavailable
CeleryTask-->>Periodic: Skip (another run in progress)
end
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Areas requiring extra attention:
Poem
Pre-merge checks and finishing touches❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✨ Finishing touches
🧪 Generate unit tests (beta)
Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
♻️ Duplicate comments (2)
ami/jobs/models.py (1)
1335-1359: Replace naivedatetime.now()withtimezone.now()incheck_status.We’re still calling
datetime.datetime.now()here, sonowis naive. The moment a job hasscheduled_at/last_checked_atpopulated viatimezone.now()(the normal path), the subtraction inside_check_missing_task_idor the “min check interval” logic will raiseTypeError: can't subtract offset-naive and offset-aware datetimes. That makes the new periodic checker blow up after the first successful update. Please switch todjango.utils.timezone.now()(and propagate it through the helpers) so we always compare like-with-like—this is the blocker that Copilot flagged earlier, and it’s still unresolved. Based on learnings.- now = datetime.datetime.now() + from django.utils import timezone + now = timezone.now()ami/jobs/tasks.py (1)
115-126: Use timezone-aware timestamps incheck_unfinished_jobs.
datetime.datetime.now()returns a naive datetime, butJob.last_checked_atandJob.scheduled_atare stored as timezone-aware values (we set them viatimezone.now()all over the codebase). On the second invocation of this task the subtractionnow - job.last_checked_atwill hit aTypeError: can't subtract offset-naive and offset-aware datetimes, crashing the periodic job. Switch todjango.utils.timezone.now()(and reuse it everywhere in this task) so the arithmetic stays compatible with our model fields.@@ - import datetime + import datetime + from django.utils import timezone @@ - now = datetime.datetime.now() + now = timezone.now()
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (6)
ami/jobs/management/commands/debug_jobs.py(1 hunks)ami/jobs/management/commands/update_stale_jobs.py(2 hunks)ami/jobs/migrations/0018_add_last_checked_at_and_periodic_task.py(1 hunks)ami/jobs/models.py(24 hunks)ami/jobs/tasks.py(3 hunks)ami/jobs/tests.py(2 hunks)
🧰 Additional context used
🧬 Code graph analysis (5)
ami/jobs/models.py (2)
ami/jobs/views.py (1)
retry(98-109)ami/ml/post_processing/base.py (1)
update_progress(57-68)
ami/jobs/tests.py (2)
ami/jobs/models.py (8)
Job(875-1641)check_status(1320-1409)JobState(140-194)atomic_job_update(91-137)save(1518-1587)update_progress(1439-1463)logger(1626-1635)JobLogHandler(386-429)ami/jobs/tasks.py (1)
check_unfinished_jobs(72-157)
ami/jobs/management/commands/debug_jobs.py (1)
ami/jobs/models.py (7)
JobState(140-194)MLJob(456-658)check_status(1320-1409)running_states(185-186)final_states(189-190)failed_states(193-194)enqueue(966-983)
ami/jobs/tasks.py (1)
ami/jobs/models.py (6)
logger(1626-1635)save(1518-1587)Job(875-1641)JobState(140-194)running_states(185-186)check_status(1320-1409)
ami/jobs/management/commands/update_stale_jobs.py (1)
ami/jobs/models.py (5)
Job(875-1641)JobState(140-194)running_states(185-186)check_status(1320-1409)save(1518-1587)
🪛 Ruff (0.14.2)
ami/jobs/models.py
65-65: Consider moving this statement to an else block
(TRY300)
66-66: Do not catch blind exception: Exception
(BLE001)
73-73: Unused function argument: timestamp
(ARG001)
394-394: Comment contains ambiguous × (MULTIPLICATION SIGN). Did you mean x (LATIN SMALL LETTER X)?
(RUF003)
1073-1073: Unused method argument: now
(ARG002)
1103-1103: Do not catch blind exception: Exception
(BLE001)
1131-1131: Unused method argument: task
(ARG002)
1172-1172: Consider moving this statement to an else block
(TRY300)
1173-1173: Do not catch blind exception: Exception
(BLE001)
1174-1174: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
1243-1243: Do not catch blind exception: Exception
(BLE001)
1244-1244: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
1369-1369: Do not catch blind exception: Exception
(BLE001)
1400-1400: Do not catch blind exception: Exception
(BLE001)
1401-1401: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
1509-1509: Do not catch blind exception: Exception
(BLE001)
1576-1576: Do not catch blind exception: Exception
(BLE001)
1577-1577: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
ami/jobs/tests.py
388-388: Unused method argument: mock_job_objects
(ARG002)
ami/jobs/migrations/0018_add_last_checked_at_and_periodic_task.py
5-5: Unused function argument: apps
(ARG001)
5-5: Unused function argument: schema_editor
(ARG001)
23-23: Unused function argument: apps
(ARG001)
23-23: Unused function argument: schema_editor
(ARG001)
29-31: Mutable class attributes should be annotated with typing.ClassVar
(RUF012)
33-44: Mutable class attributes should be annotated with typing.ClassVar
(RUF012)
ami/jobs/management/commands/debug_jobs.py
119-119: Unused method argument: args
(ARG002)
163-163: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
163-163: Avoid specifying long messages outside the exception class
(TRY003)
190-190: Do not catch blind exception: Exception
(BLE001)
207-207: f-string without any placeholders
Remove extraneous f prefix
(F541)
221-221: Avoid specifying long messages outside the exception class
(TRY003)
276-276: f-string without any placeholders
Remove extraneous f prefix
(F541)
327-327: Do not catch blind exception: Exception
(BLE001)
407-407: f-string without any placeholders
Remove extraneous f prefix
(F541)
413-413: f-string without any placeholders
Remove extraneous f prefix
(F541)
431-431: f-string without any placeholders
Remove extraneous f prefix
(F541)
439-439: f-string without any placeholders
Remove extraneous f prefix
(F541)
448-448: Do not catch blind exception: Exception
(BLE001)
453-453: f-string without any placeholders
Remove extraneous f prefix
(F541)
458-458: f-string without any placeholders
Remove extraneous f prefix
(F541)
463-463: f-string without any placeholders
Remove extraneous f prefix
(F541)
471-471: f-string without any placeholders
Remove extraneous f prefix
(F541)
478-478: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
478-478: Avoid specifying long messages outside the exception class
(TRY003)
494-494: f-string without any placeholders
Remove extraneous f prefix
(F541)
512-512: f-string without any placeholders
Remove extraneous f prefix
(F541)
515-515: Do not catch blind exception: Exception
(BLE001)
520-520: f-string without any placeholders
Remove extraneous f prefix
(F541)
526-526: f-string without any placeholders
Remove extraneous f prefix
(F541)
527-527: f-string without any placeholders
Remove extraneous f prefix
(F541)
529-529: f-string without any placeholders
Remove extraneous f prefix
(F541)
530-530: f-string without any placeholders
Remove extraneous f prefix
(F541)
532-532: f-string without any placeholders
Remove extraneous f prefix
(F541)
533-533: f-string without any placeholders
Remove extraneous f prefix
(F541)
535-535: f-string without any placeholders
Remove extraneous f prefix
(F541)
536-536: f-string without any placeholders
Remove extraneous f prefix
(F541)
ami/jobs/management/commands/update_stale_jobs.py
29-29: Unused method argument: args
(ARG002)
47-47: Do not catch blind exception: Exception
(BLE001)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: test
- GitHub Check: Redirect rules
- GitHub Check: Header rules
- GitHub Check: Pages changed
| if status_changed: | ||
| job.refresh_from_db() | ||
| if dry_run: | ||
| self.stdout.write( | ||
| self.style.WARNING(f" → Would change to: {job.status} (dry-run, not saved)") | ||
| ) | ||
| else: | ||
| self.stdout.write(self.style.SUCCESS(f" → Changed to: {job.status}")) | ||
| updated += 1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Dry-run reporting prints the wrong target status
When --check-all runs in dry-run mode, job.check_status(save=False, …) mutates the in-memory job.status to the prospective value. Immediately refreshing from the database resets it before logging, so the output always shows the original status and the dry-run preview is misleading. Capture the new status before refreshing (or defer the refresh until after printing) so the CLI reports what would actually change.
if status_changed:
- job.refresh_from_db()
- if dry_run:
- self.stdout.write(
- self.style.WARNING(f" → Would change to: {job.status} (dry-run, not saved)")
- )
- else:
- self.stdout.write(self.style.SUCCESS(f" → Changed to: {job.status}"))
+ new_status = job.status
+ if dry_run:
+ self.stdout.write(
+ self.style.WARNING(f" → Would change to: {new_status} (dry-run, not saved)")
+ )
+ job.refresh_from_db()
+ else:
+ job.refresh_from_db()
+ self.stdout.write(self.style.SUCCESS(f" → Changed to: {job.status}"))
updated += 1📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| if status_changed: | |
| job.refresh_from_db() | |
| if dry_run: | |
| self.stdout.write( | |
| self.style.WARNING(f" → Would change to: {job.status} (dry-run, not saved)") | |
| ) | |
| else: | |
| self.stdout.write(self.style.SUCCESS(f" → Changed to: {job.status}")) | |
| updated += 1 | |
| if status_changed: | |
| new_status = job.status | |
| if dry_run: | |
| self.stdout.write( | |
| self.style.WARNING(f" → Would change to: {new_status} (dry-run, not saved)") | |
| ) | |
| job.refresh_from_db() | |
| else: | |
| job.refresh_from_db() | |
| self.stdout.write(self.style.SUCCESS(f" → Changed to: {job.status}")) | |
| updated += 1 |
🤖 Prompt for AI Agents
In ami/jobs/management/commands/debug_jobs.py around lines 315 to 323, the
dry-run branch refreshes the job from DB before printing, which resets the
in-memory job.status and causes the CLI to show the original status instead of
the prospective one; capture the new_status returned or assigned by
job.check_status(save=False, ...) (or store job.status into a local variable
immediately after check_status) and use that saved value when printing the
dry-run message (or move job.refresh_from_db() to after the print) so the "Would
change to" output shows the intended target status.
| dependencies = [ | ||
| ("jobs", "0017_alter_job_logs_alter_job_progress"), | ||
| ] | ||
|
|
||
| operations = [ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Declare dependency on django_celery_beat before touching its tables.
This migration calls IntervalSchedule.objects.get_or_create() and PeriodicTask.objects.get_or_create(), but it has no dependency on django_celery_beat. On a fresh database Django can schedule this migration before the beat app’s migrations, so the underlying tables don’t exist yet and you get a ProgrammingError: relation "django_celery_beat_periodictask" does not exist. Add an explicit dependency so we only run after beat’s schema is in place.
class Migration(migrations.Migration):
dependencies = [
("jobs", "0017_alter_job_logs_alter_job_progress"),
+ ("django_celery_beat", "0001_initial"), # adjust to the latest beat migration in this project
]📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| dependencies = [ | |
| ("jobs", "0017_alter_job_logs_alter_job_progress"), | |
| ] | |
| operations = [ | |
| dependencies = [ | |
| ("jobs", "0017_alter_job_logs_alter_job_progress"), | |
| ("django_celery_beat", "0001_initial"), # adjust to the latest beat migration in this project | |
| ] | |
| operations = [ |
🧰 Tools
🪛 Ruff (0.14.2)
29-31: Mutable class attributes should be annotated with typing.ClassVar
(RUF012)
🤖 Prompt for AI Agents
In ami/jobs/migrations/0018_add_last_checked_at_and_periodic_task.py around
lines 29 to 33, the migration uses IntervalSchedule.objects.get_or_create() and
PeriodicTask.objects.get_or_create() but lacks a dependency on
django_celery_beat; add an explicit dependency tuple to the dependencies list
(for example ("django_celery_beat", "0001_initial")) so this migration runs only
after django_celery_beat’s migrations have created the required tables.

Summary
Introduces a new periodic status check for all incomplete jobs.
Fixes: #721
List of Changes
Detailed Description
How to Test the Changes
Checklist
Summary by CodeRabbit
Release Notes
New Features
Bug Fixes
Tests