Skip to content

Commit 6eac1cb

Browse files
vincbeckGitOps Bot
authored andcommitted
Revert "Update all models to use sqlalchemy 2 annotations (apache#55954)" (apache#56296)
This reverts commit 3150430.
1 parent 0f3b841 commit 6eac1cb

30 files changed

+433
-515
lines changed

airflow-core/src/airflow/models/asset.py

Lines changed: 52 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,12 @@
3535
text,
3636
)
3737
from sqlalchemy.ext.associationproxy import association_proxy
38-
from sqlalchemy.orm import Mapped, relationship
38+
from sqlalchemy.orm import relationship
3939

4040
from airflow._shared.timezones import timezone
4141
from airflow.models.base import Base, StringID
4242
from airflow.settings import json
43-
from airflow.utils.sqlalchemy import UtcDateTime, mapped_column
43+
from airflow.utils.sqlalchemy import UtcDateTime
4444

4545
if TYPE_CHECKING:
4646
from collections.abc import Iterable
@@ -140,7 +140,7 @@ def remove_references_to_deleted_dags(session: Session):
140140
class AssetWatcherModel(Base):
141141
"""A table to store asset watchers."""
142142

143-
name: Mapped[str] = mapped_column(
143+
name = Column(
144144
String(length=1500).with_variant(
145145
String(
146146
length=1500,
@@ -152,8 +152,8 @@ class AssetWatcherModel(Base):
152152
),
153153
nullable=False,
154154
)
155-
asset_id: Mapped[int] = mapped_column(Integer, primary_key=True, nullable=False)
156-
trigger_id: Mapped[int] = mapped_column(Integer, primary_key=True, nullable=False)
155+
asset_id = Column(Integer, primary_key=True, nullable=False)
156+
trigger_id = Column(Integer, primary_key=True, nullable=False)
157157

158158
asset = relationship("AssetModel", back_populates="watchers")
159159
trigger = relationship("Trigger", back_populates="asset_watchers")
@@ -187,8 +187,8 @@ class AssetAliasModel(Base):
187187
:param uri: a string that uniquely identifies the asset alias
188188
"""
189189

190-
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
191-
name: Mapped[str] = mapped_column(
190+
id = Column(Integer, primary_key=True, autoincrement=True)
191+
name = Column(
192192
String(length=1500).with_variant(
193193
String(
194194
length=1500,
@@ -200,7 +200,7 @@ class AssetAliasModel(Base):
200200
),
201201
nullable=False,
202202
)
203-
group: Mapped[str] = mapped_column(
203+
group = Column(
204204
String(length=1500).with_variant(
205205
String(
206206
length=1500,
@@ -263,8 +263,8 @@ class AssetModel(Base):
263263
:param extra: JSON field for arbitrary extra info
264264
"""
265265

266-
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
267-
name: Mapped[str] = mapped_column(
266+
id = Column(Integer, primary_key=True, autoincrement=True)
267+
name = Column(
268268
String(length=1500).with_variant(
269269
String(
270270
length=1500,
@@ -276,7 +276,7 @@ class AssetModel(Base):
276276
),
277277
nullable=False,
278278
)
279-
uri: Mapped[str] = mapped_column(
279+
uri = Column(
280280
String(length=1500).with_variant(
281281
String(
282282
length=1500,
@@ -288,7 +288,7 @@ class AssetModel(Base):
288288
),
289289
nullable=False,
290290
)
291-
group: Mapped[str] = mapped_column(
291+
group = Column(
292292
String(length=1500).with_variant(
293293
String(
294294
length=1500,
@@ -301,12 +301,10 @@ class AssetModel(Base):
301301
default=str,
302302
nullable=False,
303303
)
304-
extra: Mapped[dict] = mapped_column(sqlalchemy_jsonfield.JSONField(json=json), nullable=False, default={})
304+
extra = Column(sqlalchemy_jsonfield.JSONField(json=json), nullable=False, default={})
305305

306-
created_at: Mapped[UtcDateTime] = mapped_column(UtcDateTime, default=timezone.utcnow, nullable=False)
307-
updated_at: Mapped[UtcDateTime] = mapped_column(
308-
UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, nullable=False
309-
)
306+
created_at = Column(UtcDateTime, default=timezone.utcnow, nullable=False)
307+
updated_at = Column(UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, nullable=False)
310308

311309
active = relationship("AssetActive", uselist=False, viewonly=True, back_populates="asset")
312310

@@ -376,7 +374,7 @@ class AssetActive(Base):
376374
*name and URI are each unique* within active assets.
377375
"""
378376

379-
name: Mapped[str] = mapped_column(
377+
name = Column(
380378
String(length=1500).with_variant(
381379
String(
382380
length=1500,
@@ -388,7 +386,7 @@ class AssetActive(Base):
388386
),
389387
nullable=False,
390388
)
391-
uri: Mapped[str] = mapped_column(
389+
uri = Column(
392390
String(length=1500).with_variant(
393391
String(
394392
length=1500,
@@ -424,7 +422,7 @@ def for_asset(cls, asset: AssetModel) -> AssetActive:
424422
class DagScheduleAssetNameReference(Base):
425423
"""Reference from a DAG to an asset name reference of which it is a consumer."""
426424

427-
name: Mapped[str] = mapped_column(
425+
name = Column(
428426
String(length=1500).with_variant(
429427
String(
430428
length=1500,
@@ -437,8 +435,8 @@ class DagScheduleAssetNameReference(Base):
437435
primary_key=True,
438436
nullable=False,
439437
)
440-
dag_id: Mapped[str] = mapped_column(StringID(), primary_key=True, nullable=False)
441-
created_at: Mapped[UtcDateTime] = mapped_column(UtcDateTime, default=timezone.utcnow, nullable=False)
438+
dag_id = Column(StringID(), primary_key=True, nullable=False)
439+
created_at = Column(UtcDateTime, default=timezone.utcnow, nullable=False)
442440

443441
dag = relationship("DagModel", back_populates="schedule_asset_name_references")
444442

@@ -470,7 +468,7 @@ def __repr__(self):
470468
class DagScheduleAssetUriReference(Base):
471469
"""Reference from a DAG to an asset URI reference of which it is a consumer."""
472470

473-
uri: Mapped[str] = mapped_column(
471+
uri = Column(
474472
String(length=1500).with_variant(
475473
String(
476474
length=1500,
@@ -483,8 +481,8 @@ class DagScheduleAssetUriReference(Base):
483481
primary_key=True,
484482
nullable=False,
485483
)
486-
dag_id: Mapped[str] = mapped_column(StringID(), primary_key=True, nullable=False)
487-
created_at: Mapped[UtcDateTime] = mapped_column(UtcDateTime, default=timezone.utcnow, nullable=False)
484+
dag_id = Column(StringID(), primary_key=True, nullable=False)
485+
created_at = Column(UtcDateTime, default=timezone.utcnow, nullable=False)
488486

489487
dag = relationship("DagModel", back_populates="schedule_asset_uri_references")
490488

@@ -516,12 +514,10 @@ def __repr__(self):
516514
class DagScheduleAssetAliasReference(Base):
517515
"""References from a DAG to an asset alias of which it is a consumer."""
518516

519-
alias_id: Mapped[int] = mapped_column(Integer, primary_key=True, nullable=False)
520-
dag_id: Mapped[str] = mapped_column(StringID(), primary_key=True, nullable=False)
521-
created_at: Mapped[UtcDateTime] = mapped_column(UtcDateTime, default=timezone.utcnow, nullable=False)
522-
updated_at: Mapped[UtcDateTime] = mapped_column(
523-
UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, nullable=False
524-
)
517+
alias_id = Column(Integer, primary_key=True, nullable=False)
518+
dag_id = Column(StringID(), primary_key=True, nullable=False)
519+
created_at = Column(UtcDateTime, default=timezone.utcnow, nullable=False)
520+
updated_at = Column(UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, nullable=False)
525521

526522
asset_alias = relationship("AssetAliasModel", back_populates="scheduled_dags")
527523
dag = relationship("DagModel", back_populates="schedule_asset_alias_references")
@@ -560,12 +556,10 @@ def __repr__(self):
560556
class DagScheduleAssetReference(Base):
561557
"""References from a DAG to an asset of which it is a consumer."""
562558

563-
asset_id: Mapped[int] = mapped_column(Integer, primary_key=True, nullable=False)
564-
dag_id: Mapped[str] = mapped_column(StringID(), primary_key=True, nullable=False)
565-
created_at: Mapped[UtcDateTime] = mapped_column(UtcDateTime, default=timezone.utcnow, nullable=False)
566-
updated_at: Mapped[UtcDateTime] = mapped_column(
567-
UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, nullable=False
568-
)
559+
asset_id = Column(Integer, primary_key=True, nullable=False)
560+
dag_id = Column(StringID(), primary_key=True, nullable=False)
561+
created_at = Column(UtcDateTime, default=timezone.utcnow, nullable=False)
562+
updated_at = Column(UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, nullable=False)
569563

570564
asset = relationship("AssetModel", back_populates="scheduled_dags")
571565
dag = relationship("DagModel", back_populates="schedule_asset_references")
@@ -613,13 +607,11 @@ def __repr__(self):
613607
class TaskOutletAssetReference(Base):
614608
"""References from a task to an asset that it updates / produces."""
615609

616-
asset_id: Mapped[int] = mapped_column(Integer, primary_key=True, nullable=False)
617-
dag_id: Mapped[str] = mapped_column(StringID(), primary_key=True, nullable=False)
618-
task_id: Mapped[str] = mapped_column(StringID(), primary_key=True, nullable=False)
619-
created_at: Mapped[UtcDateTime] = mapped_column(UtcDateTime, default=timezone.utcnow, nullable=False)
620-
updated_at: Mapped[UtcDateTime] = mapped_column(
621-
UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, nullable=False
622-
)
610+
asset_id = Column(Integer, primary_key=True, nullable=False)
611+
dag_id = Column(StringID(), primary_key=True, nullable=False)
612+
task_id = Column(StringID(), primary_key=True, nullable=False)
613+
created_at = Column(UtcDateTime, default=timezone.utcnow, nullable=False)
614+
updated_at = Column(UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, nullable=False)
623615

624616
asset = relationship("AssetModel", back_populates="producing_tasks")
625617

@@ -664,13 +656,11 @@ def __repr__(self):
664656
class TaskInletAssetReference(Base):
665657
"""References from a task to an asset that it references as an inlet."""
666658

667-
asset_id: Mapped[int] = mapped_column(Integer, primary_key=True, nullable=False)
668-
dag_id: Mapped[str] = mapped_column(StringID(), primary_key=True, nullable=False)
669-
task_id: Mapped[str] = mapped_column(StringID(), primary_key=True, nullable=False)
670-
created_at: Mapped[UtcDateTime] = mapped_column(UtcDateTime, default=timezone.utcnow, nullable=False)
671-
updated_at: Mapped[UtcDateTime] = mapped_column(
672-
UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, nullable=False
673-
)
659+
asset_id = Column(Integer, primary_key=True, nullable=False)
660+
dag_id = Column(StringID(), primary_key=True, nullable=False)
661+
task_id = Column(StringID(), primary_key=True, nullable=False)
662+
created_at = Column(UtcDateTime, default=timezone.utcnow, nullable=False)
663+
updated_at = Column(UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, nullable=False)
674664

675665
asset = relationship("AssetModel", back_populates="consuming_tasks")
676666

@@ -710,9 +700,9 @@ def __repr__(self):
710700
class AssetDagRunQueue(Base):
711701
"""Model for storing asset events that need processing."""
712702

713-
asset_id: Mapped[int] = mapped_column(Integer, primary_key=True, nullable=False)
714-
target_dag_id: Mapped[str] = mapped_column(StringID(), primary_key=True, nullable=False)
715-
created_at: Mapped[UtcDateTime] = mapped_column(UtcDateTime, default=timezone.utcnow, nullable=False)
703+
asset_id = Column(Integer, primary_key=True, nullable=False)
704+
target_dag_id = Column(StringID(), primary_key=True, nullable=False)
705+
created_at = Column(UtcDateTime, default=timezone.utcnow, nullable=False)
716706
asset = relationship("AssetModel", viewonly=True)
717707
dag_model = relationship("DagModel", viewonly=True)
718708

@@ -775,14 +765,14 @@ class AssetEvent(Base):
775765
if the foreign key object is.
776766
"""
777767

778-
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
779-
asset_id: Mapped[int] = mapped_column(Integer, nullable=False)
780-
extra: Mapped[dict] = mapped_column(sqlalchemy_jsonfield.JSONField(json=json), nullable=False, default={})
781-
source_task_id: Mapped[str | None] = mapped_column(StringID(), nullable=True)
782-
source_dag_id: Mapped[str | None] = mapped_column(StringID(), nullable=True)
783-
source_run_id: Mapped[str | None] = mapped_column(StringID(), nullable=True)
784-
source_map_index: Mapped[int | None] = mapped_column(Integer, nullable=True, server_default=text("-1"))
785-
timestamp: Mapped[UtcDateTime] = mapped_column(UtcDateTime, default=timezone.utcnow, nullable=False)
768+
id = Column(Integer, primary_key=True, autoincrement=True)
769+
asset_id = Column(Integer, nullable=False)
770+
extra = Column(sqlalchemy_jsonfield.JSONField(json=json), nullable=False, default={})
771+
source_task_id = Column(StringID(), nullable=True)
772+
source_dag_id = Column(StringID(), nullable=True)
773+
source_run_id = Column(StringID(), nullable=True)
774+
source_map_index = Column(Integer, nullable=True, server_default=text("-1"))
775+
timestamp = Column(UtcDateTime, default=timezone.utcnow, nullable=False)
786776

787777
__tablename__ = "asset_event"
788778
__table_args__ = (

airflow-core/src/airflow/models/backfill.py

Lines changed: 21 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030

3131
from sqlalchemy import (
3232
Boolean,
33+
Column,
3334
ForeignKeyConstraint,
3435
Integer,
3536
String,
@@ -39,15 +40,15 @@
3940
select,
4041
)
4142
from sqlalchemy.exc import IntegrityError
42-
from sqlalchemy.orm import Mapped, relationship, validates
43+
from sqlalchemy.orm import relationship, validates
4344
from sqlalchemy_jsonfield import JSONField
4445

4546
from airflow._shared.timezones import timezone
4647
from airflow.exceptions import AirflowException, DagNotFound
4748
from airflow.models.base import Base, StringID
4849
from airflow.settings import json
4950
from airflow.utils.session import create_session
50-
from airflow.utils.sqlalchemy import UtcDateTime, mapped_column, nulls_first, with_row_locks
51+
from airflow.utils.sqlalchemy import UtcDateTime, nulls_first, with_row_locks
5152
from airflow.utils.state import DagRunState
5253
from airflow.utils.types import DagRunTriggeredByType, DagRunType
5354

@@ -115,27 +116,23 @@ class Backfill(Base):
115116

116117
__tablename__ = "backfill"
117118

118-
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
119-
dag_id: Mapped[str] = mapped_column(StringID(), nullable=False)
120-
from_date: Mapped[UtcDateTime] = mapped_column(UtcDateTime, nullable=False)
121-
to_date: Mapped[UtcDateTime] = mapped_column(UtcDateTime, nullable=False)
122-
dag_run_conf: Mapped[JSONField] = mapped_column(JSONField(json=json), nullable=False, default={})
123-
is_paused: Mapped[bool] = mapped_column(Boolean, default=False)
119+
id = Column(Integer, primary_key=True, autoincrement=True)
120+
dag_id = Column(StringID(), nullable=False)
121+
from_date = Column(UtcDateTime, nullable=False)
122+
to_date = Column(UtcDateTime, nullable=False)
123+
dag_run_conf = Column(JSONField(json=json), nullable=False, default={})
124+
is_paused = Column(Boolean, default=False)
124125
"""
125126
Controls whether new dag runs will be created for this backfill.
126127
127128
Does not pause existing dag runs.
128129
"""
129-
reprocess_behavior: Mapped[str] = mapped_column(
130-
StringID(), nullable=False, default=ReprocessBehavior.NONE
131-
)
132-
max_active_runs: Mapped[int] = mapped_column(Integer, default=10, nullable=False)
133-
created_at: Mapped[UtcDateTime] = mapped_column(UtcDateTime, default=timezone.utcnow, nullable=False)
134-
completed_at: Mapped[datetime | None] = mapped_column(UtcDateTime, nullable=True)
135-
updated_at: Mapped[UtcDateTime] = mapped_column(
136-
UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, nullable=False
137-
)
138-
triggering_user_name: Mapped[str | None] = mapped_column(
130+
reprocess_behavior = Column(StringID(), nullable=False, default=ReprocessBehavior.NONE)
131+
max_active_runs = Column(Integer, default=10, nullable=False)
132+
created_at = Column(UtcDateTime, default=timezone.utcnow, nullable=False)
133+
completed_at = Column(UtcDateTime, nullable=True)
134+
updated_at = Column(UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, nullable=False)
135+
triggering_user_name = Column(
139136
String(512),
140137
nullable=True,
141138
) # The user that triggered the Backfill, if applicable
@@ -169,12 +166,12 @@ class BackfillDagRun(Base):
169166
"""Mapping table between backfill run and dag run."""
170167

171168
__tablename__ = "backfill_dag_run"
172-
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
173-
backfill_id: Mapped[int] = mapped_column(Integer, nullable=False)
174-
dag_run_id: Mapped[int | None] = mapped_column(Integer, nullable=True)
175-
exception_reason: Mapped[str] = mapped_column(StringID())
176-
logical_date: Mapped[UtcDateTime] = mapped_column(UtcDateTime, nullable=False)
177-
sort_ordinal: Mapped[int] = mapped_column(Integer, nullable=False)
169+
id = Column(Integer, primary_key=True, autoincrement=True)
170+
backfill_id = Column(Integer, nullable=False)
171+
dag_run_id = Column(Integer, nullable=True)
172+
exception_reason = Column(StringID())
173+
logical_date = Column(UtcDateTime, nullable=False)
174+
sort_ordinal = Column(Integer, nullable=False)
178175

179176
backfill = relationship("Backfill", back_populates="backfill_dag_run_associations")
180177
dag_run = relationship("DagRun")

airflow-core/src/airflow/models/base.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,11 @@
1919

2020
from typing import TYPE_CHECKING, Any
2121

22-
from sqlalchemy import Integer, MetaData, String, text
23-
from sqlalchemy.orm import Mapped, registry
22+
from sqlalchemy import Column, Integer, MetaData, String, text
23+
from sqlalchemy.orm import registry
2424

2525
from airflow.configuration import conf
26-
from airflow.utils.sqlalchemy import is_sqlalchemy_v1, mapped_column
26+
from airflow.utils.sqlalchemy import is_sqlalchemy_v1
2727

2828
SQL_ALCHEMY_SCHEMA = conf.get("database", "SQL_ALCHEMY_SCHEMA")
2929

@@ -94,7 +94,7 @@ class TaskInstanceDependencies(Base):
9494

9595
__abstract__ = True
9696

97-
task_id: Mapped[str] = mapped_column(StringID(), nullable=False)
98-
dag_id: Mapped[str] = mapped_column(StringID(), nullable=False)
99-
run_id: Mapped[str] = mapped_column(StringID(), nullable=False)
100-
map_index: Mapped[int] = mapped_column(Integer, nullable=False, server_default=text("-1"))
97+
task_id = Column(StringID(), nullable=False)
98+
dag_id = Column(StringID(), nullable=False)
99+
run_id = Column(StringID(), nullable=False)
100+
map_index = Column(Integer, nullable=False, server_default=text("-1"))

0 commit comments

Comments
 (0)