Skip to content

Commit 1a16b9d

Browse files
authored
feat(openlineage): Add parentRunFacet for DAG events (#57809)
1 parent afa5bff commit 1a16b9d

File tree

9 files changed

+897
-52
lines changed

9 files changed

+897
-52
lines changed

providers/openlineage/docs/guides/user.rst

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -478,6 +478,56 @@ You can enable this automation by setting ``spark_inject_transport_info`` option
478478
AIRFLOW__OPENLINEAGE__SPARK_INJECT_TRANSPORT_INFO=true
479479
480480
481+
Passing parent information to Airflow DAG
482+
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
483+
484+
To enable full OpenLineage lineage tracking across dependent DAGs, you can pass parent and root job information
485+
through the DAG's ``dag_run.conf``. When a DAG run configuration includes an ``openlineage`` section with valid metadata,
486+
this information is automatically parsed and converted into the DAG run's ``parentRunFacet``, from which the root information
487+
is also propagated to all tasks. If no DAG run ``openlineage`` configuration is provided, the DAG run will not contain
488+
``parentRunFacet`` and the root of all tasks will default to the DAG run itself.
489+
490+
The ``openlineage`` dict in conf should contain the following keys:
491+
492+
493+
*(all three values must be included to create a parent reference)*
494+
495+
- **parentRunId** — the unique run ID (uuid) of the direct parent job
496+
- **parentJobName** — the name of the parent job
497+
- **parentJobNamespace** — the namespace of the parent job
498+
499+
*(all three values must be included to create a root reference; otherwise, the parent will be used as the root)*
500+
501+
- **rootParentRunId** — the run ID (uuid) of the top-level (root) job
502+
- **rootParentJobName** — the name of the top-level (root) job
503+
- **rootParentJobNamespace** — the namespace of the top-level (root) job
504+
505+
.. note::
506+
507+
We highly recommend providing all six OpenLineage identifiers (parent and root) to ensure complete lineage tracking. If the root information is missing, the parent set will be used as the root; if any of the three parent fields is missing, no parent facet will be created. Partial or mixed configurations are not supported: within each group (parent or root), all three values must be provided together.
508+
509+
510+
Example:
511+
512+
.. code-block:: shell
513+
514+
curl -X POST "http://<AIRFLOW_HOST>/api/v2/dags/my_dag_name/dagRuns" \
515+
-H "Content-Type: application/json" \
516+
-d '{
517+
"logical_date": "2019-08-24T14:15:22Z",
518+
"conf": {
519+
"openlineage": {
520+
"parentRunId": "3bb703d1-09c1-4a42-8da5-35a0b3216072",
521+
"parentJobNamespace": "prod_biz",
522+
"parentJobName": "get_files",
523+
"rootParentRunId": "9d3b14f7-de91-40b6-aeef-e887e2c7673e",
524+
"rootParentJobNamespace": "prod_analytics",
525+
"rootParentJobName": "generate_report_sales_e2e"
526+
}
527+
}
528+
}'
529+
530+
481531
Troubleshooting
482532
===============
483533

providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py

Lines changed: 34 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,9 @@
4141
get_airflow_mapped_task_facet,
4242
get_airflow_run_facet,
4343
get_dag_documentation,
44+
get_dag_parent_run_facet,
4445
get_job_name,
46+
get_root_information_from_dagrun_conf,
4547
get_task_documentation,
4648
get_task_parent_run_facet,
4749
get_user_provided_run_facets,
@@ -224,7 +226,11 @@ def on_running():
224226
task=task_metadata,
225227
run_facets={
226228
**get_user_provided_run_facets(task_instance, TaskInstanceState.RUNNING),
227-
**get_task_parent_run_facet(parent_run_id=parent_run_id, parent_job_name=dag.dag_id),
229+
**get_task_parent_run_facet(
230+
parent_run_id=parent_run_id,
231+
parent_job_name=dag.dag_id,
232+
**get_root_information_from_dagrun_conf(getattr(dagrun, "conf", {})),
233+
),
228234
**get_airflow_mapped_task_facet(task_instance),
229235
**get_airflow_run_facet(dagrun, dag, task_instance, task, task_uuid),
230236
**debug_facet,
@@ -351,7 +357,11 @@ def on_success():
351357
nominal_end_time=data_interval_end,
352358
run_facets={
353359
**get_user_provided_run_facets(task_instance, TaskInstanceState.SUCCESS),
354-
**get_task_parent_run_facet(parent_run_id=parent_run_id, parent_job_name=dag.dag_id),
360+
**get_task_parent_run_facet(
361+
parent_run_id=parent_run_id,
362+
parent_job_name=dag.dag_id,
363+
**get_root_information_from_dagrun_conf(getattr(dagrun, "conf", {})),
364+
),
355365
**get_airflow_run_facet(dagrun, dag, task_instance, task, task_uuid),
356366
**get_airflow_debug_facet(),
357367
},
@@ -489,7 +499,11 @@ def on_failure():
489499
job_description_type=doc_type,
490500
run_facets={
491501
**get_user_provided_run_facets(task_instance, TaskInstanceState.FAILED),
492-
**get_task_parent_run_facet(parent_run_id=parent_run_id, parent_job_name=dag.dag_id),
502+
**get_task_parent_run_facet(
503+
parent_run_id=parent_run_id,
504+
parent_job_name=dag.dag_id,
505+
**get_root_information_from_dagrun_conf(getattr(dagrun, "conf", {})),
506+
),
493507
**get_airflow_run_facet(dagrun, dag, task_instance, task, task_uuid),
494508
**get_airflow_debug_facet(),
495509
},
@@ -540,7 +554,11 @@ def on_state_change():
540554
"job_description": None,
541555
"job_description_type": None,
542556
"run_facets": {
543-
**get_task_parent_run_facet(parent_run_id=parent_run_id, parent_job_name=ti.dag_id),
557+
**get_task_parent_run_facet(
558+
parent_run_id=parent_run_id,
559+
parent_job_name=ti.dag_id,
560+
**get_root_information_from_dagrun_conf(getattr(dagrun, "conf", {})),
561+
),
544562
**get_airflow_debug_facet(),
545563
},
546564
}
@@ -645,8 +663,6 @@ def on_dag_run_running(self, dag_run: DagRun, msg: str) -> None:
645663
)
646664
data_interval_end = dag_run.data_interval_end.isoformat() if dag_run.data_interval_end else None
647665

