API Reference
All public types are exported from src/root.zig and accessible via
@import("zigbolt").
Table of Contents
- Platform
- Core Data Structures
- Wire Codec
- IPC Channel
- UDP Channel
- Network Channel
- Reliability Protocol
- Fragment Layer
- Publisher / Subscriber API
- Transport
- Archive
- Sequencer
- Cluster (Raft Consensus)
- Write-Ahead Log
- Snapshots
- SBE Codec
- FIX Messages
- Wire Protocol Flyweights
- Broadcast Buffer
- Idle Strategies
- Agent Pattern
- Counters
- Congestion Control
- Flow Control
- Archive Catalog
- Archive Index
- Compression
- FFI Exports
Platform
platform.config

```zig
const cache_line_size: usize; // 128 on modern CPUs
const page_size: usize; // 4096
const is_linux: bool;
const is_macos: bool;
const supports_hugepages: bool; // true on Linux
const supports_io_uring: bool; // true on Linux
const frame_alignment: u32 = 8;
const default_term_length: usize; // 1 << 20 (1 MB)
const default_ring_capacity: usize; // 1 << 16 (64K)

fn timestampNs() u64; // nanosecond timestamp
fn alignUp(size: u32, alignment: u32) u32;
```

platform.memory

```zig
const SharedRegion = struct {
    base: [*]align(std.heap.page_size_min) u8,
    len: usize,
    fd: ?posix.fd_t,
    // Owned copy of the shm name (creator side only; deinit unlinks it).
    name_buf: [max_shm_name_len:0]u8,
    name_len: usize,

    fn ptrAt(self: SharedRegion, comptime T: type, byte_offset: usize) *align(cache_line_size) T;
    fn sliceAt(self: SharedRegion, byte_offset: usize, len: usize) []u8;
    fn deinit(self: *SharedRegion) void; // munmap + close + unlink (if owner)
};

const MemoryConfig = struct {
    use_hugepages: bool = false, // Linux, anonymous regions only (best-effort)
    pre_fault: bool = true,
    mlock: bool = true, // lock pages in RAM (prevent swapping)
};

fn createShared(name: [*:0]const u8, size: usize, config: MemoryConfig) !SharedRegion;
fn openShared(name: [*:0]const u8, size: usize) !SharedRegion;
fn createAnonymous(size: usize, config: MemoryConfig) !SharedRegion;
fn prefault(region: SharedRegion) void;
```

Core Data Structures
SpscRingBuffer(comptime capacity: usize)

Lock-free single-producer single-consumer ring buffer. capacity must be a
power of 2.

```zig
const RB = zigbolt.SpscRingBuffer(1024);
var rb = RB.init();
```

| Method | Signature | Description |
|---|---|---|
init | fn init() Self | Create a zeroed ring buffer |
write | fn write(self: *Self, data: []const u8, msg_type_id: i32) bool | Write a framed message. Returns false if full. |
read | fn read(self: *Self) ?ReadResult | Read the next message. Returns null if empty. |
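
A common reason ring buffers require a power-of-two capacity is that an ever-increasing position can be wrapped with a bit mask instead of a modulo. The sketch below illustrates only that idea. `wrapIndex` is a hypothetical helper, and this reference does not confirm that zigbolt wraps positions this way.

```zig
const std = @import("std");

/// Hypothetical helper: map an ever-increasing position onto a slot index.
/// Non-power-of-two capacities are rejected at compile time.
fn wrapIndex(comptime capacity: usize, position: usize) usize {
    comptime {
        if (!std.math.isPowerOfTwo(capacity)) {
            @compileError("capacity must be a power of 2");
        }
    }
    // For power-of-two capacities this equals `position % capacity`.
    return position & (capacity - 1);
}

pub fn main() void {
    // Position 1030 in a 1024-slot ring lands on slot 6.
    std.debug.print("{d}\n", .{wrapIndex(1024, 1030)});
}
```

Checking the capacity at compile time means a wrong value fails the build rather than corrupting indices at run time.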
ReadResult:
```zig
pub const ReadResult = struct {
    data: []const u8,
    msg_type_id: i32,
};
```

MpscRingBuffer(comptime capacity: usize)

Lock-free multi-producer single-consumer ring buffer using CAS.
capacity must be a power of 2.

```zig
const RB = zigbolt.MpscRingBuffer(1024);
var rb = RB.init();
```

| Method | Signature | Description |
|---|---|---|
init | fn init() Self | Create a zeroed ring buffer |
write | fn write(self: *Self, data: []const u8, msg_type_id: i32) bool | Thread-safe write via CAS. Returns false if full. |
read | fn read(self: *Self) ?ReadResult | Single-consumer read. Returns null if empty or uncommitted. |
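
To make the "write via CAS" idea concrete, here is a minimal sketch of how several producers can reserve disjoint regions by advancing a shared tail with compare-and-swap. It assumes Zig 0.12 or later (`std.atomic.Value`). `tryClaim` and its parameters are hypothetical and are not taken from the zigbolt source.

```zig
const std = @import("std");

/// Hypothetical helper: reserve `len` bytes for one producer by advancing a
/// shared tail position with compare-and-swap. Returns the claimed start
/// position, or null when the claim would overrun unread data.
/// Assumes head <= tail, as in any ring where the consumer trails producers.
fn tryClaim(tail: *std.atomic.Value(u64), head: u64, capacity: u64, len: u64) ?u64 {
    var current = tail.load(.monotonic);
    while (true) {
        if (current - head + len > capacity) return null; // ring is full
        if (tail.cmpxchgWeak(current, current + len, .acq_rel, .monotonic)) |actual| {
            current = actual; // lost the race (or spurious failure): retry
        } else {
            return current; // this producer now owns [current, current + len)
        }
    }
}

pub fn main() void {
    var tail = std.atomic.Value(u64).init(0);
    const first = tryClaim(&tail, 0, 16, 8);
    const second = tryClaim(&tail, 0, 16, 8);
    const third = tryClaim(&tail, 0, 16, 8); // no room left
    std.debug.print("{any} {any} {any}\n", .{ first, second, third });
}
```

A claimed region is not yet readable. The table's note that `read` returns null for uncommitted data suggests a separate commit step makes the frame visible to the consumer.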
LogBuffer(comptime cfg: LogBufferConfig)
Aeron-style triple-buffered log with term rotation.

```zig
const Buf = zigbolt.LogBuffer(.{ .term_length = 1 << 20 });
var buf = Buf.init();
```

LogBufferConfig:

```zig
pub const LogBufferConfig = struct {
    term_length: usize = 1 << 20, // must be power of 2
};
```

| Method | Signature | Description |
|---|---|---|
init | fn init() Self | Create a zeroed log buffer |
claim | fn claim(self: *Self, length: u32) ?Claim | Claim space for a message. Returns null if consumer is too far behind. |
commit | fn commit(self: *Self, c: Claim, msg_type_id: i32) void | Commit a claimed frame, making it visible to readers. |
read | fn read(self: *Self, handler: *const fn([]const u8, i32) void, limit: u32) u32 | Read committed frames, calling handler for each. Returns count. |
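
The claim/commit split can be illustrated with a toy single-term log. Space is reserved first, and the frame becomes visible only when a positive `frame_length` is written. This is a sketch under assumptions: `MiniLog` is hypothetical, it is single-threaded, and it assumes `frame_length` counts the 8-byte header plus the payload.

```zig
const std = @import("std");

const header_size: u32 = 8; // mirrors FrameHeader.SIZE
const frame_alignment: u32 = 8; // mirrors platform.config.frame_alignment

/// Hypothetical single-term log, used only to illustrate claim/commit.
const MiniLog = struct {
    buf: [256]u8 align(8) = [_]u8{0} ** 256,
    tail: u32 = 0,

    /// Pointer to the frame_length field of the frame starting at `offset`.
    fn frameLength(self: *MiniLog, offset: u32) *i32 {
        return @ptrCast(@alignCast(&self.buf[offset]));
    }

    /// Reserve aligned space for a payload; returns the frame offset or null if full.
    fn claim(self: *MiniLog, length: u32) ?u32 {
        const aligned = (header_size + length + frame_alignment - 1) & ~(frame_alignment - 1);
        if (self.tail + aligned > self.buf.len) return null;
        const offset = self.tail;
        self.tail += aligned;
        return offset; // frame_length is still 0, so readers see "uncommitted"
    }

    /// Publish the frame by writing a positive frame_length last.
    /// A concurrent implementation would need a release store here.
    fn commit(self: *MiniLog, offset: u32, length: u32) void {
        self.frameLength(offset).* = @intCast(header_size + length);
    }
};

pub fn main() void {
    var log = MiniLog{};
    const offset = log.claim(5).?;
    std.debug.print("before commit: {d}\n", .{log.frameLength(offset).*});
    log.commit(offset, 5);
    std.debug.print("after commit: {d}\n", .{log.frameLength(offset).*});
}
```

Writing the length last lets a reader that polls `frame_length` skip a partially written frame without taking a lock.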
Claim:
```zig
pub const Claim = struct {
    term_buffer: [*]u8,
    term_offset: u32,
    length: u32,
    term_id: u32,
};
```

FrameHeader

```zig
pub const FrameHeader = extern struct {
    frame_length: i32 = 0, // >0: data, <0: padding, =0: uncommitted
    msg_type_id: i32 = 0,

    pub const SIZE: u32 = 8;
};
```

Frame Helpers

```zig
fn alignedFrameLength(payload_length: u32) u32;
fn isPaddingFrame(frame_length: i32) bool;
fn isDataFrame(frame_length: i32) bool;
fn isUncommitted(frame_length: i32) bool;
const MAX_PAYLOAD_SIZE: u32 = 1 << 24; // 16 MB
```

Wire Codec

WireCodec(comptime T: type)

Comptime-generated zero-copy codec for packed structs. T must be a packed struct
with no pointer or slice fields. Wire size must be a multiple of 8 bytes.

```zig
const Codec = zigbolt.WireCodec(zigbolt.TickMessage);
```

| Member | Type | Description |
|---|---|---|
wire_size | usize | Size of the wire representation in bytes |
Type | type | The underlying message type |
| Method | Signature | Description |
|---|---|---|
encode | fn encode(msg: *const T, buf: []u8) void | Copy message bytes into buffer |
decode | fn decode(buf: []const u8) *align(1) const T | Zero-copy: returns pointer into buffer. Precondition: buf.len >= wire_size (length-check first on the hot path) |
decodeChecked | fn decodeChecked(buf: []const u8) error{Truncated}!*align(1) const T | Checked zero-copy decode for untrusted input |
decodeMut | fn decodeMut(buf: []u8) *align(1) T | Mutable zero-copy decode |
batchDecode | fn batchDecode(buf: []const u8, out: []T) u32 | Decode multiple messages |
batchEncode | fn batchEncode(msgs: []const T, buf: []u8) u32 | Encode multiple messages |
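
The difference between `decode` and `decodeChecked` comes down to a length check before the bytes are reinterpreted in place. The following is a minimal sketch of that pattern, not the library's implementation. `Quote` is a hypothetical 8-byte message standing in for a user-defined packed struct.

```zig
const std = @import("std");

/// Hypothetical 8-byte message standing in for a user-defined packed struct.
const Quote = packed struct { symbol_id: u32, size: u32 };

/// Length-check first, then reinterpret the bytes in place (no copy).
/// The align(1) pointer allows the view to start at any byte offset.
fn decodeChecked(buf: []const u8) error{Truncated}!*align(1) const Quote {
    if (buf.len < @sizeOf(Quote)) return error.Truncated;
    return @ptrCast(buf.ptr);
}

pub fn main() !void {
    const original = Quote{ .symbol_id = 7, .size = 100 };
    const bytes = std.mem.asBytes(&original); // native-endian wire image
    const view = try decodeChecked(bytes);
    std.debug.print("{d} {d}\n", .{ view.symbol_id, view.size });
}
```

The returned pointer aliases the input buffer, so the view is valid only while that buffer is alive and unmodified.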
Built-in Message Types
TickMessage (32 bytes):

```zig
pub const TickMessage = packed struct {
    timestamp_ns: u64,
    symbol_id: u32,
    price: i64,
    volume: u64,
    side: enum(u8) { bid = 0, ask = 1 },
    _padding: u24 = 0,
};
```

OrderMessage (48 bytes):

```zig
pub const OrderMessage = packed struct {
    timestamp_ns: u64,
    order_id: u64,
    symbol_id: u32,
    price: i64,
    quantity: u64,
    side: enum(u8) { buy = 0, sell = 1 },
    order_type: enum(u8) { limit = 0, market = 1, cancel = 2 },
    _padding: u16 = 0,
};
```

IPC Channel

IpcConfig

```zig
pub const IpcConfig = struct {
    term_length: usize = default_term_length, // power of 2
    use_hugepages: bool = false, // Linux only
    pre_fault: bool = true, // pre-fault pages
};
```

IpcChannel

Shared-memory IPC channel. SPSC: one publisher, one subscriber.

| Method | Signature | Description |
|---|---|---|
create | fn create(name: [*:0]const u8, config: IpcConfig) !IpcChannel | Create a new channel (publisher side) |
open | fn open(name: [*:0]const u8, config: IpcConfig) !IpcChannel | Open an existing channel (subscriber side) |
publish | fn publish(self: *IpcChannel, data: []const u8, msg_type_id: i32) !void | Publish a message |
poll | fn poll(self: *IpcChannel, handler: *const fn(ReadResult) void, limit: u32) u32 | Poll for messages. Returns count. |
deinit | fn deinit(self: *IpcChannel) void | Close and release resources |
ReadResult:
```zig
pub const ReadResult = struct {
    data: []const u8,
    msg_type_id: i32,
};
```

Errors:

- error.InvalidChannel: magic number mismatch on open
- error.UnsupportedVersion: protocol version mismatch
- error.InvalidTermLength / error.TermLengthMismatch / error.ShmTooSmall: bad or inconsistent term length on create/open
- error.MessageTooLarge: payload exceeds the maximum frame payload
- error.BackPressure: publish would overrun the unconsumed window
- error.CorruptChannel: shared positions failed validation

UDP Channel

UdpConfig

```zig
pub const UdpConfig = struct {
    bind_address: std.net.Address,
    remote_address: ?std.net.Address = null,
    multicast_group: ?[4]u8 = null,
    multicast_ttl: ?u8 = null, // null keeps the OS default (1)
    multicast_loop: ?bool = null, // null keeps the OS default (enabled)
    send_buffer_size: u32 = 2 * 1024 * 1024, // 2 MB
    recv_buffer_size: u32 = 2 * 1024 * 1024, // 2 MB
    non_blocking: bool = true,
};
```

UdpChannel

UDP unicast and multicast channel.

| Method | Signature | Description |
|---|---|---|
init | fn init(config: UdpConfig) !UdpChannel | Create and bind a UDP socket |
deinit | fn deinit(self: *UdpChannel) void | Close the socket |
send | fn send(self: *UdpChannel, data: []const u8, dest: ?net.Address) !usize | Send a raw datagram |
recv | fn recv(self: *UdpChannel, buf: []u8) !?RecvResult | Receive a raw datagram (non-blocking) |
sendFrame | fn sendFrame(self: *UdpChannel, data: []const u8, msg_type_id: i32, dest: ?net.Address) !void | Send a framed message (FrameHeader + payload) |
recvFrame | fn recvFrame(self: *UdpChannel, buf: []u8) !?FrameRecvResult | Receive and parse a framed message |
RecvResult:
```zig
pub const RecvResult = struct {
    data: []const u8,
    from: std.net.Address,
};
```

FrameRecvResult:

```zig
pub const FrameRecvResult = struct {
    payload: []const u8,
    msg_type_id: i32,
    from: std.net.Address,
};
```

Network Channel

NetworkConfig

```zig
pub const NetworkConfig = struct {
    udp: UdpConfig,
    session_id: u32 = 1,
    stream_id: u32 = 1,
    send_buffer_capacity: usize = 4096,
    recv_window_size: u64 = 4096,
    flow_control_window: i64 = 4 * 1024 * 1024, // 4 MB
    mtu: u32 = 1472,
    max_message_size: u32 = 1 << 20,
    heartbeat_interval_ns: u64 = 100_000_000, // 100 ms
    nak_delay_ns: u64 = 1_000_000, // 1 ms
    expected_peer: ?std.net.Address = null, // drop datagrams from other sources
    max_retransmits_per_interval: u32 = 1024, // NAK-amplification defence
    retransmit_interval_ns: u64 = 10_000_000, // 10 ms
};
```

NetworkChannel

Reliable, ordered network channel. Combines UDP, NAK reliability, flow control, and fragmentation.

| Method | Signature | Description |
|---|---|---|
init | fn init(allocator: Allocator, config: NetworkConfig) !NetworkChannel | Initialize all sub-components |
deinit | fn deinit(self: *NetworkChannel) void | Release all resources |
publish | fn publish(self: *NetworkChannel, data: []const u8, msg_type_id: i32) !void | Publish with reliability and flow control |
poll | fn poll(self: *NetworkChannel, handler: *const fn([]const u8) void, limit: u32) !u32 | Poll for complete messages |
Errors:
- error.BackPressured: flow control window exhausted

Reliability Protocol

NetworkHeader

```zig
pub const NetworkHeader = extern struct {
    version: u8 = 1,
    header_type: HeaderType,
    session_id: u32,
    stream_id: u32,
    sequence: u64,
    payload_length: u32,
    _reserved: [3]u8 = .{0, 0, 0},

    pub const HeaderType = enum(u8) { data, nak, heartbeat, setup, teardown };
    pub const SIZE: usize;
};
```

NakMessage

```zig
pub const NakMessage = extern struct {
    session_id: u32,
    stream_id: u32,
    from_sequence: u64,
    count: u32,
    _padding: [4]u8,
};
```

SendBuffer

Stores sent payloads for retransmission on NAK.

| Method | Signature | Description |
|---|---|---|
init | fn init(allocator: Allocator, capacity: usize) !SendBuffer | Allocate entry ring |
deinit | fn deinit(self: *SendBuffer, allocator: Allocator) void | Free all entries |
store | fn store(self: *SendBuffer, sequence: u64, data: []const u8, allocator: Allocator) !void | Store a copy for retransmit |
get | fn get(self: *SendBuffer, sequence: u64) ?*SendEntry | Look up by sequence |
release | fn release(self: *SendBuffer, up_to_sequence: u64) void | Release acknowledged entries |
RecvTracker
Bitmap-based gap detection.

| Method | Signature | Description |
|---|---|---|
init | fn init(allocator: Allocator, window_size: u64) !RecvTracker | Allocate bitmap |
deinit | fn deinit(self: *RecvTracker) void | Free bitmap |
recordReceived | fn recordReceived(self: *RecvTracker, sequence: u64) ?GapInfo | Record a sequence, return gap if detected |
getMissing | fn getMissing(self: *RecvTracker, allocator: Allocator) ![]u64 | List all missing sequences in window |
slideWindow | fn slideWindow(self: *RecvTracker, new_base: u64) void | Advance the window forward |
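
As an illustration of bitmap-based gap detection, the sketch below tracks a fixed 64-sequence window in one `u64` and reports a gap when a sequence arrives ahead of the next expected one. `MiniTracker` and the shape of its `GapInfo` are hypothetical. The real `RecvTracker` allocates its bitmap and may differ in every detail.

```zig
const std = @import("std");

const GapInfo = struct { from_sequence: u64, count: u64 };

/// Hypothetical 64-sequence tracker; bit i set means `base + i` was received.
const MiniTracker = struct {
    base: u64 = 0,
    next_expected: u64 = 0, // one past the highest sequence seen so far
    bits: u64 = 0,

    fn recordReceived(self: *MiniTracker, sequence: u64) ?GapInfo {
        if (sequence < self.base or sequence - self.base >= 64) return null; // outside window
        const shift: u6 = @intCast(sequence - self.base);
        self.bits |= @as(u64, 1) << shift;

        if (sequence < self.next_expected) return null; // duplicate or gap fill
        const gap: ?GapInfo = if (sequence > self.next_expected)
            GapInfo{ .from_sequence = self.next_expected, .count = sequence - self.next_expected }
        else
            null;
        self.next_expected = sequence + 1;
        return gap;
    }

    fn isMissing(self: *const MiniTracker, sequence: u64) bool {
        if (sequence < self.base or sequence >= self.next_expected) return false;
        const shift: u6 = @intCast(sequence - self.base);
        return (self.bits >> shift) & 1 == 0;
    }
};

pub fn main() void {
    var tracker = MiniTracker{};
    _ = tracker.recordReceived(0);
    _ = tracker.recordReceived(1);
    if (tracker.recordReceived(4)) |gap| {
        // Sequences 2 and 3 were skipped; a receiver would NAK this range.
        std.debug.print("gap from {d}, count {d}\n", .{ gap.from_sequence, gap.count });
    }
}
```

The `from_sequence` and `count` pair matches the fields of `NakMessage`, which is why a detected gap maps naturally onto a NAK request.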
FlowControl
Credit-based flow control.

| Method | Signature | Description |
|---|---|---|
init | fn init(window_size: i64) FlowControl | Initialize with credit window |
tryConsume | fn tryConsume(self: *FlowControl, bytes: usize) bool | Atomically consume credits |
replenish | fn replenish(self: *FlowControl, bytes: usize) void | Add credits back |
available | fn available(self: *FlowControl) i64 | Current available credits |
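
A credit window of this shape can be sketched with a single atomic counter: a sender subtracts optimistically and restores the credit if the window was too small. This assumes Zig 0.12 or later. `Credits` is a hypothetical stand-in whose method names mirror the table, and the actual `FlowControl` may use a different scheme.

```zig
const std = @import("std");

/// Hypothetical credit counter; method names mirror the table above.
const Credits = struct {
    available_bytes: std.atomic.Value(i64),

    fn init(window_size: i64) Credits {
        return .{ .available_bytes = std.atomic.Value(i64).init(window_size) };
    }

    /// Optimistically subtract, then undo if there was not enough credit.
    /// The counter can dip below zero briefly, which a signed type tolerates.
    fn tryConsume(self: *Credits, bytes: usize) bool {
        const n: i64 = @intCast(bytes);
        const previous = self.available_bytes.fetchSub(n, .acq_rel);
        if (previous < n) {
            _ = self.available_bytes.fetchAdd(n, .monotonic);
            return false;
        }
        return true;
    }

    fn replenish(self: *Credits, bytes: usize) void {
        _ = self.available_bytes.fetchAdd(@intCast(bytes), .release);
    }

    fn available(self: *const Credits) i64 {
        return self.available_bytes.load(.acquire);
    }
};

pub fn main() void {
    var credits = Credits.init(10);
    const first = credits.tryConsume(6); // fits in the window
    const second = credits.tryConsume(6); // would exceed it
    std.debug.print("{} {} {d}\n", .{ first, second, credits.available() });
}
```

A caller that gets `false` back would typically surface back-pressure to the publisher rather than block.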
Fragment Layer
Fragmenter

Splits large messages into MTU-sized fragments.
Reassembler
Collects fragments and delivers complete messages.
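
The number of fragments a message needs follows from a ceiling division of its length by the payload each datagram can carry. The sketch below shows that arithmetic only. `per_fragment_overhead` is an assumed header size chosen for illustration, since this reference does not state how many bytes of each datagram the fragment headers occupy.

```zig
const std = @import("std");

/// Number of fragments needed when each datagram carries at most
/// `mtu - per_fragment_overhead` payload bytes.
/// `per_fragment_overhead` is an assumed value, not a zigbolt constant.
fn fragmentCount(message_len: u32, mtu: u32, per_fragment_overhead: u32) u32 {
    std.debug.assert(mtu > per_fragment_overhead);
    const chunk = mtu - per_fragment_overhead;
    return (message_len + chunk - 1) / chunk; // ceiling division
}

pub fn main() void {
    // A 1 MiB message over a 1472-byte MTU, assuming 32 bytes of headers per fragment.
    std.debug.print("{d}\n", .{fragmentCount(1 << 20, 1472, 32)});
}
```

With those assumed numbers each fragment carries 1440 bytes, so a maximum-size message spans 729 datagrams. Losing any one of them requires a retransmit before the reassembler can deliver the message.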
FragmentConfig
```zig
pub const FragmentConfig = struct {
    mtu: u32 = 1472,
    max_message_size: u32 = 1 << 20,
};
```

Publisher / Subscriber API

Publisher(comptime MsgType: type)

Typed publisher using WireCodec(MsgType) over IPC.

```zig
var publisher = zigbolt.Publisher(TickMessage).init(&channel, 1);
try publisher.offer(&tick_msg);
```

| Method | Signature | Description |
|---|---|---|
init | fn init(channel: *IpcChannel, msg_type_id: i32) Self | Bind to a channel |
offer | fn offer(self: *Self, msg: *const MsgType) !void | Publish a typed message |
tryOffer | fn tryOffer(self: *Self, msg: *const MsgType) !bool | Returns false on back-pressure; other failures (MessageTooLarge, CorruptChannel) surface as errors |
offerRaw | fn offerRaw(self: *Self, data: []const u8) !void | Publish pre-encoded bytes |
RawPublisher
Untyped publisher for raw byte messages.

| Method | Signature | Description |
|---|---|---|
init | fn init(channel: *IpcChannel, msg_type_id: i32) RawPublisher | Bind to a channel |
offer | fn offer(self: *RawPublisher, data: []const u8) !void | Publish raw bytes |
Subscriber(comptime MsgType: type)
Typed subscriber using WireCodec(MsgType) over IPC.

```zig
var sub = zigbolt.Subscriber(TickMessage).init(&channel, 1);
_ = sub.poll(&handleTick, 100);
```

| Method | Signature | Description |
|---|---|---|
init | fn init(channel: *IpcChannel, msg_type_id: i32) Self | Bind to a channel |
poll | fn poll(self: *Self, handler: *const fn(*align(1) const MsgType) void, limit: u32) u32 | Poll and decode. Only frames matching msg_type_id (and at least wire_size bytes) are delivered; non-matching/short frames are consumed and skipped. Returns the number delivered |
pollRaw | fn pollRaw(self: *Self, handler: *const fn(IpcChannel.ReadResult) void, limit: u32) u32 | Poll raw frames (no type filter) |
RawSubscriber
Untyped subscriber for raw byte messages.

| Method | Signature | Description |
|---|---|---|
init | fn init(channel: *IpcChannel) RawSubscriber | Bind to a channel |
poll | fn poll(self: *RawSubscriber, handler: *const fn(IpcChannel.ReadResult) void, limit: u32) u32 | Poll raw frames |
Transport
TransportConfig

```zig
pub const TransportConfig = struct {
    term_length: usize = 1 << 20,
    use_hugepages: bool = false,
    pre_fault: bool = true,
};
```

Transport

Main entry point. Manages IPC channels and creates typed publishers/subscribers.

| Method | Signature | Description |
|---|---|---|
init | fn init(allocator: Allocator, config: TransportConfig) Transport | Create a transport |
deinit | fn deinit(self: *Transport) void | Shut down all channels |
addPublication | fn addPublication(self, comptime MsgType, name: [:0]const u8, msg_type_id: i32) !Publisher(MsgType) | Create a typed publisher |
addSubscription | fn addSubscription(self, comptime MsgType, name: [:0]const u8, msg_type_id: i32) !Subscriber(MsgType) | Create a typed subscriber |
addRawPublication | fn addRawPublication(self, name: [:0]const u8, msg_type_id: i32) !RawPublisher | Create a raw publisher |
addRawSubscription | fn addRawSubscription(self, name: [:0]const u8) !RawSubscriber | Create a raw subscriber |
Archive
ArchiveConfig

```zig
pub const ArchiveConfig = struct {
    segment_size: usize = 256 * 1024 * 1024, // 256 MB
    base_path: []const u8 = "/tmp/zigbolt/archive",
    sync_policy: SyncPolicy = .periodic,
    sync_interval_ms: u32 = 1000,

    pub const SyncPolicy = enum { none, periodic, every_segment };
};
```

(Compression is a standalone module, described under Compression, and is not yet wired into the archive recording path.)

Archive

Segment-based message recording and replay.

| Method | Signature | Description |
|---|---|---|
init | fn init(allocator: Allocator, config: ArchiveConfig) !Archive | Initialize archive |
deinit | fn deinit(self: *Archive) void | Release resources |
record | fn record(self: *Archive, stream_id: u32, msg_type_id: i32, data: []const u8, timestamp_ns: u64) !void | Record a message |
replay | fn replay(self: *Archive, params: ReplayParams, handler: *const fn(Record) void) !u64 | Replay messages. Returns count. |
stats | fn stats(self: *const Archive) Stats | Get archive statistics |
ReplayParams:
```zig
pub const ReplayParams = struct {
    stream_id: ?u32 = null, // null = all streams
    from_segment: u64 = 0,
    from_offset: u64 = 0,
    limit: ?u64 = null,
};
```

Stats:

```zig
pub const Stats = struct {
    total_records: u64,
    total_bytes: u64,
    segment_count: u64,
};
```

Sequencer

Sequencer

Atomic total-order sequence assignment.

```zig
var seq = zigbolt.Sequencer.init(.{ .initial_sequence = 0 });
const event = seq.sequence(stream_id, payload);
```

| Method | Signature | Description |
|---|---|---|
init | fn init(config: SequencerConfig) Sequencer | Initialize sequencer |
sequence | fn sequence(self: *Sequencer, stream_id: u32, payload: []const u8) SequencedEvent | Assign next sequence number (thread-safe) |
peekNextSequence | fn peekNextSequence(self: *const Sequencer) u64 | Read next sequence without consuming |
reset | fn reset(self: *Sequencer, initial_sequence: u64) void | Reset for testing/replay |
SequencedEvent:
```zig
pub const SequencedEvent = struct {
    sequence: u64,
    timestamp_ns: u64,
    stream_id: u32,
    payload: []const u8,
};
```

MultiStreamSequencer

Merges multiple input streams into one globally ordered output.

| Method | Signature | Description |
|---|---|---|
init | fn init(config: SequencerConfig) error{TooManyStreams}!MultiStreamSequencer | Initialize (max 64 streams) |
sequenceFrom | fn sequenceFrom(self, stream_id: u32, payload: []const u8) error{InvalidStreamId}!SequencedEvent | Sequence from a specific stream |
getStreamStats | fn getStreamStats(self, stream_id: u32) ?StreamStats | Per-stream statistics (null if out of range) |
totalEvents | fn totalEvents(self) u64 | Total events across all streams |
SequenceIndex
Maps sequence numbers to stream/offset for replay.

| Method | Signature | Description |
|---|---|---|
init | fn init(allocator: Allocator) SequenceIndex | Initialize |
deinit | fn deinit(self: *SequenceIndex) void | Free memory |
add | fn add(self, entry: IndexEntry) !void | Add an index entry |
lookup | fn lookup(self, seq: u64) ?IndexEntry | Look up by sequence number |
rangeFrom | fn rangeFrom(self, from_sequence: u64) []const IndexEntry | Get all entries from a sequence |
Cluster (Raft Consensus)
Experimental. The Raft module applies only committed entries, validates vote/response terms, supports single-node election, and persists durably (WAL + persisted vote/term + atomic snapshots + crash recovery on restart). However, it has no built-in election timers or transport loop: driving timeouts and delivering messages between nodes is the embedder's responsibility. It also has no log compaction, InstallSnapshot RPC, or membership change support, and it has not yet been validated with real multi-process fault injection. Do not treat it as production-ready.

RaftConfig

```zig
pub const RaftConfig = struct {
    node_id: u32,
    peer_count: u32,
    election_timeout_min_ms: u32 = 150,
    election_timeout_max_ms: u32 = 300,
    heartbeat_interval_ms: u32 = 50,
};
```

RaftNode

Raft consensus node covering leader election and log replication, within the limitations listed at the top of this section.

| Method | Signature | Description |
|---|---|---|
init | fn init(allocator: Allocator, config: RaftConfig) !RaftNode | Initialize as follower (in-memory only) |
initWithPersistence | fn initWithPersistence(allocator: Allocator, config: RaftConfig, persistence: ?RaftPersistence) !RaftNode | Initialize with an optional durable backend (WAL + vote file + optional snapshots); recovers term/vote/log/snapshot from disk |
deinit | fn deinit(self: *RaftNode) void | Free resources |
handleMessage | fn handleMessage(self, from: u32, msg: RaftMessage) ?MessageResponse | Handle incoming Raft message; returns the response to send back, if any |
startElection | fn startElection(self) ?RaftMessage | Begin leader election. Returns the RequestVote to broadcast, or null if the node is wedged on a persistence failure |
propose | fn propose(self, data: []const u8) !u64 | Propose a log entry (leader only). Returns the log index; error.NotLeader otherwise |
createAppendEntries | fn createAppendEntries(self, peer_id: u32) ?AppendEntries | Create replication message for a peer (null for unknown peer / wedged node) |
createHeartbeat | fn createHeartbeat(self, peer_id: u32) ?AppendEntries | Create empty heartbeat for a peer |
takeSnapshot | fn takeSnapshot(self, state_data: []const u8) !void | Persist a point-in-time state snapshot (requires persistence with snapshots) |
getApplicableEntries | fn getApplicableEntries(self) []const StoredEntry | Get committed but unapplied entries |
markApplied | fn markApplied(self, up_to: u64) void | Mark entries as applied |
updateCommitIndex | fn updateCommitIndex(self) void | Recalculate commit index from match_index |
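
Recalculating a commit index from match indices usually means finding the highest log index that a majority of nodes have stored. The sketch below shows that computation in isolation. `majorityMatchIndex` is a hypothetical helper and is not taken from the zigbolt source.

```zig
const std = @import("std");

/// Highest log index stored on a majority of nodes.
/// `match_index` must include the leader's own last index; it is sorted in place.
fn majorityMatchIndex(match_index: []u64) u64 {
    std.debug.assert(match_index.len > 0);
    std.mem.sort(u64, match_index, {}, std.sort.desc(u64));
    const majority = match_index.len / 2 + 1;
    // After a descending sort, the first `majority` nodes all hold at least this index.
    return match_index[majority - 1];
}

pub fn main() void {
    var cluster = [_]u64{ 7, 2, 5, 5, 1 }; // leader plus four peers
    std.debug.print("{d}\n", .{majorityMatchIndex(&cluster)});
}
```

Raft additionally requires the entry at that index to carry the leader's current term before it is committed. That check is omitted here for brevity.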
RaftPersistence:
```zig
pub const RaftPersistence = struct {
    wal: *WriteAheadLog,
    vote_path: []const u8,
    snapshots: ?*SnapshotManager = null,
};
```

NodeState: enum { follower, candidate, leader }

RaftMessage:

```zig
pub const RaftMessage = union(enum) {
    request_vote: RequestVote,
    request_vote_response: RequestVoteResponse,
    append_entries: AppendEntries,
    append_entries_response: AppendEntriesResponse,
};
```

ClusterConfig

```zig
pub const ClusterConfig = struct {
    node_id: u32,
    peer_count: u32,
    election_timeout_min_ms: u32 = 150,
    election_timeout_max_ms: u32 = 300,
    heartbeat_interval_ms: u32 = 50,
};
```

StateMachine

User-implemented interface for applying committed entries.

```zig
pub const StateMachine = struct {
    apply_fn: *const fn (entry: []const u8) void,
    snapshot_fn: ?*const fn () []const u8 = null,
    restore_fn: ?*const fn (snapshot: []const u8) void = null,
};
```

Cluster

High-level cluster that wraps RaftNode and a StateMachine.

| Method | Signature | Description |
|---|---|---|
init | fn init(allocator: Allocator, config: ClusterConfig, sm: ?StateMachine) !Cluster | Initialize (in-memory) |
initWithPersistence | fn initWithPersistence(allocator: Allocator, config: ClusterConfig, sm: ?StateMachine, persistence: ?RaftPersistence) !Cluster | Initialize with durable backend; restores the state machine from the recovered snapshot via restore_fn |
deinit | fn deinit(self: *Cluster) void | Shut down |
propose | fn propose(self, data: []const u8) !u64 | Propose command (leader only) |
handleMessage | fn handleMessage(self, from: u32, msg: RaftMessage) ?MessageResponse | Process message |
tick | fn tick(self: *Cluster) void | Apply committed entries to state machine |
isLeader | fn isLeader(self) bool | Check leadership |
getState | fn getState(self) NodeState | Current Raft state |
Write-Ahead Log
WriteAheadLog

Persistent WAL for Raft consensus. Each entry is CRC32-validated on disk.

```zig
var wal = try WriteAheadLog.init(allocator, .{
    .path = "zigbolt_raft.wal",
    .sync_policy = .every_n_entries,
    .sync_interval = 100,
});
defer wal.deinit();
```

WalConfig:

```zig
pub const WalConfig = struct {
    path: []const u8 = "zigbolt_raft.wal",
    sync_policy: SyncPolicy = .every_n_entries,
    sync_interval: u32 = 100,
};
pub const SyncPolicy = enum { every_entry, every_n_entries, explicit };
```

| Method | Signature | Description |
|---|---|---|
init | fn init(allocator: Allocator, config: WalConfig) !WriteAheadLog | Create or open a WAL file |
deinit | fn deinit(self: *WriteAheadLog) void | Sync and close |
append | fn append(self, term: u64, index: u64, data: []const u8) !void | Append a CRC32-validated entry |
readEntry | fn readEntry(self, log_index: u64) !?WalEntry | Read entry by log index |
truncateFrom | fn truncateFrom(self, from_index: u64) !void | Remove entries >= from_index |
recover | fn recover(self) ![]WalEntry | Scan file, rebuild index, return valid entries |
flush | fn flush(self) !void | Force fsync to disk |
lastIndex | fn lastIndex(self) u64 | Last written log index |
lastTerm | fn lastTerm(self) u64 | Term of last entry |
entryCount | fn entryCount(self) u64 | Number of entries |
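
The idea behind CRC32-validated entries is that a checksum stored beside the payload lets recovery detect torn or corrupted writes. The sketch below uses the standard library's `std.hash.Crc32`. The `Record` type is hypothetical, and both the on-disk layout and the CRC variant zigbolt uses are assumptions.

```zig
const std = @import("std");

/// Hypothetical record: a CRC32 stored next to the payload it covers.
const Record = struct {
    crc: u32,
    payload: []const u8,

    /// Compute the checksum at write time.
    fn seal(payload: []const u8) Record {
        return .{ .crc = std.hash.Crc32.hash(payload), .payload = payload };
    }

    /// Recompute at read time; a mismatch means the bytes changed on disk.
    fn isValid(self: Record) bool {
        return std.hash.Crc32.hash(self.payload) == self.crc;
    }
};

pub fn main() void {
    var bytes = "set x=1".*;
    const record = Record.seal(&bytes);
    std.debug.print("intact: {}\n", .{record.isValid()});
    bytes[0] ^= 0x01; // simulate a single flipped bit
    std.debug.print("after corruption: {}\n", .{record.isValid()});
}
```

A recovery scan built on this check would normally stop at the first invalid record and treat everything before it as the durable log.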
WalEntry:
```zig
pub const WalEntry = struct {
    term: u64,
    index: u64,
    data: []const u8,
};
```

VoteState

Persistent Raft vote state (16-byte file).

| Method | Signature | Description |
|---|---|---|
save | fn save(self: VoteState, path: []const u8) !void | Atomically save to file |
load | fn load(path: []const u8) !?VoteState | Load from file, null if missing |
Snapshots
SnapshotManager

Manages Raft snapshots on disk with CRC32 validation.

```zig
var mgr = try SnapshotManager.init(allocator, .{
    .base_path = "/var/lib/zigbolt/snapshots",
    .snapshot_interval = 10000,
});
defer mgr.deinit();
```

| Method | Signature | Description |
|---|---|---|
init | fn init(allocator: Allocator, config: SnapshotConfig) !SnapshotManager | Initialize (fails on overly long base path) |
deinit | fn deinit(self: *SnapshotManager) void | Cleanup |
shouldSnapshot | fn shouldSnapshot(self) bool | True if interval reached |
onEntryCommitted | fn onEntryCommitted(self) void | Track committed entries |
takeSnapshot | fn takeSnapshot(self, last_term: u64, last_index: u64, state_data: []const u8) !void | Write snapshot to disk |
loadLatestSnapshot | fn loadLatestSnapshot(self) !?SnapshotData | Load newest snapshot |
getLatestMeta | fn getLatestMeta(self) ?SnapshotMeta | Metadata without loading state |
cleanOldSnapshots | fn cleanOldSnapshots(self, keep_count: usize) !void | Delete all but N newest |
SnapshotData (caller must call deinit()):
```zig
pub const SnapshotData = struct {
    last_included_term: u64,
    last_included_index: u64,
    data: []u8,
    allocator: std.mem.Allocator,

    pub fn deinit(self: *SnapshotData) void;
};
```

SBE Codec

SbeEncoder

Encodes SBE messages into caller-provided byte buffers. Zero heap allocations.

```zig
var buf: [4096]u8 = undefined;
var enc = SbeEncoder.init(&buf);
const hdr_pos = try enc.putMessageHeader(42, 1, 1);
try enc.putU64(timestamp);
try enc.putI64(price);
enc.finishHeader(hdr_pos);
const wire_bytes = buf[0..enc.encodedLength()];
```

| Method | Signature | Description |
|---|---|---|
init | fn init(buf: []u8) SbeEncoder | Initialize over buffer |
encodedLength | fn encodedLength(self) usize | Bytes written so far |
putMessageHeader | fn putMessageHeader(self, template_id: u16, schema_id: u16, version: u16) !usize | Write 8-byte header, returns position for finishHeader |
finishHeader | fn finishHeader(self, header_pos: usize) void | Patch block_length after root fields |
putU8..putU64 | fn putU64(self, val: u64) !void | Write unsigned integers |
putI8..putI64 | fn putI64(self, val: i64) !void | Write signed integers |
putF32/putF64 | fn putF64(self, val: f64) !void | Write floats |
putChar | fn putChar(self, val: u8) !void | Write character |
putBytes | fn putBytes(self, data: []const u8) !void | Write fixed-length bytes |
putEnum | fn putEnum(self, comptime E: type, val: E) !void | Write enum as integer |
beginGroup | fn beginGroup(self, block_length: u16, count: u16) !void | Write group header |
putVarData | fn putVarData(self, data: []const u8) !void | Write [u32 len][data] |
SbeDecoder
Zero-copy SBE decoder. Returns pointers directly into the underlying buffer.

| Method | Signature | Description |
|---|---|---|
init | fn init(buf: []const u8) SbeDecoder | Initialize over buffer |
position | fn position(self) usize | Current read position |
remaining | fn remaining(self) usize | Bytes left |
skip | fn skip(self, n: usize) !void | Advance position |
getMessageHeader | fn getMessageHeader(self) !MessageHeader | Read 8-byte header |
getGroupHeader | fn getGroupHeader(self) !GroupHeader | Read 4-byte group header |
getU8..getU64 | fn getU64(self) !u64 | Read unsigned integers |
getI8..getI64 | fn getI64(self) !i64 | Read signed integers |
getF32/getF64 | fn getF64(self) !f64 | Read floats |
getBytes | fn getBytes(self, comptime N: usize) !*const [N]u8 | Zero-copy fixed bytes |
getBytesSlice | fn getBytesSlice(self, n: usize) ![]const u8 | Zero-copy runtime-length bytes |
getEnum | fn getEnum(self, comptime E: type) !E | Read enum |
getVarData | fn getVarData(self) ![]const u8 | Zero-copy variable-length data |
Decimal64
Fixed-point decimal for financial prices. Only the mantissa is transmitted on the wire.

| Method | Signature | Description |
|---|---|---|
fromFloat | fn fromFloat(val: f64, exp: i8) Decimal64 | Construct from float |
toFloat | fn toFloat(self) f64 | Convert to f64 |
isNull | fn isNull(self) bool | Check null sentinel |
nullValue | fn nullValue() Decimal64 | Create null sentinel |
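
A fixed-point decimal of this kind represents a price as `mantissa * 10^exponent`. With a constant exponent, only the integer mantissa needs to travel on the wire. The sketch below shows the conversion arithmetic under that assumption. `Decimal` is a hypothetical stand-in for `Decimal64` and omits the null sentinel.

```zig
const std = @import("std");

/// Hypothetical fixed-point decimal: value = mantissa * 10^exponent.
const Decimal = struct {
    mantissa: i64,
    exponent: i8,

    /// Scale the float by 10^(-exp) and round to the nearest integer mantissa.
    fn fromFloat(val: f64, exp: i8) Decimal {
        const neg_exp: f64 = @floatFromInt(-@as(i32, exp));
        const scale = std.math.pow(f64, 10.0, neg_exp);
        const mantissa: i64 = @intFromFloat(@round(val * scale));
        return .{ .mantissa = mantissa, .exponent = exp };
    }

    fn toFloat(self: Decimal) f64 {
        const exp: f64 = @floatFromInt(self.exponent);
        return @as(f64, @floatFromInt(self.mantissa)) * std.math.pow(f64, 10.0, exp);
    }
};

pub fn main() void {
    // With exponent -7, a price of 101.25 becomes the mantissa 1012500000.
    const price = Decimal.fromFloat(101.25, -7);
    std.debug.print("{d}\n", .{price.mantissa});
}
```

Rounding when converting from a float matters because many decimal prices have no exact binary representation, so truncation could produce a mantissa that is off by one.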
FIX Messages
SBE-encoded FIX protocol messages in src/codec/fix_messages.zig
(zigbolt.fix). Schema: SCHEMA_ID = 1, SCHEMA_VERSION = 1. Wire format
per message: [MessageHeader (8 bytes)][root block (BLOCK_LENGTH bytes)][groups...].
Enum Types
Enum values follow the CME/iLink convention. Several of them (ExecType, OrdStatus, MDEntryType) are ASCII character codes:

```zig
pub const Side = enum(u8) { buy = 1, sell = 2 };
pub const OrdType = enum(u8) { market = 1, limit = 2, stop = 3, stop_limit = 4, market_limit = 75 }; // 75 = 'K'
pub const TimeInForce = enum(u8) { day = 0, gtc = 1, ioc = 3, gtd = 6 };
pub const ExecType = enum(u8) { // ASCII codes
    new = 48, // '0'
    partial_fill = 49, // '1'
    fill = 50, // '2'
    canceled = 52, // '4'
    replaced = 53, // '5'
    rejected = 56, // '8'
    expired = 67, // 'C'
    trade = 70, // 'F'
    status = 73, // 'I'
};
pub const OrdStatus = enum(u8) { // ASCII codes
    new = 48,
    partial_fill = 49,
    filled = 50,
    canceled = 52,
    replaced = 53,
    rejected = 56,
    expired = 67,
};
pub const MDUpdateAction = enum(u8) { new = 0, change = 1, delete = 2, overlay = 5 };
pub const MDEntryType = enum(u8) { // ASCII codes
    bid = 48,
    offer = 49,
    trade = 50,
    opening_price = 52,
    settlement = 54,
    session_high = 55,
    session_low = 56,
    trade_volume = 66,
    open_interest = 67,
};
```

Also defined: HandInst, CustomerOrFirm, BooleanType, fixed-length string
types String2/3/6/10/12/20/23 (FixedString(N)), Decimal64 (constant
exponent -7, mantissa-only on wire), and IntQty32.
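
Because these enums are exhaustive, a byte read from untrusted input has to be checked before it is treated as an enum value. The sketch below shows one way to do that for `Side`. `parseSide` is a hypothetical helper, and the error name is borrowed from the `DecodeError` set documented for the decoders.

```zig
const std = @import("std");

// Values copied from the Side enum listed above.
const Side = enum(u8) { buy = 1, sell = 2 };

/// Map an untrusted wire byte to Side, rejecting unknown values.
/// Casting an out-of-range integer to an exhaustive enum is illegal behavior
/// in Zig, so the check has to happen before any conversion.
fn parseSide(raw: u8) error{InvalidEnumValue}!Side {
    return switch (raw) {
        1 => .buy,
        2 => .sell,
        else => error.InvalidEnumValue,
    };
}

pub fn main() void {
    const ok = parseSide(2) catch unreachable;
    std.debug.print("{s}\n", .{@tagName(ok)});
    if (parseSide(0)) |_| {} else |err| {
        std.debug.print("{s}\n", .{@errorName(err)});
    }
}
```

An explicit switch keeps the accepted values visible at the call site, at the cost of being updated whenever the enum grows.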
Fixed-Block Messages
Every message's BLOCK_LENGTH is @sizeOf(struct): the messages are
extern structs copied directly to/from the wire.

| Message | Template ID | FIX MsgType | Key Fields |
|---|---|---|---|
NewOrderSingle | 68 | D | account, cl_ord_id, order_qty, ord_type, price_mantissa, side, symbol, time_in_force, transact_time, stop_px_mantissa, … |
OrderCancelRequest | 70 | F | account, cl_ord_id, order_id, orig_cl_ord_id, side, symbol, transact_time, … |
OrderCancelReplaceRequest | 71 | G | same shape as NewOrderSingle + order_id, orig_cl_ord_id |
ExecutionReport | 56 | 8 | order_id, cl_ord_id, exec_type, ord_status, symbol, side, leaves_qty, cum_qty, avg_px_mantissa, last_qty, last_px_mantissa, … |
Heartbeat | 48 | 0 | timestamp |
Logon | 65 | A | heart_bt_int, username, password, reset_seq_num |
Logout | 53 | 5 | session_status, text |
Group-Based Messages
| Message | Template ID | FIX MsgType | Structure |
|---|---|---|---|
MarketDataIncrementalRefresh | 88 | X | root MdIncRefreshRoot (trade_date) + group of MdIncGrpEntry (md_update_action, md_entry_type, security_id, md_entry_px_mantissa, …) |
MarketDataSnapshotFullRefresh | 87 | W | root MdSnapRefreshRoot (security_id, symbol, trading_session_id) + group of MdFullGrpEntry |
MassQuote | 105 | i | root MassQuoteRoot + nested groups: QuoteSets (QuoteSetEntry) each containing QuoteEntries (QuoteEntry) |
Encode / Decode API
Encoding writes header + block(s) into a caller buffer and returns the byte count (0 if the buffer is too small):

    pub fn encode(self: *const T, buf: []u8) usize; // fixed-block
    pub fn encode(trade_date: u16, entries: []const MdIncGrpEntry, buf: []u8) usize; // MD incremental

Decoding is checked: all decoders validate untrusted wire bytes (lengths,
group extents, and every exhaustive enum field) before returning a zero-copy
view, and return DecodeError!...:

    pub const DecodeError = error{ Truncated, InvalidGroup, InvalidEnumValue, IndexOutOfRange };

    // Fixed-block messages:
    const decoded = try NewOrderSingle.decode(buf);
    // decoded.header: MessageHeader, decoded.msg: *align(1) const NewOrderSingle

    // Group messages return a validated view with bounds-checked accessors:
    const md = try MarketDataIncrementalRefresh.decode(buf);
    const e0 = try md.entry(0); // *align(1) const MdIncGrpEntry

    const mq = try MassQuote.decode(buf);
    const set = try mq.quoteSet(0);
    const q = try set.entry(0); // *align(1) const QuoteEntry

Wire Protocol Flyweights
Aeron-compatible flyweights in src/protocol/flyweight.zig. Each wraps a []u8 buffer.
DataHeaderFlyweight (32 bytes)
| Method | Signature | Description |
|---|---|---|
wrap | fn wrap(buf: []u8) DataHeaderFlyweight | Wrap existing buffer |
init | fn init(buf: []u8) DataHeaderFlyweight | Wrap and set type=DATA |
frameLength/setFrameLength | i32 | Total frame size |
flags/setFlags | u8 | BEGIN/END/EOS flags |
termOffset/setTermOffset | u32 | Offset in term |
sessionId/setSessionId | i32 | Session identifier |
streamId/setStreamId | i32 | Stream identifier |
termId/setTermId | i32 | Term identifier |
reservedValue/setReservedValue | i64 | User metadata |
payload | fn payload(self) []u8 | Payload region after header |
isBeginMessage/isEndMessage/isEndOfStream | bool | Flag checks |
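A flyweight reads and writes fields in place instead of copying them into a struct. The sketch below shows the idea for two fields, assuming Aeron's little-endian data header offsets (frame length at byte 0, session id at byte 12). `HeaderView` is a hypothetical stand-in for illustration, not the library type:

```zig
const std = @import("std");

const header_length: usize = 32;

// Hypothetical view type; offsets are an assumption taken from Aeron's layout.
const HeaderView = struct {
    buf: []u8,

    fn wrap(buf: []u8) HeaderView {
        std.debug.assert(buf.len >= header_length);
        return .{ .buf = buf };
    }

    fn frameLength(self: HeaderView) i32 {
        return std.mem.readInt(i32, self.buf[0..4], .little);
    }

    fn setFrameLength(self: HeaderView, value: i32) void {
        std.mem.writeInt(i32, self.buf[0..4], value, .little);
    }

    fn sessionId(self: HeaderView) i32 {
        return std.mem.readInt(i32, self.buf[12..16], .little);
    }

    fn setSessionId(self: HeaderView, value: i32) void {
        std.mem.writeInt(i32, self.buf[12..16], value, .little);
    }

    /// Everything after the fixed header (not trimmed to the frame length here).
    fn payload(self: HeaderView) []u8 {
        return self.buf[header_length..];
    }
};

test "fields are stored directly in the wrapped buffer" {
    var storage = [_]u8{0} ** 64;
    const view = HeaderView.wrap(&storage);
    view.setFrameLength(48);
    try std.testing.expectEqual(@as(i32, 48), view.frameLength());
    try std.testing.expectEqual(@as(u8, 48), storage[0]); // low byte first
}
```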
StatusMessageFlyweight (36 bytes)
| Method | Signature | Description |
|---|---|---|
sessionId/streamId | i32 | Identifiers |
consumptionTermId/consumptionTermOffset | i32 | Consumption position |
receiverWindowLength | i32 | Advertised window |
receiverId | i64 | Unique receiver ID |
NakFlyweight (28 bytes)
| Method | Signature | Description |
|---|---|---|
sessionId/streamId/termId | i32 | Identifiers |
termOffset | i32 | Start of missing range |
nakLength | i32 | Length of missing range |
SetupFlyweight (40 bytes), RttMeasurementFlyweight (40 bytes), ErrorFlyweight (28+ bytes)
All follow the same pattern: wrap(buf), init(buf), typed getters/setters.
Position Helpers
| Function | Signature | Description |
|---|---|---|
computePosition | fn computePosition(term_offset, term_id, shift, initial_term_id) i64 | Absolute position from term addressing |
computeTermIdFromPosition | fn computeTermIdFromPosition(position, shift, initial_term_id) i32 | Term ID from position |
computeTermOffsetFromPosition | fn computeTermOffsetFromPosition(position, shift) i32 | Offset from position |
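The three helpers are inverses of one another. A sketch of the arithmetic, assuming `shift` is log2 of the term length and term ids advance by one per term, as in Aeron. The parameter types are guesses, since the table above elides them:

```zig
const std = @import("std");

fn computePosition(term_offset: i32, term_id: i32, shift: u6, initial_term_id: i32) i64 {
    // Wrapping subtraction keeps the term count correct across id rollover.
    const term_count: i64 = term_id -% initial_term_id;
    return (term_count << shift) + term_offset;
}

fn computeTermIdFromPosition(position: i64, shift: u6, initial_term_id: i32) i32 {
    const term_count: i32 = @truncate(position >> shift);
    return term_count +% initial_term_id;
}

fn computeTermOffsetFromPosition(position: i64, shift: u6) i32 {
    const mask: i64 = (@as(i64, 1) << shift) - 1;
    return @intCast(position & mask);
}

test "position round-trips with a 1 MB term" {
    const shift: u6 = 20; // 1 << 20 bytes per term
    const pos = computePosition(4096, 7, shift, 5);
    try std.testing.expectEqual(@as(i64, 2_101_248), pos); // 2 terms + 4096
    try std.testing.expectEqual(@as(i32, 7), computeTermIdFromPosition(pos, shift, 5));
    try std.testing.expectEqual(@as(i32, 4096), computeTermOffsetFromPosition(pos, shift));
}
```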
Broadcast Buffer
BroadcastTransmitter
Single-producer transmitter for 1-to-N messaging.

    var buf: [1024 + TRAILER_LENGTH]u8 align(cache_line_size) = [_]u8{0} ** (1024 + TRAILER_LENGTH);
    var tx = BroadcastTransmitter.init(&buf);
    tx.transmit(42, "market data update");

| Method | Signature | Description |
|---|---|---|
init | fn init(buf: []u8) BroadcastTransmitter | Initialize (capacity must be power of 2) |
transmit | fn transmit(self, msg_type_id: i32, msg: []const u8) void | Transmit a message (always succeeds, old data overwritten) |
calculateMaxMessageLength | fn calculateMaxMessageLength(self) u32 | Max payload size: (capacity / 8) - 8 |
BroadcastReceiver
Per-consumer receiver. Each maintains its own cursor.
| Method | Signature | Description |
|---|---|---|
init | fn init(buf: []const u8) BroadcastReceiver | Join from current tail position |
receiveNext | fn receiveNext(self) ?Message | Read next message, or null if none |
validate | fn validate(self) bool | Check data not overwritten |
lappedCount | fn lappedCount(self) u64 | Times receiver was lapped |
CopyBroadcastReceiver
Wrapper that copies payload to internal scratch buffer for safe retention.
| Method | Signature | Description |
|---|---|---|
init | fn init(buf: []const u8) CopyBroadcastReceiver | Initialize |
receiveNext | fn receiveNext(self) ?Message | Receive with copy to scratch |
lappedCount | fn lappedCount(self) u64 | Times lapped |
Idle Strategies
IdleStrategy
Tagged union dispatching to concrete strategies via idle(work_count) and reset().

    var strategy = idle_strategy.backoff();
    strategy.idle(0); // no work -> back off
    strategy.idle(1); // work done -> reset to active

| Strategy | Latency | CPU | Description |
|---|---|---|---|
BusySpinIdleStrategy | Lowest | Highest | Hardware PAUSE instruction |
YieldingIdleStrategy | Low | High | Thread.yield() |
SleepingIdleStrategy | Medium | Low | Thread.sleep(N) |
BackoffIdleStrategy | Adaptive | Adaptive | NOT_IDLE -> SPINNING -> YIELDING -> PARKING |
NoOpIdleStrategy | N/A | N/A | Does nothing |
Convenience constructors: busySpin(), yielding(), sleeping(ns), backoff(), noOp().
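The backoff progression can be modelled as a small state machine. This is a sketch only: the thresholds, the `Action` enum, and `BackoffSketch` are illustrative assumptions, and the code returns the action to take rather than actually spinning, yielding, or sleeping:

```zig
const std = @import("std");

const State = enum { not_idle, spinning, yielding, parking };
const Action = enum { none, spin, yield_thread, park };

const BackoffSketch = struct {
    state: State = .not_idle,
    count: u32 = 0,
    max_spins: u32 = 10, // assumed threshold
    max_yields: u32 = 5, // assumed threshold

    /// Returns what the caller should do for this duty cycle.
    fn idle(self: *BackoffSketch, work_count: u32) Action {
        if (work_count > 0) {
            self.enter(.not_idle); // work was done: start over
            return .none;
        }
        switch (self.state) {
            .not_idle => self.enter(.spinning),
            .spinning => if (self.count >= self.max_spins) self.enter(.yielding),
            .yielding => if (self.count >= self.max_yields) self.enter(.parking),
            .parking => {},
        }
        self.count +|= 1; // saturating, so long parking never overflows
        return switch (self.state) {
            .not_idle => unreachable, // an idle call always leaves this state
            .spinning => .spin,
            .yielding => .yield_thread,
            .parking => .park,
        };
    }

    fn enter(self: *BackoffSketch, next: State) void {
        self.state = next;
        self.count = 0;
    }
};

test "backoff escalates while idle and resets on work" {
    var b = BackoffSketch{ .max_spins = 2, .max_yields = 1 };
    try std.testing.expectEqual(Action.spin, b.idle(0));
    try std.testing.expectEqual(Action.spin, b.idle(0));
    try std.testing.expectEqual(Action.yield_thread, b.idle(0));
    try std.testing.expectEqual(Action.park, b.idle(0));
    try std.testing.expectEqual(Action.none, b.idle(1));
}
```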
Agent Pattern
AgentFn
Function-pointer-based agent interface for composable units of work.

    pub const AgentFn = struct {
        doWorkFn: *const fn (ctx: *anyopaque) u32, // returns work count
        onStartFn: ?*const fn (ctx: *anyopaque) void, // lifecycle start
        onCloseFn: ?*const fn (ctx: *anyopaque) void, // lifecycle close
        ctx: *anyopaque,
        name: []const u8,
    };

AgentRunner
Runs an agent on a dedicated thread with an idle strategy.
| Method | Signature | Description |
|---|---|---|
init | fn init(agent: AgentFn, idle: IdleStrategy) AgentRunner | Create runner |
start | fn start(self) !void | Start agent on new thread |
stop | fn stop(self) void | Stop agent and join thread |
isRunning | fn isRunning(self) bool | Check if running |
errorCount | fn errorCount(self) u64 | Error counter |
CompositeAgent
Combines multiple agents. Returns sum of work from all sub-agents.
| Method | Signature | Description |
|---|---|---|
init | fn init(agents: []const AgentFn) CompositeAgent | Create composite |
agentFn | fn agentFn(self) AgentFn | Get AgentFn interface |
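To make the type-erased interface concrete, here is a self-contained sketch that redeclares the AgentFn shape shown earlier and sums work across agents. `Ticker` and `sumWork` are hypothetical names used only for this example:

```zig
const std = @import("std");

const AgentFn = struct {
    doWorkFn: *const fn (ctx: *anyopaque) u32,
    onStartFn: ?*const fn (ctx: *anyopaque) void,
    onCloseFn: ?*const fn (ctx: *anyopaque) void,
    ctx: *anyopaque,
    name: []const u8,
};

// Hypothetical agent: performs one unit of work per call until drained.
const Ticker = struct {
    pending: u32,

    fn doWork(ctx: *anyopaque) u32 {
        const self: *Ticker = @ptrCast(@alignCast(ctx)); // recover the concrete type
        if (self.pending == 0) return 0;
        self.pending -= 1;
        return 1;
    }

    fn agentFn(self: *Ticker, name: []const u8) AgentFn {
        return .{ .doWorkFn = &doWork, .onStartFn = null, .onCloseFn = null, .ctx = self, .name = name };
    }
};

/// One duty cycle over several agents; the result is the summed work count.
fn sumWork(agents: []const AgentFn) u32 {
    var total: u32 = 0;
    for (agents) |agent| total += agent.doWorkFn(agent.ctx);
    return total;
}

test "composite work is the sum of sub-agent work" {
    var a = Ticker{ .pending = 2 };
    var b = Ticker{ .pending = 1 };
    const agents = [_]AgentFn{ a.agentFn("a"), b.agentFn("b") };
    try std.testing.expectEqual(@as(u32, 2), sumWork(&agents));
    try std.testing.expectEqual(@as(u32, 1), sumWork(&agents));
    try std.testing.expectEqual(@as(u32, 0), sumWork(&agents));
}
```

A summed work count of zero is what lets an idle strategy back off only when every sub-agent had nothing to do.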
DutyCycleTracker
Measures cycle performance for monitoring and tuning.
| Method | Signature | Description |
|---|---|---|
cycleStart | fn cycleStart(self) void | Record cycle start |
cycleEnd | fn cycleEnd(self, work_count: u32) void | Record cycle end |
averageCycleNs | fn averageCycleNs(self) u64 | Recent cycle duration |
workRatio | fn workRatio(self) f64 | Ratio of busy vs idle cycles (0.0 to 1.0) |
Counters
Counter
Lightweight atomic i64 counter handle for hot-path instrumentation.
| Method | Signature | Description |
|---|---|---|
increment | fn increment(self) void | Atomic +1 (monotonic) |
incrementBy | fn incrementBy(self, n: i64) void | Atomic +n |
decrement | fn decrement(self) void | Atomic -1 |
get | fn get(self) i64 | Load (acquire) |
set | fn set(self, val: i64) void | Store (release) |
getAndReset | fn getAndReset(self) i64 | Swap to 0 (acq_rel) |
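The memory orderings in the table map directly onto `std.atomic.Value`. A minimal sketch, assuming a counter handle is just a pointer to an atomic i64 slot; `CounterSketch` is hypothetical and is not the library's Counter:

```zig
const std = @import("std");

const CounterSketch = struct {
    slot: *std.atomic.Value(i64),

    fn increment(self: CounterSketch) void {
        _ = self.slot.fetchAdd(1, .monotonic); // no ordering needed for a tally
    }

    fn get(self: CounterSketch) i64 {
        return self.slot.load(.acquire);
    }

    fn set(self: CounterSketch, val: i64) void {
        self.slot.store(val, .release);
    }

    /// Read the value and zero it in one atomic step, so no increment is lost.
    fn getAndReset(self: CounterSketch) i64 {
        return self.slot.swap(0, .acq_rel);
    }
};

test "getAndReset returns the tally and clears it" {
    var slot = std.atomic.Value(i64).init(0);
    const c = CounterSketch{ .slot = &slot };
    c.increment();
    c.increment();
    c.increment();
    try std.testing.expectEqual(@as(i64, 3), c.getAndReset());
    try std.testing.expectEqual(@as(i64, 0), c.get());
}
```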
CounterSet
Fixed-capacity set of named atomic counters (max 64).
| Method | Signature | Description |
|---|---|---|
init | fn init() CounterSet | Zero-initialized set |
allocate | fn allocate(self, counter_type: CounterType, name: []const u8) ?Counter | Allocate counter slot |
getByType | fn getByType(self, counter_type: CounterType) ?Counter | Look up by type |
forEach | fn forEach(self, callback) void | Iterate all active counters |
snapshot | fn snapshot(self, out: []CounterSnapshot) u32 | Copy all values |
resetAll | fn resetAll(self) void | Reset all to zero |
GlobalCounters
System-wide registry organized by subsystem (IPC, Network, Reliability, Archive, Cluster, Sequencer, System).
| Method | Signature | Description |
|---|---|---|
init | fn init() GlobalCounters | Empty counter sets |
initWithDefaults | fn initWithDefaults() GlobalCounters | Pre-register all standard counters |
formatReport | fn formatReport(self, buf: []u8) []const u8 | Human-readable report |
Congestion Control
CongestionControl
AIMD congestion control with slow start and congestion avoidance phases.

    var cc = CongestionControl.init(.{
        .initial_window = 64 * 1024,
        .max_window = 16 * 1024 * 1024,
        .min_window = 4 * 1024,
        .mss = 1460,
        .initial_ssthresh = 1024 * 1024,
    });

| Method | Signature | Description |
|---|---|---|
init | fn init(cfg: CongestionConfig) CongestionControl | Initialize |
onAck | fn onAck(self, bytes_acked: u64) void | Window increase (slow start or CA) |
onLoss | fn onLoss(self) void | Multiplicative decrease |
onTimeout | fn onTimeout(self) void | Reset to min_window, re-enter slow start |
canSend | fn canSend(self, bytes: u64) bool | Check window allows sending |
onSend | fn onSend(self, bytes: u64) void | Record bytes in flight |
availableWindow | fn availableWindow(self) u64 | Bytes available in window |
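The window updates behind onAck, onLoss, and onTimeout can be sketched with textbook TCP-style rules. This assumes slow start grows the window by the bytes acknowledged, congestion avoidance adds roughly one MSS per round trip, and a loss halves the window; zigbolt's exact arithmetic may differ, and `AimdSketch` is a hypothetical type:

```zig
const std = @import("std");

const AimdSketch = struct {
    cwnd: u64,
    ssthresh: u64,
    min_window: u64,
    max_window: u64,
    mss: u64,

    fn onAck(self: *AimdSketch, bytes_acked: u64) void {
        const grow: u64 = if (self.cwnd < self.ssthresh)
            bytes_acked // slow start
        else
            @max(1, self.mss * bytes_acked / self.cwnd); // additive increase
        self.cwnd = @min(self.cwnd + grow, self.max_window);
    }

    fn onLoss(self: *AimdSketch) void {
        self.ssthresh = @max(self.cwnd / 2, self.min_window);
        self.cwnd = self.ssthresh; // multiplicative decrease
    }

    fn onTimeout(self: *AimdSketch) void {
        self.ssthresh = @max(self.cwnd / 2, self.min_window);
        self.cwnd = self.min_window; // re-enter slow start
    }
};

test "window grows, halves on loss, and collapses on timeout" {
    var cc = AimdSketch{ .cwnd = 65_536, .ssthresh = 1_048_576, .min_window = 4_096, .max_window = 16_777_216, .mss = 1_460 };
    cc.onAck(1_460);
    try std.testing.expectEqual(@as(u64, 66_996), cc.cwnd);
    cc.onLoss();
    try std.testing.expectEqual(@as(u64, 33_498), cc.cwnd);
    cc.onTimeout();
    try std.testing.expectEqual(@as(u64, 4_096), cc.cwnd);
}
```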
RttEstimator
RFC 6298 EWMA-based RTT estimation.
| Method | Signature | Description |
|---|---|---|
init | fn init() RttEstimator | Initialize (1s initial RTO) |
update | fn update(self, rtt_ns: u64) void | Record RTT sample |
retransmitTimeout | fn retransmitTimeout(self) u64 | Current RTO (ns) |
smoothedRtt | fn smoothedRtt(self) u64 | Current SRTT (ns) |
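RFC 6298 specifies SRTT and RTTVAR as exponentially weighted averages with alpha = 1/8 and beta = 1/4, and RTO = SRTT + 4 * RTTVAR. A sketch in integer nanoseconds, assuming no clock-granularity term and no minimum clamp on the RTO; field names are illustrative:

```zig
const std = @import("std");

const RttSketch = struct {
    srtt_ns: u64 = 0,
    rttvar_ns: u64 = 0,
    rto_ns: u64 = std.time.ns_per_s, // 1 s before the first sample
    has_sample: bool = false,

    fn update(self: *RttSketch, rtt_ns: u64) void {
        if (!self.has_sample) {
            // First measurement: SRTT = R, RTTVAR = R / 2.
            self.srtt_ns = rtt_ns;
            self.rttvar_ns = rtt_ns / 2;
            self.has_sample = true;
        } else {
            // RTTVAR is updated first, using the previous SRTT.
            const diff = if (self.srtt_ns > rtt_ns) self.srtt_ns - rtt_ns else rtt_ns - self.srtt_ns;
            self.rttvar_ns = (3 * self.rttvar_ns + diff) / 4;
            self.srtt_ns = (7 * self.srtt_ns + rtt_ns) / 8;
        }
        self.rto_ns = self.srtt_ns + 4 * self.rttvar_ns;
    }
};

test "two samples of 100 ms and 200 ms" {
    var est = RttSketch{};
    est.update(100_000_000);
    try std.testing.expectEqual(@as(u64, 300_000_000), est.rto_ns);
    est.update(200_000_000);
    try std.testing.expectEqual(@as(u64, 112_500_000), est.srtt_ns);
    try std.testing.expectEqual(@as(u64, 362_500_000), est.rto_ns);
}
```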
NakController
Exponential backoff for NAK timing.
| Method | Signature | Description |
|---|---|---|
init | fn init(config: NakConfig) NakController | Initialize |
shouldSendNak | fn shouldSendNak(self, now_ns: u64) bool | Check if enough time elapsed |
onNakSent | fn onNakSent(self, now_ns: u64) void | Record NAK sent, increase backoff |
onGapFilled | fn onGapFilled(self) void | Reset state for reuse |
isExhausted | fn isExhausted(self) bool | Max retransmits exceeded |
currentDelay | fn currentDelay(self) u64 | Current delay with backoff (ns) |
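One way to picture the backoff: the delay doubles after each NAK until it reaches a cap, and the controller gives up after a fixed number of attempts. The sketch below assumes exactly that policy; the field names (`base_delay_ns`, `max_delay_ns`, `max_retries`) are invented here and may not match NakConfig:

```zig
const std = @import("std");

const NakSketch = struct {
    base_delay_ns: u64,
    max_delay_ns: u64,
    max_retries: u32,
    retries: u32 = 0,
    last_nak_ns: u64 = 0,

    fn currentDelay(self: NakSketch) u64 {
        var delay = self.base_delay_ns;
        var i: u32 = 0;
        // Double once per NAK already sent; saturating multiply avoids overflow.
        while (i < self.retries and delay < self.max_delay_ns) : (i += 1) delay *|= 2;
        return @min(delay, self.max_delay_ns);
    }

    fn isExhausted(self: NakSketch) bool {
        return self.retries >= self.max_retries;
    }

    fn shouldSendNak(self: NakSketch, now_ns: u64) bool {
        return !self.isExhausted() and (now_ns -| self.last_nak_ns) >= self.currentDelay();
    }

    fn onNakSent(self: *NakSketch, now_ns: u64) void {
        self.last_nak_ns = now_ns;
        self.retries += 1;
    }

    fn onGapFilled(self: *NakSketch) void {
        self.retries = 0;
        self.last_nak_ns = 0;
    }
};

test "delay doubles up to the cap" {
    var nak = NakSketch{ .base_delay_ns = 1_000, .max_delay_ns = 6_000, .max_retries = 5 };
    try std.testing.expectEqual(@as(u64, 1_000), nak.currentDelay());
    nak.onNakSent(0);
    try std.testing.expectEqual(@as(u64, 2_000), nak.currentDelay());
    nak.onNakSent(0);
    nak.onNakSent(0);
    try std.testing.expectEqual(@as(u64, 6_000), nak.currentDelay()); // 8_000 capped
}
```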
Flow Control
FlowControl
Unified flow control dispatching to Min, Max, or Tagged strategy.

    var fc = FlowControl.init(.{ .strategy = .min, .receiver_timeout_ns = 5_000_000_000 });
    const new_limit = fc.onStatusMessage(status, sender_limit, initial_term_id, shift, now_ns);

| Method | Signature | Description |
|---|---|---|
init | fn init(cfg: FlowControlConfig) FlowControl | Create from config |
onStatusMessage | fn onStatusMessage(self, status, sender_limit, initial_term_id, shift, now_ns) i64 | Process receiver status, return new sender limit |
onIdle | fn onIdle(self, now_ns, sender_limit, sender_position, is_eos) i64 | Remove stale receivers, return current limit |
hasRequiredReceivers | fn hasRequiredReceivers(self) bool | Check for active receivers |
MinFlowControl
Sender limit = minimum position across all active receivers. Guarantees that no receiver is left behind.
MaxFlowControl
Sender always advances. No back-pressure. Suitable for market data where stale quotes are worthless.
TaggedFlowControl
Only receivers matching required_group_tag constrain the sender. Untagged receivers are tracked but do not limit it.
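The difference between the three strategies comes down to which receivers are allowed to bound the sender. A sketch, assuming each receiver contributes a limit of position plus window (as in Aeron) and that the current limit is kept when no receiver qualifies; `Receiver` and the three functions are hypothetical:

```zig
const std = @import("std");

const Receiver = struct { position: i64, window: i64, tagged: bool };

/// Slowest receiver wins; `only_tagged` restricts which receivers count.
fn lowestLimit(receivers: []const Receiver, current: i64, only_tagged: bool) i64 {
    var limit: ?i64 = null;
    for (receivers) |r| {
        if (only_tagged and !r.tagged) continue;
        const candidate = r.position + r.window;
        limit = if (limit) |l| @min(l, candidate) else candidate;
    }
    return limit orelse current;
}

fn minLimit(receivers: []const Receiver, current: i64) i64 {
    return lowestLimit(receivers, current, false);
}

fn taggedLimit(receivers: []const Receiver, current: i64) i64 {
    return lowestLimit(receivers, current, true);
}

/// Fastest receiver wins, and the limit never moves backwards.
fn maxLimit(receivers: []const Receiver, current: i64) i64 {
    var limit = current;
    for (receivers) |r| limit = @max(limit, r.position + r.window);
    return limit;
}

test "the same receivers give different limits per strategy" {
    const rs = [_]Receiver{
        .{ .position = 100, .window = 50, .tagged = false },
        .{ .position = 300, .window = 50, .tagged = true },
        .{ .position = 200, .window = 50, .tagged = true },
    };
    try std.testing.expectEqual(@as(i64, 150), minLimit(&rs, 0));
    try std.testing.expectEqual(@as(i64, 350), maxLimit(&rs, 0));
    try std.testing.expectEqual(@as(i64, 250), taggedLimit(&rs, 0));
}
```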
ReceiverStatus

    pub const ReceiverStatus = struct {
        session_id: i32,
        stream_id: i32,
        consumption_term_id: i32,
        consumption_term_offset: i32,
        receiver_window_length: i32,
        receiver_id: i64,
        timestamp_ns: u64,
    };

Archive Catalog
Catalog
Tracks segment metadata with time-range and stream queries.
| Method | Signature | Description |
|---|---|---|
init | fn init(allocator: Allocator, base_path: []const u8) !Catalog | Initialize |
deinit | fn deinit(self: *Catalog) void | Free entries |
addEntry | fn addEntry(self, entry: CatalogEntry) !void | Add segment metadata |
updateEntry | fn updateEntry(self, segment_id: u32, entry: CatalogEntry) !void | Update existing |
getEntry | fn getEntry(self, segment_id: u32) ?CatalogEntry | Look up by ID |
findByTimestamp | fn findByTimestamp(self, from_ns: u64, to_ns: u64) []const CatalogEntry | Time range query |
findByStream | fn findByStream(self, stream_id: u32) ![]CatalogEntry | Stream filter (caller frees) |
save | fn save(self) !void | Persist to disk |
load | fn load(allocator: Allocator, path: []const u8) !Catalog | Load from disk |
totalRecords | fn totalRecords(self) u64 | Sum of record counts |
totalBytes | fn totalBytes(self) u64 | Sum of payload bytes |
segmentCount | fn segmentCount(self) u32 | Number of segments |
CatalogEntry (56 bytes serialized):

    pub const CatalogEntry = struct {
        segment_id: u32,
        start_offset: u64,
        end_offset: u64,
        start_timestamp_ns: u64,
        end_timestamp_ns: u64,
        stream_id: u32,
        record_count: u32,
        total_bytes: u64,
        closed: bool,
    };

Archive Index
SparseIndex
Indexes every Nth record for fast binary-search lookup within segments.
| Method | Signature | Description |
|---|---|---|
init | fn init(allocator: Allocator, segment_id: u32, interval: u32) SparseIndex | Initialize |
deinit | fn deinit(self: *SparseIndex) void | Free entries |
record | fn record(self, seq: u32, offset: u64, timestamp_ns: u64, stream_id: u32) !void | Record an entry (indexes every Nth) |
findByTimestamp | fn findByTimestamp(self, timestamp_ns: u64) ?IndexEntry | Binary search by timestamp |
findBySequence | fn findBySequence(self, record_seq: u32) ?IndexEntry | Binary search by sequence |
save | fn save(self, base_path: []const u8) !void | Save to disk |
load | fn load(allocator: Allocator, base_path: []const u8, segment_id: u32) !SparseIndex | Load from disk |
rebuild | fn rebuild(allocator, segment_file, segment_id, interval) !SparseIndex | Rebuild by scanning segment |
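Since only every Nth record is indexed, a lookup cannot land on an arbitrary record directly. The sketch below assumes the usual sparse-index behavior: return the last indexed entry at or before the target, then let the reader scan forward from its file offset. `floorByTimestamp` is a hypothetical free function, and the entry struct is repeated only to keep the example self-contained:

```zig
const std = @import("std");

const IndexEntry = struct {
    record_seq: u32,
    file_offset: u64,
    timestamp_ns: u64,
    stream_id: u32,
};

/// Binary search over entries sorted by timestamp; null if the target
/// precedes the first indexed record.
fn floorByTimestamp(entries: []const IndexEntry, timestamp_ns: u64) ?IndexEntry {
    var lo: usize = 0;
    var hi: usize = entries.len;
    while (lo < hi) {
        const mid = lo + (hi - lo) / 2;
        if (entries[mid].timestamp_ns <= timestamp_ns) lo = mid + 1 else hi = mid;
    }
    return if (lo == 0) null else entries[lo - 1];
}

test "lookup returns the nearest earlier indexed record" {
    const entries = [_]IndexEntry{
        .{ .record_seq = 0, .file_offset = 0, .timestamp_ns = 100, .stream_id = 1 },
        .{ .record_seq = 64, .file_offset = 8_192, .timestamp_ns = 200, .stream_id = 1 },
        .{ .record_seq = 128, .file_offset = 16_384, .timestamp_ns = 300, .stream_id = 1 },
    };
    try std.testing.expectEqual(@as(u32, 64), floorByTimestamp(&entries, 250).?.record_seq);
    try std.testing.expect(floorByTimestamp(&entries, 50) == null);
}
```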
IndexEntry (24 bytes serialized):

    pub const IndexEntry = struct {
        record_seq: u32,
        file_offset: u64,
        timestamp_ns: u64,
        stream_id: u32,
    };

Compression
Compressor
LZ4-style compression with hash-table-based matching. 64 KB sliding window.
| Method | Signature | Description |
|---|---|---|
init | fn init(allocator: Allocator) !Compressor | Allocate hash table |
deinit | fn deinit(self, allocator: Allocator) void | Free hash table |
compress | fn compress(self, src: []const u8, dst: []u8) !usize | Compress into buffer, returns bytes written |
maxCompressedSize | fn maxCompressedSize(input_size: usize) usize | Worst-case output size |
Decompressor
| Method | Signature | Description |
|---|---|---|
decompress | fn decompress(src: []const u8, dst: []u8) !usize | Decompress, returns bytes written |
Frame API
| Function | Signature | Description |
|---|---|---|
compressFrame | fn compressFrame(allocator, src: []const u8) ![]u8 | Compress with 16-byte header + CRC32 |
decompressFrame | fn decompressFrame(allocator, frame_data: []const u8) ![]u8 | Decompress and validate checksum |
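The integrity check in the frame API amounts to storing a CRC32 of the original bytes and comparing it after decompression. A sketch of the 16-byte header handling only (no compression), assuming the four u32 fields are little-endian in the order magic, original size, compressed size, checksum, and that the checksum is the standard CRC-32 from `std.hash.Crc32`. `writeHeader` and `verify` are hypothetical helpers:

```zig
const std = @import("std");

const frame_magic: u32 = 0x5A424C5A;
const frame_header_len: usize = 16;

fn writeHeader(out: *[frame_header_len]u8, original: []const u8, compressed_len: u32) void {
    std.mem.writeInt(u32, out[0..4], frame_magic, .little);
    std.mem.writeInt(u32, out[4..8], @as(u32, @intCast(original.len)), .little);
    std.mem.writeInt(u32, out[8..12], compressed_len, .little);
    std.mem.writeInt(u32, out[12..16], std.hash.Crc32.hash(original), .little);
}

/// Check magic and size, then compare the stored CRC with the decompressed bytes.
fn verify(header: *const [frame_header_len]u8, decompressed: []const u8) bool {
    if (std.mem.readInt(u32, header[0..4], .little) != frame_magic) return false;
    if (std.mem.readInt(u32, header[4..8], .little) != decompressed.len) return false;
    return std.mem.readInt(u32, header[12..16], .little) == std.hash.Crc32.hash(decompressed);
}

test "a corrupted payload fails verification" {
    var header: [frame_header_len]u8 = undefined;
    writeHeader(&header, "123456789", 9);
    try std.testing.expect(verify(&header, "123456789"));
    try std.testing.expect(!verify(&header, "123456780"));
}
```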
CompressedFrame (16-byte header):

    pub const CompressedFrame = struct {
        magic: u32, // 0x5A424C5A ("ZBLZ")
        original_size: u32,
        compressed_size: u32,
        checksum: u32, // CRC32 of original data
    };

FFI Exports
C-ABI functions exported from src/ffi/exports.zig:
| Function | Signature | Description |
|---|---|---|
zigbolt_transport_create | (term_length: u32, use_hugepages: u8, pre_fault: u8) ?*anyopaque | Create transport |
zigbolt_transport_destroy | (handle: ?*anyopaque) void | Destroy transport |
zigbolt_ipc_create | (name: ?[*:0]const u8, term_length: u32) ?*anyopaque | Create IPC channel |
zigbolt_ipc_open | (name: ?[*:0]const u8, term_length: u32) ?*anyopaque | Open IPC channel |
zigbolt_ipc_destroy | (handle: ?*anyopaque) void | Destroy IPC channel |
zigbolt_publish | (handle: ?*anyopaque, data: ?[*]const u8, len: u32, msg_type_id: i32) i32 | Publish (0=success) |
zigbolt_poll | (handle: ?*anyopaque, callback: ?FragmentHandlerFn, limit: u32) u32 | Poll messages |
zigbolt_version_major | () u32 | Major version (0) |
zigbolt_version_minor | () u32 | Minor version (2) |
zigbolt_version_patch | () u32 | Patch version (1) |
zig build produces the shared library (zig-out/lib/libzigbolt.dylib on
macOS, .so on Linux) and the static archive zig-out/lib/libzigbolt.a.
Five language bindings — C, Rust, Python, Go, and TypeScript — are maintained
in the repository’s bindings/ directory. All five build and pass smoke
tests against this library (version 0.2.1). See
Installation for per-language setup,
including the shared ZIGBOLT_LIB_PATH environment variable.