init kafka job dispatch #701
base: main
Conversation
Walkthrough
This PR introduces an event-driven job creation pattern by adding an EventProducer interface, threading it through workspace initialization and Kafka consumer processing, replacing the old Executor pattern with a new JobCreator, and updating the Store to track workspace IDs. In-memory event infrastructure is added for testing.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant KafkaConsumer as Kafka<br/>Consumer
    participant EventHandler as Event<br/>Handler
    participant JobCreator as Job<br/>Creator
    participant Store as Store
    participant EventProducer as Event<br/>Producer
    KafkaConsumer->>EventHandler: ListenAndRoute(RawEvent)
    activate EventHandler
    EventHandler->>EventHandler: Match event type<br/>(JobCreate)
    EventHandler->>JobCreator: HandleJobCreated(workspace, event)
    activate JobCreator
    JobCreator->>Store: Upsert Release
    JobCreator->>Store: Create Job
    JobCreator->>EventProducer: ProduceEvent(job.created)
    activate EventProducer
    EventProducer->>EventProducer: Queue & Process<br/>message
    deactivate EventProducer
    JobCreator->>JobCreator: Dispatch Job<br/>(if applicable)
    deactivate JobCreator
    deactivate EventHandler
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
Pre-merge checks and finishing touches
❌ Failed checks (1 warning, 1 inconclusive)
✅ Passed checks (1 passed)
✨ Finishing touches
📊 DB Package Test Coverage
pkg/db coverage:
Actionable comments posted: 22
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (7)
apps/workspace-engine/pkg/workspace/store/environments_bench_test.go (2)
125-129: Syntax bug: invalid for loop. for range b.N is only valid on Go 1.22+; on older toolchains, Go benchmarks must use for i := 0; i < b.N; i++ { … }.
Apply:
- // Run benchmark
- for range b.N {
+ // Run benchmark
+ for i := 0; i < b.N; i++ {
 	ctx := context.Background()
 	_, err := recomputeFunc(ctx)
 	if err != nil {
 		b.Fatalf("Recompute failed: %v", err)
 	}
 }
323-333: Syntax bug: testing.B has no Loop() before Go 1.24. Replace with the standard loop.
- // Run benchmark
- for b.Loop() {
+ // Run benchmark
+ for i := 0; i < b.N; i++ {
 	ctx := context.Background()
 	_, err := recomputeFunc(ctx)
 	if err != nil {
 		b.Fatalf("Recompute failed: %v", err)
 	}
 }
apps/workspace-engine/pkg/ticker/ticker_test.go (1)
63-68: Add a workspace cleanup function to the package.
The review comment is correct. The global workspaces registry (line 218 of workspace.go) persists across tests, and there is no cleanup API (Delete, Remove, Reset, or Clear). Without cleanup, TestEmitTicks_NoWorkspaces will fail if run after TestEmitTicks_MultipleWorkspaces registers workspaces, causing test flakiness.
Add an exported cleanup function to apps/workspace-engine/pkg/workspace/workspace.go (e.g., Delete(id string) or Clear() to wipe all workspaces), then use it in the test's defer block:
defer func() {
	workspace.Delete("ws-1")
	workspace.Delete("ws-2")
	workspace.Delete("ws-3")
}()
Or, if adding a Clear() function for all workspaces:
defer workspace.Clear()
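One possible shape for that cleanup API (a sketch only; it assumes the package-level workspaces registry exposes Delete and Range, as sync.Map-style concurrent maps do):

```go
// Sketch in pkg/workspace: assumes the global `workspaces` registry
// supports Delete and Range; adjust to the registry's real API.

// Delete removes a single workspace from the global registry.
func Delete(id string) {
	workspaces.Delete(id)
}

// Clear wipes every registered workspace; intended for test teardown.
func Clear() {
	workspaces.Range(func(id string, _ *Workspace) bool {
		workspaces.Delete(id)
		return true
	})
}
```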
apps/workspace-engine/test/e2e/engine_deployment_test.go (1)
485-492: Benchmark won't compile on older toolchains, and the warmup uses the wrong event type.
- for range 100 is only valid on Go 1.22+.
- b.Loop() only exists on Go 1.24+; use b.N.
- Warmup produces DeploymentCreate with a Resource.
Fix:
  ctx := context.Background()
- for range 100 {
- 	engine.PushEvent(ctx, handler.DeploymentCreate, c.NewResource(workspaceID))
- }
+ for i := 0; i < 100; i++ {
+ 	engine.PushEvent(ctx, handler.DeploymentCreate, c.NewDeployment(workspaceID))
+ }
  b.ResetTimer()
- for b.Loop() {
- 	engine.PushEvent(ctx, handler.DeploymentCreate, c.NewDeployment(workspaceID))
- }
+ for i := 0; i < b.N; i++ {
+ 	engine.PushEvent(ctx, handler.DeploymentCreate, c.NewDeployment(workspaceID))
+ }
apps/workspace-engine/pkg/kafka/kafka.go (1)
130-137: Workspace ID logging should be string, not []byte.
You log msg.Key directly (a []byte). Cast to string for readability and to avoid odd logs.
Apply:
- if ws == nil {
- 	log.Error("Workspace not found", "workspaceID", msg.Key)
- 	continue
- }
+ if ws == nil {
+ 	log.Error("Workspace not found", "workspaceID", string(msg.Key))
+ 	continue
+ }
apps/workspace-engine/pkg/events/handler/handler.go (2)
127-134: Avoid attaching the full message payload as a tracing attribute.
The "event.data" attribute carries all of msg.Value, which can be large and may contain sensitive data. Prefer hashing or a size-limited snippet.
-attribute.String("event.data", string(msg.Value)),
+attribute.String("event.data.sha256", fmt.Sprintf("%x", sha256.Sum256(msg.Value))),
(Import crypto/sha256.)
146-151: Use Int64 for the timestamp attribute.
You're encoding Unix seconds into a Float64 attribute; use Int64 to avoid precision loss.
-attribute.Float64("event.timestamp", float64(msg.Timestamp.Unix())),
+attribute.Int64("event.timestamp", msg.Timestamp.Unix()),
🧹 Nitpick comments (21)
apps/workspace-engine/pkg/workspace/releasemanager/deployment/creator.go (2)
15-19: Avoid duplicating EventProducer and JobCreatedEventData types.
These also exist elsewhere (events and handler packages). Centralize them (e.g., in pkg/events) and import here to prevent divergence and circular deps later.
Would you like a follow-up PR to introduce pkg/events/types.go and wire usages?
Also applies to: 36-40, 71-81
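A rough sketch of what that shared file could hold, based only on the shapes shown in this PR (file name hypothetical, and subject to the circular-import caveat discussed under handler.go later in this review):

```go
// Hypothetical pkg/events/types.go: one home for the producer contract and
// the job.created payload currently duplicated across packages.
package events

import "workspace-engine/pkg/oapi"

// EventProducer is the single interface every package should import.
type EventProducer interface {
	ProduceEvent(eventType string, workspaceID string, data any) error
}

// JobCreatedEventData is the payload carried by job.created events.
type JobCreatedEventData struct {
	Job *oapi.Job `json:"job"`
}
```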
63-69: Telemetry nit: use a numeric attribute for counts.
Prefer attribute.Int for variables.count.
  span.SetAttributes(
  	attribute.Bool("job.created", true),
  	attribute.String("job.id", newJob.Id),
  	attribute.String("job.status", string(newJob.Status)),
  )
  ...
  _, span := tracer.Start(ctx, "BuildRelease",
  	trace.WithAttributes(
  		attribute.String("deployment.id", releaseTarget.DeploymentId),
  		attribute.String("environment.id", releaseTarget.EnvironmentId),
  		attribute.String("resource.id", releaseTarget.ResourceId),
  		attribute.String("version.id", version.Id),
  		attribute.String("version.tag", version.Tag),
- 		attribute.String("variables.count", fmt.Sprintf("%d", len(variables))),
+ 		attribute.Int("variables.count", len(variables)),
  	))
Also applies to: 95-104
apps/workspace-engine/test/e2e/engine_deployment_test.go (2)
149-163: Harden against nil deref and missing entity checks.
You dereference *d1.JobAgentId / *d2.JobAgentId and ignore the exists flag from Get. If these are nil/missing, tests panic. Prefer explicit assertions:
- d1, _ := engine.Workspace().Deployments().Get(deploymentID1)
- if *d1.JobAgentId != jobAgentID1 {
+ d1, ok := engine.Workspace().Deployments().Get(deploymentID1)
+ require.True(t, ok, "deployment 1 not found")
+ require.NotNil(t, d1.JobAgentId, "deployment 1 job agent should be set")
+ if *d1.JobAgentId != jobAgentID1 {
  ...
- d2, _ := engine.Workspace().Deployments().Get(deploymentID2)
- if *d2.JobAgentId != jobAgentID2 {
+ d2, ok := engine.Workspace().Deployments().Get(deploymentID2)
+ require.True(t, ok, "deployment 2 not found")
+ require.NotNil(t, d2.JobAgentId, "deployment 2 job agent should be set")
+ if *d2.JobAgentId != jobAgentID2 {
  ...
- d3, _ := engine.Workspace().Deployments().Get(deploymentID3)
- if d3.JobAgentId != nil {
+ d3, ok := engine.Workspace().Deployments().Get(deploymentID3)
+ require.True(t, ok, "deployment 3 not found")
+ if d3.JobAgentId != nil {
43-52: Assert entity existence to avoid nil access; fix the error message.
Use the boolean from Get and avoid the ambiguous "same id" message:
-engineD1, _ := engine.Workspace().Deployments().Get(deploymentID1)
-engineD2, _ := engine.Workspace().Deployments().Get(deploymentID2)
+engineD1, ok := engine.Workspace().Deployments().Get(deploymentID1)
+require.True(t, ok, "deployment %s not found", deploymentID1)
+engineD2, ok := engine.Workspace().Deployments().Get(deploymentID2)
+require.True(t, ok, "deployment %s not found", deploymentID2)
-if engineD1.Id != deploymentID1 {
-	t.Fatalf("deployments have the same id")
+if engineD1.Id != deploymentID1 {
+	t.Fatalf("deployment 1 id mismatch: got %s, want %s", engineD1.Id, deploymentID1)
 }
-if engineD2.Id != deploymentID2 {
-	t.Fatalf("deployments have the same id")
+if engineD2.Id != deploymentID2 {
+	t.Fatalf("deployment 2 id mismatch: got %s, want %s", engineD2.Id, deploymentID2)
 }
apps/workspace-engine/pkg/workspace/releasemanager/targets/targets_manager_test.go (1)
22-31: Align the Store workspace ID with entities to avoid hidden mismatches.
You instantiate store.New("test-workspace") but create entities under a different workspaceID. If any logic starts using Store.WorkspaceID(), tests may produce inconsistent results.
Use the same ID:
-func setupTestStore(t *testing.T) (*store.Store, string, string, string, string) {
-	ctx := context.Background()
-	st := store.New("test-workspace")
-
-	workspaceID := uuid.New().String()
+func setupTestStore(t *testing.T) (*store.Store, string, string, string, string) {
+	ctx := context.Background()
+	workspaceID := uuid.New().String()
+	st := store.New(workspaceID)
apps/workspace-engine/pkg/server/openapi/utils/utils.go (1)
21-23: Prefer DI for EventProducer.
Thread a server-scoped producer into this helper rather than passing nil, so write flows can be enabled later without refactoring.
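One possible shape for the injection (sketch only; WorkspaceLoader and its constructor are hypothetical names):

```go
// Hypothetical server-scoped loader so OpenAPI handlers never pass nil.
type WorkspaceLoader struct {
	producer workspace.EventProducer
}

func NewWorkspaceLoader(p workspace.EventProducer) *WorkspaceLoader {
	return &WorkspaceLoader{producer: p}
}

// Load threads the injected producer into every workspace lookup.
func (l *WorkspaceLoader) Load(id string) (*workspace.Workspace, error) {
	return workspace.GetWorkspaceAndLoad(id, l.producer)
}
```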
apps/workspace-engine/test/e2e/engine_workspace_disk_persistence_test.go (1)
737-740: Handle time zone load errors to avoid panics/skewed assertions.
time.LoadLocation errors are ignored. On minimal images, these calls can fail.
-estLoc, _ := time.LoadLocation("America/New_York")
-pstLoc, _ := time.LoadLocation("America/Los_Angeles")
+estLoc, err := time.LoadLocation("America/New_York")
+require.NoError(t, err)
+pstLoc, err := time.LoadLocation("America/Los_Angeles")
+require.NoError(t, err)
Optionally fall back to time.UTC if strictness isn't required.
apps/workspace-engine/pkg/workspace/store/store.go (1)
72-74: WorkspaceID isn’t serialized; ensure it’s set after decode.
GobEncode/Decode only handle the repo. If a Store is decoded from bytes without being constructed via New(id), workspaceID remains empty.
- Verify that all decode paths create the Store with New(id) (as in workspace-level encode/decode), or
- Persist and restore workspaceID (mind gob compatibility), or
- Set workspaceID post-decode from the owning workspace.
If you want to persist it now without breaking existing blobs, consider keeping the current format and explicitly setting s.workspaceID from the caller after GobDecode.
Also applies to: 80-131
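A minimal sketch of that post-decode option (SetWorkspaceID is a hypothetical setter in package store; the caller-side field names are assumptions):

```go
// Hypothetical setter in package store: restores the unexported workspaceID
// after gob decoding without changing the serialized format.
func (s *Store) SetWorkspaceID(id string) {
	s.workspaceID = id
}

// Caller side, after decoding the store blob (field names assumed):
//   if err := w.store.GobDecode(blob); err != nil { return err }
//   w.store.SetWorkspaceID(w.ID)
```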
apps/workspace-engine/pkg/workspace/releasemanager/manager.go (1)
127-134: Direct job mutation may bypass store invariants.
Cancelling jobs by directly setting fields (Status/UpdatedAt) on pointers returned from the store risks breaking indices or derived views if the store maintains secondary state. Prefer a store method (e.g., Jobs.Cancel(job)) or an Upsert that also updates indexes and emits change notifications.
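A sketch of the suggested store-owned cancel (the Cancel method is hypothetical; it assumes Jobs.Upsert already re-indexes and emits change notifications):

```go
// Hypothetical store-level cancel: keeps index maintenance and change
// notification inside the store rather than at call sites.
func (j *Jobs) Cancel(ctx context.Context, job *oapi.Job) {
	job.Status = oapi.Cancelled
	job.UpdatedAt = time.Now()
	j.Upsert(ctx, job) // assumed to re-index and emit change events
}
```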
apps/workspace-engine/test/integration/workspace.go (2)
49-58: Eliminate the race on eventListener init in the handler closure.
processEvents starts immediately; the first event could hit memoryEventHandler before eventListener is assigned. You guard with an error, but that drops the event. Gate handler execution on a ready barrier.
@@
- var eventListener *handler.EventListener
- memoryEventHandler := func(ctx context.Context, msg *kafka.Message, offsetTracker handler.OffsetTracker) error {
- 	if eventListener == nil {
- 		return fmt.Errorf("event listener not initialized")
- 	}
- 	_, err := eventListener.ListenAndRoute(ctx, msg, offsetTracker)
- 	return err
- }
+ var eventListener *handler.EventListener
+ ready := make(chan struct{})
+ memoryEventHandler := func(ctx context.Context, msg *kafka.Message, offsetTracker handler.OffsetTracker) error {
+ 	<-ready
+ 	_, err := eventListener.ListenAndRoute(ctx, msg, offsetTracker)
+ 	return err
+ }
@@
- eventProducer := events.NewInMemoryProducer(ctx, memoryEventHandler)
- eventListener = events.NewEventHandler(eventProducer)
+ eventProducer := events.NewInMemoryProducer(ctx, memoryEventHandler)
+ eventListener = events.NewEventHandler(eventProducer)
+ close(ready)
162-174: Auto-flush on test cleanup to reduce flakiness.
Add a Cleanup to flush pending events/materializations at the end of each test.
@@ func NewTestWorkspace(
 	}
 	return tw
 }
+
+// In NewTestWorkspace, before 'return tw':
+	t.Cleanup(func() {
+		tw.Flush()
+	})
apps/workspace-engine/test/integration/kafka/kafka.go (1)
23-36: Default topic fallback for robustness.
If msg.TopicPartition.Topic is nil, set it to the mock's default topic to match typical producer behavior and avoid nil derefs in consumers.
 func (p *ProducerMock) Produce(msg *kafka.Message, deliveryChan chan kafka.Event) error {
-	p.queue.Push(msg)
+	if msg.TopicPartition.Topic == nil {
+		msg.TopicPartition.Topic = &p.topic
+	}
+	p.queue.Push(msg)
@@
 	if deliveryChan != nil {
 		go func(m *kafka.Message, ch chan kafka.Event) {
 			cp := *m
 			cp.TopicPartition.Error = nil
 			ch <- &cp
 		}(msg, deliveryChan)
 	}
 	return nil
 }
apps/workspace-engine/pkg/workspace/workspace.go (1)
233-245: Optionally upgrade an existing workspace with a newly provided producer.
If a workspace already exists without a producer and a later call supplies one, consider wiring it in and rebuilding the ReleaseManager to enable job creation without a restart.
 func GetWorkspaceAndLoad(id string, eventProducer EventProducer) (*Workspace, error) {
 	workspace, _ := workspaces.Get(id)
 	if workspace == nil {
 		workspace, err := NewAndLoad(context.Background(), id, eventProducer)
@@
 		return workspace, err
 	}
-
+	if eventProducer != nil && workspace.eventProducer == nil {
+		workspace.eventProducer = eventProducer
+		workspace.releasemanager = releasemanager.New(workspace.store, eventProducer)
+	}
 	return workspace, nil
 }
@@
 func GetNoFlushWorkspace(id string, eventProducer EventProducer) *Workspace {
 	workspace, _ := workspaces.Get(id)
 	if workspace == nil {
 		workspace = NewNoFlush(id, eventProducer)
 		workspaces.Set(id, workspace)
-
-		return workspace
-	}
-
-	return workspace
+		return workspace
+	}
+	if eventProducer != nil && workspace.eventProducer == nil {
+		workspace.eventProducer = eventProducer
+		workspace.releasemanager = releasemanager.New(workspace.store, eventProducer)
+	}
+	return workspace
 }
Also applies to: 247-257
apps/workspace-engine/pkg/workspace/releasemanager/manager_test.go (2)
127-183: Workspace ID mismatch in fixtures (nit).
You initialize the store with "test-workspace" but create entities with a different random workspaceID. Not harmful for these tests, but confusing.
Either use the store’s workspaceID for entities or add a brief comment explaining the divergence.
589-622: Concurrent access test: reduce flakiness.
Reliance on time.Sleep can be flaky. Prefer waiting on a condition (event count increases) with a timeout loop, as in the sketch below.
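A minimal polling helper for that pattern (sketch; the GetEvents accessor is whatever the mock producer exposes):

```go
// waitFor polls cond until it returns true or the timeout elapses.
func waitFor(t *testing.T, timeout time.Duration, cond func() bool) {
	t.Helper()
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		if cond() {
			return
		}
		time.Sleep(10 * time.Millisecond)
	}
	t.Fatal("condition not met before timeout")
}

// Usage in the concurrency test, replacing the fixed sleep:
//   before := len(producer.GetEvents())
//   ... trigger concurrent work ...
//   waitFor(t, 2*time.Second, func() bool { return len(producer.GetEvents()) > before })
```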
apps/workspace-engine/test/integration/kafka/consumer.go (1)
7-9: Drop the testify/mock dependency (unused).
The embedded *mock.Mock isn't used. Removing it simplifies the mock and its dependencies.
-	"github.com/stretchr/testify/mock"
@@
-type ConsumerMock struct {
-	*mock.Mock
+type ConsumerMock struct {
Also applies to: 14-15
apps/workspace-engine/pkg/workspace/releasemanager/deployment/creator_test.go (2)
20-25: Unify and harden the MockEventProducer used in tests.
There's a second MockEventProducer here differing from the one in manager_test.go (no mutex, fewer features). This duplication invites drift.
- Extract a single test helper (e.g., apps/workspace-engine/test/helpers/mock_event_producer.go) used by both packages.
- Add a mutex to make it safe if tests become concurrent.
Example minimal hardening:
 type MockEventProducer struct {
-	events []MockEvent
+	mu     sync.Mutex
+	events []MockEvent
 	shouldError bool
 	errorMsg    string
 }
@@
 func (m *MockEventProducer) ProduceEvent(eventType string, workspaceID string, data any) error {
+	m.mu.Lock()
+	defer m.mu.Unlock()
 	if m.shouldError {
 		return fmt.Errorf("%s", m.errorMsg)
 	}
 	m.events = append(m.events, MockEvent{ ... })
 	return nil
 }
@@
 func (m *MockEventProducer) GetEvents() []MockEvent {
-	return append([]MockEvent{}, m.events...)
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	return append([]MockEvent{}, m.events...)
 }
Also applies to: 35-41, 58-79
532-545: Strengthen the variable cloning assertion.
The current test only checks presence. Verify immutability by comparing values before/after mutating the original.
-assert.NotNil(t, release.Variables["key1"])
+before := release.Variables["key1"]
+_ = originalValue.FromStringValue("modified")
+after := release.Variables["key1"]
+assert.Equal(t, before, after, "release variable should not change when original is mutated")
apps/workspace-engine/pkg/events/producer.go (3)
13-17: Stronger type-safety for the event type.
Prefer handler.EventType over string in the EventProducer API to catch invalid event names at compile time and reduce casting at call sites. Consider:
type EventProducer interface {
	ProduceEvent(eventType handler.EventType, workspaceID string, data any) error
}
35-50: Channel capacity should be configurable.
A buffer of 1000 may be insufficient when handlers emit cascading events; there is a deadlock risk if the buffer fills while processEvents is executing a handler. Expose capacity via a constructor arg or option (e.g., WithQueueSize(n)).
Do cascades in your tests ever exceed 1000 messages?
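A functional-option sketch for that knob (Option and WithQueueSize are hypothetical; the queue is assumed to be a buffered channel of *kafka.Message):

```go
// Option configures the in-memory producer before it starts draining.
type Option func(p *InMemoryProducer)

// WithQueueSize overrides the default buffer of 1000 queued messages.
func WithQueueSize(n int) Option {
	return func(p *InMemoryProducer) {
		p.queue = make(chan *kafka.Message, n) // field name assumed
	}
}
```

NewInMemoryProducer would then accept variadic opts and apply them before spawning processEvents.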
22-31: Cascading events risk filling the buffer (potential deadlock).
The handler runs on the same goroutine that drains the queue; if a handler emits more than cap(queue) additional events before returning, ProduceEvent will block and the system can deadlock. Options (see the sketch after this list):
- Increase capacity and make it configurable.
- Provide a non-blocking TryProduceEvent for tests that can tolerate backpressure.
- Use a small worker pool to decouple handler execution from queue draining (at the cost of strict ordering).
Do any handlers emit long cascades (O(1k)+) in your tests?
Also applies to: 41-42, 52-70
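A sketch of the non-blocking variant (TryProduceEvent is hypothetical; the queue field and the simplified message envelope are assumptions):

```go
// TryProduceEvent enqueues when buffer space is available and reports
// backpressure otherwise, so a cascading handler fails fast instead of
// deadlocking. Envelope construction here is simplified.
func (p *InMemoryProducer) TryProduceEvent(eventType string, workspaceID string, data any) error {
	value, err := json.Marshal(data)
	if err != nil {
		return err
	}
	msg := &kafka.Message{Key: []byte(workspaceID), Value: value}
	select {
	case p.queue <- msg:
		return nil
	default:
		return fmt.Errorf("event queue full (cap %d); rejected %s", cap(p.queue), eventType)
	}
}
```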
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (32)
- apps/workspace-engine/main.go (1 hunks)
- apps/workspace-engine/pkg/db/jobs_test.go (15 hunks)
- apps/workspace-engine/pkg/events/events.go (2 hunks)
- apps/workspace-engine/pkg/events/handler/handler.go (3 hunks)
- apps/workspace-engine/pkg/events/handler/jobs/jobs.go (1 hunks)
- apps/workspace-engine/pkg/events/producer.go (1 hunks)
- apps/workspace-engine/pkg/kafka/kafka.go (4 hunks)
- apps/workspace-engine/pkg/server/openapi/utils/utils.go (1 hunks)
- apps/workspace-engine/pkg/ticker/ticker_test.go (1 hunks)
- apps/workspace-engine/pkg/workspace/jobdispatch/github_test.go (4 hunks)
- apps/workspace-engine/pkg/workspace/releasemanager/deployment/creator.go (1 hunks)
- apps/workspace-engine/pkg/workspace/releasemanager/deployment/creator_test.go (1 hunks)
- apps/workspace-engine/pkg/workspace/releasemanager/deployment/executor.go (0 hunks)
- apps/workspace-engine/pkg/workspace/releasemanager/manager.go (4 hunks)
- apps/workspace-engine/pkg/workspace/releasemanager/manager_test.go (1 hunks)
- apps/workspace-engine/pkg/workspace/releasemanager/policy/evaluator/approval/approval_test.go (3 hunks)
- apps/workspace-engine/pkg/workspace/releasemanager/policy/evaluator/environmentprogression/environmentprogression_test.go (1 hunks)
- apps/workspace-engine/pkg/workspace/releasemanager/policy/evaluator/skipdeployed/skipdeployed_test.go (1 hunks)
- apps/workspace-engine/pkg/workspace/releasemanager/targets/taint_test.go (13 hunks)
- apps/workspace-engine/pkg/workspace/releasemanager/targets/targets_manager_test.go (7 hunks)
- apps/workspace-engine/pkg/workspace/releasemanager/variables/variablemanager_test.go (2 hunks)
- apps/workspace-engine/pkg/workspace/store/environments_bench_test.go (3 hunks)
- apps/workspace-engine/pkg/workspace/store/store.go (3 hunks)
- apps/workspace-engine/pkg/workspace/workspace.go (5 hunks)
- apps/workspace-engine/test/e2e/engine_deployment_test.go (11 hunks)
- apps/workspace-engine/test/e2e/engine_kafka_replay_test.go (11 hunks)
- apps/workspace-engine/test/e2e/engine_workspace_disk_persistence_test.go (10 hunks)
- apps/workspace-engine/test/integration/kafka/README.md (1 hunks)
- apps/workspace-engine/test/integration/kafka/consumer.go (1 hunks)
- apps/workspace-engine/test/integration/kafka/kafka.go (1 hunks)
- apps/workspace-engine/test/integration/kafka/queue.go (1 hunks)
- apps/workspace-engine/test/integration/workspace.go (4 hunks)
💤 Files with no reviewable changes (1)
- apps/workspace-engine/pkg/workspace/releasemanager/deployment/executor.go
🧰 Additional context used
📓 Path-based instructions (3)
apps/workspace-engine/**/*.go
📄 CodeRabbit inference engine (apps/workspace-engine/CLAUDE.md)
apps/workspace-engine/**/*.go: Do not add extraneous inline comments that state the obvious
Do not add comments that simply restate what the code does
Do not add comments for standard Go patterns (e.g., noting WaitGroup or semaphore usage)
Write comments that explain why, document complex logic/algorithms, provide non-obvious context, include TODO/FIXME, and document exported functions/types/methods
Files:
- apps/workspace-engine/pkg/workspace/releasemanager/targets/targets_manager_test.go
- apps/workspace-engine/pkg/workspace/jobdispatch/github_test.go
- apps/workspace-engine/pkg/ticker/ticker_test.go
- apps/workspace-engine/pkg/events/handler/jobs/jobs.go
- apps/workspace-engine/test/e2e/engine_deployment_test.go
- apps/workspace-engine/pkg/db/jobs_test.go
- apps/workspace-engine/pkg/workspace/store/environments_bench_test.go
- apps/workspace-engine/main.go
- apps/workspace-engine/pkg/workspace/releasemanager/policy/evaluator/skipdeployed/skipdeployed_test.go
- apps/workspace-engine/pkg/workspace/releasemanager/targets/taint_test.go
- apps/workspace-engine/test/integration/kafka/queue.go
- apps/workspace-engine/test/e2e/engine_workspace_disk_persistence_test.go
- apps/workspace-engine/pkg/workspace/releasemanager/deployment/creator.go
- apps/workspace-engine/pkg/workspace/releasemanager/policy/evaluator/approval/approval_test.go
- apps/workspace-engine/pkg/workspace/releasemanager/policy/evaluator/environmentprogression/environmentprogression_test.go
- apps/workspace-engine/pkg/events/events.go
- apps/workspace-engine/pkg/workspace/workspace.go
- apps/workspace-engine/pkg/workspace/store/store.go
- apps/workspace-engine/pkg/workspace/releasemanager/variables/variablemanager_test.go
- apps/workspace-engine/pkg/kafka/kafka.go
- apps/workspace-engine/pkg/workspace/releasemanager/manager.go
- apps/workspace-engine/test/integration/kafka/consumer.go
- apps/workspace-engine/pkg/server/openapi/utils/utils.go
- apps/workspace-engine/pkg/events/handler/handler.go
- apps/workspace-engine/test/integration/workspace.go
- apps/workspace-engine/pkg/workspace/releasemanager/deployment/creator_test.go
- apps/workspace-engine/test/integration/kafka/kafka.go
- apps/workspace-engine/pkg/workspace/releasemanager/manager_test.go
- apps/workspace-engine/test/e2e/engine_kafka_replay_test.go
- apps/workspace-engine/pkg/events/producer.go
apps/workspace-engine/**/*_test.go
📄 CodeRabbit inference engine (apps/workspace-engine/CLAUDE.md)
Follow the existing test structure used in *_test.go files
Files:
- apps/workspace-engine/pkg/workspace/releasemanager/targets/targets_manager_test.go
- apps/workspace-engine/pkg/workspace/jobdispatch/github_test.go
- apps/workspace-engine/pkg/ticker/ticker_test.go
- apps/workspace-engine/test/e2e/engine_deployment_test.go
- apps/workspace-engine/pkg/db/jobs_test.go
- apps/workspace-engine/pkg/workspace/store/environments_bench_test.go
- apps/workspace-engine/pkg/workspace/releasemanager/policy/evaluator/skipdeployed/skipdeployed_test.go
- apps/workspace-engine/pkg/workspace/releasemanager/targets/taint_test.go
- apps/workspace-engine/test/e2e/engine_workspace_disk_persistence_test.go
- apps/workspace-engine/pkg/workspace/releasemanager/policy/evaluator/approval/approval_test.go
- apps/workspace-engine/pkg/workspace/releasemanager/policy/evaluator/environmentprogression/environmentprogression_test.go
- apps/workspace-engine/pkg/workspace/releasemanager/variables/variablemanager_test.go
- apps/workspace-engine/pkg/workspace/releasemanager/deployment/creator_test.go
- apps/workspace-engine/pkg/workspace/releasemanager/manager_test.go
- apps/workspace-engine/test/e2e/engine_kafka_replay_test.go
**/*.{js,jsx,ts,tsx,json,md,yml,yaml}
📄 CodeRabbit inference engine (CLAUDE.md)
Formatting: Prettier is used with @ctrlplane/prettier-config
Files:
apps/workspace-engine/test/integration/kafka/README.md
🧠 Learnings (1)
📚 Learning: 2025-08-12T18:13:54.630Z
Learnt from: CR
PR: ctrlplanedev/ctrlplane#0
File: apps/workspace-engine/CLAUDE.md:0-0
Timestamp: 2025-08-12T18:13:54.630Z
Learning: Applies to apps/workspace-engine/**/*_test.go : Follow the existing test structure used in *_test.go files
Applied to files:
- apps/workspace-engine/pkg/ticker/ticker_test.go
- apps/workspace-engine/pkg/workspace/releasemanager/policy/evaluator/approval/approval_test.go
🧬 Code graph analysis (29)
apps/workspace-engine/pkg/workspace/releasemanager/targets/targets_manager_test.go (2)
- apps/workspace-engine/pkg/workspace/store/store.go (1): New (13-37)
- apps/workspace-engine/pkg/workspace/releasemanager/targets/targets_manager.go (1): New (17-22)
apps/workspace-engine/pkg/workspace/jobdispatch/github_test.go (3)
- apps/workspace-engine/pkg/workspace/releasemanager/manager.go (1): New (49-68)
- apps/workspace-engine/pkg/workspace/store/store.go (1): New (13-37)
- apps/workspace-engine/pkg/workspace/workspace.go (1): New (26-39)
apps/workspace-engine/pkg/ticker/ticker_test.go (2)
- apps/workspace-engine/pkg/workspace/workspace.go (2): Set (221-223), New (26-39)
- apps/workspace-engine/pkg/workspace/releasemanager/manager.go (1): New (49-68)
apps/workspace-engine/pkg/events/handler/jobs/jobs.go (10)
- apps/workspace-engine/pkg/oapi/oapi.gen.go (12): Job (230-242), ReleaseId (55-55), Id (51-51), Status (57-57), InvalidJobAgent (40-40), InvalidIntegration (39-39), UpdatedAt (58-58), Release (397-403), ReleaseTarget (406-410), Pending (41-41), Cancelled (35-35), JobAgentId (53-53)
- apps/workspace-engine/pkg/workspace/workspace.go (1): Workspace (70-77)
- apps/workspace-engine/pkg/events/handler/handler.go (1): RawEvent (86-91)
- apps/workspace-engine/pkg/workspace/store/store.go (1): Store (39-62)
- apps/workspace-engine/pkg/workspace/store/releases.go (1): Releases (16-18)
- apps/workspace-engine/pkg/workspace/store/jobs.go (1): Jobs (16-18)
- apps/workspace-engine/pkg/workspace/releasemanager/deployment/jobs/dispatcher.go (1): ErrUnsupportedJobAgent (25-25)
- apps/workspace-engine/pkg/workspace/store/job_agents.go (1): JobAgents (16-18)
- apps/workspace-engine/pkg/workspace/jobdispatch/types.go (1): JobAgentTypeGithub (6-6)
- apps/workspace-engine/pkg/workspace/jobdispatch/github.go (1): NewGithubDispatcher (57-62)
apps/workspace-engine/test/e2e/engine_deployment_test.go (2)
- apps/workspace-engine/test/integration/opts.go (7): DeploymentCelResourceSelector (322-328), WithDeploymentVersion (361-375), DeploymentVersionTag (385-389), WithDeployment (259-277), WithEnvironment (279-293), EnvironmentCelResourceSelector (429-435), WithResource (107-132)
- apps/workspace-engine/pkg/workspace/store/release_targets.go (1): ReleaseTargets (27-31)
apps/workspace-engine/pkg/db/jobs_test.go (2)
- apps/workspace-engine/pkg/workspace/store/store.go (1): New (13-37)
- apps/workspace-engine/pkg/workspace/workspace.go (1): New (26-39)
apps/workspace-engine/pkg/workspace/store/environments_bench_test.go (2)
- apps/workspace-engine/pkg/workspace/store/store.go (1): New (13-37)
- apps/workspace-engine/pkg/workspace/workspace.go (1): New (26-39)
apps/workspace-engine/main.go (1)
- apps/workspace-engine/pkg/kafka/kafka.go (1): RunConsumer (76-213)
apps/workspace-engine/pkg/workspace/releasemanager/policy/evaluator/skipdeployed/skipdeployed_test.go (2)
- apps/workspace-engine/pkg/workspace/store/store.go (1): New (13-37)
- apps/workspace-engine/pkg/workspace/workspace.go (1): New (26-39)
apps/workspace-engine/pkg/workspace/releasemanager/targets/taint_test.go (2)
- apps/workspace-engine/pkg/workspace/store/store.go (1): New (13-37)
- apps/workspace-engine/pkg/workspace/workspace.go (1): New (26-39)
apps/workspace-engine/test/e2e/engine_workspace_disk_persistence_test.go (3)
- apps/workspace-engine/pkg/workspace/releasemanager/manager.go (1): New (49-68)
- apps/workspace-engine/pkg/workspace/store/store.go (1): New (13-37)
- apps/workspace-engine/pkg/workspace/workspace.go (2): New (26-39), NewNoFlush (56-68)
apps/workspace-engine/pkg/workspace/releasemanager/deployment/creator.go (5)
- apps/workspace-engine/pkg/workspace/store/store.go (1): Store (39-62)
- apps/workspace-engine/pkg/workspace/releasemanager/deployment/jobs/factory.go (2): Factory (20-22), NewFactory (25-29)
- apps/workspace-engine/pkg/events/producer.go (1): EventProducer (15-17)
- apps/workspace-engine/pkg/oapi/oapi.gen.go (3): Release (397-403), LiteralValue (290-292), CreatedAt (49-49)
- apps/workspace-engine/pkg/events/handler/jobs/jobs.go (1): JobCreatedEventData (21-23)
apps/workspace-engine/pkg/workspace/releasemanager/policy/evaluator/approval/approval_test.go (2)
- apps/workspace-engine/pkg/workspace/store/store.go (1): New (13-37)
- apps/workspace-engine/pkg/workspace/workspace.go (1): New (26-39)
apps/workspace-engine/pkg/workspace/releasemanager/policy/evaluator/environmentprogression/environmentprogression_test.go (2)
- apps/workspace-engine/pkg/workspace/store/store.go (1): New (13-37)
- apps/workspace-engine/pkg/workspace/workspace.go (1): New (26-39)
apps/workspace-engine/pkg/events/events.go (2)
- apps/workspace-engine/pkg/events/handler/handler.go (4): JobCreate (60-60), EventProducer (106-108), EventListener (100-103), NewEventListener (111-117)
- apps/workspace-engine/pkg/events/handler/jobs/jobs.go (1): HandleJobCreated (31-76)
apps/workspace-engine/pkg/workspace/workspace.go (3)
- apps/workspace-engine/pkg/workspace/releasemanager/manager.go (2): New (49-68), EventProducer (25-27)
- apps/workspace-engine/pkg/workspace/store/store.go (1): New (13-37)
- apps/workspace-engine/pkg/db/changeset.go (1): NewChangesetConsumer (165-170)
apps/workspace-engine/pkg/workspace/store/store.go (3)
- apps/workspace-engine/pkg/workspace/releasemanager/manager.go (1): New (49-68)
- apps/workspace-engine/pkg/workspace/workspace.go (1): New (26-39)
- apps/workspace-engine/pkg/workspace/store/repository/repo.go (2): New (60-78), Repository (80-101)
apps/workspace-engine/pkg/workspace/releasemanager/variables/variablemanager_test.go (2)
- apps/workspace-engine/pkg/workspace/store/store.go (1): New (13-37)
- apps/workspace-engine/pkg/workspace/workspace.go (1): New (26-39)
apps/workspace-engine/pkg/kafka/kafka.go (3)
- apps/workspace-engine/pkg/events/handler/handler.go (1): EventProducer (106-108)
- apps/workspace-engine/pkg/workspace/workspace.go (2): EventProducer (19-21), GetWorkspaceAndLoad (233-245)
- apps/workspace-engine/pkg/events/events.go (1): NewEventHandler (85-87)
apps/workspace-engine/pkg/workspace/releasemanager/manager.go (2)
- apps/workspace-engine/pkg/workspace/releasemanager/deployment/creator.go (3): JobCreator (21-25), EventProducer (16-18), NewJobCreator (28-34)
- apps/workspace-engine/pkg/workspace/workspace.go (2): New (26-39), EventProducer (19-21)
apps/workspace-engine/test/integration/kafka/consumer.go (2)
- apps/workspace-engine/test/integration/kafka/queue.go (1): MessageQueue (11-15)
- apps/workspace-engine/pkg/kafka/kafka.go (1): Topic (23-23)
apps/workspace-engine/pkg/server/openapi/utils/utils.go (1)
- apps/workspace-engine/pkg/workspace/workspace.go (1): GetWorkspaceAndLoad (233-245)
apps/workspace-engine/pkg/events/handler/handler.go (1)
- apps/workspace-engine/pkg/workspace/workspace.go (2): EventProducer (19-21), GetWorkspaceAndLoad (233-245)
apps/workspace-engine/test/integration/workspace.go (6)
- apps/workspace-engine/pkg/events/handler/handler.go (3): EventProducer (106-108), EventListener (100-103), OffsetTracker (119-123)
- apps/workspace-engine/pkg/events/producer.go (3): EventProducer (15-17), NewInMemoryProducer (37-50), InMemoryProducer (25-31)
- apps/workspace-engine/pkg/workspace/workspace.go (2): EventProducer (19-21), GetNoFlushWorkspace (247-257)
- apps/workspace-engine/pkg/events/events.go (1): NewEventHandler (85-87)
- apps/workspace-engine/pkg/workspace/store/store.go (1): Store (39-62)
- apps/workspace-engine/pkg/workspace/store/release_targets.go (1): ReleaseTargets (27-31)
apps/workspace-engine/pkg/workspace/releasemanager/deployment/creator_test.go (3)
- apps/workspace-engine/pkg/workspace/releasemanager/manager_test.go (3): MockEvent (33-38), NewMockEventProducer (41-45), MockEventProducer (24-30)
- apps/workspace-engine/pkg/workspace/store/store.go (2): Store (39-62), New (13-37)
- apps/workspace-engine/pkg/oapi/oapi.gen.go (3): Id (51-51), CreatedAt (49-49), LiteralValue (290-292)
apps/workspace-engine/test/integration/kafka/kafka.go (1)
- apps/workspace-engine/test/integration/kafka/queue.go (1): MessageQueue (11-15)
apps/workspace-engine/pkg/workspace/releasemanager/manager_test.go (4)
- apps/workspace-engine/pkg/workspace/store/store.go (2): Store (39-62), New (13-37)
- apps/workspace-engine/pkg/workspace/releasemanager/manager.go (1): New (49-68)
- apps/workspace-engine/pkg/changeset/changeset.go (3): NewChangeSet (32-36), ChangeTypeUpdate (12-12), ChangeTypeCreate (11-11)
- apps/workspace-engine/pkg/workspace/releasemanager/deployment/creator.go (1): JobCreatedEventData (37-39)
apps/workspace-engine/test/e2e/engine_kafka_replay_test.go (3)
- apps/workspace-engine/pkg/workspace/workspace.go (2): GetWorkspaceAndLoad (233-245), New (26-39)
- apps/workspace-engine/pkg/workspace/releasemanager/manager.go (1): New (49-68)
- apps/workspace-engine/pkg/kafka/kafka.go (1): RunConsumer (76-213)
apps/workspace-engine/pkg/events/producer.go (2)
- apps/workspace-engine/pkg/events/handler/handler.go (3): OffsetTracker (119-123), RawEvent (86-91), EventType (21-21)
- apps/workspace-engine/pkg/kafka/kafka.go (1): Topic (23-23)
🔇 Additional comments (16)
apps/workspace-engine/pkg/workspace/store/environments_bench_test.go (1)
68-69: Constructor signature update looks good.
Passing workspaceID into store.New aligns with the new API. No issues.
Also applies to: 169-171, 238-239
apps/workspace-engine/pkg/ticker/ticker_test.go (1)
58-61: Workspace constructor update OK.
Using workspace.New("ws-*", nil) matches the new signature.
apps/workspace-engine/pkg/workspace/releasemanager/policy/evaluator/environmentprogression/environmentprogression_test.go (1)
13-13: LGTM! Store initialization updated correctly.
The test now correctly passes a workspace ID to store.New(), aligning with the updated constructor signature that requires a workspace identifier.
apps/workspace-engine/pkg/workspace/jobdispatch/github_test.go (1)
40-40: LGTM! Test store initialization updated consistently.
All test functions correctly initialize the store with the workspace ID, consistent with the updated store.New(wsId string) constructor signature.
Also applies to: 95-95, 135-135, 403-403
apps/workspace-engine/pkg/workspace/releasemanager/variables/variablemanager_test.go (1)
14-14: LGTM! Test initialization updated correctly.
Both the helper function and the test properly initialize the store with the required workspace ID parameter.
Also applies to: 390-390
apps/workspace-engine/pkg/workspace/releasemanager/targets/taint_test.go (1)
144-144: LGTM! All test store initializations updated correctly.
All test functions across the file consistently initialize stores with the workspace ID, properly adapting to the updated store.New() API.
Also applies to: 177-177, 210-210, 244-244, 277-277, 310-310, 344-344, 385-385, 412-412, 455-455, 480-480, 516-516, 534-534
apps/workspace-engine/pkg/workspace/releasemanager/policy/evaluator/approval/approval_test.go (1)
12-12: LGTM! Store initialization updated correctly.
The helper function and test cases properly initialize stores with the workspace ID, aligning with the updated constructor signature.
Also applies to: 192-192, 247-247
apps/workspace-engine/pkg/db/jobs_test.go (1)
240-240: LGTM! Test store initialization properly scoped to workspaces.
All test store initializations correctly pass the workspace ID, with multi-workspace tests appropriately using distinct workspace IDs (workspaceID1, workspaceID2) to ensure proper isolation.
Also applies to: 311-311, 395-395, 498-498, 580-580, 659-659, 725-725, 810-810, 820-820, 946-946, 1109-1109, 1186-1186, 1245-1245, 1350-1350, 1447-1447
apps/workspace-engine/main.go (1)
167-167: LGTM! Producer correctly threaded through consumer.
The Kafka producer is now passed to RunConsumer, enabling the event-driven job creation pattern introduced in this PR. The producer serves dual purposes:
- Producing periodic ticker events (lines 149, 157)
- Enabling the consumer to produce new events (e.g., job.created) during event processing
This design correctly implements the event-driven architecture described in the PR objectives.
apps/workspace-engine/pkg/workspace/releasemanager/policy/evaluator/skipdeployed/skipdeployed_test.go (1)
13-13: LGTM! Store initialization updated correctly.
The helper function properly initializes the store with a workspace ID, consistent with the updated store.New(wsId string) API.
apps/workspace-engine/pkg/workspace/releasemanager/targets/targets_manager_test.go (1)
231-240: Empty-store path OK.
Using store.New("test-workspace") in the empty-case test is fine since no entities are created.
apps/workspace-engine/pkg/events/events.go (1)
60-62: Good addition: JobCreate handler registered.
apps/workspace-engine/pkg/workspace/releasemanager/manager.go (1)
252-257: Intentional design: nil producer limited to read-only paths; optional sentinel error for clarity.
The nil eventProducer pattern is intentional and limited in scope: tests use nil for replay/recovery scenarios, and the server code explicitly documents that read-only operations pass nil as a temporary measure. The hard error at lines 252-257 acts as a safety guard and will only trigger if release operations are invoked on nil-producer workspaces, which is not the case in current code paths.
The suggested sentinel error (e.g., ErrJobCreationDisabled) remains a reasonable optional refactor for distinguishing configuration failures from operational failures in upstream handling, but is not required for correctness given the read-only constraint.
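The optional refactor could be as small as this sketch (the sentinel name and the m.eventProducer field name are assumptions):

```go
// ErrJobCreationDisabled is a hypothetical sentinel distinguishing a
// deliberately nil producer (read-only workspace) from runtime failures.
var ErrJobCreationDisabled = errors.New("job creation disabled: no event producer configured")

// Guard site in the release manager:
//   if m.eventProducer == nil {
//       return nil, ErrJobCreationDisabled
//   }
// Upstream callers can then branch with errors.Is(err, ErrJobCreationDisabled).
```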
apps/workspace-engine/test/e2e/engine_kafka_replay_test.go (1)
48-49: Nil EventProducer disables job dispatch; ensure handlers never deref it.
These tests pass nil for EventProducer, which is fine for replay/state assertions but will disable job creation and may panic if any handler unconditionally calls ProduceEvent. Verify handlers guard against a nil producer or consider using a no-op producer for tests that don't care about emissions.
Quick scan helper:
Also applies to: 91-92, 285-286, 305-306, 310-311, 314-315, 423-424, 495-496, 531-532, 589-590, 647-648, 696-697
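A no-op producer for such tests is a few lines (sketch; it satisfies the ProduceEvent contract shown elsewhere in this review):

```go
// NoopEventProducer is a hypothetical test double that accepts and discards
// every event, avoiding nil-producer dereferences in replay tests.
type NoopEventProducer struct{}

func (NoopEventProducer) ProduceEvent(eventType string, workspaceID string, data any) error {
	return nil // intentionally drop the event
}
```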
apps/workspace-engine/pkg/events/handler/handler.go (1)
101-108: The EventProducer duplication is necessary and intentional—consolidation would create a circular import.
The events/producer.go package already imports pkg/events/handler to use handler.OffsetTracker, handler.RawEvent, and handler.EventType. If handler.go were to import events.EventProducer (as suggested), this would create a circular dependency: events → handler → events. The existing duplicate interface and its comment correctly document this architectural constraint. The code is correct as-is.
Likely an incorrect or invalid review comment.
apps/workspace-engine/pkg/kafka/kafka.go (1)
144-145: Review comment contains incorrect type identification; the code functions correctly via Go's structural typing.
The review comment claims that events.NewEventHandler receives handler.EventProducer, but the actual flow passes kafka.EventProducer from main.go. Due to Go's structural typing, kafka.EventProducer (which includes ProduceEvent, Flush, and Close methods) satisfies both the events.EventProducer and handler.EventProducer interfaces (both requiring only ProduceEvent). This works without issues.
While the underlying concern about multiple EventProducer interface definitions across packages is valid for long-term maintainability, the specific claim about a type mismatch between packages is incorrect—the code compiles and functions as intended.
Likely an incorrect or invalid review comment.
func NewEventHandler(eventProducer EventProducer) *handler.EventListener {
	return handler.NewEventListener(handlers, eventProducer)
}
Compile error: wrong type in signature.
EventProducer is defined in the handler package. Qualify the type.
-func NewEventHandler(eventProducer EventProducer) *handler.EventListener {
- return handler.NewEventListener(handlers, eventProducer)
+func NewEventHandler(eventProducer handler.EventProducer) *handler.EventListener {
+ return handler.NewEventListener(handlers, eventProducer)
}
ws, err := workspace.GetWorkspaceAndLoad(rawEvent.WorkspaceID, el.eventProducer)
if ws == nil {
	return nil, fmt.Errorf("workspace not found: %s: %w", rawEvent.WorkspaceID, err)
}
🛠️ Refactor suggestion | 🟠 Major
Potential type mismatch risk passing producer into workspace loader.
workspace.GetWorkspaceAndLoad requires workspace.EventProducer; el.eventProducer here is handler.EventProducer. Method sets match, but duplication increases risk. Same consolidation recommendation applies.
var data JobCreatedEventData
if err := json.Unmarshal(event.Data, &data); err != nil {
	return fmt.Errorf("failed to unmarshal job.created event: %w", err)
}

job := data.Job

// Get the release for this job to cancel outdated jobs
release, exists := ws.Store().Releases.Get(job.ReleaseId)
if !exists {
	return fmt.Errorf("release %s not found for job %s", job.ReleaseId, job.Id)
}
Validate event payload before use.
If Data lacks job or ReleaseId, this panics/later fails. Guard early.
var data JobCreatedEventData
if err := json.Unmarshal(event.Data, &data); err != nil {
return fmt.Errorf("failed to unmarshal job.created event: %w", err)
}
job := data.Job
+if job == nil || job.Id == "" || job.ReleaseId == "" {
+ return fmt.Errorf("invalid job.created payload: missing job/id/releaseId")
+}
// Get the release for this job to cancel outdated jobs
release, exists := ws.Store().Releases.Get(job.ReleaseId)
if job.Status != oapi.InvalidJobAgent {
	go func() {
		if err := dispatchJob(ctx, ws, job); err != nil && !errors.Is(err, jobs.ErrUnsupportedJobAgent) {
			log.Error("error dispatching job to integration", "error", err.Error())
			job.Status = oapi.InvalidIntegration
			job.UpdatedAt = time.Now()
			ws.Store().Jobs.Upsert(ctx, job)
		}
	}()
}
Unsupported job agent leaves job stuck in pending. Mark it InvalidJobAgent.
Swallowing ErrUnsupportedJobAgent without persisting state causes indefinite pending jobs.
- go func() {
- if err := dispatchJob(ctx, ws, job); err != nil && !errors.Is(err, jobs.ErrUnsupportedJobAgent) {
- log.Error("error dispatching job to integration", "error", err.Error())
- job.Status = oapi.InvalidIntegration
- job.UpdatedAt = time.Now()
- ws.Store().Jobs.Upsert(ctx, job)
- }
- }()
+ go func() {
+ if err := dispatchJob(ctx, ws, job); err != nil {
+ if errors.Is(err, jobs.ErrUnsupportedJobAgent) {
+ job.Status = oapi.InvalidJobAgent
+ } else {
+ log.Error("error dispatching job to integration", "error", err.Error())
+ job.Status = oapi.InvalidIntegration
+ }
+ job.UpdatedAt = time.Now()
+ ws.Store().Jobs.Upsert(ctx, job)
+ }
+ }()
import (
	"context"
	"encoding/json"
	"fmt"
	"time"

	"workspace-engine/pkg/events/handler"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)
🛠️ Refactor suggestion | 🟠 Major
Imports needed for safe concurrency and topic constant.
Add sync, sync/atomic, and the app Kafka package (aliased) to use the configured topic instead of a hardcoded string.
import (
"context"
"encoding/json"
"fmt"
+ "sync"
+ "sync/atomic"
"time"
"workspace-engine/pkg/events/handler"
+ appkafka "workspace-engine/pkg/kafka"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)
 func setupBenchmarkStore(b *testing.B, workspaceID string, numResources int) (*Store, string) {
-	st := New()
+	st := New(workspaceID)

 	// Create system
Unused parameter 'b' in the helper.
setupBenchmarkStore doesn't use b. Unused function parameters compile in Go, but linters commonly flag them; mark it used (or call b.Helper()) or drop it.
func setupBenchmarkStore(b *testing.B, workspaceID string, numResources int) (*Store, string) {
-	st := New(workspaceID)
+	st := New(workspaceID)
func setupBenchmarkStore(b *testing.B, workspaceID string, numResources int) (*Store, string) {
- st := New(workspaceID)
+ st := New(workspaceID)
+	_ = b // avoid unused param in benchmarks helper
 // Reinitialize release manager with the decoded store
-w.releasemanager = releasemanager.New(w.store)
+// Use the workspace's existing event producer
+if w.eventProducer != nil {
+	w.releasemanager = releasemanager.New(w.store, w.eventProducer)
+}
Fix stale ReleaseManager after GobDecode when eventProducer is nil
releasemanager.Manager retains a pointer to the pre-decode store if you don’t reinitialize it, leading to state drift. Always reinitialize with the decoded store; eventProducer may be nil.
- // Reinitialize release manager with the decoded store
- // Use the workspace's existing event producer
- if w.eventProducer != nil {
- w.releasemanager = releasemanager.New(w.store, w.eventProducer)
- }
+ // Reinitialize release manager with the decoded store (eventProducer may be nil)
+ w.releasemanager = releasemanager.New(w.store, w.eventProducer)🤖 Prompt for AI Agents
In apps/workspace-engine/pkg/workspace/workspace.go around lines 181 to 185, the
releasemanager is only reinitialized when w.eventProducer is non-nil which
leaves the Manager holding a pointer to the pre-decoded store; always
reinitialize releasemanager with the decoded w.store even if w.eventProducer is
nil. Replace the conditional so releasemanager.New is called unconditionally
(passing w.eventProducer which may be nil) to ensure the Manager references the
updated store.
func (c *ConsumerMock) Poll(timeoutMs int) kafka.Event {
	c.mu.Lock()
	defer c.mu.Unlock()

	// If not yet assigned and subscribed, trigger assignment
	if len(c.assignedPartitions) == 0 && len(c.subscribed) > 0 {
		// Assign partition 0 for all subscribed topics
		partitions := make([]kafka.TopicPartition, 0, len(c.subscribed))
		for _, topic := range c.subscribed {
			topicCopy := topic
			partitions = append(partitions, kafka.TopicPartition{
				Topic:     &topicCopy,
				Partition: 0,
				Offset:    kafka.OffsetBeginning,
			})
		}
		return kafka.AssignedPartitions{Partitions: partitions}
	}
Cache assigned partitions in mock upon Poll.
Poll returns AssignedPartitions but doesn’t update internal state. Some code paths may rely on Assignment() later. Cache them for consistency.
if len(c.assignedPartitions) == 0 && len(c.subscribed) > 0 {
@@
- return kafka.AssignedPartitions{Partitions: partitions}
+ // Cache internally and emit the assignment event
+ c.assignedPartitions = partitions
+ return kafka.AssignedPartitions{Partitions: partitions}
}
🤖 Prompt for AI Agents
In apps/workspace-engine/test/integration/kafka/consumer.go around lines 81 to
99, Poll constructs and returns an AssignedPartitions event but never updates
the ConsumerMock internal assignedPartitions field; modify Poll so that before
returning it stores the constructed partitions into c.assignedPartitions (while
still holding the mutex) so subsequent Assignment() calls see the same
partitions, ensuring you copy or use the same slice instance and maintain the
lock/unlock discipline.
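A hypothetical test for the cached state could look like this; `NewMessageQueue`, `NewConsumerMock`, and an `Assignment()` accessor mirroring confluent-kafka-go's consumer API are assumed from this file and the comment above, not verified:

```go
// Hypothetical sketch: constructor names and Assignment() are assumptions.
func TestPollCachesAssignment(t *testing.T) {
	queue := NewMessageQueue()
	consumer := NewConsumerMock(queue)
	_ = consumer.SubscribeTopics([]string{"workspace-events"}, nil)

	ev := consumer.Poll(100)
	assigned, ok := ev.(kafka.AssignedPartitions)
	if !ok {
		t.Fatalf("expected AssignedPartitions, got %T", ev)
	}

	// After the fix, the mock's cached view matches the emitted event.
	cached, _ := consumer.Assignment()
	if len(cached) != len(assigned.Partitions) {
		t.Fatalf("cached %d partitions, event carried %d",
			len(cached), len(assigned.Partitions))
	}
}
```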
```go
// Pop removes and returns the first message from the queue
// Returns nil if timeout is reached
func (mq *MessageQueue) Pop(timeout time.Duration) (*kafka.Message, error) {
	mq.mu.Lock()
	defer mq.mu.Unlock()

	deadline := time.Now().Add(timeout)
	for len(mq.messages) == 0 {
		remaining := time.Until(deadline)
		if remaining <= 0 {
			// Timeout reached
			return nil, kafka.NewError(kafka.ErrTimedOut, "timed out waiting for message", false)
		}

		// Wait for signal or timeout
		done := make(chan struct{})
		go func() {
			time.Sleep(remaining)
			close(done)
		}()

		mq.cond.Wait()

		select {
		case <-done:
			// Timeout
			if len(mq.messages) == 0 {
				return nil, kafka.NewError(kafka.ErrTimedOut, "timed out waiting for message", false)
			}
		default:
			// Got signaled, continue loop to check messages
		}
	}
```
**Timeout logic is broken; `Pop` can block forever.**
`sync.Cond.Wait` is not woken by the timer goroutine, so if no `Push` ever arrives, `Pop` never returns. Replace the cond-based implementation with a channel-based queue.
```diff
 type MessageQueue struct {
-	mu       sync.Mutex
-	messages []*kafka.Message
-	cond     *sync.Cond
+	ch chan *kafka.Message
 }

 // NewMessageQueue creates a new message queue
 func NewMessageQueue() *MessageQueue {
-	mq := &MessageQueue{
-		messages: make([]*kafka.Message, 0),
-	}
-	mq.cond = sync.NewCond(&mq.mu)
-	return mq
+	return &MessageQueue{ch: make(chan *kafka.Message, 1024)}
 }

 // Push adds a message to the queue
 func (mq *MessageQueue) Push(msg *kafka.Message) {
-	mq.mu.Lock()
-	defer mq.mu.Unlock()
-	mq.messages = append(mq.messages, msg)
-	mq.cond.Signal()
+	mq.ch <- msg
 }

 // Pop removes and returns the first message from the queue
 // Returns nil if timeout is reached
 func (mq *MessageQueue) Pop(timeout time.Duration) (*kafka.Message, error) {
-	mq.mu.Lock()
-	defer mq.mu.Unlock()
-
-	deadline := time.Now().Add(timeout)
-	for len(mq.messages) == 0 {
-		remaining := time.Until(deadline)
-		if remaining <= 0 {
-			// Timeout reached
-			return nil, kafka.NewError(kafka.ErrTimedOut, "timed out waiting for message", false)
-		}
-
-		// Wait for signal or timeout
-		done := make(chan struct{})
-		go func() {
-			time.Sleep(remaining)
-			close(done)
-		}()
-
-		mq.cond.Wait()
-
-		select {
-		case <-done:
-			// Timeout
-			if len(mq.messages) == 0 {
-				return nil, kafka.NewError(kafka.ErrTimedOut, "timed out waiting for message", false)
-			}
-		default:
-			// Got signaled, continue loop to check messages
-		}
-	}
-
-	// Get first message
-	msg := mq.messages[0]
-	mq.messages = mq.messages[1:]
-	return msg, nil
+	select {
+	case msg := <-mq.ch:
+		return msg, nil
+	case <-time.After(timeout):
+		return nil, kafka.NewError(kafka.ErrTimedOut, "timed out waiting for message", false)
+	}
 }
```

Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In apps/workspace-engine/test/integration/kafka/queue.go around lines 34–66, the
Pop implementation uses sync.Cond and a manual goroutine timer so sync.Cond.Wait
can block forever if no Push occurs; replace the blocking cond/timer pattern
with a channel-based queue: change MessageQueue to hold a buffered channel for
messages (or create one alongside the existing fields), have Push send into that
channel (non-blocking or with context/timeout as appropriate), and implement Pop
by selecting on the message channel and time.After(timeout) so Pop returns
either a message or a timeout error reliably without relying on sync.Cond
wakeups.
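A quick sketch of exercising the channel-based replacement; it assumes the revised `Push`/`Pop` above and standard confluent-kafka-go message types:

```go
// Sketch under stated assumptions: uses the channel-based MessageQueue
// proposed above, not the current cond-based implementation.
func demoQueue() {
	mq := NewMessageQueue()

	// With no pending message, Pop now reliably times out.
	if _, err := mq.Pop(50 * time.Millisecond); err != nil {
		fmt.Println("timed out as expected:", err)
	}

	topic := "workspace-events"
	mq.Push(&kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: 0},
		Value:          []byte(`{"eventType":"job.created"}`),
	})

	if msg, err := mq.Pop(time.Second); err == nil {
		fmt.Println("consumed:", string(msg.Value))
	}
}
```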
# Kafka Integration Test Mock

This package provides a realistic Kafka mock for integration testing of the workspace-engine. It wraps the `confluent-kafka-go` MockCluster to provide a simple, easy-to-use API for testing Kafka-related functionality.

## Features

- **Realistic Kafka Behavior**: Uses the official confluent-kafka-go MockCluster
- **Multiple Partitions**: Test partition assignment and key-based routing
- **Offset Management**: Test offset commits, seeks, and resumption
- **Batch Operations**: Easily produce multiple events at once
- **Simple API**: Clean, easy-to-use helper functions
- **Automatic Cleanup**: Proper resource management with `Close()`

## Basic Usage

### Creating a Mock Cluster

```go
func TestMyFeature(t *testing.T) {
	mock := NewMockKafkaCluster(t)
	defer mock.Close()

	// Your test code here
}
```

### Producing Events

```go
// Single event
err := mock.ProduceEvent(
	string(handler.ResourceCreate),
	"workspace-1",
	map[string]string{"key": "value"},
)

// Batch of events
events := []EventData{
	{
		EventType:   string(handler.ResourceCreate),
		WorkspaceID: "workspace-1",
		Data:        map[string]string{"type": "resource1"},
	},
	{
		EventType:   string(handler.ResourceUpdate),
		WorkspaceID: "workspace-1",
		Data:        map[string]string{"type": "resource2"},
	},
}
err = mock.ProduceBatch(events)
```

### Consuming Events

```go
ctx := context.Background()

// Setup consumer with subscription and partition assignment
consumer, partitions, err := mock.SetupConsumerWithSubscription(ctx)
require.NoError(t, err)
defer consumer.Close()
_ = partitions // assigned partitions, unused in this example

// Consume single message
msg, err := mock.ConsumeMessage(consumer, 5*time.Second)
require.NoError(t, err)

// Consume multiple messages
messages, err := mock.ConsumeMessages(consumer, 3, 10*time.Second)
require.NoError(t, err)
_ = messages

// Parse event
event, err := mock.ParseEvent(msg)
require.NoError(t, err)
assert.Equal(t, handler.ResourceCreate, event.EventType)
```
🧩 Analysis chain
Docs may not match implementation.
This README promises a confluent-kafka-go MockCluster, but the PR wires an in-memory EventProducer for tests. Please verify the mock exists and examples compile. If not, update the README to reflect the in-memory producer pattern and remove/constrain Confluent-specific examples.
Also applies to: 248-307
---

🏁 Script executed:

```shell
#!/bin/bash
# Find Go files in the kafka test integration directory
find apps/workspace-engine/test/integration/kafka -type f -name "*.go" | head -20
```

Length of output: 164

🏁 Script executed:

```shell
#!/bin/bash
# Search for NewMockKafkaCluster function definition
rg -n "func NewMockKafkaCluster" --type go
```

Length of output: 0

🏁 Script executed:

```shell
#!/bin/bash
# Search for imports related to confluent-kafka or kafka in the test directory
rg -n "confluent-kafka|EventProducer" apps/workspace-engine/test/integration/kafka --type go -C 3
```

Length of output: 1555

🏁 Script executed:

```shell
#!/bin/bash
# Check the actual functions exported/available in the mock
rg -n "^func.*Mock" apps/workspace-engine/test/integration/kafka --type go | head -20
```

Length of output: 1792

🏁 Script executed:

```shell
#!/bin/bash
# Look for any implementation of ProduceBatch, ConsumeMessage, etc
rg -n "ProduceBatch|ConsumeMessage|ParseEvent|SetupConsumerWithSubscription" apps/workspace-engine/test/integration/kafka --type go
```

Length of output: 0

---

🏁 Script executed:

```shell
#!/bin/bash
# Examine the kafka.go file to see ProducerMock implementation
cat apps/workspace-engine/test/integration/kafka/kafka.go
```

Length of output: 1033

🏁 Script executed:

```shell
#!/bin/bash
# Examine the consumer.go file
cat apps/workspace-engine/test/integration/kafka/consumer.go
```

Length of output: 3479

🏁 Script executed:

```shell
#!/bin/bash
# Examine the queue.go file
cat apps/workspace-engine/test/integration/kafka/queue.go
```

Length of output: 1556
**README documents a non-existent API; all of the examples above will fail.**

The documented functions do not exist in the codebase:

- `NewMockKafkaCluster(t)`: instead, use `NewProducerMock(queue)` and `NewConsumerMock(queue)` separately, sharing a `MessageQueue`
- `ProduceEvent()`, `ProduceBatch()`: use `ProducerMock.Produce(msg, deliveryChan)` directly
- `SetupConsumerWithSubscription()`, `ConsumeMessage()`, `ConsumeMessages()`, `ParseEvent()`: use `ConsumerMock.SubscribeTopics()`, `.ReadMessage()`, or `.Poll()` with raw `kafka.Message` types

Update the README to reflect the actual low-level API (see the sketch below):

- `NewMessageQueue()` to initialize the shared queue
- `NewProducerMock(queue)` / `NewConsumerMock(queue)` to create mocks
- Direct usage of confluent-kafka-go types (`kafka.Message`, `kafka.TopicPartition`, etc.) and the corresponding mock method calls
🤖 Prompt for AI Agents
In apps/workspace-engine/test/integration/kafka/README.md lines 1-76, the README
references high-level helper functions that don't exist (NewMockKafkaCluster,
ProduceEvent, ProduceBatch, SetupConsumerWithSubscription, ConsumeMessage,
ConsumeMessages, ParseEvent); update the document to show the real low-level
API: explain creating a shared MessageQueue via NewMessageQueue(), creating
mocks with NewProducerMock(queue) and NewConsumerMock(queue), and demonstrate
producing/consuming using ProducerMock.Produce(msg, deliveryChan),
ConsumerMock.SubscribeTopics(), ConsumerMock.ReadMessage()/Poll(), and raw
confluent-kafka-go types (kafka.Message, kafka.TopicPartition) with brief
example snippets showing how to build kafka.Message, produce to delivery
channel, and read/parse kafka.Message from the consumer.