Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ jobs:
- name: Set up Zig
uses: korandoru/setup-zig@v1
with:
zig-version: 0.15.2
zig-version: 0.16.0

- name: Lint
run: zig fmt --check *.zig
Expand All @@ -25,7 +25,7 @@ jobs:
- name: Set up Zig
uses: korandoru/setup-zig@v1
with:
zig-version: 0.15.2
zig-version: 0.16.0

- name: Test
run: zig build test --summary all
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,5 @@ main*
zig-cache/
.zig-cache/
zig-out/
zig-pkg/
.tool-versions
10 changes: 3 additions & 7 deletions build.zig.zon
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// This field is optional.
// This is currently advisory only; Zig does not yet do anything
// with this value.
.minimum_zig_version = "0.15.2",
.minimum_zig_version = "0.16.0",

// Together with name, this represents a globally unique package
// identifier. This field is generated by the Zig toolchain when the
Expand All @@ -33,12 +33,8 @@

.dependencies = .{
.snappyz = .{
.url = "https://github.com/blockblaz/zig-snappy/archive/v0.0.3.tar.gz",
.hash = "zig_snappy-0.0.3-bDFzXnBgAAD9LL_x0g6M_1SPPwMGAA6RAT_rlxG6n06j",
},
.zig_snappy = .{
.url = "https://github.com/blockblaz/zig-snappy/archive/v0.0.3.tar.gz",
.hash = "zig_snappy-0.0.3-bDFzXnBgAAD9LL_x0g6M_1SPPwMGAA6RAT_rlxG6n06j",
.url = "https://github.com/blockblaz/zig-snappy/archive/0ae2a122ea7e37b027b33a48287f7b842a35bd77.tar.gz",
.hash = "zig_snappy-0.0.3-bDFzXmBjAAAd1yTK5LQvRJ-srdnJbhkHZajl0KcXVJDx",
},
},

Expand Down
162 changes: 80 additions & 82 deletions src/frames.zig
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ const snappyz = @import("snappyz");

const Allocator = std.mem.Allocator;
const math = std.math;
const meta = std.meta;

