From a359f8d321397cd0067f8533745f2fba2910538f Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Tue, 25 Mar 2025 15:45:00 +0000 Subject: [PATCH 1/6] upgrade-lint Signed-off-by: Doug Davis Signed-off-by: flc1125 --- .github/workflows/go-lint.yaml | 5 ++--- .golangci.yaml | 1 + v2/binding/buffering/copy_message.go | 7 ++++--- v2/binding/test/transformer.go | 8 +++++--- v2/event/extensions.go | 2 +- v2/protocol/http/message_test.go | 5 +++-- v2/protocol/http/protocol.go | 2 +- v2/protocol/http/utility_test.go | 5 +++-- 8 files changed, 20 insertions(+), 15 deletions(-) diff --git a/.github/workflows/go-lint.yaml b/.github/workflows/go-lint.yaml index 9e5b698d9..235a1726f 100644 --- a/.github/workflows/go-lint.yaml +++ b/.github/workflows/go-lint.yaml @@ -33,8 +33,7 @@ jobs: - name: Go Lint on ./v2 if: steps.golangci_configuration.outputs.files_exists == 'true' - uses: golangci/golangci-lint-action@55c2c1448f86e01eaae002a5a3a9624417608d84 # v6.5.2 + uses: golangci/golangci-lint-action@1481404843c368bc19ca9406f87d6e0fc97bdcfd # v7.0.0 with: - version: v1.61 + version: v2.0 working-directory: v2 - diff --git a/.golangci.yaml b/.golangci.yaml index 4ae1bfc86..d191c77aa 100644 --- a/.golangci.yaml +++ b/.golangci.yaml @@ -1,3 +1,4 @@ +version: "2" run: timeout: 5m diff --git a/v2/binding/buffering/copy_message.go b/v2/binding/buffering/copy_message.go index c704bb2aa..90dc29ad6 100644 --- a/v2/binding/buffering/copy_message.go +++ b/v2/binding/buffering/copy_message.go @@ -44,11 +44,12 @@ func CopyMessage(ctx context.Context, m binding.Message, transformers ...binding bm := binaryBufferedMessage{} encoding, err := binding.DirectWrite(ctx, m, &sm, &bm, transformers...) - if encoding == binding.EncodingStructured { + switch encoding { + case binding.EncodingStructured: return &sm, err - } else if encoding == binding.EncodingBinary { + case binding.EncodingBinary: return &bm, err - } else { + default: e, err := binding.ToEvent(ctx, m, transformers...) if err != nil { return nil, err diff --git a/v2/binding/test/transformer.go b/v2/binding/test/transformer.go index 325b114a4..13b0aaa70 100644 --- a/v2/binding/test/transformer.go +++ b/v2/binding/test/transformer.go @@ -45,15 +45,17 @@ func RunTransformerTests(t *testing.T, ctx context.Context, tests []TransformerT require.NoError(t, err) var e *event.Event - if enc == binding.EncodingStructured { + switch enc { + case binding.EncodingStructured: e, err = binding.ToEvent(ctx, &mockStructured) require.NoError(t, err) - } else if enc == binding.EncodingBinary { + case binding.EncodingBinary: e, err = binding.ToEvent(ctx, &mockBinary) require.NoError(t, err) - } else { + default: t.Fatalf("Unexpected encoding %v", enc) } + require.NoError(t, err) if tt.AssertFunc != nil { tt.AssertFunc(t, *e) diff --git a/v2/event/extensions.go b/v2/event/extensions.go index 72d0e757a..87e5de572 100644 --- a/v2/event/extensions.go +++ b/v2/event/extensions.go @@ -49,7 +49,7 @@ func validateExtensionName(key string) error { } for _, c := range key { - if !((c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') || (c >= '0' && c <= '9')) { + if (c < 'a' || c > 'z') && (c < 'A' || c > 'Z') && (c < '0' || c > '9') { return errors.New("bad key, CloudEvents attribute names MUST consist of lower-case letters ('a' to 'z'), upper-case letters ('A' to 'Z') or digits ('0' to '9') from the ASCII character set") } } diff --git a/v2/protocol/http/message_test.go b/v2/protocol/http/message_test.go index 2833adeee..5d4aa211b 100644 --- a/v2/protocol/http/message_test.go +++ b/v2/protocol/http/message_test.go @@ -39,9 +39,10 @@ func TestNewMessageFromHttpRequest(t *testing.T) { test.EachEvent(t, test.Events(), func(t *testing.T, eventIn event.Event) { t.Run(tt.name, func(t *testing.T) { ctx := context.TODO() - if tt.encoding == binding.EncodingStructured { + switch tt.encoding { + case binding.EncodingStructured: ctx = binding.WithForceStructured(ctx) - } else if tt.encoding == binding.EncodingBinary { + case binding.EncodingBinary: ctx = binding.WithForceBinary(ctx) } diff --git a/v2/protocol/http/protocol.go b/v2/protocol/http/protocol.go index 18bd604a6..ed76c7dc5 100644 --- a/v2/protocol/http/protocol.go +++ b/v2/protocol/http/protocol.go @@ -326,7 +326,7 @@ func (p *Protocol) ServeHTTP(rw http.ResponseWriter, req *http.Request) { if !ok { rw.Header().Add("Retry-After", strconv.Itoa(int(reset))) - http.Error(rw, "limit exceeded", 429) + http.Error(rw, "limit exceeded", http.StatusTooManyRequests) return } diff --git a/v2/protocol/http/utility_test.go b/v2/protocol/http/utility_test.go index 39a50a662..de4313251 100644 --- a/v2/protocol/http/utility_test.go +++ b/v2/protocol/http/utility_test.go @@ -38,9 +38,10 @@ func TestNewEventFromHttpRequest(t *testing.T) { test.EachEvent(t, test.Events(), func(t *testing.T, eventIn event.Event) { t.Run(tt.name, func(t *testing.T) { ctx := context.TODO() - if tt.encoding == binding.EncodingStructured { + switch tt.encoding { + case binding.EncodingStructured: ctx = binding.WithForceStructured(ctx) - } else if tt.encoding == binding.EncodingBinary { + case binding.EncodingBinary: ctx = binding.WithForceBinary(ctx) } From 77253b19598490cfe6598cf854dcd25d7e6d9863 Mon Sep 17 00:00:00 2001 From: flc1125 Date: Fri, 11 Apr 2025 17:10:49 +0800 Subject: [PATCH 2/6] feat(client): add parallel execution control for receiver callbacks Signed-off-by: flc1125 --- v2/client/client.go | 18 +++++++++++++++--- v2/client/options.go | 14 +++++++++++--- 2 files changed, 26 insertions(+), 6 deletions(-) diff --git a/v2/client/client.go b/v2/client/client.go index 80051b95c..c2311a904 100644 --- a/v2/client/client.go +++ b/v2/client/client.go @@ -100,7 +100,7 @@ type ceClient struct { receiverMu sync.Mutex eventDefaulterFns []EventDefaulter pollGoroutines int - blockingCallback bool + parallelGoroutines int ackMalformedEvent bool } @@ -238,6 +238,10 @@ func (c *ceClient) StartReceiver(ctx context.Context, fn interface{}) error { wg.Add(1) go func() { defer wg.Done() + var parallel chan struct{} + if c.parallelGoroutines > 0 { + parallel = make(chan struct{}, c.parallelGoroutines) + } for { var msg binding.Message var respFn protocol.ResponseFn @@ -265,8 +269,16 @@ func (c *ceClient) StartReceiver(ctx context.Context, fn interface{}) error { } } - if c.blockingCallback { - callback() + if parallel != nil { + wg.Add(1) + parallel <- struct{}{} + go func() { + defer func() { + <-parallel + wg.Done() + }() + callback() + }() } else { // Do not block on the invoker. wg.Add(1) diff --git a/v2/client/options.go b/v2/client/options.go index 44394be34..06e580af9 100644 --- a/v2/client/options.go +++ b/v2/client/options.go @@ -119,15 +119,23 @@ func WithInboundContextDecorator(dec func(context.Context, binding.Message) cont // i.e. in each poll go routine, the next event will not be received until the callback on current event completes. // To make event processing serialized (no concurrency), use this option along with WithPollGoroutines(1) func WithBlockingCallback() Option { + return WithParallelGoroutines(1) +} + +// WithParallelGoroutines enables the callback function of the passed-in StartReceiver to execute asynchronously +// with a fixed number of goroutines. +func WithParallelGoroutines(num int) Option { return func(i interface{}) error { - if c, ok := i.(*ceClient); ok { - c.blockingCallback = true + if num > 0 { + if c, ok := i.(*ceClient); ok { + c.parallelGoroutines = num + } } return nil } } -// WithAckMalformedevents causes malformed events received within StartReceiver to be acknowledged +// WithAckMalformedEvent causes malformed events received within StartReceiver to be acknowledged // rather than being permanently not-acknowledged. This can be useful when a protocol does not // provide a responder implementation and would otherwise cause the receiver to be partially or // fully stuck. From f4a396f57a4b8694e245587a15d6ccad33e6aa4e Mon Sep 17 00:00:00 2001 From: flc1125 Date: Fri, 11 Apr 2025 17:17:00 +0800 Subject: [PATCH 3/6] feat(client): add parallel execution control for receiver callbacks Signed-off-by: flc1125 --- v2/client/options.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/v2/client/options.go b/v2/client/options.go index 06e580af9..608d78d6b 100644 --- a/v2/client/options.go +++ b/v2/client/options.go @@ -135,7 +135,7 @@ func WithParallelGoroutines(num int) Option { } } -// WithAckMalformedEvent causes malformed events received within StartReceiver to be acknowledged +// WithAckMalformedEvents causes malformed events received within StartReceiver to be acknowledged // rather than being permanently not-acknowledged. This can be useful when a protocol does not // provide a responder implementation and would otherwise cause the receiver to be partially or // fully stuck. From a9877b8363020a43984caf10f8a9e98b2052d908 Mon Sep 17 00:00:00 2001 From: flc1125 Date: Fri, 11 Apr 2025 17:17:23 +0800 Subject: [PATCH 4/6] feat(client): add parallel execution control for receiver callbacks Signed-off-by: flc1125 --- v2/client/options.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/v2/client/options.go b/v2/client/options.go index 608d78d6b..d1dd35727 100644 --- a/v2/client/options.go +++ b/v2/client/options.go @@ -135,7 +135,7 @@ func WithParallelGoroutines(num int) Option { } } -// WithAckMalformedEvents causes malformed events received within StartReceiver to be acknowledged +// WithAckMalformedevents causes malformed events received within StartReceiver to be acknowledged // rather than being permanently not-acknowledged. This can be useful when a protocol does not // provide a responder implementation and would otherwise cause the receiver to be partially or // fully stuck. From 89496ded9868228aa6341afc9d15eed0790daf56 Mon Sep 17 00:00:00 2001 From: flc1125 Date: Fri, 11 Apr 2025 22:39:02 +0800 Subject: [PATCH 5/6] fix(options): validate number of parallel goroutines in WithParallelGoroutines Signed-off-by: Flc --- v2/client/options.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/v2/client/options.go b/v2/client/options.go index d1dd35727..4f578418f 100644 --- a/v2/client/options.go +++ b/v2/client/options.go @@ -7,6 +7,7 @@ package client import ( "context" + "errors" "fmt" "github.com/cloudevents/sdk-go/v2/binding" @@ -126,11 +127,14 @@ func WithBlockingCallback() Option { // with a fixed number of goroutines. func WithParallelGoroutines(num int) Option { return func(i interface{}) error { - if num > 0 { - if c, ok := i.(*ceClient); ok { - c.parallelGoroutines = num - } + if num <= 0 { + return errors.New("number of parallel goroutines must be greater than 0") } + + if c, ok := i.(*ceClient); ok { + c.parallelGoroutines = num + } + return nil } } From 0c61762376f58bd337ec520cb87a6e24867af5c1 Mon Sep 17 00:00:00 2001 From: Flc Date: Mon, 14 Apr 2025 21:24:32 +0800 Subject: [PATCH 6/6] revert: restore blocking configuration and functionality Signed-off-by: Flc --- v2/client/client.go | 24 +++++++++++++++++------- v2/client/options.go | 8 +++++++- 2 files changed, 24 insertions(+), 8 deletions(-) diff --git a/v2/client/client.go b/v2/client/client.go index c2311a904..5a7e3380b 100644 --- a/v2/client/client.go +++ b/v2/client/client.go @@ -100,6 +100,7 @@ type ceClient struct { receiverMu sync.Mutex eventDefaulterFns []EventDefaulter pollGoroutines int + blockingCallback bool parallelGoroutines int ackMalformedEvent bool } @@ -269,6 +270,14 @@ func (c *ceClient) StartReceiver(ctx context.Context, fn interface{}) error { } } + // if the blockingCallback option is set, we need to wait for the callback to finish + if c.blockingCallback { + // Wait for the callback to finish before receiving the next message. + callback() + continue + } + + // if the parallelGoroutines option is set, we need to limit the number of goroutines if parallel != nil { wg.Add(1) parallel <- struct{}{} @@ -279,14 +288,15 @@ func (c *ceClient) StartReceiver(ctx context.Context, fn interface{}) error { }() callback() }() - } else { - // Do not block on the invoker. - wg.Add(1) - go func() { - defer wg.Done() - callback() - }() + continue } + + // otherwise, we can just call the callback directly in a new goroutine + wg.Add(1) + go func() { + defer wg.Done() + callback() + }() } }() } diff --git a/v2/client/options.go b/v2/client/options.go index 4f578418f..37d46ce8f 100644 --- a/v2/client/options.go +++ b/v2/client/options.go @@ -120,11 +120,17 @@ func WithInboundContextDecorator(dec func(context.Context, binding.Message) cont // i.e. in each poll go routine, the next event will not be received until the callback on current event completes. // To make event processing serialized (no concurrency), use this option along with WithPollGoroutines(1) func WithBlockingCallback() Option { - return WithParallelGoroutines(1) + return func(i interface{}) error { + if c, ok := i.(*ceClient); ok { + c.blockingCallback = true + } + return nil + } } // WithParallelGoroutines enables the callback function of the passed-in StartReceiver to execute asynchronously // with a fixed number of goroutines. +// WithBlockingCallback takes precedence over this configuration. func WithParallelGoroutines(num int) Option { return func(i interface{}) error { if num <= 0 {