Skip to content

【Zig 日报】Zig 的新异步 I/O (文本版) #268

@jiacai2050

Description

@jiacai2050

为了庆祝 std.Io 引入补丁集的发布,这里是作者在 Zigtoberfest 2025 上做的简短交互式演示的文本版本。

这是即将发布的 Zig 0.16.0 中新的异步 I/O 原语的预览,预计在 3-4 个月内发布。内容很多,这里先介绍一些核心同步 API,所有 Zig 代码都可以使用。

让我们从基础开始,逐步添加更多异步内容。

示例 0

第一个例子没有异步操作,基本上是 Zig 中的“Hello, World!”。

const std = @import("std");

pub fn main() !void {
    doWork();
}

fn doWork() void {
    std.debug.print("working\n", .{});
    var timespec: std.posix.timespec = .{ .sec = 1, .nsec = 0 };
    _ = std.posix.system.nanosleep(&timespec, &timespec);
}

输出:

0s $ zig run example0.zig
0s working
1s $

示例 1

接下来做一些设置。虽然还没有使用 async/await,但在增加复杂性之前,需要一些工具。

const std = @import("std");
const Io = std.Io;
const Allocator = std.mem.Allocator;
const assert = std.debug.assert;

fn juicyMain(gpa: Allocator, io: Io) !void {
    _ = gpa;

    doWork(io);
}

fn doWork(io: Io) void {
    std.debug.print("working\n", .{});
    io.sleep(.fromSeconds(1), .awake) catch {};
}

pub fn main() !void {
    // 设置分配器。
    var debug_allocator: std.heap.DebugAllocator(.{}) = .init;
    defer assert(debug_allocator.deinit() == .ok);
    const gpa = debug_allocator.allocator();

    // 设置 I/O 实现。
    var threaded: std.Io.Threaded = .init(gpa);
    defer threaded.deinit();
    const io = threaded.io();

    return juicyMain(gpa, io);
}

输出(与之前相同):

0s $ zig run example0.zig
0s working
1s $

设置 std.Io 实现类似于设置分配器。通常在 main() 中执行一次,然后将实例传递到整个应用程序中。可重用代码应接受 Allocator 参数(如果需要分配),并应接受 Io 参数(如果需要执行 I/O 操作)。

在这种情况下,这是一个基于线程的 Io 实现。它没有使用 KQueue、IO_Uring 或事件循环,而是一个新的 std.Io 接口的线程实现。

所有示例的设置都相同,因此可以专注于示例代码,它与上次相同。仍然没什么有趣的 - 只是调用 doWork,当然只是调用 sleep()

示例 2

从现在开始省略冗余的设置代码。

fn juicyMain(gpa: Allocator, io: Io) !void {
    _ = gpa;

    var future = io.async(doWork, .{io});

    future.await(io); // 幂等
}

fn doWork(io: Io) void {
    std.debug.print("working\n", .{});
    io.sleep(.fromSeconds(1), .awake) catch {};
}

输出(与之前相同):

0s $ zig run example0.zig
0s working
1s $

现在使用 async/await 来调用 doWork。async/await 对 Zig 的意义是将函数调用与函数返回解耦。

此代码与之前相同。完全相同,因为在 async 和 await 之间没有放置任何代码。进行调用,然后立即等待返回。

示例 3

在下一个示例中,同时执行两件事:

fn juicyMain(gpa: Allocator, io: Io) !void {
    _ = gpa;

    var a = io.async(doWork, .{ io, "hard" });
    var b = io.async(doWork, .{ io, "on an excuse not to drink Spezi" });

    a.await(io);
    b.await(io);
}

fn doWork(io: Io, flavor_text: []const u8) void {
    std.debug.print("working {s}\n", .{flavor_text});
    io.sleep(.fromSeconds(1), .awake) catch {};
}

输出:

0s $ zig run example3.zig
0s working on an excuse not to drink Spezi
0s working hard
1s $

仔细观察,可以看出它没有等待两秒钟,而是等待了一秒钟,因为这些操作同时发生。这证明了使用 async/await 的好处 - 可以表达异步性。根据选择的 I/O 实现,它可能会利用表达的异步性并使代码运行得更快。例如,在本例中,std.Io.Threaded 能够在实际时间的一秒内完成两秒钟的工作。

示例 4

通过引入失败,开始使该示例更接近实际场景。

