) -> Result> {
+ /// Ok(TestActor::new())
+ /// }
+ /// }
+ ///
+ /// #[ntest::timeout(100000)]
+ /// fn main() {
+ /// let mut actor_config = TyraConfig::new().unwrap();
+ /// actor_config.thread_pool.config.insert(String::from("default"), ThreadPoolConfig::new(2, 1, 1, 1.0));
+ /// let actor_system = ActorSystem::new(actor_config);
+ ///
+ /// let actor_name = "test";
+ /// //this works, because there's no actor called `test` yet on the pool
+ /// let this_works = actor_system.builder().spawn_multiple(actor_name, TestActorFactory::new(), 2);
+ /// assert!(this_works.is_ok(), "The actors could not be spawned");
+ ///
+ /// //this works, because there's already an actor called `test` with type `TestActor` on the pool, therefore the result is the same actor that was created in the previous spawn command
+ /// let this_works_as_well = actor_system.builder().spawn_multiple(actor_name, TestActorFactory::new(), 2);
+ /// assert!(this_works_as_well.is_ok(), "The `ActorWrapper` could not be fetched");
+ ///
+ /// //this does not work, because the pool is currently configured to only allow two actors
+ /// let pool_full = actor_system.builder().spawn_multiple("full", TestActorFactory::new(), 10);
+ /// assert!(pool_full.is_err(), "The actor could not be spawned");
+ /// let err = pool_full.err().unwrap();
+ /// assert_eq!(err, ActorError::ThreadPoolHasTooManyActorsError, "Error is not correct");
+ ///
+ /// actor_system.stop();
+ /// std::process::exit(actor_system.await_shutdown());
+ /// }
+ /// ```
+ pub fn spawn_multiple(
+ &self,
+ name: impl Into + Clone + std::fmt::Display,
+ props: P,
+ spawn_count: usize,
+ ) -> Result>, ActorError>
+ where
+ P: ActorFactory + 'static + Clone,
+ {
+ let mut to_return = Vec::new();
+ for i in 0..spawn_count {
+ let name = format!("{}-{}", name.clone(), i);
+ let res = self.spawn(name, props.clone());
+ match res {
+ Ok(res) => {
+ to_return.push(res);
+ }
+ Err(e) => {
+ for actor in &to_return {
+ let _ = actor.stop();
+ }
+ return Err(e);
+ }
+ }
+ }
+
+ return Ok(to_return);
+ }
}
diff --git a/src/actor/actor_send_error.rs b/src/actor/actor_send_error.rs
index d4627bf..7c44c9e 100644
--- a/src/actor/actor_send_error.rs
+++ b/src/actor/actor_send_error.rs
@@ -9,4 +9,8 @@ pub enum ActorSendError {
/// Triggered by [ActorWrapper.send](../prelude/struct.ActorWrapper.html#method.send) && [ActorWrapper.send_timeout](../prelude/struct.ActorWrapper.html#method.send_timout) when a message is sent to a stopped Actor
#[error("Message could not be delivered")]
AlreadyStoppedError,
+
+ /// Triggered by [ActorWrapper](../prelude/struct.ActorWrapper.html) if a message can't be send to remote Actors
+ #[error("Message can't be delivered to remote Actor")]
+ NotAllowedForRemoteActorError
}
diff --git a/src/actor/context.rs b/src/actor/context.rs
index 6d8e8b7..a021c55 100644
--- a/src/actor/context.rs
+++ b/src/actor/context.rs
@@ -1,5 +1,4 @@
-use crate::actor::actor_wrapper::ActorWrapper;
-use crate::prelude::Actor;
+use crate::prelude::{Actor, ActorWrapper};
use crate::system::actor_system::ActorSystem;
use std::panic::UnwindSafe;
diff --git a/src/actor/executor.rs b/src/actor/executor.rs
index c785dcd..5505291 100644
--- a/src/actor/executor.rs
+++ b/src/actor/executor.rs
@@ -2,14 +2,14 @@ use crate::actor::actor_address::ActorAddress;
use crate::actor::actor_config::ActorConfig;
use crate::actor::actor_factory::ActorFactory;
use crate::actor::actor_state::ActorState;
-use crate::actor::actor_wrapper::ActorWrapper;
+
use crate::actor::context::ActorContext;
use crate::actor::handler::Handler;
use crate::actor::mailbox::Mailbox;
use crate::message::actor_message::BaseActorMessage;
use crate::message::envelope::{MessageEnvelope, MessageEnvelopeTrait};
use crate::message::system_stop_message::SystemStopMessage;
-use crate::prelude::{Actor, ActorPanicSource, ActorResult};
+use crate::prelude::{Actor, ActorPanicSource, ActorResult, ActorWrapper};
use crate::system::actor_error::ActorError;
use crate::system::actor_system::ActorSystem;
use log::debug;
@@ -113,6 +113,7 @@ where
}
fn stop_actor(&mut self, immediately: bool) -> ActorState {
+ let _ = catch_unwind(AssertUnwindSafe(|| self.actor.pre_stop(&self.context)));
self.mailbox.is_stopped.store(true, Ordering::Relaxed);
if immediately {
let _ = catch_unwind(AssertUnwindSafe(|| self.actor.post_stop(&self.context)));
diff --git a/src/actor/handler.rs b/src/actor/handler.rs
index b0b32df..35ac12b 100644
--- a/src/actor/handler.rs
+++ b/src/actor/handler.rs
@@ -15,11 +15,13 @@ use std::error::Error;
///
/// ```rust
/// use std::error::Error;
+/// use serde::Serialize;
/// use tyra::prelude::{TyraConfig, ActorSystem, ActorFactory, ActorContext, SerializedMessage, Handler, Actor, ActorResult, ActorMessage};
///
/// struct TestActor {}
/// impl Actor for TestActor {}
///
+/// #[derive(Hash, Serialize)]
/// struct FooBar {}
/// impl ActorMessage for FooBar {}
///
diff --git a/src/actor/mod.rs b/src/actor/mod.rs
index 71c518c..4b743ee 100644
--- a/src/actor/mod.rs
+++ b/src/actor/mod.rs
@@ -7,7 +7,6 @@ pub mod actor_panic_source;
pub mod actor_result;
pub mod actor_send_error;
pub mod actor_state;
-pub mod actor_wrapper;
pub mod context;
pub mod executor;
pub mod handler;
@@ -15,12 +14,12 @@ pub mod mailbox;
pub mod prelude {
pub use crate::actor::actor::Actor;
+ pub use crate::actor::actor_address::ActorAddress;
pub use crate::actor::actor_builder::ActorBuilder;
pub use crate::actor::actor_factory::ActorFactory;
pub use crate::actor::actor_panic_source::ActorPanicSource;
pub use crate::actor::actor_result::ActorResult;
pub use crate::actor::actor_send_error::ActorSendError;
- pub use crate::actor::actor_wrapper::ActorWrapper;
pub use crate::actor::context::ActorContext;
pub use crate::actor::handler::Handler;
}
diff --git a/src/config/cluster_config.rs b/src/config/cluster_config.rs
new file mode 100644
index 0000000..7964f88
--- /dev/null
+++ b/src/config/cluster_config.rs
@@ -0,0 +1,8 @@
+use serde::{Deserialize, Serialize};
+
+#[derive(Debug, Serialize, Deserialize, Clone)]
+pub struct ClusterConfig {
+ pub enabled: bool,
+ pub hosts: Vec,
+ pub members: Vec,
+}
diff --git a/src/config/default.toml b/src/config/default.toml
index c04de50..dc09940 100644
--- a/src/config/default.toml
+++ b/src/config/default.toml
@@ -2,8 +2,11 @@
# general actor settings
[general]
# name of the actor system
+# magic default: name of the cargo package; defaults to "tyra" if not built with cargo
+name = "$CARGO_PKG_NAME"
+# hostname of the actor system
# magic default: system hostname
-name = "$HOSTNAME"
+hostname = "$HOSTNAME"
# default mailbox size for every actor if no explicit size is set
# 0 is treated as unlimited
default_mailbox_size = 0
@@ -11,6 +14,11 @@ default_mailbox_size = 0
default_message_throughput = 15
# defines if the rust panic hook should be overwritten by the actor system on startup
override_panic_hook = true
+# will start a graceful shutdown on first SIGINT, SIGTERM or SIGHUP, or force a shutdown if multiple signals are received.
+enable_signal_handling = true
+# maximum graceful shutdown time before system is hard killed 0 disables graceful shutdown
+# waning: setting to 0 can result in endlessly running applications
+graceful_timeout_in_seconds = 300
# default pool settings
[thread_pool.config.default]
@@ -34,4 +42,75 @@ threads_min = 2
# maximum amount of threads to spawn for this pool
threads_max = 3
# num_cpu * factor = amount of threads to spawn for this pool
-threads_factor = 1
\ No newline at end of file
+threads_factor = 1
+
+# cluster pool settings
+# only enabled if `cluster.enabled` is set to `true`
+[thread_pool.config.cluster]
+# amount of actors that this thread_pool can handle
+# 0 is treated as unlimited
+actor_limit = 5
+# minimum amount of threads to spawn for this pool
+threads_min = 2
+# maximum amount of threads to spawn for this pool
+threads_max = 5
+# num_cpu * factor = amount of threads to spawn for this pool
+threads_factor = 1
+
+# cluster configuration
+[cluster]
+# defines if cluster functionality should be enabled
+enabled = false
+## addresses and port used by the cluster.
+## if port is omitted, the default of 2022 is used
+## if port is set to 0, a port will automatically be assigned
+hosts = ["tcp://0.0.0.0:2022", "udp://0.0.0.0:2023"]
+## list of cluster members, entries can include IPs and DNS records, multi A records are supported as well
+## if the same node is listed multiple times with different ip addresses, only the first working occurence will be used as a connection
+## if port is omitted, the default of 2022 is used
+members = [
+ "tcp://registry.git.sers.dev",
+ "tcp://git.sers.dev",
+ "tcp://abc.sers.dev:1234",
+]
+
+##WIP: How cluster config could look like
+#####
+#
+#
+## default cluster settings
+#[cluster]
+
+#hosts = ["tcp://127.0.0.1:2022", "tcp://192.168.0.1:2022", "udp://0.0.0.0:2023"]
+#
+## cluster groups the node should be part of. Valid values are "server", "client" and "proxy"
+## - server: can be elected as master node and will open specified ports in hosts
+## - client: can't be elected as master node and will ignore cluster.hosts config
+## - proxy: will forward traffic between two nodes that can't directly communicate with each other
+#groups = ["server", "client", "proxy"]
+#
+## list of cluster members, entries can include IPs and DNS records, multi A records are supported as well
+## if the same node is listed multiple times with different ip addresses, only the first working occurence will be used as a connection
+## if port is omitted, the default of 2022 is used
+#members = [
+# "tcp://jkasfhjklahsjkghjklalkj.ahjasdjkhaskhjg.asdf:1234",
+# "tcp://registry.git.sers.dev",
+# "tcp://git.sers.dev",
+# "tcp://abc.sers.dev:1234",
+#]
+## list of cidrs that are allowed to connect
+#trusted_cidrs = [
+# "78.47.25.243/32",
+# "78.47.42.31/32",
+#]
+#
+## name of the pre shared key that should be used to encrypt outgoing traffic
+#active_psk = "default"
+#
+## list of encryption keys that are used in the cluster
+## new connections will go through trial and error to find which key is used by which server
+#[cluster.pre_shared_keys]
+#default = "sers"
+#additional = "hello-world"
+#
+#
\ No newline at end of file
diff --git a/src/config/global_config.rs b/src/config/global_config.rs
index 403ab0d..f3eacad 100644
--- a/src/config/global_config.rs
+++ b/src/config/global_config.rs
@@ -3,7 +3,10 @@ use serde::{Deserialize, Serialize};
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct GeneralConfig {
pub name: String,
+ pub hostname: String,
pub default_mailbox_size: usize,
pub default_message_throughput: usize,
pub override_panic_hook: bool,
+ pub enable_signal_handling: bool,
+ pub graceful_timeout_in_seconds: u64,
}
diff --git a/src/config/mod.rs b/src/config/mod.rs
index d1b8c07..8429384 100644
--- a/src/config/mod.rs
+++ b/src/config/mod.rs
@@ -1,8 +1,11 @@
pub mod global_config;
pub mod pool_config;
pub mod tyra_config;
+pub mod cluster_config;
pub mod prelude {
+ pub use crate::config::global_config::GeneralConfig;
pub use crate::config::pool_config::ThreadPoolConfig;
pub use crate::config::tyra_config::TyraConfig;
+ pub use crate::config::cluster_config::ClusterConfig;
}
diff --git a/src/config/tyra_config.rs b/src/config/tyra_config.rs
index cf34275..36e25ec 100644
--- a/src/config/tyra_config.rs
+++ b/src/config/tyra_config.rs
@@ -4,6 +4,7 @@ use std::path::Path;
use config::{Config, ConfigError, File, FileFormat};
use serde::{Deserialize, Serialize};
+use crate::prelude::ClusterConfig;
pub const DEFAULT_POOL: &str = "default";
@@ -12,6 +13,7 @@ pub const DEFAULT_POOL: &str = "default";
pub struct TyraConfig {
pub general: GeneralConfig,
pub thread_pool: PoolConfig,
+ pub cluster: ClusterConfig,
}
impl TyraConfig {
@@ -46,8 +48,11 @@ impl TyraConfig {
let conf = config.build().expect("Could not fetch Config");
let mut parsed: TyraConfig = conf.try_deserialize().expect("Could not parse Config");
- if parsed.general.name == "$HOSTNAME" {
- parsed.general.name = String::from(hostname::get().unwrap().to_str().unwrap());
+ if parsed.general.hostname == "$HOSTNAME" {
+ parsed.general.hostname = String::from(hostname::get().unwrap().to_str().unwrap());
+ }
+ if parsed.general.name == "$CARGO_PKG_NAME" {
+ parsed.general.name = option_env!("CARGO_PKG_NAME").unwrap_or("tyra").into();
}
Ok(parsed)
diff --git a/src/lib.rs b/src/lib.rs
index face0c6..85a8f61 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -15,8 +15,10 @@
//! use std::process::exit;
//! use std::time::Duration;
//! use std::error::Error;
+//! use serde::Serialize;
//!
//! // define message
+//! #[derive(Hash, Serialize)]
//! struct FooBar {}
//! impl ActorMessage for FooBar {}
//!
@@ -25,6 +27,7 @@
//! impl Actor for HelloWorld {}
//!
//! // setup required Factory
+//! #[derive(Clone)]
//! struct HelloWorldFactory {}
//! impl ActorFactory for HelloWorldFactory {
//! fn new_actor(&mut self, _context: ActorContext) -> Result> {
@@ -56,7 +59,7 @@
//!
//! // cleanup
//! actor.stop().unwrap();
-//! actor_system.stop(Duration::from_millis(5000));
+//! actor_system.stop();
//! exit(actor_system.await_shutdown());
//! }
//! ```
@@ -107,15 +110,19 @@
mod actor;
mod config;
mod message;
+mod net;
mod routers;
mod system;
+mod wrapper;
/// core components
pub mod prelude {
pub use crate::actor::prelude::*;
pub use crate::config::prelude::*;
pub use crate::message::prelude::*;
+ pub use crate::net::prelude::*;
pub use crate::system::prelude::*;
+ pub use crate::wrapper::prelude::*;
}
/// collection of different router implementations
diff --git a/src/message/actor_init_message.rs b/src/message/actor_init_message.rs
index 161cf45..ff1ec83 100644
--- a/src/message/actor_init_message.rs
+++ b/src/message/actor_init_message.rs
@@ -1,6 +1,8 @@
use crate::prelude::ActorMessage;
+use serde::{Deserialize, Serialize};
/// Can be implemented by an Actor through Handler to be used to init an Actor
+#[derive(Hash, Serialize, Deserialize)]
pub struct ActorInitMessage {}
impl ActorInitMessage {
diff --git a/src/message/actor_message.rs b/src/message/actor_message.rs
index c734aba..44ddcc7 100644
--- a/src/message/actor_message.rs
+++ b/src/message/actor_message.rs
@@ -1,18 +1,18 @@
+use serde::Serialize;
+use std::collections::hash_map::DefaultHasher;
+use std::hash::{Hash, Hasher};
+
/// This trait is used internally by the `ActorSystem` and builds the base for all messaging
/// It's automatically implemented by the `ActorMessage` trait that should be used
///
/// It is used by Messages defined in the system
/// All messages that use this trait directly should also implement a dynamic `Handler` that applies to any `Actor`
-pub trait BaseActorMessage: Send + Sync {}
+pub trait BaseActorMessage: Send + Sync + Hash + Serialize {}
+
/// This trait is used by Messages defined by the system
/// All messages that use this trait should also implement a dynamic `Handler` that applies to any `Actor`
-pub trait DefaultActorMessage: Send + Sync {
- /// returns the message id
- fn get_id(&self) -> usize {
- return 0;
- }
-}
+pub trait DefaultActorMessage: Send + Sync + Hash + Serialize {}
impl BaseActorMessage for A where A: DefaultActorMessage {}
@@ -23,17 +23,21 @@ impl BaseActorMessage for A where A: DefaultActorMessage {}
/// Basic usage:
///
/// ```rust
+/// use serde::Serialize;
/// use tyra::prelude::ActorMessage;
///
+/// #[derive(Hash, Serialize)]
/// struct FooBar {}
/// impl ActorMessage for FooBar {}
/// ```
-pub trait ActorMessage: Send + Sync {
- /// returns the message id
- fn get_id(&self) -> usize {
- return 0;
+pub trait ActorMessage: Send + Sync + Hash + Serialize {
+ /// returns the message hash
+ fn get_hash(&self) -> u64 {
+ let mut hasher = DefaultHasher::new();
+ self.hash(&mut hasher);
+ return hasher.finish();
}
}
/// this should be `BaseActorMessage` but it's currently not possible because of https://github.com/rust-lang/rust/issues/20400
-impl DefaultActorMessage for A where A: ActorMessage {}
+impl DefaultActorMessage for A where A: ActorMessage + Serialize {}
diff --git a/src/message/actor_stop_message.rs b/src/message/actor_stop_message.rs
index d1d38a8..ae09a65 100644
--- a/src/message/actor_stop_message.rs
+++ b/src/message/actor_stop_message.rs
@@ -1,5 +1,7 @@
use crate::message::actor_message::DefaultActorMessage;
+use serde::{Deserialize, Serialize};
+#[derive(Hash, Serialize, Deserialize)]
pub struct ActorStopMessage {}
impl ActorStopMessage {
diff --git a/src/message/bulk_actor_message.rs b/src/message/bulk_actor_message.rs
index 115b6fe..4c86d49 100644
--- a/src/message/bulk_actor_message.rs
+++ b/src/message/bulk_actor_message.rs
@@ -1,8 +1,10 @@
/// Bulk Actor Message, that can wrap and send multiple [ActorMessage](../prelude/trait.ActorMessage.html) at once
///
use crate::message::actor_message::BaseActorMessage;
+use serde::Serialize;
/// Wraps multiple [ActorMessage](../prelude/trait.ActorMessage.html) to be sent to an Actor
+#[derive(Hash, Serialize)]
pub struct BulkActorMessage
where
M: BaseActorMessage + 'static,
diff --git a/src/message/delayed_message.rs b/src/message/delayed_message.rs
index 77dfef2..1cd445a 100644
--- a/src/message/delayed_message.rs
+++ b/src/message/delayed_message.rs
@@ -1,8 +1,15 @@
use crate::message::actor_message::BaseActorMessage;
use crate::prelude::{Actor, ActorMessage, ActorWrapper};
+use serde::Serialize;
+use std::hash::{Hash, Hasher};
use std::time::{Duration, Instant};
/// Wraps an [ActorMessage](../prelude/trait.ActorMessage.html) to be sent at a later time
+#[derive(Serialize)]
+#[serde(bound(
+serialize = "A: Actor",
+deserialize = "A: Actor",
+))]
pub struct DelayedMessage
where
M: BaseActorMessage + 'static,
@@ -11,6 +18,7 @@ where
pub msg: M,
pub destination: ActorWrapper,
pub delay: Duration,
+ #[serde(skip)]
pub started: Instant,
}
@@ -36,3 +44,13 @@ where
}
}
}
+
+impl Hash for DelayedMessage
+where
+ M: BaseActorMessage + 'static,
+ A: Actor,
+{
+ fn hash(&self, state: &mut H) {
+ self.msg.hash(state);
+ }
+}
diff --git a/src/message/serialized_message.rs b/src/message/serialized_message.rs
index abc97dd..e2ef896 100644
--- a/src/message/serialized_message.rs
+++ b/src/message/serialized_message.rs
@@ -1,4 +1,7 @@
+use std::hash::{Hash, Hasher};
use crate::message::actor_message::DefaultActorMessage;
+use serde::{Deserialize, Serialize};
+use crate::prelude::ActorAddress;
/// For Remote message handling
///
@@ -8,13 +11,24 @@ use crate::message::actor_message::DefaultActorMessage;
/// and it may also include some additional fields to make deserialization easier for end users
///
/// [ActorSystem.send_to_address](../prelude/struct.ActorSystem.html#method.send_to_address) uses this object to send serialized messages to Actors
+#[derive(Serialize, Deserialize)]
pub struct SerializedMessage {
+ pub destination_address: ActorAddress,
pub content: Vec,
}
+impl Hash for SerializedMessage {
+ fn hash(&self, state: &mut H) {
+ self.destination_address.hash(state);
+ }
+}
+
impl SerializedMessage {
- pub fn new(content: Vec) -> Self {
- Self { content }
+ pub fn new(destination_address: ActorAddress, content: Vec) -> Self {
+ Self {
+ destination_address,
+ content,
+ }
}
}
diff --git a/src/message/sleep_message.rs b/src/message/sleep_message.rs
index 045da71..224a316 100644
--- a/src/message/sleep_message.rs
+++ b/src/message/sleep_message.rs
@@ -1,7 +1,9 @@
use crate::message::actor_message::DefaultActorMessage;
+use serde::{Deserialize, Serialize};
use std::time::Duration;
/// Puts an actor to sleep for a specified time
+#[derive(Hash, Serialize, Deserialize)]
pub struct SleepMessage {
pub duration: Duration,
}
diff --git a/src/message/system_stop_message.rs b/src/message/system_stop_message.rs
index bca3cfb..5516a4e 100644
--- a/src/message/system_stop_message.rs
+++ b/src/message/system_stop_message.rs
@@ -1,5 +1,7 @@
use crate::message::actor_message::DefaultActorMessage;
+use serde::{Deserialize, Serialize};
+#[derive(Hash, Serialize, Deserialize)]
pub struct SystemStopMessage {}
impl SystemStopMessage {
diff --git a/src/net/mod.rs b/src/net/mod.rs
new file mode 100644
index 0000000..b324f44
--- /dev/null
+++ b/src/net/mod.rs
@@ -0,0 +1,14 @@
+mod net_config;
+mod net_manager;
+pub mod net_messages;
+mod net_worker;
+
+pub mod prelude {
+ pub use crate::net::net_config::NetConfig;
+ pub use crate::net::net_config::NetProtocol;
+ pub use crate::net::net_config::NetConnectionType;
+ pub use crate::net::net_manager::NetManager;
+ pub use crate::net::net_manager::NetManagerFactory;
+ pub use crate::net::net_worker::NetWorker;
+ pub use crate::net::net_worker::NetWorkerFactory;
+}
diff --git a/src/net/net_config.rs b/src/net/net_config.rs
new file mode 100644
index 0000000..1876084
--- /dev/null
+++ b/src/net/net_config.rs
@@ -0,0 +1,30 @@
+#[derive(Clone, Ord, Eq, PartialOrd, PartialEq, Copy, Debug)]
+pub enum NetProtocol {
+ TCP,
+ UDP,
+}
+
+#[derive(Clone, Ord, Eq, PartialOrd, PartialEq, Copy, Debug)]
+pub enum NetConnectionType {
+ CLIENT,
+ SERVER
+}
+
+#[derive(Clone, Ord, Eq, PartialOrd, PartialEq, Debug)]
+pub struct NetConfig {
+ pub protocol: NetProtocol,
+ pub connection_type: NetConnectionType,
+ pub host: String,
+ pub port: usize,
+}
+
+impl NetConfig {
+ pub fn new(protocol: NetProtocol, connection_type: NetConnectionType, host: impl Into, port: usize) -> Self {
+ Self {
+ protocol,
+ connection_type,
+ host: host.into(),
+ port,
+ }
+ }
+}
diff --git a/src/net/net_manager.rs b/src/net/net_manager.rs
new file mode 100644
index 0000000..2f607e2
--- /dev/null
+++ b/src/net/net_manager.rs
@@ -0,0 +1,483 @@
+use crate::net::net_messages::{
+ AddTcpConnection, AddUdpSocket, ReceiveTcpMessage, ReceiveUdpMessage, RemoveTcpConnection,
+};
+use crate::prelude::{Actor, ActorContext, ActorFactory, ActorInitMessage, ActorResult, ActorWrapper, Handler, NetConfig, NetConnectionType, NetProtocol};
+use crate::router::Router;
+use io_arc::IoArc;
+use log::{debug, error, warn};
+use mio::event::Source;
+use mio::net::{TcpListener, TcpStream, UdpSocket};
+use mio::{Events, Interest, Poll, Token};
+use std::collections::HashMap;
+use std::error::Error;
+use std::io::{BufRead, BufReader};
+use std::marker::PhantomData;
+use std::net::Shutdown;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::Arc;
+use std::thread;
+use std::thread::sleep;
+use std::time::{Duration, Instant};
+
+pub struct NetManager
+where
+ T: Handler
+ + Handler
+ + Handler
+ + Handler
+ + Handler
+ + 'static,
+ R: Router + Handler + Handler + Handler + Handler + Handler,
+{
+ graceful_shutdown_time_in_seconds: Duration,
+ on_stop_udp_timeout: Duration,
+ router: ActorWrapper,
+ workers: Vec>,
+ server_configs: Vec,
+ client_configs: Vec,
+ is_stopping: Arc,
+ is_stopped: Arc,
+}
+
+impl NetManager
+where
+ T: Handler
+ + Handler
+ + Handler
+ + Handler
+ + Handler
+ + 'static,
+ R: Router + Handler + Handler + Handler + Handler + Handler,
+{
+ pub fn new(
+ server_configs: Vec,
+ client_configs: Vec,
+ graceful_shutdown_time_in_seconds: Duration,
+ on_stop_udp_timeout: Duration,
+ workers: Vec>,
+ router: ActorWrapper,
+ ) -> Self
+ {
+ let is_stopping = Arc::new(AtomicBool::new(false));
+ let is_stopped = Arc::new(AtomicBool::new(false));
+
+ return Self {
+ graceful_shutdown_time_in_seconds,
+ on_stop_udp_timeout,
+ router,
+ workers,
+ server_configs,
+ client_configs,
+ is_stopping,
+ is_stopped,
+ };
+ }
+}
+impl Actor for NetManager
+where
+ T: Handler
+ + Handler
+ + Handler
+ + Handler
+ + Handler
+ + 'static,
+ R: Router + Handler + Handler + Handler + Handler + Handler,
+
+{
+ fn pre_stop(&mut self, _context: &ActorContext) {
+ let iterate_graceful_stop_sleep_duration_in_seconds = 1;
+ let sleep_duration = Duration::from_secs(iterate_graceful_stop_sleep_duration_in_seconds);
+
+ let iterations = if self.graceful_shutdown_time_in_seconds.as_secs() > 0 {
+ self.graceful_shutdown_time_in_seconds.as_secs() / iterate_graceful_stop_sleep_duration_in_seconds
+ } else {
+ 0
+ };
+
+ sleep(sleep_duration);
+
+ self.is_stopping.store(true, Ordering::Relaxed);
+
+ let mut i = 0;
+ loop {
+ if self.graceful_shutdown_time_in_seconds.as_secs() > 0 {
+ if i > iterations {
+ return;
+ }
+ i += 1;
+ }
+
+ for net_config in &self.server_configs {
+ let address = format!("{}:{}", net_config.host, net_config.port);
+ match net_config.protocol {
+ NetProtocol::TCP => {
+ let _ = TcpStream::connect(address.parse().unwrap());
+ break;
+ }
+ NetProtocol::UDP => {
+ let sock = UdpSocket::bind("127.0.0.1:0".parse().unwrap());
+ if sock.is_ok() {
+ let sock = sock.unwrap();
+ let _ = sock.send_to(b"", address.parse().unwrap());
+ }
+ break;
+ }
+ }
+ }
+ if self.is_stopped.load(Ordering::Relaxed) {
+ return;
+ }
+ sleep(sleep_duration);
+ }
+ }
+ fn post_stop(&mut self, _context: &ActorContext) {
+ let _ = self.router.stop();
+ for worker in &self.workers {
+ let _ = worker.stop();
+ }
+ for worker in &self.workers {
+ worker.wait_for_stop();
+ }
+
+ self.is_stopped.store(true, Ordering::Relaxed);
+ for net_config in &self.server_configs {
+ let address = format!("{}:{}", net_config.host, net_config.port);
+ match net_config.protocol {
+ NetProtocol::TCP => {
+ let _ = TcpStream::connect(address.parse().unwrap());
+ break;
+ }
+ NetProtocol::UDP => {
+ let sock = UdpSocket::bind("127.0.0.1:0".parse().unwrap());
+ if sock.is_ok() {
+ let sock = sock.unwrap();
+ let _ = sock.send_to(b"", address.parse().unwrap());
+ }
+ break;
+ }
+ }
+ }
+ }
+}
+
+pub struct NetManagerFactory
+where
+
+ T: Handler
+ + Handler
+ + Handler
+ + Handler
+ + Handler
+ + 'static,
+ R: Router + Handler + Handler + Handler + Handler + Handler,
+{
+ server_configs: Vec,
+ client_configs: Vec,
+ graceful_shutdown_time_in_seconds: Duration,
+ on_stop_udp_timeout: Duration,
+ workers: Vec>,
+ router: ActorWrapper,
+ phantom: PhantomData
+}
+
+impl NetManagerFactory
+where
+ T: Handler
+ + Handler
+ + Handler
+ + Handler
+ + Handler
+ + 'static,
+ R: Router + Handler + Handler + Handler + Handler + Handler,
+{
+ pub fn new(
+ server_configs: Vec,
+ client_configs: Vec,
+ graceful_shutdown_time_in_seconds: Duration,
+ on_stop_udp_timeout: Duration,
+ workers: Vec>,
+ router: ActorWrapper,
+ ) -> Self {
+ return Self {
+ server_configs,
+ client_configs,
+ graceful_shutdown_time_in_seconds,
+ on_stop_udp_timeout,
+ workers,
+ router,
+ phantom: PhantomData,
+ };
+ }
+}
+impl ActorFactory> for NetManagerFactory
+where
+ T: Handler
+ + Handler
+ + Handler
+ + Handler
+ + Handler
+ + 'static,
+ R: Router + Handler + Handler + Handler + Handler + Handler,
+{
+ fn new_actor(
+ &mut self,
+ context: ActorContext>,
+ ) -> Result, Box> {
+ context.actor_ref.send(ActorInitMessage::new()).unwrap();
+ return Ok(NetManager::new(
+ self.server_configs.clone(),
+ self.client_configs.clone(),
+ self.graceful_shutdown_time_in_seconds,
+ self.on_stop_udp_timeout,
+ self.workers.clone(),
+ self.router.clone(),
+ ));
+ }
+}
+
+impl Handler for NetManager
+where
+ T: Handler
+ + Handler
+ + Handler
+ + Handler
+ + Handler
+ + 'static,
+ R: Router + Handler + Handler + Handler + Handler + Handler,
+
+{
+ fn handle(
+ &mut self,
+ _msg: ActorInitMessage,
+ context: &ActorContext,
+ ) -> Result> {
+ let router = self.router.clone();
+ let is_stopping = self.is_stopping.clone();
+ let is_stopped = self.is_stopped.clone();
+ let mut server_configs = self.server_configs.clone();
+ let mut client_configs = self.client_configs.clone();
+ let mut last_udp_message_received = Instant::now();
+ let on_stop_udp_timeout = self.on_stop_udp_timeout.clone();
+ let context = context.clone();
+ thread::spawn(move || {
+ let mut tcp_listeners: HashMap = HashMap::new();
+ let mut udp_sockets: HashMap> = HashMap::new();
+ let poll = Poll::new();
+ if poll.is_err() {
+ error!("Can't start Poll Port: {:?}", poll.err());
+ is_stopped.store(true, Ordering::Relaxed);
+ let _ = context.actor_ref.stop();
+ return;
+ }
+ let mut poll = poll.unwrap();
+
+ let mut i = 0;
+ server_configs.sort_by_key(|c| c.protocol);
+ client_configs.sort_by_key(|c| c.protocol);
+ server_configs.append(&mut client_configs);
+ for net_config in &server_configs {
+
+ let token = Token(i);
+ i += 1;
+
+ if net_config.connection_type == NetConnectionType::CLIENT {
+ let client = TcpStream::connect(format!("{}:{}", net_config.host, net_config.port).parse().unwrap());
+ if client.is_err() {
+ warn!("Can't connect to client: {:?}", net_config);
+ continue;
+ }
+ let mut client = client.unwrap();
+ let res =
+ poll.registry()
+ .register(&mut client, token, Interest::READABLE);
+ if res.is_err() {
+ error!("Can't register TCP Stream: {:?}", res.err());
+ is_stopped.store(true, Ordering::Relaxed);
+ let _ = context.actor_ref.stop();
+ return;
+ }
+ //tcp_listeners.insert(token, client);
+ println!("!!!!!!?");
+ continue;
+ }
+ let address = format!("{}:{}", net_config.host, net_config.port)
+ .parse()
+ .unwrap();
+
+ match net_config.protocol {
+ NetProtocol::TCP => {
+ let listener = TcpListener::bind(address);
+ if listener.is_err() {
+ error!("Can't open TCP Port: {:?}", listener.err());
+ is_stopped.store(true, Ordering::Relaxed);
+ let _ = context.actor_ref.stop();
+ return;
+ }
+ let mut listener = listener.unwrap();
+ let res =
+ poll.registry()
+ .register(&mut listener, token, Interest::READABLE);
+ if res.is_err() {
+ error!("Can't register TCP listener: {:?}", res.err());
+ is_stopped.store(true, Ordering::Relaxed);
+ let _ = context.actor_ref.stop();
+ return;
+ }
+ tcp_listeners.insert(token, listener);
+ }
+ NetProtocol::UDP => {
+ let socket = UdpSocket::bind(address);
+ if socket.is_err() {
+ error!("Can't open TCP Port: {:?}", socket.err());
+ is_stopped.store(true, Ordering::Relaxed);
+ let _ = context.actor_ref.stop();
+ return;
+ }
+ let mut socket = socket.unwrap();
+ let res = poll
+ .registry()
+ .register(&mut socket, token, Interest::READABLE);
+ if res.is_err() {
+ error!("Can't register UDP Socket: {:?}", res.err());
+ is_stopped.store(true, Ordering::Relaxed);
+ let _ = context.actor_ref.stop();
+ return;
+ }
+ let socket = IoArc::new(socket);
+ udp_sockets.insert(token, socket.clone());
+ let _ = router.send(AddUdpSocket::new(token.0, socket));
+ }
+ }
+ }
+ let num_tcp_listeners = tcp_listeners.len();
+ let num_total_listeners = server_configs.len();
+
+ let mut events = Events::with_capacity(1024);
+ let mut streams = HashMap::new();
+
+ let mut buf = [0; 65535];
+ loop {
+ if is_stopped.load(Ordering::Relaxed) {
+ return;
+ }
+
+ let res = poll.poll(&mut events, Some(Duration::from_secs(10)));
+ if res.is_err() {
+ debug!("Can't poll Network Events");
+ continue;
+ }
+
+ for event in events.iter() {
+ let stopping = is_stopping.load(Ordering::Relaxed);
+ if stopping
+ && streams.len() == 0
+ && last_udp_message_received.elapsed() > on_stop_udp_timeout
+ {
+ is_stopped.store(true, Ordering::Relaxed);
+ break;
+ }
+ let token = &event.token();
+ if token.0 < num_tcp_listeners {
+ println!("!!!!!!!");
+
+ let listener = tcp_listeners.get(token);
+ if listener.is_none() {
+ warn!("Can't find TcpListener for {:?}", token);
+ continue;
+ }
+ let listener = listener.unwrap();
+
+ loop {
+ match listener.accept() {
+ Ok((mut socket, address)) => {
+ if stopping {
+ let _ = socket.shutdown(Shutdown::Both);
+ continue;
+ }
+ let res = socket.register(
+ poll.registry(),
+ Token(i),
+ Interest::READABLE,
+ );
+ if res.is_err() {
+ error!("Could not register TcpStream. {:?}", res.err());
+ }
+ let sock = IoArc::new(socket);
+ streams.insert(i, (sock.clone(), address.clone()));
+ let _ = router.send(AddTcpConnection::new(i, sock, address));
+
+ i += 1;
+ if i < num_total_listeners {
+ i = num_total_listeners;
+ }
+ }
+ Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
+ break;
+ }
+ Err(e) => {
+ error!("Something went wrong with the Listener. {:?}", e);
+ break;
+ }
+ }
+ }
+ } else if token.0 < num_total_listeners {
+ //UDP handling
+ let socket = udp_sockets.get(&token);
+ if socket.is_none() {
+ error!("Something went wrong with the UDP Socket.");
+ }
+ let socket = socket.unwrap();
+
+ let (len, from) = match socket.as_ref().recv_from(&mut buf) {
+ Ok(v) => v,
+
+ Err(e) => {
+ if e.kind() == std::io::ErrorKind::WouldBlock {
+ continue;
+ }
+ panic!("recv() failed: {:?}", e);
+ }
+ };
+ let request = String::from_utf8_lossy(&buf[..len]);
+ let _ = router.send(ReceiveUdpMessage::new(
+ token.0,
+ from,
+ request.into_owned(),
+ ));
+ last_udp_message_received = Instant::now();
+ } else {
+ if event.is_read_closed() || event.is_write_closed() {
+ let _ = streams.remove(&token.0);
+ let _ = router.send(RemoveTcpConnection::new(token.0));
+ } else if event.is_readable() {
+ let stream = streams.get(&token.0);
+ if stream.is_none() {
+ let _ = streams.remove(&token.0);
+ let _ = router.send(RemoveTcpConnection::new(token.0));
+ continue;
+ }
+ let (stream, _address) = stream.unwrap();
+ let buf_reader = BufReader::new(stream.clone());
+ let request: Vec = buf_reader
+ .lines()
+ .map(|result| match result {
+ Ok(res) => {
+ return res;
+ }
+ Err(_err) => {
+ return String::from("");
+ }
+ })
+ .take_while(|line| !line.is_empty())
+ .collect();
+ if !request.is_empty() {
+ let _ = router.send(ReceiveTcpMessage::new(token.0, request));
+ }
+ }
+ }
+ }
+ }
+ });
+ return Ok(ActorResult::Ok);
+ }
+}
diff --git a/src/net/net_messages.rs b/src/net/net_messages.rs
new file mode 100644
index 0000000..9bb4f09
--- /dev/null
+++ b/src/net/net_messages.rs
@@ -0,0 +1,118 @@
+use crate::prelude::ActorMessage;
+use io_arc::IoArc;
+use mio::net::{TcpStream, UdpSocket};
+use serde::Serialize;
+use std::hash::{Hash, Hasher};
+use std::net::SocketAddr;
+
+#[derive(Serialize)]
+pub struct AddTcpConnection {
+ pub stream_id: usize,
+ #[serde(skip)]
+ pub stream: IoArc,
+ pub address: SocketAddr,
+}
+
+impl Hash for AddTcpConnection {
+ fn hash(&self, state: &mut H) {
+ self.stream_id.hash(state);
+ }
+}
+
+impl AddTcpConnection {
+ pub fn new(stream_id: usize, stream: IoArc, address: SocketAddr) -> Self {
+ return Self {
+ stream_id,
+ stream,
+ address,
+ };
+ }
+}
+
+#[derive(Serialize)]
+pub struct RemoveTcpConnection {
+ pub stream_id: usize,
+}
+
+impl Hash for RemoveTcpConnection {
+ fn hash(&self, state: &mut H) {
+ self.stream_id.hash(state);
+ }
+}
+
+impl ActorMessage for AddTcpConnection {}
+
+impl RemoveTcpConnection {
+ pub fn new(stream_id: usize) -> Self {
+ return Self { stream_id };
+ }
+}
+
+impl ActorMessage for RemoveTcpConnection {}
+
+#[derive(Serialize)]
+pub struct ReceiveTcpMessage {
+ pub stream_id: usize,
+ pub request: Vec,
+}
+
+impl Hash for ReceiveTcpMessage {
+ fn hash(&self, state: &mut H) {
+ self.stream_id.hash(state);
+ }
+}
+
+impl ReceiveTcpMessage {
+ pub fn new(stream_id: usize, request: Vec) -> Self {
+ return Self { stream_id, request };
+ }
+}
+
+impl ActorMessage for ReceiveTcpMessage {}
+
+#[derive(Serialize)]
+pub struct AddUdpSocket {
+ pub socket_id: usize,
+ #[serde(skip)]
+ pub socket: IoArc,
+}
+
+impl Hash for AddUdpSocket {
+ fn hash(&self, state: &mut H) {
+ self.socket_id.hash(state);
+ }
+}
+
+impl AddUdpSocket {
+ pub fn new(socket_id: usize, socket: IoArc) -> Self {
+ return Self { socket_id, socket };
+ }
+}
+
+impl ActorMessage for AddUdpSocket {}
+
+#[derive(Serialize)]
+pub struct ReceiveUdpMessage {
+ pub socket_id: usize,
+ #[serde(skip)]
+ pub source: SocketAddr,
+ pub request: String,
+}
+
+impl Hash for ReceiveUdpMessage {
+ fn hash(&self, state: &mut H) {
+ self.socket_id.hash(state);
+ }
+}
+
+impl ReceiveUdpMessage {
+ pub fn new(socket_id: usize, source: SocketAddr, request: String) -> Self {
+ return Self {
+ socket_id,
+ source,
+ request,
+ };
+ }
+}
+
+impl ActorMessage for ReceiveUdpMessage {}
diff --git a/src/net/net_worker.rs b/src/net/net_worker.rs
new file mode 100644
index 0000000..793b7c8
--- /dev/null
+++ b/src/net/net_worker.rs
@@ -0,0 +1,160 @@
+use crate::net::net_messages::{
+ AddTcpConnection, AddUdpSocket, ReceiveTcpMessage, ReceiveUdpMessage, RemoveTcpConnection,
+};
+use crate::prelude::{Actor, ActorContext, ActorFactory, ActorResult, Handler, SerializedMessage};
+use io_arc::IoArc;
+use log::{debug, trace, warn};
+use mio::net::{TcpStream, UdpSocket};
+use std::collections::HashMap;
+use std::error::Error;
+use std::io::Write;
+use std::net::{Shutdown, SocketAddr};
+
+#[derive(Clone)]
+pub struct NetWorker {
+ streams: HashMap, SocketAddr)>,
+ sockets: HashMap>,
+}
+
+impl NetWorker {
+ pub fn new() -> Self {
+ return Self {
+ streams: HashMap::new(),
+ sockets: HashMap::new(),
+ };
+ }
+}
+impl Actor for NetWorker {
+ fn on_system_stop(
+ &mut self,
+ _context: &ActorContext,
+ ) -> Result> {
+ //we intentionally ignore if the actor system is stopped
+ //we only react if the actor is explicitly stopped by the manager, because there might still be open connections that we don't want to drop
+ Ok(ActorResult::Ok)
+ }
+
+ fn handle_serialized_message(&mut self, _msg: SerializedMessage, _context: &ActorContext) -> Result> {
+ //handle all outgoing messages here
+ return Ok(ActorResult::Ok);
+ }
+}
+
+#[derive(Clone)]
+pub struct NetWorkerFactory {}
+impl ActorFactory for NetWorkerFactory {
+ fn new_actor(
+ &mut self,
+ _context: ActorContext,
+ ) -> Result> {
+ return Ok(NetWorker::new());
+ }
+}
+
+impl NetWorkerFactory {
+ pub fn new() -> Self {
+ return Self {};
+ }
+}
+
+impl Handler for NetWorker {
+ fn handle(
+ &mut self,
+ msg: ReceiveTcpMessage,
+ _context: &ActorContext,
+ ) -> Result> {
+ let stream = self.streams.get_mut(&msg.stream_id);
+
+ if stream.is_none() {
+ // temporary implementation for our instant http response, later on we won't have to care here if the stream is active, we'll just forward the message
+ debug!("Stream ID no longer exists, can't reply to request");
+ return Ok(ActorResult::Ok);
+ }
+ let (stream, _) = stream.unwrap();
+ stream.write_all("HTTP/1.1 200 OK\nContent-Type: text/html\nConnection: keep-alive\nContent-Length: 12\r\n\r\nHELLO-WORLD!".as_bytes()).unwrap();
+
+ // temporary implementation for our instant http response
+ // drops the connection if keep-alive has not been specified
+ let mut shutdown_connection = true;
+ for k in msg.request {
+ if k == "Connection: Keep-Alive" {
+ shutdown_connection = false;
+ break;
+ }
+ }
+ if shutdown_connection {
+ let _ = stream.as_ref().shutdown(Shutdown::Both);
+ }
+
+ return Ok(ActorResult::Ok);
+ }
+}
+
+impl Handler for NetWorker {
+ fn handle(
+ &mut self,
+ msg: AddTcpConnection,
+ _context: &ActorContext,
+ ) -> Result> {
+ trace!("Add TCP Connection: {:?}", msg.address);
+ let key_already_exists = self.streams.remove(&msg.stream_id);
+ if key_already_exists.is_some() {
+ warn!("Stream ID already exists, dropping old one in favor of the new connection.");
+ let (stream, _) = key_already_exists.unwrap();
+ let _ = stream.as_ref().shutdown(Shutdown::Both);
+ }
+
+ let _ = self
+ .streams
+ .insert(msg.stream_id, (msg.stream, msg.address));
+ return Ok(ActorResult::Ok);
+ }
+}
+
+impl Handler for NetWorker {
+ fn handle(
+ &mut self,
+ msg: RemoveTcpConnection,
+ _context: &ActorContext,
+ ) -> Result> {
+ trace!("Remove TCP Connection: {:?}", msg.stream_id);
+
+ let _ = self.streams.remove(&msg.stream_id);
+ return Ok(ActorResult::Ok);
+ }
+}
+
+impl Handler for NetWorker {
+ fn handle(
+ &mut self,
+ msg: ReceiveUdpMessage,
+ _context: &ActorContext,
+ ) -> Result> {
+ let socket = self.sockets.get_mut(&msg.socket_id);
+ if socket.is_none() {
+ // temporary implementation for our instant http response, later on we won't have to care here if the stream is active, we'll just forward the message
+ debug!("Socket ID no longer exists, can't reply to request");
+ return Ok(ActorResult::Ok);
+ }
+ let socket = socket.unwrap();
+ let _ = socket.as_ref().send_to(msg.request.as_bytes(), msg.source);
+
+ return Ok(ActorResult::Ok);
+ }
+}
+
+impl Handler for NetWorker {
+ fn handle(
+ &mut self,
+ msg: AddUdpSocket,
+ _context: &ActorContext,
+ ) -> Result> {
+ let key_already_exists = self.sockets.remove(&msg.socket_id);
+ if key_already_exists.is_some() {
+ warn!("Socket ID already exists, dropping old one in favor of the new.");
+ }
+
+ let _ = self.sockets.insert(msg.socket_id, msg.socket);
+ return Ok(ActorResult::Ok);
+ }
+}
diff --git a/src/routers/add_actor_message.rs b/src/routers/add_actor_message.rs
index d42d93d..03285a4 100644
--- a/src/routers/add_actor_message.rs
+++ b/src/routers/add_actor_message.rs
@@ -1,8 +1,15 @@
-use crate::actor::actor_wrapper::ActorWrapper;
+use serde::{Deserialize, Serialize};
+use std::hash::{Hash, Hasher};
+
use crate::message::actor_message::BaseActorMessage;
-use crate::prelude::Actor;
+use crate::prelude::{Actor, ActorWrapper};
/// Adds an Actor to the Router
+#[derive(Serialize, Deserialize)]
+#[serde(bound(
+serialize = "A: Actor",
+deserialize = "A: Actor",
+))]
pub struct AddActorMessage
where
A: Actor,
@@ -15,8 +22,15 @@ where
A: Actor,
{
pub fn new(actor: ActorWrapper) -> Self {
- Self { actor }
+ return Self { actor };
}
}
impl BaseActorMessage for AddActorMessage where A: Actor {}
+
+impl Hash for AddActorMessage
+where
+ A: Actor,
+{
+ fn hash(&self, _state: &mut H) {}
+}
diff --git a/src/routers/bulk_router_message.rs b/src/routers/bulk_router_message.rs
index 6cea1f0..3d534cc 100644
--- a/src/routers/bulk_router_message.rs
+++ b/src/routers/bulk_router_message.rs
@@ -1,6 +1,9 @@
use crate::message::actor_message::BaseActorMessage;
+use serde::Serialize;
+use std::hash::{Hash, Hasher};
/// Wraps multiple [ActorMessage](../prelude/trait.ActorMessage.html) to be sent to a Router
+#[derive(Serialize)]
pub struct BulkRouterMessage