Skip to content

Commit 9a48a43

Browse files
author
Tobias Kopp
committed
[Benchmark] Implement timeout for hyper
- Revert timeout changes, as they override multi-core execution - Implement timeout tracking for HyPer
1 parent 3b5f774 commit 9a48a43

File tree

1 file changed

+92
-35
lines changed

1 file changed

+92
-35
lines changed

benchmark/database_connectors/hyper.py

Lines changed: 92 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import sys
99
import subprocess
1010
from tqdm import tqdm
11+
import multiprocessing
1112

1213

1314
# Converting table names to lower case is needed because then
@@ -17,6 +18,17 @@ def table_name_to_lower_case(name: str):
1718
return name.lower()
1819

1920

21+
def run_multiple_queries(connection, queries, data, queue):
    """Benchmark all *queries* on *connection* and publish the timings.

    Intended as the target of a child process so the parent can enforce a
    timeout; the execution times reported by hyperconf are handed back to
    the parent through *queue*.
    """
    queue.put(hyperconf.benchmark_execution_times(connection, queries, data))
25+
26+
def run_single_query(connection, query):
    """Execute *query* on *connection* and drain the result set.

    The rows themselves are discarded; running the query to completion is
    what matters (e.g. so execution time is fully measured). Intended as the
    target of a child process so the parent can enforce a timeout.
    """
    with connection.execute_query(query) as result:
        for _ in result:
            pass
30+
31+
2032
class HyPer(Connector):
2133

2234
def __init__(self, args = dict()):
@@ -33,6 +45,12 @@ def execute(self, n_runs, params: dict()):
3345
result = None
3446
if self.multithreaded:
3547
result = HyPer._execute(n_runs, params)
48+
try:
49+
result = HyPer._execute(n_runs, params)
50+
except Exception as ex:
51+
tqdm.write(str(ex))
52+
return dict()
53+
3654
else:
3755
path = os.getcwd()
3856
script = f'''
@@ -44,18 +62,26 @@ def execute(self, n_runs, params: dict()):
4462
args = ['taskset', '-c', '2', 'python3', '-c', script]
4563
if self.verbose:
4664
tqdm.write(f" $ {' '.join(args)}")
65+
# try:
66+
# P = subprocess.run(
67+
# args=args,
68+
# capture_output=True,
69+
# text=True,
70+
# cwd=path
71+
# )
72+
# result = eval(P.stdout)
73+
# except Exception as ex:
74+
# tqdm.write(str(ex))
75+
# return dict()
4776
P = subprocess.run(
48-
args=args,
49-
capture_output=True,
50-
text=True,
51-
cwd=path
52-
)
53-
# TODO error handling
54-
77+
args=args,
78+
capture_output=True,
79+
text=True,
80+
cwd=path
81+
)
5582
result = eval(P.stdout)
5683

5784
patched_result = dict()
58-
5985
for key, val in result.items():
6086
patched_result[f'{key}{suffix}'] = val
6187
return patched_result
@@ -73,6 +99,8 @@ def _execute(n_runs, params: dict()):
7399

74100
hyperconf.init() # prepare for measurements
75101

102+
num_timeout_cases = 0
103+
76104
for run_id in range(n_runs):
77105
# If tables contain scale factors, they have to be loaded separately for every case
78106
if (with_scale_factors or not bool(params.get('readonly'))):
@@ -95,8 +123,11 @@ def _execute(n_runs, params: dict()):
95123

96124
# Execute cases
97125
for case, query in params['cases'].items():
126+
# Set up tables
98127
for table_name, table in params['data'].items():
99128
table_name = table_name_to_lower_case(table_name)
129+
connection.execute_command(f'DELETE FROM {table_name};') # Empty table first
130+
100131
if table.get('scale_factors'):
101132
sf = table['scale_factors'][case]
102133
else:
@@ -105,44 +136,70 @@ def _execute(n_runs, params: dict()):
105136
num_rows = round((table['lines_in_file'] - header) * sf)
106137
connection.execute_command(f'INSERT INTO {table_name} SELECT * FROM {table_name}_tmp LIMIT {num_rows};')
107138

108-
with connection.execute_query(query) as result:
109-
for row in result:
110-
pass
111-
for table_name, table in params['data'].items():
112-
connection.execute_command(f'DELETE FROM {table_name_to_lower_case(table_name)};')
113-
114-
# extract results
115-
matches = hyperconf.filter_results(
116-
hyperconf.extract_results(),
117-
{ 'k': 'query-end'},
118-
[ hyperconf.MATCH_SELECT ]
119-
)
120-
times = map(lambda m: m['v']['execution-time'] * 1000, matches)
121-
times = list(map(lambda t: f'{t:.3f}', times))
122-
times = times[run_id * len(list(params['cases'].keys())) : ] # get only times of this run, ignore previous runs
123-
times = list(zip(params['cases'].keys(), times))
124-
for case, time in times:
125-
if case not in measurement_times.keys():
126-
measurement_times[case] = list()
127-
measurement_times[case].append(time)
139+
timeout = TIMEOUT_PER_CASE
140+
times = None
141+
p = multiprocessing.Process(target=run_single_query, args=(connection, query))
142+
try:
143+
p.start()
144+
p.join(timeout=timeout)
145+
if p.is_alive():
146+
# timeout happened
147+
num_timeout_cases += 1
148+
time = timeout * 1000 # in ms
149+
p.terminate()
150+
p.join()
151+
else:
152+
# no timeout, extract result
153+
matches = hyperconf.filter_results(
154+
hyperconf.extract_results(),
155+
{ 'k': 'query-end'},
156+
[ hyperconf.MATCH_SELECT ]
157+
)
158+
times = map(lambda m: m['v']['execution-time'] * 1000, matches)
159+
times = list(map(lambda t: f'{t:.3f}', times))
160+
case_idx = list(params['cases'].keys()).index(case)
161+
time = times[run_id * len(list(params['cases'].keys())) + case_idx - num_timeout_cases]
162+
163+
if case not in measurement_times.keys():
164+
measurement_times[case] = list()
165+
measurement_times[case].append(time)
166+
167+
except Exception as ex:
168+
raise(ConnectorException(ex))
169+
128170

129171

130172
else:
173+
open(hyperconf.HYPER_LOG_FILE, 'w').close() # to clear the log file and ignore previous runs
131174
# Otherwise, tables have to be created just once before the measurements
132175
with HyperProcess(telemetry=Telemetry.DO_NOT_SEND_USAGE_DATA_TO_TABLEAU) as hyper:
133176
with Connection(endpoint=hyper.endpoint, database='benchmark.hyper', create_mode=CreateMode.CREATE_AND_REPLACE) as connection:
134177
table_defs = HyPer.get_all_table_defs(params)
135178
queries = HyPer.get_cases_queries(params, table_defs)
136179
data = HyPer.get_data(params, table_defs)
137180

138-
times = hyperconf.benchmark_execution_times(connection, queries.values(), data)
139-
times = times[run_id * len(queries.keys()):] # get only times of this run, ignore previous runs
140-
times = list(zip(queries.keys(), list(map(lambda t: float(f'{t:.3f}'), times))))
181+
timeout = DEFAULT_TIMEOUT + TIMEOUT_PER_CASE * len(params['cases'])
182+
times = None
183+
q = multiprocessing.Queue()
184+
p = multiprocessing.Process(target=run_multiple_queries, args=(connection, queries.values(), data, q))
185+
try:
186+
p.start()
187+
p.join(timeout=timeout)
188+
if p.is_alive():
189+
# timeout happened
190+
times = times = list(zip(queries.keys(), [TIMEOUT_PER_CASE for _ in range(len(params['cases']))]))
191+
else:
192+
times = q.get()
193+
times = list(zip(queries.keys(), list(map(lambda t: float(f'{t:.3f}'), times))))
194+
195+
for case, time in times:
196+
if case not in measurement_times.keys():
197+
measurement_times[case] = list()
198+
measurement_times[case].append(time)
199+
200+
except Exception as ex:
201+
raise(ConnectorException(ex))
141202

142-
for case, time in times:
143-
if case not in measurement_times.keys():
144-
measurement_times[case] = list()
145-
measurement_times[case].append(time)
146203

147204

148205
return {'HyPer': measurement_times}

0 commit comments

Comments
 (0)