mirror of
https://github.com/fatedier/frp.git
synced 2025-07-27 15:45:39 +00:00
sshTunnelGateway refactor (#3784)
This commit is contained in:
223
client/connector.go
Normal file
223
client/connector.go
Normal file
@@ -0,0 +1,223 @@
|
||||
// Copyright 2023 The frp Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package client
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"io"
|
||||
"net"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
libdial "github.com/fatedier/golib/net/dial"
|
||||
fmux "github.com/hashicorp/yamux"
|
||||
quic "github.com/quic-go/quic-go"
|
||||
"github.com/samber/lo"
|
||||
|
||||
v1 "github.com/fatedier/frp/pkg/config/v1"
|
||||
"github.com/fatedier/frp/pkg/transport"
|
||||
utilnet "github.com/fatedier/frp/pkg/util/net"
|
||||
"github.com/fatedier/frp/pkg/util/xlog"
|
||||
)
|
||||
|
||||
// Connector is a interface for establishing connections to the server.
|
||||
type Connector interface {
|
||||
Open() error
|
||||
Connect() (net.Conn, error)
|
||||
Close() error
|
||||
}
|
||||
|
||||
// defaultConnectorImpl is the default implementation of Connector for normal frpc.
|
||||
type defaultConnectorImpl struct {
|
||||
ctx context.Context
|
||||
cfg *v1.ClientCommonConfig
|
||||
|
||||
muxSession *fmux.Session
|
||||
quicConn quic.Connection
|
||||
}
|
||||
|
||||
func NewConnector(ctx context.Context, cfg *v1.ClientCommonConfig) Connector {
|
||||
return &defaultConnectorImpl{
|
||||
ctx: ctx,
|
||||
cfg: cfg,
|
||||
}
|
||||
}
|
||||
|
||||
// Open opens a underlying connection to the server.
|
||||
// The underlying connection is either a TCP connection or a QUIC connection.
|
||||
// After the underlying connection is established, you can call Connect() to get a stream.
|
||||
// If TCPMux isn't enabled, the underlying connection is nil, you will get a new real TCP connection every time you call Connect().
|
||||
func (c *defaultConnectorImpl) Open() error {
|
||||
xl := xlog.FromContextSafe(c.ctx)
|
||||
|
||||
// special for quic
|
||||
if strings.EqualFold(c.cfg.Transport.Protocol, "quic") {
|
||||
var tlsConfig *tls.Config
|
||||
var err error
|
||||
sn := c.cfg.Transport.TLS.ServerName
|
||||
if sn == "" {
|
||||
sn = c.cfg.ServerAddr
|
||||
}
|
||||
if lo.FromPtr(c.cfg.Transport.TLS.Enable) {
|
||||
tlsConfig, err = transport.NewClientTLSConfig(
|
||||
c.cfg.Transport.TLS.CertFile,
|
||||
c.cfg.Transport.TLS.KeyFile,
|
||||
c.cfg.Transport.TLS.TrustedCaFile,
|
||||
sn)
|
||||
} else {
|
||||
tlsConfig, err = transport.NewClientTLSConfig("", "", "", sn)
|
||||
}
|
||||
if err != nil {
|
||||
xl.Warn("fail to build tls configuration, err: %v", err)
|
||||
return err
|
||||
}
|
||||
tlsConfig.NextProtos = []string{"frp"}
|
||||
|
||||
conn, err := quic.DialAddr(
|
||||
c.ctx,
|
||||
net.JoinHostPort(c.cfg.ServerAddr, strconv.Itoa(c.cfg.ServerPort)),
|
||||
tlsConfig, &quic.Config{
|
||||
MaxIdleTimeout: time.Duration(c.cfg.Transport.QUIC.MaxIdleTimeout) * time.Second,
|
||||
MaxIncomingStreams: int64(c.cfg.Transport.QUIC.MaxIncomingStreams),
|
||||
KeepAlivePeriod: time.Duration(c.cfg.Transport.QUIC.KeepalivePeriod) * time.Second,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
c.quicConn = conn
|
||||
return nil
|
||||
}
|
||||
|
||||
if !lo.FromPtr(c.cfg.Transport.TCPMux) {
|
||||
return nil
|
||||
}
|
||||
|
||||
conn, err := c.realConnect()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
fmuxCfg := fmux.DefaultConfig()
|
||||
fmuxCfg.KeepAliveInterval = time.Duration(c.cfg.Transport.TCPMuxKeepaliveInterval) * time.Second
|
||||
fmuxCfg.LogOutput = io.Discard
|
||||
fmuxCfg.MaxStreamWindowSize = 6 * 1024 * 1024
|
||||
session, err := fmux.Client(conn, fmuxCfg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
c.muxSession = session
|
||||
return nil
|
||||
}
|
||||
|
||||
// Connect returns a stream from the underlying connection, or a new TCP connection if TCPMux isn't enabled.
|
||||
func (c *defaultConnectorImpl) Connect() (net.Conn, error) {
|
||||
if c.quicConn != nil {
|
||||
stream, err := c.quicConn.OpenStreamSync(context.Background())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return utilnet.QuicStreamToNetConn(stream, c.quicConn), nil
|
||||
} else if c.muxSession != nil {
|
||||
stream, err := c.muxSession.OpenStream()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return stream, nil
|
||||
}
|
||||
|
||||
return c.realConnect()
|
||||
}
|
||||
|
||||
func (c *defaultConnectorImpl) realConnect() (net.Conn, error) {
|
||||
xl := xlog.FromContextSafe(c.ctx)
|
||||
var tlsConfig *tls.Config
|
||||
var err error
|
||||
tlsEnable := lo.FromPtr(c.cfg.Transport.TLS.Enable)
|
||||
if c.cfg.Transport.Protocol == "wss" {
|
||||
tlsEnable = true
|
||||
}
|
||||
if tlsEnable {
|
||||
sn := c.cfg.Transport.TLS.ServerName
|
||||
if sn == "" {
|
||||
sn = c.cfg.ServerAddr
|
||||
}
|
||||
|
||||
tlsConfig, err = transport.NewClientTLSConfig(
|
||||
c.cfg.Transport.TLS.CertFile,
|
||||
c.cfg.Transport.TLS.KeyFile,
|
||||
c.cfg.Transport.TLS.TrustedCaFile,
|
||||
sn)
|
||||
if err != nil {
|
||||
xl.Warn("fail to build tls configuration, err: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
proxyType, addr, auth, err := libdial.ParseProxyURL(c.cfg.Transport.ProxyURL)
|
||||
if err != nil {
|
||||
xl.Error("fail to parse proxy url")
|
||||
return nil, err
|
||||
}
|
||||
dialOptions := []libdial.DialOption{}
|
||||
protocol := c.cfg.Transport.Protocol
|
||||
switch protocol {
|
||||
case "websocket":
|
||||
protocol = "tcp"
|
||||
dialOptions = append(dialOptions, libdial.WithAfterHook(libdial.AfterHook{Hook: utilnet.DialHookWebsocket(protocol, "")}))
|
||||
dialOptions = append(dialOptions, libdial.WithAfterHook(libdial.AfterHook{
|
||||
Hook: utilnet.DialHookCustomTLSHeadByte(tlsConfig != nil, lo.FromPtr(c.cfg.Transport.TLS.DisableCustomTLSFirstByte)),
|
||||
}))
|
||||
dialOptions = append(dialOptions, libdial.WithTLSConfig(tlsConfig))
|
||||
case "wss":
|
||||
protocol = "tcp"
|
||||
dialOptions = append(dialOptions, libdial.WithTLSConfigAndPriority(100, tlsConfig))
|
||||
// Make sure that if it is wss, the websocket hook is executed after the tls hook.
|
||||
dialOptions = append(dialOptions, libdial.WithAfterHook(libdial.AfterHook{Hook: utilnet.DialHookWebsocket(protocol, tlsConfig.ServerName), Priority: 110}))
|
||||
default:
|
||||
dialOptions = append(dialOptions, libdial.WithAfterHook(libdial.AfterHook{
|
||||
Hook: utilnet.DialHookCustomTLSHeadByte(tlsConfig != nil, lo.FromPtr(c.cfg.Transport.TLS.DisableCustomTLSFirstByte)),
|
||||
}))
|
||||
dialOptions = append(dialOptions, libdial.WithTLSConfig(tlsConfig))
|
||||
}
|
||||
|
||||
if c.cfg.Transport.ConnectServerLocalIP != "" {
|
||||
dialOptions = append(dialOptions, libdial.WithLocalAddr(c.cfg.Transport.ConnectServerLocalIP))
|
||||
}
|
||||
dialOptions = append(dialOptions,
|
||||
libdial.WithProtocol(protocol),
|
||||
libdial.WithTimeout(time.Duration(c.cfg.Transport.DialServerTimeout)*time.Second),
|
||||
libdial.WithKeepAlive(time.Duration(c.cfg.Transport.DialServerKeepAlive)*time.Second),
|
||||
libdial.WithProxy(proxyType, addr),
|
||||
libdial.WithProxyAuth(auth),
|
||||
)
|
||||
conn, err := libdial.DialContext(
|
||||
c.ctx,
|
||||
net.JoinHostPort(c.cfg.ServerAddr, strconv.Itoa(c.cfg.ServerPort)),
|
||||
dialOptions...,
|
||||
)
|
||||
return conn, err
|
||||
}
|
||||
|
||||
func (c *defaultConnectorImpl) Close() error {
|
||||
if c.quicConn != nil {
|
||||
_ = c.quicConn.CloseWithError(0, "")
|
||||
}
|
||||
if c.muxSession != nil {
|
||||
_ = c.muxSession.Close()
|
||||
}
|
||||
return nil
|
||||
}
|
@@ -58,8 +58,8 @@ type Control struct {
|
||||
// control connection. Once conn is closed, the msgDispatcher and the entire Control will exit.
|
||||
conn net.Conn
|
||||
|
||||
// use cm to create new connections, which could be real TCP connections or virtual streams.
|
||||
cm *ConnectionManager
|
||||
// use connector to create new connections, which could be real TCP connections or virtual streams.
|
||||
connector Connector
|
||||
|
||||
doneCh chan struct{}
|
||||
|
||||
@@ -77,7 +77,7 @@ type Control struct {
|
||||
}
|
||||
|
||||
func NewControl(
|
||||
ctx context.Context, runID string, conn net.Conn, cm *ConnectionManager,
|
||||
ctx context.Context, runID string, conn net.Conn, connector Connector,
|
||||
clientCfg *v1.ClientCommonConfig,
|
||||
pxyCfgs []v1.ProxyConfigurer,
|
||||
visitorCfgs []v1.VisitorConfigurer,
|
||||
@@ -92,7 +92,7 @@ func NewControl(
|
||||
runID: runID,
|
||||
pxyCfgs: pxyCfgs,
|
||||
conn: conn,
|
||||
cm: cm,
|
||||
connector: connector,
|
||||
doneCh: make(chan struct{}),
|
||||
}
|
||||
ctl.lastPong.Store(time.Now())
|
||||
@@ -122,6 +122,10 @@ func (ctl *Control) Run() {
|
||||
go ctl.vm.Run()
|
||||
}
|
||||
|
||||
func (ctl *Control) SetInWorkConnCallback(cb func(*v1.ProxyBaseConfig, net.Conn, *msg.StartWorkConn) bool) {
|
||||
ctl.pm.SetInWorkConnCallback(cb)
|
||||
}
|
||||
|
||||
func (ctl *Control) handleReqWorkConn(_ msg.Message) {
|
||||
xl := ctl.xl
|
||||
workConn, err := ctl.connectServer()
|
||||
@@ -207,7 +211,7 @@ func (ctl *Control) GracefulClose(d time.Duration) error {
|
||||
time.Sleep(d)
|
||||
|
||||
ctl.conn.Close()
|
||||
ctl.cm.Close()
|
||||
ctl.connector.Close()
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -218,7 +222,7 @@ func (ctl *Control) Done() <-chan struct{} {
|
||||
|
||||
// connectServer return a new connection to frps
|
||||
func (ctl *Control) connectServer() (conn net.Conn, err error) {
|
||||
return ctl.cm.Connect()
|
||||
return ctl.connector.Connect()
|
||||
}
|
||||
|
||||
func (ctl *Control) registerMsgHandlers() {
|
||||
@@ -282,7 +286,7 @@ func (ctl *Control) worker() {
|
||||
|
||||
ctl.pm.Close()
|
||||
ctl.vm.Close()
|
||||
ctl.cm.Close()
|
||||
ctl.connector.Close()
|
||||
|
||||
close(ctl.doneCh)
|
||||
}
|
||||
|
@@ -47,10 +47,9 @@ func RegisterProxyFactory(proxyConfType reflect.Type, factory func(*BaseProxy, v
|
||||
// Proxy defines how to handle work connections for different proxy type.
|
||||
type Proxy interface {
|
||||
Run() error
|
||||
|
||||
// InWorkConn accept work connections registered to server.
|
||||
InWorkConn(net.Conn, *msg.StartWorkConn)
|
||||
|
||||
SetInWorkConnCallback(func(*v1.ProxyBaseConfig, net.Conn, *msg.StartWorkConn) /* continue */ bool)
|
||||
Close()
|
||||
}
|
||||
|
||||
@@ -89,7 +88,8 @@ type BaseProxy struct {
|
||||
limiter *rate.Limiter
|
||||
// proxyPlugin is used to handle connections instead of dialing to local service.
|
||||
// It's only validate for TCP protocol now.
|
||||
proxyPlugin plugin.Plugin
|
||||
proxyPlugin plugin.Plugin
|
||||
inWorkConnCallback func(*v1.ProxyBaseConfig, net.Conn, *msg.StartWorkConn) /* continue */ bool
|
||||
|
||||
mu sync.RWMutex
|
||||
xl *xlog.Logger
|
||||
@@ -113,7 +113,16 @@ func (pxy *BaseProxy) Close() {
|
||||
}
|
||||
}
|
||||
|
||||
func (pxy *BaseProxy) SetInWorkConnCallback(cb func(*v1.ProxyBaseConfig, net.Conn, *msg.StartWorkConn) bool) {
|
||||
pxy.inWorkConnCallback = cb
|
||||
}
|
||||
|
||||
func (pxy *BaseProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
|
||||
if pxy.inWorkConnCallback != nil {
|
||||
if !pxy.inWorkConnCallback(pxy.baseCfg, conn, m) {
|
||||
return
|
||||
}
|
||||
}
|
||||
pxy.HandleTCPWorkConnection(conn, m, []byte(pxy.clientCfg.Auth.Token))
|
||||
}
|
||||
|
||||
@@ -132,7 +141,7 @@ func (pxy *BaseProxy) HandleTCPWorkConnection(workConn net.Conn, m *msg.StartWor
|
||||
})
|
||||
}
|
||||
|
||||
xl.Trace("handle tcp work connection, use_encryption: %t, use_compression: %t",
|
||||
xl.Trace("handle tcp work connection, useEncryption: %t, useCompression: %t",
|
||||
baseCfg.Transport.UseEncryption, baseCfg.Transport.UseCompression)
|
||||
if baseCfg.Transport.UseEncryption {
|
||||
remote, err = libio.WithEncryption(remote, encKey)
|
||||
|
@@ -31,8 +31,9 @@ import (
|
||||
)
|
||||
|
||||
type Manager struct {
|
||||
proxies map[string]*Wrapper
|
||||
msgTransporter transport.MessageTransporter
|
||||
proxies map[string]*Wrapper
|
||||
msgTransporter transport.MessageTransporter
|
||||
inWorkConnCallback func(*v1.ProxyBaseConfig, net.Conn, *msg.StartWorkConn) bool
|
||||
|
||||
closed bool
|
||||
mu sync.RWMutex
|
||||
@@ -71,6 +72,10 @@ func (pm *Manager) StartProxy(name string, remoteAddr string, serverRespErr stri
|
||||
return nil
|
||||
}
|
||||
|
||||
func (pm *Manager) SetInWorkConnCallback(cb func(*v1.ProxyBaseConfig, net.Conn, *msg.StartWorkConn) bool) {
|
||||
pm.inWorkConnCallback = cb
|
||||
}
|
||||
|
||||
func (pm *Manager) Close() {
|
||||
pm.mu.Lock()
|
||||
defer pm.mu.Unlock()
|
||||
@@ -146,6 +151,9 @@ func (pm *Manager) Reload(pxyCfgs []v1.ProxyConfigurer) {
|
||||
name := cfg.GetBaseConfig().Name
|
||||
if _, ok := pm.proxies[name]; !ok {
|
||||
pxy := NewWrapper(pm.ctx, cfg, pm.clientCfg, pm.HandleEvent, pm.msgTransporter)
|
||||
if pm.inWorkConnCallback != nil {
|
||||
pxy.SetInWorkConnCallback(pm.inWorkConnCallback)
|
||||
}
|
||||
pm.proxies[name] = pxy
|
||||
addPxyNames = append(addPxyNames, name)
|
||||
|
||||
|
@@ -121,6 +121,10 @@ func NewWrapper(
|
||||
return pw
|
||||
}
|
||||
|
||||
func (pw *Wrapper) SetInWorkConnCallback(cb func(*v1.ProxyBaseConfig, net.Conn, *msg.StartWorkConn) bool) {
|
||||
pw.pxy.SetInWorkConnCallback(cb)
|
||||
}
|
||||
|
||||
func (pw *Wrapper) SetRunningStatus(remoteAddr string, respErr string) error {
|
||||
pw.mu.Lock()
|
||||
defer pw.mu.Unlock()
|
||||
|
@@ -12,6 +12,8 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//go:build !frps
|
||||
|
||||
package proxy
|
||||
|
||||
import (
|
||||
|
@@ -12,6 +12,8 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//go:build !frps
|
||||
|
||||
package proxy
|
||||
|
||||
import (
|
||||
|
@@ -12,6 +12,8 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//go:build !frps
|
||||
|
||||
package proxy
|
||||
|
||||
import (
|
||||
|
@@ -16,30 +16,22 @@ package client
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"runtime"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/fatedier/golib/crypto"
|
||||
libdial "github.com/fatedier/golib/net/dial"
|
||||
fmux "github.com/hashicorp/yamux"
|
||||
quic "github.com/quic-go/quic-go"
|
||||
"github.com/samber/lo"
|
||||
|
||||
"github.com/fatedier/frp/assets"
|
||||
"github.com/fatedier/frp/pkg/auth"
|
||||
v1 "github.com/fatedier/frp/pkg/config/v1"
|
||||
"github.com/fatedier/frp/pkg/msg"
|
||||
"github.com/fatedier/frp/pkg/transport"
|
||||
"github.com/fatedier/frp/pkg/util/log"
|
||||
utilnet "github.com/fatedier/frp/pkg/util/net"
|
||||
"github.com/fatedier/frp/pkg/util/version"
|
||||
"github.com/fatedier/frp/pkg/util/wait"
|
||||
"github.com/fatedier/frp/pkg/util/xlog"
|
||||
@@ -75,6 +67,9 @@ type Service struct {
|
||||
// call cancel to stop service
|
||||
cancel context.CancelFunc
|
||||
gracefulDuration time.Duration
|
||||
|
||||
connectorCreator func(context.Context, *v1.ClientCommonConfig) Connector
|
||||
inWorkConnCallback func(*v1.ProxyBaseConfig, net.Conn, *msg.StartWorkConn) bool
|
||||
}
|
||||
|
||||
func NewService(
|
||||
@@ -84,15 +79,24 @@ func NewService(
|
||||
cfgFile string,
|
||||
) *Service {
|
||||
return &Service{
|
||||
authSetter: auth.NewAuthSetter(cfg.Auth),
|
||||
cfg: cfg,
|
||||
cfgFile: cfgFile,
|
||||
pxyCfgs: pxyCfgs,
|
||||
visitorCfgs: visitorCfgs,
|
||||
ctx: context.Background(),
|
||||
authSetter: auth.NewAuthSetter(cfg.Auth),
|
||||
cfg: cfg,
|
||||
cfgFile: cfgFile,
|
||||
pxyCfgs: pxyCfgs,
|
||||
visitorCfgs: visitorCfgs,
|
||||
ctx: context.Background(),
|
||||
connectorCreator: NewConnector,
|
||||
}
|
||||
}
|
||||
|
||||
func (svr *Service) SetConnectorCreator(h func(context.Context, *v1.ClientCommonConfig) Connector) {
|
||||
svr.connectorCreator = h
|
||||
}
|
||||
|
||||
func (svr *Service) SetInWorkConnCallback(cb func(*v1.ProxyBaseConfig, net.Conn, *msg.StartWorkConn) bool) {
|
||||
svr.inWorkConnCallback = cb
|
||||
}
|
||||
|
||||
func (svr *Service) GetController() *Control {
|
||||
svr.ctlMu.RLock()
|
||||
defer svr.ctlMu.RUnlock()
|
||||
@@ -101,7 +105,7 @@ func (svr *Service) GetController() *Control {
|
||||
|
||||
func (svr *Service) Run(ctx context.Context) error {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
svr.ctx = xlog.NewContext(ctx, xlog.New())
|
||||
svr.ctx = xlog.NewContext(ctx, xlog.FromContextSafe(ctx))
|
||||
svr.cancel = cancel
|
||||
|
||||
// set custom DNSServer
|
||||
@@ -173,21 +177,20 @@ func (svr *Service) keepControllerWorking() {
|
||||
// login creates a connection to frps and registers it self as a client
|
||||
// conn: control connection
|
||||
// session: if it's not nil, using tcp mux
|
||||
func (svr *Service) login() (conn net.Conn, cm *ConnectionManager, err error) {
|
||||
func (svr *Service) login() (conn net.Conn, connector Connector, err error) {
|
||||
xl := xlog.FromContextSafe(svr.ctx)
|
||||
cm = NewConnectionManager(svr.ctx, svr.cfg)
|
||||
|
||||
if err = cm.OpenConnection(); err != nil {
|
||||
connector = svr.connectorCreator(svr.ctx, svr.cfg)
|
||||
if err = connector.Open(); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if err != nil {
|
||||
cm.Close()
|
||||
connector.Close()
|
||||
}
|
||||
}()
|
||||
|
||||
conn, err = cm.Connect()
|
||||
conn, err = connector.Connect()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
@@ -226,8 +229,7 @@ func (svr *Service) login() (conn net.Conn, cm *ConnectionManager, err error) {
|
||||
}
|
||||
|
||||
svr.runID = loginRespMsg.RunID
|
||||
xl.ResetPrefixes()
|
||||
xl.AppendPrefix(svr.runID)
|
||||
xl.AddPrefix(xlog.LogPrefix{Name: "runID", Value: svr.runID})
|
||||
|
||||
xl.Info("login to server success, get run id [%s]", loginRespMsg.RunID)
|
||||
return
|
||||
@@ -239,7 +241,7 @@ func (svr *Service) loopLoginUntilSuccess(maxInterval time.Duration, firstLoginE
|
||||
|
||||
loginFunc := func() error {
|
||||
xl.Info("try to connect to server...")
|
||||
conn, cm, err := svr.login()
|
||||
conn, connector, err := svr.login()
|
||||
if err != nil {
|
||||
xl.Warn("connect to server error: %v", err)
|
||||
if firstLoginExit {
|
||||
@@ -248,13 +250,14 @@ func (svr *Service) loopLoginUntilSuccess(maxInterval time.Duration, firstLoginE
|
||||
return err
|
||||
}
|
||||
|
||||
ctl, err := NewControl(svr.ctx, svr.runID, conn, cm,
|
||||
ctl, err := NewControl(svr.ctx, svr.runID, conn, connector,
|
||||
svr.cfg, svr.pxyCfgs, svr.visitorCfgs, svr.authSetter)
|
||||
if err != nil {
|
||||
conn.Close()
|
||||
xl.Error("NewControl error: %v", err)
|
||||
return err
|
||||
}
|
||||
ctl.SetInWorkConnCallback(svr.inWorkConnCallback)
|
||||
|
||||
ctl.Run()
|
||||
// close and replace previous control
|
||||
@@ -314,184 +317,3 @@ func (svr *Service) stop() {
|
||||
svr.ctl = nil
|
||||
}
|
||||
}
|
||||
|
||||
// ConnectionManager is a wrapper for establishing connections to the server.
|
||||
type ConnectionManager struct {
|
||||
ctx context.Context
|
||||
cfg *v1.ClientCommonConfig
|
||||
|
||||
muxSession *fmux.Session
|
||||
quicConn quic.Connection
|
||||
}
|
||||
|
||||
func NewConnectionManager(ctx context.Context, cfg *v1.ClientCommonConfig) *ConnectionManager {
|
||||
return &ConnectionManager{
|
||||
ctx: ctx,
|
||||
cfg: cfg,
|
||||
}
|
||||
}
|
||||
|
||||
// OpenConnection opens a underlying connection to the server.
|
||||
// The underlying connection is either a TCP connection or a QUIC connection.
|
||||
// After the underlying connection is established, you can call Connect() to get a stream.
|
||||
// If TCPMux isn't enabled, the underlying connection is nil, you will get a new real TCP connection every time you call Connect().
|
||||
func (cm *ConnectionManager) OpenConnection() error {
|
||||
xl := xlog.FromContextSafe(cm.ctx)
|
||||
|
||||
// special for quic
|
||||
if strings.EqualFold(cm.cfg.Transport.Protocol, "quic") {
|
||||
var tlsConfig *tls.Config
|
||||
var err error
|
||||
sn := cm.cfg.Transport.TLS.ServerName
|
||||
if sn == "" {
|
||||
sn = cm.cfg.ServerAddr
|
||||
}
|
||||
if lo.FromPtr(cm.cfg.Transport.TLS.Enable) {
|
||||
tlsConfig, err = transport.NewClientTLSConfig(
|
||||
cm.cfg.Transport.TLS.CertFile,
|
||||
cm.cfg.Transport.TLS.KeyFile,
|
||||
cm.cfg.Transport.TLS.TrustedCaFile,
|
||||
sn)
|
||||
} else {
|
||||
tlsConfig, err = transport.NewClientTLSConfig("", "", "", sn)
|
||||
}
|
||||
if err != nil {
|
||||
xl.Warn("fail to build tls configuration, err: %v", err)
|
||||
return err
|
||||
}
|
||||
tlsConfig.NextProtos = []string{"frp"}
|
||||
|
||||
conn, err := quic.DialAddr(
|
||||
cm.ctx,
|
||||
net.JoinHostPort(cm.cfg.ServerAddr, strconv.Itoa(cm.cfg.ServerPort)),
|
||||
tlsConfig, &quic.Config{
|
||||
MaxIdleTimeout: time.Duration(cm.cfg.Transport.QUIC.MaxIdleTimeout) * time.Second,
|
||||
MaxIncomingStreams: int64(cm.cfg.Transport.QUIC.MaxIncomingStreams),
|
||||
KeepAlivePeriod: time.Duration(cm.cfg.Transport.QUIC.KeepalivePeriod) * time.Second,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cm.quicConn = conn
|
||||
return nil
|
||||
}
|
||||
|
||||
if !lo.FromPtr(cm.cfg.Transport.TCPMux) {
|
||||
return nil
|
||||
}
|
||||
|
||||
conn, err := cm.realConnect()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
fmuxCfg := fmux.DefaultConfig()
|
||||
fmuxCfg.KeepAliveInterval = time.Duration(cm.cfg.Transport.TCPMuxKeepaliveInterval) * time.Second
|
||||
fmuxCfg.LogOutput = io.Discard
|
||||
fmuxCfg.MaxStreamWindowSize = 6 * 1024 * 1024
|
||||
session, err := fmux.Client(conn, fmuxCfg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cm.muxSession = session
|
||||
return nil
|
||||
}
|
||||
|
||||
// Connect returns a stream from the underlying connection, or a new TCP connection if TCPMux isn't enabled.
|
||||
func (cm *ConnectionManager) Connect() (net.Conn, error) {
|
||||
if cm.quicConn != nil {
|
||||
stream, err := cm.quicConn.OpenStreamSync(context.Background())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return utilnet.QuicStreamToNetConn(stream, cm.quicConn), nil
|
||||
} else if cm.muxSession != nil {
|
||||
stream, err := cm.muxSession.OpenStream()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return stream, nil
|
||||
}
|
||||
|
||||
return cm.realConnect()
|
||||
}
|
||||
|
||||
func (cm *ConnectionManager) realConnect() (net.Conn, error) {
|
||||
xl := xlog.FromContextSafe(cm.ctx)
|
||||
var tlsConfig *tls.Config
|
||||
var err error
|
||||
tlsEnable := lo.FromPtr(cm.cfg.Transport.TLS.Enable)
|
||||
if cm.cfg.Transport.Protocol == "wss" {
|
||||
tlsEnable = true
|
||||
}
|
||||
if tlsEnable {
|
||||
sn := cm.cfg.Transport.TLS.ServerName
|
||||
if sn == "" {
|
||||
sn = cm.cfg.ServerAddr
|
||||
}
|
||||
|
||||
tlsConfig, err = transport.NewClientTLSConfig(
|
||||
cm.cfg.Transport.TLS.CertFile,
|
||||
cm.cfg.Transport.TLS.KeyFile,
|
||||
cm.cfg.Transport.TLS.TrustedCaFile,
|
||||
sn)
|
||||
if err != nil {
|
||||
xl.Warn("fail to build tls configuration, err: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
proxyType, addr, auth, err := libdial.ParseProxyURL(cm.cfg.Transport.ProxyURL)
|
||||
if err != nil {
|
||||
xl.Error("fail to parse proxy url")
|
||||
return nil, err
|
||||
}
|
||||
dialOptions := []libdial.DialOption{}
|
||||
protocol := cm.cfg.Transport.Protocol
|
||||
switch protocol {
|
||||
case "websocket":
|
||||
protocol = "tcp"
|
||||
dialOptions = append(dialOptions, libdial.WithAfterHook(libdial.AfterHook{Hook: utilnet.DialHookWebsocket(protocol, "")}))
|
||||
dialOptions = append(dialOptions, libdial.WithAfterHook(libdial.AfterHook{
|
||||
Hook: utilnet.DialHookCustomTLSHeadByte(tlsConfig != nil, lo.FromPtr(cm.cfg.Transport.TLS.DisableCustomTLSFirstByte)),
|
||||
}))
|
||||
dialOptions = append(dialOptions, libdial.WithTLSConfig(tlsConfig))
|
||||
case "wss":
|
||||
protocol = "tcp"
|
||||
dialOptions = append(dialOptions, libdial.WithTLSConfigAndPriority(100, tlsConfig))
|
||||
// Make sure that if it is wss, the websocket hook is executed after the tls hook.
|
||||
dialOptions = append(dialOptions, libdial.WithAfterHook(libdial.AfterHook{Hook: utilnet.DialHookWebsocket(protocol, tlsConfig.ServerName), Priority: 110}))
|
||||
default:
|
||||
dialOptions = append(dialOptions, libdial.WithAfterHook(libdial.AfterHook{
|
||||
Hook: utilnet.DialHookCustomTLSHeadByte(tlsConfig != nil, lo.FromPtr(cm.cfg.Transport.TLS.DisableCustomTLSFirstByte)),
|
||||
}))
|
||||
dialOptions = append(dialOptions, libdial.WithTLSConfig(tlsConfig))
|
||||
}
|
||||
|
||||
if cm.cfg.Transport.ConnectServerLocalIP != "" {
|
||||
dialOptions = append(dialOptions, libdial.WithLocalAddr(cm.cfg.Transport.ConnectServerLocalIP))
|
||||
}
|
||||
dialOptions = append(dialOptions,
|
||||
libdial.WithProtocol(protocol),
|
||||
libdial.WithTimeout(time.Duration(cm.cfg.Transport.DialServerTimeout)*time.Second),
|
||||
libdial.WithKeepAlive(time.Duration(cm.cfg.Transport.DialServerKeepAlive)*time.Second),
|
||||
libdial.WithProxy(proxyType, addr),
|
||||
libdial.WithProxyAuth(auth),
|
||||
)
|
||||
conn, err := libdial.DialContext(
|
||||
cm.ctx,
|
||||
net.JoinHostPort(cm.cfg.ServerAddr, strconv.Itoa(cm.cfg.ServerPort)),
|
||||
dialOptions...,
|
||||
)
|
||||
return conn, err
|
||||
}
|
||||
|
||||
func (cm *ConnectionManager) Close() error {
|
||||
if cm.quicConn != nil {
|
||||
_ = cm.quicConn.CloseWithError(0, "")
|
||||
}
|
||||
if cm.muxSession != nil {
|
||||
_ = cm.muxSession.Close()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
Reference in New Issue
Block a user