Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/bun.js/event_loop.zig
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,14 @@ comptime {
@export(&externRunCallback3, .{ .name = "Bun__EventLoop__runCallback3" });
}

/// Prefer `runCallbackWithResult` unless you really need to make sure that microtasks are drained.
pub fn runCallbackWithResultAndForcefullyDrainMicrotasks(this: *EventLoop, callback: jsc.JSValue, globalObject: *jsc.JSGlobalObject, thisValue: jsc.JSValue, arguments: []const jsc.JSValue) !jsc.JSValue {
const result = try callback.call(globalObject, thisValue, arguments);
result.ensureStillAlive();
try this.drainMicrotasksWithGlobal(globalObject, globalObject.bunVM().jsc_vm);
return result;
}

pub fn runCallbackWithResult(this: *EventLoop, callback: jsc.JSValue, globalObject: *jsc.JSGlobalObject, thisValue: jsc.JSValue, arguments: []const jsc.JSValue) jsc.JSValue {
this.enter();
defer this.exit();
Expand Down
11 changes: 6 additions & 5 deletions src/bun.js/test/Collection.zig
Original file line number Diff line number Diff line change
Expand Up @@ -137,11 +137,12 @@ pub fn step(buntest_strong: bun_test.BunTestPtr, globalThis: *jsc.JSGlobalObject
this.active_scope = new_scope;
group.log("collection:runOne set scope to {s}", .{this.active_scope.base.name orelse "undefined"});

BunTest.runTestCallback(buntest_strong, globalThis, callback.get(), false, .{
.collection = .{
.active_scope = previous_scope,
},
}, .epoch);
if (BunTest.runTestCallback(buntest_strong, globalThis, callback.get(), false, .{
.collection = .{ .active_scope = previous_scope },
}, &.epoch)) |cfg_data| {
// the result is available immediately; queue
buntest.addResult(cfg_data);
}

return .{ .waiting = .{} };
}
Expand Down
30 changes: 17 additions & 13 deletions src/bun.js/test/Execution.zig
Original file line number Diff line number Diff line change
Expand Up @@ -222,10 +222,11 @@ pub fn step(buntest_strong: bun_test.BunTestPtr, globalThis: *jsc.JSGlobalObject
defer groupLog.end();
const buntest = buntest_strong.get();
const this = &buntest.execution;
var now = bun.timespec.now();

switch (data) {
.start => {
return try stepGroup(buntest_strong, globalThis, bun.timespec.now());
return try stepGroup(buntest_strong, globalThis, &now);
},
else => {
// determine the active sequence,group
Expand All @@ -242,21 +243,20 @@ pub fn step(buntest_strong: bun_test.BunTestPtr, globalThis: *jsc.JSGlobalObject
bun.assert(sequence.active_index < sequence.entries(this).len);
this.advanceSequence(sequence, group);

const now = bun.timespec.now();
const sequence_result = try stepSequence(buntest_strong, globalThis, sequence, group, sequence_index, now);
const sequence_result = try stepSequence(buntest_strong, globalThis, sequence, group, sequence_index, &now);
switch (sequence_result) {
.done => {},
.execute => |exec| return .{ .waiting = .{ .timeout = exec.timeout } },
}
if (group.remaining_incomplete_entries == 0) {
return try stepGroup(buntest_strong, globalThis, now);
return try stepGroup(buntest_strong, globalThis, &now);
}
return .{ .waiting = .{} };
},
}
}

pub fn stepGroup(buntest_strong: bun_test.BunTestPtr, globalThis: *jsc.JSGlobalObject, now: bun.timespec) bun.JSError!bun_test.StepResult {
pub fn stepGroup(buntest_strong: bun_test.BunTestPtr, globalThis: *jsc.JSGlobalObject, now: *bun.timespec) bun.JSError!bun_test.StepResult {
groupLog.begin(@src());
defer groupLog.end();
const buntest = buntest_strong.get();
Expand Down Expand Up @@ -295,7 +295,7 @@ pub fn stepGroup(buntest_strong: bun_test.BunTestPtr, globalThis: *jsc.JSGlobalO
}
}
const AdvanceStatus = union(enum) { done, execute: struct { timeout: bun.timespec = .epoch } };
fn stepGroupOne(buntest_strong: bun_test.BunTestPtr, globalThis: *jsc.JSGlobalObject, group: *ConcurrentGroup, now: bun.timespec) !AdvanceStatus {
fn stepGroupOne(buntest_strong: bun_test.BunTestPtr, globalThis: *jsc.JSGlobalObject, group: *ConcurrentGroup, now: *bun.timespec) !AdvanceStatus {
const buntest = buntest_strong.get();
const this = &buntest.execution;
var final_status: AdvanceStatus = .done;
Expand All @@ -320,13 +320,13 @@ const AdvanceSequenceStatus = union(enum) {
timeout: bun.timespec = .epoch,
},
};
fn stepSequence(buntest_strong: bun_test.BunTestPtr, globalThis: *jsc.JSGlobalObject, sequence: *ExecutionSequence, group: *ConcurrentGroup, sequence_index: usize, now: bun.timespec) !AdvanceSequenceStatus {
fn stepSequence(buntest_strong: bun_test.BunTestPtr, globalThis: *jsc.JSGlobalObject, sequence: *ExecutionSequence, group: *ConcurrentGroup, sequence_index: usize, now: *bun.timespec) !AdvanceSequenceStatus {
while (true) {
return try stepSequenceOne(buntest_strong, globalThis, sequence, group, sequence_index, now) orelse continue;
}
}
/// returns null if the while loop should continue
fn stepSequenceOne(buntest_strong: bun_test.BunTestPtr, globalThis: *jsc.JSGlobalObject, sequence: *ExecutionSequence, group: *ConcurrentGroup, sequence_index: usize, now: bun.timespec) !?AdvanceSequenceStatus {
fn stepSequenceOne(buntest_strong: bun_test.BunTestPtr, globalThis: *jsc.JSGlobalObject, sequence: *ExecutionSequence, group: *ConcurrentGroup, sequence_index: usize, now: *bun.timespec) !?AdvanceSequenceStatus {
groupLog.begin(@src());
defer groupLog.end();
const buntest = buntest_strong.get();
Expand All @@ -337,10 +337,7 @@ fn stepSequenceOne(buntest_strong: bun_test.BunTestPtr, globalThis: *jsc.JSGloba
bun.debugAssert(false); // sequence is executing with no active entry
return .{ .execute = .{} };
};
if (!active_entry.timespec.eql(&.epoch) and active_entry.timespec.order(&now) == .lt) {
// timed out
sequence.result = if (active_entry == sequence.test_entry) if (active_entry.has_done_parameter) .fail_because_timeout_with_done_callback else .fail_because_timeout else if (active_entry.has_done_parameter) .fail_because_hook_timeout_with_done_callback else .fail_because_hook_timeout;
sequence.maybe_skip = true;
if (active_entry.evaluateTimeout(sequence, now)) {
this.advanceSequence(sequence, group);
return null; // run again
}
Expand Down Expand Up @@ -374,7 +371,14 @@ fn stepSequenceOne(buntest_strong: bun_test.BunTestPtr, globalThis: *jsc.JSGloba
};
groupLog.log("runSequence queued callback: {}", .{callback_data});

BunTest.runTestCallback(buntest_strong, globalThis, cb.get(), next_item.has_done_parameter, callback_data, next_item.timespec);
if (BunTest.runTestCallback(buntest_strong, globalThis, cb.get(), next_item.has_done_parameter, callback_data, &next_item.timespec) != null) {
now.* = bun.timespec.now();
_ = next_item.evaluateTimeout(sequence, now);

// the result is available immediately; advance the sequence and run again.
this.advanceSequence(sequence, group);
return null; // run again
}
return .{ .execute = .{ .timeout = next_item.timespec } };
} else {
switch (next_item.base.mode) {
Expand Down
124 changes: 85 additions & 39 deletions src/bun.js/test/bun_test.zig
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,7 @@ pub const BunTest = struct {
errdefer group.log("ended in error", .{});

const result, const this_ptr = callframe.argumentsAsArray(2);
if (this_ptr.isEmptyOrUndefinedOrNull()) return;

const refdata: *RefData = this_ptr.asPromisePtr(RefData);
defer refdata.deref();
Expand Down Expand Up @@ -472,21 +473,21 @@ pub const BunTest = struct {
}
}

this.updateMinTimeout(globalThis, min_timeout);
this.updateMinTimeout(globalThis, &min_timeout);
}

fn updateMinTimeout(this: *BunTest, globalThis: *jsc.JSGlobalObject, min_timeout: bun.timespec) void {
fn updateMinTimeout(this: *BunTest, globalThis: *jsc.JSGlobalObject, min_timeout: *const bun.timespec) void {
group.begin(@src());
defer group.end();
// only set the timer if the new timeout is sooner than the current timeout. this unfortunately means that we can't unset an unnecessary timer.
group.log("-> timeout: {} {}, {s}", .{ min_timeout, this.timer.next, @tagName(min_timeout.orderIgnoreEpoch(this.timer.next)) });
group.log("-> timeout: {} {}, {s}", .{ min_timeout.*, this.timer.next, @tagName(min_timeout.orderIgnoreEpoch(this.timer.next)) });
if (min_timeout.orderIgnoreEpoch(this.timer.next) == .lt) {
group.log("-> setting timer to {}", .{min_timeout});
group.log("-> setting timer to {}", .{min_timeout.*});
if (!this.timer.next.eql(&.epoch)) {
group.log("-> removing existing timer", .{});
globalThis.bunVM().timer.remove(&this.timer);
}
this.timer.next = min_timeout;
this.timer.next = min_timeout.*;
if (!this.timer.next.eql(&.epoch)) {
group.log("-> inserting timer", .{});
globalThis.bunVM().timer.insert(&this.timer);
Expand Down Expand Up @@ -534,48 +535,55 @@ pub const BunTest = struct {
}
}

fn drain(globalThis: *jsc.JSGlobalObject) void {
const bun_vm = globalThis.bunVM();
bun_vm.drainMicrotasks();
var count = bun_vm.unhandled_error_counter;
bun_vm.global.handleRejectedPromises();
while (bun_vm.unhandled_error_counter > count) {
count = bun_vm.unhandled_error_counter;
bun_vm.drainMicrotasks();
bun_vm.global.handleRejectedPromises();
}
}

/// if sync, the result is queued and appended later
pub fn runTestCallback(this_strong: BunTestPtr, globalThis: *jsc.JSGlobalObject, cfg_callback: jsc.JSValue, cfg_done_parameter: bool, cfg_data: BunTest.RefDataValue, timeout: bun.timespec) void {
/// if sync, the result is returned. if async, null is returned.
pub fn runTestCallback(this_strong: BunTestPtr, globalThis: *jsc.JSGlobalObject, cfg_callback: jsc.JSValue, cfg_done_parameter: bool, cfg_data: BunTest.RefDataValue, timeout: *const bun.timespec) ?RefDataValue {
group.begin(@src());
defer group.end();
const this = this_strong.get();
const vm = globalThis.bunVM();

var done_arg: ?jsc.JSValue = null;
// Don't use ?jsc.JSValue to make it harder for the conservative stack
// scanner to miss it.
var done_arg: jsc.JSValue = .zero;
var done_callback: jsc.JSValue = .zero;

var done_callback: ?jsc.JSValue = null;
if (cfg_done_parameter) {
group.log("callTestCallback -> appending done callback param: data {}", .{cfg_data});
done_callback = DoneCallback.createUnbound(globalThis);
done_arg = DoneCallback.bind(done_callback.?, globalThis) catch |e| blk: {
done_arg = DoneCallback.bind(done_callback, globalThis) catch |e| blk: {
this.onUncaughtException(globalThis, globalThis.takeException(e), false, cfg_data);
break :blk jsc.JSValue.js_undefined; // failed to bind done callback
break :blk .zero; // failed to bind done callback
};
}

this.updateMinTimeout(globalThis, timeout);
const result: ?jsc.JSValue = cfg_callback.call(globalThis, .js_undefined, if (done_arg) |done| &.{done} else &.{}) catch blk: {
const result: jsc.JSValue = vm.eventLoop().runCallbackWithResultAndForcefullyDrainMicrotasks(cfg_callback, globalThis, .js_undefined, if (done_arg != .zero) &.{done_arg} else &.{}) catch blk: {
globalThis.clearTerminationException();
this.onUncaughtException(globalThis, globalThis.tryTakeException(), false, cfg_data);
group.log("callTestCallback -> error", .{});
break :blk null;
break :blk .zero;
};

done_callback.ensureStillAlive();

// Drain unhandled promise rejections.
while (true) {
// Prevent the user's Promise rejection from going into the uncaught promise rejection queue.
if (result != .zero)
if (result.asPromise()) |promise|
if (promise.status(globalThis.vm()) == .rejected)
promise.setHandled(globalThis.vm());

const prev_unhandled_count = vm.unhandled_error_counter;
globalThis.handleRejectedPromises();
if (vm.unhandled_error_counter == prev_unhandled_count)
break;
}

var dcb_ref: ?*RefData = null;
if (done_callback) |dcb| {
if (DoneCallback.fromJS(dcb)) |dcb_data| {
if (dcb_data.called or result == null) {
if (done_callback != .zero and result != .zero) {
if (DoneCallback.fromJS(done_callback)) |dcb_data| {
if (dcb_data.called) {
// done callback already called or the callback errored; add result immediately
} else {
dcb_ref = ref(this_strong, cfg_data);
Expand All @@ -584,25 +592,43 @@ pub const BunTest = struct {
} else bun.debugAssert(false); // this should be unreachable, we create DoneCallback above
}

if (result != null and result.?.asPromise() != null) {
group.log("callTestCallback -> promise: data {}", .{cfg_data});
const this_ref: *RefData = if (dcb_ref) |dcb_ref_value| dcb_ref_value.dupe() else ref(this_strong, cfg_data);
result.?.then(globalThis, this_ref, bunTestThen, bunTestCatch);
drain(globalThis);
return;
if (result != .zero) {
if (result.asPromise()) |promise| {
defer result.ensureStillAlive(); // because sometimes we use promise without result

group.log("callTestCallback -> promise: data {}", .{cfg_data});

switch (promise.status(globalThis.vm())) {
.pending => {
// not immediately resolved; register 'then' to handle the result when it becomes available
const this_ref: *RefData = if (dcb_ref) |dcb_ref_value| dcb_ref_value.dupe() else ref(this_strong, cfg_data);
result.then(globalThis, this_ref, bunTestThen, bunTestCatch);
return null;
},
.fulfilled => {
// Do not register a then callback when it's already fulfilled.
return cfg_data;
},
.rejected => {
const value = promise.result(globalThis.vm());
this.onUncaughtException(globalThis, value, true, cfg_data);

// We previously marked it as handled above.

return cfg_data;
},
}
}
}

if (dcb_ref) |_| {
// completed asynchronously
group.log("callTestCallback -> wait for done callback", .{});
drain(globalThis);
return;
return null;
}

group.log("callTestCallback -> sync", .{});
drain(globalThis);
this.addResult(cfg_data);
return;
return cfg_data;
}

/// called from the uncaught exception handler, or if a test callback rejects or throws an error
Expand Down Expand Up @@ -843,6 +869,26 @@ pub const ExecutionEntry = struct {
}
return entry;
}

pub fn evaluateTimeout(this: *ExecutionEntry, sequence: *Execution.ExecutionSequence, now: *const bun.timespec) bool {
if (!this.timespec.eql(&.epoch) and this.timespec.order(now) == .lt) {
// timed out
sequence.result = if (this == sequence.test_entry)
if (this.has_done_parameter)
.fail_because_timeout_with_done_callback
else
.fail_because_timeout
else if (this.has_done_parameter)
.fail_because_hook_timeout_with_done_callback
else
.fail_because_hook_timeout;
sequence.maybe_skip = true;
return true;
}

return false;
}

pub fn destroy(this: *ExecutionEntry, gpa: std.mem.Allocator) void {
if (this.callback) |*c| c.deinit();
this.base.deinit(gpa);
Expand Down
15 changes: 15 additions & 0 deletions test/js/bun/test/concurrent_immediate.fixture.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
beforeEach(() => {
console.log("beforeEach");
});
afterEach(() => {
console.log("afterEach");
});
test.concurrent("test 1", () => {
console.log("start test 1");
});
test.concurrent("test 2", () => {
console.log("start test 2");
});
test.concurrent("test 3", () => {
console.log("start test 3");
});
Loading