Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 32 additions & 5 deletions pkg/icingadb/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package icingadb
import (
"context"
"fmt"
"runtime"
"time"

"github.com/icinga/icinga-go-library/com"
"github.com/icinga/icinga-go-library/database"
"github.com/icinga/icinga-go-library/logging"
Expand All @@ -16,8 +19,6 @@ import (
"github.com/pkg/errors"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"runtime"
"time"
)

// Sync implements a rendezvous point for Icinga DB and Redis to synchronize their entities.
Expand Down Expand Up @@ -171,9 +172,35 @@ func (s Sync) ApplyDelta(ctx context.Context, delta *Delta) error {
// Delete
if len(delta.Delete) > 0 {
s.logger.Infof("Deleting %d items of type %s", len(delta.Delete), strcase.Delimited(types.Name(delta.Subject.Entity()), ' '))
g.Go(func() error {
return s.db.Delete(ctx, delta.Subject.Entity(), delta.Delete.IDs(), database.OnSuccessIncrement[any](stat))
})

ids := delta.Delete.IDs()

// Those objects are probably removed from the configuration files, i.e., Icinga 2 won't send the
// corresponding end/removed events for them. So try to mark them as cancelled manually, so that
// they don't show up as if they were still active in the UI.
//
// This is not performed in a separate goroutine to ensure that the relevant Comment/Downtime
// object is not deleted before closing the history.

var err error
switch delta.Subject.Entity().(type) {
case *v1.Comment:
err = s.mockCommentEnd(ctx, ids)
case *v1.Downtime:
err = s.mockDowntimeEnd(ctx, ids)
}

if ctx.Err() == nil {
g.Go(func() error { return err })
}

// Start the deletion process only if we haven't aborted the config sync due to an error in the
// comment or downtime history update above.
if err == nil && ctx.Err() == nil {
g.Go(func() error {
return s.db.Delete(ctx, delta.Subject.Entity(), ids, database.OnSuccessIncrement[any](stat))
})
}
}

return g.Wait()
Expand Down
294 changes: 294 additions & 0 deletions pkg/icingadb/sync_history_mock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,294 @@
package icingadb

import (
"context"
"crypto/sha1" // #nosec G505 -- required by Icinga 2's HashValue function
"database/sql"
"fmt"
"time"

"github.com/icinga/icinga-go-library/backoff"
"github.com/icinga/icinga-go-library/objectpacker"
"github.com/icinga/icinga-go-library/retry"
"github.com/icinga/icinga-go-library/types"
"github.com/icinga/icingadb/pkg/icingadb/v1/history"
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"
"go.uber.org/zap"
)

// mkGenericHistoryMockStmts builds the two table-specific SQL statements shared by the history mocks.
//
// tableName names the object table, e.g. "comment" or "downtime". It is interpolated verbatim into
// both statements, so only hard-coded identifiers must be passed.
//
// stmtPopulate selects the history meta columns plus the object's name for the named parameter
// :<tableName>_id by joining the object table with the history table. stmtHistoryInsert inserts a new
// row into the history table which references the object via <tableName>_history_id.
func mkGenericHistoryMockStmts(tableName string) (stmtPopulate, stmtHistoryInsert string) {
	// Each template refers to the table name multiple times through the explicit argument index %[1]s.
	const populateTmpl = `
SELECT
history.environment_id AS environment_id,
history.endpoint_id AS endpoint_id,
history.object_type AS object_type,
history.host_id AS host_id,
history.service_id AS service_id,
%[1]s.name AS name
FROM %[1]s
JOIN history ON
%[1]s.id = history.%[1]s_history_id
WHERE %[1]s.id = :%[1]s_id`

	const historyInsertTmpl = `
INSERT INTO history (
id, environment_id, endpoint_id, object_type, host_id, service_id, %[1]s_history_id, event_type, event_time
) VALUES (
:id, :environment_id, :endpoint_id, :object_type, :host_id, :service_id, :%[1]s_id, :event_type, :event_time
)`

	stmtPopulate = fmt.Sprintf(populateTmpl, tableName)
	stmtHistoryInsert = fmt.Sprintf(historyInsertTmpl, tableName)

	return stmtPopulate, stmtHistoryInsert
}

// calcEventId mocks Icinga 2's IcingaDB::CalcEventID method for history entries Icinga 2 never sent.
//
// The environment ID, event type and object name are packed, in this order, as a string slice via
// objectpacker.PackAny into a SHA-1 hash. The resulting digest is returned as types.Binary and serves
// as a simulated history.id primary key. A packing error is returned unchanged together with a nil ID.
func calcEventId(environmentId, eventType, objectName string) (types.Binary, error) {
	hasher := sha1.New() // #nosec G401 -- required by Icinga 2's HashValue function

	parts := []string{environmentId, eventType, objectName}
	if err := objectpacker.PackAny(parts, hasher); err != nil {
		return nil, err
	}

	return types.Binary(hasher.Sum(nil)), nil
}

// closeObjectAndMockHistory manually closes vanished Icinga 2 objects, e.g., Downtimes of vanished Hosts.
//
// It is the common driver for the type-specific implementations: every element of ids must be a
// types.Binary, otherwise an error is returned before anything is executed. The IDs are then handed
// to the action callback in bulks, where each bulk runs within one transaction via s.db.ExecTx,
// wrapped in retry.WithBackoff using the database's default retry settings. objectType is only used
// for logging. The first failing bulk aborts the processing and its error is returned wrapped.
func (s Sync) closeObjectAndMockHistory(
	ctx context.Context,
	ids []any,
	objectType string,
	action func(ctx context.Context, tx *sqlx.Tx, id types.Binary) error,
) error {
	s.logger.Debugw("Start manually closing vanished objects",
		zap.String("type", objectType),
		zap.Int("amount", len(ids)))

	// Validate and convert all IDs up front, so that no transaction is started for malformed input.
	binaryIds := make([]types.Binary, len(ids))
	for i, rawId := range ids {
		binaryId, isBinary := rawId.(types.Binary)
		if !isBinary {
			return errors.Errorf("ID is not types.Binary, but %T", rawId)
		}

		binaryIds[i] = binaryId
	}

	// Process action callback in bulks of the given step size instead of creating a transaction for
	// each action anew. The current step size is quite arbitrary, tweaked during testing.
	const steps = 256
	for offset := 0; offset < len(binaryIds); offset += steps {
		startTime := time.Now()

		// The last bulk may be shorter than the step size.
		bulk := binaryIds[offset:min(len(binaryIds), offset+steps)]

		// runBulk applies the action to each ID of the current bulk and stops at the first error.
		runBulk := func(ctx context.Context, tx *sqlx.Tx) error {
			for _, id := range bulk {
				if err := action(ctx, tx, id); err != nil {
					return err
				}
			}

			return nil
		}

		err := retry.WithBackoff(
			ctx,
			func(ctx context.Context) error { return s.db.ExecTx(ctx, runBulk) },
			retry.Retryable,
			backoff.DefaultBackoff,
			s.db.GetDefaultRetrySettings())
		if err != nil {
			return errors.Wrap(err, "can't mock object end")
		}

		s.logger.Debugw("Manually closed vanished objects",
			zap.String("type", objectType),
			zap.Int("amount", len(bulk)),
			zap.Duration("duration", time.Since(startTime)))
	}

	return nil
}

// mockCommentEnd manually closes comments by altering the related history tables.
//
// For each comment ID, the history meta data and the comment's name are loaded first. A comment
// without such data is logged and skipped. Otherwise, the comment_history row is marked as removed
// and a "comment_remove" entry is inserted into the history table, both using the same timestamp.
//
// This affects history retention by setting comment_history.remove_time.
func (s Sync) mockCommentEnd(ctx context.Context, ids []any) error {
	// Comment carries all named parameters used by the statements below.
	type Comment struct {
		history.CommentHistory
		history.HistoryMeta

		Name      types.String
		EventTime types.UnixMilli
	}

	// Event type of the mocked history entry, also being part of its calculated ID.
	const eventType = "comment_remove"

	// Marks the existing comment_history as removed.
	const stmtCommentHistoryUpdate = `
UPDATE comment_history
SET
removed_by = :removed_by,
remove_time = :remove_time,
has_been_removed = :has_been_removed
WHERE comment_id = :comment_id`

	stmtPopulate, stmtHistoryInsert := mkGenericHistoryMockStmts("comment")

	// closeComment is executed for each comment ID within a bulk transaction.
	closeComment := func(ctx context.Context, tx *sqlx.Tx, id types.Binary) error {
		now := types.UnixMilli(time.Now())

		comment := &Comment{
			CommentHistory: history.CommentHistory{
				CommentHistoryEntity: history.CommentHistoryEntity{
					CommentId: id,
				},
			},
		}

		// Start by populating information based on the previous data.
		populateStmt, err := tx.PrepareNamedContext(ctx, stmtPopulate)
		if err != nil {
			return errors.Wrap(err, "can't prepare statement to populate entry")
		}
		defer func() { _ = populateStmt.Close() }()

		err = populateStmt.GetContext(ctx, comment, comment)
		switch {
		case errors.Is(err, sql.ErrNoRows):
			// Without previous data there is nothing to close, which is not an error.
			s.logger.Infow("Can't fetch vanished Comment from database for cleanup",
				zap.String("comment_id", id.String()),
				zap.Error(err))
			return nil
		case err != nil:
			return errors.Wrap(err, "can't fetch information to populate entry")
		}

		// Fill fields to update comment_history
		comment.HasBeenRemoved = types.MakeBool(true)
		comment.RemoveTime = now
		comment.RemovedBy = types.MakeString("Comment Config Removed")

		if _, err = tx.NamedExecContext(ctx, stmtCommentHistoryUpdate, comment); err != nil {
			return errors.Wrap(err, "can't update comment_history")
		}

		// Update fields for a new history entry.
		comment.Id, err = calcEventId(comment.EnvironmentId.String(), eventType, comment.Name.String)
		if err != nil {
			return errors.Wrap(err, "can't calculate event_id")
		}

		comment.EventType = eventType
		comment.EventTime = now

		if _, err = tx.NamedExecContext(ctx, stmtHistoryInsert, comment); err != nil {
			return errors.Wrap(err, "can't insert new history entry")
		}

		return nil
	}

	return s.closeObjectAndMockHistory(ctx, ids, "comment", closeComment)
}

// mockDowntimeEnd manually closes downtimes by altering the related history tables.
//
// For each downtime ID, the history meta data and the downtime's name are loaded first. A downtime
// without such data is logged and skipped. Otherwise, the downtime_history row is marked as
// cancelled, the end of the sla_history_downtime row is set to the cancel time, and a "downtime_end"
// entry is inserted into the history table, all using the same timestamp.
//
// This affects SLA calculations (sla_history_downtime table).
func (s Sync) mockDowntimeEnd(ctx context.Context, ids []any) error {
	// Downtime carries all named parameters used by the statements below.
	type Downtime struct {
		history.DowntimeHistory
		history.HistoryMeta

		Name      types.String
		EventTime types.UnixMilli
	}

	// Event type of the mocked history entry, also being part of its calculated ID.
	const eventType = "downtime_end"

	// Marks the existing downtime_history as cancelled.
	const stmtDowntimeHistoryUpdate = `
UPDATE downtime_history
SET
cancelled_by = :cancelled_by,
has_been_cancelled = :has_been_cancelled,
cancel_time = :cancel_time
WHERE downtime_id = :downtime_id`

	// Mark the existing sla_history_downtime as ended.
	const stmtSlaHistoryDowntimeUpdate = `
UPDATE sla_history_downtime
SET downtime_end = :cancel_time
WHERE downtime_id = :downtime_id`

	stmtPopulate, stmtHistoryInsert := mkGenericHistoryMockStmts("downtime")

	// closeDowntime is executed for each downtime ID within a bulk transaction.
	closeDowntime := func(ctx context.Context, tx *sqlx.Tx, id types.Binary) error {
		now := types.UnixMilli(time.Now())

		downtime := &Downtime{
			DowntimeHistory: history.DowntimeHistory{
				DowntimeHistoryEntity: history.DowntimeHistoryEntity{
					DowntimeId: id,
				},
			},
		}

		// Start by populating information based on the previous data.
		populateStmt, err := tx.PrepareNamedContext(ctx, stmtPopulate)
		if err != nil {
			return errors.Wrap(err, "can't prepare statement to populate entry")
		}
		defer func() { _ = populateStmt.Close() }()

		err = populateStmt.GetContext(ctx, downtime, downtime)
		switch {
		case errors.Is(err, sql.ErrNoRows):
			// Without previous data there is nothing to close, which is not an error.
			s.logger.Infow("Can't fetch vanished Downtime from database for cleanup",
				zap.String("downtime_id", id.String()),
				zap.Error(err))
			return nil
		case err != nil:
			return errors.Wrap(err, "can't fetch information to populate entry")
		}

		// Fill fields to update downtime_history and sla_history_downtime.
		downtime.CancelTime = now
		downtime.HasBeenCancelled = types.MakeBool(true)
		downtime.CancelledBy = types.MakeString("Downtime Config Removed")

		if _, err = tx.NamedExecContext(ctx, stmtDowntimeHistoryUpdate, downtime); err != nil {
			return errors.Wrap(err, "can't update downtime_history")
		}

		if _, err = tx.NamedExecContext(ctx, stmtSlaHistoryDowntimeUpdate, downtime); err != nil {
			return errors.Wrap(err, "can't update sla_history_downtime")
		}

		// Update fields for a new history entry.
		downtime.Id, err = calcEventId(downtime.EnvironmentId.String(), eventType, downtime.Name.String)
		if err != nil {
			return errors.Wrap(err, "can't calculate event_id")
		}

		downtime.EventType = eventType
		downtime.EventTime = now

		if _, err = tx.NamedExecContext(ctx, stmtHistoryInsert, downtime); err != nil {
			return errors.Wrap(err, "can't insert new history entry")
		}

		return nil
	}

	return s.closeObjectAndMockHistory(ctx, ids, "downtime", closeDowntime)
}
10 changes: 7 additions & 3 deletions tests/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/goccy/go-yaml v1.13.0
github.com/google/uuid v1.6.0
github.com/icinga/icinga-go-library v0.9.0
github.com/icinga/icinga-testing v0.0.0-20251203103317-c39907077768
github.com/icinga/icinga-testing v0.0.0-20260120124143-63d72a55737e
github.com/jmoiron/sqlx v1.4.0
github.com/lib/pq v1.12.0
github.com/redis/go-redis/v9 v9.18.0
Expand All @@ -19,10 +19,12 @@ require (

require (
filippo.io/edwards25519 v1.1.1 // indirect
github.com/Icinga/go-libs v0.0.0-20220420130327-ef58ad52edd8 // indirect
github.com/Azure/go-ansiterm v0.0.0-20250102033503-faa5f7b0171c // indirect
github.com/Microsoft/go-winio v0.6.1 // indirect
github.com/caarlos0/env/v11 v11.4.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/containerd/log v0.1.0 // indirect
github.com/creasty/defaults v1.8.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/distribution/reference v0.5.0 // indirect
Expand All @@ -34,12 +36,14 @@ require (
github.com/go-logr/logr v1.4.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/jessevdk/go-flags v1.6.1 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.1.0-rc5 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/ssgreg/journald v1.0.0 // indirect
go.opentelemetry.io/auto/sdk v1.2.1 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.53.0 // indirect
go.opentelemetry.io/otel v1.40.0 // indirect
Expand All @@ -52,6 +56,6 @@ require (
golang.org/x/mod v0.17.0 // indirect
golang.org/x/net v0.38.0 // indirect
golang.org/x/sys v0.40.0 // indirect
golang.org/x/tools v0.21.0 // indirect
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Loading
Loading