
Commit 5c1bd6c

jsuereth and jack-berg authored
Add adaptable circular buffer implementation for ExponentialCounter. (#4087)
* Add adaptable circular buffer implementation for ExponentialCounter and expose hooks to test its use in the Exponential Histogram aggregator.
* Clean up some adapting circular buffer code.
* Fix style issues.
* Apply spotless.
* Add tests for adapting integer array.
* Finish wiring ability to remember previous integer cell size and expand testing.
* Update array copy from code review.
* Fixes/cleanups from review:
  - Fix a bug in equality where it forced ExponentialCounter to have the same offset, even if it had stored 0 counts in all buckets. This interacted negatively with merge/diff tests, where a freshly created exponential bucket would have a different indexStart than the one it was diffed against.
  - Modify the default exponential bucket counter to be the adapting circular buffer.
  - Remove some not-well-thought-out methods (like zeroOf, zeroFrom) in favor of a "clear" method on ExponentialCounter.
  - Modify ExponentialBucketStrategy to be an actual implementation.
* Improve testing of copy behavior across exponential-counter implementations.
* Last fix/cleanup for PR: remove remaining TODO around preserving runtime optimisations.
* Fixes from review.
* Add test to ensure 0 is returned from exponential counters outside the populated range.
* Add a bunch of extra equality tests.
* Run spotless.
* Add note about equality.
* Add copy() method to AdaptingIntegerArray, update tests.
* Fix checkstyle.
* Add internal disclaimer, reduce visibility of test classes.

Co-authored-by: jack-berg <[email protected]>
1 parent 0ed4967 commit 5c1bd6c
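
A minimal sketch (not part of the diff below) of the hook this commit adds: the existing single-argument DoubleExponentialHistogramAggregator constructor keeps its behavior but now delegates to a circular-buffer-backed strategy (scale 20, 320 max buckets), while the new two-argument constructor lets tests and benchmarks swap in a different ExponentialCounter backend. Names are taken from the hunks below; the snippet assumes it lives inside the internal aggregator package, since these classes are package-private.

// Sketch only; assumes a test class in io.opentelemetry.sdk.metrics.internal.aggregator.
DoubleExponentialHistogramAggregator defaultAggregator =
    new DoubleExponentialHistogramAggregator(ExemplarReservoir::noSamples);
DoubleExponentialHistogramAggregator mapBackedAggregator =
    new DoubleExponentialHistogramAggregator(
        ExemplarReservoir::noSamples,
        ExponentialBucketStrategy.newStrategy(
            20, 320, ExponentialCounterFactory.mapCounter()));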

File tree

15 files changed: +892 / -75 lines

sdk/metrics/src/jmh/java/io/opentelemetry/sdk/metrics/internal/aggregator/HistogramAggregationParam.java

Lines changed: 15 additions & 1 deletion
@@ -6,6 +6,7 @@
 package io.opentelemetry.sdk.metrics.internal.aggregator;
 
 import io.opentelemetry.sdk.metrics.exemplar.ExemplarReservoir;
+import io.opentelemetry.sdk.metrics.internal.state.ExponentialCounterFactory;
 import java.util.Collections;
 
 /** The types of histogram aggregation to benchmark. */
@@ -20,7 +21,20 @@ public enum HistogramAggregationParam {
       new DoubleHistogramAggregator(
           ExplicitBucketHistogramUtils.createBoundaryArray(Collections.emptyList()),
           ExemplarReservoir::noSamples)),
-  EXPONENTIAL(new DoubleExponentialHistogramAggregator(ExemplarReservoir::noSamples));
+  EXPONENTIAL_SMALL_CIRCULAR_BUFFER(
+      new DoubleExponentialHistogramAggregator(
+          ExemplarReservoir::noSamples,
+          ExponentialBucketStrategy.newStrategy(
+              20, 20, ExponentialCounterFactory.circularBufferCounter()))),
+  EXPONENTIAL_CIRCULAR_BUFFER(
+      new DoubleExponentialHistogramAggregator(
+          ExemplarReservoir::noSamples,
+          ExponentialBucketStrategy.newStrategy(
+              20, 320, ExponentialCounterFactory.circularBufferCounter()))),
+  EXPONENTIAL_MAP_COUNTER(
+      new DoubleExponentialHistogramAggregator(
+          ExemplarReservoir::noSamples,
+          ExponentialBucketStrategy.newStrategy(20, 320, ExponentialCounterFactory.mapCounter())));
 
   private final Aggregator<?> aggregator;
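
The three new exponential benchmark variants above differ only in the counter backend and the bucket budget; all start at scale 20. Side by side (values copied from the enum constants, shown only for comparison):

ExponentialBucketStrategy smallRing =
    ExponentialBucketStrategy.newStrategy(20, 20, ExponentialCounterFactory.circularBufferCounter());
ExponentialBucketStrategy ring =
    ExponentialBucketStrategy.newStrategy(20, 320, ExponentialCounterFactory.circularBufferCounter());
ExponentialBucketStrategy mapBacked =
    ExponentialBucketStrategy.newStrategy(20, 320, ExponentialCounterFactory.mapCounter());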

sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleExponentialHistogramAggregator.java

Lines changed: 29 additions & 13 deletions
@@ -13,6 +13,7 @@
 import io.opentelemetry.sdk.metrics.data.MetricData;
 import io.opentelemetry.sdk.metrics.exemplar.ExemplarReservoir;
 import io.opentelemetry.sdk.metrics.internal.descriptor.MetricDescriptor;
+import io.opentelemetry.sdk.metrics.internal.state.ExponentialCounterFactory;
 import io.opentelemetry.sdk.resources.Resource;
 import java.util.List;
 import java.util.Map;
@@ -22,14 +23,24 @@ final class DoubleExponentialHistogramAggregator
     implements Aggregator<ExponentialHistogramAccumulation> {
 
   private final Supplier<ExemplarReservoir> reservoirSupplier;
+  private final ExponentialBucketStrategy bucketStrategy;
 
   DoubleExponentialHistogramAggregator(Supplier<ExemplarReservoir> reservoirSupplier) {
+    this(
+        reservoirSupplier,
+        ExponentialBucketStrategy.newStrategy(
+            20, 320, ExponentialCounterFactory.circularBufferCounter()));
+  }
+
+  DoubleExponentialHistogramAggregator(
+      Supplier<ExemplarReservoir> reservoirSupplier, ExponentialBucketStrategy bucketStrategy) {
     this.reservoirSupplier = reservoirSupplier;
+    this.bucketStrategy = bucketStrategy;
   }
 
   @Override
   public AggregatorHandle<ExponentialHistogramAccumulation> createHandle() {
-    return new Handle(reservoirSupplier.get());
+    return new Handle(reservoirSupplier.get(), this.bucketStrategy);
   }
 
   /**
@@ -132,32 +143,36 @@ public MetricData toMetricData(
   }
 
   static final class Handle extends AggregatorHandle<ExponentialHistogramAccumulation> {
-
-    private int scale;
-    private DoubleExponentialHistogramBuckets positiveBuckets;
-    private DoubleExponentialHistogramBuckets negativeBuckets;
+    private final ExponentialBucketStrategy bucketStrategy;
+    private final DoubleExponentialHistogramBuckets positiveBuckets;
+    private final DoubleExponentialHistogramBuckets negativeBuckets;
     private long zeroCount;
     private double sum;
 
-    Handle(ExemplarReservoir reservoir) {
+    Handle(ExemplarReservoir reservoir, ExponentialBucketStrategy bucketStrategy) {
       super(reservoir);
       this.sum = 0;
       this.zeroCount = 0;
-      this.scale = DoubleExponentialHistogramBuckets.MAX_SCALE;
-      this.positiveBuckets = new DoubleExponentialHistogramBuckets();
-      this.negativeBuckets = new DoubleExponentialHistogramBuckets();
+      this.bucketStrategy = bucketStrategy;
+      this.positiveBuckets = this.bucketStrategy.newBuckets();
+      this.negativeBuckets = this.bucketStrategy.newBuckets();
    }
 
     @Override
     protected synchronized ExponentialHistogramAccumulation doAccumulateThenReset(
         List<ExemplarData> exemplars) {
       ExponentialHistogramAccumulation acc =
           ExponentialHistogramAccumulation.create(
-              scale, sum, positiveBuckets, negativeBuckets, zeroCount, exemplars);
+              this.positiveBuckets.getScale(),
+              sum,
+              positiveBuckets.copy(),
+              negativeBuckets.copy(),
+              zeroCount,
+              exemplars);
       this.sum = 0;
       this.zeroCount = 0;
-      this.positiveBuckets = new DoubleExponentialHistogramBuckets();
-      this.negativeBuckets = new DoubleExponentialHistogramBuckets();
+      this.positiveBuckets.clear();
+      this.negativeBuckets.clear();
       return acc;
     }
@@ -180,6 +195,8 @@ protected synchronized void doRecordDouble(double value) {
       // Record; If recording fails, calculate scale reduction and scale down to fit new value.
       // 2nd attempt at recording should work with new scale
       DoubleExponentialHistogramBuckets buckets = (c > 0) ? positiveBuckets : negativeBuckets;
+      // TODO: We should experiment with downscale on demand during sync execution and only
+      // unifying scale factor between positive/negative at collection time (doAccumulate).
       if (!buckets.record(value)) {
         // getScaleReduction() used with downScale() will scale down as required to record value,
         // fit inside max allowed buckets, and make sure index can be represented by int.
@@ -196,7 +213,6 @@ protected void doRecordLong(long value) {
     void downScale(int by) {
       positiveBuckets.downscale(by);
       negativeBuckets.downscale(by);
-      this.scale -= by;
     }
   }
 }
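
To summarize the Handle changes above: each handle now obtains its positive and negative buckets from the strategy, derives the scale from positiveBuckets.getScale(), and at collection time snapshots the buckets with copy() and reuses them via clear() instead of reallocating. A sketch of that lifecycle follows; recordDouble is assumed to be the usual public AggregatorHandle entry point that delegates to doRecordDouble (it is not shown in this diff), and the snippet again assumes the internal aggregator package.

// Sketch; recordDouble(...) is an assumption, the rest is from the hunks above.
DoubleExponentialHistogramAggregator aggregator =
    new DoubleExponentialHistogramAggregator(ExemplarReservoir::noSamples);
AggregatorHandle<ExponentialHistogramAccumulation> handle = aggregator.createHandle();
handle.recordDouble(12.5); // routed to doRecordDouble; buckets may downscale to fit the value
// At collection, doAccumulateThenReset copies positiveBuckets/negativeBuckets into the
// accumulation, then clears the live buckets so the next interval reuses the same counters.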

sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleExponentialHistogramBuckets.java

Lines changed: 75 additions & 23 deletions
@@ -8,7 +8,7 @@
 import io.opentelemetry.sdk.internal.PrimitiveLongList;
 import io.opentelemetry.sdk.metrics.data.ExponentialHistogramBuckets;
 import io.opentelemetry.sdk.metrics.internal.state.ExponentialCounter;
-import io.opentelemetry.sdk.metrics.internal.state.MapCounter;
+import io.opentelemetry.sdk.metrics.internal.state.ExponentialCounterFactory;
 import java.util.Collections;
 import java.util.List;
 import javax.annotation.Nonnull;
@@ -23,27 +23,37 @@
  */
 final class DoubleExponentialHistogramBuckets implements ExponentialHistogramBuckets {
 
-  public static final int MAX_SCALE = 20;
-
-  private static final int MAX_BUCKETS = MapCounter.MAX_SIZE;
-
+  private final ExponentialCounterFactory counterFactory;
   private ExponentialCounter counts;
   private BucketMapper bucketMapper;
   private int scale;
 
-  DoubleExponentialHistogramBuckets() {
-    this.counts = new MapCounter();
-    this.bucketMapper = new LogarithmMapper(MAX_SCALE);
-    this.scale = MAX_SCALE;
+  DoubleExponentialHistogramBuckets(
+      int scale, int maxBuckets, ExponentialCounterFactory counterFactory) {
+    this.counterFactory = counterFactory;
+    this.counts = counterFactory.newCounter(maxBuckets);
+    this.bucketMapper = new LogarithmMapper(scale);
+    this.scale = scale;
   }
 
   // For copying
   DoubleExponentialHistogramBuckets(DoubleExponentialHistogramBuckets buckets) {
-    this.counts = new MapCounter(buckets.counts); // copy counts
+    this.counterFactory = buckets.counterFactory;
+    this.counts = counterFactory.copy(buckets.counts);
     this.bucketMapper = new LogarithmMapper(buckets.scale);
     this.scale = buckets.scale;
   }
 
+  /** Returns a copy of this bucket. */
+  DoubleExponentialHistogramBuckets copy() {
+    return new DoubleExponentialHistogramBuckets(this);
+  }
+
+  /** Resets all counters in this bucket set to zero, but preserves scale. */
+  public void clear() {
+    this.counts.clear();
+  }
+
   boolean record(double value) {
     if (value == 0.0) {
       // Guarded by caller. If passed 0 it would be a bug in the SDK.
@@ -55,6 +65,12 @@ boolean record(double value) {
 
   @Override
   public int getOffset() {
+    // We need to unify the behavior of empty buckets.
+    // Unfortunately, getIndexStart is not meaningful for empty counters, so we default to
+    // returning 0 for offset and an empty list.
+    if (counts.isEmpty()) {
+      return 0;
+    }
     return counts.getIndexStart();
   }
 
@@ -74,6 +90,9 @@ public List<Long> getBucketCounts() {
 
   @Override
   public long getTotalCount() {
+    if (counts.isEmpty()) {
+      return 0;
+    }
     long totalCount = 0;
     for (int i = counts.getIndexStart(); i <= counts.getIndexEnd(); i++) {
       totalCount += counts.get(i);
@@ -90,7 +109,11 @@ void downscale(int by) {
     }
 
     if (!counts.isEmpty()) {
-      ExponentialCounter newCounts = new MapCounter();
+      // We want to preserve other optimisations here as well, e.g. integer size.
+      // Instead of creating a new counter, we copy the existing one (for bucket size
+      // optimisations), and clear the values before writing the new ones.
+      ExponentialCounter newCounts = counterFactory.copy(counts);
+      newCounts.clear();
 
       for (int i = counts.getIndexStart(); i <= counts.getIndexEnd(); i++) {
         long count = counts.get(i);
@@ -117,7 +140,7 @@ void downscale(int by) {
    */
   static DoubleExponentialHistogramBuckets diff(
       DoubleExponentialHistogramBuckets a, DoubleExponentialHistogramBuckets b) {
-    DoubleExponentialHistogramBuckets copy = new DoubleExponentialHistogramBuckets(a);
+    DoubleExponentialHistogramBuckets copy = a.copy();
     copy.mergeWith(b, /* additive= */ false);
     return copy;
   }
@@ -133,11 +156,11 @@ static DoubleExponentialHistogramBuckets diff(
   static DoubleExponentialHistogramBuckets merge(
       DoubleExponentialHistogramBuckets a, DoubleExponentialHistogramBuckets b) {
     if (b.counts.isEmpty()) {
-      return new DoubleExponentialHistogramBuckets(a);
+      return a;
     } else if (a.counts.isEmpty()) {
-      return new DoubleExponentialHistogramBuckets(b);
+      return b;
     }
-    DoubleExponentialHistogramBuckets copy = new DoubleExponentialHistogramBuckets(a);
+    DoubleExponentialHistogramBuckets copy = a.copy();
     copy.mergeWith(b, /* additive= */ true);
     return copy;
   }
@@ -218,7 +241,7 @@ int getScaleReduction(double value) {
   int getScaleReduction(long newStart, long newEnd) {
     int scaleReduction = 0;
 
-    while (newEnd - newStart + 1 > MAX_BUCKETS) {
+    while (newEnd - newStart + 1 > counts.getMaxSize()) {
       newStart >>= 1;
       newEnd >>= 1;
       scaleReduction++;
@@ -234,19 +257,48 @@ public boolean equals(@Nullable Object obj) {
     DoubleExponentialHistogramBuckets other = (DoubleExponentialHistogramBuckets) obj;
     // Don't need to compare getTotalCount() because equivalent bucket counts
     // imply equivalent overall count.
-    return getBucketCounts().equals(other.getBucketCounts())
-        && this.getOffset() == other.getOffset()
-        && this.scale == other.scale;
+    // Additionally, we compare the "semantics" of bucket counts, that is
+    // it's ok for getOffset() to diverge as long as the populated counts remain
+    // the same. This is because we don't "normalize" buckets after doing
+    // difference/subtraction operations.
+    return this.scale == other.scale && sameBucketCounts(other);
+  }
+
+  /**
+   * Tests if two bucket counts are equivalent semantically.
+   *
+   * <p>Semantic equivalence means:
+   *
+   * <ul>
+   *   <li>All counts are stored between indexStart/indexEnd.
+   *   <li>Offset does NOT need to be the same
+   * </ul>
+   */
+  private boolean sameBucketCounts(DoubleExponentialHistogramBuckets other) {
+    int min = Math.min(this.counts.getIndexStart(), other.counts.getIndexStart());
+    int max = Math.max(this.counts.getIndexEnd(), other.counts.getIndexEnd());
+    for (int idx = min; idx <= max; idx++) {
+      if (this.counts.get(idx) != other.counts.get(idx)) {
+        return false;
+      }
+    }
+    return true;
   }
 
   @Override
   public int hashCode() {
     int hash = 1;
     hash *= 1000003;
-    hash ^= getOffset();
-    hash *= 1000003;
-    hash ^= getBucketCounts().hashCode();
-    hash *= 1000003;
+    // We need a new algorithm here that lines up w/ equals, so we only use non-zero counts.
+    for (int idx = this.counts.getIndexStart(); idx <= this.counts.getIndexEnd(); idx++) {
+      long count = this.counts.get(idx);
+      if (count != 0) {
+        hash ^= idx;
+        hash *= 1000003;
+        hash = (int) (hash ^ count);
+        hash *= 1000003;
      }
    }
     hash ^= scale;
     // Don't need to hash getTotalCount() because equivalent bucket
     // counts imply equivalent overall count.
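
The copy()/clear() pair and the relaxed equality above can be illustrated with a short sketch; constructor arguments and method names come from the hunks in this file, and the snippet assumes it runs inside the aggregator package where these members are visible.

DoubleExponentialHistogramBuckets buckets =
    new DoubleExponentialHistogramBuckets(
        20, 320, ExponentialCounterFactory.circularBufferCounter());
buckets.record(1.5);
DoubleExponentialHistogramBuckets snapshot = buckets.copy(); // deep copy via counterFactory.copy(counts)
buckets.clear(); // zeroes the counts but keeps the scale
// equals() now compares scale plus populated counts only, so two bucket sets with
// different offsets but identical non-zero counts are considered equal.
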
sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/ExponentialBucketStrategy.java

Lines changed: 36 additions & 0 deletions

@@ -0,0 +1,36 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.sdk.metrics.internal.aggregator;
+
+import io.opentelemetry.sdk.metrics.internal.state.ExponentialCounterFactory;
+
+/** The configuration for how to create exponential histogram buckets. */
+final class ExponentialBucketStrategy {
+  /** Starting scale of exponential buckets. */
+  private final int scale;
+  /** The maximum number of buckets that will be used for positive or negative recordings. */
+  private final int maxBuckets;
+  /** The mechanism of constructing and copying buckets. */
+  private final ExponentialCounterFactory counterFactory;
+
+  private ExponentialBucketStrategy(
+      int scale, int maxBuckets, ExponentialCounterFactory counterFactory) {
+    this.scale = scale;
+    this.maxBuckets = maxBuckets;
+    this.counterFactory = counterFactory;
+  }
+
+  /** Constructs fresh new buckets with default settings. */
+  DoubleExponentialHistogramBuckets newBuckets() {
+    return new DoubleExponentialHistogramBuckets(scale, maxBuckets, counterFactory);
+  }
+
+  /** Create a new strategy for generating Exponential Buckets. */
+  static ExponentialBucketStrategy newStrategy(
+      int scale, int maxBuckets, ExponentialCounterFactory counterFactory) {
+    return new ExponentialBucketStrategy(scale, maxBuckets, counterFactory);
+  }
+}
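
Usage-wise, the strategy is a small factory that each aggregator handle consults for its positive and negative bucket sets; a minimal sketch, again assuming the internal aggregator package:

ExponentialBucketStrategy strategy =
    ExponentialBucketStrategy.newStrategy(20, 320, ExponentialCounterFactory.circularBufferCounter());
DoubleExponentialHistogramBuckets positive = strategy.newBuckets(); // independent counter per call
DoubleExponentialHistogramBuckets negative = strategy.newBuckets();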
