diff --git a/server/nathole.go b/models/nathole/nathole.go similarity index 99% rename from server/nathole.go rename to models/nathole/nathole.go index cc8339f4..1e120ae2 100644 --- a/server/nathole.go +++ b/models/nathole/nathole.go @@ -1,4 +1,4 @@ -package server +package nathole import ( "bytes" diff --git a/server/control.go b/server/control.go index 162f6e5b..90a35844 100644 --- a/server/control.go +++ b/server/control.go @@ -26,6 +26,9 @@ import ( "github.com/fatedier/frp/models/consts" frpErr "github.com/fatedier/frp/models/errors" "github.com/fatedier/frp/models/msg" + "github.com/fatedier/frp/server/controller" + "github.com/fatedier/frp/server/proxy" + "github.com/fatedier/frp/server/stats" "github.com/fatedier/frp/utils/net" "github.com/fatedier/frp/utils/version" @@ -34,9 +37,53 @@ import ( "github.com/fatedier/golib/errors" ) +type ControlManager struct { + // controls indexed by run id + ctlsByRunId map[string]*Control + + mu sync.RWMutex +} + +func NewControlManager() *ControlManager { + return &ControlManager{ + ctlsByRunId: make(map[string]*Control), + } +} + +func (cm *ControlManager) Add(runId string, ctl *Control) (oldCtl *Control) { + cm.mu.Lock() + defer cm.mu.Unlock() + + oldCtl, ok := cm.ctlsByRunId[runId] + if ok { + oldCtl.Replaced(ctl) + } + cm.ctlsByRunId[runId] = ctl + return +} + +func (cm *ControlManager) Del(runId string) { + cm.mu.Lock() + defer cm.mu.Unlock() + delete(cm.ctlsByRunId, runId) +} + +func (cm *ControlManager) GetById(runId string) (ctl *Control, ok bool) { + cm.mu.RLock() + defer cm.mu.RUnlock() + ctl, ok = cm.ctlsByRunId[runId] + return +} + type Control struct { // all resource managers and controllers - rc *ResourceController + rc *controller.ResourceController + + // proxy manager + pxyManager *proxy.ProxyManager + + // stats collector to store stats info of clients and proxies + statsCollector stats.Collector // login message loginMsg *msg.Login @@ -54,7 +101,7 @@ type Control struct { workConnCh chan net.Conn // proxies in one client - proxies map[string]Proxy + proxies map[string]proxy.Proxy // pool count poolCount int @@ -81,15 +128,19 @@ type Control struct { mu sync.RWMutex } -func NewControl(rc *ResourceController, ctlConn net.Conn, loginMsg *msg.Login) *Control { +func NewControl(rc *controller.ResourceController, pxyManager *proxy.ProxyManager, + statsCollector stats.Collector, ctlConn net.Conn, loginMsg *msg.Login) *Control { + return &Control{ rc: rc, + pxyManager: pxyManager, + statsCollector: statsCollector, conn: ctlConn, loginMsg: loginMsg, sendCh: make(chan msg.Message, 10), readCh: make(chan msg.Message, 10), workConnCh: make(chan net.Conn, loginMsg.PoolCount+10), - proxies: make(map[string]Proxy), + proxies: make(map[string]proxy.Proxy), poolCount: loginMsg.PoolCount, portsUsedNum: 0, lastPing: time.Now(), @@ -284,15 +335,22 @@ func (ctl *Control) stoper() { for _, pxy := range ctl.proxies { pxy.Close() - ctl.rc.PxyManager.Del(pxy.GetName()) - StatsCloseProxy(pxy.GetName(), pxy.GetConf().GetBaseInfo().ProxyType) + ctl.pxyManager.Del(pxy.GetName()) + ctl.statsCollector.Mark(stats.TypeCloseProxy, &stats.CloseProxyPayload{ + Name: pxy.GetName(), + ProxyType: pxy.GetConf().GetBaseInfo().ProxyType, + }) } ctl.allShutdown.Done() - ctl.rc.CtlManager.Del(ctl.runId) ctl.conn.Info("client exit success") - StatsCloseClient() + ctl.statsCollector.Mark(stats.TypeCloseClient, &stats.CloseClientPayload{}) +} + +// block until Control closed +func (ctl *Control) WaitClosed() { + ctl.allShutdown.WaitDone() } func (ctl *Control) manager() { @@ -334,7 +392,10 @@ func (ctl *Control) manager() { } else { resp.RemoteAddr = remoteAddr ctl.conn.Info("new proxy [%s] success", m.ProxyName) - StatsNewProxy(m.ProxyName, m.ProxyType) + ctl.statsCollector.Mark(stats.TypeNewProxy, &stats.NewProxyPayload{ + Name: m.ProxyName, + ProxyType: m.ProxyType, + }) } ctl.sendCh <- resp case *msg.CloseProxy: @@ -359,7 +420,7 @@ func (ctl *Control) RegisterProxy(pxyMsg *msg.NewProxy) (remoteAddr string, err // NewProxy will return a interface Proxy. // In fact it create different proxies by different proxy type, we just call run() here. - pxy, err := NewProxy(ctl.runId, ctl.rc, ctl.poolCount, ctl.GetWorkConn, pxyConf) + pxy, err := proxy.NewProxy(ctl.runId, ctl.rc, ctl.statsCollector, ctl.poolCount, ctl.GetWorkConn, pxyConf) if err != nil { return remoteAddr, err } @@ -394,7 +455,7 @@ func (ctl *Control) RegisterProxy(pxyMsg *msg.NewProxy) (remoteAddr string, err } }() - err = ctl.rc.PxyManager.Add(pxyMsg.ProxyName, pxy) + err = ctl.pxyManager.Add(pxyMsg.ProxyName, pxy) if err != nil { return } @@ -417,10 +478,13 @@ func (ctl *Control) CloseProxy(closeMsg *msg.CloseProxy) (err error) { ctl.portsUsedNum = ctl.portsUsedNum - pxy.GetUsedPortsNum() } pxy.Close() - ctl.rc.PxyManager.Del(pxy.GetName()) + ctl.pxyManager.Del(pxy.GetName()) delete(ctl.proxies, closeMsg.ProxyName) ctl.mu.Unlock() - StatsCloseProxy(pxy.GetName(), pxy.GetConf().GetBaseInfo().ProxyType) + ctl.statsCollector.Mark(stats.TypeCloseProxy, &stats.CloseProxyPayload{ + Name: pxy.GetName(), + ProxyType: pxy.GetConf().GetBaseInfo().ProxyType, + }) return } diff --git a/server/controller/resource.go b/server/controller/resource.go new file mode 100644 index 00000000..fce8b3e2 --- /dev/null +++ b/server/controller/resource.go @@ -0,0 +1,46 @@ +// Copyright 2019 fatedier, fatedier@gmail.com +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package controller + +import ( + "github.com/fatedier/frp/models/nathole" + "github.com/fatedier/frp/server/group" + "github.com/fatedier/frp/server/ports" + "github.com/fatedier/frp/utils/vhost" +) + +// All resource managers and controllers +type ResourceController struct { + // Manage all visitor listeners + VisitorManager *VisitorManager + + // Tcp Group Controller + TcpGroupCtl *group.TcpGroupCtl + + // Manage all tcp ports + TcpPortManager *ports.PortManager + + // Manage all udp ports + UdpPortManager *ports.PortManager + + // For http proxies, forwarding http requests + HttpReverseProxy *vhost.HttpReverseProxy + + // For https proxies, route requests to different clients by hostname and other infomation + VhostHttpsMuxer *vhost.HttpsMuxer + + // Controller for nat hole connections + NatHoleController *nathole.NatHoleController +} diff --git a/server/manager.go b/server/controller/visitor.go similarity index 62% rename from server/manager.go rename to server/controller/visitor.go index 50b3d9bf..ea8a53e8 100644 --- a/server/manager.go +++ b/server/controller/visitor.go @@ -1,4 +1,4 @@ -// Copyright 2017 fatedier, fatedier@gmail.com +// Copyright 2019 fatedier, fatedier@gmail.com // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package server +package controller import ( "fmt" @@ -25,81 +25,6 @@ import ( frpIo "github.com/fatedier/golib/io" ) -type ControlManager struct { - // controls indexed by run id - ctlsByRunId map[string]*Control - - mu sync.RWMutex -} - -func NewControlManager() *ControlManager { - return &ControlManager{ - ctlsByRunId: make(map[string]*Control), - } -} - -func (cm *ControlManager) Add(runId string, ctl *Control) (oldCtl *Control) { - cm.mu.Lock() - defer cm.mu.Unlock() - - oldCtl, ok := cm.ctlsByRunId[runId] - if ok { - oldCtl.Replaced(ctl) - } - cm.ctlsByRunId[runId] = ctl - return -} - -func (cm *ControlManager) Del(runId string) { - cm.mu.Lock() - defer cm.mu.Unlock() - delete(cm.ctlsByRunId, runId) -} - -func (cm *ControlManager) GetById(runId string) (ctl *Control, ok bool) { - cm.mu.RLock() - defer cm.mu.RUnlock() - ctl, ok = cm.ctlsByRunId[runId] - return -} - -type ProxyManager struct { - // proxies indexed by proxy name - pxys map[string]Proxy - - mu sync.RWMutex -} - -func NewProxyManager() *ProxyManager { - return &ProxyManager{ - pxys: make(map[string]Proxy), - } -} - -func (pm *ProxyManager) Add(name string, pxy Proxy) error { - pm.mu.Lock() - defer pm.mu.Unlock() - if _, ok := pm.pxys[name]; ok { - return fmt.Errorf("proxy name [%s] is already in use", name) - } - - pm.pxys[name] = pxy - return nil -} - -func (pm *ProxyManager) Del(name string) { - pm.mu.Lock() - defer pm.mu.Unlock() - delete(pm.pxys, name) -} - -func (pm *ProxyManager) GetByName(name string) (pxy Proxy, ok bool) { - pm.mu.RLock() - defer pm.mu.RUnlock() - pxy, ok = pm.pxys[name] - return -} - // Manager for visitor listeners. type VisitorManager struct { visitorListeners map[string]*frpNet.CustomListener diff --git a/server/dashboard_api.go b/server/dashboard_api.go index 8c931399..cc7e7bb6 100644 --- a/server/dashboard_api.go +++ b/server/dashboard_api.go @@ -66,7 +66,7 @@ func (svr *Service) ApiServerInfo(w http.ResponseWriter, r *http.Request) { log.Info("Http request: [%s]", r.URL.Path) cfg := &g.GlbServerCfg.ServerCommonConf - serverStats := StatsGetServer() + serverStats := svr.statsCollector.GetServer() res = ServerInfoResp{ Version: version.Full(), BindPort: cfg.BindPort, @@ -185,11 +185,11 @@ func (svr *Service) ApiProxyByType(w http.ResponseWriter, r *http.Request) { } func (svr *Service) getProxyStatsByType(proxyType string) (proxyInfos []*ProxyStatsInfo) { - proxyStats := StatsGetProxiesByType(proxyType) + proxyStats := svr.statsCollector.GetProxiesByType(proxyType) proxyInfos = make([]*ProxyStatsInfo, 0, len(proxyStats)) for _, ps := range proxyStats { proxyInfo := &ProxyStatsInfo{} - if pxy, ok := svr.rc.PxyManager.GetByName(ps.Name); ok { + if pxy, ok := svr.pxyManager.GetByName(ps.Name); ok { content, err := json.Marshal(pxy.GetConf()) if err != nil { log.Warn("marshal proxy [%s] conf info error: %v", ps.Name, err) @@ -252,12 +252,12 @@ func (svr *Service) ApiProxyByTypeAndName(w http.ResponseWriter, r *http.Request func (svr *Service) getProxyStatsByTypeAndName(proxyType string, proxyName string) (proxyInfo GetProxyStatsResp) { proxyInfo.Name = proxyName - ps := StatsGetProxiesByTypeAndName(proxyType, proxyName) + ps := svr.statsCollector.GetProxiesByTypeAndName(proxyType, proxyName) if ps == nil { proxyInfo.Code = 1 proxyInfo.Msg = "no proxy info found" } else { - if pxy, ok := svr.rc.PxyManager.GetByName(proxyName); ok { + if pxy, ok := svr.pxyManager.GetByName(proxyName); ok { content, err := json.Marshal(pxy.GetConf()) if err != nil { log.Warn("marshal proxy [%s] conf info error: %v", ps.Name, err) @@ -309,7 +309,7 @@ func (svr *Service) ApiProxyTraffic(w http.ResponseWriter, r *http.Request) { log.Info("Http request: [%s]", r.URL.Path) res.Name = name - proxyTrafficInfo := StatsGetProxyTraffic(name) + proxyTrafficInfo := svr.statsCollector.GetProxyTraffic(name) if proxyTrafficInfo == nil { res.Code = 1 res.Msg = "no proxy info found" diff --git a/server/metric.go b/server/metric.go deleted file mode 100644 index 7b2665f7..00000000 --- a/server/metric.go +++ /dev/null @@ -1,316 +0,0 @@ -// Copyright 2017 fatedier, fatedier@gmail.com -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package server - -import ( - "sync" - "time" - - "github.com/fatedier/frp/g" - "github.com/fatedier/frp/utils/log" - "github.com/fatedier/frp/utils/metric" -) - -const ( - ReserveDays = 7 -) - -var globalStats *ServerStatistics - -type ServerStatistics struct { - TotalTrafficIn metric.DateCounter - TotalTrafficOut metric.DateCounter - CurConns metric.Counter - - // counter for clients - ClientCounts metric.Counter - - // counter for proxy types - ProxyTypeCounts map[string]metric.Counter - - // statistics for different proxies - // key is proxy name - ProxyStatistics map[string]*ProxyStatistics - - mu sync.Mutex -} - -type ProxyStatistics struct { - Name string - ProxyType string - TrafficIn metric.DateCounter - TrafficOut metric.DateCounter - CurConns metric.Counter - LastStartTime time.Time - LastCloseTime time.Time -} - -func init() { - globalStats = &ServerStatistics{ - TotalTrafficIn: metric.NewDateCounter(ReserveDays), - TotalTrafficOut: metric.NewDateCounter(ReserveDays), - CurConns: metric.NewCounter(), - - ClientCounts: metric.NewCounter(), - ProxyTypeCounts: make(map[string]metric.Counter), - - ProxyStatistics: make(map[string]*ProxyStatistics), - } - - go func() { - for { - time.Sleep(12 * time.Hour) - log.Debug("start to clear useless proxy statistics data...") - StatsClearUselessInfo() - log.Debug("finish to clear useless proxy statistics data") - } - }() -} - -func StatsClearUselessInfo() { - // To check if there are proxies that closed than 7 days and drop them. - globalStats.mu.Lock() - defer globalStats.mu.Unlock() - for name, data := range globalStats.ProxyStatistics { - if !data.LastCloseTime.IsZero() && time.Since(data.LastCloseTime) > time.Duration(7*24)*time.Hour { - delete(globalStats.ProxyStatistics, name) - log.Trace("clear proxy [%s]'s statistics data, lastCloseTime: [%s]", name, data.LastCloseTime.String()) - } - } -} - -func StatsNewClient() { - if g.GlbServerCfg.DashboardPort != 0 { - globalStats.ClientCounts.Inc(1) - } -} - -func StatsCloseClient() { - if g.GlbServerCfg.DashboardPort != 0 { - globalStats.ClientCounts.Dec(1) - } -} - -func StatsNewProxy(name string, proxyType string) { - if g.GlbServerCfg.DashboardPort != 0 { - globalStats.mu.Lock() - defer globalStats.mu.Unlock() - counter, ok := globalStats.ProxyTypeCounts[proxyType] - if !ok { - counter = metric.NewCounter() - } - counter.Inc(1) - globalStats.ProxyTypeCounts[proxyType] = counter - - proxyStats, ok := globalStats.ProxyStatistics[name] - if !(ok && proxyStats.ProxyType == proxyType) { - proxyStats = &ProxyStatistics{ - Name: name, - ProxyType: proxyType, - CurConns: metric.NewCounter(), - TrafficIn: metric.NewDateCounter(ReserveDays), - TrafficOut: metric.NewDateCounter(ReserveDays), - } - globalStats.ProxyStatistics[name] = proxyStats - } - proxyStats.LastStartTime = time.Now() - } -} - -func StatsCloseProxy(proxyName string, proxyType string) { - if g.GlbServerCfg.DashboardPort != 0 { - globalStats.mu.Lock() - defer globalStats.mu.Unlock() - if counter, ok := globalStats.ProxyTypeCounts[proxyType]; ok { - counter.Dec(1) - } - if proxyStats, ok := globalStats.ProxyStatistics[proxyName]; ok { - proxyStats.LastCloseTime = time.Now() - } - } -} - -func StatsOpenConnection(name string) { - if g.GlbServerCfg.DashboardPort != 0 { - globalStats.CurConns.Inc(1) - - globalStats.mu.Lock() - defer globalStats.mu.Unlock() - proxyStats, ok := globalStats.ProxyStatistics[name] - if ok { - proxyStats.CurConns.Inc(1) - globalStats.ProxyStatistics[name] = proxyStats - } - } -} - -func StatsCloseConnection(name string) { - if g.GlbServerCfg.DashboardPort != 0 { - globalStats.CurConns.Dec(1) - - globalStats.mu.Lock() - defer globalStats.mu.Unlock() - proxyStats, ok := globalStats.ProxyStatistics[name] - if ok { - proxyStats.CurConns.Dec(1) - globalStats.ProxyStatistics[name] = proxyStats - } - } -} - -func StatsAddTrafficIn(name string, trafficIn int64) { - if g.GlbServerCfg.DashboardPort != 0 { - globalStats.TotalTrafficIn.Inc(trafficIn) - - globalStats.mu.Lock() - defer globalStats.mu.Unlock() - - proxyStats, ok := globalStats.ProxyStatistics[name] - if ok { - proxyStats.TrafficIn.Inc(trafficIn) - globalStats.ProxyStatistics[name] = proxyStats - } - } -} - -func StatsAddTrafficOut(name string, trafficOut int64) { - if g.GlbServerCfg.DashboardPort != 0 { - globalStats.TotalTrafficOut.Inc(trafficOut) - - globalStats.mu.Lock() - defer globalStats.mu.Unlock() - - proxyStats, ok := globalStats.ProxyStatistics[name] - if ok { - proxyStats.TrafficOut.Inc(trafficOut) - globalStats.ProxyStatistics[name] = proxyStats - } - } -} - -// Functions for getting server stats. -type ServerStats struct { - TotalTrafficIn int64 - TotalTrafficOut int64 - CurConns int64 - ClientCounts int64 - ProxyTypeCounts map[string]int64 -} - -func StatsGetServer() *ServerStats { - globalStats.mu.Lock() - defer globalStats.mu.Unlock() - s := &ServerStats{ - TotalTrafficIn: globalStats.TotalTrafficIn.TodayCount(), - TotalTrafficOut: globalStats.TotalTrafficOut.TodayCount(), - CurConns: globalStats.CurConns.Count(), - ClientCounts: globalStats.ClientCounts.Count(), - ProxyTypeCounts: make(map[string]int64), - } - for k, v := range globalStats.ProxyTypeCounts { - s.ProxyTypeCounts[k] = v.Count() - } - return s -} - -type ProxyStats struct { - Name string - Type string - TodayTrafficIn int64 - TodayTrafficOut int64 - LastStartTime string - LastCloseTime string - CurConns int64 -} - -func StatsGetProxiesByType(proxyType string) []*ProxyStats { - res := make([]*ProxyStats, 0) - globalStats.mu.Lock() - defer globalStats.mu.Unlock() - - for name, proxyStats := range globalStats.ProxyStatistics { - if proxyStats.ProxyType != proxyType { - continue - } - - ps := &ProxyStats{ - Name: name, - Type: proxyStats.ProxyType, - TodayTrafficIn: proxyStats.TrafficIn.TodayCount(), - TodayTrafficOut: proxyStats.TrafficOut.TodayCount(), - CurConns: proxyStats.CurConns.Count(), - } - if !proxyStats.LastStartTime.IsZero() { - ps.LastStartTime = proxyStats.LastStartTime.Format("01-02 15:04:05") - } - if !proxyStats.LastCloseTime.IsZero() { - ps.LastCloseTime = proxyStats.LastCloseTime.Format("01-02 15:04:05") - } - res = append(res, ps) - } - return res -} - -func StatsGetProxiesByTypeAndName(proxyType string, proxyName string) (res *ProxyStats) { - globalStats.mu.Lock() - defer globalStats.mu.Unlock() - - for name, proxyStats := range globalStats.ProxyStatistics { - if proxyStats.ProxyType != proxyType { - continue - } - - if name != proxyName { - continue - } - - res = &ProxyStats{ - Name: name, - Type: proxyStats.ProxyType, - TodayTrafficIn: proxyStats.TrafficIn.TodayCount(), - TodayTrafficOut: proxyStats.TrafficOut.TodayCount(), - CurConns: proxyStats.CurConns.Count(), - } - if !proxyStats.LastStartTime.IsZero() { - res.LastStartTime = proxyStats.LastStartTime.Format("01-02 15:04:05") - } - if !proxyStats.LastCloseTime.IsZero() { - res.LastCloseTime = proxyStats.LastCloseTime.Format("01-02 15:04:05") - } - break - } - return -} - -type ProxyTrafficInfo struct { - Name string - TrafficIn []int64 - TrafficOut []int64 -} - -func StatsGetProxyTraffic(name string) (res *ProxyTrafficInfo) { - globalStats.mu.Lock() - defer globalStats.mu.Unlock() - - proxyStats, ok := globalStats.ProxyStatistics[name] - if ok { - res = &ProxyTrafficInfo{ - Name: name, - } - res.TrafficIn = proxyStats.TrafficIn.GetLastDaysCount(ReserveDays) - res.TrafficOut = proxyStats.TrafficOut.GetLastDaysCount(ReserveDays) - } - return -} diff --git a/server/proxy.go b/server/proxy.go deleted file mode 100644 index 3720c375..00000000 --- a/server/proxy.go +++ /dev/null @@ -1,687 +0,0 @@ -// Copyright 2017 fatedier, fatedier@gmail.com -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package server - -import ( - "context" - "fmt" - "io" - "net" - "strings" - "sync" - "time" - - "github.com/fatedier/frp/g" - "github.com/fatedier/frp/models/config" - "github.com/fatedier/frp/models/msg" - "github.com/fatedier/frp/models/proto/udp" - "github.com/fatedier/frp/utils/log" - frpNet "github.com/fatedier/frp/utils/net" - "github.com/fatedier/frp/utils/util" - "github.com/fatedier/frp/utils/vhost" - - "github.com/fatedier/golib/errors" - frpIo "github.com/fatedier/golib/io" -) - -type GetWorkConnFn func() (frpNet.Conn, error) - -type Proxy interface { - Run() (remoteAddr string, err error) - GetName() string - GetConf() config.ProxyConf - GetWorkConnFromPool() (workConn frpNet.Conn, err error) - GetUsedPortsNum() int - Close() - log.Logger -} - -type BaseProxy struct { - name string - rc *ResourceController - listeners []frpNet.Listener - usedPortsNum int - poolCount int - getWorkConnFn GetWorkConnFn - - mu sync.RWMutex - log.Logger -} - -func (pxy *BaseProxy) GetName() string { - return pxy.name -} - -func (pxy *BaseProxy) GetUsedPortsNum() int { - return pxy.usedPortsNum -} - -func (pxy *BaseProxy) Close() { - pxy.Info("proxy closing") - for _, l := range pxy.listeners { - l.Close() - } -} - -func (pxy *BaseProxy) GetWorkConnFromPool() (workConn frpNet.Conn, err error) { - // try all connections from the pool - for i := 0; i < pxy.poolCount+1; i++ { - if workConn, err = pxy.getWorkConnFn(); err != nil { - pxy.Warn("failed to get work connection: %v", err) - return - } - pxy.Info("get a new work connection: [%s]", workConn.RemoteAddr().String()) - workConn.AddLogPrefix(pxy.GetName()) - - err := msg.WriteMsg(workConn, &msg.StartWorkConn{ - ProxyName: pxy.GetName(), - }) - if err != nil { - workConn.Warn("failed to send message to work connection from pool: %v, times: %d", err, i) - workConn.Close() - } else { - break - } - } - - if err != nil { - pxy.Error("try to get work connection failed in the end") - return - } - return -} - -// startListenHandler start a goroutine handler for each listener. -// p: p will just be passed to handler(Proxy, frpNet.Conn). -// handler: each proxy type can set different handler function to deal with connections accepted from listeners. -func (pxy *BaseProxy) startListenHandler(p Proxy, handler func(Proxy, frpNet.Conn)) { - for _, listener := range pxy.listeners { - go func(l frpNet.Listener) { - for { - // block - // if listener is closed, err returned - c, err := l.Accept() - if err != nil { - pxy.Info("listener is closed") - return - } - pxy.Debug("get a user connection [%s]", c.RemoteAddr().String()) - go handler(p, c) - } - }(listener) - } -} - -func NewProxy(runId string, rc *ResourceController, poolCount int, getWorkConnFn GetWorkConnFn, pxyConf config.ProxyConf) (pxy Proxy, err error) { - basePxy := BaseProxy{ - name: pxyConf.GetBaseInfo().ProxyName, - rc: rc, - listeners: make([]frpNet.Listener, 0), - poolCount: poolCount, - getWorkConnFn: getWorkConnFn, - Logger: log.NewPrefixLogger(runId), - } - switch cfg := pxyConf.(type) { - case *config.TcpProxyConf: - basePxy.usedPortsNum = 1 - pxy = &TcpProxy{ - BaseProxy: basePxy, - cfg: cfg, - } - case *config.HttpProxyConf: - pxy = &HttpProxy{ - BaseProxy: basePxy, - cfg: cfg, - } - case *config.HttpsProxyConf: - pxy = &HttpsProxy{ - BaseProxy: basePxy, - cfg: cfg, - } - case *config.UdpProxyConf: - basePxy.usedPortsNum = 1 - pxy = &UdpProxy{ - BaseProxy: basePxy, - cfg: cfg, - } - case *config.StcpProxyConf: - pxy = &StcpProxy{ - BaseProxy: basePxy, - cfg: cfg, - } - case *config.XtcpProxyConf: - pxy = &XtcpProxy{ - BaseProxy: basePxy, - cfg: cfg, - } - default: - return pxy, fmt.Errorf("proxy type not support") - } - pxy.AddLogPrefix(pxy.GetName()) - return -} - -type TcpProxy struct { - BaseProxy - cfg *config.TcpProxyConf - - realPort int -} - -func (pxy *TcpProxy) Run() (remoteAddr string, err error) { - if pxy.cfg.Group != "" { - l, realPort, errRet := pxy.rc.TcpGroupCtl.Listen(pxy.name, pxy.cfg.Group, pxy.cfg.GroupKey, g.GlbServerCfg.ProxyBindAddr, pxy.cfg.RemotePort) - if errRet != nil { - err = errRet - return - } - defer func() { - if err != nil { - l.Close() - } - }() - pxy.realPort = realPort - listener := frpNet.WrapLogListener(l) - listener.AddLogPrefix(pxy.name) - pxy.listeners = append(pxy.listeners, listener) - pxy.Info("tcp proxy listen port [%d] in group [%s]", pxy.cfg.RemotePort, pxy.cfg.Group) - } else { - pxy.realPort, err = pxy.rc.TcpPortManager.Acquire(pxy.name, pxy.cfg.RemotePort) - if err != nil { - return - } - defer func() { - if err != nil { - pxy.rc.TcpPortManager.Release(pxy.realPort) - } - }() - listener, errRet := frpNet.ListenTcp(g.GlbServerCfg.ProxyBindAddr, pxy.realPort) - if errRet != nil { - err = errRet - return - } - listener.AddLogPrefix(pxy.name) - pxy.listeners = append(pxy.listeners, listener) - pxy.Info("tcp proxy listen port [%d]", pxy.cfg.RemotePort) - } - - pxy.cfg.RemotePort = pxy.realPort - remoteAddr = fmt.Sprintf(":%d", pxy.realPort) - pxy.startListenHandler(pxy, HandleUserTcpConnection) - return -} - -func (pxy *TcpProxy) GetConf() config.ProxyConf { - return pxy.cfg -} - -func (pxy *TcpProxy) Close() { - pxy.BaseProxy.Close() - if pxy.cfg.Group == "" { - pxy.rc.TcpPortManager.Release(pxy.realPort) - } -} - -type HttpProxy struct { - BaseProxy - cfg *config.HttpProxyConf - - closeFuncs []func() -} - -func (pxy *HttpProxy) Run() (remoteAddr string, err error) { - routeConfig := vhost.VhostRouteConfig{ - RewriteHost: pxy.cfg.HostHeaderRewrite, - Headers: pxy.cfg.Headers, - Username: pxy.cfg.HttpUser, - Password: pxy.cfg.HttpPwd, - CreateConnFn: pxy.GetRealConn, - } - - locations := pxy.cfg.Locations - if len(locations) == 0 { - locations = []string{""} - } - - addrs := make([]string, 0) - for _, domain := range pxy.cfg.CustomDomains { - routeConfig.Domain = domain - for _, location := range locations { - routeConfig.Location = location - err = pxy.rc.HttpReverseProxy.Register(routeConfig) - if err != nil { - return - } - tmpDomain := routeConfig.Domain - tmpLocation := routeConfig.Location - addrs = append(addrs, util.CanonicalAddr(tmpDomain, int(g.GlbServerCfg.VhostHttpPort))) - pxy.closeFuncs = append(pxy.closeFuncs, func() { - pxy.rc.HttpReverseProxy.UnRegister(tmpDomain, tmpLocation) - }) - pxy.Info("http proxy listen for host [%s] location [%s]", routeConfig.Domain, routeConfig.Location) - } - } - - if pxy.cfg.SubDomain != "" { - routeConfig.Domain = pxy.cfg.SubDomain + "." + g.GlbServerCfg.SubDomainHost - for _, location := range locations { - routeConfig.Location = location - err = pxy.rc.HttpReverseProxy.Register(routeConfig) - if err != nil { - return - } - tmpDomain := routeConfig.Domain - tmpLocation := routeConfig.Location - addrs = append(addrs, util.CanonicalAddr(tmpDomain, g.GlbServerCfg.VhostHttpPort)) - pxy.closeFuncs = append(pxy.closeFuncs, func() { - pxy.rc.HttpReverseProxy.UnRegister(tmpDomain, tmpLocation) - }) - pxy.Info("http proxy listen for host [%s] location [%s]", routeConfig.Domain, routeConfig.Location) - } - } - remoteAddr = strings.Join(addrs, ",") - return -} - -func (pxy *HttpProxy) GetConf() config.ProxyConf { - return pxy.cfg -} - -func (pxy *HttpProxy) GetRealConn() (workConn frpNet.Conn, err error) { - tmpConn, errRet := pxy.GetWorkConnFromPool() - if errRet != nil { - err = errRet - return - } - - var rwc io.ReadWriteCloser = tmpConn - if pxy.cfg.UseEncryption { - rwc, err = frpIo.WithEncryption(rwc, []byte(g.GlbServerCfg.Token)) - if err != nil { - pxy.Error("create encryption stream error: %v", err) - return - } - } - if pxy.cfg.UseCompression { - rwc = frpIo.WithCompression(rwc) - } - workConn = frpNet.WrapReadWriteCloserToConn(rwc, tmpConn) - workConn = frpNet.WrapStatsConn(workConn, pxy.updateStatsAfterClosedConn) - StatsOpenConnection(pxy.GetName()) - return -} - -func (pxy *HttpProxy) updateStatsAfterClosedConn(totalRead, totalWrite int64) { - name := pxy.GetName() - StatsCloseConnection(name) - StatsAddTrafficIn(name, totalWrite) - StatsAddTrafficOut(name, totalRead) -} - -func (pxy *HttpProxy) Close() { - pxy.BaseProxy.Close() - for _, closeFn := range pxy.closeFuncs { - closeFn() - } -} - -type HttpsProxy struct { - BaseProxy - cfg *config.HttpsProxyConf -} - -func (pxy *HttpsProxy) Run() (remoteAddr string, err error) { - routeConfig := &vhost.VhostRouteConfig{} - - addrs := make([]string, 0) - for _, domain := range pxy.cfg.CustomDomains { - routeConfig.Domain = domain - l, errRet := pxy.rc.VhostHttpsMuxer.Listen(routeConfig) - if errRet != nil { - err = errRet - return - } - l.AddLogPrefix(pxy.name) - pxy.Info("https proxy listen for host [%s]", routeConfig.Domain) - pxy.listeners = append(pxy.listeners, l) - addrs = append(addrs, util.CanonicalAddr(routeConfig.Domain, g.GlbServerCfg.VhostHttpsPort)) - } - - if pxy.cfg.SubDomain != "" { - routeConfig.Domain = pxy.cfg.SubDomain + "." + g.GlbServerCfg.SubDomainHost - l, errRet := pxy.rc.VhostHttpsMuxer.Listen(routeConfig) - if errRet != nil { - err = errRet - return - } - l.AddLogPrefix(pxy.name) - pxy.Info("https proxy listen for host [%s]", routeConfig.Domain) - pxy.listeners = append(pxy.listeners, l) - addrs = append(addrs, util.CanonicalAddr(routeConfig.Domain, int(g.GlbServerCfg.VhostHttpsPort))) - } - - pxy.startListenHandler(pxy, HandleUserTcpConnection) - remoteAddr = strings.Join(addrs, ",") - return -} - -func (pxy *HttpsProxy) GetConf() config.ProxyConf { - return pxy.cfg -} - -func (pxy *HttpsProxy) Close() { - pxy.BaseProxy.Close() -} - -type StcpProxy struct { - BaseProxy - cfg *config.StcpProxyConf -} - -func (pxy *StcpProxy) Run() (remoteAddr string, err error) { - listener, errRet := pxy.rc.VisitorManager.Listen(pxy.GetName(), pxy.cfg.Sk) - if errRet != nil { - err = errRet - return - } - listener.AddLogPrefix(pxy.name) - pxy.listeners = append(pxy.listeners, listener) - pxy.Info("stcp proxy custom listen success") - - pxy.startListenHandler(pxy, HandleUserTcpConnection) - return -} - -func (pxy *StcpProxy) GetConf() config.ProxyConf { - return pxy.cfg -} - -func (pxy *StcpProxy) Close() { - pxy.BaseProxy.Close() - pxy.rc.VisitorManager.CloseListener(pxy.GetName()) -} - -type XtcpProxy struct { - BaseProxy - cfg *config.XtcpProxyConf - - closeCh chan struct{} -} - -func (pxy *XtcpProxy) Run() (remoteAddr string, err error) { - if pxy.rc.NatHoleController == nil { - pxy.Error("udp port for xtcp is not specified.") - err = fmt.Errorf("xtcp is not supported in frps") - return - } - sidCh := pxy.rc.NatHoleController.ListenClient(pxy.GetName(), pxy.cfg.Sk) - go func() { - for { - select { - case <-pxy.closeCh: - break - case sid := <-sidCh: - workConn, errRet := pxy.GetWorkConnFromPool() - if errRet != nil { - continue - } - m := &msg.NatHoleSid{ - Sid: sid, - } - errRet = msg.WriteMsg(workConn, m) - if errRet != nil { - pxy.Warn("write nat hole sid package error, %v", errRet) - } - } - } - }() - return -} - -func (pxy *XtcpProxy) GetConf() config.ProxyConf { - return pxy.cfg -} - -func (pxy *XtcpProxy) Close() { - pxy.BaseProxy.Close() - pxy.rc.NatHoleController.CloseClient(pxy.GetName()) - errors.PanicToError(func() { - close(pxy.closeCh) - }) -} - -type UdpProxy struct { - BaseProxy - cfg *config.UdpProxyConf - - realPort int - - // udpConn is the listener of udp packages - udpConn *net.UDPConn - - // there are always only one workConn at the same time - // get another one if it closed - workConn net.Conn - - // sendCh is used for sending packages to workConn - sendCh chan *msg.UdpPacket - - // readCh is used for reading packages from workConn - readCh chan *msg.UdpPacket - - // checkCloseCh is used for watching if workConn is closed - checkCloseCh chan int - - isClosed bool -} - -func (pxy *UdpProxy) Run() (remoteAddr string, err error) { - pxy.realPort, err = pxy.rc.UdpPortManager.Acquire(pxy.name, pxy.cfg.RemotePort) - if err != nil { - return - } - defer func() { - if err != nil { - pxy.rc.UdpPortManager.Release(pxy.realPort) - } - }() - - remoteAddr = fmt.Sprintf(":%d", pxy.realPort) - pxy.cfg.RemotePort = pxy.realPort - addr, errRet := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", g.GlbServerCfg.ProxyBindAddr, pxy.realPort)) - if errRet != nil { - err = errRet - return - } - udpConn, errRet := net.ListenUDP("udp", addr) - if errRet != nil { - err = errRet - pxy.Warn("listen udp port error: %v", err) - return - } - pxy.Info("udp proxy listen port [%d]", pxy.cfg.RemotePort) - - pxy.udpConn = udpConn - pxy.sendCh = make(chan *msg.UdpPacket, 1024) - pxy.readCh = make(chan *msg.UdpPacket, 1024) - pxy.checkCloseCh = make(chan int) - - // read message from workConn, if it returns any error, notify proxy to start a new workConn - workConnReaderFn := func(conn net.Conn) { - for { - var ( - rawMsg msg.Message - errRet error - ) - pxy.Trace("loop waiting message from udp workConn") - // client will send heartbeat in workConn for keeping alive - conn.SetReadDeadline(time.Now().Add(time.Duration(60) * time.Second)) - if rawMsg, errRet = msg.ReadMsg(conn); errRet != nil { - pxy.Warn("read from workConn for udp error: %v", errRet) - conn.Close() - // notify proxy to start a new work connection - // ignore error here, it means the proxy is closed - errors.PanicToError(func() { - pxy.checkCloseCh <- 1 - }) - return - } - conn.SetReadDeadline(time.Time{}) - switch m := rawMsg.(type) { - case *msg.Ping: - pxy.Trace("udp work conn get ping message") - continue - case *msg.UdpPacket: - if errRet := errors.PanicToError(func() { - pxy.Trace("get udp message from workConn: %s", m.Content) - pxy.readCh <- m - StatsAddTrafficOut(pxy.GetName(), int64(len(m.Content))) - }); errRet != nil { - conn.Close() - pxy.Info("reader goroutine for udp work connection closed") - return - } - } - } - } - - // send message to workConn - workConnSenderFn := func(conn net.Conn, ctx context.Context) { - var errRet error - for { - select { - case udpMsg, ok := <-pxy.sendCh: - if !ok { - pxy.Info("sender goroutine for udp work connection closed") - return - } - if errRet = msg.WriteMsg(conn, udpMsg); errRet != nil { - pxy.Info("sender goroutine for udp work connection closed: %v", errRet) - conn.Close() - return - } else { - pxy.Trace("send message to udp workConn: %s", udpMsg.Content) - StatsAddTrafficIn(pxy.GetName(), int64(len(udpMsg.Content))) - continue - } - case <-ctx.Done(): - pxy.Info("sender goroutine for udp work connection closed") - return - } - } - } - - go func() { - // Sleep a while for waiting control send the NewProxyResp to client. - time.Sleep(500 * time.Millisecond) - for { - workConn, err := pxy.GetWorkConnFromPool() - if err != nil { - time.Sleep(1 * time.Second) - // check if proxy is closed - select { - case _, ok := <-pxy.checkCloseCh: - if !ok { - return - } - default: - } - continue - } - // close the old workConn and replac it with a new one - if pxy.workConn != nil { - pxy.workConn.Close() - } - pxy.workConn = workConn - ctx, cancel := context.WithCancel(context.Background()) - go workConnReaderFn(workConn) - go workConnSenderFn(workConn, ctx) - _, ok := <-pxy.checkCloseCh - cancel() - if !ok { - return - } - } - }() - - // Read from user connections and send wrapped udp message to sendCh (forwarded by workConn). - // Client will transfor udp message to local udp service and waiting for response for a while. - // Response will be wrapped to be forwarded by work connection to server. - // Close readCh and sendCh at the end. - go func() { - udp.ForwardUserConn(udpConn, pxy.readCh, pxy.sendCh) - pxy.Close() - }() - return remoteAddr, nil -} - -func (pxy *UdpProxy) GetConf() config.ProxyConf { - return pxy.cfg -} - -func (pxy *UdpProxy) Close() { - pxy.mu.Lock() - defer pxy.mu.Unlock() - if !pxy.isClosed { - pxy.isClosed = true - - pxy.BaseProxy.Close() - if pxy.workConn != nil { - pxy.workConn.Close() - } - pxy.udpConn.Close() - - // all channels only closed here - close(pxy.checkCloseCh) - close(pxy.readCh) - close(pxy.sendCh) - } - pxy.rc.UdpPortManager.Release(pxy.realPort) -} - -// HandleUserTcpConnection is used for incoming tcp user connections. -// It can be used for tcp, http, https type. -func HandleUserTcpConnection(pxy Proxy, userConn frpNet.Conn) { - defer userConn.Close() - - // try all connections from the pool - workConn, err := pxy.GetWorkConnFromPool() - if err != nil { - return - } - defer workConn.Close() - - var local io.ReadWriteCloser = workConn - cfg := pxy.GetConf().GetBaseInfo() - if cfg.UseEncryption { - local, err = frpIo.WithEncryption(local, []byte(g.GlbServerCfg.Token)) - if err != nil { - pxy.Error("create encryption stream error: %v", err) - return - } - } - if cfg.UseCompression { - local = frpIo.WithCompression(local) - } - pxy.Debug("join connections, workConn(l[%s] r[%s]) userConn(l[%s] r[%s])", workConn.LocalAddr().String(), - workConn.RemoteAddr().String(), userConn.LocalAddr().String(), userConn.RemoteAddr().String()) - - StatsOpenConnection(pxy.GetName()) - inCount, outCount := frpIo.Join(local, userConn) - StatsCloseConnection(pxy.GetName()) - StatsAddTrafficIn(pxy.GetName(), inCount) - StatsAddTrafficOut(pxy.GetName(), outCount) - pxy.Debug("join connections closed") -} diff --git a/server/proxy/http.go b/server/proxy/http.go new file mode 100644 index 00000000..7c1b62b6 --- /dev/null +++ b/server/proxy/http.go @@ -0,0 +1,138 @@ +// Copyright 2019 fatedier, fatedier@gmail.com +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package proxy + +import ( + "io" + "strings" + + "github.com/fatedier/frp/g" + "github.com/fatedier/frp/models/config" + "github.com/fatedier/frp/server/stats" + frpNet "github.com/fatedier/frp/utils/net" + "github.com/fatedier/frp/utils/util" + "github.com/fatedier/frp/utils/vhost" + + frpIo "github.com/fatedier/golib/io" +) + +type HttpProxy struct { + BaseProxy + cfg *config.HttpProxyConf + + closeFuncs []func() +} + +func (pxy *HttpProxy) Run() (remoteAddr string, err error) { + routeConfig := vhost.VhostRouteConfig{ + RewriteHost: pxy.cfg.HostHeaderRewrite, + Headers: pxy.cfg.Headers, + Username: pxy.cfg.HttpUser, + Password: pxy.cfg.HttpPwd, + CreateConnFn: pxy.GetRealConn, + } + + locations := pxy.cfg.Locations + if len(locations) == 0 { + locations = []string{""} + } + + addrs := make([]string, 0) + for _, domain := range pxy.cfg.CustomDomains { + routeConfig.Domain = domain + for _, location := range locations { + routeConfig.Location = location + err = pxy.rc.HttpReverseProxy.Register(routeConfig) + if err != nil { + return + } + tmpDomain := routeConfig.Domain + tmpLocation := routeConfig.Location + addrs = append(addrs, util.CanonicalAddr(tmpDomain, int(g.GlbServerCfg.VhostHttpPort))) + pxy.closeFuncs = append(pxy.closeFuncs, func() { + pxy.rc.HttpReverseProxy.UnRegister(tmpDomain, tmpLocation) + }) + pxy.Info("http proxy listen for host [%s] location [%s]", routeConfig.Domain, routeConfig.Location) + } + } + + if pxy.cfg.SubDomain != "" { + routeConfig.Domain = pxy.cfg.SubDomain + "." + g.GlbServerCfg.SubDomainHost + for _, location := range locations { + routeConfig.Location = location + err = pxy.rc.HttpReverseProxy.Register(routeConfig) + if err != nil { + return + } + tmpDomain := routeConfig.Domain + tmpLocation := routeConfig.Location + addrs = append(addrs, util.CanonicalAddr(tmpDomain, g.GlbServerCfg.VhostHttpPort)) + pxy.closeFuncs = append(pxy.closeFuncs, func() { + pxy.rc.HttpReverseProxy.UnRegister(tmpDomain, tmpLocation) + }) + pxy.Info("http proxy listen for host [%s] location [%s]", routeConfig.Domain, routeConfig.Location) + } + } + remoteAddr = strings.Join(addrs, ",") + return +} + +func (pxy *HttpProxy) GetConf() config.ProxyConf { + return pxy.cfg +} + +func (pxy *HttpProxy) GetRealConn() (workConn frpNet.Conn, err error) { + tmpConn, errRet := pxy.GetWorkConnFromPool() + if errRet != nil { + err = errRet + return + } + + var rwc io.ReadWriteCloser = tmpConn + if pxy.cfg.UseEncryption { + rwc, err = frpIo.WithEncryption(rwc, []byte(g.GlbServerCfg.Token)) + if err != nil { + pxy.Error("create encryption stream error: %v", err) + return + } + } + if pxy.cfg.UseCompression { + rwc = frpIo.WithCompression(rwc) + } + workConn = frpNet.WrapReadWriteCloserToConn(rwc, tmpConn) + workConn = frpNet.WrapStatsConn(workConn, pxy.updateStatsAfterClosedConn) + pxy.statsCollector.Mark(stats.TypeOpenConnection, &stats.OpenConnectionPayload{ProxyName: pxy.GetName()}) + return +} + +func (pxy *HttpProxy) updateStatsAfterClosedConn(totalRead, totalWrite int64) { + name := pxy.GetName() + pxy.statsCollector.Mark(stats.TypeCloseProxy, &stats.CloseConnectionPayload{ProxyName: name}) + pxy.statsCollector.Mark(stats.TypeAddTrafficIn, &stats.AddTrafficInPayload{ + ProxyName: name, + TrafficBytes: totalWrite, + }) + pxy.statsCollector.Mark(stats.TypeAddTrafficOut, &stats.AddTrafficOutPayload{ + ProxyName: name, + TrafficBytes: totalRead, + }) +} + +func (pxy *HttpProxy) Close() { + pxy.BaseProxy.Close() + for _, closeFn := range pxy.closeFuncs { + closeFn() + } +} diff --git a/server/proxy/https.go b/server/proxy/https.go new file mode 100644 index 00000000..354a99ea --- /dev/null +++ b/server/proxy/https.go @@ -0,0 +1,72 @@ +// Copyright 2019 fatedier, fatedier@gmail.com +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package proxy + +import ( + "strings" + + "github.com/fatedier/frp/g" + "github.com/fatedier/frp/models/config" + "github.com/fatedier/frp/utils/util" + "github.com/fatedier/frp/utils/vhost" +) + +type HttpsProxy struct { + BaseProxy + cfg *config.HttpsProxyConf +} + +func (pxy *HttpsProxy) Run() (remoteAddr string, err error) { + routeConfig := &vhost.VhostRouteConfig{} + + addrs := make([]string, 0) + for _, domain := range pxy.cfg.CustomDomains { + routeConfig.Domain = domain + l, errRet := pxy.rc.VhostHttpsMuxer.Listen(routeConfig) + if errRet != nil { + err = errRet + return + } + l.AddLogPrefix(pxy.name) + pxy.Info("https proxy listen for host [%s]", routeConfig.Domain) + pxy.listeners = append(pxy.listeners, l) + addrs = append(addrs, util.CanonicalAddr(routeConfig.Domain, g.GlbServerCfg.VhostHttpsPort)) + } + + if pxy.cfg.SubDomain != "" { + routeConfig.Domain = pxy.cfg.SubDomain + "." + g.GlbServerCfg.SubDomainHost + l, errRet := pxy.rc.VhostHttpsMuxer.Listen(routeConfig) + if errRet != nil { + err = errRet + return + } + l.AddLogPrefix(pxy.name) + pxy.Info("https proxy listen for host [%s]", routeConfig.Domain) + pxy.listeners = append(pxy.listeners, l) + addrs = append(addrs, util.CanonicalAddr(routeConfig.Domain, int(g.GlbServerCfg.VhostHttpsPort))) + } + + pxy.startListenHandler(pxy, HandleUserTcpConnection) + remoteAddr = strings.Join(addrs, ",") + return +} + +func (pxy *HttpsProxy) GetConf() config.ProxyConf { + return pxy.cfg +} + +func (pxy *HttpsProxy) Close() { + pxy.BaseProxy.Close() +} diff --git a/server/proxy/proxy.go b/server/proxy/proxy.go new file mode 100644 index 00000000..7bdc081b --- /dev/null +++ b/server/proxy/proxy.go @@ -0,0 +1,250 @@ +// Copyright 2017 fatedier, fatedier@gmail.com +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package proxy + +import ( + "fmt" + "io" + "sync" + + "github.com/fatedier/frp/g" + "github.com/fatedier/frp/models/config" + "github.com/fatedier/frp/models/msg" + "github.com/fatedier/frp/server/controller" + "github.com/fatedier/frp/server/stats" + "github.com/fatedier/frp/utils/log" + frpNet "github.com/fatedier/frp/utils/net" + + frpIo "github.com/fatedier/golib/io" +) + +type GetWorkConnFn func() (frpNet.Conn, error) + +type Proxy interface { + Run() (remoteAddr string, err error) + GetName() string + GetConf() config.ProxyConf + GetWorkConnFromPool() (workConn frpNet.Conn, err error) + GetUsedPortsNum() int + Close() + log.Logger +} + +type BaseProxy struct { + name string + rc *controller.ResourceController + statsCollector stats.Collector + listeners []frpNet.Listener + usedPortsNum int + poolCount int + getWorkConnFn GetWorkConnFn + + mu sync.RWMutex + log.Logger +} + +func (pxy *BaseProxy) GetName() string { + return pxy.name +} + +func (pxy *BaseProxy) GetUsedPortsNum() int { + return pxy.usedPortsNum +} + +func (pxy *BaseProxy) Close() { + pxy.Info("proxy closing") + for _, l := range pxy.listeners { + l.Close() + } +} + +func (pxy *BaseProxy) GetWorkConnFromPool() (workConn frpNet.Conn, err error) { + // try all connections from the pool + for i := 0; i < pxy.poolCount+1; i++ { + if workConn, err = pxy.getWorkConnFn(); err != nil { + pxy.Warn("failed to get work connection: %v", err) + return + } + pxy.Info("get a new work connection: [%s]", workConn.RemoteAddr().String()) + workConn.AddLogPrefix(pxy.GetName()) + + err := msg.WriteMsg(workConn, &msg.StartWorkConn{ + ProxyName: pxy.GetName(), + }) + if err != nil { + workConn.Warn("failed to send message to work connection from pool: %v, times: %d", err, i) + workConn.Close() + } else { + break + } + } + + if err != nil { + pxy.Error("try to get work connection failed in the end") + return + } + return +} + +// startListenHandler start a goroutine handler for each listener. +// p: p will just be passed to handler(Proxy, frpNet.Conn). +// handler: each proxy type can set different handler function to deal with connections accepted from listeners. +func (pxy *BaseProxy) startListenHandler(p Proxy, handler func(Proxy, frpNet.Conn, stats.Collector)) { + for _, listener := range pxy.listeners { + go func(l frpNet.Listener) { + for { + // block + // if listener is closed, err returned + c, err := l.Accept() + if err != nil { + pxy.Info("listener is closed") + return + } + pxy.Debug("get a user connection [%s]", c.RemoteAddr().String()) + go handler(p, c, pxy.statsCollector) + } + }(listener) + } +} + +func NewProxy(runId string, rc *controller.ResourceController, statsCollector stats.Collector, poolCount int, + getWorkConnFn GetWorkConnFn, pxyConf config.ProxyConf) (pxy Proxy, err error) { + + basePxy := BaseProxy{ + name: pxyConf.GetBaseInfo().ProxyName, + rc: rc, + statsCollector: statsCollector, + listeners: make([]frpNet.Listener, 0), + poolCount: poolCount, + getWorkConnFn: getWorkConnFn, + Logger: log.NewPrefixLogger(runId), + } + switch cfg := pxyConf.(type) { + case *config.TcpProxyConf: + basePxy.usedPortsNum = 1 + pxy = &TcpProxy{ + BaseProxy: basePxy, + cfg: cfg, + } + case *config.HttpProxyConf: + pxy = &HttpProxy{ + BaseProxy: basePxy, + cfg: cfg, + } + case *config.HttpsProxyConf: + pxy = &HttpsProxy{ + BaseProxy: basePxy, + cfg: cfg, + } + case *config.UdpProxyConf: + basePxy.usedPortsNum = 1 + pxy = &UdpProxy{ + BaseProxy: basePxy, + cfg: cfg, + } + case *config.StcpProxyConf: + pxy = &StcpProxy{ + BaseProxy: basePxy, + cfg: cfg, + } + case *config.XtcpProxyConf: + pxy = &XtcpProxy{ + BaseProxy: basePxy, + cfg: cfg, + } + default: + return pxy, fmt.Errorf("proxy type not support") + } + pxy.AddLogPrefix(pxy.GetName()) + return +} + +// HandleUserTcpConnection is used for incoming tcp user connections. +// It can be used for tcp, http, https type. +func HandleUserTcpConnection(pxy Proxy, userConn frpNet.Conn, statsCollector stats.Collector) { + defer userConn.Close() + + // try all connections from the pool + workConn, err := pxy.GetWorkConnFromPool() + if err != nil { + return + } + defer workConn.Close() + + var local io.ReadWriteCloser = workConn + cfg := pxy.GetConf().GetBaseInfo() + if cfg.UseEncryption { + local, err = frpIo.WithEncryption(local, []byte(g.GlbServerCfg.Token)) + if err != nil { + pxy.Error("create encryption stream error: %v", err) + return + } + } + if cfg.UseCompression { + local = frpIo.WithCompression(local) + } + pxy.Debug("join connections, workConn(l[%s] r[%s]) userConn(l[%s] r[%s])", workConn.LocalAddr().String(), + workConn.RemoteAddr().String(), userConn.LocalAddr().String(), userConn.RemoteAddr().String()) + + statsCollector.Mark(stats.TypeOpenConnection, &stats.OpenConnectionPayload{ProxyName: pxy.GetName()}) + inCount, outCount := frpIo.Join(local, userConn) + statsCollector.Mark(stats.TypeCloseConnection, &stats.CloseConnectionPayload{ProxyName: pxy.GetName()}) + statsCollector.Mark(stats.TypeAddTrafficIn, &stats.AddTrafficInPayload{ + ProxyName: pxy.GetName(), + TrafficBytes: inCount, + }) + statsCollector.Mark(stats.TypeAddTrafficOut, &stats.AddTrafficOutPayload{ + ProxyName: pxy.GetName(), + TrafficBytes: outCount, + }) + pxy.Debug("join connections closed") +} + +type ProxyManager struct { + // proxies indexed by proxy name + pxys map[string]Proxy + + mu sync.RWMutex +} + +func NewProxyManager() *ProxyManager { + return &ProxyManager{ + pxys: make(map[string]Proxy), + } +} + +func (pm *ProxyManager) Add(name string, pxy Proxy) error { + pm.mu.Lock() + defer pm.mu.Unlock() + if _, ok := pm.pxys[name]; ok { + return fmt.Errorf("proxy name [%s] is already in use", name) + } + + pm.pxys[name] = pxy + return nil +} + +func (pm *ProxyManager) Del(name string) { + pm.mu.Lock() + defer pm.mu.Unlock() + delete(pm.pxys, name) +} + +func (pm *ProxyManager) GetByName(name string) (pxy Proxy, ok bool) { + pm.mu.RLock() + defer pm.mu.RUnlock() + pxy, ok = pm.pxys[name] + return +} diff --git a/server/proxy/stcp.go b/server/proxy/stcp.go new file mode 100644 index 00000000..13c67329 --- /dev/null +++ b/server/proxy/stcp.go @@ -0,0 +1,47 @@ +// Copyright 2019 fatedier, fatedier@gmail.com +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package proxy + +import ( + "github.com/fatedier/frp/models/config" +) + +type StcpProxy struct { + BaseProxy + cfg *config.StcpProxyConf +} + +func (pxy *StcpProxy) Run() (remoteAddr string, err error) { + listener, errRet := pxy.rc.VisitorManager.Listen(pxy.GetName(), pxy.cfg.Sk) + if errRet != nil { + err = errRet + return + } + listener.AddLogPrefix(pxy.name) + pxy.listeners = append(pxy.listeners, listener) + pxy.Info("stcp proxy custom listen success") + + pxy.startListenHandler(pxy, HandleUserTcpConnection) + return +} + +func (pxy *StcpProxy) GetConf() config.ProxyConf { + return pxy.cfg +} + +func (pxy *StcpProxy) Close() { + pxy.BaseProxy.Close() + pxy.rc.VisitorManager.CloseListener(pxy.GetName()) +} diff --git a/server/proxy/tcp.go b/server/proxy/tcp.go new file mode 100644 index 00000000..aae44dc4 --- /dev/null +++ b/server/proxy/tcp.go @@ -0,0 +1,84 @@ +// Copyright 2019 fatedier, fatedier@gmail.com +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package proxy + +import ( + "fmt" + + "github.com/fatedier/frp/g" + "github.com/fatedier/frp/models/config" + frpNet "github.com/fatedier/frp/utils/net" +) + +type TcpProxy struct { + BaseProxy + cfg *config.TcpProxyConf + + realPort int +} + +func (pxy *TcpProxy) Run() (remoteAddr string, err error) { + if pxy.cfg.Group != "" { + l, realPort, errRet := pxy.rc.TcpGroupCtl.Listen(pxy.name, pxy.cfg.Group, pxy.cfg.GroupKey, g.GlbServerCfg.ProxyBindAddr, pxy.cfg.RemotePort) + if errRet != nil { + err = errRet + return + } + defer func() { + if err != nil { + l.Close() + } + }() + pxy.realPort = realPort + listener := frpNet.WrapLogListener(l) + listener.AddLogPrefix(pxy.name) + pxy.listeners = append(pxy.listeners, listener) + pxy.Info("tcp proxy listen port [%d] in group [%s]", pxy.cfg.RemotePort, pxy.cfg.Group) + } else { + pxy.realPort, err = pxy.rc.TcpPortManager.Acquire(pxy.name, pxy.cfg.RemotePort) + if err != nil { + return + } + defer func() { + if err != nil { + pxy.rc.TcpPortManager.Release(pxy.realPort) + } + }() + listener, errRet := frpNet.ListenTcp(g.GlbServerCfg.ProxyBindAddr, pxy.realPort) + if errRet != nil { + err = errRet + return + } + listener.AddLogPrefix(pxy.name) + pxy.listeners = append(pxy.listeners, listener) + pxy.Info("tcp proxy listen port [%d]", pxy.cfg.RemotePort) + } + + pxy.cfg.RemotePort = pxy.realPort + remoteAddr = fmt.Sprintf(":%d", pxy.realPort) + pxy.startListenHandler(pxy, HandleUserTcpConnection) + return +} + +func (pxy *TcpProxy) GetConf() config.ProxyConf { + return pxy.cfg +} + +func (pxy *TcpProxy) Close() { + pxy.BaseProxy.Close() + if pxy.cfg.Group == "" { + pxy.rc.TcpPortManager.Release(pxy.realPort) + } +} diff --git a/server/proxy/udp.go b/server/proxy/udp.go new file mode 100644 index 00000000..554b4101 --- /dev/null +++ b/server/proxy/udp.go @@ -0,0 +1,225 @@ +// Copyright 2019 fatedier, fatedier@gmail.com +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package proxy + +import ( + "context" + "fmt" + "net" + "time" + + "github.com/fatedier/frp/g" + "github.com/fatedier/frp/models/config" + "github.com/fatedier/frp/models/msg" + "github.com/fatedier/frp/models/proto/udp" + "github.com/fatedier/frp/server/stats" + + "github.com/fatedier/golib/errors" +) + +type UdpProxy struct { + BaseProxy + cfg *config.UdpProxyConf + + realPort int + + // udpConn is the listener of udp packages + udpConn *net.UDPConn + + // there are always only one workConn at the same time + // get another one if it closed + workConn net.Conn + + // sendCh is used for sending packages to workConn + sendCh chan *msg.UdpPacket + + // readCh is used for reading packages from workConn + readCh chan *msg.UdpPacket + + // checkCloseCh is used for watching if workConn is closed + checkCloseCh chan int + + isClosed bool +} + +func (pxy *UdpProxy) Run() (remoteAddr string, err error) { + pxy.realPort, err = pxy.rc.UdpPortManager.Acquire(pxy.name, pxy.cfg.RemotePort) + if err != nil { + return + } + defer func() { + if err != nil { + pxy.rc.UdpPortManager.Release(pxy.realPort) + } + }() + + remoteAddr = fmt.Sprintf(":%d", pxy.realPort) + pxy.cfg.RemotePort = pxy.realPort + addr, errRet := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", g.GlbServerCfg.ProxyBindAddr, pxy.realPort)) + if errRet != nil { + err = errRet + return + } + udpConn, errRet := net.ListenUDP("udp", addr) + if errRet != nil { + err = errRet + pxy.Warn("listen udp port error: %v", err) + return + } + pxy.Info("udp proxy listen port [%d]", pxy.cfg.RemotePort) + + pxy.udpConn = udpConn + pxy.sendCh = make(chan *msg.UdpPacket, 1024) + pxy.readCh = make(chan *msg.UdpPacket, 1024) + pxy.checkCloseCh = make(chan int) + + // read message from workConn, if it returns any error, notify proxy to start a new workConn + workConnReaderFn := func(conn net.Conn) { + for { + var ( + rawMsg msg.Message + errRet error + ) + pxy.Trace("loop waiting message from udp workConn") + // client will send heartbeat in workConn for keeping alive + conn.SetReadDeadline(time.Now().Add(time.Duration(60) * time.Second)) + if rawMsg, errRet = msg.ReadMsg(conn); errRet != nil { + pxy.Warn("read from workConn for udp error: %v", errRet) + conn.Close() + // notify proxy to start a new work connection + // ignore error here, it means the proxy is closed + errors.PanicToError(func() { + pxy.checkCloseCh <- 1 + }) + return + } + conn.SetReadDeadline(time.Time{}) + switch m := rawMsg.(type) { + case *msg.Ping: + pxy.Trace("udp work conn get ping message") + continue + case *msg.UdpPacket: + if errRet := errors.PanicToError(func() { + pxy.Trace("get udp message from workConn: %s", m.Content) + pxy.readCh <- m + pxy.statsCollector.Mark(stats.TypeAddTrafficOut, &stats.AddTrafficOutPayload{ + ProxyName: pxy.GetName(), + TrafficBytes: int64(len(m.Content)), + }) + }); errRet != nil { + conn.Close() + pxy.Info("reader goroutine for udp work connection closed") + return + } + } + } + } + + // send message to workConn + workConnSenderFn := func(conn net.Conn, ctx context.Context) { + var errRet error + for { + select { + case udpMsg, ok := <-pxy.sendCh: + if !ok { + pxy.Info("sender goroutine for udp work connection closed") + return + } + if errRet = msg.WriteMsg(conn, udpMsg); errRet != nil { + pxy.Info("sender goroutine for udp work connection closed: %v", errRet) + conn.Close() + return + } else { + pxy.Trace("send message to udp workConn: %s", udpMsg.Content) + pxy.statsCollector.Mark(stats.TypeAddTrafficIn, &stats.AddTrafficInPayload{ + ProxyName: pxy.GetName(), + TrafficBytes: int64(len(udpMsg.Content)), + }) + continue + } + case <-ctx.Done(): + pxy.Info("sender goroutine for udp work connection closed") + return + } + } + } + + go func() { + // Sleep a while for waiting control send the NewProxyResp to client. + time.Sleep(500 * time.Millisecond) + for { + workConn, err := pxy.GetWorkConnFromPool() + if err != nil { + time.Sleep(1 * time.Second) + // check if proxy is closed + select { + case _, ok := <-pxy.checkCloseCh: + if !ok { + return + } + default: + } + continue + } + // close the old workConn and replac it with a new one + if pxy.workConn != nil { + pxy.workConn.Close() + } + pxy.workConn = workConn + ctx, cancel := context.WithCancel(context.Background()) + go workConnReaderFn(workConn) + go workConnSenderFn(workConn, ctx) + _, ok := <-pxy.checkCloseCh + cancel() + if !ok { + return + } + } + }() + + // Read from user connections and send wrapped udp message to sendCh (forwarded by workConn). + // Client will transfor udp message to local udp service and waiting for response for a while. + // Response will be wrapped to be forwarded by work connection to server. + // Close readCh and sendCh at the end. + go func() { + udp.ForwardUserConn(udpConn, pxy.readCh, pxy.sendCh) + pxy.Close() + }() + return remoteAddr, nil +} + +func (pxy *UdpProxy) GetConf() config.ProxyConf { + return pxy.cfg +} + +func (pxy *UdpProxy) Close() { + pxy.mu.Lock() + defer pxy.mu.Unlock() + if !pxy.isClosed { + pxy.isClosed = true + + pxy.BaseProxy.Close() + if pxy.workConn != nil { + pxy.workConn.Close() + } + pxy.udpConn.Close() + + // all channels only closed here + close(pxy.checkCloseCh) + close(pxy.readCh) + close(pxy.sendCh) + } + pxy.rc.UdpPortManager.Release(pxy.realPort) +} diff --git a/server/proxy/xtcp.go b/server/proxy/xtcp.go new file mode 100644 index 00000000..6cef4d68 --- /dev/null +++ b/server/proxy/xtcp.go @@ -0,0 +1,73 @@ +// Copyright 2019 fatedier, fatedier@gmail.com +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package proxy + +import ( + "fmt" + + "github.com/fatedier/frp/models/config" + "github.com/fatedier/frp/models/msg" + + "github.com/fatedier/golib/errors" +) + +type XtcpProxy struct { + BaseProxy + cfg *config.XtcpProxyConf + + closeCh chan struct{} +} + +func (pxy *XtcpProxy) Run() (remoteAddr string, err error) { + if pxy.rc.NatHoleController == nil { + pxy.Error("udp port for xtcp is not specified.") + err = fmt.Errorf("xtcp is not supported in frps") + return + } + sidCh := pxy.rc.NatHoleController.ListenClient(pxy.GetName(), pxy.cfg.Sk) + go func() { + for { + select { + case <-pxy.closeCh: + break + case sid := <-sidCh: + workConn, errRet := pxy.GetWorkConnFromPool() + if errRet != nil { + continue + } + m := &msg.NatHoleSid{ + Sid: sid, + } + errRet = msg.WriteMsg(workConn, m) + if errRet != nil { + pxy.Warn("write nat hole sid package error, %v", errRet) + } + } + } + }() + return +} + +func (pxy *XtcpProxy) GetConf() config.ProxyConf { + return pxy.cfg +} + +func (pxy *XtcpProxy) Close() { + pxy.BaseProxy.Close() + pxy.rc.NatHoleController.CloseClient(pxy.GetName()) + errors.PanicToError(func() { + close(pxy.closeCh) + }) +} diff --git a/server/service.go b/server/service.go index 6cbbfd4f..8befe7e3 100644 --- a/server/service.go +++ b/server/service.go @@ -25,8 +25,12 @@ import ( "github.com/fatedier/frp/assets" "github.com/fatedier/frp/g" "github.com/fatedier/frp/models/msg" + "github.com/fatedier/frp/models/nathole" + "github.com/fatedier/frp/server/controller" "github.com/fatedier/frp/server/group" "github.com/fatedier/frp/server/ports" + "github.com/fatedier/frp/server/proxy" + "github.com/fatedier/frp/server/stats" "github.com/fatedier/frp/utils/log" frpNet "github.com/fatedier/frp/utils/net" "github.com/fatedier/frp/utils/util" @@ -43,36 +47,6 @@ const ( var ServerService *Service -// All resource managers and controllers -type ResourceController struct { - // Manage all controllers - CtlManager *ControlManager - - // Manage all proxies - PxyManager *ProxyManager - - // Manage all visitor listeners - VisitorManager *VisitorManager - - // Tcp Group Controller - TcpGroupCtl *group.TcpGroupCtl - - // Manage all tcp ports - TcpPortManager *ports.PortManager - - // Manage all udp ports - UdpPortManager *ports.PortManager - - // For http proxies, forwarding http requests - HttpReverseProxy *vhost.HttpReverseProxy - - // For https proxies, route requests to different clients by hostname and other infomation - VhostHttpsMuxer *vhost.HttpsMuxer - - // Controller for nat hole connections - NatHoleController *NatHoleController -} - // Server service type Service struct { // Dispatch connections to different handlers listen on same port @@ -87,21 +61,32 @@ type Service struct { // Accept connections using websocket websocketListener frpNet.Listener + // Manage all controllers + ctlManager *ControlManager + + // Manage all proxies + pxyManager *proxy.ProxyManager + // All resource managers and controllers - rc *ResourceController + rc *controller.ResourceController + + // stats collector to store server and proxies stats info + statsCollector stats.Collector } func NewService() (svr *Service, err error) { cfg := &g.GlbServerCfg.ServerCommonConf svr = &Service{ - rc: &ResourceController{ - CtlManager: NewControlManager(), - PxyManager: NewProxyManager(), - VisitorManager: NewVisitorManager(), + ctlManager: NewControlManager(), + pxyManager: proxy.NewProxyManager(), + rc: &controller.ResourceController{ + VisitorManager: controller.NewVisitorManager(), TcpPortManager: ports.NewPortManager("tcp", cfg.ProxyBindAddr, cfg.AllowPorts), UdpPortManager: ports.NewPortManager("udp", cfg.ProxyBindAddr, cfg.AllowPorts), }, } + + // Init group controller svr.rc.TcpGroupCtl = group.NewTcpGroupCtl(svr.rc.TcpPortManager) // Init assets. @@ -204,9 +189,9 @@ func NewService() (svr *Service, err error) { // Create nat hole controller. if cfg.BindUdpPort > 0 { - var nc *NatHoleController + var nc *nathole.NatHoleController addr := fmt.Sprintf("%s:%d", cfg.BindAddr, cfg.BindUdpPort) - nc, err = NewNatHoleController(addr) + nc, err = nathole.NewNatHoleController(addr) if err != nil { err = fmt.Errorf("Create nat hole controller error, %v", err) return @@ -215,6 +200,7 @@ func NewService() (svr *Service, err error) { log.Info("nat hole udp service listen on %s:%d", cfg.BindAddr, cfg.BindUdpPort) } + var statsEnable bool // Create dashboard web server. if cfg.DashboardPort > 0 { err = svr.RunDashboardServer(cfg.DashboardAddr, cfg.DashboardPort) @@ -223,8 +209,10 @@ func NewService() (svr *Service, err error) { return } log.Info("Dashboard listen on %s:%d", cfg.DashboardAddr, cfg.DashboardPort) + statsEnable = true } + svr.statsCollector = stats.NewInternalCollector(statsEnable) return } @@ -355,9 +343,9 @@ func (svr *Service) RegisterControl(ctlConn frpNet.Conn, loginMsg *msg.Login) (e } } - ctl := NewControl(svr.rc, ctlConn, loginMsg) + ctl := NewControl(svr.rc, svr.pxyManager, svr.statsCollector, ctlConn, loginMsg) - if oldCtl := svr.rc.CtlManager.Add(loginMsg.RunId, ctl); oldCtl != nil { + if oldCtl := svr.ctlManager.Add(loginMsg.RunId, ctl); oldCtl != nil { oldCtl.allShutdown.WaitDone() } @@ -365,13 +353,19 @@ func (svr *Service) RegisterControl(ctlConn frpNet.Conn, loginMsg *msg.Login) (e ctl.Start() // for statistics - StatsNewClient() + svr.statsCollector.Mark(stats.TypeNewClient, &stats.NewClientPayload{}) + + go func() { + // block until control closed + ctl.WaitClosed() + svr.ctlManager.Del(loginMsg.RunId) + }() return } // RegisterWorkConn register a new work connection to control and proxies need it. func (svr *Service) RegisterWorkConn(workConn frpNet.Conn, newMsg *msg.NewWorkConn) { - ctl, exist := svr.rc.CtlManager.GetById(newMsg.RunId) + ctl, exist := svr.ctlManager.GetById(newMsg.RunId) if !exist { workConn.Warn("No client control found for run id [%s]", newMsg.RunId) return diff --git a/server/stats/internal.go b/server/stats/internal.go new file mode 100644 index 00000000..34d9c745 --- /dev/null +++ b/server/stats/internal.go @@ -0,0 +1,273 @@ +// Copyright 2019 fatedier, fatedier@gmail.com +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package stats + +import ( + "sync" + "time" + + "github.com/fatedier/frp/utils/log" + "github.com/fatedier/frp/utils/metric" +) + +type internalCollector struct { + enable bool + info *ServerStatistics + mu sync.Mutex +} + +func NewInternalCollector(enable bool) Collector { + return &internalCollector{ + enable: enable, + info: &ServerStatistics{ + TotalTrafficIn: metric.NewDateCounter(ReserveDays), + TotalTrafficOut: metric.NewDateCounter(ReserveDays), + CurConns: metric.NewCounter(), + + ClientCounts: metric.NewCounter(), + ProxyTypeCounts: make(map[string]metric.Counter), + + ProxyStatistics: make(map[string]*ProxyStatistics), + }, + } +} + +func (collector *internalCollector) Run() error { + go func() { + for { + time.Sleep(12 * time.Hour) + log.Debug("start to clear useless proxy statistics data...") + collector.ClearUselessInfo() + log.Debug("finish to clear useless proxy statistics data") + } + }() + return nil +} + +func (collector *internalCollector) ClearUselessInfo() { + // To check if there are proxies that closed than 7 days and drop them. + collector.mu.Lock() + defer collector.mu.Unlock() + for name, data := range collector.info.ProxyStatistics { + if !data.LastCloseTime.IsZero() && time.Since(data.LastCloseTime) > time.Duration(7*24)*time.Hour { + delete(collector.info.ProxyStatistics, name) + log.Trace("clear proxy [%s]'s statistics data, lastCloseTime: [%s]", name, data.LastCloseTime.String()) + } + } +} + +func (collector *internalCollector) Mark(statsType StatsType, payload interface{}) { + if !collector.enable { + return + } + + switch v := payload.(type) { + case *NewClientPayload: + collector.newClient(v) + case *CloseClientPayload: + collector.closeClient(v) + case *OpenConnectionPayload: + collector.openConnection(v) + case *CloseConnectionPayload: + collector.closeConnection(v) + case *AddTrafficInPayload: + collector.addTrafficIn(v) + case *AddTrafficOutPayload: + collector.addTrafficOut(v) + } +} + +func (collector *internalCollector) newClient(payload *NewClientPayload) { + collector.info.ClientCounts.Inc(1) +} + +func (collector *internalCollector) closeClient(payload *CloseClientPayload) { + collector.info.ClientCounts.Dec(1) +} + +func (collector *internalCollector) newProxy(payload *NewProxyPayload) { + collector.mu.Lock() + defer collector.mu.Unlock() + counter, ok := collector.info.ProxyTypeCounts[payload.ProxyType] + if !ok { + counter = metric.NewCounter() + } + counter.Inc(1) + collector.info.ProxyTypeCounts[payload.ProxyType] = counter + + proxyStats, ok := collector.info.ProxyStatistics[payload.Name] + if !(ok && proxyStats.ProxyType == payload.ProxyType) { + proxyStats = &ProxyStatistics{ + Name: payload.Name, + ProxyType: payload.ProxyType, + CurConns: metric.NewCounter(), + TrafficIn: metric.NewDateCounter(ReserveDays), + TrafficOut: metric.NewDateCounter(ReserveDays), + } + collector.info.ProxyStatistics[payload.Name] = proxyStats + } + proxyStats.LastStartTime = time.Now() +} + +func (collector *internalCollector) closeProxy(payload *CloseProxyPayload) { + collector.mu.Lock() + defer collector.mu.Unlock() + if counter, ok := collector.info.ProxyTypeCounts[payload.ProxyType]; ok { + counter.Dec(1) + } + if proxyStats, ok := collector.info.ProxyStatistics[payload.Name]; ok { + proxyStats.LastCloseTime = time.Now() + } +} + +func (collector *internalCollector) openConnection(payload *OpenConnectionPayload) { + collector.info.CurConns.Inc(1) + + collector.mu.Lock() + defer collector.mu.Unlock() + proxyStats, ok := collector.info.ProxyStatistics[payload.ProxyName] + if ok { + proxyStats.CurConns.Inc(1) + collector.info.ProxyStatistics[payload.ProxyName] = proxyStats + } +} + +func (collector *internalCollector) closeConnection(payload *CloseConnectionPayload) { + collector.info.CurConns.Dec(1) + + collector.mu.Lock() + defer collector.mu.Unlock() + proxyStats, ok := collector.info.ProxyStatistics[payload.ProxyName] + if ok { + proxyStats.CurConns.Dec(1) + collector.info.ProxyStatistics[payload.ProxyName] = proxyStats + } +} + +func (collector *internalCollector) addTrafficIn(payload *AddTrafficInPayload) { + collector.info.TotalTrafficIn.Inc(payload.TrafficBytes) + + collector.mu.Lock() + defer collector.mu.Unlock() + + proxyStats, ok := collector.info.ProxyStatistics[payload.ProxyName] + if ok { + proxyStats.TrafficIn.Inc(payload.TrafficBytes) + collector.info.ProxyStatistics[payload.ProxyName] = proxyStats + } +} + +func (collector *internalCollector) addTrafficOut(payload *AddTrafficOutPayload) { + collector.info.TotalTrafficOut.Inc(payload.TrafficBytes) + + collector.mu.Lock() + defer collector.mu.Unlock() + + proxyStats, ok := collector.info.ProxyStatistics[payload.ProxyName] + if ok { + proxyStats.TrafficOut.Inc(payload.TrafficBytes) + collector.info.ProxyStatistics[payload.ProxyName] = proxyStats + } +} + +func (collector *internalCollector) GetServer() *ServerStats { + collector.mu.Lock() + defer collector.mu.Unlock() + s := &ServerStats{ + TotalTrafficIn: collector.info.TotalTrafficIn.TodayCount(), + TotalTrafficOut: collector.info.TotalTrafficOut.TodayCount(), + CurConns: collector.info.CurConns.Count(), + ClientCounts: collector.info.ClientCounts.Count(), + ProxyTypeCounts: make(map[string]int64), + } + for k, v := range collector.info.ProxyTypeCounts { + s.ProxyTypeCounts[k] = v.Count() + } + return s +} + +func (collector *internalCollector) GetProxiesByType(proxyType string) []*ProxyStats { + res := make([]*ProxyStats, 0) + collector.mu.Lock() + defer collector.mu.Unlock() + + for name, proxyStats := range collector.info.ProxyStatistics { + if proxyStats.ProxyType != proxyType { + continue + } + + ps := &ProxyStats{ + Name: name, + Type: proxyStats.ProxyType, + TodayTrafficIn: proxyStats.TrafficIn.TodayCount(), + TodayTrafficOut: proxyStats.TrafficOut.TodayCount(), + CurConns: proxyStats.CurConns.Count(), + } + if !proxyStats.LastStartTime.IsZero() { + ps.LastStartTime = proxyStats.LastStartTime.Format("01-02 15:04:05") + } + if !proxyStats.LastCloseTime.IsZero() { + ps.LastCloseTime = proxyStats.LastCloseTime.Format("01-02 15:04:05") + } + res = append(res, ps) + } + return res +} + +func (collector *internalCollector) GetProxiesByTypeAndName(proxyType string, proxyName string) (res *ProxyStats) { + collector.mu.Lock() + defer collector.mu.Unlock() + + for name, proxyStats := range collector.info.ProxyStatistics { + if proxyStats.ProxyType != proxyType { + continue + } + + if name != proxyName { + continue + } + + res = &ProxyStats{ + Name: name, + Type: proxyStats.ProxyType, + TodayTrafficIn: proxyStats.TrafficIn.TodayCount(), + TodayTrafficOut: proxyStats.TrafficOut.TodayCount(), + CurConns: proxyStats.CurConns.Count(), + } + if !proxyStats.LastStartTime.IsZero() { + res.LastStartTime = proxyStats.LastStartTime.Format("01-02 15:04:05") + } + if !proxyStats.LastCloseTime.IsZero() { + res.LastCloseTime = proxyStats.LastCloseTime.Format("01-02 15:04:05") + } + break + } + return +} + +func (collector *internalCollector) GetProxyTraffic(name string) (res *ProxyTrafficInfo) { + collector.mu.Lock() + defer collector.mu.Unlock() + + proxyStats, ok := collector.info.ProxyStatistics[name] + if ok { + res = &ProxyTrafficInfo{ + Name: name, + } + res.TrafficIn = proxyStats.TrafficIn.GetLastDaysCount(ReserveDays) + res.TrafficOut = proxyStats.TrafficOut.GetLastDaysCount(ReserveDays) + } + return +} diff --git a/server/stats/stats.go b/server/stats/stats.go new file mode 100644 index 00000000..a09d7daa --- /dev/null +++ b/server/stats/stats.go @@ -0,0 +1,129 @@ +// Copyright 2017 fatedier, fatedier@gmail.com +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package stats + +import ( + "time" + + "github.com/fatedier/frp/utils/metric" +) + +const ( + ReserveDays = 7 +) + +type StatsType int + +const ( + TypeNewClient StatsType = iota + TypeCloseClient + TypeNewProxy + TypeCloseProxy + TypeOpenConnection + TypeCloseConnection + TypeAddTrafficIn + TypeAddTrafficOut +) + +type ServerStats struct { + TotalTrafficIn int64 + TotalTrafficOut int64 + CurConns int64 + ClientCounts int64 + ProxyTypeCounts map[string]int64 +} + +type ProxyStats struct { + Name string + Type string + TodayTrafficIn int64 + TodayTrafficOut int64 + LastStartTime string + LastCloseTime string + CurConns int64 +} + +type ProxyTrafficInfo struct { + Name string + TrafficIn []int64 + TrafficOut []int64 +} + +type ProxyStatistics struct { + Name string + ProxyType string + TrafficIn metric.DateCounter + TrafficOut metric.DateCounter + CurConns metric.Counter + LastStartTime time.Time + LastCloseTime time.Time +} + +type ServerStatistics struct { + TotalTrafficIn metric.DateCounter + TotalTrafficOut metric.DateCounter + CurConns metric.Counter + + // counter for clients + ClientCounts metric.Counter + + // counter for proxy types + ProxyTypeCounts map[string]metric.Counter + + // statistics for different proxies + // key is proxy name + ProxyStatistics map[string]*ProxyStatistics +} + +type Collector interface { + Mark(statsType StatsType, payload interface{}) + Run() error + GetServer() *ServerStats + GetProxiesByType(proxyType string) []*ProxyStats + GetProxiesByTypeAndName(proxyType string, proxyName string) *ProxyStats + GetProxyTraffic(name string) *ProxyTrafficInfo +} + +type NewClientPayload struct{} + +type CloseClientPayload struct{} + +type NewProxyPayload struct { + Name string + ProxyType string +} + +type CloseProxyPayload struct { + Name string + ProxyType string +} + +type OpenConnectionPayload struct { + ProxyName string +} + +type CloseConnectionPayload struct { + ProxyName string +} + +type AddTrafficInPayload struct { + ProxyName string + TrafficBytes int64 +} + +type AddTrafficOutPayload struct { + ProxyName string + TrafficBytes int64 +}