Skip to content

Commit bdf2639

Browse files
authored
Merge pull request #740 from http-rs/bind
Add `Server::bind`
2 parents 09a53f0 + 6daa700 commit bdf2639

File tree

10 files changed

+578
-246
lines changed

10 files changed

+578
-246
lines changed

src/listener/concurrent_listener.rs

Lines changed: 38 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::listener::{Listener, ToListener};
1+
use crate::listener::{ListenInfo, Listener, ToListener};
22
use crate::Server;
33

44
use std::fmt::{self, Debug, Display, Formatter};
@@ -33,12 +33,14 @@ use futures_util::stream::{futures_unordered::FuturesUnordered, StreamExt};
3333
///```
3434
3535
#[derive(Default)]
36-
pub struct ConcurrentListener<State>(Vec<Box<dyn Listener<State>>>);
36+
pub struct ConcurrentListener<State> {
37+
listeners: Vec<Box<dyn Listener<State>>>,
38+
}
3739

3840
impl<State: Clone + Send + Sync + 'static> ConcurrentListener<State> {
3941
/// creates a new ConcurrentListener
4042
pub fn new() -> Self {
41-
Self(vec![])
43+
Self { listeners: vec![] }
4244
}
4345

4446
/// Adds any [`ToListener`](crate::listener::ToListener) to this
@@ -55,8 +57,11 @@ impl<State: Clone + Send + Sync + 'static> ConcurrentListener<State> {
5557
/// # std::mem::drop(tide::new().listen(listener)); // for the State generic
5658
/// # Ok(()) }
5759
/// ```
58-
pub fn add<TL: ToListener<State>>(&mut self, listener: TL) -> io::Result<()> {
59-
self.0.push(Box::new(listener.to_listener()?));
60+
pub fn add<L>(&mut self, listener: L) -> io::Result<()>
61+
where
62+
L: ToListener<State>,
63+
{
64+
self.listeners.push(Box::new(listener.to_listener()?));
6065
Ok(())
6166
}
6267

@@ -71,39 +76,59 @@ impl<State: Clone + Send + Sync + 'static> ConcurrentListener<State> {
7176
/// .with_listener(async_std::net::TcpListener::bind("127.0.0.1:8081").await?),
7277
/// ).await?;
7378
/// # Ok(()) }) }
74-
pub fn with_listener<TL: ToListener<State>>(mut self, listener: TL) -> Self {
79+
pub fn with_listener<L>(mut self, listener: L) -> Self
80+
where
81+
L: ToListener<State>,
82+
{
7583
self.add(listener).expect("Unable to add listener");
7684
self
7785
}
7886
}
7987

8088
#[async_trait::async_trait]
81-
impl<State: Clone + Send + Sync + 'static> Listener<State> for ConcurrentListener<State> {
82-
async fn listen(&mut self, app: Server<State>) -> io::Result<()> {
89+
impl<State> Listener<State> for ConcurrentListener<State>
90+
where
91+
State: Clone + Send + Sync + 'static,
92+
{
93+
async fn bind(&mut self, app: Server<State>) -> io::Result<()> {
94+
for listener in self.listeners.iter_mut() {
95+
listener.bind(app.clone()).await?;
96+
}
97+
Ok(())
98+
}
99+
100+
async fn accept(&mut self) -> io::Result<()> {
83101
let mut futures_unordered = FuturesUnordered::new();
84102

85-
for listener in self.0.iter_mut() {
86-
let app = app.clone();
87-
futures_unordered.push(listener.listen(app));
103+
for listener in self.listeners.iter_mut() {
104+
futures_unordered.push(listener.accept());
88105
}
89106

90107
while let Some(result) = futures_unordered.next().await {
91108
result?;
92109
}
93110
Ok(())
94111
}
112+
113+
fn info(&self) -> Vec<ListenInfo> {
114+
self.listeners
115+
.iter()
116+
.map(|listener| listener.info().into_iter())
117+
.flatten()
118+
.collect()
119+
}
95120
}
96121

97122
impl<State> Debug for ConcurrentListener<State> {
98123
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
99-
write!(f, "{:?}", self.0)
124+
write!(f, "{:?}", self.listeners)
100125
}
101126
}
102127

103128
impl<State> Display for ConcurrentListener<State> {
104129
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
105130
let string = self
106-
.0
131+
.listeners
107132
.iter()
108133
.map(|l| l.to_string())
109134
.collect::<Vec<_>>()

src/listener/failover_listener.rs

Lines changed: 66 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ use std::fmt::{self, Debug, Display, Formatter};
55

66
use async_std::io;
77

8+
use crate::listener::ListenInfo;
9+
810
/// FailoverListener allows tide to attempt to listen in a sequential
911
/// order to any number of ports/addresses. The first successful
1012
/// listener is used.
@@ -31,14 +33,22 @@ use async_std::io;
3133
/// })
3234
///}
3335
///```
34-
3536
#[derive(Default)]
36-
pub struct FailoverListener<State>(Vec<Box<dyn Listener<State>>>);
37+
pub struct FailoverListener<State> {
38+
listeners: Vec<Option<Box<dyn Listener<State>>>>,
39+
index: Option<usize>,
40+
}
3741

38-
impl<State: Clone + Send + Sync + 'static> FailoverListener<State> {
42+
impl<State> FailoverListener<State>
43+
where
44+
State: Clone + Send + Sync + 'static,
45+
{
3946
/// creates a new FailoverListener
4047
pub fn new() -> Self {
41-
Self(vec![])
48+
Self {
49+
listeners: vec![],
50+
index: None,
51+
}
4252
}
4353

4454
/// Adds any [`ToListener`](crate::listener::ToListener) to this
@@ -57,8 +67,11 @@ impl<State: Clone + Send + Sync + 'static> FailoverListener<State> {
5767
/// # std::mem::drop(tide::new().listen(listener)); // for the State generic
5868
/// # Ok(()) }
5969
/// ```
60-
pub fn add<TL: ToListener<State>>(&mut self, listener: TL) -> io::Result<()> {
61-
self.0.push(Box::new(listener.to_listener()?));
70+
pub fn add<L>(&mut self, listener: L) -> io::Result<()>
71+
where
72+
L: ToListener<State>,
73+
{
74+
self.listeners.push(Some(Box::new(listener.to_listener()?)));
6275
Ok(())
6376
}
6477

@@ -73,21 +86,30 @@ impl<State: Clone + Send + Sync + 'static> FailoverListener<State> {
7386
/// .with_listener(("localhost", 8081)),
7487
/// ).await?;
7588
/// # Ok(()) }) }
76-
pub fn with_listener<TL: ToListener<State>>(mut self, listener: TL) -> Self {
89+
pub fn with_listener<L>(mut self, listener: L) -> Self
90+
where
91+
L: ToListener<State>,
92+
{
7793
self.add(listener).expect("Unable to add listener");
7894
self
7995
}
8096
}
8197

8298
#[async_trait::async_trait]
83-
impl<State: Clone + Send + Sync + 'static> Listener<State> for FailoverListener<State> {
84-
async fn listen(&mut self, app: Server<State>) -> io::Result<()> {
85-
for listener in self.0.iter_mut() {
86-
let app = app.clone();
87-
match listener.listen(app).await {
88-
Ok(_) => return Ok(()),
99+
impl<State> Listener<State> for FailoverListener<State>
100+
where
101+
State: Clone + Send + Sync + 'static,
102+
{
103+
async fn bind(&mut self, app: Server<State>) -> io::Result<()> {
104+
for (index, listener) in self.listeners.iter_mut().enumerate() {
105+
let listener = listener.as_deref_mut().expect("bind called twice");
106+
match listener.bind(app.clone()).await {
107+
Ok(_) => {
108+
self.index = Some(index);
109+
return Ok(());
110+
}
89111
Err(e) => {
90-
crate::log::info!("unable to listen", {
112+
crate::log::info!("unable to bind", {
91113
listener: listener.to_string(),
92114
error: e.to_string()
93115
});
@@ -100,20 +122,47 @@ impl<State: Clone + Send + Sync + 'static> Listener<State> for FailoverListener<
100122
"unable to bind to any supplied listener spec",
101123
))
102124
}
125+
126+
async fn accept(&mut self) -> io::Result<()> {
127+
match self.index {
128+
Some(index) => {
129+
let mut listener = self.listeners[index].take().expect("accept called twice");
130+
listener.accept().await?;
131+
Ok(())
132+
}
133+
None => Err(io::Error::new(
134+
io::ErrorKind::AddrNotAvailable,
135+
"unable to listen to any supplied listener spec",
136+
)),
137+
}
138+
}
139+
140+
fn info(&self) -> Vec<ListenInfo> {
141+
match self.index {
142+
Some(index) => match self.listeners.get(index) {
143+
Some(Some(listener)) => listener.info(),
144+
_ => vec![],
145+
},
146+
None => vec![],
147+
}
148+
}
103149
}
104150

105151
impl<State> Debug for FailoverListener<State> {
106152
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
107-
write!(f, "{:?}", self.0)
153+
write!(f, "{:?}", self.listeners)
108154
}
109155
}
110156

111157
impl<State> Display for FailoverListener<State> {
112158
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
113159
let string = self
114-
.0
160+
.listeners
115161
.iter()
116-
.map(|l| l.to_string())
162+
.map(|l| match l {
163+
Some(l) => l.to_string(),
164+
None => String::new(),
165+
})
117166
.collect::<Vec<_>>()
118167
.join(", ");
119168

src/listener/mod.rs

Lines changed: 90 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,12 @@ mod to_listener_impls;
1212
#[cfg(all(unix, feature = "h1-server"))]
1313
mod unix_listener;
1414

15-
use crate::Server;
15+
use std::fmt::{Debug, Display};
16+
1617
use async_std::io;
18+
use async_trait::async_trait;
19+
20+
use crate::Server;
1721

1822
pub use concurrent_listener::ConcurrentListener;
1923
pub use failover_listener::FailoverListener;
@@ -26,18 +30,46 @@ pub(crate) use tcp_listener::TcpListener;
2630
#[cfg(all(unix, feature = "h1-server"))]
2731
pub(crate) use unix_listener::UnixListener;
2832

29-
/// The Listener trait represents an implementation of http transport
30-
/// for a tide application. In order to provide a Listener to tide,
31-
/// you will also need to implement at least one [`ToListener`](crate::listener::ToListener) that
33+
/// The Listener trait represents an implementation of http transport for a tide
34+
/// application. In order to provide a Listener to tide, you will also need to
35+
/// implement at least one [`ToListener`](crate::listener::ToListener) that
3236
/// outputs your Listener type.
33-
#[async_trait::async_trait]
34-
pub trait Listener<State: 'static>:
35-
std::fmt::Debug + std::fmt::Display + Send + Sync + 'static
37+
#[async_trait]
38+
pub trait Listener<State>: Debug + Display + Send + Sync + 'static
39+
where
40+
State: Send + Sync + 'static,
41+
{
42+
/// Bind the listener. This starts the listening process by opening the
43+
/// necessary network ports, but not yet accepting incoming connections. This
44+
/// method must be called before `accept`.
45+
async fn bind(&mut self, app: Server<State>) -> io::Result<()>;
46+
47+
/// Start accepting incoming connections. This method must be called only
48+
/// after `bind` has succeeded.
49+
async fn accept(&mut self) -> io::Result<()>;
50+
51+
/// Expose information about the connection. This should always return valid
52+
/// data after `bind` has succeeded.
53+
fn info(&self) -> Vec<ListenInfo>;
54+
}
55+
56+
#[async_trait]
57+
impl<L, State> Listener<State> for Box<L>
58+
where
59+
L: Listener<State>,
60+
State: Send + Sync + 'static,
3661
{
37-
/// This is the primary entrypoint for the Listener trait. listen
38-
/// is called exactly once, and is expected to spawn tasks for
39-
/// each incoming connection.
40-
async fn listen(&mut self, app: Server<State>) -> io::Result<()>;
62+
async fn bind(&mut self, app: Server<State>) -> io::Result<()> {
63+
self.as_mut().bind(app).await
64+
}
65+
66+
async fn accept(&mut self) -> io::Result<()> {
67+
self.as_mut().accept().await
68+
}
69+
70+
fn info(&self) -> Vec<ListenInfo> {
71+
self.as_ref().info()
72+
}
4173
}
4274

4375
/// crate-internal shared logic used by tcp and unix listeners to
@@ -52,3 +84,50 @@ pub(crate) fn is_transient_error(e: &io::Error) -> bool {
5284
ConnectionRefused | ConnectionAborted | ConnectionReset
5385
)
5486
}
87+
88+
/// Information about the `Listener`.
89+
///
90+
/// See [`Report`](../listener/trait.Report.html) for more.
91+
#[derive(Debug, Clone)]
92+
pub struct ListenInfo {
93+
conn_string: String,
94+
transport: String,
95+
tls: bool,
96+
}
97+
98+
impl ListenInfo {
99+
/// Create a new instance of `ListenInfo`.
100+
///
101+
/// This method should only be called when implementing a new Tide `listener`
102+
/// strategy.
103+
pub fn new(conn_string: String, transport: String, tls: bool) -> Self {
104+
Self {
105+
conn_string,
106+
transport,
107+
tls,
108+
}
109+
}
110+
111+
/// Get the connection string.
112+
pub fn connection(&self) -> &str {
113+
self.conn_string.as_str()
114+
}
115+
116+
/// The underlying transport this connection listens on.
117+
///
118+
/// Examples are: "tcp", "uds", etc.
119+
pub fn transport(&self) -> &str {
120+
self.transport.as_str()
121+
}
122+
123+
/// Is the connection encrypted?
124+
pub fn is_encrypted(&self) -> bool {
125+
self.tls
126+
}
127+
}
128+
129+
impl Display for ListenInfo {
130+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
131+
write!(f, "{}", self.conn_string)
132+
}
133+
}

0 commit comments

Comments
 (0)