support yaml/json/toml configuration format, make ini deprecated (#3599)

This commit is contained in:
fatedier
2023-09-06 10:18:02 +08:00
committed by GitHub
parent 885b029fcf
commit c95311d1a0
103 changed files with 4178 additions and 3829 deletions

View File

@@ -38,7 +38,7 @@ func (svr *Service) RunAdminServer(address string) (err error) {
router.HandleFunc("/healthz", svr.healthz)
// debug
if svr.cfg.PprofEnable {
if svr.cfg.WebServer.PprofEnable {
router.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
router.HandleFunc("/debug/pprof/profile", pprof.Profile)
router.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
@@ -47,7 +47,7 @@ func (svr *Service) RunAdminServer(address string) (err error) {
}
subRouter := router.NewRoute().Subrouter()
user, passwd := svr.cfg.AdminUser, svr.cfg.AdminPwd
user, passwd := svr.cfg.WebServer.User, svr.cfg.WebServer.Password
subRouter.Use(utilnet.NewHTTPAuthMiddleware(user, passwd).SetAuthFailDelay(200 * time.Millisecond).Middleware)
// api, see admin_api.go

View File

@@ -30,6 +30,7 @@ import (
"github.com/fatedier/frp/client/proxy"
"github.com/fatedier/frp/pkg/config"
"github.com/fatedier/frp/pkg/config/v1/validation"
"github.com/fatedier/frp/pkg/util/log"
)
@@ -56,15 +57,21 @@ func (svr *Service) apiReload(w http.ResponseWriter, _ *http.Request) {
}
}()
_, pxyCfgs, visitorCfgs, err := config.ParseClientConfig(svr.cfgFile)
cliCfg, pxyCfgs, visitorCfgs, _, err := config.LoadClientConfig(svr.cfgFile)
if err != nil {
res.Code = 400
res.Msg = err.Error()
log.Warn("reload frpc proxy config error: %s", res.Msg)
return
}
if _, err := validation.ValidateAllClientConfig(cliCfg, pxyCfgs, visitorCfgs); err != nil {
res.Code = 400
res.Msg = err.Error()
log.Warn("reload frpc proxy config error: %s", res.Msg)
return
}
if err = svr.ReloadConf(pxyCfgs, visitorCfgs); err != nil {
if err := svr.ReloadConf(pxyCfgs, visitorCfgs); err != nil {
res.Code = 500
res.Msg = err.Error()
log.Warn("reload frpc proxy config error: %s", res.Msg)
@@ -112,7 +119,7 @@ func NewProxyStatusResp(status *proxy.WorkingStatus, serverAddr string) ProxySta
if baseCfg.LocalPort != 0 {
psr.LocalAddr = net.JoinHostPort(baseCfg.LocalIP, strconv.Itoa(baseCfg.LocalPort))
}
psr.Plugin = baseCfg.Plugin
psr.Plugin = baseCfg.Plugin.Type
if status.Err == "" {
psr.RemoteAddr = status.RemoteAddr
@@ -172,24 +179,14 @@ func (svr *Service) apiGetConfig(w http.ResponseWriter, _ *http.Request) {
return
}
content, err := config.GetRenderedConfFromFile(svr.cfgFile)
content, err := os.ReadFile(svr.cfgFile)
if err != nil {
res.Code = 400
res.Msg = err.Error()
log.Warn("load frpc config file error: %s", res.Msg)
return
}
rows := strings.Split(string(content), "\n")
newRows := make([]string, 0, len(rows))
for _, row := range rows {
row = strings.TrimSpace(row)
if strings.HasPrefix(row, "token") {
continue
}
newRows = append(newRows, row)
}
res.Msg = strings.Join(newRows, "\n")
res.Msg = string(content)
}
// PUT /api/config
@@ -221,49 +218,7 @@ func (svr *Service) apiPutConfig(w http.ResponseWriter, r *http.Request) {
return
}
// get token from origin content
token := ""
b, err := os.ReadFile(svr.cfgFile)
if err != nil {
res.Code = 400
res.Msg = err.Error()
log.Warn("load frpc config file error: %s", res.Msg)
return
}
content := string(b)
for _, row := range strings.Split(content, "\n") {
row = strings.TrimSpace(row)
if strings.HasPrefix(row, "token") {
token = row
break
}
}
tmpRows := make([]string, 0)
for _, row := range strings.Split(string(body), "\n") {
row = strings.TrimSpace(row)
if strings.HasPrefix(row, "token") {
continue
}
tmpRows = append(tmpRows, row)
}
newRows := make([]string, 0)
if token != "" {
for _, row := range tmpRows {
newRows = append(newRows, row)
if strings.HasPrefix(row, "[common]") {
newRows = append(newRows, token)
}
}
} else {
newRows = tmpRows
}
content = strings.Join(newRows, "\n")
err = os.WriteFile(svr.cfgFile, []byte(content), 0o644)
if err != nil {
if err := os.WriteFile(svr.cfgFile, body, 0o644); err != nil {
res.Code = 500
res.Msg = fmt.Sprintf("write content to frpc config file error: %v", err)
log.Warn("%s", res.Msg)

View File

@@ -23,11 +23,12 @@ import (
"github.com/fatedier/golib/control/shutdown"
"github.com/fatedier/golib/crypto"
"github.com/samber/lo"
"github.com/fatedier/frp/client/proxy"
"github.com/fatedier/frp/client/visitor"
"github.com/fatedier/frp/pkg/auth"
"github.com/fatedier/frp/pkg/config"
v1 "github.com/fatedier/frp/pkg/config/v1"
"github.com/fatedier/frp/pkg/msg"
"github.com/fatedier/frp/pkg/transport"
"github.com/fatedier/frp/pkg/util/xlog"
@@ -43,7 +44,7 @@ type Control struct {
runID string
// manage all proxies
pxyCfgs map[string]config.ProxyConf
pxyCfgs []v1.ProxyConfigurer
pm *proxy.Manager
// manage all visitors
@@ -69,7 +70,7 @@ type Control struct {
lastPong time.Time
// The client configuration
clientCfg config.ClientCommonConf
clientCfg *v1.ClientCommonConfig
readerShutdown *shutdown.Shutdown
writerShutdown *shutdown.Shutdown
@@ -83,9 +84,9 @@ type Control struct {
func NewControl(
ctx context.Context, runID string, conn net.Conn, cm *ConnectionManager,
clientCfg config.ClientCommonConf,
pxyCfgs map[string]config.ProxyConf,
visitorCfgs map[string]config.VisitorConf,
clientCfg *v1.ClientCommonConfig,
pxyCfgs []v1.ProxyConfigurer,
visitorCfgs []v1.VisitorConfigurer,
authSetter auth.Setter,
) *Control {
// new xlog instance
@@ -220,7 +221,7 @@ func (ctl *Control) reader() {
defer ctl.readerShutdown.Done()
defer close(ctl.closedCh)
encReader := crypto.NewReader(ctl.conn, []byte(ctl.clientCfg.Token))
encReader := crypto.NewReader(ctl.conn, []byte(ctl.clientCfg.Auth.Token))
for {
m, err := msg.ReadMsg(encReader)
if err != nil {
@@ -240,7 +241,7 @@ func (ctl *Control) reader() {
func (ctl *Control) writer() {
xl := ctl.xl
defer ctl.writerShutdown.Done()
encWriter, err := crypto.NewWriter(ctl.conn, []byte(ctl.clientCfg.Token))
encWriter, err := crypto.NewWriter(ctl.conn, []byte(ctl.clientCfg.Auth.Token))
if err != nil {
xl.Error("crypto new writer error: %v", err)
ctl.conn.Close()
@@ -274,15 +275,16 @@ func (ctl *Control) msgHandler() {
var hbSendCh <-chan time.Time
// TODO(fatedier): disable heartbeat if TCPMux is enabled.
// Just keep it here to keep compatible with old version frps.
if ctl.clientCfg.HeartbeatInterval > 0 {
hbSend := time.NewTicker(time.Duration(ctl.clientCfg.HeartbeatInterval) * time.Second)
if ctl.clientCfg.Transport.HeartbeatInterval > 0 {
hbSend := time.NewTicker(time.Duration(ctl.clientCfg.Transport.HeartbeatInterval) * time.Second)
defer hbSend.Stop()
hbSendCh = hbSend.C
}
var hbCheckCh <-chan time.Time
// Check heartbeat timeout only if TCPMux is not enabled and users don't disable heartbeat feature.
if ctl.clientCfg.HeartbeatInterval > 0 && ctl.clientCfg.HeartbeatTimeout > 0 && !ctl.clientCfg.TCPMux {
if ctl.clientCfg.Transport.HeartbeatInterval > 0 && ctl.clientCfg.Transport.HeartbeatTimeout > 0 &&
!lo.FromPtr(ctl.clientCfg.Transport.TCPMux) {
hbCheck := time.NewTicker(time.Second)
defer hbCheck.Stop()
hbCheckCh = hbCheck.C
@@ -301,7 +303,7 @@ func (ctl *Control) msgHandler() {
}
ctl.sendCh <- pingMsg
case <-hbCheckCh:
if time.Since(ctl.lastPong) > time.Duration(ctl.clientCfg.HeartbeatTimeout)*time.Second {
if time.Since(ctl.lastPong) > time.Duration(ctl.clientCfg.Transport.HeartbeatTimeout)*time.Second {
xl.Warn("heartbeat timeout")
// let reader() stop
ctl.conn.Close()
@@ -354,7 +356,7 @@ func (ctl *Control) worker() {
ctl.cm.Close()
}
func (ctl *Control) ReloadConf(pxyCfgs map[string]config.ProxyConf, visitorCfgs map[string]config.VisitorConf) error {
func (ctl *Control) ReloadConf(pxyCfgs []v1.ProxyConfigurer, visitorCfgs []v1.VisitorConfigurer) error {
ctl.vm.Reload(visitorCfgs)
ctl.pm.Reload(pxyCfgs)
return nil

View File

@@ -21,8 +21,10 @@ import (
"io"
"net"
"net/http"
"strings"
"time"
v1 "github.com/fatedier/frp/pkg/config/v1"
"github.com/fatedier/frp/pkg/util/xlog"
)
@@ -49,26 +51,33 @@ type Monitor struct {
cancel context.CancelFunc
}
func NewMonitor(ctx context.Context, checkType string,
intervalS int, timeoutS int, maxFailedTimes int,
addr string, url string,
func NewMonitor(ctx context.Context, cfg v1.HealthCheckConfig, addr string,
statusNormalFn func(), statusFailedFn func(),
) *Monitor {
if intervalS <= 0 {
intervalS = 10
if cfg.IntervalSeconds <= 0 {
cfg.IntervalSeconds = 10
}
if timeoutS <= 0 {
timeoutS = 3
if cfg.TimeoutSeconds <= 0 {
cfg.TimeoutSeconds = 3
}
if maxFailedTimes <= 0 {
maxFailedTimes = 1
if cfg.MaxFailed <= 0 {
cfg.MaxFailed = 1
}
newctx, cancel := context.WithCancel(ctx)
var url string
if cfg.Type == "http" && cfg.Path != "" {
s := "http://" + addr
if !strings.HasPrefix(cfg.Path, "/") {
s += "/"
}
url = s + cfg.Path
}
return &Monitor{
checkType: checkType,
interval: time.Duration(intervalS) * time.Second,
timeout: time.Duration(timeoutS) * time.Second,
maxFailedTimes: maxFailedTimes,
checkType: cfg.Type,
interval: time.Duration(cfg.IntervalSeconds) * time.Second,
timeout: time.Duration(cfg.TimeoutSeconds) * time.Second,
maxFailedTimes: cfg.MaxFailed,
addr: addr,
url: url,
statusOK: false,

View File

@@ -17,16 +17,16 @@ package proxy
import (
"reflect"
"github.com/fatedier/frp/pkg/config"
v1 "github.com/fatedier/frp/pkg/config/v1"
)
func init() {
pxyConfs := []config.ProxyConf{
&config.TCPProxyConf{},
&config.HTTPProxyConf{},
&config.HTTPSProxyConf{},
&config.STCPProxyConf{},
&config.TCPMuxProxyConf{},
pxyConfs := []v1.ProxyConfigurer{
&v1.TCPProxyConfig{},
&v1.HTTPProxyConfig{},
&v1.HTTPSProxyConfig{},
&v1.STCPProxyConfig{},
&v1.TCPMuxProxyConfig{},
}
for _, cfg := range pxyConfs {
RegisterProxyFactory(reflect.TypeOf(cfg), NewGeneralTCPProxy)
@@ -40,7 +40,7 @@ type GeneralTCPProxy struct {
*BaseProxy
}
func NewGeneralTCPProxy(baseProxy *BaseProxy, _ config.ProxyConf) Proxy {
func NewGeneralTCPProxy(baseProxy *BaseProxy, _ v1.ProxyConfigurer) Proxy {
return &GeneralTCPProxy{
BaseProxy: baseProxy,
}

View File

@@ -30,7 +30,8 @@ import (
pp "github.com/pires/go-proxyproto"
"golang.org/x/time/rate"
"github.com/fatedier/frp/pkg/config"
"github.com/fatedier/frp/pkg/config/types"
v1 "github.com/fatedier/frp/pkg/config/v1"
"github.com/fatedier/frp/pkg/msg"
plugin "github.com/fatedier/frp/pkg/plugin/client"
"github.com/fatedier/frp/pkg/transport"
@@ -38,9 +39,9 @@ import (
"github.com/fatedier/frp/pkg/util/xlog"
)
var proxyFactoryRegistry = map[reflect.Type]func(*BaseProxy, config.ProxyConf) Proxy{}
var proxyFactoryRegistry = map[reflect.Type]func(*BaseProxy, v1.ProxyConfigurer) Proxy{}
func RegisterProxyFactory(proxyConfType reflect.Type, factory func(*BaseProxy, config.ProxyConf) Proxy) {
func RegisterProxyFactory(proxyConfType reflect.Type, factory func(*BaseProxy, v1.ProxyConfigurer) Proxy) {
proxyFactoryRegistry[proxyConfType] = factory
}
@@ -56,23 +57,23 @@ type Proxy interface {
func NewProxy(
ctx context.Context,
pxyConf config.ProxyConf,
clientCfg config.ClientCommonConf,
pxyConf v1.ProxyConfigurer,
clientCfg *v1.ClientCommonConfig,
msgTransporter transport.MessageTransporter,
) (pxy Proxy) {
var limiter *rate.Limiter
limitBytes := pxyConf.GetBaseConfig().BandwidthLimit.Bytes()
if limitBytes > 0 && pxyConf.GetBaseConfig().BandwidthLimitMode == config.BandwidthLimitModeClient {
limitBytes := pxyConf.GetBaseConfig().Transport.BandwidthLimit.Bytes()
if limitBytes > 0 && pxyConf.GetBaseConfig().Transport.BandwidthLimitMode == types.BandwidthLimitModeClient {
limiter = rate.NewLimiter(rate.Limit(float64(limitBytes)), int(limitBytes))
}
baseProxy := BaseProxy{
baseProxyConfig: pxyConf.GetBaseConfig(),
clientCfg: clientCfg,
limiter: limiter,
msgTransporter: msgTransporter,
xl: xlog.FromContextSafe(ctx),
ctx: ctx,
baseCfg: pxyConf.GetBaseConfig(),
clientCfg: clientCfg,
limiter: limiter,
msgTransporter: msgTransporter,
xl: xlog.FromContextSafe(ctx),
ctx: ctx,
}
factory := proxyFactoryRegistry[reflect.TypeOf(pxyConf)]
@@ -83,10 +84,10 @@ func NewProxy(
}
type BaseProxy struct {
baseProxyConfig *config.BaseProxyConf
clientCfg config.ClientCommonConf
msgTransporter transport.MessageTransporter
limiter *rate.Limiter
baseCfg *v1.ProxyBaseConfig
clientCfg *v1.ClientCommonConfig
msgTransporter transport.MessageTransporter
limiter *rate.Limiter
// proxyPlugin is used to handle connections instead of dialing to local service.
// It's only validate for TCP protocol now.
proxyPlugin plugin.Plugin
@@ -97,8 +98,8 @@ type BaseProxy struct {
}
func (pxy *BaseProxy) Run() error {
if pxy.baseProxyConfig.Plugin != "" {
p, err := plugin.Create(pxy.baseProxyConfig.Plugin, pxy.baseProxyConfig.PluginParams)
if pxy.baseCfg.Plugin.Type != "" {
p, err := plugin.Create(pxy.baseCfg.Plugin.Type, pxy.baseCfg.Plugin.ClientPluginOptions)
if err != nil {
return err
}
@@ -114,13 +115,13 @@ func (pxy *BaseProxy) Close() {
}
func (pxy *BaseProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
pxy.HandleTCPWorkConnection(conn, m, []byte(pxy.clientCfg.Token))
pxy.HandleTCPWorkConnection(conn, m, []byte(pxy.clientCfg.Auth.Token))
}
// Common handler for tcp work connections.
func (pxy *BaseProxy) HandleTCPWorkConnection(workConn net.Conn, m *msg.StartWorkConn, encKey []byte) {
xl := pxy.xl
baseConfig := pxy.baseProxyConfig
baseCfg := pxy.baseCfg
var (
remote io.ReadWriteCloser
err error
@@ -133,8 +134,8 @@ func (pxy *BaseProxy) HandleTCPWorkConnection(workConn net.Conn, m *msg.StartWor
}
xl.Trace("handle tcp work connection, use_encryption: %t, use_compression: %t",
baseConfig.UseEncryption, baseConfig.UseCompression)
if baseConfig.UseEncryption {
baseCfg.Transport.UseEncryption, baseCfg.Transport.UseCompression)
if baseCfg.Transport.UseEncryption {
remote, err = libio.WithEncryption(remote, encKey)
if err != nil {
workConn.Close()
@@ -143,13 +144,13 @@ func (pxy *BaseProxy) HandleTCPWorkConnection(workConn net.Conn, m *msg.StartWor
}
}
var compressionResourceRecycleFn func()
if baseConfig.UseCompression {
if baseCfg.Transport.UseCompression {
remote, compressionResourceRecycleFn = libio.WithCompressionFromPool(remote)
}
// check if we need to send proxy protocol info
var extraInfo []byte
if baseConfig.ProxyProtocolVersion != "" {
if baseCfg.Transport.ProxyProtocolVersion != "" {
if m.SrcAddr != "" && m.SrcPort != 0 {
if m.DstAddr == "" {
m.DstAddr = "127.0.0.1"
@@ -168,9 +169,9 @@ func (pxy *BaseProxy) HandleTCPWorkConnection(workConn net.Conn, m *msg.StartWor
h.TransportProtocol = pp.TCPv6
}
if baseConfig.ProxyProtocolVersion == "v1" {
if baseCfg.Transport.ProxyProtocolVersion == "v1" {
h.Version = 1
} else if baseConfig.ProxyProtocolVersion == "v2" {
} else if baseCfg.Transport.ProxyProtocolVersion == "v2" {
h.Version = 2
}
@@ -189,12 +190,12 @@ func (pxy *BaseProxy) HandleTCPWorkConnection(workConn net.Conn, m *msg.StartWor
}
localConn, err := libdial.Dial(
net.JoinHostPort(baseConfig.LocalIP, strconv.Itoa(baseConfig.LocalPort)),
net.JoinHostPort(baseCfg.LocalIP, strconv.Itoa(baseCfg.LocalPort)),
libdial.WithTimeout(10*time.Second),
)
if err != nil {
workConn.Close()
xl.Error("connect to local service [%s:%d] error: %v", baseConfig.LocalIP, baseConfig.LocalPort, err)
xl.Error("connect to local service [%s:%d] error: %v", baseCfg.LocalIP, baseCfg.LocalPort, err)
return
}

View File

@@ -21,8 +21,10 @@ import (
"reflect"
"sync"
"github.com/samber/lo"
"github.com/fatedier/frp/client/event"
"github.com/fatedier/frp/pkg/config"
v1 "github.com/fatedier/frp/pkg/config/v1"
"github.com/fatedier/frp/pkg/msg"
"github.com/fatedier/frp/pkg/transport"
"github.com/fatedier/frp/pkg/util/xlog"
@@ -35,14 +37,14 @@ type Manager struct {
closed bool
mu sync.RWMutex
clientCfg config.ClientCommonConf
clientCfg *v1.ClientCommonConfig
ctx context.Context
}
func NewManager(
ctx context.Context,
clientCfg config.ClientCommonConf,
clientCfg *v1.ClientCommonConfig,
msgTransporter transport.MessageTransporter,
) *Manager {
return &Manager{
@@ -113,15 +115,18 @@ func (pm *Manager) GetAllProxyStatus() []*WorkingStatus {
return ps
}
func (pm *Manager) Reload(pxyCfgs map[string]config.ProxyConf) {
func (pm *Manager) Reload(pxyCfgs []v1.ProxyConfigurer) {
xl := xlog.FromContextSafe(pm.ctx)
pxyCfgsMap := lo.KeyBy(pxyCfgs, func(c v1.ProxyConfigurer) string {
return c.GetBaseConfig().Name
})
pm.mu.Lock()
defer pm.mu.Unlock()
delPxyNames := make([]string, 0)
for name, pxy := range pm.proxies {
del := false
cfg, ok := pxyCfgs[name]
cfg, ok := pxyCfgsMap[name]
if !ok || !reflect.DeepEqual(pxy.Cfg, cfg) {
del = true
}
@@ -137,7 +142,8 @@ func (pm *Manager) Reload(pxyCfgs map[string]config.ProxyConf) {
}
addPxyNames := make([]string, 0)
for name, cfg := range pxyCfgs {
for _, cfg := range pxyCfgs {
name := cfg.GetBaseConfig().Name
if _, ok := pm.proxies[name]; !ok {
pxy := NewWrapper(pm.ctx, cfg, pm.clientCfg, pm.HandleEvent, pm.msgTransporter)
pm.proxies[name] = pxy

View File

@@ -18,6 +18,7 @@ import (
"context"
"fmt"
"net"
"strconv"
"sync"
"sync/atomic"
"time"
@@ -26,7 +27,7 @@ import (
"github.com/fatedier/frp/client/event"
"github.com/fatedier/frp/client/health"
"github.com/fatedier/frp/pkg/config"
v1 "github.com/fatedier/frp/pkg/config/v1"
"github.com/fatedier/frp/pkg/msg"
"github.com/fatedier/frp/pkg/transport"
"github.com/fatedier/frp/pkg/util/xlog"
@@ -48,11 +49,11 @@ var (
)
type WorkingStatus struct {
Name string `json:"name"`
Type string `json:"type"`
Phase string `json:"status"`
Err string `json:"err"`
Cfg config.ProxyConf `json:"cfg"`
Name string `json:"name"`
Type string `json:"type"`
Phase string `json:"status"`
Err string `json:"err"`
Cfg v1.ProxyConfigurer `json:"cfg"`
// Got from server.
RemoteAddr string `json:"remote_addr"`
@@ -86,17 +87,17 @@ type Wrapper struct {
func NewWrapper(
ctx context.Context,
cfg config.ProxyConf,
clientCfg config.ClientCommonConf,
cfg v1.ProxyConfigurer,
clientCfg *v1.ClientCommonConfig,
eventHandler event.Handler,
msgTransporter transport.MessageTransporter,
) *Wrapper {
baseInfo := cfg.GetBaseConfig()
xl := xlog.FromContextSafe(ctx).Spawn().AppendPrefix(baseInfo.ProxyName)
xl := xlog.FromContextSafe(ctx).Spawn().AppendPrefix(baseInfo.Name)
pw := &Wrapper{
WorkingStatus: WorkingStatus{
Name: baseInfo.ProxyName,
Type: baseInfo.ProxyType,
Name: baseInfo.Name,
Type: baseInfo.Type,
Phase: ProxyPhaseNew,
Cfg: cfg,
},
@@ -108,11 +109,11 @@ func NewWrapper(
ctx: xlog.NewContext(ctx, xl),
}
if baseInfo.HealthCheckType != "" {
if baseInfo.HealthCheck.Type != "" && baseInfo.LocalPort > 0 {
pw.health = 1 // means failed
pw.monitor = health.NewMonitor(pw.ctx, baseInfo.HealthCheckType, baseInfo.HealthCheckIntervalS,
baseInfo.HealthCheckTimeoutS, baseInfo.HealthCheckMaxFailed, baseInfo.HealthCheckAddr,
baseInfo.HealthCheckURL, pw.statusNormalCallback, pw.statusFailedCallback)
addr := net.JoinHostPort(baseInfo.LocalIP, strconv.Itoa(baseInfo.LocalPort))
pw.monitor = health.NewMonitor(pw.ctx, baseInfo.HealthCheck, addr,
pw.statusNormalCallback, pw.statusFailedCallback)
xl.Trace("enable health check monitor")
}

View File

@@ -25,7 +25,7 @@ import (
"github.com/fatedier/golib/errors"
libio "github.com/fatedier/golib/io"
"github.com/fatedier/frp/pkg/config"
v1 "github.com/fatedier/frp/pkg/config/v1"
"github.com/fatedier/frp/pkg/msg"
"github.com/fatedier/frp/pkg/proto/udp"
"github.com/fatedier/frp/pkg/util/limit"
@@ -33,21 +33,21 @@ import (
)
func init() {
RegisterProxyFactory(reflect.TypeOf(&config.SUDPProxyConf{}), NewSUDPProxy)
RegisterProxyFactory(reflect.TypeOf(&v1.SUDPProxyConfig{}), NewSUDPProxy)
}
type SUDPProxy struct {
*BaseProxy
cfg *config.SUDPProxyConf
cfg *v1.SUDPProxyConfig
localAddr *net.UDPAddr
closeCh chan struct{}
}
func NewSUDPProxy(baseProxy *BaseProxy, cfg config.ProxyConf) Proxy {
unwrapped, ok := cfg.(*config.SUDPProxyConf)
func NewSUDPProxy(baseProxy *BaseProxy, cfg v1.ProxyConfigurer) Proxy {
unwrapped, ok := cfg.(*v1.SUDPProxyConfig)
if !ok {
return nil
}
@@ -88,15 +88,15 @@ func (pxy *SUDPProxy) InWorkConn(conn net.Conn, _ *msg.StartWorkConn) {
return conn.Close()
})
}
if pxy.cfg.UseEncryption {
rwc, err = libio.WithEncryption(rwc, []byte(pxy.clientCfg.Token))
if pxy.cfg.Transport.UseEncryption {
rwc, err = libio.WithEncryption(rwc, []byte(pxy.clientCfg.Auth.Token))
if err != nil {
conn.Close()
xl.Error("create encryption stream error: %v", err)
return
}
}
if pxy.cfg.UseCompression {
if pxy.cfg.Transport.UseCompression {
rwc = libio.WithCompression(rwc)
}
conn = utilnet.WrapReadWriteCloserToConn(rwc, conn)

View File

@@ -24,7 +24,7 @@ import (
"github.com/fatedier/golib/errors"
libio "github.com/fatedier/golib/io"
"github.com/fatedier/frp/pkg/config"
v1 "github.com/fatedier/frp/pkg/config/v1"
"github.com/fatedier/frp/pkg/msg"
"github.com/fatedier/frp/pkg/proto/udp"
"github.com/fatedier/frp/pkg/util/limit"
@@ -32,13 +32,13 @@ import (
)
func init() {
RegisterProxyFactory(reflect.TypeOf(&config.UDPProxyConf{}), NewUDPProxy)
RegisterProxyFactory(reflect.TypeOf(&v1.UDPProxyConfig{}), NewUDPProxy)
}
type UDPProxy struct {
*BaseProxy
cfg *config.UDPProxyConf
cfg *v1.UDPProxyConfig
localAddr *net.UDPAddr
readCh chan *msg.UDPPacket
@@ -49,8 +49,8 @@ type UDPProxy struct {
closed bool
}
func NewUDPProxy(baseProxy *BaseProxy, cfg config.ProxyConf) Proxy {
unwrapped, ok := cfg.(*config.UDPProxyConf)
func NewUDPProxy(baseProxy *BaseProxy, cfg v1.ProxyConfigurer) Proxy {
unwrapped, ok := cfg.(*v1.UDPProxyConfig)
if !ok {
return nil
}
@@ -99,15 +99,15 @@ func (pxy *UDPProxy) InWorkConn(conn net.Conn, _ *msg.StartWorkConn) {
return conn.Close()
})
}
if pxy.cfg.UseEncryption {
rwc, err = libio.WithEncryption(rwc, []byte(pxy.clientCfg.Token))
if pxy.cfg.Transport.UseEncryption {
rwc, err = libio.WithEncryption(rwc, []byte(pxy.clientCfg.Auth.Token))
if err != nil {
conn.Close()
xl.Error("create encryption stream error: %v", err)
return
}
}
if pxy.cfg.UseCompression {
if pxy.cfg.Transport.UseCompression {
rwc = libio.WithCompression(rwc)
}
conn = utilnet.WrapReadWriteCloserToConn(rwc, conn)

View File

@@ -23,7 +23,7 @@ import (
fmux "github.com/hashicorp/yamux"
"github.com/quic-go/quic-go"
"github.com/fatedier/frp/pkg/config"
v1 "github.com/fatedier/frp/pkg/config/v1"
"github.com/fatedier/frp/pkg/msg"
"github.com/fatedier/frp/pkg/nathole"
"github.com/fatedier/frp/pkg/transport"
@@ -31,17 +31,17 @@ import (
)
func init() {
RegisterProxyFactory(reflect.TypeOf(&config.XTCPProxyConf{}), NewXTCPProxy)
RegisterProxyFactory(reflect.TypeOf(&v1.XTCPProxyConfig{}), NewXTCPProxy)
}
type XTCPProxy struct {
*BaseProxy
cfg *config.XTCPProxyConf
cfg *v1.XTCPProxyConfig
}
func NewXTCPProxy(baseProxy *BaseProxy, cfg config.ProxyConf) Proxy {
unwrapped, ok := cfg.(*config.XTCPProxyConf)
func NewXTCPProxy(baseProxy *BaseProxy, cfg v1.ProxyConfigurer) Proxy {
unwrapped, ok := cfg.(*v1.XTCPProxyConfig)
if !ok {
return nil
}
@@ -75,7 +75,7 @@ func (pxy *XTCPProxy) InWorkConn(conn net.Conn, startWorkConnMsg *msg.StartWorkC
transactionID := nathole.NewTransactionID()
natHoleClientMsg := &msg.NatHoleClient{
TransactionID: transactionID,
ProxyName: pxy.cfg.ProxyName,
ProxyName: pxy.cfg.Name,
Sid: natHoleSidMsg.Sid,
MappedAddrs: prepareResult.Addrs,
AssistedAddrs: prepareResult.AssistedAddrs,
@@ -93,7 +93,7 @@ func (pxy *XTCPProxy) InWorkConn(conn net.Conn, startWorkConnMsg *msg.StartWorkC
natHoleRespMsg.AssistedAddrs, natHoleRespMsg.DetectBehavior)
listenConn := prepareResult.ListenConn
newListenConn, raddr, err := nathole.MakeHole(pxy.ctx, listenConn, natHoleRespMsg, []byte(pxy.cfg.Sk))
newListenConn, raddr, err := nathole.MakeHole(pxy.ctx, listenConn, natHoleRespMsg, []byte(pxy.cfg.Secretkey))
if err != nil {
listenConn.Close()
xl.Warn("make hole error: %v", err)
@@ -154,7 +154,7 @@ func (pxy *XTCPProxy) listenByKCP(listenConn *net.UDPConn, raddr *net.UDPAddr, s
xl.Error("accept connection error: %v", err)
return
}
go pxy.HandleTCPWorkConnection(muxConn, startWorkConnMsg, []byte(pxy.cfg.Sk))
go pxy.HandleTCPWorkConnection(muxConn, startWorkConnMsg, []byte(pxy.cfg.Secretkey))
}
}
@@ -170,9 +170,9 @@ func (pxy *XTCPProxy) listenByQUIC(listenConn *net.UDPConn, _ *net.UDPAddr, star
tlsConfig.NextProtos = []string{"frp"}
quicListener, err := quic.Listen(listenConn, tlsConfig,
&quic.Config{
MaxIdleTimeout: time.Duration(pxy.clientCfg.QUICMaxIdleTimeout) * time.Second,
MaxIncomingStreams: int64(pxy.clientCfg.QUICMaxIncomingStreams),
KeepAlivePeriod: time.Duration(pxy.clientCfg.QUICKeepalivePeriod) * time.Second,
MaxIdleTimeout: time.Duration(pxy.clientCfg.Transport.QUIC.MaxIdleTimeout) * time.Second,
MaxIncomingStreams: int64(pxy.clientCfg.Transport.QUIC.MaxIncomingStreams),
KeepAlivePeriod: time.Duration(pxy.clientCfg.Transport.QUIC.KeepalivePeriod) * time.Second,
},
)
if err != nil {
@@ -192,6 +192,6 @@ func (pxy *XTCPProxy) listenByQUIC(listenConn *net.UDPConn, _ *net.UDPAddr, star
_ = c.CloseWithError(0, "")
return
}
go pxy.HandleTCPWorkConnection(utilnet.QuicStreamToNetConn(stream, c), startWorkConnMsg, []byte(pxy.cfg.Sk))
go pxy.HandleTCPWorkConnection(utilnet.QuicStreamToNetConn(stream, c), startWorkConnMsg, []byte(pxy.cfg.Secretkey))
}
}

View File

@@ -19,7 +19,6 @@ import (
"crypto/tls"
"fmt"
"io"
"math/rand"
"net"
"runtime"
"strconv"
@@ -32,10 +31,11 @@ import (
libdial "github.com/fatedier/golib/net/dial"
fmux "github.com/hashicorp/yamux"
quic "github.com/quic-go/quic-go"
"github.com/samber/lo"
"github.com/fatedier/frp/assets"
"github.com/fatedier/frp/pkg/auth"
"github.com/fatedier/frp/pkg/config"
v1 "github.com/fatedier/frp/pkg/config/v1"
"github.com/fatedier/frp/pkg/msg"
"github.com/fatedier/frp/pkg/transport"
"github.com/fatedier/frp/pkg/util/log"
@@ -47,8 +47,6 @@ import (
func init() {
crypto.DefaultSalt = "frp"
// TODO: remove this when we drop support for go1.19
rand.Seed(time.Now().UnixNano())
}
// Service is a client service.
@@ -63,9 +61,9 @@ type Service struct {
// Sets authentication based on selected method
authSetter auth.Setter
cfg config.ClientCommonConf
pxyCfgs map[string]config.ProxyConf
visitorCfgs map[string]config.VisitorConf
cfg *v1.ClientCommonConfig
pxyCfgs []v1.ProxyConfigurer
visitorCfgs []v1.VisitorConfigurer
cfgMu sync.RWMutex
// The configuration file used to initialize this client, or an empty
@@ -81,13 +79,13 @@ type Service struct {
}
func NewService(
cfg config.ClientCommonConf,
pxyCfgs map[string]config.ProxyConf,
visitorCfgs map[string]config.VisitorConf,
cfg *v1.ClientCommonConfig,
pxyCfgs []v1.ProxyConfigurer,
visitorCfgs []v1.VisitorConfigurer,
cfgFile string,
) (svr *Service, err error) {
svr = &Service{
authSetter: auth.NewAuthSetter(cfg.ClientConfig),
authSetter: auth.NewAuthSetter(cfg.Auth),
cfg: cfg,
cfgFile: cfgFile,
pxyCfgs: pxyCfgs,
@@ -134,7 +132,7 @@ func (svr *Service) Run(ctx context.Context) error {
// if login_fail_exit is true, just exit this program
// otherwise sleep a while and try again to connect to server
if svr.cfg.LoginFailExit {
if lo.FromPtr(svr.cfg.LoginFailExit) {
return err
}
util.RandomSleep(5*time.Second, 0.9, 1.1)
@@ -151,16 +149,16 @@ func (svr *Service) Run(ctx context.Context) error {
go svr.keepControllerWorking()
if svr.cfg.AdminPort != 0 {
if svr.cfg.WebServer.Port != 0 {
// Init admin server assets
assets.Load(svr.cfg.AssetsDir)
assets.Load(svr.cfg.WebServer.AssetsDir)
address := net.JoinHostPort(svr.cfg.AdminAddr, strconv.Itoa(svr.cfg.AdminPort))
address := net.JoinHostPort(svr.cfg.WebServer.Addr, strconv.Itoa(svr.cfg.WebServer.Port))
err := svr.RunAdminServer(address)
if err != nil {
log.Warn("run admin server error: %v", err)
}
log.Info("admin server listen on %s:%d", svr.cfg.AdminAddr, svr.cfg.AdminPort)
log.Info("admin server listen on %s:%d", svr.cfg.WebServer.Addr, svr.cfg.WebServer.Port)
}
<-svr.ctx.Done()
// service context may not be canceled by svr.Close(), we should call it here to release resources
@@ -244,7 +242,7 @@ func (svr *Service) keepControllerWorking() {
// session: if it's not nil, using tcp mux
func (svr *Service) login() (conn net.Conn, cm *ConnectionManager, err error) {
xl := xlog.FromContextSafe(svr.ctx)
cm = NewConnectionManager(svr.ctx, &svr.cfg)
cm = NewConnectionManager(svr.ctx, svr.cfg)
if err = cm.OpenConnection(); err != nil {
return nil, nil, err
@@ -264,12 +262,12 @@ func (svr *Service) login() (conn net.Conn, cm *ConnectionManager, err error) {
loginMsg := &msg.Login{
Arch: runtime.GOARCH,
Os: runtime.GOOS,
PoolCount: svr.cfg.PoolCount,
PoolCount: svr.cfg.Transport.PoolCount,
User: svr.cfg.User,
Version: version.Full(),
Timestamp: time.Now().Unix(),
RunID: svr.runID,
Metas: svr.cfg.Metas,
Metas: svr.cfg.Metadatas,
}
// Add auth
@@ -302,7 +300,7 @@ func (svr *Service) login() (conn net.Conn, cm *ConnectionManager, err error) {
return
}
func (svr *Service) ReloadConf(pxyCfgs map[string]config.ProxyConf, visitorCfgs map[string]config.VisitorConf) error {
func (svr *Service) ReloadConf(pxyCfgs []v1.ProxyConfigurer, visitorCfgs []v1.VisitorConfigurer) error {
svr.cfgMu.Lock()
svr.pxyCfgs = pxyCfgs
svr.visitorCfgs = visitorCfgs
@@ -339,13 +337,13 @@ func (svr *Service) GracefulClose(d time.Duration) {
type ConnectionManager struct {
ctx context.Context
cfg *config.ClientCommonConf
cfg *v1.ClientCommonConfig
muxSession *fmux.Session
quicConn quic.Connection
}
func NewConnectionManager(ctx context.Context, cfg *config.ClientCommonConf) *ConnectionManager {
func NewConnectionManager(ctx context.Context, cfg *v1.ClientCommonConfig) *ConnectionManager {
return &ConnectionManager{
ctx: ctx,
cfg: cfg,
@@ -356,18 +354,18 @@ func (cm *ConnectionManager) OpenConnection() error {
xl := xlog.FromContextSafe(cm.ctx)
// special for quic
if strings.EqualFold(cm.cfg.Protocol, "quic") {
if strings.EqualFold(cm.cfg.Transport.Protocol, "quic") {
var tlsConfig *tls.Config
var err error
sn := cm.cfg.TLSServerName
sn := cm.cfg.Transport.TLS.ServerName
if sn == "" {
sn = cm.cfg.ServerAddr
}
if cm.cfg.TLSEnable {
if lo.FromPtr(cm.cfg.Transport.TLS.Enable) {
tlsConfig, err = transport.NewClientTLSConfig(
cm.cfg.TLSCertFile,
cm.cfg.TLSKeyFile,
cm.cfg.TLSTrustedCaFile,
cm.cfg.Transport.TLS.CertFile,
cm.cfg.Transport.TLS.KeyFile,
cm.cfg.Transport.TLS.TrustedCaFile,
sn)
} else {
tlsConfig, err = transport.NewClientTLSConfig("", "", "", sn)
@@ -382,9 +380,9 @@ func (cm *ConnectionManager) OpenConnection() error {
cm.ctx,
net.JoinHostPort(cm.cfg.ServerAddr, strconv.Itoa(cm.cfg.ServerPort)),
tlsConfig, &quic.Config{
MaxIdleTimeout: time.Duration(cm.cfg.QUICMaxIdleTimeout) * time.Second,
MaxIncomingStreams: int64(cm.cfg.QUICMaxIncomingStreams),
KeepAlivePeriod: time.Duration(cm.cfg.QUICKeepalivePeriod) * time.Second,
MaxIdleTimeout: time.Duration(cm.cfg.Transport.QUIC.MaxIdleTimeout) * time.Second,
MaxIncomingStreams: int64(cm.cfg.Transport.QUIC.MaxIncomingStreams),
KeepAlivePeriod: time.Duration(cm.cfg.Transport.QUIC.KeepalivePeriod) * time.Second,
})
if err != nil {
return err
@@ -393,7 +391,7 @@ func (cm *ConnectionManager) OpenConnection() error {
return nil
}
if !cm.cfg.TCPMux {
if !lo.FromPtr(cm.cfg.Transport.TCPMux) {
return nil
}
@@ -403,7 +401,7 @@ func (cm *ConnectionManager) OpenConnection() error {
}
fmuxCfg := fmux.DefaultConfig()
fmuxCfg.KeepAliveInterval = time.Duration(cm.cfg.TCPMuxKeepaliveInterval) * time.Second
fmuxCfg.KeepAliveInterval = time.Duration(cm.cfg.Transport.TCPMuxKeepaliveInterval) * time.Second
fmuxCfg.LogOutput = io.Discard
fmuxCfg.MaxStreamWindowSize = 6 * 1024 * 1024
session, err := fmux.Client(conn, fmuxCfg)
@@ -436,20 +434,20 @@ func (cm *ConnectionManager) realConnect() (net.Conn, error) {
xl := xlog.FromContextSafe(cm.ctx)
var tlsConfig *tls.Config
var err error
tlsEnable := cm.cfg.TLSEnable
if cm.cfg.Protocol == "wss" {
tlsEnable := lo.FromPtr(cm.cfg.Transport.TLS.Enable)
if cm.cfg.Transport.Protocol == "wss" {
tlsEnable = true
}
if tlsEnable {
sn := cm.cfg.TLSServerName
sn := cm.cfg.Transport.TLS.ServerName
if sn == "" {
sn = cm.cfg.ServerAddr
}
tlsConfig, err = transport.NewClientTLSConfig(
cm.cfg.TLSCertFile,
cm.cfg.TLSKeyFile,
cm.cfg.TLSTrustedCaFile,
cm.cfg.Transport.TLS.CertFile,
cm.cfg.Transport.TLS.KeyFile,
cm.cfg.Transport.TLS.TrustedCaFile,
sn)
if err != nil {
xl.Warn("fail to build tls configuration, err: %v", err)
@@ -457,19 +455,19 @@ func (cm *ConnectionManager) realConnect() (net.Conn, error) {
}
}
proxyType, addr, auth, err := libdial.ParseProxyURL(cm.cfg.HTTPProxy)
proxyType, addr, auth, err := libdial.ParseProxyURL(cm.cfg.Transport.ProxyURL)
if err != nil {
xl.Error("fail to parse proxy url")
return nil, err
}
dialOptions := []libdial.DialOption{}
protocol := cm.cfg.Protocol
protocol := cm.cfg.Transport.Protocol
switch protocol {
case "websocket":
protocol = "tcp"
dialOptions = append(dialOptions, libdial.WithAfterHook(libdial.AfterHook{Hook: utilnet.DialHookWebsocket(protocol, "")}))
dialOptions = append(dialOptions, libdial.WithAfterHook(libdial.AfterHook{
Hook: utilnet.DialHookCustomTLSHeadByte(tlsConfig != nil, cm.cfg.DisableCustomTLSFirstByte),
Hook: utilnet.DialHookCustomTLSHeadByte(tlsConfig != nil, lo.FromPtr(cm.cfg.Transport.TLS.DisableCustomTLSFirstByte)),
}))
dialOptions = append(dialOptions, libdial.WithTLSConfig(tlsConfig))
case "wss":
@@ -481,13 +479,13 @@ func (cm *ConnectionManager) realConnect() (net.Conn, error) {
dialOptions = append(dialOptions, libdial.WithTLSConfig(tlsConfig))
}
if cm.cfg.ConnectServerLocalIP != "" {
dialOptions = append(dialOptions, libdial.WithLocalAddr(cm.cfg.ConnectServerLocalIP))
if cm.cfg.Transport.ConnectServerLocalIP != "" {
dialOptions = append(dialOptions, libdial.WithLocalAddr(cm.cfg.Transport.ConnectServerLocalIP))
}
dialOptions = append(dialOptions,
libdial.WithProtocol(protocol),
libdial.WithTimeout(time.Duration(cm.cfg.DialServerTimeout)*time.Second),
libdial.WithKeepAlive(time.Duration(cm.cfg.DialServerKeepAlive)*time.Second),
libdial.WithTimeout(time.Duration(cm.cfg.Transport.DialServerTimeout)*time.Second),
libdial.WithKeepAlive(time.Duration(cm.cfg.Transport.DialServerKeepAlive)*time.Second),
libdial.WithProxy(proxyType, addr),
libdial.WithProxyAuth(auth),
)

View File

@@ -22,7 +22,7 @@ import (
libio "github.com/fatedier/golib/io"
"github.com/fatedier/frp/pkg/config"
v1 "github.com/fatedier/frp/pkg/config/v1"
"github.com/fatedier/frp/pkg/msg"
"github.com/fatedier/frp/pkg/util/util"
"github.com/fatedier/frp/pkg/util/xlog"
@@ -31,7 +31,7 @@ import (
type STCPVisitor struct {
*BaseVisitor
cfg *config.STCPVisitorConf
cfg *v1.STCPVisitorConfig
}
func (sv *STCPVisitor) Run() (err error) {
@@ -90,10 +90,10 @@ func (sv *STCPVisitor) handleConn(userConn net.Conn) {
newVisitorConnMsg := &msg.NewVisitorConn{
RunID: sv.helper.RunID(),
ProxyName: sv.cfg.ServerName,
SignKey: util.GetAuthKey(sv.cfg.Sk, now),
SignKey: util.GetAuthKey(sv.cfg.SecretKey, now),
Timestamp: now,
UseEncryption: sv.cfg.UseEncryption,
UseCompression: sv.cfg.UseCompression,
UseEncryption: sv.cfg.Transport.UseEncryption,
UseCompression: sv.cfg.Transport.UseCompression,
}
err = msg.WriteMsg(visitorConn, newVisitorConnMsg)
if err != nil {
@@ -117,15 +117,15 @@ func (sv *STCPVisitor) handleConn(userConn net.Conn) {
var remote io.ReadWriteCloser
remote = visitorConn
if sv.cfg.UseEncryption {
remote, err = libio.WithEncryption(remote, []byte(sv.cfg.Sk))
if sv.cfg.Transport.UseEncryption {
remote, err = libio.WithEncryption(remote, []byte(sv.cfg.SecretKey))
if err != nil {
xl.Error("create encryption stream error: %v", err)
return
}
}
if sv.cfg.UseCompression {
if sv.cfg.Transport.UseCompression {
var recycleFn func()
remote, recycleFn = libio.WithCompressionFromPool(remote)
defer recycleFn()

View File

@@ -25,7 +25,7 @@ import (
"github.com/fatedier/golib/errors"
libio "github.com/fatedier/golib/io"
"github.com/fatedier/frp/pkg/config"
v1 "github.com/fatedier/frp/pkg/config/v1"
"github.com/fatedier/frp/pkg/msg"
"github.com/fatedier/frp/pkg/proto/udp"
utilnet "github.com/fatedier/frp/pkg/util/net"
@@ -42,7 +42,7 @@ type SUDPVisitor struct {
readCh chan *msg.UDPPacket
sendCh chan *msg.UDPPacket
cfg *config.SUDPVisitorConf
cfg *v1.SUDPVisitorConfig
}
// SUDP Run start listen a udp port
@@ -208,10 +208,10 @@ func (sv *SUDPVisitor) getNewVisitorConn() (net.Conn, error) {
newVisitorConnMsg := &msg.NewVisitorConn{
RunID: sv.helper.RunID(),
ProxyName: sv.cfg.ServerName,
SignKey: util.GetAuthKey(sv.cfg.Sk, now),
SignKey: util.GetAuthKey(sv.cfg.SecretKey, now),
Timestamp: now,
UseEncryption: sv.cfg.UseEncryption,
UseCompression: sv.cfg.UseCompression,
UseEncryption: sv.cfg.Transport.UseEncryption,
UseCompression: sv.cfg.Transport.UseCompression,
}
err = msg.WriteMsg(visitorConn, newVisitorConnMsg)
if err != nil {
@@ -232,14 +232,14 @@ func (sv *SUDPVisitor) getNewVisitorConn() (net.Conn, error) {
var remote io.ReadWriteCloser
remote = visitorConn
if sv.cfg.UseEncryption {
remote, err = libio.WithEncryption(remote, []byte(sv.cfg.Sk))
if sv.cfg.Transport.UseEncryption {
remote, err = libio.WithEncryption(remote, []byte(sv.cfg.SecretKey))
if err != nil {
xl.Error("create encryption stream error: %v", err)
return nil, err
}
}
if sv.cfg.UseCompression {
if sv.cfg.Transport.UseCompression {
remote = libio.WithCompression(remote)
}
return utilnet.WrapReadWriteCloserToConn(remote, visitorConn), nil

View File

@@ -19,7 +19,7 @@ import (
"net"
"sync"
"github.com/fatedier/frp/pkg/config"
v1 "github.com/fatedier/frp/pkg/config/v1"
"github.com/fatedier/frp/pkg/transport"
utilnet "github.com/fatedier/frp/pkg/util/net"
"github.com/fatedier/frp/pkg/util/xlog"
@@ -47,11 +47,11 @@ type Visitor interface {
func NewVisitor(
ctx context.Context,
cfg config.VisitorConf,
clientCfg config.ClientCommonConf,
cfg v1.VisitorConfigurer,
clientCfg *v1.ClientCommonConfig,
helper Helper,
) (visitor Visitor) {
xl := xlog.FromContextSafe(ctx).Spawn().AppendPrefix(cfg.GetBaseConfig().ProxyName)
xl := xlog.FromContextSafe(ctx).Spawn().AppendPrefix(cfg.GetBaseConfig().Name)
baseVisitor := BaseVisitor{
clientCfg: clientCfg,
helper: helper,
@@ -59,18 +59,18 @@ func NewVisitor(
internalLn: utilnet.NewInternalListener(),
}
switch cfg := cfg.(type) {
case *config.STCPVisitorConf:
case *v1.STCPVisitorConfig:
visitor = &STCPVisitor{
BaseVisitor: &baseVisitor,
cfg: cfg,
}
case *config.XTCPVisitorConf:
case *v1.XTCPVisitorConfig:
visitor = &XTCPVisitor{
BaseVisitor: &baseVisitor,
cfg: cfg,
startTunnelCh: make(chan struct{}),
}
case *config.SUDPVisitorConf:
case *v1.SUDPVisitorConfig:
visitor = &SUDPVisitor{
BaseVisitor: &baseVisitor,
cfg: cfg,
@@ -81,7 +81,7 @@ func NewVisitor(
}
type BaseVisitor struct {
clientCfg config.ClientCommonConf
clientCfg *v1.ClientCommonConfig
helper Helper
l net.Listener
internalLn *utilnet.InternalListener

View File

@@ -22,14 +22,16 @@ import (
"sync"
"time"
"github.com/fatedier/frp/pkg/config"
"github.com/samber/lo"
v1 "github.com/fatedier/frp/pkg/config/v1"
"github.com/fatedier/frp/pkg/transport"
"github.com/fatedier/frp/pkg/util/xlog"
)
type Manager struct {
clientCfg config.ClientCommonConf
cfgs map[string]config.VisitorConf
clientCfg *v1.ClientCommonConfig
cfgs map[string]v1.VisitorConfigurer
visitors map[string]Visitor
helper Helper
@@ -44,13 +46,13 @@ type Manager struct {
func NewManager(
ctx context.Context,
runID string,
clientCfg config.ClientCommonConf,
clientCfg *v1.ClientCommonConfig,
connectServer func() (net.Conn, error),
msgTransporter transport.MessageTransporter,
) *Manager {
m := &Manager{
clientCfg: clientCfg,
cfgs: make(map[string]config.VisitorConf),
cfgs: make(map[string]v1.VisitorConfigurer),
visitors: make(map[string]Visitor),
checkInterval: 10 * time.Second,
ctx: ctx,
@@ -79,7 +81,7 @@ func (vm *Manager) Run() {
case <-ticker.C:
vm.mu.Lock()
for _, cfg := range vm.cfgs {
name := cfg.GetBaseConfig().ProxyName
name := cfg.GetBaseConfig().Name
if _, exist := vm.visitors[name]; !exist {
xl.Info("try to start visitor [%s]", name)
_ = vm.startVisitor(cfg)
@@ -104,9 +106,9 @@ func (vm *Manager) Close() {
}
// Hold lock before calling this function.
func (vm *Manager) startVisitor(cfg config.VisitorConf) (err error) {
func (vm *Manager) startVisitor(cfg v1.VisitorConfigurer) (err error) {
xl := xlog.FromContextSafe(vm.ctx)
name := cfg.GetBaseConfig().ProxyName
name := cfg.GetBaseConfig().Name
visitor := NewVisitor(vm.ctx, cfg, vm.clientCfg, vm.helper)
err = visitor.Run()
if err != nil {
@@ -118,15 +120,18 @@ func (vm *Manager) startVisitor(cfg config.VisitorConf) (err error) {
return
}
func (vm *Manager) Reload(cfgs map[string]config.VisitorConf) {
func (vm *Manager) Reload(cfgs []v1.VisitorConfigurer) {
xl := xlog.FromContextSafe(vm.ctx)
cfgsMap := lo.KeyBy(cfgs, func(c v1.VisitorConfigurer) string {
return c.GetBaseConfig().Name
})
vm.mu.Lock()
defer vm.mu.Unlock()
delNames := make([]string, 0)
for name, oldCfg := range vm.cfgs {
del := false
cfg, ok := cfgs[name]
cfg, ok := cfgsMap[name]
if !ok || !reflect.DeepEqual(oldCfg, cfg) {
del = true
}
@@ -145,7 +150,8 @@ func (vm *Manager) Reload(cfgs map[string]config.VisitorConf) {
}
addNames := make([]string, 0)
for name, cfg := range cfgs {
for _, cfg := range cfgs {
name := cfg.GetBaseConfig().Name
if _, ok := vm.cfgs[name]; !ok {
vm.cfgs[name] = cfg
addNames = append(addNames, name)

View File

@@ -29,7 +29,7 @@ import (
quic "github.com/quic-go/quic-go"
"golang.org/x/time/rate"
"github.com/fatedier/frp/pkg/config"
v1 "github.com/fatedier/frp/pkg/config/v1"
"github.com/fatedier/frp/pkg/msg"
"github.com/fatedier/frp/pkg/nathole"
"github.com/fatedier/frp/pkg/transport"
@@ -47,7 +47,7 @@ type XTCPVisitor struct {
retryLimiter *rate.Limiter
cancel context.CancelFunc
cfg *config.XTCPVisitorConf
cfg *v1.XTCPVisitorConfig
}
func (sv *XTCPVisitor) Run() (err error) {
@@ -56,7 +56,7 @@ func (sv *XTCPVisitor) Run() (err error) {
if sv.cfg.Protocol == "kcp" {
sv.session = NewKCPTunnelSession()
} else {
sv.session = NewQUICTunnelSession(&sv.clientCfg)
sv.session = NewQUICTunnelSession(sv.clientCfg)
}
if sv.cfg.BindPort > 0 {
@@ -192,14 +192,14 @@ func (sv *XTCPVisitor) handleConn(userConn net.Conn) {
}
var muxConnRWCloser io.ReadWriteCloser = tunnelConn
if sv.cfg.UseEncryption {
muxConnRWCloser, err = libio.WithEncryption(muxConnRWCloser, []byte(sv.cfg.Sk))
if sv.cfg.Transport.UseEncryption {
muxConnRWCloser, err = libio.WithEncryption(muxConnRWCloser, []byte(sv.cfg.SecretKey))
if err != nil {
xl.Error("create encryption stream error: %v", err)
return
}
}
if sv.cfg.UseCompression {
if sv.cfg.Transport.UseCompression {
var recycleFn func()
muxConnRWCloser, recycleFn = libio.WithCompressionFromPool(muxConnRWCloser)
defer recycleFn()
@@ -292,7 +292,7 @@ func (sv *XTCPVisitor) makeNatHole() {
TransactionID: transactionID,
ProxyName: sv.cfg.ServerName,
Protocol: sv.cfg.Protocol,
SignKey: util.GetAuthKey(sv.cfg.Sk, now),
SignKey: util.GetAuthKey(sv.cfg.SecretKey, now),
Timestamp: now,
MappedAddrs: prepareResult.Addrs,
AssistedAddrs: prepareResult.AssistedAddrs,
@@ -310,7 +310,7 @@ func (sv *XTCPVisitor) makeNatHole() {
natHoleRespMsg.Sid, natHoleRespMsg.Protocol, natHoleRespMsg.CandidateAddrs,
natHoleRespMsg.AssistedAddrs, natHoleRespMsg.DetectBehavior)
newListenConn, raddr, err := nathole.MakeHole(sv.ctx, listenConn, natHoleRespMsg, []byte(sv.cfg.Sk))
newListenConn, raddr, err := nathole.MakeHole(sv.ctx, listenConn, natHoleRespMsg, []byte(sv.cfg.SecretKey))
if err != nil {
listenConn.Close()
xl.Warn("make hole error: %v", err)
@@ -398,10 +398,10 @@ type QUICTunnelSession struct {
listenConn *net.UDPConn
mu sync.RWMutex
clientCfg *config.ClientCommonConf
clientCfg *v1.ClientCommonConfig
}
func NewQUICTunnelSession(clientCfg *config.ClientCommonConf) TunnelSession {
func NewQUICTunnelSession(clientCfg *v1.ClientCommonConfig) TunnelSession {
return &QUICTunnelSession{
clientCfg: clientCfg,
}
@@ -415,9 +415,9 @@ func (qs *QUICTunnelSession) Init(listenConn *net.UDPConn, raddr *net.UDPAddr) e
tlsConfig.NextProtos = []string{"frp"}
quicConn, err := quic.Dial(context.Background(), listenConn, raddr, tlsConfig,
&quic.Config{
MaxIdleTimeout: time.Duration(qs.clientCfg.QUICMaxIdleTimeout) * time.Second,
MaxIncomingStreams: int64(qs.clientCfg.QUICMaxIncomingStreams),
KeepAlivePeriod: time.Duration(qs.clientCfg.QUICKeepalivePeriod) * time.Second,
MaxIdleTimeout: time.Duration(qs.clientCfg.Transport.QUIC.MaxIdleTimeout) * time.Second,
MaxIncomingStreams: int64(qs.clientCfg.Transport.QUIC.MaxIncomingStreams),
KeepAlivePeriod: time.Duration(qs.clientCfg.Transport.QUIC.KeepalivePeriod) * time.Second,
})
if err != nil {
return fmt.Errorf("dial quic error: %v", err)