Skip to content

Commit c07fc31

Browse files
authored
feat: session RPC methods (#91)
1 parent a8ce9d5 commit c07fc31

File tree

11 files changed

+462
-18
lines changed

11 files changed

+462
-18
lines changed

clippy.toml

Lines changed: 0 additions & 4 deletions
This file was deleted.

examples/session.rs

Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
use {
2+
futures_util::TryStreamExt as _,
3+
relay_client::{
4+
error::ClientError,
5+
websocket::{Client, CloseFrame, ConnectionHandler, PublishedMessage},
6+
ConnectionOptions,
7+
},
8+
relay_rpc::{
9+
auth::{ed25519_dalek::SigningKey, AuthToken},
10+
domain::Topic,
11+
},
12+
std::time::Duration,
13+
structopt::StructOpt,
14+
};
15+
16+
#[derive(StructOpt)]
17+
struct Args {
18+
/// Specify WebSocket address.
19+
#[structopt(short, long, default_value = "wss://relay.walletconnect.com")]
20+
address: String,
21+
22+
/// Specify WalletConnect project ID.
23+
#[structopt(short, long, default_value = "3cbaa32f8fbf3cdcc87d27ca1fa68069")]
24+
project_id: String,
25+
}
26+
27+
struct Handler {
28+
name: &'static str,
29+
}
30+
31+
impl Handler {
32+
fn new(name: &'static str) -> Self {
33+
Self { name }
34+
}
35+
}
36+
37+
impl ConnectionHandler for Handler {
38+
fn connected(&mut self) {
39+
println!("[{}] connection open", self.name);
40+
}
41+
42+
fn disconnected(&mut self, frame: Option<CloseFrame<'static>>) {
43+
println!("[{}] connection closed: frame={frame:?}", self.name);
44+
}
45+
46+
fn message_received(&mut self, message: PublishedMessage) {
47+
println!(
48+
"[{}] inbound message: topic={} message={}",
49+
self.name, message.topic, message.message
50+
);
51+
}
52+
53+
fn inbound_error(&mut self, error: ClientError) {
54+
println!("[{}] inbound error: {error}", self.name);
55+
}
56+
57+
fn outbound_error(&mut self, error: ClientError) {
58+
println!("[{}] outbound error: {error}", self.name);
59+
}
60+
}
61+
62+
fn create_conn_opts(address: &str, project_id: &str) -> ConnectionOptions {
63+
let key = SigningKey::generate(&mut rand::thread_rng());
64+
65+
let auth = AuthToken::new("http://example.com")
66+
.aud(address)
67+
.ttl(Duration::from_secs(60 * 60))
68+
.as_jwt(&key)
69+
.unwrap();
70+
71+
ConnectionOptions::new(project_id, auth).with_address(address)
72+
}
73+
74+
#[tokio::main]
75+
async fn main() -> anyhow::Result<()> {
76+
let args = Args::from_args();
77+
78+
let app_client = Client::new(Handler::new("client1"));
79+
app_client
80+
.connect(&create_conn_opts(&args.address, &args.project_id))
81+
.await?;
82+
83+
let wallet_client = Client::new(Handler::new("client2"));
84+
wallet_client
85+
.connect(&create_conn_opts(&args.address, &args.project_id))
86+
.await?;
87+
88+
// Pre-generate topics, while the actual clients would derive them from the keys
89+
// exchanged during pairing:
90+
let pairing_topic = Topic::generate();
91+
let session_topic = Topic::generate();
92+
93+
// App proposes session:
94+
app_client
95+
.propose_session(
96+
pairing_topic.clone(),
97+
"wc_sessionPropose_req",
98+
Some("attestation".into()),
99+
)
100+
.await?;
101+
println!("[client1] proposed session: pairing_topic={pairing_topic}");
102+
103+
// Wallet scans the QR code and receives the `wc_sessionPropose` request:
104+
let msg = wallet_client
105+
.fetch_stream([pairing_topic.clone()])
106+
.try_collect::<Vec<_>>()
107+
.await?
108+
.pop()
109+
.unwrap();
110+
println!("[client2] received session proposal: {msg:?}");
111+
112+
// After user confirmation, the wallet approves this session:
113+
wallet_client
114+
.approve_session(
115+
pairing_topic.clone(),
116+
session_topic.clone(),
117+
"wc_sessionPropose_res",
118+
"wc_sessionSettle_req",
119+
)
120+
.await?;
121+
println!(
122+
"[client2] approved session: pairing_topic={pairing_topic} session_topic={session_topic}"
123+
);
124+
125+
// App receives `wc_sessionPropose` response, derives `session_topic` and
126+
// subscribes to it:
127+
app_client.subscribe(session_topic.clone()).await?;
128+
129+
tokio::time::sleep(Duration::from_millis(500)).await;
130+
131+
// App responds to the `wc_sessionSettle`:
132+
app_client
133+
.publish(
134+
session_topic.clone(),
135+
"wc_sessionSettle_res",
136+
None,
137+
1103,
138+
Duration::from_secs(300),
139+
false,
140+
)
141+
.await?;
142+
println!("[client1] published `wc_sessionSettle` response: session_topic={session_topic}");
143+
144+
tokio::time::sleep(Duration::from_millis(1000)).await;
145+
146+
drop(app_client);
147+
drop(wallet_client);
148+
149+
tokio::time::sleep(Duration::from_millis(100)).await;
150+
151+
println!("clients disconnected");
152+
153+
Ok(())
154+
}

examples/webhook.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ async fn main() -> anyhow::Result<()> {
151151
.watch_register(
152152
WatchRegisterRequest {
153153
service_url: server_url.clone(),
154-
webhook_url: format!("{}{}", server_url, SUB_WH_PATH),
154+
webhook_url: format!("{server_url}{SUB_WH_PATH}"),
155155
watch_type: rpc::WatchType::Subscriber,
156156
tags: vec![1100],
157157
statuses: vec![rpc::WatchStatus::Queued],
@@ -173,7 +173,7 @@ async fn main() -> anyhow::Result<()> {
173173
.watch_register(
174174
WatchRegisterRequest {
175175
service_url: server_url.clone(),
176-
webhook_url: format!("{}{}", server_url, PUB_WH_PATH),
176+
webhook_url: format!("{server_url}{PUB_WH_PATH}"),
177177
watch_type: rpc::WatchType::Publisher,
178178
tags: vec![1100],
179179
statuses: vec![rpc::WatchStatus::Accepted],

relay_client/src/http.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,40 @@ impl Client {
100100
})
101101
}
102102

103+
pub async fn create_topic(&self, topic: Topic) -> Response<rpc::CreateTopic> {
104+
self.request(rpc::CreateTopic { topic }).await
105+
}
106+
107+
pub async fn propose_session(
108+
&self,
109+
pairing_topic: Topic,
110+
session_proposal: impl Into<Arc<str>>,
111+
attestation: impl Into<Option<Arc<str>>>,
112+
) -> Response<rpc::ProposeSession> {
113+
self.request(rpc::ProposeSession {
114+
pairing_topic,
115+
session_proposal: session_proposal.into(),
116+
attestation: attestation.into(),
117+
})
118+
.await
119+
}
120+
121+
pub async fn approve_session(
122+
&self,
123+
pairing_topic: Topic,
124+
session_topic: Topic,
125+
pairing_response: impl Into<Arc<str>>,
126+
session_settlement_request: impl Into<Arc<str>>,
127+
) -> Response<rpc::ApproveSession> {
128+
self.request(rpc::ApproveSession {
129+
pairing_topic,
130+
session_topic,
131+
pairing_response: pairing_response.into(),
132+
session_settlement_request: session_settlement_request.into(),
133+
})
134+
.await
135+
}
136+
103137
/// Publishes a message over the network on given topic.
104138
pub async fn publish(
105139
&self,

relay_client/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@ impl ConnectionOptions {
137137

138138
let mut request = url
139139
.into_client_request()
140+
.map_err(Box::new)
140141
.map_err(WebsocketClientError::Transport)?;
141142

142143
self.update_request_headers(request.headers_mut())?;

relay_client/src/websocket.rs

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,15 @@ use {
77
relay_rpc::{
88
domain::{MessageId, SubscriptionId, Topic},
99
rpc::{
10+
ApproveSession,
1011
BatchFetchMessages,
1112
BatchReceiveMessages,
1213
BatchSubscribe,
1314
BatchSubscribeBlocking,
1415
BatchUnsubscribe,
16+
CreateTopic,
1517
FetchMessages,
18+
ProposeSession,
1619
Publish,
1720
Receipt,
1821
Subscribe,
@@ -36,7 +39,8 @@ pub use {
3639
tokio_tungstenite::tungstenite::protocol::CloseFrame,
3740
};
3841

39-
pub type TransportError = tokio_tungstenite::tungstenite::Error;
42+
pub type RawTransportError = tokio_tungstenite::tungstenite::Error;
43+
pub type TransportError = Box<RawTransportError>;
4044

4145
#[derive(Debug, thiserror::Error)]
4246
pub enum WebsocketClientError {
@@ -151,6 +155,50 @@ impl Client {
151155
Self { control_tx }
152156
}
153157

158+
pub fn create_topic(&self, topic: Topic) -> ResponseFuture<CreateTopic> {
159+
let (request, response) = create_request(CreateTopic { topic });
160+
161+
self.request(request);
162+
163+
response
164+
}
165+
166+
pub fn propose_session(
167+
&self,
168+
pairing_topic: Topic,
169+
session_proposal: impl Into<Arc<str>>,
170+
attestation: impl Into<Option<Arc<str>>>,
171+
) -> ResponseFuture<ProposeSession> {
172+
let (request, response) = create_request(ProposeSession {
173+
pairing_topic,
174+
session_proposal: session_proposal.into(),
175+
attestation: attestation.into(),
176+
});
177+
178+
self.request(request);
179+
180+
response
181+
}
182+
183+
pub fn approve_session(
184+
&self,
185+
pairing_topic: Topic,
186+
session_topic: Topic,
187+
pairing_response: impl Into<Arc<str>>,
188+
session_settlement_request: impl Into<Arc<str>>,
189+
) -> ResponseFuture<ApproveSession> {
190+
let (request, response) = create_request(ApproveSession {
191+
pairing_topic,
192+
session_topic,
193+
pairing_response: pairing_response.into(),
194+
session_settlement_request: session_settlement_request.into(),
195+
});
196+
197+
self.request(request);
198+
199+
response
200+
}
201+
154202
/// Publishes a message over the network on given topic.
155203
pub fn publish(
156204
&self,

relay_client/src/websocket/connection.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use {
33
outbound::OutboundRequest,
44
stream::{create_stream, ClientStream},
55
ConnectionHandler,
6-
TransportError,
6+
RawTransportError,
77
WebsocketClientError,
88
},
99
crate::{
@@ -123,7 +123,10 @@ impl Connection {
123123
match stream {
124124
Some(mut stream) => stream.close(None).await,
125125

126-
None => Err(WebsocketClientError::ClosingFailed(TransportError::AlreadyClosed).into()),
126+
None => Err(WebsocketClientError::ClosingFailed(Box::new(
127+
RawTransportError::AlreadyClosed,
128+
))
129+
.into()),
127130
}
128131
}
129132

relay_client/src/websocket/stream.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ pub type SocketStream = WebSocketStream<MaybeTlsStream<TcpStream>>;
4040
pub async fn create_stream(request: HttpRequest<()>) -> Result<ClientStream, WebsocketClientError> {
4141
let (socket, _) = connect_async(request)
4242
.await
43+
.map_err(Box::new)
4344
.map_err(WebsocketClientError::ConnectionFailed)?;
4445

4546
Ok(ClientStream::new(socket))
@@ -145,7 +146,7 @@ impl ClientStream {
145146
self.socket
146147
.close(frame)
147148
.await
148-
.map_err(|err| WebsocketClientError::ClosingFailed(err).into())
149+
.map_err(|err| WebsocketClientError::ClosingFailed(Box::new(err)).into())
149150
}
150151

151152
fn parse_inbound(&mut self, result: Result<Message, TransportError>) -> Option<StreamEvent> {
@@ -244,19 +245,19 @@ impl ClientStream {
244245
Poll::Ready(Ok(())) => {
245246
if let Poll::Ready(Some(next_message)) = self.outbound_rx.poll_recv(cx) {
246247
if let Err(err) = self.socket.start_send_unpin(next_message) {
247-
return Poll::Ready(Err(err));
248+
return Poll::Ready(Err(Box::new(err)));
248249
}
249250

250251
should_flush = true;
251252
} else if should_flush {
252253
// We've sent out some messages, now we need to flush.
253-
return self.socket.poll_flush_unpin(cx);
254+
return self.socket.poll_flush_unpin(cx).map_err(Box::new);
254255
} else {
255256
return Poll::Pending;
256257
}
257258
}
258259

259-
Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
260+
Poll::Ready(Err(err)) => return Poll::Ready(Err(Box::new(err))),
260261

261262
// The sink is not ready.
262263
Poll::Pending => return Poll::Pending,
@@ -276,6 +277,8 @@ impl Stream for ClientStream {
276277
while let Poll::Ready(data) = self.socket.poll_next_unpin(cx) {
277278
match data {
278279
Some(result) => {
280+
let result = result.map_err(Box::new);
281+
279282
if let Some(event) = self.parse_inbound(result) {
280283
return Poll::Ready(Some(event));
281284
}

0 commit comments

Comments
 (0)