Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/workspace-engine/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func main() {

go func() {
log.Info("Kafka consumer started")
if err := kafka.RunConsumer(ctx); err != nil {
if err := kafka.RunConsumer(ctx, producer); err != nil {
log.Error("received error from kafka consumer", "error", err)
panic(err)
}
Expand Down
30 changes: 15 additions & 15 deletions apps/workspace-engine/pkg/db/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ func TestDBJobs_BasicWrite(t *testing.T) {
releaseID, jobAgentID := createJobPrerequisites(t, workspaceID, conn, &release)

// Create store and add release to it (indexed by both hash ID and UUID)
testStore := wsStore.New()
testStore := wsStore.New(workspaceID)
testStore.Releases.Upsert(t.Context(), &release)
// Also index by UUID for job lookup
testStore.Repo().Releases.Set(releaseID, &release)
Expand Down Expand Up @@ -308,7 +308,7 @@ func TestDBJobs_BasicWriteAndUpdate(t *testing.T) {
releaseID, jobAgentID := createJobPrerequisites(t, workspaceID, conn, &release)

// Create store and add release to it (indexed by both hash ID and UUID)
testStore := wsStore.New()
testStore := wsStore.New(workspaceID)
testStore.Releases.Upsert(t.Context(), &release)
// Also index by UUID for job lookup
testStore.Repo().Releases.Set(releaseID, &release)
Expand Down Expand Up @@ -392,7 +392,7 @@ func TestDBJobs_CompleteJobLifecycle(t *testing.T) {
releaseID, jobAgentID := createJobPrerequisites(t, workspaceID, conn, &release)

// Create store and add release to it (indexed by both hash ID and UUID)
testStore := wsStore.New()
testStore := wsStore.New(workspaceID)
testStore.Releases.Upsert(t.Context(), &release)
// Also index by UUID for job lookup
testStore.Repo().Releases.Set(releaseID, &release)
Expand Down Expand Up @@ -495,7 +495,7 @@ func TestDBJobs_BasicWriteAndDelete(t *testing.T) {
releaseID, jobAgentID := createJobPrerequisites(t, workspaceID, conn, &release)

// Create store and add release to it (indexed by both hash ID and UUID)
testStore := wsStore.New()
testStore := wsStore.New(workspaceID)
testStore.Releases.Upsert(t.Context(), &release)
// Also index by UUID for job lookup
testStore.Repo().Releases.Set(releaseID, &release)
Expand Down Expand Up @@ -577,7 +577,7 @@ func TestDBJobs_MultipleJobsForSameRelease(t *testing.T) {
releaseID, jobAgentID := createJobPrerequisites(t, workspaceID, conn, &release)

// Create store and add release to it (indexed by both hash ID and UUID)
testStore := wsStore.New()
testStore := wsStore.New(workspaceID)
testStore.Releases.Upsert(t.Context(), &release)
// Also index by UUID for job lookup
testStore.Repo().Releases.Set(releaseID, &release)
Expand Down Expand Up @@ -656,7 +656,7 @@ func TestDBJobs_ComplexJobAgentConfig(t *testing.T) {
releaseID, jobAgentID := createJobPrerequisites(t, workspaceID, conn, &release)

// Create store and add release to it (indexed by both hash ID and UUID)
testStore := wsStore.New()
testStore := wsStore.New(workspaceID)
testStore.Releases.Upsert(t.Context(), &release)
// Also index by UUID for job lookup
testStore.Repo().Releases.Set(releaseID, &release)
Expand Down Expand Up @@ -722,7 +722,7 @@ func TestDBJobs_AllJobStatuses(t *testing.T) {
releaseID, jobAgentID := createJobPrerequisites(t, workspaceID, conn, &release)

// Create store and add release to it (indexed by both hash ID and UUID)
testStore := wsStore.New()
testStore := wsStore.New(workspaceID)
testStore.Releases.Upsert(t.Context(), &release)
// Also index by UUID for job lookup
testStore.Repo().Releases.Set(releaseID, &release)
Expand Down Expand Up @@ -807,7 +807,7 @@ func TestDBJobs_WorkspaceIsolation(t *testing.T) {
releaseID1, jobAgentID1 := createJobPrerequisites(t, workspaceID1, conn1, &release1)

// Create store for workspace 1 (indexed by both hash ID and UUID)
testStore1 := wsStore.New()
testStore1 := wsStore.New(workspaceID1)
testStore1.Releases.Upsert(t.Context(), &release1)
// Also index by UUID for job lookup
testStore1.Repo().Releases.Set(releaseID1, &release1)
Expand All @@ -817,7 +817,7 @@ func TestDBJobs_WorkspaceIsolation(t *testing.T) {
releaseID2, jobAgentID2 := createJobPrerequisites(t, workspaceID2, conn2, &release2)

// Create store for workspace 2 (indexed by both hash ID and UUID)
testStore2 := wsStore.New()
testStore2 := wsStore.New(workspaceID2)
testStore2.Releases.Upsert(t.Context(), &release2)
// Also index by UUID for job lookup
testStore2.Repo().Releases.Set(releaseID2, &release2)
Expand Down Expand Up @@ -943,7 +943,7 @@ func TestDBJobs_WriteAndRetrieveWithReleaseJob(t *testing.T) {
releaseID, jobAgentID := createJobPrerequisites(t, workspaceID, conn, &release)

// Create store and add release to it (indexed by both hash ID and UUID)
testStore := wsStore.New()
testStore := wsStore.New(workspaceID)
testStore.Releases.Upsert(t.Context(), &release)
// Also index by UUID for job lookup
testStore.Repo().Releases.Set(releaseID, &release)
Expand Down Expand Up @@ -1106,7 +1106,7 @@ func TestDBJobs_BasicMetadata(t *testing.T) {
releaseID, jobAgentID := createJobPrerequisites(t, workspaceID, conn, &release)

// Create store and add release to it
testStore := wsStore.New()
testStore := wsStore.New(workspaceID)
testStore.Releases.Upsert(t.Context(), &release)
testStore.Repo().Releases.Set(releaseID, &release)

Expand Down Expand Up @@ -1183,7 +1183,7 @@ func TestDBJobs_EmptyMetadata(t *testing.T) {
releaseID, jobAgentID := createJobPrerequisites(t, workspaceID, conn, &release)

// Create store and add release to it
testStore := wsStore.New()
testStore := wsStore.New(workspaceID)
testStore.Releases.Upsert(t.Context(), &release)
testStore.Repo().Releases.Set(releaseID, &release)

Expand Down Expand Up @@ -1242,7 +1242,7 @@ func TestDBJobs_MetadataUpdate(t *testing.T) {
releaseID, jobAgentID := createJobPrerequisites(t, workspaceID, conn, &release)

// Create store and add release to it
testStore := wsStore.New()
testStore := wsStore.New(workspaceID)
testStore.Releases.Upsert(t.Context(), &release)
testStore.Repo().Releases.Set(releaseID, &release)

Expand Down Expand Up @@ -1347,7 +1347,7 @@ func TestDBJobs_MetadataRemoval(t *testing.T) {
releaseID, jobAgentID := createJobPrerequisites(t, workspaceID, conn, &release)

// Create store and add release to it
testStore := wsStore.New()
testStore := wsStore.New(workspaceID)
testStore.Releases.Upsert(t.Context(), &release)
testStore.Repo().Releases.Set(releaseID, &release)

Expand Down Expand Up @@ -1444,7 +1444,7 @@ func TestDBJobs_MultipleJobsWithDifferentMetadata(t *testing.T) {
releaseID, jobAgentID := createJobPrerequisites(t, workspaceID, conn, &release)

// Create store and add release to it
testStore := wsStore.New()
testStore := wsStore.New(workspaceID)
testStore.Releases.Upsert(t.Context(), &release)
testStore.Repo().Releases.Set(releaseID, &release)

Expand Down
5 changes: 3 additions & 2 deletions apps/workspace-engine/pkg/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ var handlers = handler.HandlerRegistry{
handler.JobAgentUpdate: jobagents.HandleJobAgentUpdated,
handler.JobAgentDelete: jobagents.HandleJobAgentDeleted,

handler.JobCreate: jobs.HandleJobCreated,
handler.JobUpdate: jobs.HandleJobUpdated,

handler.PolicyCreate: policies.HandlePolicyCreated,
Expand All @@ -81,6 +82,6 @@ var handlers = handler.HandlerRegistry{
handler.ReleaseTargetDeploy: redeploy.HandleReleaseTargetDeploy,
}

func NewEventHandler() *handler.EventListener {
return handler.NewEventListener(handlers)
func NewEventHandler(eventProducer EventProducer) *handler.EventListener {
return handler.NewEventListener(handlers, eventProducer)
}
Comment on lines +85 to 87
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Compile error: wrong type in signature.

EventProducer is defined in the handler package. Qualify the type.

-func NewEventHandler(eventProducer EventProducer) *handler.EventListener {
-  return handler.NewEventListener(handlers, eventProducer)
+func NewEventHandler(eventProducer handler.EventProducer) *handler.EventListener {
+  return handler.NewEventListener(handlers, eventProducer)
 }
🤖 Prompt for AI Agents
In apps/workspace-engine/pkg/events/events.go around lines 85 to 87, the
function signature uses an unqualified EventProducer type causing a compile
error; change the parameter type to handler.EventProducer (i.e., func
NewEventHandler(eventProducer handler.EventProducer) *handler.EventListener) so
it references the type from the handler package, and ensure the handler import
remains or is added if missing.

18 changes: 14 additions & 4 deletions apps/workspace-engine/pkg/events/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ const (
JobAgentUpdate EventType = "job-agent.updated"
JobAgentDelete EventType = "job-agent.deleted"

JobCreate EventType = "job.created"
JobUpdate EventType = "job.updated"

PolicyCreate EventType = "policy.created"
Expand Down Expand Up @@ -97,12 +98,21 @@ type HandlerRegistry map[EventType]Handler

// EventListener listens for events on the queue and routes them to appropriate handlers
type EventListener struct {
handlers HandlerRegistry
handlers HandlerRegistry
eventProducer EventProducer
}

// EventProducer defines the interface for producing events (imported to avoid circular deps)
type EventProducer interface {
ProduceEvent(eventType string, workspaceID string, data any) error
}

// NewEventListener creates a new event listener with the provided handlers
func NewEventListener(handlers HandlerRegistry) *EventListener {
el := &EventListener{handlers: handlers}
func NewEventListener(handlers HandlerRegistry, eventProducer EventProducer) *EventListener {
el := &EventListener{
handlers: handlers,
eventProducer: eventProducer,
}
return el
}

Expand Down Expand Up @@ -154,7 +164,7 @@ func (el *EventListener) ListenAndRoute(ctx context.Context, msg *kafka.Message,
var ws *workspace.Workspace
changeSet := changeset.NewChangeSet[any]()

ws, err := workspace.GetWorkspaceAndLoad(rawEvent.WorkspaceID)
ws, err := workspace.GetWorkspaceAndLoad(rawEvent.WorkspaceID, el.eventProducer)
if ws == nil {
return nil, fmt.Errorf("workspace not found: %s: %w", rawEvent.WorkspaceID, err)
}
Comment on lines +167 to 170
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major

Potential type mismatch risk passing producer into workspace loader.

workspace.GetWorkspaceAndLoad requires workspace.EventProducer; el.eventProducer here is handler.EventProducer. Method sets match, but duplication increases risk. Same consolidation recommendation applies.

🤖 Prompt for AI Agents
In apps/workspace-engine/pkg/events/handler/handler.go around lines 167 to 170,
the call to workspace.GetWorkspaceAndLoad passes el.eventProducer which is typed
as handler.EventProducer while the function expects workspace.EventProducer,
creating a risky implicit match; fix by aligning types: either change the
handler struct field to use workspace.EventProducer (preferred) or implement an
explicit adapter that converts handler.EventProducer to workspace.EventProducer
and pass that adapter; update imports and constructor/initialization sites to
use the consolidated workspace.EventProducer type so the compiler enforces
compatibility.

Expand Down
90 changes: 90 additions & 0 deletions apps/workspace-engine/pkg/events/handler/jobs/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,105 @@ package jobs
import (
"context"
"encoding/json"
"errors"
"fmt"
"time"

"workspace-engine/pkg/events/handler"
"workspace-engine/pkg/oapi"
"workspace-engine/pkg/workspace"
"workspace-engine/pkg/workspace/jobdispatch"
"workspace-engine/pkg/workspace/releasemanager/deployment/jobs"

"github.com/charmbracelet/log"
"github.com/google/uuid"
)

// JobCreatedEventData contains the data for a job.created event.
type JobCreatedEventData struct {
Job *oapi.Job `json:"job"`
}

// HandleJobCreated handles the job.created event.
// This handler performs write operations for job creation:
// - Cancels outdated jobs for the release target
// - Upserts the new job
// - Dispatches the job if it's not InvalidJobAgent
// Note: The release is already persisted before this event is sent
func HandleJobCreated(
ctx context.Context,
ws *workspace.Workspace,
event handler.RawEvent,
) error {
var data JobCreatedEventData
if err := json.Unmarshal(event.Data, &data); err != nil {
return fmt.Errorf("failed to unmarshal job.created event: %w", err)
}

job := data.Job

// Get the release for this job to cancel outdated jobs
release, exists := ws.Store().Releases.Get(job.ReleaseId)
if !exists {
return fmt.Errorf("release %s not found for job %s", job.ReleaseId, job.Id)
}
Comment on lines +36 to +47
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Validate event payload before use.

If Data lacks job or ReleaseId, this panics/later fails. Guard early.

 var data JobCreatedEventData
 if err := json.Unmarshal(event.Data, &data); err != nil {
     return fmt.Errorf("failed to unmarshal job.created event: %w", err)
 }
 
 job := data.Job
+if job == nil || job.Id == "" || job.ReleaseId == "" {
+    return fmt.Errorf("invalid job.created payload: missing job/id/releaseId")
+}
 
 // Get the release for this job to cancel outdated jobs
 release, exists := ws.Store().Releases.Get(job.ReleaseId)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
var data JobCreatedEventData
if err := json.Unmarshal(event.Data, &data); err != nil {
return fmt.Errorf("failed to unmarshal job.created event: %w", err)
}
job := data.Job
// Get the release for this job to cancel outdated jobs
release, exists := ws.Store().Releases.Get(job.ReleaseId)
if !exists {
return fmt.Errorf("release %s not found for job %s", job.ReleaseId, job.Id)
}
var data JobCreatedEventData
if err := json.Unmarshal(event.Data, &data); err != nil {
return fmt.Errorf("failed to unmarshal job.created event: %w", err)
}
job := data.Job
if job == nil || job.Id == "" || job.ReleaseId == "" {
return fmt.Errorf("invalid job.created payload: missing job/id/releaseId")
}
// Get the release for this job to cancel outdated jobs
release, exists := ws.Store().Releases.Get(job.ReleaseId)
if !exists {
return fmt.Errorf("release %s not found for job %s", job.ReleaseId, job.Id)
}
🤖 Prompt for AI Agents
In apps/workspace-engine/pkg/events/handler/jobs/jobs.go around lines 36 to 47,
the code unmarshals JobCreatedEventData and immediately uses data.Job and
data.Job.ReleaseId which can be missing/empty and cause panics or later
failures; add explicit validation after unmarshalling that data.Job is present
(or its required fields), that data.Job.ReleaseId is non-empty, and that
data.Job.Id is non-empty, returning a clear fmt.Errorf if any required field is
missing; then proceed to lookup the release only after these checks pass so the
function fails fast with a descriptive error instead of panicking later.


// Step 1: Cancel outdated jobs for this release target
// Cancel any pending jobs for this release target (outdated versions)
cancelOutdatedJobs(ctx, ws, release)

// Step 2: Upsert the new job
ws.Store().Jobs.Upsert(ctx, job)

// Skip dispatch in replay mode
if ws.Store().IsReplay() {
log.Info("Skipping job dispatch in replay mode", "job.id", job.Id)
return nil
}

// Step 3: Dispatch job to integration (ASYNC)
// Skip dispatch if job already has InvalidJobAgent status
if job.Status != oapi.InvalidJobAgent {
go func() {
if err := dispatchJob(ctx, ws, job); err != nil && !errors.Is(err, jobs.ErrUnsupportedJobAgent) {
log.Error("error dispatching job to integration", "error", err.Error())
job.Status = oapi.InvalidIntegration
job.UpdatedAt = time.Now()
ws.Store().Jobs.Upsert(ctx, job)
}
}()
}
Comment on lines +64 to +73
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Unsupported job agent leaves job stuck in pending. Mark it InvalidJobAgent.

Swallowing ErrUnsupportedJobAgent without persisting state causes indefinite pending jobs.

-        go func() {
-            if err := dispatchJob(ctx, ws, job); err != nil && !errors.Is(err, jobs.ErrUnsupportedJobAgent) {
-                log.Error("error dispatching job to integration", "error", err.Error())
-                job.Status = oapi.InvalidIntegration
-                job.UpdatedAt = time.Now()
-                ws.Store().Jobs.Upsert(ctx, job)
-            }
-        }()
+        go func() {
+            if err := dispatchJob(ctx, ws, job); err != nil {
+                if errors.Is(err, jobs.ErrUnsupportedJobAgent) {
+                    job.Status = oapi.InvalidJobAgent
+                } else {
+                    log.Error("error dispatching job to integration", "error", err.Error())
+                    job.Status = oapi.InvalidIntegration
+                }
+                job.UpdatedAt = time.Now()
+                ws.Store().Jobs.Upsert(ctx, job)
+            }
+        }()
🤖 Prompt for AI Agents
In apps/workspace-engine/pkg/events/handler/jobs/jobs.go around lines 64 to 73,
the goroutine swallows ErrUnsupportedJobAgent and leaves the job pending; update
the error handling so that when dispatchJob returns ErrUnsupportedJobAgent you
set job.Status = oapi.InvalidJobAgent, set job.UpdatedAt = time.Now(), and
persist the change via ws.Store().Jobs.Upsert(ctx, job); for other errors keep
the existing InvalidIntegration handling. Ensure both error branches persist the
job and avoid duplicating Upsert logic by structuring the branches clearly.


return nil
}

// cancelOutdatedJobs cancels jobs for outdated releases.
func cancelOutdatedJobs(ctx context.Context, ws *workspace.Workspace, desiredRelease *oapi.Release) {
jobs := ws.Store().Jobs.GetJobsForReleaseTarget(&desiredRelease.ReleaseTarget)

for _, job := range jobs {
if job.Status == oapi.Pending {
job.Status = oapi.Cancelled
job.UpdatedAt = time.Now()
ws.Store().Jobs.Upsert(ctx, job)
}
}
}

// dispatchJob sends a job to the configured job agent for execution.
func dispatchJob(ctx context.Context, ws *workspace.Workspace, job *oapi.Job) error {
jobAgent, exists := ws.Store().JobAgents.Get(job.JobAgentId)
if !exists {
return fmt.Errorf("job agent %s not found", job.JobAgentId)
}

if jobAgent.Type == string(jobdispatch.JobAgentTypeGithub) {
return jobdispatch.NewGithubDispatcher(ws.Store()).DispatchJob(ctx, job)
}

return jobs.ErrUnsupportedJobAgent
}

func isStringUUID(s string) bool {
_, err := uuid.Parse(s)
return err == nil
Expand Down
Loading
Loading