1 change: 1 addition & 0 deletions frontend/docs/pages/self-hosting/prometheus-metrics.mdx
@@ -172,6 +172,7 @@ rate(hatchet_reassigned_tasks_total[5m])
| `hatchet_tenant_cancelled_tasks_total` | Counter | The total number of tasks cancelled |
| `hatchet_tenant_assigned_tasks` | Counter | The total number of tasks assigned to a worker |
| `hatchet_tenant_scheduling_timed_out` | Counter | The total number of tasks that timed out while waiting to be scheduled |
| `hatchet_tenant_task_scheduling_timed_out` | Counter | The total number of tasks that timed out while waiting to be scheduled, labeled with the workflow name, task name, and timeout reason |
| `hatchet_tenant_rate_limited` | Counter | The total number of tasks that were rate limited |
| `hatchet_tenant_queued_to_assigned` | Counter | The total number of unique tasks that were queued and later got assigned to a worker |
| `hatchet_tenant_queued_to_assigned_time_seconds` | Histogram | Buckets of time in seconds spent in the queue before being assigned to a worker |
2 changes: 2 additions & 0 deletions pkg/config/loader/loader.go
@@ -585,6 +585,8 @@ func createControllerLayer(dc *database.Layer, cf *server.ServerConfigFile, vers

schedulingPoolV1, cleanupSchedulingPoolV1, err := v1.NewSchedulingPool(
dc.V1.Scheduler(),
dc.V1.Workflows(),
dc.EngineRepository.Step(),
&queueLogger,
cf.Runtime.SingleQueueLimit,
cf.Runtime.SchedulerConcurrencyRateLimit,
6 changes: 6 additions & 0 deletions pkg/integrations/metrics/prometheus/tenant.go
@@ -11,6 +11,7 @@ const (
TenantWorkflowDurationMilliseconds TenantHatchetMetric = "hatchet_tenant_workflow_duration_milliseconds"
TenantAssignedTasksTotal TenantHatchetMetric = "hatchet_tenant_assigned_tasks"
TenantSchedulingTimedOutTotal TenantHatchetMetric = "hatchet_tenant_scheduling_timed_out"
TenantTaskSchedulingTimedOutTotal TenantHatchetMetric = "hatchet_tenant_task_scheduling_timed_out"
TenantRateLimitedTotal TenantHatchetMetric = "hatchet_tenant_rate_limited"
TenantQueuedToAssignedTotal TenantHatchetMetric = "hatchet_tenant_queued_to_assigned"
TenantQueuedToAssignedTimeSeconds TenantHatchetMetric = "hatchet_tenant_queued_to_assigned_time_seconds"
@@ -106,6 +107,11 @@ var (
Help: "The total number of tasks that timed out while waiting to be scheduled",
}, []string{"tenant_id"})

TenantTaskSchedulingTimedOut = promauto.NewCounterVec(prometheus.CounterOpts{
Name: string(TenantTaskSchedulingTimedOutTotal),
Help: "The total number of tasks that timed out while waiting to be scheduled, labeled with the workflow name, task name, and timeout reason",
}, []string{"tenant_id", "workflow_name", "task_name", "reason"})

TenantRateLimited = promauto.NewCounterVec(prometheus.CounterOpts{
Name: string(TenantRateLimitedTotal),
Help: "The total number of tasks that were rate limited",
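The new counter adds `workflow_name`, `task_name`, and `reason` labels on top of the tenant-scoped series. Below is a minimal, self-contained sketch of how it gets incremented; the real call site is in `pkg/scheduling/v1/queuer.go` further down, and all label values here are hypothetical examples.

```go
package main

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// Standalone registration mirroring TenantTaskSchedulingTimedOut above.
var tenantTaskSchedulingTimedOut = promauto.NewCounterVec(prometheus.CounterOpts{
	Name: "hatchet_tenant_task_scheduling_timed_out",
	Help: "The total number of tasks that timed out while waiting to be scheduled, labeled with the workflow name, task name, and timeout reason",
}, []string{"tenant_id", "workflow_name", "task_name", "reason"})

func main() {
	// One increment per timed-out queue item; label values are hypothetical.
	tenantTaskSchedulingTimedOut.WithLabelValues(
		"707d0855-80ab-4e1f-a156-f1c4546cbf52", // tenant_id
		"daily-report",                         // workflow_name, resolved from the workflow ID
		"send-email",                           // task_name, the step's readable ID
		"no workers available",                 // reason: "no workers available" or "rate limited"
	).Inc()
}
```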
8 changes: 8 additions & 0 deletions pkg/repository/postgres/dbsqlc/step_runs.sql
@@ -145,6 +145,14 @@ FROM
WHERE
"stepId" = @stepId::uuid;

-- name: GetStepReadableIdsByIds :many
SELECT
id, "readableId"
FROM
"Step"
WHERE
"id" = ANY(@ids::uuid[]);

-- name: GetStepRunMeta :one
SELECT
jr."workflowRunId" AS "workflowRunId",
34 changes: 34 additions & 0 deletions pkg/repository/postgres/dbsqlc/step_runs.sql.go

(Generated file; not rendered by default.)
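For reference, here is a sketch of what the generated wrapper plausibly looks like. The method and row-struct names are inferred from the query annotation above and from the caller in `pkg/repository/postgres/step.go`, following sqlc's conventions for pgx/v5; the real generated file may differ slightly.

```go
// Sketch in the style of sqlc-generated output (not the actual file).
package dbsqlc

import (
	"context"

	"github.com/jackc/pgx/v5/pgtype"
)

const getStepReadableIdsByIds = `-- name: GetStepReadableIdsByIds :many
SELECT id, "readableId" FROM "Step" WHERE "id" = ANY($1::uuid[])
`

type GetStepReadableIdsByIdsRow struct {
	ID         pgtype.UUID
	ReadableId pgtype.Text
}

func (q *Queries) GetStepReadableIdsByIds(ctx context.Context, db DBTX, ids []pgtype.UUID) ([]*GetStepReadableIdsByIdsRow, error) {
	rows, err := db.Query(ctx, getStepReadableIdsByIds, ids)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	var items []*GetStepReadableIdsByIdsRow
	for rows.Next() {
		var i GetStepReadableIdsByIdsRow
		if err := rows.Scan(&i.ID, &i.ReadableId); err != nil {
			return nil, err
		}
		items = append(items, &i)
	}
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return items, nil
}
```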

16 changes: 16 additions & 0 deletions pkg/repository/postgres/step.go
@@ -8,6 +8,7 @@ import (
"github.com/hatchet-dev/hatchet/pkg/repository/postgres/sqlchelpers"
"github.com/hatchet-dev/hatchet/pkg/validator"

"github.com/jackc/pgx/v5/pgtype"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/rs/zerolog"
)
@@ -33,3 +34,18 @@ func NewStepRepository(pool *pgxpool.Pool, v validator.Validator, l *zerolog.Log
func (j *stepRepository) ListStepExpressions(ctx context.Context, stepId string) ([]*dbsqlc.StepExpression, error) {
return j.queries.GetStepExpressions(ctx, j.pool, sqlchelpers.UUIDFromStr(stepId))
}

func (j *stepRepository) ListReadableIds(ctx context.Context, stepIds []pgtype.UUID) (map[pgtype.UUID]string, error) {
rows, err := j.queries.GetStepReadableIdsByIds(ctx, j.pool, stepIds)
if err != nil {
return nil, err
}

readableIds := make(map[pgtype.UUID]string)

for _, row := range rows {
readableIds[row.ID] = row.ReadableId.String
}

return readableIds, nil
}
3 changes: 3 additions & 0 deletions pkg/repository/step.go
@@ -3,9 +3,12 @@ package repository
import (
"context"

"github.com/jackc/pgx/v5/pgtype"

"github.com/hatchet-dev/hatchet/pkg/repository/postgres/dbsqlc"
)

type StepRepository interface {
ListStepExpressions(ctx context.Context, stepId string) ([]*dbsqlc.StepExpression, error)
ListReadableIds(ctx context.Context, stepIds []pgtype.UUID) (map[pgtype.UUID]string, error)
}
9 changes: 7 additions & 2 deletions pkg/scheduling/v1/pool.go
@@ -6,13 +6,16 @@ import (

"github.com/rs/zerolog"

"github.com/hatchet-dev/hatchet/pkg/repository"
"github.com/hatchet-dev/hatchet/pkg/repository/postgres/dbsqlc"
"github.com/hatchet-dev/hatchet/pkg/repository/postgres/sqlchelpers"
v1 "github.com/hatchet-dev/hatchet/pkg/repository/v1"
)

type sharedConfig struct {
repo v1.SchedulerRepository
repo v1.SchedulerRepository
workflowRepo v1.WorkflowRepository
stepRepo repository.StepRepository

l *zerolog.Logger

@@ -35,14 +38,16 @@ type SchedulingPool struct {
concurrencyResultsCh chan *ConcurrencyResults
}

func NewSchedulingPool(repo v1.SchedulerRepository, l *zerolog.Logger, singleQueueLimit int, schedulerConcurrencyRateLimit int) (*SchedulingPool, func() error, error) {
func NewSchedulingPool(repo v1.SchedulerRepository, workflowRepo v1.WorkflowRepository, stepRepo repository.StepRepository, l *zerolog.Logger, singleQueueLimit int, schedulerConcurrencyRateLimit int) (*SchedulingPool, func() error, error) {
resultsCh := make(chan *QueueResults, 1000)
concurrencyResultsCh := make(chan *ConcurrencyResults, 1000)

s := &SchedulingPool{
Extensions: &Extensions{},
cf: &sharedConfig{
repo: repo,
workflowRepo: workflowRepo,
stepRepo: stepRepo,
l: l,
singleQueueLimit: singleQueueLimit,
schedulerConcurrencyRateLimit: schedulerConcurrencyRateLimit,
118 changes: 103 additions & 15 deletions pkg/scheduling/v1/queuer.go
@@ -6,17 +6,22 @@ import (
"sync"
"time"

lru "github.com/hashicorp/golang-lru/v2"
"github.com/jackc/pgx/v5/pgtype"
"github.com/rs/zerolog"

"github.com/hatchet-dev/hatchet/internal/telemetry"
"github.com/hatchet-dev/hatchet/pkg/integrations/metrics/prometheus"
"github.com/hatchet-dev/hatchet/pkg/repository"
v1 "github.com/hatchet-dev/hatchet/pkg/repository/v1"
"github.com/hatchet-dev/hatchet/pkg/repository/v1/sqlcv1"
)

type Queuer struct {
repo v1.QueueRepository
repo v1.QueueRepository
workflowRepo v1.WorkflowRepository
stepRepo repository.StepRepository

tenantId pgtype.UUID
queueName string

@@ -44,6 +49,15 @@ type Queuer struct {
unassigned map[int64]*sqlcv1.V1QueueItem
unassignedMu mutex

// used to keep track of tasks that were rate limited and eventually timed out while waiting to be scheduled
unassignedRateLimited sync.Map

// used to cache workflow names by workflow ID for Prometheus metrics
workflowNameCache *lru.Cache[pgtype.UUID, string]

// used to cache step readable IDs by step ID for Prometheus metrics
stepReadableIdCache *lru.Cache[pgtype.UUID, string]

hasRateLimits bool
}
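Both caches use `hashicorp/golang-lru/v2`, whose generic keys only need to be comparable; `pgtype.UUID` qualifies because it is a fixed-size struct. A small sketch of the Get-then-Add pattern the queuer relies on (the ID and name below are hypothetical):

```go
package main

import (
	"fmt"

	lru "github.com/hashicorp/golang-lru/v2"
	"github.com/jackc/pgx/v5/pgtype"
)

func main() {
	// Same constructor the queuer uses; the size bounds memory per queue.
	cache, err := lru.New[pgtype.UUID, string](1000)
	if err != nil {
		panic(err)
	}

	var workflowID pgtype.UUID
	_ = workflowID.Scan("a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11") // hypothetical ID

	// Miss: fall back to the repository, then populate the cache.
	if _, ok := cache.Get(workflowID); !ok {
		cache.Add(workflowID, "daily-report") // value would come from ListWorkflowNamesByIds
	}

	// Hit: label values come straight from memory, no DB round trip.
	if name, ok := cache.Get(workflowID); ok {
		fmt.Println(name)
	}
}
```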

@@ -54,24 +68,43 @@ func newQueuer(conf *sharedConfig, tenantId pgtype.UUID, queueName string, s *Sc
defaultLimit = conf.singleQueueLimit
}

cacheSize := 1000
if conf.singleQueueLimit > 1000 {
cacheSize = conf.singleQueueLimit
}

queueRepo := conf.repo.QueueFactory().NewQueue(tenantId, queueName)

notifyQueueCh := make(chan map[string]string, 1)

workflowNameCache, err := lru.New[pgtype.UUID, string](cacheSize)
if err != nil {
conf.l.Fatal().Err(err).Msg("failed to create workflow name cache")
}

stepReadableIdCache, err := lru.New[pgtype.UUID, string](cacheSize)
if err != nil {
conf.l.Fatal().Err(err).Msg("failed to create step readable ID cache")
}

q := &Queuer{
repo: queueRepo,
tenantId: tenantId,
queueName: queueName,
l: conf.l,
s: s,
limit: defaultLimit,
resultsCh: resultsCh,
notifyQueueCh: notifyQueueCh,
queueMu: newMu(conf.l),
unackedMu: newRWMu(conf.l),
unacked: make(map[int64]struct{}),
unassigned: make(map[int64]*sqlcv1.V1QueueItem),
unassignedMu: newMu(conf.l),
repo: queueRepo,
workflowRepo: conf.workflowRepo,
stepRepo: conf.stepRepo,
tenantId: tenantId,
queueName: queueName,
l: conf.l,
s: s,
limit: defaultLimit,
resultsCh: resultsCh,
notifyQueueCh: notifyQueueCh,
queueMu: newMu(conf.l),
unackedMu: newRWMu(conf.l),
unacked: make(map[int64]struct{}),
unassigned: make(map[int64]*sqlcv1.V1QueueItem),
unassignedMu: newMu(conf.l),
workflowNameCache: workflowNameCache,
stepReadableIdCache: stepReadableIdCache,
}

ctx, cancel := context.WithCancel(context.Background())
@@ -260,9 +293,61 @@ func (q *Queuer) loopQueue(ctx context.Context) {
}
}

for range ar.schedulingTimedOut {
var workflowIdsMissingFromCache []pgtype.UUID
var stepIdsMissingFromCache []pgtype.UUID

for _, qi := range ar.schedulingTimedOut {
if _, ok := q.workflowNameCache.Get(qi.WorkflowID); !ok {
workflowIdsMissingFromCache = append(workflowIdsMissingFromCache, qi.WorkflowID)
}

if _, ok := q.stepReadableIdCache.Get(qi.StepID); !ok {
stepIdsMissingFromCache = append(stepIdsMissingFromCache, qi.StepID)
}
}

if len(workflowIdsMissingFromCache) > 0 {
workflowNames, err := q.workflowRepo.ListWorkflowNamesByIds(ctx, q.tenantId.String(), workflowIdsMissingFromCache)
if err != nil {
q.l.Error().Err(err).Msg("error getting workflow names for prometheus metric: TenantTaskSchedulingTimedOut")
return
}

for id, name := range workflowNames {
q.workflowNameCache.Add(id, name)
}
}

if len(stepIdsMissingFromCache) > 0 {
stepReadableIds, err := q.stepRepo.ListReadableIds(ctx, stepIdsMissingFromCache)
if err != nil {
q.l.Error().Err(err).Msg("error getting step readable ids for prometheus metric: TenantTaskSchedulingTimedOut")
return
}

for id, readableId := range stepReadableIds {
q.stepReadableIdCache.Add(id, readableId)
}
}

for _, qi := range ar.schedulingTimedOut {
prometheus.SchedulingTimedOut.Inc()
prometheus.TenantSchedulingTimedOut.WithLabelValues(q.tenantId.String()).Inc()

workflowName, wOk := q.workflowNameCache.Get(qi.WorkflowID)
stepReadableId, sOk := q.stepReadableIdCache.Get(qi.StepID)

if !wOk || !sOk {
continue
}

reason := "no workers available"
if _, ok := q.unassignedRateLimited.Load(qi.TaskID); ok {
reason = "rate limited"
}
q.unassignedRateLimited.Delete(qi.TaskID)

prometheus.TenantTaskSchedulingTimedOut.WithLabelValues(q.tenantId.String(), workflowName, stepReadableId, reason).Inc()
}

for range ar.rateLimited {
@@ -395,11 +480,13 @@ func (q *Queuer) ack(r *assignResults) {
for _, assignedItem := range r.assigned {
delete(q.unacked, assignedItem.QueueItem.ID)
delete(q.unassigned, assignedItem.QueueItem.ID)
q.unassignedRateLimited.Delete(assignedItem.QueueItem.TaskID)
}

for _, unassignedItem := range r.unassigned {
delete(q.unacked, unassignedItem.ID)
q.unassigned[unassignedItem.ID] = unassignedItem
q.unassignedRateLimited.Delete(unassignedItem.TaskID)
}

for _, schedulingTimedOutItem := range r.schedulingTimedOut {
@@ -410,6 +497,7 @@
for _, rateLimitedItem := range r.rateLimited {
delete(q.unacked, rateLimitedItem.qi.ID)
q.unassigned[rateLimitedItem.qi.ID] = rateLimitedItem.qi
q.unassignedRateLimited.Store(rateLimitedItem.qi.TaskID, struct{}{})
}
}

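Taken together, the reason attribution works in two phases: `ack` records which queue items stayed unassigned because of a rate limit in `unassignedRateLimited`, and `loopQueue` consults that map when an item finally times out. A condensed sketch of the pattern, with simplified stand-ins rather than the actual Hatchet structs:

```go
package main

import (
	"fmt"
	"sync"
)

// tracker is a simplified stand-in for the queuer's bookkeeping.
type tracker struct {
	rateLimited sync.Map // task ID -> struct{}, mirrors unassignedRateLimited
}

// markRateLimited mirrors ack() for rate-limited items: remember tasks that
// stayed unassigned because of a rate limit.
func (t *tracker) markRateLimited(taskID int64) {
	t.rateLimited.Store(taskID, struct{}{})
}

// clear mirrors ack() for assigned and plain-unassigned items: once a task is
// assigned (or re-queued without a rate limit), drop the marker.
func (t *tracker) clear(taskID int64) {
	t.rateLimited.Delete(taskID)
}

// timeoutReason mirrors loopQueue(): pick the metric label and clean up.
func (t *tracker) timeoutReason(taskID int64) string {
	reason := "no workers available"
	if _, ok := t.rateLimited.Load(taskID); ok {
		reason = "rate limited"
	}
	t.rateLimited.Delete(taskID)
	return reason
}

func main() {
	var t tracker
	t.markRateLimited(42)
	fmt.Println(t.timeoutReason(42)) // "rate limited"
	fmt.Println(t.timeoutReason(7))  // "no workers available"
}
```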