ajhahn.de
← FlashOS
Flash 228 lines
// pipe: anonymous SPSC byte pipe.
//
// One page per Pipe: header at offset 0, byte ring fills the rest
// (PAGE_SIZE - sizeof(Pipe)). head/tail are monotone u64 byte
// counters indexed modulo RING_CAP, so full vs. empty is
// distinguishable without a reserved slot. Page lifetime is owned by
// Pipe.refs, not mm.*_pages; unref() is the only path back to the
// allocator. Single-producer / single-consumer per end.

const builtin = #import("builtin")
// Named module; see src/wait_queue.flash.
const layout = #import("task_layout")
const wq_mod = #import("wait_queue")

// Re-exports of the wait-queue type and the task-layout declarations
// under this module's namespace.
pub const WaitQueue = wq_mod.WaitQueue
pub const TaskStruct = layout.TaskStruct
pub const FD_TABLE_SIZE = layout.FD_TABLE_SIZE

// Size in bytes of the single page that backs one Pipe (1 << 12 = 4096).
pub const PAGE_SIZE u64 = 1 << 12

// Symbols resolved outside this file.
// get_free_page: returns the address of a fresh page; alloc() treats a
// return value of 0 as allocation failure.
extern fn get_free_page() u64
// free_page: hands a page back to the allocator; only unref() calls it.
extern fn free_page(p u64) void
// preempt_disable / preempt_enable: bracket every refs update and every
// ring copy in this file.
extern fn preempt_disable() void
extern fn preempt_enable() void
// schedule: called by read()/write() after prepare_to_wait() when the
// pipe is empty/full and the caller has to wait.
extern fn schedule() void

// Address translation between what the page allocator returns and what
// this file dereferences.
//
// Freestanding kernel build: the allocator returns a physical address
// and the page is accessed through its TTBR1 linear-map alias,
// `pa | LINEAR_MAP_BASE`.
// Host test build: the allocator (tests/host_stubs.zig) carves pages
// out of a static buffer and returns a bare host VA, which is used
// as-is (identity mapping, no alias).
//
// The target check is comptime-known, so the kernel path carries no
// runtime branch.
const LINEAR_MAP_BASE u64 = 0xFFFF000000000000

// Map an allocator-returned page address `pa` to the address the
// current build reads and writes.
inline fn pageKva(pa u64) u64 {
    if builtin.target.os.tag == .freestanding { return pa | LINEAR_MAP_BASE }
    return pa
}

// Pipe header, stored at offset 0 of the pipe's page. The byte ring
// occupies the remainder of the same page (see ringBase()).
pub const Pipe = extern struct {
    // Live reference count; unref() frees the page when it reaches 0.
    refs u32 = 0,
    // Explicit filler after the 32-bit refs so the 64-bit counters
    // start at offset 8.
    _pad u32 = 0,
    // Total bytes ever written (advanced by write()).
    head u64 = 0,
    // Total bytes ever read (advanced by read()).
    tail u64 = 0,
    // Tasks waiting in read() for data.
    readers_wq WaitQueue = .{},
    // Tasks waiting in write() for free space.
    writers_wq WaitQueue = .{},
    // Ring data follows in the same page; see ringBase().

    // Number of bytes currently buffered (written but not yet read).
    pub fn count(self *Pipe) u64 {
        const buffered u64 = self.head -% self.tail
        return buffered
    }
    // True when nothing is buffered, i.e. the two counters coincide.
    pub fn isEmpty(self *Pipe) bool {
        return self.tail == self.head
    }
    // True when the buffered byte count has reached the ring capacity.
    pub fn isFull(self *Pipe) bool {
        return RING_CAP == self.count()
    }
}

// Bytes taken by the Pipe header at the start of the page.
pub const HEADER_SIZE u64 = #sizeOf(Pipe)
// Ring capacity in bytes: the part of the page left after the header.
pub const RING_CAP u64 = PAGE_SIZE - HEADER_SIZE

// Pointer to the first ring byte, which sits HEADER_SIZE bytes past the
// start of the Pipe header within the same page.
inline fn ringBase(p *mut Pipe) [*]mut u8 {
    const header_addr u64 = #intFromPtr(p)
    const ring [*]mut u8 = #ptrFromInt(header_addr + HEADER_SIZE)
    return ring
}

