Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pkg/workflows/dontime/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,11 +280,16 @@ func TestPlugin_FinishedExecutions(t *testing.T) {
})

t.Run("Transmit: delete removed executionIDs", func(t *testing.T) {
store.setDonTimes("workflow-123", []int64{time.Now().UnixMilli()})

r := ocr3types.ReportWithInfo[[]byte]{}
r.Report, err = proto.Marshal(outcomeProto)
require.NoError(t, err)
err = transmitter.Transmit(ctx, types.ConfigDigest{}, 0, r, []types.AttributedOnchainSignature{})
require.NoError(t, err)

_, err = store.GetDonTimes("workflow-123")
require.ErrorContains(t, err, "no don time for executionID workflow-123")
})
}

Expand Down
16 changes: 16 additions & 0 deletions pkg/workflows/dontime/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,22 @@ func (s *Store) setDonTimes(executionID string, donTimes []int64) {
s.donTimes[executionID] = donTimes
}

func (s *Store) replaceDonTimes(donTimes map[string][]int64) {
s.mu.Lock()
defer s.mu.Unlock()

for executionID, timestamps := range donTimes {
s.donTimes[executionID] = timestamps
}

for executionID := range s.donTimes {
if _, ok := donTimes[executionID]; !ok {
delete(s.donTimes, executionID)
delete(s.requests, executionID)
}
}
}

func (s *Store) GetLastObservedDonTime() int64 {
s.mu.Lock()
defer s.mu.Unlock()
Expand Down
4 changes: 3 additions & 1 deletion pkg/workflows/dontime/transmitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,11 @@ func (t *Transmitter) Transmit(_ context.Context, _ types.ConfigDigest, _ uint64
return err
}

currentDonTimes := make(map[string][]int64, len(outcome.ObservedDonTimes))
for id, observedDonTimes := range outcome.ObservedDonTimes {
t.store.setDonTimes(id, observedDonTimes.Timestamps)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had the same though, but Codex says this:

Not by itself.
The deletion in plugin.go only removes state during Outcome, from that node’s local store, at the moment that node computes the pruned outcome. The problem is that transmitter.go also writes to the same local store later, based on whatever is in the transmitted report, and before the fix it only did “upserts”:

for IDs still present: setDonTimes(...)

for IDs no longer present: do nothing
So if a store already had donTimes["workflow-123"], and a later report arrived where workflow-123 had been pruned from ObservedDonTimes, Transmit would leave that stale entry behind unless that exact process had already deleted it via Outcome.
Why that matters:

Outcome cleanup is not the right place to rely on for store reconciliation, because Transmit is another mutation path for the same state.

The store should reflect the latest transmitted outcome after Transmit finishes.

Without reconciliation in Transmit, stale donTimes can remain locally and RequestDonTime can incorrectly serve old cached timestamps for already-expired execution IDs.
So the deletion in plugin.go was correct, but it was not sufficient to keep the store consistent across all store write paths.

@DylanTinianov : Could you look?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah this makes sense. Also I have another PR to clean ObservedDonTimes

currentDonTimes[id] = observedDonTimes.Timestamps
}
t.store.replaceDonTimes(currentDonTimes)
t.store.setLastObservedDonTime(outcome.Timestamp)

t.lggr.Infow("Transmitting timestamps", "lastObservedDonTime", outcome.Timestamp)
Expand Down
Loading