diff --git a/dagster_sqlmesh/controller/dagster.py b/dagster_sqlmesh/controller/dagster.py index c0e0343..9495e1f 100644 --- a/dagster_sqlmesh/controller/dagster.py +++ b/dagster_sqlmesh/controller/dagster.py @@ -7,7 +7,7 @@ ) from dagster_sqlmesh.translator import SQLMeshDagsterTranslator from dagster_sqlmesh.types import ( - ConvertibleToAssetDep, + CoercibleToAssetDep, ConvertibleToAssetOut, SQLMeshModelDep, SQLMeshMultiAssetOptions, @@ -28,7 +28,7 @@ def to_asset_outs( cache is provided, it will be tried first to load the asset outs.""" internal_asset_deps_map: dict[str, set[str]] = {} - deps_map: dict[str, ConvertibleToAssetDep] = {} + deps_map: dict[str, CoercibleToAssetDep] = {} asset_outs: dict[str, ConvertibleToAssetOut] = {} with self.instance(environment, "to_asset_outs") as instance: @@ -59,7 +59,7 @@ def to_asset_outs( internal_asset_deps.add(key) # create an external dep - deps_map[table] = translator.create_asset_dep(key=key) + deps_map[table] = key model_key = translator.get_asset_key_str(model.fqn) asset_outs[model_key] = translator.create_asset_out( diff --git a/dagster_sqlmesh/translator.py b/dagster_sqlmesh/translator.py index 7b3d879..f97fa0a 100644 --- a/dagster_sqlmesh/translator.py +++ b/dagster_sqlmesh/translator.py @@ -2,13 +2,13 @@ from collections.abc import Sequence from inspect import signature -from dagster import AssetDep, AssetKey, AssetOut +from dagster import AssetKey, AssetOut from pydantic import BaseModel, Field from sqlglot import exp from sqlmesh.core.context import Context from sqlmesh.core.model import Model -from .types import ConvertibleToAssetDep, ConvertibleToAssetOut +from .types import ConvertibleToAssetOut class IntermediateAssetOut(BaseModel): @@ -36,14 +36,6 @@ def to_asset_out(self) -> AssetOut: ) -class IntermediateAssetDep(BaseModel): - key: str - kwargs: dict[str, t.Any] = Field(default_factory=dict) - - def to_asset_dep(self) -> AssetDep: - return AssetDep(AssetKey.from_user_string(self.key)) - - class SQLMeshDagsterTranslator: """Translates SQLMesh objects for Dagster. @@ -106,21 +98,6 @@ def get_context_dialect(self, context: Context) -> str: """ return context.engine_adapter.dialect - def create_asset_dep(self, *, key: str, **kwargs: t.Any) -> ConvertibleToAssetDep: - """Create an object that resolves to an AssetDep. - - This creates an intermediate representation that can be converted to a - Dagster AssetDep. Most users will not need to use this method directly. - - Args: - key: The asset key string for the dependency - **kwargs: Additional arguments to pass to the AssetDep - - Returns: - ConvertibleToAssetDep: An object that can be converted to an AssetDep - """ - return IntermediateAssetDep(key=key, kwargs=kwargs) - def create_asset_out( self, *, model_key: str, asset_key: str, **kwargs: t.Any ) -> ConvertibleToAssetOut: diff --git a/dagster_sqlmesh/types.py b/dagster_sqlmesh/types.py index 50187b5..575e927 100644 --- a/dagster_sqlmesh/types.py +++ b/dagster_sqlmesh/types.py @@ -1,9 +1,28 @@ import typing as t +from collections.abc import Sequence from dataclasses import dataclass, field -from dagster import AssetCheckResult, AssetDep, AssetKey, AssetMaterialization, AssetOut +from dagster import ( + AssetCheckResult, + AssetDep, + AssetKey, + AssetMaterialization, + AssetOut, + AssetsDefinition, + AssetSpec, + SourceAsset, +) from sqlmesh.core.model import Model +# Type alias that mirrors Dagster's internal CoercibleToAssetDep +# This uses only public API types and will remain stable across Dagster versions +# Equivalent to: +# CoercibleToAssetDep = Union[CoercibleToAssetKey, "AssetSpec", "AssetsDefinition", "SourceAsset", "AssetDep"] +# CoercibleToAssetKey = Union[AssetKey, str, Sequence[str]] + +CoercibleToAssetDep = AssetKey | str | Sequence[str] | AssetSpec | AssetsDefinition | SourceAsset | AssetDep + + MultiAssetResponse = t.Iterable[AssetCheckResult | AssetMaterialization] @@ -29,20 +48,16 @@ class SQLMeshModelDep: def parse_fqn(self) -> SQLMeshParsedFQN: return SQLMeshParsedFQN.parse(self.fqn) - + + class ConvertibleToAssetOut(t.Protocol): def to_asset_out(self) -> AssetOut: """Convert to an AssetOut object.""" ... -class ConvertibleToAssetDep(t.Protocol): - def to_asset_dep(self) -> AssetDep: - """Convert to an AssetDep object.""" - ... class ConvertibleToAssetKey(t.Protocol): - def to_asset_key(self) -> AssetKey: - ... + def to_asset_key(self) -> AssetKey: ... @dataclass(kw_only=True) class SQLMeshMultiAssetOptions: @@ -53,7 +68,7 @@ class SQLMeshMultiAssetOptions: manipulate the dagster asset creation process as they see fit.""" outs: t.Mapping[str, ConvertibleToAssetOut] = field(default_factory=lambda: {}) - deps: t.Iterable[ConvertibleToAssetDep] = field(default_factory=lambda: []) + deps: t.Iterable[CoercibleToAssetDep] = field(default_factory=lambda: []) internal_asset_deps: t.Mapping[str, set[str]] = field(default_factory=lambda: {}) def to_asset_outs(self) -> t.Mapping[str, AssetOut]: @@ -62,11 +77,17 @@ def to_asset_outs(self) -> t.Mapping[str, AssetOut]: def to_asset_deps(self) -> t.Iterable[AssetDep]: """Convert to an iterable of AssetDep objects.""" - return [dep.to_asset_dep() for dep in self.deps] + all_deps: list[AssetDep] = [] + for dep in self.deps: + if isinstance(dep, AssetsDefinition) and len(dep.keys) > 1: + all_deps.extend([AssetDep.from_coercible(key) for key in dep.keys]) + else: + all_deps.append(AssetDep.from_coercible(dep)) + return all_deps def to_internal_asset_deps(self) -> dict[str, set[AssetKey]]: """Convert to a dictionary of internal asset dependencies.""" return { key: {AssetKey.from_user_string(dep) for dep in deps} for key, deps in self.internal_asset_deps.items() - } \ No newline at end of file + }