diff --git a/go.mod b/go.mod
index 7163e10..2ff28c2 100644
--- a/go.mod
+++ b/go.mod
@@ -13,7 +13,7 @@ require (
 	github.com/pkg/errors v0.9.1
 	github.com/prometheus/client_golang v1.23.0
 	github.com/shopspring/decimal v1.4.0
-	github.com/smartcontractkit/chainlink-common v0.10.1-0.20260217160002-b56cb5356cc7
+	github.com/smartcontractkit/chainlink-common v0.10.1-0.20260319155722-325b0156b188
 	github.com/smartcontractkit/chainlink-common/keystore v1.0.2
 	github.com/smartcontractkit/libocr v0.0.0-20260130195252-6e18e2a30acc
 	github.com/smartcontractkit/wsrpc v0.8.5-0.20250502134807-c57d3d995945
@@ -97,7 +97,7 @@ require (
 	github.com/shirou/gopsutil v3.21.11+incompatible // indirect
 	github.com/smartcontractkit/chain-selectors v1.0.91 // indirect
 	github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.10 // indirect
-	github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260217043601-5cc966896c4f // indirect
+	github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260226130359-963f935e0396 // indirect
 	github.com/smartcontractkit/chainlink-protos/linking-service/go v0.0.0-20251002192024-d2ad9222409b // indirect
 	github.com/smartcontractkit/chainlink-protos/node-platform v0.0.0-20260205130626-db2a2aab956b // indirect
 	github.com/smartcontractkit/freeport v0.1.3-0.20250716200817-cb5dfd0e369e // indirect
diff --git a/go.sum b/go.sum
index 52c86b9..06694ab 100644
--- a/go.sum
+++ b/go.sum
@@ -567,14 +567,14 @@ github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6Mwd
 github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
 github.com/smartcontractkit/chain-selectors v1.0.91 h1:Aip7IZTv40RtbHgZ9mTjm5KyhYrpPefG7iVMzLZ27M4=
 github.com/smartcontractkit/chain-selectors v1.0.91/go.mod h1:qy7whtgG5g+7z0jt0nRyii9bLND9m15NZTzuQPkMZ5w=
-github.com/smartcontractkit/chainlink-common v0.10.1-0.20260217160002-b56cb5356cc7 h1:h5cmgzKpKn5N5ItpEDFhRcrtqs36nu9r/dciJub1hos=
-github.com/smartcontractkit/chainlink-common v0.10.1-0.20260217160002-b56cb5356cc7/go.mod h1:HXgSKzmZ/bhSx8nHU7hHW6dR+BHSXkdcpFv2T8qJcS8=
+github.com/smartcontractkit/chainlink-common v0.10.1-0.20260319155722-325b0156b188 h1:6n15Fng45mSa2OEqRAiFk4e/6O5mZpo15eXzgF5g5xA=
+github.com/smartcontractkit/chainlink-common v0.10.1-0.20260319155722-325b0156b188/go.mod h1:0ghbAr7tRO0tT5ZqBXhOyzgUO37tNNe33Yn0hskauVM=
 github.com/smartcontractkit/chainlink-common/keystore v1.0.2 h1:AWisx4JT3QV8tcgh6J5NCrex+wAgTYpWyHsyNPSXzsQ=
 github.com/smartcontractkit/chainlink-common/keystore v1.0.2/go.mod h1:rSkIHdomyak3YnUtXLenl6poIq8q0V3UZPiiyYqPdGA=
 github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.10 h1:FJAFgXS9oqASnkS03RE1HQwYQQxrO4l46O5JSzxqLgg=
 github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.10/go.mod h1:oiDa54M0FwxevWwyAX773lwdWvFYYlYHHQV1LQ5HpWY=
-github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260217043601-5cc966896c4f h1:MHlgzqiDPyDV397bZkzS9TtWXb3FR9Pb8FR9cP9h0As=
-github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260217043601-5cc966896c4f/go.mod h1:Jqt53s27Tr0jDl8mdBXg1xhu6F8Fci8JOuq43tgHOM8=
+github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260226130359-963f935e0396 h1:03tbcwjyIEjvHba1IWOj1sfThwebm2XNzyFHSuZtlWc=
+github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260226130359-963f935e0396/go.mod h1:Jqt53s27Tr0jDl8mdBXg1xhu6F8Fci8JOuq43tgHOM8=
 github.com/smartcontractkit/chainlink-protos/linking-service/go v0.0.0-20251002192024-d2ad9222409b h1:QuI6SmQFK/zyUlVWEf0GMkiUYBPY4lssn26nKSd/bOM=
 github.com/smartcontractkit/chainlink-protos/linking-service/go v0.0.0-20251002192024-d2ad9222409b/go.mod h1:qSTSwX3cBP3FKQwQacdjArqv0g6QnukjV4XuzO6UyoY=
github.com/smartcontractkit/chainlink-protos/node-platform v0.0.0-20260205130626-db2a2aab956b h1:36knUpKHHAZ86K4FGWXtx8i/EQftGdk2bqCoEu/Cha8= diff --git a/llo/channel_definitions_test.go b/llo/channel_definitions_test.go index 74b78ba..3153562 100644 --- a/llo/channel_definitions_test.go +++ b/llo/channel_definitions_test.go @@ -13,7 +13,7 @@ type mockReportCodec struct { err error } -func (m mockReportCodec) Encode(Report, llotypes.ChannelDefinition) ([]byte, error) { +func (m mockReportCodec) Encode(Report, llotypes.ChannelDefinition, *OptsCache) ([]byte, error) { return nil, nil } diff --git a/llo/json_report_codec.go b/llo/json_report_codec.go index efb47a3..a6ac6af 100644 --- a/llo/json_report_codec.go +++ b/llo/json_report_codec.go @@ -19,7 +19,7 @@ var _ ReportCodec = JSONReportCodec{} type JSONReportCodec struct{} -func (cdc JSONReportCodec) Encode(r Report, _ llotypes.ChannelDefinition) ([]byte, error) { +func (cdc JSONReportCodec) Encode(r Report, _ llotypes.ChannelDefinition, _ *OptsCache) ([]byte, error) { type encode struct { ConfigDigest types.ConfigDigest SeqNr uint64 diff --git a/llo/json_report_codec_test.go b/llo/json_report_codec_test.go index 3d4ae78..98d30b7 100644 --- a/llo/json_report_codec_test.go +++ b/llo/json_report_codec_test.go @@ -94,7 +94,7 @@ func Test_JSONCodec_Properties(t *testing.T) { properties.Property("Encode/Decode", prop.ForAll( func(r Report) bool { - b, err := codec.Encode(r, cd) + b, err := codec.Encode(r, cd, nil) require.NoError(t, err) r2, err := codec.Decode(b) require.NoError(t, err) @@ -305,7 +305,7 @@ func Test_JSONCodec(t *testing.T) { cdc := JSONReportCodec{} - encoded, err := cdc.Encode(r, llo.ChannelDefinition{}) + encoded, err := cdc.Encode(r, llo.ChannelDefinition{}, nil) require.NoError(t, err) assert.Equal(t, 
`{"ConfigDigest":"0102030000000000000000000000000000000000000000000000000000000000","SeqNr":43,"ChannelID":46,"ValidAfterNanoseconds":44,"ObservationTimestampNanoseconds":45,"Values":[{"t":0,"v":"1"},{"t":0,"v":"2"},{"t":1,"v":"Q{Bid: 3.13, Benchmark: 4.4, Ask: 5.12}"}],"Specimen":true}`, string(encoded)) //nolint:testifylint // need to verify exact match including order for determinism diff --git a/llo/observation_codec.go b/llo/observation_codec.go index 1a4a880..e524b23 100644 --- a/llo/observation_codec.go +++ b/llo/observation_codec.go @@ -87,29 +87,6 @@ func (c protoObservationCodec) Encode(obs Observation) (types.Observation, error return b, nil } -func channelDefinitionsToProtoObservation(in llotypes.ChannelDefinitions) (out map[uint32]*LLOChannelDefinitionProto) { - if len(in) > 0 { - out = make(map[uint32]*LLOChannelDefinitionProto, len(in)) - for id, d := range in { - streams := make([]*LLOStreamDefinition, len(d.Streams)) - for i, strm := range d.Streams { - streams[i] = &LLOStreamDefinition{ - StreamID: strm.StreamID, - Aggregator: uint32(strm.Aggregator), - } - } - out[id] = &LLOChannelDefinitionProto{ - ReportFormat: uint32(d.ReportFormat), - Streams: streams, - Opts: d.Opts, - Tombstone: d.Tombstone, - Source: d.Source, - } - } - } - return -} - func (c protoObservationCodec) Decode(b types.Observation) (Observation, error) { var err error if c.enableCompression { @@ -187,12 +164,37 @@ func channelDefinitionsFromProtoObservation(channelDefinitions map[uint32]*LLOCh } } dfns[id] = llotypes.ChannelDefinition{ - ReportFormat: llotypes.ReportFormat(d.ReportFormat), - Streams: streams, - Opts: d.Opts, - Tombstone: d.Tombstone, - Source: d.Source, + ReportFormat: llotypes.ReportFormat(d.ReportFormat), + Streams: streams, + Opts: d.Opts, + Tombstone: d.Tombstone, + Source: d.Source, + DisableNilStreamValues: d.DisableNilStreamValues, } } return dfns } + +func channelDefinitionsToProtoObservation(in llotypes.ChannelDefinitions) (out 
map[uint32]*LLOChannelDefinitionProto) { + if len(in) > 0 { + out = make(map[uint32]*LLOChannelDefinitionProto, len(in)) + for id, d := range in { + streams := make([]*LLOStreamDefinition, len(d.Streams)) + for i, strm := range d.Streams { + streams[i] = &LLOStreamDefinition{ + StreamID: strm.StreamID, + Aggregator: uint32(strm.Aggregator), + } + } + out[id] = &LLOChannelDefinitionProto{ + ReportFormat: uint32(d.ReportFormat), + Streams: streams, + Opts: d.Opts, + Tombstone: d.Tombstone, + Source: d.Source, + DisableNilStreamValues: d.DisableNilStreamValues, + } + } + } + return +} diff --git a/llo/observation_codec_test.go b/llo/observation_codec_test.go index 45a183b..432ea2e 100644 --- a/llo/observation_codec_test.go +++ b/llo/observation_codec_test.go @@ -71,6 +71,38 @@ func Test_protoObservationCodec(t *testing.T) { assert.Equal(t, expectedObs, obs2) }) + t.Run("encode and decode preserves properties in channel definitions", func(t *testing.T) { + obs := Observation{ + UnixTimestampNanoseconds: 1, + UpdateChannelDefinitions: map[llotypes.ChannelID]llotypes.ChannelDefinition{ + 1: { + ReportFormat: llotypes.ReportFormatJSON, + Streams: []llotypes.Stream{{StreamID: 1, Aggregator: llotypes.AggregatorMedian}}, + DisableNilStreamValues: true, + Opts: []byte(`{}`), + Tombstone: false, + Source: 1, + }, + 2: { + ReportFormat: llotypes.ReportFormatJSON, + Streams: []llotypes.Stream{{StreamID: 2, Aggregator: llotypes.AggregatorQuote}}, + DisableNilStreamValues: false, + Opts: []byte(`{}`), + Tombstone: true, + Source: 2, + }, + }, + } + codec, err := NewProtoObservationCodec(logger.Nop(), true) + require.NoError(t, err) + obsBytes, err := codec.Encode(obs) + require.NoError(t, err) + obs2, err := codec.Decode(obsBytes) + require.NoError(t, err) + assert.Equal(t, obs.UpdateChannelDefinitions, obs2.UpdateChannelDefinitions) + assert.True(t, obs2.UpdateChannelDefinitions[1].DisableNilStreamValues) + assert.False(t, obs2.UpdateChannelDefinitions[2].DisableNilStreamValues) 
+ }) t.Run("decoding with invalid data", func(t *testing.T) { t.Run("not a protobuf", func(t *testing.T) { codec, err := NewProtoObservationCodec(logger.Nop(), true) diff --git a/llo/opts_cache.go b/llo/opts_cache.go new file mode 100644 index 0000000..df114bb --- /dev/null +++ b/llo/opts_cache.go @@ -0,0 +1,121 @@ +package llo + +import ( + "bytes" + "fmt" + "reflect" + sync "sync" + + "github.com/goccy/go-json" + + llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo" +) + +type optsCacheKey struct { + channelID llotypes.ChannelID + optsType reflect.Type +} + +// OptsCache caches decoded channel definition options keyed by (ChannelID, target type). +// Raw opts bytes are stored via Set during channel definition changes in Outcome(). +// Decoded values are produced lazily on the first GetOpts call for a given (channelID, type) +// and reused until the channel is updated or removed. +type OptsCache struct { + mu sync.Mutex + raw map[llotypes.ChannelID]llotypes.ChannelOpts + decoded map[optsCacheKey]any +} + +func NewOptsCache() *OptsCache { + return &OptsCache{ + raw: make(map[llotypes.ChannelID]llotypes.ChannelOpts), + decoded: make(map[optsCacheKey]any), + } +} + +// Set stores the raw opts for a channel and invalidates any previously decoded +// values for that channel. It is a no-op when the raw bytes are identical to +// what is already stored. +func (c *OptsCache) Set(channelID llotypes.ChannelID, raw llotypes.ChannelOpts) { + c.mu.Lock() + defer c.mu.Unlock() + + if existing, ok := c.raw[channelID]; ok && bytes.Equal(existing, raw) { + return + } + c.raw[channelID] = raw + for key := range c.decoded { + if key.channelID == channelID { + delete(c.decoded, key) + } + } +} + +// Len returns the number of channels in the cache. +func (c *OptsCache) Len() int { + c.mu.Lock() + defer c.mu.Unlock() + return len(c.raw) +} + +// Remove removes all raw and decoded data for a channel. 
+func (c *OptsCache) Remove(channelID llotypes.ChannelID) { + c.mu.Lock() + defer c.mu.Unlock() + + delete(c.raw, channelID) + for key := range c.decoded { + if key.channelID == channelID { + delete(c.decoded, key) + } + } +} + +// ResetTo resets the cache to the given channel definitions. +func (c *OptsCache) ResetTo(channelDefinitions llotypes.ChannelDefinitions) { + c.mu.Lock() + defer c.mu.Unlock() + c.raw = make(map[llotypes.ChannelID]llotypes.ChannelOpts) + c.decoded = make(map[optsCacheKey]any) + + for channelID, cd := range channelDefinitions { + c.raw[channelID] = cd.Opts + } +} + +// GetOpts returns decoded channel opts of type T for the given channel. +// On the first call for a given (channelID, T) after Set, the raw bytes are +// decoded via json.Unmarshal and the result is cached. Subsequent calls return +// the cached value directly. +// +// Returns an error if the channel is not in the cache or decoding fails. +// The caller must pass a valid opts cache. +func GetOpts[T any](c *OptsCache, channelID llotypes.ChannelID) (T, error) { + var zero T + c.mu.Lock() + defer c.mu.Unlock() + + key := optsCacheKey{ + channelID: channelID, + optsType: reflect.TypeFor[T](), + } + + if entry, ok := c.decoded[key]; ok { + return entry.(T), nil + } + + raw, ok := c.raw[channelID] + if !ok { + return zero, fmt.Errorf("channel %d not in opts cache", channelID) + } + + var result T + if len(raw) > 0 { + if err := json.Unmarshal(raw, &result); err != nil { + return zero, fmt.Errorf("failed to decode opts for channel %d: %w", channelID, err) + } + } + + c.decoded[key] = result + return result, nil +} diff --git a/llo/opts_cache_test.go b/llo/opts_cache_test.go new file mode 100644 index 0000000..de6244a --- /dev/null +++ b/llo/opts_cache_test.go @@ -0,0 +1,291 @@ +package llo + +import ( + "testing" + + llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +type testOptsA 
struct { + FeedID string `json:"feedID"` + Value int `json:"value"` +} + +type testOptsB struct { + Multiplier int `json:"multiplier"` +} + +func TestOptsCache_Len(t *testing.T) { + t.Run("new cache is empty", func(t *testing.T) { + cache := NewOptsCache() + assert.Equal(t, 0, cache.Len()) + }) + + t.Run("Len reflects number of channels", func(t *testing.T) { + cache := NewOptsCache() + assert.Equal(t, 0, cache.Len()) + + cache.Set(1, []byte(`{}`)) + assert.Equal(t, 1, cache.Len()) + + cache.Set(2, []byte(`{}`)) + assert.Equal(t, 2, cache.Len()) + + cache.Set(3, []byte(`{}`)) + assert.Equal(t, 3, cache.Len()) + }) + + t.Run("Remove decreases Len", func(t *testing.T) { + cache := NewOptsCache() + cache.Set(1, []byte(`{}`)) + cache.Set(2, []byte(`{}`)) + assert.Equal(t, 2, cache.Len()) + + cache.Remove(1) + assert.Equal(t, 1, cache.Len()) + + cache.Remove(2) + assert.Equal(t, 0, cache.Len()) + }) + + t.Run("Set same channel with identical bytes does not change Len", func(t *testing.T) { + cache := NewOptsCache() + cache.Set(1, []byte(`{"feedID":"x"}`)) + assert.Equal(t, 1, cache.Len()) + cache.Set(1, []byte(`{"feedID":"x"}`)) + assert.Equal(t, 1, cache.Len()) + }) + + t.Run("Set same channel with different bytes does not change Len", func(t *testing.T) { + cache := NewOptsCache() + cache.Set(1, []byte(`{"feedID":"a"}`)) + assert.Equal(t, 1, cache.Len()) + cache.Set(1, []byte(`{"feedID":"b"}`)) + assert.Equal(t, 1, cache.Len()) + }) +} + +func TestOptsCache_GetOpts(t *testing.T) { + t.Run("missing channel returns error", func(t *testing.T) { + cache := NewOptsCache() + _, err := GetOpts[testOptsA](cache, 1) + require.Error(t, err) + assert.Contains(t, err.Error(), "not in opts cache") + }) + + t.Run("decodes and caches on first access", func(t *testing.T) { + cache := NewOptsCache() + cache.Set(1, []byte(`{"feedID":"eth-usd","value":42}`)) + + r1, err := GetOpts[testOptsA](cache, 1) + require.NoError(t, err) + assert.Equal(t, "eth-usd", r1.FeedID) + assert.Equal(t, 
42, r1.Value) + + r2, err := GetOpts[testOptsA](cache, 1) + require.NoError(t, err) + assert.Equal(t, r1, r2) + }) + + t.Run("empty raw bytes returns zero value", func(t *testing.T) { + cache := NewOptsCache() + cache.Set(1, nil) + + r, err := GetOpts[testOptsA](cache, 1) + require.NoError(t, err) + assert.Equal(t, testOptsA{}, r) + }) + + t.Run("different types for same channel are independent", func(t *testing.T) { + cache := NewOptsCache() + cache.Set(1, []byte(`{"feedID":"eth","value":1,"multiplier":10}`)) + + a, err := GetOpts[testOptsA](cache, 1) + require.NoError(t, err) + assert.Equal(t, "eth", a.FeedID) + + b, err := GetOpts[testOptsB](cache, 1) + require.NoError(t, err) + assert.Equal(t, 10, b.Multiplier) + }) + + t.Run("different channels are independent", func(t *testing.T) { + cache := NewOptsCache() + cache.Set(1, []byte(`{"feedID":"eth"}`)) + cache.Set(2, []byte(`{"feedID":"btc"}`)) + + r1, err := GetOpts[testOptsA](cache, 1) + require.NoError(t, err) + assert.Equal(t, "eth", r1.FeedID) + + r2, err := GetOpts[testOptsA](cache, 2) + require.NoError(t, err) + assert.Equal(t, "btc", r2.FeedID) + }) + + t.Run("invalid JSON returns error", func(t *testing.T) { + cache := NewOptsCache() + cache.Set(1, []byte(`not-json`)) + + _, err := GetOpts[testOptsA](cache, 1) + require.Error(t, err) + assert.Contains(t, err.Error(), "failed to decode") + }) +} + +func TestOptsCache_Set(t *testing.T) { + t.Run("Set invalidates decoded entries", func(t *testing.T) { + cache := NewOptsCache() + cache.Set(1, []byte(`{"feedID":"v1"}`)) + + r1, err := GetOpts[testOptsA](cache, 1) + require.NoError(t, err) + assert.Equal(t, "v1", r1.FeedID) + + cache.Set(1, []byte(`{"feedID":"v2"}`)) + + r2, err := GetOpts[testOptsA](cache, 1) + require.NoError(t, err) + assert.Equal(t, "v2", r2.FeedID) + }) + + t.Run("Set with identical bytes is a no-op", func(t *testing.T) { + cache := NewOptsCache() + cache.Set(1, []byte(`{"feedID":"same"}`)) + + r1, err := GetOpts[testOptsA](cache, 1) + 
require.NoError(t, err) + assert.Equal(t, "same", r1.FeedID) + + // Set again with the same bytes — decoded cache should survive + cache.Set(1, []byte(`{"feedID":"same"}`)) + + // Should still be the same cached object (no re-decode) + r2, err := GetOpts[testOptsA](cache, 1) + require.NoError(t, err) + assert.Equal(t, "same", r2.FeedID) + }) +} + +func TestOptsCache_Remove(t *testing.T) { + t.Run("removes raw and decoded entries", func(t *testing.T) { + cache := NewOptsCache() + cache.Set(1, []byte(`{"feedID":"eth"}`)) + cache.Set(2, []byte(`{"feedID":"btc"}`)) + + _, err := GetOpts[testOptsA](cache, 1) + require.NoError(t, err) + _, err = GetOpts[testOptsA](cache, 2) + require.NoError(t, err) + + cache.Remove(1) + + _, err = GetOpts[testOptsA](cache, 1) + require.Error(t, err, "channel 1 should be gone") + + r, err := GetOpts[testOptsA](cache, 2) + require.NoError(t, err, "channel 2 should remain") + assert.Equal(t, "btc", r.FeedID) + }) + + t.Run("remove then re-set works", func(t *testing.T) { + cache := NewOptsCache() + cache.Set(1, []byte(`{"feedID":"old"}`)) + _, err := GetOpts[testOptsA](cache, 1) + require.NoError(t, err) + + cache.Remove(1) + cache.Set(1, []byte(`{"feedID":"new"}`)) + + r, err := GetOpts[testOptsA](cache, 1) + require.NoError(t, err) + assert.Equal(t, "new", r.FeedID) + }) +} + +func TestOptsCache_ResetTo(t *testing.T) { + t.Run("replaces all entries with channel definitions", func(t *testing.T) { + cache := NewOptsCache() + cache.Set(1, []byte(`{"feedID":"old1"}`)) + cache.Set(2, []byte(`{"feedID":"old2"}`)) + cache.Set(3, []byte(`{"feedID":"old3"}`)) + _, _ = GetOpts[testOptsA](cache, 1) + _, _ = GetOpts[testOptsA](cache, 2) + require.Equal(t, 3, cache.Len()) + + cache.ResetTo(llotypes.ChannelDefinitions{ + 2: {Opts: llotypes.ChannelOpts(`{"feedID":"ch2"}`)}, + 4: {Opts: llotypes.ChannelOpts(`{"feedID":"ch4"}`)}, + }) + + assert.Equal(t, 2, cache.Len()) + _, err := GetOpts[testOptsA](cache, 1) + require.Error(t, err, "channel 1 should be 
gone after reset") + r2, err := GetOpts[testOptsA](cache, 2) + require.NoError(t, err) + assert.Equal(t, "ch2", r2.FeedID) + _, err = GetOpts[testOptsA](cache, 3) + require.Error(t, err, "channel 3 should be gone after reset") + r4, err := GetOpts[testOptsA](cache, 4) + require.NoError(t, err) + assert.Equal(t, "ch4", r4.FeedID) + }) + + t.Run("empty definitions clears cache", func(t *testing.T) { + cache := NewOptsCache() + cache.Set(1, []byte(`{"feedID":"x"}`)) + require.Equal(t, 1, cache.Len()) + + cache.ResetTo(nil) + assert.Equal(t, 0, cache.Len()) + _, err := GetOpts[testOptsA](cache, 1) + require.Error(t, err) + }) + + t.Run("empty map clears cache", func(t *testing.T) { + cache := NewOptsCache() + cache.Set(1, []byte(`{"feedID":"x"}`)) + require.Equal(t, 1, cache.Len()) + + cache.ResetTo(llotypes.ChannelDefinitions{}) + assert.Equal(t, 0, cache.Len()) + _, err := GetOpts[testOptsA](cache, 1) + require.Error(t, err) + }) +} + +func TestOptsCache_ChannelDefinitionWorkflow(t *testing.T) { + cache := NewOptsCache() + defs := llotypes.ChannelDefinitions{ + 1: {Opts: llotypes.ChannelOpts(`{"feedID":"ch1"}`)}, + 2: {Opts: llotypes.ChannelOpts(`{"feedID":"ch2"}`)}, + 3: {Opts: llotypes.ChannelOpts(`{"feedID":"ch3"}`)}, + } + + // Populate cache from initial definitions + for cid, cd := range defs { + cache.Set(cid, cd.Opts) + } + + r1, err := GetOpts[testOptsA](cache, 1) + require.NoError(t, err) + assert.Equal(t, "ch1", r1.FeedID) + + // Update channel 2 + cache.Set(2, []byte(`{"feedID":"ch2-updated"}`)) + r2, err := GetOpts[testOptsA](cache, 2) + require.NoError(t, err) + assert.Equal(t, "ch2-updated", r2.FeedID) + + // Remove channel 3 + cache.Remove(3) + _, err = GetOpts[testOptsA](cache, 3) + require.Error(t, err) + + // Channel 1 unchanged + r1Again, err := GetOpts[testOptsA](cache, 1) + require.NoError(t, err) + assert.Equal(t, "ch1", r1Again.FeedID) +} diff --git a/llo/outcome_codec_common.go b/llo/outcome_codec_common.go index 288d1dc..a5f8f3e 100644 --- 
a/llo/outcome_codec_common.go +++ b/llo/outcome_codec_common.go @@ -50,6 +50,24 @@ func makeLLOStreamValue(v StreamValue) (*LLOStreamValue, error) { return &LLOStreamValue{Type: v.Type(), Value: value}, nil } +func makeChannelDefinitionProto(d llotypes.ChannelDefinition) *LLOChannelDefinitionProto { + streams := make([]*LLOStreamDefinition, len(d.Streams)) + for i, strm := range d.Streams { + streams[i] = &LLOStreamDefinition{ + StreamID: strm.StreamID, + Aggregator: uint32(strm.Aggregator), + } + } + return &LLOChannelDefinitionProto{ + ReportFormat: uint32(d.ReportFormat), + Streams: streams, + Opts: d.Opts, + Tombstone: d.Tombstone, + Source: d.Source, + DisableNilStreamValues: d.DisableNilStreamValues, + } +} + func channelDefinitionsToProtoOutcome(in llotypes.ChannelDefinitions) (out []*LLOChannelIDAndDefinitionProto) { if len(in) > 0 { out = make([]*LLOChannelIDAndDefinitionProto, 0, len(in)) @@ -66,23 +84,6 @@ func channelDefinitionsToProtoOutcome(in llotypes.ChannelDefinitions) (out []*LL return } -func makeChannelDefinitionProto(d llotypes.ChannelDefinition) *LLOChannelDefinitionProto { - streams := make([]*LLOStreamDefinition, len(d.Streams)) - for i, strm := range d.Streams { - streams[i] = &LLOStreamDefinition{ - StreamID: strm.StreamID, - Aggregator: uint32(strm.Aggregator), - } - } - return &LLOChannelDefinitionProto{ - ReportFormat: uint32(d.ReportFormat), - Streams: streams, - Opts: d.Opts, - Tombstone: d.Tombstone, - Source: d.Source, - } -} - func channelDefinitionsFromProtoOutcome(in []*LLOChannelIDAndDefinitionProto) (out llotypes.ChannelDefinitions, err error) { if len(in) > 0 { out = make(map[llotypes.ChannelID]llotypes.ChannelDefinition, len(in)) @@ -100,11 +101,12 @@ func channelDefinitionsFromProtoOutcome(in []*LLOChannelIDAndDefinitionProto) (o } } out[d.ChannelID] = llotypes.ChannelDefinition{ - ReportFormat: llotypes.ReportFormat(d.ChannelDefinition.ReportFormat), - Streams: streams, - Opts: d.ChannelDefinition.Opts, - Tombstone: 
d.ChannelDefinition.Tombstone, - Source: d.ChannelDefinition.Source, + ReportFormat: llotypes.ReportFormat(d.ChannelDefinition.ReportFormat), + Streams: streams, + Opts: d.ChannelDefinition.Opts, + Tombstone: d.ChannelDefinition.Tombstone, + Source: d.ChannelDefinition.Source, + DisableNilStreamValues: d.ChannelDefinition.DisableNilStreamValues, } } } diff --git a/llo/outcome_codec_v0_test.go b/llo/outcome_codec_v0_test.go index b5f5bf7..94e7fd3 100644 --- a/llo/outcome_codec_v0_test.go +++ b/llo/outcome_codec_v0_test.go @@ -78,6 +78,37 @@ func Test_protoOutcomeCodecV0(t *testing.T) { assert.Equal(t, outcome, outcome2) }) + t.Run("encode and decode preserves attributes in channel definitions", func(t *testing.T) { + outcome := Outcome{ + LifeCycleStage: llotypes.LifeCycleStage("staging"), + ObservationTimestampNanoseconds: 1, + ChannelDefinitions: map[llotypes.ChannelID]llotypes.ChannelDefinition{ + 1: { + ReportFormat: llotypes.ReportFormatJSON, + Streams: []llotypes.Stream{{StreamID: 1, Aggregator: llotypes.AggregatorMedian}}, + DisableNilStreamValues: true, + Opts: []byte(`{}`), + Tombstone: false, + Source: 1, + }, + 2: { + ReportFormat: llotypes.ReportFormatJSON, + Streams: []llotypes.Stream{{StreamID: 2, Aggregator: llotypes.AggregatorQuote}}, + DisableNilStreamValues: false, + Opts: []byte(`{}`), + Tombstone: true, + Source: 2, + }, + }, + } + outcomeBytes, err := (protoOutcomeCodecV0{}).Encode(outcome) + require.NoError(t, err) + outcome2, err := (protoOutcomeCodecV0{}).Decode(outcomeBytes) + require.NoError(t, err) + assert.Equal(t, outcome.ChannelDefinitions, outcome2.ChannelDefinitions) + assert.True(t, outcome2.ChannelDefinitions[1].DisableNilStreamValues) + assert.False(t, outcome2.ChannelDefinitions[2].DisableNilStreamValues) + }) } func Fuzz_protoOutcomeCodecV0_Decode(f *testing.F) { diff --git a/llo/outcome_codec_v1_test.go b/llo/outcome_codec_v1_test.go index a29351f..aa86d35 100644 --- a/llo/outcome_codec_v1_test.go +++ 
b/llo/outcome_codec_v1_test.go @@ -78,6 +78,37 @@ func Test_protoOutcomeCodecV1(t *testing.T) { assert.Equal(t, outcome, outcome2) }) + t.Run("encode and decode preserves attributes in channel definitions", func(t *testing.T) { + outcome := Outcome{ + LifeCycleStage: llotypes.LifeCycleStage("staging"), + ObservationTimestampNanoseconds: 1, + ChannelDefinitions: map[llotypes.ChannelID]llotypes.ChannelDefinition{ + 1: { + ReportFormat: llotypes.ReportFormatJSON, + Streams: []llotypes.Stream{{StreamID: 1, Aggregator: llotypes.AggregatorMedian}}, + DisableNilStreamValues: true, + Opts: []byte(`{}`), + Tombstone: false, + Source: 1, + }, + 2: { + ReportFormat: llotypes.ReportFormatJSON, + Streams: []llotypes.Stream{{StreamID: 2, Aggregator: llotypes.AggregatorQuote}}, + DisableNilStreamValues: false, + Opts: []byte(`{}`), + Tombstone: true, + Source: 2, + }, + }, + } + outcomeBytes, err := (protoOutcomeCodecV1{}).Encode(outcome) + require.NoError(t, err) + outcome2, err := (protoOutcomeCodecV1{}).Decode(outcomeBytes) + require.NoError(t, err) + assert.Equal(t, outcome.ChannelDefinitions, outcome2.ChannelDefinitions) + assert.True(t, outcome2.ChannelDefinitions[1].DisableNilStreamValues) + assert.False(t, outcome2.ChannelDefinitions[2].DisableNilStreamValues) + }) } // Test_protoOutcomeCodecV1_GoldenFiles asserts outcome serialization against committed golden files. diff --git a/llo/outcome_golden_cases.go b/llo/outcome_golden_cases.go index ea59493..08eb3ac 100644 --- a/llo/outcome_golden_cases.go +++ b/llo/outcome_golden_cases.go @@ -18,23 +18,20 @@ type GoldenOutcomeCase struct { // fullChannelDefinitions is shared between the "full" and "from_full" golden cases. 
var fullChannelDefinitions = map[llotypes.ChannelID]llotypes.ChannelDefinition{ 1: { - ReportFormat: llotypes.ReportFormatJSON, - Streams: []llotypes.Stream{ - {StreamID: 1, Aggregator: llotypes.AggregatorMedian}, - {StreamID: 2, Aggregator: llotypes.AggregatorQuote}, - }, - Opts: []byte(`{"foo":"bar"}`), - Tombstone: true, - Source: 0, + ReportFormat: llotypes.ReportFormatJSON, + Streams: []llotypes.Stream{{StreamID: 1, Aggregator: llotypes.AggregatorMedian}, {StreamID: 2, Aggregator: llotypes.AggregatorQuote}}, + Opts: []byte(`{"foo":"bar"}`), + Tombstone: true, + Source: 0, + DisableNilStreamValues: true, }, 2: { - ReportFormat: llotypes.ReportFormatJSON, - Streams: []llotypes.Stream{ - {StreamID: 3, Aggregator: llotypes.AggregatorMedian}, - }, - Opts: []byte(`{"baz":"qux"}`), - Source: 1001, - Tombstone: false, + ReportFormat: llotypes.ReportFormatJSON, + Streams: []llotypes.Stream{{StreamID: 3, Aggregator: llotypes.AggregatorMedian}}, + Opts: []byte(`{"baz":"qux"}`), + Source: 1001, + Tombstone: false, + DisableNilStreamValues: true, }, } diff --git a/llo/plugin.go b/llo/plugin.go index e146009..2a2e5fb 100644 --- a/llo/plugin.go +++ b/llo/plugin.go @@ -292,6 +292,7 @@ func (f *PluginFactory) NewReportingPlugin(ctx context.Context, cfg ocr3types.Re f.OutcomeTelemetryCh, f.ReportTelemetryCh, f.DonID, + NewOptsCache(), cfg.MaxDurationObservation, offchainConfig.ProtocolVersion, offchainConfig.DefaultMinReportIntervalNanoseconds, @@ -326,7 +327,8 @@ type Plugin struct { ReportCodecs map[llotypes.ReportFormat]ReportCodec OutcomeTelemetryCh chan<- *LLOOutcomeTelemetry ReportTelemetryCh chan<- *LLOReportTelemetry - DonID uint32 + DonID uint32 + OptsCache *OptsCache // must be non-nil; set by NewReportingPlugin or by tests that exercise Outcome/Reports // From ReportingPluginConfig MaxDurationObservation time.Duration diff --git a/llo/plugin_codecs.pb.go b/llo/plugin_codecs.pb.go index f2b89e1..f5f5a99 100644 --- a/llo/plugin_codecs.pb.go +++ 
b/llo/plugin_codecs.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.36.7 -// protoc v6.33.0 +// protoc v7.34.0 // source: plugin_codecs.proto package llo @@ -337,14 +337,15 @@ func (x *LLOTimestampedStreamValue) GetStreamValue() *LLOStreamValue { } type LLOChannelDefinitionProto struct { - state protoimpl.MessageState `protogen:"open.v1"` - ReportFormat uint32 `protobuf:"varint,1,opt,name=reportFormat,proto3" json:"reportFormat,omitempty"` - Streams []*LLOStreamDefinition `protobuf:"bytes,2,rep,name=streams,proto3" json:"streams,omitempty"` - Opts []byte `protobuf:"bytes,3,opt,name=opts,proto3" json:"opts,omitempty"` - Tombstone bool `protobuf:"varint,4,opt,name=tombstone,proto3" json:"tombstone,omitempty"` - Source uint32 `protobuf:"varint,5,opt,name=source,proto3" json:"source,omitempty"` - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache + state protoimpl.MessageState `protogen:"open.v1"` + ReportFormat uint32 `protobuf:"varint,1,opt,name=reportFormat,proto3" json:"reportFormat,omitempty"` + Streams []*LLOStreamDefinition `protobuf:"bytes,2,rep,name=streams,proto3" json:"streams,omitempty"` + Opts []byte `protobuf:"bytes,3,opt,name=opts,proto3" json:"opts,omitempty"` + Tombstone bool `protobuf:"varint,4,opt,name=tombstone,proto3" json:"tombstone,omitempty"` + Source uint32 `protobuf:"varint,5,opt,name=source,proto3" json:"source,omitempty"` + DisableNilStreamValues bool `protobuf:"varint,6,opt,name=disableNilStreamValues,proto3" json:"disableNilStreamValues,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *LLOChannelDefinitionProto) Reset() { @@ -412,6 +413,13 @@ func (x *LLOChannelDefinitionProto) GetSource() uint32 { return 0 } +func (x *LLOChannelDefinitionProto) GetDisableNilStreamValues() bool { + if x != nil { + return x.DisableNilStreamValues + } + return false +} + type LLOStreamDefinition struct { state protoimpl.MessageState 
`protogen:"open.v1"` StreamID uint32 `protobuf:"varint,1,opt,name=streamID,proto3" json:"streamID,omitempty"` @@ -918,13 +926,14 @@ const file_plugin_codecs_proto_rawDesc = "" + "\x03ask\x18\x03 \x01(\fR\x03ask\"\x87\x01\n" + "\x19LLOTimestampedStreamValue\x124\n" + "\x15observedAtNanoseconds\x18\x01 \x01(\x04R\x15observedAtNanoseconds\x124\n" + - "\vstreamValue\x18\x02 \x01(\v2\x12.v1.LLOStreamValueR\vstreamValue\"\xbc\x01\n" + + "\vstreamValue\x18\x02 \x01(\v2\x12.v1.LLOStreamValueR\vstreamValue\"\xf4\x01\n" + "\x19LLOChannelDefinitionProto\x12\"\n" + "\freportFormat\x18\x01 \x01(\rR\freportFormat\x121\n" + "\astreams\x18\x02 \x03(\v2\x17.v1.LLOStreamDefinitionR\astreams\x12\x12\n" + "\x04opts\x18\x03 \x01(\fR\x04opts\x12\x1c\n" + "\ttombstone\x18\x04 \x01(\bR\ttombstone\x12\x16\n" + - "\x06source\x18\x05 \x01(\rR\x06source\"Q\n" + + "\x06source\x18\x05 \x01(\rR\x06source\x126\n" + + "\x16disableNilStreamValues\x18\x06 \x01(\bR\x16disableNilStreamValues\"Q\n" + "\x13LLOStreamDefinition\x12\x1a\n" + "\bstreamID\x18\x01 \x01(\rR\bstreamID\x12\x1e\n" + "\n" + diff --git a/llo/plugin_codecs.proto b/llo/plugin_codecs.proto index a2e869a..6fb7d44 100644 --- a/llo/plugin_codecs.proto +++ b/llo/plugin_codecs.proto @@ -58,6 +58,7 @@ message LLOChannelDefinitionProto { bytes opts = 3; bool tombstone = 4; uint32 source = 5; + bool disableNilStreamValues = 6; } message LLOStreamDefinition { diff --git a/llo/plugin_codecs_test.go b/llo/plugin_codecs_test.go index 1783f4d..456740b 100644 --- a/llo/plugin_codecs_test.go +++ b/llo/plugin_codecs_test.go @@ -60,11 +60,12 @@ func genStreamValuesMap() gopter.Gen { func genChannelDefinition() gopter.Gen { return gen.StrictStruct(reflect.TypeOf(llotypes.ChannelDefinition{}), map[string]gopter.Gen{ - "ReportFormat": genReportFormat(), - "Streams": gen.SliceOf(genStream()), - "Opts": gen.SliceOf(gen.UInt8()), - "Tombstone": gen.Bool(), - "Source": gen.UInt32(), + "ReportFormat": genReportFormat(), + "Streams": gen.SliceOf(genStream()), 
+ "Opts": gen.SliceOf(gen.UInt8()), + "Tombstone": gen.Bool(), + "Source": gen.UInt32(), + "DisableNilStreamValues": gen.Bool(), }) } @@ -123,6 +124,15 @@ func equalObservations(obs, obs2 Observation) bool { if !bytes.Equal(v.Opts, v2.Opts) { return false } + if v.DisableNilStreamValues != v2.DisableNilStreamValues { + return false + } + if v.Tombstone != v2.Tombstone { + return false + } + if v.Source != v2.Source { + return false + } } if len(obs.StreamValues) != len(obs2.StreamValues) { @@ -171,6 +181,18 @@ func equalOutcomes(t *testing.T, outcome, outcome2 Outcome) bool { t.Logf("Outcomes not equal; ChannelDefinitions: %v != %v", outcome.ChannelDefinitions, outcome2.ChannelDefinitions) return false } + if v.DisableNilStreamValues != v2.DisableNilStreamValues { + t.Logf("Outcomes not equal; ChannelDefinitions DisableNilStreamValues: %v != %v", outcome.ChannelDefinitions, outcome2.ChannelDefinitions) + return false + } + if v.Tombstone != v2.Tombstone { + t.Logf("Outcomes not equal; ChannelDefinitions Tombstone: %v != %v", outcome.ChannelDefinitions, outcome2.ChannelDefinitions) + return false + } + if v.Source != v2.Source { + t.Logf("Outcomes not equal; ChannelDefinitions Source: %v != %v", outcome.ChannelDefinitions, outcome2.ChannelDefinitions) + return false + } } if len(outcome.ValidAfterNanoseconds) != len(outcome2.ValidAfterNanoseconds) { diff --git a/llo/plugin_outcome.go b/llo/plugin_outcome.go index 95f5a09..69bb615 100644 --- a/llo/plugin_outcome.go +++ b/llo/plugin_outcome.go @@ -7,7 +7,6 @@ import ( "fmt" "sort" - "github.com/goccy/go-json" "github.com/smartcontractkit/libocr/offchainreporting2/types" "github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types" @@ -93,6 +92,13 @@ func (p *Plugin) outcome(outctx ocr3types.OutcomeContext, query types.Query, aos outcome.ChannelDefinitions = llotypes.ChannelDefinitions{} } + // Reset OptsCache if it has a different number of channels than the outcome.ChannelDefinitions + // This can happen if 
the node was restarted or when the protocol instance is promoted from staging to production. + if p.OptsCache.Len() != len(outcome.ChannelDefinitions) { + p.Logger.Warnw("OptsCache length mismatch with ChannelDefinitions length, resetting OptsCache", "optsCacheLen", p.OptsCache.Len(), "channelDefinitionsLen", len(outcome.ChannelDefinitions), "stage", "Outcome", "seqNr", outctx.SeqNr) + p.OptsCache.ResetTo(outcome.ChannelDefinitions) + } + // if retired, stop updating channel definitions if outcome.LifeCycleStage == LifeCycleStageRetired { removeChannelVotesByID, updateChannelDefinitionsByHash = nil, nil @@ -105,6 +111,7 @@ func (p *Plugin) outcome(outctx ocr3types.OutcomeContext, query types.Query, aos } removedChannelIDs = append(removedChannelIDs, channelID) delete(outcome.ChannelDefinitions, channelID) + p.OptsCache.Remove(channelID) } type hashWithID struct { @@ -151,6 +158,7 @@ func (p *Plugin) outcome(outctx ocr3types.OutcomeContext, query types.Query, aos ) } outcome.ChannelDefinitions[defWithID.ChannelID] = defWithID.ChannelDefinition + p.OptsCache.Set(defWithID.ChannelID, defWithID.Opts) } ///////////////////////////////// @@ -163,7 +171,7 @@ func (p *Plugin) outcome(outctx ocr3types.OutcomeContext, query types.Query, aos if outcome.ValidAfterNanoseconds == nil { outcome.ValidAfterNanoseconds = map[llotypes.ChannelID]uint64{} for channelID, previousValidAfterNanoseconds := range previousOutcome.ValidAfterNanoseconds { - if err3 := previousOutcome.IsReportable(channelID, p.ProtocolVersion, p.DefaultMinReportIntervalNanoseconds); err3 != nil { + if err3 := previousOutcome.IsReportable(channelID, p.ProtocolVersion, p.DefaultMinReportIntervalNanoseconds, p.OptsCache); err3 != nil { if p.Config.VerboseLogging { p.Logger.Debugw("Channel is not reportable", "channelID", channelID, "err", err3, "stage", "Outcome", "seqNr", outctx.SeqNr) } @@ -364,7 +372,7 @@ func (p *Plugin) decodeObservations(aos []types.AttributedObservation, outctx oc streamObservations[id] = 
append(streamObservations[id], sv) } if p.Config.VerboseLogging { - p.Logger.Debugw("Got observations from peer", "stage", "Outcome", "sv", streamObservations, "oracleID", ao.Observer, "seqNr", outctx.SeqNr) + p.Logger.Debugw("Got observations from peer", "stage", "Outcome", "oracleID", ao.Observer, "seqNr", outctx.SeqNr) } } @@ -400,17 +408,23 @@ func (out *Outcome) GenRetirementReport(protocolVersion uint32) RetirementReport // Channel becomes reportable when: // ObservationTimestampNanoseconds > ValidAfterNanoseconds(previous observation timestamp)+MinReportInterval -// Indicates whether a report can be generated for the given channel. +// timeResolutionOpts is used to read TimeResolution from cached opts in IsReportable. +type timeResolutionOpts struct { + TimeResolution TimeResolution `json:"TimeResolution"` +} + +// IsReportable checks if a report can be generated for the given channel. // Checks if channel is retired, tombstoned, has missing stream values (when -// disableNilStreamValues is set in channel opts), and if ValidAfterNanoseconds -// is set. Returns nil if channel is reportable. +// DisableNilStreamValues is true), and if ValidAfterNanoseconds is set. +// Returns nil if channel is reportable. +// Time-resolution for overlap checks uses IsSecondsResolution, which tries the cache when available and falls back to decoding opts (e.g. after node restart). // // Note: this is not a complete guarantee that a report will be successfully // generated. Reports can still be silently dropped at the encoding step for // other reasons (e.g. codec errors, bid/mid/ask validation failures). Those // failure modes are not covered here and can still result in report gaps if -// disableNilStreamValues is not set or if the failure is unrelated to nil values. 
-func (out *Outcome) IsReportable(channelID llotypes.ChannelID, protocolVersion uint32, minReportInterval uint64) *UnreportableChannelError { +// DisableNilStreamValues is false or if the report codec fails to encode the report. +func (out *Outcome) IsReportable(channelID llotypes.ChannelID, protocolVersion uint32, minReportInterval uint64, optsCache *OptsCache) *UnreportableChannelError { if out.LifeCycleStage == LifeCycleStageRetired { return &UnreportableChannelError{nil, "IsReportable=false; retired channel", channelID} } @@ -425,14 +439,11 @@ func (out *Outcome) IsReportable(channelID llotypes.ChannelID, protocolVersion u return &UnreportableChannelError{nil, "IsReportable=false; tombstone channel", channelID} } - if nilStreamValuesDisabled(cd.Opts) { - // check if all stream values are present - // note: post-deployment/rollout of the disableNilStreamValues channel opt config, - // we should ensure all channels have disableNilStreamValues: true as this fixes - // a bug related to report gaps. 
+ // If DisableNilStreamValues is true, check if all stream values are present + if cd.DisableNilStreamValues { for _, strm := range cd.Streams { if out.StreamAggregates[strm.StreamID][strm.Aggregator] == nil { - return &UnreportableChannelError{nil, fmt.Sprintf("IsReportable=false; missing stream value for streamID=%d aggregator=%q", strm.StreamID, strm.Aggregator), channelID} + return &UnreportableChannelError{nil, fmt.Sprintf("IsReportable=false; nil stream value for streamID=%d aggregator=%q", strm.StreamID, strm.Aggregator), channelID} } } } @@ -463,7 +474,7 @@ func (out *Outcome) IsReportable(channelID llotypes.ChannelID, protocolVersion u // This keeps compatibility with old nodes that may not have nanosecond resolution // // Also use seconds resolution for report formats that require it to prevent overlap - if protocolVersion == 0 || IsSecondsResolution(cd.ReportFormat, cd.Opts) { + if protocolVersion == 0 || IsSecondsResolution(ChannelDefinitionWithID{cd, channelID}, optsCache) { validAfterSeconds := validAfterNanos / 1e9 obsTsSeconds := obsTsNanos / 1e9 if validAfterSeconds >= obsTsSeconds { @@ -474,54 +485,34 @@ func (out *Outcome) IsReportable(channelID llotypes.ChannelID, protocolVersion u return nil } -// nilStreamValuesDisabled returns false (the default) when nil stream values -// are allowed for the channel, meaning channels will remain reportable even if -// some stream aggregate values are missing. Returns true only when the opts -// JSON explicitly sets "disableNilStreamValues" to true, which causes channels -// with missing stream values to be treated as unreportable. 
-func nilStreamValuesDisabled(opts []byte) bool { - if len(opts) == 0 { - return false - } - - // loose JSON unmarshal of just disableNilStreamValues field — no dependency on the codec package - var v struct { - DisableNilStreamValues bool `json:"disableNilStreamValues"` - } - if err := json.Unmarshal(opts, &v); err != nil { - return false // default to false if unmarshal fails (not set during initial rollout) - } - return v.DisableNilStreamValues -} - -func IsSecondsResolution(reportFormat llotypes.ReportFormat, opts llotypes.ChannelOpts) bool { - switch reportFormat { +// IsSecondsResolution returns whether the report format uses second-level resolution +// for the given opts. For ReportFormatEVMABIEncodeUnpacked, the cache must be populated +// (e.g. by Outcome's reset or Set during channel add/update) for correct resolution; +// no fallback to decoding opts is used — on cache miss it returns false. +func IsSecondsResolution(cd ChannelDefinitionWithID, optsCache *OptsCache) bool { + switch cd.ReportFormat { // TODO: Might be cleaner to expose a TimeResolution() uint64 field on the // ReportCodec so that the plugin doesn't have to have special knowledge of // the report format details case llotypes.ReportFormatEVMPremiumLegacy: return true case llotypes.ReportFormatEVMABIEncodeUnpacked: - var parsed struct { - TimeResolution TimeResolution `json:"TimeResolution"` - } - if err := json.Unmarshal(opts, &parsed); err != nil { - // If we can't parse opts, default to seconds - return true + if o, err := GetOpts[timeResolutionOpts](optsCache, cd.ChannelID); err == nil { + return o.TimeResolution == ResolutionSeconds } - return parsed.TimeResolution == ResolutionSeconds + return false default: return false } } // List of reportable channels (according to IsReportable), sorted according -// to a canonical ordering -func (out *Outcome) ReportableChannels(protocolVersion uint32, defaultMinReportInterval uint64) (reportable []llotypes.ChannelID, unreportable 
[]*UnreportableChannelError) { +// to a canonical ordering. +func (out *Outcome) ReportableChannels(protocolVersion uint32, defaultMinReportInterval uint64, optsCache *OptsCache) (reportable []llotypes.ChannelID, unreportable []*UnreportableChannelError) { for channelID := range out.ChannelDefinitions { // In theory in future, minReportInterval could be overridden on a // per-channel basis in the ChannelDefinitions - if err := out.IsReportable(channelID, protocolVersion, defaultMinReportInterval); err != nil { + if err := out.IsReportable(channelID, protocolVersion, defaultMinReportInterval, optsCache); err != nil { unreportable = append(unreportable, err) } else { reportable = append(reportable, channelID) diff --git a/llo/plugin_outcome_test.go b/llo/plugin_outcome_test.go index 8912e0a..4b77e5f 100644 --- a/llo/plugin_outcome_test.go +++ b/llo/plugin_outcome_test.go @@ -46,6 +46,7 @@ func Test_Outcome_GoldenFiles(t *testing.T) { ConfigDigest: types.ConfigDigest{1, 2, 3, 4}, ProtocolVersion: 1, DefaultMinReportIntervalNanoseconds: 1, + OptsCache: NewOptsCache(), } // Minimal observations (timestamp only) so the plugin advances from previous outcome without new channel defs or stream values. obs, err := p.ObservationCodec.Encode(Observation{UnixTimestampNanoseconds: 9876543210 + uint64(time.Second)}) @@ -99,6 +100,7 @@ func Test_Outcome_EncodedMatchesGolden(t *testing.T) { ConfigDigest: types.ConfigDigest{1, 2, 3, 4}, ProtocolVersion: 1, DefaultMinReportIntervalNanoseconds: 1, + OptsCache: NewOptsCache(), } // Golden cases that the plugin produces; "full" is only used as previous outcome, not produced here. 
@@ -166,6 +168,7 @@ func testOutcome(t *testing.T, outcomeCodec OutcomeCodec) { DonID: 10000043, ConfigDigest: types.ConfigDigest{1, 2, 3, 4}, F: 1, + OptsCache: NewOptsCache(), } testStartTS := time.Now() testStartNanos := uint64(testStartTS.UnixNano()) //nolint:gosec // safe cast in tests @@ -305,8 +308,8 @@ func testOutcome(t *testing.T, outcomeCodec OutcomeCodec) { } // Verify channel is reportable before tombstoning - require.Nil(t, previousOutcome.IsReportable(channelID, 1, uint64(100*time.Millisecond))) - reportable, _ := previousOutcome.ReportableChannels(1, uint64(100*time.Millisecond)) + require.Nil(t, previousOutcome.IsReportable(channelID, 1, uint64(100*time.Millisecond), nil)) + reportable, _ := previousOutcome.ReportableChannels(1, uint64(100*time.Millisecond), nil) assert.Contains(t, reportable, channelID) // Encode previous outcome @@ -346,12 +349,12 @@ func testOutcome(t *testing.T, outcomeCodec OutcomeCodec) { assert.Equal(t, tombstonedCd, decoded.ChannelDefinitions[channelID]) // Verify channel is no longer reportable - err = decoded.IsReportable(channelID, 1, uint64(100*time.Millisecond)) + err = decoded.IsReportable(channelID, 1, uint64(100*time.Millisecond), nil) require.NotNil(t, err) assert.Contains(t, err.Error(), "tombstone channel") // Verify ReportableChannels excludes the tombstoned channel - reportable, unreportable := decoded.ReportableChannels(1, uint64(100*time.Millisecond)) + reportable, unreportable := decoded.ReportableChannels(1, uint64(100*time.Millisecond), nil) assert.NotContains(t, reportable, channelID, "Tombstoned channel should not be in reportable list") require.Len(t, unreportable, 1) assert.Equal(t, channelID, unreportable[0].ChannelID) @@ -391,6 +394,57 @@ func testOutcome(t *testing.T, outcomeCodec OutcomeCodec) { assert.NotContains(t, decoded.ChannelDefinitions, llotypes.ChannelID(MaxOutcomeChannelDefinitionsLength)) assert.NotContains(t, decoded.ChannelDefinitions, 
llotypes.ChannelID(MaxOutcomeChannelDefinitionsLength+1)) }) + + t.Run("resets OptsCache when length differs from ChannelDefinitions (e.g. after restart)", func(t *testing.T) { + optsCache := NewOptsCache() + optsCache.Set(1, []byte(`{}`)) + optsCache.Set(2, []byte(`{}`)) + require.Equal(t, 2, optsCache.Len(), "cache should have 2 channels before Outcome") + + pluginWithCache := &Plugin{ + Config: Config{true}, + OutcomeCodec: outcomeCodec, + Logger: logger.Test(t), + ObservationCodec: obsCodec, + DonID: 10000043, + ConfigDigest: types.ConfigDigest{1, 2, 3, 4}, + F: 1, + OptsCache: optsCache, + } + + previousOutcome := Outcome{ + LifeCycleStage: LifeCycleStageProduction, + ObservationTimestampNanoseconds: testStartNanos, + ChannelDefinitions: llotypes.ChannelDefinitions{ + 42: { + ReportFormat: llotypes.ReportFormatJSON, + Streams: []llotypes.Stream{{StreamID: 1, Aggregator: llotypes.AggregatorMedian}}, + Opts: []byte(`{"feedID":"ch42"}`), + }, + }, + } + encodedPrevious, err := pluginWithCache.OutcomeCodec.Encode(previousOutcome) + require.NoError(t, err) + + obs, err := pluginWithCache.ObservationCodec.Encode(Observation{}) + require.NoError(t, err) + aos := make([]types.AttributedObservation, 3) + for i := range aos { + aos[i] = types.AttributedObservation{Observation: obs, Observer: commontypes.OracleID(i)} + } + + _, err = pluginWithCache.Outcome(ctx, ocr3types.OutcomeContext{ + PreviousOutcome: encodedPrevious, + SeqNr: 2, + }, types.Query{}, aos) + require.NoError(t, err) + + assert.Equal(t, 1, pluginWithCache.OptsCache.Len(), "OptsCache should be reset to match ChannelDefinitions length") + _, err = GetOpts[struct{ FeedID string `json:"feedID"` }](pluginWithCache.OptsCache, 42) + require.NoError(t, err, "channel 42 should be in cache after reset") + _, err = GetOpts[struct{ FeedID string `json:"feedID"` }](pluginWithCache.OptsCache, 1) + require.Error(t, err, "channel 1 should no longer be in cache after reset") + }) }) t.Run("stream observations", func(t 
*testing.T) { @@ -681,22 +735,22 @@ func testOutcome(t *testing.T, outcomeCodec OutcomeCodec) { }) t.Run("ValidAfterNanoseconds update behaviour when previous outcome has missing stream values", func(t *testing.T) { // channel 1 always has all stream values; channel 2 is missing stream 3 in the previous outcome. - // The test verifies the gap-prevention behaviour of disableNilStreamValues for channel 2. - tests := []struct { - name string - channel2Opts []byte - wantValidAfter2 uint64 - }{ - { - name: "default (disableNilStreamValues absent): ValidAfterNanoseconds still advances despite missing stream values; backwards compat for existing channels", - channel2Opts: nil, - wantValidAfter2: uint64(101 * time.Second), // validAfterNanoseconds still updated; gap behaviour preserved for backwards compat during rollout - }, - { - name: "disableNilStreamValues=true: ValidAfterNanoseconds does not advance when previous outcome has missing stream values", - channel2Opts: []byte(`{"disableNilStreamValues":true}`), - wantValidAfter2: uint64(100 * time.Second), // validAfterNanoseconds not updated; report gap prevented - }, + // The test verifies the gap-prevention behaviour of DisableNilStreamValues for channel 2. 
+ tests := []struct { + name string + disableNilStreamValues bool + wantValidAfter2 uint64 + }{ + { + name: "DisableNilStreamValues=false: ValidAfterNanoseconds still advances despite missing stream values", + disableNilStreamValues: false, + wantValidAfter2: uint64(101 * time.Second), // validAfterNanoseconds still updated; gap behaviour preserved + }, + { + name: "default (DisableNilStreamValues true): ValidAfterNanoseconds does not advance when previous outcome has missing stream values", + disableNilStreamValues: true, + wantValidAfter2: uint64(100 * time.Second), // validAfterNanoseconds not updated; report gap prevented + }, } for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { @@ -706,9 +760,9 @@ func testOutcome(t *testing.T, outcomeCodec OutcomeCodec) { Streams: []llotypes.Stream{{StreamID: 1, Aggregator: llotypes.AggregatorMedian}, {StreamID: 2, Aggregator: llotypes.AggregatorMedian}}, }, 2: { // requires streams 2 and 3; stream 3 is missing in the previous outcome - ReportFormat: llotypes.ReportFormatEVMPremiumLegacy, - Streams: []llotypes.Stream{{StreamID: 2, Aggregator: llotypes.AggregatorMedian}, {StreamID: 3, Aggregator: llotypes.AggregatorQuote}}, - Opts: tc.channel2Opts, + ReportFormat: llotypes.ReportFormatEVMPremiumLegacy, + Streams: []llotypes.Stream{{StreamID: 2, Aggregator: llotypes.AggregatorMedian}, {StreamID: 3, Aggregator: llotypes.AggregatorQuote}}, + DisableNilStreamValues: tc.disableNilStreamValues, }, } // previous outcome: channel 1 would have reported; channel 2 would not @@ -756,7 +810,7 @@ func testOutcome(t *testing.T, outcomeCodec OutcomeCodec) { // channel 1 always advances (had all stream values in the previous outcome) assert.Equal(t, uint64(101*time.Second), decoded.ValidAfterNanoseconds[1]) - // channel 2 depends on disableNilStreamValues + // channel 2 depends on DisableNilStreamValues assert.Equal(t, tc.wantValidAfter2, decoded.ValidAfterNanoseconds[2]) }) } @@ -996,31 +1050,31 @@ func Test_Outcome_Methods(t 
*testing.T) { // Not reportable if retired outcome.LifeCycleStage = LifeCycleStageRetired - require.EqualError(t, outcome.IsReportable(cid, 0, 0), "ChannelID: 1; Reason: IsReportable=false; retired channel") + require.EqualError(t, outcome.IsReportable(cid, 0, 0, nil), "ChannelID: 1; Reason: IsReportable=false; retired channel") // No channel definition with ID outcome.LifeCycleStage = LifeCycleStageProduction outcome.ObservationTimestampNanoseconds = uint64(time.Unix(1726670490, 0).UnixNano()) //nolint:gosec // time won't be negative outcome.ChannelDefinitions = map[llotypes.ChannelID]llotypes.ChannelDefinition{} - require.EqualError(t, outcome.IsReportable(cid, 0, 0), "ChannelID: 1; Reason: IsReportable=false; no channel definition with this ID") + require.EqualError(t, outcome.IsReportable(cid, 0, 0, nil), "ChannelID: 1; Reason: IsReportable=false; no channel definition with this ID") // No ValidAfterNanoseconds yet outcome.ChannelDefinitions = map[llotypes.ChannelID]llotypes.ChannelDefinition{ cid: {}, } - require.EqualError(t, outcome.IsReportable(cid, 0, 0), "ChannelID: 1; Reason: IsReportable=false; no ValidAfterNanoseconds entry yet, this must be a new channel") + require.EqualError(t, outcome.IsReportable(cid, 0, 0, nil), "ChannelID: 1; Reason: IsReportable=false; no ValidAfterNanoseconds entry yet, this must be a new channel") // ValidAfterNanoseconds is in the future outcome.ValidAfterNanoseconds = map[llotypes.ChannelID]uint64{cid: uint64(1726670491 * time.Second)} - require.EqualError(t, outcome.IsReportable(cid, 0, 0), "ChannelID: 1; Reason: ChannelID: 1; Reason: IsReportable=false; not valid yet (observationsTimestampSeconds=1726670490, validAfterSeconds=1726670491)") + require.EqualError(t, outcome.IsReportable(cid, 0, 0, nil), "ChannelID: 1; Reason: ChannelID: 1; Reason: IsReportable=false; not valid yet (observationsTimestampSeconds=1726670490, validAfterSeconds=1726670491)") // ValidAfterSeconds=ObservationTimestampSeconds; IsReportable=false 
outcome.ValidAfterNanoseconds = map[llotypes.ChannelID]uint64{cid: uint64(1726670490 * time.Second)} - require.EqualError(t, outcome.IsReportable(cid, 0, 0), "ChannelID: 1; Reason: ChannelID: 1; Reason: IsReportable=false; not valid yet (observationsTimestampSeconds=1726670490, validAfterSeconds=1726670490)") + require.EqualError(t, outcome.IsReportable(cid, 0, 0, nil), "ChannelID: 1; Reason: ChannelID: 1; Reason: IsReportable=false; not valid yet (observationsTimestampSeconds=1726670490, validAfterSeconds=1726670490)") // ValidAfterSeconds= 1s, does report outcome.ValidAfterNanoseconds[cid] = obsTSNanos - uint64(1*time.Second) - assert.Nil(t, outcome.IsReportable(cid, 1, uint64(100*time.Millisecond))) + assert.Nil(t, outcome.IsReportable(cid, 1, uint64(100*time.Millisecond), nil)) // if cadence is exactly 1s, if time is >= 1s, does report - assert.Nil(t, outcome.IsReportable(cid, 1, uint64(1*time.Second))) + assert.Nil(t, outcome.IsReportable(cid, 1, uint64(1*time.Second), nil)) // if cadence is 5s, if time is < 5s, does not report because cadence hasn't elapsed - require.EqualError(t, outcome.IsReportable(cid, 1, uint64(5*time.Second)), "ChannelID: 1; Reason: IsReportable=false; not valid yet (ObservationTimestampNanoseconds=1726670490999999999, validAfterNanoseconds=1726670489999999999, minReportInterval=5000000000); 4.000000 seconds (4000000000ns) until reportable") + require.EqualError(t, outcome.IsReportable(cid, 1, uint64(5*time.Second), nil), "ChannelID: 1; Reason: IsReportable=false; not valid yet (ObservationTimestampNanoseconds=1726670490999999999, validAfterNanoseconds=1726670489999999999, minReportInterval=5000000000); 4.000000 seconds (4000000000ns) until reportable") }) t.Run("ReportableChannels", func(t *testing.T) { defaultMinReportInterval := uint64(1 * time.Second) @@ -1201,31 +1271,10 @@ func Test_Outcome_Methods(t *testing.T) { 3: uint64(1726670489 * time.Second), }, } - reportable, unreportable := outcome.ReportableChannels(1, 
defaultMinReportInterval) + reportable, unreportable := outcome.ReportableChannels(1, defaultMinReportInterval, nil) assert.Equal(t, []llotypes.ChannelID{1, 3}, reportable) require.Len(t, unreportable, 1) assert.Equal(t, "ChannelID: 2; Reason: IsReportable=false; no ValidAfterNanoseconds entry yet, this must be a new channel", unreportable[0].Error()) }) }) } - -func Test_nilStreamValuesDisabled(t *testing.T) { - t.Run("nil opts returns false (default)", func(t *testing.T) { - assert.False(t, nilStreamValuesDisabled(nil)) - }) - t.Run("empty opts returns false (default)", func(t *testing.T) { - assert.False(t, nilStreamValuesDisabled([]byte{})) - }) - t.Run("opts without the field returns false (default)", func(t *testing.T) { - assert.False(t, nilStreamValuesDisabled([]byte(`{"someOtherField":true}`))) - }) - t.Run("invalid JSON returns false (default)", func(t *testing.T) { - assert.False(t, nilStreamValuesDisabled([]byte(`not json`))) - }) - t.Run("disableNilStreamValues=false returns false", func(t *testing.T) { - assert.False(t, nilStreamValuesDisabled([]byte(`{"disableNilStreamValues":false}`))) - }) - t.Run("disableNilStreamValues=true returns true", func(t *testing.T) { - assert.True(t, nilStreamValuesDisabled([]byte(`{"disableNilStreamValues":true}`))) - }) -} diff --git a/llo/plugin_reports.go b/llo/plugin_reports.go index d77ad2f..30da8d4 100644 --- a/llo/plugin_reports.go +++ b/llo/plugin_reports.go @@ -46,7 +46,7 @@ func (p *Plugin) reports(ctx context.Context, seqNr uint64, rawOutcome ocr3types }) } - reportableChannels, unreportableChannels := outcome.ReportableChannels(p.ProtocolVersion, p.DefaultMinReportIntervalNanoseconds) + reportableChannels, unreportableChannels := outcome.ReportableChannels(p.ProtocolVersion, p.DefaultMinReportIntervalNanoseconds, p.OptsCache) if p.Config.VerboseLogging { p.Logger.Debugw("Reportable channels", "lifeCycleStage", outcome.LifeCycleStage, "reportableChannels", reportableChannels, "unreportableChannels", 
unreportableChannels, "stage", "Report", "seqNr", seqNr) } @@ -101,7 +101,7 @@ func (p *Plugin) encodeReport(r Report, cd llotypes.ChannelDefinition) (types.Re return nil, fmt.Errorf("codec missing for ReportFormat=%q", cd.ReportFormat) } p.captureReportTelemetry(r, cd) - return codec.Encode(r, cd) + return codec.Encode(r, cd, p.OptsCache) } func (p *Plugin) captureReportTelemetry(r Report, cd llotypes.ChannelDefinition) { diff --git a/llo/plugin_reports_test.go b/llo/plugin_reports_test.go index 99b702b..2672e8e 100644 --- a/llo/plugin_reports_test.go +++ b/llo/plugin_reports_test.go @@ -44,6 +44,7 @@ func testReports(t *testing.T, outcomeCodec OutcomeCodec) { RetirementReportCodec: StandardRetirementReportCodec{}, DefaultMinReportIntervalNanoseconds: uint64(minReportInterval), //nolint:gosec // time won't be negative ProtocolVersion: protocolVersion, + OptsCache: NewOptsCache(), } t.Run("ignores seqnr=0", func(t *testing.T) { @@ -266,26 +267,25 @@ func testReports(t *testing.T, outcomeCodec OutcomeCodec) { // previous ObservationTimestampNanoseconds and we produce reports for ranges [100s, 200s] and [200s, 400s]. }) - t.Run("channels with nil stream values that pass IsReportable are still dropped at encodeReport", func(t *testing.T) { - // This test shows that the two code paths produce the same "no report emitted" end result: - // 1. disableNilStreamValues=false (legacy): channel passes IsReportable, encodeReport fails on nil → no report - // 2. disableNilStreamValues=true (fix): channel fails IsReportable early → no report - // The critical difference is ValidAfterNanoseconds: path 1 still advances it (report gap risk); - // path 2 does not. This test exercises path 1 to confirm encodeReport is the fallback gate. 
+ t.Run("channels with nil stream values that pass IsReportable do not produce reports if DisableNilStreamValues is false", func(t *testing.T) { + // This test shows that when DisableNilStreamValues=false, the channel passes IsReportable + // but encodeReport fails on nil → no report. The critical difference vs DisableNilStreamValues=true + // is that ValidAfterNanoseconds still advances (report gap risk). + // This test exercises the encodeReport fallback gate. outcome := Outcome{ ObservationTimestampNanoseconds: uint64(200 * time.Second), ValidAfterNanoseconds: map[llotypes.ChannelID]uint64{ 1: uint64(100 * time.Second), }, ChannelDefinitions: map[llotypes.ChannelID]llotypes.ChannelDefinition{ - 1: { - ReportFormat: llotypes.ReportFormatJSON, - Streams: []llotypes.Stream{ - {StreamID: 1, Aggregator: llotypes.AggregatorMedian}, - {StreamID: 2, Aggregator: llotypes.AggregatorMedian}, - }, - // disableNilStreamValues not set: channel passes IsReportable despite nil stream 2 + 1: { + ReportFormat: llotypes.ReportFormatJSON, + Streams: []llotypes.Stream{ + {StreamID: 1, Aggregator: llotypes.AggregatorMedian}, + {StreamID: 2, Aggregator: llotypes.AggregatorMedian}, }, + DisableNilStreamValues: false, // channel passes IsReportable despite nil stream 2 + }, }, StreamAggregates: map[llotypes.StreamID]map[llotypes.Aggregator]StreamValue{ 1: {llotypes.AggregatorMedian: ToDecimal(decimal.NewFromFloat(1.1))}, @@ -293,8 +293,8 @@ func testReports(t *testing.T, outcomeCodec OutcomeCodec) { }, } - // Confirm channel 1 IS reportable (timing check passes, no disableNilStreamValues gate) - require.Nil(t, outcome.IsReportable(1, protocolVersion, uint64(minReportInterval))) + // Confirm channel 1 IS reportable (timing check passes, DisableNilStreamValues=false) + require.Nil(t, outcome.IsReportable(1, protocolVersion, uint64(minReportInterval), nil)) encoded, err := p.OutcomeCodec.Encode(outcome) require.NoError(t, err) @@ -339,12 +339,12 @@ func testReports(t *testing.T, 
outcomeCodec OutcomeCodec) { } // Verify tombstoned channel is not reportable - unreportableErr := outcome.IsReportable(1, protocolVersion, uint64(minReportInterval)) + unreportableErr := outcome.IsReportable(1, protocolVersion, uint64(minReportInterval), nil) require.NotNil(t, unreportableErr) assert.Contains(t, unreportableErr.Error(), "tombstone channel") // Verify non-tombstoned channel is reportable - require.Nil(t, outcome.IsReportable(2, protocolVersion, uint64(minReportInterval))) + require.Nil(t, outcome.IsReportable(2, protocolVersion, uint64(minReportInterval), nil)) encoded, err := p.OutcomeCodec.Encode(outcome) require.NoError(t, err) diff --git a/llo/reportcodecs/evm/report_codec_common.go b/llo/reportcodecs/evm/report_codec_common.go index b915a5e..fc8cb7f 100644 --- a/llo/reportcodecs/evm/report_codec_common.go +++ b/llo/reportcodecs/evm/report_codec_common.go @@ -11,6 +11,7 @@ import ( "github.com/shopspring/decimal" + "github.com/smartcontractkit/chainlink-common/pkg/logger" llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo" "github.com/smartcontractkit/chainlink-data-streams/llo" ubig "github.com/smartcontractkit/chainlink-data-streams/llo/reportcodecs/evm/utils" @@ -302,3 +303,23 @@ func EncodePackedBigInt(value *big.Int, typeStr string) ([]byte, error) { modValue.FillBytes(result) return result, nil } + +// ClampReportRange clamps the report range to the max report range if it exceeds the max range. +// Returns the clamped valid after nanoseconds. +// If the report range is within the max range, returns the valid after nanoseconds unchanged. +// If the max report range is not specified, uses the default max report range. 
+func ClampReportRange(r logger.Logger, report llo.Report, maxReportRange llo.Duration) uint64 { + if maxReportRange == 0 { + maxReportRange = llo.DefaultMaxReportRange + } + + if report.ObservationTimestampNanoseconds-report.ValidAfterNanoseconds > uint64(maxReportRange) { + r.Warnw("Report range exceeds max report range", + "channelID", report.ChannelID, "seqNr", report.SeqNr, + "maxReportRange", maxReportRange, "clamping to max range", maxReportRange.String()) + + return report.ObservationTimestampNanoseconds - uint64(maxReportRange) + } + + return report.ValidAfterNanoseconds +} diff --git a/llo/reportcodecs/evm/report_codec_evm_abi_encode_unpacked.go b/llo/reportcodecs/evm/report_codec_evm_abi_encode_unpacked.go index c6b6438..0acc78a 100644 --- a/llo/reportcodecs/evm/report_codec_evm_abi_encode_unpacked.go +++ b/llo/reportcodecs/evm/report_codec_evm_abi_encode_unpacked.go @@ -60,16 +60,10 @@ type ReportFormatEVMABIEncodeOpts struct { // Seconds use uint32 ABI encoding, while milliseconds/microseconds/nanoseconds use uint64. // Defaults to "s" (seconds) if not specified. TimeResolution llo.TimeResolution `json:"TimeResolution,omitempty"` - // DisableNilStreamValues controls whether channels with nil stream values - // are reportable. When false (default), nil stream values are allowed and - // channels are reportable. Set to true to make channels with missing - // stream values unreportable. - // - // This field is also read by the outcome plugin via independent JSON - // parsing (see nilStreamValuesDisabled in plugin_outcome.go). It is - // declared here so that Decode's DisallowUnknownFields does not reject - // channel opts that include it; the codec itself does not act on it. - DisableNilStreamValues bool `json:"disableNilStreamValues,omitempty"` + // MaxReportRange is the maximum range of the report. + // The range will be limited to ObservationTimestamp + MaxReportRange if the report is longer than the max range. 
+ // Defaults to 5 minutes if not specified. + MaxReportRange llo.Duration `json:"maxReportRange,omitempty"` } func (r *ReportFormatEVMABIEncodeOpts) Decode(opts []byte) error { @@ -91,7 +85,7 @@ type BaseReportFields struct { ExpiresAt uint64 } -func (r ReportCodecEVMABIEncodeUnpacked) Encode(report llo.Report, cd llotypes.ChannelDefinition) ([]byte, error) { +func (r ReportCodecEVMABIEncodeUnpacked) Encode(report llo.Report, cd llotypes.ChannelDefinition, optsCache *llo.OptsCache) ([]byte, error) { if report.Specimen { return nil, errors.New("ReportCodecEVMABIEncodeUnpacked does not support encoding specimen reports") } @@ -107,14 +101,12 @@ func (r ReportCodecEVMABIEncodeUnpacked) Encode(report llo.Report, cd llotypes.C return nil, fmt.Errorf("ReportCodecEVMABIEncodeUnpacked failed to extract link price: %w", err) } - // NOTE: It seems suboptimal to have to parse the opts on every encode but - // not sure how to avoid it. Should be negligible performance hit as long - // as Opts is small. 
- opts := ReportFormatEVMABIEncodeOpts{} - if err = (&opts).Decode(cd.Opts); err != nil { - return nil, fmt.Errorf("failed to decode opts; got: '%s'; %w", cd.Opts, err) + opts, getErr := llo.GetOpts[ReportFormatEVMABIEncodeOpts](optsCache, report.ChannelID) + if getErr != nil { + return nil, fmt.Errorf("opts not in cache for channel %d: %w", report.ChannelID, getErr) } + report.ValidAfterNanoseconds = ClampReportRange(r, report, opts.MaxReportRange) validAfter := llo.ConvertTimestamp(report.ValidAfterNanoseconds, opts.TimeResolution) observationTimestamp := llo.ConvertTimestamp(report.ObservationTimestampNanoseconds, opts.TimeResolution) expiresAt := observationTimestamp + llo.ScaleSeconds(opts.ExpirationWindow, opts.TimeResolution) diff --git a/llo/reportcodecs/evm/report_codec_evm_abi_encode_unpacked_expr.go b/llo/reportcodecs/evm/report_codec_evm_abi_encode_unpacked_expr.go index eaee75b..7237173 100644 --- a/llo/reportcodecs/evm/report_codec_evm_abi_encode_unpacked_expr.go +++ b/llo/reportcodecs/evm/report_codec_evm_abi_encode_unpacked_expr.go @@ -25,7 +25,7 @@ func NewReportCodecEVMABIEncodeUnpackedExpr(lggr logger.Logger, donID uint32) Re return ReportCodecEVMABIEncodeUnpackedExpr{logger.Sugared(lggr).Named("ReportCodecEVMABIEncodeUnpackedExpr"), donID} } -func (r ReportCodecEVMABIEncodeUnpackedExpr) Encode(report llo.Report, cd llotypes.ChannelDefinition) ([]byte, error) { +func (r ReportCodecEVMABIEncodeUnpackedExpr) Encode(report llo.Report, cd llotypes.ChannelDefinition, optsCache *llo.OptsCache) ([]byte, error) { if report.Specimen { return nil, errors.New("ReportCodecEVMABIEncodeUnpackedExpr does not support encoding specimen reports") } @@ -41,12 +41,9 @@ func (r ReportCodecEVMABIEncodeUnpackedExpr) Encode(report llo.Report, cd llotyp return nil, fmt.Errorf("ReportCodecEVMABIEncodeUnpackedExpr failed to extract link price: %w", err) } - // NOTE: It seems suboptimal to have to parse the opts on every encode but - // not sure how to avoid it. 
Should be negligible performance hit as long - // as Opts is small. - opts := ReportFormatEVMABIEncodeOpts{} - if err = (&opts).Decode(cd.Opts); err != nil { - return nil, fmt.Errorf("ReportCodecEVMABIEncodeUnpackedExpr failed to decode opts; got: '%s'; %w", cd.Opts, err) + opts, getErr := llo.GetOpts[ReportFormatEVMABIEncodeOpts](optsCache, report.ChannelID) + if getErr != nil { + return nil, fmt.Errorf("opts not in cache for channel %d: %w", report.ChannelID, getErr) } if len(opts.ABI) < 1 { @@ -58,6 +55,7 @@ func (r ReportCodecEVMABIEncodeUnpackedExpr) Encode(report llo.Report, cd llotyp return nil, fmt.Errorf("ReportCodecEVMABIEncodeUnpackedExpr not enough streams for calculated streams; expected: %d, got: %d", opts.ABI[len(opts.ABI)-1].encoders[0].ExpressionStreamID, len(cd.Streams)) } + report.ValidAfterNanoseconds = ClampReportRange(r, report, opts.MaxReportRange) validAfter := llo.ConvertTimestamp(report.ValidAfterNanoseconds, opts.TimeResolution) observationTimestamp := llo.ConvertTimestamp(report.ObservationTimestampNanoseconds, opts.TimeResolution) expiresAt := observationTimestamp + llo.ScaleSeconds(opts.ExpirationWindow, opts.TimeResolution) diff --git a/llo/reportcodecs/evm/report_codec_evm_abi_encode_unpacked_expr_test.go b/llo/reportcodecs/evm/report_codec_evm_abi_encode_unpacked_expr_test.go index cd892cb..f0720e7 100644 --- a/llo/reportcodecs/evm/report_codec_evm_abi_encode_unpacked_expr_test.go +++ b/llo/reportcodecs/evm/report_codec_evm_abi_encode_unpacked_expr_test.go @@ -18,6 +18,7 @@ import ( ubig "github.com/smartcontractkit/chainlink-data-streams/llo/reportcodecs/evm/utils" + "github.com/smartcontractkit/chainlink-common/pkg/logger" llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo" "github.com/smartcontractkit/chainlink-data-streams/llo" ) @@ -71,8 +72,10 @@ func TestReportCodecEVMABIEncodeUnpackedExpr_Encode(t *testing.T) { Opts: serializedOpts, } + cache := llo.NewOptsCache() + cache.Set(report.ChannelID, cd.Opts) 
codec := ReportCodecEVMABIEncodeUnpackedExpr{} - _, err = codec.Encode(report, cd) + _, err = codec.Encode(report, cd, cache) require.Error(t, err) assert.Contains(t, err.Error(), "ReportCodecEVMABIEncodeUnpackedExpr no expressions found in channel definition") }) @@ -148,8 +151,12 @@ func TestReportCodecEVMABIEncodeUnpackedExpr_Encode(t *testing.T) { Opts: serializedOpts, } - codec := ReportCodecEVMABIEncodeUnpackedExpr{} - encoded, err := codec.Encode(report, cd) + codec := ReportCodecEVMABIEncodeUnpackedExpr{Logger: logger.Nop()} + clampedValidAfterNanos := ClampReportRange(logger.Nop(), report, 0) + + cache := llo.NewOptsCache() + cache.Set(report.ChannelID, cd.Opts) + encoded, err := codec.Encode(report, cd, cache) require.NoError(t, err) values, err := expectedDEXBasedAssetSchema.Unpack(encoded) @@ -164,12 +171,12 @@ func TestReportCodecEVMABIEncodeUnpackedExpr_Encode(t *testing.T) { for i := range report.Values { report.Values[i] = nil } - _, err = codec.Encode(report, cd) + _, err = codec.Encode(report, cd, cache) require.Error(t, err) return AllTrue([]bool{ assert.Equal(t, sampleFeedID, (common.Hash)(values[0].([32]byte))), //nolint:testifylint // false positive // feedId - assert.Equal(t, uint32(sampleValidAfterNanoseconds/1e9)+1, values[1].(uint32)), //nolint:gosec // G115 // validFromTimestamp + assert.Equal(t, uint32(clampedValidAfterNanos/1e9)+1, values[1].(uint32)), //nolint:gosec // G115 // validFromTimestamp assert.Equal(t, uint32(sampleObservationTimestampNanoseconds/1e9), values[2].(uint32)), //nolint:gosec // G115 // observationsTimestamp assert.Equal(t, expectedLinkFee.String(), values[3].(*big.Int).String()), // nativeFee (Values[0] is link benchmark in test data) assert.Equal(t, expectedNativeFee.String(), values[4].(*big.Int).String()), // linkFee (Values[1] is native benchmark in test data) @@ -271,8 +278,12 @@ func TestReportCodecEVMABIEncodeUnpackedExpr_Encode(t *testing.T) { Opts: serializedOpts, } - codec := 
ReportCodecEVMABIEncodeUnpackedExpr{} - encoded, err := codec.Encode(report, cd) + codec := ReportCodecEVMABIEncodeUnpackedExpr{Logger: logger.Nop()} + clampedValidAfterNanos := ClampReportRange(logger.Nop(), report, 0) + + cache := llo.NewOptsCache() + cache.Set(report.ChannelID, cd.Opts) + encoded, err := codec.Encode(report, cd, cache) require.NoError(t, err) values, err := schema.Unpack(encoded) @@ -292,7 +303,7 @@ func TestReportCodecEVMABIEncodeUnpackedExpr_Encode(t *testing.T) { } // Verify timestamps per resolution type - expectedValidFrom := llo.ConvertTimestamp(sampleValidAfterNanoseconds, sampleTimeResolution) + 1 + expectedValidFrom := llo.ConvertTimestamp(clampedValidAfterNanos, sampleTimeResolution) + 1 expectedObservationTimestamp := llo.ConvertTimestamp(sampleObservationTimestampNanoseconds, sampleTimeResolution) expectedExpiresAt := expectedObservationTimestamp + llo.ScaleSeconds(sampleExpirationWindow, sampleTimeResolution) if timestampType == "uint32" { diff --git a/llo/reportcodecs/evm/report_codec_evm_abi_encode_unpacked_test.go b/llo/reportcodecs/evm/report_codec_evm_abi_encode_unpacked_test.go index a5ecc64..1fb9c59 100644 --- a/llo/reportcodecs/evm/report_codec_evm_abi_encode_unpacked_test.go +++ b/llo/reportcodecs/evm/report_codec_evm_abi_encode_unpacked_test.go @@ -7,6 +7,7 @@ import ( "reflect" "strings" "testing" + "time" "github.com/ethereum/go-ethereum/accounts/abi" "github.com/ethereum/go-ethereum/common" @@ -21,6 +22,7 @@ import ( ubig "github.com/smartcontractkit/chainlink-data-streams/llo/reportcodecs/evm/utils" + "github.com/smartcontractkit/chainlink-common/pkg/logger" llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo" "github.com/smartcontractkit/chainlink-data-streams/llo" ) @@ -46,7 +48,7 @@ func TestReportFormatEVMABIEncodeOpts_Decode_Encode_properties(t *testing.T) { err = decoded.Decode(encoded) require.NoError(t, err) - return decoded.BaseUSDFee.Equal(opts.BaseUSDFee) && decoded.ExpirationWindow == 
opts.ExpirationWindow && decoded.FeedID == opts.FeedID && assert.Equal(t, opts.ABI, decoded.ABI) && decoded.DisableNilStreamValues == opts.DisableNilStreamValues + return decoded.BaseUSDFee.Equal(opts.BaseUSDFee) && decoded.ExpirationWindow == opts.ExpirationWindow && decoded.FeedID == opts.FeedID && assert.Equal(t, opts.ABI, decoded.ABI) } properties.Property("Encodes values", prop.ForAll( runTest, @@ -56,7 +58,7 @@ func TestReportFormatEVMABIEncodeOpts_Decode_Encode_properties(t *testing.T) { "FeedID": genFeedID(), "ABI": genABI(), "TimeResolution": genTimeResolution(), - "DisableNilStreamValues": gen.Bool(), + "MaxReportRange": genMaxReportRange(), }))) properties.TestingRun(t) @@ -81,8 +83,12 @@ func genSingleABIEncoder() gopter.Gen { }) } +func genMaxReportRange() gopter.Gen { + return gen.OneConstOf(llo.Duration(0), llo.Duration(60*time.Minute)) +} + func TestReportCodecEVMABIEncodeUnpacked_Encode_properties(t *testing.T) { - codec := ReportCodecEVMABIEncodeUnpacked{} + codec := ReportCodecEVMABIEncodeUnpacked{Logger: logger.Nop()} properties := gopter.NewProperties(nil) @@ -154,7 +160,11 @@ func TestReportCodecEVMABIEncodeUnpacked_Encode_properties(t *testing.T) { Opts: serializedOpts, } - encoded, err := codec.Encode(report, cd) + clampedValidAfterNanos := ClampReportRange(logger.Nop(), report, 0) + + cache := llo.NewOptsCache() + cache.Set(report.ChannelID, cd.Opts) + encoded, err := codec.Encode(report, cd, cache) require.NoError(t, err) values, err := expectedDEXBasedAssetSchema.Unpack(encoded) @@ -169,12 +179,12 @@ func TestReportCodecEVMABIEncodeUnpacked_Encode_properties(t *testing.T) { for i := range report.Values { report.Values[i] = nil } - _, err = codec.Encode(report, cd) + _, err = codec.Encode(report, cd, cache) require.Error(t, err) return AllTrue([]bool{ assert.Equal(t, sampleFeedID, (common.Hash)(values[0].([32]byte))), //nolint:testifylint // false positive // feedId - assert.Equal(t, uint32(sampleValidAfterNanoseconds/1e9)+1, 
values[1].(uint32)), //nolint:gosec // G115 // validFromTimestamp + assert.Equal(t, uint32(clampedValidAfterNanos/1e9)+1, values[1].(uint32)), //nolint:gosec // G115 // validFromTimestamp assert.Equal(t, uint32(sampleObservationTimestampNanoseconds/1e9), values[2].(uint32)), //nolint:gosec // G115 // observationsTimestamp assert.Equal(t, expectedLinkFee.String(), values[3].(*big.Int).String()), // linkFee assert.Equal(t, expectedNativeFee.String(), values[4].(*big.Int).String()), // nativeFee @@ -265,7 +275,11 @@ func TestReportCodecEVMABIEncodeUnpacked_Encode_properties(t *testing.T) { Opts: serializedOpts, } - encoded, err := codec.Encode(report, cd) + clampedValidAfterNanos := ClampReportRange(logger.Nop(), report, 0) + + cache := llo.NewOptsCache() + cache.Set(report.ChannelID, cd.Opts) + encoded, err := codec.Encode(report, cd, cache) require.NoError(t, err) values, err := expectedRWASchema.Unpack(encoded) @@ -280,12 +294,12 @@ func TestReportCodecEVMABIEncodeUnpacked_Encode_properties(t *testing.T) { for i := range report.Values { report.Values[i] = nil } - _, err = codec.Encode(report, cd) + _, err = codec.Encode(report, cd, cache) require.Error(t, err) return AllTrue([]bool{ assert.Equal(t, sampleFeedID, (common.Hash)(values[0].([32]byte))), //nolint:testifylint // false positive // feedId - assert.Equal(t, uint32(sampleValidAfterNanoseconds/1e9)+1, values[1].(uint32)), //nolint:gosec // G115 // validFromTimestamp + assert.Equal(t, uint32(clampedValidAfterNanos/1e9)+1, values[1].(uint32)), //nolint:gosec // G115 // validFromTimestamp assert.Equal(t, uint32(sampleObservationTimestampNanoseconds/1e9), values[2].(uint32)), //nolint:gosec // G115 // observationsTimestamp assert.Equal(t, expectedLinkFee.String(), values[3].(*big.Int).String()), // linkFee assert.Equal(t, expectedNativeFee.String(), values[4].(*big.Int).String()), // nativeFee @@ -363,7 +377,11 @@ func TestReportCodecEVMABIEncodeUnpacked_Encode_properties(t *testing.T) { Opts: serializedOpts, } - 
encoded, err := codec.Encode(report, cd) + clampedValidAfterNanos := ClampReportRange(logger.Nop(), report, 0) + + cache := llo.NewOptsCache() + cache.Set(report.ChannelID, cd.Opts) + encoded, err := codec.Encode(report, cd, cache) require.NoError(t, err) values, err := expectedDEXBasedAssetSchema.Unpack(encoded) @@ -378,12 +396,12 @@ func TestReportCodecEVMABIEncodeUnpacked_Encode_properties(t *testing.T) { for i := range report.Values { report.Values[i] = nil } - _, err = codec.Encode(report, cd) + _, err = codec.Encode(report, cd, cache) require.Error(t, err) return AllTrue([]bool{ assert.Equal(t, sampleFeedID, (common.Hash)(values[0].([32]byte))), //nolint:testifylint // false positive // feedId - assert.Equal(t, uint32(sampleValidAfterNanoseconds/1e9)+1, values[1].(uint32)), //nolint:gosec // G115 // validFromTimestamp + assert.Equal(t, uint32(clampedValidAfterNanos/1e9)+1, values[1].(uint32)), //nolint:gosec // G115 // validFromTimestamp assert.Equal(t, uint32(sampleObservationTimestampNanoseconds/1e9), values[2].(uint32)), //nolint:gosec // G115 // observationsTimestamp assert.Equal(t, expectedLinkFee.String(), values[3].(*big.Int).String()), // linkFee assert.Equal(t, expectedNativeFee.String(), values[4].(*big.Int).String()), // nativeFee @@ -469,7 +487,11 @@ func TestReportCodecEVMABIEncodeUnpacked_Encode_properties(t *testing.T) { Opts: serializedOpts, } - encoded, err := codec.Encode(report, cd) + clampedValidAfterNanos := ClampReportRange(logger.Nop(), report, 0) + + cache := llo.NewOptsCache() + cache.Set(report.ChannelID, cd.Opts) + encoded, err := codec.Encode(report, cd, cache) require.NoError(t, err) values, err := schema.Unpack(encoded) @@ -487,7 +509,7 @@ func TestReportCodecEVMABIEncodeUnpacked_Encode_properties(t *testing.T) { } // Verify timestamps per resolution type - expectedValidFrom := llo.ConvertTimestamp(sampleValidAfterNanoseconds, sampleTimeResolution) + 1 + expectedValidFrom := llo.ConvertTimestamp(clampedValidAfterNanos, 
sampleTimeResolution) + 1 expectedObservationTimestamp := llo.ConvertTimestamp(sampleObservationTimestampNanoseconds, sampleTimeResolution) expectedExpiresAt := expectedObservationTimestamp + llo.ScaleSeconds(sampleExpirationWindow, sampleTimeResolution) if timestampType == "uint32" { @@ -599,7 +621,11 @@ func TestReportCodecEVMABIEncodeUnpacked_Encode_properties(t *testing.T) { Opts: serializedOpts, } - encoded, err := codec.Encode(report, cd) + clampedValidAfterNanos := ClampReportRange(logger.Nop(), report, 0) + + cache := llo.NewOptsCache() + cache.Set(report.ChannelID, cd.Opts) + encoded, err := codec.Encode(report, cd, cache) require.NoError(t, err) values, err := expectedFundingRateSchema.Unpack(encoded) @@ -614,12 +640,12 @@ func TestReportCodecEVMABIEncodeUnpacked_Encode_properties(t *testing.T) { for i := range report.Values { report.Values[i] = nil } - _, err = codec.Encode(report, cd) + _, err = codec.Encode(report, cd, cache) require.Error(t, err) return AllTrue([]bool{ assert.Equal(t, sampleFeedID, (common.Hash)(values[0].([32]byte))), //nolint:testifylint // false positive // feedId - assert.Equal(t, uint32(sampleValidAfterNanoseconds/1e9)+1, values[1].(uint32)), //nolint:gosec // G115 // validFromTimestamp + assert.Equal(t, uint32(clampedValidAfterNanos/1e9)+1, values[1].(uint32)), //nolint:gosec // G115 // validFromTimestamp assert.Equal(t, uint32(sampleObservationTimestampNanoseconds/1e9), values[2].(uint32)), //nolint:gosec // G115 // observationsTimestamp assert.Equal(t, expectedLinkFee.String(), values[3].(*big.Int).String()), // linkFee assert.Equal(t, expectedNativeFee.String(), values[4].(*big.Int).String()), // nativeFee @@ -699,12 +725,14 @@ func TestReportCodecEVMABIEncodeUnpacked_Encode(t *testing.T) { Opts: serializedOpts, } + cache := llo.NewOptsCache() + cache.Set(report.ChannelID, cd.Opts) codec := ReportCodecEVMABIEncodeUnpacked{} - _, err = codec.Encode(report, cd) + _, err = codec.Encode(report, cd, cache) require.EqualError(t, 
err, "failed to build payload; ABI and values length mismatch; ABI: 0, Values: 3") report.Values = []llo.StreamValue{} - _, err = codec.Encode(report, cd) + _, err = codec.Encode(report, cd, cache) require.EqualError(t, err, "ReportCodecEVMABIEncodeUnpacked requires at least 2 values (NativePrice, LinkPrice, ...); got report.Values: []") }) } @@ -811,7 +839,6 @@ func genFundingIntervalHours() gopter.Gen { }) } - func mustNewABIType(t string) abi.Type { result, err := abi.NewType(t, "", []abi.ArgumentMarshaling{}) if err != nil { diff --git a/llo/reportcodecs/evm/report_codec_evm_streamlined.go b/llo/reportcodecs/evm/report_codec_evm_streamlined.go index 28cd954..a08474a 100644 --- a/llo/reportcodecs/evm/report_codec_evm_streamlined.go +++ b/llo/reportcodecs/evm/report_codec_evm_streamlined.go @@ -14,6 +14,7 @@ import ( ocr2types "github.com/smartcontractkit/libocr/offchainreporting2plus/types" + "github.com/smartcontractkit/chainlink-common/pkg/logger" llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo" "github.com/smartcontractkit/chainlink-data-streams/llo" ) @@ -22,17 +23,20 @@ var ( _ llo.ReportCodec = ReportCodecEVMStreamlined{} ) -func NewReportCodecStreamlined() ReportCodecEVMStreamlined { - return ReportCodecEVMStreamlined{} +func NewReportCodecStreamlined(lggr logger.Logger) ReportCodecEVMStreamlined { + return ReportCodecEVMStreamlined{logger.Sugared(lggr).Named("ReportCodecEVMStreamlined")} } -type ReportCodecEVMStreamlined struct{} +type ReportCodecEVMStreamlined struct { + logger.Logger +} -func (rc ReportCodecEVMStreamlined) Encode(r llo.Report, cd llotypes.ChannelDefinition) (payload []byte, err error) { - opts := ReportFormatEVMStreamlinedOpts{} - if err = (&opts).Decode(cd.Opts); err != nil { - return nil, fmt.Errorf("failed to decode opts; got: '%s'; %w", cd.Opts, err) +func (rc ReportCodecEVMStreamlined) Encode(r llo.Report, cd llotypes.ChannelDefinition, optsCache *llo.OptsCache) (payload []byte, err error) { + opts, getErr := 
llo.GetOpts[ReportFormatEVMStreamlinedOpts](optsCache, r.ChannelID) + if getErr != nil { + return nil, fmt.Errorf("opts not in cache for channel %d: %w", r.ChannelID, getErr) } + r.ValidAfterNanoseconds = ClampReportRange(rc, r, opts.MaxReportRange) if opts.FeedID == nil { payload = append( @@ -168,16 +172,10 @@ type ReportFormatEVMStreamlinedOpts struct { // The total number of streams must be n, where n is the number of // top-level elements in this ABI array ABI []ABIEncoder `json:"abi"` - // DisableNilStreamValues controls whether channels with nil stream values - // are reportable. When false (default), nil stream values are allowed and - // channels are reportable. Set to true to make channels with missing - // stream values unreportable. - // - // This field is also read by the outcome plugin via independent JSON - // parsing (see nilStreamValuesDisabled in plugin_outcome.go). It is - // declared here so that Decode's DisallowUnknownFields does not reject - // channel opts that include it; the codec itself does not act on it. - DisableNilStreamValues bool `json:"disableNilStreamValues,omitempty"` + // MaxReportRange is the maximum range of the report. + // The range will be limited to ObservationTimestamp + MaxReportRange if the report is longer than the max range. + // Defaults to 5 minutes if not specified. 
+ MaxReportRange llo.Duration `json:"maxReportRange,omitempty"` } func (r *ReportFormatEVMStreamlinedOpts) Decode(opts []byte) error { diff --git a/llo/reportcodecs/evm/report_codec_evm_streamlined_test.go b/llo/reportcodecs/evm/report_codec_evm_streamlined_test.go index faa1d83..800499e 100644 --- a/llo/reportcodecs/evm/report_codec_evm_streamlined_test.go +++ b/llo/reportcodecs/evm/report_codec_evm_streamlined_test.go @@ -13,13 +13,14 @@ import ( ocr2types "github.com/smartcontractkit/libocr/offchainreporting2plus/types" + "github.com/smartcontractkit/chainlink-common/pkg/logger" llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo" "github.com/smartcontractkit/chainlink-data-streams/llo" ) func TestReportCodecEVMStreamlined(t *testing.T) { t.Parallel() - codec := ReportCodecEVMStreamlined{} + codec := ReportCodecEVMStreamlined{Logger: logger.Nop()} t.Run("Encode", func(t *testing.T) { t.Run("one value, without feed ID - fits into one evm word", func(t *testing.T) { @@ -27,13 +28,17 @@ func TestReportCodecEVMStreamlined(t *testing.T) { ReportFormat: 42, Opts: []byte(`{"abi":[{"type":"int128"}]}`), } + cache := llo.NewOptsCache() + cache.Set(1, cd.Opts) + validAfter := uint64(1234567890) payload, err := codec.Encode(llo.Report{ - ChannelID: 1, - ValidAfterNanoseconds: 1234567890, + ChannelID: 1, + ValidAfterNanoseconds: validAfter, + ObservationTimestampNanoseconds: validAfter, // within range so ClampReportRange does not change validAfter Values: []llo.StreamValue{ llo.ToDecimal(decimal.NewFromFloat(1123455935.123)), }, - }, cd) + }, cd, cache) require.NoError(t, err) require.Len(t, payload, 32) // Report Format @@ -50,13 +55,17 @@ func TestReportCodecEVMStreamlined(t *testing.T) { cd := llotypes.ChannelDefinition{ Opts: []byte(fmt.Sprintf(`{"abi":[{"type":"int192"}], "feedID":"0x%s"}`, feedID)), } + cache := llo.NewOptsCache() + cache.Set(1, cd.Opts) + validAfter := uint64(1234567890) payload, err := codec.Encode(llo.Report{ - ChannelID: 1, - 
ValidAfterNanoseconds: 1234567890, + ChannelID: 1, + ValidAfterNanoseconds: validAfter, + ObservationTimestampNanoseconds: validAfter, // within range so ClampReportRange does not change validAfter Values: []llo.StreamValue{ llo.ToDecimal(decimal.NewFromFloat(1123455935.123)), }, - }, cd) + }, cd, cache) require.NoError(t, err) require.Len(t, payload, 64) assert.Equal(t, feedID, hex.EncodeToString(payload[:32])) // feed id @@ -84,7 +93,7 @@ func TestReportCodecEVMStreamlined(t *testing.T) { Streams: []llotypes.Stream{ {StreamID: 123, Aggregator: llotypes.AggregatorMedian}, }, - Opts: []byte(`{"abi":[{"type":"int160"}], "disableNilStreamValues":false}`), + Opts: []byte(`{"abi":[{"type":"int160"}]}`), }) require.NoError(t, err) t.Run("with invalid opts", func(t *testing.T) { diff --git a/llo/reportcodecs/evm/report_codec_premium_legacy.go b/llo/reportcodecs/evm/report_codec_premium_legacy.go index 1d503b2..930fdad 100644 --- a/llo/reportcodecs/evm/report_codec_premium_legacy.go +++ b/llo/reportcodecs/evm/report_codec_premium_legacy.go @@ -65,16 +65,10 @@ type ReportFormatEVMPremiumLegacyOpts struct { // Multiplier is used to scale the bid, benchmark and ask values in the // report. If not specified, or zero is used, a multiplier of 1 is assumed. Multiplier *ubig.Big `json:"multiplier"` - // DisableNilStreamValues controls whether channels with nil stream values - // are reportable. When false (default), nil stream values are allowed and - // channels are reportable. Set to true to make channels with missing - // stream values unreportable. - // - // This field is also read by the outcome plugin via independent JSON - // parsing (see nilStreamValuesDisabled in plugin_outcome.go). It is - // declared here so that Decode's DisallowUnknownFields does not reject - // channel opts that include it; the codec itself does not act on it. - DisableNilStreamValues bool `json:"disableNilStreamValues,omitempty"` + // MaxReportRange is the maximum range of the report. 
+ // The range will be limited to ObservationTimestamp + MaxReportRange if the report is longer than the max range. + // Defaults to 5 minutes if not specified. + MaxReportRange llo.Duration `json:"maxReportRange,omitempty"` } func (r *ReportFormatEVMPremiumLegacyOpts) Decode(opts []byte) error { @@ -87,7 +81,7 @@ func (r *ReportFormatEVMPremiumLegacyOpts) Decode(opts []byte) error { return decoder.Decode(r) } -func (r ReportCodecPremiumLegacy) Encode(report llo.Report, cd llotypes.ChannelDefinition) ([]byte, error) { +func (r ReportCodecPremiumLegacy) Encode(report llo.Report, cd llotypes.ChannelDefinition, optsCache *llo.OptsCache) ([]byte, error) { if report.Specimen { return nil, errors.New("ReportCodecPremiumLegacy does not support encoding specimen reports") } @@ -96,13 +90,12 @@ func (r ReportCodecPremiumLegacy) Encode(report llo.Report, cd llotypes.ChannelD return nil, fmt.Errorf("ReportCodecPremiumLegacy cannot encode; got unusable report; %w", err) } - // NOTE: It seems suboptimal to have to parse the opts on every encode but - // not sure how to avoid it. Should be negligible performance hit as long - // as Opts is small. 
- opts := ReportFormatEVMPremiumLegacyOpts{} - if err = (&opts).Decode(cd.Opts); err != nil { - return nil, fmt.Errorf("failed to decode opts; got: '%s'; %w", cd.Opts, err) + opts, getErr := llo.GetOpts[ReportFormatEVMPremiumLegacyOpts](optsCache, report.ChannelID) + if getErr != nil { + return nil, fmt.Errorf("opts not in cache for channel %d: %w", report.ChannelID, getErr) } + + report.ValidAfterNanoseconds = ClampReportRange(r, report, opts.MaxReportRange) var multiplier decimal.Decimal if opts.Multiplier == nil { multiplier = decimal.NewFromInt(1) diff --git a/llo/reportcodecs/evm/report_codec_premium_legacy_test.go b/llo/reportcodecs/evm/report_codec_premium_legacy_test.go index 3e5173c..cf791e4 100644 --- a/llo/reportcodecs/evm/report_codec_premium_legacy_test.go +++ b/llo/reportcodecs/evm/report_codec_premium_legacy_test.go @@ -31,8 +31,10 @@ func FuzzReportCodecPremiumLegacy_Decode(f *testing.F) { cd := llotypes.ChannelDefinition{Opts: llotypes.ChannelOpts(fmt.Sprintf(`{"baseUSDFee":"10.50","expirationWindow":60,"feedId":"0x%x","multiplier":10}`, feedID))} codec := ReportCodecPremiumLegacy{logger.Test(f), 100002} + cache := llo.NewOptsCache() + cache.Set(validReport.ChannelID, cd.Opts) - validEncodedReport, err := codec.Encode(validReport, cd) + validEncodedReport, err := codec.Encode(validReport, cd, cache) require.NoError(f, err) f.Add(validEncodedReport) @@ -60,7 +62,9 @@ func Test_ReportCodecPremiumLegacy(t *testing.T) { cd := llotypes.ChannelDefinition{Opts: llotypes.ChannelOpts(fmt.Sprintf(`{"baseUSDFee":"10.50","expirationWindow":60,"feedId":"0x%x","multiplier":10}`, feedID))} t.Run("Encode errors if no values", func(t *testing.T) { - _, err := rc.Encode(llo.Report{}, cd) + cache := llo.NewOptsCache() + cache.Set(0, cd.Opts) + _, err := rc.Encode(llo.Report{}, cd, cache) require.Error(t, err) assert.Contains(t, err.Error(), "ReportCodecPremiumLegacy cannot encode; got unusable report; ReportCodecPremiumLegacy requires exactly 3 values (NativePrice, 
LinkPrice, Quote{Bid, Mid, Ask}); got report.Values: []") @@ -69,16 +73,20 @@ func Test_ReportCodecPremiumLegacy(t *testing.T) { t.Run("does not encode specimen reports", func(t *testing.T) { report := newValidPremiumLegacyReport() report.Specimen = true + cache := llo.NewOptsCache() + cache.Set(report.ChannelID, cd.Opts) - _, err := rc.Encode(report, cd) + _, err := rc.Encode(report, cd, cache) require.Error(t, err) require.EqualError(t, err, "ReportCodecPremiumLegacy does not support encoding specimen reports") }) t.Run("Encode constructs a report from observations", func(t *testing.T) { report := newValidPremiumLegacyReport() + cache := llo.NewOptsCache() + cache.Set(report.ChannelID, cd.Opts) - encoded, err := rc.Encode(report, cd) + encoded, err := rc.Encode(report, cd, cache) require.NoError(t, err) assert.Len(t, encoded, 288) @@ -118,8 +126,10 @@ func Test_ReportCodecPremiumLegacy(t *testing.T) { report := llo.Report{ Values: []llo.StreamValue{nil, nil, &llo.Quote{Bid: decimal.NewFromInt(37), Benchmark: decimal.NewFromInt(38), Ask: decimal.NewFromInt(39)}}, } + cache := llo.NewOptsCache() + cache.Set(report.ChannelID, cd.Opts) - encoded, err := rc.Encode(report, cd) + encoded, err := rc.Encode(report, cd, cache) require.NoError(t, err) assert.Len(t, encoded, 288) @@ -327,43 +337,4 @@ func Test_ReportCodecPremiumLegacy_Verify(t *testing.T) { err := c.Verify(cd) require.NoError(t, err) }) - t.Run("disableNilStreamValues=undefined does not error", func(t *testing.T) { - cd := llotypes.ChannelDefinition{ - Streams: []llotypes.Stream{ - {StreamID: 1, Aggregator: llotypes.AggregatorMedian}, - {StreamID: 2, Aggregator: llotypes.AggregatorMedian}, - {StreamID: 3, Aggregator: llotypes.AggregatorMedian}, - }, - ReportFormat: llotypes.ReportFormatEVMABIEncodeUnpacked, - Opts: []byte(`{"baseUSDFee":"1","feedID":"0x1111111111111111111111111111111111111111111111111111111111111111"}`), - } - err := c.Verify(cd) - require.NoError(t, err) - }) - 
t.Run("disableNilStreamValues=true is a known field and does not error", func(t *testing.T) { - cd := llotypes.ChannelDefinition{ - Streams: []llotypes.Stream{ - {StreamID: 1, Aggregator: llotypes.AggregatorMedian}, - {StreamID: 2, Aggregator: llotypes.AggregatorMedian}, - {StreamID: 3, Aggregator: llotypes.AggregatorMedian}, - }, - ReportFormat: llotypes.ReportFormatEVMABIEncodeUnpacked, - Opts: []byte(`{"baseUSDFee":"1","feedID":"0x1111111111111111111111111111111111111111111111111111111111111111","disableNilStreamValues":true}`), - } - err := c.Verify(cd) - require.NoError(t, err) - }) - t.Run("disableNilStreamValues=false is a known field and does not error", func(t *testing.T) { - cd := llotypes.ChannelDefinition{ - Streams: []llotypes.Stream{ - {StreamID: 1, Aggregator: llotypes.AggregatorMedian}, - {StreamID: 2, Aggregator: llotypes.AggregatorMedian}, - {StreamID: 3, Aggregator: llotypes.AggregatorMedian}, - }, - ReportFormat: llotypes.ReportFormatEVMABIEncodeUnpacked, - Opts: []byte(`{"baseUSDFee":"1","feedID":"0x1111111111111111111111111111111111111111111111111111111111111111","disableNilStreamValues":false}`), - } - err := c.Verify(cd) - require.NoError(t, err) - }) } diff --git a/llo/stream_calculated.go b/llo/stream_calculated.go index 8a13364..3ce8e48 100644 --- a/llo/stream_calculated.go +++ b/llo/stream_calculated.go @@ -9,8 +9,6 @@ import ( "sync" "time" - "github.com/goccy/go-json" - "github.com/expr-lang/expr" "github.com/expr-lang/expr/ast" "github.com/expr-lang/expr/parser" @@ -51,7 +49,7 @@ var ( "Ceil": Ceil, "Floor": Floor, "Avg": Avg, - "Duration": Duration, + "Duration": ParseDuration, } }, } @@ -475,8 +473,8 @@ func Truncate(x any, precision int) (decimal.Decimal, error) { return n.Truncate(int32(precision)), nil } -// Duration parses a duration string into a time.Duration -func Duration(x string) (time.Duration, error) { +// ParseDuration parses a duration string into a time.Duration +func ParseDuration(x string) (time.Duration,
error) { return time.ParseDuration(x) } @@ -567,13 +565,9 @@ func (p *Plugin) ProcessCalculatedStreams(outcome *Outcome) { continue } - // TODO: we can potentially cache the opts for each channel definition - // and avoid unmarshalling the options on outcome. - // for now keep it simple as this will require invalidating on - // channel definitions updates. - copt := opts{} - if err := json.Unmarshal(cd.Opts, &copt); err != nil { - p.Logger.Errorw("failed to unmarshal channel definition options", "channelID", cid, "error", err) + copt, getErr := GetOpts[calculatedStreamOpts](p.OptsCache, cid) + if getErr != nil { + p.Logger.Errorw("channel opts not in cache", "channelID", cid, "error", getErr) env.release() continue } @@ -604,7 +598,7 @@ func (p *Plugin) ProcessCalculatedStreams(outcome *Outcome) { } } -func (p *Plugin) evalExpression(o *opts, cid llotypes.ChannelID, env environment, outcome *Outcome) error { +func (p *Plugin) evalExpression(o *calculatedStreamOpts, cid llotypes.ChannelID, env environment, outcome *Outcome) error { for _, abi := range o.ABI { if abi.ExpressionStreamID == 0 { return fmt.Errorf("expression stream ID is 0, channelID: %d, expression: %s", @@ -638,7 +632,9 @@ func (p *Plugin) evalExpression(o *opts, cid llotypes.ChannelID, env environment return nil } -type opts struct { +// calculatedStreamOpts is the options structure for expression/calculated streams. +// It is used with OptsCache for decoding channel opts in ProcessCalculatedStreams. 
+type calculatedStreamOpts struct { ABI []struct { Type string `json:"type"` Expression string `json:"expression"` @@ -713,7 +709,7 @@ func (p *Plugin) ProcessCalculatedStreamsDryRun(expression string) error { } // Process the calculated streams - o := &opts{ + o := &calculatedStreamOpts{ ABI: []struct { Type string `json:"type"` Expression string `json:"expression"` diff --git a/llo/stream_calculated_test.go b/llo/stream_calculated_test.go index b784359..f8ca1a3 100644 --- a/llo/stream_calculated_test.go +++ b/llo/stream_calculated_test.go @@ -1730,7 +1730,10 @@ func TestProcessStreamCalculated(t *testing.T) { t.Run(tt.name, func(t *testing.T) { lggr, err := logger.New() require.NoError(t, err) - p := &Plugin{Logger: lggr} + p := &Plugin{Logger: lggr, OptsCache: NewOptsCache()} + for cid, cd := range tt.outcome.ChannelDefinitions { + p.OptsCache.Set(cid, cd.Opts) + } p.ProcessCalculatedStreams(&tt.outcome) for streamID, expectedValue := range tt.expectedValues { @@ -1767,7 +1770,10 @@ func BenchmarkProcessCalculatedStreams(b *testing.B) { StreamAggregates: aggr, } - p := &Plugin{Logger: logger.Nop()} + p := &Plugin{Logger: logger.Nop(), OptsCache: NewOptsCache()} + for cid, cd := range outcome.ChannelDefinitions { + p.OptsCache.Set(cid, cd.Opts) + } for i := 0; i < b.N; i++ { p.ProcessCalculatedStreams(&outcome) diff --git a/llo/testdata/outcome_serialization/from_full.bin b/llo/testdata/outcome_serialization/from_full.bin index ef4ab8e..cb3de2d 100644 --- a/llo/testdata/outcome_serialization/from_full.bin +++ b/llo/testdata/outcome_serialization/from_full.bin @@ -1,3 +1,3 @@ -productionêÁ«Â(# {"foo":"bar"}  {"baz":"qux"}(é"€ä—Ð"ê­Àå$ \ No newline at end of file +productionêÁ«Â(%! 
{"foo":"bar"} 0  {"baz":"qux"}(é0"€ä—Ð"ê­Àå$ \ No newline at end of file diff --git a/llo/testdata/outcome_serialization/full.bin b/llo/testdata/outcome_serialization/full.bin index dddaaea..658b0fe 100644 Binary files a/llo/testdata/outcome_serialization/full.bin and b/llo/testdata/outcome_serialization/full.bin differ diff --git a/llo/types.go b/llo/types.go index 4a107dc..cd3c3db 100644 --- a/llo/types.go +++ b/llo/types.go @@ -1,7 +1,9 @@ package llo import ( + "errors" "fmt" + "time" "github.com/goccy/go-json" @@ -11,6 +13,11 @@ import ( llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo" ) +const ( + // DefaultMaxReportRange is the default maximum range of the report if unset in the opts. + DefaultMaxReportRange = Duration(5 * time.Minute) +) + type ObservationCodec interface { Encode(obs Observation) (types.Observation, error) Decode(encoded types.Observation) (obs Observation, err error) @@ -24,8 +31,9 @@ type OutcomeCodec interface { type ReportCodec interface { // Encode may be lossy, so no Decode function is expected // Encode should handle nil stream aggregate values without panicking (it - // may return error instead) - Encode(Report, llotypes.ChannelDefinition) ([]byte, error) + // may return error instead). + // Codecs may use GetOpts(optsCache, report.ChannelID) to get cached parsed opts. + Encode(Report, llotypes.ChannelDefinition, *OptsCache) ([]byte, error) // Verify may optionally verify a channel definition to ensure it is valid // for the given report codec. If a codec does not wish to implement // validation it may simply return nil here. 
If any definition fails @@ -129,3 +137,34 @@ func ScaleSeconds(seconds uint32, resolution TimeResolution) uint64 { return uint64(seconds) } } + +type Duration time.Duration + +func (d Duration) String() string { + return time.Duration(d).String() +} + +func (d Duration) MarshalJSON() ([]byte, error) { + return json.Marshal(time.Duration(d).String()) +} + +func (d *Duration) UnmarshalJSON(b []byte) error { + var v interface{} + if err := json.Unmarshal(b, &v); err != nil { + return err + } + switch value := v.(type) { + case float64: + *d = Duration(time.Duration(value)) + return nil + case string: + tmp, err := time.ParseDuration(value) + if err != nil { + return err + } + *d = Duration(tmp) + return nil + default: + return errors.New("invalid duration") + } +} diff --git a/llo/types_test.go b/llo/types_test.go new file mode 100644 index 0000000..9a39cd5 --- /dev/null +++ b/llo/types_test.go @@ -0,0 +1,243 @@ +package llo + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestDuration_String(t *testing.T) { + tests := []struct { + name string + d Duration + expected string + }{ + {"zero", Duration(0), "0s"}, + {"one second", Duration(time.Second), "1s"}, + {"five minutes", Duration(5 * time.Minute), "5m0s"}, + {"complex", Duration(2*time.Hour + 30*time.Minute + 15*time.Second), "2h30m15s"}, + {"sub-second", Duration(500 * time.Millisecond), "500ms"}, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + assert.Equal(t, tc.expected, tc.d.String()) + }) + } +} + +func TestDuration_MarshalJSON(t *testing.T) { + tests := []struct { + name string + d Duration + expected string + }{ + {"zero", Duration(0), `"0s"`}, + {"one second", Duration(time.Second), `"1s"`}, + {"five minutes", Duration(5 * time.Minute), `"5m0s"`}, + {"negative", Duration(-3 * time.Second), `"-3s"`}, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + b, err := tc.d.MarshalJSON() + 
require.NoError(t, err) + assert.Equal(t, tc.expected, string(b)) + }) + } +} + +func TestDuration_UnmarshalJSON(t *testing.T) { + t.Run("string values", func(t *testing.T) { + tests := []struct { + name string + input string + expected Duration + }{ + {"seconds", `"5s"`, Duration(5 * time.Second)}, + {"minutes", `"10m"`, Duration(10 * time.Minute)}, + {"complex", `"1h30m"`, Duration(time.Hour + 30*time.Minute)}, + {"milliseconds", `"250ms"`, Duration(250 * time.Millisecond)}, + {"negative", `"-2s"`, Duration(-2 * time.Second)}, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + var d Duration + err := d.UnmarshalJSON([]byte(tc.input)) + require.NoError(t, err) + assert.Equal(t, tc.expected, d) + }) + } + }) + + t.Run("numeric values (nanoseconds)", func(t *testing.T) { + tests := []struct { + name string + input string + expected Duration + }{ + {"zero", `0`, Duration(0)}, + {"one billion (1s)", `1000000000`, Duration(time.Second)}, + {"fractional", `1500000000.0`, Duration(1500 * time.Millisecond)}, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + var d Duration + err := d.UnmarshalJSON([]byte(tc.input)) + require.NoError(t, err) + assert.Equal(t, tc.expected, d) + }) + } + }) + + t.Run("invalid input", func(t *testing.T) { + tests := []struct { + name string + input string + }{ + {"invalid JSON", `{`}, + {"invalid duration string", `"notaduration"`}, + {"boolean", `true`}, + {"null", `null`}, + {"array", `[1,2]`}, + {"object", `{"key":"val"}`}, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + var d Duration + err := d.UnmarshalJSON([]byte(tc.input)) + assert.Error(t, err) + }) + } + }) + + t.Run("roundtrip", func(t *testing.T) { + original := Duration(5*time.Minute + 30*time.Second) + b, err := original.MarshalJSON() + require.NoError(t, err) + + var decoded Duration + err = decoded.UnmarshalJSON(b) + require.NoError(t, err) + assert.Equal(t, original, decoded) + }) +} + +func 
TestTimeResolution_MarshalJSON(t *testing.T) { + tests := []struct { + name string + tp TimeResolution + expected string + }{ + {"seconds", ResolutionSeconds, `"s"`}, + {"milliseconds", ResolutionMilliseconds, `"ms"`}, + {"microseconds", ResolutionMicroseconds, `"us"`}, + {"nanoseconds", ResolutionNanoseconds, `"ns"`}, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + b, err := tc.tp.MarshalJSON() + require.NoError(t, err) + assert.Equal(t, tc.expected, string(b)) + }) + } + + t.Run("invalid resolution returns error", func(t *testing.T) { + tp := TimeResolution(255) + _, err := tp.MarshalJSON() + require.Error(t, err) + assert.Contains(t, err.Error(), "invalid timestamp resolution") + }) +} + +func TestTimeResolution_UnmarshalJSON(t *testing.T) { + tests := []struct { + name string + input string + expected TimeResolution + }{ + {"seconds", `"s"`, ResolutionSeconds}, + {"milliseconds", `"ms"`, ResolutionMilliseconds}, + {"microseconds", `"us"`, ResolutionMicroseconds}, + {"nanoseconds", `"ns"`, ResolutionNanoseconds}, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + var tp TimeResolution + err := tp.UnmarshalJSON([]byte(tc.input)) + require.NoError(t, err) + assert.Equal(t, tc.expected, tp) + }) + } + + t.Run("invalid string", func(t *testing.T) { + var tp TimeResolution + err := tp.UnmarshalJSON([]byte(`"hours"`)) + require.Error(t, err) + assert.Contains(t, err.Error(), "invalid timestamp resolution") + }) + + t.Run("invalid JSON", func(t *testing.T) { + var tp TimeResolution + err := tp.UnmarshalJSON([]byte(`123`)) + require.Error(t, err) + }) + + t.Run("roundtrip", func(t *testing.T) { + for _, res := range []TimeResolution{ResolutionSeconds, ResolutionMilliseconds, ResolutionMicroseconds, ResolutionNanoseconds} { + b, err := res.MarshalJSON() + require.NoError(t, err) + + var decoded TimeResolution + err = decoded.UnmarshalJSON(b) + require.NoError(t, err) + assert.Equal(t, res, decoded) + } + }) +} + +func 
TestConvertTimestamp(t *testing.T) { + const tsNanos uint64 = 1_700_000_000_000_000_000 // 1.7e18 ns + + tests := []struct { + name string + nanos uint64 + resolution TimeResolution + expected uint64 + }{ + {"to seconds", tsNanos, ResolutionSeconds, tsNanos / 1e9}, + {"to milliseconds", tsNanos, ResolutionMilliseconds, tsNanos / 1e6}, + {"to microseconds", tsNanos, ResolutionMicroseconds, tsNanos / 1e3}, + {"to nanoseconds", tsNanos, ResolutionNanoseconds, tsNanos}, + {"zero value", 0, ResolutionSeconds, 0}, + {"unknown resolution falls back to nanos", tsNanos, TimeResolution(255), tsNanos}, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + assert.Equal(t, tc.expected, ConvertTimestamp(tc.nanos, tc.resolution)) + }) + } +} + +func TestScaleSeconds(t *testing.T) { + const secs uint32 = 3600 // 1 hour + + tests := []struct { + name string + seconds uint32 + resolution TimeResolution + expected uint64 + }{ + {"to seconds", secs, ResolutionSeconds, 3600}, + {"to milliseconds", secs, ResolutionMilliseconds, 3_600_000}, + {"to microseconds", secs, ResolutionMicroseconds, 3_600_000_000}, + {"to nanoseconds", secs, ResolutionNanoseconds, 3_600_000_000_000}, + {"zero value", 0, ResolutionNanoseconds, 0}, + {"unknown resolution falls back to seconds", secs, TimeResolution(255), 3600}, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + assert.Equal(t, tc.expected, ScaleSeconds(tc.seconds, tc.resolution)) + }) + } +}