lint by golangci-lint (#3080)

This commit is contained in:
fatedier
2022-08-29 01:02:53 +08:00
committed by GitHub
parent f4e4fbea62
commit 9d077b02cf
122 changed files with 947 additions and 1331 deletions

View File

@@ -20,10 +20,10 @@ import (
"net/http/pprof"
"time"
"github.com/gorilla/mux"
"github.com/fatedier/frp/assets"
frpNet "github.com/fatedier/frp/pkg/util/net"
"github.com/gorilla/mux"
)
var (
@@ -77,6 +77,8 @@ func (svr *Service) RunAdminServer(address string) (err error) {
return err
}
go server.Serve(ln)
go func() {
_ = server.Serve(ln)
}()
return
}

View File

@@ -49,7 +49,7 @@ func (svr *Service) apiReload(w http.ResponseWriter, r *http.Request) {
log.Info("api response [/api/reload], code [%d]", res.Code)
w.WriteHeader(res.Code)
if len(res.Msg) > 0 {
w.Write([]byte(res.Msg))
_, _ = w.Write([]byte(res.Msg))
}
}()
@@ -68,7 +68,6 @@ func (svr *Service) apiReload(w http.ResponseWriter, r *http.Request) {
return
}
log.Info("success reload conf")
return
}
type StatusResp struct {
@@ -173,7 +172,7 @@ func (svr *Service) apiStatus(w http.ResponseWriter, r *http.Request) {
defer func() {
log.Info("Http response [/api/status]")
buf, _ = json.Marshal(&res)
w.Write(buf)
_, _ = w.Write(buf)
}()
ps := svr.ctl.pm.GetAllProxyStatus()
@@ -202,7 +201,6 @@ func (svr *Service) apiStatus(w http.ResponseWriter, r *http.Request) {
sort.Sort(ByProxyStatusResp(res.STCP))
sort.Sort(ByProxyStatusResp(res.XTCP))
sort.Sort(ByProxyStatusResp(res.SUDP))
return
}
// GET api/config
@@ -214,7 +212,7 @@ func (svr *Service) apiGetConfig(w http.ResponseWriter, r *http.Request) {
log.Info("Http get response [/api/config], code [%d]", res.Code)
w.WriteHeader(res.Code)
if len(res.Msg) > 0 {
w.Write([]byte(res.Msg))
_, _ = w.Write([]byte(res.Msg))
}
}()
@@ -254,7 +252,7 @@ func (svr *Service) apiPutConfig(w http.ResponseWriter, r *http.Request) {
log.Info("Http put response [/api/config], code [%d]", res.Code)
w.WriteHeader(res.Code)
if len(res.Msg) > 0 {
w.Write([]byte(res.Msg))
_, _ = w.Write([]byte(res.Msg))
}
}()
@@ -315,7 +313,7 @@ func (svr *Service) apiPutConfig(w http.ResponseWriter, r *http.Request) {
}
content = strings.Join(newRows, "\n")
err = os.WriteFile(svr.cfgFile, []byte(content), 0644)
err = os.WriteFile(svr.cfgFile, []byte(content), 0o644)
if err != nil {
res.Code = 500
res.Msg = fmt.Sprintf("write content to frpc config file error: %v", err)

View File

@@ -21,9 +21,13 @@ import (
"net"
"runtime/debug"
"strconv"
"sync"
"time"
"github.com/fatedier/golib/control/shutdown"
"github.com/fatedier/golib/crypto"
libdial "github.com/fatedier/golib/net/dial"
fmux "github.com/hashicorp/yamux"
"github.com/fatedier/frp/client/proxy"
"github.com/fatedier/frp/pkg/auth"
"github.com/fatedier/frp/pkg/config"
@@ -31,11 +35,6 @@ import (
"github.com/fatedier/frp/pkg/transport"
frpNet "github.com/fatedier/frp/pkg/util/net"
"github.com/fatedier/frp/pkg/util/xlog"
"github.com/fatedier/golib/control/shutdown"
"github.com/fatedier/golib/crypto"
libdial "github.com/fatedier/golib/net/dial"
fmux "github.com/hashicorp/yamux"
)
type Control struct {
@@ -79,8 +78,6 @@ type Control struct {
// The UDP port that the server is listening on
serverUDPPort int
mu sync.RWMutex
xl *xlog.Logger
// service context
@@ -95,8 +92,8 @@ func NewControl(ctx context.Context, runID string, conn net.Conn, session *fmux.
pxyCfgs map[string]config.ProxyConf,
visitorCfgs map[string]config.VisitorConf,
serverUDPPort int,
authSetter auth.Setter) *Control {
authSetter auth.Setter,
) *Control {
// new xlog instance
ctl := &Control{
runID: runID,
@@ -131,7 +128,6 @@ func (ctl *Control) Run() {
// start all visitors
go ctl.vm.Run()
return
}
func (ctl *Control) HandleReqWorkConn(inMsg *msg.ReqWorkConn) {
@@ -400,24 +396,21 @@ func (ctl *Control) worker() {
go ctl.reader()
go ctl.writer()
select {
case <-ctl.closedCh:
// close related channels and wait until other goroutines done
close(ctl.readCh)
ctl.readerShutdown.WaitDone()
ctl.msgHandlerShutdown.WaitDone()
<-ctl.closedCh
// close related channels and wait until other goroutines done
close(ctl.readCh)
ctl.readerShutdown.WaitDone()
ctl.msgHandlerShutdown.WaitDone()
close(ctl.sendCh)
ctl.writerShutdown.WaitDone()
close(ctl.sendCh)
ctl.writerShutdown.WaitDone()
ctl.pm.Close()
ctl.vm.Close()
ctl.pm.Close()
ctl.vm.Close()
close(ctl.closedDoneCh)
if ctl.session != nil {
ctl.session.Close()
}
return
close(ctl.closedDoneCh)
if ctl.session != nil {
ctl.session.Close()
}
}

View File

@@ -6,9 +6,7 @@ import (
"github.com/fatedier/frp/pkg/msg"
)
var (
ErrPayloadType = errors.New("error payload type")
)
var ErrPayloadType = errors.New("error payload type")
type Handler func(payload interface{}) error

View File

@@ -26,9 +26,7 @@ import (
"github.com/fatedier/frp/pkg/util/xlog"
)
var (
ErrHealthCheckType = errors.New("error health check type")
)
var ErrHealthCheckType = errors.New("error health check type")
type Monitor struct {
checkType string
@@ -54,8 +52,8 @@ type Monitor struct {
func NewMonitor(ctx context.Context, checkType string,
intervalS int, timeoutS int, maxFailedTimes int,
addr string, url string,
statusNormalFn func(), statusFailedFn func()) *Monitor {
statusNormalFn func(), statusFailedFn func(),
) *Monitor {
if intervalS <= 0 {
intervalS = 10
}
@@ -152,7 +150,7 @@ func (monitor *Monitor) doTCPCheck(ctx context.Context) error {
}
func (monitor *Monitor) doHTTPCheck(ctx context.Context) error {
req, err := http.NewRequest("GET", monitor.url, nil)
req, err := http.NewRequestWithContext(ctx, "GET", monitor.url, nil)
if err != nil {
return err
}
@@ -161,7 +159,7 @@ func (monitor *Monitor) doHTTPCheck(ctx context.Context) error {
return err
}
defer resp.Body.Close()
io.Copy(io.Discard, resp.Body)
_, _ = io.Copy(io.Discard, resp.Body)
if resp.StatusCode/100 != 2 {
return fmt.Errorf("do http health check, StatusCode is [%d] not 2xx", resp.StatusCode)

View File

@@ -24,14 +24,6 @@ import (
"sync"
"time"
"github.com/fatedier/frp/pkg/config"
"github.com/fatedier/frp/pkg/msg"
plugin "github.com/fatedier/frp/pkg/plugin/client"
"github.com/fatedier/frp/pkg/proto/udp"
"github.com/fatedier/frp/pkg/util/limit"
frpNet "github.com/fatedier/frp/pkg/util/net"
"github.com/fatedier/frp/pkg/util/xlog"
"github.com/fatedier/golib/errors"
frpIo "github.com/fatedier/golib/io"
libdial "github.com/fatedier/golib/net/dial"
@@ -39,6 +31,14 @@ import (
fmux "github.com/hashicorp/yamux"
pp "github.com/pires/go-proxyproto"
"golang.org/x/time/rate"
"github.com/fatedier/frp/pkg/config"
"github.com/fatedier/frp/pkg/msg"
plugin "github.com/fatedier/frp/pkg/plugin/client"
"github.com/fatedier/frp/pkg/proto/udp"
"github.com/fatedier/frp/pkg/util/limit"
frpNet "github.com/fatedier/frp/pkg/util/net"
"github.com/fatedier/frp/pkg/util/xlog"
)
// Proxy defines how to handle work connections for different proxy type.
@@ -322,7 +322,7 @@ func (pxy *XTCPProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
// Wait for client address at most 5 seconds.
var natHoleRespMsg msg.NatHoleResp
clientConn.SetReadDeadline(time.Now().Add(5 * time.Second))
_ = clientConn.SetReadDeadline(time.Now().Add(5 * time.Second))
buf := pool.GetBuf(1024)
n, err := clientConn.Read(buf)
@@ -335,8 +335,8 @@ func (pxy *XTCPProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
xl.Error("get natHoleRespMsg error: %v", err)
return
}
clientConn.SetReadDeadline(time.Time{})
clientConn.Close()
_ = clientConn.SetReadDeadline(time.Time{})
_ = clientConn.Close()
if natHoleRespMsg.Error != "" {
xl.Error("natHoleRespMsg get error info: %s", natHoleRespMsg.Error)
@@ -357,10 +357,13 @@ func (pxy *XTCPProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
xl.Error("get natHoleResp visitor address error: %v", natHoleRespMsg.VisitorAddr)
return
}
pxy.sendDetectMsg(host, int(port), laddr, []byte(natHoleRespMsg.Sid))
_ = pxy.sendDetectMsg(host, int(port), laddr, []byte(natHoleRespMsg.Sid))
xl.Trace("send all detect msg done")
msg.WriteMsg(conn, &msg.NatHoleClientDetectOK{})
if err := msg.WriteMsg(conn, &msg.NatHoleClientDetectOK{}); err != nil {
xl.Error("write message error: %v", err)
return
}
// Listen for clientConn's address and wait for visitor connection
lConn, err := net.ListenUDP("udp", laddr)
@@ -370,7 +373,7 @@ func (pxy *XTCPProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
}
defer lConn.Close()
lConn.SetReadDeadline(time.Now().Add(8 * time.Second))
_ = lConn.SetReadDeadline(time.Now().Add(8 * time.Second))
sidBuf := pool.GetBuf(1024)
var uAddr *net.UDPAddr
n, uAddr, err = lConn.ReadFromUDP(sidBuf)
@@ -378,7 +381,7 @@ func (pxy *XTCPProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
xl.Warn("get sid from visitor error: %v", err)
return
}
lConn.SetReadDeadline(time.Time{})
_ = lConn.SetReadDeadline(time.Time{})
if string(sidBuf[:n]) != natHoleRespMsg.Sid {
xl.Warn("incorrect sid from visitor")
return
@@ -386,7 +389,10 @@ func (pxy *XTCPProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
pool.PutBuf(sidBuf)
xl.Info("nat hole connection make success, sid [%s]", natHoleRespMsg.Sid)
lConn.WriteToUDP(sidBuf[:n], uAddr)
if _, err := lConn.WriteToUDP(sidBuf[:n], uAddr); err != nil {
xl.Error("write uaddr error: %v", err)
return
}
kcpConn, err := frpNet.NewKCPConnFromUDP(lConn, false, uAddr.String())
if err != nil {
@@ -424,12 +430,13 @@ func (pxy *XTCPProxy) sendDetectMsg(addr string, port int, laddr *net.UDPAddr, c
return err
}
//uConn := ipv4.NewConn(tConn)
//uConn.SetTTL(3)
// uConn := ipv4.NewConn(tConn)
// uConn.SetTTL(3)
tConn.Write(content)
tConn.Close()
return nil
if _, err := tConn.Write(content); err != nil {
return err
}
return tConn.Close()
}
// UDP
@@ -539,7 +546,7 @@ func (pxy *UDPProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
}
}
}
heartbeatFn := func(conn net.Conn, sendCh chan msg.Message) {
heartbeatFn := func(sendCh chan msg.Message) {
var errRet error
for {
time.Sleep(time.Duration(30) * time.Second)
@@ -554,7 +561,7 @@ func (pxy *UDPProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
go workConnSenderFn(pxy.workConn, pxy.sendCh)
go workConnReaderFn(pxy.workConn, pxy.readCh)
go heartbeatFn(pxy.workConn, pxy.sendCh)
go heartbeatFn(pxy.sendCh)
udp.Forwarder(pxy.localAddr, pxy.readCh, pxy.sendCh, int(pxy.clientCfg.UDPPacketSize))
}
@@ -685,7 +692,7 @@ func (pxy *SUDPProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
}
}
heartbeatFn := func(conn net.Conn, sendCh chan msg.Message) {
heartbeatFn := func(sendCh chan msg.Message) {
ticker := time.NewTicker(30 * time.Second)
defer func() {
ticker.Stop()
@@ -711,14 +718,15 @@ func (pxy *SUDPProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
go workConnSenderFn(workConn, sendCh)
go workConnReaderFn(workConn, readCh)
go heartbeatFn(workConn, sendCh)
go heartbeatFn(sendCh)
udp.Forwarder(pxy.localAddr, readCh, sendCh, int(pxy.clientCfg.UDPPacketSize))
}
// Common handler for tcp work connections.
func HandleTCPWorkConnection(ctx context.Context, localInfo *config.LocalSvrConf, proxyPlugin plugin.Plugin,
baseInfo *config.BaseProxyConf, limiter *rate.Limiter, workConn net.Conn, encKey []byte, m *msg.StartWorkConn) {
baseInfo *config.BaseProxyConf, limiter *rate.Limiter, workConn net.Conn, encKey []byte, m *msg.StartWorkConn,
) {
xl := xlog.FromContextSafe(ctx)
var (
remote io.ReadWriteCloser
@@ -773,7 +781,7 @@ func HandleTCPWorkConnection(ctx context.Context, localInfo *config.LocalSvrConf
}
buf := bytes.NewBuffer(nil)
h.WriteTo(buf)
_, _ = h.WriteTo(buf)
extraInfo = buf.Bytes()
}
}
@@ -800,7 +808,11 @@ func HandleTCPWorkConnection(ctx context.Context, localInfo *config.LocalSvrConf
localConn.RemoteAddr().String(), workConn.LocalAddr().String(), workConn.RemoteAddr().String())
if len(extraInfo) > 0 {
localConn.Write(extraInfo)
if _, err := localConn.Write(extraInfo); err != nil {
workConn.Close()
xl.Error("write extraInfo to local conn error: %v", err)
return
}
}
frpIo.Join(localConn, remote)

View File

@@ -6,12 +6,12 @@ import (
"net"
"sync"
"github.com/fatedier/golib/errors"
"github.com/fatedier/frp/client/event"
"github.com/fatedier/frp/pkg/config"
"github.com/fatedier/frp/pkg/msg"
"github.com/fatedier/frp/pkg/util/xlog"
"github.com/fatedier/golib/errors"
)
type Manager struct {
@@ -113,10 +113,8 @@ func (pm *Manager) Reload(pxyCfgs map[string]config.ProxyConf) {
cfg, ok := pxyCfgs[name]
if !ok {
del = true
} else {
if !pxy.Cfg.Compare(cfg) {
del = true
}
} else if !pxy.Cfg.Compare(cfg) {
del = true
}
if del {

View File

@@ -8,13 +8,13 @@ import (
"sync/atomic"
"time"
"github.com/fatedier/golib/errors"
"github.com/fatedier/frp/client/event"
"github.com/fatedier/frp/client/health"
"github.com/fatedier/frp/pkg/config"
"github.com/fatedier/frp/pkg/msg"
"github.com/fatedier/frp/pkg/util/xlog"
"github.com/fatedier/golib/errors"
)
const (
@@ -27,9 +27,9 @@ const (
)
var (
statusCheckInterval time.Duration = 3 * time.Second
waitResponseTimeout = 20 * time.Second
startErrTimeout = 30 * time.Second
statusCheckInterval = 3 * time.Second
waitResponseTimeout = 20 * time.Second
startErrTimeout = 30 * time.Second
)
type WorkingStatus struct {
@@ -145,7 +145,7 @@ func (pw *Wrapper) Stop() {
}
func (pw *Wrapper) close() {
pw.handler(&event.CloseProxyPayload{
_ = pw.handler(&event.CloseProxyPayload{
CloseProxyMsg: &msg.CloseProxy{
ProxyName: pw.Name,
},
@@ -174,7 +174,7 @@ func (pw *Wrapper) checkWorker() {
var newProxyMsg msg.NewProxy
pw.Cfg.MarshalToMsg(&newProxyMsg)
pw.lastSendStartMsg = now
pw.handler(&event.StartProxyPayload{
_ = pw.handler(&event.StartProxyPayload{
NewProxyMsg: &newProxyMsg,
})
}
@@ -201,7 +201,7 @@ func (pw *Wrapper) checkWorker() {
func (pw *Wrapper) statusNormalCallback() {
xl := pw.xl
atomic.StoreUint32(&pw.health, 0)
errors.PanicToError(func() {
_ = errors.PanicToError(func() {
select {
case pw.healthNotifyCh <- struct{}{}:
default:
@@ -213,7 +213,7 @@ func (pw *Wrapper) statusNormalCallback() {
func (pw *Wrapper) statusFailedCallback() {
xl := pw.xl
atomic.StoreUint32(&pw.health, 1)
errors.PanicToError(func() {
_ = errors.PanicToError(func() {
select {
case pw.healthNotifyCh <- struct{}{}:
default:

View File

@@ -28,6 +28,10 @@ import (
"sync/atomic"
"time"
"github.com/fatedier/golib/crypto"
libdial "github.com/fatedier/golib/net/dial"
fmux "github.com/hashicorp/yamux"
"github.com/fatedier/frp/assets"
"github.com/fatedier/frp/pkg/auth"
"github.com/fatedier/frp/pkg/config"
@@ -38,10 +42,6 @@ import (
"github.com/fatedier/frp/pkg/util/util"
"github.com/fatedier/frp/pkg/util/version"
"github.com/fatedier/frp/pkg/util/xlog"
"github.com/fatedier/golib/crypto"
libdial "github.com/fatedier/golib/net/dial"
fmux "github.com/hashicorp/yamux"
)
func init() {
@@ -81,8 +81,12 @@ type Service struct {
cancel context.CancelFunc
}
func NewService(cfg config.ClientCommonConf, pxyCfgs map[string]config.ProxyConf, visitorCfgs map[string]config.VisitorConf, cfgFile string) (svr *Service, err error) {
func NewService(
cfg config.ClientCommonConf,
pxyCfgs map[string]config.ProxyConf,
visitorCfgs map[string]config.VisitorConf,
cfgFile string,
) (svr *Service, err error) {
ctx, cancel := context.WithCancel(context.Background())
svr = &Service{
authSetter: auth.NewAuthSetter(cfg.ClientConfig),
@@ -208,7 +212,7 @@ func (svr *Service) keepControllerWorking() {
xl.Warn("reconnect to server error: %v, wait %v for another retry", err, delayTime)
util.RandomSleep(delayTime, 0.9, 1.1)
delayTime = delayTime * 2
delayTime *= 2
if delayTime > maxDelayTime {
delayTime = maxDelayTime
}
@@ -333,11 +337,11 @@ func (svr *Service) login() (conn net.Conn, session *fmux.Session, err error) {
}
var loginRespMsg msg.LoginResp
conn.SetReadDeadline(time.Now().Add(10 * time.Second))
_ = conn.SetReadDeadline(time.Now().Add(10 * time.Second))
if err = msg.ReadMsgInto(conn, &loginRespMsg); err != nil {
return
}
conn.SetReadDeadline(time.Time{})
_ = conn.SetReadDeadline(time.Time{})
if loginRespMsg.Error != "" {
err = fmt.Errorf("%s", loginRespMsg.Error)

View File

@@ -24,17 +24,17 @@ import (
"sync"
"time"
"github.com/fatedier/golib/errors"
frpIo "github.com/fatedier/golib/io"
"github.com/fatedier/golib/pool"
fmux "github.com/hashicorp/yamux"
"github.com/fatedier/frp/pkg/config"
"github.com/fatedier/frp/pkg/msg"
"github.com/fatedier/frp/pkg/proto/udp"
frpNet "github.com/fatedier/frp/pkg/util/net"
"github.com/fatedier/frp/pkg/util/util"
"github.com/fatedier/frp/pkg/util/xlog"
"github.com/fatedier/golib/errors"
frpIo "github.com/fatedier/golib/io"
"github.com/fatedier/golib/pool"
fmux "github.com/hashicorp/yamux"
)
// Visitor is used for forward traffics from local port tot remote service.
@@ -71,9 +71,8 @@ func NewVisitor(ctx context.Context, ctl *Control, cfg config.VisitorConf) (visi
}
type BaseVisitor struct {
ctl *Control
l net.Listener
closed bool
ctl *Control
l net.Listener
mu sync.RWMutex
ctx context.Context
@@ -138,13 +137,13 @@ func (sv *STCPVisitor) handleConn(userConn net.Conn) {
}
var newVisitorConnRespMsg msg.NewVisitorConnResp
visitorConn.SetReadDeadline(time.Now().Add(10 * time.Second))
_ = visitorConn.SetReadDeadline(time.Now().Add(10 * time.Second))
err = msg.ReadMsgInto(visitorConn, &newVisitorConnRespMsg)
if err != nil {
xl.Warn("get newVisitorConnRespMsg error: %v", err)
return
}
visitorConn.SetReadDeadline(time.Time{})
_ = visitorConn.SetReadDeadline(time.Time{})
if newVisitorConnRespMsg.Error != "" {
xl.Warn("start new visitor connection error: %s", newVisitorConnRespMsg.Error)
@@ -239,7 +238,7 @@ func (sv *XTCPVisitor) handleConn(userConn net.Conn) {
// Wait for client address at most 10 seconds.
var natHoleRespMsg msg.NatHoleResp
visitorConn.SetReadDeadline(time.Now().Add(10 * time.Second))
_ = visitorConn.SetReadDeadline(time.Now().Add(10 * time.Second))
buf := pool.GetBuf(1024)
n, err := visitorConn.Read(buf)
if err != nil {
@@ -252,7 +251,7 @@ func (sv *XTCPVisitor) handleConn(userConn net.Conn) {
xl.Warn("get natHoleRespMsg error: %v", err)
return
}
visitorConn.SetReadDeadline(time.Time{})
_ = visitorConn.SetReadDeadline(time.Time{})
pool.PutBuf(buf)
if natHoleRespMsg.Error != "" {
@@ -279,17 +278,20 @@ func (sv *XTCPVisitor) handleConn(userConn net.Conn) {
}
defer lConn.Close()
lConn.Write([]byte(natHoleRespMsg.Sid))
if _, err := lConn.Write([]byte(natHoleRespMsg.Sid)); err != nil {
xl.Error("write sid error: %v", err)
return
}
// read ack sid from client
sidBuf := pool.GetBuf(1024)
lConn.SetReadDeadline(time.Now().Add(8 * time.Second))
_ = lConn.SetReadDeadline(time.Now().Add(8 * time.Second))
n, err = lConn.Read(sidBuf)
if err != nil {
xl.Warn("get sid from client error: %v", err)
return
}
lConn.SetReadDeadline(time.Time{})
_ = lConn.SetReadDeadline(time.Time{})
if string(sidBuf[:n]) != natHoleRespMsg.Sid {
xl.Warn("incorrect sid from client")
return
@@ -411,7 +413,6 @@ func (sv *SUDPVisitor) dispatcher() {
default:
}
}
}
func (sv *SUDPVisitor) worker(workConn net.Conn, firstPacket *msg.UDPPacket) {
@@ -437,13 +438,13 @@ func (sv *SUDPVisitor) worker(workConn net.Conn, firstPacket *msg.UDPPacket) {
)
// frpc will send heartbeat in workConn to frpc visitor for keeping alive
conn.SetReadDeadline(time.Now().Add(60 * time.Second))
_ = conn.SetReadDeadline(time.Now().Add(60 * time.Second))
if rawMsg, errRet = msg.ReadMsg(conn); errRet != nil {
xl.Warn("read from workconn for user udp conn error: %v", errRet)
return
}
conn.SetReadDeadline(time.Time{})
_ = conn.SetReadDeadline(time.Time{})
switch m := rawMsg.(type) {
case *msg.Ping:
xl.Debug("frpc visitor get ping message from frpc")
@@ -523,12 +524,12 @@ func (sv *SUDPVisitor) getNewVisitorConn() (net.Conn, error) {
}
var newVisitorConnRespMsg msg.NewVisitorConnResp
visitorConn.SetReadDeadline(time.Now().Add(10 * time.Second))
_ = visitorConn.SetReadDeadline(time.Now().Add(10 * time.Second))
err = msg.ReadMsgInto(visitorConn, &newVisitorConnRespMsg)
if err != nil {
return nil, fmt.Errorf("frpc read newVisitorConnRespMsg error: %v", err)
}
visitorConn.SetReadDeadline(time.Time{})
_ = visitorConn.SetReadDeadline(time.Time{})
if newVisitorConnRespMsg.Error != "" {
return nil, fmt.Errorf("start new visitor connection error: %s", newVisitorConnRespMsg.Error)

View File

@@ -65,7 +65,7 @@ func (vm *VisitorManager) Run() {
name := cfg.GetBaseInfo().ProxyName
if _, exist := vm.visitors[name]; !exist {
xl.Info("try to start visitor [%s]", name)
vm.startVisitor(cfg)
_ = vm.startVisitor(cfg)
}
}
vm.mu.Unlock()
@@ -99,10 +99,8 @@ func (vm *VisitorManager) Reload(cfgs map[string]config.VisitorConf) {
cfg, ok := cfgs[name]
if !ok {
del = true
} else {
if !oldCfg.Compare(cfg) {
del = true
}
} else if !oldCfg.Compare(cfg) {
del = true
}
if del {
@@ -123,13 +121,12 @@ func (vm *VisitorManager) Reload(cfgs map[string]config.VisitorConf) {
if _, ok := vm.cfgs[name]; !ok {
vm.cfgs[name] = cfg
addNames = append(addNames, name)
vm.startVisitor(cfg)
_ = vm.startVisitor(cfg)
}
}
if len(addNames) > 0 {
xl.Info("visitor added: %v", addNames)
}
return
}
func (vm *VisitorManager) Close() {