Skip to content

Commit 0f3b841

Browse files
ephraimbuddy authored and GitOps Bot committed
UI: Fix Grid for cleared runs when tasks were removed (apache#56085)
Ensure removed/historical tasks from selected runs are visible in Grid even if they no longer exist in the current DAG version. We now: (1) include synthetic leaf nodes for task_ids present in TIs but missing from the serialized DAG in both grid/structure and grid/ti_summaries; (2) aggregate TI states for these synthetic nodes. Add tests covering structure and TI summaries for removed tasks.
1 parent acc6a94 commit 0f3b841

File tree

3 files changed

+119
-11
lines changed
  • airflow-core

3 files changed

+119
-11
lines changed

airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py

Lines changed: 68 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
from __future__ import annotations
1919

2020
import collections
21-
from typing import TYPE_CHECKING, Annotated
21+
from typing import TYPE_CHECKING, Annotated, Any
2222

2323
import structlog
2424
from fastapi import Depends, HTTPException, status
@@ -49,6 +49,7 @@
4949
from airflow.api_fastapi.core_api.security import requires_access_dag
5050
from airflow.api_fastapi.core_api.services.ui.grid import (
5151
_find_aggregates,
52+
_get_aggs_for_node,
5253
_merge_node_dicts,
5354
)
5455
from airflow.api_fastapi.core_api.services.ui.task_group import (
@@ -158,7 +159,7 @@ def get_dag_structure(
158159
task_group_sort = get_task_group_children_getter()
159160
if not run_ids:
160161
nodes = [task_group_to_dict_grid(x) for x in task_group_sort(latest_dag.task_group)]
161-
return nodes
162+
return [GridNodeResponse(**n) for n in nodes]
162163

163164
serdags = session.scalars(
164165
select(SerializedDagModel).where(
@@ -173,7 +174,7 @@ def get_dag_structure(
173174
),
174175
)
175176
)
176-
merged_nodes: list[GridNodeResponse] = []
177+
merged_nodes: list[dict[str, Any]] = []
177178
dags = [latest_dag]
178179
for serdag in serdags:
179180
if serdag:
@@ -182,7 +183,37 @@ def get_dag_structure(
182183
nodes = [task_group_to_dict_grid(x) for x in task_group_sort(dag.task_group)]
183184
_merge_node_dicts(merged_nodes, nodes)
184185

185-
return merged_nodes
186+
# Ensure historical tasks (e.g. removed) that exist in TIs for the selected runs are represented
187+
def _collect_ids(nodes: list[dict[str, Any]]) -> set[str]:
188+
ids: set[str] = set()
189+
for n in nodes:
190+
nid = n.get("id")
191+
if nid:
192+
ids.add(nid)
193+
children = n.get("children")
194+
if children:
195+
ids |= _collect_ids(children) # recurse
196+
return ids
197+
198+
existing_ids = _collect_ids(merged_nodes)
199+
historical_task_ids = session.scalars(
200+
select(TaskInstance.task_id)
201+
.join(TaskInstance.dag_run)
202+
.where(TaskInstance.dag_id == dag_id, DagRun.id.in_(run_ids))
203+
.distinct()
204+
)
205+
for task_id in historical_task_ids:
206+
if task_id not in existing_ids:
207+
merged_nodes.append(
208+
{
209+
"id": task_id,
210+
"label": task_id,
211+
"is_mapped": None,
212+
"children": None,
213+
}
214+
)
215+
216+
return [GridNodeResponse(**n) for n in merged_nodes]
186217

187218

188219
@grid_router.get(
@@ -349,19 +380,47 @@ def get_grid_ti_summaries(
349380
assert serdag
350381

351382
def get_node_sumaries():
383+
yielded_task_ids: set[str] = set()
384+
385+
# Yield all nodes discoverable from the serialized DAG structure
352386
for node in _find_aggregates(
353387
node=serdag.dag.task_group,
354388
parent_node=None,
355389
ti_details=ti_details,
356390
):
357-
if node["type"] == "task":
358-
node["child_states"] = None
359-
node["min_start_date"] = None
360-
node["max_end_date"] = None
391+
if node["type"] in {"task", "mapped_task"}:
392+
yielded_task_ids.add(node["task_id"])
393+
if node["type"] == "task":
394+
node["child_states"] = None
395+
node["min_start_date"] = None
396+
node["max_end_date"] = None
361397
yield node
362398

399+
# For good history: add synthetic leaf nodes for task_ids that have TIs in this run
400+
# but are not present in the current DAG structure (e.g. removed tasks)
401+
missing_task_ids = set(ti_details.keys()) - yielded_task_ids
402+
for task_id in sorted(missing_task_ids):
403+
detail = ti_details[task_id]
404+
# Create a leaf task node with aggregated state from its TIs
405+
agg = _get_aggs_for_node(detail)
406+
yield {
407+
"task_id": task_id,
408+
"type": "task",
409+
"parent_id": None,
410+
**agg,
411+
# Align with leaf behavior
412+
"child_states": None,
413+
"min_start_date": None,
414+
"max_end_date": None,
415+
}
416+
417+
task_instances = list(get_node_sumaries())
418+
# If a group id and a task id collide, prefer the group record
419+
group_ids = {n.get("task_id") for n in task_instances if n.get("type") == "group"}
420+
filtered = [n for n in task_instances if not (n.get("type") == "task" and n.get("task_id") in group_ids)]
421+
363422
return { # type: ignore[return-value]
364423
"run_id": run_id,
365424
"dag_id": dag_id,
366-
"task_instances": list(get_node_sumaries()),
425+
"task_instances": filtered,
367426
}

airflow-core/src/airflow/api_fastapi/core_api/services/ui/grid.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,16 +85,19 @@ def _find_aggregates(
8585
"""Recursively fill the Task Group Map."""
8686
node_id = node.node_id
8787
parent_id = parent_node.node_id if parent_node else None
88-
details = ti_details[node_id]
88+
# Do not mutate ti_details by accidental key creation
89+
details = ti_details.get(node_id, [])
8990

9091
if node is None:
9192
return
9293
if isinstance(node, MappedOperator):
94+
# For unmapped tasks, reflect a single None state so UI shows one square
95+
mapped_details = details or [{"state": None, "start_date": None, "end_date": None}]
9396
yield {
9497
"task_id": node_id,
9598
"type": "mapped_task",
9699
"parent_id": parent_id,
97-
**_get_aggs_for_node(details),
100+
**_get_aggs_for_node(mapped_details),
98101
}
99102

100103
return

airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_grid.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -436,6 +436,39 @@ def test_should_response_200_with_deleted_task_and_taskgroup(self, session, test
436436
},
437437
]
438438

439+
# Also verify that TI summaries include a leaf entry for the removed task
440+
ti_resp = test_client.get(f"/grid/ti_summaries/{DAG_ID_3}/run_3")
441+
assert ti_resp.status_code == 200
442+
ti_payload = ti_resp.json()
443+
assert ti_payload["dag_id"] == DAG_ID_3
444+
assert ti_payload["run_id"] == "run_3"
445+
# Find the removed task summary; it should exist even if not in current serialized DAG structure
446+
removed_ti = next(
447+
(
448+
n
449+
for n in ti_payload["task_instances"]
450+
if n["task_id"] == TASK_ID_4 and n["child_states"] is None
451+
),
452+
None,
453+
)
454+
assert removed_ti is not None
455+
# Its state should be the aggregated state of its TIs, which includes 'removed'
456+
assert removed_ti["state"] in (
457+
"removed",
458+
None,
459+
"skipped",
460+
"success",
461+
"failed",
462+
"running",
463+
"queued",
464+
"scheduled",
465+
"deferred",
466+
"restarting",
467+
"up_for_retry",
468+
"up_for_reschedule",
469+
"upstream_failed",
470+
)
471+
439472
def test_get_dag_structure(self, session, test_client):
440473
session.commit()
441474
response = test_client.get(f"/grid/structure/{DAG_ID}?limit=5")
@@ -704,3 +737,16 @@ def sort_dict(in_dict):
704737
expected = sort_dict(expected)
705738
actual = sort_dict(actual)
706739
assert actual == expected
740+
741+
def test_structure_includes_historical_removed_task_with_proper_shape(self, session, test_client):
742+
# Ensure the structure endpoint returns synthetic node for historical/removed task
743+
response = test_client.get(f"/grid/structure/{DAG_ID_3}")
744+
assert response.status_code == 200
745+
nodes = response.json()
746+
# Find the historical removed task id
747+
t4 = next((n for n in nodes if n["id"] == TASK_ID_4), None)
748+
assert t4 is not None
749+
assert t4["label"] == TASK_ID_4
750+
# Optional None fields are excluded from response due to response_model_exclude_none=True
751+
assert "is_mapped" not in t4
752+
assert "children" not in t4

0 commit comments

Comments
 (0)