From d54b32e507f7e0eac750787462b893b5df9c352c Mon Sep 17 00:00:00 2001 From: parzivale <40600864+parzivale@users.noreply.github.com> Date: Fri, 18 Jul 2025 17:21:14 +0200 Subject: [PATCH 1/3] Feat: Marked SendFuture and SendWrapper as unsafe types Moved away from general unsafe impls to better communicate the usage style of SendFutures and how they should be used, removed unneeded Sync + Send impls from the Ai Module (Can instead implement SendWrapper) --- worker-sandbox/src/alarm.rs | 1 - worker-sandbox/src/analytics_engine.rs | 3 +-- worker-sandbox/src/assets.rs | 1 - worker-sandbox/src/auto_response.rs | 1 - worker-sandbox/src/cache.rs | 5 ----- worker-sandbox/src/counter.rs | 2 -- worker-sandbox/src/d1.rs | 14 -------------- worker-sandbox/src/durable.rs | 4 ---- worker-sandbox/src/fetch.rs | 9 --------- worker-sandbox/src/form.rs | 4 ---- worker-sandbox/src/js_snippets.rs | 2 -- worker-sandbox/src/kv.rs | 8 -------- worker-sandbox/src/put_raw.rs | 1 - worker-sandbox/src/queue.rs | 2 -- worker-sandbox/src/r2.rs | 8 -------- worker-sandbox/src/request.rs | 6 ------ worker-sandbox/src/router.rs | 9 ++++++--- worker-sandbox/src/service.rs | 2 -- worker-sandbox/src/socket.rs | 10 +++------- worker-sandbox/src/sql_counter.rs | 1 - worker-sandbox/src/sql_iterator.rs | 1 - worker-sandbox/src/ws.rs | 1 - worker/src/ai.rs | 5 +---- worker/src/crypto.rs | 2 +- worker/src/rate_limit.rs | 2 +- worker/src/send.rs | 26 ++++++++++++++++++++++++-- worker/src/websocket.rs | 2 +- 27 files changed, 38 insertions(+), 94 deletions(-) diff --git a/worker-sandbox/src/alarm.rs b/worker-sandbox/src/alarm.rs index 213dd29dd..1f06fbe6c 100644 --- a/worker-sandbox/src/alarm.rs +++ b/worker-sandbox/src/alarm.rs @@ -40,7 +40,6 @@ impl DurableObject for AlarmObject { } } -#[worker::send] pub async fn handle_alarm(_req: Request, env: Env, _data: SomeSharedData) -> Result { let namespace = env.durable_object("ALARM")?; let stub = namespace.id_from_name("alarm")?.get_stub()?; diff --git a/worker-sandbox/src/analytics_engine.rs b/worker-sandbox/src/analytics_engine.rs index cd0880d97..152d6f2f8 100644 --- a/worker-sandbox/src/analytics_engine.rs +++ b/worker-sandbox/src/analytics_engine.rs @@ -2,7 +2,6 @@ use super::SomeSharedData; use uuid::Uuid; use worker::{AnalyticsEngineDataPointBuilder, Env, Request, Response, Result}; -#[worker::send] pub async fn handle_analytics_event( req: Request, env: Env, @@ -31,5 +30,5 @@ pub async fn handle_analytics_event( .add_double(200) .write_to(&dataset)?; - return Response::ok("Events sent"); + Response::ok("Events sent") } diff --git a/worker-sandbox/src/assets.rs b/worker-sandbox/src/assets.rs index f3bedc0f7..eb69bfdc1 100644 --- a/worker-sandbox/src/assets.rs +++ b/worker-sandbox/src/assets.rs @@ -17,7 +17,6 @@ pub async fn handle_asset( } #[cfg(feature = "http")] -#[worker::send] pub async fn handle_asset( req: worker::Request, env: worker::Env, diff --git a/worker-sandbox/src/auto_response.rs b/worker-sandbox/src/auto_response.rs index 0582f3bd5..38c56211d 100644 --- a/worker-sandbox/src/auto_response.rs +++ b/worker-sandbox/src/auto_response.rs @@ -33,7 +33,6 @@ impl DurableObject for AutoResponseObject { } // Route handler to exercise the Durable Object from tests. -#[worker::send] pub async fn handle_auto_response( _req: Request, env: Env, diff --git a/worker-sandbox/src/cache.rs b/worker-sandbox/src/cache.rs index dcb73c67a..cedbca94f 100644 --- a/worker-sandbox/src/cache.rs +++ b/worker-sandbox/src/cache.rs @@ -10,7 +10,6 @@ fn key(req: &Request) -> Result> { Ok(segments.nth(2).map(ToOwned::to_owned)) } -#[worker::send] pub async fn handle_cache_example( req: Request, _env: Env, @@ -34,7 +33,6 @@ pub async fn handle_cache_example( } } -#[worker::send] pub async fn handle_cache_api_get( req: Request, _env: Env, @@ -50,7 +48,6 @@ pub async fn handle_cache_api_get( Response::error("key missing", 400) } -#[worker::send] pub async fn handle_cache_api_put( req: Request, _env: Env, @@ -69,7 +66,6 @@ pub async fn handle_cache_api_put( Response::error("key missing", 400) } -#[worker::send] pub async fn handle_cache_api_delete( req: Request, _env: Env, @@ -84,7 +80,6 @@ pub async fn handle_cache_api_delete( Response::error("key missing", 400) } -#[worker::send] pub async fn handle_cache_stream( req: Request, _env: Env, diff --git a/worker-sandbox/src/counter.rs b/worker-sandbox/src/counter.rs index 931d1c518..a99d36b25 100644 --- a/worker-sandbox/src/counter.rs +++ b/worker-sandbox/src/counter.rs @@ -91,7 +91,6 @@ impl DurableObject for Counter { } } -#[worker::send] pub async fn handle_id(req: Request, env: Env, _data: SomeSharedData) -> Result { let durable_object_name = if req.path().contains("shared") { "SHARED_COUNTER" @@ -106,7 +105,6 @@ pub async fn handle_id(req: Request, env: Env, _data: SomeSharedData) -> Result< stub.fetch_with_str("https://fake-host/").await } -#[worker::send] pub async fn handle_websocket(req: Request, env: Env, _data: SomeSharedData) -> Result { let durable_object_name = if req.path().contains("shared") { "SHARED_COUNTER" diff --git a/worker-sandbox/src/d1.rs b/worker-sandbox/src/d1.rs index de7a84d83..5b0f99e9b 100644 --- a/worker-sandbox/src/d1.rs +++ b/worker-sandbox/src/d1.rs @@ -14,7 +14,6 @@ struct Person { age: u32, } -#[worker::send] pub async fn prepared_statement( _req: Request, env: Env, @@ -68,7 +67,6 @@ pub async fn prepared_statement( Response::ok("ok") } -#[worker::send] pub async fn batch(_req: Request, env: Env, _data: SomeSharedData) -> Result { let db = env.d1("DB")?; let mut results = db @@ -93,7 +91,6 @@ pub async fn batch(_req: Request, env: Env, _data: SomeSharedData) -> Result Result { let db = env.d1("DB")?; let result = db @@ -104,14 +101,12 @@ pub async fn exec(mut req: Request, env: Env, _data: SomeSharedData) -> Result Result { let db = env.d1("DB")?; let bytes = db.dump().await?; Response::from_bytes(bytes) } -#[worker::send] pub async fn error(_req: Request, env: Env, _data: SomeSharedData) -> Result { let db = env.d1("DB")?; let error = db @@ -138,7 +133,6 @@ struct NullablePerson { age: Option, } -#[worker::send] pub async fn jsvalue_null_is_null( _req: Request, _env: Env, @@ -151,7 +145,6 @@ pub async fn jsvalue_null_is_null( Response::ok("ok") } -#[worker::send] pub async fn serialize_optional_none( _req: Request, _env: Env, @@ -167,7 +160,6 @@ pub async fn serialize_optional_none( Response::ok("ok") } -#[worker::send] pub async fn serialize_optional_some( _req: Request, _env: Env, @@ -183,7 +175,6 @@ pub async fn serialize_optional_some( Response::ok("ok") } -#[worker::send] pub async fn deserialize_optional_none( _req: Request, _env: Env, @@ -207,7 +198,6 @@ pub async fn deserialize_optional_none( Response::ok("ok") } -#[worker::send] pub async fn insert_and_retrieve_optional_none( _req: Request, env: Env, @@ -233,7 +223,6 @@ pub async fn insert_and_retrieve_optional_none( Response::ok("ok") } -#[worker::send] pub async fn insert_and_retrieve_optional_some( _req: Request, env: Env, @@ -258,7 +247,6 @@ pub async fn insert_and_retrieve_optional_some( Response::ok("ok") } -#[worker::send] pub async fn retrieve_optional_none( _req: Request, env: Env, @@ -275,7 +263,6 @@ pub async fn retrieve_optional_none( Response::ok("ok") } -#[worker::send] pub async fn retrieve_optional_some( _req: Request, env: Env, @@ -292,7 +279,6 @@ pub async fn retrieve_optional_some( Response::ok("ok") } -#[worker::send] pub async fn retrive_first_none( _req: Request, env: Env, diff --git a/worker-sandbox/src/durable.rs b/worker-sandbox/src/durable.rs index 7aa368a7b..2eadf0ba7 100644 --- a/worker-sandbox/src/durable.rs +++ b/worker-sandbox/src/durable.rs @@ -164,7 +164,6 @@ impl DurableObject for MyClass { } // Route handlers to exercise the Durable Object from tests. -#[worker::send] pub async fn handle_hello( _req: Request, env: Env, @@ -178,7 +177,6 @@ pub async fn handle_hello( .await } -#[worker::send] pub async fn handle_hello_unique( _req: Request, env: Env, @@ -192,7 +190,6 @@ pub async fn handle_hello_unique( .await } -#[worker::send] pub async fn handle_storage( _req: Request, env: Env, @@ -203,7 +200,6 @@ pub async fn handle_storage( stub.fetch_with_str("https://fake-host/storage").await } -#[worker::send] pub async fn handle_basic_test( _req: Request, env: Env, diff --git a/worker-sandbox/src/fetch.rs b/worker-sandbox/src/fetch.rs index d5c351bc6..26a584dda 100644 --- a/worker-sandbox/src/fetch.rs +++ b/worker-sandbox/src/fetch.rs @@ -7,7 +7,6 @@ use worker::{ RequestInit, Response, Result, }; -#[worker::send] pub async fn handle_fetch(_req: Request, _env: Env, _data: SomeSharedData) -> Result { let req = Request::new("https://example.com", Method::Post)?; let resp = Fetch::Request(req).send().await?; @@ -19,7 +18,6 @@ pub async fn handle_fetch(_req: Request, _env: Env, _data: SomeSharedData) -> Re )) } -#[worker::send] pub async fn handle_fetch_json( _req: Request, _env: Env, @@ -40,7 +38,6 @@ pub async fn handle_fetch_json( )) } -#[worker::send] pub async fn handle_proxy_request( req: Request, _env: Env, @@ -57,7 +54,6 @@ pub async fn handle_proxy_request( Fetch::Url(url.parse()?).send().await } -#[worker::send] pub async fn handle_request_init_fetch( _req: Request, _env: Env, @@ -69,7 +65,6 @@ pub async fn handle_request_init_fetch( .await } -#[worker::send] pub async fn handle_request_init_fetch_post( _req: Request, _env: Env, @@ -82,7 +77,6 @@ pub async fn handle_request_init_fetch_post( .await } -#[worker::send] pub async fn handle_cancelled_fetch( _req: Request, _env: Env, @@ -115,7 +109,6 @@ pub async fn handle_cancelled_fetch( Ok(res) } -#[worker::send] pub async fn handle_fetch_timeout( _req: Request, _env: Env, @@ -158,7 +151,6 @@ pub async fn handle_fetch_timeout( } } -#[worker::send] pub async fn handle_cloned_fetch( _req: Request, _env: Env, @@ -179,7 +171,6 @@ pub async fn handle_cloned_fetch( Response::ok((left == right).to_string()) } -#[worker::send] pub async fn handle_cloned_response_attributes( _req: Request, _env: Env, diff --git a/worker-sandbox/src/form.rs b/worker-sandbox/src/form.rs index 4c6ad57f1..7f88824b6 100644 --- a/worker-sandbox/src/form.rs +++ b/worker-sandbox/src/form.rs @@ -5,7 +5,6 @@ use serde::{Deserialize, Serialize}; use std::convert::TryFrom; use worker::{kv, Env, FormEntry, Request, Response, Result}; -#[worker::send] pub async fn handle_formdata_name( mut req: Request, _env: Env, @@ -48,7 +47,6 @@ struct FileSize { size: u32, } -#[worker::send] pub async fn handle_formdata_file_size( mut req: Request, env: Env, @@ -88,7 +86,6 @@ pub async fn handle_formdata_file_size( Response::error("Bad Request", 400) } -#[worker::send] pub async fn handle_formdata_file_size_hash( req: Request, env: Env, @@ -108,7 +105,6 @@ pub async fn handle_formdata_file_size_hash( Response::error("Bad Request", 400) } -#[worker::send] pub async fn handle_is_secret( mut req: Request, env: Env, diff --git a/worker-sandbox/src/js_snippets.rs b/worker-sandbox/src/js_snippets.rs index 8784c8ccf..ead05c980 100644 --- a/worker-sandbox/src/js_snippets.rs +++ b/worker-sandbox/src/js_snippets.rs @@ -15,12 +15,10 @@ extern "C" { fn js_console_log(value: String); } -#[worker::send] pub async fn performance_now(_req: Request, _env: Env, _data: SomeSharedData) -> Result { Response::ok(format!("now: {}", js_performance_now())) } -#[worker::send] pub async fn console_log(_req: Request, _env: Env, _data: SomeSharedData) -> Result { js_console_log("test".to_owned()); Response::ok("OK") diff --git a/worker-sandbox/src/kv.rs b/worker-sandbox/src/kv.rs index 55533aae6..62726db8e 100644 --- a/worker-sandbox/src/kv.rs +++ b/worker-sandbox/src/kv.rs @@ -17,7 +17,6 @@ macro_rules! kv_assert_eq { }}; } -#[worker::send] pub async fn handle_post_key_value( req: Request, env: Env, @@ -39,7 +38,6 @@ pub async fn handle_post_key_value( const TEST_NAMESPACE: &str = "TEST"; -#[worker::send] pub async fn get(_req: Request, env: Env, _data: SomeSharedData) -> Result { let store = env.kv(TEST_NAMESPACE)?; let value = store.get("simple").text().await?; @@ -49,7 +47,6 @@ pub async fn get(_req: Request, env: Env, _data: SomeSharedData) -> Result Result { let store = env.kv(TEST_NAMESPACE)?; let value = store.get("not_found").text().await?; @@ -59,7 +56,6 @@ pub async fn get_not_found(_req: Request, env: Env, _data: SomeSharedData) -> Re } } -#[worker::send] pub async fn list_keys(_req: Request, env: Env, _data: SomeSharedData) -> Result { let store = env.kv(TEST_NAMESPACE)?; let list_res = store.list().execute().await?; @@ -70,7 +66,6 @@ pub async fn list_keys(_req: Request, env: Env, _data: SomeSharedData) -> Result Response::ok("passed") } -#[worker::send] pub async fn put_simple(_req: Request, env: Env, _data: SomeSharedData) -> Result { let store = env.kv(TEST_NAMESPACE)?; store.put("put_a", "test")?.execute().await?; @@ -81,7 +76,6 @@ pub async fn put_simple(_req: Request, env: Env, _data: SomeSharedData) -> Resul Response::ok("passed") } -#[worker::send] pub async fn put_metadata(_req: Request, env: Env, _data: SomeSharedData) -> Result { let store = env.kv(TEST_NAMESPACE)?; store.put("put_b", "test")?.metadata(100)?.execute().await?; @@ -93,7 +87,6 @@ pub async fn put_metadata(_req: Request, env: Env, _data: SomeSharedData) -> Res Response::ok("passed") } -#[worker::send] pub async fn put_expiration(_req: Request, env: Env, _data: SomeSharedData) -> Result { const EXPIRATION: u64 = 2_000_000_000; let store = env.kv(TEST_NAMESPACE)?; @@ -117,7 +110,6 @@ pub async fn put_expiration(_req: Request, env: Env, _data: SomeSharedData) -> R Response::ok("passed") } -#[worker::send] pub async fn put_metadata_struct( _req: Request, env: Env, diff --git a/worker-sandbox/src/put_raw.rs b/worker-sandbox/src/put_raw.rs index 1ad869757..8309891ac 100644 --- a/worker-sandbox/src/put_raw.rs +++ b/worker-sandbox/src/put_raw.rs @@ -58,7 +58,6 @@ impl DurableObject for PutRawTestObject { } } -#[worker::send] pub(crate) async fn handle_put_raw( req: Request, env: Env, diff --git a/worker-sandbox/src/queue.rs b/worker-sandbox/src/queue.rs index 5e9da4a8a..feb1f7aa4 100644 --- a/worker-sandbox/src/queue.rs +++ b/worker-sandbox/src/queue.rs @@ -26,7 +26,6 @@ pub async fn queue(message_batch: MessageBatch, _env: Env, _ctx: Cont Ok(()) } -#[worker::send] pub async fn handle_queue_send(req: Request, env: Env, _data: SomeSharedData) -> Result { let uri = req.url()?; let mut segments = uri.path_segments().unwrap(); @@ -54,7 +53,6 @@ pub async fn handle_queue_send(req: Request, env: Env, _data: SomeSharedData) -> } } -#[worker::send] pub async fn handle_batch_send(mut req: Request, env: Env, _: SomeSharedData) -> Result { let messages: Vec = match req.json().await { Ok(messages) => messages, diff --git a/worker-sandbox/src/r2.rs b/worker-sandbox/src/r2.rs index f55c3b8e4..c076a05ac 100644 --- a/worker-sandbox/src/r2.rs +++ b/worker-sandbox/src/r2.rs @@ -31,7 +31,6 @@ pub async fn seed_bucket(bucket: &Bucket) -> Result<()> { Ok(()) } -#[worker::send] pub async fn list_empty(_req: Request, env: Env, _data: SomeSharedData) -> Result { let bucket = env.bucket("EMPTY_BUCKET")?; @@ -43,7 +42,6 @@ pub async fn list_empty(_req: Request, env: Env, _data: SomeSharedData) -> Resul Response::ok("ok") } -#[worker::send] pub async fn list(_req: Request, env: Env, _data: SomeSharedData) -> Result { let bucket = env.bucket("SEEDED_BUCKET")?; seed_bucket(&bucket).await?; @@ -96,7 +94,6 @@ pub async fn list(_req: Request, env: Env, _data: SomeSharedData) -> Result Result { let bucket = env.bucket("EMPTY_BUCKET")?; @@ -119,7 +116,6 @@ pub async fn get_empty(_req: Request, env: Env, _data: SomeSharedData) -> Result Response::ok("ok") } -#[worker::send] pub async fn get(_req: Request, env: Env, _data: SomeSharedData) -> Result { let bucket = env.bucket("SEEDED_BUCKET")?; seed_bucket(&bucket).await?; @@ -141,7 +137,6 @@ pub async fn get(_req: Request, env: Env, _data: SomeSharedData) -> Result Result { let bucket = env.bucket("PUT_BUCKET")?; @@ -175,7 +170,6 @@ pub async fn put(_req: Request, env: Env, _data: SomeSharedData) -> Result Result { let bucket = env.bucket("PUT_BUCKET")?; let (http_metadata, custom_metadata, object_with_props) = @@ -190,7 +184,6 @@ pub async fn put_properties(_req: Request, env: Env, _data: SomeSharedData) -> R } #[allow(clippy::large_stack_arrays)] -#[worker::send] pub async fn put_multipart(_req: Request, env: Env, _data: SomeSharedData) -> Result { const R2_MULTIPART_CHUNK_MIN_SIZE: usize = 5 * 1_024 * 1_024; // 5MiB. // const TEST_CHUNK_COUNT: usize = 3; @@ -246,7 +239,6 @@ pub async fn put_multipart(_req: Request, env: Env, _data: SomeSharedData) -> Re Response::ok("ok") } -#[worker::send] pub async fn delete(_req: Request, env: Env, _data: SomeSharedData) -> Result { let bucket = env.bucket("DELETE_BUCKET")?; diff --git a/worker-sandbox/src/request.rs b/worker-sandbox/src/request.rs index 6b89113bb..3428f8584 100644 --- a/worker-sandbox/src/request.rs +++ b/worker-sandbox/src/request.rs @@ -69,7 +69,6 @@ pub async fn handle_headers(req: Request, _env: Env, _data: SomeSharedData) -> R .ok("returned your headers to you.") } -#[worker::send] pub async fn handle_post_file_size( mut req: Request, _env: Env, @@ -79,7 +78,6 @@ pub async fn handle_post_file_size( Response::ok(format!("size = {}", bytes.len())) } -#[worker::send] pub async fn handle_async_text_echo( mut req: Request, _env: Env, @@ -110,7 +108,6 @@ pub async fn handle_bytes(_req: Request, _env: Env, _data: SomeSharedData) -> Re Response::from_bytes(vec![1, 2, 3, 4, 5, 6, 7]) } -#[worker::send] pub async fn handle_api_data( mut req: Request, _env: Env, @@ -181,7 +178,6 @@ pub async fn handle_now(_req: Request, _env: Env, _data: SomeSharedData) -> Resu Response::ok(js_date.to_string()) } -#[worker::send] pub async fn handle_cloned(_req: Request, _env: Env, _data: SomeSharedData) -> Result { let mut resp = Response::ok("Hello")?; let mut resp1 = resp.cloned()?; @@ -192,7 +188,6 @@ pub async fn handle_cloned(_req: Request, _env: Env, _data: SomeSharedData) -> R Response::ok((left == right).to_string()) } -#[worker::send] pub async fn handle_cloned_stream( _req: Request, _env: Env, @@ -224,7 +219,6 @@ pub async fn handle_custom_response_body( Response::from_body(ResponseBody::Body(vec![b'h', b'e', b'l', b'l', b'o'])) } -#[worker::send] pub async fn handle_wait_delay(req: Request, _env: Env, _data: SomeSharedData) -> Result { let uri = req.url()?; let mut segments = uri.path_segments().unwrap(); diff --git a/worker-sandbox/src/router.rs b/worker-sandbox/src/router.rs index 15ac088dc..b31c88f57 100644 --- a/worker-sandbox/src/router.rs +++ b/worker-sandbox/src/router.rs @@ -7,6 +7,9 @@ use crate::{ use std::convert::TryInto; use std::sync::atomic::Ordering; +#[cfg(feature = "http")] +use worker::send::SendFuture; + use worker::{console_log, Env, Fetch, Request, Response, ResponseBuilder, Result}; #[cfg(not(feature = "http"))] @@ -60,7 +63,9 @@ macro_rules! format_route ( macro_rules! handler ( ($name:path) => { |Extension(env): Extension, Extension(data): Extension, req: axum::extract::Request| async { - let resp = $name(req.try_into().expect("convert request"), env, data).await.expect("handler result"); + // SAFETY + // This can only be called from a worker context + let resp = unsafe { SendFuture::new($name(req.try_into().expect("convert request"), env, data)).await.expect("handler result") }; Into::>::into(resp) } }; @@ -252,7 +257,6 @@ async fn respond_async(req: Request, _env: Env, _data: SomeSharedData) -> Result .ok(format!("Ok (async): {}", String::from(req.method()))) } -#[worker::send] async fn handle_close_event(_req: Request, env: Env, _data: SomeSharedData) -> Result { let some_namespace_kv = env.kv("SOME_NAMESPACE")?; let got_close_event = some_namespace_kv @@ -265,7 +269,6 @@ async fn handle_close_event(_req: Request, env: Env, _data: SomeSharedData) -> R Response::ok(got_close_event) } -#[worker::send] async fn catchall(req: Request, _env: Env, _data: SomeSharedData) -> Result { let uri = req.url()?; let path = uri.path(); diff --git a/worker-sandbox/src/service.rs b/worker-sandbox/src/service.rs index 15c83f690..264669d96 100644 --- a/worker-sandbox/src/service.rs +++ b/worker-sandbox/src/service.rs @@ -3,7 +3,6 @@ use super::SomeSharedData; use std::convert::TryInto; use worker::{Env, Method, Request, RequestInit, Response, Result}; -#[worker::send] pub async fn handle_remote_by_request( req: Request, env: Env, @@ -26,7 +25,6 @@ pub async fn handle_remote_by_request( result } -#[worker::send] pub async fn handle_remote_by_path( req: Request, env: Env, diff --git a/worker-sandbox/src/socket.rs b/worker-sandbox/src/socket.rs index 81fe2983e..7687dfce9 100644 --- a/worker-sandbox/src/socket.rs +++ b/worker-sandbox/src/socket.rs @@ -3,7 +3,6 @@ use tokio::io::AsyncReadExt; use tokio::io::AsyncWriteExt; use worker::{ConnectionBuilder, Env, Error, Request, Response, Result}; -#[worker::send] pub async fn handle_socket_failed( _req: Request, _env: Env, @@ -12,16 +11,13 @@ pub async fn handle_socket_failed( let socket = ConnectionBuilder::new().connect("127.0.0.1", 25000)?; match socket.opened().await { - Ok(_) => { - return Err(Error::RustError( - "Socket should have failed to open.".to_owned(), - )) - } + Ok(_) => Err(Error::RustError( + "Socket should have failed to open.".to_owned(), + )), Err(e) => Response::ok(format!("{e:?}")), } } -#[worker::send] pub async fn handle_socket_read( _req: Request, _env: Env, diff --git a/worker-sandbox/src/sql_counter.rs b/worker-sandbox/src/sql_counter.rs index 93d92395b..c5080467e 100644 --- a/worker-sandbox/src/sql_counter.rs +++ b/worker-sandbox/src/sql_counter.rs @@ -83,7 +83,6 @@ impl SqlCounter { } } -#[worker::send] /// Route handler that proxies a request to our SqlCounter Durable Object with id derived from the /// path `/sql-counter/{name}` (so every name gets its own instance). pub async fn handle_sql_counter( diff --git a/worker-sandbox/src/sql_iterator.rs b/worker-sandbox/src/sql_iterator.rs index 4f5d1fe26..2ef020cb0 100644 --- a/worker-sandbox/src/sql_iterator.rs +++ b/worker-sandbox/src/sql_iterator.rs @@ -168,7 +168,6 @@ impl SqlIterator { } } -#[worker::send] /// Route handler for the SQL iterator test Durable Object. pub async fn handle_sql_iterator( req: Request, diff --git a/worker-sandbox/src/ws.rs b/worker-sandbox/src/ws.rs index e0c3899af..79ad22e65 100644 --- a/worker-sandbox/src/ws.rs +++ b/worker-sandbox/src/ws.rs @@ -40,7 +40,6 @@ pub async fn handle_websocket(_req: Request, env: Env, _data: SomeSharedData) -> Response::from_websocket(pair.client) } -#[worker::send] pub async fn handle_websocket_client( _req: Request, _env: Env, diff --git a/worker/src/ai.rs b/worker/src/ai.rs index 3855fcc99..89af45433 100644 --- a/worker/src/ai.rs +++ b/worker/src/ai.rs @@ -14,7 +14,7 @@ impl Ai { model: impl AsRef, input: T, ) -> Result { - let fut = SendFuture::new(JsFuture::from( + let fut = SendFuture::new_unchecked(JsFuture::from( self.0 .run(model.as_ref(), serde_wasm_bindgen::to_value(&input)?), )); @@ -25,9 +25,6 @@ impl Ai { } } -unsafe impl Sync for Ai {} -unsafe impl Send for Ai {} - impl From for Ai { fn from(inner: AiSys) -> Self { Self(inner) diff --git a/worker/src/crypto.rs b/worker/src/crypto.rs index bc2a38038..3c87b9724 100644 --- a/worker/src/crypto.rs +++ b/worker/src/crypto.rs @@ -35,7 +35,7 @@ impl DigestStream { } pub async fn digest(&self) -> Result { - let fut = SendFuture::new(JsFuture::from(self.inner.digest())); + let fut = SendFuture::new_unchecked(JsFuture::from(self.inner.digest())); let buffer: ArrayBuffer = fut.await?.unchecked_into(); Ok(Uint8Array::new(&buffer)) } diff --git a/worker/src/rate_limit.rs b/worker/src/rate_limit.rs index 4ca5d5228..5b1be4d7d 100644 --- a/worker/src/rate_limit.rs +++ b/worker/src/rate_limit.rs @@ -26,7 +26,7 @@ impl RateLimiter { pub async fn limit(&self, key: String) -> Result { let arg = serde_wasm_bindgen::to_value(&RateLimitOptions { key })?; let promise = self.0.limit(arg.into())?; - let fut = SendFuture::new(JsFuture::from(promise)); + let fut = SendFuture::new_unchecked(JsFuture::from(promise)); let result = fut.await?; let outcome = serde_wasm_bindgen::from_value(result)?; Ok(outcome) diff --git a/worker/src/send.rs b/worker/src/send.rs index 074f354b9..ad3d1b86a 100644 --- a/worker/src/send.rs +++ b/worker/src/send.rs @@ -24,7 +24,20 @@ pub struct SendFuture { } impl SendFuture { - pub fn new(inner: F) -> Self { + /// # SAFETY + /// If the future is used in the context + /// of cloudflare workers it is always safe to + /// construct as a worker is forced to be single threaded + /// + /// # WARNING + /// using this outside of a single threaded context + /// will cause undefined behavior and could lead + /// to programs crashing + pub unsafe fn new(inner: F) -> Self { + Self { inner } + } + + pub(crate) fn new_unchecked(inner: F) -> Self { Self { inner } } } @@ -53,7 +66,16 @@ unsafe impl Send for SendWrapper {} unsafe impl Sync for SendWrapper {} impl SendWrapper { - pub fn new(inner: T) -> Self { + /// # SAFETY + /// If the future is used in the context + /// of cloudflare workers it is always safe to + /// construct as a worker is forced to be single threaded + /// + /// WARNING + /// using this outside of a single threaded context + /// will cause undefined behavior and could lead + /// to programs crashing + pub unsafe fn new(inner: T) -> Self { Self(inner) } } diff --git a/worker/src/websocket.rs b/worker/src/websocket.rs index c772e537e..921acb5a0 100644 --- a/worker/src/websocket.rs +++ b/worker/src/websocket.rs @@ -461,7 +461,7 @@ async fn fetch_with_request_raw(request: crate::Request) -> Result Date: Mon, 8 Sep 2025 19:15:18 +0200 Subject: [PATCH 2/3] fix: fixed codegen to use new unsafe function --- worker-codegen/src/wit.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/worker-codegen/src/wit.rs b/worker-codegen/src/wit.rs index 50e2aa0a1..f70fbbb09 100644 --- a/worker-codegen/src/wit.rs +++ b/worker-codegen/src/wit.rs @@ -95,7 +95,7 @@ fn expand_from_impl(struct_name: &Ident, from_type: &syn::Type) -> anyhow::Resul let impl_raw = quote!( impl From<#from_type> for #struct_name { fn from(fetcher: #from_type) -> Self { - Self(::worker::send::SendWrapper::new(fetcher.into_rpc())) + Self(unsafe { ::worker::send::SendWrapper::new(fetcher.into_rpc()) }) } } ); From 3ef1b53a6f0615c348b7eb8f1c04eea5f1b0ba61 Mon Sep 17 00:00:00 2001 From: parzivale <40600864+parzivale@users.noreply.github.com> Date: Thu, 25 Sep 2025 22:20:04 +0200 Subject: [PATCH 3/3] Added unsafe marker to new code --- worker-codegen/src/wit.rs | 2 +- worker-sandbox/src/durable.rs | 2 -- worker-sandbox/src/lib.rs | 1 + worker-sandbox/src/secret_store.rs | 2 -- worker/src/secret_store.rs | 4 ++-- 5 files changed, 4 insertions(+), 7 deletions(-) diff --git a/worker-codegen/src/wit.rs b/worker-codegen/src/wit.rs index f70fbbb09..e9786af77 100644 --- a/worker-codegen/src/wit.rs +++ b/worker-codegen/src/wit.rs @@ -144,7 +144,7 @@ fn expand_rpc_impl( let method_raw = quote!( async fn #ident(&self) -> ::worker::Result<#ret_type> { let promise = #invocation_item?; - let fut = ::worker::send::SendFuture::new(::worker::wasm_bindgen_futures::JsFuture::from(promise)); + let fut = unsafe { ::worker::send::SendFuture::new(::worker::wasm_bindgen_futures::JsFuture::from(promise)) }; let output = fut.await?; Ok(::serde_wasm_bindgen::from_value(output)?) } diff --git a/worker-sandbox/src/durable.rs b/worker-sandbox/src/durable.rs index 88eff3fdc..3f8f4faf7 100644 --- a/worker-sandbox/src/durable.rs +++ b/worker-sandbox/src/durable.rs @@ -274,7 +274,6 @@ pub async fn handle_basic_test( Response::ok("ok") } -#[worker::send] pub async fn handle_get_by_name( _req: Request, env: Env, @@ -292,7 +291,6 @@ pub async fn handle_get_by_name( .await } -#[worker::send] pub async fn handle_get_by_name_with_location_hint( _req: Request, env: Env, diff --git a/worker-sandbox/src/lib.rs b/worker-sandbox/src/lib.rs index 9d6ccd51c..410d4260b 100644 --- a/worker-sandbox/src/lib.rs +++ b/worker-sandbox/src/lib.rs @@ -38,6 +38,7 @@ mod user; mod utils; mod ws; +#[allow(dead_code)] #[derive(Deserialize, Serialize)] struct MyData { message: String, diff --git a/worker-sandbox/src/secret_store.rs b/worker-sandbox/src/secret_store.rs index 9410356a2..47c3316f1 100644 --- a/worker-sandbox/src/secret_store.rs +++ b/worker-sandbox/src/secret_store.rs @@ -1,7 +1,6 @@ use crate::SomeSharedData; use worker::{Env, Request, Response, Result}; -#[worker::send] pub async fn get_from_secret_store( _req: Request, env: Env, @@ -16,7 +15,6 @@ pub async fn get_from_secret_store( } } -#[worker::send] pub async fn get_from_secret_store_missing( _req: Request, env: Env, diff --git a/worker/src/secret_store.rs b/worker/src/secret_store.rs index e5103844b..d5b3f6ae5 100644 --- a/worker/src/secret_store.rs +++ b/worker/src/secret_store.rs @@ -46,7 +46,7 @@ impl From for SecretStore { impl From for SecretStore { fn from(fetcher: Fetcher) -> Self { - Self(SendWrapper::new(fetcher.into_rpc())) + Self(unsafe { SendWrapper::new(fetcher.into_rpc()) }) } } @@ -66,7 +66,7 @@ impl SecretStore { Err(_) => return Ok(None), // Secret not found }; - let fut = SendFuture::new(JsFuture::from(promise)); + let fut = unsafe { SendFuture::new(JsFuture::from(promise)) }; let output = match fut.await { Ok(val) => val,