diff --git a/pkg/workflows/dontime/plugin_test.go b/pkg/workflows/dontime/plugin_test.go index b75c517db..3e91d7241 100644 --- a/pkg/workflows/dontime/plugin_test.go +++ b/pkg/workflows/dontime/plugin_test.go @@ -280,11 +280,16 @@ func TestPlugin_FinishedExecutions(t *testing.T) { }) t.Run("Transmit: delete removed executionIDs", func(t *testing.T) { + store.setDonTimes("workflow-123", []int64{time.Now().UnixMilli()}) + r := ocr3types.ReportWithInfo[[]byte]{} r.Report, err = proto.Marshal(outcomeProto) require.NoError(t, err) err = transmitter.Transmit(ctx, types.ConfigDigest{}, 0, r, []types.AttributedOnchainSignature{}) require.NoError(t, err) + + _, err = store.GetDonTimes("workflow-123") + require.ErrorContains(t, err, "no don time for executionID workflow-123") }) } diff --git a/pkg/workflows/dontime/store.go b/pkg/workflows/dontime/store.go index b0354c73f..2e7f20405 100644 --- a/pkg/workflows/dontime/store.go +++ b/pkg/workflows/dontime/store.go @@ -117,6 +117,22 @@ func (s *Store) setDonTimes(executionID string, donTimes []int64) { s.donTimes[executionID] = donTimes } +func (s *Store) replaceDonTimes(donTimes map[string][]int64) { + s.mu.Lock() + defer s.mu.Unlock() + + for executionID, timestamps := range donTimes { + s.donTimes[executionID] = timestamps + } + + for executionID := range s.donTimes { + if _, ok := donTimes[executionID]; !ok { + delete(s.donTimes, executionID) + delete(s.requests, executionID) + } + } +} + func (s *Store) GetLastObservedDonTime() int64 { s.mu.Lock() defer s.mu.Unlock() diff --git a/pkg/workflows/dontime/transmitter.go b/pkg/workflows/dontime/transmitter.go index 0f31e5aee..c9d30c3a5 100644 --- a/pkg/workflows/dontime/transmitter.go +++ b/pkg/workflows/dontime/transmitter.go @@ -33,9 +33,11 @@ func (t *Transmitter) Transmit(_ context.Context, _ types.ConfigDigest, _ uint64 return err } + currentDonTimes := make(map[string][]int64, len(outcome.ObservedDonTimes)) for id, observedDonTimes := range outcome.ObservedDonTimes { - t.store.setDonTimes(id, observedDonTimes.Timestamps) + currentDonTimes[id] = observedDonTimes.Timestamps } + t.store.replaceDonTimes(currentDonTimes) t.store.setLastObservedDonTime(outcome.Timestamp) t.lggr.Infow("Transmitting timestamps", "lastObservedDonTime", outcome.Timestamp)