diff --git a/CHANGELOG.md b/CHANGELOG.md index c5d0070..fb6ecf6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,32 @@ +# 1.1.0 + + - upgrade dependencies + - `LeastMessageRouter.min_mailbox_size` is now working correctly + - Routers now come with 2 new configurations to configure behavior + - `stop_on_system_stop` => if false, the user needs to manually stop the router for a clean and quick shutdown of the system + - `stop_on_empty_targets` => automatically stops the router if there are no more targets to receive a message to be routed. This does not apply to manually removed targets + - Routers now automatically remove stopped actors from their target pool if they have been stopped + - `Sharded_Router` now makes use of `HashRing` + - `ActorMessage` now needs to implement `Hash` and no longer need to explicitly implement `ActorMessage.get_id()` to be able to properly make use of the `ShardedRouter` + - renamed `ActorMessage.get_id()` to `ActorMessage.get_hash()` and changed return type to u64 + - changed default implementation to `ActorMessage.get_hash()` to make use of `Hash` implementation + - `ActorMessage` now needs to implement `Serialize` + - this requirement comes from the fact, that all messages need to be serializable in theory to be able to be sent to other actor systems + - if a message is really intended to be sent it should obviously also implement `Deserialize` + - Added `.is_mailbox_stopped()`, `is_stopped()` and `wait_for_stop()` to `ActorWrapper` + - Added ability to re-initialize `ActorWrapper` after deserializing + - Added `general.graceful_timeout_in_seconds` to config + - reworked `system.stop()` to make use of configured value + - added `system.stop_override_graceful_termination_timeout(Duration)` to allow override of configured default + - value is also used for signal handling graceful timeout and timeout for network manager + - Added `ActorBuilder.spawn_multiple()` + - All routers now support a `SendToAllTargetsMessage` that will forward `M` to all active targets + - Users can now use `ActorBuilder.get_existing(address: ActorAddress)` to get an `ActorWrapper` for an already existing `Actor` + - `ActorBuilder.spawn()` will continue to return the `ActorWrapper` for already existing actors + - renamed config `global.name` to `global.hostname` + - also renamed `ActorAddress.remote` to `ActorAddress.hostname` + - added new config `global.name` that defaults to the cargo package name or `tyra` if the application is not built using cargo + # 1.0.0 - added `LeastMessageRouter` diff --git a/Cargo.toml b/Cargo.toml index 29b06ad..7796878 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,17 +18,27 @@ name = "tyra" path = "src/lib.rs" [dependencies] -config = "0.13.4" -hostname = "0.3.1" -num_cpus = "1.16.0" +config = "0.15.18" +hostname = "0.4.1" +num_cpus = "1.17.0" threadpool = "1.8.1" -crossbeam-channel = "0.5.13" -flume = "0.10.14" -dashmap = "5.5.3" -serde = { version = "1.0", features = ["derive"] } -thiserror = "1.0" -log = "0.4" +crossbeam-channel = "0.5.15" +flume = "0.11.1" +dashmap = "6.1.0" +serde = { version = "1.0.228", features = ["derive"] } +serde-encrypt = "0.7.0" +thiserror = "2.0.17" +log = "0.4.28" +ctrlc = { version = "3.5.0", features = ["termination"] } +mio = {version = "1.0.4", features = ["os-poll", "os-ext", "net"]} +quiche = "0.24.6" +io-arc = "1.0.0" +hashring = "0.3.6" +bincode = "1.3.3" +regex = "1.11.3" +trust-dns-resolver = "0.23.2" + [dev-dependencies] -bincode = "1.3.3" -ntest = "0.8.1" \ No newline at end of file +ntest = "0.9.3" +simple_logger = "5.0.0" \ No newline at end of file diff --git a/README.md b/README.md index e6a3c22..54da7a2 100644 --- a/README.md +++ b/README.md @@ -99,7 +99,7 @@ impl ActorFactory for TestActorFactory { impl Handler for TestActor { fn handle(&mut self, _msg: TestMessage, context: &ActorContext) -> Result> { println!("HELLO WORLD!"); - context.system.stop(Duration::from_millis(1000)); + context.system.stop(); Ok(ActorResult::Ok) } } diff --git a/examples/benchmark_bulk_router.rs b/examples/benchmark_bulk_router.rs index dec535e..20c0b9a 100644 --- a/examples/benchmark_bulk_router.rs +++ b/examples/benchmark_bulk_router.rs @@ -1,3 +1,4 @@ +use serde::Serialize; use std::error::Error; use std::process::exit; use std::thread::sleep; @@ -5,14 +6,17 @@ use std::time::{Duration, Instant}; use tyra::prelude::*; use tyra::router::{AddActorMessage, BulkRouterMessage, RoundRobinRouterFactory}; +#[derive(Hash, Serialize)] struct MessageA {} impl ActorMessage for MessageA {} +#[derive(Hash, Serialize)] struct Finish {} impl ActorMessage for Finish {} +#[derive(Hash, Serialize)] struct Start {} impl ActorMessage for Start {} @@ -139,7 +143,7 @@ impl Handler for Aggregator { "{} It took {:?} to finish {} actors", self.name, duration, self.total_actors ); - self.ctx.system.stop(Duration::from_secs(60)); + self.ctx.system.stop(); } Ok(ActorResult::Ok) } @@ -165,7 +169,7 @@ fn main() { // ideal number is "amount of threads - 3" let actor_count = 7; - let router_factory = RoundRobinRouterFactory::new(); + let router_factory = RoundRobinRouterFactory::new(true, true); let router = actor_system .builder() .spawn("benchmark-router", router_factory) diff --git a/examples/benchmark_router_round_robin.rs b/examples/benchmark_router_round_robin.rs index 933b086..2efc787 100644 --- a/examples/benchmark_router_round_robin.rs +++ b/examples/benchmark_router_round_robin.rs @@ -1,17 +1,21 @@ +use serde::Serialize; use std::error::Error; use std::process::exit; -use std::time::{Duration, Instant}; +use std::time::Instant; use tyra::prelude::*; use tyra::router::{AddActorMessage, RoundRobinRouterFactory}; +#[derive(Hash, Serialize)] struct MessageA {} impl ActorMessage for MessageA {} +#[derive(Hash, Serialize)] struct Finish {} impl ActorMessage for Finish {} +#[derive(Hash, Serialize)] struct Start {} impl ActorMessage for Start {} @@ -134,7 +138,7 @@ impl Handler for Aggregator { "{} It took {:?} to finish {} actors", self.name, duration, self.total_actors ); - self.ctx.system.stop(Duration::from_secs(60)); + self.ctx.system.stop(); } Ok(ActorResult::Ok) } @@ -158,7 +162,7 @@ fn main() { let message_count = 10000000; let actor_count = 10; - let router_factory = RoundRobinRouterFactory::new(); + let router_factory = RoundRobinRouterFactory::new(true, true); let router = actor_system .builder() .spawn("benchmark-router", router_factory) diff --git a/examples/benchmark_single_actor.rs b/examples/benchmark_single_actor.rs index 7e81fd4..5d37f9a 100644 --- a/examples/benchmark_single_actor.rs +++ b/examples/benchmark_single_actor.rs @@ -1,8 +1,10 @@ +use serde::Serialize; use std::error::Error; use std::process::exit; -use std::time::{Duration, Instant}; +use std::time::Instant; use tyra::prelude::*; +#[derive(Hash, Serialize)] struct MessageA {} impl ActorMessage for MessageA {} @@ -61,7 +63,7 @@ impl Handler for Benchmark { ); } if self.count == self.total_msgs { - context.system.stop(Duration::from_secs(60)); + context.system.stop(); } Ok(ActorResult::Ok) } diff --git a/examples/benchmark_single_actor_process_after_send.rs b/examples/benchmark_single_actor_process_after_send.rs index fd271ce..5ef51d0 100644 --- a/examples/benchmark_single_actor_process_after_send.rs +++ b/examples/benchmark_single_actor_process_after_send.rs @@ -1,8 +1,10 @@ +use serde::Serialize; use std::error::Error; use std::process::exit; use std::time::{Duration, Instant}; use tyra::prelude::*; +#[derive(Hash, Serialize)] struct MessageA {} impl ActorMessage for MessageA {} @@ -61,7 +63,7 @@ impl Handler for Benchmark { ); } if self.count == self.total_msgs { - context.system.stop(Duration::from_secs(60)); + context.system.stop(); } Ok(ActorResult::Ok) } diff --git a/examples/benchmark_single_actor_process_after_send_single_thread.rs b/examples/benchmark_single_actor_process_after_send_single_thread.rs index fee271b..9de72fd 100644 --- a/examples/benchmark_single_actor_process_after_send_single_thread.rs +++ b/examples/benchmark_single_actor_process_after_send_single_thread.rs @@ -1,8 +1,10 @@ +use serde::Serialize; use std::error::Error; use std::process::exit; use std::time::{Duration, Instant}; use tyra::prelude::*; +#[derive(Hash, Serialize)] struct MessageA {} impl ActorMessage for MessageA {} @@ -61,7 +63,7 @@ impl Handler for Benchmark { ); } if self.count == self.total_msgs { - context.system.stop(Duration::from_secs(60)); + context.system.stop(); } Ok(ActorResult::Ok) } diff --git a/examples/benchmark_single_actor_single_thread.rs b/examples/benchmark_single_actor_single_thread.rs index 3e1142d..6e8fd5e 100644 --- a/examples/benchmark_single_actor_single_thread.rs +++ b/examples/benchmark_single_actor_single_thread.rs @@ -1,8 +1,10 @@ +use serde::Serialize; use std::error::Error; use std::process::exit; -use std::time::{Duration, Instant}; +use std::time::Instant; use tyra::prelude::*; +#[derive(Hash, Serialize)] struct MessageA {} impl ActorMessage for MessageA {} @@ -61,7 +63,7 @@ impl Handler for Benchmark { ); } if self.count == self.total_msgs { - context.system.stop(Duration::from_secs(60)); + context.system.stop(); } Ok(ActorResult::Ok) } diff --git a/examples/net.rs b/examples/net.rs new file mode 100644 index 0000000..d4805b7 --- /dev/null +++ b/examples/net.rs @@ -0,0 +1,20 @@ +use simple_logger::SimpleLogger; +use tyra::prelude::*; + +fn main() { + SimpleLogger::new().init().unwrap(); + + // generate config + let mut actor_config = TyraConfig::new().unwrap(); + actor_config.cluster.enabled = true; + let cluster = ThreadPoolConfig::new(22, 4, 4, 1.00); + actor_config + .thread_pool + .config + .insert(String::from("mio"), cluster); + // start system with config + let actor_system = ActorSystem::new(actor_config); + + + std::process::exit(actor_system.await_shutdown()); +} diff --git a/examples/quickstart.rs b/examples/quickstart.rs index 2e905a5..e499f47 100644 --- a/examples/quickstart.rs +++ b/examples/quickstart.rs @@ -1,8 +1,10 @@ +use serde::Serialize; use std::error::Error; -use std::time::Duration; use tyra::prelude::*; // define an `ActorMessage` that can be sent to `Actors` that implement the corresponding `Handler` + +#[derive(Hash, Serialize)] struct TestMessage {} impl TestMessage { pub fn new() -> Self { @@ -44,7 +46,7 @@ impl Handler for TestActor { context: &ActorContext, ) -> Result> { println!("HELLO WORLD!"); - context.system.stop(Duration::from_millis(1000)); + context.system.stop(); Ok(ActorResult::Ok) } } diff --git a/examples/serialize.rs b/examples/serialize.rs index 09d78f2..da8a1d3 100644 --- a/examples/serialize.rs +++ b/examples/serialize.rs @@ -1,17 +1,17 @@ use serde::{Deserialize, Serialize}; use std::error::Error; use std::process::exit; -use std::time::{Duration, Instant}; +use std::time::Duration; use tyra::prelude::*; -#[derive(Serialize, Deserialize, PartialEq, Debug, Clone)] +#[derive(Serialize, Deserialize, Hash, Clone)] struct TestMsg { content: String, + actor_wrapper: ActorWrapper, } impl ActorMessage for TestMsg {} -#[derive(Clone)] struct RemoteActor {} impl Actor for RemoteActor { @@ -24,10 +24,9 @@ impl Actor for RemoteActor { if result.is_err() { return Ok(ActorResult::Ok); } - let decoded: TestMsg = result.unwrap(); - context - .actor_ref - .send_after(decoded, Duration::from_millis(50))?; + let mut deserialized: TestMsg = result.unwrap(); + deserialized.actor_wrapper.init_after_deserialize(&context.system); + deserialized.actor_wrapper.send_after(deserialized.clone(), Duration::from_millis(50))?; Ok(ActorResult::Ok) } } @@ -36,10 +35,11 @@ impl Handler for RemoteActor { fn handle( &mut self, msg: TestMsg, - _context: &ActorContext, + context: &ActorContext, ) -> Result> { println!("{}", msg.content); - Ok(ActorResult::Ok) + context.system.stop(); + Ok(ActorResult::Stop) } } @@ -59,19 +59,15 @@ fn main() { let actor_system = ActorSystem::new(actor_config); let hw = RemoteActorFactory {}; - let x = actor_system.builder().spawn("hello-world", hw).unwrap(); + let remote_actor = actor_system.builder().spawn("hello-world", hw).unwrap(); let msg = TestMsg { content: String::from("Hello World!"), + actor_wrapper: remote_actor.clone(), }; let serialized = bincode::serialize(&msg).unwrap(); - actor_system.send_to_address(x.get_address(), SerializedMessage::new(serialized)); - let start = Instant::now(); + actor_system.send_to_address(remote_actor.get_address(), serialized); - actor_system.stop(Duration::from_secs(10)); let result = actor_system.await_shutdown(); - let duration = start.elapsed(); - println!("It took {:?} to send stop", duration); - exit(result); } diff --git a/src/actor/actor.rs b/src/actor/actor.rs index e857d0f..ad55544 100644 --- a/src/actor/actor.rs +++ b/src/actor/actor.rs @@ -1,4 +1,3 @@ -use crate::message::actor_stop_message::ActorStopMessage; use crate::prelude::{ActorContext, ActorPanicSource, ActorResult, SerializedMessage}; use log::error; use std::error::Error; @@ -92,7 +91,7 @@ use std::panic::UnwindSafe; /// └─◄─┴──────────────────────────────◄─┴──────────────────────◄─┴─────────────────────────────────────────────────────┘ └─────────────────────────────────────────────┘ /// /// ``` -pub trait Actor: Send + Sync + UnwindSafe + Sized { +pub trait Actor: Send + Sync + UnwindSafe + Sized + 'static { /// executed whenever Actor receives a [SerializedMessage](../prelude/struct.SerializedMessage.html) /// panic triggers `self.on_panic()` with `source = ActorPanicSource::Message` fn handle_serialized_message( @@ -124,7 +123,7 @@ pub trait Actor: Send + Sync + UnwindSafe + Sized { /// } /// impl Actor for TestActor { /// fn on_panic(&mut self, context: &ActorContext, source: ActorPanicSource) -> Result> { - /// context.system.stop(Duration::from_millis(5000)); + /// context.system.stop(); /// return Ok(ActorResult::Kill); /// } /// } @@ -189,7 +188,7 @@ pub trait Actor: Send + Sync + UnwindSafe + Sized { /// } /// impl Actor for TestActor { /// fn on_error(&mut self, context: &ActorContext, err: Box) -> ActorResult { - /// context.system.stop(Duration::from_millis(5000)); + /// context.system.stop(); /// return ActorResult::Kill; /// } /// } @@ -247,7 +246,7 @@ pub trait Actor: Send + Sync + UnwindSafe + Sized { /// } /// impl Actor for TestActor { /// fn pre_start(&mut self, context: &ActorContext) -> Result> { - /// context.system.stop(Duration::from_millis(5000)); + /// context.system.stop(); /// return Ok(ActorResult::Kill); /// } /// } @@ -302,7 +301,7 @@ pub trait Actor: Send + Sync + UnwindSafe + Sized { /// } /// impl Actor for TestActor { /// fn pre_restart(&mut self, context: &ActorContext) { - /// context.system.stop(Duration::from_millis(5000)); + /// context.system.stop(); /// } /// } /// @@ -335,6 +334,56 @@ pub trait Actor: Send + Sync + UnwindSafe + Sized { /// ``` fn pre_restart(&mut self, _context: &ActorContext) {} + /// executed before mailbox will be disabled + /// + /// # Examples + /// + /// ```rust + /// use tyra::prelude::*; + /// use std::error::Error; + /// use std::time::Duration; + /// + /// struct TestActor {} + /// impl TestActor { + /// pub fn new() -> Self { + /// Self {} + /// } + /// } + /// impl Actor for TestActor { + /// fn pre_stop(&mut self, context: &ActorContext) { + /// context.system.stop(); + /// } + /// } + /// + /// struct TestActorFactory {} + /// impl TestActorFactory { + /// pub fn new() -> Self { + /// Self {} + /// } + /// } + /// impl ActorFactory for TestActorFactory { + /// fn new_actor(&mut self, _context: ActorContext) -> Result> { + /// Ok(TestActor::new()) + /// } + /// } + /// + /// impl Handler for TestActor { + /// fn handle(&mut self, _msg: ActorInitMessage, context: &ActorContext) -> Result> { + /// return Ok(ActorResult::Stop); + /// } + /// } + /// + /// #[ntest::timeout(10000)] + /// fn main() { + /// let actor_config = TyraConfig::new().unwrap(); + /// let actor_system = ActorSystem::new(actor_config); + /// let actor = actor_system.builder().spawn("test", TestActorFactory::new()).unwrap(); + /// actor.send(ActorInitMessage::new()).unwrap(); + /// std::process::exit(actor_system.await_shutdown()); + /// } + /// ``` + fn pre_stop(&mut self, _context: &ActorContext) {} + /// executed after the last message is handled /// /// # Examples @@ -352,7 +401,7 @@ pub trait Actor: Send + Sync + UnwindSafe + Sized { /// } /// impl Actor for TestActor { /// fn post_stop(&mut self, context: &ActorContext) { - /// context.system.stop(Duration::from_millis(5000)); + /// context.system.stop(); /// } /// } /// @@ -424,7 +473,7 @@ pub trait Actor: Send + Sync + UnwindSafe + Sized { /// /// impl Handler for TestActor { /// fn handle(&mut self, _msg: ActorInitMessage, context: &ActorContext) -> Result> { - /// context.system.stop(Duration::from_millis(5000)); + /// context.system.stop(); /// return Ok(ActorResult::Ok); /// } /// } @@ -443,12 +492,13 @@ pub trait Actor: Send + Sync + UnwindSafe + Sized { &mut self, context: &ActorContext, ) -> Result> { - let result = context.actor_ref.send(ActorStopMessage::new()); + let result = context.actor_ref.stop(); if result.is_err() { error!( "Could not forward message ActorStopMessage to target {}", context.actor_ref.get_address().actor ); + return Ok(ActorResult::Stop); } return Ok(ActorResult::Ok); } diff --git a/src/actor/actor_address.rs b/src/actor/actor_address.rs index 1c07888..b8defef 100644 --- a/src/actor/actor_address.rs +++ b/src/actor/actor_address.rs @@ -1,7 +1,25 @@ -#[derive(PartialEq, Eq, PartialOrd, Ord, Hash, Clone)] +use serde::{Deserialize, Serialize}; + +#[derive(PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize)] pub struct ActorAddress { - pub remote: String, + pub hostname: String, pub system: String, pub pool: String, pub actor: String, } + +impl ActorAddress { + pub fn new( + hostname: impl Into, + system: impl Into, + pool: impl Into, + actor: impl Into, + ) -> Self { + return Self { + hostname: hostname.into(), + system: system.into(), + pool: pool.into(), + actor: actor.into(), + }; + } +} diff --git a/src/actor/actor_builder.rs b/src/actor/actor_builder.rs index 1a4987f..0efd8df 100644 --- a/src/actor/actor_builder.rs +++ b/src/actor/actor_builder.rs @@ -1,11 +1,10 @@ use crate::actor::actor_address::ActorAddress; use crate::actor::actor_config::ActorConfig; use crate::actor::actor_factory::ActorFactory; -use crate::actor::actor_wrapper::ActorWrapper; use crate::actor::executor::{Executor, ExecutorTrait}; use crate::actor::mailbox::Mailbox; use crate::config::tyra_config::DEFAULT_POOL; -use crate::prelude::{Actor, Handler, SerializedMessage}; +use crate::prelude::{Actor, ActorWrapper, Handler, SerializedMessage}; use crate::system::actor_error::ActorError; use crate::system::actor_system::ActorSystem; use crate::system::internal_actor_manager::InternalActorManager; @@ -118,34 +117,40 @@ where /// } /// } /// - /// struct SecondActor {} - /// impl SecondActor { + /// struct BrokenActor {} + /// impl BrokenActor { /// pub fn new() -> Self { /// Self {} /// } /// } - /// impl Actor for SecondActor {} + /// impl Actor for BrokenActor {} /// - /// struct SecondActorFactory {} - /// impl SecondActorFactory { + /// struct BrokenActorFactory {} + /// impl BrokenActorFactory { /// pub fn new() -> Self { /// Self {} /// } /// } - /// impl ActorFactory for SecondActorFactory { - /// fn new_actor(&mut self, _context: ActorContext) -> Result> { + /// impl ActorFactory for BrokenActorFactory { + /// fn new_actor(&mut self, _context: ActorContext) -> Result> { /// let error = std::io::Error::from_raw_os_error(1337); /// return Err(Box::new(error)); /// } /// } /// - /// #[ntest::timeout(10000)] + /// #[ntest::timeout(100000)] /// fn main() { /// let mut actor_config = TyraConfig::new().unwrap(); /// actor_config.thread_pool.config.insert(String::from("default"), ThreadPoolConfig::new(1, 1, 1, 1.0)); /// let actor_system = ActorSystem::new(actor_config); - /// let actor_name = "test"; /// + /// //this does not work, because although there's not yet an actor called `broken` on the pool the `new_actor` method returns an error + /// let this_is_not_working = actor_system.builder().spawn("broken", BrokenActorFactory::new()); + /// assert!(this_is_not_working.is_err(), "The BrokenActor was spawned"); + /// let err = this_is_not_working.err().unwrap(); + /// assert_eq!(err, ActorError::InitError, "Error is not correct"); + /// + /// let actor_name = "test"; /// //this works, because there's no actor called `test` yet on the pool /// let this_works = actor_system.builder().spawn(actor_name, TestActorFactory::new()); /// assert!(this_works.is_ok(), "The actor could not be spawned"); @@ -160,25 +165,19 @@ where /// let err = pool_full.err().unwrap(); /// assert_eq!(err, ActorError::ThreadPoolHasTooManyActorsError, "Error is not correct"); /// - /// //this does not work, because the pool does not exist in the configuration + /// ////this does not work, because the pool does not exist in the configuration /// let invalid_pool = actor_system.builder().set_pool_name("invalid").spawn(actor_name, TestActorFactory::new()); /// assert!(invalid_pool.is_err(), "The Actor was spawned"); /// let err = invalid_pool.err().unwrap(); /// assert_eq!(err, ActorError::ThreadPoolDoesNotExistError, "Error is not correct"); /// - /// //this does not work, because although there's not yet an actor called `second` on the pool the `new_actor` method returns an error - /// let this_is_not_working = actor_system.builder().spawn("second", SecondActorFactory::new()); - /// assert!(this_is_not_working.is_err(), "The SecondActor was spawned"); - /// let err = this_is_not_working.err().unwrap(); - /// assert_eq!(err, ActorError::InitError, "Error is not correct"); - /// - /// //this does not work, because there's already an actor called `test` with a different type on the pool - /// let this_is_not_working_either = actor_system.builder().spawn(actor_name, SecondActorFactory::new()); + /// ////this does not work, because there's already an actor called `test` with a different type on the pool + /// let this_is_not_working_either = actor_system.builder().spawn(actor_name, BrokenActorFactory::new()); /// assert!(this_is_not_working_either.is_err(), "Illegal Actor type conversion"); /// let err = this_is_not_working_either.err().unwrap(); /// assert_eq!(err, ActorError::InvalidActorTypeError, "Error is not correct"); /// - /// actor_system.stop(Duration::from_millis(1000)); + /// actor_system.stop(); /// std::process::exit(actor_system.await_shutdown()); /// } /// ``` @@ -186,17 +185,22 @@ where where P: ActorFactory + 'static, { - let actor_address = ActorAddress { - actor: name.into(), - system: String::from(self.system.get_name()), - pool: self.actor_config.pool_name.clone(), - remote: String::from("local"), - }; + let actor_address = ActorAddress::new( + self.system.get_hostname(), + self.system.get_name(), + self.actor_config.pool_name.clone(), + name, + ); if self.system_state.is_mailbox_active(&actor_address) { return self .system_state - .get_actor_ref(actor_address, self.internal_actor_manager.clone()); + .get_actor_ref(&actor_address, self.internal_actor_manager.clone()); + } + + let result = self.system_state.increase_pool_actor_count(&actor_address); + if result.is_err() { + return Err(result.unwrap_err()); } let (sender, receiver) = if self.actor_config.mailbox_size == 0 { @@ -216,6 +220,7 @@ where actor_address.clone(), self.wakeup_manager.clone(), self.internal_actor_manager.clone(), + self.system_state.clone(), ); let actor_handler = Executor::new( @@ -230,23 +235,266 @@ where match actor_handler { Ok(a) => { - let result = self - .system_state + self.system_state .add_mailbox(actor_address.clone(), mailbox); - - if result.is_err() { - return Err(result.unwrap_err()); - } - self.wakeup_manager .add_inactive_actor(a.get_address(), Arc::new(RwLock::new(a))); - self.existing.insert(actor_address, actor_ref.clone()); return Ok(actor_ref); } Err(e) => { + self.system_state.decrease_pool_actor_count(&actor_address); return Err(e); } } } + + /// Gets the defined [Actor] for the [ActorAddress] on the [ActorSystem] + /// + /// # Returns + /// + /// `Ok(ActorWrapper)` if actor already exists on the system + /// + /// `Err(ActorError)` see [ActorError](../prelude/enum.ActorError.html) for detailed information + /// + /// # Examples + /// + /// ```rust + /// use tyra::prelude::*; + /// use std::error::Error; + /// use std::time::Duration; + /// + /// struct TestActor {} + /// impl TestActor { + /// pub fn new() -> Self { + /// Self {} + /// } + /// } + /// impl Actor for TestActor {} + /// + /// struct TestActorFactory {} + /// impl TestActorFactory { + /// pub fn new() -> Self { + /// Self {} + /// } + /// } + /// impl ActorFactory for TestActorFactory { + /// fn new_actor(&mut self, _context: ActorContext) -> Result> { + /// Ok(TestActor::new()) + /// } + /// } + /// + /// struct BrokenActor {} + /// impl BrokenActor { + /// pub fn new() -> Self { + /// Self {} + /// } + /// } + /// impl Actor for BrokenActor {} + /// + /// struct BrokenActorFactory {} + /// impl BrokenActorFactory { + /// pub fn new() -> Self { + /// Self {} + /// } + /// } + /// impl ActorFactory for BrokenActorFactory { + /// fn new_actor(&mut self, _context: ActorContext) -> Result> { + /// let error = std::io::Error::from_raw_os_error(1337); + /// return Err(Box::new(error)); + /// } + /// } + /// + /// #[ntest::timeout(100000)] + /// fn main() { + /// let mut actor_config = TyraConfig::new().unwrap(); + /// let actor_system = ActorSystem::new(actor_config); + /// + /// let pool_name = "default"; + /// let actor_name = "test"; + /// let address = ActorAddress::new(actor_system.get_hostname(), actor_system.get_name(), pool_name, actor_name); + /// + /// //this does not work, because the actor does not exist yet + /// let this_is_not_working :Result, ActorError> = actor_system.builder().get_existing(&address); + /// assert!(this_is_not_working.is_err(), "The BrokenActor existed"); + /// let err = this_is_not_working.err().unwrap(); + /// assert_eq!(err, ActorError::DoesNotExistError, "Error is not correct"); + /// + /// //this works, because there's no actor called `test` yet on the pool + /// let this_works = actor_system.builder().set_pool_name(pool_name).spawn(actor_name, TestActorFactory::new()); + /// assert!(this_works.is_ok(), "The actor could not be spawned"); + /// + /// //this does not work, because the actor type does not match + /// let this_is_not_working :Result, ActorError> = actor_system.builder().get_existing(&address); + /// assert!(this_is_not_working.is_err(), "The BrokenActor existed"); + /// let err = this_is_not_working.err().unwrap(); + /// assert_eq!(err, ActorError::InvalidActorTypeError, "Error is not correct"); + /// + /// //this does work, because the actor type matches + /// let this_works :Result, ActorError> = actor_system.builder().get_existing(&address); + /// assert!(this_works.is_ok(), "The TestActor did not exist"); + /// + /// actor_system.stop(); + /// std::process::exit(actor_system.await_shutdown()); + /// } + /// ``` + pub fn get_existing(&self, actor_address: &ActorAddress) -> Result, ActorError> { + if self.system_state.is_mailbox_active(actor_address) { + return self + .system_state + .get_actor_ref(actor_address, self.internal_actor_manager.clone()); + } + return Err(ActorError::DoesNotExistError); + } + + /// Always returns a ActorWrapper, even if it does not exist on the current ActorSystem + /// + /// # Returns + /// + /// `ActorWrapper` + /// + /// # Examples + /// + /// ```rust + /// use tyra::prelude::*; + /// use std::error::Error; + /// use std::time::Duration; + /// + /// struct TestActor {} + /// impl TestActor { + /// pub fn new() -> Self { + /// Self {} + /// } + /// } + /// impl Actor for TestActor {} + /// + /// #[derive(Clone)] + /// struct TestActorFactory {} + /// impl TestActorFactory { + /// pub fn new() -> Self { + /// Self {} + /// } + /// } + /// impl ActorFactory for TestActorFactory { + /// fn new_actor(&mut self, _context: ActorContext) -> Result> { + /// Ok(TestActor::new()) + /// } + /// } + /// + /// #[ntest::timeout(100000)] + /// fn main() { + /// let mut actor_config = TyraConfig::new().unwrap(); + /// actor_config.thread_pool.config.insert(String::from("default"), ThreadPoolConfig::new(2, 1, 1, 1.0)); + /// let actor_system = ActorSystem::new(actor_config); + /// + /// let address = ActorAddress::new("remote", "system", "pool", "actor"); + /// let actor_wrapper :ActorWrapper = actor_system.builder().init_after_deserialize(&address); + /// + /// actor_system.stop(); + /// std::process::exit(actor_system.await_shutdown()); + /// } + /// ``` + pub fn init_after_deserialize(&self, actor_address: &ActorAddress) -> ActorWrapper { + let result = self.get_existing(actor_address); + if result.is_ok() { + let result = result.unwrap(); + return result; + } + return ActorWrapper::from_address(actor_address.clone(), self.system_state.clone()); + } + + /// Creates N defined [Actor]s on the [ActorSystem] + /// + /// Requires [ActorFactory] to implement `Clone` + /// + /// # Returns + /// + /// `Ok(Vec>)` if actors were created successfully + /// + /// `Ok(Vec>)` if the actors are already running on the system + /// + /// `Err(ActorError)` see [ActorError](../prelude/enum.ActorError.html) for detailed information. Will also stop any actors that might already have been started + /// + /// + /// # Examples + /// + /// ```rust + /// use tyra::prelude::*; + /// use std::error::Error; + /// use std::time::Duration; + /// + /// struct TestActor {} + /// impl TestActor { + /// pub fn new() -> Self { + /// Self {} + /// } + /// } + /// impl Actor for TestActor {} + /// + /// #[derive(Clone)] + /// struct TestActorFactory {} + /// impl TestActorFactory { + /// pub fn new() -> Self { + /// Self {} + /// } + /// } + /// impl ActorFactory for TestActorFactory { + /// fn new_actor(&mut self, _context: ActorContext) -> Result> { + /// Ok(TestActor::new()) + /// } + /// } + /// + /// #[ntest::timeout(100000)] + /// fn main() { + /// let mut actor_config = TyraConfig::new().unwrap(); + /// actor_config.thread_pool.config.insert(String::from("default"), ThreadPoolConfig::new(2, 1, 1, 1.0)); + /// let actor_system = ActorSystem::new(actor_config); + /// + /// let actor_name = "test"; + /// //this works, because there's no actor called `test` yet on the pool + /// let this_works = actor_system.builder().spawn_multiple(actor_name, TestActorFactory::new(), 2); + /// assert!(this_works.is_ok(), "The actors could not be spawned"); + /// + /// //this works, because there's already an actor called `test` with type `TestActor` on the pool, therefore the result is the same actor that was created in the previous spawn command + /// let this_works_as_well = actor_system.builder().spawn_multiple(actor_name, TestActorFactory::new(), 2); + /// assert!(this_works_as_well.is_ok(), "The `ActorWrapper` could not be fetched"); + /// + /// //this does not work, because the pool is currently configured to only allow two actors + /// let pool_full = actor_system.builder().spawn_multiple("full", TestActorFactory::new(), 10); + /// assert!(pool_full.is_err(), "The actor could not be spawned"); + /// let err = pool_full.err().unwrap(); + /// assert_eq!(err, ActorError::ThreadPoolHasTooManyActorsError, "Error is not correct"); + /// + /// actor_system.stop(); + /// std::process::exit(actor_system.await_shutdown()); + /// } + /// ``` + pub fn spawn_multiple

