From a794ff6f290172c61f88fe17e75fa1524ab3d6a4 Mon Sep 17 00:00:00 2001 From: Drew DeVault Date: Fri, 16 Dec 2022 10:12:33 +0100 Subject: [PATCH] Implement basic TCP server functionality --- cmd/tcpserv/main.ha | 112 +++++++++++++++++++++++++++++++++++++++++++++++++++++ ev/+linux/file.ha | 18 ++++++++++++++++++ ev/+linux/loop.ha | 2 ++ ev/+linux/socket.ha | 59 +++++++++++++++++++++++++++++++++++++++++++++++++++++ diff --git a/cmd/tcpserv/main.ha b/cmd/tcpserv/main.ha new file mode 100644 index 0000000000000000000000000000000000000000..7e0c3a711da394418f56506c8e6cc7959a56dda5 --- /dev/null +++ b/cmd/tcpserv/main.ha @@ -0,0 +1,112 @@ +use ev; +use io; +use log; +use net; +use net::ip; +use net::tcp; +use os; + +type server = struct { + loop: *ev::loop, + sock: *ev::file, + clients: []*client, +}; + +type client = struct { + server: *server, + sock: *ev::file, + buf: [os::BUFSIZ]u8, + wbuf: []u8, +}; + +export fn main() void = { + const loop = ev::newloop()!; + defer ev::finish(&loop); + + const sock = ev::listen_tcp(&loop, ip::LOCAL_V4, 12345)!; + defer ev::close(sock); + + let state = server { + loop = &loop, + sock = sock, + ... + }; + ev::setuser(sock, &state); + + ev::accept(sock, &sock_accept); + + log::println("Listening on 127.0.0.1:12345"); + for (ev::dispatch(&loop, -1)!) void; +}; + +fn sock_accept(sock: *ev::file, r: (*ev::file | net::error)) void = { + let server = ev::getuser(sock): *server; + const sock = match (r) { + case let sock: *ev::file => + yield sock; + case let err: net::error => + log::printfln("error: accept: {}", net::strerror(err)); + ev::stop(server.loop); + return; + }; + const file = ev::getfd(sock); + const (addr, port) = tcp::peeraddr(file) as (ip::addr, u16); + log::printfln("Connection from {}:{}", ip::string(addr), port); + + const client = alloc(client { + server = server, + sock = sock, + ... + }); + append(server.clients, client); + ev::setuser(client.sock, client); + ev::read(client.sock, &client_read, client.buf); + ev::accept(server.sock, &sock_accept); +}; + +fn client_read(sock: *ev::file, r: (size | io::EOF | io::error)) void = { + const client = ev::getuser(sock): *client; + const n = match (r) { + case let err: io::error => + log::printfln("error: read: {}", io::strerror(err)); + client_close(client); + return; + case io::EOF => + client_close(client); + return; + case let n: size => + yield n; + }; + client.wbuf = client.buf[..n]; + ev::write(client.sock, &client_write, client.wbuf); +}; + +fn client_write(sock: *ev::file, r: (size | io::error)) void = { + const client = ev::getuser(sock): *client; + const n = match (r) { + case let err: io::error => + log::printfln("error: write: {}", io::strerror(err)); + client_close(client); + return; + case let n: size => + yield n; + }; + static delete(client.wbuf[..n]); + if (len(client.wbuf) != 0) { + ev::write(client.sock, &client_write, client.wbuf); + } else { + ev::read(client.sock, &client_read, client.buf); + }; +}; + +fn client_close(client: *client) void = { + const server = client.server; + for (let i = 0z; i < len(server.clients); i += 1) { + if (server.clients[i] == client) { + delete(server.clients[i]); + break; + }; + }; + ev::close(client.sock); + free(client); +}; diff --git a/ev/+linux/file.ha b/ev/+linux/file.ha index a25a11e886324b3f33b4531cecc5b280c1f6aa7c..11e0976a6f826e36de5f8283ff9a71da782918d8 100644 --- a/ev/+linux/file.ha +++ b/ev/+linux/file.ha @@ -1,11 +1,13 @@ use errors; use io; +use net; use rt; export type op = enum { NONE, READV, WRITEV, + ACCEPT, }; export type fflags = enum uint { @@ -27,6 +29,7 @@ struct { vbuf: rt::iovec, vec: []rt::iovec, }, + sockflags: net::sockflags, }, }; @@ -74,6 +77,21 @@ // custom [[file]] which was never registered, so assert on // error. rt::epoll_ctl(loop.fd, rt::EPOLL_CTL_DEL, file.fd, null)!; }; + free(file); +}; + +// Unregisters a file object with an event loop, frees resources associated with +// it, and closes the underlying file descriptor. +export fn close(file: *file) void = { + const loop = file.ev; + if (file.flags & fflags::BLOCKING == 0) { + // The only way that this could fail is in the event of a + // use-after-free or if the user fucks around and constructs a + // custom [[file]] which was never registered, so assert on + // error. + rt::epoll_ctl(loop.fd, rt::EPOLL_CTL_DEL, file.fd, null)!; + }; + io::close(file.fd)!; free(file); }; diff --git a/ev/+linux/loop.ha b/ev/+linux/loop.ha index 0e5dfc953a8299590281bfc45f817b5657c67f76..cd5d9476bba1f1509c2441ddfd368e9396d21a70 100644 --- a/ev/+linux/loop.ha +++ b/ev/+linux/loop.ha @@ -92,6 +92,8 @@ case op::READV => readv_ready(file, ev); case op::WRITEV => writev_ready(file, ev); + case op::ACCEPT => + accept_ready(file, ev); }; }; diff --git a/ev/+linux/socket.ha b/ev/+linux/socket.ha new file mode 100644 index 0000000000000000000000000000000000000000..cf11b88fcf1d4bcbb3cca27fd836f142e2610035 --- /dev/null +++ b/ev/+linux/socket.ha @@ -0,0 +1,59 @@ +use errors; +use net; +use net::ip; +use net::tcp; +use rt; + +// Creates a socket which listens for incoming TCP connections on the given +// IP address and port. +export fn listen_tcp( + loop: *loop, + addr: ip::addr, + port: u16, + opts: tcp::listen_option... +) (*file | net::error | errors::error) = { + const sock = tcp::listen(addr, port, opts...)?; + return register(loop, sock)?; +}; + +// A callback for an [[accept]] operation. +export type acceptcb = fn(file: *file, result: (*file | net::error)) void; + +// Schedules an accept operation on a socket. +export fn accept( + sock: *file, + cb: *acceptcb, + flags: net::sockflags... +) req = { + assert(sock.op == op::NONE); + let fl: net::sockflags = 0; + for (let i = 0z; i < len(flags); i += 1) { + fl |= flags[i]; + }; + sock.op = op::ACCEPT; + sock.cb = cb; + sock.sockflags = fl; + filemod(sock, rt::EPOLLIN); + return req { ... }; +}; + +fn accept_ready( + sock: *file, + ev: *rt::epoll_event, +) void = { + assert(sock.op == op::ACCEPT); + assert(ev.events & rt::EPOLLIN != 0); + assert(sock.cb != null); + const cb = sock.cb: *acceptcb; + sock.op = op::NONE; + + const r = tcp::accept(sock.fd, sock.sockflags); + match (r) { + case let fd: net::socket => + // TODO: Bubble up errors from here? + const file = register(sock.ev, fd)!; + cb(sock, file); + case let err: net::error => + cb(sock, err); + }; +}; -- 2.48.1