Skip to content

Commit e25bb0f

Browse files
committed
server: Add support for reading eve-log through unix socket file
clippy clean on unix and windows cargo fmt changes applied
1 parent 84a922a commit e25bb0f

File tree

10 files changed

+343
-58
lines changed

10 files changed

+343
-58
lines changed

examples/agent.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,12 @@ input:
4040
- "/var/log/suricata/eve.json"
4141
- "/var/log/suricata/eve.*.json"
4242

43+
# Suricata can write the EVE log to a unix stream socket by specifying
44+
# the eve-log filetype attribute as 'unix_stream' instead of 'regular'.
45+
# EveBox will create the sockets below and listen for events once connected.
46+
#sockets:
47+
# - "/var/log/suricata/eve.log.socket"
48+
4349
# Additional fields that will be added to each event. This is currently limited
4450
# to strings at this time.
4551
additional-fields:

examples/evebox.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,12 @@ input:
100100
# - /usr/share/suricata/rules/*.rules
101101
# - /etc/suricata/rules/*.rules
102102

103+
# Suricata can write the EVE log to a unix stream socket by specifying
104+
# the eve-log filetype attribute as 'unix_stream' instead of 'regular'.
105+
# EveBox will create the sockets below and listen for events once connected.
106+
#sockets:
107+
# - "/var/log/suricata/eve.log.socket"
108+
103109
geoip:
104110
disabled: false
105111
# Path to the MaxMind database. This must be the version 2 database

src/bookmark.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ impl Bookmark {
7373

7474
#[cfg(not(unix))]
7575
fn check_inode(&self, _meta: &std::fs::Metadata) -> bool {
76-
return true;
76+
true
7777
}
7878
}
7979

src/cli/agent.rs

Lines changed: 49 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,8 @@ pub async fn main(args_matches: &clap::ArgMatches) -> anyhow::Result<()> {
119119

120120
// Collect eve filenames.
121121
let eve_filenames = get_eve_filenames(&config)?;
122-
if eve_filenames.is_empty() {
122+
let eve_sockets = get_eve_sockets(&config)?;
123+
if eve_filenames.is_empty() && eve_sockets.is_empty() {
123124
bail!("No EVE log files provided. Exiting as there is nothing to do.");
124125
}
125126

@@ -224,6 +225,17 @@ pub async fn main(args_matches: &clap::ArgMatches) -> anyhow::Result<()> {
224225
}
225226
}
226227
}
228+
#[cfg(unix)]
229+
for path in &eve_sockets {
230+
if !log_runners.contains_key(path) {
231+
info!("Starting EVE stream socket reader {}", path);
232+
log_runners.insert(path.clone(), true);
233+
match start_socket_runner(path, importer.clone(), filters.clone()) {
234+
Ok(runner) => tasks.push(runner),
235+
Err(err) => warn!("Could not create socket file {}: {}", path, err),
236+
}
237+
}
238+
}
227239
tokio::select! {
228240
_ = tokio::time::sleep(std::time::Duration::from_secs(60)) => {}
229241
_ = tasks.select_next_some() => {
@@ -240,7 +252,7 @@ fn start_runner(
240252
mut filters: EveFilterChain,
241253
) -> JoinHandle<()> {
242254
let mut end = false;
243-
let reader = crate::eve::reader::EveReader::new(filename.into());
255+
let reader = crate::eve::reader::EveReaderFile::new(filename.into());
244256
let bookmark_filename = get_bookmark_filename(filename, bookmark_directory);
245257
if let Some(bookmark_filename) = &bookmark_filename {
246258
info!("Using bookmark file: {:?}", bookmark_filename);
@@ -263,6 +275,26 @@ fn start_runner(
263275
})
264276
}
265277

278+
#[cfg(unix)]
279+
fn start_socket_runner(
280+
filename: &str,
281+
importer: EventSink,
282+
mut filters: EveFilterChain,
283+
) -> Result<JoinHandle<()>, eve::EveReaderError> {
284+
let reader = crate::eve::reader::EveReaderSocket::new(filename.into())?;
285+
let mut processor = crate::eve::Processor::new(reader, importer);
286+
287+
filters.add_filter(eve::filters::AddAgentFilenameFilter::new(
288+
filename.to_string(),
289+
));
290+
291+
processor.filter_chain = Some(filters);
292+
processor.report_interval = std::time::Duration::from_secs(60);
293+
Ok(tokio::spawn(async move {
294+
processor.run().await;
295+
}))
296+
}
297+
266298
fn find_config_filename() -> Option<&'static str> {
267299
let paths = ["./agent.yaml", "/etc/evebox/agent.yaml"];
268300
for path in paths {
@@ -322,6 +354,21 @@ fn get_eve_filenames(config: &Config) -> anyhow::Result<Vec<String>> {
322354
Ok(eve_filenames)
323355
}
324356

357+
fn get_eve_sockets(config: &Config) -> anyhow::Result<Vec<String>> {
358+
let mut eve_sockets: Vec<String> = vec![];
359+
360+
match config.get_value::<Vec<String>>("input.sockets") {
361+
Ok(Some(filenames)) => {
362+
eve_sockets.extend(filenames);
363+
}
364+
Ok(None) => {}
365+
Err(_) => {
366+
bail!("There was an error reading 'input.sockets' from the configuration file");
367+
}
368+
}
369+
Ok(eve_sockets)
370+
}
371+
325372
fn get_rule_filenames(config: &Config) -> anyhow::Result<Vec<String>> {
326373
match config.get_value::<Vec<String>>("rules") {
327374
Ok(Some(filenames)) => Ok(filenames),

src/cli/oneshot.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use crate::prelude::*;
55

66
use crate::config::Config;
77
use crate::eve;
8+
use crate::eve::EveReader;
89
use crate::geoip;
910
use crate::server::main::build_axum_service;
1011
use crate::server::metrics::Metrics;
@@ -171,7 +172,7 @@ async fn run_import(
171172
) -> anyhow::Result<()> {
172173
let geoipdb = geoip::GeoIP::open(None).ok();
173174
let mut indexer = sqlite::importer::SqliteEventSink::new(sqlx, writer, metrics);
174-
let mut reader = eve::reader::EveReader::new(input.into());
175+
let mut reader = eve::reader::EveReaderFile::new(input.into());
175176
info!("Reading {} ({} bytes)", input, reader.file_size());
176177
let mut last_percent = 0;
177178
let mut count = 0;

src/eve/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,3 +11,7 @@ pub(crate) mod watcher;
1111
pub(crate) use eve::Eve;
1212
pub(crate) use processor::Processor;
1313
pub(crate) use reader::EveReader;
14+
pub(crate) use reader::EveReaderError;
15+
pub(crate) use reader::EveReaderFile;
16+
#[cfg(unix)]
17+
pub(crate) use reader::EveReaderSocket;

src/eve/processor.rs

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@ use super::filters::EveFilterChain;
1818

1919
const DEFAULT_BATCH_SIZE: usize = 100;
2020

21-
pub(crate) struct Processor {
22-
pub reader: EveReader,
21+
pub(crate) struct Processor<R: EveReader> {
22+
pub reader: R,
2323
pub importer: EventSink,
2424
pub filter_chain: Option<EveFilterChain>,
2525
pub bookmark_filename: Option<PathBuf>,
@@ -33,8 +33,8 @@ pub(crate) struct Processor {
3333
pub oneshot: bool,
3434
}
3535

36-
impl Processor {
37-
pub fn new(reader: EveReader, importer: EventSink) -> Self {
36+
impl<R: EveReader> Processor<R> {
37+
pub fn new(reader: R, importer: EventSink) -> Self {
3838
Self {
3939
reader,
4040
importer,
@@ -96,7 +96,7 @@ impl Processor {
9696
if self.report_interval > Duration::from_secs(0)
9797
&& last_report.elapsed() > self.report_interval
9898
{
99-
debug!(filename = ?self.reader.filename, "count={}, commits={}, eofs={}", count, commits, eofs);
99+
debug!(filename = ?self.reader.get_filename(), "count={}, commits={}, eofs={}", count, commits, eofs);
100100
count = 0;
101101
commits = 0;
102102
eofs = 0;
@@ -106,7 +106,7 @@ impl Processor {
106106
Err(err) => {
107107
error!(
108108
"Failed to read event from {}: {}",
109-
self.reader.filename.display(),
109+
self.reader.get_filename().display(),
110110
err
111111
);
112112
self.sleep_for(1000).await;
@@ -119,10 +119,14 @@ impl Processor {
119119
} else if !self.oneshot && self.reader.is_file_changed() {
120120
info!(
121121
"File may have been rotated, will reopen: filename={:?}",
122-
self.reader.filename
122+
self.reader.get_filename()
123123
);
124124
if let Err(err) = self.reader.reopen() {
125-
error!("Failed to reopen {:?}, error={}", self.reader.filename, err);
125+
error!(
126+
"Failed to reopen {:?}, error={}",
127+
self.reader.get_filename(),
128+
err
129+
);
126130
}
127131
}
128132

@@ -152,7 +156,7 @@ impl Processor {
152156
// give up some CPU to other tasks.
153157
self.sleep_for(0).await;
154158
}
155-
info!(filename = ?self.reader.filename, "count={}, commits={}, eofs={}", count, commits, eofs);
159+
info!(filename = ?self.reader.get_filename(), "count={}, commits={}, eofs={}", count, commits, eofs);
156160
}
157161

158162
async fn sleep_for(&self, millis: u64) {

0 commit comments

Comments
 (0)