@@ -116,6 +116,20 @@ class StatisticalModel:
         "SEQ_NUMBER": np.uint32,
         "MSG_LENGTH": np.uint64,
     }
+    CSV_COLUMN_TYPES_NULLABLE = {
+        "START_TIME": "UInt64",
+        "END_TIME": "UInt64",
+        "PROTOCOL": "UInt8",
+        "SRC_IP": str,
+        "DST_IP": str,
+        "SRC_PORT": "UInt16",
+        "DST_PORT": "UInt16",
+        "PACKETS": "UInt64",
+        "BYTES": "UInt64",
+        "EXPORT_TIME": "UInt64",
+        "SEQ_NUMBER": "UInt32",
+        "MSG_LENGTH": "UInt64",
+    }
 
     AGGREGATE_FLOWS = {
         "START_TIME": "min",
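A note on why the second map exists: plain numpy integer dtypes cannot hold missing values, so parsing a CSV that has empty port fields with the dense `CSV_COLUMN_TYPES` fails, while the pandas nullable extension dtypes (`"UInt16"` and friends) load the gaps as `<NA>`. A minimal sketch of the difference, using a made-up two-row CSV:

```python
import io

import numpy as np
import pandas as pd

# Made-up sample: the ICMP flow has empty SRC_PORT/DST_PORT fields.
CSV = "PROTOCOL,SRC_PORT,DST_PORT\n6,443,52100\n1,,\n"

# Dense numpy dtype: the empty fields make the parse raise.
try:
    pd.read_csv(io.StringIO(CSV), dtype={"SRC_PORT": np.uint16, "DST_PORT": np.uint16})
except ValueError as err:
    print("np.uint16 failed:", err)

# Nullable extension dtype: the empty fields come back as <NA>.
df = pd.read_csv(io.StringIO(CSV), dtype={"SRC_PORT": "UInt16", "DST_PORT": "UInt16"})
print(df)
```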
@@ -235,10 +249,25 @@ def _init_flows(self, path: os.PathLike):
         first_write = True
         logging.getLogger().debug("reading file with flows=%s", path)
         # ports could be empty in flows with protocol like ICMP
-        for chunk in pd.read_csv(path, dtype=self.CSV_COLUMN_TYPES, chunksize=10_000):
-            chunk["SRC_PORT"] = chunk["SRC_PORT"].fillna(0)
-            chunk["DST_PORT"] = chunk["DST_PORT"].fillna(0)
-            chunk = chunk.astype(self.CSV_COLUMN_TYPES)
+        for chunk in pd.read_csv(
+            path, dtype=self.CSV_COLUMN_TYPES_NULLABLE, chunksize=10_000
+        ):
+            chunk = chunk.fillna(
+                {
+                    "START_TIME": 0,
+                    "END_TIME": 0,
+                    "PROTOCOL": 0,
+                    "SRC_IP": "",
+                    "DST_IP": "",
+                    "SRC_PORT": 0,
+                    "DST_PORT": 0,
+                    "PACKETS": 0,
+                    "BYTES": 0,
+                    "EXPORT_TIME": 0,
+                    "SEQ_NUMBER": 0,
+                    "MSG_LENGTH": 0,
+                }
+            ).astype(self.CSV_COLUMN_TYPES)
 
             self._zero_icmp_ports(chunk)
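The rewritten loop fills every column's `<NA>` values with a per-column default and only then casts back to the dense `CSV_COLUMN_TYPES`, since `astype` to a plain numpy integer raises on any remaining missing value. A standalone sketch of that two-step pattern (the column subset and values are illustrative, not the full schema):

```python
import numpy as np
import pandas as pd

# Illustrative two-column slice of the flow schema.
chunk = pd.DataFrame(
    {
        "SRC_PORT": pd.array([443, pd.NA], dtype="UInt16"),
        "PACKETS": pd.array([10, pd.NA], dtype="UInt64"),
    }
)

# Filling per column first makes the cast to dense dtypes safe;
# astype(np.uint16) would raise on a column that still holds <NA>.
chunk = chunk.fillna({"SRC_PORT": 0, "PACKETS": 0}).astype(
    {"SRC_PORT": np.uint16, "PACKETS": np.uint64}
)
print(chunk.dtypes)  # SRC_PORT -> uint16, PACKETS -> uint64
```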
@@ -666,7 +695,7 @@ def setup_statsitic_objects(
             measure_end_time=end_time,
         ),
         "dt_flows_active_time": DiscreteCounter("Flow Duration Active"),
-        "dr_flows_cache_time ": DiscreteCounter("Flow Duration in Cache"),
+        "dt_flows_cache_time ": DiscreteCounter("Flow Duration in Cache"),
         "tsc_data_rate": TimeSeriesCounter(
             "data rate in Gb/s",
             sim,
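The rename in the last hunk matters because a mistyped dict key is only caught when the entry is looked up, never when the dict is built; note also that the key as committed appears to keep a trailing space inside the quotes, so lookups would have to include it. A toy illustration (names are hypothetical):

```python
# The typo'd key (with its trailing space) is silently accepted at definition time.
counters = {"dr_flows_cache_time ": object()}

# Only the later lookup under the intended name fails.
counters["dt_flows_cache_time"]  # KeyError: 'dt_flows_cache_time'
```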