From 0c60dd08ea5f5b0828c8b3f1af9fe8d252073589 Mon Sep 17 00:00:00 2001 From: fatedier Date: Thu, 3 Apr 2025 17:40:21 +0800 Subject: [PATCH] virtual-net: initial --- client/control.go | 8 +- client/proxy/proxy.go | 29 +- client/proxy/proxy_manager.go | 6 +- client/proxy/proxy_wrapper.go | 7 +- client/service.go | 32 +- client/visitor/stcp.go | 2 + client/visitor/visitor.go | 34 +- client/visitor/visitor_manager.go | 14 +- client/visitor/xtcp.go | 8 +- go.mod | 5 + go.sum | 16 + pkg/config/v1/client.go | 12 +- pkg/config/v1/{plugin.go => proxy_plugin.go} | 57 +-- pkg/config/v1/visitor.go | 3 + pkg/config/v1/visitor_plugin.go | 86 +++++ pkg/plugin/client/http2http.go | 8 +- pkg/plugin/client/http2https.go | 8 +- pkg/plugin/client/http_proxy.go | 6 +- pkg/plugin/client/https2http.go | 12 +- pkg/plugin/client/https2https.go | 12 +- pkg/plugin/client/plugin.go | 24 +- pkg/plugin/client/socks5.go | 9 +- pkg/plugin/client/static_file.go | 8 +- pkg/plugin/client/tls2raw.go | 7 +- pkg/plugin/client/unix_domain_socket.go | 11 +- pkg/plugin/client/virtual_net.go | 71 ++++ pkg/plugin/visitor/plugin.go | 58 +++ pkg/plugin/visitor/virtual_net.go | 234 ++++++++++++ pkg/vnet/README.md | 94 +++++ pkg/vnet/controller.go | 363 +++++++++++++++++++ pkg/vnet/examples/client.go.bak | 122 +++++++ pkg/vnet/examples/server.go.bak | 114 ++++++ pkg/vnet/tun.go | 73 ++++ pkg/vnet/tun_darwin.go | 84 +++++ pkg/vnet/tun_linux.go | 78 ++++ 35 files changed, 1606 insertions(+), 109 deletions(-) rename pkg/config/v1/{plugin.go => proxy_plugin.go} (95%) create mode 100644 pkg/config/v1/visitor_plugin.go create mode 100644 pkg/plugin/client/virtual_net.go create mode 100644 pkg/plugin/visitor/plugin.go create mode 100644 pkg/plugin/visitor/virtual_net.go create mode 100644 pkg/vnet/README.md create mode 100644 pkg/vnet/controller.go create mode 100644 pkg/vnet/examples/client.go.bak create mode 100644 pkg/vnet/examples/server.go.bak create mode 100644 pkg/vnet/tun.go create mode 100644 pkg/vnet/tun_darwin.go create mode 100644 pkg/vnet/tun_linux.go diff --git a/client/control.go b/client/control.go index 3e20c312..157b4aef 100644 --- a/client/control.go +++ b/client/control.go @@ -29,6 +29,7 @@ import ( netpkg "github.com/fatedier/frp/pkg/util/net" "github.com/fatedier/frp/pkg/util/wait" "github.com/fatedier/frp/pkg/util/xlog" + "github.com/fatedier/frp/pkg/vnet" ) type SessionContext struct { @@ -46,6 +47,8 @@ type SessionContext struct { AuthSetter auth.Setter // Connector is used to create new connections, which could be real TCP connections or virtual streams. Connector Connector + // Virtual net controller + VnetController *vnet.Controller } type Control struct { @@ -99,8 +102,9 @@ func NewControl(ctx context.Context, sessionCtx *SessionContext) (*Control, erro ctl.registerMsgHandlers() ctl.msgTransporter = transport.NewMessageTransporter(ctl.msgDispatcher.SendChannel()) - ctl.pm = proxy.NewManager(ctl.ctx, sessionCtx.Common, ctl.msgTransporter) - ctl.vm = visitor.NewManager(ctl.ctx, sessionCtx.RunID, sessionCtx.Common, ctl.connectServer, ctl.msgTransporter) + ctl.pm = proxy.NewManager(ctl.ctx, sessionCtx.Common, ctl.msgTransporter, sessionCtx.VnetController) + ctl.vm = visitor.NewManager(ctl.ctx, sessionCtx.RunID, sessionCtx.Common, + ctl.connectServer, ctl.msgTransporter, sessionCtx.VnetController) return ctl, nil } diff --git a/client/proxy/proxy.go b/client/proxy/proxy.go index 5cb5ccf2..dd3169f7 100644 --- a/client/proxy/proxy.go +++ b/client/proxy/proxy.go @@ -36,6 +36,7 @@ import ( "github.com/fatedier/frp/pkg/transport" "github.com/fatedier/frp/pkg/util/limit" "github.com/fatedier/frp/pkg/util/xlog" + "github.com/fatedier/frp/pkg/vnet" ) var proxyFactoryRegistry = map[reflect.Type]func(*BaseProxy, v1.ProxyConfigurer) Proxy{} @@ -58,6 +59,7 @@ func NewProxy( pxyConf v1.ProxyConfigurer, clientCfg *v1.ClientCommonConfig, msgTransporter transport.MessageTransporter, + vnetController *vnet.Controller, ) (pxy Proxy) { var limiter *rate.Limiter limitBytes := pxyConf.GetBaseConfig().Transport.BandwidthLimit.Bytes() @@ -70,6 +72,7 @@ func NewProxy( clientCfg: clientCfg, limiter: limiter, msgTransporter: msgTransporter, + vnetController: vnetController, xl: xlog.FromContextSafe(ctx), ctx: ctx, } @@ -85,6 +88,7 @@ type BaseProxy struct { baseCfg *v1.ProxyBaseConfig clientCfg *v1.ClientCommonConfig msgTransporter transport.MessageTransporter + vnetController *vnet.Controller limiter *rate.Limiter // proxyPlugin is used to handle connections instead of dialing to local service. // It's only validate for TCP protocol now. @@ -98,7 +102,10 @@ type BaseProxy struct { func (pxy *BaseProxy) Run() error { if pxy.baseCfg.Plugin.Type != "" { - p, err := plugin.Create(pxy.baseCfg.Plugin.Type, pxy.baseCfg.Plugin.ClientPluginOptions) + p, err := plugin.Create(pxy.baseCfg.Plugin.Type, plugin.ProxyContext{ + Name: pxy.baseCfg.Name, + VnetController: pxy.vnetController, + }, pxy.baseCfg.Plugin.ClientPluginOptions) if err != nil { return err } @@ -157,22 +164,22 @@ func (pxy *BaseProxy) HandleTCPWorkConnection(workConn net.Conn, m *msg.StartWor } // check if we need to send proxy protocol info - var extraInfo plugin.ExtraInfo + var connInfo plugin.ConnectionInfo if m.SrcAddr != "" && m.SrcPort != 0 { if m.DstAddr == "" { m.DstAddr = "127.0.0.1" } srcAddr, _ := net.ResolveTCPAddr("tcp", net.JoinHostPort(m.SrcAddr, strconv.Itoa(int(m.SrcPort)))) dstAddr, _ := net.ResolveTCPAddr("tcp", net.JoinHostPort(m.DstAddr, strconv.Itoa(int(m.DstPort)))) - extraInfo.SrcAddr = srcAddr - extraInfo.DstAddr = dstAddr + connInfo.SrcAddr = srcAddr + connInfo.DstAddr = dstAddr } if baseCfg.Transport.ProxyProtocolVersion != "" && m.SrcAddr != "" && m.SrcPort != 0 { h := &pp.Header{ Command: pp.PROXY, - SourceAddr: extraInfo.SrcAddr, - DestinationAddr: extraInfo.DstAddr, + SourceAddr: connInfo.SrcAddr, + DestinationAddr: connInfo.DstAddr, } if strings.Contains(m.SrcAddr, ".") { @@ -186,13 +193,15 @@ func (pxy *BaseProxy) HandleTCPWorkConnection(workConn net.Conn, m *msg.StartWor } else if baseCfg.Transport.ProxyProtocolVersion == "v2" { h.Version = 2 } - extraInfo.ProxyProtocolHeader = h + connInfo.ProxyProtocolHeader = h } + connInfo.Conn = remote + connInfo.UnderlyingConn = workConn if pxy.proxyPlugin != nil { // if plugin is set, let plugin handle connection first xl.Debugf("handle by plugin: %s", pxy.proxyPlugin.Name()) - pxy.proxyPlugin.Handle(pxy.ctx, remote, workConn, &extraInfo) + pxy.proxyPlugin.Handle(pxy.ctx, &connInfo) xl.Debugf("handle by plugin finished") return } @@ -210,8 +219,8 @@ func (pxy *BaseProxy) HandleTCPWorkConnection(workConn net.Conn, m *msg.StartWor xl.Debugf("join connections, localConn(l[%s] r[%s]) workConn(l[%s] r[%s])", localConn.LocalAddr().String(), localConn.RemoteAddr().String(), workConn.LocalAddr().String(), workConn.RemoteAddr().String()) - if extraInfo.ProxyProtocolHeader != nil { - if _, err := extraInfo.ProxyProtocolHeader.WriteTo(localConn); err != nil { + if connInfo.ProxyProtocolHeader != nil { + if _, err := connInfo.ProxyProtocolHeader.WriteTo(localConn); err != nil { workConn.Close() xl.Errorf("write proxy protocol header to local conn error: %v", err) return diff --git a/client/proxy/proxy_manager.go b/client/proxy/proxy_manager.go index d42aedc0..ea5cc553 100644 --- a/client/proxy/proxy_manager.go +++ b/client/proxy/proxy_manager.go @@ -28,12 +28,14 @@ import ( "github.com/fatedier/frp/pkg/msg" "github.com/fatedier/frp/pkg/transport" "github.com/fatedier/frp/pkg/util/xlog" + "github.com/fatedier/frp/pkg/vnet" ) type Manager struct { proxies map[string]*Wrapper msgTransporter transport.MessageTransporter inWorkConnCallback func(*v1.ProxyBaseConfig, net.Conn, *msg.StartWorkConn) bool + vnetController *vnet.Controller closed bool mu sync.RWMutex @@ -47,10 +49,12 @@ func NewManager( ctx context.Context, clientCfg *v1.ClientCommonConfig, msgTransporter transport.MessageTransporter, + vnetController *vnet.Controller, ) *Manager { return &Manager{ proxies: make(map[string]*Wrapper), msgTransporter: msgTransporter, + vnetController: vnetController, closed: false, clientCfg: clientCfg, ctx: ctx, @@ -159,7 +163,7 @@ func (pm *Manager) UpdateAll(proxyCfgs []v1.ProxyConfigurer) { for _, cfg := range proxyCfgs { name := cfg.GetBaseConfig().Name if _, ok := pm.proxies[name]; !ok { - pxy := NewWrapper(pm.ctx, cfg, pm.clientCfg, pm.HandleEvent, pm.msgTransporter) + pxy := NewWrapper(pm.ctx, cfg, pm.clientCfg, pm.HandleEvent, pm.msgTransporter, pm.vnetController) if pm.inWorkConnCallback != nil { pxy.SetInWorkConnCallback(pm.inWorkConnCallback) } diff --git a/client/proxy/proxy_wrapper.go b/client/proxy/proxy_wrapper.go index 95048f29..f3f17e2b 100644 --- a/client/proxy/proxy_wrapper.go +++ b/client/proxy/proxy_wrapper.go @@ -31,6 +31,7 @@ import ( "github.com/fatedier/frp/pkg/msg" "github.com/fatedier/frp/pkg/transport" "github.com/fatedier/frp/pkg/util/xlog" + "github.com/fatedier/frp/pkg/vnet" ) const ( @@ -73,6 +74,8 @@ type Wrapper struct { handler event.Handler msgTransporter transport.MessageTransporter + // vnet controller + vnetController *vnet.Controller health uint32 lastSendStartMsg time.Time @@ -91,6 +94,7 @@ func NewWrapper( clientCfg *v1.ClientCommonConfig, eventHandler event.Handler, msgTransporter transport.MessageTransporter, + vnetController *vnet.Controller, ) *Wrapper { baseInfo := cfg.GetBaseConfig() xl := xlog.FromContextSafe(ctx).Spawn().AppendPrefix(baseInfo.Name) @@ -105,6 +109,7 @@ func NewWrapper( healthNotifyCh: make(chan struct{}), handler: eventHandler, msgTransporter: msgTransporter, + vnetController: vnetController, xl: xl, ctx: xlog.NewContext(ctx, xl), } @@ -117,7 +122,7 @@ func NewWrapper( xl.Tracef("enable health check monitor") } - pw.pxy = NewProxy(pw.ctx, pw.Cfg, clientCfg, pw.msgTransporter) + pw.pxy = NewProxy(pw.ctx, pw.Cfg, clientCfg, pw.msgTransporter, pw.vnetController) return pw } diff --git a/client/service.go b/client/service.go index b06706a6..57eb4835 100644 --- a/client/service.go +++ b/client/service.go @@ -37,6 +37,7 @@ import ( "github.com/fatedier/frp/pkg/util/version" "github.com/fatedier/frp/pkg/util/wait" "github.com/fatedier/frp/pkg/util/xlog" + "github.com/fatedier/frp/pkg/vnet" ) func init() { @@ -110,6 +111,8 @@ type Service struct { // web server for admin UI and apis webServer *httppkg.Server + vnetController *vnet.Controller + cfgMu sync.RWMutex common *v1.ClientCommonConfig proxyCfgs []v1.ProxyConfigurer @@ -156,6 +159,9 @@ func NewService(options ServiceOptions) (*Service, error) { if webServer != nil { webServer.RouteRegister(s.registerRouteHandlers) } + if options.Common.VirtualNet.Address != "" { + s.vnetController = vnet.NewController(options.Common.VirtualNet) + } return s, nil } @@ -169,6 +175,19 @@ func (svr *Service) Run(ctx context.Context) error { netpkg.SetDefaultDNSAddress(svr.common.DNSServer) } + if svr.vnetController != nil { + if err := svr.vnetController.Init(); err != nil { + log.Errorf("init virtual network controller error: %v", err) + return err + } + go func() { + log.Infof("virtual network controller start...") + if err := svr.vnetController.Run(); err != nil { + log.Warnf("virtual network controller exit with error: %v", err) + } + }() + } + if svr.webServer != nil { go func() { log.Infof("admin server listen on %s", svr.webServer.Address()) @@ -311,12 +330,13 @@ func (svr *Service) loopLoginUntilSuccess(maxInterval time.Duration, firstLoginE connEncrypted = false } sessionCtx := &SessionContext{ - Common: svr.common, - RunID: svr.runID, - Conn: conn, - ConnEncrypted: connEncrypted, - AuthSetter: svr.authSetter, - Connector: connector, + Common: svr.common, + RunID: svr.runID, + Conn: conn, + ConnEncrypted: connEncrypted, + AuthSetter: svr.authSetter, + Connector: connector, + VnetController: svr.vnetController, } ctl, err := NewControl(svr.ctx, sessionCtx) if err != nil { diff --git a/client/visitor/stcp.go b/client/visitor/stcp.go index b26faf52..a7049a9f 100644 --- a/client/visitor/stcp.go +++ b/client/visitor/stcp.go @@ -44,6 +44,8 @@ func (sv *STCPVisitor) Run() (err error) { } go sv.internalConnWorker() + + sv.plugin.Start() return } diff --git a/client/visitor/visitor.go b/client/visitor/visitor.go index d520f735..ffdcf3c0 100644 --- a/client/visitor/visitor.go +++ b/client/visitor/visitor.go @@ -20,9 +20,11 @@ import ( "sync" v1 "github.com/fatedier/frp/pkg/config/v1" + plugin "github.com/fatedier/frp/pkg/plugin/visitor" "github.com/fatedier/frp/pkg/transport" netpkg "github.com/fatedier/frp/pkg/util/net" "github.com/fatedier/frp/pkg/util/xlog" + "github.com/fatedier/frp/pkg/vnet" ) // Helper wraps some functions for visitor to use. @@ -34,6 +36,8 @@ type Helper interface { // MsgTransporter returns the message transporter that is used to send and receive messages // to the frp server through the controller. MsgTransporter() transport.MessageTransporter + // VNetController returns the vnet controller that is used to manage the virtual network. + VNetController() *vnet.Controller // RunID returns the run id of current controller. RunID() string } @@ -50,14 +54,34 @@ func NewVisitor( cfg v1.VisitorConfigurer, clientCfg *v1.ClientCommonConfig, helper Helper, -) (visitor Visitor) { +) (Visitor, error) { xl := xlog.FromContextSafe(ctx).Spawn().AppendPrefix(cfg.GetBaseConfig().Name) + ctx = xlog.NewContext(ctx, xl) + var visitor Visitor baseVisitor := BaseVisitor{ clientCfg: clientCfg, helper: helper, - ctx: xlog.NewContext(ctx, xl), + ctx: ctx, internalLn: netpkg.NewInternalListener(), } + if cfg.GetBaseConfig().Plugin.Type != "" { + p, err := plugin.Create( + cfg.GetBaseConfig().Plugin.Type, + plugin.VisitorContext{ + Name: cfg.GetBaseConfig().Name, + Ctx: ctx, + VnetController: helper.VNetController(), + HandleConn: func(conn net.Conn) { + _ = baseVisitor.AcceptConn(conn) + }, + }, + cfg.GetBaseConfig().Plugin.VisitorPluginOptions, + ) + if err != nil { + return nil, err + } + baseVisitor.plugin = p + } switch cfg := cfg.(type) { case *v1.STCPVisitorConfig: visitor = &STCPVisitor{ @@ -77,7 +101,7 @@ func NewVisitor( checkCloseCh: make(chan struct{}), } } - return + return visitor, nil } type BaseVisitor struct { @@ -85,6 +109,7 @@ type BaseVisitor struct { helper Helper l net.Listener internalLn *netpkg.InternalListener + plugin plugin.Plugin mu sync.RWMutex ctx context.Context @@ -101,4 +126,7 @@ func (v *BaseVisitor) Close() { if v.internalLn != nil { v.internalLn.Close() } + if v.plugin != nil { + v.plugin.Close() + } } diff --git a/client/visitor/visitor_manager.go b/client/visitor/visitor_manager.go index 6ff65dab..b3539c69 100644 --- a/client/visitor/visitor_manager.go +++ b/client/visitor/visitor_manager.go @@ -27,6 +27,7 @@ import ( v1 "github.com/fatedier/frp/pkg/config/v1" "github.com/fatedier/frp/pkg/transport" "github.com/fatedier/frp/pkg/util/xlog" + "github.com/fatedier/frp/pkg/vnet" ) type Manager struct { @@ -50,6 +51,7 @@ func NewManager( clientCfg *v1.ClientCommonConfig, connectServer func() (net.Conn, error), msgTransporter transport.MessageTransporter, + vnetController *vnet.Controller, ) *Manager { m := &Manager{ clientCfg: clientCfg, @@ -62,6 +64,7 @@ func NewManager( m.helper = &visitorHelperImpl{ connectServerFn: connectServer, msgTransporter: msgTransporter, + vnetController: vnetController, transferConnFn: m.TransferConn, runID: runID, } @@ -112,7 +115,11 @@ func (vm *Manager) Close() { func (vm *Manager) startVisitor(cfg v1.VisitorConfigurer) (err error) { xl := xlog.FromContextSafe(vm.ctx) name := cfg.GetBaseConfig().Name - visitor := NewVisitor(vm.ctx, cfg, vm.clientCfg, vm.helper) + visitor, err := NewVisitor(vm.ctx, cfg, vm.clientCfg, vm.helper) + if err != nil { + xl.Warnf("new visitor error: %v", err) + return + } err = visitor.Run() if err != nil { xl.Warnf("start error: %v", err) @@ -187,6 +194,7 @@ func (vm *Manager) TransferConn(name string, conn net.Conn) error { type visitorHelperImpl struct { connectServerFn func() (net.Conn, error) msgTransporter transport.MessageTransporter + vnetController *vnet.Controller transferConnFn func(name string, conn net.Conn) error runID string } @@ -203,6 +211,10 @@ func (v *visitorHelperImpl) MsgTransporter() transport.MessageTransporter { return v.msgTransporter } +func (v *visitorHelperImpl) VNetController() *vnet.Controller { + return v.vnetController +} + func (v *visitorHelperImpl) RunID() string { return v.runID } diff --git a/client/visitor/xtcp.go b/client/visitor/xtcp.go index a1efd72b..2fcd08e7 100644 --- a/client/visitor/xtcp.go +++ b/client/visitor/xtcp.go @@ -73,6 +73,8 @@ func (sv *XTCPVisitor) Run() (err error) { sv.retryLimiter = rate.NewLimiter(rate.Every(time.Hour/time.Duration(sv.cfg.MaxRetriesAnHour)), sv.cfg.MaxRetriesAnHour) go sv.keepTunnelOpenWorker() } + + sv.plugin.Start() return } @@ -157,9 +159,9 @@ func (sv *XTCPVisitor) keepTunnelOpenWorker() { func (sv *XTCPVisitor) handleConn(userConn net.Conn) { xl := xlog.FromContextSafe(sv.ctx) - isConnTrasfered := false + isConnTransfered := false defer func() { - if !isConnTrasfered { + if !isConnTransfered { userConn.Close() } }() @@ -187,7 +189,7 @@ func (sv *XTCPVisitor) handleConn(userConn net.Conn) { xl.Errorf("transfer connection to visitor %s error: %v", sv.cfg.FallbackTo, err) return } - isConnTrasfered = true + isConnTransfered = true return } diff --git a/go.mod b/go.mod index ecf20211..2ce124fa 100644 --- a/go.mod +++ b/go.mod @@ -19,16 +19,19 @@ require ( github.com/quic-go/quic-go v0.48.2 github.com/rodaine/table v1.2.0 github.com/samber/lo v1.47.0 + github.com/songgao/water v0.0.0-20200317203138-2b4b6d7c09d8 github.com/spf13/cobra v1.8.0 github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.9.0 github.com/tidwall/gjson v1.17.1 + github.com/vishvananda/netlink v1.3.0 github.com/xtaci/kcp-go/v5 v5.6.13 golang.org/x/crypto v0.30.0 golang.org/x/net v0.32.0 golang.org/x/oauth2 v0.16.0 golang.org/x/sync v0.10.0 golang.org/x/time v0.5.0 + golang.zx2c4.com/wireguard v0.0.0-20231211153847-12269c276173 gopkg.in/ini.v1 v1.67.0 k8s.io/apimachinery v0.28.8 k8s.io/client-go v0.28.8 @@ -64,12 +67,14 @@ require ( github.com/tidwall/match v1.1.1 // indirect github.com/tidwall/pretty v1.2.0 // indirect github.com/tjfoc/gmsm v1.4.1 // indirect + github.com/vishvananda/netns v0.0.4 // indirect go.uber.org/mock v0.5.0 // indirect golang.org/x/exp v0.0.0-20241204233417-43b7b7cde48d // indirect golang.org/x/mod v0.22.0 // indirect golang.org/x/sys v0.28.0 // indirect golang.org/x/text v0.21.0 // indirect golang.org/x/tools v0.28.0 // indirect + golang.zx2c4.com/wintun v0.0.0-20230126152724-0fa3db229ce2 // indirect google.golang.org/appengine v1.6.8 // indirect google.golang.org/protobuf v1.34.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect diff --git a/go.sum b/go.sum index e1053561..698d84b3 100644 --- a/go.sum +++ b/go.sum @@ -48,6 +48,8 @@ github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/btree v1.0.1 h1:gK4Kx5IaGY9CD5sPJ36FHiBJ6ZXl0kilRiiCj+jdYp4= +github.com/google/btree v1.0.1/go.mod h1:xXMiIv4Fb/0kKde4SpL7qlzvu5cMJDRkFDxJfI9uaxA= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= @@ -117,6 +119,8 @@ github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncj github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/samber/lo v1.47.0 h1:z7RynLwP5nbyRscyvcD043DWYoOcYRv3mV8lBeqOCLc= github.com/samber/lo v1.47.0/go.mod h1:RmDH9Ct32Qy3gduHQuKJ3gW1fMHAnE/fAzQuf6He5cU= +github.com/songgao/water v0.0.0-20200317203138-2b4b6d7c09d8 h1:TG/diQgUe0pntT/2D9tmUCz4VNwm9MfrtPr0SU2qSX8= +github.com/songgao/water v0.0.0-20200317203138-2b4b6d7c09d8/go.mod h1:P5HUIBuIWKbyjl083/loAegFkfbFNx5i2qEP4CNbm7E= github.com/spf13/cobra v1.8.0 h1:7aJaZx1B85qltLMc546zn58BxxfZdR/W22ej9CFoEf0= github.com/spf13/cobra v1.8.0/go.mod h1:WXLWApfZ71AjXPya3WOlMsY9yMs7YeiHhFVlvLyhcho= github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= @@ -143,6 +147,10 @@ github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs= github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= github.com/tjfoc/gmsm v1.4.1 h1:aMe1GlZb+0bLjn+cKTPEvvn9oUEBlJitaZiiBwsbgho= github.com/tjfoc/gmsm v1.4.1/go.mod h1:j4INPkHWMrhJb38G+J6W4Tw0AbuN8Thu3PbdVYhVcTE= +github.com/vishvananda/netlink v1.3.0 h1:X7l42GfcV4S6E4vHTsw48qbrV+9PVojNfIhZcwQdrZk= +github.com/vishvananda/netlink v1.3.0/go.mod h1:i6NetklAujEcC6fK0JPjT8qSwWyO0HLn4UKG+hGqeJs= +github.com/vishvananda/netns v0.0.4 h1:Oeaw1EM2JMxD51g9uhtC0D7erkIjgmj8+JZc26m1YX8= +github.com/vishvananda/netns v0.0.4/go.mod h1:SpkAiCQRtJ6TvvxPnOSyH3BMl6unz3xZlaprSwhNNJM= github.com/xtaci/kcp-go/v5 v5.6.13 h1:FEjtz9+D4p8t2x4WjciGt/jsIuhlWjjgPCCWjrVR4Hk= github.com/xtaci/kcp-go/v5 v5.6.13/go.mod h1:75S1AKYYzNUSXIv30h+jPKJYZUwqpfvLshu63nCNSOM= github.com/xtaci/lossyconn v0.0.0-20200209145036-adba10fffc37 h1:EWU6Pktpas0n8lLQwDsRyZfmkPeRbdgPtW609es+/9E= @@ -201,9 +209,11 @@ golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= @@ -238,6 +248,10 @@ golang.org/x/tools v0.28.0 h1:WuB6qZ4RPCQo5aP3WdKZS7i595EdWqWR8vqJTlwTVK8= golang.org/x/tools v0.28.0/go.mod h1:dcIOrVd3mfQKTgrDVQHqCPMWy6lnhfhtX3hLXYVLfRw= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.zx2c4.com/wintun v0.0.0-20230126152724-0fa3db229ce2 h1:B82qJJgjvYKsXS9jeunTOisW56dUokqW/FOteYJJ/yg= +golang.zx2c4.com/wintun v0.0.0-20230126152724-0fa3db229ce2/go.mod h1:deeaetjYA+DHMHg+sMSMI58GrEteJUUzzw7en6TJQcI= +golang.zx2c4.com/wireguard v0.0.0-20231211153847-12269c276173 h1:/jFs0duh4rdb8uIfPMv78iAJGcPKDeqAFnaLBropIC4= +golang.zx2c4.com/wireguard v0.0.0-20231211153847-12269c276173/go.mod h1:tkCQ4FQXmpAgYVh++1cq16/dH4QJtmvpRv19DWGAHSA= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/appengine v1.6.8 h1:IhEN5q69dyKagZPYMSdIjS2HqprW324FRQZJcGqPAsM= @@ -268,6 +282,8 @@ gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gvisor.dev/gvisor v0.0.0-20230927004350-cbd86285d259 h1:TbRPT0HtzFP3Cno1zZo7yPzEEnfu8EjLfl6IU9VfqkQ= +gvisor.dev/gvisor v0.0.0-20230927004350-cbd86285d259/go.mod h1:AVgIgHMwK63XvmAzWG9vLQ41YnVHN0du0tEC46fI7yY= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= k8s.io/apimachinery v0.28.8 h1:hi/nrxHwk4QLV+W/SHve1bypTE59HCDorLY1stBIxKQ= diff --git a/pkg/config/v1/client.go b/pkg/config/v1/client.go index d43ec1bc..dc89e3c2 100644 --- a/pkg/config/v1/client.go +++ b/pkg/config/v1/client.go @@ -58,9 +58,10 @@ type ClientCommonConfig struct { // set. Start []string `json:"start,omitempty"` - Log LogConfig `json:"log,omitempty"` - WebServer WebServerConfig `json:"webServer,omitempty"` - Transport ClientTransportConfig `json:"transport,omitempty"` + Log LogConfig `json:"log,omitempty"` + WebServer WebServerConfig `json:"webServer,omitempty"` + Transport ClientTransportConfig `json:"transport,omitempty"` + VirtualNet VirtualNetConfig `json:"virtualNet,omitempty"` // UDPPacketSize specifies the udp packet size // By default, this value is 1500 @@ -204,3 +205,8 @@ type AuthOIDCClientConfig struct { // this field will be transfer to map[string][]string in OIDC token generator. AdditionalEndpointParams map[string]string `json:"additionalEndpointParams,omitempty"` } + +type VirtualNetConfig struct { + Address string `json:"address,omitempty"` + Routes []string `json:"routes,omitempty"` +} diff --git a/pkg/config/v1/plugin.go b/pkg/config/v1/proxy_plugin.go similarity index 95% rename from pkg/config/v1/plugin.go rename to pkg/config/v1/proxy_plugin.go index cdf3cf26..3b565f0a 100644 --- a/pkg/config/v1/plugin.go +++ b/pkg/config/v1/proxy_plugin.go @@ -26,6 +26,32 @@ import ( "github.com/fatedier/frp/pkg/util/util" ) +const ( + PluginHTTP2HTTPS = "http2https" + PluginHTTPProxy = "http_proxy" + PluginHTTPS2HTTP = "https2http" + PluginHTTPS2HTTPS = "https2https" + PluginHTTP2HTTP = "http2http" + PluginSocks5 = "socks5" + PluginStaticFile = "static_file" + PluginUnixDomainSocket = "unix_domain_socket" + PluginTLS2Raw = "tls2raw" + PluginVirtualNet = "virtual_net" +) + +var clientPluginOptionsTypeMap = map[string]reflect.Type{ + PluginHTTP2HTTPS: reflect.TypeOf(HTTP2HTTPSPluginOptions{}), + PluginHTTPProxy: reflect.TypeOf(HTTPProxyPluginOptions{}), + PluginHTTPS2HTTP: reflect.TypeOf(HTTPS2HTTPPluginOptions{}), + PluginHTTPS2HTTPS: reflect.TypeOf(HTTPS2HTTPSPluginOptions{}), + PluginHTTP2HTTP: reflect.TypeOf(HTTP2HTTPPluginOptions{}), + PluginSocks5: reflect.TypeOf(Socks5PluginOptions{}), + PluginStaticFile: reflect.TypeOf(StaticFilePluginOptions{}), + PluginUnixDomainSocket: reflect.TypeOf(UnixDomainSocketPluginOptions{}), + PluginTLS2Raw: reflect.TypeOf(TLS2RawPluginOptions{}), + PluginVirtualNet: reflect.TypeOf(VirtualNetPluginOptions{}), +} + type ClientPluginOptions interface { Complete() } @@ -74,30 +100,6 @@ func (c *TypedClientPluginOptions) MarshalJSON() ([]byte, error) { return json.Marshal(c.ClientPluginOptions) } -const ( - PluginHTTP2HTTPS = "http2https" - PluginHTTPProxy = "http_proxy" - PluginHTTPS2HTTP = "https2http" - PluginHTTPS2HTTPS = "https2https" - PluginHTTP2HTTP = "http2http" - PluginSocks5 = "socks5" - PluginStaticFile = "static_file" - PluginUnixDomainSocket = "unix_domain_socket" - PluginTLS2Raw = "tls2raw" -) - -var clientPluginOptionsTypeMap = map[string]reflect.Type{ - PluginHTTP2HTTPS: reflect.TypeOf(HTTP2HTTPSPluginOptions{}), - PluginHTTPProxy: reflect.TypeOf(HTTPProxyPluginOptions{}), - PluginHTTPS2HTTP: reflect.TypeOf(HTTPS2HTTPPluginOptions{}), - PluginHTTPS2HTTPS: reflect.TypeOf(HTTPS2HTTPSPluginOptions{}), - PluginHTTP2HTTP: reflect.TypeOf(HTTP2HTTPPluginOptions{}), - PluginSocks5: reflect.TypeOf(Socks5PluginOptions{}), - PluginStaticFile: reflect.TypeOf(StaticFilePluginOptions{}), - PluginUnixDomainSocket: reflect.TypeOf(UnixDomainSocketPluginOptions{}), - PluginTLS2Raw: reflect.TypeOf(TLS2RawPluginOptions{}), -} - type HTTP2HTTPSPluginOptions struct { Type string `json:"type,omitempty"` LocalAddr string `json:"localAddr,omitempty"` @@ -185,3 +187,10 @@ type TLS2RawPluginOptions struct { } func (o *TLS2RawPluginOptions) Complete() {} + +type VirtualNetPluginOptions struct { + Type string `json:"type,omitempty"` + AllowedIPs []string `json:"allowedIPs,omitempty"` +} + +func (o *VirtualNetPluginOptions) Complete() {} diff --git a/pkg/config/v1/visitor.go b/pkg/config/v1/visitor.go index 51fe88a6..f00391c3 100644 --- a/pkg/config/v1/visitor.go +++ b/pkg/config/v1/visitor.go @@ -44,6 +44,9 @@ type VisitorBaseConfig struct { // It can be less than 0, it means don't bind to the port and only receive connections redirected from // other visitors. (This is not supported for SUDP now) BindPort int `json:"bindPort,omitempty"` + + // Plugin specifies what plugin should be used. + Plugin TypedVisitorPluginOptions `json:"plugin,omitempty"` } func (c *VisitorBaseConfig) GetBaseConfig() *VisitorBaseConfig { diff --git a/pkg/config/v1/visitor_plugin.go b/pkg/config/v1/visitor_plugin.go new file mode 100644 index 00000000..5a4909bd --- /dev/null +++ b/pkg/config/v1/visitor_plugin.go @@ -0,0 +1,86 @@ +// Copyright 2025 The frp Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package v1 + +import ( + "bytes" + "encoding/json" + "errors" + "fmt" + "reflect" +) + +const ( + VisitorPluginVirtualNet = "virtual_net" +) + +var visitorPluginOptionsTypeMap = map[string]reflect.Type{ + VisitorPluginVirtualNet: reflect.TypeOf(VirtualNetVisitorPluginOptions{}), +} + +type VisitorPluginOptions interface { + Complete() +} + +type TypedVisitorPluginOptions struct { + Type string `json:"type"` + VisitorPluginOptions +} + +func (c *TypedVisitorPluginOptions) UnmarshalJSON(b []byte) error { + if len(b) == 4 && string(b) == "null" { + return nil + } + + typeStruct := struct { + Type string `json:"type"` + }{} + if err := json.Unmarshal(b, &typeStruct); err != nil { + return err + } + + c.Type = typeStruct.Type + if c.Type == "" { + return errors.New("visitor plugin type is empty") + } + + v, ok := visitorPluginOptionsTypeMap[typeStruct.Type] + if !ok { + return fmt.Errorf("unknown visitor plugin type: %s", typeStruct.Type) + } + options := reflect.New(v).Interface().(VisitorPluginOptions) + + decoder := json.NewDecoder(bytes.NewBuffer(b)) + if DisallowUnknownFields { + decoder.DisallowUnknownFields() + } + + if err := decoder.Decode(options); err != nil { + return fmt.Errorf("unmarshal VisitorPluginOptions error: %v", err) + } + c.VisitorPluginOptions = options + return nil +} + +func (c *TypedVisitorPluginOptions) MarshalJSON() ([]byte, error) { + return json.Marshal(c.VisitorPluginOptions) +} + +type VirtualNetVisitorPluginOptions struct { + Type string `json:"type"` + DestinationIP string `json:"destinationIP"` +} + +func (o *VirtualNetVisitorPluginOptions) Complete() {} diff --git a/pkg/plugin/client/http2http.go b/pkg/plugin/client/http2http.go index d7c4c7e4..5fb53ec2 100644 --- a/pkg/plugin/client/http2http.go +++ b/pkg/plugin/client/http2http.go @@ -18,9 +18,7 @@ package plugin import ( "context" - "io" stdlog "log" - "net" "net/http" "net/http/httputil" @@ -42,7 +40,7 @@ type HTTP2HTTPPlugin struct { s *http.Server } -func NewHTTP2HTTPPlugin(options v1.ClientPluginOptions) (Plugin, error) { +func NewHTTP2HTTPPlugin(_ ProxyContext, options v1.ClientPluginOptions) (Plugin, error) { opts := options.(*v1.HTTP2HTTPPluginOptions) listener := NewProxyListener() @@ -80,8 +78,8 @@ func NewHTTP2HTTPPlugin(options v1.ClientPluginOptions) (Plugin, error) { return p, nil } -func (p *HTTP2HTTPPlugin) Handle(_ context.Context, conn io.ReadWriteCloser, realConn net.Conn, _ *ExtraInfo) { - wrapConn := netpkg.WrapReadWriteCloserToConn(conn, realConn) +func (p *HTTP2HTTPPlugin) Handle(_ context.Context, connInfo *ConnectionInfo) { + wrapConn := netpkg.WrapReadWriteCloserToConn(connInfo.Conn, connInfo.UnderlyingConn) _ = p.l.PutConn(wrapConn) } diff --git a/pkg/plugin/client/http2https.go b/pkg/plugin/client/http2https.go index 66f90989..edd3e0d5 100644 --- a/pkg/plugin/client/http2https.go +++ b/pkg/plugin/client/http2https.go @@ -19,9 +19,7 @@ package plugin import ( "context" "crypto/tls" - "io" stdlog "log" - "net" "net/http" "net/http/httputil" @@ -43,7 +41,7 @@ type HTTP2HTTPSPlugin struct { s *http.Server } -func NewHTTP2HTTPSPlugin(options v1.ClientPluginOptions) (Plugin, error) { +func NewHTTP2HTTPSPlugin(_ ProxyContext, options v1.ClientPluginOptions) (Plugin, error) { opts := options.(*v1.HTTP2HTTPSPluginOptions) listener := NewProxyListener() @@ -89,8 +87,8 @@ func NewHTTP2HTTPSPlugin(options v1.ClientPluginOptions) (Plugin, error) { return p, nil } -func (p *HTTP2HTTPSPlugin) Handle(_ context.Context, conn io.ReadWriteCloser, realConn net.Conn, _ *ExtraInfo) { - wrapConn := netpkg.WrapReadWriteCloserToConn(conn, realConn) +func (p *HTTP2HTTPSPlugin) Handle(_ context.Context, connInfo *ConnectionInfo) { + wrapConn := netpkg.WrapReadWriteCloserToConn(connInfo.Conn, connInfo.UnderlyingConn) _ = p.l.PutConn(wrapConn) } diff --git a/pkg/plugin/client/http_proxy.go b/pkg/plugin/client/http_proxy.go index b7491bd1..37d9c07f 100644 --- a/pkg/plugin/client/http_proxy.go +++ b/pkg/plugin/client/http_proxy.go @@ -45,7 +45,7 @@ type HTTPProxy struct { s *http.Server } -func NewHTTPProxyPlugin(options v1.ClientPluginOptions) (Plugin, error) { +func NewHTTPProxyPlugin(_ ProxyContext, options v1.ClientPluginOptions) (Plugin, error) { opts := options.(*v1.HTTPProxyPluginOptions) listener := NewProxyListener() @@ -69,8 +69,8 @@ func (hp *HTTPProxy) Name() string { return v1.PluginHTTPProxy } -func (hp *HTTPProxy) Handle(_ context.Context, conn io.ReadWriteCloser, realConn net.Conn, _ *ExtraInfo) { - wrapConn := netpkg.WrapReadWriteCloserToConn(conn, realConn) +func (hp *HTTPProxy) Handle(_ context.Context, connInfo *ConnectionInfo) { + wrapConn := netpkg.WrapReadWriteCloserToConn(connInfo.Conn, connInfo.UnderlyingConn) sc, rd := libnet.NewSharedConn(wrapConn) firstBytes := make([]byte, 7) diff --git a/pkg/plugin/client/https2http.go b/pkg/plugin/client/https2http.go index 9632a6fb..e3314af0 100644 --- a/pkg/plugin/client/https2http.go +++ b/pkg/plugin/client/https2http.go @@ -20,9 +20,7 @@ import ( "context" "crypto/tls" "fmt" - "io" stdlog "log" - "net" "net/http" "net/http/httputil" "time" @@ -48,7 +46,7 @@ type HTTPS2HTTPPlugin struct { s *http.Server } -func NewHTTPS2HTTPPlugin(options v1.ClientPluginOptions) (Plugin, error) { +func NewHTTPS2HTTPPlugin(_ ProxyContext, options v1.ClientPluginOptions) (Plugin, error) { opts := options.(*v1.HTTPS2HTTPPluginOptions) listener := NewProxyListener() @@ -106,10 +104,10 @@ func NewHTTPS2HTTPPlugin(options v1.ClientPluginOptions) (Plugin, error) { return p, nil } -func (p *HTTPS2HTTPPlugin) Handle(_ context.Context, conn io.ReadWriteCloser, realConn net.Conn, extra *ExtraInfo) { - wrapConn := netpkg.WrapReadWriteCloserToConn(conn, realConn) - if extra.SrcAddr != nil { - wrapConn.SetRemoteAddr(extra.SrcAddr) +func (p *HTTPS2HTTPPlugin) Handle(_ context.Context, connInfo *ConnectionInfo) { + wrapConn := netpkg.WrapReadWriteCloserToConn(connInfo.Conn, connInfo.UnderlyingConn) + if connInfo.SrcAddr != nil { + wrapConn.SetRemoteAddr(connInfo.SrcAddr) } _ = p.l.PutConn(wrapConn) } diff --git a/pkg/plugin/client/https2https.go b/pkg/plugin/client/https2https.go index 8121e094..86ef7036 100644 --- a/pkg/plugin/client/https2https.go +++ b/pkg/plugin/client/https2https.go @@ -20,9 +20,7 @@ import ( "context" "crypto/tls" "fmt" - "io" stdlog "log" - "net" "net/http" "net/http/httputil" "time" @@ -48,7 +46,7 @@ type HTTPS2HTTPSPlugin struct { s *http.Server } -func NewHTTPS2HTTPSPlugin(options v1.ClientPluginOptions) (Plugin, error) { +func NewHTTPS2HTTPSPlugin(_ ProxyContext, options v1.ClientPluginOptions) (Plugin, error) { opts := options.(*v1.HTTPS2HTTPSPluginOptions) listener := NewProxyListener() @@ -112,10 +110,10 @@ func NewHTTPS2HTTPSPlugin(options v1.ClientPluginOptions) (Plugin, error) { return p, nil } -func (p *HTTPS2HTTPSPlugin) Handle(_ context.Context, conn io.ReadWriteCloser, realConn net.Conn, extra *ExtraInfo) { - wrapConn := netpkg.WrapReadWriteCloserToConn(conn, realConn) - if extra.SrcAddr != nil { - wrapConn.SetRemoteAddr(extra.SrcAddr) +func (p *HTTPS2HTTPSPlugin) Handle(_ context.Context, connInfo *ConnectionInfo) { + wrapConn := netpkg.WrapReadWriteCloserToConn(connInfo.Conn, connInfo.UnderlyingConn) + if connInfo.SrcAddr != nil { + wrapConn.SetRemoteAddr(connInfo.SrcAddr) } _ = p.l.PutConn(wrapConn) } diff --git a/pkg/plugin/client/plugin.go b/pkg/plugin/client/plugin.go index 3dce8592..c347c7ba 100644 --- a/pkg/plugin/client/plugin.go +++ b/pkg/plugin/client/plugin.go @@ -25,13 +25,18 @@ import ( pp "github.com/pires/go-proxyproto" v1 "github.com/fatedier/frp/pkg/config/v1" + "github.com/fatedier/frp/pkg/vnet" ) +type ProxyContext struct { + Name string + VnetController *vnet.Controller +} + // Creators is used for create plugins to handle connections. var creators = make(map[string]CreatorFn) -// params has prefix "plugin_" -type CreatorFn func(options v1.ClientPluginOptions) (Plugin, error) +type CreatorFn func(pxyCtx ProxyContext, options v1.ClientPluginOptions) (Plugin, error) func Register(name string, fn CreatorFn) { if _, exist := creators[name]; exist { @@ -40,16 +45,19 @@ func Register(name string, fn CreatorFn) { creators[name] = fn } -func Create(name string, options v1.ClientPluginOptions) (p Plugin, err error) { - if fn, ok := creators[name]; ok { - p, err = fn(options) +func Create(pluginName string, pxyCtx ProxyContext, options v1.ClientPluginOptions) (p Plugin, err error) { + if fn, ok := creators[pluginName]; ok { + p, err = fn(pxyCtx, options) } else { - err = fmt.Errorf("plugin [%s] is not registered", name) + err = fmt.Errorf("plugin [%s] is not registered", pluginName) } return } -type ExtraInfo struct { +type ConnectionInfo struct { + Conn io.ReadWriteCloser + UnderlyingConn net.Conn + ProxyProtocolHeader *pp.Header SrcAddr net.Addr DstAddr net.Addr @@ -58,7 +66,7 @@ type ExtraInfo struct { type Plugin interface { Name() string - Handle(ctx context.Context, conn io.ReadWriteCloser, realConn net.Conn, extra *ExtraInfo) + Handle(ctx context.Context, connInfo *ConnectionInfo) Close() error } diff --git a/pkg/plugin/client/socks5.go b/pkg/plugin/client/socks5.go index 7478ebe1..8042b9db 100644 --- a/pkg/plugin/client/socks5.go +++ b/pkg/plugin/client/socks5.go @@ -20,7 +20,6 @@ import ( "context" "io" "log" - "net" gosocks5 "github.com/armon/go-socks5" @@ -36,7 +35,7 @@ type Socks5Plugin struct { Server *gosocks5.Server } -func NewSocks5Plugin(options v1.ClientPluginOptions) (p Plugin, err error) { +func NewSocks5Plugin(_ ProxyContext, options v1.ClientPluginOptions) (p Plugin, err error) { opts := options.(*v1.Socks5PluginOptions) cfg := &gosocks5.Config{ @@ -51,9 +50,9 @@ func NewSocks5Plugin(options v1.ClientPluginOptions) (p Plugin, err error) { return } -func (sp *Socks5Plugin) Handle(_ context.Context, conn io.ReadWriteCloser, realConn net.Conn, _ *ExtraInfo) { - defer conn.Close() - wrapConn := netpkg.WrapReadWriteCloserToConn(conn, realConn) +func (sp *Socks5Plugin) Handle(_ context.Context, connInfo *ConnectionInfo) { + defer connInfo.Conn.Close() + wrapConn := netpkg.WrapReadWriteCloserToConn(connInfo.Conn, connInfo.UnderlyingConn) _ = sp.Server.ServeConn(wrapConn) } diff --git a/pkg/plugin/client/static_file.go b/pkg/plugin/client/static_file.go index 02cb9930..f3009c61 100644 --- a/pkg/plugin/client/static_file.go +++ b/pkg/plugin/client/static_file.go @@ -18,8 +18,6 @@ package plugin import ( "context" - "io" - "net" "net/http" "time" @@ -40,7 +38,7 @@ type StaticFilePlugin struct { s *http.Server } -func NewStaticFilePlugin(options v1.ClientPluginOptions) (Plugin, error) { +func NewStaticFilePlugin(_ ProxyContext, options v1.ClientPluginOptions) (Plugin, error) { opts := options.(*v1.StaticFilePluginOptions) listener := NewProxyListener() @@ -70,8 +68,8 @@ func NewStaticFilePlugin(options v1.ClientPluginOptions) (Plugin, error) { return sp, nil } -func (sp *StaticFilePlugin) Handle(_ context.Context, conn io.ReadWriteCloser, realConn net.Conn, _ *ExtraInfo) { - wrapConn := netpkg.WrapReadWriteCloserToConn(conn, realConn) +func (sp *StaticFilePlugin) Handle(_ context.Context, connInfo *ConnectionInfo) { + wrapConn := netpkg.WrapReadWriteCloserToConn(connInfo.Conn, connInfo.UnderlyingConn) _ = sp.l.PutConn(wrapConn) } diff --git a/pkg/plugin/client/tls2raw.go b/pkg/plugin/client/tls2raw.go index adcc7741..be2d32e9 100644 --- a/pkg/plugin/client/tls2raw.go +++ b/pkg/plugin/client/tls2raw.go @@ -19,7 +19,6 @@ package plugin import ( "context" "crypto/tls" - "io" "net" libio "github.com/fatedier/golib/io" @@ -40,7 +39,7 @@ type TLS2RawPlugin struct { tlsConfig *tls.Config } -func NewTLS2RawPlugin(options v1.ClientPluginOptions) (Plugin, error) { +func NewTLS2RawPlugin(_ ProxyContext, options v1.ClientPluginOptions) (Plugin, error) { opts := options.(*v1.TLS2RawPluginOptions) p := &TLS2RawPlugin{ @@ -55,10 +54,10 @@ func NewTLS2RawPlugin(options v1.ClientPluginOptions) (Plugin, error) { return p, nil } -func (p *TLS2RawPlugin) Handle(ctx context.Context, conn io.ReadWriteCloser, realConn net.Conn, _ *ExtraInfo) { +func (p *TLS2RawPlugin) Handle(ctx context.Context, connInfo *ConnectionInfo) { xl := xlog.FromContextSafe(ctx) - wrapConn := netpkg.WrapReadWriteCloserToConn(conn, realConn) + wrapConn := netpkg.WrapReadWriteCloserToConn(connInfo.Conn, connInfo.UnderlyingConn) tlsConn := tls.Server(wrapConn, p.tlsConfig) if err := tlsConn.Handshake(); err != nil { diff --git a/pkg/plugin/client/unix_domain_socket.go b/pkg/plugin/client/unix_domain_socket.go index b6aa6075..22832502 100644 --- a/pkg/plugin/client/unix_domain_socket.go +++ b/pkg/plugin/client/unix_domain_socket.go @@ -18,7 +18,6 @@ package plugin import ( "context" - "io" "net" libio "github.com/fatedier/golib/io" @@ -35,7 +34,7 @@ type UnixDomainSocketPlugin struct { UnixAddr *net.UnixAddr } -func NewUnixDomainSocketPlugin(options v1.ClientPluginOptions) (p Plugin, err error) { +func NewUnixDomainSocketPlugin(_ ProxyContext, options v1.ClientPluginOptions) (p Plugin, err error) { opts := options.(*v1.UnixDomainSocketPluginOptions) unixAddr, errRet := net.ResolveUnixAddr("unix", opts.UnixPath) @@ -50,20 +49,20 @@ func NewUnixDomainSocketPlugin(options v1.ClientPluginOptions) (p Plugin, err er return } -func (uds *UnixDomainSocketPlugin) Handle(ctx context.Context, conn io.ReadWriteCloser, _ net.Conn, extra *ExtraInfo) { +func (uds *UnixDomainSocketPlugin) Handle(ctx context.Context, connInfo *ConnectionInfo) { xl := xlog.FromContextSafe(ctx) localConn, err := net.DialUnix("unix", nil, uds.UnixAddr) if err != nil { xl.Warnf("dial to uds %s error: %v", uds.UnixAddr, err) return } - if extra.ProxyProtocolHeader != nil { - if _, err := extra.ProxyProtocolHeader.WriteTo(localConn); err != nil { + if connInfo.ProxyProtocolHeader != nil { + if _, err := connInfo.ProxyProtocolHeader.WriteTo(localConn); err != nil { return } } - libio.Join(localConn, conn) + libio.Join(localConn, connInfo.Conn) } func (uds *UnixDomainSocketPlugin) Name() string { diff --git a/pkg/plugin/client/virtual_net.go b/pkg/plugin/client/virtual_net.go new file mode 100644 index 00000000..c37046a6 --- /dev/null +++ b/pkg/plugin/client/virtual_net.go @@ -0,0 +1,71 @@ +// Copyright 2025 The frp Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build !frps + +package plugin + +import ( + "context" + + v1 "github.com/fatedier/frp/pkg/config/v1" + "github.com/fatedier/frp/pkg/util/xlog" +) + +func init() { + Register(v1.PluginVirtualNet, NewVirtualNetPlugin) +} + +type VirtualNetPlugin struct { + pxyCtx ProxyContext + opts *v1.VirtualNetPluginOptions +} + +func NewVirtualNetPlugin(pxyCtx ProxyContext, options v1.ClientPluginOptions) (Plugin, error) { + opts := options.(*v1.VirtualNetPluginOptions) + + p := &VirtualNetPlugin{ + pxyCtx: pxyCtx, + opts: opts, + } + return p, nil +} + +func (p *VirtualNetPlugin) Handle(ctx context.Context, connInfo *ConnectionInfo) { + xl := xlog.FromContextSafe(ctx) + + // Verify if virtual network controller is available + if p.pxyCtx.VnetController == nil { + return + } + + // Register the connection with the controller + routeName := p.pxyCtx.Name + err := p.pxyCtx.VnetController.RegisterServerConn(routeName, connInfo.Conn) + if err != nil { + xl.Errorf("virtual net failed to register server connection: %v", err) + return + } +} + +func (p *VirtualNetPlugin) Name() string { + return v1.PluginVirtualNet +} + +func (p *VirtualNetPlugin) Close() error { + if p.pxyCtx.VnetController != nil { + p.pxyCtx.VnetController.UnregisterServerConn(p.pxyCtx.Name) + } + return nil +} diff --git a/pkg/plugin/visitor/plugin.go b/pkg/plugin/visitor/plugin.go new file mode 100644 index 00000000..25751c2d --- /dev/null +++ b/pkg/plugin/visitor/plugin.go @@ -0,0 +1,58 @@ +// Copyright 2025 The frp Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package visitor + +import ( + "context" + "fmt" + "net" + + v1 "github.com/fatedier/frp/pkg/config/v1" + "github.com/fatedier/frp/pkg/vnet" +) + +type VisitorContext struct { + Name string + Ctx context.Context + VnetController *vnet.Controller + HandleConn func(net.Conn) +} + +// Creators is used for create plugins to handle connections. +var creators = make(map[string]CreatorFn) + +type CreatorFn func(visitorCtx VisitorContext, options v1.VisitorPluginOptions) (Plugin, error) + +func Register(name string, fn CreatorFn) { + if _, exist := creators[name]; exist { + panic(fmt.Sprintf("plugin [%s] is already registered", name)) + } + creators[name] = fn +} + +func Create(pluginName string, visitorCtx VisitorContext, options v1.VisitorPluginOptions) (p Plugin, err error) { + if fn, ok := creators[pluginName]; ok { + p, err = fn(visitorCtx, options) + } else { + err = fmt.Errorf("plugin [%s] is not registered", pluginName) + } + return +} + +type Plugin interface { + Name() string + Start() + Close() error +} diff --git a/pkg/plugin/visitor/virtual_net.go b/pkg/plugin/visitor/virtual_net.go new file mode 100644 index 00000000..1da7509b --- /dev/null +++ b/pkg/plugin/visitor/virtual_net.go @@ -0,0 +1,234 @@ +// Copyright 2025 The frp Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build !frps + +package visitor + +import ( + "context" + "errors" + "fmt" + "net" + "sync" + "time" + + v1 "github.com/fatedier/frp/pkg/config/v1" + netutil "github.com/fatedier/frp/pkg/util/net" + "github.com/fatedier/frp/pkg/util/xlog" +) + +func init() { + Register(v1.VisitorPluginVirtualNet, NewVirtualNetPlugin) +} + +type VirtualNetPlugin struct { + visitorCtx VisitorContext + + routes []net.IPNet + + mu sync.Mutex + controllerConn net.Conn + closeSignal chan struct{} + + pluginCtx context.Context + pluginCancel context.CancelFunc +} + +func NewVirtualNetPlugin(visitorCtx VisitorContext, options v1.VisitorPluginOptions) (Plugin, error) { + opts := options.(*v1.VirtualNetVisitorPluginOptions) + + p := &VirtualNetPlugin{ + visitorCtx: visitorCtx, + routes: make([]net.IPNet, 0), + } + + p.pluginCtx, p.pluginCancel = context.WithCancel(visitorCtx.Ctx) + + if opts.DestinationIP == "" { + return nil, errors.New("destinationIP is required") + } + + // Parse DestinationIP as a single IP and create a host route + ip := net.ParseIP(opts.DestinationIP) + if ip == nil { + return nil, fmt.Errorf("invalid destination IP address [%s]", opts.DestinationIP) + } + + var mask net.IPMask + if ip.To4() != nil { + mask = net.CIDRMask(32, 32) // /32 for IPv4 + } else { + mask = net.CIDRMask(128, 128) // /128 for IPv6 + } + p.routes = append(p.routes, net.IPNet{IP: ip, Mask: mask}) + + return p, nil +} + +func (p *VirtualNetPlugin) Name() string { + return v1.VisitorPluginVirtualNet +} + +func (p *VirtualNetPlugin) Start() { + xl := xlog.FromContextSafe(p.visitorCtx.Ctx) + if p.visitorCtx.VnetController == nil { + return + } + + routeStr := "unknown" + if len(p.routes) > 0 { + routeStr = p.routes[0].String() + } + xl.Infof("Starting VirtualNetPlugin for visitor [%s], attempting to register routes for %s", p.visitorCtx.Name, routeStr) + + go p.run() +} + +func (p *VirtualNetPlugin) run() { + xl := xlog.FromContextSafe(p.pluginCtx) + reconnectDelay := 10 * time.Second + + for { + // Create a signal channel for this connection attempt + currentCloseSignal := make(chan struct{}) + + // Store the signal channel under lock + p.mu.Lock() + p.closeSignal = currentCloseSignal + p.mu.Unlock() + + select { + case <-p.pluginCtx.Done(): + xl.Infof("VirtualNetPlugin run loop for visitor [%s] stopping (context cancelled before pipe creation).", p.visitorCtx.Name) + // Ensure controllerConn from previous loop is cleaned up if necessary + p.cleanupControllerConn(xl) + return + default: + } + + controllerConn, pluginConn := net.Pipe() + + // Store controllerConn under lock for cleanup purposes + p.mu.Lock() + p.controllerConn = controllerConn + p.mu.Unlock() + + // Wrap pluginConn using CloseNotifyConn + pluginNotifyConn := netutil.WrapCloseNotifyConn(pluginConn, func() { + close(currentCloseSignal) // Signal the run loop + }) + + xl.Infof("Attempting to register client route for visitor [%s]", p.visitorCtx.Name) + err := p.visitorCtx.VnetController.RegisterClientRoute(p.visitorCtx.Name, p.routes, controllerConn) + if err != nil { + xl.Errorf("Failed to register client route for visitor [%s]: %v. Retrying after %v", p.visitorCtx.Name, err, reconnectDelay) + p.cleanupPipePair(xl, controllerConn, pluginConn) // Close both ends on registration failure + + // Wait before retrying registration, unless context is cancelled + select { + case <-time.After(reconnectDelay): + continue // Retry the loop + case <-p.pluginCtx.Done(): + xl.Infof("VirtualNetPlugin registration retry wait interrupted for visitor [%s]", p.visitorCtx.Name) + return // Exit loop if context is cancelled during wait + } + } + + xl.Infof("Successfully registered client route for visitor [%s]. Starting connection handler with CloseNotifyConn.", p.visitorCtx.Name) + + // Pass the CloseNotifyConn to HandleConn. + // HandleConn is responsible for calling Close() on pluginNotifyConn. + p.visitorCtx.HandleConn(pluginNotifyConn) + + // Wait for either the plugin context to be cancelled or the wrapper's Close() to be called via the signal channel. + select { + case <-p.pluginCtx.Done(): + xl.Infof("VirtualNetPlugin run loop stopping for visitor [%s] (context cancelled while waiting).", p.visitorCtx.Name) + // Context cancelled, ensure controller side is closed if HandleConn didn't close its side yet. + p.cleanupControllerConn(xl) + return + case <-currentCloseSignal: + xl.Infof("Detected connection closed via CloseNotifyConn for visitor [%s].", p.visitorCtx.Name) + // HandleConn closed the plugin side (pluginNotifyConn). The closeFn was called, closing currentCloseSignal. + // We still need to close the controller side. + p.cleanupControllerConn(xl) + + // Add a delay before attempting to reconnect, respecting context cancellation. + xl.Infof("Waiting %v before attempting reconnection for visitor [%s]...", reconnectDelay, p.visitorCtx.Name) + select { + case <-time.After(reconnectDelay): + // Delay completed, loop will continue. + case <-p.pluginCtx.Done(): + xl.Infof("VirtualNetPlugin reconnection delay interrupted for visitor [%s]", p.visitorCtx.Name) + return // Exit loop if context is cancelled during wait + } + // Loop will continue to reconnect. + } + + // Loop will restart, context check at the beginning of the loop is sufficient. + xl.Infof("Re-establishing virtual connection for visitor [%s]...", p.visitorCtx.Name) + } +} + +// cleanupControllerConn closes the current controllerConn (if it exists) under lock. +func (p *VirtualNetPlugin) cleanupControllerConn(xl *xlog.Logger) { + p.mu.Lock() + defer p.mu.Unlock() + if p.controllerConn != nil { + xl.Debugf("Cleaning up controllerConn for visitor [%s]", p.visitorCtx.Name) + p.controllerConn.Close() + p.controllerConn = nil + } else { + // xl.Debugf("No active controllerConn to cleanup for visitor [%s]", p.visitorCtx.Name) + } + // Also clear the closeSignal reference for the completed/cancelled connection attempt + p.closeSignal = nil +} + +// cleanupPipePair closes both ends of a pipe, used typically when registration fails. +func (p *VirtualNetPlugin) cleanupPipePair(xl *xlog.Logger, controllerConn, pluginConn net.Conn) { + xl.Debugf("Cleaning up pipe pair for visitor [%s] after registration failure", p.visitorCtx.Name) + controllerConn.Close() + pluginConn.Close() + p.mu.Lock() + p.controllerConn = nil // Ensure field is nil if it was briefly set + p.closeSignal = nil // Ensure field is nil if it was briefly set + p.mu.Unlock() +} + +// Close initiates the plugin shutdown. +func (p *VirtualNetPlugin) Close() error { + xl := xlog.FromContextSafe(p.visitorCtx.Ctx) // Use base context for close logging + xl.Infof("Closing VirtualNetPlugin for visitor [%s]", p.visitorCtx.Name) + + // 1. Signal the run loop goroutine to stop via context cancellation. + p.pluginCancel() + + // 2. Unregister the route from the controller. + // This might implicitly cause the VnetController to close its end of the pipe (controllerConn). + if p.visitorCtx.VnetController != nil { + p.visitorCtx.VnetController.UnregisterClientRoute(p.visitorCtx.Name) + xl.Infof("Unregistered client route for visitor [%s]", p.visitorCtx.Name) + } else { + xl.Warnf("VnetController is nil during close for visitor [%s], cannot unregister route", p.visitorCtx.Name) + } + + // 3. Explicitly close the controller side of the pipe managed by this plugin. + // This ensures the pipe is broken even if the run loop is stuck or HandleConn hasn't closed its end. + p.cleanupControllerConn(xl) + xl.Infof("Finished cleaning up connections during close for visitor [%s]", p.visitorCtx.Name) + + return nil +} diff --git a/pkg/vnet/README.md b/pkg/vnet/README.md new file mode 100644 index 00000000..8f30f74c --- /dev/null +++ b/pkg/vnet/README.md @@ -0,0 +1,94 @@ +# Virtual Network Controller for FRP + +This package implements a virtual network controller that enables VPN-like functionality between FRP clients and servers. + +## Overview + +The Virtual Network Controller manages TUN devices and routing logic to create secure tunnels between clients and servers. It supports both client-side routing (based on destination IP) and server-side routing (based on source IP). + +## How it Works + +1. **Client Side**: Routes packets based on destination IP addresses. When a packet is destined for a network that matches a registered route, it's forwarded through the appropriate work connection. + +2. **Server Side**: Routes packets based on source IP addresses. When a packet is received over a connection, its source IP is registered with that connection. Future packets from the TUN device with that source IP will be routed back through the same connection. + +3. **TUN Device**: A virtual network interface that captures and injects packets at the IP layer. + +## Usage + +### Client Implementation + +```go +// Set up VNet configuration +clientCfg := v1.VirtualNetConfig{ + Address: "10.10.0.1/24", + Routes: []string{"10.20.0.0/24"}, // Routes to the server's network +} + +// Create and initialize controller +controller := vnet.NewController(clientCfg) +if err := controller.Init(); err != nil { + // Handle error +} + +// Parse route strings to IPNet objects +routes, err := vnet.ParseRoutes(clientCfg.Routes) +if err != nil { + // Handle error +} + +// Register work connection with routes +if err := controller.RegisterClientRoute("server1", routes, workConn); err != nil { + // Handle error +} + +// Start the controller (typically in a goroutine) +go func() { + if err := controller.Run(); err != nil { + // Handle error + } +}() +``` + +### Server Implementation + +```go +// Set up VNet configuration +serverCfg := v1.VirtualNetConfig{ + Address: "10.20.0.1/24", +} + +// Create and initialize controller +controller := vnet.NewController(serverCfg) +if err := controller.Init(); err != nil { + // Handle error +} + +// Register client work connection +if err := controller.RegisterServerConn("client1", workConn); err != nil { + // Handle error +} + +// Start the controller (typically in a goroutine) +go func() { + if err := controller.Run(); err != nil { + // Handle error + } +}() +``` + +## Network Setup + +For proper routing, you'll need to: + +1. Configure the TUN device with appropriate IP addresses +2. Add routes to your routing table for traffic that should go through the VPN +3. Enable IP forwarding if needed + +See the examples directory for a complete implementation example. + +## Security Considerations + +- The VNet controller operates at the IP layer, so all traffic is routed based on IP addresses +- Encryption should be handled at the transport layer (e.g., by the FRP connection) +- Consider using firewall rules to restrict access to the TUN device \ No newline at end of file diff --git a/pkg/vnet/controller.go b/pkg/vnet/controller.go new file mode 100644 index 00000000..c371f523 --- /dev/null +++ b/pkg/vnet/controller.go @@ -0,0 +1,363 @@ +// Copyright 2025 The frp Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package vnet + +import ( + "encoding/base64" + "fmt" + "io" + "net" + "sync" + + "github.com/songgao/water/waterutil" + "golang.org/x/net/ipv4" + "golang.org/x/net/ipv6" + + v1 "github.com/fatedier/frp/pkg/config/v1" + "github.com/fatedier/frp/pkg/util/log" +) + +const ( + maxPacketSize = 1350 // Maximum TUN packet size +) + +type Controller struct { + addr string + + tun io.ReadWriteCloser + clientRouter *clientRouter // Route based on destination IP (client mode) + serverRouter *serverRouter // Route based on source IP (server mode) +} + +func NewController(cfg v1.VirtualNetConfig) *Controller { + return &Controller{ + addr: cfg.Address, + clientRouter: newClientRouter(), + serverRouter: newServerRouter(), + } +} + +func (c *Controller) Init() error { + conn, _, _, err := createTun(c.addr) + if err != nil { + return err + } + c.tun = conn + return nil +} + +func (c *Controller) Run() error { + conn := c.tun + + for { + buf := make([]byte, maxPacketSize) + n, err := conn.Read(buf) + if err != nil { + log.Warnf("vnet read from tun error: %v", err) + return err + } + log.Tracef("vnet read from tun [%d]: %s", n, base64.StdEncoding.EncodeToString(buf[:n])) + + var src, dst net.IP + if waterutil.IsIPv4(buf[:n]) { + header, err := ipv4.ParseHeader(buf[:n]) + if err != nil { + log.Warnf("parse ipv4 header error:", err) + continue + } + src = header.Src + dst = header.Dst + log.Infof("%s >> %s %d/%-4d %-4x %d", + header.Src, header.Dst, + header.Len, header.TotalLen, header.ID, header.Flags) + } else if waterutil.IsIPv6(buf[:n]) { + header, err := ipv6.ParseHeader(buf[:n]) + if err != nil { + log.Warnf("parse ipv6 header error:", err) + continue + } + src = header.Src + dst = header.Dst + log.Infof("%s >> %s %d %d", + header.Src, header.Dst, + header.PayloadLen, header.TrafficClass) + } else { + log.Warnf("unknown packet, discarded(%d)", n) + continue + } + + // 1. First try to route based on destination IP (client mode) + targetConn, err := c.clientRouter.findConn(dst) + if err == nil { + // Found matching destination route, sending data + if _, err := targetConn.Write(buf[:n]); err != nil { + log.Warnf("write to client target conn error: %v", err) + } + continue + } + + // 2. If client routing fails, try routing based on source IP (server mode) + targetConn, err = c.serverRouter.findConnBySrc(dst) + if err == nil { + // Found matching source route, sending data + if _, err := targetConn.Write(buf[:n]); err != nil { + log.Warnf("write to server target conn error: %v", err) + } + continue + } + + // 3. No matching route found + log.Warnf("no route found for packet from %s to %s", src, dst) + } +} + +func (c *Controller) Stop() error { + return c.tun.Close() +} + +// Client connection read loop +func (c *Controller) readLoopClient(conn io.ReadWriteCloser) { + defer func() { + // TODO + // unregister client route + }() + + for { + // Read fixed size packet + buf := make([]byte, maxPacketSize) + // TODO: custom message format + n, err := conn.Read(buf) + if err != nil { + log.Warnf("client read error: %v", err) + return + } + + if n == 0 { + continue + } + + if waterutil.IsIPv4(buf[:n]) { + header, err := ipv4.ParseHeader(buf[:n]) + if err != nil { + log.Warnf("parse ipv4 header error: %v", err) + continue + } + log.Infof("%s >> %s %d/%-4d %-4x %d", + header.Src, header.Dst, + header.Len, header.TotalLen, header.ID, header.Flags) + } else if waterutil.IsIPv6(buf[:n]) { + header, err := ipv6.ParseHeader(buf[:n]) + if err != nil { + log.Warnf("parse ipv6 header error: %v", err) + continue + } + log.Infof("%s >> %s %d %d", + header.Src, header.Dst, + header.PayloadLen, header.TrafficClass) + } else { + log.Warnf("unknown packet, discarded(%d)", n) + continue + } + + // Write to TUN device + log.Tracef("vnet write to tun (client) [%d]: %s", n, base64.StdEncoding.EncodeToString(buf[:n])) + _, err = c.tun.Write(buf[:n]) + if err != nil { + log.Warnf("client write tun error: %v", err) + } + } +} + +// Server connection read loop +func (c *Controller) readLoopServer(conn io.ReadWriteCloser) { + for { + // Read fixed size packet + buf := make([]byte, maxPacketSize) + // TODO: custom message format + n, err := conn.Read(buf) + if err != nil { + log.Warnf("server read error: %v", err) + return + } + + if n == 0 { + continue + } + + // Register source IP to connection mapping + if waterutil.IsIPv4(buf[:n]) || waterutil.IsIPv6(buf[:n]) { + var src net.IP + if waterutil.IsIPv4(buf[:n]) { + header, err := ipv4.ParseHeader(buf[:n]) + if err == nil { + src = header.Src + c.serverRouter.registerSrcIP(src, conn) + } + } else if waterutil.IsIPv6(buf[:n]) { + header, err := ipv6.ParseHeader(buf[:n]) + if err == nil { + src = header.Src + c.serverRouter.registerSrcIP(src, conn) + } + } + } + + // Write to TUN + log.Tracef("vnet write to tun (server) [%d]: %s", n, base64.StdEncoding.EncodeToString(buf[:n])) + _, err = c.tun.Write(buf[:n]) + if err != nil { + log.Warnf("server write tun error: %v", err) + } + } +} + +// RegisterClientRoute Register client route (based on destination IP CIDR) +func (c *Controller) RegisterClientRoute(name string, routes []net.IPNet, conn io.ReadWriteCloser) error { + if err := c.clientRouter.addRoute(name, routes, conn); err != nil { + return err + } + go c.readLoopClient(conn) + return nil +} + +// RegisterServerConn Register server connection (dynamically associates with source IPs) +func (c *Controller) RegisterServerConn(name string, conn io.ReadWriteCloser) error { + if err := c.serverRouter.addConn(name, conn); err != nil { + return err + } + go c.readLoopServer(conn) + return nil +} + +// UnregisterServerConn Remove server connection from routing table +func (c *Controller) UnregisterServerConn(name string) { + c.serverRouter.delConn(name) +} + +// UnregisterClientRoute Remove client route from routing table +func (c *Controller) UnregisterClientRoute(name string) { + c.clientRouter.delRoute(name) +} + +// ParseRoutes Convert route strings to IPNet objects +func ParseRoutes(routeStrings []string) ([]net.IPNet, error) { + routes := make([]net.IPNet, 0, len(routeStrings)) + for _, r := range routeStrings { + _, ipNet, err := net.ParseCIDR(r) + if err != nil { + return nil, fmt.Errorf("parse route %s error: %v", r, err) + } + routes = append(routes, *ipNet) + } + return routes, nil +} + +// Client router (based on destination IP routing) +type clientRouter struct { + routes map[string]*routeElement + mu sync.RWMutex +} + +func newClientRouter() *clientRouter { + return &clientRouter{ + routes: make(map[string]*routeElement), + } +} + +func (r *clientRouter) addRoute(name string, routes []net.IPNet, conn io.ReadWriteCloser) error { + r.mu.Lock() + defer r.mu.Unlock() + r.routes[name] = &routeElement{ + name: name, + routes: routes, + conn: conn, + } + return nil +} + +func (r *clientRouter) findConn(dst net.IP) (io.ReadWriteCloser, error) { + r.mu.RLock() + defer r.mu.RUnlock() + for _, re := range r.routes { + for _, route := range re.routes { + if route.Contains(dst) { + return re.conn, nil + } + } + } + return nil, fmt.Errorf("no route found for destination %s", dst) +} + +func (r *clientRouter) delRoute(name string) { + r.mu.Lock() + defer r.mu.Unlock() + delete(r.routes, name) +} + +// Server router (based on source IP routing) +type serverRouter struct { + namedConns map[string]io.ReadWriteCloser // Name to connection mapping + srcIPConns map[string]io.ReadWriteCloser // Source IP string to connection mapping + mu sync.RWMutex +} + +func newServerRouter() *serverRouter { + return &serverRouter{ + namedConns: make(map[string]io.ReadWriteCloser), + srcIPConns: make(map[string]io.ReadWriteCloser), + } +} + +func (r *serverRouter) addConn(name string, conn io.ReadWriteCloser) error { + r.mu.Lock() + original, ok := r.namedConns[name] + r.namedConns[name] = conn + r.mu.Unlock() + if ok { + // Close the original connection if it exists + _ = original.Close() + } + return nil +} + +func (r *serverRouter) findConnBySrc(src net.IP) (io.ReadWriteCloser, error) { + r.mu.RLock() + defer r.mu.RUnlock() + conn, exists := r.srcIPConns[src.String()] + if !exists { + return nil, fmt.Errorf("no route found for source %s", src) + } + return conn, nil +} + +func (r *serverRouter) registerSrcIP(src net.IP, conn io.ReadWriteCloser) { + r.mu.Lock() + defer r.mu.Unlock() + r.srcIPConns[src.String()] = conn +} + +func (r *serverRouter) delConn(name string) { + r.mu.Lock() + defer r.mu.Unlock() + delete(r.namedConns, name) + // Note: We don't delete mappings from srcIPConns because we don't know which source IPs are associated with this connection + // This might cause dangling references, but they will be overwritten on new connections or restart +} + +type routeElement struct { + name string + routes []net.IPNet + conn io.ReadWriteCloser +} diff --git a/pkg/vnet/examples/client.go.bak b/pkg/vnet/examples/client.go.bak new file mode 100644 index 00000000..28a636f6 --- /dev/null +++ b/pkg/vnet/examples/client.go.bak @@ -0,0 +1,122 @@ +// Copyright 2025 The frp Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "flag" + "fmt" + "net" + "os" + "os/signal" + "syscall" + "time" + + v1 "github.com/fatedier/frp/pkg/config/v1" + "github.com/fatedier/frp/pkg/vnet" +) + +func main() { + // Parse command-line flags + serverAddr := flag.String("server", "localhost:8000", "Server address (ip:port)") + clientTUNAddr := flag.String("tun", "10.10.0.1/24", "Client TUN device address") + serverTUNNet := flag.String("server-net", "10.20.0.0/24", "Server TUN network for routing") + flag.Parse() + + fmt.Printf("Connecting to server at %s\n", *serverAddr) + fmt.Printf("Client TUN address: %s\n", *clientTUNAddr) + fmt.Printf("Server network route: %s\n", *serverTUNNet) + + // Set up vnet controller on the client + clientCfg := v1.VirtualNetConfig{ + Address: *clientTUNAddr, + Routes: []string{*serverTUNNet}, + } + clientController := vnet.NewController(clientCfg) + if err := clientController.Init(); err != nil { + fmt.Printf("Client init error: %v\n", err) + return + } + + // Handle shutdown gracefully + c := make(chan os.Signal, 1) + signal.Notify(c, syscall.SIGINT, syscall.SIGTERM) + + go func() { + <-c + fmt.Println("\nShutting down client...") + os.Exit(0) + }() + + // Start the controller + go func() { + if err := clientController.Run(); err != nil { + fmt.Printf("Client run error: %v\n", err) + } + }() + + // Main connection loop + for { + // Connect to the server + conn, err := net.Dial("tcp", *serverAddr) + if err != nil { + fmt.Printf("Failed to connect to server %s: %v\n", *serverAddr, err) + time.Sleep(5 * time.Second) + continue + } + + fmt.Printf("Connected to server at %s\n", conn.RemoteAddr()) + + // Wrap the connection + rwc := &readWriteCloserConn{conn} + + // Parse routes from config + clientRoutes, err := vnet.ParseRoutes(clientCfg.Routes) + if err != nil { + fmt.Printf("Parse client routes error: %v\n", err) + conn.Close() + return + } + + // Register the route with the connection + if err := clientController.RegisterClientRoute("server", clientRoutes, rwc); err != nil { + fmt.Printf("Register client route error: %v\n", err) + conn.Close() + time.Sleep(5 * time.Second) + continue + } + + fmt.Println("VPN tunnel established with server") + fmt.Printf("Client TUN: %s, routes: %s\n", *clientTUNAddr, *serverTUNNet) + + time.Sleep(60 * time.Second) + } +} + +// readWriteCloserConn wraps net.Conn as io.ReadWriteCloser +type readWriteCloserConn struct { + net.Conn +} + +func (rwc *readWriteCloserConn) Read(p []byte) (n int, err error) { + return rwc.Conn.Read(p) +} + +func (rwc *readWriteCloserConn) Write(p []byte) (n int, err error) { + return rwc.Conn.Write(p) +} + +func (rwc *readWriteCloserConn) Close() error { + return rwc.Conn.Close() +} diff --git a/pkg/vnet/examples/server.go.bak b/pkg/vnet/examples/server.go.bak new file mode 100644 index 00000000..93100e24 --- /dev/null +++ b/pkg/vnet/examples/server.go.bak @@ -0,0 +1,114 @@ +// Copyright 2025 The frp Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "flag" + "fmt" + "net" + "os" + "os/signal" + "syscall" + + v1 "github.com/fatedier/frp/pkg/config/v1" + "github.com/fatedier/frp/pkg/vnet" +) + +func main() { + // Parse command-line flags + listenAddr := flag.String("listen", ":8000", "Server listen address (ip:port)") + serverTUNAddr := flag.String("tun", "10.20.0.1/24", "Server TUN device address") + flag.Parse() + + // Listen for incoming connections + listener, err := net.Listen("tcp", *listenAddr) + if err != nil { + fmt.Printf("Failed to listen on %s: %v\n", *listenAddr, err) + return + } + defer listener.Close() + + fmt.Printf("Server listening on %s\n", *listenAddr) + fmt.Printf("Server TUN address: %s\n", *serverTUNAddr) + + // Set up vnet controller on the server + serverCfg := v1.VirtualNetConfig{ + Address: *serverTUNAddr, // Server TUN device address + } + serverController := vnet.NewController(serverCfg) + if err := serverController.Init(); err != nil { + fmt.Printf("Server init error: %v\n", err) + return + } + + // Start the controller + go func() { + if err := serverController.Run(); err != nil { + fmt.Printf("Server run error: %v\n", err) + } + }() + + // Handle shutdown gracefully + c := make(chan os.Signal, 1) + signal.Notify(c, syscall.SIGINT, syscall.SIGTERM) + + go func() { + <-c + fmt.Println("\nShutting down server...") + os.Exit(0) + }() + + fmt.Println("Waiting for client connection...") + // Accept and handle connections + for { + conn, err := listener.Accept() + if err != nil { + fmt.Printf("Error accepting connection: %v\n", err) + continue + } + + fmt.Printf("New client connected from %s\n", conn.RemoteAddr()) + + // Wrap the connection + rwc := &readWriteCloserConn{conn} + + // Register the connection + if err := serverController.RegisterServerConn("client", rwc); err != nil { + fmt.Printf("Register server connection error: %v\n", err) + conn.Close() + continue + } + + fmt.Println("VPN tunnel established with client") + fmt.Printf("Server TUN: %s\n", *serverTUNAddr) + } +} + +// readWriteCloserConn wraps net.Conn as io.ReadWriteCloser +type readWriteCloserConn struct { + net.Conn +} + +func (rwc *readWriteCloserConn) Read(p []byte) (n int, err error) { + return rwc.Conn.Read(p) +} + +func (rwc *readWriteCloserConn) Write(p []byte) (n int, err error) { + return rwc.Conn.Write(p) +} + +func (rwc *readWriteCloserConn) Close() error { + return rwc.Conn.Close() +} diff --git a/pkg/vnet/tun.go b/pkg/vnet/tun.go new file mode 100644 index 00000000..3976d344 --- /dev/null +++ b/pkg/vnet/tun.go @@ -0,0 +1,73 @@ +// Copyright 2025 The frp Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package vnet + +import ( + "io" + + "golang.zx2c4.com/wireguard/tun" +) + +const ( + offset = 16 +) + +type tunDevice struct { + dev tun.Device +} + +func (d *tunDevice) Read(p []byte) (int, error) { + buf := make([]byte, len(p)+offset) + + sz := make([]int, 1) + + n, err := d.dev.Read([][]byte{buf}, sz, offset) + if err != nil { + return 0, err + } + if n == 0 { + return 0, io.EOF + } + + dataSize := sz[0] + if dataSize > len(p) { + dataSize = len(p) + } + copy(p, buf[offset:offset+dataSize]) + return dataSize, nil +} + +func (d *tunDevice) Write(p []byte) (int, error) { + buf := make([]byte, len(p)+offset) + copy(buf[offset:], p) + return d.dev.Write([][]byte{buf}, offset) +} + +func (d *tunDevice) Close() error { + return d.dev.Close() +} + +func createTunDevice(adviceName string, mtu int) (io.ReadWriteCloser, string, error) { + ifce, err := tun.CreateTUN(adviceName, mtu) + if err != nil { + return nil, "", err + } + + name, err := ifce.Name() + if err != nil { + return nil, "", err + } + return &tunDevice{dev: ifce}, name, nil +} diff --git a/pkg/vnet/tun_darwin.go b/pkg/vnet/tun_darwin.go new file mode 100644 index 00000000..058771ae --- /dev/null +++ b/pkg/vnet/tun_darwin.go @@ -0,0 +1,84 @@ +// Copyright 2025 The frp Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package vnet + +import ( + "fmt" + "io" + "net" + "os/exec" + "strings" +) + +const ( + defaultTunName = "utun" + defaultMTU = 1420 +) + +func createTun(addr string) (io.ReadWriteCloser, string, net.IP, error) { + ifce, name, err := createTunDevice(defaultTunName, defaultMTU) + if err != nil { + return nil, "", nil, err + } + + ip, ipNet, err := net.ParseCIDR(addr) + if err != nil { + return nil, "", nil, err + } + + // Calculate a peer IP for the point-to-point tunnel + peerIP := generatePeerIP(ip) + + // Configure the interface with proper point-to-point addressing + cmd := fmt.Sprintf("ifconfig %s inet %s %s mtu %d up", + name, ip.String(), peerIP.String(), defaultMTU) + args := strings.Split(cmd, " ") + if err = exec.Command(args[0], args[1:]...).Run(); err != nil { + return nil, "", nil, err + } + + // Add default route for the tunnel subnet + routes := []net.IPNet{*ipNet} + if err = addRoutes(name, routes); err != nil { + return nil, "", nil, err + } + + return ifce, name, ip, nil +} + +// generatePeerIP creates a peer IP for the point-to-point tunnel +// by incrementing the last octet of the IP +func generatePeerIP(ip net.IP) net.IP { + // Make a copy to avoid modifying the original + peerIP := make(net.IP, len(ip)) + copy(peerIP, ip) + + // Increment the last octet + peerIP[len(peerIP)-1]++ + + return peerIP +} + +// addRoutes configures system routes for the TUN interface +func addRoutes(ifName string, routes []net.IPNet) error { + for _, route := range routes { + cmd := fmt.Sprintf("route add -net %s -interface %s", route.String(), ifName) + args := strings.Split(cmd, " ") + if err := exec.Command(args[0], args[1:]...).Run(); err != nil { + return err + } + } + return nil +} diff --git a/pkg/vnet/tun_linux.go b/pkg/vnet/tun_linux.go new file mode 100644 index 00000000..0eded094 --- /dev/null +++ b/pkg/vnet/tun_linux.go @@ -0,0 +1,78 @@ +// Copyright 2025 The frp Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package vnet + +import ( + "fmt" + "io" + "net" + + "github.com/vishvananda/netlink" +) + +const ( + defaultTunName = "utun" + defaultMTU = 1420 +) + +func createTun(addr string) (io.ReadWriteCloser, string, net.IP, error) { + dev, name, err := createTunDevice(defaultTunName, defaultMTU) + if err != nil { + return nil, "", nil, err + } + + ifce, err := net.InterfaceByName(name) + if err != nil { + return nil, "", nil, err + } + + link, err := netlink.LinkByName(name) + if err != nil { + return nil, "", nil, err + } + + ip, cidr, err := net.ParseCIDR(addr) + if err != nil { + return nil, "", nil, err + } + if err := netlink.AddrAdd(link, &netlink.Addr{ + IPNet: &net.IPNet{ + IP: ip, + Mask: cidr.Mask, + }, + }); err != nil { + return nil, "", nil, err + } + + if err := netlink.LinkSetUp(link); err != nil { + return nil, "", nil, err + } + + if err = addRoutes(ifce, cidr); err != nil { + return nil, "", nil, err + } + return dev, name, ip, nil +} + +func addRoutes(ifce *net.Interface, cidr *net.IPNet) error { + r := netlink.Route{ + Dst: cidr, + LinkIndex: ifce.Index, + } + if err := netlink.RouteReplace(&r); err != nil { + return fmt.Errorf("add route to %v error: %v", r.Dst, err) + } + return nil +}