Skip to content

Race on closing updates channel in WatchFiltered #1964

@WillAbides

Description

@WillAbides

Observed behavior

Calling kv.WatchFiltered() in a test run with -race can trigger a data race report on the updates channel.

Expected behavior

-race shows no data races

Server and client version

Client: commit 4ec2f44 (current HEAD)
Server: v2.12.1

Host environment

I'm working on a darwin/arm64 system

Steps to reproduce

  1. Create the file jetstream/test/kv_race_test.go with this content:
jetstream/test/kv_race_test.go
package test

import (
	"context"
	"errors"
	"net"
	"sync/atomic"
	"testing"

	"github.com/nats-io/nats.go"
	"github.com/nats-io/nats.go/jetstream"
)

// customConn is a net.Conn whose Read is delegated to a pluggable hook.
// Every other net.Conn method is promoted from the embedded Conn.
type customConn struct {
	net.Conn
	// read is called by Read with the receiver itself and the caller's buffer.
	read func(*customConn, []byte) (int, error)
}

// Read passes b to the read hook and returns the hook's result unchanged.
func (c *customConn) Read(b []byte) (int, error) {
	hook := c.read
	n, err := hook(c, b)
	return n, err
}

// customDialer adapts a plain dial function to a type with a Dial method, so
// it can be handed to nats.SetCustomDialer.
type customDialer struct {
	// dial is the function every Dial call is forwarded to.
	dial func(network, address string) (net.Conn, error)
}

// Dial forwards network and address to the configured dial function and
// returns its connection and error unchanged.
func (d *customDialer) Dial(network, address string) (net.Conn, error) {
	return d.dial(network, address)
}

// TestKeyValueWatchRaceOnCancelCtx tries to cancel the context while
// kv.WatchFiltered is still running, so that the race detector can observe
// concurrent access to the watcher's updates channel.
//
// The cancellation is driven from the network layer: once armed, every Read
// on the client connection spawns a goroutine that cancels the context.
func TestKeyValueWatchRaceOnCancelCtx(t *testing.T) {
	ctx, cancel := context.WithCancel(t.Context())
	defer cancel()

	s := RunBasicJetStreamServer()
	defer shutdownJSServerAndRemoveStorage(t, s)

	// when this is true, reading a message will cancel the context
	var cancelContextOnRead atomic.Bool

	// dialer that wraps connections to inject our conn that cancels context on read
	dialer := &customDialer{
		dial: func(network, address string) (net.Conn, error) {
			conn, err := net.Dial(network, address)
			if err != nil {
				// Do not hand back a wrapper around a nil Conn on failure.
				return nil, err
			}
			return &customConn{
				Conn: conn,
				read: func(c *customConn, b []byte) (int, error) {
					n, readErr := c.Conn.Read(b)
					if cancelContextOnRead.Load() {
						// Cancel asynchronously so the read itself is not delayed.
						go cancel()
					}
					return n, readErr
				},
			}, nil
		},
	}

	nc, err := nats.Connect(s.ClientURL(), nats.SetCustomDialer(dialer))
	if err != nil {
		t.Fatalf("connecting: %v", err)
	}
	defer nc.Close()

	js, err := jetstream.New(nc)
	if err != nil {
		t.Fatalf("creating js: %v", err)
	}

	kv, err := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{Bucket: t.Name()})
	if err != nil {
		t.Fatalf("creating kv: %v", err)
	}

	// the next read will cancel the context
	cancelContextOnRead.Store(true)

	_, err = kv.WatchFiltered(ctx, nil)

	// We don't know if context will be canceled before or after Watch returns
	if errors.Is(err, context.Canceled) {
		err = nil
	}
	expectOk(t, err)
}
  2. Run go test -modfile=go_test.mod ./jetstream/test -run TestKeyValueWatchRaceOnCancelCtx -race -count 1

    • You may need to repeat this a few times for the race to be detected
  3. Observe "DATA RACE" output:

