From e025843d3c14a1a6801d3a277882cb43b6548e2b Mon Sep 17 00:00:00 2001 From: fatedier Date: Wed, 29 Oct 2025 01:08:48 +0800 Subject: [PATCH] vnet: add exponential backoff for failed reconnections (#5035) --- Release.md | 4 +++ client/visitor/stcp.go | 18 ++++++++++++- client/visitor/visitor.go | 2 +- client/visitor/xtcp.go | 11 ++++++++ pkg/plugin/visitor/plugin.go | 15 ++++++++--- pkg/plugin/visitor/virtual_net.go | 44 +++++++++++++++++++++++++------ pkg/util/net/conn.go | 21 ++++++++++++--- pkg/util/net/websocket.go | 2 +- 8 files changed, 99 insertions(+), 18 deletions(-) diff --git a/Release.md b/Release.md index e237538b..18aa0182 100644 --- a/Release.md +++ b/Release.md @@ -1,3 +1,7 @@ ## Features * HTTPS proxies now support load balancing groups. Multiple HTTPS proxies can be configured with the same `loadBalancer.group` and `loadBalancer.groupKey` to share the same custom domain and distribute traffic across multiple backend services, similar to the existing TCP and HTTP load balancing capabilities. + +## Improvements + +* **VirtualNet**: Implemented intelligent reconnection with exponential backoff. When connection errors occur repeatedly, the reconnect interval increases from 60s to 300s (max), reducing unnecessary reconnection attempts. Normal disconnections still reconnect quickly at 10s intervals. diff --git a/client/visitor/stcp.go b/client/visitor/stcp.go index 124202eb..31f6f174 100644 --- a/client/visitor/stcp.go +++ b/client/visitor/stcp.go @@ -15,6 +15,7 @@ package visitor import ( + "fmt" "io" "net" "strconv" @@ -81,11 +82,22 @@ func (sv *STCPVisitor) internalConnWorker() { func (sv *STCPVisitor) handleConn(userConn net.Conn) { xl := xlog.FromContextSafe(sv.ctx) - defer userConn.Close() + var tunnelErr error + defer func() { + // If there was an error and connection supports CloseWithError, use it + if tunnelErr != nil { + if eConn, ok := userConn.(interface{ CloseWithError(error) error }); ok { + _ = eConn.CloseWithError(tunnelErr) + return + } + } + userConn.Close() + }() xl.Debugf("get a new stcp user connection") visitorConn, err := sv.helper.ConnectServer() if err != nil { + tunnelErr = err return } defer visitorConn.Close() @@ -102,6 +114,7 @@ func (sv *STCPVisitor) handleConn(userConn net.Conn) { err = msg.WriteMsg(visitorConn, newVisitorConnMsg) if err != nil { xl.Warnf("send newVisitorConnMsg to server error: %v", err) + tunnelErr = err return } @@ -110,12 +123,14 @@ func (sv *STCPVisitor) handleConn(userConn net.Conn) { err = msg.ReadMsgInto(visitorConn, &newVisitorConnRespMsg) if err != nil { xl.Warnf("get newVisitorConnRespMsg error: %v", err) + tunnelErr = err return } _ = visitorConn.SetReadDeadline(time.Time{}) if newVisitorConnRespMsg.Error != "" { xl.Warnf("start new visitor connection error: %s", newVisitorConnRespMsg.Error) + tunnelErr = fmt.Errorf("%s", newVisitorConnRespMsg.Error) return } @@ -125,6 +140,7 @@ func (sv *STCPVisitor) handleConn(userConn net.Conn) { remote, err = libio.WithEncryption(remote, []byte(sv.cfg.SecretKey)) if err != nil { xl.Errorf("create encryption stream error: %v", err) + tunnelErr = err return } } diff --git a/client/visitor/visitor.go b/client/visitor/visitor.go index fb2b3e11..87e4f29f 100644 --- a/client/visitor/visitor.go +++ b/client/visitor/visitor.go @@ -71,7 +71,7 @@ func NewVisitor( Name: cfg.GetBaseConfig().Name, Ctx: ctx, VnetController: helper.VNetController(), - HandleConn: func(conn net.Conn) { + SendConnToVisitor: func(conn net.Conn) { _ = baseVisitor.AcceptConn(conn) }, }, diff --git a/client/visitor/xtcp.go b/client/visitor/xtcp.go index 353577db..cdfeb1ab 100644 --- a/client/visitor/xtcp.go +++ b/client/visitor/xtcp.go @@ -162,8 +162,16 @@ func (sv *XTCPVisitor) keepTunnelOpenWorker() { func (sv *XTCPVisitor) handleConn(userConn net.Conn) { xl := xlog.FromContextSafe(sv.ctx) isConnTransferred := false + var tunnelErr error defer func() { if !isConnTransferred { + // If there was an error and connection supports CloseWithError, use it + if tunnelErr != nil { + if eConn, ok := userConn.(interface{ CloseWithError(error) error }); ok { + _ = eConn.CloseWithError(tunnelErr) + return + } + } userConn.Close() } }() @@ -181,6 +189,8 @@ func (sv *XTCPVisitor) handleConn(userConn net.Conn) { tunnelConn, err := sv.openTunnel(ctx) if err != nil { xl.Errorf("open tunnel error: %v", err) + tunnelErr = err + // no fallback, just return if sv.cfg.FallbackTo == "" { return @@ -200,6 +210,7 @@ func (sv *XTCPVisitor) handleConn(userConn net.Conn) { muxConnRWCloser, err = libio.WithEncryption(muxConnRWCloser, []byte(sv.cfg.SecretKey)) if err != nil { xl.Errorf("create encryption stream error: %v", err) + tunnelErr = err return } } diff --git a/pkg/plugin/visitor/plugin.go b/pkg/plugin/visitor/plugin.go index 94adce09..27eecc82 100644 --- a/pkg/plugin/visitor/plugin.go +++ b/pkg/plugin/visitor/plugin.go @@ -23,11 +23,20 @@ import ( "github.com/fatedier/frp/pkg/vnet" ) +// PluginContext provides the necessary context and callbacks for visitor plugins. type PluginContext struct { - Name string - Ctx context.Context + // Name is the unique identifier for this visitor, used for logging and routing. + Name string + + // Ctx manages the plugin's lifecycle and carries the logger for structured logging. + Ctx context.Context + + // VnetController manages TUN device routing. May be nil if virtual networking is disabled. VnetController *vnet.Controller - HandleConn func(net.Conn) + + // SendConnToVisitor sends a connection to the visitor's internal processing queue. + // Does not return error; failures are handled by closing the connection. + SendConnToVisitor func(net.Conn) } // Creators is used for create plugins to handle connections. diff --git a/pkg/plugin/visitor/virtual_net.go b/pkg/plugin/visitor/virtual_net.go index f660c0c8..8193ce03 100644 --- a/pkg/plugin/visitor/virtual_net.go +++ b/pkg/plugin/visitor/virtual_net.go @@ -42,6 +42,8 @@ type VirtualNetPlugin struct { controllerConn net.Conn closeSignal chan struct{} + consecutiveErrors int // Tracks consecutive connection errors for exponential backoff + ctx context.Context cancel context.CancelFunc } @@ -98,7 +100,6 @@ func (p *VirtualNetPlugin) Start() { func (p *VirtualNetPlugin) run() { xl := xlog.FromContextSafe(p.ctx) - reconnectDelay := 10 * time.Second for { currentCloseSignal := make(chan struct{}) @@ -121,7 +122,10 @@ func (p *VirtualNetPlugin) run() { p.controllerConn = controllerConn p.mu.Unlock() - pluginNotifyConn := netutil.WrapCloseNotifyConn(pluginConn, func() { + // Wrap with CloseNotifyConn which supports both close notification and error recording + var closeErr error + pluginNotifyConn := netutil.WrapCloseNotifyConn(pluginConn, func(err error) { + closeErr = err close(currentCloseSignal) // Signal the run loop on close. }) @@ -129,9 +133,9 @@ func (p *VirtualNetPlugin) run() { p.pluginCtx.VnetController.RegisterClientRoute(p.ctx, p.pluginCtx.Name, p.routes, controllerConn) xl.Infof("successfully registered client route for visitor [%s]. Starting connection handler with CloseNotifyConn.", p.pluginCtx.Name) - // Pass the CloseNotifyConn to HandleConn. - // HandleConn is responsible for calling Close() on pluginNotifyConn. - p.pluginCtx.HandleConn(pluginNotifyConn) + // Pass the CloseNotifyConn to the visitor for handling. + // The visitor can call CloseWithError to record the failure reason. + p.pluginCtx.SendConnToVisitor(pluginNotifyConn) // Wait for context cancellation or connection close. select { @@ -140,8 +144,32 @@ func (p *VirtualNetPlugin) run() { p.cleanupControllerConn(xl) return case <-currentCloseSignal: - xl.Infof("detected connection closed via CloseNotifyConn for visitor [%s].", p.pluginCtx.Name) - // HandleConn closed the plugin side. Close the controller side. + // Determine reconnect delay based on error with exponential backoff + var reconnectDelay time.Duration + if closeErr != nil { + p.consecutiveErrors++ + xl.Warnf("connection closed with error for visitor [%s] (consecutive errors: %d): %v", + p.pluginCtx.Name, p.consecutiveErrors, closeErr) + + // Exponential backoff: 60s, 120s, 240s, 300s (capped) + baseDelay := 60 * time.Second + reconnectDelay = baseDelay * time.Duration(1< 300*time.Second { + reconnectDelay = 300 * time.Second + } + } else { + // Reset consecutive errors on successful connection + if p.consecutiveErrors > 0 { + xl.Infof("connection closed normally for visitor [%s], resetting error counter (was %d)", + p.pluginCtx.Name, p.consecutiveErrors) + p.consecutiveErrors = 0 + } else { + xl.Infof("connection closed normally for visitor [%s]", p.pluginCtx.Name) + } + reconnectDelay = 10 * time.Second + } + + // The visitor closed the plugin side. Close the controller side. p.cleanupControllerConn(xl) xl.Infof("waiting %v before attempting reconnection for visitor [%s]...", reconnectDelay, p.pluginCtx.Name) @@ -184,7 +212,7 @@ func (p *VirtualNetPlugin) Close() error { } // Explicitly close the controller side of the pipe. - // This ensures the pipe is broken even if the run loop is stuck or HandleConn hasn't closed its end. + // This ensures the pipe is broken even if the run loop is stuck or the visitor hasn't closed its end. p.cleanupControllerConn(xl) xl.Infof("finished cleaning up connections during close for visitor [%s]", p.pluginCtx.Name) diff --git a/pkg/util/net/conn.go b/pkg/util/net/conn.go index 6946b1c2..914a7bb5 100644 --- a/pkg/util/net/conn.go +++ b/pkg/util/net/conn.go @@ -135,11 +135,11 @@ type CloseNotifyConn struct { // 1 means closed closeFlag int32 - closeFn func() + closeFn func(error) } -// closeFn will be only called once -func WrapCloseNotifyConn(c net.Conn, closeFn func()) net.Conn { +// closeFn will be only called once with the error (nil if Close() was called, non-nil if CloseWithError() was called) +func WrapCloseNotifyConn(c net.Conn, closeFn func(error)) *CloseNotifyConn { return &CloseNotifyConn{ Conn: c, closeFn: closeFn, @@ -151,12 +151,25 @@ func (cc *CloseNotifyConn) Close() (err error) { if pflag == 0 { err = cc.Conn.Close() if cc.closeFn != nil { - cc.closeFn() + cc.closeFn(nil) } } return } +// CloseWithError closes the connection and passes the error to the close callback. +func (cc *CloseNotifyConn) CloseWithError(err error) error { + pflag := atomic.SwapInt32(&cc.closeFlag, 1) + if pflag == 0 { + closeErr := cc.Conn.Close() + if cc.closeFn != nil { + cc.closeFn(err) + } + return closeErr + } + return nil +} + type StatsConn struct { net.Conn diff --git a/pkg/util/net/websocket.go b/pkg/util/net/websocket.go index 263b3a1d..3ca8b332 100644 --- a/pkg/util/net/websocket.go +++ b/pkg/util/net/websocket.go @@ -32,7 +32,7 @@ func NewWebsocketListener(ln net.Listener) (wl *WebsocketListener) { muxer := http.NewServeMux() muxer.Handle(FrpWebsocketPath, websocket.Handler(func(c *websocket.Conn) { notifyCh := make(chan struct{}) - conn := WrapCloseNotifyConn(c, func() { + conn := WrapCloseNotifyConn(c, func(_ error) { close(notifyCh) }) wl.acceptCh <- conn