From 4ac4f2df0031e88df24550fee37145943a26cfd2 Mon Sep 17 00:00:00 2001 From: Drew DeVault Date: Fri, 16 Dec 2022 21:53:21 +0100 Subject: [PATCH] Initial UDP support --- cmd/udpserv/main.ha | 80 +++++++++++++++++++++++++++++++++++++++++++++++++++++ ev/+linux/file.ha | 28 +++++++++++++++++++++------- ev/+linux/loop.ha | 8 ++++++-- ev/+linux/socket.ha | 114 +++++++++++++++++++++++++++++++++++++++++++++++++++-- diff --git a/cmd/udpserv/main.ha b/cmd/udpserv/main.ha new file mode 100644 index 0000000000000000000000000000000000000000..194468e1aeb554b59d7eb3d7051bad230f22f58a --- /dev/null +++ b/cmd/udpserv/main.ha @@ -0,0 +1,80 @@ +use errors; +use ev; +use log; +use net; +use net::ip; +use os; + +type state = struct { + loop: *ev::loop, + src: ip::addr, + port: u16, + buf: [os::BUFSIZ]u8, + wbuf: []u8, +}; + +export fn main() void = { + const loop = ev::newloop()!; + defer ev::finish(&loop); + + const sock = match (ev::listen_udp(&loop, ip::LOCAL_V4, 12345)) { + case let err: net::error => + log::fatalf("Error: listen: {}", net::strerror(err)); + case let err: errors::error => + log::fatalf("Error: listen: {}", errors::strerror(err)); + case let sock: *ev::file => + yield sock; + }; + defer ev::close(sock); + + let state = state { + loop = &loop, + ... + }; + ev::setuser(sock, &state); + ev::recvfrom(sock, &recv, state.buf); + + log::println("Listening on 127.0.0.1:12345"); + for (ev::dispatch(&loop, -1)!) void; +}; + +fn recv(sock: *ev::file, r: ((size, ip::addr, u16) | net::error)) void = { + const state = ev::getuser(sock): *state; + const (n, src, port) = match (r) { + case let err: net::error => + log::println("Error: recv:", net::strerror(err)); + ev::stop(state.loop); + return; + case let packet: (size, ip::addr, u16) => + yield packet; + }; + + // TODO: Make ev send/write all data and drop these fields and the need + // to manually manage a write buffer and re-call sendto in &send + state.src = src; + state.port = port; + + log::printfln("{} bytes from {}:{}", n, ip::string(src), port); + state.wbuf = state.buf[..n]; + ev::sendto(sock, &send, state.wbuf, src, port); +}; + +fn send(sock: *ev::file, r: (size | net::error)) void = { + const state = ev::getuser(sock): *state; + const n = match (r) { + case let err: net::error => + log::println("Error: send:", net::strerror(err)); + ev::stop(state.loop); + return; + case let n: size => + yield n; + }; + + static delete(state.wbuf[..n]); + + if (len(state.wbuf) != 0) { + ev::sendto(sock, &send, state.wbuf, state.src, state.port); + } else { + ev::recvfrom(sock, &recv, state.buf); + }; +}; diff --git a/ev/+linux/file.ha b/ev/+linux/file.ha index ea33ad752f550ac204327e5802eb5e4718782ec6..183c94d8cce92b6aa647f48618129997af64092c 100644 --- a/ev/+linux/file.ha +++ b/ev/+linux/file.ha @@ -1,6 +1,7 @@ use errors; use io; use net; +use net::ip; use rt; use unix::signal; @@ -9,12 +10,15 @@ NONE = 0, READV = 1 << 0, WRITEV = 1 << 1, - READABLE = 1 << 16, - WRITABLE = 2 << 16, - ACCEPT = 3 << 16, - CONNECT = 4 << 16, - SIGNAL = 5 << 16, - TIMER = 6 << 16, + READABLE = 1 << 16, + WRITABLE = 2 << 16, + ACCEPT = 3 << 16, + CONNECT_TCP = 4 << 16, + CONNECT_UDP = 5 << 16, + SIGNAL = 6 << 16, + TIMER = 7 << 16, + RECVFROM = 8 << 16, + SENDTO = 9 << 16, }; export type fflags = enum uint { @@ -41,6 +45,12 @@ wvec: []rt::iovec, }, sockflags: net::sockflags, sigmask: signal::sigset, + sendrecv: struct { + sbuf: []u8, + rbuf: []u8, + dest: ip::addr, + port: u16, + }, }, }; @@ -137,13 +147,17 @@ }; switch (file.op) { case op::ACCEPT => events |= rt::EPOLLIN; - case op::CONNECT => + case op::CONNECT_TCP => events |= rt::EPOLLOUT; case op::SIGNAL => events |= rt::EPOLLIN; case op::TIMER => events &= ~rt::EPOLLONESHOT; events |= rt::EPOLLIN; + case op::RECVFROM => + events |= rt::EPOLLIN; + case op::SENDTO => + events |= rt::EPOLLOUT; case => yield; }; diff --git a/ev/+linux/loop.ha b/ev/+linux/loop.ha index 64a0dc35d6b12ff91ead51be9a45d11fc206cdc7..f5fee568444f532bfdf86e659f01eadf60ba4e40 100644 --- a/ev/+linux/loop.ha +++ b/ev/+linux/loop.ha @@ -104,12 +104,16 @@ case op::WRITABLE => writable_ready(file, ev); case op::ACCEPT => accept_ready(file, ev); - case op::CONNECT => - connect_ready(file, ev); + case op::CONNECT_TCP => + connect_tcp_ready(file, ev); case op::SIGNAL => signal_ready(file, ev); case op::TIMER => timer_ready(file, ev); + case op::RECVFROM => + recvfrom_ready(file, ev); + case op::SENDTO => + sendto_ready(file, ev); case => assert(pending & ~(op::READV | op::WRITEV) == 0); }; diff --git a/ev/+linux/socket.ha b/ev/+linux/socket.ha index 90136dc12c0330cec7b03cb0c24c1454d920ba5f..5db2fc7ff373a1bbd861bc805a58e275701b3ce3 100644 --- a/ev/+linux/socket.ha +++ b/ev/+linux/socket.ha @@ -2,6 +2,7 @@ use errors; use net; use net::ip; use net::tcp; +use net::udp; use rt; // Creates a socket which listens for incoming TCP connections on the given @@ -13,6 +14,18 @@ port: u16, opts: tcp::listen_option... ) (*file | net::error | errors::error) = { const sock = tcp::listen(addr, port, opts...)?; + return register(loop, sock)?; +}; + +// Creates a socket which listens for incoming UDP packets on the given IP +// address and port. +export fn listen_udp( + loop: *loop, + addr: ip::addr, + port: u16, + opts: udp::listen_option... +) (*file | net::error | errors::error) = { + const sock = udp::listen(addr, port, opts...)?; return register(loop, sock)?; }; @@ -61,15 +74,28 @@ const sock = tcp::connect(addr, port, opt | net::sockflags::NONBLOCK)?; let file = register(loop, sock)?; file.user = user; file.cb = cb; - file.op = op::CONNECT; + file.op = op::CONNECT_TCP; file_epoll_ctl(file); }; -fn connect_ready( +// Creates a UDP socket on this event loop and sets the default destination to +// the given address. +export fn connect_udp( + loop: *loop, + dest: ip::addr, + port: u16, + opts: udp::connect_option... +) (*file | net::error | errors::error) = { + const sock = udp::connect(dest, port, opts...)?; + const file = register(loop, sock)?; + return file; +}; + +fn connect_tcp_ready( sock: *file, ev: *rt::epoll_event, ) void = { - assert(sock.op == op::CONNECT); + assert(sock.op == op::CONNECT_TCP); assert(ev.events & rt::EPOLLOUT != 0); assert(sock.cb != null); const cb = sock.cb: *connectcb; @@ -132,3 +158,85 @@ case let err: net::error => cb(sock, err); }; }; + +// TODO: Support recv & send in parallel + +// Callback for a [[recvfrom]] operation. The second parameter is either an +// error or a tuple of the number of bytes received and the IP address and port +// of the sender. +export type recvfromcb = fn( + file: *file, + r: ((size, ip::addr, u16) | net::error), +) void; + +// Schedules a receive operation on a socket. +export fn recvfrom( + sock: *file, + cb: *recvfromcb, + buf: []u8, +) req = { + assert(sock.op == op::NONE); + sock.op = op::RECVFROM; + sock.cb = cb; + sock.sendrecv.rbuf = buf; + file_epoll_ctl(sock); + return req { ... }; +}; + +fn recvfrom_ready( + sock: *file, + ev: *rt::epoll_event, +) void = { + assert(sock.op == op::RECVFROM); + assert(sock.cb != null); + const cb = sock.cb: *recvfromcb; + sock.op = op::NONE; + file_epoll_ctl(sock); + + let src: ip::addr = ip::ANY_V4, port = 0u16; + match (udp::recvfrom(sock.fd, sock.sendrecv.rbuf, &src, &port)) { + case let err: net::error => + cb(sock, err); + case let n: size => + cb(sock, (n, src, port)); + }; +}; + +export type sendtocb = fn(file: *file, r: (size | net::error)) void; + +// Schedules a send operation on a socket. +export fn sendto( + sock: *file, + cb: *sendtocb, + buf: []u8, + dest: ip::addr, + port: u16, +) req = { + assert(sock.op == op::NONE); + sock.op = op::SENDTO; + sock.cb = cb; + sock.sendrecv.sbuf = buf; + sock.sendrecv.dest = dest; + sock.sendrecv.port = port; + file_epoll_ctl(sock); + return req { ... }; +}; + +fn sendto_ready( + sock: *file, + ev: *rt::epoll_event, +) void = { + assert(sock.op == op::SENDTO); + assert(sock.cb != null); + const cb = sock.cb: *sendtocb; + sock.op = op::NONE; + file_epoll_ctl(sock); + + const r = udp::sendto( + sock.fd, + sock.sendrecv.sbuf, + sock.sendrecv.dest, + sock.sendrecv.port, + ); + cb(sock, r); +}; -- 2.48.1