diff --git a/api/v1/server/handlers/v1/tasks/list_logs.go b/api/v1/server/handlers/v1/tasks/list_logs.go index 5c02d26fe7..ff1d81a6b7 100644 --- a/api/v1/server/handlers/v1/tasks/list_logs.go +++ b/api/v1/server/handlers/v1/tasks/list_logs.go @@ -8,6 +8,7 @@ import ( "github.com/hatchet-dev/hatchet/pkg/repository/postgres/sqlchelpers" v1 "github.com/hatchet-dev/hatchet/pkg/repository/v1" "github.com/hatchet-dev/hatchet/pkg/repository/v1/sqlcv1" + "github.com/hatchet-dev/hatchet/pkg/telemetry" transformers "github.com/hatchet-dev/hatchet/api/v1/server/oas/transformers/v1" ) @@ -17,12 +18,25 @@ func (t *TasksService) V1LogLineList(ctx echo.Context, request gen.V1LogLineList tenantId := sqlchelpers.UUIDToStr(tenant.ID) task := ctx.Get("task").(*sqlcv1.V1TasksOlap) - logLines, err := t.config.V1.Logs().ListLogLines(ctx.Request().Context(), tenantId, task.ID, task.InsertedAt, &v1.ListLogsOpts{}) + reqCtx, span := telemetry.NewSpan(ctx.Request().Context(), "GET /api/v1/stable/tasks/{task}/logs") + defer span.End() + + telemetry.WithAttributes(span, + telemetry.AttributeKV{Key: "tenant.id", Value: tenantId}, + telemetry.AttributeKV{Key: "task.id", Value: task.ID}, + ) + + logLines, err := t.config.V1.Logs().ListLogLines(reqCtx, tenantId, task.ID, task.InsertedAt, &v1.ListLogsOpts{}) if err != nil { + span.RecordError(err) return nil, err } + telemetry.WithAttributes(span, + telemetry.AttributeKV{Key: "log_lines.count", Value: len(logLines)}, + ) + rows := make([]gen.V1LogLine, len(logLines)) for i, log := range logLines { diff --git a/api/v1/server/handlers/v1/workflow-runs/list.go b/api/v1/server/handlers/v1/workflow-runs/list.go index 42a0573c93..3ac192e8f4 100644 --- a/api/v1/server/handlers/v1/workflow-runs/list.go +++ b/api/v1/server/handlers/v1/workflow-runs/list.go @@ -9,11 +9,11 @@ import ( "github.com/labstack/echo/v4" "github.com/hatchet-dev/hatchet/api/v1/server/oas/gen" - "github.com/hatchet-dev/hatchet/internal/telemetry" "github.com/hatchet-dev/hatchet/pkg/repository/postgres/dbsqlc" "github.com/hatchet-dev/hatchet/pkg/repository/postgres/sqlchelpers" v1 "github.com/hatchet-dev/hatchet/pkg/repository/v1" "github.com/hatchet-dev/hatchet/pkg/repository/v1/sqlcv1" + "github.com/hatchet-dev/hatchet/pkg/telemetry" transformers "github.com/hatchet-dev/hatchet/api/v1/server/oas/transformers/v1" ) diff --git a/cmd/hatchet-api/api/run.go b/cmd/hatchet-api/api/run.go index d8a45aa0de..4a4e666505 100644 --- a/cmd/hatchet-api/api/run.go +++ b/cmd/hatchet-api/api/run.go @@ -6,8 +6,8 @@ import ( "strings" "github.com/hatchet-dev/hatchet/api/v1/server/run" - "github.com/hatchet-dev/hatchet/internal/telemetry" "github.com/hatchet-dev/hatchet/pkg/config/loader" + "github.com/hatchet-dev/hatchet/pkg/telemetry" ) func init() { diff --git a/cmd/hatchet-engine/engine/run.go b/cmd/hatchet-engine/engine/run.go index 99143fbe65..a9da7d71f2 100644 --- a/cmd/hatchet-engine/engine/run.go +++ b/cmd/hatchet-engine/engine/run.go @@ -28,11 +28,11 @@ import ( schedulerv1 "github.com/hatchet-dev/hatchet/internal/services/scheduler/v1" "github.com/hatchet-dev/hatchet/internal/services/ticker" "github.com/hatchet-dev/hatchet/internal/services/webhooks" - "github.com/hatchet-dev/hatchet/internal/telemetry" "github.com/hatchet-dev/hatchet/pkg/config/loader" "github.com/hatchet-dev/hatchet/pkg/config/server" "github.com/hatchet-dev/hatchet/pkg/config/shared" "github.com/hatchet-dev/hatchet/pkg/repository/cache" + "github.com/hatchet-dev/hatchet/pkg/telemetry" "github.com/rs/zerolog" "golang.org/x/sync/errgroup" diff --git a/internal/msgqueue/postgres/msgqueue.go b/internal/msgqueue/postgres/msgqueue.go index 3901be9df1..c048958628 100644 --- a/internal/msgqueue/postgres/msgqueue.go +++ b/internal/msgqueue/postgres/msgqueue.go @@ -13,10 +13,10 @@ import ( "github.com/hatchet-dev/hatchet/internal/cache" "github.com/hatchet-dev/hatchet/internal/msgqueue" "github.com/hatchet-dev/hatchet/internal/queueutils" - "github.com/hatchet-dev/hatchet/internal/telemetry" "github.com/hatchet-dev/hatchet/pkg/logger" "github.com/hatchet-dev/hatchet/pkg/repository" "github.com/hatchet-dev/hatchet/pkg/repository/postgres/dbsqlc" + "github.com/hatchet-dev/hatchet/pkg/telemetry" ) type PostgresMessageQueue struct { diff --git a/internal/msgqueue/rabbitmq/rabbitmq.go b/internal/msgqueue/rabbitmq/rabbitmq.go index c6ae923996..6f92dfe2cb 100644 --- a/internal/msgqueue/rabbitmq/rabbitmq.go +++ b/internal/msgqueue/rabbitmq/rabbitmq.go @@ -15,9 +15,9 @@ import ( "github.com/rs/zerolog" "github.com/hatchet-dev/hatchet/internal/msgqueue" - "github.com/hatchet-dev/hatchet/internal/telemetry" "github.com/hatchet-dev/hatchet/pkg/logger" "github.com/hatchet-dev/hatchet/pkg/random" + "github.com/hatchet-dev/hatchet/pkg/telemetry" ) const MAX_RETRY_COUNT = 15 diff --git a/internal/msgqueue/v1/postgres/msgqueue.go b/internal/msgqueue/v1/postgres/msgqueue.go index f837ef9b00..330b3c79c7 100644 --- a/internal/msgqueue/v1/postgres/msgqueue.go +++ b/internal/msgqueue/v1/postgres/msgqueue.go @@ -14,10 +14,10 @@ import ( "github.com/hatchet-dev/hatchet/internal/cache" msgqueue "github.com/hatchet-dev/hatchet/internal/msgqueue/v1" "github.com/hatchet-dev/hatchet/internal/queueutils" - "github.com/hatchet-dev/hatchet/internal/telemetry" "github.com/hatchet-dev/hatchet/pkg/logger" "github.com/hatchet-dev/hatchet/pkg/repository" "github.com/hatchet-dev/hatchet/pkg/repository/postgres/dbsqlc" + "github.com/hatchet-dev/hatchet/pkg/telemetry" ) type PostgresMessageQueue struct { @@ -111,7 +111,7 @@ func (p *PostgresMessageQueue) SendMessage(ctx context.Context, queue msgqueue.Q ctx, span := telemetry.NewSpan(ctx, "PostgresMessageQueue.SendMessage") defer span.End() - telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant_id", Value: task.TenantID}) + telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant.id", Value: task.TenantID}) err := p.addMessage(ctx, queue, task) @@ -128,7 +128,7 @@ func (p *PostgresMessageQueue) addMessage(ctx context.Context, queue msgqueue.Qu ctx, span := telemetry.NewSpan(ctx, "add-message") defer span.End() - telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant_id", Value: task.TenantID}) + telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant.id", Value: task.TenantID}) // inject otel carrier into the message if task.OtelCarrier == nil { diff --git a/internal/msgqueue/v1/rabbitmq/rabbitmq.go b/internal/msgqueue/v1/rabbitmq/rabbitmq.go index 38d81a0674..0c7b411b8f 100644 --- a/internal/msgqueue/v1/rabbitmq/rabbitmq.go +++ b/internal/msgqueue/v1/rabbitmq/rabbitmq.go @@ -17,9 +17,9 @@ import ( msgqueue "github.com/hatchet-dev/hatchet/internal/msgqueue/v1" "github.com/hatchet-dev/hatchet/internal/queueutils" - "github.com/hatchet-dev/hatchet/internal/telemetry" "github.com/hatchet-dev/hatchet/pkg/logger" "github.com/hatchet-dev/hatchet/pkg/random" + "github.com/hatchet-dev/hatchet/pkg/telemetry" ) const MAX_RETRY_COUNT = 15 @@ -281,7 +281,7 @@ func (t *MessageQueueImpl) pubMessage(ctx context.Context, q msgqueue.Queue, msg ctx, span := telemetry.NewSpanWithCarrier(ctx, "publish-message", otelCarrier) defer span.End() - telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant_id", Value: msg.TenantID}) + telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant.id", Value: msg.TenantID}) msg.SetOtelCarrier(otelCarrier) diff --git a/internal/services/controllers/events/controller.go b/internal/services/controllers/events/controller.go index a97d7bf6a0..f9734ce79d 100644 --- a/internal/services/controllers/events/controller.go +++ b/internal/services/controllers/events/controller.go @@ -13,11 +13,11 @@ import ( "github.com/hatchet-dev/hatchet/internal/datautils" "github.com/hatchet-dev/hatchet/internal/msgqueue" "github.com/hatchet-dev/hatchet/internal/services/shared/tasktypes" - "github.com/hatchet-dev/hatchet/internal/telemetry" "github.com/hatchet-dev/hatchet/pkg/constants" "github.com/hatchet-dev/hatchet/pkg/logger" "github.com/hatchet-dev/hatchet/pkg/repository" "github.com/hatchet-dev/hatchet/pkg/repository/postgres/sqlchelpers" + "github.com/hatchet-dev/hatchet/pkg/telemetry" ) type EventsController interface { @@ -165,7 +165,7 @@ func (ec *EventsControllerImpl) handleTask(ctx context.Context, task *msgqueue.M err = ec.dv.DecodeAndValidate(task.Metadata, &metadata) if err == nil { - telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant_id", Value: metadata.TenantId}) + telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant.id", Value: metadata.TenantId}) } else { return fmt.Errorf("could not decode task metadata: %w", err) } diff --git a/internal/services/controllers/jobs/controller.go b/internal/services/controllers/jobs/controller.go index 328774d0b1..9452ae690a 100644 --- a/internal/services/controllers/jobs/controller.go +++ b/internal/services/controllers/jobs/controller.go @@ -22,14 +22,14 @@ import ( "github.com/hatchet-dev/hatchet/internal/services/partition" "github.com/hatchet-dev/hatchet/internal/services/shared/recoveryutils" "github.com/hatchet-dev/hatchet/internal/services/shared/tasktypes" - "github.com/hatchet-dev/hatchet/internal/telemetry" - "github.com/hatchet-dev/hatchet/internal/telemetry/servertel" "github.com/hatchet-dev/hatchet/pkg/config/shared" hatcheterrors "github.com/hatchet-dev/hatchet/pkg/errors" "github.com/hatchet-dev/hatchet/pkg/logger" "github.com/hatchet-dev/hatchet/pkg/repository" "github.com/hatchet-dev/hatchet/pkg/repository/postgres/dbsqlc" "github.com/hatchet-dev/hatchet/pkg/repository/postgres/sqlchelpers" + "github.com/hatchet-dev/hatchet/pkg/telemetry" + "github.com/hatchet-dev/hatchet/pkg/telemetry/servertel" ) type JobsController interface { @@ -319,7 +319,7 @@ func (ec *JobsControllerImpl) handleJobRunQueued(ctx context.Context, task *msgq err = ec.dv.DecodeAndValidate(task.Metadata, &metadata) if err == nil { - telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant_id", Value: metadata.TenantId}) + telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant.id", Value: metadata.TenantId}) } else { return fmt.Errorf("could not decode job task metadata: %w", err) } @@ -377,7 +377,7 @@ func (ec *JobsControllerImpl) handleJobRunCancelled(ctx context.Context, task *m err = ec.dv.DecodeAndValidate(task.Metadata, &metadata) if err == nil { - telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant_id", Value: metadata.TenantId}) + telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant.id", Value: metadata.TenantId}) } else { return fmt.Errorf("could not decode job task metadata: %w", err) } @@ -434,7 +434,7 @@ func (ec *JobsControllerImpl) handleStepRunRetry(ctx context.Context, task *msgq err = ec.dv.DecodeAndValidate(task.Metadata, &metadata) if err == nil { - telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant_id", Value: metadata.TenantId}) + telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant.id", Value: metadata.TenantId}) } else { return fmt.Errorf("could not decode job task metadata: %w", err) } @@ -504,7 +504,7 @@ func (ec *JobsControllerImpl) handleStepRunReplay(ctx context.Context, task *msg err = ec.dv.DecodeAndValidate(task.Metadata, &metadata) if err == nil { - telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant_id", Value: metadata.TenantId}) + telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant.id", Value: metadata.TenantId}) } else { return fmt.Errorf("could not decode job task metadata: %w", err) } @@ -602,7 +602,7 @@ func (ec *JobsControllerImpl) handleStepRunQueued(ctx context.Context, task *msg err = ec.dv.DecodeAndValidate(task.Metadata, &metadata) if err == nil { - telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant_id", Value: metadata.TenantId}) + telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant.id", Value: metadata.TenantId}) } else { return fmt.Errorf("could not decode job task metadata: %w", err) } @@ -699,7 +699,7 @@ func (ec *JobsControllerImpl) runStepRunReassignTenant(ctx context.Context, tena ctx, span := telemetry.NewSpan(ctx, "handle-step-run-reassign") defer span.End() - telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant_id", Value: tenantId}) + telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant.id", Value: tenantId}) _, stepRunsToFail, err := ec.repo.StepRun().ListStepRunsToReassign(ctx, tenantId) @@ -732,7 +732,7 @@ func (ec *JobsControllerImpl) queueStepRun(ctx context.Context, tenantId, stepId ctx, span := telemetry.NewSpan(ctx, "queue-step-run") defer span.End() - telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant_id", Value: tenantId}) + telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant.id", Value: tenantId}) // add the rendered data to the step run stepRun, err := ec.repo.StepRun().GetStepRunForEngine(ctx, tenantId, stepRunId) @@ -944,7 +944,7 @@ func (ec *JobsControllerImpl) handleStepRunStarted(ctx context.Context, task *ms err = ec.dv.DecodeAndValidate(task.Metadata, &metadata) if err == nil { - telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant_id", Value: metadata.TenantId}) + telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant.id", Value: metadata.TenantId}) } else { return fmt.Errorf("could not decode step run started task metadata: %w", err) } @@ -981,7 +981,7 @@ func (ec *JobsControllerImpl) handleStepRunAcked(ctx context.Context, task *msgq err = ec.dv.DecodeAndValidate(task.Metadata, &metadata) if err == nil { - telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant_id", Value: metadata.TenantId}) + telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant.id", Value: metadata.TenantId}) } else { return fmt.Errorf("could not decode step run started task metadata: %w", err) } @@ -1018,7 +1018,7 @@ func (ec *JobsControllerImpl) handleStepRunFinished(ctx context.Context, task *m err = ec.dv.DecodeAndValidate(task.Metadata, &metadata) if err == nil { - telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant_id", Value: metadata.TenantId}) + telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant.id", Value: metadata.TenantId}) } else { return fmt.Errorf("could not decode step run finished task metadata: %w", err) } @@ -1088,7 +1088,7 @@ func (ec *JobsControllerImpl) handleStepRunFailed(ctx context.Context, task *msg err = ec.dv.DecodeAndValidate(task.Metadata, &metadata) if err == nil { - telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant_id", Value: metadata.TenantId}) + telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant.id", Value: metadata.TenantId}) } else { return fmt.Errorf("could not decode step run failed task metadata: %w", err) } @@ -1218,7 +1218,7 @@ func (ec *JobsControllerImpl) handleStepRunTimedOut(ctx context.Context, task *m err = ec.dv.DecodeAndValidate(task.Metadata, &metadata) if err == nil { - telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant_id", Value: metadata.TenantId}) + telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant.id", Value: metadata.TenantId}) } else { return fmt.Errorf("could not decode step run timed out task metadata: %w", err) } @@ -1242,7 +1242,7 @@ func (ec *JobsControllerImpl) handleStepRunCancel(ctx context.Context, task *msg err = ec.dv.DecodeAndValidate(task.Metadata, &metadata) if err == nil { - telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant_id", Value: metadata.TenantId}) + telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant.id", Value: metadata.TenantId}) } else { return fmt.Errorf("could not decode step run notify cancel task metadata: %w", err) } @@ -1254,7 +1254,7 @@ func (ec *JobsControllerImpl) cancelStepRun(ctx context.Context, tenantId, stepR ctx, span := telemetry.NewSpan(ctx, "cancel-step-run") defer span.End() - telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant_id", Value: tenantId}) + telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant.id", Value: tenantId}) // cancel current step run now := time.Now().UTC() diff --git a/internal/services/controllers/jobs/queue.go b/internal/services/controllers/jobs/queue.go index ccb04dfc28..afa17ac13c 100644 --- a/internal/services/controllers/jobs/queue.go +++ b/internal/services/controllers/jobs/queue.go @@ -15,11 +15,11 @@ import ( "github.com/hatchet-dev/hatchet/internal/services/partition" "github.com/hatchet-dev/hatchet/internal/services/shared/recoveryutils" "github.com/hatchet-dev/hatchet/internal/services/shared/tasktypes" - "github.com/hatchet-dev/hatchet/internal/telemetry" hatcheterrors "github.com/hatchet-dev/hatchet/pkg/errors" "github.com/hatchet-dev/hatchet/pkg/repository" "github.com/hatchet-dev/hatchet/pkg/repository/postgres/dbsqlc" "github.com/hatchet-dev/hatchet/pkg/repository/postgres/sqlchelpers" + "github.com/hatchet-dev/hatchet/pkg/telemetry" ) type queue struct { @@ -193,7 +193,7 @@ func (q *queue) handleCheckQueue(ctx context.Context, task *msgqueue.Message) er err := q.dv.DecodeAndValidate(task.Metadata, &metadata) if err == nil { - telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant_id", Value: metadata.TenantId}) + telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant.id", Value: metadata.TenantId}) } else { return fmt.Errorf("could not decode check queue metadata: %w", err) } @@ -235,7 +235,7 @@ func (q *queue) processStepRunUpdatesV2(ctx context.Context, tenantId string) (b ctx, span := telemetry.NewSpan(ctx, "process-step-run-updates-v2") defer span.End() - telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant_id", Value: tenantId}) + telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant.id", Value: tenantId}) dbCtx, cancel := context.WithTimeout(ctx, 300*time.Second) defer cancel() @@ -355,7 +355,7 @@ func (q *queue) processStepRunTimeouts(ctx context.Context, tenantId string) (bo ctx, span := telemetry.NewSpan(ctx, "handle-step-run-timeout") defer span.End() - telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant_id", Value: tenantId}) + telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant.id", Value: tenantId}) shouldContinue, stepRuns, err := q.repo.StepRun().ListStepRunsToTimeout(ctx, tenantId) @@ -376,7 +376,7 @@ func (q *queue) processStepRunTimeouts(ctx context.Context, tenantId string) (bo scheduleCtx, span := telemetry.NewSpan(scheduleCtx, "handle-step-run-timeout-step-run") defer span.End() - telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant_id", Value: tenantId}) + telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant.id", Value: tenantId}) for i := range group { stepRunCp := group[i] @@ -408,7 +408,7 @@ func (q *queue) processStepRunRetries(ctx context.Context, tenantId string) (boo ctx, span := telemetry.NewSpan(ctx, "handle-step-run-timeout") defer span.End() - telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant_id", Value: tenantId}) + telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant.id", Value: tenantId}) shouldContinue, err := q.repo.StepRun().RetryStepRuns(ctx, tenantId) diff --git a/internal/services/controllers/retention/events.go b/internal/services/controllers/retention/events.go index 6b80468c0f..7a3811b855 100644 --- a/internal/services/controllers/retention/events.go +++ b/internal/services/controllers/retention/events.go @@ -5,9 +5,9 @@ import ( "fmt" "time" - "github.com/hatchet-dev/hatchet/internal/telemetry" "github.com/hatchet-dev/hatchet/pkg/repository/postgres/dbsqlc" "github.com/hatchet-dev/hatchet/pkg/repository/postgres/sqlchelpers" + "github.com/hatchet-dev/hatchet/pkg/telemetry" ) func (rc *RetentionControllerImpl) runDeleteExpiredEvents(ctx context.Context) func() { @@ -45,7 +45,7 @@ func (wc *RetentionControllerImpl) runDeleteExpiredEventsTenant(ctx context.Cont tenantId := sqlchelpers.UUIDToStr(tenant.ID) - telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant_id", Value: tenantId}) + telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant.id", Value: tenantId}) createdBefore, err := GetDataRetentionExpiredTime(tenant.DataRetentionPeriod) @@ -80,7 +80,7 @@ func (wc *RetentionControllerImpl) runClearDeletedEventsPayloadTenant(ctx contex tenantId := sqlchelpers.UUIDToStr(tenant.ID) - telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant_id", Value: tenantId}) + telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant.id", Value: tenantId}) // keep deleting until the context is done for { diff --git a/internal/services/controllers/retention/jobruns.go b/internal/services/controllers/retention/jobruns.go index 7a46437329..59bd767fc7 100644 --- a/internal/services/controllers/retention/jobruns.go +++ b/internal/services/controllers/retention/jobruns.go @@ -5,9 +5,9 @@ import ( "fmt" "time" - "github.com/hatchet-dev/hatchet/internal/telemetry" "github.com/hatchet-dev/hatchet/pkg/repository/postgres/dbsqlc" "github.com/hatchet-dev/hatchet/pkg/repository/postgres/sqlchelpers" + "github.com/hatchet-dev/hatchet/pkg/telemetry" ) func (wc *RetentionControllerImpl) runDeleteExpiredJobRuns(ctx context.Context) func() { @@ -31,7 +31,7 @@ func (wc *RetentionControllerImpl) runDeleteExpireJobRunsTenant(ctx context.Cont tenantId := sqlchelpers.UUIDToStr(tenant.ID) - telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant_id", Value: tenantId}) + telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant.id", Value: tenantId}) // keep deleting until the context is done for { diff --git a/internal/services/controllers/retention/mq.go b/internal/services/controllers/retention/mq.go index 5c8835175c..a41c534c62 100644 --- a/internal/services/controllers/retention/mq.go +++ b/internal/services/controllers/retention/mq.go @@ -4,7 +4,7 @@ import ( "context" "time" - "github.com/hatchet-dev/hatchet/internal/telemetry" + "github.com/hatchet-dev/hatchet/pkg/telemetry" ) func (rc *RetentionControllerImpl) runDeleteMessageQueueItems(ctx context.Context) func() { diff --git a/internal/services/controllers/retention/queue.go b/internal/services/controllers/retention/queue.go index 1d5505dd9a..fe7bfa6b9b 100644 --- a/internal/services/controllers/retention/queue.go +++ b/internal/services/controllers/retention/queue.go @@ -4,9 +4,9 @@ import ( "context" "time" - "github.com/hatchet-dev/hatchet/internal/telemetry" "github.com/hatchet-dev/hatchet/pkg/repository/postgres/dbsqlc" "github.com/hatchet-dev/hatchet/pkg/repository/postgres/sqlchelpers" + "github.com/hatchet-dev/hatchet/pkg/telemetry" ) func (rc *RetentionControllerImpl) runDeleteQueueItems(ctx context.Context) func() { @@ -30,7 +30,7 @@ func (rc *RetentionControllerImpl) runDeleteQueueItemsTenant(ctx context.Context tenantId := sqlchelpers.UUIDToStr(tenant.ID) - telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant_id", Value: tenantId}) + telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant.id", Value: tenantId}) return rc.repo.StepRun().CleanupQueueItems(ctx, tenantId) } @@ -56,7 +56,7 @@ func (rc *RetentionControllerImpl) runDeleteInternalQueueItemsTenant(ctx context tenantId := sqlchelpers.UUIDToStr(tenant.ID) - telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant_id", Value: tenantId}) + telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant.id", Value: tenantId}) return rc.repo.StepRun().CleanupInternalQueueItems(ctx, tenantId) } @@ -82,7 +82,7 @@ func (rc *RetentionControllerImpl) runDeleteRetryQueueItemsTenant(ctx context.Co tenantId := sqlchelpers.UUIDToStr(tenant.ID) - telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant_id", Value: tenantId}) + telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant.id", Value: tenantId}) return rc.repo.StepRun().CleanupRetryQueueItems(ctx, tenantId) } diff --git a/internal/services/controllers/retention/stepruns.go b/internal/services/controllers/retention/stepruns.go index 2b1d8ad147..a0c7f3da39 100644 --- a/internal/services/controllers/retention/stepruns.go +++ b/internal/services/controllers/retention/stepruns.go @@ -5,9 +5,9 @@ import ( "fmt" "time" - "github.com/hatchet-dev/hatchet/internal/telemetry" "github.com/hatchet-dev/hatchet/pkg/repository/postgres/dbsqlc" "github.com/hatchet-dev/hatchet/pkg/repository/postgres/sqlchelpers" + "github.com/hatchet-dev/hatchet/pkg/telemetry" ) func (wc *RetentionControllerImpl) runDeleteExpiredStepRuns(ctx context.Context) func() { @@ -31,7 +31,7 @@ func (wc *RetentionControllerImpl) runDeleteExpireStepRunsTenant(ctx context.Con tenantId := sqlchelpers.UUIDToStr(tenant.ID) - telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant_id", Value: tenantId}) + telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant.id", Value: tenantId}) // keep deleting until the context is done for { diff --git a/internal/services/controllers/retention/workers.go b/internal/services/controllers/retention/workers.go index 8c600cf711..d81765fdf4 100644 --- a/internal/services/controllers/retention/workers.go +++ b/internal/services/controllers/retention/workers.go @@ -5,9 +5,9 @@ import ( "fmt" "time" - "github.com/hatchet-dev/hatchet/internal/telemetry" "github.com/hatchet-dev/hatchet/pkg/repository/postgres/dbsqlc" "github.com/hatchet-dev/hatchet/pkg/repository/postgres/sqlchelpers" + "github.com/hatchet-dev/hatchet/pkg/telemetry" ) func (rc *RetentionControllerImpl) runDeleteOldWorkers(ctx context.Context) func() { @@ -48,7 +48,7 @@ func (wc *RetentionControllerImpl) runDeleteOldWorkersTenant(ctx context.Context tenantId := sqlchelpers.UUIDToStr(tenant.ID) - telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant_id", Value: tenantId}) + telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant.id", Value: tenantId}) // hard-coded to last heartbeat before 24 hours lastHeartbeatBefore := time.Now().UTC().Add(-24 * time.Hour) @@ -80,7 +80,7 @@ func (wc *RetentionControllerImpl) runDeleteOldWorkerAssignEventsTenant(ctx cont tenantId := sqlchelpers.UUIDToStr(tenant.ID) - telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant_id", Value: tenantId}) + telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant.id", Value: tenantId}) // hard-coded to last heartbeat after 24 hours lastHeartbeatAfter := time.Now().UTC().Add(-24 * time.Hour) diff --git a/internal/services/controllers/retention/workflowruns.go b/internal/services/controllers/retention/workflowruns.go index 06a189cc03..394d4b6988 100644 --- a/internal/services/controllers/retention/workflowruns.go +++ b/internal/services/controllers/retention/workflowruns.go @@ -5,9 +5,9 @@ import ( "fmt" "time" - "github.com/hatchet-dev/hatchet/internal/telemetry" "github.com/hatchet-dev/hatchet/pkg/repository/postgres/dbsqlc" "github.com/hatchet-dev/hatchet/pkg/repository/postgres/sqlchelpers" + "github.com/hatchet-dev/hatchet/pkg/telemetry" ) func (wc *RetentionControllerImpl) runDeleteExpiredWorkflowRuns(ctx context.Context) func() { @@ -31,7 +31,7 @@ func (wc *RetentionControllerImpl) runDeleteExpiredWorkflowRunsTenant(ctx contex tenantId := sqlchelpers.UUIDToStr(tenant.ID) - telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant_id", Value: tenantId}) + telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant.id", Value: tenantId}) createdBefore, err := GetDataRetentionExpiredTime(tenant.DataRetentionPeriod) diff --git a/internal/services/controllers/v1/olap/create_partitions.go b/internal/services/controllers/v1/olap/create_partitions.go index 38221b4e1b..d08a059275 100644 --- a/internal/services/controllers/v1/olap/create_partitions.go +++ b/internal/services/controllers/v1/olap/create_partitions.go @@ -4,7 +4,7 @@ import ( "context" "fmt" - "github.com/hatchet-dev/hatchet/internal/telemetry" + "github.com/hatchet-dev/hatchet/pkg/telemetry" ) func (oc *OLAPControllerImpl) runOLAPTablePartition(ctx context.Context) func() { diff --git a/internal/services/controllers/v1/olap/process_alerts.go b/internal/services/controllers/v1/olap/process_alerts.go index 1d9a24990c..6463b3e710 100644 --- a/internal/services/controllers/v1/olap/process_alerts.go +++ b/internal/services/controllers/v1/olap/process_alerts.go @@ -5,11 +5,11 @@ import ( "fmt" "time" - "github.com/hatchet-dev/hatchet/internal/telemetry" "github.com/hatchet-dev/hatchet/pkg/repository/postgres/dbsqlc" "github.com/hatchet-dev/hatchet/pkg/repository/postgres/sqlchelpers" v1 "github.com/hatchet-dev/hatchet/pkg/repository/v1" "github.com/hatchet-dev/hatchet/pkg/repository/v1/sqlcv1" + "github.com/hatchet-dev/hatchet/pkg/telemetry" ) func (o *OLAPControllerImpl) runTenantProcessAlerts(ctx context.Context) func() { @@ -41,7 +41,7 @@ func (o *OLAPControllerImpl) processTenantAlerts(ctx context.Context, tenantId s ctx, span := telemetry.NewSpan(ctx, "process-tenant-alerts") defer span.End() - telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant_id", Value: tenantId}) + telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant.id", Value: tenantId}) isActive, lastAlerted, err := o.repo.Ticker().IsTenantAlertActive(ctx, tenantId) diff --git a/internal/services/controllers/v1/task/controller.go b/internal/services/controllers/v1/task/controller.go index f3550e97ef..264c27d47b 100644 --- a/internal/services/controllers/v1/task/controller.go +++ b/internal/services/controllers/v1/task/controller.go @@ -22,7 +22,6 @@ import ( "github.com/hatchet-dev/hatchet/internal/services/partition" "github.com/hatchet-dev/hatchet/internal/services/shared/recoveryutils" tasktypes "github.com/hatchet-dev/hatchet/internal/services/shared/tasktypes/v1" - "github.com/hatchet-dev/hatchet/internal/telemetry" "github.com/hatchet-dev/hatchet/pkg/config/server" "github.com/hatchet-dev/hatchet/pkg/config/shared" hatcheterrors "github.com/hatchet-dev/hatchet/pkg/errors" @@ -33,6 +32,7 @@ import ( "github.com/hatchet-dev/hatchet/pkg/repository/postgres/sqlchelpers" v1 "github.com/hatchet-dev/hatchet/pkg/repository/v1" "github.com/hatchet-dev/hatchet/pkg/repository/v1/sqlcv1" + "github.com/hatchet-dev/hatchet/pkg/telemetry" ) const BULK_MSG_BATCH_SIZE = 50 @@ -475,7 +475,7 @@ func (tc *TasksControllerImpl) handleTaskCompleted(ctx context.Context, tenantId ctx, span := telemetry.NewSpan(ctx, "TasksControllerImpl.handleTaskCompleted") defer span.End() - telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant_id", Value: tenantId}) + telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant.id", Value: tenantId}) opts := make([]v1.CompleteTaskOpts, 0) idsToData := make(map[int64][]byte) @@ -519,7 +519,7 @@ func (tc *TasksControllerImpl) handleTaskFailed(ctx context.Context, tenantId st ctx, span := telemetry.NewSpan(ctx, "TasksControllerImpl.handleTaskFailed") defer span.End() - telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant_id", Value: tenantId}) + telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant.id", Value: tenantId}) opts := make([]v1.FailTaskOpts, 0) @@ -598,7 +598,7 @@ func (tc *TasksControllerImpl) processFailTasksResponse(ctx context.Context, ten ctx, span := telemetry.NewSpan(ctx, "TasksControllerImpl.processFailTasksResponse") defer span.End() - telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant_id", Value: tenantId}) + telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant.id", Value: tenantId}) retriedTaskIds := make(map[int64]struct{}) @@ -656,7 +656,7 @@ func (tc *TasksControllerImpl) handleTaskCancelled(ctx context.Context, tenantId ctx, span := telemetry.NewSpan(ctx, "TasksControllerImpl.handleTaskCancelled") defer span.End() - telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant_id", Value: tenantId}) + telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant.id", Value: tenantId}) opts := make([]v1.TaskIdInsertedAtRetryCount, 0) @@ -1019,7 +1019,7 @@ func (tc *TasksControllerImpl) handleProcessUserEvents(ctx context.Context, tena ctx, span := telemetry.NewSpan(ctx, "TasksControllerImpl.handleProcessUserEvents") defer span.End() - telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant_id", Value: tenantId}) + telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant.id", Value: tenantId}) msgs := msgqueue.JSONConvert[tasktypes.UserEventTaskPayload](payloads) @@ -1155,7 +1155,7 @@ func (tc *TasksControllerImpl) handleProcessInternalEvents(ctx context.Context, ctx, span := telemetry.NewSpan(ctx, "TasksControllerImpl.handleProcessInternalEvents") defer span.End() - telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant_id", Value: tenantId}) + telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant.id", Value: tenantId}) msgs := msgqueue.JSONConvert[v1.InternalTaskEvent](payloads) @@ -1194,7 +1194,7 @@ func (tc *TasksControllerImpl) sendInternalEvents(ctx context.Context, tenantId ctx, span := telemetry.NewSpan(ctx, "TasksControllerImpl.sendInternalEvents") defer span.End() - telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant_id", Value: tenantId}) + telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant.id", Value: tenantId}) if len(events) == 0 { return nil diff --git a/internal/services/controllers/v1/task/create_partitions.go b/internal/services/controllers/v1/task/create_partitions.go index b7117acb5d..56d26cf2dc 100644 --- a/internal/services/controllers/v1/task/create_partitions.go +++ b/internal/services/controllers/v1/task/create_partitions.go @@ -6,7 +6,7 @@ import ( "go.opentelemetry.io/otel/codes" - "github.com/hatchet-dev/hatchet/internal/telemetry" + "github.com/hatchet-dev/hatchet/pkg/telemetry" ) func (tc *TasksControllerImpl) runTaskTablePartition(ctx context.Context) func() { diff --git a/internal/services/controllers/v1/task/evict_idempotency_keys.go b/internal/services/controllers/v1/task/evict_idempotency_keys.go index 4386a5a209..654d208907 100644 --- a/internal/services/controllers/v1/task/evict_idempotency_keys.go +++ b/internal/services/controllers/v1/task/evict_idempotency_keys.go @@ -4,15 +4,15 @@ import ( "context" "fmt" - "github.com/hatchet-dev/hatchet/internal/telemetry" "github.com/hatchet-dev/hatchet/pkg/repository/postgres/sqlchelpers" + "github.com/hatchet-dev/hatchet/pkg/telemetry" ) func (tc *TasksControllerImpl) evictExpiredIdempotencyKeys(ctx context.Context, tenantId string) (bool, error) { ctx, span := telemetry.NewSpan(ctx, "evict-expired-idempotency-keys") defer span.End() - telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant_id", Value: tenantId}) + telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant.id", Value: tenantId}) err := tc.repov1.Idempotency().EvictExpiredIdempotencyKeys(ctx, sqlchelpers.UUIDFromStr(tenantId)) diff --git a/internal/services/controllers/v1/task/process_reassignments.go b/internal/services/controllers/v1/task/process_reassignments.go index 4dc8e0ff0e..10843e0850 100644 --- a/internal/services/controllers/v1/task/process_reassignments.go +++ b/internal/services/controllers/v1/task/process_reassignments.go @@ -7,17 +7,17 @@ import ( msgqueue "github.com/hatchet-dev/hatchet/internal/msgqueue/v1" tasktypes "github.com/hatchet-dev/hatchet/internal/services/shared/tasktypes/v1" - "github.com/hatchet-dev/hatchet/internal/telemetry" "github.com/hatchet-dev/hatchet/pkg/integrations/metrics/prometheus" "github.com/hatchet-dev/hatchet/pkg/repository/postgres/sqlchelpers" "github.com/hatchet-dev/hatchet/pkg/repository/v1/sqlcv1" + "github.com/hatchet-dev/hatchet/pkg/telemetry" ) func (tc *TasksControllerImpl) processTaskReassignments(ctx context.Context, tenantId string) (bool, error) { ctx, span := telemetry.NewSpan(ctx, "process-task-reassignments") defer span.End() - telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant_id", Value: tenantId}) + telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant.id", Value: tenantId}) res, shouldContinue, err := tc.repov1.Tasks().ProcessTaskReassignments(ctx, tenantId) diff --git a/internal/services/controllers/v1/task/process_retry_queue_items.go b/internal/services/controllers/v1/task/process_retry_queue_items.go index feff03ef11..6e3589abaf 100644 --- a/internal/services/controllers/v1/task/process_retry_queue_items.go +++ b/internal/services/controllers/v1/task/process_retry_queue_items.go @@ -9,15 +9,15 @@ import ( msgqueue "github.com/hatchet-dev/hatchet/internal/msgqueue/v1" tasktypes "github.com/hatchet-dev/hatchet/internal/services/shared/tasktypes/v1" - "github.com/hatchet-dev/hatchet/internal/telemetry" "github.com/hatchet-dev/hatchet/pkg/repository/v1/sqlcv1" + "github.com/hatchet-dev/hatchet/pkg/telemetry" ) func (tc *TasksControllerImpl) processTaskRetryQueueItems(ctx context.Context, tenantId string) (bool, error) { ctx, span := telemetry.NewSpan(ctx, "process-retry-queue-items") defer span.End() - telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant_id", Value: tenantId}) + telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant.id", Value: tenantId}) retryQueueItems, shouldContinue, err := tc.repov1.Tasks().ProcessTaskRetryQueueItems(ctx, tenantId) diff --git a/internal/services/controllers/v1/task/process_sleeps.go b/internal/services/controllers/v1/task/process_sleeps.go index 562ba65392..f29926a0f0 100644 --- a/internal/services/controllers/v1/task/process_sleeps.go +++ b/internal/services/controllers/v1/task/process_sleeps.go @@ -4,14 +4,14 @@ import ( "context" "fmt" - "github.com/hatchet-dev/hatchet/internal/telemetry" + "github.com/hatchet-dev/hatchet/pkg/telemetry" ) func (tc *TasksControllerImpl) processSleeps(ctx context.Context, tenantId string) (bool, error) { ctx, span := telemetry.NewSpan(ctx, "process-sleep") defer span.End() - telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant_id", Value: tenantId}) + telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant.id", Value: tenantId}) matchResult, shouldContinue, err := tc.repov1.Tasks().ProcessDurableSleeps(ctx, tenantId) diff --git a/internal/services/controllers/v1/task/process_timeouts.go b/internal/services/controllers/v1/task/process_timeouts.go index bf7ee945d4..fb1bc1ab80 100644 --- a/internal/services/controllers/v1/task/process_timeouts.go +++ b/internal/services/controllers/v1/task/process_timeouts.go @@ -7,16 +7,16 @@ import ( msgqueue "github.com/hatchet-dev/hatchet/internal/msgqueue/v1" tasktypes "github.com/hatchet-dev/hatchet/internal/services/shared/tasktypes/v1" - "github.com/hatchet-dev/hatchet/internal/telemetry" "github.com/hatchet-dev/hatchet/pkg/repository/postgres/sqlchelpers" "github.com/hatchet-dev/hatchet/pkg/repository/v1/sqlcv1" + "github.com/hatchet-dev/hatchet/pkg/telemetry" ) func (tc *TasksControllerImpl) processTaskTimeouts(ctx context.Context, tenantId string) (bool, error) { ctx, span := telemetry.NewSpan(ctx, "process-task-timeout") defer span.End() - telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant_id", Value: tenantId}) + telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant.id", Value: tenantId}) res, shouldContinue, err := tc.repov1.Tasks().ProcessTaskTimeouts(ctx, tenantId) diff --git a/internal/services/controllers/workflows/controller.go b/internal/services/controllers/workflows/controller.go index 54e6882e25..6c1656eda7 100644 --- a/internal/services/controllers/workflows/controller.go +++ b/internal/services/controllers/workflows/controller.go @@ -19,13 +19,13 @@ import ( "github.com/hatchet-dev/hatchet/internal/services/partition" "github.com/hatchet-dev/hatchet/internal/services/shared/recoveryutils" "github.com/hatchet-dev/hatchet/internal/services/shared/tasktypes" - "github.com/hatchet-dev/hatchet/internal/telemetry" hatcheterrors "github.com/hatchet-dev/hatchet/pkg/errors" "github.com/hatchet-dev/hatchet/pkg/logger" "github.com/hatchet-dev/hatchet/pkg/repository" "github.com/hatchet-dev/hatchet/pkg/repository/cache" "github.com/hatchet-dev/hatchet/pkg/repository/postgres/dbsqlc" "github.com/hatchet-dev/hatchet/pkg/repository/postgres/sqlchelpers" + "github.com/hatchet-dev/hatchet/pkg/telemetry" ) type WorkflowsController interface { @@ -368,7 +368,7 @@ func (wc *WorkflowsControllerImpl) handleCheckQueue(ctx context.Context, task *m err := wc.dv.DecodeAndValidate(task.Metadata, &metadata) if err == nil { - telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant_id", Value: metadata.TenantId}) + telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant.id", Value: metadata.TenantId}) } else { return fmt.Errorf("could not decode check queue metadata: %w", err) } @@ -417,7 +417,7 @@ func (wc *WorkflowsControllerImpl) handleReplayWorkflowRun(ctx context.Context, err = wc.dv.DecodeAndValidate(task.Metadata, &metadata) if err == nil { - telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant_id", Value: metadata.TenantId}) + telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant.id", Value: metadata.TenantId}) } else { return fmt.Errorf("could not decode replay workflow run task metadata: %w", err) } @@ -452,7 +452,7 @@ func (ec *WorkflowsControllerImpl) handleGroupKeyRunStarted(ctx context.Context, err = ec.dv.DecodeAndValidate(task.Metadata, &metadata) if err == nil { - telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant_id", Value: metadata.TenantId}) + telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant.id", Value: metadata.TenantId}) } else { return fmt.Errorf("could not decode group key run started task metadata: %w", err) } @@ -488,7 +488,7 @@ func (wc *WorkflowsControllerImpl) handleGroupKeyRunFinished(ctx context.Context err = wc.dv.DecodeAndValidate(task.Metadata, &metadata) if err == nil { - telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant_id", Value: metadata.TenantId}) + telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant.id", Value: metadata.TenantId}) } else { return fmt.Errorf("could not decode group key run finished task metadata: %w", err) } @@ -616,7 +616,7 @@ func (wc *WorkflowsControllerImpl) handleGroupKeyRunFailed(ctx context.Context, err = wc.dv.DecodeAndValidate(task.Metadata, &metadata) if err == nil { - telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant_id", Value: metadata.TenantId}) + telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant.id", Value: metadata.TenantId}) } else { return fmt.Errorf("could not decode group key run failed task metadata: %w", err) } @@ -656,7 +656,7 @@ func (wc *WorkflowsControllerImpl) handleGetGroupKeyRunTimedOut(ctx context.Cont err = wc.dv.DecodeAndValidate(task.Metadata, &metadata) if err == nil { - telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant_id", Value: metadata.TenantId}) + telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant.id", Value: metadata.TenantId}) } else { return fmt.Errorf("could not decode get group key run run timed out task metadata: %w", err) } @@ -668,7 +668,7 @@ func (wc *WorkflowsControllerImpl) cancelGetGroupKeyRun(ctx context.Context, ten ctx, span := telemetry.NewSpan(ctx, "cancel-get-group-key-run") // nolint: ineffassign defer span.End() - telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant_id", Value: tenantId}) + telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant.id", Value: tenantId}) // cancel current step run now := time.Now().UTC() @@ -766,7 +766,7 @@ func (wc *WorkflowsControllerImpl) processWorkflowEvents(ctx context.Context, te ctx, span := telemetry.NewSpan(ctx, "process-workflow-events") defer span.End() - telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant_id", Value: tenantId}) + telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant.id", Value: tenantId}) dbCtx, cancel := context.WithTimeout(ctx, 300*time.Second) defer cancel() @@ -784,7 +784,7 @@ func (wc *WorkflowsControllerImpl) unpauseWorkflowRuns(ctx context.Context, tena ctx, span := telemetry.NewSpan(ctx, "unpause-workflow-runs") defer span.End() - telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant_id", Value: tenantId}) + telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant.id", Value: tenantId}) dbCtx, cancel := context.WithTimeout(ctx, 300*time.Second) defer cancel() diff --git a/internal/services/controllers/workflows/queue.go b/internal/services/controllers/workflows/queue.go index 16fd8db1be..f05a05ffc9 100644 --- a/internal/services/controllers/workflows/queue.go +++ b/internal/services/controllers/workflows/queue.go @@ -13,11 +13,11 @@ import ( "github.com/hatchet-dev/hatchet/internal/datautils" "github.com/hatchet-dev/hatchet/internal/msgqueue" "github.com/hatchet-dev/hatchet/internal/services/shared/tasktypes" - "github.com/hatchet-dev/hatchet/internal/telemetry" - "github.com/hatchet-dev/hatchet/internal/telemetry/servertel" "github.com/hatchet-dev/hatchet/pkg/repository" "github.com/hatchet-dev/hatchet/pkg/repository/postgres/dbsqlc" "github.com/hatchet-dev/hatchet/pkg/repository/postgres/sqlchelpers" + "github.com/hatchet-dev/hatchet/pkg/telemetry" + "github.com/hatchet-dev/hatchet/pkg/telemetry/servertel" ) func (wc *WorkflowsControllerImpl) handleWorkflowRunQueued(ctx context.Context, task *msgqueue.Message) error { @@ -40,7 +40,7 @@ func (wc *WorkflowsControllerImpl) handleWorkflowRunQueued(ctx context.Context, err = wc.dv.DecodeAndValidate(task.Metadata, &metadata) if err == nil { - telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant_id", Value: metadata.TenantId}) + telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant.id", Value: metadata.TenantId}) } else { return fmt.Errorf("could not decode job task metadata: %w", err) } @@ -289,7 +289,7 @@ func (wc *WorkflowsControllerImpl) handleWorkflowRunFinished(ctx context.Context err = wc.dv.DecodeAndValidate(task.Metadata, &metadata) if err == nil { - telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant_id", Value: metadata.TenantId}) + telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant.id", Value: metadata.TenantId}) } else { return fmt.Errorf("could not decode job task metadata: %w", err) } @@ -392,7 +392,7 @@ func (wc *WorkflowsControllerImpl) scheduleGetGroupAction( tenantId := sqlchelpers.UUIDToStr(getGroupKeyRun.GetGroupKeyRun.TenantId) - telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant_id", Value: tenantId}) + telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant.id", Value: tenantId}) getGroupKeyRunId := sqlchelpers.UUIDToStr(getGroupKeyRun.GetGroupKeyRun.ID) workflowRunId := sqlchelpers.UUIDToStr(getGroupKeyRun.WorkflowRunId) @@ -474,7 +474,7 @@ func (ec *WorkflowsControllerImpl) runGetGroupKeyRunRequeueTenant(ctx context.Co ctx, span := telemetry.NewSpan(ctx, "handle-get-group-key-run-requeue") defer span.End() - telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant_id", Value: tenantId}) + telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant.id", Value: tenantId}) getGroupKeyRuns, err := ec.repo.GetGroupKeyRun().ListGetGroupKeyRunsToRequeue(ctx, tenantId) @@ -494,7 +494,7 @@ func (ec *WorkflowsControllerImpl) runGetGroupKeyRunRequeueTenant(ctx context.Co ctx, span := telemetry.NewSpan(ctx, "handle-get-group-key-run-requeue-tenant") defer span.End() - telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant_id", Value: tenantId}) + telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant.id", Value: tenantId}) getGroupKeyRunId := sqlchelpers.UUIDToStr(getGroupKeyRunCp.ID) @@ -565,7 +565,7 @@ func (ec *WorkflowsControllerImpl) runGetGroupKeyRunReassignTenant(ctx context.C ctx, span := telemetry.NewSpan(ctx, "handle-get-group-key-run-reassign") defer span.End() - telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant_id", Value: tenantId}) + telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant.id", Value: tenantId}) getGroupKeyRuns, err := ec.repo.GetGroupKeyRun().ListGetGroupKeyRunsToReassign(ctx, tenantId) @@ -585,7 +585,7 @@ func (ec *WorkflowsControllerImpl) runGetGroupKeyRunReassignTenant(ctx context.C ctx, span := telemetry.NewSpan(ctx, "handle-get-group-key-run-reassign-tenant") defer span.End() - telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant_id", Value: tenantId}) + telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant.id", Value: tenantId}) getGroupKeyRunId := sqlchelpers.UUIDToStr(getGroupKeyRunCp.ID) @@ -613,7 +613,7 @@ func (wc *WorkflowsControllerImpl) queueByCancelInProgress(ctx context.Context, ctx, span := telemetry.NewSpan(ctx, "queue-by-cancel-in-progress") defer span.End() - telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant_id", Value: tenantId}) + telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant.id", Value: tenantId}) workflowVersionId := sqlchelpers.UUIDToStr(workflowVersion.WorkflowVersion.ID) maxRuns := int(workflowVersion.ConcurrencyMaxRuns.Int32) @@ -672,7 +672,7 @@ func (wc *WorkflowsControllerImpl) queueByCancelNewest(ctx context.Context, tena ctx, span := telemetry.NewSpan(ctx, "queue-by-cancel-newest") defer span.End() - telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant_id", Value: tenantId}) + telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant.id", Value: tenantId}) workflowVersionId := sqlchelpers.UUIDToStr(workflowVersion.WorkflowVersion.ID) maxRuns := int(workflowVersion.ConcurrencyMaxRuns.Int32) @@ -731,7 +731,7 @@ func (wc *WorkflowsControllerImpl) queueByGroupRoundRobin(ctx context.Context, t ctx, span := telemetry.NewSpan(ctx, "queue-by-group-round-robin") defer span.End() - telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant_id", Value: tenantId}) + telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant.id", Value: tenantId}) workflowVersionId := sqlchelpers.UUIDToStr(workflowVersion.WorkflowVersion.ID) maxRuns := int(workflowVersion.ConcurrencyMaxRuns.Int32) diff --git a/internal/services/dispatcher/dispatcher.go b/internal/services/dispatcher/dispatcher.go index 33fdc3ddcd..33b327c03d 100644 --- a/internal/services/dispatcher/dispatcher.go +++ b/internal/services/dispatcher/dispatcher.go @@ -20,14 +20,14 @@ import ( "github.com/hatchet-dev/hatchet/internal/services/dispatcher/contracts" "github.com/hatchet-dev/hatchet/internal/services/shared/recoveryutils" "github.com/hatchet-dev/hatchet/internal/services/shared/tasktypes" - "github.com/hatchet-dev/hatchet/internal/telemetry" - "github.com/hatchet-dev/hatchet/internal/telemetry/servertel" "github.com/hatchet-dev/hatchet/pkg/logger" "github.com/hatchet-dev/hatchet/pkg/repository" "github.com/hatchet-dev/hatchet/pkg/repository/cache" "github.com/hatchet-dev/hatchet/pkg/repository/postgres/dbsqlc" "github.com/hatchet-dev/hatchet/pkg/repository/postgres/sqlchelpers" v1 "github.com/hatchet-dev/hatchet/pkg/repository/v1" + "github.com/hatchet-dev/hatchet/pkg/telemetry" + "github.com/hatchet-dev/hatchet/pkg/telemetry/servertel" "github.com/hatchet-dev/hatchet/pkg/validator" hatcheterrors "github.com/hatchet-dev/hatchet/pkg/errors" diff --git a/internal/services/dispatcher/dispatcher_v1.go b/internal/services/dispatcher/dispatcher_v1.go index a2c54880d3..eaa597afa0 100644 --- a/internal/services/dispatcher/dispatcher_v1.go +++ b/internal/services/dispatcher/dispatcher_v1.go @@ -17,10 +17,10 @@ import ( "github.com/hatchet-dev/hatchet/internal/queueutils" "github.com/hatchet-dev/hatchet/internal/services/dispatcher/contracts" tasktypesv1 "github.com/hatchet-dev/hatchet/internal/services/shared/tasktypes/v1" - "github.com/hatchet-dev/hatchet/internal/telemetry" "github.com/hatchet-dev/hatchet/pkg/repository/postgres/sqlchelpers" v1 "github.com/hatchet-dev/hatchet/pkg/repository/v1" "github.com/hatchet-dev/hatchet/pkg/repository/v1/sqlcv1" + "github.com/hatchet-dev/hatchet/pkg/telemetry" ) func (worker *subscribedWorker) StartTaskFromBulk( @@ -55,7 +55,7 @@ func (worker *subscribedWorker) sendToWorker( telemetry.WithAttributes( span, telemetry.AttributeKV{ - Key: "worker_id", + Key: "worker.id", Value: worker.workerId, }, ) @@ -63,7 +63,7 @@ func (worker *subscribedWorker) sendToWorker( telemetry.WithAttributes( span, telemetry.AttributeKV{ - Key: "payload_size", + Key: "payload.size_bytes", Value: len(action.ActionPayload), }, ) @@ -91,7 +91,7 @@ func (worker *subscribedWorker) sendToWorker( lockSpan.End() telemetry.WithAttributes(span, telemetry.AttributeKV{ - Key: "lock_duration_ms", + Key: "lock.duration_ms", Value: time.Since(lockBegin).Milliseconds(), }) diff --git a/internal/services/dispatcher/server.go b/internal/services/dispatcher/server.go index 6f7fc3e2b0..eff86bde7e 100644 --- a/internal/services/dispatcher/server.go +++ b/internal/services/dispatcher/server.go @@ -25,12 +25,12 @@ import ( "github.com/hatchet-dev/hatchet/internal/msgqueue" "github.com/hatchet-dev/hatchet/internal/services/dispatcher/contracts" "github.com/hatchet-dev/hatchet/internal/services/shared/tasktypes" - "github.com/hatchet-dev/hatchet/internal/telemetry" "github.com/hatchet-dev/hatchet/pkg/repository" "github.com/hatchet-dev/hatchet/pkg/repository/cache" "github.com/hatchet-dev/hatchet/pkg/repository/metered" "github.com/hatchet-dev/hatchet/pkg/repository/postgres/dbsqlc" "github.com/hatchet-dev/hatchet/pkg/repository/postgres/sqlchelpers" + "github.com/hatchet-dev/hatchet/pkg/telemetry" ) type subscribedWorker struct { diff --git a/internal/services/ingestor/ingestor.go b/internal/services/ingestor/ingestor.go index 85f881f472..4f7505f0d4 100644 --- a/internal/services/ingestor/ingestor.go +++ b/internal/services/ingestor/ingestor.go @@ -11,13 +11,13 @@ import ( msgqueuev1 "github.com/hatchet-dev/hatchet/internal/msgqueue/v1" "github.com/hatchet-dev/hatchet/internal/services/ingestor/contracts" "github.com/hatchet-dev/hatchet/internal/services/shared/tasktypes" - "github.com/hatchet-dev/hatchet/internal/telemetry" "github.com/hatchet-dev/hatchet/pkg/repository" "github.com/hatchet-dev/hatchet/pkg/repository/metered" "github.com/hatchet-dev/hatchet/pkg/repository/postgres/dbsqlc" "github.com/hatchet-dev/hatchet/pkg/repository/postgres/sqlchelpers" v1 "github.com/hatchet-dev/hatchet/pkg/repository/v1" "github.com/hatchet-dev/hatchet/pkg/repository/v1/sqlcv1" + "github.com/hatchet-dev/hatchet/pkg/telemetry" "github.com/hatchet-dev/hatchet/pkg/validator" ) @@ -216,7 +216,7 @@ func (i *IngestorImpl) ingestEventV0(ctx context.Context, tenant *dbsqlc.Tenant, } telemetry.WithAttributes(span, telemetry.AttributeKV{ - Key: "event_id", + Key: "event.id", Value: event.ID, }) diff --git a/internal/services/ingestor/ingestor_v1.go b/internal/services/ingestor/ingestor_v1.go index 452b4fea9f..4867f786c9 100644 --- a/internal/services/ingestor/ingestor_v1.go +++ b/internal/services/ingestor/ingestor_v1.go @@ -11,12 +11,12 @@ import ( msgqueue "github.com/hatchet-dev/hatchet/internal/msgqueue/v1" tasktypes "github.com/hatchet-dev/hatchet/internal/services/shared/tasktypes/v1" - "github.com/hatchet-dev/hatchet/internal/telemetry" "github.com/hatchet-dev/hatchet/pkg/repository" "github.com/hatchet-dev/hatchet/pkg/repository/postgres/dbsqlc" "github.com/hatchet-dev/hatchet/pkg/repository/postgres/sqlchelpers" v1 "github.com/hatchet-dev/hatchet/pkg/repository/v1" "github.com/hatchet-dev/hatchet/pkg/repository/v1/sqlcv1" + "github.com/hatchet-dev/hatchet/pkg/telemetry" ) type EventResult struct { diff --git a/internal/services/partition/partition.go b/internal/services/partition/partition.go index 5b1681175e..c07672c23b 100644 --- a/internal/services/partition/partition.go +++ b/internal/services/partition/partition.go @@ -9,9 +9,9 @@ import ( "github.com/go-co-op/gocron/v2" "github.com/rs/zerolog" - "github.com/hatchet-dev/hatchet/internal/telemetry" "github.com/hatchet-dev/hatchet/pkg/repository" "github.com/hatchet-dev/hatchet/pkg/repository/postgres/dbsqlc" + "github.com/hatchet-dev/hatchet/pkg/telemetry" ) const ( diff --git a/internal/services/scheduler/scheduler.go b/internal/services/scheduler/scheduler.go index 9045760ee1..89376f37c8 100644 --- a/internal/services/scheduler/scheduler.go +++ b/internal/services/scheduler/scheduler.go @@ -15,7 +15,6 @@ import ( "github.com/hatchet-dev/hatchet/internal/services/partition" "github.com/hatchet-dev/hatchet/internal/services/shared/recoveryutils" "github.com/hatchet-dev/hatchet/internal/services/shared/tasktypes" - "github.com/hatchet-dev/hatchet/internal/telemetry" "github.com/hatchet-dev/hatchet/pkg/config/shared" hatcheterrors "github.com/hatchet-dev/hatchet/pkg/errors" "github.com/hatchet-dev/hatchet/pkg/logger" @@ -23,6 +22,7 @@ import ( "github.com/hatchet-dev/hatchet/pkg/repository/postgres/dbsqlc" "github.com/hatchet-dev/hatchet/pkg/repository/postgres/sqlchelpers" v0 "github.com/hatchet-dev/hatchet/pkg/scheduling/v0" + "github.com/hatchet-dev/hatchet/pkg/telemetry" ) type SchedulerOpt func(*SchedulerOpts) diff --git a/internal/services/scheduler/v1/scheduler.go b/internal/services/scheduler/v1/scheduler.go index 65f28f0703..5b0a9cbf12 100644 --- a/internal/services/scheduler/v1/scheduler.go +++ b/internal/services/scheduler/v1/scheduler.go @@ -16,7 +16,6 @@ import ( "github.com/hatchet-dev/hatchet/internal/services/partition" "github.com/hatchet-dev/hatchet/internal/services/shared/recoveryutils" tasktypes "github.com/hatchet-dev/hatchet/internal/services/shared/tasktypes/v1" - "github.com/hatchet-dev/hatchet/internal/telemetry" "github.com/hatchet-dev/hatchet/pkg/config/shared" hatcheterrors "github.com/hatchet-dev/hatchet/pkg/errors" "github.com/hatchet-dev/hatchet/pkg/logger" @@ -26,6 +25,7 @@ import ( repov1 "github.com/hatchet-dev/hatchet/pkg/repository/v1" "github.com/hatchet-dev/hatchet/pkg/repository/v1/sqlcv1" v1 "github.com/hatchet-dev/hatchet/pkg/scheduling/v1" + "github.com/hatchet-dev/hatchet/pkg/telemetry" ) type SchedulerOpt func(*SchedulerOpts) diff --git a/pkg/repository/postgres/buffer_user_events.go b/pkg/repository/postgres/buffer_user_events.go index b168ef02e4..ab176f75b4 100644 --- a/pkg/repository/postgres/buffer_user_events.go +++ b/pkg/repository/postgres/buffer_user_events.go @@ -8,11 +8,11 @@ import ( "github.com/google/uuid" "github.com/jackc/pgx/v5/pgtype" - "github.com/hatchet-dev/hatchet/internal/telemetry" "github.com/hatchet-dev/hatchet/pkg/repository" "github.com/hatchet-dev/hatchet/pkg/repository/buffer" "github.com/hatchet-dev/hatchet/pkg/repository/postgres/dbsqlc" "github.com/hatchet-dev/hatchet/pkg/repository/postgres/sqlchelpers" + "github.com/hatchet-dev/hatchet/pkg/telemetry" ) func newUserEventBuffer(shared *sharedRepository, conf buffer.ConfigFileBuffer) (*buffer.TenantBufferManager[*repository.CreateEventOpts, dbsqlc.Event], error) { diff --git a/pkg/repository/postgres/event.go b/pkg/repository/postgres/event.go index d12725f8e8..21878a2d4b 100644 --- a/pkg/repository/postgres/event.go +++ b/pkg/repository/postgres/event.go @@ -12,12 +12,12 @@ import ( "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgtype" - "github.com/hatchet-dev/hatchet/internal/telemetry" "github.com/hatchet-dev/hatchet/pkg/repository" "github.com/hatchet-dev/hatchet/pkg/repository/buffer" "github.com/hatchet-dev/hatchet/pkg/repository/metered" "github.com/hatchet-dev/hatchet/pkg/repository/postgres/dbsqlc" "github.com/hatchet-dev/hatchet/pkg/repository/postgres/sqlchelpers" + "github.com/hatchet-dev/hatchet/pkg/telemetry" ) type eventAPIRepository struct { diff --git a/pkg/repository/postgres/mq.go b/pkg/repository/postgres/mq.go index fd26caf543..f2daa3e4dc 100644 --- a/pkg/repository/postgres/mq.go +++ b/pkg/repository/postgres/mq.go @@ -8,10 +8,10 @@ import ( "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgtype" - "github.com/hatchet-dev/hatchet/internal/telemetry" "github.com/hatchet-dev/hatchet/pkg/repository" "github.com/hatchet-dev/hatchet/pkg/repository/postgres/dbsqlc" "github.com/hatchet-dev/hatchet/pkg/repository/postgres/sqlchelpers" + "github.com/hatchet-dev/hatchet/pkg/telemetry" ) type messageQueueRepository struct { diff --git a/pkg/repository/postgres/scheduler_assignment.go b/pkg/repository/postgres/scheduler_assignment.go index 58f96ef493..f7eee4aaa4 100644 --- a/pkg/repository/postgres/scheduler_assignment.go +++ b/pkg/repository/postgres/scheduler_assignment.go @@ -5,8 +5,8 @@ import ( "github.com/jackc/pgx/v5/pgtype" - "github.com/hatchet-dev/hatchet/internal/telemetry" "github.com/hatchet-dev/hatchet/pkg/repository/postgres/dbsqlc" + "github.com/hatchet-dev/hatchet/pkg/telemetry" ) type assignmentRepository struct { diff --git a/pkg/repository/postgres/scheduler_lease.go b/pkg/repository/postgres/scheduler_lease.go index 40bdd105d2..6104700139 100644 --- a/pkg/repository/postgres/scheduler_lease.go +++ b/pkg/repository/postgres/scheduler_lease.go @@ -7,10 +7,10 @@ import ( "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgtype" - "github.com/hatchet-dev/hatchet/internal/telemetry" "github.com/hatchet-dev/hatchet/pkg/repository" "github.com/hatchet-dev/hatchet/pkg/repository/postgres/dbsqlc" "github.com/hatchet-dev/hatchet/pkg/repository/postgres/sqlchelpers" + "github.com/hatchet-dev/hatchet/pkg/telemetry" ) type leaseRepository struct { diff --git a/pkg/repository/postgres/scheduler_queue.go b/pkg/repository/postgres/scheduler_queue.go index cbeab9ed87..85040cfb60 100644 --- a/pkg/repository/postgres/scheduler_queue.go +++ b/pkg/repository/postgres/scheduler_queue.go @@ -9,11 +9,11 @@ import ( "github.com/jackc/pgx/v5/pgtype" - "github.com/hatchet-dev/hatchet/internal/telemetry" "github.com/hatchet-dev/hatchet/pkg/repository" "github.com/hatchet-dev/hatchet/pkg/repository/cache" "github.com/hatchet-dev/hatchet/pkg/repository/postgres/dbsqlc" "github.com/hatchet-dev/hatchet/pkg/repository/postgres/sqlchelpers" + "github.com/hatchet-dev/hatchet/pkg/telemetry" ) type queueFactoryRepository struct { diff --git a/pkg/repository/postgres/step_run.go b/pkg/repository/postgres/step_run.go index 2166b4bd0f..eaba55348c 100644 --- a/pkg/repository/postgres/step_run.go +++ b/pkg/repository/postgres/step_run.go @@ -17,12 +17,12 @@ import ( "github.com/rs/zerolog" "golang.org/x/sync/errgroup" - "github.com/hatchet-dev/hatchet/internal/telemetry" "github.com/hatchet-dev/hatchet/pkg/config/server" "github.com/hatchet-dev/hatchet/pkg/repository" "github.com/hatchet-dev/hatchet/pkg/repository/cache" "github.com/hatchet-dev/hatchet/pkg/repository/postgres/dbsqlc" "github.com/hatchet-dev/hatchet/pkg/repository/postgres/sqlchelpers" + "github.com/hatchet-dev/hatchet/pkg/telemetry" ) type stepRunAPIRepository struct { diff --git a/pkg/repository/postgres/workflow.go b/pkg/repository/postgres/workflow.go index a38a2fbe4d..5b1874f9ce 100644 --- a/pkg/repository/postgres/workflow.go +++ b/pkg/repository/postgres/workflow.go @@ -14,12 +14,12 @@ import ( "github.com/hatchet-dev/hatchet/internal/cel" "github.com/hatchet-dev/hatchet/internal/dagutils" - "github.com/hatchet-dev/hatchet/internal/telemetry" "github.com/hatchet-dev/hatchet/pkg/repository" "github.com/hatchet-dev/hatchet/pkg/repository/cache" "github.com/hatchet-dev/hatchet/pkg/repository/metered" "github.com/hatchet-dev/hatchet/pkg/repository/postgres/dbsqlc" "github.com/hatchet-dev/hatchet/pkg/repository/postgres/sqlchelpers" + "github.com/hatchet-dev/hatchet/pkg/telemetry" ) type workflowAPIRepository struct { diff --git a/pkg/repository/postgres/workflow_run.go b/pkg/repository/postgres/workflow_run.go index d63cd1caf8..4c59031886 100644 --- a/pkg/repository/postgres/workflow_run.go +++ b/pkg/repository/postgres/workflow_run.go @@ -20,12 +20,12 @@ import ( "github.com/hatchet-dev/hatchet/internal/datautils" "github.com/hatchet-dev/hatchet/internal/queueutils" "github.com/hatchet-dev/hatchet/internal/services/shared/defaults" - "github.com/hatchet-dev/hatchet/internal/telemetry" "github.com/hatchet-dev/hatchet/pkg/config/server" "github.com/hatchet-dev/hatchet/pkg/repository" "github.com/hatchet-dev/hatchet/pkg/repository/metered" "github.com/hatchet-dev/hatchet/pkg/repository/postgres/dbsqlc" "github.com/hatchet-dev/hatchet/pkg/repository/postgres/sqlchelpers" + "github.com/hatchet-dev/hatchet/pkg/telemetry" ) type workflowRunAPIRepository struct { diff --git a/pkg/repository/v1/match.go b/pkg/repository/v1/match.go index 4740edd4f5..9a7f9d6793 100644 --- a/pkg/repository/v1/match.go +++ b/pkg/repository/v1/match.go @@ -12,9 +12,9 @@ import ( "github.com/google/cel-go/cel" "github.com/google/uuid" - "github.com/hatchet-dev/hatchet/internal/telemetry" "github.com/hatchet-dev/hatchet/pkg/repository/postgres/sqlchelpers" "github.com/hatchet-dev/hatchet/pkg/repository/v1/sqlcv1" + "github.com/hatchet-dev/hatchet/pkg/telemetry" ) type CandidateEventMatch struct { diff --git a/pkg/repository/v1/olap.go b/pkg/repository/v1/olap.go index d806c44c6e..500e6b4b2b 100644 --- a/pkg/repository/v1/olap.go +++ b/pkg/repository/v1/olap.go @@ -19,10 +19,10 @@ import ( "go.opentelemetry.io/otel/attribute" "golang.org/x/sync/errgroup" - "github.com/hatchet-dev/hatchet/internal/telemetry" "github.com/hatchet-dev/hatchet/pkg/repository" "github.com/hatchet-dev/hatchet/pkg/repository/postgres/sqlchelpers" "github.com/hatchet-dev/hatchet/pkg/repository/v1/sqlcv1" + "github.com/hatchet-dev/hatchet/pkg/telemetry" "github.com/hatchet-dev/hatchet/pkg/validator" ) diff --git a/pkg/repository/v1/payloadstore.go b/pkg/repository/v1/payloadstore.go index c27d443fe4..298c186abb 100644 --- a/pkg/repository/v1/payloadstore.go +++ b/pkg/repository/v1/payloadstore.go @@ -6,9 +6,9 @@ import ( "sort" "time" - "github.com/hatchet-dev/hatchet/internal/telemetry" "github.com/hatchet-dev/hatchet/pkg/repository/postgres/sqlchelpers" "github.com/hatchet-dev/hatchet/pkg/repository/v1/sqlcv1" + "github.com/hatchet-dev/hatchet/pkg/telemetry" "github.com/jackc/pgx/v5/pgtype" "github.com/jackc/pgx/v5/pgxpool" "github.com/rs/zerolog" diff --git a/pkg/repository/v1/scheduler_assignment.go b/pkg/repository/v1/scheduler_assignment.go index e7959d16f4..2099bf0fe4 100644 --- a/pkg/repository/v1/scheduler_assignment.go +++ b/pkg/repository/v1/scheduler_assignment.go @@ -5,8 +5,8 @@ import ( "github.com/jackc/pgx/v5/pgtype" - "github.com/hatchet-dev/hatchet/internal/telemetry" "github.com/hatchet-dev/hatchet/pkg/repository/v1/sqlcv1" + "github.com/hatchet-dev/hatchet/pkg/telemetry" ) type assignmentRepository struct { diff --git a/pkg/repository/v1/scheduler_lease.go b/pkg/repository/v1/scheduler_lease.go index c24d044b7d..7a7f4780dc 100644 --- a/pkg/repository/v1/scheduler_lease.go +++ b/pkg/repository/v1/scheduler_lease.go @@ -7,9 +7,9 @@ import ( "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgtype" - "github.com/hatchet-dev/hatchet/internal/telemetry" "github.com/hatchet-dev/hatchet/pkg/repository/postgres/sqlchelpers" "github.com/hatchet-dev/hatchet/pkg/repository/v1/sqlcv1" + "github.com/hatchet-dev/hatchet/pkg/telemetry" ) type ListActiveWorkersResult struct { diff --git a/pkg/repository/v1/scheduler_queue.go b/pkg/repository/v1/scheduler_queue.go index ca611a58bd..c11dfea87f 100644 --- a/pkg/repository/v1/scheduler_queue.go +++ b/pkg/repository/v1/scheduler_queue.go @@ -8,9 +8,9 @@ import ( "github.com/jackc/pgx/v5/pgtype" - "github.com/hatchet-dev/hatchet/internal/telemetry" "github.com/hatchet-dev/hatchet/pkg/repository/cache" "github.com/hatchet-dev/hatchet/pkg/repository/postgres/sqlchelpers" + "github.com/hatchet-dev/hatchet/pkg/telemetry" "github.com/hatchet-dev/hatchet/pkg/repository/v1/sqlcv1" ) diff --git a/pkg/repository/v1/task.go b/pkg/repository/v1/task.go index 55933f4eb1..2e630ee11b 100644 --- a/pkg/repository/v1/task.go +++ b/pkg/repository/v1/task.go @@ -16,9 +16,9 @@ import ( "go.opentelemetry.io/otel/codes" "github.com/hatchet-dev/hatchet/internal/cel" - "github.com/hatchet-dev/hatchet/internal/telemetry" "github.com/hatchet-dev/hatchet/pkg/repository/postgres/sqlchelpers" "github.com/hatchet-dev/hatchet/pkg/repository/v1/sqlcv1" + "github.com/hatchet-dev/hatchet/pkg/telemetry" ) type CreateTaskOpts struct { diff --git a/pkg/repository/v1/workflow.go b/pkg/repository/v1/workflow.go index 92fda1aa96..399bfd2f42 100644 --- a/pkg/repository/v1/workflow.go +++ b/pkg/repository/v1/workflow.go @@ -16,10 +16,10 @@ import ( "github.com/hatchet-dev/hatchet/internal/cel" "github.com/hatchet-dev/hatchet/internal/datautils" "github.com/hatchet-dev/hatchet/internal/digest" - "github.com/hatchet-dev/hatchet/internal/telemetry" "github.com/hatchet-dev/hatchet/pkg/client/types" "github.com/hatchet-dev/hatchet/pkg/repository/postgres/sqlchelpers" "github.com/hatchet-dev/hatchet/pkg/repository/v1/sqlcv1" + "github.com/hatchet-dev/hatchet/pkg/telemetry" ) var ErrDagParentNotFound = errors.New("dag parent not found") diff --git a/pkg/scheduling/v0/queuer.go b/pkg/scheduling/v0/queuer.go index 87aa2b5368..d4ebf0e08a 100644 --- a/pkg/scheduling/v0/queuer.go +++ b/pkg/scheduling/v0/queuer.go @@ -10,10 +10,10 @@ import ( "github.com/jackc/pgx/v5/pgtype" "github.com/rs/zerolog" - "github.com/hatchet-dev/hatchet/internal/telemetry" "github.com/hatchet-dev/hatchet/pkg/repository" "github.com/hatchet-dev/hatchet/pkg/repository/postgres/dbsqlc" "github.com/hatchet-dev/hatchet/pkg/repository/postgres/sqlchelpers" + "github.com/hatchet-dev/hatchet/pkg/telemetry" ) type Queuer struct { @@ -109,7 +109,7 @@ func (q *Queuer) queue(ctx context.Context) { ctx, span := telemetry.NewSpan(ctx, "notify-queue") defer span.End() - telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant_id", Value: q.tenantId}) + telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant.id", Value: q.tenantId}) q.notifyQueueCh <- telemetry.GetCarrier(ctx) }() @@ -131,8 +131,8 @@ func (q *Queuer) loopQueue(ctx context.Context) { ctx, span := telemetry.NewSpanWithCarrier(ctx, "queue", carrier) telemetry.WithAttributes(span, - telemetry.AttributeKV{Key: "queue", Value: q.queueName}, - telemetry.AttributeKV{Key: "tenant_id", Value: q.tenantId}, + telemetry.AttributeKV{Key: "queue.name", Value: q.queueName}, + telemetry.AttributeKV{Key: "tenant.id", Value: q.tenantId}, ) start := time.Now() @@ -374,7 +374,7 @@ func (q *Queuer) flushToDatabase(ctx context.Context, r *assignResults) int { ctx, span := telemetry.NewSpan(ctx, "flush-to-database") defer span.End() - telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant_id", Value: q.tenantId}) + telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant.id", Value: q.tenantId}) q.l.Debug().Int("assigned", len(r.assigned)).Int("unassigned", len(r.unassigned)).Int("scheduling_timed_out", len(r.schedulingTimedOut)).Msg("flushing to database") diff --git a/pkg/scheduling/v0/scheduler.go b/pkg/scheduling/v0/scheduler.go index d3e2090ee7..4b15a1ef22 100644 --- a/pkg/scheduling/v0/scheduler.go +++ b/pkg/scheduling/v0/scheduler.go @@ -10,11 +10,11 @@ import ( "github.com/rs/zerolog" "github.com/hatchet-dev/hatchet/internal/queueutils" - "github.com/hatchet-dev/hatchet/internal/telemetry" "github.com/hatchet-dev/hatchet/pkg/repository" "github.com/hatchet-dev/hatchet/pkg/repository/postgres/dbsqlc" "github.com/hatchet-dev/hatchet/pkg/repository/postgres/sqlchelpers" "github.com/hatchet-dev/hatchet/pkg/scheduling/v0/randomticker" + "github.com/hatchet-dev/hatchet/pkg/telemetry" ) // Scheduler is responsible for scheduling steps to workers as efficiently as possible. @@ -453,7 +453,7 @@ func (s *Scheduler) tryAssignBatch( uniqueTenantIds := telemetry.CollectUniqueTenantIDs(qis, func(qi *dbsqlc.QueueItem) string { return sqlchelpers.UUIDToStr(qi.TenantId) }) - telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant_id", Value: uniqueTenantIds}) + telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant.id", Value: uniqueTenantIds}) } res = make([]*assignSingleResult, len(qis)) @@ -616,7 +616,7 @@ func (s *Scheduler) tryAssignSingleton( ctx, span := telemetry.NewSpan(ctx, "try-assign-singleton") // nolint: ineffassign defer span.End() - telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant_id", Value: sqlchelpers.UUIDToStr(qi.TenantId)}) + telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant.id", Value: sqlchelpers.UUIDToStr(qi.TenantId)}) if qi.Sticky.Valid || len(labels) > 0 { candidateSlots = getRankedSlots(qi, labels, candidateSlots) @@ -674,7 +674,7 @@ func (s *Scheduler) tryAssign( uniqueTenantIds := telemetry.CollectUniqueTenantIDs(qis, func(qi *dbsqlc.QueueItem) string { return sqlchelpers.UUIDToStr(qi.TenantId) }) - telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant_id", Value: uniqueTenantIds}) + telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant.id", Value: uniqueTenantIds}) } // split into groups based on action ids, and process each action id in parallel diff --git a/pkg/scheduling/v1/concurrency.go b/pkg/scheduling/v1/concurrency.go index b11b696535..3c54e14c9b 100644 --- a/pkg/scheduling/v1/concurrency.go +++ b/pkg/scheduling/v1/concurrency.go @@ -9,10 +9,10 @@ import ( "github.com/rs/zerolog" "golang.org/x/time/rate" - "github.com/hatchet-dev/hatchet/internal/telemetry" v1 "github.com/hatchet-dev/hatchet/pkg/repository/v1" "github.com/hatchet-dev/hatchet/pkg/repository/v1/sqlcv1" "github.com/hatchet-dev/hatchet/pkg/scheduling/v0/randomticker" + "github.com/hatchet-dev/hatchet/pkg/telemetry" ) type ConcurrencyResults struct { @@ -86,7 +86,7 @@ func (c *ConcurrencyManager) notify(ctx context.Context) { ctx, span := telemetry.NewSpan(ctx, "notify-concurrency") defer span.End() - telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant_id", Value: c.tenantId.String()}) + telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant.id", Value: c.tenantId.String()}) // non-blocking write select { @@ -112,8 +112,8 @@ func (c *ConcurrencyManager) loopConcurrency(ctx context.Context) { ctx, span := telemetry.NewSpanWithCarrier(ctx, "concurrency-manager", carrier) telemetry.WithAttributes(span, - telemetry.AttributeKV{Key: "strategy_id", Value: c.strategy.ID}, - telemetry.AttributeKV{Key: "tenant_id", Value: c.tenantId.String()}, + telemetry.AttributeKV{Key: "concurrency.strategy.id", Value: c.strategy.ID}, + telemetry.AttributeKV{Key: "tenant.id", Value: c.tenantId.String()}, ) if !c.rateLimiter.Allow() { @@ -159,8 +159,8 @@ func (c *ConcurrencyManager) loopCheckActive(ctx context.Context) { ctx, span := telemetry.NewSpan(ctx, "concurrency-check-active") telemetry.WithAttributes(span, - telemetry.AttributeKV{Key: "strategy_id", Value: c.strategy.ID}, - telemetry.AttributeKV{Key: "tenant_id", Value: c.tenantId.String()}, + telemetry.AttributeKV{Key: "concurrency.strategy.id", Value: c.strategy.ID}, + telemetry.AttributeKV{Key: "tenant.id", Value: c.tenantId.String()}, ) start := time.Now() diff --git a/pkg/scheduling/v1/queuer.go b/pkg/scheduling/v1/queuer.go index fe06bbac50..51d19aeeb4 100644 --- a/pkg/scheduling/v1/queuer.go +++ b/pkg/scheduling/v1/queuer.go @@ -9,10 +9,10 @@ import ( "github.com/jackc/pgx/v5/pgtype" "github.com/rs/zerolog" - "github.com/hatchet-dev/hatchet/internal/telemetry" "github.com/hatchet-dev/hatchet/pkg/integrations/metrics/prometheus" v1 "github.com/hatchet-dev/hatchet/pkg/repository/v1" "github.com/hatchet-dev/hatchet/pkg/repository/v1/sqlcv1" + "github.com/hatchet-dev/hatchet/pkg/telemetry" ) type Queuer struct { @@ -110,7 +110,7 @@ func (q *Queuer) queue(ctx context.Context) { ctx, span := telemetry.NewSpan(ctx, "notify-queue") defer span.End() - telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant_id", Value: q.tenantId.String()}) + telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant.id", Value: q.tenantId.String()}) q.notifyQueueCh <- telemetry.GetCarrier(ctx) }() @@ -135,8 +135,8 @@ func (q *Queuer) loopQueue(ctx context.Context) { ctx, span := telemetry.NewSpanWithCarrier(ctx, "queue", carrier) telemetry.WithAttributes(span, - telemetry.AttributeKV{Key: "queue", Value: q.queueName}, - telemetry.AttributeKV{Key: "tenant_id", Value: q.tenantId.String()}, + telemetry.AttributeKV{Key: "queue.name", Value: q.queueName}, + telemetry.AttributeKV{Key: "tenant.id", Value: q.tenantId.String()}, ) start := time.Now() @@ -440,7 +440,7 @@ func (q *Queuer) flushToDatabase(ctx context.Context, r *assignResults) int { ctx, span := telemetry.NewSpan(ctx, "flush-to-database") defer span.End() - telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant_id", Value: q.tenantId.String()}) + telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant.id", Value: q.tenantId.String()}) begin := time.Now() diff --git a/pkg/scheduling/v1/scheduler.go b/pkg/scheduling/v1/scheduler.go index d0da016ac2..2668e60c9e 100644 --- a/pkg/scheduling/v1/scheduler.go +++ b/pkg/scheduling/v1/scheduler.go @@ -10,11 +10,11 @@ import ( "github.com/rs/zerolog" "github.com/hatchet-dev/hatchet/internal/queueutils" - "github.com/hatchet-dev/hatchet/internal/telemetry" "github.com/hatchet-dev/hatchet/pkg/repository/postgres/sqlchelpers" v1 "github.com/hatchet-dev/hatchet/pkg/repository/v1" "github.com/hatchet-dev/hatchet/pkg/repository/v1/sqlcv1" "github.com/hatchet-dev/hatchet/pkg/scheduling/v0/randomticker" + "github.com/hatchet-dev/hatchet/pkg/telemetry" ) const rateLimitedRequeueAfterThreshold = 2 * time.Second @@ -475,7 +475,7 @@ func (s *Scheduler) tryAssignBatch( uniqueTenantIds := telemetry.CollectUniqueTenantIDs(qis, func(qi *sqlcv1.V1QueueItem) string { return qi.TenantID.String() }) - telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant_id", Value: uniqueTenantIds}) + telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant.id", Value: uniqueTenantIds}) } res = make([]*assignSingleResult, len(qis)) @@ -638,7 +638,7 @@ func (s *Scheduler) tryAssignSingleton( ctx, span := telemetry.NewSpan(ctx, "try-assign-singleton") // nolint: ineffassign defer span.End() - telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant_id", Value: qi.TenantID.String()}) + telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant.id", Value: qi.TenantID.String()}) ringOffset = ringOffset % len(candidateSlots) @@ -700,7 +700,7 @@ func (s *Scheduler) tryAssign( uniqueTenantIds := telemetry.CollectUniqueTenantIDs(qis, func(qi *sqlcv1.V1QueueItem) string { return qi.TenantID.String() }) - telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant_id", Value: uniqueTenantIds}) + telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant.id", Value: uniqueTenantIds}) } // split into groups based on action ids, and process each action id in parallel diff --git a/internal/telemetry/servertel/attributes.go b/pkg/telemetry/servertel/attributes.go similarity index 89% rename from internal/telemetry/servertel/attributes.go rename to pkg/telemetry/servertel/attributes.go index cb3bd49055..8904538e52 100644 --- a/internal/telemetry/servertel/attributes.go +++ b/pkg/telemetry/servertel/attributes.go @@ -3,9 +3,9 @@ package servertel import ( "github.com/jackc/pgx/v5/pgtype" - "github.com/hatchet-dev/hatchet/internal/telemetry" "github.com/hatchet-dev/hatchet/pkg/repository/postgres/dbsqlc" "github.com/hatchet-dev/hatchet/pkg/repository/postgres/sqlchelpers" + "github.com/hatchet-dev/hatchet/pkg/telemetry" "go.opentelemetry.io/otel/trace" ) @@ -40,63 +40,63 @@ func WithWorkflowRunModel(span trace.Span, workflowRun *dbsqlc.GetWorkflowRunRow func TenantId(tenantId pgtype.UUID) telemetry.AttributeKV { return telemetry.AttributeKV{ - Key: "tenantId", + Key: "tenant.id", Value: sqlchelpers.UUIDToStr(tenantId), } } func StepRunId(stepRunId pgtype.UUID) telemetry.AttributeKV { return telemetry.AttributeKV{ - Key: "stepRunId", + Key: "step_run.id", Value: sqlchelpers.UUIDToStr(stepRunId), } } func Step(step pgtype.UUID) telemetry.AttributeKV { return telemetry.AttributeKV{ - Key: "stepId", + Key: "step.id", Value: sqlchelpers.UUIDToStr(step), } } func JobRunId(jobRunId pgtype.UUID) telemetry.AttributeKV { return telemetry.AttributeKV{ - Key: "jobRunId", + Key: "job_run.id", Value: sqlchelpers.UUIDToStr(jobRunId), } } func Job(job pgtype.UUID) telemetry.AttributeKV { return telemetry.AttributeKV{ - Key: "jobId", + Key: "job.id", Value: sqlchelpers.UUIDToStr(job), } } func WorkflowRunId(workflowRunId pgtype.UUID) telemetry.AttributeKV { return telemetry.AttributeKV{ - Key: "workflowRunId", + Key: "workflow_run.id", Value: sqlchelpers.UUIDToStr(workflowRunId), } } func WorkflowVersion(workflowVersion pgtype.UUID) telemetry.AttributeKV { return telemetry.AttributeKV{ - Key: "workflowVersionId", + Key: "workflow_version.id", Value: sqlchelpers.UUIDToStr(workflowVersion), } } func WorkerId(workerId pgtype.UUID) telemetry.AttributeKV { return telemetry.AttributeKV{ - Key: "workerId", + Key: "worker.id", Value: sqlchelpers.UUIDToStr(workerId), } } func EventId(eventId pgtype.UUID) telemetry.AttributeKV { return telemetry.AttributeKV{ - Key: "eventId", + Key: "event.id", Value: sqlchelpers.UUIDToStr(eventId), } } diff --git a/internal/telemetry/telemetry.go b/pkg/telemetry/telemetry.go similarity index 100% rename from internal/telemetry/telemetry.go rename to pkg/telemetry/telemetry.go