// Obtain one page from the allocator and initialise a Pipe header at
// its start with every field at its default (zero) value.
// Returns null when the allocator reports failure (address 0).
// The new pipe has refs == 0; whoever installs it takes the first ref.
pub fn alloc() ?*mut Pipe {
    const page_pa = get_free_page()
    if page_pa == 0 { return null }
    const pipe *mut Pipe = #ptrFromInt(pageKva(page_pa))
    pipe.* = .{}
    return pipe
}

// Take one additional reference on the pipe. The counter update runs
// with preemption disabled, mirroring unref().
pub fn ref(p *mut Pipe) void {
    preempt_disable()
    p.refs = p.refs + 1
    preempt_enable()
}

// Drop one ref.
//
// Non-final drop: wake both wait queues. read() and write() test
// `refs <= 1` only before calling schedule(), so a peer that is
// already asleep when the other end closes would otherwise never be
// resumed to observe EOF / the short-write condition. Woken tasks
// simply re-run their checks, so a wake that turns out to be
// unnecessary is harmless. The wake is issued inside the
// preempt-disabled section so that the remaining holder cannot run,
// drop its own ref and free the page while the queues are touched.
// NOTE(review): assumes WaitQueue.wake_all() may be called with
// preemption disabled and that preempt_disable() keeps other holders
// from running — confirm against src/wait_queue.flash.
//
// Final drop: wake both queues as before and return the page to the
// allocator. This is the only call to free_page() in this file.
pub fn unref(p *mut Pipe) void {
    preempt_disable()
    p.refs -= 1
    const last = p.refs == 0
    if !last {
        p.readers_wq.wake_all()
        p.writers_wq.wake_all()
    }
    preempt_enable()
    if !last { return }
    // Wake runs after the refs == 0 decision. No other ref exists, so
    // no concurrent reader or writer can race the free.
    p.readers_wq.wake_all()
    p.writers_wq.wake_all()
    // Undo pageKva(): strip the linear-map alias in the kernel build,
    // pass the host VA through unchanged in the test build.
    const kva u64 = #intFromPtr(p)
    const pa u64 = if (builtin.target.os.tag == .freestanding) kva & ~LINEAR_MAP_BASE else kva
    free_page(pa)
}

// Block until a byte is available, then drain up to len bytes.
//
// p:   pipe to read from.
// buf: destination; must have room for at least `len` bytes.
// len: maximum number of bytes to copy. With len == 0 the loop is
//      never entered and 0 is returned without waiting.
//
// Returns the number of bytes copied into buf. Returns 0 on EOF
// (refs <= 1 and empty: no writer can wake the reader). Negative is
// reserved for future short-read errors.
// At most one drain is performed per call, so the result may be less
// than len even though more data arrives later (short read).
pub fn read(p *mut Pipe, buf [*]mut u8, len u64) i64 {
    var written u64 = 0
    while written < len {
        // Register on the readers queue before testing the condition,
        // then test, then sleep.
        p.readers_wq.prepare_to_wait()
        if p.isEmpty() {
            // Last-writer-closed EOF: caller's fd is the only ref.
            if p.refs <= 1 {
                p.readers_wq.finish_wait()
                break
            }
            // Nothing buffered and another ref still exists: wait, then
            // re-run the whole check from the top.
            schedule()
            continue
        }
        p.readers_wq.finish_wait()
        // Copy out with preemption disabled.
        preempt_disable()
        const ring = ringBase(p)
        while written < len && !p.isEmpty() {
            // tail is a running byte counter; reduce it modulo the ring
            // capacity to get the slot index.
            buf[written] = ring[p.tail % RING_CAP]
            p.tail +%= 1
            written += 1
        }
        preempt_enable()
        // Space was freed: let one blocked writer retry.
        p.writers_wq.wake_one()
        // One drain per call: short read is POSIX-conformant for pipes.
        break
    }
    // Every exit from the loop body above has already called
    // finish_wait(); this call also covers the len == 0 case.
    // NOTE(review): presumably finish_wait() is safe to call repeatedly
    // and without a preceding prepare_to_wait() — confirm.
    p.readers_wq.finish_wait()
    return #intCast(written)
}

