
Conversation

abelanger5 (Contributor) commented Sep 7, 2025

Description

Adds support for "optimistic" scheduling: when possible, we create tasks from the gRPC engine with transactional safety and schedule them on workers that are connected to the current gRPC session (these are two separate concepts, referred to in code as localScheduler and localDispatcher). We allocate a small set of semaphores for this.
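
As a rough illustration of the optimistic path described above, here is a minimal sketch (all types, names, and the semaphore-based slot check are hypothetical, not the PR's actual code) of falling back to the queue when either local component is missing or no scheduling slot is free:

package optimistic

import (
    "context"

    "golang.org/x/sync/semaphore"
)

// Illustrative types only; the PR's real interfaces differ.
type triggerOpts struct{ WorkflowName string }
type assignment struct{ TaskID, WorkerID string }

type localScheduler interface {
    // creates the task and assigns it to a worker inside a single transaction
    TriggerAndAssign(ctx context.Context, opts triggerOpts) (*assignment, error)
}

type localDispatcher interface {
    // sends the assignment over a worker connection held by this gRPC process
    SendToWorker(ctx context.Context, a *assignment) error
}

type server struct {
    scheduler  localScheduler      // nil unless this engine can schedule locally
    dispatcher localDispatcher     // nil unless the target worker is on this session
    slots      *semaphore.Weighted // small pool of "scheduling slots"
    publish    func(ctx context.Context, opts triggerOpts) error
}

func (s *server) trigger(ctx context.Context, opts triggerOpts) error {
    // Optimistic path: local scheduler + local dispatcher + a free scheduling slot.
    if s.scheduler != nil && s.dispatcher != nil && s.slots.TryAcquire(1) {
        defer s.slots.Release(1)

        a, err := s.scheduler.TriggerAndAssign(ctx, opts)
        if err != nil {
            return err
        }

        return s.dispatcher.SendToWorker(ctx, a)
    }

    // Regular path: hand the trigger off to the scheduler/dispatcher services via the message queue.
    return s.publish(ctx, opts)
}

TryAcquire is non-blocking, so a saturated engine simply degrades to the regular queue-based path instead of waiting on a slot.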

Features:

  • Up to a 3x speedup in scheduling performance, from 24ms -> 8ms for single-task workflows
  • Reduces overall pressure on the message queue and downstream components, as there are fewer messages being passed for scheduling purposes
  • Improves listening for the task-completed event for single-task workflows by hooking into the existing task-completed tenant message. We can similarly add task-failed and task-cancelled in the future.

Drawbacks:

  • Increases the complexity of scheduling, as the optimistic paths are quite different from the regular path, since we protect everything with a single transaction
  • Can increase pressure on the engines. I've tried to avoid major issues by only allocating 10 "scheduling slots" to each gRPC process (configurable via an env var)
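
A minimal sketch of how such a per-process slot limit could be wired up (the env var name and helper below are placeholders; the PR does not name the actual variable):

package optimistic

import (
    "os"
    "strconv"

    "golang.org/x/sync/semaphore"
)

// newSchedulingSlots returns the per-process semaphore that bounds how many
// triggers may take the optimistic path at once. The default of 10 matches the
// PR description; the env var name here is a placeholder, not Hatchet's actual setting.
func newSchedulingSlots() *semaphore.Weighted {
    slots := int64(10)

    if v := os.Getenv("SCHEDULING_SLOTS"); v != "" { // placeholder name
        if n, err := strconv.ParseInt(v, 10, 64); err == nil && n > 0 {
            slots = n
        }
    }

    return semaphore.NewWeighted(slots)
}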

Limitations:

  • Scheduling child workflows is still significantly slower than scheduling non-child workflows, because we have ~6ms of latency due to how we're checking idempotency on the child workflow trigger
    • I think we could improve this a lot with idempotency keys; it really should only take a single database transaction to insert or look up the keys (a minimal sketch follows this list)
  • This won't be turned on in HA mode, and as engines are horizontally scaled, the chance of optimistic scheduling drops to roughly 1/n for n engines, since we only use local schedulers when they hold a lease on the tenant. We will need to build out a sticky load-balancing strategy to take advantage of optimistic scheduling in HA setups.
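
On the idempotency-key idea above, a minimal sketch of the single-round-trip pattern (the table, columns, and function names are hypothetical, not the existing child-trigger check):

package optimistic

import (
    "context"

    "github.com/jackc/pgx/v5/pgxpool"
)

// getOrCreateChildRun resolves a child-workflow trigger to a run ID in one
// statement: insert the key if it's new, otherwise return the run ID that was
// recorded the first time. Table and column names are illustrative.
func getOrCreateChildRun(ctx context.Context, pool *pgxpool.Pool, tenantId, idempotencyKey, proposedRunId string) (string, error) {
    var runId string

    err := pool.QueryRow(ctx, `
        INSERT INTO child_run_idempotency_keys (tenant_id, idempotency_key, run_id)
        VALUES ($1, $2, $3)
        ON CONFLICT (tenant_id, idempotency_key)
        DO UPDATE SET idempotency_key = EXCLUDED.idempotency_key
        RETURNING run_id`,
        tenantId, idempotencyKey, proposedRunId,
    ).Scan(&runId)

    return runId, err
}

The no-op DO UPDATE makes RETURNING yield the previously stored run_id on conflict, so the "already triggered?" check and the lookup happen in the same statement.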

Type of change

  • New feature (non-breaking change which adds functionality)

vercel bot commented Sep 7, 2025

The latest updates on your projects. Learn more about Vercel for GitHub.

Project         | Deployment | Preview | Comments | Updated (UTC)
hatchet-docs    | Ready      | Preview | Comment  | Sep 8, 2025 1:32pm
hatchet-v0-docs | Ready      | Preview | Comment  | Sep 8, 2025 1:32pm

Comment on lines +891 to +958
var localScheduler *schedulerv1.Scheduler

if sc.HasService("all") || sc.HasService("scheduler") {
    partitionCleanup, err := p.StartSchedulerPartition(ctx)

    if err != nil {
        return nil, fmt.Errorf("could not create scheduler partition: %w", err)
    }

    teardown = append(teardown, Teardown{
        Name: "scheduler partition",
        Fn:   partitionCleanup,
    })

    // create the scheduler
    s, err := scheduler.New(
        scheduler.WithAlerter(sc.Alerter),
        scheduler.WithMessageQueue(sc.MessageQueue),
        scheduler.WithRepository(sc.EngineRepository),
        scheduler.WithLogger(sc.Logger),
        scheduler.WithPartition(p),
        scheduler.WithQueueLoggerConfig(&sc.AdditionalLoggers.Queue),
        scheduler.WithSchedulerPool(sc.SchedulingPool),
    )

    if err != nil {
        return nil, fmt.Errorf("could not create scheduler: %w", err)
    }

    cleanup, err := s.Start()

    if err != nil {
        return nil, fmt.Errorf("could not start scheduler: %w", err)
    }

    teardown = append(teardown, Teardown{
        Name: "scheduler",
        Fn:   cleanup,
    })

    sv1, err := schedulerv1.New(
        schedulerv1.WithAlerter(sc.Alerter),
        schedulerv1.WithMessageQueue(sc.MessageQueueV1),
        schedulerv1.WithRepository(sc.EngineRepository),
        schedulerv1.WithV2Repository(sc.V1),
        schedulerv1.WithLogger(sc.Logger),
        schedulerv1.WithPartition(p),
        schedulerv1.WithQueueLoggerConfig(&sc.AdditionalLoggers.Queue),
        schedulerv1.WithSchedulerPool(sc.SchedulingPoolV1),
    )

    if err != nil {
        return nil, fmt.Errorf("could not create scheduler (v1): %w", err)
    }

    cleanup, err = sv1.Start()

    if err != nil {
        return nil, fmt.Errorf("could not start scheduler (v1): %w", err)
    }

    teardown = append(teardown, Teardown{
        Name: "schedulerv1",
        Fn:   cleanup,
    })

    localScheduler = sv1
}
Contributor Author

Just moved this up and added the localScheduler assignment, but otherwise no code changes.

Contributor Author

This will need to be updated before final merge, as we'll probably get this deployed after payloads and partitions.

rec RECORD;
BEGIN
-- Only insert if there's a single task with initial_state = 'QUEUED' and concurrency_strategy_ids is not null
IF (SELECT COUNT(*) FROM new_table WHERE initial_state = 'QUEUED' AND concurrency_strategy_ids[1] IS NOT NULL) > 0 THEN
Contributor Author

Added this IF statement to avoid cascading triggers on concurrency tables

WHERE initial_state = 'QUEUED' AND concurrency_strategy_ids[1] IS NULL;

-- Only insert into v1_dag and v1_dag_to_task if dag_id and dag_inserted_at are not null
IF (SELECT COUNT(*) FROM new_table WHERE dag_id IS NOT NULL AND dag_inserted_at IS NOT NULL) = 0 THEN
Contributor Author

Similarly, added this IF statement to avoid extra statements in this trigger for single-task workflows

Contributor Author

I had to consolidate signaling logic between the local scheduler (which creates tasks) and the tasks controller, and this was the cleanest way to do it. I also figured it'd give us an easier way to hook into two-phase commits to keep the tasks controller and OLAP controller in sync.

}

func (s *DispatcherImpl) Register(ctx context.Context, request *contracts.WorkerRegisterRequest) (*contracts.WorkerRegisterResponse, error) {
func (d *DispatcherImpl) Register(ctx context.Context, request *contracts.WorkerRegisterRequest) (*contracts.WorkerRegisterResponse, error) {
Contributor Author

Drive-by, but the golint rules were complaining about using s *DispatcherImpl in some places and d *DispatcherImpl in others, and I agree it makes the code harder to read, so I changed all locations to use d *DispatcherImpl.

Comment on lines +1346 to +1358
res = append(res, &contracts.WorkflowRunEvent{
    WorkflowRunId:  payload.WorkflowRunId,
    EventType:      contracts.WorkflowRunEventType_WORKFLOW_RUN_EVENT_TYPE_FINISHED,
    EventTimestamp: timestamppb.New(time.Now()),
    Results: []*contracts.StepRunResult{
        {
            StepRunId:      payload.ExternalId,
            StepReadableId: payload.StepReadableId,
            JobRunId:       payload.ExternalId,
            Output:         &output,
        },
    },
})
Contributor Author

This is the core change in this file: if we have a single-task workflow and we get a task-completed event, we send it immediately to the worker.

I think there's a chance of race conditions here, because we don't want to send a message to a worker before it's been fully written/acknowledged by the database, which we risk here.

So if we fail to write the task-completed event to the database, we won't successfully release the task, and it may have a different status after we hit a timeout/reassignment than the parent workflow would see.

Yet another case where 2PC is necessary - or perhaps just moving the processing of the task-completed event to the gRPC layer like we used to have. I need to think a bit more about it.
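
For reference, a minimal sketch of the ordering being described (persist the completion first, signal the worker only after the write is acknowledged); the types and method names are illustrative, not the dispatcher's real API:

package optimistic

import "context"

// Illustrative types; not the PR's actual API.
type taskCompletedEvent struct {
    TenantId       string
    TaskExternalId string
    Output         []byte
}

type eventStore interface {
    WriteTaskCompleted(ctx context.Context, ev taskCompletedEvent) error
}

type completionNotifier struct {
    store  eventStore
    notify func(ctx context.Context, ev taskCompletedEvent) error // push the WorkflowRunEvent to the listener
}

// onTaskCompleted orders the two effects: the event is durably written first,
// and the worker is only signalled after the write is acknowledged, so a failed
// write never leaves the worker believing the run finished.
func (c *completionNotifier) onTaskCompleted(ctx context.Context, ev taskCompletedEvent) error {
    if err := c.store.WriteTaskCompleted(ctx, ev); err != nil {
        return err // no notification; timeout/reassignment will see a consistent state
    }

    return c.notify(ctx, ev)
}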

Comment on lines +36 to +38
tenantIdWorkflowNameCache *expirable.LRU[string, *sqlcv1.ListWorkflowsByNamesRow]
stepsInWorkflowVersionCache *expirable.LRU[string, []*sqlcv1.ListStepsByWorkflowVersionIdsRow]
stepIdLabelsCache *expirable.LRU[string, []*sqlcv1.GetDesiredLabelsRow]
Contributor Author

I would like to start moving towards using the expirable package instead of our home-built cache, because of the nicer typing/LRU support - I think we can fully make this transition after we deprecate v0.
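
For context, a small usage sketch of the expirable package from hashicorp/golang-lru v2 (the size, TTL, and key shape below are illustrative, not values from this PR):

package main

import (
    "fmt"
    "time"

    "github.com/hashicorp/golang-lru/v2/expirable"
)

func main() {
    // A typed, size-bounded LRU whose entries also expire after a TTL.
    cache := expirable.NewLRU[string, []string](1000, nil, 5*time.Minute)

    cache.Add("tenant-1:my-workflow", []string{"step-a", "step-b"})

    if steps, ok := cache.Get("tenant-1:my-workflow"); ok {
        fmt.Println(steps) // [step-a step-b]
    }
}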

mrkaye97 (Contributor) left a comment

left some questions! I'm a little nervous about the amount of complexity this adds to some already-complex parts of the codebase, but if it's worth the performance gains then 🤷

Comment on lines 440 to +444
func (i *AdminServiceImpl) ingest(ctx context.Context, tenantId string, opts ...*v1.WorkflowNameTriggerOpts) error {
if i.localScheduler != nil {
localWorkerIds := map[string]struct{}{}

if i.localDispatcher != nil {
Contributor

is this an exact copy of the implementation in internal/services/admin/server_v1.go?

Comment on lines +45 to +46
dagCp := dag
msg, err := tasktypes.CreatedDAGMessage(tenantId, dagCp)
Contributor

are we copying this because it gets mutated? if so, I'd much prefer to not mutate it 😅

Comment on lines +249 to +275
for _, task := range tasks {
    taskExternalId := sqlchelpers.UUIDToStr(task.ExternalID)

    dataBytes := v1.NewCancelledTaskOutputEventFromTask(task).Bytes()

    internalEvents = append(internalEvents, v1.InternalTaskEvent{
        TenantID:       tenantId,
        TaskID:         task.ID,
        TaskExternalID: taskExternalId,
        RetryCount:     task.RetryCount,
        EventType:      sqlcv1.V1TaskEventTypeCANCELLED,
        Data:           dataBytes,
    })
}

err := s.sendInternalEvents(ctx, tenantId, internalEvents)

if err != nil {
    return err
}

// notify that tasks have been cancelled
// TODO: make this transactionally safe?
for _, task := range tasks {
    msg, err := tasktypes.MonitoringEventMessageFromInternal(tenantId, tasktypes.CreateMonitoringEventPayload{
        TaskId:     task.ID,
        RetryCount: task.RetryCount,
Contributor

think it's worth consolidating these loops for performance?

// Note: this is very similar to handleTaskBulkAssignedTask, with some differences in what's sync vs run in a goroutine
// In this method, we wait until all tasks have been sent to the worker before returning
func (d *DispatcherImpl) HandleLocalAssignments(ctx context.Context, tenantId, workerId string, tasks []*schedulingv1.AssignedItemWithTask) error {
    // we set a timeout of 25 seconds because we don't want to hold the semaphore for longer than the visibility timeout (30 seconds)
Contributor

is this 30 second visibility timeout hard-coded somewhere? it seems risky to hard-code this 25 sec if that's modifiable
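
One possible shape for that, assuming the visibility timeout is (or becomes) a configurable value; the names and the 5-second margin are illustrative:

package optimistic

import (
    "context"
    "time"
)

// sendCtx derives the per-assignment send timeout from the visibility timeout
// instead of hard-coding 25s next to a 30s constant defined elsewhere. The 5s
// safety margin is arbitrary here.
func sendCtx(parent context.Context, visibilityTimeout time.Duration) (context.Context, context.CancelFunc) {
    margin := 5 * time.Second

    timeout := visibilityTimeout - margin
    if timeout <= 0 {
        timeout = visibilityTimeout / 2
    }

    return context.WithTimeout(parent, timeout)
}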

Comment on lines +437 to +448
for _, task := range bulkDatas {
    if parentData, ok := parentDataMap[task.ID]; ok {
        currInput := &v1.V1StepRunData{}

        if task.Input != nil {
            err := json.Unmarshal(task.Input, currInput)

            if err != nil {
                d.l.Warn().Err(err).Msg("failed to unmarshal input")
                continue
            }
        }
Contributor

just FYI, a bunch of this will conflict with the payloads changes

)
} else {
success = true
break
Contributor

why do we break here on success?

Contributor Author

We're technically looping over individual worker sessions, and only one of those will be valid; the others will fail when trying to send to the worker (or they should). This is an edge case for when a worker reconnects and we haven't yet determined that the old connection has been interrupted.

func eventToTaskV1(tenantId, eventExternalId, key string, data, additionalMeta []byte, priority *int32, scope *string, triggeringWebhookName *string) (*msgqueue.Message, error) {
payloadTyped := tasktypes.UserEventTaskPayload{
EventExternalId: eventExternalId,
func eventToPayload(tenantId, key string, data, additionalMeta []byte, priority *int32, scope *string, triggeringWebhookName *string) tasktypes.UserEventTaskPayload {
Contributor

nit: I think this naming is a little confusing even if technically correct given our abuse of the term "payload"

return r.PrepareOptimisticTx(ctx)
}

func (r *optimisticSchedulingRepositoryImpl) TriggerFromEvents(ctx context.Context, tx *OptimisticTx, tenantId string, opts []EventTriggerOpts) ([]*sqlcv1.V1QueueItem, *TriggerFromEventsResult, error) {
Contributor

this and TriggerFromNames are all copy and paste right? are there any changes here?

Contributor Author

yeah there are some small changes in how we're handling pre/post commits and treating the transactions

// look up the workflow versions for the workflow names
workflowVersions, err := r.queries.ListWorkflowsByNames(ctx, tx, sqlcv1.ListWorkflowsByNamesParams{
    Tenantid:      sqlchelpers.UUIDFromStr(tenantId),
    Workflownames: workflowNamesToLookup,
Contributor

should we check if workflowNamesToLookup is empty before running this query?


steps, err := r.queries.ListStepsByWorkflowVersionIds(ctx, tx, sqlcv1.ListStepsByWorkflowVersionIdsParams{
    Tenantid: sqlchelpers.UUIDFromStr(tenantId),
    Ids:      workflowVersionsToLookup,
Contributor

same here maybe?
