From 8f5f0b0a9a7766450205a82697d1d0476f772588 Mon Sep 17 00:00:00 2001 From: fatedier Date: Fri, 29 Jul 2016 23:08:00 +0800 Subject: [PATCH] all: new feature connection pool --- conf/frpc.ini | 3 + conf/frps.ini | 2 + src/frp/cmd/frpc/control.go | 5 +- src/frp/cmd/frps/control.go | 7 +++ src/frp/models/client/config.go | 13 ++++- src/frp/models/config/config.go | 1 + src/frp/models/metric/server.go | 27 ++++++--- src/frp/models/msg/msg.go | 1 + src/frp/models/server/config.go | 9 +++ src/frp/models/server/server.go | 98 +++++++++++++++++++++++++-------- src/frp/utils/conn/conn.go | 39 ++++++++++++- 11 files changed, 170 insertions(+), 35 deletions(-) diff --git a/conf/frpc.ini b/conf/frpc.ini index bbab6d09..d70760d9 100644 --- a/conf/frpc.ini +++ b/conf/frpc.ini @@ -22,6 +22,8 @@ local_port = 22 use_encryption = true # default is false use_gzip = false +# connections will be established in advance, default value is zero +pool_count = 10 # Resolve your domain names to [server_addr] so you can use http://web01.yourdomain.com to browse web01 and http://web02.yourdomain.com to browse web02, the domains are set in frps.ini [web01] @@ -29,6 +31,7 @@ type = http local_ip = 127.0.0.1 local_port = 80 use_gzip = true +pool_count = 20 [web02] type = http diff --git a/conf/frps.ini b/conf/frps.ini index aada47bf..f4752b4c 100644 --- a/conf/frps.ini +++ b/conf/frps.ini @@ -15,6 +15,8 @@ log_max_days = 3 # if you enable privilege mode, frpc can create a proxy without pre-configure in frps when privilege_token is correct privilege_mode = true privilege_token = 12345678 +# pool_count in each proxy will change to max_pool_count if they exceed the maximum value +max_pool_count = 100 # ssh is the proxy name, client will use this name and auth_token to connect to server [ssh] diff --git a/src/frp/cmd/frpc/control.go b/src/frp/cmd/frpc/control.go index 9e8c3e62..2b2a7eec 100644 --- a/src/frp/cmd/frpc/control.go +++ b/src/frp/cmd/frpc/control.go @@ -142,6 +142,7 @@ func loginToServer(cli *client.ProxyClient) (c *conn.Conn, err error) { ProxyName: cli.Name, UseEncryption: cli.UseEncryption, UseGzip: cli.UseGzip, + PoolCount: cli.PoolCount, PrivilegeMode: cli.PrivilegeMode, ProxyType: cli.Type, Timestamp: nowTime, @@ -181,7 +182,7 @@ func loginToServer(cli *client.ProxyClient) (c *conn.Conn, err error) { return c, fmt.Errorf("%s", ctlRes.Msg) } - log.Debug("ProxyName [%s], connect to server [%s:%d] success!", cli.Name, client.ServerAddr, client.ServerPort) + log.Info("ProxyName [%s], connect to server [%s:%d] success!", cli.Name, client.ServerAddr, client.ServerPort) return } @@ -199,5 +200,5 @@ func heartbeatSender(c *conn.Conn, msgSendChan chan interface{}) { break } } - log.Debug("Heartbeat goroutine exit") + log.Info("Heartbeat goroutine exit") } diff --git a/src/frp/cmd/frps/control.go b/src/frp/cmd/frps/control.go index 61660bfa..f51b905a 100644 --- a/src/frp/cmd/frps/control.go +++ b/src/frp/cmd/frps/control.go @@ -276,6 +276,13 @@ func doLogin(req *msg.ControlReq, c *conn.Conn) (ret int64, info string) { // set infomations from frpc s.UseEncryption = req.UseEncryption s.UseGzip = req.UseGzip + if req.PoolCount > server.MaxPoolCount { + s.PoolCount = server.MaxPoolCount + } else if req.PoolCount < 0 { + s.PoolCount = 0 + } else { + s.PoolCount = req.PoolCount + } // start proxy and listen for user connections, no block err := s.Start(c) diff --git a/src/frp/models/client/config.go b/src/frp/models/client/config.go index 0d942e95..7269227d 100644 --- a/src/frp/models/client/config.go +++ b/src/frp/models/client/config.go @@ -147,10 +147,21 @@ func LoadConf(confFile string) (err error) { proxyClient.PrivilegeMode = true } + // pool_count + proxyClient.PoolCount = 0 + tmpStr, ok = section["pool_count"] + if ok { + tmpInt, err := strconv.ParseInt(tmpStr, 10, 64) + if err != nil || tmpInt < 0 { + return fmt.Errorf("Parse conf error: proxy [%s] pool_count error", proxyClient.Name) + } + proxyClient.PoolCount = tmpInt + } + // configures used in privilege mode if proxyClient.PrivilegeMode == true { if PrivilegeToken == "" { - return fmt.Errorf("Parse conf error: proxy [%s] privilege_key must be set when privilege_mode = true", proxyClient.Name) + return fmt.Errorf("Parse conf error: proxy [%s] privilege_token must be set when privilege_mode = true", proxyClient.Name) } else { proxyClient.PrivilegeToken = PrivilegeToken } diff --git a/src/frp/models/config/config.go b/src/frp/models/config/config.go index 14200eb4..f7cc5098 100644 --- a/src/frp/models/config/config.go +++ b/src/frp/models/config/config.go @@ -22,4 +22,5 @@ type BaseConf struct { UseGzip bool PrivilegeMode bool PrivilegeToken string + PoolCount int64 } diff --git a/src/frp/models/metric/server.go b/src/frp/models/metric/server.go index 9f6585df..49920b8a 100644 --- a/src/frp/models/metric/server.go +++ b/src/frp/models/metric/server.go @@ -15,7 +15,6 @@ package metric import ( - "encoding/json" "sort" "sync" "time" @@ -64,15 +63,29 @@ func init() { ServerMetricInfoMap = make(map[string]*ServerMetric) } +func (s *ServerMetric) clone() *ServerMetric { + copy := *s + copy.CustomDomains = make([]string, len(s.CustomDomains)) + var i int + for i = range copy.CustomDomains { + copy.CustomDomains[i] = s.CustomDomains[i] + } + + copy.Daily = make([]*DailyServerStats, len(s.Daily)) + for i = range copy.Daily { + tmpDaily := *s.Daily[i] + copy.Daily[i] = &tmpDaily + } + return © +} + func GetAllProxyMetrics() []*ServerMetric { result := make(ServerMetricList, 0) smMutex.RLock() for _, metric := range ServerMetricInfoMap { metric.mutex.RLock() - byteBuf, _ := json.Marshal(metric) + tmpMetric := metric.clone() metric.mutex.RUnlock() - tmpMetric := &ServerMetric{} - json.Unmarshal(byteBuf, &tmpMetric) result = append(result, tmpMetric) } smMutex.RUnlock() @@ -88,9 +101,9 @@ func GetProxyMetrics(proxyName string) *ServerMetric { defer smMutex.RUnlock() metric, ok := ServerMetricInfoMap[proxyName] if ok { - byteBuf, _ := json.Marshal(metric) - tmpMetric := &ServerMetric{} - json.Unmarshal(byteBuf, &tmpMetric) + metric.mutex.RLock() + tmpMetric := metric.clone() + metric.mutex.RUnlock() return tmpMetric } else { return nil diff --git a/src/frp/models/msg/msg.go b/src/frp/models/msg/msg.go index e89bce1c..55590dda 100644 --- a/src/frp/models/msg/msg.go +++ b/src/frp/models/msg/msg.go @@ -26,6 +26,7 @@ type ControlReq struct { AuthKey string `json:"auth_key"` UseEncryption bool `json:"use_encryption"` UseGzip bool `json:"use_gzip"` + PoolCount int64 `json:"pool_count"` // configures used if privilege_mode is enabled PrivilegeMode bool `json:"privilege_mode"` diff --git a/src/frp/models/server/config.go b/src/frp/models/server/config.go index a914e197..67f6488e 100644 --- a/src/frp/models/server/config.go +++ b/src/frp/models/server/config.go @@ -42,6 +42,7 @@ var ( LogMaxDays int64 = 3 PrivilegeMode bool = false PrivilegeToken string = "" + MaxPoolCount int64 = 100 HeartBeatTimeout int64 = 90 UserConnTimeout int64 = 10 @@ -155,6 +156,14 @@ func loadCommonConf(confFile string) error { return fmt.Errorf("Parse conf error: privilege_token must be set if privilege_mode is enabled") } } + + tmpStr, ok = conf.Get("common", "max_pool_count") + if ok { + v, err := strconv.ParseInt(tmpStr, 10, 64) + if err == nil && v >= 0 { + MaxPoolCount = v + } + } return nil } diff --git a/src/frp/models/server/server.go b/src/frp/models/server/server.go index 139c9899..99aff28e 100644 --- a/src/frp/models/server/server.go +++ b/src/frp/models/server/server.go @@ -43,7 +43,8 @@ type ProxyServer struct { listeners []Listener // accept new connection from remote users ctlMsgChan chan int64 // every time accept a new user conn, put "1" to the channel workConnChan chan *conn.Conn // get new work conns from control goroutine - mutex sync.Mutex + mutex sync.RWMutex + closeChan chan struct{} // for notify other goroutines that the proxy is closed by close this channel } func NewProxyServer() (p *ProxyServer) { @@ -71,9 +72,10 @@ func (p *ProxyServer) Init() { p.Lock() p.Status = consts.Idle metric.SetStatus(p.Name, p.Status) - p.workConnChan = make(chan *conn.Conn, 100) - p.ctlMsgChan = make(chan int64) + p.workConnChan = make(chan *conn.Conn, p.PoolCount+10) + p.ctlMsgChan = make(chan int64, p.PoolCount+10) p.listeners = make([]Listener, 0) + p.closeChan = make(chan struct{}) p.Unlock() } @@ -134,6 +136,11 @@ func (p *ProxyServer) Start(c *conn.Conn) (err error) { p.Unlock() metric.SetStatus(p.Name, p.Status) + // create connection pool if needed + if p.PoolCount > 0 { + go p.connectionPoolManager(p.closeChan) + } + // start a goroutine for every listener to accept user connection for _, listener := range p.listeners { go func(l Listener) { @@ -153,7 +160,7 @@ func (p *ProxyServer) Start(c *conn.Conn) (err error) { return } - // start another goroutine for join two conns from frpc and user + // start another goroutine for join two connections between frpc and user go func() { workConn, err := p.getWorkConn() if err != nil { @@ -161,14 +168,14 @@ func (p *ProxyServer) Start(c *conn.Conn) (err error) { } userConn := c - // msg will transfer to another without modifying + // message will be transferred to another without modifying // l means local, r means remote log.Debug("Join two connections, (l[%s] r[%s]) (l[%s] r[%s])", workConn.GetLocalAddr(), workConn.GetRemoteAddr(), userConn.GetLocalAddr(), userConn.GetRemoteAddr()) + metric.OpenConnection(p.Name) needRecord := true go msg.JoinMore(userConn, workConn, p.BaseConf, needRecord) - metric.OpenConnection(p.Name) }() } }(listener) @@ -187,6 +194,7 @@ func (p *ProxyServer) Close() { } close(p.ctlMsgChan) close(p.workConnChan) + close(p.closeChan) if p.CtlConn != nil { p.CtlConn.Close() } @@ -210,7 +218,11 @@ func (p *ProxyServer) WaitUserConn() (closeFlag bool) { } func (p *ProxyServer) RegisterNewWorkConn(c *conn.Conn) { - p.workConnChan <- c + select { + case p.workConnChan <- c: + default: + log.Debug("ProxyName [%s], workConnChan is full, so close this work connection", p.Name) + } } // When frps get one user connection, we get one work connection from the pool and return it. @@ -219,30 +231,72 @@ func (p *ProxyServer) RegisterNewWorkConn(c *conn.Conn) { // return an error if wait timeout func (p *ProxyServer) getWorkConn() (workConn *conn.Conn, err error) { var ok bool - // get a work connection from the pool - select { - case workConn, ok = <-p.workConnChan: - if !ok { - err = fmt.Errorf("ProxyName [%s], no work connections available, control is closing", p.Name) - return - } - default: - // no work connections available in the poll, send message to frpc to get one - p.ctlMsgChan <- 1 - + for { select { case workConn, ok = <-p.workConnChan: if !ok { err = fmt.Errorf("ProxyName [%s], no work connections available, control is closing", p.Name) return } + default: + // no work connections available in the poll, send message to frpc to get more + p.ctlMsgChan <- 1 - case <-time.After(time.Duration(UserConnTimeout) * time.Second): - log.Warn("ProxyName [%s], timeout trying to get work connection", p.Name) - err = fmt.Errorf("ProxyName [%s], timeout trying to get work connection", p.Name) - return + select { + case workConn, ok = <-p.workConnChan: + if !ok { + err = fmt.Errorf("ProxyName [%s], no work connections available, control is closing", p.Name) + return + } + + case <-time.After(time.Duration(UserConnTimeout) * time.Second): + log.Warn("ProxyName [%s], timeout trying to get work connection", p.Name) + err = fmt.Errorf("ProxyName [%s], timeout trying to get work connection", p.Name) + return + } + } + + // if connection pool is not used, we don't check the status + // function CheckClosed will consume at least 1 millisecond if the connection isn't closed + if p.PoolCount == 0 || !workConn.CheckClosed() { + break + } else { + log.Warn("ProxyName [%s], connection got from pool, but it's already closed", p.Name) } } return } + +func (p *ProxyServer) connectionPoolManager(closeCh <-chan struct{}) { + for { + // check if we need more work connections and send messages to frpc to get more + time.Sleep(time.Duration(2) * time.Second) + select { + // if the channel closed, it means the proxy is closed, so just return + case <-closeCh: + log.Info("ProxyName [%s], connectionPoolManager exit", p.Name) + return + default: + curWorkConnNum := int64(len(p.workConnChan)) + diff := p.PoolCount - curWorkConnNum + if diff > 0 { + if diff < p.PoolCount/5 { + diff = p.PoolCount*4/5 + 1 + } else if diff < p.PoolCount/2 { + diff = p.PoolCount/4 + 1 + } else if diff < p.PoolCount*4/5 { + diff = p.PoolCount/5 + 1 + } else { + diff = p.PoolCount/10 + 1 + } + if diff+curWorkConnNum > p.PoolCount { + diff = p.PoolCount - curWorkConnNum + } + for i := 0; i < int(diff); i++ { + p.ctlMsgChan <- 1 + } + } + } + } +} diff --git a/src/frp/utils/conn/conn.go b/src/frp/utils/conn/conn.go index ed330f68..0bfe9648 100644 --- a/src/frp/utils/conn/conn.go +++ b/src/frp/utils/conn/conn.go @@ -145,8 +145,11 @@ func (c *Conn) Write(content string) (err error) { } func (c *Conn) SetDeadline(t time.Time) error { - err := c.TcpConn.SetDeadline(t) - return err + return c.TcpConn.SetDeadline(t) +} + +func (c *Conn) SetReadDeadline(t time.Time) error { + return c.TcpConn.SetReadDeadline(t) } func (c *Conn) Close() { @@ -160,7 +163,37 @@ func (c *Conn) Close() { func (c *Conn) IsClosed() (closeFlag bool) { c.mutex.RLock() + defer c.mutex.RUnlock() closeFlag = c.closeFlag - c.mutex.RUnlock() return } + +// when you call this function, you should make sure that +// remote client won't send any bytes to this socket +func (c *Conn) CheckClosed() bool { + c.mutex.RLock() + if c.closeFlag { + return true + } + c.mutex.RUnlock() + + // err := c.TcpConn.SetReadDeadline(time.Now().Add(100 * time.Microsecond)) + err := c.TcpConn.SetReadDeadline(time.Now().Add(time.Millisecond)) + if err != nil { + c.Close() + return true + } + + var tmp []byte = make([]byte, 1) + _, err = c.TcpConn.Read(tmp) + if err == io.EOF { + return true + } + + err = c.TcpConn.SetReadDeadline(time.Time{}) + if err != nil { + c.Close() + return true + } + return false +}