Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[![Documentation](https://img.shields.io/badge/Documentation-pgwat.ch-brightgreen)](https://pgwat.ch)
[![License: MIT](https://img.shields.io/badge/License-BSD_3-green.svg)](https://opensource.org/license/bsd-3-clause)
[![Go Build & Test](https://github.com/cybertec-postgresql/pgwatch/actions/workflows/build.yml/badge.svg)](https://github.com/cybertec-postgresql/pgwatch/actions/workflows/build.yml)
[![Coverage Status](https://img.shields.io/coverallsCoverage/github/cybertec-postgresql/pgwatch?branch=master&label=Coverage&color=red)](https://coveralls.io/github/cybertec-postgresql/pgwatch?branch=master)
[![Coverage Status](https://img.shields.io/coverallsCoverage/github/cybertec-postgresql/pgwatch?branch=master&label=Coverage&color=gold)](https://coveralls.io/github/cybertec-postgresql/pgwatch?branch=master)
[![Downloads](https://img.shields.io/github/downloads/cybertec-postgresql/pgwatch/total?label=Downloads)](https://github.com/cybertec-postgresql/pgwatch/releases)
[![Docker Pulls](https://img.shields.io/docker/pulls/cybertecpostgresql/pgwatch?label=Docker%20Pulls)](https://hub.docker.com/r/cybertecpostgresql/pgwatch)

Expand Down
282 changes: 273 additions & 9 deletions internal/reaper/reaper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,19 @@ package reaper

import (
"context"
"errors"
"os"
"path/filepath"
"testing"
"time"

"github.com/cybertec-postgresql/pgwatch/v5/internal/cmdopts"
"github.com/cybertec-postgresql/pgwatch/v5/internal/log"
"github.com/cybertec-postgresql/pgwatch/v5/internal/metrics"
"github.com/cybertec-postgresql/pgwatch/v5/internal/sinks"
"github.com/cybertec-postgresql/pgwatch/v5/internal/sources"
"github.com/cybertec-postgresql/pgwatch/v5/internal/testutil"
"github.com/pashagolub/pgxmock/v4"
"github.com/pashagolub/pgxmock/v4"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -100,14 +102,14 @@ func TestReaper_LoadSources(t *testing.T) {

t.Run("Test source config changes trigger restart", func(t *testing.T) {
baseSource := sources.Source{
Name: "TestSource",
IsEnabled: true,
Kind: sources.SourcePostgres,
ConnStr: "postgres://localhost:5432/testdb",
Metrics: map[string]float64{"cpu": 10, "memory": 20},
MetricsStandby: map[string]float64{"cpu": 30},
CustomTags: map[string]string{"env": "test"},
Group: "default",
Name: "TestSource",
IsEnabled: true,
Kind: sources.SourcePostgres,
ConnStr: "postgres://localhost:5432/testdb",
Metrics: map[string]float64{"cpu": 10, "memory": 20},
MetricsStandby: map[string]float64{"cpu": 30},
CustomTags: map[string]string{"env": "test"},
Group: "default",
}

testCases := []struct {
Expand Down Expand Up @@ -331,3 +333,265 @@ func TestReaper_LoadSources(t *testing.T) {
assert.Nil(t, mockConn1.ExpectationsWereMet(), "Expected all mock expectations to be met")
})
}

func newFetchMetricReaper() *Reaper {
return &Reaper{
Options: &cmdopts.Options{
Metrics: metrics.CmdOpts{},
Sinks: sinks.CmdOpts{},
},
measurementCache: NewInstanceMetricCache(),
}
}

func TestReaper_FetchMetric(t *testing.T) {
ctx := log.WithLogger(t.Context(), log.NewNoopLogger())

t.Run("metric not found in definitions", func(t *testing.T) {
r := newFetchMetricReaper()
md, mock := createTestSourceConn(t)
defer mock.Close()

env, err := r.FetchMetric(ctx, md, "nonexistent_metric_xyz")
assert.ErrorIs(t, err, metrics.ErrMetricNotFound)
assert.Nil(t, env)
assert.NoError(t, mock.ExpectationsWereMet())
})

t.Run("primary-only metric skipped on standby", func(t *testing.T) {
r := newFetchMetricReaper()
metricDefs.MetricDefs["primary_only_metric"] = metrics.Metric{
SQLs: metrics.SQLs{0: "SELECT 1"},
NodeStatus: "primary",
}
md, mock := createTestSourceConn(t)
defer mock.Close()
md.IsInRecovery = true

env, err := r.FetchMetric(ctx, md, "primary_only_metric")
assert.NoError(t, err)
assert.Nil(t, env)
assert.NoError(t, mock.ExpectationsWereMet())
})

t.Run("standby-only metric skipped on primary", func(t *testing.T) {
r := newFetchMetricReaper()
metricDefs.MetricDefs["standby_only_metric"] = metrics.Metric{
SQLs: metrics.SQLs{0: "SELECT 1"},
NodeStatus: "standby",
}
md, mock := createTestSourceConn(t)
defer mock.Close()
md.IsInRecovery = false

env, err := r.FetchMetric(ctx, md, "standby_only_metric")
assert.NoError(t, err)
assert.Nil(t, env)
assert.NoError(t, mock.ExpectationsWereMet())
})

t.Run("default metric with no SQL for version returns nil", func(t *testing.T) {
r := newFetchMetricReaper()
metricDefs.MetricDefs["no_sql_metric"] = metrics.Metric{
SQLs: metrics.SQLs{}, // no SQL defined
}
md, mock := createTestSourceConn(t)
defer mock.Close()

env, err := r.FetchMetric(ctx, md, "no_sql_metric")
assert.NoError(t, err)
assert.Nil(t, env)
assert.NoError(t, mock.ExpectationsWereMet())
})

t.Run("default metric query success returns envelope", func(t *testing.T) {
r := newFetchMetricReaper()
metricDefs.MetricDefs["test_metric"] = metrics.Metric{
SQLs: metrics.SQLs{0: "SELECT 1"},
}
md, mock := createTestSourceConn(t)
defer mock.Close()
md.Name = "mydb"
md.CustomTags = map[string]string{"env": "prod"}

rows := pgxmock.NewRows([]string{"epoch_ns", "value"}).
AddRow(time.Now().UnixNano(), int64(42))
mock.ExpectQuery("SELECT 1").WillReturnRows(rows)

env, err := r.FetchMetric(ctx, md, "test_metric")
require.NoError(t, err)
require.NotNil(t, env)
assert.Equal(t, "mydb", env.DBName)
assert.Equal(t, "test_metric", env.MetricName)
assert.Len(t, env.Data, 1)
assert.Equal(t, map[string]string{"env": "prod"}, env.CustomTags)
assert.NoError(t, mock.ExpectationsWereMet())
})

t.Run("default metric query error returns error", func(t *testing.T) {
r := newFetchMetricReaper()
metricDefs.MetricDefs["error_metric"] = metrics.Metric{
SQLs: metrics.SQLs{0: "SELECT fail"},
}
md, mock := createTestSourceConn(t)
defer mock.Close()

mock.ExpectQuery("SELECT fail").WillReturnError(assert.AnError)

env, err := r.FetchMetric(ctx, md, "error_metric")
assert.Error(t, err)
assert.Nil(t, env)
assert.NoError(t, mock.ExpectationsWereMet())
})

t.Run("default metric query returns empty rows", func(t *testing.T) {
r := newFetchMetricReaper()
metricDefs.MetricDefs["empty_metric"] = metrics.Metric{
SQLs: metrics.SQLs{0: "SELECT empty"},
}
md, mock := createTestSourceConn(t)
defer mock.Close()

mock.ExpectQuery("SELECT empty").WillReturnRows(pgxmock.NewRows([]string{"epoch_ns"}))

env, err := r.FetchMetric(ctx, md, "empty_metric")
assert.NoError(t, err)
assert.Nil(t, env)
assert.NoError(t, mock.ExpectationsWereMet())
})

t.Run("storage name used as metric name in envelope", func(t *testing.T) {
r := newFetchMetricReaper()
metricDefs.MetricDefs["logical_metric"] = metrics.Metric{
SQLs: metrics.SQLs{0: "SELECT 1"},
StorageName: "physical_metric",
}
md, mock := createTestSourceConn(t)
defer mock.Close()

rows := pgxmock.NewRows([]string{"epoch_ns", "v"}).
AddRow(time.Now().UnixNano(), int64(1))
mock.ExpectQuery("SELECT 1").WillReturnRows(rows)

env, err := r.FetchMetric(ctx, md, "logical_metric")
require.NoError(t, err)
require.NotNil(t, env)
assert.Equal(t, "physical_metric", env.MetricName)
assert.NoError(t, mock.ExpectationsWereMet())
})

t.Run("instance_up special metric returns envelope via GetInstanceUpMeasurement", func(t *testing.T) {
r := newFetchMetricReaper()
metricDefs.MetricDefs[specialMetricInstanceUp] = metrics.Metric{
SQLs: metrics.SQLs{0: "SELECT 1"},
}
md, mock := createTestSourceConn(t)
defer mock.Close()
mock.ExpectPing()

env, err := r.FetchMetric(ctx, md, specialMetricInstanceUp)
require.NoError(t, err)
require.NotNil(t, env)
assert.Equal(t, specialMetricInstanceUp, env.MetricName)
assert.Len(t, env.Data, 1)
assert.Equal(t, 1, env.Data[0][specialMetricInstanceUp])
assert.NoError(t, mock.ExpectationsWereMet())
})

t.Run("change_events special metric returns nil when no changes detected", func(t *testing.T) {
r := newFetchMetricReaper()
metricDefs.MetricDefs[specialMetricChangeEvents] = metrics.Metric{
SQLs: metrics.SQLs{0: "SELECT 1"},
}
// Remove all hash metric definitions so detection functions return early
delete(metricDefs.MetricDefs, "sproc_hashes")
delete(metricDefs.MetricDefs, "table_hashes")
delete(metricDefs.MetricDefs, "index_hashes")
delete(metricDefs.MetricDefs, "configuration_hashes")
delete(metricDefs.MetricDefs, "privilege_hashes")

md, mock := createTestSourceConn(t)
defer mock.Close()

env, err := r.FetchMetric(ctx, md, specialMetricChangeEvents)
assert.NoError(t, err)
assert.Nil(t, env, "expected nil envelope when no changes detected")
assert.NoError(t, mock.ExpectationsWereMet())
})

t.Run("cache hit serves data without querying DB", func(t *testing.T) {
r := newFetchMetricReaper()
r.Metrics.InstanceLevelCacheMaxSeconds = 30

metricDefs.MetricDefs["cached_metric"] = metrics.Metric{
SQLs: metrics.SQLs{0: "SELECT 1"},
IsInstanceLevel: true,
}
md, mock := createTestSourceConn(t)
defer mock.Close()
md.Metrics = map[string]float64{"cached_metric": 10}

// Pre-populate the cache
cachedData := metrics.Measurements{
metrics.Measurement{
metrics.EpochColumnName: time.Now().UnixNano(),
"value": int64(99),
},
}
cacheKey := md.GetClusterIdentifier() + ":cached_metric"
r.measurementCache.Put(cacheKey, cachedData)

// No DB query expected
env, err := r.FetchMetric(ctx, md, "cached_metric")
require.NoError(t, err)
require.NotNil(t, env)
assert.Equal(t, "cached_metric", env.MetricName)
assert.Len(t, env.Data, 1)
assert.NoError(t, mock.ExpectationsWereMet())
})

t.Run("sysinfo fields added to measurements", func(t *testing.T) {
r := newFetchMetricReaper()
r.Sinks.RealDbnameField = "real_dbname"
r.Sinks.SystemIdentifierField = "sys_id"
metricDefs.MetricDefs["sysinfo_metric"] = metrics.Metric{
SQLs: metrics.SQLs{0: "SELECT sysinfo"},
}
md, mock := createTestSourceConn(t)
defer mock.Close()
md.RealDbname = "realdb"
md.SystemIdentifier = "42"

rows := pgxmock.NewRows([]string{"epoch_ns", "v"}).
AddRow(time.Now().UnixNano(), int64(1))
mock.ExpectQuery("SELECT sysinfo").WillReturnRows(rows)

env, err := r.FetchMetric(ctx, md, "sysinfo_metric")
require.NoError(t, err)
require.NotNil(t, env)
assert.Equal(t, "realdb", env.Data[0]["real_dbname"])
assert.Equal(t, "42", env.Data[0]["sys_id"])
assert.NoError(t, mock.ExpectationsWereMet())
})
}

type mockErr string

func (m mockErr) SyncMetric(string, string, sinks.SyncOp) error {
return errors.New(string(m))
}

func (m mockErr) Write(metrics.MeasurementEnvelope) error {
return errors.New(string(m))
}

func TestWriteMeasurements(t *testing.T) {
ctx, cancel := context.WithCancel(log.WithLogger(t.Context(), log.NewNoopLogger()))
defer cancel()
var err mockErr = "write error"
r := NewReaper(ctx, &cmdopts.Options{
SinksWriter: err,
})
go r.WriteMeasurements(ctx)
r.WriteInstanceDown(&sources.SourceConn{})
}
5 changes: 3 additions & 2 deletions internal/testutil/mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"google.golang.org/protobuf/types/known/structpb"
)

// Receiver implements the ReceiverServer interface for testing purposes
type Receiver struct {
pb.UnimplementedReceiverServer
}
Expand Down Expand Up @@ -44,8 +45,7 @@ func (receiver *Receiver) DefineMetrics(_ context.Context, metricsStruct *struct
return &pb.Reply{Logmsg: "metrics defined successfully"}, nil
}

//---------------Sources-Metrics Mocks--------------

// MockMetricsReaderWriter implements MetricsReaderWriter interface
type MockMetricsReaderWriter struct {
GetMetricsFunc func() (*metrics.Metrics, error)
UpdateMetricFunc func(name string, m metrics.Metric) error
Expand Down Expand Up @@ -82,6 +82,7 @@ func (m *MockMetricsReaderWriter) WriteMetrics(metricDefs *metrics.Metrics) erro
return m.WriteMetricsFunc(metricDefs)
}

// MockSourcesReaderWriter implements SourcesReaderWriter interface
type MockSourcesReaderWriter struct {
GetSourcesFunc func() (sources.Sources, error)
UpdateSourceFunc func(md sources.Source) error
Expand Down
Loading