|  | 
|  | 1 | +//! Serves an HTTP health server. | 
|  | 2 | +//! | 
|  | 3 | +//! * `GET /ready` -- returns 200 when the proxy is ready to participate in meshed | 
|  | 4 | +//!   traffic. | 
|  | 5 | +//! * `GET /live` -- returns 200 when the proxy is live. | 
|  | 6 | +
 | 
|  | 7 | +use futures::future; | 
|  | 8 | +use http::StatusCode; | 
|  | 9 | +use hyper::{ | 
|  | 10 | +    body::{Body, HttpBody}, | 
|  | 11 | +    Request, Response, | 
|  | 12 | +}; | 
|  | 13 | +use linkerd_app_core::Error; | 
|  | 14 | +use std::{ | 
|  | 15 | +    future::Future, | 
|  | 16 | +    pin::Pin, | 
|  | 17 | +    task::{Context, Poll}, | 
|  | 18 | +}; | 
|  | 19 | + | 
|  | 20 | +mod readiness; | 
|  | 21 | + | 
|  | 22 | +pub use self::readiness::{Latch, Readiness}; | 
|  | 23 | + | 
|  | 24 | +#[derive(Clone)] | 
|  | 25 | +pub struct Health { | 
|  | 26 | +    ready: Readiness, | 
|  | 27 | +} | 
|  | 28 | + | 
|  | 29 | +pub type ResponseFuture = | 
|  | 30 | +    Pin<Box<dyn Future<Output = Result<Response<Body>, Error>> + Send + 'static>>; | 
|  | 31 | + | 
|  | 32 | +impl Health { | 
|  | 33 | +    pub fn new(ready: Readiness) -> Self { | 
|  | 34 | +        Self { ready } | 
|  | 35 | +    } | 
|  | 36 | + | 
|  | 37 | +    fn ready_rsp(&self) -> Response<Body> { | 
|  | 38 | +        if self.ready.is_ready() { | 
|  | 39 | +            Response::builder() | 
|  | 40 | +                .status(StatusCode::OK) | 
|  | 41 | +                .header(http::header::CONTENT_TYPE, "text/plain") | 
|  | 42 | +                .body("ready\n".into()) | 
|  | 43 | +                .expect("builder with known status code must not fail") | 
|  | 44 | +        } else { | 
|  | 45 | +            Response::builder() | 
|  | 46 | +                .status(StatusCode::SERVICE_UNAVAILABLE) | 
|  | 47 | +                .body("not ready\n".into()) | 
|  | 48 | +                .expect("builder with known status code must not fail") | 
|  | 49 | +        } | 
|  | 50 | +    } | 
|  | 51 | + | 
|  | 52 | +    fn live_rsp() -> Response<Body> { | 
|  | 53 | +        Response::builder() | 
|  | 54 | +            .status(StatusCode::OK) | 
|  | 55 | +            .header(http::header::CONTENT_TYPE, "text/plain") | 
|  | 56 | +            .body("live\n".into()) | 
|  | 57 | +            .expect("builder with known status code must not fail") | 
|  | 58 | +    } | 
|  | 59 | + | 
|  | 60 | +    fn not_found() -> Response<Body> { | 
|  | 61 | +        Response::builder() | 
|  | 62 | +            .status(http::StatusCode::NOT_FOUND) | 
|  | 63 | +            .body(Body::empty()) | 
|  | 64 | +            .expect("builder with known status code must not fail") | 
|  | 65 | +    } | 
|  | 66 | +} | 
|  | 67 | + | 
|  | 68 | +impl<B> tower::Service<http::Request<B>> for Health | 
|  | 69 | +where | 
|  | 70 | +    B: HttpBody + Send + Sync + 'static, | 
|  | 71 | +    B::Error: Into<Error>, | 
|  | 72 | +    B::Data: Send, | 
|  | 73 | +{ | 
|  | 74 | +    type Response = http::Response<Body>; | 
|  | 75 | +    type Error = Error; | 
|  | 76 | +    type Future = ResponseFuture; | 
|  | 77 | + | 
|  | 78 | +    fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { | 
|  | 79 | +        Poll::Ready(Ok(())) | 
|  | 80 | +    } | 
|  | 81 | + | 
|  | 82 | +    fn call(&mut self, req: Request<B>) -> Self::Future { | 
|  | 83 | +        match req.uri().path() { | 
|  | 84 | +            "/live" => Box::pin(future::ok(Self::live_rsp())), | 
|  | 85 | +            "/ready" => Box::pin(future::ok(self.ready_rsp())), | 
|  | 86 | +            _ => Box::pin(future::ok(Self::not_found())), | 
|  | 87 | +        } | 
|  | 88 | +    } | 
|  | 89 | +} | 
|  | 90 | + | 
|  | 91 | +#[cfg(test)] | 
|  | 92 | +mod tests { | 
|  | 93 | +    use super::*; | 
|  | 94 | +    use http::method::Method; | 
|  | 95 | +    use std::time::Duration; | 
|  | 96 | +    use tokio::time::timeout; | 
|  | 97 | +    use tower::util::ServiceExt; | 
|  | 98 | + | 
|  | 99 | +    const TIMEOUT: Duration = Duration::from_secs(1); | 
|  | 100 | + | 
|  | 101 | +    #[tokio::test] | 
|  | 102 | +    async fn ready_when_latches_dropped() { | 
|  | 103 | +        let (r, l0) = Readiness::new(); | 
|  | 104 | +        let l1 = l0.clone(); | 
|  | 105 | + | 
|  | 106 | +        let health = Health::new(r); | 
|  | 107 | +        macro_rules! call { | 
|  | 108 | +            () => {{ | 
|  | 109 | +                let r = Request::builder() | 
|  | 110 | +                    .method(Method::GET) | 
|  | 111 | +                    .uri("http://0.0.0.0/ready") | 
|  | 112 | +                    .body(Body::empty()) | 
|  | 113 | +                    .unwrap(); | 
|  | 114 | +                let f = health.clone().oneshot(r); | 
|  | 115 | +                timeout(TIMEOUT, f).await.expect("timeout").expect("call") | 
|  | 116 | +            }}; | 
|  | 117 | +        } | 
|  | 118 | + | 
|  | 119 | +        assert_eq!(call!().status(), StatusCode::SERVICE_UNAVAILABLE); | 
|  | 120 | + | 
|  | 121 | +        drop(l0); | 
|  | 122 | +        assert_eq!(call!().status(), StatusCode::SERVICE_UNAVAILABLE); | 
|  | 123 | + | 
|  | 124 | +        drop(l1); | 
|  | 125 | +        assert_eq!(call!().status(), StatusCode::OK); | 
|  | 126 | +    } | 
|  | 127 | +} | 
0 commit comments