-
Notifications
You must be signed in to change notification settings - Fork 267
feat: optimistic scheduling #2258
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
The latest updates on your projects. Learn more about Vercel for GitHub.
|
var localScheduler *schedulerv1.Scheduler | ||
|
||
if sc.HasService("all") || sc.HasService("scheduler") { | ||
partitionCleanup, err := p.StartSchedulerPartition(ctx) | ||
|
||
if err != nil { | ||
return nil, fmt.Errorf("could not create create scheduler partition: %w", err) | ||
} | ||
|
||
teardown = append(teardown, Teardown{ | ||
Name: "scheduler partition", | ||
Fn: partitionCleanup, | ||
}) | ||
|
||
// create the dispatcher | ||
s, err := scheduler.New( | ||
scheduler.WithAlerter(sc.Alerter), | ||
scheduler.WithMessageQueue(sc.MessageQueue), | ||
scheduler.WithRepository(sc.EngineRepository), | ||
scheduler.WithLogger(sc.Logger), | ||
scheduler.WithPartition(p), | ||
scheduler.WithQueueLoggerConfig(&sc.AdditionalLoggers.Queue), | ||
scheduler.WithSchedulerPool(sc.SchedulingPool), | ||
) | ||
|
||
if err != nil { | ||
return nil, fmt.Errorf("could not create dispatcher: %w", err) | ||
} | ||
|
||
cleanup, err := s.Start() | ||
|
||
if err != nil { | ||
return nil, fmt.Errorf("could not start dispatcher: %w", err) | ||
} | ||
|
||
teardown = append(teardown, Teardown{ | ||
Name: "scheduler", | ||
Fn: cleanup, | ||
}) | ||
|
||
sv1, err := schedulerv1.New( | ||
schedulerv1.WithAlerter(sc.Alerter), | ||
schedulerv1.WithMessageQueue(sc.MessageQueueV1), | ||
schedulerv1.WithRepository(sc.EngineRepository), | ||
schedulerv1.WithV2Repository(sc.V1), | ||
schedulerv1.WithLogger(sc.Logger), | ||
schedulerv1.WithPartition(p), | ||
schedulerv1.WithQueueLoggerConfig(&sc.AdditionalLoggers.Queue), | ||
schedulerv1.WithSchedulerPool(sc.SchedulingPoolV1), | ||
) | ||
|
||
if err != nil { | ||
return nil, fmt.Errorf("could not create scheduler (v1): %w", err) | ||
} | ||
|
||
cleanup, err = sv1.Start() | ||
|
||
if err != nil { | ||
return nil, fmt.Errorf("could not start scheduler (v1): %w", err) | ||
} | ||
|
||
teardown = append(teardown, Teardown{ | ||
Name: "schedulerv1", | ||
Fn: cleanup, | ||
}) | ||
|
||
localScheduler = sv1 | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just moved this up and added the localScheduler
assignment, but otherwise no code changes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will need to be updated before final merge, as we'll probably get this deployed after payloads and partitions.
rec RECORD; | ||
BEGIN | ||
-- Only insert if there's a single task with initial_state = 'QUEUED' and concurrency_strategy_ids is not null | ||
IF (SELECT COUNT(*) FROM new_table WHERE initial_state = 'QUEUED' AND concurrency_strategy_ids[1] IS NOT NULL) > 0 THEN |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added this IF statement to avoid cascading triggers on concurrency tables
WHERE initial_state = 'QUEUED' AND concurrency_strategy_ids[1] IS NULL; | ||
|
||
-- Only insert into v1_dag and v1_dag_to_task if dag_id and dag_inserted_at are not null | ||
IF (SELECT COUNT(*) FROM new_table WHERE dag_id IS NOT NULL AND dag_inserted_at IS NOT NULL) = 0 THEN |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similarly, added this IF statement to avoid extra statements in this trigger for single-task workflows
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I had to consolidate signaling logic between the local scheduler (which creates tasks) and the tasks controller, and this was the cleanest way to do it. I also figured it'd give us an easier way to hook into two-phase commits to keep the tasks controller and OLAP controller in sync.
} | ||
|
||
func (s *DispatcherImpl) Register(ctx context.Context, request *contracts.WorkerRegisterRequest) (*contracts.WorkerRegisterResponse, error) { | ||
func (d *DispatcherImpl) Register(ctx context.Context, request *contracts.WorkerRegisterRequest) (*contracts.WorkerRegisterResponse, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Drive-by, but the golint rules were complaining about using s *DispatcherImpl
in some places and d *DispatcherImpl
in other places, and I agree it makes the code harder to read, so changed all locations to use d *DispatcherImpl
.
res = append(res, &contracts.WorkflowRunEvent{ | ||
WorkflowRunId: payload.WorkflowRunId, | ||
EventType: contracts.WorkflowRunEventType_WORKFLOW_RUN_EVENT_TYPE_FINISHED, | ||
EventTimestamp: timestamppb.New(time.Now()), | ||
Results: []*contracts.StepRunResult{ | ||
{ | ||
StepRunId: payload.ExternalId, | ||
StepReadableId: payload.StepReadableId, | ||
JobRunId: payload.ExternalId, | ||
Output: &output, | ||
}, | ||
}, | ||
}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the core change in this file, if we have a single-task workflow and we get a task-completed event, we send it immediately to the worker.
I think there's a chance of race conditions here, because we don't want to send a message to a worker before it's been fully written/acknowledged by the database, which we risk here.
So if we were to fail to write the task-completed
event to the database, we wouldn't successfully release the task, and it may have a different status after we hit a timeout/reassignment than the parent workflow would see.
Yet another case where 2PC is necessary - or perhaps just moving the processing of the task-completed event to the gRPC layer like we used to have. I need to think a bit more about it.
tenantIdWorkflowNameCache *expirable.LRU[string, *sqlcv1.ListWorkflowsByNamesRow] | ||
stepsInWorkflowVersionCache *expirable.LRU[string, []*sqlcv1.ListStepsByWorkflowVersionIdsRow] | ||
stepIdLabelsCache *expirable.LRU[string, []*sqlcv1.GetDesiredLabelsRow] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would like to start moving towards usage of the expirable
package instead of our home-built cache, because of nicer typing/LRU support - I think we can fully make this transition after we deprecate v0.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
left some questions! I'm a little nervous about the amount of complexity this adds to some already-complex parts of the codebase, but if it's worth the performance gains then 🤷
func (i *AdminServiceImpl) ingest(ctx context.Context, tenantId string, opts ...*v1.WorkflowNameTriggerOpts) error { | ||
if i.localScheduler != nil { | ||
localWorkerIds := map[string]struct{}{} | ||
|
||
if i.localDispatcher != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this an exact copy of the implementation in internal/services/admin/server_v1.go
?
dagCp := dag | ||
msg, err := tasktypes.CreatedDAGMessage(tenantId, dagCp) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
are we copying this because it gets mutated? if so, I'd much prefer to not mutate it 😅
for _, task := range tasks { | ||
taskExternalId := sqlchelpers.UUIDToStr(task.ExternalID) | ||
|
||
dataBytes := v1.NewCancelledTaskOutputEventFromTask(task).Bytes() | ||
|
||
internalEvents = append(internalEvents, v1.InternalTaskEvent{ | ||
TenantID: tenantId, | ||
TaskID: task.ID, | ||
TaskExternalID: taskExternalId, | ||
RetryCount: task.RetryCount, | ||
EventType: sqlcv1.V1TaskEventTypeCANCELLED, | ||
Data: dataBytes, | ||
}) | ||
} | ||
|
||
err := s.sendInternalEvents(ctx, tenantId, internalEvents) | ||
|
||
if err != nil { | ||
return err | ||
} | ||
|
||
// notify that tasks have been cancelled | ||
// TODO: make this transactionally safe? | ||
for _, task := range tasks { | ||
msg, err := tasktypes.MonitoringEventMessageFromInternal(tenantId, tasktypes.CreateMonitoringEventPayload{ | ||
TaskId: task.ID, | ||
RetryCount: task.RetryCount, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
think it's worth consolidating these loops for performance?
// Note: this is very similar to handleTaskBulkAssignedTask, with some differences in what's sync vs run in a goroutine | ||
// In this method, we wait until all tasks have been sent to the worker before returning | ||
func (d *DispatcherImpl) HandleLocalAssignments(ctx context.Context, tenantId, workerId string, tasks []*schedulingv1.AssignedItemWithTask) error { | ||
// we set a timeout of 25 seconds because we don't want to hold the semaphore for longer than the visibility timeout (30 seconds) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this 30 second visibility timeout hard-coded somewhere? it seems risky to hard-code this 25 sec if that's modifiable
for _, task := range bulkDatas { | ||
if parentData, ok := parentDataMap[task.ID]; ok { | ||
currInput := &v1.V1StepRunData{} | ||
|
||
if task.Input != nil { | ||
err := json.Unmarshal(task.Input, currInput) | ||
|
||
if err != nil { | ||
d.l.Warn().Err(err).Msg("failed to unmarshal input") | ||
continue | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just FYI, a bunch of this will conflict with the payloads changes
) | ||
} else { | ||
success = true | ||
break |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we break
here on success?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we're technically looping over individual worker sessions, and only one of those will be valid, the others will fail when trying to send to the worker (or they should). this is an edge case when a worker reconnects and we haven't determined that the old connection has been interrupted yet.
func eventToTaskV1(tenantId, eventExternalId, key string, data, additionalMeta []byte, priority *int32, scope *string, triggeringWebhookName *string) (*msgqueue.Message, error) { | ||
payloadTyped := tasktypes.UserEventTaskPayload{ | ||
EventExternalId: eventExternalId, | ||
func eventToPayload(tenantId, key string, data, additionalMeta []byte, priority *int32, scope *string, triggeringWebhookName *string) tasktypes.UserEventTaskPayload { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: I think this naming is a little confusing even if technically correct given our abuse of the term "payload"
return r.PrepareOptimisticTx(ctx) | ||
} | ||
|
||
func (r *optimisticSchedulingRepositoryImpl) TriggerFromEvents(ctx context.Context, tx *OptimisticTx, tenantId string, opts []EventTriggerOpts) ([]*sqlcv1.V1QueueItem, *TriggerFromEventsResult, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this and TriggerFromNames
are all copy and paste right? are there any changes here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah there are some small changes in how we're handling pre/post commits and treating the transactions
// look up the workflow versions for the workflow names | ||
workflowVersions, err := r.queries.ListWorkflowsByNames(ctx, tx, sqlcv1.ListWorkflowsByNamesParams{ | ||
Tenantid: sqlchelpers.UUIDFromStr(tenantId), | ||
Workflownames: workflowNamesToLookup, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we check if workflowNamesToLookup
is empty before running this query?
|
||
steps, err := r.queries.ListStepsByWorkflowVersionIds(ctx, tx, sqlcv1.ListStepsByWorkflowVersionIdsParams{ | ||
Tenantid: sqlchelpers.UUIDFromStr(tenantId), | ||
Ids: workflowVersionsToLookup, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same here maybe?
Description
Adds support for "optimistic" scheduling, meaning that we can create tasks from the gRPC engine with transactional safety, and schedule tasks on workers which are connected to the current gRPC session (these are two separate concepts, referred to in code by
localScheduler
andlocalDispatcher
). We allocate a small set of semaphores for that.Features:
task-completed
. We can similarly addtask-failed
andtask-cancelled
in the future.Drawbacks:
Limitations:
n
engines are horizontally scaled, the chances of optimistic scheduling reduce by1/n
- we only use local schedulers when they have a lease on a tenant. We will need to build out a sticky load balancing strategy to take advantage of optimistic scheduling in HA setups.Type of change