diff --git a/internal/reaper/reaper.go b/internal/reaper/reaper.go index 0c187c400a..af05a4fca3 100644 --- a/internal/reaper/reaper.go +++ b/internal/reaper/reaper.go @@ -9,6 +9,8 @@ import ( "strings" "time" + "sync" + "sync/atomic" "github.com/cybertec-postgresql/pgwatch/v5/internal/cmdopts" @@ -37,6 +39,7 @@ type Reaper struct { measurementCh chan metrics.MeasurementEnvelope measurementCache *InstanceMetricCache logger log.Logger + mu sync.RWMutex monitoredSources sources.SourceConns prevLoopMonitoredDBs sources.SourceConns cancelFuncs map[string]context.CancelFunc @@ -97,7 +100,11 @@ func (r *Reaper) Reap(ctx context.Context) { // UpdateMonitoredDBCache(r.monitoredSources) hostsToShutDownDueToRoleChange := make(map[string]bool) // hosts went from master to standby and have "only if master" set - for _, monitoredSource := range r.monitoredSources { + r.mu.RLock() + srcs := slices.Clone(r.monitoredSources) + r.mu.RUnlock() + + for _, monitoredSource := range srcs { srcL := logger.WithField("source", monitoredSource.Name) ctx = log.WithLogger(ctx, srcL) @@ -418,7 +425,9 @@ func (r *Reaper) LoadSources(ctx context.Context) (err error) { r.logger.WithField("source", md.Name).Info("Source configs changed, restarting all gatherers...") r.ShutdownOldWorkers(ctx, map[string]bool{md.Name: true}) } + r.mu.Lock() r.monitoredSources = newSrcs + r.mu.Unlock() r.logger.WithField("sources", len(r.monitoredSources)).Info("sources refreshed") return nil } @@ -427,9 +436,12 @@ func (r *Reaper) LoadSources(ctx context.Context) (err error) { // every monitoredDbsDatastoreSyncIntervalSeconds (default 10min) func (r *Reaper) WriteMonitoredSources(ctx context.Context) { for { - if len(r.monitoredSources) > 0 { + r.mu.RLock() + srcs := slices.Clone(r.monitoredSources) + r.mu.RUnlock() + if len(srcs) > 0 { now := time.Now().UnixNano() - for _, mdb := range r.monitoredSources { + for _, mdb := range srcs { db := metrics.NewMeasurement(now) db["tag_group"] = mdb.Group db["master_only"] = mdb.OnlyIfMaster diff --git a/internal/webserver/wslog_test.go b/internal/webserver/wslog_test.go index dcc1f5ddf3..722b3bba96 100644 --- a/internal/webserver/wslog_test.go +++ b/internal/webserver/wslog_test.go @@ -43,12 +43,22 @@ func TestServeWsLog_Success(t *testing.T) { assert.NoError(t, ws.WriteMessage(websocket.PingMessage, nil)) // send some log message - time.Sleep(100 * time.Millisecond) ts.Info("Test message") - // check output though the websocket - assert.NoError(t, ws.SetReadDeadline(time.Now().Add(2*time.Second))) - msgType, msg, err := ws.ReadMessage() - assert.NoError(t, err) + var msg []byte + var msgType int + + require.Eventually(t, func() bool { + if err := ws.SetReadDeadline(time.Now().Add(200 * time.Millisecond)); err != nil { + return false + } + tpe, m, err := ws.ReadMessage() + if err != nil { + return false + } + msgType = tpe + msg = m + return strings.Contains(string(msg), "Test message") + }, 2*time.Second, 50*time.Millisecond) assert.Equal(t, websocket.TextMessage, msgType) assert.NotEmpty(t, msg) assert.Contains(t, string(msg), "Test message")