From 5b71c4a9bd6fa5ffcf6b947471abcf5c188e5a34 Mon Sep 17 00:00:00 2001 From: Igor Somov Date: Wed, 15 Apr 2026 15:42:07 -0300 Subject: [PATCH 01/13] migrate websocket to Zig 0.16 --- build.zig | 7 + src/buffer.zig | 3 +- src/client/client.zig | 40 ++- src/compat.zig | 506 +++++++++++++++++++++++++++++++++++++ src/proto.zig | 9 +- src/server/handshake.zig | 5 +- src/server/server.zig | 191 +++++++------- src/server/thread_pool.zig | 28 +- src/t.zig | 43 +--- src/testing.zig | 3 +- test_runner.zig | 79 +++--- 11 files changed, 703 insertions(+), 211 deletions(-) create mode 100644 src/compat.zig diff --git a/build.zig b/build.zig index 03e06dc..44762fd 100644 --- a/build.zig +++ b/build.zig @@ -3,12 +3,18 @@ const std = @import("std"); pub fn build(b: *std.Build) !void { const target = b.standardTargetOptions(.{}); const optimize = b.standardOptimizeOption(.{}); + const compat_module = b.createModule(.{ + .target = target, + .optimize = optimize, + .root_source_file = b.path("src/compat.zig"), + }); const websocket_module = b.addModule("websocket", .{ .target = target, .optimize = optimize, .root_source_file = b.path("src/websocket.zig"), }); + websocket_module.addImport("compat", compat_module); { const options = b.addOptions(); @@ -23,6 +29,7 @@ pub fn build(b: *std.Build) !void { .test_runner = .{ .path = b.path("test_runner.zig"), .mode = .simple }, }); tests.root_module.link_libc = true; + tests.root_module.addImport("compat", compat_module); const force_blocking = b.option(bool, "force_blocking", "Force blocking mode") orelse false; const options = b.addOptions(); options.addOption(bool, "websocket_blocking", force_blocking); diff --git a/src/buffer.zig b/src/buffer.zig index d5dc77e..e5c1de2 100644 --- a/src/buffer.zig +++ b/src/buffer.zig @@ -1,4 +1,5 @@ const std = @import("std"); +const sync = @import("compat").sync; const Allocator = std.mem.Allocator; @@ -215,7 +216,7 @@ pub const Pool = struct { available: usize, buffers: [][]u8, allocator: Allocator, - mutex: std.Thread.Mutex, + mutex: sync.Mutex, pub fn init(allocator: Allocator, count: usize, buffer_size: usize) !Pool { const buffers = try allocator.alloc([]u8, count); diff --git a/src/client/client.zig b/src/client/client.zig index 13ce82c..895ac9b 100644 --- a/src/client/client.zig +++ b/src/client/client.zig @@ -1,9 +1,10 @@ const std = @import("std"); const proto = @import("../proto.zig"); const buffer = @import("../buffer.zig"); +const std_compat = @import("compat"); const ascii = std.ascii; -const net = std.net; +const net = @import("compat").net; const posix = std.posix; const tls = std.crypto.tls; const log = std.log.scoped(.websocket); @@ -249,7 +250,7 @@ pub const Client = struct { } orelse { reader.fill(stream) catch |err| switch (err) { error.WouldBlock => return null, - error.Closed, error.ConnectionResetByPeer, error.BrokenPipe, error.NotOpenForReading => { + error.Closed, error.ConnectionResetByPeer, error.NotOpenForReading => { @atomicStore(bool, &self._closed, true, .monotonic); return error.Closed; }, @@ -416,7 +417,7 @@ pub const Stream = struct { } else if (native_os == .wasi and !builtin.link_libc) { _ = std.os.wasi.sock_shutdown(fd, .{ .WR = true, .RD = true }); } else { - std.posix.shutdown(fd, .both) catch {}; + _ = std.posix.system.shutdown(fd, std.posix.SHUT.RDWR); } tls_client.deinit(); } @@ -494,6 +495,9 @@ const TLSClient = struct { stream_writer: net.Stream.Writer, stream_reader: net.Stream.Reader, arena: std.heap.ArenaAllocator, + ca_bundle: Bundle = .empty, + ca_bundle_lock: std.Io.RwLock = .init, + has_ca_bundle: bool = false, fn init(allocator: Allocator, stream: net.Stream, config: *const Client.Config) !*TLSClient { var arena = std.heap.ArenaAllocator.init(allocator); @@ -502,8 +506,8 @@ const TLSClient = struct { const aa = arena.allocator(); const bundle = config.ca_bundle orelse blk: { - var b = Bundle{}; - try b.rescan(aa); + var b = Bundle.empty; + try b.rescan(aa, std_compat.io(), std.Io.Timestamp.now(std_compat.io(), .real)); break :blk b; }; @@ -523,16 +527,30 @@ const TLSClient = struct { .client = undefined, .stream_writer = stream.writer(buf.ptr[0..buf_len][0..buf_len]), .stream_reader = stream.reader(buf.ptr[buf_len .. 2 * buf_len][0..buf_len]), + .ca_bundle = .empty, + .ca_bundle_lock = .init, + .has_ca_bundle = false, }; + self.ca_bundle = bundle; + self.has_ca_bundle = true; + var entropy: [tls.Client.Options.entropy_len]u8 = undefined; + std_compat.crypto.random.bytes(&entropy); self.client = try tls.Client.init( - self.stream_reader.interface(), + &self.stream_reader.interface, &self.stream_writer.interface, .{ - .ca = .{ .bundle = bundle }, + .ca = .{ .bundle = .{ + .gpa = aa, + .io = std_compat.io(), + .lock = &self.ca_bundle_lock, + .bundle = &self.ca_bundle, + } }, .host = .{ .explicit = config.host }, .read_buffer = buf.ptr[2 * buf_len .. 3 * buf_len][0..buf_len], .write_buffer = buf.ptr[3 * buf_len .. 4 * buf_len][0..buf_len], + .entropy = &entropy, + .realtime_now = std.Io.Timestamp.now(std_compat.io(), .real), }, ); @@ -550,13 +568,13 @@ fn generateKey() [16]u8 { return [16]u8{ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16 }; } var key: [16]u8 = undefined; - std.crypto.random.bytes(&key); + std_compat.crypto.random.bytes(&key); return key; } fn generateMask() [4]u8 { var m: [4]u8 = undefined; - std.crypto.random.bytes(&m); + std_compat.crypto.random.bytes(&m); return m; } @@ -619,7 +637,7 @@ const HandShakeReply = struct { fn read(buf: []u8, key: []const u8, opts: *const Client.HandshakeOpts, compression: bool, stream: anytype) !HandShakeReply { const timeout_ms = opts.timeout_ms; - const deadline = std.time.milliTimestamp() + timeout_ms; + const deadline = std_compat.time.milliTimestamp() + timeout_ms; try stream.readTimeout(timeout_ms); var pos: usize = 0; @@ -727,7 +745,7 @@ const HandShakeReply = struct { } } - if (std.time.milliTimestamp() > deadline) { + if (std_compat.time.milliTimestamp() > deadline) { return error.Timeout; } diff --git a/src/compat.zig b/src/compat.zig new file mode 100644 index 0000000..0bab495 --- /dev/null +++ b/src/compat.zig @@ -0,0 +1,506 @@ +const std = @import("std"); +const builtin = @import("builtin"); + +const Allocator = std.mem.Allocator; + +var fallback_threaded: std.Io.Threaded = .init_single_threaded; + +pub fn io() std.Io { + if (builtin.is_test) return std.testing.io; + return fallback_threaded.io(); +} + +pub fn currentEnviron() std.process.Environ { + return environ(); +} + +fn environ() std.process.Environ { + if (builtin.is_test) return std.testing.environ; + return switch (builtin.os.tag) { + .windows, .freestanding, .other => .{ .block = .global }, + .wasi, .emscripten => if (builtin.link_libc) blk: { + const c_environ = std.c.environ; + var env_count: usize = 0; + while (c_environ[env_count] != null) : (env_count += 1) {} + break :blk .{ .block = .{ .slice = c_environ[0..env_count :null] } }; + } else .{ .block = .global }, + else => blk: { + const c_environ = std.c.environ; + var env_count: usize = 0; + while (c_environ[env_count] != null) : (env_count += 1) {} + break :blk .{ .block = .{ .slice = c_environ[0..env_count :null] } }; + }, + }; +} + +pub const process = struct { + pub const GetEnvVarOwnedError = error{ + EnvironmentVariableNotFound, + } || Allocator.Error || error{ InvalidWtf8, Unexpected }; + + pub fn getEnvVarOwned(allocator: Allocator, name: []const u8) GetEnvVarOwnedError![]u8 { + return environ().getAlloc(allocator, name) catch |err| switch (err) { + error.EnvironmentVariableMissing => error.EnvironmentVariableNotFound, + else => |e| e, + }; + } +}; + +pub const time = struct { + fn nowNanoseconds() i128 { + return switch (builtin.os.tag) { + .windows => blk: { + const epoch_ns = std.time.epoch.windows * std.time.ns_per_s; + break :blk @as(i128, std.os.windows.ntdll.RtlGetSystemTimePrecise()) * 100 + epoch_ns; + }, + .wasi => blk: { + var ts: std.os.wasi.timestamp_t = undefined; + if (std.os.wasi.clock_time_get(.REALTIME, 1, &ts) == .SUCCESS) { + break :blk @intCast(ts); + } + break :blk 0; + }, + else => blk: { + var ts: std.posix.timespec = undefined; + switch (std.posix.errno(std.posix.system.clock_gettime(.REALTIME, &ts))) { + .SUCCESS => break :blk @as(i128, ts.sec) * std.time.ns_per_s + ts.nsec, + else => break :blk 0, + } + }, + }; + } + + pub fn timestamp() i64 { + return @intCast(@divTrunc(nowNanoseconds(), std.time.ns_per_s)); + } + + pub fn milliTimestamp() i64 { + return @intCast(@divTrunc(nowNanoseconds(), std.time.ns_per_ms)); + } + + pub fn nanoTimestamp() i128 { + return nowNanoseconds(); + } +}; + +pub const thread = struct { + pub fn sleep(nanoseconds: u64) void { + std.Io.sleep(io(), .fromNanoseconds(@intCast(nanoseconds)), .awake) catch {}; + } +}; + +pub const crypto = struct { + pub const random = struct { + pub fn bytes(buffer: []u8) void { + std.Io.randomSecure(io(), buffer) catch std.Io.random(io(), buffer); + } + }; +}; + +pub const net = struct { + const IoNet = std.Io.net; + const posix = std.posix; + + pub const has_unix_sockets = false; + + pub const Stream = struct { + handle: Handle, + + pub const Handle = IoNet.Socket.Handle; + pub const Reader = IoNet.Stream.Reader; + pub const Writer = IoNet.Stream.Writer; + pub const ReadError = posix.ReadError; + pub const WriteError = IoNet.Stream.Writer.Error; + + fn toInner(self: Stream) IoNet.Stream { + return .{ + .socket = .{ + .handle = self.handle, + .address = .{ .ip4 = .loopback(0) }, + }, + }; + } + + pub fn close(self: Stream) void { + self.toInner().close(io()); + } + + pub fn reader(self: Stream, buffer: []u8) Reader { + return self.toInner().reader(io(), buffer); + } + + pub fn writer(self: Stream, buffer: []u8) Writer { + return self.toInner().writer(io(), buffer); + } + + pub fn read(self: Stream, buffer: []u8) ReadError!usize { + return posix.read(self.handle, buffer); + } + + pub fn readAtLeast(self: Stream, buffer: []u8, len: usize) ReadError!usize { + std.debug.assert(len <= buffer.len); + var index: usize = 0; + while (index < len) { + const amt = try self.read(buffer[index..]); + if (amt == 0) break; + index += amt; + } + return index; + } + + pub fn write(self: Stream, bytes: []const u8) WriteError!usize { + var stream_writer = self.toInner().writer(io(), &[_]u8{}); + stream_writer.interface.writeAll(bytes) catch |err| switch (err) { + error.WriteFailed => return stream_writer.err orelse error.Unexpected, + }; + return bytes.len; + } + + pub fn writeAll(self: Stream, bytes: []const u8) WriteError!void { + var stream_writer = self.toInner().writer(io(), &[_]u8{}); + stream_writer.interface.writeAll(bytes) catch |err| switch (err) { + error.WriteFailed => return stream_writer.err orelse error.Unexpected, + }; + } + + pub fn shutdown(self: Stream, how: IoNet.ShutdownHow) IoNet.ShutdownError!void { + try self.toInner().shutdown(io(), how); + } + }; + + pub const Ip4Address = extern struct { + sa: posix.sockaddr.in, + + pub fn getPort(self: Ip4Address) u16 { + return std.mem.bigToNative(u16, self.sa.port); + } + + pub fn setPort(self: *Ip4Address, port: u16) void { + self.sa.port = std.mem.nativeToBig(u16, port); + } + + pub fn getOsSockLen(self: Ip4Address) posix.socklen_t { + _ = self; + return @sizeOf(posix.sockaddr.in); + } + }; + + pub const Ip6Address = extern struct { + sa: posix.sockaddr.in6, + + pub fn getPort(self: Ip6Address) u16 { + return std.mem.bigToNative(u16, self.sa.port); + } + + pub fn setPort(self: *Ip6Address, port: u16) void { + self.sa.port = std.mem.nativeToBig(u16, port); + } + + pub fn getOsSockLen(self: Ip6Address) posix.socklen_t { + _ = self; + return @sizeOf(posix.sockaddr.in6); + } + }; + + fn ip4FromCurrent(ip4: IoNet.Ip4Address) Ip4Address { + return .{ + .sa = .{ + .port = std.mem.nativeToBig(u16, ip4.port), + .addr = @as(*align(1) const u32, @ptrCast(&ip4.bytes)).*, + }, + }; + } + + fn ip4ToCurrent(ip4: Ip4Address) IoNet.Ip4Address { + return .{ + .bytes = @bitCast(ip4.sa.addr), + .port = ip4.getPort(), + }; + } + + fn ip6FromCurrent(ip6: IoNet.Ip6Address) Ip6Address { + return .{ + .sa = .{ + .port = std.mem.nativeToBig(u16, ip6.port), + .flowinfo = ip6.flow, + .addr = ip6.bytes, + .scope_id = ip6.interface.index, + }, + }; + } + + fn ip6ToCurrent(ip6: Ip6Address) IoNet.Ip6Address { + return .{ + .port = ip6.getPort(), + .bytes = ip6.sa.addr, + .flow = ip6.sa.flowinfo, + .interface = .{ .index = ip6.sa.scope_id }, + }; + } + + fn setNonBlocking(handle: IoNet.Socket.Handle, enabled: bool) !void { + if (comptime builtin.os.tag == .windows) return; + + const flags: u32 = blk: { + const rc = posix.system.fcntl(handle, posix.F.GETFL, @as(c_int, 0)); + switch (posix.errno(rc)) { + .SUCCESS => break :blk @intCast(rc), + else => |err| return posix.unexpectedErrno(err), + } + }; + + const nonblocking = @as(u32, @bitCast(posix.O{ .NONBLOCK = true })); + const next_flags = if (enabled) flags | nonblocking else flags & ~nonblocking; + switch (posix.errno(posix.system.fcntl(handle, posix.F.SETFL, next_flags))) { + .SUCCESS => {}, + else => |err| return posix.unexpectedErrno(err), + } + } + + pub const Address = extern union { + any: posix.sockaddr, + in: Ip4Address, + in6: Ip6Address, + + fn fromCurrent(addr: IoNet.IpAddress) Address { + return switch (addr) { + .ip4 => |ip4| .{ .in = ip4FromCurrent(ip4) }, + .ip6 => |ip6| .{ .in6 = ip6FromCurrent(ip6) }, + }; + } + + pub fn parseIp4(name: []const u8, port: u16) !Address { + return fromCurrent(try IoNet.IpAddress.parseIp4(name, port)); + } + + pub fn parseIp6(name: []const u8, port: u16) !Address { + return fromCurrent(try IoNet.IpAddress.parseIp6(name, port)); + } + + pub fn parseIp(name: []const u8, port: u16) !Address { + return fromCurrent(try IoNet.IpAddress.parse(name, port)); + } + + pub fn initUnix(_: []const u8) !Address { + return error.UnixSocketsNotSupported; + } + + pub fn resolveIp(name: []const u8, port: u16) !Address { + return fromCurrent(try IoNet.IpAddress.resolve(io(), name, port)); + } + + pub fn toCurrent(self: Address) IoNet.IpAddress { + return switch (self.any.family) { + posix.AF.INET => .{ .ip4 = ip4ToCurrent(self.in) }, + posix.AF.INET6 => .{ .ip6 = ip6ToCurrent(self.in6) }, + else => unreachable, + }; + } + + pub fn format(self: Address, writer: *std.Io.Writer) std.Io.Writer.Error!void { + try self.toCurrent().format(writer); + } + + pub fn getOsSockLen(self: Address) posix.socklen_t { + return switch (self.any.family) { + posix.AF.INET => self.in.getOsSockLen(), + posix.AF.INET6 => self.in6.getOsSockLen(), + else => @sizeOf(posix.sockaddr), + }; + } + + pub const ListenOptions = struct { + reuse_address: bool = false, + force_nonblocking: bool = false, + }; + + pub fn listen(self: Address, options: ListenOptions) !Server { + const current = self.toCurrent(); + const server = try current.listen(io(), .{ + .reuse_address = options.reuse_address, + .mode = .stream, + .protocol = .tcp, + }); + try setNonBlocking(server.socket.handle, options.force_nonblocking); + + return .{ + .listen_address = Address.fromCurrent(server.socket.address), + .stream = .{ .handle = server.socket.handle }, + }; + } + }; + + pub const Server = struct { + listen_address: Address, + stream: Stream, + + pub const Connection = struct { + stream: Stream, + address: Address, + }; + + pub fn deinit(self: *Server) void { + self.stream.close(); + self.* = undefined; + } + + pub const AcceptError = IoNet.Server.AcceptError; + + pub fn accept(self: *Server) AcceptError!Connection { + const accept_options: IoNet.Server.AcceptOptions = if (comptime IoNet.Server.AcceptOptions == void) {} else .{ .mode = .stream, .protocol = .tcp }; + var server: IoNet.Server = .{ + .socket = .{ + .handle = self.stream.handle, + .address = self.listen_address.toCurrent(), + }, + .options = accept_options, + }; + const stream = try server.accept(io()); + try setNonBlocking(stream.socket.handle, false); + return .{ + .stream = .{ .handle = stream.socket.handle }, + .address = self.listen_address, + }; + } + }; + + pub const AddressList = struct { + arena: std.heap.ArenaAllocator, + addrs: []Address, + canon_name: ?[]u8 = null, + + pub fn deinit(self: *AddressList) void { + var arena = self.arena; + arena.deinit(); + } + }; + + pub const GetAddressListError = Allocator.Error || error{ + TemporaryNameServerFailure, + NameServerFailure, + AddressFamilyNotSupported, + UnknownHostName, + ServiceUnavailable, + Unexpected, + SystemResources, + }; + + pub fn tcpConnectToAddress(address: Address) !Stream { + const stream = try address.toCurrent().connect(io(), .{ + .mode = .stream, + .protocol = .tcp, + }); + try setNonBlocking(stream.socket.handle, false); + return .{ .handle = stream.socket.handle }; + } + + pub fn tcpConnectToHost(allocator: Allocator, host: []const u8, port: u16) !Stream { + const addresses = try getAddressList(allocator, host, port); + defer addresses.deinit(); + if (addresses.addrs.len == 0) return error.UnknownHostName; + return tcpConnectToAddress(addresses.addrs[0]); + } + + pub fn getAddressList(gpa: Allocator, name: []const u8, port: u16) GetAddressListError!*AddressList { + const result = blk: { + var arena = std.heap.ArenaAllocator.init(gpa); + errdefer arena.deinit(); + + const list = try arena.allocator().create(AddressList); + list.* = .{ + .arena = arena, + .addrs = undefined, + .canon_name = null, + }; + break :blk list; + }; + errdefer result.deinit(); + + const arena = result.arena.allocator(); + + if (Address.resolveIp(name, port)) |addr| { + result.addrs = try arena.dupe(Address, &.{addr}); + return result; + } else |_| {} + + var name_buffer: [IoNet.HostName.max_len:0]u8 = undefined; + @memcpy(name_buffer[0..name.len], name); + name_buffer[name.len] = 0; + const name_c = name_buffer[0..name.len :0]; + + var port_buffer: [8]u8 = undefined; + const port_c = std.fmt.bufPrintZ(&port_buffer, "{d}", .{port}) catch unreachable; + + const hints: posix.addrinfo = .{ + .flags = .{ .CANONNAME = false, .NUMERICSERV = true }, + .family = posix.AF.UNSPEC, + .socktype = posix.SOCK.STREAM, + .protocol = posix.IPPROTO.TCP, + .canonname = null, + .addr = null, + .addrlen = 0, + .next = null, + }; + var res: ?*posix.addrinfo = null; + switch (posix.system.getaddrinfo(name_c.ptr, port_c.ptr, &hints, &res)) { + @as(posix.system.EAI, @enumFromInt(0)) => {}, + .ADDRFAMILY, .FAMILY => return error.AddressFamilyNotSupported, + .AGAIN => return error.TemporaryNameServerFailure, + .FAIL => return error.NameServerFailure, + .MEMORY => return error.SystemResources, + .NODATA, .NONAME => return error.UnknownHostName, + else => return error.Unexpected, + } + defer if (res) |some| posix.system.freeaddrinfo(some); + + var addrs: std.ArrayList(Address) = .empty; + defer addrs.deinit(arena); + + var it = res; + while (it) |info| : (it = info.next) { + const addr = info.addr orelse continue; + switch (addr.family) { + posix.AF.INET => try addrs.append(arena, .{ .in = .{ .sa = @as(*const posix.sockaddr.in, @ptrCast(@alignCast(addr))).* } }), + posix.AF.INET6 => try addrs.append(arena, .{ .in6 = .{ .sa = @as(*const posix.sockaddr.in6, @ptrCast(@alignCast(addr))).* } }), + else => {}, + } + } + + result.addrs = try addrs.toOwnedSlice(arena); + return result; + } +}; + +pub const sync = struct { + pub const Mutex = struct { + inner: std.Io.Mutex = .init, + + pub fn tryLock(self: *Mutex) bool { + return self.inner.tryLock(); + } + + pub fn lock(self: *Mutex) void { + self.inner.lockUncancelable(io()); + } + + pub fn unlock(self: *Mutex) void { + self.inner.unlock(io()); + } + }; + + pub const Condition = struct { + inner: std.Io.Condition = .init, + + pub fn wait(self: *Condition, mutex: *Mutex) void { + self.inner.waitUncancelable(io(), &mutex.inner); + } + + pub fn signal(self: *Condition) void { + self.inner.signal(io()); + } + + pub fn broadcast(self: *Condition) void { + self.inner.broadcast(io()); + } + }; +}; diff --git a/src/proto.zig b/src/proto.zig index 7eeff8d..47e327c 100644 --- a/src/proto.zig +++ b/src/proto.zig @@ -725,12 +725,9 @@ test "Reader: fuzz" { i = 0; while (true) { - reader.fill(&writer) catch |err| switch (err) { - error.Closed => { - try t.expectEqual(@as(u32, @intCast(i)), MESSAGE_TO_SEND); - break; - }, - else => return err, + reader.fill(&writer) catch { + try t.expectEqual(@as(u32, @intCast(i)), MESSAGE_TO_SEND); + break; }; while (true) { diff --git a/src/server/handshake.zig b/src/server/handshake.zig index 9bcebb1..22b7535 100644 --- a/src/server/handshake.zig +++ b/src/server/handshake.zig @@ -1,4 +1,5 @@ const std = @import("std"); +const sync = @import("compat").sync; const posix = std.posix; const ascii = std.ascii; @@ -356,7 +357,7 @@ pub const Handshake = struct { }; pub const Pool = struct { - mutex: std.Thread.Mutex, + mutex: sync.Mutex, available: usize, allocator: Allocator, buffer_size: usize, @@ -660,7 +661,7 @@ fn testPool(p: *Pool) void { var hs = p.acquire() catch unreachable; std.debug.assert(hs.buf[0] == 0); hs.buf[0] = 255; - std.Thread.sleep(random.uintAtMost(u32, 100000)); + @import("compat").thread.sleep(random.uintAtMost(u32, 100000)); hs.buf[0] = 0; p.release(hs); } diff --git a/src/server/server.zig b/src/server/server.zig index 4805811..51af892 100644 --- a/src/server/server.zig +++ b/src/server/server.zig @@ -1,9 +1,10 @@ const std = @import("std"); const builtin = @import("builtin"); +const sync = @import("compat").sync; const proto = @import("../proto.zig"); const buffer = @import("../buffer.zig"); -const net = std.net; +const net = @import("compat").net; const posix = std.posix; const Thread = std.Thread; const Allocator = std.mem.Allocator; @@ -96,8 +97,8 @@ pub fn Server(comptime H: type) type { _state: WorkerState, _signals: []posix.fd_t, - _mut: Thread.Mutex, - _cond: Thread.Condition, + _mut: sync.Mutex, + _cond: sync.Condition, const Self = @This(); @@ -155,13 +156,11 @@ pub fn Server(comptime H: type) type { const config = &self.config; - var no_delay = true; const address = blk: { if (config.unix_path) |unix_path| { - if (comptime std.net.has_unix_sockets == false) { + if (comptime net.has_unix_sockets == false) { return error.UnixPathNotSupported; } - no_delay = false; std.fs.deleteFileAbsolute(unix_path) catch {}; break :blk try net.Address.initUnix(unix_path); } else { @@ -172,40 +171,17 @@ pub fn Server(comptime H: type) type { }; const socket = blk: { - var sock_flags: u32 = posix.SOCK.STREAM | posix.SOCK.CLOEXEC; - if (blockingMode() == false) sock_flags |= posix.SOCK.NONBLOCK; - - const socket_proto = if (address.any.family == posix.AF.UNIX) @as(u32, 0) else posix.IPPROTO.TCP; - break :blk try posix.socket(address.any.family, sock_flags, socket_proto); + const server = try address.listen(.{ + .reuse_address = true, + .force_nonblocking = !blockingMode(), + }); + break :blk server.stream.handle; }; - if (no_delay) { - // TODO: Broken on darwin: - // https://github.com/ziglang/zig/issues/17260 - // if (@hasDecl(os.TCP, "NODELAY")) { - // try os.setsockopt(socket.sockfd.?, os.IPPROTO.TCP, os.TCP.NODELAY, &std.mem.toBytes(@as(c_int, 1))); - // } - try posix.setsockopt(socket, posix.IPPROTO.TCP, 1, &std.mem.toBytes(@as(c_int, 1))); - } - - if (@hasDecl(posix.SO, "REUSEPORT_LB")) { - try posix.setsockopt(socket, posix.SOL.SOCKET, posix.SO.REUSEPORT_LB, &std.mem.toBytes(@as(c_int, 1))); - } else if (@hasDecl(posix.SO, "REUSEPORT")) { - try posix.setsockopt(socket, posix.SOL.SOCKET, posix.SO.REUSEPORT, &std.mem.toBytes(@as(c_int, 1))); - } else { - try posix.setsockopt(socket, posix.SOL.SOCKET, posix.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1))); - } - - { - const socklen = address.getOsSockLen(); - try posix.bind(socket, &address.any, socklen); - try posix.listen(socket, 1024); // kernel backlog - } - const C = @TypeOf(ctx); if (comptime blockingMode()) { - errdefer posix.close(socket); + errdefer _ = posix.system.close(socket); var w = try Blocking(H).init(self.allocator, &self._state); defer w.deinit(); @@ -220,7 +196,7 @@ pub fn Server(comptime H: type) type { self._mut.unlock(); thrd.join(); } else { - defer posix.close(socket); + defer _ = posix.system.close(socket); const W = NonBlocking(H, C); const allocator = self.allocator; @@ -234,7 +210,7 @@ pub fn Server(comptime H: type) type { errdefer for (0..started) |i| { // on success, these will be closed by a call to stop(); - posix.close(signals[i]); + _ = posix.system.close(signals[i]); }; defer { @@ -246,8 +222,8 @@ pub fn Server(comptime H: type) type { } for (0..worker_count) |i| { - const pipe = try posix.pipe2(.{ .NONBLOCK = true }); - errdefer posix.close(pipe[1]); + const pipe = try std.Io.Threaded.pipe2(.{ .NONBLOCK = true }); + errdefer _ = posix.system.close(pipe[1]); workers[i] = try W.init(self.allocator, &self._state, ctx); errdefer workers[i].deinit(); @@ -280,9 +256,9 @@ pub fn Server(comptime H: type) type { // necessary to unblock accept on linux // (which might not be that necessary since, on Linux, // NonBlocking should be used) - posix.shutdown(s, .recv) catch {}; + _ = posix.system.shutdown(s, posix.SHUT.RD); } - posix.close(s); + _ = posix.system.close(s); } self._cond.wait(&self._mut); } @@ -354,7 +330,7 @@ pub fn Blocking(comptime H: type) type { log.debug("({f}) connected", .{address}); const thread = std.Thread.spawn(.{}, Self.handleConnection, .{ self, socket, address, ctx }) catch |err| { - posix.close(socket); + _ = posix.system.close(socket); log.err("({f}) failed to spawn connection thread: {}", .{ address, err }); continue; }; @@ -441,13 +417,13 @@ pub fn Blocking(comptime H: type) type { if (conn_manager.count() == 0) { return; } - std.Thread.sleep(std.time.ns_per_ms * 100); + @import("compat").thread.sleep(std.time.ns_per_ms * 100); } } // called for each hc when shutting down fn shutdownCleanup(_: *Self, hc: *HandlerConn(H)) void { - posix.shutdown(hc.socket, .recv) catch {}; + _ = posix.system.shutdown(hc.socket, posix.SHUT.RD); } }; } @@ -531,7 +507,7 @@ fn NonBlocking(comptime H: type, comptime C: type) type { var it = self.loop.wait(timeout) catch |err| { log.err("failed to wait on events: {}", .{err}); - std.Thread.sleep(std.time.ns_per_s); + @import("compat").thread.sleep(std.time.ns_per_s); continue; }; @@ -541,7 +517,7 @@ fn NonBlocking(comptime H: type, comptime C: type) type { if (data == 0) { self.accept(listener, now) catch |err| { log.err("accept error: {}", .{err}); - std.Thread.sleep(std.time.ns_per_ms); + @import("compat").thread.sleep(std.time.ns_per_ms); }; continue; } @@ -610,25 +586,45 @@ fn NonBlocking(comptime H: type, comptime C: type) type { var address: net.Address = undefined; var address_len: posix.socklen_t = @sizeOf(net.Address); - const socket = posix.accept(listener, &address.any, &address_len, posix.SOCK.CLOEXEC) catch |err| { - // When available, we use SO_REUSEPORT_LB or SO_REUSEPORT, so WouldBlock - // should not be possible in those cases, but if it isn't available - // this error should be ignored as it means another thread picked it up. - return if (err == error.WouldBlock) {} else err; + const socket = while (true) { + const rc = posix.system.accept(listener, &address.any, &address_len); + switch (posix.errno(rc)) { + .SUCCESS => { + const fd: posix.fd_t = @intCast(rc); + errdefer _ = posix.system.close(fd); + switch (posix.errno(posix.system.fcntl(fd, posix.F.SETFD, @as(u32, posix.FD_CLOEXEC)))) { + .SUCCESS => {}, + else => |err| return posix.unexpectedErrno(err), + } + break fd; + }, + .INTR => continue, + .AGAIN => return, + else => |err| return posix.unexpectedErrno(err), + } }; log.debug("({f}) connected", .{address}); { - errdefer posix.close(socket); + errdefer _ = posix.system.close(socket); // socket is _probably_ in NONBLOCKING mode (it inherits // the flag from the listening socket). - const flags = try posix.fcntl(socket, posix.F.GETFL, 0); + const flags: u32 = blk: { + const rc = posix.system.fcntl(socket, posix.F.GETFL, @as(c_int, 0)); + switch (posix.errno(rc)) { + .SUCCESS => break :blk @intCast(rc), + else => |err| return posix.unexpectedErrno(err), + } + }; const nonblocking = @as(u32, @bitCast(posix.O{ .NONBLOCK = true })); if (flags & nonblocking == nonblocking) { // Yup, it's in nonblocking mode. Disable that flag to // put it in blocking mode. - _ = try posix.fcntl(socket, posix.F.SETFL, flags & ~nonblocking); + switch (posix.errno(posix.system.fcntl(socket, posix.F.SETFL, flags & ~nonblocking))) { + .SUCCESS => {}, + else => |err| return posix.unexpectedErrno(err), + } } } const hc = try self.base.newConn(socket, address, now); @@ -833,7 +829,7 @@ const KQueue = struct { fn init() !KQueue { return .{ - .q = try posix.kqueue(), + .q = try std.Io.Kqueue.createFileDescriptor(), .change_count = 0, .change_buffer = undefined, .event_list = undefined, @@ -841,7 +837,7 @@ const KQueue = struct { } fn deinit(self: KQueue) void { - posix.close(self.q); + _ = posix.system.close(self.q); } fn monitorAccept(self: *KQueue, fd: c_int) !void { @@ -871,7 +867,7 @@ const KQueue = struct { .data = 0, .udata = @intFromPtr(hc), }; - _ = try posix.kevent(self.q, &.{event}, &[_]Kevent{}, null); + _ = try std.Io.Kqueue.kevent(self.q, &.{event}, &[_]Kevent{}, null); } fn change(self: *KQueue, fd: posix.fd_t, data: usize, filter: i16, flags: u16) !void { @@ -880,7 +876,7 @@ const KQueue = struct { if (change_count == change_buffer.len) { // calling this with an empty event_list will return immediate - _ = try posix.kevent(self.q, change_buffer, &[_]Kevent{}, null); + _ = try std.Io.Kqueue.kevent(self.q, change_buffer, &[_]Kevent{}, null); change_count = 0; } change_buffer[change_count] = .{ @@ -897,7 +893,7 @@ const KQueue = struct { fn wait(self: *KQueue, timeout_sec: ?i32) !Iterator { const event_list = &self.event_list; const timeout: ?posix.timespec = if (timeout_sec) |ts| posix.timespec{ .sec = ts, .nsec = 0 } else null; - const event_count = try posix.kevent(self.q, self.change_buffer[0..self.change_count], event_list, if (timeout) |ts| &ts else null); + const event_count = try std.Io.Kqueue.kevent(self.q, self.change_buffer[0..self.change_count], event_list, if (timeout) |ts| &ts else null); self.change_count = 0; return .{ @@ -937,7 +933,7 @@ const EPoll = struct { } fn deinit(self: EPoll) void { - posix.close(self.q); + _ = posix.system.close(self.q); } fn monitorAccept(self: *EPoll, fd: c_int) !void { @@ -1102,7 +1098,7 @@ pub fn HandlerConn(comptime H: type) type { reader: ?Reader, socket: posix.socket_t, // denormalization from conn.stream.handle handshake: ?*Handshake.State, - cleanup: Thread.Mutex = .{}, + cleanup: sync.Mutex = .{}, compression: ?Compression = null, next: ?*HandlerConn(H) = null, prev: ?*HandlerConn(H) = null, @@ -1116,21 +1112,21 @@ pub fn HandlerConn(comptime H: type) type { pub fn ConnManager(comptime H: type, comptime MANAGE_HS: bool) type { return struct { - lock: Thread.Mutex, + lock: sync.Mutex, allocator: Allocator, active: List(HandlerConn(H)), pending: List(HandlerConn(H)), - pool: std.heap.MemoryPool(HandlerConn(H)), + pool: std.heap.memory_pool.Managed(HandlerConn(H)), compression: ?Compression, - compression_pool: std.heap.MemoryPool(Conn.Compression), + compression_pool: std.heap.memory_pool.Managed(Conn.Compression), const Self = @This(); pub fn init(allocator: Allocator, compression: ?Compression) !Self { - var pool = std.heap.MemoryPool(HandlerConn(H)).init(allocator); + var pool = std.heap.memory_pool.Managed(HandlerConn(H)).init(allocator); errdefer pool.deinit(); - var compression_pool = std.heap.MemoryPool(Conn.Compression).init(allocator); + var compression_pool = std.heap.memory_pool.Managed(Conn.Compression).init(allocator); errdefer compression_pool.deinit(); return .{ @@ -1159,7 +1155,7 @@ pub fn ConnManager(comptime H: type, comptime MANAGE_HS: bool) type { } pub fn create(self: *Self, socket: posix.socket_t, address: net.Address, now: u32) !*HandlerConn(H) { - errdefer posix.close(socket); + errdefer _ = posix.system.close(socket); self.lock.lock(); defer self.lock.unlock(); @@ -1318,7 +1314,7 @@ pub const Conn = struct { started: u32, stream: net.Stream, address: net.Address, - lock: Thread.Mutex = .{}, + lock: sync.Mutex = .{}, compression: ?*Conn.Compression = null, const Compression = struct { @@ -1449,28 +1445,13 @@ pub const Conn = struct { self.lock.lock(); defer self.lock.unlock(); - if (comptime builtin.os.tag == .windows) { - for (vec) |iov| { - var written: usize = 0; - while (written < iov.len) { - const n = try socketWrite(socket, iov.base[written..iov.len]); - if (n == 0) return error.Closed; - written += n; - } - } - return; - } - - var i: usize = 0; - while (true) { - var n = try std.posix.writev(socket, vec[i..]); - while (n >= vec[i].len) { - n -= vec[i].len; - i += 1; - if (i >= vec.len) return; + for (vec) |iov| { + var written: usize = 0; + while (written < iov.len) { + const n = try socketWrite(socket, iov.base[written..iov.len]); + if (n == 0) return error.Closed; + written += n; } - vec[i].base += n; - vec[i].len -= n; } } @@ -1489,7 +1470,7 @@ pub const Conn = struct { fn closeSocket(self: *Conn) void { if (@atomicRmw(bool, &self._closed, .Xchg, true, .monotonic) == false) { - posix.close(self.stream.handle); + _ = posix.system.close(self.stream.handle); } } @@ -1556,7 +1537,7 @@ fn _handleHandshake(comptime H: type, worker: anytype, hc: *HandlerConn(H), ctx: } } else { switch (err) { - error.BrokenPipe, error.ConnectionResetByPeer => log.debug("({f}) handshake connection closed: {}", .{ conn.address, err }), + error.ConnectionResetByPeer => log.debug("({f}) handshake connection closed: {}", .{ conn.address, err }), error.WouldBlock => { std.debug.assert(blockingMode()); log.debug("({f}) handshake timeout", .{conn.address}); @@ -1631,7 +1612,7 @@ fn _handleClientData(comptime H: type, hc: *HandlerConn(H), allocator: Allocator var reader = &hc.reader.?; reader.fill(conn.stream) catch |err| { switch (err) { - error.BrokenPipe, error.Closed, error.ConnectionResetByPeer => log.debug("({f}) connection closed: {}", .{ conn.address, err }), + error.Closed, error.ConnectionResetByPeer => log.debug("({f}) connection closed: {}", .{ conn.address, err }), else => log.warn("({f}) error reading from connection: {}", .{ conn.address, err }), } return false; @@ -1826,9 +1807,17 @@ fn socketRead(socket: posix.socket_t, buf: []u8) !usize { fn socketWrite(socket: posix.socket_t, buf: []const u8) !usize { if (comptime builtin.os.tag == .windows) { - return posix.send(socket, buf, 0); + return error.WouldBlock; + } + while (true) { + const rc = posix.system.write(socket, buf.ptr, buf.len); + switch (posix.errno(rc)) { + .SUCCESS => return @intCast(rc), + .INTR => continue, + .AGAIN => return error.WouldBlock, + else => return error.SystemResources, + } } - return posix.write(socket, buf); } fn shouldClearReceiveTimeout(os_tag: std.Target.Os.Tag) bool { @@ -1839,11 +1828,7 @@ fn shouldClearReceiveTimeout(os_tag: std.Target.Os.Tag) bool { } fn timestamp() u32 { - if (comptime @hasDecl(posix, "CLOCK") == false or posix.CLOCK == void) { - return @intCast(std.time.timestamp()); - } - const ts = posix.clock_gettime(posix.CLOCK.REALTIME) catch unreachable; - return @intCast(ts.sec); + return @intCast(@import("compat").time.timestamp()); } // intrusive doubly-linked list with count, not thread safe @@ -1891,7 +1876,7 @@ const t = @import("../t.zig"); var test_thread: Thread = undefined; var test_server: Server(TestHandler) = undefined; -var global_test_allocator = std.heap.GeneralPurposeAllocator(.{}){}; +var global_test_allocator: std.heap.DebugAllocator(.{}) = .init; test "tests:beforeAll" { test_server = try Server(TestHandler).init(global_test_allocator.allocator(), .{ @@ -1905,7 +1890,7 @@ test "tests:afterAll" { test_server.stop(); test_thread.join(); test_server.deinit(); - try t.expectEqual(false, global_test_allocator.detectLeaks()); + try t.expectEqual(std.heap.Check.ok, global_test_allocator.deinit()); } test "Server: invalid handshake" { @@ -2009,8 +1994,8 @@ test "shouldClearReceiveTimeout skips Windows" { fn testStream(handshake: bool) !net.Stream { const timeout = std.mem.toBytes(std.posix.timeval{ .sec = 0, .usec = 20_000 }); - const address = try std.net.Address.parseIp("127.0.0.1", 9292); - const stream = try std.net.tcpConnectToAddress(address); + const address = try net.Address.parseIp("127.0.0.1", 9292); + const stream = try net.tcpConnectToAddress(address); try std.posix.setsockopt(stream.handle, std.posix.SOL.SOCKET, std.posix.SO.RCVTIMEO, &timeout); try std.posix.setsockopt(stream.handle, std.posix.SOL.SOCKET, std.posix.SO.SNDTIMEO, &timeout); diff --git a/src/server/thread_pool.zig b/src/server/thread_pool.zig index 1f7d72b..6618865 100644 --- a/src/server/thread_pool.zig +++ b/src/server/thread_pool.zig @@ -1,4 +1,5 @@ const std = @import("std"); +const sync = @import("compat").sync; const Thread = std.Thread; const Allocator = std.mem.Allocator; @@ -27,17 +28,12 @@ pub fn ThreadPool(comptime F: anytype) type { // []u8. But this ThreadPool is private and being used for 2 specific cases // that we control. - var fields: [ARG_COUNT]std.builtin.Type.StructField = undefined; - inline for (full_fields[0..ARG_COUNT], 0..) |field, index| fields[index] = field; + var arg_types: [ARG_COUNT]type = undefined; + inline for (full_fields[0..ARG_COUNT], 0..) |field, index| { + arg_types[index] = field.type; + } - const Args = comptime @Type(.{ - .@"struct" = .{ - .layout = .auto, - .is_tuple = true, - .fields = &fields, - .decls = &.{}, - }, - }); + const Args = std.meta.Tuple(&arg_types); return struct { stopped: bool, @@ -46,9 +42,9 @@ pub fn ThreadPool(comptime F: anytype) type { pending: usize, queue: []Args, threads: []Thread, - mutex: Thread.Mutex, - pull_cond: Thread.Condition, - push_cond: Thread.Condition, + mutex: sync.Mutex, + pull_cond: sync.Condition, + push_cond: sync.Condition, queue_end: usize, allocator: Allocator, @@ -195,7 +191,7 @@ test "ThreadPool: small fuzz" { tp.spawn(.{1}); } while (tp.empty() == false) { - std.Thread.sleep(std.time.ns_per_ms); + @import("compat").thread.sleep(std.time.ns_per_ms); } tp.deinit(); try t.expectEqual(50_000, testSum); @@ -209,7 +205,7 @@ test "ThreadPool: large fuzz" { tp.spawn(.{1}); } while (tp.empty() == false) { - std.Thread.sleep(std.time.ns_per_ms); + @import("compat").thread.sleep(std.time.ns_per_ms); } tp.deinit(); try t.expectEqual(50_000, testSum); @@ -220,5 +216,5 @@ fn testIncr(c: u64, buf: []u8) void { std.debug.assert(buf.len == 512); _ = @atomicRmw(u64, &testSum, .Add, c, .monotonic); // let the threadpool queue get backed up - std.Thread.sleep(std.time.ns_per_us * 100); + @import("compat").thread.sleep(std.time.ns_per_us * 100); } diff --git a/src/t.zig b/src/t.zig index ebac23c..0084940 100644 --- a/src/t.zig +++ b/src/t.zig @@ -1,8 +1,10 @@ const std = @import("std"); const proto = @import("proto.zig"); +const compat = @import("compat"); const posix = std.posix; const ArrayList = std.ArrayList; +const net = compat.net; const Message = proto.Message; @@ -18,7 +20,7 @@ pub const expectSlice = std.testing.expectEqualSlices; pub fn getRandom() std.Random.DefaultPrng { var seed: u64 = undefined; - std.posix.getrandom(std.mem.asBytes(&seed)) catch unreachable; + compat.crypto.random.bytes(std.mem.asBytes(&seed)); return std.Random.DefaultPrng.init(seed); } @@ -145,44 +147,25 @@ pub const Writer = struct { pub const SocketPair = struct { writer: Writer, - client: std.net.Stream, - server: std.net.Stream, + client: net.Stream, + server: net.Stream, const Opts = struct { port: ?u16 = null, }; pub fn init(opts: Opts) SocketPair { - var address = std.net.Address.parseIp("127.0.0.1", opts.port orelse 0) catch unreachable; - var address_len = address.getOsSockLen(); + var address = net.Address.parseIp("127.0.0.1", opts.port orelse 0) catch unreachable; + var listener = address.listen(.{ .reuse_address = true }) catch unreachable; + defer listener.deinit(); - const listener = posix.socket(address.any.family, posix.SOCK.STREAM | posix.SOCK.CLOEXEC, posix.IPPROTO.TCP) catch unreachable; - defer posix.close(listener); - - { - // setup our listener - posix.bind(listener, &address.any, address_len) catch unreachable; - posix.listen(listener, 1) catch unreachable; - posix.getsockname(listener, &address.any, &address_len) catch unreachable; - } - - const client = posix.socket(address.any.family, posix.SOCK.STREAM, posix.IPPROTO.TCP) catch unreachable; - { - // connect the client - const flags = posix.fcntl(client, posix.F.GETFL, 0) catch unreachable; - _ = posix.fcntl(client, posix.F.SETFL, flags | posix.SOCK.NONBLOCK) catch unreachable; - posix.connect(client, &address.any, address_len) catch |err| switch (err) { - error.WouldBlock => {}, - else => unreachable, - }; - _ = posix.fcntl(client, posix.F.SETFL, flags) catch unreachable; - } - - const server = posix.accept(listener, &address.any, &address_len, posix.SOCK.CLOEXEC) catch unreachable; + address = listener.listen_address; + const client = net.tcpConnectToAddress(address) catch unreachable; + const server = listener.accept() catch unreachable; return .{ - .client = .{ .handle = client }, - .server = .{ .handle = server }, + .client = client, + .server = server.stream, .writer = Writer.init(), }; } diff --git a/src/testing.zig b/src/testing.zig index 9555e8d..a29635a 100644 --- a/src/testing.zig +++ b/src/testing.zig @@ -1,6 +1,7 @@ const std = @import("std"); const t = @import("t.zig"); const ws = @import("websocket.zig"); +const net = @import("compat").net; pub fn init() Testing { return Testing.init(); @@ -53,7 +54,7 @@ pub const Testing = struct { ._closed = false, .started = 0, .stream = pair.server, - .address = std.net.Address.parseIp("127.0.0.1", port) catch unreachable, + .address = net.Address.parseIp("127.0.0.1", port) catch unreachable, }, .reader = reader, .received = std.ArrayList(ws.Message).init(aa), diff --git a/test_runner.zig b/test_runner.zig index d9c5e62..cdaed84 100644 --- a/test_runner.zig +++ b/test_runner.zig @@ -10,6 +10,7 @@ pub const std_options = std.Options{ .log_scope_levels = &[_]std.log.ScopeLevel{ const std = @import("std"); const builtin = @import("builtin"); +const compat = @import("compat"); const Allocator = std.mem.Allocator; @@ -34,6 +35,7 @@ pub fn main() !void { var fail: usize = 0; var skip: usize = 0; var leak: usize = 0; + const testing_environ = compat.currentEnviron(); Printer.fmt("\r\x1b[0K", .{}); // beginning of line and clear to end of line @@ -74,12 +76,17 @@ pub fn main() !void { }; current_test = friendly_name; + std.testing.environ = testing_environ; std.testing.allocator_instance = .{}; + std.testing.io_instance = .init(std.testing.allocator, .{ + .environ = testing_environ, + }); const result = t.func(); current_test = null; const ns_taken = slowest.endTiming(friendly_name); + std.testing.io_instance.deinit(); if (std.testing.allocator_instance.deinit() == .leak) { leak += 1; Printer.status(.fail, "\n{s}\n\"{s}\" - Memory Leak\n{s}\n", .{ BORDER, friendly_name, BORDER }); @@ -97,7 +104,7 @@ pub fn main() !void { fail += 1; Printer.status(.fail, "\n{s}\n\"{s}\" - {s}\n{s}\n", .{ BORDER, friendly_name, @errorName(err), BORDER }); if (@errorReturnTrace()) |trace| { - std.debug.dumpStackTrace(trace.*); + std.debug.dumpErrorReturnTrace(trace); } if (env.fail_first) { break; @@ -134,7 +141,7 @@ pub fn main() !void { Printer.fmt("\n", .{}); try slowest.display(); Printer.fmt("\n", .{}); - std.posix.exit(if (fail == 0) 0 else 1); + std.process.exit(if (fail == 0) 0 else 1); } const Printer = struct { @@ -161,19 +168,17 @@ const Status = enum { }; const SlowTracker = struct { - const SlowestQueue = std.PriorityDequeue(TestInfo, void, compareTiming); + allocator: Allocator, max: usize, - slowest: SlowestQueue, - timer: std.time.Timer, + slowest: std.ArrayList(TestInfo), + started_ns: i128, fn init(allocator: Allocator, count: u32) SlowTracker { - const timer = std.time.Timer.start() catch @panic("failed to start timer"); - var slowest = SlowestQueue.init(allocator, {}); - slowest.ensureTotalCapacity(count) catch @panic("OOM"); return .{ + .allocator = allocator, .max = count, - .timer = timer, - .slowest = slowest, + .started_ns = 0, + .slowest = .empty, }; } @@ -182,57 +187,49 @@ const SlowTracker = struct { name: []const u8, }; - fn deinit(self: SlowTracker) void { - self.slowest.deinit(); + fn deinit(self: *SlowTracker) void { + self.slowest.deinit(self.allocator); } fn startTiming(self: *SlowTracker) void { - self.timer.reset(); + self.started_ns = compat.time.nanoTimestamp(); } fn endTiming(self: *SlowTracker, test_name: []const u8) u64 { - var timer = self.timer; - const ns = timer.lap(); - - var slowest = &self.slowest; - - if (slowest.count() < self.max) { - // Capacity is fixed to the # of slow tests we want to track - // If we've tracked fewer tests than this capacity, than always add - slowest.add(TestInfo{ .ns = ns, .name = test_name }) catch @panic("failed to track test timing"); + const ended_ns = compat.time.nanoTimestamp(); + const ns: u64 = if (ended_ns > self.started_ns) + @intCast(ended_ns - self.started_ns) + else + 0; + + if (self.slowest.items.len < self.max) { + self.slowest.append(self.allocator, .{ .ns = ns, .name = test_name }) catch @panic("failed to track test timing"); return ns; } - { - // Optimization to avoid shifting the dequeue for the common case - // where the test isn't one of our slowest. - const fastest_of_the_slow = slowest.peekMin() orelse unreachable; - if (fastest_of_the_slow.ns > ns) { - // the test was faster than our fastest slow test, don't add - return ns; + var fastest_index: usize = 0; + for (self.slowest.items[1..], 1..) |info, index| { + if (info.ns < self.slowest.items[fastest_index].ns) { + fastest_index = index; } } - // the previous fastest of our slow tests, has been pushed off. - _ = slowest.removeMin(); - slowest.add(TestInfo{ .ns = ns, .name = test_name }) catch @panic("failed to track test timing"); + if (self.slowest.items[fastest_index].ns > ns) { + return ns; + } + + self.slowest.items[fastest_index] = .{ .ns = ns, .name = test_name }; return ns; } fn display(self: *SlowTracker) !void { - var slowest = self.slowest; - const count = slowest.count(); + const count = self.slowest.items.len; Printer.fmt("Slowest {d} test{s}: \n", .{ count, if (count != 1) "s" else "" }); - while (slowest.removeMinOrNull()) |info| { + for (self.slowest.items) |info| { const ms = @as(f64, @floatFromInt(info.ns)) / 1_000_000.0; Printer.fmt(" {d:.2}ms\t{s}\n", .{ ms, info.name }); } } - - fn compareTiming(context: void, a: TestInfo, b: TestInfo) std.math.Order { - _ = context; - return std.math.order(a.ns, b.ns); - } }; const Env = struct { @@ -255,7 +252,7 @@ const Env = struct { } fn readEnv(allocator: Allocator, key: []const u8) ?[]const u8 { - const v = std.process.getEnvVarOwned(allocator, key) catch |err| { + const v = compat.process.getEnvVarOwned(allocator, key) catch |err| { if (err == error.EnvironmentVariableNotFound) { return null; } From a234b546b78ddd26271429e0ef3c1638becb083c Mon Sep 17 00:00:00 2001 From: Igor Somov Date: Wed, 15 Apr 2026 15:47:04 -0300 Subject: [PATCH 02/13] fix Linux epoll for Zig 0.16 --- src/server/server.zig | 33 ++++++++++++++++++++++++++++----- 1 file changed, 28 insertions(+), 5 deletions(-) diff --git a/src/server/server.zig b/src/server/server.zig index 51af892..b9fb7c9 100644 --- a/src/server/server.zig +++ b/src/server/server.zig @@ -926,9 +926,14 @@ const EPoll = struct { const EpollEvent = linux.epoll_event; fn init() !EPoll { + const rc = linux.epoll_create1(0); + const q = switch (posix.errno(rc)) { + .SUCCESS => @as(i32, @intCast(rc)), + else => |err| return posix.unexpectedErrno(err), + }; return .{ .event_list = undefined, - .q = try posix.epoll_create1(0), + .q = q, }; } @@ -938,18 +943,18 @@ const EPoll = struct { fn monitorAccept(self: *EPoll, fd: c_int) !void { var event = linux.epoll_event{ .events = linux.EPOLL.IN, .data = .{ .ptr = 0 } }; - return std.posix.epoll_ctl(self.q, linux.EPOLL.CTL_ADD, fd, &event); + return epollCtl(self.q, linux.EPOLL.CTL_ADD, fd, &event); } fn monitorSignal(self: *EPoll, fd: c_int) !void { var event = linux.epoll_event{ .events = linux.EPOLL.IN, .data = .{ .ptr = 1 } }; - return std.posix.epoll_ctl(self.q, linux.EPOLL.CTL_ADD, fd, &event); + return epollCtl(self.q, linux.EPOLL.CTL_ADD, fd, &event); } fn monitorRead(self: *EPoll, hc: anytype, comptime rearm: bool) !void { const op = if (rearm) linux.EPOLL.CTL_MOD else linux.EPOLL.CTL_ADD; var event = linux.epoll_event{ .events = linux.EPOLL.IN | linux.EPOLL.ONESHOT, .data = .{ .ptr = @intFromPtr(hc) } }; - return posix.epoll_ctl(self.q, op, hc.socket, &event); + return epollCtl(self.q, op, hc.socket, &event); } fn wait(self: *EPoll, timeout_sec: ?i32) !Iterator { @@ -964,13 +969,31 @@ const EPoll = struct { } } - const event_count = posix.epoll_wait(self.q, event_list, timeout); + const event_count = try epollWait(self.q, event_list, timeout); return .{ .index = 0, .events = event_list[0..event_count], }; } + fn epollCtl(epoll_fd: i32, op: u32, fd: i32, event: *linux.epoll_event) !void { + switch (posix.errno(linux.epoll_ctl(epoll_fd, op, fd, event))) { + .SUCCESS => {}, + else => |err| return posix.unexpectedErrno(err), + } + } + + fn epollWait(epoll_fd: i32, event_list: []linux.epoll_event, timeout: i32) !usize { + while (true) { + const rc = linux.epoll_wait(epoll_fd, event_list.ptr, @intCast(event_list.len), timeout); + switch (posix.errno(rc)) { + .SUCCESS => return @intCast(rc), + .INTR => continue, + else => |err| return posix.unexpectedErrno(err), + } + } + } + const Iterator = struct { index: usize, events: []EpollEvent, From d7f25c6a2bb7523f097b1d68f6bedf6f27007329 Mon Sep 17 00:00:00 2001 From: Igor Somov Date: Wed, 15 Apr 2026 16:02:02 -0300 Subject: [PATCH 03/13] fix Windows compatibility for websocket tests --- src/server/server.zig | 25 +++++++++++++++++-------- src/testing.zig | 6 ++++-- 2 files changed, 21 insertions(+), 10 deletions(-) diff --git a/src/server/server.zig b/src/server/server.zig index b9fb7c9..8902a7f 100644 --- a/src/server/server.zig +++ b/src/server/server.zig @@ -319,13 +319,20 @@ pub fn Blocking(comptime H: type) type { while (true) { var address: net.Address = undefined; var address_len: posix.socklen_t = @sizeOf(net.Address); - const socket = posix.accept(listener, &address.any, &address_len, posix.SOCK.CLOEXEC) catch |err| { - if (err == error.ConnectionAborted or err == error.SocketNotListening) { - log.info("received shutdown signal", .{}); - return; + const socket = while (true) { + const rc = posix.system.accept(listener, &address.any, &address_len); + switch (posix.errno(rc)) { + .SUCCESS => break @as(posix.socket_t, @intCast(rc)), + .INTR => continue, + .BADF, .INVAL, .NOTSOCK => { + log.info("received shutdown signal", .{}); + return; + }, + else => |err| { + log.err("failed to accept socket: {}", .{err}); + continue; + }, } - log.err("failed to accept socket: {}", .{err}); - continue; }; log.debug("({f}) connected", .{address}); @@ -2019,8 +2026,10 @@ fn testStream(handshake: bool) !net.Stream { const timeout = std.mem.toBytes(std.posix.timeval{ .sec = 0, .usec = 20_000 }); const address = try net.Address.parseIp("127.0.0.1", 9292); const stream = try net.tcpConnectToAddress(address); - try std.posix.setsockopt(stream.handle, std.posix.SOL.SOCKET, std.posix.SO.RCVTIMEO, &timeout); - try std.posix.setsockopt(stream.handle, std.posix.SOL.SOCKET, std.posix.SO.SNDTIMEO, &timeout); + if (comptime builtin.os.tag != .windows) { + try std.posix.setsockopt(stream.handle, std.posix.SOL.SOCKET, std.posix.SO.RCVTIMEO, &timeout); + try std.posix.setsockopt(stream.handle, std.posix.SOL.SOCKET, std.posix.SO.SNDTIMEO, &timeout); + } if (handshake == false) { return stream; diff --git a/src/testing.zig b/src/testing.zig index a29635a..df5f54e 100644 --- a/src/testing.zig +++ b/src/testing.zig @@ -1,4 +1,5 @@ const std = @import("std"); +const builtin = @import("builtin"); const t = @import("t.zig"); const ws = @import("websocket.zig"); const net = @import("compat").net; @@ -33,7 +34,9 @@ pub const Testing = struct { .sec = 0, .usec = 50_000, }); - std.posix.setsockopt(pair.client.handle, std.posix.SOL.SOCKET, std.posix.SO.RCVTIMEO, &timeout) catch unreachable; + if (comptime builtin.os.tag != .windows) { + std.posix.setsockopt(pair.client.handle, std.posix.SOL.SOCKET, std.posix.SO.RCVTIMEO, &timeout) catch unreachable; + } const aa = arena.allocator(); const buffer_provider = aa.create(ws.buffer.Provider) catch unreachable; @@ -132,7 +135,6 @@ pub const Testing = struct { // For these tests, we realy don't know if the server-side of the connection // is closed, so we try to close and ignore any errors. fn close(fd: std.posix.fd_t) void { - const builtin = @import("builtin"); const native_os = builtin.os.tag; if (native_os == .windows) { return std.os.windows.CloseHandle(fd); From e32a6b95af407e3228cf68552dcc8dce347e7bd9 Mon Sep 17 00:00:00 2001 From: Igor Somov Date: Wed, 15 Apr 2026 16:05:07 -0300 Subject: [PATCH 04/13] fix Windows socket accept compatibility --- src/server/server.zig | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/server/server.zig b/src/server/server.zig index 8902a7f..328c748 100644 --- a/src/server/server.zig +++ b/src/server/server.zig @@ -322,7 +322,7 @@ pub fn Blocking(comptime H: type) type { const socket = while (true) { const rc = posix.system.accept(listener, &address.any, &address_len); switch (posix.errno(rc)) { - .SUCCESS => break @as(posix.socket_t, @intCast(rc)), + .SUCCESS => break rc, .INTR => continue, .BADF, .INVAL, .NOTSOCK => { log.info("received shutdown signal", .{}); From b572338eecc1aa187e518fb691b3e8ca3ee8c3e2 Mon Sep 17 00:00:00 2001 From: Igor Somov Date: Wed, 15 Apr 2026 16:08:27 -0300 Subject: [PATCH 05/13] fix websocket blocking accept on Windows --- src/server/server.zig | 51 ++++++++++++++++++++++++++++--------------- 1 file changed, 34 insertions(+), 17 deletions(-) diff --git a/src/server/server.zig b/src/server/server.zig index 328c748..c0caafe 100644 --- a/src/server/server.zig +++ b/src/server/server.zig @@ -317,27 +317,44 @@ pub fn Blocking(comptime H: type) type { pub fn run(self: *Self, listener: posix.socket_t, ctx: anytype) void { defer self.shutdown(); while (true) { - var address: net.Address = undefined; - var address_len: posix.socklen_t = @sizeOf(net.Address); - const socket = while (true) { - const rc = posix.system.accept(listener, &address.any, &address_len); - switch (posix.errno(rc)) { - .SUCCESS => break rc, - .INTR => continue, - .BADF, .INVAL, .NOTSOCK => { - log.info("received shutdown signal", .{}); - return; - }, - else => |err| { - log.err("failed to accept socket: {}", .{err}); - continue; - }, - } + const accepted = if (comptime builtin.os.tag == .windows) blk: { + var server = net.Server{ + .listen_address = net.Address.parseIp("127.0.0.1", 0) catch unreachable, + .stream = .{ .handle = listener }, + }; + break :blk server.accept() catch |err| { + log.err("failed to accept socket: {}", .{err}); + continue; + }; + } else blk: { + var address: net.Address = undefined; + var address_len: posix.socklen_t = @sizeOf(net.Address); + const socket = while (true) { + const rc = posix.system.accept(listener, &address.any, &address_len); + switch (posix.errno(rc)) { + .SUCCESS => break @as(posix.socket_t, @intCast(rc)), + .INTR => continue, + .BADF, .INVAL, .NOTSOCK => { + log.info("received shutdown signal", .{}); + return; + }, + else => |err| { + log.err("failed to accept socket: {}", .{err}); + continue; + }, + } + }; + break :blk net.Server.Connection{ + .stream = .{ .handle = socket }, + .address = address, + }; }; + const socket = accepted.stream.handle; + const address = accepted.address; log.debug("({f}) connected", .{address}); const thread = std.Thread.spawn(.{}, Self.handleConnection, .{ self, socket, address, ctx }) catch |err| { - _ = posix.system.close(socket); + (net.Stream{ .handle = socket }).close(); log.err("({f}) failed to spawn connection thread: {}", .{ address, err }); continue; }; From 6d87ac570201d7a0f0fc25b7ff7aa0bf74467825 Mon Sep 17 00:00:00 2001 From: Igor Somov Date: Wed, 15 Apr 2026 16:10:41 -0300 Subject: [PATCH 06/13] fix websocket Windows recv and timeout compatibility --- src/server/server.zig | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/src/server/server.zig b/src/server/server.zig index c0caafe..538f3c8 100644 --- a/src/server/server.zig +++ b/src/server/server.zig @@ -379,7 +379,9 @@ pub fn Blocking(comptime H: type) type { errdefer self.cleanupConn(hc); const timeout = self.handshake_timeout; const deadline = timestamp() + timeout.sec; - try posix.setsockopt(socket, posix.SOL.SOCKET, posix.SO.RCVTIMEO, &timeout.timeval); + if (comptime builtin.os.tag != .windows) { + try posix.setsockopt(socket, posix.SOL.SOCKET, posix.SO.RCVTIMEO, &timeout.timeval); + } while (true) { const compression, const ok = handleHandshake(H, self, hc, ctx); @@ -409,7 +411,9 @@ pub fn Blocking(comptime H: type) type { pub fn readLoop(self: *Self, hc: *HandlerConn(H)) !void { defer self.cleanupConn(hc); if (shouldClearReceiveTimeout(builtin.os.tag)) { - try posix.setsockopt(hc.socket, posix.SOL.SOCKET, posix.SO.RCVTIMEO, &Timeout.none); + if (comptime builtin.os.tag != .windows) { + try posix.setsockopt(hc.socket, posix.SOL.SOCKET, posix.SO.RCVTIMEO, &Timeout.none); + } } // In BlockingMode, we always assign a reader for the duration of the connection @@ -1832,7 +1836,9 @@ fn preHandOffWrite(conn: *Conn, response: []const u8) void { const socket = conn.stream.handle; const timeout = std.mem.toBytes(posix.timeval{ .sec = 5, .usec = 0 }); - posix.setsockopt(socket, posix.SOL.SOCKET, posix.SO.SNDTIMEO, &timeout) catch return; + if (comptime builtin.os.tag != .windows) { + posix.setsockopt(socket, posix.SOL.SOCKET, posix.SO.SNDTIMEO, &timeout) catch return; + } var pos: usize = 0; while (pos < response.len) { @@ -1847,7 +1853,15 @@ fn preHandOffWrite(conn: *Conn, response: []const u8) void { fn socketRead(socket: posix.socket_t, buf: []u8) !usize { if (comptime builtin.os.tag == .windows) { - return posix.recv(socket, buf, 0); + while (true) { + const rc = std.c.recv(socket, buf.ptr, buf.len, 0); + switch (posix.errno(rc)) { + .SUCCESS => return @intCast(rc), + .INTR => continue, + .AGAIN => return error.WouldBlock, + else => return error.SystemResources, + } + } } return posix.read(socket, buf); } From d57d20bfac570b793a1c89fe232b4a5fd66a3353 Mon Sep 17 00:00:00 2001 From: Igor Somov Date: Wed, 15 Apr 2026 16:12:12 -0300 Subject: [PATCH 07/13] fix websocket Windows handshake error mapping --- src/server/server.zig | 1 - 1 file changed, 1 deletion(-) diff --git a/src/server/server.zig b/src/server/server.zig index 538f3c8..4ba659e 100644 --- a/src/server/server.zig +++ b/src/server/server.zig @@ -1579,7 +1579,6 @@ fn _handleHandshake(comptime H: type, worker: anytype, hc: *HandlerConn(H), ctx: const n = socketRead(hc.socket, buf[len..]) catch |err| { if (comptime builtin.os.tag == .windows) { switch (err) { - error.ConnectionResetByPeer => log.debug("({f}) handshake connection closed: {}", .{ conn.address, err }), error.WouldBlock => { std.debug.assert(blockingMode()); log.debug("({f}) handshake timeout", .{conn.address}); From cc2b8994e7c4a23631f85283474d539e06a4c65b Mon Sep 17 00:00:00 2001 From: Igor Somov Date: Wed, 15 Apr 2026 16:15:19 -0300 Subject: [PATCH 08/13] fix websocket Windows socket read/write compatibility --- src/server/server.zig | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/src/server/server.zig b/src/server/server.zig index 4ba659e..fd38027 100644 --- a/src/server/server.zig +++ b/src/server/server.zig @@ -1852,22 +1852,14 @@ fn preHandOffWrite(conn: *Conn, response: []const u8) void { fn socketRead(socket: posix.socket_t, buf: []u8) !usize { if (comptime builtin.os.tag == .windows) { - while (true) { - const rc = std.c.recv(socket, buf.ptr, buf.len, 0); - switch (posix.errno(rc)) { - .SUCCESS => return @intCast(rc), - .INTR => continue, - .AGAIN => return error.WouldBlock, - else => return error.SystemResources, - } - } + return (net.Stream{ .handle = socket }).read(buf); } return posix.read(socket, buf); } fn socketWrite(socket: posix.socket_t, buf: []const u8) !usize { if (comptime builtin.os.tag == .windows) { - return error.WouldBlock; + return (net.Stream{ .handle = socket }).write(buf); } while (true) { const rc = posix.system.write(socket, buf.ptr, buf.len); From 97ec870f45a036daa40b39c1a96164d91265a048 Mon Sep 17 00:00:00 2001 From: Igor Somov Date: Fri, 17 Apr 2026 01:28:32 -0300 Subject: [PATCH 09/13] chore: migrate websocket to zig 0.16 --- .github/workflows/ci.yml | 29 ++++++++-- .github/workflows/release.yml | 77 ++++++++++++++++++++++++++ Dockerfile | 23 +++++--- build.zig.zon | 21 +++---- readme.md | 8 +-- src/client/client.zig | 80 +++++++++++---------------- src/compat.zig | 56 ++++++++++++++++++- src/server/server.zig | 33 ++++------- src/testing.zig | 5 +- src/websocket.zig | 2 +- support/autobahn/client/build.zig.zon | 15 +++-- support/autobahn/client/main.zig | 32 ++++++++--- support/autobahn/server/build.zig.zon | 15 ++--- support/autobahn/server/main.zig | 13 ++--- 14 files changed, 276 insertions(+), 133 deletions(-) create mode 100644 .github/workflows/release.yml diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 761cf7a..4132d87 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -1,7 +1,8 @@ name: CI env: - ZIG_VERSION: "0.15.2" + FORCE_JAVASCRIPT_ACTIONS_TO_NODE24: "true" + ZIG_VERSION: "0.16.0" on: push: @@ -16,15 +17,30 @@ jobs: run: shell: bash steps: - - uses: actions/checkout@v4 + - uses: actions/checkout@v6 - - name: Install Zig 0.15.2 + - name: Install Zig 0.16.0 run: bash .github/scripts/install-zig.sh "${ZIG_VERSION}" + - name: Cache Zig build outputs + uses: actions/cache@v5 + with: + path: | + .zig-cache + ~/.cache/zig + key: zig-ubuntu-${{ hashFiles('src/**/*.zig', 'build.zig', 'build.zig.zon', 'test_runner.zig', 'support/autobahn/**/*.zig', 'support/autobahn/**/build.zig', 'support/autobahn/**/build.zig.zon', 'Dockerfile') }} + restore-keys: zig-ubuntu- + - name: non-blocking test run: zig build test -Dforce_blocking=false - name: blocking test run: zig build test -Dforce_blocking=true + + - name: build server native + run: cd support/autobahn/server && zig build + - name: build client native + run: cd support/autobahn/client && zig build + - name: build server x86-linux-gnu run: cd support/autobahn/server && zig build -Dtarget=x86-linux-gnu - name: build server x86_64-linux-gnu @@ -33,7 +49,10 @@ jobs: run: cd support/autobahn/server && zig build -Dtarget=x86_64-windows-gnu - name: build client x86-linux-gnu run: cd support/autobahn/client && zig build -Dtarget=x86-linux-gnu - - name: build server x86_64-linux-gnu + - name: build client x86_64-linux-gnu run: cd support/autobahn/client && zig build -Dtarget=x86_64-linux-gnu - - name: build clietn x86_64-windows-gnu + - name: build client x86_64-windows-gnu run: cd support/autobahn/client && zig build -Dtarget=x86_64-windows-gnu + + - name: build Docker test image + run: docker build -t websocket-zig16-test . diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml new file mode 100644 index 0000000..64bb5d1 --- /dev/null +++ b/.github/workflows/release.yml @@ -0,0 +1,77 @@ +name: Release + +env: + FORCE_JAVASCRIPT_ACTIONS_TO_NODE24: "true" + ZIG_VERSION: "0.16.0" + +on: + push: + tags: ['v*'] + workflow_dispatch: + +jobs: + verify: + runs-on: ubuntu-latest + defaults: + run: + shell: bash + + steps: + - uses: actions/checkout@v6 + + - name: Install Zig 0.16.0 + run: bash .github/scripts/install-zig.sh "${ZIG_VERSION}" + + - name: non-blocking test + run: zig build test -Dforce_blocking=false + - name: blocking test + run: zig build test -Dforce_blocking=true + - name: build server native + run: cd support/autobahn/server && zig build + - name: build client native + run: cd support/autobahn/client && zig build + - name: build Docker test image + run: docker build -t websocket-zig16-release . + + source: + name: Prepare source archive + runs-on: ubuntu-latest + defaults: + run: + shell: bash + + steps: + - uses: actions/checkout@v6 + + - name: Create source archive + run: | + archive_name="websocket-source-${GITHUB_REF_NAME}.tar.gz" + tar \ + --exclude='.git' \ + --exclude='.zig-cache' \ + --exclude='zig-out' \ + -czf "/tmp/${archive_name}" . + mv "/tmp/${archive_name}" . + echo "ARCHIVE_NAME=${archive_name}" >> "$GITHUB_ENV" + + - name: Upload source archive + uses: actions/upload-artifact@v7 + with: + name: websocket-source + path: ${{ env.ARCHIVE_NAME }} + + release: + needs: [verify, source] + if: github.event_name == 'push' && startsWith(github.ref, 'refs/tags/v') + runs-on: ubuntu-latest + permissions: + contents: write + + steps: + - uses: actions/download-artifact@v8 + + - name: Create release + uses: softprops/action-gh-release@v2 + with: + files: websocket-source/*.tar.gz + generate_release_notes: true diff --git a/Dockerfile b/Dockerfile index 462845e..9092521 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,11 +1,16 @@ -from ubuntu:latest -run apt-get update && apt-get install -y wget xz-utils +FROM ubuntu:24.04 -run wget "https://ziglang.org/builds/zig-linux-aarch64-0.14.0-dev.244+0d79aa017.tar.xz" -run tar -xJvf zig-linux-aarch64-0.14.0-dev.244+0d79aa017.tar.xz && \ - mv /zig-linux-aarch64-0.14.0-dev.244+0d79aa017/ /zig && \ - chmod a+x /zig && \ - rm -fr /zig-* +ARG ZIG_VERSION=0.16.0 -workdir /opt -entrypoint ["/zig/zig", "build", "test"] +RUN apt-get update && apt-get install -y bash ca-certificates curl python3 xz-utils && rm -rf /var/lib/apt/lists/* + +COPY .github/scripts/install-zig.sh /tmp/install-zig.sh +RUN chmod +x /tmp/install-zig.sh && \ + GITHUB_PATH=/tmp/zig-path bash /tmp/install-zig.sh "${ZIG_VERSION}" && \ + install_dir="$(cat /tmp/zig-path)" && \ + ln -s "${install_dir}/zig" /usr/local/bin/zig + +WORKDIR /opt/websocket +COPY . . + +ENTRYPOINT ["bash", "-lc", "zig build test -Dforce_blocking=false && zig build test -Dforce_blocking=true"] diff --git a/build.zig.zon b/build.zig.zon index da8b7d1..3d5309b 100644 --- a/build.zig.zon +++ b/build.zig.zon @@ -1,12 +1,13 @@ .{ - .name = .websocket, - .version = "0.1.0", - .dependencies = .{}, - .fingerprint = 0x42ce80b97512f264, - .paths = .{ - "readme.md", - "build.zig", - "build.zig.zon", - "src", - }, + .name = .websocket, + .version = "0.1.0", + .dependencies = .{}, + .fingerprint = 0x42ce80b97512f264, + .minimum_zig_version = "0.16.0", + .paths = .{ + "readme.md", + "build.zig", + "build.zig.zon", + "src", + }, } diff --git a/readme.md b/readme.md index c0823c6..54f31c8 100644 --- a/readme.md +++ b/readme.md @@ -1,5 +1,5 @@ # A zig websocket server. -The master branch targets the latest stable of Zig (0.15.1). The dev branch targets the latest version of Zig. If you're looking for an older version, look for an zig-X.YZ branches. +The main branch targets Zig 0.16.0. If you're looking for an older version, look for a `zig-X.YZ` branch. Skip to the [client section](#client). @@ -11,8 +11,7 @@ const std = @import("std"); const ws = @import("websocket"); pub fn main() !void { - var gpa = std.heap.GeneralPurposeAllocator(.{}){}; - const allocator = gpa.allocator(); + const allocator = std.heap.smp_allocator; var server = try ws.Server(Handler).init(allocator, .{ .port = 9224, @@ -423,8 +422,7 @@ The `*websocket.Client` can be used in one of two ways. At its simplest, after c ```zig pub fn main() !void { - var gpa = std.heap.GeneralPurposeAllocator(.{}){}; - const allocator = gpa.allocator(); + const allocator = std.heap.smp_allocator; // create the client var client = try websocket.Client.init(allocator, .{ diff --git a/src/client/client.zig b/src/client/client.zig index 895ac9b..e6502de 100644 --- a/src/client/client.zig +++ b/src/client/client.zig @@ -1,4 +1,5 @@ const std = @import("std"); +const builtin = @import("builtin"); const proto = @import("../proto.zig"); const buffer = @import("../buffer.zig"); const std_compat = @import("compat"); @@ -80,7 +81,7 @@ pub const Client = struct { pub fn init(allocator: Allocator, config: Config) !Client { if (config.compression != null) { - log.err("Compression is disabled as part of the 0.15 upgrade. I do hope to re-enable it soon.", .{}); + log.err("Compression is temporarily disabled in this branch. I do hope to re-enable it soon.", .{}); return error.InvalidConfiguraion; } @@ -124,7 +125,7 @@ pub const Client = struct { ._closed = false, ._own_bp = own_bp, ._mask_fn = config.mask_fn, - ._compression_opts = null, //TODO: ZIG 0.15 + ._compression_opts = null, // TODO: re-enable compression support ._reader = Reader.init(reader_buf, buffer_provider, null), }; } @@ -248,16 +249,21 @@ pub const Client = struct { self.close(.{ .code = 1002 }) catch unreachable; return err; } orelse { - reader.fill(stream) catch |err| switch (err) { - error.WouldBlock => return null, - error.Closed, error.ConnectionResetByPeer, error.NotOpenForReading => { - @atomicStore(bool, &self._closed, true, .monotonic); - return error.Closed; - }, - else => { - self.close(.{ .code = 1002 }) catch unreachable; - return err; - }, + reader.fill(stream) catch |err| { + const read_err: anyerror = err; + if (read_err == error.Timeout or read_err == error.WouldBlock) { + return null; + } + switch (read_err) { + error.Closed, error.ConnectionResetByPeer, error.NotOpenForReading => { + @atomicStore(bool, &self._closed, true, .monotonic); + return error.Closed; + }, + else => { + self.close(.{ .code = 1002 }) catch unreachable; + return err; + }, + } }; continue; }; @@ -406,36 +412,12 @@ pub const Stream = struct { } pub fn close(self: *Stream) void { - const fd = self.stream.handle; - const builtin = @import("builtin"); - const native_os = builtin.os.tag; - if (self.tls_client) |tls_client| { // Shutdown the socket first, so readLoop() can exit, before tls_client's buffers are freed - if (native_os == .windows) { - _ = std.os.windows.ws2_32.shutdown(fd, std.os.windows.ws2_32.SD_BOTH); - } else if (native_os == .wasi and !builtin.link_libc) { - _ = std.os.wasi.sock_shutdown(fd, .{ .WR = true, .RD = true }); - } else { - _ = std.posix.system.shutdown(fd, std.posix.SHUT.RDWR); - } + _ = self.stream.shutdown(.both) catch {}; tls_client.deinit(); } - - // std.posix.close panics on EBADF - // This is a general issue in Zig: - // https://github.com/ziglang/zig/issues/6389 - // - // we don't want to crash on double close - - if (native_os == .windows) { - return std.os.windows.CloseHandle(fd); - } - if (native_os == .wasi and !builtin.link_libc) { - _ = std.os.wasi.fd_close(fd); - return; - } - _ = std.posix.system.close(fd); + self.stream.close(); } pub fn read(self: *Stream, buf: []u8) !usize { @@ -463,18 +445,19 @@ pub const Stream = struct { return self.stream.writeAll(data); } - const zero_timeout = std.mem.toBytes(posix.timeval{ .sec = 0, .usec = 0 }); pub fn writeTimeout(self: *const Stream, ms: u32) !void { - return self.setTimeout(posix.SO.SNDTIMEO, ms); + const opt_name = if (comptime builtin.os.tag == .windows) std.os.windows.ws2_32.SO.SNDTIMEO else posix.SO.SNDTIMEO; + return self.setTimeout(opt_name, ms); } pub fn readTimeout(self: *const Stream, ms: u32) !void { - return self.setTimeout(posix.SO.RCVTIMEO, ms); + const opt_name = if (comptime builtin.os.tag == .windows) std.os.windows.ws2_32.SO.RCVTIMEO else posix.SO.RCVTIMEO; + return self.setTimeout(opt_name, ms); } fn setTimeout(self: *const Stream, opt_name: u32, ms: u32) !void { - if (ms == 0) { - return self.setsockopt(opt_name, &zero_timeout); + if (comptime builtin.os.tag == .windows) { + return; } const timeout = std.mem.toBytes(posix.timeval{ @@ -646,9 +629,12 @@ const HandShakeReply = struct { var server_compression: bool = false; while (true) { - const n = stream.read(buf[pos..]) catch |err| switch (err) { - error.WouldBlock => return error.Timeout, - else => return err, + const n = stream.read(buf[pos..]) catch |err| { + const read_err: anyerror = err; + if (read_err == error.Timeout or read_err == error.WouldBlock) { + return error.Timeout; + } + return err; }; if (n == 0) { return error.ConnectionClosed; @@ -732,7 +718,7 @@ const HandShakeReply = struct { return error.InvalidExtensionHeader; } if (!sc.client_no_context_takeover or !sc.server_no_context_takeover) { - // as of Zig 0.15, we no longer support context takeover + // Context takeover is currently disabled in this branch. // We told the server this, it should have respected it. return error.InvalidExtensionHeader; } diff --git a/src/compat.zig b/src/compat.zig index 0bab495..68747eb 100644 --- a/src/compat.zig +++ b/src/compat.zig @@ -109,7 +109,7 @@ pub const net = struct { pub const Handle = IoNet.Socket.Handle; pub const Reader = IoNet.Stream.Reader; pub const Writer = IoNet.Stream.Writer; - pub const ReadError = posix.ReadError; + pub const ReadError = if (builtin.os.tag == .windows) Reader.Error else posix.ReadError; pub const WriteError = IoNet.Stream.Writer.Error; fn toInner(self: Stream) IoNet.Stream { @@ -134,6 +134,11 @@ pub const net = struct { } pub fn read(self: Stream, buffer: []u8) ReadError!usize { + if (buffer.len == 0) return 0; + if (comptime builtin.os.tag == .windows) { + var data = [_][]u8{buffer}; + return io().vtable.netRead(io().userdata, self.handle, &data); + } return posix.read(self.handle, buffer); } @@ -423,6 +428,55 @@ pub const net = struct { return result; } else |_| {} + if (comptime builtin.os.tag == .windows) { + const host_name = IoNet.HostName.init(name) catch return error.UnknownHostName; + var canonical_name_buffer: [IoNet.HostName.max_len]u8 = undefined; + var lookup_buffer: [32]IoNet.HostName.LookupResult = undefined; + var lookup_queue: std.Io.Queue(IoNet.HostName.LookupResult) = .init(&lookup_buffer); + var lookup_future = io().async(IoNet.HostName.lookup, .{ + host_name, + io(), + &lookup_queue, + .{ + .port = port, + .canonical_name_buffer = &canonical_name_buffer, + }, + }); + defer lookup_future.cancel(io()) catch {}; + + var addrs: std.ArrayList(Address) = .empty; + defer addrs.deinit(arena); + + while (lookup_queue.getOne(io())) |lookup_result| switch (lookup_result) { + .address => |address| try addrs.append(arena, Address.fromCurrent(address)), + .canonical_name => |canonical_name| { + result.canon_name = try arena.dupe(u8, canonical_name.bytes); + }, + } else |err| switch (err) { + error.Canceled => return error.Unexpected, + error.Closed => { + lookup_future.await(io()) catch |lookup_err| switch (lookup_err) { + error.UnknownHostName, error.NoAddressReturned => return error.UnknownHostName, + error.NameServerFailure => return error.NameServerFailure, + error.AddressFamilyUnsupported => return error.AddressFamilyNotSupported, + error.SystemResources => return error.SystemResources, + error.NetworkDown, error.DetectingNetworkConfigurationFailed => return error.ServiceUnavailable, + error.ResolvConfParseFailed, + error.InvalidDnsARecord, + error.InvalidDnsAAAARecord, + error.InvalidDnsCnameRecord, + error.Canceled, + => return error.Unexpected, + else => return error.Unexpected, + }; + }, + } + + result.addrs = try addrs.toOwnedSlice(arena); + if (result.addrs.len == 0) return error.UnknownHostName; + return result; + } + var name_buffer: [IoNet.HostName.max_len:0]u8 = undefined; @memcpy(name_buffer[0..name.len], name); name_buffer[name.len] = 0; diff --git a/src/server/server.zig b/src/server/server.zig index fd38027..1866249 100644 --- a/src/server/server.zig +++ b/src/server/server.zig @@ -112,7 +112,7 @@ pub fn Server(comptime H: type) type { } if (config.compression != null) { - log.err("Compression is disabled as part of the 0.15 upgrade. I do hope to re-enable it soon.", .{}); + log.err("Compression is temporarily disabled in this branch. I do hope to re-enable it soon.", .{}); return error.InvalidConfiguraion; } @@ -1440,7 +1440,7 @@ pub const Conn = struct { pub fn writeFrame(self: *Conn, op_code: OpCode, data: []const u8) !void { const payload = data; - // Zig 0.15 compression disabled + // Compression is temporarily disabled in this branch. const compressed = false; // if (self.compression) |c| { // if (data.len >= c.write_treshold) { @@ -1577,23 +1577,13 @@ fn _handleHandshake(comptime H: type, worker: anytype, hc: *HandlerConn(H), ctx: } const n = socketRead(hc.socket, buf[len..]) catch |err| { - if (comptime builtin.os.tag == .windows) { - switch (err) { - error.WouldBlock => { - std.debug.assert(blockingMode()); - log.debug("({f}) handshake timeout", .{conn.address}); - }, - else => log.warn("({f}) handshake error reading from socket: {}", .{ conn.address, err }), - } - } else { - switch (err) { - error.ConnectionResetByPeer => log.debug("({f}) handshake connection closed: {}", .{ conn.address, err }), - error.WouldBlock => { - std.debug.assert(blockingMode()); - log.debug("({f}) handshake timeout", .{conn.address}); - }, - else => log.warn("({f}) handshake error reading from socket: {}", .{ conn.address, err }), - } + switch (@as(anyerror, err)) { + error.ConnectionResetByPeer => log.debug("({f}) handshake connection closed: {}", .{ conn.address, err }), + error.Timeout, error.WouldBlock => { + std.debug.assert(blockingMode()); + log.debug("({f}) handshake timeout", .{conn.address}); + }, + else => log.warn("({f}) handshake error reading from socket: {}", .{ conn.address, err }), } return .{ false, false }; }; @@ -1861,8 +1851,9 @@ fn socketWrite(socket: posix.socket_t, buf: []const u8) !usize { if (comptime builtin.os.tag == .windows) { return (net.Stream{ .handle = socket }).write(buf); } + const flags = if (@hasDecl(posix.MSG, "NOSIGNAL")) posix.MSG.NOSIGNAL else 0; while (true) { - const rc = posix.system.write(socket, buf.ptr, buf.len); + const rc = posix.system.sendto(socket, buf.ptr, buf.len, flags, null, 0); switch (posix.errno(rc)) { .SUCCESS => return @intCast(rc), .INTR => continue, @@ -2045,7 +2036,7 @@ test "shouldClearReceiveTimeout skips Windows" { } fn testStream(handshake: bool) !net.Stream { - const timeout = std.mem.toBytes(std.posix.timeval{ .sec = 0, .usec = 20_000 }); + const timeout = std.mem.toBytes(std.posix.timeval{ .sec = 0, .usec = 250_000 }); const address = try net.Address.parseIp("127.0.0.1", 9292); const stream = try net.tcpConnectToAddress(address); if (comptime builtin.os.tag != .windows) { diff --git a/src/testing.zig b/src/testing.zig index df5f54e..757e534 100644 --- a/src/testing.zig +++ b/src/testing.zig @@ -32,7 +32,7 @@ pub const Testing = struct { const pair = t.SocketPair.init(.{ .port = port }); const timeout = std.mem.toBytes(std.posix.timeval{ .sec = 0, - .usec = 50_000, + .usec = 250_000, }); if (comptime builtin.os.tag != .windows) { std.posix.setsockopt(pair.client.handle, std.posix.SOL.SOCKET, std.posix.SO.RCVTIMEO, &timeout) catch unreachable; @@ -100,7 +100,8 @@ pub const Testing = struct { return error.NotClosed; } - // we have a 50ms timeout on this socket. It's all localhost. We expect + // We keep a short read timeout on this localhost test socket, but allow + // enough headroom for slower CI and containerized runs. // to be able to read messages in that time. pub fn ensureMessage(self: *Testing) !void { if (self.received_index < self.received.items.len) { diff --git a/src/websocket.zig b/src/websocket.zig index fdb822c..7afea55 100644 --- a/src/websocket.zig +++ b/src/websocket.zig @@ -22,7 +22,7 @@ pub const Handshake = @import("server/handshake.zig").Handshake; pub const Compression = struct { write_threshold: ?usize = null, retain_write_buffer: bool = true, - // don't know how to support these with the Zig 0.15 changes. So, for now + // don't know how to support these in the current compatibility branch. So, for now // we'll always require these to be true // client_no_context_takeover: bool = false, // server_no_context_takeover: bool = false, diff --git a/support/autobahn/client/build.zig.zon b/support/autobahn/client/build.zig.zon index 9cc6022..0bbbd79 100644 --- a/support/autobahn/client/build.zig.zon +++ b/support/autobahn/client/build.zig.zon @@ -1,11 +1,10 @@ .{ - .name = .autobahn_test_client, - .paths = .{""}, - .version = "0.0.0", - .fingerprint = 0xf155d3bf7ec3bfc, - .dependencies = .{ - .websocket = .{ - .path = "../../../" + .name = .autobahn_test_client, + .paths = .{""}, + .version = "0.0.0", + .fingerprint = 0xf155d3bf7ec3bfc, + .minimum_zig_version = "0.16.0", + .dependencies = .{ + .websocket = .{ .path = "../../../" }, }, - }, } diff --git a/support/autobahn/client/main.zig b/support/autobahn/client/main.zig index 4c123b8..b551603 100644 --- a/support/autobahn/client/main.zig +++ b/support/autobahn/client/main.zig @@ -4,10 +4,7 @@ const websocket = @import("websocket"); const Allocator = std.mem.Allocator; pub fn main() !void { - var gpa = std.heap.GeneralPurposeAllocator(.{}){}; - defer _ = gpa.detectLeaks(); - - const allocator = gpa.allocator(); + const allocator = std.heap.smp_allocator; const cases = [_][]const u8{ "1.1.1", "1.1.2", "1.1.3", "1.1.4", "1.1.5", "1.1.6", "1.1.7", "1.1.8", "1.2.1", "1.2.2", "1.2.3", "1.2.4", "1.2.5", "1.2.6", "1.2.7", "1.2.8", @@ -28,13 +25,25 @@ pub fn main() !void { "7.9.6", "7.9.7", "7.9.8", "7.9.9", "7.13.1", "7.13.2", "9.1.1", "9.1.2", "9.1.3", "9.1.4", "9.1.5", "9.1.6", "9.2.1", "9.2.2", "9.2.3", "9.2.4", "9.2.5", "9.2.6", "9.3.1", "9.3.2", "9.3.3", "9.3.4", "9.3.5", "9.3.6", "9.3.7", "9.3.8", "9.3.9", "9.4.1", "9.4.2", "9.4.3", "9.4.4", "9.4.5", "9.4.6", "9.4.7", "9.4.8", "9.4.9", "9.5.1", "9.5.2", "9.5.3", "9.5.4", "9.5.5", "9.5.6", "9.6.1", "9.6.2", "9.6.3", "9.6.4", "9.6.5", "9.6.6", - "9.7.1", "9.7.2", "9.7.3", "9.7.4", "9.7.5", "9.7.6", "9.8.1", "9.8.2", "9.8.3", "9.8.4", "9.8.5", "9.8.6", "10.1.1", - "12.1.1", "12.1.2", "12.1.3", "12.1.4", "12.1.5", "12.1.6", "12.1.7", "12.1.8", "12.1.9", "12.1.10", "12.1.11", "12.1.12", "12.1.13", "12.1.14", "12.1.15", "12.1.16", "12.1.17", "12.1.18", "12.2.1", "12.2.2", "12.2.3", "12.2.4", "12.2.5", "12.2.6", "12.2.7", "12.2.8", "12.2.9", "12.2.10", "12.2.11", "12.2.12", "12.2.13", "12.2.14", "12.2.15", "12.2.16", "12.2.17", "12.2.18", "12.3.1", "12.3.2", "12.3.3", "12.3.4", "12.3.5", "12.3.6", "12.3.7", "12.3.8", "12.3.9", "12.3.10", "12.3.11", "12.3.12", "12.3.13", "12.3.14", "12.3.15", "12.3.16", "12.3.17", "12.3.18", "12.4.1", "12.4.2", "12.4.3", "12.4.4", "12.4.5", "12.4.6", "12.4.7", "12.4.8", "12.4.9", "12.4.10", "12.4.11", "12.4.12", "12.4.13", "12.4.14", "12.4.15", "12.4.16", "12.4.17", "12.4.18", "12.5.1", "12.5.2", "12.5.3", "12.5.4", "12.5.5", "12.5.6", "12.5.7", "12.5.8", "12.5.9", "12.5.10", "12.5.11", "12.5.12", "12.5.13", "12.5.14", "12.5.15", "12.5.16", "12.5.17", "12.5.18", - "13.1.1", "13.1.2", "13.1.3", "13.1.4", "13.1.5", "13.1.6", "13.1.7", "13.1.8", "13.1.9", "13.1.10", "13.1.11", "13.1.12", "13.1.13", "13.1.14", "13.1.15", "13.1.16", "13.1.17", "13.1.18", "13.2.1", "13.2.2", "13.2.3", "13.2.4", "13.2.5", "13.2.6", "13.2.7", "13.2.8", "13.2.9", "13.2.10", "13.2.11", "13.2.12", "13.2.13", "13.2.14", "13.2.15", "13.2.16", "13.2.17", "13.2.18", "13.3.1", "13.3.2", "13.3.3", "13.3.4", "13.3.5", "13.3.6", "13.3.7", "13.3.8", "13.3.9", "13.3.10", "13.3.11", "13.3.12", "13.3.13", "13.3.14", "13.3.15", "13.3.16", "13.3.17", "13.3.18", "13.4.1", "13.4.2", "13.4.3", "13.4.4", "13.4.5", "13.4.6", "13.4.7", "13.4.8", "13.4.9", "13.4.10", "13.4.11", "13.4.12", "13.4.13", "13.4.14", "13.4.15", "13.4.16", "13.4.17", "13.4.18", "13.5.1", "13.5.2", "13.5.3", "13.5.4", "13.5.5", "13.5.6", "13.5.7", "13.5.8", "13.5.9", "13.5.10", "13.5.11", "13.5.12", "13.5.13", "13.5.14", "13.5.15", "13.5.16", "13.5.17", "13.5.18", "13.6.1", "13.6.2", "13.6.3", "13.6.4", "13.6.5", "13.6.6", "13.6.7", "13.6.8", "13.6.9", "13.6.10", "13.6.11", "13.6.12", "13.6.13", "13.6.14", "13.6.15", "13.6.16", "13.6.17", "13.6.18", "13.7.1", "13.7.2", "13.7.3", "13.7.4", "13.7.5", "13.7.6", "13.7.7", "13.7.8", "13.7.9", "13.7.10", "13.7.11", "13.7.12", "13.7.13", "13.7.14", "13.7.15", "13.7.16", "13.7.17", "13.7.18", + "9.7.1", "9.7.2", "9.7.3", "9.7.4", "9.7.5", "9.7.6", "9.8.1", "9.8.2", "9.8.3", "9.8.4", "9.8.5", "9.8.6", "10.1.1", "12.1.1", "12.1.2", "12.1.3", + "12.1.4", "12.1.5", "12.1.6", "12.1.7", "12.1.8", "12.1.9", "12.1.10", "12.1.11", "12.1.12", "12.1.13", "12.1.14", "12.1.15", "12.1.16", "12.1.17", "12.1.18", "12.2.1", + "12.2.2", "12.2.3", "12.2.4", "12.2.5", "12.2.6", "12.2.7", "12.2.8", "12.2.9", "12.2.10", "12.2.11", "12.2.12", "12.2.13", "12.2.14", "12.2.15", "12.2.16", "12.2.17", + "12.2.18", "12.3.1", "12.3.2", "12.3.3", "12.3.4", "12.3.5", "12.3.6", "12.3.7", "12.3.8", "12.3.9", "12.3.10", "12.3.11", "12.3.12", "12.3.13", "12.3.14", "12.3.15", + "12.3.16", "12.3.17", "12.3.18", "12.4.1", "12.4.2", "12.4.3", "12.4.4", "12.4.5", "12.4.6", "12.4.7", "12.4.8", "12.4.9", "12.4.10", "12.4.11", "12.4.12", "12.4.13", + "12.4.14", "12.4.15", "12.4.16", "12.4.17", "12.4.18", "12.5.1", "12.5.2", "12.5.3", "12.5.4", "12.5.5", "12.5.6", "12.5.7", "12.5.8", "12.5.9", "12.5.10", "12.5.11", + "12.5.12", "12.5.13", "12.5.14", "12.5.15", "12.5.16", "12.5.17", "12.5.18", "13.1.1", "13.1.2", "13.1.3", "13.1.4", "13.1.5", "13.1.6", "13.1.7", "13.1.8", "13.1.9", + "13.1.10", "13.1.11", "13.1.12", "13.1.13", "13.1.14", "13.1.15", "13.1.16", "13.1.17", "13.1.18", "13.2.1", "13.2.2", "13.2.3", "13.2.4", "13.2.5", "13.2.6", "13.2.7", + "13.2.8", "13.2.9", "13.2.10", "13.2.11", "13.2.12", "13.2.13", "13.2.14", "13.2.15", "13.2.16", "13.2.17", "13.2.18", "13.3.1", "13.3.2", "13.3.3", "13.3.4", "13.3.5", + "13.3.6", "13.3.7", "13.3.8", "13.3.9", "13.3.10", "13.3.11", "13.3.12", "13.3.13", "13.3.14", "13.3.15", "13.3.16", "13.3.17", "13.3.18", "13.4.1", "13.4.2", "13.4.3", + "13.4.4", "13.4.5", "13.4.6", "13.4.7", "13.4.8", "13.4.9", "13.4.10", "13.4.11", "13.4.12", "13.4.13", "13.4.14", "13.4.15", "13.4.16", "13.4.17", "13.4.18", "13.5.1", + "13.5.2", "13.5.3", "13.5.4", "13.5.5", "13.5.6", "13.5.7", "13.5.8", "13.5.9", "13.5.10", "13.5.11", "13.5.12", "13.5.13", "13.5.14", "13.5.15", "13.5.16", "13.5.17", + "13.5.18", "13.6.1", "13.6.2", "13.6.3", "13.6.4", "13.6.5", "13.6.6", "13.6.7", "13.6.8", "13.6.9", "13.6.10", "13.6.11", "13.6.12", "13.6.13", "13.6.14", "13.6.15", + "13.6.16", "13.6.17", "13.6.18", "13.7.1", "13.7.2", "13.7.3", "13.7.4", "13.7.5", "13.7.6", "13.7.7", "13.7.8", "13.7.9", "13.7.10", "13.7.11", "13.7.12", "13.7.13", + "13.7.14", "13.7.15", "13.7.16", "13.7.17", "13.7.18", }; // wait 5 seconds for autobanh server to be up - std.Thread.sleep(std.time.ns_per_s * 5); + sleepSeconds(5); var buffer_provider = try websocket.bufferProvider(allocator, .{ .count = 10, .size = 32768, .max = 20_000_000 }); defer buffer_provider.deinit(); @@ -92,7 +101,7 @@ const Handler = struct { .port = 9001, .host = "localhost", .buffer_provider = buffer_provider, - // zig 0.15 + // Compression is temporarily disabled in this branch. // .compression = .{ // .write_threshold = 0, // }, @@ -129,3 +138,8 @@ const Handler = struct { } } }; + +fn sleepSeconds(seconds: u32) void { + var threaded: std.Io.Threaded = .init_single_threaded; + std.Io.sleep(threaded.io(), .fromSeconds(seconds), .awake) catch {}; +} diff --git a/support/autobahn/server/build.zig.zon b/support/autobahn/server/build.zig.zon index 8a462b2..f02e3bd 100644 --- a/support/autobahn/server/build.zig.zon +++ b/support/autobahn/server/build.zig.zon @@ -1,9 +1,10 @@ .{ - .name = .autobahn_test_server, - .paths = .{""}, - .version = "0.0.0", - .fingerprint = 0x923c8c98ca4d09ab, - .dependencies = .{ - .websocket = .{.path = "../../../"}, - }, + .name = .autobahn_test_server, + .paths = .{""}, + .version = "0.0.0", + .fingerprint = 0x923c8c98ca4d09ab, + .minimum_zig_version = "0.16.0", + .dependencies = .{ + .websocket = .{ .path = "../../../" }, + }, } diff --git a/support/autobahn/server/main.zig b/support/autobahn/server/main.zig index 3a80db9..edc44e7 100644 --- a/support/autobahn/server/main.zig +++ b/support/autobahn/server/main.zig @@ -14,12 +14,9 @@ var nonblocking_server: websocket.Server(Handler) = undefined; var nonblocking_bp_server: websocket.Server(Handler) = undefined; pub fn main() !void { - var gpa = std.heap.GeneralPurposeAllocator(.{}){}; - const allocator = gpa.allocator(); + const allocator = std.heap.smp_allocator; if (@import("builtin").os.tag != .windows) { - defer _ = gpa.detectLeaks(); - std.posix.sigaction(std.posix.SIG.TERM, &.{ .handler = .{ .handler = shutdown }, .mask = std.posix.sigemptyset(), @@ -53,7 +50,7 @@ fn startNonBlocking(allocator: Allocator) !std.Thread { .max_size = 1024, .max_headers = 10, }, - // zig 0.15 + // Compression is temporarily disabled in this branch. // .compression = .{ // .write_threshold = 0, // }, @@ -77,7 +74,7 @@ fn startNonBlockingBufferPool(allocator: Allocator) !std.Thread { .max_size = 1024, .max_headers = 10, }, - // zig 0.15 + // Compression is temporarily disabled in this branch. // .compression = .{ // .write_threshold = 0, // }, @@ -102,14 +99,14 @@ const Handler = struct { if (std.unicode.utf8ValidateSlice(data)) { try self.conn.writeText(data); } else { - self.conn.close(.{.code = 1007}) catch {}; + self.conn.close(.{ .code = 1007 }) catch {}; } }, } } }; -fn shutdown(_: c_int) callconv(.c) void { +fn shutdown(_: @TypeOf(std.posix.SIG.TERM)) callconv(.c) void { nonblocking_server.stop(); nonblocking_bp_server.stop(); } From 4d1d5290f6c9b243414cae384f1c5f63c47ea8ef Mon Sep 17 00:00:00 2001 From: Igor Somov Date: Fri, 17 Apr 2026 06:42:49 -0300 Subject: [PATCH 10/13] fix: use global environ in test runner --- src/compat.zig | 16 +--------------- 1 file changed, 1 insertion(+), 15 deletions(-) diff --git a/src/compat.zig b/src/compat.zig index 68747eb..af60b55 100644 --- a/src/compat.zig +++ b/src/compat.zig @@ -16,21 +16,7 @@ pub fn currentEnviron() std.process.Environ { fn environ() std.process.Environ { if (builtin.is_test) return std.testing.environ; - return switch (builtin.os.tag) { - .windows, .freestanding, .other => .{ .block = .global }, - .wasi, .emscripten => if (builtin.link_libc) blk: { - const c_environ = std.c.environ; - var env_count: usize = 0; - while (c_environ[env_count] != null) : (env_count += 1) {} - break :blk .{ .block = .{ .slice = c_environ[0..env_count :null] } }; - } else .{ .block = .global }, - else => blk: { - const c_environ = std.c.environ; - var env_count: usize = 0; - while (c_environ[env_count] != null) : (env_count += 1) {} - break :blk .{ .block = .{ .slice = c_environ[0..env_count :null] } }; - }, - }; + return .{ .block = .global }; } pub const process = struct { From 617fb933d004b9bc3ffdcbbad5ab8a3511dc4e21 Mon Sep 17 00:00:00 2001 From: Igor Somov Date: Fri, 17 Apr 2026 06:45:25 -0300 Subject: [PATCH 11/13] fix: initialize websocket test environ from process init --- src/compat.zig | 7 ++++++- test_runner.zig | 6 ++++-- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/src/compat.zig b/src/compat.zig index af60b55..f5de761 100644 --- a/src/compat.zig +++ b/src/compat.zig @@ -4,19 +4,24 @@ const builtin = @import("builtin"); const Allocator = std.mem.Allocator; var fallback_threaded: std.Io.Threaded = .init_single_threaded; +var process_environ: ?std.process.Environ = null; pub fn io() std.Io { if (builtin.is_test) return std.testing.io; return fallback_threaded.io(); } +pub fn initProcessMinimal(init: std.process.Init.Minimal) void { + process_environ = init.environ; +} + pub fn currentEnviron() std.process.Environ { return environ(); } fn environ() std.process.Environ { if (builtin.is_test) return std.testing.environ; - return .{ .block = .global }; + return process_environ orelse .empty; } pub const process = struct { diff --git a/test_runner.zig b/test_runner.zig index cdaed84..fb1d0c6 100644 --- a/test_runner.zig +++ b/test_runner.zig @@ -19,7 +19,9 @@ const BORDER = "=" ** 80; // use in custom panic handler var current_test: ?[]const u8 = null; -pub fn main() !void { +pub fn main(init: std.process.Init.Minimal) !void { + compat.initProcessMinimal(init); + var mem: [8192]u8 = undefined; var fba = std.heap.FixedBufferAllocator.init(&mem); @@ -35,7 +37,7 @@ pub fn main() !void { var fail: usize = 0; var skip: usize = 0; var leak: usize = 0; - const testing_environ = compat.currentEnviron(); + const testing_environ = init.environ; Printer.fmt("\r\x1b[0K", .{}); // beginning of line and clear to end of line From c693ad73570296edb811e7317ae9de5c6d1836b2 Mon Sep 17 00:00:00 2001 From: Igor Somov Date: Fri, 17 Apr 2026 06:48:00 -0300 Subject: [PATCH 12/13] fix: read websocket test env from process context --- src/compat.zig | 3 ++- test_runner.zig | 40 +++++++++++++++++++++++----------------- 2 files changed, 25 insertions(+), 18 deletions(-) diff --git a/src/compat.zig b/src/compat.zig index f5de761..f747d81 100644 --- a/src/compat.zig +++ b/src/compat.zig @@ -20,8 +20,9 @@ pub fn currentEnviron() std.process.Environ { } fn environ() std.process.Environ { + if (process_environ) |env| return env; if (builtin.is_test) return std.testing.environ; - return process_environ orelse .empty; + return .empty; } pub const process = struct { diff --git a/test_runner.zig b/test_runner.zig index fb1d0c6..c3c137a 100644 --- a/test_runner.zig +++ b/test_runner.zig @@ -19,15 +19,15 @@ const BORDER = "=" ** 80; // use in custom panic handler var current_test: ?[]const u8 = null; -pub fn main(init: std.process.Init.Minimal) !void { - compat.initProcessMinimal(init); +pub fn main(init: std.process.Init) !void { + compat.initProcessMinimal(init.minimal); var mem: [8192]u8 = undefined; var fba = std.heap.FixedBufferAllocator.init(&mem); const allocator = fba.allocator(); - const env = Env.init(allocator); + const env = Env.init(allocator, init.environ_map); defer env.deinit(allocator); var slowest = SlowTracker.init(allocator, 5); @@ -37,7 +37,7 @@ pub fn main(init: std.process.Init.Minimal) !void { var fail: usize = 0; var skip: usize = 0; var leak: usize = 0; - const testing_environ = init.environ; + const testing_environ = init.minimal.environ; Printer.fmt("\r\x1b[0K", .{}); // beginning of line and clear to end of line @@ -239,11 +239,11 @@ const Env = struct { fail_first: bool, filter: ?[]const u8, - fn init(allocator: Allocator) Env { + fn init(allocator: Allocator, environ_map: *const std.process.Environ.Map) Env { return .{ - .verbose = readEnvBool(allocator, "TEST_VERBOSE", true), - .fail_first = readEnvBool(allocator, "TEST_FAIL_FIRST", false), - .filter = readEnv(allocator, "TEST_FILTER"), + .verbose = readEnvBool(allocator, environ_map, "TEST_VERBOSE", true), + .fail_first = readEnvBool(allocator, environ_map, "TEST_FAIL_FIRST", false), + .filter = readEnv(allocator, environ_map, "TEST_FILTER"), }; } @@ -253,19 +253,25 @@ const Env = struct { } } - fn readEnv(allocator: Allocator, key: []const u8) ?[]const u8 { - const v = compat.process.getEnvVarOwned(allocator, key) catch |err| { - if (err == error.EnvironmentVariableNotFound) { - return null; - } - std.log.warn("failed to get env var {s} due to err {}", .{ key, err }); + fn readEnv( + allocator: Allocator, + environ_map: *const std.process.Environ.Map, + key: []const u8, + ) ?[]const u8 { + const value = environ_map.get(key) orelse return null; + return allocator.dupe(u8, value) catch |err| { + std.log.warn("failed to copy env var {s} due to err {}", .{ key, err }); return null; }; - return v; } - fn readEnvBool(allocator: Allocator, key: []const u8, deflt: bool) bool { - const value = readEnv(allocator, key) orelse return deflt; + fn readEnvBool( + allocator: Allocator, + environ_map: *const std.process.Environ.Map, + key: []const u8, + deflt: bool, + ) bool { + const value = readEnv(allocator, environ_map, key) orelse return deflt; defer allocator.free(value); return std.ascii.eqlIgnoreCase(value, "true"); } From a59b85dea387368b1c5ed003d643feba469c138c Mon Sep 17 00:00:00 2001 From: Igor Somov Date: Fri, 17 Apr 2026 10:34:07 -0300 Subject: [PATCH 13/13] fix: preserve peer address and retry websocket connects --- src/compat.zig | 15 +++++++++++++-- src/websocket.zig | 42 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 55 insertions(+), 2 deletions(-) diff --git a/src/compat.zig b/src/compat.zig index f747d81..1ad0fff 100644 --- a/src/compat.zig +++ b/src/compat.zig @@ -356,7 +356,7 @@ pub const net = struct { try setNonBlocking(stream.socket.handle, false); return .{ .stream = .{ .handle = stream.socket.handle }, - .address = self.listen_address, + .address = Address.fromCurrent(stream.socket.address), }; } }; @@ -391,11 +391,22 @@ pub const net = struct { return .{ .handle = stream.socket.handle }; } + pub fn tcpConnectToAddresses(addresses: []const Address) !Stream { + var last_err: ?anyerror = null; + for (addresses) |address| { + return tcpConnectToAddress(address) catch |err| { + last_err = err; + continue; + }; + } + return last_err orelse error.UnknownHostName; + } + pub fn tcpConnectToHost(allocator: Allocator, host: []const u8, port: u16) !Stream { const addresses = try getAddressList(allocator, host, port); defer addresses.deinit(); if (addresses.addrs.len == 0) return error.UnknownHostName; - return tcpConnectToAddress(addresses.addrs[0]); + return tcpConnectToAddresses(addresses.addrs); } pub fn getAddressList(gpa: Allocator, name: []const u8, port: u16) GetAddressListError!*AddressList { diff --git a/src/websocket.zig b/src/websocket.zig index 7afea55..a73d369 100644 --- a/src/websocket.zig +++ b/src/websocket.zig @@ -1,4 +1,5 @@ const std = @import("std"); +const compat = @import("compat"); pub const buffer = @import("buffer.zig"); @@ -107,3 +108,44 @@ test "frameBin" { } } } + +test "compat net.Server.accept returns peer address" { + var address = try compat.net.Address.parseIp("127.0.0.1", 0); + var listener = try address.listen(.{ .reuse_address = true }); + defer listener.deinit(); + + const client = try compat.net.tcpConnectToAddress(listener.listen_address); + defer client.close(); + + var accepted = try listener.accept(); + defer accepted.stream.close(); + + var client_address: compat.net.Address = undefined; + var client_address_len: std.posix.socklen_t = @sizeOf(compat.net.Address); + switch (std.posix.errno(std.posix.system.getsockname(client.handle, &client_address.any, &client_address_len))) { + .SUCCESS => {}, + else => |err| return std.posix.unexpectedErrno(err), + } + + try t.expectEqual(client_address.any.family, accepted.address.any.family); + try t.expectEqual(client_address.in.sa.addr, accepted.address.in.sa.addr); + try t.expectEqual(client_address.in.getPort(), accepted.address.in.getPort()); + try std.testing.expect(client_address.in.getPort() != listener.listen_address.in.getPort()); +} + +test "compat net.tcpConnectToAddresses falls back to a working address" { + var address = try compat.net.Address.parseIp("127.0.0.1", 0); + var listener = try address.listen(.{ .reuse_address = true }); + defer listener.deinit(); + + const addresses = [_]compat.net.Address{ + try compat.net.Address.parseIp("::1", listener.listen_address.in.getPort()), + listener.listen_address, + }; + + const client = try compat.net.tcpConnectToAddresses(&addresses); + defer client.close(); + + var accepted = try listener.accept(); + defer accepted.stream.close(); +}