diff --git a/client/proxy/proxy.go b/client/proxy/proxy.go index f208083c..1b29a754 100644 --- a/client/proxy/proxy.go +++ b/client/proxy/proxy.go @@ -757,12 +757,12 @@ func HandleTCPWorkConnection(ctx context.Context, localInfo *config.LocalSvrConf if m.DstAddr == "" { m.DstAddr = "127.0.0.1" } + srcAddr, _ := net.ResolveTCPAddr("tcp", net.JoinHostPort(m.SrcAddr, strconv.Itoa(int(m.SrcPort)))) + dstAddr, _ := net.ResolveTCPAddr("tcp", net.JoinHostPort(m.DstAddr, strconv.Itoa(int(m.DstPort)))) h := &pp.Header{ - Command: pp.PROXY, - SourceAddress: net.ParseIP(m.SrcAddr), - SourcePort: m.SrcPort, - DestinationAddress: net.ParseIP(m.DstAddr), - DestinationPort: m.DstPort, + Command: pp.PROXY, + SourceAddr: srcAddr, + DestinationAddr: dstAddr, } if strings.Contains(m.SrcAddr, ".") { diff --git a/go.mod b/go.mod index 989595e1..aefa0a9e 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/leodido/go-urn v1.2.1 // indirect github.com/onsi/ginkgo v1.16.4 github.com/onsi/gomega v1.13.0 - github.com/pires/go-proxyproto v0.0.0-20190111085350-4d51b51e3bfc + github.com/pires/go-proxyproto v0.5.0 github.com/pquerna/cachecontrol v0.0.0-20180517163645-1555304b9b35 // indirect github.com/prometheus/client_golang v1.11.0 github.com/rakyll/statik v0.1.1 diff --git a/go.sum b/go.sum index 082069a2..a7a52958 100644 --- a/go.sum +++ b/go.sum @@ -260,8 +260,8 @@ github.com/onsi/gomega v1.13.0 h1:7lLHu94wT9Ij0o6EWWclhu0aOh32VxhkwEJvzuWPeak= github.com/onsi/gomega v1.13.0/go.mod h1:lRk9szgn8TxENtWd0Tp4c3wjlRfMTMH27I+3Je41yGY= github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= -github.com/pires/go-proxyproto v0.0.0-20190111085350-4d51b51e3bfc h1:lNOt1SMsgHXTdpuGw+RpnJtzUcCb/oRKZP65pBy9pr8= -github.com/pires/go-proxyproto v0.0.0-20190111085350-4d51b51e3bfc/go.mod h1:6/gX3+E/IYGa0wMORlSMla999awQFdbaeQCHjSMKIzY= +github.com/pires/go-proxyproto v0.5.0 h1:A4Jv4ZCaV3AFJeGh5mGwkz4iuWUYMlQ7IoO/GTuSuLo= +github.com/pires/go-proxyproto v0.5.0/go.mod h1:Odh9VFOZJCf9G8cLW5o435Xf1J95Jw9Gw5rnCjcwzAY= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= diff --git a/hack/run-e2e.sh b/hack/run-e2e.sh index 289265d7..c2de13d0 100755 --- a/hack/run-e2e.sh +++ b/hack/run-e2e.sh @@ -17,4 +17,4 @@ if [ x${LOG_LEVEL} != x"" ]; then logLevel=${LOG_LEVEL} fi -ginkgo -nodes=5 -slowSpecThreshold=10 ${ROOT}/test/e2e -- -frpc-path=${ROOT}/bin/frpc -frps-path=${ROOT}/bin/frps -log-level=${logLevel} -debug=${debug} +ginkgo -nodes=5 -slowSpecThreshold=20 ${ROOT}/test/e2e -- -frpc-path=${ROOT}/bin/frpc -frps-path=${ROOT}/bin/frps -log-level=${logLevel} -debug=${debug} diff --git a/test/e2e/basic/http.go b/test/e2e/basic/http.go index f94c0044..b8cb448c 100644 --- a/test/e2e/basic/http.go +++ b/test/e2e/basic/http.go @@ -272,7 +272,7 @@ var _ = Describe("[Feature: HTTP]", func() { Ensure() }) - It("websocket", func() { + It("Websocket protocol", func() { vhostHTTPPort := f.AllocPort() serverConf := getDefaultServerConf(vhostHTTPPort) diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go index ea0bfdfa..2dddb952 100644 --- a/test/e2e/e2e_test.go +++ b/test/e2e/e2e_test.go @@ -11,6 +11,7 @@ import ( // test source _ "github.com/fatedier/frp/test/e2e/basic" + _ "github.com/fatedier/frp/test/e2e/features" _ "github.com/fatedier/frp/test/e2e/plugin" _ "github.com/onsi/ginkgo" diff --git a/test/e2e/features/bandwidth_limit.go b/test/e2e/features/bandwidth_limit.go new file mode 100644 index 00000000..f984fdb9 --- /dev/null +++ b/test/e2e/features/bandwidth_limit.go @@ -0,0 +1,47 @@ +package features + +import ( + "fmt" + "strings" + "time" + + "github.com/fatedier/frp/test/e2e/framework" + "github.com/fatedier/frp/test/e2e/framework/consts" + "github.com/fatedier/frp/test/e2e/mock/server/streamserver" + "github.com/fatedier/frp/test/e2e/pkg/request" + + . "github.com/onsi/ginkgo" +) + +var _ = Describe("[Feature: Bandwidth Limit]", func() { + f := framework.NewDefaultFramework() + + It("Proxy Bandwidth Limit", func() { + serverConf := consts.DefaultServerConfig + clientConf := consts.DefaultClientConfig + + localPort := f.AllocPort() + localServer := streamserver.New(streamserver.TCP, streamserver.WithBindPort(localPort)) + f.RunServer("", localServer) + + remotePort := f.AllocPort() + clientConf += fmt.Sprintf(` + [tcp] + type = tcp + local_port = %d + remote_port = %d + bandwidth_limit = 10KB + `, localPort, remotePort) + + f.RunProcesses([]string{serverConf}, []string{clientConf}) + + content := strings.Repeat("a", 50*1024) // 5KB + start := time.Now() + framework.NewRequestExpect(f).Port(remotePort).RequestModify(func(r *request.Request) { + r.Body([]byte(content)).Timeout(30 * time.Second) + }).ExpectResp([]byte(content)).Ensure() + duration := time.Now().Sub(start) + + framework.ExpectTrue(duration.Seconds() > 7, "100Kb with 10KB limit, want > 7 seconds, but got %d seconds", duration.Seconds()) + }) +}) diff --git a/test/e2e/basic/chaos.go b/test/e2e/features/chaos.go similarity index 99% rename from test/e2e/basic/chaos.go rename to test/e2e/features/chaos.go index ad328114..2049fc77 100644 --- a/test/e2e/basic/chaos.go +++ b/test/e2e/features/chaos.go @@ -1,4 +1,4 @@ -package basic +package features import ( "fmt" diff --git a/test/e2e/basic/group.go b/test/e2e/features/group.go similarity index 99% rename from test/e2e/basic/group.go rename to test/e2e/features/group.go index 423977dc..ea6b781f 100644 --- a/test/e2e/basic/group.go +++ b/test/e2e/features/group.go @@ -1,4 +1,4 @@ -package basic +package features import ( "fmt" diff --git a/test/e2e/features/real_ip.go b/test/e2e/features/real_ip.go new file mode 100644 index 00000000..857633a0 --- /dev/null +++ b/test/e2e/features/real_ip.go @@ -0,0 +1,20 @@ +package features + +import ( + "github.com/fatedier/frp/test/e2e/framework" + + . "github.com/onsi/ginkgo" +) + +var _ = Describe("[Feature: Real IP]", func() { + f := framework.NewDefaultFramework() + + It("HTTP X-Forwarded-For", func() { + // TODO + _ = f + }) + + It("Proxy Protocol", func() { + // TODO + }) +}) diff --git a/test/e2e/framework/mockservers.go b/test/e2e/framework/mockservers.go index e6a2cd40..af41ab90 100644 --- a/test/e2e/framework/mockservers.go +++ b/test/e2e/framework/mockservers.go @@ -31,8 +31,8 @@ func NewMockServers(portAllocator *port.Allocator) *MockServers { tcpPort := portAllocator.Get() udpPort := portAllocator.Get() httpPort := portAllocator.Get() - s.tcpEchoServer = streamserver.New(streamserver.TCP, streamserver.WithBindPort(tcpPort), streamserver.WithEchoMode(true)) - s.udpEchoServer = streamserver.New(streamserver.UDP, streamserver.WithBindPort(udpPort), streamserver.WithEchoMode(true)) + s.tcpEchoServer = streamserver.New(streamserver.TCP, streamserver.WithBindPort(tcpPort)) + s.udpEchoServer = streamserver.New(streamserver.UDP, streamserver.WithBindPort(udpPort)) s.httpSimpleServer = httpserver.New(httpserver.WithBindPort(httpPort), httpserver.WithHandler(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { w.Write([]byte(consts.TestString)) }))) @@ -40,7 +40,7 @@ func NewMockServers(portAllocator *port.Allocator) *MockServers { udsIndex := portAllocator.Get() udsAddr := fmt.Sprintf("%s/frp_echo_server_%d.sock", os.TempDir(), udsIndex) os.Remove(udsAddr) - s.udsEchoServer = streamserver.New(streamserver.Unix, streamserver.WithBindAddr(udsAddr), streamserver.WithEchoMode(true)) + s.udsEchoServer = streamserver.New(streamserver.Unix, streamserver.WithBindAddr(udsAddr)) return s } diff --git a/test/e2e/mock/server/streamserver/server.go b/test/e2e/mock/server/streamserver/server.go index 1714721e..37b96ee9 100644 --- a/test/e2e/mock/server/streamserver/server.go +++ b/test/e2e/mock/server/streamserver/server.go @@ -5,6 +5,7 @@ import ( "net" libnet "github.com/fatedier/frp/pkg/util/net" + "github.com/fatedier/frp/test/e2e/pkg/rpc" ) type Type string @@ -20,9 +21,6 @@ type Server struct { bindAddr string bindPort int respContent []byte - bufSize int64 - - echoMode bool l net.Listener } @@ -33,7 +31,6 @@ func New(netType Type, options ...Option) *Server { s := &Server{ netType: netType, bindAddr: "127.0.0.1", - bufSize: 2048, } for _, option := range options { @@ -63,20 +60,6 @@ func WithRespContent(content []byte) Option { } } -func WithBufSize(bufSize int64) Option { - return func(s *Server) *Server { - s.bufSize = bufSize - return s - } -} - -func WithEchoMode(echoMode bool) Option { - return func(s *Server) *Server { - s.echoMode = echoMode - return s - } -} - func (s *Server) Run() error { if err := s.initListener(); err != nil { return err @@ -118,18 +101,16 @@ func (s *Server) initListener() (err error) { func (s *Server) handle(c net.Conn) { defer c.Close() - buf := make([]byte, s.bufSize) for { - n, err := c.Read(buf) + buf, err := rpc.ReadBytes(c) if err != nil { return } - if s.echoMode { - c.Write(buf[:n]) - } else { - c.Write(s.respContent) + if len(s.respContent) > 0 { + buf = s.respContent } + rpc.WriteBytes(c, buf) } } diff --git a/test/e2e/pkg/request/request.go b/test/e2e/pkg/request/request.go index 68a5572f..9a457b8c 100644 --- a/test/e2e/pkg/request/request.go +++ b/test/e2e/pkg/request/request.go @@ -11,6 +11,7 @@ import ( "strconv" "time" + "github.com/fatedier/frp/test/e2e/pkg/rpc" libnet "github.com/fatedier/golib/net" ) @@ -210,15 +211,14 @@ func sendHTTPRequest(method, urlstr string, host string, headers map[string]stri } func sendRequestByConn(c net.Conn, content []byte) ([]byte, error) { - _, err := c.Write(content) + _, err := rpc.WriteBytes(c, content) if err != nil { return nil, fmt.Errorf("write error: %v", err) } - buf := make([]byte, 2048) - n, err := c.Read(buf) + buf, err := rpc.ReadBytes(c) if err != nil { return nil, fmt.Errorf("read error: %v", err) } - return buf[:n], nil + return buf, nil } diff --git a/test/e2e/pkg/rpc/rpc.go b/test/e2e/pkg/rpc/rpc.go new file mode 100644 index 00000000..6303d35c --- /dev/null +++ b/test/e2e/pkg/rpc/rpc.go @@ -0,0 +1,35 @@ +package rpc + +import ( + "bufio" + "bytes" + "encoding/binary" + "errors" + "io" +) + +func WriteBytes(w io.Writer, buf []byte) (int, error) { + out := bytes.NewBuffer(nil) + binary.Write(out, binary.BigEndian, int64(len(buf))) + out.Write(buf) + return w.Write(out.Bytes()) +} + +func ReadBytes(r io.Reader) ([]byte, error) { + // To compatible with UDP connection, use bufio reader here to avoid lost conent. + rd := bufio.NewReader(r) + + var length int64 + if err := binary.Read(rd, binary.BigEndian, &length); err != nil { + return nil, err + } + buffer := make([]byte, length) + n, err := io.ReadFull(rd, buffer) + if err != nil { + return nil, err + } + if int64(n) != length { + return nil, errors.New("invalid length") + } + return buffer, nil +}