
Conversation

@vanessavmac (Collaborator) commented Aug 4, 2025

Summary

Currently, we run MLJobs as a single async Celery task, which does not scale to large batch image processing jobs. For example, when processing a large number of images (e.g., 100+), the single long-running task is easily interrupted by a failed worker and also takes a long time to complete, since images are still processed sequentially.

This PR uses Celery as the task queue and RabbitMQ as the message broker to send batches of images as individual process_pipeline_request tasks. Each queue is dedicated to a specific ML pipeline and named after the pipeline slug. A Celery worker picks up tasks based on the pipelines it subscribes to. For example, the antenna Celery workers may subscribe to all queues, while an external Celery worker would subscribe to select pipelines.

The ami.ml.tasks.check_ml_job_status task watches for completed process_pipeline_request tasks and enqueues save_results tasks; it reschedules itself while tasks are still in progress.
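
A minimal sketch of this dispatch-and-poll pattern is below. The task names and the ml-pipeline-{slug} queue naming come from this PR; the function signatures, the countdown value, and the dispatch_batches helper are illustrative assumptions, not the exact implementation.

```python
from celery import shared_task


@shared_task(name="ami.ml.tasks.process_pipeline_request")
def process_pipeline_request(pipeline_request: dict, project_id: int) -> dict:
    # Worker-side logic: POST the batch to the ML backend's /process endpoint
    # and return the serialized results (body omitted in this sketch).
    ...


@shared_task(name="ami.ml.tasks.check_ml_job_status")
def check_ml_job_status(job_id: int) -> None:
    from ami.jobs.models import Job

    job = Job.objects.get(pk=job_id)
    # check_inprogress_subtasks() inspects subtask states and enqueues
    # save_results tasks for batches that have finished processing.
    all_done = job.check_inprogress_subtasks()
    if not all_done:
        # Reschedule itself while process_pipeline_request or save_results
        # tasks are still in progress.
        check_ml_job_status.apply_async((job_id,), countdown=30)


def dispatch_batches(pipeline_slug: str, batches: list[dict], project_id: int) -> None:
    # Each pipeline has a dedicated queue named after its slug; a worker only
    # receives work for the pipelines whose queues it subscribes to.
    for batch in batches:
        process_pipeline_request.apply_async(
            args=[batch, project_id],
            queue=f"ml-pipeline-{pipeline_slug}",
        )
```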

Related PR

This PR focuses on the "controller" side of the batch image processing system. It should be ready to deploy (pending further testing in a Staging environment), and tasks will be processed by antenna workers.

A follow-up PR adds additional consumers via the processing service template: #1011

List of Changes

  • Add a process_pipeline_request task which takes a PipelineRequest and returns the model's results. The logic for this task is defined on the antenna side and can be added to external celeryworkers/processing services.

  • Introduce a new MLTaskRecord model that stores the results and stats of a Celery task.

  • Update the Job model to include ml_task_records, which track the Celery tasks currently in progress and completed (these can be either process_pipeline_request or save_results tasks).

  • Add ami.ml.tasks.check_ml_job_status which checks the subtasks of an MLJob, updates the job status, and schedules save_results tasks.

  • Add a Celery beat task, ami.ml.tasks.check_dangling_ml_jobs, which revokes MLJobs that are STARTED but have not been checked by check_ml_job_status recently (see the registration sketch after this list).

  • All tests should now be run with the separate docker-compose.ci stack to ensure isolated testing.
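
A minimal sketch of how such a periodic check might be registered, assuming django-celery-beat is available; the 15-minute interval is an assumed cadence for illustration, not necessarily the value used in this PR.

```python
from django_celery_beat.models import IntervalSchedule, PeriodicTask


def register_dangling_ml_job_check() -> None:
    # Run the dangling-job check on a fixed interval (15 minutes is assumed).
    schedule, _ = IntervalSchedule.objects.get_or_create(
        every=15,
        period=IntervalSchedule.MINUTES,
    )
    # Idempotent registration: only create the periodic task if it is missing.
    PeriodicTask.objects.get_or_create(
        name="check_dangling_ml_jobs",
        defaults={
            "interval": schedule,
            "task": "ami.ml.tasks.check_dangling_ml_jobs",
        },
    )
```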

Related Issues

Addresses in part #515

The planning of this feature was discussed in #515. See the comments beginning at #515 (comment)

Detailed Description

Note: this PR doesn't include external Celery workers; it only updates the antenna Celery worker.

%%{init: {"theme": "base", "themeVariables": { "background": "#ffffff" }}}%%
flowchart TD
    subgraph Django["CONTAINER:DJANGO"]
        subgraph MLJob["class:MLJob"]
            A["run()"]
            A --> B["subscribe_celeryworker_to_pipeline_queues(antenna_celeryworker)"]
            B --> D["Produce check_ml_job_status task"]
        end
        subgraph Pipeline["class:Pipeline"]
            B --> C["schedule_process_images(...)"]
            C --> E["process_images(process_sync=False)"]
            E --> F["handle_async_process_images(...)"]

            F --> G1["Produce process_pipeline_request task #1"]
            F --> G2["Produce process_pipeline_request task #2"]
            F --> G3["Produce process_pipeline_request task #N"]
        end
    end

    subgraph Celery["CONTAINER:RABBITMQ"]
        G1 --> H(("ml-pipeline-{pipeline} queue"))
        G2 --> H
        G3 --> H
        D --> I(("antenna queue"))
    end

    subgraph Antenna["CONTAINER:ANTENNA_CELERYWORKER"]
        H --> K{Consume process_pipeline_request task}
        K --> L["process_images(process_sync=True)"]
        
        subgraph Pipeline2["class:Pipeline"]
            L --> M["handle_sync_process_images()"]
            M --> M1["POST request to the ML Backend's `/process` endpoint"]
        end

        I --> N1{Consume check_ml_job_status task}
        N1 --> Q["check_inprogress_subtasks()"]
        S --> N1

        subgraph MLJob2["class:MLJob"]
            I --> |Retrieve save_results results| Q
            H --> |Retrieve process_pipeline_request results| Q
            Q -->|process_pipeline_request OR save_results tasks in progress| S
            S["Produce check_ml_job_status task"]
            Q -->|process_pipeline_request task finished| T
            T["Produce save_results"]
        end
        T --> J
        J{"Consume save_results task"}
        J --> J1
        subgraph Pipeline3["class:Pipeline"]
            J1["Save results to db"]
        end
    end

    subgraph ExternalWorker["CONTAINER:EXTERNAL_CELERYWORKER"]
        H --> O{Consume process_pipeline_request task}
        O --> O1[Some external logic for processing images]
    end


    %% Shapes
    classDef queueStyle shape:circle,fill:#f0f4c3,stroke:#333,color:black
    classDef workerStyle shape:diamond,fill:#ffe0b2,stroke:#333,color:black

    class H,I queueStyle
    class K,O,N1,N2,J workerStyle

    %% Style: black subgraph titles
    style Django fill:#fff,stroke:#000,stroke-width:3px,color:#000
    style Celery fill:#fff,stroke:#000,stroke-width:3px,color:#000
    style Antenna fill:#fff,stroke:#000,stroke-width:3px,color:#000
    style ExternalWorker fill:#fff,stroke:#000,stroke-width:3px,color:#000


    %% Arrows
    linkStyle default stroke:black,stroke-width:2px

    %% Style task produced
    style D fill:#f9f,stroke:#333,stroke-width:2px,color:#000
    style S fill:#f9f,stroke:#333,stroke-width:2px,color:#000
    style T fill:#f9f,stroke:#333,stroke-width:2px,color:#000
    style G1 fill:#f9f,stroke:#333,stroke-width:2px,color:#000
    style G2 fill:#f9f,stroke:#333,stroke-width:2px,color:#000
    style G3 fill:#f9f,stroke:#333,stroke-width:2px,color:#000


How to Test the Changes

CI tests are moved to a separate docker compose stack.

To run antenna, still use the main docker compose stack:

docker compose up -d

Run the new batch processing unit test:

docker compose -f docker-compose.ci.yml up -d
docker compose -f docker-compose.ci.yml run --rm django python manage.py test -k test_run_ml_job --pdb

Deployment Notes

Include instructions if this PR requires specific steps for its deployment (database migrations, config changes, etc.)

Checklist

  • I have tested these changes appropriately.

The test_run_ml_job unit test correctly passes. It also correctly fails when an Exception is manually added to the /process endpoint and/or save_results task logic; the failed task details are logged in the test and the test fails since ML Task Records show failed subtasks.

  • I have added and/or modified relevant tests.
  • I updated relevant documentation or comments.
  • I have verified that this PR follows the project's coding standards.
  • Any dependent changes have already been merged to main.

Summary by CodeRabbit

  • New Features

    • ML task tracking subsystem to record per-image subtasks and their statuses
    • Endpoint to trigger/check in-progress ML subtasks
    • Periodic scheduler to detect and revoke stale ML jobs
  • Improvements

    • Enhanced job progress calculation across processing and results stages
    • Shift to asynchronous pipeline task scheduling for better throughput
    • CI and local configs improved for dedicated task queue and worker discoverability
  • Tests

    • New integration tests covering ML batch processing and stale-job cleanup

vanessavmac and others added 30 commits March 23, 2025 11:17
@mihow mihow added the VISS-SSEC label Sep 9, 2025
@mihow mihow requested a review from Copilot October 6, 2025 22:39

Copilot AI (Contributor) left a comment

Pull Request Overview

This PR enables asynchronous and distributed processing for ML backend workflows by implementing a task queue architecture using Celery and RabbitMQ. The system splits large image processing jobs into smaller batch tasks that can be processed by multiple workers, addressing scalability issues with long-running single tasks.

  • Replaces single monolithic ML processing tasks with distributed batch processing using Celery task queues
  • Adds support for external ML processing workers that can subscribe to pipeline-specific queues
  • Implements comprehensive job status tracking through MLTaskRecord model for monitoring subtask progress

Reviewed Changes

Copilot reviewed 42 out of 44 changed files in this pull request and generated 5 comments.

Summary per file:

  • processing_services/*/celery_worker/: Adds Celery worker configuration and startup scripts for external ML processing services
  • processing_services/*/api/processing.py: Extracts core processing logic into reusable functions for both sync and async execution
  • ami/ml/models/pipeline.py: Implements async batch processing and task management functionality
  • ami/jobs/models.py: Adds MLTaskRecord model and comprehensive job progress tracking for distributed tasks
  • ami/ml/signals.py: Implements automatic worker subscription to pipeline-specific queues
  • docker-compose*.yml: Configures RabbitMQ message broker and additional Celery workers



logger = logging.getLogger(__name__)

ANTENNA_CELERY_WORKER_NAME = "antenna_celeryworker"

@vanessavmac (Collaborator, author) commented:

Consider making this an overridable environment variable (e.g. antenna_mlworker) to customize which workers subscribe.
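
A minimal sketch of that suggestion, reading the name from the environment with the current constant as the fallback; the environment variable name is an assumption for illustration.

```python
import os

# Allow deployments to rename the worker prefix (e.g. "antenna_mlworker")
# without a code change; fall back to the current hard-coded value.
ANTENNA_CELERY_WORKER_NAME = os.environ.get(
    "ANTENNA_CELERY_WORKER_NAME", "antenna_celeryworker"
)
```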

if task.traceback:
# TODO: Error logs will have many tracebacks
# could add some processing to provide a concise error summary
job.logger.error(f"Subtask {task_name} ({task_id}) failed: {task.traceback}")

Collaborator commented:

This logs the error, but then still tries to parse a successful result. Can you mark the task as failed and then continue to the next one? The status check did its job correctly! But the subtask failed. Can you update the MLTaskRecord to say it failed?

@coderabbitai coderabbitai bot (Contributor) commented Nov 4, 2025

Note

Other AI code review bot(s) detected

CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

Walkthrough

Introduces asynchronous ML subtask tracking and orchestration: new MLTaskRecord model, ML-specific job orchestration and Celery tasks (status checks, dangling-job cron), signal-driven dynamic worker queue subscriptions, RabbitMQ/Celery configuration changes, migrations, and integration tests.

Changes

Cohort / File(s) Summary
Environment Configuration
.envs/.ci/.django, .envs/.ci/.postgres, .envs/.local/.django, .envs/.local/.postgres
Added CELERY_BROKER_URL and RabbitMQ credentials; added DATABASE_URL entries for CI and local Postgres env files.
ML Task Tracking Model & Admin
ami/jobs/migrations/0019_mltaskrecord.py, ami/jobs/admin.py
New MLTaskRecord model (fields for task metadata, pydantic schema fields, results, counts, relations) and admin MLTaskRecordAdmin with kill_task action.
Job Migrations
ami/jobs/migrations/0020_*.py through 0023_*.py
Series of migrations: switch Job fields to PydanticSchemaField, remove subtask_id, relax task_id, add/alter status choices, add last_checked timestamp with defaults.
Job Models & Orchestration
ami/jobs/models.py, ami/jobs/tasks.py, ami/jobs/views.py
Added MLSubtaskNames/MLSubtaskState, MLTaskRecord model, MLJob orchestration methods (check_inprogress_subtasks, update_job_progress), Job.last_checked, skip global status update for ML jobs, and a viewset action to trigger in-progress checks.
ML Pipeline & Schemas
ami/ml/models/pipeline.py, ami/ml/schemas.py, ami/ml/tasks.py
Refactored process_images (new sync/async split), added process_pipeline_request task, added PipelineResultsResponse.combine_with, removed endpoint_url usage, added check_ml_job_status and check_dangling_ml_jobs Celery tasks.
Signals & AppConfig
ami/ml/signals.py, ami/ml/apps.py, ami/ml/models/__init__.py, ami/ml/migrations/0026_*.py
New signal module for worker discovery and dynamic queue subscribe/unsubscribe; AppConfig.ready imports signals; exported PipelineSaveResults; migration creates periodic task for dangling job checks.
Tests
ami/jobs/tests.py
Added TestMLJobBatchProcessing and TestStaleMLJob integration tests covering per-image subtasks, Celery task lifecycle, and stale-job revocation behaviors.
Settings & Deployment
config/settings/base.py, config/settings/local.py, compose/local/django/Dockerfile, compose/local/django/celery/worker/start
Use ami.ml.apps.MLConfig, added REDIS_URL and CELERY_TASK_DEFAULT_QUEUE, removed legacy worker script copy, updated worker startup to run migrations then start antenna-named worker on antenna queue.
Compose & Infra
docker-compose.yml, docker-compose.ci.yml, ami/main/models.py
Added RabbitMQ service and CI-specific rabbitmq; tightened service start/health dependencies; celery worker healthchecks and named queue; defer regroup_events via transaction.on_commit to avoid races.
Minor
ami/tests/fixtures/main.py
Minor formatting (blank line) change.

Sequence Diagram(s)

sequenceDiagram
    participant User
    participant DjangoAPI as Django API
    participant Scheduler as Celery Scheduler
    participant Worker as Celery Worker\n(antenna)
    participant Processing as Processing Service
    participant DB as Database

    User->>DjangoAPI: POST /jobs/ (MLJob)
    DjangoAPI->>DB: Create Job (status=PENDING)
    DjangoAPI->>Scheduler: Enqueue run_job
    Scheduler->>Worker: run_job(job_id)
    Worker->>DB: Create MLTaskRecord (status=STARTED)
    Worker->>Scheduler: Submit process_pipeline_request tasks (per image)
    par Async processing
        Worker->>Processing: process_pipeline_request(images)
        Processing-->>Worker: PipelineResultsResponse
        Worker->>DB: Update MLTaskRecord (SUCCESS/FAIL)
    end
    Scheduler->>Worker: check_ml_job_status(job_id)
    Worker->>DB: Query MLTaskRecord entries
    alt All subtasks complete
        Worker->>DB: Finalize Job (status=SUCCESS)
    else In-progress remain
        Worker->>Scheduler: Reschedule check_ml_job_status
    end
sequenceDiagram
    participant CeleryApp as Celery App
    participant Signal as ML Signals
    participant Inspector as Celery Inspector
    participant Worker as Antenna Worker
    participant Redis as Broker

    CeleryApp->>Signal: worker_ready
    Signal->>Inspector: inspect active workers
    Inspector-->>Signal: antenna_celeryworker@host
    Signal->>Redis: control.add_consumer(ml-pipeline-{slug}) for each Pipeline
    Note over Redis: Worker subscribed to pipeline queues
    Pipeline->>Signal: post_save (created)
    Signal->>Redis: add_consumer for new pipeline
    Pipeline->>Signal: post_delete
    Signal->>Redis: remove_consumer for pipeline

Estimated code review effort

🎯 5 (Critical) | ⏱️ ~120 minutes

Areas requiring special attention:

  • ami/ml/models/pipeline.py — signature change, sync/async split, call-site compatibility.
  • ami/jobs/models.py — new orchestration logic (check_inprogress_subtasks, update_job_progress) and progress aggregation.
  • ami/ml/signals.py — worker discovery and dynamic queue subscription reliability.
  • ami/jobs/migrations/0019–0023 — data migrations affecting status and relationships.
  • docker-compose.yml / docker-compose.ci.yml — RabbitMQ, worker naming, healthchecks, and startup sequencing.
  • ami/jobs/tests.py — integration tests that exercise complex async flows.

Poem

🐰
Hopped a queue and carried a task,
I nudged each subtask — no need to ask,
Records kept neat, checks tick and chime,
Workers hum in ordered time,
A carrot for code — async dreams at last!

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)

  • Docstring Coverage ⚠️ Warning: Docstring coverage is 38.46%, which is insufficient; the required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.

✅ Passed checks (2 passed)

  • Title check ✅ Passed: The title 'Enable async and distributed processing for the ML backend' clearly and concisely summarizes the main change in the changeset, which is implementing async and distributed ML processing using Celery and RabbitMQ.
  • Description check ✅ Passed: The PR description is comprehensive and well-structured, following most of the template sections including Summary, List of Changes, Related Issues, Detailed Description with a flowchart, How to Test, Deployment Notes, and Checklist.

@coderabbitai coderabbitai bot (Contributor) left a comment

Actionable comments posted: 8

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
docker-compose.yml (1)

101-125: Restore database readiness before running worker startup migrations.

celeryworker now runs python manage.py migrate before the worker boots, but its depends_on only lists RabbitMQ. Without waiting for Postgres to be up, the migration can fail with an OperationalError, the container exits, and django (which now waits on a healthy worker) never starts. Please add an explicit Postgres dependency or a wait-for-db gate before running migrations so the worker starts reliably.

Apply this diff to reintroduce the dependency:

-    depends_on:
-      - rabbitmq
+    depends_on:
+      postgres:
+        condition: service_started
+      rabbitmq:
+        condition: service_started
🧹 Nitpick comments (2)
.envs/.local/.postgres (1)

6-6: Consider using postgresql:// scheme instead of postgres://.

The postgres:// URI scheme is deprecated. Modern PostgreSQL clients and tools (including SQLAlchemy and psycopg2) recommend using postgresql:// instead.

Apply this diff:

-DATABASE_URL=postgres://xekSryPnqczJXkOnTAeDmDyIapSRrGEE:iMRQjJEGflj5xojpnJU3OzPQ0apSCLP1oHR1TYvyMzAlF5KpE9HFL6MPlnbDibwI@postgres:5432/ami
+DATABASE_URL=postgresql://xekSryPnqczJXkOnTAeDmDyIapSRrGEE:iMRQjJEGflj5xojpnJU3OzPQ0apSCLP1oHR1TYvyMzAlF5KpE9HFL6MPlnbDibwI@postgres:5432/ami
compose/local/django/celery/worker/start (1)

8-8: Consider running migrations separately from worker startup.

Running migrate in the worker startup script can cause issues if multiple workers start concurrently, even though Django has migration locking. It's better practice to run migrations as a separate initialization step before starting workers.

Consider moving the migration to a separate initialization container or running it explicitly before starting workers in your orchestration layer.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 7ce3424 and bd86042.

📒 Files selected for processing (29)
  • .envs/.ci/.django (1 hunks)
  • .envs/.ci/.postgres (1 hunks)
  • .envs/.local/.django (1 hunks)
  • .envs/.local/.postgres (1 hunks)
  • ami/jobs/admin.py (2 hunks)
  • ami/jobs/migrations/0019_mltaskrecord.py (1 hunks)
  • ami/jobs/migrations/0020_alter_job_logs_alter_job_progress.py (1 hunks)
  • ami/jobs/migrations/0021_remove_mltaskrecord_subtask_id_and_more.py (1 hunks)
  • ami/jobs/migrations/0022_job_last_checked.py (1 hunks)
  • ami/jobs/migrations/0023_alter_job_last_checked_alter_mltaskrecord_status.py (1 hunks)
  • ami/jobs/models.py (10 hunks)
  • ami/jobs/tasks.py (2 hunks)
  • ami/jobs/tests.py (2 hunks)
  • ami/jobs/views.py (2 hunks)
  • ami/main/models.py (1 hunks)
  • ami/ml/apps.py (1 hunks)
  • ami/ml/migrations/0026_check_dangling_ml_jobs_celery_beat.py (1 hunks)
  • ami/ml/models/__init__.py (1 hunks)
  • ami/ml/models/pipeline.py (7 hunks)
  • ami/ml/schemas.py (2 hunks)
  • ami/ml/signals.py (1 hunks)
  • ami/ml/tasks.py (4 hunks)
  • ami/tests/fixtures/main.py (1 hunks)
  • compose/local/django/Dockerfile (0 hunks)
  • compose/local/django/celery/worker/start (1 hunks)
  • config/settings/base.py (3 hunks)
  • config/settings/local.py (1 hunks)
  • docker-compose.ci.yml (5 hunks)
  • docker-compose.yml (6 hunks)
💤 Files with no reviewable changes (1)
  • compose/local/django/Dockerfile
🧰 Additional context used
🧬 Code graph analysis (16)
ami/jobs/tasks.py (1)
ami/jobs/models.py (4)
  • Job (1057-1355)
  • MLJob (328-786)
  • update_status (1238-1258)
  • save (1290-1301)
ami/ml/schemas.py (6)
ui/src/data-services/models/job.ts (1)
  • pipeline (109-111)
ui/src/data-services/models/pipeline.ts (1)
  • algorithms (29-31)
ui/src/data-services/models/job-details.ts (1)
  • errors (18-20)
ui/src/data-services/models/occurrence-details.ts (1)
  • detections (108-110)
processing_services/minimal/api/schemas.py (1)
  • PipelineResultsResponse (230-243)
processing_services/example/api/schemas.py (1)
  • PipelineResultsResponse (265-279)
ami/ml/tasks.py (2)
ami/ml/models/pipeline.py (3)
  • process_images (187-285)
  • process_images (1240-1252)
  • save (1276-1282)
ami/jobs/models.py (7)
  • Job (1057-1355)
  • MLJob (328-786)
  • logger (1340-1349)
  • check_inprogress_subtasks (314-318)
  • check_inprogress_subtasks (333-512)
  • check_inprogress_subtasks (1190-1197)
  • save (1290-1301)
ami/jobs/migrations/0020_alter_job_logs_alter_job_progress.py (2)
ami/jobs/migrations/0019_mltaskrecord.py (1)
  • Migration (9-72)
ami/jobs/models.py (3)
  • JobLogs (255-257)
  • default_job_progress (210-214)
  • JobProgress (113-207)
ami/jobs/migrations/0023_alter_job_last_checked_alter_mltaskrecord_status.py (1)
ami/utils/schemas.py (1)
  • choices (63-65)
ami/jobs/views.py (2)
ami/jobs/models.py (6)
  • Job (1057-1355)
  • JobState (32-68)
  • MLJob (328-786)
  • check_inprogress_subtasks (314-318)
  • check_inprogress_subtasks (333-512)
  • check_inprogress_subtasks (1190-1197)
ami/ml/tasks.py (1)
  • check_ml_job_status (111-149)
ami/main/models.py (2)
ami/main/admin.py (1)
  • regroup_events (180-183)
ami/tasks.py (1)
  • regroup_events (90-99)
ami/jobs/migrations/0021_remove_mltaskrecord_subtask_id_and_more.py (1)
ami/jobs/migrations/0019_mltaskrecord.py (1)
  • Migration (9-72)
ami/jobs/migrations/0022_job_last_checked.py (2)
ami/jobs/migrations/0020_alter_job_logs_alter_job_progress.py (1)
  • Migration (8-28)
ami/jobs/migrations/0021_remove_mltaskrecord_subtask_id_and_more.py (1)
  • Migration (6-30)
ami/jobs/models.py (5)
ami/ml/schemas.py (3)
  • PipelineRequest (174-178)
  • PipelineResultsResponse (181-231)
  • combine_with (198-231)
ami/ml/signals.py (2)
  • get_worker_name (15-28)
  • subscribe_celeryworker_to_pipeline_queues (32-67)
ami/ml/tasks.py (1)
  • check_ml_job_status (111-149)
ami/jobs/views.py (1)
  • check_inprogress_subtasks (161-176)
ami/ml/models/pipeline.py (6)
  • save_results (955-1063)
  • save_results (1268-1269)
  • save_results_async (1271-1274)
  • save (1276-1282)
  • process_pipeline_request (164-184)
  • schedule_process_images (1254-1266)
ami/ml/models/__init__.py (1)
ami/ml/models/pipeline.py (2)
  • Pipeline (1105-1282)
  • PipelineSaveResults (945-951)
ami/ml/models/pipeline.py (3)
ami/main/models.py (15)
  • name (987-988)
  • SourceImage (1623-1871)
  • Project (225-447)
  • config (1394-1402)
  • save (319-322)
  • save (882-897)
  • save (1119-1122)
  • save (1462-1469)
  • save (1561-1564)
  • save (1829-1832)
  • save (2110-2131)
  • save (2347-2353)
  • save (2526-2531)
  • save (2814-2831)
  • save (3451-3454)
ami/ml/schemas.py (5)
  • PipelineRequest (174-178)
  • PipelineResultsResponse (181-231)
  • SourceImageRequest (116-120)
  • PipelineRequestConfigParameters (155-171)
  • DetectionRequest (138-142)
ami/jobs/models.py (4)
  • logger (1340-1349)
  • Job (1057-1355)
  • MLTaskRecord (1005-1054)
  • save (1290-1301)
ami/jobs/admin.py (2)
ami/jobs/models.py (3)
  • Job (1057-1355)
  • MLTaskRecord (1005-1054)
  • kill_task (1045-1054)
ami/main/admin.py (2)
  • AdminBase (41-58)
  • queryset (469-472)
ami/ml/signals.py (1)
ami/ml/models/pipeline.py (1)
  • Pipeline (1105-1282)
ami/jobs/migrations/0019_mltaskrecord.py (2)
ami/jobs/migrations/0020_alter_job_logs_alter_job_progress.py (1)
  • Migration (8-28)
ami/jobs/migrations/0021_remove_mltaskrecord_subtask_id_and_more.py (1)
  • Migration (6-30)
ami/jobs/tests.py (2)
ami/jobs/models.py (18)
  • Job (1057-1355)
  • JobState (32-68)
  • MLJob (328-786)
  • MLSubtaskNames (992-994)
  • MLSubtaskState (997-1002)
  • save (1290-1301)
  • logger (1340-1349)
  • run (321-325)
  • run (657-786)
  • run (794-832)
  • run (840-883)
  • run (895-919)
  • run (927-947)
  • run (955-956)
  • run (1199-1207)
  • update_job_progress (515-654)
  • MLTaskRecord (1005-1054)
  • kill_task (1045-1054)
ami/ml/tasks.py (2)
  • check_ml_job_status (111-149)
  • check_dangling_ml_jobs (153-183)
🪛 Ruff (0.14.3)
ami/ml/schemas.py

206-206: Consider [self, *others] instead of concatenation

Replace with [self, *others]

(RUF005)


210-210: Avoid specifying long messages outside the exception class

(TRY003)


214-214: Avoid specifying long messages outside the exception class

(TRY003)


218-218: Avoid specifying long messages outside the exception class

(TRY003)

ami/ml/tasks.py

137-137: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


137-137: Avoid specifying long messages outside the exception class

(TRY003)


138-138: Do not catch blind exception: Exception

(BLE001)


140-140: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


149-149: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


149-149: Create your own exception

(TRY002)

ami/jobs/migrations/0020_alter_job_logs_alter_job_progress.py

9-11: Mutable class attributes should be annotated with typing.ClassVar

(RUF012)


13-28: Mutable class attributes should be annotated with typing.ClassVar

(RUF012)

ami/jobs/migrations/0023_alter_job_last_checked_alter_mltaskrecord_status.py

8-10: Mutable class attributes should be annotated with typing.ClassVar

(RUF012)


12-33: Mutable class attributes should be annotated with typing.ClassVar

(RUF012)

ami/jobs/views.py

161-161: Unused method argument: request

(ARG002)


161-161: Unused method argument: pk

(ARG002)

ami/ml/apps.py

10-10: Unused noqa directive (non-enabled: F401)

Remove unused noqa directive

(RUF100)

ami/jobs/migrations/0021_remove_mltaskrecord_subtask_id_and_more.py

7-9: Mutable class attributes should be annotated with typing.ClassVar

(RUF012)


11-30: Mutable class attributes should be annotated with typing.ClassVar

(RUF012)

ami/jobs/migrations/0022_job_last_checked.py

7-9: Mutable class attributes should be annotated with typing.ClassVar

(RUF012)


11-17: Mutable class attributes should be annotated with typing.ClassVar

(RUF012)

ami/jobs/models.py

437-437: Create your own exception

(TRY002)


437-437: Avoid specifying long messages outside the exception class

(TRY003)


755-755: Do not catch blind exception: Exception

(BLE001)


756-756: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


1043-1043: Avoid specifying long messages outside the exception class

(TRY003)

ami/ml/models/pipeline.py

334-334: Function definition does not bind loop variable prediction_request

(B023)


335-335: Function definition does not bind loop variable task_id

(B023)


377-377: Avoid specifying long messages outside the exception class

(TRY003)

ami/jobs/admin.py

76-76: Mutable class attributes should be annotated with typing.ClassVar

(RUF012)

ami/ml/signals.py

27-27: Do not catch blind exception: Exception

(BLE001)


32-32: Unused function argument: kwargs

(ARG001)


38-38: Use is and is not for type comparisons, or isinstance() for isinstance checks

(E721)


57-57: Avoid specifying long messages outside the exception class

(TRY003)


65-65: Redundant exception object included in logging.exception call

(TRY401)


71-71: Unused function argument: sender

(ARG001)


71-71: Unused function argument: kwargs

(ARG001)


88-88: Redundant exception object included in logging.exception call

(TRY401)


92-92: Unused function argument: sender

(ARG001)


92-92: Unused function argument: kwargs

(ARG001)


99-99: Abstract raise to an inner function

(TRY301)


99-99: Avoid specifying long messages outside the exception class

(TRY003)


104-104: Redundant exception object included in logging.exception call

(TRY401)

ami/jobs/migrations/0019_mltaskrecord.py

10-13: Mutable class attributes should be annotated with typing.ClassVar

(RUF012)


15-72: Mutable class attributes should be annotated with typing.ClassVar

(RUF012)

ami/jobs/tests.py

326-326: Do not catch blind exception: Exception

(BLE001)


327-327: Use logging.exception instead of logging.error

Replace with exception

(TRY400)

ami/ml/migrations/0026_check_dangling_ml_jobs_celery_beat.py

5-5: Unused function argument: apps

(ARG001)


5-5: Unused function argument: schema_editor

(ARG001)


21-21: Unused function argument: apps

(ARG001)


21-21: Unused function argument: schema_editor

(ARG001)


27-29: Mutable class attributes should be annotated with typing.ClassVar

(RUF012)


31-33: Mutable class attributes should be annotated with typing.ClassVar

(RUF012)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: Redirect rules
  • GitHub Check: Header rules
  • GitHub Check: Pages changed
  • GitHub Check: test
🔇 Additional comments (7)
ami/main/models.py (1)

888-890: Excellent improvement for transaction safety with async tasks.

Wrapping the Celery task in transaction.on_commit() ensures the regroup_events task is only enqueued after the database transaction successfully commits. This prevents race conditions where the task could execute before the deployment data is persisted, or execute at all if the transaction rolls back.

This aligns with the broader pattern mentioned in the PR objectives of coordinating asynchronous pipeline steps with DB transactions to ensure consistency.
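
A minimal sketch of the on_commit pattern described above; regroup_events and the deployment primary-key argument are taken from the surrounding context and should be treated as assumptions rather than the exact call site.

```python
from django.db import transaction

from ami.tasks import regroup_events  # task path referenced elsewhere in this PR


def schedule_regroup(deployment_pk: int) -> None:
    # Enqueue the Celery task only after the current transaction commits, so
    # the worker never races ahead of (or outlives a rollback of) the write.
    transaction.on_commit(lambda: regroup_events.apply_async((deployment_pk,)))
```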

.envs/.ci/.django (1)

23-25: LGTM!

The Celery broker URL and RabbitMQ credentials are properly configured for the CI environment.

compose/local/django/celery/worker/start (1)

7-9: LGTM!

The Celery worker configuration correctly specifies the antenna queue and uses a descriptive worker name for discoverability.

ami/jobs/tasks.py (1)

51-56: LGTM!

The conditional logic correctly delegates status management to MLJob's custom implementation, which uses MLTaskRecord tracking and dedicated status update methods (check_inprogress_subtasks and update_job_progress).

ami/ml/apps.py (1)

9-10: LGTM!

The ready() method correctly imports signals for registration during Django app initialization, following standard Django patterns.

ami/ml/models/__init__.py (1)

2-2: LGTM!

The export of PipelineSaveResults is properly structured and aligns with the new dataclass introduced for ML pipeline result handling.

Also applies to: 10-10

config/settings/local.py (1)

91-92: LGTM!

Setting the default Celery queue to antenna aligns with the worker configuration and queue-based task routing introduced in this PR.

POSTGRES_DB=ami-ci
POSTGRES_USER=4JXkOnTAeDmDyIapSRrGEE
POSTGRES_PASSWORD=d4xojpnJU3OzPQ0apSCLP1oHR1TYvyMzAlF5KpE9HFL6MPlnbDibwI
DATABASE_URL=postgres://xekSryPnqczJXkOnTAeDmDyIapSRrGEE:iMRQjJEGflj5xojpnJU3OzPQ0apSCLP1oHR1TYvyMzAlF5KpE9HFL6MPlnbDibwI@postgres:5432/ami

coderabbitai bot (Contributor) commented:

⚠️ Potential issue | 🔴 Critical

Critical: DATABASE_URL credentials and database name don't match the individual environment variables.

The DATABASE_URL contains different credentials and database name than the individual POSTGRES_* variables:

  • User mismatch: URL has xekSryPnqczJXkOnTAeDmDyIapSRrGEE but POSTGRES_USER (line 4) is 4JXkOnTAeDmDyIapSRrGEE
  • Password mismatch: URL password differs from POSTGRES_PASSWORD (line 5)
  • Database mismatch: URL has ami but POSTGRES_DB (line 3) is ami-ci

This will cause CI connection failures when code uses DATABASE_URL instead of individual variables.

Apply this diff to fix the mismatches and use the recommended postgresql:// scheme:

-DATABASE_URL=postgres://xekSryPnqczJXkOnTAeDmDyIapSRrGEE:iMRQjJEGflj5xojpnJU3OzPQ0apSCLP1oHR1TYvyMzAlF5KpE9HFL6MPlnbDibwI@postgres:5432/ami
+DATABASE_URL=postgresql://4JXkOnTAeDmDyIapSRrGEE:d4xojpnJU3OzPQ0apSCLP1oHR1TYvyMzAlF5KpE9HFL6MPlnbDibwI@postgres:5432/ami-ci
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
DATABASE_URL=postgres://xekSryPnqczJXkOnTAeDmDyIapSRrGEE:iMRQjJEGflj5xojpnJU3OzPQ0apSCLP1oHR1TYvyMzAlF5KpE9HFL6MPlnbDibwI@postgres:5432/ami
DATABASE_URL=postgresql://4JXkOnTAeDmDyIapSRrGEE:d4xojpnJU3OzPQ0apSCLP1oHR1TYvyMzAlF5KpE9HFL6MPlnbDibwI@postgres:5432/ami-ci
🤖 Prompt for AI Agents
In .envs/.ci/.postgres around line 6, the DATABASE_URL contains mismatched user,
password, and database name compared to the POSTGRES_* variables and uses the
old scheme; update DATABASE_URL to use the postgresql:// scheme and exact values
from POSTGRES_USER, POSTGRES_PASSWORD, and POSTGRES_DB (so the username,
password, host, port and database name match the individual variables), ensuring
the URL credentials and database name are identical to lines 3–5.

Comment on lines +13 to +32
        migrations.AlterField(
            model_name="job",
            name="last_checked",
            field=models.DateTimeField(blank=True, default=datetime.datetime.now, null=True),
        ),
        migrations.AlterField(
            model_name="mltaskrecord",
            name="status",
            field=models.CharField(
                choices=[
                    ("PENDING", "PENDING"),
                    ("STARTED", "STARTED"),
                    ("SUCCESS", "SUCCESS"),
                    ("FAIL", "FAIL"),
                    ("REVOKED", "REVOKED"),
                ],
                default="STARTED",
                max_length=255,
            ),
        ),

coderabbitai bot (Contributor) commented:

⚠️ Potential issue | 🟠 Major

Use a timezone-aware default for last_checked.

DateTimeField(default=datetime.datetime.now) returns naïve datetimes when USE_TZ=True, triggering warnings and risking incorrect conversions. Please switch to django.utils.timezone.now, which returns an aware datetime.

Suggested fix:

-import datetime
-from django.db import migrations, models
+import datetime
+from django.db import migrations, models
+from django.utils import timezone
@@
-            field=models.DateTimeField(blank=True, default=datetime.datetime.now, null=True),
+            field=models.DateTimeField(blank=True, default=timezone.now, null=True),

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In ami/jobs/migrations/0023_alter_job_last_checked_alter_mltaskrecord_status.py
around lines 13 to 32, the migrations.AlterField for Job.last_checked uses
datetime.datetime.now which produces naive datetimes under USE_TZ=True; replace
the default with django.utils.timezone.now to return timezone-aware datetimes,
and update the import at the top of the migration file to import timezone.now
(or import from django.utils import timezone and use timezone.now) so the
migration default is timezone-aware.

Comment on lines +167 to +176
        assert job.job_type_key == MLJob.key, f"{job} is not an ML job."
        has_inprogress_tasks = job.check_inprogress_subtasks()
        if has_inprogress_tasks:
            # Schedule task to update the job status
            from django.db import transaction

            from ami.ml.tasks import check_ml_job_status

            transaction.on_commit(lambda: check_ml_job_status.apply_async((job.pk,)))
        return Response({"inprogress_subtasks": has_inprogress_tasks})

coderabbitai bot (Contributor) commented:

⚠️ Potential issue | 🔴 Critical

Fix inverted rescheduling logic for ML subtasks.

Job.check_inprogress_subtasks() returns True when all subtasks have finished and False while work is still running (see the docstring in ami/jobs/models.py lines 1189-1196). Capturing that as has_inprogress_tasks flips the meaning, so we only reschedule the polling task once everything is done and we tell the client there are in-progress subtasks when there are not. Flip the condition and surface the correct boolean so the monitoring loop keeps running.

-        has_inprogress_tasks = job.check_inprogress_subtasks()
-        if has_inprogress_tasks:
+        subtasks_complete = job.check_inprogress_subtasks()
+        inprogress_subtasks = subtasks_complete is False
+        if inprogress_subtasks:
             # Schedule task to update the job status
             from django.db import transaction

             from ami.ml.tasks import check_ml_job_status

             transaction.on_commit(lambda: check_ml_job_status.apply_async((job.pk,)))
-        return Response({"inprogress_subtasks": has_inprogress_tasks})
+        return Response({"inprogress_subtasks": inprogress_subtasks})
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
assert job.job_type_key == MLJob.key, f"{job} is not an ML job."
has_inprogress_tasks = job.check_inprogress_subtasks()
if has_inprogress_tasks:
# Schedule task to update the job status
from django.db import transaction
from ami.ml.tasks import check_ml_job_status
transaction.on_commit(lambda: check_ml_job_status.apply_async((job.pk,)))
return Response({"inprogress_subtasks": has_inprogress_tasks})
assert job.job_type_key == MLJob.key, f"{job} is not an ML job."
subtasks_complete = job.check_inprogress_subtasks()
inprogress_subtasks = subtasks_complete is False
if inprogress_subtasks:
# Schedule task to update the job status
from django.db import transaction
from ami.ml.tasks import check_ml_job_status
transaction.on_commit(lambda: check_ml_job_status.apply_async((job.pk,)))
return Response({"inprogress_subtasks": inprogress_subtasks})
🤖 Prompt for AI Agents
In ami/jobs/views.py around lines 167-176, the code treats
Job.check_inprogress_subtasks() as "has in-progress tasks" but that method
returns True when all subtasks have finished; this inverts logic and reschedules
only when work is done. Change the variable/logic to reflect the actual meaning
(e.g., all_subtasks_finished = job.check_inprogress_subtasks()), schedule
check_ml_job_status only when NOT all_subtasks_finished (i.e., while work is
still running), and return {"inprogress_subtasks": not all_subtasks_finished} so
the client receives the correct boolean.

Comment on lines +333 to +341
            lambda: process_pipeline_request.apply_async(
                args=[prediction_request.dict(), project_id],
                task_id=task_id,
                # TODO: make ml-pipeline an environment variable (i.e. PIPELINE_QUEUE_PREFIX)?
                queue=f"ml-pipeline-{pipeline}",
                # all pipelines have their own queue beginning with "ml-pipeline-"
                # the antenna celeryworker should subscribe to all pipeline queues
            )
        )

coderabbitai bot (Contributor) commented:

⚠️ Potential issue | 🔴 Critical

Fix late-binding bug when scheduling Celery subtasks.

transaction.on_commit runs after the loop finishes, so this lambda captures prediction_request and task_id by reference. Every callback will execute with the final batch’s payload and UUID, so earlier batches never reach their queue. Bind the per-iteration values when defining the callback (e.g., via default arguments or functools.partial).

Apply this diff to lock in the values:

-        transaction.on_commit(
-            lambda: process_pipeline_request.apply_async(
-                args=[prediction_request.dict(), project_id],
-                task_id=task_id,
-                # TODO: make ml-pipeline an environment variable (i.e. PIPELINE_QUEUE_PREFIX)?
-                queue=f"ml-pipeline-{pipeline}",
-                # all pipelines have their own queue beginning with "ml-pipeline-"
-                # the antenna celeryworker should subscribe to all pipeline queues
-            )
-        )
+        transaction.on_commit(
+            lambda prediction_request=prediction_request, task_id=task_id: process_pipeline_request.apply_async(
+                args=[prediction_request.dict(), project_id],
+                task_id=task_id,
+                # TODO: make ml-pipeline an environment variable (i.e. PIPELINE_QUEUE_PREFIX)?
+                queue=f"ml-pipeline-{pipeline}",
+                # all pipelines have their own queue beginning with "ml-pipeline-"
+                # the antenna celeryworker should subscribe to all pipeline queues
+            )
+        )
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
lambda: process_pipeline_request.apply_async(
args=[prediction_request.dict(), project_id],
task_id=task_id,
# TODO: make ml-pipeline an environment variable (i.e. PIPELINE_QUEUE_PREFIX)?
queue=f"ml-pipeline-{pipeline}",
# all pipelines have their own queue beginning with "ml-pipeline-"
# the antenna celeryworker should subscribe to all pipeline queues
)
)
lambda prediction_request=prediction_request, task_id=task_id: process_pipeline_request.apply_async(
args=[prediction_request.dict(), project_id],
task_id=task_id,
# TODO: make ml-pipeline an environment variable (i.e. PIPELINE_QUEUE_PREFIX)?
queue=f"ml-pipeline-{pipeline}",
# all pipelines have their own queue beginning with "ml-pipeline-"
# the antenna celeryworker should subscribe to all pipeline queues
)
)
🧰 Tools
🪛 Ruff (0.14.3)

334-334: Function definition does not bind loop variable prediction_request

(B023)


335-335: Function definition does not bind loop variable task_id

(B023)

🤖 Prompt for AI Agents
In ami/ml/models/pipeline.py around lines 333 to 341, the lambda passed to
transaction.on_commit closes over loop variables causing a late-binding bug;
capture per-iteration values before registering the callback (e.g.,
local_prediction = prediction_request.dict(), local_task_id = task_id,
local_pipeline = pipeline) and use them as default args in the lambda or use
functools.partial so each callback calls process_pipeline_request.apply_async
with the bound args/task_id/queue (queue=f"ml-pipeline-{local_pipeline}")
instead of referencing the loop variables.

Comment on lines +21 to +28
        inspector = celery_app.control.inspect()
        active_workers = inspector.active()
        if active_workers:  # TODO: currently only works if there is one worker
            # NOTE: all antenna celery workers should have ANTENNA_CELERY_WORKER_NAME
            # in their name instead of the the default "celery"
            return next((worker for worker in active_workers.keys() if ANTENNA_CELERY_WORKER_NAME in worker), None)
    except Exception as e:
        logger.warning(f"Could not find antenna celery worker name: {e}")

coderabbitai bot (Contributor) commented:

⚠️ Potential issue | 🟠 Major

get_worker_name fails whenever the worker is idle

inspect().active() returns {} when there are no active tasks, so we return None and downstream assertions fail. As a result, pipeline_created never subscribes the worker to new queues during normal idle periods. Please discover workers via inspect().stats()/ping() (or the sender object when available) so we always obtain the hostname.

-        inspector = celery_app.control.inspect()
-        active_workers = inspector.active()
-        if active_workers:  # TODO: currently only works if there is one worker
-            # NOTE: all antenna celery workers should have ANTENNA_CELERY_WORKER_NAME
-            # in their name instead of the the default "celery"
-            return next((worker for worker in active_workers.keys() if ANTENNA_CELERY_WORKER_NAME in worker), None)
+        inspector = celery_app.control.inspect()
+        worker_sets = [
+            inspector.active() or {},
+            inspector.registered() or {},
+            inspector.stats() or {},
+        ]
+        for workers in worker_sets:
+            for worker in workers:
+                if ANTENNA_CELERY_WORKER_NAME in worker:
+                    return worker
🧰 Tools
🪛 Ruff (0.14.3)

27-27: Do not catch blind exception: Exception

(BLE001)

Comment on lines +54 to +58
    if not pipelines:
        # TODO: kinda hacky. is there a way to unify the django and celery logs
        # to more easily see which queues the worker is subscribed to?
        raise ValueError("No pipelines found; cannot subscribe to any queues.")

coderabbitai bot (Contributor) commented:

⚠️ Potential issue | 🟠 Major

Don’t raise when there are no pipelines yet

Fresh environments legitimately start with zero pipelines. Raising ValueError during worker_ready aborts the worker before it can process jobs, leaving the whole ML queue offline. Please log and return early instead of throwing.

-    if not pipelines:
-        # TODO: kinda hacky. is there a way to unify the django and celery logs
-        # to more easily see which queues the worker is subscribed to?
-        raise ValueError("No pipelines found; cannot subscribe to any queues.")
+    if not pipelines:
+        logger.info("No pipelines found; worker subscription is skipped until a pipeline is created.")
+        return True
🧰 Tools
🪛 Ruff (0.14.3)

57-57: Avoid specifying long messages outside the exception class

(TRY003)

🤖 Prompt for AI Agents
In ami/ml/signals.py around lines 54 to 58, the current worker_ready handler
raises ValueError when pipelines is empty which crashes the worker; instead, log
an informative message and return early so the worker stays up. Replace the
raise with a call to the module/Django logger (e.g. logger.info or
logger.warning) that includes context ("no pipelines configured; worker will not
subscribe to ML queues yet") and then return from the function; do not re-raise
or exit the process.

Comment on lines +29 to 33
project = pipeline.projects.first()
assert project, f"Pipeline {pipeline} must be associated with a project."

results = process_images(
pipeline=pipeline,
endpoint_url=endpoint_url,
images=images,
job_id=job_id,
)
results = process_images(pipeline=pipeline, images=images, job_id=job_id, project_id=project.pk)

coderabbitai bot (Contributor) commented:

⚠️ Potential issue | 🔴 Critical

Use the job’s project instead of pipeline.projects.first()

pipeline.projects.first() can return the wrong project (or None) when a pipeline is linked to multiple projects or was created without a ManyToMany assignment yet. That means we’ll pass the wrong project_id into process_images, so downstream service selection and config overrides run against the wrong project, breaking request routing for multi-project pipelines. Please derive the project from the ML job when job_id is present, and only fall back to an explicit argument or raise if it’s missing.

-    pipeline = Pipeline.objects.get(slug=pipeline_choice)
-    project = pipeline.projects.first()
-    assert project, f"Pipeline {pipeline} must be associated with a project."
-
-    results = process_images(pipeline=pipeline, images=images, job_id=job_id, project_id=project.pk)
+    pipeline = Pipeline.objects.get(slug=pipeline_choice)
+    project_id: int | None = None
+    if job and job.project_id:
+        project_id = job.project_id
+    else:
+        project = pipeline.projects.first()
+        if project:
+            project_id = project.pk
+    assert project_id is not None, (
+        f"Cannot determine project for pipeline {pipeline}; the job must supply a project."
+    )
+
+    results = process_images(pipeline=pipeline, images=images, job_id=job_id, project_id=project_id)

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In ami/ml/tasks.py around lines 29 to 33, avoid using pipeline.projects.first()
because it can return the wrong project or None; instead when job_id is provided
load the ML job (e.g., MLJob.objects.get(pk=job_id)) and derive the project from
job.project (or job.project_id) and pass that project's pk into process_images;
if no job_id use an explicit project argument if provided, otherwise raise an
informative error so we never silently use pipeline.projects.first().

Comment on lines 28 to +66
<<: *django
image: ami_ci_celeryworker
command: /start-celeryworker
depends_on:
- rabbitmq
# start the worker with antenna_celeryworker to ensure it's discoverable by ami.ml.signals.get_worker_name
command:
- sh
- -c
- |
python manage.py migrate &&
python -m celery -A config.celery_app worker --queues=antenna -n antenna_celeryworker@%h -l INFO
healthcheck:
# make sure DATABASE_URL is inside the ./.envs/.ci/.postgres
test: ["CMD-SHELL", "celery -A config.celery_app inspect ping -d antenna_celeryworker@$(hostname) | grep -q pong"]
interval: 10s
timeout: 50s
retries: 5
start_period: 10s

coderabbitai bot (Contributor) commented:

⚠️ Potential issue | 🟠 Major

Restore the inherited service dependencies for the Celery worker

By overriding depends_on with a bare list we drop the Postgres/Redis/MinIO dependencies that *django provided. The worker now runs python manage.py migrate without waiting for Postgres, so CI routinely fails with connection errors. Please keep the original dependency map and add RabbitMQ alongside it.

   celeryworker:
     <<: *django
-    depends_on:
-      - rabbitmq
+    depends_on:
+      postgres:
+        condition: service_started
+      redis:
+        condition: service_started
+      minio-init:
+        condition: service_started
+      rabbitmq:
+        condition: service_started
+      ml_backend:
+        condition: service_started
+      celeryworker:
+        condition: service_healthy
     # start the worker with antenna_celeryworker to ensure it's discoverable by ami.ml.signals.get_worker_name

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In docker-compose.ci.yml around lines 50 to 66, the depends_on for the Celery
worker was replaced with a bare list dropping the *django inherited dependencies
(Postgres/Redis/MinIO); restore the original dependency map by using the YAML
merge to include the *django depends_on and then append rabbitmq (e.g. use a
depends_on mapping that merges <<: *django and adds rabbitmq to the list) so the
worker waits for DB/Redis/MinIO plus RabbitMQ before running migrations.

# TODO: Error logs will have many tracebacks
# could add some processing to provide a concise error summary
job.logger.error(f"Subtask {task_name} ({task_id}) failed: {task.traceback}")
inprogress_subtask.status = MLSubtaskState.FAIL.name

Collaborator commented:

Does this get saved?

@vanessavmac (Collaborator, author) replied:

Yup, with continue we move on to checking the next subtask. And only every 10 subtasks do we do a bulk update to the tasks.

antenna/ami/jobs/models.py

Lines 441 to 452 in bd86042

if len(inprogress_subtasks_to_update) >= 10:
    MLTaskRecord.objects.bulk_update(
        inprogress_subtasks_to_update,
        [
            "status",
            "raw_traceback",
            "raw_results",
            "num_captures",
            "num_detections",
            "num_classifications",
        ],
    )

job.save()

# Remove remaining tasks from the queue
for ml_task_record in job.ml_task_records.all():

Collaborator commented:

I think we have a Cancel action for the parent Job. Can you move or add this logic there?

@coderabbitai coderabbitai bot (Contributor) left a comment

Actionable comments posted: 1

♻️ Duplicate comments (1)
ami/ml/tasks.py (1)

29-32: Critical: Resolve the pipeline.projects.first() issue from the previous review.

Using pipeline.projects.first() can return the wrong project (or None) when a pipeline is linked to multiple projects, breaking request routing and config overrides. The previous review comment at lines 29-33 recommended deriving the project from job.project when job_id is present. This issue remains unresolved.

Please apply the suggested fix from the previous review:

     images = SourceImage.objects.filter(pk__in=image_ids)
     pipeline = Pipeline.objects.get(slug=pipeline_choice)
-    project = pipeline.projects.first()
-    assert project, f"Pipeline {pipeline} must be associated with a project."
+    project_id: int | None = None
+    if job and job.project_id:
+        project_id = job.project_id
+    else:
+        project = pipeline.projects.first()
+        if project:
+            project_id = project.pk
+    assert project_id is not None, (
+        f"Cannot determine project for pipeline {pipeline}; the job must supply a project."
+    )
 
-    results = process_images(pipeline=pipeline, images=images, job_id=job_id, project_id=project.pk)
+    results = process_images(pipeline=pipeline, images=images, job_id=job_id, project_id=project_id)
🧹 Nitpick comments (3)
ami/jobs/tests.py (2)

326-328: Use logging.exception for better error context.

When logging within an exception handler, logging.exception automatically includes the stack trace, making debugging easier.

Apply this diff:

                except Exception as e:
-                   logger.error(f"Error fetching details for task {task_id}: {e}")
+                   logger.exception(f"Error fetching details for task {task_id}: {e}")
                    details[task_id] = {"error": str(e)}

499-499: Simplify the state check.

Use a direct equality comparison instead of checking membership in a single-element list.

Apply this diff:

-        self.assertIn(async_result.state, ["REVOKED"])
+        self.assertEqual(async_result.state, "REVOKED")
ami/ml/tasks.py (1)

158-158: Remove redundant import.

datetime is already imported at the top of the file (line 1), so this local import is unnecessary.

Apply this diff:

-    import datetime
-
     from ami.jobs.models import Job, JobState, MLJob
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between bd86042 and b552d1e.

📒 Files selected for processing (2)
  • ami/jobs/tests.py (2 hunks)
  • ami/ml/tasks.py (4 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
ami/jobs/tests.py (2)
ami/jobs/models.py (18)
  • Job (1057-1355)
  • JobState (32-68)
  • MLJob (328-786)
  • MLSubtaskNames (992-994)
  • MLSubtaskState (997-1002)
  • save (1290-1301)
  • logger (1340-1349)
  • run (321-325)
  • run (657-786)
  • run (794-832)
  • run (840-883)
  • run (895-919)
  • run (927-947)
  • run (955-956)
  • run (1199-1207)
  • update_job_progress (515-654)
  • MLTaskRecord (1005-1054)
  • kill_task (1045-1054)
ami/ml/tasks.py (2)
  • check_ml_job_status (111-149)
  • check_dangling_ml_jobs (153-188)
ami/ml/tasks.py (3)
ami/ml/models/pipeline.py (3)
  • process_images (187-285)
  • process_images (1240-1252)
  • save (1276-1282)
ami/jobs/models.py (9)
  • Job (1057-1355)
  • MLJob (328-786)
  • logger (1340-1349)
  • check_inprogress_subtasks (314-318)
  • check_inprogress_subtasks (333-512)
  • check_inprogress_subtasks (1190-1197)
  • save (1290-1301)
  • update_status (1238-1258)
  • kill_task (1045-1054)
ami/jobs/views.py (1)
  • check_inprogress_subtasks (161-176)
🪛 Ruff (0.14.3)
ami/jobs/tests.py

326-326: Do not catch blind exception: Exception

(BLE001)


327-327: Use logging.exception instead of logging.error

Replace with exception

(TRY400)

ami/ml/tasks.py

137-137: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


137-137: Avoid specifying long messages outside the exception class

(TRY003)


138-138: Do not catch blind exception: Exception

(BLE001)


140-140: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


149-149: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


149-149: Create your own exception

(TRY002)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: test
🔇 Additional comments (1)
ami/jobs/tests.py (1)

354-356: Verify that directly calling check_ml_job_status within the test loop is intentional.

Invoking check_ml_job_status(job.pk) and MLJob.update_job_progress(job) synchronously within the test polling loop bypasses the normal async callback flow. While this is acceptable for integration testing to force status updates, ensure it doesn't mask timing or concurrency issues that could occur in production.

Comment on lines +136 to +149
except Job.DoesNotExist:
raise ValueError(f"Job with ID {ml_job_id} does not exist.")
except Exception as e:
error_msg = f"Error checking status for job with ID {ml_job_id}: {e}"
job.logger.error(error_msg)
job.update_status(JobState.FAILURE)
job.finished_at = datetime.datetime.now()
job.save()

# Remove remaining tasks from the queue
for ml_task_record in job.ml_task_records.all():
ml_task_record.kill_task()

raise Exception(error_msg)

coderabbitai bot (Contributor) commented:

🛠️ Refactor suggestion | 🟠 Major

Improve exception handling and logging.

The error handling has several issues:

  1. Line 137: Use exception chaining (raise ... from e) to preserve the original context.
  2. Line 140: Use logging.exception instead of logging.error to include the stack trace automatically.
  3. Line 149: Use exception chaining when re-raising.

Apply this diff:

     except Job.DoesNotExist:
-        raise ValueError(f"Job with ID {ml_job_id} does not exist.")
+        raise ValueError(f"Job with ID {ml_job_id} does not exist.") from None
     except Exception as e:
         error_msg = f"Error checking status for job with ID {ml_job_id}: {e}"
-        job.logger.error(error_msg)
+        job.logger.exception(error_msg)
         job.update_status(JobState.FAILURE)
         job.finished_at = datetime.datetime.now()
         job.save()
 
         # Remove remaining tasks from the queue
         for ml_task_record in job.ml_task_records.all():
             ml_task_record.kill_task()
 
-        raise Exception(error_msg)
+        raise Exception(error_msg) from e
🧰 Tools
🪛 Ruff (0.14.3)

137-137: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


137-137: Avoid specifying long messages outside the exception class

(TRY003)


138-138: Do not catch blind exception: Exception

(BLE001)


140-140: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


149-149: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


149-149: Create your own exception

(TRY002)

🤖 Prompt for AI Agents
In ami/ml/tasks.py around lines 136 to 149, the exception handling should
preserve original exception context and include stack traces: replace the
job.logger.error call with job.logger.exception(error_msg) so the stack trace is
logged, change the initial generic except Exception as e to re-raise chained
exceptions where appropriate (use "raise ValueError(... ) from e" for the
Job.DoesNotExist handling if re-raising a different exception) and when
re-raising the final Exception at the end use "raise Exception(error_msg) from
e" to maintain exception chaining; keep the job status/update/task cleanup logic
but ensure any re-raises use "from e" and use logging.exception for full stack
traces.
