Flash 228 lines
// pipe: anonymous SPSC byte pipe.
//
// One page per Pipe: header at offset 0, byte ring fills the rest
// (PAGE_SIZE - sizeof(Pipe)). head/tail are monotone u64 byte
// counters indexed modulo RING_CAP, so full vs. empty is
// distinguishable without a reserved slot. Page lifetime is owned by
// Pipe.refs, not mm.*_pages; unref() is the only path back to the
// allocator. Single-producer / single-consumer per end.
const builtin = #import("builtin")
// Named module; see src/wait_queue.flash.
const layout = #import("task_layout")
const wq_mod = #import("wait_queue")
pub const WaitQueue = wq_mod.WaitQueue
pub const TaskStruct = layout.TaskStruct
pub const FD_TABLE_SIZE = layout.FD_TABLE_SIZE
// Size in bytes of one page (1 << 12 = 4096). A Pipe header plus its
// ring occupy exactly one page.
pub const PAGE_SIZE u64 = 1 << 12
// Page allocator entry point, provided outside this file. alloc()
// treats a return value of 0 as allocation failure.
extern fn get_free_page() u64
// Return a page to the allocator; unref() passes the same kind of
// address that get_free_page() handed out.
extern fn free_page(p u64) void
// Preemption control and scheduler entry, provided outside this file.
// NOTE(review): presumably preempt_disable/enable nest and schedule()
// yields the CPU until the task is runnable again — confirm against
// the scheduler.
extern fn preempt_disable() void
extern fn preempt_enable() void
extern fn schedule() void
// In the freestanding kernel build the page allocator hands out a
// physical address; the kernel reads/writes the page through its
// TTBR1 linear-map alias at `pa | LINEAR_MAP_BASE`. The host test
// build allocates from a static buffer (tests/host_stubs.zig) and
// returns a bare host VA — no alias, identity mapping. Branching at
// comptime keeps the kernel path zero-overhead.
// High bits OR-ed into a physical address to form its linear-map alias.
const LINEAR_MAP_BASE u64 = 0xFFFF000000000000
// Map an address returned by the page allocator to the address the
// code dereferences: the linear-map alias in the freestanding build,
// the address itself everywhere else.
inline fn pageKva(pa u64) u64 {
    const is_kernel_build = builtin.target.os.tag == .freestanding
    return if (is_kernel_build) pa | LINEAR_MAP_BASE else pa
}
// Header stored at the start of the pipe's page. The ring bytes are
// not a field: they occupy the rest of the page (see ringBase()).
pub const Pipe = extern struct {
    // Reference count. read()/write() treat refs <= 1 as "peer gone".
    refs u32 = 0,
    // Explicit filler after refs; with it, head sits at offset 8.
    _pad u32 = 0,
    // Free-running count of bytes pushed by write().
    head u64 = 0,
    // Free-running count of bytes drained by read().
    tail u64 = 0,
    // Queue read() waits on while the ring is empty.
    readers_wq WaitQueue = .{},
    // Queue write() waits on while the ring is full.
    writers_wq WaitQueue = .{},
    // Ring data follows in the same page; see ringBase().

    // Bytes currently buffered (wrapping difference of the counters).
    pub fn count(self *Pipe) u64 {
        const buffered u64 = self.head -% self.tail
        return buffered
    }

    // True when nothing is buffered.
    pub fn isEmpty(self *Pipe) bool {
        return self.count() == 0
    }

    // True when every ring byte is in use.
    pub fn isFull(self *Pipe) bool {
        return (self.head -% self.tail) == RING_CAP
    }
}
// Bytes occupied by the Pipe header at the start of the page.
pub const HEADER_SIZE u64 = #sizeOf(Pipe)
// Ring capacity in bytes: whatever is left of the page after the header.
pub const RING_CAP u64 = PAGE_SIZE - HEADER_SIZE
// Address of the first ring byte: the Pipe's own address advanced by
// HEADER_SIZE, i.e. the byte immediately after the header.
inline fn ringBase(p *mut Pipe) [*]mut u8 {
    return #ptrFromInt(#intFromPtr(p) + HEADER_SIZE)
}
// Obtain one page and initialise a Pipe header at its start with the
// struct's field defaults. Only the header is written; the ring bytes
// keep whatever the allocator returned.
// Returns null when get_free_page() reports failure (0).
// refs starts at 0; the installer takes the first ref.
pub fn alloc() ?*mut Pipe {
    const page_addr = get_free_page()
    if page_addr == 0 {
        return null
    }
    const pipe *mut Pipe = #ptrFromInt(pageKva(page_addr))
    pipe.* = .{}
    return pipe
}
// Take one more reference on `p`. The increment runs between
// preempt_disable() and preempt_enable().
pub fn ref(p *mut Pipe) void {
    preempt_disable()
    p.refs = p.refs + 1
    preempt_enable()
}
// Drop one ref.
//
// Every drop wakes both wait queues, so a reader or writer blocked on
// the surviving end re-runs its `refs <= 1` check and observes EOF /
// closed-peer instead of sleeping forever. Waiters re-test their
// condition in a loop, so a wake that changes nothing for them is
// harmless. The last drop (refs == 0) additionally returns the page to
// the allocator.
pub fn unref(p *mut Pipe) void {
    preempt_disable()
    p.refs -= 1
    const last = p.refs == 0
    // Wake before re-enabling preemption: once it is back on, the peer
    // may drop the final ref and free the page, so a non-last caller
    // must not touch `p` after this point.
    // NOTE(review): assumes wake_all() never sleeps and is legal with
    // preemption disabled — confirm against wait_queue.
    p.readers_wq.wake_all()
    p.writers_wq.wake_all()
    preempt_enable()
    if !last { return }
    // refs == 0: no other ref exists, so no concurrent reader or writer
    // can race the free.
    const kva u64 = #intFromPtr(p)
    // Undo pageKva(): strip the linear-map bits in the kernel build.
    const pa u64 = if (builtin.target.os.tag == .freestanding) kva & ~LINEAR_MAP_BASE else kva
    free_page(pa)
}
// Block until at least one byte is available, then drain up to `len`
// bytes from the ring into `buf` in a single pass.
//
// Returns the number of bytes copied. Returns 0 on EOF (ring empty and
// refs <= 1: no writer can wake the reader) and also when len == 0,
// because the loop body never runs. Negative is reserved for future
// short-read errors; this function never produces one.
pub fn read(p *mut Pipe, buf [*]mut u8, len u64) i64 {
// Despite the name, this counts bytes copied out to `buf` so far.
var written u64 = 0
while written < len {
// NOTE(review): presumably registers the caller on readers_wq before
// the emptiness test so a wake arriving in between is not lost —
// confirm against wait_queue.
p.readers_wq.prepare_to_wait()
if p.isEmpty() {
// Last-writer-closed EOF: caller's fd is the only ref.
if p.refs <= 1 {
p.readers_wq.finish_wait()
break
}
// Nothing buffered yet: yield, then redo the whole check on return.
schedule()
continue
}
p.readers_wq.finish_wait()
// NOTE(review): presumably preemption is held off so the copy and the
// tail updates are not interleaved with the writer — confirm.
preempt_disable()
const ring = ringBase(p)
// tail is a free-running counter; reduce it modulo RING_CAP to index.
while written < len && !p.isEmpty() {
buf[written] = ring[p.tail % RING_CAP]
p.tail +%= 1
written += 1
}
preempt_enable()
// Ring space was just released; let one waiting writer retry.
p.writers_wq.wake_one()
// One drain per call: short read is POSIX-conformant for pipes.
break
}
// Both break paths above already called finish_wait(); this repeats it,
// and is the only call when len == 0.
// NOTE(review): presumably a no-op when the task is not queued — confirm.
p.readers_wq.finish_wait()
return #intCast(written)
}
// Push bytes from `buf` into the ring until `len` are written, blocking
// whenever the ring is full.
//
// The reader-gone test (refs <= 1) is made only when the ring is full:
// while space remains, bytes are pushed regardless of refs. Once the
// ring is full and refs <= 1, the call stops and reports a short write.
// Returns the number of bytes pushed (0 when len == 0); negative is
// reserved and never produced here.
pub fn write(p *mut Pipe, buf [*]u8, len u64) i64 {
var pushed u64 = 0
while pushed < len {
// NOTE(review): presumably registers the caller on writers_wq before
// the fullness test so a wake arriving in between is not lost —
// confirm against wait_queue.
p.writers_wq.prepare_to_wait()
if p.isFull() {
// Last reader closed. Short write of bytes pushed so far.
// TODO: SIGPIPE / signal delivery not implemented.
if p.refs <= 1 {
p.writers_wq.finish_wait()
break
}
// No room yet: yield, then redo the whole check on return.
schedule()
continue
}
p.writers_wq.finish_wait()
// NOTE(review): presumably preemption is held off so the copy and the
// head updates are not interleaved with the reader — confirm.
preempt_disable()
const ring = ringBase(p)
// head is a free-running counter; reduce it modulo RING_CAP to index.
while pushed < len && !p.isFull() {
ring[p.head % RING_CAP] = buf[pushed]
p.head +%= 1
pushed += 1
}
preempt_enable()
// Data was just added; let one waiting reader retry. Unlike read(),
// there is no break here: the outer loop continues until all `len`
// bytes are pushed.
p.readers_wq.wake_one()
}
// The short-write break path already called finish_wait(); this call
// also covers the normal exit and the len == 0 case.
// NOTE(review): presumably a no-op when the task is not queued — confirm.
p.writers_wq.finish_wait()
return #intCast(pushed)
}
// ---- Host tests ----
const std = #import("std")
test "empty pipe: isEmpty true, isFull false, count == 0" {
    const t = std.testing
    const pipe = alloc() orelse return error.OutOfMemory
    pipe.refs = 1
    // A freshly allocated pipe buffers nothing.
    try t.expectEqual(#as(u64, 0), pipe.count())
    try t.expect(pipe.isEmpty())
    try t.expect(!pipe.isFull())
    // Reset by hand instead of calling unref: the host stubs leak, and
    // the bump allocator does not recycle pages anyway.
    pipe.refs = 0
}
test "write then read round-trips bytes" {
    const t = std.testing
    const pipe = alloc() orelse return error.OutOfMemory
    pipe.refs = 2 // both ends installed
    const msg = "hello-pipe"

    // Everything fits, so the write is complete and fully buffered.
    const pushed = write(pipe, msg.ptr, msg.len)
    try t.expectEqual(#as(i64, msg.len), pushed)
    try t.expectEqual(#as(u64, msg.len), pipe.count())

    // Reading it back yields the same bytes and empties the ring.
    var out [16]u8 = undefined
    const drained = read(pipe, &out, msg.len)
    try t.expectEqual(#as(i64, msg.len), drained)
    try t.expectEqualSlices(u8, msg, out[0..#intCast(drained)])
    try t.expect(pipe.isEmpty())
}
test "head/tail wraparound preserves byte order" {
    const p = alloc() orelse return error.OutOfMemory
    p.refs = 2
    // Seed head/tail near wrap so the next write+read straddles modulo.
    p.head = RING_CAP - 4
    p.tail = RING_CAP - 4
    const payload = "ABCDEFGH" // 8 bytes — last 4 wrap to ring[0..4]
    // Check the transfer counts: a short write or read would otherwise
    // leave part of `buf` undefined and make the slice comparison
    // meaningless.
    const n_w = write(p, payload.ptr, payload.len)
    try std.testing.expectEqual(#as(i64, payload.len), n_w)
    try std.testing.expectEqual(#as(u64, 8), p.count())
    var buf [8]u8 = undefined
    const n_r = read(p, &buf, payload.len)
    try std.testing.expectEqual(#as(i64, payload.len), n_r)
    try std.testing.expectEqualSlices(u8, payload, buf[0..])
    // Both counters crossed RING_CAP and met again.
    try std.testing.expect(p.isEmpty())
}
test "EOF: empty pipe with refs == 1 returns 0 instead of blocking" {
    const pipe = alloc() orelse return error.OutOfMemory
    // Only the read end is open, so nobody could ever fill the ring.
    pipe.refs = 1
    var scratch [4]u8 = undefined
    const got = read(pipe, &scratch, scratch.len)
    try std.testing.expectEqual(#as(i64, 0), got)
}
test "isFull vs isEmpty mutually exclusive at boundaries" {
    const t = std.testing
    const pipe = alloc() orelse return error.OutOfMemory
    pipe.refs = 2

    // Lower boundary, count == 0: empty and not full.
    try t.expect(!pipe.isFull())
    try t.expect(pipe.isEmpty())

    // Upper boundary, count == RING_CAP: full and not empty.
    pipe.tail = 0
    pipe.head = RING_CAP
    try t.expect(!pipe.isEmpty())
    try t.expect(pipe.isFull())
}