Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
ceae9fb
upgrade dependencies
Bobonium Apr 1, 2023
cb58f8c
update changelog
Bobonium Apr 1, 2023
20b9380
wip: introduce networking
Bobonium Apr 9, 2023
e7f06cf
improve routers and signal handling
Bobonium Apr 15, 2023
7abcdb8
WIP: networking
Bobonium Apr 15, 2023
778d001
add alternative approach to networking, based on mio
Bobonium Apr 24, 2023
fd0babc
added support for udp
Bobonium Apr 25, 2023
5a4604a
net_worker can now be dynamically be provided to net_manager for diff…
Bobonium Apr 26, 2023
0dc2a58
add spawn_multiple() to builder
Bobonium Apr 26, 2023
5695534
add test and documentation for spawn_multiple
Bobonium Apr 26, 2023
458c670
add SendToAllTargetsMessage to all router implementations
Bobonium Apr 26, 2023
2a6d741
properly shutdown udp sockets if process is killed
Bobonium Apr 28, 2023
4c592f7
improve net shutdown behavior
Bobonium Apr 28, 2023
9603fc9
net: allow configuration of maximum amount of worker actors
Bobonium Apr 28, 2023
7a875ad
update sharding_router to make use of HashRing
Bobonium Apr 28, 2023
942b7bf
rework sharded_router and implement Hash requirement to ActorMessage
Bobonium Apr 28, 2023
bc8936b
fix broken tests and examples
Bobonium Apr 30, 2023
d32f56d
harden net implementation by handling all potential errors
Bobonium Apr 30, 2023
b4b821d
WIP: rewrite ActorWrapper to be compatible with remoting
Bobonium May 2, 2023
2ab3757
require all ActorMessages to be serializable
Bobonium May 2, 2023
8e80988
fix delayed messages no longer working after reworking ActorWrapper; …
Bobonium May 7, 2023
b503d42
fix documentation
Bobonium May 7, 2023
f63633d
remove redundant bool from actor_wrapper
Bobonium May 7, 2023
596f30a
get rid of warnings
Bobonium May 7, 2023
4f8a090
cargo fmt
Bobonium May 7, 2023
a323340
added proper Serialize and Deserialize to AddActorMessage<A>
Bobonium May 7, 2023
c344d65
fix serde setup for ActorWrapper<A>
Bobonium May 7, 2023
f407a78
add ability to re-initialize ActorWrapper<A> after deserializing
Bobonium May 7, 2023
9bfba6d
fix router and worker name for netmanager created actors
Bobonium May 8, 2023
4896439
preparation for cluster implementation
Bobonium May 8, 2023
e6e9aa2
fix incorrectly hardcoded remote string
Bobonium May 10, 2023
dcf8f4b
add new hostname config and rename ActorAddress.remote to ActorAddres…
Bobonium May 10, 2023
f7e16c4
allow dynamic router injection into net_manager
Bobonium May 19, 2023
e1cf846
remove redundant '<>'
Bobonium May 19, 2023
2898687
WIP: start cluster integration into actor_system
Bobonium May 21, 2023
3c46739
WIP: implement clustering
Bobonium May 22, 2023
72ed4f4
WIP: implement clustering
Bobonium May 22, 2023
0a2f6ee
provide client_configs to net_manager
Bobonium May 23, 2023
9bf8e43
add simple_log dependency
Bobonium May 29, 2023
48c165a
wip
Bobonium Jun 26, 2023
7ba3d8b
upgrade dependencies; resolve warnings
Bobonium Mar 26, 2024
01176f0
upgrade dependencies
Bobonium Oct 24, 2024
6f5726c
rework graceful timeout to be a globally configured value used for st…
Bobonium Oct 29, 2024
04a299e
Merge branch 'master' into networking
Bobonium Oct 9, 2025
a0364a2
upgrade dependencies
Bobonium Oct 9, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,32 @@
# 1.1.0