648-
run_facets = {**get_airflow_dag_run_facet(dag_run)}
649-
650666
date = dag_run.logical_date
651667
if AIRFLOW_V_3_0_PLUS and date is None:
652668
date = dag_run.run_after
@@ -660,7 +676,6 @@ def on_dag_run_running(self, dag_run: DagRun, msg: str) -> None:
660676
start_date=dag_run.start_date,
661677
nominal_start_time=data_interval_start,
662678
nominal_end_time=data_interval_end,
663-
run_facets=run_facets,
664679
clear_number=dag_run.clear_number,
665680
owners=[x.strip() for x in dag_run.dag.owner.split(",")] if dag_run.dag else None,
666681
job_description=doc,
@@ -669,6 +684,10 @@ def on_dag_run_running(self, dag_run: DagRun, msg: str) -> None:
669684
# AirflowJobFacet should be created outside ProcessPoolExecutor that pickles objects,
670685
# as it causes lack of some TaskGroup attributes and crashes event emission.
671686
job_facets=get_airflow_job_facet(dag_run=dag_run),
687+
run_facets={
688+
**get_airflow_dag_run_facet(dag_run),
689+
**get_dag_parent_run_facet(getattr(dag_run, "conf", {})),
690+
},
672691
)
673692
except BaseException as e:
674693
self.log.warning("OpenLineage received exception in method on_dag_run_running", exc_info=e)
@@ -716,7 +735,10 @@ def on_dag_run_success(self, dag_run: DagRun, msg: str) -> None:
716735
job_description_type=doc_type,
717736
task_ids=task_ids,
718737
dag_run_state=dag_run.get_state(),
719-
run_facets={**get_airflow_dag_run_facet(dag_run)},
738+
run_facets={
739+
**get_airflow_dag_run_facet(dag_run),
740+
**get_dag_parent_run_facet(getattr(dag_run, "conf", {})),
741+
},
720742
)
721743
except BaseException as e:
722744
self.log.warning("OpenLineage received exception in method on_dag_run_success", exc_info=e)
@@ -765,7 +787,10 @@ def on_dag_run_failed(self, dag_run: DagRun, msg: str) -> None:
765787
dag_run_state=dag_run.get_state(),
766788
task_ids=task_ids,
767789
msg=msg,
768-
run_facets={**get_airflow_dag_run_facet(dag_run)},
790+
run_facets={
791+
**get_airflow_dag_run_facet(dag_run),
792+
**get_dag_parent_run_facet(getattr(dag_run, "conf", {})),
793+
},
769794
)
770795
except BaseException as e:
771796
self.log.warning("OpenLineage received exception in method on_dag_run_failed", exc_info=e)

providers/openlineage/src/airflow/providers/openlineage/plugins/macros.py

Lines changed: 44 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020

2121
from airflow.providers.openlineage import conf
2222
from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter
23-
from airflow.providers.openlineage.utils.utils import get_job_name
23+
from airflow.providers.openlineage.utils.utils import get_job_name, get_root_information_from_dagrun_conf
2424
from airflow.providers.openlineage.version_compat import AIRFLOW_V_3_0_PLUS
2525

