
Conversation

@carlosgjs
Collaborator

@carlosgjs carlosgjs commented Oct 8, 2025

Summary

Initial version of the Processing service V2.

Current state

The V2 path works but is disabled in this PR pending the compatibility work. When enabled, starting a job creates a queue for that job and populates it with one task per image. Tasks can be pulled and ACKed via the APIs below.

List of Changes

  • Add pipeline and ids_only parameters to the /jobs API. This allows workers to query for jobs for pipelines they can process, e.g. localhost:8000/api/v2/jobs?pipeline=11&ids_only=1
  • Add API for pulling tasks from a job queue: localhost:8000/api/v2/jobs/11/tasks?batch=4
  • Add API for posting results: localhost:8000/api/v2/jobs/11/result/
  • Added NATS JetStream to the Docker Compose setup. I also tried RabbitMQ and Beanstalkd, but neither supports the visibility-timeout semantics we want or a disconnected mode of pulling and ACKing tasks.
  • Added debug targets for attaching to either the Django or Celery worker containers
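
A worker's pull loop against these endpoints might look like the sketch below. The endpoint paths and query parameters come from the examples above; the polling flow, authentication, and the shape of the result payload are assumptions for illustration only.

```python
import json
import urllib.parse
import urllib.request

BASE = "http://localhost:8000/api/v2"  # local dev server from docker-compose


def jobs_url(pipeline: int, ids_only: bool = True) -> str:
    """Build the job-discovery URL for a given pipeline."""
    query = urllib.parse.urlencode({"pipeline": pipeline, "ids_only": int(ids_only)})
    return f"{BASE}/jobs?{query}"


def tasks_url(job_id: int, batch: int = 4) -> str:
    """Build the task-reservation URL for a job's queue."""
    return f"{BASE}/jobs/{job_id}/tasks?batch={batch}"


def result_url(job_id: int) -> str:
    """Build the result-submission URL for a job."""
    return f"{BASE}/jobs/{job_id}/result/"


def post_result(job_id: int, payload: dict) -> urllib.request.Request:
    """Prepare (but do not send) a POST request carrying a pipeline result."""
    data = json.dumps(payload).encode()
    return urllib.request.Request(
        result_url(job_id),
        data=data,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


if __name__ == "__main__":
    # One iteration of a hypothetical worker loop: discover jobs, reserve a
    # batch of tasks, process them, then post results.
    with urllib.request.urlopen(jobs_url(pipeline=11)) as resp:
        job_ids = json.load(resp)
    for job_id in job_ids:
        with urllib.request.urlopen(tasks_url(job_id)) as resp:
            tasks = json.load(resp)
        # ... run the pipeline on each task's image, then e.g.:
        # urllib.request.urlopen(post_result(job_id, {"detections": [...]}))
```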

TODOs:

  • A mechanism by which jobs can run using either v1 (push) or v2 (pull) processing services
  • Proper cleanup of queues after jobs complete
  • Unit tests for the new code paths

Related Issues

See issues #970 and #971.

How to Test the Changes

This path can be enabled by turning on the job.project.feature_flags.async_pipeline_workers feature flag; see ami/jobs/models.py:400:

        if job.project.feature_flags.async_pipeline_workers:
            cls.queue_images_to_nats(job, images)
        else:
            cls.process_images(job, images)

Then run the ami worker from RolnickLab/ami-data-companion#94.

Test

(screenshot of a test run omitted)

Test both modes by tweaking the flag in the Django admin console:

(screenshot omitted)

Deployment Notes

Checklist

  • I have tested these changes appropriately.
  • I have added and/or modified relevant tests.
  • I updated relevant documentation or comments.
  • I have verified that this PR follows the project's coding standards.
  • Any dependent changes have already been merged to main.

Summary by CodeRabbit

  • New Features

    • Async ML pipeline: queued task processing with NATS, per-job progress tracking, endpoints to fetch tasks and submit results, CLI to run single-image jobs, and helper to submit jobs.
  • Documentation

    • NATS dashboard in Quick Start, new ML pipeline object-model diagram, and management command README.
  • Chores

    • Expanded ignore lists, local env and debug presets, docker-compose NATS service, NATS dependency and default setting, updated debug launch configs.
  • Bug Fixes

    • Safer handling of non-dict request payloads.

@netlify

netlify bot commented Oct 8, 2025

Deploy Preview for antenna-preview canceled.

Name Link
🔨 Latest commit 3032709
🔍 Latest deploy log https://app.netlify.com/projects/antenna-preview/deploys/690a7b69e06ad4000750044a

@mihow
Collaborator

mihow commented Oct 8, 2025

Exciting!

@carlosgjs carlosgjs marked this pull request as ready for review October 24, 2025 18:49
Copilot AI review requested due to automatic review settings October 24, 2025 18:49
Contributor

Copilot AI left a comment


Pull Request Overview

This PR introduces Processing Service V2, enabling a pull-based task queue architecture using NATS JetStream instead of the push-based Celery approach. Workers can now pull tasks via HTTP endpoints, process them independently, and acknowledge completion without maintaining persistent connections.

Key changes:

  • Added NATS JetStream integration for distributed task queuing with configurable visibility timeouts
  • Introduced new REST API endpoints for task pulling (/jobs/{id}/tasks) and result submission (/jobs/{id}/result)
  • Implemented Redis-based progress tracking to handle asynchronous worker updates
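
The Redis-based progress tracking can be illustrated with a minimal in-memory analog. This is not the actual TaskStateManager from ami/ml/orchestration/task_state.py; it is a sketch of the underlying idea: keep a pending set per job plus a fixed total, and derive progress from what remains.

```python
from dataclasses import dataclass, field


@dataclass
class InMemoryTaskState:
    """Minimal in-memory analog of Redis-backed progress tracking:
    a pending set of image IDs plus a fixed total per job."""

    total: int = 0
    pending: set[str] = field(default_factory=set)

    def initialize_job(self, image_ids: list[str]) -> None:
        """Record the full set of images the job must process."""
        self.total = len(image_ids)
        self.pending = set(image_ids)

    def mark_images_processed(self, image_ids: list[str]) -> None:
        # Set difference rather than per-item remove: duplicate ACKs from
        # redelivered tasks are then harmless (idempotent updates).
        self.pending.difference_update(image_ids)

    def get_progress(self) -> float:
        """Fraction of images processed so far, in [0.0, 1.0]."""
        if self.total == 0:
            return 1.0
        return (self.total - len(self.pending)) / self.total
```

In the real implementation these operations hit Redis so that multiple asynchronous workers can update the same job's state concurrently.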

Reviewed Changes

Copilot reviewed 15 out of 16 changed files in this pull request and generated 11 comments.

Show a summary per file
File Description
requirements/base.txt Added nats-py dependency for NATS client support
object_model_diagram.md Added comprehensive Mermaid diagram documenting ML pipeline system architecture
docker-compose.yml Added NATS JetStream service with health checks and monitoring
config/settings/base.py Added NATS_URL configuration setting
ami/utils/nats_queue.py New TaskQueueManager class for NATS JetStream operations
ami/jobs/views.py Added task pulling and result submission endpoints with pipeline filtering
ami/jobs/utils.py Helper function for running async code in sync Django context
ami/jobs/tasks.py New Celery task for processing pipeline results asynchronously
ami/jobs/task_state.py TaskStateManager for Redis-based job progress tracking
ami/jobs/models.py Added queue_images_to_nats method and NATS cleanup logic
ami/base/views.py Fixed request.data handling when not a dict
README.md Added NATS dashboard documentation link
.vscode/launch.json Added debug configurations for Django and Celery containers
.envs/.local/.django Added NATS_URL environment variable
.dockerignore Expanded with comprehensive ignore patterns
Comments suppressed due to low confidence (1)

object_model_diagram.md:1

  • The comment at line 13 appears to be template text from instructions rather than actual documentation content. This namedtuple field description doesn't match the file's purpose as an object model diagram.
# Object Model Diagram: ML Pipeline System


@carlosgjs carlosgjs requested a review from mihow October 24, 2025 18:59
@coderabbitai

coderabbitai bot commented Oct 31, 2025

Note

Other AI code review bot(s) detected

CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

Walkthrough

Adds NATS JetStream-based async orchestration for ML pipelines (TaskQueueManager, JetStream publish/reserve/ack, cleanup), Redis-backed TaskStateManager, run-in-async-loop helper, job queuing/cleanup helpers, Celery result processing task, MLJob routing gated by a new feature flag, API endpoints/filters, local/settings NATS config, tooling/docs, and ignore/edit config updates.

Changes

Cohort / File(s) Summary
Orchestration core (new modules)
ami/ml/orchestration/nats_queue.py, ami/ml/orchestration/task_state.py, ami/ml/orchestration/utils.py, ami/ml/orchestration/jobs.py
Add TaskQueueManager (JetStream connection, stream/consumer ensure, publish/reserve/ack, cleanup), TaskStateManager (Redis per-job stage tracking, locks, progress), run_in_async_loop helper, and helpers to queue images and cleanup NATS resources.
Job processing: models & tasks
ami/jobs/models.py, ami/jobs/tasks.py
MLJob: add feature-flag routing to queue images to NATS or call new process_images; extract processing into process_images. Add Celery task process_pipeline_result plus helpers (log_time, _update_job_progress) to persist results, update progress, and ACK via NATS.
API, filters & views
ami/jobs/views.py, ami/utils/requests.py
Add JobFilterSet, OpenAPI params (ids_only, incomplete_only, batch) and query behaviors (ids_only, incomplete_only), endpoints to reserve tasks (GET /jobs/{id}/tasks) and queue result processing (POST /jobs/{id}/result). Note: OpenAPI params duplicated in file.
Feature flags & models
ami/main/models.py
Add boolean async_pipeline_workers to ProjectFeatureFlags.
Helpers & CLI
ami/jobs/utils.py, ami/jobs/management/commands/process_single_image.py, ami/jobs/management/commands/README.md
Add submit_single_image_job helper, new process_single_image management command with wait/polling and accompanying README.
Orchestration package init
ami/ml/orchestration/__init__.py
Remove unconditional re-export from .processing (commented note about circular import).
Base view safety
ami/base/views.py
Guard access to request.data by coercing non-dict payloads to an empty dict before reading project_id.
Dev, env & settings
.envs/.local/.django, config/settings/base.py, docker-compose.yml, requirements/base.txt
Add NATS_URL in local env and settings default; add nats service to docker-compose with JetStream/monitoring and healthcheck, add nats dependency and debug port for django; add nats-py==2.10.0 to requirements.
Repository tooling & docs
.dockerignore, .gitignore, .vscode/launch.json, README.md, object_model_diagram.md
Expand .dockerignore to include venvs, caches, build artifacts, IDE/OS cruft, Docker files; add huggingface_cache/ to .gitignore; add debug attach configs in .vscode/launch.json; document NATS dashboard in README; add object-model mermaid diagram.
Misc / small edits
ami/utils/requests.py, ami/ml/orchestration/__init__.py, other docs
Minor refactors, comments, duplicated OpenAPI block noted, and small formatting/documentation additions.

Sequence Diagram(s)

sequenceDiagram
    autonumber
    participant API as Jobs API
    participant MLJob
    participant QueueMgr as TaskQueueManager
    participant NATS
    participant State as TaskStateManager
    participant Redis
    participant Celery
    participant Worker as NATS Worker

    Note over MLJob,QueueMgr: Job submission / queuing (async flag on)
    API->>MLJob: create/submit job
    MLJob->>State: initialize_job(image_ids, stages)
    State->>Redis: store pending IDs & total

    MLJob->>QueueMgr: publish_task(job_id, image_msg) x N
    QueueMgr->>NATS: publish messages
    NATS-->>Worker: deliver task(s)

    alt Worker processes and ACKs
        Worker->>NATS: acknowledge_task(reply_subject)
        Worker->>Celery: send pipeline result
        Celery->>State: update_state(processed_ids, stage, request_id)
        State->>Redis: update pending -> compute progress
    else synchronous path (flag disabled)
        MLJob->>MLJob: process_images() (sync batches, DB updates)
    end
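
The publish/reserve/ack flow in the diagram can be sketched with nats-py roughly as follows. The stream, subject, and durable names here are illustrative assumptions, not the actual TaskQueueManager conventions, and running the async portion requires the NATS service from docker-compose.

```python
import asyncio


def stream_name(job_id: int) -> str:
    # Assumed naming convention for illustration only.
    return f"job-{job_id}"


def subject(job_id: int) -> str:
    # Assumed subject scheme for illustration only.
    return f"jobs.{job_id}.tasks"


async def publish_and_reserve(job_id: int, payloads: list[bytes]) -> None:
    """Publish tasks to a per-job JetStream stream, then pull and ACK a batch."""
    import nats  # nats-py, as pinned in requirements/base.txt

    nc = await nats.connect("nats://localhost:4222")  # docker-compose default
    try:
        js = nc.jetstream()
        await js.add_stream(name=stream_name(job_id), subjects=[subject(job_id)])
        for payload in payloads:
            await js.publish(subject(job_id), payload)
        # Durable pull consumer: unACKed messages become visible again after
        # the consumer's ack_wait (the visibility timeout / TTR).
        sub = await js.pull_subscribe(subject(job_id), durable=f"worker-{job_id}")
        for msg in await sub.fetch(batch=4, timeout=5):
            # ... process the task, then acknowledge so it is not redelivered
            await msg.ack()
    finally:
        await nc.close()


if __name__ == "__main__":
    asyncio.run(publish_and_reserve(11, [b"image-1", b"image-2"]))
```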

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

  • Pay extra attention to:
    • ami/ml/orchestration/nats_queue.py — JetStream resource lifecycle, subject/consumer naming, time-to-run and ack semantics.
    • ami/ml/orchestration/task_state.py — Redis key schema, locking correctness, TTL and concurrent updates.
    • ami/jobs/tasks.py & ami/jobs/models.py — Celery task retry/time limits, atomic progress updates, feature-flag routing and failure paths when queuing fails.
    • ami/utils/requests.py — duplicated OpenAPI parameter block to deduplicate.
    • ami/ml/orchestration/utils.py — new event-loop creation and ensuring no ORM objects are used across async boundaries.

Poem

🐇
I packed the carrots onto JetStream trails,
Redis counts each hop in tiny scales.
Workers nibble, ACKs float on the breeze,
Celery dances through background trees,
A rabbit queues and watches pipelines sail.

Pre-merge checks and finishing touches

❌ Failed checks (1 inconclusive)
Check name Status Explanation Resolution
Linked Issues check ❓ Inconclusive The PR description references related issues (#970, #971) and links them appropriately. However, the referenced issues are not explicitly closed by this PR, and the TODOs (proper queue cleanup, unit tests, v1/v2 job routing) suggest this is an incomplete feature pending follow-up work. Verify whether issues #970 and #971 should be closed or left open; clarify the relationship between this PR and future work needed for full feature completion.
✅ Passed checks (3 passed)
Check name Status Explanation
Title check ✅ Passed The title 'feat: Processing service V2' clearly and concisely describes the main feature being added—a new version of the processing service—which is the primary change across all modified files.
Description check ✅ Passed The PR description follows the template with all major sections completed: Summary, List of Changes, Related Issues, Detailed Description (How to Test), and Deployment Notes. The Checklist is partially filled. Minor gaps: 'How to Test' lacks specific automated test references, and 'Deployment Notes' is empty, but overall substance is comprehensive.
Out of Scope Changes check ✅ Passed The PR introduces significant changes to configuration files (.dockerignore, .gitignore, docker-compose.yml), debugging setup (.vscode/launch.json), and adds multiple new modules for NATS queueing, which are all within scope of adding a Processing service V2. However, some changes like adding huggingface_cache/ to .gitignore appear tangential.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 5

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ef8f16c and d254867.

📒 Files selected for processing (19)
  • .dockerignore (1 hunks)
  • .envs/.local/.django (1 hunks)
  • .gitignore (1 hunks)
  • .vscode/launch.json (1 hunks)
  • README.md (1 hunks)
  • ami/base/views.py (1 hunks)
  • ami/jobs/models.py (8 hunks)
  • ami/jobs/tasks.py (2 hunks)
  • ami/jobs/views.py (3 hunks)
  • ami/main/models.py (1 hunks)
  • ami/ml/orchestration/jobs.py (1 hunks)
  • ami/ml/orchestration/nats_queue.py (1 hunks)
  • ami/ml/orchestration/task_state.py (1 hunks)
  • ami/ml/orchestration/utils.py (1 hunks)
  • ami/utils/requests.py (2 hunks)
  • config/settings/base.py (2 hunks)
  • docker-compose.yml (4 hunks)
  • object_model_diagram.md (1 hunks)
  • requirements/base.txt (3 hunks)
🧰 Additional context used
🧬 Code graph analysis (8)
ami/ml/orchestration/nats_queue.py (1)
ami/jobs/views.py (1)
  • result (256-339)
ami/ml/orchestration/task_state.py (1)
ami/ml/orchestration/jobs.py (1)
  • cleanup (20-23)
ami/jobs/views.py (3)
ami/jobs/tasks.py (1)
  • process_pipeline_result (45-138)
ami/jobs/models.py (4)
  • Job (727-1012)
  • JobState (27-63)
  • logger (997-1006)
  • final_states (58-59)
ami/ml/orchestration/nats_queue.py (2)
  • TaskQueueManager (28-294)
  • reserve_task (152-208)
ami/jobs/tasks.py (5)
ami/ml/orchestration/nats_queue.py (2)
  • TaskQueueManager (28-294)
  • acknowledge_task (210-229)
ami/ml/orchestration/task_state.py (3)
  • TaskStateManager (17-97)
  • mark_images_processed (48-61)
  • get_progress (63-90)
ami/ml/orchestration/utils.py (1)
  • run_in_async_loop (8-18)
ami/jobs/models.py (5)
  • Job (727-1012)
  • JobState (27-63)
  • logger (997-1006)
  • update_stage (168-188)
  • save (947-958)
ami/ml/models/pipeline.py (3)
  • save (1115-1121)
  • save_results (809-917)
  • save_results (1107-1108)
ami/ml/orchestration/jobs.py (4)
ami/jobs/models.py (2)
  • Job (727-1012)
  • logger (997-1006)
ami/ml/orchestration/nats_queue.py (3)
  • TaskQueueManager (28-294)
  • cleanup_job_resources (278-294)
  • publish_task (119-150)
ami/ml/orchestration/task_state.py (3)
  • TaskStateManager (17-97)
  • cleanup (92-97)
  • initialize_job (38-46)
ami/ml/orchestration/utils.py (1)
  • run_in_async_loop (8-18)
ami/ml/orchestration/utils.py (1)
ami/jobs/models.py (1)
  • logger (997-1006)
ami/base/views.py (1)
ami/main/api/views.py (1)
  • get (1595-1651)
ami/jobs/models.py (3)
ami/ml/orchestration/jobs.py (1)
  • queue_images_to_nats (28-107)
ami/main/models.py (1)
  • SourceImage (1622-1870)
ami/ml/models/pipeline.py (2)
  • process_images (163-278)
  • process_images (1091-1105)
🪛 LanguageTool
object_model_diagram.md

[grammar] ~167-~167: Ensure spelling is correct
Context: ...ts 4. Job tracks progress through JobProgress and JobProgressStageDetail

(QB_NEW_EN_ORTHOGRAPHY_ERROR_IDS_1)

🪛 markdownlint-cli2 (0.18.1)
object_model_diagram.md

15-15: Fenced code blocks should have a language specified

(MD040, fenced-code-language)


31-31: Fenced code blocks should have a language specified

(MD040, fenced-code-language)


38-38: Bare URL used

(MD034, no-bare-urls)


39-39: Bare URL used

(MD034, no-bare-urls)


40-40: Bare URL used

(MD034, no-bare-urls)


41-41: Bare URL used

(MD034, no-bare-urls)


42-42: Bare URL used

(MD034, no-bare-urls)


42-42: Bare URL used

(MD034, no-bare-urls)


43-43: Bare URL used

(MD034, no-bare-urls)


61-61: Fenced code blocks should have a language specified

(MD040, fenced-code-language)


77-77: Bare URL used

(MD034, no-bare-urls)


97-97: Bare URL used

(MD034, no-bare-urls)


118-118: Code block style
Expected: fenced; Actual: indented

(MD046, code-block-style)


122-122: Code block style
Expected: fenced; Actual: indented

(MD046, code-block-style)


126-126: Code block style
Expected: fenced; Actual: indented

(MD046, code-block-style)


130-130: Code block style
Expected: fenced; Actual: indented

(MD046, code-block-style)

🪛 Ruff (0.14.2)
ami/ml/orchestration/nats_queue.py

70-70: Unused method argument: ttr

(ARG002)


73-73: Avoid specifying long messages outside the exception class

(TRY003)


81-81: Do not catch blind exception: Exception

(BLE001)


94-94: Avoid specifying long messages outside the exception class

(TRY003)


103-103: Do not catch blind exception: Exception

(BLE001)


132-132: Avoid specifying long messages outside the exception class

(TRY003)


146-146: Consider moving this statement to an else block

(TRY300)


148-148: Do not catch blind exception: Exception

(BLE001)


149-149: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


164-164: Avoid specifying long messages outside the exception class

(TRY003)


206-206: Do not catch blind exception: Exception

(BLE001)


207-207: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


221-221: Avoid specifying long messages outside the exception class

(TRY003)


226-226: Consider moving this statement to an else block

(TRY300)


227-227: Do not catch blind exception: Exception

(BLE001)


228-228: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


242-242: Avoid specifying long messages outside the exception class

(TRY003)


250-250: Consider moving this statement to an else block

(TRY300)


251-251: Do not catch blind exception: Exception

(BLE001)


252-252: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


266-266: Avoid specifying long messages outside the exception class

(TRY003)


273-273: Consider moving this statement to an else block

(TRY300)


274-274: Do not catch blind exception: Exception

(BLE001)


275-275: Use logging.exception instead of logging.error

Replace with exception

(TRY400)

ami/ml/orchestration/task_state.py

35-35: Unused blanket noqa directive

Remove unused noqa directive

(RUF100)


36-36: Unused blanket noqa directive

Remove unused noqa directive

(RUF100)

ami/jobs/views.py

33-43: Mutable class attributes should be annotated with typing.ClassVar

(RUF012)


73-73: Mutable class attributes should be annotated with typing.ClassVar

(RUF012)


218-218: Unused method argument: pk

(ARG002)


236-236: Avoid specifying long messages outside the exception class

(TRY003)


244-244: Loop control variable i not used within loop body

Rename unused i to _i

(B007)


283-283: Avoid specifying long messages outside the exception class

(TRY003)


288-288: Avoid specifying long messages outside the exception class

(TRY003)


298-298: Avoid specifying long messages outside the exception class

(TRY003)


301-301: Avoid specifying long messages outside the exception class

(TRY003)


322-322: Do not catch blind exception: Exception

(BLE001)


323-323: Use logging.exception instead of logging.error

Replace with exception

(TRY400)

ami/jobs/tasks.py

45-45: Unused function argument: self

(ARG001)


120-120: Do not catch blind exception: Exception

(BLE001)


121-121: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


133-133: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


136-136: Use logging.exception instead of logging.error

Replace with exception

(TRY400)

config/settings/base.py

28-28: Unused noqa directive (non-enabled: E231, E501)

Remove unused noqa directive

(RUF100)

ami/ml/orchestration/jobs.py

69-69: Loop control variable i not used within loop body

Rename unused i to _i

(B007)


78-78: Do not catch blind exception: Exception

(BLE001)

ami/ml/orchestration/utils.py

14-14: Do not catch blind exception: Exception

(BLE001)


15-15: Use logging.exception instead of logging.error

Replace with exception

(TRY400)

ami/jobs/models.py

75-75: Unused blanket noqa directive

Remove unused noqa directive

(RUF100)


430-430: Unused blanket noqa directive

Remove unused noqa directive

(RUF100)


482-482: Unused blanket noqa directive

Remove unused noqa directive

(RUF100)

Comment on lines +91 to +118
async def _ensure_consumer(self, job_id: str, ttr: int = 30):
    """Ensure consumer exists for the given job."""
    if self.js is None:
        raise RuntimeError("Connection is not open. Use TaskQueueManager as an async context manager.")

    stream_name = self._get_stream_name(job_id)
    consumer_name = self._get_consumer_name(job_id)
    subject = self._get_subject(job_id)

    try:
        info = await self.js.consumer_info(stream_name, consumer_name)
        logger.debug(f"Consumer {consumer_name} already exists: {info}")
    except Exception:
        # Consumer doesn't exist, create it
        await self.js.add_consumer(
            stream=stream_name,
            config=ConsumerConfig(
                durable_name=consumer_name,
                ack_policy=AckPolicy.EXPLICIT,
                ack_wait=ttr,  # Visibility timeout (TTR)
                max_deliver=5,  # Max retry attempts
                deliver_policy=DeliverPolicy.ALL,
                max_ack_pending=100,  # Max unacked messages
                filter_subject=subject,
            ),
        )
        logger.info(f"Created consumer {consumer_name}")



⚠️ Potential issue | 🟠 Major

Fix TTR mismatch when the consumer already exists

If a worker calls reserve_task() before anything is published, _ensure_consumer() creates the durable with the default ttr=30. Later, when publish_task() tries to use a longer ttr, the existing consumer is left untouched—JetStream does not let us mutate ack_wait on an existing durable—so the queue keeps timing out at 30 s. That results in premature redeliveries for any job that genuinely needs a longer visibility timeout.

Please make _ensure_consumer() recreate the durable whenever the currently configured ack_wait differs from the requested ttr (and keep passing the desired ttr from both call sites). One way to do it:

@@
-        try:
-            info = await self.js.consumer_info(stream_name, consumer_name)
-            logger.debug(f"Consumer {consumer_name} already exists: {info}")
-        except Exception:
-            # Consumer doesn't exist, create it
-            await self.js.add_consumer(
-                stream=stream_name,
-                config=ConsumerConfig(
-                    durable_name=consumer_name,
-                    ack_policy=AckPolicy.EXPLICIT,
-                    ack_wait=ttr,  # Visibility timeout (TTR)
-                    max_deliver=5,  # Max retry attempts
-                    deliver_policy=DeliverPolicy.ALL,
-                    max_ack_pending=100,  # Max unacked messages
-                    filter_subject=subject,
-                ),
-            )
-            logger.info(f"Created consumer {consumer_name}")
+        info = None
+        try:
+            info = await self.js.consumer_info(stream_name, consumer_name)
+            ack_wait_seconds = int(info.config.ack_wait.total_seconds())
+            if ack_wait_seconds != ttr:
+                await self.js.delete_consumer(stream_name, consumer_name)
+                info = None
+        except Exception:
+            info = None
+
+        if info is None:
+            await self.js.add_consumer(
+                stream=stream_name,
+                config=ConsumerConfig(
+                    durable_name=consumer_name,
+                    ack_policy=AckPolicy.EXPLICIT,
+                    ack_wait=ttr,  # Visibility timeout (TTR)
+                    max_deliver=5,
+                    deliver_policy=DeliverPolicy.ALL,
+                    max_ack_pending=100,
+                    filter_subject=subject,
+                ),
+            )
+            logger.info(f"Created consumer {consumer_name} with ack_wait={ttr}")
+        else:
+            logger.debug(
+                f"Consumer {consumer_name} already exists with ack_wait={ack_wait_seconds}"
+            )

This keeps the visibility timeout aligned with what the publisher expects and prevents unintended fast requeues. Let me know if you need help wiring the desired ttr through the call sites.

Also applies to: 170-173

Comment on lines +8 to +18
def run_in_async_loop(func: typing.Callable, error_msg: str) -> typing.Any:
    # helper to use new_event_loop() to ensure we're not mixing with Django's async context
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        return loop.run_until_complete(func())
    except Exception as e:
        logger.error(f"Error in async loop - {error_msg}: {e}")
        return None
    finally:
        loop.close()


⚠️ Potential issue | 🔴 Critical

Restore the previous event loop after running the coroutine.
Creating a brand-new loop and leaving it registered after loop.close() means the current thread keeps a closed loop attached; any later asyncio.get_event_loop() calls will hand back that closed instance, causing RuntimeError: Event loop is closed. Please capture the prior loop, set the new loop only for the duration of the run, and restore the original (or clear it) before closing. That keeps the thread’s loop policy healthy for the rest of the process. (docs.python.org)

 def run_in_async_loop(func: typing.Callable, error_msg: str) -> typing.Any:
-    # helper to use new_event_loop() to ensure we're not mixing with Django's async context
-    loop = asyncio.new_event_loop()
-    asyncio.set_event_loop(loop)
+    # helper to use new_event_loop() to ensure we're not mixing with Django's async context
+    loop = asyncio.new_event_loop()
+    try:
+        previous_loop = asyncio.get_event_loop()
+    except RuntimeError:
+        previous_loop = None
+    asyncio.set_event_loop(loop)
     try:
         return loop.run_until_complete(func())
     except Exception as e:
-        logger.error(f"Error in async loop - {error_msg}: {e}")
+        logger.exception("Error in async loop - %s", error_msg)
         return None
     finally:
+        if previous_loop is not None:
+            asyncio.set_event_loop(previous_loop)
+        else:
+            asyncio.set_event_loop(None)
         loop.close()

🤖 Prompt for AI Agents
In ami/ml/orchestration/utils.py around lines 8 to 18, the function creates a
new asyncio event loop and sets it as the current loop but does not restore the
previous loop before closing, which can leave a closed loop registered and later
cause RuntimeError; fix this by saving the current loop
(asyncio.get_event_loop() or None if there is no running loop), set the new loop
only for the duration of run_until_complete, and in the finally block restore
the original loop (or call asyncio.set_event_loop(None) if there was no prior
loop) before calling loop.close(), ensuring the thread’s loop reference is not
left pointing at a closed loop.

Comment on lines +27 to +29
EXTERNAL_BASE_URL = env(
    "EXTERNAL_BASE_URL", default=f"http://{EXTERNAL_HOSTNAME}"  # noqa: E231, E501 # type: ignore[no-untyped-call]
)


⚠️ Potential issue | 🟠 Major

Drop the unused # noqa.

Ruff raises RUF100 here because # noqa: E231, E501 is suppressing nothing, so the current code fails lint. Remove the directive and lay out the call cleanly instead.

-EXTERNAL_BASE_URL = env(
-    "EXTERNAL_BASE_URL", default=f"http://{EXTERNAL_HOSTNAME}"  # noqa: E231, E501 # type: ignore[no-untyped-call]
-)
+EXTERNAL_BASE_URL = env(
+    "EXTERNAL_BASE_URL",
+    default=f"http://{EXTERNAL_HOSTNAME}",
+)  # type: ignore[no-untyped-call]
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
EXTERNAL_BASE_URL = env(
    "EXTERNAL_BASE_URL", default=f"http://{EXTERNAL_HOSTNAME}"  # noqa: E231, E501 # type: ignore[no-untyped-call]
)
EXTERNAL_BASE_URL = env(
    "EXTERNAL_BASE_URL",
    default=f"http://{EXTERNAL_HOSTNAME}",
)  # type: ignore[no-untyped-call]

🤖 Prompt for AI Agents
In config/settings/base.py around lines 27 to 29, remove the unused "# noqa:
E231, E501" comment and reformat the env call to avoid line-length/lint issues;
keep the existing "# type: ignore[no-untyped-call]" if needed, place the
arguments on separate lines (or wrap the call) so the default
f"http://{EXTERNAL_HOSTNAME}" is clear and the line length is within limits, and
ensure there are no trailing noqa directives left.

carlosgjs and others added 2 commits October 31, 2025 14:33
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 4

♻️ Duplicate comments (1)
ami/jobs/models.py (1)

400-403: CRITICAL: Handle failures from queue_images_to_nats.

The return value from queue_images_to_nats is ignored. According to the function definition in ami/ml/orchestration/jobs.py, it returns False when image queueing fails. Without checking this return value, failed jobs will remain stuck in STARTED state indefinitely with no work enqueued and no visible error for operators.

This issue was previously flagged but remains unaddressed.

         if job.project.feature_flags.async_pipeline_workers:
-            queue_images_to_nats(job, images)
+            success = queue_images_to_nats(job, images)
+            if not success:
+                job.logger.error("Failed to queue images to NATS for job %s", job.pk)
+                job.progress.update_stage("collect", status=JobState.FAILURE)
+                job.update_status(JobState.FAILURE, save=False)
+                job.finished_at = datetime.datetime.now()
+                job.save()
+                return
         else:
             cls.process_images(job, images)
🧹 Nitpick comments (1)
ami/ml/orchestration/__init__.py (1)

6-6: Remove unnecessary noqa directives from commented code.

The noqa directives serve no purpose on a commented-out line and add confusion.

Apply this diff:

-# from .processing import *  # noqa: F401, F403
+# from .processing import *
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between d254867 and 1cc890e.

📒 Files selected for processing (2)
  • ami/jobs/models.py (7 hunks)
  • ami/ml/orchestration/__init__.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
ami/jobs/models.py (3)
ami/ml/orchestration/jobs.py (1)
  • queue_images_to_nats (28-107)
ami/main/models.py (1)
  • SourceImage (1622-1870)
ami/ml/models/pipeline.py (2)
  • process_images (163-278)
  • process_images (1091-1105)
🪛 Ruff (0.14.2)
ami/jobs/models.py

75-75: Unused blanket noqa directive

Remove unused noqa directive

(RUF100)


430-430: Unused blanket noqa directive

Remove unused noqa directive

(RUF100)


482-482: Unused blanket noqa directive

Remove unused noqa directive

(RUF100)

ami/ml/orchestration/__init__.py

6-6: Unused noqa directive (unused: F403; non-enabled: F401)

Remove unused noqa directive

(RUF100)

🔇 Additional comments (4)
ami/ml/orchestration/__init__.py (1)

1-6: Removal of re-export is safe and correctly resolves the circular dependency.

The codebase has only one import of processing-related code (from ami.ml.orchestration.processing import process_single_source_image in ami/main/models.py:1533), which directly accesses the .processing submodule and is unaffected by removing the re-export from __init__.py. No code imports from ami.ml.orchestration to access processing symbols. The re-export created a circular dependency chain (ami.jobs.models → ami.jobs.tasks → ami.ml.orchestration → ami.ml.orchestration.processing → ami.jobs.models), and removing it correctly breaks this cycle.

ami/jobs/models.py (3)

366-366: Good type hint improvement.

The explicit list[SourceImage] type annotation improves code clarity and type safety.


405-410: Good refactoring to separate processing logic.

Extracting the image processing workflow into a dedicated process_images classmethod improves code organization and makes it easier to maintain and test the V1 (push-based) processing path independently from the V2 (pull-based) path.


325-326: The import placement is correct and cannot be moved to module level.

This function-level import is necessary to avoid a circular dependency. ami.ml.orchestration.jobs imports from ami.jobs.models at the module level, so moving queue_images_to_nats to the top-level imports in ami.jobs.models would create a circular import cycle. The current placement inside the function is the appropriate solution.

Likely an incorrect or invalid review comment.

if stage.key == stage_key:
return stage
raise ValueError(f"Job stage with key '{stage_key}' not found in progress")
raise ValueError(f"Job stage with key '{stage_key}' not found in progress") # noqa E713

⚠️ Potential issue | 🟡 Minor

Remove unnecessary noqa directives.

The # noqa E713 comments are unnecessary. The E713 rule (test for membership should be not in) applies to code logic, not to text within string literals. Since "not found in" appears in error messages here, not as actual membership tests, these suppressions serve no purpose.

-        raise ValueError(f"Job stage with key '{stage_key}' not found in progress")  # noqa E713
+        raise ValueError(f"Job stage with key '{stage_key}' not found in progress")
-        raise ValueError(f"Job stage parameter with key '{param_key}' not found in stage '{stage_key}'")  # noqa E713
+        raise ValueError(f"Job stage parameter with key '{param_key}' not found in stage '{stage_key}'")

Also applies to: 143-143

🤖 Prompt for AI Agents
In ami/jobs/models.py around lines 136 and 143, the inline comments "# noqa
E713" are unnecessary because the E713 rule refers to membership tests and the
text "not found in" appears only inside string literals in the error messages;
remove both "# noqa E713" occurrences so the linter isn't being silenced
unnecessarily and rerun linters/tests to confirm no violations remain.
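
As a quick standalone illustration (not project code), E713 concerns how a membership test is written, which is unrelated to the words appearing inside a string literal:

```python
# E713 flags membership tests spelled `not x in y` (prefer `x not in y`).
# The phrase "not found in" inside a string literal is just text and
# never triggers the rule.
stages = ["collect", "process"]

# Both expressions are equivalent at runtime; only the style differs.
assert ("results" not in stages) == (not "results" in stages)

# Plain error-message text, not a membership test:
msg = "Job stage with key 'results' not found in progress"
assert "not found in" in msg
```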

project_id=job.project.pk,
)
job.logger.info(f"Processed image batch {i+1} in {time.time() - request_sent:.2f}s")
job.logger.info(f"Processed image batch {i+1} in {time.time() - request_sent:.2f}s") # noqa E231

⚠️ Potential issue | 🟡 Minor

Remove unnecessary noqa directives.

Both # noqa E231 comments are unnecessary. The E231 rule doesn't apply to format specifiers like :.2f or :.0%.

-                job.logger.info(f"Processed image batch {i+1} in {time.time() - request_sent:.2f}s")  # noqa E231
+                job.logger.info(f"Processed image batch {i+1} in {time.time() - request_sent:.2f}s")
-            job.logger.info(f"Processed {percent_successful:.0%} of images successfully.")  # noqa E231
+            job.logger.info(f"Processed {percent_successful:.0%} of images successfully.")

Also applies to: 482-482

🧰 Tools
🪛 Ruff (0.14.2)

430-430: Unused blanket noqa directive

Remove unused noqa directive

(RUF100)

🤖 Prompt for AI Agents
In ami/jobs/models.py around lines 430 and 482, there are unnecessary "# noqa
E231" comments on lines that contain format specifiers (e.g., f"Processed image
batch {i+1} in {time.time() - request_sent:.2f}s"); remove those "# noqa E231"
directives from both locations so the code is cleaner and linter directives are
only used when truly needed.
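
For reference, a standalone snippet showing that the colon introducing an f-string format spec is part of the format mini-language, not the punctuation E231 inspects:

```python
# The `:` inside an f-string replacement field starts a format spec;
# pycodestyle's E231 (missing whitespace after ':') does not apply here.
elapsed = 1.234
assert f"{elapsed:.2f}s" == "1.23s"
assert f"{0.875:.0%}" == "88%"
```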


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 1cc890e and 84ee5a2.

📒 Files selected for processing (1)
  • ami/jobs/models.py (7 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
ami/jobs/models.py (2)
ami/ml/orchestration/jobs.py (1)
  • queue_images_to_nats (28-107)
ami/ml/models/pipeline.py (3)
  • save (1115-1121)
  • process_images (163-278)
  • process_images (1091-1105)
🪛 Ruff (0.14.2)
ami/jobs/models.py

401-401: Expected an indented block after if statement

(invalid-syntax)

🔇 Additional comments (1)
ami/jobs/models.py (1)

413-528: Well-structured extraction of image processing logic.

The new process_images classmethod cleanly extracts the synchronous image processing workflow from the run method. The implementation includes proper error handling, progress tracking, batch-level logging, and save-task management. The existing TODO comment on line 500-501 correctly notes that sub_task.wait() is not recommended, but this is acceptable as a known temporary limitation.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

♻️ Duplicate comments (1)
ami/jobs/tasks.py (1)

80-143: Don’t mark progress until after durable save + ACK

We remove image IDs from Redis and advance the “process” stage before save_results runs and well before we confirm the JetStream ACK. If save_results throws or the ACK fails, the task retries (or JetStream redelivers), yet Redis already shows the images done and the job stage is SUCCESS. At that point the worker cannot recover accurate progress, and the retry will double-write results. Move both update_state(..., "process") and the corresponding _update_job_progress call to after a successful save_results and ACK, and make ACK failure fatal so Celery retries before we mutate Redis. Perform the same gating for the "results" stage so we never remove IDs unless the message is fully settled.

-    state_manager = TaskStateManager(job_id)
-
-    progress_info = state_manager.update_state(processed_image_ids, stage="process", request_id=self.request.id)
-    if not progress_info:
-        ...
-    try:
-        _update_job_progress(job_id, "process", progress_info.percentage)
-        ...
-        if pipeline_result:
-            job.pipeline.save_results(...)
-        ...
-        ack_success = run_in_async_loop(...)
-        if ack_success:
-            ...
-        # Update job stage with calculated progress
-        progress_info = state_manager.update_state(processed_image_ids, stage="results", request_id=self.request.id)
-        if not progress_info:
-            ...
-        _update_job_progress(job_id, "results", progress_info.percentage)
+    state_manager = TaskStateManager(job_id)
+
+    try:
+        job = Job.objects.get(pk=job_id)
+        ...
+        if pipeline_result:
+            job.pipeline.save_results(results=pipeline_result, job_id=job.pk)
+            ...
+        ack_success = run_in_async_loop(...)
+        if not ack_success:
+            raise RuntimeError(f"Failed to acknowledge task via NATS: {reply_subject}")
+
+        progress_info = state_manager.update_state(
+            processed_image_ids, stage="process", request_id=self.request.id
+        )
+        if not progress_info:
+            raise self.retry(countdown=5, max_retries=10)
+        _update_job_progress(job_id, "process", progress_info.percentage)
+
+        progress_info = state_manager.update_state(
+            processed_image_ids, stage="results", request_id=self.request.id
+        )
+        if not progress_info:
+            raise self.retry(countdown=5, max_retries=10)
+        _update_job_progress(job_id, "results", progress_info.percentage)
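
The intended ordering can be sketched with hypothetical stand-ins (save_results, ack, and mark_progress below are stubs, not the project's real signatures): persist results durably, settle the queue message, and only then touch shared progress state:

```python
# Sketch of "durable save -> ACK -> progress" ordering. If the ACK
# fails we raise before mutating shared state, so a retry or
# redelivery still sees the images as pending.

class AckFailed(RuntimeError):
    pass

def handle_result(save_results, ack, mark_progress, image_ids):
    save_results()                      # durable write first
    if not ack():                       # settle the message
        raise AckFailed("ack failed; progress left untouched for retry")
    mark_progress(image_ids)            # only now advance shared state

progress = set()

# Failure path: ack fails, progress must stay empty.
try:
    handle_result(lambda: None, lambda: False, progress.update, {"img-1"})
except AckFailed:
    pass
assert progress == set()

# Success path: ack succeeds, progress advances.
handle_result(lambda: None, lambda: True, progress.update, {"img-1"})
assert progress == {"img-1"}
```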
🧹 Nitpick comments (4)
ami/jobs/utils.py (1)

36-48: Use logging.exception in exception handlers for better diagnostics.

In exception handlers, prefer logging.exception over logging.error to automatically include the stack trace, which aids debugging.

Based on learnings

Apply this diff:

     try:
         image = SourceImage.objects.select_related("deployment__project").get(pk=image_id)
     except SourceImage.DoesNotExist:
-        logger.error(f"SourceImage with id {image_id} does not exist")
+        logger.exception(f"SourceImage with id {image_id} does not exist")
         raise

     # Fetch the pipeline and validate it exists
     try:
         pipeline = Pipeline.objects.get(pk=pipeline_id)
     except Pipeline.DoesNotExist:
-        logger.error(f"Pipeline with id {pipeline_id} does not exist")
+        logger.exception(f"Pipeline with id {pipeline_id} does not exist")
         raise
ami/jobs/management/commands/process_single_image.py (3)

51-65: Consider removing duplicate validation logic.

The validation of SourceImage and Pipeline here duplicates the validation already performed inside submit_single_image_job (lines 36-48 in ami/jobs/utils.py). While this provides earlier feedback to the user, it creates maintenance overhead if validation logic changes.

If you prefer immediate user feedback, keep this validation. Otherwise, rely on the exceptions raised by submit_single_image_job and handle them in the submission try-except block (lines 71-78).


71-78: Replace blind Exception catch with specific exceptions.

Catching Exception is too broad and may mask unexpected errors. Catch only the specific exceptions that submit_single_image_job can raise.

Apply this diff:

         try:
             job = submit_single_image_job(
                 image_id=image_id,
                 pipeline_id=pipeline_id,
                 job_name=job_name,
             )
-        except Exception as e:
-            raise CommandError(f"Failed to submit job: {str(e)}")
+        except (SourceImage.DoesNotExist, Pipeline.DoesNotExist, Project.DoesNotExist) as e:
+            raise CommandError(f"Failed to submit job: {str(e)}") from e

Note: If you keep the validation at lines 51-65, SourceImage.DoesNotExist and Pipeline.DoesNotExist won't be raised here, so you can simplify to just Project.DoesNotExist.


161-161: Remove unused noqa directive.

The noqa: E221 directive is unnecessary as E221 is not an enabled rule in your Ruff configuration.

Apply this diff:

                 for error in job.progress.errors[-5:]:  # Last 5 errors
-                    self.stdout.write(f"  - {error}")  # noqa: E221
+                    self.stdout.write(f"  - {error}")
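
For context on the logging.exception suggestions above, `logging.exception` is `logging.error` plus the active traceback; inside an `except` block that extra context comes for free:

```python
import io
import logging

buf = io.StringIO()
logger = logging.getLogger("review-demo")
logger.addHandler(logging.StreamHandler(buf))

try:
    raise ValueError("boom")
except ValueError:
    # logging.error would record only the message;
    # logging.exception appends the current traceback as well.
    logger.exception("lookup failed")

out = buf.getvalue()
assert "lookup failed" in out
assert "Traceback (most recent call last):" in out
assert "ValueError: boom" in out
```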
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 84ee5a2 and 4480b0d.

📒 Files selected for processing (7)
  • ami/jobs/management/commands/README.md (1 hunks)
  • ami/jobs/management/commands/process_single_image.py (1 hunks)
  • ami/jobs/tasks.py (3 hunks)
  • ami/jobs/utils.py (1 hunks)
  • ami/ml/orchestration/__init__.py (1 hunks)
  • ami/ml/orchestration/jobs.py (1 hunks)
  • ami/ml/orchestration/task_state.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (5)
ami/jobs/management/commands/process_single_image.py (2)
ami/jobs/utils.py (1)
  • submit_single_image_job (12-78)
ami/main/models.py (1)
  • Classification (2208-2352)
ami/ml/orchestration/task_state.py (1)
ami/ml/orchestration/jobs.py (1)
  • cleanup (20-23)
ami/jobs/utils.py (1)
ami/jobs/models.py (2)
  • Job (734-1019)
  • enqueue (816-833)
ami/jobs/tasks.py (6)
ami/jobs/views.py (1)
  • result (256-339)
ami/ml/orchestration/nats_queue.py (2)
  • TaskQueueManager (28-294)
  • acknowledge_task (210-229)
ami/ml/orchestration/task_state.py (2)
  • TaskStateManager (17-126)
  • update_state (55-84)
ami/ml/orchestration/utils.py (1)
  • run_in_async_loop (8-18)
ami/jobs/models.py (4)
  • Job (734-1019)
  • logger (1004-1013)
  • update_stage (168-188)
  • save (954-965)
ami/ml/models/pipeline.py (3)
  • save_results (809-917)
  • save_results (1107-1108)
  • save (1115-1121)
ami/ml/orchestration/jobs.py (4)
ami/jobs/models.py (5)
  • Job (734-1019)
  • JobState (27-63)
  • logger (1004-1013)
  • update_stage (168-188)
  • save (954-965)
ami/ml/orchestration/nats_queue.py (2)
  • cleanup_job_resources (278-294)
  • publish_task (119-150)
ami/ml/orchestration/task_state.py (3)
  • TaskStateManager (17-126)
  • cleanup (120-126)
  • initialize_job (38-50)
ami/ml/orchestration/utils.py (1)
  • run_in_async_loop (8-18)
🪛 Ruff (0.14.3)
ami/jobs/management/commands/process_single_image.py

44-44: Unused method argument: args

(ARG002)


58-58: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


58-58: Avoid specifying long messages outside the exception class

(TRY003)


65-65: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


65-65: Avoid specifying long messages outside the exception class

(TRY003)


77-77: Do not catch blind exception: Exception

(BLE001)


78-78: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


78-78: Avoid specifying long messages outside the exception class

(TRY003)


78-78: Use explicit conversion flag

Replace with conversion flag

(RUF010)


161-161: Unused noqa directive (non-enabled: E221)

Remove unused noqa directive

(RUF100)

ami/ml/orchestration/task_state.py

35-35: Unused blanket noqa directive

Remove unused noqa directive

(RUF100)


36-36: Unused blanket noqa directive

Remove unused noqa directive

(RUF100)


53-53: Unused blanket noqa directive

Remove unused noqa directive

(RUF100)

ami/jobs/utils.py

40-40: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


47-47: Use logging.exception instead of logging.error

Replace with exception

(TRY400)

ami/ml/orchestration/__init__.py

5-5: Unused noqa directive (unused: F403; non-enabled: F401)

Remove unused noqa directive

(RUF100)

ami/jobs/tasks.py

128-128: Do not catch blind exception: Exception

(BLE001)


129-129: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


140-140: Abstract raise to an inner function

(TRY301)


144-144: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


147-147: Use logging.exception instead of logging.error

Replace with exception

(TRY400)

ami/ml/orchestration/jobs.py

69-69: Loop control variable i not used within loop body

Rename unused i to _i

(B007)


78-78: Do not catch blind exception: Exception

(BLE001)

🔇 Additional comments (4)
ami/jobs/utils.py (1)

56-78: Well-structured job creation and enqueuing flow.

The implementation correctly generates a default job name, creates the Job with appropriate associations (project, pipeline, image), and enqueues it via Celery. Logging at key points aids observability.

ami/jobs/management/commands/process_single_image.py (3)

18-42: Well-designed CLI interface.

The argument definitions are clear and provide good defaults. The --wait flag with configurable --poll-interval offers flexible UX for both scripted and interactive use.


100-131: Excellent job polling and progress display implementation.

The polling loop efficiently tracks status changes, provides live progress updates using \r for in-place updates, and gracefully handles KeyboardInterrupt. This creates a polished user experience.


137-165: Comprehensive result reporting enhances debuggability.

The result display logic provides actionable feedback for success, failure, and unexpected statuses, including detection/classification counts and error summaries. This is particularly valuable for testing and debugging workflows.

Comment on lines +50 to +54
# Infer project from image if not provided
if project_id is None:
project = image.deployment.project
else:
project = Project.objects.get(pk=project_id)


⚠️ Potential issue | 🟡 Minor

Handle potential Project.DoesNotExist exception.

When project_id is explicitly provided, Project.objects.get(pk=project_id) can raise Project.DoesNotExist, but this exception is neither caught nor documented in the function's Raises section.

Wrap the fetch in a try-except block and document the exception:

     # Infer project from image if not provided
     if project_id is None:
         project = image.deployment.project
     else:
-        project = Project.objects.get(pk=project_id)
+        try:
+            project = Project.objects.get(pk=project_id)
+        except Project.DoesNotExist:
+            logger.exception(f"Project with id {project_id} does not exist")
+            raise

And update the docstring:

     Raises:
         SourceImage.DoesNotExist: If the image doesn't exist
         Pipeline.DoesNotExist: If the pipeline doesn't exist
+        Project.DoesNotExist: If the provided project_id doesn't exist
🤖 Prompt for AI Agents
In ami/jobs/utils.py around lines 50 to 54, the code calls
Project.objects.get(pk=project_id) without handling Project.DoesNotExist; wrap
that call in a try/except that catches Project.DoesNotExist and re-raises it
with clearer context (e.g., include project_id and a brief message) using "raise
Project.DoesNotExist(... ) from e" so stacktrace is preserved, and update the
function docstring's Raises section to list Project.DoesNotExist with the new
behavior.
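
The `raise ... from e` pattern this comment (and Ruff's B904) points at preserves the original exception as `__cause__`; a minimal standalone sketch with an illustrative CommandError:

```python
# Re-raise a domain-level error while chaining the low-level cause,
# so tracebacks show both and callers can inspect __cause__.
class CommandError(Exception):
    pass

def lookup(table: dict, key: str):
    try:
        return table[key]
    except KeyError as e:
        raise CommandError(f"Failed to submit job: {e}") from e

try:
    lookup({}, "project-42")
except CommandError as err:
    assert isinstance(err.__cause__, KeyError)
    assert "project-42" in str(err)
```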

Comment on lines +38 to +126
def initialize_job(self, image_ids: list[str], stages: list[str]) -> None:
"""
Initialize job tracking with a list of image IDs to process.
Args:
image_ids: List of image IDs that need to be processed
stages: List of stages to track for each image
"""
self.stages = stages
for stage in stages:
cache.set(self._get_pending_key(stage), image_ids, timeout=self.TIMEOUT)

cache.set(self._total_key, len(image_ids), timeout=self.TIMEOUT)

def _get_pending_key(self, stage: str) -> str:
return f"{self._pending_key}:{stage}" # noqa E231

def update_state(
self,
processed_image_ids: set[str],
stage: str,
request_id: str,
) -> None | TaskProgress:
"""
Update the task state with newly processed images.
Args:
processed_image_ids: Set of image IDs that have just been processed
"""
# Create a unique lock key for this job
lock_key = f"job:{self.job_id}:process_results_lock"
lock_timeout = 360 # 6 minutes (matches task time_limit)
lock_acquired = cache.add(lock_key, request_id, timeout=lock_timeout)
if not lock_acquired:
return None

try:
# Update progress tracking in Redis
progress_info = self._get_progress(processed_image_ids, stage)
return progress_info
finally:
# Always release the lock when done
current_lock_value = cache.get(lock_key)
# Only delete if we still own the lock (prevents race condition)
if current_lock_value == request_id:
cache.delete(lock_key)
logger.debug(f"Released lock for job {self.job_id}, task {request_id}")

def _get_progress(self, processed_image_ids: set[str], stage: str) -> TaskProgress | None:
"""
Get current progress information for the job.
Returns:
TaskProgress namedtuple with fields:
- remaining: Number of images still pending (or None if not tracked)
- total: Total number of images (or None if not tracked)
- processed: Number of images processed (or None if not tracked)
- percentage: Progress as float 0.0-1.0 (or None if not tracked)
"""
pending_images = cache.get(self._get_pending_key(stage))
total_images = cache.get(self._total_key)
if pending_images is None or total_images is None:
return None
remaining_images = [img_id for img_id in pending_images if img_id not in processed_image_ids]
assert len(pending_images) >= len(remaining_images)
cache.set(self._get_pending_key(stage), remaining_images, timeout=self.TIMEOUT)

remaining = len(remaining_images)
processed = total_images - remaining
percentage = float(processed) / total_images if total_images > 0 else 1.0
logger.info(
f"Pending images from Redis for job {self.job_id} {stage}: "
f"{remaining}/{total_images}: {percentage*100}%"
)

return TaskProgress(
remaining=remaining,
total=total_images,
processed=processed,
percentage=percentage,
)

def cleanup(self) -> None:
"""
Delete all Redis keys associated with this job.
"""
for stage in self.stages:
cache.delete(self._get_pending_key(stage))
cache.delete(self._total_key)

⚠️ Potential issue | 🟠 Major

Persist the tracked stages so cleanup() works in new processes

cleanup() assumes self.stages is populated on this instance, but initialize_job() only sets it in memory. As soon as another worker or process instantiates TaskStateManager(job_id) (the typical pattern for cleanup after a job finishes), self.stages is missing and cleanup() raises AttributeError, leaving Redis keys behind. Persist the stage list alongside the other job keys so any instance can retrieve it before cleaning up.

 class TaskStateManager:
@@
-        self._pending_key = f"job:{job_id}:pending_images"  # noqa E231
-        self._total_key = f"job:{job_id}:pending_images_total"  # noqa E231
+        self._pending_key = f"job:{job_id}:pending_images"
+        self._total_key = f"job:{job_id}:pending_images_total"
+        self._stages_key = f"job:{job_id}:pending_images_stages"
+        self.stages: list[str] = cache.get(self._stages_key, [])
@@
-        self.stages = stages
+        self.stages = stages
+        cache.set(self._stages_key, stages, timeout=self.TIMEOUT)
@@
     def cleanup(self) -> None:
@@
-        for stage in self.stages:
+        for stage in self.stages:
             cache.delete(self._get_pending_key(stage))
         cache.delete(self._total_key)
+        cache.delete(self._stages_key)

Committable suggestion skipped: line range outside the PR's diff.

🧰 Tools
🪛 Ruff (0.14.3)

53-53: Unused blanket noqa directive

Remove unused noqa directive

(RUF100)


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (1)
ami/jobs/models.py (1)

412-413: Consider adding type hints and a docstring.

The new process_images method would benefit from explicit type hints for its parameters and a docstring describing its purpose, especially since the codebase is adding type annotations (see line 366).

Example:

@classmethod
def process_images(cls, job: "Job", images: list[SourceImage]) -> None:
    """
    Process images through the ML pipeline in batches.
    
    Args:
        job: The Job instance to update with progress
        images: List of SourceImage instances to process
    """
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 4480b0d and 3032709.

📒 Files selected for processing (1)
  • ami/jobs/models.py (7 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
ami/jobs/models.py (3)
ami/ml/orchestration/jobs.py (1)
  • queue_images_to_nats (28-107)
ami/main/models.py (12)
  • SourceImage (1622-1870)
  • save (320-323)
  • save (883-896)
  • save (1118-1121)
  • save (1461-1468)
  • save (1560-1563)
  • save (1828-1831)
  • save (2109-2130)
  • save (2346-2352)
  • save (2525-2530)
  • save (2813-2830)
  • save (3450-3453)
ami/ml/models/pipeline.py (3)
  • save (1115-1121)
  • process_images (163-278)
  • process_images (1091-1105)
🪛 Ruff (0.14.3)
ami/jobs/models.py

75-75: Unused blanket noqa directive

Remove unused noqa directive

(RUF100)


437-437: Unused blanket noqa directive

Remove unused noqa directive

(RUF100)


489-489: Unused blanket noqa directive

Remove unused noqa directive

(RUF100)

🔇 Additional comments (3)
ami/jobs/models.py (3)

325-326: Good use of local import to avoid circular dependencies.

Importing queue_images_to_nats within the method scope is appropriate here, especially if it helps avoid circular import issues between the jobs and orchestration modules.


366-366: Good addition of type annotation.

The explicit type annotation list[SourceImage] improves code clarity and enables better IDE support.


400-410: LGTM: Async routing with proper failure handling.

The feature-flag-driven routing logic correctly handles the failure case when queue_images_to_nats returns False by marking the job as FAILURE, logging the error, and exiting early. This addresses the critical concern from previous reviews.

return "Waiting to start"
elif status in [JobState.STARTED, JobState.RETRY, JobState.SUCCESS]:
return f"{progress:.0%} complete"
return f"{progress:.0%} complete" # noqa E231

⚠️ Potential issue | 🟡 Minor

Remove unnecessary noqa directives.

The # noqa E231 comments on lines 75, 437, and 489 are unnecessary—the E231 rule doesn't apply to format specifiers like :.0% or :.2f. Similarly, the # noqa E713 comments on lines 136 and 143 are unnecessary—E713 refers to membership tests, and "not found in" appears only within string literals here. These have been flagged in previous reviews and by static analysis.

Apply this diff to remove the unnecessary directives:

-        return f"{progress:.0%} complete"  # noqa E231
+        return f"{progress:.0%} complete"
-        raise ValueError(f"Job stage with key '{stage_key}' not found in progress")  # noqa E713
+        raise ValueError(f"Job stage with key '{stage_key}' not found in progress")
-        raise ValueError(f"Job stage parameter with key '{param_key}' not found in stage '{stage_key}'")  # noqa E713
+        raise ValueError(f"Job stage parameter with key '{param_key}' not found in stage '{stage_key}'")
-                job.logger.info(f"Processed image batch {i+1} in {time.time() - request_sent:.2f}s")  # noqa E231
+                job.logger.info(f"Processed image batch {i+1} in {time.time() - request_sent:.2f}s")
-            job.logger.info(f"Processed {percent_successful:.0%} of images successfully.")  # noqa E231
+            job.logger.info(f"Processed {percent_successful:.0%} of images successfully.")

Also applies to: 136-136, 143-143, 437-437, 489-489

🧰 Tools
🪛 Ruff (0.14.3)

75-75: Unused blanket noqa directive

Remove unused noqa directive

(RUF100)

🤖 Prompt for AI Agents
In ami/jobs/models.py around lines 75, 136, 143, 437, and 489, there are
unnecessary "# noqa E231" and "# noqa E713" directives attached to string
formatting and string-literal content; remove those trailing noqa comments on
those specific lines so the code reads normally (e.g., drop "  # noqa E231" at
line 75 and 437 and 489, and drop "  # noqa E713" at lines 136 and 143), run
linters/tests to confirm no other violations remain, and do not add any
replacement noqa directives.
