Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ test: int-setup

.PHONY: int-setup
int-setup: int-teardown
docker run -d -p 2379:2379 --name etcd quay.io/coreos/etcd:v3.5.16 \
docker run -d -p 2379:2379 --name etcd quay.io/coreos/etcd:v3.5.21 \
/usr/local/bin/etcd --listen-client-urls http://0.0.0.0:2379 \
--advertise-client-urls http://0.0.0.0:2379

Expand Down
10 changes: 0 additions & 10 deletions concurrency/REAME.md

This file was deleted.

6 changes: 3 additions & 3 deletions metrics/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ var (
Subsystem: "etcd",
Namespace: "rules",
Help: "etcd rules engine lock count",
}, []string{"locker", "method", "pattern", "success"})
}, []string{"locker", "method", "pattern", "attempt", "success"})
rulesEngineUnlockErrorCount = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "unlock_error_count",
Subsystem: "etcd",
Expand Down Expand Up @@ -101,8 +101,8 @@ func init() {
}

// IncLockMetric increments the lock count.
func IncLockMetric(locker string, methodName string, pattern string, lockSucceeded bool) {
rulesEngineLockCount.WithLabelValues(locker, methodName, pattern, strconv.FormatBool(lockSucceeded)).Inc()
func IncLockMetric(locker string, methodName string, pattern string, attempt uint, lockSucceeded bool) {
rulesEngineLockCount.WithLabelValues(locker, methodName, pattern, strconv.FormatUint(uint64(attempt), 10), strconv.FormatBool(lockSucceeded)).Inc()
}

// IncUnlockMetric increments the unlock error count.
Expand Down
8 changes: 4 additions & 4 deletions metrics/prometheus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,11 @@ func checkMetrics(t *testing.T, expectedOutput string) {
}

func TestIncLockMetric(t *testing.T) {
IncLockMetric("etcd", "getKey", "/key/pattern", true)
IncLockMetric("etcd", "getKey", "/second/pattern", false)
IncLockMetric("etcd", "getKey", "/key/pattern", 2, true)
IncLockMetric("etcd", "getKey", "/second/pattern", 1, false)

checkMetrics(t, `rules_etcd_lock_count{locker="etcd",method="getKey",pattern="/key/pattern",success="true"} 1`)
checkMetrics(t, `rules_etcd_lock_count{locker="etcd",method="getKey",pattern="/second/pattern",success="false"} 1`)
checkMetrics(t, `rules_etcd_lock_count{attempt="2",locker="etcd",method="getKey",pattern="/key/pattern",success="true"} 1`)
checkMetrics(t, `rules_etcd_lock_count{attempt="1",locker="etcd",method="getKey",pattern="/second/pattern",success="false"} 1`)
}

func TestIncSatisfiedThenNot(t *testing.T) {
Expand Down
5 changes: 0 additions & 5 deletions rules/doc.go

This file was deleted.

18 changes: 5 additions & 13 deletions rules/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ const (
// WebhookURLEnv is the environment variable used to specify a callback
// webhook that will get called every time a callback has finished executing.
WebhookURLEnv = "RULES_ENGINE_CALLBACK_WEBHOOK"
sourceSource = "source"
sourceWatcher = "watcher"
sourceCrawler = "crawler"
)

type stoppable interface {
Expand All @@ -43,7 +46,6 @@ type baseEngine struct {
metrics AdvancedMetricsCollector
logger *zap.Logger
options engineOptions
ruleLockTTLs map[int]int
ruleMgr ruleManager
stopped uint32
crawlers []stoppable
Expand Down Expand Up @@ -144,7 +146,6 @@ func newV3Engine(logger *zap.Logger, cl *v3.Client, options ...EngineOption) v3E
metrics: metrics,
logger: logger,
options: opts,
ruleLockTTLs: map[int]int{},
ruleMgr: ruleMgr,
locker: finalLocker,
callbackListener: cbListener,
Expand Down Expand Up @@ -270,12 +271,8 @@ func (e *baseEngine) addRule(rule DynamicRule,
lockPattern string,
callback interface{},
options ...RuleOption) {
ruleIndex := e.ruleMgr.addRule(rule)
opts := makeRuleOptions(options...)
ttl := e.options.lockTimeout
if opts.lockTimeout > 0 {
ttl = opts.lockTimeout
}
ruleIndex := e.ruleMgr.addRule(rule, opts)
contextProvider := opts.contextProvider
if contextProvider == nil {
contextProvider = e.options.contextProvider
Expand All @@ -284,15 +281,14 @@ func (e *baseEngine) addRule(rule DynamicRule,
panic("Rule ID option missing")
}
ruleID := opts.ruleID
e.ruleLockTTLs[ruleIndex] = ttl
e.keyProc.setCallback(ruleIndex, callback)
e.keyProc.setLockKeyPattern(ruleIndex, lockPattern)
e.keyProc.setContextProvider(ruleIndex, contextProvider)
e.keyProc.setRuleID(ruleIndex, ruleID)
}

func (e *v3Engine) Run() {
e.logger.Info("Rules engine options", zap.Object("options", &e.options), zap.Int("rules", len(e.ruleMgr.rules)))
e.logger.Info("Rules engine options", zap.Object("options", &e.options), zap.Int("rules", len(e.ruleMgr.rulesLockOptions)))
prefixSlice := []string{}
prefixes := e.ruleMgr.prefixes
// This is a map; used to ensure there are no duplicates
Expand Down Expand Up @@ -340,10 +336,6 @@ func (e *v3Engine) Run() {

}

func (e *baseEngine) getLockTTLForRule(index int) int {
return e.ruleLockTTLs[index]
}

type v3CallbackWrapper struct {
ttlPathPattern string
callback V3RuleTaskCallback
Expand Down
9 changes: 5 additions & 4 deletions rules/int_crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,14 +114,14 @@ func (ic *intCrawler) isStopped() bool {
func (ic *intCrawler) run() {
atomicSet(&ic.stopped, false)
for !ic.isStopping() {
logger := ic.logger.With(zap.String("source", "crawler"))
logger := ic.logger.With(zap.String("source", sourceCrawler))
if ic.mutex == nil {
ic.singleRun(logger)
} else {
mutex := "/crawler/" + *ic.mutex
logger.Info("Attempting to obtain mutex",
zap.String("mutex", mutex), zap.Int("Timeout", ic.mutexTimeout))
lock, err := ic.locker.Lock(mutex, lock.MethodForLock("crawler"), lock.PatternForLock(mutex))
lock, err := ic.locker.Lock(mutex, lock.MethodForLock(sourceCrawler), lock.PatternForLock(mutex))
if err != nil {
logger.Error("Could not obtain mutex; skipping crawler run", zap.Error(err), zap.String("mutex", mutex))
} else {
Expand All @@ -148,7 +148,7 @@ func (ic *intCrawler) run() {
func (ic *intCrawler) singleRun(logger *zap.Logger) {
crawlerStart := time.Now()
logger.Info("Starting crawler run", zap.Int("prefixes", len(ic.prefixes)))
crawlerMethodName := "crawler"
crawlerMethodName := sourceCrawler
if ic.isStopping() {
return
}
Expand Down Expand Up @@ -187,6 +187,7 @@ func (ic *intCrawler) singleRun(logger *zap.Logger) {
}
logger.Info("Crawler run complete", zap.Duration("time", time.Since(crawlerStart)), zap.Int("values", len(values)))
}

func (ic *intCrawler) processData(values map[string]string, logger *zap.Logger) {
api := &cacheReadAPI{values: values}
for k := range values {
Expand All @@ -197,7 +198,7 @@ func (ic *intCrawler) processData(values map[string]string, logger *zap.Logger)
// Check to see if any rule is satisfied from cache
if ic.kp.isWork(k, &v, api) {
// Process key if it is
ic.kp.processKey(k, &v, ic.api, logger, map[string]string{"source": "crawler"}, ic.incRuleProcessedCount)
ic.kp.processKey(k, &v, ic.api, logger, map[string]string{sourceSource: sourceCrawler}, ic.incRuleProcessedCount)
}
time.Sleep(ic.delay.Generate())
}
Expand Down
6 changes: 4 additions & 2 deletions rules/key_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ func TestV3KeyProcessor(t *testing.T) {
rule, err := NewEqualsLiteralRule("/test/:key", &value)
assert.NoError(t, err)
rm := newRuleManager(map[string]constraint{}, false)
rm.addRule(rule)
opts := makeRuleOptions()
rm.addRule(rule, opts)
api := newMapReadAPI()
api.put("/test/key", value)
callbacks := map[int]V3RuleTaskCallback{0: v3DummyCallback}
Expand Down Expand Up @@ -84,7 +85,8 @@ func TestNewV3KeyProcessor(t *testing.T) {
rule, err := NewEqualsLiteralRule("/test/:key", &value)
assert.NoError(t, err)
rm := newRuleManager(map[string]constraint{}, false)
rm.addRule(rule)
opts := makeRuleOptions()
rm.addRule(rule, opts)
api := newMapReadAPI()
api.put("/test/key", value)

Expand Down
6 changes: 3 additions & 3 deletions rules/lock/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ func WithMetrics(ruleLocker RuleLocker, name string) RuleLocker {
return withMetrics(ruleLocker, name, metrics.IncLockMetric, metrics.IncUnlockErrorMetric)
}
func withMetrics(ruleLocker RuleLocker, name string,
observeLock func(locker string, methodName string, pattern string, lockSucceeded bool),
observeLock func(locker string, methodName string, pattern string, attempt uint, lockSucceeded bool),
observeUnlockError func(locker string, methodName string, pattern string)) RuleLocker {
return metricLocker{
RuleLocker: ruleLocker,
Expand All @@ -21,15 +21,15 @@ type metricLocker struct {
RuleLocker
RuleLock
lockerName string
observeLock func(locker string, methodName string, pattern string, lockSucceeded bool)
observeLock func(locker string, methodName string, pattern string, attempt uint, lockSucceeded bool)
observeUnlockError func(locker string, methodName string, pattern string)
}

func (ml metricLocker) Lock(key string, options ...Option) (RuleLock, error) {
opts := buildOptions(options...)
var err error
ml.RuleLock, err = ml.RuleLocker.Lock(key, options...)
ml.observeLock(ml.lockerName, opts.method, opts.pattern, err == nil)
ml.observeLock(ml.lockerName, opts.method, opts.pattern, opts.attempt, err == nil)
return ml.RuleLock, err
}

Expand Down
2 changes: 1 addition & 1 deletion rules/lock/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func Test_metricLocker_Lock(t *testing.T) {
return mockLock, tc.err
},
}
observeLock := func(locker string, methodName string, pattern string, lockSucceeded bool) {
observeLock := func(locker string, methodName string, pattern string, attempt uint, lockSucceeded bool) {
assert.Equal(t, tc.pattern, pattern)
assert.Equal(t, tc.method, methodName)
assert.Equal(t, tc.succeeded, lockSucceeded)
Expand Down
11 changes: 11 additions & 0 deletions rules/lock/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,15 @@ type options struct {
pattern string
// The method to provide context
method string
// The attempt number for the lock
attempt uint
}

func buildOptions(opts ...Option) options {
os := options{
pattern: unknown,
method: unknown,
attempt: uint(1),
}
for _, opt := range opts {
opt(&os)
Expand All @@ -41,3 +44,11 @@ func MethodForLock(method string) Option {
lo.method = method
}
}

// MethodForLock is used to specify the context in which the lock was
// obtained.
func AttempForLock(attempt uint) Option {
return func(lo *options) {
lo.attempt = attempt
}
}
28 changes: 22 additions & 6 deletions rules/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,14 +162,16 @@ func KeyProcessorBuffer(size int) EngineOption {
}

// EngineLockTimeout controls the TTL of a lock in seconds.
// Default is 30s
// NOT BEING USED ANYMORE
func EngineLockTimeout(lockTimeout int) EngineOption {
return engineOptionFunction(func(o *engineOptions) {
o.lockTimeout = lockTimeout
})
}

// EngineLockAcquisitionTimeout controls the length of time we
// wait to acquire a lock.
// wait to acquire a lock. Default is 5s.
func EngineLockAcquisitionTimeout(lockAcquisitionTimeout int) EngineOption {
return engineOptionFunction(func(o *engineOptions) {
o.lockAcquisitionTimeout = lockAcquisitionTimeout
Expand Down Expand Up @@ -336,15 +338,19 @@ func EngineWatchProcessDelay(base time.Duration, jitterPercent float64) EngineOp
}

type ruleOptions struct {
lockTimeout int
contextProvider ContextProvider
ruleID string
lockTimeout int
contextProvider ContextProvider
ruleID string
watcherLockTries uint
watcherLockWait time.Duration
}

func makeRuleOptions(options ...RuleOption) ruleOptions {
opts := ruleOptions{
lockTimeout: 0,
ruleID: defaultRuleID,
lockTimeout: 0,
ruleID: defaultRuleID,
watcherLockTries: 1,
watcherLockWait: 0,
}
for _, opt := range options {
opt.apply(&opts)
Expand All @@ -363,8 +369,18 @@ func (f ruleOptionFunction) apply(o *ruleOptions) {
f(o)
}

// RuleWatcherLockWait controls the number of tries and time to wait for a
// watcher rule lock. Default is one try and no wait
func RuleWatcherLockRetries(tries uint, wait time.Duration) RuleOption {
return ruleOptionFunction(func(o *ruleOptions) {
o.watcherLockTries = tries
o.watcherLockWait = wait
})
}

// RuleLockTimeout controls the TTL of the locks associated
// with the rule, in seconds.
// NOT BEING USED ANYMORE
func RuleLockTimeout(lockTimeout int) RuleOption {
return ruleOptionFunction(func(o *ruleOptions) {
o.lockTimeout = lockTimeout
Expand Down
5 changes: 5 additions & 0 deletions rules/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ func TestRuleOptions(t *testing.T) {
assert.Equal(t, defaultRuleID, opts.ruleID)
var defaultLockTimeout int
assert.Equal(t, defaultLockTimeout, opts.lockTimeout)
assert.Equal(t, uint(1), opts.watcherLockTries)
assert.Equal(t, time.Duration(0), opts.watcherLockWait)
opts = makeRuleOptions(RuleLockTimeout(300))
var threeHundred = 300
assert.Equal(t, threeHundred, opts.lockTimeout)
Expand All @@ -22,6 +24,9 @@ func TestRuleOptions(t *testing.T) {
testRuleID := "super-awesome-rule-id"
opts = makeRuleOptions(RuleID(testRuleID))
assert.Equal(t, testRuleID, opts.ruleID)
opts = makeRuleOptions(RuleWatcherLockRetries(3, 4))
assert.Equal(t, uint(3), opts.watcherLockTries)
assert.Equal(t, time.Duration(4), opts.watcherLockWait)
}

func TestEngineOptions(t *testing.T) {
Expand Down
18 changes: 14 additions & 4 deletions rules/rule_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,30 @@ package rules

import (
"strings"
"time"
)

type ruleManager struct {
constraints map[string]constraint
currentIndex int
rulesBySlashCount map[int]map[DynamicRule]int
prefixes map[string]string
rules []DynamicRule
rulesLockOptions map[int]ruleMgrRuleLockOptions
enhancedRuleFilter bool
}

type ruleMgrRuleLockOptions struct {
watcherTries uint
watcherWait time.Duration
}

func newRuleManager(constraints map[string]constraint, enhancedRuleFilter bool) ruleManager {
rm := ruleManager{
rulesBySlashCount: map[int]map[DynamicRule]int{},
prefixes: map[string]string{},
rulesLockOptions: map[int]ruleMgrRuleLockOptions{},
constraints: constraints,
currentIndex: 0,
rules: []DynamicRule{},
enhancedRuleFilter: enhancedRuleFilter,
}
return rm
Expand Down Expand Up @@ -49,8 +55,8 @@ func (rm *ruleManager) getStaticRules(key string, value *string) map[staticRule]
return out
}

func (rm *ruleManager) addRule(rule DynamicRule) int {
rm.rules = append(rm.rules, rule)
func (rm *ruleManager) addRule(rule DynamicRule, opts ruleOptions) int {
rm.rulesLockOptions[rm.currentIndex] = ruleMgrRuleLockOptions{watcherTries: opts.watcherLockTries, watcherWait: opts.watcherLockWait}
for _, pattern := range rule.getPatterns() {
slashCount := strings.Count(pattern, "/")
rules, ok := rm.rulesBySlashCount[slashCount]
Expand All @@ -69,6 +75,10 @@ func (rm *ruleManager) addRule(rule DynamicRule) int {
return lastIndex
}

func (rm *ruleManager) getRuleLockOptions(ruleIndex int) ruleMgrRuleLockOptions {
return rm.rulesLockOptions[ruleIndex]
}

// Removes any path prefixes that have other path prefixes as
// string prefixes
func reducePrefixes(prefixes map[string]string) map[string]string {
Expand Down
Loading
Loading