mirror of
https://github.com/fatedier/frp.git
synced 2026-01-11 22:23:12 +00:00
fix: frpc reconnect frps frequently lead to memory leak (#1722)
This commit is contained in:
@@ -183,6 +183,7 @@ func (ctl *Control) HandleNewProxyResp(inMsg *msg.NewProxyResp) {
|
||||
func (ctl *Control) Close() error {
|
||||
ctl.pm.Close()
|
||||
ctl.conn.Close()
|
||||
ctl.vm.Close()
|
||||
if ctl.session != nil {
|
||||
ctl.session.Close()
|
||||
}
|
||||
|
||||
@@ -142,12 +142,34 @@ func (svr *Service) keepControllerWorking() {
|
||||
maxDelayTime := 20 * time.Second
|
||||
delayTime := time.Second
|
||||
|
||||
// if frpc reconnect frps, we need to limit retry times in 1min
|
||||
// current retry logic is sleep 0s, 0s, 0s, 1s, 2s, 4s, 8s, ...
|
||||
// when exceed 1min, we will reset delay and counts
|
||||
cutoffTime := time.Now().Add(time.Minute)
|
||||
reconnectDelay := time.Second
|
||||
reconnectCounts := 1
|
||||
|
||||
for {
|
||||
<-svr.ctl.ClosedDoneCh()
|
||||
if atomic.LoadUint32(&svr.exit) != 0 {
|
||||
return
|
||||
}
|
||||
|
||||
// the first three retry with no delay
|
||||
if reconnectCounts > 3 {
|
||||
time.Sleep(reconnectDelay)
|
||||
reconnectDelay *= 2
|
||||
}
|
||||
reconnectCounts++
|
||||
|
||||
now := time.Now()
|
||||
if now.After(cutoffTime) {
|
||||
// reset
|
||||
cutoffTime = now.Add(time.Minute)
|
||||
reconnectDelay = time.Second
|
||||
reconnectCounts = 1
|
||||
}
|
||||
|
||||
for {
|
||||
xl.Info("try to reconnect to server...")
|
||||
conn, session, err := svr.login()
|
||||
@@ -166,6 +188,9 @@ func (svr *Service) keepControllerWorking() {
|
||||
ctl := NewControl(svr.ctx, svr.runId, conn, session, svr.cfg, svr.pxyCfgs, svr.visitorCfgs, svr.serverUDPPort, svr.authSetter)
|
||||
ctl.Run()
|
||||
svr.ctlMu.Lock()
|
||||
if svr.ctl != nil {
|
||||
svr.ctl.Close()
|
||||
}
|
||||
svr.ctl = ctl
|
||||
svr.ctlMu.Unlock()
|
||||
break
|
||||
|
||||
@@ -33,6 +33,8 @@ type VisitorManager struct {
|
||||
|
||||
mu sync.Mutex
|
||||
ctx context.Context
|
||||
|
||||
stopCh chan struct{}
|
||||
}
|
||||
|
||||
func NewVisitorManager(ctx context.Context, ctl *Control) *VisitorManager {
|
||||
@@ -42,22 +44,32 @@ func NewVisitorManager(ctx context.Context, ctl *Control) *VisitorManager {
|
||||
visitors: make(map[string]Visitor),
|
||||
checkInterval: 10 * time.Second,
|
||||
ctx: ctx,
|
||||
stopCh: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
func (vm *VisitorManager) Run() {
|
||||
xl := xlog.FromContextSafe(vm.ctx)
|
||||
|
||||
ticker := time.NewTicker(vm.checkInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
time.Sleep(vm.checkInterval)
|
||||
vm.mu.Lock()
|
||||
for _, cfg := range vm.cfgs {
|
||||
name := cfg.GetBaseInfo().ProxyName
|
||||
if _, exist := vm.visitors[name]; !exist {
|
||||
xl.Info("try to start visitor [%s]", name)
|
||||
vm.startVisitor(cfg)
|
||||
select {
|
||||
case <-vm.stopCh:
|
||||
xl.Info("gracefully shutdown visitor manager")
|
||||
return
|
||||
case <-ticker.C:
|
||||
vm.mu.Lock()
|
||||
for _, cfg := range vm.cfgs {
|
||||
name := cfg.GetBaseInfo().ProxyName
|
||||
if _, exist := vm.visitors[name]; !exist {
|
||||
xl.Info("try to start visitor [%s]", name)
|
||||
vm.startVisitor(cfg)
|
||||
}
|
||||
}
|
||||
vm.mu.Unlock()
|
||||
}
|
||||
vm.mu.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -126,4 +138,9 @@ func (vm *VisitorManager) Close() {
|
||||
for _, v := range vm.visitors {
|
||||
v.Close()
|
||||
}
|
||||
select {
|
||||
case <-vm.stopCh:
|
||||
default:
|
||||
close(vm.stopCh)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user