From 511503d34c39ff806ee68f28c45aebfcb78ea27e Mon Sep 17 00:00:00 2001 From: fatedier Date: Tue, 6 Jun 2017 18:48:40 +0800 Subject: [PATCH] io.Copy use pool buffer --- client/proxy.go | 8 +-- models/plugin/http_proxy.go | 4 +- models/plugin/unix_domain_socket.go | 4 +- models/proto/tcp/process.go | 38 ----------- models/proto/tcp/process_test.go | 67 ------------------- server/proxy.go | 8 +-- models/proto/tcp/tcp.go => utils/io/io.go | 24 ++++++- .../tcp/tcp_test.go => utils/io/io_test.go | 47 ++++++++++++- utils/pool/buf.go | 17 +++-- utils/version/version.go | 2 +- 10 files changed, 93 insertions(+), 126 deletions(-) delete mode 100644 models/proto/tcp/process.go delete mode 100644 models/proto/tcp/process_test.go rename models/proto/tcp/tcp.go => utils/io/io.go (77%) rename models/proto/tcp/tcp_test.go => utils/io/io_test.go (72%) diff --git a/client/proxy.go b/client/proxy.go index 50b99563..cd7994c6 100644 --- a/client/proxy.go +++ b/client/proxy.go @@ -24,9 +24,9 @@ import ( "github.com/fatedier/frp/models/config" "github.com/fatedier/frp/models/msg" "github.com/fatedier/frp/models/plugin" - "github.com/fatedier/frp/models/proto/tcp" "github.com/fatedier/frp/models/proto/udp" "github.com/fatedier/frp/utils/errors" + frpIo "github.com/fatedier/frp/utils/io" "github.com/fatedier/frp/utils/log" frpNet "github.com/fatedier/frp/utils/net" ) @@ -277,14 +277,14 @@ func HandleTcpWorkConnection(localInfo *config.LocalSvrConf, proxyPlugin plugin. ) remote = workConn if baseInfo.UseEncryption { - remote, err = tcp.WithEncryption(remote, []byte(config.ClientCommonCfg.PrivilegeToken)) + remote, err = frpIo.WithEncryption(remote, []byte(config.ClientCommonCfg.PrivilegeToken)) if err != nil { workConn.Error("create encryption stream error: %v", err) return } } if baseInfo.UseCompression { - remote = tcp.WithCompression(remote) + remote = frpIo.WithCompression(remote) } if proxyPlugin != nil { @@ -302,7 +302,7 @@ func HandleTcpWorkConnection(localInfo *config.LocalSvrConf, proxyPlugin plugin. workConn.Debug("join connections, localConn(l[%s] r[%s]) workConn(l[%s] r[%s])", localConn.LocalAddr().String(), localConn.RemoteAddr().String(), workConn.LocalAddr().String(), workConn.RemoteAddr().String()) - tcp.Join(localConn, remote) + frpIo.Join(localConn, remote) workConn.Debug("join connections closed") } } diff --git a/models/plugin/http_proxy.go b/models/plugin/http_proxy.go index 9c51e6f5..ceebb983 100644 --- a/models/plugin/http_proxy.go +++ b/models/plugin/http_proxy.go @@ -23,8 +23,8 @@ import ( "strings" "sync" - "github.com/fatedier/frp/models/proto/tcp" "github.com/fatedier/frp/utils/errors" + frpIo "github.com/fatedier/frp/utils/io" frpNet "github.com/fatedier/frp/utils/net" ) @@ -177,7 +177,7 @@ func (hp *HttpProxy) ConnectHandler(rw http.ResponseWriter, req *http.Request) { } client.Write([]byte("HTTP/1.0 200 OK\r\n\r\n")) - go tcp.Join(remote, client) + go frpIo.Join(remote, client) } func (hp *HttpProxy) Auth(rw http.ResponseWriter, req *http.Request) bool { diff --git a/models/plugin/unix_domain_socket.go b/models/plugin/unix_domain_socket.go index 5fd83606..19e06f8a 100644 --- a/models/plugin/unix_domain_socket.go +++ b/models/plugin/unix_domain_socket.go @@ -19,7 +19,7 @@ import ( "io" "net" - "github.com/fatedier/frp/models/proto/tcp" + frpIo "github.com/fatedier/frp/utils/io" ) const PluginUnixDomainSocket = "unix_domain_socket" @@ -57,7 +57,7 @@ func (uds *UnixDomainSocketPlugin) Handle(conn io.ReadWriteCloser) { return } - tcp.Join(localConn, conn) + frpIo.Join(localConn, conn) } func (uds *UnixDomainSocketPlugin) Name() string { diff --git a/models/proto/tcp/process.go b/models/proto/tcp/process.go deleted file mode 100644 index b7b00777..00000000 --- a/models/proto/tcp/process.go +++ /dev/null @@ -1,38 +0,0 @@ -// Copyright 2016 fatedier, fatedier@gmail.com -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package tcp - -import ( - "io" - "sync" -) - -// Join two io.ReadWriteCloser and do some operations. -func Join(c1 io.ReadWriteCloser, c2 io.ReadWriteCloser) (inCount int64, outCount int64) { - var wait sync.WaitGroup - pipe := func(to io.ReadWriteCloser, from io.ReadWriteCloser, count *int64) { - defer to.Close() - defer from.Close() - defer wait.Done() - - *count, _ = io.Copy(to, from) - } - - wait.Add(2) - go pipe(c1, c2, &inCount) - go pipe(c2, c1, &outCount) - wait.Wait() - return -} diff --git a/models/proto/tcp/process_test.go b/models/proto/tcp/process_test.go deleted file mode 100644 index dfeea309..00000000 --- a/models/proto/tcp/process_test.go +++ /dev/null @@ -1,67 +0,0 @@ -// Copyright 2017 fatedier, fatedier@gmail.com -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package tcp - -import ( - "io" - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestJoin(t *testing.T) { - assert := assert.New(t) - - var ( - n int - err error - ) - text1 := "A document that gives tips for writing clear, idiomatic Go code. A must read for any new Go programmer. It augments the tour and the language specification, both of which should be read first." - text2 := "A document that specifies the conditions under which reads of a variable in one goroutine can be guaranteed to observe values produced by writes to the same variable in a different goroutine." - - // Forward bytes directly. - pr, pw := io.Pipe() - pr2, pw2 := io.Pipe() - pr3, pw3 := io.Pipe() - pr4, pw4 := io.Pipe() - - conn1 := WrapReadWriteCloser(pr, pw2, nil) - conn2 := WrapReadWriteCloser(pr2, pw, nil) - conn3 := WrapReadWriteCloser(pr3, pw4, nil) - conn4 := WrapReadWriteCloser(pr4, pw3, nil) - - go func() { - Join(conn2, conn3) - }() - - buf1 := make([]byte, 1024) - buf2 := make([]byte, 1024) - - conn1.Write([]byte(text1)) - conn4.Write([]byte(text2)) - - n, err = conn4.Read(buf1) - assert.NoError(err) - assert.Equal(text1, string(buf1[:n])) - - n, err = conn1.Read(buf2) - assert.NoError(err) - assert.Equal(text2, string(buf2[:n])) - - conn1.Close() - conn2.Close() - conn3.Close() - conn4.Close() -} diff --git a/server/proxy.go b/server/proxy.go index 79ebf7af..6751e245 100644 --- a/server/proxy.go +++ b/server/proxy.go @@ -24,9 +24,9 @@ import ( "github.com/fatedier/frp/models/config" "github.com/fatedier/frp/models/msg" - "github.com/fatedier/frp/models/proto/tcp" "github.com/fatedier/frp/models/proto/udp" "github.com/fatedier/frp/utils/errors" + frpIo "github.com/fatedier/frp/utils/io" "github.com/fatedier/frp/utils/log" frpNet "github.com/fatedier/frp/utils/net" "github.com/fatedier/frp/utils/vhost" @@ -461,20 +461,20 @@ func HandleUserTcpConnection(pxy Proxy, userConn frpNet.Conn) { var local io.ReadWriteCloser = workConn cfg := pxy.GetConf().GetBaseInfo() if cfg.UseEncryption { - local, err = tcp.WithEncryption(local, []byte(config.ServerCommonCfg.PrivilegeToken)) + local, err = frpIo.WithEncryption(local, []byte(config.ServerCommonCfg.PrivilegeToken)) if err != nil { pxy.Error("create encryption stream error: %v", err) return } } if cfg.UseCompression { - local = tcp.WithCompression(local) + local = frpIo.WithCompression(local) } pxy.Debug("join connections, workConn(l[%s] r[%s]) userConn(l[%s] r[%s])", workConn.LocalAddr().String(), workConn.RemoteAddr().String(), userConn.LocalAddr().String(), userConn.RemoteAddr().String()) StatsOpenConnection(pxy.GetName()) - inCount, outCount := tcp.Join(local, userConn) + inCount, outCount := frpIo.Join(local, userConn) StatsCloseConnection(pxy.GetName()) StatsAddTrafficIn(pxy.GetName(), inCount) StatsAddTrafficOut(pxy.GetName(), outCount) diff --git a/models/proto/tcp/tcp.go b/utils/io/io.go similarity index 77% rename from models/proto/tcp/tcp.go rename to utils/io/io.go index e718714a..68d932c7 100644 --- a/models/proto/tcp/tcp.go +++ b/utils/io/io.go @@ -12,16 +12,38 @@ // See the License for the specific language governing permissions and // limitations under the License. -package tcp +package io import ( "io" + "sync" "github.com/golang/snappy" "github.com/fatedier/frp/utils/crypto" + "github.com/fatedier/frp/utils/pool" ) +// Join two io.ReadWriteCloser and do some operations. +func Join(c1 io.ReadWriteCloser, c2 io.ReadWriteCloser) (inCount int64, outCount int64) { + var wait sync.WaitGroup + pipe := func(to io.ReadWriteCloser, from io.ReadWriteCloser, count *int64) { + defer to.Close() + defer from.Close() + defer wait.Done() + + buf := pool.GetBuf(16 * 1024) + defer pool.PutBuf(buf) + *count, _ = io.CopyBuffer(to, from, buf) + } + + wait.Add(2) + go pipe(c1, c2, &inCount) + go pipe(c2, c1, &outCount) + wait.Wait() + return +} + func WithEncryption(rwc io.ReadWriteCloser, key []byte) (io.ReadWriteCloser, error) { w, err := crypto.NewWriter(rwc, key) if err != nil { diff --git a/models/proto/tcp/tcp_test.go b/utils/io/io_test.go similarity index 72% rename from models/proto/tcp/tcp_test.go rename to utils/io/io_test.go index e58d6707..f12b651b 100644 --- a/models/proto/tcp/tcp_test.go +++ b/utils/io/io_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package tcp +package io import ( "io" @@ -21,6 +21,51 @@ import ( "github.com/stretchr/testify/assert" ) +func TestJoin(t *testing.T) { + assert := assert.New(t) + + var ( + n int + err error + ) + text1 := "A document that gives tips for writing clear, idiomatic Go code. A must read for any new Go programmer. It augments the tour and the language specification, both of which should be read first." + text2 := "A document that specifies the conditions under which reads of a variable in one goroutine can be guaranteed to observe values produced by writes to the same variable in a different goroutine." + + // Forward bytes directly. + pr, pw := io.Pipe() + pr2, pw2 := io.Pipe() + pr3, pw3 := io.Pipe() + pr4, pw4 := io.Pipe() + + conn1 := WrapReadWriteCloser(pr, pw2, nil) + conn2 := WrapReadWriteCloser(pr2, pw, nil) + conn3 := WrapReadWriteCloser(pr3, pw4, nil) + conn4 := WrapReadWriteCloser(pr4, pw3, nil) + + go func() { + Join(conn2, conn3) + }() + + buf1 := make([]byte, 1024) + buf2 := make([]byte, 1024) + + conn1.Write([]byte(text1)) + conn4.Write([]byte(text2)) + + n, err = conn4.Read(buf1) + assert.NoError(err) + assert.Equal(text1, string(buf1[:n])) + + n, err = conn1.Read(buf2) + assert.NoError(err) + assert.Equal(text2, string(buf2[:n])) + + conn1.Close() + conn2.Close() + conn3.Close() + conn4.Close() +} + func TestWithCompression(t *testing.T) { assert := assert.New(t) diff --git a/utils/pool/buf.go b/utils/pool/buf.go index 5b7c6d2a..bff58a91 100644 --- a/utils/pool/buf.go +++ b/utils/pool/buf.go @@ -17,15 +17,18 @@ package pool import "sync" var ( - bufPool5k sync.Pool - bufPool2k sync.Pool - bufPool1k sync.Pool - bufPool sync.Pool + bufPool16k sync.Pool + bufPool5k sync.Pool + bufPool2k sync.Pool + bufPool1k sync.Pool + bufPool sync.Pool ) func GetBuf(size int) []byte { var x interface{} - if size >= 5*1024 { + if size >= 16*1024 { + x = bufPool16k.Get() + } else if size >= 5*1024 { x = bufPool5k.Get() } else if size >= 2*1024 { x = bufPool2k.Get() @@ -46,7 +49,9 @@ func GetBuf(size int) []byte { func PutBuf(buf []byte) { size := cap(buf) - if size >= 5*1024 { + if size >= 16*1024 { + bufPool16k.Put(buf) + } else if size >= 5*1024 { bufPool5k.Put(buf) } else if size >= 2*1024 { bufPool2k.Put(buf) diff --git a/utils/version/version.go b/utils/version/version.go index f12b387a..1a1e749f 100644 --- a/utils/version/version.go +++ b/utils/version/version.go @@ -19,7 +19,7 @@ import ( "strings" ) -var version string = "0.11.0" +var version string = "0.12.0" func Full() string { return version