Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 52 additions & 62 deletions airflow-core/src/airflow/models/asset.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,12 @@
text,
)
from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy.orm import Mapped, relationship
from sqlalchemy.orm import relationship

from airflow._shared.timezones import timezone
from airflow.models.base import Base, StringID
from airflow.settings import json
from airflow.utils.sqlalchemy import UtcDateTime, mapped_column
from airflow.utils.sqlalchemy import UtcDateTime

if TYPE_CHECKING:
from collections.abc import Iterable
Expand Down Expand Up @@ -140,7 +140,7 @@ def remove_references_to_deleted_dags(session: Session):
class AssetWatcherModel(Base):
"""A table to store asset watchers."""

name: Mapped[str] = mapped_column(
name = Column(
String(length=1500).with_variant(
String(
length=1500,
Expand All @@ -152,8 +152,8 @@ class AssetWatcherModel(Base):
),
nullable=False,
)
asset_id: Mapped[int] = mapped_column(Integer, primary_key=True, nullable=False)
trigger_id: Mapped[int] = mapped_column(Integer, primary_key=True, nullable=False)
asset_id = Column(Integer, primary_key=True, nullable=False)
trigger_id = Column(Integer, primary_key=True, nullable=False)

asset = relationship("AssetModel", back_populates="watchers")
trigger = relationship("Trigger", back_populates="asset_watchers")
Expand Down Expand Up @@ -187,8 +187,8 @@ class AssetAliasModel(Base):
:param uri: a string that uniquely identifies the asset alias
"""

id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
name: Mapped[str] = mapped_column(
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(
String(length=1500).with_variant(
String(
length=1500,
Expand All @@ -200,7 +200,7 @@ class AssetAliasModel(Base):
),
nullable=False,
)
group: Mapped[str] = mapped_column(
group = Column(
String(length=1500).with_variant(
String(
length=1500,
Expand Down Expand Up @@ -263,8 +263,8 @@ class AssetModel(Base):
:param extra: JSON field for arbitrary extra info
"""

id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
name: Mapped[str] = mapped_column(
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(
String(length=1500).with_variant(
String(
length=1500,
Expand All @@ -276,7 +276,7 @@ class AssetModel(Base):
),
nullable=False,
)
uri: Mapped[str] = mapped_column(
uri = Column(
String(length=1500).with_variant(
String(
length=1500,
Expand All @@ -288,7 +288,7 @@ class AssetModel(Base):
),
nullable=False,
)
group: Mapped[str] = mapped_column(
group = Column(
String(length=1500).with_variant(
String(
length=1500,
Expand All @@ -301,12 +301,10 @@ class AssetModel(Base):
default=str,
nullable=False,
)
extra: Mapped[dict] = mapped_column(sqlalchemy_jsonfield.JSONField(json=json), nullable=False, default={})
extra = Column(sqlalchemy_jsonfield.JSONField(json=json), nullable=False, default={})

created_at: Mapped[UtcDateTime] = mapped_column(UtcDateTime, default=timezone.utcnow, nullable=False)
updated_at: Mapped[UtcDateTime] = mapped_column(
UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, nullable=False
)
created_at = Column(UtcDateTime, default=timezone.utcnow, nullable=False)
updated_at = Column(UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, nullable=False)

active = relationship("AssetActive", uselist=False, viewonly=True, back_populates="asset")

Expand Down Expand Up @@ -376,7 +374,7 @@ class AssetActive(Base):
*name and URI are each unique* within active assets.
"""

name: Mapped[str] = mapped_column(
name = Column(
String(length=1500).with_variant(
String(
length=1500,
Expand All @@ -388,7 +386,7 @@ class AssetActive(Base):
),
nullable=False,
)
uri: Mapped[str] = mapped_column(
uri = Column(
String(length=1500).with_variant(
String(
length=1500,
Expand Down Expand Up @@ -424,7 +422,7 @@ def for_asset(cls, asset: AssetModel) -> AssetActive:
class DagScheduleAssetNameReference(Base):
"""Reference from a DAG to an asset name reference of which it is a consumer."""

name: Mapped[str] = mapped_column(
name = Column(
String(length=1500).with_variant(
String(
length=1500,
Expand All @@ -437,8 +435,8 @@ class DagScheduleAssetNameReference(Base):
primary_key=True,
nullable=False,
)
dag_id: Mapped[str] = mapped_column(StringID(), primary_key=True, nullable=False)
created_at: Mapped[UtcDateTime] = mapped_column(UtcDateTime, default=timezone.utcnow, nullable=False)
dag_id = Column(StringID(), primary_key=True, nullable=False)
created_at = Column(UtcDateTime, default=timezone.utcnow, nullable=False)

dag = relationship("DagModel", back_populates="schedule_asset_name_references")

Expand Down Expand Up @@ -470,7 +468,7 @@ def __repr__(self):
class DagScheduleAssetUriReference(Base):
"""Reference from a DAG to an asset URI reference of which it is a consumer."""

uri: Mapped[str] = mapped_column(
uri = Column(
String(length=1500).with_variant(
String(
length=1500,
Expand All @@ -483,8 +481,8 @@ class DagScheduleAssetUriReference(Base):
primary_key=True,
nullable=False,
)
dag_id: Mapped[str] = mapped_column(StringID(), primary_key=True, nullable=False)
created_at: Mapped[UtcDateTime] = mapped_column(UtcDateTime, default=timezone.utcnow, nullable=False)
dag_id = Column(StringID(), primary_key=True, nullable=False)
created_at = Column(UtcDateTime, default=timezone.utcnow, nullable=False)

dag = relationship("DagModel", back_populates="schedule_asset_uri_references")

Expand Down Expand Up @@ -516,12 +514,10 @@ def __repr__(self):
class DagScheduleAssetAliasReference(Base):
"""References from a DAG to an asset alias of which it is a consumer."""

alias_id: Mapped[int] = mapped_column(Integer, primary_key=True, nullable=False)
dag_id: Mapped[str] = mapped_column(StringID(), primary_key=True, nullable=False)
created_at: Mapped[UtcDateTime] = mapped_column(UtcDateTime, default=timezone.utcnow, nullable=False)
updated_at: Mapped[UtcDateTime] = mapped_column(
UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, nullable=False
)
alias_id = Column(Integer, primary_key=True, nullable=False)
dag_id = Column(StringID(), primary_key=True, nullable=False)
created_at = Column(UtcDateTime, default=timezone.utcnow, nullable=False)
updated_at = Column(UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, nullable=False)

asset_alias = relationship("AssetAliasModel", back_populates="scheduled_dags")
dag = relationship("DagModel", back_populates="schedule_asset_alias_references")
Expand Down Expand Up @@ -560,12 +556,10 @@ def __repr__(self):
class DagScheduleAssetReference(Base):
"""References from a DAG to an asset of which it is a consumer."""

asset_id: Mapped[int] = mapped_column(Integer, primary_key=True, nullable=False)
dag_id: Mapped[str] = mapped_column(StringID(), primary_key=True, nullable=False)
created_at: Mapped[UtcDateTime] = mapped_column(UtcDateTime, default=timezone.utcnow, nullable=False)
updated_at: Mapped[UtcDateTime] = mapped_column(
UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, nullable=False
)
asset_id = Column(Integer, primary_key=True, nullable=False)
dag_id = Column(StringID(), primary_key=True, nullable=False)
created_at = Column(UtcDateTime, default=timezone.utcnow, nullable=False)
updated_at = Column(UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, nullable=False)

asset = relationship("AssetModel", back_populates="scheduled_dags")
dag = relationship("DagModel", back_populates="schedule_asset_references")
Expand Down Expand Up @@ -613,13 +607,11 @@ def __repr__(self):
class TaskOutletAssetReference(Base):
"""References from a task to an asset that it updates / produces."""

asset_id: Mapped[int] = mapped_column(Integer, primary_key=True, nullable=False)
dag_id: Mapped[str] = mapped_column(StringID(), primary_key=True, nullable=False)
task_id: Mapped[str] = mapped_column(StringID(), primary_key=True, nullable=False)
created_at: Mapped[UtcDateTime] = mapped_column(UtcDateTime, default=timezone.utcnow, nullable=False)
updated_at: Mapped[UtcDateTime] = mapped_column(
UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, nullable=False
)
asset_id = Column(Integer, primary_key=True, nullable=False)
dag_id = Column(StringID(), primary_key=True, nullable=False)
task_id = Column(StringID(), primary_key=True, nullable=False)
created_at = Column(UtcDateTime, default=timezone.utcnow, nullable=False)
updated_at = Column(UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, nullable=False)

asset = relationship("AssetModel", back_populates="producing_tasks")

Expand Down Expand Up @@ -664,13 +656,11 @@ def __repr__(self):
class TaskInletAssetReference(Base):
"""References from a task to an asset that it references as an inlet."""

asset_id: Mapped[int] = mapped_column(Integer, primary_key=True, nullable=False)
dag_id: Mapped[str] = mapped_column(StringID(), primary_key=True, nullable=False)
task_id: Mapped[str] = mapped_column(StringID(), primary_key=True, nullable=False)
created_at: Mapped[UtcDateTime] = mapped_column(UtcDateTime, default=timezone.utcnow, nullable=False)
updated_at: Mapped[UtcDateTime] = mapped_column(
UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, nullable=False
)
asset_id = Column(Integer, primary_key=True, nullable=False)
dag_id = Column(StringID(), primary_key=True, nullable=False)
task_id = Column(StringID(), primary_key=True, nullable=False)
created_at = Column(UtcDateTime, default=timezone.utcnow, nullable=False)
updated_at = Column(UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, nullable=False)

asset = relationship("AssetModel", back_populates="consuming_tasks")

Expand Down Expand Up @@ -710,9 +700,9 @@ def __repr__(self):
class AssetDagRunQueue(Base):
"""Model for storing asset events that need processing."""

asset_id: Mapped[int] = mapped_column(Integer, primary_key=True, nullable=False)
target_dag_id: Mapped[str] = mapped_column(StringID(), primary_key=True, nullable=False)
created_at: Mapped[UtcDateTime] = mapped_column(UtcDateTime, default=timezone.utcnow, nullable=False)
asset_id = Column(Integer, primary_key=True, nullable=False)
target_dag_id = Column(StringID(), primary_key=True, nullable=False)
created_at = Column(UtcDateTime, default=timezone.utcnow, nullable=False)
asset = relationship("AssetModel", viewonly=True)
dag_model = relationship("DagModel", viewonly=True)

Expand Down Expand Up @@ -775,14 +765,14 @@ class AssetEvent(Base):
if the foreign key object is.
"""

id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
asset_id: Mapped[int] = mapped_column(Integer, nullable=False)
extra: Mapped[dict] = mapped_column(sqlalchemy_jsonfield.JSONField(json=json), nullable=False, default={})
source_task_id: Mapped[str | None] = mapped_column(StringID(), nullable=True)
source_dag_id: Mapped[str | None] = mapped_column(StringID(), nullable=True)
source_run_id: Mapped[str | None] = mapped_column(StringID(), nullable=True)
source_map_index: Mapped[int | None] = mapped_column(Integer, nullable=True, server_default=text("-1"))
timestamp: Mapped[UtcDateTime] = mapped_column(UtcDateTime, default=timezone.utcnow, nullable=False)
id = Column(Integer, primary_key=True, autoincrement=True)
asset_id = Column(Integer, nullable=False)
extra = Column(sqlalchemy_jsonfield.JSONField(json=json), nullable=False, default={})
source_task_id = Column(StringID(), nullable=True)
source_dag_id = Column(StringID(), nullable=True)
source_run_id = Column(StringID(), nullable=True)
source_map_index = Column(Integer, nullable=True, server_default=text("-1"))
timestamp = Column(UtcDateTime, default=timezone.utcnow, nullable=False)

__tablename__ = "asset_event"
__table_args__ = (
Expand Down
45 changes: 21 additions & 24 deletions airflow-core/src/airflow/models/backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

from sqlalchemy import (
Boolean,
Column,
ForeignKeyConstraint,
Integer,
String,
Expand All @@ -39,15 +40,15 @@
select,
)
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Mapped, relationship, validates
from sqlalchemy.orm import relationship, validates
from sqlalchemy_jsonfield import JSONField

from airflow._shared.timezones import timezone
from airflow.exceptions import AirflowException, DagNotFound
from airflow.models.base import Base, StringID
from airflow.settings import json
from airflow.utils.session import create_session
from airflow.utils.sqlalchemy import UtcDateTime, mapped_column, nulls_first, with_row_locks
from airflow.utils.sqlalchemy import UtcDateTime, nulls_first, with_row_locks
from airflow.utils.state import DagRunState
from airflow.utils.types import DagRunTriggeredByType, DagRunType

Expand Down Expand Up @@ -115,27 +116,23 @@ class Backfill(Base):

__tablename__ = "backfill"

id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
dag_id: Mapped[str] = mapped_column(StringID(), nullable=False)
from_date: Mapped[UtcDateTime] = mapped_column(UtcDateTime, nullable=False)
to_date: Mapped[UtcDateTime] = mapped_column(UtcDateTime, nullable=False)
dag_run_conf: Mapped[JSONField] = mapped_column(JSONField(json=json), nullable=False, default={})
is_paused: Mapped[bool] = mapped_column(Boolean, default=False)
id = Column(Integer, primary_key=True, autoincrement=True)
dag_id = Column(StringID(), nullable=False)
from_date = Column(UtcDateTime, nullable=False)
to_date = Column(UtcDateTime, nullable=False)
dag_run_conf = Column(JSONField(json=json), nullable=False, default={})
is_paused = Column(Boolean, default=False)
"""
Controls whether new dag runs will be created for this backfill.

Does not pause existing dag runs.
"""
reprocess_behavior: Mapped[str] = mapped_column(
StringID(), nullable=False, default=ReprocessBehavior.NONE
)
max_active_runs: Mapped[int] = mapped_column(Integer, default=10, nullable=False)
created_at: Mapped[UtcDateTime] = mapped_column(UtcDateTime, default=timezone.utcnow, nullable=False)
completed_at: Mapped[datetime | None] = mapped_column(UtcDateTime, nullable=True)
updated_at: Mapped[UtcDateTime] = mapped_column(
UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, nullable=False
)
triggering_user_name: Mapped[str | None] = mapped_column(
reprocess_behavior = Column(StringID(), nullable=False, default=ReprocessBehavior.NONE)
max_active_runs = Column(Integer, default=10, nullable=False)
created_at = Column(UtcDateTime, default=timezone.utcnow, nullable=False)
completed_at = Column(UtcDateTime, nullable=True)
updated_at = Column(UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, nullable=False)
triggering_user_name = Column(
String(512),
nullable=True,
) # The user that triggered the Backfill, if applicable
Expand Down Expand Up @@ -169,12 +166,12 @@ class BackfillDagRun(Base):
"""Mapping table between backfill run and dag run."""

__tablename__ = "backfill_dag_run"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
backfill_id: Mapped[int] = mapped_column(Integer, nullable=False)
dag_run_id: Mapped[int | None] = mapped_column(Integer, nullable=True)
exception_reason: Mapped[str] = mapped_column(StringID())
logical_date: Mapped[UtcDateTime] = mapped_column(UtcDateTime, nullable=False)
sort_ordinal: Mapped[int] = mapped_column(Integer, nullable=False)
id = Column(Integer, primary_key=True, autoincrement=True)
backfill_id = Column(Integer, nullable=False)
dag_run_id = Column(Integer, nullable=True)
exception_reason = Column(StringID())
logical_date = Column(UtcDateTime, nullable=False)
sort_ordinal = Column(Integer, nullable=False)

backfill = relationship("Backfill", back_populates="backfill_dag_run_associations")
dag_run = relationship("DagRun")
Expand Down
14 changes: 7 additions & 7 deletions airflow-core/src/airflow/models/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@

from typing import TYPE_CHECKING, Any

from sqlalchemy import Integer, MetaData, String, text
from sqlalchemy.orm import Mapped, registry
from sqlalchemy import Column, Integer, MetaData, String, text
from sqlalchemy.orm import registry

from airflow.configuration import conf
from airflow.utils.sqlalchemy import is_sqlalchemy_v1, mapped_column
from airflow.utils.sqlalchemy import is_sqlalchemy_v1

SQL_ALCHEMY_SCHEMA = conf.get("database", "SQL_ALCHEMY_SCHEMA")

Expand Down Expand Up @@ -94,7 +94,7 @@ class TaskInstanceDependencies(Base):

__abstract__ = True

task_id: Mapped[str] = mapped_column(StringID(), nullable=False)
dag_id: Mapped[str] = mapped_column(StringID(), nullable=False)
run_id: Mapped[str] = mapped_column(StringID(), nullable=False)
map_index: Mapped[int] = mapped_column(Integer, nullable=False, server_default=text("-1"))
task_id = Column(StringID(), nullable=False)
dag_id = Column(StringID(), nullable=False)
run_id = Column(StringID(), nullable=False)
map_index = Column(Integer, nullable=False, server_default=text("-1"))
Loading
Loading