Compare commits

..

44 Commits

Author SHA1 Message Date
fatedier
0d6d968fe8 Merge pull request #3454 from fatedier/dev
release v0.49.0
2023-05-29 01:12:26 +08:00
fatedier
8fb99ef7a9 Merge pull request #3348 from fatedier/dev
bump version
2023-03-08 11:40:31 +08:00
fatedier
88e74ff24d Merge pull request #3300 from fatedier/dev
sync
2023-02-10 01:12:00 +08:00
fatedier
534dc99d55 Merge pull request #3299 from fatedier/dev
sync
2023-02-09 23:06:14 +08:00
fatedier
595aba5a9b Merge pull request #3248 from fatedier/dev
bump version
2023-01-10 10:26:56 +08:00
fatedier
a4189ba474 Merge branch 'dev' 2022-12-18 19:27:22 +08:00
fatedier
9ec84f8143 Merge pull request #3218 from fatedier/dev
release v0.46.0
2022-12-18 18:46:52 +08:00
fatedier
8ab474cc97 remove unsupported platform (#3148) 2022-10-27 10:22:47 +08:00
fatedier
a301046f3d Merge pull request #3147 from fatedier/dev
bump version
2022-10-26 23:18:40 +08:00
fatedier
8888610d83 Merge pull request #3010 from fatedier/dev
release v0.44.0
2022-07-11 00:10:43 +08:00
fatedier
fe5fb0326b Merge pull request #2955 from fatedier/dev
bump version to v0.43.0
2022-05-27 16:27:19 +08:00
fatedier
eb1e19a821 Merge pull request #2906 from fatedier/dev
bump version
2022-04-22 11:32:27 +08:00
fatedier
10f2620131 Merge pull request #2869 from fatedier/dev
bump version to v0.41.0
2022-03-23 21:19:59 +08:00
fatedier
ce677820c6 Merge pull request #2834 from fatedier/dev
bump version
2022-03-11 19:51:32 +08:00
fatedier
88fcc079e8 Merge pull request #2792 from fatedier/dev
bump version
2022-02-09 16:11:20 +08:00
fatedier
2dab5d0bca Merge pull request #2782 from fatedier/dev
bump version
2022-01-26 20:17:54 +08:00
fatedier
143750901e Merge pull request #2638 from fatedier/dev
bump version to v0.38.0
2021-10-25 20:31:13 +08:00
fatedier
997d406ec2 Merge pull request #2508 from fatedier/dev
bump version
2021-08-03 23:13:31 +08:00
fatedier
cfd1a3128a Merge pull request #2426 from fatedier/dev
update workflow file
2021-06-03 00:59:21 +08:00
fatedier
57577ea044 Merge pull request #2425 from fatedier/dev
bump version
2021-06-03 00:14:32 +08:00
fatedier
c5c79e4148 Merge pull request #2324 from fatedier/dev
bump version v0.36.2
2021-03-22 14:56:48 +08:00
fatedier
55da58eca4 Merge pull request #2310 from fatedier/dev
bump version
2021-03-18 11:14:56 +08:00
fatedier
76a1efccd9 update 2021-03-17 11:43:23 +08:00
fatedier
980f084ad1 Merge pull request #2302 from fatedier/dev
bump version
2021-03-15 21:54:52 +08:00
fatedier
3bf1eb8565 Merge pull request #2216 from fatedier/dev
bump version
2021-01-25 16:15:52 +08:00
fatedier
b2ae433e18 Merge pull request #2206 from fatedier/dev
bump version
2021-01-19 20:56:06 +08:00
fatedier
aa0a41ee4e Merge pull request #2088 from fatedier/dev
bump version to v0.34.3
2020-11-20 17:04:55 +08:00
fatedier
1ea1530b36 Merge pull request #2058 from fatedier/dev
bump version to v0.34.2
2020-11-06 14:50:50 +08:00
fatedier
e0c45a1aca Merge pull request #2018 from fatedier/dev
bump version to v0.34.1
2020-09-30 15:13:08 +08:00
fatedier
813c45f5c2 Merge pull request #1993 from fatedier/dev
bump version to v0.34.0
2020-09-20 00:30:51 +08:00
fatedier
aa74dc4646 Merge pull request #1990 from fatedier/dev
bump version to v0.34.0
2020-09-20 00:10:32 +08:00
fatedier
2406ecdfea Merge pull request #1780 from fatedier/dev
bump version
2020-04-27 16:50:34 +08:00
fatedier
8668fef136 Merge pull request #1728 from fatedier/dev
bump version to v0.32.1
2020-04-03 01:14:58 +08:00
fatedier
ea62bc5a34 remove vendor (#1697) 2020-03-11 14:39:43 +08:00
fatedier
23bb76397a Merge pull request #1696 from fatedier/dev
bump version to v0.32.0
2020-03-11 14:30:47 +08:00
fatedier
487c8d7c29 Merge pull request #1637 from fatedier/dev
bump version to v0.31.2
2020-02-04 21:54:28 +08:00
fatedier
f480160e2d Merge pull request #1596 from fatedier/dev
v0.31.1, fix bugs
2020-01-06 15:55:44 +08:00
fatedier
30c246c488 Merge pull request #1588 from fatedier/dev
bump version to v0.31.0
2020-01-03 11:45:22 +08:00
fatedier
75f3bce04d Merge pull request #1542 from fatedier/dev
bump version to v0.30.0
2019-11-28 14:21:27 +08:00
fatedier
adc3adc13b Merge pull request #1494 from fatedier/dev
bump version to v0.29.1
2019-11-02 21:14:50 +08:00
fatedier
e62d9a5242 Merge pull request #1415 from fatedier/dev
bump version to v0.29.0
2019-08-29 21:22:30 +08:00
fatedier
134a46c00b Merge pull request #1369 from fatedier/dev
bump version to v0.28.2
2019-08-09 12:59:13 +08:00
fatedier
ae08811636 Merge pull request #1364 from fatedier/dev
bump version to v0.28.1 and remove support for go1.11
2019-08-08 17:32:57 +08:00
fatedier
6451583e60 Merge pull request #1349 from fatedier/dev
bump version to v0.28.0
2019-08-01 14:04:55 +08:00
71 changed files with 1078 additions and 1131 deletions

1
.gitignore vendored
View File

@@ -29,7 +29,6 @@ packages/
release/
test/bin/
vendor/
lastversion/
dist/
.idea/
.vscode/

View File

@@ -46,23 +46,8 @@ e2e:
e2e-trace:
DEBUG=true LOG_LEVEL=trace ./hack/run-e2e.sh
e2e-compatibility-last-frpc:
if [ ! -d "./lastversion" ]; then \
TARGET_DIRNAME=lastversion ./hack/download.sh; \
fi
FRPC_PATH="`pwd`/lastversion/frpc" ./hack/run-e2e.sh
rm -r ./lastversion
e2e-compatibility-last-frps:
if [ ! -d "./lastversion" ]; then \
TARGET_DIRNAME=lastversion ./hack/download.sh; \
fi
FRPS_PATH="`pwd`/lastversion/frps" ./hack/run-e2e.sh
rm -r ./lastversion
alltest: vet gotest e2e
clean:
rm -f ./bin/frpc
rm -f ./bin/frps
rm -rf ./lastversion

File diff suppressed because one or more lines are too long

View File

@@ -4,7 +4,7 @@
<head>
<meta charset="utf-8">
<title>frps dashboard</title>
<script type="module" crossorigin src="./index-ea3edf22.js"></script>
<script type="module" crossorigin src="./index-93e38bbf.js"></script>
<link rel="stylesheet" href="./index-1e0c7400.css">
</head>

View File

@@ -23,7 +23,7 @@ import (
"github.com/gorilla/mux"
"github.com/fatedier/frp/assets"
utilnet "github.com/fatedier/frp/pkg/util/net"
frpNet "github.com/fatedier/frp/pkg/util/net"
)
var (
@@ -48,7 +48,7 @@ func (svr *Service) RunAdminServer(address string) (err error) {
subRouter := router.NewRoute().Subrouter()
user, passwd := svr.cfg.AdminUser, svr.cfg.AdminPwd
subRouter.Use(utilnet.NewHTTPAuthMiddleware(user, passwd).SetAuthFailDelay(200 * time.Millisecond).Middleware)
subRouter.Use(frpNet.NewHTTPAuthMiddleware(user, passwd).SetAuthFailDelay(200 * time.Millisecond).Middleware)
// api, see admin_api.go
subRouter.HandleFunc("/api/reload", svr.apiReload).Methods("GET")
@@ -58,7 +58,7 @@ func (svr *Service) RunAdminServer(address string) (err error) {
// view
subRouter.Handle("/favicon.ico", http.FileServer(assets.FileSystem)).Methods("GET")
subRouter.PathPrefix("/static/").Handler(utilnet.MakeHTTPGzipHandler(http.StripPrefix("/static/", http.FileServer(assets.FileSystem)))).Methods("GET")
subRouter.PathPrefix("/static/").Handler(frpNet.MakeHTTPGzipHandler(http.StripPrefix("/static/", http.FileServer(assets.FileSystem)))).Methods("GET")
subRouter.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
http.Redirect(w, r, "/static/", http.StatusMovedPermanently)
})

View File

@@ -91,7 +91,7 @@ func NewProxyStatusResp(status *proxy.WorkingStatus, serverAddr string) ProxySta
Status: status.Phase,
Err: status.Err,
}
baseCfg := status.Cfg.GetBaseConfig()
baseCfg := status.Cfg.GetBaseInfo()
if baseCfg.LocalPort != 0 {
psr.LocalAddr = net.JoinHostPort(baseCfg.LocalIP, strconv.Itoa(baseCfg.LocalPort))
}

View File

@@ -109,7 +109,7 @@ func NewControl(
ctl.msgTransporter = transport.NewMessageTransporter(ctl.sendCh)
ctl.pm = proxy.NewManager(ctl.ctx, clientCfg, ctl.msgTransporter)
ctl.vm = visitor.NewManager(ctl.ctx, ctl.runID, ctl.clientCfg, ctl.connectServer, ctl.msgTransporter)
ctl.vm = visitor.NewManager(ctl.ctx, ctl.clientCfg, ctl.connectServer, ctl.msgTransporter)
ctl.vm.Reload(visitorCfgs)
return ctl
}

View File

@@ -1,47 +0,0 @@
// Copyright 2023 The frp Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package proxy
import (
"reflect"
"github.com/fatedier/frp/pkg/config"
)
func init() {
pxyConfs := []config.ProxyConf{
&config.TCPProxyConf{},
&config.HTTPProxyConf{},
&config.HTTPSProxyConf{},
&config.STCPProxyConf{},
&config.TCPMuxProxyConf{},
}
for _, cfg := range pxyConfs {
RegisterProxyFactory(reflect.TypeOf(cfg), NewGeneralTCPProxy)
}
}
// GeneralTCPProxy is a general implementation of Proxy interface for TCP protocol.
// If the default GeneralTCPProxy cannot meet the requirements, you can customize
// the implementation of the Proxy interface.
type GeneralTCPProxy struct {
*BaseProxy
}
func NewGeneralTCPProxy(baseProxy *BaseProxy, cfg config.ProxyConf) Proxy {
return &GeneralTCPProxy{
BaseProxy: baseProxy,
}
}

View File

@@ -19,13 +19,12 @@ import (
"context"
"io"
"net"
"reflect"
"strconv"
"strings"
"sync"
"time"
libio "github.com/fatedier/golib/io"
frpIo "github.com/fatedier/golib/io"
libdial "github.com/fatedier/golib/net/dial"
pp "github.com/pires/go-proxyproto"
"golang.org/x/time/rate"
@@ -38,12 +37,6 @@ import (
"github.com/fatedier/frp/pkg/util/xlog"
)
var proxyFactoryRegistry = map[reflect.Type]func(*BaseProxy, config.ProxyConf) Proxy{}
func RegisterProxyFactory(proxyConfType reflect.Type, factory func(*BaseProxy, config.ProxyConf) Proxy) {
proxyFactoryRegistry[proxyConfType] = factory
}
// Proxy defines how to handle work connections for different proxy type.
type Proxy interface {
Run() error
@@ -61,94 +54,253 @@ func NewProxy(
msgTransporter transport.MessageTransporter,
) (pxy Proxy) {
var limiter *rate.Limiter
limitBytes := pxyConf.GetBaseConfig().BandwidthLimit.Bytes()
if limitBytes > 0 && pxyConf.GetBaseConfig().BandwidthLimitMode == config.BandwidthLimitModeClient {
limitBytes := pxyConf.GetBaseInfo().BandwidthLimit.Bytes()
if limitBytes > 0 && pxyConf.GetBaseInfo().BandwidthLimitMode == config.BandwidthLimitModeClient {
limiter = rate.NewLimiter(rate.Limit(float64(limitBytes)), int(limitBytes))
}
baseProxy := BaseProxy{
baseProxyConfig: pxyConf.GetBaseConfig(),
clientCfg: clientCfg,
limiter: limiter,
msgTransporter: msgTransporter,
xl: xlog.FromContextSafe(ctx),
ctx: ctx,
clientCfg: clientCfg,
limiter: limiter,
msgTransporter: msgTransporter,
xl: xlog.FromContextSafe(ctx),
ctx: ctx,
}
factory := proxyFactoryRegistry[reflect.TypeOf(pxyConf)]
if factory == nil {
return nil
switch cfg := pxyConf.(type) {
case *config.TCPProxyConf:
pxy = &TCPProxy{
BaseProxy: &baseProxy,
cfg: cfg,
}
case *config.TCPMuxProxyConf:
pxy = &TCPMuxProxy{
BaseProxy: &baseProxy,
cfg: cfg,
}
case *config.UDPProxyConf:
pxy = &UDPProxy{
BaseProxy: &baseProxy,
cfg: cfg,
}
case *config.HTTPProxyConf:
pxy = &HTTPProxy{
BaseProxy: &baseProxy,
cfg: cfg,
}
case *config.HTTPSProxyConf:
pxy = &HTTPSProxy{
BaseProxy: &baseProxy,
cfg: cfg,
}
case *config.STCPProxyConf:
pxy = &STCPProxy{
BaseProxy: &baseProxy,
cfg: cfg,
}
case *config.XTCPProxyConf:
pxy = &XTCPProxy{
BaseProxy: &baseProxy,
cfg: cfg,
}
case *config.SUDPProxyConf:
pxy = &SUDPProxy{
BaseProxy: &baseProxy,
cfg: cfg,
closeCh: make(chan struct{}),
}
}
return factory(&baseProxy, pxyConf)
return
}
type BaseProxy struct {
baseProxyConfig *config.BaseProxyConf
clientCfg config.ClientCommonConf
msgTransporter transport.MessageTransporter
limiter *rate.Limiter
// proxyPlugin is used to handle connections instead of dialing to local service.
// It's only validate for TCP protocol now.
proxyPlugin plugin.Plugin
closed bool
clientCfg config.ClientCommonConf
msgTransporter transport.MessageTransporter
limiter *rate.Limiter
mu sync.RWMutex
xl *xlog.Logger
ctx context.Context
}
func (pxy *BaseProxy) Run() error {
if pxy.baseProxyConfig.Plugin != "" {
p, err := plugin.Create(pxy.baseProxyConfig.Plugin, pxy.baseProxyConfig.PluginParams)
if err != nil {
return err
}
pxy.proxyPlugin = p
}
return nil
// TCP
type TCPProxy struct {
*BaseProxy
cfg *config.TCPProxyConf
proxyPlugin plugin.Plugin
}
func (pxy *BaseProxy) Close() {
func (pxy *TCPProxy) Run() (err error) {
if pxy.cfg.Plugin != "" {
pxy.proxyPlugin, err = plugin.Create(pxy.cfg.Plugin, pxy.cfg.PluginParams)
if err != nil {
return
}
}
return
}
func (pxy *TCPProxy) Close() {
if pxy.proxyPlugin != nil {
pxy.proxyPlugin.Close()
}
}
func (pxy *BaseProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
pxy.HandleTCPWorkConnection(conn, m, []byte(pxy.clientCfg.Token))
func (pxy *TCPProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
HandleTCPWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, pxy.cfg.GetBaseInfo(), pxy.limiter,
conn, []byte(pxy.clientCfg.Token), m)
}
// TCP Multiplexer
type TCPMuxProxy struct {
*BaseProxy
cfg *config.TCPMuxProxyConf
proxyPlugin plugin.Plugin
}
func (pxy *TCPMuxProxy) Run() (err error) {
if pxy.cfg.Plugin != "" {
pxy.proxyPlugin, err = plugin.Create(pxy.cfg.Plugin, pxy.cfg.PluginParams)
if err != nil {
return
}
}
return
}
func (pxy *TCPMuxProxy) Close() {
if pxy.proxyPlugin != nil {
pxy.proxyPlugin.Close()
}
}
func (pxy *TCPMuxProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
HandleTCPWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, pxy.cfg.GetBaseInfo(), pxy.limiter,
conn, []byte(pxy.clientCfg.Token), m)
}
// HTTP
type HTTPProxy struct {
*BaseProxy
cfg *config.HTTPProxyConf
proxyPlugin plugin.Plugin
}
func (pxy *HTTPProxy) Run() (err error) {
if pxy.cfg.Plugin != "" {
pxy.proxyPlugin, err = plugin.Create(pxy.cfg.Plugin, pxy.cfg.PluginParams)
if err != nil {
return
}
}
return
}
func (pxy *HTTPProxy) Close() {
if pxy.proxyPlugin != nil {
pxy.proxyPlugin.Close()
}
}
func (pxy *HTTPProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
HandleTCPWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, pxy.cfg.GetBaseInfo(), pxy.limiter,
conn, []byte(pxy.clientCfg.Token), m)
}
// HTTPS
type HTTPSProxy struct {
*BaseProxy
cfg *config.HTTPSProxyConf
proxyPlugin plugin.Plugin
}
func (pxy *HTTPSProxy) Run() (err error) {
if pxy.cfg.Plugin != "" {
pxy.proxyPlugin, err = plugin.Create(pxy.cfg.Plugin, pxy.cfg.PluginParams)
if err != nil {
return
}
}
return
}
func (pxy *HTTPSProxy) Close() {
if pxy.proxyPlugin != nil {
pxy.proxyPlugin.Close()
}
}
func (pxy *HTTPSProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
HandleTCPWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, pxy.cfg.GetBaseInfo(), pxy.limiter,
conn, []byte(pxy.clientCfg.Token), m)
}
// STCP
type STCPProxy struct {
*BaseProxy
cfg *config.STCPProxyConf
proxyPlugin plugin.Plugin
}
func (pxy *STCPProxy) Run() (err error) {
if pxy.cfg.Plugin != "" {
pxy.proxyPlugin, err = plugin.Create(pxy.cfg.Plugin, pxy.cfg.PluginParams)
if err != nil {
return
}
}
return
}
func (pxy *STCPProxy) Close() {
if pxy.proxyPlugin != nil {
pxy.proxyPlugin.Close()
}
}
func (pxy *STCPProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
HandleTCPWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, pxy.cfg.GetBaseInfo(), pxy.limiter,
conn, []byte(pxy.clientCfg.Token), m)
}
// Common handler for tcp work connections.
func (pxy *BaseProxy) HandleTCPWorkConnection(workConn net.Conn, m *msg.StartWorkConn, encKey []byte) {
xl := pxy.xl
baseConfig := pxy.baseProxyConfig
func HandleTCPWorkConnection(ctx context.Context, localInfo *config.LocalSvrConf, proxyPlugin plugin.Plugin,
baseInfo *config.BaseProxyConf, limiter *rate.Limiter, workConn net.Conn, encKey []byte, m *msg.StartWorkConn,
) {
xl := xlog.FromContextSafe(ctx)
var (
remote io.ReadWriteCloser
err error
)
remote = workConn
if pxy.limiter != nil {
remote = libio.WrapReadWriteCloser(limit.NewReader(workConn, pxy.limiter), limit.NewWriter(workConn, pxy.limiter), func() error {
if limiter != nil {
remote = frpIo.WrapReadWriteCloser(limit.NewReader(workConn, limiter), limit.NewWriter(workConn, limiter), func() error {
return workConn.Close()
})
}
xl.Trace("handle tcp work connection, use_encryption: %t, use_compression: %t",
baseConfig.UseEncryption, baseConfig.UseCompression)
if baseConfig.UseEncryption {
remote, err = libio.WithEncryption(remote, encKey)
baseInfo.UseEncryption, baseInfo.UseCompression)
if baseInfo.UseEncryption {
remote, err = frpIo.WithEncryption(remote, encKey)
if err != nil {
workConn.Close()
xl.Error("create encryption stream error: %v", err)
return
}
}
if baseConfig.UseCompression {
remote = libio.WithCompression(remote)
if baseInfo.UseCompression {
remote = frpIo.WithCompression(remote)
}
// check if we need to send proxy protocol info
var extraInfo []byte
if baseConfig.ProxyProtocolVersion != "" {
if baseInfo.ProxyProtocolVersion != "" {
if m.SrcAddr != "" && m.SrcPort != 0 {
if m.DstAddr == "" {
m.DstAddr = "127.0.0.1"
@@ -167,9 +319,9 @@ func (pxy *BaseProxy) HandleTCPWorkConnection(workConn net.Conn, m *msg.StartWor
h.TransportProtocol = pp.TCPv6
}
if baseConfig.ProxyProtocolVersion == "v1" {
if baseInfo.ProxyProtocolVersion == "v1" {
h.Version = 1
} else if baseConfig.ProxyProtocolVersion == "v2" {
} else if baseInfo.ProxyProtocolVersion == "v2" {
h.Version = 2
}
@@ -179,21 +331,21 @@ func (pxy *BaseProxy) HandleTCPWorkConnection(workConn net.Conn, m *msg.StartWor
}
}
if pxy.proxyPlugin != nil {
// if plugin is set, let plugin handle connection first
xl.Debug("handle by plugin: %s", pxy.proxyPlugin.Name())
pxy.proxyPlugin.Handle(remote, workConn, extraInfo)
if proxyPlugin != nil {
// if plugin is set, let plugin handle connections first
xl.Debug("handle by plugin: %s", proxyPlugin.Name())
proxyPlugin.Handle(remote, workConn, extraInfo)
xl.Debug("handle by plugin finished")
return
}
localConn, err := libdial.Dial(
net.JoinHostPort(baseConfig.LocalIP, strconv.Itoa(baseConfig.LocalPort)),
net.JoinHostPort(localInfo.LocalIP, strconv.Itoa(localInfo.LocalPort)),
libdial.WithTimeout(10*time.Second),
)
if err != nil {
workConn.Close()
xl.Error("connect to local service [%s:%d] error: %v", baseConfig.LocalIP, baseConfig.LocalPort, err)
xl.Error("connect to local service [%s:%d] error: %v", localInfo.LocalIP, localInfo.LocalPort, err)
return
}
@@ -208,7 +360,7 @@ func (pxy *BaseProxy) HandleTCPWorkConnection(workConn net.Conn, m *msg.StartWor
}
}
_, _, errs := libio.Join(localConn, remote)
_, _, errs := frpIo.Join(localConn, remote)
xl.Debug("join connections closed")
if len(errs) > 0 {
xl.Trace("join connections errors: %v", errs)

View File

@@ -18,7 +18,6 @@ import (
"context"
"fmt"
"net"
"reflect"
"sync"
"github.com/fatedier/frp/client/event"
@@ -122,18 +121,21 @@ func (pm *Manager) Reload(pxyCfgs map[string]config.ProxyConf) {
for name, pxy := range pm.proxies {
del := false
cfg, ok := pxyCfgs[name]
if !ok || !reflect.DeepEqual(pxy.Cfg, cfg) {
if !ok {
del = true
} else if !pxy.Cfg.Compare(cfg) {
del = true
}
if del {
delPxyNames = append(delPxyNames, name)
delete(pm.proxies, name)
pxy.Stop()
}
}
if len(delPxyNames) > 0 {
xl.Info("proxy removed: %s", delPxyNames)
xl.Info("proxy removed: %v", delPxyNames)
}
addPxyNames := make([]string, 0)
@@ -147,6 +149,6 @@ func (pm *Manager) Reload(pxyCfgs map[string]config.ProxyConf) {
}
}
if len(addPxyNames) > 0 {
xl.Info("proxy added: %s", addPxyNames)
xl.Info("proxy added: %v", addPxyNames)
}
}

View File

@@ -91,7 +91,7 @@ func NewWrapper(
eventHandler event.Handler,
msgTransporter transport.MessageTransporter,
) *Wrapper {
baseInfo := cfg.GetBaseConfig()
baseInfo := cfg.GetBaseInfo()
xl := xlog.FromContextSafe(ctx).Spawn().AppendPrefix(baseInfo.ProxyName)
pw := &Wrapper{
WorkingStatus: WorkingStatus{

View File

@@ -17,25 +17,20 @@ package proxy
import (
"io"
"net"
"reflect"
"strconv"
"sync"
"time"
"github.com/fatedier/golib/errors"
libio "github.com/fatedier/golib/io"
frpIo "github.com/fatedier/golib/io"
"github.com/fatedier/frp/pkg/config"
"github.com/fatedier/frp/pkg/msg"
"github.com/fatedier/frp/pkg/proto/udp"
"github.com/fatedier/frp/pkg/util/limit"
utilnet "github.com/fatedier/frp/pkg/util/net"
frpNet "github.com/fatedier/frp/pkg/util/net"
)
func init() {
RegisterProxyFactory(reflect.TypeOf(&config.SUDPProxyConf{}), NewSUDPProxy)
}
type SUDPProxy struct {
*BaseProxy
@@ -46,18 +41,6 @@ type SUDPProxy struct {
closeCh chan struct{}
}
func NewSUDPProxy(baseProxy *BaseProxy, cfg config.ProxyConf) Proxy {
unwrapped, ok := cfg.(*config.SUDPProxyConf)
if !ok {
return nil
}
return &SUDPProxy{
BaseProxy: baseProxy,
cfg: unwrapped,
closeCh: make(chan struct{}),
}
}
func (pxy *SUDPProxy) Run() (err error) {
pxy.localAddr, err = net.ResolveUDPAddr("udp", net.JoinHostPort(pxy.cfg.LocalIP, strconv.Itoa(pxy.cfg.LocalPort)))
if err != nil {
@@ -84,12 +67,12 @@ func (pxy *SUDPProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
var rwc io.ReadWriteCloser = conn
var err error
if pxy.limiter != nil {
rwc = libio.WrapReadWriteCloser(limit.NewReader(conn, pxy.limiter), limit.NewWriter(conn, pxy.limiter), func() error {
rwc = frpIo.WrapReadWriteCloser(limit.NewReader(conn, pxy.limiter), limit.NewWriter(conn, pxy.limiter), func() error {
return conn.Close()
})
}
if pxy.cfg.UseEncryption {
rwc, err = libio.WithEncryption(rwc, []byte(pxy.clientCfg.Token))
rwc, err = frpIo.WithEncryption(rwc, []byte(pxy.clientCfg.Token))
if err != nil {
conn.Close()
xl.Error("create encryption stream error: %v", err)
@@ -97,9 +80,9 @@ func (pxy *SUDPProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
}
}
if pxy.cfg.UseCompression {
rwc = libio.WithCompression(rwc)
rwc = frpIo.WithCompression(rwc)
}
conn = utilnet.WrapReadWriteCloserToConn(rwc, conn)
conn = frpNet.WrapReadWriteCloserToConn(rwc, conn)
workConn := conn
readCh := make(chan *msg.UDPPacket, 1024)

View File

@@ -17,24 +17,20 @@ package proxy
import (
"io"
"net"
"reflect"
"strconv"
"time"
"github.com/fatedier/golib/errors"
libio "github.com/fatedier/golib/io"
frpIo "github.com/fatedier/golib/io"
"github.com/fatedier/frp/pkg/config"
"github.com/fatedier/frp/pkg/msg"
"github.com/fatedier/frp/pkg/proto/udp"
"github.com/fatedier/frp/pkg/util/limit"
utilnet "github.com/fatedier/frp/pkg/util/net"
frpNet "github.com/fatedier/frp/pkg/util/net"
)
func init() {
RegisterProxyFactory(reflect.TypeOf(&config.UDPProxyConf{}), NewUDPProxy)
}
// UDP
type UDPProxy struct {
*BaseProxy
@@ -46,18 +42,6 @@ type UDPProxy struct {
// include msg.UDPPacket and msg.Ping
sendCh chan msg.Message
workConn net.Conn
closed bool
}
func NewUDPProxy(baseProxy *BaseProxy, cfg config.ProxyConf) Proxy {
unwrapped, ok := cfg.(*config.UDPProxyConf)
if !ok {
return nil
}
return &UDPProxy{
BaseProxy: baseProxy,
cfg: unwrapped,
}
}
func (pxy *UDPProxy) Run() (err error) {
@@ -95,12 +79,12 @@ func (pxy *UDPProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
var rwc io.ReadWriteCloser = conn
var err error
if pxy.limiter != nil {
rwc = libio.WrapReadWriteCloser(limit.NewReader(conn, pxy.limiter), limit.NewWriter(conn, pxy.limiter), func() error {
rwc = frpIo.WrapReadWriteCloser(limit.NewReader(conn, pxy.limiter), limit.NewWriter(conn, pxy.limiter), func() error {
return conn.Close()
})
}
if pxy.cfg.UseEncryption {
rwc, err = libio.WithEncryption(rwc, []byte(pxy.clientCfg.Token))
rwc, err = frpIo.WithEncryption(rwc, []byte(pxy.clientCfg.Token))
if err != nil {
conn.Close()
xl.Error("create encryption stream error: %v", err)
@@ -108,9 +92,9 @@ func (pxy *UDPProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
}
}
if pxy.cfg.UseCompression {
rwc = libio.WithCompression(rwc)
rwc = frpIo.WithCompression(rwc)
}
conn = utilnet.WrapReadWriteCloserToConn(rwc, conn)
conn = frpNet.WrapReadWriteCloserToConn(rwc, conn)
pxy.mu.Lock()
pxy.workConn = conn

View File

@@ -17,7 +17,6 @@ package proxy
import (
"io"
"net"
"reflect"
"time"
fmux "github.com/hashicorp/yamux"
@@ -26,28 +25,32 @@ import (
"github.com/fatedier/frp/pkg/config"
"github.com/fatedier/frp/pkg/msg"
"github.com/fatedier/frp/pkg/nathole"
plugin "github.com/fatedier/frp/pkg/plugin/client"
"github.com/fatedier/frp/pkg/transport"
utilnet "github.com/fatedier/frp/pkg/util/net"
frpNet "github.com/fatedier/frp/pkg/util/net"
)
func init() {
RegisterProxyFactory(reflect.TypeOf(&config.XTCPProxyConf{}), NewXTCPProxy)
}
// XTCP
type XTCPProxy struct {
*BaseProxy
cfg *config.XTCPProxyConf
cfg *config.XTCPProxyConf
proxyPlugin plugin.Plugin
}
func NewXTCPProxy(baseProxy *BaseProxy, cfg config.ProxyConf) Proxy {
unwrapped, ok := cfg.(*config.XTCPProxyConf)
if !ok {
return nil
func (pxy *XTCPProxy) Run() (err error) {
if pxy.cfg.Plugin != "" {
pxy.proxyPlugin, err = plugin.Create(pxy.cfg.Plugin, pxy.cfg.PluginParams)
if err != nil {
return
}
}
return &XTCPProxy{
BaseProxy: baseProxy,
cfg: unwrapped,
return
}
func (pxy *XTCPProxy) Close() {
if pxy.proxyPlugin != nil {
pxy.proxyPlugin.Close()
}
}
@@ -129,7 +132,7 @@ func (pxy *XTCPProxy) listenByKCP(listenConn *net.UDPConn, raddr *net.UDPAddr, s
}
defer lConn.Close()
remote, err := utilnet.NewKCPConnFromUDP(lConn, true, raddr.String())
remote, err := frpNet.NewKCPConnFromUDP(lConn, true, raddr.String())
if err != nil {
xl.Warn("create kcp connection from udp connection error: %v", err)
return
@@ -137,7 +140,7 @@ func (pxy *XTCPProxy) listenByKCP(listenConn *net.UDPConn, raddr *net.UDPAddr, s
fmuxCfg := fmux.DefaultConfig()
fmuxCfg.KeepAliveInterval = 10 * time.Second
fmuxCfg.MaxStreamWindowSize = 6 * 1024 * 1024
fmuxCfg.MaxStreamWindowSize = 2 * 1024 * 1024
fmuxCfg.LogOutput = io.Discard
session, err := fmux.Server(remote, fmuxCfg)
if err != nil {
@@ -152,7 +155,8 @@ func (pxy *XTCPProxy) listenByKCP(listenConn *net.UDPConn, raddr *net.UDPAddr, s
xl.Error("accept connection error: %v", err)
return
}
go pxy.HandleTCPWorkConnection(muxConn, startWorkConnMsg, []byte(pxy.cfg.Sk))
go HandleTCPWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, pxy.cfg.GetBaseInfo(), pxy.limiter,
muxConn, []byte(pxy.cfg.Sk), startWorkConnMsg)
}
}
@@ -190,6 +194,7 @@ func (pxy *XTCPProxy) listenByQUIC(listenConn *net.UDPConn, _ *net.UDPAddr, star
_ = c.CloseWithError(0, "")
return
}
go pxy.HandleTCPWorkConnection(utilnet.QuicStreamToNetConn(stream, c), startWorkConnMsg, []byte(pxy.cfg.Sk))
go HandleTCPWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, pxy.cfg.GetBaseInfo(), pxy.limiter,
frpNet.QuicStreamToNetConn(stream, c), []byte(pxy.cfg.Sk), startWorkConnMsg)
}
}

View File

@@ -39,7 +39,7 @@ import (
"github.com/fatedier/frp/pkg/msg"
"github.com/fatedier/frp/pkg/transport"
"github.com/fatedier/frp/pkg/util/log"
utilnet "github.com/fatedier/frp/pkg/util/net"
frpNet "github.com/fatedier/frp/pkg/util/net"
"github.com/fatedier/frp/pkg/util/util"
"github.com/fatedier/frp/pkg/util/version"
"github.com/fatedier/frp/pkg/util/xlog"
@@ -369,8 +369,7 @@ func (cm *ConnectionManager) OpenConnection() error {
}
tlsConfig.NextProtos = []string{"frp"}
conn, err := quic.DialAddrContext(
cm.ctx,
conn, err := quic.DialAddr(
net.JoinHostPort(cm.cfg.ServerAddr, strconv.Itoa(cm.cfg.ServerPort)),
tlsConfig, &quic.Config{
MaxIdleTimeout: time.Duration(cm.cfg.QUICMaxIdleTimeout) * time.Second,
@@ -396,7 +395,6 @@ func (cm *ConnectionManager) OpenConnection() error {
fmuxCfg := fmux.DefaultConfig()
fmuxCfg.KeepAliveInterval = time.Duration(cm.cfg.TCPMuxKeepaliveInterval) * time.Second
fmuxCfg.LogOutput = io.Discard
fmuxCfg.MaxStreamWindowSize = 6 * 1024 * 1024
session, err := fmux.Client(conn, fmuxCfg)
if err != nil {
return err
@@ -411,7 +409,7 @@ func (cm *ConnectionManager) Connect() (net.Conn, error) {
if err != nil {
return nil, err
}
return utilnet.QuicStreamToNetConn(stream, cm.quicConn), nil
return frpNet.QuicStreamToNetConn(stream, cm.quicConn), nil
} else if cm.muxSession != nil {
stream, err := cm.muxSession.OpenStream()
if err != nil {
@@ -453,7 +451,7 @@ func (cm *ConnectionManager) realConnect() (net.Conn, error) {
protocol := cm.cfg.Protocol
if protocol == "websocket" {
protocol = "tcp"
dialOptions = append(dialOptions, libdial.WithAfterHook(libdial.AfterHook{Hook: utilnet.DialHookWebsocket()}))
dialOptions = append(dialOptions, libdial.WithAfterHook(libdial.AfterHook{Hook: frpNet.DialHookWebsocket()}))
}
if cm.cfg.ConnectServerLocalIP != "" {
dialOptions = append(dialOptions, libdial.WithLocalAddr(cm.cfg.ConnectServerLocalIP))
@@ -466,11 +464,10 @@ func (cm *ConnectionManager) realConnect() (net.Conn, error) {
libdial.WithProxyAuth(auth),
libdial.WithTLSConfig(tlsConfig),
libdial.WithAfterHook(libdial.AfterHook{
Hook: utilnet.DialHookCustomTLSHeadByte(tlsConfig != nil, cm.cfg.DisableCustomTLSFirstByte),
Hook: frpNet.DialHookCustomTLSHeadByte(tlsConfig != nil, cm.cfg.DisableCustomTLSFirstByte),
}),
)
conn, err := libdial.DialContext(
cm.ctx,
conn, err := libdial.Dial(
net.JoinHostPort(cm.cfg.ServerAddr, strconv.Itoa(cm.cfg.ServerPort)),
dialOptions...,
)

View File

@@ -20,7 +20,7 @@ import (
"strconv"
"time"
libio "github.com/fatedier/golib/io"
frpIo "github.com/fatedier/golib/io"
"github.com/fatedier/frp/pkg/config"
"github.com/fatedier/frp/pkg/msg"
@@ -35,20 +35,17 @@ type STCPVisitor struct {
}
func (sv *STCPVisitor) Run() (err error) {
if sv.cfg.BindPort > 0 {
sv.l, err = net.Listen("tcp", net.JoinHostPort(sv.cfg.BindAddr, strconv.Itoa(sv.cfg.BindPort)))
if err != nil {
return
}
go sv.worker()
sv.l, err = net.Listen("tcp", net.JoinHostPort(sv.cfg.BindAddr, strconv.Itoa(sv.cfg.BindPort)))
if err != nil {
return
}
go sv.internalConnWorker()
go sv.worker()
return
}
func (sv *STCPVisitor) Close() {
sv.BaseVisitor.Close()
sv.l.Close()
}
func (sv *STCPVisitor) worker() {
@@ -59,18 +56,7 @@ func (sv *STCPVisitor) worker() {
xl.Warn("stcp local listener closed")
return
}
go sv.handleConn(conn)
}
}
func (sv *STCPVisitor) internalConnWorker() {
xl := xlog.FromContextSafe(sv.ctx)
for {
conn, err := sv.internalLn.Accept()
if err != nil {
xl.Warn("stcp internal listener closed")
return
}
go sv.handleConn(conn)
}
}
@@ -80,7 +66,7 @@ func (sv *STCPVisitor) handleConn(userConn net.Conn) {
defer userConn.Close()
xl.Debug("get a new stcp user connection")
visitorConn, err := sv.helper.ConnectServer()
visitorConn, err := sv.connectServer()
if err != nil {
return
}
@@ -88,7 +74,6 @@ func (sv *STCPVisitor) handleConn(userConn net.Conn) {
now := time.Now().Unix()
newVisitorConnMsg := &msg.NewVisitorConn{
RunID: sv.helper.RunID(),
ProxyName: sv.cfg.ServerName,
SignKey: util.GetAuthKey(sv.cfg.Sk, now),
Timestamp: now,
@@ -118,7 +103,7 @@ func (sv *STCPVisitor) handleConn(userConn net.Conn) {
var remote io.ReadWriteCloser
remote = visitorConn
if sv.cfg.UseEncryption {
remote, err = libio.WithEncryption(remote, []byte(sv.cfg.Sk))
remote, err = frpIo.WithEncryption(remote, []byte(sv.cfg.Sk))
if err != nil {
xl.Error("create encryption stream error: %v", err)
return
@@ -126,8 +111,8 @@ func (sv *STCPVisitor) handleConn(userConn net.Conn) {
}
if sv.cfg.UseCompression {
remote = libio.WithCompression(remote)
remote = frpIo.WithCompression(remote)
}
libio.Join(userConn, remote)
frpIo.Join(userConn, remote)
}

View File

@@ -23,12 +23,12 @@ import (
"time"
"github.com/fatedier/golib/errors"
libio "github.com/fatedier/golib/io"
frpIo "github.com/fatedier/golib/io"
"github.com/fatedier/frp/pkg/config"
"github.com/fatedier/frp/pkg/msg"
"github.com/fatedier/frp/pkg/proto/udp"
utilnet "github.com/fatedier/frp/pkg/util/net"
frpNet "github.com/fatedier/frp/pkg/util/net"
"github.com/fatedier/frp/pkg/util/util"
"github.com/fatedier/frp/pkg/util/xlog"
)
@@ -199,14 +199,13 @@ func (sv *SUDPVisitor) worker(workConn net.Conn, firstPacket *msg.UDPPacket) {
func (sv *SUDPVisitor) getNewVisitorConn() (net.Conn, error) {
xl := xlog.FromContextSafe(sv.ctx)
visitorConn, err := sv.helper.ConnectServer()
visitorConn, err := sv.connectServer()
if err != nil {
return nil, fmt.Errorf("frpc connect frps error: %v", err)
}
now := time.Now().Unix()
newVisitorConnMsg := &msg.NewVisitorConn{
RunID: sv.helper.RunID(),
ProxyName: sv.cfg.ServerName,
SignKey: util.GetAuthKey(sv.cfg.Sk, now),
Timestamp: now,
@@ -233,16 +232,16 @@ func (sv *SUDPVisitor) getNewVisitorConn() (net.Conn, error) {
var remote io.ReadWriteCloser
remote = visitorConn
if sv.cfg.UseEncryption {
remote, err = libio.WithEncryption(remote, []byte(sv.cfg.Sk))
remote, err = frpIo.WithEncryption(remote, []byte(sv.cfg.Sk))
if err != nil {
xl.Error("create encryption stream error: %v", err)
return nil, err
}
}
if sv.cfg.UseCompression {
remote = libio.WithCompression(remote)
remote = frpIo.WithCompression(remote)
}
return utilnet.WrapReadWriteCloserToConn(remote, visitorConn), nil
return frpNet.WrapReadWriteCloserToConn(remote, visitorConn), nil
}
func (sv *SUDPVisitor) Close() {
@@ -255,7 +254,6 @@ func (sv *SUDPVisitor) Close() {
default:
close(sv.checkCloseCh)
}
sv.BaseVisitor.Close()
if sv.udpConn != nil {
sv.udpConn.Close()
}

View File

@@ -21,27 +21,12 @@ import (
"github.com/fatedier/frp/pkg/config"
"github.com/fatedier/frp/pkg/transport"
utilnet "github.com/fatedier/frp/pkg/util/net"
"github.com/fatedier/frp/pkg/util/xlog"
)
// Helper wrapps some functions for visitor to use.
type Helper interface {
// ConnectServer directly connects to the frp server.
ConnectServer() (net.Conn, error)
// TransferConn transfers the connection to another visitor.
TransferConn(string, net.Conn) error
// MsgTransporter returns the message transporter that is used to send and receive messages
// to the frp server through the controller.
MsgTransporter() transport.MessageTransporter
// RunID returns the run id of current controller.
RunID() string
}
// Visitor is used for forward traffics from local port tot remote service.
type Visitor interface {
Run() error
AcceptConn(conn net.Conn) error
Close()
}
@@ -49,14 +34,15 @@ func NewVisitor(
ctx context.Context,
cfg config.VisitorConf,
clientCfg config.ClientCommonConf,
helper Helper,
connectServer func() (net.Conn, error),
msgTransporter transport.MessageTransporter,
) (visitor Visitor) {
xl := xlog.FromContextSafe(ctx).Spawn().AppendPrefix(cfg.GetBaseConfig().ProxyName)
xl := xlog.FromContextSafe(ctx).Spawn().AppendPrefix(cfg.GetBaseInfo().ProxyName)
baseVisitor := BaseVisitor{
clientCfg: clientCfg,
helper: helper,
ctx: xlog.NewContext(ctx, xl),
internalLn: utilnet.NewInternalListener(),
clientCfg: clientCfg,
connectServer: connectServer,
msgTransporter: msgTransporter,
ctx: xlog.NewContext(ctx, xl),
}
switch cfg := cfg.(type) {
case *config.STCPVisitorConf:
@@ -81,24 +67,11 @@ func NewVisitor(
}
type BaseVisitor struct {
clientCfg config.ClientCommonConf
helper Helper
l net.Listener
internalLn *utilnet.InternalListener
clientCfg config.ClientCommonConf
connectServer func() (net.Conn, error)
msgTransporter transport.MessageTransporter
l net.Listener
mu sync.RWMutex
ctx context.Context
}
func (v *BaseVisitor) AcceptConn(conn net.Conn) error {
return v.internalLn.PutConn(conn)
}
func (v *BaseVisitor) Close() {
if v.l != nil {
v.l.Close()
}
if v.internalLn != nil {
v.internalLn.Close()
}
}

View File

@@ -16,9 +16,7 @@ package visitor
import (
"context"
"fmt"
"net"
"reflect"
"sync"
"time"
@@ -28,14 +26,15 @@ import (
)
type Manager struct {
clientCfg config.ClientCommonConf
cfgs map[string]config.VisitorConf
visitors map[string]Visitor
helper Helper
clientCfg config.ClientCommonConf
connectServer func() (net.Conn, error)
msgTransporter transport.MessageTransporter
cfgs map[string]config.VisitorConf
visitors map[string]Visitor
checkInterval time.Duration
mu sync.RWMutex
mu sync.Mutex
ctx context.Context
stopCh chan struct{}
@@ -43,26 +42,20 @@ type Manager struct {
func NewManager(
ctx context.Context,
runID string,
clientCfg config.ClientCommonConf,
connectServer func() (net.Conn, error),
msgTransporter transport.MessageTransporter,
) *Manager {
m := &Manager{
clientCfg: clientCfg,
cfgs: make(map[string]config.VisitorConf),
visitors: make(map[string]Visitor),
checkInterval: 10 * time.Second,
ctx: ctx,
stopCh: make(chan struct{}),
return &Manager{
clientCfg: clientCfg,
connectServer: connectServer,
msgTransporter: msgTransporter,
cfgs: make(map[string]config.VisitorConf),
visitors: make(map[string]Visitor),
checkInterval: 10 * time.Second,
ctx: ctx,
stopCh: make(chan struct{}),
}
m.helper = &visitorHelperImpl{
connectServerFn: connectServer,
msgTransporter: msgTransporter,
transferConnFn: m.TransferConn,
runID: runID,
}
return m
}
func (vm *Manager) Run() {
@@ -79,7 +72,7 @@ func (vm *Manager) Run() {
case <-ticker.C:
vm.mu.Lock()
for _, cfg := range vm.cfgs {
name := cfg.GetBaseConfig().ProxyName
name := cfg.GetBaseInfo().ProxyName
if _, exist := vm.visitors[name]; !exist {
xl.Info("try to start visitor [%s]", name)
_ = vm.startVisitor(cfg)
@@ -90,24 +83,11 @@ func (vm *Manager) Run() {
}
}
func (vm *Manager) Close() {
vm.mu.Lock()
defer vm.mu.Unlock()
for _, v := range vm.visitors {
v.Close()
}
select {
case <-vm.stopCh:
default:
close(vm.stopCh)
}
}
// Hold lock before calling this function.
func (vm *Manager) startVisitor(cfg config.VisitorConf) (err error) {
xl := xlog.FromContextSafe(vm.ctx)
name := cfg.GetBaseConfig().ProxyName
visitor := NewVisitor(vm.ctx, cfg, vm.clientCfg, vm.helper)
name := cfg.GetBaseInfo().ProxyName
visitor := NewVisitor(vm.ctx, cfg, vm.clientCfg, vm.connectServer, vm.msgTransporter)
err = visitor.Run()
if err != nil {
xl.Warn("start error: %v", err)
@@ -127,7 +107,9 @@ func (vm *Manager) Reload(cfgs map[string]config.VisitorConf) {
for name, oldCfg := range vm.cfgs {
del := false
cfg, ok := cfgs[name]
if !ok || !reflect.DeepEqual(oldCfg, cfg) {
if !ok {
del = true
} else if !oldCfg.Compare(cfg) {
del = true
}
@@ -157,36 +139,15 @@ func (vm *Manager) Reload(cfgs map[string]config.VisitorConf) {
}
}
// TransferConn transfers a connection to a visitor.
func (vm *Manager) TransferConn(name string, conn net.Conn) error {
vm.mu.RLock()
defer vm.mu.RUnlock()
v, ok := vm.visitors[name]
if !ok {
return fmt.Errorf("visitor [%s] not found", name)
func (vm *Manager) Close() {
vm.mu.Lock()
defer vm.mu.Unlock()
for _, v := range vm.visitors {
v.Close()
}
select {
case <-vm.stopCh:
default:
close(vm.stopCh)
}
return v.AcceptConn(conn)
}
type visitorHelperImpl struct {
connectServerFn func() (net.Conn, error)
msgTransporter transport.MessageTransporter
transferConnFn func(name string, conn net.Conn) error
runID string
}
func (v *visitorHelperImpl) ConnectServer() (net.Conn, error) {
return v.connectServerFn()
}
func (v *visitorHelperImpl) TransferConn(name string, conn net.Conn) error {
return v.transferConnFn(name, conn)
}
func (v *visitorHelperImpl) MsgTransporter() transport.MessageTransporter {
return v.msgTransporter
}
func (v *visitorHelperImpl) RunID() string {
return v.runID
}

View File

@@ -24,7 +24,7 @@ import (
"sync"
"time"
libio "github.com/fatedier/golib/io"
frpIo "github.com/fatedier/golib/io"
fmux "github.com/hashicorp/yamux"
quic "github.com/quic-go/quic-go"
"golang.org/x/time/rate"
@@ -33,7 +33,7 @@ import (
"github.com/fatedier/frp/pkg/msg"
"github.com/fatedier/frp/pkg/nathole"
"github.com/fatedier/frp/pkg/transport"
utilnet "github.com/fatedier/frp/pkg/util/net"
frpNet "github.com/fatedier/frp/pkg/util/net"
"github.com/fatedier/frp/pkg/util/util"
"github.com/fatedier/frp/pkg/util/xlog"
)
@@ -59,15 +59,12 @@ func (sv *XTCPVisitor) Run() (err error) {
sv.session = NewQUICTunnelSession(&sv.clientCfg)
}
if sv.cfg.BindPort > 0 {
sv.l, err = net.Listen("tcp", net.JoinHostPort(sv.cfg.BindAddr, strconv.Itoa(sv.cfg.BindPort)))
if err != nil {
return
}
go sv.worker()
sv.l, err = net.Listen("tcp", net.JoinHostPort(sv.cfg.BindAddr, strconv.Itoa(sv.cfg.BindPort)))
if err != nil {
return
}
go sv.internalConnWorker()
go sv.worker()
go sv.processTunnelStartEvents()
if sv.cfg.KeepTunnelOpen {
sv.retryLimiter = rate.NewLimiter(rate.Every(time.Hour/time.Duration(sv.cfg.MaxRetriesAnHour)), sv.cfg.MaxRetriesAnHour)
@@ -77,12 +74,8 @@ func (sv *XTCPVisitor) Run() (err error) {
}
func (sv *XTCPVisitor) Close() {
sv.mu.Lock()
defer sv.mu.Unlock()
sv.BaseVisitor.Close()
if sv.cancel != nil {
sv.cancel()
}
sv.l.Close()
sv.cancel()
if sv.session != nil {
sv.session.Close()
}
@@ -96,18 +89,7 @@ func (sv *XTCPVisitor) worker() {
xl.Warn("xtcp local listener closed")
return
}
go sv.handleConn(conn)
}
}
func (sv *XTCPVisitor) internalConnWorker() {
xl := xlog.FromContextSafe(sv.ctx)
for {
conn, err := sv.internalLn.Accept()
if err != nil {
xl.Warn("xtcp internal listener closed")
return
}
go sv.handleConn(conn)
}
}
@@ -157,53 +139,31 @@ func (sv *XTCPVisitor) keepTunnelOpenWorker() {
func (sv *XTCPVisitor) handleConn(userConn net.Conn) {
xl := xlog.FromContextSafe(sv.ctx)
isConnTrasfered := false
defer func() {
if !isConnTrasfered {
userConn.Close()
}
}()
defer userConn.Close()
xl.Debug("get a new xtcp user connection")
// Open a tunnel connection to the server. If there is already a successful hole-punching connection,
// it will be reused. Otherwise, it will block and wait for a successful hole-punching connection until timeout.
ctx := context.Background()
if sv.cfg.FallbackTo != "" {
timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(sv.cfg.FallbackTimeoutMs)*time.Millisecond)
defer cancel()
ctx = timeoutCtx
}
tunnelConn, err := sv.openTunnel(ctx)
tunnelConn, err := sv.openTunnel()
if err != nil {
xl.Error("open tunnel error: %v", err)
// no fallback, just return
if sv.cfg.FallbackTo == "" {
return
}
xl.Debug("try to transfer connection to visitor: %s", sv.cfg.FallbackTo)
if err := sv.helper.TransferConn(sv.cfg.FallbackTo, userConn); err != nil {
xl.Error("transfer connection to visitor %s error: %v", sv.cfg.FallbackTo, err)
return
}
isConnTrasfered = true
return
}
var muxConnRWCloser io.ReadWriteCloser = tunnelConn
if sv.cfg.UseEncryption {
muxConnRWCloser, err = libio.WithEncryption(muxConnRWCloser, []byte(sv.cfg.Sk))
muxConnRWCloser, err = frpIo.WithEncryption(muxConnRWCloser, []byte(sv.cfg.Sk))
if err != nil {
xl.Error("create encryption stream error: %v", err)
return
}
}
if sv.cfg.UseCompression {
muxConnRWCloser = libio.WithCompression(muxConnRWCloser)
muxConnRWCloser = frpIo.WithCompression(muxConnRWCloser)
}
_, _, errs := libio.Join(userConn, muxConnRWCloser)
_, _, errs := frpIo.Join(userConn, muxConnRWCloser)
xl.Debug("join connections closed")
if len(errs) > 0 {
xl.Trace("join connections errors: %v", errs)
@@ -211,7 +171,7 @@ func (sv *XTCPVisitor) handleConn(userConn net.Conn) {
}
// openTunnel will open a tunnel connection to the target server.
func (sv *XTCPVisitor) openTunnel(ctx context.Context) (conn net.Conn, err error) {
func (sv *XTCPVisitor) openTunnel() (conn net.Conn, err error) {
xl := xlog.FromContextSafe(sv.ctx)
ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop()
@@ -225,8 +185,6 @@ func (sv *XTCPVisitor) openTunnel(ctx context.Context) (conn net.Conn, err error
select {
case <-sv.ctx.Done():
return nil, sv.ctx.Err()
case <-ctx.Done():
return nil, ctx.Err()
case <-immediateTrigger:
conn, err = sv.getTunnelConn()
case <-ticker.C:
@@ -266,7 +224,7 @@ func (sv *XTCPVisitor) getTunnelConn() (net.Conn, error) {
// 4. Create a tunnel session using an underlying UDP connection.
func (sv *XTCPVisitor) makeNatHole() {
xl := xlog.FromContextSafe(sv.ctx)
if err := nathole.PreCheck(sv.ctx, sv.helper.MsgTransporter(), sv.cfg.ServerName, 5*time.Second); err != nil {
if err := nathole.PreCheck(sv.ctx, sv.msgTransporter, sv.cfg.ServerName, 5*time.Second); err != nil {
xl.Warn("nathole precheck error: %v", err)
return
}
@@ -294,7 +252,7 @@ func (sv *XTCPVisitor) makeNatHole() {
AssistedAddrs: prepareResult.AssistedAddrs,
}
natHoleRespMsg, err := nathole.ExchangeInfo(sv.ctx, sv.helper.MsgTransporter(), transactionID, natHoleVisitorMsg, 5*time.Second)
natHoleRespMsg, err := nathole.ExchangeInfo(sv.ctx, sv.msgTransporter, transactionID, natHoleVisitorMsg, 5*time.Second)
if err != nil {
listenConn.Close()
xl.Warn("nathole exchange info error: %v", err)
@@ -344,14 +302,14 @@ func (ks *KCPTunnelSession) Init(listenConn *net.UDPConn, raddr *net.UDPAddr) er
if err != nil {
return fmt.Errorf("dial udp error: %v", err)
}
remote, err := utilnet.NewKCPConnFromUDP(lConn, true, raddr.String())
remote, err := frpNet.NewKCPConnFromUDP(lConn, true, raddr.String())
if err != nil {
return fmt.Errorf("create kcp connection from udp connection error: %v", err)
}
fmuxCfg := fmux.DefaultConfig()
fmuxCfg.KeepAliveInterval = 10 * time.Second
fmuxCfg.MaxStreamWindowSize = 6 * 1024 * 1024
fmuxCfg.MaxStreamWindowSize = 2 * 1024 * 1024
fmuxCfg.LogOutput = io.Discard
session, err := fmux.Client(remote, fmuxCfg)
if err != nil {
@@ -435,7 +393,7 @@ func (qs *QUICTunnelSession) OpenConn(ctx context.Context) (net.Conn, error) {
if err != nil {
return nil, err
}
return utilnet.QuicStreamToNetConn(stream, session), nil
return frpNet.QuicStreamToNetConn(stream, session), nil
}
func (qs *QUICTunnelSession) Close() {

View File

@@ -79,7 +79,7 @@ var httpCmd = &cobra.Command{
}
cfg.BandwidthLimitMode = bandwidthLimitMode
err = cfg.ValidateForClient()
err = cfg.CheckForCli()
if err != nil {
fmt.Println(err)
os.Exit(1)

View File

@@ -71,7 +71,7 @@ var httpsCmd = &cobra.Command{
}
cfg.BandwidthLimitMode = bandwidthLimitMode
err = cfg.ValidateForClient()
err = cfg.CheckForCli()
if err != nil {
fmt.Println(err)
os.Exit(1)

View File

@@ -117,6 +117,7 @@ var rootCmd = &cobra.Command{
// Do not show command usage here.
err := runClient(cfgFile)
if err != nil {
fmt.Println(err)
os.Exit(1)
}
return nil
@@ -198,7 +199,6 @@ func parseClientCommonCfgFromCmd() (cfg config.ClientCommonConf, err error) {
func runClient(cfgFilePath string) error {
cfg, pxyCfgs, visitorCfgs, err := config.ParseClientConfig(cfgFilePath)
if err != nil {
fmt.Println(err)
return err
}
return startService(cfg, pxyCfgs, visitorCfgs, cfgFilePath)
@@ -214,8 +214,8 @@ func startService(
cfg.LogMaxDays, cfg.DisableLogColor)
if cfgFile != "" {
log.Info("start frpc service for config file [%s]", cfgFile)
defer log.Info("frpc service for config file [%s] stopped", cfgFile)
log.Trace("start frpc service for config file [%s]", cfgFile)
defer log.Trace("frpc service for config file [%s] stopped", cfgFile)
}
svr, errRet := client.NewService(cfg, pxyCfgs, visitorCfgs, cfgFile)
if errRet != nil {

View File

@@ -78,7 +78,7 @@ var stcpCmd = &cobra.Command{
os.Exit(1)
}
cfg.BandwidthLimitMode = bandwidthLimitMode
err = cfg.ValidateForClient()
err = cfg.CheckForCli()
if err != nil {
fmt.Println(err)
os.Exit(1)
@@ -95,7 +95,7 @@ var stcpCmd = &cobra.Command{
cfg.ServerName = serverName
cfg.BindAddr = bindAddr
cfg.BindPort = bindPort
err = cfg.Validate()
err = cfg.Check()
if err != nil {
fmt.Println(err)
os.Exit(1)

View File

@@ -78,7 +78,7 @@ var sudpCmd = &cobra.Command{
os.Exit(1)
}
cfg.BandwidthLimitMode = bandwidthLimitMode
err = cfg.ValidateForClient()
err = cfg.CheckForCli()
if err != nil {
fmt.Println(err)
os.Exit(1)
@@ -95,7 +95,7 @@ var sudpCmd = &cobra.Command{
cfg.ServerName = serverName
cfg.BindAddr = bindAddr
cfg.BindPort = bindPort
err = cfg.Validate()
err = cfg.Check()
if err != nil {
fmt.Println(err)
os.Exit(1)

View File

@@ -68,7 +68,7 @@ var tcpCmd = &cobra.Command{
}
cfg.BandwidthLimitMode = bandwidthLimitMode
err = cfg.ValidateForClient()
err = cfg.CheckForCli()
if err != nil {
fmt.Println(err)
os.Exit(1)

View File

@@ -73,7 +73,7 @@ var tcpMuxCmd = &cobra.Command{
}
cfg.BandwidthLimitMode = bandwidthLimitMode
err = cfg.ValidateForClient()
err = cfg.CheckForCli()
if err != nil {
fmt.Println(err)
os.Exit(1)

View File

@@ -68,7 +68,7 @@ var udpCmd = &cobra.Command{
}
cfg.BandwidthLimitMode = bandwidthLimitMode
err = cfg.ValidateForClient()
err = cfg.CheckForCli()
if err != nil {
fmt.Println(err)
os.Exit(1)

View File

@@ -78,7 +78,7 @@ var xtcpCmd = &cobra.Command{
os.Exit(1)
}
cfg.BandwidthLimitMode = bandwidthLimitMode
err = cfg.ValidateForClient()
err = cfg.CheckForCli()
if err != nil {
fmt.Println(err)
os.Exit(1)
@@ -95,7 +95,7 @@ var xtcpCmd = &cobra.Command{
cfg.ServerName = serverName
cfg.BindAddr = bindAddr
cfg.BindPort = bindPort
err = cfg.Validate()
err = cfg.Check()
if err != nil {
fmt.Println(err)
os.Exit(1)

View File

@@ -326,9 +326,6 @@ local_ip = 127.0.0.1
local_port = 22
use_encryption = false
use_compression = false
# If not empty, only visitors from specified users can connect.
# Otherwise, visitors from same user can connect. '*' means allow all users.
allow_users = *
# user of frpc should be same in both stcp server and stcp visitor
[secret_tcp_visitor]
@@ -340,8 +337,6 @@ server_name = secret_tcp
sk = abcdefg
# connect this address to visitor stcp server
bind_addr = 127.0.0.1
# bind_port can be less than 0, it means don't bind to the port and only receive connections redirected from
# other visitors. (This is not supported for SUDP now)
bind_port = 9000
use_encryption = false
use_compression = false
@@ -353,20 +348,13 @@ local_ip = 127.0.0.1
local_port = 22
use_encryption = false
use_compression = false
# If not empty, only visitors from specified users can connect.
# Otherwise, visitors from same user can connect. '*' means allow all users.
allow_users = user1, user2
[p2p_tcp_visitor]
role = visitor
type = xtcp
# if the server user is not set, it defaults to the current user
server_user = user1
server_name = p2p_tcp
sk = abcdefg
bind_addr = 127.0.0.1
# bind_port can be less than 0, it means don't bind to the port and only receive connections redirected from
# other visitors. (This is not supported for SUDP now)
bind_port = 9001
use_encryption = false
use_compression = false
@@ -375,8 +363,6 @@ keep_tunnel_open = false
# effective when keep_tunnel_open is set to true, the number of attempts to punch through per hour
max_retries_an_hour = 8
min_retry_interval = 90
# fallback_to = stcp_visitor
# fallback_timeout_ms = 500
[tcpmuxhttpconnect]
type = tcpmux

View File

@@ -1,63 +0,0 @@
#!/bin/sh
OS="$(go env GOOS)"
ARCH="$(go env GOARCH)"
if [ "${TARGET_OS}" ]; then
OS="${TARGET_OS}"
fi
if [ "${TARGET_ARCH}" ]; then
ARCH="${TARGET_ARCH}"
fi
# Determine the latest version by version number ignoring alpha, beta, and rc versions.
if [ "${FRP_VERSION}" = "" ] ; then
FRP_VERSION="$(curl -sL https://github.com/fatedier/frp/releases | \
grep -o 'releases/tag/v[0-9]*.[0-9]*.[0-9]*"' | sort -V | \
tail -1 | awk -F'/' '{ print $3}')"
FRP_VERSION="${FRP_VERSION%?}"
FRP_VERSION="${FRP_VERSION#?}"
fi
if [ "${FRP_VERSION}" = "" ] ; then
printf "Unable to get latest frp version. Set FRP_VERSION env var and re-run. For example: export FRP_VERSION=1.0.0"
exit 1;
fi
SUFFIX=".tar.gz"
if [ "${OS}" = "windows" ] ; then
SUFFIX=".zip"
fi
NAME="frp_${FRP_VERSION}_${OS}_${ARCH}${SUFFIX}"
DIR_NAME="frp_${FRP_VERSION}_${OS}_${ARCH}"
URL="https://github.com/fatedier/frp/releases/download/v${FRP_VERSION}/${NAME}"
download_and_extract() {
printf "Downloading %s from %s ...\n" "$NAME" "${URL}"
if ! curl -o /dev/null -sIf "${URL}"; then
printf "\n%s is not found, please specify a valid FRP_VERSION\n" "${URL}"
exit 1
fi
curl -fsLO "${URL}"
filename=$NAME
if [ "${OS}" = "windows" ]; then
unzip "${filename}"
else
tar -xzf "${filename}"
fi
rm "${filename}"
if [ "${TARGET_DIRNAME}" ]; then
mv "${DIR_NAME}" "${TARGET_DIRNAME}"
DIR_NAME="${TARGET_DIRNAME}"
fi
}
download_and_extract
printf ""
printf "\nfrp %s Download Complete!\n" "$FRP_VERSION"
printf "\n"
printf "frp has been successfully downloaded into the %s folder on your system.\n" "$DIR_NAME"
printf "\n"

View File

@@ -1,30 +1,20 @@
#!/bin/sh
#!/usr/bin/env bash
SCRIPT=$(readlink -f "$0")
ROOT=$(unset CDPATH && cd "$(dirname "$SCRIPT")/.." && pwd)
ROOT=$(unset CDPATH && cd $(dirname "${BASH_SOURCE[0]}")/.. && pwd)
ginkgo_command=$(which ginkgo 2>/dev/null)
if [ -z "$ginkgo_command" ]; then
which ginkgo &> /dev/null
if [ $? -ne 0 ]; then
echo "ginkgo not found, try to install..."
go install github.com/onsi/ginkgo/v2/ginkgo@v2.8.3
fi
debug=false
if [ "x${DEBUG}" = "xtrue" ]; then
if [ x${DEBUG} == x"true" ]; then
debug=true
fi
logLevel=debug
if [ "${LOG_LEVEL}" ]; then
logLevel="${LOG_LEVEL}"
if [ x${LOG_LEVEL} != x"" ]; then
logLevel=${LOG_LEVEL}
fi
frpcPath=${ROOT}/bin/frpc
if [ "${FRPC_PATH}" ]; then
frpcPath="${FRPC_PATH}"
fi
frpsPath=${ROOT}/bin/frps
if [ "${FRPS_PATH}" ]; then
frpsPath="${FRPS_PATH}"
fi
ginkgo -nodes=8 --poll-progress-after=60s ${ROOT}/test/e2e -- -frpc-path=${frpcPath} -frps-path=${frpsPath} -log-level=${logLevel} -debug=${debug}
ginkgo -nodes=8 --poll-progress-after=30s ${ROOT}/test/e2e -- -frpc-path=${ROOT}/bin/frpc -frps-path=${ROOT}/bin/frps -log-level=${logLevel} -debug=${debug}

View File

@@ -352,7 +352,7 @@ func LoadAllProxyConfsFromIni(
case "visitor":
newConf, newErr := NewVisitorConfFromIni(prefix, name, section)
if newErr != nil {
return nil, nil, fmt.Errorf("failed to parse visitor %s, err: %v", name, newErr)
return nil, nil, newErr
}
visitorConfs[prefix+name] = newConf
default:

View File

@@ -500,10 +500,8 @@ func Test_LoadClientBasicConf(t *testing.T) {
},
BandwidthLimitMode: BandwidthLimitModeClient,
},
RoleServerCommonConf: RoleServerCommonConf{
Role: "server",
Sk: "abcdefg",
},
Role: "server",
Sk: "abcdefg",
},
testUser + ".p2p_tcp": &XTCPProxyConf{
BaseProxyConf: BaseProxyConf{
@@ -515,10 +513,8 @@ func Test_LoadClientBasicConf(t *testing.T) {
},
BandwidthLimitMode: BandwidthLimitModeClient,
},
RoleServerCommonConf: RoleServerCommonConf{
Role: "server",
Sk: "abcdefg",
},
Role: "server",
Sk: "abcdefg",
},
testUser + ".tcpmuxhttpconnect": &TCPMuxProxyConf{
BaseProxyConf: BaseProxyConf{
@@ -665,10 +661,9 @@ func Test_LoadClientBasicConf(t *testing.T) {
BindAddr: "127.0.0.1",
BindPort: 9001,
},
Protocol: "quic",
MaxRetriesAnHour: 8,
MinRetryInterval: 90,
FallbackTimeoutMs: 1000,
Protocol: "quic",
MaxRetriesAnHour: 8,
MinRetryInterval: 90,
},
}

File diff suppressed because it is too large Load Diff

View File

@@ -254,10 +254,8 @@ func Test_Proxy_UnmarshalFromIni(t *testing.T) {
},
BandwidthLimitMode: BandwidthLimitModeClient,
},
RoleServerCommonConf: RoleServerCommonConf{
Role: "server",
Sk: "abcdefg",
},
Role: "server",
Sk: "abcdefg",
},
},
{
@@ -281,10 +279,8 @@ func Test_Proxy_UnmarshalFromIni(t *testing.T) {
},
BandwidthLimitMode: BandwidthLimitModeClient,
},
RoleServerCommonConf: RoleServerCommonConf{
Role: "server",
Sk: "abcdefg",
},
Role: "server",
Sk: "abcdefg",
},
},
{

View File

@@ -36,6 +36,7 @@ func Test_LoadServerCommonConf(t *testing.T) {
[common]
bind_addr = 0.0.0.9
bind_port = 7009
bind_udp_port = 7008
kcp_bind_port = 7007
proxy_bind_addr = 127.0.0.9
vhost_http_port = 89
@@ -169,6 +170,7 @@ func Test_LoadServerCommonConf(t *testing.T) {
[common]
bind_addr = 0.0.0.9
bind_port = 7009
bind_udp_port = 7008
`),
expected: ServerCommonConf{
ServerConfig: auth.ServerConfig{

View File

@@ -34,12 +34,10 @@ var (
)
type VisitorConf interface {
// GetBaseConfig returns the base config of visitor.
GetBaseConfig() *BaseVisitorConf
// UnmarshalFromIni unmarshals config from ini.
GetBaseInfo() *BaseVisitorConf
Compare(cmp VisitorConf) bool
UnmarshalFromIni(prefix string, name string, section *ini.Section) error
// Validate validates config.
Validate() error
Check() error
}
type BaseVisitorConf struct {
@@ -49,14 +47,9 @@ type BaseVisitorConf struct {
UseCompression bool `ini:"use_compression" json:"use_compression"`
Role string `ini:"role" json:"role"`
Sk string `ini:"sk" json:"sk"`
// if the server user is not set, it defaults to the current user
ServerUser string `ini:"server_user" json:"server_user"`
ServerName string `ini:"server_name" json:"server_name"`
BindAddr string `ini:"bind_addr" json:"bind_addr"`
// BindPort is the port that visitor listens on.
// It can be less than 0, it means don't bind to the port and only receive connections redirected from
// other visitors. (This is not supported for SUDP now)
BindPort int `ini:"bind_port" json:"bind_port"`
ServerName string `ini:"server_name" json:"server_name"`
BindAddr string `ini:"bind_addr" json:"bind_addr"`
BindPort int `ini:"bind_port" json:"bind_port"`
}
type SUDPVisitorConf struct {
@@ -70,12 +63,10 @@ type STCPVisitorConf struct {
type XTCPVisitorConf struct {
BaseVisitorConf `ini:",extends"`
Protocol string `ini:"protocol" json:"protocol,omitempty"`
KeepTunnelOpen bool `ini:"keep_tunnel_open" json:"keep_tunnel_open,omitempty"`
MaxRetriesAnHour int `ini:"max_retries_an_hour" json:"max_retries_an_hour,omitempty"`
MinRetryInterval int `ini:"min_retry_interval" json:"min_retry_interval,omitempty"`
FallbackTo string `ini:"fallback_to" json:"fallback_to,omitempty"`
FallbackTimeoutMs int `ini:"fallback_timeout_ms" json:"fallback_timeout_ms,omitempty"`
Protocol string `ini:"protocol" json:"protocol,omitempty"`
KeepTunnelOpen bool `ini:"keep_tunnel_open" json:"keep_tunnel_open,omitempty"`
MaxRetriesAnHour int `ini:"max_retries_an_hour" json:"max_retries_an_hour,omitempty"`
MinRetryInterval int `ini:"min_retry_interval" json:"min_retry_interval,omitempty"`
}
// DefaultVisitorConf creates a empty VisitorConf object by visitorType.
@@ -85,6 +76,7 @@ func DefaultVisitorConf(visitorType string) VisitorConf {
if !ok {
return nil
}
return reflect.New(v).Interface().(VisitorConf)
}
@@ -94,19 +86,19 @@ func NewVisitorConfFromIni(prefix string, name string, section *ini.Section) (Vi
visitorType := section.Key("type").String()
if visitorType == "" {
return nil, fmt.Errorf("type shouldn't be empty")
return nil, fmt.Errorf("visitor [%s] type shouldn't be empty", name)
}
conf := DefaultVisitorConf(visitorType)
if conf == nil {
return nil, fmt.Errorf("type [%s] error", visitorType)
return nil, fmt.Errorf("visitor [%s] type [%s] error", name, visitorType)
}
if err := conf.UnmarshalFromIni(prefix, name, section); err != nil {
return nil, fmt.Errorf("type [%s] error", visitorType)
return nil, fmt.Errorf("visitor [%s] type [%s] error", name, visitorType)
}
if err := conf.Validate(); err != nil {
if err := conf.Check(); err != nil {
return nil, err
}
@@ -114,11 +106,26 @@ func NewVisitorConfFromIni(prefix string, name string, section *ini.Section) (Vi
}
// Base
func (cfg *BaseVisitorConf) GetBaseConfig() *BaseVisitorConf {
func (cfg *BaseVisitorConf) GetBaseInfo() *BaseVisitorConf {
return cfg
}
func (cfg *BaseVisitorConf) validate() (err error) {
func (cfg *BaseVisitorConf) compare(cmp *BaseVisitorConf) bool {
if cfg.ProxyName != cmp.ProxyName ||
cfg.ProxyType != cmp.ProxyType ||
cfg.UseEncryption != cmp.UseEncryption ||
cfg.UseCompression != cmp.UseCompression ||
cfg.Role != cmp.Role ||
cfg.Sk != cmp.Sk ||
cfg.ServerName != cmp.ServerName ||
cfg.BindAddr != cmp.BindAddr ||
cfg.BindPort != cmp.BindPort {
return false
}
return true
}
func (cfg *BaseVisitorConf) check() (err error) {
if cfg.Role != "visitor" {
err = fmt.Errorf("invalid role")
return
@@ -127,9 +134,7 @@ func (cfg *BaseVisitorConf) validate() (err error) {
err = fmt.Errorf("bind_addr shouldn't be empty")
return
}
// BindPort can be less than 0, it means don't bind to the port and only receive connections redirected from
// other visitors
if cfg.BindPort == 0 {
if cfg.BindPort <= 0 {
err = fmt.Errorf("bind_port is required")
return
}
@@ -144,16 +149,13 @@ func (cfg *BaseVisitorConf) unmarshalFromIni(prefix string, name string, section
cfg.ProxyName = prefix + name
// server_name
if cfg.ServerUser == "" {
cfg.ServerName = prefix + cfg.ServerName
} else {
cfg.ServerName = cfg.ServerUser + "." + cfg.ServerName
}
cfg.ServerName = prefix + cfg.ServerName
// bind_addr
if cfg.BindAddr == "" {
cfg.BindAddr = "127.0.0.1"
}
return nil
}
@@ -163,16 +165,32 @@ func preVisitorUnmarshalFromIni(cfg VisitorConf, prefix string, name string, sec
return err
}
err = cfg.GetBaseConfig().unmarshalFromIni(prefix, name, section)
err = cfg.GetBaseInfo().unmarshalFromIni(prefix, name, section)
if err != nil {
return err
}
return nil
}
// SUDP
var _ VisitorConf = &SUDPVisitorConf{}
func (cfg *SUDPVisitorConf) Compare(cmp VisitorConf) bool {
cmpConf, ok := cmp.(*SUDPVisitorConf)
if !ok {
return false
}
if !cfg.BaseVisitorConf.compare(&cmpConf.BaseVisitorConf) {
return false
}
// Add custom login equal, if exists
return true
}
func (cfg *SUDPVisitorConf) UnmarshalFromIni(prefix string, name string, section *ini.Section) (err error) {
err = preVisitorUnmarshalFromIni(cfg, prefix, name, section)
if err != nil {
@@ -184,8 +202,8 @@ func (cfg *SUDPVisitorConf) UnmarshalFromIni(prefix string, name string, section
return
}
func (cfg *SUDPVisitorConf) Validate() (err error) {
if err = cfg.BaseVisitorConf.validate(); err != nil {
func (cfg *SUDPVisitorConf) Check() (err error) {
if err = cfg.BaseVisitorConf.check(); err != nil {
return
}
@@ -197,6 +215,21 @@ func (cfg *SUDPVisitorConf) Validate() (err error) {
// STCP
var _ VisitorConf = &STCPVisitorConf{}
func (cfg *STCPVisitorConf) Compare(cmp VisitorConf) bool {
cmpConf, ok := cmp.(*STCPVisitorConf)
if !ok {
return false
}
if !cfg.BaseVisitorConf.compare(&cmpConf.BaseVisitorConf) {
return false
}
// Add custom login equal, if exists
return true
}
func (cfg *STCPVisitorConf) UnmarshalFromIni(prefix string, name string, section *ini.Section) (err error) {
err = preVisitorUnmarshalFromIni(cfg, prefix, name, section)
if err != nil {
@@ -208,8 +241,8 @@ func (cfg *STCPVisitorConf) UnmarshalFromIni(prefix string, name string, section
return
}
func (cfg *STCPVisitorConf) Validate() (err error) {
if err = cfg.BaseVisitorConf.validate(); err != nil {
func (cfg *STCPVisitorConf) Check() (err error) {
if err = cfg.BaseVisitorConf.check(); err != nil {
return
}
@@ -221,6 +254,26 @@ func (cfg *STCPVisitorConf) Validate() (err error) {
// XTCP
var _ VisitorConf = &XTCPVisitorConf{}
func (cfg *XTCPVisitorConf) Compare(cmp VisitorConf) bool {
cmpConf, ok := cmp.(*XTCPVisitorConf)
if !ok {
return false
}
if !cfg.BaseVisitorConf.compare(&cmpConf.BaseVisitorConf) {
return false
}
// Add custom login equal, if exists
if cfg.Protocol != cmpConf.Protocol ||
cfg.KeepTunnelOpen != cmpConf.KeepTunnelOpen ||
cfg.MaxRetriesAnHour != cmpConf.MaxRetriesAnHour ||
cfg.MinRetryInterval != cmpConf.MinRetryInterval {
return false
}
return true
}
func (cfg *XTCPVisitorConf) UnmarshalFromIni(prefix string, name string, section *ini.Section) (err error) {
err = preVisitorUnmarshalFromIni(cfg, prefix, name, section)
if err != nil {
@@ -237,14 +290,11 @@ func (cfg *XTCPVisitorConf) UnmarshalFromIni(prefix string, name string, section
if cfg.MinRetryInterval <= 0 {
cfg.MinRetryInterval = 90
}
if cfg.FallbackTimeoutMs <= 0 {
cfg.FallbackTimeoutMs = 1000
}
return
}
func (cfg *XTCPVisitorConf) Validate() (err error) {
if err = cfg.BaseVisitorConf.validate(); err != nil {
func (cfg *XTCPVisitorConf) Check() (err error) {
if err = cfg.BaseVisitorConf.check(); err != nil {
return
}

View File

@@ -87,10 +87,9 @@ func Test_Visitor_UnmarshalFromIni(t *testing.T) {
BindAddr: "127.0.0.1",
BindPort: 9001,
},
Protocol: "quic",
MaxRetriesAnHour: 8,
MinRetryInterval: 90,
FallbackTimeoutMs: 1000,
Protocol: "quic",
MaxRetriesAnHour: 8,
MinRetryInterval: 90,
},
},
}

View File

@@ -110,9 +110,8 @@ type NewProxy struct {
Headers map[string]string `json:"headers,omitempty"`
RouteByHTTPUser string `json:"route_by_http_user,omitempty"`
// stcp, sudp, xtcp
Sk string `json:"sk,omitempty"`
AllowUsers []string `json:"allow_users,omitempty"`
// stcp
Sk string `json:"sk,omitempty"`
// tcpmux
Multiplexer string `json:"multiplexer,omitempty"`
@@ -146,7 +145,6 @@ type StartWorkConn struct {
}
type NewVisitorConn struct {
RunID string `json:"run_id,omitempty"`
ProxyName string `json:"proxy_name,omitempty"`
SignKey string `json:"sign_key,omitempty"`
Timestamp int64 `json:"timestamp,omitempty"`

View File

@@ -63,20 +63,20 @@ var (
}
// mode 2, HardNAT is receiver, EasyNAT is sender
// sender, portsRandomNumber 1000, sendDelayMs 3000 | receiver, listen 256 ports, ttl 7
// sender, portsRandomNumber 1000, sendDelayMs 3000 | receiver, listen 256 ports, ttl 4
// sender, portsRandomNumber 1000, sendDelayMs 3000 | receiver, listen 256 ports
// sender, portsRandomNumber 1000, sendDelayMs 2000 | receiver, listen 256 ports, ttl 7
// sender, portsRandomNumber 1000, sendDelayMs 2000 | receiver, listen 256 ports, ttl 4
// sender, portsRandomNumber 1000, sendDelayMs 2000 | receiver, listen 256 ports
mode2Behaviors = []lo.Tuple2[RecommandBehavior, RecommandBehavior]{
lo.T2(
RecommandBehavior{Role: DetectRoleSender, PortsRandomNumber: 1000, SendDelayMs: 3000},
RecommandBehavior{Role: DetectRoleSender, PortsRandomNumber: 1000, SendDelayMs: 2000},
RecommandBehavior{Role: DetectRoleReceiver, ListenRandomPorts: 256, TTL: 7},
),
lo.T2(
RecommandBehavior{Role: DetectRoleSender, PortsRandomNumber: 1000, SendDelayMs: 3000},
RecommandBehavior{Role: DetectRoleSender, PortsRandomNumber: 1000, SendDelayMs: 2000},
RecommandBehavior{Role: DetectRoleReceiver, ListenRandomPorts: 256, TTL: 4},
),
lo.T2(
RecommandBehavior{Role: DetectRoleSender, PortsRandomNumber: 1000, SendDelayMs: 3000},
RecommandBehavior{Role: DetectRoleSender, PortsRandomNumber: 1000, SendDelayMs: 2000},
RecommandBehavior{Role: DetectRoleReceiver, ListenRandomPorts: 256},
),
}
@@ -98,21 +98,21 @@ var (
}
// mode 4, Regular ports changes are usually the sender.
// sender, portsRandomNumber 1000, sendDelayMs: 2000 | receiver, listen 256 ports, ttl 7, portsRangeNumber 2
// sender, portsRandomNumber 1000, sendDelayMs: 2000 | receiver, listen 256 ports, ttl 4, portsRangeNumber 2
// sender, portsRandomNumber 1000, SendDelayMs: 2000 | receiver, listen 256 ports, portsRangeNumber 2
// sender, portsRandomNumber 1000, sendDelayMs: 2000 | receiver, listen 256 ports, ttl 7, portsRangeNumber 10
// sender, portsRandomNumber 1000, sendDelayMs: 2000 | receiver, listen 256 ports, ttl 4, portsRangeNumber 10
// sender, portsRandomNumber 1000, SendDelayMs: 2000 | receiver, listen 256 ports, portsRangeNumber 10
mode4Behaviors = []lo.Tuple2[RecommandBehavior, RecommandBehavior]{
lo.T2(
RecommandBehavior{Role: DetectRoleSender, PortsRandomNumber: 1000, SendDelayMs: 3000},
RecommandBehavior{Role: DetectRoleReceiver, ListenRandomPorts: 256, TTL: 7, PortsRangeNumber: 2},
RecommandBehavior{Role: DetectRoleSender, PortsRandomNumber: 1000, SendDelayMs: 2000},
RecommandBehavior{Role: DetectRoleReceiver, ListenRandomPorts: 256, TTL: 7, PortsRangeNumber: 10},
),
lo.T2(
RecommandBehavior{Role: DetectRoleSender, PortsRandomNumber: 1000, SendDelayMs: 3000},
RecommandBehavior{Role: DetectRoleReceiver, ListenRandomPorts: 256, TTL: 4, PortsRangeNumber: 2},
RecommandBehavior{Role: DetectRoleSender, PortsRandomNumber: 1000, SendDelayMs: 2000},
RecommandBehavior{Role: DetectRoleReceiver, ListenRandomPorts: 256, TTL: 4, PortsRangeNumber: 10},
),
lo.T2(
RecommandBehavior{Role: DetectRoleSender, PortsRandomNumber: 1000, SendDelayMs: 3000},
RecommandBehavior{Role: DetectRoleReceiver, ListenRandomPorts: 256, PortsRangeNumber: 2},
RecommandBehavior{Role: DetectRoleSender, PortsRandomNumber: 1000, SendDelayMs: 2000},
RecommandBehavior{Role: DetectRoleReceiver, ListenRandomPorts: 256, PortsRangeNumber: 10},
),
}
)

View File

@@ -85,6 +85,11 @@ func ClassifyNATFeature(addresses []string, localIPs []string) (*NatFeature, err
}
}
natFeature.PortsDifference = portMax - portMin
if natFeature.PortsDifference <= 10 && natFeature.PortsDifference >= 1 {
natFeature.RegularPortsChange = true
}
switch {
case ipChanged && portChanged:
natFeature.NatType = HardNAT
@@ -99,12 +104,6 @@ func ClassifyNATFeature(addresses []string, localIPs []string) (*NatFeature, err
natFeature.NatType = EasyNAT
natFeature.Behavior = BehaviorNoChange
}
if natFeature.Behavior == BehaviorPortChanged {
natFeature.PortsDifference = portMax - portMin
if natFeature.PortsDifference <= 5 && natFeature.PortsDifference >= 1 {
natFeature.RegularPortsChange = true
}
}
return natFeature, nil
}

View File

@@ -43,10 +43,9 @@ func NewTransactionID() string {
}
type ClientCfg struct {
name string
sk string
allowUsers []string
sidCh chan string
name string
sk string
sidCh chan string
}
type Session struct {
@@ -121,12 +120,11 @@ func (c *Controller) CleanWorker(ctx context.Context) {
}
}
func (c *Controller) ListenClient(name string, sk string, allowUsers []string) chan string {
func (c *Controller) ListenClient(name string, sk string) chan string {
cfg := &ClientCfg{
name: name,
sk: sk,
allowUsers: allowUsers,
sidCh: make(chan string),
name: name,
sk: sk,
sidCh: make(chan string),
}
c.mu.Lock()
defer c.mu.Unlock()
@@ -146,18 +144,14 @@ func (c *Controller) GenSid() string {
return fmt.Sprintf("%d%s", t, id)
}
func (c *Controller) HandleVisitor(m *msg.NatHoleVisitor, transporter transport.MessageTransporter, visitorUser string) {
func (c *Controller) HandleVisitor(m *msg.NatHoleVisitor, transporter transport.MessageTransporter) {
if m.PreCheck {
cfg, ok := c.clientCfgs[m.ProxyName]
_, ok := c.clientCfgs[m.ProxyName]
if !ok {
_ = transporter.Send(c.GenNatHoleResponse(m.TransactionID, nil, fmt.Sprintf("xtcp server for [%s] doesn't exist", m.ProxyName)))
return
} else {
_ = transporter.Send(c.GenNatHoleResponse(m.TransactionID, nil, ""))
}
if !lo.Contains(cfg.allowUsers, visitorUser) && !lo.Contains(cfg.allowUsers, "*") {
_ = transporter.Send(c.GenNatHoleResponse(m.TransactionID, nil, fmt.Sprintf("xtcp visitor user [%s] not allowed for [%s]", visitorUser, m.ProxyName)))
return
}
_ = transporter.Send(c.GenNatHoleResponse(m.TransactionID, nil, ""))
return
}

View File

@@ -384,7 +384,7 @@ func sendSidMessageToRangePorts(
if err := sendFunc(conn, detectAddr); err != nil {
xl.Trace("send sid message from %s to %s error: %v", conn.LocalAddr(), detectAddr, err)
}
time.Sleep(2 * time.Millisecond)
time.Sleep(5 * time.Millisecond)
}
}
}

View File

@@ -23,7 +23,7 @@ import (
"net/http/httputil"
"strings"
utilnet "github.com/fatedier/frp/pkg/util/net"
frpNet "github.com/fatedier/frp/pkg/util/net"
)
const PluginHTTP2HTTPS = "http2https"
@@ -98,7 +98,7 @@ func NewHTTP2HTTPSPlugin(params map[string]string) (Plugin, error) {
}
func (p *HTTP2HTTPSPlugin) Handle(conn io.ReadWriteCloser, realConn net.Conn, extraBufToLocal []byte) {
wrapConn := utilnet.WrapReadWriteCloserToConn(conn, realConn)
wrapConn := frpNet.WrapReadWriteCloserToConn(conn, realConn)
_ = p.l.PutConn(wrapConn)
}

View File

@@ -23,10 +23,10 @@ import (
"strings"
"time"
libio "github.com/fatedier/golib/io"
libnet "github.com/fatedier/golib/net"
frpIo "github.com/fatedier/golib/io"
gnet "github.com/fatedier/golib/net"
utilnet "github.com/fatedier/frp/pkg/util/net"
frpNet "github.com/fatedier/frp/pkg/util/net"
"github.com/fatedier/frp/pkg/util/util"
)
@@ -69,9 +69,9 @@ func (hp *HTTPProxy) Name() string {
}
func (hp *HTTPProxy) Handle(conn io.ReadWriteCloser, realConn net.Conn, extraBufToLocal []byte) {
wrapConn := utilnet.WrapReadWriteCloserToConn(conn, realConn)
wrapConn := frpNet.WrapReadWriteCloserToConn(conn, realConn)
sc, rd := libnet.NewSharedConn(wrapConn)
sc, rd := gnet.NewSharedConn(wrapConn)
firstBytes := make([]byte, 7)
_, err := rd.Read(firstBytes)
if err != nil {
@@ -86,7 +86,7 @@ func (hp *HTTPProxy) Handle(conn io.ReadWriteCloser, realConn net.Conn, extraBuf
wrapConn.Close()
return
}
hp.handleConnectReq(request, libio.WrapReadWriteCloser(bufRd, wrapConn, wrapConn.Close))
hp.handleConnectReq(request, frpIo.WrapReadWriteCloser(bufRd, wrapConn, wrapConn.Close))
return
}
@@ -158,7 +158,7 @@ func (hp *HTTPProxy) ConnectHandler(rw http.ResponseWriter, req *http.Request) {
}
_, _ = client.Write([]byte("HTTP/1.1 200 OK\r\n\r\n"))
go libio.Join(remote, client)
go frpIo.Join(remote, client)
}
func (hp *HTTPProxy) Auth(req *http.Request) bool {
@@ -213,7 +213,7 @@ func (hp *HTTPProxy) handleConnectReq(req *http.Request, rwc io.ReadWriteCloser)
}
_, _ = rwc.Write([]byte("HTTP/1.1 200 OK\r\n\r\n"))
libio.Join(remote, rwc)
frpIo.Join(remote, rwc)
}
func copyHeaders(dst, src http.Header) {

View File

@@ -24,7 +24,7 @@ import (
"strings"
"github.com/fatedier/frp/pkg/transport"
utilnet "github.com/fatedier/frp/pkg/util/net"
frpNet "github.com/fatedier/frp/pkg/util/net"
)
const PluginHTTPS2HTTP = "https2http"
@@ -123,7 +123,7 @@ func (p *HTTPS2HTTPPlugin) genTLSConfig() (*tls.Config, error) {
}
func (p *HTTPS2HTTPPlugin) Handle(conn io.ReadWriteCloser, realConn net.Conn, extraBufToLocal []byte) {
wrapConn := utilnet.WrapReadWriteCloserToConn(conn, realConn)
wrapConn := frpNet.WrapReadWriteCloserToConn(conn, realConn)
_ = p.l.PutConn(wrapConn)
}

View File

@@ -24,7 +24,7 @@ import (
"strings"
"github.com/fatedier/frp/pkg/transport"
utilnet "github.com/fatedier/frp/pkg/util/net"
frpNet "github.com/fatedier/frp/pkg/util/net"
)
const PluginHTTPS2HTTPS = "https2https"
@@ -128,7 +128,7 @@ func (p *HTTPS2HTTPSPlugin) genTLSConfig() (*tls.Config, error) {
}
func (p *HTTPS2HTTPSPlugin) Handle(conn io.ReadWriteCloser, realConn net.Conn, extraBufToLocal []byte) {
wrapConn := utilnet.WrapReadWriteCloserToConn(conn, realConn)
wrapConn := frpNet.WrapReadWriteCloserToConn(conn, realConn)
_ = p.l.PutConn(wrapConn)
}

View File

@@ -21,7 +21,7 @@ import (
gosocks5 "github.com/armon/go-socks5"
utilnet "github.com/fatedier/frp/pkg/util/net"
frpNet "github.com/fatedier/frp/pkg/util/net"
)
const PluginSocks5 = "socks5"
@@ -52,7 +52,7 @@ func NewSocks5Plugin(params map[string]string) (p Plugin, err error) {
func (sp *Socks5Plugin) Handle(conn io.ReadWriteCloser, realConn net.Conn, extraBufToLocal []byte) {
defer conn.Close()
wrapConn := utilnet.WrapReadWriteCloserToConn(conn, realConn)
wrapConn := frpNet.WrapReadWriteCloserToConn(conn, realConn)
_ = sp.Server.ServeConn(wrapConn)
}

View File

@@ -22,7 +22,7 @@ import (
"github.com/gorilla/mux"
utilnet "github.com/fatedier/frp/pkg/util/net"
frpNet "github.com/fatedier/frp/pkg/util/net"
)
const PluginStaticFile = "static_file"
@@ -65,8 +65,8 @@ func NewStaticFilePlugin(params map[string]string) (Plugin, error) {
}
router := mux.NewRouter()
router.Use(utilnet.NewHTTPAuthMiddleware(httpUser, httpPasswd).SetAuthFailDelay(200 * time.Millisecond).Middleware)
router.PathPrefix(prefix).Handler(utilnet.MakeHTTPGzipHandler(http.StripPrefix(prefix, http.FileServer(http.Dir(localPath))))).Methods("GET")
router.Use(frpNet.NewHTTPAuthMiddleware(httpUser, httpPasswd).SetAuthFailDelay(200 * time.Millisecond).Middleware)
router.PathPrefix(prefix).Handler(frpNet.MakeHTTPGzipHandler(http.StripPrefix(prefix, http.FileServer(http.Dir(localPath))))).Methods("GET")
sp.s = &http.Server{
Handler: router,
}
@@ -77,7 +77,7 @@ func NewStaticFilePlugin(params map[string]string) (Plugin, error) {
}
func (sp *StaticFilePlugin) Handle(conn io.ReadWriteCloser, realConn net.Conn, extraBufToLocal []byte) {
wrapConn := utilnet.WrapReadWriteCloserToConn(conn, realConn)
wrapConn := frpNet.WrapReadWriteCloserToConn(conn, realConn)
_ = sp.l.PutConn(wrapConn)
}

View File

@@ -19,7 +19,7 @@ import (
"io"
"net"
libio "github.com/fatedier/golib/io"
frpIo "github.com/fatedier/golib/io"
)
const PluginUnixDomainSocket = "unix_domain_socket"
@@ -62,7 +62,7 @@ func (uds *UnixDomainSocketPlugin) Handle(conn io.ReadWriteCloser, realConn net.
}
}
libio.Join(localConn, conn)
frpIo.Join(localConn, conn)
}
func (uds *UnixDomainSocketPlugin) Name() string {

View File

@@ -22,21 +22,20 @@ import (
"github.com/fatedier/golib/errors"
)
// InternalListener is a listener that can be used to accept connections from
// other goroutines.
type InternalListener struct {
// Custom listener
type CustomListener struct {
acceptCh chan net.Conn
closed bool
mu sync.Mutex
}
func NewInternalListener() *InternalListener {
return &InternalListener{
acceptCh: make(chan net.Conn, 128),
func NewCustomListener() *CustomListener {
return &CustomListener{
acceptCh: make(chan net.Conn, 64),
}
}
func (l *InternalListener) Accept() (net.Conn, error) {
func (l *CustomListener) Accept() (net.Conn, error) {
conn, ok := <-l.acceptCh
if !ok {
return nil, fmt.Errorf("listener closed")
@@ -44,7 +43,7 @@ func (l *InternalListener) Accept() (net.Conn, error) {
return conn, nil
}
func (l *InternalListener) PutConn(conn net.Conn) error {
func (l *CustomListener) PutConn(conn net.Conn) error {
err := errors.PanicToError(func() {
select {
case l.acceptCh <- conn:
@@ -55,7 +54,7 @@ func (l *InternalListener) PutConn(conn net.Conn) error {
return err
}
func (l *InternalListener) Close() error {
func (l *CustomListener) Close() error {
l.mu.Lock()
defer l.mu.Unlock()
if !l.closed {
@@ -65,16 +64,6 @@ func (l *InternalListener) Close() error {
return nil
}
func (l *InternalListener) Addr() net.Addr {
return &InternalAddr{}
}
type InternalAddr struct{}
func (ia *InternalAddr) Network() string {
return "internal"
}
func (ia *InternalAddr) String() string {
return "internal"
func (l *CustomListener) Addr() net.Addr {
return (*net.TCPAddr)(nil)
}

View File

@@ -20,7 +20,7 @@ import (
"net"
"time"
libnet "github.com/fatedier/golib/net"
gnet "github.com/fatedier/golib/net"
)
var FRPTLSHeadByte = 0x17
@@ -28,7 +28,7 @@ var FRPTLSHeadByte = 0x17
func CheckAndEnableTLSServerConnWithTimeout(
c net.Conn, tlsConfig *tls.Config, tlsOnly bool, timeout time.Duration,
) (out net.Conn, isTLS bool, custom bool, err error) {
sc, r := libnet.NewSharedConnSize(c, 2)
sc, r := gnet.NewSharedConnSize(c, 2)
buf := make([]byte, 1)
var n int
_ = c.SetReadDeadline(time.Now().Add(timeout))

View File

@@ -22,7 +22,7 @@ import (
"net/http"
"time"
libnet "github.com/fatedier/golib/net"
gnet "github.com/fatedier/golib/net"
"github.com/fatedier/frp/pkg/util/util"
"github.com/fatedier/frp/pkg/util/vhost"
@@ -94,7 +94,7 @@ func (muxer *HTTPConnectTCPMuxer) auth(c net.Conn, username, password string, re
func (muxer *HTTPConnectTCPMuxer) getHostFromHTTPConnect(c net.Conn) (net.Conn, map[string]string, error) {
reqInfoMap := make(map[string]string, 0)
sc, rd := libnet.NewSharedConn(c)
sc, rd := gnet.NewSharedConn(c)
host, httpUser, httpPwd, err := muxer.readHTTPConnectRequest(rd)
if err != nil {

View File

@@ -28,7 +28,7 @@ import (
"strings"
"time"
libio "github.com/fatedier/golib/io"
frpIo "github.com/fatedier/golib/io"
"github.com/fatedier/golib/pool"
frpLog "github.com/fatedier/frp/pkg/util/log"
@@ -256,7 +256,7 @@ func (rp *HTTPReverseProxy) connectHandler(rw http.ResponseWriter, req *http.Req
return
}
_ = req.Write(remote)
go libio.Join(remote, client)
go frpIo.Join(remote, client)
}
func parseBasicAuth(auth string) (username, password string, ok bool) {

View File

@@ -20,7 +20,7 @@ import (
"net"
"time"
libnet "github.com/fatedier/golib/net"
gnet "github.com/fatedier/golib/net"
)
type HTTPSMuxer struct {
@@ -37,7 +37,7 @@ func NewHTTPSMuxer(listener net.Listener, timeout time.Duration) (*HTTPSMuxer, e
func GetHTTPSHostname(c net.Conn) (_ net.Conn, _ map[string]string, err error) {
reqInfoMap := make(map[string]string, 0)
sc, rd := libnet.NewSharedConn(c)
sc, rd := gnet.NewSharedConn(c)
clientHello, err := readClientHello(rd)
if err != nil {

View File

@@ -22,7 +22,7 @@ import (
"github.com/fatedier/golib/errors"
"github.com/fatedier/frp/pkg/util/log"
utilnet "github.com/fatedier/frp/pkg/util/net"
frpNet "github.com/fatedier/frp/pkg/util/net"
"github.com/fatedier/frp/pkg/util/xlog"
)
@@ -282,7 +282,7 @@ func (l *Listener) Accept() (net.Conn, error) {
xl.Debug("rewrite host to [%s] success", l.rewriteHost)
conn = sConn
}
return utilnet.NewContextConn(l.ctx, conn), nil
return frpNet.NewContextConn(l.ctx, conn), nil
}
func (l *Listener) Close() error {

View File

@@ -30,7 +30,7 @@ import (
"github.com/fatedier/frp/pkg/auth"
"github.com/fatedier/frp/pkg/config"
"github.com/fatedier/frp/pkg/consts"
pkgerr "github.com/fatedier/frp/pkg/errors"
frpErr "github.com/fatedier/frp/pkg/errors"
"github.com/fatedier/frp/pkg/msg"
plugin "github.com/fatedier/frp/pkg/plugin/server"
"github.com/fatedier/frp/pkg/transport"
@@ -268,7 +268,7 @@ func (ctl *Control) GetWorkConn() (workConn net.Conn, err error) {
select {
case workConn, ok = <-ctl.workConnCh:
if !ok {
err = pkgerr.ErrCtlClosed
err = frpErr.ErrCtlClosed
return
}
xl.Debug("get work connection from pool")
@@ -283,7 +283,7 @@ func (ctl *Control) GetWorkConn() (workConn net.Conn, err error) {
select {
case workConn, ok = <-ctl.workConnCh:
if !ok {
err = pkgerr.ErrCtlClosed
err = frpErr.ErrCtlClosed
xl.Warn("no work connections available, %v", err)
return
}
@@ -394,7 +394,7 @@ func (ctl *Control) stoper() {
for _, pxy := range ctl.proxies {
pxy.Close()
ctl.pxyManager.Del(pxy.GetName())
metrics.Server.CloseProxy(pxy.GetName(), pxy.GetConf().GetBaseConfig().ProxyType)
metrics.Server.CloseProxy(pxy.GetName(), pxy.GetConf().GetBaseInfo().ProxyType)
notifyContent := &plugin.CloseProxyContent{
User: plugin.UserInfo{
@@ -524,7 +524,7 @@ func (ctl *Control) manager() {
}
func (ctl *Control) HandleNatHoleVisitor(m *msg.NatHoleVisitor) {
ctl.rc.NatHoleController.HandleVisitor(m, ctl.msgTransporter, ctl.loginMsg.User)
ctl.rc.NatHoleController.HandleVisitor(m, ctl.msgTransporter)
}
func (ctl *Control) HandleNatHoleClient(m *msg.NatHoleClient) {
@@ -537,7 +537,7 @@ func (ctl *Control) HandleNatHoleReport(m *msg.NatHoleReport) {
func (ctl *Control) RegisterProxy(pxyMsg *msg.NewProxy) (remoteAddr string, err error) {
var pxyConf config.ProxyConf
// Load configures from NewProxy message and validate.
// Load configures from NewProxy message and check.
pxyConf, err = config.NewProxyConfFromMsg(pxyMsg, ctl.serverCfg)
if err != nil {
return
@@ -550,8 +550,8 @@ func (ctl *Control) RegisterProxy(pxyMsg *msg.NewProxy) (remoteAddr string, err
RunID: ctl.runID,
}
// NewProxy will return an interface Proxy.
// In fact, it creates different proxies based on the proxy type. We just call run() here.
// NewProxy will return a interface Proxy.
// In fact it create different proxies by different proxy type, we just call run() here.
pxy, err := proxy.NewProxy(ctl.ctx, userInfo, ctl.rc, ctl.poolCount, ctl.GetWorkConn, pxyConf, ctl.serverCfg, ctl.loginMsg)
if err != nil {
return remoteAddr, err
@@ -614,7 +614,7 @@ func (ctl *Control) CloseProxy(closeMsg *msg.CloseProxy) (err error) {
delete(ctl.proxies, closeMsg.ProxyName)
ctl.mu.Unlock()
metrics.Server.CloseProxy(pxy.GetName(), pxy.GetConf().GetBaseConfig().ProxyType)
metrics.Server.CloseProxy(pxy.GetName(), pxy.GetConf().GetBaseInfo().ProxyType)
notifyContent := &plugin.CloseProxyContent{
User: plugin.UserInfo{

View File

@@ -25,7 +25,7 @@ import (
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/fatedier/frp/assets"
utilnet "github.com/fatedier/frp/pkg/util/net"
frpNet "github.com/fatedier/frp/pkg/util/net"
)
var (
@@ -50,7 +50,7 @@ func (svr *Service) RunDashboardServer(address string) (err error) {
subRouter := router.NewRoute().Subrouter()
user, passwd := svr.cfg.DashboardUser, svr.cfg.DashboardPwd
subRouter.Use(utilnet.NewHTTPAuthMiddleware(user, passwd).SetAuthFailDelay(200 * time.Millisecond).Middleware)
subRouter.Use(frpNet.NewHTTPAuthMiddleware(user, passwd).SetAuthFailDelay(200 * time.Millisecond).Middleware)
// metrics
if svr.cfg.EnablePrometheus {
@@ -65,7 +65,7 @@ func (svr *Service) RunDashboardServer(address string) (err error) {
// view
subRouter.Handle("/favicon.ico", http.FileServer(assets.FileSystem)).Methods("GET")
subRouter.PathPrefix("/static/").Handler(utilnet.MakeHTTPGzipHandler(http.StripPrefix("/static/", http.FileServer(assets.FileSystem)))).Methods("GET")
subRouter.PathPrefix("/static/").Handler(frpNet.MakeHTTPGzipHandler(http.StripPrefix("/static/", http.FileServer(assets.FileSystem)))).Methods("GET")
subRouter.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
http.Redirect(w, r, "/static/", http.StatusMovedPermanently)

View File

@@ -19,12 +19,12 @@ import (
"net"
"strings"
libio "github.com/fatedier/golib/io"
frpIo "github.com/fatedier/golib/io"
"golang.org/x/time/rate"
"github.com/fatedier/frp/pkg/config"
"github.com/fatedier/frp/pkg/util/limit"
utilnet "github.com/fatedier/frp/pkg/util/net"
frpNet "github.com/fatedier/frp/pkg/util/net"
"github.com/fatedier/frp/pkg/util/util"
"github.com/fatedier/frp/pkg/util/vhost"
"github.com/fatedier/frp/server/metrics"
@@ -157,31 +157,31 @@ func (pxy *HTTPProxy) GetRealConn(remoteAddr string) (workConn net.Conn, err err
var rwc io.ReadWriteCloser = tmpConn
if pxy.cfg.UseEncryption {
rwc, err = libio.WithEncryption(rwc, []byte(pxy.serverCfg.Token))
rwc, err = frpIo.WithEncryption(rwc, []byte(pxy.serverCfg.Token))
if err != nil {
xl.Error("create encryption stream error: %v", err)
return
}
}
if pxy.cfg.UseCompression {
rwc = libio.WithCompression(rwc)
rwc = frpIo.WithCompression(rwc)
}
if pxy.GetLimiter() != nil {
rwc = libio.WrapReadWriteCloser(limit.NewReader(rwc, pxy.GetLimiter()), limit.NewWriter(rwc, pxy.GetLimiter()), func() error {
rwc = frpIo.WrapReadWriteCloser(limit.NewReader(rwc, pxy.GetLimiter()), limit.NewWriter(rwc, pxy.GetLimiter()), func() error {
return rwc.Close()
})
}
workConn = utilnet.WrapReadWriteCloserToConn(rwc, tmpConn)
workConn = utilnet.WrapStatsConn(workConn, pxy.updateStatsAfterClosedConn)
metrics.Server.OpenConnection(pxy.GetName(), pxy.GetConf().GetBaseConfig().ProxyType)
workConn = frpNet.WrapReadWriteCloserToConn(rwc, tmpConn)
workConn = frpNet.WrapStatsConn(workConn, pxy.updateStatsAfterClosedConn)
metrics.Server.OpenConnection(pxy.GetName(), pxy.GetConf().GetBaseInfo().ProxyType)
return
}
func (pxy *HTTPProxy) updateStatsAfterClosedConn(totalRead, totalWrite int64) {
name := pxy.GetName()
proxyType := pxy.GetConf().GetBaseConfig().ProxyType
proxyType := pxy.GetConf().GetBaseInfo().ProxyType
metrics.Server.CloseConnection(name, proxyType)
metrics.Server.AddTrafficIn(name, proxyType, totalWrite)
metrics.Server.AddTrafficOut(name, proxyType, totalRead)

View File

@@ -23,14 +23,14 @@ import (
"sync"
"time"
libio "github.com/fatedier/golib/io"
frpIo "github.com/fatedier/golib/io"
"golang.org/x/time/rate"
"github.com/fatedier/frp/pkg/config"
"github.com/fatedier/frp/pkg/msg"
plugin "github.com/fatedier/frp/pkg/plugin/server"
"github.com/fatedier/frp/pkg/util/limit"
utilnet "github.com/fatedier/frp/pkg/util/net"
frpNet "github.com/fatedier/frp/pkg/util/net"
"github.com/fatedier/frp/pkg/util/xlog"
"github.com/fatedier/frp/server/controller"
"github.com/fatedier/frp/server/metrics"
@@ -113,7 +113,7 @@ func (pxy *BaseProxy) GetWorkConnFromPool(src, dst net.Addr) (workConn net.Conn,
}
xl.Debug("get a new work connection: [%s]", workConn.RemoteAddr().String())
xl.Spawn().AppendPrefix(pxy.GetName())
workConn = utilnet.NewContextConn(pxy.ctx, workConn)
workConn = frpNet.NewContextConn(pxy.ctx, workConn)
var (
srcAddr string
@@ -156,7 +156,7 @@ func (pxy *BaseProxy) GetWorkConnFromPool(src, dst net.Addr) (workConn net.Conn,
}
// startListenHandler start a goroutine handler for each listener.
// p: p will just be passed to handler(Proxy, utilnet.Conn).
// p: p will just be passed to handler(Proxy, frpNet.Conn).
// handler: each proxy type can set different handler function to deal with connections accepted from listeners.
func (pxy *BaseProxy) startListenHandler(p Proxy, handler func(Proxy, net.Conn, config.ServerCommonConf)) {
xl := xlog.FromContextSafe(pxy.ctx)
@@ -196,16 +196,16 @@ func (pxy *BaseProxy) startListenHandler(p Proxy, handler func(Proxy, net.Conn,
func NewProxy(ctx context.Context, userInfo plugin.UserInfo, rc *controller.ResourceController, poolCount int,
getWorkConnFn GetWorkConnFn, pxyConf config.ProxyConf, serverCfg config.ServerCommonConf, loginMsg *msg.Login,
) (pxy Proxy, err error) {
xl := xlog.FromContextSafe(ctx).Spawn().AppendPrefix(pxyConf.GetBaseConfig().ProxyName)
xl := xlog.FromContextSafe(ctx).Spawn().AppendPrefix(pxyConf.GetBaseInfo().ProxyName)
var limiter *rate.Limiter
limitBytes := pxyConf.GetBaseConfig().BandwidthLimit.Bytes()
if limitBytes > 0 && pxyConf.GetBaseConfig().BandwidthLimitMode == config.BandwidthLimitModeServer {
limitBytes := pxyConf.GetBaseInfo().BandwidthLimit.Bytes()
if limitBytes > 0 && pxyConf.GetBaseInfo().BandwidthLimitMode == config.BandwidthLimitModeServer {
limiter = rate.NewLimiter(rate.Limit(float64(limitBytes)), int(limitBytes))
}
basePxy := BaseProxy{
name: pxyConf.GetBaseConfig().ProxyName,
name: pxyConf.GetBaseInfo().ProxyName,
rc: rc,
listeners: make([]net.Listener, 0),
poolCount: poolCount,
@@ -277,7 +277,7 @@ func HandleUserTCPConnection(pxy Proxy, userConn net.Conn, serverCfg config.Serv
content := &plugin.NewUserConnContent{
User: pxy.GetUserInfo(),
ProxyName: pxy.GetName(),
ProxyType: pxy.GetConf().GetBaseConfig().ProxyType,
ProxyType: pxy.GetConf().GetBaseInfo().ProxyType,
RemoteAddr: userConn.RemoteAddr().String(),
}
_, err := rc.PluginManager.NewUserConn(content)
@@ -294,21 +294,21 @@ func HandleUserTCPConnection(pxy Proxy, userConn net.Conn, serverCfg config.Serv
defer workConn.Close()
var local io.ReadWriteCloser = workConn
cfg := pxy.GetConf().GetBaseConfig()
cfg := pxy.GetConf().GetBaseInfo()
xl.Trace("handler user tcp connection, use_encryption: %t, use_compression: %t", cfg.UseEncryption, cfg.UseCompression)
if cfg.UseEncryption {
local, err = libio.WithEncryption(local, []byte(serverCfg.Token))
local, err = frpIo.WithEncryption(local, []byte(serverCfg.Token))
if err != nil {
xl.Error("create encryption stream error: %v", err)
return
}
}
if cfg.UseCompression {
local = libio.WithCompression(local)
local = frpIo.WithCompression(local)
}
if pxy.GetLimiter() != nil {
local = libio.WrapReadWriteCloser(limit.NewReader(local, pxy.GetLimiter()), limit.NewWriter(local, pxy.GetLimiter()), func() error {
local = frpIo.WrapReadWriteCloser(limit.NewReader(local, pxy.GetLimiter()), limit.NewWriter(local, pxy.GetLimiter()), func() error {
return local.Close()
})
}
@@ -317,9 +317,9 @@ func HandleUserTCPConnection(pxy Proxy, userConn net.Conn, serverCfg config.Serv
workConn.RemoteAddr().String(), userConn.LocalAddr().String(), userConn.RemoteAddr().String())
name := pxy.GetName()
proxyType := pxy.GetConf().GetBaseConfig().ProxyType
proxyType := pxy.GetConf().GetBaseInfo().ProxyType
metrics.Server.OpenConnection(name, proxyType)
inCount, outCount, _ := libio.Join(local, userConn)
inCount, outCount, _ := frpIo.Join(local, userConn)
metrics.Server.CloseConnection(name, proxyType)
metrics.Server.AddTrafficIn(name, proxyType, inCount)
metrics.Server.AddTrafficOut(name, proxyType, outCount)

View File

@@ -27,12 +27,7 @@ type STCPProxy struct {
func (pxy *STCPProxy) Run() (remoteAddr string, err error) {
xl := pxy.xl
allowUsers := pxy.cfg.AllowUsers
// if allowUsers is empty, only allow same user from proxy
if len(allowUsers) == 0 {
allowUsers = []string{pxy.GetUserInfo().User}
}
listener, errRet := pxy.rc.VisitorManager.Listen(pxy.GetName(), pxy.cfg.Sk, allowUsers)
listener, errRet := pxy.rc.VisitorManager.Listen(pxy.GetName(), pxy.cfg.Sk)
if errRet != nil {
err = errRet
return

View File

@@ -27,12 +27,8 @@ type SUDPProxy struct {
func (pxy *SUDPProxy) Run() (remoteAddr string, err error) {
xl := pxy.xl
allowUsers := pxy.cfg.AllowUsers
// if allowUsers is empty, only allow same user from proxy
if len(allowUsers) == 0 {
allowUsers = []string{pxy.GetUserInfo().User}
}
listener, errRet := pxy.rc.VisitorManager.Listen(pxy.GetName(), pxy.cfg.Sk, allowUsers)
listener, errRet := pxy.rc.VisitorManager.Listen(pxy.GetName(), pxy.cfg.Sk)
if errRet != nil {
err = errRet
return

View File

@@ -23,14 +23,14 @@ import (
"time"
"github.com/fatedier/golib/errors"
libio "github.com/fatedier/golib/io"
frpIo "github.com/fatedier/golib/io"
"golang.org/x/time/rate"
"github.com/fatedier/frp/pkg/config"
"github.com/fatedier/frp/pkg/msg"
"github.com/fatedier/frp/pkg/proto/udp"
"github.com/fatedier/frp/pkg/util/limit"
utilnet "github.com/fatedier/frp/pkg/util/net"
frpNet "github.com/fatedier/frp/pkg/util/net"
"github.com/fatedier/frp/server/metrics"
)
@@ -124,7 +124,7 @@ func (pxy *UDPProxy) Run() (remoteAddr string, err error) {
pxy.readCh <- m
metrics.Server.AddTrafficOut(
pxy.GetName(),
pxy.GetConf().GetBaseConfig().ProxyType,
pxy.GetConf().GetBaseInfo().ProxyType,
int64(len(m.Content)),
)
}); errRet != nil {
@@ -154,7 +154,7 @@ func (pxy *UDPProxy) Run() (remoteAddr string, err error) {
xl.Trace("send message to udp workConn: %s", udpMsg.Content)
metrics.Server.AddTrafficIn(
pxy.GetName(),
pxy.GetConf().GetBaseConfig().ProxyType,
pxy.GetConf().GetBaseInfo().ProxyType,
int64(len(udpMsg.Content)),
)
continue
@@ -189,7 +189,7 @@ func (pxy *UDPProxy) Run() (remoteAddr string, err error) {
var rwc io.ReadWriteCloser = workConn
if pxy.cfg.UseEncryption {
rwc, err = libio.WithEncryption(rwc, []byte(pxy.serverCfg.Token))
rwc, err = frpIo.WithEncryption(rwc, []byte(pxy.serverCfg.Token))
if err != nil {
xl.Error("create encryption stream error: %v", err)
workConn.Close()
@@ -197,16 +197,16 @@ func (pxy *UDPProxy) Run() (remoteAddr string, err error) {
}
}
if pxy.cfg.UseCompression {
rwc = libio.WithCompression(rwc)
rwc = frpIo.WithCompression(rwc)
}
if pxy.GetLimiter() != nil {
rwc = libio.WrapReadWriteCloser(limit.NewReader(rwc, pxy.GetLimiter()), limit.NewWriter(rwc, pxy.GetLimiter()), func() error {
rwc = frpIo.WrapReadWriteCloser(limit.NewReader(rwc, pxy.GetLimiter()), limit.NewWriter(rwc, pxy.GetLimiter()), func() error {
return rwc.Close()
})
}
pxy.workConn = utilnet.WrapReadWriteCloserToConn(rwc, workConn)
pxy.workConn = frpNet.WrapReadWriteCloserToConn(rwc, workConn)
ctx, cancel := context.WithCancel(context.Background())
go workConnReaderFn(pxy.workConn)
go workConnSenderFn(pxy.workConn, ctx)

View File

@@ -35,15 +35,11 @@ func (pxy *XTCPProxy) Run() (remoteAddr string, err error) {
xl := pxy.xl
if pxy.rc.NatHoleController == nil {
xl.Error("udp port for xtcp is not specified.")
err = fmt.Errorf("xtcp is not supported in frps")
return
}
allowUsers := pxy.cfg.AllowUsers
// if allowUsers is empty, only allow same user from proxy
if len(allowUsers) == 0 {
allowUsers = []string{pxy.GetUserInfo().User}
}
sidCh := pxy.rc.NatHoleController.ListenClient(pxy.GetName(), pxy.cfg.Sk, allowUsers)
sidCh := pxy.rc.NatHoleController.ListenClient(pxy.GetName(), pxy.cfg.Sk)
go func() {
for {
select {

View File

@@ -39,7 +39,7 @@ import (
plugin "github.com/fatedier/frp/pkg/plugin/server"
"github.com/fatedier/frp/pkg/transport"
"github.com/fatedier/frp/pkg/util/log"
utilnet "github.com/fatedier/frp/pkg/util/net"
frpNet "github.com/fatedier/frp/pkg/util/net"
"github.com/fatedier/frp/pkg/util/tcpmux"
"github.com/fatedier/frp/pkg/util/util"
"github.com/fatedier/frp/pkg/util/version"
@@ -210,7 +210,7 @@ func NewService(cfg config.ServerCommonConf) (svr *Service, err error) {
// Listen for accepting connections from client using kcp protocol.
if cfg.KCPBindPort > 0 {
address := net.JoinHostPort(cfg.BindAddr, strconv.Itoa(cfg.KCPBindPort))
svr.kcpListener, err = utilnet.ListenKcp(address)
svr.kcpListener, err = frpNet.ListenKcp(address)
if err != nil {
err = fmt.Errorf("listen on kcp udp address %s error: %v", address, err)
return
@@ -235,11 +235,11 @@ func NewService(cfg config.ServerCommonConf) (svr *Service, err error) {
}
// Listen for accepting connections from client using websocket protocol.
websocketPrefix := []byte("GET " + utilnet.FrpWebsocketPath)
websocketPrefix := []byte("GET " + frpNet.FrpWebsocketPath)
websocketLn := svr.muxer.Listen(0, uint32(len(websocketPrefix)), func(data []byte) bool {
return bytes.Equal(data, websocketPrefix)
})
svr.websocketListener = utilnet.NewWebsocketListener(websocketLn)
svr.websocketListener = frpNet.NewWebsocketListener(websocketLn)
// Create http vhost muxer.
if cfg.VhostHTTPPort > 0 {
@@ -294,7 +294,7 @@ func NewService(cfg config.ServerCommonConf) (svr *Service, err error) {
// frp tls listener
svr.tlsListener = svr.muxer.Listen(2, 1, func(data []byte) bool {
// tls first byte can be 0x16 only when vhost https port is not same with bind port
return int(data[0]) == utilnet.FRPTLSHeadByte || int(data[0]) == 0x16
return int(data[0]) == frpNet.FRPTLSHeadByte || int(data[0]) == 0x16
})
// Create nat hole controller.
@@ -442,12 +442,12 @@ func (svr *Service) HandleListener(l net.Listener) {
xl := xlog.New()
ctx := context.Background()
c = utilnet.NewContextConn(xlog.NewContext(ctx, xl), c)
c = frpNet.NewContextConn(xlog.NewContext(ctx, xl), c)
log.Trace("start check TLS connection...")
originConn := c
var isTLS, custom bool
c, isTLS, custom, err = utilnet.CheckAndEnableTLSServerConnWithTimeout(c, svr.tlsConfig, svr.cfg.TLSOnly, connReadTimeout)
c, isTLS, custom, err = frpNet.CheckAndEnableTLSServerConnWithTimeout(c, svr.tlsConfig, svr.cfg.TLSOnly, connReadTimeout)
if err != nil {
log.Warn("CheckAndEnableTLSServerConnWithTimeout error: %v", err)
originConn.Close()
@@ -461,7 +461,6 @@ func (svr *Service) HandleListener(l net.Listener) {
fmuxCfg := fmux.DefaultConfig()
fmuxCfg.KeepAliveInterval = time.Duration(svr.cfg.TCPMuxKeepaliveInterval) * time.Second
fmuxCfg.LogOutput = io.Discard
fmuxCfg.MaxStreamWindowSize = 6 * 1024 * 1024
session, err := fmux.Server(frpConn, fmuxCfg)
if err != nil {
log.Warn("Failed to create mux connection: %v", err)
@@ -502,7 +501,7 @@ func (svr *Service) HandleQUICListener(l quic.Listener) {
_ = frpConn.CloseWithError(0, "")
return
}
go svr.handleConnection(ctx, utilnet.QuicStreamToNetConn(stream, frpConn))
go svr.handleConnection(ctx, frpNet.QuicStreamToNetConn(stream, frpConn))
}
}(context.Background(), c)
}
@@ -518,7 +517,7 @@ func (svr *Service) RegisterControl(ctlConn net.Conn, loginMsg *msg.Login) (err
}
}
ctx := utilnet.NewContextFromConn(ctlConn)
ctx := frpNet.NewContextFromConn(ctlConn)
xl := xlog.FromContextSafe(ctx)
xl.AppendPrefix(loginMsg.RunID)
ctx = xlog.NewContext(ctx, xl)
@@ -556,7 +555,7 @@ func (svr *Service) RegisterControl(ctlConn net.Conn, loginMsg *msg.Login) (err
// RegisterWorkConn register a new work connection to control and proxies need it.
func (svr *Service) RegisterWorkConn(workConn net.Conn, newMsg *msg.NewWorkConn) error {
xl := utilnet.NewLogFromConn(workConn)
xl := frpNet.NewLogFromConn(workConn)
ctl, exist := svr.ctlManager.GetByID(newMsg.RunID)
if !exist {
xl.Warn("No client control found for run id [%s]", newMsg.RunID)
@@ -588,15 +587,6 @@ func (svr *Service) RegisterWorkConn(workConn net.Conn, newMsg *msg.NewWorkConn)
}
func (svr *Service) RegisterVisitorConn(visitorConn net.Conn, newMsg *msg.NewVisitorConn) error {
visitorUser := ""
// TODO: Compatible with old versions, can be without runID, user is empty. In later versions, it will be mandatory to include runID.
if newMsg.RunID != "" {
ctl, exist := svr.ctlManager.GetByID(newMsg.RunID)
if !exist {
return fmt.Errorf("no client control found for run id [%s]", newMsg.RunID)
}
visitorUser = ctl.loginMsg.User
}
return svr.rc.VisitorManager.NewConn(newMsg.ProxyName, visitorConn, newMsg.Timestamp, newMsg.SignKey,
newMsg.UseEncryption, newMsg.UseCompression, visitorUser)
newMsg.UseEncryption, newMsg.UseCompression)
}

View File

@@ -20,78 +20,66 @@ import (
"net"
"sync"
libio "github.com/fatedier/golib/io"
"github.com/samber/lo"
frpIo "github.com/fatedier/golib/io"
utilnet "github.com/fatedier/frp/pkg/util/net"
frpNet "github.com/fatedier/frp/pkg/util/net"
"github.com/fatedier/frp/pkg/util/util"
)
type listenerBundle struct {
l *utilnet.InternalListener
sk string
allowUsers []string
}
// Manager for visitor listeners.
type Manager struct {
listeners map[string]*listenerBundle
visitorListeners map[string]*frpNet.CustomListener
skMap map[string]string
mu sync.RWMutex
}
func NewManager() *Manager {
return &Manager{
listeners: make(map[string]*listenerBundle),
visitorListeners: make(map[string]*frpNet.CustomListener),
skMap: make(map[string]string),
}
}
func (vm *Manager) Listen(name string, sk string, allowUsers []string) (l *utilnet.InternalListener, err error) {
func (vm *Manager) Listen(name string, sk string) (l *frpNet.CustomListener, err error) {
vm.mu.Lock()
defer vm.mu.Unlock()
if _, ok := vm.listeners[name]; ok {
if _, ok := vm.visitorListeners[name]; ok {
err = fmt.Errorf("custom listener for [%s] is repeated", name)
return
}
l = utilnet.NewInternalListener()
vm.listeners[name] = &listenerBundle{
l: l,
sk: sk,
allowUsers: allowUsers,
}
l = frpNet.NewCustomListener()
vm.visitorListeners[name] = l
vm.skMap[name] = sk
return
}
func (vm *Manager) NewConn(name string, conn net.Conn, timestamp int64, signKey string,
useEncryption bool, useCompression bool, visitorUser string,
useEncryption bool, useCompression bool,
) (err error) {
vm.mu.RLock()
defer vm.mu.RUnlock()
if l, ok := vm.listeners[name]; ok {
if util.GetAuthKey(l.sk, timestamp) != signKey {
if l, ok := vm.visitorListeners[name]; ok {
var sk string
if sk = vm.skMap[name]; util.GetAuthKey(sk, timestamp) != signKey {
err = fmt.Errorf("visitor connection of [%s] auth failed", name)
return
}
if !lo.Contains(l.allowUsers, visitorUser) && !lo.Contains(l.allowUsers, "*") {
err = fmt.Errorf("visitor connection of [%s] user [%s] not allowed", name, visitorUser)
return
}
var rwc io.ReadWriteCloser = conn
if useEncryption {
if rwc, err = libio.WithEncryption(rwc, []byte(l.sk)); err != nil {
if rwc, err = frpIo.WithEncryption(rwc, []byte(sk)); err != nil {
err = fmt.Errorf("create encryption connection failed: %v", err)
return
}
}
if useCompression {
rwc = libio.WithCompression(rwc)
rwc = frpIo.WithCompression(rwc)
}
err = l.l.PutConn(utilnet.WrapReadWriteCloserToConn(rwc, conn))
err = l.PutConn(frpNet.WrapReadWriteCloserToConn(rwc, conn))
} else {
err = fmt.Errorf("custom listener for [%s] doesn't exist", name)
return
@@ -103,5 +91,6 @@ func (vm *Manager) CloseListener(name string) {
vm.mu.Lock()
defer vm.mu.Unlock()
delete(vm.listeners, name)
delete(vm.visitorListeners, name)
delete(vm.skMap, name)
}

View File

@@ -282,9 +282,8 @@ var _ = ginkgo.Describe("[Feature: Basic]", func() {
proxyType := t
ginkgo.It(fmt.Sprintf("Expose echo server with %s", strings.ToUpper(proxyType)), func() {
serverConf := consts.DefaultServerConfig
clientServerConf := consts.DefaultClientConfig + "\nuser = user1"
clientVisitorConf := consts.DefaultClientConfig + "\nuser = user1"
clientUser2VisitorConf := consts.DefaultClientConfig + "\nuser = user2"
clientServerConf := consts.DefaultClientConfig
clientVisitorConf := consts.DefaultClientConfig
localPortName := ""
protocol := "tcp"
@@ -324,14 +323,11 @@ var _ = ginkgo.Describe("[Feature: Basic]", func() {
}
tests := []struct {
proxyName string
bindPortName string
visitorSK string
commonExtraConfig string
proxyExtraConfig string
visitorExtraConfig string
expectError bool
user2 bool
proxyName string
bindPortName string
visitorSK string
extraConfig string
expectError bool
}{
{
proxyName: "normal",
@@ -339,22 +335,22 @@ var _ = ginkgo.Describe("[Feature: Basic]", func() {
visitorSK: correctSK,
},
{
proxyName: "with-encryption",
bindPortName: port.GenName("WithEncryption"),
visitorSK: correctSK,
commonExtraConfig: "use_encryption = true",
proxyName: "with-encryption",
bindPortName: port.GenName("WithEncryption"),
visitorSK: correctSK,
extraConfig: "use_encryption = true",
},
{
proxyName: "with-compression",
bindPortName: port.GenName("WithCompression"),
visitorSK: correctSK,
commonExtraConfig: "use_compression = true",
proxyName: "with-compression",
bindPortName: port.GenName("WithCompression"),
visitorSK: correctSK,
extraConfig: "use_compression = true",
},
{
proxyName: "with-encryption-and-compression",
bindPortName: port.GenName("WithEncryptionAndCompression"),
visitorSK: correctSK,
commonExtraConfig: `
extraConfig: `
use_encryption = true
use_compression = true
`,
@@ -365,53 +361,22 @@ var _ = ginkgo.Describe("[Feature: Basic]", func() {
visitorSK: wrongSK,
expectError: true,
},
{
proxyName: "allowed-user",
bindPortName: port.GenName("AllowedUser"),
visitorSK: correctSK,
proxyExtraConfig: "allow_users = another, user2",
visitorExtraConfig: "server_user = user1",
user2: true,
},
{
proxyName: "not-allowed-user",
bindPortName: port.GenName("NotAllowedUser"),
visitorSK: correctSK,
proxyExtraConfig: "allow_users = invalid",
visitorExtraConfig: "server_user = user1",
expectError: true,
},
{
proxyName: "allow-all",
bindPortName: port.GenName("AllowAll"),
visitorSK: correctSK,
proxyExtraConfig: "allow_users = *",
visitorExtraConfig: "server_user = user1",
user2: true,
},
}
// build all client config
for _, test := range tests {
clientServerConf += getProxyServerConf(test.proxyName, test.commonExtraConfig+"\n"+test.proxyExtraConfig) + "\n"
clientServerConf += getProxyServerConf(test.proxyName, test.extraConfig) + "\n"
}
for _, test := range tests {
config := getProxyVisitorConf(
test.proxyName, test.bindPortName, test.visitorSK, test.commonExtraConfig+"\n"+test.visitorExtraConfig,
) + "\n"
if test.user2 {
clientUser2VisitorConf += config
} else {
clientVisitorConf += config
}
clientVisitorConf += getProxyVisitorConf(test.proxyName, test.bindPortName, test.visitorSK, test.extraConfig) + "\n"
}
// run frps and frpc
f.RunProcesses([]string{serverConf}, []string{clientServerConf, clientVisitorConf, clientUser2VisitorConf})
f.RunProcesses([]string{serverConf}, []string{clientServerConf, clientVisitorConf})
for _, test := range tests {
framework.NewRequestExpect(f).
RequestModify(func(r *request.Request) {
r.Timeout(3 * time.Second)
r.Timeout(5 * time.Second)
}).
Protocol(protocol).
PortName(test.bindPortName).

View File

@@ -1,52 +0,0 @@
package basic
import (
"fmt"
"time"
"github.com/onsi/ginkgo/v2"
"github.com/fatedier/frp/test/e2e/framework"
"github.com/fatedier/frp/test/e2e/framework/consts"
"github.com/fatedier/frp/test/e2e/pkg/port"
"github.com/fatedier/frp/test/e2e/pkg/request"
)
var _ = ginkgo.Describe("[Feature: XTCP]", func() {
f := framework.NewDefaultFramework()
ginkgo.It("Fallback To STCP", func() {
serverConf := consts.DefaultServerConfig
clientConf := consts.DefaultClientConfig
bindPortName := port.GenName("XTCP")
clientConf += fmt.Sprintf(`
[foo]
type = stcp
local_port = {{ .%s }}
[foo-visitor]
type = stcp
role = visitor
server_name = foo
bind_port = -1
[bar-visitor]
type = xtcp
role = visitor
server_name = bar
bind_port = {{ .%s }}
keep_tunnel_open = true
fallback_to = foo-visitor
fallback_timeout_ms = 200
`, framework.TCPEchoServerPort, bindPortName)
f.RunProcesses([]string{serverConf}, []string{clientConf})
framework.NewRequestExpect(f).
RequestModify(func(r *request.Request) {
r.Timeout(time.Second)
}).
PortName(bindPortName).
Ensure()
})
})

View File

@@ -56,7 +56,7 @@ func (f *Framework) RunProcesses(serverTemplates []string, clientTemplates []str
ExpectNoError(err)
time.Sleep(500 * time.Millisecond)
}
time.Sleep(3 * time.Second)
time.Sleep(2 * time.Second)
return currentServerProcesses, currentClientProcesses
}

View File

@@ -14,6 +14,9 @@
<el-form-item label="BindPort">
<span>{{ data.bind_port }}</span>
</el-form-item>
<el-form-item label="Bind UDP Port" v-if="data.bind_udp_port != 0">
<span>{{ data.bind_udp_port }}</span>
</el-form-item>
<el-form-item label="KCP Bind Port" v-if="data.kcp_bind_port != 0">
<span>{{ data.kcp_bind_port }}</span>
</el-form-item>
@@ -88,6 +91,7 @@ import LongSpan from './LongSpan.vue'
let data = ref({
version: '',
bind_port: 0,
bind_udp_port: 0,
kcp_bind_port: 0,
quic_bind_port: 0,
vhost_http_port: 0,
@@ -110,6 +114,7 @@ const fetchData = () => {
.then((json) => {
data.value.version = json.version
data.value.bind_port = json.bind_port
data.value.bind_udp_port = json.bind_udp_port
data.value.kcp_bind_port = json.kcp_bind_port
data.value.quic_bind_port = json.quic_bind_port
data.value.vhost_http_port = json.vhost_http_port