24 changes: 23 additions & 1 deletion cache/cache.go
@@ -64,6 +64,22 @@ func New(client *clientv3.Client, prefix string, opts ...Option) (*Cache, error)

internalCtx, cancel := context.WithCancel(context.Background())

// Auto-calculate internal watcher buffer size if not explicitly set.
// The internal store watcher is critical infrastructure and should have
// sufficient buffer to handle burst traffic without lagging.
Member:

It should, but that's a performance concern; the cache should work correctly no matter how big its buffer is. Correct here doesn't mean efficient, just that it should not fail tests.

Tests should validate correctness, not performance, which is why I'm surprised the cache code needs to change and not just the test code.

internalBufferSize := cfg.InternalWatcherBufferSize
if internalBufferSize == 0 {
// Use the larger of: 4x client buffer or 1/4 of history window
internalBufferSize = cfg.PerWatcherBufferSize * 4
if historyQuarter := cfg.HistoryWindowSize / 4; historyQuarter > internalBufferSize {
internalBufferSize = historyQuarter
}
// Ensure minimum buffer size of 10 for reasonable throughput
if internalBufferSize < 10 {
internalBufferSize = 10
}
}

cache := &Cache{
prefix: prefix,
cfg: cfg,
@@ -74,6 +90,8 @@ func New(client *clientv3.Client, prefix string, opts ...Option) (*Cache, error)
stop: cancel,
internalCtx: internalCtx,
}
// Store the calculated buffer size back to config for use in watch()
cache.cfg.InternalWatcherBufferSize = internalBufferSize

cache.demux = NewDemux(internalCtx, &cache.waitGroup, cfg.HistoryWindowSize, cfg.ResyncInterval)

@@ -241,7 +259,7 @@ func (c *Cache) get(ctx context.Context) (*clientv3.GetResponse, error) {
func (c *Cache) watch(rev int64) error {
readyOnce := sync.Once{}
for {
storeW := newWatcher(c.cfg.PerWatcherBufferSize, nil)
storeW := newInternalWatcher(c.cfg.InternalWatcherBufferSize, nil)
c.demux.Register(storeW, rev)
applyErr := make(chan error, 1)
c.waitGroup.Add(1)
@@ -269,6 +287,10 @@ func (c *Cache) applyStorage(storeW *watcher) error {
return nil
case resp, ok := <-storeW.respCh:
if !ok {
// Channel closed. Check if there's an error response.
if storeW.cancelResp != nil {
return fmt.Errorf("%s", storeW.cancelResp.CancelReason)
}
return nil
}
if err := c.store.Apply(resp); err != nil {
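For reference, the fallback above reduces to a small pure function. This is only a sketch restating the logic in New; defaultInternalBufferSize is a hypothetical helper and not part of the change.

```go
// defaultInternalBufferSize mirrors the fallback in New: when the configured
// internal buffer size is 0, take the larger of 4x the per-watcher buffer and
// a quarter of the history window, with a floor of 10.
func defaultInternalBufferSize(perWatcherBufferSize, historyWindowSize int) int {
	size := perWatcherBufferSize * 4
	if quarter := historyWindowSize / 4; quarter > size {
		size = quarter
	}
	if size < 10 {
		size = 10
	}
	return size
}
```

With the defaults in config.go below (PerWatcherBufferSize 10, HistoryWindowSize 2048) this works out to 512.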
25 changes: 17 additions & 8 deletions cache/config.go
@@ -17,9 +17,13 @@ package cache
import "time"

type Config struct {
// PerWatcherBufferSize caps each watchers buffered channel.
// PerWatcherBufferSize caps each watcher's buffered channel.
// Bigger values tolerate brief client slow-downs at the cost of extra memory.
PerWatcherBufferSize int
// InternalWatcherBufferSize caps the internal store watcher's buffered channel.
// This should be sized for system throughput to prevent the store watcher from lagging.
// If 0, defaults to max(PerWatcherBufferSize * 4, HistoryWindowSize / 4).
InternalWatcherBufferSize int
// HistoryWindowSize is the max events kept in memory for replay.
// It defines how far back the cache can replay events to lagging watchers
HistoryWindowSize int
@@ -38,13 +42,14 @@ type Config struct {
// TODO: tune via performance/load tests.
func defaultConfig() Config {
return Config{
PerWatcherBufferSize: 10,
HistoryWindowSize: 2048,
ResyncInterval: 50 * time.Millisecond,
InitialBackoff: 50 * time.Millisecond,
MaxBackoff: 2 * time.Second,
GetTimeout: 5 * time.Second,
BTreeDegree: 32,
PerWatcherBufferSize: 10,
InternalWatcherBufferSize: 0, // Auto-calculated based on HistoryWindowSize
HistoryWindowSize: 2048,
ResyncInterval: 50 * time.Millisecond,
InitialBackoff: 50 * time.Millisecond,
MaxBackoff: 2 * time.Second,
GetTimeout: 5 * time.Second,
BTreeDegree: 32,
}
}

@@ -77,3 +82,7 @@ func WithGetTimeout(d time.Duration) Option {
func WithBTreeDegree(n int) Option {
return func(c *Config) { c.BTreeDegree = n }
}

func WithInternalWatcherBufferSize(n int) Option {
return func(c *Config) { c.InternalWatcherBufferSize = n }
}
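A minimal usage sketch of the new option follows; the etcd client setup is illustrative, and the cache import path is an assumption rather than something stated in this PR.

```go
package main

import (
	"log"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"

	"go.etcd.io/etcd/cache" // assumed import path for this package
)

func main() {
	client, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"localhost:2379"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// Explicitly size the internal store watcher buffer instead of relying on
	// the auto-calculated default of max(PerWatcherBufferSize*4, HistoryWindowSize/4, 10).
	c, err := cache.New(client, "/registry/", cache.WithInternalWatcherBufferSize(1024))
	if err != nil {
		log.Fatal(err)
	}
	_ = c
}
```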
53 changes: 46 additions & 7 deletions cache/demux.go
@@ -76,22 +76,39 @@ func (d *demux) Register(w *watcher, startingRev int64) {
d.mu.Lock()
defer d.mu.Unlock()

// Internal watchers (store watcher) drive the demux state.
// If registering an internal watcher with a revision behind maxRev,
// it means we're restarting after a failure - purge and reset.
if w.isInternal && d.maxRev > 0 && startingRev <= d.maxRev {
d.purge()
d.minRev = startingRev
}

if d.maxRev == 0 {
if startingRev == 0 {
// Internal watchers should always be active, never lagging.
if w.isInternal {
d.activeWatchers[w] = startingRev
} else if startingRev == 0 {
d.activeWatchers[w] = 0
} else {
d.laggingWatchers[w] = startingRev
}
return
}

// Special case: 0 means newest.
// Special case: 0 means "newest".
if startingRev == 0 {
startingRev = d.maxRev + 1
}

if startingRev <= d.maxRev {
d.laggingWatchers[w] = startingRev
if w.isInternal {
// Internal watchers should always be active. If the purge above did not
// trigger and we still reach this point, register it as active anyway.
d.activeWatchers[w] = startingRev
} else {
d.laggingWatchers[w] = startingRev
}
} else {
d.activeWatchers[w] = startingRev
}
@@ -216,8 +233,15 @@ func (d *demux) broadcastEventsLocked(events []*clientv3.Event) {

for w, nextRev := range d.activeWatchers {
if nextRev != 0 && firstRev > nextRev {
d.laggingWatchers[w] = nextRev
delete(d.activeWatchers, w)
if w.isInternal {
// Internal watchers (like store watcher) should never be resynced.
// If they lag, stop them with error to trigger watch restart.
w.StopWithError("cache: internal store watcher lagged behind")
delete(d.activeWatchers, w)
} else {
d.laggingWatchers[w] = nextRev
delete(d.activeWatchers, w)
}
continue
}

@@ -236,8 +260,15 @@ func (d *demux) broadcastEventsLocked(events []*clientv3.Event) {
if !w.enqueueResponse(clientv3.WatchResponse{
Events: events[sendStart:],
}) { // overflow → lagging
d.laggingWatchers[w] = nextRev
delete(d.activeWatchers, w)
if w.isInternal {
// Internal watchers (like store watcher) should never be resynced.
// If they lag, stop them with error to trigger watch restart.
w.StopWithError("cache: internal store watcher buffer overflow")
delete(d.activeWatchers, w)
} else {
d.laggingWatchers[w] = nextRev
delete(d.activeWatchers, w)
}
} else {
d.activeWatchers[w] = lastRev + 1
}
@@ -282,6 +313,14 @@ func (d *demux) resyncLaggingWatchers() {
}

for w, nextRev := range d.laggingWatchers {
// Internal watchers should never be resynced (defensive check).
// They should have been stopped in broadcastEventsLocked.
if w.isInternal {
w.StopWithError("cache: internal watcher should not be in lagging watchers")
delete(d.laggingWatchers, w)
continue
}

if nextRev < d.minRev {
w.Compact(nextRev)
delete(d.laggingWatchers, w)
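Both overflow branches in broadcastEventsLocked split on isInternal in the same way. One possible consolidation is sketched below with a hypothetical stopOrDemote helper; it is not part of this change, and its behavior is identical to the two inline branches.

```go
// stopOrDemote handles a watcher that can no longer keep up with broadcast:
// internal watchers are stopped with an error so the watch loop restarts,
// while external watchers are demoted to the lagging set for later resync.
func (d *demux) stopOrDemote(w *watcher, nextRev int64, reason string) {
	if w.isInternal {
		w.StopWithError(reason)
	} else {
		d.laggingWatchers[w] = nextRev
	}
	delete(d.activeWatchers, w)
}
```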
29 changes: 26 additions & 3 deletions cache/watcher.go
@@ -21,18 +21,28 @@ import (
clientv3 "go.etcd.io/etcd/client/v3"
)

// watcher holds one clients buffered stream of events.
// watcher holds one client's buffered stream of events.
type watcher struct {
respCh chan clientv3.WatchResponse
cancelResp *clientv3.WatchResponse
keyPred KeyPredicate
isInternal bool // if true, this is the internal store watcher and should never be resynced
Member:

Don't understand the idea of not resyncing internal watchers.

stopOnce sync.Once
}

func newWatcher(bufSize int, pred KeyPredicate) *watcher {
return &watcher{
respCh: make(chan clientv3.WatchResponse, bufSize),
keyPred: pred,
respCh: make(chan clientv3.WatchResponse, bufSize),
keyPred: pred,
isInternal: false,
}
}

func newInternalWatcher(bufSize int, pred KeyPredicate) *watcher {
Member:

Don't think we need a separate function to hardcode a bool argument.

return &watcher{
respCh: make(chan clientv3.WatchResponse, bufSize),
keyPred: pred,
isInternal: true,
}
}
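Following the review comment above, the flag could instead be an argument to a single constructor rather than a second function. A sketch of that alternative, which is not what this diff does and would require updating the existing newWatcher call sites:

```go
// newWatcher constructs a watcher; internal marks the demux-driving store
// watcher, which is stopped with an error instead of being resynced when it lags.
func newWatcher(bufSize int, pred KeyPredicate, internal bool) *watcher {
	return &watcher{
		respCh:     make(chan clientv3.WatchResponse, bufSize),
		keyPred:    pred,
		isInternal: internal,
	}
}
```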

@@ -77,3 +87,16 @@ func (w *watcher) Stop() {
close(w.respCh)
})
}

// StopWithError closes the event channel and sets an error response.
// This is used when the internal store watcher lags and needs to trigger a watch restart.
func (w *watcher) StopWithError(reason string) {
resp := &clientv3.WatchResponse{
Canceled: true,
CancelReason: reason,
}
w.stopOnce.Do(func() {
w.cancelResp = resp
close(w.respCh)
})
}