Compare commits

..

No commits in common. "b41d8f8e4074c4f633fb67d7d31b97db59472674" and "c5a8f6ef4abb43536dafe13027f224b883a3a9d8" have entirely different histories.

22 changed files with 199 additions and 276 deletions

View File

@ -1283,7 +1283,9 @@ frp supports feature gates to enable or disable experimental features. This allo
To enable an experimental feature, add the feature gate to your configuration: To enable an experimental feature, add the feature gate to your configuration:
```toml ```toml
featureGates = { VirtualNet = true } featureGates = {
VirtualNet = true
}
``` ```
### Feature Lifecycle ### Feature Lifecycle

View File

@ -1,3 +1,8 @@
### Bug Fixes ### Notes
* **VirtualNet:** Resolved various issues related to connection handling, TUN device management, and stability in the virtual network feature. * **Feature Gates Introduced:** This version introduces a new experimental mechanism called Feature Gates. This allows users to enable or disable specific experimental features before they become generally available. Feature gates can be configured in the `featureGates` map within the configuration file.
* **VirtualNet Feature Gate:** The first available feature gate is `VirtualNet`, which enables the experimental Virtual Network functionality (currently in Alpha stage).
### Features
* **Virtual Network (VirtualNet):** Introduce experimental virtual network capabilities (Alpha). This allows creating a TUN device managed by frp, enabling Layer 3 connectivity between different clients within the frp network. Requires root/admin privileges and is currently supported on Linux and macOS. Configuration is done via the `virtualNet` section and the `virtual_net` plugin. Enable with feature gate `VirtualNet`. **Note: As an Alpha feature, configuration details may change in future releases.**

View File

