diff --git a/frontend/docs/pages/self-hosting/prometheus-metrics.mdx b/frontend/docs/pages/self-hosting/prometheus-metrics.mdx
index 826d88b523..d49b93aa0a 100644
--- a/frontend/docs/pages/self-hosting/prometheus-metrics.mdx
+++ b/frontend/docs/pages/self-hosting/prometheus-metrics.mdx
@@ -172,6 +172,7 @@ rate(hatchet_reassigned_tasks_total[5m])
 | `hatchet_tenant_cancelled_tasks_total` | Counter | The total number of tasks cancelled |
 | `hatchet_tenant_assigned_tasks` | Counter | The total number of tasks assigned to a worker |
 | `hatchet_tenant_scheduling_timed_out` | Counter | The total number of tasks that timed out while waiting to be scheduled |
+| `hatchet_tenant_task_scheduling_timed_out` | Counter | The total number of tasks that timed out while waiting to be scheduled, labeled with the workflow name, task name, and timeout reason |
 | `hatchet_tenant_rate_limited` | Counter | The total number of tasks that were rate limited |
 | `hatchet_tenant_queued_to_assigned` | Counter | The total number of unique tasks that were queued and later got assigned to a worker |
 | `hatchet_tenant_queued_to_assigned_time_seconds` | Histogram | Buckets of time in seconds spent in the queue before being assigned to a worker |
diff --git a/pkg/config/loader/loader.go b/pkg/config/loader/loader.go
index 434993499a..930fad1aae 100644
--- a/pkg/config/loader/loader.go
+++ b/pkg/config/loader/loader.go
@@ -585,6 +585,8 @@ func createControllerLayer(dc *database.Layer, cf *server.ServerConfigFile, vers
 	schedulingPoolV1, cleanupSchedulingPoolV1, err := v1.NewSchedulingPool(
 		dc.V1.Scheduler(),
+		dc.V1.Workflows(),
+		dc.EngineRepository.Step(),
 		&queueLogger,
 		cf.Runtime.SingleQueueLimit,
 		cf.Runtime.SchedulerConcurrencyRateLimit,
diff --git a/pkg/integrations/metrics/prometheus/tenant.go b/pkg/integrations/metrics/prometheus/tenant.go
index a39fe22530..b700e1bad2 100644
--- a/pkg/integrations/metrics/prometheus/tenant.go
+++ b/pkg/integrations/metrics/prometheus/tenant.go
@@ -11,6 +11,7 @@ const (
 	TenantWorkflowDurationMilliseconds TenantHatchetMetric = "hatchet_tenant_workflow_duration_milliseconds"
 	TenantAssignedTasksTotal           TenantHatchetMetric = "hatchet_tenant_assigned_tasks"
 	TenantSchedulingTimedOutTotal      TenantHatchetMetric = "hatchet_tenant_scheduling_timed_out"
+	TenantTaskSchedulingTimedOutTotal  TenantHatchetMetric = "hatchet_tenant_task_scheduling_timed_out"
 	TenantRateLimitedTotal             TenantHatchetMetric = "hatchet_tenant_rate_limited"
 	TenantQueuedToAssignedTotal        TenantHatchetMetric = "hatchet_tenant_queued_to_assigned"
 	TenantQueuedToAssignedTimeSeconds  TenantHatchetMetric = "hatchet_tenant_queued_to_assigned_time_seconds"
@@ -109,6 +110,11 @@ var (
 		Help: "The total number of tasks that timed out while waiting to be scheduled",
 	}, []string{"tenant_id"})

+	TenantTaskSchedulingTimedOut = promauto.NewCounterVec(prometheus.CounterOpts{
+		Name: string(TenantTaskSchedulingTimedOutTotal),
+		Help: "The total number of tasks that timed out while waiting to be scheduled, labeled with the workflow name, task name, and timeout reason",
+	}, []string{"tenant_id", "workflow_name", "task_name", "reason"})
+
 	TenantRateLimited = promauto.NewCounterVec(prometheus.CounterOpts{
 		Name: string(TenantRateLimitedTotal),
 		Help: "The total number of tasks that were rate limited",
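Note on the new counter vec: one series is created per unique `(tenant_id, workflow_name, task_name, reason)` combination, so label cardinality is bounded by workflows × tasks per tenant. A standalone sketch of the series shape (not part of the patch; label values and the throwaway registry are illustrative):

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// Throwaway registry so the example does not touch the default one.
	reg := prometheus.NewRegistry()

	// Mirrors the shape of the new counter vec; values below are made up.
	timedOut := prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "hatchet_tenant_task_scheduling_timed_out",
		Help: "The total number of tasks that timed out while waiting to be scheduled, labeled with the workflow name, task name, and timeout reason",
	}, []string{"tenant_id", "workflow_name", "task_name", "reason"})
	reg.MustRegister(timedOut)

	// Each unique label combination becomes its own series.
	timedOut.WithLabelValues("tenant-1", "checkout", "charge-card", "no workers available").Inc()
	timedOut.WithLabelValues("tenant-1", "checkout", "charge-card", "rate limited").Inc()

	mfs, _ := reg.Gather()
	for _, mf := range mfs {
		for _, m := range mf.GetMetric() {
			for _, lp := range m.GetLabel() {
				fmt.Printf("%s=%q ", lp.GetName(), lp.GetValue())
			}
			fmt.Println("=>", m.GetCounter().GetValue())
		}
	}
}
```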
diff --git a/pkg/repository/postgres/dbsqlc/step_runs.sql b/pkg/repository/postgres/dbsqlc/step_runs.sql
index a728dfef01..98e1eaf3c2 100644
--- a/pkg/repository/postgres/dbsqlc/step_runs.sql
+++ b/pkg/repository/postgres/dbsqlc/step_runs.sql
@@ -145,6 +145,14 @@ FROM
 WHERE
     "stepId" = @stepId::uuid;

+-- name: GetStepReadableIdsByIds :many
+SELECT
+    id, "readableId"
+FROM
+    "Step"
+WHERE
+    "id" = ANY(@ids::uuid[]);
+
 -- name: GetStepRunMeta :one
 SELECT
     jr."workflowRunId" AS "workflowRunId",
diff --git a/pkg/repository/postgres/dbsqlc/step_runs.sql.go b/pkg/repository/postgres/dbsqlc/step_runs.sql.go
index 7d63e71f5c..ce6406df28 100644
--- a/pkg/repository/postgres/dbsqlc/step_runs.sql.go
+++ b/pkg/repository/postgres/dbsqlc/step_runs.sql.go
@@ -934,6 +934,40 @@ func (q *Queries) GetStepExpressions(ctx context.Context, db DBTX, stepid pgtype
 	return items, nil
 }

+const getStepReadableIdsByIds = `-- name: GetStepReadableIdsByIds :many
+SELECT
+    id, "readableId"
+FROM
+    "Step"
+WHERE
+    "id" = ANY($1::uuid[])
+`
+
+type GetStepReadableIdsByIdsRow struct {
+	ID         pgtype.UUID `json:"id"`
+	ReadableId pgtype.Text `json:"readableId"`
+}
+
+func (q *Queries) GetStepReadableIdsByIds(ctx context.Context, db DBTX, ids []pgtype.UUID) ([]*GetStepReadableIdsByIdsRow, error) {
+	rows, err := db.Query(ctx, getStepReadableIdsByIds, ids)
+	if err != nil {
+		return nil, err
+	}
+	defer rows.Close()
+	var items []*GetStepReadableIdsByIdsRow
+	for rows.Next() {
+		var i GetStepReadableIdsByIdsRow
+		if err := rows.Scan(&i.ID, &i.ReadableId); err != nil {
+			return nil, err
+		}
+		items = append(items, &i)
+	}
+	if err := rows.Err(); err != nil {
+		return nil, err
+	}
+	return items, nil
+}
+
 const getStepRun = `-- name: GetStepRun :one
 SELECT
     "StepRun".id, "StepRun"."createdAt", "StepRun"."updatedAt", "StepRun"."deletedAt", "StepRun"."tenantId", "StepRun"."jobRunId", "StepRun"."stepId", "StepRun"."order", "StepRun"."workerId", "StepRun"."tickerId", "StepRun".status, "StepRun".input, "StepRun".output, "StepRun"."requeueAfter", "StepRun"."scheduleTimeoutAt", "StepRun".error, "StepRun"."startedAt", "StepRun"."finishedAt", "StepRun"."timeoutAt", "StepRun"."cancelledAt", "StepRun"."cancelledReason", "StepRun"."cancelledError", "StepRun"."inputSchema", "StepRun"."callerFiles", "StepRun"."gitRepoBranch", "StepRun"."retryCount", "StepRun"."semaphoreReleased", "StepRun".queue, "StepRun".priority, "StepRun"."internalRetryCount"
diff --git a/pkg/repository/postgres/step.go b/pkg/repository/postgres/step.go
index 814fa061a8..b139f4161f 100644
--- a/pkg/repository/postgres/step.go
+++ b/pkg/repository/postgres/step.go
@@ -8,6 +8,7 @@ import (
 	"github.com/hatchet-dev/hatchet/pkg/repository/postgres/sqlchelpers"
 	"github.com/hatchet-dev/hatchet/pkg/validator"

+	"github.com/jackc/pgx/v5/pgtype"
 	"github.com/jackc/pgx/v5/pgxpool"
 	"github.com/rs/zerolog"
 )
@@ -33,3 +34,18 @@ func NewStepRepository(pool *pgxpool.Pool, v validator.Validator, l *zerolog.Log
 func (j *stepRepository) ListStepExpressions(ctx context.Context, stepId string) ([]*dbsqlc.StepExpression, error) {
 	return j.queries.GetStepExpressions(ctx, j.pool, sqlchelpers.UUIDFromStr(stepId))
 }
+
+func (j *stepRepository) ListReadableIds(ctx context.Context, stepIds []pgtype.UUID) (map[pgtype.UUID]string, error) {
+	rows, err := j.queries.GetStepReadableIdsByIds(ctx, j.pool, stepIds)
+	if err != nil {
+		return nil, err
+	}
+
+	readableIds := make(map[pgtype.UUID]string)
+
+	for _, row := range rows {
+		readableIds[row.ID] = row.ReadableId.String
+	}
+
+	return readableIds, nil
+}
diff --git a/pkg/repository/step.go b/pkg/repository/step.go
index da78b7f8d5..83dac20cbd 100644
--- a/pkg/repository/step.go
+++ b/pkg/repository/step.go
@@ -3,9 +3,12 @@ package repository
 import (
 	"context"

+	"github.com/jackc/pgx/v5/pgtype"
+
 	"github.com/hatchet-dev/hatchet/pkg/repository/postgres/dbsqlc"
 )

 type StepRepository interface {
 	ListStepExpressions(ctx context.Context, stepId string) ([]*dbsqlc.StepExpression, error)
+	ListReadableIds(ctx context.Context, stepIds []pgtype.UUID) (map[pgtype.UUID]string, error)
 }
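The repository pieces above compose into a batch-resolve-then-lookup pattern: one `ANY(...)` query for all missing IDs, then cheap map lookups. A minimal sketch of a caller (the `resolveStepNames` helper is hypothetical; only `StepRepository` and its new method come from this patch):

```go
package sketch

import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5/pgtype"

	"github.com/hatchet-dev/hatchet/pkg/repository"
)

// resolveStepNames resolves a batch of step IDs to their readable IDs in a
// single query, then reads them back out of the returned map.
func resolveStepNames(ctx context.Context, stepRepo repository.StepRepository, stepIds []pgtype.UUID) error {
	readableIds, err := stepRepo.ListReadableIds(ctx, stepIds)
	if err != nil {
		return err
	}

	for _, id := range stepIds {
		// pgtype.UUID is a comparable struct, so it works directly as a map key.
		if name, ok := readableIds[id]; ok {
			fmt.Printf("step %x has readable id %q\n", id.Bytes, name)
		}
	}

	return nil
}
```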
diff --git a/pkg/scheduling/v1/pool.go b/pkg/scheduling/v1/pool.go
index f7915fcd3e..8067cef25d 100644
--- a/pkg/scheduling/v1/pool.go
+++ b/pkg/scheduling/v1/pool.go
@@ -6,13 +6,16 @@ import (
 	"github.com/rs/zerolog"

+	"github.com/hatchet-dev/hatchet/pkg/repository"
 	"github.com/hatchet-dev/hatchet/pkg/repository/postgres/dbsqlc"
 	"github.com/hatchet-dev/hatchet/pkg/repository/postgres/sqlchelpers"
 	v1 "github.com/hatchet-dev/hatchet/pkg/repository/v1"
 )

 type sharedConfig struct {
-	repo v1.SchedulerRepository
+	repo         v1.SchedulerRepository
+	workflowRepo v1.WorkflowRepository
+	stepRepo     repository.StepRepository

 	l *zerolog.Logger

@@ -35,7 +38,7 @@ type SchedulingPool struct {
 	concurrencyResultsCh chan *ConcurrencyResults
 }

-func NewSchedulingPool(repo v1.SchedulerRepository, l *zerolog.Logger, singleQueueLimit int, schedulerConcurrencyRateLimit int) (*SchedulingPool, func() error, error) {
+func NewSchedulingPool(repo v1.SchedulerRepository, workflowRepo v1.WorkflowRepository, stepRepo repository.StepRepository, l *zerolog.Logger, singleQueueLimit int, schedulerConcurrencyRateLimit int) (*SchedulingPool, func() error, error) {
 	resultsCh := make(chan *QueueResults, 1000)
 	concurrencyResultsCh := make(chan *ConcurrencyResults, 1000)

@@ -43,6 +46,8 @@ func NewSchedulingPool(repo v1.SchedulerRepository, l *zerolog.Logger, singleQue
 		Extensions: &Extensions{},
 		cf: &sharedConfig{
 			repo:                          repo,
+			workflowRepo:                  workflowRepo,
+			stepRepo:                      stepRepo,
 			l:                             l,
 			singleQueueLimit:              singleQueueLimit,
 			schedulerConcurrencyRateLimit: schedulerConcurrencyRateLimit,
diff --git a/pkg/scheduling/v1/queuer.go b/pkg/scheduling/v1/queuer.go
index 119cbca1aa..32788d286e 100644
--- a/pkg/scheduling/v1/queuer.go
+++ b/pkg/scheduling/v1/queuer.go
@@ -6,17 +6,22 @@ import (
 	"sync"
 	"time"

+	lru "github.com/hashicorp/golang-lru/v2"
 	"github.com/jackc/pgx/v5/pgtype"
 	"github.com/rs/zerolog"

 	"github.com/hatchet-dev/hatchet/internal/telemetry"
 	"github.com/hatchet-dev/hatchet/pkg/integrations/metrics/prometheus"
+	"github.com/hatchet-dev/hatchet/pkg/repository"
 	v1 "github.com/hatchet-dev/hatchet/pkg/repository/v1"
 	"github.com/hatchet-dev/hatchet/pkg/repository/v1/sqlcv1"
 )

 type Queuer struct {
-	repo v1.QueueRepository
+	repo         v1.QueueRepository
+	workflowRepo v1.WorkflowRepository
+	stepRepo     repository.StepRepository
+
 	tenantId  pgtype.UUID
 	queueName string

@@ -44,6 +49,15 @@ type Queuer struct {
 	unassigned   map[int64]*sqlcv1.V1QueueItem
 	unassignedMu mutex

+	// used to keep track of tasks that were rate limited and eventually timed out while waiting to be scheduled
+	unassignedRateLimited sync.Map
+
+	// used to cache workflow names by workflow ID for Prometheus metrics
+	workflowNameCache *lru.Cache[pgtype.UUID, string]
+
+	// used to cache step readable IDs by step ID for Prometheus metrics
+	stepReadableIdCache *lru.Cache[pgtype.UUID, string]
+
 	hasRateLimits bool
 }

@@ -58,20 +72,34 @@ func newQueuer(conf *sharedConfig, tenantId pgtype.UUID, queueName string, s *Sc
 	notifyQueueCh := make(chan map[string]string, 1)

+	workflowNameCache, err := lru.New[pgtype.UUID, string](1000)
+	if err != nil {
+		conf.l.Fatal().Err(err).Msg("failed to create workflow name cache")
+	}
+
+	stepReadableIdCache, err := lru.New[pgtype.UUID, string](1000)
+	if err != nil {
+		conf.l.Fatal().Err(err).Msg("failed to create step readable ID cache")
+	}
+
 	q := &Queuer{
-		repo:          queueRepo,
-		tenantId:      tenantId,
-		queueName:     queueName,
-		l:             conf.l,
-		s:             s,
-		limit:         defaultLimit,
-		resultsCh:     resultsCh,
-		notifyQueueCh: notifyQueueCh,
-		queueMu:       newMu(conf.l),
-		unackedMu:     newRWMu(conf.l),
-		unacked:       make(map[int64]struct{}),
-		unassigned:    make(map[int64]*sqlcv1.V1QueueItem),
-		unassignedMu:  newMu(conf.l),
+		repo:                queueRepo,
+		workflowRepo:        conf.workflowRepo,
+		stepRepo:            conf.stepRepo,
+		tenantId:            tenantId,
+		queueName:           queueName,
+		l:                   conf.l,
+		s:                   s,
+		limit:               defaultLimit,
+		resultsCh:           resultsCh,
+		notifyQueueCh:       notifyQueueCh,
+		queueMu:             newMu(conf.l),
+		unackedMu:           newRWMu(conf.l),
+		unacked:             make(map[int64]struct{}),
+		unassigned:          make(map[int64]*sqlcv1.V1QueueItem),
+		unassignedMu:        newMu(conf.l),
+		workflowNameCache:   workflowNameCache,
+		stepReadableIdCache: stepReadableIdCache,
 	}

 	ctx, cancel := context.WithCancel(context.Background())
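For reviewers unfamiliar with `golang-lru/v2`: the caches above are fixed-capacity and typed, and a `Get` miss is the signal to fetch and backfill with `Add` — the same miss-then-batch-fetch pattern `loopQueue` uses in the next hunk. A standalone sketch (string keys stand in for the `pgtype.UUID` keys used here; `lru.New` only fails for non-positive sizes):

```go
package main

import (
	"fmt"

	lru "github.com/hashicorp/golang-lru/v2"
)

func main() {
	// Same construction as the queuer's caches: fixed capacity, typed keys
	// and values. Capacity 2 keeps the eviction visible.
	cache, err := lru.New[string, string](2)
	if err != nil {
		panic(err)
	}

	cache.Add("wf-1", "checkout")
	cache.Add("wf-2", "billing")
	cache.Add("wf-3", "emails") // evicts the least recently used entry ("wf-1")

	if _, ok := cache.Get("wf-1"); !ok {
		fmt.Println("wf-1 missing: fetch from the database and Add it back")
	}
}
```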
@@ -260,9 +288,61 @@ func (q *Queuer) loopQueue(ctx context.Context) {
 			}
 		}

-		for range ar.schedulingTimedOut {
+		var workflowIdsMissingFromCache []pgtype.UUID
+		var stepIdsMissingFromCache []pgtype.UUID
+
+		for _, qi := range ar.schedulingTimedOut {
+			if _, ok := q.workflowNameCache.Get(qi.WorkflowID); !ok {
+				workflowIdsMissingFromCache = append(workflowIdsMissingFromCache, qi.WorkflowID)
+			}
+
+			if _, ok := q.stepReadableIdCache.Get(qi.StepID); !ok {
+				stepIdsMissingFromCache = append(stepIdsMissingFromCache, qi.StepID)
+			}
+		}
+
+		if len(workflowIdsMissingFromCache) > 0 {
+			workflowNames, err := q.workflowRepo.ListWorkflowNamesByIds(ctx, q.tenantId.String(), workflowIdsMissingFromCache)
+			if err != nil {
+				q.l.Error().Err(err).Msg("error getting workflow names for prometheus metric: TenantTaskSchedulingTimedOut")
+				return
+			}
+
+			for id, name := range workflowNames {
+				q.workflowNameCache.Add(id, name)
+			}
+		}
+
+		if len(stepIdsMissingFromCache) > 0 {
+			stepReadableIds, err := q.stepRepo.ListReadableIds(ctx, stepIdsMissingFromCache)
+			if err != nil {
+				q.l.Error().Err(err).Msg("error getting step readable ids for prometheus metric: TenantTaskSchedulingTimedOut")
+				return
+			}
+
+			for id, readableId := range stepReadableIds {
+				q.stepReadableIdCache.Add(id, readableId)
+			}
+		}
+
+		for _, qi := range ar.schedulingTimedOut {
 			prometheus.SchedulingTimedOut.Inc()
 			prometheus.TenantSchedulingTimedOut.WithLabelValues(q.tenantId.String()).Inc()
+
+			workflowName, wOk := q.workflowNameCache.Get(qi.WorkflowID)
+			stepReadableId, sOk := q.stepReadableIdCache.Get(qi.StepID)
+
+			if !wOk || !sOk {
+				continue
+			}
+
+			reason := "no workers available"
+			if _, ok := q.unassignedRateLimited.Load(qi.TaskID); ok {
+				reason = "rate limited"
+			}
+			q.unassignedRateLimited.Delete(qi.TaskID)
+
+			prometheus.TenantTaskSchedulingTimedOut.WithLabelValues(q.tenantId.String(), workflowName, stepReadableId, reason).Inc()
 		}

 		for range ar.rateLimited {
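The reason resolution above is a one-shot marker check: a task only counts as "rate limited" if its marker is still present in `unassignedRateLimited` when the scheduling timeout fires, and the marker is consumed either way (`Delete` on a missing key is a no-op). A condensed standalone sketch of that pattern (task IDs are made up):

```go
package main

import (
	"fmt"
	"sync"
)

// reasonFor mirrors the check in loopQueue: a still-present marker means the
// task's last failed pass was due to rate limiting; otherwise we assume there
// was no worker capacity. The marker is consumed either way.
func reasonFor(rateLimited *sync.Map, taskID int64) string {
	reason := "no workers available"
	if _, ok := rateLimited.Load(taskID); ok {
		reason = "rate limited"
	}
	rateLimited.Delete(taskID)
	return reason
}

func main() {
	var rateLimited sync.Map
	rateLimited.Store(int64(42), struct{}{}) // set by ack() when the item was rate limited

	fmt.Println(reasonFor(&rateLimited, 42)) // rate limited
	fmt.Println(reasonFor(&rateLimited, 7))  // no workers available
}
```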
@@ -392,14 +472,19 @@ func (q *Queuer) ack(r *assignResults) {
 	q.unassignedMu.Lock()
 	defer q.unassignedMu.Unlock()

+	// WARNING: the ordering below is important: rate limited items must be processed
+	// last so that the rate limited markers read by the scheduling-timeout Prometheus
+	// metrics reflect the most recent pass over each item
+
 	for _, assignedItem := range r.assigned {
 		delete(q.unacked, assignedItem.QueueItem.ID)
 		delete(q.unassigned, assignedItem.QueueItem.ID)
+		q.unassignedRateLimited.Delete(assignedItem.QueueItem.TaskID)
 	}

 	for _, unassignedItem := range r.unassigned {
 		delete(q.unacked, unassignedItem.ID)
 		q.unassigned[unassignedItem.ID] = unassignedItem
+		q.unassignedRateLimited.Delete(unassignedItem.TaskID)
 	}

 	for _, schedulingTimedOutItem := range r.schedulingTimedOut {
@@ -415,6 +500,7 @@ func (q *Queuer) ack(r *assignResults) {
 	for _, rateLimitedItem := range r.rateLimited {
 		delete(q.unacked, rateLimitedItem.qi.ID)
 		q.unassigned[rateLimitedItem.qi.ID] = rateLimitedItem.qi
+		q.unassignedRateLimited.Store(rateLimitedItem.qi.TaskID, struct{}{})
 	}
 }
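End to end, the new series can be checked against a local scrape endpoint. A minimal sketch (assumes the default registry that `promauto` registers into; the port, import alias, and label values are illustrative):

```go
package main

import (
	"net/http"

	"github.com/prometheus/client_golang/prometheus/promhttp"

	hatchetmetrics "github.com/hatchet-dev/hatchet/pkg/integrations/metrics/prometheus"
)

func main() {
	// Simulate one timed-out task so the series exists before the first scrape.
	hatchetmetrics.TenantTaskSchedulingTimedOut.
		WithLabelValues("tenant-1", "checkout", "charge-card", "rate limited").
		Inc()

	// promauto registers against the default registry, which promhttp serves.
	// Verify with: curl localhost:2112/metrics | grep hatchet_tenant_task_scheduling_timed_out
	http.Handle("/metrics", promhttp.Handler())
	_ = http.ListenAndServe(":2112", nil) // port is illustrative
}
```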