@@ -29,9 +29,9 @@ public interface StatsChangedListener {
29
29
private final Set <Integer > stoppedPGIds = new HashSet <>();
30
30
private final Map <Integer , FlowStatPoint > flowStatPointShadowMap = new HashMap <>();
31
31
32
- private final Map <Integer , ArrayHistory <LatencyStatPoint >> latencyStatPointHistoryMap = new HashMap <>();
32
+ private final Map <Integer , ArrayHistory <LatencyStatPoint >> latencyHistoryByPGID = new HashMap <>();
33
+ private final Map <Integer , LatencyStatPoint > latencyInitialOffsetByPGID = new HashMap <>();
33
34
private final Map <Integer , Long > maxLatencyMap = new HashMap <>();
34
- private final Map <Integer , LatencyStatPoint > latencyStatPointShadowMap = new HashMap <>();
35
35
private String [] histogramKeys = new String [0 ];
36
36
37
37
private Map <String , Integer > lastVerId = new HashMap <>();
@@ -55,17 +55,13 @@ public Map<Integer, FlowStatPoint> getFlowStatPointShadowMap() {
55
55
}
56
56
57
57
public Map <Integer , ArrayHistory <LatencyStatPoint >> getLatencyStatPointHistoryMap () {
58
- return latencyStatPointHistoryMap ;
58
+ return latencyHistoryByPGID ;
59
59
}
60
60
61
61
public Map <Integer , Long > getMaxLatencyMap () {
62
62
return maxLatencyMap ;
63
63
}
64
64
65
- public Map <Integer , LatencyStatPoint > getLatencyStatPointShadowMap () {
66
- return latencyStatPointShadowMap ;
67
- }
68
-
69
65
public String [] getHistogramKeys (final int size ) {
70
66
return Arrays .copyOfRange (histogramKeys , Math .max (0 , histogramKeys .length - size ), histogramKeys .length );
71
67
}
@@ -161,7 +157,7 @@ private void handlePGIDStatsReceived(final WorkerStateEvent event) {
161
157
162
158
final Map <String , LatencyStat > latencyStatMap = receivedPGIDStats .getLatency ();
163
159
if (latencyStatMap != null ) {
164
- processLatencyStats (receivedPGIDStats . getLatency () , verId , time );
160
+ processLatencyStats (latencyStatMap , verId , time );
165
161
} else {
166
162
clearLatencyStats ();
167
163
}
@@ -241,7 +237,7 @@ private void processLatencyStats(
241
237
final Map <String , Integer > verId ,
242
238
final double time
243
239
) {
244
- final Set <Integer > unvisitedStreams = new HashSet <>(latencyStatPointHistoryMap .keySet ());
240
+ final Set <Integer > unvisitedStreams = new HashSet <>(latencyHistoryByPGID .keySet ());
245
241
final Set <String > histogramKeysSet = new HashSet <>();
246
242
247
243
latencyStatMap .forEach ((final String pgID , final LatencyStat latencyStat ) -> {
@@ -251,20 +247,23 @@ private void processLatencyStats(
251
247
} catch (NumberFormatException exc ) {
252
248
return ;
253
249
}
254
-
255
250
unvisitedStreams .remove (intPGID );
256
251
257
- final LatencyStatPoint statsFlowHistoryPoint = new LatencyStatPoint (latencyStat , time );
258
- ArrayHistory <LatencyStatPoint > history = latencyStatPointHistoryMap .get (intPGID );
252
+ ArrayHistory <LatencyStatPoint > history = latencyHistoryByPGID .get (intPGID );
259
253
if (history == null ) {
260
254
history = new ArrayHistory <>(HISTORY_SIZE );
261
- latencyStatPointHistoryMap .put (intPGID , history );
255
+ latencyHistoryByPGID .put (intPGID , history );
262
256
} else if (!verId .get (pgID ).equals (lastVerId .get (pgID ))) {
263
257
history .clear ();
264
258
maxLatencyMap .remove (intPGID );
265
- latencyStatPointShadowMap .remove (intPGID );
259
+ latencyInitialOffsetByPGID .remove (intPGID );
266
260
}
267
- history .add (statsFlowHistoryPoint );
261
+
262
+ final LatencyStatPoint latencyStatPoint = new LatencyStatPoint (latencyStat , time );
263
+ latencyInitialOffsetByPGID .putIfAbsent (intPGID , latencyStatPoint );
264
+
265
+ LatencyStatPoint shifted = latencyStatPoint .subtractOffset (latencyInitialOffsetByPGID .get (intPGID ));
266
+ history .add (shifted );
268
267
269
268
final long lastMax = latencyStat .getLat ().getLastMax ();
270
269
final Long maxLatency = maxLatencyMap .get (intPGID );
@@ -273,37 +272,33 @@ private void processLatencyStats(
273
272
}
274
273
275
274
histogramKeysSet .addAll (latencyStat .getLat ().getHistogram ().keySet ());
276
-
277
- latencyStatPointShadowMap .putIfAbsent (intPGID , statsFlowHistoryPoint );
278
275
});
279
276
280
277
histogramKeys = new String [histogramKeysSet .size ()];
281
278
histogramKeysSet .toArray (histogramKeys );
282
279
Arrays .sort (histogramKeys , PGIDStatsStorage ::compareHistogramKeys );
283
280
284
281
unvisitedStreams .forEach ((final Integer pgID ) -> {
285
- latencyStatPointHistoryMap .remove (pgID );
282
+ latencyHistoryByPGID .remove (pgID );
286
283
maxLatencyMap .remove (pgID );
287
- latencyStatPointShadowMap .remove (pgID );
284
+ latencyInitialOffsetByPGID .remove (pgID );
288
285
});
289
286
}
290
287
291
288
private void clearLatencyStats () {
292
- latencyStatPointHistoryMap .clear ();
289
+ latencyHistoryByPGID .clear ();
293
290
maxLatencyMap .clear ();
294
- latencyStatPointShadowMap .clear ();
291
+ latencyInitialOffsetByPGID .clear ();
295
292
}
296
293
297
294
private void resetLatencyStats () {
298
- latencyStatPointShadowMap .clear ();
295
+ latencyInitialOffsetByPGID .clear ();
299
296
maxLatencyMap .clear ();
300
- latencyStatPointHistoryMap .forEach ((final Integer pgID , final ArrayHistory <LatencyStatPoint > history ) -> {
297
+ latencyHistoryByPGID .forEach ((final Integer pgID , final ArrayHistory <LatencyStatPoint > history ) -> {
301
298
if (!history .isEmpty ()) {
302
299
final LatencyStatPoint last = history .last ();
303
300
maxLatencyMap .put (pgID , last .getLatencyStat ().getLat ().getLastMax ());
304
- latencyStatPointShadowMap .put (pgID , last );
305
301
history .clear ();
306
- history .add (last );
307
302
}
308
303
});
309
304
}
0 commit comments