From b6dd3ffce416fa86c223fad5d2f6c3db5d5727e4 Mon Sep 17 00:00:00 2001 From: Runxi Yu Date: Sun, 17 Aug 2025 04:48:47 +0800 Subject: [PATCH] A few other context fixes --- forged/internal/incoming/hooks/hooks.go | 21 +++++++++++++-------- forged/internal/incoming/lmtp/config.go | 21 +++++++++++++-------- forged/internal/incoming/ssh/ssh.go | 13 ++++++++----- forged/internal/incoming/web/web.go | 16 +++++++++++----- forged/internal/server/server.go | 54 ++++++++++------------------------------------------- diff --git a/forged/internal/incoming/hooks/hooks.go b/forged/internal/incoming/hooks/hooks.go index 3be081121758fb799d80d1b96104c649895fb3fa..52ccb0f0b1a0a7d2bcdcf54f06182cbd925710c9 100644 --- a/forged/internal/incoming/hooks/hooks.go +++ b/forged/internal/incoming/hooks/hooks.go @@ -5,6 +5,7 @@ "context" "errors" "fmt" "net" + "time" "github.com/gliderlabs/ssh" "go.lindenii.runxiyu.org/forge/forged/internal/common/cmap" @@ -51,25 +52,29 @@ defer func() { _ = listener.Close() }() - go func() { - <-ctx.Done() + stop := context.AfterFunc(ctx, func() { _ = listener.Close() - // TODO: Log the error - }() + }) + defer stop() for { conn, err := listener.Accept() if err != nil { - if errors.Is(err, net.ErrClosed) { + if errors.Is(err, net.ErrClosed) || ctx.Err() != nil { return nil } return fmt.Errorf("accept conn: %w", err) } - go server.handleConn(conn) + go server.handleConn(ctx, conn) } } -func (server *Server) handleConn(conn net.Conn) { - panic("TODO: handle hook connection") +func (server *Server) handleConn(ctx context.Context, conn net.Conn) { + defer conn.Close() + unblock := context.AfterFunc(ctx, func() { + _ = conn.SetDeadline(time.Now()) + _ = conn.Close() + }) + defer unblock() } diff --git a/forged/internal/incoming/lmtp/config.go b/forged/internal/incoming/lmtp/config.go index ce32f3dbfbccd567fc2f60162675aec80d063582..def3ce9fe2487a48ce0fe90b91edbe19f9ba9a3b 100644 --- a/forged/internal/incoming/lmtp/config.go +++ b/forged/internal/incoming/lmtp/config.go @@ -5,6 +5,7 @@ "context" "errors" "fmt" "net" + "time" "go.lindenii.runxiyu.org/forge/forged/internal/common/misc" ) @@ -44,25 +45,29 @@ defer func() { _ = listener.Close() }() - go func() { - <-ctx.Done() + stop := context.AfterFunc(ctx, func() { _ = listener.Close() - // TODO: Log the error - }() + }) + defer stop() for { conn, err := listener.Accept() if err != nil { - if errors.Is(err, net.ErrClosed) { + if errors.Is(err, net.ErrClosed) || ctx.Err() != nil { return nil } return fmt.Errorf("accept conn: %w", err) } - go server.handleConn(conn) + go server.handleConn(ctx, conn) } } -func (server *Server) handleConn(conn net.Conn) { - panic("TODO: handle LMTP connection") +func (server *Server) handleConn(ctx context.Context, conn net.Conn) { + defer conn.Close() + unblock := context.AfterFunc(ctx, func() { + _ = conn.SetDeadline(time.Now()) + _ = conn.Close() + }) + defer unblock() } diff --git a/forged/internal/incoming/ssh/ssh.go b/forged/internal/incoming/ssh/ssh.go index 9f9bdff0f30a4421b6247e70a5409234333b354a..0c722c097fd875a8d5e261121d16081f9a514000 100644 --- a/forged/internal/incoming/ssh/ssh.go +++ b/forged/internal/incoming/ssh/ssh.go @@ -61,20 +61,23 @@ } func (server *Server) Run(ctx context.Context) (err error) { listener, err := misc.Listen(server.net, server.addr) + if err != nil { + return fmt.Errorf("listen for SSH: %w", err) + } defer func() { _ = listener.Close() }() - go func() { - <-ctx.Done() - shCtx, cancel := context.WithTimeout(context.Background(), time.Duration(server.shutdownTimeout)*time.Second) + stop := context.AfterFunc(ctx, func() { + shCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), time.Duration(server.shutdownTimeout)*time.Second) defer cancel() _ = server.gliderServer.Shutdown(shCtx) _ = listener.Close() - }() + }) + defer stop() if err = server.gliderServer.Serve(listener); err != nil { - if errors.Is(err, gliderssh.ErrServerClosed) { + if errors.Is(err, gliderssh.ErrServerClosed) || ctx.Err() != nil { return nil } return fmt.Errorf("serve SSH: %w", err) diff --git a/forged/internal/incoming/web/web.go b/forged/internal/incoming/web/web.go index 391f6ff65a07cf5b54c3b28b9d602dd555c15626..dc2d9b49e6383c59902fd30c16cf0d838b287512 100644 --- a/forged/internal/incoming/web/web.go +++ b/forged/internal/incoming/web/web.go @@ -3,6 +3,7 @@ import ( "context" "fmt" + "net" "net/http" "time" @@ -53,21 +54,26 @@ } } func (server *Server) Run(ctx context.Context) (err error) { + server.httpServer.BaseContext = func(_ net.Listener) context.Context { return ctx } + listener, err := misc.Listen(server.net, server.addr) + if err != nil { + return fmt.Errorf("listen for web: %w", err) + } defer func() { _ = listener.Close() }() - go func() { - <-ctx.Done() - shCtx, cancel := context.WithTimeout(context.Background(), time.Duration(server.shutdownTimeout)*time.Second) + stop := context.AfterFunc(ctx, func() { + shCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), time.Duration(server.shutdownTimeout)*time.Second) defer cancel() _ = server.httpServer.Shutdown(shCtx) _ = listener.Close() - }() + }) + defer stop() if err = server.httpServer.Serve(listener); err != nil { - if err == http.ErrServerClosed { + if err == http.ErrServerClosed || ctx.Err() != nil { return nil } return fmt.Errorf("serve web: %w", err) diff --git a/forged/internal/server/server.go b/forged/internal/server/server.go index 472df7a30db421edda1888693292749214fa1e8e..ab677e06bde1c18a40b3345d8c7f96e22e0f2d3b 100644 --- a/forged/internal/server/server.go +++ b/forged/internal/server/server.go @@ -10,6 +10,7 @@ "go.lindenii.runxiyu.org/forge/forged/internal/incoming/hooks" "go.lindenii.runxiyu.org/forge/forged/internal/incoming/lmtp" "go.lindenii.runxiyu.org/forge/forged/internal/incoming/ssh" "go.lindenii.runxiyu.org/forge/forged/internal/incoming/web" + "golang.org/x/sync/errgroup" ) type Server struct { @@ -51,57 +52,22 @@ func (server *Server) Run(ctx context.Context) (err error) { // TODO: Not running git2d because it should be run separately. // This needs to be documented somewhere, hence a TODO here for now. - subCtx, cancel := context.WithCancel(ctx) - defer cancel() + g, gctx := errgroup.WithContext(ctx) - server.database, err = database.Open(subCtx, server.config.DB) + server.database, err = database.Open(gctx, server.config.DB) if err != nil { return fmt.Errorf("open database: %w", err) } - - errCh := make(chan error) - - go func() { - if err := server.hookServer.Run(subCtx); err != nil { - select { - case errCh <- err: - default: - } - } - }() - - go func() { - if err := server.lmtpServer.Run(subCtx); err != nil { - select { - case errCh <- err: - default: - } - } - }() - - go func() { - if err := server.webServer.Run(subCtx); err != nil { - select { - case errCh <- err: - default: - } - } - }() + defer server.database.Close() - go func() { - if err := server.sshServer.Run(subCtx); err != nil { - select { - case errCh <- err: - default: - } - } - }() + g.Go(func() error { return server.hookServer.Run(gctx) }) + g.Go(func() error { return server.lmtpServer.Run(gctx) }) + g.Go(func() error { return server.webServer.Run(gctx) }) + g.Go(func() error { return server.sshServer.Run(gctx) }) - select { - case err := <-errCh: + if err := g.Wait(); err != nil { return fmt.Errorf("server error: %w", err) - case <-ctx.Done(): } - return nil + return ctx.Err() } -- 2.48.1