-
Notifications
You must be signed in to change notification settings - Fork 784
Closed
Labels
defect: Suspected defect such as a bug or regression
Description
Observed behavior
Calling kv.WatchFiltered() in a test run with -race can trigger a data race report on the updates channel.
Expected behavior
-race shows no data races
Server and client version
Client: commit 4ec2f44 (current HEAD)
Server: v2.12.1
Host environment
I'm working on a darwin/arm64 system
Steps to reproduce
- Create the file jetstream/test/kv_race_test.go with this content:
jetstream/test/kv_race_test.go
package test
import (
"context"
"errors"
"net"
"sync/atomic"
"testing"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
)
// customConn wraps a net.Conn and routes every Read call through a
// caller-supplied hook, so reads on the connection can be intercepted.
type customConn struct {
	net.Conn
	// read is called by Read with the connection itself and the caller's buffer.
	read func(*customConn, []byte) (int, error)
}

// Read hands buf to the configured read hook and returns whatever the hook
// returns. All other net.Conn methods come from the embedded Conn.
func (cc *customConn) Read(buf []byte) (n int, err error) {
	n, err = cc.read(cc, buf)
	return n, err
}
// customDialer adapts a plain dial function to a type with a Dial method,
// so it can be handed to nats.SetCustomDialer.
type customDialer struct {
	// dial is called by Dial with the same network and address arguments.
	dial func(network, address string) (net.Conn, error)
}

// Dial forwards to the configured dial function and returns its result
// unchanged.
func (d *customDialer) Dial(network, address string) (net.Conn, error) {
	return d.dial(network, address)
}
func TestKeyValueWatchRaceOnCancelCtx(t *testing.T) {
ctx, cancel := context.WithCancel(t.Context())
defer cancel()
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)
// when this is true, reading a message will cancel the context
var cancelContextOnRead atomic.Bool
// dialer that wraps connections to inject our conn that cancels context on read
dialer := &customDialer{
dial: func(network, address string) (net.Conn, error) {
conn, err := net.Dial(network, address)
return &customConn{
Conn: conn,
read: func(c *customConn, b []byte) (int, error) {
n, readErr := c.Conn.Read(b)
if cancelContextOnRead.Load() {
go cancel()
}
return n, readErr
},
}, err
},
}
nc, err := nats.Connect(s.ClientURL(), nats.SetCustomDialer(dialer))
if err != nil {
t.Fatalf("connectingt: %v", err)
}
defer nc.Close()
js, err := jetstream.New(nc)
if err != nil {
t.Fatalf("creating js: %v", err)
}
kv, err := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{Bucket: t.Name()})
if err != nil {
t.Fatalf("creating kv: %v", err)
}
// the next read will cancel the context
cancelContextOnRead.Store(true)
_, err = kv.WatchFiltered(ctx, nil)
// We don't know if context will be canceled before or after Watch returns
if errors.Is(err, context.Canceled) {
err = nil
}
expectOk(t, err)
}-
Run
go test -modfile=go_test.mod ./jetstream/test -run TestKeyValueWatchRaceOnCancelCtx -race -count 1
- You may need to repeat this a few times for the race to be detected
-
Observe "DATA RACE" output:
output
==================
WARNING: DATA RACE
Write at 0x00c0000bc470 by goroutine 42:
runtime.recvDirect()
go/pkg/mod/golang.org/[email protected]/src/runtime/chan.go:405 +0x7c
github.com/nats-io/nats.go/jetstream.(*kvs).WatchFiltered.func2()
repos/nats-io/nats.go/jetstream/kv.go:1294 +0x40
github.com/nats-io/nats%2ego.(*Conn).waitForMsgs()
repos/nats-io/nats.go/nats.go:3354 +0x68c
github.com/nats-io/nats%2ego.(*Conn).subscribeLocked.gowrap1()
repos/nats-io/nats.go/nats.go:4600 +0x3c
Previous read at 0x00c0000bc470 by goroutine 7:
runtime.chansend1()
go/pkg/mod/golang.org/[email protected]/src/runtime/chan.go:162 +0x2c
github.com/nats-io/nats.go/jetstream.(*kvs).WatchFiltered()
repos/nats-io/nats.go/jetstream/kv.go:1304 +0x112c
github.com/nats-io/nats.go/jetstream/test.TestKeyValueWatchRaceOnCancelCtx()
repos/nats-io/nats.go/jetstream/test/kv_race_test.go:77 +0x430
testing.tRunner()
go/pkg/mod/golang.org/[email protected]/src/testing/testing.go:1934 +0x164
testing.(*T).Run.gowrap1()
go/pkg/mod/golang.org/[email protected]/src/testing/testing.go:1997 +0x3c
Goroutine 42 (running) created at:
github.com/nats-io/nats%2ego.(*Conn).subscribeLocked()
repos/nats-io/nats.go/nats.go:4600 +0x588
github.com/nats-io/nats%2ego.(*Conn).subscribe()
repos/nats-io/nats.go/nats.go:4535 +0xc8
github.com/nats-io/nats%2ego.(*js).subscribe()
repos/nats-io/nats.go/js.go:1927 +0x1860
github.com/nats-io/nats%2ego.(*js).Subscribe()
repos/nats-io/nats.go/js.go:1508 +0x74
github.com/nats-io/nats.go/jetstream.(*kvs).WatchFiltered()
repos/nats-io/nats.go/jetstream/kv.go:1285 +0xf10
github.com/nats-io/nats.go/jetstream/test.TestKeyValueWatchRaceOnCancelCtx()
repos/nats-io/nats.go/jetstream/test/kv_race_test.go:77 +0x430
testing.tRunner()
go/pkg/mod/golang.org/[email protected]/src/testing/testing.go:1934 +0x164
testing.(*T).Run.gowrap1()
go/pkg/mod/golang.org/[email protected]/src/testing/testing.go:1997 +0x3c
Goroutine 7 (running) created at:
testing.(*T).Run()
go/pkg/mod/golang.org/[email protected]/src/testing/testing.go:1997 +0x6e0
testing.runTests.func1()
go/pkg/mod/golang.org/[email protected]/src/testing/testing.go:2477 +0x74
testing.tRunner()
go/pkg/mod/golang.org/[email protected]/src/testing/testing.go:1934 +0x164
testing.runTests()
go/pkg/mod/golang.org/[email protected]/src/testing/testing.go:2475 +0x734
testing.(*M).Run()
go/pkg/mod/golang.org/[email protected]/src/testing/testing.go:2337 +0xaf4
main.main()
_testmain.go:339 +0x100
==================
--- FAIL: TestKeyValueWatchRaceOnCancelCtx (0.05s)
testing.go:1617: race detected during execution of test
FAIL
FAIL github.com/nats-io/nats.go/jetstream/test 0.389s
FAIL
Metadata
Metadata
Assignees
Labels
defect: Suspected defect such as a bug or regression