Skip to content

Commit f040c35

Browse files
fix: use the cloud catalog when doing save_as_table in a cloud session
1 parent fdea6af commit f040c35

File tree

7 files changed

+89
-28
lines changed

7 files changed

+89
-28
lines changed

src/fenic/_backends/cloud/catalog.py

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from dataclasses import dataclass
55
from datetime import datetime
66
from typing import Any, Coroutine, Dict, List, Optional
7+
from urllib.parse import urlparse
78
from uuid import UUID
89

910
import polars as pl
@@ -56,12 +57,12 @@
5657

5758
logger = logging.getLogger(__name__)
5859

59-
6060
@dataclass(frozen=True)
6161
class CatalogKey:
6262
catalog_name: str
6363
catalog_id: UUID
6464

65+
CLOUD_SUPPORTED_SCHEMES = ["s3"]
6566

6667
class CloudCatalog(BaseCatalog):
6768
"""A catalog for cloud execution mode. Implements the BaseCatalog -
@@ -112,11 +113,18 @@ def list_catalogs(self) -> List[str]:
112113
remote_catalogs.append(DEFAULT_CATALOG_NAME)
113114
return remote_catalogs
114115

115-
def create_catalog(self, catalog_name: str, ignore_if_exists: bool = True) -> bool:
116+
def create_catalog(
117+
self,
118+
catalog_name: str,
119+
location: str,
120+
ignore_if_exists: bool = True) -> bool:
116121
"""Create a new catalog."""
117122
if compare_object_names(catalog_name, DEFAULT_CATALOG_NAME):
118123
raise CatalogError("Cannot create a catalog with the default name")
119124

125+
if urlparse(location).scheme not in CLOUD_SUPPORTED_SCHEMES:
126+
raise CatalogError(f"Unsupported scheme: {urlparse(location).scheme}")
127+
120128
with self.lock:
121129
if self._does_catalog_exist(catalog_name):
122130
if ignore_if_exists:
@@ -131,7 +139,7 @@ def create_catalog(self, catalog_name: str, ignore_if_exists: bool = True) -> bo
131139
created_by_user_id=UUID(self.user_id),
132140
parent_organization_id=UUID(self.organization_id),
133141
catalog_type=TypedefCatalogTypeReferenceEnum.INTERNAL_TYPEDEF,
134-
catalog_warehouse="",
142+
catalog_warehouse=location,
135143
)
136144
)
137145
return True
@@ -267,14 +275,12 @@ def create_table(
267275
self,
268276
table_name: str,
269277
schema: Schema,
270-
location: str,
271278
ignore_if_exists: bool = True,
272-
file_format: Optional[str] = None,
273279
) -> bool:
274280
"""Create a new table in the current database."""
275281
with self.lock:
276282
return self._create_table(
277-
table_name, schema, location, ignore_if_exists, file_format
283+
table_name, schema, ignore_if_exists
278284
)
279285

280286

@@ -526,9 +532,7 @@ def _create_table(
526532
self,
527533
table_name: str,
528534
schema: Schema,
529-
location: str,
530535
ignore_if_exists: bool = True,
531-
file_format: Optional[str] = None,
532536
) -> bool:
533537
table_identifier = TableIdentifier.from_string(table_name).enrich(
534538
self.current_catalog_name, self.current_database_name
@@ -557,11 +561,6 @@ def _create_table(
557561
raise TableAlreadyExistsError(table_identifier.table, table_identifier.db)
558562

559563
catalog_id = self._get_catalog_id(table_identifier.catalog)
560-
fixed_file_format = (
561-
FileFormat.PARQUET
562-
if file_format is None
563-
else FileFormat(file_format.upper())
564-
)
565564
self._execute_catalog_command(
566565
self.user_client.sc_create_table(
567566
dispatch=self._get_catalog_dispatch_input(catalog_id),
@@ -571,8 +570,8 @@ def _create_table(
571570
canonical_name=table_identifier.table.casefold(),
572571
description=None,
573572
external=False,
574-
location=location,
575-
file_format=fixed_file_format,
573+
location=self._get_table_location_from_table_identifier(table_identifier),
574+
file_format=FileFormat.PARQUET,
576575
partition_field_names=[],
577576
schema_=self._get_schema_input_from_schema(schema),
578577
),
@@ -661,3 +660,8 @@ def _get_schema_type_to_pyarrow(schema_type: str):
661660
return pa.float64()
662661
else:
663662
return schema_type
663+
664+
@staticmethod
665+
def _get_table_location_from_table_identifier(table_identifier: TableIdentifier) -> str:
666+
"""Gets the key in the s3 bucket for the table based on its database and name."""
667+
return f"{table_identifier.db}/{table_identifier.table}"

src/fenic/_backends/cloud/execution.py

Lines changed: 50 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,19 +28,23 @@
2828
SaveToFileExecutionRequest,
2929
ShowExecutionRequest,
3030
StartExecutionRequest,
31-
TableIdentifier,
31+
)
32+
from fenic_cloud.protos.engine.v1.engine_pb2 import (
33+
TableIdentifier as TableIdentifierProto,
3234
)
3335
from fenic_cloud.protos.engine.v1.engine_pb2_grpc import EngineServiceStub
3436

3537
from fenic._backends.cloud.metrics import get_query_execution_metrics
3638
from fenic._backends.schema_serde import deserialize_schema, serialize_schema
39+
from fenic._backends.utils.catalog_utils import TableIdentifier
3740
from fenic.core._interfaces import BaseExecution
3841
from fenic.core._logical_plan.serde import LogicalPlanSerde
3942
from fenic.core.error import (
4043
CloudExecutionError,
4144
CloudSessionError,
4245
ExecutionError,
4346
InternalError,
47+
PlanError,
4448
ValidationError,
4549
)
4650
from fenic.core.metrics import LMMetrics, PhysicalPlanRepr, QueryMetrics, RMMetrics
@@ -164,16 +168,57 @@ def save_as_table(
164168
) -> QueryMetrics:
165169
"""Execute the logical plan and save the result as a table."""
166170
logger.debug(f"Saving plan {logical_plan} as table: {table_name}")
171+
172+
table_identifier = TableIdentifier.from_string(table_name).enrich(
173+
self.session_state.catalog.get_current_catalog(),
174+
self.session_state.catalog.get_current_database(),
175+
)
176+
177+
# If the table doesn't exist, create it, this has to be done in the user's context.
178+
table_identifier_str = str(table_identifier)
179+
table_exists = self.session_state.catalog.does_table_exist(table_identifier_str)
180+
if table_exists:
181+
if mode == "error":
182+
raise PlanError(
183+
f"Cannot save to table '{table_name}' - it already exists and mode is 'error'. "
184+
f"Choose a different approach: "
185+
f"1) Use mode='overwrite' to replace the existing table, "
186+
f"2) Use mode='append' to add data to the existing table, "
187+
f"3) Use mode='ignore' to skip saving if table exists, "
188+
f"4) Use a different table name.")
189+
if mode == "ignore":
190+
logger.warning(f"Table {table_name} already exists, ignoring write.")
191+
return QueryMetrics()
192+
if mode == "append":
193+
saved_schema = self.session_state.catalog.describe_table(table_identifier_str)
194+
plan_schema = logical_plan.schema()
195+
if saved_schema != plan_schema:
196+
raise PlanError(
197+
f"Cannot append to table '{table_name}' - schema mismatch detected. "
198+
f"The existing table has a different schema than your DataFrame. "
199+
f"Existing schema: {saved_schema} "
200+
f"Your DataFrame schema: {plan_schema} "
201+
f"To fix this: "
202+
f"1) Use mode='overwrite' to replace the table with your DataFrame's schema, "
203+
f"2) Modify your DataFrame to match the existing table's schema, "
204+
f"3) Use a different table name.")
205+
else:
206+
raise CloudExecutionError(
207+
f"Cannot save to table '{table_name}' - it does not exist. "
208+
f"Choose a different approach: "
209+
f"1) Create the table in question "
210+
f"2) Use a different table name.")
211+
167212
# TODO (DY): check that current catalog and schema (if specified in table_name) match session state
168-
table_identifier = TableIdentifier(
169-
catalog=self.session_state.catalog,
170-
schema=self.session_state.schema,
213+
table_identifier_proto = TableIdentifierProto(
214+
catalog=table_identifier.catalog,
215+
schema=table_identifier.db,
171216
table=table_name,
172217
)
173218
request = StartExecutionRequest(
174219
save_as_table=SaveAsTableExecutionRequest(
175220
serialized_plan=LogicalPlanSerde.serialize(logical_plan),
176-
table_identifier=table_identifier,
221+
table_identifier=table_identifier_proto,
177222
mode=mode,
178223
)
179224
)

src/fenic/_backends/local/physical_plan/sink.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from __future__ import annotations
22

33
import logging
4-
from typing import TYPE_CHECKING, List, Literal, Tuple
4+
from typing import TYPE_CHECKING, List, Literal, Optional, Tuple
55

66
if TYPE_CHECKING:
77
from fenic._backends.local.session_state import LocalSessionState

src/fenic/_backends/utils/catalog_utils.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,13 @@ def enrich(self, catalog_name: str, db_name: str) -> "TableIdentifier":
103103
table=self.table,
104104
)
105105

106+
def __str__(self) -> str:
107+
str_identifier = self.table
108+
if self.db:
109+
str_identifier = f"{self.db}.{str_identifier}"
110+
if self.catalog:
111+
str_identifier = f"{self.catalog}.{str_identifier}"
112+
return str_identifier
106113

107114
@dataclass(frozen=True)
108115
class DBIdentifier(BaseIdentifier):
@@ -132,6 +139,12 @@ def enrich(self, catalog_name: str) -> "DBIdentifier":
132139
return self
133140
return DBIdentifier(catalog=catalog_name, db=self.db)
134141

142+
def __str__(self) -> str:
143+
str_identifier = self.db
144+
if self.catalog:
145+
str_identifier = f"{self.catalog}.{str_identifier}"
146+
return str_identifier
147+
135148

136149
def compare_object_names(object_name_1: str, object_name_2: str) -> bool:
137150
"""Compare two object names, ignoring case."""

src/fenic/api/io/writer.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
import logging
66
from pathlib import Path
7-
from typing import TYPE_CHECKING, Literal, Union
7+
from typing import TYPE_CHECKING, Literal, Optional, Union
88

99
if TYPE_CHECKING:
1010
from fenic.api.dataframe import DataFrame

src/fenic/core/_logical_plan/plans/sink.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ def __init__(
9191
- append: Appends data to table if it exists
9292
- overwrite: Overwrites existing table
9393
- ignore: Silently ignores operation if table exists
94+
location: location where the table will be saved (only available for cloud execution)
9495
"""
9596
self.child = child
9697
self.table_name = table_name

