Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions dagster_sqlmesh/controller/dagster.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
)
from dagster_sqlmesh.translator import SQLMeshDagsterTranslator
from dagster_sqlmesh.types import (
ConvertibleToAssetDep,
CoercibleToAssetDep,
ConvertibleToAssetOut,
SQLMeshModelDep,
SQLMeshMultiAssetOptions,
Expand All @@ -28,7 +28,7 @@ def to_asset_outs(
cache is provided, it will be tried first to load the asset outs."""

internal_asset_deps_map: dict[str, set[str]] = {}
deps_map: dict[str, ConvertibleToAssetDep] = {}
deps_map: dict[str, CoercibleToAssetDep] = {}
asset_outs: dict[str, ConvertibleToAssetOut] = {}

with self.instance(environment, "to_asset_outs") as instance:
Expand Down Expand Up @@ -59,7 +59,7 @@ def to_asset_outs(
internal_asset_deps.add(key)

# create an external dep
deps_map[table] = translator.create_asset_dep(key=key)
deps_map[table] = key

model_key = translator.get_asset_key_str(model.fqn)
asset_outs[model_key] = translator.create_asset_out(
Expand Down
27 changes: 2 additions & 25 deletions dagster_sqlmesh/translator.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
from collections.abc import Sequence
from inspect import signature

from dagster import AssetDep, AssetKey, AssetOut
from dagster import AssetKey, AssetOut
from pydantic import BaseModel, Field
from sqlglot import exp
from sqlmesh.core.context import Context
from sqlmesh.core.model import Model

from .types import ConvertibleToAssetDep, ConvertibleToAssetOut
from .types import ConvertibleToAssetOut


class IntermediateAssetOut(BaseModel):
Expand Down Expand Up @@ -36,14 +36,6 @@ def to_asset_out(self) -> AssetOut:
)


class IntermediateAssetDep(BaseModel):
key: str
kwargs: dict[str, t.Any] = Field(default_factory=dict)

def to_asset_dep(self) -> AssetDep:
return AssetDep(AssetKey.from_user_string(self.key))


class SQLMeshDagsterTranslator:
"""Translates SQLMesh objects for Dagster.

Expand Down Expand Up @@ -106,21 +98,6 @@ def get_context_dialect(self, context: Context) -> str:
"""
return context.engine_adapter.dialect

def create_asset_dep(self, *, key: str, **kwargs: t.Any) -> ConvertibleToAssetDep:
"""Create an object that resolves to an AssetDep.

This creates an intermediate representation that can be converted to a
Dagster AssetDep. Most users will not need to use this method directly.

Args:
key: The asset key string for the dependency
**kwargs: Additional arguments to pass to the AssetDep

Returns:
ConvertibleToAssetDep: An object that can be converted to an AssetDep
"""
return IntermediateAssetDep(key=key, kwargs=kwargs)

def create_asset_out(
self, *, model_key: str, asset_key: str, **kwargs: t.Any
) -> ConvertibleToAssetOut:
Expand Down
43 changes: 32 additions & 11 deletions dagster_sqlmesh/types.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,28 @@
import typing as t
from collections.abc import Sequence
from dataclasses import dataclass, field

from dagster import AssetCheckResult, AssetDep, AssetKey, AssetMaterialization, AssetOut
from dagster import (
AssetCheckResult,
AssetDep,
AssetKey,
AssetMaterialization,
AssetOut,
AssetsDefinition,
AssetSpec,
SourceAsset,
)
from sqlmesh.core.model import Model

# Type alias that mirrors Dagster's internal CoercibleToAssetDep
# This uses only public API types and will remain stable across Dagster versions
# Equivalent to:
# CoercibleToAssetDep = Union[CoercibleToAssetKey, "AssetSpec", "AssetsDefinition", "SourceAsset", "AssetDep"]
# CoercibleToAssetKey = Union[AssetKey, str, Sequence[str]]

CoercibleToAssetDep = AssetKey | str | Sequence[str] | AssetSpec | AssetsDefinition | SourceAsset | AssetDep


MultiAssetResponse = t.Iterable[AssetCheckResult | AssetMaterialization]


Expand All @@ -29,20 +48,16 @@ class SQLMeshModelDep:

def parse_fqn(self) -> SQLMeshParsedFQN:
return SQLMeshParsedFQN.parse(self.fqn)



class ConvertibleToAssetOut(t.Protocol):
def to_asset_out(self) -> AssetOut:
"""Convert to an AssetOut object."""
...

class ConvertibleToAssetDep(t.Protocol):
def to_asset_dep(self) -> AssetDep:
"""Convert to an AssetDep object."""
...

class ConvertibleToAssetKey(t.Protocol):
def to_asset_key(self) -> AssetKey:
...
def to_asset_key(self) -> AssetKey: ...

@dataclass(kw_only=True)
class SQLMeshMultiAssetOptions:
Expand All @@ -53,7 +68,7 @@ class SQLMeshMultiAssetOptions:
manipulate the dagster asset creation process as they see fit."""

outs: t.Mapping[str, ConvertibleToAssetOut] = field(default_factory=lambda: {})
deps: t.Iterable[ConvertibleToAssetDep] = field(default_factory=lambda: [])
deps: t.Iterable[CoercibleToAssetDep] = field(default_factory=lambda: [])
internal_asset_deps: t.Mapping[str, set[str]] = field(default_factory=lambda: {})

def to_asset_outs(self) -> t.Mapping[str, AssetOut]:
Expand All @@ -62,11 +77,17 @@ def to_asset_outs(self) -> t.Mapping[str, AssetOut]:

def to_asset_deps(self) -> t.Iterable[AssetDep]:
"""Convert to an iterable of AssetDep objects."""
return [dep.to_asset_dep() for dep in self.deps]
all_deps: list[AssetDep] = []
for dep in self.deps:
if isinstance(dep, AssetsDefinition) and len(dep.keys) > 1:
all_deps.extend([AssetDep.from_coercible(key) for key in dep.keys])
else:
all_deps.append(AssetDep.from_coercible(dep))
return all_deps

def to_internal_asset_deps(self) -> dict[str, set[AssetKey]]:
"""Convert to a dictionary of internal asset dependencies."""
return {
key: {AssetKey.from_user_string(dep) for dep in deps}
for key, deps in self.internal_asset_deps.items()
}
}