From 872b902602569c8c9a93af550e9bbc8f7997d77d Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Fri, 5 Sep 2025 14:48:07 +0800 Subject: [PATCH 1/4] Introduce service.Settings.TelemetryFactory The `service.Settings` type now has a `TelemetryFactory` field for injecting the `telemetry.Factory` to be used for creating a logger and logger provider, meter provider, and tracer provider. The `otelcol` package is hard-coded to inject an otelconftelemetry factory for now. In a followup we will make it possible to inject the telemetry through `otelcol.Factories`. --- .../service-settings-telemetryfactory.yaml | 25 + internal/e2e/status_test.go | 3 +- otelcol/collector.go | 5 + otelcol/collector_windows.go | 3 +- otelcol/config_test.go | 24 +- otelcol/unmarshaler.go | 2 +- otelcol/unmarshaler_test.go | 25 +- service/config.go | 4 +- service/config_test.go | 92 +-- service/go.mod | 2 + service/service.go | 25 +- service/service_test.go | 722 ++++++------------ service/telemetry/telemetrytest/providers.go | 96 +++ .../telemetry/telemetrytest/providers_test.go | 123 +++ 14 files changed, 553 insertions(+), 598 deletions(-) create mode 100644 .chloggen/service-settings-telemetryfactory.yaml create mode 100644 service/telemetry/telemetrytest/providers.go create mode 100644 service/telemetry/telemetrytest/providers_test.go diff --git a/.chloggen/service-settings-telemetryfactory.yaml b/.chloggen/service-settings-telemetryfactory.yaml new file mode 100644 index 00000000000..323c8c835d9 --- /dev/null +++ b/.chloggen/service-settings-telemetryfactory.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: breaking + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: service + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: The `service.Settings` type now requires a `telemetry.Factory` to be provided + +# One or more tracking issues or pull requests related to the change +issues: [4970] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/internal/e2e/status_test.go b/internal/e2e/status_test.go index ecb1b084525..522968f70f0 100644 --- a/internal/e2e/status_test.go +++ b/internal/e2e/status_test.go @@ -75,11 +75,12 @@ func Test_ComponentStatusReporting_SharedInstance(t *testing.T) { ExtensionsFactories: map[component.Type]extension.Factory{ component.MustNewType("watcher"): newExtensionFactory(), }, + TelemetryFactory: otelconftelemetry.NewFactory(), } set.BuildInfo = component.BuildInfo{Version: "test version", Command: "otelcoltest"} cfg := service.Config{ - Telemetry: otelconftelemetry.Config{ + Telemetry: &otelconftelemetry.Config{ Logs: otelconftelemetry.LogsConfig{ Level: zapcore.InfoLevel, Development: false, diff --git a/otelcol/collector.go b/otelcol/collector.go index a719d4db550..419a855d742 100644 --- a/otelcol/collector.go +++ b/otelcol/collector.go @@ -25,6 +25,7 @@ import ( "go.opentelemetry.io/collector/confmap/xconfmap" "go.opentelemetry.io/collector/otelcol/internal/grpclog" "go.opentelemetry.io/collector/service" + "go.opentelemetry.io/collector/service/telemetry/otelconftelemetry" ) // State defines Collector's state. @@ -216,6 +217,10 @@ func (col *Collector) setupConfigurationComponents(ctx context.Context) error { }, AsyncErrorChannel: col.asyncErrorChannel, LoggingOptions: col.set.LoggingOptions, + + // TODO: inject the telemetry factory through factories. + // See https://github.com/open-telemetry/opentelemetry-collector/issues/4970 + TelemetryFactory: otelconftelemetry.NewFactory(), }, cfg.Service) if err != nil { return err diff --git a/otelcol/collector_windows.go b/otelcol/collector_windows.go index c88ccb9758d..f8fee8d99d6 100644 --- a/otelcol/collector_windows.go +++ b/otelcol/collector_windows.go @@ -19,6 +19,7 @@ import ( "go.opentelemetry.io/collector/featuregate" "go.opentelemetry.io/collector/service" + "go.opentelemetry.io/collector/service/telemetry/otelconftelemetry" ) type windowsService struct { @@ -214,7 +215,7 @@ func (w windowsEventLogCore) Sync() error { func withWindowsCore(elog *eventlog.Log, serviceConfig **service.Config) func(zapcore.Core) zapcore.Core { return func(core zapcore.Core) zapcore.Core { if serviceConfig != nil && *serviceConfig != nil { - for _, output := range (*serviceConfig).Telemetry.Logs.OutputPaths { + for _, output := range (*serviceConfig).Telemetry.(*otelconftelemetry.Config).Logs.OutputPaths { if output != "stdout" && output != "stderr" { // A log file was specified in the configuration, so we should not use the Windows Event Log return core diff --git a/otelcol/config_test.go b/otelcol/config_test.go index 2a3b5ecc3bd..e9c80d96ae3 100644 --- a/otelcol/config_test.go +++ b/otelcol/config_test.go @@ -50,10 +50,10 @@ func TestConfigValidate(t *testing.T) { expected: nil, }, { - name: "custom-service-telemetrySettings-encoding", + name: "valid-telemetry-config", cfgFn: func() *Config { cfg := generateConfig() - cfg.Service.Telemetry.Logs.Encoding = "json" + cfg.Service.Telemetry = fakeTelemetryConfig{} return cfg }, expected: nil, @@ -89,6 +89,15 @@ func TestConfigValidate(t *testing.T) { }, expected: errMissingReceivers, }, + { + name: "invalid-telemetry-config", + cfgFn: func() *Config { + cfg := generateConfig() + cfg.Service.Telemetry = fakeTelemetryConfig{Invalid: true} + return cfg + }, + expected: errors.New("service::telemetry: invalid config"), + }, { name: "invalid-extension-reference", cfgFn: func() *Config { @@ -330,3 +339,14 @@ func generateConfig() *Config { func newPtr[T int | string](str T) *T { return &str } + +type fakeTelemetryConfig struct { + Invalid bool `mapstructure:"invalid"` +} + +func (cfg fakeTelemetryConfig) Validate() error { + if cfg.Invalid { + return errors.New("invalid config") + } + return nil +} diff --git a/otelcol/unmarshaler.go b/otelcol/unmarshaler.go index d38e140b066..0874e48f417 100644 --- a/otelcol/unmarshaler.go +++ b/otelcol/unmarshaler.go @@ -30,7 +30,7 @@ func unmarshal(v *confmap.Conf, factories Factories) (*configSettings, error) { // TODO: inject the telemetry factory through factories, once available. // See https://github.com/open-telemetry/opentelemetry-collector/issues/4970 telFactory := otelconftelemetry.NewFactory() - defaultTelConfig := *telFactory.CreateDefaultConfig().(*otelconftelemetry.Config) + defaultTelConfig := telFactory.CreateDefaultConfig().(*otelconftelemetry.Config) // Unmarshal top level sections and validate. cfg := &configSettings{ diff --git a/otelcol/unmarshaler_test.go b/otelcol/unmarshaler_test.go index 5a35305ecf8..66f370ebebf 100644 --- a/otelcol/unmarshaler_test.go +++ b/otelcol/unmarshaler_test.go @@ -56,7 +56,7 @@ func TestUnmarshalEmptyAllSections(t *testing.T) { OutputPaths: zapProdCfg.OutputPaths, ErrorOutputPaths: zapProdCfg.ErrorOutputPaths, InitialFields: zapProdCfg.InitialFields, - }, cfg.Service.Telemetry.Logs) + }, cfg.Service.Telemetry.(*otelconftelemetry.Config).Logs) } func TestUnmarshalUnknownTopLevel(t *testing.T) { @@ -149,26 +149,13 @@ func TestServiceUnmarshalError(t *testing.T) { expectError string }{ { - name: "invalid-logs-level", + name: "invalid-telemetry-unknown-key", conf: confmap.NewFromStringMap(map[string]any{ "telemetry": map[string]any{ - "logs": map[string]any{ - "level": "UNKNOWN", - }, - }, - }), - expectError: "decoding failed due to the following error(s):\n\n'telemetry.logs' decoding failed due to the following error(s):\n\n'level' unrecognized level: \"UNKNOWN\"", - }, - { - name: "invalid-metrics-level", - conf: confmap.NewFromStringMap(map[string]any{ - "telemetry": map[string]any{ - "metrics": map[string]any{ - "level": "unknown", - }, + "unknown": "key", }, }), - expectError: "decoding failed due to the following error(s):\n\n'telemetry.metrics' decoding failed due to the following error(s):\n\n'level' unknown metrics level \"unknown\"", + expectError: "decoding failed due to the following error(s):\n\n'telemetry' has invalid keys: unknown", }, { name: "invalid-service-extensions-section", @@ -201,7 +188,9 @@ func TestServiceUnmarshalError(t *testing.T) { for _, tt := range testCases { t.Run(tt.name, func(t *testing.T) { - err := tt.conf.Unmarshal(&service.Config{}) + err := tt.conf.Unmarshal(&service.Config{ + Telemetry: fakeTelemetryConfig{}, + }) require.ErrorContains(t, err, tt.expectError) }) } diff --git a/service/config.go b/service/config.go index e69a1417c79..efe94791d8c 100644 --- a/service/config.go +++ b/service/config.go @@ -4,15 +4,15 @@ package service // import "go.opentelemetry.io/collector/service" import ( + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/service/extensions" "go.opentelemetry.io/collector/service/pipelines" - "go.opentelemetry.io/collector/service/telemetry/otelconftelemetry" ) // Config defines the configurable components of the Service. type Config struct { // Telemetry is the configuration for collector's own telemetry. - Telemetry otelconftelemetry.Config `mapstructure:"telemetry"` + Telemetry component.Config `mapstructure:"telemetry"` // Extensions are the ordered list of extensions configured for the service. Extensions extensions.Config `mapstructure:"extensions,omitempty"` diff --git a/service/config_test.go b/service/config_test.go index e9fffd02b20..60065255b82 100644 --- a/service/config_test.go +++ b/service/config_test.go @@ -6,21 +6,16 @@ package service import ( "errors" "testing" - "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - config "go.opentelemetry.io/contrib/otelconf/v0.3.0" - "go.uber.org/zap/zapcore" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/config/configtelemetry" "go.opentelemetry.io/collector/confmap" "go.opentelemetry.io/collector/confmap/xconfmap" "go.opentelemetry.io/collector/pipeline" "go.opentelemetry.io/collector/service/extensions" "go.opentelemetry.io/collector/service/pipelines" - "go.opentelemetry.io/collector/service/telemetry/otelconftelemetry" ) func TestConfigValidate(t *testing.T) { @@ -35,10 +30,10 @@ func TestConfigValidate(t *testing.T) { expected: nil, }, { - name: "custom-service-telemetrySettings-encoding", + name: "valid-telemetry-config", cfgFn: func() *Config { cfg := generateConfig() - cfg.Telemetry.Logs.Encoding = "json" + cfg.Telemetry = fakeTelemetryConfig{Invalid: false} return cfg }, expected: nil, @@ -54,14 +49,13 @@ func TestConfigValidate(t *testing.T) { expected: errors.New(`references processor "nop" multiple times`), }, { - name: "invalid-telemetry-metric-config", + name: "invalid-telemetry-config", cfgFn: func() *Config { cfg := generateConfig() - cfg.Telemetry.Metrics.Level = configtelemetry.LevelBasic - cfg.Telemetry.Metrics.Readers = nil + cfg.Telemetry = fakeTelemetryConfig{Invalid: true} return cfg }, - expected: errors.New("collector telemetry metrics reader should exist when metric level is not none"), + expected: errors.New("telemetry: invalid config"), }, } @@ -79,80 +73,19 @@ func TestConfigValidate(t *testing.T) { } func TestConfmapMarshalConfig(t *testing.T) { - telFactory := otelconftelemetry.NewFactory() - defaultTelConfig := *telFactory.CreateDefaultConfig().(*otelconftelemetry.Config) conf := confmap.New() require.NoError(t, conf.Marshal(Config{ - Telemetry: defaultTelConfig, + Telemetry: fakeTelemetryConfig{}, })) assert.Equal(t, map[string]any{ "pipelines": map[string]any(nil), - "telemetry": map[string]any{ - "logs": map[string]any{ - "encoding": "console", - "level": "info", - "error_output_paths": []any{"stderr"}, - "output_paths": []any{"stderr"}, - "sampling": map[string]any{ - "enabled": true, - "initial": 10, - "thereafter": 100, - "tick": 10 * time.Second, - }, - }, - "metrics": map[string]any{ - "level": "Normal", - "readers": []any{ - map[string]any{ - "pull": map[string]any{ - "exporter": map[string]any{ - "prometheus": map[string]any{ - "host": "localhost", - "port": 8888, - "with_resource_constant_labels": map[string]any{ - "included": []any{}, - }, - "without_scope_info": true, - "without_type_suffix": true, - "without_units": true, - }, - }, - }, - }, - }, - }, - }, + "telemetry": map[string]any{"invalid": false}, }, conf.ToStringMap()) } func generateConfig() *Config { return &Config{ - Telemetry: otelconftelemetry.Config{ - Logs: otelconftelemetry.LogsConfig{ - Level: zapcore.DebugLevel, - Development: true, - Encoding: "console", - DisableCaller: true, - DisableStacktrace: true, - OutputPaths: []string{"stderr", "./output-logs"}, - ErrorOutputPaths: []string{"stderr", "./error-output-logs"}, - InitialFields: map[string]any{"fieldKey": "filed-value"}, - }, - Metrics: otelconftelemetry.MetricsConfig{ - Level: configtelemetry.LevelNormal, - MeterProvider: config.MeterProvider{ - Readers: []config.MetricReader{ - { - Pull: &config.PullMetricReader{Exporter: config.PullMetricExporter{Prometheus: &config.Prometheus{ - Host: newPtr("localhost"), - Port: newPtr(8080), - }}}, - }, - }, - }, - }, - }, Extensions: extensions.Config{component.MustNewID("nop")}, Pipelines: pipelines.Config{ pipeline.NewID(pipeline.SignalTraces): { @@ -163,3 +96,14 @@ func generateConfig() *Config { }, } } + +type fakeTelemetryConfig struct { + Invalid bool `mapstructure:"invalid"` +} + +func (cfg fakeTelemetryConfig) Validate() error { + if cfg.Invalid { + return errors.New("invalid config") + } + return nil +} diff --git a/service/go.mod b/service/go.mod index 15369079314..55c3c9d3b59 100644 --- a/service/go.mod +++ b/service/go.mod @@ -51,6 +51,7 @@ require ( go.opentelemetry.io/contrib/propagators/b3 v1.36.0 go.opentelemetry.io/otel v1.38.0 go.opentelemetry.io/otel/log v0.14.0 + go.opentelemetry.io/otel/log/logtest v0.14.0 go.opentelemetry.io/otel/metric v1.38.0 go.opentelemetry.io/otel/sdk v1.38.0 go.opentelemetry.io/otel/sdk/metric v1.38.0 @@ -77,6 +78,7 @@ require ( github.com/gobwas/glob v0.2.3 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/snappy v1.0.0 // indirect + github.com/google/go-cmp v0.7.0 // indirect github.com/google/go-tpm v0.9.5 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.1 // indirect github.com/hashicorp/go-version v1.7.0 // indirect diff --git a/service/service.go b/service/service.go index eb64c83f48c..d8e3b8b14df 100644 --- a/service/service.go +++ b/service/service.go @@ -37,7 +37,6 @@ import ( "go.opentelemetry.io/collector/service/internal/proctelemetry" "go.opentelemetry.io/collector/service/internal/status" "go.opentelemetry.io/collector/service/telemetry" - "go.opentelemetry.io/collector/service/telemetry/otelconftelemetry" ) // This feature gate is deprecated and will be removed in 1.40.0. Views can now be configured. @@ -94,6 +93,9 @@ type Settings struct { // LoggingOptions provides a way to change behavior of zap logging. LoggingOptions []zap.Option + + // TelemetryFactory is the factory for creating internal telemetry providers. + TelemetryFactory telemetry.Factory } // Service represents the implementation of a component.Host. @@ -125,16 +127,14 @@ func New(ctx context.Context, set Settings, cfg Config) (_ *Service, resultErr e collectorConf: set.CollectorConf, } - telemetryFactory := otelconftelemetry.NewFactory() - telemetrySettings := telemetry.Settings{BuildInfo: set.BuildInfo} - // Create the logger & LoggerProvider first. These may be used // when creating the other telemetry providers. + telemetrySettings := telemetry.Settings{BuildInfo: set.BuildInfo} loggerSettings := telemetry.LoggerSettings{ Settings: telemetrySettings, ZapOptions: set.LoggingOptions, } - logger, loggerProvider, err := telemetryFactory.CreateLogger(ctx, loggerSettings, &cfg.Telemetry) + logger, loggerProvider, err := set.TelemetryFactory.CreateLogger(ctx, loggerSettings, cfg.Telemetry) if err != nil { return nil, fmt.Errorf("failed to create logger: %w", err) } @@ -161,14 +161,11 @@ func New(ctx context.Context, set Settings, cfg Config) (_ *Service, resultErr e })) meterSettings := telemetry.MeterSettings{ - Settings: telemetrySettings, - Logger: logger, - } - mpConfig := &cfg.Telemetry.Metrics.MeterProvider - if mpConfig.Views == nil { - mpConfig.Views = configureViews(cfg.Telemetry.Metrics.Level) + Settings: telemetrySettings, + Logger: logger, + DefaultViews: configureViews, } - meterProvider, err := telemetryFactory.CreateMeterProvider(ctx, meterSettings, &cfg.Telemetry) + meterProvider, err := set.TelemetryFactory.CreateMeterProvider(ctx, meterSettings, cfg.Telemetry) if err != nil { return nil, fmt.Errorf("failed to create meter provider: %w", err) } @@ -183,7 +180,7 @@ func New(ctx context.Context, set Settings, cfg Config) (_ *Service, resultErr e Settings: telemetrySettings, Logger: logger, } - tracerProvider, err := telemetryFactory.CreateTracerProvider(ctx, tracerSettings, &cfg.Telemetry) + tracerProvider, err := set.TelemetryFactory.CreateTracerProvider(ctx, tracerSettings, cfg.Telemetry) if err != nil { return nil, fmt.Errorf("failed to create tracer provider: %w", err) } @@ -194,7 +191,7 @@ func New(ctx context.Context, set Settings, cfg Config) (_ *Service, resultErr e }() srv.tracerProvider = tracerProvider - resource, err := telemetryFactory.CreateResource(ctx, telemetrySettings, &cfg.Telemetry) + resource, err := set.TelemetryFactory.CreateResource(ctx, telemetrySettings, cfg.Telemetry) if err != nil { return nil, fmt.Errorf("failed to create resource: %w", err) } diff --git a/service/service_test.go b/service/service_test.go index 4c0d0ce3902..00478e55aea 100644 --- a/service/service_test.go +++ b/service/service_test.go @@ -4,22 +4,23 @@ package service import ( - "bufio" "context" "errors" - "fmt" - "io" "net/http" - "net/http/httptest" - "strings" "testing" "time" - "github.com/prometheus/common/expfmt" - "github.com/prometheus/common/model" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - config "go.opentelemetry.io/contrib/otelconf/v0.3.0" + otelconf "go.opentelemetry.io/contrib/otelconf/v0.3.0" + "go.opentelemetry.io/otel/log" + "go.opentelemetry.io/otel/log/logtest" + nooplog "go.opentelemetry.io/otel/log/noop" + noopmetric "go.opentelemetry.io/otel/metric/noop" + "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + nooptrace "go.opentelemetry.io/otel/trace/noop" "go.uber.org/zap" "go.uber.org/zap/zapcore" "go.uber.org/zap/zaptest/observer" @@ -33,154 +34,20 @@ import ( "go.opentelemetry.io/collector/extension/zpagesextension" "go.opentelemetry.io/collector/internal/testutil" "go.opentelemetry.io/collector/pdata/pcommon" - "go.opentelemetry.io/collector/pdata/plog" - "go.opentelemetry.io/collector/pdata/plog/plogotlp" "go.opentelemetry.io/collector/pipeline" "go.opentelemetry.io/collector/pipeline/xpipeline" "go.opentelemetry.io/collector/service/extensions" "go.opentelemetry.io/collector/service/internal/builders" - "go.opentelemetry.io/collector/service/internal/promtest" "go.opentelemetry.io/collector/service/pipelines" + "go.opentelemetry.io/collector/service/telemetry" "go.opentelemetry.io/collector/service/telemetry/otelconftelemetry" + "go.opentelemetry.io/collector/service/telemetry/telemetrytest" ) -type labelState int - const ( - labelNotPresent labelState = iota - labelSpecificValue - labelAnyValue -) - -type labelValue struct { - label string - state labelState -} - -type ownMetricsTestCase struct { - name string - userDefinedResource map[string]*string - expectedLabels map[string]labelValue -} - -var ( - testResourceAttrValue = "resource_attr_test_value" - testInstanceID = "test_instance_id" - testServiceVersion = "2022-05-20" - testServiceName = "test name" + otelCommand = "otelcoltest" ) -// prometheusToOtelConv is used to check that the expected resource labels exist as -// part of the otel resource attributes. -var prometheusToOtelConv = map[string]string{ - "service_instance_id": "service.instance.id", - "service_name": "service.name", - "service_version": "service.version", -} - -const ( - metricsVersion = "test version" - otelCommand = "otelcoltest" -) - -func ownMetricsTestCases() []ownMetricsTestCase { - return []ownMetricsTestCase{ - { - name: "no resource", - userDefinedResource: nil, - // All labels added to all collector metrics by default are listed below. - // These labels are hard coded here in order to avoid inadvertent changes: - // at this point changing labels should be treated as a breaking changing - // and requires a good justification. The reason is that changes to metric - // names or labels can break alerting, dashboards, etc that are used to - // monitor the Collector in production deployments. - expectedLabels: map[string]labelValue{ - "service_instance_id": {state: labelAnyValue}, - "service_name": {label: otelCommand, state: labelSpecificValue}, - "service_version": {label: metricsVersion, state: labelSpecificValue}, - }, - }, - { - name: "resource with custom attr", - userDefinedResource: map[string]*string{ - "custom_resource_attr": &testResourceAttrValue, - }, - expectedLabels: map[string]labelValue{ - "service_instance_id": {state: labelAnyValue}, - "service_name": {label: otelCommand, state: labelSpecificValue}, - "service_version": {label: metricsVersion, state: labelSpecificValue}, - "custom_resource_attr": {label: "resource_attr_test_value", state: labelSpecificValue}, - }, - }, - { - name: "override service.name", - userDefinedResource: map[string]*string{ - "service.name": &testServiceName, - }, - expectedLabels: map[string]labelValue{ - "service_instance_id": {state: labelAnyValue}, - "service_name": {label: testServiceName, state: labelSpecificValue}, - "service_version": {label: metricsVersion, state: labelSpecificValue}, - }, - }, - { - name: "suppress service.name", - userDefinedResource: map[string]*string{ - "service.name": nil, - }, - expectedLabels: map[string]labelValue{ - "service_instance_id": {state: labelAnyValue}, - "service_name": {state: labelNotPresent}, - "service_version": {label: metricsVersion, state: labelSpecificValue}, - }, - }, - { - name: "override service.instance.id", - userDefinedResource: map[string]*string{ - "service.instance.id": &testInstanceID, - }, - expectedLabels: map[string]labelValue{ - "service_instance_id": {label: "test_instance_id", state: labelSpecificValue}, - "service_name": {label: otelCommand, state: labelSpecificValue}, - "service_version": {label: metricsVersion, state: labelSpecificValue}, - }, - }, - { - name: "suppress service.instance.id", - userDefinedResource: map[string]*string{ - "service.instance.id": nil, // nil value in config is used to suppress attributes. - }, - expectedLabels: map[string]labelValue{ - "service_instance_id": {state: labelNotPresent}, - "service_name": {label: otelCommand, state: labelSpecificValue}, - "service_version": {label: metricsVersion, state: labelSpecificValue}, - }, - }, - { - name: "override service.version", - userDefinedResource: map[string]*string{ - "service.version": &testServiceVersion, - }, - expectedLabels: map[string]labelValue{ - "service_instance_id": {state: labelAnyValue}, - "service_name": {label: otelCommand, state: labelSpecificValue}, - "service_version": {label: "2022-05-20", state: labelSpecificValue}, - }, - }, - { - name: "suppress service.version", - userDefinedResource: map[string]*string{ - "service.version": nil, // nil value in config is used to suppress attributes. - }, - expectedLabels: map[string]labelValue{ - "service_instance_id": {state: labelAnyValue}, - "service_name": {label: otelCommand, state: labelSpecificValue}, - "service_version": {state: labelNotPresent}, - }, - }, - } -} - var ( nopType = component.MustNewType("nop") wrongType = component.MustNewType("wrong") @@ -273,50 +140,18 @@ func TestServiceTelemetryCleanupOnError(t *testing.T) { } func TestServiceTelemetryLogging(t *testing.T) { - // Create a server for receiving OTLP logs. - var received []plog.Logs - mux := http.NewServeMux() - mux.HandleFunc("/v1/logs", func(_ http.ResponseWriter, req *http.Request) { - body, err := io.ReadAll(req.Body) - assert.NoError(t, err) - - exportRequest := plogotlp.NewExportRequest() - assert.NoError(t, exportRequest.UnmarshalProto(body)) - received = append(received, exportRequest.Logs()) - }) - httpServer := httptest.NewServer(mux) - defer httpServer.Close() - - // We'll divert zap logs to an observer. observerCore, observedLogs := observer.New(zapcore.WarnLevel) + zapLogger := zap.New(observerCore) + recorder := logtest.NewRecorder() set := newNopSettings() set.BuildInfo = component.BuildInfo{Version: "test version", Command: otelCommand} - set.LoggingOptions = []zap.Option{ - zap.WrapCore(func(zapcore.Core) zapcore.Core { - return observerCore - }), - } + set.TelemetryFactory = telemetry.NewFactory( + func() component.Config { return nil }, + telemetrytest.WithLogger(zapLogger, recorder), + ) cfg := newNopConfig() - cfg.Telemetry.Logs.Sampling = &otelconftelemetry.LogsSamplingConfig{ - Enabled: true, - Tick: time.Minute, - Initial: 2, - Thereafter: 0, - } - cfg.Telemetry.Logs.Processors = []config.LogRecordProcessor{{ - Simple: &config.SimpleLogRecordProcessor{ - Exporter: config.LogRecordExporter{ - OTLP: &config.OTLP{ - Endpoint: ptr(httpServer.URL), - Protocol: ptr("http/protobuf"), - Insecure: ptr(true), - }, - }, - }, - }} - srv, err := New(context.Background(), set, cfg) require.NoError(t, err) require.NoError(t, srv.Start(context.Background())) @@ -324,85 +159,117 @@ func TestServiceTelemetryLogging(t *testing.T) { assert.NoError(t, srv.Shutdown(context.Background())) }() - // The level we configured on the initial Zap logger should have - // propagated to the final one provided to components. require.NotNil(t, srv.telemetrySettings.Logger) + assert.Equal(t, srv.telemetrySettings.Logger, srv.Logger()) assert.Equal(t, zapcore.WarnLevel, srv.telemetrySettings.Logger.Level()) - - // Log 5 messages at different levels. Only the warning messages should - // be accepted, and only 2 of those due to sampling. - for i := 0; i < 5; i++ { - srv.telemetrySettings.Logger.Warn("warn_message") - srv.telemetrySettings.Logger.Info("info_message") - srv.telemetrySettings.Logger.Debug("debug_message") - } - assert.Equal(t, 2, observedLogs.Len()) - assert.Equal(t, 2, observedLogs.FilterMessage("warn_message").Len()) - require.Len(t, received, 2) - for _, logs := range received { - assert.Equal(t, - "warn_message", - logs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Body().AsString(), - ) - } + srv.telemetrySettings.Logger.Warn("warn_message") + srv.telemetrySettings.Logger.Info("info_message") + + entries := observedLogs.All() + require.Len(t, entries, 1) + assert.Equal(t, "warn_message", entries[0].Message) + + logtest.AssertEqual(t, logtest.Recording{ + logtest.Scope{ + Name: "go.opentelemetry.io/collector/service", + }: []logtest.Record{{ + Context: context.Background(), + Timestamp: time.Time{}, + Severity: log.SeverityWarn, + SeverityText: "warn", + Body: log.StringValue("warn_message"), + Attributes: []log.KeyValue{}, + }}, + }, recorder.Result(), + logtest.Transform(func(recording logtest.Recording) logtest.Recording { + // Remove empty scopes. + newRecording := make(logtest.Recording) + for scope, records := range recording { + if len(records) != 0 { + newRecording[scope] = records + } + } + return newRecording + }), + logtest.Transform(func(record logtest.Record) logtest.Record { + // Clear timestamp and any attributes for easier testing. + record.Timestamp = time.Time{} + record.Attributes = nil + return record + }), + ) } func TestServiceTelemetryMetrics(t *testing.T) { - for _, tc := range ownMetricsTestCases() { - t.Run("ipv4_"+tc.name, func(t *testing.T) { - testCollectorStartHelperWithReaders(t, tc, promtest.GetAvailableLocalAddressPrometheus(t)) - }) - t.Run("ipv6_"+tc.name, func(t *testing.T) { - testCollectorStartHelperWithReaders(t, tc, promtest.GetAvailableLocalIPv6AddressPrometheus(t)) - }) - } -} - -func testCollectorStartHelperWithReaders(t *testing.T, tc ownMetricsTestCase, metricsAddr *config.Prometheus) { - set := newNopSettings() - set.BuildInfo = component.BuildInfo{Version: "test version", Command: otelCommand} - - cfg := newNopConfig() - cfg.Telemetry.Metrics.Readers = []config.MetricReader{ - { - Pull: &config.PullMetricReader{ - Exporter: config.PullMetricExporter{ - Prometheus: metricsAddr, - }, - }, - }, - } - cfg.Telemetry.Resource = make(map[string]*string) - // Include resource attributes under the service::telemetry::resource key. - for k, v := range tc.userDefinedResource { - cfg.Telemetry.Resource[k] = v - } - // Start a service and check that metrics are produced as expected. // We do this twice to ensure that the server is stopped cleanly. for i := 0; i < 2; i++ { - srv, err := New(context.Background(), set, cfg) - require.NoError(t, err) + reader := metric.NewManualReader() + set := newNopSettings() + set.TelemetryFactory = telemetry.NewFactory( + func() component.Config { return nil }, + telemetrytest.WithMeterProvider( + metric.NewMeterProvider( + metric.WithReader(reader), + ), + ), + ) + srv, err := New(context.Background(), set, newNopConfig()) + require.NoError(t, err) require.NoError(t, srv.Start(context.Background())) - // Wait for the HTTP server to start. - promHost := fmt.Sprintf("%s:%d", *metricsAddr.Host, *metricsAddr.Port) - require.Eventually(t, func() bool { - resp, err := http.Get("http://" + promHost + "/metrics") - if err != nil { - return false - } - defer resp.Body.Close() - return resp.StatusCode == http.StatusOK - }, 10*time.Second, 50*time.Millisecond) + var rm metricdata.ResourceMetrics + err = reader.Collect(context.Background(), &rm) + require.NoError(t, err) - assertResourceLabels(t, srv.telemetrySettings.Resource, tc.expectedLabels) - assertMetrics(t, promHost, tc.expectedLabels) + assertMetrics(t, rm) require.NoError(t, srv.Shutdown(context.Background())) } } +func assertMetrics(t *testing.T, rm metricdata.ResourceMetrics) { + require.Len(t, rm.ScopeMetrics, 1) + assert.Equal(t, "go.opentelemetry.io/collector/service", rm.ScopeMetrics[0].Scope.Name) + + actualNames := make([]string, len(rm.ScopeMetrics[0].Metrics)) + for i, m := range rm.ScopeMetrics[0].Metrics { + actualNames[i] = m.Name + } + assert.ElementsMatch(t, []string{ + "otelcol_process_cpu_seconds", + "otelcol_process_memory_rss", + "otelcol_process_runtime_heap_alloc_bytes", + "otelcol_process_runtime_total_alloc_bytes", + "otelcol_process_runtime_total_sys_memory_bytes", + "otelcol_process_uptime", + }, actualNames) +} + +func TestServiceTelemetryDefaultViews(t *testing.T) { + var views []otelconf.View + set := newNopSettings() + set.TelemetryFactory = telemetry.NewFactory( + func() component.Config { return nil }, + telemetry.WithCreateMeterProvider( + func(_ context.Context, set telemetry.MeterSettings, _ component.Config) (telemetry.MeterProvider, error) { + views = set.DefaultViews(configtelemetry.LevelBasic) + return telemetrytest.ShutdownMeterProvider{ + MeterProvider: noopmetric.NewMeterProvider(), + }, nil + }, + ), + ) + + srv, err := New(context.Background(), set, newNopConfig()) + require.NoError(t, err) + require.NoError(t, srv.Start(context.Background())) + defer func() { + assert.NoError(t, srv.Shutdown(context.Background())) + }() + require.NotEmpty(t, views) +} + // TestServiceTelemetryZPages verifies that the zpages extension works correctly with servce telemetry. func TestServiceTelemetryZPages(t *testing.T) { t.Run("ipv4", func(t *testing.T) { @@ -427,7 +294,15 @@ func testZPages(t *testing.T, zpagesAddr string) { cfg := newNopConfig() cfg.Extensions = []component.ID{component.MustNewID("zpages")} - cfg.Telemetry.Logs.Level = zapcore.FatalLevel // disable logs + + // The zpages extension will register/unregister a span processor with + // the tracer provider if it implements the RegisterSpanProcessor and + // UnregisterSpanProcessor methods of the opentelemetry-go SDK implementation. + // Hence we use sdktrace below, rather than the noop tracer provider. + set.TelemetryFactory = telemetry.NewFactory( + func() component.Config { return nil }, + telemetrytest.WithTracerProvider(sdktrace.NewTracerProvider()), + ) // Start a service and check that zpages is healthy. // We do this twice to ensure that the server is stopped cleanly. @@ -467,105 +342,90 @@ func zpagesHealthy(zpagesAddr string) bool { return true } -// TestServiceTelemetryRestart tests that the service correctly restarts the telemetry server. +// TestServiceTelemetryRestart tests that the service starts and shuts down telemetry as expected. func TestServiceTelemetryRestart(t *testing.T) { - metricsAddr := promtest.GetAvailableLocalAddressPrometheus(t) - cfg := newNopConfig() - cfg.Telemetry.Metrics.Readers = []config.MetricReader{ - { - Pull: &config.PullMetricReader{ - Exporter: config.PullMetricExporter{ - Prometheus: metricsAddr, - }, - }, - }, - } - // Create a service - srvOne, err := New(context.Background(), newNopSettings(), cfg) - require.NoError(t, err) - - // URL of the telemetry service metrics endpoint - telemetryURL := fmt.Sprintf("http://%s:%d/metrics", *metricsAddr.Host, *metricsAddr.Port) + telemetryCreated := make(chan struct{}, 1) + telemetryShutdown := make(chan struct{}, 1) - // Start the service - require.NoError(t, srvOne.Start(context.Background())) - - // check telemetry server to ensure we get a response - var resp *http.Response - - //nolint:gosec - resp, err = http.Get(telemetryURL) - assert.NoError(t, err) - assert.NoError(t, resp.Body.Close()) - assert.Equal(t, http.StatusOK, resp.StatusCode) - // Response body must be closed now instead of defer as the test - // restarts the server on the same port. Leaving response open - // leaks a goroutine. - resp.Body.Close() - - // Shutdown the service - require.NoError(t, srvOne.Shutdown(context.Background())) - - // Create a new service with the same telemetry - srvTwo, err := New(context.Background(), newNopSettings(), cfg) - require.NoError(t, err) - - // Start the new service - require.NoError(t, srvTwo.Start(context.Background())) - - // check telemetry server to ensure we get a response - require.Eventually(t, - func() bool { - //nolint:gosec - resp, err = http.Get(telemetryURL) - assert.NoError(t, resp.Body.Close()) - return err == nil - }, - 500*time.Millisecond, - 100*time.Millisecond, - "Must get a valid response from the service", + set := newNopSettings() + set.TelemetryFactory = telemetry.NewFactory( + func() component.Config { return nil }, + telemetry.WithCreateTracerProvider( + func(context.Context, telemetry.TracerSettings, component.Config) (telemetry.TracerProvider, error) { + telemetryCreated <- struct{}{} + return telemetrytest.ShutdownTracerProvider{ + TracerProvider: nooptrace.NewTracerProvider(), + ShutdownFunc: func(context.Context) error { + telemetryShutdown <- struct{}{} + return nil + }, + }, nil + }, + ), ) - defer resp.Body.Close() - assert.Equal(t, http.StatusOK, resp.StatusCode) - // Shutdown the new service - assert.NoError(t, srvTwo.Shutdown(context.Background())) + for i := 0; i < 2; i++ { + // Create and start a service, telemetry should be created. + srv, err := New(context.Background(), set, newNopConfig()) + require.NoError(t, err) + require.NoError(t, srv.Start(context.Background())) + <-telemetryCreated + + // Shutdown the service, telemetry should be shutdown. + require.NoError(t, srv.Shutdown(context.Background())) + <-telemetryShutdown + } } func TestServiceTelemetryShutdownError(t *testing.T) { - cfg := newNopConfig() - cfg.Telemetry.Logs.Level = zapcore.DebugLevel - cfg.Telemetry.Logs.Processors = []config.LogRecordProcessor{{ - Batch: &config.BatchLogRecordProcessor{ - Exporter: config.LogRecordExporter{ - OTLP: &config.OTLP{ - Protocol: ptr("http/protobuf"), - Endpoint: ptr("http://testing.invalid"), - }, - }, - }, - }} - cfg.Telemetry.Metrics.Level = configtelemetry.LevelDetailed - cfg.Telemetry.Metrics.Readers = []config.MetricReader{{ - Periodic: &config.PeriodicMetricReader{ - Exporter: config.PushMetricExporter{ - OTLP: &config.OTLPMetric{ - Protocol: ptr("http/protobuf"), - Endpoint: ptr("http://testing.invalid"), - }, + set := newNopSettings() + set.TelemetryFactory = telemetry.NewFactory( + func() component.Config { return nil }, + telemetry.WithCreateLogger( + func(context.Context, telemetry.LoggerSettings, component.Config) (*zap.Logger, telemetry.LoggerProvider, error) { + return zap.NewNop(), telemetrytest.ShutdownLoggerProvider{ + LoggerProvider: nooplog.NewLoggerProvider(), + ShutdownFunc: func(context.Context) error { + return errors.New("an exception occurred") + }, + }, nil + }, + ), + telemetry.WithCreateMeterProvider( + func(context.Context, telemetry.MeterSettings, component.Config) (telemetry.MeterProvider, error) { + return telemetrytest.ShutdownMeterProvider{ + MeterProvider: noopmetric.NewMeterProvider(), + ShutdownFunc: func(context.Context) error { + return errors.New("an exception occurred") + }, + }, nil + }, + ), + telemetry.WithCreateTracerProvider( + func(context.Context, telemetry.TracerSettings, component.Config) (telemetry.TracerProvider, error) { + return telemetrytest.ShutdownTracerProvider{ + TracerProvider: nooptrace.NewTracerProvider(), + ShutdownFunc: func(context.Context) error { + return errors.New("an exception occurred") + }, + }, nil }, - }, - }} + ), + ) // Create and start a service - srv, err := New(context.Background(), newNopSettings(), cfg) + cfg := newNopConfig() + srv, err := New(context.Background(), set, cfg) require.NoError(t, err) require.NoError(t, srv.Start(context.Background())) // Shutdown the service err = srv.Shutdown(context.Background()) - require.ErrorContains(t, err, `failed to shutdown logger provider`) - require.ErrorContains(t, err, `failed to shutdown meter provider`) + assert.EqualError(t, err, ""+ + "failed to shutdown tracer provider: an exception occurred; "+ + "failed to shutdown meter provider: an exception occurred; "+ + "failed to shutdown logger provider: an exception occurred", + ) } func TestExtensionNotificationFailure(t *testing.T) { @@ -644,171 +504,66 @@ func TestServiceFatalError(t *testing.T) { require.ErrorIs(t, err, assert.AnError) } -func TestServiceInvalidTelemetryConfiguration(t *testing.T) { - tests := []struct { - name string - wantErr string - cfg otelconftelemetry.Config - }{ - { - name: "log config with processors and invalid config", - cfg: func() otelconftelemetry.Config { - cfg := otelconftelemetry.NewFactory().CreateDefaultConfig().(*otelconftelemetry.Config) - cfg.Logs.Level = zapcore.FatalLevel - cfg.Logs.Processors = []config.LogRecordProcessor{{ - Batch: &config.BatchLogRecordProcessor{ - Exporter: config.LogRecordExporter{ - OTLP: &config.OTLP{}, - }, - }, - }} - return *cfg - }(), - wantErr: "failed to create logger: no valid log exporter", +func TestServiceTelemetryCreateProvidersError(t *testing.T) { + loggerOpt := telemetry.WithCreateLogger( + func(context.Context, telemetry.LoggerSettings, component.Config) (*zap.Logger, telemetry.LoggerProvider, error) { + return nil, nil, errors.New("something went wrong") }, - { - name: "invalid metric reader", - cfg: func() otelconftelemetry.Config { - cfg := otelconftelemetry.NewFactory().CreateDefaultConfig().(*otelconftelemetry.Config) - cfg.Logs.Level = zapcore.FatalLevel - cfg.Metrics.Level = configtelemetry.LevelDetailed - cfg.Metrics.Readers = []config.MetricReader{{ - Periodic: &config.PeriodicMetricReader{ - Exporter: config.PushMetricExporter{ - OTLP: &config.OTLPMetric{}, - }, - }, - }} - return *cfg - }(), - wantErr: "failed to create meter provider: no valid metric exporter", + ) + meterOpt := telemetry.WithCreateMeterProvider( + func(context.Context, telemetry.MeterSettings, component.Config) (telemetry.MeterProvider, error) { + return nil, errors.New("something went wrong") }, - { - name: "invalid trace exporter", - cfg: func() otelconftelemetry.Config { - cfg := otelconftelemetry.NewFactory().CreateDefaultConfig().(*otelconftelemetry.Config) - cfg.Logs.Level = zapcore.FatalLevel - cfg.Traces.Level = configtelemetry.LevelDetailed - cfg.Traces.Processors = []config.SpanProcessor{{ - Batch: &config.BatchSpanProcessor{ - Exporter: config.SpanExporter{ - OTLP: &config.OTLP{}, - }, - }, - }} - return *cfg - }(), - wantErr: "failed to create tracer provider: no valid span exporter", + ) + tracerOpt := telemetry.WithCreateTracerProvider( + func(context.Context, telemetry.TracerSettings, component.Config) (telemetry.TracerProvider, error) { + return nil, errors.New("something went wrong") + }, + ) + resourceOpt := telemetry.WithCreateResource( + func(context.Context, telemetry.Settings, component.Config) (pcommon.Resource, error) { + return pcommon.Resource{}, errors.New("something went wrong") }, + ) + + type testcase struct { + opts []telemetry.FactoryOption + expectedErr string } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { + for name, tc := range map[string]testcase{ + "CreateLogger": { + opts: []telemetry.FactoryOption{loggerOpt, meterOpt, tracerOpt, resourceOpt}, + expectedErr: "failed to create logger: something went wrong", + }, + "CreateMeterProvider": { + opts: []telemetry.FactoryOption{meterOpt, tracerOpt, resourceOpt}, + expectedErr: "failed to create meter provider: something went wrong", + }, + "CreateTracerProvider": { + opts: []telemetry.FactoryOption{tracerOpt, resourceOpt}, + expectedErr: "failed to create tracer provider: something went wrong", + }, + "CreateResource": { + opts: []telemetry.FactoryOption{resourceOpt}, + expectedErr: "failed to create resource: something went wrong", + }, + } { + t.Run(name, func(t *testing.T) { set := newNopSettings() - set.AsyncErrorChannel = make(chan error) - - cfg := newNopConfig() - cfg.Telemetry = tt.cfg - _, err := New(context.Background(), set, cfg) - if tt.wantErr != "" { - require.EqualError(t, err, tt.wantErr) - } else { - require.NoError(t, err) - } + set.TelemetryFactory = telemetry.NewFactory(func() component.Config { return nil }, tc.opts...) + _, err := New(context.Background(), set, newNopConfig()) + require.EqualError(t, err, tc.expectedErr) }) } } -func assertResourceLabels(t *testing.T, res pcommon.Resource, expectedLabels map[string]labelValue) { - for key, labelValue := range expectedLabels { - lookupKey, ok := prometheusToOtelConv[key] - if !ok { - lookupKey = key - } - value, ok := res.Attributes().Get(lookupKey) - switch labelValue.state { - case labelNotPresent: - assert.False(t, ok) - case labelAnyValue: - assert.True(t, ok) - default: - assert.Equal(t, labelValue.label, value.AsString()) - } - } -} - -func assertMetrics(t *testing.T, metricsAddr string, expectedLabels map[string]labelValue) { - client := &http.Client{} - resp, err := client.Get("http://" + metricsAddr + "/metrics") - require.NoError(t, err) - - t.Cleanup(func() { - assert.NoError(t, resp.Body.Close()) - }) - reader := bufio.NewReader(resp.Body) - - parser := expfmt.NewTextParser(model.UTF8Validation) - parsed, err := parser.TextToMetricFamilies(reader) - require.NoError(t, err) - - prefix := "otelcol" - expectedMetrics := map[string]bool{ - "target_info": false, - "otelcol_process_memory_rss": false, - "otelcol_process_cpu_seconds": false, - "otelcol_process_runtime_total_sys_memory_bytes": false, - "otelcol_process_runtime_heap_alloc_bytes": false, - "otelcol_process_runtime_total_alloc_bytes": false, - "otelcol_process_uptime": false, - "promhttp_metric_handler_errors_total": false, - } - for metricName, metricFamily := range parsed { - if _, ok := expectedMetrics[metricName]; !ok { - require.True(t, ok, "unexpected metric: %s", metricName) - } - expectedMetrics[metricName] = true - if metricName == "promhttp_metric_handler_errors_total" { - continue - } - if metricName != "target_info" { - // require is used here so test fails with a single message. - require.True( - t, - strings.HasPrefix(metricName, prefix), - "expected prefix %q but string starts with %q", - prefix, - metricName[:len(prefix)+1]+"...") - } - - for _, metric := range metricFamily.Metric { - labelMap := map[string]string{} - for _, labelPair := range metric.Label { - labelMap[*labelPair.Name] = *labelPair.Value - } - - for k, v := range expectedLabels { - switch v.state { - case labelNotPresent: - _, present := labelMap[k] - assert.Falsef(t, present, "label %q must not be present", k) - case labelSpecificValue: - require.Equalf(t, v.label, labelMap[k], "mandatory label %q value mismatch", k) - case labelAnyValue: - assert.NotEmptyf(t, labelMap[k], "mandatory label %q not present", k) - } - } - } - } - for k, val := range expectedMetrics { - require.True(t, val, "missing metric: %s", k) - } -} - func newNopSettings() Settings { receiversConfigs, receiversFactories := builders.NewNopReceiverConfigsAndFactories() processorsConfigs, processorsFactories := builders.NewNopProcessorConfigsAndFactories() connectorsConfigs, connectorsFactories := builders.NewNopConnectorConfigsAndFactories() exportersConfigs, exportersFactories := builders.NewNopExporterConfigsAndFactories() extensionsConfigs, extensionsFactories := builders.NewNopExtensionConfigsAndFactories() + telemetryFactory := telemetry.NewFactory(func() component.Config { return nil }) return Settings{ BuildInfo: component.NewDefaultBuildInfo(), @@ -824,6 +579,7 @@ func newNopSettings() Settings { ExtensionsConfigs: extensionsConfigs, ExtensionsFactories: extensionsFactories, AsyncErrorChannel: make(chan error), + TelemetryFactory: telemetryFactory, } } @@ -907,10 +663,6 @@ func newConfigWatcherExtensionFactory(name component.Type) extension.Factory { ) } -func newPtr[T int | string](str T) *T { - return &str -} - func TestValidateGraph(t *testing.T) { testCases := map[string]struct { connectorCfg map[component.ID]component.Config diff --git a/service/telemetry/telemetrytest/providers.go b/service/telemetry/telemetrytest/providers.go new file mode 100644 index 00000000000..d254d9f162c --- /dev/null +++ b/service/telemetry/telemetrytest/providers.go @@ -0,0 +1,96 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package telemetrytest // import "go.opentelemetry.io/collector/service/telemetry/telemetrytest" + +import ( + "context" + + "go.opentelemetry.io/otel/log" + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/trace" + "go.uber.org/zap" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/service/telemetry" +) + +// WithResource returns a telemetry.FactoryOption that configures the +// factory's CreateResource method to return res. +func WithResource(res pcommon.Resource) telemetry.FactoryOption { + return telemetry.WithCreateResource( + func(context.Context, telemetry.Settings, component.Config) (pcommon.Resource, error) { + return res, nil + }, + ) +} + +// WithLogger returns a telemetry.FactoryOption that configures the +// factory's CreateLogger method to return logger and provider. +// If provider does not implement the Shutdown method, it will be +// wrapped with ShutdownLoggerProvider with a no-op shutdown func. +func WithLogger(logger *zap.Logger, provider log.LoggerProvider) telemetry.FactoryOption { + return telemetry.WithCreateLogger( + func(context.Context, telemetry.LoggerSettings, component.Config) ( + *zap.Logger, telemetry.LoggerProvider, error, + ) { + withShutdown, ok := provider.(telemetry.LoggerProvider) + if !ok { + withShutdown = ShutdownLoggerProvider{LoggerProvider: provider} + } + return logger, withShutdown, nil + }, + ) +} + +// WithMeterProvider returns a telemetry.FactoryOption that configures the +// factory's CreateMeterProvider method to return provider. If provider does +// not implement the Shutdown method, it will be wrapped with +// ShutdownMeterProvider with a no-op shutdown func. +func WithMeterProvider(provider metric.MeterProvider) telemetry.FactoryOption { + return telemetry.WithCreateMeterProvider( + func(context.Context, telemetry.MeterSettings, component.Config) ( + telemetry.MeterProvider, error, + ) { + withShutdown, ok := provider.(telemetry.MeterProvider) + if !ok { + withShutdown = ShutdownMeterProvider{MeterProvider: provider} + } + return withShutdown, nil + }, + ) +} + +// WithTracerProvider returns a telemetry.FactoryOption that configures the +// factory's CreateTracerProvider method to return provider. If provider does +// not implement the Shutdown method, it will be wrapped with +// ShutdownTracerProvider with a no-op shutdown func. +func WithTracerProvider(provider trace.TracerProvider) telemetry.FactoryOption { + return telemetry.WithCreateTracerProvider( + func(context.Context, telemetry.TracerSettings, component.Config) ( + telemetry.TracerProvider, error, + ) { + withShutdown, ok := provider.(telemetry.TracerProvider) + if !ok { + withShutdown = ShutdownTracerProvider{TracerProvider: provider} + } + return withShutdown, nil + }, + ) +} + +type ShutdownLoggerProvider struct { + log.LoggerProvider + component.ShutdownFunc +} + +type ShutdownMeterProvider struct { + metric.MeterProvider + component.ShutdownFunc +} + +type ShutdownTracerProvider struct { + trace.TracerProvider + component.ShutdownFunc +} diff --git a/service/telemetry/telemetrytest/providers_test.go b/service/telemetry/telemetrytest/providers_test.go new file mode 100644 index 00000000000..3d5ade08646 --- /dev/null +++ b/service/telemetry/telemetrytest/providers_test.go @@ -0,0 +1,123 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package telemetrytest // import "go.opentelemetry.io/collector/service/telemetry/telemetrytest" + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/log" + nooplog "go.opentelemetry.io/otel/log/noop" + "go.opentelemetry.io/otel/metric" + noopmetric "go.opentelemetry.io/otel/metric/noop" + "go.opentelemetry.io/otel/trace" + nooptrace "go.opentelemetry.io/otel/trace/noop" + "go.uber.org/zap" + + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/service/telemetry" +) + +func TestWithResource(t *testing.T) { + res := pcommon.NewResource() + res.Attributes().PutStr("key", "value") + + factory := telemetry.NewFactory(nil, WithResource(res)) + createdResource, err := factory.CreateResource(context.Background(), telemetry.Settings{}, nil) + require.NoError(t, err) + assert.Equal(t, res, createdResource) +} + +func TestWithLogger(t *testing.T) { + test := func(t *testing.T, provider log.LoggerProvider, expectedProvider telemetry.LoggerProvider) { + logger := zap.NewNop() + factory := telemetry.NewFactory(nil, WithLogger(logger, provider)) + createdLogger, createdProvider, err := factory.CreateLogger(context.Background(), telemetry.LoggerSettings{}, nil) + require.NoError(t, err) + assert.Same(t, logger, createdLogger) + assert.Equal(t, expectedProvider, createdProvider) + } + t.Run("Without Shutdown method", func(t *testing.T) { + provider := nooplog.NewLoggerProvider() + test(t, provider, ShutdownLoggerProvider{LoggerProvider: provider}) + }) + t.Run("With Shutdown method", func(t *testing.T) { + provider := new(struct{ telemetry.LoggerProvider }) + test(t, provider, provider) + }) +} + +func TestWithMeterProvider(t *testing.T) { + test := func(t *testing.T, provider metric.MeterProvider, expected telemetry.MeterProvider) { + factory := telemetry.NewFactory(nil, WithMeterProvider(provider)) + createdProvider, err := factory.CreateMeterProvider(context.Background(), telemetry.MeterSettings{}, nil) + require.NoError(t, err) + assert.Equal(t, expected, createdProvider) + } + t.Run("Without Shutdown method", func(t *testing.T) { + provider := noopmetric.NewMeterProvider() + test(t, provider, ShutdownMeterProvider{MeterProvider: provider}) + }) + t.Run("With Shutdown method", func(t *testing.T) { + provider := new(struct{ telemetry.MeterProvider }) + test(t, provider, provider) + }) +} + +func TestWithTracerProvider(t *testing.T) { + test := func(t *testing.T, provider trace.TracerProvider, expected telemetry.TracerProvider) { + factory := telemetry.NewFactory(nil, WithTracerProvider(provider)) + createdProvider, err := factory.CreateTracerProvider(context.Background(), telemetry.TracerSettings{}, nil) + require.NoError(t, err) + assert.Equal(t, expected, createdProvider) + } + t.Run("Without Shutdown method", func(t *testing.T) { + provider := nooptrace.NewTracerProvider() + test(t, provider, ShutdownTracerProvider{TracerProvider: provider}) + }) + t.Run("With Shutdown method", func(t *testing.T) { + provider := new(struct{ telemetry.TracerProvider }) + test(t, provider, provider) + }) +} + +/* +// WithMeterProvider returns a telemetry.FactoryOption that configures the +// factory's CreateMeterProvider method to return provider. If provider does +// not implement the Shutdown method, it will be wrapped with +// ShutdownMeterProvider with a no-op shutdown func. +func WithMeterProvider(provider metric.MeterProvider) telemetry.FactoryOption { + return telemetry.WithCreateMeterProvider( + func(context.Context, telemetry.MeterSettings, component.Config) ( + telemetry.MeterProvider, error, + ) { + withShutdown, ok := provider.(telemetry.MeterProvider) + if !ok { + withShutdown = ShutdownMeterProvider{MeterProvider: provider} + } + return withShutdown, nil + }, + ) +} + +// WithTracerProvider returns a telemetry.FactoryOption that configures the +// factory's CreateTracerProvider method to return provider. If provider does +// not implement the Shutdown method, it will be wrapped with +// ShutdownTracerProvider with a no-op shutdown func. +func WithTracerProvider(provider trace.TracerProvider) telemetry.FactoryOption { + return telemetry.WithCreateTracerProvider( + func(context.Context, telemetry.TracerSettings, component.Config) ( + telemetry.TracerProvider, error, + ) { + withShutdown, ok := provider.(telemetry.TracerProvider) + if !ok { + withShutdown = ShutdownTracerProvider{TracerProvider: provider} + } + return withShutdown, nil + }, + ) +} +*/ From b70c51754264f09fd84f7516e61c7ac6b6c32a71 Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Wed, 24 Sep 2025 14:23:58 +0800 Subject: [PATCH 2/4] Update telemetrytest --- service/telemetry/telemetrytest/providers.go | 20 ++---- .../telemetry/telemetrytest/providers_test.go | 66 ++++--------------- 2 files changed, 15 insertions(+), 71 deletions(-) diff --git a/service/telemetry/telemetrytest/providers.go b/service/telemetry/telemetrytest/providers.go index d254d9f162c..6c8f0f5f47f 100644 --- a/service/telemetry/telemetrytest/providers.go +++ b/service/telemetry/telemetrytest/providers.go @@ -6,7 +6,6 @@ package telemetrytest // import "go.opentelemetry.io/collector/service/telemetry import ( "context" - "go.opentelemetry.io/otel/log" "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/trace" "go.uber.org/zap" @@ -27,19 +26,13 @@ func WithResource(res pcommon.Resource) telemetry.FactoryOption { } // WithLogger returns a telemetry.FactoryOption that configures the -// factory's CreateLogger method to return logger and provider. -// If provider does not implement the Shutdown method, it will be -// wrapped with ShutdownLoggerProvider with a no-op shutdown func. -func WithLogger(logger *zap.Logger, provider log.LoggerProvider) telemetry.FactoryOption { +// factory's CreateLogger method to return logger and shutdown func. +func WithLogger(logger *zap.Logger, shutdownFunc component.ShutdownFunc) telemetry.FactoryOption { return telemetry.WithCreateLogger( func(context.Context, telemetry.LoggerSettings, component.Config) ( - *zap.Logger, telemetry.LoggerProvider, error, + *zap.Logger, component.ShutdownFunc, error, ) { - withShutdown, ok := provider.(telemetry.LoggerProvider) - if !ok { - withShutdown = ShutdownLoggerProvider{LoggerProvider: provider} - } - return logger, withShutdown, nil + return logger, shutdownFunc, nil }, ) } @@ -80,11 +73,6 @@ func WithTracerProvider(provider trace.TracerProvider) telemetry.FactoryOption { ) } -type ShutdownLoggerProvider struct { - log.LoggerProvider - component.ShutdownFunc -} - type ShutdownMeterProvider struct { metric.MeterProvider component.ShutdownFunc diff --git a/service/telemetry/telemetrytest/providers_test.go b/service/telemetry/telemetrytest/providers_test.go index 3d5ade08646..b18e7d9e7f7 100644 --- a/service/telemetry/telemetrytest/providers_test.go +++ b/service/telemetry/telemetrytest/providers_test.go @@ -5,12 +5,11 @@ package telemetrytest // import "go.opentelemetry.io/collector/service/telemetry import ( "context" + "errors" "testing" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.opentelemetry.io/otel/log" - nooplog "go.opentelemetry.io/otel/log/noop" "go.opentelemetry.io/otel/metric" noopmetric "go.opentelemetry.io/otel/metric/noop" "go.opentelemetry.io/otel/trace" @@ -32,22 +31,17 @@ func TestWithResource(t *testing.T) { } func TestWithLogger(t *testing.T) { - test := func(t *testing.T, provider log.LoggerProvider, expectedProvider telemetry.LoggerProvider) { - logger := zap.NewNop() - factory := telemetry.NewFactory(nil, WithLogger(logger, provider)) - createdLogger, createdProvider, err := factory.CreateLogger(context.Background(), telemetry.LoggerSettings{}, nil) - require.NoError(t, err) - assert.Same(t, logger, createdLogger) - assert.Equal(t, expectedProvider, createdProvider) + logger := zap.NewNop() + shutdownErr := errors.New("shutdown error") + shutdown := func(context.Context) error { + return shutdownErr } - t.Run("Without Shutdown method", func(t *testing.T) { - provider := nooplog.NewLoggerProvider() - test(t, provider, ShutdownLoggerProvider{LoggerProvider: provider}) - }) - t.Run("With Shutdown method", func(t *testing.T) { - provider := new(struct{ telemetry.LoggerProvider }) - test(t, provider, provider) - }) + + factory := telemetry.NewFactory(nil, WithLogger(logger, shutdown)) + createdLogger, createdShutdown, err := factory.CreateLogger(context.Background(), telemetry.LoggerSettings{}, nil) + require.NoError(t, err) + assert.Same(t, logger, createdLogger) + assert.Same(t, shutdownErr, createdShutdown(t.Context())) } func TestWithMeterProvider(t *testing.T) { @@ -83,41 +77,3 @@ func TestWithTracerProvider(t *testing.T) { test(t, provider, provider) }) } - -/* -// WithMeterProvider returns a telemetry.FactoryOption that configures the -// factory's CreateMeterProvider method to return provider. If provider does -// not implement the Shutdown method, it will be wrapped with -// ShutdownMeterProvider with a no-op shutdown func. -func WithMeterProvider(provider metric.MeterProvider) telemetry.FactoryOption { - return telemetry.WithCreateMeterProvider( - func(context.Context, telemetry.MeterSettings, component.Config) ( - telemetry.MeterProvider, error, - ) { - withShutdown, ok := provider.(telemetry.MeterProvider) - if !ok { - withShutdown = ShutdownMeterProvider{MeterProvider: provider} - } - return withShutdown, nil - }, - ) -} - -// WithTracerProvider returns a telemetry.FactoryOption that configures the -// factory's CreateTracerProvider method to return provider. If provider does -// not implement the Shutdown method, it will be wrapped with -// ShutdownTracerProvider with a no-op shutdown func. -func WithTracerProvider(provider trace.TracerProvider) telemetry.FactoryOption { - return telemetry.WithCreateTracerProvider( - func(context.Context, telemetry.TracerSettings, component.Config) ( - telemetry.TracerProvider, error, - ) { - withShutdown, ok := provider.(telemetry.TracerProvider) - if !ok { - withShutdown = ShutdownTracerProvider{TracerProvider: provider} - } - return withShutdown, nil - }, - ) -} -*/ From 6c5fec02005604a52a567cce2e6cd6f463b25af6 Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Wed, 24 Sep 2025 14:34:35 +0800 Subject: [PATCH 3/4] Fix service package --- service/service.go | 2 +- service/service_test.go | 47 +++++------------------------------------ 2 files changed, 6 insertions(+), 43 deletions(-) diff --git a/service/service.go b/service/service.go index b68610115ec..19a7c39a170 100644 --- a/service/service.go +++ b/service/service.go @@ -131,7 +131,7 @@ func New(ctx context.Context, set Settings, cfg Config) (_ *Service, resultErr e Settings: telemetrySettings, ZapOptions: set.LoggingOptions, } - logger, loggerShutdownFunc, err := telemetryFactory.CreateLogger(ctx, loggerSettings, cfg.Telemetry) + logger, loggerShutdownFunc, err := set.TelemetryFactory.CreateLogger(ctx, loggerSettings, cfg.Telemetry) if err != nil { return nil, fmt.Errorf("failed to create logger: %w", err) } diff --git a/service/service_test.go b/service/service_test.go index f423a357e7f..934b6be59f7 100644 --- a/service/service_test.go +++ b/service/service_test.go @@ -13,9 +13,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" otelconf "go.opentelemetry.io/contrib/otelconf/v0.3.0" - "go.opentelemetry.io/otel/log" - "go.opentelemetry.io/otel/log/logtest" - nooplog "go.opentelemetry.io/otel/log/noop" noopmetric "go.opentelemetry.io/otel/metric/noop" "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/metricdata" @@ -142,13 +139,12 @@ func TestServiceTelemetryCleanupOnError(t *testing.T) { func TestServiceTelemetryLogging(t *testing.T) { observerCore, observedLogs := observer.New(zapcore.WarnLevel) zapLogger := zap.New(observerCore) - recorder := logtest.NewRecorder() set := newNopSettings() set.BuildInfo = component.BuildInfo{Version: "test version", Command: otelCommand} set.TelemetryFactory = telemetry.NewFactory( func() component.Config { return nil }, - telemetrytest.WithLogger(zapLogger, recorder), + telemetrytest.WithLogger(zapLogger, nil), ) cfg := newNopConfig() @@ -168,36 +164,6 @@ func TestServiceTelemetryLogging(t *testing.T) { entries := observedLogs.All() require.Len(t, entries, 1) assert.Equal(t, "warn_message", entries[0].Message) - - logtest.AssertEqual(t, logtest.Recording{ - logtest.Scope{ - Name: "go.opentelemetry.io/collector/service", - }: []logtest.Record{{ - Context: context.Background(), - Timestamp: time.Time{}, - Severity: log.SeverityWarn, - SeverityText: "warn", - Body: log.StringValue("warn_message"), - Attributes: []log.KeyValue{}, - }}, - }, recorder.Result(), - logtest.Transform(func(recording logtest.Recording) logtest.Recording { - // Remove empty scopes. - newRecording := make(logtest.Recording) - for scope, records := range recording { - if len(records) != 0 { - newRecording[scope] = records - } - } - return newRecording - }), - logtest.Transform(func(record logtest.Record) logtest.Record { - // Clear timestamp and any attributes for easier testing. - record.Timestamp = time.Time{} - record.Attributes = nil - return record - }), - ) } func TestServiceTelemetryMetrics(t *testing.T) { @@ -382,12 +348,9 @@ func TestServiceTelemetryShutdownError(t *testing.T) { set.TelemetryFactory = telemetry.NewFactory( func() component.Config { return nil }, telemetry.WithCreateLogger( - func(context.Context, telemetry.LoggerSettings, component.Config) (*zap.Logger, telemetry.LoggerProvider, error) { - return zap.NewNop(), telemetrytest.ShutdownLoggerProvider{ - LoggerProvider: nooplog.NewLoggerProvider(), - ShutdownFunc: func(context.Context) error { - return errors.New("an exception occurred") - }, + func(context.Context, telemetry.LoggerSettings, component.Config) (*zap.Logger, component.ShutdownFunc, error) { + return zap.NewNop(), func(context.Context) error { + return errors.New("an exception occurred") }, nil }, ), @@ -506,7 +469,7 @@ func TestServiceFatalError(t *testing.T) { func TestServiceTelemetryCreateProvidersError(t *testing.T) { loggerOpt := telemetry.WithCreateLogger( - func(context.Context, telemetry.LoggerSettings, component.Config) (*zap.Logger, telemetry.LoggerProvider, error) { + func(context.Context, telemetry.LoggerSettings, component.Config) (*zap.Logger, component.ShutdownFunc, error) { return nil, nil, errors.New("something went wrong") }, ) From 1c3d40e5d6ac8b69945b2f2e32f27a65b3137a86 Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Wed, 24 Sep 2025 14:39:34 +0800 Subject: [PATCH 4/4] make tidy --- service/go.mod | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/service/go.mod b/service/go.mod index bb635126fa3..a8ca09594e1 100644 --- a/service/go.mod +++ b/service/go.mod @@ -50,8 +50,6 @@ require ( go.opentelemetry.io/contrib/otelconf v0.18.0 go.opentelemetry.io/contrib/propagators/b3 v1.38.0 go.opentelemetry.io/otel v1.38.0 - go.opentelemetry.io/otel/log v0.14.0 - go.opentelemetry.io/otel/log/logtest v0.14.0 go.opentelemetry.io/otel/metric v1.38.0 go.opentelemetry.io/otel/sdk v1.38.0 go.opentelemetry.io/otel/sdk/metric v1.38.0 @@ -78,7 +76,6 @@ require ( github.com/gobwas/glob v0.2.3 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/snappy v1.0.0 // indirect - github.com/google/go-cmp v0.7.0 // indirect github.com/google/go-tpm v0.9.6 // indirect github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.2 // indirect @@ -131,6 +128,7 @@ require ( go.opentelemetry.io/otel/exporters/stdout/stdoutlog v0.14.0 // indirect go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.38.0 // indirect go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.38.0 // indirect + go.opentelemetry.io/otel/log v0.14.0 // indirect go.opentelemetry.io/otel/sdk/log v0.14.0 // indirect go.opentelemetry.io/proto/otlp v1.7.1 // indirect go.yaml.in/yaml/v2 v2.4.2 // indirect