From b21a20877863395c9f008f761785493e519e9ab8 Mon Sep 17 00:00:00 2001 From: Ciro Spaciari Date: Mon, 6 Oct 2025 17:04:50 -0700 Subject: [PATCH 1/6] isolate Response fields --- src/bake/DevServer.zig | 10 +- src/bun.js/api/filesystem_router.zig | 2 +- src/bun.js/api/html_rewriter.zig | 71 ++++--- src/bun.js/api/server.zig | 2 +- src/bun.js/api/server/FileRoute.zig | 13 +- src/bun.js/api/server/RequestContext.zig | 62 +++--- src/bun.js/api/server/StaticRoute.zig | 15 +- src/bun.js/node/types.zig | 5 +- src/bun.js/webcore/BakeResponse.zig | 12 +- src/bun.js/webcore/Blob.zig | 14 +- src/bun.js/webcore/Request.zig | 14 +- src/bun.js/webcore/Response.zig | 246 ++++++++++++++--------- src/bun.js/webcore/fetch.zig | 127 ++++++------ 13 files changed, 335 insertions(+), 258 deletions(-) diff --git a/src/bake/DevServer.zig b/src/bake/DevServer.zig index 9264eeafa6935e..9e7c1a7725a1fe 100644 --- a/src/bake/DevServer.zig +++ b/src/bake/DevServer.zig @@ -3353,13 +3353,15 @@ fn sendSerializedFailures( } } const fetch_headers = try headers.toFetchHeaders(r.global); - var response = Response{ - .body = .{ .value = .{ .Blob = any_blob.toBlob(r.global) } }, - .init = Response.Init{ + var response = Response.init( + .{ .status_code = 500, .headers = fetch_headers, }, - }; + .{ .value = .{ .Blob = any_blob.toBlob(r.global) } }, + bun.String.empty, + false, + ); dev.vm.eventLoop().enter(); r.promise.reject(r.global, response.toJS(r.global)); defer dev.vm.eventLoop().exit(); diff --git a/src/bun.js/api/filesystem_router.zig b/src/bun.js/api/filesystem_router.zig index 6a6cd56d0154d3..2b1147239ba68d 100644 --- a/src/bun.js/api/filesystem_router.zig +++ b/src/bun.js/api/filesystem_router.zig @@ -281,7 +281,7 @@ pub const FileSystemRouter = struct { } if (argument.as(jsc.WebCore.Response)) |resp| { - break :brk resp.url.toUTF8(globalThis.allocator()); + break :brk resp.getUTF8Url(globalThis.allocator()); } } diff --git a/src/bun.js/api/html_rewriter.zig b/src/bun.js/api/html_rewriter.zig index b928bb7e8da05f..a18e2c3f9887c1 100644 --- a/src/bun.js/api/html_rewriter.zig +++ b/src/bun.js/api/html_rewriter.zig @@ -174,7 +174,8 @@ pub const HTMLRewriter = struct { pub fn transform_(this: *HTMLRewriter, global: *JSGlobalObject, response_value: jsc.JSValue) bun.JSError!JSValue { if (response_value.as(Response)) |response| { - if (response.body.value == .Used) { + const body_value = response.getBodyValue(); + if (body_value.* == .Used) { return global.throwInvalidArguments("Response body already used", .{}); } const out = try this.beginTransform(global, response); @@ -198,12 +199,14 @@ pub const HTMLRewriter = struct { if (kind != .other) { { const body_value = try jsc.WebCore.Body.extract(global, response_value); - const resp = bun.new(Response, Response{ - .init = .{ + const resp = bun.new(Response, Response.init( + .{ .status_code = 200, }, - .body = body_value, - }); + body_value, + bun.String.empty, + false, + )); defer resp.finalize(); const out_response_value = try this.beginTransform(global, resp); // Check if the returned value is an error and throw it properly @@ -212,7 +215,7 @@ pub const HTMLRewriter = struct { } out_response_value.ensureStillAlive(); var out_response = out_response_value.as(Response) orelse return out_response_value; - var blob = out_response.body.value.useAsAnyBlobAllowNonUTF8String(); + var blob = out_response.getBodyValue().useAsAnyBlobAllowNonUTF8String(); defer { _ = Response.js.dangerouslySetPtr(out_response_value, null); @@ -415,11 +418,11 @@ pub const HTMLRewriter = struct { .response = undefined, }); defer sink.deref(); - var result = bun.new(Response, .{ - .init = .{ + var result = bun.new(Response, Response.init( + .{ .status_code = 200, }, - .body = .{ + .{ .value = .{ .Locked = .{ .global = global, @@ -427,11 +430,13 @@ pub const HTMLRewriter = struct { }, }, }, - }); + bun.String.empty, + false, + )); sink.response = result; var sink_error: jsc.JSValue = .zero; - const input_size = original.body.len(); + const input_size = original.getBodyLen(); var vm = global.bunVM(); // Since we're still using vm.waitForPromise, we have to also @@ -467,20 +472,22 @@ pub const HTMLRewriter = struct { return createLOLHTMLError(global); }; - result.init.method = original.init.method; - result.init.status_code = original.init.status_code; - result.init.status_text = original.init.status_text.clone(); + result.setInit( + original.getMethod(), + original.getInitStatusCode(), + original.getInitStatusText().clone(), + ); // https://github.com/oven-sh/bun/issues/3334 - if (original.init.headers) |headers| { - result.init.headers = try headers.cloneThis(global); + if (original.getInitHeaders()) |_headers| { + result.setInitHeaders(try _headers.cloneThis(global)); } // Hold off on cloning until we're actually done. const response_js_value = sink.response.toJS(sink.global); sink.response_value.set(global, response_js_value); - result.url = original.url.clone(); + result.setUrl(original.getUrl().clone()); const value = original.getBodyValue(); sink.ref(); @@ -522,21 +529,22 @@ pub const HTMLRewriter = struct { pub fn onFinishedBuffering(sink: *BufferOutputSink, bytes: []const u8, js_err: ?jsc.WebCore.Body.Value.ValueError, is_async: bool) void { defer sink.deref(); if (js_err) |err| { - if (sink.response.body.value == .Locked and @intFromPtr(sink.response.body.value.Locked.task) == @intFromPtr(sink) and - sink.response.body.value.Locked.promise == null) + const sinkBodyValue = sink.response.getBodyValue(); + if (sinkBodyValue.* == .Locked and @intFromPtr(sinkBodyValue.Locked.task) == @intFromPtr(sink) and + sinkBodyValue.Locked.promise == null) { - sink.response.body.value.Locked.readable.deinit(); - sink.response.body.value = .{ .Empty = {} }; + sinkBodyValue.Locked.readable.deinit(); + sinkBodyValue.* = .{ .Empty = {} }; // is there a pending promise? // we will need to reject it - } else if (sink.response.body.value == .Locked and @intFromPtr(sink.response.body.value.Locked.task) == @intFromPtr(sink) and - sink.response.body.value.Locked.promise != null) + } else if (sinkBodyValue.* == .Locked and @intFromPtr(sinkBodyValue.Locked.task) == @intFromPtr(sink) and + sinkBodyValue.Locked.promise != null) { - sink.response.body.value.Locked.onReceiveValue = null; - sink.response.body.value.Locked.task = null; + sinkBodyValue.Locked.onReceiveValue = null; + sinkBodyValue.Locked.task = null; } if (is_async) { - sink.response.body.value.toErrorInstance(err.dupe(sink.global), sink.global); + sinkBodyValue.toErrorInstance(err.dupe(sink.global), sink.global); } else { var ret_err = createLOLHTMLError(sink.global); ret_err.ensureStillAlive(); @@ -566,7 +574,7 @@ pub const HTMLRewriter = struct { sink.rewriter.?.write(bytes) catch { if (is_async) { - response.body.value.toErrorInstance(.{ .Message = createLOLHTMLStringError() }, global); + response.getBodyValue().toErrorInstance(.{ .Message = createLOLHTMLStringError() }, global); return null; } else { return createLOLHTMLError(global); @@ -577,7 +585,7 @@ pub const HTMLRewriter = struct { if (!is_async) response.finalize(); sink.response = undefined; if (is_async) { - response.body.value.toErrorInstance(.{ .Message = createLOLHTMLStringError() }, global); + response.getBodyValue().toErrorInstance(.{ .Message = createLOLHTMLStringError() }, global); return null; } else { return createLOLHTMLError(global); @@ -590,8 +598,9 @@ pub const HTMLRewriter = struct { pub const Sync = enum { suspended, pending, done }; pub fn done(this: *BufferOutputSink) void { - var prev_value = this.response.body.value; - this.response.body.value = jsc.WebCore.Body.Value{ + const bodyValue = this.response.getBodyValue(); + var prev_value = bodyValue.*; + bodyValue.* = jsc.WebCore.Body.Value{ .InternalBlob = .{ .bytes = this.bytes.list.toManaged(bun.default_allocator), }, @@ -606,7 +615,7 @@ pub const HTMLRewriter = struct { }; prev_value.resolve( - &this.response.body.value, + bodyValue, this.global, null, ); diff --git a/src/bun.js/api/server.zig b/src/bun.js/api/server.zig index 1448dc2e5232b4..f4a452f15526b2 100644 --- a/src/bun.js/api/server.zig +++ b/src/bun.js/api/server.zig @@ -1228,7 +1228,7 @@ pub fn NewServer(protocol_enum: enum { http, https }, development_kind: enum { d } if (response_value.as(jsc.WebCore.Response)) |resp| { - resp.url = existing_request.url.clone(); + resp.setUrl(existing_request.url.clone()); } return jsc.JSPromise.resolvedPromiseValue(ctx, response_value); } diff --git a/src/bun.js/api/server/FileRoute.zig b/src/bun.js/api/server/FileRoute.zig index ab4b57bfa8b0c1..bb1981659d2a57 100644 --- a/src/bun.js/api/server/FileRoute.zig +++ b/src/bun.js/api/server/FileRoute.zig @@ -59,18 +59,19 @@ pub fn memoryCost(this: *const FileRoute) usize { pub fn fromJS(globalThis: *jsc.JSGlobalObject, argument: jsc.JSValue) bun.JSError!?*FileRoute { if (argument.as(jsc.WebCore.Response)) |response| { - response.body.value.toBlobIfPossible(); - if (response.body.value == .Blob and response.body.value.Blob.needsToReadFile()) { - if (response.body.value.Blob.store.?.data.file.pathlike == .fd) { + const bodyValue = response.getBodyValue(); + bodyValue.toBlobIfPossible(); + if (bodyValue.* == .Blob and bodyValue.Blob.needsToReadFile()) { + if (bodyValue.Blob.store.?.data.file.pathlike == .fd) { return globalThis.throwTODO("Support serving files from a file descriptor. Please pass a path instead."); } - var blob = response.body.value.use(); + var blob = bodyValue.use(); blob.globalThis = globalThis; bun.assertf(!blob.isHeapAllocated(), "expected blob not to be heap-allocated", .{}); - response.body.value = .{ .Blob = blob.dupe() }; - const headers = bun.handleOom(Headers.from(response.init.headers, bun.default_allocator, .{ .body = &.{ .Blob = blob } })); + bodyValue.* = .{ .Blob = blob.dupe() }; + const headers = bun.handleOom(Headers.from(response.getInitHeaders(), bun.default_allocator, .{ .body = &.{ .Blob = blob } })); return bun.new(FileRoute, .{ .ref_count = .init(), diff --git a/src/bun.js/api/server/RequestContext.zig b/src/bun.js/api/server/RequestContext.zig index 84b83f0a1d08d8..d6d5604a70cd61 100644 --- a/src/bun.js/api/server/RequestContext.zig +++ b/src/bun.js/api/server/RequestContext.zig @@ -677,9 +677,10 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, } if (this.response_ptr) |response| { - if (response.body.value == .Locked) { - var strong_readable = response.body.value.Locked.readable; - response.body.value.Locked.readable = .{}; + const bodyValue = response.getBodyValue(); + if (bodyValue.* == .Locked) { + var strong_readable = bodyValue.Locked.readable; + bodyValue.Locked.readable = .{}; defer strong_readable.deinit(); if (strong_readable.get(globalThis)) |readable| { readable.abort(globalThis); @@ -1214,7 +1215,8 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, } // TODO: should this timeout? - this.response_ptr.?.body.value = .{ + const bodyValue = this.response_ptr.?.getBodyValue(); + bodyValue.* = .{ .Locked = .{ .readable = jsc.WebCore.ReadableStream.Strong.init(stream, globalThis), .global = globalThis, @@ -1454,10 +1456,11 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, } } // not content-length or transfer-encoding so we need to respect the body - response.body.value.toBlobIfPossible(); - switch (response.body.value) { + const bodyValue = response.getBodyValue(); + bodyValue.toBlobIfPossible(); + switch (bodyValue.*) { .InternalBlob, .WTFStringImpl => { - var blob = response.body.value.useAsAnyBlobAllowNonUTF8String(); + var blob = bodyValue.useAsAnyBlobAllowNonUTF8String(); defer blob.detach(); const size = blob.size(); this.renderMetadata(); @@ -1559,9 +1562,10 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, } return; } else { - response.body.value.toBlobIfPossible(); + const bodyValue = response.getBodyValue(); + bodyValue.toBlobIfPossible(); - switch (response.body.value) { + switch (bodyValue.*) { .Blob => |*blob| { if (blob.needsToReadFile()) { response_value.protect(); @@ -1618,8 +1622,9 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, } return; } - response.body.value.toBlobIfPossible(); - switch (response.body.value) { + const bodyValue = response.getBodyValue(); + bodyValue.toBlobIfPossible(); + switch (bodyValue.*) { .Blob => |*blob| { if (blob.needsToReadFile()) { fulfilled_value.protect(); @@ -1659,14 +1664,14 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, if (req.response_ptr) |resp| { assert(req.server != null); - - if (resp.body.value == .Locked) { - const global = resp.body.value.Locked.global; - if (resp.body.value.Locked.readable.get(global)) |stream| { + const bodyValue = resp.getBodyValue(); + if (bodyValue.* == .Locked) { + const global = bodyValue.Locked.global; + if (bodyValue.Locked.readable.get(global)) |stream| { stream.done(global); } - resp.body.value.Locked.readable.deinit(); - resp.body.value = .{ .Used = {} }; + bodyValue.Locked.readable.deinit(); + bodyValue.* = .{ .Used = {} }; } } @@ -1714,12 +1719,13 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, } if (req.response_ptr) |resp| { - if (resp.body.value == .Locked) { - if (resp.body.value.Locked.readable.get(globalThis)) |stream| { + const bodyValue = resp.getBodyValue(); + if (bodyValue.* == .Locked) { + if (bodyValue.Locked.readable.get(globalThis)) |stream| { stream.done(globalThis); } - resp.body.value.Locked.readable.deinit(); - resp.body.value = .{ .Used = {} }; + bodyValue.Locked.readable.deinit(); + bodyValue.* = .{ .Used = {} }; } } @@ -1990,7 +1996,7 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, return; } var response = this.response_ptr.?; - this.doRenderWithBody(&response.body.value); + this.doRenderWithBody(response.getBodyValue()); } pub fn renderProductionError(this: *RequestContext, status: u16) void { @@ -2162,8 +2168,9 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, ctx.flags.response_protected = false; ctx.response_ptr = response; - response.body.value.toBlobIfPossible(); - switch (response.body.value) { + const bodyValue = response.getBodyValue(); + bodyValue.toBlobIfPossible(); + switch (bodyValue.*) { .Blob => |*blob| { if (blob.needsToReadFile()) { fulfilled_value.protect(); @@ -2216,14 +2223,15 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, status; const content_type, const needs_content_type, const content_type_needs_free = getContentType( - response.init.headers, + response.getInitHeaders(), &this.blob, this.allocator, ); defer if (content_type_needs_free) content_type.deinit(this.allocator); var has_content_disposition = false; var has_content_range = false; - if (response.init.headers) |headers_| { + if (response.swapInitHeaders()) |headers_| { + defer headers_.deref(); has_content_disposition = headers_.fastHas(.ContentDisposition); has_content_range = headers_.fastHas(.ContentRange); needs_content_range = needs_content_range and has_content_range; @@ -2233,8 +2241,6 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, this.doWriteStatus(status); this.doWriteHeaders(headers_); - response.init.headers = null; - headers_.deref(); } else if (needs_content_range) { status = 206; this.doWriteStatus(status); diff --git a/src/bun.js/api/server/StaticRoute.zig b/src/bun.js/api/server/StaticRoute.zig index 9e931826a85f7e..1ddfcaa2ad0dcd 100644 --- a/src/bun.js/api/server/StaticRoute.zig +++ b/src/bun.js/api/server/StaticRoute.zig @@ -91,10 +91,11 @@ pub fn fromJS(globalThis: *jsc.JSGlobalObject, argument: jsc.JSValue) bun.JSErro // The user may want to pass in the same Response object multiple endpoints // Let's let them do that. - response.body.value.toBlobIfPossible(); + const bodyValue = response.getBodyValue(); + bodyValue.toBlobIfPossible(); const blob: AnyBlob = brk: { - switch (response.body.value) { + switch (bodyValue.*) { .Used => { return globalThis.throwInvalidArguments("Response body has already been used", .{}); }, @@ -108,17 +109,17 @@ pub fn fromJS(globalThis: *jsc.JSGlobalObject, argument: jsc.JSValue) bun.JSErro }, .Blob, .InternalBlob, .WTFStringImpl => { - if (response.body.value == .Blob and response.body.value.Blob.needsToReadFile()) { + if (bodyValue.* == .Blob and bodyValue.Blob.needsToReadFile()) { return globalThis.throwTODO("TODO: support Bun.file(path) in static routes"); } - var blob = response.body.value.use(); + var blob = bodyValue.use(); blob.globalThis = globalThis; bun.assertf( !blob.isHeapAllocated(), "expected blob not to be heap-allocated", .{}, ); - response.body.value = .{ .Blob = blob.dupe() }; + bodyValue.* = .{ .Blob = blob.dupe() }; break :brk .{ .Blob = blob }; }, @@ -131,13 +132,13 @@ pub fn fromJS(globalThis: *jsc.JSGlobalObject, argument: jsc.JSValue) bun.JSErro var has_content_disposition = false; - if (response.init.headers) |headers| { + if (response.getInitHeaders()) |headers| { has_content_disposition = headers.fastHas(.ContentDisposition); headers.fastRemove(.TransferEncoding); headers.fastRemove(.ContentLength); } - var headers: Headers = if (response.init.headers) |h| + var headers: Headers = if (response.getInitHeaders()) |h| Headers.from(h, bun.default_allocator, .{ .body = &blob, }) catch { diff --git a/src/bun.js/node/types.zig b/src/bun.js/node/types.zig index d2162a9372b547..a06292877a1bb5 100644 --- a/src/bun.js/node/types.zig +++ b/src/bun.js/node/types.zig @@ -98,9 +98,10 @@ pub const BlobOrStringOrBuffer = union(enum) { } if (value.as(jsc.WebCore.Response)) |response| { - response.body.value.toBlobIfPossible(); + const bodyValue = response.getBodyValue(); + bodyValue.toBlobIfPossible(); - if (response.body.value.tryUseAsAnyBlob()) |any_blob_| { + if (bodyValue.tryUseAsAnyBlob()) |any_blob_| { var any_blob = any_blob_; defer any_blob.detach(); return .{ .blob = any_blob.toBlob(global) }; diff --git a/src/bun.js/webcore/BakeResponse.zig b/src/bun.js/webcore/BakeResponse.zig index 4c03c755c017d0..9e1485dfb8faf0 100644 --- a/src/bun.js/webcore/BakeResponse.zig +++ b/src/bun.js/webcore/BakeResponse.zig @@ -105,11 +105,8 @@ pub fn constructRender( defer path_utf8.deinit(); // Create a Response with Render body - const response = bun.new(Response, Response{ - .body = Body{ - .value = .Empty, - }, - .init = Response.Init{ + const response = bun.new(Response, Response.init( + .{ .status_code = 200, .headers = headers: { var headers = bun.webcore.FetchHeaders.createEmpty(); @@ -117,7 +114,10 @@ pub fn constructRender( break :headers headers; }, }, - }); + .{ .value = .Empty }, + bun.String.empty, + false, + )); const response_js = toJSForSSR(response, globalThis, .render); response_js.ensureStillAlive(); diff --git a/src/bun.js/webcore/Blob.zig b/src/bun.js/webcore/Blob.zig index 37e6ff6f254787..d8a829402b33fc 100644 --- a/src/bun.js/webcore/Blob.zig +++ b/src/bun.js/webcore/Blob.zig @@ -1340,7 +1340,8 @@ pub fn writeFileInternal(globalThis: *jsc.JSGlobalObject, path_or_blob_: *PathOr // TODO: implement a writeev() fast path var source_blob: Blob = brk: { if (data.as(Response)) |response| { - switch (response.body.value) { + const bodyValue = response.getBodyValue(); + switch (bodyValue.*) { .WTFStringImpl, .InternalBlob, .Used, @@ -1348,11 +1349,11 @@ pub fn writeFileInternal(globalThis: *jsc.JSGlobalObject, path_or_blob_: *PathOr .Blob, .Null, => { - break :brk response.body.use(); + break :brk bodyValue.use(); }, .Error => |*err_ref| { destination_blob.detach(); - _ = response.body.value.use(); + _ = bodyValue.use(); return jsc.JSPromise.dangerouslyCreateRejectedPromiseValueWithoutNotifyingVM(globalThis, err_ref.toJS(globalThis)); }, .Locked => |*locked| { @@ -1360,7 +1361,7 @@ pub fn writeFileInternal(globalThis: *jsc.JSGlobalObject, path_or_blob_: *PathOr const s3 = &destination_blob.store.?.data.s3; var aws_options = try s3.getCredentialsWithOptions(options.extra_options, globalThis); defer aws_options.deinit(); - _ = try response.body.value.toReadableStream(globalThis); + _ = try bodyValue.toReadableStream(globalThis); if (locked.readable.get(globalThis)) |readable| { if (readable.isDisturbed(globalThis)) { destination_blob.detach(); @@ -1392,9 +1393,8 @@ pub fn writeFileInternal(globalThis: *jsc.JSGlobalObject, path_or_blob_: *PathOr .promise = jsc.JSPromise.Strong.init(globalThis), .mkdirp_if_not_exists = options.mkdirp_if_not_exists orelse true, }); - - response.body.value.Locked.task = task; - response.body.value.Locked.onReceiveValue = WriteFileWaitFromLockedValueTask.thenWrap; + bodyValue.Locked.task = task; + bodyValue.Locked.onReceiveValue = WriteFileWaitFromLockedValueTask.thenWrap; return task.promise.value(); }, } diff --git a/src/bun.js/webcore/Request.zig b/src/bun.js/webcore/Request.zig index 559f9cb32437e2..98e3886fef3773 100644 --- a/src/bun.js/webcore/Request.zig +++ b/src/bun.js/webcore/Request.zig @@ -619,29 +619,31 @@ pub fn constructInto(globalThis: *jsc.JSGlobalObject, arguments: []const jsc.JSV if (value.asDirect(Response)) |response| { if (!fields.contains(.method)) { - req.method = response.init.method; + req.method = response.getMethod(); fields.insert(.method); } if (!fields.contains(.headers)) { - if (response.init.headers) |headers| { + if (response.getInitHeaders()) |headers| { req._headers = try headers.cloneThis(globalThis); fields.insert(.headers); } } if (!fields.contains(.url)) { - if (!response.url.isEmpty()) { - req.url = response.url.dupeRef(); + const url = response.getUrl(); + if (!url.isEmpty()) { + req.url = url.dupeRef(); fields.insert(.url); } } if (!fields.contains(.body)) { - switch (response.body.value) { + const bodyValue = response.getBodyValue(); + switch (bodyValue.*) { .Null, .Empty, .Used => {}, else => { - req.body.value = try response.body.value.clone(globalThis); + req.body.value = try bodyValue.clone(globalThis); fields.insert(.body); }, } diff --git a/src/bun.js/webcore/Response.zig b/src/bun.js/webcore/Response.zig index fa0a6b3c067f59..470396b504792e 100644 --- a/src/bun.js/webcore/Response.zig +++ b/src/bun.js/webcore/Response.zig @@ -10,16 +10,16 @@ pub const js = jsc.Codegen.JSResponse; pub const fromJS = js.fromJS; pub const fromJSDirect = js.fromJSDirect; -body: Body, -init: Init, -url: bun.String = bun.String.empty, -redirected: bool = false, +#body: Body, +#init: Init, +#url: bun.String = bun.String.empty, +#redirected: bool = false, /// We increment this count in fetch so if JS Response is discarted we can resolve the Body /// In the server we use a flag response_protected to protect/unprotect the response ref_count: u32 = 1, // We must report a consistent value for this -reported_estimated_size: usize = 0, +#reported_estimated_size: usize = 0, pub const getText = ResponseMixin.getText; pub const getBody = ResponseMixin.getBody; @@ -31,6 +31,59 @@ pub const getBlob = ResponseMixin.getBlob; pub const getBlobWithoutCallFrame = ResponseMixin.getBlobWithoutCallFrame; pub const getFormData = ResponseMixin.getFormData; +pub fn init(response_init: Init, body: Body, url: bun.String, redirected: bool) Response { + return Response{ + .#init = response_init, + .#body = body, + .#url = url, + .#redirected = redirected, + }; +} + +pub inline fn setInit(this: *Response, method: Method, status_code: u16, status_text: bun.String) void { + this.#init.method = method; + this.#init.status_code = status_code; + this.#init.status_text.deref(); + this.#init.status_text = status_text; +} +pub inline fn setInitHeaders(this: *Response, headers: ?*FetchHeaders) void { + if (this.#init.headers) |_headers| { + _headers.deref(); + } + this.#init.headers = headers; +} +pub inline fn getInitStatusCode(this: *Response) u16 { + return this.#init.status_code; +} +pub inline fn getInitStatusText(this: *Response) bun.String { + return this.#init.status_text; +} +pub inline fn setUrl(this: *Response, url: bun.String) void { + this.#url.deref(); + this.#url = url; +} +pub inline fn getUTF8Url(this: *Response, allocator: std.mem.Allocator) ZigString.Slice { + return this.#url.toUTF8(allocator); +} +pub inline fn getUrl(this: *Response) bun.String { + return this.#url; +} +pub inline fn getInitHeaders(this: *Response) ?*FetchHeaders { + return this.#init.headers; +} +pub inline fn swapInitHeaders(this: *Response) ?*FetchHeaders { + if (this.#init.headers) |headers| { + this.#init.headers = null; + return headers; + } + return null; +} +pub inline fn getBodyLen(this: *Response) usize { + return this.#body.len(); +} +pub inline fn getMethod(this: *Response) Method { + return this.#init.method; +} pub fn getFormDataEncoding(this: *Response) bun.JSError!?*bun.FormData.AsyncFormData { var content_type_slice: ZigString.Slice = (try this.getContentType()) orelse return null; defer content_type_slice.deinit(); @@ -39,13 +92,13 @@ pub fn getFormDataEncoding(this: *Response) bun.JSError!?*bun.FormData.AsyncForm } pub fn estimatedSize(this: *Response) callconv(.C) usize { - return this.reported_estimated_size; + return this.#reported_estimated_size; } pub fn calculateEstimatedByteSize(this: *Response) void { - this.reported_estimated_size = this.body.value.estimatedSize() + - this.url.byteSlice().len + - this.init.status_text.byteSlice().len + + this.#reported_estimated_size = this.#body.value.estimatedSize() + + this.#url.byteSlice().len + + this.#init.status_text.byteSlice().len + @sizeOf(Response); } @@ -57,11 +110,10 @@ pub fn toJS(this: *Response, globalObject: *JSGlobalObject) JSValue { pub fn getBodyValue( this: *Response, ) *Body.Value { - return &this.body.value; + return &this.#body.value; } -pub export fn jsFunctionRequestOrResponseHasBodyValue(globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) callconv(jsc.conv) jsc.JSValue { - _ = globalObject; // autofix +pub export fn jsFunctionRequestOrResponseHasBodyValue(_: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) callconv(jsc.conv) jsc.JSValue { const arguments = callframe.arguments_old(1); const this_value = arguments.ptr[0]; if (this_value.isEmptyOrUndefinedOrNull()) { @@ -69,7 +121,7 @@ pub export fn jsFunctionRequestOrResponseHasBodyValue(globalObject: *jsc.JSGloba } if (this_value.as(Response)) |response| { - return jsc.JSValue.jsBoolean(!response.body.value.isDefinitelyEmpty()); + return jsc.JSValue.jsBoolean(!response.#body.value.isDefinitelyEmpty()); } else if (this_value.as(Request)) |request| { return jsc.JSValue.jsBoolean(!request.body.value.isDefinitelyEmpty()); } @@ -86,7 +138,7 @@ pub export fn jsFunctionGetCompleteRequestOrResponseBodyValueAsArrayBuffer(globa const body: *Body.Value = brk: { if (this_value.as(Response)) |response| { - break :brk &response.body.value; + break :brk &response.#body.value; } else if (this_value.as(Request)) |request| { break :brk &request.body.value; } @@ -115,11 +167,11 @@ pub export fn jsFunctionGetCompleteRequestOrResponseBodyValueAsArrayBuffer(globa pub fn getFetchHeaders( this: *Response, ) ?*FetchHeaders { - return this.init.headers; + return this.#init.headers; } pub inline fn statusCode(this: *const Response) u16 { - return this.init.status_code; + return this.#init.status_code; } pub fn redirectLocation(this: *const Response) ?[]const u8 { @@ -127,7 +179,7 @@ pub fn redirectLocation(this: *const Response) ?[]const u8 { } pub fn header(this: *const Response, name: bun.webcore.FetchHeaders.HTTPHeaderName) ?[]const u8 { - return if (try (this.init.headers orelse return null).fastGet(name)) |str| + return if (try (this.#init.headers orelse return null).fastGet(name)) |str| str.slice() else null; @@ -137,7 +189,7 @@ pub const Props = struct {}; pub fn writeFormat(this: *Response, comptime Formatter: type, formatter: *Formatter, writer: anytype, comptime enable_ansi_colors: bool) !void { const Writer = @TypeOf(writer); - try writer.print("Response ({}) {{\n", .{bun.fmt.size(this.body.len(), .{})}); + try writer.print("Response ({}) {{\n", .{bun.fmt.size(this.#body.len(), .{})}); { formatter.indent += 1; @@ -151,20 +203,20 @@ pub fn writeFormat(this: *Response, comptime Formatter: type, formatter: *Format try formatter.writeIndent(Writer, writer); try writer.writeAll(comptime Output.prettyFmt("url: \"", enable_ansi_colors)); - try writer.print(comptime Output.prettyFmt("{}", enable_ansi_colors), .{this.url}); + try writer.print(comptime Output.prettyFmt("{}", enable_ansi_colors), .{this.#url}); try writer.writeAll("\""); try formatter.printComma(Writer, writer, enable_ansi_colors); try writer.writeAll("\n"); try formatter.writeIndent(Writer, writer); try writer.writeAll(comptime Output.prettyFmt("status: ", enable_ansi_colors)); - try formatter.printAs(.Double, Writer, writer, jsc.JSValue.jsNumber(this.init.status_code), .NumberObject, enable_ansi_colors); + try formatter.printAs(.Double, Writer, writer, jsc.JSValue.jsNumber(this.#init.status_code), .NumberObject, enable_ansi_colors); try formatter.printComma(Writer, writer, enable_ansi_colors); try writer.writeAll("\n"); try formatter.writeIndent(Writer, writer); try writer.writeAll(comptime Output.prettyFmt("statusText: ", enable_ansi_colors)); - try writer.print(comptime Output.prettyFmt("\"{}\"", enable_ansi_colors), .{this.init.status_text}); + try writer.print(comptime Output.prettyFmt("\"{}\"", enable_ansi_colors), .{this.#init.status_text}); try formatter.printComma(Writer, writer, enable_ansi_colors); try writer.writeAll("\n"); @@ -176,12 +228,12 @@ pub fn writeFormat(this: *Response, comptime Formatter: type, formatter: *Format try formatter.writeIndent(Writer, writer); try writer.writeAll(comptime Output.prettyFmt("redirected: ", enable_ansi_colors)); - try formatter.printAs(.Boolean, Writer, writer, jsc.JSValue.jsBoolean(this.redirected), .BooleanObject, enable_ansi_colors); + try formatter.printAs(.Boolean, Writer, writer, jsc.JSValue.jsBoolean(this.#redirected), .BooleanObject, enable_ansi_colors); try formatter.printComma(Writer, writer, enable_ansi_colors); try writer.writeAll("\n"); formatter.resetLine(); - try this.body.writeFormat(Formatter, formatter, writer, enable_ansi_colors); + try this.#body.writeFormat(Formatter, formatter, writer, enable_ansi_colors); } try writer.writeAll("\n"); try formatter.writeIndent(Writer, writer); @@ -190,7 +242,7 @@ pub fn writeFormat(this: *Response, comptime Formatter: type, formatter: *Format } pub fn isOK(this: *const Response) bool { - return this.init.status_code >= 200 and this.init.status_code <= 299; + return this.#init.status_code >= 200 and this.#init.status_code <= 299; } pub fn getURL( @@ -198,14 +250,14 @@ pub fn getURL( globalThis: *jsc.JSGlobalObject, ) jsc.JSValue { // https://developer.mozilla.org/en-US/docs/Web/API/Response/url - return this.url.toJS(globalThis); + return this.#url.toJS(globalThis); } pub fn getResponseType( this: *Response, globalThis: *jsc.JSGlobalObject, ) jsc.JSValue { - if (this.init.status_code < 200) { + if (this.#init.status_code < 200) { return bun.String.static("error").toJS(globalThis); } @@ -217,7 +269,7 @@ pub fn getStatusText( globalThis: *jsc.JSGlobalObject, ) jsc.JSValue { // https://developer.mozilla.org/en-US/docs/Web/API/Response/statusText - return this.init.status_text.toJS(globalThis); + return this.#init.status_text.toJS(globalThis); } pub fn getRedirected( @@ -225,7 +277,7 @@ pub fn getRedirected( _: *jsc.JSGlobalObject, ) jsc.JSValue { // https://developer.mozilla.org/en-US/docs/Web/API/Response/redirected - return JSValue.jsBoolean(this.redirected); + return JSValue.jsBoolean(this.#redirected); } pub fn getOK( @@ -237,18 +289,18 @@ pub fn getOK( } fn getOrCreateHeaders(this: *Response, globalThis: *jsc.JSGlobalObject) bun.JSError!*FetchHeaders { - if (this.init.headers == null) { - this.init.headers = FetchHeaders.createEmpty(); + if (this.#init.headers == null) { + this.#init.headers = FetchHeaders.createEmpty(); - if (this.body.value == .Blob) { - const content_type = this.body.value.Blob.content_type; + if (this.#body.value == .Blob) { + const content_type = this.#body.value.Blob.content_type; if (content_type.len > 0) { - try this.init.headers.?.put(.ContentType, content_type, globalThis); + try this.#init.headers.?.put(.ContentType, content_type, globalThis); } } } - return this.init.headers.?; + return this.#init.headers.?; } pub fn getHeaders( @@ -269,14 +321,14 @@ pub fn doClone( const js_wrapper = Response.makeMaybePooled(globalThis, cloned); if (js_wrapper != .zero) { - if (cloned.body.value == .Locked) { - if (cloned.body.value.Locked.readable.get(globalThis)) |readable| { + if (cloned.#body.value == .Locked) { + if (cloned.#body.value.Locked.readable.get(globalThis)) |readable| { // If we are teed, then we need to update the cached .body // value to point to the new readable stream // We must do this on both the original and cloned response // but especially the original response since it will have a stale .body value now. js.bodySetCached(js_wrapper, globalThis, readable.value); - if (this.body.value.Locked.readable.get(globalThis)) |other_readable| { + if (this.#body.value.Locked.readable.get(globalThis)) |other_readable| { js.bodySetCached(this_value, globalThis, other_readable.value); } } @@ -294,15 +346,15 @@ pub fn cloneValue( this: *Response, globalThis: *JSGlobalObject, ) bun.JSError!Response { - var body = try this.body.clone(globalThis); + var body = try this.#body.clone(globalThis); errdefer body.deinit(bun.default_allocator); - var init = try this.init.clone(globalThis); - errdefer init.deinit(bun.default_allocator); + var _init = try this.#init.clone(globalThis); + errdefer _init.deinit(bun.default_allocator); return Response{ - .body = body, - .init = init, - .url = this.url.clone(), - .redirected = this.redirected, + .#body = body, + .#init = _init, + .#url = this.#url.clone(), + .#redirected = this.#redirected, }; } @@ -315,13 +367,13 @@ pub fn getStatus( _: *jsc.JSGlobalObject, ) jsc.JSValue { // https://developer.mozilla.org/en-US/docs/Web/API/Response/status - return JSValue.jsNumber(this.init.status_code); + return JSValue.jsNumber(this.#init.status_code); } fn destroy(this: *Response) void { - this.init.deinit(bun.default_allocator); - this.body.deinit(bun.default_allocator); - this.url.deref(); + this.#init.deinit(bun.default_allocator); + this.#body.deinit(bun.default_allocator); + this.#url.deref(); bun.destroy(this); } @@ -348,15 +400,15 @@ pub fn finalize( pub fn getContentType( this: *Response, ) bun.JSError!?ZigString.Slice { - if (this.init.headers) |headers| { + if (this.#init.headers) |headers| { if (headers.fastGet(.ContentType)) |value| { return value.toSlice(bun.default_allocator); } } - if (this.body.value == .Blob) { - if (this.body.value.Blob.content_type.len > 0) - return ZigString.Slice.fromUTF8NeverFree(this.body.value.Blob.content_type); + if (this.#body.value == .Blob) { + if (this.#body.value.Blob.content_type.len > 0) + return ZigString.Slice.fromUTF8NeverFree(this.#body.value.Blob.content_type); } return null; @@ -371,19 +423,19 @@ pub fn constructJSON( var args = jsc.CallFrame.ArgumentsSlice.init(globalThis.bunVM(), args_list.ptr[0..args_list.len]); var response = Response{ - .body = Body{ + .#body = Body{ .value = .{ .Empty = {} }, }, - .init = Response.Init{ + .#init = Response.Init{ .status_code = 200, }, - .url = bun.String.empty, + .#url = bun.String.empty, }; var did_succeed = false; defer { if (!did_succeed) { - response.body.deinit(bun.default_allocator); - response.init.deinit(bun.default_allocator); + response.#body.deinit(bun.default_allocator); + response.#init.deinit(bun.default_allocator); } } const json_value = args.nextEat() orelse jsc.JSValue.zero; @@ -401,26 +453,26 @@ pub fn constructJSON( if (!str.isEmpty()) { if (str.value.WTFStringImpl.toUTF8IfNeeded(bun.default_allocator)) |bytes| { defer str.deref(); - response.body.value = .{ + response.#body.value = .{ .InternalBlob = InternalBlob{ .bytes = std.ArrayList(u8).fromOwnedSlice(bun.default_allocator, @constCast(bytes.slice())), .was_string = true, }, }; } else { - response.body.value = Body.Value{ + response.#body.value = Body.Value{ .WTFStringImpl = str.value.WTFStringImpl, }; } } } - if (args.nextEat()) |init| { - if (init.isUndefinedOrNull()) {} else if (init.isNumber()) { - response.init.status_code = @as(u16, @intCast(@min(@max(0, init.toInt32()), std.math.maxInt(u16)))); + if (args.nextEat()) |arg_init| { + if (arg_init.isUndefinedOrNull()) {} else if (arg_init.isNumber()) { + response.#init.status_code = @as(u16, @intCast(@min(@max(0, arg_init.toInt32()), std.math.maxInt(u16)))); } else { - if (Response.Init.init(globalThis, init) catch |err| if (err == error.JSError) return .zero else null) |_init| { - response.init = _init; + if (Response.Init.init(globalThis, arg_init) catch |err| if (err == error.JSError) return .zero else null) |_init| { + response.#init = _init; } } } @@ -463,13 +515,13 @@ pub fn constructRedirectImpl( defer url_string_slice.deinit(); var response: Response = brk: { var response = Response{ - .init = Response.Init{ + .#init = Response.Init{ .status_code = 302, }, - .body = Body{ + .#body = Body{ .value = .{ .Empty = {} }, }, - .url = bun.String.empty, + .#url = bun.String.empty, }; const url_string_value = args.nextEat() orelse jsc.JSValue.zero; @@ -482,20 +534,20 @@ pub fn constructRedirectImpl( var did_succeed = false; defer { if (!did_succeed) { - response.body.deinit(bun.default_allocator); - response.init.deinit(bun.default_allocator); + response.#body.deinit(bun.default_allocator); + response.#init.deinit(bun.default_allocator); } } - if (args.nextEat()) |init| { - if (init.isUndefinedOrNull()) {} else if (init.isNumber()) { - response.init.status_code = try validateRedirectStatusCode(globalThis, init.toInt32()); - } else if (try Response.Init.init(globalThis, init)) |_init| { - errdefer response.init.deinit(bun.default_allocator); - response.init = _init; + if (args.nextEat()) |arg_init| { + if (arg_init.isUndefinedOrNull()) {} else if (arg_init.isNumber()) { + response.#init.status_code = try validateRedirectStatusCode(globalThis, arg_init.toInt32()); + } else if (try Response.Init.init(globalThis, arg_init)) |_init| { + errdefer response.#init.deinit(bun.default_allocator); + response.#init = _init; if (_init.status_code != 200) { - response.init.status_code = try validateRedirectStatusCode(globalThis, _init.status_code); + response.#init.status_code = try validateRedirectStatusCode(globalThis, _init.status_code); } } } @@ -507,8 +559,8 @@ pub fn constructRedirectImpl( break :brk response; }; - response.init.headers = try response.getOrCreateHeaders(globalThis); - var headers_ref = response.init.headers.?; + response.#init.headers = try response.getOrCreateHeaders(globalThis); + var headers_ref = response.#init.headers.?; try headers_ref.put(.Location, url_string_slice.slice(), globalThis); return response; } @@ -520,10 +572,10 @@ pub fn constructError( const response = bun.new( Response, Response{ - .init = Response.Init{ + .#init = Response.Init{ .status_code = 0, }, - .body = Body{ + .#body = Body{ .value = .{ .Empty = {} }, }, }, @@ -542,13 +594,13 @@ pub fn constructor(globalThis: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) b return globalThis.throwInvalidArguments("new Response(s3File) do not support ResponseInit options", .{}); } var response: Response = .{ - .init = Response.Init{ + .#init = Response.Init{ .status_code = 302, }, - .body = Body{ + .#body = Body{ .value = .{ .Empty = {} }, }, - .url = bun.String.empty, + .#url = bun.String.empty, }; const credentials = blob.store.?.data.s3.getCredentials(); @@ -560,15 +612,15 @@ pub fn constructor(globalThis: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) b return s3.throwSignError(sign_err, globalThis); }; defer result.deinit(); - response.init.headers = try response.getOrCreateHeaders(globalThis); - response.redirected = true; - var headers_ref = response.init.headers.?; + response.#init.headers = try response.getOrCreateHeaders(globalThis); + response.#redirected = true; + var headers_ref = response.#init.headers.?; try headers_ref.put(.Location, result.url, globalThis); return bun.new(Response, response); } } } - var init: Init = (brk: { + var _init: Init = (brk: { if (arguments[1].isUndefinedOrNull()) { break :brk Init{ .status_code = 200, @@ -583,7 +635,7 @@ pub fn constructor(globalThis: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) b } return error.JSError; }); - errdefer init.deinit(bun.default_allocator); + errdefer _init.deinit(bun.default_allocator); if (globalThis.hasException()) { return error.JSError; @@ -604,16 +656,16 @@ pub fn constructor(globalThis: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) b } var response = bun.new(Response, Response{ - .body = body, - .init = init, + .#body = body, + .#init = _init, }); - if (response.body.value == .Blob and - response.init.headers != null and - response.body.value.Blob.content_type.len > 0 and - !response.init.headers.?.fastHas(.ContentType)) + if (response.#body.value == .Blob and + response.#init.headers != null and + response.#body.value.Blob.content_type.len > 0 and + !response.#init.headers.?.fastHas(.ContentType)) { - try response.init.headers.?.put(.ContentType, response.body.value.Blob.content_type, globalThis); + try response.#init.headers.?.put(.ContentType, response.#body.value.Blob.content_type, globalThis); } response.calculateEstimatedByteSize(); @@ -666,7 +718,7 @@ pub const Init = struct { } if (response_init.asDirect(Response)) |resp| { - return try resp.init.clone(globalThis); + return try resp.#init.clone(globalThis); } } @@ -740,7 +792,7 @@ pub fn @"200"(globalThis: *jsc.JSGlobalObject) Response { inline fn emptyWithStatus(_: *jsc.JSGlobalObject, status: u16) Response { return bun.new(Response, .{ - .body = Body{ + .#body = Body{ .value = Body.Value{ .Null = {} }, }, .init = Init{ diff --git a/src/bun.js/webcore/fetch.zig b/src/bun.js/webcore/fetch.zig index 0b041a974cf56f..1c34b3d1fda857 100644 --- a/src/bun.js/webcore/fetch.zig +++ b/src/bun.js/webcore/fetch.zig @@ -399,14 +399,14 @@ pub const FetchTasklet = struct { } // if we are buffering resolve the promise if (this.getCurrentResponse()) |response| { - response.body.value.toErrorInstance(err, globalThis); need_deinit = false; // body value now owns the error - const body = response.body; - if (body.value == .Locked) { - if (body.value.Locked.promise) |promise_| { - const promise = promise_.asAnyPromise().?; - promise.reject(globalThis, response.body.value.Error.toJS(globalThis)); - } + const body = response.getBodyValue(); + if (body.* == .Locked and body.Locked.promise != null) { + const promise = body.Locked.promise.?.asAnyPromise().?; + body.toErrorInstance(err, globalThis); + promise.reject(globalThis, body.Error.toJS(globalThis)); + } else { + body.toErrorInstance(err, globalThis); } } return; @@ -445,9 +445,9 @@ pub const FetchTasklet = struct { } if (this.getCurrentResponse()) |response| { - var body = &response.body; - if (body.value == .Locked) { - if (body.value.Locked.readable.get(globalThis)) |readable| { + var body = response.getBodyValue(); + if (body.* == .Locked) { + if (body.Locked.readable.get(globalThis)) |readable| { if (readable.ptr == .Bytes) { readable.ptr.Bytes.size_hint = this.getSizeHint(); @@ -463,8 +463,8 @@ pub const FetchTasklet = struct { bun.default_allocator, ); } else { - var prev = body.value.Locked.readable; - body.value.Locked.readable = .{}; + var prev = body.Locked.readable; + body.Locked.readable = .{}; readable.value.ensureStillAlive(); prev.deinit(); readable.value.ensureStillAlive(); @@ -479,7 +479,7 @@ pub const FetchTasklet = struct { return; } } else { - response.body.value.Locked.size_hint = this.getSizeHint(); + body.Locked.size_hint = this.getSizeHint(); } // we will reach here when not streaming, this is also the only case we dont wanna to reset the buffer buffer_reset = false; @@ -488,13 +488,13 @@ pub const FetchTasklet = struct { this.memory_reporter.discard(scheduled_response_buffer.allocatedSlice()); // done resolve body - var old = body.value; + var old = body.*; const body_value = Body.Value{ .InternalBlob = .{ .bytes = scheduled_response_buffer.toManaged(bun.default_allocator), }, }; - response.body.value = body_value; + body.* = body_value; this.scheduled_response_buffer = .{ .allocator = this.memory_reporter.allocator(), @@ -505,7 +505,7 @@ pub const FetchTasklet = struct { }; if (old == .Locked) { - old.resolve(&response.body.value, this.global_this, response.getFetchHeaders()); + old.resolve(body, this.global_this, response.getFetchHeaders()); } } } @@ -962,18 +962,18 @@ pub const FetchTasklet = struct { const metadata = this.metadata.?; const http_response = metadata.response; this.is_waiting_body = this.result.has_more; - return Response{ - .url = bun.String.createAtomIfPossible(metadata.url), - .redirected = this.result.redirected, - .init = .{ + return Response.init( + .{ .headers = FetchHeaders.createFromPicoHeaders(http_response.headers), .status_code = @as(u16, @truncate(http_response.status_code)), .status_text = bun.String.createAtomIfPossible(http_response.status), }, - .body = .{ + Body{ .value = this.toBodyValue(), }, - }; + bun.String.createAtomIfPossible(metadata.url), + this.result.redirected, + ); } fn ignoreRemainingResponseBody(this: *FetchTasklet) void { @@ -1001,7 +1001,7 @@ pub const FetchTasklet = struct { export fn Bun__FetchResponse_finalize(this: *FetchTasklet) callconv(.C) void { log("onResponseFinalize", .{}); if (this.native_response) |response| { - const body = response.body; + const body = response.getBodyValue(); // Three scenarios: // // 1. We are streaming, in which case we should not ignore the body. @@ -1011,12 +1011,12 @@ pub const FetchTasklet = struct { // 3. We never started buffering, in which case we should ignore the body. // // Note: We cannot call .get() on the ReadableStreamRef. This is called inside a finalizer. - if (body.value != .Locked or this.readable_stream_ref.held.has()) { + if (body.* != .Locked or this.readable_stream_ref.held.has()) { // Scenario 1 or 3. return; } - if (body.value.Locked.promise) |promise| { + if (body.Locked.promise) |promise| { if (promise.isEmptyOrUndefinedOrNull()) { // Scenario 2b. this.ignoreRemainingResponseBody(); @@ -1428,21 +1428,17 @@ fn dataURLResponse( blob.content_type_allocated = true; } - var response = bun.new( - Response, - Response{ - .body = Body{ - .value = .{ - .Blob = blob, - }, - }, - .init = Response.Init{ - .status_code = 200, - .status_text = bun.String.createAtomASCII("OK"), - }, - .url = data_url.url.dupeRef(), + var response = bun.new(Response, Response.init( + .{ + .status_code = 200, + .status_text = bun.String.createAtomASCII("OK"), }, - ); + Body{ + .value = .{ .Blob = blob }, + }, + data_url.url.dupeRef(), + false, + )); return JSPromise.resolvedPromiseValue(globalThis, response.toJS(globalThis)); } @@ -2355,15 +2351,16 @@ pub fn Bun__fetch_( ); }; - const response = bun.new(Response, Response{ - .body = Body{ - .value = .{ .Blob = blob_to_use }, - }, - .init = Response.Init{ + const response = bun.new(Response, Response.init( + Response.Init{ .status_code = 200, }, - .url = url_string.clone(), - }); + Body{ + .value = .{ .Blob = blob_to_use }, + }, + url_string.clone(), + false, + )); return JSPromise.resolvedPromiseValue(globalThis, response.toJS(globalThis)); } @@ -2538,19 +2535,29 @@ pub fn Bun__fetch_( const global = self.global; switch (result) { .success => { - const response = bun.new(Response, Response{ - .body = .{ .value = .Empty }, - .redirected = false, - .init = .{ .method = .PUT, .status_code = 200 }, - .url = bun.String.createAtomIfPossible(self.url.href), - }); + const response = bun.new(Response, Response.init( + Response.Init{ + .method = .PUT, + .status_code = 200, + }, + Body{ + .value = .Empty, + }, + bun.String.createAtomIfPossible(self.url.href), + false, + )); const response_js = Response.makeMaybePooled(@as(*jsc.JSGlobalObject, global), response); response_js.ensureStillAlive(); self.promise.resolve(global, response_js); }, .failure => |err| { - const response = bun.new(Response, Response{ - .body = .{ + const response = bun.new(Response, Response.init( + .{ + .method = .PUT, + .status_code = 500, + .status_text = bun.String.createAtomIfPossible(err.code), + }, + .{ .value = .{ .InternalBlob = .{ .bytes = std.ArrayList(u8).fromOwnedSlice(bun.default_allocator, bun.handleOom(bun.default_allocator.dupe(u8, err.message))), @@ -2558,14 +2565,10 @@ pub fn Bun__fetch_( }, }, }, - .redirected = false, - .init = .{ - .method = .PUT, - .status_code = 500, - .status_text = bun.String.createAtomIfPossible(err.code), - }, - .url = bun.String.createAtomIfPossible(self.url.href), - }); + bun.String.createAtomIfPossible(self.url.href), + false, + )); + const response_js = Response.makeMaybePooled(@as(*jsc.JSGlobalObject, global), response); response_js.ensureStillAlive(); self.promise.resolve(global, response_js); From 368ba17bb9d40afdfc9019d27aa85b4c420ab322 Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Tue, 7 Oct 2025 00:06:40 +0000 Subject: [PATCH 2/6] [autofix.ci] apply automated fixes --- src/bun.js/webcore/BakeResponse.zig | 7 +++---- test/internal/ban-limits.json | 2 +- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/src/bun.js/webcore/BakeResponse.zig b/src/bun.js/webcore/BakeResponse.zig index 9e1485dfb8faf0..500a0662ef2a0e 100644 --- a/src/bun.js/webcore/BakeResponse.zig +++ b/src/bun.js/webcore/BakeResponse.zig @@ -135,12 +135,11 @@ fn assertStreamingDisabled(globalThis: *jsc.JSGlobalObject, async_local_storage: if (streaming_val.asBoolean()) return globalThis.throwInvalidArguments("\"{s}\" is not available when `export const streaming = true`", .{display_function}); } -const bun = @import("bun"); const std = @import("std"); +const bun = @import("bun"); +const Response = bun.webcore.Response; + const jsc = bun.jsc; const JSGlobalObject = jsc.JSGlobalObject; const JSValue = jsc.JSValue; - -const Body = bun.webcore.Body; -const Response = bun.webcore.Response; diff --git a/test/internal/ban-limits.json b/test/internal/ban-limits.json index 9102540fcb93d3..f89171d162f6ba 100644 --- a/test/internal/ban-limits.json +++ b/test/internal/ban-limits.json @@ -9,7 +9,7 @@ ".jsBoolean(true)": 0, ".stdDir()": 41, ".stdFile()": 17, - "// autofix": 167, + "// autofix": 166, ": [^=]+= undefined,$": 256, "== alloc.ptr": 0, "== allocator.ptr": 0, From dde97af2e09367ef075be5c39efb6dbb6b96908b Mon Sep 17 00:00:00 2001 From: Ciro Spaciari Date: Mon, 6 Oct 2025 19:08:44 -0700 Subject: [PATCH 3/6] check if promise is already handled before rejecting --- src/bun.js/webcore/fetch.zig | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/bun.js/webcore/fetch.zig b/src/bun.js/webcore/fetch.zig index 1c34b3d1fda857..65677c00ea80bd 100644 --- a/src/bun.js/webcore/fetch.zig +++ b/src/bun.js/webcore/fetch.zig @@ -404,6 +404,9 @@ pub const FetchTasklet = struct { if (body.* == .Locked and body.Locked.promise != null) { const promise = body.Locked.promise.?.asAnyPromise().?; body.toErrorInstance(err, globalThis); + if (promise.isHandled(globalThis.vm())) { + return; + } promise.reject(globalThis, body.Error.toJS(globalThis)); } else { body.toErrorInstance(err, globalThis); From d98eb1f3ae050822e97dd81765b56df29ce9ee55 Mon Sep 17 00:00:00 2001 From: Ciro Spaciari Date: Tue, 7 Oct 2025 02:10:55 -0700 Subject: [PATCH 4/6] fix(Response) change owner ship of the stream to allow cyclic references (#23319) ### What does this PR do? This is the second step to fix cyclic reference issues with the stream please review https://github.com/oven-sh/bun/pull/23313 first ### How did you verify your code works? Test + CI --------- Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> --- src/bun.js/api/html_rewriter.zig | 3 +- src/bun.js/api/server.zig | 30 +-- src/bun.js/api/server/RequestContext.zig | 62 ++--- src/bun.js/bindings/JSBakeResponse.cpp | 6 +- src/bun.js/bindings/JSGlobalObject.zig | 6 + src/bun.js/node/types.zig | 5 +- src/bun.js/webcore/BakeResponse.zig | 8 +- src/bun.js/webcore/Blob.zig | 20 +- src/bun.js/webcore/Body.zig | 218 +++++++++++++---- src/bun.js/webcore/Request.zig | 219 ++++++++++++------ src/bun.js/webcore/Response.zig | 94 +++++++- src/bun.js/webcore/fetch.zig | 133 +++++------ src/bun.js/webcore/response.classes.ts | 4 + .../fetch/request-cyclic-reference.test.ts | 20 ++ .../fetch/response-cyclic-reference.test.ts | 20 ++ 15 files changed, 596 insertions(+), 252 deletions(-) create mode 100644 test/js/web/fetch/request-cyclic-reference.test.ts create mode 100644 test/js/web/fetch/response-cyclic-reference.test.ts diff --git a/src/bun.js/api/html_rewriter.zig b/src/bun.js/api/html_rewriter.zig index a18e2c3f9887c1..9291a9c78c88d3 100644 --- a/src/bun.js/api/html_rewriter.zig +++ b/src/bun.js/api/html_rewriter.zig @@ -490,11 +490,12 @@ pub const HTMLRewriter = struct { result.setUrl(original.getUrl().clone()); const value = original.getBodyValue(); + const owned_readable_stream = original.getBodyReadableStream(sink.global); sink.ref(); sink.bodyValueBufferer = jsc.WebCore.Body.ValueBufferer.init(sink, @ptrCast(&onFinishedBuffering), sink.global, bun.default_allocator); response_js_value.ensureStillAlive(); - sink.bodyValueBufferer.?.run(value) catch |buffering_error| { + sink.bodyValueBufferer.?.run(value, owned_readable_stream) catch |buffering_error| { defer sink.deref(); return switch (buffering_error) { error.StreamAlreadyUsed => { diff --git a/src/bun.js/api/server.zig b/src/bun.js/api/server.zig index f4a452f15526b2..19ce635155d6a6 100644 --- a/src/bun.js/api/server.zig +++ b/src/bun.js/api/server.zig @@ -1186,7 +1186,7 @@ pub fn NewServer(protocol_enum: enum { http, https }, development_kind: enum { d } } - existing_request = Request.init( + existing_request = Request.init2( bun.String.cloneUTF8(url.href), headers, bun.handleOom(this.vm.initRequestBodyValue(body)), @@ -2213,13 +2213,13 @@ pub fn NewServer(protocol_enum: enum { http, https }, development_kind: enum { d ctx.signal = signal; signal.pendingActivityRef(); - const request_object = Request.new(.{ - .method = ctx.method, - .request_context = AnyRequestContext.init(ctx), - .https = ssl_enabled, - .signal = signal.ref(), - .body = body.ref(), - }); + const request_object = Request.new(Request.init( + ctx.method, + AnyRequestContext.init(ctx), + ssl_enabled, + signal.ref(), + body.ref(), + )); ctx.request_weakref = .initRef(request_object); if (comptime debug_mode) { @@ -2314,13 +2314,13 @@ pub fn NewServer(protocol_enum: enum { http, https }, development_kind: enum { d var signal = jsc.WebCore.AbortSignal.new(this.globalThis); ctx.signal = signal; - var request_object = Request.new(.{ - .method = ctx.method, - .request_context = AnyRequestContext.init(ctx), - .https = ssl_enabled, - .signal = signal.ref(), - .body = body.ref(), - }); + var request_object = Request.new(Request.init( + ctx.method, + AnyRequestContext.init(ctx), + ssl_enabled, + signal.ref(), + body.ref(), + )); ctx.upgrade_context = upgrade_ctx; ctx.request_weakref = .initRef(request_object); // We keep the Request object alive for the duration of the request so that we can remove the pointer to the UWS request object. diff --git a/src/bun.js/api/server/RequestContext.zig b/src/bun.js/api/server/RequestContext.zig index d6d5604a70cd61..4ada2485a3bd47 100644 --- a/src/bun.js/api/server/RequestContext.zig +++ b/src/bun.js/api/server/RequestContext.zig @@ -677,15 +677,11 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, } if (this.response_ptr) |response| { - const bodyValue = response.getBodyValue(); - if (bodyValue.* == .Locked) { - var strong_readable = bodyValue.Locked.readable; - bodyValue.Locked.readable = .{}; - defer strong_readable.deinit(); - if (strong_readable.get(globalThis)) |readable| { - readable.abort(globalThis); - any_js_calls = true; - } + if (response.getBodyReadableStream(globalThis)) |stream| { + stream.value.ensureStillAlive(); + response.detachReadableStream(globalThis); + stream.abort(globalThis); + any_js_calls = true; } } } @@ -1091,7 +1087,7 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, } pub fn doRenderWithBodyLocked(this: *anyopaque, value: *jsc.WebCore.Body.Value) void { - doRenderWithBody(bun.cast(*RequestContext, this), value); + doRenderWithBody(bun.cast(*RequestContext, this), value, null); } fn renderWithBlobFromBodyValue(this: *RequestContext) void { @@ -1664,13 +1660,13 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, if (req.response_ptr) |resp| { assert(req.server != null); + const globalThis = req.server.?.globalThis; const bodyValue = resp.getBodyValue(); - if (bodyValue.* == .Locked) { - const global = bodyValue.Locked.global; - if (bodyValue.Locked.readable.get(global)) |stream| { - stream.done(global); - } - bodyValue.Locked.readable.deinit(); + if (resp.getBodyReadableStream(globalThis)) |stream| { + stream.value.ensureStillAlive(); + resp.detachReadableStream(globalThis); + + stream.done(globalThis); bodyValue.* = .{ .Used = {} }; } } @@ -1720,11 +1716,10 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, if (req.response_ptr) |resp| { const bodyValue = resp.getBodyValue(); - if (bodyValue.* == .Locked) { - if (bodyValue.Locked.readable.get(globalThis)) |stream| { - stream.done(globalThis); - } - bodyValue.Locked.readable.deinit(); + if (resp.getBodyReadableStream(globalThis)) |stream| { + stream.value.ensureStillAlive(); + resp.detachReadableStream(globalThis); + stream.done(globalThis); bodyValue.* = .{ .Used = {} }; } } @@ -1802,7 +1797,7 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, req.endStream(req.shouldCloseConnection()); } - pub fn doRenderWithBody(this: *RequestContext, value: *jsc.WebCore.Body.Value) void { + pub fn doRenderWithBody(this: *RequestContext, value: *jsc.WebCore.Body.Value, owned_readable: ?jsc.WebCore.ReadableStream) void { this.drainMicrotasks(); // If a ReadableStream can trivially be converted to a Blob, do so. @@ -1832,11 +1827,20 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, if (this.isAbortedOrEnded()) { return; } - - if (lock.readable.get(globalThis)) |stream_| { - const stream: jsc.WebCore.ReadableStream = stream_; - // we hold the stream alive until we're done with it - this.readable_stream_ref = lock.readable; + const readable_stream = brk: { + if (lock.readable.get(globalThis)) |stream| { + // we hold the stream alive until we're done with it + this.readable_stream_ref = lock.readable; + break :brk stream; + } + if (owned_readable) |stream| { + // response owns the stream, so we hold a strong reference to it + this.readable_stream_ref = jsc.WebCore.ReadableStream.Strong.init(stream, globalThis); + break :brk stream; + } + break :brk null; + }; + if (readable_stream) |stream| { value.* = .{ .Used = {} }; if (stream.isLocked(globalThis)) { @@ -1915,7 +1919,7 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, // someone else is waiting for the stream or waiting for `onStartStreaming` const readable = value.toReadableStream(globalThis) catch return; // TODO: properly propagate exception upwards readable.ensureStillAlive(); - this.doRenderWithBody(value); + this.doRenderWithBody(value, null); return; } @@ -1996,7 +2000,7 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, return; } var response = this.response_ptr.?; - this.doRenderWithBody(response.getBodyValue()); + this.doRenderWithBody(response.getBodyValue(), response.getBodyReadableStream(this.server.?.globalThis)); } pub fn renderProductionError(this: *RequestContext, status: u16) void { diff --git a/src/bun.js/bindings/JSBakeResponse.cpp b/src/bun.js/bindings/JSBakeResponse.cpp index 8b1e04b5046d64..7c3480cfec2e9a 100644 --- a/src/bun.js/bindings/JSBakeResponse.cpp +++ b/src/bun.js/bindings/JSBakeResponse.cpp @@ -33,7 +33,7 @@ static JSC_DECLARE_CUSTOM_GETTER(jsBakeResponsePrototypeGetDebugInfo); static JSC_DECLARE_CUSTOM_GETTER(jsBakeResponsePrototypeGetDebugStack); static JSC_DECLARE_CUSTOM_GETTER(jsBakeResponsePrototypeGetDebugTask); -extern JSC_CALLCONV void* JSC_HOST_CALL_ATTRIBUTES BakeResponseClass__constructForSSR(JSC::JSGlobalObject*, JSC::CallFrame*, int*); +extern JSC_CALLCONV void* JSC_HOST_CALL_ATTRIBUTES BakeResponseClass__constructForSSR(JSC::JSGlobalObject*, JSC::CallFrame*, int*, JSC::EncodedJSValue); extern "C" SYSV_ABI JSC::EncodedJSValue JSC_HOST_CALL_ATTRIBUTES ResponseClass__constructError(JSC::JSGlobalObject*, JSC::CallFrame*); extern "C" SYSV_ABI JSC::EncodedJSValue JSC_HOST_CALL_ATTRIBUTES ResponseClass__constructJSON(JSC::JSGlobalObject*, JSC::CallFrame*); extern "C" SYSV_ABI JSC::EncodedJSValue JSC_HOST_CALL_ATTRIBUTES BakeResponseClass__constructRender(JSC::JSGlobalObject*, JSC::CallFrame*); @@ -213,7 +213,7 @@ class JSBakeResponseConstructor final : public JSC::InternalFunction { JSBakeResponse* instance = JSBakeResponse::create(vm, globalObject, structure, nullptr); int arg_was_jsx = 0; - void* ptr = BakeResponseClass__constructForSSR(globalObject, callFrame, &arg_was_jsx); + void* ptr = BakeResponseClass__constructForSSR(globalObject, callFrame, &arg_was_jsx, JSValue::encode(instance)); if (scope.exception()) [[unlikely]] { ASSERT_WITH_MESSAGE(!ptr, "Memory leak detected: new Response() allocated memory without checking for exceptions."); return JSValue::encode(JSC::jsUndefined()); @@ -243,7 +243,7 @@ class JSBakeResponseConstructor final : public JSC::InternalFunction { Structure* structure = globalObject->bakeAdditions().JSBakeResponseStructure(globalObject); JSBakeResponse* instance = JSBakeResponse::create(vm, globalObject, structure, nullptr); - void* ptr = BakeResponseClass__constructForSSR(globalObject, callFrame, nullptr); + void* ptr = BakeResponseClass__constructForSSR(globalObject, callFrame, nullptr, JSValue::encode(instance)); if (scope.exception()) [[unlikely]] { ASSERT_WITH_MESSAGE(!ptr, "Memory leak detected: new Response() allocated memory without checking for exceptions."); return JSValue::encode(JSC::jsUndefined()); diff --git a/src/bun.js/bindings/JSGlobalObject.zig b/src/bun.js/bindings/JSGlobalObject.zig index 67dda877766414..d1168de32d30f9 100644 --- a/src/bun.js/bindings/JSGlobalObject.zig +++ b/src/bun.js/bindings/JSGlobalObject.zig @@ -901,6 +901,12 @@ pub const JSGlobalObject = opaque { // We're done validating. From now on, deal with extracting the body. body.toBlobIfPossible(); + if (body.* == .Locked) { + if (response.getBodyReadableStream(this)) |stream| { + return stream.value; + } + } + var any_blob = switch (body.*) { .Locked => body.tryUseAsAnyBlob() orelse return body.toReadableStream(this), else => body.useAsAnyBlob(), diff --git a/src/bun.js/node/types.zig b/src/bun.js/node/types.zig index a06292877a1bb5..f1020fa0264b04 100644 --- a/src/bun.js/node/types.zig +++ b/src/bun.js/node/types.zig @@ -86,9 +86,10 @@ pub const BlobOrStringOrBuffer = union(enum) { } if (allow_request_response) { if (value.as(jsc.WebCore.Request)) |request| { - request.body.value.toBlobIfPossible(); + const bodyValue = request.getBodyValue(); + bodyValue.toBlobIfPossible(); - if (request.body.value.tryUseAsAnyBlob()) |any_blob_| { + if (bodyValue.tryUseAsAnyBlob()) |any_blob_| { var any_blob = any_blob_; defer any_blob.detach(); return .{ .blob = any_blob.toBlob(global) }; diff --git a/src/bun.js/webcore/BakeResponse.zig b/src/bun.js/webcore/BakeResponse.zig index 500a0662ef2a0e..b73b231d602c11 100644 --- a/src/bun.js/webcore/BakeResponse.zig +++ b/src/bun.js/webcore/BakeResponse.zig @@ -18,8 +18,8 @@ pub fn toJSForSSR(this: *Response, globalObject: *JSGlobalObject, kind: SSRKind) return BakeResponse__createForSSR(globalObject, this, @intFromEnum(kind)); } -pub export fn BakeResponseClass__constructForSSR(globalObject: *jsc.JSGlobalObject, callFrame: *jsc.CallFrame, bake_ssr_has_jsx: *c_int) callconv(jsc.conv) ?*anyopaque { - return @as(*Response, constructor(globalObject, callFrame, bake_ssr_has_jsx) catch |err| switch (err) { +pub export fn BakeResponseClass__constructForSSR(globalObject: *jsc.JSGlobalObject, callFrame: *jsc.CallFrame, bake_ssr_has_jsx: *c_int, js_this: jsc.JSValue) callconv(jsc.conv) ?*anyopaque { + return @as(*Response, constructor(globalObject, callFrame, bake_ssr_has_jsx, js_this) catch |err| switch (err) { error.JSError => return null, error.OutOfMemory => { globalObject.throwOutOfMemory() catch {}; @@ -28,7 +28,7 @@ pub export fn BakeResponseClass__constructForSSR(globalObject: *jsc.JSGlobalObje }); } -pub fn constructor(globalThis: *jsc.JSGlobalObject, callframe: *jsc.CallFrame, bake_ssr_has_jsx: *c_int) bun.JSError!*Response { +pub fn constructor(globalThis: *jsc.JSGlobalObject, callframe: *jsc.CallFrame, bake_ssr_has_jsx: *c_int, js_this: jsc.JSValue) bun.JSError!*Response { var arguments = callframe.argumentsAsArray(2); // Allow `return new Response( ... , { ... }` @@ -44,7 +44,7 @@ pub fn constructor(globalThis: *jsc.JSGlobalObject, callframe: *jsc.CallFrame, b } } - return Response.constructor(globalThis, callframe); + return Response.constructor(globalThis, callframe, js_this); } pub export fn BakeResponseClass__constructRedirect(globalObject: *jsc.JSGlobalObject, callFrame: *jsc.CallFrame) callconv(jsc.conv) jsc.JSValue { diff --git a/src/bun.js/webcore/Blob.zig b/src/bun.js/webcore/Blob.zig index d8a829402b33fc..5c8ef310ae64a9 100644 --- a/src/bun.js/webcore/Blob.zig +++ b/src/bun.js/webcore/Blob.zig @@ -1356,13 +1356,14 @@ pub fn writeFileInternal(globalThis: *jsc.JSGlobalObject, path_or_blob_: *PathOr _ = bodyValue.use(); return jsc.JSPromise.dangerouslyCreateRejectedPromiseValueWithoutNotifyingVM(globalThis, err_ref.toJS(globalThis)); }, - .Locked => |*locked| { + .Locked => { if (destination_blob.isS3()) { const s3 = &destination_blob.store.?.data.s3; var aws_options = try s3.getCredentialsWithOptions(options.extra_options, globalThis); defer aws_options.deinit(); _ = try bodyValue.toReadableStream(globalThis); - if (locked.readable.get(globalThis)) |readable| { + + if (response.getBodyReadableStream(globalThis) orelse bodyValue.Locked.readable.get(globalThis)) |readable| { if (readable.isDisturbed(globalThis)) { destination_blob.detach(); return globalThis.throwInvalidArguments("ReadableStream has already been used", .{}); @@ -1401,7 +1402,8 @@ pub fn writeFileInternal(globalThis: *jsc.JSGlobalObject, path_or_blob_: *PathOr } if (data.as(Request)) |request| { - switch (request.body.value) { + const bodyValue = request.getBodyValue(); + switch (bodyValue.*) { .WTFStringImpl, .InternalBlob, .Used, @@ -1409,11 +1411,11 @@ pub fn writeFileInternal(globalThis: *jsc.JSGlobalObject, path_or_blob_: *PathOr .Blob, .Null, => { - break :brk request.body.value.use(); + break :brk bodyValue.use(); }, .Error => |*err_ref| { destination_blob.detach(); - _ = request.body.value.use(); + _ = bodyValue.use(); return jsc.JSPromise.dangerouslyCreateRejectedPromiseValueWithoutNotifyingVM(globalThis, err_ref.toJS(globalThis)); }, .Locked => |locked| { @@ -1421,8 +1423,8 @@ pub fn writeFileInternal(globalThis: *jsc.JSGlobalObject, path_or_blob_: *PathOr const s3 = &destination_blob.store.?.data.s3; var aws_options = try s3.getCredentialsWithOptions(options.extra_options, globalThis); defer aws_options.deinit(); - _ = try request.body.value.toReadableStream(globalThis); - if (locked.readable.get(globalThis)) |readable| { + _ = try bodyValue.toReadableStream(globalThis); + if (request.getBodyReadableStream(globalThis) orelse locked.readable.get(globalThis)) |readable| { if (readable.isDisturbed(globalThis)) { destination_blob.detach(); return globalThis.throwInvalidArguments("ReadableStream has already been used", .{}); @@ -1453,8 +1455,8 @@ pub fn writeFileInternal(globalThis: *jsc.JSGlobalObject, path_or_blob_: *PathOr .mkdirp_if_not_exists = options.mkdirp_if_not_exists orelse true, }); - request.body.value.Locked.task = task; - request.body.value.Locked.onReceiveValue = WriteFileWaitFromLockedValueTask.thenWrap; + bodyValue.Locked.task = task; + bodyValue.Locked.onReceiveValue = WriteFileWaitFromLockedValueTask.thenWrap; return task.promise.value(); }, diff --git a/src/bun.js/webcore/Body.zig b/src/bun.js/webcore/Body.zig index 04dbf20f0f7c10..89da0374cb6851 100644 --- a/src/bun.js/webcore/Body.zig +++ b/src/bun.js/webcore/Body.zig @@ -22,6 +22,12 @@ pub fn clone(this: *Body, globalThis: *JSGlobalObject) bun.JSError!Body { }; } +pub fn cloneWithReadableStream(this: *Body, globalThis: *JSGlobalObject, readable: ?*jsc.WebCore.ReadableStream) bun.JSError!Body { + return Body{ + .value = try this.value.cloneWithReadableStream(globalThis, readable), + }; +} + pub fn writeFormat(this: *Body, comptime Formatter: type, formatter: *Formatter, writer: anytype, comptime enable_ansi_colors: bool) !void { const Writer = @TypeOf(writer); @@ -157,9 +163,9 @@ pub const PendingValue = struct { return null; } - pub fn setPromise(value: *PendingValue, globalThis: *jsc.JSGlobalObject, action: Action) JSValue { + pub fn setPromise(value: *PendingValue, globalThis: *jsc.JSGlobalObject, action: Action, owned_readable: ?jsc.WebCore.ReadableStream) JSValue { value.action = action; - if (value.readable.get(globalThis)) |readable| { + if (owned_readable orelse value.readable.get(globalThis)) |readable| { switch (action) { .getFormData, .getText, .getJSON, .getBlob, .getArrayBuffer, .getBytes => { const promise = switch (action) { @@ -964,9 +970,25 @@ pub const Value = union(Tag) { } } - pub fn tee(this: *Value, globalThis: *jsc.JSGlobalObject) bun.JSError!Value { + pub fn tee(this: *Value, globalThis: *jsc.JSGlobalObject, owned_readable: ?*jsc.WebCore.ReadableStream) bun.JSError!Value { var locked = &this.Locked; + if (owned_readable) |readable| { + if (readable.isDisturbed(globalThis)) { + return Value{ .Used = {} }; + } + if (try readable.tee(globalThis)) |new_readable| { + // we current readable to be a strong reference when cloning and we will return the second one in the result + // this will be checked and downgraded to a write barrier if needed + this.Locked.readable = jsc.WebCore.ReadableStream.Strong.init(new_readable[0], globalThis); + return Value{ + .Locked = .{ + .readable = jsc.WebCore.ReadableStream.Strong.init(new_readable[1], globalThis), + .global = globalThis, + }, + }; + } + } if (locked.readable.isDisturbed(globalThis)) { return Value{ .Used = {} }; } @@ -979,6 +1001,7 @@ pub const Value = union(Tag) { }, }; } + if (locked.promise != null or locked.action != .none or locked.readable.has()) { return Value{ .Used = {} }; } @@ -1032,10 +1055,14 @@ pub const Value = union(Tag) { } pub fn clone(this: *Value, globalThis: *jsc.JSGlobalObject) bun.JSError!Value { + return this.cloneWithReadableStream(globalThis, null); + } + + pub fn cloneWithReadableStream(this: *Value, globalThis: *jsc.JSGlobalObject, readable: ?*jsc.WebCore.ReadableStream) bun.JSError!Value { this.toBlobIfPossible(); if (this.* == .Locked) { - return this.tee(globalThis); + return this.tee(globalThis, readable); } if (this.* == .InternalBlob) { @@ -1049,10 +1076,6 @@ pub const Value = union(Tag) { }; } - // if (this.* == .InlineBlob) { - // return this.*; - // } - if (this.* == .Blob) { return Value{ .Blob = this.Blob.dupe() }; } @@ -1086,6 +1109,8 @@ pub fn extract( pub fn Mixin(comptime Type: type) type { return struct { + const log = Output.scoped(.BodyMixin, .visible); + pub fn getText(this: *Type, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!jsc.JSValue { var value: *Body.Value = this.getBodyValue(); if (value.* == .Used) { @@ -1093,11 +1118,21 @@ pub fn Mixin(comptime Type: type) type { } if (value.* == .Locked) { - if (value.Locked.action != .none or value.Locked.isDisturbed(Type, globalObject, callframe.this())) { - return handleBodyAlreadyUsed(globalObject); + if (@hasDecl(Type, "getBodyReadableStream")) { + if (this.getBodyReadableStream(globalObject)) |readable| { + if (readable.isDisturbed(globalObject)) { + return handleBodyAlreadyUsed(globalObject); + } + return value.Locked.setPromise(globalObject, .{ .getText = {} }, readable); + } } + if (value.* == .Locked) { + if (value.Locked.action != .none or value.Locked.isDisturbed(Type, globalObject, callframe.this())) { + return handleBodyAlreadyUsed(globalObject); + } - return value.Locked.setPromise(globalObject, .{ .getText = {} }); + return value.Locked.setPromise(globalObject, .{ .getText = {} }, null); + } } var blob = value.useAsAnyBlobAllowNonUTF8String(); @@ -1110,7 +1145,13 @@ pub fn Mixin(comptime Type: type) type { if (body.* == .Used) { return jsc.WebCore.ReadableStream.used(globalThis); } - + if (@hasDecl(Type, "getBodyReadableStream")) { + if (body.* == .Locked) { + if (this.getBodyReadableStream(globalThis)) |readable| { + return readable.value; + } + } + } return body.toReadableStream(globalThis); } @@ -1122,6 +1163,11 @@ pub fn Mixin(comptime Type: type) type { if (pending.action != .none) { break :brk true; } + if (@hasDecl(Type, "getBodyReadableStream")) { + if (this.getBodyReadableStream(globalObject)) |readable| { + break :brk readable.isDisturbed(globalObject); + } + } if (pending.readable.get(globalObject)) |*stream| { break :brk stream.isDisturbed(globalObject); @@ -1149,13 +1195,27 @@ pub fn Mixin(comptime Type: type) type { } if (value.* == .Locked) { - if (value.Locked.action != .none or value.Locked.isDisturbed(Type, globalObject, callframe.this())) { - return handleBodyAlreadyUsed(globalObject); - } + if (@hasDecl(Type, "getBodyReadableStream")) { + if (this.getBodyReadableStream(globalObject)) |readable| { + if (readable.isDisturbed(globalObject)) { + return handleBodyAlreadyUsed(globalObject); + } - value.toBlobIfPossible(); + value.toBlobIfPossible(); + if (value.* == .Locked) { + return value.Locked.setPromise(globalObject, .{ .getJSON = {} }, readable); + } + } + } if (value.* == .Locked) { - return value.Locked.setPromise(globalObject, .{ .getJSON = {} }); + if (value.Locked.action != .none or value.Locked.isDisturbed(Type, globalObject, callframe.this())) { + return handleBodyAlreadyUsed(globalObject); + } + + value.toBlobIfPossible(); + if (value.* == .Locked) { + return value.Locked.setPromise(globalObject, .{ .getJSON = {} }, null); + } } } @@ -1169,6 +1229,7 @@ pub fn Mixin(comptime Type: type) type { } pub fn getArrayBuffer(this: *Type, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!jsc.JSValue { + log("getArrayBuffer", .{}); var value: *Body.Value = this.getBodyValue(); if (value.* == .Used) { @@ -1176,13 +1237,26 @@ pub fn Mixin(comptime Type: type) type { } if (value.* == .Locked) { - if (value.Locked.action != .none or value.Locked.isDisturbed(Type, globalObject, callframe.this())) { - return handleBodyAlreadyUsed(globalObject); + if (@hasDecl(Type, "getBodyReadableStream")) { + if (this.getBodyReadableStream(globalObject)) |readable| { + if (readable.isDisturbed(globalObject)) { + return handleBodyAlreadyUsed(globalObject); + } + value.toBlobIfPossible(); + if (value.* == .Locked) { + return value.Locked.setPromise(globalObject, .{ .getArrayBuffer = {} }, readable); + } + } } - value.toBlobIfPossible(); - if (value.* == .Locked) { - return value.Locked.setPromise(globalObject, .{ .getArrayBuffer = {} }); + if (value.Locked.action != .none or value.Locked.isDisturbed(Type, globalObject, callframe.this())) { + return handleBodyAlreadyUsed(globalObject); + } + value.toBlobIfPossible(); + + if (value.* == .Locked) { + return value.Locked.setPromise(globalObject, .{ .getArrayBuffer = {} }, null); + } } } @@ -1200,12 +1274,25 @@ pub fn Mixin(comptime Type: type) type { } if (value.* == .Locked) { - if (value.Locked.action != .none or value.Locked.isDisturbed(Type, globalObject, callframe.this())) { - return handleBodyAlreadyUsed(globalObject); + if (@hasDecl(Type, "getBodyReadableStream")) { + if (this.getBodyReadableStream(globalObject)) |readable| { + if (readable.isDisturbed(globalObject)) { + return handleBodyAlreadyUsed(globalObject); + } + value.toBlobIfPossible(); + if (value.* == .Locked) { + return value.Locked.setPromise(globalObject, .{ .getBytes = {} }, readable); + } + } } - value.toBlobIfPossible(); if (value.* == .Locked) { - return value.Locked.setPromise(globalObject, .{ .getBytes = {} }); + if (value.Locked.action != .none or value.Locked.isDisturbed(Type, globalObject, callframe.this())) { + return handleBodyAlreadyUsed(globalObject); + } + value.toBlobIfPossible(); + if (value.* == .Locked) { + return value.Locked.setPromise(globalObject, .{ .getBytes = {} }, null); + } } } @@ -1222,10 +1309,20 @@ pub fn Mixin(comptime Type: type) type { } if (value.* == .Locked) { - if (value.Locked.action != .none or value.Locked.isDisturbed(Type, globalObject, callframe.this())) { - return handleBodyAlreadyUsed(globalObject); + if (@hasDecl(Type, "getBodyReadableStream")) { + if (this.getBodyReadableStream(globalObject)) |readable| { + if (readable.isDisturbed(globalObject)) { + return handleBodyAlreadyUsed(globalObject); + } + value.toBlobIfPossible(); + } + } + if (value.* == .Locked) { + if (value.Locked.action != .none or value.Locked.isDisturbed(Type, globalObject, callframe.this())) { + return handleBodyAlreadyUsed(globalObject); + } + value.toBlobIfPossible(); } - value.toBlobIfPossible(); } var encoder = (try this.getFormDataEncoding()) orelse { @@ -1234,7 +1331,11 @@ pub fn Mixin(comptime Type: type) type { }; if (value.* == .Locked) { - return value.Locked.setPromise(globalObject, .{ .getFormData = encoder }); + if (@hasDecl(Type, "getBodyReadableStream")) { + return value.Locked.setPromise(globalObject, .{ .getFormData = encoder }, this.getBodyReadableStream(globalObject)); + } else { + return value.Locked.setPromise(globalObject, .{ .getFormData = encoder }, null); + } } var blob: AnyBlob = value.useAsAnyBlob(); @@ -1273,17 +1374,33 @@ pub fn Mixin(comptime Type: type) type { } if (value.* == .Locked) { - if (value.Locked.action != .none or - ((this_value != .zero and value.Locked.isDisturbed(Type, globalObject, this_value)) or - (this_value == .zero and value.Locked.readable.isDisturbed(globalObject)))) - { - return handleBodyAlreadyUsed(globalObject); + if (@hasDecl(Type, "getBodyReadableStream")) { + if (this.getBodyReadableStream(globalObject)) |readable| { + if (value.Locked.action != .none or + ((this_value != .zero and readable.isDisturbed(globalObject)) or + (this_value == .zero and readable.isDisturbed(globalObject)))) + { + return handleBodyAlreadyUsed(globalObject); + } + value.toBlobIfPossible(); + if (value.* == .Locked) { + return value.Locked.setPromise(globalObject, .{ .getBlob = {} }, readable); + } + } } + if (value.* == .Locked) { + if (value.Locked.action != .none or + ((this_value != .zero and value.Locked.isDisturbed(Type, globalObject, this_value)) or + (this_value == .zero and value.Locked.readable.isDisturbed(globalObject)))) + { + return handleBodyAlreadyUsed(globalObject); + } - value.toBlobIfPossible(); + value.toBlobIfPossible(); - if (value.* == .Locked) { - return value.Locked.setPromise(globalObject, .{ .getBlob = {} }); + if (value.* == .Locked) { + return value.Locked.setPromise(globalObject, .{ .getBlob = {} }, null); + } } } @@ -1372,7 +1489,7 @@ pub const ValueBufferer = struct { return this; } - pub fn run(sink: *@This(), value: *jsc.WebCore.Body.Value) !void { + pub fn run(sink: *@This(), value: *jsc.WebCore.Body.Value, owned_readable_stream: ?jsc.WebCore.ReadableStream) !void { value.toBlobIfPossible(); switch (value.*) { @@ -1410,7 +1527,7 @@ pub const ValueBufferer = struct { return; }, .Locked => { - try sink.bufferLockedBodyValue(value); + try sink.bufferLockedBodyValue(value, owned_readable_stream); }, } } @@ -1563,12 +1680,23 @@ pub const ValueBufferer = struct { return error.PipeFailed; } - fn bufferLockedBodyValue(sink: *@This(), value: *jsc.WebCore.Body.Value) !void { + fn bufferLockedBodyValue(sink: *@This(), value: *jsc.WebCore.Body.Value, owned_readable_stream: ?jsc.WebCore.ReadableStream) !void { assert(value.* == .Locked); const locked = &value.Locked; - if (locked.readable.get(sink.global)) |stream| { - // keep the stream alive until we're done with it - sink.readable_stream_ref = locked.readable; + const readable_stream = brk: { + if (locked.readable.get(sink.global)) |stream| { + // keep the stream alive until we're done with it + sink.readable_stream_ref = locked.readable; + break :brk stream; + } + if (owned_readable_stream) |stream| { + // response owns the stream, so we hold a strong reference to it + sink.readable_stream_ref = jsc.WebCore.ReadableStream.Strong.init(stream, sink.global); + break :brk stream; + } + break :brk null; + }; + if (readable_stream) |stream| { value.* = .{ .Used = {} }; if (stream.isLocked(sink.global)) { @@ -1616,7 +1744,7 @@ pub const ValueBufferer = struct { const readable = try value.toReadableStream(sink.global); readable.ensureStillAlive(); readable.protect(); - return try sink.bufferLockedBodyValue(value); + return try sink.bufferLockedBodyValue(value, null); } // is safe to wait it buffer locked.task = @ptrCast(sink); diff --git a/src/bun.js/webcore/Request.zig b/src/bun.js/webcore/Request.zig index 98e3886fef3773..140a46642e9f3d 100644 --- a/src/bun.js/webcore/Request.zig +++ b/src/bun.js/webcore/Request.zig @@ -3,10 +3,11 @@ const Request = @This(); url: bun.String = bun.String.empty, -// NOTE(@cirospaciari): renamed to _headers to avoid direct manipulation, use getFetchHeaders, setFetchHeaders, ensureFetchHeaders and hasFetchHeaders instead -_headers: ?*FetchHeaders = null, + +#headers: ?*FetchHeaders = null, signal: ?*AbortSignal = null, -body: *Body.Value.HiveRef, +#body: *Body.Value.HiveRef, +#js_ref: jsc.JSRef = .empty(), method: Method = Method.GET, redirect: FetchRedirect = .follow, request_context: jsc.API.AnyRequestContext = jsc.API.AnyRequestContext.Null, @@ -36,7 +37,7 @@ pub const getBlobWithoutCallFrame = RequestMixin.getBlobWithoutCallFrame; pub const WeakRef = bun.ptr.WeakPtr(Request, "weak_ptr_data"); pub fn memoryCost(this: *const Request) usize { - return @sizeOf(Request) + this.request_context.memoryCost() + this.url.byteSlice().len + this.body.value.memoryCost(); + return @sizeOf(Request) + this.request_context.memoryCost() + this.url.byteSlice().len + this.#body.value.memoryCost(); } pub export fn Request__setCookiesOnRequestContext(this: *Request, cookieMap: ?*jsc.WebCore.CookieMap) void { @@ -110,6 +111,23 @@ pub const InternalJSEventCallback = struct { }; pub fn init( + method: Method, + request_context: jsc.API.AnyRequestContext, + https: bool, + signal: ?*AbortSignal, + body: *Body.Value.HiveRef, +) Request { + return Request{ + .request_context = request_context, + .method = method, + .https = https, + .signal = signal, + .#body = body, + }; +} + +/// TODO: do we need this? +pub fn init2( url: bun.String, headers: ?*FetchHeaders, body: *Body.Value.HiveRef, @@ -117,8 +135,8 @@ pub fn init( ) Request { return Request{ .url = url, - ._headers = headers, - .body = body, + .#headers = headers, + .#body = body, .method = method, }; } @@ -132,15 +150,15 @@ pub fn getContentType( } } - if (this._headers) |headers| { + if (this.#headers) |headers| { if (headers.fastGet(.ContentType)) |value| { return value.toSlice(bun.default_allocator); } } - if (this.body.value == .Blob) { - if (this.body.value.Blob.content_type.len > 0) - return ZigString.Slice.fromUTF8NeverFree(this.body.value.Blob.content_type); + if (this.#body.value == .Blob) { + if (this.#body.value.Blob.content_type.len > 0) + return ZigString.Slice.fromUTF8NeverFree(this.#body.value.Blob.content_type); } return null; @@ -166,15 +184,45 @@ pub fn getRemoteSocketInfo(this: *Request, globalObject: *jsc.JSGlobalObject) ?j } pub fn calculateEstimatedByteSize(this: *Request) void { - this.reported_estimated_size = this.body.value.estimatedSize() + this.sizeOfURL() + @sizeOf(Request); + this.reported_estimated_size = this.#body.value.estimatedSize() + this.sizeOfURL() + @sizeOf(Request); } pub export fn Bun__JSRequest__calculateEstimatedByteSize(this: *Request) void { this.calculateEstimatedByteSize(); } +pub inline fn getBodyReadableStream( + this: *Request, + globalObject: *JSGlobalObject, +) ?jsc.WebCore.ReadableStream { + if (this.#js_ref.tryGet()) |js_ref| { + if (js.gc.stream.get(js_ref)) |stream| { + // JS is always source of truth for the stream + return jsc.WebCore.ReadableStream.fromJS(stream, globalObject) catch |err| { + _ = globalObject.takeException(err); + return null; + }; + } + } + if (this.#body.value == .Locked) { + return this.#body.value.Locked.readable.get(globalObject); + } + return null; +} +pub inline fn detachReadableStream(this: *Request, globalObject: *jsc.JSGlobalObject) void { + if (this.#js_ref.tryGet()) |js_ref| { + js.gc.stream.clear(js_ref, globalObject); + } + if (this.#body.value == .Locked) { + var old = this.#body.value.Locked.readable; + old.deinit(); + this.#body.value.Locked.readable = .{}; + } +} + pub fn toJS(this: *Request, globalObject: *JSGlobalObject) JSValue { this.calculateEstimatedByteSize(); + this.checkBodyStreamRef(globalObject); return js.toJSUnchecked(globalObject, this); } @@ -199,7 +247,7 @@ pub fn writeFormat(this: *Request, this_value: JSValue, comptime Formatter: type .zero => "Request", else => "BunRequest", }; - try writer.print("{s} ({}) {{\n", .{ class_label, bun.fmt.size(this.body.value.size(), .{}) }); + try writer.print("{s} ({}) {{\n", .{ class_label, bun.fmt.size(this.#body.value.size(), .{}) }); { formatter.indent += 1; defer formatter.indent -|= 1; @@ -231,22 +279,22 @@ pub fn writeFormat(this: *Request, this_value: JSValue, comptime Formatter: type try writer.writeAll(comptime Output.prettyFmt("headers: ", enable_ansi_colors)); try formatter.printAs(.Private, Writer, writer, try this.getHeaders(formatter.globalThis), .DOMWrapper, enable_ansi_colors); - if (this.body.value == .Blob) { + if (this.#body.value == .Blob) { try writer.writeAll("\n"); try formatter.writeIndent(Writer, writer); - try this.body.value.Blob.writeFormat(Formatter, formatter, writer, enable_ansi_colors); - } else if (this.body.value == .InternalBlob or this.body.value == .WTFStringImpl) { + try this.#body.value.Blob.writeFormat(Formatter, formatter, writer, enable_ansi_colors); + } else if (this.#body.value == .InternalBlob or this.#body.value == .WTFStringImpl) { try writer.writeAll("\n"); try formatter.writeIndent(Writer, writer); - const size = this.body.value.size(); + const size = this.#body.value.size(); if (size == 0) { var empty = Blob.initEmpty(undefined); try empty.writeFormat(Formatter, formatter, writer, enable_ansi_colors); } else { try Blob.writeFormatForSize(false, size, writer, enable_ansi_colors); } - } else if (this.body.value == .Locked) { - if (this.body.value.Locked.readable.get(this.body.value.Locked.global)) |stream| { + } else if (this.#body.value == .Locked) { + if (this.getBodyReadableStream(formatter.globalThis)) |stream| { try writer.writeAll("\n"); try formatter.writeIndent(Writer, writer); try formatter.printAs(.Object, Writer, writer, stream.value, stream.value.jsType(), enable_ansi_colors); @@ -259,13 +307,13 @@ pub fn writeFormat(this: *Request, this_value: JSValue, comptime Formatter: type } pub fn mimeType(this: *const Request) string { - if (this._headers) |headers| { + if (this.#headers) |headers| { if (try headers.fastGet(.ContentType)) |content_type| { return content_type.slice(); } } - switch (this.body.value) { + switch (this.#body.value) { .Blob => |blob| { if (blob.content_type.len > 0) { return blob.content_type; @@ -273,9 +321,9 @@ pub fn mimeType(this: *const Request) string { return MimeType.other.value; }, - .InternalBlob => return this.body.value.InternalBlob.contentType(), + .InternalBlob => return this.#body.value.InternalBlob.contentType(), .WTFStringImpl => return MimeType.text.value, - // .InlineBlob => return this.body.value.InlineBlob.contentType(), + // .InlineBlob => return this.#body.value.InlineBlob.contentType(), .Null, .Error, .Used, .Locked, .Empty => return MimeType.other.value, } } @@ -336,9 +384,9 @@ pub fn getMode( } pub fn finalizeWithoutDeinit(this: *Request) void { - if (this._headers) |headers| { + if (this.#headers) |headers| { headers.deref(); - this._headers = null; + this.#headers = null; } this.url.deref(); @@ -352,8 +400,9 @@ pub fn finalizeWithoutDeinit(this: *Request) void { } pub fn finalize(this: *Request) void { + this.#js_ref.finalize(); this.finalizeWithoutDeinit(); - _ = this.body.unref(); + _ = this.#body.unref(); if (this.weak_ptr_data.onFinalize()) { bun.destroy(this); } @@ -371,7 +420,7 @@ pub fn getReferrer( this: *Request, globalObject: *jsc.JSGlobalObject, ) jsc.JSValue { - if (this._headers) |headers_ref| { + if (this.#headers) |headers_ref| { if (headers_ref.get("referrer", globalObject)) |referrer| { return ZigString.init(referrer).toJS(globalObject); } @@ -521,20 +570,36 @@ const Fields = enum { // timeout, url, }; - -pub fn constructInto(globalThis: *jsc.JSGlobalObject, arguments: []const jsc.JSValue) bun.JSError!Request { +fn checkBodyStreamRef(this: *Request, globalObject: *JSGlobalObject) void { + if (this.#js_ref.tryGet()) |js_value| { + if (this.#body.value == .Locked) { + if (this.#body.value.Locked.readable.get(globalObject)) |stream| { + // we dont hold a strong reference to the stream we will guard it in js.gc.stream + // so we avoid cycled references + // anyone using Response should not use Locked.readable directly because it dont always owns it + // the owner will be always the Response object it self + stream.value.ensureStillAlive(); + js.gc.stream.set(js_value, globalObject, stream.value); + this.#body.value.Locked.readable.deinit(); + this.#body.value.Locked.readable = .{}; + } + } + } +} +pub fn constructInto(globalThis: *jsc.JSGlobalObject, arguments: []const jsc.JSValue, this_value: jsc.JSValue) bun.JSError!Request { var success = false; const vm = globalThis.bunVM(); const body = try vm.initRequestBodyValue(.{ .Null = {} }); var req = Request{ - .body = body, + .#body = body, + .#js_ref = .initWeak(this_value), }; defer { if (!success) { req.finalizeWithoutDeinit(); - _ = req.body.unref(); + _ = req.#body.unref(); } - if (req.body != body) { + if (req.#body != body) { _ = body.unref(); } } @@ -599,7 +664,7 @@ pub fn constructInto(globalThis: *jsc.JSGlobalObject, arguments: []const jsc.JSV if (!fields.contains(.headers)) { if (try request.cloneHeaders(globalThis)) |headers| { - req._headers = headers; + req.#headers = headers; fields.insert(.headers); } @@ -607,10 +672,10 @@ pub fn constructInto(globalThis: *jsc.JSGlobalObject, arguments: []const jsc.JSV } if (!fields.contains(.body)) { - switch (request.body.value) { + switch (request.#body.value) { .Null, .Empty, .Used => {}, else => { - req.body.value = try request.body.value.clone(globalThis); + req.#body.value = try request.#body.value.clone(globalThis); fields.insert(.body); }, } @@ -625,7 +690,7 @@ pub fn constructInto(globalThis: *jsc.JSGlobalObject, arguments: []const jsc.JSV if (!fields.contains(.headers)) { if (response.getInitHeaders()) |headers| { - req._headers = try headers.cloneThis(globalThis); + req.#headers = try headers.cloneThis(globalThis); fields.insert(.headers); } } @@ -643,7 +708,7 @@ pub fn constructInto(globalThis: *jsc.JSGlobalObject, arguments: []const jsc.JSV switch (bodyValue.*) { .Null, .Empty, .Used => {}, else => { - req.body.value = try bodyValue.clone(globalThis); + req.#body.value = try bodyValue.clone(globalThis); fields.insert(.body); }, } @@ -656,7 +721,7 @@ pub fn constructInto(globalThis: *jsc.JSGlobalObject, arguments: []const jsc.JSV if (!fields.contains(.body)) { if (try value.fastGet(globalThis, .body)) |body_| { fields.insert(.body); - req.body.value = try Body.Value.fromJS(globalThis, body_); + req.#body.value = try Body.Value.fromJS(globalThis, body_); } if (globalThis.hasException()) return error.JSError; @@ -705,7 +770,7 @@ pub fn constructInto(globalThis: *jsc.JSGlobalObject, arguments: []const jsc.JSV if (!explicit_check or (explicit_check and (try value.fastGet(globalThis, .headers)) != null)) { if (response_init.headers) |headers| { if (!fields.contains(.headers)) { - req._headers = headers; + req.#headers = headers; fields.insert(.headers); } else { headers.deref(); @@ -763,32 +828,33 @@ pub fn constructInto(globalThis: *jsc.JSGlobalObject, arguments: []const jsc.JSV req.url = href; - if (req.body.value == .Blob and - req._headers != null and - req.body.value.Blob.content_type.len > 0 and - !req._headers.?.fastHas(.ContentType)) + if (req.#body.value == .Blob and + req.#headers != null and + req.#body.value.Blob.content_type.len > 0 and + !req.#headers.?.fastHas(.ContentType)) { - try req._headers.?.put(.ContentType, req.body.value.Blob.content_type, globalThis); + try req.#headers.?.put(.ContentType, req.#body.value.Blob.content_type, globalThis); } req.calculateEstimatedByteSize(); + req.checkBodyStreamRef(globalThis); success = true; return req; } -pub fn constructor(globalThis: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!*Request { +pub fn constructor(globalThis: *jsc.JSGlobalObject, callframe: *jsc.CallFrame, this_value: jsc.JSValue) bun.JSError!*Request { const arguments_ = callframe.arguments_old(2); const arguments = arguments_.ptr[0..arguments_.len]; - const request = try constructInto(globalThis, arguments); + const request = try constructInto(globalThis, arguments, this_value); return Request.new(request); } pub fn getBodyValue( this: *Request, ) *Body.Value { - return &this.body.value; + return &this.#body.value; } pub fn doClone( @@ -801,26 +867,26 @@ pub fn doClone( const js_wrapper = cloned.toJS(globalThis); if (js_wrapper != .zero) { - if (cloned.body.value == .Locked) { - if (cloned.body.value.Locked.readable.get(globalThis)) |readable| { + if (cloned.#body.value == .Locked) { + if (cloned.#body.value.Locked.readable.get(globalThis)) |readable| { // If we are teed, then we need to update the cached .body // value to point to the new readable stream // We must do this on both the original and cloned request // but especially the original request since it will have a stale .body value now. js.bodySetCached(js_wrapper, globalThis, readable.value); - if (this.body.value.Locked.readable.get(globalThis)) |other_readable| { + if (this.#body.value.Locked.readable.get(globalThis)) |other_readable| { js.bodySetCached(this_value, globalThis, other_readable.value); } } } } - + this.checkBodyStreamRef(globalThis); return js_wrapper; } // Returns if the request has headers already cached/set. pub fn hasFetchHeaders(this: *Request) bool { - return this._headers != null; + return this.#headers != null; } /// Sets the headers of the request. This will take ownership of the headers. @@ -829,11 +895,11 @@ pub fn setFetchHeaders( this: *Request, headers: ?*FetchHeaders, ) void { - if (this._headers) |old_headers| { + if (this.#headers) |old_headers| { old_headers.deref(); } - this._headers = headers; + this.#headers = headers; } /// Returns the headers of the request. If the headers are not already cached, it will create a new FetchHeaders object. @@ -843,18 +909,18 @@ pub fn ensureFetchHeaders( this: *Request, globalThis: *jsc.JSGlobalObject, ) bun.JSError!*FetchHeaders { - if (this._headers) |headers| { + if (this.#headers) |headers| { // headers is already set return headers; } if (this.request_context.getRequest()) |req| { // we have a request context, so we can get the headers from it - this._headers = FetchHeaders.createFromUWS(req); + this.#headers = FetchHeaders.createFromUWS(req); } else { // we don't have a request context, so we need to create an empty headers object - this._headers = FetchHeaders.createEmpty(); - const content_type = switch (this.body.value) { + this.#headers = FetchHeaders.createEmpty(); + const content_type = switch (this.#body.value) { .Blob => |blob| blob.content_type, .Locked => |locked| if (locked.readable.get(globalThis)) |*readable| switch (readable.ptr) { .Blob => |blob| blob.content_type, @@ -865,25 +931,25 @@ pub fn ensureFetchHeaders( if (content_type) |content_type_| { if (content_type_.len > 0) { - try this._headers.?.put(.ContentType, content_type_, globalThis); + try this.#headers.?.put(.ContentType, content_type_, globalThis); } } } - return this._headers.?; + return this.#headers.?; } pub fn getFetchHeadersUnlessEmpty( this: *Request, ) ?*FetchHeaders { - if (this._headers == null) { + if (this.#headers == null) { if (this.request_context.getRequest()) |req| { // we have a request context, so we can get the headers from it - this._headers = FetchHeaders.createFromUWS(req); + this.#headers = FetchHeaders.createFromUWS(req); } } - const headers = this._headers orelse return null; + const headers = this.#headers orelse return null; if (headers.isEmpty()) { return null; } @@ -894,7 +960,7 @@ pub fn getFetchHeadersUnlessEmpty( pub fn getFetchHeaders( this: *Request, ) ?*FetchHeaders { - return this._headers; + return this.#headers; } /// This should only be called by the JS code. use getFetchHeaders to get the current headers or ensureFetchHeaders to get the headers and create them if they don't exist. @@ -906,13 +972,13 @@ pub fn getHeaders( } pub fn cloneHeaders(this: *Request, globalThis: *JSGlobalObject) bun.JSError!?*FetchHeaders { - if (this._headers == null) { + if (this.#headers == null) { if (this.request_context.getRequest()) |uws_req| { - this._headers = FetchHeaders.createFromUWS(uws_req); + this.#headers = FetchHeaders.createFromUWS(uws_req); } } - if (this._headers) |head| { + if (this.#headers) |head| { if (head.isEmpty()) { return null; } @@ -933,20 +999,31 @@ pub fn cloneInto( _ = allocator; this.ensureURL() catch {}; const vm = globalThis.bunVM(); - var body_ = try this.body.value.clone(globalThis); + var body_ = brk: { + if (this.#js_ref.tryGet()) |js_ref| { + if (js.gc.stream.get(js_ref)) |stream| { + var readable = try jsc.WebCore.ReadableStream.fromJS(stream, globalThis); + if (readable != null) { + break :brk try this.#body.value.cloneWithReadableStream(globalThis, &readable.?); + } + } + } + + break :brk try this.#body.value.clone(globalThis); + }; errdefer body_.deinit(); const body = try vm.initRequestBodyValue(body_); const url = if (preserve_url) req.url else this.url.dupeRef(); errdefer if (!preserve_url) url.deref(); - const _headers = try this.cloneHeaders(globalThis); - errdefer if (_headers) |_h| _h.deref(); + const headers = try this.cloneHeaders(globalThis); + errdefer if (headers) |_h| _h.deref(); req.* = Request{ - .body = body, + .#body = body, .url = url, .method = this.method, .redirect = this.redirect, - ._headers = _headers, + .#headers = headers, }; if (this.signal) |signal| { diff --git a/src/bun.js/webcore/Response.zig b/src/bun.js/webcore/Response.zig index 470396b504792e..78232d9348dbfc 100644 --- a/src/bun.js/webcore/Response.zig +++ b/src/bun.js/webcore/Response.zig @@ -17,6 +17,7 @@ pub const fromJSDirect = js.fromJSDirect; /// We increment this count in fetch so if JS Response is discarted we can resolve the Body /// In the server we use a flag response_protected to protect/unprotect the response ref_count: u32 = 1, +#js_ref: jsc.JSRef = .empty(), // We must report a consistent value for this #reported_estimated_size: usize = 0, @@ -102,17 +103,76 @@ pub fn calculateEstimatedByteSize(this: *Response) void { @sizeOf(Response); } +fn checkBodyStreamRef(this: *Response, globalObject: *JSGlobalObject) void { + if (this.#js_ref.tryGet()) |js_value| { + if (this.#body.value == .Locked) { + if (this.#body.value.Locked.readable.get(globalObject)) |stream| { + // we dont hold a strong reference to the stream we will guard it in js.gc.stream + // so we avoid cycled references + // anyone using Response should not use Locked.readable directly because it dont always owns it + // the owner will be always the Response object it self + stream.value.ensureStillAlive(); + js.gc.stream.set(js_value, globalObject, stream.value); + this.#body.value.Locked.readable.deinit(); + this.#body.value.Locked.readable = .{}; + } + } + } +} pub fn toJS(this: *Response, globalObject: *JSGlobalObject) JSValue { this.calculateEstimatedByteSize(); - return js.toJSUnchecked(globalObject, this); + const js_value = js.toJSUnchecked(globalObject, this); + this.#js_ref = .initWeak(js_value); + + this.checkBodyStreamRef(globalObject); + return js_value; } -pub fn getBodyValue( +pub inline fn getBodyValue( this: *Response, ) *Body.Value { return &this.#body.value; } +pub inline fn getBodyReadableStream( + this: *Response, + globalObject: *JSGlobalObject, +) ?jsc.WebCore.ReadableStream { + if (this.#js_ref.tryGet()) |js_ref| { + if (js.gc.stream.get(js_ref)) |stream| { + // JS is always source of truth for the stream + return jsc.WebCore.ReadableStream.fromJS(stream, globalObject) catch |err| { + _ = globalObject.takeException(err); + return null; + }; + } + } + if (this.#body.value == .Locked) { + return this.#body.value.Locked.readable.get(globalObject); + } + return null; +} +pub inline fn detachReadableStream(this: *Response, globalObject: *jsc.JSGlobalObject) void { + if (this.#js_ref.tryGet()) |js_ref| { + js.gc.stream.clear(js_ref, globalObject); + } + if (this.#body.value == .Locked) { + var old = this.#body.value.Locked.readable; + old.deinit(); + this.#body.value.Locked.readable = .{}; + } +} +pub inline fn setSizeHint(this: *Response, size_hint: Blob.SizeType) void { + if (this.#body.value == .Locked) { + this.#body.value.Locked.size_hint = size_hint; + if (this.#body.value.Locked.readable.get(this.#body.value.Locked.global)) |readable| { + if (readable.ptr == .Bytes) { + readable.ptr.Bytes.size_hint = size_hint; + } + } + } +} + pub export fn jsFunctionRequestOrResponseHasBodyValue(_: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) callconv(jsc.conv) jsc.JSValue { const arguments = callframe.arguments_old(1); const this_value = arguments.ptr[0]; @@ -123,7 +183,7 @@ pub export fn jsFunctionRequestOrResponseHasBodyValue(_: *jsc.JSGlobalObject, ca if (this_value.as(Response)) |response| { return jsc.JSValue.jsBoolean(!response.#body.value.isDefinitelyEmpty()); } else if (this_value.as(Request)) |request| { - return jsc.JSValue.jsBoolean(!request.body.value.isDefinitelyEmpty()); + return jsc.JSValue.jsBoolean(!request.getBodyValue().isDefinitelyEmpty()); } return .false; @@ -140,7 +200,7 @@ pub export fn jsFunctionGetCompleteRequestOrResponseBodyValueAsArrayBuffer(globa if (this_value.as(Response)) |response| { break :brk &response.#body.value; } else if (this_value.as(Request)) |request| { - break :brk &request.body.value; + break :brk request.getBodyValue(); } return .js_undefined; @@ -335,6 +395,8 @@ pub fn doClone( } } + this.checkBodyStreamRef(globalThis); + return js_wrapper; } @@ -346,7 +408,18 @@ pub fn cloneValue( this: *Response, globalThis: *JSGlobalObject, ) bun.JSError!Response { - var body = try this.#body.clone(globalThis); + var body = brk: { + if (this.#js_ref.tryGet()) |js_ref| { + if (js.gc.stream.get(js_ref)) |stream| { + var readable = try jsc.WebCore.ReadableStream.fromJS(stream, globalThis); + if (readable != null) { + break :brk try this.#body.cloneWithReadableStream(globalThis, &readable.?); + } + } + } + + break :brk try this.#body.clone(globalThis); + }; errdefer body.deinit(bun.default_allocator); var _init = try this.#init.clone(globalThis); errdefer _init.deinit(bun.default_allocator); @@ -394,6 +467,7 @@ pub fn unref(this: *Response) void { pub fn finalize( this: *Response, ) callconv(.C) void { + this.#js_ref.finalize(); this.unref(); } @@ -581,10 +655,12 @@ pub fn constructError( }, ); - return response.toJS(globalThis); + const js_value = response.toJS(globalThis); + response.#js_ref = .initWeak(js_value); + return js_value; } -pub fn constructor(globalThis: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!*Response { +pub fn constructor(globalThis: *jsc.JSGlobalObject, callframe: *jsc.CallFrame, js_this: jsc.JSValue) bun.JSError!*Response { var arguments = callframe.argumentsAsArray(2); if (!arguments[0].isUndefinedOrNull() and arguments[0].isObject()) { @@ -601,6 +677,7 @@ pub fn constructor(globalThis: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) b .value = .{ .Empty = {} }, }, .#url = bun.String.empty, + .#js_ref = .initWeak(js_this), }; const credentials = blob.store.?.data.s3.getCredentials(); @@ -658,6 +735,7 @@ pub fn constructor(globalThis: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) b var response = bun.new(Response, Response{ .#body = body, .#init = _init, + .#js_ref = .initWeak(js_this), }); if (response.#body.value == .Blob and @@ -669,7 +747,7 @@ pub fn constructor(globalThis: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) b } response.calculateEstimatedByteSize(); - + response.checkBodyStreamRef(globalThis); return response; } diff --git a/src/bun.js/webcore/fetch.zig b/src/bun.js/webcore/fetch.zig index 65677c00ea80bd..f497c56df394c3 100644 --- a/src/bun.js/webcore/fetch.zig +++ b/src/bun.js/webcore/fetch.zig @@ -365,6 +365,7 @@ pub const FetchTasklet = struct { const globalThis = this.global_this; // reset the buffer if we are streaming or if we are not waiting for bufferig anymore var buffer_reset = true; + log("onBodyReceived success={} has_more={}", .{ success, this.result.has_more }); defer { if (buffer_reset) { this.scheduled_response_buffer.reset(); @@ -416,6 +417,7 @@ pub const FetchTasklet = struct { } if (this.readable_stream_ref.get(globalThis)) |readable| { + log("onBodyReceived readable_stream_ref", .{}); if (readable.ptr == .Bytes) { readable.ptr.Bytes.size_hint = this.getSizeHint(); // body can be marked as used but we still need to pipe the data @@ -448,68 +450,65 @@ pub const FetchTasklet = struct { } if (this.getCurrentResponse()) |response| { - var body = response.getBodyValue(); - if (body.* == .Locked) { - if (body.Locked.readable.get(globalThis)) |readable| { - if (readable.ptr == .Bytes) { - readable.ptr.Bytes.size_hint = this.getSizeHint(); - - const scheduled_response_buffer = this.scheduled_response_buffer.list; - - const chunk = scheduled_response_buffer.items; - - if (this.result.has_more) { - readable.ptr.Bytes.onData( - .{ - .temporary = bun.ByteList.fromBorrowedSliceDangerous(chunk), - }, - bun.default_allocator, - ); - } else { - var prev = body.Locked.readable; - body.Locked.readable = .{}; - readable.value.ensureStillAlive(); - prev.deinit(); - readable.value.ensureStillAlive(); - readable.ptr.Bytes.onData( - .{ - .temporary_and_done = bun.ByteList.fromBorrowedSliceDangerous(chunk), - }, - bun.default_allocator, - ); - } - - return; + log("onBodyReceived Current Response", .{}); + const sizeHint = this.getSizeHint(); + response.setSizeHint(sizeHint); + if (response.getBodyReadableStream(globalThis)) |readable| { + log("onBodyReceived CurrentResponse BodyReadableStream", .{}); + if (readable.ptr == .Bytes) { + const scheduled_response_buffer = this.scheduled_response_buffer.list; + + const chunk = scheduled_response_buffer.items; + + if (this.result.has_more) { + readable.ptr.Bytes.onData( + .{ + .temporary = bun.ByteList.fromBorrowedSliceDangerous(chunk), + }, + bun.default_allocator, + ); + } else { + readable.value.ensureStillAlive(); + response.detachReadableStream(globalThis); + readable.ptr.Bytes.onData( + .{ + .temporary_and_done = bun.ByteList.fromBorrowedSliceDangerous(chunk), + }, + bun.default_allocator, + ); } - } else { - body.Locked.size_hint = this.getSizeHint(); + + return; } - // we will reach here when not streaming, this is also the only case we dont wanna to reset the buffer - buffer_reset = false; - if (!this.result.has_more) { - var scheduled_response_buffer = this.scheduled_response_buffer.list; - this.memory_reporter.discard(scheduled_response_buffer.allocatedSlice()); + } - // done resolve body - var old = body.*; - const body_value = Body.Value{ - .InternalBlob = .{ - .bytes = scheduled_response_buffer.toManaged(bun.default_allocator), - }, - }; - body.* = body_value; + // we will reach here when not streaming, this is also the only case we dont wanna to reset the buffer + buffer_reset = false; + if (!this.result.has_more) { + var scheduled_response_buffer = this.scheduled_response_buffer.list; + this.memory_reporter.discard(scheduled_response_buffer.allocatedSlice()); + const body = response.getBodyValue(); + // done resolve body + var old = body.*; + const body_value = Body.Value{ + .InternalBlob = .{ + .bytes = scheduled_response_buffer.toManaged(bun.default_allocator), + }, + }; + body.* = body_value; + log("onBodyReceived body_value length={}", .{body_value.InternalBlob.bytes.items.len}); - this.scheduled_response_buffer = .{ - .allocator = this.memory_reporter.allocator(), - .list = .{ - .items = &.{}, - .capacity = 0, - }, - }; + this.scheduled_response_buffer = .{ + .allocator = this.memory_reporter.allocator(), + .list = .{ + .items = &.{}, + .capacity = 0, + }, + }; - if (old == .Locked) { - old.resolve(body, this.global_this, response.getFetchHeaders()); - } + if (old == .Locked) { + log("onBodyReceived old.resolve", .{}); + old.resolve(body, this.global_this, response.getFetchHeaders()); } } } @@ -2102,21 +2101,25 @@ pub fn Bun__fetch_( } if (request) |req| { - if (req.body.value == .Used or (req.body.value == .Locked and (req.body.value.Locked.action != .none or req.body.value.Locked.isDisturbed(Request, globalThis, first_arg)))) { + const bodyValue = req.getBodyValue(); + if (bodyValue.* == .Used or (bodyValue.* == .Locked and (bodyValue.Locked.action != .none or bodyValue.Locked.isDisturbed(Request, globalThis, first_arg)))) { return globalThis.ERR(.BODY_ALREADY_USED, "Request body already used", .{}).throw(); } - if (req.body.value == .Locked) { - if (req.body.value.Locked.readable.has()) { - break :extract_body FetchTasklet.HTTPRequestBody{ .ReadableStream = jsc.WebCore.ReadableStream.Strong.init(req.body.value.Locked.readable.get(globalThis).?, globalThis) }; + if (bodyValue.* == .Locked) { + if (req.getBodyReadableStream(globalThis)) |readable| { + break :extract_body FetchTasklet.HTTPRequestBody{ .ReadableStream = jsc.WebCore.ReadableStream.Strong.init(readable, globalThis) }; + } + if (bodyValue.Locked.readable.has()) { + break :extract_body FetchTasklet.HTTPRequestBody{ .ReadableStream = jsc.WebCore.ReadableStream.Strong.init(bodyValue.Locked.readable.get(globalThis).?, globalThis) }; } - const readable = try req.body.value.toReadableStream(globalThis); - if (!readable.isEmptyOrUndefinedOrNull() and req.body.value == .Locked and req.body.value.Locked.readable.has()) { - break :extract_body FetchTasklet.HTTPRequestBody{ .ReadableStream = jsc.WebCore.ReadableStream.Strong.init(req.body.value.Locked.readable.get(globalThis).?, globalThis) }; + const readable = try bodyValue.toReadableStream(globalThis); + if (!readable.isEmptyOrUndefinedOrNull() and bodyValue.* == .Locked and bodyValue.Locked.readable.has()) { + break :extract_body FetchTasklet.HTTPRequestBody{ .ReadableStream = jsc.WebCore.ReadableStream.Strong.init(bodyValue.Locked.readable.get(globalThis).?, globalThis) }; } } - break :extract_body FetchTasklet.HTTPRequestBody{ .AnyBlob = req.body.value.useAsAnyBlob() }; + break :extract_body FetchTasklet.HTTPRequestBody{ .AnyBlob = bodyValue.useAsAnyBlob() }; } if (request_init_object) |req| { diff --git a/src/bun.js/webcore/response.classes.ts b/src/bun.js/webcore/response.classes.ts index e1900cd059c8e3..2efa48a44de822 100644 --- a/src/bun.js/webcore/response.classes.ts +++ b/src/bun.js/webcore/response.classes.ts @@ -4,6 +4,7 @@ export default [ define({ name: "Request", construct: true, + constructNeedsThis: true, finalize: true, final: false, klass: {}, @@ -12,6 +13,7 @@ export default [ configurable: false, overridesToJS: true, memoryCost: true, + values: ["stream"], proto: { text: { fn: "getText", async: true }, json: { fn: "getJSON", async: true }, @@ -68,6 +70,7 @@ export default [ define({ name: "Response", construct: true, + constructNeedsThis: true, finalize: true, final: false, JSType: "0b11101110", @@ -85,6 +88,7 @@ export default [ fn: "constructError", }, }, + values: ["stream"], proto: { url: { getter: "getURL", diff --git a/test/js/web/fetch/request-cyclic-reference.test.ts b/test/js/web/fetch/request-cyclic-reference.test.ts new file mode 100644 index 00000000000000..83640c9e70757d --- /dev/null +++ b/test/js/web/fetch/request-cyclic-reference.test.ts @@ -0,0 +1,20 @@ +import { heapStats } from "bun:jsc"; +import { expect, test } from "bun:test"; + +test("stream should not leak when request is cyclic reference to itself", async () => { + function leak() { + const stream = new ReadableStream({ + pull(controller) {}, + }); + const response = new Request("http://localhost:1337", { body: stream }); + // @ts-ignore + stream.response = stream; + } + for (let i = 0; i < 10000; i++) { + leak(); + } + + await Bun.sleep(0); + Bun.gc(true); + expect(heapStats().objectTypeCounts.ReadableStream || 0).toBeLessThanOrEqual(4); +}); diff --git a/test/js/web/fetch/response-cyclic-reference.test.ts b/test/js/web/fetch/response-cyclic-reference.test.ts new file mode 100644 index 00000000000000..65685447991568 --- /dev/null +++ b/test/js/web/fetch/response-cyclic-reference.test.ts @@ -0,0 +1,20 @@ +import { heapStats } from "bun:jsc"; +import { expect, test } from "bun:test"; + +test("stream should not leak when response is cyclic reference to itself", async () => { + function leak() { + const stream = new ReadableStream({ + pull(controller) {}, + }); + const response = new Response(stream); + // @ts-ignore + stream.response = stream; + } + for (let i = 0; i < 10000; i++) { + leak(); + } + + await Bun.sleep(0); + Bun.gc(true); + expect(heapStats().objectTypeCounts.ReadableStream || 0).toBeLessThanOrEqual(4); +}); From 5cc5bfe0e083cd7ae62dca564cac96eb1547ef6f Mon Sep 17 00:00:00 2001 From: Jarred Sumner Date: Tue, 7 Oct 2025 02:53:42 -0700 Subject: [PATCH 5/6] Simplify --- src/bun.js/api/server/RequestContext.zig | 50 ++++++++++--------- src/bun.js/webcore/Body.zig | 33 ++++-------- src/bun.js/webcore/Request.zig | 7 ++- src/bun.js/webcore/fetch.zig | 11 +--- .../fetch/request-cyclic-reference.test.ts | 28 ++++++++++- .../fetch/response-cyclic-reference.test.ts | 20 ++++++++ 6 files changed, 87 insertions(+), 62 deletions(-) diff --git a/src/bun.js/api/server/RequestContext.zig b/src/bun.js/api/server/RequestContext.zig index 4ada2485a3bd47..de989f1b55b40f 100644 --- a/src/bun.js/api/server/RequestContext.zig +++ b/src/bun.js/api/server/RequestContext.zig @@ -63,7 +63,7 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, sink: ?*ResponseStream.JSSink = null, byte_stream: ?*jsc.WebCore.ByteStream = null, // reference to the readable stream / byte_stream alive - readable_stream_ref: jsc.WebCore.ReadableStream.Strong = .{}, + response_body_readable_stream_ref: jsc.WebCore.ReadableStream.Strong = .{}, /// Used in errors pathname: bun.String = bun.String.empty, @@ -678,7 +678,7 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, if (this.response_ptr) |response| { if (response.getBodyReadableStream(globalThis)) |stream| { - stream.value.ensureStillAlive(); + defer stream.value.ensureStillAlive(); response.detachReadableStream(globalThis); stream.abort(globalThis); any_js_calls = true; @@ -752,7 +752,7 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, stream.unpipeWithoutDeref(); } - this.readable_stream_ref.deinit(); + this.response_body_readable_stream_ref.deinit(); if (!this.pathname.isEmpty()) { this.pathname.deref(); @@ -1121,7 +1121,7 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, if (this.isAbortedOrEnded()) { stream.cancel(globalThis); - this.readable_stream_ref.deinit(); + this.response_body_readable_stream_ref.deinit(); return; } const resp = this.resp.?; @@ -1189,7 +1189,7 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, this.sink = null; response_stream.sink.destroy(); stream.done(globalThis); - this.readable_stream_ref.deinit(); + this.response_body_readable_stream_ref.deinit(); this.endStream(this.shouldCloseConnection()); return; } @@ -1230,22 +1230,22 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, }, .fulfilled => { streamLog("promise Fulfilled", .{}); - var readable_stream_ref = this.readable_stream_ref; - this.readable_stream_ref = .{}; + var response_body_readable_stream_ref = this.response_body_readable_stream_ref; + this.response_body_readable_stream_ref = .{}; defer { stream.done(globalThis); - readable_stream_ref.deinit(); + response_body_readable_stream_ref.deinit(); } this.handleResolveStream(); }, .rejected => { streamLog("promise Rejected", .{}); - var readable_stream_ref = this.readable_stream_ref; - this.readable_stream_ref = .{}; + var response_body_readable_stream_ref = this.response_body_readable_stream_ref; + this.response_body_readable_stream_ref = .{}; defer { stream.cancel(globalThis); - readable_stream_ref.deinit(); + response_body_readable_stream_ref.deinit(); } this.handleRejectStream(globalThis, promise.result(globalThis.vm())); }, @@ -1264,7 +1264,7 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, if (this.isAbortedOrEnded()) { response_stream.detach(globalThis); stream.cancel(globalThis); - defer this.readable_stream_ref.deinit(); + defer this.response_body_readable_stream_ref.deinit(); response_stream.sink.markDone(); response_stream.sink.onFirstWrite = null; @@ -1272,9 +1272,9 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, response_stream.sink.finalize(); return; } - var readable_stream_ref = this.readable_stream_ref; - this.readable_stream_ref = .{}; - defer readable_stream_ref.deinit(); + var response_body_readable_stream_ref = this.response_body_readable_stream_ref; + this.response_body_readable_stream_ref = .{}; + defer response_body_readable_stream_ref.deinit(); const is_in_progress = response_stream.sink.has_backpressure or !(response_stream.sink.wrote == 0 and response_stream.sink.buffer.len == 0); @@ -1667,8 +1667,9 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, resp.detachReadableStream(globalThis); stream.done(globalThis); - bodyValue.* = .{ .Used = {} }; } + + bodyValue.* = .Used; } if (req.isAbortedOrEnded()) { @@ -1716,12 +1717,15 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, if (req.response_ptr) |resp| { const bodyValue = resp.getBodyValue(); + if (resp.getBodyReadableStream(globalThis)) |stream| { stream.value.ensureStillAlive(); resp.detachReadableStream(globalThis); stream.done(globalThis); - bodyValue.* = .{ .Used = {} }; } + + if (bodyValue.* == .Locked) + bodyValue.* = .{ .Used = {} }; } // aborted so call finalizeForAbort @@ -1830,12 +1834,12 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, const readable_stream = brk: { if (lock.readable.get(globalThis)) |stream| { // we hold the stream alive until we're done with it - this.readable_stream_ref = lock.readable; + this.response_body_readable_stream_ref = lock.readable; break :brk stream; } if (owned_readable) |stream| { // response owns the stream, so we hold a strong reference to it - this.readable_stream_ref = jsc.WebCore.ReadableStream.Strong.init(stream, globalThis); + this.response_body_readable_stream_ref = .init(stream, globalThis); break :brk stream; } break :brk null; @@ -1856,7 +1860,7 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, switch (stream.ptr) { .Invalid => { - this.readable_stream_ref.deinit(); + this.response_body_readable_stream_ref.deinit(); }, // toBlobIfPossible will typically convert .Blob streams, or .File streams into a Blob object, but cannot always. .Blob, @@ -1878,7 +1882,7 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, if (this.resp == null) { // we don't have a response, so we can discard the stream stream.done(globalThis); - this.readable_stream_ref.deinit(); + this.response_body_readable_stream_ref.deinit(); return; } const resp = this.resp.?; @@ -1887,13 +1891,13 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, if (byte_stream.has_received_last_chunk) { var byte_list = byte_stream.drain(); this.blob = .fromArrayList(byte_list.moveToListManaged(bun.default_allocator)); - this.readable_stream_ref.deinit(); + this.response_body_readable_stream_ref.deinit(); this.doRenderBlob(); return; } this.ref(); byte_stream.pipe = jsc.WebCore.Pipe.Wrap(@This(), onPipe).init(this); - this.readable_stream_ref = jsc.WebCore.ReadableStream.Strong.init(stream, globalThis); + this.response_body_readable_stream_ref = jsc.WebCore.ReadableStream.Strong.init(stream, globalThis); this.byte_stream = byte_stream; var response_buf = byte_stream.drain(); diff --git a/src/bun.js/webcore/Body.zig b/src/bun.js/webcore/Body.zig index 89da0374cb6851..64cdfc2a05da6d 100644 --- a/src/bun.js/webcore/Body.zig +++ b/src/bun.js/webcore/Body.zig @@ -135,23 +135,6 @@ pub const PendingValue = struct { return this.readable.held.has() or (this.promise != null and !this.promise.?.isEmptyOrUndefinedOrNull()); } - pub fn hasPendingPromise(this: *PendingValue) bool { - const promise = this.promise orelse return false; - - if (promise.asAnyPromise()) |internal| { - if (internal.status(this.global.vm()) != .pending) { - promise.unprotect(); - this.promise = null; - return false; - } - - return true; - } - - this.promise = null; - return false; - } - pub fn toAnyBlobAllowPromise(this: *PendingValue) ?AnyBlob { var stream = if (this.readable.get(this.global)) |readable| readable else return null; @@ -899,13 +882,15 @@ pub const Value = union(Tag) { locked.readable = .{}; defer strong_readable.deinit(); - if (locked.hasPendingPromise()) { - const promise = locked.promise.?; - defer promise.unprotect(); + if (locked.promise) |promise_value| { locked.promise = null; + defer promise_value.ensureStillAlive(); + defer promise_value.unprotect(); - if (promise.asAnyPromise()) |internal| { - internal.reject(global, this.Error.toJS(global)); + if (promise_value.asAnyPromise()) |promise| { + if (promise.status(global.vm()) == .pending) { + promise.reject(global, this.Error.toJS(global)); + } } } @@ -978,8 +963,8 @@ pub const Value = union(Tag) { } if (try readable.tee(globalThis)) |new_readable| { - // we current readable to be a strong reference when cloning and we will return the second one in the result - // this will be checked and downgraded to a write barrier if needed + // Keep the current readable as a strong reference when cloning, and return the second one in the result. + // This will be checked and downgraded to a write barrier if needed. this.Locked.readable = jsc.WebCore.ReadableStream.Strong.init(new_readable[0], globalThis); return Value{ .Locked = .{ diff --git a/src/bun.js/webcore/Request.zig b/src/bun.js/webcore/Request.zig index 140a46642e9f3d..d733f8a23126ac 100644 --- a/src/bun.js/webcore/Request.zig +++ b/src/bun.js/webcore/Request.zig @@ -574,10 +574,9 @@ fn checkBodyStreamRef(this: *Request, globalObject: *JSGlobalObject) void { if (this.#js_ref.tryGet()) |js_value| { if (this.#body.value == .Locked) { if (this.#body.value.Locked.readable.get(globalObject)) |stream| { - // we dont hold a strong reference to the stream we will guard it in js.gc.stream - // so we avoid cycled references - // anyone using Response should not use Locked.readable directly because it dont always owns it - // the owner will be always the Response object it self + // Store the stream in js.gc.stream instead of holding a strong reference + // to avoid circular references. The Request object owns the stream, + // so Locked.readable should not be used directly by consumers. stream.value.ensureStillAlive(); js.gc.stream.set(js_value, globalObject, stream.value); this.#body.value.Locked.readable.deinit(); diff --git a/src/bun.js/webcore/fetch.zig b/src/bun.js/webcore/fetch.zig index f497c56df394c3..80be6e2ef43718 100644 --- a/src/bun.js/webcore/fetch.zig +++ b/src/bun.js/webcore/fetch.zig @@ -402,16 +402,7 @@ pub const FetchTasklet = struct { if (this.getCurrentResponse()) |response| { need_deinit = false; // body value now owns the error const body = response.getBodyValue(); - if (body.* == .Locked and body.Locked.promise != null) { - const promise = body.Locked.promise.?.asAnyPromise().?; - body.toErrorInstance(err, globalThis); - if (promise.isHandled(globalThis.vm())) { - return; - } - promise.reject(globalThis, body.Error.toJS(globalThis)); - } else { - body.toErrorInstance(err, globalThis); - } + body.toErrorInstance(err, globalThis); } return; } diff --git a/test/js/web/fetch/request-cyclic-reference.test.ts b/test/js/web/fetch/request-cyclic-reference.test.ts index 83640c9e70757d..31ac19e3b84f6c 100644 --- a/test/js/web/fetch/request-cyclic-reference.test.ts +++ b/test/js/web/fetch/request-cyclic-reference.test.ts @@ -6,7 +6,7 @@ test("stream should not leak when request is cyclic reference to itself", async const stream = new ReadableStream({ pull(controller) {}, }); - const response = new Request("http://localhost:1337", { body: stream }); + const response = new Request("http://localhost:1337", { method: "POST", body: stream }); // @ts-ignore stream.response = stream; } @@ -18,3 +18,29 @@ test("stream should not leak when request is cyclic reference to itself", async Bun.gc(true); expect(heapStats().objectTypeCounts.ReadableStream || 0).toBeLessThanOrEqual(4); }); + +test("stream should not leak when creating a stream contained in another request", async () => { + var req1: Request | null = null; + var req2: Request | null = null; + function leak() { + const stream = new ReadableStream({ + async pull(controller) { + await 42; + controller.stream = req1; + controller.stream2 = req2; + }, + }); + req1 = new Request("http://localhost:1337", { method: "POST", body: stream }); + req2 = new Request("http://localhost:1337", { method: "POST", body: req1.body }); + // @ts-ignore + stream.req2 = req2; + stream.req = req1; + } + for (let i = 0; i < 10000; i++) { + leak(); + } + + await Bun.sleep(0); + Bun.gc(true); + expect(heapStats().objectTypeCounts.ReadableStream || 0).toBeLessThanOrEqual(4); +}); diff --git a/test/js/web/fetch/response-cyclic-reference.test.ts b/test/js/web/fetch/response-cyclic-reference.test.ts index 65685447991568..a9d0b78bbbc264 100644 --- a/test/js/web/fetch/response-cyclic-reference.test.ts +++ b/test/js/web/fetch/response-cyclic-reference.test.ts @@ -18,3 +18,23 @@ test("stream should not leak when response is cyclic reference to itself", async Bun.gc(true); expect(heapStats().objectTypeCounts.ReadableStream || 0).toBeLessThanOrEqual(4); }); + +test("stream should not leak when creating a stream contained in another response", async () => { + function leak() { + const stream = new ReadableStream({ + pull(controller) {}, + }); + const response = new Response(stream); + const response2 = new Response(response.body); + // @ts-ignore + stream.response = stream; + stream.response2 = response2; + } + for (let i = 0; i < 10000; i++) { + leak(); + } + + await Bun.sleep(0); + Bun.gc(true); + expect(heapStats().objectTypeCounts.ReadableStream || 0).toBeLessThanOrEqual(4); +}); From e0e7ce2f83667a19bc34e198a75aae1f997f9006 Mon Sep 17 00:00:00 2001 From: Jarred Sumner Date: Tue, 7 Oct 2025 02:54:22 -0700 Subject: [PATCH 6/6] Update RequestContext.zig --- src/bun.js/api/server/RequestContext.zig | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/bun.js/api/server/RequestContext.zig b/src/bun.js/api/server/RequestContext.zig index de989f1b55b40f..d130e4eaeb78bd 100644 --- a/src/bun.js/api/server/RequestContext.zig +++ b/src/bun.js/api/server/RequestContext.zig @@ -62,7 +62,7 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, sink: ?*ResponseStream.JSSink = null, byte_stream: ?*jsc.WebCore.ByteStream = null, - // reference to the readable stream / byte_stream alive + /// This keeps the Response body's ReadableStream alive. response_body_readable_stream_ref: jsc.WebCore.ReadableStream.Strong = .{}, /// Used in errors