
Commit 4b5b8d9

various improvement (#493) (#494)
Co-authored-by: Jeremy Cohen <[email protected]>
(cherry picked from commit 0cb9582)
Co-authored-by: Chenyu Li <[email protected]>
1 parent 0f3227f commit 4b5b8d9

File tree: 3 files changed, +35 −7 lines

dbt/adapters/spark/python_submissions.py

Lines changed: 13 additions & 3 deletions
@@ -31,7 +31,7 @@ def __init__(self, parsed_model: Dict, credentials: SparkCredentials) -> None:
 
     @property
     def cluster_id(self) -> str:
-        return self.parsed_model.get("cluster_id", self.credentials.cluster_id)
+        return self.parsed_model["config"].get("cluster_id", self.credentials.cluster_id)
 
     def get_timeout(self) -> int:
         timeout = self.parsed_model["config"].get("timeout", DEFAULT_TIMEOUT)
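The cluster_id property now reads from the model's config block rather than the top level of parsed_model, matching how get_timeout already resolves its value. A minimal sketch of the new lookup, using a hypothetical parsed_model and fallback value:

    # Hypothetical parsed_model; the cluster id and fallback are made up.
    parsed_model = {"alias": "my_model", "config": {"cluster_id": "1234-567890-abc123"}}
    credentials_cluster_id = "fallback-from-profile"

    cluster_id = parsed_model["config"].get("cluster_id", credentials_cluster_id)
    print(cluster_id)  # 1234-567890-abc123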
@@ -82,7 +82,17 @@ def _submit_job(self, path: str, cluster_spec: dict) -> str:
                 "notebook_path": path,
             },
         }
-        job_spec.update(cluster_spec)
+        job_spec.update(cluster_spec)  # updates the 'new_cluster' config
+        # PyPI packages
+        packages = self.parsed_model["config"].get("packages", [])
+        # additional libraries, passed through in raw Databricks format
+        additional_libs = self.parsed_model["config"].get("additional_libs", [])
+        libraries = []
+        for package in packages:
+            libraries.append({"pypi": {"package": package}})
+        for lib in additional_libs:
+            libraries.append(lib)
+        job_spec.update({"libraries": libraries})  # type: ignore
         submit_response = requests.post(
             f"https://{self.credentials.host}/api/2.1/jobs/runs/submit",
             headers=self.auth_header,
@@ -96,7 +106,7 @@ def _submit_job(self, path: str, cluster_spec: dict) -> str:
 
     def _submit_through_notebook(self, compiled_code: str, cluster_spec: dict) -> None:
         # it is safe to call mkdirs even if the dir already exists and has content inside
-        work_dir = f"/dbt_python_model/{self.schema}/"
+        work_dir = f"/Shared/dbt_python_model/{self.schema}/"
         self._create_work_dir(work_dir)
         # add notebook
         whole_file_path = f"{work_dir}{self.identifier}"
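The new packages and additional_libs configs feed the libraries field of the Databricks job submission: each package name becomes a PyPI library entry, while additional_libs entries are appended as-is. A minimal sketch of the assembly with made-up config values (the Maven coordinate is illustrative, not from this commit):

    # Sketch of the libraries assembly in _submit_job; values are hypothetical.
    packages = ["spacy", "torch"]  # model config: packages
    additional_libs = [            # model config: additional_libs, used as-is
        {"maven": {"coordinates": "com.example:some-lib_2.12:1.0.0"}},
    ]

    libraries = [{"pypi": {"package": p}} for p in packages]
    libraries.extend(additional_libs)

    print(libraries)
    # [{'pypi': {'package': 'spacy'}}, {'pypi': {'package': 'torch'}},
    #  {'maven': {'coordinates': 'com.example:some-lib_2.12:1.0.0'}}]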

dbt/include/spark/macros/materializations/table.sql

Lines changed: 6 additions & 4 deletions
@@ -42,6 +42,8 @@
 dbt = dbtObj(spark.table)
 df = model(dbt, spark)
 
+# make sure pyspark exists in the namespace; on runtime 7.3.x-scala2.12 it is not there by default
+import pyspark
 # make sure pandas exists before using it
 try:
     import pandas
@@ -52,9 +54,9 @@ except ImportError:
 # make sure pyspark.pandas exists before using it
 try:
     import pyspark.pandas
-    pyspark_available = True
+    pyspark_pandas_api_available = True
 except ImportError:
-    pyspark_available = False
+    pyspark_pandas_api_available = False
 
 # make sure databricks.koalas exists before using it
 try:
@@ -66,15 +68,15 @@ except ImportError:
 # preferentially convert pandas DataFrames to pandas-on-Spark or Koalas DataFrames first
 # since they know how to convert pandas DataFrames better than `spark.createDataFrame(df)`
 # and converting from pandas-on-Spark to Spark DataFrame has no overhead
-if pyspark_available and pandas_available and isinstance(df, pandas.core.frame.DataFrame):
+if pyspark_pandas_api_available and pandas_available and isinstance(df, pandas.core.frame.DataFrame):
     df = pyspark.pandas.frame.DataFrame(df)
 elif koalas_available and pandas_available and isinstance(df, pandas.core.frame.DataFrame):
     df = databricks.koalas.frame.DataFrame(df)
 
 # convert to pyspark.sql.dataframe.DataFrame
 if isinstance(df, pyspark.sql.dataframe.DataFrame):
     pass  # since it is already a Spark DataFrame
-elif pyspark_available and isinstance(df, pyspark.pandas.frame.DataFrame):
+elif pyspark_pandas_api_available and isinstance(df, pyspark.pandas.frame.DataFrame):
     df = df.to_spark()
 elif koalas_available and isinstance(df, databricks.koalas.frame.DataFrame):
     df = df.to_spark()
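Pulled out of the macro template, the cascade above amounts to a helper like the following sketch. The availability flags mirror the macro; the final TypeError fallback is an assumption, since the macro's surrounding error handling is not shown in this diff.

    # Standalone sketch of the macro's DataFrame-normalization cascade.
    import pyspark.sql

    try:
        import pandas
        pandas_available = True
    except ImportError:
        pandas_available = False

    try:
        import pyspark.pandas
        pyspark_pandas_api_available = True
    except ImportError:
        pyspark_pandas_api_available = False

    try:
        import databricks.koalas
        koalas_available = True
    except ImportError:
        koalas_available = False

    def to_spark_df(df):
        # Route pandas input through pandas-on-Spark (or Koalas) first, since
        # those libraries convert pandas DataFrames better than
        # spark.createDataFrame, and their to_spark() has no extra overhead.
        if pandas_available and isinstance(df, pandas.DataFrame):
            if pyspark_pandas_api_available:
                df = pyspark.pandas.DataFrame(df)
            elif koalas_available:
                df = databricks.koalas.DataFrame(df)
        if isinstance(df, pyspark.sql.DataFrame):
            return df  # already a Spark DataFrame
        if pyspark_pandas_api_available and isinstance(df, pyspark.pandas.DataFrame):
            return df.to_spark()
        if koalas_available and isinstance(df, databricks.koalas.DataFrame):
            return df.to_spark()
        raise TypeError(f"cannot convert {type(df)} to a Spark DataFrame")  # assumed fallback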

tests/functional/adapter/test_python_model.py

Lines changed: 16 additions & 0 deletions
@@ -20,10 +20,26 @@ def project_config_update(self):
 
 models__simple_python_model = """
 import pandas
+import torch
+import spacy
 
 def model(dbt, spark):
     dbt.config(
         materialized='table',
+        submission_method='job_cluster',
+        job_cluster_config={
+            "spark_version": "7.3.x-scala2.12",
+            "node_type_id": "i3.xlarge",
+            "num_workers": 0,
+            "spark_conf": {
+                "spark.databricks.cluster.profile": "singleNode",
+                "spark.master": "local[*, 4]"
+            },
+            "custom_tags": {
+                "ResourceClass": "SingleNode"
+            }
+        },
+        packages=['spacy', 'torch']
     )
     data = [[1,2]] * 10
     return spark.createDataFrame(data, schema=['test', 'test2'])
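Putting the pieces together, the body that _submit_job posts to the Jobs 2.1 runs/submit endpoint for this test model should look roughly like the sketch below: job_cluster_config becomes the new_cluster block, and packages becomes PyPI entries under libraries. The run_name field and the notebook-path placeholders are assumptions for illustration, not taken from this diff.

    import json

    # Hedged sketch of the runs/submit request body implied by the test config.
    payload = {
        "run_name": "dbt python model run",  # assumed; not shown in this diff
        "notebook_task": {
            # <schema> and <identifier> stand in for the model's real values
            "notebook_path": "/Shared/dbt_python_model/<schema>/<identifier>",
        },
        "new_cluster": {  # from job_cluster_config
            "spark_version": "7.3.x-scala2.12",
            "node_type_id": "i3.xlarge",
            "num_workers": 0,
            "spark_conf": {
                "spark.databricks.cluster.profile": "singleNode",
                "spark.master": "local[*, 4]",
            },
            "custom_tags": {"ResourceClass": "SingleNode"},
        },
        "libraries": [  # from packages=['spacy', 'torch']
            {"pypi": {"package": "spacy"}},
            {"pypi": {"package": "torch"}},
        ],
    }
    print(json.dumps(payload, indent=2))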