2626
if TYPE_CHECKING:
@@ -102,51 +102,69 @@ def lineage_root_parent_id(task_instance: TaskInstance):
102102
"""
103103
return "/".join(
104104
(
105-
lineage_job_namespace(),
105+
lineage_root_job_namespace(task_instance),
106106
lineage_root_job_name(task_instance),
107107
lineage_root_run_id(task_instance),
108108
)
109109
)
110110

111111

112112
def lineage_root_job_name(task_instance: TaskInstance):
    """
    Return the job name of the root of this task's lineage hierarchy.

    The ``root_parent_job_name`` value resolved via ``_get_ol_root_id`` is
    used when it is non-empty; otherwise the task instance's own ``dag_id``
    is returned.
    """
    return _get_ol_root_id("root_parent_job_name", task_instance) or task_instance.dag_id
114117

115118

116119
def lineage_root_run_id(task_instance: TaskInstance):
    """
    Return the run id of the root of this task's lineage hierarchy.

    The ``root_parent_run_id`` value resolved via ``_get_ol_root_id`` is used
    when it is non-empty; otherwise the id is built by
    ``OpenLineageAdapter.build_dag_run_id`` from the DAG id, logical date and
    clear number of the task's DAG run.
    """
    run_id_from_conf = _get_ol_root_id("root_parent_run_id", task_instance)
    if not run_id_from_conf:
        # No explicit root supplied: the DAG run itself acts as the root.
        return OpenLineageAdapter.build_dag_run_id(
            dag_id=task_instance.dag_id,
            logical_date=_get_logical_date(task_instance),
            clear_number=_get_dag_run_clear_number(task_instance),
        )
    return run_id_from_conf
122128

123129

130+
def lineage_root_job_namespace(task_instance: TaskInstance):
    """
    Return the job namespace of the root of this task's lineage hierarchy.

    The ``root_parent_job_namespace`` value resolved via ``_get_ol_root_id``
    is used when it is non-empty; otherwise ``conf.namespace()`` is returned.
    """
    return _get_ol_root_id("root_parent_job_namespace", task_instance) or conf.namespace()
135+
136+
137+
def _get_ol_root_id(id_key: str, task_instance: TaskInstance) -> str | None:
    """
    Look up a single root identifier for ``task_instance``.

    The DAG run conf of the task instance is passed to
    ``get_root_information_from_dagrun_conf`` and ``id_key`` is read from the
    result.

    :param id_key: Key to read from the root information mapping,
        e.g. ``"root_parent_run_id"``.
    :param task_instance: Task instance whose DAG run conf is inspected.
    :return: The value stored under ``id_key``, or ``None`` when the root
        information is empty or the value is missing/falsy.
    """
    root_info = get_root_information_from_dagrun_conf(
        dr_conf=_get_dag_run_conf(task_instance=task_instance),
    )
    if not root_info:
        return None
    # Falsy values (missing key, empty string) are normalised to None.
    return root_info.get(id_key) or None
143+
144+
145+
def _get_dagrun_from_ti(task_instance: TaskInstance):
146+
context = task_instance.get_template_context()
147+
if getattr(task_instance, "dag_run", None):
148+
return task_instance.dag_run
149+
return context["dag_run"]
150+
151+
152+
def _get_dag_run_conf(task_instance: TaskInstance) -> dict:
    """
    Return the conf of the DAG run that ``task_instance`` belongs to.

    An empty dict is returned when the DAG run's ``conf`` is ``None`` or
    otherwise falsy, so callers can always treat the result as a mapping.
    """
    run_conf = _get_dagrun_from_ti(task_instance=task_instance).conf
    if run_conf:
        return run_conf
    return {}
155+
156+
124157
def _get_dag_run_clear_number(task_instance: TaskInstance):
125-
# todo: remove when min airflow version >= 3.0
126-
if AIRFLOW_V_3_0_PLUS:
127-
context = task_instance.get_template_context()
128-
if hasattr(task_instance, "dag_run"):
129-
dag_run = task_instance.dag_run
130-
else:
131-
dag_run = context["dag_run"]
132-
return dag_run.clear_number
133-
return task_instance.dag_run.clear_number
158+
dr = _get_dagrun_from_ti(task_instance=task_instance)
159+
return dr.clear_number
134160

135161

136162
def _get_logical_date(task_instance):
137-
# todo: remove when min airflow version >= 3.0
138163
if AIRFLOW_V_3_0_PLUS:
139-
context = task_instance.get_template_context()
140-
if hasattr(task_instance, "dag_run"):
141-
dag_run = task_instance.dag_run
142-
else:
143-
dag_run = context["dag_run"]
144-
if hasattr(dag_run, "logical_date") and dag_run.logical_date:
145-
date = dag_run.logical_date
146-
else:
147-
date = dag_run.run_after
148-
elif hasattr(task_instance, "logical_date"):
149-
date = task_instance.logical_date
150-
else:
151-
date = task_instance.execution_date
152-
return date
164+
dr = _get_dagrun_from_ti(task_instance=task_instance)
165+
if getattr(dr, "logical_date", None):
166+
return dr.logical_date
167+
return dr.run_after
168+
if getattr(task_instance, "logical_date", None):
169+
return task_instance.logical_date
170+
return task_instance.execution_date

providers/openlineage/src/airflow/providers/openlineage/plugins/openlineage.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
lineage_job_namespace,
2929
lineage_parent_id,
3030
lineage_root_job_name,
31+
lineage_root_job_namespace,
3132
lineage_root_parent_id,
3233
lineage_root_run_id,
3334
lineage_run_id,
@@ -51,6 +52,7 @@ class OpenLineageProviderPlugin(AirflowPlugin):
5152
lineage_parent_id,
5253
lineage_root_run_id,
5354
lineage_root_job_name,
55+
lineage_root_job_namespace,
5456
lineage_root_parent_id,
5557
]
5658
listeners = [get_openlineage_listener()]

0 commit comments

Comments
 (0)