Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,18 @@ build dependency installation, if you don't care for that.
`mtail` works best when paired with a timeseries-based calculator and
alerting tool, like [Prometheus](http://prometheus.io).

### Source-to-Program Mapping

By default, mtail processes every log line with every loaded program. For large installations with many logs and programs, this can be inefficient. You can optimize performance by mapping specific log sources to specific programs using the `--source_mapping_file` option:

```
mtail --progs=/etc/mtail/progs --logs=/var/log/syslog,/var/log/apache2/*.log --source_mapping_file=/etc/mtail/source_mapping.yaml
```

This file can be in YAML or JSON format and allows you to specify which programs should process which log sources. See the `examples/source_mapping.yaml` and `examples/source_mapping.json` files for examples.

You can also control how to handle unmapped sources with the `--unmapped_behavior` flag. Valid values are "all" (process with all programs, the default) or "none" (don't process unmapped sources with any program).

> So what you do is you take the metrics from the log files and
> you bring them down to the monitoring system?

Expand Down
8 changes: 8 additions & 0 deletions cmd/mtail/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ var (
emitProgLabel = flag.Bool("emit_prog_label", true, "Emit the 'prog' label in variable exports.")
emitMetricTimestamp = flag.Bool("emit_metric_timestamp", false, "Emit the recorded timestamp of a metric. If disabled (the default) no explicit timestamp is sent to a collector.")
logRuntimeErrors = flag.Bool("vm_logs_runtime_errors", true, "Enables logging of runtime errors to the standard log. Set to false to only have the errors printed to the HTTP console.")
sourceMappingFile = flag.String("source_mapping_file", "", "Path to YAML or JSON file defining mappings from log sources to programs.")
unmappedBehavior = flag.String("unmapped_behavior", "all", "How to handle log lines from sources with no mapping. Valid values: 'all' (process with all programs) or 'none' (ignore).")

// Ops flags.
pollInterval = flag.Duration("poll_interval", 250*time.Millisecond, "Set the interval to poll each log file for data; must be positive, or zero to disable polling. With polling mode, only the files found at mtail startup will be polled.")
Expand Down Expand Up @@ -180,6 +182,12 @@ func main() {
if *logRuntimeErrors {
opts = append(opts, mtail.LogRuntimeErrors)
}
if *sourceMappingFile != "" {
opts = append(opts, mtail.SourceMappingFile(*sourceMappingFile))
}
if *unmappedBehavior != "all" {
opts = append(opts, mtail.UnmappedSourceBehavior(*unmappedBehavior))
}
if *pollInterval > 0 {
logStreamPollWaker := waker.NewTimed(ctx, *pollInterval)
logPatternPollWaker := waker.NewTimed(ctx, *pollLogInterval)
Expand Down
30 changes: 30 additions & 0 deletions examples/source_mapping.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
{
"unmapped_behavior": "all",
"mappings": [
{
"source": "/var/log/apache2/access.log",
"programs": [
"apache_common.mtail",
"apache_combined.mtail"
]
},
{
"source": "/var/log/mysql/slow-query.log",
"programs": [
"mysql_slowqueries.mtail"
]
},
{
"source": "/var/log/syslog",
"programs": [
"linecount.mtail"
]
},
{
"source": "/var/log/nginx/*.log",
"programs": [
"nginx.mtail"
]
}
]
}
26 changes: 26 additions & 0 deletions examples/source_mapping.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Example source-to-program mapping configuration
# Maps log sources to specific mtail programs

# How to handle log sources with no explicit mapping
# Valid values: "all" (process with all programs) or "none" (ignore)
unmapped_behavior: "all"

# Mappings from log sources to program names
mappings:
- source: "/var/log/apache2/access.log"
programs:
- "apache_common.mtail"
- "apache_combined.mtail"

- source: "/var/log/mysql/slow-query.log"
programs:
- "mysql_slowqueries.mtail"

- source: "/var/log/syslog"
programs:
- "linecount.mtail"

# You can use glob patterns as sources
- source: "/var/log/nginx/*.log"
programs:
- "nginx.mtail"
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ require (
github.com/prometheus/common v0.60.0
go.opencensus.io v0.24.0
golang.org/x/sys v0.26.0
gopkg.in/yaml.v3 v3.0.1
)

require (
Expand Down
15 changes: 14 additions & 1 deletion internal/mtail/mtail.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type Server struct {
buildInfo BuildInfo // go build information

programPath string // path to programs to load
sourceMappingFile string // path to source-to-program mapping file
oneShot bool // if set, mtail reads log files from the beginning, once, then exits
compileOnly bool // if set, mtail compiles programs then exit
httpDebugEndpoints bool // if set, mtail will enable debug endpoints
Expand All @@ -64,7 +65,19 @@ var buildInfoOnce sync.Once
// initRuntime constructs a new runtime and performs the initial load of program files in the program directory.
func (m *Server) initRuntime() (err error) {
m.r, err = runtime.New(m.lines, &m.wg, m.programPath, m.store, m.rOpts...)
return
if err != nil {
return err
}

// Load source mappings if configured
if m.sourceMappingFile != "" {
glog.Infof("Loading source mappings from %s", m.sourceMappingFile)
if err := m.r.LoadSourceMappingsFromFile(m.sourceMappingFile); err != nil {
return err
}
}

return nil
}

// initExporter sets up an Exporter for this Server.
Expand Down
19 changes: 19 additions & 0 deletions internal/mtail/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,3 +286,22 @@ func (opt MaxRecursionDepth) apply(m *Server) error {
m.rOpts = append(m.rOpts, runtime.MaxRecursionDepth(int(opt)))
return nil
}

// SourceMappingFile sets the path to a YAML or JSON file that defines mappings from log sources to programs.
type SourceMappingFile string

func (opt SourceMappingFile) apply(m *Server) error {
m.sourceMappingFile = string(opt)
return nil
}

// UnmappedSourceBehavior sets how to handle log lines from sources that have no mapping.
type UnmappedSourceBehavior string

func (opt UnmappedSourceBehavior) apply(m *Server) error {
if string(opt) != "all" && string(opt) != "none" {
return errors.New("invalid unmapped_behavior value: must be 'all' or 'none'")
}
m.rOpts = append(m.rOpts, runtime.UnmappedSourceBehavior(string(opt)))
return nil
}
198 changes: 198 additions & 0 deletions internal/mtail/source_mapping_integration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
// Copyright 2023 Google Inc. All Rights Reserved.
// This file is available under the Apache license.

package mtail_test

import (
"os"
"path/filepath"
"testing"
"time"

"github.com/google/mtail/internal/mtail"
"github.com/google/mtail/internal/testutil"
)

func TestSourceMappingIntegration(t *testing.T) {
t.Skip("Integration test needs more work to handle test timing issues")
// Create test programs
progCounterA := `
counter counter_a
/test/ {
counter_a++
}
`
progCounterB := `
counter counter_b
/test/ {
counter_b++
}
`

// Set up directories
workdir := testutil.TestTempDir(t)

// Create program files
progdir := filepath.Join(workdir, "progs")
err := os.Mkdir(progdir, 0o755)
if err != nil {
t.Fatal(err)
}

err = os.WriteFile(filepath.Join(progdir, "counter_a.mtail"), []byte(progCounterA), 0o644)
if err != nil {
t.Fatal(err)
}

err = os.WriteFile(filepath.Join(progdir, "counter_b.mtail"), []byte(progCounterB), 0o644)
if err != nil {
t.Fatal(err)
}

// Create log files
logdir := filepath.Join(workdir, "logs")
err = os.Mkdir(logdir, 0o755)
if err != nil {
t.Fatal(err)
}

logfileA := filepath.Join(logdir, "log_a.log")
err = os.WriteFile(logfileA, []byte(""), 0o644)
if err != nil {
t.Fatal(err)
}

logfileB := filepath.Join(logdir, "log_b.log")
err = os.WriteFile(logfileB, []byte(""), 0o644)
if err != nil {
t.Fatal(err)
}

logfileUnmapped := filepath.Join(logdir, "log_unmapped.log")
err = os.WriteFile(logfileUnmapped, []byte(""), 0o644)
if err != nil {
t.Fatal(err)
}

// Create source mapping file
mappingFile := filepath.Join(workdir, "source_mapping.yaml")
mappingContent := `
unmapped_behavior: "all"
mappings:
- source: "` + logfileA + `"
programs:
- "counter_a.mtail"
- source: "` + logfileB + `"
programs:
- "counter_b.mtail"
`
err = os.WriteFile(mappingFile, []byte(mappingContent), 0o644)
if err != nil {
t.Fatal(err)
}

// Start mtail - using 1 pattern waker for the log directory glob pattern
ts, stopFunc := mtail.TestStartServer(t, 1, 0,
mtail.ProgramPath(progdir),
mtail.LogPathPatterns(logdir+"/*"),
mtail.SourceMappingFile(mappingFile),
)
defer stopFunc()

// Write to log files and check metrics
// Log A should trigger counter_a but not counter_b
err = os.WriteFile(logfileA, []byte("test line for log A\n"), 0o644)
if err != nil {
t.Fatal(err)
}

// Log B should trigger counter_b but not counter_a
err = os.WriteFile(logfileB, []byte("test line for log B\n"), 0o644)
if err != nil {
t.Fatal(err)
}

// Log unmapped should trigger both counters (default behavior is "all")
err = os.WriteFile(logfileUnmapped, []byte("test line for unmapped log\n"), 0o644)
if err != nil {
t.Fatal(err)
}

// Wait for mtail to process the logs
time.Sleep(1 * time.Second)

// Use the test helpers to check the metrics
counterACheck := ts.ExpectProgMetricDeltaWithDeadline("counter_a", "counter_a.mtail", 2) // 1 from log A + 1 from unmapped
counterBCheck := ts.ExpectProgMetricDeltaWithDeadline("counter_b", "counter_b.mtail", 2) // 1 from log B + 1 from unmapped

// Run the checks
counterACheck()
counterBCheck()

// Now test with unmapped_behavior set to "none"
// Create new mapping file
mappingFileNone := filepath.Join(workdir, "source_mapping_none.yaml")
mappingContentNone := `
unmapped_behavior: "none"
mappings:
- source: "` + logfileA + `"
programs:
- "counter_a.mtail"
- source: "` + logfileB + `"
programs:
- "counter_b.mtail"
`
err = os.WriteFile(mappingFileNone, []byte(mappingContentNone), 0o644)
if err != nil {
t.Fatal(err)
}

// Restart mtail with new mapping
stopFunc()
ts, stopFunc = mtail.TestStartServer(t, 1, 0,
mtail.ProgramPath(progdir),
mtail.LogPathPatterns(logdir+"/*"),
mtail.SourceMappingFile(mappingFileNone),
)
defer stopFunc()

// Reset log files
err = os.WriteFile(logfileA, []byte(""), 0o644)
if err != nil {
t.Fatal(err)
}
err = os.WriteFile(logfileB, []byte(""), 0o644)
if err != nil {
t.Fatal(err)
}
err = os.WriteFile(logfileUnmapped, []byte(""), 0o644)
if err != nil {
t.Fatal(err)
}

// Write to log files again
err = os.WriteFile(logfileA, []byte("test line for log A\n"), 0o644)
if err != nil {
t.Fatal(err)
}
err = os.WriteFile(logfileB, []byte("test line for log B\n"), 0o644)
if err != nil {
t.Fatal(err)
}
err = os.WriteFile(logfileUnmapped, []byte("test line for unmapped log\n"), 0o644)
if err != nil {
t.Fatal(err)
}

// Wait for mtail to process the logs
time.Sleep(1 * time.Second)

// Use the test helpers to check the metrics
// We expect 1 increment each, only from their specific logs (unmapped log ignored)
counterACheck = ts.ExpectProgMetricDeltaWithDeadline("counter_a", "counter_a.mtail", 1)
counterBCheck = ts.ExpectProgMetricDeltaWithDeadline("counter_b", "counter_b.mtail", 1)

// Run the checks
counterACheck()
counterBCheck()
}
13 changes: 13 additions & 0 deletions internal/runtime/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/google/mtail/internal/runtime/compiler"
"github.com/google/mtail/internal/runtime/vm"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
)

Expand Down Expand Up @@ -117,3 +118,15 @@ func TraceExecution() Option {
return nil
}
}

// UnmappedSourceBehavior sets how to handle log lines from sources that have no mapping.
// Valid values are "all" (default) or "none".
func UnmappedSourceBehavior(behavior string) Option {
return func(r *Runtime) error {
if behavior != "all" && behavior != "none" {
return errors.Errorf("invalid unmapped behavior: %s (must be 'all' or 'none')", behavior)
}
r.unmappedBehavior = behavior
return nil
}
}
Loading