1 change: 1 addition & 0 deletions frontend/docs/pages/self-hosting/prometheus-metrics.mdx
@@ -172,6 +172,7 @@ rate(hatchet_reassigned_tasks_total[5m])
| `hatchet_tenant_cancelled_tasks_total` | Counter | The total number of tasks cancelled |
| `hatchet_tenant_assigned_tasks` | Counter | The total number of tasks assigned to a worker |
| `hatchet_tenant_scheduling_timed_out` | Counter | The total number of tasks that timed out while waiting to be scheduled |
| `hatchet_tenant_task_scheduling_timed_out` | Counter | The total number of tasks that timed out while waiting to be scheduled, labeled with the timeout reason |
| `hatchet_tenant_rate_limited` | Counter | The total number of tasks that were rate limited |
| `hatchet_tenant_queued_to_assigned` | Counter | The total number of unique tasks that were queued and later got assigned to a worker |
| `hatchet_tenant_queued_to_assigned_time_seconds` | Histogram | Buckets of time in seconds spent in the queue before being assigned to a worker |
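With the new `reason` label, scheduling timeouts can be broken down by cause, e.g. `sum by (reason) (rate(hatchet_tenant_task_scheduling_timed_out[5m]))`, mirroring the `rate()` examples earlier on this page; the label values emitted by the scheduler changes below are `"no workers available"` and `"rate limited"`.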
2 changes: 2 additions & 0 deletions pkg/config/loader/loader.go
@@ -585,6 +585,8 @@ func createControllerLayer(dc *database.Layer, cf *server.ServerConfigFile, vers

schedulingPoolV1, cleanupSchedulingPoolV1, err := v1.NewSchedulingPool(
dc.V1.Scheduler(),
dc.V1.Workflows(),
dc.EngineRepository.Step(),
&queueLogger,
cf.Runtime.SingleQueueLimit,
cf.Runtime.SchedulerConcurrencyRateLimit,
6 changes: 6 additions & 0 deletions pkg/integrations/metrics/prometheus/tenant.go
@@ -11,6 +11,7 @@ const (
TenantWorkflowDurationMilliseconds TenantHatchetMetric = "hatchet_tenant_workflow_duration_milliseconds"
TenantAssignedTasksTotal TenantHatchetMetric = "hatchet_tenant_assigned_tasks"
TenantSchedulingTimedOutTotal TenantHatchetMetric = "hatchet_tenant_scheduling_timed_out"
TenantTaskSchedulingTimedOutTotal TenantHatchetMetric = "hatchet_tenant_task_scheduling_timed_out"
TenantRateLimitedTotal TenantHatchetMetric = "hatchet_tenant_rate_limited"
TenantQueuedToAssignedTotal TenantHatchetMetric = "hatchet_tenant_queued_to_assigned"
TenantQueuedToAssignedTimeSeconds TenantHatchetMetric = "hatchet_tenant_queued_to_assigned_time_seconds"
@@ -109,6 +110,11 @@ var (
Help: "The total number of tasks that timed out while waiting to be scheduled",
}, []string{"tenant_id"})

TenantTaskSchedulingTimedOut = promauto.NewCounterVec(prometheus.CounterOpts{
Name: string(TenantTaskSchedulingTimedOutTotal),
Help: "The total number of tasks that timed out containing a reason",
}, []string{"tenant_id", "workflow_name", "task_name", "reason"})

TenantRateLimited = promauto.NewCounterVec(prometheus.CounterOpts{
Name: string(TenantRateLimitedTotal),
Help: "The total number of tasks that were rate limited",
8 changes: 8 additions & 0 deletions pkg/repository/postgres/dbsqlc/step_runs.sql
@@ -145,6 +145,14 @@ FROM
WHERE
"stepId" = @stepId::uuid;

-- name: GetStepReadableIdsByIds :many
SELECT
id, "readableId"
FROM
"Step"
WHERE
"id" = ANY(@ids::uuid[]);

-- name: GetStepRunMeta :one
SELECT
jr."workflowRunId" AS "workflowRunId",
34 changes: 34 additions & 0 deletions pkg/repository/postgres/dbsqlc/step_runs.sql.go

Some generated files are not rendered by default.
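Since the regenerated file is collapsed here, a sketch of what sqlc presumably emits for the new `:many` query may help. It is inferred from sqlc's conventions, the db-argument call style used elsewhere in this package (`j.queries.GetStepExpressions(ctx, j.pool, ...)`), and the `row.ID` / `row.ReadableId.String` field access in `step.go` below; all names are assumed, not taken from the rendered diff:

// Sketch of the collapsed sqlc output (assumed; not the rendered file).
const getStepReadableIdsByIds = `-- name: GetStepReadableIdsByIds :many
SELECT
    id, "readableId"
FROM
    "Step"
WHERE
    "id" = ANY($1::uuid[])
`

type GetStepReadableIdsByIdsRow struct {
	ID         pgtype.UUID
	ReadableId pgtype.Text
}

func (q *Queries) GetStepReadableIdsByIds(ctx context.Context, db DBTX, ids []pgtype.UUID) ([]*GetStepReadableIdsByIdsRow, error) {
	rows, err := db.Query(ctx, getStepReadableIdsByIds, ids)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	var items []*GetStepReadableIdsByIdsRow
	for rows.Next() {
		var i GetStepReadableIdsByIdsRow
		if err := rows.Scan(&i.ID, &i.ReadableId); err != nil {
			return nil, err
		}
		items = append(items, &i)
	}
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return items, nil
}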

16 changes: 16 additions & 0 deletions pkg/repository/postgres/step.go
@@ -8,6 +8,7 @@ import (
"github.com/hatchet-dev/hatchet/pkg/repository/postgres/sqlchelpers"
"github.com/hatchet-dev/hatchet/pkg/validator"

"github.com/jackc/pgx/v5/pgtype"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/rs/zerolog"
)
@@ -33,3 +34,18 @@ func NewStepRepository(pool *pgxpool.Pool, v validator.Validator, l *zerolog.Log
func (j *stepRepository) ListStepExpressions(ctx context.Context, stepId string) ([]*dbsqlc.StepExpression, error) {
return j.queries.GetStepExpressions(ctx, j.pool, sqlchelpers.UUIDFromStr(stepId))
}

func (j *stepRepository) ListReadableIds(ctx context.Context, stepIds []pgtype.UUID) (map[pgtype.UUID]string, error) {
rows, err := j.queries.GetStepReadableIdsByIds(ctx, j.pool, stepIds)
if err != nil {
return nil, err
}

readableIds := make(map[pgtype.UUID]string)

for _, row := range rows {
readableIds[row.ID] = row.ReadableId.String
}

return readableIds, nil
}
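A minimal usage sketch of the new method (hypothetical caller; in this PR the actual consumer is the queuer below, which fills an LRU cache from the returned map):

// stepRepo is a repository.StepRepository and stepIds is a batch of step
// UUIDs collected from queue items; both are assumed to be in scope.
readable, err := stepRepo.ListReadableIds(ctx, stepIds)
if err != nil {
	return err
}
for _, id := range stepIds {
	if name, ok := readable[id]; ok {
		// e.g. use the readable name as a metric label
		_ = name
	}
}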
3 changes: 3 additions & 0 deletions pkg/repository/step.go
@@ -3,9 +3,12 @@ package repository
import (
"context"

"github.com/jackc/pgx/v5/pgtype"

"github.com/hatchet-dev/hatchet/pkg/repository/postgres/dbsqlc"
)

type StepRepository interface {
ListStepExpressions(ctx context.Context, stepId string) ([]*dbsqlc.StepExpression, error)
ListReadableIds(ctx context.Context, stepIds []pgtype.UUID) (map[pgtype.UUID]string, error)
}
9 changes: 7 additions & 2 deletions pkg/scheduling/v1/pool.go
@@ -6,13 +6,16 @@ import (

"github.com/rs/zerolog"

"github.com/hatchet-dev/hatchet/pkg/repository"
"github.com/hatchet-dev/hatchet/pkg/repository/postgres/dbsqlc"
"github.com/hatchet-dev/hatchet/pkg/repository/postgres/sqlchelpers"
v1 "github.com/hatchet-dev/hatchet/pkg/repository/v1"
)

type sharedConfig struct {
repo v1.SchedulerRepository
repo v1.SchedulerRepository
workflowRepo v1.WorkflowRepository
stepRepo repository.StepRepository

l *zerolog.Logger

@@ -35,14 +38,16 @@ type SchedulingPool struct {
concurrencyResultsCh chan *ConcurrencyResults
}

func NewSchedulingPool(repo v1.SchedulerRepository, l *zerolog.Logger, singleQueueLimit int, schedulerConcurrencyRateLimit int) (*SchedulingPool, func() error, error) {
func NewSchedulingPool(repo v1.SchedulerRepository, workflowRepo v1.WorkflowRepository, stepRepo repository.StepRepository, l *zerolog.Logger, singleQueueLimit int, schedulerConcurrencyRateLimit int) (*SchedulingPool, func() error, error) {
resultsCh := make(chan *QueueResults, 1000)
concurrencyResultsCh := make(chan *ConcurrencyResults, 1000)

s := &SchedulingPool{
Extensions: &Extensions{},
cf: &sharedConfig{
repo: repo,
workflowRepo: workflowRepo,
stepRepo: stepRepo,
l: l,
singleQueueLimit: singleQueueLimit,
schedulerConcurrencyRateLimit: schedulerConcurrencyRateLimit,
116 changes: 101 additions & 15 deletions pkg/scheduling/v1/queuer.go
@@ -6,17 +6,22 @@ import (
"sync"
"time"

lru "github.com/hashicorp/golang-lru/v2"
"github.com/jackc/pgx/v5/pgtype"
"github.com/rs/zerolog"

"github.com/hatchet-dev/hatchet/internal/telemetry"
"github.com/hatchet-dev/hatchet/pkg/integrations/metrics/prometheus"
"github.com/hatchet-dev/hatchet/pkg/repository"
v1 "github.com/hatchet-dev/hatchet/pkg/repository/v1"
"github.com/hatchet-dev/hatchet/pkg/repository/v1/sqlcv1"
)

type Queuer struct {
repo v1.QueueRepository
repo v1.QueueRepository
workflowRepo v1.WorkflowRepository
stepRepo repository.StepRepository

tenantId pgtype.UUID
queueName string

@@ -44,6 +49,15 @@ type Queuer struct {
unassigned map[int64]*sqlcv1.V1QueueItem
unassignedMu mutex

// used to keep track of tasks that were rate limited and eventually timed out while waiting to be scheduled
unassignedRateLimited sync.Map

// used to cache the workflow names by ID for prometheus metrics
workflowNameCache *lru.Cache[pgtype.UUID, string]

// used to cache the step readable IDs by ID for prometheus metrics
stepReadableIdCache *lru.Cache[pgtype.UUID, string]

hasRateLimits bool
}

@@ -58,20 +72,34 @@ func newQueuer(conf *sharedConfig, tenantId pgtype.UUID, queueName string, s *Sc

notifyQueueCh := make(chan map[string]string, 1)

workflowNameCache, err := lru.New[pgtype.UUID, string](1000)
if err != nil {
conf.l.Fatal().Err(err).Msg("failed to create workflow name cache")
}

stepReadableIdCache, err := lru.New[pgtype.UUID, string](1000)
if err != nil {
conf.l.Fatal().Err(err).Msg("failed to create step readable ID cache")
}

q := &Queuer{
repo: queueRepo,
tenantId: tenantId,
queueName: queueName,
l: conf.l,
s: s,
limit: defaultLimit,
resultsCh: resultsCh,
notifyQueueCh: notifyQueueCh,
queueMu: newMu(conf.l),
unackedMu: newRWMu(conf.l),
unacked: make(map[int64]struct{}),
unassigned: make(map[int64]*sqlcv1.V1QueueItem),
unassignedMu: newMu(conf.l),
repo: queueRepo,
workflowRepo: conf.workflowRepo,
stepRepo: conf.stepRepo,
tenantId: tenantId,
queueName: queueName,
l: conf.l,
s: s,
limit: defaultLimit,
resultsCh: resultsCh,
notifyQueueCh: notifyQueueCh,
queueMu: newMu(conf.l),
unackedMu: newRWMu(conf.l),
unacked: make(map[int64]struct{}),
unassigned: make(map[int64]*sqlcv1.V1QueueItem),
unassignedMu: newMu(conf.l),
workflowNameCache: workflowNameCache,
stepReadableIdCache: stepReadableIdCache,
}

ctx, cancel := context.WithCancel(context.Background())
@@ -260,9 +288,61 @@ func (q *Queuer) loopQueue(ctx context.Context) {
}
}

var workflowIdsMissingFromCache []pgtype.UUID
var stepIdsMissingFromCache []pgtype.UUID

for _, qi := range ar.schedulingTimedOut {
if _, ok := q.workflowNameCache.Get(qi.WorkflowID); !ok {
workflowIdsMissingFromCache = append(workflowIdsMissingFromCache, qi.WorkflowID)
}

if _, ok := q.stepReadableIdCache.Get(qi.StepID); !ok {
stepIdsMissingFromCache = append(stepIdsMissingFromCache, qi.StepID)
}
}

if len(workflowIdsMissingFromCache) > 0 {
workflowNames, err := q.workflowRepo.ListWorkflowNamesByIds(ctx, q.tenantId.String(), workflowIdsMissingFromCache)
if err != nil {
q.l.Error().Err(err).Msg("error getting workflow names for prometheus metric: TenantTaskSchedulingTimedOut")
return
}

for id, name := range workflowNames {
q.workflowNameCache.Add(id, name)
}
}

if len(stepIdsMissingFromCache) > 0 {
stepReadableIds, err := q.stepRepo.ListReadableIds(ctx, stepIdsMissingFromCache)
if err != nil {
q.l.Error().Err(err).Msg("error getting step readable ids for prometheus metric: TenantTaskSchedulingTimedOut")
return
}

for id, readableId := range stepReadableIds {
q.stepReadableIdCache.Add(id, readableId)
}
}

for _, qi := range ar.schedulingTimedOut {
prometheus.SchedulingTimedOut.Inc()
prometheus.TenantSchedulingTimedOut.WithLabelValues(q.tenantId.String()).Inc()

workflowName, wOk := q.workflowNameCache.Get(qi.WorkflowID)
stepReadableId, sOk := q.stepReadableIdCache.Get(qi.StepID)

if !wOk || !sOk {
continue
}

reason := "no workers available"
if _, ok := q.unassignedRateLimited.Load(qi.TaskID); ok {
reason = "rate limited"
}
q.unassignedRateLimited.Delete(qi.TaskID)

prometheus.TenantTaskSchedulingTimedOut.WithLabelValues(q.tenantId.String(), workflowName, stepReadableId, reason).Inc()
}

for range ar.rateLimited {
Expand Down Expand Up @@ -392,14 +472,19 @@ func (q *Queuer) ack(r *assignResults) {
q.unassignedMu.Lock()
defer q.unassignedMu.Unlock()

// WARNING: this ordering is important: the assigned and unassigned loops delete entries from
// unassignedRateLimited and the rate-limited loop re-stores them, so rate-limited items must be
// processed last for the `reason` label on TenantTaskSchedulingTimedOut to be accurate

for _, assignedItem := range r.assigned {
delete(q.unacked, assignedItem.QueueItem.ID)
delete(q.unassigned, assignedItem.QueueItem.ID)
q.unassignedRateLimited.Delete(assignedItem.QueueItem.TaskID)
}

for _, unassignedItem := range r.unassigned {
delete(q.unacked, unassignedItem.ID)
q.unassigned[unassignedItem.ID] = unassignedItem
q.unassignedRateLimited.Delete(unassignedItem.TaskID)
}

for _, schedulingTimedOutItem := range r.schedulingTimedOut {
Expand All @@ -415,6 +500,7 @@ func (q *Queuer) ack(r *assignResults) {
for _, rateLimitedItem := range r.rateLimited {
delete(q.unacked, rateLimitedItem.qi.ID)
q.unassigned[rateLimitedItem.qi.ID] = rateLimitedItem.qi
q.unassignedRateLimited.Store(rateLimitedItem.qi.TaskID, struct{}{})
}
}
