Skip to content

Commit c7779e9

Browse files
author
Tobias Kopp
committed
[Benchmark] Implement timeout for DuckDB
- Insert timeout values into database (DuckDB)
1 parent 0ddcfd8 commit c7779e9

File tree

2 files changed

+62
-9
lines changed

2 files changed

+62
-9
lines changed

benchmark/database_connectors/connector.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
from abc import ABC, abstractmethod
22
import os
33

4+
DEFAULT_TIMEOUT = 30 # seconds
5+
TIMEOUT_PER_CASE = 10 # seconds
46

57
#=======================================================================================================================
68
# Connector exceptions

benchmark/database_connectors/duckdb.py

Lines changed: 60 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import os
44
import json
5+
import subprocess
56
from tqdm import tqdm
67

78

@@ -43,6 +44,7 @@ def execute(self, n_runs, params: dict):
4344

4445
# If tables contain scale factors, they have to be loaded separately for every case
4546
if (with_scale_factors or not bool(params.get('readonly'))):
47+
timeout = (DEFAULT_TIMEOUT + TIMEOUT_PER_CASE) * len(params['cases'])
4648
# Write cases/queries to a file that will be passed to the command to execute
4749
statements = list()
4850
for case, query_stmt in params['cases'].items():
@@ -70,6 +72,7 @@ def execute(self, n_runs, params: dict):
7072

7173
# Otherwise, tables have to be created just once before the measurements (done above)
7274
else:
75+
timeout = DEFAULT_TIMEOUT + TIMEOUT_PER_CASE * len(params['cases'])
7376
# Write cases/queries to a file that will be passed to the command to execute
7477
with open(TMP_SQL_FILE, "a+") as tmp:
7578
tmp.write(".timer on\n")
@@ -81,7 +84,7 @@ def execute(self, n_runs, params: dict):
8184
# Execute query file and collect measurement data
8285
command = f"./{self.duckdb_cli} {TMP_DB} < {TMP_SQL_FILE}" + " | grep 'Run Time' | cut -d ' ' -f 5 | awk '{print $1 * 1000;}'"
8386
if not self.multithreaded:
84-
command = f'taskset -c 2 {command}'
87+
command = 'taskset -c 2 ' + command
8588

8689
if self.verbose:
8790
tqdm.write(f" $ {command}")
@@ -90,14 +93,21 @@ def execute(self, n_runs, params: dict):
9093
with open(TMP_SQL_FILE) as tmp:
9194
tqdm.write(" " + " ".join(tmp.readlines()))
9295

93-
stream = os.popen(f'{command}')
94-
for idx, line in enumerate(stream):
95-
time = float(line.replace("\n", "").replace(",", ".")) # in milliseconds
96-
case = list(params['cases'].keys())[idx]
97-
if case not in measurement_times.keys():
98-
measurement_times[case] = list()
99-
measurement_times[case].append(time)
100-
stream.close()
96+
benchmark_info = f"{suite}/{benchmark}/{experiment} [{configname}]"
97+
try:
98+
durations = self.run_command(command, timeout, benchmark_info)
99+
except ExperimentTimeoutExpired as ex:
100+
for case in params['cases'].keys():
101+
if case not in measurement_times.keys():
102+
measurement_times[case] = list()
103+
measurement_times[case].append(TIMEOUT_PER_CASE * 1000)
104+
else:
105+
for idx, line in enumerate(durations):
106+
time = float(line.replace("\n", "").replace(",", ".")) # in milliseconds
107+
case = list(params['cases'].keys())[idx]
108+
if case not in measurement_times.keys():
109+
measurement_times[case] = list()
110+
measurement_times[case].append(time)
101111

102112

103113
finally:
@@ -180,3 +190,44 @@ def generate_create_table_stmts(self, data: dict, with_scale_factors):
180190
with open(TMP_SQL_FILE, "w") as tmp:
181191
for stmt in statements:
182192
tmp.write(stmt + "\n")
193+
194+
195+
def run_command(self, command, timeout, benchmark_info):
    """Run `command` in a shell and return the non-blank lines it printed to stdout.

    Parameters:
        command: shell command line; executed with `shell=True` in the current working directory.
        timeout: maximum number of seconds to wait for the command to finish.
        benchmark_info: label of the running benchmark; only used in the diagnostic output.

    Returns:
        A list with one string per non-blank line of the command's stdout, in output order.

    Raises:
        ExperimentTimeoutExpired: if the command does not finish within `timeout` seconds.
        ConnectorException: if the command exits with a non-zero return code.
    """
    process = subprocess.Popen(command, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                               cwd=os.getcwd(), shell=True)
    try:
        # Send empty input so that stdin is closed right away
        out, err = process.communicate("".encode('latin-1'), timeout=timeout)
    except subprocess.TimeoutExpired:
        # NOTE(review): with `shell=True` this signals the shell itself; processes of the pipeline it
        # spawned may outlive it -- confirm whether killing the whole process group is needed here.
        process.kill()
        raise ExperimentTimeoutExpired(f'Query timed out after {timeout} seconds')
    finally:
        if process.poll() is None:          # if process is still alive
            process.terminate()             # try to shut down gracefully
            try:
                process.wait(timeout=5)     # wait for process to terminate
            except subprocess.TimeoutExpired:
                process.kill()              # kill if process did not terminate in time

    out = out.decode('latin-1')
    err = err.decode('latin-1')

    # Output on stderr alone is reported but tolerated; only a non-zero return code is fatal
    if process.returncode or len(err):
        outstr = '\n'.join(out.split('\n')[-20:])   # show at most the last 20 lines of stdout
        tqdm.write(f'''\
Unexpected failure during execution of benchmark "{benchmark_info}" with return code {process.returncode}:''')
        tqdm.write(command)
        tqdm.write(f'''\
===== stdout =====
{outstr}
===== stderr =====
{err}
==================
''')
        if process.returncode:
            raise ConnectorException(f'Benchmark failed with return code {process.returncode}.')

    # Parse `out` for timings: keep every non-blank line.  Unlike `split('\n')` followed by
    # `remove('')`, this neither fails when the output lacks a trailing newline nor lets further
    # blank lines through to the caller.
    durations = [line for line in out.splitlines() if line.strip()]

    return durations

0 commit comments

Comments
 (0)