Skip to content

Commit 796a233

Browse files
fix: use the cloud catalog when doing save_as_table in a cloud session
1 parent 513da9a commit 796a233

File tree

6 files changed

+88
-10
lines changed

6 files changed

+88
-10
lines changed

src/fenic/_backends/cloud/execution.py

Lines changed: 61 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,19 +28,24 @@
2828
SaveToFileExecutionRequest,
2929
ShowExecutionRequest,
3030
StartExecutionRequest,
31-
TableIdentifier,
31+
)
32+
from fenic_cloud.protos.engine.v1.engine_pb2 import (
33+
TableIdentifier as TableIdentifierProto,
3234
)
3335
from fenic_cloud.protos.engine.v1.engine_pb2_grpc import EngineServiceStub
3436

3537
from fenic._backends.cloud.metrics import get_query_execution_metrics
3638
from fenic._backends.schema_serde import deserialize_schema, serialize_schema
39+
from fenic._backends.utils.catalog_utils import TableIdentifier
3740
from fenic.core._interfaces import BaseExecution
41+
from fenic.core._logical_plan.plans.sink import TableSink
3842
from fenic.core._logical_plan.serde import LogicalPlanSerde
3943
from fenic.core.error import (
4044
CloudExecutionError,
4145
CloudSessionError,
4246
ExecutionError,
4347
InternalError,
48+
PlanError,
4449
ValidationError,
4550
)
4651
from fenic.core.metrics import LMMetrics, PhysicalPlanRepr, QueryMetrics, RMMetrics
@@ -164,16 +169,67 @@ def save_as_table(
164169
) -> QueryMetrics:
165170
"""Execute the logical plan and save the result as a table."""
166171
logger.debug(f"Saving plan {logical_plan} as table: {table_name}")
172+
173+
if isinstance(logical_plan, TableSink):
174+
if not logical_plan.location:
175+
raise ValidationError(
176+
f"Cannot save to table '{table_name}' - location is required. "
177+
f"Provide a location.")
178+
179+
table_identifier = TableIdentifier.from_string(table_name).enrich(
180+
self.session_state.catalog.get_current_catalog(),
181+
self.session_state.catalog.get_current_database(),
182+
)
183+
184+
# If the table doesn't exist, create it; this has to be done in the user's context.
185+
table_identifier_str = str(table_identifier)
186+
table_exists = self.session_state.catalog.does_table_exist(table_identifier_str)
187+
if table_exists:
188+
if mode == "error":
189+
raise PlanError(
190+
f"Cannot save to table '{table_name}' - it already exists and mode is 'error'. "
191+
f"Choose a different approach: "
192+
f"1) Use mode='overwrite' to replace the existing table, "
193+
f"2) Use mode='append' to add data to the existing table, "
194+
f"3) Use mode='ignore' to skip saving if table exists, "
195+
f"4) Use a different table name.")
196+
if mode == "ignore":
197+
logger.warning(f"Table {table_name} already exists, ignoring write.")
198+
return QueryMetrics()
199+
if mode == "append":
200+
saved_schema = self.session_state.catalog.describe_table(table_identifier_str)
201+
plan_schema = logical_plan.schema()
202+
if saved_schema != plan_schema:
203+
raise PlanError(
204+
f"Cannot append to table '{table_name}' - schema mismatch detected. "
205+
f"The existing table has a different schema than your DataFrame. "
206+
f"Existing schema: {saved_schema} "
207+
f"Your DataFrame schema: {plan_schema} "
208+
f"To fix this: "
209+
f"1) Use mode='overwrite' to replace the table with your DataFrame's schema, "
210+
f"2) Modify your DataFrame to match the existing table's schema, "
211+
f"3) Use a different table name.")
212+
else:
213+
logger.debug(f"Creating table {table_identifier_str} with location: {logical_plan.location}")
214+
# Create the table in the catalog.
215+
result =self.session_state.catalog.create_table(
216+
table_identifier_str,
217+
logical_plan.schema(),
218+
location=logical_plan.location,
219+
ignore_if_exists=mode == "ignore",
220+
file_format="PARQUET")
221+
logger.debug(f"Table {table_identifier_str} created with result: {result}")
222+
167223
# TODO (DY): check that current catalog and schema (if specified in table_name) match session state
168-
table_identifier = TableIdentifier(
169-
catalog=self.session_state.catalog,
170-
schema=self.session_state.schema,
224+
table_identifier_proto = TableIdentifierProto(
225+
catalog=table_identifier.catalog,
226+
schema=table_identifier.db,
171227
table=table_name,
172228
)
173229
request = StartExecutionRequest(
174230
save_as_table=SaveAsTableExecutionRequest(
175231
serialized_plan=LogicalPlanSerde.serialize(logical_plan),
176-
table_identifier=table_identifier,
232+
table_identifier=table_identifier_proto,
177233
mode=mode,
178234
)
179235
)

src/fenic/_backends/local/physical_plan/sink.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from __future__ import annotations
22

33
import logging
4-
from typing import TYPE_CHECKING, List, Literal, Tuple
4+
from typing import TYPE_CHECKING, List, Literal, Optional, Tuple
55

66
if TYPE_CHECKING:
77
from fenic._backends.local.session_state import LocalSessionState
@@ -80,13 +80,15 @@ def __init__(
8080
cache_info: CacheInfo,
8181
session_state: LocalSessionState,
8282
schema: Schema,
83+
location: Optional[str] = None,
8384
):
8485
super().__init__(
8586
children=[child], cache_info=cache_info, session_state=session_state
8687
)
8788
self.table_name = table_name
8889
self.mode = mode
8990
self.schema = schema
91+
self.location = location
9092

9193
def _execute(self, child_dfs: List[pl.DataFrame]) -> pl.DataFrame:
9294
if len(child_dfs) != 1:

src/fenic/_backends/local/transpiler/plan_converter.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -368,6 +368,7 @@ def convert(
368368
cache_info=logical.cache_info,
369369
session_state=self.session_state,
370370
schema=logical.schema(),
371+
location=logical.location,
371372
)
372373

373374
elif isinstance(logical, SQL):

src/fenic/_backends/utils/catalog_utils.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,13 @@ def enrich(self, catalog_name: str, db_name: str) -> "TableIdentifier":
103103
table=self.table,
104104
)
105105