- upgrade dependencies
- `LeastMessageRouter.min_mailbox_size` is now working correctly
- Routers now come with 2 new configurations to configure behavior
- `stop_on_system_stop` => if false, the user needs to manually stop the router for a clean and quick shutdown of the system
- `stop_on_empty_targets` => automatically stops the router if there are no more targets to receive a message to be routed. This does not apply to manually removed targets
- Routers now automatically remove stopped actors from their target pool if they have been stopped
- `Sharded_Router` now makes use of `HashRing`
- `ActorMessage` now needs to implement `Hash` and no longer need to explicitly implement `ActorMessage.get_id()` to be able to properly make use of the `ShardedRouter`
- renamed `ActorMessage.get_id()` to `ActorMessage.get_hash()` and changed return type to u64
- changed default implementation to `ActorMessage.get_hash()` to make use of `Hash` implementation
- `ActorMessage` now needs to implement `Serialize`
- this requirement comes from the fact, that all messages need to be serializable in theory to be able to be sent to other actor systems
- if a message is really intended to be sent it should obviously also implement `Deserialize`
- Added `.is_mailbox_stopped()`, `is_stopped()` and `wait_for_stop()` to `ActorWrapper<A>`
- Added ability to re-initialize `ActorWrapper<A>` after deserializing
- Added `general.graceful_timeout_in_seconds` to config
- reworked `system.stop()` to make use of configured value
- added `system.stop_override_graceful_termination_timeout(Duration)` to allow override of configured default
- value is also used for signal handling graceful timeout and timeout for network manager
- Added `ActorBuilder<A>.spawn_multiple()`
- All routers now support a `SendToAllTargetsMessage<M>` that will forward `M` to all active targets
- Users can now use `ActorBuilder.get_existing(address: ActorAddress)` to get an `ActorWrapper<A>` for an already existing `Actor`
- `ActorBuilder.spawn()` will continue to return the `ActorWrapper<A>` for already existing actors
- renamed config `global.name` to `global.hostname`
- also renamed `ActorAddress.remote` to `ActorAddress.hostname`
- added new config `global.name` that defaults to the cargo package name or `tyra` if the application is not built using cargo

# 1.0.0

- added `LeastMessageRouter`
Expand Down
32 changes: 21 additions & 11 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,27 @@ name = "tyra"
path = "src/lib.rs"

[dependencies]
config = "0.13.4"
hostname = "0.3.1"
num_cpus = "1.16.0"
config = "0.15.18"
hostname = "0.4.1"
num_cpus = "1.17.0"
threadpool = "1.8.1"
crossbeam-channel = "0.5.13"
flume = "0.10.14"
dashmap = "5.5.3"
serde = { version = "1.0", features = ["derive"] }
thiserror = "1.0"
log = "0.4"
crossbeam-channel = "0.5.15"
flume = "0.11.1"
dashmap = "6.1.0"
serde = { version = "1.0.228", features = ["derive"] }
serde-encrypt = "0.7.0"
thiserror = "2.0.17"
log = "0.4.28"
ctrlc = { version = "3.5.0", features = ["termination"] }
mio = {version = "1.0.4", features = ["os-poll", "os-ext", "net"]}
quiche = "0.24.6"
io-arc = "1.0.0"
hashring = "0.3.6"
bincode = "1.3.3"
regex = "1.11.3"
trust-dns-resolver = "0.23.2"


