diff --git a/Makefile b/Makefile index 794b585..d8c194c 100644 --- a/Makefile +++ b/Makefile @@ -32,7 +32,7 @@ test: int-setup .PHONY: int-setup int-setup: int-teardown - docker run -d -p 2379:2379 --name etcd quay.io/coreos/etcd:v3.5.16 \ + docker run -d -p 2379:2379 --name etcd quay.io/coreos/etcd:v3.5.21 \ /usr/local/bin/etcd --listen-client-urls http://0.0.0.0:2379 \ --advertise-client-urls http://0.0.0.0:2379 diff --git a/concurrency/REAME.md b/concurrency/REAME.md deleted file mode 100644 index 8f9f692..0000000 --- a/concurrency/REAME.md +++ /dev/null @@ -1,10 +0,0 @@ -# Concurrency - -The following files in this package are adapted from etcd 3.5: - -* doc.go -* key.go -* mutex.go -* session.go - -These should be removed in favor of the native implementation once the rules engine switches to using etcd 3.5. diff --git a/metrics/prometheus.go b/metrics/prometheus.go index 61a02d8..8925dea 100644 --- a/metrics/prometheus.go +++ b/metrics/prometheus.go @@ -13,7 +13,7 @@ var ( Subsystem: "etcd", Namespace: "rules", Help: "etcd rules engine lock count", - }, []string{"locker", "method", "pattern", "success"}) + }, []string{"locker", "method", "pattern", "attempt", "success"}) rulesEngineUnlockErrorCount = prometheus.NewCounterVec(prometheus.CounterOpts{ Name: "unlock_error_count", Subsystem: "etcd", @@ -101,8 +101,8 @@ func init() { } // IncLockMetric increments the lock count. -func IncLockMetric(locker string, methodName string, pattern string, lockSucceeded bool) { - rulesEngineLockCount.WithLabelValues(locker, methodName, pattern, strconv.FormatBool(lockSucceeded)).Inc() +func IncLockMetric(locker string, methodName string, pattern string, attempt uint, lockSucceeded bool) { + rulesEngineLockCount.WithLabelValues(locker, methodName, pattern, strconv.FormatUint(uint64(attempt), 10), strconv.FormatBool(lockSucceeded)).Inc() } // IncUnlockMetric increments the unlock error count. diff --git a/metrics/prometheus_test.go b/metrics/prometheus_test.go index e274e7e..c40d723 100644 --- a/metrics/prometheus_test.go +++ b/metrics/prometheus_test.go @@ -41,11 +41,11 @@ func checkMetrics(t *testing.T, expectedOutput string) { } func TestIncLockMetric(t *testing.T) { - IncLockMetric("etcd", "getKey", "/key/pattern", true) - IncLockMetric("etcd", "getKey", "/second/pattern", false) + IncLockMetric("etcd", "getKey", "/key/pattern", 2, true) + IncLockMetric("etcd", "getKey", "/second/pattern", 1, false) - checkMetrics(t, `rules_etcd_lock_count{locker="etcd",method="getKey",pattern="/key/pattern",success="true"} 1`) - checkMetrics(t, `rules_etcd_lock_count{locker="etcd",method="getKey",pattern="/second/pattern",success="false"} 1`) + checkMetrics(t, `rules_etcd_lock_count{attempt="2",locker="etcd",method="getKey",pattern="/key/pattern",success="true"} 1`) + checkMetrics(t, `rules_etcd_lock_count{attempt="1",locker="etcd",method="getKey",pattern="/second/pattern",success="false"} 1`) } func TestIncSatisfiedThenNot(t *testing.T) { diff --git a/rules/doc.go b/rules/doc.go deleted file mode 100644 index 561d3a0..0000000 --- a/rules/doc.go +++ /dev/null @@ -1,5 +0,0 @@ -// Package rules is a rules engine for etcd. Specified functions are triggered -// whenever a specified rule is satisfied. Rules can contain gin-style attribute -// specifiers so that classes of keys are matched as opposed to having to specify -// each node key separately. -package rules diff --git a/rules/engine.go b/rules/engine.go index 04e05ac..5bc4223 100644 --- a/rules/engine.go +++ b/rules/engine.go @@ -19,6 +19,9 @@ const ( // WebhookURLEnv is the environment variable used to specify a callback // webhook that will get called every time a callback has finished executing. WebhookURLEnv = "RULES_ENGINE_CALLBACK_WEBHOOK" + sourceSource = "source" + sourceWatcher = "watcher" + sourceCrawler = "crawler" ) type stoppable interface { @@ -43,7 +46,6 @@ type baseEngine struct { metrics AdvancedMetricsCollector logger *zap.Logger options engineOptions - ruleLockTTLs map[int]int ruleMgr ruleManager stopped uint32 crawlers []stoppable @@ -144,7 +146,6 @@ func newV3Engine(logger *zap.Logger, cl *v3.Client, options ...EngineOption) v3E metrics: metrics, logger: logger, options: opts, - ruleLockTTLs: map[int]int{}, ruleMgr: ruleMgr, locker: finalLocker, callbackListener: cbListener, @@ -270,12 +271,8 @@ func (e *baseEngine) addRule(rule DynamicRule, lockPattern string, callback interface{}, options ...RuleOption) { - ruleIndex := e.ruleMgr.addRule(rule) opts := makeRuleOptions(options...) - ttl := e.options.lockTimeout - if opts.lockTimeout > 0 { - ttl = opts.lockTimeout - } + ruleIndex := e.ruleMgr.addRule(rule, opts) contextProvider := opts.contextProvider if contextProvider == nil { contextProvider = e.options.contextProvider @@ -284,7 +281,6 @@ func (e *baseEngine) addRule(rule DynamicRule, panic("Rule ID option missing") } ruleID := opts.ruleID - e.ruleLockTTLs[ruleIndex] = ttl e.keyProc.setCallback(ruleIndex, callback) e.keyProc.setLockKeyPattern(ruleIndex, lockPattern) e.keyProc.setContextProvider(ruleIndex, contextProvider) @@ -292,7 +288,7 @@ func (e *baseEngine) addRule(rule DynamicRule, } func (e *v3Engine) Run() { - e.logger.Info("Rules engine options", zap.Object("options", &e.options), zap.Int("rules", len(e.ruleMgr.rules))) + e.logger.Info("Rules engine options", zap.Object("options", &e.options), zap.Int("rules", len(e.ruleMgr.rulesLockOptions))) prefixSlice := []string{} prefixes := e.ruleMgr.prefixes // This is a map; used to ensure there are no duplicates @@ -340,10 +336,6 @@ func (e *v3Engine) Run() { } -func (e *baseEngine) getLockTTLForRule(index int) int { - return e.ruleLockTTLs[index] -} - type v3CallbackWrapper struct { ttlPathPattern string callback V3RuleTaskCallback diff --git a/rules/int_crawler.go b/rules/int_crawler.go index 08cc082..5214c93 100644 --- a/rules/int_crawler.go +++ b/rules/int_crawler.go @@ -114,14 +114,14 @@ func (ic *intCrawler) isStopped() bool { func (ic *intCrawler) run() { atomicSet(&ic.stopped, false) for !ic.isStopping() { - logger := ic.logger.With(zap.String("source", "crawler")) + logger := ic.logger.With(zap.String("source", sourceCrawler)) if ic.mutex == nil { ic.singleRun(logger) } else { mutex := "/crawler/" + *ic.mutex logger.Info("Attempting to obtain mutex", zap.String("mutex", mutex), zap.Int("Timeout", ic.mutexTimeout)) - lock, err := ic.locker.Lock(mutex, lock.MethodForLock("crawler"), lock.PatternForLock(mutex)) + lock, err := ic.locker.Lock(mutex, lock.MethodForLock(sourceCrawler), lock.PatternForLock(mutex)) if err != nil { logger.Error("Could not obtain mutex; skipping crawler run", zap.Error(err), zap.String("mutex", mutex)) } else { @@ -148,7 +148,7 @@ func (ic *intCrawler) run() { func (ic *intCrawler) singleRun(logger *zap.Logger) { crawlerStart := time.Now() logger.Info("Starting crawler run", zap.Int("prefixes", len(ic.prefixes))) - crawlerMethodName := "crawler" + crawlerMethodName := sourceCrawler if ic.isStopping() { return } @@ -187,6 +187,7 @@ func (ic *intCrawler) singleRun(logger *zap.Logger) { } logger.Info("Crawler run complete", zap.Duration("time", time.Since(crawlerStart)), zap.Int("values", len(values))) } + func (ic *intCrawler) processData(values map[string]string, logger *zap.Logger) { api := &cacheReadAPI{values: values} for k := range values { @@ -197,7 +198,7 @@ func (ic *intCrawler) processData(values map[string]string, logger *zap.Logger) // Check to see if any rule is satisfied from cache if ic.kp.isWork(k, &v, api) { // Process key if it is - ic.kp.processKey(k, &v, ic.api, logger, map[string]string{"source": "crawler"}, ic.incRuleProcessedCount) + ic.kp.processKey(k, &v, ic.api, logger, map[string]string{sourceSource: sourceCrawler}, ic.incRuleProcessedCount) } time.Sleep(ic.delay.Generate()) } diff --git a/rules/key_processor_test.go b/rules/key_processor_test.go index 974a5fb..76a1324 100644 --- a/rules/key_processor_test.go +++ b/rules/key_processor_test.go @@ -52,7 +52,8 @@ func TestV3KeyProcessor(t *testing.T) { rule, err := NewEqualsLiteralRule("/test/:key", &value) assert.NoError(t, err) rm := newRuleManager(map[string]constraint{}, false) - rm.addRule(rule) + opts := makeRuleOptions() + rm.addRule(rule, opts) api := newMapReadAPI() api.put("/test/key", value) callbacks := map[int]V3RuleTaskCallback{0: v3DummyCallback} @@ -84,7 +85,8 @@ func TestNewV3KeyProcessor(t *testing.T) { rule, err := NewEqualsLiteralRule("/test/:key", &value) assert.NoError(t, err) rm := newRuleManager(map[string]constraint{}, false) - rm.addRule(rule) + opts := makeRuleOptions() + rm.addRule(rule, opts) api := newMapReadAPI() api.put("/test/key", value) diff --git a/rules/lock/metrics.go b/rules/lock/metrics.go index 75a4114..e4cbca1 100644 --- a/rules/lock/metrics.go +++ b/rules/lock/metrics.go @@ -7,7 +7,7 @@ func WithMetrics(ruleLocker RuleLocker, name string) RuleLocker { return withMetrics(ruleLocker, name, metrics.IncLockMetric, metrics.IncUnlockErrorMetric) } func withMetrics(ruleLocker RuleLocker, name string, - observeLock func(locker string, methodName string, pattern string, lockSucceeded bool), + observeLock func(locker string, methodName string, pattern string, attempt uint, lockSucceeded bool), observeUnlockError func(locker string, methodName string, pattern string)) RuleLocker { return metricLocker{ RuleLocker: ruleLocker, @@ -21,7 +21,7 @@ type metricLocker struct { RuleLocker RuleLock lockerName string - observeLock func(locker string, methodName string, pattern string, lockSucceeded bool) + observeLock func(locker string, methodName string, pattern string, attempt uint, lockSucceeded bool) observeUnlockError func(locker string, methodName string, pattern string) } @@ -29,7 +29,7 @@ func (ml metricLocker) Lock(key string, options ...Option) (RuleLock, error) { opts := buildOptions(options...) var err error ml.RuleLock, err = ml.RuleLocker.Lock(key, options...) - ml.observeLock(ml.lockerName, opts.method, opts.pattern, err == nil) + ml.observeLock(ml.lockerName, opts.method, opts.pattern, opts.attempt, err == nil) return ml.RuleLock, err } diff --git a/rules/lock/metrics_test.go b/rules/lock/metrics_test.go index cc33b40..c410e05 100644 --- a/rules/lock/metrics_test.go +++ b/rules/lock/metrics_test.go @@ -77,7 +77,7 @@ func Test_metricLocker_Lock(t *testing.T) { return mockLock, tc.err }, } - observeLock := func(locker string, methodName string, pattern string, lockSucceeded bool) { + observeLock := func(locker string, methodName string, pattern string, attempt uint, lockSucceeded bool) { assert.Equal(t, tc.pattern, pattern) assert.Equal(t, tc.method, methodName) assert.Equal(t, tc.succeeded, lockSucceeded) diff --git a/rules/lock/options.go b/rules/lock/options.go index 00f59c9..d0ead5e 100644 --- a/rules/lock/options.go +++ b/rules/lock/options.go @@ -9,12 +9,15 @@ type options struct { pattern string // The method to provide context method string + // The attempt number for the lock + attempt uint } func buildOptions(opts ...Option) options { os := options{ pattern: unknown, method: unknown, + attempt: uint(1), } for _, opt := range opts { opt(&os) @@ -41,3 +44,11 @@ func MethodForLock(method string) Option { lo.method = method } } + +// MethodForLock is used to specify the context in which the lock was +// obtained. +func AttempForLock(attempt uint) Option { + return func(lo *options) { + lo.attempt = attempt + } +} diff --git a/rules/options.go b/rules/options.go index 3977033..33ed4cf 100644 --- a/rules/options.go +++ b/rules/options.go @@ -162,6 +162,8 @@ func KeyProcessorBuffer(size int) EngineOption { } // EngineLockTimeout controls the TTL of a lock in seconds. +// Default is 30s +// NOT BEING USED ANYMORE func EngineLockTimeout(lockTimeout int) EngineOption { return engineOptionFunction(func(o *engineOptions) { o.lockTimeout = lockTimeout @@ -169,7 +171,7 @@ func EngineLockTimeout(lockTimeout int) EngineOption { } // EngineLockAcquisitionTimeout controls the length of time we -// wait to acquire a lock. +// wait to acquire a lock. Default is 5s. func EngineLockAcquisitionTimeout(lockAcquisitionTimeout int) EngineOption { return engineOptionFunction(func(o *engineOptions) { o.lockAcquisitionTimeout = lockAcquisitionTimeout @@ -336,15 +338,19 @@ func EngineWatchProcessDelay(base time.Duration, jitterPercent float64) EngineOp } type ruleOptions struct { - lockTimeout int - contextProvider ContextProvider - ruleID string + lockTimeout int + contextProvider ContextProvider + ruleID string + watcherLockTries uint + watcherLockWait time.Duration } func makeRuleOptions(options ...RuleOption) ruleOptions { opts := ruleOptions{ - lockTimeout: 0, - ruleID: defaultRuleID, + lockTimeout: 0, + ruleID: defaultRuleID, + watcherLockTries: 1, + watcherLockWait: 0, } for _, opt := range options { opt.apply(&opts) @@ -363,8 +369,18 @@ func (f ruleOptionFunction) apply(o *ruleOptions) { f(o) } +// RuleWatcherLockWait controls the number of tries and time to wait for a +// watcher rule lock. Default is one try and no wait +func RuleWatcherLockRetries(tries uint, wait time.Duration) RuleOption { + return ruleOptionFunction(func(o *ruleOptions) { + o.watcherLockTries = tries + o.watcherLockWait = wait + }) +} + // RuleLockTimeout controls the TTL of the locks associated // with the rule, in seconds. +// NOT BEING USED ANYMORE func RuleLockTimeout(lockTimeout int) RuleOption { return ruleOptionFunction(func(o *ruleOptions) { o.lockTimeout = lockTimeout diff --git a/rules/options_test.go b/rules/options_test.go index b8d346f..f7556ea 100644 --- a/rules/options_test.go +++ b/rules/options_test.go @@ -14,6 +14,8 @@ func TestRuleOptions(t *testing.T) { assert.Equal(t, defaultRuleID, opts.ruleID) var defaultLockTimeout int assert.Equal(t, defaultLockTimeout, opts.lockTimeout) + assert.Equal(t, uint(1), opts.watcherLockTries) + assert.Equal(t, time.Duration(0), opts.watcherLockWait) opts = makeRuleOptions(RuleLockTimeout(300)) var threeHundred = 300 assert.Equal(t, threeHundred, opts.lockTimeout) @@ -22,6 +24,9 @@ func TestRuleOptions(t *testing.T) { testRuleID := "super-awesome-rule-id" opts = makeRuleOptions(RuleID(testRuleID)) assert.Equal(t, testRuleID, opts.ruleID) + opts = makeRuleOptions(RuleWatcherLockRetries(3, 4)) + assert.Equal(t, uint(3), opts.watcherLockTries) + assert.Equal(t, time.Duration(4), opts.watcherLockWait) } func TestEngineOptions(t *testing.T) { diff --git a/rules/rule_manager.go b/rules/rule_manager.go index 7855002..25b4f0c 100644 --- a/rules/rule_manager.go +++ b/rules/rule_manager.go @@ -2,6 +2,7 @@ package rules import ( "strings" + "time" ) type ruleManager struct { @@ -9,17 +10,22 @@ type ruleManager struct { currentIndex int rulesBySlashCount map[int]map[DynamicRule]int prefixes map[string]string - rules []DynamicRule + rulesLockOptions map[int]ruleMgrRuleLockOptions enhancedRuleFilter bool } +type ruleMgrRuleLockOptions struct { + watcherTries uint + watcherWait time.Duration +} + func newRuleManager(constraints map[string]constraint, enhancedRuleFilter bool) ruleManager { rm := ruleManager{ rulesBySlashCount: map[int]map[DynamicRule]int{}, prefixes: map[string]string{}, + rulesLockOptions: map[int]ruleMgrRuleLockOptions{}, constraints: constraints, currentIndex: 0, - rules: []DynamicRule{}, enhancedRuleFilter: enhancedRuleFilter, } return rm @@ -49,8 +55,8 @@ func (rm *ruleManager) getStaticRules(key string, value *string) map[staticRule] return out } -func (rm *ruleManager) addRule(rule DynamicRule) int { - rm.rules = append(rm.rules, rule) +func (rm *ruleManager) addRule(rule DynamicRule, opts ruleOptions) int { + rm.rulesLockOptions[rm.currentIndex] = ruleMgrRuleLockOptions{watcherTries: opts.watcherLockTries, watcherWait: opts.watcherLockWait} for _, pattern := range rule.getPatterns() { slashCount := strings.Count(pattern, "/") rules, ok := rm.rulesBySlashCount[slashCount] @@ -69,6 +75,10 @@ func (rm *ruleManager) addRule(rule DynamicRule) int { return lastIndex } +func (rm *ruleManager) getRuleLockOptions(ruleIndex int) ruleMgrRuleLockOptions { + return rm.rulesLockOptions[ruleIndex] +} + // Removes any path prefixes that have other path prefixes as // string prefixes func reducePrefixes(prefixes map[string]string) map[string]string { diff --git a/rules/rule_manager_test.go b/rules/rule_manager_test.go index c1526d0..0853afb 100644 --- a/rules/rule_manager_test.go +++ b/rules/rule_manager_test.go @@ -11,13 +11,14 @@ func TestRuleManager(t *testing.T) { rm := newRuleManager(map[string]constraint{}, erf) rule1, err1 := NewEqualsLiteralRule("/this/is/:a/rule", nil) assert.NoError(t, err1) - rm.addRule(rule1) + opts := makeRuleOptions() + rm.addRule(rule1, opts) rule2, err2 := NewEqualsLiteralRule("/that/is/:a/nother", nil) assert.NoError(t, err2) - rm.addRule(rule2) + rm.addRule(rule2, opts) rule3, err3 := NewEqualsLiteralRule("/this/is/:a", nil) assert.NoError(t, err3) - rm.addRule(rule3) + rm.addRule(rule3, opts) rules := rm.getStaticRules("/this/is/a/rule", nil) assert.Equal(t, 1, len(rules)) for r, index := range rules { @@ -26,6 +27,12 @@ func TestRuleManager(t *testing.T) { } rules = rm.getStaticRules("/nothing", nil) assert.Equal(t, 0, len(rules)) + + rule4, err4 := NewEqualsLiteralRule("/this/is/another/rule:a", nil) + assert.NoError(t, err4) + opts = makeRuleOptions(RuleWatcherLockRetries(3, 4)) + ruleIndex := rm.addRule(rule4, opts) + assert.Equal(t, ruleMgrRuleLockOptions{3, 4}, rm.getRuleLockOptions(ruleIndex)) } } @@ -34,7 +41,6 @@ func TestReducePrefixes(t *testing.T) { prefixes = reducePrefixes(prefixes) assert.Equal(t, 1, len(prefixes)) assert.Equal(t, "", prefixes["/servers"]) - } func TestSortPrefixesByLength(t *testing.T) { diff --git a/rules/watcher.go b/rules/watcher.go index 0fd2ca3..b2742e0 100644 --- a/rules/watcher.go +++ b/rules/watcher.go @@ -74,7 +74,7 @@ func (w *watcher) singleRun() { time.Sleep(delay) // TODO ideally a context should be used for fast shutdown, e.g. select { case <-ctx.Done(); case <-time.After(delay) } } w.logger.Debug("Calling process key", zap.String("key", key)) - w.kp.processKey(key, value, w.api, w.logger, map[string]string{"source": "watcher"}, incRuleProcessedCount) + w.kp.processKey(key, value, w.api, w.logger, map[string]string{sourceSource: sourceWatcher}, incRuleProcessedCount) } func incRuleProcessedCount(ruleID string) { diff --git a/rules/worker.go b/rules/worker.go index 653b2ac..1de75b1 100644 --- a/rules/worker.go +++ b/rules/worker.go @@ -71,7 +71,7 @@ func (bw *baseWorker) isStopped() bool { } func (bw *baseWorker) doWork(loggerPtr **zap.Logger, - rulePtr *staticRule, lockTTL int, callback workCallback, + rulePtr *staticRule, ruleOpts ruleMgrRuleLockOptions, callback workCallback, metricsInfo metricsInfo, lockKey string, ruleID string, source string) { logger := *loggerPtr logger = logger.With(zap.String("ruleID", ruleID), zap.String("mutex", lockKey)) @@ -93,11 +93,29 @@ func (bw *baseWorker) doWork(loggerPtr **zap.Logger, } return } - l, err2 := bw.locker.Lock(lockKey, lock.PatternForLock(metricsInfo.keyPattern), lock.MethodForLock("worker_lock")) - if err2 != nil { - logger.Warn("Failed to acquire rule lock", zap.Error(err2)) + + // Attempt to get the lock multiple times for the Watcher events to avoid being pushed to the crawler + var l lock.RuleLock + var lockErr error + for watcherTryCount := range ruleOpts.watcherTries { + // Wait after first attempt + if watcherTryCount > 0 { + time.Sleep(ruleOpts.watcherWait) + } + // Attempt to get the rule lock + l, lockErr = bw.locker.Lock(lockKey, lock.PatternForLock(metricsInfo.keyPattern), lock.MethodForLock("worker_lock"), lock.AttempForLock(watcherTryCount+1)) + if lockErr != nil { + logger.Warn("Failed to acquire rule lock", zap.Error(lockErr), zap.Uint("attempt", watcherTryCount+1)) + if source != sourceWatcher { + return + } + } + } + // Failed to get the lock after all attempts + if lockErr != nil { return } + defer func() { err := l.Unlock() if err != nil { @@ -181,7 +199,7 @@ func (w *v3Worker) singleRun() { task.Context = context task.cancel = cancelFunc metricsInfo := newMetricsInfo(context, work.keyPattern, work.metricsStartTime) - w.doWork(&task.Logger, &work.rule, w.engine.getLockTTLForRule(work.ruleIndex), func() { work.ruleTaskCallback(&task) }, metricsInfo, work.lockKey, work.ruleID, source) + w.doWork(&task.Logger, &work.rule, w.engine.ruleMgr.getRuleLockOptions(work.ruleIndex), func() { work.ruleTaskCallback(&task) }, metricsInfo, work.lockKey, work.ruleID, source) }() wg.Wait() } diff --git a/rules/worker_test.go b/rules/worker_test.go index 887a323..6b625c8 100644 --- a/rules/worker_test.go +++ b/rules/worker_test.go @@ -21,6 +21,9 @@ func TestWorkerSingleRun(t *testing.T) { cl, err := v3.New(conf) assert.NoError(t, err) e := newV3Engine(getTestLogger(), cl, EngineLockTimeout(300)) + ruleTest, err := NewEqualsLiteralRule("/this/is/another/rule:a", nil) + assert.NoError(t, err) + ruleIndex := e.ruleMgr.addRule(ruleTest, makeRuleOptions(RuleWatcherLockRetries(2, 2))) channel := e.workChannel lockChannel := make(chan bool) locker := lock.MockLocker{ @@ -43,7 +46,7 @@ func TestWorkerSingleRun(t *testing.T) { task := V3RuleTask{ Attr: &attr, Logger: getTestLogger(), - Metadata: map[string]string{}, + Metadata: map[string]string{sourceSource: sourceWatcher}, } cbChannel := make(chan bool) callback := testCallback{ @@ -57,6 +60,8 @@ func TestWorkerSingleRun(t *testing.T) { } rw := v3RuleWork{ rule: &rule, + ruleIndex: ruleIndex, + ruleID: "ID1", ruleTask: task, ruleTaskCallback: callback.callback, lockKey: "key", diff --git a/v3enginetest/main.go b/v3enginetest/main.go index 90dbeaa..c35c54c 100644 --- a/v3enginetest/main.go +++ b/v3enginetest/main.go @@ -97,7 +97,7 @@ func main() { engine := rules.NewV3Engine(cfg, logger, rules.EngineContextProvider(cpFunc), rules.EngineMetricsCollector(mFunc), - rules.EngineSyncInterval(5), + rules.EngineSyncInterval(10), rules.EngineCrawlMutex("inttest", 5), rules.EngineLockAcquisitionTimeout(5)) mw := &rules.MockWatcherWrapper{