Skip to content
Open
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 58 additions & 6 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -1,20 +1,72 @@
.editorconfig
.gitattributes
.github
.gitignore
.gitlab-ci.yml
.idea
.pre-commit-config.yaml
.readthedocs.yml
.travis.yml
.git
ui
ami/media
backups
venv
.venv
.env
.envs
venv/
.venv/
.env/
.envs/
.envs/*
node_modules
data

# Python cache / bytecode
__pycache__/
*.py[cod]
*.pyo
*.pyd
*.pdb
*.egg-info/
*.egg
*.whl


# Django / runtime artifacts
*.log
*.pot
*.pyc
db.sqlite3
media/
# collected static files (use collectstatic inside container)
staticfiles/

# Node / UI dependencies (if using React/Vue in your UI service)
node_modules/
npm-debug.log
yarn-error.log
.pnpm-debug.log

# Docs build artifacts
/docs/_build/

# Git / VCS
.git/
.gitignore
.gitattributes
*.swp
*.swo

# IDE / editor
.vscode/
.idea/
*.iml

# OS cruft
.DS_Store
Thumbs.db

# Docker itself
.dockerignore
Dockerfile
docker-compose*.yml

# Build / dist
build/
dist/
.eggs/
3 changes: 3 additions & 0 deletions .envs/.local/.django
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ DJANGO_SUPERUSER_PASSWORD=localadmin
# Redis
REDIS_URL=redis://redis:6379/0

# NATS
NATS_URL=nats://nats:4222

# Celery / Flower
CELERY_FLOWER_USER=QSocnxapfMvzLqJXSsXtnEZqRkBtsmKT
CELERY_FLOWER_PASSWORD=BEQgmCtgyrFieKNoGTsux9YIye0I7P5Q7vEgfJD2C4jxmtHDetFaE2jhS7K7rxaf
Expand Down
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -276,3 +276,6 @@ sandbox/

# Other
flower

# huggingface cache
huggingface_cache/
24 changes: 23 additions & 1 deletion .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,29 @@
"version": "0.2.0",
"configurations": [
{
"name": "Python Debugger: Remote Attach",
"name": "Current File",
"type": "debugpy",
"request": "launch",
"program": "${file}",
"console": "integratedTerminal"
},
{
"name": "Django attach",
"type": "debugpy",
"request": "attach",
"connect": {
"host": "localhost",
"port": 5679
},
"pathMappings": [
{
"localRoot": "${workspaceFolder}",
"remoteRoot": "."
}
]
},
{
"name": "Celery worker attach",
"type": "debugpy",
"request": "attach",
"connect": {
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ docker compose -f processing_services/example/docker-compose.yml up -d
- Django admin: http://localhost:8000/admin/
- OpenAPI / Swagger documentation: http://localhost:8000/api/v2/docs/
- Minio UI: http://minio:9001, Minio service: http://minio:9000
- NATS dashboard: https://natsdashboard.com/ (Add localhost)

NOTE: If one of these services is not working properly, it may be because another process is already using the port. You can check for this with `lsof -i :<PORT_NUMBER>`.

Expand Down
5 changes: 3 additions & 2 deletions ami/base/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ def get_active_project(
# If not in URL, try query parameters
if not project_id:
# Look for project_id in GET query parameters or POST data
# POST data returns a list of ints, but QueryDict.get() returns a single value
project_id = request.query_params.get(param) or request.data.get(param)
# request.data may not always be a dict (e.g., for non-POST requests), so we check its type
post_data = request.data if isinstance(request.data, dict) else {}
project_id = request.query_params.get(param) or post_data.get(param)

project_id = SingleParamSerializer[int].clean(
param_name=param,
Expand Down
32 changes: 20 additions & 12 deletions ami/jobs/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def get_status_label(status: JobState, progress: float) -> str:
if status in [JobState.CREATED, JobState.PENDING, JobState.RECEIVED]:
return "Waiting to start"
elif status in [JobState.STARTED, JobState.RETRY, JobState.SUCCESS]:
return f"{progress:.0%} complete"
return f"{progress:.0%} complete" # noqa E231
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Remove unnecessary noqa directives.

The # noqa E231 comments on lines 75, 437, and 489 are unnecessary—the E231 rule doesn't apply to format specifiers like :.0% or :.2f. Similarly, the # noqa E713 comments on lines 136 and 143 are unnecessary—E713 refers to membership tests, and "not found in" appears only within string literals here. These have been flagged in previous reviews and by static analysis.

Apply this diff to remove the unnecessary directives:

-        return f"{progress:.0%} complete"  # noqa E231
+        return f"{progress:.0%} complete"
-        raise ValueError(f"Job stage with key '{stage_key}' not found in progress")  # noqa E713
+        raise ValueError(f"Job stage with key '{stage_key}' not found in progress")
-        raise ValueError(f"Job stage parameter with key '{param_key}' not found in stage '{stage_key}'")  # noqa E713
+        raise ValueError(f"Job stage parameter with key '{param_key}' not found in stage '{stage_key}'")
-                job.logger.info(f"Processed image batch {i+1} in {time.time() - request_sent:.2f}s")  # noqa E231
+                job.logger.info(f"Processed image batch {i+1} in {time.time() - request_sent:.2f}s")
-            job.logger.info(f"Processed {percent_successful:.0%} of images successfully.")  # noqa E231
+            job.logger.info(f"Processed {percent_successful:.0%} of images successfully.")

Also applies to: 136-136, 143-143, 437-437, 489-489

🧰 Tools
🪛 Ruff (0.14.3)

75-75: Unused blanket noqa directive

Remove unused noqa directive

(RUF100)

🤖 Prompt for AI Agents
In ami/jobs/models.py around lines 75, 136, 143, 437, and 489, there are
unnecessary "# noqa E231" and "# noqa E713" directives attached to string
formatting and string-literal content; remove those trailing noqa comments on
those specific lines so the code reads normally (e.g., drop "  # noqa E231" at
line 75 and 437 and 489, and drop "  # noqa E713" at lines 136 and 143), run
linters/tests to confirm no other violations remain, and do not add any
replacement noqa directives.

else:
return f"{status.name}"

Expand Down Expand Up @@ -133,14 +133,14 @@ def get_stage(self, stage_key: str) -> JobProgressStageDetail:
for stage in self.stages:
if stage.key == stage_key:
return stage
raise ValueError(f"Job stage with key '{stage_key}' not found in progress")
raise ValueError(f"Job stage with key '{stage_key}' not found in progress") # noqa E713
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Remove unnecessary noqa directives.

The # noqa E713 comments are unnecessary. The E713 rule (test for membership should be not in) applies to code logic, not to text within string literals. Since "not found in" appears in error messages here, not as actual membership tests, these suppressions serve no purpose.

-        raise ValueError(f"Job stage with key '{stage_key}' not found in progress")  # noqa E713
+        raise ValueError(f"Job stage with key '{stage_key}' not found in progress")
-        raise ValueError(f"Job stage parameter with key '{param_key}' not found in stage '{stage_key}'")  # noqa E713
+        raise ValueError(f"Job stage parameter with key '{param_key}' not found in stage '{stage_key}'")

Also applies to: 143-143

🤖 Prompt for AI Agents
In ami/jobs/models.py around lines 136 and 143, the inline comments "# noqa
E713" are unnecessary because the E713 rule refers to membership tests and the
text "not found in" appears only inside string literals in the error messages;
remove both "# noqa E713" occurrences so the linter isn't being silenced
unnecessarily and rerun linters/tests to confirm no violations remain.


def get_stage_param(self, stage_key: str, param_key: str) -> ConfigurableStageParam:
stage = self.get_stage(stage_key)
for param in stage.params:
if param.key == param_key:
return param
raise ValueError(f"Job stage parameter with key '{param_key}' not found in stage '{stage_key}'")
raise ValueError(f"Job stage parameter with key '{param_key}' not found in stage '{stage_key}'") # noqa E713

def add_stage_param(self, stage_key: str, param_name: str, value: typing.Any = None) -> ConfigurableStageParam:
stage = self.get_stage(stage_key)
Expand Down Expand Up @@ -322,15 +322,13 @@ def run(cls, job: "Job"):
"""
Procedure for an ML pipeline as a job.
"""
from ami.ml.orchestration.jobs import queue_images_to_nats

job.update_status(JobState.STARTED)
job.started_at = datetime.datetime.now()
job.finished_at = None
job.save()

# Keep track of sub-tasks for saving results, pair with batch number
save_tasks: list[tuple[int, AsyncResult]] = []
save_tasks_completed: list[tuple[int, AsyncResult]] = []

if job.delay:
update_interval_seconds = 2
last_update = time.time()
Expand Down Expand Up @@ -365,7 +363,7 @@ def run(cls, job: "Job"):
progress=0,
)

images = list(
images: list[SourceImage] = list(
# @TODO return generator plus image count
# @TODO pass to celery group chain?
job.pipeline.collect_images(
Expand All @@ -389,8 +387,6 @@ def run(cls, job: "Job"):
images = images[: job.limit]
image_count = len(images)
job.progress.add_stage_param("collect", "Limit", image_count)
else:
image_count = source_image_count

job.progress.update_stage(
"collect",
Expand All @@ -401,6 +397,17 @@ def run(cls, job: "Job"):
# End image collection stage
job.save()

if job.project.feature_flags.async_pipeline_workers:
queue_images_to_nats(job, images)
else:
cls.process_images(job, images)

@classmethod
def process_images(cls, job, images):
image_count = len(images)
# Keep track of sub-tasks for saving results, pair with batch number
save_tasks: list[tuple[int, AsyncResult]] = []
save_tasks_completed: list[tuple[int, AsyncResult]] = []
total_captures = 0
total_detections = 0
total_classifications = 0
Expand All @@ -420,7 +427,7 @@ def run(cls, job: "Job"):
job_id=job.pk,
project_id=job.project.pk,
)
job.logger.info(f"Processed image batch {i+1} in {time.time() - request_sent:.2f}s")
job.logger.info(f"Processed image batch {i+1} in {time.time() - request_sent:.2f}s") # noqa E231
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Remove unnecessary noqa directives.

Both # noqa E231 comments are unnecessary. The E231 rule doesn't apply to format specifiers like :.2f or :.0%.

-                job.logger.info(f"Processed image batch {i+1} in {time.time() - request_sent:.2f}s")  # noqa E231
+                job.logger.info(f"Processed image batch {i+1} in {time.time() - request_sent:.2f}s")
-            job.logger.info(f"Processed {percent_successful:.0%} of images successfully.")  # noqa E231
+            job.logger.info(f"Processed {percent_successful:.0%} of images successfully.")

Also applies to: 482-482

🧰 Tools
🪛 Ruff (0.14.2)

430-430: Unused blanket noqa directive

Remove unused noqa directive

(RUF100)

🤖 Prompt for AI Agents
In ami/jobs/models.py around lines 430 and 482, there are unnecessary "# noqa
E231" comments on lines that contain format specifiers (e.g., f"Processed image
batch {i+1} in {time.time() - request_sent:.2f}s"); remove those "# noqa E231"
directives from both locations so the code is cleaner and linter directives are
only used when truly needed.

except Exception as e:
# Log error about image batch and continue
job.logger.error(f"Failed to process image batch {i+1}: {e}")
Expand Down Expand Up @@ -472,7 +479,7 @@ def run(cls, job: "Job"):

if image_count:
percent_successful = 1 - len(request_failed_images) / image_count if image_count else 0
job.logger.info(f"Processed {percent_successful:.0%} of images successfully.")
job.logger.info(f"Processed {percent_successful:.0%} of images successfully.") # noqa E231

# Check all Celery sub-tasks if they have completed saving results
save_tasks_remaining = set(save_tasks) - set(save_tasks_completed)
Expand Down Expand Up @@ -513,6 +520,7 @@ def run(cls, job: "Job"):
job.save()


# TODO: This needs to happen once a job is done
class DataStorageSyncJob(JobType):
name = "Data storage sync"
key = "data_storage_sync"
Expand Down
108 changes: 108 additions & 0 deletions ami/jobs/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
from celery.result import AsyncResult
from celery.signals import task_failure, task_postrun, task_prerun

from ami.ml.orchestration.nats_queue import TaskQueueManager
from ami.ml.orchestration.task_state import TaskStateManager
from ami.ml.orchestration.utils import run_in_async_loop
from ami.ml.schemas import PipelineResultsResponse
from ami.tasks import default_soft_time_limit, default_time_limit
from config import celery_app

Expand Down Expand Up @@ -30,6 +34,110 @@ def run_job(self, job_id: int) -> None:
job.logger.info(f"Finished job {job}")


@celery_app.task(
    bind=True,
    max_retries=3,
    default_retry_delay=60,
    autoretry_for=(Exception,),
    soft_time_limit=300,  # 5 minutes
    time_limit=360,  # 6 minutes
)
def process_pipeline_result(self, job_id: int, result_data: dict, reply_subject: str) -> None:
    """
    Process a single pipeline result asynchronously.

    This task:
    1. Loads the job and deserializes ``result_data`` into a ``PipelineResultsResponse``
    2. Marks the result's source images as processed via ``TaskStateManager`` and
       updates the job's "process" stage from the reported progress
    3. Saves the result through ``job.pipeline.save_results``
    4. Acknowledges the task via NATS (best effort; an ACK failure is only logged)
    5. Updates the job's "results" stage with the same progress value

    Args:
        job_id: The job ID
        result_data: Dictionary containing the pipeline result
        reply_subject: NATS reply subject for acknowledgment

    Returns:
        None. Progress and outcome are recorded on the job and in its log.

    Raises:
        Job.DoesNotExist: If no job with ``job_id`` exists (logged, then re-raised).
        Exception: Any other failure is logged and re-raised; the task is declared
            with ``autoretry_for=(Exception,)`` and ``max_retries=3``.
    """
    from ami.jobs.models import Job, JobState  # avoid circular import

    try:
        job = Job.objects.get(pk=job_id)
        job.logger.info(f"Processing pipeline result for job {job_id}, reply_subject: {reply_subject}")

        if not job.pipeline:
            # Without a pipeline there is nothing that can save the results.
            # NOTE(review): the NATS message is not acknowledged on this path —
            # confirm that leaving it un-ACKed is the intended outcome.
            job.logger.warning(f"Job {job_id} has no pipeline, skipping save_results")
            return

        # Deserialize the result
        pipeline_result = PipelineResultsResponse(**result_data)

        # Update progress tracking in Redis
        state_manager = TaskStateManager(job.pk)
        processed_image_ids = {str(img.id) for img in pipeline_result.source_images}
        state_manager.mark_images_processed(processed_image_ids)

        progress_info = state_manager.get_progress()

        if progress_info is not None:
            # Get updated progress
            progress_percentage = progress_info.percentage

            job.logger.info(
                f"Job {job_id} progress: {progress_info.processed}/{progress_info.total} images processed "
                f"({progress_percentage*100}%), {progress_info.remaining} remaining"
            )
        else:
            job.logger.warning(f"No pending images found in Redis for job {job_id}, setting progress to 100%")
            progress_percentage = 1.0

        job.progress.update_stage(
            "process",
            status=JobState.SUCCESS if progress_percentage >= 1.0 else JobState.STARTED,
            progress=progress_percentage,
        )
        job.save()

        # Save to database (this is the slow operation)
        job.pipeline.save_results(results=pipeline_result, job_id=job.pk)
        # Only report success once save_results has returned without raising.
        job.logger.info(f"Successfully saved results for job {job_id}")

        # Acknowledge the task via NATS
        try:

            async def ack_task():
                async with TaskQueueManager() as manager:
                    return await manager.acknowledge_task(reply_subject)

            ack_success = run_in_async_loop(ack_task, f"acknowledging job {job.pk} via NATS")

            if ack_success:
                job.logger.info(f"Successfully acknowledged task via NATS: {reply_subject}")
            else:
                job.logger.warning(f"Failed to acknowledge task via NATS: {reply_subject}")
        except Exception as ack_error:
            job.logger.error(f"Error acknowledging task via NATS: {ack_error}")
            # Don't fail the task if ACK fails - data is already saved

        # Update job stage with calculated progress
        job.progress.update_stage(
            "results",
            status=JobState.STARTED if progress_percentage < 1.0 else JobState.SUCCESS,
            progress=progress_percentage,
        )
        job.save()

    except Job.DoesNotExist:
        logger.error(f"Job {job_id} not found")
        raise
    except Exception as e:
        logger.error(f"Failed to process pipeline result for job {job_id}: {e}")
        # Celery will automatically retry based on autoretry_for
        raise


@task_postrun.connect(sender=run_job)
@task_prerun.connect(sender=run_job)
def update_job_status(sender, task_id, task, *args, **kwargs):
Expand Down
Loading