[dev-dependencies]
bincode = "1.3.3"
ntest = "0.8.1"
ntest = "0.9.3"
simple_logger = "5.0.0"
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ impl ActorFactory<TestActor> for TestActorFactory {
impl Handler<TestMessage> for TestActor {
fn handle(&mut self, _msg: TestMessage, context: &ActorContext<Self>) -> Result<ActorResult, Box<dyn Error>> {
println!("HELLO WORLD!");
context.system.stop(Duration::from_millis(1000));
context.system.stop();
Ok(ActorResult::Ok)
}
}
Expand Down
8 changes: 6 additions & 2 deletions examples/benchmark_bulk_router.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,22 @@
use serde::Serialize;
use std::error::Error;
use std::process::exit;
use std::thread::sleep;
use std::time::{Duration, Instant};
use tyra::prelude::*;
use tyra::router::{AddActorMessage, BulkRouterMessage, RoundRobinRouterFactory};

#[derive(Hash, Serialize)]
struct MessageA {}

impl ActorMessage for MessageA {}

#[derive(Hash, Serialize)]
struct Finish {}

impl ActorMessage for Finish {}

#[derive(Hash, Serialize)]
struct Start {}

impl ActorMessage for Start {}
Expand Down Expand Up @@ -139,7 +143,7 @@ impl Handler<Finish> for Aggregator {
"{} It took {:?} to finish {} actors",
self.name, duration, self.total_actors
);
self.ctx.system.stop(Duration::from_secs(60));
self.ctx.system.stop();
}
Ok(ActorResult::Ok)
}
Expand All @@ -165,7 +169,7 @@ fn main() {
// ideal number is "amount of threads - 3"
let actor_count = 7;

let router_factory = RoundRobinRouterFactory::new();
let router_factory = RoundRobinRouterFactory::new(true, true);
let router = actor_system
.builder()
.spawn("benchmark-router", router_factory)
Expand Down
10 changes: 7 additions & 3 deletions examples/benchmark_router_round_robin.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,21 @@
use serde::Serialize;
use std::error::Error;
use std::process::exit;
use std::time::{Duration, Instant};
use std::time::Instant;
use tyra::prelude::*;
use tyra::router::{AddActorMessage, RoundRobinRouterFactory};

#[derive(Hash, Serialize)]
struct MessageA {}

impl ActorMessage for MessageA {}

#[derive(Hash, Serialize)]
struct Finish {}

impl ActorMessage for Finish {}

#[derive(Hash, Serialize)]
struct Start {}

impl ActorMessage for Start {}
Expand Down Expand Up @@ -134,7 +138,7 @@ impl Handler<Finish> for Aggregator {
"{} It took {:?} to finish {} actors",
self.name, duration, self.total_actors
);
self.ctx.system.stop(Duration::from_secs(60));
self.ctx.system.stop();
}
Ok(ActorResult::Ok)
}
Expand All @@ -158,7 +162,7 @@ fn main() {
let message_count = 10000000;
let actor_count = 10;

let router_factory = RoundRobinRouterFactory::new();
let router_factory = RoundRobinRouterFactory::new(true, true);
let router = actor_system
.builder()
.spawn("benchmark-router", router_factory)
Expand Down
6 changes: 4 additions & 2 deletions examples/benchmark_single_actor.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use serde::Serialize;
use std::error::Error;
use std::process::exit;
use std::time::{Duration, Instant};
use std::time::Instant;
use tyra::prelude::*;

#[derive(Hash, Serialize)]
struct MessageA {}

impl ActorMessage for MessageA {}
Expand Down Expand Up @@ -61,7 +63,7 @@ impl Handler<MessageA> for Benchmark {
);
}
if self.count == self.total_msgs {
context.system.stop(Duration::from_secs(60));
context.system.stop();
}
Ok(ActorResult::Ok)
}
Expand Down
4 changes: 3 additions & 1 deletion examples/benchmark_single_actor_process_after_send.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use serde::Serialize;
use std::error::Error;
use std::process::exit;
use std::time::{Duration, Instant};
use tyra::prelude::*;

#[derive(Hash, Serialize)]
struct MessageA {}

impl ActorMessage for MessageA {}
Expand Down Expand Up @@ -61,7 +63,7 @@ impl Handler<MessageA> for Benchmark {
);
}
if self.count == self.total_msgs {
context.system.stop(Duration::from_secs(60));
context.system.stop();
}
Ok(ActorResult::Ok)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use serde::Serialize;
use std::error::Error;
use std::process::exit;
use std::time::{Duration, Instant};
use tyra::prelude::*;

#[derive(Hash, Serialize)]
struct MessageA {}

impl ActorMessage for MessageA {}
Expand Down Expand Up @@ -61,7 +63,7 @@ impl Handler<MessageA> for Benchmark {
);
}
if self.count == self.total_msgs {
context.system.stop(Duration::from_secs(60));
context.system.stop();
}
Ok(ActorResult::Ok)
}
Expand Down
6 changes: 4 additions & 2 deletions examples/benchmark_single_actor_single_thread.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use serde::Serialize;
use std::error::Error;
use std::process::exit;
use std::time::{Duration, Instant};
use std::time::Instant;
use tyra::prelude::*;

#[derive(Hash, Serialize)]
struct MessageA {}

