service.Run supports passing in context (#3504)

This commit is contained in:
fatedier 2023-06-29 18:04:20 +08:00 committed by GitHub
parent 801e8c6742
commit 4c4d5f0d0d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 39 additions and 18 deletions

View File

@ -4,7 +4,3 @@ copilot:summary
### WHY ### WHY
<!-- author to complete --> <!-- author to complete -->
### Walkthrough
copilot:walkthrough

View File

@ -86,16 +86,14 @@ func NewService(
visitorCfgs map[string]config.VisitorConf, visitorCfgs map[string]config.VisitorConf,
cfgFile string, cfgFile string,
) (svr *Service, err error) { ) (svr *Service, err error) {
ctx, cancel := context.WithCancel(context.Background())
svr = &Service{ svr = &Service{
authSetter: auth.NewAuthSetter(cfg.ClientConfig), authSetter: auth.NewAuthSetter(cfg.ClientConfig),
cfg: cfg, cfg: cfg,
cfgFile: cfgFile, cfgFile: cfgFile,
pxyCfgs: pxyCfgs, pxyCfgs: pxyCfgs,
visitorCfgs: visitorCfgs, visitorCfgs: visitorCfgs,
ctx: context.Background(),
exit: 0, exit: 0,
ctx: xlog.NewContext(ctx, xlog.New()),
cancel: cancel,
} }
return return
} }
@ -106,7 +104,11 @@ func (svr *Service) GetController() *Control {
return svr.ctl return svr.ctl
} }
func (svr *Service) Run() error { func (svr *Service) Run(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx)
svr.ctx = xlog.NewContext(ctx, xlog.New())
svr.cancel = cancel
xl := xlog.FromContextSafe(svr.ctx) xl := xlog.FromContextSafe(svr.ctx)
// set custom DNSServer // set custom DNSServer
@ -161,6 +163,10 @@ func (svr *Service) Run() error {
log.Info("admin server listen on %s:%d", svr.cfg.AdminAddr, svr.cfg.AdminPort) log.Info("admin server listen on %s:%d", svr.cfg.AdminAddr, svr.cfg.AdminPort)
} }
<-svr.ctx.Done() <-svr.ctx.Done()
// service context may not be canceled by svr.Close(), we should call it here to release resources
if atomic.LoadUint32(&svr.exit) == 0 {
svr.Close()
}
return nil return nil
} }
@ -182,7 +188,7 @@ func (svr *Service) keepControllerWorking() {
return return
} }
// the first three retry with no delay // the first three attempts with a low delay
if reconnectCounts > 3 { if reconnectCounts > 3 {
util.RandomSleep(reconnectDelay, 0.9, 1.1) util.RandomSleep(reconnectDelay, 0.9, 1.1)
xl.Info("wait %v to reconnect", reconnectDelay) xl.Info("wait %v to reconnect", reconnectDelay)
@ -322,11 +328,14 @@ func (svr *Service) GracefulClose(d time.Duration) {
svr.ctlMu.RLock() svr.ctlMu.RLock()
if svr.ctl != nil { if svr.ctl != nil {
svr.ctl.GracefulClose(d) svr.ctl.GracefulClose(d)
svr.ctl = nil
} }
svr.ctlMu.RUnlock() svr.ctlMu.RUnlock()
if svr.cancel != nil {
svr.cancel() svr.cancel()
} }
}
type ConnectionManager struct { type ConnectionManager struct {
ctx context.Context ctx context.Context

View File

@ -15,6 +15,7 @@
package sub package sub
import ( import (
"context"
"fmt" "fmt"
"io/fs" "io/fs"
"net" "net"
@ -233,7 +234,7 @@ func startService(
go handleSignal(svr, closedDoneCh) go handleSignal(svr, closedDoneCh)
} }
err = svr.Run() err = svr.Run(context.Background())
if err == nil && shouldGracefulClose { if err == nil && shouldGracefulClose {
<-closedDoneCh <-closedDoneCh
} }

View File

@ -15,6 +15,7 @@
package main package main
import ( import (
"context"
"fmt" "fmt"
"os" "os"
@ -210,6 +211,6 @@ func runServer(cfg config.ServerCommonConf) (err error) {
return err return err
} }
log.Info("frps started successfully") log.Info("frps started successfully")
svr.Run() svr.Run(context.Background())
return return
} }

View File

@ -115,7 +115,6 @@ func NewService(cfg config.ServerCommonConf) (svr *Service, err error) {
return return
} }
ctx, cancel := context.WithCancel(context.Background())
svr = &Service{ svr = &Service{
ctlManager: NewControlManager(), ctlManager: NewControlManager(),
pxyManager: proxy.NewManager(), pxyManager: proxy.NewManager(),
@ -129,8 +128,7 @@ func NewService(cfg config.ServerCommonConf) (svr *Service, err error) {
authVerifier: auth.NewAuthVerifier(cfg.ServerConfig), authVerifier: auth.NewAuthVerifier(cfg.ServerConfig),
tlsConfig: tlsConfig, tlsConfig: tlsConfig,
cfg: cfg, cfg: cfg,
ctx: ctx, ctx: context.Background(),
cancel: cancel,
} }
// Create tcpmux httpconnect multiplexer. // Create tcpmux httpconnect multiplexer.
@ -329,7 +327,11 @@ func NewService(cfg config.ServerCommonConf) (svr *Service, err error) {
return return
} }
func (svr *Service) Run() { func (svr *Service) Run(ctx context.Context) {
ctx, cancel := context.WithCancel(ctx)
svr.ctx = ctx
svr.cancel = cancel
if svr.kcpListener != nil { if svr.kcpListener != nil {
go svr.HandleListener(svr.kcpListener) go svr.HandleListener(svr.kcpListener)
} }
@ -343,27 +345,39 @@ func (svr *Service) Run() {
go svr.rc.NatHoleController.CleanWorker(svr.ctx) go svr.rc.NatHoleController.CleanWorker(svr.ctx)
} }
svr.HandleListener(svr.listener) svr.HandleListener(svr.listener)
<-svr.ctx.Done()
// service context may not be canceled by svr.Close(), we should call it here to release resources
if svr.listener != nil {
svr.Close()
}
} }
func (svr *Service) Close() error { func (svr *Service) Close() error {
if svr.kcpListener != nil { if svr.kcpListener != nil {
svr.kcpListener.Close() svr.kcpListener.Close()
svr.kcpListener = nil
} }
if svr.quicListener != nil { if svr.quicListener != nil {
svr.quicListener.Close() svr.quicListener.Close()
svr.quicListener = nil
} }
if svr.websocketListener != nil { if svr.websocketListener != nil {
svr.websocketListener.Close() svr.websocketListener.Close()
svr.websocketListener = nil
} }
if svr.tlsListener != nil { if svr.tlsListener != nil {
svr.tlsListener.Close() svr.tlsListener.Close()
svr.tlsConfig = nil
} }
if svr.listener != nil { if svr.listener != nil {
svr.listener.Close() svr.listener.Close()
svr.listener = nil
} }
svr.cancel()
svr.ctlManager.Close() svr.ctlManager.Close()
if svr.cancel != nil {
svr.cancel()
}
return nil return nil
} }