Skip to content

Commit 8e06265

Browse files
authored
feat: enable EIM host labels propagation to downstream nodes. (#59)
1 parent d495de5 commit 8e06265

File tree

13 files changed

+767
-21
lines changed

13 files changed

+767
-21
lines changed

VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
2.1.1
1+
2.1.2

cmd/cluster-manager/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ func main() {
6161
os.Exit(4)
6262
}
6363

64-
inv, err := rest.GetInventory(config)
64+
inv, err := rest.GetInventory(config, k8sclient)
6565
if err != nil {
6666
slog.Error("failed to start inventory client", "error", err)
6767
os.Exit(7)

deployment/charts/cluster-manager/Chart.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,6 @@ type: application
1616
# This is the chart version. This version number should be incremented each time you make changes
1717
# to the chart and its templates, including the app version.
1818
# Versions are expected to follow Semantic Versioning (https://semver.org/)
19-
version: 2.1.1
20-
appVersion: 2.1.1
19+
version: 2.1.2
20+
appVersion: 2.1.2
2121
annotations: {}

deployment/charts/cluster-template-crd/Chart.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,6 @@ apiVersion: v2
66
name: cluster-template-crd
77
description: A Helm chart for the ClusterTemplate CRD
88
type: application
9-
version: 2.1.1
10-
appVersion: 2.1.1
9+
version: 2.1.2
10+
appVersion: 2.1.2
1111
annotations: {}

internal/events/events.go

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
// SPDX-FileCopyrightText: (C) 2025 Intel Corporation
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
// Package events provides a framework for defining, handling, and processing asynchronous events.
5+
// It defines an Event interface for event handling, a base implementation for common functionality,
6+
// and utilities for event processing with context-aware cancellation and timeout support.
7+
// The package is designed to facilitate event-driven architectures by enabling decoupled event
8+
// processing and result reporting via channels.
9+
package events
10+
11+
import (
12+
"context"
13+
"log/slog"
14+
"time"
15+
)
16+
17+
// EventTimeout defines the default timeout for event handling responses
18+
const EventTimeout = 3 * time.Second
19+
20+
// Event is an interface that defines methods to handle events
21+
type Event interface {
22+
// Handle processes the event and returns any error
23+
Handle(ctx context.Context) error
24+
25+
// Output returns a channel for sending results back to the caller
26+
Output() chan<- error
27+
}
28+
29+
// EventBase provides common functionality for events
30+
type EventBase struct {
31+
Out chan<- error // channel to send error back to the caller
32+
}
33+
34+
// Output returns the output channel for the event
35+
func (e EventBase) Output() chan<- error {
36+
return e.Out
37+
}
38+
39+
// NewSink creates a channel to receive events and starts a goroutine to process them
40+
func NewSink(ctx context.Context) chan<- Event {
41+
events := make(chan Event)
42+
43+
go processEvents(ctx, events)
44+
45+
return events
46+
}
47+
48+
// processEvents handles incoming events from the provided channel
49+
func processEvents(ctx context.Context, events <-chan Event) {
50+
slog.Debug("event sink started")
51+
52+
for {
53+
select {
54+
case <-ctx.Done():
55+
slog.Debug("event sink shutting down due to context cancellation")
56+
return
57+
case e, ok := <-events:
58+
if !ok {
59+
slog.Debug("event sink closed")
60+
return
61+
}
62+
63+
handleEvent(ctx, e)
64+
}
65+
}
66+
}
67+
68+
// handleEvent processes a single event and sends the result to the output channel
69+
func handleEvent(ctx context.Context, e Event) {
70+
err := e.Handle(ctx)
71+
if err != nil {
72+
slog.Error("failed to handle event", "event", e, "error", err)
73+
}
74+
75+
out := e.Output()
76+
if out == nil {
77+
return
78+
}
79+
80+
// Send result with timeout
81+
sendCtx, cancel := context.WithTimeout(ctx, EventTimeout)
82+
defer cancel()
83+
84+
select {
85+
case out <- err:
86+
// Successfully sent result
87+
case <-sendCtx.Done():
88+
slog.Error("event output channel timed out", "event", e, "timeout", EventTimeout)
89+
}
90+
}

internal/events/events_test.go

Lines changed: 239 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,239 @@
1+
// SPDX-FileCopyrightText: (C) 2025 Intel Corporation
2+
// SPDX-License-Identifier: Apache-2.0
3+
package events_test
4+
5+
import (
6+
"context"
7+
"errors"
8+
"log/slog"
9+
"sync"
10+
"testing"
11+
"time"
12+
13+
"github.com/stretchr/testify/require"
14+
15+
"github.com/open-edge-platform/cluster-manager/v2/internal/events"
16+
)
17+
18+
const paralellism = 10
19+
20+
// DummyEvent is a dummy implementation of the Event interface
21+
type DummyEvent struct {
22+
events.EventBase
23+
ID int // ID of the event
24+
}
25+
26+
// Handle processes the dummy event
27+
func (e DummyEvent) Handle(ctx context.Context) error {
28+
// Simulate some processing
29+
slog.Debug("handling dummy event", "ID", e.ID)
30+
time.Sleep(10 * time.Millisecond)
31+
32+
// Return nil to indicate success
33+
return nil
34+
}
35+
36+
// SlowEvent is a test event that takes a specified time to process
37+
type SlowEvent struct {
38+
events.EventBase
39+
ProcessTime time.Duration
40+
}
41+
42+
func (e SlowEvent) Handle(ctx context.Context) error {
43+
time.Sleep(e.ProcessTime)
44+
return nil
45+
}
46+
47+
// ErrorEvent is a test event that always returns a specific error
48+
type ErrorEvent struct {
49+
events.EventBase
50+
ErrorToReturn error
51+
}
52+
53+
func (e ErrorEvent) Handle(ctx context.Context) error {
54+
return e.ErrorToReturn
55+
}
56+
57+
func TestConcurrentEventHandling(t *testing.T) {
58+
ctx := context.Background()
59+
sink := events.NewSink(ctx)
60+
61+
// Create output channels to track results
62+
outputs := make([]chan error, paralellism)
63+
for i := range outputs {
64+
outputs[i] = make(chan error, 1)
65+
}
66+
67+
// Launch many events concurrently
68+
var wg sync.WaitGroup
69+
for i := range paralellism {
70+
wg.Add(1)
71+
go func() {
72+
defer wg.Done()
73+
event := DummyEvent{EventBase: events.EventBase{Out: outputs[i]}, ID: i}
74+
sink <- event
75+
}()
76+
}
77+
78+
// Wait for all events to be sent
79+
wg.Wait()
80+
81+
// Verify all events were processed
82+
for i, out := range outputs {
83+
select {
84+
case err := <-out:
85+
require.NoError(t, err, "Event %d failed", i)
86+
case <-time.After(100 * time.Millisecond):
87+
t.Errorf("Timeout waiting for event %d", i)
88+
}
89+
}
90+
91+
close(sink)
92+
}
93+
94+
func TestNilOutputChannel(t *testing.T) {
95+
ctx := context.Background()
96+
sink := events.NewSink(ctx)
97+
98+
// Send an event with a nil output channel
99+
event := DummyEvent{
100+
EventBase: events.EventBase{Out: nil},
101+
ID: 42,
102+
}
103+
104+
// This should not panic
105+
sink <- event
106+
107+
// Allow some time for processing
108+
time.Sleep(50 * time.Millisecond)
109+
110+
close(sink)
111+
// Test passes if no panic occurs
112+
}
113+
114+
func TestContextCancellation(t *testing.T) {
115+
// Create a context that we can cancel
116+
ctx, cancel := context.WithCancel(context.Background())
117+
sink := events.NewSink(ctx)
118+
119+
// Send a couple of events before cancellation
120+
outputs := make([]chan error, 2)
121+
for i := range outputs {
122+
outputs[i] = make(chan error, 1)
123+
event := DummyEvent{
124+
EventBase: events.EventBase{Out: outputs[i]},
125+
ID: i,
126+
}
127+
sink <- event
128+
}
129+
130+
// Verify these events are processed normally
131+
for i, out := range outputs {
132+
select {
133+
case err := <-out:
134+
require.NoError(t, err, "Event %d failed", i)
135+
case <-time.After(100 * time.Millisecond):
136+
t.Fatalf("Timeout waiting for event %d", i)
137+
}
138+
}
139+
140+
// Cancel the context
141+
cancel()
142+
143+
// Try to send one more event after cancellation
144+
outputAfterCancel := make(chan error, 1)
145+
afterCancelEvent := DummyEvent{
146+
EventBase: events.EventBase{Out: outputAfterCancel},
147+
ID: 999,
148+
}
149+
150+
// The event system should time out
151+
select {
152+
case sink <- afterCancelEvent:
153+
t.Fatal("Expected send to block after context cancellation")
154+
case <-time.After(100 * time.Millisecond):
155+
// This is expected - the event should not be processed
156+
}
157+
}
158+
159+
func TestMultipleSinks(t *testing.T) {
160+
// Create multiple sinks
161+
ctx := context.Background()
162+
sink1 := events.NewSink(ctx)
163+
sink2 := events.NewSink(ctx)
164+
165+
// Send events to both sinks
166+
out1 := make(chan error, 1)
167+
out2 := make(chan error, 1)
168+
169+
sink1 <- DummyEvent{EventBase: events.EventBase{Out: out1}, ID: 1}
170+
sink2 <- DummyEvent{EventBase: events.EventBase{Out: out2}, ID: 2}
171+
172+
// Verify both processed their events
173+
for i, out := range []chan error{out1, out2} {
174+
select {
175+
case err := <-out:
176+
require.NoError(t, err, "Sink %d event failed", i+1)
177+
case <-time.After(100 * time.Millisecond):
178+
t.Errorf("Timeout waiting for sink %d event", i+1)
179+
}
180+
}
181+
182+
close(sink1)
183+
close(sink2)
184+
}
185+
186+
func TestErrorPropagation(t *testing.T) {
187+
ctx := context.Background()
188+
sink := events.NewSink(ctx)
189+
190+
// Create an event that will return an error
191+
output := make(chan error, 1)
192+
event := ErrorEvent{
193+
EventBase: events.EventBase{Out: output},
194+
ErrorToReturn: errors.New("simulated error"),
195+
}
196+
197+
// Send the event
198+
sink <- event
199+
200+
// Verify the error is correctly propagated
201+
select {
202+
case err := <-output:
203+
require.Error(t, err)
204+
require.Equal(t, "simulated error", err.Error())
205+
case <-time.After(100 * time.Millisecond):
206+
t.Fatal("Timeout waiting for error to be propagated")
207+
}
208+
209+
close(sink)
210+
}
211+
212+
func TestGracefulShutdown(t *testing.T) {
213+
ctx := context.Background()
214+
sink := events.NewSink(ctx)
215+
216+
// Create an event that will take some time to process
217+
output := make(chan error, 1)
218+
event := SlowEvent{
219+
EventBase: events.EventBase{Out: output},
220+
ProcessTime: 200 * time.Millisecond,
221+
}
222+
223+
// Send the event
224+
sink <- event
225+
226+
// Wait a bit for processing to start
227+
time.Sleep(50 * time.Millisecond)
228+
229+
// Close the sink while event is processing
230+
close(sink)
231+
232+
// Verify the event still completes
233+
select {
234+
case err := <-output:
235+
require.NoError(t, err, "Event failed")
236+
case <-time.After(300 * time.Millisecond):
237+
t.Fatal("Timeout waiting for event to complete after sink closed")
238+
}
239+
}

0 commit comments

Comments
 (0)