@ -165,9 +165,9 @@ func (svr *Service) apiStatus(w http.ResponseWriter, _ *http.Request) {
res StatusResp = make(map[string][]ProxyStatusResp) res StatusResp = make(map[string][]ProxyStatusResp)
) )
log.Infof("http request [/api/status]") log.Infof("Http request [/api/status]")
defer func() { defer func() {
log.Infof("http response [/api/status]") log.Infof("Http response [/api/status]")
buf, _ = json.Marshal(&res) buf, _ = json.Marshal(&res)
_, _ = w.Write(buf) _, _ = w.Write(buf)
}() }()
@ -198,9 +198,9 @@ func (svr *Service) apiStatus(w http.ResponseWriter, _ *http.Request) {
func (svr *Service) apiGetConfig(w http.ResponseWriter, _ *http.Request) { func (svr *Service) apiGetConfig(w http.ResponseWriter, _ *http.Request) {
res := GeneralResponse{Code: 200} res := GeneralResponse{Code: 200}
log.Infof("http get request [/api/config]") log.Infof("Http get request [/api/config]")
defer func() { defer func() {
log.Infof("http get response [/api/config], code [%d]", res.Code) log.Infof("Http get response [/api/config], code [%d]", res.Code)
w.WriteHeader(res.Code) w.WriteHeader(res.Code)
if len(res.Msg) > 0 { if len(res.Msg) > 0 {
_, _ = w.Write([]byte(res.Msg)) _, _ = w.Write([]byte(res.Msg))
@ -228,9 +228,9 @@ func (svr *Service) apiGetConfig(w http.ResponseWriter, _ *http.Request) {
func (svr *Service) apiPutConfig(w http.ResponseWriter, r *http.Request) { func (svr *Service) apiPutConfig(w http.ResponseWriter, r *http.Request) {
res := GeneralResponse{Code: 200} res := GeneralResponse{Code: 200}
log.Infof("http put request [/api/config]") log.Infof("Http put request [/api/config]")
defer func() { defer func() {
log.Infof("http put response [/api/config], code [%d]", res.Code) log.Infof("Http put response [/api/config], code [%d]", res.Code)
w.WriteHeader(res.Code) w.WriteHeader(res.Code)
if len(res.Msg) > 0 { if len(res.Msg) > 0 {
_, _ = w.Write([]byte(res.Msg)) _, _ = w.Write([]byte(res.Msg))

View File

@ -189,7 +189,7 @@ func (ctl *Control) handlePong(m msg.Message) {
inMsg := m.(*msg.Pong) inMsg := m.(*msg.Pong)
if inMsg.Error != "" { if inMsg.Error != "" {
xl.Errorf("pong message contains error: %s", inMsg.Error) xl.Errorf("Pong message contains error: %s", inMsg.Error)
ctl.closeSession() ctl.closeSession()
return return
} }

View File

@ -341,7 +341,7 @@ func (svr *Service) loopLoginUntilSuccess(maxInterval time.Duration, firstLoginE
ctl, err := NewControl(svr.ctx, sessionCtx) ctl, err := NewControl(svr.ctx, sessionCtx)
if err != nil { if err != nil {
conn.Close() conn.Close()
xl.Errorf("new control error: %v", err) xl.Errorf("NewControl error: %v", err)
return false, err return false, err
} }
ctl.SetInWorkConnCallback(svr.handleWorkConnCb) ctl.SetInWorkConnCallback(svr.handleWorkConnCb)

View File

@ -49,7 +49,9 @@ type = "virtual_net"
# frpc.toml (client side) # frpc.toml (client side)
serverAddr = "x.x.x.x" serverAddr = "x.x.x.x"
serverPort = 7000 serverPort = 7000
featureGates = { VirtualNet = true } featureGates = {
VirtualNet = true
}
# Configure the virtual network interface # Configure the virtual network interface
virtualNet.address = "100.86.0.2/24" virtualNet.address = "100.86.0.2/24"

View File

@ -18,10 +18,9 @@ package client
import ( import (
"context" "context"
"io"
"sync"
v1 "github.com/fatedier/frp/pkg/config/v1" v1 "github.com/fatedier/frp/pkg/config/v1"
"github.com/fatedier/frp/pkg/util/xlog"
) )
func init() { func init() {
@ -31,8 +30,6 @@ func init() {
type VirtualNetPlugin struct { type VirtualNetPlugin struct {
pluginCtx PluginContext pluginCtx PluginContext
opts *v1.VirtualNetPluginOptions opts *v1.VirtualNetPluginOptions
mu sync.Mutex
conns map[io.ReadWriteCloser]struct{}
} }
func NewVirtualNetPlugin(pluginCtx PluginContext, options v1.ClientPluginOptions) (Plugin, error) { func NewVirtualNetPlugin(pluginCtx PluginContext, options v1.ClientPluginOptions) (Plugin, error) {
@ -46,32 +43,19 @@ func NewVirtualNetPlugin(pluginCtx PluginContext, options v1.ClientPluginOptions
} }
func (p *VirtualNetPlugin) Handle(ctx context.Context, connInfo *ConnectionInfo) { func (p *VirtualNetPlugin) Handle(ctx context.Context, connInfo *ConnectionInfo) {
xl := xlog.FromContextSafe(ctx)
// Verify if virtual network controller is available // Verify if virtual network controller is available
if p.pluginCtx.VnetController == nil { if p.pluginCtx.VnetController == nil {
return return
} }
// Add the connection before starting the read loop to avoid race condition // Register the connection with the controller
// where RemoveConn might be called before the connection is added. routeName := p.pluginCtx.Name
p.mu.Lock() err := p.pluginCtx.VnetController.RegisterServerConn(ctx, routeName, connInfo.Conn)
if p.conns == nil { if err != nil {
p.conns = make(map[io.ReadWriteCloser]struct{}) xl.Errorf("virtual net failed to register server connection: %v", err)
} return
p.conns[connInfo.Conn] = struct{}{}
p.mu.Unlock()
// Register the connection with the controller and pass the cleanup function
p.pluginCtx.VnetController.StartServerConnReadLoop(ctx, connInfo.Conn, func() {
p.RemoveConn(connInfo.Conn)
})
}
func (p *VirtualNetPlugin) RemoveConn(conn io.ReadWriteCloser) {
p.mu.Lock()
defer p.mu.Unlock()
// Check if the map exists, as Close might have set it to nil concurrently
if p.conns != nil {
delete(p.conns, conn)
} }
} }
@ -80,13 +64,8 @@ func (p *VirtualNetPlugin) Name() string {
} }
func (p *VirtualNetPlugin) Close() error { func (p *VirtualNetPlugin) Close() error {
p.mu.Lock() if p.pluginCtx.VnetController != nil {
defer p.mu.Unlock() p.pluginCtx.VnetController.UnregisterServerConn(p.pluginCtx.Name)
// Close any remaining connections
for conn := range p.conns {
_ = conn.Close()
} }
p.conns = nil
return nil return nil
} }

View File

@ -60,7 +60,7 @@ func NewVirtualNetPlugin(pluginCtx PluginContext, options v1.VisitorPluginOption
return nil, errors.New("destinationIP is required") return nil, errors.New("destinationIP is required")
} }
// Parse DestinationIP and create a host route. // Parse DestinationIP as a single IP and create a host route
ip := net.ParseIP(opts.DestinationIP) ip := net.ParseIP(opts.DestinationIP)
if ip == nil { if ip == nil {
return nil, fmt.Errorf("invalid destination IP address [%s]", opts.DestinationIP) return nil, fmt.Errorf("invalid destination IP address [%s]", opts.DestinationIP)
@ -91,7 +91,7 @@ func (p *VirtualNetPlugin) Start() {
if len(p.routes) > 0 { if len(p.routes) > 0 {
routeStr = p.routes[0].String() routeStr = p.routes[0].String()
} }
xl.Infof("starting VirtualNetPlugin for visitor [%s], attempting to register routes for %s", p.pluginCtx.Name, routeStr) xl.Infof("Starting VirtualNetPlugin for visitor [%s], attempting to register routes for %s", p.pluginCtx.Name, routeStr)
go p.run() go p.run()
} }
@ -101,8 +101,10 @@ func (p *VirtualNetPlugin) run() {
reconnectDelay := 10 * time.Second reconnectDelay := 10 * time.Second
for { for {
// Create a signal channel for this connection attempt
currentCloseSignal := make(chan struct{}) currentCloseSignal := make(chan struct{})
// Store the signal channel under lock
p.mu.Lock() p.mu.Lock()
p.closeSignal = currentCloseSignal p.closeSignal = currentCloseSignal
p.mu.Unlock() p.mu.Unlock()
@ -110,6 +112,7 @@ func (p *VirtualNetPlugin) run() {
select { select {
case <-p.ctx.Done(): case <-p.ctx.Done():
xl.Infof("VirtualNetPlugin run loop for visitor [%s] stopping (context cancelled before pipe creation).", p.pluginCtx.Name) xl.Infof("VirtualNetPlugin run loop for visitor [%s] stopping (context cancelled before pipe creation).", p.pluginCtx.Name)
// Ensure controllerConn from previous loop is cleaned up if necessary
p.cleanupControllerConn(xl) p.cleanupControllerConn(xl)
return return
default: default:
@ -117,43 +120,65 @@ func (p *VirtualNetPlugin) run() {
controllerConn, pluginConn := net.Pipe() controllerConn, pluginConn := net.Pipe()
// Store controllerConn under lock for cleanup purposes
p.mu.Lock() p.mu.Lock()
p.controllerConn = controllerConn p.controllerConn = controllerConn
p.mu.Unlock() p.mu.Unlock()
// Wrap pluginConn using CloseNotifyConn
pluginNotifyConn := netutil.WrapCloseNotifyConn(pluginConn, func() { pluginNotifyConn := netutil.WrapCloseNotifyConn(pluginConn, func() {
close(currentCloseSignal) // Signal the run loop on close. close(currentCloseSignal) // Signal the run loop
}) })
xl.Infof("attempting to register client route for visitor [%s]", p.pluginCtx.Name) xl.Infof("Attempting to register client route for visitor [%s]", p.pluginCtx.Name)
p.pluginCtx.VnetController.RegisterClientRoute(p.ctx, p.pluginCtx.Name, p.routes, controllerConn) err := p.pluginCtx.VnetController.RegisterClientRoute(p.ctx, p.pluginCtx.Name, p.routes, controllerConn)
xl.Infof("successfully registered client route for visitor [%s]. Starting connection handler with CloseNotifyConn.", p.pluginCtx.Name) if err != nil {
xl.Errorf("Failed to register client route for visitor [%s]: %v. Retrying after %v", p.pluginCtx.Name, err, reconnectDelay)
p.cleanupPipePair(xl, controllerConn, pluginConn) // Close both ends on registration failure
// Wait before retrying registration, unless context is cancelled
select {
case <-time.After(reconnectDelay):
continue // Retry the loop
case <-p.ctx.Done():
xl.Infof("VirtualNetPlugin registration retry wait interrupted for visitor [%s]", p.pluginCtx.Name)
return // Exit loop if context is cancelled during wait
}
}
xl.Infof("Successfully registered client route for visitor [%s]. Starting connection handler with CloseNotifyConn.", p.pluginCtx.Name)
// Pass the CloseNotifyConn to HandleConn. // Pass the CloseNotifyConn to HandleConn.
// HandleConn is responsible for calling Close() on pluginNotifyConn. // HandleConn is responsible for calling Close() on pluginNotifyConn.
p.pluginCtx.HandleConn(pluginNotifyConn) p.pluginCtx.HandleConn(pluginNotifyConn)
// Wait for context cancellation or connection close. // Wait for either the plugin context to be cancelled or the wrapper's Close() to be called via the signal channel.
select { select {
case <-p.ctx.Done(): case <-p.ctx.Done():
xl.Infof("VirtualNetPlugin run loop stopping for visitor [%s] (context cancelled while waiting).", p.pluginCtx.Name) xl.Infof("VirtualNetPlugin run loop stopping for visitor [%s] (context cancelled while waiting).", p.pluginCtx.Name)
// Context cancelled, ensure controller side is closed if HandleConn didn't close its side yet.
p.cleanupControllerConn(xl) p.cleanupControllerConn(xl)
return return
case <-currentCloseSignal: case <-currentCloseSignal:
xl.Infof("detected connection closed via CloseNotifyConn for visitor [%s].", p.pluginCtx.Name) xl.Infof("Detected connection closed via CloseNotifyConn for visitor [%s].", p.pluginCtx.Name)
// HandleConn closed the plugin side. Close the controller side. // HandleConn closed the plugin side (pluginNotifyConn). The closeFn was called, closing currentCloseSignal.
// We still need to close the controller side.
p.cleanupControllerConn(xl) p.cleanupControllerConn(xl)
xl.Infof("waiting %v before attempting reconnection for visitor [%s]...", reconnectDelay, p.pluginCtx.Name) // Add a delay before attempting to reconnect, respecting context cancellation.
xl.Infof("Waiting %v before attempting reconnection for visitor [%s]...", reconnectDelay, p.pluginCtx.Name)
select { select {
case <-time.After(reconnectDelay): case <-time.After(reconnectDelay):
// Delay completed, loop will continue.
case <-p.ctx.Done(): case <-p.ctx.Done():
xl.Infof("VirtualNetPlugin reconnection delay interrupted for visitor [%s]", p.pluginCtx.Name) xl.Infof("VirtualNetPlugin reconnection delay interrupted for visitor [%s]", p.pluginCtx.Name)
return return // Exit loop if context is cancelled during wait
} }
// Loop will continue to reconnect.
} }
xl.Infof("re-establishing virtual connection for visitor [%s]...", p.pluginCtx.Name) // Loop will restart, context check at the beginning of the loop is sufficient.
xl.Infof("Re-establishing virtual connection for visitor [%s]...", p.pluginCtx.Name)
} }
} }
@ -162,31 +187,46 @@ func (p *VirtualNetPlugin) cleanupControllerConn(xl *xlog.Logger) {
p.mu.Lock() p.mu.Lock()
defer p.mu.Unlock() defer p.mu.Unlock()
if p.controllerConn != nil { if p.controllerConn != nil {
xl.Debugf("cleaning up controllerConn for visitor [%s]", p.pluginCtx.Name) xl.Debugf("Cleaning up controllerConn for visitor [%s]", p.pluginCtx.Name)
p.controllerConn.Close() p.controllerConn.Close()
p.controllerConn = nil p.controllerConn = nil
} }
// Also clear the closeSignal reference for the completed/cancelled connection attempt
p.closeSignal = nil p.closeSignal = nil
} }
// cleanupPipePair closes both ends of a pipe, used typically when registration fails.
func (p *VirtualNetPlugin) cleanupPipePair(xl *xlog.Logger, controllerConn, pluginConn net.Conn) {
xl.Debugf("Cleaning up pipe pair for visitor [%s] after registration failure", p.pluginCtx.Name)
controllerConn.Close()
pluginConn.Close()
p.mu.Lock()
p.controllerConn = nil // Ensure field is nil if it was briefly set
p.closeSignal = nil // Ensure field is nil if it was briefly set
p.mu.Unlock()
}
// Close initiates the plugin shutdown. // Close initiates the plugin shutdown.
func (p *VirtualNetPlugin) Close() error { func (p *VirtualNetPlugin) Close() error {
xl := xlog.FromContextSafe(p.pluginCtx.Ctx) xl := xlog.FromContextSafe(p.pluginCtx.Ctx) // Use base context for close logging
xl.Infof("closing VirtualNetPlugin for visitor [%s]", p.pluginCtx.Name) xl.Infof("Closing VirtualNetPlugin for visitor [%s]", p.pluginCtx.Name)
// Signal the run loop goroutine to stop. // 1. Signal the run loop goroutine to stop via context cancellation.
p.cancel() p.cancel()
// Unregister the route from the controller. // 2. Unregister the route from the controller.
// This might implicitly cause the VnetController to close its end of the pipe (controllerConn).
if p.pluginCtx.VnetController != nil { if p.pluginCtx.VnetController != nil {
p.pluginCtx.VnetController.UnregisterClientRoute(p.pluginCtx.Name) p.pluginCtx.VnetController.UnregisterClientRoute(p.pluginCtx.Name)
xl.Infof("unregistered client route for visitor [%s]", p.pluginCtx.Name) xl.Infof("Unregistered client route for visitor [%s]", p.pluginCtx.Name)
} else {
xl.Warnf("VnetController is nil during close for visitor [%s], cannot unregister route", p.pluginCtx.Name)
} }
// Explicitly close the controller side of the pipe. // 3. Explicitly close the controller side of the pipe managed by this plugin.
// This ensures the pipe is broken even if the run loop is stuck or HandleConn hasn't closed its end. // This ensures the pipe is broken even if the run loop is stuck or HandleConn hasn't closed its end.
p.cleanupControllerConn(xl) p.cleanupControllerConn(xl)
xl.Infof("finished cleaning up connections during close for visitor [%s]", p.pluginCtx.Name) xl.Infof("Finished cleaning up connections during close for visitor [%s]", p.pluginCtx.Name)
return nil return nil
} }

View File

@ -14,7 +14,7 @@
package version package version
var version = "0.62.1" var version = "0.62.0"
func Full() string { func Full() string {
return version return version

View File

@ -162,7 +162,7 @@ func (rp *HTTPReverseProxy) UnRegister(routeCfg RouteConfig) {
func (rp *HTTPReverseProxy) GetRouteConfig(domain, location, routeByHTTPUser string) *RouteConfig { func (rp *HTTPReverseProxy) GetRouteConfig(domain, location, routeByHTTPUser string) *RouteConfig {
vr, ok := rp.getVhost(domain, location, routeByHTTPUser) vr, ok := rp.getVhost(domain, location, routeByHTTPUser)
if ok { if ok {
log.Debugf("get new http request host [%s] path [%s] httpuser [%s]", domain, location, routeByHTTPUser) log.Debugf("get new HTTP request host [%s] path [%s] httpuser [%s]", domain, location, routeByHTTPUser)
return vr.payload.(*RouteConfig) return vr.payload.(*RouteConfig)
} }
return nil return nil

View File

@ -275,7 +275,7 @@ func (l *Listener) Accept() (net.Conn, error) {
xl := xlog.FromContextSafe(l.ctx) xl := xlog.FromContextSafe(l.ctx)
conn, ok := <-l.accept conn, ok := <-l.accept
if !ok { if !ok {
return nil, fmt.Errorf("listener closed") return nil, fmt.Errorf("Listener closed")
} }
// if rewriteHost func is exist // if rewriteHost func is exist

View File

@ -87,7 +87,7 @@ func (c *Controller) handlePacket(buf []byte) {
case waterutil.IsIPv4(buf): case waterutil.IsIPv4(buf):
header, err := ipv4.ParseHeader(buf) header, err := ipv4.ParseHeader(buf)
if err != nil { if err != nil {
log.Warnf("parse ipv4 header error: %v", err) log.Warnf("parse ipv4 header error:", err)
return return
} }
src = header.Src src = header.Src
@ -98,7 +98,7 @@ func (c *Controller) handlePacket(buf []byte) {
case waterutil.IsIPv6(buf): case waterutil.IsIPv6(buf):
header, err := ipv6.ParseHeader(buf) header, err := ipv6.ParseHeader(buf)
if err != nil { if err != nil {
log.Warnf("parse ipv6 header error: %v", err) log.Warnf("parse ipv6 header error:", err)
return return
} }
src = header.Src src = header.Src
@ -137,12 +137,6 @@ func (c *Controller) Stop() error {
// Client connection read loop // Client connection read loop
func (c *Controller) readLoopClient(ctx context.Context, conn io.ReadWriteCloser) { func (c *Controller) readLoopClient(ctx context.Context, conn io.ReadWriteCloser) {
xl := xlog.FromContextSafe(ctx) xl := xlog.FromContextSafe(ctx)
defer func() {
// Remove the route when read loop ends (connection closed)
c.clientRouter.removeConnRoute(conn)
conn.Close()
}()
for { for {
data, err := ReadMessage(conn) data, err := ReadMessage(conn)
if err != nil { if err != nil {
@ -187,18 +181,8 @@ func (c *Controller) readLoopClient(ctx context.Context, conn io.ReadWriteCloser
} }
// Server connection read loop // Server connection read loop
func (c *Controller) readLoopServer(ctx context.Context, conn io.ReadWriteCloser, onClose func()) { func (c *Controller) readLoopServer(ctx context.Context, conn io.ReadWriteCloser) {
xl := xlog.FromContextSafe(ctx) xl := xlog.FromContextSafe(ctx)
defer func() {
// Clean up all IP mappings associated with this connection when it closes
c.serverRouter.cleanupConnIPs(conn)
// Call the provided callback upon closure
if onClose != nil {
onClose()
}
conn.Close()
}()
for { for {
data, err := ReadMessage(conn) data, err := ReadMessage(conn)
if err != nil { if err != nil {
@ -236,11 +220,27 @@ func (c *Controller) readLoopServer(ctx context.Context, conn io.ReadWriteCloser
} }
} }
// RegisterClientRoute registers a client route (based on destination IP CIDR) // RegisterClientRoute Register client route (based on destination IP CIDR)
// and starts the read loop func (c *Controller) RegisterClientRoute(ctx context.Context, name string, routes []net.IPNet, conn io.ReadWriteCloser) error {
func (c *Controller) RegisterClientRoute(ctx context.Context, name string, routes []net.IPNet, conn io.ReadWriteCloser) { if err := c.clientRouter.addRoute(name, routes, conn); err != nil {
c.clientRouter.addRoute(name, routes, conn) return err
}
go c.readLoopClient(ctx, conn) go c.readLoopClient(ctx, conn)
return nil
}
// RegisterServerConn Register server connection (dynamically associates with source IPs)
func (c *Controller) RegisterServerConn(ctx context.Context, name string, conn io.ReadWriteCloser) error {
if err := c.serverRouter.addConn(name, conn); err != nil {
return err
}
go c.readLoopServer(ctx, conn)
return nil
}
// UnregisterServerConn Remove server connection from routing table
func (c *Controller) UnregisterServerConn(name string) {
c.serverRouter.delConn(name)
} }
// UnregisterClientRoute Remove client route from routing table // UnregisterClientRoute Remove client route from routing table
@ -248,12 +248,6 @@ func (c *Controller) UnregisterClientRoute(name string) {
c.clientRouter.delRoute(name) c.clientRouter.delRoute(name)
} }
// StartServerConnReadLoop starts the read loop for a server connection
// (dynamically associates with source IPs)
func (c *Controller) StartServerConnReadLoop(ctx context.Context, conn io.ReadWriteCloser, onClose func()) {
go c.readLoopServer(ctx, conn, onClose)
}
// ParseRoutes Convert route strings to IPNet objects // ParseRoutes Convert route strings to IPNet objects
func ParseRoutes(routeStrings []string) ([]net.IPNet, error) { func ParseRoutes(routeStrings []string) ([]net.IPNet, error) {
routes := make([]net.IPNet, 0, len(routeStrings)) routes := make([]net.IPNet, 0, len(routeStrings))
@ -279,7 +273,7 @@ func newClientRouter() *clientRouter {
} }
} }
func (r *clientRouter) addRoute(name string, routes []net.IPNet, conn io.ReadWriteCloser) { func (r *clientRouter) addRoute(name string, routes []net.IPNet, conn io.ReadWriteCloser) error {
r.mu.Lock() r.mu.Lock()
defer r.mu.Unlock() defer r.mu.Unlock()
r.routes[name] = &routeElement{ r.routes[name] = &routeElement{
@ -287,6 +281,7 @@ func (r *clientRouter) addRoute(name string, routes []net.IPNet, conn io.ReadWri
routes: routes, routes: routes,
conn: conn, conn: conn,
} }
return nil
} }
func (r *clientRouter) findConn(dst net.IP) (io.Writer, error) { func (r *clientRouter) findConn(dst net.IP) (io.Writer, error) {
@ -308,29 +303,32 @@ func (r *clientRouter) delRoute(name string) {
delete(r.routes, name) delete(r.routes, name)
} }
func (r *clientRouter) removeConnRoute(conn io.Writer) { // Server router (based on source IP routing)
r.mu.Lock()
defer r.mu.Unlock()
for name, re := range r.routes {
if re.conn == conn {
delete(r.routes, name)
return
}
}
}
// Server router (based solely on source IP routing)
type serverRouter struct { type serverRouter struct {
namedConns map[string]io.ReadWriteCloser // Name to connection mapping
srcIPConns map[string]io.Writer // Source IP string to connection mapping srcIPConns map[string]io.Writer // Source IP string to connection mapping
mu sync.RWMutex mu sync.RWMutex
} }
func newServerRouter() *serverRouter { func newServerRouter() *serverRouter {
return &serverRouter{ return &serverRouter{
namedConns: make(map[string]io.ReadWriteCloser),
srcIPConns: make(map[string]io.Writer), srcIPConns: make(map[string]io.Writer),
} }
} }
func (r *serverRouter) addConn(name string, conn io.ReadWriteCloser) error {
r.mu.Lock()
original, ok := r.namedConns[name]
r.namedConns[name] = conn
r.mu.Unlock()
if ok {
// Close the original connection if it exists
_ = original.Close()
}
return nil
}
func (r *serverRouter) findConnBySrc(src net.IP) (io.Writer, error) { func (r *serverRouter) findConnBySrc(src net.IP) (io.Writer, error) {
r.mu.RLock() r.mu.RLock()
defer r.mu.RUnlock() defer r.mu.RUnlock()
@ -342,41 +340,17 @@ func (r *serverRouter) findConnBySrc(src net.IP) (io.Writer, error) {
} }
func (r *serverRouter) registerSrcIP(src net.IP, conn io.Writer) { func (r *serverRouter) registerSrcIP(src net.IP, conn io.Writer) {
key := src.String()
r.mu.RLock()
existingConn, ok := r.srcIPConns[key]
r.mu.RUnlock()
// If the entry exists and the connection is the same, no need to do anything.
if ok && existingConn == conn {
return
}
// Acquire write lock to update the map.
r.mu.Lock() r.mu.Lock()
defer r.mu.Unlock() defer r.mu.Unlock()
r.srcIPConns[src.String()] = conn
// Double-check after acquiring the write lock to handle potential race conditions.
existingConn, ok = r.srcIPConns[key]
if ok && existingConn == conn {
return
}
r.srcIPConns[key] = conn
} }
// cleanupConnIPs removes all IP mappings associated with the specified connection func (r *serverRouter) delConn(name string) {
func (r *serverRouter) cleanupConnIPs(conn io.Writer) {
r.mu.Lock() r.mu.Lock()
defer r.mu.Unlock() defer r.mu.Unlock()
delete(r.namedConns, name)
// Find and delete all IP mappings pointing to this connection // Note: We don't delete mappings from srcIPConns because we don't know which source IPs are associated with this connection
for ip, mappedConn := range r.srcIPConns { // This might cause dangling references, but they will be overwritten on new connections or restart
if mappedConn == conn {
delete(r.srcIPConns, ip)
}
}
} }
type routeElement struct { type routeElement struct {

View File

@ -33,7 +33,7 @@ func ReadMessage(r io.Reader) ([]byte, error) {
var length uint32 var length uint32
err := binary.Read(r, binary.LittleEndian, &length) err := binary.Read(r, binary.LittleEndian, &length)
if err != nil { if err != nil {
return nil, fmt.Errorf("read message length error: %w", err) return nil, fmt.Errorf("read message length error: %v", err)
} }
// Check length to prevent DoS // Check length to prevent DoS
@ -48,7 +48,7 @@ func ReadMessage(r io.Reader) ([]byte, error) {
data := make([]byte, length) data := make([]byte, length)
_, err = io.ReadFull(r, data) _, err = io.ReadFull(r, data)
if err != nil { if err != nil {
return nil, fmt.Errorf("read message data error: %w", err) return nil, fmt.Errorf("read message data error: %v", err)
} }
return data, nil return data, nil
@ -68,13 +68,13 @@ func WriteMessage(w io.Writer, data []byte) error {
// Write length // Write length
err := binary.Write(w, binary.LittleEndian, length) err := binary.Write(w, binary.LittleEndian, length)
if err != nil { if err != nil {
return fmt.Errorf("write message length error: %w", err) return fmt.Errorf("write message length error: %v", err)
} }
// Write message data // Write message data
_, err = w.Write(data) _, err = w.Write(data)
if err != nil { if err != nil {
return fmt.Errorf("write message data error: %w", err) return fmt.Errorf("write message data error: %v", err)
} }
return nil return nil

View File

@ -24,7 +24,6 @@ import (
const ( const (
offset = 16 offset = 16
defaultPacketSize = 1420
) )
type TunDevice interface { type TunDevice interface {
@ -36,45 +35,20 @@ func OpenTun(ctx context.Context, addr string) (TunDevice, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &tunDeviceWrapper{dev: td}, nil
mtu, err := td.MTU()
if err != nil {
mtu = defaultPacketSize
}
bufferSize := max(mtu, defaultPacketSize)
batchSize := td.BatchSize()
device := &tunDeviceWrapper{
dev: td,
bufferSize: bufferSize,
readBuffers: make([][]byte, batchSize),
sizeBuffer: make([]int, batchSize),
}
for i := range device.readBuffers {
device.readBuffers[i] = make([]byte, offset+bufferSize)
}
return device, nil
} }
type tunDeviceWrapper struct { type tunDeviceWrapper struct {
dev tun.Device dev tun.Device
bufferSize int
readBuffers [][]byte
packetBuffers [][]byte
sizeBuffer []int
} }
func (d *tunDeviceWrapper) Read(p []byte) (int, error) { func (d *tunDeviceWrapper) Read(p []byte) (int, error) {
if len(d.packetBuffers) > 0 { buf := pool.GetBuf(len(p) + offset)
n := copy(p, d.packetBuffers[0]) defer pool.PutBuf(buf)
d.packetBuffers = d.packetBuffers[1:]
return n, nil
}
n, err := d.dev.Read(d.readBuffers, d.sizeBuffer, offset) sz := make([]int, 1)
n, err := d.dev.Read([][]byte{buf}, sz, offset)
if err != nil { if err != nil {
return 0, err return 0, err
} }
@ -82,26 +56,20 @@ func (d *tunDeviceWrapper) Read(p []byte) (int, error) {
return 0, io.EOF return 0, io.EOF
} }
for i := range n { dataSize := sz[0]
if d.sizeBuffer[i] <= 0 { if dataSize > len(p) {
continue dataSize = len(p)
} }
d.packetBuffers = append(d.packetBuffers, d.readBuffers[i][offset:offset+d.sizeBuffer[i]]) copy(p, buf[offset:offset+dataSize])
}
dataSize := copy(p, d.packetBuffers[0])
d.packetBuffers = d.packetBuffers[1:]
return dataSize, nil return dataSize, nil
} }
func (d *tunDeviceWrapper) Write(p []byte) (int, error) { func (d *tunDeviceWrapper) Write(p []byte) (int, error) {
buf := pool.GetBuf(offset + d.bufferSize) buf := pool.GetBuf(len(p) + offset)
defer pool.PutBuf(buf) defer pool.PutBuf(buf)
n := copy(buf[offset:], p) copy(buf[offset:], p)
_, err := d.dev.Write([][]byte{buf[:offset+n]}, offset) return d.dev.Write([][]byte{buf}, offset)
return n, err
} }
func (d *tunDeviceWrapper) Close() error { func (d *tunDeviceWrapper) Close() error {

View File

@ -16,44 +16,35 @@ package vnet
import ( import (
"context" "context"
"crypto/sha256"
"encoding/hex"
"fmt" "fmt"
"net" "net"
"strconv"
"strings"
"github.com/vishvananda/netlink" "github.com/vishvananda/netlink"
"golang.zx2c4.com/wireguard/tun" "golang.zx2c4.com/wireguard/tun"
) )
const ( const (
baseTunName = "utun" defaultTunName = "utun"
defaultMTU = 1420 defaultMTU = 1420
) )
func openTun(_ context.Context, addr string) (tun.Device, error) { func openTun(_ context.Context, addr string) (tun.Device, error) {
name, err := findNextTunName(baseTunName) dev, err := tun.CreateTUN(defaultTunName, defaultMTU)
if err != nil {
name = getFallbackTunName(baseTunName, addr)
}
tunDevice, err := tun.CreateTUN(name, defaultMTU)
if err != nil {
return nil, fmt.Errorf("failed to create TUN device '%s': %w", name, err)
}
actualName, err := tunDevice.Name()
if err != nil { if err != nil {
return nil, err return nil, err
} }
ifn, err := net.InterfaceByName(actualName) name, err := dev.Name()
if err != nil { if err != nil {
return nil, err return nil, err
} }
link, err := netlink.LinkByName(actualName) ifn, err := net.InterfaceByName(name)
if err != nil {
return nil, err
}
link, err := netlink.LinkByName(name)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -78,34 +69,7 @@ func openTun(_ context.Context, addr string) (tun.Device, error) {
if err = addRoutes(ifn, cidr); err != nil { if err = addRoutes(ifn, cidr); err != nil {
return nil, err return nil, err
} }
return tunDevice, nil return dev, nil
}
func findNextTunName(basename string) (string, error) {
interfaces, err := net.Interfaces()
if err != nil {
return "", fmt.Errorf("failed to get network interfaces: %w", err)
}
maxSuffix := -1
for _, iface := range interfaces {
name := iface.Name
if strings.HasPrefix(name, basename) {
suffix := name[len(basename):]
if suffix == "" {
continue
}
numSuffix, err := strconv.Atoi(suffix)
if err == nil && numSuffix > maxSuffix {
maxSuffix = numSuffix
}
}
}
nextSuffix := maxSuffix + 1
name := fmt.Sprintf("%s%d", basename, nextSuffix)
return name, nil
} }
func addRoutes(ifn *net.Interface, cidr *net.IPNet) error { func addRoutes(ifn *net.Interface, cidr *net.IPNet) error {
@ -118,14 +82,3 @@ func addRoutes(ifn *net.Interface, cidr *net.IPNet) error {
} }
return nil return nil
} }
// getFallbackTunName generates a deterministic fallback TUN device name
// based on the base name and the provided address string using a hash.
func getFallbackTunName(baseName, addr string) string {
hasher := sha256.New()
hasher.Write([]byte(addr))
hashBytes := hasher.Sum(nil)
// Use first 4 bytes -> 8 hex chars for brevity, respecting IFNAMSIZ limit.
shortHash := hex.EncodeToString(hashBytes[:4])
return fmt.Sprintf("%s%s", baseName, shortHash)
}

View File

@ -224,7 +224,7 @@ func (ctl *Control) Close() error {
func (ctl *Control) Replaced(newCtl *Control) { func (ctl *Control) Replaced(newCtl *Control) {
xl := ctl.xl xl := ctl.xl
xl.Infof("replaced by client [%s]", newCtl.runID) xl.Infof("Replaced by client [%s]", newCtl.runID)
ctl.runID = "" ctl.runID = ""
ctl.conn.Close() ctl.conn.Close()
} }

View File

@ -97,14 +97,14 @@ func (svr *Service) healthz(w http.ResponseWriter, _ *http.Request) {
func (svr *Service) apiServerInfo(w http.ResponseWriter, r *http.Request) { func (svr *Service) apiServerInfo(w http.ResponseWriter, r *http.Request) {
res := GeneralResponse{Code: 200} res := GeneralResponse{Code: 200}
defer func() { defer func() {
log.Infof("http response [%s]: code [%d]", r.URL.Path, res.Code) log.Infof("Http response [%s]: code [%d]", r.URL.Path, res.Code)
w.WriteHeader(res.Code) w.WriteHeader(res.Code)
if len(res.Msg) > 0 { if len(res.Msg) > 0 {
_, _ = w.Write([]byte(res.Msg)) _, _ = w.Write([]byte(res.Msg))
} }
}() }()
log.Infof("http request: [%s]", r.URL.Path) log.Infof("Http request: [%s]", r.URL.Path)
serverStats := mem.StatsCollector.GetServer() serverStats := mem.StatsCollector.GetServer()
svrResp := serverInfoResp{ svrResp := serverInfoResp{
Version: version.Full(), Version: version.Full(),
@ -218,13 +218,13 @@ func (svr *Service) apiProxyByType(w http.ResponseWriter, r *http.Request) {
proxyType := params["type"] proxyType := params["type"]
defer func() { defer func() {
log.Infof("http response [%s]: code [%d]", r.URL.Path, res.Code) log.Infof("Http response [%s]: code [%d]", r.URL.Path, res.Code)
w.WriteHeader(res.Code) w.WriteHeader(res.Code)
if len(res.Msg) > 0 { if len(res.Msg) > 0 {
_, _ = w.Write([]byte(res.Msg)) _, _ = w.Write([]byte(res.Msg))
} }
}() }()
log.Infof("http request: [%s]", r.URL.Path) log.Infof("Http request: [%s]", r.URL.Path)
proxyInfoResp := GetProxyInfoResp{} proxyInfoResp := GetProxyInfoResp{}
proxyInfoResp.Proxies = svr.getProxyStatsByType(proxyType) proxyInfoResp.Proxies = svr.getProxyStatsByType(proxyType)
@ -290,13 +290,13 @@ func (svr *Service) apiProxyByTypeAndName(w http.ResponseWriter, r *http.Request
name := params["name"] name := params["name"]
defer func() { defer func() {
log.Infof("http response [%s]: code [%d]", r.URL.Path, res.Code) log.Infof("Http response [%s]: code [%d]", r.URL.Path, res.Code)
w.WriteHeader(res.Code) w.WriteHeader(res.Code)
if len(res.Msg) > 0 { if len(res.Msg) > 0 {
_, _ = w.Write([]byte(res.Msg)) _, _ = w.Write([]byte(res.Msg))
} }
}() }()
log.Infof("http request: [%s]", r.URL.Path) log.Infof("Http request: [%s]", r.URL.Path)
var proxyStatsResp GetProxyStatsResp var proxyStatsResp GetProxyStatsResp
proxyStatsResp, res.Code, res.Msg = svr.getProxyStatsByTypeAndName(proxyType, name) proxyStatsResp, res.Code, res.Msg = svr.getProxyStatsByTypeAndName(proxyType, name)
@ -358,13 +358,13 @@ func (svr *Service) apiProxyTraffic(w http.ResponseWriter, r *http.Request) {
name := params["name"] name := params["name"]
defer func() { defer func() {
log.Infof("http response [%s]: code [%d]", r.URL.Path, res.Code) log.Infof("Http response [%s]: code [%d]", r.URL.Path, res.Code)
w.WriteHeader(res.Code) w.WriteHeader(res.Code)
if len(res.Msg) > 0 { if len(res.Msg) > 0 {
_, _ = w.Write([]byte(res.Msg)) _, _ = w.Write([]byte(res.Msg))
} }
}() }()
log.Infof("http request: [%s]", r.URL.Path) log.Infof("Http request: [%s]", r.URL.Path)
trafficResp := GetProxyTrafficResp{} trafficResp := GetProxyTrafficResp{}
trafficResp.Name = name trafficResp.Name = name
@ -386,9 +386,9 @@ func (svr *Service) apiProxyTraffic(w http.ResponseWriter, r *http.Request) {
func (svr *Service) deleteProxies(w http.ResponseWriter, r *http.Request) { func (svr *Service) deleteProxies(w http.ResponseWriter, r *http.Request) {
res := GeneralResponse{Code: 200} res := GeneralResponse{Code: 200}
log.Infof("http request: [%s]", r.URL.Path) log.Infof("Http request: [%s]", r.URL.Path)
defer func() { defer func() {
log.Infof("http response [%s]: code [%d]", r.URL.Path, res.Code) log.Infof("Http response [%s]: code [%d]", r.URL.Path, res.Code)
w.WriteHeader(res.Code) w.WriteHeader(res.Code)
if len(res.Msg) > 0 { if len(res.Msg) > 0 {
_, _ = w.Write([]byte(res.Msg)) _, _ = w.Write([]byte(res.Msg))

View File

@ -427,7 +427,7 @@ func (svr *Service) handleConnection(ctx context.Context, conn net.Conn, interna
_ = conn.SetReadDeadline(time.Now().Add(connReadTimeout)) _ = conn.SetReadDeadline(time.Now().Add(connReadTimeout))
if rawMsg, err = msg.ReadMsg(conn); err != nil { if rawMsg, err = msg.ReadMsg(conn); err != nil {
log.Tracef("failed to read message: %v", err) log.Tracef("Failed to read message: %v", err)
conn.Close() conn.Close()
return return
} }
@ -475,7 +475,7 @@ func (svr *Service) handleConnection(ctx context.Context, conn net.Conn, interna
}) })
} }
default: default:
log.Warnf("error message type for the new connection [%s]", conn.RemoteAddr().String()) log.Warnf("Error message type for the new connection [%s]", conn.RemoteAddr().String())
conn.Close() conn.Close()
} }
} }
@ -488,7 +488,7 @@ func (svr *Service) HandleListener(l net.Listener, internal bool) {
for { for {
c, err := l.Accept() c, err := l.Accept()
if err != nil { if err != nil {
log.Warnf("listener for incoming connections from client closed") log.Warnf("Listener for incoming connections from client closed")
return return
} }
// inject xlog object into net.Conn context // inject xlog object into net.Conn context
@ -504,7 +504,7 @@ func (svr *Service) HandleListener(l net.Listener, internal bool) {
var isTLS, custom bool var isTLS, custom bool
c, isTLS, custom, err = netpkg.CheckAndEnableTLSServerConnWithTimeout(c, svr.tlsConfig, forceTLS, connReadTimeout) c, isTLS, custom, err = netpkg.CheckAndEnableTLSServerConnWithTimeout(c, svr.tlsConfig, forceTLS, connReadTimeout)
if err != nil { if err != nil {
log.Warnf("checkAndEnableTLSServerConnWithTimeout error: %v", err) log.Warnf("CheckAndEnableTLSServerConnWithTimeout error: %v", err)
originConn.Close() originConn.Close()
continue continue
} }
@ -520,7 +520,7 @@ func (svr *Service) HandleListener(l net.Listener, internal bool) {
fmuxCfg.MaxStreamWindowSize = 6 * 1024 * 1024 fmuxCfg.MaxStreamWindowSize = 6 * 1024 * 1024
session, err := fmux.Server(frpConn, fmuxCfg) session, err := fmux.Server(frpConn, fmuxCfg)
if err != nil { if err != nil {
log.Warnf("failed to create mux connection: %v", err) log.Warnf("Failed to create mux connection: %v", err)
frpConn.Close() frpConn.Close()
return return
} }
@ -528,7 +528,7 @@ func (svr *Service) HandleListener(l net.Listener, internal bool) {
for { for {
stream, err := session.AcceptStream() stream, err := session.AcceptStream()
if err != nil { if err != nil {
log.Debugf("accept new mux stream error: %v", err) log.Debugf("Accept new mux stream error: %v", err)
session.Close() session.Close()
return return
} }
@ -546,7 +546,7 @@ func (svr *Service) HandleQUICListener(l *quic.Listener) {
for { for {
c, err := l.Accept(context.Background()) c, err := l.Accept(context.Background())
if err != nil { if err != nil {
log.Warnf("quic listener for incoming connections from client closed") log.Warnf("QUICListener for incoming connections from client closed")
return return
} }
// Start a new goroutine to handle connection. // Start a new goroutine to handle connection.
@ -554,7 +554,7 @@ func (svr *Service) HandleQUICListener(l *quic.Listener) {
for { for {
stream, err := frpConn.AcceptStream(context.Background()) stream, err := frpConn.AcceptStream(context.Background())
if err != nil { if err != nil {
log.Debugf("accept new quic mux stream error: %v", err) log.Debugf("Accept new quic mux stream error: %v", err)
_ = frpConn.CloseWithError(0, "") _ = frpConn.CloseWithError(0, "")
return return
} }
@ -620,7 +620,7 @@ func (svr *Service) RegisterWorkConn(workConn net.Conn, newMsg *msg.NewWorkConn)
xl := netpkg.NewLogFromConn(workConn) xl := netpkg.NewLogFromConn(workConn)
ctl, exist := svr.ctlManager.GetByID(newMsg.RunID) ctl, exist := svr.ctlManager.GetByID(newMsg.RunID)
if !exist { if !exist {
xl.Warnf("no client control found for run id [%s]", newMsg.RunID) xl.Warnf("No client control found for run id [%s]", newMsg.RunID)
return fmt.Errorf("no client control found for run id [%s]", newMsg.RunID) return fmt.Errorf("no client control found for run id [%s]", newMsg.RunID)
} }
// server plugin hook // server plugin hook

View File

@ -38,7 +38,7 @@ func RunE2ETests(t *testing.T) {
// Randomize specs as well as suites // Randomize specs as well as suites
suiteConfig.RandomizeAllSpecs = true suiteConfig.RandomizeAllSpecs = true
log.Infof("starting e2e run %q on Ginkgo node %d of total %d", log.Infof("Starting e2e run %q on Ginkgo node %d of total %d",
framework.RunID, suiteConfig.ParallelProcess, suiteConfig.ParallelTotal) framework.RunID, suiteConfig.ParallelProcess, suiteConfig.ParallelTotal)
ginkgo.RunSpecs(t, "frp e2e suite", suiteConfig, reporterConfig) ginkgo.RunSpecs(t, "frp e2e suite", suiteConfig, reporterConfig)
} }

View File

@ -20,7 +20,7 @@ func ExpectResponseCode(code int) EnsureFunc {
if resp.Code == code { if resp.Code == code {
return true return true
} }
flog.Warnf("expect code %d, but got %d", code, resp.Code) flog.Warnf("Expect code %d, but got %d", code, resp.Code)
return false return false
} }
} }
@ -111,14 +111,14 @@ func (e *RequestExpect) Ensure(fns ...EnsureFunc) {
if len(fns) == 0 { if len(fns) == 0 {
if !bytes.Equal(e.expectResp, ret.Content) { if !bytes.Equal(e.expectResp, ret.Content) {
flog.Tracef("response info: %+v", ret) flog.Tracef("Response info: %+v", ret)
} }
ExpectEqualValuesWithOffset(1, string(ret.Content), string(e.expectResp), e.explain...) ExpectEqualValuesWithOffset(1, string(ret.Content), string(e.expectResp), e.explain...)
} else { } else {
for _, fn := range fns { for _, fn := range fns {
ok := fn(ret) ok := fn(ret)
if !ok { if !ok {
flog.Tracef("response info: %+v", ret) flog.Tracef("Response info: %+v", ret)
} }
ExpectTrueWithOffset(1, ok, e.explain...) ExpectTrueWithOffset(1, ok, e.explain...)
} }

View File

@ -93,7 +93,7 @@ var _ = ginkgo.Describe("[Feature: Real IP]", func() {
f.RunProcesses([]string{serverConf}, []string{clientConf}) f.RunProcesses([]string{serverConf}, []string{clientConf})
framework.NewRequestExpect(f).Port(remotePort).Ensure(func(resp *request.Response) bool { framework.NewRequestExpect(f).Port(remotePort).Ensure(func(resp *request.Response) bool {
log.Tracef("proxy protocol get SourceAddr: %s", string(resp.Content)) log.Tracef("ProxyProtocol get SourceAddr: %s", string(resp.Content))
addr, err := net.ResolveTCPAddr("tcp", string(resp.Content)) addr, err := net.ResolveTCPAddr("tcp", string(resp.Content))
if err != nil { if err != nil {
return false return false
@ -142,7 +142,7 @@ var _ = ginkgo.Describe("[Feature: Real IP]", func() {
r.HTTP().HTTPHost("normal.example.com") r.HTTP().HTTPHost("normal.example.com")
}).Ensure(framework.ExpectResponseCode(404)) }).Ensure(framework.ExpectResponseCode(404))
log.Tracef("proxy protocol get SourceAddr: %s", srcAddrRecord) log.Tracef("ProxyProtocol get SourceAddr: %s", srcAddrRecord)
addr, err := net.ResolveTCPAddr("tcp", srcAddrRecord) addr, err := net.ResolveTCPAddr("tcp", srcAddrRecord)
framework.ExpectNoError(err, srcAddrRecord) framework.ExpectNoError(err, srcAddrRecord)
framework.ExpectEqualValues("127.0.0.1", addr.IP.String()) framework.ExpectEqualValues("127.0.0.1", addr.IP.String())

View File

@ -215,7 +215,7 @@ var _ = ginkgo.Describe("[Feature: Real IP]", func() {
f.RunProcesses([]string{serverConf}, []string{clientConf}) f.RunProcesses([]string{serverConf}, []string{clientConf})
framework.NewRequestExpect(f).Port(remotePort).Ensure(func(resp *request.Response) bool { framework.NewRequestExpect(f).Port(remotePort).Ensure(func(resp *request.Response) bool {
log.Tracef("proxy protocol get SourceAddr: %s", string(resp.Content)) log.Tracef("ProxyProtocol get SourceAddr: %s", string(resp.Content))
addr, err := net.ResolveTCPAddr("tcp", string(resp.Content)) addr, err := net.ResolveTCPAddr("tcp", string(resp.Content))
if err != nil { if err != nil {
return false return false
@ -265,7 +265,7 @@ var _ = ginkgo.Describe("[Feature: Real IP]", func() {
r.HTTP().HTTPHost("normal.example.com") r.HTTP().HTTPHost("normal.example.com")
}).Ensure(framework.ExpectResponseCode(404)) }).Ensure(framework.ExpectResponseCode(404))
log.Tracef("proxy protocol get SourceAddr: %s", srcAddrRecord) log.Tracef("ProxyProtocol get SourceAddr: %s", srcAddrRecord)
addr, err := net.ResolveTCPAddr("tcp", srcAddrRecord) addr, err := net.ResolveTCPAddr("tcp", srcAddrRecord)
framework.ExpectNoError(err, srcAddrRecord) framework.ExpectNoError(err, srcAddrRecord)
framework.ExpectEqualValues("127.0.0.1", addr.IP.String()) framework.ExpectEqualValues("127.0.0.1", addr.IP.String())