change log method

This commit is contained in:
fatedier
2019-10-12 20:13:12 +08:00
parent 5dc8175fc8
commit 649f47c345
44 changed files with 670 additions and 688 deletions

View File

@@ -15,9 +15,11 @@
package client
import (
"context"
"crypto/tls"
"fmt"
"io"
"net"
"runtime/debug"
"sync"
"time"
@@ -25,8 +27,8 @@ import (
"github.com/fatedier/frp/client/proxy"
"github.com/fatedier/frp/models/config"
"github.com/fatedier/frp/models/msg"
"github.com/fatedier/frp/utils/log"
frpNet "github.com/fatedier/frp/utils/net"
"github.com/fatedier/frp/utils/xlog"
"github.com/fatedier/golib/control/shutdown"
"github.com/fatedier/golib/crypto"
@@ -45,7 +47,7 @@ type Control struct {
vm *VisitorManager
// control connection
conn frpNet.Conn
conn net.Conn
// tcp stream multiplexing, if enabled
session *fmux.Session
@@ -76,12 +78,19 @@ type Control struct {
mu sync.RWMutex
log.Logger
xl *xlog.Logger
// service context
ctx context.Context
}
func NewControl(runId string, conn frpNet.Conn, session *fmux.Session, clientCfg config.ClientCommonConf,
pxyCfgs map[string]config.ProxyConf, visitorCfgs map[string]config.VisitorConf, serverUDPPort int) *Control {
func NewControl(ctx context.Context, runId string, conn net.Conn, session *fmux.Session,
clientCfg config.ClientCommonConf,
pxyCfgs map[string]config.ProxyConf,
visitorCfgs map[string]config.VisitorConf,
serverUDPPort int) *Control {
// new xlog instance
ctl := &Control{
runId: runId,
conn: conn,
@@ -96,11 +105,12 @@ func NewControl(runId string, conn frpNet.Conn, session *fmux.Session, clientCfg
writerShutdown: shutdown.New(),
msgHandlerShutdown: shutdown.New(),
serverUDPPort: serverUDPPort,
Logger: log.NewPrefixLogger(""),
xl: xlog.FromContextSafe(ctx),
ctx: ctx,
}
ctl.pm = proxy.NewProxyManager(ctl.sendCh, runId, clientCfg, serverUDPPort)
ctl.pm = proxy.NewProxyManager(ctl.ctx, ctl.sendCh, clientCfg, serverUDPPort)
ctl.vm = NewVisitorManager(ctl)
ctl.vm = NewVisitorManager(ctl.ctx, ctl)
ctl.vm.Reload(visitorCfgs)
return ctl
}
@@ -117,6 +127,7 @@ func (ctl *Control) Run() {
}
func (ctl *Control) HandleReqWorkConn(inMsg *msg.ReqWorkConn) {
xl := ctl.xl
workConn, err := ctl.connectServer()
if err != nil {
return
@@ -126,31 +137,31 @@ func (ctl *Control) HandleReqWorkConn(inMsg *msg.ReqWorkConn) {
RunId: ctl.runId,
}
if err = msg.WriteMsg(workConn, m); err != nil {
ctl.Warn("work connection write to server error: %v", err)
xl.Warn("work connection write to server error: %v", err)
workConn.Close()
return
}
var startMsg msg.StartWorkConn
if err = msg.ReadMsgInto(workConn, &startMsg); err != nil {
ctl.Error("work connection closed, %v", err)
xl.Error("work connection closed before response StartWorkConn message: %v", err)
workConn.Close()
return
}
workConn.AddLogPrefix(startMsg.ProxyName)
// dispatch this work connection to related proxy
ctl.pm.HandleWorkConn(startMsg.ProxyName, workConn, &startMsg)
}
func (ctl *Control) HandleNewProxyResp(inMsg *msg.NewProxyResp) {
xl := ctl.xl
// Server will return NewProxyResp message to each NewProxy message.
// Start a new proxy handler if no error got
err := ctl.pm.StartProxy(inMsg.ProxyName, inMsg.RemoteAddr, inMsg.Error)
if err != nil {
ctl.Warn("[%s] start error: %v", inMsg.ProxyName, err)
xl.Warn("[%s] start error: %v", inMsg.ProxyName, err)
} else {
ctl.Info("[%s] start proxy success", inMsg.ProxyName)
xl.Info("[%s] start proxy success", inMsg.ProxyName)
}
}
@@ -169,15 +180,16 @@ func (ctl *Control) ClosedDoneCh() <-chan struct{} {
}
// connectServer return a new connection to frps
func (ctl *Control) connectServer() (conn frpNet.Conn, err error) {
func (ctl *Control) connectServer() (conn net.Conn, err error) {
xl := ctl.xl
if ctl.clientCfg.TcpMux {
stream, errRet := ctl.session.OpenStream()
if errRet != nil {
err = errRet
ctl.Warn("start new connection to server error: %v", err)
xl.Warn("start new connection to server error: %v", err)
return
}
conn = frpNet.WrapConn(stream)
conn = stream
} else {
var tlsConfig *tls.Config
if ctl.clientCfg.TLSEnable {
@@ -188,7 +200,7 @@ func (ctl *Control) connectServer() (conn frpNet.Conn, err error) {
conn, err = frpNet.ConnectServerByProxyWithTLS(ctl.clientCfg.HttpProxy, ctl.clientCfg.Protocol,
fmt.Sprintf("%s:%d", ctl.clientCfg.ServerAddr, ctl.clientCfg.ServerPort), tlsConfig)
if err != nil {
ctl.Warn("start new connection to server error: %v", err)
xl.Warn("start new connection to server error: %v", err)
return
}
}
@@ -197,10 +209,11 @@ func (ctl *Control) connectServer() (conn frpNet.Conn, err error) {
// reader read all messages from frps and send to readCh
func (ctl *Control) reader() {
xl := ctl.xl
defer func() {
if err := recover(); err != nil {
ctl.Error("panic error: %v", err)
ctl.Error(string(debug.Stack()))
xl.Error("panic error: %v", err)
xl.Error(string(debug.Stack()))
}
}()
defer ctl.readerShutdown.Done()
@@ -210,10 +223,10 @@ func (ctl *Control) reader() {
for {
if m, err := msg.ReadMsg(encReader); err != nil {
if err == io.EOF {
ctl.Debug("read from control connection EOF")
xl.Debug("read from control connection EOF")
return
} else {
ctl.Warn("read error: %v", err)
xl.Warn("read error: %v", err)
ctl.conn.Close()
return
}
@@ -225,20 +238,21 @@ func (ctl *Control) reader() {
// writer writes messages got from sendCh to frps
func (ctl *Control) writer() {
xl := ctl.xl
defer ctl.writerShutdown.Done()
encWriter, err := crypto.NewWriter(ctl.conn, []byte(ctl.clientCfg.Token))
if err != nil {
ctl.conn.Error("crypto new writer error: %v", err)
xl.Error("crypto new writer error: %v", err)
ctl.conn.Close()
return
}
for {
if m, ok := <-ctl.sendCh; !ok {
ctl.Info("control writer is closing")
xl.Info("control writer is closing")
return
} else {
if err := msg.WriteMsg(encWriter, m); err != nil {
ctl.Warn("write message to control connection error: %v", err)
xl.Warn("write message to control connection error: %v", err)
return
}
}
@@ -247,10 +261,11 @@ func (ctl *Control) writer() {
// msgHandler handles all channel events and do corresponding operations.
func (ctl *Control) msgHandler() {
xl := ctl.xl
defer func() {
if err := recover(); err != nil {
ctl.Error("panic error: %v", err)
ctl.Error(string(debug.Stack()))
xl.Error("panic error: %v", err)
xl.Error(string(debug.Stack()))
}
}()
defer ctl.msgHandlerShutdown.Done()
@@ -266,11 +281,11 @@ func (ctl *Control) msgHandler() {
select {
case <-hbSend.C:
// send heartbeat to server
ctl.Debug("send heartbeat to server")
xl.Debug("send heartbeat to server")
ctl.sendCh <- &msg.Ping{}
case <-hbCheck.C:
if time.Since(ctl.lastPong) > time.Duration(ctl.clientCfg.HeartBeatTimeout)*time.Second {
ctl.Warn("heartbeat timeout")
xl.Warn("heartbeat timeout")
// let reader() stop
ctl.conn.Close()
return
@@ -287,7 +302,7 @@ func (ctl *Control) msgHandler() {
ctl.HandleNewProxyResp(m)
case *msg.Pong:
ctl.lastPong = time.Now()
ctl.Debug("receive heartbeat from server")
xl.Debug("receive heartbeat from server")
}
}
}

View File

@@ -24,7 +24,7 @@ import (
"net/http"
"time"
"github.com/fatedier/frp/utils/log"
"github.com/fatedier/frp/utils/xlog"
)
var (
@@ -50,11 +50,11 @@ type HealthCheckMonitor struct {
ctx context.Context
cancel context.CancelFunc
l log.Logger
}
func NewHealthCheckMonitor(checkType string, intervalS int, timeoutS int, maxFailedTimes int, addr string, url string,
func NewHealthCheckMonitor(ctx context.Context, checkType string,
intervalS int, timeoutS int, maxFailedTimes int,
addr string, url string,
statusNormalFn func(), statusFailedFn func()) *HealthCheckMonitor {
if intervalS <= 0 {
@@ -66,7 +66,7 @@ func NewHealthCheckMonitor(checkType string, intervalS int, timeoutS int, maxFai
if maxFailedTimes <= 0 {
maxFailedTimes = 1
}
ctx, cancel := context.WithCancel(context.Background())
newctx, cancel := context.WithCancel(ctx)
return &HealthCheckMonitor{
checkType: checkType,
interval: time.Duration(intervalS) * time.Second,
@@ -77,15 +77,11 @@ func NewHealthCheckMonitor(checkType string, intervalS int, timeoutS int, maxFai
statusOK: false,
statusNormalFn: statusNormalFn,
statusFailedFn: statusFailedFn,
ctx: ctx,
ctx: newctx,
cancel: cancel,
}
}
func (monitor *HealthCheckMonitor) SetLogger(l log.Logger) {
monitor.l = l
}
func (monitor *HealthCheckMonitor) Start() {
go monitor.checkWorker()
}
@@ -95,6 +91,7 @@ func (monitor *HealthCheckMonitor) Stop() {
}
func (monitor *HealthCheckMonitor) checkWorker() {
xl := xlog.FromContextSafe(monitor.ctx)
for {
doCtx, cancel := context.WithDeadline(monitor.ctx, time.Now().Add(monitor.timeout))
err := monitor.doCheck(doCtx)
@@ -109,25 +106,17 @@ func (monitor *HealthCheckMonitor) checkWorker() {
}
if err == nil {
if monitor.l != nil {
monitor.l.Trace("do one health check success")
}
xl.Trace("do one health check success")
if !monitor.statusOK && monitor.statusNormalFn != nil {
if monitor.l != nil {
monitor.l.Info("health check status change to success")
}
xl.Info("health check status change to success")
monitor.statusOK = true
monitor.statusNormalFn()
}
} else {
if monitor.l != nil {
monitor.l.Warn("do one health check failed: %v", err)
}
xl.Warn("do one health check failed: %v", err)
monitor.failedTimes++
if monitor.statusOK && int(monitor.failedTimes) >= monitor.maxFailedTimes && monitor.statusFailedFn != nil {
if monitor.l != nil {
monitor.l.Warn("health check status change to failed")
}
xl.Warn("health check status change to failed")
monitor.statusOK = false
monitor.statusFailedFn()
}

View File

@@ -16,6 +16,7 @@ package proxy
import (
"bytes"
"context"
"fmt"
"io"
"io/ioutil"
@@ -29,8 +30,8 @@ import (
"github.com/fatedier/frp/models/msg"
"github.com/fatedier/frp/models/plugin"
"github.com/fatedier/frp/models/proto/udp"
"github.com/fatedier/frp/utils/log"
frpNet "github.com/fatedier/frp/utils/net"
"github.com/fatedier/frp/utils/xlog"
"github.com/fatedier/golib/errors"
frpIo "github.com/fatedier/golib/io"
@@ -44,17 +45,17 @@ type Proxy interface {
Run() error
// InWorkConn accept work connections registered to server.
InWorkConn(frpNet.Conn, *msg.StartWorkConn)
InWorkConn(net.Conn, *msg.StartWorkConn)
Close()
log.Logger
}
func NewProxy(pxyConf config.ProxyConf, clientCfg config.ClientCommonConf, serverUDPPort int) (pxy Proxy) {
func NewProxy(ctx context.Context, pxyConf config.ProxyConf, clientCfg config.ClientCommonConf, serverUDPPort int) (pxy Proxy) {
baseProxy := BaseProxy{
Logger: log.NewPrefixLogger(pxyConf.GetBaseInfo().ProxyName),
clientCfg: clientCfg,
serverUDPPort: serverUDPPort,
xl: xlog.FromContextSafe(ctx),
ctx: ctx,
}
switch cfg := pxyConf.(type) {
case *config.TcpProxyConf:
@@ -93,10 +94,12 @@ func NewProxy(pxyConf config.ProxyConf, clientCfg config.ClientCommonConf, serve
type BaseProxy struct {
closed bool
mu sync.RWMutex
clientCfg config.ClientCommonConf
serverUDPPort int
log.Logger
mu sync.RWMutex
xl *xlog.Logger
ctx context.Context
}
// TCP
@@ -123,8 +126,8 @@ func (pxy *TcpProxy) Close() {
}
}
func (pxy *TcpProxy) InWorkConn(conn frpNet.Conn, m *msg.StartWorkConn) {
HandleTcpWorkConnection(&pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, conn,
func (pxy *TcpProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
HandleTcpWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, conn,
[]byte(pxy.clientCfg.Token), m)
}
@@ -152,8 +155,8 @@ func (pxy *HttpProxy) Close() {
}
}
func (pxy *HttpProxy) InWorkConn(conn frpNet.Conn, m *msg.StartWorkConn) {
HandleTcpWorkConnection(&pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, conn,
func (pxy *HttpProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
HandleTcpWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, conn,
[]byte(pxy.clientCfg.Token), m)
}
@@ -181,8 +184,8 @@ func (pxy *HttpsProxy) Close() {
}
}
func (pxy *HttpsProxy) InWorkConn(conn frpNet.Conn, m *msg.StartWorkConn) {
HandleTcpWorkConnection(&pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, conn,
func (pxy *HttpsProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
HandleTcpWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, conn,
[]byte(pxy.clientCfg.Token), m)
}
@@ -210,8 +213,8 @@ func (pxy *StcpProxy) Close() {
}
}
func (pxy *StcpProxy) InWorkConn(conn frpNet.Conn, m *msg.StartWorkConn) {
HandleTcpWorkConnection(&pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, conn,
func (pxy *StcpProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
HandleTcpWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, conn,
[]byte(pxy.clientCfg.Token), m)
}
@@ -239,12 +242,13 @@ func (pxy *XtcpProxy) Close() {
}
}
func (pxy *XtcpProxy) InWorkConn(conn frpNet.Conn, m *msg.StartWorkConn) {
func (pxy *XtcpProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
xl := pxy.xl
defer conn.Close()
var natHoleSidMsg msg.NatHoleSid
err := msg.ReadMsgInto(conn, &natHoleSidMsg)
if err != nil {
pxy.Error("xtcp read from workConn error: %v", err)
xl.Error("xtcp read from workConn error: %v", err)
return
}
@@ -259,7 +263,7 @@ func (pxy *XtcpProxy) InWorkConn(conn frpNet.Conn, m *msg.StartWorkConn) {
err = msg.WriteMsg(clientConn, natHoleClientMsg)
if err != nil {
pxy.Error("send natHoleClientMsg to server error: %v", err)
xl.Error("send natHoleClientMsg to server error: %v", err)
return
}
@@ -270,28 +274,28 @@ func (pxy *XtcpProxy) InWorkConn(conn frpNet.Conn, m *msg.StartWorkConn) {
buf := pool.GetBuf(1024)
n, err := clientConn.Read(buf)
if err != nil {
pxy.Error("get natHoleRespMsg error: %v", err)
xl.Error("get natHoleRespMsg error: %v", err)
return
}
err = msg.ReadMsgInto(bytes.NewReader(buf[:n]), &natHoleRespMsg)
if err != nil {
pxy.Error("get natHoleRespMsg error: %v", err)
xl.Error("get natHoleRespMsg error: %v", err)
return
}
clientConn.SetReadDeadline(time.Time{})
clientConn.Close()
if natHoleRespMsg.Error != "" {
pxy.Error("natHoleRespMsg get error info: %s", natHoleRespMsg.Error)
xl.Error("natHoleRespMsg get error info: %s", natHoleRespMsg.Error)
return
}
pxy.Trace("get natHoleRespMsg, sid [%s], client address [%s] visitor address [%s]", natHoleRespMsg.Sid, natHoleRespMsg.ClientAddr, natHoleRespMsg.VisitorAddr)
xl.Trace("get natHoleRespMsg, sid [%s], client address [%s] visitor address [%s]", natHoleRespMsg.Sid, natHoleRespMsg.ClientAddr, natHoleRespMsg.VisitorAddr)
// Send detect message
array := strings.Split(natHoleRespMsg.VisitorAddr, ":")
if len(array) <= 1 {
pxy.Error("get NatHoleResp visitor address error: %v", natHoleRespMsg.VisitorAddr)
xl.Error("get NatHoleResp visitor address error: %v", natHoleRespMsg.VisitorAddr)
}
laddr, _ := net.ResolveUDPAddr("udp", clientConn.LocalAddr().String())
/*
@@ -301,18 +305,18 @@ func (pxy *XtcpProxy) InWorkConn(conn frpNet.Conn, m *msg.StartWorkConn) {
*/
port, err := strconv.ParseInt(array[1], 10, 64)
if err != nil {
pxy.Error("get natHoleResp visitor address error: %v", natHoleRespMsg.VisitorAddr)
xl.Error("get natHoleResp visitor address error: %v", natHoleRespMsg.VisitorAddr)
return
}
pxy.sendDetectMsg(array[0], int(port), laddr, []byte(natHoleRespMsg.Sid))
pxy.Trace("send all detect msg done")
xl.Trace("send all detect msg done")
msg.WriteMsg(conn, &msg.NatHoleClientDetectOK{})
// Listen for clientConn's address and wait for visitor connection
lConn, err := net.ListenUDP("udp", laddr)
if err != nil {
pxy.Error("listen on visitorConn's local adress error: %v", err)
xl.Error("listen on visitorConn's local adress error: %v", err)
return
}
defer lConn.Close()
@@ -322,22 +326,22 @@ func (pxy *XtcpProxy) InWorkConn(conn frpNet.Conn, m *msg.StartWorkConn) {
var uAddr *net.UDPAddr
n, uAddr, err = lConn.ReadFromUDP(sidBuf)
if err != nil {
pxy.Warn("get sid from visitor error: %v", err)
xl.Warn("get sid from visitor error: %v", err)
return
}
lConn.SetReadDeadline(time.Time{})
if string(sidBuf[:n]) != natHoleRespMsg.Sid {
pxy.Warn("incorrect sid from visitor")
xl.Warn("incorrect sid from visitor")
return
}
pool.PutBuf(sidBuf)
pxy.Info("nat hole connection make success, sid [%s]", natHoleRespMsg.Sid)
xl.Info("nat hole connection make success, sid [%s]", natHoleRespMsg.Sid)
lConn.WriteToUDP(sidBuf[:n], uAddr)
kcpConn, err := frpNet.NewKcpConnFromUdp(lConn, false, natHoleRespMsg.VisitorAddr)
if err != nil {
pxy.Error("create kcp connection from udp connection error: %v", err)
xl.Error("create kcp connection from udp connection error: %v", err)
return
}
@@ -346,18 +350,18 @@ func (pxy *XtcpProxy) InWorkConn(conn frpNet.Conn, m *msg.StartWorkConn) {
fmuxCfg.LogOutput = ioutil.Discard
sess, err := fmux.Server(kcpConn, fmuxCfg)
if err != nil {
pxy.Error("create yamux server from kcp connection error: %v", err)
xl.Error("create yamux server from kcp connection error: %v", err)
return
}
defer sess.Close()
muxConn, err := sess.Accept()
if err != nil {
pxy.Error("accept for yamux connection error: %v", err)
xl.Error("accept for yamux connection error: %v", err)
return
}
HandleTcpWorkConnection(&pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf,
frpNet.WrapConn(muxConn), []byte(pxy.cfg.Sk), m)
HandleTcpWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf,
muxConn, []byte(pxy.cfg.Sk), m)
}
func (pxy *XtcpProxy) sendDetectMsg(addr string, port int, laddr *net.UDPAddr, content []byte) (err error) {
@@ -390,7 +394,7 @@ type UdpProxy struct {
// include msg.UdpPacket and msg.Ping
sendCh chan msg.Message
workConn frpNet.Conn
workConn net.Conn
}
func (pxy *UdpProxy) Run() (err error) {
@@ -419,8 +423,9 @@ func (pxy *UdpProxy) Close() {
}
}
func (pxy *UdpProxy) InWorkConn(conn frpNet.Conn, m *msg.StartWorkConn) {
pxy.Info("incoming a new work connection for udp proxy, %s", conn.RemoteAddr().String())
func (pxy *UdpProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
xl := pxy.xl
xl.Info("incoming a new work connection for udp proxy, %s", conn.RemoteAddr().String())
// close resources releated with old workConn
pxy.Close()
@@ -435,32 +440,32 @@ func (pxy *UdpProxy) InWorkConn(conn frpNet.Conn, m *msg.StartWorkConn) {
for {
var udpMsg msg.UdpPacket
if errRet := msg.ReadMsgInto(conn, &udpMsg); errRet != nil {
pxy.Warn("read from workConn for udp error: %v", errRet)
xl.Warn("read from workConn for udp error: %v", errRet)
return
}
if errRet := errors.PanicToError(func() {
pxy.Trace("get udp package from workConn: %s", udpMsg.Content)
xl.Trace("get udp package from workConn: %s", udpMsg.Content)
readCh <- &udpMsg
}); errRet != nil {
pxy.Info("reader goroutine for udp work connection closed: %v", errRet)
xl.Info("reader goroutine for udp work connection closed: %v", errRet)
return
}
}
}
workConnSenderFn := func(conn net.Conn, sendCh chan msg.Message) {
defer func() {
pxy.Info("writer goroutine for udp work connection closed")
xl.Info("writer goroutine for udp work connection closed")
}()
var errRet error
for rawMsg := range sendCh {
switch m := rawMsg.(type) {
case *msg.UdpPacket:
pxy.Trace("send udp package to workConn: %s", m.Content)
xl.Trace("send udp package to workConn: %s", m.Content)
case *msg.Ping:
pxy.Trace("send ping message to udp workConn")
xl.Trace("send ping message to udp workConn")
}
if errRet = msg.WriteMsg(conn, rawMsg); errRet != nil {
pxy.Error("udp work write error: %v", errRet)
xl.Error("udp work write error: %v", errRet)
return
}
}
@@ -472,7 +477,7 @@ func (pxy *UdpProxy) InWorkConn(conn frpNet.Conn, m *msg.StartWorkConn) {
if errRet = errors.PanicToError(func() {
sendCh <- &msg.Ping{}
}); errRet != nil {
pxy.Trace("heartbeat goroutine for udp work connection closed")
xl.Trace("heartbeat goroutine for udp work connection closed")
break
}
}
@@ -485,20 +490,22 @@ func (pxy *UdpProxy) InWorkConn(conn frpNet.Conn, m *msg.StartWorkConn) {
}
// Common handler for tcp work connections.
func HandleTcpWorkConnection(localInfo *config.LocalSvrConf, proxyPlugin plugin.Plugin,
baseInfo *config.BaseProxyConf, workConn frpNet.Conn, encKey []byte, m *msg.StartWorkConn) {
func HandleTcpWorkConnection(ctx context.Context, localInfo *config.LocalSvrConf, proxyPlugin plugin.Plugin,
baseInfo *config.BaseProxyConf, workConn net.Conn, encKey []byte, m *msg.StartWorkConn) {
xl := xlog.FromContextSafe(ctx)
var (
remote io.ReadWriteCloser
err error
)
remote = workConn
xl.Trace("handle tcp work connection, use_encryption: %t, use_compression: %t",
baseInfo.UseEncryption, baseInfo.UseCompression)
if baseInfo.UseEncryption {
remote, err = frpIo.WithEncryption(remote, encKey)
if err != nil {
workConn.Close()
workConn.Error("create encryption stream error: %v", err)
xl.Error("create encryption stream error: %v", err)
return
}
}
@@ -541,19 +548,19 @@ func HandleTcpWorkConnection(localInfo *config.LocalSvrConf, proxyPlugin plugin.
if proxyPlugin != nil {
// if plugin is set, let plugin handle connections first
workConn.Debug("handle by plugin: %s", proxyPlugin.Name())
xl.Debug("handle by plugin: %s", proxyPlugin.Name())
proxyPlugin.Handle(remote, workConn, extraInfo)
workConn.Debug("handle by plugin finished")
xl.Debug("handle by plugin finished")
return
} else {
localConn, err := frpNet.ConnectServer("tcp", fmt.Sprintf("%s:%d", localInfo.LocalIp, localInfo.LocalPort))
if err != nil {
workConn.Close()
workConn.Error("connect to local service [%s:%d] error: %v", localInfo.LocalIp, localInfo.LocalPort, err)
xl.Error("connect to local service [%s:%d] error: %v", localInfo.LocalIp, localInfo.LocalPort, err)
return
}
workConn.Debug("join connections, localConn(l[%s] r[%s]) workConn(l[%s] r[%s])", localConn.LocalAddr().String(),
xl.Debug("join connections, localConn(l[%s] r[%s]) workConn(l[%s] r[%s])", localConn.LocalAddr().String(),
localConn.RemoteAddr().String(), workConn.LocalAddr().String(), workConn.RemoteAddr().String())
if len(extraInfo) > 0 {
@@ -561,6 +568,6 @@ func HandleTcpWorkConnection(localInfo *config.LocalSvrConf, proxyPlugin plugin.
}
frpIo.Join(localConn, remote)
workConn.Debug("join connections closed")
xl.Debug("join connections closed")
}
}

View File

@@ -1,14 +1,15 @@
package proxy
import (
"context"
"fmt"
"net"
"sync"
"github.com/fatedier/frp/client/event"
"github.com/fatedier/frp/models/config"
"github.com/fatedier/frp/models/msg"
"github.com/fatedier/frp/utils/log"
frpNet "github.com/fatedier/frp/utils/net"
"github.com/fatedier/frp/utils/xlog"
"github.com/fatedier/golib/errors"
)
@@ -25,19 +26,17 @@ type ProxyManager struct {
// The UDP port that the server is listening on
serverUDPPort int
logPrefix string
log.Logger
ctx context.Context
}
func NewProxyManager(msgSendCh chan (msg.Message), logPrefix string, clientCfg config.ClientCommonConf, serverUDPPort int) *ProxyManager {
func NewProxyManager(ctx context.Context, msgSendCh chan (msg.Message), clientCfg config.ClientCommonConf, serverUDPPort int) *ProxyManager {
return &ProxyManager{
proxies: make(map[string]*ProxyWrapper),
sendCh: msgSendCh,
proxies: make(map[string]*ProxyWrapper),
closed: false,
clientCfg: clientCfg,
serverUDPPort: serverUDPPort,
logPrefix: logPrefix,
Logger: log.NewPrefixLogger(logPrefix),
ctx: ctx,
}
}
@@ -65,7 +64,7 @@ func (pm *ProxyManager) Close() {
pm.proxies = make(map[string]*ProxyWrapper)
}
func (pm *ProxyManager) HandleWorkConn(name string, workConn frpNet.Conn, m *msg.StartWorkConn) {
func (pm *ProxyManager) HandleWorkConn(name string, workConn net.Conn, m *msg.StartWorkConn) {
pm.mu.RLock()
pw, ok := pm.proxies[name]
pm.mu.RUnlock()
@@ -104,6 +103,7 @@ func (pm *ProxyManager) GetAllProxyStatus() []*ProxyStatus {
}
func (pm *ProxyManager) Reload(pxyCfgs map[string]config.ProxyConf) {
xl := xlog.FromContextSafe(pm.ctx)
pm.mu.Lock()
defer pm.mu.Unlock()
@@ -127,13 +127,13 @@ func (pm *ProxyManager) Reload(pxyCfgs map[string]config.ProxyConf) {
}
}
if len(delPxyNames) > 0 {
pm.Info("proxy removed: %v", delPxyNames)
xl.Info("proxy removed: %v", delPxyNames)
}
addPxyNames := make([]string, 0)
for name, cfg := range pxyCfgs {
if _, ok := pm.proxies[name]; !ok {
pxy := NewProxyWrapper(cfg, pm.clientCfg, pm.HandleEvent, pm.logPrefix, pm.serverUDPPort)
pxy := NewProxyWrapper(pm.ctx, cfg, pm.clientCfg, pm.HandleEvent, pm.serverUDPPort)
pm.proxies[name] = pxy
addPxyNames = append(addPxyNames, name)
@@ -141,6 +141,6 @@ func (pm *ProxyManager) Reload(pxyCfgs map[string]config.ProxyConf) {
}
}
if len(addPxyNames) > 0 {
pm.Info("proxy added: %v", addPxyNames)
xl.Info("proxy added: %v", addPxyNames)
}
}

View File

@@ -1,7 +1,9 @@
package proxy
import (
"context"
"fmt"
"net"
"sync"
"sync/atomic"
"time"
@@ -10,8 +12,7 @@ import (
"github.com/fatedier/frp/client/health"
"github.com/fatedier/frp/models/config"
"github.com/fatedier/frp/models/msg"
"github.com/fatedier/frp/utils/log"
frpNet "github.com/fatedier/frp/utils/net"
"github.com/fatedier/frp/utils/xlog"
"github.com/fatedier/golib/errors"
)
@@ -62,11 +63,13 @@ type ProxyWrapper struct {
healthNotifyCh chan struct{}
mu sync.RWMutex
log.Logger
xl *xlog.Logger
ctx context.Context
}
func NewProxyWrapper(cfg config.ProxyConf, clientCfg config.ClientCommonConf, eventHandler event.EventHandler, logPrefix string, serverUDPPort int) *ProxyWrapper {
func NewProxyWrapper(ctx context.Context, cfg config.ProxyConf, clientCfg config.ClientCommonConf, eventHandler event.EventHandler, serverUDPPort int) *ProxyWrapper {
baseInfo := cfg.GetBaseInfo()
xl := xlog.FromContextSafe(ctx).Spawn().AppendPrefix(baseInfo.ProxyName)
pw := &ProxyWrapper{
ProxyStatus: ProxyStatus{
Name: baseInfo.ProxyName,
@@ -77,20 +80,19 @@ func NewProxyWrapper(cfg config.ProxyConf, clientCfg config.ClientCommonConf, ev
closeCh: make(chan struct{}),
healthNotifyCh: make(chan struct{}),
handler: eventHandler,
Logger: log.NewPrefixLogger(logPrefix),
xl: xl,
ctx: xlog.NewContext(ctx, xl),
}
pw.AddLogPrefix(pw.Name)
if baseInfo.HealthCheckType != "" {
pw.health = 1 // means failed
pw.monitor = health.NewHealthCheckMonitor(baseInfo.HealthCheckType, baseInfo.HealthCheckIntervalS,
pw.monitor = health.NewHealthCheckMonitor(pw.ctx, baseInfo.HealthCheckType, baseInfo.HealthCheckIntervalS,
baseInfo.HealthCheckTimeoutS, baseInfo.HealthCheckMaxFailed, baseInfo.HealthCheckAddr,
baseInfo.HealthCheckUrl, pw.statusNormalCallback, pw.statusFailedCallback)
pw.monitor.SetLogger(pw.Logger)
pw.Trace("enable health check monitor")
xl.Trace("enable health check monitor")
}
pw.pxy = NewProxy(pw.Cfg, clientCfg, serverUDPPort)
pw.pxy = NewProxy(pw.ctx, pw.Cfg, clientCfg, serverUDPPort)
return pw
}
@@ -147,6 +149,7 @@ func (pw *ProxyWrapper) Stop() {
}
func (pw *ProxyWrapper) checkWorker() {
xl := pw.xl
if pw.monitor != nil {
// let monitor do check request first
time.Sleep(500 * time.Millisecond)
@@ -161,7 +164,7 @@ func (pw *ProxyWrapper) checkWorker() {
(pw.Status == ProxyStatusWaitStart && now.After(pw.lastSendStartMsg.Add(waitResponseTimeout))) ||
(pw.Status == ProxyStatusStartErr && now.After(pw.lastStartErr.Add(startErrTimeout))) {
pw.Trace("change status from [%s] to [%s]", pw.Status, ProxyStatusWaitStart)
xl.Trace("change status from [%s] to [%s]", pw.Status, ProxyStatusWaitStart)
pw.Status = ProxyStatusWaitStart
var newProxyMsg msg.NewProxy
@@ -180,7 +183,7 @@ func (pw *ProxyWrapper) checkWorker() {
ProxyName: pw.Name,
},
})
pw.Trace("change status from [%s] to [%s]", pw.Status, ProxyStatusCheckFailed)
xl.Trace("change status from [%s] to [%s]", pw.Status, ProxyStatusCheckFailed)
pw.Status = ProxyStatusCheckFailed
}
pw.mu.Unlock()
@@ -196,6 +199,7 @@ func (pw *ProxyWrapper) checkWorker() {
}
func (pw *ProxyWrapper) statusNormalCallback() {
xl := pw.xl
atomic.StoreUint32(&pw.health, 0)
errors.PanicToError(func() {
select {
@@ -203,10 +207,11 @@ func (pw *ProxyWrapper) statusNormalCallback() {
default:
}
})
pw.Info("health check success")
xl.Info("health check success")
}
func (pw *ProxyWrapper) statusFailedCallback() {
xl := pw.xl
atomic.StoreUint32(&pw.health, 1)
errors.PanicToError(func() {
select {
@@ -214,15 +219,16 @@ func (pw *ProxyWrapper) statusFailedCallback() {
default:
}
})
pw.Info("health check failed")
xl.Info("health check failed")
}
func (pw *ProxyWrapper) InWorkConn(workConn frpNet.Conn, m *msg.StartWorkConn) {
func (pw *ProxyWrapper) InWorkConn(workConn net.Conn, m *msg.StartWorkConn) {
xl := pw.xl
pw.mu.RLock()
pxy := pw.pxy
pw.mu.RUnlock()
if pxy != nil {
workConn.Debug("start a new work connection, localAddr: %s remoteAddr: %s", workConn.LocalAddr().String(), workConn.RemoteAddr().String())
xl.Debug("start a new work connection, localAddr: %s remoteAddr: %s", workConn.LocalAddr().String(), workConn.RemoteAddr().String())
go pxy.InWorkConn(workConn, m)
} else {
workConn.Close()

View File

@@ -15,9 +15,11 @@
package client
import (
"context"
"crypto/tls"
"fmt"
"io/ioutil"
"net"
"runtime"
"sync"
"sync/atomic"
@@ -30,6 +32,7 @@ import (
frpNet "github.com/fatedier/frp/utils/net"
"github.com/fatedier/frp/utils/util"
"github.com/fatedier/frp/utils/version"
"github.com/fatedier/frp/utils/xlog"
fmux "github.com/hashicorp/yamux"
)
@@ -55,19 +58,25 @@ type Service struct {
// This is configured by the login response from frps
serverUDPPort int
exit uint32 // 0 means not exit
closedCh chan int
exit uint32 // 0 means not exit
// service context
ctx context.Context
// call cancel to stop service
cancel context.CancelFunc
}
// NewService creates a new client service with the given configuration.
func NewService(cfg config.ClientCommonConf, pxyCfgs map[string]config.ProxyConf, visitorCfgs map[string]config.VisitorConf, cfgFile string) (svr *Service, err error) {
ctx, cancel := context.WithCancel(context.Background())
svr = &Service{
cfg: cfg,
cfgFile: cfgFile,
pxyCfgs: pxyCfgs,
visitorCfgs: visitorCfgs,
exit: 0,
closedCh: make(chan int),
ctx: xlog.NewContext(ctx, xlog.New()),
cancel: cancel,
}
return
}
@@ -79,11 +88,13 @@ func (svr *Service) GetController() *Control {
}
func (svr *Service) Run() error {
// first login
xl := xlog.FromContextSafe(svr.ctx)
// login to frps
for {
conn, session, err := svr.login()
if err != nil {
log.Warn("login to server failed: %v", err)
xl.Warn("login to server failed: %v", err)
// if login_fail_exit is true, just exit this program
// otherwise sleep a while and try again to connect to server
@@ -94,7 +105,7 @@ func (svr *Service) Run() error {
}
} else {
// login success
ctl := NewControl(svr.runId, conn, session, svr.cfg, svr.pxyCfgs, svr.visitorCfgs, svr.serverUDPPort)
ctl := NewControl(svr.ctx, svr.runId, conn, session, svr.cfg, svr.pxyCfgs, svr.visitorCfgs, svr.serverUDPPort)
ctl.Run()
svr.ctlMu.Lock()
svr.ctl = ctl
@@ -118,12 +129,12 @@ func (svr *Service) Run() error {
}
log.Info("admin server listen on %s:%d", svr.cfg.AdminAddr, svr.cfg.AdminPort)
}
<-svr.closedCh
<-svr.ctx.Done()
return nil
}
func (svr *Service) keepControllerWorking() {
xl := xlog.FromContextSafe(svr.ctx)
maxDelayTime := 20 * time.Second
delayTime := time.Second
@@ -134,10 +145,10 @@ func (svr *Service) keepControllerWorking() {
}
for {
log.Info("try to reconnect to server...")
xl.Info("try to reconnect to server...")
conn, session, err := svr.login()
if err != nil {
log.Warn("reconnect to server error: %v", err)
xl.Warn("reconnect to server error: %v", err)
time.Sleep(delayTime)
delayTime = delayTime * 2
if delayTime > maxDelayTime {
@@ -148,7 +159,7 @@ func (svr *Service) keepControllerWorking() {
// reconnect success, init delayTime
delayTime = time.Second
ctl := NewControl(svr.runId, conn, session, svr.cfg, svr.pxyCfgs, svr.visitorCfgs, svr.serverUDPPort)
ctl := NewControl(svr.ctx, svr.runId, conn, session, svr.cfg, svr.pxyCfgs, svr.visitorCfgs, svr.serverUDPPort)
ctl.Run()
svr.ctlMu.Lock()
svr.ctl = ctl
@@ -161,7 +172,8 @@ func (svr *Service) keepControllerWorking() {
// login creates a connection to frps and registers it self as a client
// conn: control connection
// session: if it's not nil, using tcp mux
func (svr *Service) login() (conn frpNet.Conn, session *fmux.Session, err error) {
func (svr *Service) login() (conn net.Conn, session *fmux.Session, err error) {
xl := xlog.FromContextSafe(svr.ctx)
var tlsConfig *tls.Config
if svr.cfg.TLSEnable {
tlsConfig = &tls.Config{
@@ -197,7 +209,7 @@ func (svr *Service) login() (conn frpNet.Conn, session *fmux.Session, err error)
err = errRet
return
}
conn = frpNet.WrapConn(stream)
conn = stream
}
now := time.Now().Unix()
@@ -225,13 +237,16 @@ func (svr *Service) login() (conn frpNet.Conn, session *fmux.Session, err error)
if loginRespMsg.Error != "" {
err = fmt.Errorf("%s", loginRespMsg.Error)
log.Error("%s", loginRespMsg.Error)
xl.Error("%s", loginRespMsg.Error)
return
}
svr.runId = loginRespMsg.RunId
xl.ResetPrefixes()
xl.AppendPrefix(svr.runId)
svr.serverUDPPort = loginRespMsg.ServerUdpPort
log.Info("login to server success, get run id [%s], server udp port [%d]", loginRespMsg.RunId, loginRespMsg.ServerUdpPort)
xl.Info("login to server success, get run id [%s], server udp port [%d]", loginRespMsg.RunId, loginRespMsg.ServerUdpPort)
return
}
@@ -247,5 +262,5 @@ func (svr *Service) ReloadConf(pxyCfgs map[string]config.ProxyConf, visitorCfgs
func (svr *Service) Close() {
atomic.StoreUint32(&svr.exit, 1)
svr.ctl.Close()
close(svr.closedCh)
svr.cancel()
}

View File

@@ -16,6 +16,7 @@ package client
import (
"bytes"
"context"
"fmt"
"io"
"io/ioutil"
@@ -25,9 +26,9 @@ import (
"github.com/fatedier/frp/models/config"
"github.com/fatedier/frp/models/msg"
"github.com/fatedier/frp/utils/log"
frpNet "github.com/fatedier/frp/utils/net"
"github.com/fatedier/frp/utils/util"
"github.com/fatedier/frp/utils/xlog"
frpIo "github.com/fatedier/golib/io"
"github.com/fatedier/golib/pool"
@@ -38,13 +39,13 @@ import (
type Visitor interface {
Run() error
Close()
log.Logger
}
func NewVisitor(ctl *Control, cfg config.VisitorConf) (visitor Visitor) {
func NewVisitor(ctx context.Context, ctl *Control, cfg config.VisitorConf) (visitor Visitor) {
xl := xlog.FromContextSafe(ctx).Spawn().AppendPrefix(cfg.GetBaseInfo().ProxyName)
baseVisitor := BaseVisitor{
ctl: ctl,
Logger: log.NewPrefixLogger(cfg.GetBaseInfo().ProxyName),
ctl: ctl,
ctx: xlog.NewContext(ctx, xl),
}
switch cfg := cfg.(type) {
case *config.StcpVisitorConf:
@@ -63,10 +64,11 @@ func NewVisitor(ctl *Control, cfg config.VisitorConf) (visitor Visitor) {
type BaseVisitor struct {
ctl *Control
l frpNet.Listener
l net.Listener
closed bool
mu sync.RWMutex
log.Logger
mu sync.RWMutex
ctx context.Context
}
type StcpVisitor struct {
@@ -76,7 +78,7 @@ type StcpVisitor struct {
}
func (sv *StcpVisitor) Run() (err error) {
sv.l, err = frpNet.ListenTcp(sv.cfg.BindAddr, sv.cfg.BindPort)
sv.l, err = net.Listen("tcp", fmt.Sprintf("%s:%d", sv.cfg.BindAddr, sv.cfg.BindPort))
if err != nil {
return
}
@@ -90,10 +92,11 @@ func (sv *StcpVisitor) Close() {
}
func (sv *StcpVisitor) worker() {
xl := xlog.FromContextSafe(sv.ctx)
for {
conn, err := sv.l.Accept()
if err != nil {
sv.Warn("stcp local listener closed")
xl.Warn("stcp local listener closed")
return
}
@@ -101,10 +104,11 @@ func (sv *StcpVisitor) worker() {
}
}
func (sv *StcpVisitor) handleConn(userConn frpNet.Conn) {
func (sv *StcpVisitor) handleConn(userConn net.Conn) {
xl := xlog.FromContextSafe(sv.ctx)
defer userConn.Close()
sv.Debug("get a new stcp user connection")
xl.Debug("get a new stcp user connection")
visitorConn, err := sv.ctl.connectServer()
if err != nil {
return
@@ -121,7 +125,7 @@ func (sv *StcpVisitor) handleConn(userConn frpNet.Conn) {
}
err = msg.WriteMsg(visitorConn, newVisitorConnMsg)
if err != nil {
sv.Warn("send newVisitorConnMsg to server error: %v", err)
xl.Warn("send newVisitorConnMsg to server error: %v", err)
return
}
@@ -129,13 +133,13 @@ func (sv *StcpVisitor) handleConn(userConn frpNet.Conn) {
visitorConn.SetReadDeadline(time.Now().Add(10 * time.Second))
err = msg.ReadMsgInto(visitorConn, &newVisitorConnRespMsg)
if err != nil {
sv.Warn("get newVisitorConnRespMsg error: %v", err)
xl.Warn("get newVisitorConnRespMsg error: %v", err)
return
}
visitorConn.SetReadDeadline(time.Time{})
if newVisitorConnRespMsg.Error != "" {
sv.Warn("start new visitor connection error: %s", newVisitorConnRespMsg.Error)
xl.Warn("start new visitor connection error: %s", newVisitorConnRespMsg.Error)
return
}
@@ -144,7 +148,7 @@ func (sv *StcpVisitor) handleConn(userConn frpNet.Conn) {
if sv.cfg.UseEncryption {
remote, err = frpIo.WithEncryption(remote, []byte(sv.cfg.Sk))
if err != nil {
sv.Error("create encryption stream error: %v", err)
xl.Error("create encryption stream error: %v", err)
return
}
}
@@ -163,7 +167,7 @@ type XtcpVisitor struct {
}
func (sv *XtcpVisitor) Run() (err error) {
sv.l, err = frpNet.ListenTcp(sv.cfg.BindAddr, sv.cfg.BindPort)
sv.l, err = net.Listen("tcp", fmt.Sprintf("%s:%d", sv.cfg.BindAddr, sv.cfg.BindPort))
if err != nil {
return
}
@@ -177,10 +181,11 @@ func (sv *XtcpVisitor) Close() {
}
func (sv *XtcpVisitor) worker() {
xl := xlog.FromContextSafe(sv.ctx)
for {
conn, err := sv.l.Accept()
if err != nil {
sv.Warn("xtcp local listener closed")
xl.Warn("xtcp local listener closed")
return
}
@@ -188,25 +193,26 @@ func (sv *XtcpVisitor) worker() {
}
}
func (sv *XtcpVisitor) handleConn(userConn frpNet.Conn) {
func (sv *XtcpVisitor) handleConn(userConn net.Conn) {
xl := xlog.FromContextSafe(sv.ctx)
defer userConn.Close()
sv.Debug("get a new xtcp user connection")
xl.Debug("get a new xtcp user connection")
if sv.ctl.serverUDPPort == 0 {
sv.Error("xtcp is not supported by server")
xl.Error("xtcp is not supported by server")
return
}
raddr, err := net.ResolveUDPAddr("udp",
fmt.Sprintf("%s:%d", sv.ctl.clientCfg.ServerAddr, sv.ctl.serverUDPPort))
if err != nil {
sv.Error("resolve server UDP addr error")
xl.Error("resolve server UDP addr error")
return
}
visitorConn, err := net.DialUDP("udp", nil, raddr)
if err != nil {
sv.Warn("dial server udp addr error: %v", err)
xl.Warn("dial server udp addr error: %v", err)
return
}
defer visitorConn.Close()
@@ -219,7 +225,7 @@ func (sv *XtcpVisitor) handleConn(userConn frpNet.Conn) {
}
err = msg.WriteMsg(visitorConn, natHoleVisitorMsg)
if err != nil {
sv.Warn("send natHoleVisitorMsg to server error: %v", err)
xl.Warn("send natHoleVisitorMsg to server error: %v", err)
return
}
@@ -229,24 +235,24 @@ func (sv *XtcpVisitor) handleConn(userConn frpNet.Conn) {
buf := pool.GetBuf(1024)
n, err := visitorConn.Read(buf)
if err != nil {
sv.Warn("get natHoleRespMsg error: %v", err)
xl.Warn("get natHoleRespMsg error: %v", err)
return
}
err = msg.ReadMsgInto(bytes.NewReader(buf[:n]), &natHoleRespMsg)
if err != nil {
sv.Warn("get natHoleRespMsg error: %v", err)
xl.Warn("get natHoleRespMsg error: %v", err)
return
}
visitorConn.SetReadDeadline(time.Time{})
pool.PutBuf(buf)
if natHoleRespMsg.Error != "" {
sv.Error("natHoleRespMsg get error info: %s", natHoleRespMsg.Error)
xl.Error("natHoleRespMsg get error info: %s", natHoleRespMsg.Error)
return
}
sv.Trace("get natHoleRespMsg, sid [%s], client address [%s], visitor address [%s]", natHoleRespMsg.Sid, natHoleRespMsg.ClientAddr, natHoleRespMsg.VisitorAddr)
xl.Trace("get natHoleRespMsg, sid [%s], client address [%s], visitor address [%s]", natHoleRespMsg.Sid, natHoleRespMsg.ClientAddr, natHoleRespMsg.VisitorAddr)
// Close visitorConn, so we can use it's local address.
visitorConn.Close()
@@ -255,12 +261,12 @@ func (sv *XtcpVisitor) handleConn(userConn frpNet.Conn) {
laddr, _ := net.ResolveUDPAddr("udp", visitorConn.LocalAddr().String())
daddr, err := net.ResolveUDPAddr("udp", natHoleRespMsg.ClientAddr)
if err != nil {
sv.Error("resolve client udp address error: %v", err)
xl.Error("resolve client udp address error: %v", err)
return
}
lConn, err := net.DialUDP("udp", laddr, daddr)
if err != nil {
sv.Error("dial client udp address error: %v", err)
xl.Error("dial client udp address error: %v", err)
return
}
defer lConn.Close()
@@ -272,23 +278,23 @@ func (sv *XtcpVisitor) handleConn(userConn frpNet.Conn) {
lConn.SetReadDeadline(time.Now().Add(8 * time.Second))
n, err = lConn.Read(sidBuf)
if err != nil {
sv.Warn("get sid from client error: %v", err)
xl.Warn("get sid from client error: %v", err)
return
}
lConn.SetReadDeadline(time.Time{})
if string(sidBuf[:n]) != natHoleRespMsg.Sid {
sv.Warn("incorrect sid from client")
xl.Warn("incorrect sid from client")
return
}
pool.PutBuf(sidBuf)
sv.Info("nat hole connection make success, sid [%s]", natHoleRespMsg.Sid)
xl.Info("nat hole connection make success, sid [%s]", natHoleRespMsg.Sid)
// wrap kcp connection
var remote io.ReadWriteCloser
remote, err = frpNet.NewKcpConnFromUdp(lConn, true, natHoleRespMsg.ClientAddr)
if err != nil {
sv.Error("create kcp connection from udp connection error: %v", err)
xl.Error("create kcp connection from udp connection error: %v", err)
return
}
@@ -297,13 +303,13 @@ func (sv *XtcpVisitor) handleConn(userConn frpNet.Conn) {
fmuxCfg.LogOutput = ioutil.Discard
sess, err := fmux.Client(remote, fmuxCfg)
if err != nil {
sv.Error("create yamux session error: %v", err)
xl.Error("create yamux session error: %v", err)
return
}
defer sess.Close()
muxConn, err := sess.Open()
if err != nil {
sv.Error("open yamux stream error: %v", err)
xl.Error("open yamux stream error: %v", err)
return
}
@@ -311,7 +317,7 @@ func (sv *XtcpVisitor) handleConn(userConn frpNet.Conn) {
if sv.cfg.UseEncryption {
muxConnRWCloser, err = frpIo.WithEncryption(muxConnRWCloser, []byte(sv.cfg.Sk))
if err != nil {
sv.Error("create encryption stream error: %v", err)
xl.Error("create encryption stream error: %v", err)
return
}
}
@@ -320,5 +326,5 @@ func (sv *XtcpVisitor) handleConn(userConn frpNet.Conn) {
}
frpIo.Join(userConn, muxConnRWCloser)
sv.Debug("join connections closed")
xl.Debug("join connections closed")
}

View File

@@ -15,11 +15,12 @@
package client
import (
"context"
"sync"
"time"
"github.com/fatedier/frp/models/config"
"github.com/fatedier/frp/utils/log"
"github.com/fatedier/frp/utils/xlog"
)
type VisitorManager struct {
@@ -30,26 +31,29 @@ type VisitorManager struct {
checkInterval time.Duration
mu sync.Mutex
mu sync.Mutex
ctx context.Context
}
func NewVisitorManager(ctl *Control) *VisitorManager {
func NewVisitorManager(ctx context.Context, ctl *Control) *VisitorManager {
return &VisitorManager{
ctl: ctl,
cfgs: make(map[string]config.VisitorConf),
visitors: make(map[string]Visitor),
checkInterval: 10 * time.Second,
ctx: ctx,
}
}
func (vm *VisitorManager) Run() {
xl := xlog.FromContextSafe(vm.ctx)
for {
time.Sleep(vm.checkInterval)
vm.mu.Lock()
for _, cfg := range vm.cfgs {
name := cfg.GetBaseInfo().ProxyName
if _, exist := vm.visitors[name]; !exist {
log.Info("try to start visitor [%s]", name)
xl.Info("try to start visitor [%s]", name)
vm.startVisitor(cfg)
}
}
@@ -59,19 +63,21 @@ func (vm *VisitorManager) Run() {
// Hold lock before calling this function.
func (vm *VisitorManager) startVisitor(cfg config.VisitorConf) (err error) {
xl := xlog.FromContextSafe(vm.ctx)
name := cfg.GetBaseInfo().ProxyName
visitor := NewVisitor(vm.ctl, cfg)
visitor := NewVisitor(vm.ctx, vm.ctl, cfg)
err = visitor.Run()
if err != nil {
visitor.Warn("start error: %v", err)
xl.Warn("start error: %v", err)
} else {
vm.visitors[name] = visitor
visitor.Info("start visitor success")
xl.Info("start visitor success")
}
return
}
func (vm *VisitorManager) Reload(cfgs map[string]config.VisitorConf) {
xl := xlog.FromContextSafe(vm.ctx)
vm.mu.Lock()
defer vm.mu.Unlock()
@@ -97,7 +103,7 @@ func (vm *VisitorManager) Reload(cfgs map[string]config.VisitorConf) {
}
}
if len(delNames) > 0 {
log.Info("visitor removed: %v", delNames)
xl.Info("visitor removed: %v", delNames)
}
addNames := make([]string, 0)
@@ -109,7 +115,7 @@ func (vm *VisitorManager) Reload(cfgs map[string]config.VisitorConf) {
}
}
if len(addNames) > 0 {
log.Info("visitor added: %v", addNames)
xl.Info("visitor added: %v", addNames)
}
return
}