10 changes: 3 additions & 7 deletions rules/engine.go
@@ -270,8 +270,8 @@ func (e *baseEngine) addRule(rule DynamicRule,
 	lockPattern string,
 	callback interface{},
 	options ...RuleOption) {
-	ruleIndex := e.ruleMgr.addRule(rule)
 	opts := makeRuleOptions(options...)
+	ruleIndex := e.ruleMgr.addRule(rule, opts)
 	ttl := e.options.lockTimeout
 	if opts.lockTimeout > 0 {
 		ttl = opts.lockTimeout
@@ -293,11 +293,7 @@ func (e *baseEngine) addRule(rule DynamicRule,
 
 func (e *v3Engine) Run() {
 	e.logger.Info("Rules engine options", zap.Object("options", &e.options), zap.Int("rules", len(e.ruleMgr.rules)))
-	prefixSlice := []string{}
-	prefixes := e.ruleMgr.prefixes
-	// This is a map; used to ensure there are no duplicates
-	for prefix := range prefixes {
-		prefixSlice = append(prefixSlice, prefix)
+	for _, prefix := range e.ruleMgr.getWatcherPrefixes() {
 		logger := e.logger.With(zap.String("prefix", prefix))
 		w, err := newV3Watcher(e.cl, prefix, logger, e.baseEngine.keyProc, e.options.watchTimeout, e.kvWrapper, e.metrics, e.watcherWrapper, e.options.watchDelay)
 		if err != nil {
@@ -314,7 +310,7 @@ func (e *v3Engine) Run() {
 		logger,
 		e.options.crawlMutex,
 		e.options.lockAcquisitionTimeout,
-		prefixSlice,
+		e.ruleMgr.getPrioritizedPrefixes(),
 		e.kvWrapper,
 		e.options.syncDelay,
 		e.locker,
11 changes: 8 additions & 3 deletions rules/int_crawler.go
@@ -162,6 +162,7 @@ func (ic *intCrawler) singleRun(logger *zap.Logger) {
 	ic.rulesProcessedCount = make(map[string]int)
 
 	queryStart := time.Now()
+	prioritizedKeys := []string{}
 	for _, prefix := range ic.prefixes {
 		pCtx := SetMethod(ctx, crawlerMethodName+"-"+prefix)
 		resp, err := ic.kv.Get(pCtx, prefix, v3.WithPrefix())
@@ -171,12 +172,15 @@ func (ic *intCrawler) singleRun(logger *zap.Logger) {
 		}
 		for _, kv := range resp.Kvs {
 			values[string(kv.Key)] = string(kv.Value)
+			// Using a map unsorts the prefixes; keep
+			// the priority order by using a slice
+			prioritizedKeys = append(prioritizedKeys, string(kv.Key))
 		}
 	}
 	metrics.CrawlerQueryTime(ic.name, queryStart)
 	metrics.CrawlerValuesCount(ic.name, len(values))
 	evalStart := time.Now()
-	ic.processData(values, logger)
+	ic.processData(values, prioritizedKeys, logger)
 	metrics.CrawlerEvalTime(ic.name, evalStart)
 
 	ic.metricMutex.Lock()
@@ -187,9 +191,10 @@ func (ic *intCrawler) singleRun(logger *zap.Logger) {
 	}
 	logger.Info("Crawler run complete", zap.Duration("time", time.Since(crawlerStart)), zap.Int("values", len(values)))
 }
-func (ic *intCrawler) processData(values map[string]string, logger *zap.Logger) {
+
+func (ic *intCrawler) processData(values map[string]string, prioritizedKeys []string, logger *zap.Logger) {
 	api := &cacheReadAPI{values: values}
-	for k := range values {
+	for _, k := range prioritizedKeys {
 		v := values[k]
 		if ic.isStopping() {
 			return
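Note on the pattern above: a Go map does not preserve insertion order, so the crawler records each key in the parallel prioritizedKeys slice as responses arrive and later iterates the slice rather than the map. A minimal standalone sketch of that map-plus-slice pattern (names here are illustrative, not the crawler's own):

package main

import "fmt"

func main() {
	// Simulate etcd responses arriving for prefixes already sorted
	// by priority, highest first.
	responses := [][]string{
		{"/high/a", "/high/b"}, // keys under the highest-priority prefix
		{"/low/c"},             // keys under a lower-priority prefix
	}

	values := map[string]string{} // fast lookups, but unordered
	ordered := []string{}         // preserves arrival (priority) order

	for _, keys := range responses {
		for _, k := range keys {
			values[k] = "payload for " + k
			ordered = append(ordered, k)
		}
	}

	// Iterating the slice, not the map, keeps evaluation order stable.
	for _, k := range ordered {
		fmt.Println(k, "=>", values[k])
	}
}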
7 changes: 4 additions & 3 deletions rules/key_processor.go
@@ -154,7 +154,7 @@ type keyTask struct
 func (bkp *baseKeyProcessor) processKey(key string, value *string, rapi readAPI, logger *zap.Logger, dispatcher workDispatcher,
 	metadata map[string]string, timesEvaluated func(rulesID string)) {
 	logger.Debug("Processing key", zap.String("key", key))
-	rules := bkp.rm.getStaticRules(key, value)
+	rules, prioritized := bkp.rm.getStaticRules(key, value)
 	valueString := "<nil>"
 	if value != nil {
 		valueString = *value
@@ -168,7 +168,8 @@ func (bkp *baseKeyProcessor) processKey(key string, value *string, rapi readAPI,
 		logger.Error("Error getting keys to evaluate rules", zap.Error(err), zap.Int("rules", len(rules)), zap.Int("keys", len(keys)))
 		return
 	}
-	for rule, index := range rules {
+	for _, rule := range prioritized {
+		index := rules[rule]
 		ruleID := bkp.ruleIDs[index]
 		if timesEvaluated != nil {
 			timesEvaluated(ruleID)
@@ -189,7 +190,7 @@ func (bkp *baseKeyProcessor) processKey(key string, value *string, rapi readAPI,
 }
 
 func (bkp *baseKeyProcessor) isWork(key string, value *string, api readAPI) bool {
-	rules := bkp.rm.getStaticRules(key, value)
+	rules, _ := bkp.rm.getStaticRules(key, value)
 	for rule := range rules {
 		satisfied, _ := rule.satisfied(api) // #nosec G104 -- Map lookup
 		if satisfied {
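For context on why processKey now walks the prioritized slice returned by getStaticRules: Go randomizes map iteration order on every pass, so ranging over the rules map gives no stable evaluation order at all, let alone a priority-aware one. A quick self-contained demonstration of that randomization (generic types, unrelated to this library):

package main

import "fmt"

func main() {
	rules := map[string]int{"a": 0, "b": 1, "c": 2, "d": 3}

	// The two loops below may print the keys in different orders,
	// and the order also varies from run to run.
	for name := range rules {
		fmt.Print(name, " ")
	}
	fmt.Println()
	for name := range rules {
		fmt.Print(name, " ")
	}
	fmt.Println()
}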
6 changes: 4 additions & 2 deletions rules/key_processor_test.go
@@ -52,7 +52,8 @@ func TestV3KeyProcessor(t *testing.T) {
 	rule, err := NewEqualsLiteralRule("/test/:key", &value)
 	assert.NoError(t, err)
 	rm := newRuleManager(map[string]constraint{}, false)
-	rm.addRule(rule)
+	opts := makeRuleOptions()
+	rm.addRule(rule, opts)
 	api := newMapReadAPI()
 	api.put("/test/key", value)
 	callbacks := map[int]V3RuleTaskCallback{0: v3DummyCallback}
@@ -84,7 +85,8 @@ func TestNewV3KeyProcessor(t *testing.T) {
 	rule, err := NewEqualsLiteralRule("/test/:key", &value)
 	assert.NoError(t, err)
 	rm := newRuleManager(map[string]constraint{}, false)
-	rm.addRule(rule)
+	opts := makeRuleOptions()
+	rm.addRule(rule, opts)
 	api := newMapReadAPI()
 	api.put("/test/key", value)
 
25 changes: 25 additions & 0 deletions rules/options.go
@@ -339,12 +339,16 @@ type ruleOptions struct {
 	lockTimeout     int
 	contextProvider ContextProvider
 	ruleID          string
+	crawlerOnly     bool
+	priority        uint
 }
 
 func makeRuleOptions(options ...RuleOption) ruleOptions {
 	opts := ruleOptions{
 		lockTimeout: 0,
 		ruleID:      defaultRuleID,
+		crawlerOnly: false,
+		priority:    0,
 	}
 	for _, opt := range options {
 		opt.apply(&opts)
@@ -385,3 +389,24 @@ func RuleID(ruleID string) RuleOption {
 		o.ruleID = ruleID
 	})
 }
+
+// CrawlerOnly makes the rule be evaluated only
+// by the crawler; it is not assigned a watcher.
+func CrawlerOnly() RuleOption {
+	return ruleOptionFunction(func(o *ruleOptions) {
+		o.crawlerOnly = true
+	})
+}
+
+// Priority sets the priority of the keys
+// that are associated with a rule during
+// a crawler run. The higher the number, the
+// higher the priority. The default is 0, or
+// lowest priority.
+// Watcher processing will
+// still be done unless CrawlerOnly() is used.
+func Priority(priority uint) RuleOption {
+	return ruleOptionFunction(func(o *ruleOptions) {
+		o.priority = priority
+	})
+}
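The two new options follow the package's functional-option pattern: each returns a RuleOption that mutates the ruleOptions struct when applied by makeRuleOptions. A runnable miniature of that pattern showing how Priority and CrawlerOnly compose (the local types below are illustrative stand-ins, not the library's):

package main

import "fmt"

type opts struct {
	crawlerOnly bool
	priority    uint
}

type option func(*opts)

// Mirror CrawlerOnly() and Priority(n) above in miniature.
func crawlerOnly() option    { return func(o *opts) { o.crawlerOnly = true } }
func priority(p uint) option { return func(o *opts) { o.priority = p } }

func makeOpts(options ...option) opts {
	o := opts{} // zero values: crawlerOnly=false, priority=0 (lowest)
	for _, apply := range options {
		apply(&o)
	}
	return o
}

func main() {
	fmt.Printf("%+v\n", makeOpts())                            // defaults
	fmt.Printf("%+v\n", makeOpts(priority(10), crawlerOnly())) // combined
}

When calling the engine, these would be passed through the variadic options ...RuleOption parameter shown in engine.go above; the exact exported method to use depends on the engine's public API.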
102 changes: 88 additions & 14 deletions rules/rule_manager.go
@@ -1,33 +1,40 @@
 package rules
 
 import (
+	"sort"
 	"strings"
 )
 
 type ruleManager struct {
 	constraints        map[string]constraint
 	currentIndex       int
 	rulesBySlashCount  map[int]map[DynamicRule]int
-	prefixes           map[string]string
-	rules              []DynamicRule
+	prefixes           map[string]ruleMgrRuleOptions
+	rules              map[DynamicRule]uint
 	enhancedRuleFilter bool
 }
 
+type ruleMgrRuleOptions struct {
+	crawlerOnly bool
+	priority    uint
+}
+
 func newRuleManager(constraints map[string]constraint, enhancedRuleFilter bool) ruleManager {
 	rm := ruleManager{
 		rulesBySlashCount:  map[int]map[DynamicRule]int{},
-		prefixes:           map[string]string{},
+		prefixes:           map[string]ruleMgrRuleOptions{},
 		constraints:        constraints,
 		currentIndex:       0,
-		rules:              []DynamicRule{},
+		rules:              map[DynamicRule]uint{},
 		enhancedRuleFilter: enhancedRuleFilter,
 	}
 	return rm
 }
 
-func (rm *ruleManager) getStaticRules(key string, value *string) map[staticRule]int {
+func (rm *ruleManager) getStaticRules(key string, value *string) (map[staticRule]int, []staticRule) {
 	slashCount := strings.Count(key, "/")
 	out := make(map[staticRule]int)
+	toSort := make(map[staticRule]uint)
 	rules, ok := rm.rulesBySlashCount[slashCount]
 	if ok {
 		for rule, index := range rules {
@@ -37,20 +44,22 @@ func (rm *ruleManager) getStaticRules(key string, value *string) map[staticRule]
 					qSat := sRule.qSatisfiable(key, value)
 					if qSat == qTrue || qSat == qMaybe {
 						out[sRule] = index
+						toSort[sRule] = rm.rules[rule]
 					}
 				} else {
 					if sRule.satisfiable(key, value) {
 						out[sRule] = index
+						toSort[sRule] = rm.rules[rule]
 					}
 				}
 			}
 		}
 	}
-	return out
+	return out, sortRulesByPriority(toSort)
 }
 
-func (rm *ruleManager) addRule(rule DynamicRule) int {
-	rm.rules = append(rm.rules, rule)
+func (rm *ruleManager) addRule(rule DynamicRule, opts ruleOptions) int {
+	rm.rules[rule] = opts.priority
 	for _, pattern := range rule.getPatterns() {
 		slashCount := strings.Count(pattern, "/")
 		rules, ok := rm.rulesBySlashCount[slashCount]
@@ -61,35 +70,100 @@ func (rm *ruleManager) addRule(rule DynamicRule) int {
 		rules[rule] = rm.currentIndex
 	}
 	for _, prefix := range rule.getPrefixesWithConstraints(rm.constraints) {
-		rm.prefixes[prefix] = ""
+
+		_, exists := rm.prefixes[prefix]
+		// if the prefix does not exist in the map yet
+		if !exists {
+			rm.prefixes[prefix] = ruleMgrRuleOptions{crawlerOnly: opts.crawlerOnly, priority: opts.priority}
+		} else {
+			// ensure that no higher priority is overwritten
+			if rm.prefixes[prefix].priority < opts.priority {
+				rm.prefixes[prefix] = ruleMgrRuleOptions{crawlerOnly: rm.prefixes[prefix].crawlerOnly, priority: opts.priority}
+			}
+			// only update the crawlerOnly value if the new option is false
+			if !opts.crawlerOnly {
+				rm.prefixes[prefix] = ruleMgrRuleOptions{crawlerOnly: false, priority: rm.prefixes[prefix].priority}
+			}
+		}
+
 	}
 	rm.prefixes = reducePrefixes(rm.prefixes)
 	lastIndex := rm.currentIndex
 	rm.currentIndex = rm.currentIndex + 1
 	return lastIndex
 }
 
+func (rm *ruleManager) getPrioritizedPrefixes() []string {
+	out := []string{}
+	for prefix := range rm.prefixes {
+		out = append(out, prefix)
+	}
+	// sort slice by highest priority value
+	sort.SliceStable(out, func(i, j int) bool {
+		return rm.prefixes[out[i]].priority > rm.prefixes[out[j]].priority
+	})
+	return out
+}
+
+func sortRulesByPriority(rules map[staticRule]uint) []staticRule {
+	out := []staticRule{}
+	for rule := range rules {
+		out = append(out, rule)
+	}
+	// sort slice by highest priority value
+	sort.SliceStable(out, func(i, j int) bool {
+		return rules[out[i]] > rules[out[j]]
+	})
+	return out
+}
+
+func (rm *ruleManager) getWatcherPrefixes() []string {
+	out := []string{}
+	for prefix, ruleOpt := range rm.prefixes {
+		if !ruleOpt.crawlerOnly {
+			out = append(out, prefix)
+		}
+	}
+	return out
+}
+
 // Removes any path prefixes that have other path prefixes as
 // string prefixes
-func reducePrefixes(prefixes map[string]string) map[string]string {
-	out := map[string]string{}
+func reducePrefixes(prefixes map[string]ruleMgrRuleOptions) map[string]ruleMgrRuleOptions {
+	out := map[string]ruleMgrRuleOptions{}
 	sorted := sortPrefixesByLength(prefixes)
 	for _, prefix := range sorted {
 		add := true
-		for addedPrefix := range out {
+		optionsToAdd := prefixes[prefix]
+		for addedPrefix, addedOptions := range out {
 			if strings.HasPrefix(prefix, addedPrefix) {
 				add = false
+				optsToUpdate := out[addedPrefix]
+				// update the addedPrefix to be the
+				// highest priority of any
+				// overlapping prefixes
+				if addedOptions.priority < optionsToAdd.priority {
+					optsToUpdate.priority = optionsToAdd.priority
+					out[addedPrefix] = optsToUpdate
+				}
+				// if any rule associated with the prefix
+				// is not crawler only, set crawlerOnly option
+				// to be false
+				if !optionsToAdd.crawlerOnly {
+					optsToUpdate.crawlerOnly = false
+					out[addedPrefix] = optsToUpdate
+				}
 			}
 		}
 		if add {
-			out[prefix] = ""
+			out[prefix] = optionsToAdd
 		}
 	}
 	return out
 }
 
 // Sorts prefixes shortest to longest
-func sortPrefixesByLength(prefixes map[string]string) []string {
+func sortPrefixesByLength(prefixes map[string]ruleMgrRuleOptions) []string {
 	out := []string{}
 	for prefix := range prefixes {
 		out = append(out, prefix)
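To make the merge semantics above concrete: reducePrefixes drops any prefix that has a shorter watched prefix as a string prefix, and the surviving entry keeps the highest priority seen and remains watchable unless every rule that mapped onto it was crawler-only. A standalone re-statement of that merge under simplified types (illustrative, not the library code):

package main

import (
	"fmt"
	"sort"
	"strings"
)

type opts struct {
	crawlerOnly bool
	priority    uint
}

// reduce mirrors reducePrefixes above: shorter prefixes absorb longer
// ones, taking the maximum priority and clearing crawlerOnly whenever
// an absorbed entry still needs a watcher.
func reduce(prefixes map[string]opts) map[string]opts {
	sorted := make([]string, 0, len(prefixes))
	for p := range prefixes {
		sorted = append(sorted, p)
	}
	// shortest to longest, like sortPrefixesByLength
	sort.Slice(sorted, func(i, j int) bool { return len(sorted[i]) < len(sorted[j]) })

	out := map[string]opts{}
	for _, prefix := range sorted {
		add := true
		toAdd := prefixes[prefix]
		for added, addedOpts := range out {
			if strings.HasPrefix(prefix, added) {
				add = false
				merged := addedOpts
				if merged.priority < toAdd.priority {
					merged.priority = toAdd.priority
				}
				if !toAdd.crawlerOnly {
					merged.crawlerOnly = false
				}
				out[added] = merged
			}
		}
		if add {
			out[prefix] = toAdd
		}
	}
	return out
}

func main() {
	in := map[string]opts{
		"/app/":     {crawlerOnly: true, priority: 5},
		"/app/sub/": {crawlerOnly: false, priority: 9},
	}
	// "/app/sub/" folds into "/app/", which ends up with priority 9
	// and crawlerOnly=false, so it still gets a watcher.
	fmt.Printf("%+v\n", reduce(in))
}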