( + &self, + name: impl Into + Clone + std::fmt::Display, + props: P, + spawn_count: usize, + ) -> Result>, ActorError> + where + P: ActorFactory + 'static + Clone, + { + let mut to_return = Vec::new(); + for i in 0..spawn_count { + let name = format!("{}-{}", name.clone(), i); + let res = self.spawn(name, props.clone()); + match res { + Ok(res) => { + to_return.push(res); + } + Err(e) => { + for actor in &to_return { + let _ = actor.stop(); + } + return Err(e); + } + } + } + + return Ok(to_return); + } } diff --git a/src/actor/actor_send_error.rs b/src/actor/actor_send_error.rs index d4627bf..7c44c9e 100644 --- a/src/actor/actor_send_error.rs +++ b/src/actor/actor_send_error.rs @@ -9,4 +9,8 @@ pub enum ActorSendError { /// Triggered by [ActorWrapper.send](../prelude/struct.ActorWrapper.html#method.send) && [ActorWrapper.send_timeout](../prelude/struct.ActorWrapper.html#method.send_timout) when a message is sent to a stopped Actor #[error("Message could not be delivered")] AlreadyStoppedError, + + /// Triggered by [ActorWrapper](../prelude/struct.ActorWrapper.html) if a message can't be send to remote Actors + #[error("Message can't be delivered to remote Actor")] + NotAllowedForRemoteActorError } diff --git a/src/actor/context.rs b/src/actor/context.rs index 6d8e8b7..a021c55 100644 --- a/src/actor/context.rs +++ b/src/actor/context.rs @@ -1,5 +1,4 @@ -use crate::actor::actor_wrapper::ActorWrapper; -use crate::prelude::Actor; +use crate::prelude::{Actor, ActorWrapper}; use crate::system::actor_system::ActorSystem; use std::panic::UnwindSafe; diff --git a/src/actor/executor.rs b/src/actor/executor.rs index c785dcd..5505291 100644 --- a/src/actor/executor.rs +++ b/src/actor/executor.rs @@ -2,14 +2,14 @@ use crate::actor::actor_address::ActorAddress; use crate::actor::actor_config::ActorConfig; use crate::actor::actor_factory::ActorFactory; use crate::actor::actor_state::ActorState; -use crate::actor::actor_wrapper::ActorWrapper; + use crate::actor::context::ActorContext; use crate::actor::handler::Handler; use crate::actor::mailbox::Mailbox; use crate::message::actor_message::BaseActorMessage; use crate::message::envelope::{MessageEnvelope, MessageEnvelopeTrait}; use crate::message::system_stop_message::SystemStopMessage; -use crate::prelude::{Actor, ActorPanicSource, ActorResult}; +use crate::prelude::{Actor, ActorPanicSource, ActorResult, ActorWrapper}; use crate::system::actor_error::ActorError; use crate::system::actor_system::ActorSystem; use log::debug; @@ -113,6 +113,7 @@ where } fn stop_actor(&mut self, immediately: bool) -> ActorState { + let _ = catch_unwind(AssertUnwindSafe(|| self.actor.pre_stop(&self.context))); self.mailbox.is_stopped.store(true, Ordering::Relaxed); if immediately { let _ = catch_unwind(AssertUnwindSafe(|| self.actor.post_stop(&self.context))); diff --git a/src/actor/handler.rs b/src/actor/handler.rs index b0b32df..35ac12b 100644 --- a/src/actor/handler.rs +++ b/src/actor/handler.rs @@ -15,11 +15,13 @@ use std::error::Error; /// /// ```rust /// use std::error::Error; +/// use serde::Serialize; /// use tyra::prelude::{TyraConfig, ActorSystem, ActorFactory, ActorContext, SerializedMessage, Handler, Actor, ActorResult, ActorMessage}; /// /// struct TestActor {} /// impl Actor for TestActor {} /// +/// #[derive(Hash, Serialize)] /// struct FooBar {} /// impl ActorMessage for FooBar {} /// diff --git a/src/actor/mod.rs b/src/actor/mod.rs index 71c518c..4b743ee 100644 --- a/src/actor/mod.rs +++ b/src/actor/mod.rs @@ -7,7 +7,6 @@ pub mod actor_panic_source; pub mod actor_result; pub mod actor_send_error; pub mod actor_state; -pub mod actor_wrapper; pub mod context; pub mod executor; pub mod handler; @@ -15,12 +14,12 @@ pub mod mailbox; pub mod prelude { pub use crate::actor::actor::Actor; + pub use crate::actor::actor_address::ActorAddress; pub use crate::actor::actor_builder::ActorBuilder; pub use crate::actor::actor_factory::ActorFactory; pub use crate::actor::actor_panic_source::ActorPanicSource; pub use crate::actor::actor_result::ActorResult; pub use crate::actor::actor_send_error::ActorSendError; - pub use crate::actor::actor_wrapper::ActorWrapper; pub use crate::actor::context::ActorContext; pub use crate::actor::handler::Handler; } diff --git a/src/config/cluster_config.rs b/src/config/cluster_config.rs new file mode 100644 index 0000000..7964f88 --- /dev/null +++ b/src/config/cluster_config.rs @@ -0,0 +1,8 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct ClusterConfig { + pub enabled: bool, + pub hosts: Vec, + pub members: Vec, +} diff --git a/src/config/default.toml b/src/config/default.toml index c04de50..dc09940 100644 --- a/src/config/default.toml +++ b/src/config/default.toml @@ -2,8 +2,11 @@ # general actor settings [general] # name of the actor system +# magic default: name of the cargo package; defaults to "tyra" if not built with cargo +name = "$CARGO_PKG_NAME" +# hostname of the actor system # magic default: system hostname -name = "$HOSTNAME" +hostname = "$HOSTNAME" # default mailbox size for every actor if no explicit size is set # 0 is treated as unlimited default_mailbox_size = 0 @@ -11,6 +14,11 @@ default_mailbox_size = 0 default_message_throughput = 15 # defines if the rust panic hook should be overwritten by the actor system on startup override_panic_hook = true +# will start a graceful shutdown on first SIGINT, SIGTERM or SIGHUP, or force a shutdown if multiple signals are received. +enable_signal_handling = true +# maximum graceful shutdown time before system is hard killed 0 disables graceful shutdown +# waning: setting to 0 can result in endlessly running applications +graceful_timeout_in_seconds = 300 # default pool settings [thread_pool.config.default] @@ -34,4 +42,75 @@ threads_min = 2 # maximum amount of threads to spawn for this pool threads_max = 3 # num_cpu * factor = amount of threads to spawn for this pool -threads_factor = 1 \ No newline at end of file +threads_factor = 1 + +# cluster pool settings +# only enabled if `cluster.enabled` is set to `true` +[thread_pool.config.cluster] +# amount of actors that this thread_pool can handle +# 0 is treated as unlimited +actor_limit = 5 +# minimum amount of threads to spawn for this pool +threads_min = 2 +# maximum amount of threads to spawn for this pool +threads_max = 5 +# num_cpu * factor = amount of threads to spawn for this pool +threads_factor = 1 + +# cluster configuration +[cluster] +# defines if cluster functionality should be enabled +enabled = false +## addresses and port used by the cluster. +## if port is omitted, the default of 2022 is used +## if port is set to 0, a port will automatically be assigned +hosts = ["tcp://0.0.0.0:2022", "udp://0.0.0.0:2023"] +## list of cluster members, entries can include IPs and DNS records, multi A records are supported as well +## if the same node is listed multiple times with different ip addresses, only the first working occurence will be used as a connection +## if port is omitted, the default of 2022 is used +members = [ + "tcp://registry.git.sers.dev", + "tcp://git.sers.dev", + "tcp://abc.sers.dev:1234", +] + +##WIP: How cluster config could look like +##### +# +# +## default cluster settings +#[cluster] + +#hosts = ["tcp://127.0.0.1:2022", "tcp://192.168.0.1:2022", "udp://0.0.0.0:2023"] +# +## cluster groups the node should be part of. Valid values are "server", "client" and "proxy" +## - server: can be elected as master node and will open specified ports in hosts +## - client: can't be elected as master node and will ignore cluster.hosts config +## - proxy: will forward traffic between two nodes that can't directly communicate with each other +#groups = ["server", "client", "proxy"] +# +## list of cluster members, entries can include IPs and DNS records, multi A records are supported as well +## if the same node is listed multiple times with different ip addresses, only the first working occurence will be used as a connection +## if port is omitted, the default of 2022 is used +#members = [ +# "tcp://jkasfhjklahsjkghjklalkj.ahjasdjkhaskhjg.asdf:1234", +# "tcp://registry.git.sers.dev", +# "tcp://git.sers.dev", +# "tcp://abc.sers.dev:1234", +#] +## list of cidrs that are allowed to connect +#trusted_cidrs = [ +# "78.47.25.243/32", +# "78.47.42.31/32", +#] +# +## name of the pre shared key that should be used to encrypt outgoing traffic +#active_psk = "default" +# +## list of encryption keys that are used in the cluster +## new connections will go through trial and error to find which key is used by which server +#[cluster.pre_shared_keys] +#default = "sers" +#additional = "hello-world" +# +# \ No newline at end of file diff --git a/src/config/global_config.rs b/src/config/global_config.rs index 403ab0d..f3eacad 100644 --- a/src/config/global_config.rs +++ b/src/config/global_config.rs @@ -3,7 +3,10 @@ use serde::{Deserialize, Serialize}; #[derive(Debug, Serialize, Deserialize, Clone)] pub struct GeneralConfig { pub name: String, + pub hostname: String, pub default_mailbox_size: usize, pub default_message_throughput: usize, pub override_panic_hook: bool, + pub enable_signal_handling: bool, + pub graceful_timeout_in_seconds: u64, } diff --git a/src/config/mod.rs b/src/config/mod.rs index d1b8c07..8429384 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -1,8 +1,11 @@ pub mod global_config; pub mod pool_config; pub mod tyra_config; +pub mod cluster_config; pub mod prelude { + pub use crate::config::global_config::GeneralConfig; pub use crate::config::pool_config::ThreadPoolConfig; pub use crate::config::tyra_config::TyraConfig; + pub use crate::config::cluster_config::ClusterConfig; } diff --git a/src/config/tyra_config.rs b/src/config/tyra_config.rs index cf34275..36e25ec 100644 --- a/src/config/tyra_config.rs +++ b/src/config/tyra_config.rs @@ -4,6 +4,7 @@ use std::path::Path; use config::{Config, ConfigError, File, FileFormat}; use serde::{Deserialize, Serialize}; +use crate::prelude::ClusterConfig; pub const DEFAULT_POOL: &str = "default"; @@ -12,6 +13,7 @@ pub const DEFAULT_POOL: &str = "default"; pub struct TyraConfig { pub general: GeneralConfig, pub thread_pool: PoolConfig, + pub cluster: ClusterConfig, } impl TyraConfig { @@ -46,8 +48,11 @@ impl TyraConfig { let conf = config.build().expect("Could not fetch Config"); let mut parsed: TyraConfig = conf.try_deserialize().expect("Could not parse Config"); - if parsed.general.name == "$HOSTNAME" { - parsed.general.name = String::from(hostname::get().unwrap().to_str().unwrap()); + if parsed.general.hostname == "$HOSTNAME" { + parsed.general.hostname = String::from(hostname::get().unwrap().to_str().unwrap()); + } + if parsed.general.name == "$CARGO_PKG_NAME" { + parsed.general.name = option_env!("CARGO_PKG_NAME").unwrap_or("tyra").into(); } Ok(parsed) diff --git a/src/lib.rs b/src/lib.rs index face0c6..85a8f61 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -15,8 +15,10 @@ //! use std::process::exit; //! use std::time::Duration; //! use std::error::Error; +//! use serde::Serialize; //! //! // define message +//! #[derive(Hash, Serialize)] //! struct FooBar {} //! impl ActorMessage for FooBar {} //! @@ -25,6 +27,7 @@ //! impl Actor for HelloWorld {} //! //! // setup required Factory +//! #[derive(Clone)] //! struct HelloWorldFactory {} //! impl ActorFactory for HelloWorldFactory { //! fn new_actor(&mut self, _context: ActorContext) -> Result> { @@ -56,7 +59,7 @@ //! //! // cleanup //! actor.stop().unwrap(); -//! actor_system.stop(Duration::from_millis(5000)); +//! actor_system.stop(); //! exit(actor_system.await_shutdown()); //! } //! ``` @@ -107,15 +110,19 @@ mod actor; mod config; mod message; +mod net; mod routers; mod system; +mod wrapper; /// core components pub mod prelude { pub use crate::actor::prelude::*; pub use crate::config::prelude::*; pub use crate::message::prelude::*; + pub use crate::net::prelude::*; pub use crate::system::prelude::*; + pub use crate::wrapper::prelude::*; } /// collection of different router implementations diff --git a/src/message/actor_init_message.rs b/src/message/actor_init_message.rs index 161cf45..ff1ec83 100644 --- a/src/message/actor_init_message.rs +++ b/src/message/actor_init_message.rs @@ -1,6 +1,8 @@ use crate::prelude::ActorMessage; +use serde::{Deserialize, Serialize}; /// Can be implemented by an Actor through Handler to be used to init an Actor +#[derive(Hash, Serialize, Deserialize)] pub struct ActorInitMessage {} impl ActorInitMessage { diff --git a/src/message/actor_message.rs b/src/message/actor_message.rs index c734aba..44ddcc7 100644 --- a/src/message/actor_message.rs +++ b/src/message/actor_message.rs @@ -1,18 +1,18 @@ +use serde::Serialize; +use std::collections::hash_map::DefaultHasher; +use std::hash::{Hash, Hasher}; + /// This trait is used internally by the `ActorSystem` and builds the base for all messaging /// It's automatically implemented by the `ActorMessage` trait that should be used /// /// It is used by Messages defined in the system /// All messages that use this trait directly should also implement a dynamic `Handler` that applies to any `Actor` -pub trait BaseActorMessage: Send + Sync {} +pub trait BaseActorMessage: Send + Sync + Hash + Serialize {} + /// This trait is used by Messages defined by the system /// All messages that use this trait should also implement a dynamic `Handler` that applies to any `Actor` -pub trait DefaultActorMessage: Send + Sync { - /// returns the message id - fn get_id(&self) -> usize { - return 0; - } -} +pub trait DefaultActorMessage: Send + Sync + Hash + Serialize {} impl BaseActorMessage for A where A: DefaultActorMessage {} @@ -23,17 +23,21 @@ impl BaseActorMessage for A where A: DefaultActorMessage {} /// Basic usage: /// /// ```rust +/// use serde::Serialize; /// use tyra::prelude::ActorMessage; /// +/// #[derive(Hash, Serialize)] /// struct FooBar {} /// impl ActorMessage for FooBar {} /// ``` -pub trait ActorMessage: Send + Sync { - /// returns the message id - fn get_id(&self) -> usize { - return 0; +pub trait ActorMessage: Send + Sync + Hash + Serialize { + /// returns the message hash + fn get_hash(&self) -> u64 { + let mut hasher = DefaultHasher::new(); + self.hash(&mut hasher); + return hasher.finish(); } } /// this should be `BaseActorMessage` but it's currently not possible because of https://github.com/rust-lang/rust/issues/20400 -impl DefaultActorMessage for A where A: ActorMessage {} +impl DefaultActorMessage for A where A: ActorMessage + Serialize {} diff --git a/src/message/actor_stop_message.rs b/src/message/actor_stop_message.rs index d1d38a8..ae09a65 100644 --- a/src/message/actor_stop_message.rs +++ b/src/message/actor_stop_message.rs @@ -1,5 +1,7 @@ use crate::message::actor_message::DefaultActorMessage; +use serde::{Deserialize, Serialize}; +#[derive(Hash, Serialize, Deserialize)] pub struct ActorStopMessage {} impl ActorStopMessage { diff --git a/src/message/bulk_actor_message.rs b/src/message/bulk_actor_message.rs index 115b6fe..4c86d49 100644 --- a/src/message/bulk_actor_message.rs +++ b/src/message/bulk_actor_message.rs @@ -1,8 +1,10 @@ /// Bulk Actor Message, that can wrap and send multiple [ActorMessage](../prelude/trait.ActorMessage.html) at once /// use crate::message::actor_message::BaseActorMessage; +use serde::Serialize; /// Wraps multiple [ActorMessage](../prelude/trait.ActorMessage.html) to be sent to an Actor +#[derive(Hash, Serialize)] pub struct BulkActorMessage where M: BaseActorMessage + 'static, diff --git a/src/message/delayed_message.rs b/src/message/delayed_message.rs index 77dfef2..1cd445a 100644 --- a/src/message/delayed_message.rs +++ b/src/message/delayed_message.rs @@ -1,8 +1,15 @@ use crate::message::actor_message::BaseActorMessage; use crate::prelude::{Actor, ActorMessage, ActorWrapper}; +use serde::Serialize; +use std::hash::{Hash, Hasher}; use std::time::{Duration, Instant}; /// Wraps an [ActorMessage](../prelude/trait.ActorMessage.html) to be sent at a later time +#[derive(Serialize)] +#[serde(bound( +serialize = "A: Actor", +deserialize = "A: Actor", +))] pub struct DelayedMessage where M: BaseActorMessage + 'static, @@ -11,6 +18,7 @@ where pub msg: M, pub destination: ActorWrapper, pub delay: Duration, + #[serde(skip)] pub started: Instant, } @@ -36,3 +44,13 @@ where } } } + +impl Hash for DelayedMessage +where + M: BaseActorMessage + 'static, + A: Actor, +{ + fn hash(&self, state: &mut H) { + self.msg.hash(state); + } +} diff --git a/src/message/serialized_message.rs b/src/message/serialized_message.rs index abc97dd..e2ef896 100644 --- a/src/message/serialized_message.rs +++ b/src/message/serialized_message.rs @@ -1,4 +1,7 @@ +use std::hash::{Hash, Hasher}; use crate::message::actor_message::DefaultActorMessage; +use serde::{Deserialize, Serialize}; +use crate::prelude::ActorAddress; /// For Remote message handling /// @@ -8,13 +11,24 @@ use crate::message::actor_message::DefaultActorMessage; /// and it may also include some additional fields to make deserialization easier for end users /// /// [ActorSystem.send_to_address](../prelude/struct.ActorSystem.html#method.send_to_address) uses this object to send serialized messages to Actors +#[derive(Serialize, Deserialize)] pub struct SerializedMessage { + pub destination_address: ActorAddress, pub content: Vec, } +impl Hash for SerializedMessage { + fn hash(&self, state: &mut H) { + self.destination_address.hash(state); + } +} + impl SerializedMessage { - pub fn new(content: Vec) -> Self { - Self { content } + pub fn new(destination_address: ActorAddress, content: Vec) -> Self { + Self { + destination_address, + content, + } } } diff --git a/src/message/sleep_message.rs b/src/message/sleep_message.rs index 045da71..224a316 100644 --- a/src/message/sleep_message.rs +++ b/src/message/sleep_message.rs @@ -1,7 +1,9 @@ use crate::message::actor_message::DefaultActorMessage; +use serde::{Deserialize, Serialize}; use std::time::Duration; /// Puts an actor to sleep for a specified time +#[derive(Hash, Serialize, Deserialize)] pub struct SleepMessage { pub duration: Duration, } diff --git a/src/message/system_stop_message.rs b/src/message/system_stop_message.rs index bca3cfb..5516a4e 100644 --- a/src/message/system_stop_message.rs +++ b/src/message/system_stop_message.rs @@ -1,5 +1,7 @@ use crate::message::actor_message::DefaultActorMessage; +use serde::{Deserialize, Serialize}; +#[derive(Hash, Serialize, Deserialize)] pub struct SystemStopMessage {} impl SystemStopMessage { diff --git a/src/net/mod.rs b/src/net/mod.rs new file mode 100644 index 0000000..b324f44 --- /dev/null +++ b/src/net/mod.rs @@ -0,0 +1,14 @@ +mod net_config; +mod net_manager; +pub mod net_messages; +mod net_worker; + +pub mod prelude { + pub use crate::net::net_config::NetConfig; + pub use crate::net::net_config::NetProtocol; + pub use crate::net::net_config::NetConnectionType; + pub use crate::net::net_manager::NetManager; + pub use crate::net::net_manager::NetManagerFactory; + pub use crate::net::net_worker::NetWorker; + pub use crate::net::net_worker::NetWorkerFactory; +} diff --git a/src/net/net_config.rs b/src/net/net_config.rs new file mode 100644 index 0000000..1876084 --- /dev/null +++ b/src/net/net_config.rs @@ -0,0 +1,30 @@ +#[derive(Clone, Ord, Eq, PartialOrd, PartialEq, Copy, Debug)] +pub enum NetProtocol { + TCP, + UDP, +} + +#[derive(Clone, Ord, Eq, PartialOrd, PartialEq, Copy, Debug)] +pub enum NetConnectionType { + CLIENT, + SERVER +} + +#[derive(Clone, Ord, Eq, PartialOrd, PartialEq, Debug)] +pub struct NetConfig { + pub protocol: NetProtocol, + pub connection_type: NetConnectionType, + pub host: String, + pub port: usize, +} + +impl NetConfig { + pub fn new(protocol: NetProtocol, connection_type: NetConnectionType, host: impl Into, port: usize) -> Self { + Self { + protocol, + connection_type, + host: host.into(), + port, + } + } +} diff --git a/src/net/net_manager.rs b/src/net/net_manager.rs new file mode 100644 index 0000000..2f607e2 --- /dev/null +++ b/src/net/net_manager.rs @@ -0,0 +1,483 @@ +use crate::net::net_messages::{ + AddTcpConnection, AddUdpSocket, ReceiveTcpMessage, ReceiveUdpMessage, RemoveTcpConnection, +}; +use crate::prelude::{Actor, ActorContext, ActorFactory, ActorInitMessage, ActorResult, ActorWrapper, Handler, NetConfig, NetConnectionType, NetProtocol}; +use crate::router::Router; +use io_arc::IoArc; +use log::{debug, error, warn}; +use mio::event::Source; +use mio::net::{TcpListener, TcpStream, UdpSocket}; +use mio::{Events, Interest, Poll, Token}; +use std::collections::HashMap; +use std::error::Error; +use std::io::{BufRead, BufReader}; +use std::marker::PhantomData; +use std::net::Shutdown; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::thread; +use std::thread::sleep; +use std::time::{Duration, Instant}; + +pub struct NetManager +where + T: Handler + + Handler + + Handler + + Handler + + Handler + + 'static, + R: Router + Handler + Handler + Handler + Handler + Handler, +{ + graceful_shutdown_time_in_seconds: Duration, + on_stop_udp_timeout: Duration, + router: ActorWrapper, + workers: Vec>, + server_configs: Vec, + client_configs: Vec, + is_stopping: Arc, + is_stopped: Arc, +} + +impl NetManager +where + T: Handler + + Handler + + Handler + + Handler + + Handler + + 'static, + R: Router + Handler + Handler + Handler + Handler + Handler, +{ + pub fn new( + server_configs: Vec, + client_configs: Vec, + graceful_shutdown_time_in_seconds: Duration, + on_stop_udp_timeout: Duration, + workers: Vec>, + router: ActorWrapper, + ) -> Self + { + let is_stopping = Arc::new(AtomicBool::new(false)); + let is_stopped = Arc::new(AtomicBool::new(false)); + + return Self { + graceful_shutdown_time_in_seconds, + on_stop_udp_timeout, + router, + workers, + server_configs, + client_configs, + is_stopping, + is_stopped, + }; + } +} +impl Actor for NetManager +where + T: Handler + + Handler + + Handler + + Handler + + Handler + + 'static, + R: Router + Handler + Handler + Handler + Handler + Handler, + +{ + fn pre_stop(&mut self, _context: &ActorContext) { + let iterate_graceful_stop_sleep_duration_in_seconds = 1; + let sleep_duration = Duration::from_secs(iterate_graceful_stop_sleep_duration_in_seconds); + + let iterations = if self.graceful_shutdown_time_in_seconds.as_secs() > 0 { + self.graceful_shutdown_time_in_seconds.as_secs() / iterate_graceful_stop_sleep_duration_in_seconds + } else { + 0 + }; + + sleep(sleep_duration); + + self.is_stopping.store(true, Ordering::Relaxed); + + let mut i = 0; + loop { + if self.graceful_shutdown_time_in_seconds.as_secs() > 0 { + if i > iterations { + return; + } + i += 1; + } + + for net_config in &self.server_configs { + let address = format!("{}:{}", net_config.host, net_config.port); + match net_config.protocol { + NetProtocol::TCP => { + let _ = TcpStream::connect(address.parse().unwrap()); + break; + } + NetProtocol::UDP => { + let sock = UdpSocket::bind("127.0.0.1:0".parse().unwrap()); + if sock.is_ok() { + let sock = sock.unwrap(); + let _ = sock.send_to(b"", address.parse().unwrap()); + } + break; + } + } + } + if self.is_stopped.load(Ordering::Relaxed) { + return; + } + sleep(sleep_duration); + } + } + fn post_stop(&mut self, _context: &ActorContext) { + let _ = self.router.stop(); + for worker in &self.workers { + let _ = worker.stop(); + } + for worker in &self.workers { + worker.wait_for_stop(); + } + + self.is_stopped.store(true, Ordering::Relaxed); + for net_config in &self.server_configs { + let address = format!("{}:{}", net_config.host, net_config.port); + match net_config.protocol { + NetProtocol::TCP => { + let _ = TcpStream::connect(address.parse().unwrap()); + break; + } + NetProtocol::UDP => { + let sock = UdpSocket::bind("127.0.0.1:0".parse().unwrap()); + if sock.is_ok() { + let sock = sock.unwrap(); + let _ = sock.send_to(b"", address.parse().unwrap()); + } + break; + } + } + } + } +} + +pub struct NetManagerFactory +where + + T: Handler + + Handler + + Handler + + Handler + + Handler + + 'static, + R: Router + Handler + Handler + Handler + Handler + Handler, +{ + server_configs: Vec, + client_configs: Vec, + graceful_shutdown_time_in_seconds: Duration, + on_stop_udp_timeout: Duration, + workers: Vec>, + router: ActorWrapper, + phantom: PhantomData +} + +impl NetManagerFactory +where + T: Handler + + Handler + + Handler + + Handler + + Handler + + 'static, + R: Router + Handler + Handler + Handler + Handler + Handler, +{ + pub fn new( + server_configs: Vec, + client_configs: Vec, + graceful_shutdown_time_in_seconds: Duration, + on_stop_udp_timeout: Duration, + workers: Vec>, + router: ActorWrapper, + ) -> Self { + return Self { + server_configs, + client_configs, + graceful_shutdown_time_in_seconds, + on_stop_udp_timeout, + workers, + router, + phantom: PhantomData, + }; + } +} +impl ActorFactory> for NetManagerFactory +where + T: Handler + + Handler + + Handler + + Handler + + Handler + + 'static, + R: Router + Handler + Handler + Handler + Handler + Handler, +{ + fn new_actor( + &mut self, + context: ActorContext>, + ) -> Result, Box> { + context.actor_ref.send(ActorInitMessage::new()).unwrap(); + return Ok(NetManager::new( + self.server_configs.clone(), + self.client_configs.clone(), + self.graceful_shutdown_time_in_seconds, + self.on_stop_udp_timeout, + self.workers.clone(), + self.router.clone(), + )); + } +} + +impl Handler for NetManager +where + T: Handler + + Handler + + Handler + + Handler + + Handler + + 'static, + R: Router + Handler + Handler + Handler + Handler + Handler, + +{ + fn handle( + &mut self, + _msg: ActorInitMessage, + context: &ActorContext, + ) -> Result> { + let router = self.router.clone(); + let is_stopping = self.is_stopping.clone(); + let is_stopped = self.is_stopped.clone(); + let mut server_configs = self.server_configs.clone(); + let mut client_configs = self.client_configs.clone(); + let mut last_udp_message_received = Instant::now(); + let on_stop_udp_timeout = self.on_stop_udp_timeout.clone(); + let context = context.clone(); + thread::spawn(move || { + let mut tcp_listeners: HashMap = HashMap::new(); + let mut udp_sockets: HashMap> = HashMap::new(); + let poll = Poll::new(); + if poll.is_err() { + error!("Can't start Poll Port: {:?}", poll.err()); + is_stopped.store(true, Ordering::Relaxed); + let _ = context.actor_ref.stop(); + return; + } + let mut poll = poll.unwrap(); + + let mut i = 0; + server_configs.sort_by_key(|c| c.protocol); + client_configs.sort_by_key(|c| c.protocol); + server_configs.append(&mut client_configs); + for net_config in &server_configs { + + let token = Token(i); + i += 1; + + if net_config.connection_type == NetConnectionType::CLIENT { + let client = TcpStream::connect(format!("{}:{}", net_config.host, net_config.port).parse().unwrap()); + if client.is_err() { + warn!("Can't connect to client: {:?}", net_config); + continue; + } + let mut client = client.unwrap(); + let res = + poll.registry() + .register(&mut client, token, Interest::READABLE); + if res.is_err() { + error!("Can't register TCP Stream: {:?}", res.err()); + is_stopped.store(true, Ordering::Relaxed); + let _ = context.actor_ref.stop(); + return; + } + //tcp_listeners.insert(token, client); + println!("!!!!!!?"); + continue; + } + let address = format!("{}:{}", net_config.host, net_config.port) + .parse() + .unwrap(); + + match net_config.protocol { + NetProtocol::TCP => { + let listener = TcpListener::bind(address); + if listener.is_err() { + error!("Can't open TCP Port: {:?}", listener.err()); + is_stopped.store(true, Ordering::Relaxed); + let _ = context.actor_ref.stop(); + return; + } + let mut listener = listener.unwrap(); + let res = + poll.registry() + .register(&mut listener, token, Interest::READABLE); + if res.is_err() { + error!("Can't register TCP listener: {:?}", res.err()); + is_stopped.store(true, Ordering::Relaxed); + let _ = context.actor_ref.stop(); + return; + } + tcp_listeners.insert(token, listener); + } + NetProtocol::UDP => { + let socket = UdpSocket::bind(address); + if socket.is_err() { + error!("Can't open TCP Port: {:?}", socket.err()); + is_stopped.store(true, Ordering::Relaxed); + let _ = context.actor_ref.stop(); + return; + } + let mut socket = socket.unwrap(); + let res = poll + .registry() + .register(&mut socket, token, Interest::READABLE); + if res.is_err() { + error!("Can't register UDP Socket: {:?}", res.err()); + is_stopped.store(true, Ordering::Relaxed); + let _ = context.actor_ref.stop(); + return; + } + let socket = IoArc::new(socket); + udp_sockets.insert(token, socket.clone()); + let _ = router.send(AddUdpSocket::new(token.0, socket)); + } + } + } + let num_tcp_listeners = tcp_listeners.len(); + let num_total_listeners = server_configs.len(); + + let mut events = Events::with_capacity(1024); + let mut streams = HashMap::new(); + + let mut buf = [0; 65535]; + loop { + if is_stopped.load(Ordering::Relaxed) { + return; + } + + let res = poll.poll(&mut events, Some(Duration::from_secs(10))); + if res.is_err() { + debug!("Can't poll Network Events"); + continue; + } + + for event in events.iter() { + let stopping = is_stopping.load(Ordering::Relaxed); + if stopping + && streams.len() == 0 + && last_udp_message_received.elapsed() > on_stop_udp_timeout + { + is_stopped.store(true, Ordering::Relaxed); + break; + } + let token = &event.token(); + if token.0 < num_tcp_listeners { + println!("!!!!!!!"); + + let listener = tcp_listeners.get(token); + if listener.is_none() { + warn!("Can't find TcpListener for {:?}", token); + continue; + } + let listener = listener.unwrap(); + + loop { + match listener.accept() { + Ok((mut socket, address)) => { + if stopping { + let _ = socket.shutdown(Shutdown::Both); + continue; + } + let res = socket.register( + poll.registry(), + Token(i), + Interest::READABLE, + ); + if res.is_err() { + error!("Could not register TcpStream. {:?}", res.err()); + } + let sock = IoArc::new(socket); + streams.insert(i, (sock.clone(), address.clone())); + let _ = router.send(AddTcpConnection::new(i, sock, address)); + + i += 1; + if i < num_total_listeners { + i = num_total_listeners; + } + } + Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { + break; + } + Err(e) => { + error!("Something went wrong with the Listener. {:?}", e); + break; + } + } + } + } else if token.0 < num_total_listeners { + //UDP handling + let socket = udp_sockets.get(&token); + if socket.is_none() { + error!("Something went wrong with the UDP Socket."); + } + let socket = socket.unwrap(); + + let (len, from) = match socket.as_ref().recv_from(&mut buf) { + Ok(v) => v, + + Err(e) => { + if e.kind() == std::io::ErrorKind::WouldBlock { + continue; + } + panic!("recv() failed: {:?}", e); + } + }; + let request = String::from_utf8_lossy(&buf[..len]); + let _ = router.send(ReceiveUdpMessage::new( + token.0, + from, + request.into_owned(), + )); + last_udp_message_received = Instant::now(); + } else { + if event.is_read_closed() || event.is_write_closed() { + let _ = streams.remove(&token.0); + let _ = router.send(RemoveTcpConnection::new(token.0)); + } else if event.is_readable() { + let stream = streams.get(&token.0); + if stream.is_none() { + let _ = streams.remove(&token.0); + let _ = router.send(RemoveTcpConnection::new(token.0)); + continue; + } + let (stream, _address) = stream.unwrap(); + let buf_reader = BufReader::new(stream.clone()); + let request: Vec = buf_reader + .lines() + .map(|result| match result { + Ok(res) => { + return res; + } + Err(_err) => { + return String::from(""); + } + }) + .take_while(|line| !line.is_empty()) + .collect(); + if !request.is_empty() { + let _ = router.send(ReceiveTcpMessage::new(token.0, request)); + } + } + } + } + } + }); + return Ok(ActorResult::Ok); + } +} diff --git a/src/net/net_messages.rs b/src/net/net_messages.rs new file mode 100644 index 0000000..9bb4f09 --- /dev/null +++ b/src/net/net_messages.rs @@ -0,0 +1,118 @@ +use crate::prelude::ActorMessage; +use io_arc::IoArc; +use mio::net::{TcpStream, UdpSocket}; +use serde::Serialize; +use std::hash::{Hash, Hasher}; +use std::net::SocketAddr; + +#[derive(Serialize)] +pub struct AddTcpConnection { + pub stream_id: usize, + #[serde(skip)] + pub stream: IoArc, + pub address: SocketAddr, +} + +impl Hash for AddTcpConnection { + fn hash(&self, state: &mut H) { + self.stream_id.hash(state); + } +} + +impl AddTcpConnection { + pub fn new(stream_id: usize, stream: IoArc, address: SocketAddr) -> Self { + return Self { + stream_id, + stream, + address, + }; + } +} + +#[derive(Serialize)] +pub struct RemoveTcpConnection { + pub stream_id: usize, +} + +impl Hash for RemoveTcpConnection { + fn hash(&self, state: &mut H) { + self.stream_id.hash(state); + } +} + +impl ActorMessage for AddTcpConnection {} + +impl RemoveTcpConnection { + pub fn new(stream_id: usize) -> Self { + return Self { stream_id }; + } +} + +impl ActorMessage for RemoveTcpConnection {} + +#[derive(Serialize)] +pub struct ReceiveTcpMessage { + pub stream_id: usize, + pub request: Vec, +} + +impl Hash for ReceiveTcpMessage { + fn hash(&self, state: &mut H) { + self.stream_id.hash(state); + } +} + +impl ReceiveTcpMessage { + pub fn new(stream_id: usize, request: Vec) -> Self { + return Self { stream_id, request }; + } +} + +impl ActorMessage for ReceiveTcpMessage {} + +#[derive(Serialize)] +pub struct AddUdpSocket { + pub socket_id: usize, + #[serde(skip)] + pub socket: IoArc, +} + +impl Hash for AddUdpSocket { + fn hash(&self, state: &mut H) { + self.socket_id.hash(state); + } +} + +impl AddUdpSocket { + pub fn new(socket_id: usize, socket: IoArc) -> Self { + return Self { socket_id, socket }; + } +} + +impl ActorMessage for AddUdpSocket {} + +#[derive(Serialize)] +pub struct ReceiveUdpMessage { + pub socket_id: usize, + #[serde(skip)] + pub source: SocketAddr, + pub request: String, +} + +impl Hash for ReceiveUdpMessage { + fn hash(&self, state: &mut H) { + self.socket_id.hash(state); + } +} + +impl ReceiveUdpMessage { + pub fn new(socket_id: usize, source: SocketAddr, request: String) -> Self { + return Self { + socket_id, + source, + request, + }; + } +} + +impl ActorMessage for ReceiveUdpMessage {} diff --git a/src/net/net_worker.rs b/src/net/net_worker.rs new file mode 100644 index 0000000..793b7c8 --- /dev/null +++ b/src/net/net_worker.rs @@ -0,0 +1,160 @@ +use crate::net::net_messages::{ + AddTcpConnection, AddUdpSocket, ReceiveTcpMessage, ReceiveUdpMessage, RemoveTcpConnection, +}; +use crate::prelude::{Actor, ActorContext, ActorFactory, ActorResult, Handler, SerializedMessage}; +use io_arc::IoArc; +use log::{debug, trace, warn}; +use mio::net::{TcpStream, UdpSocket}; +use std::collections::HashMap; +use std::error::Error; +use std::io::Write; +use std::net::{Shutdown, SocketAddr}; + +#[derive(Clone)] +pub struct NetWorker { + streams: HashMap, SocketAddr)>, + sockets: HashMap>, +} + +impl NetWorker { + pub fn new() -> Self { + return Self { + streams: HashMap::new(), + sockets: HashMap::new(), + }; + } +} +impl Actor for NetWorker { + fn on_system_stop( + &mut self, + _context: &ActorContext, + ) -> Result> { + //we intentionally ignore if the actor system is stopped + //we only react if the actor is explicitly stopped by the manager, because there might still be open connections that we don't want to drop + Ok(ActorResult::Ok) + } + + fn handle_serialized_message(&mut self, _msg: SerializedMessage, _context: &ActorContext) -> Result> { + //handle all outgoing messages here + return Ok(ActorResult::Ok); + } +} + +#[derive(Clone)] +pub struct NetWorkerFactory {} +impl ActorFactory for NetWorkerFactory { + fn new_actor( + &mut self, + _context: ActorContext, + ) -> Result> { + return Ok(NetWorker::new()); + } +} + +impl NetWorkerFactory { + pub fn new() -> Self { + return Self {}; + } +} + +impl Handler for NetWorker { + fn handle( + &mut self, + msg: ReceiveTcpMessage, + _context: &ActorContext, + ) -> Result> { + let stream = self.streams.get_mut(&msg.stream_id); + + if stream.is_none() { + // temporary implementation for our instant http response, later on we won't have to care here if the stream is active, we'll just forward the message + debug!("Stream ID no longer exists, can't reply to request"); + return Ok(ActorResult::Ok); + } + let (stream, _) = stream.unwrap(); + stream.write_all("HTTP/1.1 200 OK\nContent-Type: text/html\nConnection: keep-alive\nContent-Length: 12\r\n\r\nHELLO-WORLD!".as_bytes()).unwrap(); + + // temporary implementation for our instant http response + // drops the connection if keep-alive has not been specified + let mut shutdown_connection = true; + for k in msg.request { + if k == "Connection: Keep-Alive" { + shutdown_connection = false; + break; + } + } + if shutdown_connection { + let _ = stream.as_ref().shutdown(Shutdown::Both); + } + + return Ok(ActorResult::Ok); + } +} + +impl Handler for NetWorker { + fn handle( + &mut self, + msg: AddTcpConnection, + _context: &ActorContext, + ) -> Result> { + trace!("Add TCP Connection: {:?}", msg.address); + let key_already_exists = self.streams.remove(&msg.stream_id); + if key_already_exists.is_some() { + warn!("Stream ID already exists, dropping old one in favor of the new connection."); + let (stream, _) = key_already_exists.unwrap(); + let _ = stream.as_ref().shutdown(Shutdown::Both); + } + + let _ = self + .streams + .insert(msg.stream_id, (msg.stream, msg.address)); + return Ok(ActorResult::Ok); + } +} + +impl Handler for NetWorker { + fn handle( + &mut self, + msg: RemoveTcpConnection, + _context: &ActorContext, + ) -> Result> { + trace!("Remove TCP Connection: {:?}", msg.stream_id); + + let _ = self.streams.remove(&msg.stream_id); + return Ok(ActorResult::Ok); + } +} + +impl Handler for NetWorker { + fn handle( + &mut self, + msg: ReceiveUdpMessage, + _context: &ActorContext, + ) -> Result> { + let socket = self.sockets.get_mut(&msg.socket_id); + if socket.is_none() { + // temporary implementation for our instant http response, later on we won't have to care here if the stream is active, we'll just forward the message + debug!("Socket ID no longer exists, can't reply to request"); + return Ok(ActorResult::Ok); + } + let socket = socket.unwrap(); + let _ = socket.as_ref().send_to(msg.request.as_bytes(), msg.source); + + return Ok(ActorResult::Ok); + } +} + +impl Handler for NetWorker { + fn handle( + &mut self, + msg: AddUdpSocket, + _context: &ActorContext, + ) -> Result> { + let key_already_exists = self.sockets.remove(&msg.socket_id); + if key_already_exists.is_some() { + warn!("Socket ID already exists, dropping old one in favor of the new."); + } + + let _ = self.sockets.insert(msg.socket_id, msg.socket); + return Ok(ActorResult::Ok); + } +} diff --git a/src/routers/add_actor_message.rs b/src/routers/add_actor_message.rs index d42d93d..03285a4 100644 --- a/src/routers/add_actor_message.rs +++ b/src/routers/add_actor_message.rs @@ -1,8 +1,15 @@ -use crate::actor::actor_wrapper::ActorWrapper; +use serde::{Deserialize, Serialize}; +use std::hash::{Hash, Hasher}; + use crate::message::actor_message::BaseActorMessage; -use crate::prelude::Actor; +use crate::prelude::{Actor, ActorWrapper}; /// Adds an Actor to the Router +#[derive(Serialize, Deserialize)] +#[serde(bound( +serialize = "A: Actor", +deserialize = "A: Actor", +))] pub struct AddActorMessage where A: Actor, @@ -15,8 +22,15 @@ where A: Actor, { pub fn new(actor: ActorWrapper) -> Self { - Self { actor } + return Self { actor }; } } impl BaseActorMessage for AddActorMessage where A: Actor {} + +impl Hash for AddActorMessage +where + A: Actor, +{ + fn hash(&self, _state: &mut H) {} +} diff --git a/src/routers/bulk_router_message.rs b/src/routers/bulk_router_message.rs index 6cea1f0..3d534cc 100644 --- a/src/routers/bulk_router_message.rs +++ b/src/routers/bulk_router_message.rs @@ -1,6 +1,9 @@ use crate::message::actor_message::BaseActorMessage; +use serde::Serialize; +use std::hash::{Hash, Hasher}; /// Wraps multiple [ActorMessage](../prelude/trait.ActorMessage.html) to be sent to a Router +#[derive(Serialize)] pub struct BulkRouterMessage where M: BaseActorMessage + 'static, @@ -18,3 +21,12 @@ where Self { data } } } + +impl Hash for BulkRouterMessage +where + M: BaseActorMessage + 'static, +{ + fn hash(&self, state: &mut H) { + self.data.hash(state); + } +} diff --git a/src/routers/least_message_router.rs b/src/routers/least_message_router.rs index 1eab8eb..8bcd68a 100644 --- a/src/routers/least_message_router.rs +++ b/src/routers/least_message_router.rs @@ -1,11 +1,13 @@ use crate::actor::actor_factory::ActorFactory; -use crate::actor::actor_wrapper::ActorWrapper; + use crate::actor::context::ActorContext; use crate::actor::handler::Handler; -use crate::prelude::{Actor, ActorMessage, ActorResult}; +use crate::message::actor_message::BaseActorMessage; +use crate::prelude::{Actor, ActorMessage, ActorResult, ActorWrapper}; +use crate::router::SendToAllTargetsMessage; use crate::routers::add_actor_message::AddActorMessage; use crate::routers::remove_actor_message::RemoveActorMessage; -use log::error; +use log::{debug, error}; use std::error::Error; pub struct LeastMessageRouter @@ -16,6 +18,8 @@ where min_mailbox_size: usize, route_to: Vec>, can_route: bool, + stop_on_system_stop: bool, + stop_on_empty_targets: bool, } /// implements [ActorFactory](../prelude/trait.ActorFactory.html) to spawn a LeastMessageRouter within an [ActorSystem](../prelude/struct.ActorSystem.html) @@ -29,8 +33,10 @@ where /// use tyra::prelude::*; /// use std::process::exit; /// use std::time::Duration; +/// use serde::Serialize; /// use tyra::router::{LeastMessageRouterFactory, AddActorMessage}; /// // define message +/// #[derive(Hash, Serialize)] /// struct FooBar {} /// impl ActorMessage for FooBar {} /// @@ -66,7 +72,7 @@ where /// .unwrap(); /// /// // create the router, fill it, and route a message -/// let router_factory = LeastMessageRouterFactory::new(15); +/// let router_factory = LeastMessageRouterFactory::new(15, true, true); /// let router = actor_system /// .builder() /// .spawn("router-hello-world", router_factory) @@ -75,12 +81,26 @@ where /// router.send(FooBar{}).unwrap(); /// ``` pub struct LeastMessageRouterFactory { + /// minimum mailbox size, that needs to be exceeded to actually search for the router with the least messages min_mailbox_size: usize, + /// defines if the actor should automatically be stopped when the system is stopped. If set to false it's up to the user to setup their own shutdown process if they want a quick and clean exit + stop_on_system_stop: bool, + /// defines if the actor should automatically be stopped if it receives a message after all targets have been automatically removed + /// this does not apply if the last target has been removed through a `RemoveActorMessage` + stop_on_empty_targets: bool, } impl LeastMessageRouterFactory { - pub fn new(min_mailbox_size: usize) -> Self { - Self { min_mailbox_size } + pub fn new( + min_mailbox_size: usize, + stop_on_system_stop: bool, + stop_on_empty_targets: bool, + ) -> Self { + Self { + min_mailbox_size, + stop_on_system_stop, + stop_on_empty_targets, + } } } @@ -92,29 +112,59 @@ where &mut self, _context: ActorContext>, ) -> Result, Box> { - return Ok(LeastMessageRouter::new(self.min_mailbox_size)); + return Ok(LeastMessageRouter::new( + self.min_mailbox_size, + self.stop_on_system_stop, + self.stop_on_empty_targets, + )); } } impl LeastMessageRouter where - A: Actor, + A: Actor + 'static, { - pub fn new(min_mailbox_size: usize) -> Self { + pub fn new( + min_mailbox_size: usize, + stop_on_system_stop: bool, + stop_on_empty_targets: bool, + ) -> Self { Self { next_route_index: 0, min_mailbox_size, route_to: Vec::new(), can_route: false, + stop_on_system_stop, + stop_on_empty_targets, } } } -impl Actor for LeastMessageRouter where A: Actor {} +impl Actor for LeastMessageRouter +where + A: Actor + 'static, +{ + fn on_system_stop( + &mut self, + context: &ActorContext, + ) -> Result> { + if self.stop_on_system_stop { + let result = context.actor_ref.stop(); + if result.is_err() { + error!( + "Could not forward message ActorStopMessage to target {}", + context.actor_ref.get_address().actor + ); + return Ok(ActorResult::Stop); + } + } + return Ok(ActorResult::Ok); + } +} impl Handler> for LeastMessageRouter where - A: Actor, + A: Actor + 'static, { fn handle( &mut self, @@ -129,7 +179,7 @@ where impl Handler> for LeastMessageRouter where - A: Actor, + A: Actor + 'static, { fn handle( &mut self, @@ -164,11 +214,31 @@ where return Ok(ActorResult::Ok); } - let mut target = self.route_to.get(self.next_route_index).unwrap(); + let mut target; + // skip/remove stopped actors + loop { + let route_index = self.next_route_index; + target = self.route_to.get(self.next_route_index).unwrap(); + + if target.is_stopped() { + self.next_route_index += 1; + if self.next_route_index >= (self.route_to.len() - 1) { + self.next_route_index = 0; + } + self.route_to.remove(route_index); + if self.route_to.len() == 0 && self.stop_on_empty_targets { + debug!("Stopping router, because all targets have been removed"); + return Ok(ActorResult::Stop); + } + } else { + break; + } + } + let mut mailbox_size = target.get_mailbox_size(); let target_len = self.route_to.len(); for i in 0..target_len { - if mailbox_size >= self.min_mailbox_size { + if mailbox_size <= self.min_mailbox_size { break; } let potential_target = self.route_to.get(i).unwrap(); @@ -194,3 +264,45 @@ where return Ok(ActorResult::Ok); } } + +impl Handler> for LeastMessageRouter +where + A: Actor + Handler + 'static, + M: BaseActorMessage + Clone + 'static, +{ + fn handle( + &mut self, + msg: SendToAllTargetsMessage, + _context: &ActorContext, + ) -> Result> { + if !self.can_route { + return Ok(ActorResult::Ok); + } + + // skip/remove stopped actors + loop { + let route_index = self.next_route_index; + let target = self.route_to.get(self.next_route_index).unwrap(); + + if target.is_stopped() { + self.next_route_index += 1; + if self.next_route_index >= (self.route_to.len() - 1) { + self.next_route_index = 0; + } + self.route_to.remove(route_index); + if self.route_to.len() == 0 && self.stop_on_empty_targets { + debug!("Stopping router, because all targets have been removed"); + return Ok(ActorResult::Stop); + } + } else { + break; + } + } + + for target in &self.route_to { + let _ = target.send(msg.msg.clone()); + } + + return Ok(ActorResult::Ok); + } +} diff --git a/src/routers/mod.rs b/src/routers/mod.rs index 6af6bbe..95c0b9e 100644 --- a/src/routers/mod.rs +++ b/src/routers/mod.rs @@ -3,6 +3,7 @@ mod bulk_router_message; mod least_message_router; mod remove_actor_message; mod round_robin_router; +mod send_to_all_targets_message; mod sharded_router; pub mod prelude { @@ -13,6 +14,8 @@ pub mod prelude { pub use crate::routers::remove_actor_message::RemoveActorMessage; pub use crate::routers::round_robin_router::RoundRobinRouter; pub use crate::routers::round_robin_router::RoundRobinRouterFactory; + pub use crate::routers::send_to_all_targets_message::SendToAllTargetsMessage; pub use crate::routers::sharded_router::ShardedRouter; pub use crate::routers::sharded_router::ShardedRouterFactory; + pub use crate::routers::sharded_router::Router; } diff --git a/src/routers/remove_actor_message.rs b/src/routers/remove_actor_message.rs index ba29c13..e67f11c 100644 --- a/src/routers/remove_actor_message.rs +++ b/src/routers/remove_actor_message.rs @@ -1,8 +1,15 @@ -use crate::actor::actor_wrapper::ActorWrapper; +use serde::Serialize; +use std::hash::{Hash, Hasher}; + use crate::message::actor_message::BaseActorMessage; -use crate::prelude::Actor; +use crate::prelude::{Actor, ActorWrapper}; /// Removes an Actor from the Router +#[derive(Serialize)] +#[serde(bound( +serialize = "A: Actor", +deserialize = "A: Actor", +))] pub struct RemoveActorMessage where A: Actor, @@ -20,3 +27,10 @@ where } impl BaseActorMessage for RemoveActorMessage where A: Actor {} + +impl Hash for RemoveActorMessage +where + A: Actor, +{ + fn hash(&self, _state: &mut H) {} +} diff --git a/src/routers/round_robin_router.rs b/src/routers/round_robin_router.rs index 95b18c7..1d2b60d 100644 --- a/src/routers/round_robin_router.rs +++ b/src/routers/round_robin_router.rs @@ -1,13 +1,14 @@ use crate::actor::actor_factory::ActorFactory; -use crate::actor::actor_wrapper::ActorWrapper; + use crate::actor::context::ActorContext; use crate::actor::handler::Handler; use crate::message::actor_message::{ActorMessage, BaseActorMessage}; -use crate::prelude::{Actor, ActorResult, BulkActorMessage}; +use crate::prelude::{Actor, ActorResult, ActorWrapper, BulkActorMessage}; +use crate::router::SendToAllTargetsMessage; use crate::routers::add_actor_message::AddActorMessage; use crate::routers::bulk_router_message::BulkRouterMessage; use crate::routers::remove_actor_message::RemoveActorMessage; -use log::error; +use log::{debug, error}; use std::error::Error; pub struct RoundRobinRouter @@ -17,6 +18,8 @@ where route_index: usize, route_to: Vec>, can_route: bool, + stop_on_system_stop: bool, + stop_on_empty_targets: bool, } /// implements [ActorFactory](../prelude/trait.ActorFactory.html) to spawn a RoundRobinRouter within an [ActorSystem](../prelude/struct.ActorSystem.html) @@ -30,9 +33,11 @@ where /// use tyra::prelude::*; /// use std::process::exit; /// use std::time::Duration; +/// use serde::Serialize; /// use tyra::router::{RoundRobinRouterFactory, AddActorMessage}; /// /// // define message +/// #[derive(Hash, Serialize)] /// struct FooBar {} /// impl ActorMessage for FooBar {} /// @@ -68,7 +73,7 @@ where /// .unwrap(); /// /// // create the router, fill it, and route a message -/// let router_factory = RoundRobinRouterFactory::new(); +/// let router_factory = RoundRobinRouterFactory::new(true, true); /// let router = actor_system /// .builder() /// .spawn("router-hello-world", router_factory) @@ -76,11 +81,20 @@ where /// router.send(AddActorMessage::new(actor.clone())).unwrap(); /// router.send(FooBar{}).unwrap(); /// ``` -pub struct RoundRobinRouterFactory {} +pub struct RoundRobinRouterFactory { + /// defines if the actor should automatically be stopped when the system is stopped. If set to false it's up to the user to setup their own shutdown process if they want a quick and clean exit + stop_on_system_stop: bool, + /// defines if the actor should automatically be stopped if it receives a message after all targets have been automatically removed + /// this does not apply if the last target has been removed through a `RemoveActorMessage + stop_on_empty_targets: bool, +} impl RoundRobinRouterFactory { - pub fn new() -> Self { - Self {} + pub fn new(stop_on_system_stop: bool, stop_on_empty_targets: bool) -> Self { + Self { + stop_on_system_stop, + stop_on_empty_targets, + } } } @@ -92,28 +106,53 @@ where &mut self, _context: ActorContext>, ) -> Result, Box> { - return Ok(RoundRobinRouter::new()); + return Ok(RoundRobinRouter::new( + self.stop_on_system_stop, + self.stop_on_empty_targets, + )); } } impl RoundRobinRouter where - A: Actor, + A: Actor + 'static, { - pub fn new() -> Self { + pub fn new(stop_on_system_stop: bool, stop_on_empty_targets: bool) -> Self { Self { route_index: 0, route_to: Vec::new(), can_route: false, + stop_on_system_stop, + stop_on_empty_targets, } } } -impl Actor for RoundRobinRouter where A: Actor {} +impl Actor for RoundRobinRouter +where + A: Actor + 'static, +{ + fn on_system_stop( + &mut self, + context: &ActorContext, + ) -> Result> { + if self.stop_on_system_stop { + let result = context.actor_ref.stop(); + if result.is_err() { + error!( + "Could not forward message ActorStopMessage to target {}", + context.actor_ref.get_address().actor + ); + return Ok(ActorResult::Stop); + } + } + return Ok(ActorResult::Ok); + } +} impl Handler> for RoundRobinRouter where - A: Actor, + A: Actor + 'static, { fn handle( &mut self, @@ -128,7 +167,7 @@ where impl Handler> for RoundRobinRouter where - A: Actor, + A: Actor + 'static, { fn handle( &mut self, @@ -163,6 +202,26 @@ where return Ok(ActorResult::Ok); } + // skip/remove stopped actors + loop { + let route_index = self.route_index; + let target = self.route_to.get(self.route_index).unwrap(); + + if target.is_stopped() { + self.route_index += 1; + if self.route_index >= (self.route_to.len() - 1) { + self.route_index = 0; + } + self.route_to.remove(route_index); + if self.route_to.len() == 0 && self.stop_on_empty_targets { + debug!("Stopping router, because all targets have been removed"); + return Ok(ActorResult::Stop); + } + } else { + break; + } + } + self.route_index += 1; if self.route_index >= self.route_to.len() { self.route_index = 0; @@ -194,6 +253,26 @@ where return Ok(ActorResult::Ok); } + // skip/remove stopped actors + loop { + let route_index = self.route_index; + let target = self.route_to.get(self.route_index).unwrap(); + + if target.is_stopped() { + self.route_index += 1; + if self.route_index >= (self.route_to.len() - 1) { + self.route_index = 0; + } + self.route_to.remove(route_index); + if self.route_to.len() == 0 && self.stop_on_empty_targets { + debug!("Stopping router, because all targets have been removed"); + return Ok(ActorResult::Stop); + } + } else { + break; + } + } + let total_messages = msg.data.len(); let total_routees = self.route_to.len(); let messages_per_routee = total_messages / total_routees; @@ -217,3 +296,45 @@ where return Ok(ActorResult::Ok); } } + +impl Handler> for RoundRobinRouter +where + A: Actor + Handler + 'static, + M: BaseActorMessage + Clone + 'static, +{ + fn handle( + &mut self, + msg: SendToAllTargetsMessage, + _context: &ActorContext, + ) -> Result> { + if !self.can_route { + return Ok(ActorResult::Ok); + } + + // skip/remove stopped actors + loop { + let route_index = self.route_index; + let target = self.route_to.get(self.route_index).unwrap(); + + if target.is_stopped() { + self.route_index += 1; + if self.route_index >= (self.route_to.len() - 1) { + self.route_index = 0; + } + self.route_to.remove(route_index); + if self.route_to.len() == 0 && self.stop_on_empty_targets { + debug!("Stopping router, because all targets have been removed"); + return Ok(ActorResult::Stop); + } + } else { + break; + } + } + + for target in &self.route_to { + let _ = target.send(msg.msg.clone()); + } + + return Ok(ActorResult::Ok); + } +} diff --git a/src/routers/send_to_all_targets_message.rs b/src/routers/send_to_all_targets_message.rs new file mode 100644 index 0000000..5cb3a00 --- /dev/null +++ b/src/routers/send_to_all_targets_message.rs @@ -0,0 +1,32 @@ +use crate::message::actor_message::BaseActorMessage; +use serde::Serialize; +use std::hash::{Hash, Hasher}; + +/// Wraps multiple [ActorMessage](../prelude/trait.ActorMessage.html) to be sent to a Router +#[derive(Serialize)] +pub struct SendToAllTargetsMessage +where + M: BaseActorMessage + 'static, +{ + pub msg: M, +} + +impl BaseActorMessage for SendToAllTargetsMessage where M: BaseActorMessage + 'static {} + +impl SendToAllTargetsMessage +where + M: BaseActorMessage + 'static, +{ + pub fn new(msg: M) -> Self { + Self { msg } + } +} + +impl Hash for SendToAllTargetsMessage +where + M: BaseActorMessage + 'static, +{ + fn hash(&self, state: &mut H) { + self.msg.hash(state); + } +} diff --git a/src/routers/sharded_router.rs b/src/routers/sharded_router.rs index 9238b12..2a60274 100644 --- a/src/routers/sharded_router.rs +++ b/src/routers/sharded_router.rs @@ -1,24 +1,35 @@ use crate::actor::actor_factory::ActorFactory; -use crate::actor::actor_wrapper::ActorWrapper; + use crate::actor::context::ActorContext; use crate::actor::handler::Handler; use crate::message::actor_message::BaseActorMessage; -use crate::prelude::{Actor, ActorMessage, ActorResult, BulkActorMessage}; +use crate::prelude::{Actor, ActorMessage, ActorResult, ActorWrapper, BulkActorMessage}; +use crate::router::SendToAllTargetsMessage; use crate::routers::add_actor_message::AddActorMessage; use crate::routers::bulk_router_message::BulkRouterMessage; use crate::routers::remove_actor_message::RemoveActorMessage; -use log::error; +use hashring::HashRing; +use log::{debug, error, info, warn}; use std::collections::HashMap; use std::error::Error; +pub trait Router: Actor + Handler>{} + +impl Router for ShardedRouter + where + A: Actor {} + pub struct ShardedRouter where A: Actor, { num_shards: usize, route_to: Vec>, + hash_ring: HashRing, sharding: HashMap>, can_route: bool, + stop_on_system_stop: bool, + stop_on_empty_targets: bool, } /// implements [ActorFactory](../prelude/trait.ActorFactory.html) to spawn a ShardedRouter within an [ActorSystem](../prelude/struct.ActorSystem.html) @@ -32,9 +43,11 @@ where /// use tyra::prelude::*; /// use std::process::exit; /// use std::time::Duration; +/// use serde::Serialize; /// use tyra::router::{ShardedRouterFactory, AddActorMessage}; /// /// // define message +/// #[derive(Hash, Serialize)] /// struct FooBar {} /// impl ActorMessage for FooBar {} /// @@ -70,7 +83,7 @@ where /// .unwrap(); /// /// // create the router, fill it, and route a message -/// let router_factory = ShardedRouterFactory::new(); +/// let router_factory = ShardedRouterFactory::new(true, true); /// let router = actor_system /// .builder() /// .spawn("router-hello-world", router_factory) @@ -78,11 +91,21 @@ where /// router.send(AddActorMessage::new(actor.clone())).unwrap(); /// router.send(FooBar{}).unwrap(); /// ``` -pub struct ShardedRouterFactory {} +#[derive(Clone)] +pub struct ShardedRouterFactory { + /// defines if the actor should automatically be stopped when the system is stopped. If set to false it's up to the user to setup their own shutdown process if they want a quick and clean exit + stop_on_system_stop: bool, + /// defines if the actor should automatically be stopped if it receives a message after all targets have been automatically removed + /// this does not apply if the last target has been removed through a `RemoveActorMessage + stop_on_empty_targets: bool, +} impl ShardedRouterFactory { - pub fn new() -> Self { - Self {} + pub fn new(stop_on_system_stop: bool, stop_on_empty_targets: bool) -> Self { + Self { + stop_on_system_stop, + stop_on_empty_targets, + } } } @@ -94,55 +117,71 @@ where &mut self, _context: ActorContext>, ) -> Result, Box> { - return Ok(ShardedRouter::new()); + return Ok(ShardedRouter::new( + self.stop_on_system_stop, + self.stop_on_empty_targets, + )); } } impl ShardedRouter where - A: Actor, + A: Actor + 'static, { - pub fn new() -> Self { + pub fn new(stop_on_system_stop: bool, stop_on_empty_targets: bool) -> Self { Self { num_shards: 0, route_to: Vec::new(), + hash_ring: HashRing::new(), sharding: HashMap::new(), can_route: false, + stop_on_system_stop, + stop_on_empty_targets, } } +} - fn recalculate_shards(&mut self) { - let num_routees = self.route_to.len(); - self.num_shards = self.route_to.len() * 5; - self.sharding.clear(); - for i in 0..self.num_shards { - let routee = self.route_to.get(i % num_routees).unwrap().clone(); - self.sharding.insert(i, routee); +impl Actor for ShardedRouter +where + A: Actor + 'static, +{ + fn on_system_stop( + &mut self, + context: &ActorContext, + ) -> Result> { + if self.stop_on_system_stop { + let result = context.actor_ref.stop(); + if result.is_err() { + error!( + "Could not forward message ActorStopMessage to target {}", + context.actor_ref.get_address().actor + ); + return Ok(ActorResult::Stop); + } } + return Ok(ActorResult::Ok); } } -impl Actor for ShardedRouter where A: Actor {} - impl Handler> for ShardedRouter where - A: Actor, + A: Actor + 'static, { fn handle( &mut self, msg: AddActorMessage, _context: &ActorContext, ) -> Result> { + self.hash_ring.add(self.route_to.len()); self.route_to.push(msg.actor); self.can_route = true; - self.recalculate_shards(); return Ok(ActorResult::Ok); } } impl Handler> for ShardedRouter where - A: Actor, + A: Actor + 'static, { fn handle( &mut self, @@ -154,8 +193,8 @@ where .iter() .position(|x| x.get_address() == msg.actor.get_address()) { + let _ = self.hash_ring.remove(&pos); self.route_to.remove(pos); - self.recalculate_shards(); } if self.route_to.len() == 0 { self.can_route = false @@ -178,13 +217,46 @@ where return Ok(ActorResult::Ok); } - let shard_id = msg.get_id() % self.num_shards; - let forward_to = self.sharding.get(&shard_id).unwrap(); - let result = forward_to.send(msg); + let hash = msg.get_hash(); + let target; + loop { + let target_id = self.hash_ring.get(&hash); + if target_id.is_none() { + warn!("Can't find target for hash."); + return Ok(ActorResult::Ok); + } + let target_id = target_id.unwrap(); + + let potential_target = self.route_to.get(target_id.clone()); + if potential_target.is_none() { + warn!("Target does not exist."); + return Ok(ActorResult::Ok); + } + let potential_target = potential_target.unwrap(); + if !potential_target.is_stopped() { + target = potential_target; + break; + } + + let target_id = target_id.clone(); + self.route_to.remove(target_id.clone()); + self.hash_ring.remove(&target_id); + if self.route_to.len() == 0 { + if self.stop_on_empty_targets { + debug!("Stopping router, because all targets have been stopped"); + return Ok(ActorResult::Stop); + } + self.can_route = false; + info!("Router has no valid targets to route to. Dropping message."); + return Ok(ActorResult::Ok); + } + } + + let result = target.send(msg); if result.is_err() { error!( "Could not forward message to target {}", - forward_to.get_address().actor + target.get_address().actor ); } return Ok(ActorResult::Ok); @@ -205,6 +277,13 @@ where return Ok(ActorResult::Ok); } + for i in 0..self.route_to.len() { + let target = self.route_to.get(i).unwrap(); + if target.is_stopped() { + self.route_to.remove(i); + } + } + let total_messages = msg.data.len(); let messages_per_routee = total_messages / self.num_shards; @@ -222,3 +301,32 @@ where return Ok(ActorResult::Ok); } } + +impl Handler> for ShardedRouter +where + A: Actor + Handler + 'static, + M: BaseActorMessage + Clone + 'static, +{ + fn handle( + &mut self, + msg: SendToAllTargetsMessage, + _context: &ActorContext, + ) -> Result> { + if !self.can_route { + return Ok(ActorResult::Ok); + } + + for i in 0..self.route_to.len() { + let target = self.route_to.get(i).unwrap(); + if target.is_stopped() { + self.route_to.remove(i); + } + } + + for target in &self.route_to { + let _ = target.send(msg.msg.clone()); + } + + return Ok(ActorResult::Ok); + } +} diff --git a/src/system/actor_error.rs b/src/system/actor_error.rs index e08c4c5..8a8a0d0 100644 --- a/src/system/actor_error.rs +++ b/src/system/actor_error.rs @@ -17,4 +17,8 @@ pub enum ActorError { /// Triggered by [ActorBuilder.spawn](../prelude/struct.ActorBuilder.html#method.spawn) if the actor can't be spawned, because the thread-pool does not exist #[error("Actor could not be started, because thread-pool does not exist")] ThreadPoolDoesNotExistError, + + /// Triggered by [ActorBuilder.get_existing](../prelude/struct.ActorBuilder.html#method.get_existing) if the provided ActorAddress does not exist + #[error("Actor does not exist")] + DoesNotExistError, } diff --git a/src/system/actor_system.rs b/src/system/actor_system.rs index 640a729..bd31bba 100644 --- a/src/system/actor_system.rs +++ b/src/system/actor_system.rs @@ -9,9 +9,11 @@ use crate::system::system_state::SystemState; use crate::system::thread_pool_manager::ThreadPoolManager; use crate::system::wakeup_manager::WakeupManager; use dashmap::DashMap; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::thread::sleep; use std::time::Duration; +use crate::system::cluster::{Cluster, CLUSTER_LB, CLUSTER_POOL}; /// Manages thread pools and actors #[derive(Clone)] @@ -20,8 +22,10 @@ pub struct ActorSystem { thread_pool_manager: ThreadPoolManager, wakeup_manager: WakeupManager, name: String, + hostname: String, config: Arc, internal_actor_manager: InternalActorManager, + sigint_received: Arc, } impl ActorSystem { @@ -42,7 +46,10 @@ impl ActorSystem { std::panic::set_hook(Box::new(|_| {})); } - let thread_pool_config = config.thread_pool.clone(); + let mut thread_pool_config = config.thread_pool.clone(); + if !config.cluster.enabled { + thread_pool_config.config.remove("cluster"); + } let thread_pool_manager = ThreadPoolManager::new(); let wakeup_manager = WakeupManager::new(); @@ -53,7 +60,10 @@ impl ActorSystem { thread_pool_manager.add_pool_with_config(key, value.clone()); thread_pool_max_actors.insert(key.clone(), value.actor_limit); } - let state = SystemState::new(wakeup_manager.clone(), Arc::new(thread_pool_max_actors)); + + let net_worker_lb_address = ActorAddress::new(config.general.hostname.clone(), config.general.name.clone(), CLUSTER_POOL, CLUSTER_LB); + + let state = SystemState::new(wakeup_manager.clone(), Arc::new(thread_pool_max_actors), net_worker_lb_address, config.general.name.clone(), config.general.hostname.clone()); let s = state.clone(); let t = thread_pool_manager.clone(); @@ -72,15 +82,52 @@ impl ActorSystem { thread_pool_manager, wakeup_manager, name: config.general.name.clone(), + hostname: config.general.hostname.clone(), config: Arc::new(config.clone()), internal_actor_manager: InternalActorManager::new(), + sigint_received: Arc::new(AtomicBool::new(false)), }; + if config.general.enable_signal_handling { + let sys = system.clone(); + let graceful_timeout = config.general.graceful_timeout_in_seconds.clone(); + ctrlc::set_handler(move || { + sys.sigint_handler(Duration::from_secs(graceful_timeout)); + }) + .unwrap(); + } + system.internal_actor_manager.init(system.clone()); + if config.cluster.enabled { + Cluster::init(&system, &config.cluster, Duration::from_secs(config.general.graceful_timeout_in_seconds)); + } system } + /// Adds a new named pool using the [default pool configuration](https://github.com/sers-dev/tyra/blob/master/src/config/default.toml) + /// + /// # Examples + /// + /// Basic usage: + /// + /// ```rust + /// use std::time::Duration; + /// use tyra::prelude::{TyraConfig, ActorSystem}; + /// + /// let mut actor_config = TyraConfig::new().unwrap(); + /// //disable automatic setup of sigint handling, so that we can set it manually + /// actor_config.general.enable_signal_handling = false; + /// let actor_system = ActorSystem::new(actor_config); + /// ctrlc::set_handler(move || {actor_system.sigint_handler(Duration::from_secs(60));}).unwrap(); + /// ``` + pub fn sigint_handler(&self, graceful_termination_timeout: Duration) { + if self.sigint_received.load(Ordering::Relaxed) { + self.force_stop(); + } + self.sigint_received.store(true, Ordering::Relaxed); + self.stop_override_graceful_termination_timeout(graceful_termination_timeout); + } /// Adds a new named pool using the [default pool configuration](https://github.com/sers-dev/tyra/blob/master/src/config/default.toml) /// /// # Examples @@ -147,10 +194,12 @@ impl ActorSystem { /// /// ```rust /// use std::error::Error; + /// use serde::Serialize; /// use tyra::prelude::{TyraConfig, ActorSystem, ActorFactory, ActorContext, SerializedMessage, Handler, Actor, ActorResult, ActorMessage}; /// /// struct TestActor {} /// + /// #[derive(Hash, Serialize)] /// struct HelloWorld {} /// impl ActorMessage for HelloWorld {} /// impl Actor for TestActor { @@ -180,9 +229,9 @@ impl ActorSystem { /// let actor_system = ActorSystem::new(actor_config); /// let actor_wrapper = actor_system.builder().spawn("test", TestFactory{}).unwrap(); /// let address = actor_wrapper.get_address(); - /// actor_system.send_to_address(address, SerializedMessage::new(Vec::new())); + /// actor_system.send_to_address(address, Vec::new()); /// ``` - pub fn send_to_address(&self, address: &ActorAddress, msg: SerializedMessage) { + pub fn send_to_address(&self, address: &ActorAddress, msg: Vec) { self.state.send_to_address(address, msg); } @@ -241,12 +290,37 @@ impl ActorSystem { /// /// let actor_config = TyraConfig::new().unwrap(); /// let actor_system = ActorSystem::new(actor_config); - /// actor_system.stop(Duration::from_secs(1)); + /// actor_system.stop_override_graceful_termination_timeout(Duration::from_secs(1)); /// ``` - pub fn stop(&self, graceful_termination_timeout: Duration) { + pub fn stop_override_graceful_termination_timeout(&self, graceful_termination_timeout: Duration) { self.state.stop(graceful_termination_timeout); } + /// Sends a SystemStopMessage to all running Actors, and wakes them up if necessary. + /// Users can implement their own clean system stop behavior, by implementing [Actor.on_system_stop](../prelude/trait.Actor.html#method.on_system_stop) and [Actor.on_actor_stop](../prelude/trait.Actor.html#method.on_actor_stop) + /// + /// System will stop after all actors have been stopped or after `general.graceful_timeout_in_seconds` + /// + /// # Examples + /// + /// Basic usage: + /// + /// ```rust + /// use tyra::prelude::{TyraConfig, ActorSystem, ThreadPoolConfig}; + /// use std::time::Duration; + /// + /// let actor_config = TyraConfig::new().unwrap(); + /// let actor_system = ActorSystem::new(actor_config); + /// actor_system.stop(); + /// ``` + pub fn stop(&self) { + self.stop_override_graceful_termination_timeout(Duration::from_secs(self.config.general.graceful_timeout_in_seconds)); + } + + pub fn force_stop(&self) { + self.state.force_stop(); + } + /// Same as stop, but with fixed user defined exit code /// /// # Examples @@ -263,7 +337,7 @@ impl ActorSystem { /// ``` pub fn stop_with_code(&self, graceful_termination_timeout: Duration, code: i32) { self.state.use_forced_exit_code(code); - self.stop(graceful_termination_timeout); + self.stop_override_graceful_termination_timeout(graceful_termination_timeout); } /// Waits for the system to stop @@ -287,7 +361,7 @@ impl ActorSystem { /// /// let actor_config = TyraConfig::new().unwrap(); /// let actor_system = ActorSystem::new(actor_config); - /// actor_system.stop(Duration::from_secs(3)); + /// actor_system.stop(); /// exit(actor_system.await_shutdown()); /// ``` pub fn await_shutdown(&self) -> i32 { @@ -334,4 +408,23 @@ impl ActorSystem { pub fn get_name(&self) -> &str { &self.name } + + /// Returns the configured hostname of the system + /// + /// # Examples + /// + /// Basic usage: + /// + /// ```rust + /// use tyra::prelude::{TyraConfig, ActorSystem, ThreadPoolConfig}; + /// use std::time::Duration; + /// use std::process::exit; + /// + /// let actor_config = TyraConfig::new().unwrap(); + /// let actor_system = ActorSystem::new(actor_config); + /// let name = actor_system.get_hostname(); + /// ``` + pub fn get_hostname(&self) -> &str { + &self.hostname + } } diff --git a/src/system/cluster.rs b/src/system/cluster.rs new file mode 100644 index 0000000..2386b68 --- /dev/null +++ b/src/system/cluster.rs @@ -0,0 +1,119 @@ +use std::time::Duration; +use log::warn; +use regex::Regex; +use trust_dns_resolver::config::{ResolverConfig, ResolverOpts}; +use trust_dns_resolver::error::ResolveResult; +use trust_dns_resolver::Resolver; +use crate::prelude::{ActorSystem, ClusterConfig, NetConfig, NetConnectionType, NetManagerFactory, NetProtocol, NetWorkerFactory}; +use crate::router::{AddActorMessage, ShardedRouterFactory}; + +pub const CLUSTER_POOL: &str = "cluster"; +pub const CLUSTER_LB: &str = "cluster-router"; + +pub struct Cluster { + +} + +impl Cluster { + + fn generate_net_config(from: &Vec, connection_type: NetConnectionType) -> Vec { + let regex = Regex::new("(tcp|udp):\\/\\/(.*):(.*)").unwrap(); + let mut net_configs = Vec::new(); + + for host in from { + let captures = regex.captures(host); + if captures.is_none() { + continue; + } + let captures = captures.unwrap(); + if captures.len() < 3 { + continue; + } + let protocol = if &captures[1] == "tcp" { + NetProtocol::TCP + } else { + NetProtocol::UDP + }; + + let port = if captures.len() == 4 { + captures[3].parse::().unwrap() + } else { + 2022 as usize + }; + + net_configs.push(NetConfig::new(protocol, connection_type, &captures[2], port)); + + } + return net_configs; + } + + fn resolve_dns(from: &Vec) -> Vec { + let mut to_return = Vec::new(); + for member in from { + let resolver = Resolver::new(ResolverConfig::default(), ResolverOpts::default()).unwrap(); + + let response = resolver.lookup_ip(&member.host); + match response { + ResolveResult::Ok(addresses) => { + for address in addresses.iter() { + let mut res = member.clone(); + res.host = format!("{}", address); + to_return.push(res); + } + } + _ => { + warn!("Can't find DNS records for '{}'", &member.host) + } + } + } + + to_return.sort(); + to_return.dedup(); + return to_return; + + } + + fn setup_actors(system: &ActorSystem, server_configs: Vec, client_configs: Vec, graceful_timeout_in_seconds: Duration) { + let worker_factory = NetWorkerFactory::new(); + let router_factory = ShardedRouterFactory::new(false, false); + let router = system.builder().set_pool_name(CLUSTER_POOL).spawn(CLUSTER_LB, router_factory).unwrap(); + + let worker_count = system + .get_available_actor_count_for_pool(CLUSTER_POOL) + .unwrap() - 1; + let workers = system + .builder() + .set_pool_name(CLUSTER_POOL) + .spawn_multiple("cluster-worker", worker_factory.clone(), worker_count) + .unwrap(); + for worker in &workers { + router.send(AddActorMessage::new(worker.clone())).unwrap(); + } + let _actor = system + .builder() + .set_pool_name(CLUSTER_POOL) + .spawn( + "cluster-manager", + NetManagerFactory::new( + server_configs, + client_configs, + graceful_timeout_in_seconds, + Duration::from_secs(3), + workers, + router, + ), + ) + .unwrap(); + } + + pub fn init(system: &ActorSystem, cluster_config: &ClusterConfig, graceful_timeout_in_seconds: Duration) { + + let server_configs = Self::generate_net_config(&cluster_config.hosts, NetConnectionType::SERVER); + let client_configs = Self::generate_net_config(&cluster_config.members, NetConnectionType::CLIENT); + let client_configs = Self::resolve_dns(&client_configs); + + Self::setup_actors(system, server_configs, client_configs, graceful_timeout_in_seconds); + + + } +} \ No newline at end of file diff --git a/src/system/internal_actor_manager.rs b/src/system/internal_actor_manager.rs index 8c4afb0..ecfb966 100644 --- a/src/system/internal_actor_manager.rs +++ b/src/system/internal_actor_manager.rs @@ -24,7 +24,7 @@ impl InternalActorManager { .builder() .set_pool_name("tyra") .set_mailbox_unbounded() - .spawn("delay-router", RoundRobinRouterFactory::new()) + .spawn("delay-router", RoundRobinRouterFactory::new(true, true)) .unwrap(); let remaining_actors = system.get_available_actor_count_for_pool("tyra").unwrap(); for i in 0..remaining_actors { diff --git a/src/system/mod.rs b/src/system/mod.rs index 2ba3a18..657c97b 100644 --- a/src/system/mod.rs +++ b/src/system/mod.rs @@ -5,6 +5,7 @@ pub mod internal_actor_manager; pub mod system_state; mod thread_pool_manager; pub mod wakeup_manager; +mod cluster; pub mod prelude { pub use crate::system::actor_error::ActorError; diff --git a/src/system/system_state.rs b/src/system/system_state.rs index e40fc53..a6ef81c 100644 --- a/src/system/system_state.rs +++ b/src/system/system_state.rs @@ -23,12 +23,18 @@ pub struct SystemState { is_force_stopped: Arc, forced_exit_code: Arc, use_forced_exit_code: Arc, + net_worker_lb_address: ActorAddress, + system_name: String, + hostname: String, } impl SystemState { pub fn new( wakeup_manager: WakeupManager, max_actors_per_pool: Arc>, + net_worker_lb_address: ActorAddress, + system_name: String, + hostname: String, ) -> Self { Self { mailboxes: Arc::new(DashMap::new()), @@ -41,9 +47,17 @@ impl SystemState { is_force_stopped: Arc::new(AtomicBool::new(false)), forced_exit_code: Arc::new(AtomicI32::new(0)), use_forced_exit_code: Arc::new(AtomicBool::new(false)), + net_worker_lb_address, + system_name, + hostname, } } + pub fn force_stop(&self) { + self.is_force_stopped.store(true, Ordering::Relaxed); + self.stop(Duration::from_secs(1)); + } + pub fn stop(&self, graceful_termination_timeout: Duration) { if self.is_stopping() { return; @@ -55,13 +69,14 @@ impl SystemState { fn shutdown(&self, timeout: Duration) { let now = Instant::now(); + while self.get_actor_count() != 0 { - if now.elapsed() >= timeout { + if (timeout.as_secs() > 0 && now.elapsed() >= timeout) || self.is_force_stopped.load(Ordering::Relaxed) { self.is_force_stopped.store(true, Ordering::Relaxed); self.mailboxes.clear(); break; } - sleep(Duration::from_millis(10)); + sleep(Duration::from_millis(100)); } self.is_stopped.store(true, Ordering::Relaxed); } @@ -94,35 +109,24 @@ impl SystemState { self.total_actor_count.load(Ordering::Relaxed) } - pub fn send_to_address(&self, address: &ActorAddress, msg: SerializedMessage) { - let target = self.mailboxes.get(address); + pub fn send_to_address(&self, address: &ActorAddress, msg: Vec) { + let target = if address.system == self.system_name && address.hostname == self.hostname + { + self.mailboxes.get(address) + } else { + self.mailboxes.get(&self.net_worker_lb_address) + }; + if target.is_some() { let target = target.unwrap(); - target.send_serialized(msg); + target.send_serialized(SerializedMessage::new(address.clone(), msg)); if target.is_sleeping() { self.wakeup_manager.wakeup(target.key().clone()); } } } - pub fn remove_mailbox(&self, address: &ActorAddress) { - self.total_actor_count.fetch_sub(1, Ordering::Relaxed); - self.pool_actor_count - .entry(address.pool.clone()) - .and_modify(|v| { - v.fetch_sub(1, Ordering::Relaxed); - }); - self.mailboxes.remove(address); - } - - pub fn add_mailbox( - &self, - address: ActorAddress, - mailbox: Mailbox, - ) -> Result<(), ActorError> - where - A: Handler + 'static, - { + pub fn increase_pool_actor_count(&self, address: &ActorAddress) -> Result<(), ActorError> { let maximum_actor_count = self.max_actors_per_pool.get(&address.pool); if maximum_actor_count.is_none() { return Err(ActorError::ThreadPoolDoesNotExistError); @@ -143,10 +147,31 @@ impl SystemState { current_pool_count.fetch_add(1, Ordering::Relaxed); self.total_actor_count.fetch_add(1, Ordering::Relaxed); - self.mailboxes.insert(address, Arc::new(mailbox)); + return Ok(()); } + pub fn decrease_pool_actor_count(&self, address: &ActorAddress) { + self.pool_actor_count + .entry(address.pool.clone()) + .and_modify(|v| { + v.fetch_sub(1, Ordering::Relaxed); + }); + self.total_actor_count.fetch_sub(1, Ordering::Relaxed); + } + + pub fn remove_mailbox(&self, address: &ActorAddress) { + self.decrease_pool_actor_count(address); + self.mailboxes.remove(address); + } + + pub fn add_mailbox(&self, address: ActorAddress, mailbox: Mailbox) + where + A: Handler + 'static, + { + self.mailboxes.insert(address, Arc::new(mailbox)); + } + pub fn add_pool_actor_limit(&self, pool_name: String, max_actors: usize) { self.max_actors_per_pool.insert(pool_name, max_actors); } @@ -156,11 +181,18 @@ impl SystemState { if maximum_actor_count.is_none() { return Err(ActorError::ThreadPoolDoesNotExistError); } + let maximum_actor_count = maximum_actor_count.unwrap(); let maximum_actor_count = *maximum_actor_count.value(); - let current_pool_count = self.pool_actor_count.get(pool_name).unwrap(); - let current_pool_count = current_pool_count.value().load(Ordering::Relaxed); + let current_pool_count = self.pool_actor_count.get(pool_name); + + let current_pool_count = if current_pool_count.is_some() { + let current_pool_count = current_pool_count.unwrap(); + current_pool_count.value().load(Ordering::Relaxed) + } else { + 0 as usize + }; if maximum_actor_count == 0 { let result = usize::MAX - current_pool_count; @@ -173,19 +205,20 @@ impl SystemState { pub fn get_actor_ref( &self, - address: ActorAddress, + address: &ActorAddress, internal_actor_manager: InternalActorManager, ) -> Result, ActorError> where A: Handler + 'static, { - let mb = self.mailboxes.get(&address).unwrap().value().clone(); + let mb = self.mailboxes.get(address).unwrap().value().clone(); return match mb.as_any().downcast_ref::>() { Some(m) => Ok(ActorWrapper::new( m.clone(), - address, + address.clone(), self.wakeup_manager.clone(), internal_actor_manager, + self.clone(), )), None => Err(ActorError::InvalidActorTypeError), }; diff --git a/src/wrapper/actor_wrapper.rs b/src/wrapper/actor_wrapper.rs new file mode 100644 index 0000000..89ad412 --- /dev/null +++ b/src/wrapper/actor_wrapper.rs @@ -0,0 +1,219 @@ +use crate::actor::actor::Actor; +use crate::actor::actor_address::ActorAddress; +use crate::actor::actor_send_error::ActorSendError; +use crate::actor::handler::Handler; +use crate::actor::mailbox::Mailbox; +use crate::message::actor_message::BaseActorMessage; +use crate::system::internal_actor_manager::InternalActorManager; +use crate::system::system_state::SystemState; +use crate::system::wakeup_manager::WakeupManager; +use crate::wrapper::local_actor_wrapper::LocalActorWrapper; +use crate::wrapper::remote_actor_wrapper::RemoteActorWrapper; +use serde::{Deserialize, Serialize}; +use std::fmt::{Debug, Formatter}; +use std::hash::{Hash, Hasher}; +use std::panic::UnwindSafe; +use std::time::Duration; +use crate::prelude::ActorSystem; + +#[derive(Serialize, Deserialize)] +#[serde(bound( +serialize = "A: Actor", +deserialize = "A: Actor", +))] +pub struct ActorWrapper +where + A: Actor, +{ + address: ActorAddress, + remote: RemoteActorWrapper, + #[serde(skip)] + local: Option>, +} + +impl Debug for ActorWrapper +where + A: Actor, +{ + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "") + } +} + +impl UnwindSafe for ActorWrapper where A: Actor {} + +impl ActorWrapper +where + A: Actor + UnwindSafe + 'static, +{ + /// Automatically called by the [ActorBuilder.spawn](../prelude/struct.ActorBuilder.html#method.spawn) + pub fn new( + mailbox: Mailbox, + address: ActorAddress, + wakeup_manager: WakeupManager, + internal_actor_manager: InternalActorManager, + system_state: SystemState, + ) -> Self { + let local = Some(LocalActorWrapper::new( + mailbox, + wakeup_manager, + internal_actor_manager, + )); + let remote = RemoteActorWrapper::new(system_state); + Self { + address, + remote, + local, + } + } + + /// Sends a message to the actor that is then processed through the corresponding Handler implementation + /// Blocks until message has been sent, or fails if the target has been stopped + /// It is NOT recommended to use this to send messages to Actors with a limited mailbox. Use send_timeout() or send_after() for these cases + pub fn send(&self, msg: M) -> Result<(), ActorSendError> + where + A: Handler, + M: BaseActorMessage + 'static, + { + if self.local.is_some() { + return self.local.as_ref().unwrap().send(msg, self.address.clone()); + } + return self.remote.send(msg, &self.address); + } + + /// Same as send, but with a user defined timeout + pub fn send_timeout(&self, msg: M, timeout: Duration) -> Result<(), ActorSendError> + where + A: Handler, + M: BaseActorMessage + 'static, + { + if self.local.is_some() { + return self + .local + .as_ref() + .unwrap() + .send_timeout(msg, timeout, self.address.clone()); + } + return self.remote.send_timeout(msg, timeout, self.address.clone()); + } + + /// Sends a message to the actor after a specified delay + pub fn send_after(&self, msg: M, delay: Duration) -> Result<(), ActorSendError> + where + A: Handler + 'static, + M: BaseActorMessage + 'static, + { + if self.local.is_some() { + return self + .local + .as_ref() + .unwrap() + .send_after(msg, delay, self.clone()); + } + return self.remote.send_after(msg, delay, self.address.clone()); + } + + /// Tells the actor to stop accepting message and to shutdown after all existing messages have been processed + pub fn stop(&self) -> Result<(), ActorSendError> { + if self.local.is_some() { + return self.local.as_ref().unwrap().stop(self.address.clone()); + } + return self.remote.stop(); + } + + /// Tells the actor to sleep for the specified duration + pub fn sleep(&self, duration: Duration) -> Result<(), ActorSendError> { + if self.local.is_some() { + return self + .local + .as_ref() + .unwrap() + .sleep(duration, self.address.clone()); + } + return self.remote.sleep(duration, self.address.clone()); + } + + /// Returns a reference to the address of the actor + pub fn get_address(&self) -> &ActorAddress { + &self.address + } + + /// Returns the current mailbox size + pub fn get_mailbox_size(&self) -> usize { + if self.local.is_some() { + return self.local.as_ref().unwrap().get_mailbox_size(); + } + return self.remote.get_mailbox_size(); + } + + /// Returns true if an actor is no longer accepting messages + pub fn is_mailbox_stopped(&self) -> bool { + if self.local.is_some() { + return self.local.as_ref().unwrap().is_mailbox_stopped(); + } + return self.remote.is_mailbox_stopped(); + } + + /// Returns true if an actor has been completely stopped after processing all messages that are still within the queue + pub fn is_stopped(&self) -> bool { + if self.local.is_some() { + return self.local.as_ref().unwrap().is_stopped(); + } + return self.remote.is_stopped(); + } + + /// Blocks until the actor has been stopped + pub fn wait_for_stop(&self) { + if self.local.is_some() { + return self + .local + .as_ref() + .unwrap() + .wait_for_stop(self.address.clone()); + } + return self.remote.wait_for_stop(); + } + + /// This function is required after deserializing any ActorWrapper + /// If the actor is a local actor it will restore the LocalActorWrapper and any message can be sent directly to the actor again + /// If the actor is a remote actor it will add the SystemState to it, so that the messages can be forwarded to the destination + /// It is technically impossible to deserialize a working LocalActor or a RemoteActor directly, which is why this helper is required to feed the unserializable information back into it + pub fn init_after_deserialize(&mut self, system: &ActorSystem) { + let actor_wrapper = system.builder::().init_after_deserialize(self.get_address()); + self.local = actor_wrapper.local; + self.remote = actor_wrapper.remote; + } + + /// Returns an ActorWrapper that will be handled as a remote actor. + /// If the actor exists locally tru `init_after_deserialize` or `ActorBuilder::get_existing()` instead + pub fn from_address(address: ActorAddress, system_state: SystemState) -> Self { + return Self { + address, + remote: RemoteActorWrapper::new(system_state), + local: None, + } + } + +} + +impl Clone for ActorWrapper +where + A: Actor + UnwindSafe, +{ + fn clone(&self) -> Self { + Self { + remote: self.remote.clone(), + local: self.local.clone(), + address: self.address.clone(), + } + } +} + +impl Hash for ActorWrapper +where + A: Actor + UnwindSafe, +{ + fn hash(&self, state: &mut H) { + self.address.hash(state); + } +} diff --git a/src/actor/actor_wrapper.rs b/src/wrapper/local_actor_wrapper.rs similarity index 55% rename from src/actor/actor_wrapper.rs rename to src/wrapper/local_actor_wrapper.rs index 4cb5b1f..ab59e93 100644 --- a/src/actor/actor_wrapper.rs +++ b/src/wrapper/local_actor_wrapper.rs @@ -1,3 +1,4 @@ +use crate::actor::actor::Actor; use crate::actor::actor_address::ActorAddress; use crate::actor::actor_send_error::ActorSendError; use crate::actor::handler::Handler; @@ -5,58 +6,39 @@ use crate::actor::mailbox::Mailbox; use crate::message::actor_message::BaseActorMessage; use crate::message::actor_stop_message::ActorStopMessage; use crate::message::sleep_message::SleepMessage; -use crate::prelude::Actor; +use crate::prelude::ActorWrapper; use crate::system::internal_actor_manager::InternalActorManager; use crate::system::wakeup_manager::WakeupManager; -use std::fmt::{Debug, Formatter}; use std::panic::UnwindSafe; +use std::thread::sleep; use std::time::Duration; -/// Wrapper used to interact with [Actor] -pub struct ActorWrapper +pub struct LocalActorWrapper where A: Actor, { mailbox: Mailbox, - address: ActorAddress, wakeup_manager: WakeupManager, internal_actor_manager: Box, } -impl Debug for ActorWrapper -where - A: Actor, -{ - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "") - } -} - -impl UnwindSafe for ActorWrapper where A: Actor {} - -impl ActorWrapper +impl LocalActorWrapper where A: Actor + UnwindSafe, { - /// Automatically called by the [ActorBuilder.build](../prelude/struct.ActorBuilder.html#method.build) pub fn new( mailbox: Mailbox, - address: ActorAddress, wakeup_manager: WakeupManager, internal_actor_manager: InternalActorManager, ) -> Self { Self { mailbox, - address, wakeup_manager, internal_actor_manager: Box::new(internal_actor_manager), } } - /// Sends a message to the actor that is then processed through the corresponding Handler implementation - /// Blocks until message has been sent, or fails if the target has been stopped - /// It is NOT recommended to use this to send messages to Actors with a limited mailbox. Use send_timeout() or send_after() for these cases - pub fn send(&self, msg: M) -> Result<(), ActorSendError> + pub fn send(&self, msg: M, address: ActorAddress) -> Result<(), ActorSendError> where A: Handler, M: BaseActorMessage + 'static, @@ -72,14 +54,18 @@ where } if self.mailbox.is_sleeping() { - self.wakeup_manager.wakeup(self.address.clone()); + self.wakeup_manager.wakeup(address); } return Ok(()); } - /// Same as send, but with a user defined timeout - pub fn send_timeout(&self, msg: M, timeout: Duration) -> Result<(), ActorSendError> + pub fn send_timeout( + &self, + msg: M, + timeout: Duration, + address: ActorAddress, + ) -> Result<(), ActorSendError> where A: Handler, M: BaseActorMessage + 'static, @@ -95,14 +81,18 @@ where } if self.mailbox.is_sleeping() { - self.wakeup_manager.wakeup(self.address.clone()); + self.wakeup_manager.wakeup(address); } return Ok(()); } - /// Sends a message to the actor after a specified delay - pub fn send_after(&self, msg: M, delay: Duration) -> Result<(), ActorSendError> + pub fn send_after( + &self, + msg: M, + delay: Duration, + destination: ActorWrapper, + ) -> Result<(), ActorSendError> where A: Handler + 'static, M: BaseActorMessage + 'static, @@ -112,32 +102,40 @@ where } self.internal_actor_manager - .send_after(msg, self.clone(), delay); + .send_after(msg, destination, delay); return Ok(()); } - /// Tells the actor to stop accepting message and to shutdown after all existing messages have been processed - pub fn stop(&self) -> Result<(), ActorSendError> { - return self.send(ActorStopMessage::new()); + pub fn stop(&self, address: ActorAddress) -> Result<(), ActorSendError> { + return self.send(ActorStopMessage::new(), address); } - /// Tells the actor to sleep for the specified duration - pub fn sleep(&self, duration: Duration) -> Result<(), ActorSendError> { - return self.send(SleepMessage { duration }); - } - - /// Returns a reference to the address of the actor - pub fn get_address(&self) -> &ActorAddress { - &self.address + pub fn sleep(&self, duration: Duration, address: ActorAddress) -> Result<(), ActorSendError> { + return self.send(SleepMessage { duration }, address); } pub fn get_mailbox_size(&self) -> usize { return self.mailbox.len(); } + + pub fn is_mailbox_stopped(&self) -> bool { + return self.mailbox.is_stopped(); + } + + pub fn is_stopped(&self) -> bool { + return self.get_mailbox_size() == 0 && self.mailbox.is_stopped(); + } + + pub fn wait_for_stop(&self, address: ActorAddress) { + let _ = self.stop(address); + while !self.is_stopped() { + sleep(Duration::from_millis(25)); + } + } } -impl Clone for ActorWrapper +impl Clone for LocalActorWrapper where A: Actor + UnwindSafe, { @@ -145,7 +143,6 @@ where Self { wakeup_manager: self.wakeup_manager.clone(), mailbox: self.mailbox.clone(), - address: self.address.clone(), internal_actor_manager: self.internal_actor_manager.clone(), } } diff --git a/src/wrapper/mod.rs b/src/wrapper/mod.rs new file mode 100644 index 0000000..a18e63f --- /dev/null +++ b/src/wrapper/mod.rs @@ -0,0 +1,7 @@ +pub mod actor_wrapper; +mod local_actor_wrapper; +mod remote_actor_wrapper; + +pub mod prelude { + pub use crate::wrapper::actor_wrapper::ActorWrapper; +} diff --git a/src/wrapper/remote_actor_wrapper.rs b/src/wrapper/remote_actor_wrapper.rs new file mode 100644 index 0000000..ec87572 --- /dev/null +++ b/src/wrapper/remote_actor_wrapper.rs @@ -0,0 +1,91 @@ +use crate::actor::actor_address::ActorAddress; +use crate::actor::actor_send_error::ActorSendError; +use crate::message::actor_message::BaseActorMessage; +use crate::system::system_state::SystemState; +use serde::{Deserialize, Serialize}; +use std::hash::{Hash, Hasher}; +use std::time::Duration; +use crate::prelude::ActorSendError::NotAllowedForRemoteActorError; + +#[derive(Serialize, Deserialize, Clone)] +pub struct RemoteActorWrapper { + #[serde(skip)] + system_state: Option, +} + +impl RemoteActorWrapper { + pub fn new(system_state: SystemState) -> Self { + let system_state = Some(system_state); + return Self { system_state }; + } + + pub fn send(&self, msg: M, address: &ActorAddress) -> Result<(), ActorSendError> + where + M: BaseActorMessage + 'static, + { + let serialized = bincode::serialize(&msg).unwrap(); + self.system_state + .as_ref() + .unwrap() + .send_to_address(address, serialized); + return Ok(()); + } + + pub fn send_timeout( + &self, + msg: M, + _timeout: Duration, + address: ActorAddress, + ) -> Result<(), ActorSendError> + where + M: BaseActorMessage + 'static, + { + // since we don't work with the mailbox directly for remote actors, we can't send messages with timeout + // instead of giving an error, we'll simply send the message and ignore the timeout + return self.send(msg, &address); + } + + pub fn send_after( + &self, + _msg: M, + _delay: Duration, + _address: ActorAddress, + ) -> Result<(), ActorSendError> + where + M: BaseActorMessage + 'static, + { + //send serialized version of DelayedMessage to system_state + //destination address in SerializedMessage needs to be the delay-router on the remote system + return Ok(()); + } + + pub fn stop(&self) -> Result<(), ActorSendError> { + return Err(NotAllowedForRemoteActorError); + } + + pub fn sleep(&self, _duration: Duration, _address: ActorAddress) -> Result<(), ActorSendError> { + return Err(NotAllowedForRemoteActorError); + } + + pub fn get_mailbox_size(&self) -> usize { + return 0; + } + + pub fn is_mailbox_stopped(&self) -> bool { + return false; + } + + pub fn is_stopped(&self) -> bool { + return false; + } + + pub fn wait_for_stop(&self) { + return; + } +} + +impl Hash for RemoteActorWrapper { + fn hash(&self, _state: &mut H) { + return; + } +}