Skip to content

Commit eeaddf9

Browse files
author
Tobias Kopp
committed
[Benchmark] Implement timeout for postgres connector
- Fix: don't close connection too early
1 parent c7779e9 commit eeaddf9

File tree

1 file changed

+82
-17
lines changed

1 file changed

+82
-17
lines changed

benchmark/database_connectors/postgresql.py

Lines changed: 82 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -55,10 +55,15 @@ def execute(self, n_runs, params: dict):
5555
cursor = connection.cursor()
5656
cursor.execute("set jit=off;")
5757
self.create_tables(cursor, params['data'], with_scale_factors)
58+
connection.close()
59+
5860

5961
# If tables contain scale factors, they have to be loaded separately for every case
6062
if (with_scale_factors or not bool(params.get('readonly'))):
6163
for case, query_stmt in params['cases'].items():
64+
connection = psycopg2.connect(**db_options)
65+
connection.autocommit = True
66+
cursor = connection.cursor()
6267
# Create tables from tmp tables with scale factor
6368
for table_name, table in params['data'].items():
6469
if table.get('scale_factors'):
@@ -69,6 +74,7 @@ def execute(self, n_runs, params: dict):
6974
num_rows = round((table['lines_in_file'] - header) * sf)
7075
cursor.execute(f"DELETE FROM {table_name};") # empty existing table
7176
cursor.execute(f"INSERT INTO {table_name} SELECT * FROM {table_name}_tmp LIMIT {num_rows};") # copy data with scale factor
77+
connection.close()
7278

7379
# Write case/query to a file that will be passed to the command to execute
7480
with open(TMP_SQL_FILE, "w") as tmp:
@@ -84,20 +90,25 @@ def execute(self, n_runs, params: dict):
8490
verbose_printed = True
8591
with open(TMP_SQL_FILE) as tmp:
8692
tqdm.write(" " + " ".join(tmp.readlines()))
87-
stream = os.popen(f'{command}')
88-
for idx, line in enumerate(stream):
89-
time = float(line.replace("\n", "").replace(",", ".")) # in milliseconds
93+
94+
timeout = DEFAULT_TIMEOUT + TIMEOUT_PER_CASE
95+
benchmark_info = f"{suite}/{benchmark}/{experiment} [PostgreSQL]"
96+
try:
97+
durations = self.run_command(command, timeout, benchmark_info)
98+
except ExperimentTimeoutExpired as ex:
9099
if case not in measurement_times.keys():
91100
measurement_times[case] = list()
92-
measurement_times[case].append(time)
93-
94-
stream.close()
101+
measurement_times[case].append(TIMEOUT_PER_CASE * 1000)
102+
else:
103+
for idx, line in enumerate(durations):
104+
time = float(line.replace("\n", "").replace(",", ".")) # in milliseconds
105+
if case not in measurement_times.keys():
106+
measurement_times[case] = list()
107+
measurement_times[case].append(time)
95108

96109

97110
# Otherwise, tables have to be created just once before the measurements (done above)
98111
else:
99-
connection.close()
100-
101112
# Write cases/queries to a file that will be passed to the command to execute
102113
with open(TMP_SQL_FILE, "w") as tmp:
103114
tmp.write("\\timing on\n")
@@ -113,14 +124,23 @@ def execute(self, n_runs, params: dict):
113124
verbose_printed = True
114125
with open(TMP_SQL_FILE) as tmp:
115126
tqdm.write(" " + " ".join(tmp.readlines()))
116-
stream = os.popen(f'{command}')
117-
for idx, line in enumerate(stream):
118-
time = float(line.replace("\n", "").replace(",", ".")) # in milliseconds
119-
case = list(params['cases'].keys())[idx]
120-
if case not in measurement_times.keys():
121-
measurement_times[case] = list()
122-
measurement_times[case].append(time)
123-
stream.close()
127+
128+
timeout = DEFAULT_TIMEOUT + TIMEOUT_PER_CASE * len(params['cases'])
129+
benchmark_info = f"{suite}/{benchmark}/{experiment} [PostgreSQL]"
130+
try:
131+
durations = self.run_command(command, timeout, benchmark_info)
132+
except ExperimentTimeoutExpired as ex:
133+
for case in params['cases'].keys():
134+
if case not in measurement_times.keys():
135+
measurement_times[case] = list()
136+
measurement_times[case].append(TIMEOUT_PER_CASE * 1000)
137+
else:
138+
for idx, line in enumerate(durations):
139+
time = float(line.replace("\n", "").replace(",", ".")) # in milliseconds
140+
case = list(params['cases'].keys())[idx]
141+
if case not in measurement_times.keys():
142+
measurement_times[case] = list()
143+
measurement_times[case].append(time)
124144

125145
finally:
126146
if(connection):
@@ -146,7 +166,10 @@ def clean_up(self):
146166
connection = psycopg2.connect(user=db_options['user'])
147167
connection.autocommit = True
148168
cursor = connection.cursor()
149-
cursor.execute(f"DROP DATABASE IF EXISTS {db_options['dbname']};")
169+
try:
170+
cursor.execute(f"DROP DATABASE IF EXISTS {db_options['dbname']};")
171+
except Exception as ex:
172+
tqdm.write(f"Unexpeced error while executing 'DROP DATABASE IF EXISTS {db_options['dbname']}' : {ex}")
150173
connection.close()
151174
if os.path.exists(TMP_SQL_FILE):
152175
os.remove(TMP_SQL_FILE)
@@ -213,3 +236,45 @@ def create_tables(self, cursor, data: dict, with_scale_factors):
213236

214237
if with_scale_factors:
215238
cursor.execute(f"CREATE TABLE {table_name[:-4]} {columns};") # Create actual table that will be used for experiment
239+
240+
241+
def run_command(self, command, timeout, benchmark_info):
242+
process = subprocess.Popen(command, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
243+
cwd=os.getcwd(), shell=True)
244+
try:
245+
out, err = process.communicate("".encode('latin-1'), timeout=timeout)
246+
except subprocess.TimeoutExpired:
247+
process.kill()
248+
self.clean_up()
249+
raise ExperimentTimeoutExpired(f'Query timed out after {timeout} seconds')
250+
finally:
251+
if process.poll() is None: # if process is still alive
252+
process.terminate() # try to shut down gracefully
253+
try:
254+
process.wait(timeout=1) # wait for process to terminate
255+
except subprocess.TimeoutExpired:
256+
process.kill() # kill if process did not terminate in time
257+
258+
out = out.decode('latin-1')
259+
err = err.decode('latin-1')
260+
261+
if process.returncode or len(err):
262+
outstr = '\n'.join(out.split('\n')[-20:])
263+
tqdm.write(f'''\
264+
Unexpected failure during execution of benchmark "{benchmark_info}" with return code {process.returncode}:''')
265+
tqdm.write(command)
266+
tqdm.write(f'''\
267+
===== stdout =====
268+
{outstr}
269+
===== stderr =====
270+
{err}
271+
==================
272+
''')
273+
if process.returncode:
274+
raise ConnectorException(f'Benchmark failed with return code {process.returncode}.')
275+
276+
# Parse `out` for timings
277+
durations = out.split('\n')
278+
durations.remove('')
279+
280+
return durations

0 commit comments

Comments
 (0)