tests/_backends/cloud/catalog/test_cloud_catalog.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -531,14 +531,12 @@ def test_create_table(cloud_catalog, schema): # noqa: D103
531531
cloud_catalog.create_table(
532532
TEST_TABLE_NAME_1,
533533
schema=schema,
534-
location=TEST_SAMPLE_LOCATION,
535534
ignore_if_exists=False,
536535
)
537536
with pytest.raises(CatalogError):
538537
cloud_catalog.create_table(
539538
"some_catalog.some_database.some_table",
540539
schema=schema,
541-
location=TEST_SAMPLE_LOCATION,
542540
)
543541

544542

@@ -566,16 +564,16 @@ def test_drop_table(cloud_catalog): # noqa: D103
566564

567565
def test_create_catalog(cloud_catalog): # noqa: D103
568566
with pytest.raises(CatalogError):
569-
cloud_catalog.create_catalog(DEFAULT_CATALOG_NAME)
567+
cloud_catalog.create_catalog(DEFAULT_CATALOG_NAME, TEST_SAMPLE_LOCATION)
570568

571-
assert cloud_catalog.create_catalog(TEST_NEW_CATALOG_NAME)
569+
assert cloud_catalog.create_catalog(TEST_NEW_CATALOG_NAME, TEST_SAMPLE_LOCATION)
572570

573571
# The catalog already exists, so we should return False (default for ignore_if_exists is True)
574-
assert not cloud_catalog.create_catalog(TEST_CATALOG_NAME)
572+
assert not cloud_catalog.create_catalog(TEST_CATALOG_NAME, TEST_SAMPLE_LOCATION)
575573

576574
# The catalog already exists, so we should raise an error if ignore_if_exists is False
577575
with pytest.raises(CatalogAlreadyExistsError):
578-
cloud_catalog.create_catalog(TEST_CATALOG_NAME, ignore_if_exists=False)
576+
cloud_catalog.create_catalog(TEST_CATALOG_NAME, TEST_SAMPLE_LOCATION, ignore_if_exists=False)
579577

580578

581579
def test_drop_catalog(cloud_catalog): # noqa: D103

0 commit comments

Comments
 (0)