Lindenii Project Forge
Clear epoll flags in callbacks
use io; use rt; // A callback for a [[read]] or [[readv]] operation. export type readcb = fn(file: *file, result: (size | io::EOF | io::error)) void; // Schedules a read operation on a file object. export fn read( file: *file, cb: *readcb, buf: []u8, ) req = { file.vbuf = io::mkvector(buf); // XXX: Bit of a hack to avoid allocating a slice const vec = (&file.vbuf: *[*]io::vector)[..1]; return readv(file, cb, vec...); }; // Schedules a vectored read operation on a file object. export fn readv( file: *file, cb: *readcb, vec: io::vector... ) req = { assert(file.op == op::NONE); if (file.flags & fflags::BLOCKING != 0) { const r = io::readv(file.fd, vec...); cb(file, r); return req { ... }; }; file.op = op::READV; file.cb = cb; file.vec = vec; filemod(file, rt::EPOLLIN | rt::EPOLLHUP); return req { ... }; }; fn readv_ready(file: *file, ev: *rt::epoll_event) void = { assert(file.op == op::READV); assert(ev.events & (rt::EPOLLIN | rt::EPOLLHUP) != 0); assert(file.cb != null); const cb = file.cb: *readcb; file.op = op::NONE;
filemod(file, 0);
if (ev.events & rt::EPOLLHUP != 0) { cb(file, io::EOF); } else { const r = io::readv(file.fd, file.vec...); cb(file, r); }; }; // A callback for a [[write]] or [[writev]] operation. export type writecb = fn(file: *file, result: (size | io::error)) void; // Schedules a write operation on a file object. export fn write( file: *file, cb: *writecb, buf: []u8, ) req = { file.vbuf = io::mkvector(buf); // XXX: Bit of a hack to avoid allocating a slice const vec = (&file.vbuf: *[*]io::vector)[..1]; return writev(file, cb, vec...); }; // Schedules a vectored read operation on a file object. export fn writev( file: *file, cb: *writecb, vec: io::vector... ) req = { // XXX: Should we support both pending reads and writes at the same // time? (yes) assert(file.op == op::NONE); if (file.flags & fflags::BLOCKING != 0) { const r = io::writev(file.fd, vec...); cb(file, r); return req { ... }; }; file.op = op::WRITEV; file.cb = cb; file.vec = vec; filemod(file, rt::EPOLLOUT | rt::EPOLLHUP); return req { ... }; }; fn writev_ready(file: *file, ev: *rt::epoll_event) void = { assert(file.op == op::WRITEV && ev.events & rt::EPOLLOUT != 0); assert(file.cb != null); const r = io::writev(file.fd, file.vec...); const cb = file.cb: *writecb; file.op = op::NONE;
filemod(file, 0);
cb(file, r); };
use errors; use net; use net::ip; use net::tcp; use rt; // Creates a socket which listens for incoming TCP connections on the given // IP address and port. export fn listen_tcp( loop: *loop, addr: ip::addr, port: u16, opts: tcp::listen_option... ) (*file | net::error | errors::error) = { const sock = tcp::listen(addr, port, opts...)?; return register(loop, sock)?; }; // A callback for an [[accept]] operation. export type acceptcb = fn(file: *file, result: (*file | net::error)) void; // Schedules an accept operation on a socket. export fn accept( sock: *file, cb: *acceptcb, flags: net::sockflags... ) req = { assert(sock.op == op::NONE); let fl: net::sockflags = 0; for (let i = 0z; i < len(flags); i += 1) { fl |= flags[i]; }; sock.op = op::ACCEPT; sock.cb = cb; sock.sockflags = fl; filemod(sock, rt::EPOLLIN); return req { ... }; }; fn accept_ready( sock: *file, ev: *rt::epoll_event, ) void = { assert(sock.op == op::ACCEPT); assert(ev.events & rt::EPOLLIN != 0); assert(sock.cb != null); const cb = sock.cb: *acceptcb; sock.op = op::NONE;
filemod(sock, 0);
const r = tcp::accept(sock.fd, sock.sockflags); match (r) { case let fd: net::socket => // TODO: Bubble up errors from here? const file = register(sock.ev, fd)!; cb(sock, file); case let err: net::error => cb(sock, err); }; };