Skip to content

Commit 9168cd6

Browse files
Merge branch 'master' into crawl-rule
2 parents be32a2f + bab1fa3 commit 9168cd6

File tree

9 files changed

+77
-45
lines changed

9 files changed

+77
-45
lines changed

go.mod

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@ require (
88
github.com/gorilla/mux v1.8.1
99
github.com/prometheus/client_golang v1.22.0
1010
github.com/stretchr/testify v1.10.0
11-
go.etcd.io/etcd/api/v3 v3.6.0
12-
go.etcd.io/etcd/client/v3 v3.6.0
11+
go.etcd.io/etcd/api/v3 v3.6.1
12+
go.etcd.io/etcd/client/v3 v3.6.1
1313
go.uber.org/zap v1.27.0
1414
golang.org/x/net v0.41.0
1515
)
@@ -28,7 +28,7 @@ require (
2828
github.com/prometheus/client_model v0.6.1 // indirect
2929
github.com/prometheus/common v0.62.0 // indirect
3030
github.com/prometheus/procfs v0.15.1 // indirect
31-
go.etcd.io/etcd/client/pkg/v3 v3.6.0 // indirect
31+
go.etcd.io/etcd/client/pkg/v3 v3.6.1 // indirect
3232
go.uber.org/multierr v1.11.0 // indirect
3333
golang.org/x/sys v0.33.0 // indirect
3434
golang.org/x/text v0.26.0 // indirect

go.sum

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -53,12 +53,12 @@ github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOf
5353
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
5454
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
5555
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
56-
go.etcd.io/etcd/api/v3 v3.6.0 h1:vdbkcUBGLf1vfopoGE/uS3Nv0KPyIpUV/HM6w9yx2kM=
57-
go.etcd.io/etcd/api/v3 v3.6.0/go.mod h1:Wt5yZqEmxgTNJGHob7mTVBJDZNXiHPtXTcPab37iFOw=
58-
go.etcd.io/etcd/client/pkg/v3 v3.6.0 h1:nchnPqpuxvv3UuGGHaz0DQKYi5EIW5wOYsgUNRc365k=
59-
go.etcd.io/etcd/client/pkg/v3 v3.6.0/go.mod h1:Jv5SFWMnGvIBn8o3OaBq/PnT0jjsX8iNokAUessNjoA=
60-
go.etcd.io/etcd/client/v3 v3.6.0 h1:/yjKzD+HW5v/3DVj9tpwFxzNbu8hjcKID183ug9duWk=
61-
go.etcd.io/etcd/client/v3 v3.6.0/go.mod h1:Jzk/Knqe06pkOZPHXsQ0+vNDvMQrgIqJ0W8DwPdMJMg=
56+
go.etcd.io/etcd/api/v3 v3.6.1 h1:yJ9WlDih9HT457QPuHt/TH/XtsdN2tubyxyQHSHPsEo=
57+
go.etcd.io/etcd/api/v3 v3.6.1/go.mod h1:lnfuqoGsXMlZdTJlact3IB56o3bWp1DIlXPIGKRArto=
58+
go.etcd.io/etcd/client/pkg/v3 v3.6.1 h1:CxDVv8ggphmamrXM4Of8aCC8QHzDM4tGcVr9p2BSoGk=
59+
go.etcd.io/etcd/client/pkg/v3 v3.6.1/go.mod h1:aTkCp+6ixcVTZmrJGa7/Mc5nMNs59PEgBbq+HCmWyMc=
60+
go.etcd.io/etcd/client/v3 v3.6.1 h1:KelkcizJGsskUXlsxjVrSmINvMMga0VWwFF0tSPGEP0=
61+
go.etcd.io/etcd/client/v3 v3.6.1/go.mod h1:fCbPUdjWNLfx1A6ATo9syUmFVxqHH9bCnPLBZmnLmMY=
6262
go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA=
6363
go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A=
6464
go.opentelemetry.io/otel v1.34.0 h1:zRLXxLCgL1WyKsPVrgbSdMN4c0FMkDAskSTQP+0hdUY=

rules/engine.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -116,10 +116,10 @@ func newV3Engine(logger *zap.Logger, cl *v3.Client, options ...EngineOption) v3E
116116
}
117117
var baseEtcdLocker lock.RuleLocker
118118
if opts.dontUseSharedLockSession {
119-
baseEtcdLocker = lock.NewV3Locker(cl, opts.lockAcquisitionTimeout, opts.useTryLock)
119+
baseEtcdLocker = lock.NewV3Locker(cl, opts.lockAcquisitionTimeout, opts.useTryLock, logger)
120120
} else {
121121
sessionManager := concurrency.NewSessionManager(cl, logger)
122-
baseEtcdLocker = lock.NewSessionLocker(sessionManager.GetSession, opts.lockAcquisitionTimeout, false, opts.useTryLock)
122+
baseEtcdLocker = lock.NewSessionLocker(sessionManager.GetSession, opts.lockAcquisitionTimeout, false, opts.useTryLock, logger)
123123
}
124124
metricsEtcdLocker := lock.WithMetrics(baseEtcdLocker, "etcd")
125125
var baseLocker lock.RuleLocker

rules/engine_lock_test.go

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,9 @@ func Test_ReentrantLocking(t *testing.T) {
3939
var rlckr lock.RuleLocker
4040
if useShared {
4141
sessionManager := concurrency.NewSessionManager(cl, logger)
42-
rlckr = lock.NewSessionLocker(sessionManager.GetSession, 5, closeSession, useTryLock)
42+
rlckr = lock.NewSessionLocker(sessionManager.GetSession, 5, closeSession, useTryLock, logger)
4343
} else {
44-
baseEtcdLocker := lock.NewV3Locker(cl, 5, useTryLock)
44+
baseEtcdLocker := lock.NewV3Locker(cl, 5, useTryLock, logger)
4545
baseMapLocker := lock.NewMapLocker()
4646
rlckr = lock.NewNestedLocker(baseMapLocker, baseEtcdLocker)
4747
}
@@ -54,11 +54,7 @@ func Test_ReentrantLocking(t *testing.T) {
5454
} else {
5555
assert.Error(t, err2)
5656
}
57-
if closeSession {
58-
assert.Error(t, rlck.Unlock())
59-
} else {
60-
assert.NoError(t, rlck.Unlock())
61-
}
57+
assert.NoError(t, rlck.Unlock())
6258
})
6359
}
6460
}

rules/etcd.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ type etcdV3ReadAPI struct {
1616
kV v3.KV
1717
}
1818

19+
var ErrWatcherClosing = errors.New("Watcher closing")
20+
1921
// This method is currently not used but is being kept around to limit
2022
// the blast radius of implementing batch gets for rule evaluations.
2123
// The arrangement of interfaces is not ideal and should be addressed
@@ -142,7 +144,7 @@ func (ev3kw *etcdV3KeyWatcher) next() (string, *string, error) {
142144
ev3kw.reset()
143145
err := ev3kw.w.Close()
144146
if err == nil {
145-
err = errors.New("Watcher closing")
147+
err = ErrWatcherClosing
146148
}
147149
return "", nil, err
148150
case wr, stillOpen := <-ev3kw.ch:

rules/lock/lock.go

Lines changed: 39 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@ package lock
22

33
import (
44
"errors"
5+
"strings"
56
"time"
67

8+
"go.uber.org/zap"
79
"golang.org/x/net/context"
810

911
v3 "go.etcd.io/etcd/client/v3"
@@ -21,28 +23,29 @@ type RuleLock interface {
2123
type GetSession func(context.Context) (*v3c.Session, error)
2224

2325
// NewV3Locker creates a locker backed by etcd V3.
24-
func NewV3Locker(cl *v3.Client, lockTimeout int, useTryLock bool) RuleLocker {
26+
func NewV3Locker(cl *v3.Client, lockTimeout int, useTryLock bool, logger *zap.Logger) RuleLocker {
2527
// The TTL is for the lease associated with the session, in seconds. While the session is still open,
2628
// the lease's TTL will keep getting renewed to keep it from expiring, so all this really does is
2729
// set the amount of time it takes for the lease to expire if the lease stops being renewed due
2830
// to the application shutting down before a session could be properly closed.
2931
newSession := func(_ context.Context) (*v3c.Session, error) {
3032
return v3c.NewSession(cl, v3c.WithTTL(30))
3133
}
32-
return NewSessionLocker(newSession, lockTimeout, true, useTryLock)
34+
return NewSessionLocker(newSession, lockTimeout, true, useTryLock, logger)
3335
}
3436

3537
// NewSessionLocker creates a new locker with the provided session constructor. Note that
3638
// if closeSession is false, it means that the session provided by getSession will not be
3739
// closed but instead be reused. In that case the locker must be protected by another locker
3840
// (for instance an in-memory locker) because locks within the same session are reentrant
3941
// which means that two goroutines can obtain the same lock.
40-
func NewSessionLocker(getSession GetSession, lockTimeout int, closeSession, useTryLock bool) RuleLocker {
42+
func NewSessionLocker(getSession GetSession, lockTimeout int, closeSession, useTryLock bool, logger *zap.Logger) RuleLocker {
4143
return &v3Locker{
4244
lockTimeout: lockTimeout,
4345
newSession: getSession,
4446
closeSession: closeSession,
4547
useTryLock: useTryLock,
48+
logger: logger,
4649
}
4750
}
4851

@@ -51,12 +54,13 @@ type v3Locker struct {
5154
newSession GetSession
5255
closeSession bool
5356
useTryLock bool
57+
logger *zap.Logger
5458
}
5559

5660
func (v3l *v3Locker) Lock(key string, options ...Option) (RuleLock, error) {
57-
return v3l.lockWithTimeout(key, v3l.lockTimeout)
61+
return v3l.lockWithTimeout(key, v3l.lockTimeout, v3l.logger)
5862
}
59-
func (v3l *v3Locker) lockWithTimeout(key string, timeout int) (RuleLock, error) {
63+
func (v3l *v3Locker) lockWithTimeout(key string, timeout int, logger *zap.Logger) (RuleLock, error) {
6064
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeout)*time.Second)
6165
defer cancel()
6266
s, err := v3l.newSession(ctx)
@@ -73,7 +77,8 @@ func (v3l *v3Locker) lockWithTimeout(key string, timeout int) (RuleLock, error)
7377
return nil, err
7478
}
7579
lock := &v3Lock{
76-
mutex: m,
80+
mutex: m,
81+
logger: logger,
7782
}
7883
if v3l.closeSession {
7984
lock.session = s
@@ -84,24 +89,45 @@ func (v3l *v3Locker) lockWithTimeout(key string, timeout int) (RuleLock, error)
8489
type v3Lock struct {
8590
mutex *v3c.Mutex
8691
session *v3c.Session
92+
logger *zap.Logger
8793
}
8894

8995
// ErrNilMutex indicates that the lock has a nil mutex
9096
var ErrNilMutex = errors.New("mutex is nil")
9197

98+
// Max number of unlock retries
99+
var unlockMaxRetries = 5
100+
101+
// This should be given every chance to complete, otherwise
102+
// a lock could prevent future interactions with a resource.
92103
func (v3l *v3Lock) Unlock(_ ...Option) error {
93104
if v3l.mutex != nil {
94-
// This should be given every chance to complete, otherwise
95-
// a lock could prevent future interactions with a resource.
96-
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
97-
defer cancel()
98-
err := v3l.mutex.Unlock(ctx)
105+
var err error
106+
for i := range unlockMaxRetries {
107+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
108+
err = v3l.mutex.Unlock(ctx)
109+
cancel()
110+
if err != nil {
111+
v3l.logger.Warn("Unlock error", zap.String("key", v3l.mutex.Key()), zap.Int("attempt", i+1), zap.Error(err))
112+
time.Sleep(time.Minute)
113+
} else {
114+
break
115+
}
116+
}
117+
if err != nil {
118+
err = errors.Join(errors.New("Unlock:"), err)
119+
}
120+
99121
// If the lock failed to be released, at least closing the session
100122
// will allow the lease it is associated with to expire.
101123
if v3l.session != nil {
102124
serr := v3l.session.Close()
103-
if err == nil {
104-
err = serr
125+
// The Unlock will close the session in some cases
126+
if serr != nil && strings.Contains(serr.Error(), "lease not found") {
127+
serr = nil
128+
}
129+
if err == nil && serr != nil {
130+
err = errors.Join(errors.New("Close:"), serr)
105131
}
106132
}
107133
return err

rules/lock/lock_test.go

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,14 @@ import (
77
"github.com/stretchr/testify/require"
88
v3 "go.etcd.io/etcd/client/v3"
99
v3c "go.etcd.io/etcd/client/v3/concurrency"
10+
"go.uber.org/zap"
1011
"golang.org/x/net/context"
1112

1213
"github.com/IBM-Cloud/go-etcd-rules/rules/teststore"
1314
)
1415

1516
func Test_V3Locker(t *testing.T) {
17+
logger, _ := zap.NewDevelopment()
1618
cfg, cl := teststore.InitV3Etcd(t)
1719
c, err := v3.New(cfg)
1820
require.NoError(t, err)
@@ -31,29 +33,31 @@ func Test_V3Locker(t *testing.T) {
3133
rlckr := v3Locker{
3234
newSession: newSession,
3335
lockTimeout: 5,
36+
logger: logger,
3437
}
35-
rlck, err1 := rlckr.Lock("test")
36-
assert.NoError(t, err1)
37-
_, err2 := rlckr.lockWithTimeout("test", 10)
38-
assert.Error(t, err2)
39-
assert.NoError(t, rlck.Unlock())
38+
rlck, err := rlckr.Lock("test")
39+
assert.NoError(t, err, err)
40+
_, err = rlckr.lockWithTimeout("test", 10, logger)
41+
assert.Error(t, err, err)
42+
err = rlck.Unlock()
43+
assert.NoError(t, err, err)
4044

4145
done1 := make(chan bool)
4246
done2 := make(chan bool)
43-
4447
go func() {
45-
lckr := NewV3Locker(c, 5, useTryLock)
46-
lck, lErr := lckr.Lock("test1")
47-
assert.NoError(t, lErr)
48+
lckr := NewV3Locker(c, 5, useTryLock, logger)
49+
lck, err := lckr.Lock("test1")
50+
assert.NoError(t, err, err)
4851
done1 <- true
4952
<-done2
5053
if lck != nil {
51-
assert.NoError(t, lck.Unlock())
54+
err = lck.Unlock()
55+
assert.NoError(t, err, err)
5256
}
5357
}()
5458
<-done1
5559
_, err = rlckr.Lock("test1")
56-
assert.Error(t, err)
60+
assert.Error(t, err, err)
5761
done2 <- true
5862
})
5963
}

rules/lock/metrics.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,9 @@ func (ml metricLocker) Lock(key string, options ...Option) (RuleLock, error) {
3434
}
3535

3636
func (ml metricLocker) Unlock(options ...Option) error {
37-
opts := buildOptions(options...)
3837
err := ml.RuleLock.Unlock(options...)
3938
if err != nil {
39+
opts := buildOptions(options...)
4040
ml.observeUnlockError(ml.lockerName, opts.method, opts.pattern)
4141
}
4242
return err

rules/watcher.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package rules
22

33
import (
4+
"errors"
45
"strings"
56
"time"
67

@@ -54,11 +55,14 @@ func (w *watcher) isStopped() bool {
5455
func (w *watcher) singleRun() {
5556
key, value, err := w.kw.next()
5657
if err != nil {
57-
w.logger.Error("Watcher error", zap.Error(err))
5858
if strings.Contains(err.Error(), "connection refused") {
5959
w.logger.Info("Cluster unavailable; waiting one minute to retry")
6060
time.Sleep(time.Minute)
6161
} else {
62+
// Watchers are always closed periodically, so there is no need to log that
63+
if !errors.Is(err, ErrWatcherClosing) {
64+
w.logger.Error("Watcher error", zap.Error(err))
65+
}
6266
// Maximum logging rate is 1 per second.
6367
time.Sleep(time.Second)
6468
}

0 commit comments

Comments
 (0)