Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions examples/agent.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ input:
- "/var/log/suricata/eve.json"
- "/var/log/suricata/eve.*.json"

# Suricata can write the EVE log to a unix stream socket by specifying
# the eve-log filetype attribute as 'unix_stream' instead of 'regular'.
# EveBox will create the sockets below and listen for events once connected.
#sockets:
# - "/var/log/suricata/eve.log.socket"

# Additional fields that will be added to each event. This is currently limited
# to strings at this time.
additional-fields:
Expand Down
6 changes: 6 additions & 0 deletions examples/evebox.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,12 @@ input:
# - /usr/share/suricata/rules/*.rules
# - /etc/suricata/rules/*.rules

# Suricata can write the EVE log to a unix stream socket by specifying
# the eve-log filetype attribute as 'unix_stream' instead of 'regular'.
# EveBox will create the sockets below and listen for events once connected.
#sockets:
# - "/var/log/suricata/eve.log.socket"

geoip:
disabled: false
# Path to the MaxMind database. This must be the version 2 database
Expand Down
2 changes: 1 addition & 1 deletion src/bookmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ impl Bookmark {

#[cfg(not(unix))]
fn check_inode(&self, _meta: &std::fs::Metadata) -> bool {
return true;
true
}
}

Expand Down
51 changes: 49 additions & 2 deletions src/cli/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,8 @@ pub async fn main(args_matches: &clap::ArgMatches) -> anyhow::Result<()> {

// Collect eve filenames.
let eve_filenames = get_eve_filenames(&config)?;
if eve_filenames.is_empty() {
let eve_sockets = get_eve_sockets(&config)?;
if eve_filenames.is_empty() && eve_sockets.is_empty() {
bail!("No EVE log files provided. Exiting as there is nothing to do.");
}

Expand Down Expand Up @@ -224,6 +225,17 @@ pub async fn main(args_matches: &clap::ArgMatches) -> anyhow::Result<()> {
}
}
}
#[cfg(unix)]
for path in &eve_sockets {
if !log_runners.contains_key(path) {
info!("Starting EVE stream socket reader {}", path);
log_runners.insert(path.clone(), true);
match start_socket_runner(path, importer.clone(), filters.clone()) {
Ok(runner) => tasks.push(runner),
Err(err) => warn!("Could not create socket file {}: {}", path, err),
}
}
}
tokio::select! {
_ = tokio::time::sleep(std::time::Duration::from_secs(60)) => {}
_ = tasks.select_next_some() => {
Expand All @@ -240,7 +252,7 @@ fn start_runner(
mut filters: EveFilterChain,
) -> JoinHandle<()> {
let mut end = false;
let reader = crate::eve::reader::EveReader::new(filename.into());
let reader = crate::eve::reader::EveReaderFile::new(filename.into());
let bookmark_filename = get_bookmark_filename(filename, bookmark_directory);
if let Some(bookmark_filename) = &bookmark_filename {
info!("Using bookmark file: {:?}", bookmark_filename);
Expand All @@ -263,6 +275,26 @@ fn start_runner(
})
}

#[cfg(unix)]
fn start_socket_runner(
filename: &str,
importer: EventSink,
mut filters: EveFilterChain,
) -> Result<JoinHandle<()>, eve::EveReaderError> {
let reader = crate::eve::reader::EveReaderSocket::new(filename.into())?;
let mut processor = crate::eve::Processor::new(reader, importer);

filters.add_filter(eve::filters::AddAgentFilenameFilter::new(
filename.to_string(),
));

processor.filter_chain = Some(filters);
processor.report_interval = std::time::Duration::from_secs(60);
Ok(tokio::spawn(async move {
processor.run().await;
}))
}

fn find_config_filename() -> Option<&'static str> {
let paths = ["./agent.yaml", "/etc/evebox/agent.yaml"];
for path in paths {
Expand Down Expand Up @@ -322,6 +354,21 @@ fn get_eve_filenames(config: &Config) -> anyhow::Result<Vec<String>> {
Ok(eve_filenames)
}

fn get_eve_sockets(config: &Config) -> anyhow::Result<Vec<String>> {
Copy link

Copilot AI May 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] Logic for reading sockets is duplicated from server; consider extracting a shared utility function to avoid code duplication.

Copilot uses AI. Check for mistakes.
let mut eve_sockets: Vec<String> = vec![];

match config.get_value::<Vec<String>>("input.sockets") {
Ok(Some(filenames)) => {
eve_sockets.extend(filenames);
}
Ok(None) => {}
Err(_) => {
bail!("There was an error reading 'input.sockets' from the configuration file");
}
}
Ok(eve_sockets)
}

fn get_rule_filenames(config: &Config) -> anyhow::Result<Vec<String>> {
match config.get_value::<Vec<String>>("rules") {
Ok(Some(filenames)) => Ok(filenames),
Expand Down
3 changes: 2 additions & 1 deletion src/cli/oneshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::prelude::*;

use crate::config::Config;
use crate::eve;
use crate::eve::EveReader;
Copy link

Copilot AI May 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The EveReader trait import is unused in this file; consider removing it.

Suggested change
use crate::eve::EveReader;

Copilot uses AI. Check for mistakes.
use crate::geoip;
use crate::server::main::build_axum_service;
use crate::server::metrics::Metrics;
Expand Down Expand Up @@ -171,7 +172,7 @@ async fn run_import(
) -> anyhow::Result<()> {
let geoipdb = geoip::GeoIP::open(None).ok();
let mut indexer = sqlite::importer::SqliteEventSink::new(sqlx, writer, metrics);
let mut reader = eve::reader::EveReader::new(input.into());
let mut reader = eve::reader::EveReaderFile::new(input.into());
info!("Reading {} ({} bytes)", input, reader.file_size());
let mut last_percent = 0;
let mut count = 0;
Expand Down
4 changes: 4 additions & 0 deletions src/eve/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,7 @@ pub(crate) mod watcher;
pub(crate) use eve::Eve;
pub(crate) use processor::Processor;
pub(crate) use reader::EveReader;
pub(crate) use reader::EveReaderError;
pub(crate) use reader::EveReaderFile;
#[cfg(unix)]
pub(crate) use reader::EveReaderSocket;
22 changes: 13 additions & 9 deletions src/eve/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ use super::filters::EveFilterChain;

const DEFAULT_BATCH_SIZE: usize = 100;

pub(crate) struct Processor {
pub reader: EveReader,
pub(crate) struct Processor<R: EveReader> {
pub reader: R,
pub importer: EventSink,
pub filter_chain: Option<EveFilterChain>,
pub bookmark_filename: Option<PathBuf>,
Expand All @@ -33,8 +33,8 @@ pub(crate) struct Processor {
pub oneshot: bool,
}

impl Processor {
pub fn new(reader: EveReader, importer: EventSink) -> Self {
impl<R: EveReader> Processor<R> {
pub fn new(reader: R, importer: EventSink) -> Self {
Self {
reader,
importer,
Expand Down Expand Up @@ -96,7 +96,7 @@ impl Processor {
if self.report_interval > Duration::from_secs(0)
&& last_report.elapsed() > self.report_interval
{
debug!(filename = ?self.reader.filename, "count={}, commits={}, eofs={}", count, commits, eofs);
debug!(filename = ?self.reader.get_filename(), "count={}, commits={}, eofs={}", count, commits, eofs);
count = 0;
commits = 0;
eofs = 0;
Expand All @@ -106,7 +106,7 @@ impl Processor {
Err(err) => {
error!(
"Failed to read event from {}: {}",
self.reader.filename.display(),
self.reader.get_filename().display(),
err
);
self.sleep_for(1000).await;
Expand All @@ -119,10 +119,14 @@ impl Processor {
} else if !self.oneshot && self.reader.is_file_changed() {
info!(
"File may have been rotated, will reopen: filename={:?}",
self.reader.filename
self.reader.get_filename()
);
if let Err(err) = self.reader.reopen() {
error!("Failed to reopen {:?}, error={}", self.reader.filename, err);
error!(
"Failed to reopen {:?}, error={}",
self.reader.get_filename(),
err
);
}
}

Expand Down Expand Up @@ -152,7 +156,7 @@ impl Processor {
// give up some CPU to other tasks.
self.sleep_for(0).await;
}
info!(filename = ?self.reader.filename, "count={}, commits={}, eofs={}", count, commits, eofs);
info!(filename = ?self.reader.get_filename(), "count={}, commits={}, eofs={}", count, commits, eofs);
}

async fn sleep_for(&self, millis: u64) {
Expand Down
Loading