Skip to content

Commit afd94b9

Browse files
authored
Merge pull request #40 from scunningham/final_flush
Flush at EOF with a final Eval using a time in the future.
2 parents 5b28811 + aeec428 commit afd94b9

File tree

1 file changed

+67
-36
lines changed

1 file changed

+67
-36
lines changed

internal/pkg/engine/engine.go

Lines changed: 67 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"errors"
77
"fmt"
88
"io"
9+
"math"
910
"os"
1011
"sync"
1112
"sync/atomic"
@@ -32,7 +33,10 @@ import (
3233

3334
type LogData = resolve.LogData
3435

35-
const ramLimit = 512 << 20 // 512 MiB
36+
const (
37+
ramLimit = 512 << 20 // 512 MiB
38+
futureMark = int64(math.MaxInt64) - 1 // Avoid future issues with MaxInt64 used as a flag
39+
)
3640

3741
var (
3842
ErrRuleNotFound = errors.New("rule not found")
@@ -615,14 +619,15 @@ func (r *RuntimeT) _run(ctx context.Context, wg *sync.WaitGroup, sources []*LogD
615619

616620
func (r *RuntimeT) _runSrc(ctx context.Context, wg *sync.WaitGroup, ld *LogData, matchers *RuleMatchersT, stop int64, lines *atomic.Int64) error {
617621

618-
type pairT struct {
622+
type trioT struct {
619623
matcher matchCB
624+
flusher flushCB
620625
compilerCb compiler.CallbackT
621626
}
622627

623628
var (
624629
srcType = ld.SrcType()
625-
cbs = make([]pairT, 0, len(matchers.eventSrc))
630+
cbs = make([]trioT, 0, len(matchers.eventSrc))
626631
)
627632

628633
for ruleId, pe := range matchers.eventSrc {
@@ -637,14 +642,19 @@ func (r *RuntimeT) _runSrc(ctx context.Context, wg *sync.WaitGroup, ld *LogData,
637642
Str("ruleId", ruleId).
638643
Msg("Matching source")
639644

640-
cb, err := _makeMatchCb(srcType, matchers.match[ruleId])
641-
if err != nil {
642-
log.Error().Err(err).Msg("Failed to make stdin callback")
643-
return err
645+
matcher := matchers.match[ruleId]
646+
647+
lm, ok := matcher.(lm.Matcher)
648+
if !ok {
649+
return errors.New("invalid matcher")
644650
}
645651

646-
cbs = append(cbs, pairT{
652+
cb := _bindMatchCb(srcType, lm)
653+
fb := _bindFlushCB(srcType, lm)
654+
655+
cbs = append(cbs, trioT{
647656
matcher: cb,
657+
flusher: fb,
648658
compilerCb: matchers.cb[ruleId],
649659
})
650660
}
@@ -673,22 +683,40 @@ func (r *RuntimeT) _runSrc(ctx context.Context, wg *sync.WaitGroup, ld *LogData,
673683
// Use an atomic instead of calling tracker directly to decrease overhead.
674684
lines.Add(1)
675685

676-
for _, pair := range cbs {
677-
if msgHits := pair.matcher(entry); msgHits != nil {
686+
for _, trio := range cbs {
687+
if msgHits := trio.matcher(entry); msgHits != nil {
678688
log.Info().
679689
Interface("hits", msgHits).
680690
Msg("Hits")
681-
pair.compilerCb(ctx, *msgHits)
691+
trio.compilerCb(ctx, *msgHits)
682692
}
683693
}
684694

685695
return false
686696
}
687697

698+
finalFlush := func() {
699+
for _, trio := range cbs {
700+
if msgHits := trio.flusher(); msgHits != nil {
701+
log.Info().
702+
Interface("hits", msgHits).
703+
Msg("Hits on final flush")
704+
trio.compilerCb(ctx, *msgHits)
705+
}
706+
}
707+
}
708+
688709
wg.Add(1)
689710
go func() {
690711
defer wg.Done()
712+
713+
// Spin across the logs
691714
_spinLogs(ld, scanCb, stop, tracker)
715+
716+
// Finally flush out any pending negative matches
717+
finalFlush()
718+
719+
// Close the tracker
692720
tracker.MarkAsDone()
693721
}()
694722

@@ -754,41 +782,44 @@ func _spinLogs(ld *LogData, scanF scanner.ScanFuncT, stop int64, tracker *progre
754782
}
755783

756784
type matchCB func(entry entry.LogEntry) *matchz.HitsT
785+
type flushCB func() *matchz.HitsT
757786

758-
func _makeMatchCb(src string, matcher any) (matchCB, error) {
759-
760-
mm, ok := matcher.(lm.Matcher)
761-
if !ok {
762-
return nil, errors.New("invalid matcher")
787+
func makeHitZ(src string, hits lm.Hits) *matchz.HitsT {
788+
if hits.Cnt == 0 {
789+
return nil
763790
}
764791

765-
scanCb := func(entry entry.LogEntry) *matchz.HitsT {
792+
log.Trace().Any("hits", hits).Msg("Hits")
766793

767-
hits := mm.Scan(entry)
768-
if hits.Cnt == 0 {
769-
return nil
770-
}
771-
772-
log.Trace().Any("hits", hits).Msg("Hits")
794+
msgHits := matchz.HitsT{
795+
Entries: make([]matchz.EntryT, 0, len(hits.Logs)),
796+
}
773797

774-
msgHits := matchz.HitsT{
775-
Entries: make([]matchz.EntryT, 0, len(hits.Logs)),
776-
}
798+
for _, line := range hits.Logs {
799+
msgHits.Entries = append(msgHits.Entries, matchz.EntryT{
800+
Timestamp: line.Timestamp,
801+
Entry: []byte(line.Line),
802+
})
803+
}
777804

778-
for _, line := range hits.Logs {
779-
msgHits.Entries = append(msgHits.Entries, matchz.EntryT{
780-
Timestamp: line.Timestamp,
781-
Entry: []byte(line.Line),
782-
})
783-
}
805+
msgHits.Count = uint32(hits.Cnt)
806+
msgHits.Entity.FileName = src
784807

785-
msgHits.Count = uint32(hits.Cnt)
786-
msgHits.Entity.FileName = src
808+
return &msgHits
809+
}
787810

788-
return &msgHits
811+
func _bindMatchCb(src string, mm lm.Matcher) matchCB {
812+
return func(entry entry.LogEntry) *matchz.HitsT {
813+
hits := mm.Scan(entry)
814+
return makeHitZ(src, hits)
789815
}
816+
}
790817

791-
return scanCb, nil
818+
func _bindFlushCB(src string, mm lm.Matcher) flushCB {
819+
return func() *matchz.HitsT {
820+
hits := mm.Eval(futureMark)
821+
return makeHitZ(src, hits)
822+
}
792823
}
793824

794825
type TrkRdr struct {

0 commit comments

Comments
 (0)