Skip to content

Commit 58cc5ad

Browse files
committed
memmetrics: simplify locking and solve data race
This package was updating counters concurrently because of incorrect locking. There's not really a reason to have two separate locks or try to optimize with RW locks. This change replaces the two RW locks with one exclusive lock. The RTMetrics struct methods always acquire this exclusive lock. This is simple and will be easier to keep correct as the code evolves.
1 parent 1826c8c commit 58cc5ad

File tree

2 files changed

+45
-67
lines changed

2 files changed

+45
-67
lines changed

memmetrics/roundtrip.go

Lines changed: 37 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,15 @@ import (
1515
// are a rolling window histograms with defined precision as well.
1616
// See RTOptions for more detail on parameters.
1717
type RTMetrics struct {
18-
total *RollingCounter
19-
netErrors *RollingCounter
20-
statusCodes map[int]*RollingCounter
21-
statusCodesLock sync.RWMutex
22-
histogram *RollingHDRHistogram
23-
histogramLock sync.RWMutex
24-
25-
newCounter NewCounterFn
26-
newHist NewRollingHistogramFn
27-
clock timetools.TimeProvider
18+
// lock protects all data members.
19+
lock sync.Mutex
20+
total *RollingCounter
21+
netErrors *RollingCounter
22+
statusCodes map[int]*RollingCounter
23+
histogram *RollingHDRHistogram
24+
newCounter NewCounterFn
25+
newHist NewRollingHistogramFn
26+
clock timetools.TimeProvider
2827
}
2928

3029
type rrOptSetter func(r *RTMetrics) error
@@ -65,8 +64,7 @@ func RTClock(clock timetools.TimeProvider) rrOptSetter {
6564
// NewRTMetrics returns new instance of metrics collector.
6665
func NewRTMetrics(settings ...rrOptSetter) (*RTMetrics, error) {
6766
m := &RTMetrics{
68-
statusCodes: make(map[int]*RollingCounter),
69-
statusCodesLock: sync.RWMutex{},
67+
statusCodes: make(map[int]*RollingCounter),
7068
}
7169
for _, s := range settings {
7270
if err := s(m); err != nil {
@@ -113,14 +111,10 @@ func NewRTMetrics(settings ...rrOptSetter) (*RTMetrics, error) {
113111

114112
// Export Returns a new RTMetrics which is a copy of the current one
115113
func (m *RTMetrics) Export() *RTMetrics {
116-
m.statusCodesLock.RLock()
117-
defer m.statusCodesLock.RUnlock()
118-
m.histogramLock.RLock()
119-
defer m.histogramLock.RUnlock()
114+
m.lock.Lock()
115+
defer m.lock.Unlock()
120116

121117
export := &RTMetrics{}
122-
export.statusCodesLock = sync.RWMutex{}
123-
export.histogramLock = sync.RWMutex{}
124118
export.total = m.total.Clone()
125119
export.netErrors = m.netErrors.Clone()
126120
exportStatusCodes := map[int]*RollingCounter{}
@@ -140,12 +134,16 @@ func (m *RTMetrics) Export() *RTMetrics {
140134

141135
// CounterWindowSize gets total windows size
142136
func (m *RTMetrics) CounterWindowSize() time.Duration {
137+
m.lock.Lock()
138+
defer m.lock.Unlock()
143139
return m.total.WindowSize()
144140
}
145141

146142
// NetworkErrorRatio calculates the amont of network errors such as time outs and dropped connection
147143
// that occurred in the given time window compared to the total requests count.
148144
func (m *RTMetrics) NetworkErrorRatio() float64 {
145+
m.lock.Lock()
146+
defer m.lock.Unlock()
149147
if m.total.Count() == 0 {
150148
return 0
151149
}
@@ -154,10 +152,10 @@ func (m *RTMetrics) NetworkErrorRatio() float64 {
154152

155153
// ResponseCodeRatio calculates ratio of count(startA to endA) / count(startB to endB)
156154
func (m *RTMetrics) ResponseCodeRatio(startA, endA, startB, endB int) float64 {
155+
m.lock.Lock()
156+
defer m.lock.Unlock()
157157
a := int64(0)
158158
b := int64(0)
159-
m.statusCodesLock.RLock()
160-
defer m.statusCodesLock.RUnlock()
161159
for code, v := range m.statusCodes {
162160
if code < endA && code >= startA {
163161
a += v.Count()
@@ -174,6 +172,9 @@ func (m *RTMetrics) ResponseCodeRatio(startA, endA, startB, endB int) float64 {
174172

175173
// Append append a metric
176174
func (m *RTMetrics) Append(other *RTMetrics) error {
175+
m.lock.Lock()
176+
defer m.lock.Unlock()
177+
177178
if m == other {
178179
return errors.New("RTMetrics cannot append to self")
179180
}
@@ -188,10 +189,6 @@ func (m *RTMetrics) Append(other *RTMetrics) error {
188189

189190
copied := other.Export()
190191

191-
m.statusCodesLock.Lock()
192-
defer m.statusCodesLock.Unlock()
193-
m.histogramLock.Lock()
194-
defer m.histogramLock.Unlock()
195192
for code, c := range copied.statusCodes {
196193
o, ok := m.statusCodes[code]
197194
if ok {
@@ -208,6 +205,9 @@ func (m *RTMetrics) Append(other *RTMetrics) error {
208205

209206
// Record records a metric
210207
func (m *RTMetrics) Record(code int, duration time.Duration) {
208+
m.lock.Lock()
209+
defer m.lock.Unlock()
210+
211211
m.total.Inc(1)
212212
if code == http.StatusGatewayTimeout || code == http.StatusBadGateway {
213213
m.netErrors.Inc(1)
@@ -218,19 +218,24 @@ func (m *RTMetrics) Record(code int, duration time.Duration) {
218218

219219
// TotalCount returns total count of processed requests collected.
220220
func (m *RTMetrics) TotalCount() int64 {
221+
m.lock.Lock()
222+
defer m.lock.Unlock()
221223
return m.total.Count()
222224
}
223225

224226
// NetworkErrorCount returns total count of processed requests observed
225227
func (m *RTMetrics) NetworkErrorCount() int64 {
228+
m.lock.Lock()
229+
defer m.lock.Unlock()
226230
return m.netErrors.Count()
227231
}
228232

229233
// StatusCodesCounts returns map with counts of the response codes
230234
func (m *RTMetrics) StatusCodesCounts() map[int]int64 {
235+
m.lock.Lock()
236+
defer m.lock.Unlock()
237+
231238
sc := make(map[int]int64)
232-
m.statusCodesLock.RLock()
233-
defer m.statusCodesLock.RUnlock()
234239
for k, v := range m.statusCodes {
235240
if v.Count() != 0 {
236241
sc[k] = v.Count()
@@ -241,40 +246,32 @@ func (m *RTMetrics) StatusCodesCounts() map[int]int64 {
241246

242247
// LatencyHistogram computes and returns resulting histogram with latencies observed.
243248
func (m *RTMetrics) LatencyHistogram() (*HDRHistogram, error) {
244-
m.histogramLock.Lock()
245-
defer m.histogramLock.Unlock()
249+
m.lock.Lock()
250+
defer m.lock.Unlock()
246251
return m.histogram.Merged()
247252
}
248253

249254
// Reset reset metrics
250255
func (m *RTMetrics) Reset() {
251-
m.statusCodesLock.Lock()
252-
defer m.statusCodesLock.Unlock()
253-
m.histogramLock.Lock()
254-
defer m.histogramLock.Unlock()
256+
m.lock.Lock()
257+
defer m.lock.Unlock()
255258
m.histogram.Reset()
256259
m.total.Reset()
257260
m.netErrors.Reset()
258261
m.statusCodes = make(map[int]*RollingCounter)
259262
}
260263

264+
// WARNING: Lock must be held before calling.
261265
func (m *RTMetrics) recordLatency(d time.Duration) error {
262-
m.histogramLock.Lock()
263-
defer m.histogramLock.Unlock()
264266
return m.histogram.RecordLatencies(d, 1)
265267
}
266268

269+
// WARNING: Lock must be held before calling.
267270
func (m *RTMetrics) recordStatusCode(statusCode int) error {
268-
m.statusCodesLock.Lock()
269271
if c, ok := m.statusCodes[statusCode]; ok {
270272
c.Inc(1)
271-
m.statusCodesLock.Unlock()
272273
return nil
273274
}
274-
m.statusCodesLock.Unlock()
275-
276-
m.statusCodesLock.Lock()
277-
defer m.statusCodesLock.Unlock()
278275

279276
// Check if another goroutine has written our counter already
280277
if c, ok := m.statusCodes[statusCode]; ok {

memmetrics/roundtrip_test.go

Lines changed: 8 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package memmetrics
22

33
import (
44
"runtime"
5-
"sync"
65
"testing"
76
"time"
87

@@ -75,28 +74,29 @@ func TestAppend(t *testing.T) {
7574
}
7675

7776
func TestConcurrentRecords(t *testing.T) {
78-
// This test asserts a race condition which requires parallelism
77+
// This test asserts a race condition which requires concurrency. Set
78+
// GOMAXPROCS high for this test, then restore after test completes.
79+
n := runtime.GOMAXPROCS(0)
7980
runtime.GOMAXPROCS(100)
81+
defer runtime.GOMAXPROCS(n)
8082

8183
rr, err := NewRTMetrics(RTClock(testutils.GetClock()))
8284
require.NoError(t, err)
8385

8486
for code := 0; code < 100; code++ {
8587
for numRecords := 0; numRecords < 10; numRecords++ {
8688
go func(statusCode int) {
87-
_ = rr.recordStatusCode(statusCode)
89+
rr.Record(statusCode, time.Second)
8890
}(code)
8991
}
9092
}
9193
}
9294

9395
func TestRTMetricExportReturnsNewCopy(t *testing.T) {
9496
a := RTMetrics{
95-
clock: &timetools.RealTime{},
96-
statusCodes: map[int]*RollingCounter{},
97-
statusCodesLock: sync.RWMutex{},
98-
histogram: &RollingHDRHistogram{},
99-
histogramLock: sync.RWMutex{},
97+
clock: &timetools.RealTime{},
98+
statusCodes: map[int]*RollingCounter{},
99+
histogram: &RollingHDRHistogram{},
100100
}
101101

102102
var err error
@@ -129,23 +129,4 @@ func TestRTMetricExportReturnsNewCopy(t *testing.T) {
129129
assert.NotNil(t, b.newCounter)
130130
assert.NotNil(t, b.newHist)
131131
assert.NotNil(t, b.clock)
132-
133-
// a and b should have different locks
134-
locksSucceed := make(chan bool)
135-
go func() {
136-
a.statusCodesLock.Lock()
137-
b.statusCodesLock.Lock()
138-
a.histogramLock.Lock()
139-
b.histogramLock.Lock()
140-
locksSucceed <- true
141-
}()
142-
143-
for {
144-
select {
145-
case <-locksSucceed:
146-
return
147-
case <-time.After(10 * time.Second):
148-
t.FailNow()
149-
}
150-
}
151132
}

0 commit comments

Comments
 (0)