Am I Misunderstanding Groups in Director? #167

@agile-anthony

I have a use case where I produce a list of file names and want a task to process each file name in parallel. So I have a task that returns the list of file names, and the next step is a group consisting of one task. However, Director passes the entire list to that task. I assumed Director would pass each item of the list to its own instance of the task, but that is not how it works.

Here is my workflow:

poc.ACCEPT-CATALOGS:
  tasks:
    - LIST-CATALOGS
    - GROUP_CATALOGS:
        type: group
        tasks:
          - CREATE-DATASOURCE
  schema: catalogs

And here are my tasks:

  1. LIST-CATALOGS: Retrieves a list of catalogs recently stored in a certain S3 bucket. The as_dict function just converts the S3 Object into a dictionary.
from typing import Dict, List

from director import task

# `client` (the S3 client) and `as_dict` are defined elsewhere in my project.
@task(name="LIST-CATALOGS")
def list_catalogs(*args, **kwargs) -> List[Dict]:
    domain = kwargs["payload"]["domain"]
    catalogs = client.list_objects(bucket_name='*********', prefix=domain,
                                   recursive=False, include_user_meta=True)
    result = [as_dict(c, domain) for c in catalogs]
    return result
  2. CREATE-DATASOURCE: For each catalog (represented as a dict), pull its object contents from the S3 bucket and do stuff with it. The "stuff" isn't shown here, but it explains my use case.
@task(name="CREATE-DATASOURCE")
def create_datasource(*args, **kwargs):
    catalog = args[0]   # args[0] contains the entire list and this task is called once, oh no!
    # Do stuff here using the info in a dict to pull the object contents
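Since the task receives the whole list in args[0], one possible stopgap is to fan out inside the task itself. The sketch below is only an illustration under assumptions: `process_catalog`, `create_datasource_fanout`, and the `"name"` key are hypothetical, and the work runs in threads inside a single worker process rather than being distributed across Celery workers.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict, List


def process_catalog(catalog: Dict[str, Any]) -> str:
    """Hypothetical per-catalog work; stands in for the real "stuff"."""
    return f"processed {catalog['name']}"


def create_datasource_fanout(catalogs: List[Dict[str, Any]],
                             max_workers: int = 4) -> List[str]:
    """Process every catalog of the list concurrently within one task call."""
    if not catalogs:
        return []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map returns results in the same order as the input list.
        return list(pool.map(process_catalog, catalogs))


if __name__ == "__main__":
    sample = [{"name": "a.json"}, {"name": "b.json"}]
    print(create_datasource_fanout(sample))
```

Inside the Director task, the body would then call something like `create_datasource_fanout(args[0])`. This keeps the workflow definition unchanged, at the cost of losing per-item task status.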

I've done something similar using native Celery, which is why I assumed that Director may work the same way. I defined the Celery Group Canvas like this:

job = group([ create_datasource.si(as_dict(c, domain)) for c in catalogs ])
job.apply_async()
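To make the difference concrete, here is a toy model in plain Python of the two argument-passing behaviors: the one I observe with the Director group, and the one my Celery snippet builds. It is not Director's or Celery's implementation; `broadcast`, `fan_out`, and `describe` are hypothetical names, and everything runs sequentially because only the arguments matter here.

```python
from typing import Any, Callable, List


def broadcast(tasks: List[Callable[[Any], Any]], previous_result: Any) -> List[Any]:
    """Every task in the group receives the whole previous result (what I observe)."""
    return [task(previous_result) for task in tasks]


def fan_out(task: Callable[[Any], Any], previous_result: List[Any]) -> List[Any]:
    """The task is called once per element of the previous result (what I expected)."""
    return [task(item) for item in previous_result]


def describe(arg: Any) -> str:
    """Report the argument the task was called with."""
    return f"called with {arg!r}"


if __name__ == "__main__":
    catalogs = ["a.json", "b.json"]
    print(broadcast([describe], catalogs))  # one call, whole list
    print(fan_out(describe, catalogs))      # one call per file name
```

With a group of one task, the first behavior yields a single call that sees the full list, which matches what happens to CREATE-DATASOURCE.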

Is there a way to create similar functionality in Celery-Director? Thank you for reading and for any helpful feedback.

Labels: enhancement (New feature or request)
