Status: Open
Labels: enhancement (New feature or request)
Description
I have a use case where I produce a list of file names and want a task to process each file name in parallel. So I have a task that returns the list of file names, and the next step is a group consisting of one task. However, Director passes the entire list to that task. I assumed Director would pass each item in the list to its own instance of the task, but that is not how it works.
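To make the mismatch concrete, here is a minimal plain-Python model of the two behaviours, with no Celery or Director involved. `run_group_broadcast` mirrors what is described above (each task in the group receives the previous task's whole return value), while `run_group_map` mirrors what was expected (one call per list item). All function names here are hypothetical and exist only for illustration.

```python
from typing import Any, Callable, List


def run_group_broadcast(parent_result: Any, tasks: List[Callable[[Any], Any]]) -> List[Any]:
    """Observed behaviour: every task in the group is called once,
    with the parent's entire return value as its argument."""
    return [task(parent_result) for task in tasks]


def run_group_map(parent_result: List[Any], task: Callable[[Any], Any]) -> List[Any]:
    """Expected behaviour: the task is called once per item of the parent's list."""
    return [task(item) for item in parent_result]


def describe(catalog: Any) -> str:
    # Stand-in for CREATE-DATASOURCE: report the type and value it was handed.
    return f"got {type(catalog).__name__}: {catalog!r}"


if __name__ == "__main__":
    catalogs = [{"name": "a.json"}, {"name": "b.json"}]
    # One call that receives the whole list.
    print(run_group_broadcast(catalogs, [describe]))
    # Two calls, each receiving a single catalog dict.
    print(run_group_map(catalogs, describe))
```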
Here is my workflow:
```yaml
poc.ACCEPT-CATALOGS:
  tasks:
    - LIST-CATALOGS
    - GROUP_CATALOGS:
        type: group
        tasks:
          - CREATE-DATASOURCE
  schema: catalogs
```
And here are my tasks:
- LIST-CATALOGS: Retrieves a list of catalogs recently stored in a certain S3 bucket. The as_dict function just converts each S3 object into a dictionary.
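```python
from typing import Dict, List

from director import task

# `client` (an S3 client) and `as_dict` are defined elsewhere in the module.
@task(name="LIST-CATALOGS")
def list_catalogs(*args, **kwargs) -> List[Dict]:
    domain = kwargs["payload"]["domain"]
    catalogs = client.list_objects(bucket_name='*********', prefix=domain,
                                   recursive=False, include_user_meta=True)
    result = [as_dict(c, domain) for c in catalogs]
    return result
```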
- CREATE-DATASOURCE: For each catalog (represented as a dict), pull its object contents from the S3 bucket and do stuff with them. The "stuff" isn't shown here, but this is the heart of my use case.
```python
from director import task


@task(name="CREATE-DATASOURCE")
def create_datasource(*args, **kwargs):
    catalog = args[0]  # args[0] contains the entire list, and this task is called only once. Oh no!
    # Do stuff here using the info in each dict to pull the object contents
```
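One possible stopgap, sketched under assumptions and not a Director feature, is to let CREATE-DATASOURCE accept the whole list and fan out by itself. The sketch below uses the standard library's `concurrent.futures.ThreadPoolExecutor`, which gives in-process thread parallelism (reasonable for I/O-bound work such as S3 downloads) rather than one Celery task per catalog, so failures and retries apply to the whole batch instead of to individual catalogs. `process_catalog` is a hypothetical placeholder for the unshown "stuff", `max_workers=4` is an arbitrary choice, and the Director `@task` decorator is omitted so the sketch runs standalone.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict, List


def process_catalog(catalog: Dict[str, Any]) -> str:
    # Hypothetical placeholder for the per-catalog work
    # (e.g. fetching the object contents from S3 and building a datasource).
    return catalog["name"]


def create_datasource(*args: Any, **kwargs: Any) -> List[str]:
    # The previous task's return value (the full list of catalog dicts
    # produced by LIST-CATALOGS) arrives as args[0].
    catalogs: List[Dict[str, Any]] = args[0]
    # Process the catalogs concurrently; map() yields results in input order
    # and re-raises a worker's exception when that result is reached.
    with ThreadPoolExecutor(max_workers=4) as pool:
        return list(pool.map(process_catalog, catalogs))


if __name__ == "__main__":
    print(create_datasource([{"name": "a.json"}, {"name": "b.json"}]))
```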
I've done something similar with native Celery, which is why I assumed Director might work the same way. I defined the Celery group canvas like this:
```python
from celery import group

job = group([create_datasource.si(as_dict(c, domain)) for c in catalogs])
job.apply_async()
```
Is there a way to create similar functionality in Celery-Director? Thank you for reading and for any helpful feedback.