Skip to content

Commit 302bb4f

Browse files
authored
Fix operator extra links not appearing on failed tasks (#58227)
1 parent b40ecc9 commit 302bb4f

File tree

2 files changed

+100
-4
lines changed

2 files changed

+100
-4
lines changed

task-sdk/src/airflow/sdk/execution_time/task_runner.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1411,9 +1411,17 @@ def finalize(
14111411
task = ti.task
14121412
# Pushing xcom for each operator extra links defined on the operator only.
14131413
for oe in task.operator_extra_links:
1414-
link, xcom_key = oe.get_link(operator=task, ti_key=ti), oe.xcom_key # type: ignore[arg-type]
1415-
log.debug("Setting xcom for operator extra link", link=link, xcom_key=xcom_key)
1416-
_xcom_push_to_db(ti, key=xcom_key, value=link)
1414+
try:
1415+
link, xcom_key = oe.get_link(operator=task, ti_key=ti), oe.xcom_key # type: ignore[arg-type]
1416+
log.debug("Setting xcom for operator extra link", link=link, xcom_key=xcom_key)
1417+
_xcom_push_to_db(ti, key=xcom_key, value=link)
1418+
except Exception:
1419+
log.exception(
1420+
"Failed to push an xcom for task operator extra link",
1421+
link_name=oe.name,
1422+
xcom_key=oe.xcom_key,
1423+
ti=ti,
1424+
)
14171425

14181426
if getattr(ti.task, "overwrite_rtif_after_execution", False):
14191427
log.debug("Overwriting Rendered template fields.")

task-sdk/tests/task_sdk/execution_time/test_task_runner.py

Lines changed: 89 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
from pathlib import Path
2929
from typing import TYPE_CHECKING
3030
from unittest import mock
31-
from unittest.mock import patch
31+
from unittest.mock import call, patch
3232

3333
import pandas as pd
3434
import pytest
@@ -50,6 +50,7 @@
5050
from airflow.sdk import (
5151
DAG,
5252
BaseOperator,
53+
BaseOperatorLink,
5354
Connection,
5455
dag as dag_decorator,
5556
get_current_context,
@@ -1816,6 +1817,93 @@ def execute(self, context):
18161817
map_index=runtime_ti.map_index,
18171818
)
18181819

1820+
def test_task_failed_with_operator_extra_links(
1821+
self, create_runtime_ti, mock_supervisor_comms, time_machine
1822+
):
1823+
"""Test that operator extra links are pushed to xcoms even when task fails."""
1824+
instant = timezone.datetime(2024, 12, 3, 10, 0)
1825+
time_machine.move_to(instant, tick=False)
1826+
1827+
class DummyTestOperator(BaseOperator):
1828+
operator_extra_links = (AirflowLink(),)
1829+
1830+
def execute(self, context):
1831+
raise ValueError("Task failed intentionally")
1832+
1833+
task = DummyTestOperator(task_id="task_with_operator_extra_links")
1834+
runtime_ti = create_runtime_ti(task=task)
1835+
context = runtime_ti.get_template_context()
1836+
runtime_ti.start_date = instant
1837+
runtime_ti.end_date = instant
1838+
1839+
state, _, error = run(runtime_ti, context=context, log=mock.MagicMock())
1840+
assert state == TaskInstanceState.FAILED
1841+
assert error is not None
1842+
1843+
with mock.patch.object(XCom, "_set_xcom_in_db") as mock_xcom_set:
1844+
finalize(
1845+
runtime_ti,
1846+
log=mock.MagicMock(),
1847+
state=TaskInstanceState.FAILED,
1848+
context=context,
1849+
error=error,
1850+
)
1851+
assert mock_xcom_set.mock_calls == [
1852+
call(
1853+
key="_link_AirflowLink",
1854+
value="https://airflow.apache.org",
1855+
dag_id=runtime_ti.dag_id,
1856+
task_id=runtime_ti.task_id,
1857+
run_id=runtime_ti.run_id,
1858+
map_index=runtime_ti.map_index,
1859+
)
1860+
]
1861+
1862+
def test_operator_extra_links_exception_handling(
1863+
self, create_runtime_ti, mock_supervisor_comms, time_machine
1864+
):
1865+
"""Test that exceptions in get_link() don't prevent other links from being pushed."""
1866+
instant = timezone.datetime(2024, 12, 3, 10, 0)
1867+
time_machine.move_to(instant, tick=False)
1868+
1869+
class FailingLink(BaseOperatorLink):
1870+
"""A link that raises an exception when get_link is called."""
1871+
1872+
name = "failing_link"
1873+
1874+
def get_link(self, operator, *, ti_key):
1875+
raise ValueError("Link generation failed")
1876+
1877+
class DummyTestOperator(BaseOperator):
1878+
operator_extra_links = (FailingLink(), AirflowLink())
1879+
1880+
def execute(self, context):
1881+
pass
1882+
1883+
task = DummyTestOperator(task_id="task_with_multiple_links")
1884+
runtime_ti = create_runtime_ti(task=task)
1885+
context = runtime_ti.get_template_context()
1886+
runtime_ti.start_date = instant
1887+
runtime_ti.end_date = instant
1888+
1889+
with mock.patch.object(XCom, "_set_xcom_in_db") as mock_xcom_set:
1890+
finalize(
1891+
runtime_ti,
1892+
log=mock.MagicMock(),
1893+
state=TaskInstanceState.SUCCESS,
1894+
context=context,
1895+
)
1896+
assert mock_xcom_set.mock_calls == [
1897+
call(
1898+
key="_link_AirflowLink",
1899+
value="https://airflow.apache.org",
1900+
dag_id=runtime_ti.dag_id,
1901+
task_id=runtime_ti.task_id,
1902+
run_id=runtime_ti.run_id,
1903+
map_index=runtime_ti.map_index,
1904+
)
1905+
]
1906+
18191907
@pytest.mark.parametrize(
18201908
("cmd", "rendered_cmd"),
18211909
[

0 commit comments

Comments
 (0)