virtual-net: initial (#4751)

This commit is contained in:
fatedier
2025-04-16 16:05:54 +08:00
committed by GitHub
parent 773169e0c4
commit a78814a2e9
48 changed files with 1822 additions and 180 deletions

View File

@@ -29,6 +29,7 @@ import (
netpkg "github.com/fatedier/frp/pkg/util/net"
"github.com/fatedier/frp/pkg/util/wait"
"github.com/fatedier/frp/pkg/util/xlog"
"github.com/fatedier/frp/pkg/vnet"
)
type SessionContext struct {
@@ -46,6 +47,8 @@ type SessionContext struct {
AuthSetter auth.Setter
// Connector is used to create new connections, which could be real TCP connections or virtual streams.
Connector Connector
// Virtual net controller
VnetController *vnet.Controller
}
type Control struct {
@@ -99,8 +102,9 @@ func NewControl(ctx context.Context, sessionCtx *SessionContext) (*Control, erro
ctl.registerMsgHandlers()
ctl.msgTransporter = transport.NewMessageTransporter(ctl.msgDispatcher.SendChannel())
ctl.pm = proxy.NewManager(ctl.ctx, sessionCtx.Common, ctl.msgTransporter)
ctl.vm = visitor.NewManager(ctl.ctx, sessionCtx.RunID, sessionCtx.Common, ctl.connectServer, ctl.msgTransporter)
ctl.pm = proxy.NewManager(ctl.ctx, sessionCtx.Common, ctl.msgTransporter, sessionCtx.VnetController)
ctl.vm = visitor.NewManager(ctl.ctx, sessionCtx.RunID, sessionCtx.Common,
ctl.connectServer, ctl.msgTransporter, sessionCtx.VnetController)
return ctl, nil
}

View File

@@ -36,6 +36,7 @@ import (
"github.com/fatedier/frp/pkg/transport"
"github.com/fatedier/frp/pkg/util/limit"
"github.com/fatedier/frp/pkg/util/xlog"
"github.com/fatedier/frp/pkg/vnet"
)
var proxyFactoryRegistry = map[reflect.Type]func(*BaseProxy, v1.ProxyConfigurer) Proxy{}
@@ -58,6 +59,7 @@ func NewProxy(
pxyConf v1.ProxyConfigurer,
clientCfg *v1.ClientCommonConfig,
msgTransporter transport.MessageTransporter,
vnetController *vnet.Controller,
) (pxy Proxy) {
var limiter *rate.Limiter
limitBytes := pxyConf.GetBaseConfig().Transport.BandwidthLimit.Bytes()
@@ -70,6 +72,7 @@ func NewProxy(
clientCfg: clientCfg,
limiter: limiter,
msgTransporter: msgTransporter,
vnetController: vnetController,
xl: xlog.FromContextSafe(ctx),
ctx: ctx,
}
@@ -85,6 +88,7 @@ type BaseProxy struct {
baseCfg *v1.ProxyBaseConfig
clientCfg *v1.ClientCommonConfig
msgTransporter transport.MessageTransporter
vnetController *vnet.Controller
limiter *rate.Limiter
// proxyPlugin is used to handle connections instead of dialing to local service.
// It's only validate for TCP protocol now.
@@ -98,7 +102,10 @@ type BaseProxy struct {
func (pxy *BaseProxy) Run() error {
if pxy.baseCfg.Plugin.Type != "" {
p, err := plugin.Create(pxy.baseCfg.Plugin.Type, pxy.baseCfg.Plugin.ClientPluginOptions)
p, err := plugin.Create(pxy.baseCfg.Plugin.Type, plugin.PluginContext{
Name: pxy.baseCfg.Name,
VnetController: pxy.vnetController,
}, pxy.baseCfg.Plugin.ClientPluginOptions)
if err != nil {
return err
}
@@ -157,22 +164,22 @@ func (pxy *BaseProxy) HandleTCPWorkConnection(workConn net.Conn, m *msg.StartWor
}
// check if we need to send proxy protocol info
var extraInfo plugin.ExtraInfo
var connInfo plugin.ConnectionInfo
if m.SrcAddr != "" && m.SrcPort != 0 {
if m.DstAddr == "" {
m.DstAddr = "127.0.0.1"
}
srcAddr, _ := net.ResolveTCPAddr("tcp", net.JoinHostPort(m.SrcAddr, strconv.Itoa(int(m.SrcPort))))
dstAddr, _ := net.ResolveTCPAddr("tcp", net.JoinHostPort(m.DstAddr, strconv.Itoa(int(m.DstPort))))
extraInfo.SrcAddr = srcAddr
extraInfo.DstAddr = dstAddr
connInfo.SrcAddr = srcAddr
connInfo.DstAddr = dstAddr
}
if baseCfg.Transport.ProxyProtocolVersion != "" && m.SrcAddr != "" && m.SrcPort != 0 {
h := &pp.Header{
Command: pp.PROXY,
SourceAddr: extraInfo.SrcAddr,
DestinationAddr: extraInfo.DstAddr,
SourceAddr: connInfo.SrcAddr,
DestinationAddr: connInfo.DstAddr,
}
if strings.Contains(m.SrcAddr, ".") {
@@ -186,13 +193,15 @@ func (pxy *BaseProxy) HandleTCPWorkConnection(workConn net.Conn, m *msg.StartWor
} else if baseCfg.Transport.ProxyProtocolVersion == "v2" {
h.Version = 2
}
extraInfo.ProxyProtocolHeader = h
connInfo.ProxyProtocolHeader = h
}
connInfo.Conn = remote
connInfo.UnderlyingConn = workConn
if pxy.proxyPlugin != nil {
// if plugin is set, let plugin handle connection first
xl.Debugf("handle by plugin: %s", pxy.proxyPlugin.Name())
pxy.proxyPlugin.Handle(pxy.ctx, remote, workConn, &extraInfo)
pxy.proxyPlugin.Handle(pxy.ctx, &connInfo)
xl.Debugf("handle by plugin finished")
return
}
@@ -210,8 +219,8 @@ func (pxy *BaseProxy) HandleTCPWorkConnection(workConn net.Conn, m *msg.StartWor
xl.Debugf("join connections, localConn(l[%s] r[%s]) workConn(l[%s] r[%s])", localConn.LocalAddr().String(),
localConn.RemoteAddr().String(), workConn.LocalAddr().String(), workConn.RemoteAddr().String())
if extraInfo.ProxyProtocolHeader != nil {
if _, err := extraInfo.ProxyProtocolHeader.WriteTo(localConn); err != nil {
if connInfo.ProxyProtocolHeader != nil {
if _, err := connInfo.ProxyProtocolHeader.WriteTo(localConn); err != nil {
workConn.Close()
xl.Errorf("write proxy protocol header to local conn error: %v", err)
return

View File

@@ -28,12 +28,14 @@ import (
"github.com/fatedier/frp/pkg/msg"
"github.com/fatedier/frp/pkg/transport"
"github.com/fatedier/frp/pkg/util/xlog"
"github.com/fatedier/frp/pkg/vnet"
)
type Manager struct {
proxies map[string]*Wrapper
msgTransporter transport.MessageTransporter
inWorkConnCallback func(*v1.ProxyBaseConfig, net.Conn, *msg.StartWorkConn) bool
vnetController *vnet.Controller
closed bool
mu sync.RWMutex
@@ -47,10 +49,12 @@ func NewManager(
ctx context.Context,
clientCfg *v1.ClientCommonConfig,
msgTransporter transport.MessageTransporter,
vnetController *vnet.Controller,
) *Manager {
return &Manager{
proxies: make(map[string]*Wrapper),
msgTransporter: msgTransporter,
vnetController: vnetController,
closed: false,
clientCfg: clientCfg,
ctx: ctx,
@@ -159,7 +163,7 @@ func (pm *Manager) UpdateAll(proxyCfgs []v1.ProxyConfigurer) {
for _, cfg := range proxyCfgs {
name := cfg.GetBaseConfig().Name
if _, ok := pm.proxies[name]; !ok {
pxy := NewWrapper(pm.ctx, cfg, pm.clientCfg, pm.HandleEvent, pm.msgTransporter)
pxy := NewWrapper(pm.ctx, cfg, pm.clientCfg, pm.HandleEvent, pm.msgTransporter, pm.vnetController)
if pm.inWorkConnCallback != nil {
pxy.SetInWorkConnCallback(pm.inWorkConnCallback)
}

View File

@@ -31,6 +31,7 @@ import (
"github.com/fatedier/frp/pkg/msg"
"github.com/fatedier/frp/pkg/transport"
"github.com/fatedier/frp/pkg/util/xlog"
"github.com/fatedier/frp/pkg/vnet"
)
const (
@@ -73,6 +74,8 @@ type Wrapper struct {
handler event.Handler
msgTransporter transport.MessageTransporter
// vnet controller
vnetController *vnet.Controller
health uint32
lastSendStartMsg time.Time
@@ -91,6 +94,7 @@ func NewWrapper(
clientCfg *v1.ClientCommonConfig,
eventHandler event.Handler,
msgTransporter transport.MessageTransporter,
vnetController *vnet.Controller,
) *Wrapper {
baseInfo := cfg.GetBaseConfig()
xl := xlog.FromContextSafe(ctx).Spawn().AppendPrefix(baseInfo.Name)
@@ -105,6 +109,7 @@ func NewWrapper(
healthNotifyCh: make(chan struct{}),
handler: eventHandler,
msgTransporter: msgTransporter,
vnetController: vnetController,
xl: xl,
ctx: xlog.NewContext(ctx, xl),
}
@@ -117,7 +122,7 @@ func NewWrapper(
xl.Tracef("enable health check monitor")
}
pw.pxy = NewProxy(pw.ctx, pw.Cfg, clientCfg, pw.msgTransporter)
pw.pxy = NewProxy(pw.ctx, pw.Cfg, clientCfg, pw.msgTransporter, pw.vnetController)
return pw
}

View File

@@ -37,6 +37,7 @@ import (
"github.com/fatedier/frp/pkg/util/version"
"github.com/fatedier/frp/pkg/util/wait"
"github.com/fatedier/frp/pkg/util/xlog"
"github.com/fatedier/frp/pkg/vnet"
)
func init() {
@@ -110,6 +111,8 @@ type Service struct {
// web server for admin UI and apis
webServer *httppkg.Server
vnetController *vnet.Controller
cfgMu sync.RWMutex
common *v1.ClientCommonConfig
proxyCfgs []v1.ProxyConfigurer
@@ -156,6 +159,9 @@ func NewService(options ServiceOptions) (*Service, error) {
if webServer != nil {
webServer.RouteRegister(s.registerRouteHandlers)
}
if options.Common.VirtualNet.Address != "" {
s.vnetController = vnet.NewController(options.Common.VirtualNet)
}
return s, nil
}
@@ -169,6 +175,19 @@ func (svr *Service) Run(ctx context.Context) error {
netpkg.SetDefaultDNSAddress(svr.common.DNSServer)
}
if svr.vnetController != nil {
if err := svr.vnetController.Init(); err != nil {
log.Errorf("init virtual network controller error: %v", err)
return err
}
go func() {
log.Infof("virtual network controller start...")
if err := svr.vnetController.Run(); err != nil {
log.Warnf("virtual network controller exit with error: %v", err)
}
}()
}
if svr.webServer != nil {
go func() {
log.Infof("admin server listen on %s", svr.webServer.Address())
@@ -311,12 +330,13 @@ func (svr *Service) loopLoginUntilSuccess(maxInterval time.Duration, firstLoginE
connEncrypted = false
}
sessionCtx := &SessionContext{
Common: svr.common,
RunID: svr.runID,
Conn: conn,
ConnEncrypted: connEncrypted,
AuthSetter: svr.authSetter,
Connector: connector,
Common: svr.common,
RunID: svr.runID,
Conn: conn,
ConnEncrypted: connEncrypted,
AuthSetter: svr.authSetter,
Connector: connector,
VnetController: svr.vnetController,
}
ctl, err := NewControl(svr.ctx, sessionCtx)
if err != nil {

View File

@@ -44,6 +44,10 @@ func (sv *STCPVisitor) Run() (err error) {
}
go sv.internalConnWorker()
if sv.plugin != nil {
sv.plugin.Start()
}
return
}

View File

@@ -20,9 +20,11 @@ import (
"sync"
v1 "github.com/fatedier/frp/pkg/config/v1"
plugin "github.com/fatedier/frp/pkg/plugin/visitor"
"github.com/fatedier/frp/pkg/transport"
netpkg "github.com/fatedier/frp/pkg/util/net"
"github.com/fatedier/frp/pkg/util/xlog"
"github.com/fatedier/frp/pkg/vnet"
)
// Helper wraps some functions for visitor to use.
@@ -34,6 +36,8 @@ type Helper interface {
// MsgTransporter returns the message transporter that is used to send and receive messages
// to the frp server through the controller.
MsgTransporter() transport.MessageTransporter
// VNetController returns the vnet controller that is used to manage the virtual network.
VNetController() *vnet.Controller
// RunID returns the run id of current controller.
RunID() string
}
@@ -50,14 +54,34 @@ func NewVisitor(
cfg v1.VisitorConfigurer,
clientCfg *v1.ClientCommonConfig,
helper Helper,
) (visitor Visitor) {
) (Visitor, error) {
xl := xlog.FromContextSafe(ctx).Spawn().AppendPrefix(cfg.GetBaseConfig().Name)
ctx = xlog.NewContext(ctx, xl)
var visitor Visitor
baseVisitor := BaseVisitor{
clientCfg: clientCfg,
helper: helper,
ctx: xlog.NewContext(ctx, xl),
ctx: ctx,
internalLn: netpkg.NewInternalListener(),
}
if cfg.GetBaseConfig().Plugin.Type != "" {
p, err := plugin.Create(
cfg.GetBaseConfig().Plugin.Type,
plugin.PluginContext{
Name: cfg.GetBaseConfig().Name,
Ctx: ctx,
VnetController: helper.VNetController(),
HandleConn: func(conn net.Conn) {
_ = baseVisitor.AcceptConn(conn)
},
},
cfg.GetBaseConfig().Plugin.VisitorPluginOptions,
)
if err != nil {
return nil, err
}
baseVisitor.plugin = p
}
switch cfg := cfg.(type) {
case *v1.STCPVisitorConfig:
visitor = &STCPVisitor{
@@ -77,7 +101,7 @@ func NewVisitor(
checkCloseCh: make(chan struct{}),
}
}
return
return visitor, nil
}
type BaseVisitor struct {
@@ -85,6 +109,7 @@ type BaseVisitor struct {
helper Helper
l net.Listener
internalLn *netpkg.InternalListener
plugin plugin.Plugin
mu sync.RWMutex
ctx context.Context
@@ -101,4 +126,7 @@ func (v *BaseVisitor) Close() {
if v.internalLn != nil {
v.internalLn.Close()
}
if v.plugin != nil {
v.plugin.Close()
}
}

View File

@@ -27,6 +27,7 @@ import (
v1 "github.com/fatedier/frp/pkg/config/v1"
"github.com/fatedier/frp/pkg/transport"
"github.com/fatedier/frp/pkg/util/xlog"
"github.com/fatedier/frp/pkg/vnet"
)
type Manager struct {
@@ -50,6 +51,7 @@ func NewManager(
clientCfg *v1.ClientCommonConfig,
connectServer func() (net.Conn, error),
msgTransporter transport.MessageTransporter,
vnetController *vnet.Controller,
) *Manager {
m := &Manager{
clientCfg: clientCfg,
@@ -62,6 +64,7 @@ func NewManager(
m.helper = &visitorHelperImpl{
connectServerFn: connectServer,
msgTransporter: msgTransporter,
vnetController: vnetController,
transferConnFn: m.TransferConn,
runID: runID,
}
@@ -112,7 +115,11 @@ func (vm *Manager) Close() {
func (vm *Manager) startVisitor(cfg v1.VisitorConfigurer) (err error) {
xl := xlog.FromContextSafe(vm.ctx)
name := cfg.GetBaseConfig().Name
visitor := NewVisitor(vm.ctx, cfg, vm.clientCfg, vm.helper)
visitor, err := NewVisitor(vm.ctx, cfg, vm.clientCfg, vm.helper)
if err != nil {
xl.Warnf("new visitor error: %v", err)
return
}
err = visitor.Run()
if err != nil {
xl.Warnf("start error: %v", err)
@@ -187,6 +194,7 @@ func (vm *Manager) TransferConn(name string, conn net.Conn) error {
type visitorHelperImpl struct {
connectServerFn func() (net.Conn, error)
msgTransporter transport.MessageTransporter
vnetController *vnet.Controller
transferConnFn func(name string, conn net.Conn) error
runID string
}
@@ -203,6 +211,10 @@ func (v *visitorHelperImpl) MsgTransporter() transport.MessageTransporter {
return v.msgTransporter
}
func (v *visitorHelperImpl) VNetController() *vnet.Controller {
return v.vnetController
}
func (v *visitorHelperImpl) RunID() string {
return v.runID
}

View File

@@ -73,6 +73,10 @@ func (sv *XTCPVisitor) Run() (err error) {
sv.retryLimiter = rate.NewLimiter(rate.Every(time.Hour/time.Duration(sv.cfg.MaxRetriesAnHour)), sv.cfg.MaxRetriesAnHour)
go sv.keepTunnelOpenWorker()
}
if sv.plugin != nil {
sv.plugin.Start()
}
return
}
@@ -157,9 +161,9 @@ func (sv *XTCPVisitor) keepTunnelOpenWorker() {
func (sv *XTCPVisitor) handleConn(userConn net.Conn) {
xl := xlog.FromContextSafe(sv.ctx)
isConnTrasfered := false
isConnTransfered := false
defer func() {
if !isConnTrasfered {
if !isConnTransfered {
userConn.Close()
}
}()
@@ -187,7 +191,7 @@ func (sv *XTCPVisitor) handleConn(userConn net.Conn) {
xl.Errorf("transfer connection to visitor %s error: %v", sv.cfg.FallbackTo, err)
return
}
isConnTrasfered = true
isConnTransfered = true
return
}