fn juicyMain(gpa: Allocator, io: Io) !void {
    var a = io.async(doWork, .{ gpa, io, "hard" });
    var b = io.async(doWork, .{ gpa, io, "on an excuse not to drink Spezi" });

    try a.await(io);
    try b.await(io);
}

fn doWork(gpa: Allocator, io: Io, flavor_text: []const u8) !void {
    // 模拟发生错误:
    if (flavor_text[0] == 'h') return error.OutOfMemory;

    const copied_string = try gpa.dupe(u8, flavor_text);
    defer gpa.free(copied_string);
    std.debug.print("working {s}\n", .{copied_string});
    io.sleep(.fromSeconds(1), .awake) catch {};
}

这段代码与之前相同,只是第一个任务会返回一个错误。

猜猜运行这段代码会发生什么?

输出:

(出现内存泄漏和panic)

问题在于,当第一个 try 激活时,它会跳过第二个 await,然后被泄漏检查器捕获。这是一个bug。但很不幸,不是吗?因为我们希望这样编写代码。

示例 5

这是一个修复方案:

fn juicyMain(gpa: Allocator, io: Io) !void {
    var a = io.async(doWork, .{ gpa, io, "hard" });
    var b = io.async(doWork, .{ gpa, io, "on an excuse not to drink Spezi" });

    const a_result = a.await(io);
    const b_result = b.await(io);

    try a_result;
    try b_result;
}

fn doWork(gpa: Allocator, io: Io, flavor_text: []const u8) !void {
    // 模拟发生错误:
    if (flavor_text[0] == 'h') return error.OutOfMemory;

    const copied_string = try gpa.dupe(u8, flavor_text);
    defer gpa.free(copied_string);
    std.debug.print("working {s}\n", .{copied_string});
    io.sleep(.fromSeconds(1), .awake) catch {};
}

先执行 await,然后再执行 try。这将修复问题。

输出:

(正确处理了错误,没有资源泄漏)

它成功地失败了。错误得到了处理,没有泄漏任何资源。但这是一个陷阱。让我们找到一种更好的表达方式...

示例 6

这就是取消 (cancellation) 的用武之地。取消是一个非常方便的原语,因为现在可以像往常一样使用 defer、try 和 await,不仅修复了bug,而且还获得了更优化的代码。

fn juicyMain(gpa: Allocator, io: Io) !void {
    var a = io.async(doWork, .{ gpa, io, "hard" });
    defer a.cancel(io) catch {};

    var b = io.async(doWork, .{ gpa, io, "on an excuse not to drink Spezi" });
    defer b.cancel(io) catch {};

    try a.await(io);
    try b.await(io);
}

fn doWork(gpa: Allocator, io: Io, flavor_text: []const u8) !void {
    // 模拟发生错误:
    if (flavor_text[0] == 'h') return error.OutOfMemory;

    const copied_string = try gpa.dupe(u8, flavor_text);
    defer gpa.free(copied_string);
    std.debug.print("working {s}\n", .{copied_string});
    io.sleep(.fromSeconds(1), .awake) catch {};
}

感谢取消,现在可以获得即时结果,因为一旦第一个任务返回错误,就会运行 cancel。

输出:

(快速返回错误)

cancel 是最好的朋友,因为它会阻止资源泄漏,并使代码运行得更优化。

cancel 很容易理解:它具有与 await 相同的语义,只是它还会请求取消。取消请求被接受的条件由每个 I/O 实现定义。

cancelawait 对于自身和彼此都是幂等的。

示例 7

接下来,介绍另一个真实场景:资源分配。在这种情况下,成功时分配一个字符串,调用者需要管理该字符串。

fn juicyMain(gpa: Allocator, io: Io) !void {
    var a = io.async(doWork, .{ gpa, io, "hard" });
    defer if (a.cancel(io)) |s| gpa.free(s) else |_| {};

    var b = io.async(doWork, .{ gpa, io, "on an excuse not to drink Spezi" });
    defer if (b.cancel(io)) |s| gpa.free(s) else |_| {};

    const a_string = try a.await(io);
    const b_string = try b.await(io);
    std.debug.print("finished {s}\n", .{a_string});
    std.debug.print("finished {s}\n", .{b_string});
}

fn doWork(gpa: Allocator, io: Io, flavor_text: []const u8) ![]u8 {
    const copied_string = try gpa.dupe(u8, flavor_text);
    std.debug.print("working {s}\n", .{copied_string});
    io.sleep(.fromSeconds(1), .awake) catch {};
    return copied_string;
}

