diff --git a/src/lib.rs b/src/lib.rs index 9593b12..93df6a6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,126 +1,21 @@ use core_affinity::CoreId; -use serde::Deserialize; -use std::fmt::Display; - -pub mod worker; - -/// Main workload configuration, contains general bits for all types of -/// workloads plus workload specific data. -#[derive(Debug, Copy, Clone, Deserialize)] -pub struct WorkloadConfig { - /// An amount of time for workload payload to run before restarting. - pub restart_interval: u64, - - /// Controls per-core mode to handle number of workers. If per-core mode - /// is enabled, `workers` will be treated as a number of workers per CPU - /// core. Otherwise it will be treated as a total number of workers. - #[serde(default = "default_per_core")] - pub per_core: bool, - - /// How many workers to spin, depending on `per_core` in either per-core - /// or total mode. - #[serde(default = "default_workers")] - pub workers: usize, - - /// Custom workload configuration. - pub workload: Workload, - - /// For how long to run the worker. Default value is zero, meaning no limit. - #[serde(default = "default_duration")] - pub duration: u64, -} - -fn default_workers() -> usize { - 1 -} - -fn default_per_core() -> bool { - true -} - -fn default_duration() -> u64 { - 0 -} - -/// Workload specific configuration, contains one enum value for each -/// workload type. -#[derive(Debug, Copy, Clone, Deserialize)] -#[serde(rename_all = "lowercase", tag = "type")] -pub enum Workload { - /// How to listen on ports. - Endpoints { - /// Governing the number of ports open. - #[serde(flatten)] - distribution: Distribution, +use fork::{fork, Fork}; +use itertools::iproduct; +use log::{info, warn}; +use nix::{ + sys::{ + signal::{kill, Signal}, + wait::waitpid, }, + unistd::Pid, +}; +use std::{fmt::Display, sync::Arc, thread}; +use workload::WorkloadConfig; - /// How to spawn processes. - Processes { - /// How often a new process will be spawn. 
- arrival_rate: f64, +use crate::worker::Worker; - /// How long processes are going to live. - departure_rate: f64, - - /// Spawn a new process with random arguments. - random_process: bool, - }, - - /// How to invoke syscalls - Syscalls { - /// How often to invoke a syscall. - arrival_rate: f64, - }, - - /// How to open network connections - Network { - /// Whether the instance functions as a server or client - server: bool, - - /// Which ip address to use for the server to listen on, - /// or for the client to connect to - address: (u8, u8, u8, u8), - - /// Port for the server to listen on, or for the client - /// to connect to. - target_port: u16, - - /// Rate of opening new connections - arrival_rate: f64, - - /// Rate of closing connections - departure_rate: f64, - - /// Starting number of connections - nconnections: u32, - - /// How often send data via new connections, in milliseconds. - /// The interval is applied for all connections, e.g. an interval - /// of 100 ms for 100 connections means that every 100 ms one out - /// of 100 connections will be allowed to send some data. - /// This parameter allows to control the overhead of sending data, - /// so that it will not impact connections monitoring. - #[serde(default = "default_network_send_interval")] - send_interval: u128, - }, -} - -fn default_network_send_interval() -> u128 { - 100 -} - -/// Distribution for number of ports to listen on -#[derive(Debug, Copy, Clone, Deserialize)] -#[serde(tag = "distribution")] -pub enum Distribution { - /// Few processes are opening large number of ports, the rest are only few. - #[serde(alias = "zipf")] - Zipfian { n_ports: u64, exponent: f64 }, - - /// Every process opens more or less the same number of ports. 
- #[serde(alias = "uniform")] - Uniform { lower: u64, upper: u64 }, -} +pub mod worker; +pub mod workload; #[derive(Debug)] pub enum WorkerError { @@ -133,15 +28,10 @@ impl Display for WorkerError { } } -/// Generic interface for workers of any type -pub trait Worker { - fn run_payload(&self) -> Result<(), WorkerError>; -} - /// General information for each worker, on which CPU is it running /// and what is the process number. #[derive(Debug, Copy, Clone)] -struct BaseConfig { +pub struct BaseConfig { cpu: CoreId, process: usize, } @@ -152,153 +42,89 @@ impl Display for BaseConfig { } } -#[cfg(test)] -mod tests { - use super::*; - use config::{Config, File, FileFormat}; - - #[test] - fn test_processes() { - let input = r#" - restart_interval = 10 - - [workload] - type = "processes" - arrival_rate = 10.0 - departure_rate = 200.0 - random_process = true - "#; +pub fn run(workload: WorkloadConfig) { + let duration_timer = std::time::SystemTime::now(); + let mut start_port = 1024; + let mut total_ports = 0; + + let core_ids: Vec = if workload.per_core { + // Retrieve the IDs of all active CPU cores. 
+ core_affinity::get_core_ids().unwrap() + } else { + vec![CoreId { id: 0 }] + }; + + let handles: Vec<_> = iproduct!(core_ids.into_iter(), 0..workload.workers) + .map(|(cpu, process)| { + let config = BaseConfig { cpu, process }; + let worker = Worker::new(workload.clone(), config, start_port); + + if let Worker::Endpoint(w) = &worker { + start_port += w.size(); + total_ports += w.size(); + } - let config = Config::builder() - .add_source(File::from_str(input, FileFormat::Toml)) - .build() - .expect("failed to parse configuration") - .try_deserialize::() - .expect("failed to deserialize into WorkloadConfig"); + match fork() { + Ok(Fork::Parent(child)) => { + info!("Child {}", child); + Some(child) + } + Ok(Fork::Child) => { + if workload.per_core { + core_affinity::set_for_current(cpu); + } + + loop { + worker.run_payload().unwrap(); + } + } + Err(e) => { + warn!("Failed: {e:?}"); + None + } + } + }) + .collect(); - let WorkloadConfig { - restart_interval, - workload, - .. - } = config; - assert_eq!(restart_interval, 10); - if let Workload::Processes { - arrival_rate, - departure_rate, - random_process, - } = workload - { - assert_eq!(arrival_rate, 10.0); - assert_eq!(departure_rate, 200.0); - assert!(random_process); - } else { - panic!("wrong workload type found"); - } + if total_ports != 0 { + info!("In total: {total_ports}"); } - #[test] - fn test_endpoints_zipf() { - let input = r#" - restart_interval = 10 - - [workload] - type = "endpoints" - distribution = "zipf" - n_ports = 200 - exponent = 1.4 - "#; - - let config = Config::builder() - .add_source(File::from_str(input, FileFormat::Toml)) - .build() - .expect("failed to parse configuration") - .try_deserialize::() - .expect("failed to deserialize into WorkloadConfig"); - - let WorkloadConfig { - restart_interval, - workload, - .. - } = config; - assert_eq!(restart_interval, 10); - - if let Workload::Endpoints { distribution, .. 
} = workload { - if let Distribution::Zipfian { n_ports, exponent } = distribution { - assert_eq!(n_ports, 200); - assert_eq!(exponent, 1.4); - } else { - panic!("wrong distribution type found"); - } - } else { - panic!("wrong workload type found"); + let handles = Arc::new(handles); + + thread::scope(|s| { + if workload.duration != 0 { + // Cloning the Arc so we can hand it over to the watcher thread + let handles = handles.clone(); + + // Spin a watcher thread + s.spawn(move || loop { + thread::sleep(std::time::Duration::from_secs(1)); + let elapsed = duration_timer.elapsed().unwrap().as_secs(); + + if elapsed > workload.duration { + for handle in handles.iter().flatten() { + info!("Terminating: {}", *handle); + match kill(Pid::from_raw(*handle), Signal::SIGTERM) { + Ok(()) => { + continue; + } + Err(_) => { + continue; + } + } + } + + break; + } + }); } - } - - #[test] - fn test_endpoints_uniform() { - let input = r#" - restart_interval = 10 - - [workload] - type = "endpoints" - distribution = "uniform" - upper = 100 - lower = 1 - "#; - let config = Config::builder() - .add_source(File::from_str(input, FileFormat::Toml)) - .build() - .expect("failed to parse configuration") - .try_deserialize::() - .expect("failed to deserialize into WorkloadConfig"); - - let WorkloadConfig { - restart_interval, - workload, - .. 
- } = config; - assert_eq!(restart_interval, 10); - - if let Workload::Endpoints { distribution } = workload { - if let Distribution::Uniform { lower, upper } = distribution { - assert_eq!(lower, 1); - assert_eq!(upper, 100); - } else { - panic!("wrong distribution type found"); + s.spawn(move || { + for handle in handles.iter().flatten() { + info!("waitpid: {}", *handle); + waitpid(Pid::from_raw(*handle), None).unwrap(); } - } else { - panic!("wrong workload type found"); - } - } - - #[test] - fn test_syscalls() { - let input = r#" - restart_interval = 10 - - [workload] - type = "syscalls" - arrival_rate = 10.0 - "#; - - let config = Config::builder() - .add_source(File::from_str(input, FileFormat::Toml)) - .build() - .expect("failed to parse configuration") - .try_deserialize::() - .expect("failed to deserialize into WorkloadConfig"); - - let WorkloadConfig { - restart_interval, - workload, - .. - } = config; - assert_eq!(restart_interval, 10); - if let Workload::Syscalls { arrival_rate } = workload { - assert_eq!(arrival_rate, 10.0); - } else { - panic!("wrong workload type found"); - } - } + }); + }); } diff --git a/src/main.rs b/src/main.rs index 8873b96..5588a70 100644 --- a/src/main.rs +++ b/src/main.rs @@ -18,22 +18,13 @@ extern crate log; extern crate core_affinity; use config::Config; -use core_affinity::CoreId; -use fork::{fork, Fork}; -use itertools::iproduct; -use nix::sys::signal::{kill, Signal}; -use nix::sys::wait::waitpid; -use nix::unistd::Pid; -use std::time::SystemTime; -use std::{env, thread, time}; -use berserker::{worker::new_worker, WorkloadConfig}; +use berserker::workload::WorkloadConfig; fn main() { - let args: Vec = env::args().collect(); + let args: Vec = std::env::args().collect(); let default_config = String::from("workload.toml"); let config_path = &args.get(1).unwrap_or(&default_config); - let duration_timer = SystemTime::now(); let config = Config::builder() // Add in `./Settings.toml` @@ -54,81 +45,9 @@ fn main() { 
.try_deserialize::() .unwrap(); - let mut lower = 1024; - let mut upper = 1024; - env_logger::init(); info!("Config: {:?}", config); - let core_ids: Vec = if config.per_core { - // Retrieve the IDs of all active CPU cores. - core_affinity::get_core_ids().unwrap() - } else { - vec![CoreId { id: 0 }] - }; - - let handles: Vec<_> = iproduct!(core_ids.into_iter(), 0..config.workers) - .map(|(cpu, process)| { - let worker = - new_worker(config, cpu, process, &mut lower, &mut upper); - - match fork() { - Ok(Fork::Parent(child)) => { - info!("Child {}", child); - Some(child) - } - Ok(Fork::Child) => { - if config.per_core { - core_affinity::set_for_current(cpu); - } - - loop { - worker.run_payload().unwrap(); - } - } - Err(e) => { - warn!("Failed: {e:?}"); - None - } - } - }) - .collect(); - - info!("In total: {}", upper); - - let processes = &handles.clone(); - - thread::scope(|s| { - if config.duration != 0 { - // Spin a watcher thread - s.spawn(move || loop { - thread::sleep(time::Duration::from_secs(1)); - let elapsed = duration_timer.elapsed().unwrap().as_secs(); - - if elapsed > config.duration { - for handle in processes.into_iter().flatten() { - info!("Terminating: {}", *handle); - match kill(Pid::from_raw(*handle), Signal::SIGTERM) { - Ok(()) => { - continue; - } - Err(e) => { - continue; - } - } - } - - break; - } - }); - } - - s.spawn(move || { - for handle in processes.into_iter().flatten() { - info!("waitpid: {}", *handle); - waitpid(Pid::from_raw(*handle), None).unwrap(); - } - }); - }); + berserker::run(config); } diff --git a/src/worker/endpoints.rs b/src/worker/endpoints.rs index 35008cf..b286b47 100644 --- a/src/worker/endpoints.rs +++ b/src/worker/endpoints.rs @@ -1,59 +1,55 @@ -use std::{fmt::Display, net::TcpListener, thread, time}; +use std::{fmt::Display, net::TcpListener, ops::Range, thread, time}; -use core_affinity::CoreId; use log::info; -use crate::{BaseConfig, Worker, WorkerError, WorkloadConfig}; +use crate::{BaseConfig, WorkerError}; -struct 
/// A contiguous block of TCP ports, described by its first port and the
/// number of ports requested.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct PortRange {
    /// First port of the block.
    start: u16,
    /// Number of ports requested for the block.
    length: u16,
}

impl PortRange {
    /// Creates a block of `length` ports beginning at `start`.
    fn new(start: u16, length: u16) -> Self {
        PortRange { start, length }
    }

    /// Returns the ports of this block as a half-open range.
    ///
    /// The end is clamped to `u16::MAX`: a block that would run past the top
    /// of the port space is shortened instead of overflowing, which would
    /// panic in debug builds and wrap around to low port numbers in release
    /// builds.
    fn get_range(&self) -> Range<u16> {
        let end = self.start.saturating_add(self.length);
        self.start..end
    }
}
+ let restart_interval = self.restart_interval; + let listeners: Vec<_> = self + .ports + .get_range() .map(|port| thread::spawn(move || listen(port, restart_interval))) .collect(); @@ -63,6 +59,10 @@ impl Worker for EndpointWorker { Ok(()) } + + pub fn size(&self) -> u16 { + self.ports.length + } } impl Display for EndpointWorker { @@ -71,7 +71,7 @@ impl Display for EndpointWorker { } } -fn listen(port: usize, sleep: u64) -> std::io::Result<()> { +fn listen(port: u16, sleep: u64) -> std::io::Result<()> { let addr = format!("0.0.0.0:{port}"); let listener = TcpListener::bind(addr)?; diff --git a/src/worker/mod.rs b/src/worker/mod.rs index 642b84a..11a47de 100644 --- a/src/worker/mod.rs +++ b/src/worker/mod.rs @@ -1,8 +1,13 @@ -use core_affinity::CoreId; use rand::{thread_rng, Rng}; use rand_distr::{Uniform, Zipf}; -use crate::{Distribution, Worker, Workload, WorkloadConfig}; +use crate::{ + workload::{ + endpoints::{Distribution, Endpoints}, + Workload, + }, + BaseConfig, WorkerError, WorkloadConfig, +}; use self::{ endpoints::EndpointWorker, network::NetworkWorker, @@ -14,48 +19,58 @@ pub mod network; pub mod processes; pub mod syscalls; -pub fn new_worker( - workload: WorkloadConfig, - cpu: CoreId, - process: usize, - lower_bound: &mut usize, - upper_bound: &mut usize, -) -> Box { - match workload.workload { - Workload::Processes { .. 
} => { - Box::new(ProcessesWorker::new(workload, cpu, process)) +pub enum Worker { + Endpoint(EndpointWorker), + Process(ProcessesWorker), + Syscalls(SyscallsWorker), + Network(NetworkWorker), +} + +impl Worker { + pub fn run_payload(&self) -> Result<(), WorkerError> { + match self { + Worker::Endpoint(e) => e.run_payload(), + Worker::Process(p) => p.run_payload(), + Worker::Syscalls(s) => s.run_payload(), + Worker::Network(n) => n.run_payload(), } - Workload::Endpoints { distribution } => { - match distribution { - Distribution::Zipfian { n_ports, exponent } => { - let n_ports: f64 = thread_rng() - .sample(Zipf::new(n_ports, exponent).unwrap()); + } - *lower_bound = *upper_bound; - *upper_bound += n_ports as usize; - } - Distribution::Uniform { lower, upper } => { - // TODO: Double check this branch - let n_ports = - thread_rng().sample(Uniform::new(lower, upper)); + pub fn new( + workload: WorkloadConfig, + base_config: BaseConfig, + start_port: u16, + ) -> Worker { + match workload.workload { + Workload::Processes(processes) => { + Worker::Process(ProcessesWorker::new(processes, base_config)) + } + Workload::Endpoints(Endpoints { + restart_interval, + distribution, + }) => { + let n_ports: u16 = match distribution { + Distribution::Zipfian { n_ports, exponent } => thread_rng() + .sample(Zipf::new(n_ports, exponent).unwrap()) + as u16, + Distribution::Uniform { lower, upper } => { + thread_rng().sample(Uniform::new(lower, upper)) as u16 + } + }; - *lower_bound = *upper_bound; - *upper_bound += n_ports as usize; - } + Worker::Endpoint(EndpointWorker::new( + base_config, + restart_interval, + start_port, + n_ports, + )) + } + Workload::Syscalls(syscalls) => { + Worker::Syscalls(SyscallsWorker::new(syscalls, base_config)) + } + Workload::Network(network) => { + Worker::Network(NetworkWorker::new(network, base_config)) } - Box::new(EndpointWorker::new( - workload, - cpu, - process, - *lower_bound, - *upper_bound, - )) - } - Workload::Syscalls { .. 
} => { - Box::new(SyscallsWorker::new(workload, cpu, process)) - } - Workload::Network { .. } => { - Box::new(NetworkWorker::new(workload, cpu, process)) } } } diff --git a/src/worker/network.rs b/src/worker/network.rs index a7fa2fd..f044d98 100644 --- a/src/worker/network.rs +++ b/src/worker/network.rs @@ -1,8 +1,4 @@ -use core_affinity::CoreId; use log::{debug, info, trace}; -use rand::{thread_rng, Rng}; -use rand_distr::Exp; -use std::collections::HashMap; use std::os::unix::io::AsRawFd; use std::str; use std::time::{SystemTime, UNIX_EPOCH}; @@ -13,7 +9,8 @@ use std::{ thread, }; -use crate::{BaseConfig, Worker, WorkerError, Workload, WorkloadConfig}; +use crate::workload::network::Network; +use crate::{BaseConfig, WorkerError}; use smoltcp::iface::{Config, Interface, SocketSet}; use smoltcp::phy::{ @@ -26,29 +23,47 @@ use smoltcp::wire::{EthernetAddress, IpAddress, IpCidr, Ipv4Address}; pub struct NetworkWorker { config: BaseConfig, - workload: WorkloadConfig, + server: bool, + address: Ipv4Address, + target_port: u16, + nconnections: u32, + send_interval: u128, } impl NetworkWorker { - pub fn new(workload: WorkloadConfig, cpu: CoreId, process: usize) -> Self { + pub fn new(workload: Network, config: BaseConfig) -> Self { + let Network { + server, + address, + target_port, + nconnections, + send_interval, + } = workload; + + let address = Ipv4Address([address.0, address.1, address.2, address.3]); + NetworkWorker { - config: BaseConfig { cpu, process }, - workload: workload, + config, + server, + address, + target_port, + nconnections, + send_interval, } } /// Start a simple server. The client side is going to be a networking /// worker as well, so for convenience of troubleshooting do not error /// out if something unexpected happened, log and proceed instead. 
- fn start_server( - &self, - addr: Ipv4Address, - target_port: u16, - ) -> Result<(), WorkerError> { - debug!("Starting server at {:?}:{:?}", addr, target_port); + fn start_server(&self) -> Result<(), WorkerError> { + debug!( + "Starting server at {:?}:{:?}", + self.address, self.target_port + ); let listener = - TcpListener::bind((addr.to_string(), target_port)).unwrap(); + TcpListener::bind((self.address.to_string(), self.target_port)) + .unwrap(); for stream in listener.incoming() { let mut stream = stream.unwrap(); @@ -94,34 +109,20 @@ impl NetworkWorker { Ok(()) } - fn start_client( - &self, - addr: Ipv4Address, - target_port: u16, - ) -> Result<(), WorkerError> { - let Workload::Network { - server: _, - address: _, - target_port: _, - arrival_rate: _, - departure_rate: _, - nconnections, - send_interval, - } = self.workload.workload - else { - unreachable!() - }; - - debug!("Starting client at {:?}:{:?}", addr, target_port); + fn start_client(&self) -> Result<(), WorkerError> { + debug!( + "Starting client at {:?}:{:?}", + self.address, self.target_port + ); - let (mut iface, mut device, fd) = self.setup_tuntap(addr); + let (mut iface, mut device, fd) = self.setup_tuntap(self.address); let cx = iface.context(); // Open static set of connections, that are going to live throughout // the whole run let mut sockets = SocketSet::new(vec![]); - for _i in 0..nconnections { + for _i in 0..self.nconnections { let tcp_rx_buffer = tcp::SocketBuffer::new(vec![0; 1024]); let tcp_tx_buffer = tcp::SocketBuffer::new(vec![0; 1024]); let tcp_socket = tcp::Socket::new(tcp_rx_buffer, tcp_tx_buffer); @@ -136,10 +137,14 @@ impl NetworkWorker { { let index = i; let (local_addr, local_port) = - self.get_local_addr_port(addr, index); + self.get_local_addr_port(self.address, index); info!("connecting from {}:{}", local_addr, local_port); socket - .connect(cx, (addr, target_port), (local_addr, local_port)) + .connect( + cx, + (self.address, self.target_port), + (local_addr, 
local_port), + ) .unwrap(); } @@ -157,7 +162,7 @@ impl NetworkWorker { iface.poll(timestamp, &mut device, &mut sockets); // Iterate through all sockets, update the state for each one - for (i, (h, s)) in sockets.iter_mut().enumerate() { + for (i, (_, s)) in sockets.iter_mut().enumerate() { let socket = tcp::Socket::downcast_mut(s) .ok_or(WorkerError::Internal)?; @@ -182,7 +187,7 @@ impl NetworkWorker { // purpose is to excercise connection monitoring. // Sending data too frequently we risk producing too much // load and making connetion monitoring less reliable. - if elapsed > send_interval { + if elapsed > self.send_interval { // reset the timer send_timer = SystemTime::now(); @@ -218,7 +223,7 @@ impl NetworkWorker { addr: Ipv4Address, ) -> (Interface, FaultInjector>, i32) { let device_name = "tun0"; - let device = TunTapInterface::new(&device_name, Medium::Ip).unwrap(); + let device = TunTapInterface::new(device_name, Medium::Ip).unwrap(); let fd = device.as_raw_fd(); let seed = SystemTime::now() @@ -275,33 +280,16 @@ impl NetworkWorker { (((index / 100) + 2) % 255) as u8, ); - return (local_addr, local_port); + (local_addr, local_port) } -} -impl Worker for NetworkWorker { - fn run_payload(&self) -> Result<(), WorkerError> { + pub fn run_payload(&self) -> Result<(), WorkerError> { info!("{self}"); - let Workload::Network { - server, - address, - target_port, - arrival_rate: _, - departure_rate: _, - nconnections: _, - send_interval: _, - } = self.workload.workload - else { - unreachable!() - }; - - let ip_addr = Ipv4Address([address.0, address.1, address.2, address.3]); - - if server { - let _ = self.start_server(ip_addr, target_port); + if self.server { + let _ = self.start_server(); } else { - let _ = self.start_client(ip_addr, target_port); + let _ = self.start_client(); } Ok(()) diff --git a/src/worker/processes.rs b/src/worker/processes.rs index 3028e87..8e66008 100644 --- a/src/worker/processes.rs +++ b/src/worker/processes.rs @@ -1,40 +1,28 @@ use 
std::{fmt::Display, process::Command, thread, time}; -use core_affinity::CoreId; use fork::{fork, Fork}; use log::{info, warn}; use nix::{sys::wait::waitpid, unistd::Pid}; use rand::{distributions::Alphanumeric, thread_rng, Rng}; use rand_distr::Exp; -use crate::{BaseConfig, Worker, WorkerError, Workload, WorkloadConfig}; +use crate::{workload, BaseConfig, WorkerError}; -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone)] pub struct ProcessesWorker { config: BaseConfig, - workload: WorkloadConfig, + workload: workload::Processes, } impl ProcessesWorker { - pub fn new(workload: WorkloadConfig, cpu: CoreId, process: usize) -> Self { - ProcessesWorker { - config: BaseConfig { cpu, process }, - workload, - } + pub fn new(workload: workload::Processes, config: BaseConfig) -> Self { + ProcessesWorker { config, workload } } fn spawn_process(&self, lifetime: u64) -> Result<(), WorkerError> { - let Workload::Processes { - arrival_rate: _, - departure_rate: _, - random_process, - } = self.workload.workload - else { - unreachable!() - }; let BaseConfig { cpu, process } = self.config; - if random_process { + if self.workload.random_process { let uniq_arg: String = rand::thread_rng() .sample_iter(&Alphanumeric) .take(7) @@ -62,26 +50,21 @@ impl ProcessesWorker { } } } -} -impl Worker for ProcessesWorker { - fn run_payload(&self) -> Result<(), WorkerError> { + pub fn run_payload(&self) -> Result<(), WorkerError> { info!("{self}"); - let Workload::Processes { + let workload::Processes { arrival_rate, departure_rate, random_process: _, - } = self.workload.workload - else { - unreachable!() - }; + } = self.workload; loop { let lifetime: f64 = thread_rng().sample(Exp::new(departure_rate).unwrap()); - let worker = *self; + let worker = self.clone(); thread::spawn(move || { worker.spawn_process((lifetime * 1000.0).round() as u64) }); diff --git a/src/worker/syscalls.rs b/src/worker/syscalls.rs index 9290bc3..7b3bc88 100644 --- a/src/worker/syscalls.rs +++ b/src/worker/syscalls.rs 
@@ -1,24 +1,23 @@ use std::{fmt::Display, thread, time}; -use core_affinity::CoreId; use log::{info, warn}; use rand::{thread_rng, Rng}; use rand_distr::Exp; use syscalls::{syscall, Sysno}; -use crate::{BaseConfig, Worker, WorkerError, Workload, WorkloadConfig}; +use crate::{workload, BaseConfig, WorkerError}; #[derive(Debug, Copy, Clone)] pub struct SyscallsWorker { config: BaseConfig, - workload: WorkloadConfig, + arrival_rate: f64, } impl SyscallsWorker { - pub fn new(workload: WorkloadConfig, cpu: CoreId, process: usize) -> Self { + pub fn new(workload: workload::Syscalls, config: BaseConfig) -> Self { SyscallsWorker { - config: BaseConfig { cpu, process }, - workload, + config, + arrival_rate: workload.arrival_rate, } } @@ -31,16 +30,10 @@ impl SyscallsWorker { } } } -} -impl Worker for SyscallsWorker { - fn run_payload(&self) -> Result<(), WorkerError> { + pub fn run_payload(&self) -> Result<(), WorkerError> { info!("{self}"); - let Workload::Syscalls { arrival_rate } = self.workload.workload else { - unreachable!() - }; - loop { let worker = *self; thread::spawn(move || { @@ -48,7 +41,7 @@ impl Worker for SyscallsWorker { }); let interval: f64 = - thread_rng().sample(Exp::new(arrival_rate).unwrap()); + thread_rng().sample(Exp::new(self.arrival_rate).unwrap()); info!( "{}-{}: Interval {}, rounded {}", self.config.cpu.id, diff --git a/src/workload/endpoints.rs b/src/workload/endpoints.rs new file mode 100644 index 0000000..05fd851 --- /dev/null +++ b/src/workload/endpoints.rs @@ -0,0 +1,38 @@ +use std::fmt::Display; + +use serde::Deserialize; + +#[derive(Debug, Clone, Deserialize)] +pub struct Endpoints { + /// An amount of time for the workload to run before restarting + pub restart_interval: u64, + + /// Governing the number of ports open. 
+ #[serde(flatten)] + pub distribution: Distribution, +} + +impl Display for Endpoints { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "Using {} distribution", self.distribution) + } +} + +/// Distribution for number of ports to listen on +#[derive(Debug, Copy, Clone, Deserialize)] +#[serde(tag = "distribution")] +pub enum Distribution { + /// Few processes are opening large number of ports, the rest are only few. + #[serde(alias = "zipf")] + Zipfian { n_ports: u64, exponent: f64 }, + + /// Every process opens more or less the same number of ports. + #[serde(alias = "uniform")] + Uniform { lower: u64, upper: u64 }, +} + +impl Display for Distribution { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{self:?}") + } +} diff --git a/src/workload/mod.rs b/src/workload/mod.rs new file mode 100644 index 0000000..369b7c4 --- /dev/null +++ b/src/workload/mod.rs @@ -0,0 +1,218 @@ +use serde::Deserialize; + +use self::{endpoints::Endpoints, network::Network}; + +pub(crate) mod endpoints; +pub(crate) mod network; + +/// Main workload configuration, contains general bits for all types of +/// workloads plus workload specific data. +#[derive(Debug, Clone, Deserialize)] +pub struct WorkloadConfig { + /// Controls per-core mode to handle number of workers. If per-core mode + /// is enabled, `workers` will be treated as a number of workers per CPU + /// core. Otherwise it will be treated as a total number of workers. + #[serde(default = "default_per_core")] + pub per_core: bool, + + /// How many workers to spin, depending on `per_core` in either per-core + /// or total mode. + #[serde(default = "default_workers")] + pub workers: usize, + + /// Custom workload configuration. + pub workload: Workload, + + /// For how long to run the worker. Default value is zero, meaning no limit. 
+ #[serde(default = "default_duration")] + pub duration: u64, +} + +fn default_workers() -> usize { + 1 +} + +fn default_per_core() -> bool { + true +} + +fn default_duration() -> u64 { + 0 +} + +/// Workload specific configuration, contains one enum value for each +/// workload type. +#[derive(Debug, Clone, Deserialize)] +#[serde(rename_all = "lowercase", tag = "type")] +pub enum Workload { + /// How to listen on ports. + Endpoints(Endpoints), + + /// How to spawn processes. + Processes(Processes), + + /// How to invoke syscalls + Syscalls(Syscalls), + + /// How to open network connections + Network(Network), +} + +#[derive(Debug, Clone, Deserialize)] +pub struct Processes { + /// How often a new process will be spawn. + pub arrival_rate: f64, + + /// How long processes are going to live. + pub departure_rate: f64, + + /// Spawn a new process with random arguments. + pub random_process: bool, +} + +#[derive(Debug, Clone, Deserialize)] +pub struct Syscalls { + /// How often to invoke a syscall. + pub arrival_rate: f64, +} + +#[cfg(test)] +mod tests { + use super::*; + use config::{Config, File, FileFormat}; + + #[test] + fn test_processes() { + let input = r#" + [workload] + type = "processes" + arrival_rate = 10.0 + departure_rate = 200.0 + random_process = true + "#; + + let config = Config::builder() + .add_source(File::from_str(input, FileFormat::Toml)) + .build() + .expect("failed to parse configuration") + .try_deserialize::() + .expect("failed to deserialize into WorkloadConfig"); + + let WorkloadConfig { workload, .. } = config; + if let Workload::Processes(Processes { + arrival_rate, + departure_rate, + random_process, + .. 
+ }) = workload + { + assert_eq!(arrival_rate, 10.0); + assert_eq!(departure_rate, 200.0); + assert!(random_process); + } else { + panic!("wrong workload type found"); + } + } + + #[test] + fn test_endpoints_zipf() { + let input = r#" + [workload] + type = "endpoints" + restart_interval = 10 + distribution = "zipf" + n_ports = 200 + exponent = 1.4 + "#; + + let config = Config::builder() + .add_source(File::from_str(input, FileFormat::Toml)) + .build() + .expect("failed to parse configuration") + .try_deserialize::() + .expect("failed to deserialize into WorkloadConfig"); + + let WorkloadConfig { workload, .. } = config; + + if let Workload::Endpoints(Endpoints { + restart_interval, + distribution, + }) = workload + { + assert_eq!(restart_interval, 10); + + if let endpoints::Distribution::Zipfian { n_ports, exponent } = + distribution + { + assert_eq!(n_ports, 200); + assert_eq!(exponent, 1.4); + } else { + panic!("wrong distribution type found"); + } + } else { + panic!("wrong workload type found"); + } + } + + #[test] + fn test_endpoints_uniform() { + let input = r#" + [workload] + type = "endpoints" + restart_interval = 10 + distribution = "uniform" + upper = 100 + lower = 1 + "#; + + let config = Config::builder() + .add_source(File::from_str(input, FileFormat::Toml)) + .build() + .expect("failed to parse configuration") + .try_deserialize::() + .expect("failed to deserialize into WorkloadConfig"); + + let WorkloadConfig { workload, .. 
} = config; + + if let Workload::Endpoints(Endpoints { + restart_interval, + distribution, + }) = workload + { + assert_eq!(restart_interval, 10); + if let endpoints::Distribution::Uniform { lower, upper } = + distribution + { + assert_eq!(lower, 1); + assert_eq!(upper, 100); + } else { + panic!("wrong distribution type found"); + } + } else { + panic!("wrong workload type found"); + } + } + + #[test] + fn test_syscalls() { + let input = r#" + [workload] + type = "syscalls" + arrival_rate = 10.0 + "#; + + let config = Config::builder() + .add_source(File::from_str(input, FileFormat::Toml)) + .build() + .expect("failed to parse configuration") + .try_deserialize::() + .expect("failed to deserialize into WorkloadConfig"); + + let WorkloadConfig { workload, .. } = config; + if let Workload::Syscalls(Syscalls { arrival_rate, .. }) = workload { + assert_eq!(arrival_rate, 10.0); + } else { + panic!("wrong workload type found"); + } + } +} diff --git a/src/workload/network.rs b/src/workload/network.rs new file mode 100644 index 0000000..b008987 --- /dev/null +++ b/src/workload/network.rs @@ -0,0 +1,31 @@ +use serde::Deserialize; + +#[derive(Debug, Clone, Deserialize)] +pub struct Network { + /// Whether the instance functions as a server or client + pub server: bool, + + /// Which ip address to use for the server to listen on, + /// or for the client to connect to + pub address: (u8, u8, u8, u8), + + /// Port for the server to listen on, or for the client + /// to connect to. + pub target_port: u16, + + /// Starting number of connections + pub nconnections: u32, + + /// How often send data via new connections, in milliseconds. + /// The interval is applied for all connections, e.g. an interval + /// of 100 ms for 100 connections means that every 100 ms one out + /// of 100 connections will be allowed to send some data. + /// This parameter allows to control the overhead of sending data, + /// so that it will not impact connections monitoring. 
+ #[serde(default = "default_network_send_interval")] + pub send_interval: u128, +} + +fn default_network_send_interval() -> u128 { + 10 +} diff --git a/workloads/endpoints-uniform.toml b/workloads/endpoints-uniform.toml index 197ba6e..3cf9a47 100644 --- a/workloads/endpoints-uniform.toml +++ b/workloads/endpoints-uniform.toml @@ -1,7 +1,6 @@ -restart_interval = 10 - [workload] type = "endpoints" +restart_interval = 10 distribution = "uniform" upper = 100 lower = 1 diff --git a/workloads/endpoints-zipf.toml b/workloads/endpoints-zipf.toml index 8680572..42d2fa4 100644 --- a/workloads/endpoints-zipf.toml +++ b/workloads/endpoints-zipf.toml @@ -1,7 +1,6 @@ -restart_interval = 10 - [workload] type = "endpoints" +restart_interval = 10 distribution = "zipf" n_ports = 200 exponent = 1.4 diff --git a/workloads/network.toml b/workloads/network.toml index 2f338c6..d01c1ff 100644 --- a/workloads/network.toml +++ b/workloads/network.toml @@ -1,5 +1,3 @@ -restart_interval = 10 - [workload] type = "network" server = false diff --git a/workloads/processes.toml b/workloads/processes.toml index f161df0..b265d47 100644 --- a/workloads/processes.toml +++ b/workloads/processes.toml @@ -1,5 +1,3 @@ -restart_interval = 10 - [workload] type = "processes" arrival_rate = 10.0 diff --git a/workloads/syscalls.toml b/workloads/syscalls.toml index 0cbdcfc..b0c99ac 100644 --- a/workloads/syscalls.toml +++ b/workloads/syscalls.toml @@ -1,5 +1,3 @@ -restart_interval = 10 - [workload] type = "syscalls" arrival_rate = 10.0