-
Notifications
You must be signed in to change notification settings - Fork 1
Description
为了庆祝 std.Io 引入补丁集的发布,这里是作者在 Zigtoberfest 2025 上做的简短交互式演示的文本版本。
这是即将发布的 Zig 0.16.0 中新的异步 I/O 原语的预览,预计在 3-4 个月内发布。内容很多,这里先介绍一些核心同步 API,所有 Zig 代码都可以使用。
让我们从基础开始,逐步添加更多异步内容。
示例 0
第一个例子没有异步操作,基本上是 Zig 中的“Hello, World!”。
const std = @import("std");
pub fn main() !void {
doWork();
}
fn doWork() void {
std.debug.print("working\n", .{});
var timespec: std.posix.timespec = .{ .sec = 1, .nsec = 0 };
_ = std.posix.system.nanosleep(×pec, ×pec);
}输出:
0s $ zig run example0.zig
0s working
1s $
示例 1
接下来做一些设置。虽然还没有使用 async/await,但在增加复杂性之前,需要一些工具。
const std = @import("std");
const Io = std.Io;
const Allocator = std.mem.Allocator;
const assert = std.debug.assert;
fn juicyMain(gpa: Allocator, io: Io) !void {
_ = gpa;
doWork(io);
}
fn doWork(io: Io) void {
std.debug.print("working\n", .{});
io.sleep(.fromSeconds(1), .awake) catch {};
}
pub fn main() !void {
// 设置分配器。
var debug_allocator: std.heap.DebugAllocator(.{}) = .init;
defer assert(debug_allocator.deinit() == .ok);
const gpa = debug_allocator.allocator();
// 设置 I/O 实现。
var threaded: std.Io.Threaded = .init(gpa);
defer threaded.deinit();
const io = threaded.io();
return juicyMain(gpa, io);
}输出(与之前相同):
0s $ zig run example0.zig
0s working
1s $
设置 std.Io 实现类似于设置分配器。通常在 main() 中执行一次,然后将实例传递到整个应用程序中。可重用代码应接受 Allocator 参数(如果需要分配),并应接受 Io 参数(如果需要执行 I/O 操作)。
在这种情况下,这是一个基于线程的 Io 实现。它没有使用 KQueue、IO_Uring 或事件循环,而是一个新的 std.Io 接口的线程实现。
所有示例的设置都相同,因此可以专注于示例代码,它与上次相同。仍然没什么有趣的 - 只是调用 doWork,当然只是调用 sleep()。
示例 2
从现在开始省略冗余的设置代码。
fn juicyMain(gpa: Allocator, io: Io) !void {
_ = gpa;
var future = io.async(doWork, .{io});
future.await(io); // 幂等
}
fn doWork(io: Io) void {
std.debug.print("working\n", .{});
io.sleep(.fromSeconds(1), .awake) catch {};
}输出(与之前相同):
0s $ zig run example0.zig
0s working
1s $
现在使用 async/await 来调用 doWork。async/await 对 Zig 的意义是将函数调用与函数返回解耦。
此代码与之前相同。完全相同,因为在 async 和 await 之间没有放置任何代码。进行调用,然后立即等待返回。
示例 3
在下一个示例中,同时执行两件事:
fn juicyMain(gpa: Allocator, io: Io) !void {
_ = gpa;
var a = io.async(doWork, .{ io, "hard" });
var b = io.async(doWork, .{ io, "on an excuse not to drink Spezi" });
a.await(io);
b.await(io);
}
fn doWork(io: Io, flavor_text: []const u8) void {
std.debug.print("working {s}\n", .{flavor_text});
io.sleep(.fromSeconds(1), .awake) catch {};
}输出:
0s $ zig run example3.zig
0s working on an excuse not to drink Spezi
0s working hard
1s $
仔细观察,可以看出它没有等待两秒钟,而是等待了一秒钟,因为这些操作同时发生。这证明了使用 async/await 的好处 - 可以表达异步性。根据选择的 I/O 实现,它可能会利用表达的异步性并使代码运行得更快。例如,在本例中,std.Io.Threaded 能够在实际时间的一秒内完成两秒钟的工作。
示例 4
通过引入失败,开始使该示例更接近实际场景。
fn juicyMain(gpa: Allocator, io: Io) !void {
var a = io.async(doWork, .{ gpa, io, "hard" });
var b = io.async(doWork, .{ gpa, io, "on an excuse not to drink Spezi" });
try a.await(io);
try b.await(io);
}
fn doWork(gpa: Allocator, io: Io, flavor_text: []const u8) !void {
// 模拟发生错误:
if (flavor_text[0] == 'h') return error.OutOfMemory;
const copied_string = try gpa.dupe(u8, flavor_text);
defer gpa.free(copied_string);
std.debug.print("working {s}\n", .{copied_string});
io.sleep(.fromSeconds(1), .awake) catch {};
}这段代码与之前相同,只是第一个任务会返回一个错误。
猜猜运行这段代码会发生什么?
输出:
(出现内存泄漏和panic)
问题在于,当第一个 try 激活时,它会跳过第二个 await,然后被泄漏检查器捕获。这是一个bug。但很不幸,不是吗?因为我们希望这样编写代码。
示例 5
这是一个修复方案:
fn juicyMain(gpa: Allocator, io: Io) !void {
var a = io.async(doWork, .{ gpa, io, "hard" });
var b = io.async(doWork, .{ gpa, io, "on an excuse not to drink Spezi" });
const a_result = a.await(io);
const b_result = b.await(io);
try a_result;
try b_result;
}
fn doWork(gpa: Allocator, io: Io, flavor_text: []const u8) !void {
// 模拟发生错误:
if (flavor_text[0] == 'h') return error.OutOfMemory;
const copied_string = try gpa.dupe(u8, flavor_text);
defer gpa.free(copied_string);
std.debug.print("working {s}\n", .{copied_string});
io.sleep(.fromSeconds(1), .awake) catch {};
}先执行 await,然后再执行 try。这将修复问题。
输出:
(正确处理了错误,没有资源泄漏)
它成功地失败了。错误得到了处理,没有泄漏任何资源。但这是一个陷阱。让我们找到一种更好的表达方式...
示例 6
这就是取消 (cancellation) 的用武之地。取消是一个非常方便的原语,因为现在可以像往常一样使用 defer、try 和 await,不仅修复了bug,而且还获得了更优化的代码。
fn juicyMain(gpa: Allocator, io: Io) !void {
var a = io.async(doWork, .{ gpa, io, "hard" });
defer a.cancel(io) catch {};
var b = io.async(doWork, .{ gpa, io, "on an excuse not to drink Spezi" });
defer b.cancel(io) catch {};
try a.await(io);
try b.await(io);
}
fn doWork(gpa: Allocator, io: Io, flavor_text: []const u8) !void {
// 模拟发生错误:
if (flavor_text[0] == 'h') return error.OutOfMemory;
const copied_string = try gpa.dupe(u8, flavor_text);
defer gpa.free(copied_string);
std.debug.print("working {s}\n", .{copied_string});
io.sleep(.fromSeconds(1), .awake) catch {};
}感谢取消,现在可以获得即时结果,因为一旦第一个任务返回错误,就会运行 cancel。
输出:
(快速返回错误)
cancel 是最好的朋友,因为它会阻止资源泄漏,并使代码运行得更优化。
cancel 很容易理解:它具有与 await 相同的语义,只是它还会请求取消。取消请求被接受的条件由每个 I/O 实现定义。
cancel 和 await 对于自身和彼此都是幂等的。
示例 7
接下来,介绍另一个真实场景:资源分配。在这种情况下,成功时分配一个字符串,调用者需要管理该字符串。
fn juicyMain(gpa: Allocator, io: Io) !void {
var a = io.async(doWork, .{ gpa, io, "hard" });
defer if (a.cancel(io)) |s| gpa.free(s) else |_| {};
var b = io.async(doWork, .{ gpa, io, "on an excuse not to drink Spezi" });
defer if (b.cancel(io)) |s| gpa.free(s) else |_| {};
const a_string = try a.await(io);
const b_string = try b.await(io);
std.debug.print("finished {s}\n", .{a_string});
std.debug.print("finished {s}\n", .{b_string});
}
fn doWork(gpa: Allocator, io: Io, flavor_text: []const u8) ![]u8 {
const copied_string = try gpa.dupe(u8, flavor_text);
std.debug.print("working {s}\n", .{copied_string});
io.sleep(.fromSeconds(1), .awake) catch {};
return copied_string;
}现在可以看到为什么 cancel 和 await 具有相同的 API。上面延迟的 cancel 调用释放了分配的资源,同时处理了成功调用(资源已分配)和失败调用(资源未分配)。
输出:
(成功完成并打印结果)
重要的一点是,通过像这样进行资源管理,我们能够在下面编写标准的、符合 Zig 习惯的代码,像往常一样使用 try 和 return,而无需担心特殊的资源管理情况。
示例 8
现在稍微改变一下方向。是时候了解为什么异步不是并发了。
在此示例中,生产者通过一个无缓冲队列向消费者发送一个项目。
fn juicyMain(io: Io) !void {
var queue: Io.Queue([]const u8) = .init(&.{});
var producer_task = io.async(producer, .{
io, &queue, "never gonna give you up",
});
defer producer_task.cancel(io) catch {};
var consumer_task = io.async(consumer, .{ io, &queue });
defer _ = consumer_task.cancel(io) catch {};
const result = try consumer_task.await(io);
std.debug.print("message received: {s}\n", .{result});
}
fn producer(
io: Io,
queue: *Io.Queue([]const u8),
flavor_text: []const u8,
) !void {
try queue.putOne(io, flavor_text);
}
fn consumer(
io: Io,
queue: *Io.Queue([]const u8),
) ![]const u8 {
return queue.getOne(io);
}使用 async 生成生产者,并使用 async 生成消费者。
输出:
(成功接收消息)
这不正确地成功了。从角度来看,我们可能由于线程池有多余的并发可用而“幸运”或“不幸”。
为了观察问题,我们可以人为地限制 std.Io.Threaded 实例使用一个线程池大小:
示例 9
// 设置 I/O 实现。
var threaded: std.Io.Threaded = .init(gpa);
threaded.cpu_count = 1;
defer threaded.deinit();
const io = threaded.io();
return juicyMain(io);
}输出:(死锁)
现在它只使用一个线程,它会死锁,因为消费者正在等待从队列中获取东西,而生产者计划运行,但尚未运行。
问题在于我们需要并发,但我们要求的是异步。
示例 10
为了解决这个问题,我们使用 io.concurrent 而不是 io.async。这可能会因为 error.ConcurrencyUnavailable 而失败。
fn juicyMain(io: Io) !void {
var queue: Io.Queue([]const u8) = .init(&.{});
var producer_task = try io.concurrent(producer, .{
io, &queue, "never gonna give you up",
});
defer producer_task.cancel(io) catch {};
var consumer_task = try io.concurrent(consumer, .{ io, &queue });
defer _ = consumer_task.cancel(io) catch {};
const result = try consumer_task.await(io);
std.debug.print("message received: {s}\n", .{result});
}
fn producer(
io: Io,
queue: *Io.Queue([]const u8),
flavor_text: []const u8,
) !void {
try queue.putOne(io, flavor_text);
}
fn consumer(
io: Io,
queue: *Io.Queue([]const u8),
) ![]const u8 {
return queue.getOne(io);
}输出:
(成功接收消息)
现在代码已修复,因为我们正确地表达了需要并发,std.Io.Threaded 通过过度订阅来满足了这一点。
如果添加 -fsingle-threaded 真正将可执行文件限制为一个线程,则过度订阅不可用,导致此输出:
(ConcurrencyUnavailable 错误)
结论
有一些使用 IoUring 和 KQueue 结合堆栈式协程的 std.Io 实现的概念验证,显示出很大的希望,但是该工作取决于一些语言增强才能实现。还有关于无堆栈协程的正在进行的设计工作。以下是一些相关的issue,供感兴趣的人跟踪:
- Restricted Function Types
- Builtin function to tell you the maximum stack size of a given function
- Eliminate Stack Overflow
- Stackless Coroutines
- Juicy Main
这些 API 尚未最终确定。可能需要几次迭代才能正确。请在实际应用中尝试它们,并告诉我们效果如何!让我们合作,使 I/O 接口实用且最佳。
加入我们
Zig 中文社区是一个开放的组织,我们致力于推广 Zig 在中文群体中的使用,有多种方式可以参与进来:
- 供稿,分享自己使用 Zig 的心得
- 改进 ZigCC 组织下的开源项目
- 加入微信群、Telegram 群组