// Push bytes until `len` are written or the pipe loses all readers.
//
// p:   pipe to write to.
// buf: source; must hold at least `len` readable bytes.
// len: number of bytes to push. With len == 0 the loop is never
//      entered and 0 is returned without waiting.
//
// Returns the number of bytes pushed; negative is reserved.
// Unlike read(), this keeps looping (waiting whenever the ring is full)
// until all `len` bytes are in. The result is short only when the ring
// is full and refs <= 1.
pub fn write(p *mut Pipe, buf [*]u8, len u64) i64 {
    var pushed u64 = 0
    while pushed < len {
        // Register on the writers queue before testing the condition,
        // then test, then sleep.
        p.writers_wq.prepare_to_wait()
        if p.isFull() {
            // Last reader closed. Short write of bytes pushed so far.
            // TODO: SIGPIPE / signal delivery not implemented.
            if p.refs <= 1 {
                p.writers_wq.finish_wait()
                break
            }
            // Ring is full and another ref still exists: wait, then
            // re-run the whole check from the top.
            schedule()
            continue
        }
        p.writers_wq.finish_wait()
        // Copy in with preemption disabled.
        preempt_disable()
        const ring = ringBase(p)
        while pushed < len && !p.isFull() {
            // head is a running byte counter; reduce it modulo the ring
            // capacity to get the slot index.
            ring[p.head % RING_CAP] = buf[pushed]
            p.head +%= 1
            pushed += 1
        }
        preempt_enable()
        // Data became available: let one blocked reader retry.
        p.readers_wq.wake_one()
    }
    // Every exit from the loop body above has already called
    // finish_wait(); this call also covers the len == 0 case.
    // NOTE(review): presumably finish_wait() is safe to call repeatedly
    // and without a preceding prepare_to_wait() — confirm.
    p.writers_wq.finish_wait()
    return #intCast(pushed)
}

// ---- Host tests ----

const std = #import("std")

// A freshly allocated pipe reports empty, not full, and zero buffered bytes.
test "empty pipe: isEmpty true, isFull false, count == 0" {
    const pipe = alloc() orelse return error.OutOfMemory
    pipe.refs = 1
    try std.testing.expect(pipe.isEmpty())
    try std.testing.expect(!pipe.isFull())
    try std.testing.expectEqual(#as(u64, 0), pipe.count())
    pipe.refs = 0
    // unref() is deliberately skipped: the host stub allocator is a
    // bump allocator that never recycles, so the page is simply leaked.
}

// Bytes pushed with write() come back unchanged and in order from read().
test "write then read round-trips bytes" {
    const pipe = alloc() orelse return error.OutOfMemory
    pipe.refs = 2 // one ref per installed fd (read end + write end)
    const msg = "hello-pipe"
    const wrote = write(pipe, msg.ptr, msg.len)
    try std.testing.expectEqual(#as(i64, msg.len), wrote)
    try std.testing.expectEqual(#as(u64, msg.len), pipe.count())

    var out [16]u8 = undefined
    const got = read(pipe, &out, msg.len)
    try std.testing.expectEqual(#as(i64, msg.len), got)
    try std.testing.expectEqualSlices(u8, msg, out[0..#intCast(got)])
    try std.testing.expect(pipe.isEmpty())
}

// A transfer that straddles the end of the ring keeps its byte order.
test "head/tail wraparound preserves byte order" {
    const pipe = alloc() orelse return error.OutOfMemory
    pipe.refs = 2
    // Park both counters 4 bytes short of the ring end so the transfer
    // below crosses the modulo boundary.
    pipe.head = RING_CAP - 4
    pipe.tail = RING_CAP - 4
    const msg = "ABCDEFGH" // 8 bytes: the final 4 land in ring[0..4]
    _ = write(pipe, msg.ptr, msg.len)
    try std.testing.expectEqual(#as(u64, 8), pipe.count())
    var out [8]u8 = undefined
    _ = read(pipe, &out, msg.len)
    try std.testing.expectEqualSlices(u8, msg, out[0..])
}

// With a single ref and nothing buffered, read() reports EOF (0) rather
// than waiting.
test "EOF: empty pipe with refs == 1 returns 0 instead of blocking" {
    const pipe = alloc() orelse return error.OutOfMemory
    pipe.refs = 1 // only the read end is still held
    var out [4]u8 = undefined
    const got = read(pipe, &out, out.len)
    try std.testing.expectEqual(#as(i64, 0), got)
}

// isEmpty() and isFull() never agree at either extreme of the fill level.
test "isFull vs isEmpty mutually exclusive at boundaries" {
    const pipe = alloc() orelse return error.OutOfMemory
    pipe.refs = 2
    // Zero bytes buffered: empty and not full.
    try std.testing.expect(pipe.isEmpty())
    try std.testing.expect(!pipe.isFull())
    // Exactly RING_CAP bytes buffered: full and not empty.
    pipe.head = RING_CAP
    pipe.tail = 0
    try std.testing.expect(pipe.isFull())
    try std.testing.expect(!pipe.isEmpty())
}