From 0e32ae366130bf1c611e444eb8cadf63aaf10210 Mon Sep 17 00:00:00 2001 From: Drew DeVault Date: Fri, 16 Dec 2022 22:20:35 +0100 Subject: [PATCH] ev::send, ev::recv This still needs to be updated to support send & receive in parallel --- cmd/udpclient/main.ha | 89 +++++++++++++++++++++++++++++++++++++++++++++++++++++ ev/+linux/file.ha | 16 +++++++++------- ev/+linux/loop.ha | 8 ++++++-- ev/+linux/socket.ha | 86 +++++++++++++++++++++++++++++++++++++++++++++-------- diff --git a/cmd/udpclient/main.ha b/cmd/udpclient/main.ha new file mode 100644 index 0000000000000000000000000000000000000000..7f993f1c797749769a2e26a8af8af4e5555005ca --- /dev/null +++ b/cmd/udpclient/main.ha @@ -0,0 +1,89 @@ +// TODO: Update me when we can send/recv in parallel +use ev; +use fmt; +use io; +use net::ip; +use net; +use os; + +type state = struct { + loop: *ev::loop, + stdin: *ev::file, + stdout: *ev::file, + sock: *ev::file, + stdbuf: [os::BUFSIZ]u8, + netbuf: [os::BUFSIZ]u8, + wbuf: []u8, +}; + +export fn main() void = { + const loop = ev::newloop()!; + defer ev::finish(&loop); + + const sock = ev::connect_udp(&loop, ip::LOCAL_V4, 12345)!; + defer ev::close(sock); + + let state = state { + loop = &loop, + stdin = ev::register(&loop, os::stdin_file)!, + stdout = ev::register(&loop, os::stdout_file)!, + sock = sock, + ... + }; + ev::setuser(state.stdin, &state); + ev::setuser(state.stdout, &state); + ev::setuser(state.sock, &state); + + ev::readable(state.stdin, &stdin_readable); + + for (ev::dispatch(&loop, -1)!) void; +}; + +fn stdin_readable(file: *ev::file) void = { + const state = ev::getuser(file): *state; + const n = match (io::read(os::stdin_file, state.stdbuf)) { + case let n: size => + yield n; + case io::EOF => + ev::stop(state.loop); + return; + case let err: io::error => + fmt::errorln("Error: read:", io::strerror(err))!; + ev::stop(state.loop); + return; + }; + state.wbuf = state.stdbuf[..n]; + ev::send(state.sock, &sock_send, state.wbuf); +}; + +fn sock_recv(file: *ev::file, r: (size | net::error)) void = { + const state = ev::getuser(file): *state; + const n = match (r) { + case let n: size => + yield n; + case let err: net::error => + fmt::errorln("Error: recv:", net::strerror(err))!; + ev::stop(state.loop); + return; + }; + io::write(os::stdout_file, state.netbuf[..n])!; +}; + +fn sock_send(file: *ev::file, r: (size | net::error)) void = { + const state = ev::getuser(file): *state; + const n = match (r) { + case let n: size => + yield n; + case let err: net::error => + fmt::errorln("Error: send:", net::strerror(err))!; + ev::stop(state.loop); + return; + }; + static delete(state.wbuf[..n]); + if (len(state.wbuf) != 0) { + ev::send(state.sock, &sock_send, state.wbuf); + } else { + ev::recv(state.sock, &sock_recv, state.netbuf); + ev::readable(state.stdin, &stdin_readable); + }; +}; diff --git a/ev/+linux/file.ha b/ev/+linux/file.ha index 183c94d8cce92b6aa647f48618129997af64092c..87d4d089d49bd1a109c8f2f87369dbf679c0b8ab 100644 --- a/ev/+linux/file.ha +++ b/ev/+linux/file.ha @@ -17,8 +17,10 @@ CONNECT_TCP = 4 << 16, CONNECT_UDP = 5 << 16, SIGNAL = 6 << 16, TIMER = 7 << 16, - RECVFROM = 8 << 16, - SENDTO = 9 << 16, + SENDTO = 8 << 16, + RECVFROM = 9 << 16, + SEND = 10 << 16, + RECV = 11 << 16, }; export type fflags = enum uint { @@ -138,10 +140,10 @@ // Updates epoll events for a given file. For internal use. fn file_epoll_ctl(file: *file) void = { let events = rt::EPOLLONESHOT; - if (file.op & op::READV != 0 || file.op & op::READABLE != 0) { + if (file.op & op::READV != 0 || file.op == op::READABLE) { events |= rt::EPOLLIN | rt::EPOLLHUP; }; - if (file.op & op::WRITEV != 0 || file.op & op::WRITABLE != 0) { + if (file.op & op::WRITEV != 0 || file.op == op::WRITABLE) { events |= rt::EPOLLOUT | rt::EPOLLHUP; }; switch (file.op) { @@ -154,10 +156,10 @@ events |= rt::EPOLLIN; case op::TIMER => events &= ~rt::EPOLLONESHOT; events |= rt::EPOLLIN; - case op::RECVFROM => - events |= rt::EPOLLIN; - case op::SENDTO => + case op::SEND, op::SENDTO => events |= rt::EPOLLOUT; + case op::RECV, op::RECVFROM => + events |= rt::EPOLLIN; case => yield; }; diff --git a/ev/+linux/loop.ha b/ev/+linux/loop.ha index f5fee568444f532bfdf86e659f01eadf60ba4e40..887e62d73a646292b555fe63def83b279f39e8cf 100644 --- a/ev/+linux/loop.ha +++ b/ev/+linux/loop.ha @@ -110,10 +110,14 @@ case op::SIGNAL => signal_ready(file, ev); case op::TIMER => timer_ready(file, ev); + case op::SENDTO => + sendto_ready(file, ev); case op::RECVFROM => recvfrom_ready(file, ev); - case op::SENDTO => - sendto_ready(file, ev); + case op::SEND => + send_ready(file, ev); + case op::RECV => + recv_ready(file, ev); case => assert(pending & ~(op::READV | op::WRITEV) == 0); }; diff --git a/ev/+linux/socket.ha b/ev/+linux/socket.ha index 5db2fc7ff373a1bbd861bc805a58e275701b3ce3..62c3c252eb4cd196b6fa15e94f29050465967956 100644 --- a/ev/+linux/socket.ha +++ b/ev/+linux/socket.ha @@ -29,6 +29,19 @@ const sock = udp::listen(addr, port, opts...)?; return register(loop, sock)?; }; +// Creates a UDP socket on this event loop and sets the default destination to +// the given address. +export fn connect_udp( + loop: *loop, + dest: ip::addr, + port: u16, + opts: udp::connect_option... +) (*file | net::error | errors::error) = { + const sock = udp::connect(dest, port, opts...)?; + const file = register(loop, sock)?; + return file; +}; + export type connectcb = fn(result: (*file | net::error), user: nullable *void) void; // Creates a socket and connects to a given IP address and port over TCP. @@ -76,19 +89,6 @@ file.user = user; file.cb = cb; file.op = op::CONNECT_TCP; file_epoll_ctl(file); -}; - -// Creates a UDP socket on this event loop and sets the default destination to -// the given address. -export fn connect_udp( - loop: *loop, - dest: ip::addr, - port: u16, - opts: udp::connect_option... -) (*file | net::error | errors::error) = { - const sock = udp::connect(dest, port, opts...)?; - const file = register(loop, sock)?; - return file; }; fn connect_tcp_ready( @@ -202,7 +202,67 @@ cb(sock, (n, src, port)); }; }; +// Callback for a [[recv]] operation. +export type recvcb = fn(file: *file, r: (size | net::error)) void; + +// Schedules a receive operation on a (connected) socket. +export fn recv( + sock: *file, + cb: *recvcb, + buf: []u8, +) req = { + assert(sock.op == op::NONE); + sock.op = op::RECV; + sock.cb = cb; + sock.sendrecv.rbuf = buf; + file_epoll_ctl(sock); + return req { ... }; +}; + +fn recv_ready( + sock: *file, + ev: *rt::epoll_event, +) void = { + assert(sock.op == op::RECV); + assert(sock.cb != null); + const cb = sock.cb: *recvcb; + sock.op = op::NONE; + file_epoll_ctl(sock); + + const r = udp::recv(sock.fd, sock.sendrecv.rbuf); + cb(sock, r); +}; + +// Callback for a [[send]] or [[sendto]] operation. export type sendtocb = fn(file: *file, r: (size | net::error)) void; + +// Schedules a send operation on a (connected) socket. +export fn send( + sock: *file, + cb: *sendtocb, + buf: []u8, +) req = { + assert(sock.op == op::NONE); + sock.op = op::SEND; + sock.cb = cb; + sock.sendrecv.sbuf = buf; + file_epoll_ctl(sock); + return req { ... }; +}; + +fn send_ready( + sock: *file, + ev: *rt::epoll_event, +) void = { + assert(sock.op == op::SEND); + assert(sock.cb != null); + const cb = sock.cb: *sendtocb; + sock.op = op::NONE; + file_epoll_ctl(sock); + + const r = udp::send(sock.fd, sock.sendrecv.sbuf); + cb(sock, r); +}; // Schedules a send operation on a socket. export fn sendto( -- 2.48.1