support proxy protocol

This commit is contained in:
fatedier 2019-03-29 19:01:18 +08:00
parent 74a8752570
commit 9c4ec56491
13 changed files with 98 additions and 26 deletions

View File

@ -131,7 +131,7 @@ func (ctl *Control) HandleReqWorkConn(inMsg *msg.ReqWorkConn) {
workConn.AddLogPrefix(startMsg.ProxyName) workConn.AddLogPrefix(startMsg.ProxyName)
// dispatch this work connection to related proxy // dispatch this work connection to related proxy
ctl.pm.HandleWorkConn(startMsg.ProxyName, workConn) ctl.pm.HandleWorkConn(startMsg.ProxyName, workConn, &startMsg)
} }
func (ctl *Control) HandleNewProxyResp(inMsg *msg.NewProxyResp) { func (ctl *Control) HandleNewProxyResp(inMsg *msg.NewProxyResp) {

View File

@ -37,6 +37,7 @@ import (
frpIo "github.com/fatedier/golib/io" frpIo "github.com/fatedier/golib/io"
"github.com/fatedier/golib/pool" "github.com/fatedier/golib/pool"
fmux "github.com/hashicorp/yamux" fmux "github.com/hashicorp/yamux"
pp "github.com/pires/go-proxyproto"
) )
// Proxy defines how to handle work connections for different proxy type. // Proxy defines how to handle work connections for different proxy type.
@ -44,7 +45,7 @@ type Proxy interface {
Run() error Run() error
// InWorkConn accept work connections registered to server. // InWorkConn accept work connections registered to server.
InWorkConn(conn frpNet.Conn) InWorkConn(frpNet.Conn, *msg.StartWorkConn)
Close() Close()
log.Logger log.Logger
@ -119,9 +120,9 @@ func (pxy *TcpProxy) Close() {
} }
} }
func (pxy *TcpProxy) InWorkConn(conn frpNet.Conn) { func (pxy *TcpProxy) InWorkConn(conn frpNet.Conn, m *msg.StartWorkConn) {
HandleTcpWorkConnection(&pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, conn, HandleTcpWorkConnection(&pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, conn,
[]byte(g.GlbClientCfg.Token)) []byte(g.GlbClientCfg.Token), m)
} }
// HTTP // HTTP
@ -148,9 +149,9 @@ func (pxy *HttpProxy) Close() {
} }
} }
func (pxy *HttpProxy) InWorkConn(conn frpNet.Conn) { func (pxy *HttpProxy) InWorkConn(conn frpNet.Conn, m *msg.StartWorkConn) {
HandleTcpWorkConnection(&pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, conn, HandleTcpWorkConnection(&pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, conn,
[]byte(g.GlbClientCfg.Token)) []byte(g.GlbClientCfg.Token), m)
} }
// HTTPS // HTTPS
@ -177,9 +178,9 @@ func (pxy *HttpsProxy) Close() {
} }
} }
func (pxy *HttpsProxy) InWorkConn(conn frpNet.Conn) { func (pxy *HttpsProxy) InWorkConn(conn frpNet.Conn, m *msg.StartWorkConn) {
HandleTcpWorkConnection(&pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, conn, HandleTcpWorkConnection(&pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, conn,
[]byte(g.GlbClientCfg.Token)) []byte(g.GlbClientCfg.Token), m)
} }
// STCP // STCP
@ -206,9 +207,9 @@ func (pxy *StcpProxy) Close() {
} }
} }
func (pxy *StcpProxy) InWorkConn(conn frpNet.Conn) { func (pxy *StcpProxy) InWorkConn(conn frpNet.Conn, m *msg.StartWorkConn) {
HandleTcpWorkConnection(&pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, conn, HandleTcpWorkConnection(&pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, conn,
[]byte(g.GlbClientCfg.Token)) []byte(g.GlbClientCfg.Token), m)
} }
// XTCP // XTCP
@ -235,7 +236,7 @@ func (pxy *XtcpProxy) Close() {
} }
} }
func (pxy *XtcpProxy) InWorkConn(conn frpNet.Conn) { func (pxy *XtcpProxy) InWorkConn(conn frpNet.Conn, m *msg.StartWorkConn) {
defer conn.Close() defer conn.Close()
var natHoleSidMsg msg.NatHoleSid var natHoleSidMsg msg.NatHoleSid
err := msg.ReadMsgInto(conn, &natHoleSidMsg) err := msg.ReadMsgInto(conn, &natHoleSidMsg)
@ -353,7 +354,7 @@ func (pxy *XtcpProxy) InWorkConn(conn frpNet.Conn) {
} }
HandleTcpWorkConnection(&pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, HandleTcpWorkConnection(&pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf,
frpNet.WrapConn(muxConn), []byte(pxy.cfg.Sk)) frpNet.WrapConn(muxConn), []byte(pxy.cfg.Sk), m)
} }
func (pxy *XtcpProxy) sendDetectMsg(addr string, port int, laddr *net.UDPAddr, content []byte) (err error) { func (pxy *XtcpProxy) sendDetectMsg(addr string, port int, laddr *net.UDPAddr, content []byte) (err error) {
@ -415,7 +416,7 @@ func (pxy *UdpProxy) Close() {
} }
} }
func (pxy *UdpProxy) InWorkConn(conn frpNet.Conn) { func (pxy *UdpProxy) InWorkConn(conn frpNet.Conn, m *msg.StartWorkConn) {
pxy.Info("incoming a new work connection for udp proxy, %s", conn.RemoteAddr().String()) pxy.Info("incoming a new work connection for udp proxy, %s", conn.RemoteAddr().String())
// close resources releated with old workConn // close resources releated with old workConn
pxy.Close() pxy.Close()
@ -482,7 +483,7 @@ func (pxy *UdpProxy) InWorkConn(conn frpNet.Conn) {
// Common handler for tcp work connections. // Common handler for tcp work connections.
func HandleTcpWorkConnection(localInfo *config.LocalSvrConf, proxyPlugin plugin.Plugin, func HandleTcpWorkConnection(localInfo *config.LocalSvrConf, proxyPlugin plugin.Plugin,
baseInfo *config.BaseProxyConf, workConn frpNet.Conn, encKey []byte) { baseInfo *config.BaseProxyConf, workConn frpNet.Conn, encKey []byte, m *msg.StartWorkConn) {
var ( var (
remote io.ReadWriteCloser remote io.ReadWriteCloser
@ -518,6 +519,34 @@ func HandleTcpWorkConnection(localInfo *config.LocalSvrConf, proxyPlugin plugin.
workConn.Debug("join connections, localConn(l[%s] r[%s]) workConn(l[%s] r[%s])", localConn.LocalAddr().String(), workConn.Debug("join connections, localConn(l[%s] r[%s]) workConn(l[%s] r[%s])", localConn.LocalAddr().String(),
localConn.RemoteAddr().String(), workConn.LocalAddr().String(), workConn.RemoteAddr().String()) localConn.RemoteAddr().String(), workConn.LocalAddr().String(), workConn.RemoteAddr().String())
// check if we need to send proxy protocol info
if baseInfo.ProxyProtocolVersion != "" {
if m.SrcAddr != "" && m.SrcPort != 0 {
h := &pp.Header{
Command: pp.PROXY,
SourceAddress: net.ParseIP(m.SrcAddr),
SourcePort: m.SrcPort,
DestinationAddress: net.ParseIP(m.DstAddr),
DestinationPort: m.DstPort,
}
if h.SourceAddress.To16() == nil {
h.TransportProtocol = pp.TCPv4
} else {
h.TransportProtocol = pp.TCPv6
}
if baseInfo.ProxyProtocolVersion == "v1" {
h.Version = 1
} else if baseInfo.ProxyProtocolVersion == "v2" {
h.Version = 2
}
h.WriteTo(localConn)
}
}
frpIo.Join(localConn, remote) frpIo.Join(localConn, remote)
workConn.Debug("join connections closed") workConn.Debug("join connections closed")
} }

View File

@ -58,12 +58,12 @@ func (pm *ProxyManager) Close() {
pm.proxies = make(map[string]*ProxyWrapper) pm.proxies = make(map[string]*ProxyWrapper)
} }
func (pm *ProxyManager) HandleWorkConn(name string, workConn frpNet.Conn) { func (pm *ProxyManager) HandleWorkConn(name string, workConn frpNet.Conn, m *msg.StartWorkConn) {
pm.mu.RLock() pm.mu.RLock()
pw, ok := pm.proxies[name] pw, ok := pm.proxies[name]
pm.mu.RUnlock() pm.mu.RUnlock()
if ok { if ok {
pw.InWorkConn(workConn) pw.InWorkConn(workConn, m)
} else { } else {
workConn.Close() workConn.Close()
} }

View File

@ -217,13 +217,13 @@ func (pw *ProxyWrapper) statusFailedCallback() {
pw.Info("health check failed") pw.Info("health check failed")
} }
func (pw *ProxyWrapper) InWorkConn(workConn frpNet.Conn) { func (pw *ProxyWrapper) InWorkConn(workConn frpNet.Conn, m *msg.StartWorkConn) {
pw.mu.RLock() pw.mu.RLock()
pxy := pw.pxy pxy := pw.pxy
pw.mu.RUnlock() pw.mu.RUnlock()
if pxy != nil { if pxy != nil {
workConn.Debug("start a new work connection, localAddr: %s remoteAddr: %s", workConn.LocalAddr().String(), workConn.RemoteAddr().String()) workConn.Debug("start a new work connection, localAddr: %s remoteAddr: %s", workConn.LocalAddr().String(), workConn.RemoteAddr().String())
go pxy.InWorkConn(workConn) go pxy.InWorkConn(workConn, m)
} else { } else {
workConn.Close() workConn.Close()
} }

View File

@ -154,6 +154,9 @@ use_encryption = false
use_compression = false use_compression = false
subdomain = web01 subdomain = web01
custom_domains = web02.yourdomain.com custom_domains = web02.yourdomain.com
# if not empty, frpc will use proxy protocol to transfer connection info to your local service
# v1 or v2 or empty
proxy_protocol_version = v2
[plugin_unix_domain_socket] [plugin_unix_domain_socket]
type = tcp type = tcp

1
go.mod
View File

@ -17,6 +17,7 @@ require (
github.com/klauspost/cpuid v1.2.0 // indirect github.com/klauspost/cpuid v1.2.0 // indirect
github.com/klauspost/reedsolomon v1.9.1 // indirect github.com/klauspost/reedsolomon v1.9.1 // indirect
github.com/mattn/go-runewidth v0.0.4 // indirect github.com/mattn/go-runewidth v0.0.4 // indirect
github.com/pires/go-proxyproto v0.0.0-20190111085350-4d51b51e3bfc
github.com/pkg/errors v0.8.0 // indirect github.com/pkg/errors v0.8.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rakyll/statik v0.1.1 github.com/rakyll/statik v0.1.1

2
go.sum
View File

@ -20,6 +20,8 @@ github.com/klauspost/reedsolomon v1.9.1 h1:kYrT1MlR4JH6PqOpC+okdb9CDTcwEC/BqpzK4
github.com/klauspost/reedsolomon v1.9.1/go.mod h1:CwCi+NUr9pqSVktrkN+Ondf06rkhYZ/pcNv7fu+8Un4= github.com/klauspost/reedsolomon v1.9.1/go.mod h1:CwCi+NUr9pqSVktrkN+Ondf06rkhYZ/pcNv7fu+8Un4=
github.com/mattn/go-runewidth v0.0.4 h1:2BvfKmzob6Bmd4YsL0zygOqfdFnK7GR4QL06Do4/p7Y= github.com/mattn/go-runewidth v0.0.4 h1:2BvfKmzob6Bmd4YsL0zygOqfdFnK7GR4QL06Do4/p7Y=
github.com/mattn/go-runewidth v0.0.4/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU= github.com/mattn/go-runewidth v0.0.4/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
github.com/pires/go-proxyproto v0.0.0-20190111085350-4d51b51e3bfc h1:lNOt1SMsgHXTdpuGw+RpnJtzUcCb/oRKZP65pBy9pr8=
github.com/pires/go-proxyproto v0.0.0-20190111085350-4d51b51e3bfc/go.mod h1:6/gX3+E/IYGa0wMORlSMla999awQFdbaeQCHjSMKIzY=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rakyll/statik v0.1.1/go.mod h1:OEi9wJV/fMUAGx1eNjq75DKDsJVuEv1U0oYdX6GX8Zs= github.com/rakyll/statik v0.1.1/go.mod h1:OEi9wJV/fMUAGx1eNjq75DKDsJVuEv1U0oYdX6GX8Zs=

View File

@ -107,8 +107,10 @@ type BaseProxyConf struct {
Group string `json:"group"` Group string `json:"group"`
GroupKey string `json:"group_key"` GroupKey string `json:"group_key"`
// only used for client
ProxyProtocolVersion string `json:"proxy_protocol_version"`
LocalSvrConf LocalSvrConf
HealthCheckConf // only used for client HealthCheckConf
} }
func (cfg *BaseProxyConf) GetBaseInfo() *BaseProxyConf { func (cfg *BaseProxyConf) GetBaseInfo() *BaseProxyConf {
@ -121,7 +123,8 @@ func (cfg *BaseProxyConf) compare(cmp *BaseProxyConf) bool {
cfg.UseEncryption != cmp.UseEncryption || cfg.UseEncryption != cmp.UseEncryption ||
cfg.UseCompression != cmp.UseCompression || cfg.UseCompression != cmp.UseCompression ||
cfg.Group != cmp.Group || cfg.Group != cmp.Group ||
cfg.GroupKey != cmp.GroupKey { cfg.GroupKey != cmp.GroupKey ||
cfg.ProxyProtocolVersion != cmp.ProxyProtocolVersion {
return false return false
} }
if !cfg.LocalSvrConf.compare(&cmp.LocalSvrConf) { if !cfg.LocalSvrConf.compare(&cmp.LocalSvrConf) {
@ -162,6 +165,7 @@ func (cfg *BaseProxyConf) UnmarshalFromIni(prefix string, name string, section i
cfg.Group = section["group"] cfg.Group = section["group"]
cfg.GroupKey = section["group_key"] cfg.GroupKey = section["group_key"]
cfg.ProxyProtocolVersion = section["proxy_protocol_version"]
if err := cfg.LocalSvrConf.UnmarshalFromIni(prefix, name, section); err != nil { if err := cfg.LocalSvrConf.UnmarshalFromIni(prefix, name, section); err != nil {
return err return err
@ -194,6 +198,12 @@ func (cfg *BaseProxyConf) MarshalToMsg(pMsg *msg.NewProxy) {
} }
func (cfg *BaseProxyConf) checkForCli() (err error) { func (cfg *BaseProxyConf) checkForCli() (err error) {
if cfg.ProxyProtocolVersion != "" {
if cfg.ProxyProtocolVersion != "v1" && cfg.ProxyProtocolVersion != "v2" {
return fmt.Errorf("no support proxy protocol version: %s", cfg.ProxyProtocolVersion)
}
}
if err = cfg.LocalSvrConf.checkForCli(); err != nil { if err = cfg.LocalSvrConf.checkForCli(); err != nil {
return return
} }

View File

@ -126,6 +126,10 @@ type ReqWorkConn struct {
type StartWorkConn struct { type StartWorkConn struct {
ProxyName string `json:"proxy_name"` ProxyName string `json:"proxy_name"`
SrcAddr string `json:"src_addr"`
DstAddr string `json:"dst_addr"`
SrcPort uint16 `json:"src_port"`
DstPort uint16 `json:"dst_port"`
} }
type NewVisitorConn struct { type NewVisitorConn struct {

View File

@ -98,7 +98,7 @@ func (pxy *HttpProxy) GetConf() config.ProxyConf {
} }
func (pxy *HttpProxy) GetRealConn() (workConn frpNet.Conn, err error) { func (pxy *HttpProxy) GetRealConn() (workConn frpNet.Conn, err error) {
tmpConn, errRet := pxy.GetWorkConnFromPool() tmpConn, errRet := pxy.GetWorkConnFromPool(nil, nil)
if errRet != nil { if errRet != nil {
err = errRet err = errRet
return return

View File

@ -17,6 +17,8 @@ package proxy
import ( import (
"fmt" "fmt"
"io" "io"
"net"
"strconv"
"sync" "sync"
"github.com/fatedier/frp/g" "github.com/fatedier/frp/g"
@ -36,7 +38,7 @@ type Proxy interface {
Run() (remoteAddr string, err error) Run() (remoteAddr string, err error)
GetName() string GetName() string
GetConf() config.ProxyConf GetConf() config.ProxyConf
GetWorkConnFromPool() (workConn frpNet.Conn, err error) GetWorkConnFromPool(src, dst net.Addr) (workConn frpNet.Conn, err error)
GetUsedPortsNum() int GetUsedPortsNum() int
Close() Close()
log.Logger log.Logger
@ -70,7 +72,7 @@ func (pxy *BaseProxy) Close() {
} }
} }
func (pxy *BaseProxy) GetWorkConnFromPool() (workConn frpNet.Conn, err error) { func (pxy *BaseProxy) GetWorkConnFromPool(src, dst net.Addr) (workConn frpNet.Conn, err error) {
// try all connections from the pool // try all connections from the pool
for i := 0; i < pxy.poolCount+1; i++ { for i := 0; i < pxy.poolCount+1; i++ {
if workConn, err = pxy.getWorkConnFn(); err != nil { if workConn, err = pxy.getWorkConnFn(); err != nil {
@ -80,8 +82,29 @@ func (pxy *BaseProxy) GetWorkConnFromPool() (workConn frpNet.Conn, err error) {
pxy.Info("get a new work connection: [%s]", workConn.RemoteAddr().String()) pxy.Info("get a new work connection: [%s]", workConn.RemoteAddr().String())
workConn.AddLogPrefix(pxy.GetName()) workConn.AddLogPrefix(pxy.GetName())
var (
srcAddr string
dstAddr string
srcPortStr string
dstPortStr string
srcPort int
dstPort int
)
if src != nil {
srcAddr, srcPortStr, _ = net.SplitHostPort(src.String())
srcPort, _ = strconv.Atoi(srcPortStr)
}
if dst != nil {
dstAddr, dstPortStr, _ = net.SplitHostPort(dst.String())
dstPort, _ = strconv.Atoi(dstPortStr)
}
err := msg.WriteMsg(workConn, &msg.StartWorkConn{ err := msg.WriteMsg(workConn, &msg.StartWorkConn{
ProxyName: pxy.GetName(), ProxyName: pxy.GetName(),
SrcAddr: srcAddr,
SrcPort: uint16(srcPort),
DstAddr: dstAddr,
DstPort: uint16(dstPort),
}) })
if err != nil { if err != nil {
workConn.Warn("failed to send message to work connection from pool: %v, times: %d", err, i) workConn.Warn("failed to send message to work connection from pool: %v, times: %d", err, i)
@ -177,7 +200,7 @@ func HandleUserTcpConnection(pxy Proxy, userConn frpNet.Conn, statsCollector sta
defer userConn.Close() defer userConn.Close()
// try all connections from the pool // try all connections from the pool
workConn, err := pxy.GetWorkConnFromPool() workConn, err := pxy.GetWorkConnFromPool(userConn.RemoteAddr(), userConn.LocalAddr())
if err != nil { if err != nil {
return return
} }

View File

@ -160,7 +160,7 @@ func (pxy *UdpProxy) Run() (remoteAddr string, err error) {
// Sleep a while for waiting control send the NewProxyResp to client. // Sleep a while for waiting control send the NewProxyResp to client.
time.Sleep(500 * time.Millisecond) time.Sleep(500 * time.Millisecond)
for { for {
workConn, err := pxy.GetWorkConnFromPool() workConn, err := pxy.GetWorkConnFromPool(nil, nil)
if err != nil { if err != nil {
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
// check if proxy is closed // check if proxy is closed

View File

@ -44,7 +44,7 @@ func (pxy *XtcpProxy) Run() (remoteAddr string, err error) {
break break
case sidRequest := <-sidCh: case sidRequest := <-sidCh:
sr := sidRequest sr := sidRequest
workConn, errRet := pxy.GetWorkConnFromPool() workConn, errRet := pxy.GetWorkConnFromPool(nil, nil)
if errRet != nil { if errRet != nil {
continue continue
} }