feat: Processing service V2 #987
base: main
Conversation
✅ Deploy Preview for antenna-preview canceled.
Exciting!
Pull Request Overview
This PR introduces Processing Service V2, enabling a pull-based task queue architecture using NATS JetStream instead of the push-based Celery approach. Workers can now pull tasks via HTTP endpoints, process them independently, and acknowledge completion without maintaining persistent connections.
Key changes:
- Added NATS JetStream integration for distributed task queuing with configurable visibility timeouts
- Introduced new REST API endpoints for task pulling (`/jobs/{id}/tasks`) and result submission (`/jobs/{id}/result`)
- Implemented Redis-based progress tracking to handle asynchronous worker updates
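To make the pull-based flow concrete, here is a minimal worker loop sketched against these endpoints. The endpoint shapes come from this PR's description; the task payload fields (`reply_subject`, the response format) and the `process_image` stub are assumptions for illustration only:

```python
import requests

API_BASE = "http://localhost:8000/api/v2"  # local dev URL from the PR description
JOB_ID = 11  # example job id


def process_image(task: dict) -> dict:
    """Placeholder for the ML inference a worker would actually run."""
    return {"detections": []}


def run_worker() -> None:
    while True:
        # Pull a small batch of tasks; no persistent connection is needed.
        resp = requests.get(f"{API_BASE}/jobs/{JOB_ID}/tasks", params={"batch": 4}, timeout=30)
        resp.raise_for_status()
        tasks = resp.json()
        if not tasks:
            break  # queue drained for this job
        for task in tasks:
            result = process_image(task)
            # Submitting the result lets the server ACK the JetStream message
            # and update the Redis-backed progress for the job.
            requests.post(
                f"{API_BASE}/jobs/{JOB_ID}/result/",
                json={"reply_subject": task.get("reply_subject"), "results": result},
                timeout=60,
            ).raise_for_status()


if __name__ == "__main__":
    run_worker()
```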
Reviewed Changes
Copilot reviewed 15 out of 16 changed files in this pull request and generated 11 comments.
| File | Description |
|---|---|
| requirements/base.txt | Added nats-py dependency for NATS client support |
| object_model_diagram.md | Added comprehensive Mermaid diagram documenting ML pipeline system architecture |
| docker-compose.yml | Added NATS JetStream service with health checks and monitoring |
| config/settings/base.py | Added NATS_URL configuration setting |
| ami/utils/nats_queue.py | New TaskQueueManager class for NATS JetStream operations |
| ami/jobs/views.py | Added task pulling and result submission endpoints with pipeline filtering |
| ami/jobs/utils.py | Helper function for running async code in sync Django context |
| ami/jobs/tasks.py | New Celery task for processing pipeline results asynchronously |
| ami/jobs/task_state.py | TaskStateManager for Redis-based job progress tracking |
| ami/jobs/models.py | Added queue_images_to_nats method and NATS cleanup logic |
| ami/base/views.py | Fixed request.data handling when not a dict |
| README.md | Added NATS dashboard documentation link |
| .vscode/launch.json | Added debug configurations for Django and Celery containers |
| .envs/.local/.django | Added NATS_URL environment variable |
| .dockerignore | Expanded with comprehensive ignore patterns |
Comments suppressed due to low confidence (1)
object_model_diagram.md:1
- The comment at line 13 appears to be template text from instructions rather than actual documentation content. This namedtuple field description doesn't match the file's purpose as an object model diagram.
# Object Model Diagram: ML Pipeline System
Note: Other AI code review bot(s) detected. CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

Walkthrough

Adds NATS JetStream-based async orchestration for ML pipelines (TaskQueueManager, JetStream publish/reserve/ack, cleanup), Redis-backed TaskStateManager, run-in-async-loop helper, job queuing/cleanup helpers, Celery result processing task, MLJob routing gated by a new feature flag, API endpoints/filters, local/settings NATS config, tooling/docs, and ignore/edit config updates.

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    autonumber
    participant API as Jobs API
    participant MLJob
    participant QueueMgr as TaskQueueManager
    participant NATS
    participant State as TaskStateManager
    participant Redis
    participant Celery
    participant Worker as NATS Worker

    Note over MLJob,QueueMgr: Job submission / queuing (async flag on)
    API->>MLJob: create/submit job
    MLJob->>State: initialize_job(image_ids, stages)
    State->>Redis: store pending IDs & total
    MLJob->>QueueMgr: publish_task(job_id, image_msg) x N
    QueueMgr->>NATS: publish messages
    NATS-->>Worker: deliver task(s)
    alt Worker processes and ACKs
        Worker->>NATS: acknowledge_task(reply_subject)
        Worker->>Celery: send pipeline result
        Celery->>State: update_state(processed_ids, stage, request_id)
        State->>Redis: update pending -> compute progress
    else synchronous path (flag disabled)
        MLJob->>MLJob: process_images() (sync batches, DB updates)
    end
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
Pre-merge checks and finishing touches
❌ Failed checks (1 inconclusive)
✅ Passed checks (3 passed)
Actionable comments posted: 5
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (19)
- .dockerignore (1 hunks)
- .envs/.local/.django (1 hunks)
- .gitignore (1 hunks)
- .vscode/launch.json (1 hunks)
- README.md (1 hunks)
- ami/base/views.py (1 hunks)
- ami/jobs/models.py (8 hunks)
- ami/jobs/tasks.py (2 hunks)
- ami/jobs/views.py (3 hunks)
- ami/main/models.py (1 hunks)
- ami/ml/orchestration/jobs.py (1 hunks)
- ami/ml/orchestration/nats_queue.py (1 hunks)
- ami/ml/orchestration/task_state.py (1 hunks)
- ami/ml/orchestration/utils.py (1 hunks)
- ami/utils/requests.py (2 hunks)
- config/settings/base.py (2 hunks)
- docker-compose.yml (4 hunks)
- object_model_diagram.md (1 hunks)
- requirements/base.txt (3 hunks)
🧰 Additional context used
🧬 Code graph analysis (8)

ami/ml/orchestration/nats_queue.py (1)
- ami/jobs/views.py (1): `result` (256-339)

ami/ml/orchestration/task_state.py (1)
- ami/ml/orchestration/jobs.py (1): `cleanup` (20-23)

ami/jobs/views.py (3)
- ami/jobs/tasks.py (1): `process_pipeline_result` (45-138)
- ami/jobs/models.py (4): `Job` (727-1012), `JobState` (27-63), `logger` (997-1006), `final_states` (58-59)
- ami/ml/orchestration/nats_queue.py (2): `TaskQueueManager` (28-294), `reserve_task` (152-208)

ami/jobs/tasks.py (5)
- ami/ml/orchestration/nats_queue.py (2): `TaskQueueManager` (28-294), `acknowledge_task` (210-229)
- ami/ml/orchestration/task_state.py (3): `TaskStateManager` (17-97), `mark_images_processed` (48-61), `get_progress` (63-90)
- ami/ml/orchestration/utils.py (1): `run_in_async_loop` (8-18)
- ami/jobs/models.py (5): `Job` (727-1012), `JobState` (27-63), `logger` (997-1006), `update_stage` (168-188), `save` (947-958)
- ami/ml/models/pipeline.py (3): `save` (1115-1121), `save_results` (809-917), `save_results` (1107-1108)

ami/ml/orchestration/jobs.py (4)
- ami/jobs/models.py (2): `Job` (727-1012), `logger` (997-1006)
- ami/ml/orchestration/nats_queue.py (3): `TaskQueueManager` (28-294), `cleanup_job_resources` (278-294), `publish_task` (119-150)
- ami/ml/orchestration/task_state.py (3): `TaskStateManager` (17-97), `cleanup` (92-97), `initialize_job` (38-46)
- ami/ml/orchestration/utils.py (1): `run_in_async_loop` (8-18)

ami/ml/orchestration/utils.py (1)
- ami/jobs/models.py (1): `logger` (997-1006)

ami/base/views.py (1)
- ami/main/api/views.py (1): `get` (1595-1651)

ami/jobs/models.py (3)
- ami/ml/orchestration/jobs.py (1): `queue_images_to_nats` (28-107)
- ami/main/models.py (1): `SourceImage` (1622-1870)
- ami/ml/models/pipeline.py (2): `process_images` (163-278), `process_images` (1091-1105)
🪛 LanguageTool
object_model_diagram.md
[grammar] ~167-~167: Ensure spelling is correct
Context: ...ts 4. Job tracks progress through JobProgress and JobProgressStageDetail
(QB_NEW_EN_ORTHOGRAPHY_ERROR_IDS_1)
🪛 markdownlint-cli2 (0.18.1)
object_model_diagram.md
15-15: Fenced code blocks should have a language specified
(MD040, fenced-code-language)
31-31: Fenced code blocks should have a language specified
(MD040, fenced-code-language)
38-38: Bare URL used
(MD034, no-bare-urls)
39-39: Bare URL used
(MD034, no-bare-urls)
40-40: Bare URL used
(MD034, no-bare-urls)
41-41: Bare URL used
(MD034, no-bare-urls)
42-42: Bare URL used
(MD034, no-bare-urls)
42-42: Bare URL used
(MD034, no-bare-urls)
43-43: Bare URL used
(MD034, no-bare-urls)
61-61: Fenced code blocks should have a language specified
(MD040, fenced-code-language)
77-77: Bare URL used
(MD034, no-bare-urls)
97-97: Bare URL used
(MD034, no-bare-urls)
118-118: Code block style
Expected: fenced; Actual: indented
(MD046, code-block-style)
122-122: Code block style
Expected: fenced; Actual: indented
(MD046, code-block-style)
126-126: Code block style
Expected: fenced; Actual: indented
(MD046, code-block-style)
130-130: Code block style
Expected: fenced; Actual: indented
(MD046, code-block-style)
🪛 Ruff (0.14.2)
ami/ml/orchestration/nats_queue.py
70-70: Unused method argument: ttr
(ARG002)
73-73: Avoid specifying long messages outside the exception class
(TRY003)
81-81: Do not catch blind exception: Exception
(BLE001)
94-94: Avoid specifying long messages outside the exception class
(TRY003)
103-103: Do not catch blind exception: Exception
(BLE001)
132-132: Avoid specifying long messages outside the exception class
(TRY003)
146-146: Consider moving this statement to an else block
(TRY300)
148-148: Do not catch blind exception: Exception
(BLE001)
149-149: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
164-164: Avoid specifying long messages outside the exception class
(TRY003)
206-206: Do not catch blind exception: Exception
(BLE001)
207-207: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
221-221: Avoid specifying long messages outside the exception class
(TRY003)
226-226: Consider moving this statement to an else block
(TRY300)
227-227: Do not catch blind exception: Exception
(BLE001)
228-228: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
242-242: Avoid specifying long messages outside the exception class
(TRY003)
250-250: Consider moving this statement to an else block
(TRY300)
251-251: Do not catch blind exception: Exception
(BLE001)
252-252: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
266-266: Avoid specifying long messages outside the exception class
(TRY003)
273-273: Consider moving this statement to an else block
(TRY300)
274-274: Do not catch blind exception: Exception
(BLE001)
275-275: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
ami/ml/orchestration/task_state.py
35-35: Unused blanket noqa directive
Remove unused noqa directive
(RUF100)
36-36: Unused blanket noqa directive
Remove unused noqa directive
(RUF100)
ami/jobs/views.py
33-43: Mutable class attributes should be annotated with typing.ClassVar
(RUF012)
73-73: Mutable class attributes should be annotated with typing.ClassVar
(RUF012)
218-218: Unused method argument: pk
(ARG002)
236-236: Avoid specifying long messages outside the exception class
(TRY003)
244-244: Loop control variable i not used within loop body
Rename unused i to _i
(B007)
283-283: Avoid specifying long messages outside the exception class
(TRY003)
288-288: Avoid specifying long messages outside the exception class
(TRY003)
298-298: Avoid specifying long messages outside the exception class
(TRY003)
301-301: Avoid specifying long messages outside the exception class
(TRY003)
322-322: Do not catch blind exception: Exception
(BLE001)
323-323: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
ami/jobs/tasks.py
45-45: Unused function argument: self
(ARG001)
120-120: Do not catch blind exception: Exception
(BLE001)
121-121: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
133-133: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
136-136: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
config/settings/base.py
28-28: Unused noqa directive (non-enabled: E231, E501)
Remove unused noqa directive
(RUF100)
ami/ml/orchestration/jobs.py
69-69: Loop control variable i not used within loop body
Rename unused i to _i
(B007)
78-78: Do not catch blind exception: Exception
(BLE001)
ami/ml/orchestration/utils.py
14-14: Do not catch blind exception: Exception
(BLE001)
15-15: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
ami/jobs/models.py
75-75: Unused blanket noqa directive
Remove unused noqa directive
(RUF100)
430-430: Unused blanket noqa directive
Remove unused noqa directive
(RUF100)
482-482: Unused blanket noqa directive
Remove unused noqa directive
(RUF100)
```python
async def _ensure_consumer(self, job_id: str, ttr: int = 30):
    """Ensure consumer exists for the given job."""
    if self.js is None:
        raise RuntimeError("Connection is not open. Use TaskQueueManager as an async context manager.")

    stream_name = self._get_stream_name(job_id)
    consumer_name = self._get_consumer_name(job_id)
    subject = self._get_subject(job_id)

    try:
        info = await self.js.consumer_info(stream_name, consumer_name)
        logger.debug(f"Consumer {consumer_name} already exists: {info}")
    except Exception:
        # Consumer doesn't exist, create it
        await self.js.add_consumer(
            stream=stream_name,
            config=ConsumerConfig(
                durable_name=consumer_name,
                ack_policy=AckPolicy.EXPLICIT,
                ack_wait=ttr,  # Visibility timeout (TTR)
                max_deliver=5,  # Max retry attempts
                deliver_policy=DeliverPolicy.ALL,
                max_ack_pending=100,  # Max unacked messages
                filter_subject=subject,
            ),
        )
        logger.info(f"Created consumer {consumer_name}")
```
Fix TTR mismatch when the consumer already exists
If a worker calls reserve_task() before anything is published, _ensure_consumer() creates the durable with the default ttr=30. Later, when publish_task() tries to use a longer ttr, the existing consumer is left untouched—JetStream does not let us mutate ack_wait on an existing durable—so the queue keeps timing out at 30 s. That results in premature redeliveries for any job that genuinely needs a longer visibility timeout.
Please make _ensure_consumer() recreate the durable whenever the currently configured ack_wait differs from the requested ttr (and keep passing the desired ttr from both call sites). One way to do it:
```diff
@@
-    try:
-        info = await self.js.consumer_info(stream_name, consumer_name)
-        logger.debug(f"Consumer {consumer_name} already exists: {info}")
-    except Exception:
-        # Consumer doesn't exist, create it
-        await self.js.add_consumer(
-            stream=stream_name,
-            config=ConsumerConfig(
-                durable_name=consumer_name,
-                ack_policy=AckPolicy.EXPLICIT,
-                ack_wait=ttr,  # Visibility timeout (TTR)
-                max_deliver=5,  # Max retry attempts
-                deliver_policy=DeliverPolicy.ALL,
-                max_ack_pending=100,  # Max unacked messages
-                filter_subject=subject,
-            ),
-        )
-        logger.info(f"Created consumer {consumer_name}")
+    info = None
+    try:
+        info = await self.js.consumer_info(stream_name, consumer_name)
+        ack_wait_seconds = int(info.config.ack_wait.total_seconds())
+        if ack_wait_seconds != ttr:
+            await self.js.delete_consumer(stream_name, consumer_name)
+            info = None
+    except Exception:
+        info = None
+
+    if info is None:
+        await self.js.add_consumer(
+            stream=stream_name,
+            config=ConsumerConfig(
+                durable_name=consumer_name,
+                ack_policy=AckPolicy.EXPLICIT,
+                ack_wait=ttr,  # Visibility timeout (TTR)
+                max_deliver=5,
+                deliver_policy=DeliverPolicy.ALL,
+                max_ack_pending=100,
+                filter_subject=subject,
+            ),
+        )
+        logger.info(f"Created consumer {consumer_name} with ack_wait={ttr}")
+    else:
+        logger.debug(
+            f"Consumer {consumer_name} already exists with ack_wait={ack_wait_seconds}"
+        )
```

This keeps the visibility timeout aligned with what the publisher expects and prevents unintended fast requeues. Let me know if you need help wiring the desired ttr through the call sites.
Also applies to: 170-173
🧰 Tools
🪛 Ruff (0.14.2)
94-94: Avoid specifying long messages outside the exception class
(TRY003)
103-103: Do not catch blind exception: Exception
(BLE001)
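As background for the `ack_wait` discussion above, here is a minimal, standalone nats-py sketch of the pull-consumer pattern (a local NATS server is assumed; the stream and subject names are illustrative, not from this PR). A fetched message that is not acknowledged within `ack_wait` seconds is redelivered:

```python
import asyncio

import nats
from nats.js.api import AckPolicy, ConsumerConfig


async def main() -> None:
    nc = await nats.connect("nats://localhost:4222")
    js = nc.jetstream()
    await js.add_stream(name="demo", subjects=["demo.tasks"])
    await js.publish("demo.tasks", b"image-1")

    # Durable pull consumer with a 5-second visibility timeout (ack_wait).
    psub = await js.pull_subscribe(
        "demo.tasks",
        durable="demo-worker",
        config=ConsumerConfig(ack_policy=AckPolicy.EXPLICIT, ack_wait=5),
    )

    msgs = await psub.fetch(1, timeout=2)
    print("first delivery:", msgs[0].data)
    # No ack here: once ack_wait elapses, JetStream redelivers the message.
    await asyncio.sleep(6)

    msgs = await psub.fetch(1, timeout=2)
    await msgs[0].ack()  # explicit ACK settles the message for good
    await nc.close()


asyncio.run(main())
```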
```python
def run_in_async_loop(func: typing.Callable, error_msg: str) -> typing.Any:
    # helper to use new_event_loop() to ensure we're not mixing with Django's async context
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        return loop.run_until_complete(func())
    except Exception as e:
        logger.error(f"Error in async loop - {error_msg}: {e}")
        return None
    finally:
        loop.close()
```
Restore the previous event loop after running the coroutine.
Creating a brand-new loop and leaving it registered after loop.close() means the current thread keeps a closed loop attached; any later asyncio.get_event_loop() calls will hand back that closed instance, causing RuntimeError: Event loop is closed. Please capture the prior loop, set the new loop only for the duration of the run, and restore the original (or clear it) before closing. That keeps the thread’s loop policy healthy for the rest of the process. (docs.python.org)
```diff
 def run_in_async_loop(func: typing.Callable, error_msg: str) -> typing.Any:
-    # helper to use new_event_loop() to ensure we're not mixing with Django's async context
-    loop = asyncio.new_event_loop()
-    asyncio.set_event_loop(loop)
+    # helper to use new_event_loop() to ensure we're not mixing with Django's async context
+    loop = asyncio.new_event_loop()
+    try:
+        previous_loop = asyncio.get_event_loop()
+    except RuntimeError:
+        previous_loop = None
+    asyncio.set_event_loop(loop)
     try:
         return loop.run_until_complete(func())
     except Exception as e:
-        logger.error(f"Error in async loop - {error_msg}: {e}")
+        logger.exception("Error in async loop - %s", error_msg)
         return None
     finally:
+        if previous_loop is not None:
+            asyncio.set_event_loop(previous_loop)
+        else:
+            asyncio.set_event_loop(None)
         loop.close()
```

🧰 Tools
🪛 Ruff (0.14.2)
14-14: Do not catch blind exception: Exception
(BLE001)
15-15: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
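As a usage note, callers pass a zero-argument callable that builds the coroutine, so the coroutine object is only created inside the helper's fresh loop. A hypothetical call site (the coroutine here is a stand-in for something like `TaskQueueManager.acknowledge_task`):

```python
import asyncio


# Hypothetical coroutine standing in for a real NATS round-trip.
async def ack_task(reply_subject: str) -> bool:
    await asyncio.sleep(0)  # placeholder for the actual ACK call
    return True


# The lambda defers coroutine creation until the helper's new loop exists.
ok = run_in_async_loop(lambda: ack_task("nats.reply.subject.123"), "failed to ACK task")
```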
```python
EXTERNAL_BASE_URL = env(
    "EXTERNAL_BASE_URL", default=f"http://{EXTERNAL_HOSTNAME}"  # noqa: E231, E501 # type: ignore[no-untyped-call]
)
```
Drop the unused # noqa.
Ruff raises RUF100 here because # noqa: E231, E501 is suppressing nothing, so the current code fails lint. Remove the directive and lay out the call cleanly instead.
```diff
-EXTERNAL_BASE_URL = env(
-    "EXTERNAL_BASE_URL", default=f"http://{EXTERNAL_HOSTNAME}"  # noqa: E231, E501 # type: ignore[no-untyped-call]
-)
+EXTERNAL_BASE_URL = env(
+    "EXTERNAL_BASE_URL",
+    default=f"http://{EXTERNAL_HOSTNAME}",
+)  # type: ignore[no-untyped-call]
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
EXTERNAL_BASE_URL = env(
    "EXTERNAL_BASE_URL",
    default=f"http://{EXTERNAL_HOSTNAME}",
)  # type: ignore[no-untyped-call]
```
🧰 Tools
🪛 Ruff (0.14.2)
28-28: Unused noqa directive (non-enabled: E231, E501)
Remove unused noqa directive
(RUF100)
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
Actionable comments posted: 4
♻️ Duplicate comments (1)
ami/jobs/models.py (1)
400-403: CRITICAL: Handle failures from `queue_images_to_nats`. The return value from `queue_images_to_nats` is ignored. According to the function definition in ami/ml/orchestration/jobs.py, it returns `False` when image queueing fails. Without checking this return value, failed jobs will remain stuck in `STARTED` state indefinitely with no work enqueued and no visible error for operators.
This issue was previously flagged but remains unaddressed.

```diff
         if job.project.feature_flags.async_pipeline_workers:
-            queue_images_to_nats(job, images)
+            success = queue_images_to_nats(job, images)
+            if not success:
+                job.logger.error("Failed to queue images to NATS for job %s", job.pk)
+                job.progress.update_stage("collect", status=JobState.FAILURE)
+                job.update_status(JobState.FAILURE, save=False)
+                job.finished_at = datetime.datetime.now()
+                job.save()
+                return
         else:
             cls.process_images(job, images)
```
🧹 Nitpick comments (1)
ami/ml/orchestration/__init__.py (1)
6-6: Remove unnecessary `noqa` directives from commented code. The `noqa` directives serve no purpose on a commented-out line and add confusion.
Apply this diff:

```diff
-# from .processing import *  # noqa: F401, F403
+# from .processing import *
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
ami/jobs/models.py(7 hunks)ami/ml/orchestration/__init__.py(1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
ami/jobs/models.py (3)
- ami/ml/orchestration/jobs.py (1): `queue_images_to_nats` (28-107)
- ami/main/models.py (1): `SourceImage` (1622-1870)
- ami/ml/models/pipeline.py (2): `process_images` (163-278), `process_images` (1091-1105)
🪛 Ruff (0.14.2)
ami/jobs/models.py
75-75: Unused blanket noqa directive
Remove unused noqa directive
(RUF100)
430-430: Unused blanket noqa directive
Remove unused noqa directive
(RUF100)
482-482: Unused blanket noqa directive
Remove unused noqa directive
(RUF100)
ami/ml/orchestration/__init__.py
6-6: Unused noqa directive (unused: F403; non-enabled: F401)
Remove unused noqa directive
(RUF100)
🔇 Additional comments (4)
ami/ml/orchestration/__init__.py (1)
1-6: Removal of re-export is safe and correctly resolves the circular dependency. The codebase has only one import of processing-related code (`from ami.ml.orchestration.processing import process_single_source_image` in ami/main/models.py:1533), which directly accesses the `.processing` submodule and is unaffected by removing the re-export from `__init__.py`. No code imports from `ami.ml.orchestration` to access processing symbols. The re-export created a circular dependency chain (ami.jobs.models → ami.jobs.tasks → ami.ml.orchestration → ami.ml.orchestration.processing → ami.jobs.models), and removing it correctly breaks this cycle.

ami/jobs/models.py (3)

366-366: Good type hint improvement. The explicit `list[SourceImage]` type annotation improves code clarity and type safety.

405-410: Good refactoring to separate processing logic. Extracting the image processing workflow into a dedicated `process_images` classmethod improves code organization and makes it easier to maintain and test the V1 (push-based) processing path independently from the V2 (pull-based) path.

325-326: The import placement is correct and cannot be moved to module level. This function-level import is necessary to avoid a circular dependency. `ami.ml.orchestration.jobs` imports from `ami.jobs.models` at the module level, so moving `queue_images_to_nats` to the top-level imports in `ami.jobs.models` would create a circular import cycle. The current placement inside the function is the appropriate solution.
Likely an incorrect or invalid review comment.
```diff
         if stage.key == stage_key:
             return stage
-    raise ValueError(f"Job stage with key '{stage_key}' not found in progress")
+    raise ValueError(f"Job stage with key '{stage_key}' not found in progress")  # noqa E713
```
Remove unnecessary noqa directives.
The # noqa E713 comments are unnecessary. The E713 rule (test for membership should be not in) applies to code logic, not to text within string literals. Since "not found in" appears in error messages here, not as actual membership tests, these suppressions serve no purpose.
- raise ValueError(f"Job stage with key '{stage_key}' not found in progress") # noqa E713
+ raise ValueError(f"Job stage with key '{stage_key}' not found in progress")- raise ValueError(f"Job stage parameter with key '{param_key}' not found in stage '{stage_key}'") # noqa E713
+ raise ValueError(f"Job stage parameter with key '{param_key}' not found in stage '{stage_key}'")Also applies to: 143-143
```diff
                 project_id=job.project.pk,
             )
-            job.logger.info(f"Processed image batch {i+1} in {time.time() - request_sent:.2f}s")
+            job.logger.info(f"Processed image batch {i+1} in {time.time() - request_sent:.2f}s")  # noqa E231
```
Remove unnecessary noqa directives.
Both # noqa E231 comments are unnecessary. The E231 rule doesn't apply to format specifiers like :.2f or :.0%.
```diff
-            job.logger.info(f"Processed image batch {i+1} in {time.time() - request_sent:.2f}s")  # noqa E231
+            job.logger.info(f"Processed image batch {i+1} in {time.time() - request_sent:.2f}s")
-        job.logger.info(f"Processed {percent_successful:.0%} of images successfully.")  # noqa E231
+        job.logger.info(f"Processed {percent_successful:.0%} of images successfully.")
```

Also applies to: 482-482
🧰 Tools
🪛 Ruff (0.14.2)
430-430: Unused blanket noqa directive
Remove unused noqa directive
(RUF100)
Actionable comments posted: 1
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
ami/jobs/models.py(7 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
ami/jobs/models.py (2)
- ami/ml/orchestration/jobs.py (1): `queue_images_to_nats` (28-107)
- ami/ml/models/pipeline.py (3): `save` (1115-1121), `process_images` (163-278), `process_images` (1091-1105)
🪛 Ruff (0.14.2)
ami/jobs/models.py
401-401: Expected an indented block after if statement
(invalid-syntax)
🔇 Additional comments (1)
ami/jobs/models.py (1)
413-528: Well-structured extraction of image processing logic.The new
process_imagesclassmethod cleanly extracts the synchronous image processing workflow from therunmethod. The implementation includes proper error handling, progress tracking, batch-level logging, and save-task management. The existing TODO comment on line 500-501 correctly notes thatsub_task.wait()is not recommended, but this is acceptable as a known temporary limitation.
Actionable comments posted: 2
♻️ Duplicate comments (1)
ami/jobs/tasks.py (1)
80-143: Don't mark progress until after durable save + ACK
We remove image IDs from Redis and advance the "process" stage before `save_results` runs and well before we confirm the JetStream ACK. If `save_results` throws or the ACK fails, the task retries (or JetStream redelivers), yet Redis already shows the images done and the job stage is `SUCCESS`. At that point the worker cannot recover accurate progress, and the retry will double-write results. Move both `update_state(..., "process")` and the corresponding `_update_job_progress` call to after a successful `save_results` and ACK, and make ACK failure fatal so Celery retries before we mutate Redis. Perform the same gating for the "results" stage so we never remove IDs unless the message is fully settled.

```diff
-    state_manager = TaskStateManager(job_id)
-
-    progress_info = state_manager.update_state(processed_image_ids, stage="process", request_id=self.request.id)
-    if not progress_info:
-        ...
-    try:
-        _update_job_progress(job_id, "process", progress_info.percentage)
-        ...
-        if pipeline_result:
-            job.pipeline.save_results(...)
-        ...
-        ack_success = run_in_async_loop(...)
-        if ack_success:
-            ...
-            # Update job stage with calculated progress
-            progress_info = state_manager.update_state(processed_image_ids, stage="results", request_id=self.request.id)
-            if not progress_info:
-                ...
-            _update_job_progress(job_id, "results", progress_info.percentage)
+    state_manager = TaskStateManager(job_id)
+
+    try:
+        job = Job.objects.get(pk=job_id)
+        ...
+        if pipeline_result:
+            job.pipeline.save_results(results=pipeline_result, job_id=job.pk)
+        ...
+        ack_success = run_in_async_loop(...)
+        if not ack_success:
+            raise RuntimeError(f"Failed to acknowledge task via NATS: {reply_subject}")
+
+        progress_info = state_manager.update_state(
+            processed_image_ids, stage="process", request_id=self.request.id
+        )
+        if not progress_info:
+            raise self.retry(countdown=5, max_retries=10)
+        _update_job_progress(job_id, "process", progress_info.percentage)
+
+        progress_info = state_manager.update_state(
+            processed_image_ids, stage="results", request_id=self.request.id
+        )
+        if not progress_info:
+            raise self.retry(countdown=5, max_retries=10)
+        _update_job_progress(job_id, "results", progress_info.percentage)
```
🧹 Nitpick comments (4)
ami/jobs/utils.py (1)
36-48: Use `logging.exception` in exception handlers for better diagnostics. In exception handlers, prefer `logging.exception` over `logging.error` to automatically include the stack trace, which aids debugging.
Based on learnings
Apply this diff:

```diff
     try:
         image = SourceImage.objects.select_related("deployment__project").get(pk=image_id)
     except SourceImage.DoesNotExist:
-        logger.error(f"SourceImage with id {image_id} does not exist")
+        logger.exception(f"SourceImage with id {image_id} does not exist")
         raise

     # Fetch the pipeline and validate it exists
     try:
         pipeline = Pipeline.objects.get(pk=pipeline_id)
     except Pipeline.DoesNotExist:
-        logger.error(f"Pipeline with id {pipeline_id} does not exist")
+        logger.exception(f"Pipeline with id {pipeline_id} does not exist")
         raise
```

ami/jobs/management/commands/process_single_image.py (3)

51-65: Consider removing duplicate validation logic. The validation of `SourceImage` and `Pipeline` here duplicates the validation already performed inside `submit_single_image_job` (lines 36-48 in ami/jobs/utils.py). While this provides earlier feedback to the user, it creates maintenance overhead if validation logic changes.
If you prefer immediate user feedback, keep this validation. Otherwise, rely on the exceptions raised by `submit_single_image_job` and handle them in the submission try-except block (lines 71-78).

71-78: Replace blind `Exception` catch with specific exceptions. Catching `Exception` is too broad and may mask unexpected errors. Catch only the specific exceptions that `submit_single_image_job` can raise.
Apply this diff:

```diff
     try:
         job = submit_single_image_job(
             image_id=image_id,
             pipeline_id=pipeline_id,
             job_name=job_name,
         )
-    except Exception as e:
-        raise CommandError(f"Failed to submit job: {str(e)}")
+    except (SourceImage.DoesNotExist, Pipeline.DoesNotExist, Project.DoesNotExist) as e:
+        raise CommandError(f"Failed to submit job: {str(e)}") from e
```

Note: If you keep the validation at lines 51-65, `SourceImage.DoesNotExist` and `Pipeline.DoesNotExist` won't be raised here, so you can simplify to just `Project.DoesNotExist`.

161-161: Remove unused `noqa` directive. The `noqa: E221` directive is unnecessary as E221 is not an enabled rule in your Ruff configuration.
Apply this diff:

```diff
     for error in job.progress.errors[-5:]:  # Last 5 errors
-        self.stdout.write(f"  - {error}")  # noqa: E221
+        self.stdout.write(f"  - {error}")
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (7)
- ami/jobs/management/commands/README.md (1 hunks)
- ami/jobs/management/commands/process_single_image.py (1 hunks)
- ami/jobs/tasks.py (3 hunks)
- ami/jobs/utils.py (1 hunks)
- ami/ml/orchestration/__init__.py (1 hunks)
- ami/ml/orchestration/jobs.py (1 hunks)
- ami/ml/orchestration/task_state.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (5)
ami/jobs/management/commands/process_single_image.py (2)
- ami/jobs/utils.py (1): `submit_single_image_job` (12-78)
- ami/main/models.py (1): `Classification` (2208-2352)

ami/ml/orchestration/task_state.py (1)
- ami/ml/orchestration/jobs.py (1): `cleanup` (20-23)

ami/jobs/utils.py (1)
- ami/jobs/models.py (2): `Job` (734-1019), `enqueue` (816-833)

ami/jobs/tasks.py (6)
- ami/jobs/views.py (1): `result` (256-339)
- ami/ml/orchestration/nats_queue.py (2): `TaskQueueManager` (28-294), `acknowledge_task` (210-229)
- ami/ml/orchestration/task_state.py (2): `TaskStateManager` (17-126), `update_state` (55-84)
- ami/ml/orchestration/utils.py (1): `run_in_async_loop` (8-18)
- ami/jobs/models.py (4): `Job` (734-1019), `logger` (1004-1013), `update_stage` (168-188), `save` (954-965)
- ami/ml/models/pipeline.py (3): `save_results` (809-917), `save_results` (1107-1108), `save` (1115-1121)

ami/ml/orchestration/jobs.py (4)
- ami/jobs/models.py (5): `Job` (734-1019), `JobState` (27-63), `logger` (1004-1013), `update_stage` (168-188), `save` (954-965)
- ami/ml/orchestration/nats_queue.py (2): `cleanup_job_resources` (278-294), `publish_task` (119-150)
- ami/ml/orchestration/task_state.py (3): `TaskStateManager` (17-126), `cleanup` (120-126), `initialize_job` (38-50)
- ami/ml/orchestration/utils.py (1): `run_in_async_loop` (8-18)
🪛 Ruff (0.14.3)
ami/jobs/management/commands/process_single_image.py
44-44: Unused method argument: args
(ARG002)
58-58: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
58-58: Avoid specifying long messages outside the exception class
(TRY003)
65-65: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
65-65: Avoid specifying long messages outside the exception class
(TRY003)
77-77: Do not catch blind exception: Exception
(BLE001)
78-78: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
78-78: Avoid specifying long messages outside the exception class
(TRY003)
78-78: Use explicit conversion flag
Replace with conversion flag
(RUF010)
161-161: Unused noqa directive (non-enabled: E221)
Remove unused noqa directive
(RUF100)
ami/ml/orchestration/task_state.py
35-35: Unused blanket noqa directive
Remove unused noqa directive
(RUF100)
36-36: Unused blanket noqa directive
Remove unused noqa directive
(RUF100)
53-53: Unused blanket noqa directive
Remove unused noqa directive
(RUF100)
ami/jobs/utils.py
40-40: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
47-47: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
ami/ml/orchestration/__init__.py
5-5: Unused noqa directive (unused: F403; non-enabled: F401)
Remove unused noqa directive
(RUF100)
ami/jobs/tasks.py
128-128: Do not catch blind exception: Exception
(BLE001)
129-129: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
140-140: Abstract raise to an inner function
(TRY301)
144-144: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
147-147: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
ami/ml/orchestration/jobs.py
69-69: Loop control variable i not used within loop body
Rename unused i to _i
(B007)
78-78: Do not catch blind exception: Exception
(BLE001)
🔇 Additional comments (4)
ami/jobs/utils.py (1)
56-78: Well-structured job creation and enqueuing flow. The implementation correctly generates a default job name, creates the Job with appropriate associations (project, pipeline, image), and enqueues it via Celery. Logging at key points aids observability.

ami/jobs/management/commands/process_single_image.py (3)

18-42: Well-designed CLI interface. The argument definitions are clear and provide good defaults. The `--wait` flag with configurable `--poll-interval` offers flexible UX for both scripted and interactive use.

100-131: Excellent job polling and progress display implementation. The polling loop efficiently tracks status changes, provides live progress updates using `\r` for in-place updates, and gracefully handles `KeyboardInterrupt`. This creates a polished user experience.

137-165: Comprehensive result reporting enhances debuggability. The result display logic provides actionable feedback for success, failure, and unexpected statuses, including detection/classification counts and error summaries. This is particularly valuable for testing and debugging workflows.
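For reference, the same command can also be driven from tests or scripts via Django's `call_command`; this invocation is hypothetical, with positional and option names inferred from the CLI described above:

```python
from django.core.management import call_command

# Hypothetical invocation of the management command reviewed above;
# argument names are inferred from its described interface.
call_command(
    "process_single_image",
    "123",  # image id
    "11",   # pipeline id
    wait=True,
    poll_interval=2,
)
```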
```python
    # Infer project from image if not provided
    if project_id is None:
        project = image.deployment.project
    else:
        project = Project.objects.get(pk=project_id)
```
Handle potential Project.DoesNotExist exception.
When project_id is explicitly provided, Project.objects.get(pk=project_id) can raise Project.DoesNotExist, but this exception is neither caught nor documented in the function's Raises section.
Wrap the fetch in a try-except block and document the exception:
```diff
 # Infer project from image if not provided
 if project_id is None:
     project = image.deployment.project
 else:
-    project = Project.objects.get(pk=project_id)
+    try:
+        project = Project.objects.get(pk=project_id)
+    except Project.DoesNotExist:
+        logger.exception(f"Project with id {project_id} does not exist")
+        raise
```

And update the docstring:

```diff
 Raises:
     SourceImage.DoesNotExist: If the image doesn't exist
     Pipeline.DoesNotExist: If the pipeline doesn't exist
+    Project.DoesNotExist: If the provided project_id doesn't exist
```
```python
    def initialize_job(self, image_ids: list[str], stages: list[str]) -> None:
        """
        Initialize job tracking with a list of image IDs to process.

        Args:
            image_ids: List of image IDs that need to be processed
            stages: List of stages to track for each image
        """
        self.stages = stages
        for stage in stages:
            cache.set(self._get_pending_key(stage), image_ids, timeout=self.TIMEOUT)

        cache.set(self._total_key, len(image_ids), timeout=self.TIMEOUT)

    def _get_pending_key(self, stage: str) -> str:
        return f"{self._pending_key}:{stage}"  # noqa E231

    def update_state(
        self,
        processed_image_ids: set[str],
        stage: str,
        request_id: str,
    ) -> None | TaskProgress:
        """
        Update the task state with newly processed images.

        Args:
            processed_image_ids: Set of image IDs that have just been processed
        """
        # Create a unique lock key for this job
        lock_key = f"job:{self.job_id}:process_results_lock"
        lock_timeout = 360  # 6 minutes (matches task time_limit)
        lock_acquired = cache.add(lock_key, request_id, timeout=lock_timeout)
        if not lock_acquired:
            return None

        try:
            # Update progress tracking in Redis
            progress_info = self._get_progress(processed_image_ids, stage)
            return progress_info
        finally:
            # Always release the lock when done
            current_lock_value = cache.get(lock_key)
            # Only delete if we still own the lock (prevents race condition)
            if current_lock_value == request_id:
                cache.delete(lock_key)
                logger.debug(f"Released lock for job {self.job_id}, task {request_id}")

    def _get_progress(self, processed_image_ids: set[str], stage: str) -> TaskProgress | None:
        """
        Get current progress information for the job.

        Returns:
            TaskProgress namedtuple with fields:
            - remaining: Number of images still pending (or None if not tracked)
            - total: Total number of images (or None if not tracked)
            - processed: Number of images processed (or None if not tracked)
            - percentage: Progress as float 0.0-1.0 (or None if not tracked)
        """
        pending_images = cache.get(self._get_pending_key(stage))
        total_images = cache.get(self._total_key)
        if pending_images is None or total_images is None:
            return None
        remaining_images = [img_id for img_id in pending_images if img_id not in processed_image_ids]
        assert len(pending_images) >= len(remaining_images)
        cache.set(self._get_pending_key(stage), remaining_images, timeout=self.TIMEOUT)

        remaining = len(remaining_images)
        processed = total_images - remaining
        percentage = float(processed) / total_images if total_images > 0 else 1.0
        logger.info(
            f"Pending images from Redis for job {self.job_id} {stage}: "
            f"{remaining}/{total_images}: {percentage*100}%"
        )

        return TaskProgress(
            remaining=remaining,
            total=total_images,
            processed=processed,
            percentage=percentage,
        )

    def cleanup(self) -> None:
        """
        Delete all Redis keys associated with this job.
        """
        for stage in self.stages:
            cache.delete(self._get_pending_key(stage))
        cache.delete(self._total_key)
```
Persist the tracked stages so cleanup() works in new processes
`cleanup()` assumes `self.stages` is populated on this instance, but `initialize_job()` only sets it in-memory. As soon as another worker or process instantiates `TaskStateManager(job_id)` (the typical pattern for cleanup after a job finishes), `self.stages` is missing and `cleanup()` raises AttributeError, leaving Redis keys behind. Persist the stage list alongside the other job keys so any instance can retrieve it before cleaning up.
```diff
 class TaskStateManager:
@@
-        self._pending_key = f"job:{job_id}:pending_images"  # noqa E231
-        self._total_key = f"job:{job_id}:pending_images_total"  # noqa E231
+        self._pending_key = f"job:{job_id}:pending_images"
+        self._total_key = f"job:{job_id}:pending_images_total"
+        self._stages_key = f"job:{job_id}:pending_images_stages"
+        self.stages: list[str] = cache.get(self._stages_key, [])
@@
-        self.stages = stages
+        self.stages = stages
+        cache.set(self._stages_key, stages, timeout=self.TIMEOUT)
@@
     def cleanup(self) -> None:
@@
-        for stage in self.stages:
+        for stage in self.stages:
             cache.delete(self._get_pending_key(stage))
         cache.delete(self._total_key)
+        cache.delete(self._stages_key)
```

Committable suggestion skipped: line range outside the PR's diff.
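To tie the pieces together, here is a small, hypothetical usage sketch of the class shown above (the job id, image IDs, and stage names are invented for illustration):

```python
# Hypothetical lifecycle of TaskStateManager, based on the methods shown above.
state = TaskStateManager(job_id="42")
state.initialize_job(image_ids=["img-1", "img-2", "img-3"], stages=["process", "results"])

# A Celery task reports two images done for the "process" stage; request_id
# doubles as the owner token for the cache-based lock in update_state().
progress = state.update_state({"img-1", "img-2"}, stage="process", request_id="task-abc")
if progress:
    print(f"{progress.processed}/{progress.total} processed ({progress.percentage:.0%})")

# Once the job reaches a final state, drop all tracking keys from Redis.
state.cleanup()
```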
🧰 Tools
🪛 Ruff (0.14.3)
53-53: Unused blanket noqa directive
Remove unused noqa directive
(RUF100)
Actionable comments posted: 1
🧹 Nitpick comments (1)
ami/jobs/models.py (1)
412-413: Consider adding type hints and a docstring. The new `process_images` method would benefit from explicit type hints for its parameters and a docstring describing its purpose, especially since the codebase is adding type annotations (see line 366).
Example:

```python
@classmethod
def process_images(cls, job: "Job", images: list[SourceImage]) -> None:
    """
    Process images through the ML pipeline in batches.

    Args:
        job: The Job instance to update with progress
        images: List of SourceImage instances to process
    """
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
- ami/jobs/models.py (7 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
ami/jobs/models.py (3)
- ami/ml/orchestration/jobs.py (1): `queue_images_to_nats` (28-107)
- ami/main/models.py (12): `SourceImage` (1622-1870), `save` (320-323), `save` (883-896), `save` (1118-1121), `save` (1461-1468), `save` (1560-1563), `save` (1828-1831), `save` (2109-2130), `save` (2346-2352), `save` (2525-2530), `save` (2813-2830), `save` (3450-3453)
- ami/ml/models/pipeline.py (3): `save` (1115-1121), `process_images` (163-278), `process_images` (1091-1105)
🪛 Ruff (0.14.3)
ami/jobs/models.py
75-75: Unused blanket noqa directive
Remove unused noqa directive
(RUF100)
437-437: Unused blanket noqa directive
Remove unused noqa directive
(RUF100)
489-489: Unused blanket noqa directive
Remove unused noqa directive
(RUF100)
🔇 Additional comments (3)
ami/jobs/models.py (3)
325-326: Good use of local import to avoid circular dependencies. Importing `queue_images_to_nats` within the method scope is appropriate here, especially if it helps avoid circular import issues between the jobs and orchestration modules.

366-366: Good addition of type annotation. The explicit type annotation `list[SourceImage]` improves code clarity and enables better IDE support.

400-410: LGTM: Async routing with proper failure handling. The feature-flag-driven routing logic correctly handles the failure case when `queue_images_to_nats` returns `False` by marking the job as `FAILURE`, logging the error, and exiting early. This addresses the critical concern from previous reviews.
```diff
             return "Waiting to start"
         elif status in [JobState.STARTED, JobState.RETRY, JobState.SUCCESS]:
-            return f"{progress:.0%} complete"
+            return f"{progress:.0%} complete"  # noqa E231
```
Remove unnecessary noqa directives.
The # noqa E231 comments on lines 75, 437, and 489 are unnecessary—the E231 rule doesn't apply to format specifiers like :.0% or :.2f. Similarly, the # noqa E713 comments on lines 136 and 143 are unnecessary—E713 refers to membership tests, and "not found in" appears only within string literals here. These have been flagged in previous reviews and by static analysis.
Apply this diff to remove the unnecessary directives:
```diff
-            return f"{progress:.0%} complete"  # noqa E231
+            return f"{progress:.0%} complete"
-        raise ValueError(f"Job stage with key '{stage_key}' not found in progress")  # noqa E713
+        raise ValueError(f"Job stage with key '{stage_key}' not found in progress")
-        raise ValueError(f"Job stage parameter with key '{param_key}' not found in stage '{stage_key}'")  # noqa E713
+        raise ValueError(f"Job stage parameter with key '{param_key}' not found in stage '{stage_key}'")
-            job.logger.info(f"Processed image batch {i+1} in {time.time() - request_sent:.2f}s")  # noqa E231
+            job.logger.info(f"Processed image batch {i+1} in {time.time() - request_sent:.2f}s")
-        job.logger.info(f"Processed {percent_successful:.0%} of images successfully.")  # noqa E231
+        job.logger.info(f"Processed {percent_successful:.0%} of images successfully.")
```

Also applies to: 136-136, 143-143, 437-437, 489-489
🧰 Tools
🪛 Ruff (0.14.3)
75-75: Unused blanket noqa directive
Remove unused noqa directive
(RUF100)
🤖 Prompt for AI Agents
In ami/jobs/models.py around lines 75, 136, 143, 437, and 489, there are
unnecessary "# noqa E231" and "# noqa E713" directives attached to string
formatting and string-literal content; remove those trailing noqa comments on
those specific lines so the code reads normally (e.g., drop " # noqa E231" at
line 75 and 437 and 489, and drop " # noqa E713" at lines 136 and 143), run
linters/tests to confirm no other violations remain, and do not add any
replacement noqa directives.
Summary
Initial version of the Processing service V2.
Current state
The V2 path is working but disabled in this PR pending the compatibility work. When enabled, starting a job creates a queue for that job and populates it with one task per image. The tasks can be pulled and ACKed via the APIs below.
List of Changes
- Added `pipeline` and `id_only` parameters to the `/jobs` API. This will allow workers to query for jobs for pipelines they can process, e.g. `localhost:8000/api/v2/jobs?pipeline=11&ids_only=1`
- New endpoint for pulling tasks: `localhost:8000/api/v2/jobs/11/tasks?batch=4`
- New endpoint for submitting results: `localhost:8000/api/v2/jobs/11/result/`
- Debug configurations for the `Django` or `celery worker` containers

TODOs:
Related Issues
See issues #970 and #971.
How to Test the Changes
This path can be enabled by turning on the `job.project.feature_flags.async_pipeline_workers` feature flag (see ami/jobs/models.py:400) and running the `ami worker` from RolnickLab/ami-data-companion#94.

Test
Test both modes by tweaking the flag in the Django admin console:

(screenshot: the feature flag setting in the Django admin console)
Deployment Notes
Checklist
Summary by CodeRabbit
New Features
Documentation
Chores
Bug Fixes