Skip to content
Open
Show file tree
Hide file tree
Changes from 20 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 58 additions & 6 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -1,20 +1,72 @@
.editorconfig
.gitattributes
.github
.gitignore
.gitlab-ci.yml
.idea
.pre-commit-config.yaml
.readthedocs.yml
.travis.yml
.git
ui
ami/media
backups
venv
.venv
.env
.envs
venv/
.venv/
.env/
.envs/
.envs/*
node_modules
data

# Python cache / bytecode
__pycache__/
*.py[cod]
*.pyo
*.pyd
*.pdb
*.egg-info/
*.egg
*.whl


# Django / runtime artifacts
*.log
*.pot
*.pyc
db.sqlite3
media/
staticfiles/ # collected static files (use collectstatic inside container)

# Node / UI dependencies (if using React/Vue in your UI service)
node_modules/
npm-debug.log
yarn-error.log
.pnpm-debug.log

# Docs build artifacts
/docs/_build/

# Git / VCS
.git/
.gitignore
.gitattributes
*.swp
*.swo

# IDE / editor
.vscode/
.idea/
*.iml

# OS cruft
.DS_Store
Thumbs.db

# Docker itself
.dockerignore
Dockerfile
docker-compose*.yml

# Build / dist
build/
dist/
.eggs/
3 changes: 3 additions & 0 deletions .envs/.local/.django
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ DJANGO_SUPERUSER_PASSWORD=localadmin
# Redis
REDIS_URL=redis://redis:6379/0

# NATS
NATS_URL=nats://nats:4222

# Celery / Flower
CELERY_FLOWER_USER=QSocnxapfMvzLqJXSsXtnEZqRkBtsmKT
CELERY_FLOWER_PASSWORD=BEQgmCtgyrFieKNoGTsux9YIye0I7P5Q7vEgfJD2C4jxmtHDetFaE2jhS7K7rxaf
Expand Down
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -276,3 +276,6 @@ sandbox/

# Other
flower

# huggingface cache
huggingface_cache/
24 changes: 23 additions & 1 deletion .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,29 @@
"version": "0.2.0",
"configurations": [
{
"name": "Python Debugger: Remote Attach",
"name": "Current File",
"type": "debugpy",
"request": "launch",
"program": "${file}",
"console": "integratedTerminal"
},
{
"name": "Django attach",
"type": "debugpy",
"request": "attach",
"connect": {
"host": "localhost",
"port": 5679
},
"pathMappings": [
{
"localRoot": "${workspaceFolder}",
"remoteRoot": "."
}
]
},
{
"name": "Celery worker attach",
"type": "debugpy",
"request": "attach",
"connect": {
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ docker compose -f processing_services/example/docker-compose.yml up -d
- Django admin: http://localhost:8000/admin/
- OpenAPI / Swagger documentation: http://localhost:8000/api/v2/docs/
- Minio UI: http://minio:9001, Minio service: http://minio:9000
- NATS dashboard: https://natsdashboard.com/ (add `localhost` as the server to monitor)

NOTE: If one of these services is not working properly, it could be because another process is using the port. You can check for this with `lsof -i :<PORT_NUMBER>`.

Expand Down
5 changes: 3 additions & 2 deletions ami/base/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ def get_active_project(
# If not in URL, try query parameters
if not project_id:
# Look for project_id in GET query parameters or POST data
# POST data returns a list of ints, but QueryDict.get() returns a single value
project_id = request.query_params.get(param) or request.data.get(param)
# request.data may not always be a dict (e.g., for non-POST requests), so we check its type
post_data = request.data if isinstance(request.data, dict) else {}
project_id = request.query_params.get(param) or post_data.get(param)

project_id = SingleParamSerializer[int].clean(
param_name=param,
Expand Down
11 changes: 11 additions & 0 deletions ami/jobs/management/commands/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Process Single Image Command

A Django management command for processing a single image through a pipeline. Useful for testing, debugging, and reprocessing individual images.

## Usage

### With Wait Flag (Monitor Progress)

```bash
docker compose run --rm web python manage.py process_single_image 12345 --pipeline 1 --wait
```
166 changes: 166 additions & 0 deletions ami/jobs/management/commands/process_single_image.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
"""Management command to process a single image through a pipeline for testing/debugging."""

import logging
import time

from django.core.management.base import BaseCommand, CommandError

from ami.jobs.utils import submit_single_image_job
from ami.main.models import Detection, SourceImage
from ami.ml.models import Pipeline

logger = logging.getLogger(__name__)


class Command(BaseCommand):
    """Submit a job that runs a single SourceImage through a Pipeline.

    Intended for testing, debugging, and reprocessing individual images from
    the command line. With ``--wait``, polls the job until it reaches a
    terminal status and prints a summary of the detections and
    classifications created for the image.
    """

    help = "Submit a job to process a single image through a pipeline (for testing/debugging)"

    # Statuses after which the job will make no further progress.
    # Hoisted to a constant so the list is not rebuilt on every poll iteration.
    TERMINAL_STATUSES = ("SUCCESS", "FAILURE", "REVOKED", "REJECTED")

    def add_arguments(self, parser):
        parser.add_argument("image_id", type=int, help="SourceImage ID to process")
        parser.add_argument(
            "--pipeline",
            type=int,
            required=True,
            help="Pipeline ID to use for processing",
        )
        parser.add_argument(
            "--name",
            type=str,
            default=None,
            help="Custom job name (optional)",
        )
        parser.add_argument(
            "--wait",
            action="store_true",
            help="Wait for the job to complete and show results",
        )
        parser.add_argument(
            "--poll-interval",
            type=int,
            default=2,
            help="Polling interval in seconds when using --wait (default: 2)",
        )

    def handle(self, *args, **options):
        """Entry point: validate inputs, submit the job, optionally wait and report."""
        image_id = options["image_id"]
        pipeline_id = options["pipeline"]

        self._validate_image(image_id)
        self._validate_pipeline(pipeline_id)
        job = self._submit_job(image_id, pipeline_id, options["name"])

        if not options["wait"]:
            self.stdout.write("")
            self.stdout.write("To check job status, run:")
            self.stdout.write(f"  Job.objects.get(pk={job.pk}).status")
            return

        start_time = time.time()
        if not self._wait_for_job(job, options["poll_interval"], start_time):
            # User pressed Ctrl+C; the job keeps running server-side.
            return
        self._report_results(job, image_id, elapsed_total=time.time() - start_time)

    def _validate_image(self, image_id: int) -> None:
        """Ensure the SourceImage exists and echo its context, else raise CommandError."""
        try:
            image = SourceImage.objects.select_related("deployment__project").get(pk=image_id)
        except SourceImage.DoesNotExist:
            # `from None` keeps the CLI error message clean (no chained traceback
            # for an expected, user-facing validation failure).
            raise CommandError(f"SourceImage with id {image_id} does not exist") from None
        self.stdout.write(self.style.SUCCESS(f"✓ Found image: {image.path}"))
        self.stdout.write(f"  Project: {image.deployment.project.name}")
        self.stdout.write(f"  Deployment: {image.deployment.name}")

    def _validate_pipeline(self, pipeline_id: int) -> None:
        """Ensure the Pipeline exists and echo it, else raise CommandError."""
        try:
            pipeline = Pipeline.objects.get(pk=pipeline_id)
        except Pipeline.DoesNotExist:
            raise CommandError(f"Pipeline with id {pipeline_id} does not exist") from None
        self.stdout.write(self.style.SUCCESS(f"✓ Using pipeline: {pipeline.name} (v{pipeline.version})"))

    def _submit_job(self, image_id: int, pipeline_id: int, job_name):
        """Create and enqueue the single-image job; return the Job instance."""
        self.stdout.write("")
        self.stdout.write(self.style.WARNING("Submitting job..."))
        try:
            job = submit_single_image_job(
                image_id=image_id,
                pipeline_id=pipeline_id,
                job_name=job_name,
            )
        except Exception as e:
            # Chain the original exception so the root cause is preserved
            # (visible with `--traceback`).
            raise CommandError(f"Failed to submit job: {e}") from e

        self.stdout.write(
            self.style.SUCCESS(
                f"✓ Job {job.pk} created and enqueued\n"
                f"  Task ID: {job.task_id}\n"
                f"  Status: {job.status}\n"
                f"  Name: {job.name}"
            )
        )
        return job

    def _wait_for_job(self, job, poll_interval: int, start_time: float) -> bool:
        """Poll the job until it reaches a terminal status.

        Returns True when the job finished, False when the user interrupted
        the wait with Ctrl+C (the job itself is left running).
        """
        self.stdout.write("")
        self.stdout.write(self.style.WARNING("Waiting for job to complete..."))
        self.stdout.write("(Press Ctrl+C to stop waiting)\n")

        last_status = None
        last_progress = None
        try:
            while True:
                job.refresh_from_db()
                progress = job.progress.summary.progress * 100
                status = job.status

                # Only redraw the progress line when something actually changed.
                if status != last_status or abs(progress - (last_progress or 0)) > 0.1:
                    elapsed = time.time() - start_time
                    self.stdout.write(
                        f"  Status: {status:15s} | Progress: {progress:5.1f}% | Elapsed: {elapsed:6.1f}s",
                        ending="\r",
                    )
                    last_status = status
                    last_progress = progress

                if job.status in self.TERMINAL_STATUSES:
                    self.stdout.write("")  # New line after the \r progress updates
                    return True

                time.sleep(poll_interval)
        except KeyboardInterrupt:
            self.stdout.write("")
            self.stdout.write(self.style.WARNING("\n⚠ Stopped waiting (job is still running)"))
            self.stdout.write(f"  Job ID: {job.pk}")
            return False

    def _report_results(self, job, image_id: int, elapsed_total: float) -> None:
        """Print a summary of the finished job's outcome and created records."""
        self.stdout.write("")

        if job.status == "SUCCESS":
            self.stdout.write(self.style.SUCCESS(f"✓ Job completed successfully in {elapsed_total:.1f}s"))

            detection_count = Detection.objects.filter(source_image_id=image_id).count()
            self.stdout.write("\nResults:")
            self.stdout.write(f"  Detections created: {detection_count}")

            if detection_count > 0:
                # Imported lazily: only needed on the success path.
                from ami.main.models import Classification

                classification_count = Classification.objects.filter(detection__source_image_id=image_id).count()
                self.stdout.write(f"  Classifications created: {classification_count}")

        elif job.status == "FAILURE":
            self.stdout.write(self.style.ERROR(f"✗ Job failed after {elapsed_total:.1f}s"))
            self.stdout.write("\nCheck job logs for details:")
            self.stdout.write(f"  Job.objects.get(pk={job.pk}).logs")

            if job.progress.errors:
                self.stdout.write("\nErrors:")
                for error in job.progress.errors[-5:]:  # Last 5 errors
                    self.stdout.write(f"  - {error}")

        else:
            self.stdout.write(self.style.WARNING(f"⚠ Job ended with status: {job.status}"))

        self.stdout.write(f"\nJob ID: {job.pk}")
Loading