Skip to content

Commit 55ca5b2

Browse files
authored
Fix the "database does not exist" bug (#131)
1 parent: c694d31; commit: 55ca5b2

File tree

2 files changed

+27
-6
lines changed

2 files changed

+27
-6
lines changed

dataflow/operators/filter/Text2SQL/sql_execution_filter.py

Lines changed: 20 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -62,18 +62,34 @@ def run(self, storage: DataFlowStorage,
6262
self.check_column(dataframe)
6363
results = []
6464
self.logger.info(f"Start to filter {len(dataframe)} SQLs")
65+
db_id_need_to_check = dataframe[input_db_id_key].unique()
66+
for db_id in db_id_need_to_check:
67+
if not self.database_manager.registry.database_exists(db_id):
68+
self.logger.warning(f"Database {db_id} not found in registry, please check the database folder")
69+
continue
70+
6571
for _, row in tqdm(dataframe.iterrows(), total=len(dataframe), desc="Processing SQLs"):
6672
db_id = row[input_db_id_key]
6773
sql = row[input_sql_key]
6874

6975
if not self.filter_select_sql(sql):
7076
continue
71-
72-
ans = self.database_manager.analyze_sql_execution_plan(db_id, sql, 5)
73-
if not ans['success']:
77+
78+
try:
79+
ans = self.database_manager.analyze_sql_execution_plan(db_id, sql, 5)
80+
except Exception as e:
81+
self.logger.error(f"Error analyzing SQL execution plan: {e}")
7482
continue
7583

76-
ans = self.database_manager.execute_query(db_id, sql, 10)
84+
if not ans['success']:
85+
continue
86+
87+
try:
88+
ans = self.database_manager.execute_query(db_id, sql, 10)
89+
except Exception as e:
90+
self.logger.error(f"Error executing SQL query: {e}")
91+
continue
92+
7793
if not ans['success']:
7894
continue
7995

dataflow/utils/text2sql/database_manager.py

Lines changed: 7 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -6,6 +6,7 @@
66
import os
77
import glob
88
from dataclasses import dataclass
9+
from dataflow import get_logger
910

1011

1112
@dataclass
@@ -267,7 +268,7 @@ class DatabaseManager:
267268
def __init__(self, db_type: str = "sqlite", config: Optional[Dict] = None, logger=None):
268269
self.db_type = db_type.lower()
269270
self.config = config or {}
270-
self.logger = logger
271+
self.logger = get_logger()
271272

272273
self.registry = DatabaseRegistry(logger)
273274
self.cache = SchemaCache()
@@ -427,7 +428,11 @@ def execute_query(self, db_id: str, sql: str, timeout: float = 5.0) -> Dict[str,
427428
def analyze_sql_execution_plan(self, db_id: str, sql: str, timeout: float = 5.0) -> Dict[str, Any]:
428429
db_info = self.registry.get_database(db_id)
429430
if not db_info:
430-
raise ValueError(f"Database {db_id} not found")
431+
self.logger.error(f"Database {db_id} not found")
432+
return {
433+
'success': False,
434+
'error': f'Database {db_id} not found'
435+
}
431436

432437
connector = self.connectors[db_info.db_type]
433438
result = None

0 commit comments

Comments
 (0)