Skip to content

Conversation

@jsbroks
Copy link
Member

@jsbroks jsbroks commented Oct 25, 2025

Summary by CodeRabbit

Release Notes

  • New Features

    • Added event-driven job creation workflow integrated with release deployments
    • Implemented event producer interface for improved event handling and replay support
    • Enhanced workspace event tracking system
  • Bug Fixes

    • Improved error handling in release execution and job creation flows
  • Tests

    • Expanded test coverage for event handling, release creation, and job triggering scenarios
    • Added comprehensive integration test infrastructure for event processing

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Oct 25, 2025

Walkthrough

This PR introduces an event-driven job creation pattern by adding an EventProducer interface, threading it through workspace initialization and Kafka consumer processing, replacing the old Executor pattern with a new JobCreator, and updating the Store to track workspace IDs. In-memory event infrastructure is added for testing.

Changes

Cohort / File(s) Summary
Event Producer Interface & In-Memory Implementation
apps/workspace-engine/pkg/events/producer.go
Introduces EventProducer interface and InMemoryProducer for event queuing, processing, and testing without live Kafka.
Event Handler Wiring
apps/workspace-engine/pkg/events/handler/handler.go, apps/workspace-engine/pkg/events/events.go
Adds JobCreate event type, threads EventProducer into EventListener, registers job.created event handler, and updates NewEventHandler signatures.
Job Creation Handler
apps/workspace-engine/pkg/events/handler/jobs/jobs.go
Implements HandleJobCreated to process job.created events, cancel outdated jobs, and conditionally dispatch jobs.
Kafka Consumer & Main
apps/workspace-engine/pkg/kafka/kafka.go, apps/workspace-engine/main.go, apps/workspace-engine/pkg/kafka/kafka.go
Updates RunConsumer to accept EventProducer parameter and threads it through event handler initialization.
Store Workspace Binding
apps/workspace-engine/pkg/workspace/store/store.go
Modifies Store.New() to require workspaceID parameter; adds workspaceID field and WorkspaceID() accessor.
Workspace EventProducer Integration
apps/workspace-engine/pkg/workspace/workspace.go, apps/workspace-engine/pkg/server/openapi/utils/utils.go
Updates workspace constructors (New, NewNoFlush, NewAndLoad) and GetWorkspaceAndLoad to accept EventProducer; wires it into ReleaseManager.
Release Manager Event-Driven Pattern
apps/workspace-engine/pkg/workspace/releasemanager/manager.go, apps/workspace-engine/pkg/workspace/releasemanager/deployment/creator.go
Replaces Executor with JobCreator pattern; Manager now accepts EventProducer and uses JobCreator to emit job.created events.
Executor Removal
apps/workspace-engine/pkg/workspace/releasemanager/deployment/executor.go
Removes old Executor type and related ExecuteRelease/CancelOutdatedJobs logic; functionality migrated to JobCreator.
Store Test Updates
apps/workspace-engine/pkg/db/jobs_test.go, apps/workspace-engine/pkg/workspace/store/\*_test.go, apps/workspace-engine/pkg/workspace/jobdispatch/github_test.go, apps/workspace-engine/pkg/ticker/ticker_test.go, apps/workspace-engine/pkg/workspace/releasemanager/\*_test.go
Updates all store.New() calls to pass workspace ID ("test-workspace"); consistent across multiple test files.
Mock Event Producer Test Utilities
apps/workspace-engine/pkg/workspace/releasemanager/deployment/creator_test.go, apps/workspace-engine/pkg/workspace/releasemanager/manager_test.go
Adds MockEventProducer and MockEvent types with recording/inspection methods (GetEvents, GetEventCount, SetError, Reset) for test validation.
Integration Test Kafka Mocks
apps/workspace-engine/test/integration/kafka/kafka.go, apps/workspace-engine/test/integration/kafka/consumer.go, apps/workspace-engine/test/integration/kafka/queue.go, apps/workspace-engine/test/integration/kafka/README.md
Adds ProducerMock, ConsumerMock, MessageQueue, and documentation for lightweight Kafka simulation in integration tests.
Test Workspace In-Memory Event Pipeline
apps/workspace-engine/test/integration/workspace.go
Replaces direct event listener invocation with InMemoryProducer-based event handling; adds Flush() method to wait for event processing.
E2E Test Updates
apps/workspace-engine/test/e2e/engine_deployment_test.go, apps/workspace-engine/test/e2e/engine_kafka_replay_test.go, apps/workspace-engine/test/e2e/engine_workspace_disk_persistence_test.go
Updates constructor calls to pass EventProducer (nil in tests); adds engine.Flush() and assertions post-event-processing.

Sequence Diagram(s)

sequenceDiagram
    participant KafkaConsumer as Kafka<br/>Consumer
    participant EventHandler as Event<br/>Handler
    participant JobCreator as Job<br/>Creator
    participant Store as Store
    participant EventProducer as Event<br/>Producer
    
    KafkaConsumer->>EventHandler: ListenAndRoute(RawEvent)
    activate EventHandler
    EventHandler->>EventHandler: Match event type<br/>(JobCreate)
    EventHandler->>JobCreator: HandleJobCreated(workspace, event)
    activate JobCreator
    JobCreator->>Store: Upsert Release
    JobCreator->>Store: Create Job
    JobCreator->>EventProducer: ProduceEvent(job.created)
    activate EventProducer
    EventProducer->>EventProducer: Queue & Process<br/>message
    deactivate EventProducer
    JobCreator->>JobCreator: Dispatch Job<br/>(if applicable)
    deactivate JobCreator
    deactivate EventHandler
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

  • Key attention areas:
    • EventProducer interface threading through workspace, releasemanager, and event handler layers—verify correct propagation and no missing wiring
    • Store.New() signature change affecting all test files—confirm all call sites updated consistently and workspace ID handling is correct
    • JobCreator vs. removed Executor pattern migration—verify all job creation, cancellation, and dispatch logic is preserved and working
    • In-memory event pipeline (InMemoryProducer, MessageQueue, ConsumerMock)—validate async processing, offset tracking, and test correctness
    • GetWorkspaceAndLoad and related factory method signature changes—ensure nil/non-nil producer handling in all call paths (e.g., API server, Kafka consumer, tests)
    • ReleaseManager.New now requires EventProducer—check for any call sites still passing old-style arguments

Possibly related PRs

Poem

🐰 A Rabbit's Ode to Event-Driven Joy

Through Kafka streams, the events now hop,
JobCreator springs where Executors drop,
In-memory queues for testing so fast,
Producer and workspace now bound at last!
Events flow freely, the pipeline's complete—
A hoppy refactor, oh so neat! 🎉

Pre-merge checks and finishing touches

