Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.23.0
github.com/shopspring/decimal v1.4.0
github.com/smartcontractkit/chainlink-common v0.10.1-0.20260217160002-b56cb5356cc7
github.com/smartcontractkit/chainlink-common v0.10.1-0.20260319155722-325b0156b188
github.com/smartcontractkit/chainlink-common/keystore v1.0.2
github.com/smartcontractkit/libocr v0.0.0-20260130195252-6e18e2a30acc
github.com/smartcontractkit/wsrpc v0.8.5-0.20250502134807-c57d3d995945
Expand Down Expand Up @@ -97,7 +97,7 @@ require (
github.com/shirou/gopsutil v3.21.11+incompatible // indirect
github.com/smartcontractkit/chain-selectors v1.0.91 // indirect
github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.10 // indirect
github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260217043601-5cc966896c4f // indirect
github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260226130359-963f935e0396 // indirect
github.com/smartcontractkit/chainlink-protos/linking-service/go v0.0.0-20251002192024-d2ad9222409b // indirect
github.com/smartcontractkit/chainlink-protos/node-platform v0.0.0-20260205130626-db2a2aab956b // indirect
github.com/smartcontractkit/freeport v0.1.3-0.20250716200817-cb5dfd0e369e // indirect
Expand Down
10 changes: 6 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -567,14 +567,16 @@ github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6Mwd
github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
github.com/smartcontractkit/chain-selectors v1.0.91 h1:Aip7IZTv40RtbHgZ9mTjm5KyhYrpPefG7iVMzLZ27M4=
github.com/smartcontractkit/chain-selectors v1.0.91/go.mod h1:qy7whtgG5g+7z0jt0nRyii9bLND9m15NZTzuQPkMZ5w=
github.com/smartcontractkit/chainlink-common v0.10.1-0.20260217160002-b56cb5356cc7 h1:h5cmgzKpKn5N5ItpEDFhRcrtqs36nu9r/dciJub1hos=
github.com/smartcontractkit/chainlink-common v0.10.1-0.20260217160002-b56cb5356cc7/go.mod h1:HXgSKzmZ/bhSx8nHU7hHW6dR+BHSXkdcpFv2T8qJcS8=
github.com/smartcontractkit/chainlink-common v0.10.1-0.20260317141303-08016f5b4637 h1:kQjHx2gBmb0NY1scp7fE3dv/HcV3joT1xnMq0PMyNXk=
github.com/smartcontractkit/chainlink-common v0.10.1-0.20260317141303-08016f5b4637/go.mod h1:0ghbAr7tRO0tT5ZqBXhOyzgUO37tNNe33Yn0hskauVM=
github.com/smartcontractkit/chainlink-common v0.10.1-0.20260319155722-325b0156b188 h1:6n15Fng45mSa2OEqRAiFk4e/6O5mZpo15eXzgF5g5xA=
github.com/smartcontractkit/chainlink-common v0.10.1-0.20260319155722-325b0156b188/go.mod h1:0ghbAr7tRO0tT5ZqBXhOyzgUO37tNNe33Yn0hskauVM=
github.com/smartcontractkit/chainlink-common/keystore v1.0.2 h1:AWisx4JT3QV8tcgh6J5NCrex+wAgTYpWyHsyNPSXzsQ=
github.com/smartcontractkit/chainlink-common/keystore v1.0.2/go.mod h1:rSkIHdomyak3YnUtXLenl6poIq8q0V3UZPiiyYqPdGA=
github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.10 h1:FJAFgXS9oqASnkS03RE1HQwYQQxrO4l46O5JSzxqLgg=
github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.10/go.mod h1:oiDa54M0FwxevWwyAX773lwdWvFYYlYHHQV1LQ5HpWY=
github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260217043601-5cc966896c4f h1:MHlgzqiDPyDV397bZkzS9TtWXb3FR9Pb8FR9cP9h0As=
github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260217043601-5cc966896c4f/go.mod h1:Jqt53s27Tr0jDl8mdBXg1xhu6F8Fci8JOuq43tgHOM8=
github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260226130359-963f935e0396 h1:03tbcwjyIEjvHba1IWOj1sfThwebm2XNzyFHSuZtlWc=
github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260226130359-963f935e0396/go.mod h1:Jqt53s27Tr0jDl8mdBXg1xhu6F8Fci8JOuq43tgHOM8=
github.com/smartcontractkit/chainlink-protos/linking-service/go v0.0.0-20251002192024-d2ad9222409b h1:QuI6SmQFK/zyUlVWEf0GMkiUYBPY4lssn26nKSd/bOM=
github.com/smartcontractkit/chainlink-protos/linking-service/go v0.0.0-20251002192024-d2ad9222409b/go.mod h1:qSTSwX3cBP3FKQwQacdjArqv0g6QnukjV4XuzO6UyoY=
github.com/smartcontractkit/chainlink-protos/node-platform v0.0.0-20260205130626-db2a2aab956b h1:36knUpKHHAZ86K4FGWXtx8i/EQftGdk2bqCoEu/Cha8=
Expand Down
2 changes: 1 addition & 1 deletion llo/channel_definitions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ type mockReportCodec struct {
err error
}

func (m mockReportCodec) Encode(Report, llotypes.ChannelDefinition) ([]byte, error) {
func (m mockReportCodec) Encode(Report, llotypes.ChannelDefinition, *OptsCache) ([]byte, error) {
return nil, nil
}

Expand Down
2 changes: 1 addition & 1 deletion llo/json_report_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ var _ ReportCodec = JSONReportCodec{}

type JSONReportCodec struct{}

func (cdc JSONReportCodec) Encode(r Report, _ llotypes.ChannelDefinition) ([]byte, error) {
func (cdc JSONReportCodec) Encode(r Report, _ llotypes.ChannelDefinition, _ *OptsCache) ([]byte, error) {
type encode struct {
ConfigDigest types.ConfigDigest
SeqNr uint64
Expand Down
4 changes: 2 additions & 2 deletions llo/json_report_codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func Test_JSONCodec_Properties(t *testing.T) {

properties.Property("Encode/Decode", prop.ForAll(
func(r Report) bool {
b, err := codec.Encode(r, cd)
b, err := codec.Encode(r, cd, nil)
require.NoError(t, err)
r2, err := codec.Decode(b)
require.NoError(t, err)
Expand Down Expand Up @@ -305,7 +305,7 @@ func Test_JSONCodec(t *testing.T) {

cdc := JSONReportCodec{}

encoded, err := cdc.Encode(r, llo.ChannelDefinition{})
encoded, err := cdc.Encode(r, llo.ChannelDefinition{}, nil)
require.NoError(t, err)

assert.Equal(t, `{"ConfigDigest":"0102030000000000000000000000000000000000000000000000000000000000","SeqNr":43,"ChannelID":46,"ValidAfterNanoseconds":44,"ObservationTimestampNanoseconds":45,"Values":[{"t":0,"v":"1"},{"t":0,"v":"2"},{"t":1,"v":"Q{Bid: 3.13, Benchmark: 4.4, Ask: 5.12}"}],"Specimen":true}`, string(encoded)) //nolint:testifylint // need to verify exact match including order for determinism
Expand Down
58 changes: 30 additions & 28 deletions llo/observation_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,29 +87,6 @@ func (c protoObservationCodec) Encode(obs Observation) (types.Observation, error
return b, nil
}

func channelDefinitionsToProtoObservation(in llotypes.ChannelDefinitions) (out map[uint32]*LLOChannelDefinitionProto) {
if len(in) > 0 {
out = make(map[uint32]*LLOChannelDefinitionProto, len(in))
for id, d := range in {
streams := make([]*LLOStreamDefinition, len(d.Streams))
for i, strm := range d.Streams {
streams[i] = &LLOStreamDefinition{
StreamID: strm.StreamID,
Aggregator: uint32(strm.Aggregator),
}
}
out[id] = &LLOChannelDefinitionProto{
ReportFormat: uint32(d.ReportFormat),
Streams: streams,
Opts: d.Opts,
Tombstone: d.Tombstone,
Source: d.Source,
}
}
}
return
}

func (c protoObservationCodec) Decode(b types.Observation) (Observation, error) {
var err error
if c.enableCompression {
Expand Down Expand Up @@ -187,12 +164,37 @@ func channelDefinitionsFromProtoObservation(channelDefinitions map[uint32]*LLOCh
}
}
dfns[id] = llotypes.ChannelDefinition{
ReportFormat: llotypes.ReportFormat(d.ReportFormat),
Streams: streams,
Opts: d.Opts,
Tombstone: d.Tombstone,
Source: d.Source,
ReportFormat: llotypes.ReportFormat(d.ReportFormat),
Streams: streams,
Opts: d.Opts,
Tombstone: d.Tombstone,
Source: d.Source,
DisableNilStreamValues: d.DisableNilStreamValues,
}
}
return dfns
}

func channelDefinitionsToProtoObservation(in llotypes.ChannelDefinitions) (out map[uint32]*LLOChannelDefinitionProto) {
if len(in) > 0 {
out = make(map[uint32]*LLOChannelDefinitionProto, len(in))
for id, d := range in {
streams := make([]*LLOStreamDefinition, len(d.Streams))
for i, strm := range d.Streams {
streams[i] = &LLOStreamDefinition{
StreamID: strm.StreamID,
Aggregator: uint32(strm.Aggregator),
}
}
out[id] = &LLOChannelDefinitionProto{
ReportFormat: uint32(d.ReportFormat),
Streams: streams,
Opts: d.Opts,
Tombstone: d.Tombstone,
Source: d.Source,
DisableNilStreamValues: d.DisableNilStreamValues,
}
}
}
return
}
32 changes: 32 additions & 0 deletions llo/observation_codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,38 @@ func Test_protoObservationCodec(t *testing.T) {

assert.Equal(t, expectedObs, obs2)
})
t.Run("encode and decode preserves properties in channel definitions", func(t *testing.T) {
obs := Observation{
UnixTimestampNanoseconds: 1,
UpdateChannelDefinitions: map[llotypes.ChannelID]llotypes.ChannelDefinition{
1: {
ReportFormat: llotypes.ReportFormatJSON,
Streams: []llotypes.Stream{{StreamID: 1, Aggregator: llotypes.AggregatorMedian}},
DisableNilStreamValues: true,
Opts: []byte(`{}`),
Tombstone: false,
Source: 1,
},
2: {
ReportFormat: llotypes.ReportFormatJSON,
Streams: []llotypes.Stream{{StreamID: 2, Aggregator: llotypes.AggregatorQuote}},
DisableNilStreamValues: false,
Opts: []byte(`{}`),
Tombstone: true,
Source: 2,
},
},
}
codec, err := NewProtoObservationCodec(logger.Nop(), true)
require.NoError(t, err)
obsBytes, err := codec.Encode(obs)
require.NoError(t, err)
obs2, err := codec.Decode(obsBytes)
require.NoError(t, err)
assert.Equal(t, obs.UpdateChannelDefinitions, obs2.UpdateChannelDefinitions)
assert.True(t, obs2.UpdateChannelDefinitions[1].DisableNilStreamValues)
assert.False(t, obs2.UpdateChannelDefinitions[2].DisableNilStreamValues)
})
t.Run("decoding with invalid data", func(t *testing.T) {
t.Run("not a protobuf", func(t *testing.T) {
codec, err := NewProtoObservationCodec(logger.Nop(), true)
Expand Down
121 changes: 121 additions & 0 deletions llo/opts_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package llo

import (
"bytes"
"fmt"
"reflect"
sync "sync"

"github.com/goccy/go-json"

llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo"
)

type optsCacheKey struct {
channelID llotypes.ChannelID
optsType reflect.Type
}

// OptsCache caches decoded channel definition options keyed by (ChannelID, target type).
// Raw opts bytes are stored via Set during channel definition changes in Outcome().
// Decoded values are produced lazily on the first GetOpts call for a given (channelID, type)
// and reused until the channel is updated or removed.
type OptsCache struct {
mu sync.Mutex
raw map[llotypes.ChannelID]llotypes.ChannelOpts
decoded map[optsCacheKey]any
}

func NewOptsCache() *OptsCache {
return &OptsCache{
raw: make(map[llotypes.ChannelID]llotypes.ChannelOpts),
decoded: make(map[optsCacheKey]any),
}
}

// Set stores the raw opts for a channel and invalidates any previously decoded
// values for that channel. It is a no-op when the raw bytes are identical to
// what is already stored.
func (c *OptsCache) Set(channelID llotypes.ChannelID, raw llotypes.ChannelOpts) {
c.mu.Lock()
defer c.mu.Unlock()

if existing, ok := c.raw[channelID]; ok && bytes.Equal(existing, raw) {
return
}
c.raw[channelID] = raw
for key := range c.decoded {
if key.channelID == channelID {
delete(c.decoded, key)
}
}
}

// Len returns the number of channels in the cache.
func (c *OptsCache) Len() int {
c.mu.Lock()
defer c.mu.Unlock()
return len(c.raw)
}

// Remove removes all raw and decoded data for a channel.
func (c *OptsCache) Remove(channelID llotypes.ChannelID) {
c.mu.Lock()
defer c.mu.Unlock()

delete(c.raw, channelID)
for key := range c.decoded {
if key.channelID == channelID {
delete(c.decoded, key)
}
}
}

// ResetTo resets the cache to the given channel definitions.
func (c *OptsCache) ResetTo(channelDefinitions llotypes.ChannelDefinitions) {
c.mu.Lock()
defer c.mu.Unlock()
c.raw = make(map[llotypes.ChannelID]llotypes.ChannelOpts)
c.decoded = make(map[optsCacheKey]any)

for channelID, cd := range channelDefinitions {
c.raw[channelID] = cd.Opts
}
}

// GetOpts returns decoded channel opts of type T for the given channel.
// On the first call for a given (channelID, T) after Set, the raw bytes are
// decoded via json.Unmarshal and the result is cached. Subsequent calls return
// the cached value directly.
//
// Returns an error if the channel is not in the cache or decoding fails.
// The caller must pass a valid opts cache.
func GetOpts[T any](c *OptsCache, channelID llotypes.ChannelID) (T, error) {
var zero T
c.mu.Lock()
defer c.mu.Unlock()

key := optsCacheKey{
channelID: channelID,
optsType: reflect.TypeFor[T](),
}

if entry, ok := c.decoded[key]; ok {
return entry.(T), nil
}

raw, ok := c.raw[channelID]
if !ok {
return zero, fmt.Errorf("channel %d not in opts cache", channelID)
}

var result T
if len(raw) > 0 {
if err := json.Unmarshal(raw, &result); err != nil {
return zero, fmt.Errorf("failed to decode opts for channel %d: %w", channelID, err)
}
}

c.decoded[key] = result
return result, nil
}
Loading
Loading