action_on_event_received only process events sent by current workflow execution #358

@jiangxin369

Describe the feature

Currently, events are broadcast: once the expected event is sent, every workflow execution of this workflow is triggered.

Describe the solution you'd like

Since the scheduler dispatcher inspects the context of each event to determine whether it contains a workflow execution id, we can inject the runtime context of each task execution into the user-defined event, so that the event affects only the current workflow execution.
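
To make the dispatch rule above concrete, here is a minimal sketch of how a dispatcher could choose target executions. It is not the project's actual dispatcher: the `Event` stand-in, the JSON-encoded `context` string, the `workflow_execution_id` key, and the `target_executions` helper are all assumptions made for illustration.

```python
import json
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Event:
    """Hypothetical stand-in for the real Event class."""
    event_key: str
    message: str
    context: Optional[str] = None  # assumed: JSON string set by the sender


def target_executions(event: Event, running_execution_ids: List[int]) -> List[int]:
    """Return the workflow execution ids that should process the event."""
    if event.context is None:
        # No execution info attached: keep the current broadcast behaviour.
        return list(running_execution_ids)
    try:
        info = json.loads(event.context)
    except json.JSONDecodeError:
        # Context is not in the assumed format: fall back to broadcast.
        return list(running_execution_ids)
    execution_id = info.get('workflow_execution_id') if isinstance(info, dict) else None
    if execution_id is None:
        return list(running_execution_ids)
    # Execution info present: deliver only to the matching execution.
    return [e for e in running_execution_ids if e == execution_id]
```

Events without execution info keep the existing broadcast semantics in this sketch, so senders that do not wrap their events would behave as they do today.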

We need the following changes:

  • Add a global variable _CURRENT_TASK_CONTEXT in context.py to store the runtime context of the current task execution.
  • Add get_runtime_task_context and set_runtime_task_context functions to read and write _CURRENT_TASK_CONTEXT.
  • Add a public API called wrap_execution_info_to_context that injects the runtime info into the context of an event before it is sent.

from typing import Optional

# Runtime context of the task execution currently running in this process.
_CURRENT_TASK_CONTEXT: Optional[TaskExecutionContext] = None


def set_runtime_task_context(context: TaskExecutionContext):
    global _CURRENT_TASK_CONTEXT
    _CURRENT_TASK_CONTEXT = context


def get_runtime_task_context() -> Optional[TaskExecutionContext]:
    return _CURRENT_TASK_CONTEXT


def wrap_execution_info_to_context(event: Event):
    """
    An event whose context is wrapped with workflow execution info is only
    processed by that specific workflow execution.
    """
    pass
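
The stub above leaves the body of wrap_execution_info_to_context open. One possible shape is sketched below, under stated assumptions: the `TaskExecutionContext` and `Event` classes are simplified stand-ins, the `workflow_execution_id` field and the JSON encoding of `context` are hypothetical, and raising when no context is set is just one design option.

```python
import json
from dataclasses import dataclass
from typing import Optional


@dataclass
class TaskExecutionContext:
    """Hypothetical stand-in; only the field used here is modelled."""
    workflow_execution_id: int


@dataclass
class Event:
    """Hypothetical stand-in for the real Event class."""
    event_key: str
    message: str
    context: Optional[str] = None


_CURRENT_TASK_CONTEXT: Optional[TaskExecutionContext] = None


def set_runtime_task_context(context: Optional[TaskExecutionContext]) -> None:
    global _CURRENT_TASK_CONTEXT
    _CURRENT_TASK_CONTEXT = context


def wrap_execution_info_to_context(event: Event) -> Event:
    """Attach the current workflow execution id to the event context."""
    if _CURRENT_TASK_CONTEXT is None:
        # Assumed policy: fail loudly when called outside a task execution.
        raise RuntimeError('No runtime task context is set.')
    event.context = json.dumps(
        {'workflow_execution_id': _CURRENT_TASK_CONTEXT.workflow_execution_id})
    return event
```

Note that this sketch overwrites any context the user already placed on the event; merging the execution info into an existing context would be an alternative worth discussing.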

How to use it?

def func():
    notification_client = get_notification_client()
    event = Event(event_key=EVENT_KEY, message='This is a custom message.')

    # Wrap the event with the current workflow execution info.
    wrap_execution_info_to_context(event)
    # Send the event; only the current workflow execution should process it.
    notification_client.send_event(event)

with Workflow(name='workflow') as w1:
    task = PythonOperator(name='task', python_callable=func)

Describe alternatives you've considered

Additional context
