diff --git a/examples/agent.yaml b/examples/agent.yaml index b5703473..5b33ad9b 100644 --- a/examples/agent.yaml +++ b/examples/agent.yaml @@ -40,6 +40,12 @@ input: - "/var/log/suricata/eve.json" - "/var/log/suricata/eve.*.json" + # Suricata can write the EVE log to a unix stream socket by specifying + # the eve-log filetype attribute as 'unix_stream' instead of 'regular'. + # EveBox will create the sockets below and listen for events once connected. + #sockets: + # - "/var/log/suricata/eve.log.socket" + # Additional fields that will be added to each event. This is currently limited # to strings at this time. additional-fields: diff --git a/examples/evebox.yaml b/examples/evebox.yaml index 0e2a5bd1..578c8119 100644 --- a/examples/evebox.yaml +++ b/examples/evebox.yaml @@ -100,6 +100,12 @@ input: # - /usr/share/suricata/rules/*.rules # - /etc/suricata/rules/*.rules + # Suricata can write the EVE log to a unix stream socket by specifying + # the eve-log filetype attribute as 'unix_stream' instead of 'regular'. + # EveBox will create the sockets below and listen for events once connected. + #sockets: + # - "/var/log/suricata/eve.log.socket" + geoip: disabled: false # Path to the MaxMind database. This must be the version 2 database diff --git a/src/bookmark.rs b/src/bookmark.rs index 094aba52..0f099807 100644 --- a/src/bookmark.rs +++ b/src/bookmark.rs @@ -73,7 +73,7 @@ impl Bookmark { #[cfg(not(unix))] fn check_inode(&self, _meta: &std::fs::Metadata) -> bool { - return true; + true } } diff --git a/src/cli/agent.rs b/src/cli/agent.rs index 33fa20de..6176afa2 100644 --- a/src/cli/agent.rs +++ b/src/cli/agent.rs @@ -119,7 +119,8 @@ pub async fn main(args_matches: &clap::ArgMatches) -> anyhow::Result<()> { // Collect eve filenames. let eve_filenames = get_eve_filenames(&config)?; - if eve_filenames.is_empty() { + let eve_sockets = get_eve_sockets(&config)?; + if eve_filenames.is_empty() && eve_sockets.is_empty() { bail!("No EVE log files provided. Exiting as there is nothing to do."); } @@ -224,6 +225,17 @@ pub async fn main(args_matches: &clap::ArgMatches) -> anyhow::Result<()> { } } } + #[cfg(unix)] + for path in &eve_sockets { + if !log_runners.contains_key(path) { + info!("Starting EVE stream socket reader {}", path); + log_runners.insert(path.clone(), true); + match start_socket_runner(path, importer.clone(), filters.clone()) { + Ok(runner) => tasks.push(runner), + Err(err) => warn!("Could not create socket file {}: {}", path, err), + } + } + } tokio::select! { _ = tokio::time::sleep(std::time::Duration::from_secs(60)) => {} _ = tasks.select_next_some() => { @@ -240,7 +252,7 @@ fn start_runner( mut filters: EveFilterChain, ) -> JoinHandle<()> { let mut end = false; - let reader = crate::eve::reader::EveReader::new(filename.into()); + let reader = crate::eve::reader::EveReaderFile::new(filename.into()); let bookmark_filename = get_bookmark_filename(filename, bookmark_directory); if let Some(bookmark_filename) = &bookmark_filename { info!("Using bookmark file: {:?}", bookmark_filename); @@ -263,6 +275,26 @@ fn start_runner( }) } +#[cfg(unix)] +fn start_socket_runner( + filename: &str, + importer: EventSink, + mut filters: EveFilterChain, +) -> Result, eve::EveReaderError> { + let reader = crate::eve::reader::EveReaderSocket::new(filename.into())?; + let mut processor = crate::eve::Processor::new(reader, importer); + + filters.add_filter(eve::filters::AddAgentFilenameFilter::new( + filename.to_string(), + )); + + processor.filter_chain = Some(filters); + processor.report_interval = std::time::Duration::from_secs(60); + Ok(tokio::spawn(async move { + processor.run().await; + })) +} + fn find_config_filename() -> Option<&'static str> { let paths = ["./agent.yaml", "/etc/evebox/agent.yaml"]; for path in paths { @@ -322,6 +354,21 @@ fn get_eve_filenames(config: &Config) -> anyhow::Result> { Ok(eve_filenames) } +fn get_eve_sockets(config: &Config) -> anyhow::Result> { + let mut eve_sockets: Vec = vec![]; + + match config.get_value::>("input.sockets") { + Ok(Some(filenames)) => { + eve_sockets.extend(filenames); + } + Ok(None) => {} + Err(_) => { + bail!("There was an error reading 'input.sockets' from the configuration file"); + } + } + Ok(eve_sockets) +} + fn get_rule_filenames(config: &Config) -> anyhow::Result> { match config.get_value::>("rules") { Ok(Some(filenames)) => Ok(filenames), diff --git a/src/cli/oneshot.rs b/src/cli/oneshot.rs index 6dcdecb2..f29d4cfd 100644 --- a/src/cli/oneshot.rs +++ b/src/cli/oneshot.rs @@ -5,6 +5,7 @@ use crate::prelude::*; use crate::config::Config; use crate::eve; +use crate::eve::EveReader; use crate::geoip; use crate::server::main::build_axum_service; use crate::server::metrics::Metrics; @@ -171,7 +172,7 @@ async fn run_import( ) -> anyhow::Result<()> { let geoipdb = geoip::GeoIP::open(None).ok(); let mut indexer = sqlite::importer::SqliteEventSink::new(sqlx, writer, metrics); - let mut reader = eve::reader::EveReader::new(input.into()); + let mut reader = eve::reader::EveReaderFile::new(input.into()); info!("Reading {} ({} bytes)", input, reader.file_size()); let mut last_percent = 0; let mut count = 0; diff --git a/src/eve/mod.rs b/src/eve/mod.rs index 06722521..b8bf3b9b 100644 --- a/src/eve/mod.rs +++ b/src/eve/mod.rs @@ -11,3 +11,7 @@ pub(crate) mod watcher; pub(crate) use eve::Eve; pub(crate) use processor::Processor; pub(crate) use reader::EveReader; +pub(crate) use reader::EveReaderError; +pub(crate) use reader::EveReaderFile; +#[cfg(unix)] +pub(crate) use reader::EveReaderSocket; diff --git a/src/eve/processor.rs b/src/eve/processor.rs index c12be40d..02bfb15f 100644 --- a/src/eve/processor.rs +++ b/src/eve/processor.rs @@ -18,8 +18,8 @@ use super::filters::EveFilterChain; const DEFAULT_BATCH_SIZE: usize = 100; -pub(crate) struct Processor { - pub reader: EveReader, +pub(crate) struct Processor { + pub reader: R, pub importer: EventSink, pub filter_chain: Option, pub bookmark_filename: Option, @@ -33,8 +33,8 @@ pub(crate) struct Processor { pub oneshot: bool, } -impl Processor { - pub fn new(reader: EveReader, importer: EventSink) -> Self { +impl Processor { + pub fn new(reader: R, importer: EventSink) -> Self { Self { reader, importer, @@ -96,7 +96,7 @@ impl Processor { if self.report_interval > Duration::from_secs(0) && last_report.elapsed() > self.report_interval { - debug!(filename = ?self.reader.filename, "count={}, commits={}, eofs={}", count, commits, eofs); + debug!(filename = ?self.reader.get_filename(), "count={}, commits={}, eofs={}", count, commits, eofs); count = 0; commits = 0; eofs = 0; @@ -106,7 +106,7 @@ impl Processor { Err(err) => { error!( "Failed to read event from {}: {}", - self.reader.filename.display(), + self.reader.get_filename().display(), err ); self.sleep_for(1000).await; @@ -119,10 +119,14 @@ impl Processor { } else if !self.oneshot && self.reader.is_file_changed() { info!( "File may have been rotated, will reopen: filename={:?}", - self.reader.filename + self.reader.get_filename() ); if let Err(err) = self.reader.reopen() { - error!("Failed to reopen {:?}, error={}", self.reader.filename, err); + error!( + "Failed to reopen {:?}, error={}", + self.reader.get_filename(), + err + ); } } @@ -152,7 +156,7 @@ impl Processor { // give up some CPU to other tasks. self.sleep_for(0).await; } - info!(filename = ?self.reader.filename, "count={}, commits={}, eofs={}", count, commits, eofs); + info!(filename = ?self.reader.get_filename(), "count={}, commits={}, eofs={}", count, commits, eofs); } async fn sleep_for(&self, millis: u64) { diff --git a/src/eve/reader.rs b/src/eve/reader.rs index f2041a4d..6f28e58f 100644 --- a/src/eve/reader.rs +++ b/src/eve/reader.rs @@ -7,7 +7,11 @@ use std::io::BufReader; use std::io::Seek; use std::io::SeekFrom; #[cfg(unix)] +use std::os::unix::fs::FileTypeExt; +#[cfg(unix)] use std::os::unix::fs::MetadataExt; +#[cfg(unix)] +use std::os::unix::net::{UnixListener, UnixStream}; use std::path::PathBuf; use tracing::debug; @@ -29,7 +33,20 @@ impl From for EveReaderError { } } -pub(crate) struct EveReader { +pub trait EveReader { + fn get_filename(&self) -> PathBuf; + fn open(&mut self) -> Result<(), EveReaderError>; + fn reopen(&mut self) -> Result<(), EveReaderError>; + fn goto_lineno(&mut self, lineno: u64) -> Result; + fn goto_end(&mut self) -> Result; + fn offset(&mut self) -> u64; + fn next_record(&mut self) -> Result, EveReaderError>; + fn metadata(&self) -> Option; + fn is_file_changed(&self) -> bool; + fn file_size(&self) -> u64; +} + +pub(crate) struct EveReaderFile { pub filename: PathBuf, line: String, reader: Option>, @@ -37,18 +54,12 @@ pub(crate) struct EveReader { offset: u64, } -impl EveReader { - pub fn new(filename: PathBuf) -> Self { - Self { - filename, - line: String::new(), - reader: None, - lineno: 0, - offset: 0, - } +impl EveReader for EveReaderFile { + fn get_filename(&self) -> PathBuf { + self.filename.clone() } - pub fn open(&mut self) -> Result<(), EveReaderError> { + fn open(&mut self) -> Result<(), EveReaderError> { let file = File::open(&self.filename)?; let reader = BufReader::new(file); self.reader = Some(reader); @@ -57,7 +68,7 @@ impl EveReader { Ok(()) } - pub fn reopen(&mut self) -> Result<(), EveReaderError> { + fn reopen(&mut self) -> Result<(), EveReaderError> { if let Err(err) = self.open() { self.reader = None; self.lineno = 0; @@ -67,7 +78,7 @@ impl EveReader { Ok(()) } - pub fn goto_lineno(&mut self, lineno: u64) -> Result { + fn goto_lineno(&mut self, lineno: u64) -> Result { if self.reader.is_none() { self.open()?; } @@ -81,7 +92,7 @@ impl EveReader { Ok(count) } - pub fn goto_end(&mut self) -> Result { + fn goto_end(&mut self) -> Result { if self.reader.is_none() { self.open()?; } @@ -98,7 +109,7 @@ impl EveReader { /// Return the current offset the reader is into the file. /// /// Will return 0 if no file is open. - pub fn offset(&mut self) -> u64 { + fn offset(&mut self) -> u64 { if let Some(reader) = &mut self.reader { if let Ok(pos) = reader.stream_position() { return pos; @@ -107,31 +118,8 @@ impl EveReader { 0 } - fn next_line(&mut self) -> Result, EveReaderError> { - self.line.truncate(0); - if let Some(reader) = &mut self.reader { - let pos = reader.stream_position()?; - let n = reader.read_line(&mut self.line)?; - if n > 0 { - if !self.line.ends_with('\n') { - info!( - "Line does not end with new line character, seeking back to {}", - pos - ); - reader.seek(SeekFrom::Start(pos))?; - } else { - self.offset = pos + n as u64; - self.lineno += 1; - let line = self.line.trim(); - return Ok(Some(line)); - } - } - } - Ok(None) - } - /// Not named next as we don't implement the iterator pattern (yet). - pub fn next_record(&mut self) -> Result, EveReaderError> { + fn next_record(&mut self) -> Result, EveReaderError> { if self.reader.is_none() { self.open()?; } @@ -150,7 +138,7 @@ impl EveReader { Ok(None) } - pub fn metadata(&self) -> Option { + fn metadata(&self) -> Option { if let Some(reader) = &self.reader { match reader.get_ref().metadata() { Err(err) => { @@ -173,7 +161,7 @@ impl EveReader { // An overly complex method to check if the file on disk has been truncate, // or replaced. - pub fn is_file_changed(&self) -> bool { + fn is_file_changed(&self) -> bool { let open: Option = if let Some(reader) = &self.reader { match reader.get_ref().metadata() { Err(err) => { @@ -234,13 +222,48 @@ impl EveReader { /// Get the size of the file. This is taken directly from disk, so may not be the /// exact file currently being read by this reader. - pub fn file_size(&self) -> u64 { + fn file_size(&self) -> u64 { if let Ok(metadata) = std::fs::metadata(&self.filename) { metadata.len() } else { 0 } } +} + +impl EveReaderFile { + pub fn new(filename: PathBuf) -> Self { + Self { + filename, + line: String::new(), + reader: None, + lineno: 0, + offset: 0, + } + } + + fn next_line(&mut self) -> Result, EveReaderError> { + self.line.truncate(0); + if let Some(reader) = &mut self.reader { + let pos = reader.stream_position()?; + let n = reader.read_line(&mut self.line)?; + if n > 0 { + if !self.line.ends_with('\n') { + info!( + "Line does not end with new line character, seeking back to {}", + pos + ); + reader.seek(SeekFrom::Start(pos))?; + } else { + self.offset = pos + n as u64; + self.lineno += 1; + let line = self.line.trim(); + return Ok(Some(line)); + } + } + } + Ok(None) + } #[cfg(unix)] fn inode(&self, m: &std::fs::Metadata) -> Option { @@ -260,3 +283,136 @@ pub(crate) struct Metadata { pub size: u64, pub inode: Option, } + +#[cfg(unix)] +pub(crate) struct EveReaderSocket { + pub filename: PathBuf, + listener: UnixListener, + reader: Option>, +} + +#[cfg(unix)] +impl EveReaderSocket { + pub fn new(filename: PathBuf) -> Result { + let listener = Self::bind_listener(&filename)?; + Ok(Self { + filename, + listener, + reader: None, + }) + } + + // The UnixListener lives through the lifetime of EveReaderSocket, + // only needs to be bound once and can accept a connection from Suricata multiple times. + fn bind_listener(filename: &PathBuf) -> Result { + // Remove socket file before use + if let Ok(metadata) = std::fs::metadata(filename) { + if !metadata.file_type().is_socket() { + error!("Refusing to delete non-socket file"); + } else { + std::fs::remove_file(filename)?; + } + } + + let listener = UnixListener::bind(filename.clone())?; + if let Err(e) = listener.set_nonblocking(true) { + debug!( + "UnixListener for {} could not be set to non-blocking: {e}", + filename.display() + ); + } + Ok(listener) + } + + // A UnixListener can accept multiple simultaneous connections, but + // we only call accept() when there is not an established self.reader + fn accept(&mut self) -> Result<(), EveReaderError> { + let (socket, addr) = self.listener.accept()?; + debug!( + "Accepted connection on {} to: {addr:?}", + self.filename.display() + ); + if let Err(e) = socket.set_nonblocking(true) { + debug!( + "UnixStream for {} could not be set to non-blocking: {e}", + self.filename.display() + ); + } + self.reader = Some(BufReader::new(socket)); + Ok(()) + } +} + +#[cfg(unix)] +impl EveReader for EveReaderSocket { + fn get_filename(&self) -> PathBuf { + self.filename.clone() + } + + fn open(&mut self) -> Result<(), EveReaderError> { + self.accept() + } + + fn reopen(&mut self) -> Result<(), EveReaderError> { + // NOTE: this function is not used + self.reader = None; + self.open() + } + + // EveReaderSocket cannot seek + fn goto_lineno(&mut self, _lineno: u64) -> Result { + Ok(0) + } + + fn goto_end(&mut self) -> Result { + Ok(0) + } + + fn offset(&mut self) -> u64 { + 0 + } + + fn next_record(&mut self) -> Result, EveReaderError> { + match &mut self.reader { + Some(reader) => { + let mut line = String::new(); + let len = match reader.read_line(&mut line) { + Ok(len) => len, + Err(_) => { + // Assuming io error: Resource temporarily unavailable + return Ok(None); + } + }; + + if len == 0 { + // The stream is EOF + self.reader = None; + debug!("Socket stream EOF on {}", self.filename.display()); + return Ok(None); + } + + let record: serde_json::Value = + serde_json::from_str(line.as_str()).map_err(|err| { + error!("Failed to parse event: {}", err); + EveReaderError::ParseError(line.to_string()) + })?; + Ok(Some(record)) + } + None => { + // Stream has not been established + let _ = self.accept(); + Ok(None) + } + } + } + + fn metadata(&self) -> Option { + None + } + fn is_file_changed(&self) -> bool { + false + } + fn file_size(&self) -> u64 { + 0 + } +} diff --git a/src/eve/watcher.rs b/src/eve/watcher.rs index d2e26824..dfb13a40 100644 --- a/src/eve/watcher.rs +++ b/src/eve/watcher.rs @@ -4,7 +4,9 @@ use crate::prelude::*; use super::filters::EveFilterChain; -use super::{EveReader, Processor}; +#[cfg(unix)] +use super::EveReaderSocket; +use super::{EveReaderFile, Processor}; use crate::eve::filters::AddAgentFilenameFilter; use crate::importer::EventSink; use std::time::Duration; @@ -14,6 +16,8 @@ use std::{collections::HashSet, path::PathBuf}; /// pipeline when a new file is found. pub(crate) struct EvePatternWatcher { patterns: Vec, + #[cfg(unix)] + sockets: Vec, filenames: HashSet, sink: EventSink, filters: EveFilterChain, @@ -25,6 +29,7 @@ pub(crate) struct EvePatternWatcher { impl EvePatternWatcher { pub fn new( patterns: Vec, + #[cfg(unix)] sockets: Vec, sink: EventSink, filters: EveFilterChain, end: bool, @@ -33,6 +38,8 @@ impl EvePatternWatcher { ) -> Self { Self { patterns, + #[cfg(unix)] + sockets, filenames: HashSet::new(), sink, filters, @@ -66,10 +73,20 @@ impl EvePatternWatcher { } } } + #[cfg(unix)] + for socket in &self.sockets { + let path = PathBuf::from(socket); + if !self.filenames.contains(&path) { + info!("Starting EVE stream socket {}", path.display()); + if self.start_socket(path.clone()) { + self.filenames.insert(path); + } + } + } } fn start_file(&self, filename: &PathBuf) { - let reader = EveReader::new(filename.clone()); + let reader = EveReaderFile::new(filename.clone()); let mut processor = Processor::new(reader, self.sink.clone()); let mut filters = self.filters.clone(); filters.add_filter(AddAgentFilenameFilter::new(filename.display().to_string())); @@ -98,6 +115,32 @@ impl EvePatternWatcher { }); } + #[cfg(unix)] + fn start_socket(&self, filename: PathBuf) -> bool { + let reader = match EveReaderSocket::new(filename.clone()) { + Ok(socket) => socket, + Err(err) => { + warn!( + "Could not create socket file {}: {}", + filename.display(), + err + ); + return false; + } + }; + let mut processor = Processor::new(reader, self.sink.clone()); + let mut filters = self.filters.clone(); + filters.add_filter(AddAgentFilenameFilter::new(filename.display().to_string())); + + processor.filter_chain = Some(filters); + processor.report_interval = Duration::from_secs(60); + info!("Starting EVE processor for {}", filename.display()); + tokio::spawn(async move { + processor.run().await; + }); + true + } + pub fn run(mut self) { tokio::spawn(async move { loop { diff --git a/src/server/main.rs b/src/server/main.rs index 0f9fff5e..89689716 100644 --- a/src/server/main.rs +++ b/src/server/main.rs @@ -181,7 +181,8 @@ pub async fn main(args: &clap::ArgMatches) -> Result<()> { if is_input_enabled(&config) { let input_patterns = get_input_patterns(&config)?; - if input_patterns.is_empty() { + let input_sockets = get_input_sockets(&config)?; + if input_patterns.is_empty() && input_sockets.is_empty() { bail!("EVE input enabled, but no paths provided"); } let sink = context.datastore.get_importer().ok_or(anyhow!( @@ -232,6 +233,8 @@ pub async fn main(args: &clap::ArgMatches) -> Result<()> { let data_directory = server_config.data_directory.clone(); let watcher = EvePatternWatcher::new( input_patterns, + #[cfg(unix)] + input_sockets, sink, filters, end, @@ -332,6 +335,21 @@ fn get_input_patterns(config: &Config) -> Result> { Ok(input_patterns) } +fn get_input_sockets(config: &Config) -> Result> { + let mut eve_sockets: Vec = vec![]; + + match config.get_value::>("input.sockets") { + Ok(Some(filenames)) => { + eve_sockets.extend(filenames); + } + Ok(None) => {} + Err(_) => { + bail!("There was an error reading 'input.sockets' from the configuration file"); + } + } + Ok(eve_sockets) +} + pub(crate) fn build_axum_service( context: Arc, ) -> IntoMakeServiceWithConnectInfo {