Skip to content

Commit 0379117

Browse files
committed
add streaming http request helpers
1 parent 3d8ab4d commit 0379117

File tree

3 files changed

+236
-5
lines changed

3 files changed

+236
-5
lines changed

src/http/client_test.mbt

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
// Copyright 2025 International Digital Economy Academy
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
///|
16+
async fn test_server(port : Int, log : &Logger) -> Unit {
17+
let addr = @socket.Addr::parse("127.0.0.1:\{port}")
18+
@http.run_server(addr, (conn, _) => {
19+
let buf = FixedArray::make(1024, b'\x00')
20+
for {
21+
let request = conn.read_request()
22+
log.write_string(
23+
"server received request: \{request.meth} \{request.path}\n",
24+
)
25+
while conn.read(buf) is n && n > 0 {
26+
let data = @encoding/utf8.decode(buf.unsafe_reinterpret_as_bytes()[:n])
27+
log.write_string("server received: \{data}")
28+
}
29+
conn.send_response(200, "OK")
30+
log.write_string("server: sending \{request.meth} response part 1\n")
31+
conn..write("\{request.meth} response part 1\n")..flush()
32+
@async.sleep(100)
33+
log.write_string("server: sending \{request.meth} response part 2\n")
34+
conn..write("\{request.meth} response part 2\n")..end_response()
35+
}
36+
})
37+
}
38+
39+
///|
40+
async test "request streaming" {
41+
let port = 4213
42+
let log = StringBuilder::new()
43+
@async.with_task_group(group => {
44+
group.spawn_bg(no_wait=true, () => test_server(port, log))
45+
let buf = FixedArray::make(1024, b'\x00')
46+
async fn fetch_response(client : @http.Client) {
47+
while client.read(buf) is n && n > 0 {
48+
let data = @encoding/utf8.decode(buf.unsafe_reinterpret_as_bytes()[:n])
49+
log.write_string("client received: \{data}")
50+
}
51+
}
52+
53+
{
54+
log.write_string("client sending GET request\n")
55+
let (response, client) = @http.get_stream(
56+
"http://localhost/get",
57+
port~,
58+
body="GET data\n",
59+
)
60+
defer client.close()
61+
inspect(response.code, content="200")
62+
fetch_response(client)
63+
}
64+
{
65+
log.write_string("client sending PUT request\n")
66+
let client = @http.put_stream("http://localhost/put", port~)
67+
defer client.close()
68+
client.flush()
69+
@async.sleep(100)
70+
log.write_string("client: sending PUT data part 1\n")
71+
client..write("PUT data part 1\n")..flush()
72+
@async.sleep(100)
73+
log.write_string("client: sending PUT data part 2\n")
74+
client.write("PUT data part 2\n")
75+
let response = client.end_request()
76+
inspect(response.code, content="200")
77+
fetch_response(client)
78+
}
79+
log.write_string("client sending POST request\n")
80+
let client = @http.post_stream("http://localhost/post", port~)
81+
defer client.close()
82+
client.flush()
83+
@async.sleep(100)
84+
log.write_string("client: sending POST data part 1\n")
85+
client..write("POST data part 1\n")..flush()
86+
@async.sleep(100)
87+
log.write_string("client: sending POST data part 2\n")
88+
client.write("POST data part 2\n")
89+
let response = client.end_request()
90+
inspect(response.code, content="200")
91+
fetch_response(client)
92+
})
93+
inspect(
94+
log,
95+
content=(
96+
#|client sending GET request
97+
#|server received request: Get /get
98+
#|server received: GET data
99+
#|server: sending Get response part 1
100+
#|client received: Get response part 1
101+
#|server: sending Get response part 2
102+
#|client received: Get response part 2
103+
#|client sending PUT request
104+
#|server received request: Put /put
105+
#|client: sending PUT data part 1
106+
#|server received: PUT data part 1
107+
#|client: sending PUT data part 2
108+
#|server received: PUT data part 2
109+
#|server: sending Put response part 1
110+
#|client received: Put response part 1
111+
#|server: sending Put response part 2
112+
#|client received: Put response part 2
113+
#|client sending POST request
114+
#|server received request: Post /post
115+
#|client: sending POST data part 1
116+
#|server received: POST data part 1
117+
#|client: sending POST data part 2
118+
#|server received: POST data part 2
119+
#|server: sending Post response part 1
120+
#|client received: Post response part 1
121+
#|server: sending Post response part 2
122+
#|client received: Post response part 2
123+
#|
124+
),
125+
)
126+
}

src/http/pkg.generated.mbti

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,16 @@ import(
99
// Values
1010
async fn get(String, headers? : Map[String, String], port? : Int, body? : &@io.Data) -> (Response, &@io.Data)
1111

12+
async fn get_stream(String, headers? : Map[String, String], port? : Int, body? : &@io.Data) -> (Response, Client)
13+
1214
async fn post(String, &@io.Data, headers? : Map[String, String], port? : Int) -> (Response, &@io.Data)
1315

16+
async fn post_stream(String, headers? : Map[String, String], port? : Int) -> Client
17+
1418
async fn put(String, &@io.Data, headers? : Map[String, String], port? : Int) -> (Response, &@io.Data)
1519

20+
async fn put_stream(String, headers? : Map[String, String], port? : Int) -> Client
21+
1622
async fn run_server(@socket.Addr, async (ServerConnection, @socket.Addr) -> Unit, headers? : Map[String, String], dual_stack? : Bool, allow_failure? : Bool, max_connections? : Int) -> Unit
1723

1824
// Errors

src/http/request.mbt

Lines changed: 104 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,13 +33,10 @@ pub suberror URIParseError {
3333
} derive(Show)
3434

3535
///|
36-
async fn perform_request(
36+
fn resolve_url(
3737
uri : String,
38-
meth : RequestMethod,
39-
headers : Map[String, String],
40-
body : &@io.Data,
4138
port? : Int,
42-
) -> (Response, &@io.Data) {
39+
) -> (Protocol, Int, String, String) raise {
4340
guard uri.find("://") is Some(protocol_len) else { raise InvalidFormat }
4441
let protocol = match uri[:protocol_len] {
4542
"http" => Http
@@ -61,6 +58,18 @@ async fn perform_request(
6158
(uri.to_string(), "/")
6259
}
6360
let path = if path == "" { "/" } else { path }
61+
(protocol, port, host, path)
62+
}
63+
64+
///|
65+
async fn perform_request(
66+
uri : String,
67+
meth : RequestMethod,
68+
headers : Map[String, String],
69+
body : &@io.Data,
70+
port? : Int,
71+
) -> (Response, &@io.Data) {
72+
let (protocol, port, host, path) = resolve_url(uri, port?)
6473
let client = Client::connect(host, headers~, protocol~, port~)
6574
defer client.close()
6675
client..request(meth, path)..write(body)
@@ -107,3 +116,93 @@ pub async fn post(
107116
) -> (Response, &@io.Data) {
108117
perform_request(uri, Post, headers, content, port?)
109118
}
119+
120+
///|
121+
/// Similar to `@http.get`, but allow reading response body streamingly.
122+
/// A pair `(response, client)` will be returned,
123+
/// where `response` is the response header from the server,
124+
/// and `client` is the HTTP client that performs the request.
125+
/// `client` can be used to read the content of response body via `@io.Reader`,
126+
/// see `@http.Client` for more details.
127+
///
128+
/// Note that the returned client must be manually closed via `.close()`
129+
/// to close the underlying connection used for the request.
130+
pub async fn get_stream(
131+
uri : String,
132+
headers? : Map[String, String] = {},
133+
port? : Int,
134+
body? : &@io.Data = b"",
135+
) -> (Response, Client) {
136+
let (protocol, port, host, path) = resolve_url(uri, port?)
137+
let client = Client::connect(host, headers~, protocol~, port~)
138+
try {
139+
client..request(Get, path)..write(body)
140+
let response = client.end_request()
141+
(response, client)
142+
} catch {
143+
err => {
144+
client.close()
145+
raise err
146+
}
147+
}
148+
}
149+
150+
///|
151+
/// Similar to `@http.put`, but allow writing request body streamingly.
152+
/// The return value `client` is the HTTP client that performs the request,
153+
/// it can be used to write the content of request body via `@io.Writer`.
154+
/// Notice that writing to `@http.Client` is buffered,
155+
/// so if you need to send data to the server immediately, `.flush()` must be called.
156+
/// After writing all the content, `.end_request()` must be called
157+
/// to complete the request and obtain response from the server.
158+
/// After that, the response body from the server can be obtained
159+
/// by using `client` as a `@io.Reader`. See `@http.Client` for more details.
160+
///
161+
/// Note that the returned `client` must be manually closed via `.close()`
162+
/// to close the underlying connection used for the request.
163+
pub async fn put_stream(
164+
uri : String,
165+
headers? : Map[String, String] = {},
166+
port? : Int,
167+
) -> Client {
168+
let (protocol, port, host, path) = resolve_url(uri, port?)
169+
let client = Client::connect(host, headers~, protocol~, port~)
170+
try client.request(Put, path) catch {
171+
err => {
172+
client.close()
173+
raise err
174+
}
175+
} noraise {
176+
_ => client
177+
}
178+
}
179+
180+
///|
181+
/// Similar to `@http.post`, but allow writing request body streamingly.
182+
/// The return value `client` is the HTTP client that performs the request,
183+
/// it can be used to write the content of request body via `@io.Writer`.
184+
/// Notice that writing to `@http.Client` is buffered,
185+
/// so if you need to send data to the server immediately, `.flush()` must be called.
186+
/// After writing all the content, `.end_request()` must be called
187+
/// to complete the request and obtain response from the server.
188+
/// After that, the response body from the server can be obtained
189+
/// by using `client` as a `@io.Reader`. See `@http.Client` for more details.
190+
///
191+
/// Note that the returned `client` must be manually closed via `.close()`
192+
/// to close the underlying connection used for the request.
193+
pub async fn post_stream(
194+
uri : String,
195+
headers? : Map[String, String] = {},
196+
port? : Int,
197+
) -> Client {
198+
let (protocol, port, host, path) = resolve_url(uri, port?)
199+
let client = Client::connect(host, headers~, protocol~, port~)
200+
try client.request(Post, path) catch {
201+
err => {
202+
client.close()
203+
raise err
204+
}
205+
} noraise {
206+
_ => client
207+
}
208+
}

0 commit comments

Comments
 (0)