Skip to content

Commit 5fb03bf

Browse files
committed
feat: wait for checks before shutdown
1 parent d85f05b commit 5fb03bf

File tree

4 files changed

+41
-0
lines changed

4 files changed

+41
-0
lines changed

canary-checker.properties

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,5 +6,6 @@
66

77
# topology.runNow=true
88
log.level.db=warn
9+
# check.concurrency=100
910

1011
# jobs.ComponentRelationshipSync.runNow=true

cmd/operator.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
apicontext "github.com/flanksource/canary-checker/api/context"
99
"github.com/flanksource/canary-checker/pkg/cache"
1010
"github.com/flanksource/canary-checker/pkg/jobs"
11+
"github.com/flanksource/canary-checker/pkg/jobs/canary"
1112
canaryJobs "github.com/flanksource/canary-checker/pkg/jobs/canary"
1213
"github.com/flanksource/canary-checker/pkg/runner"
1314
"github.com/flanksource/canary-checker/pkg/utils"
@@ -96,6 +97,12 @@ func run() error {
9697
// so we use a goroutine to unblock server start
9798
// to prevent health check from failing
9899
go jobs.Start()
100+
101+
// TODO: stop the cron scheduler so that no more checks are scheduled
102+
103+
shutdown.AddHookWithPriority("check jobs", shutdown.PriorityJobs, func() {
104+
canary.AcquireAllCheckLocks(ctx)
105+
})
99106
}
100107

101108
go serve()

cmd/serve.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/flanksource/canary-checker/pkg/db"
1717
"github.com/flanksource/canary-checker/pkg/echo"
1818
"github.com/flanksource/canary-checker/pkg/jobs"
19+
"github.com/flanksource/canary-checker/pkg/jobs/canary"
1920
canaryJobs "github.com/flanksource/canary-checker/pkg/jobs/canary"
2021
echov4 "github.com/labstack/echo/v4"
2122

@@ -49,6 +50,12 @@ var Serve = &cobra.Command{
4950
canaryJobs.StartScanCanaryConfigs(apicontext.DefaultContext, dataFile, configFiles)
5051
if executor {
5152
jobs.Start()
53+
54+
// TODO: stop the cron scheduler so that no more checks are scheduled
55+
56+
shutdown.AddHookWithPriority("check jobs", shutdown.PriorityJobs, func() {
57+
canary.AcquireAllCheckLocks(apicontext.DefaultContext)
58+
})
5259
}
5360

5461
serve()

pkg/jobs/canary/sync.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,29 @@ import (
1919
"github.com/flanksource/duty/job"
2020
"github.com/flanksource/duty/models"
2121
"github.com/robfig/cron/v3"
22+
"golang.org/x/sync/semaphore"
2223
)
2324

25+
const propertyCheckConcurrency = "check.concurrency"
26+
27+
var (
28+
// The maximum number of checks that can run concurrently
29+
defaultCheckConcurrency = 50
30+
31+
// Holds in the lock for every running check.
32+
// Can be overwritten by 'check.concurrency' property.
33+
globalCheckSemaphore *semaphore.Weighted
34+
)
35+
36+
// AcquireAllCheckLocks blocks until the global check sempahore is fully acquired.
37+
//
38+
// This helps to ensure that no checks are currently running.
39+
func AcquireAllCheckLocks(ctx context.Context) {
40+
ctx.Logger.V(6).Infof("acquiring all check locks")
41+
globalCheckSemaphore.Acquire(ctx, int64(ctx.Properties().Int(propertyCheckConcurrency, defaultCheckConcurrency)))
42+
ctx.Logger.V(6).Infof("acquired all check locks")
43+
}
44+
2445
var canaryJobs sync.Map
2546

2647
const DefaultCanarySchedule = "@every 5m"
@@ -140,6 +161,7 @@ func newCanaryJob(c CanaryJob) {
140161
IgnoreSuccessHistory: true,
141162
Retention: job.RetentionBalanced,
142163
ResourceID: c.DBCanary.ID.String(),
164+
Semaphores: []*semaphore.Weighted{globalCheckSemaphore},
143165
ResourceType: "canary",
144166
ID: fmt.Sprintf("%s/%s", c.Canary.Namespace, c.Canary.Name),
145167
Fn: c.Run,
@@ -159,6 +181,10 @@ var SyncCanaryJobs = &job.Job{
159181
Schedule: "@every 5m",
160182
Retention: job.RetentionFew,
161183
Fn: func(ctx job.JobRuntime) error {
184+
if globalCheckSemaphore == nil {
185+
globalCheckSemaphore = semaphore.NewWeighted(int64(ctx.Properties().Int(propertyCheckConcurrency, defaultCheckConcurrency)))
186+
}
187+
162188
canaries, err := db.GetAllCanariesForSync(ctx.Context, runner.WatchNamespace)
163189
if err != nil {
164190
return err

0 commit comments

Comments
 (0)