106+
def __str__(self) -> str:
107+
str_identifier = self.table
108+
if self.db:
109+
str_identifier = f"{self.db}.{str_identifier}"
110+
if self.catalog:
111+
str_identifier = f"{self.catalog}.{str_identifier}"
112+
return str_identifier
106113

107114
@dataclass(frozen=True)
108115
class DBIdentifier(BaseIdentifier):
@@ -132,6 +139,12 @@ def enrich(self, catalog_name: str) -> "DBIdentifier":
132139
return self
133140
return DBIdentifier(catalog=catalog_name, db=self.db)
134141

142+
def __str__(self) -> str:
143+
str_identifier = self.db
144+
if self.catalog:
145+
str_identifier = f"{self.catalog}.{str_identifier}"
146+
return str_identifier
147+
135148

136149
def compare_object_names(object_name_1: str, object_name_2: str) -> bool:
137150
"""Compare two object names, ignoring case."""

src/fenic/api/io/writer.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
import logging
66
from pathlib import Path
7-
from typing import TYPE_CHECKING, Literal, Union
7+
from typing import TYPE_CHECKING, Literal, Optional, Union
88

99
if TYPE_CHECKING:
1010
from fenic.api.dataframe import DataFrame
@@ -36,6 +36,7 @@ def save_as_table(
3636
self,
3737
table_name: str,
3838
mode: Literal["error", "append", "overwrite", "ignore"] = "error",
39+
location: Optional[str] = None,
3940
) -> QueryMetrics:
4041
"""Saves the content of the DataFrame as the specified table.
4142
@@ -46,6 +47,7 @@ def save_as_table(
4647
- append: Appends data to table if it exists
4748
- overwrite: Overwrites existing table
4849
- ignore: Silently ignores operation if table exists
50+
location: Location where the table will be saved (only available for cloud execution).
4951
5052
Returns:
5153
QueryMetrics: The query metrics
@@ -66,7 +68,7 @@ def save_as_table(
6668
```
6769
"""
6870
sink_plan = TableSink(
69-
child=self._dataframe._logical_plan, table_name=table_name, mode=mode
71+
child=self._dataframe._logical_plan, table_name=table_name, mode=mode, location=location
7072
)
7173

7274
metrics = self._dataframe._logical_plan.session_state.execution.save_as_table(

src/fenic/core/_logical_plan/plans/sink.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from typing import List, Literal
1+
from typing import List, Literal, Optional
22

33
from fenic.core._logical_plan.plans.base import LogicalPlan
44
from fenic.core.error import InternalError
@@ -80,6 +80,7 @@ def __init__(
8080
child: LogicalPlan,
8181
table_name: str,
8282
mode: Literal["error", "append", "overwrite", "ignore"] = "error",
83+
location: Optional[str] = None,
8384
):
8485
"""Initialize a table sink node.
8586
@@ -91,10 +92,12 @@ def __init__(
9192
- append: Appends data to table if it exists
9293
- overwrite: Overwrites existing table
9394
- ignore: Silently ignores operation if table exists
95+
location: Location where the table will be saved (only available for cloud execution).
9496
"""
9597
self.child = child
9698
self.table_name = table_name
9799
self.mode = mode
100+
self.location = location
98101
super().__init__(self.child.session_state)
99102

100103
def children(self) -> List[LogicalPlan]:
@@ -107,7 +110,7 @@ def _build_schema(self) -> Schema:
107110

108111
def _repr(self) -> str:
109112
"""Return the string representation for this table sink plan."""
110-
return f"TableSink(table_name='{self.table_name}', mode='{self.mode}')"
113+
return f"TableSink(table_name='{self.table_name}', mode='{self.mode}', location='{self.location}')"
111114

112115
def with_children(self, children: List[LogicalPlan]) -> LogicalPlan:
113116
"""Create a new table sink with the same properties but different children.
@@ -129,4 +132,5 @@ def with_children(self, children: List[LogicalPlan]) -> LogicalPlan:
129132
child=children[0],
130133
table_name=self.table_name,
131134
mode=self.mode,
135+
location=self.location,
132136
)

0 commit comments

Comments
 (0)