impl ActorMessage for MessageA {}
Expand Down Expand Up @@ -61,7 +63,7 @@ impl Handler<MessageA> for Benchmark {
);
}
if self.count == self.total_msgs {
context.system.stop(Duration::from_secs(60));
context.system.stop();
}
Ok(ActorResult::Ok)
}
Expand Down
20 changes: 20 additions & 0 deletions examples/net.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
use simple_logger::SimpleLogger;
use tyra::prelude::*;

fn main() {
SimpleLogger::new().init().unwrap();

// generate config
let mut actor_config = TyraConfig::new().unwrap();
actor_config.cluster.enabled = true;
let cluster = ThreadPoolConfig::new(22, 4, 4, 1.00);
actor_config
.thread_pool
.config
.insert(String::from("mio"), cluster);
// start system with config
let actor_system = ActorSystem::new(actor_config);


std::process::exit(actor_system.await_shutdown());
}
6 changes: 4 additions & 2 deletions examples/quickstart.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use serde::Serialize;
use std::error::Error;
use std::time::Duration;
use tyra::prelude::*;

// define an `ActorMessage` that can be sent to `Actors` that implement the corresponding `Handler<T>`

#[derive(Hash, Serialize)]
struct TestMessage {}
impl TestMessage {
pub fn new() -> Self {
Expand Down Expand Up @@ -44,7 +46,7 @@ impl Handler<TestMessage> for TestActor {
context: &ActorContext<Self>,
) -> Result<ActorResult, Box<dyn Error>> {
println!("HELLO WORLD!");
context.system.stop(Duration::from_millis(1000));
context.system.stop();
Ok(ActorResult::Ok)
}
}
Expand Down
28 changes: 12 additions & 16 deletions examples/serialize.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
use serde::{Deserialize, Serialize};
use std::error::Error;
use std::process::exit;
use std::time::{Duration, Instant};
use std::time::Duration;
use tyra::prelude::*;

#[derive(Serialize, Deserialize, PartialEq, Debug, Clone)]
#[derive(Serialize, Deserialize, Hash, Clone)]
struct TestMsg {
content: String,
actor_wrapper: ActorWrapper<RemoteActor>,
}

impl ActorMessage for TestMsg {}

#[derive(Clone)]
struct RemoteActor {}

impl Actor for RemoteActor {
Expand All @@ -24,10 +24,9 @@ impl Actor for RemoteActor {
if result.is_err() {
return Ok(ActorResult::Ok);
}
let decoded: TestMsg = result.unwrap();
context
.actor_ref
.send_after(decoded, Duration::from_millis(50))?;
let mut deserialized: TestMsg = result.unwrap();
deserialized.actor_wrapper.init_after_deserialize(&context.system);
deserialized.actor_wrapper.send_after(deserialized.clone(), Duration::from_millis(50))?;
Ok(ActorResult::Ok)
}
}
Expand All @@ -36,10 +35,11 @@ impl Handler<TestMsg> for RemoteActor {
fn handle(
&mut self,
msg: TestMsg,
_context: &ActorContext<Self>,
context: &ActorContext<Self>,
) -> Result<ActorResult, Box<dyn Error>> {
println!("{}", msg.content);
Ok(ActorResult::Ok)
context.system.stop();
Ok(ActorResult::Stop)
}
}

Expand All @@ -59,19 +59,15 @@ fn main() {
let actor_system = ActorSystem::new(actor_config);

let hw = RemoteActorFactory {};
let x = actor_system.builder().spawn("hello-world", hw).unwrap();
let remote_actor = actor_system.builder().spawn("hello-world", hw).unwrap();
let msg = TestMsg {
content: String::from("Hello World!"),
actor_wrapper: remote_actor.clone(),
};
let serialized = bincode::serialize(&msg).unwrap();
actor_system.send_to_address(x.get_address(), SerializedMessage::new(serialized));
let start = Instant::now();
actor_system.send_to_address(remote_actor.get_address(), serialized);

actor_system.stop(Duration::from_secs(10));
let result = actor_system.await_shutdown();

let duration = start.elapsed();
println!("It took {:?} to send stop", duration);

exit(result);
}
Loading