@@ -19,8 +19,31 @@ import (
1919 "github.com/flanksource/duty/job"
2020 "github.com/flanksource/duty/models"
2121 "github.com/robfig/cron/v3"
22+ "golang.org/x/sync/semaphore"
2223)
2324
25+ const propertyCheckConcurrency = "check.concurrency"
26+
27+ var (
28+ // The maximum number of checks that can run concurrently
29+ defaultCheckConcurrency = 50
30+
31+ // Holds in the lock for every running check.
32+ // Can be overwritten by 'check.concurrency' property.
33+ globalCheckSemaphore * semaphore.Weighted
34+ )
35+
36+ // AcquireAllCheckLocks blocks until the global check sempahore is fully acquired.
37+ //
38+ // This helps to ensure that no checks are currently running.
39+ func AcquireAllCheckLocks (ctx context.Context ) {
40+ ctx .Logger .V (6 ).Infof ("acquiring all check locks" )
41+ if err := globalCheckSemaphore .Acquire (ctx , int64 (ctx .Properties ().Int (propertyCheckConcurrency , defaultCheckConcurrency ))); err != nil {
42+ ctx .Logger .Errorf ("failed to acquire check semaphores: %v" , err )
43+ }
44+ ctx .Logger .V (6 ).Infof ("acquired all check locks" )
45+ }
46+
2447var canaryJobs sync.Map
2548
2649const DefaultCanarySchedule = "@every 5m"
@@ -140,6 +163,7 @@ func newCanaryJob(c CanaryJob) {
140163 IgnoreSuccessHistory : true ,
141164 Retention : job .RetentionBalanced ,
142165 ResourceID : c .DBCanary .ID .String (),
166+ Semaphores : []* semaphore.Weighted {globalCheckSemaphore },
143167 ResourceType : "canary" ,
144168 ID : fmt .Sprintf ("%s/%s" , c .Canary .Namespace , c .Canary .Name ),
145169 Fn : c .Run ,
@@ -159,6 +183,10 @@ var SyncCanaryJobs = &job.Job{
159183 Schedule : "@every 5m" ,
160184 Retention : job .RetentionFew ,
161185 Fn : func (ctx job.JobRuntime ) error {
186+ if globalCheckSemaphore == nil {
187+ globalCheckSemaphore = semaphore .NewWeighted (int64 (ctx .Properties ().Int (propertyCheckConcurrency , defaultCheckConcurrency )))
188+ }
189+
162190 canaries , err := db .GetAllCanariesForSync (ctx .Context , runner .WatchNamespace )
163191 if err != nil {
164192 return err
0 commit comments