-
Notifications
You must be signed in to change notification settings - Fork 11
feat: Processing service V2 #987
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 16 commits
ae02d2e
24a15af
0da97a6
2db7d66
8a714cd
700f594
3b42e08
8ea5d7d
61fc2c5
9af597c
7ff8865
0fbe899
7899fc5
d9f8ffd
edad552
d254867
1cc890e
84ee5a2
09fee92
4480b0d
3032709
3e7ef3b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,20 +1,72 @@ | ||
| .editorconfig | ||
| .gitattributes | ||
| .github | ||
| .gitignore | ||
| .gitlab-ci.yml | ||
| .idea | ||
| .pre-commit-config.yaml | ||
| .readthedocs.yml | ||
| .travis.yml | ||
| .git | ||
| ui | ||
| ami/media | ||
| backups | ||
| venv | ||
| .venv | ||
| .env | ||
| .envs | ||
| venv/ | ||
| .venv/ | ||
| .env/ | ||
| .envs/ | ||
| .envs/* | ||
| node_modules | ||
| data | ||
|
|
||
| # Python cache / bytecode | ||
| __pycache__/ | ||
| *.py[cod] | ||
| *.pyo | ||
| *.pyd | ||
| *.pdb | ||
| *.egg-info/ | ||
| *.egg | ||
| *.whl | ||
|
|
||
|
|
||
| # Django / runtime artifacts | ||
| *.log | ||
| *.pot | ||
| *.pyc | ||
| db.sqlite3 | ||
| media/ | ||
| staticfiles/ # collected static files (use collectstatic inside container) | ||
|
|
||
| # Node / UI dependencies (if using React/Vue in your UI service) | ||
| node_modules/ | ||
| npm-debug.log | ||
| yarn-error.log | ||
| .pnpm-debug.log | ||
|
|
||
| # Docs build artifacts | ||
| /docs/_build/ | ||
|
|
||
| # Git / VCS | ||
| .git/ | ||
| .gitignore | ||
| .gitattributes | ||
| *.swp | ||
| *.swo | ||
|
|
||
| # IDE / editor | ||
| .vscode/ | ||
| .idea/ | ||
| *.iml | ||
|
|
||
| # OS cruft | ||
| .DS_Store | ||
| Thumbs.db | ||
|
|
||
| # Docker itself | ||
| .dockerignore | ||
| Dockerfile | ||
| docker-compose*.yml | ||
|
|
||
| # Build / dist | ||
| build/ | ||
| dist/ | ||
| .eggs/ |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -276,3 +276,6 @@ sandbox/ | |
|
|
||
| # Other | ||
| flower | ||
|
|
||
| # huggingface cache | ||
| huggingface_cache/ | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -72,7 +72,7 @@ def get_status_label(status: JobState, progress: float) -> str: | |
| if status in [JobState.CREATED, JobState.PENDING, JobState.RECEIVED]: | ||
| return "Waiting to start" | ||
| elif status in [JobState.STARTED, JobState.RETRY, JobState.SUCCESS]: | ||
| return f"{progress:.0%} complete" | ||
| return f"{progress:.0%} complete" # noqa E231 | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Remove unnecessary The Apply this diff to remove the unnecessary directives: - return f"{progress:.0%} complete" # noqa E231
+ return f"{progress:.0%} complete"- raise ValueError(f"Job stage with key '{stage_key}' not found in progress") # noqa E713
+ raise ValueError(f"Job stage with key '{stage_key}' not found in progress")- raise ValueError(f"Job stage parameter with key '{param_key}' not found in stage '{stage_key}'") # noqa E713
+ raise ValueError(f"Job stage parameter with key '{param_key}' not found in stage '{stage_key}'")- job.logger.info(f"Processed image batch {i+1} in {time.time() - request_sent:.2f}s") # noqa E231
+ job.logger.info(f"Processed image batch {i+1} in {time.time() - request_sent:.2f}s")- job.logger.info(f"Processed {percent_successful:.0%} of images successfully.") # noqa E231
+ job.logger.info(f"Processed {percent_successful:.0%} of images successfully.")Also applies to: 136-136, 143-143, 437-437, 489-489 🧰 Tools🪛 Ruff (0.14.3)75-75: Unused blanket Remove unused (RUF100) 🤖 Prompt for AI Agents |
||
| else: | ||
| return f"{status.name}" | ||
|
|
||
|
|
@@ -133,14 +133,14 @@ def get_stage(self, stage_key: str) -> JobProgressStageDetail: | |
| for stage in self.stages: | ||
| if stage.key == stage_key: | ||
| return stage | ||
| raise ValueError(f"Job stage with key '{stage_key}' not found in progress") | ||
| raise ValueError(f"Job stage with key '{stage_key}' not found in progress") # noqa E713 | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Remove unnecessary The - raise ValueError(f"Job stage with key '{stage_key}' not found in progress") # noqa E713
+ raise ValueError(f"Job stage with key '{stage_key}' not found in progress")- raise ValueError(f"Job stage parameter with key '{param_key}' not found in stage '{stage_key}'") # noqa E713
+ raise ValueError(f"Job stage parameter with key '{param_key}' not found in stage '{stage_key}'")Also applies to: 143-143 🤖 Prompt for AI Agents |
||
|
|
||
| def get_stage_param(self, stage_key: str, param_key: str) -> ConfigurableStageParam: | ||
| stage = self.get_stage(stage_key) | ||
| for param in stage.params: | ||
| if param.key == param_key: | ||
| return param | ||
| raise ValueError(f"Job stage parameter with key '{param_key}' not found in stage '{stage_key}'") | ||
| raise ValueError(f"Job stage parameter with key '{param_key}' not found in stage '{stage_key}'") # noqa E713 | ||
|
|
||
| def add_stage_param(self, stage_key: str, param_name: str, value: typing.Any = None) -> ConfigurableStageParam: | ||
| stage = self.get_stage(stage_key) | ||
|
|
@@ -322,15 +322,13 @@ def run(cls, job: "Job"): | |
| """ | ||
| Procedure for an ML pipeline as a job. | ||
| """ | ||
| from ami.ml.orchestration.jobs import queue_images_to_nats | ||
|
|
||
| job.update_status(JobState.STARTED) | ||
| job.started_at = datetime.datetime.now() | ||
| job.finished_at = None | ||
| job.save() | ||
|
|
||
| # Keep track of sub-tasks for saving results, pair with batch number | ||
| save_tasks: list[tuple[int, AsyncResult]] = [] | ||
| save_tasks_completed: list[tuple[int, AsyncResult]] = [] | ||
|
|
||
| if job.delay: | ||
| update_interval_seconds = 2 | ||
| last_update = time.time() | ||
|
|
@@ -365,7 +363,7 @@ def run(cls, job: "Job"): | |
| progress=0, | ||
| ) | ||
|
|
||
| images = list( | ||
| images: list[SourceImage] = list( | ||
| # @TODO return generator plus image count | ||
| # @TODO pass to celery group chain? | ||
| job.pipeline.collect_images( | ||
|
|
@@ -389,8 +387,6 @@ def run(cls, job: "Job"): | |
| images = images[: job.limit] | ||
| image_count = len(images) | ||
| job.progress.add_stage_param("collect", "Limit", image_count) | ||
| else: | ||
| image_count = source_image_count | ||
|
|
||
| job.progress.update_stage( | ||
| "collect", | ||
|
|
@@ -401,6 +397,17 @@ def run(cls, job: "Job"): | |
| # End image collection stage | ||
| job.save() | ||
|
|
||
| if job.project.feature_flags.async_pipeline_workers: | ||
| queue_images_to_nats(job, images) | ||
| else: | ||
| cls.process_images(job, images) | ||
carlosgjs marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| @classmethod | ||
| def process_images(cls, job, images): | ||
| image_count = len(images) | ||
| # Keep track of sub-tasks for saving results, pair with batch number | ||
| save_tasks: list[tuple[int, AsyncResult]] = [] | ||
| save_tasks_completed: list[tuple[int, AsyncResult]] = [] | ||
| total_captures = 0 | ||
| total_detections = 0 | ||
| total_classifications = 0 | ||
|
|
@@ -420,7 +427,7 @@ def run(cls, job: "Job"): | |
| job_id=job.pk, | ||
| project_id=job.project.pk, | ||
| ) | ||
| job.logger.info(f"Processed image batch {i+1} in {time.time() - request_sent:.2f}s") | ||
| job.logger.info(f"Processed image batch {i+1} in {time.time() - request_sent:.2f}s") # noqa E231 | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Remove unnecessary Both - job.logger.info(f"Processed image batch {i+1} in {time.time() - request_sent:.2f}s") # noqa E231
+ job.logger.info(f"Processed image batch {i+1} in {time.time() - request_sent:.2f}s")- job.logger.info(f"Processed {percent_successful:.0%} of images successfully.") # noqa E231
+ job.logger.info(f"Processed {percent_successful:.0%} of images successfully.")Also applies to: 482-482 🧰 Tools🪛 Ruff (0.14.2)430-430: Unused blanket Remove unused (RUF100) 🤖 Prompt for AI Agents |
||
| except Exception as e: | ||
| # Log error about image batch and continue | ||
| job.logger.error(f"Failed to process image batch {i+1}: {e}") | ||
|
|
@@ -472,7 +479,7 @@ def run(cls, job: "Job"): | |
|
|
||
| if image_count: | ||
| percent_successful = 1 - len(request_failed_images) / image_count if image_count else 0 | ||
| job.logger.info(f"Processed {percent_successful:.0%} of images successfully.") | ||
| job.logger.info(f"Processed {percent_successful:.0%} of images successfully.") # noqa E231 | ||
|
|
||
| # Check all Celery sub-tasks if they have completed saving results | ||
| save_tasks_remaining = set(save_tasks) - set(save_tasks_completed) | ||
|
|
@@ -513,6 +520,7 @@ def run(cls, job: "Job"): | |
| job.save() | ||
|
|
||
|
|
||
| # TODO: This needs to happen once a job is done | ||
| class DataStorageSyncJob(JobType): | ||
| name = "Data storage sync" | ||
| key = "data_storage_sync" | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.