❌ Failed checks (1 warning, 1 inconclusive)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 43.51% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
Title Check ❓ Inconclusive The title "init kafka job dispatch" references real aspects of the changeset—Kafka integration and job dispatch are indeed core components being modified. However, the title is somewhat vague and ambiguous. The term "init" is unclear about what specifically is being initialized or set up. While the changeset does involve Kafka job dispatch, the primary architectural changes center on introducing an EventProducer pattern and refactoring from an executor-based deployment model to an event-driven creator-based approach. The title captures only one aspect of these broader changes and doesn't clearly convey the main point of the refactoring or what developers should expect from these modifications. Consider revising the title to be more descriptive and specific. For example, "Refactor deployment to event-driven job creation via EventProducer" or "Introduce event-driven job dispatch through Kafka integration" would better capture the main architectural changes and help reviewers understand the scope and intent of this pull request more clearly.
✅ Passed checks (1 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch kafka/callback

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@github-actions
Copy link

📊 DB Package Test Coverage

pkg/db coverage: 55.6%

View detailed coverage report in artifacts

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 22

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (7)
apps/workspace-engine/pkg/workspace/store/environments_bench_test.go (2)

125-129: Syntax bug: invalid for loop.

Go benchmarks must use for i := 0; i < b.N; i++ { … }. for range b.N won’t compile.

Apply:

-            // Run benchmark
-            for range b.N {
+            // Run benchmark
+            for i := 0; i < b.N; i++ {
                 ctx := context.Background()
                 _, err := recomputeFunc(ctx)
                 if err != nil {
                     b.Fatalf("Recompute failed: %v", err)
                 }
             }

323-333: Syntax bug: testing.B has no Loop().

Replace with the standard loop.

-    // Run benchmark
-    for b.Loop() {
+    // Run benchmark
+    for i := 0; i < b.N; i++ {
         ctx := context.Background()
         _, err := recomputeFunc(ctx)
         if err != nil {
             b.Fatalf("Recompute failed: %v", err)
         }
     }
apps/workspace-engine/pkg/ticker/ticker_test.go (1)

63-68: Add a workspace cleanup function to the package.

The review comment is correct. The global workspaces registry (line 218 of workspace.go) persists across tests, and there is no cleanup API (Delete, Remove, Reset, or Clear). Without cleanup, TestEmitTicks_NoWorkspaces will fail if run after TestEmitTicks_MultipleWorkspaces registers workspaces, causing test flakiness.

Add an exported cleanup function to apps/workspace-engine/pkg/workspace/workspace.go (e.g., Delete(id string) or Clear() to wipe all workspaces), then use it in the test's defer block:

defer func() {
	workspace.Delete("ws-1")
	workspace.Delete("ws-2")
	workspace.Delete("ws-3")
}()

Or, if adding a Clear() function for all workspaces:

defer workspace.Clear()
apps/workspace-engine/test/e2e/engine_deployment_test.go (1)

485-492: Benchmark won’t compile: invalid loops and wrong event type in warmup.

  • for range 100 is invalid Go.
  • b.Loop() doesn’t exist; use b.N.
  • Warmup produces DeploymentCreate with a Resource.

Fix:

   ctx := context.Background()
 
-  for range 100 {
-    engine.PushEvent(ctx, handler.DeploymentCreate, c.NewResource(workspaceID))
-  }
+  for i := 0; i < 100; i++ {
+    engine.PushEvent(ctx, handler.DeploymentCreate, c.NewDeployment(workspaceID))
+  }
 
   b.ResetTimer()
-  for b.Loop() {
-    engine.PushEvent(ctx, handler.DeploymentCreate, c.NewDeployment(workspaceID))
-  }
+  for i := 0; i < b.N; i++ {
+    engine.PushEvent(ctx, handler.DeploymentCreate, c.NewDeployment(workspaceID))
+  }
apps/workspace-engine/pkg/kafka/kafka.go (1)

130-137: Workspace ID logging should be string, not []byte.

You log msg.Key directly (a []byte). Cast to string for readability and to avoid odd logs.

Apply:

-		if ws == nil {
-			log.Error("Workspace not found", "workspaceID", msg.Key)
-			continue
-		}
+		if ws == nil {
+			log.Error("Workspace not found", "workspaceID", string(msg.Key))
+			continue
+		}
apps/workspace-engine/pkg/events/handler/handler.go (2)

127-134: Avoid attaching full message payload as tracing attribute.

attribute "event.data" with full msg.Value can be large and may contain sensitive data. Prefer hashing or size-limited snippet.

-attribute.String("event.data", string(msg.Value)),
+attribute.String("event.data.sha256", fmt.Sprintf("%x", sha256.Sum256(msg.Value))),

(Import crypto/sha256)


146-151: Use Int64 for timestamp attribute.

You’re encoding Unix seconds into a Float64 attribute; use Int64 to avoid precision loss.

-attribute.Float64("event.timestamp", float64(msg.Timestamp.Unix())),
+attribute.Int64("event.timestamp", msg.Timestamp.Unix()),
🧹 Nitpick comments (21)
apps/workspace-engine/pkg/workspace/releasemanager/deployment/creator.go (2)

15-19: Avoid duplicating EventProducer and JobCreatedEventData types.

These also exist elsewhere (events and handler packages). Centralize them (e.g., in pkg/events) and import here to prevent divergence and circular deps later.

Would you like a follow-up PR to introduce pkg/events/types.go and wire usages?

Also applies to: 36-40, 71-81


63-69: Telemetry nit: use numeric attribute for counts.

Prefer attribute.Int for variables.count.

-    span.SetAttributes(
-        attribute.Bool("job.created", true),
-        attribute.String("job.id", newJob.Id),
-        attribute.String("job.status", string(newJob.Status)),
-    )
+    span.SetAttributes(
+        attribute.Bool("job.created", true),
+        attribute.String("job.id", newJob.Id),
+        attribute.String("job.status", string(newJob.Status)),
+    )
...
-    _, span := tracer.Start(ctx, "BuildRelease",
+    _, span := tracer.Start(ctx, "BuildRelease",
         trace.WithAttributes(
             attribute.String("deployment.id", releaseTarget.DeploymentId),
             attribute.String("environment.id", releaseTarget.EnvironmentId),
             attribute.String("resource.id", releaseTarget.ResourceId),
             attribute.String("version.id", version.Id),
             attribute.String("version.tag", version.Tag),
-            attribute.String("variables.count", fmt.Sprintf("%d", len(variables))),
+            attribute.Int("variables.count", len(variables)),
         ))

Also applies to: 95-104

apps/workspace-engine/test/e2e/engine_deployment_test.go (2)

149-163: Harden against nil deref and missing entity checks.

You dereference *d1.JobAgentId/*d2.JobAgentId and ignore the exists flag from Get. If these are nil/missing, tests panic. Prefer explicit assertions:

- d1, _ := engine.Workspace().Deployments().Get(deploymentID1)
- if *d1.JobAgentId != jobAgentID1 {
+ d1, ok := engine.Workspace().Deployments().Get(deploymentID1)
+ require.True(t, ok, "deployment 1 not found")
+ require.NotNil(t, d1.JobAgentId, "deployment 1 job agent should be set")
+ if *d1.JobAgentId != jobAgentID1 {
   ...
- d2, _ := engine.Workspace().Deployments().Get(deploymentID2)
- if *d2.JobAgentId != jobAgentID2 {
+ d2, ok := engine.Workspace().Deployments().Get(deploymentID2)
+ require.True(t, ok, "deployment 2 not found")
+ require.NotNil(t, d2.JobAgentId, "deployment 2 job agent should be set")
+ if *d2.JobAgentId != jobAgentID2 {
   ...
- d3, _ := engine.Workspace().Deployments().Get(deploymentID3)
- if d3.JobAgentId != nil {
+ d3, ok := engine.Workspace().Deployments().Get(deploymentID3)
+ require.True(t, ok, "deployment 3 not found")
+ if d3.JobAgentId != nil {

43-52: Assert entity existence to avoid nil access; fix error message.

Use the boolean from Get and avoid the ambiguous “same id” message:

-engineD1, _ := engine.Workspace().Deployments().Get(deploymentID1)
-engineD2, _ := engine.Workspace().Deployments().Get(deploymentID2)
+engineD1, ok := engine.Workspace().Deployments().Get(deploymentID1)
+require.True(t, ok, "deployment %s not found", deploymentID1)
+engineD2, ok := engine.Workspace().Deployments().Get(deploymentID2)
+require.True(t, ok, "deployment %s not found", deploymentID2)

-if engineD1.Id != deploymentID1 {
-  t.Fatalf("deployments have the same id")
+if engineD1.Id != deploymentID1 {
+  t.Fatalf("deployment 1 id mismatch: got %s, want %s", engineD1.Id, deploymentID1)
 }
 
-if engineD2.Id != deploymentID2 {
-  t.Fatalf("deployments have the same id")
+if engineD2.Id != deploymentID2 {
+  t.Fatalf("deployment 2 id mismatch: got %s, want %s", engineD2.Id, deploymentID2)
 }
apps/workspace-engine/pkg/workspace/releasemanager/targets/targets_manager_test.go (1)

22-31: Align Store workspace ID with entities to avoid hidden mismatches.

You instantiate store.New("test-workspace") but create entities under a different workspaceID. If any logic starts using Store.WorkspaceID(), tests may produce inconsistent results.

Use the same ID:

-func setupTestStore(t *testing.T) (*store.Store, string, string, string, string) {
-  ctx := context.Background()
-  st := store.New("test-workspace")
-
-  workspaceID := uuid.New().String()
+func setupTestStore(t *testing.T) (*store.Store, string, string, string, string) {
+  ctx := context.Background()
+  workspaceID := uuid.New().String()
+  st := store.New(workspaceID)
apps/workspace-engine/pkg/server/openapi/utils/utils.go (1)

21-23: Prefer DI for EventProducer.

Thread a server-scoped producer into this helper rather than passing nil to enable write flows without refactoring later.

apps/workspace-engine/test/e2e/engine_workspace_disk_persistence_test.go (1)

737-740: Handle time zone load errors to avoid panics/skewed assertions.

time.LoadLocation errors are ignored. On minimal images, these can fail.

-estLoc, _ := time.LoadLocation("America/New_York")
-pstLoc, _ := time.LoadLocation("America/Los_Angeles")
+estLoc, err := time.LoadLocation("America/New_York")
+require.NoError(t, err)
+pstLoc, err := time.LoadLocation("America/Los_Angeles")
+require.NoError(t, err)

Optionally fall back to time.UTC if strictness isn’t required.

apps/workspace-engine/pkg/workspace/store/store.go (1)

72-74: WorkspaceID isn’t serialized; ensure it’s set after decode.

GobEncode/Decode only handle the repo. If a Store is decoded from bytes without being constructed via New(id), workspaceID remains empty.

  • Verify that all decode paths create the Store with New(id) (as in workspace-level encode/decode), or
  • Persist and restore workspaceID (mind gob compatibility), or
  • Set workspaceID post-decode from the owning workspace.

If you want to persist it now without breaking existing blobs, consider keeping the current format and explicitly setting s.workspaceID from the caller after GobDecode.

Also applies to: 80-131

apps/workspace-engine/pkg/workspace/releasemanager/manager.go (1)

127-134: Direct job mutation may bypass store invariants

Cancelling jobs by directly setting fields (Status/UpdatedAt) on pointers returned from the store risks breaking indices/derived views if the store maintains secondary state. Prefer a store method (e.g., Jobs.Cancel(job)) or an Upsert that also updates indexes and emits change notifications.

apps/workspace-engine/test/integration/workspace.go (2)

49-58: Eliminate race on eventListener init in handler closure

processEvents starts immediately; the first event could hit memoryEventHandler before eventListener is assigned. You guard with an error, but that drops the event. Gate handler execution on a ready barrier.

@@
- var eventListener *handler.EventListener
- memoryEventHandler := func(ctx context.Context, msg *kafka.Message, offsetTracker handler.OffsetTracker) error {
-   if eventListener == nil {
-     return fmt.Errorf("event listener not initialized")
-   }
-   _, err := eventListener.ListenAndRoute(ctx, msg, offsetTracker)
-   return err
- }
+ var eventListener *handler.EventListener
+ ready := make(chan struct{})
+ memoryEventHandler := func(ctx context.Context, msg *kafka.Message, offsetTracker handler.OffsetTracker) error {
+   <-ready
+   _, err := eventListener.ListenAndRoute(ctx, msg, offsetTracker)
+   return err
+ }
@@
- eventProducer := events.NewInMemoryProducer(ctx, memoryEventHandler)
- eventListener = events.NewEventHandler(eventProducer)
+ eventProducer := events.NewInMemoryProducer(ctx, memoryEventHandler)
+ eventListener = events.NewEventHandler(eventProducer)
+ close(ready)

162-174: Auto-flush on test cleanup to reduce flakiness

Add a Cleanup to flush pending events/materializations at the end of each test.

@@ func NewTestWorkspace(
   }
 
   return tw
 }
+
+// In NewTestWorkspace, before 'return tw':
+ t.Cleanup(func() {
+   tw.Flush()
+ })
apps/workspace-engine/test/integration/kafka/kafka.go (1)

23-36: Default topic fallback for robustness

If msg.TopicPartition.Topic is nil, set it to the mock’s default topic to match typical producer behavior and avoid nil derefs in consumers.

 func (p *ProducerMock) Produce(msg *kafka.Message, deliveryChan chan kafka.Event) error {
-  p.queue.Push(msg)
+  if msg.TopicPartition.Topic == nil {
+    msg.TopicPartition.Topic = &p.topic
+  }
+  p.queue.Push(msg)
@@
   if deliveryChan != nil {
     go func(m *kafka.Message, ch chan kafka.Event) {
       cp := *m
       cp.TopicPartition.Error = nil
       ch <- &cp
     }(msg, deliveryChan)
   }
   return nil
 }
apps/workspace-engine/pkg/workspace/workspace.go (1)

233-245: Optionally upgrade existing workspace with a newly provided producer

If a workspace already exists without a producer and a later call supplies one, consider wiring it in and rebuilding the ReleaseManager to enable job creation without restart.

 func GetWorkspaceAndLoad(id string, eventProducer EventProducer) (*Workspace, error) {
   workspace, _ := workspaces.Get(id)
   if workspace == nil {
     workspace, err := NewAndLoad(context.Background(), id, eventProducer)
@@
     return workspace, err
   }
- 
+  if eventProducer != nil && workspace.eventProducer == nil {
+    workspace.eventProducer = eventProducer
+    workspace.releasemanager = releasemanager.New(workspace.store, eventProducer)
+  }
   return workspace, nil
 }
@@
 func GetNoFlushWorkspace(id string, eventProducer EventProducer) *Workspace {
   workspace, _ := workspaces.Get(id)
   if workspace == nil {
     workspace = NewNoFlush(id, eventProducer)
     workspaces.Set(id, workspace)
-
-    return workspace
-  } 
-
-  return workspace
+    return workspace
+  }
+  if eventProducer != nil && workspace.eventProducer == nil {
+    workspace.eventProducer = eventProducer
+    workspace.releasemanager = releasemanager.New(workspace.store, eventProducer)
+  }
+  return workspace
 }

Also applies to: 247-257

apps/workspace-engine/pkg/workspace/releasemanager/manager_test.go (2)

127-183: Workspace ID mismatch in fixtures (nit).

You initialize store with "test-workspace" but create entities with a different random workspaceID. Not harmful for these tests, but confusing.

Either use the store’s workspaceID for entities or add a brief comment explaining the divergence.


589-622: Concurrent access test: reduce flakiness.

Reliance on time.Sleep can be flaky. Prefer waiting on a condition (event count increases) with a timeout loop.

apps/workspace-engine/test/integration/kafka/consumer.go (1)

7-9: Drop testify/mock dependency (unused).

The embedded *mock.Mock isn’t used. Removing it simplifies the mock and dependencies.

-	"github.com/stretchr/testify/mock"
@@
-type ConsumerMock struct {
-	*mock.Mock
+type ConsumerMock struct {

Also applies to: 14-15

apps/workspace-engine/pkg/workspace/releasemanager/deployment/creator_test.go (2)

20-25: Unify and harden MockEventProducer used in tests.

There’s a second MockEventProducer here differing from the one in manager_test.go (no mutex, fewer features). This duplication invites drift.

  • Extract a single test helper (e.g., apps/workspace-engine/test/helpers/mock_event_producer.go) used by both packages.
  • Add a mutex to make it safe if tests become concurrent.
    Example minimal hardening:
 type MockEventProducer struct {
-	events      []MockEvent
+	mu          sync.Mutex
+	events      []MockEvent
 	shouldError bool
 	errorMsg    string
 }
@@
 func (m *MockEventProducer) ProduceEvent(eventType string, workspaceID string, data any) error {
+	m.mu.Lock()
+	defer m.mu.Unlock()
 	if m.shouldError { return fmt.Errorf("%s", m.errorMsg) }
 	m.events = append(m.events, MockEvent{ ... })
 	return nil
 }
@@
 func (m *MockEventProducer) GetEvents() []MockEvent {
-	return append([]MockEvent{}, m.events...)
+	m.mu.Lock(); defer m.mu.Unlock()
+	return append([]MockEvent{}, m.events...)
 }

Also applies to: 35-41, 58-79


532-545: Strengthen variable cloning assertion.

Current test only checks presence. Verify immutability by comparing serialized values before/after original mutation.

-assert.NotNil(t, release.Variables["key1"])
+before := release.Variables["key1"]
+_ = originalValue.FromStringValue("modified")
+after := release.Variables["key1"]
+assert.Equal(t, before, after, "release variable should not change when original is mutated")
apps/workspace-engine/pkg/events/producer.go (3)

13-17: Stronger type-safety for event type.

Prefer handler.EventType over string in the EventProducer API to catch invalid event names at compile time and reduce casting at call sites. Consider:

  • type EventProducer interface { ProduceEvent(eventType handler.EventType, workspaceID string, data any) error }

35-50: Channel capacity should be configurable.

Buffer 1000 may be insufficient when handlers emit cascading events; risk of deadlock if the buffer fills while processEvents is executing a handler. Expose capacity via constructor arg or option (e.g., WithQueueSize(n)).

Do cascades in your tests ever exceed 1000 messages?


22-31: Cascading events risk filling the buffer (potential deadlock).

Handler runs on the same goroutine that drains the queue; if handler emits >cap(queue) additional events before returning, ProduceEvent will block and the system can deadlock. Options:

  • Increase capacity and make it configurable.
  • Provide a non-blocking TryProduceEvent for tests that can tolerate backpressure.
  • Use a small worker pool to decouple handler execution from queue draining (at the cost of strict ordering).

Do any handlers emit long cascades (O(1k)+) in your tests?

Also applies to: 41-42, 52-70

📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Disabled knowledge base sources:

  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 60df126 and dfd4246.

📒 Files selected for processing (32)
  • apps/workspace-engine/main.go (1 hunks)
  • apps/workspace-engine/pkg/db/jobs_test.go (15 hunks)
  • apps/workspace-engine/pkg/events/events.go (2 hunks)
  • apps/workspace-engine/pkg/events/handler/handler.go (3 hunks)
  • apps/workspace-engine/pkg/events/handler/jobs/jobs.go (1 hunks)
  • apps/workspace-engine/pkg/events/producer.go (1 hunks)
  • apps/workspace-engine/pkg/kafka/kafka.go (4 hunks)
  • apps/workspace-engine/pkg/server/openapi/utils/utils.go (1 hunks)
  • apps/workspace-engine/pkg/ticker/ticker_test.go (1 hunks)
  • apps/workspace-engine/pkg/workspace/jobdispatch/github_test.go (4 hunks)
  • apps/workspace-engine/pkg/workspace/releasemanager/deployment/creator.go (1 hunks)
  • apps/workspace-engine/pkg/workspace/releasemanager/deployment/creator_test.go (1 hunks)
  • apps/workspace-engine/pkg/workspace/releasemanager/deployment/executor.go (0 hunks)
  • apps/workspace-engine/pkg/workspace/releasemanager/manager.go (4 hunks)
  • apps/workspace-engine/pkg/workspace/releasemanager/manager_test.go (1 hunks)
  • apps/workspace-engine/pkg/workspace/releasemanager/policy/evaluator/approval/approval_test.go (3 hunks)
  • apps/workspace-engine/pkg/workspace/releasemanager/policy/evaluator/environmentprogression/environmentprogression_test.go (1 hunks)
  • apps/workspace-engine/pkg/workspace/releasemanager/policy/evaluator/skipdeployed/skipdeployed_test.go (1 hunks)
  • apps/workspace-engine/pkg/workspace/releasemanager/targets/taint_test.go (13 hunks)
  • apps/workspace-engine/pkg/workspace/releasemanager/targets/targets_manager_test.go (7 hunks)
  • apps/workspace-engine/pkg/workspace/releasemanager/variables/variablemanager_test.go (2 hunks)
  • apps/workspace-engine/pkg/workspace/store/environments_bench_test.go (3 hunks)
  • apps/workspace-engine/pkg/workspace/store/store.go (3 hunks)
  • apps/workspace-engine/pkg/workspace/workspace.go (5 hunks)
  • apps/workspace-engine/test/e2e/engine_deployment_test.go (11 hunks)
  • apps/workspace-engine/test/e2e/engine_kafka_replay_test.go (11 hunks)
  • apps/workspace-engine/test/e2e/engine_workspace_disk_persistence_test.go (10 hunks)
  • apps/workspace-engine/test/integration/kafka/README.md (1 hunks)
  • apps/workspace-engine/test/integration/kafka/consumer.go (1 hunks)
  • apps/workspace-engine/test/integration/kafka/kafka.go (1 hunks)
  • apps/workspace-engine/test/integration/kafka/queue.go (1 hunks)
  • apps/workspace-engine/test/integration/workspace.go (4 hunks)
💤 Files with no reviewable changes (1)
  • apps/workspace-engine/pkg/workspace/releasemanager/deployment/executor.go
🧰 Additional context used
📓 Path-based instructions (3)
apps/workspace-engine/**/*.go

📄 CodeRabbit inference engine (apps/workspace-engine/CLAUDE.md)

apps/workspace-engine/**/*.go: Do not add extraneous inline comments that state the obvious
Do not add comments that simply restate what the code does
Do not add comments for standard Go patterns (e.g., noting WaitGroup or semaphore usage)
Write comments that explain why, document complex logic/algorithms, provide non-obvious context, include TODO/FIXME, and document exported functions/types/methods

Files:

  • apps/workspace-engine/pkg/workspace/releasemanager/targets/targets_manager_test.go
  • apps/workspace-engine/pkg/workspace/jobdispatch/github_test.go
  • apps/workspace-engine/pkg/ticker/ticker_test.go
  • apps/workspace-engine/pkg/events/handler/jobs/jobs.go
  • apps/workspace-engine/test/e2e/engine_deployment_test.go
  • apps/workspace-engine/pkg/db/jobs_test.go
  • apps/workspace-engine/pkg/workspace/store/environments_bench_test.go
  • apps/workspace-engine/main.go
  • apps/workspace-engine/pkg/workspace/releasemanager/policy/evaluator/skipdeployed/skipdeployed_test.go
  • apps/workspace-engine/pkg/workspace/releasemanager/targets/taint_test.go
  • apps/workspace-engine/test/integration/kafka/queue.go
  • apps/workspace-engine/test/e2e/engine_workspace_disk_persistence_test.go
  • apps/workspace-engine/pkg/workspace/releasemanager/deployment/creator.go
  • apps/workspace-engine/pkg/workspace/releasemanager/policy/evaluator/approval/approval_test.go
  • apps/workspace-engine/pkg/workspace/releasemanager/policy/evaluator/environmentprogression/environmentprogression_test.go
  • apps/workspace-engine/pkg/events/events.go
  • apps/workspace-engine/pkg/workspace/workspace.go
  • apps/workspace-engine/pkg/workspace/store/store.go
  • apps/workspace-engine/pkg/workspace/releasemanager/variables/variablemanager_test.go
  • apps/workspace-engine/pkg/kafka/kafka.go
  • apps/workspace-engine/pkg/workspace/releasemanager/manager.go
  • apps/workspace-engine/test/integration/kafka/consumer.go
  • apps/workspace-engine/pkg/server/openapi/utils/utils.go
  • apps/workspace-engine/pkg/events/handler/handler.go
  • apps/workspace-engine/test/integration/workspace.go
  • apps/workspace-engine/pkg/workspace/releasemanager/deployment/creator_test.go
  • apps/workspace-engine/test/integration/kafka/kafka.go
  • apps/workspace-engine/pkg/workspace/releasemanager/manager_test.go
  • apps/workspace-engine/test/e2e/engine_kafka_replay_test.go
  • apps/workspace-engine/pkg/events/producer.go
apps/workspace-engine/**/*_test.go

📄 CodeRabbit inference engine (apps/workspace-engine/CLAUDE.md)

Follow the existing test structure used in *_test.go files

Files:

  • apps/workspace-engine/pkg/workspace/releasemanager/targets/targets_manager_test.go
  • apps/workspace-engine/pkg/workspace/jobdispatch/github_test.go
  • apps/workspace-engine/pkg/ticker/ticker_test.go
  • apps/workspace-engine/test/e2e/engine_deployment_test.go
  • apps/workspace-engine/pkg/db/jobs_test.go
  • apps/workspace-engine/pkg/workspace/store/environments_bench_test.go
  • apps/workspace-engine/pkg/workspace/releasemanager/policy/evaluator/skipdeployed/skipdeployed_test.go
  • apps/workspace-engine/pkg/workspace/releasemanager/targets/taint_test.go
  • apps/workspace-engine/test/e2e/engine_workspace_disk_persistence_test.go
  • apps/workspace-engine/pkg/workspace/releasemanager/policy/evaluator/approval/approval_test.go
  • apps/workspace-engine/pkg/workspace/releasemanager/policy/evaluator/environmentprogression/environmentprogression_test.go
  • apps/workspace-engine/pkg/workspace/releasemanager/variables/variablemanager_test.go
  • apps/workspace-engine/pkg/workspace/releasemanager/deployment/creator_test.go
  • apps/workspace-engine/pkg/workspace/releasemanager/manager_test.go
  • apps/workspace-engine/test/e2e/engine_kafka_replay_test.go
**/*.{js,jsx,ts,tsx,json,md,yml,yaml}

📄 CodeRabbit inference engine (CLAUDE.md)

Formatting: Prettier is used with @ctrlplane/prettier-config

Files:

  • apps/workspace-engine/test/integration/kafka/README.md
🧠 Learnings (1)
📚 Learning: 2025-08-12T18:13:54.630Z
Learnt from: CR
PR: ctrlplanedev/ctrlplane#0
File: apps/workspace-engine/CLAUDE.md:0-0
Timestamp: 2025-08-12T18:13:54.630Z
Learning: Applies to apps/workspace-engine/**/*_test.go : Follow the existing test structure used in *_test.go files

Applied to files:

  • apps/workspace-engine/pkg/ticker/ticker_test.go
  • apps/workspace-engine/pkg/workspace/releasemanager/policy/evaluator/approval/approval_test.go
🧬 Code graph analysis (29)
apps/workspace-engine/pkg/workspace/releasemanager/targets/targets_manager_test.go (2)
apps/workspace-engine/pkg/workspace/store/store.go (1)
  • New (13-37)
apps/workspace-engine/pkg/workspace/releasemanager/targets/targets_manager.go (1)
  • New (17-22)
apps/workspace-engine/pkg/workspace/jobdispatch/github_test.go (3)
apps/workspace-engine/pkg/workspace/releasemanager/manager.go (1)
  • New (49-68)
apps/workspace-engine/pkg/workspace/store/store.go (1)
  • New (13-37)
apps/workspace-engine/pkg/workspace/workspace.go (1)
  • New (26-39)
apps/workspace-engine/pkg/ticker/ticker_test.go (2)
apps/workspace-engine/pkg/workspace/workspace.go (2)
  • Set (221-223)
  • New (26-39)
apps/workspace-engine/pkg/workspace/releasemanager/manager.go (1)
  • New (49-68)
apps/workspace-engine/pkg/events/handler/jobs/jobs.go (10)
apps/workspace-engine/pkg/oapi/oapi.gen.go (12)
  • Job (230-242)
  • ReleaseId (55-55)
  • Id (51-51)
  • Status (57-57)
  • InvalidJobAgent (40-40)
  • InvalidIntegration (39-39)
  • UpdatedAt (58-58)
  • Release (397-403)
  • ReleaseTarget (406-410)
  • Pending (41-41)
  • Cancelled (35-35)
  • JobAgentId (53-53)
apps/workspace-engine/pkg/workspace/workspace.go (1)
  • Workspace (70-77)
apps/workspace-engine/pkg/events/handler/handler.go (1)
  • RawEvent (86-91)
apps/workspace-engine/pkg/workspace/store/store.go (1)
  • Store (39-62)
apps/workspace-engine/pkg/workspace/store/releases.go (1)
  • Releases (16-18)
apps/workspace-engine/pkg/workspace/store/jobs.go (1)
  • Jobs (16-18)
apps/workspace-engine/pkg/workspace/releasemanager/deployment/jobs/dispatcher.go (1)
  • ErrUnsupportedJobAgent (25-25)
apps/workspace-engine/pkg/workspace/store/job_agents.go (1)
  • JobAgents (16-18)
apps/workspace-engine/pkg/workspace/jobdispatch/types.go (1)
  • JobAgentTypeGithub (6-6)
apps/workspace-engine/pkg/workspace/jobdispatch/github.go (1)
  • NewGithubDispatcher (57-62)
apps/workspace-engine/test/e2e/engine_deployment_test.go (2)
apps/workspace-engine/test/integration/opts.go (7)
  • DeploymentCelResourceSelector (322-328)
  • WithDeploymentVersion (361-375)
  • DeploymentVersionTag (385-389)
  • WithDeployment (259-277)
  • WithEnvironment (279-293)
  • EnvironmentCelResourceSelector (429-435)
  • WithResource (107-132)
apps/workspace-engine/pkg/workspace/store/release_targets.go (1)
  • ReleaseTargets (27-31)
apps/workspace-engine/pkg/db/jobs_test.go (2)
apps/workspace-engine/pkg/workspace/store/store.go (1)
  • New (13-37)
apps/workspace-engine/pkg/workspace/workspace.go (1)
  • New (26-39)
apps/workspace-engine/pkg/workspace/store/environments_bench_test.go (2)
apps/workspace-engine/pkg/workspace/store/store.go (1)
  • New (13-37)
apps/workspace-engine/pkg/workspace/workspace.go (1)
  • New (26-39)
apps/workspace-engine/main.go (1)
apps/workspace-engine/pkg/kafka/kafka.go (1)
  • RunConsumer (76-213)
apps/workspace-engine/pkg/workspace/releasemanager/policy/evaluator/skipdeployed/skipdeployed_test.go (2)
apps/workspace-engine/pkg/workspace/store/store.go (1)
  • New (13-37)
apps/workspace-engine/pkg/workspace/workspace.go (1)
  • New (26-39)
apps/workspace-engine/pkg/workspace/releasemanager/targets/taint_test.go (2)
apps/workspace-engine/pkg/workspace/store/store.go (1)
  • New (13-37)
apps/workspace-engine/pkg/workspace/workspace.go (1)
  • New (26-39)
apps/workspace-engine/test/e2e/engine_workspace_disk_persistence_test.go (3)
apps/workspace-engine/pkg/workspace/releasemanager/manager.go (1)
  • New (49-68)
apps/workspace-engine/pkg/workspace/store/store.go (1)
  • New (13-37)
apps/workspace-engine/pkg/workspace/workspace.go (2)
  • New (26-39)
  • NewNoFlush (56-68)
apps/workspace-engine/pkg/workspace/releasemanager/deployment/creator.go (5)
apps/workspace-engine/pkg/workspace/store/store.go (1)
  • Store (39-62)
apps/workspace-engine/pkg/workspace/releasemanager/deployment/jobs/factory.go (2)
  • Factory (20-22)
  • NewFactory (25-29)
apps/workspace-engine/pkg/events/producer.go (1)
  • EventProducer (15-17)
apps/workspace-engine/pkg/oapi/oapi.gen.go (3)
  • Release (397-403)
  • LiteralValue (290-292)
  • CreatedAt (49-49)
apps/workspace-engine/pkg/events/handler/jobs/jobs.go (1)
  • JobCreatedEventData (21-23)
apps/workspace-engine/pkg/workspace/releasemanager/policy/evaluator/approval/approval_test.go (2)
apps/workspace-engine/pkg/workspace/store/store.go (1)
  • New (13-37)
apps/workspace-engine/pkg/workspace/workspace.go (1)
  • New (26-39)
apps/workspace-engine/pkg/workspace/releasemanager/policy/evaluator/environmentprogression/environmentprogression_test.go (2)
apps/workspace-engine/pkg/workspace/store/store.go (1)
  • New (13-37)
apps/workspace-engine/pkg/workspace/workspace.go (1)
  • New (26-39)
apps/workspace-engine/pkg/events/events.go (2)
apps/workspace-engine/pkg/events/handler/handler.go (4)
  • JobCreate (60-60)
  • EventProducer (106-108)
  • EventListener (100-103)
  • NewEventListener (111-117)
apps/workspace-engine/pkg/events/handler/jobs/jobs.go (1)
  • HandleJobCreated (31-76)
apps/workspace-engine/pkg/workspace/workspace.go (3)
apps/workspace-engine/pkg/workspace/releasemanager/manager.go (2)
  • New (49-68)
  • EventProducer (25-27)
apps/workspace-engine/pkg/workspace/store/store.go (1)
  • New (13-37)
apps/workspace-engine/pkg/db/changeset.go (1)
  • NewChangesetConsumer (165-170)
apps/workspace-engine/pkg/workspace/store/store.go (3)
apps/workspace-engine/pkg/workspace/releasemanager/manager.go (1)
  • New (49-68)
apps/workspace-engine/pkg/workspace/workspace.go (1)
  • New (26-39)
apps/workspace-engine/pkg/workspace/store/repository/repo.go (2)
  • New (60-78)
  • Repository (80-101)
apps/workspace-engine/pkg/workspace/releasemanager/variables/variablemanager_test.go (2)
apps/workspace-engine/pkg/workspace/store/store.go (1)
  • New (13-37)
apps/workspace-engine/pkg/workspace/workspace.go (1)
  • New (26-39)
apps/workspace-engine/pkg/kafka/kafka.go (3)
apps/workspace-engine/pkg/events/handler/handler.go (1)
  • EventProducer (106-108)
apps/workspace-engine/pkg/workspace/workspace.go (2)
  • EventProducer (19-21)
  • GetWorkspaceAndLoad (233-245)
apps/workspace-engine/pkg/events/events.go (1)
  • NewEventHandler (85-87)
apps/workspace-engine/pkg/workspace/releasemanager/manager.go (2)
apps/workspace-engine/pkg/workspace/releasemanager/deployment/creator.go (3)
  • JobCreator (21-25)
  • EventProducer (16-18)
  • NewJobCreator (28-34)
apps/workspace-engine/pkg/workspace/workspace.go (2)
  • New (26-39)
  • EventProducer (19-21)
apps/workspace-engine/test/integration/kafka/consumer.go (2)
apps/workspace-engine/test/integration/kafka/queue.go (1)
  • MessageQueue (11-15)
apps/workspace-engine/pkg/kafka/kafka.go (1)
  • Topic (23-23)
apps/workspace-engine/pkg/server/openapi/utils/utils.go (1)
apps/workspace-engine/pkg/workspace/workspace.go (1)
  • GetWorkspaceAndLoad (233-245)
apps/workspace-engine/pkg/events/handler/handler.go (1)
apps/workspace-engine/pkg/workspace/workspace.go (2)
  • EventProducer (19-21)
  • GetWorkspaceAndLoad (233-245)
apps/workspace-engine/test/integration/workspace.go (6)
apps/workspace-engine/pkg/events/handler/handler.go (3)
  • EventProducer (106-108)
  • EventListener (100-103)
  • OffsetTracker (119-123)
apps/workspace-engine/pkg/events/producer.go (3)
  • EventProducer (15-17)
  • NewInMemoryProducer (37-50)
  • InMemoryProducer (25-31)
apps/workspace-engine/pkg/workspace/workspace.go (2)
  • EventProducer (19-21)
  • GetNoFlushWorkspace (247-257)
apps/workspace-engine/pkg/events/events.go (1)
  • NewEventHandler (85-87)
apps/workspace-engine/pkg/workspace/store/store.go (1)
  • Store (39-62)
apps/workspace-engine/pkg/workspace/store/release_targets.go (1)
  • ReleaseTargets (27-31)
apps/workspace-engine/pkg/workspace/releasemanager/deployment/creator_test.go (3)
apps/workspace-engine/pkg/workspace/releasemanager/manager_test.go (3)
  • MockEvent (33-38)
  • NewMockEventProducer (41-45)
  • MockEventProducer (24-30)
apps/workspace-engine/pkg/workspace/store/store.go (2)
  • Store (39-62)
  • New (13-37)
apps/workspace-engine/pkg/oapi/oapi.gen.go (3)
  • Id (51-51)
  • CreatedAt (49-49)
  • LiteralValue (290-292)
apps/workspace-engine/test/integration/kafka/kafka.go (1)
apps/workspace-engine/test/integration/kafka/queue.go (1)
  • MessageQueue (11-15)
apps/workspace-engine/pkg/workspace/releasemanager/manager_test.go (4)
apps/workspace-engine/pkg/workspace/store/store.go (2)
  • Store (39-62)
  • New (13-37)
apps/workspace-engine/pkg/workspace/releasemanager/manager.go (1)
  • New (49-68)
apps/workspace-engine/pkg/changeset/changeset.go (3)
  • NewChangeSet (32-36)
  • ChangeTypeUpdate (12-12)
  • ChangeTypeCreate (11-11)
apps/workspace-engine/pkg/workspace/releasemanager/deployment/creator.go (1)
  • JobCreatedEventData (37-39)
apps/workspace-engine/test/e2e/engine_kafka_replay_test.go (3)
apps/workspace-engine/pkg/workspace/workspace.go (2)
  • GetWorkspaceAndLoad (233-245)
  • New (26-39)
apps/workspace-engine/pkg/workspace/releasemanager/manager.go (1)
  • New (49-68)
apps/workspace-engine/pkg/kafka/kafka.go (1)
  • RunConsumer (76-213)
apps/workspace-engine/pkg/events/producer.go (2)
apps/workspace-engine/pkg/events/handler/handler.go (3)
  • OffsetTracker (119-123)
  • RawEvent (86-91)
  • EventType (21-21)
apps/workspace-engine/pkg/kafka/kafka.go (1)
  • Topic (23-23)
🔇 Additional comments (16)
apps/workspace-engine/pkg/workspace/store/environments_bench_test.go (1)

68-69: Constructor signature update looks good.

Passing workspaceID into store.New aligns with the new API. No issues.

Also applies to: 169-171, 238-239

apps/workspace-engine/pkg/ticker/ticker_test.go (1)

58-61: Workspace constructor update OK.

Using workspace.New("ws-*", nil) matches the new signature.

apps/workspace-engine/pkg/workspace/releasemanager/policy/evaluator/environmentprogression/environmentprogression_test.go (1)

13-13: LGTM! Store initialization updated correctly.

The test now correctly passes a workspace ID to store.New(), aligning with the updated constructor signature that requires a workspace identifier.

apps/workspace-engine/pkg/workspace/jobdispatch/github_test.go (1)

40-40: LGTM! Test store initialization updated consistently.

All test functions correctly initialize the store with the workspace ID, consistent with the updated store.New(wsId string) constructor signature.

Also applies to: 95-95, 135-135, 403-403

apps/workspace-engine/pkg/workspace/releasemanager/variables/variablemanager_test.go (1)

14-14: LGTM! Test initialization updated correctly.

Both the helper function and test properly initialize the store with the required workspace ID parameter.

Also applies to: 390-390

apps/workspace-engine/pkg/workspace/releasemanager/targets/taint_test.go (1)

144-144: LGTM! All test store initializations updated correctly.

All test functions across the file consistently initialize stores with the workspace ID, properly adapting to the updated store.New() API.

Also applies to: 177-177, 210-210, 244-244, 277-277, 310-310, 344-344, 385-385, 412-412, 455-455, 480-480, 516-516, 534-534

apps/workspace-engine/pkg/workspace/releasemanager/policy/evaluator/approval/approval_test.go (1)

12-12: LGTM! Store initialization updated correctly.

The helper function and test cases properly initialize stores with the workspace ID, aligning with the updated constructor signature.

Also applies to: 192-192, 247-247

apps/workspace-engine/pkg/db/jobs_test.go (1)

240-240: LGTM! Test store initialization properly scoped to workspaces.

All test store initializations correctly pass the workspace ID, with multi-workspace tests appropriately using distinct workspace IDs (workspaceID1, workspaceID2) to ensure proper isolation.

Also applies to: 311-311, 395-395, 498-498, 580-580, 659-659, 725-725, 810-810, 820-820, 946-946, 1109-1109, 1186-1186, 1245-1245, 1350-1350, 1447-1447

apps/workspace-engine/main.go (1)

167-167: LGTM! Producer correctly threaded through consumer.

The Kafka producer is now passed to RunConsumer, enabling the event-driven job creation pattern introduced in this PR. The producer serves dual purposes:

  1. Producing periodic ticker events (lines 149, 157)
  2. Enabling the consumer to produce new events (e.g., job.created) during event processing

This design correctly implements the event-driven architecture described in the PR objectives.

apps/workspace-engine/pkg/workspace/releasemanager/policy/evaluator/skipdeployed/skipdeployed_test.go (1)

13-13: LGTM! Store initialization updated correctly.

The helper function properly initializes the store with a workspace ID, consistent with the updated store.New(wsId string) API.

apps/workspace-engine/pkg/workspace/releasemanager/targets/targets_manager_test.go (1)

231-240: Empty-store path OK.

Using store.New("test-workspace") in the empty-case test is fine since no entities are created.

apps/workspace-engine/pkg/events/events.go (1)

60-62: Good addition: JobCreate handler registered.

apps/workspace-engine/pkg/workspace/releasemanager/manager.go (1)

252-257: Intentional design: nil producer limited to read-only paths; optional sentinel error for clarity

The nil eventProducer pattern is intentional and limited in scope: tests use nil for replay/recovery scenarios, and the server code explicitly documents that read-only operations pass nil as a temporary measure. The hard error at lines 252–257 acts as a safety guard and will only trigger if release operations are invoked on nil-producer workspaces, which is not the case in current code paths.

The suggested sentinel error (e.g., ErrJobCreationDisabled) remains a reasonable optional refactor for distinguishing configuration failures from operational failures in upstream handling, but is not required for correctness given the read-only constraint.

apps/workspace-engine/test/e2e/engine_kafka_replay_test.go (1)

48-49: Nil EventProducer disables job dispatch; ensure handlers never deref it

These tests pass nil for EventProducer, which is fine for replay/state assertions but will disable job creation and may panic if any handler unconditionally calls ProduceEvent. Verify handlers guard against a nil producer or consider using a no-op producer for tests that don’t care about emissions.

Quick scan helper:

Also applies to: 91-92, 285-286, 305-306, 310-311, 314-315, 423-424, 495-496, 531-532, 589-590, 647-648, 696-697

apps/workspace-engine/pkg/events/handler/handler.go (1)

101-108: The EventProducer duplication is necessary and intentional—consolidation would create a circular import.

The events/producer.go package already imports pkg/events/handler to use handler.OffsetTracker, handler.RawEvent, and handler.EventType. If handler.go were to import events.EventProducer (as suggested), this would create a circular dependency: events → handler → events. The existing duplicate interface and its comment correctly document this architectural constraint. The code is correct as-is.

Likely an incorrect or invalid review comment.

apps/workspace-engine/pkg/kafka/kafka.go (1)

144-145: Review comment contains incorrect type identification; the code functions correctly via Go's structural typing.

The review comment claims that events.NewEventHandler receives handler.EventProducer, but the actual flow passes kafka.EventProducer from main.go. Due to Go's structural typing, kafka.EventProducer (which includes ProduceEvent, Flush, and Close methods) satisfies both events.EventProducer and handler.EventProducer interfaces (both requiring only ProduceEvent). This works without issues.

While the underlying concern about multiple EventProducer interface definitions across packages is valid for long-term maintainability, the specific claim about a type mismatch between packages is incorrect—the code compiles and functions as intended.

Likely an incorrect or invalid review comment.

Comment on lines +85 to 87
func NewEventHandler(eventProducer EventProducer) *handler.EventListener {
return handler.NewEventListener(handlers, eventProducer)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Compile error: wrong type in signature.

EventProducer is defined in the handler package. Qualify the type.

-func NewEventHandler(eventProducer EventProducer) *handler.EventListener {
-  return handler.NewEventListener(handlers, eventProducer)
+func NewEventHandler(eventProducer handler.EventProducer) *handler.EventListener {
+  return handler.NewEventListener(handlers, eventProducer)
 }
🤖 Prompt for AI Agents
In apps/workspace-engine/pkg/events/events.go around lines 85 to 87, the
function signature uses an unqualified EventProducer type causing a compile
error; change the parameter type to handler.EventProducer (i.e., func
NewEventHandler(eventProducer handler.EventProducer) *handler.EventListener) so
it references the type from the handler package, and ensure the handler import
remains or is added if missing.

Comment on lines +167 to 170
ws, err := workspace.GetWorkspaceAndLoad(rawEvent.WorkspaceID, el.eventProducer)
if ws == nil {
return nil, fmt.Errorf("workspace not found: %s: %w", rawEvent.WorkspaceID, err)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major

Potential type mismatch risk passing producer into workspace loader.

workspace.GetWorkspaceAndLoad requires workspace.EventProducer; el.eventProducer here is handler.EventProducer. Method sets match, but duplication increases risk. Same consolidation recommendation applies.

🤖 Prompt for AI Agents
In apps/workspace-engine/pkg/events/handler/handler.go around lines 167 to 170,
the call to workspace.GetWorkspaceAndLoad passes el.eventProducer which is typed
as handler.EventProducer while the function expects workspace.EventProducer,
creating a risky implicit match; fix by aligning types: either change the
handler struct field to use workspace.EventProducer (preferred) or implement an
explicit adapter that converts handler.EventProducer to workspace.EventProducer
and pass that adapter; update imports and constructor/initialization sites to
use the consolidated workspace.EventProducer type so the compiler enforces
compatibility.

Comment on lines +36 to +47
var data JobCreatedEventData
if err := json.Unmarshal(event.Data, &data); err != nil {
return fmt.Errorf("failed to unmarshal job.created event: %w", err)
}

job := data.Job

// Get the release for this job to cancel outdated jobs
release, exists := ws.Store().Releases.Get(job.ReleaseId)
if !exists {
return fmt.Errorf("release %s not found for job %s", job.ReleaseId, job.Id)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Validate event payload before use.

If Data lacks job or ReleaseId, this panics/later fails. Guard early.

 var data JobCreatedEventData
 if err := json.Unmarshal(event.Data, &data); err != nil {
     return fmt.Errorf("failed to unmarshal job.created event: %w", err)
 }
 
 job := data.Job
+if job == nil || job.Id == "" || job.ReleaseId == "" {
+    return fmt.Errorf("invalid job.created payload: missing job/id/releaseId")
+}
 
 // Get the release for this job to cancel outdated jobs
 release, exists := ws.Store().Releases.Get(job.ReleaseId)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
var data JobCreatedEventData
if err := json.Unmarshal(event.Data, &data); err != nil {
return fmt.Errorf("failed to unmarshal job.created event: %w", err)
}
job := data.Job
// Get the release for this job to cancel outdated jobs
release, exists := ws.Store().Releases.Get(job.ReleaseId)
if !exists {
return fmt.Errorf("release %s not found for job %s", job.ReleaseId, job.Id)
}
var data JobCreatedEventData
if err := json.Unmarshal(event.Data, &data); err != nil {
return fmt.Errorf("failed to unmarshal job.created event: %w", err)
}
job := data.Job
if job == nil || job.Id == "" || job.ReleaseId == "" {
return fmt.Errorf("invalid job.created payload: missing job/id/releaseId")
}
// Get the release for this job to cancel outdated jobs
release, exists := ws.Store().Releases.Get(job.ReleaseId)
if !exists {
return fmt.Errorf("release %s not found for job %s", job.ReleaseId, job.Id)
}
🤖 Prompt for AI Agents
In apps/workspace-engine/pkg/events/handler/jobs/jobs.go around lines 36 to 47,
the code unmarshals JobCreatedEventData and immediately uses data.Job and
data.Job.ReleaseId which can be missing/empty and cause panics or later
failures; add explicit validation after unmarshalling that data.Job is present
(or its required fields), that data.Job.ReleaseId is non-empty, and that
data.Job.Id is non-empty, returning a clear fmt.Errorf if any required field is
missing; then proceed to lookup the release only after these checks pass so the
function fails fast with a descriptive error instead of panicking later.

Comment on lines +64 to +73
if job.Status != oapi.InvalidJobAgent {
go func() {
if err := dispatchJob(ctx, ws, job); err != nil && !errors.Is(err, jobs.ErrUnsupportedJobAgent) {
log.Error("error dispatching job to integration", "error", err.Error())
job.Status = oapi.InvalidIntegration
job.UpdatedAt = time.Now()
ws.Store().Jobs.Upsert(ctx, job)
}
}()
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Unsupported job agent leaves job stuck in pending. Mark it InvalidJobAgent.

Swallowing ErrUnsupportedJobAgent without persisting state causes indefinite pending jobs.

-        go func() {
-            if err := dispatchJob(ctx, ws, job); err != nil && !errors.Is(err, jobs.ErrUnsupportedJobAgent) {
-                log.Error("error dispatching job to integration", "error", err.Error())
-                job.Status = oapi.InvalidIntegration
-                job.UpdatedAt = time.Now()
-                ws.Store().Jobs.Upsert(ctx, job)
-            }
-        }()
+        go func() {
+            if err := dispatchJob(ctx, ws, job); err != nil {
+                if errors.Is(err, jobs.ErrUnsupportedJobAgent) {
+                    job.Status = oapi.InvalidJobAgent
+                } else {
+                    log.Error("error dispatching job to integration", "error", err.Error())
+                    job.Status = oapi.InvalidIntegration
+                }
+                job.UpdatedAt = time.Now()
+                ws.Store().Jobs.Upsert(ctx, job)
+            }
+        }()
🤖 Prompt for AI Agents
In apps/workspace-engine/pkg/events/handler/jobs/jobs.go around lines 64 to 73,
the goroutine swallows ErrUnsupportedJobAgent and leaves the job pending; update
the error handling so that when dispatchJob returns ErrUnsupportedJobAgent you
set job.Status = oapi.InvalidJobAgent, set job.UpdatedAt = time.Now(), and
persist the change via ws.Store().Jobs.Upsert(ctx, job); for other errors keep
the existing InvalidIntegration handling. Ensure both error branches persist the
job and avoid duplicating Upsert logic by structuring the branches clearly.

Comment on lines +3 to +11
import (
"context"
"encoding/json"
"fmt"
"time"
"workspace-engine/pkg/events/handler"

"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major

Imports needed for safe concurrency and topic constant.

Add sync, sync/atomic, and the app Kafka package (aliased) to use the configured topic instead of a hardcoded string.

 import (
   "context"
   "encoding/json"
   "fmt"
+  "sync"
+  "sync/atomic"
   "time"
   "workspace-engine/pkg/events/handler"
+  appkafka "workspace-engine/pkg/kafka"

   "github.com/confluentinc/confluent-kafka-go/v2/kafka"
 )
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
import (
"context"
"encoding/json"
"fmt"
"time"
"workspace-engine/pkg/events/handler"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)
import (
"context"
"encoding/json"
"fmt"
"sync"
"sync/atomic"
"time"
"workspace-engine/pkg/events/handler"
appkafka "workspace-engine/pkg/kafka"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)
🤖 Prompt for AI Agents
In apps/workspace-engine/pkg/events/producer.go around lines 3 to 11, the file
lacks imports for sync, sync/atomic and the application Kafka package alias
needed to use the configured topic instead of a hardcoded string; add imports
"sync" and "sync/atomic" and the app Kafka package aliased (for example
appkafka) to the import block, replace any hardcoded topic with the configured
value from the app Kafka package (e.g. appkafka.Topic or appkafka.Config.Topic),
and use sync/atomic to manage any producer-ready boolean and sync.Mutex or
sync.WaitGroup where concurrent access to the producer or close sequence occurs
to ensure safe concurrency.

Comment on lines 67 to 70
func setupBenchmarkStore(b *testing.B, workspaceID string, numResources int) (*Store, string) {
st := New()
st := New(workspaceID)

// Create system
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Unused parameter ‘b’ causes build failure.

setupBenchmarkStore doesn’t use b. Mark it used or drop it.

 func setupBenchmarkStore(b *testing.B, workspaceID string, numResources int) (*Store, string) {
-    st := New(workspaceID)
+    st := New(workspaceID)
+    _ = b // avoid unused param in benchmarks helper
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func setupBenchmarkStore(b *testing.B, workspaceID string, numResources int) (*Store, string) {
st := New()
st := New(workspaceID)
// Create system
func setupBenchmarkStore(b *testing.B, workspaceID string, numResources int) (*Store, string) {
st := New(workspaceID)
_ = b // avoid unused param in benchmarks helper
// Create system
🤖 Prompt for AI Agents
In apps/workspace-engine/pkg/workspace/store/environments_bench_test.go around
lines 67 to 70, the benchmark helper setupBenchmarkStore accepts a *testing.B
parameter named b but does not use it, causing a build failure; either remove
the b parameter if benchmarks do not need it, or mark it as used (for example
assign to the blank identifier or call b.Helper()) to silence the unused
variable error—pick one approach and update the function signature and any
callers accordingly.

Comment on lines 181 to +185
// Reinitialize release manager with the decoded store
w.releasemanager = releasemanager.New(w.store)
// Use the workspace's existing event producer
if w.eventProducer != nil {
w.releasemanager = releasemanager.New(w.store, w.eventProducer)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Fix stale ReleaseManager after GobDecode when eventProducer is nil

releasemanager.Manager retains a pointer to the pre-decode store if you don’t reinitialize it, leading to state drift. Always reinitialize with the decoded store; eventProducer may be nil.

- // Reinitialize release manager with the decoded store
- // Use the workspace's existing event producer
- if w.eventProducer != nil {
-   w.releasemanager = releasemanager.New(w.store, w.eventProducer)
- }
+ // Reinitialize release manager with the decoded store (eventProducer may be nil)
+ w.releasemanager = releasemanager.New(w.store, w.eventProducer)
🤖 Prompt for AI Agents
In apps/workspace-engine/pkg/workspace/workspace.go around lines 181 to 185, the
releasemanager is only reinitialized when w.eventProducer is non-nil which
leaves the Manager holding a pointer to the pre-decoded store; always
reinitialize releasemanager with the decoded w.store even if w.eventProducer is
nil. Replace the conditional so releasemanager.New is called unconditionally
(passing w.eventProducer which may be nil) to ensure the Manager references the
updated store.

Comment on lines +81 to +99
func (c *ConsumerMock) Poll(timeoutMs int) kafka.Event {
c.mu.Lock()
defer c.mu.Unlock()

// If not yet assigned and subscribed, trigger assignment
if len(c.assignedPartitions) == 0 && len(c.subscribed) > 0 {
// Assign partition 0 for all subscribed topics
partitions := make([]kafka.TopicPartition, 0, len(c.subscribed))
for _, topic := range c.subscribed {
topicCopy := topic
partitions = append(partitions, kafka.TopicPartition{
Topic: &topicCopy,
Partition: 0,
Offset: kafka.OffsetBeginning,
})
}
return kafka.AssignedPartitions{Partitions: partitions}
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Cache assigned partitions in mock upon Poll.

Poll returns AssignedPartitions but doesn’t update internal state. Some code paths may rely on Assignment() later. Cache them for consistency.

 	if len(c.assignedPartitions) == 0 && len(c.subscribed) > 0 {
@@
-		return kafka.AssignedPartitions{Partitions: partitions}
+		// Cache internally and emit the assignment event
+		c.assignedPartitions = partitions
+		return kafka.AssignedPartitions{Partitions: partitions}
 	}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func (c *ConsumerMock) Poll(timeoutMs int) kafka.Event {
c.mu.Lock()
defer c.mu.Unlock()
// If not yet assigned and subscribed, trigger assignment
if len(c.assignedPartitions) == 0 && len(c.subscribed) > 0 {
// Assign partition 0 for all subscribed topics
partitions := make([]kafka.TopicPartition, 0, len(c.subscribed))
for _, topic := range c.subscribed {
topicCopy := topic
partitions = append(partitions, kafka.TopicPartition{
Topic: &topicCopy,
Partition: 0,
Offset: kafka.OffsetBeginning,
})
}
return kafka.AssignedPartitions{Partitions: partitions}
}
func (c *ConsumerMock) Poll(timeoutMs int) kafka.Event {
c.mu.Lock()
defer c.mu.Unlock()
// If not yet assigned and subscribed, trigger assignment
if len(c.assignedPartitions) == 0 && len(c.subscribed) > 0 {
// Assign partition 0 for all subscribed topics
partitions := make([]kafka.TopicPartition, 0, len(c.subscribed))
for _, topic := range c.subscribed {
topicCopy := topic
partitions = append(partitions, kafka.TopicPartition{
Topic: &topicCopy,
Partition: 0,
Offset: kafka.OffsetBeginning,
})
}
c.assignedPartitions = partitions
return kafka.AssignedPartitions{Partitions: partitions}
}
🤖 Prompt for AI Agents
In apps/workspace-engine/test/integration/kafka/consumer.go around lines 81 to
99, Poll constructs and returns an AssignedPartitions event but never updates
the ConsumerMock internal assignedPartitions field; modify Poll so that before
returning it stores the constructed partitions into c.assignedPartitions (while
still holding the mutex) so subsequent Assignment() calls see the same
partitions, ensuring you copy or use the same slice instance and maintain the
lock/unlock discipline.

Comment on lines +34 to +66
// Pop removes and returns the first message from the queue
// Returns nil if timeout is reached
func (mq *MessageQueue) Pop(timeout time.Duration) (*kafka.Message, error) {
mq.mu.Lock()
defer mq.mu.Unlock()

deadline := time.Now().Add(timeout)
for len(mq.messages) == 0 {
remaining := time.Until(deadline)
if remaining <= 0 {
// Timeout reached
return nil, kafka.NewError(kafka.ErrTimedOut, "timed out waiting for message", false)
}

// Wait for signal or timeout
done := make(chan struct{})
go func() {
time.Sleep(remaining)
close(done)
}()

mq.cond.Wait()

select {
case <-done:
// Timeout
if len(mq.messages) == 0 {
return nil, kafka.NewError(kafka.ErrTimedOut, "timed out waiting for message", false)
}
default:
// Got signaled, continue loop to check messages
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Timeout logic is broken; Pop can block forever.

sync.Cond.Wait doesn’t wake on your timer; without a Push, Pop never returns. Replace with a channel-based queue.

 type MessageQueue struct {
-    mu       sync.Mutex
-    messages []*kafka.Message
-    cond     *sync.Cond
+    ch chan *kafka.Message
 }
 
 // NewMessageQueue creates a new message queue
 func NewMessageQueue() *MessageQueue {
-    mq := &MessageQueue{
-        messages: make([]*kafka.Message, 0),
-    }
-    mq.cond = sync.NewCond(&mq.mu)
-    return mq
+    return &MessageQueue{ch: make(chan *kafka.Message, 1024)}
 }
 
 // Push adds a message to the queue
 func (mq *MessageQueue) Push(msg *kafka.Message) {
-    mq.mu.Lock()
-    defer mq.mu.Unlock()
-    mq.messages = append(mq.messages, msg)
-    mq.cond.Signal()
+    mq.ch <- msg
 }
 
 // Pop removes and returns the first message from the queue
 // Returns nil if timeout is reached
 func (mq *MessageQueue) Pop(timeout time.Duration) (*kafka.Message, error) {
-    mq.mu.Lock()
-    defer mq.mu.Unlock()
-
-    deadline := time.Now().Add(timeout)
-    for len(mq.messages) == 0 {
-        remaining := time.Until(deadline)
-        if remaining <= 0 {
-            // Timeout reached
-            return nil, kafka.NewError(kafka.ErrTimedOut, "timed out waiting for message", false)
-        }
-
-        // Wait for signal or timeout
-        done := make(chan struct{})
-        go func() {
-            time.Sleep(remaining)
-            close(done)
-        }()
-
-        mq.cond.Wait()
-
-        select {
-        case <-done:
-            // Timeout
-            if len(mq.messages) == 0 {
-                return nil, kafka.NewError(kafka.ErrTimedOut, "timed out waiting for message", false)
-            }
-        default:
-            // Got signaled, continue loop to check messages
-        }
-    }
-
-    // Get first message
-    msg := mq.messages[0]
-    mq.messages = mq.messages[1:]
-    return msg, nil
+    select {
+    case msg := <-mq.ch:
+        return msg, nil
+    case <-time.After(timeout):
+        return nil, kafka.NewError(kafka.ErrTimedOut, "timed out waiting for message", false)
+    }
 }

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In apps/workspace-engine/test/integration/kafka/queue.go around lines 34–66, the
Pop implementation uses sync.Cond and a manual goroutine timer so sync.Cond.Wait
can block forever if no Push occurs; replace the blocking cond/timer pattern
with a channel-based queue: change MessageQueue to hold a buffered channel for
messages (or create one alongside the existing fields), have Push send into that
channel (non-blocking or with context/timeout as appropriate), and implement Pop
by selecting on the message channel and time.After(timeout) so Pop returns
either a message or a timeout error reliably without relying on sync.Cond
wakeups.

Comment on lines +1 to +76
# Kafka Integration Test Mock

This package provides a realistic Kafka mock for integration testing of the workspace-engine. It wraps the `confluent-kafka-go` MockCluster to provide a simple, easy-to-use API for testing Kafka-related functionality.

## Features

- **Realistic Kafka Behavior**: Uses the official confluent-kafka-go MockCluster
- **Multiple Partitions**: Test partition assignment and key-based routing
- **Offset Management**: Test offset commits, seeks, and resumption
- **Batch Operations**: Easily produce multiple events at once
- **Simple API**: Clean, easy-to-use helper functions
- **Automatic Cleanup**: Proper resource management with `Close()`

## Basic Usage

### Creating a Mock Cluster

```go
func TestMyFeature(t *testing.T) {
mock := NewMockKafkaCluster(t)
defer mock.Close()

// Your test code here
}
```

### Producing Events

```go
// Single event
err := mock.ProduceEvent(
string(handler.ResourceCreate),
"workspace-1",
map[string]string{"key": "value"},
)

// Batch of events
events := []EventData{
{
EventType: string(handler.ResourceCreate),
WorkspaceID: "workspace-1",
Data: map[string]string{"type": "resource1"},
},
{
EventType: string(handler.ResourceUpdate),
WorkspaceID: "workspace-1",
Data: map[string]string{"type": "resource2"},
},
}
err := mock.ProduceBatch(events)
```

### Consuming Events

```go
ctx := context.Background()

// Setup consumer with subscription and partition assignment
consumer, partitions, err := mock.SetupConsumerWithSubscription(ctx)
require.NoError(t, err)
defer consumer.Close()

// Consume single message
msg, err := mock.ConsumeMessage(consumer, 5*time.Second)
require.NoError(t, err)

// Consume multiple messages
messages, err := mock.ConsumeMessages(consumer, 3, 10*time.Second)
require.NoError(t, err)

// Parse event
event, err := mock.ParseEvent(msg)
require.NoError(t, err)
assert.Equal(t, handler.ResourceCreate, event.EventType)
```

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

Docs may not match implementation.

This README promises a confluent-kafka-go MockCluster, but the PR wires an in-memory EventProducer for tests. Please verify the mock exists and examples compile. If not, update the README to reflect the in-memory producer pattern and remove/constrain Confluent-specific examples.

Also applies to: 248-307



---

🏁 Script executed:

```shell
#!/bin/bash
# Find Go files in the kafka test integration directory
find apps/workspace-engine/test/integration/kafka -type f -name "*.go" | head -20

Length of output: 164


🏁 Script executed:

#!/bin/bash
# Search for NewMockKafkaCluster function definition
rg -n "func NewMockKafkaCluster" --type go

Length of output: 0


🏁 Script executed:

#!/bin/bash
# Search for imports related to confluent-kafka or kafka in the test directory
rg -n "confluent-kafka|EventProducer" apps/workspace-engine/test/integration/kafka --type go -C 3

Length of output: 1555


🏁 Script executed:

#!/bin/bash
# Check the actual functions exported/available in the mock
rg -n "^func.*Mock" apps/workspace-engine/test/integration/kafka --type go | head -20

Length of output: 1792


🏁 Script executed:

#!/bin/bash
# Look for any implementation of ProduceBatch, ConsumeMessage, etc
rg -n "ProduceBatch|ConsumeMessage|ParseEvent|SetupConsumerWithSubscription" apps/workspace-engine/test/integration/kafka --type go

Length of output: 0



---

🏁 Script executed:

```shell
#!/bin/bash
# Examine the kafka.go file to see ProducerMock implementation
cat apps/workspace-engine/test/integration/kafka/kafka.go

Length of output: 1033


🏁 Script executed:

#!/bin/bash
# Examine the consumer.go file
cat apps/workspace-engine/test/integration/kafka/consumer.go

Length of output: 3479


🏁 Script executed:

#!/bin/bash
# Examine the queue.go file
cat apps/workspace-engine/test/integration/kafka/queue.go

Length of output: 1556


README documents non-existent API—all examples will fail.

The documented functions do not exist in the codebase:

  • NewMockKafkaCluster(t) — actually use NewProducerMock(queue) and NewConsumerMock(queue) separately with a MessageQueue
  • ProduceEvent(), ProduceBatch() — use ProducerMock.Produce(msg, deliveryChan) directly
  • SetupConsumerWithSubscription(), ConsumeMessage(), ConsumeMessages(), ParseEvent() — use ConsumerMock.SubscribeTopics(), .ReadMessage(), .Poll() with raw kafka.Message types

Update the README to reflect the actual low-level API:

  • NewMessageQueue() to initialize the shared queue
  • NewProducerMock(queue) / NewConsumerMock(queue) to create mocks
  • Direct usage of confluent-kafka-go types (kafka.Message, kafka.TopicPartition, etc.) and corresponding mock method calls
🤖 Prompt for AI Agents
In apps/workspace-engine/test/integration/kafka/README.md lines 1-76, the README
references high-level helper functions that don't exist (NewMockKafkaCluster,
ProduceEvent, ProduceBatch, SetupConsumerWithSubscription, ConsumeMessage,
ConsumeMessages, ParseEvent); update the document to show the real low-level
API: explain creating a shared MessageQueue via NewMessageQueue(), creating
mocks with NewProducerMock(queue) and NewConsumerMock(queue), and demonstrate
producing/consuming using ProducerMock.Produce(msg, deliveryChan),
ConsumerMock.SubscribeTopics(), ConsumerMock.ReadMessage()/Poll(), and raw
confluent-kafka-go types (kafka.Message, kafka.TopicPartition) with brief
example snippets showing how to build kafka.Message, produce to delivery
channel, and read/parse kafka.Message from the consumer.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant