Skip to content

Commit 11aea7a

Browse files
committed
the response owns the value of the stream
1 parent b21a208 commit 11aea7a

File tree

6 files changed

+209
-114
lines changed

6 files changed

+209
-114
lines changed

src/bun.js/api/html_rewriter.zig

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -490,11 +490,12 @@ pub const HTMLRewriter = struct {
490490
result.setUrl(original.getUrl().clone());
491491

492492
const value = original.getBodyValue();
493+
const owned_readable_stream = original.getBodyReadableStream(sink.global);
493494
sink.ref();
494495
sink.bodyValueBufferer = jsc.WebCore.Body.ValueBufferer.init(sink, @ptrCast(&onFinishedBuffering), sink.global, bun.default_allocator);
495496
response_js_value.ensureStillAlive();
496497

497-
sink.bodyValueBufferer.?.run(value) catch |buffering_error| {
498+
sink.bodyValueBufferer.?.run(value, owned_readable_stream) catch |buffering_error| {
498499
defer sink.deref();
499500
return switch (buffering_error) {
500501
error.StreamAlreadyUsed => {

src/bun.js/api/server/RequestContext.zig

Lines changed: 33 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -677,15 +677,11 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool,
677677
}
678678

679679
if (this.response_ptr) |response| {
680-
const bodyValue = response.getBodyValue();
681-
if (bodyValue.* == .Locked) {
682-
var strong_readable = bodyValue.Locked.readable;
683-
bodyValue.Locked.readable = .{};
684-
defer strong_readable.deinit();
685-
if (strong_readable.get(globalThis)) |readable| {
686-
readable.abort(globalThis);
687-
any_js_calls = true;
688-
}
680+
if (response.getBodyReadableStream(globalThis)) |stream| {
681+
stream.value.ensureStillAlive();
682+
response.detachReadableStream(globalThis);
683+
stream.abort(globalThis);
684+
any_js_calls = true;
689685
}
690686
}
691687
}
@@ -1091,7 +1087,7 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool,
10911087
}
10921088

10931089
pub fn doRenderWithBodyLocked(this: *anyopaque, value: *jsc.WebCore.Body.Value) void {
1094-
doRenderWithBody(bun.cast(*RequestContext, this), value);
1090+
doRenderWithBody(bun.cast(*RequestContext, this), value, null);
10951091
}
10961092

10971093
fn renderWithBlobFromBodyValue(this: *RequestContext) void {
@@ -1664,13 +1660,13 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool,
16641660

16651661
if (req.response_ptr) |resp| {
16661662
assert(req.server != null);
1663+
const globalThis = req.server.?.globalThis;
16671664
const bodyValue = resp.getBodyValue();
1668-
if (bodyValue.* == .Locked) {
1669-
const global = bodyValue.Locked.global;
1670-
if (bodyValue.Locked.readable.get(global)) |stream| {
1671-
stream.done(global);
1672-
}
1673-
bodyValue.Locked.readable.deinit();
1665+
if (resp.getBodyReadableStream(globalThis)) |stream| {
1666+
stream.value.ensureStillAlive();
1667+
resp.detachReadableStream(globalThis);
1668+
1669+
stream.done(globalThis);
16741670
bodyValue.* = .{ .Used = {} };
16751671
}
16761672
}
@@ -1720,11 +1716,10 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool,
17201716

17211717
if (req.response_ptr) |resp| {
17221718
const bodyValue = resp.getBodyValue();
1723-
if (bodyValue.* == .Locked) {
1724-
if (bodyValue.Locked.readable.get(globalThis)) |stream| {
1725-
stream.done(globalThis);
1726-
}
1727-
bodyValue.Locked.readable.deinit();
1719+
if (resp.getBodyReadableStream(globalThis)) |stream| {
1720+
stream.value.ensureStillAlive();
1721+
resp.detachReadableStream(globalThis);
1722+
stream.done(globalThis);
17281723
bodyValue.* = .{ .Used = {} };
17291724
}
17301725
}
@@ -1802,7 +1797,7 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool,
18021797
req.endStream(req.shouldCloseConnection());
18031798
}
18041799

1805-
pub fn doRenderWithBody(this: *RequestContext, value: *jsc.WebCore.Body.Value) void {
1800+
pub fn doRenderWithBody(this: *RequestContext, value: *jsc.WebCore.Body.Value, owned_readable: ?jsc.WebCore.ReadableStream) void {
18061801
this.drainMicrotasks();
18071802

18081803
// If a ReadableStream can trivially be converted to a Blob, do so.
@@ -1832,11 +1827,20 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool,
18321827
if (this.isAbortedOrEnded()) {
18331828
return;
18341829
}
1835-
1836-
if (lock.readable.get(globalThis)) |stream_| {
1837-
const stream: jsc.WebCore.ReadableStream = stream_;
1838-
// we hold the stream alive until we're done with it
1839-
this.readable_stream_ref = lock.readable;
1830+
const readable_stream = brk: {
1831+
if (lock.readable.get(globalThis)) |stream| {
1832+
// we hold the stream alive until we're done with it
1833+
this.readable_stream_ref = lock.readable;
1834+
break :brk stream;
1835+
}
1836+
if (owned_readable) |stream| {
1837+
// response owns the stream, so we hold a strong reference to it
1838+
this.readable_stream_ref = jsc.WebCore.ReadableStream.Strong.init(stream, globalThis);
1839+
break :brk stream;
1840+
}
1841+
break :brk null;
1842+
};
1843+
if (readable_stream) |stream| {
18401844
value.* = .{ .Used = {} };
18411845

18421846
if (stream.isLocked(globalThis)) {
@@ -1915,7 +1919,7 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool,
19151919
// someone else is waiting for the stream or waiting for `onStartStreaming`
19161920
const readable = value.toReadableStream(globalThis) catch return; // TODO: properly propagate exception upwards
19171921
readable.ensureStillAlive();
1918-
this.doRenderWithBody(value);
1922+
this.doRenderWithBody(value, null);
19191923
return;
19201924
}
19211925

@@ -1996,7 +2000,7 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool,
19962000
return;
19972001
}
19982002
var response = this.response_ptr.?;
1999-
this.doRenderWithBody(response.getBodyValue());
2003+
this.doRenderWithBody(response.getBodyValue(), response.getBodyReadableStream(this.server.?.globalThis));
20002004
}
20012005

20022006
pub fn renderProductionError(this: *RequestContext, status: u16) void {

src/bun.js/webcore/Body.zig

Lines changed: 54 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,12 @@ pub fn clone(this: *Body, globalThis: *JSGlobalObject) bun.JSError!Body {
2222
};
2323
}
2424

25+
pub fn cloneWithReadableStream(this: *Body, globalThis: *JSGlobalObject, readable: ?*jsc.WebCore.ReadableStream) bun.JSError!Body {
26+
return Body{
27+
.value = try this.value.cloneWithReadableStream(globalThis, readable),
28+
};
29+
}
30+
2531
pub fn writeFormat(this: *Body, comptime Formatter: type, formatter: *Formatter, writer: anytype, comptime enable_ansi_colors: bool) !void {
2632
const Writer = @TypeOf(writer);
2733

@@ -964,20 +970,34 @@ pub const Value = union(Tag) {
964970
}
965971
}
966972

967-
pub fn tee(this: *Value, globalThis: *jsc.JSGlobalObject) bun.JSError!Value {
973+
pub fn tee(this: *Value, globalThis: *jsc.JSGlobalObject, owned_readable: ?*jsc.WebCore.ReadableStream) bun.JSError!Value {
968974
var locked = &this.Locked;
975+
if (owned_readable) |readable| {
976+
if (readable.isDisturbed(globalThis)) {
977+
return Value{ .Used = {} };
978+
}
969979

970-
if (locked.readable.isDisturbed(globalThis)) {
971-
return Value{ .Used = {} };
972-
}
980+
if (try readable.tee(globalThis)) |new_readable| {
981+
return Value{
982+
.Locked = .{
983+
.readable = jsc.WebCore.ReadableStream.Strong.init(new_readable[1], globalThis),
984+
.global = globalThis,
985+
},
986+
};
987+
}
988+
} else {
989+
if (locked.readable.isDisturbed(globalThis)) {
990+
return Value{ .Used = {} };
991+
}
973992

974-
if (try locked.readable.tee(globalThis)) |readable| {
975-
return Value{
976-
.Locked = .{
977-
.readable = jsc.WebCore.ReadableStream.Strong.init(readable, globalThis),
978-
.global = globalThis,
979-
},
980-
};
993+
if (try locked.readable.tee(globalThis)) |readable| {
994+
return Value{
995+
.Locked = .{
996+
.readable = jsc.WebCore.ReadableStream.Strong.init(readable, globalThis),
997+
.global = globalThis,
998+
},
999+
};
1000+
}
9811001
}
9821002
if (locked.promise != null or locked.action != .none or locked.readable.has()) {
9831003
return Value{ .Used = {} };
@@ -1032,10 +1052,14 @@ pub const Value = union(Tag) {
10321052
}
10331053

10341054
pub fn clone(this: *Value, globalThis: *jsc.JSGlobalObject) bun.JSError!Value {
1055+
return this.cloneWithReadableStream(globalThis, null);
1056+
}
1057+
1058+
pub fn cloneWithReadableStream(this: *Value, globalThis: *jsc.JSGlobalObject, readable: ?*jsc.WebCore.ReadableStream) bun.JSError!Value {
10351059
this.toBlobIfPossible();
10361060

10371061
if (this.* == .Locked) {
1038-
return this.tee(globalThis);
1062+
return this.tee(globalThis, readable);
10391063
}
10401064

10411065
if (this.* == .InternalBlob) {
@@ -1049,10 +1073,6 @@ pub const Value = union(Tag) {
10491073
};
10501074
}
10511075

1052-
// if (this.* == .InlineBlob) {
1053-
// return this.*;
1054-
// }
1055-
10561076
if (this.* == .Blob) {
10571077
return Value{ .Blob = this.Blob.dupe() };
10581078
}
@@ -1372,7 +1392,7 @@ pub const ValueBufferer = struct {
13721392
return this;
13731393
}
13741394

1375-
pub fn run(sink: *@This(), value: *jsc.WebCore.Body.Value) !void {
1395+
pub fn run(sink: *@This(), value: *jsc.WebCore.Body.Value, owned_readable_stream: ?jsc.WebCore.ReadableStream) !void {
13761396
value.toBlobIfPossible();
13771397

13781398
switch (value.*) {
@@ -1410,7 +1430,7 @@ pub const ValueBufferer = struct {
14101430
return;
14111431
},
14121432
.Locked => {
1413-
try sink.bufferLockedBodyValue(value);
1433+
try sink.bufferLockedBodyValue(value, owned_readable_stream);
14141434
},
14151435
}
14161436
}
@@ -1563,12 +1583,23 @@ pub const ValueBufferer = struct {
15631583
return error.PipeFailed;
15641584
}
15651585

1566-
fn bufferLockedBodyValue(sink: *@This(), value: *jsc.WebCore.Body.Value) !void {
1586+
fn bufferLockedBodyValue(sink: *@This(), value: *jsc.WebCore.Body.Value, owned_readable_stream: ?jsc.WebCore.ReadableStream) !void {
15671587
assert(value.* == .Locked);
15681588
const locked = &value.Locked;
1569-
if (locked.readable.get(sink.global)) |stream| {
1570-
// keep the stream alive until we're done with it
1571-
sink.readable_stream_ref = locked.readable;
1589+
const readable_stream = brk: {
1590+
if (locked.readable.get(sink.global)) |stream| {
1591+
// keep the stream alive until we're done with it
1592+
sink.readable_stream_ref = locked.readable;
1593+
break :brk stream;
1594+
}
1595+
if (owned_readable_stream) |stream| {
1596+
// response owns the stream, so we hold a strong reference to it
1597+
sink.readable_stream_ref = jsc.WebCore.ReadableStream.Strong.init(stream, sink.global);
1598+
break :brk stream;
1599+
}
1600+
break :brk null;
1601+
};
1602+
if (readable_stream) |stream| {
15721603
value.* = .{ .Used = {} };
15731604

15741605
if (stream.isLocked(sink.global)) {
@@ -1616,7 +1647,7 @@ pub const ValueBufferer = struct {
16161647
const readable = try value.toReadableStream(sink.global);
16171648
readable.ensureStillAlive();
16181649
readable.protect();
1619-
return try sink.bufferLockedBodyValue(value);
1650+
return try sink.bufferLockedBodyValue(value, null);
16201651
}
16211652
// is safe to wait it buffer
16221653
locked.task = @ptrCast(sink);

src/bun.js/webcore/Response.zig

Lines changed: 69 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ pub const fromJSDirect = js.fromJSDirect;
1717
/// We increment this count in fetch so if JS Response is discarted we can resolve the Body
1818
/// In the server we use a flag response_protected to protect/unprotect the response
1919
ref_count: u32 = 1,
20+
#js_ref: jsc.JSRef = .empty(),
2021

2122
// We must report a consistent value for this
2223
#reported_estimated_size: usize = 0,
@@ -104,15 +105,66 @@ pub fn calculateEstimatedByteSize(this: *Response) void {
104105

105106
pub fn toJS(this: *Response, globalObject: *JSGlobalObject) JSValue {
106107
this.calculateEstimatedByteSize();
107-
return js.toJSUnchecked(globalObject, this);
108+
const js_value = js.toJSUnchecked(globalObject, this);
109+
this.#js_ref = .initWeak(js_value);
110+
111+
if (this.getBodyReadableStream(globalObject)) |stream| {
112+
// we dont hold a strong reference to the stream we will guard it in js.gc.stream
113+
// so we avoid cycled references
114+
// anyone using Response should not use Locked.readable directly because it dont always owns it
115+
// the owner will be always the Response object it self
116+
stream.value.ensureStillAlive();
117+
this.detachReadableStream(globalObject);
118+
js.gc.stream.set(js_value, globalObject, stream.value);
119+
}
120+
return js_value;
108121
}
109122

110-
pub fn getBodyValue(
123+
pub inline fn getBodyValue(
111124
this: *Response,
112125
) *Body.Value {
113126
return &this.#body.value;
114127
}
115128

129+
pub inline fn getBodyReadableStream(
130+
this: *Response,
131+
globalObject: *JSGlobalObject,
132+
) ?jsc.WebCore.ReadableStream {
133+
if (this.#js_ref.tryGet()) |js_ref| {
134+
if (js.gc.stream.get(js_ref)) |stream| {
135+
// JS is always source of truth for the stream
136+
return jsc.WebCore.ReadableStream.fromJS(stream, globalObject) catch |err| {
137+
_ = globalObject.takeException(err);
138+
return null;
139+
};
140+
}
141+
}
142+
if (this.#body.value == .Locked) {
143+
return this.#body.value.Locked.readable.get(globalObject);
144+
}
145+
return null;
146+
}
147+
pub inline fn detachReadableStream(this: *Response, globalObject: *jsc.JSGlobalObject) void {
148+
if (this.#js_ref.tryGet()) |js_ref| {
149+
js.gc.stream.clear(js_ref, globalObject);
150+
}
151+
if (this.#body.value == .Locked) {
152+
var old = this.#body.value.Locked.readable;
153+
old.deinit();
154+
this.#body.value.Locked.readable = .{};
155+
}
156+
}
157+
pub inline fn setSizeHint(this: *Response, size_hint: Blob.SizeType) void {
158+
if (this.#body.value == .Locked) {
159+
this.#body.value.Locked.size_hint = size_hint;
160+
if (this.#body.value.Locked.readable.get(this.#body.value.Locked.global)) |readable| {
161+
if (readable.ptr == .Bytes) {
162+
readable.ptr.Bytes.size_hint = size_hint;
163+
}
164+
}
165+
}
166+
}
167+
116168
pub export fn jsFunctionRequestOrResponseHasBodyValue(_: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) callconv(jsc.conv) jsc.JSValue {
117169
const arguments = callframe.arguments_old(1);
118170
const this_value = arguments.ptr[0];
@@ -346,7 +398,17 @@ pub fn cloneValue(
346398
this: *Response,
347399
globalThis: *JSGlobalObject,
348400
) bun.JSError!Response {
349-
var body = try this.#body.clone(globalThis);
401+
var body = brk: {
402+
if (this.#js_ref.tryGet()) |js_ref| {
403+
if (js.gc.stream.get(js_ref)) |stream| {
404+
var readable = try jsc.WebCore.ReadableStream.fromJS(stream, globalThis);
405+
if (readable != null) {
406+
break :brk try this.#body.cloneWithReadableStream(globalThis, &readable.?);
407+
}
408+
}
409+
}
410+
break :brk try this.#body.clone(globalThis);
411+
};
350412
errdefer body.deinit(bun.default_allocator);
351413
var _init = try this.#init.clone(globalThis);
352414
errdefer _init.deinit(bun.default_allocator);
@@ -394,6 +456,7 @@ pub fn unref(this: *Response) void {
394456
pub fn finalize(
395457
this: *Response,
396458
) callconv(.C) void {
459+
this.#js_ref.finalize();
397460
this.unref();
398461
}
399462

@@ -581,7 +644,9 @@ pub fn constructError(
581644
},
582645
);
583646

584-
return response.toJS(globalThis);
647+
const js_value = response.toJS(globalThis);
648+
response.#js_ref = .initWeak(js_value);
649+
return js_value;
585650
}
586651

587652
pub fn constructor(globalThis: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!*Response {

0 commit comments

Comments
 (0)