Lindenii Project Forge
Handle nomem in just about everything Signed-off-by: Runxi Yu <me@runxiyu.org>
use errors; use ev; use fmt; use os; use os::exec; use time; export fn main() void = { const loop = ev::newloop()!; defer ev::finish(&loop); const cmd = exec::cmd(os::args[1], os::args[2..]...)!; const child = ev::exec(&loop, &cmd)!; ev::wait(child, &child_exited); const timer = ev::newtimer(&loop, &expired, time::clock::MONOTONIC)!; ev::timer_configure(timer, 5 * time::SECOND, 0); ev::setuser(timer, child); for (ev::dispatch(&loop, -1)!) void; };
fn expired(timer: *ev::file) void = {
fn expired(timer: *ev::file) (void | nomem) = {
fmt::println("Child timed out, sending SIGTERM")!; const child = ev::getuser(timer): *ev::file; ev::kill(child)!; };
fn child_exited(child: *ev::file, r: exec::status) void = {
fn child_exited(child: *ev::file, r: exec::status) (void | nomem) = {
const exit = exec::exit(&r); fmt::printfln("child exited: {}", exec::exitstr(exit))!; const loop = ev::getloop(child); ev::stop(loop); };
use ev; use ev::dial; use fmt; use net::ip; use os; export fn main() void = { const loop = ev::newloop()!; defer ev::finish(&loop); const addr = os::args[1], svc = os::args[2]; dial::resolve(&loop, "tcp", addr, svc, &resolvecb, &loop)!; for (ev::dispatch(&loop, -1)!) void; }; fn resolvecb( user: nullable *opaque, r: (([]ip::addr, u16) | dial::error),
) void = {
) (void | nomem) = {
const loop = user: *ev::loop; defer ev::stop(loop); const (ip, port) = match (r) { case let ip: ([]ip::addr, u16) => yield ip; case let err: dial::error => fmt::fatal("Dial error:", dial::strerror(err)); }; for (let i = 0z; i < len(ip); i += 1) { fmt::printfln("{}:{}", ip::string(ip[i]), port)!; }; };
use ev; use ev::dial; use fmt; use io; use net::ip; use net::uri; use os; use strings; type state = struct { loop: *ev::loop, uri: *uri::uri, exit: int, buf: [os::BUFSZ]u8, }; export fn main() void = { const loop = ev::newloop()!; defer ev::finish(&loop); const uri = uri::parse(os::args[1])!; defer uri::finish(&uri); let state = state { loop = &loop, exit = os::status::SUCCESS, uri = &uri, buf = [0...], }; dial::dial_uri(&loop, "tcp", &uri, &dialcb, &state)!; for (ev::dispatch(&loop, -1)!) void; os::exit(state.exit); }; fn error(state: *state, details: str) void = { fmt::errorfln("Error: {}", details)!; state.exit = os::status::FAILURE; ev::stop(state.loop); };
fn dialcb(user: nullable *opaque, r: (*ev::file | dial::error)) void = {
fn dialcb(user: nullable *opaque, r: (*ev::file | dial::error)) (void | nomem) = {
let state = user: *state; const file = match (r) { case let file: *ev::file => yield file; case let err: dial::error => error(state, dial::strerror(err)); return; }; ev::setuser(file, state); const s = fmt::bsprintf(state.buf, "GET {} HTTP/1.1\r\n" "Host: {}\r\n" "Connection: close\r\n\r\n", if (state.uri.path == "") "/" else state.uri.path, match (state.uri.host) { case let s: str => yield s; case let ip: ip::addr => yield ip::string(ip);
});
})?;
ev::write(file, &writecb, state.buf[..len(s)]); };
fn writecb(file: *ev::file, r: (size | io::error)) void = {
fn writecb(file: *ev::file, r: (size | io::error)) (void | nomem) = {
const state = ev::getuser(file): *state; const z = match (r) { case let z: size => yield z; case let err: io::error => error(state, io::strerror(err)); return; }; ev::read(file, &readcb, state.buf); };
fn readcb(file: *ev::file, r: (size | io::EOF | io::error)) void = {
fn readcb(file: *ev::file, r: (size | io::EOF | io::error)) (void | nomem) = {
const state = ev::getuser(file): *state; const z = match (r) { case let z: size => yield z; case io::EOF => ev::stop(state.loop); return; case let err: io::error => error(state, io::strerror(err)); return; }; io::writeall(os::stdout, state.buf[..z])!; ev::read(file, &readcb, state.buf); };
use ev; use ev::client; use net::uri; use net::http; use ev::dial; use net; use io; use errors; use log; use os; use fmt; type state = struct { loop: *ev::loop, client: *http::client, exit: int, }; export fn main() void = { const loop = ev::newloop()!; defer ev::finish(&loop);
const client = http::newclient("Hare net::http test client");
const client = http::newclient("Hare net::http test client")!;
defer http::client_finish(&client); const targ = uri::parse("http://127.0.0.1:8080")!; defer uri::finish(&targ); let req = http::new_request(&client, "GET", &targ)!; let state = state { loop = &loop, client = &client, ... }; const con = match (client::http_do(&loop, &client, &req, &http_done, &state)) { case let req: ev::req => yield req; case let err: dial::error => log::println("dial error:", dial::strerror(err)); return; case let err: io::error => log::println("io error:", io::strerror(err)); return; }; for (ev::dispatch(&loop, -1)!) void; }; fn http_done( user: nullable *opaque, r: ( (*ev::file, http::response) | io::EOF | dial::error | io::error | http::protoerr | errors::unsupported ) ) void = { const state = user: *state; const (file, resp) = match (r) { case let resp: (*ev::file, http::response) => yield resp; case io::EOF => log::println("connection closed"); ev::stop(state.loop); return; case let err: dial::error => log::println("dial error", dial::strerror(err)); ev::stop(state.loop); return; case let err: io::error => log::println("io error:", io::strerror(err)); ev::stop(state.loop); return; case let err: http::protoerr => log::println("http protoerror:", http::strerror(err)); ev::stop(state.loop); return; case let err: errors::unsupported => log::println("errors:", errors::strerror(err)); ev::stop(state.loop); return; }; fmt::printfln("Headers:")!; http::write_header(os::stdout, &resp.header)!; fmt::println()!; io::copy(os::stdout, resp.body)!; ev::close(file); ev::stop(state.loop); };
use errors; use ev; use ev::server; use fmt; use io; use log; use memio; use net::http; use net::ip; use net::tcp; use net; use os; use unix::signal; type state = struct { loop: *ev::loop, exit: int, }; export fn main() void = { const loop = ev::newloop()!; defer ev::finish(&loop); const sock = match (ev::listen_tcp(&loop, ip::LOCAL_V4, 8080, tcp::reuseaddr)) { case let err: net::error => log::fatalf("Error: listen: {}", net::strerror(err)); case let err: errors::error => log::fatalf("Error: listen: {}", errors::strerror(err)); case let sock: *ev::file => yield sock; }; defer ev::close(sock); let state = state { loop = &loop, ... };
const server = server::http_serve(sock, &http_serve, &state);
const server = server::http_serve(sock, &http_serve, &state)!;
defer server::server_finish(server); const sig = ev::signal(&loop, &signal, signal::sig::INT, signal::sig::TERM)!; defer ev::close(sig); ev::setuser(sig, &state); log::println("Listening on 127.0.0.1:8080"); for (ev::dispatch(&loop, -1)!) void; os::exit(state.exit); };
fn signal(file: *ev::file, sig: signal::sig) void = {
fn signal(file: *ev::file, sig: signal::sig) (void | nomem) = {
log::printfln("Exiting due to {}", signal::signame(sig)); const state = ev::getuser(file): *state; ev::stop(state.loop); }; fn http_serve( user: nullable *opaque, req: http::request, rw: http::response_writer ) void = { const state = user: *state; const (ip, port) = http::peeraddr(&rw); log::printfln("Serving from {}:{}", ip::string(ip), port); const buf = memio::dynamic(); defer io::close(&buf)!; fmt::fprintfln(&buf, "Method: {}", req.method)!; fmt::fprintfln(&buf, "Path: {}", req.target.path)!; fmt::fprintfln(&buf, "Fragment: {}", req.target.fragment)!; fmt::fprintfln(&buf, "Query: {}", req.target.query)!; fmt::fprintfln(&buf, "Headers:")!; http::write_header(&buf, &req.header)!; fmt::fprintln(&buf)!; io::copy(&buf, req.body)!; io::seek(&buf, 0, io::whence::SET)!;
http::response_add_header(&rw, "Content-Type", "text/plain");
http::response_add_header(&rw, "Content-Type", "text/plain")!;
http::response_set_body(&rw, &buf); http::response_write(&rw)!; };
use errors; use ev; use fmt; use io; use net; use net::ip; use os; type state = struct { loop: *ev::loop, stdin: *ev::file, stdout: *ev::file, sock: *ev::file, stdbuf: [os::BUFSZ]u8, netbuf: [os::BUFSZ]u8, wbuf: []u8, }; export fn main() void = { const loop = ev::newloop()!; defer ev::finish(&loop); let state = state { loop = &loop, stdin = ev::register(&loop, os::stdin_file)!, stdout = ev::register(&loop, os::stdout_file)!, sock = null: *ev::file, // populated by [[connected]] ... }; ev::setuser(state.stdin, &state); ev::setuser(state.stdout, &state); match (ev::connect_tcp(&loop, &connected, ip::LOCAL_V4, 12345, &state)) { case let err: net::error => fmt::fatal("Error: connect:", net::strerror(err)); case let err: errors::error => fmt::fatal("Error: connect:", errors::strerror(err)); case => yield; }; for (ev::dispatch(&loop, -1)!) void; };
fn connected(result: (*ev::file | net::error), user: nullable *opaque) void = {
fn connected(result: (*ev::file | net::error), user: nullable *opaque) (void | nomem) = {
const state = user: *state; const sock = match (result) { case let err: net::error => fmt::fatal("Error: connect:", net::strerror(err)); case let sock: *ev::file => yield sock; }; ev::setuser(sock, state); state.sock = sock; ev::read(state.sock, &sock_read, state.netbuf);
ev::readable(state.stdin, &stdin_readable);
ev::readable(state.stdin, &stdin_readable)?;
};
fn stdin_readable(file: *ev::file) void = {
fn stdin_readable(file: *ev::file) (void | nomem) = {
const state = ev::getuser(file): *state; const n = match (io::read(os::stdin_file, state.stdbuf)) { case let n: size => yield n; case io::EOF => ev::stop(state.loop); return; case let err: io::error => fmt::errorln("Error: read:", io::strerror(err))!; ev::stop(state.loop); return; }; state.wbuf = state.stdbuf[..n]; ev::write(state.sock, &sock_write, state.wbuf); };
fn sock_read(file: *ev::file, r: (size | io::EOF | io::error)) void = {
fn sock_read(file: *ev::file, r: (size | io::EOF | io::error)) (void | nomem) = {
const state = ev::getuser(file): *state; const n = match (r) { case let n: size => yield n; case io::EOF => ev::stop(state.loop); return; case let err: io::error => fmt::errorln("Error: read:", io::strerror(err))!; ev::stop(state.loop); return; }; io::write(os::stdout_file, state.netbuf[..n])!; ev::read(state.sock, &sock_read, state.netbuf); };
fn sock_write(file: *ev::file, r: (size | io::error)) void = {
fn sock_write(file: *ev::file, r: (size | io::error)) (void | nomem) = {
const state = ev::getuser(file): *state; const n = match (r) { case let n: size => yield n; case let err: io::error => fmt::errorln("Error: read:", io::strerror(err))!; ev::stop(state.loop); return; }; static delete(state.wbuf[..n]); if (len(state.wbuf) != 0) { ev::write(state.sock, &sock_write, state.wbuf); } else {
ev::readable(state.stdin, &stdin_readable);
ev::readable(state.stdin, &stdin_readable)?;
}; };
use errors; use ev; use io; use log; use net; use net::ip; use net::tcp; use os; use unix::signal; type server = struct { loop: *ev::loop, sock: *ev::file, clients: []*client, exit: int, }; type client = struct { server: *server, sock: *ev::file, addr: ip::addr, port: u16, buf: [os::BUFSZ]u8, wbuf: []u8, }; export fn main() void = { const loop = ev::newloop()!; defer ev::finish(&loop); const sock = match (ev::listen_tcp(&loop, ip::LOCAL_V4, 12345)) { case let err: net::error => log::fatalf("Error: listen: {}", net::strerror(err)); case let err: errors::error => log::fatalf("Error: listen: {}", errors::strerror(err)); case let sock: *ev::file => yield sock; }; defer ev::close(sock); let state = server { loop = &loop, sock = sock, ... }; ev::setuser(sock, &state); ev::accept(sock, &server_accept); const sig = ev::signal(&loop, &signal, signal::sig::INT, signal::sig::TERM)!; defer ev::close(sig); ev::setuser(sig, &state); log::println("Listening on 127.0.0.1:12345"); for (ev::dispatch(&loop, -1)!) void; os::exit(state.exit); };
fn signal(file: *ev::file, sig: signal::sig) void = {
fn signal(file: *ev::file, sig: signal::sig) (void | nomem) = {
log::printfln("Exiting due to {}", signal::signame(sig)); const server = ev::getuser(file): *server; ev::stop(server.loop); };
fn server_accept(sock: *ev::file, r: (*ev::file | net::error)) void = {
fn server_accept(sock: *ev::file, r: (*ev::file | net::error)) (void | nomem) = {
let server = ev::getuser(sock): *server; const sock = match (r) { case let sock: *ev::file => yield sock; case let err: net::error => log::printfln("Error: accept: {}", net::strerror(err)); ev::stop(server.loop); server.exit = 1; return; }; const file = ev::getfd(sock); const (addr, port) = tcp::peeraddr(file) as (ip::addr, u16); log::printfln("Connection from {}:{}", ip::string(addr), port); const client = alloc(client { server = server, sock = sock, addr = addr, port = port, ... })!; append(server.clients, client)!; ev::setuser(client.sock, client); ev::read(client.sock, &client_read, client.buf); ev::accept(server.sock, &server_accept); };
fn client_read(sock: *ev::file, r: (size | io::EOF | io::error)) void = {
fn client_read(sock: *ev::file, r: (size | io::EOF | io::error)) (void | nomem) = {
const client = ev::getuser(sock): *client; const n = match (r) { case let err: io::error => log::printfln("{}:{}: Error: read: {}", ip::string(client.addr), client.port, io::strerror(err));
client_close(client);
client_close(client)?;
return; case io::EOF =>
client_close(client);
client_close(client)?;
return; case let n: size => yield n; }; client.wbuf = client.buf[..n]; ev::write(client.sock, &client_write, client.wbuf); };
fn client_write(sock: *ev::file, r: (size | io::error)) void = {
fn client_write(sock: *ev::file, r: (size | io::error)) (void | nomem) = {
const client = ev::getuser(sock): *client; const n = match (r) { case let err: io::error => log::printfln("{}:{}: Error: write: {}", ip::string(client.addr), client.port, io::strerror(err));
client_close(client);
client_close(client)?;
return; case let n: size => yield n; }; static delete(client.wbuf[..n]); if (len(client.wbuf) != 0) { ev::write(client.sock, &client_write, client.wbuf); } else { ev::read(client.sock, &client_read, client.buf); }; };
fn client_close(client: *client) void = {
fn client_close(client: *client) (void | nomem) = {
const server = client.server; for (let i = 0z; i < len(server.clients); i += 1) { if (server.clients[i] == client) { delete(server.clients[i]); break; }; }; log::printfln("{}:{}: Connection closed", ip::string(client.addr), client.port); ev::close(client.sock); free(client); };
use ev; use log; use time; export fn main() void = { const loop = ev::newloop()!; defer ev::finish(&loop); const timer = ev::newtimer(&loop, &expired, time::clock::MONOTONIC)!; ev::timer_configure(timer, 1 * time::SECOND, 500 * time::MILLISECOND); for (ev::dispatch(&loop, -1)!) void; };
fn expired(timer: *ev::file) void = {
fn expired(timer: *ev::file) (void | nomem) = {
log::println("timer expired"); };
// TODO: Update me when we can send/recv in parallel use ev; use fmt; use io; use net::ip; use net; use os; type state = struct { loop: *ev::loop, stdin: *ev::file, stdout: *ev::file, sock: *ev::file, stdbuf: [os::BUFSZ]u8, netbuf: [os::BUFSZ]u8, wbuf: []u8, }; export fn main() void = { const loop = ev::newloop()!; defer ev::finish(&loop); const sock = ev::connect_udp(&loop, ip::LOCAL_V4, 12345)!; defer ev::close(sock); let state = state { loop = &loop, stdin = ev::register(&loop, os::stdin_file)!, stdout = ev::register(&loop, os::stdout_file)!, sock = sock, ... }; ev::setuser(state.stdin, &state); ev::setuser(state.stdout, &state); ev::setuser(state.sock, &state);
ev::readable(state.stdin, &stdin_readable);
ev::readable(state.stdin, &stdin_readable)!;
for (ev::dispatch(&loop, -1)!) void; };
fn stdin_readable(file: *ev::file) void = {
fn stdin_readable(file: *ev::file) (void | nomem) = {
const state = ev::getuser(file): *state; const n = match (io::read(os::stdin_file, state.stdbuf)) { case let n: size => yield n; case io::EOF => ev::stop(state.loop); return; case let err: io::error => fmt::errorln("Error: read:", io::strerror(err))!; ev::stop(state.loop); return; }; state.wbuf = state.stdbuf[..n]; ev::send(state.sock, &sock_send, state.wbuf); };
fn sock_recv(file: *ev::file, r: (size | net::error)) void = {
fn sock_recv(file: *ev::file, r: (size | net::error)) (void | nomem) = {
const state = ev::getuser(file): *state; const n = match (r) { case let n: size => yield n; case let err: net::error => fmt::errorln("Error: recv:", net::strerror(err))!; ev::stop(state.loop); return; }; io::write(os::stdout_file, state.netbuf[..n])!; };
fn sock_send(file: *ev::file, r: (size | net::error)) void = {
fn sock_send(file: *ev::file, r: (size | net::error)) (void | nomem) = {
const state = ev::getuser(file): *state; const n = match (r) { case let n: size => yield n; case let err: net::error => fmt::errorln("Error: send:", net::strerror(err))!; ev::stop(state.loop); return; }; static delete(state.wbuf[..n]); if (len(state.wbuf) != 0) { ev::send(state.sock, &sock_send, state.wbuf); } else { ev::recv(state.sock, &sock_recv, state.netbuf);
ev::readable(state.stdin, &stdin_readable);
ev::readable(state.stdin, &stdin_readable)?;
}; };
use errors; use ev; use log; use net; use net::ip; use os; type state = struct { loop: *ev::loop, src: ip::addr, port: u16, buf: [os::BUFSZ]u8, wbuf: []u8, }; export fn main() void = { const loop = ev::newloop()!; defer ev::finish(&loop); const sock = match (ev::listen_udp(&loop, ip::LOCAL_V4, 12345)) { case let err: net::error => log::fatalf("Error: listen: {}", net::strerror(err)); case let err: errors::error => log::fatalf("Error: listen: {}", errors::strerror(err)); case let sock: *ev::file => yield sock; }; defer ev::close(sock); let state = state { loop = &loop, src = [0: u8, 0: u8, 0: u8, 0: u8], ... }; ev::setuser(sock, &state); ev::recvfrom(sock, &recv, state.buf); log::println("Listening on 127.0.0.1:12345"); for (ev::dispatch(&loop, -1)!) void; };
fn recv(sock: *ev::file, r: ((size, ip::addr, u16) | net::error)) void = {
fn recv(sock: *ev::file, r: ((size, ip::addr, u16) | net::error)) (void | nomem) = {
const state = ev::getuser(sock): *state; const (n, src, port) = match (r) { case let err: net::error => log::println("Error: recv:", net::strerror(err)); ev::stop(state.loop); return; case let packet: (size, ip::addr, u16) => yield packet; }; // TODO: Make ev send/write all data and drop these fields and the need // to manually manage a write buffer and re-call sendto in &send state.src = src; state.port = port; log::printfln("{} bytes from {}:{}", n, ip::string(src), port); state.wbuf = state.buf[..n]; ev::sendto(sock, &send, state.wbuf, src, port); };
fn send(sock: *ev::file, r: (size | net::error)) void = {
fn send(sock: *ev::file, r: (size | net::error)) (void | nomem) = {
const state = ev::getuser(sock): *state; const n = match (r) { case let err: net::error => log::println("Error: send:", net::strerror(err)); ev::stop(state.loop); return; case let n: size => yield n; }; static delete(state.wbuf[..n]); if (len(state.wbuf) != 0) { ev::sendto(sock, &send, state.wbuf, state.src, state.port); } else { ev::recvfrom(sock, &recv, state.buf); }; };
use io; use rt; // A callback for a [[read]] or [[readv]] operation.
export type readcb = fn(file: *file, result: (size | io::EOF | io::error)) void;
export type readcb = fn(file: *file, result: (size | io::EOF | io::error)) (void | nomem);
// Schedules a read operation on a file object. The provided buffer must be // valid for the duration of the read operation. export fn read( file: *file, cb: *readcb, buf: []u8, ) req = { file.rvbuf = io::mkvector(buf); // XXX: Bit of a hack to avoid allocating a slice const vec = (&file.rvbuf: *[*]io::vector)[..1]; return readv(file, cb, vec...); }; // Schedules a vectored read operation on a file object. The provided vectors // must be valid for the duration of the read operation. export fn readv( file: *file, cb: *readcb, vec: io::vector... ) req = { assert(file.op & op::READV == 0); if (file.flags & fflags::BLOCKING != 0) { const r = io::readv(file.fd, vec...);
cb(file, r);
cb(file, r)!;
return req { ... }; }; file.op |= op::READV; file.cb = cb; file.rvec = vec; file_epoll_ctl(file); return mkreq(&readv_cancel, file); }; fn readv_ready(file: *file, ev: *rt::epoll_event) void = { assert(file.op & op::READV != 0); assert(file.cb != null); const cb = file.cb: *readcb; file.op &= ~op::READV; file_epoll_ctl(file); if (ev.events & rt::EPOLLHUP != 0) {
cb(file, io::EOF);
cb(file, io::EOF)!;
} else { const vec = file.rvec: []io::vector; const r = io::readv(file.fd, vec...);
cb(file, r);
cb(file, r)!;
}; }; fn readv_cancel(req: *req) void = { const file = req.user: *file; assert(file.op & op::READV != 0); file.op &= ~op::READV; file_epoll_ctl(file); }; // A callback for a [[write]] or [[writev]] operation.
export type writecb = fn(file: *file, result: (size | io::error)) void;
export type writecb = fn(file: *file, result: (size | io::error)) (void | nomem);
// Schedules a write operation on a file object. The provided buffer must be // valid for the duration of the write operation. export fn write( file: *file, cb: *writecb, buf: []u8, ) req = { file.wvbuf = io::mkvector(buf); // XXX: Bit of a hack to avoid allocating a slice const vec = (&file.wvbuf: *[*]io::vector)[..1]; return writev(file, cb, vec...); }; // Schedules a vectored read operation on a file object. The provided buffer // must be valid for the duration of the write operation. export fn writev( file: *file, cb: *writecb, vec: io::vector... ) req = { assert(file.op & op::WRITEV == 0); if (file.flags & fflags::BLOCKING != 0) { const r = io::writev(file.fd, vec...);
cb(file, r);
cb(file, r)!;
return req { ... }; }; file.op |= op::WRITEV; file.cb2 = cb; file.wvec = vec; file_epoll_ctl(file); return mkreq(&writev_cancel, file); }; fn writev_ready(file: *file, ev: *rt::epoll_event) void = { assert(file.op & op::WRITEV != 0); assert(file.cb != null); const vec = file.wvec: []io::vector; const r = io::writev(file.fd, vec...); const cb = file.cb2: *writecb; file.op &= ~op::WRITEV; file_epoll_ctl(file);
cb(file, r);
cb(file, r)!;
}; fn writev_cancel(req: *req) void = { const file = req.user: *file; assert(file.op & op::WRITEV != 0); file.op &= ~op::WRITEV; file_epoll_ctl(file); };
use errors; use io; use rt; use sort; use time; use types; use unix::signal; // Dispatch callback. See [[ondispatch]].
export type dispatchcb = fn(loop: *loop, user: nullable *opaque) void;
export type dispatchcb = fn(loop: *loop, user: nullable *opaque) (void | nomem);
export type ondispatch = struct { cb: *dispatchcb, user: nullable *opaque, loop: *loop, }; export type loop = struct { fd: io::file, events: []rt::epoll_event, dispatch: []*ondispatch, stop: bool, }; // Creates a new event loop. The user must pass the return value to [[finish]] // to free associated resources when done using the loop. // // The optional "events" parameter controls how many events may be pending at // once. Most applications should not need to configure this parameter. export fn newloop(events: size = 256) (loop | nomem | errors::error) = { const fd = match (rt::epoll_create1(rt::EPOLL_CLOEXEC)) { case let fd: int => yield fd: io::file; case let err: rt::errno => return errors::errno(err); }; return loop { fd = fd, events = alloc([rt::epoll_event { events = 0, data = rt::epoll_data { fd = 0, } }...], events)?, dispatch = [], stop = false, }; }; // Frees resources associated with an event loop. Must only be called once per // event loop object. Calling finish invalidates all I/O objects associated with // the event loop. export fn finish(loop: *loop) void = { free(loop.events); io::close(loop.fd)!; }; // Returns an [[io::file]] for this event loop which can be polled on when // events are available for processing, for chaining together different event // loops. The exact semantics of this function are platform-specific, and it may // not be available for all implementations. export fn loop_file(loop: *loop) io::file = { return loop.fd; }; // Registers a callback to be invoked before the next call to [[dispatch]] // processes requests. The callback may schedule additional I/O requests to be // processed in this batch. // // Dispatch callbacks are only called once. If you wish to schedule another // dispatch callback after the first, call [[do]] again. export fn do( loop: *loop, cb: *dispatchcb, user: nullable *opaque = null, ) (req | nomem) = { const dispatch = alloc(ondispatch { cb = cb, user = user, loop = loop, })?; append(loop.dispatch, dispatch)?; return mkreq(&do_cancel, dispatch); }; fn do_cancel(req: *req) void = { const dispatch = req.user: *ondispatch; const loop = dispatch.loop; for (let i = 0z; i < len(loop.dispatch); i += 1) { if (loop.dispatch[i] == dispatch) { delete(loop.dispatch[i]); break; }; }; free(dispatch); }; // Dispatches the event loop, waiting for new events and calling their callbacks // as appropriate. // // A timeout of -1 will block indefinitely until the next event occurs. A // timeout of 0 will cause dispatch to return immediately if no events are // available to process. Portable use of the timeout argument supports only // millisecond granularity of up to 24 days ([[types::INT_MAX]] milliseconds). // Negative values other than -1 will cause the program to abort. // // Returns false if the loop has been stopped via [[stop]], or true otherwise. export fn dispatch( loop: *loop, timeout: time::duration,
) (bool | errors::error) = {
) (bool | errors::error | nomem) = {
const millis: int = if (timeout == -1) { yield -1; } else if (timeout < 0) { abort("ev::dispatch: invalid timeout"); } else { yield (timeout / time::MILLISECOND): int; }; if (loop.stop) { return false; }; let todo = loop.dispatch; loop.dispatch = []; for (let dispatch .. todo) {
dispatch.cb(loop, dispatch.user);
dispatch.cb(loop, dispatch.user)?;
free(dispatch); }; free(todo); if (len(loop.events) == 0) { return true; }; // TODO: Deal with signals const maxev = len(loop.events); assert(maxev <= types::INT_MAX: size, "ev::dispatch: too many events"); const nevent = match(rt::epoll_pwait( loop.fd, &loop.events[0], maxev: int, millis, null)) { case let nevent: int => yield nevent; case let err: rt::errno => switch (err) { case rt::EINTR => // We shallow system suspension error code return true; case => abort("ev::dispatch: epoll_pwait failure"); }; }; if (nevent == 0) { return true; }; sort::sort(loop.events[..nevent], size(rt::epoll_event), &sort_events); for (let ev &.. loop.events[..nevent]) { const file = ev.data.ptr: *file; if (ev.events == 0) { continue; }; const pending = file.op; if (ev.events & (rt::EPOLLIN | rt::EPOLLHUP) != 0 && file.op & op::READV != 0) { readv_ready(file, ev); }; if (ev.events & (rt::EPOLLOUT | rt::EPOLLHUP) != 0 && file.op & op::WRITEV != 0) { writev_ready(file, ev); }; switch (pending) { case op::NONE => abort("No operation pending for ready object"); case op::READABLE =>
readable_ready(file, ev);
readable_ready(file, ev)?;
case op::WRITABLE =>
writable_ready(file, ev);
writable_ready(file, ev)?;
case op::ACCEPT =>
accept_ready(file, ev);
accept_ready(file, ev)?;
case op::CONNECT_TCP =>
connect_tcp_ready(file, ev);
connect_tcp_ready(file, ev)?;
case op::CONNECT_UNIX =>
connect_unix_ready(file, ev);
connect_unix_ready(file, ev)?;
case op::SIGNAL =>
signal_ready(file, ev);
signal_ready(file, ev)?;
case op::TIMER =>
timer_ready(file, ev);
timer_ready(file, ev)?;
case op::SENDTO =>
sendto_ready(file, ev);
sendto_ready(file, ev)?;
case op::RECVFROM =>
recvfrom_ready(file, ev);
recvfrom_ready(file, ev)?;
case op::SEND =>
send_ready(file, ev);
send_ready(file, ev)?;
case op::RECV =>
recv_ready(file, ev);
recv_ready(file, ev)?;
case op::WAIT =>
wait_ready(file, ev);
wait_ready(file, ev)?;
case => assert(pending & ~(op::READV | op::WRITEV) == 0); }; }; return !loop.stop; }; // Signals the loop to stop processing events. If called during a callback, it // will cause that invocation of [[dispatch]] to return false. Otherwise, false // will be returned only upon the next call to [[dispatch]]. export fn stop(loop: *loop) void = { loop.stop = true; };
use rt; // A callback for a [[readable]] operation.
export type readablecb = fn(file: *file) void;
export type readablecb = fn(file: *file) (void | nomem);
// Executes the callback when a given file is readable. Cannot be combined with // [[read]] or [[readv]]. export fn readable( file: *file, cb: *readablecb,
) req = {
) (req | nomem)= {
assert(file.op & op::READABLE == 0 && file.op & op::READV == 0); if (file.flags & fflags::BLOCKING != 0) {
cb(file);
cb(file)?;
return req { ... }; }; file.op |= op::READABLE; file.cb = cb; file_epoll_ctl(file); return mkreq(&readable_cancel, file); };
fn readable_ready(file: *file, ev: *rt::epoll_event) void = {
fn readable_ready(file: *file, ev: *rt::epoll_event) (void | nomem) = {
assert(file.op & op::READABLE != 0); assert(file.cb != null); const cb = file.cb: *readablecb; file.op &= ~op::READABLE; file_epoll_ctl(file);
cb(file);
cb(file)?;
}; fn readable_cancel(req: *req) void = { const file = req.user: *file; assert(file.op & op::READABLE != 0); file.op &= ~op::READABLE; file_epoll_ctl(file); }; // A callback for a [[writable]] operation.
export type writablecb = fn(file: *file) void;
export type writablecb = fn(file: *file) (void | nomem);
// Executes the callback when a given file is writable. Cannot be combined with // [[write]] or [[writev]]. export fn writable( file: *file, cb: *writablecb,
) req = {
) (req | nomem)= {
assert(file.op & op::WRITABLE == 0 && file.op & op::WRITEV == 0); if (file.flags & fflags::BLOCKING != 0) {
cb(file);
cb(file)?;
return req { ... }; }; file.op |= op::WRITABLE; file.cb = cb; file_epoll_ctl(file); return mkreq(&writable_cancel, file); };
fn writable_ready(file: *file, ev: *rt::epoll_event) void = {
fn writable_ready(file: *file, ev: *rt::epoll_event) (void | nomem) = {
assert(file.op & op::WRITABLE != 0); assert(file.cb != null); const cb = file.cb: *writablecb; file.op &= ~op::WRITABLE; file_epoll_ctl(file);
cb(file);
cb(file)?;
}; fn writable_cancel(req: *req) void = { const file = req.user: *file; assert(file.op & op::WRITABLE != 0); file.op &= ~op::WRITABLE; file_epoll_ctl(file); };
use errors; use os::exec; use rt; use time; use unix::signal::{sig, code}; // Starts a child process prepared with [[exec::cmd]] and returns a [[file]] // that can be used to [[wait]] for it to exit. export fn exec( loop: *loop, cmd: *exec::command, ) (*file | errors::error | nomem) = { let pidfd: int = 0; let args = rt::clone_args { flags = rt::CLONE_PIDFD, pidfd = &pidfd: uintptr: u64, exit_signal = sig::CHLD: u64, ... }; const pid = match (rt::clone3(&args)) { case let pid: rt::pid_t => yield pid; case let errno: rt::errno => return errors::errno(errno); }; if (pid == 0) { exec::exec(cmd); }; return register(loop, pidfd); }; // A callback for a [[wait]] operation.
export type waitcb = fn(file: *file, result: exec::status) void;
export type waitcb = fn(file: *file, result: exec::status) (void | nomem);
// Waits for a process to exit. After the callback is run, the [[file]] is // closed and unregistered from the event loop and may not be used again. export fn wait(proc: *file, cb: *waitcb) req = { assert(proc.op & op::WAIT == 0); // Attempt to waitid now to hit error cases early match (do_waitid(proc, cb, rt::WNOHANG)) { case let err: rt::errno => assert(err == rt::EAGAIN, "ev::wait: unexpected error from waitid (not a pidfd?)"); case void => // Child has already exited. do_waitid ran the callback, so we // can return with no further fanfare return req { ... }; }; proc.op |= op::WAIT; proc.cb = cb; file_epoll_ctl(proc); return mkreq(&wait_cancel, proc); }; // Calls [[rt::waitid]] with the given options. If successful, calls the // callback and cleans up the file (and returns void). If unsuccessful, returns // the error.
fn do_waitid(proc: *file, cb: *waitcb, options: int = 0) (void | rt::errno) = {
fn do_waitid(proc: *file, cb: *waitcb, options: int = 0) (void | rt::errno | nomem) = {
def WAIT_OPTIONS = rt::WEXITED | rt::WSTOPPED | rt::WCONTINUED; let si = rt::siginfo { ... }; let ru = rt::rusage { ... }; match (rt::waitid(rt::idtype::P_PIDFD, proc.fd: rt::id_t, &si, WAIT_OPTIONS | options, &ru)) { case let err: rt::errno => return err; case void => if (si.si_pid == 0) { assert(options & rt::WNOHANG != 0); return rt::EAGAIN; }; }; // Convert the siginfo data into a wait(2)-style exit status let status = 0i; if (si.si_code == code::EXITED) { status = si.si_status << 8; } else { status = si.si_status; }; let st = exec::status { status = status, ... }; rusage(&st, &ru);
cb(proc, st);
cb(proc, st)?;
close(proc); }; fn wait_cancel(req: *req) void = { const proc = req.user: *file; assert(proc.op & op::WAIT != 0); proc.op &= ~op::WAIT; file_epoll_ctl(proc); };
fn wait_ready(proc: *file, ev: *rt::epoll_event) void = {
fn wait_ready(proc: *file, ev: *rt::epoll_event) (void | nomem) = {
assert(proc.op & op::WAIT != 0); assert(proc.cb != null); const cb = proc.cb: *waitcb; proc.op &= ~op::WAIT; file_epoll_ctl(proc); // waitid should not fail at this point for (true) { match (do_waitid(proc, cb)) { case void => break; case let err: rt::errno => // Interrupted by signal handler, go again assert(err == rt::EINTR); }; }; }; // Copied from os::exec fn rusage(st: *exec::status, ru: *rt::rusage) void = { st.rusage.utime = time::instant { sec = ru.ru_utime.tv_sec, nsec = ru.ru_utime.tv_usec * time::MICROSECOND: i64, }; st.rusage.stime = time::instant { sec = ru.ru_stime.tv_sec, nsec = ru.ru_stime.tv_usec * time::MICROSECOND: i64, }; st.rusage.maxrss = ru.ru_maxrss; st.rusage.minflt = ru.ru_minflt; st.rusage.majflt = ru.ru_majflt; st.rusage.inblock = ru.ru_inblock; st.rusage.oublock = ru.ru_oublock; st.rusage.nvcsw = ru.ru_nvcsw; st.rusage.nivcsw = ru.ru_nivcsw; }; // Sends a signal to a process started with [[ev::exec]]. export fn kill(child: *file, sig: sig = sig::TERM) (void | errors::error) = { match (rt::pidfd_send_signal(child.fd, sig, null, 0)) { case void => void; case let err: rt::errno => return errors::errno(err); }; };
// TODO: Expose full siginfo data for non-portable use use errors; use rt; use unix::signal; // Callback function for [[signal]] operations.
export type signalcb = fn(file: *file, sig: signal::sig) void;
export type signalcb = fn(file: *file, sig: signal::sig) (void | nomem);
// Registers a signal handler with this event loop. The signals specified will // be masked so they are only raised via the provided callback. Closing this // file will unmask the signals. // // It is not necessary to call [[signal]] again after the callback has // processed; it will automatically re-register the operation for subsequent // signals. export fn signal( loop: *loop, cb: *signalcb, signals: signal::sig... ) (*file | errors::error | nomem) = { const fd = signal::signalfd(signals...)?; const file = register(loop, fd)?; file.op = op::SIGNAL; file.cb = cb; file_epoll_ctl(file); signal::sigset_empty(&file.sigmask); signal::sigset_add(&file.sigmask, signals...); signal::block(signals...); return file; }; fn signal_restore(file: *file) void = { assert(file.op == op::SIGNAL); let buf: [rt::NSIG]signal::sig = [0...]; let signals = buf[..0]; for (let i = 1; i < rt::NSIG; i += 1) { const sig = i: signal::sig; if (signal::sigset_member(&file.sigmask, sig)) { static append(signals, sig)!; }; }; signal::unblock(signals...); };
fn signal_ready(file: *file, ev: *rt::epoll_event) void = {
fn signal_ready(file: *file, ev: *rt::epoll_event) (void | nomem) = {
assert(file.op == op::SIGNAL); assert(file.cb != null); const cb = file.cb: *signalcb; const info = signal::read(file.fd)!;
cb(file, info.signo);
cb(file, info.signo)?;
};
use errors; use net; use net::ip; use net::tcp; use net::udp; use net::unix; use rt; // Creates a socket which listens for incoming TCP connections on the given // IP address and port. export fn listen_tcp( loop: *loop, addr: ip::addr, port: u16, opts: tcp::listen_option... ) (*file | net::error | errors::error | nomem) = { const sock = tcp::listen(addr, port, opts...)?; return register(loop, sock)?; }; // Creates a socket which listens for incoming UDP packets on the given IP // address and port. export fn listen_udp( loop: *loop, addr: ip::addr, port: u16, opts: udp::listen_option... ) (*file | net::error | errors::error | nomem) = { const sock = udp::listen(addr, port, opts...)?; return register(loop, sock)?; }; // Creates a UNIX domain socket at the given path. export fn listen_unix( loop: *loop, addr: unix::addr, opts: unix::listen_option... ) (*file | net::error | errors::error | nomem) = { const sock = unix::listen(addr, opts...)?; return register(loop, sock)?; }; // Creates a UDP socket on this event loop and sets the default destination to // the given address. export fn connect_udp( loop: *loop, dest: ip::addr, port: u16, opts: udp::connect_option... ) (*file | net::error | errors::error | nomem) = { const sock = udp::connect(dest, port, opts...)?; const file = register(loop, sock)?; return file; };
export type connectcb = fn(result: (*file | net::error), user: nullable *opaque) void;
export type connectcb = fn(result: (*file | net::error), user: nullable *opaque) (void | nomem);
// Creates a socket and connects to a given IP address and port over TCP. // // The variadic arguments accept [[net::sockflag]] and/or no more than one user // data pointer. If the user data pointer is provided, it will be passed to the // callback. This allows the user to pass a state object through the connection // process: // // let user: state = // ... // ev::connect_tcp(&loop, &connected, addr, port, &user); // // fn connected(result: (*ev::file | net::error), user: nullable *opaque) void = { // let user = user: *state; // }; // // The user data object provided will be assigned to the [[file]] which is // provided to the callback after the connection is established. // // If you don't need a user data object you can just omit it: // // ev::connect_tcp(&loop, &connected, addr, port); export fn connect_tcp( loop: *loop, cb: *connectcb, addr: ip::addr, port: u16, opts: (net::sockflag | *opaque)... ) (req | net::error | errors::error | nomem) = { // XXX: This doesn't let us set keepalive let opt: net::sockflag = 0; let user: nullable *opaque = null; for (let i = 0z; i < len(opts); i += 1) { match (opts[i]) { case let o: net::sockflag => opt |= o; case let u: *opaque => assert(user == null); user = u; }; }; const sock = tcp::connect(addr, port, opt | net::sockflag::NONBLOCK)?; let file = register(loop, sock)?; file.user = user; file.cb = cb; file.op = op::CONNECT_TCP; file_epoll_ctl(file); return mkreq(&connect_tcp_cancel, file); }; // Connects to a UNIX domain socket. // // The variadic arguments accept [[net::sockflag]] and/or no more than one user // data pointer. If the user data pointer is provided, it will be passed to the // callback. This allows the user to pass a state object through the connection // process: // // let user: state = // ... // ev::connect_user(&loop, &connected, addr, &user); // // fn connected(result: (*ev::file | net::error), user: nullable *opaque) void = { // let user = user: *state; // }; // // The user data object provided will be assigned to the [[file]] which is // provided to the callback after the connection is established. // // If you don't need a user data object you can just omit it: // // ev::connect_unix(&loop, &connected, addr); export fn connect_unix( loop: *loop, cb: *connectcb, addr: unix::addr, opts: (net::sockflag | *opaque)... ) (req | net::error | errors::error | nomem) = { let opt: net::sockflag = 0; let user: nullable *opaque = null; for (let i = 0z; i < len(opts); i += 1) { match (opts[i]) { case let o: net::sockflag => opt |= o; case let u: *opaque => assert(user == null); user = u; }; }; const sock = unix::connect(addr, opt | net::sockflag::NONBLOCK)?; let file = register(loop, sock)?; file.user = user; file.cb = cb; file.op = op::CONNECT_UNIX; file_epoll_ctl(file); return mkreq(&connect_unix_cancel, file); }; fn connect_tcp_ready( sock: *file, ev: *rt::epoll_event,
) void = {
) (void | nomem) = {
assert(sock.op == op::CONNECT_TCP); assert(ev.events & rt::EPOLLOUT != 0); assert(sock.cb != null); const cb = sock.cb: *connectcb; sock.op &= ~op::CONNECT_TCP; file_epoll_ctl(sock); let errno = 0i, optsz = size(int): u32; rt::getsockopt(sock.fd, rt::SOL_SOCKET, rt::SO_ERROR, &errno, &optsz)!; if (errno != 0) {
cb(errors::errno(errno), sock.user);
cb(errors::errno(errno), sock.user)?;
close(sock); } else { // XXX: If the user puts NONBLOCK into the opts provided at // [[connect_tcp]] we could try to preserve that here const fl = rt::fcntl(sock.fd, rt::F_GETFL, void)!; rt::fcntl(sock.fd, rt::F_SETFL, fl & ~rt::O_NONBLOCK)!;
cb(sock, sock.user);
cb(sock, sock.user)?;
}; }; fn connect_unix_ready( sock: *file, ev: *rt::epoll_event,
) void = {
) (void | nomem) = {
assert(sock.op == op::CONNECT_UNIX); assert(ev.events & rt::EPOLLOUT != 0); assert(sock.cb != null); const cb = sock.cb: *connectcb; sock.op &= ~op::CONNECT_UNIX; file_epoll_ctl(sock); let errno = 0i, optsz = size(int): u32; rt::getsockopt(sock.fd, rt::SOL_SOCKET, rt::SO_ERROR, &errno, &optsz)!; if (errno != 0) {
cb(errors::errno(errno), sock.user);
cb(errors::errno(errno), sock.user)?;
close(sock); } else { // XXX: If the user puts NONBLOCK into the opts provided at // [[connect_unix]] we could try to preserve that here const fl = rt::fcntl(sock.fd, rt::F_GETFL, void)!; rt::fcntl(sock.fd, rt::F_SETFL, fl & ~rt::O_NONBLOCK)!;
cb(sock, sock.user);
cb(sock, sock.user)?;
}; }; fn connect_tcp_cancel(req: *req) void = { const sock = req.user: *file; assert(sock.op == op::CONNECT_TCP); sock.op = op::NONE; file_epoll_ctl(sock); }; fn connect_unix_cancel(req: *req) void = { const sock = req.user: *file; assert(sock.op == op::CONNECT_UNIX); sock.op = op::NONE; file_epoll_ctl(sock); }; // A callback for an [[accept]] operation.
export type acceptcb = fn(file: *file, result: (*file | net::error)) void;
export type acceptcb = fn(file: *file, result: (*file | net::error)) (void | nomem);
// Schedules an accept operation on a socket. export fn accept( sock: *file, cb: *acceptcb, flags: net::sockflag... ) req = { assert(sock.op == op::NONE); let fl: net::sockflag = 0; for (let i = 0z; i < len(flags); i += 1) { fl |= flags[i]; }; sock.op = op::ACCEPT; sock.cb = cb; sock.sockflag = fl; file_epoll_ctl(sock); return mkreq(&accept_cancel, sock); }; fn accept_ready( sock: *file, ev: *rt::epoll_event,
) void = {
) (void | nomem) = {
assert(sock.op == op::ACCEPT); assert(ev.events & rt::EPOLLIN != 0); assert(sock.cb != null); const cb = sock.cb: *acceptcb; sock.op = op::NONE; file_epoll_ctl(sock); const r = tcp::accept(sock.fd, sock.sockflag); match (r) { case let fd: net::socket => // TODO: Bubble up errors from here? const file = register(sock.ev, fd)!;
cb(sock, file);
cb(sock, file)?;
case let err: net::error =>
cb(sock, err);
cb(sock, err)?;
}; }; fn accept_cancel(req: *req) void = { const sock = req.user: *file; assert(sock.op == op::ACCEPT); sock.op = op::NONE; file_epoll_ctl(sock); }; // TODO: Support recv & send in parallel // Callback for a [[recvfrom]] operation. The second parameter is either an // error or a tuple of the number of bytes received and the IP address and port // of the sender. export type recvfromcb = fn( file: *file, r: ((size, ip::addr, u16) | net::error),
) void;
) (void | nomem);
// Schedules a receive operation on a socket. export fn recvfrom( sock: *file, cb: *recvfromcb, buf: []u8, ) req = { assert(sock.op == op::NONE); sock.op = op::RECVFROM; sock.cb = cb; sock.sendrecv.rbuf = buf; file_epoll_ctl(sock); return mkreq(&recvfrom_cancel, sock); }; fn recvfrom_ready( sock: *file, ev: *rt::epoll_event,
) void = {
) (void | nomem) = {
assert(sock.op == op::RECVFROM); assert(sock.cb != null); const cb = sock.cb: *recvfromcb; sock.op = op::NONE; file_epoll_ctl(sock); let src: ip::addr = ip::ANY_V4, port = 0u16; match (udp::recvfrom(sock.fd, sock.sendrecv.rbuf, &src, &port)) { case let err: net::error =>
cb(sock, err);
cb(sock, err)?;
case let n: size =>
cb(sock, (n, src, port));
cb(sock, (n, src, port))?;
}; }; fn recvfrom_cancel(req: *req) void = { const sock = req.user: *file; assert(sock.op == op::RECVFROM); sock.op = op::NONE; file_epoll_ctl(sock); }; // Callback for a [[recv]] operation.
export type recvcb = fn(file: *file, r: (size | net::error)) void;
export type recvcb = fn(file: *file, r: (size | net::error)) (void | nomem);
// Schedules a receive operation on a (connected) socket. export fn recv( sock: *file, cb: *recvcb, buf: []u8, ) req = { assert(sock.op == op::NONE); sock.op = op::RECV; sock.cb = cb; sock.sendrecv.rbuf = buf; file_epoll_ctl(sock); return mkreq(&recv_cancel, sock); }; fn recv_ready( sock: *file, ev: *rt::epoll_event,
) void = {
) (void | nomem) = {
assert(sock.op == op::RECV); assert(sock.cb != null); const cb = sock.cb: *recvcb; sock.op = op::NONE; file_epoll_ctl(sock); const r = udp::recv(sock.fd, sock.sendrecv.rbuf);
cb(sock, r);
cb(sock, r)?;
}; fn recv_cancel(req: *req) void = { const sock = req.user: *file; assert(sock.op == op::RECV); sock.op = op::NONE; file_epoll_ctl(sock); }; // Callback for a [[send]] or [[sendto]] operation.
export type sendtocb = fn(file: *file, r: (size | net::error)) void;
export type sendtocb = fn(file: *file, r: (size | net::error)) (void | nomem);
// Schedules a send operation on a (connected) socket. export fn send( sock: *file, cb: *sendtocb, buf: []u8, ) req = { assert(sock.op == op::NONE); sock.op = op::SEND; sock.cb = cb; sock.sendrecv.sbuf = buf; file_epoll_ctl(sock); return mkreq(&send_cancel, sock); }; fn send_ready( sock: *file, ev: *rt::epoll_event,
) void = {
) (void | nomem) = {
assert(sock.op == op::SEND); assert(sock.cb != null); const cb = sock.cb: *sendtocb; sock.op = op::NONE; file_epoll_ctl(sock); const r = udp::send(sock.fd, sock.sendrecv.sbuf);
cb(sock, r);
cb(sock, r)?;
}; fn send_cancel(req: *req) void = { const sock = req.user: *file; assert(sock.op == op::SEND); sock.op = op::NONE; file_epoll_ctl(sock); }; // Schedules a send operation on a socket. export fn sendto( sock: *file, cb: *sendtocb, buf: []u8, dest: ip::addr, port: u16, ) req = { assert(sock.op == op::NONE); sock.op = op::SENDTO; sock.cb = cb; sock.sendrecv.sbuf = buf; sock.sendrecv.dest = dest; sock.sendrecv.port = port; file_epoll_ctl(sock); return mkreq(&sendto_cancel, sock); }; fn sendto_ready( sock: *file, ev: *rt::epoll_event,
) void = {
) (void | nomem) = {
assert(sock.op == op::SENDTO); assert(sock.cb != null); const cb = sock.cb: *sendtocb; sock.op = op::NONE; file_epoll_ctl(sock); const r = udp::sendto( sock.fd, sock.sendrecv.sbuf, sock.sendrecv.dest, sock.sendrecv.port, );
cb(sock, r);
cb(sock, r)?;
}; fn sendto_cancel(req: *req) void = { const sock = req.user: *file; assert(sock.op == op::SENDTO); sock.op = op::NONE; file_epoll_ctl(sock); };
use errors; use io; use rt; use time; // A callback which executes when a timer expires.
export type timercb = fn(file: *file) void;
export type timercb = fn(file: *file) (void | nomem);
// Creates a new timer. By default, this timer never expires; configure it with // [[timer_configure]]. export fn newtimer( loop: *loop, cb: *timercb, clock: time::clock, ) (*file | errors::error | nomem) = { const fd = match (rt::timerfd_create(clock, rt::TFD_NONBLOCK | rt::TFD_CLOEXEC)) { case let fd: int => yield fd: io::file; case let errno: rt::errno => return errors::errno(errno); }; const file = register(loop, fd)?; file.op = op::TIMER; file.cb = cb; file_epoll_ctl(file); return file; }; // Starts a timer created with [[newtimer]] to expire after the given "delay" // and indefinitely thereafter following each interval of "repeat". Setting both // values to zero disarms the timer; setting either value non-zero arms the // timer. export fn timer_configure( timer: *file, delay: time::duration, repeat: time::duration, ) void = { assert(timer.op == op::TIMER); let spec = rt::itimerspec { it_value = time::duration_to_timespec(delay), it_interval = time::duration_to_timespec(repeat), ... }; rt::timerfd_settime(timer.fd, 0, &spec, null)!; };
fn timer_ready(timer: *file, ev: *rt::epoll_event) void = {
fn timer_ready(timer: *file, ev: *rt::epoll_event) (void | nomem) = {
assert(timer.op == op::TIMER); let buf: [8]u8 = [0...]; match (io::read(timer.fd, buf)) { case errors::again => // This can occur if the timer was reconfigured in an event // handler while the expiration event was pending in the same // dispatch of the event loop. Just discard the event in this // case. return; case (io::EOF | io::error) => abort(); case => void; }; assert(timer.cb != null); const cb = timer.cb: *timercb;
cb(timer);
cb(timer)?;
};
use bufio; use errors; use ev::dial; use ev; use io; use memio; use net::http; use os; use types; // A callback for an [[http_do]] operation. export type http_donecb = fn( user: nullable *opaque, r: ( (*ev::file, http::response) | dial::error | io::EOF | io::error | http::protoerr | errors::unsupported ) ) void; export fn http_do( loop: *ev::loop, reqcli: *http::client, req: *http::request, cb: *http_donecb, user: nullable *opaque
) (ev::req | dial::error | io::error) = {
) (ev::req | dial::error | io::error | nomem) = {
const cli = alloc(client { cb = cb, user = user, buf = memio::dynamic(), ...
});
})?;
http::request_write_internal(&cli.buf, req, reqcli)?; const req = dial::dial_uri( loop, "tcp", req.target, &http_dialed, cli, )?; cli.req = req; return ev::mkreq(&http_do_cancel, cli); }; export fn http_send( file: *ev::file, reqcli: *http::client, req: *http::request, cb: *http_donecb, ) (void | io::error) = { const cli = ev::getuser(file): *client; http::request_write_internal(&cli.buf, req, reqcli)?; cli.wbuf = memio::buffer(&cli.buf: *memio::stream); ev::write(file, &http_write, cli.wbuf); }; fn http_do_cancel(req: *ev::req) void = { let cli = req.user: *client; ev::cancel(&cli.req); client_close(cli); };
fn http_dialed(user: nullable *opaque, r: (*ev::file | dial::error)) void = {
fn http_dialed(user: nullable *opaque, r: (*ev::file | dial::error)) (void | nomem) = {
const cli = user: *client; cli.req = ev::req { ... }; const file = match (r) { case let file: *ev::file => yield file; case let err: dial::error => const cb = cli.cb: *http_donecb; cb(cli.user, err); client_close(cli); return; }; ev::setuser(file, cli); cli.wbuf = memio::buffer(&cli.buf: *memio::stream); ev::write(file, &http_write, cli.wbuf); };
fn http_write(sock: *ev::file, r: (size | io::error)) void = {
fn http_write(sock: *ev::file, r: (size | io::error)) (void | nomem) = {
const cli = ev::getuser(sock): *client; const n = match (r) { case let err: io::error => const cb = cli.cb: *http_donecb; cb(cli.user, err); client_close(cli); return; case let n: size => yield n; }; static delete(cli.wbuf[..n]); if (len(cli.wbuf) != 0) { ev::write(sock, &http_write, cli.wbuf); return; }; memio::reset(&cli.buf); ev::read(sock, &http_read, cli.rbuf); };
fn http_read(sock: *ev::file, r: (size | io::EOF | io::error)) void = {
fn http_read(sock: *ev::file, r: (size | io::EOF | io::error)) (void | nomem) = {
const cli = ev::getuser(sock): *client; const cb = cli.cb: *http_donecb; const n = match (r) { case let err: io::error => cb(cli.user, err); client_close(cli); return; case io::EOF => cb(cli.user, io::EOF); client_close(cli); return; case let n: size => yield n; }; io::seek(&cli.buf, 0, io::whence::END)!; io::write(&cli.buf, cli.rbuf[..n])!; io::seek(&cli.buf, 0, io::whence::SET)!; let scan = bufio::newscanner(&cli.buf, types::SIZE_MAX); defer bufio::finish(&scan); let resp = match (http::response_scan(&scan)) { case let resp: http::response => yield resp; case io::EOF => ev::read(sock, &http_read, cli.rbuf); return; case let err: (http::protoerr | errors::unsupported | io::error) => cb(cli.user, err); client_close(cli); return; }; defer http::parsed_response_finish(&resp); memio::reset(&cli.buf); cb(cli.user, (sock, resp)); }; fn client_close(cli: *client) void = { if (!(cli.sock is null)) { ev::close(cli.sock as *ev::file); }; io::close(&cli.buf)!; free(cli); };
use ev; use fmt; use net; use net::dial; use net::ip; use net::uri; // Callback for a [[dial]] operation.
export type dialcb = fn(user: nullable *opaque, r: (*ev::file | error)) void;
export type dialcb = fn(user: nullable *opaque, r: (*ev::file | error)) (void | nomem);
// Dials a remote address, establishing a connection and returning the resulting // [[net::socket]] to the callback. The proto parameter should be the transport // protocol (e.g. "tcp"), the address parameter should be the remote address, // and the service should be the name of the service, or the default port to // use. // // See also [[net::dial::dial]]. export fn dial( loop: *ev::loop, proto: str, address: str, service: str, cb: *dialcb, user: nullable *opaque = null, ) (ev::req | error) = { for (const p .. default_protocols) { if (p.name == proto) { return p.dial(loop, address, service, cb, user); }; }; for (const p .. protocols) { if (p.name == proto) { return p.dial(loop, address, service, cb, user); }; }; return net::unknownproto: net::error; }; def HOST_MAX: size = 255; // Performs a [[dial]] operation for a given URI, taking the service name from // the URI scheme and forming an address from the URI host and port. // // See also [[net::dial::uri]]. export fn dial_uri( loop: *ev::loop, proto: str, uri: *uri::uri, cb: *dialcb, user: nullable *opaque = null, ) (ev::req | error) = { if (uri.host is str && len(uri.host as str) > HOST_MAX) { return invalid_address; }; static let addr: [HOST_MAX + len("[]:65535")]u8 = [0...]; const colon = if (uri.port != 0) ":" else ""; const port: fmt::formattable = if (uri.port != 0) uri.port else ""; let addr = match (uri.host) { case let host: str =>
yield fmt::bsprintf(addr, "{}{}{}", host, colon, port);
yield fmt::bsprintf(addr, "{}{}{}", host, colon, port)?;
case let ip: ip::addr4 => const host = ip::string(ip);
yield fmt::bsprintf(addr, "{}{}{}", host, colon, port);
yield fmt::bsprintf(addr, "{}{}{}", host, colon, port)?;
case let ip: ip::addr6 => const host = ip::string(ip);
yield fmt::bsprintf(addr, "[{}]{}{}", host, colon, port);
yield fmt::bsprintf(addr, "[{}]{}{}", host, colon, port)?;
}; return dial(loop, proto, addr, uri.scheme, cb, user); };
// License: MPL-2.0 // (c) 2021-2023 Drew DeVault <sir@cmpwn.com> // (c) 2021 Bor Grošelj Simić <bor.groseljsimic@telemach.net> // (c) 2021 Ember Sawady <ecs@d2evs.net> // // Provides default dialers for tcp and udp use errors; use ev; use math::random; use net::ip; use net::tcp; use net::udp; use net; use time; type tcp_dialer = struct { loop: *ev::loop, cb: *dialcb, user: nullable *opaque, req: ev::req, ip: []ip::addr, n: uint, port: u16, }; fn dial_tcp( loop: *ev::loop, addr: str, service: str, cb: *dialcb, user: nullable *opaque, ) (ev::req | error) = { let state = alloc(tcp_dialer { loop = loop, cb = cb, user = user, ... })?; match (resolve(loop, "tcp", addr, service, &dial_tcp_resolvecb, state)) { case let req: ev::req => state.req = req; return ev::mkreq(&dial_tcp_cancel, state); case let e: error => free(state); return e; }; }; fn dial_tcp_resolvecb( user: nullable *opaque, r: (([]ip::addr, u16) | error),
) void = {
) (void | nomem) = {
let state = user: *tcp_dialer; state.req = ev::req { ... }; const (ip, port) = match (r) { case let r: ([]ip::addr, u16) => yield r; case let err: error =>
dial_tcp_complete(state, err);
dial_tcp_complete(state, err)?;
return; }; state.ip = ip; state.port = port;
dial_tcp_connect(state);
dial_tcp_connect(state)?;
};
fn dial_tcp_connect(state: *tcp_dialer) void = {
fn dial_tcp_connect(state: *tcp_dialer) (void | nomem) = {
const req = match (ev::connect_tcp(state.loop, &dial_tcp_connectcb, state.ip[state.n], state.port, state)) { case let err: (net::error | errors::error) =>
dial_tcp_complete(state, err);
dial_tcp_complete(state, err)?;
return; case let req: ev::req => yield req; }; state.req = req; }; fn dial_tcp_connectcb( r: (*ev::file | net::error), user: nullable *opaque,
) void = {
) (void | nomem) = {
let state = user: *tcp_dialer; match (r) { case let sock: *ev::file => ev::setuser(sock, null);
dial_tcp_complete(state, sock);
dial_tcp_complete(state, sock)?;
return; case let err: net::error => if (err is errors::netunreachable) { state.n += 1; if (state.n < len(state.ip)) {
dial_tcp_connect(state);
dial_tcp_connect(state)?;
return; }; };
dial_tcp_complete(state, err);
dial_tcp_complete(state, err)?;
}; }; fn dial_tcp_cancel(req: *ev::req) void = { let state = req.user: *tcp_dialer; ev::cancel(&state.req); free(state.ip); free(state); };
fn dial_tcp_complete(state: *tcp_dialer, r: (*ev::file | error)) void = {
fn dial_tcp_complete(state: *tcp_dialer, r: (*ev::file | error)) (void | nomem) = {
const cb = state.cb; const user = state.user; free(state.ip); free(state);
cb(user, r);
cb(user, r)?;
}; type udp_dialer = struct { loop: *ev::loop, cb: *dialcb, user: nullable *opaque, req: ev::req, ip: []ip::addr, n: uint, port: u16, }; fn dial_udp( loop: *ev::loop, addr: str, service: str, cb: *dialcb, user: nullable *opaque, ) (ev::req | error) = { let state = alloc(udp_dialer { loop = loop, cb = cb, user = user, ... })?; match (resolve(loop, "udp", addr, service, &dial_udp_resolvecb, state)) { case let req: ev::req => state.req = req; return ev::mkreq(&dial_udp_cancel, state); case let e: error => free(state); return e; }; }; fn dial_udp_resolvecb( user: nullable *opaque, r: (([]ip::addr, u16) | error),
) void = {
) (void | nomem) = {
let state = user: *udp_dialer; state.req = ev::req { ... }; const (ip, port) = match (r) { case let r: ([]ip::addr, u16) => yield r; case let err: error =>
dial_udp_complete(state, err);
dial_udp_complete(state, err)?;
return; }; state.ip = ip; state.port = port;
dial_udp_connect(state);
dial_udp_connect(state)?;
};
fn dial_udp_connect(state: *udp_dialer) void = {
fn dial_udp_connect(state: *udp_dialer) (void | nomem) = {
for (true) { match (ev::connect_udp(state.loop, state.ip[state.n], state.port)) { case let err: net::error => if (err is errors::netunreachable) { state.n += 1; if (state.n < len(state.ip)) { continue; }; };
dial_udp_complete(state, err);
dial_udp_complete(state, err)?;
break; case let err: errors::error =>
dial_udp_complete(state, err);
dial_udp_complete(state, err)?;
if (err is errors::netunreachable) { state.n += 1; if (state.n < len(state.ip)) { continue; }; }; break; case let sock: *ev::file =>
dial_udp_complete(state, sock);
dial_udp_complete(state, sock)?;
break; }; }; }; fn dial_udp_cancel(req: *ev::req) void = { let state = req.user: *udp_dialer; ev::cancel(&state.req); free(state.ip); free(state); };
fn dial_udp_complete(state: *udp_dialer, r: (*ev::file | error)) void = {
fn dial_udp_complete(state: *udp_dialer, r: (*ev::file | error)) (void | nomem) = {
const cb = state.cb; const user = state.user; free(state.ip); free(state);
cb(user, r);
cb(user, r)?;
};
use crypto::random; use errors; use ev; use edns = ev::dns; use net; use net::ip; use net::dial; use net::dns; use unix::hosts; // Callback from a [[resolve]] operation. export type resolvecb = fn( user: nullable *opaque, r: (([]ip::addr, u16) | error),
) void;
) (void | nomem);
type resolve_state = struct { user: nullable *opaque, cb: *resolvecb, r4: ev::req, r6: ev::req, nq: uint, ip: []ip::addr, port: u16, }; // Performs DNS resolution on a given address string for a given service, // including /etc/hosts lookup and SRV resolution, and returns a list of // candidate IP addresses and the appropriate port, or an error, to the // callback. // // The caller must free the [[net::ip::addr]] slice. export fn resolve( loop: *ev::loop, proto: str, addr: str, service: str, cb: *resolvecb, user: nullable *opaque = null
) (ev::req | error) = {
) (ev::req | error | nomem) = {
// TODO: Reduce duplication with net::dial let state = alloc(resolve_state { cb = cb, user = user, ... })?; const (addr, port) = match (dial::splitaddr(addr, service)) { case let svc: (str, u16) => yield svc; case dial::invalid_address =>
resolve_finish(state, invalid_address);
resolve_finish(state, invalid_address)?;
return ev::req { ... }; }; if (service == "unknown" && port == 0) {
resolve_finish(state, unknown_service);
resolve_finish(state, unknown_service)?;
return ev::req { ... }; }; if (port == 0) { match (lookup_service(proto, service)) { case let p: u16 => port = p; case void => yield; }; }; // TODO: // - Consult /etc/services // - Fetch the SRV record if (port == 0) {
resolve_finish(state, unknown_service);
resolve_finish(state, unknown_service)?;
return ev::req { ... }; }; match (ip::parse(addr)) { case let addr: ip::addr => let addrs: []ip::addr = []; match (append(addrs, addr)) { case void =>
resolve_finish(state, (addrs, port));
resolve_finish(state, (addrs, port))?;
return ev::req { ... }; case nomem =>
resolve_finish(state, nomem);
resolve_finish(state, nomem)?;
return nomem; }; case ip::invalid => yield; }; const addrs = match (hosts::lookup(addr)) { case let addrs: []ip::addr => yield addrs; case let e: hosts::error =>
resolve_finish(state, nomem);
resolve_finish(state, nomem)?;
return e; }; if (len(addrs) != 0) {
resolve_finish(state, (addrs, port));
resolve_finish(state, (addrs, port))?;
return ev::req { ... }; }; state.port = port; return resolve_dns(state, loop, addr); }; fn resolve_dns( state: *resolve_state, loop: *ev::loop, addr: str, ) (ev::req | error) = { const domain = dns::parse_domain(addr); defer free(domain); let rand: []u8 = [0, 0]; random::buffer(rand); let id = *(&rand[0]: *u16); const query6 = dns::message { header = dns::header { id = id, op = dns::op { qr = dns::qr::QUERY, opcode = dns::opcode::QUERY, rd = true, ... }, qdcount = 1, ... }, questions = [ dns::question { qname = domain, qtype = dns::qtype::AAAA, qclass = dns::qclass::IN, }, ], ... }; const query4 = dns::message { header = dns::header { id = id + 1, op = dns::op { qr = dns::qr::QUERY, opcode = dns::opcode::QUERY, rd = true, ... }, qdcount = 1, ... }, questions = [ dns::question { qname = domain, qtype = dns::qtype::A, qclass = dns::qclass::IN, }, ], ... }; let ok = false; state.r6 = edns::query(loop, &query6, &query_cb_v6, state)?; defer if(!ok) ev::cancel(&state.r6); state.r4 = edns::query(loop, &query4, &query_cb_v4, state)?; ok = true; return ev::mkreq(&resolve_cancel, state); };
fn resolve_finish(st: *resolve_state, r: (([]ip::addr, u16) | error)) void = {
fn resolve_finish(st: *resolve_state, r: (([]ip::addr, u16) | error)) (void | nomem) = {
const user = st.user; const cb = st.cb; if (r is error) { free(st.ip); }; free(st);
cb(user, r);
cb(user, r)?;
}; fn resolve_cancel(req: *ev::req) void = { const state = req.user: *resolve_state; ev::cancel(&state.r4); ev::cancel(&state.r6); free(state.ip); free(state); };
fn query_cb_v4(user: nullable *opaque, r: (*dns::message | dns::error | nomem)) void = {
fn query_cb_v4(user: nullable *opaque, r: (*dns::message | dns::error | nomem)) (void | nomem) = {
let state = user: *resolve_state; state.r4 = ev::req { ... }; match (r) { case let err: dns::error => ev::cancel(&state.r6);
resolve_finish(state, err);
resolve_finish(state, err)?;
return; case nomem => ev::cancel(&state.r6);
resolve_finish(state, nomem);
resolve_finish(state, nomem)?;
return; case let msg: *dns::message => match (collect_answers(&state.ip, &msg.answers)) { case void => void; case nomem => ev::cancel(&state.r6);
resolve_finish(state, nomem);
resolve_finish(state, nomem)?;
return; }; state.nq += 1; }; if (state.nq < 2) { return; };
resolve_finish(state, (state.ip, state.port));
resolve_finish(state, (state.ip, state.port))?;
};
fn query_cb_v6(user: nullable *opaque, r: (*dns::message | dns::error | nomem)) void = {
fn query_cb_v6(user: nullable *opaque, r: (*dns::message | dns::error | nomem)) (void | nomem) = {
let state = user: *resolve_state; state.r6 = ev::req { ... }; match (r) { case let err: dns::error => ev::cancel(&state.r4);
resolve_finish(state, err);
resolve_finish(state, err)?;
return; case nomem => ev::cancel(&state.r4);
resolve_finish(state, nomem);
resolve_finish(state, nomem)?;
return; case let msg: *dns::message => match (collect_answers(&state.ip, &msg.answers)) { case void => void; case nomem => ev::cancel(&state.r4);
resolve_finish(state, nomem);
resolve_finish(state, nomem)?;
return; }; state.nq += 1; }; if (state.nq < 2) { return; };
resolve_finish(state, (state.ip, state.port));
resolve_finish(state, (state.ip, state.port))?;
}; fn collect_answers(addrs: *[]ip::addr, answers: *[]dns::rrecord) (void | nomem) = { for (let answer &.. answers) { match (answer.rdata) { case let addr: dns::aaaa => append(addrs, addr: ip::addr)?; case let addr: dns::a => append(addrs, addr: ip::addr)?; case => void; }; }; };
use endian; use errors; use ev; use io; use net; use net::dns; use net::ip; use net::udp; use time; use types; use unix::resolvconf; // TODO: Let users customize this? def TIMEOUT: time::duration = 3 * time::SECOND; // Callback for a [[query]] operation. export type querycb = fn( user: nullable *opaque, r: (*dns::message | dns::error | nomem),
) void;
) (void | nomem);
type qstate = struct { // Event loop objects loop: *ev::loop, socket4: *ev::file, socket6: *ev::file, r4: ev::req, r6: ev::req, timer: *ev::file, // Request ID rid: u16, // Outgoing DNS request query: [512]u8, qlen: u16, // Response buffer rbuf: []u8, rbuf_valid: u16, // Length buffer zbuf: [2]u8, // Callback and user data cb: *querycb, user: nullable *opaque, }; // Performs a DNS query against the provided set of DNS servers, or the list of // servers from /etc/resolv.conf if none are specified. The DNS message passed // to the callback is only valid for the duration of the callback. export fn query( loop: *ev::loop, query: *dns::message, cb: *querycb, user: nullable *opaque, servers: ip::addr... ) (ev::req | nomem | dns::error | net::error | errors::error) = { if (len(servers) == 0) { const rconf = resolvconf::load(); servers = rconf.nameservers; }; if (len(servers) == 0) { // Fall back to localhost servers = [ip::LOCAL_V6, ip::LOCAL_V4]; }; let stateok = false; const socket4 = ev::listen_udp(loop, ip::ANY_V4, 0)?; defer if (!stateok) ev::close(socket4); const socket6 = ev::listen_udp(loop, ip::ANY_V6, 0)?; defer if (!stateok) ev::close(socket6); const timeout = ev::newtimer(loop, &timeoutcb, time::clock::MONOTONIC)?; defer if (!stateok) ev::close(timeout); let rbuf: []u8 = alloc([0...], 512)?; defer if (!stateok) free(rbuf); let state = alloc(qstate { loop = loop, socket4 = socket4, socket6 = socket6, timer = timeout, rid = query.header.id, cb = cb, rbuf = rbuf, user = user, ... })?; defer if (!stateok) free(state); state.qlen = dns::encode(state.query, query)?: u16; ev::setuser(socket4, state); ev::setuser(socket6, state); ev::setuser(timeout, state); ev::timer_configure(timeout, TIMEOUT, 0); // Note: the initial set of requests is sent directly through net::udp // as it is assumed they can fit into the kernel's internal send buffer // and will finish without blocking const buf = state.query[..state.qlen]; for (const server .. servers) { match (server) { case ip::addr4 => udp::sendto(ev::getfd(socket4), buf, server, 53)?; case ip::addr6 => udp::sendto(ev::getfd(socket6), buf, server, 53)?; }; }; state.r4 = ev::recvfrom(socket4, &qrecvcb, state.rbuf); state.r6 = ev::recvfrom(socket6, &qrecvcb, state.rbuf); stateok = true; return ev::mkreq(&query_cancel, state); }; fn query_cancel(req: *ev::req) void = { const q = req.user: *qstate; query_destroy(q); }; fn query_destroy(q: *qstate) void = { ev::cancel(&q.r4); ev::cancel(&q.r6); ev::close(q.socket4); ev::close(q.socket6); ev::close(q.timer); free(q.rbuf); free(q); };
fn query_complete(q: *qstate, r: (*dns::message | dns::error | nomem)) void = {
fn query_complete(q: *qstate, r: (*dns::message | dns::error | nomem)) (void | nomem) = {
const cb = q.cb; const user = q.user;
cb(user, r);
cb(user, r)?;
match (r) { case let msg: *dns::message => dns::message_free(msg); case => void; }; query_destroy(q); };
fn timeoutcb(file: *ev::file) void = {
fn timeoutcb(file: *ev::file) (void | nomem) = {
const q = ev::getuser(file): *qstate;
query_complete(q, errors::timeout);
query_complete(q, errors::timeout)?;
};
fn qrecvcb(file: *ev::file, r: ((size, ip::addr, u16) | net::error)) void = {
fn qrecvcb(file: *ev::file, r: ((size, ip::addr, u16) | net::error)) (void | nomem) = {
const q = ev::getuser(file): *qstate; match (qrecv(q, file, r)) { case void => void; case let r: (*dns::message | dns::error) =>
query_complete(q, r);
query_complete(q, r)?;
}; }; fn qrecv( q: *qstate, file: *ev::file, r: ((size, ip::addr, u16) | net::error), ) (*dns::message | dns::error | void) = { let req: *ev::req = if (file == q.socket4) &q.r4 else &q.r6; *req = ev::req { ... }; const (z, addr, port) = match (r) { case let r: (size, ip::addr, u16) => yield r; case let err: net::error =>
query_complete(q, err);
query_complete(q, err)?;
return; }; const resp = match (dns::decode(q.rbuf[..z])) { case dns::format => *req = ev::recvfrom(file, &qrecvcb, q.rbuf); return; case let msg: *dns::message => yield msg; }; if (resp.header.id != q.rid || resp.header.op.qr != dns::qr::RESPONSE) { *req = ev::recvfrom(file, &qrecvcb, q.rbuf); dns::message_free(resp); return; }; if (!resp.header.op.tc) { return resp; }; dns::message_free(resp); // Reponse truncated, retry over TCP // // Note that when we switch to TCP, we only use the r4 field for // in-flight requests (even if we're using IPv6), and likewise once the // TCP connection is estabilshed the UDP socket at socket4 is closed and // replaced with the TCP socket (regardless of domain). // Cancel in-flight UDP queries ev::cancel(&q.r4); ev::cancel(&q.r6); match (ev::connect_tcp(q.loop, &qconnected, addr, 53, q)) { case let req: ev::req => q.r4 = req; case let err: net::error => return err; case let err: errors::error => return err: net::error; }; };
fn qconnected(result: (*ev::file | net::error), user: nullable *opaque) void = {
fn qconnected(result: (*ev::file | net::error), user: nullable *opaque) (void | nomem) = {
const q = user: *qstate; q.r4 = ev::req { ... }; const sock = match (result) { case let file: *ev::file => yield file; case let err: net::error =>
query_complete(q, err);
query_complete(q, err)?;
return; }; ev::close(q.socket4); q.socket4 = sock; endian::beputu16(q.zbuf, q.qlen); q.r4 = ev::writev(sock, &qtcp_write_cb, io::mkvector(q.zbuf), io::mkvector(q.query[..q.qlen])); };
fn qtcp_write_cb(file: *ev::file, result: (size | io::error)) void = {
fn qtcp_write_cb(file: *ev::file, result: (size | io::error)) (void | nomem) = {
const q = ev::getuser(file): *qstate; q.r4 = ev::req { ... }; match (result) { case let z: size => // XXX: some (stupid) configurations may have a TCP buffer less // than 514 bytes, which we might want to handle, but generally // the request should make it to the TCP buffer in a single // writev call. assert(z: u16 == q.qlen + 2); case let err: io::error =>
query_complete(q, err);
query_complete(q, err)?;
}; q.r4 = ev::read(file, &qtcp_readlength_cb, q.zbuf); }; fn qtcp_readlength_cb( file: *ev::file, result: (size | io::EOF | io::error),
) void = {
) (void | nomem) = {
const q = ev::getuser(file): *qstate; match (result) { case let z: size => if (z != 2) {
query_complete(q, dns::format);
query_complete(q, dns::format)?;
return; }; case let err: io::error =>
query_complete(q, err);
query_complete(q, err)?;
return; case io::EOF =>
query_complete(q, dns::format);
query_complete(q, dns::format)?;
return; }; const rlen = endian::begetu16(q.zbuf); q.rid = rlen; q.rbuf = match (alloc([0u8...], rlen)) { case let rbuf: []u8 => yield rbuf; case nomem =>
query_complete(q, nomem);
query_complete(q, nomem)?;
return; }; q.r4 = ev::read(file, &qtcp_readdata_cb, q.rbuf); }; fn qtcp_readdata_cb( file: *ev::file, result: (size | io::EOF | io::error),
) void = {
) (void | nomem) = {
const q = ev::getuser(file): *qstate; q.r4 = ev::req { ... }; match (result) { case let z: size => const rlen = z: u16; if (q.rbuf_valid + rlen > q.rid) {
query_complete(q, dns::format);
query_complete(q, dns::format)?;
return; }; q.rbuf_valid += rlen; case io::EOF => return; }; if (q.rbuf_valid < q.rid) { // Read more data from the socket q.r4 = ev::read(file, &qtcp_readdata_cb, q.rbuf[q.rbuf_valid..]); return; }; const resp = match (dns::decode(q.rbuf[..q.rbuf_valid])) { case dns::format =>
query_complete(q, dns::format);
query_complete(q, dns::format)?;
return; case let msg: *dns::message => yield msg; };
query_complete(q, resp);
query_complete(q, resp)?;
};
use ev; use bufio; use errors; use io; use memio; use net::http; use net::ip; use net::tcp; use net; use strings; use types; // A callback for an [[http_serve]] operation. export type http_servecb = fn(user: nullable *opaque, request: http::request, rw: http::response_writer) void; // Schedules an http serve operation on a socket. The user must free resources // when done with [[server_finish]] export fn http_serve( sock: *ev::file, cb: *http_servecb, user: nullable *opaque,
) *server = { const serv = newserver(sock, cb, user);
) (*server | nomem) = { const serv = newserver(sock, cb, user)?;
ev::setuser(sock, serv); ev::accept(sock, &http_accept); return serv; };
fn http_accept(sock: *ev::file, r: (*ev::file | net::error)) void = {
fn http_accept(sock: *ev::file, r: (*ev::file | net::error)) (void | nomem) = {
const server = ev::getuser(sock): *server; const sock = match (r) { case let sock: *ev::file => yield sock; case let err: net::error => // TODO handle it return; }; const client = newclient( server, sock,
);
)?;
ev::setuser(client.sock, client); ev::read(client.sock, &http_read, client.rbuf);
append(server.clients, client);
append(server.clients, client)?;
ev::accept(server.sock, &http_accept); };
fn http_read(sock: *ev::file, r: (size | io::EOF | io::error)) void = {
fn http_read(sock: *ev::file, r: (size | io::EOF | io::error)) (void | nomem) = {
const client = ev::getuser(sock): *server_client; const n = match (r) { case let err: io::error => client_close(client); return; case io::EOF => client_close(client); return; case let n: size => yield n; }; io::seek(&client.buf, 0, io::whence::END)!; io::write(&client.buf, client.rbuf[..n])!; io::seek(&client.buf, 0, io::whence::SET)!; let scan = bufio::newscanner(&client.buf, types::SIZE_MAX); defer bufio::finish(&scan); let req = match (http::request_scan(&scan)) { case let req: http::request => yield req; case io::EOF => ev::read(client.sock, &http_read, client.rbuf); return; case let err: (http::protoerr | errors::unsupported | io::error) => client_close(client); return; }; defer http::parsed_request_finish(&req); memio::reset(&client.buf); const resp = http::response { version = (1, 1), status = http::STATUS_OK,
reason = strings::dup(http::status_reason(http::STATUS_OK)),
reason = strings::dup(http::status_reason(http::STATUS_OK))?,
header = [], body = io::empty, }; defer io::close(resp.body)!; const cb = client.server.cb: *http_servecb; cb(client.server.user, req, http::response_writer { sock = ev::getfd(client.sock), resp = resp, writeto = &client.buf, }); client.wbuf = memio::buffer(&client.buf); ev::write(client.sock, &http_write, client.wbuf); };
fn http_write(sock: *ev::file, r: (size | io::error)) void = {
fn http_write(sock: *ev::file, r: (size | io::error)) (void | nomem) = {
const client = ev::getuser(sock): *server_client; const n = match (r) { case let err: io::error => client_close(client); return; case let n: size => yield n; }; static delete(client.wbuf[..n]); if (len(client.wbuf) != 0) { ev::write(client.sock, &http_write, client.wbuf); return; }; memio::reset(&client.buf); ev::read(client.sock, &http_read, client.rbuf); };
use ev; use io; use memio; use os; export type server = struct { sock: *ev::file, clients: []*server_client, cb: *opaque, user: nullable *opaque }; export type server_client = struct { sock: *ev::file, server: *server, rbuf: [os::BUFSZ]u8, wbuf: []u8, buf: memio::stream, };
export fn newserver(sock: *ev::file, cb: *opaque, user: nullable *opaque) *server = {
export fn newserver(sock: *ev::file, cb: *opaque, user: nullable *opaque) (*server | nomem) = {
return alloc(server { sock = sock, cb = cb, user = user, ... }); }; export fn server_finish(serv: *server) void = { for (let i = 0z; i < len(serv.clients); i += 1) { client_close(serv.clients[i]); i -= 1; }; free(serv); };
export fn newclient(serv: *server, sock: *ev::file) *server_client = {
export fn newclient(serv: *server, sock: *ev::file) (*server_client | nomem) = {
return alloc(server_client { server = serv, sock = sock, buf = memio::dynamic(), ... }); }; export fn client_close(client: *server_client) void = { const server = client.server; for (let i = 0z; i < len(server.clients); i += 1) { if (server.clients[i] == client) { delete(server.clients[i]); break; }; }; ev::close(client.sock); io::close(&client.buf)!; free(client); };