-
Couldn't load subscription status.
- Fork 11
init kafka job dispatch #701
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -57,6 +57,7 @@ const ( | |
| JobAgentUpdate EventType = "job-agent.updated" | ||
| JobAgentDelete EventType = "job-agent.deleted" | ||
|
|
||
| JobCreate EventType = "job.created" | ||
| JobUpdate EventType = "job.updated" | ||
|
|
||
| PolicyCreate EventType = "policy.created" | ||
|
|
@@ -97,12 +98,21 @@ type HandlerRegistry map[EventType]Handler | |
|
|
||
| // EventListener listens for events on the queue and routes them to appropriate handlers | ||
| type EventListener struct { | ||
| handlers HandlerRegistry | ||
| handlers HandlerRegistry | ||
| eventProducer EventProducer | ||
| } | ||
|
|
||
| // EventProducer defines the interface for producing events (imported to avoid circular deps) | ||
| type EventProducer interface { | ||
| ProduceEvent(eventType string, workspaceID string, data any) error | ||
| } | ||
|
|
||
| // NewEventListener creates a new event listener with the provided handlers | ||
| func NewEventListener(handlers HandlerRegistry) *EventListener { | ||
| el := &EventListener{handlers: handlers} | ||
| func NewEventListener(handlers HandlerRegistry, eventProducer EventProducer) *EventListener { | ||
| el := &EventListener{ | ||
| handlers: handlers, | ||
| eventProducer: eventProducer, | ||
| } | ||
| return el | ||
| } | ||
|
|
||
|
|
@@ -154,7 +164,7 @@ func (el *EventListener) ListenAndRoute(ctx context.Context, msg *kafka.Message, | |
| var ws *workspace.Workspace | ||
| changeSet := changeset.NewChangeSet[any]() | ||
|
|
||
| ws, err := workspace.GetWorkspaceAndLoad(rawEvent.WorkspaceID) | ||
| ws, err := workspace.GetWorkspaceAndLoad(rawEvent.WorkspaceID, el.eventProducer) | ||
| if ws == nil { | ||
| return nil, fmt.Errorf("workspace not found: %s: %w", rawEvent.WorkspaceID, err) | ||
| } | ||
|
Comment on lines
+167
to
170
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion | 🟠 Major Potential type mismatch risk passing producer into workspace loader. workspace.GetWorkspaceAndLoad requires workspace.EventProducer; el.eventProducer here is handler.EventProducer. Method sets match, but duplication increases risk. Same consolidation recommendation applies. 🤖 Prompt for AI Agents |
||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -3,15 +3,105 @@ package jobs | |||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import ( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| "context" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| "encoding/json" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| "errors" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| "fmt" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| "time" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| "workspace-engine/pkg/events/handler" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| "workspace-engine/pkg/oapi" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| "workspace-engine/pkg/workspace" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| "workspace-engine/pkg/workspace/jobdispatch" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| "workspace-engine/pkg/workspace/releasemanager/deployment/jobs" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| "github.com/charmbracelet/log" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| "github.com/google/uuid" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // JobCreatedEventData contains the data for a job.created event. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| type JobCreatedEventData struct { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Job *oapi.Job `json:"job"` | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // HandleJobCreated handles the job.created event. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // This handler performs write operations for job creation: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // - Cancels outdated jobs for the release target | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // - Upserts the new job | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // - Dispatches the job if it's not InvalidJobAgent | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Note: The release is already persisted before this event is sent | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| func HandleJobCreated( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ctx context.Context, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ws *workspace.Workspace, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| event handler.RawEvent, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) error { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| var data JobCreatedEventData | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if err := json.Unmarshal(event.Data, &data); err != nil { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return fmt.Errorf("failed to unmarshal job.created event: %w", err) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| job := data.Job | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Get the release for this job to cancel outdated jobs | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| release, exists := ws.Store().Releases.Get(job.ReleaseId) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if !exists { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return fmt.Errorf("release %s not found for job %s", job.ReleaseId, job.Id) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+36
to
+47
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Validate event payload before use. If Data lacks job or ReleaseId, this panics/later fails. Guard early. var data JobCreatedEventData
if err := json.Unmarshal(event.Data, &data); err != nil {
return fmt.Errorf("failed to unmarshal job.created event: %w", err)
}
job := data.Job
+if job == nil || job.Id == "" || job.ReleaseId == "" {
+ return fmt.Errorf("invalid job.created payload: missing job/id/releaseId")
+}
// Get the release for this job to cancel outdated jobs
release, exists := ws.Store().Releases.Get(job.ReleaseId)📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Step 1: Cancel outdated jobs for this release target | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Cancel any pending jobs for this release target (outdated versions) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| cancelOutdatedJobs(ctx, ws, release) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Step 2: Upsert the new job | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ws.Store().Jobs.Upsert(ctx, job) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Skip dispatch in replay mode | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if ws.Store().IsReplay() { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| log.Info("Skipping job dispatch in replay mode", "job.id", job.Id) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return nil | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Step 3: Dispatch job to integration (ASYNC) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Skip dispatch if job already has InvalidJobAgent status | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if job.Status != oapi.InvalidJobAgent { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| go func() { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if err := dispatchJob(ctx, ws, job); err != nil && !errors.Is(err, jobs.ErrUnsupportedJobAgent) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| log.Error("error dispatching job to integration", "error", err.Error()) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| job.Status = oapi.InvalidIntegration | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| job.UpdatedAt = time.Now() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ws.Store().Jobs.Upsert(ctx, job) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| }() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+64
to
+73
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unsupported job agent leaves job stuck in pending. Mark it InvalidJobAgent. Swallowing ErrUnsupportedJobAgent without persisting state causes indefinite pending jobs. - go func() {
- if err := dispatchJob(ctx, ws, job); err != nil && !errors.Is(err, jobs.ErrUnsupportedJobAgent) {
- log.Error("error dispatching job to integration", "error", err.Error())
- job.Status = oapi.InvalidIntegration
- job.UpdatedAt = time.Now()
- ws.Store().Jobs.Upsert(ctx, job)
- }
- }()
+ go func() {
+ if err := dispatchJob(ctx, ws, job); err != nil {
+ if errors.Is(err, jobs.ErrUnsupportedJobAgent) {
+ job.Status = oapi.InvalidJobAgent
+ } else {
+ log.Error("error dispatching job to integration", "error", err.Error())
+ job.Status = oapi.InvalidIntegration
+ }
+ job.UpdatedAt = time.Now()
+ ws.Store().Jobs.Upsert(ctx, job)
+ }
+ }()🤖 Prompt for AI Agents |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return nil | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // cancelOutdatedJobs cancels jobs for outdated releases. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| func cancelOutdatedJobs(ctx context.Context, ws *workspace.Workspace, desiredRelease *oapi.Release) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| jobs := ws.Store().Jobs.GetJobsForReleaseTarget(&desiredRelease.ReleaseTarget) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| for _, job := range jobs { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if job.Status == oapi.Pending { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| job.Status = oapi.Cancelled | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| job.UpdatedAt = time.Now() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ws.Store().Jobs.Upsert(ctx, job) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // dispatchJob sends a job to the configured job agent for execution. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| func dispatchJob(ctx context.Context, ws *workspace.Workspace, job *oapi.Job) error { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| jobAgent, exists := ws.Store().JobAgents.Get(job.JobAgentId) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if !exists { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return fmt.Errorf("job agent %s not found", job.JobAgentId) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if jobAgent.Type == string(jobdispatch.JobAgentTypeGithub) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return jobdispatch.NewGithubDispatcher(ws.Store()).DispatchJob(ctx, job) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return jobs.ErrUnsupportedJobAgent | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| func isStringUUID(s string) bool { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| _, err := uuid.Parse(s) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return err == nil | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Compile error: wrong type in signature.
EventProduceris defined in thehandlerpackage. Qualify the type.🤖 Prompt for AI Agents