output
==================
WARNING: DATA RACE
Write at 0x00c0000bc470 by goroutine 42:
  runtime.recvDirect()
      go/pkg/mod/golang.org/[email protected]/src/runtime/chan.go:405 +0x7c
  github.com/nats-io/nats.go/jetstream.(*kvs).WatchFiltered.func2()
      repos/nats-io/nats.go/jetstream/kv.go:1294 +0x40
  github.com/nats-io/nats%2ego.(*Conn).waitForMsgs()
      repos/nats-io/nats.go/nats.go:3354 +0x68c
  github.com/nats-io/nats%2ego.(*Conn).subscribeLocked.gowrap1()
      repos/nats-io/nats.go/nats.go:4600 +0x3c

Previous read at 0x00c0000bc470 by goroutine 7:
  runtime.chansend1()
      go/pkg/mod/golang.org/[email protected]/src/runtime/chan.go:162 +0x2c
  github.com/nats-io/nats.go/jetstream.(*kvs).WatchFiltered()
      repos/nats-io/nats.go/jetstream/kv.go:1304 +0x112c
  github.com/nats-io/nats.go/jetstream/test.TestKeyValueWatchRaceOnCancelCtx()
      repos/nats-io/nats.go/jetstream/test/kv_race_test.go:77 +0x430
  testing.tRunner()
      go/pkg/mod/golang.org/[email protected]/src/testing/testing.go:1934 +0x164
  testing.(*T).Run.gowrap1()
      go/pkg/mod/golang.org/[email protected]/src/testing/testing.go:1997 +0x3c

Goroutine 42 (running) created at:
  github.com/nats-io/nats%2ego.(*Conn).subscribeLocked()
      repos/nats-io/nats.go/nats.go:4600 +0x588
  github.com/nats-io/nats%2ego.(*Conn).subscribe()
      repos/nats-io/nats.go/nats.go:4535 +0xc8
  github.com/nats-io/nats%2ego.(*js).subscribe()
      repos/nats-io/nats.go/js.go:1927 +0x1860
  github.com/nats-io/nats%2ego.(*js).Subscribe()
      repos/nats-io/nats.go/js.go:1508 +0x74
  github.com/nats-io/nats.go/jetstream.(*kvs).WatchFiltered()
      repos/nats-io/nats.go/jetstream/kv.go:1285 +0xf10
  github.com/nats-io/nats.go/jetstream/test.TestKeyValueWatchRaceOnCancelCtx()
      repos/nats-io/nats.go/jetstream/test/kv_race_test.go:77 +0x430
  testing.tRunner()
      go/pkg/mod/golang.org/[email protected]/src/testing/testing.go:1934 +0x164
  testing.(*T).Run.gowrap1()
      go/pkg/mod/golang.org/[email protected]/src/testing/testing.go:1997 +0x3c

Goroutine 7 (running) created at:
  testing.(*T).Run()
      go/pkg/mod/golang.org/[email protected]/src/testing/testing.go:1997 +0x6e0
  testing.runTests.func1()
      go/pkg/mod/golang.org/[email protected]/src/testing/testing.go:2477 +0x74
  testing.tRunner()
      go/pkg/mod/golang.org/[email protected]/src/testing/testing.go:1934 +0x164
  testing.runTests()
      go/pkg/mod/golang.org/[email protected]/src/testing/testing.go:2475 +0x734
  testing.(*M).Run()
      go/pkg/mod/golang.org/[email protected]/src/testing/testing.go:2337 +0xaf4
  main.main()
      _testmain.go:339 +0x100
==================
--- FAIL: TestKeyValueWatchRaceOnCancelCtx (0.05s)
    testing.go:1617: race detected during execution of test
FAIL
FAIL    github.com/nats-io/nats.go/jetstream/test       0.389s
FAIL

Metadata

Metadata

Assignees

Labels

defect: Suspected defect such as a bug or regression

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions