From a627767772fdd9c1edfcc9ca7805651bac2e7bad Mon Sep 17 00:00:00 2001 From: Drew DeVault Date: Fri, 16 Dec 2022 12:01:59 +0100 Subject: [PATCH] Implement concurrent read and write operations --- ev/+linux/file.ha | 39 ++++++++++++++++++++++++++++----------- ev/+linux/io.ha | 45 ++++++++++++++++++++++----------------------- ev/+linux/loop.ha | 17 +++++++++++------ ev/+linux/socket.ha | 8 ++++---- diff --git a/ev/+linux/file.ha b/ev/+linux/file.ha index 45a73d32769f98b8c1084ce059093b9a5bbf7179..65d683ff8e174d835ba1a9f2e61719eb7e1c2b41 100644 --- a/ev/+linux/file.ha +++ b/ev/+linux/file.ha @@ -3,12 +3,12 @@ use io; use net; use rt; -export type op = enum { - NONE, - READV, - WRITEV, - ACCEPT, - CONNECT, +export type op = enum u64 { + NONE = 0, + READV = 1 << 0, + WRITEV = 1 << 1, + ACCEPT = 1 << 2, + CONNECT = 1 << 3, }; export type fflags = enum uint { @@ -22,13 +22,16 @@ flags: fflags, op: op, cb: nullable *void, + cb2: nullable *void, user: nullable *void, // Operation-specific data union { struct { - vbuf: rt::iovec, - vec: []rt::iovec, + rvbuf: rt::iovec, + rvec: []rt::iovec, + wvbuf: rt::iovec, + wvec: []rt::iovec, }, sockflags: net::sockflags, }, @@ -112,10 +115,24 @@ export fn getloop(file: *file) *loop = { return file.ev; }; -// Modifies the epoll events for a given file. For internal use. -fn filemod(file: *file, events: u32) void = { +// Updates epoll events for a given file. For internal use. +fn file_epoll_ctl(file: *file) void = { + let events = rt::EPOLLONESHOT; + if (file.op & op::READV != 0) { + events |= rt::EPOLLIN | rt::EPOLLHUP; + }; + if (file.op & op::WRITEV != 0) { + events |= rt::EPOLLOUT | rt::EPOLLHUP; + }; + if (file.op & op::ACCEPT != 0) { + events |= rt::EPOLLIN; + }; + if (file.op & op::CONNECT != 0) { + events |= rt::EPOLLOUT; + }; + let ev = rt::epoll_event { - events = events | rt::EPOLLONESHOT, + events = events, ... }; ev.data.ptr = file; diff --git a/ev/+linux/io.ha b/ev/+linux/io.ha index 1be01d3c43c3212b2d14aed9207429ad565bc363..63ad4e27a5498e9851546ad900abaa4f715804a7 100644 --- a/ev/+linux/io.ha +++ b/ev/+linux/io.ha @@ -10,9 +10,9 @@ file: *file, cb: *readcb, buf: []u8, ) req = { - file.vbuf = io::mkvector(buf); + file.rvbuf = io::mkvector(buf); // XXX: Bit of a hack to avoid allocating a slice - const vec = (&file.vbuf: *[*]io::vector)[..1]; + const vec = (&file.rvbuf: *[*]io::vector)[..1]; return readv(file, cb, vec...); }; @@ -22,32 +22,31 @@ file: *file, cb: *readcb, vec: io::vector... ) req = { - assert(file.op == op::NONE); + assert(file.op & op::READV == 0); if (file.flags & fflags::BLOCKING != 0) { const r = io::readv(file.fd, vec...); cb(file, r); return req { ... }; }; - file.op = op::READV; + file.op |= op::READV; file.cb = cb; - file.vec = vec; - filemod(file, rt::EPOLLIN | rt::EPOLLHUP); + file.rvec = vec; + file_epoll_ctl(file); return req { ... }; }; fn readv_ready(file: *file, ev: *rt::epoll_event) void = { - assert(file.op == op::READV); - assert(ev.events & (rt::EPOLLIN | rt::EPOLLHUP) != 0); + assert(file.op & op::READV != 0); assert(file.cb != null); const cb = file.cb: *readcb; - file.op = op::NONE; - filemod(file, 0); + file.op &= ~op::READV; + file_epoll_ctl(file); if (ev.events & rt::EPOLLHUP != 0) { cb(file, io::EOF); } else { - const r = io::readv(file.fd, file.vec...); + const r = io::readv(file.fd, file.rvec...); cb(file, r); }; }; @@ -61,9 +60,9 @@ file: *file, cb: *writecb, buf: []u8, ) req = { - file.vbuf = io::mkvector(buf); + file.wvbuf = io::mkvector(buf); // XXX: Bit of a hack to avoid allocating a slice - const vec = (&file.vbuf: *[*]io::vector)[..1]; + const vec = (&file.wvbuf: *[*]io::vector)[..1]; return writev(file, cb, vec...); }; @@ -75,26 +74,26 @@ vec: io::vector... ) req = { // XXX: Should we support both pending reads and writes at the same // time? (yes) - assert(file.op == op::NONE); + assert(file.op & op::WRITEV == 0); if (file.flags & fflags::BLOCKING != 0) { const r = io::writev(file.fd, vec...); cb(file, r); return req { ... }; }; - file.op = op::WRITEV; - file.cb = cb; - file.vec = vec; - filemod(file, rt::EPOLLOUT | rt::EPOLLHUP); + file.op |= op::WRITEV; + file.cb2 = cb; + file.wvec = vec; + file_epoll_ctl(file); return req { ... }; }; fn writev_ready(file: *file, ev: *rt::epoll_event) void = { - assert(file.op == op::WRITEV && ev.events & rt::EPOLLOUT != 0); + assert(file.op & op::WRITEV != 0); assert(file.cb != null); - const r = io::writev(file.fd, file.vec...); - const cb = file.cb: *writecb; - file.op = op::NONE; - filemod(file, 0); + const r = io::writev(file.fd, file.wvec...); + const cb = file.cb2: *writecb; + file.op &= ~op::WRITEV; + file_epoll_ctl(file); cb(file, r); }; diff --git a/ev/+linux/loop.ha b/ev/+linux/loop.ha index 8151511187805511b0fec3c8f61fd9f472d4ecf6..9144a59c908bd109d36db358416c6b6b4a9da0eb 100644 --- a/ev/+linux/loop.ha +++ b/ev/+linux/loop.ha @@ -85,16 +85,21 @@ const file = ev.data.ptr: *file; if (ev.events == 0) { continue; }; - switch (file.op) { - case op::NONE => + if (file.op == op::NONE) { abort("Invalid pending operation"); - case op::READV => + }; + if (ev.events & (rt::EPOLLIN | rt::EPOLLHUP) != 0 + && file.op & op::READV != 0) { readv_ready(file, ev); - case op::WRITEV => + }; + if (ev.events & (rt::EPOLLOUT | rt::EPOLLHUP) != 0 + && file.op & op::WRITEV != 0) { writev_ready(file, ev); - case op::ACCEPT => + }; + if (file.op & op::ACCEPT != 0) { accept_ready(file, ev); - case op::CONNECT => + }; + if (file.op & op::CONNECT != 0) { connect_ready(file, ev); }; }; diff --git a/ev/+linux/socket.ha b/ev/+linux/socket.ha index c1d2da961c53298f3f74c1fa3460e0017ff5cd24..62163718ad7db2dce4b0dacdffd0c5b863233707 100644 --- a/ev/+linux/socket.ha +++ b/ev/+linux/socket.ha @@ -59,7 +59,7 @@ let file = register(loop, sock)?; file.user = user; file.cb = cb; file.op = op::CONNECT; - filemod(file, rt::EPOLLOUT); + file_epoll_ctl(file); }; fn connect_ready( @@ -71,7 +71,7 @@ assert(ev.events & rt::EPOLLOUT != 0); assert(sock.cb != null); const cb = sock.cb: *connectcb; sock.op = op::NONE; - filemod(sock, 0); + file_epoll_ctl(sock); let errno = 0i, optsz = size(int): u32; rt::getsockopt(sock.fd, rt::SOL_SOCKET, rt::SO_ERROR, &errno, &optsz)!; @@ -104,7 +104,7 @@ }; sock.op = op::ACCEPT; sock.cb = cb; sock.sockflags = fl; - filemod(sock, rt::EPOLLIN); + file_epoll_ctl(sock); return req { ... }; }; @@ -117,7 +117,7 @@ assert(ev.events & rt::EPOLLIN != 0); assert(sock.cb != null); const cb = sock.cb: *acceptcb; sock.op = op::NONE; - filemod(sock, 0); + file_epoll_ctl(sock); const r = tcp::accept(sock.fd, sock.sockflags); match (r) { -- 2.48.1