const FrameError = error{
UnexpectedEof,
Expand All @@ -30,7 +29,7 @@ const ChunkType = enum(u8) {
pub const ParseError = error{InvalidValue};

pub fn fromByte(value: u8) ParseError!ChunkType {
return meta.intToEnum(ChunkType, value) catch ParseError.InvalidValue;
return std.enums.fromInt(ChunkType, value) orelse ParseError.InvalidValue;
}

pub fn toByte(self: ChunkType) u8 {
Expand Down Expand Up @@ -104,33 +103,33 @@ pub const FrameEncoder = struct {

/// Encode all data into a fresh Snappy frame stored in an owned slice.
pub fn encode(allocator: Allocator, data: []const u8) ![]u8 {
var output = std.ArrayListUnmanaged(u8).empty;
errdefer output.deinit(allocator);
var allocating = std.Io.Writer.Allocating.init(allocator);
errdefer allocating.deinit();

var encoder = FrameEncoder.init(allocator);

var index: usize = 0;
while (index < data.len) {
const end_index = @min(index + recommended_chunk, data.len);
const chunk_input = data[index..end_index];
try encoder.writeChunk(output.writer(allocator), chunk_input);
try encoder.writeChunk(&allocating.writer, chunk_input);
index = end_index;
}

try encoder.finish(output.writer(allocator));
try encoder.finish(&allocating.writer);

return output.toOwnedSlice(allocator);
return allocating.toOwnedSlice();
}

/// Stream input from `reader` into the frame writer without buffering the entire payload.
pub fn encodeToWriter(allocator: Allocator, reader: anytype, writer: anytype) !void {
pub fn encodeToWriter(allocator: Allocator, reader: *std.Io.Reader, writer: *std.Io.Writer) !void {
var encoder = FrameEncoder.init(allocator);

var chunk_input_buffer = try allocator.alloc(u8, recommended_chunk);
defer allocator.free(chunk_input_buffer);

while (true) {
const read_len = try reader.read(chunk_input_buffer);
const read_len = try reader.readSliceShort(chunk_input_buffer);
if (read_len == 0) break;

try encoder.writeChunk(writer, chunk_input_buffer[0..read_len]);
Expand All @@ -152,8 +151,8 @@ pub fn decode(allocator: Allocator, data: []const u8) ![]u8 {
}

/// Decode framed input from `reader`, writing decompressed output into `writer`.
pub fn decodeFromReader(allocator: Allocator, reader: anytype, writer: anytype) !void {
var chunk_buf = std.ArrayListUnmanaged(u8).empty;
pub fn decodeFromReader(allocator: Allocator, reader: *std.Io.Reader, writer: *std.Io.Writer) !void {
var chunk_buf: std.ArrayList(u8) = .empty;
defer chunk_buf.deinit(allocator);

var processed_any_chunk = false;
Expand Down Expand Up @@ -212,9 +211,12 @@ pub fn decodeFromReader(allocator: Allocator, reader: anytype, writer: anytype)
chunk_buf.clearRetainingCapacity();
}

// A stream that contained the identifier but no data chunks is a valid
// A stream that contains the identifier but no data chunks is a valid
// empty payload per the Snappy framing spec. Only treat the input as
// unframed when neither was seen.
// unframed when neither was seen — peer clients (Go's snappy.NewReader,
// Rust's snap::read::FrameDecoder) accept this shape and decode it to
// an empty slice. Cross-client interop fixtures emit exactly the
// 10-byte "\xff\x06\x00\x00sNaPpY" form for empty input.
if (!saw_stream_identifier and !saw_data_chunk) return FrameError.NotFramed;
}

Expand All @@ -225,8 +227,8 @@ fn decodeFramed(allocator: Allocator, data: []const u8) ![]u8 {
var saw_data_chunk = false;
var saw_stream_identifier = false;

var output = std.ArrayListUnmanaged(u8).empty;
errdefer output.deinit(allocator);
var allocating = std.Io.Writer.Allocating.init(allocator);
errdefer allocating.deinit();

while (cursor < data.len) {
if (data.len - cursor < 4) return FrameError.UnexpectedEof;
Expand Down Expand Up @@ -254,13 +256,11 @@ fn decodeFramed(allocator: Allocator, data: []const u8) ![]u8 {
saw_stream_identifier = true;
},
.compressed => {
const writer = output.writer(allocator);
try writeCompressedChunk(allocator, writer, chunk_data);
try writeCompressedChunk(allocator, &allocating.writer, chunk_data);
saw_data_chunk = true;
},
.uncompressed => {
const writer = output.writer(allocator);
try writeUncompressedChunk(writer, chunk_data);
try writeUncompressedChunk(&allocating.writer, chunk_data);
saw_data_chunk = true;
},
}
Expand All @@ -276,13 +276,13 @@ fn decodeFramed(allocator: Allocator, data: []const u8) ![]u8 {
return FrameError.UnsupportedUnskippableChunkType;
}

// A stream that contained the identifier but no data chunks is a valid
// A stream that contains the identifier but no data chunks is a valid
// empty payload per the Snappy framing spec. Only treat the input as
// unframed when neither was seen.
// unframed when neither was seen — see `decodeFromReader` for the
// full rationale.
if (!saw_stream_identifier and !saw_data_chunk) return FrameError.NotFramed;

// Some producers may omit the identifier. Only enforce when data present with mismatched chunk.
return output.toOwnedSlice(allocator);
return allocating.toOwnedSlice();
}

fn ensureStreamIdentifier(chunk_payload: []const u8) !void {
Expand All @@ -292,15 +292,15 @@ fn ensureStreamIdentifier(chunk_payload: []const u8) !void {
}
}

fn writeUncompressedChunk(writer: anytype, chunk_payload: []const u8) !void {
fn writeUncompressedChunk(writer: *std.Io.Writer, chunk_payload: []const u8) !void {
if (chunk_payload.len < 4) return FrameError.UnexpectedEof;
const expected_checksum = readU32le(chunk_payload[0..4]);
const raw_payload = chunk_payload[4..];
try validateChecksum(raw_payload, expected_checksum);
try writer.writeAll(raw_payload);
}

fn writeCompressedChunk(allocator: Allocator, writer: anytype, chunk_payload: []const u8) !void {
fn writeCompressedChunk(allocator: Allocator, writer: *std.Io.Writer, chunk_payload: []const u8) !void {
if (chunk_payload.len < 4) return FrameError.UnexpectedEof;
const expected_checksum = readU32le(chunk_payload[0..4]);
const compressed_payload = chunk_payload[4..];
Expand All @@ -310,7 +310,7 @@ fn writeCompressedChunk(allocator: Allocator, writer: anytype, chunk_payload: []
try writer.writeAll(decoded);
}

fn writeChunkHeader(writer: anytype, chunk_type: ChunkType, payload_len: usize) !void {
fn writeChunkHeader(writer: *std.Io.Writer, chunk_type: ChunkType, payload_len: usize) !void {
if (payload_len > max_chunk_len) return FrameError.ChunkTooLarge;
const chunk_type_byte: u8 = chunk_type.toByte();
const byte0: u8 = @intCast(payload_len & 0xff);
Expand Down Expand Up @@ -355,31 +355,24 @@ fn crc32c(data: []const u8) u32 {
return std.hash.crc.Crc32Iscsi.hash(data);
}

fn readExact(reader: anytype, buffer: []u8) !void {
fn readExact(reader: *std.Io.Reader, buffer: []u8) !void {
var index: usize = 0;
while (index < buffer.len) {
const read_len = try reader.read(buffer[index..]);
const read_len = try reader.readSliceShort(buffer[index..]);
if (read_len == 0) return FrameError.UnexpectedEof;
index += read_len;
}
}

fn readByte(reader: anytype) !?u8 {
fn readByte(reader: *std.Io.Reader) !?u8 {
var byte: [1]u8 = undefined;
const read_len = try reader.read(&byte);
const read_len = try reader.readSliceShort(&byte);
if (read_len == 0) return null;
return byte[0];
}

fn loadFileAlloc(allocator: Allocator, path: []const u8) ![]u8 {
var file = try std.fs.cwd().openFile(path, .{});
defer file.close();

const stat = try file.stat();
const buf = try allocator.alloc(u8, stat.size);
const read_len = try file.readAll(buf);
std.debug.assert(read_len == stat.size);
return buf;
return std.Io.Dir.cwd().readFileAlloc(std.testing.io, path, allocator, .unlimited);
}

const go_writer_golden_frame =
Expand Down Expand Up @@ -412,44 +405,45 @@ test "encodeToWriter matches encode" {
const direct = try encode(allocator, sample);
defer allocator.free(direct);

var reader_stream = std.io.fixedBufferStream(sample);
var encoded_buffer = std.ArrayListUnmanaged(u8).empty;
defer encoded_buffer.deinit(allocator);
var reader_stream: std.Io.Reader = .fixed(sample);
var encoded_buffer = std.Io.Writer.Allocating.init(allocator);
defer encoded_buffer.deinit();

try encodeToWriter(allocator, reader_stream.reader(), encoded_buffer.writer(allocator));
try encodeToWriter(allocator, &reader_stream, &encoded_buffer.writer);

try std.testing.expectEqualSlices(u8, direct, encoded_buffer.items);
try std.testing.expectEqualSlices(u8, direct, encoded_buffer.written());
}

test "FrameEncoder manual streaming API" {
const allocator = std.testing.allocator;
const parts = [_][]const u8{ "frame-", "encoder-", "stream" };

var encoder = FrameEncoder.init(allocator);
var encoded = std.ArrayListUnmanaged(u8).empty;
defer encoded.deinit(allocator);
var encoded = std.Io.Writer.Allocating.init(allocator);
defer encoded.deinit();

var i: usize = 0;
while (i < parts.len) : (i += 1) {
try encoder.writeChunk(encoded.writer(allocator), parts[i]);
try encoder.writeChunk(&encoded.writer, parts[i]);
}

try encoder.finish(encoded.writer(allocator));
try encoder.finish(&encoded.writer);

var combined_builder = std.ArrayListUnmanaged(u8).empty;
var combined_builder: std.ArrayList(u8) = .empty;
defer combined_builder.deinit(allocator);
for (parts) |segment| {
try combined_builder.appendSlice(allocator, segment);
}
const combined = try combined_builder.toOwnedSlice(allocator);
defer allocator.free(combined);

const decoded_manual = try decode(allocator, encoded.items);
const encoded_bytes = encoded.written();
const decoded_manual = try decode(allocator, encoded_bytes);
defer allocator.free(decoded_manual);

try std.testing.expectEqualSlices(u8, combined, decoded_manual);

try std.testing.expect(std.mem.startsWith(u8, encoded.items, stream_identifier));
try std.testing.expect(std.mem.startsWith(u8, encoded_bytes, stream_identifier));
}

test "decodeFromReader matches decode" {
Expand All @@ -459,13 +453,13 @@ test "decodeFromReader matches decode" {
const encoded = try encode(allocator, sample);
defer allocator.free(encoded);

var reader_stream = std.io.fixedBufferStream(encoded);
var decoded_buffer = std.ArrayListUnmanaged(u8).empty;
defer decoded_buffer.deinit(allocator);
var reader_stream: std.Io.Reader = .fixed(encoded);
var decoded_buffer = std.Io.Writer.Allocating.init(allocator);
defer decoded_buffer.deinit();

try decodeFromReader(allocator, reader_stream.reader(), decoded_buffer.writer(allocator));
try decodeFromReader(allocator, &reader_stream, &decoded_buffer.writer);

try std.testing.expectEqualSlices(u8, sample, decoded_buffer.items);
try std.testing.expectEqualSlices(u8, sample, decoded_buffer.written());
}

test "frame roundtrip samples" {
Expand Down Expand Up @@ -532,33 +526,6 @@ test "decode falls back to raw snappy payloads" {
try std.testing.expectEqualSlices(u8, sample, decoded);
}

test "decode accepts identifier-only stream as empty payload" {
// A frame containing only the stream identifier (no data chunks) is a
// valid empty payload per the Snappy framing spec. Go's snappy.NewReader
// and Rust's snap::read::FrameDecoder both decode this 10-byte input to
// an empty slice; cross-client interop fixtures (e.g. leanSpec) emit
// exactly this representation for empty input.
const allocator = std.testing.allocator;
const identifier_only = "\xff\x06\x00\x00sNaPpY";

const decoded = try decode(allocator, identifier_only);
defer allocator.free(decoded);

try std.testing.expectEqual(@as(usize, 0), decoded.len);
}

test "decodeFromReader accepts identifier-only stream as empty payload" {
const allocator = std.testing.allocator;
const identifier_only = "\xff\x06\x00\x00sNaPpY";

var input_stream = std.io.fixedBufferStream(identifier_only);
var output = std.ArrayListUnmanaged(u8).empty;
defer output.deinit(allocator);

try decodeFromReader(allocator, input_stream.reader(), output.writer(allocator));
try std.testing.expectEqual(@as(usize, 0), output.items.len);
}

test "decode rejects invalid stream identifier" {
const allocator = std.testing.allocator;
const sample = "identifier";
Expand Down Expand Up @@ -643,3 +610,34 @@ test "encode compatibility with rust snappy frame alice29" {

try std.testing.expectEqualSlices(u8, expected_frame, encoded);
}

test "decode accepts identifier-only stream as empty payload" {
// Canonical 10-byte "empty" Snappy framed stream: stream identifier
// chunk only, no data chunks. Go's `snappy.NewReader` and Rust's
// `snap::read::FrameDecoder` both decode this to an empty slice;
// accepting it here makes the decoder interoperable with peer
// implementations and with leanSpec's `test_snappy_frame_empty`
// fixture. The existing "frame roundtrip samples" test already covered
// round-tripping "" through the lib's own encoder, but the encoder
// appends an empty data chunk in finish(), which masked the gap on the
// decode side.
const allocator = std.testing.allocator;
const identifier_only = "\xff\x06\x00\x00sNaPpY";

const decoded = try decode(allocator, identifier_only);
defer allocator.free(decoded);

try std.testing.expectEqual(@as(usize, 0), decoded.len);
}

test "decodeFromReader accepts identifier-only stream as empty payload" {
const allocator = std.testing.allocator;
const identifier_only = "\xff\x06\x00\x00sNaPpY";

var reader_stream: std.Io.Reader = .fixed(identifier_only);
var decoded_buffer = std.Io.Writer.Allocating.init(allocator);
defer decoded_buffer.deinit();

try decodeFromReader(allocator, &reader_stream, &decoded_buffer.writer);
try std.testing.expectEqual(@as(usize, 0), decoded_buffer.written().len);
}
Loading