
Commit d369860

ft-analyzer: update flow file preparation to start earlier
1 parent 1a696be commit d369860

File tree

4 files changed: +57 -50 lines changed


tools/ft-analyzer/ftanalyzer/models/statistical_model.py

Lines changed: 50 additions & 49 deletions

@@ -172,7 +172,7 @@ def __init__(
         if isinstance(reference, str):
             self._ref_path = reference
         else:
-            self._ref_path = self._ref_path = tempfile.NamedTemporaryFile(
+            self._ref_path = tempfile.NamedTemporaryFile(
                 delete=False, prefix="tmp_ref", suffix=".csv"
             ).name
             reference.to_csv(
@@ -186,11 +186,7 @@ def __init__(
         self._ref_ip_addresses_converted = isinstance(reference, pd.DataFrame)
         self._stat_counter = use_statistical_counter
         self._inactive_timeout = inactive_timeout
-
-        try:
-            self._flows_path = self._init_flows(flows)
-        except Exception as err:
-            raise SMException("Unable to read file with flows.") from err
+        self._flows_path = flows

         if merge:
             self._merge_flows(biflows_ts_correction)
@@ -222,7 +218,8 @@ def __init__(
         self._future_ref = None
         self._future_sim = None

-    def _init_flows(self, path: os.PathLike):
+    @staticmethod
+    def prepare_flows_file(path: os.PathLike, generator_stats: GeneratorStats):
         """initial read of flows.csv in chunks
         replaces faulty values and filters out some flows
@@ -238,47 +235,50 @@ def _init_flows(self, path: os.PathLike):
         first_write = True
         logging.getLogger().debug("reading file with flows=%s", path)
         # ports could be empty in flows with protocol like ICMP
-        for chunk in pd.read_csv(
-            path, dtype=self.CSV_COLUMN_TYPES_NULLABLE, chunksize=10_000
-        ):
-            chunk = chunk.fillna(
-                {
-                    "START_TIME": 0,
-                    "END_TIME": 0,
-                    "PROTOCOL": 0,
-                    "SRC_IP": "",
-                    "DST_IP": "",
-                    "SRC_PORT": 0,
-                    "DST_PORT": 0,
-                    "PACKETS": 0,
-                    "BYTES": 0,
-                    "EXPORT_TIME": 0,
-                    "SEQ_NUMBER": 0,
-                    "MSG_LENGTH": 0,
-                }
-            ).astype(self.CSV_COLUMN_TYPES)
-
-            self._zero_icmp_ports(chunk)
-
-            if self._generator_stats.start_time > 0:
-                # filter out flows that start before the start time with 500 ms tolerance
-                chunk = chunk[
-                    chunk["START_TIME"] >= self._generator_stats.start_time - 500
-                ]
-
-            # if stats.end_time > 0:
-            #     # filter out flows that start before the end time
-            #     chunk = chunk[chunk["START_TIME"] <= stats.end_time]
-
-            self._filter_multicast(chunk)
-
-            chunk.to_csv(
-                out_file,
-                index=False,
-                mode="w" if first_write else "a",
-                header=first_write,
-            )
-            first_write = False
+        # open output file once to avoid repeated open/close syscalls
+        with open(out_file, "w", newline="", encoding="ascii") as csvf:
+            for chunk in pd.read_csv(
+                path, dtype=StatisticalModel.CSV_COLUMN_TYPES_NULLABLE, chunksize=10_000
+            ):
+                # fill missing values in-place to avoid an extra copy
+                chunk.fillna(
+                    {
+                        "START_TIME": 0,
+                        "END_TIME": 0,
+                        "PROTOCOL": 0,
+                        "SRC_IP": "",
+                        "DST_IP": "",
+                        "SRC_PORT": 0,
+                        "DST_PORT": 0,
+                        "PACKETS": 0,
+                        "BYTES": 0,
+                        "EXPORT_TIME": 0,
+                        "SEQ_NUMBER": 0,
+                        "MSG_LENGTH": 0,
+                    },
+                    inplace=True,
+                )
+
+                chunk = chunk.astype(StatisticalModel.CSV_COLUMN_TYPES)
+
+                # zero ICMP ports (vectorized)
+                StatisticalModel._zero_icmp_ports(chunk)
+
+                # build a single combined mask to apply all filters in one go
+                mask = np.ones(len(chunk), dtype=bool)
+                if generator_stats.start_time > 0:
+                    mask &= chunk["START_TIME"] >= generator_stats.start_time - 500
+
+                # multicast filters: IPv4 and IPv6
+                # DST_IP might be an empty string for some rows (filled above), so startswith is safe
+                mask &= chunk["DST_IP"] != "255.255.255.255"
+                mask &= ~chunk["DST_IP"].str.startswith("ff02:")
+
+                filtered = chunk.loc[mask]
+
+                # write filtered chunk to CSV using the open file handle
+                filtered.to_csv(csvf, index=False, header=first_write)
+                first_write = False

         os.remove(path)
         return out_file
@@ -323,7 +323,8 @@ def _load_ref_df(self):
             self._ref_path, engine="pyarrow", dtype=self.CSV_COLUMN_TYPES
         )

-    def _zero_icmp_ports(self, df: pd.DataFrame):
+    @staticmethod
+    def _zero_icmp_ports(df: pd.DataFrame):
         icmp_protocols = [1, 58]  # ICMP and ICMPv6
         icmp_mask = df["PROTOCOL"].isin(icmp_protocols)
         df.loc[icmp_mask, ["SRC_PORT", "DST_PORT"]] = 0
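
The heart of the rewrite above is the single combined boolean mask: rather than slicing the chunk once per condition (each slice copies the surviving rows), every predicate is AND-ed into one mask that is applied once per chunk. Below is a self-contained sketch of that pattern; the input rows and the 1000 ms start time are made up, while the column names and the 500 ms tolerance mirror the diff.

import io

import numpy as np
import pandas as pd

# made-up flows; only the columns relevant to the filters are included
raw = io.StringIO(
    "START_TIME,DST_IP\n"
    "400,10.0.0.1\n"          # before start_time - 500 ms tolerance -> dropped
    "1500,10.0.0.2\n"         # kept
    "2000,255.255.255.255\n"  # IPv4 broadcast -> dropped
    "2500,ff02::1\n"          # IPv6 link-local multicast -> dropped
)
start_time = 1000  # assumed generator start, in ms

out = io.StringIO()
first_write = True
for chunk in pd.read_csv(raw, chunksize=2):
    # start from "keep everything", then AND in each predicate
    mask = np.ones(len(chunk), dtype=bool)
    mask &= (chunk["START_TIME"] >= start_time - 500).to_numpy()
    mask &= (chunk["DST_IP"] != "255.255.255.255").to_numpy()
    mask &= ~chunk["DST_IP"].str.startswith("ff02:").to_numpy()
    # one slice per chunk instead of one slice per condition
    chunk.loc[mask].to_csv(out, index=False, header=first_write)
    first_write = False

print(out.getvalue())  # only the 1500,10.0.0.2 row survives

Seeding with np.ones(len(chunk), dtype=bool) is what lets the start-time predicate be skipped cleanly when generator_stats.start_time is 0: the mask simply stays all-True for that condition.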

tools/ft-orchestration/tests/simulation/test_simulation_general.py

Lines changed: 1 addition & 0 deletions

@@ -374,6 +374,7 @@ def finalizer_download_logs():
     probe_instance.host_statistics.get_csv(tmp_dir)

     flows_file_future.result()
+    flows_file = StatisticalModel.prepare_flows_file(flows_file, stats)
     replicated_ref = replicated_ref_future.result()

     stats_report, precise_report = validate(
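
Hoisting the preparation out of the StatisticalModel constructor is what lets it start earlier: the test now cleans the flows file as soon as the raw CSV future resolves, while replicated_ref_future is still running in the background. A minimal sketch of that ordering, assuming a ThreadPoolExecutor and hypothetical download_flows_csv / build_replicated_ref helpers standing in for the real test setup:

import time
from concurrent.futures import ThreadPoolExecutor

# hypothetical stand-ins for the real test setup
def download_flows_csv() -> str:
    time.sleep(0.1)  # pretend to fetch the raw flows CSV from the probe
    return "/tmp/flows.csv"  # placeholder path

def build_replicated_ref() -> str:
    time.sleep(0.5)  # pretend to replicate the reference flows
    return "/tmp/ref.csv"  # placeholder path

with ThreadPoolExecutor() as executor:
    flows_file_future = executor.submit(download_flows_csv)
    replicated_ref_future = executor.submit(build_replicated_ref)

    flows_file = flows_file_future.result()  # wait only for the raw flows CSV
    # preparation would run here, overlapping with the still-running ref job:
    # flows_file = StatisticalModel.prepare_flows_file(flows_file, stats)
    replicated_ref = replicated_ref_future.result()

Previously the cleanup only began inside StatisticalModel.__init__, i.e. after both futures had resolved; now it overlaps with whatever the reference future is still computing.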

tools/ft-orchestration/tests/simulation/test_simulation_overload.py

Lines changed: 1 addition & 0 deletions

@@ -322,6 +322,7 @@ def finalizer_download_logs():
     probe_instance.host_statistics.get_csv(tmp_dir)

     flows_file_future.result()
+    flows_file = StatisticalModel.prepare_flows_file(flows_file, stats)
     replicated_ref = replicated_ref_future.result()

     model = StatisticalModel(

tools/ft-orchestration/tests/simulation/test_simulation_threshold.py

Lines changed: 5 additions & 1 deletion

@@ -227,7 +227,9 @@ def finalizer_download_logs():
     request.addfinalizer(cleanup)
     request.addfinalizer(finalizer_download_logs)

-    def run_single_test(loops: int, speed: MbpsSpeed) -> tuple[bool, StatisticalReport]:
+    def run_single_test(
+        loops: int, speed: MbpsSpeed, flows_file: os.PathLike
+    ) -> tuple[bool, StatisticalReport]:
         logging.getLogger().info(
             "running test with speed: %s Mbps (loops: %s)", speed.speed, loops
         )
@@ -272,6 +274,7 @@ def run_single_test(loops: int, speed: MbpsSpeed) -> tuple[bool, StatisticalReport]:
         probe_instance.host_statistics.get_csv(tmp_dir)

         flows_file_future.result()
+        flows_file = StatisticalModel.prepare_flows_file(flows_file, stats)
         replicated_ref = replicated_ref_future.result()

         flow_replicator = None
@@ -333,6 +336,7 @@ def run_single_test(loops: int, speed: MbpsSpeed) -> tuple[bool, StatisticalReport]:
             result, report = run_single_test(
                 max(1, int(math.ceil(speed_current / scenario.default.mbps))),
                 MbpsSpeed(speed_current),
+                flows_file,
             )
             report.print_results()
         except Exception as e:
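
Note the contract behind the new flows_file parameter: prepare_flows_file deletes its input (os.remove(path)) and returns a fresh temporary file, so each run_single_test invocation must receive the path of a newly downloaded flows CSV, and passing it explicitly makes that visible at the call site. A toy illustration of the consume-and-replace contract follows; the names and the pass-through body are illustrative, while the real method cleans and filters in chunks.

import os
import tempfile

def prepare(path: os.PathLike) -> str:
    # write the "prepared" output to a new temp file, like the real method
    out_file = tempfile.NamedTemporaryFile(
        delete=False, prefix="tmp_flows", suffix=".csv"
    ).name
    with open(path, encoding="ascii") as src, open(
        out_file, "w", encoding="ascii"
    ) as dst:
        dst.write(src.read())  # stand-in for the chunked clean/filter pass
    os.remove(path)  # input is consumed; callers must not reuse the old path
    return out_file

# usage: the original path is gone afterwards
raw = tempfile.NamedTemporaryFile(delete=False, suffix=".csv").name
with open(raw, "w", encoding="ascii") as f:
    f.write("PACKETS\n1\n")
prepared = prepare(raw)
assert not os.path.exists(raw) and os.path.exists(prepared)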
