Code refactoring related to message handling and retry logic. (#3745)

fatedier
2023-11-06 10:51:48 +08:00
committed by GitHub
parent 5760c1cf92
commit 184223cb2f
11 changed files with 701 additions and 540 deletions


@@ -17,15 +17,12 @@ package server
import (
"context"
"fmt"
"io"
"net"
"runtime/debug"
"sync"
"sync/atomic"
"time"
"github.com/fatedier/golib/control/shutdown"
"github.com/fatedier/golib/crypto"
"github.com/fatedier/golib/errors"
"github.com/samber/lo"
"github.com/fatedier/frp/pkg/auth"
@@ -35,8 +32,10 @@ import (
"github.com/fatedier/frp/pkg/msg"
plugin "github.com/fatedier/frp/pkg/plugin/server"
"github.com/fatedier/frp/pkg/transport"
utilnet "github.com/fatedier/frp/pkg/util/net"
"github.com/fatedier/frp/pkg/util/util"
"github.com/fatedier/frp/pkg/util/version"
"github.com/fatedier/frp/pkg/util/wait"
"github.com/fatedier/frp/pkg/util/xlog"
"github.com/fatedier/frp/server/controller"
"github.com/fatedier/frp/server/metrics"
@@ -111,18 +110,16 @@ type Control struct {
// other components can use this to communicate with client
msgTransporter transport.MessageTransporter
// msgDispatcher is a wrapper for control connection.
// It provides a channel for sending messages, and you can register handlers to process messages based on their respective types.
msgDispatcher *msg.Dispatcher
// login message
loginMsg *msg.Login
// control connection
conn net.Conn
// put a message in this channel to send it over control connection to client
sendCh chan (msg.Message)
// read from this channel to get the next message sent by client
readCh chan (msg.Message)
// work connections
workConnCh chan net.Conn
@@ -136,27 +133,21 @@ type Control struct {
portsUsedNum int
// last time a Ping message was received
lastPing time.Time
lastPing atomic.Value
// A new run ID is generated when a new client logs in.
// If the run ID in the login message matches an existing one, it's the same client, so we can
// replace the old controller immediately.
runID string
readerShutdown *shutdown.Shutdown
writerShutdown *shutdown.Shutdown
managerShutdown *shutdown.Shutdown
allShutdown *shutdown.Shutdown
started bool
mu sync.RWMutex
// Server configuration information
serverCfg *v1.ServerConfig
xl *xlog.Logger
ctx context.Context
doneCh chan struct{}
}
func NewControl(
@@ -168,36 +159,38 @@ func NewControl(
ctlConn net.Conn,
loginMsg *msg.Login,
serverCfg *v1.ServerConfig,
) *Control {
) (*Control, error) {
poolCount := loginMsg.PoolCount
if poolCount > int(serverCfg.Transport.MaxPoolCount) {
poolCount = int(serverCfg.Transport.MaxPoolCount)
}
ctl := &Control{
rc: rc,
pxyManager: pxyManager,
pluginManager: pluginManager,
authVerifier: authVerifier,
conn: ctlConn,
loginMsg: loginMsg,
sendCh: make(chan msg.Message, 10),
readCh: make(chan msg.Message, 10),
workConnCh: make(chan net.Conn, poolCount+10),
proxies: make(map[string]proxy.Proxy),
poolCount: poolCount,
portsUsedNum: 0,
lastPing: time.Now(),
runID: loginMsg.RunID,
readerShutdown: shutdown.New(),
writerShutdown: shutdown.New(),
managerShutdown: shutdown.New(),
allShutdown: shutdown.New(),
serverCfg: serverCfg,
xl: xlog.FromContextSafe(ctx),
ctx: ctx,
rc: rc,
pxyManager: pxyManager,
pluginManager: pluginManager,
authVerifier: authVerifier,
conn: ctlConn,
loginMsg: loginMsg,
workConnCh: make(chan net.Conn, poolCount+10),
proxies: make(map[string]proxy.Proxy),
poolCount: poolCount,
portsUsedNum: 0,
runID: loginMsg.RunID,
serverCfg: serverCfg,
xl: xlog.FromContextSafe(ctx),
ctx: ctx,
doneCh: make(chan struct{}),
}
ctl.msgTransporter = transport.NewMessageTransporter(ctl.sendCh)
return ctl
ctl.lastPing.Store(time.Now())
cryptoRW, err := utilnet.NewCryptoReadWriter(ctl.conn, []byte(ctl.serverCfg.Auth.Token))
if err != nil {
return nil, err
}
ctl.msgDispatcher = msg.NewDispatcher(cryptoRW)
ctl.registerMsgHandlers()
ctl.msgTransporter = transport.NewMessageTransporter(ctl.msgDispatcher.SendChannel())
return ctl, nil
}
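For context, NewControl now wraps the control connection once with utilnet.NewCryptoReadWriter and hands the result to the dispatcher, instead of creating a crypto reader and writer separately inside reader() and writer(). A minimal sketch of how such a helper could be composed from the golib crypto primitives already used in this file (assumed behavior, not the actual pkg/util/net implementation; imports "io" and "github.com/fatedier/golib/crypto"):

    // Hypothetical sketch of NewCryptoReadWriter: pair a decrypting reader with
    // an encrypting writer over the same connection, both keyed by the auth token.
    type cryptoReadWriter struct {
        io.Reader
        io.Writer
    }

    func newCryptoReadWriter(rw io.ReadWriter, key []byte) (io.ReadWriter, error) {
        encWriter, err := crypto.NewWriter(rw, key)
        if err != nil {
            return nil, err
        }
        return &cryptoReadWriter{
            Reader: crypto.NewReader(rw, key),
            Writer: encWriter,
        }, nil
    }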
// Start sends a login success message to the client and starts working.
@@ -208,27 +201,18 @@ func (ctl *Control) Start() {
Error: "",
}
_ = msg.WriteMsg(ctl.conn, loginRespMsg)
ctl.mu.Lock()
ctl.started = true
ctl.mu.Unlock()
go ctl.writer()
go func() {
for i := 0; i < ctl.poolCount; i++ {
// ignore error here, that means that this control is closed
_ = errors.PanicToError(func() {
ctl.sendCh <- &msg.ReqWorkConn{}
})
_ = ctl.msgDispatcher.Send(&msg.ReqWorkConn{})
}
}()
go ctl.manager()
go ctl.reader()
go ctl.stoper()
go ctl.worker()
}
func (ctl *Control) Close() error {
ctl.allShutdown.Start()
ctl.conn.Close()
return nil
}
@@ -236,7 +220,7 @@ func (ctl *Control) Replaced(newCtl *Control) {
xl := ctl.xl
xl.Info("Replaced by client [%s]", newCtl.runID)
ctl.runID = ""
ctl.allShutdown.Start()
ctl.conn.Close()
}
func (ctl *Control) RegisterWorkConn(conn net.Conn) error {
@@ -282,9 +266,7 @@ func (ctl *Control) GetWorkConn() (workConn net.Conn, err error) {
xl.Debug("get work connection from pool")
default:
// no work connections available in the pool, send a message to frpc to get more
if err = errors.PanicToError(func() {
ctl.sendCh <- &msg.ReqWorkConn{}
}); err != nil {
if err := ctl.msgDispatcher.Send(&msg.ReqWorkConn{}); err != nil {
return nil, fmt.Errorf("control is already closed")
}
@@ -304,92 +286,39 @@ func (ctl *Control) GetWorkConn() (workConn net.Conn, err error) {
}
// When we get a work connection from pool, replace it with a new one.
_ = errors.PanicToError(func() {
ctl.sendCh <- &msg.ReqWorkConn{}
})
_ = ctl.msgDispatcher.Send(&msg.ReqWorkConn{})
return
}
func (ctl *Control) writer() {
xl := ctl.xl
defer func() {
if err := recover(); err != nil {
xl.Error("panic error: %v", err)
xl.Error(string(debug.Stack()))
}
}()
defer ctl.allShutdown.Start()
defer ctl.writerShutdown.Done()
encWriter, err := crypto.NewWriter(ctl.conn, []byte(ctl.serverCfg.Auth.Token))
if err != nil {
xl.Error("crypto new writer error: %v", err)
ctl.allShutdown.Start()
return
}
for {
m, ok := <-ctl.sendCh
if !ok {
xl.Info("control writer is closing")
return
}
if err := msg.WriteMsg(encWriter, m); err != nil {
xl.Warn("write message to control connection error: %v", err)
return
}
}
}

func (ctl *Control) reader() {
xl := ctl.xl
defer func() {
if err := recover(); err != nil {
xl.Error("panic error: %v", err)
xl.Error(string(debug.Stack()))
}
}()
defer ctl.allShutdown.Start()
defer ctl.readerShutdown.Done()
encReader := crypto.NewReader(ctl.conn, []byte(ctl.serverCfg.Auth.Token))
for {
m, err := msg.ReadMsg(encReader)
if err != nil {
if err == io.EOF {
xl.Debug("control connection closed")
return
}
xl.Warn("read error: %v", err)
ctl.conn.Close()
return
}
ctl.readCh <- m
}
}

func (ctl *Control) heartbeatWorker() {
xl := ctl.xl
// Don't need application heartbeat if TCPMux is enabled,
// yamux will do same thing.
// TODO(fatedier): let default HeartbeatTimeout to -1 if TCPMux is enabled. Users can still set it to positive value to enable it.
if !lo.FromPtr(ctl.serverCfg.Transport.TCPMux) && ctl.serverCfg.Transport.HeartbeatTimeout > 0 {
go wait.Until(func() {
if time.Since(ctl.lastPing.Load().(time.Time)) > time.Duration(ctl.serverCfg.Transport.HeartbeatTimeout)*time.Second {
xl.Warn("heartbeat timeout")
ctl.conn.Close()
return
}
}, time.Second, ctl.doneCh)
}
}
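wait.Until above comes from the new pkg/util/wait import. Assuming semantics similar to the well-known Kubernetes helper of the same name (run the function every period until the stop channel is closed), the heartbeat check runs once per second and stops once worker() closes ctl.doneCh. A rough sketch of such a helper, for reference only:

    // Until calls f every period until stopCh is closed (assumed semantics;
    // the real pkg/util/wait implementation may differ).
    func Until(f func(), period time.Duration, stopCh <-chan struct{}) {
        ticker := time.NewTicker(period)
        defer ticker.Stop()
        for {
            f()
            select {
            case <-stopCh:
                return
            case <-ticker.C:
            }
        }
    }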
func (ctl *Control) stoper() {
// block until Control closed
func (ctl *Control) WaitClosed() {
<-ctl.doneCh
}
func (ctl *Control) worker() {
xl := ctl.xl
defer func() {
if err := recover(); err != nil {
xl.Error("panic error: %v", err)
xl.Error(string(debug.Stack()))
}
}()
ctl.allShutdown.WaitStart()
go ctl.heartbeatWorker()
go ctl.msgDispatcher.Run()
<-ctl.msgDispatcher.Done()
ctl.conn.Close()
ctl.readerShutdown.WaitDone()
close(ctl.readCh)
ctl.managerShutdown.WaitDone()
close(ctl.sendCh)
ctl.writerShutdown.WaitDone()
ctl.mu.Lock()
defer ctl.mu.Unlock()
@@ -419,136 +348,104 @@ func (ctl *Control) stoper() {
}()
}
ctl.allShutdown.Done()
xl.Info("client exit success")
metrics.Server.CloseClient()
xl.Info("client exit success")
close(ctl.doneCh)
}
// block until Control closed
func (ctl *Control) WaitClosed() {
ctl.mu.RLock()
started := ctl.started
ctl.mu.RUnlock()
if !started {
ctl.allShutdown.Done()
return
}
ctl.allShutdown.WaitDone()
}

func (ctl *Control) registerMsgHandlers() {
ctl.msgDispatcher.RegisterHandler(&msg.NewProxy{}, ctl.handleNewProxy)
ctl.msgDispatcher.RegisterHandler(&msg.Ping{}, ctl.handlePing)
ctl.msgDispatcher.RegisterHandler(&msg.NatHoleVisitor{}, msg.AsyncHandler(ctl.handleNatHoleVisitor))
ctl.msgDispatcher.RegisterHandler(&msg.NatHoleClient{}, msg.AsyncHandler(ctl.handleNatHoleClient))
ctl.msgDispatcher.RegisterHandler(&msg.NatHoleReport{}, msg.AsyncHandler(ctl.handleNatHoleReport))
ctl.msgDispatcher.RegisterHandler(&msg.CloseProxy{}, ctl.handleCloseProxy)
}
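For reference, the dispatcher replaces the old type switch in manager(): handlers are registered per concrete message type and looked up when a message is read from the connection. A minimal, self-contained sketch of that idea (assumed behavior; the real msg.Dispatcher and msg.AsyncHandler live in pkg/msg and may differ in detail):

    package main

    import (
        "fmt"
        "reflect"
    )

    // Message stands in for msg.Message in this sketch.
    type Message any

    type handlerFunc func(Message)

    // dispatcher keys handlers by the concrete message type.
    type dispatcher struct {
        handlers map[reflect.Type]handlerFunc
    }

    func newDispatcher() *dispatcher {
        return &dispatcher{handlers: map[reflect.Type]handlerFunc{}}
    }

    func (d *dispatcher) RegisterHandler(prototype Message, h handlerFunc) {
        d.handlers[reflect.TypeOf(prototype)] = h
    }

    func (d *dispatcher) dispatch(m Message) {
        if h, ok := d.handlers[reflect.TypeOf(m)]; ok {
            h(m)
        }
    }

    // asyncHandler mirrors what msg.AsyncHandler is assumed to do: run the handler
    // in its own goroutine so slow work (e.g. NAT hole punching) does not block
    // the dispatch loop, just as the old code used `go ctl.HandleNatHoleVisitor(m)`.
    func asyncHandler(h handlerFunc) handlerFunc {
        return func(m Message) { go h(m) }
    }

    type ping struct{}

    func main() {
        d := newDispatcher()
        d.RegisterHandler(&ping{}, func(Message) { fmt.Println("got ping") })
        d.dispatch(&ping{})
    }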
func (ctl *Control) handleNewProxy(m msg.Message) {
xl := ctl.xl
inMsg := m.(*msg.NewProxy)
content := &plugin.NewProxyContent{
User: plugin.UserInfo{
User: ctl.loginMsg.User,
Metas: ctl.loginMsg.Metas,
RunID: ctl.loginMsg.RunID,
},
NewProxy: *inMsg,
}
var remoteAddr string
retContent, err := ctl.pluginManager.NewProxy(content)
if err == nil {
inMsg = &retContent.NewProxy
remoteAddr, err = ctl.RegisterProxy(inMsg)
}
// register proxy in this control
resp := &msg.NewProxyResp{
ProxyName: inMsg.ProxyName,
}
if err != nil {
xl.Warn("new proxy [%s] type [%s] error: %v", inMsg.ProxyName, inMsg.ProxyType, err)
resp.Error = util.GenerateResponseErrorString(fmt.Sprintf("new proxy [%s] error", inMsg.ProxyName),
err, lo.FromPtr(ctl.serverCfg.DetailedErrorsToClient))
} else {
resp.RemoteAddr = remoteAddr
xl.Info("new proxy [%s] type [%s] success", inMsg.ProxyName, inMsg.ProxyType)
metrics.Server.NewProxy(inMsg.ProxyName, inMsg.ProxyType)
}
_ = ctl.msgDispatcher.Send(resp)
}
func (ctl *Control) handlePing(m msg.Message) {
xl := ctl.xl
inMsg := m.(*msg.Ping)
content := &plugin.PingContent{
User: plugin.UserInfo{
User: ctl.loginMsg.User,
Metas: ctl.loginMsg.Metas,
RunID: ctl.loginMsg.RunID,
},
Ping: *inMsg,
}
retContent, err := ctl.pluginManager.Ping(content)
if err == nil {
inMsg = &retContent.Ping
err = ctl.authVerifier.VerifyPing(inMsg)
}
if err != nil {
xl.Warn("received invalid ping: %v", err)
_ = ctl.msgDispatcher.Send(&msg.Pong{
Error: util.GenerateResponseErrorString("invalid ping", err, lo.FromPtr(ctl.serverCfg.DetailedErrorsToClient)),
})
return
}
ctl.lastPing.Store(time.Now())
xl.Debug("receive heartbeat")
_ = ctl.msgDispatcher.Send(&msg.Pong{})
}
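lastPing changed from a plain time.Time to an atomic.Value because it is now written here, on the dispatcher's handler goroutine, and read concurrently by heartbeatWorker. A small, runnable illustration of the Store/Load pattern used above:

    package main

    import (
        "fmt"
        "sync/atomic"
        "time"
    )

    func main() {
        var lastPing atomic.Value
        lastPing.Store(time.Now()) // writer side: NewControl / handlePing

        // reader side: heartbeatWorker type-asserts the value back to time.Time
        elapsed := time.Since(lastPing.Load().(time.Time))
        fmt.Println(elapsed < time.Second)
    }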
func (ctl *Control) manager() {
func (ctl *Control) handleNatHoleVisitor(m msg.Message) {
inMsg := m.(*msg.NatHoleVisitor)
ctl.rc.NatHoleController.HandleVisitor(inMsg, ctl.msgTransporter, ctl.loginMsg.User)
}
func (ctl *Control) handleNatHoleClient(m msg.Message) {
inMsg := m.(*msg.NatHoleClient)
ctl.rc.NatHoleController.HandleClient(inMsg, ctl.msgTransporter)
}
func (ctl *Control) handleNatHoleReport(m msg.Message) {
inMsg := m.(*msg.NatHoleReport)
ctl.rc.NatHoleController.HandleReport(inMsg)
}
func (ctl *Control) handleCloseProxy(m msg.Message) {
xl := ctl.xl
defer func() {
if err := recover(); err != nil {
xl.Error("panic error: %v", err)
xl.Error(string(debug.Stack()))
}
}()
defer ctl.allShutdown.Start()
defer ctl.managerShutdown.Done()
var heartbeatCh <-chan time.Time
// Don't need application heartbeat if TCPMux is enabled,
// yamux will do same thing.
if !lo.FromPtr(ctl.serverCfg.Transport.TCPMux) && ctl.serverCfg.Transport.HeartbeatTimeout > 0 {
heartbeat := time.NewTicker(time.Second)
defer heartbeat.Stop()
heartbeatCh = heartbeat.C
}
for {
select {
case <-heartbeatCh:
if time.Since(ctl.lastPing) > time.Duration(ctl.serverCfg.Transport.HeartbeatTimeout)*time.Second {
xl.Warn("heartbeat timeout")
return
}
case rawMsg, ok := <-ctl.readCh:
if !ok {
return
}
switch m := rawMsg.(type) {
case *msg.NewProxy:
content := &plugin.NewProxyContent{
User: plugin.UserInfo{
User: ctl.loginMsg.User,
Metas: ctl.loginMsg.Metas,
RunID: ctl.loginMsg.RunID,
},
NewProxy: *m,
}
var remoteAddr string
retContent, err := ctl.pluginManager.NewProxy(content)
if err == nil {
m = &retContent.NewProxy
remoteAddr, err = ctl.RegisterProxy(m)
}
// register proxy in this control
resp := &msg.NewProxyResp{
ProxyName: m.ProxyName,
}
if err != nil {
xl.Warn("new proxy [%s] type [%s] error: %v", m.ProxyName, m.ProxyType, err)
resp.Error = util.GenerateResponseErrorString(fmt.Sprintf("new proxy [%s] error", m.ProxyName),
err, lo.FromPtr(ctl.serverCfg.DetailedErrorsToClient))
} else {
resp.RemoteAddr = remoteAddr
xl.Info("new proxy [%s] type [%s] success", m.ProxyName, m.ProxyType)
metrics.Server.NewProxy(m.ProxyName, m.ProxyType)
}
ctl.sendCh <- resp
case *msg.NatHoleVisitor:
go ctl.HandleNatHoleVisitor(m)
case *msg.NatHoleClient:
go ctl.HandleNatHoleClient(m)
case *msg.NatHoleReport:
go ctl.HandleNatHoleReport(m)
case *msg.CloseProxy:
_ = ctl.CloseProxy(m)
xl.Info("close proxy [%s] success", m.ProxyName)
case *msg.Ping:
content := &plugin.PingContent{
User: plugin.UserInfo{
User: ctl.loginMsg.User,
Metas: ctl.loginMsg.Metas,
RunID: ctl.loginMsg.RunID,
},
Ping: *m,
}
retContent, err := ctl.pluginManager.Ping(content)
if err == nil {
m = &retContent.Ping
err = ctl.authVerifier.VerifyPing(m)
}
if err != nil {
xl.Warn("received invalid ping: %v", err)
ctl.sendCh <- &msg.Pong{
Error: util.GenerateResponseErrorString("invalid ping", err, lo.FromPtr(ctl.serverCfg.DetailedErrorsToClient)),
}
return
}
ctl.lastPing = time.Now()
xl.Debug("receive heartbeat")
ctl.sendCh <- &msg.Pong{}
}
}
}
}
func (ctl *Control) HandleNatHoleVisitor(m *msg.NatHoleVisitor) {
ctl.rc.NatHoleController.HandleVisitor(m, ctl.msgTransporter, ctl.loginMsg.User)
}
func (ctl *Control) HandleNatHoleClient(m *msg.NatHoleClient) {
ctl.rc.NatHoleController.HandleClient(m, ctl.msgTransporter)
}
func (ctl *Control) HandleNatHoleReport(m *msg.NatHoleReport) {
ctl.rc.NatHoleController.HandleReport(m)
inMsg := m.(*msg.CloseProxy)
_ = ctl.CloseProxy(inMsg)
xl.Info("close proxy [%s] success", inMsg.ProxyName)
}
func (ctl *Control) RegisterProxy(pxyMsg *msg.NewProxy) (remoteAddr string, err error) {


@@ -516,13 +516,14 @@ func (svr *Service) HandleQUICListener(l *quic.Listener) {
}
}
func (svr *Service) RegisterControl(ctlConn net.Conn, loginMsg *msg.Login) (err error) {
func (svr *Service) RegisterControl(ctlConn net.Conn, loginMsg *msg.Login) error {
// If the client's RunID is empty, it's a new client and we just create a new controller.
// Otherwise, we check whether another controller exists with the same run ID; if so, we release the previous controller and start a new one.
var err error
if loginMsg.RunID == "" {
loginMsg.RunID, err = util.RandID()
if err != nil {
return
return err
}
}
@@ -534,11 +535,16 @@ func (svr *Service) RegisterControl(ctlConn net.Conn, loginMsg *msg.Login) (err
ctlConn.RemoteAddr().String(), loginMsg.Version, loginMsg.Hostname, loginMsg.Os, loginMsg.Arch)
// Check auth.
if err = svr.authVerifier.VerifyLogin(loginMsg); err != nil {
return
if err := svr.authVerifier.VerifyLogin(loginMsg); err != nil {
return err
}
ctl := NewControl(ctx, svr.rc, svr.pxyManager, svr.pluginManager, svr.authVerifier, ctlConn, loginMsg, svr.cfg)
ctl, err := NewControl(ctx, svr.rc, svr.pxyManager, svr.pluginManager, svr.authVerifier, ctlConn, loginMsg, svr.cfg)
if err != nil {
xl.Warn("create new controller error: %v", err)
// don't return detailed errors to client
return fmt.Errorf("unexpected error when creating new controller")
}
if oldCtl := svr.ctlManager.Add(loginMsg.RunID, ctl); oldCtl != nil {
oldCtl.WaitClosed()
}
@@ -553,7 +559,7 @@ func (svr *Service) RegisterControl(ctlConn net.Conn, loginMsg *msg.Login) (err
ctl.WaitClosed()
svr.ctlManager.Del(loginMsg.RunID, ctl)
}()
return
return nil
}
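RegisterControl keeps the existing replace-by-run-ID behavior: if ctlManager.Add returns a previous controller registered under the same RunID, the old one is allowed to shut down completely (oldCtl.WaitClosed()) before the new login proceeds. A sketch of the swap-and-return behavior assumed for Add (field names are hypothetical; Control is the type from the first file above):

    // Assumed shape of the controller manager used above.
    type ControlManager struct {
        mu   sync.Mutex
        ctls map[string]*Control
    }

    // Add swaps in the new controller and returns whatever was previously
    // registered under the same run ID, or nil.
    func (cm *ControlManager) Add(runID string, ctl *Control) (old *Control) {
        cm.mu.Lock()
        defer cm.mu.Unlock()
        old = cm.ctls[runID]
        cm.ctls[runID] = ctl
        return old
    }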
// RegisterWorkConn register a new work connection to control and proxies need it.