Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions airflow-core/src/airflow/models/dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@

from opentelemetry.sdk.trace import Span
from pydantic import NonNegativeInt
from sqlalchemy.orm import Query, Session
from sqlalchemy.engine import ScalarResult
from sqlalchemy.orm import Session
from sqlalchemy.sql.elements import Case, ColumnElement

from airflow.models.dag_version import DagVersion
Expand Down Expand Up @@ -572,11 +573,11 @@ def active_runs_of_dags(
)
if exclude_backfill:
query = query.where(cls.run_type != DagRunType.BACKFILL_JOB)
return dict(session.execute(query).all())
return {dag_id: count for dag_id, count in session.execute(query)}

@classmethod
@retry_db_transaction
def get_running_dag_runs_to_examine(cls, session: Session) -> Query:
def get_running_dag_runs_to_examine(cls, session: Session) -> ScalarResult[DagRun]:
"""
Return the next DagRuns that the scheduler should attempt to schedule.

Expand Down Expand Up @@ -615,7 +616,7 @@ def get_running_dag_runs_to_examine(cls, session: Session) -> Query:

@classmethod
@retry_db_transaction
def get_queued_dag_runs_to_set_running(cls, session: Session) -> Query:
def get_queued_dag_runs_to_set_running(cls, session: Session) -> ScalarResult[DagRun]:
"""
Return the next queued DagRuns that the scheduler should attempt to schedule.

Expand Down