From 11e7fa2f00dc6faebf522f9dd5df87923533346f Mon Sep 17 00:00:00 2001 From: Drew DeVault Date: Wed, 27 Sep 2023 13:37:26 +0200 Subject: [PATCH] Implement I/O request cancellation --- ev/+linux/io.ha | 18 ++++++++++++++++-- ev/+linux/poll.ha | 18 ++++++++++++++++-- ev/+linux/req.ha | 26 ++++++++++++++++++++++++++ ev/+linux/socket.ha | 57 ++++++++++++++++++++++++++++++++++++++++++++++------- ev/+linux/types.ha | 3 --- diff --git a/ev/+linux/io.ha b/ev/+linux/io.ha index a8480b99b6fd7d731d832d07f11c499da1fd8af6..1758d60e2e76ad3c72293a364d13197b0ffd4f9f 100644 --- a/ev/+linux/io.ha +++ b/ev/+linux/io.ha @@ -35,7 +35,7 @@ file.op |= op::READV; file.cb = cb; file.rvec = vec; file_epoll_ctl(file); - return req { ... }; + return mkreq(&readv_cancel, file); }; fn readv_ready(file: *file, ev: *rt::epoll_event) void = { @@ -52,6 +52,13 @@ const vec = file.rvec: []io::vector; const r = io::readv(file.fd, vec...); cb(file, r); }; +}; + +fn readv_cancel(req: *req) void = { + const file = req.user: *file; + assert(file.op & op::READV != 0); + file.op &= ~op::READV; + file_epoll_ctl(file); }; // A callback for a [[write]] or [[writev]] operation. @@ -88,7 +95,7 @@ file.op |= op::WRITEV; file.cb2 = cb; file.wvec = vec; file_epoll_ctl(file); - return req { ... }; + return mkreq(&writev_cancel, file); }; fn writev_ready(file: *file, ev: *rt::epoll_event) void = { @@ -101,3 +108,10 @@ file.op &= ~op::WRITEV; file_epoll_ctl(file); cb(file, r); }; + +fn writev_cancel(req: *req) void = { + const file = req.user: *file; + assert(file.op & op::WRITEV != 0); + file.op &= ~op::WRITEV; + file_epoll_ctl(file); +}; diff --git a/ev/+linux/poll.ha b/ev/+linux/poll.ha index 8f4de68a76836b5581d7132aa332c21dd121dd4e..33b77eafd1991103db9c09d5dd99585bbb7750b9 100644 --- a/ev/+linux/poll.ha +++ b/ev/+linux/poll.ha @@ -18,7 +18,7 @@ file.op |= op::READABLE; file.cb = cb; file_epoll_ctl(file); - return req { ... }; + return mkreq(&readable_cancel, file); }; fn readable_ready(file: *file, ev: *rt::epoll_event) void = { @@ -28,6 +28,13 @@ const cb = file.cb: *readablecb; file.op &= ~op::READABLE; file_epoll_ctl(file); cb(file); +}; + +fn readable_cancel(req: *req) void = { + const file = req.user: *file; + assert(file.op & op::READABLE != 0); + file.op &= ~op::READABLE; + file_epoll_ctl(file); }; // A callback for a [[writable]] operation. @@ -48,7 +55,7 @@ file.op |= op::WRITABLE; file.cb = cb; file_epoll_ctl(file); - return req { ... }; + return mkreq(&writable_cancel, file); }; fn writable_ready(file: *file, ev: *rt::epoll_event) void = { @@ -59,3 +66,10 @@ file.op &= ~op::WRITABLE; file_epoll_ctl(file); cb(file); }; + +fn writable_cancel(req: *req) void = { + const file = req.user: *file; + assert(file.op & op::WRITABLE != 0); + file.op &= ~op::WRITABLE; + file_epoll_ctl(file); +}; diff --git a/ev/+linux/req.ha b/ev/+linux/req.ha new file mode 100644 index 0000000000000000000000000000000000000000..6d1fc067f254f08a9de3361ccee1a3d83eece81a --- /dev/null +++ b/ev/+linux/req.ha @@ -0,0 +1,26 @@ +export type req = struct { + cancel: nullable *cancelfn, + user: nullable *opaque, +}; + +// Makes a new request object. +export fn mkreq(cancel: *cancelfn, user: nullable *opaque) req = { + return req { + cancel = cancel, + user = user, + }; +}; + +// A function which cancels an in-flight request. +export type cancelfn = fn(req: *req) void; + +// Cancels an in-flight request. The user can safely cancel a request more than +// once. A request cannot be cancelled once it has completed. +export fn cancel(req: *req) void = { + match (req.cancel) { + case let cancel: *cancelfn => + cancel(req); + case null => yield; + }; + req.cancel = null; +}; diff --git a/ev/+linux/socket.ha b/ev/+linux/socket.ha index a3b9a165dd1756df7ec2eb7f9c8967036989bc74..ef20d54138eef961e085a656b135d70013c4d72c 100644 --- a/ev/+linux/socket.ha +++ b/ev/+linux/socket.ha @@ -70,7 +70,7 @@ cb: *connectcb, addr: ip::addr, port: u16, opts: (net::sockflag | *opaque)... -) (void | net::error | errors::error) = { +) (req | net::error | errors::error) = { // XXX: This doesn't let us set keepalive let opt: net::sockflag = 0; let user: nullable *opaque = null; @@ -89,6 +89,7 @@ file.user = user; file.cb = cb; file.op = op::CONNECT_TCP; file_epoll_ctl(file); + return mkreq(&connect_tcp_cancel, file); }; fn connect_tcp_ready( @@ -99,7 +100,7 @@ assert(sock.op == op::CONNECT_TCP); assert(ev.events & rt::EPOLLOUT != 0); assert(sock.cb != null); const cb = sock.cb: *connectcb; - sock.op = op::NONE; + sock.op &= ~op::CONNECT_TCP; file_epoll_ctl(sock); let errno = 0i, optsz = size(int): u32; @@ -116,6 +117,13 @@ cb(sock, sock.user); }; }; +fn connect_tcp_cancel(req: *req) void = { + const sock = req.user: *file; + assert(sock.op == op::CONNECT_TCP); + sock.op = op::NONE; + file_epoll_ctl(sock); +}; + // A callback for an [[accept]] operation. export type acceptcb = fn(file: *file, result: (*file | net::error)) void; @@ -134,7 +142,7 @@ sock.op = op::ACCEPT; sock.cb = cb; sock.sockflag = fl; file_epoll_ctl(sock); - return req { ... }; + return mkreq(&accept_cancel, sock); }; fn accept_ready( @@ -159,6 +167,13 @@ cb(sock, err); }; }; +fn accept_cancel(req: *req) void = { + const sock = req.user: *file; + assert(sock.op == op::ACCEPT); + sock.op = op::NONE; + file_epoll_ctl(sock); +}; + // TODO: Support recv & send in parallel // Callback for a [[recvfrom]] operation. The second parameter is either an @@ -180,7 +195,7 @@ sock.op = op::RECVFROM; sock.cb = cb; sock.sendrecv.rbuf = buf; file_epoll_ctl(sock); - return req { ... }; + return mkreq(&recvfrom_cancel, sock); }; fn recvfrom_ready( @@ -202,6 +217,13 @@ cb(sock, (n, src, port)); }; }; +fn recvfrom_cancel(req: *req) void = { + const sock = req.user: *file; + assert(sock.op == op::RECVFROM); + sock.op = op::NONE; + file_epoll_ctl(sock); +}; + // Callback for a [[recv]] operation. export type recvcb = fn(file: *file, r: (size | net::error)) void; @@ -216,7 +238,7 @@ sock.op = op::RECV; sock.cb = cb; sock.sendrecv.rbuf = buf; file_epoll_ctl(sock); - return req { ... }; + return mkreq(&recv_cancel, sock); }; fn recv_ready( @@ -233,6 +255,13 @@ const r = udp::recv(sock.fd, sock.sendrecv.rbuf); cb(sock, r); }; +fn recv_cancel(req: *req) void = { + const sock = req.user: *file; + assert(sock.op == op::RECV); + sock.op = op::NONE; + file_epoll_ctl(sock); +}; + // Callback for a [[send]] or [[sendto]] operation. export type sendtocb = fn(file: *file, r: (size | net::error)) void; @@ -247,7 +276,7 @@ sock.op = op::SEND; sock.cb = cb; sock.sendrecv.sbuf = buf; file_epoll_ctl(sock); - return req { ... }; + return mkreq(&send_cancel, sock); }; fn send_ready( @@ -264,6 +293,13 @@ const r = udp::send(sock.fd, sock.sendrecv.sbuf); cb(sock, r); }; +fn send_cancel(req: *req) void = { + const sock = req.user: *file; + assert(sock.op == op::SEND); + sock.op = op::NONE; + file_epoll_ctl(sock); +}; + // Schedules a send operation on a socket. export fn sendto( sock: *file, @@ -279,7 +315,7 @@ sock.sendrecv.sbuf = buf; sock.sendrecv.dest = dest; sock.sendrecv.port = port; file_epoll_ctl(sock); - return req { ... }; + return mkreq(&sendto_cancel, sock); }; fn sendto_ready( @@ -300,3 +336,10 @@ sock.sendrecv.port, ); cb(sock, r); }; + +fn sendto_cancel(req: *req) void = { + const sock = req.user: *file; + assert(sock.op == op::SENDTO); + sock.op = op::NONE; + file_epoll_ctl(sock); +}; diff --git a/ev/+linux/types.ha b/ev/+linux/types.ha deleted file mode 100644 index 067ce7bf41b5de538d0cf6c18c48ac6ab12641ab..0000000000000000000000000000000000000000 --- a/ev/+linux/types.ha +++ /dev/null @@ -1,3 +0,0 @@ -export type req = struct { - placeholder: uint, -}; -- 2.48.1