-
Notifications
You must be signed in to change notification settings - Fork 11
Enable async and distributed processing for the ML backend #910
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
…void shadowing the FlatBugDetector model
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR enables asynchronous and distributed processing for ML backend workflows by implementing a task queue architecture using Celery and RabbitMQ. The system splits large image processing jobs into smaller batch tasks that can be processed by multiple workers, addressing scalability issues with long-running single tasks.
- Replaces single monolithic ML processing tasks with distributed batch processing using Celery task queues
- Adds support for external ML processing workers that can subscribe to pipeline-specific queues
- Implements comprehensive job status tracking through MLTaskRecord model for monitoring subtask progress
Reviewed Changes
Copilot reviewed 42 out of 44 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| processing_services/*/celery_worker/ | Adds Celery worker configuration and startup scripts for external ML processing services |
| processing_services/*/api/processing.py | Extracts core processing logic into reusable functions for both sync and async execution |
| ami/ml/models/pipeline.py | Implements async batch processing and task management functionality |
| ami/jobs/models.py | Adds MLTaskRecord model and comprehensive job progress tracking for distributed tasks |
| ami/ml/signals.py | Implements automatic worker subscription to pipeline-specific queues |
| docker-compose*.yml | Configures RabbitMQ message broker and additional Celery workers |
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
| ANTENNA_CELERY_WORKER_NAME = "antenna_celeryworker" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider making this an overridable environment variable (i.e. antenna_mlworker) to customize which workers subscribe
| if task.traceback: | ||
| # TODO: Error logs will have many tracebacks | ||
| # could add some processing to provide a concise error summary | ||
| job.logger.error(f"Subtask {task_name} ({task_id}) failed: {task.traceback}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This logs the error, but then still tries to parse a successful result. Can you mark the task as failed and then continue to the next one? The status check did it's job correctly! But the subtask failed. Can you update the MLTaskRecord to say it failed?
|
Note Other AI code review bot(s) detectedCodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review. WalkthroughIntroduces asynchronous ML subtask tracking and orchestration: new MLTaskRecord model, ML-specific job orchestration and Celery tasks (status checks, dangling-job cron), signal-driven dynamic worker queue subscriptions, RabbitMQ/Celery configuration changes, migrations, and integration tests. Changes
Sequence Diagram(s)sequenceDiagram
participant User
participant DjangoAPI as Django API
participant Scheduler as Celery Scheduler
participant Worker as Celery Worker\n(antenna)
participant Processing as Processing Service
participant DB as Database
User->>DjangoAPI: POST /jobs/ (MLJob)
DjangoAPI->>DB: Create Job (status=PENDING)
DjangoAPI->>Scheduler: Enqueue run_job
Scheduler->>Worker: run_job(job_id)
Worker->>DB: Create MLTaskRecord (status=STARTED)
Worker->>Scheduler: Submit process_pipeline_request tasks (per image)
par Async processing
Worker->>Processing: process_pipeline_request(images)
Processing-->>Worker: PipelineResultsResponse
Worker->>DB: Update MLTaskRecord (SUCCESS/FAIL)
end
Scheduler->>Worker: check_ml_job_status(job_id)
Worker->>DB: Query MLTaskRecord entries
alt All subtasks complete
Worker->>DB: Finalize Job (status=SUCCESS)
else In-progress remain
Worker->>Scheduler: Reschedule check_ml_job_status
end
sequenceDiagram
participant CeleryApp as Celery App
participant Signal as ML Signals
participant Inspector as Celery Inspector
participant Worker as Antenna Worker
participant Redis as Broker
CeleryApp->>Signal: worker_ready
Signal->>Inspector: inspect active workers
Inspector-->>Signal: antenna_celeryworker@host
Signal->>Redis: control.add_consumer(ml-pipeline-{slug}) for each Pipeline
Note over Redis: Worker subscribed to pipeline queues
Pipeline->>Signal: post_save (created)
Signal->>Redis: add_consumer for new pipeline
Pipeline->>Signal: post_delete
Signal->>Redis: remove_consumer for pipeline
Estimated code review effort🎯 5 (Critical) | ⏱️ ~120 minutes Areas requiring special attention:
Poem
Pre-merge checks and finishing touches❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✨ Finishing touches
🧪 Generate unit tests (beta)
Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 8
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
docker-compose.yml (1)
101-125: Restore database readiness before running worker startup migrations.
celeryworkernow runspython manage.py migratebefore the worker boots, but itsdepends_ononly lists RabbitMQ. Without waiting for Postgres to be up, the migration can fail with anOperationalError, the container exits, anddjango(which now waits on a healthy worker) never starts. Please add an explicit Postgres dependency or a wait-for-db gate before running migrations so the worker starts reliably.Apply this diff to reintroduce the dependency:
- depends_on: - - rabbitmq + depends_on: + postgres: + condition: service_started + rabbitmq: + condition: service_started
🧹 Nitpick comments (2)
.envs/.local/.postgres (1)
6-6: Consider usingpostgresql://scheme instead ofpostgres://.The
postgres://URI scheme is deprecated. Modern PostgreSQL clients and tools (including SQLAlchemy and psycopg2) recommend usingpostgresql://instead.Apply this diff:
-DATABASE_URL=postgres://xekSryPnqczJXkOnTAeDmDyIapSRrGEE:iMRQjJEGflj5xojpnJU3OzPQ0apSCLP1oHR1TYvyMzAlF5KpE9HFL6MPlnbDibwI@postgres:5432/ami +DATABASE_URL=postgresql://xekSryPnqczJXkOnTAeDmDyIapSRrGEE:iMRQjJEGflj5xojpnJU3OzPQ0apSCLP1oHR1TYvyMzAlF5KpE9HFL6MPlnbDibwI@postgres:5432/amicompose/local/django/celery/worker/start (1)
8-8: Consider running migrations separately from worker startup.Running
migratein the worker startup script can cause issues if multiple workers start concurrently, even though Django has migration locking. It's better practice to run migrations as a separate initialization step before starting workers.Consider moving the migration to a separate initialization container or running it explicitly before starting workers in your orchestration layer.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (29)
.envs/.ci/.django(1 hunks).envs/.ci/.postgres(1 hunks).envs/.local/.django(1 hunks).envs/.local/.postgres(1 hunks)ami/jobs/admin.py(2 hunks)ami/jobs/migrations/0019_mltaskrecord.py(1 hunks)ami/jobs/migrations/0020_alter_job_logs_alter_job_progress.py(1 hunks)ami/jobs/migrations/0021_remove_mltaskrecord_subtask_id_and_more.py(1 hunks)ami/jobs/migrations/0022_job_last_checked.py(1 hunks)ami/jobs/migrations/0023_alter_job_last_checked_alter_mltaskrecord_status.py(1 hunks)ami/jobs/models.py(10 hunks)ami/jobs/tasks.py(2 hunks)ami/jobs/tests.py(2 hunks)ami/jobs/views.py(2 hunks)ami/main/models.py(1 hunks)ami/ml/apps.py(1 hunks)ami/ml/migrations/0026_check_dangling_ml_jobs_celery_beat.py(1 hunks)ami/ml/models/__init__.py(1 hunks)ami/ml/models/pipeline.py(7 hunks)ami/ml/schemas.py(2 hunks)ami/ml/signals.py(1 hunks)ami/ml/tasks.py(4 hunks)ami/tests/fixtures/main.py(1 hunks)compose/local/django/Dockerfile(0 hunks)compose/local/django/celery/worker/start(1 hunks)config/settings/base.py(3 hunks)config/settings/local.py(1 hunks)docker-compose.ci.yml(5 hunks)docker-compose.yml(6 hunks)
💤 Files with no reviewable changes (1)
- compose/local/django/Dockerfile
🧰 Additional context used
🧬 Code graph analysis (16)
ami/jobs/tasks.py (1)
ami/jobs/models.py (4)
Job(1057-1355)MLJob(328-786)update_status(1238-1258)save(1290-1301)
ami/ml/schemas.py (6)
ui/src/data-services/models/job.ts (1)
pipeline(109-111)ui/src/data-services/models/pipeline.ts (1)
algorithms(29-31)ui/src/data-services/models/job-details.ts (1)
errors(18-20)ui/src/data-services/models/occurrence-details.ts (1)
detections(108-110)processing_services/minimal/api/schemas.py (1)
PipelineResultsResponse(230-243)processing_services/example/api/schemas.py (1)
PipelineResultsResponse(265-279)
ami/ml/tasks.py (2)
ami/ml/models/pipeline.py (3)
process_images(187-285)process_images(1240-1252)save(1276-1282)ami/jobs/models.py (7)
Job(1057-1355)MLJob(328-786)logger(1340-1349)check_inprogress_subtasks(314-318)check_inprogress_subtasks(333-512)check_inprogress_subtasks(1190-1197)save(1290-1301)
ami/jobs/migrations/0020_alter_job_logs_alter_job_progress.py (2)
ami/jobs/migrations/0019_mltaskrecord.py (1)
Migration(9-72)ami/jobs/models.py (3)
JobLogs(255-257)default_job_progress(210-214)JobProgress(113-207)
ami/jobs/migrations/0023_alter_job_last_checked_alter_mltaskrecord_status.py (1)
ami/utils/schemas.py (1)
choices(63-65)
ami/jobs/views.py (2)
ami/jobs/models.py (6)
Job(1057-1355)JobState(32-68)MLJob(328-786)check_inprogress_subtasks(314-318)check_inprogress_subtasks(333-512)check_inprogress_subtasks(1190-1197)ami/ml/tasks.py (1)
check_ml_job_status(111-149)
ami/main/models.py (2)
ami/main/admin.py (1)
regroup_events(180-183)ami/tasks.py (1)
regroup_events(90-99)
ami/jobs/migrations/0021_remove_mltaskrecord_subtask_id_and_more.py (1)
ami/jobs/migrations/0019_mltaskrecord.py (1)
Migration(9-72)
ami/jobs/migrations/0022_job_last_checked.py (2)
ami/jobs/migrations/0020_alter_job_logs_alter_job_progress.py (1)
Migration(8-28)ami/jobs/migrations/0021_remove_mltaskrecord_subtask_id_and_more.py (1)
Migration(6-30)
ami/jobs/models.py (5)
ami/ml/schemas.py (3)
PipelineRequest(174-178)PipelineResultsResponse(181-231)combine_with(198-231)ami/ml/signals.py (2)
get_worker_name(15-28)subscribe_celeryworker_to_pipeline_queues(32-67)ami/ml/tasks.py (1)
check_ml_job_status(111-149)ami/jobs/views.py (1)
check_inprogress_subtasks(161-176)ami/ml/models/pipeline.py (6)
save_results(955-1063)save_results(1268-1269)save_results_async(1271-1274)save(1276-1282)process_pipeline_request(164-184)schedule_process_images(1254-1266)
ami/ml/models/__init__.py (1)
ami/ml/models/pipeline.py (2)
Pipeline(1105-1282)PipelineSaveResults(945-951)
ami/ml/models/pipeline.py (3)
ami/main/models.py (15)
name(987-988)SourceImage(1623-1871)Project(225-447)config(1394-1402)save(319-322)save(882-897)save(1119-1122)save(1462-1469)save(1561-1564)save(1829-1832)save(2110-2131)save(2347-2353)save(2526-2531)save(2814-2831)save(3451-3454)ami/ml/schemas.py (5)
PipelineRequest(174-178)PipelineResultsResponse(181-231)SourceImageRequest(116-120)PipelineRequestConfigParameters(155-171)DetectionRequest(138-142)ami/jobs/models.py (4)
logger(1340-1349)Job(1057-1355)MLTaskRecord(1005-1054)save(1290-1301)
ami/jobs/admin.py (2)
ami/jobs/models.py (3)
Job(1057-1355)MLTaskRecord(1005-1054)kill_task(1045-1054)ami/main/admin.py (2)
AdminBase(41-58)queryset(469-472)
ami/ml/signals.py (1)
ami/ml/models/pipeline.py (1)
Pipeline(1105-1282)
ami/jobs/migrations/0019_mltaskrecord.py (2)
ami/jobs/migrations/0020_alter_job_logs_alter_job_progress.py (1)
Migration(8-28)ami/jobs/migrations/0021_remove_mltaskrecord_subtask_id_and_more.py (1)
Migration(6-30)
ami/jobs/tests.py (2)
ami/jobs/models.py (18)
Job(1057-1355)JobState(32-68)MLJob(328-786)MLSubtaskNames(992-994)MLSubtaskState(997-1002)save(1290-1301)logger(1340-1349)run(321-325)run(657-786)run(794-832)run(840-883)run(895-919)run(927-947)run(955-956)run(1199-1207)update_job_progress(515-654)MLTaskRecord(1005-1054)kill_task(1045-1054)ami/ml/tasks.py (2)
check_ml_job_status(111-149)check_dangling_ml_jobs(153-183)
🪛 Ruff (0.14.3)
ami/ml/schemas.py
206-206: Consider [self, *others] instead of concatenation
Replace with [self, *others]
(RUF005)
210-210: Avoid specifying long messages outside the exception class
(TRY003)
214-214: Avoid specifying long messages outside the exception class
(TRY003)
218-218: Avoid specifying long messages outside the exception class
(TRY003)
ami/ml/tasks.py
137-137: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
137-137: Avoid specifying long messages outside the exception class
(TRY003)
138-138: Do not catch blind exception: Exception
(BLE001)
140-140: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
149-149: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
149-149: Create your own exception
(TRY002)
ami/jobs/migrations/0020_alter_job_logs_alter_job_progress.py
9-11: Mutable class attributes should be annotated with typing.ClassVar
(RUF012)
13-28: Mutable class attributes should be annotated with typing.ClassVar
(RUF012)
ami/jobs/migrations/0023_alter_job_last_checked_alter_mltaskrecord_status.py
8-10: Mutable class attributes should be annotated with typing.ClassVar
(RUF012)
12-33: Mutable class attributes should be annotated with typing.ClassVar
(RUF012)
ami/jobs/views.py
161-161: Unused method argument: request
(ARG002)
161-161: Unused method argument: pk
(ARG002)
ami/ml/apps.py
10-10: Unused noqa directive (non-enabled: F401)
Remove unused noqa directive
(RUF100)
ami/jobs/migrations/0021_remove_mltaskrecord_subtask_id_and_more.py
7-9: Mutable class attributes should be annotated with typing.ClassVar
(RUF012)
11-30: Mutable class attributes should be annotated with typing.ClassVar
(RUF012)
ami/jobs/migrations/0022_job_last_checked.py
7-9: Mutable class attributes should be annotated with typing.ClassVar
(RUF012)
11-17: Mutable class attributes should be annotated with typing.ClassVar
(RUF012)
ami/jobs/models.py
437-437: Create your own exception
(TRY002)
437-437: Avoid specifying long messages outside the exception class
(TRY003)
755-755: Do not catch blind exception: Exception
(BLE001)
756-756: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
1043-1043: Avoid specifying long messages outside the exception class
(TRY003)
ami/ml/models/pipeline.py
334-334: Function definition does not bind loop variable prediction_request
(B023)
335-335: Function definition does not bind loop variable task_id
(B023)
377-377: Avoid specifying long messages outside the exception class
(TRY003)
ami/jobs/admin.py
76-76: Mutable class attributes should be annotated with typing.ClassVar
(RUF012)
ami/ml/signals.py
27-27: Do not catch blind exception: Exception
(BLE001)
32-32: Unused function argument: kwargs
(ARG001)
38-38: Use is and is not for type comparisons, or isinstance() for isinstance checks
(E721)
57-57: Avoid specifying long messages outside the exception class
(TRY003)
65-65: Redundant exception object included in logging.exception call
(TRY401)
71-71: Unused function argument: sender
(ARG001)
71-71: Unused function argument: kwargs
(ARG001)
88-88: Redundant exception object included in logging.exception call
(TRY401)
92-92: Unused function argument: sender
(ARG001)
92-92: Unused function argument: kwargs
(ARG001)
99-99: Abstract raise to an inner function
(TRY301)
99-99: Avoid specifying long messages outside the exception class
(TRY003)
104-104: Redundant exception object included in logging.exception call
(TRY401)
ami/jobs/migrations/0019_mltaskrecord.py
10-13: Mutable class attributes should be annotated with typing.ClassVar
(RUF012)
15-72: Mutable class attributes should be annotated with typing.ClassVar
(RUF012)
ami/jobs/tests.py
326-326: Do not catch blind exception: Exception
(BLE001)
327-327: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
ami/ml/migrations/0026_check_dangling_ml_jobs_celery_beat.py
5-5: Unused function argument: apps
(ARG001)
5-5: Unused function argument: schema_editor
(ARG001)
21-21: Unused function argument: apps
(ARG001)
21-21: Unused function argument: schema_editor
(ARG001)
27-29: Mutable class attributes should be annotated with typing.ClassVar
(RUF012)
31-33: Mutable class attributes should be annotated with typing.ClassVar
(RUF012)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: Redirect rules
- GitHub Check: Header rules
- GitHub Check: Pages changed
- GitHub Check: test
🔇 Additional comments (7)
ami/main/models.py (1)
888-890: Excellent improvement for transaction safety with async tasks.Wrapping the Celery task in
transaction.on_commit()ensures theregroup_eventstask is only enqueued after the database transaction successfully commits. This prevents race conditions where the task could execute before the deployment data is persisted, or execute at all if the transaction rolls back.This aligns with the broader pattern mentioned in the PR objectives of coordinating asynchronous pipeline steps with DB transactions to ensure consistency.
.envs/.ci/.django (1)
23-25: LGTM!The Celery broker URL and RabbitMQ credentials are properly configured for the CI environment.
compose/local/django/celery/worker/start (1)
7-9: LGTM!The Celery worker configuration correctly specifies the
antennaqueue and uses a descriptive worker name for discoverability.ami/jobs/tasks.py (1)
51-56: LGTM!The conditional logic correctly delegates status management to MLJob's custom implementation, which uses MLTaskRecord tracking and dedicated status update methods (
check_inprogress_subtasksandupdate_job_progress).ami/ml/apps.py (1)
9-10: LGTM!The
ready()method correctly imports signals for registration during Django app initialization, following standard Django patterns.ami/ml/models/__init__.py (1)
2-2: LGTM!The export of
PipelineSaveResultsis properly structured and aligns with the new dataclass introduced for ML pipeline result handling.Also applies to: 10-10
config/settings/local.py (1)
91-92: LGTM!Setting the default Celery queue to
antennaaligns with the worker configuration and queue-based task routing introduced in this PR.
| POSTGRES_DB=ami-ci | ||
| POSTGRES_USER=4JXkOnTAeDmDyIapSRrGEE | ||
| POSTGRES_PASSWORD=d4xojpnJU3OzPQ0apSCLP1oHR1TYvyMzAlF5KpE9HFL6MPlnbDibwI | ||
| DATABASE_URL=postgres://xekSryPnqczJXkOnTAeDmDyIapSRrGEE:iMRQjJEGflj5xojpnJU3OzPQ0apSCLP1oHR1TYvyMzAlF5KpE9HFL6MPlnbDibwI@postgres:5432/ami |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Critical: DATABASE_URL credentials and database name don't match the individual environment variables.
The DATABASE_URL contains different credentials and database name than the individual POSTGRES_* variables:
- User mismatch: URL has
xekSryPnqczJXkOnTAeDmDyIapSRrGEEbutPOSTGRES_USER(line 4) is4JXkOnTAeDmDyIapSRrGEE - Password mismatch: URL password differs from
POSTGRES_PASSWORD(line 5) - Database mismatch: URL has
amibutPOSTGRES_DB(line 3) isami-ci
This will cause CI connection failures when code uses DATABASE_URL instead of individual variables.
Apply this diff to fix the mismatches and use the recommended postgresql:// scheme:
-DATABASE_URL=postgres://xekSryPnqczJXkOnTAeDmDyIapSRrGEE:iMRQjJEGflj5xojpnJU3OzPQ0apSCLP1oHR1TYvyMzAlF5KpE9HFL6MPlnbDibwI@postgres:5432/ami
+DATABASE_URL=postgresql://4JXkOnTAeDmDyIapSRrGEE:d4xojpnJU3OzPQ0apSCLP1oHR1TYvyMzAlF5KpE9HFL6MPlnbDibwI@postgres:5432/ami-ci📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| DATABASE_URL=postgres://xekSryPnqczJXkOnTAeDmDyIapSRrGEE:iMRQjJEGflj5xojpnJU3OzPQ0apSCLP1oHR1TYvyMzAlF5KpE9HFL6MPlnbDibwI@postgres:5432/ami | |
| DATABASE_URL=postgresql://4JXkOnTAeDmDyIapSRrGEE:d4xojpnJU3OzPQ0apSCLP1oHR1TYvyMzAlF5KpE9HFL6MPlnbDibwI@postgres:5432/ami-ci |
🤖 Prompt for AI Agents
In .envs/.ci/.postgres around line 6, the DATABASE_URL contains mismatched user,
password, and database name compared to the POSTGRES_* variables and uses the
old scheme; update DATABASE_URL to use the postgresql:// scheme and exact values
from POSTGRES_USER, POSTGRES_PASSWORD, and POSTGRES_DB (so the username,
password, host, port and database name match the individual variables), ensuring
the URL credentials and database name are identical to lines 3–5.
| migrations.AlterField( | ||
| model_name="job", | ||
| name="last_checked", | ||
| field=models.DateTimeField(blank=True, default=datetime.datetime.now, null=True), | ||
| ), | ||
| migrations.AlterField( | ||
| model_name="mltaskrecord", | ||
| name="status", | ||
| field=models.CharField( | ||
| choices=[ | ||
| ("PENDING", "PENDING"), | ||
| ("STARTED", "STARTED"), | ||
| ("SUCCESS", "SUCCESS"), | ||
| ("FAIL", "FAIL"), | ||
| ("REVOKED", "REVOKED"), | ||
| ], | ||
| default="STARTED", | ||
| max_length=255, | ||
| ), | ||
| ), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use a timezone-aware default for last_checked.
DateTimeField(default=datetime.datetime.now) returns naïve datetimes when USE_TZ=True, triggering warnings and risking incorrect conversions. Please switch to django.utils.timezone.now, which returns an aware datetime.
Suggested fix:
-import datetime
-from django.db import migrations, models
+import datetime
+from django.db import migrations, models
+from django.utils import timezone
@@
- field=models.DateTimeField(blank=True, default=datetime.datetime.now, null=True),
+ field=models.DateTimeField(blank=True, default=timezone.now, null=True),Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In ami/jobs/migrations/0023_alter_job_last_checked_alter_mltaskrecord_status.py
around lines 13 to 32, the migrations.AlterField for Job.last_checked uses
datetime.datetime.now which produces naive datetimes under USE_TZ=True; replace
the default with django.utils.timezone.now to return timezone-aware datetimes,
and update the import at the top of the migration file to import timezone.now
(or import from django.utils import timezone and use timezone.now) so the
migration default is timezone-aware.
| assert job.job_type_key == MLJob.key, f"{job} is not an ML job." | ||
| has_inprogress_tasks = job.check_inprogress_subtasks() | ||
| if has_inprogress_tasks: | ||
| # Schedule task to update the job status | ||
| from django.db import transaction | ||
|
|
||
| from ami.ml.tasks import check_ml_job_status | ||
|
|
||
| transaction.on_commit(lambda: check_ml_job_status.apply_async((job.pk,))) | ||
| return Response({"inprogress_subtasks": has_inprogress_tasks}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix inverted rescheduling logic for ML subtasks.
Job.check_inprogress_subtasks() returns True when all subtasks have finished and False while work is still running (see the docstring in ami/jobs/models.py lines 1189-1196). Capturing that as has_inprogress_tasks flips the meaning, so we only reschedule the polling task once everything is done and we tell the client there are in-progress subtasks when there are not. Flip the condition and surface the correct boolean so the monitoring loop keeps running.
- has_inprogress_tasks = job.check_inprogress_subtasks()
- if has_inprogress_tasks:
+ subtasks_complete = job.check_inprogress_subtasks()
+ inprogress_subtasks = subtasks_complete is False
+ if inprogress_subtasks:
# Schedule task to update the job status
from django.db import transaction
from ami.ml.tasks import check_ml_job_status
transaction.on_commit(lambda: check_ml_job_status.apply_async((job.pk,)))
- return Response({"inprogress_subtasks": has_inprogress_tasks})
+ return Response({"inprogress_subtasks": inprogress_subtasks})📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| assert job.job_type_key == MLJob.key, f"{job} is not an ML job." | |
| has_inprogress_tasks = job.check_inprogress_subtasks() | |
| if has_inprogress_tasks: | |
| # Schedule task to update the job status | |
| from django.db import transaction | |
| from ami.ml.tasks import check_ml_job_status | |
| transaction.on_commit(lambda: check_ml_job_status.apply_async((job.pk,))) | |
| return Response({"inprogress_subtasks": has_inprogress_tasks}) | |
| assert job.job_type_key == MLJob.key, f"{job} is not an ML job." | |
| subtasks_complete = job.check_inprogress_subtasks() | |
| inprogress_subtasks = subtasks_complete is False | |
| if inprogress_subtasks: | |
| # Schedule task to update the job status | |
| from django.db import transaction | |
| from ami.ml.tasks import check_ml_job_status | |
| transaction.on_commit(lambda: check_ml_job_status.apply_async((job.pk,))) | |
| return Response({"inprogress_subtasks": inprogress_subtasks}) |
🤖 Prompt for AI Agents
In ami/jobs/views.py around lines 167-176, the code treats
Job.check_inprogress_subtasks() as "has in-progress tasks" but that method
returns True when all subtasks have finished; this inverts logic and reschedules
only when work is done. Change the variable/logic to reflect the actual meaning
(e.g., all_subtasks_finished = job.check_inprogress_subtasks()), schedule
check_ml_job_status only when NOT all_subtasks_finished (i.e., while work is
still running), and return {"inprogress_subtasks": not all_subtasks_finished} so
the client receives the correct boolean.
| lambda: process_pipeline_request.apply_async( | ||
| args=[prediction_request.dict(), project_id], | ||
| task_id=task_id, | ||
| # TODO: make ml-pipeline an environment variable (i.e. PIPELINE_QUEUE_PREFIX)? | ||
| queue=f"ml-pipeline-{pipeline}", | ||
| # all pipelines have their own queue beginning with "ml-pipeline-" | ||
| # the antenna celeryworker should subscribe to all pipeline queues | ||
| ) | ||
| ) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix late-binding bug when scheduling Celery subtasks.
transaction.on_commit runs after the loop finishes, so this lambda captures prediction_request and task_id by reference. Every callback will execute with the final batch’s payload and UUID, so earlier batches never reach their queue. Bind the per-iteration values when defining the callback (e.g., via default arguments or functools.partial).
Apply this diff to lock in the values:
- transaction.on_commit(
- lambda: process_pipeline_request.apply_async(
- args=[prediction_request.dict(), project_id],
- task_id=task_id,
- # TODO: make ml-pipeline an environment variable (i.e. PIPELINE_QUEUE_PREFIX)?
- queue=f"ml-pipeline-{pipeline}",
- # all pipelines have their own queue beginning with "ml-pipeline-"
- # the antenna celeryworker should subscribe to all pipeline queues
- )
- )
+ transaction.on_commit(
+ lambda prediction_request=prediction_request, task_id=task_id: process_pipeline_request.apply_async(
+ args=[prediction_request.dict(), project_id],
+ task_id=task_id,
+ # TODO: make ml-pipeline an environment variable (i.e. PIPELINE_QUEUE_PREFIX)?
+ queue=f"ml-pipeline-{pipeline}",
+ # all pipelines have their own queue beginning with "ml-pipeline-"
+ # the antenna celeryworker should subscribe to all pipeline queues
+ )
+ )📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| lambda: process_pipeline_request.apply_async( | |
| args=[prediction_request.dict(), project_id], | |
| task_id=task_id, | |
| # TODO: make ml-pipeline an environment variable (i.e. PIPELINE_QUEUE_PREFIX)? | |
| queue=f"ml-pipeline-{pipeline}", | |
| # all pipelines have their own queue beginning with "ml-pipeline-" | |
| # the antenna celeryworker should subscribe to all pipeline queues | |
| ) | |
| ) | |
| lambda prediction_request=prediction_request, task_id=task_id: process_pipeline_request.apply_async( | |
| args=[prediction_request.dict(), project_id], | |
| task_id=task_id, | |
| # TODO: make ml-pipeline an environment variable (i.e. PIPELINE_QUEUE_PREFIX)? | |
| queue=f"ml-pipeline-{pipeline}", | |
| # all pipelines have their own queue beginning with "ml-pipeline-" | |
| # the antenna celeryworker should subscribe to all pipeline queues | |
| ) | |
| ) |
🧰 Tools
🪛 Ruff (0.14.3)
334-334: Function definition does not bind loop variable prediction_request
(B023)
335-335: Function definition does not bind loop variable task_id
(B023)
🤖 Prompt for AI Agents
In ami/ml/models/pipeline.py around lines 333 to 341, the lambda passed to
transaction.on_commit closes over loop variables causing a late-binding bug;
capture per-iteration values before registering the callback (e.g.,
local_prediction = prediction_request.dict(), local_task_id = task_id,
local_pipeline = pipeline) and use them as default args in the lambda or use
functools.partial so each callback calls process_pipeline_request.apply_async
with the bound args/task_id/queue (queue=f"ml-pipeline-{local_pipeline}")
instead of referencing the loop variables.
| inspector = celery_app.control.inspect() | ||
| active_workers = inspector.active() | ||
| if active_workers: # TODO: currently only works if there is one worker | ||
| # NOTE: all antenna celery workers should have ANTENNA_CELERY_WORKER_NAME | ||
| # in their name instead of the the default "celery" | ||
| return next((worker for worker in active_workers.keys() if ANTENNA_CELERY_WORKER_NAME in worker), None) | ||
| except Exception as e: | ||
| logger.warning(f"Could not find antenna celery worker name: {e}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
get_worker_name fails whenever the worker is idle
inspect().active() returns {} when there are no active tasks, so we return None and downstream assertions fail. As a result, pipeline_created never subscribes the worker to new queues during normal idle periods. Please discover workers via inspect().stats()/ping() (or the sender object when available) so we always obtain the hostname.
- inspector = celery_app.control.inspect()
- active_workers = inspector.active()
- if active_workers: # TODO: currently only works if there is one worker
- # NOTE: all antenna celery workers should have ANTENNA_CELERY_WORKER_NAME
- # in their name instead of the the default "celery"
- return next((worker for worker in active_workers.keys() if ANTENNA_CELERY_WORKER_NAME in worker), None)
+ inspector = celery_app.control.inspect()
+ worker_sets = [
+ inspector.active() or {},
+ inspector.registered() or {},
+ inspector.stats() or {},
+ ]
+ for workers in worker_sets:
+ for worker in workers:
+ if ANTENNA_CELERY_WORKER_NAME in worker:
+ return worker🧰 Tools
🪛 Ruff (0.14.3)
27-27: Do not catch blind exception: Exception
(BLE001)
| if not pipelines: | ||
| # TODO: kinda hacky. is there a way to unify the django and celery logs | ||
| # to more easily see which queues the worker is subscribed to? | ||
| raise ValueError("No pipelines found; cannot subscribe to any queues.") | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don’t raise when there are no pipelines yet
Fresh environments legitimately start with zero pipelines. Raising ValueError during worker_ready aborts the worker before it can process jobs, leaving the whole ML queue offline. Please log and return early instead of throwing.
- if not pipelines:
- # TODO: kinda hacky. is there a way to unify the django and celery logs
- # to more easily see which queues the worker is subscribed to?
- raise ValueError("No pipelines found; cannot subscribe to any queues.")
+ if not pipelines:
+ logger.info("No pipelines found; worker subscription is skipped until a pipeline is created.")
+ return True🧰 Tools
🪛 Ruff (0.14.3)
57-57: Avoid specifying long messages outside the exception class
(TRY003)
🤖 Prompt for AI Agents
In ami/ml/signals.py around lines 54 to 58, the current worker_ready handler
raises ValueError when pipelines is empty which crashes the worker; instead, log
an informative message and return early so the worker stays up. Replace the
raise with a call to the module/Django logger (e.g. logger.info or
logger.warning) that includes context ("no pipelines configured; worker will not
subscribe to ML queues yet") and then return from the function; do not re-raise
or exit the process.
| project = pipeline.projects.first() | ||
| assert project, f"Pipeline {pipeline} must be associated with a project." | ||
|
|
||
| results = process_images( | ||
| pipeline=pipeline, | ||
| endpoint_url=endpoint_url, | ||
| images=images, | ||
| job_id=job_id, | ||
| ) | ||
| results = process_images(pipeline=pipeline, images=images, job_id=job_id, project_id=project.pk) | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use the job’s project instead of pipeline.projects.first()
pipeline.projects.first() can return the wrong project (or None) when a pipeline is linked to multiple projects or was created without a ManyToMany assignment yet. That means we’ll pass the wrong project_id into process_images, so downstream service selection and config overrides run against the wrong project, breaking request routing for multi-project pipelines. Please derive the project from the ML job when job_id is present, and only fall back to an explicit argument or raise if it’s missing.
- pipeline = Pipeline.objects.get(slug=pipeline_choice)
- project = pipeline.projects.first()
- assert project, f"Pipeline {pipeline} must be associated with a project."
-
- results = process_images(pipeline=pipeline, images=images, job_id=job_id, project_id=project.pk)
+ pipeline = Pipeline.objects.get(slug=pipeline_choice)
+ project_id: int | None = None
+ if job and job.project_id:
+ project_id = job.project_id
+ else:
+ project = pipeline.projects.first()
+ if project:
+ project_id = project.pk
+ assert project_id is not None, (
+ f"Cannot determine project for pipeline {pipeline}; the job must supply a project."
+ )
+
+ results = process_images(pipeline=pipeline, images=images, job_id=job_id, project_id=project_id)Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In ami/ml/tasks.py around lines 29 to 33, avoid using pipeline.projects.first()
because it can return the wrong project or None; instead when job_id is provided
load the ML job (e.g., MLJob.objects.get(pk=job_id)) and derive the project from
job.project (or job.project_id) and pass that project's pk into process_images;
if no job_id use an explicit project argument if provided, otherwise raise an
informative error so we never silently use pipeline.projects.first().
| <<: *django | ||
| image: ami_ci_celeryworker | ||
| command: /start-celeryworker | ||
| depends_on: | ||
| - rabbitmq | ||
| # start the worker with antenna_celeryworker to ensure it's discoverable by ami.ml.signals.get_worker_name | ||
| command: | ||
| - sh | ||
| - -c | ||
| - | | ||
| python manage.py migrate && | ||
| python -m celery -A config.celery_app worker --queues=antenna -n antenna_celeryworker@%h -l INFO | ||
| healthcheck: | ||
| # make sure DATABASE_URL is inside the ./.envs/.ci/.postgres | ||
| test: ["CMD-SHELL", "celery -A config.celery_app inspect ping -d antenna_celeryworker@$(hostname) | grep -q pong"] | ||
| interval: 10s | ||
| timeout: 50s | ||
| retries: 5 | ||
| start_period: 10s |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Restore the inherited service dependencies for the Celery worker
By overriding depends_on with a bare list we drop the Postgres/Redis/MinIO dependencies that *django provided. The worker now runs python manage.py migrate without waiting for Postgres, so CI routinely fails with connection errors. Please keep the original dependency map and add RabbitMQ alongside it.
celeryworker:
<<: *django
- depends_on:
- - rabbitmq
+ depends_on:
+ postgres:
+ condition: service_started
+ redis:
+ condition: service_started
+ minio-init:
+ condition: service_started
+ rabbitmq:
+ condition: service_started
+ ml_backend:
+ condition: service_started
+ celeryworker:
+ condition: service_healthy
# start the worker with antenna_celeryworker to ensure it's discoverable by ami.ml.signals.get_worker_nameCommittable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In docker-compose.ci.yml around lines 50 to 66, the depends_on for the Celery
worker was replaced with a bare list dropping the *django inherited dependencies
(Postgres/Redis/MinIO); restore the original dependency map by using the YAML
merge to include the *django depends_on and then append rabbitmq (e.g. use a
depends_on mapping that merges <<: *django and adds rabbitmq to the list) so the
worker waits for DB/Redis/MinIO plus RabbitMQ before running migrations.
| # TODO: Error logs will have many tracebacks | ||
| # could add some processing to provide a concise error summary | ||
| job.logger.error(f"Subtask {task_name} ({task_id}) failed: {task.traceback}") | ||
| inprogress_subtask.status = MLSubtaskState.FAIL.name |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this get saved?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yup, with continue we move on to checking the next subtask. And only every 10 subtasks do we do a bulk update to the tasks.
Lines 441 to 452 in bd86042
| if len(inprogress_subtasks_to_update) >= 10: | |
| MLTaskRecord.objects.bulk_update( | |
| inprogress_subtasks_to_update, | |
| [ | |
| "status", | |
| "raw_traceback", | |
| "raw_results", | |
| "num_captures", | |
| "num_detections", | |
| "num_classifications", | |
| ], | |
| ) |
| job.save() | ||
|
|
||
| # Remove remaining tasks from the queue | ||
| for ml_task_record in job.ml_task_records.all(): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we have a Cancel action for the parent Job. Can you move or add this logic there?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
♻️ Duplicate comments (1)
ami/ml/tasks.py (1)
29-32: Critical: Resolve thepipeline.projects.first()issue from the previous review.Using
pipeline.projects.first()can return the wrong project (orNone) when a pipeline is linked to multiple projects, breaking request routing and config overrides. The previous review comment at lines 29-33 recommended deriving the project fromjob.projectwhenjob_idis present. This issue remains unresolved.Please apply the suggested fix from the previous review:
images = SourceImage.objects.filter(pk__in=image_ids) pipeline = Pipeline.objects.get(slug=pipeline_choice) - project = pipeline.projects.first() - assert project, f"Pipeline {pipeline} must be associated with a project." + project_id: int | None = None + if job and job.project_id: + project_id = job.project_id + else: + project = pipeline.projects.first() + if project: + project_id = project.pk + assert project_id is not None, ( + f"Cannot determine project for pipeline {pipeline}; the job must supply a project." + ) - results = process_images(pipeline=pipeline, images=images, job_id=job_id, project_id=project.pk) + results = process_images(pipeline=pipeline, images=images, job_id=job_id, project_id=project_id)
🧹 Nitpick comments (3)
ami/jobs/tests.py (2)
326-328: Uselogging.exceptionfor better error context.When logging within an exception handler,
logging.exceptionautomatically includes the stack trace, making debugging easier.Apply this diff:
except Exception as e: - logger.error(f"Error fetching details for task {task_id}: {e}") + logger.exception(f"Error fetching details for task {task_id}: {e}") details[task_id] = {"error": str(e)}
499-499: Simplify the state check.Use a direct equality comparison instead of checking membership in a single-element list.
Apply this diff:
- self.assertIn(async_result.state, ["REVOKED"]) + self.assertEqual(async_result.state, "REVOKED")ami/ml/tasks.py (1)
158-158: Remove redundant import.
datetimeis already imported at the top of the file (line 1), so this local import is unnecessary.Apply this diff:
- import datetime - from ami.jobs.models import Job, JobState, MLJob
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
ami/jobs/tests.py(2 hunks)ami/ml/tasks.py(4 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
ami/jobs/tests.py (2)
ami/jobs/models.py (18)
Job(1057-1355)JobState(32-68)MLJob(328-786)MLSubtaskNames(992-994)MLSubtaskState(997-1002)save(1290-1301)logger(1340-1349)run(321-325)run(657-786)run(794-832)run(840-883)run(895-919)run(927-947)run(955-956)run(1199-1207)update_job_progress(515-654)MLTaskRecord(1005-1054)kill_task(1045-1054)ami/ml/tasks.py (2)
check_ml_job_status(111-149)check_dangling_ml_jobs(153-188)
ami/ml/tasks.py (3)
ami/ml/models/pipeline.py (3)
process_images(187-285)process_images(1240-1252)save(1276-1282)ami/jobs/models.py (9)
Job(1057-1355)MLJob(328-786)logger(1340-1349)check_inprogress_subtasks(314-318)check_inprogress_subtasks(333-512)check_inprogress_subtasks(1190-1197)save(1290-1301)update_status(1238-1258)kill_task(1045-1054)ami/jobs/views.py (1)
check_inprogress_subtasks(161-176)
🪛 Ruff (0.14.3)
ami/jobs/tests.py
326-326: Do not catch blind exception: Exception
(BLE001)
327-327: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
ami/ml/tasks.py
137-137: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
137-137: Avoid specifying long messages outside the exception class
(TRY003)
138-138: Do not catch blind exception: Exception
(BLE001)
140-140: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
149-149: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
149-149: Create your own exception
(TRY002)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: test
🔇 Additional comments (1)
ami/jobs/tests.py (1)
354-356: Verify that directly callingcheck_ml_job_statuswithin the test loop is intentional.Invoking
check_ml_job_status(job.pk)andMLJob.update_job_progress(job)synchronously within the test polling loop bypasses the normal async callback flow. While this is acceptable for integration testing to force status updates, ensure it doesn't mask timing or concurrency issues that could occur in production.
| except Job.DoesNotExist: | ||
| raise ValueError(f"Job with ID {ml_job_id} does not exist.") | ||
| except Exception as e: | ||
| error_msg = f"Error checking status for job with ID {ml_job_id}: {e}" | ||
| job.logger.error(error_msg) | ||
| job.update_status(JobState.FAILURE) | ||
| job.finished_at = datetime.datetime.now() | ||
| job.save() | ||
|
|
||
| # Remove remaining tasks from the queue | ||
| for ml_task_record in job.ml_task_records.all(): | ||
| ml_task_record.kill_task() | ||
|
|
||
| raise Exception(error_msg) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion | 🟠 Major
Improve exception handling and logging.
The error handling has several issues:
- Line 137: Use exception chaining (
raise ... from e) to preserve the original context. - Line 140: Use
logging.exceptioninstead oflogging.errorto include the stack trace automatically. - Line 149: Use exception chaining when re-raising.
Apply this diff:
except Job.DoesNotExist:
- raise ValueError(f"Job with ID {ml_job_id} does not exist.")
+ raise ValueError(f"Job with ID {ml_job_id} does not exist.") from None
except Exception as e:
error_msg = f"Error checking status for job with ID {ml_job_id}: {e}"
- job.logger.error(error_msg)
+ job.logger.exception(error_msg)
job.update_status(JobState.FAILURE)
job.finished_at = datetime.datetime.now()
job.save()
# Remove remaining tasks from the queue
for ml_task_record in job.ml_task_records.all():
ml_task_record.kill_task()
- raise Exception(error_msg)
+ raise Exception(error_msg) from e🧰 Tools
🪛 Ruff (0.14.3)
137-137: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
137-137: Avoid specifying long messages outside the exception class
(TRY003)
138-138: Do not catch blind exception: Exception
(BLE001)
140-140: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
149-149: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
149-149: Create your own exception
(TRY002)
🤖 Prompt for AI Agents
In ami/ml/tasks.py around lines 136 to 149, the exception handling should
preserve original exception context and include stack traces: replace the
job.logger.error call with job.logger.exception(error_msg) so the stack trace is
logged, change the initial generic except Exception as e to re-raise chained
exceptions where appropriate (use "raise ValueError(... ) from e" for the
Job.DoesNotExist handling if re-raising a different exception) and when
re-raising the final Exception at the end use "raise Exception(error_msg) from
e" to maintain exception chaining; keep the job status/update/task cleanup logic
but ensure any re-raises use "from e" and use logging.exception for full stack
traces.
Summary
Currently, we run MLJobs as a single async celery task which is not scalable to large batch image processing jobs. For example, when processing a large number of image (i.e. 100+ images), the singular long running task is easily interrupted due to a failed worker and also takes a long time to complete (since images still have to be processed in a sequential manner).
This PR uses Celery as the task queue and RabbitMQ as the message broker to send batches of images as individual
process_pipeline_requests. Each queue is dedicated to a specific ML pipeline (named after the pipeline slug). A celery worker can pick up tasks based on the pipelines they subscribe to. For example, the antenna celery workers may subscribe to all queues; an external celeryworker would subscribe to select pipelines.The
ami.ml.tasks.check_ml_job_statustask listens for completedprocess_pipeline_requestsand enqueuessave_resultstasks; thecheck_ml_job_statusreschedules itself when there are still in progress tasks.Related PR
This PR focuses on the "controller" side of the batch image processing system. This PR should be ready to deploy (pending further testing in a Staging environment) and tasks will be processed by antenna workers.
A follow up PR addresses adding additional consumers via the processing service template: #1011
List of Changes
Add a
process_pipeline_requesttask which takes aPipelineRequestand returns the model's results. The logic for this task is defined on the antenna side and can be added to external celeryworkers/processing services.Introduces a new
MLTaskRecordmodel which stores the results and stats of a celery task.Update the
Jobmodel to includeml_task_recordsto track the celery tasks currently in-progress and completed (these can be eitherprocess_pipeline_requestorsave_resultstasks)Add
ami.ml.tasks.check_ml_job_statuswhich checks the subtasks of anMLJob, updates the job status, and schedulessave_resultstasks.Add a celery beat task
ami.ml.tasks.check_dangling_ml_jobswhich revokes MLJobs that areSTARTEDbut has had not been checked bycheck_ml_job_statusrecently.All tests should now be run with the separate
docker-compose.cistack to ensure isolated testing.Related Issues
Addresses in part #515
The planning of this feature was discussed in #515. See the comments beginning at #515 (comment)
Detailed Description
Note, this PR doesn't include external celery workers, it only updates the antenna celery worker.
%%{init: {"theme": "base", "themeVariables": { "background": "#ffffff" }}}%% flowchart TD subgraph Django["CONTAINER:DJANGO"] subgraph MLJob["class:MLJob"] A["run()"] A --> B["subscribe_celeryworker_to_pipeline_queues(antenna_celeryworker)"] B --> D["Produce check_ml_job_status task"] end subgraph Pipeline["class:Pipeline"] B --> C["schedule_process_images(...)"] C --> E["process_images(process_sync=False)"] E --> F["handle_async_process_images(...)"] F --> G1["Produce process_pipeline_request task #1"] F --> G2["Produce process_pipeline_request task #2"] F --> G3["Produce process_pipeline_request task #N"] end end subgraph Celery["CONTAINER:RABBITMQ"] G1 --> H(("ml-pipeline-{pipeline} queue")) G2 --> H G3 --> H D --> I(("antenna queue")) end subgraph Antenna["CONTAINER:ANTENNA_CELERYWORKER"] H --> K{Consume process_pipeline_request task} K --> L["process_images(process_sync=True)"] subgraph Pipeline2["class:Pipeline"] L --> M["handle_sync_process_images()"] M --> M1["POST request to the ML Backend's `/process` endpoint"] end I --> N1{Consume check_ml_job_status task} N1 --> Q["check_inprogress_subtasks()"] S --> N1 subgraph MLJob2["class:MLJob"] I --> |Retrieve save_results results| Q H --> |Retrieve process_pipeline_request results| Q Q -->|process_pipeline_request OR save_results tasks in progress| S S["Produce check_ml_job_status task"] Q -->|process_pipeline_request task finished| T T["Produce save_results"] end T --> J J{"Consume save_results task"} J --> J1 subgraph Pipeline3["class:Pipeline"] J1["Save results to db"] end end subgraph ExternalWorker["CONTAINER:EXTERNAL_CELERYWORKER"] H --> O{Consume process_pipeline_request task} O --> O1[Some external logic for processing images] end %% Shapes classDef queueStyle shape:circle,fill:#f0f4c3,stroke:#333,color:black classDef workerStyle shape:diamond,fill:#ffe0b2,stroke:#333,color:black class H,I queueStyle class K,O,N1,N2,J workerStyle %% Style: black subgraph titles style Django fill:#fff,stroke:#000,stroke-width:3px,color:#000 style Celery fill:#fff,stroke:#000,stroke-width:3px,color:#000 style Antenna fill:#fff,stroke:#000,stroke-width:3px,color:#000 style ExternalWorker fill:#fff,stroke:#000,stroke-width:3px,color:#000 %% Arrows (you had BLACK before but in all caps; lowercase is valid) linkStyle default stroke:black,stroke-width:2px %% Style task produced style D fill:#f9f,stroke:#333,stroke-width:2px,color:#000 style S fill:#f9f,stroke:#333,stroke-width:2px,color:#000 style T fill:#f9f,stroke:#333,stroke-width:2px,color:#000 style G1 fill:#f9f,stroke:#333,stroke-width:2px,color:#000 style G2 fill:#f9f,stroke:#333,stroke-width:2px,color:#000 style G3 fill:#f9f,stroke:#333,stroke-width:2px,color:#000How to Test the Changes
CI tests are moved to a separate docker compose stack.
So to run antenna, still use the main docker compose stack.
Run the new batch processing unit test:
Deployment Notes
Include instructions if this PR requires specific steps for its deployment (database migrations, config changes, etc.)
Checklist
The
test_run_ml_jobunit test correctly passes. It also correctly fails when an Exception is manually added to the/processendpoint and/orsave_resultstask logic; the failed task details are logged in the test and the test fails since ML Task Records show failed subtasks.Summary by CodeRabbit
New Features
Improvements
Tests