现在可以看到为什么 cancelawait 具有相同的 API。上面延迟的 cancel 调用释放了分配的资源,同时处理了成功调用(资源已分配)和失败调用(资源未分配)。

输出:

(成功完成并打印结果)

重要的一点是,通过像这样进行资源管理,我们能够在下面编写标准的、符合 Zig 习惯的代码,像往常一样使用 try 和 return,而无需担心特殊的资源管理情况。

示例 8

现在稍微改变一下方向。是时候了解为什么异步不是并发了。

在此示例中,生产者通过一个无缓冲队列向消费者发送一个项目。

fn juicyMain(io: Io) !void {
    var queue: Io.Queue([]const u8) = .init(&.{});

    var producer_task = io.async(producer, .{
        io, &queue, "never gonna give you up",
    });
    defer producer_task.cancel(io) catch {};

    var consumer_task = io.async(consumer, .{ io, &queue });
    defer _ = consumer_task.cancel(io) catch {};

    const result = try consumer_task.await(io);
    std.debug.print("message received: {s}\n", .{result});
}

fn producer(
    io: Io,
    queue: *Io.Queue([]const u8),
    flavor_text: []const u8,
) !void {
    try queue.putOne(io, flavor_text);
}

fn consumer(
    io: Io,
    queue: *Io.Queue([]const u8),
) ![]const u8 {
    return queue.getOne(io);
}

使用 async 生成生产者,并使用 async 生成消费者。

输出:

(成功接收消息)

这不正确地成功了。从角度来看,我们可能由于线程池有多余的并发可用而“幸运”或“不幸”。

为了观察问题,我们可以人为地限制 std.Io.Threaded 实例使用一个线程池大小:

示例 9

// 设置 I/O 实现。
    var threaded: std.Io.Threaded = .init(gpa);
    threaded.cpu_count = 1;
    defer threaded.deinit();
    const io = threaded.io();

    return juicyMain(io);
}

输出:(死锁)

现在它只使用一个线程,它会死锁,因为消费者正在等待从队列中获取东西,而生产者计划运行,但尚未运行。

问题在于我们需要并发,但我们要求的是异步。

示例 10

为了解决这个问题,我们使用 io.concurrent 而不是 io.async。这可能会因为 error.ConcurrencyUnavailable 而失败。

fn juicyMain(io: Io) !void {
    var queue: Io.Queue([]const u8) = .init(&.{});

    var producer_task = try io.concurrent(producer, .{
        io, &queue, "never gonna give you up",
    });
    defer producer_task.cancel(io) catch {};

    var consumer_task = try io.concurrent(consumer, .{ io, &queue });
    defer _ = consumer_task.cancel(io) catch {};

    const result = try consumer_task.await(io);
    std.debug.print("message received: {s}\n", .{result});
}

fn producer(
    io: Io,
    queue: *Io.Queue([]const u8),
    flavor_text: []const u8,
) !void {
    try queue.putOne(io, flavor_text);
}

fn consumer(
    io: Io,
    queue: *Io.Queue([]const u8),
) ![]const u8 {
    return queue.getOne(io);
}

输出:

(成功接收消息)

现在代码已修复,因为我们正确地表达了需要并发,std.Io.Threaded 通过过度订阅来满足了这一点。

如果添加 -fsingle-threaded 真正将可执行文件限制为一个线程,则过度订阅不可用,导致此输出:

(ConcurrencyUnavailable 错误)

结论

有一些使用 IoUring 和 KQueue 结合堆栈式协程的 std.Io 实现的概念验证,显示出很大的希望,但是该工作取决于一些语言增强才能实现。还有关于无堆栈协程的正在进行的设计工作。以下是一些相关的issue,供感兴趣的人跟踪:

  • Restricted Function Types
  • Builtin function to tell you the maximum stack size of a given function
  • Eliminate Stack Overflow
  • Stackless Coroutines
  • Juicy Main

这些 API 尚未最终确定。可能需要几次迭代才能正确。请在实际应用中尝试它们,并告诉我们效果如何!让我们合作,使 I/O 接口实用且最佳。

Zig's New Async I/O (Text Version) - Andrew Kelley

加入我们

Zig 中文社区是一个开放的组织,我们致力于推广 Zig 在中文群体中的使用,有多种方式可以参与进来:

  1. 供稿,分享自己使用 Zig 的心得
  2. 改进 ZigCC 组织下的开源项目
  3. 加入微信群Telegram 群组

Metadata

Metadata

Assignees

No one assigned

    Labels

    日报daily report

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions