diff --git a/README.md b/README.md index 88c77e57b1..dd0663659a 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,7 @@ [![Documentation](https://img.shields.io/badge/Documentation-pgwat.ch-brightgreen)](https://pgwat.ch) [![License: MIT](https://img.shields.io/badge/License-BSD_3-green.svg)](https://opensource.org/license/bsd-3-clause) [![Go Build & Test](https://github.com/cybertec-postgresql/pgwatch/actions/workflows/build.yml/badge.svg)](https://github.com/cybertec-postgresql/pgwatch/actions/workflows/build.yml) -[![Coverage Status](https://img.shields.io/coverallsCoverage/github/cybertec-postgresql/pgwatch?branch=master&label=Coverage&color=red)](https://coveralls.io/github/cybertec-postgresql/pgwatch?branch=master) +[![Coverage Status](https://img.shields.io/coverallsCoverage/github/cybertec-postgresql/pgwatch?branch=master&label=Coverage&color=gold)](https://coveralls.io/github/cybertec-postgresql/pgwatch?branch=master) [![Downloads](https://img.shields.io/github/downloads/cybertec-postgresql/pgwatch/total?label=Downloads)](https://github.com/cybertec-postgresql/pgwatch/releases) [![Docker Pulls](https://img.shields.io/docker/pulls/cybertecpostgresql/pgwatch?label=Docker%20Pulls)](https://hub.docker.com/r/cybertecpostgresql/pgwatch) diff --git a/internal/reaper/reaper_test.go b/internal/reaper/reaper_test.go index ded3bea739..9370422c1f 100644 --- a/internal/reaper/reaper_test.go +++ b/internal/reaper/reaper_test.go @@ -2,9 +2,11 @@ package reaper import ( "context" + "errors" "os" "path/filepath" "testing" + "time" "github.com/cybertec-postgresql/pgwatch/v5/internal/cmdopts" "github.com/cybertec-postgresql/pgwatch/v5/internal/log" @@ -12,7 +14,7 @@ import ( "github.com/cybertec-postgresql/pgwatch/v5/internal/sinks" "github.com/cybertec-postgresql/pgwatch/v5/internal/sources" "github.com/cybertec-postgresql/pgwatch/v5/internal/testutil" - "github.com/pashagolub/pgxmock/v4" + "github.com/pashagolub/pgxmock/v4" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -100,14 +102,14 @@ func TestReaper_LoadSources(t *testing.T) { t.Run("Test source config changes trigger restart", func(t *testing.T) { baseSource := sources.Source{ - Name: "TestSource", - IsEnabled: true, - Kind: sources.SourcePostgres, - ConnStr: "postgres://localhost:5432/testdb", - Metrics: map[string]float64{"cpu": 10, "memory": 20}, - MetricsStandby: map[string]float64{"cpu": 30}, - CustomTags: map[string]string{"env": "test"}, - Group: "default", + Name: "TestSource", + IsEnabled: true, + Kind: sources.SourcePostgres, + ConnStr: "postgres://localhost:5432/testdb", + Metrics: map[string]float64{"cpu": 10, "memory": 20}, + MetricsStandby: map[string]float64{"cpu": 30}, + CustomTags: map[string]string{"env": "test"}, + Group: "default", } testCases := []struct { @@ -331,3 +333,265 @@ func TestReaper_LoadSources(t *testing.T) { assert.Nil(t, mockConn1.ExpectationsWereMet(), "Expected all mock expectations to be met") }) } + +func newFetchMetricReaper() *Reaper { + return &Reaper{ + Options: &cmdopts.Options{ + Metrics: metrics.CmdOpts{}, + Sinks: sinks.CmdOpts{}, + }, + measurementCache: NewInstanceMetricCache(), + } +} + +func TestReaper_FetchMetric(t *testing.T) { + ctx := log.WithLogger(t.Context(), log.NewNoopLogger()) + + t.Run("metric not found in definitions", func(t *testing.T) { + r := newFetchMetricReaper() + md, mock := createTestSourceConn(t) + defer mock.Close() + + env, err := r.FetchMetric(ctx, md, "nonexistent_metric_xyz") + assert.ErrorIs(t, err, metrics.ErrMetricNotFound) + assert.Nil(t, env) + assert.NoError(t, mock.ExpectationsWereMet()) + }) + + t.Run("primary-only metric skipped on standby", func(t *testing.T) { + r := newFetchMetricReaper() + metricDefs.MetricDefs["primary_only_metric"] = metrics.Metric{ + SQLs: metrics.SQLs{0: "SELECT 1"}, + NodeStatus: "primary", + } + md, mock := createTestSourceConn(t) + defer mock.Close() + md.IsInRecovery = true + + env, err := r.FetchMetric(ctx, md, "primary_only_metric") + assert.NoError(t, err) + assert.Nil(t, env) + assert.NoError(t, mock.ExpectationsWereMet()) + }) + + t.Run("standby-only metric skipped on primary", func(t *testing.T) { + r := newFetchMetricReaper() + metricDefs.MetricDefs["standby_only_metric"] = metrics.Metric{ + SQLs: metrics.SQLs{0: "SELECT 1"}, + NodeStatus: "standby", + } + md, mock := createTestSourceConn(t) + defer mock.Close() + md.IsInRecovery = false + + env, err := r.FetchMetric(ctx, md, "standby_only_metric") + assert.NoError(t, err) + assert.Nil(t, env) + assert.NoError(t, mock.ExpectationsWereMet()) + }) + + t.Run("default metric with no SQL for version returns nil", func(t *testing.T) { + r := newFetchMetricReaper() + metricDefs.MetricDefs["no_sql_metric"] = metrics.Metric{ + SQLs: metrics.SQLs{}, // no SQL defined + } + md, mock := createTestSourceConn(t) + defer mock.Close() + + env, err := r.FetchMetric(ctx, md, "no_sql_metric") + assert.NoError(t, err) + assert.Nil(t, env) + assert.NoError(t, mock.ExpectationsWereMet()) + }) + + t.Run("default metric query success returns envelope", func(t *testing.T) { + r := newFetchMetricReaper() + metricDefs.MetricDefs["test_metric"] = metrics.Metric{ + SQLs: metrics.SQLs{0: "SELECT 1"}, + } + md, mock := createTestSourceConn(t) + defer mock.Close() + md.Name = "mydb" + md.CustomTags = map[string]string{"env": "prod"} + + rows := pgxmock.NewRows([]string{"epoch_ns", "value"}). + AddRow(time.Now().UnixNano(), int64(42)) + mock.ExpectQuery("SELECT 1").WillReturnRows(rows) + + env, err := r.FetchMetric(ctx, md, "test_metric") + require.NoError(t, err) + require.NotNil(t, env) + assert.Equal(t, "mydb", env.DBName) + assert.Equal(t, "test_metric", env.MetricName) + assert.Len(t, env.Data, 1) + assert.Equal(t, map[string]string{"env": "prod"}, env.CustomTags) + assert.NoError(t, mock.ExpectationsWereMet()) + }) + + t.Run("default metric query error returns error", func(t *testing.T) { + r := newFetchMetricReaper() + metricDefs.MetricDefs["error_metric"] = metrics.Metric{ + SQLs: metrics.SQLs{0: "SELECT fail"}, + } + md, mock := createTestSourceConn(t) + defer mock.Close() + + mock.ExpectQuery("SELECT fail").WillReturnError(assert.AnError) + + env, err := r.FetchMetric(ctx, md, "error_metric") + assert.Error(t, err) + assert.Nil(t, env) + assert.NoError(t, mock.ExpectationsWereMet()) + }) + + t.Run("default metric query returns empty rows", func(t *testing.T) { + r := newFetchMetricReaper() + metricDefs.MetricDefs["empty_metric"] = metrics.Metric{ + SQLs: metrics.SQLs{0: "SELECT empty"}, + } + md, mock := createTestSourceConn(t) + defer mock.Close() + + mock.ExpectQuery("SELECT empty").WillReturnRows(pgxmock.NewRows([]string{"epoch_ns"})) + + env, err := r.FetchMetric(ctx, md, "empty_metric") + assert.NoError(t, err) + assert.Nil(t, env) + assert.NoError(t, mock.ExpectationsWereMet()) + }) + + t.Run("storage name used as metric name in envelope", func(t *testing.T) { + r := newFetchMetricReaper() + metricDefs.MetricDefs["logical_metric"] = metrics.Metric{ + SQLs: metrics.SQLs{0: "SELECT 1"}, + StorageName: "physical_metric", + } + md, mock := createTestSourceConn(t) + defer mock.Close() + + rows := pgxmock.NewRows([]string{"epoch_ns", "v"}). + AddRow(time.Now().UnixNano(), int64(1)) + mock.ExpectQuery("SELECT 1").WillReturnRows(rows) + + env, err := r.FetchMetric(ctx, md, "logical_metric") + require.NoError(t, err) + require.NotNil(t, env) + assert.Equal(t, "physical_metric", env.MetricName) + assert.NoError(t, mock.ExpectationsWereMet()) + }) + + t.Run("instance_up special metric returns envelope via GetInstanceUpMeasurement", func(t *testing.T) { + r := newFetchMetricReaper() + metricDefs.MetricDefs[specialMetricInstanceUp] = metrics.Metric{ + SQLs: metrics.SQLs{0: "SELECT 1"}, + } + md, mock := createTestSourceConn(t) + defer mock.Close() + mock.ExpectPing() + + env, err := r.FetchMetric(ctx, md, specialMetricInstanceUp) + require.NoError(t, err) + require.NotNil(t, env) + assert.Equal(t, specialMetricInstanceUp, env.MetricName) + assert.Len(t, env.Data, 1) + assert.Equal(t, 1, env.Data[0][specialMetricInstanceUp]) + assert.NoError(t, mock.ExpectationsWereMet()) + }) + + t.Run("change_events special metric returns nil when no changes detected", func(t *testing.T) { + r := newFetchMetricReaper() + metricDefs.MetricDefs[specialMetricChangeEvents] = metrics.Metric{ + SQLs: metrics.SQLs{0: "SELECT 1"}, + } + // Remove all hash metric definitions so detection functions return early + delete(metricDefs.MetricDefs, "sproc_hashes") + delete(metricDefs.MetricDefs, "table_hashes") + delete(metricDefs.MetricDefs, "index_hashes") + delete(metricDefs.MetricDefs, "configuration_hashes") + delete(metricDefs.MetricDefs, "privilege_hashes") + + md, mock := createTestSourceConn(t) + defer mock.Close() + + env, err := r.FetchMetric(ctx, md, specialMetricChangeEvents) + assert.NoError(t, err) + assert.Nil(t, env, "expected nil envelope when no changes detected") + assert.NoError(t, mock.ExpectationsWereMet()) + }) + + t.Run("cache hit serves data without querying DB", func(t *testing.T) { + r := newFetchMetricReaper() + r.Metrics.InstanceLevelCacheMaxSeconds = 30 + + metricDefs.MetricDefs["cached_metric"] = metrics.Metric{ + SQLs: metrics.SQLs{0: "SELECT 1"}, + IsInstanceLevel: true, + } + md, mock := createTestSourceConn(t) + defer mock.Close() + md.Metrics = map[string]float64{"cached_metric": 10} + + // Pre-populate the cache + cachedData := metrics.Measurements{ + metrics.Measurement{ + metrics.EpochColumnName: time.Now().UnixNano(), + "value": int64(99), + }, + } + cacheKey := md.GetClusterIdentifier() + ":cached_metric" + r.measurementCache.Put(cacheKey, cachedData) + + // No DB query expected + env, err := r.FetchMetric(ctx, md, "cached_metric") + require.NoError(t, err) + require.NotNil(t, env) + assert.Equal(t, "cached_metric", env.MetricName) + assert.Len(t, env.Data, 1) + assert.NoError(t, mock.ExpectationsWereMet()) + }) + + t.Run("sysinfo fields added to measurements", func(t *testing.T) { + r := newFetchMetricReaper() + r.Sinks.RealDbnameField = "real_dbname" + r.Sinks.SystemIdentifierField = "sys_id" + metricDefs.MetricDefs["sysinfo_metric"] = metrics.Metric{ + SQLs: metrics.SQLs{0: "SELECT sysinfo"}, + } + md, mock := createTestSourceConn(t) + defer mock.Close() + md.RealDbname = "realdb" + md.SystemIdentifier = "42" + + rows := pgxmock.NewRows([]string{"epoch_ns", "v"}). + AddRow(time.Now().UnixNano(), int64(1)) + mock.ExpectQuery("SELECT sysinfo").WillReturnRows(rows) + + env, err := r.FetchMetric(ctx, md, "sysinfo_metric") + require.NoError(t, err) + require.NotNil(t, env) + assert.Equal(t, "realdb", env.Data[0]["real_dbname"]) + assert.Equal(t, "42", env.Data[0]["sys_id"]) + assert.NoError(t, mock.ExpectationsWereMet()) + }) +} + +type mockErr string + +func (m mockErr) SyncMetric(string, string, sinks.SyncOp) error { + return errors.New(string(m)) +} + +func (m mockErr) Write(metrics.MeasurementEnvelope) error { + return errors.New(string(m)) +} + +func TestWriteMeasurements(t *testing.T) { + ctx, cancel := context.WithCancel(log.WithLogger(t.Context(), log.NewNoopLogger())) + defer cancel() + var err mockErr = "write error" + r := NewReaper(ctx, &cmdopts.Options{ + SinksWriter: err, + }) + go r.WriteMeasurements(ctx) + r.WriteInstanceDown(&sources.SourceConn{}) +} diff --git a/internal/testutil/mocks.go b/internal/testutil/mocks.go index dfa5bea413..d5a5917ae6 100644 --- a/internal/testutil/mocks.go +++ b/internal/testutil/mocks.go @@ -10,6 +10,7 @@ import ( "google.golang.org/protobuf/types/known/structpb" ) +// Receiver implements the ReceiverServer interface for testing purposes type Receiver struct { pb.UnimplementedReceiverServer } @@ -44,8 +45,7 @@ func (receiver *Receiver) DefineMetrics(_ context.Context, metricsStruct *struct return &pb.Reply{Logmsg: "metrics defined successfully"}, nil } -//---------------Sources-Metrics Mocks-------------- - +// MockMetricsReaderWriter implements MetricsReaderWriter interface type MockMetricsReaderWriter struct { GetMetricsFunc func() (*metrics.Metrics, error) UpdateMetricFunc func(name string, m metrics.Metric) error @@ -82,6 +82,7 @@ func (m *MockMetricsReaderWriter) WriteMetrics(metricDefs *metrics.Metrics) erro return m.WriteMetricsFunc(metricDefs) } +// MockSourcesReaderWriter implements SourcesReaderWriter interface type MockSourcesReaderWriter struct { GetSourcesFunc func() (sources.Sources, error) UpdateSourceFunc func(md sources.Source) error