refactor the code related to xtcp (#3449)

This commit is contained in:
fatedier
2023-05-28 16:50:43 +08:00
committed by GitHub
parent 9f029e3248
commit c71efde303
44 changed files with 3305 additions and 1699 deletions

View File

@@ -33,6 +33,7 @@ import (
frpErr "github.com/fatedier/frp/pkg/errors"
"github.com/fatedier/frp/pkg/msg"
plugin "github.com/fatedier/frp/pkg/plugin/server"
"github.com/fatedier/frp/pkg/transport"
"github.com/fatedier/frp/pkg/util/util"
"github.com/fatedier/frp/pkg/util/version"
"github.com/fatedier/frp/pkg/util/xlog"
@@ -82,6 +83,16 @@ func (cm *ControlManager) GetByID(runID string) (ctl *Control, ok bool) {
return
}
func (cm *ControlManager) Close() error {
cm.mu.Lock()
defer cm.mu.Unlock()
for _, ctl := range cm.ctlsByRunID {
ctl.Close()
}
cm.ctlsByRunID = make(map[string]*Control)
return nil
}
type Control struct {
// all resource managers and controllers
rc *controller.ResourceController
@@ -95,6 +106,9 @@ type Control struct {
// verifies authentication based on selected method
authVerifier auth.Verifier
// other components can use this to communicate with client
msgTransporter transport.MessageTransporter
// login message
loginMsg *msg.Login
@@ -158,7 +172,7 @@ func NewControl(
if poolCount > int(serverCfg.MaxPoolCount) {
poolCount = int(serverCfg.MaxPoolCount)
}
return &Control{
ctl := &Control{
rc: rc,
pxyManager: pxyManager,
pluginManager: pluginManager,
@@ -182,6 +196,8 @@ func NewControl(
xl: xlog.FromContextSafe(ctx),
ctx: ctx,
}
ctl.msgTransporter = transport.NewMessageTransporter(ctl.sendCh)
return ctl
}
// Start send a login success message to client and start working.
@@ -204,6 +220,18 @@ func (ctl *Control) Start() {
go ctl.stoper()
}
func (ctl *Control) Close() error {
ctl.allShutdown.Start()
return nil
}
func (ctl *Control) Replaced(newCtl *Control) {
xl := ctl.xl
xl.Info("Replaced by client [%s]", newCtl.runID)
ctl.runID = ""
ctl.allShutdown.Start()
}
func (ctl *Control) RegisterWorkConn(conn net.Conn) error {
xl := ctl.xl
defer func() {
@@ -275,13 +303,6 @@ func (ctl *Control) GetWorkConn() (workConn net.Conn, err error) {
return
}
func (ctl *Control) Replaced(newCtl *Control) {
xl := ctl.xl
xl.Info("Replaced by client [%s]", newCtl.runID)
ctl.runID = ""
ctl.allShutdown.Start()
}
func (ctl *Control) writer() {
xl := ctl.xl
defer func() {
@@ -465,6 +486,12 @@ func (ctl *Control) manager() {
metrics.Server.NewProxy(m.ProxyName, m.ProxyType)
}
ctl.sendCh <- resp
case *msg.NatHoleVisitor:
go ctl.HandleNatHoleVisitor(m)
case *msg.NatHoleClient:
go ctl.HandleNatHoleClient(m)
case *msg.NatHoleReport:
go ctl.HandleNatHoleReport(m)
case *msg.CloseProxy:
_ = ctl.CloseProxy(m)
xl.Info("close proxy [%s] success", m.ProxyName)
@@ -497,6 +524,18 @@ func (ctl *Control) manager() {
}
}
func (ctl *Control) HandleNatHoleVisitor(m *msg.NatHoleVisitor) {
ctl.rc.NatHoleController.HandleVisitor(m, ctl.msgTransporter)
}
func (ctl *Control) HandleNatHoleClient(m *msg.NatHoleClient) {
ctl.rc.NatHoleController.HandleClient(m, ctl.msgTransporter)
}
func (ctl *Control) HandleNatHoleReport(m *msg.NatHoleReport) {
ctl.rc.NatHoleController.HandleReport(m)
}
func (ctl *Control) RegisterProxy(pxyMsg *msg.NewProxy) (remoteAddr string, err error) {
var pxyConf config.ProxyConf
// Load configures from NewProxy message and check.

View File

@@ -44,41 +44,20 @@ func (pxy *XTCPProxy) Run() (remoteAddr string, err error) {
for {
select {
case <-pxy.closeCh:
break
case sidRequest := <-sidCh:
sr := sidRequest
return
case sid := <-sidCh:
workConn, errRet := pxy.GetWorkConnFromPool(nil, nil)
if errRet != nil {
continue
}
m := &msg.NatHoleSid{
Sid: sr.Sid,
Sid: sid,
}
errRet = msg.WriteMsg(workConn, m)
if errRet != nil {
xl.Warn("write nat hole sid package error, %v", errRet)
workConn.Close()
break
}
go func() {
raw, errRet := msg.ReadMsg(workConn)
if errRet != nil {
xl.Warn("read nat hole client ok package error: %v", errRet)
workConn.Close()
return
}
if _, ok := raw.(*msg.NatHoleClientDetectOK); !ok {
xl.Warn("read nat hole client ok package format error")
workConn.Close()
return
}
select {
case sr.NotifyCh <- struct{}{}:
default:
}
}()
workConn.Close()
}
}
}()

View File

@@ -99,6 +99,11 @@ type Service struct {
tlsConfig *tls.Config
cfg config.ServerCommonConf
// service context
ctx context.Context
// call cancel to stop service
cancel context.CancelFunc
}
func NewService(cfg config.ServerCommonConf) (svr *Service, err error) {
@@ -110,6 +115,7 @@ func NewService(cfg config.ServerCommonConf) (svr *Service, err error) {
return
}
ctx, cancel := context.WithCancel(context.Background())
svr = &Service{
ctlManager: NewControlManager(),
pxyManager: proxy.NewManager(),
@@ -123,6 +129,8 @@ func NewService(cfg config.ServerCommonConf) (svr *Service, err error) {
authVerifier: auth.NewAuthVerifier(cfg.ServerConfig),
tlsConfig: tlsConfig,
cfg: cfg,
ctx: ctx,
cancel: cancel,
}
// Create tcpmux httpconnect multiplexer.
@@ -290,17 +298,12 @@ func NewService(cfg config.ServerCommonConf) (svr *Service, err error) {
})
// Create nat hole controller.
if cfg.BindUDPPort > 0 {
var nc *nathole.Controller
address := net.JoinHostPort(cfg.BindAddr, strconv.Itoa(cfg.BindUDPPort))
nc, err = nathole.NewController(address, []byte(cfg.Token))
if err != nil {
err = fmt.Errorf("create nat hole controller error, %v", err)
return
}
svr.rc.NatHoleController = nc
log.Info("nat hole udp service listen on %s", address)
nc, err := nathole.NewController(time.Duration(cfg.NatHoleAnalysisDataReserveHours) * time.Hour)
if err != nil {
err = fmt.Errorf("create nat hole controller error, %v", err)
return
}
svr.rc.NatHoleController = nc
var statsEnable bool
// Create dashboard web server.
@@ -327,22 +330,43 @@ func NewService(cfg config.ServerCommonConf) (svr *Service, err error) {
}
func (svr *Service) Run() {
if svr.rc.NatHoleController != nil {
go svr.rc.NatHoleController.Run()
}
if svr.kcpListener != nil {
go svr.HandleListener(svr.kcpListener)
}
if svr.quicListener != nil {
go svr.HandleQUICListener(svr.quicListener)
}
go svr.HandleListener(svr.websocketListener)
go svr.HandleListener(svr.tlsListener)
if svr.rc.NatHoleController != nil {
go svr.rc.NatHoleController.CleanWorker(svr.ctx)
}
svr.HandleListener(svr.listener)
}
func (svr *Service) Close() error {
if svr.kcpListener != nil {
svr.kcpListener.Close()
}
if svr.quicListener != nil {
svr.quicListener.Close()
}
if svr.websocketListener != nil {
svr.websocketListener.Close()
}
if svr.tlsListener != nil {
svr.tlsListener.Close()
}
if svr.listener != nil {
svr.listener.Close()
}
svr.cancel()
svr.ctlManager.Close()
return nil
}
func (svr *Service) handleConnection(ctx context.Context, conn net.Conn) {
xl := xlog.FromContextSafe(ctx)