Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 26 additions & 30 deletions examples/http_file_server/main.mbt
Original file line number Diff line number Diff line change
Expand Up @@ -98,42 +98,37 @@ async fn serve_404(conn : @http.ServerConnection) -> Unit {
}

///|
pub async fn server(
pub async fn server_main(
server : @http.Server,
path~ : String,
port~ : Int,
log? : (String) -> Unit = println,
) -> Unit {
let base_path = path
@http.run_server(@socket.Addr::parse("0.0.0.0:\{port}"), fn(conn, addr) {
log("received new connection from \{addr}")
defer log("closing connection from \{addr}")
for {
let request = conn.read_request()
let (path, download_zip) = match request.path {
[.. path, .. "?download_zip"] => (path.to_string(), true)
path => (path, false)
}
log("serving \{path}")
guard request.meth is Get else { return }
let file = @fs.open(base_path + path, mode=ReadOnly) catch {
_ => {
serve_404(conn)
continue
}
server.run_forever((request, _body, conn) => {
let (path, download_zip) = match request.path {
[.. path, .. "?download_zip"] => (path.to_string(), true)
path => (path, false)
}
log("serving \{path}")
guard request.meth is Get else { return }
let file = @fs.open(base_path + path, mode=ReadOnly) catch {
_ => {
serve_404(conn)
return
}
if file.kind() is Directory {
if download_zip {
file.close()
serve_zip(conn, base_path + path)
} else {
let dir = file.as_dir()
defer dir.close()
serve_directory(conn, dir, path~)
}
}
if file.kind() is Directory {
if download_zip {
file.close()
serve_zip(conn, base_path + path)
} else {
defer file.close()
serve_file(conn, file, path~)
let dir = file.as_dir()
defer dir.close()
serve_directory(conn, dir, path~)
}
} else {
defer file.close()
serve_file(conn, file, path~)
}
})
}
Expand All @@ -144,7 +139,8 @@ async fn main {
[] | [_] => "."
[_, path, ..] => path
}
server(path~, port=8000) catch {
let server = @http.Server::new(@socket.Addr::parse("0.0.0.0:8000"))
server_main(server, path~) catch {
err => println("server terminate due to \{err}")
}
}
17 changes: 9 additions & 8 deletions examples/http_file_server/server_test.mbt
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,14 @@ fn output_response(response : @http.Response, logger : &Logger) -> Unit {
}
}

///|
let port = 4207

///|
async fn client(
addr : @socket.Addr,
path : String,
logger : &Logger,
handle_response? : async (@http.Client) -> Unit,
) -> Unit {
let conn = @http.Client::connect("localhost", protocol=Http, port~)
let conn = @http.Client::connect("localhost", protocol=Http, port=addr.port())
defer conn.close()
let response = conn.get(path)
output_response(response, logger)
Expand All @@ -47,24 +45,27 @@ async test "basic" {

let client_log = StringBuilder::new()
@async.with_task_group(fn(root) {
root.spawn_bg(no_wait=true, () => @http_file_server.server(
let server = @http.Server::new(@socket.Addr::parse("127.0.0.1:0"))
let addr = server.addr()
root.spawn_bg(no_wait=true, () => @http_file_server.server_main(
server,
path=".",
port~,
log~,
))
@async.sleep(50)
client_log.write_string("client: GET /examples/http_file_server\n")
client("/examples/http_file_server", client_log)
client(addr, "/examples/http_file_server", client_log)
client_log.write_string("\n\n")
client_log.write_string(
"client: GET /examples/http_file_server/moon.pkg.json\n",
)
client("/examples/http_file_server/moon.pkg.json", client_log)
client(addr, "/examples/http_file_server/moon.pkg.json", client_log)
client_log.write_string("\n\n")
let tmp_file_name = "examples/test.zip"
let tmp_file = @fs.create(tmp_file_name, permission=0o644)
root.add_defer(() => @fs.remove(tmp_file_name))
client(
addr,
"/examples/http_file_server?download_zip",
client_log,
handle_response=tmp_file.write_reader(_),
Expand Down
13 changes: 5 additions & 8 deletions examples/http_server_benchmark/http_server_benchmark.mbt
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,10 @@ let port : Int = 3001

///|
async fn main {
@http.run_server(@socket.Addr::parse("0.0.0.0:\{port}"), fn(conn, _) {
for {
let request = conn.read_request()
match (request.meth, request.path) {
(Get, "/") => conn..send_response(200, "OK")..end_response()
_ => conn..send_response(404, "NotFound")..end_response()
}
}
let server = @http.Server::new(@socket.Addr::parse("0.0.0.0:\{port}"))
server.run_forever((request, _body, conn) => match
(request.meth, request.path) {
(Get, "/") => conn.send_response(200, "OK")
_ => conn.send_response(404, "NotFound")
})
}
42 changes: 17 additions & 25 deletions src/http/README.mbt.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,38 +54,30 @@ async test {
```

## Writing HTTP servers
The `@http.ServerConnection` type provides abstraction for a connection in a HTTP server.
It can be created via `@http.ServerConnection::new(tcp_connection)`.
The workflow of processing a request via `@http.ServerConnection` is:
The recommended way to create HTTP servers is `@http.Server::run_forever(..)`.
A HTTP server should first get created via `@http.Server::new(..)`,
after that, `server.run_forever(f)` automatically start and run the server.
The callback function `f` is used to handle HTTP requests.
It receives three parameters:

1. use `server.read_request()` to wait for incoming request
and obtain the header of the request
1. read the request body by usign `@http.ServerConnection` as a `@io.Reader`.
or use `server.read_all()` to obtain the whole request body.
Yon can also ignore the body via `server.skip_request_body()`
1. use `server.send_response` to initiate a response and send the response header
1. send response body by using `@http.ServerConnection` as a `@io.Writer`
1. call `server.end_response()` to complete the response
- the request to process
- a `&@io.Reader` that can be used to read request body
- a `@http.ServerConnection` that can be used to send response.
The procedure to send a response is:
1. initiate a response and send response header via `.send_response()`
2. send response body by using the `@http.ServerConnection` as `@io.Writer`
3. (optional) complete the response via `.end_response()`.
If `.end_response()` is not called, it will be called automatically
after `f` returns.

The `@http` package also provides a helper `@http.run_server`
for setting up and running a HTTP server directly.
It accepts a callback function for handling connection,
the callback will receive a `@http.ServerConnection` and the address of client.
Here's an example server that returns 404 to every request:

```moonbit
///|
#cfg(target="native")
pub async fn server(listen_addr : @socket.Addr) -> Unit {
@http.run_server(listen_addr, fn(conn, _) {
for {
let request = conn.read_request()
conn.skip_request_body()
conn
..send_response(404, "NotFound")
..write("`\{request.path}` not found")
..end_response()
}
})
@http.Server::new(listen_addr).run_forever((request, _body, conn) => conn
..send_response(404, "NotFound")
..write("`\{request.path}` not found"))
}
```
15 changes: 7 additions & 8 deletions src/http/client_test.mbt
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,12 @@
// limitations under the License.

///|
async fn test_server(port : Int, log : &Logger) -> Unit {
let addr = @socket.Addr::parse("127.0.0.1:\{port}")
@http.run_server(addr, (conn, _) => for {
let request = conn.read_request()
async fn test_server(server : @http.Server, log : &Logger) -> Unit {
server.run_forever((request, body, conn) => {
log.write_string(
"server received request: \{request.meth} \{request.path}\n",
)
while conn.read_some() is Some(data) {
while body.read_some() is Some(data) {
let data = @encoding/utf8.decode(data)
log.write_string("server received: \{data}")
}
Expand All @@ -29,16 +27,17 @@ async fn test_server(port : Int, log : &Logger) -> Unit {
conn..write("\{request.meth} response part 1\n")..flush()
@async.sleep(100)
log.write_string("server: sending \{request.meth} response part 2\n")
conn..write("\{request.meth} response part 2\n")..end_response()
conn..write("\{request.meth} response part 2\n")
})
}

///|
async test "request streaming" {
let port = 4213
let log = StringBuilder::new()
@async.with_task_group(group => {
group.spawn_bg(no_wait=true, () => test_server(port, log))
let server = @http.Server::new(@socket.Addr::parse("127.0.0.1:0"))
let port = server.addr().port()
group.spawn_bg(no_wait=true, () => test_server(server, log))
async fn fetch_response(client : @http.Client) {
while client.read_some() is Some(data) {
let data = @encoding/utf8.decode(data)
Expand Down
8 changes: 8 additions & 0 deletions src/http/pkg.generated.mbti
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ async fn put(String, &@io.Data, headers? : Map[String, String], port? : Int) ->

async fn put_stream(String, headers? : Map[String, String], port? : Int) -> Client

#deprecated
async fn run_server(@socket.Addr, async (ServerConnection, @socket.Addr) -> Unit, headers? : Map[String, String], dual_stack? : Bool, reuse_addr? : Bool, allow_failure? : Bool, max_connections? : Int) -> Unit

// Errors
Expand Down Expand Up @@ -81,6 +82,13 @@ pub(all) struct Response {
headers : Map[String, String]
}

type Server
async fn Server::accept(Self) -> (ServerConnection, @socket.Addr)
fn Server::addr(Self) -> @socket.Addr
fn Server::close(Self) -> Unit
fn Server::new(@socket.Addr, dual_stack? : Bool, reuse_addr? : Bool, headers? : Map[String, String]) -> Self raise
async fn Server::run_forever(Self, async (Request, &@io.Reader, ServerConnection) -> Unit, allow_failure? : Bool, max_connections? : Int) -> Unit

type ServerConnection
fn ServerConnection::close(Self) -> Unit
async fn ServerConnection::end_response(Self) -> Unit
Expand Down
90 changes: 90 additions & 0 deletions src/http/server.mbt
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ pub async fn ServerConnection::send_response(
///
/// The meaning of `dual_stack` and `reuse_addr` is the same as `@socket.TcpServer::new`,
/// see its document for more details.
#deprecated("use `@http.Server::run_forever()` instead")
pub async fn run_server(
addr : @socket.Addr,
f : async (ServerConnection, @socket.Addr) -> Unit,
Expand All @@ -166,3 +167,92 @@ pub async fn run_server(
},
)
}

///|
/// A HTTP server
struct Server {
server : @socket.TcpServer
headers : Map[String, String]
}

///|
/// Create a new HTTP server listening on `addr`.
///
/// The meaning of `dual_stack` and `reuse_addr` is the same as
/// `@socket.TcpServer::new()`, see there for more details.
///
/// `headers` can be used to specify common headers
/// shared by all responses sent by this server.
pub fn Server::new(
addr : @socket.Addr,
dual_stack? : Bool,
reuse_addr? : Bool,
headers? : Map[String, String] = {},
) -> Server raise {
{ server: @socket.TcpServer::new(addr, dual_stack?, reuse_addr?), headers }
}

///|
pub fn Server::close(self : Server) -> Unit {
self.server.close()
}

///|
/// Get the listen address of the server
pub fn Server::addr(self : Server) -> @socket.Addr {
self.server.addr()
}

///|
/// Accept a new connection from a HTTP server.
/// Return the new HTTP connection and the address of peer.
pub async fn Server::accept(self : Server) -> (ServerConnection, @socket.Addr) {
let (conn, addr) = self.server.accept()
(ServerConnection::new(conn, headers=self.headers), addr)
}

///|
/// Start the main loop of a HTTP server,
/// the server will keep listening for new connections
/// and new requests from existing connections.
/// Requests are handled by the callback function `f`.
///
/// The callback function `f` accepts three arguments:
/// - the request to process
/// - a `&@io.Reader` that can be used to read request body
/// - a `@http.ServerConnection` that can be used to send response.
/// The procedure to send a response is:
/// 1. initiate a response and send response header via `.send_response()`
/// 2. send response body by using the `@http.ServerConnection` as `@io.Writer`
/// 3. (optional) complete the response via `.end_response()`.
/// If `.end_response()` is not called, it will be called automatically
/// after `f` returns.
///
/// If `allow_failure` is `true` (`true` by default),
/// error raised by `f` will not crash the whole server.
/// In this case, `run_forever` will only terminate when cancelled externally.
/// Note that if `f` fails, the connection used by `f` will still get aborted.
///
/// If `max_connections` is present,
/// at most `max_connections` clients are allowed in parallel.
/// New clients will only get handled after a previous client terminates.
///
/// `run_forever` will close the server and each connection automatically.
/// So `f` or the caller must not manually close the connections/server.
pub async fn Server::run_forever(
self : Server,
f : async (Request, &@io.Reader, ServerConnection) -> Unit,
allow_failure? : Bool,
max_connections? : Int,
) -> Unit {
self.server.run_forever(allow_failure?, max_connections?, (conn, _) => {
let conn = ServerConnection::new(conn, headers=self.headers)
for {
let request = conn.read_request()
f(request, conn, conn)
if conn.sender.mode is SendingBody {
conn.end_response()
}
}
})
}