Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 90 additions & 16 deletions src/http/client.mbt
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,25 @@
// See the License for the specific language governing permissions and
// limitations under the License.

///|
priv enum ClientTransport {
Plain(@socket.Tcp)
Proxy(Client)
}

///|
/// Simple HTTP client which connect to a remote host via TCP
struct Client {
reader : Reader
conn : @socket.Tcp
transport : ClientTransport
tls : @tls.Tls?
sender : Sender
}

///|
/// Error raised when the proxy responded with a non-2XX response code
pub suberror ProxyError Response derive(Show)

///|
/// Create a new HTTP client by connecting to a remote host.
/// If `protocol` is `Https` (`Https` by default),
Expand All @@ -36,27 +46,72 @@ struct Client {
///
/// - Host
/// - Content-Length, Transfer-Encoding
///
/// If `proxy` is present, it should be another HTTP client in a clean state.
/// The new client will send a `CONNECT` request via the proxy client
/// and try to establish a tunnel via the proxy client.
/// All subsequent requests made by the new client will go through the proxy tunnel.
/// The ownership of the proxy client is transferred to the new client,
/// so it must not be used nor closed anymore by the caller.
/// Using another HTTP client as proxy allows advanced features such as
/// proxy authentication and https `CONNECT` proxy.
pub async fn Client::connect(
host : String,
headers? : Map[String, String] = {},
protocol? : Protocol = Https,
port? : Int = protocol.default_port(),
proxy? : Client,
) -> Client {
let conn = @socket.Tcp::connect_to_host(host, port~)
let tls = match protocol {
Http => None
Https => Some(@tls.Tls::client(conn, host~))
}
let reader = match tls {
None => Reader::new(conn)
Some(tls) => Reader::new(tls)
let transport = if proxy is Some(proxy) {
let path = "\{host}:\{port}"
try {
let response = proxy..request(Connect, path).end_request()
guard response.code is (200..<300) else { raise ProxyError(response) }
proxy..skip_response_body()..entre_passthrough_mode()
} catch {
err => {
proxy.close()
raise err
}
}
Proxy(proxy)
} else {
Plain(@socket.Tcp::connect_to_host(host, port~))
}
headers["Host"] = host
let sender = match tls {
None => Sender::new(conn, headers~)
Some(tls) => Sender::new(tls, headers~)
let (tls, reader, sender) = match (protocol, transport) {
(Http, Plain(conn)) =>
(None, Reader::new(conn), Sender::new(conn, headers~))
(Https, Plain(conn)) => {
let tls = @tls.Tls::client(conn, host~) catch {
err => {
transport.close()
raise err
}
}
(Some(tls), Reader::new(tls), Sender::new(tls, headers~))
}
(Http, Proxy(proxy)) =>
(None, Reader::new(proxy), Sender::new(proxy, headers~))
(Https, Proxy(proxy)) => {
let tls = @tls.Tls::client(proxy, host~) catch {
err => {
transport.close()
raise err
}
}
(Some(tls), Reader::new(tls), Sender::new(tls, headers~))
}
}
{ transport, tls, reader, sender }
}

///|
fn ClientTransport::close(self : ClientTransport) -> Unit {
match self {
Plain(conn) => conn.close()
Proxy(proxy) => proxy.close()
}
{ conn, tls, reader, sender }
}

///|
Expand All @@ -66,7 +121,7 @@ pub fn Client::close(self : Client) -> Unit {
if self.tls is Some(tls) {
tls.close()
}
self.conn.close()
self.transport.close()
}

///|
Expand All @@ -87,13 +142,13 @@ pub impl @io.Reader for Client with _get_internal_buffer(self) {
/// Writing to `@http.Client` MAY be buffered,
/// call `flush` manually to ensure data is delivered to the remote peer.
pub impl @io.Writer for Client with write_once(self, buf, offset~, len~) {
guard self.sender.mode is SendingBody
guard not(self.sender.mode is SendingHeader)
self.sender.write_once(buf, offset~, len~)
}

///|
pub impl @io.Writer for Client with write_reader(self, reader) {
guard self.sender.mode is SendingBody
guard not(self.sender.mode is SendingHeader)
self.sender.write_reader(reader)
}

Expand Down Expand Up @@ -188,3 +243,22 @@ pub async fn Client::post(
) -> Response {
self..request(Post, path, extra_headers~)..write(body).end_request()
}

///|
/// Let the client enter "pass through" mode,
/// where the client serve as a TCP tunnel (maybe TLS encrypted) directly.
/// This is useful for the special `CONNECT` HTTP request and HTTP protocol upgrade.
///
/// In passthrough mode,
/// Read/write the client becomes direct read/write on the underlying connection,
/// and all API except `@io.Reader` and `@io.Writer` must not be used anymore.
///
/// When entering pass through mode,
/// the client must be in a clean state
/// (i.e. not in the middle of sending a request).
/// Unread data from the body of the last response will be discarded.
pub async fn Client::entre_passthrough_mode(self : Client) -> Unit {
guard self.sender.mode is SendingHeader
self.reader.enter_passthrough_mode()
self.sender.enter_passthrough_mode()
}
3 changes: 2 additions & 1 deletion src/http/moon.pkg.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
"send_wbtest.mbt": [ "native" ],
"parser_wbtest.mbt": [ "native" ],
"client_test.mbt": [ "native" ],
"request_test.mbt": [ "native" ]
"request_test.mbt": [ "native" ],
"proxy_test.mbt": [ "native" ]
},
"test-import": [
"moonbitlang/async"
Expand Down
15 changes: 14 additions & 1 deletion src/http/parser.mbt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ priv enum BodyKind {
Empty
Fixed(Int)
Chunked(Int)
PassThrough
}

///|
Expand Down Expand Up @@ -88,12 +89,17 @@ impl @io.Reader for Reader with _direct_read(self, buf, offset~, max_len~) {
self.body = Chunked(remaining)
n
}
PassThrough => self.transport._direct_read(buf, offset~, max_len~)
}
}

///|
impl @io.Reader for Reader with _get_internal_buffer(self) {
self.read_buf
if self.body is PassThrough {
self.transport._get_internal_buffer()
} else {
self.read_buf
}
}

///|
Expand Down Expand Up @@ -221,3 +227,10 @@ async fn Reader::skip_body(self : Reader) -> Unit {

}
}

///|
async fn Reader::enter_passthrough_mode(self : Reader) -> Unit {
self.skip_body()
self.read_buf.clear()
self.body = PassThrough
}
21 changes: 14 additions & 7 deletions src/http/pkg.generated.mbti
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,17 @@ import(
)

// Values
async fn get(String, headers? : Map[String, String], port? : Int, body? : &@io.Data) -> (Response, &@io.Data)
async fn get(String, headers? : Map[String, String], port? : Int, body? : &@io.Data, proxy? : Client) -> (Response, &@io.Data)

async fn get_stream(String, headers? : Map[String, String], port? : Int, body? : &@io.Data) -> (Response, Client)
async fn get_stream(String, headers? : Map[String, String], port? : Int, body? : &@io.Data, proxy? : Client) -> (Response, Client)

async fn post(String, &@io.Data, headers? : Map[String, String], port? : Int) -> (Response, &@io.Data)
async fn post(String, &@io.Data, headers? : Map[String, String], port? : Int, proxy? : Client) -> (Response, &@io.Data)

async fn post_stream(String, headers? : Map[String, String], port? : Int) -> Client
async fn post_stream(String, headers? : Map[String, String], port? : Int, proxy? : Client) -> Client

async fn put(String, &@io.Data, headers? : Map[String, String], port? : Int) -> (Response, &@io.Data)
async fn put(String, &@io.Data, headers? : Map[String, String], port? : Int, proxy? : Client) -> (Response, &@io.Data)

async fn put_stream(String, headers? : Map[String, String], port? : Int) -> Client
async fn put_stream(String, headers? : Map[String, String], port? : Int, proxy? : Client) -> Client

#deprecated
async fn run_server(@socket.Addr, async (ServerConnection, @socket.Addr) -> Unit, headers? : Map[String, String], dual_stack? : Bool, reuse_addr? : Bool, allow_failure? : Bool, max_connections? : Int) -> Unit
Expand All @@ -30,6 +30,9 @@ pub suberror HttpProtocolError {
}
impl Show for HttpProtocolError

pub suberror ProxyError Response
impl Show for ProxyError

pub suberror URIParseError {
InvalidFormat
UnsupportedProtocol(String)
Expand All @@ -39,8 +42,9 @@ impl Show for URIParseError
// Types and methods
type Client
fn Client::close(Self) -> Unit
async fn Client::connect(String, headers? : Map[String, String], protocol? : Protocol, port? : Int) -> Self
async fn Client::connect(String, headers? : Map[String, String], protocol? : Protocol, port? : Int, proxy? : Self) -> Self
async fn Client::end_request(Self) -> Response
async fn Client::entre_passthrough_mode(Self) -> Unit
async fn Client::flush(Self) -> Unit
async fn Client::get(Self, String, extra_headers? : Map[String, String], body? : &@io.Data) -> Response
async fn Client::post(Self, String, &@io.Data, extra_headers? : Map[String, String]) -> Response
Expand All @@ -61,6 +65,7 @@ pub(all) struct Request {
path : String
headers : Map[String, String]
}
impl Show for Request

pub(all) enum RequestMethod {
Get
Expand All @@ -81,6 +86,7 @@ pub(all) struct Response {
reason : String
headers : Map[String, String]
}
impl Show for Response

type Server
async fn Server::accept(Self) -> (ServerConnection, @socket.Addr)
Expand All @@ -92,6 +98,7 @@ async fn Server::run_forever(Self, async (Request, &@io.Reader, ServerConnection
type ServerConnection
fn ServerConnection::close(Self) -> Unit
async fn ServerConnection::end_response(Self) -> Unit
async fn ServerConnection::entre_passthrough_mode(Self) -> Unit
async fn ServerConnection::flush(Self) -> Unit
fn ServerConnection::new(@socket.Tcp, headers? : Map[String, String]) -> Self
async fn ServerConnection::read_request(Self) -> Request
Expand Down
Loading