refactoring monitor code, support prometheus (#1668)

* refactoring monitor code, support prometheus
* remove vendor
This commit is contained in:
fatedier
2020-03-11 13:20:26 +08:00
committed by GitHub
parent 6d1af85e80
commit 495d999b6c
889 changed files with 682 additions and 333140 deletions

View File

@@ -30,8 +30,8 @@ import (
"github.com/fatedier/frp/models/msg"
plugin "github.com/fatedier/frp/models/plugin/server"
"github.com/fatedier/frp/server/controller"
"github.com/fatedier/frp/server/metrics"
"github.com/fatedier/frp/server/proxy"
"github.com/fatedier/frp/server/stats"
"github.com/fatedier/frp/utils/util"
"github.com/fatedier/frp/utils/version"
"github.com/fatedier/frp/utils/xlog"
@@ -92,9 +92,6 @@ type Control struct {
// plugin manager
pluginManager *plugin.Manager
// stats collector to store stats info of clients and proxies
statsCollector stats.Collector
// verifies authentication based on selected method
authVerifier auth.Verifier
@@ -152,7 +149,6 @@ func NewControl(
rc *controller.ResourceController,
pxyManager *proxy.ProxyManager,
pluginManager *plugin.Manager,
statsCollector stats.Collector,
authVerifier auth.Verifier,
ctlConn net.Conn,
loginMsg *msg.Login,
@@ -167,7 +163,6 @@ func NewControl(
rc: rc,
pxyManager: pxyManager,
pluginManager: pluginManager,
statsCollector: statsCollector,
authVerifier: authVerifier,
conn: ctlConn,
loginMsg: loginMsg,
@@ -381,16 +376,12 @@ func (ctl *Control) stoper() {
for _, pxy := range ctl.proxies {
pxy.Close()
ctl.pxyManager.Del(pxy.GetName())
ctl.statsCollector.Mark(stats.TypeCloseProxy, &stats.CloseProxyPayload{
Name: pxy.GetName(),
ProxyType: pxy.GetConf().GetBaseInfo().ProxyType,
})
metrics.Server.CloseProxy(pxy.GetName(), pxy.GetConf().GetBaseInfo().ProxyType)
}
ctl.allShutdown.Done()
xl.Info("client exit success")
ctl.statsCollector.Mark(stats.TypeCloseClient, &stats.CloseClientPayload{})
metrics.Server.CloseClient()
}
// block until Control closed
@@ -451,10 +442,7 @@ func (ctl *Control) manager() {
} else {
resp.RemoteAddr = remoteAddr
xl.Info("new proxy [%s] success", m.ProxyName)
ctl.statsCollector.Mark(stats.TypeNewProxy, &stats.NewProxyPayload{
Name: m.ProxyName,
ProxyType: m.ProxyType,
})
metrics.Server.NewProxy(m.ProxyName, m.ProxyType)
}
ctl.sendCh <- resp
case *msg.CloseProxy:
@@ -486,7 +474,7 @@ func (ctl *Control) RegisterProxy(pxyMsg *msg.NewProxy) (remoteAddr string, err
// NewProxy will return a interface Proxy.
// In fact it create different proxies by different proxy type, we just call run() here.
pxy, err := proxy.NewProxy(ctl.ctx, ctl.runId, ctl.rc, ctl.statsCollector, ctl.poolCount, ctl.GetWorkConn, pxyConf, ctl.serverCfg)
pxy, err := proxy.NewProxy(ctl.ctx, ctl.runId, ctl.rc, ctl.poolCount, ctl.GetWorkConn, pxyConf, ctl.serverCfg)
if err != nil {
return remoteAddr, err
}
@@ -548,9 +536,6 @@ func (ctl *Control) CloseProxy(closeMsg *msg.CloseProxy) (err error) {
delete(ctl.proxies, closeMsg.ProxyName)
ctl.mu.Unlock()
ctl.statsCollector.Mark(stats.TypeCloseProxy, &stats.CloseProxyPayload{
Name: pxy.GetName(),
ProxyType: pxy.GetConf().GetBaseInfo().ProxyType,
})
metrics.Server.CloseProxy(pxy.GetName(), pxy.GetConf().GetBaseInfo().ProxyType)
return
}

View File

@@ -24,6 +24,7 @@ import (
frpNet "github.com/fatedier/frp/utils/net"
"github.com/gorilla/mux"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
var (
@@ -38,6 +39,11 @@ func (svr *Service) RunDashboardServer(addr string, port int) (err error) {
user, passwd := svr.cfg.DashboardUser, svr.cfg.DashboardPwd
router.Use(frpNet.NewHttpAuthMiddleware(user, passwd).Middleware)
// metrics
if svr.cfg.EnablePrometheus {
router.Handle("/metrics", promhttp.Handler())
}
// api, see dashboard_api.go
router.HandleFunc("/api/serverinfo", svr.ApiServerInfo).Methods("GET")
router.HandleFunc("/api/proxy/{type}", svr.ApiProxyByType).Methods("GET")

View File

@@ -20,6 +20,7 @@ import (
"github.com/fatedier/frp/models/config"
"github.com/fatedier/frp/models/consts"
"github.com/fatedier/frp/models/metrics/mem"
"github.com/fatedier/frp/utils/log"
"github.com/fatedier/frp/utils/version"
@@ -62,7 +63,7 @@ func (svr *Service) ApiServerInfo(w http.ResponseWriter, r *http.Request) {
}()
log.Info("Http request: [%s]", r.URL.Path)
serverStats := svr.statsCollector.GetServer()
serverStats := mem.StatsCollector.GetServer()
svrResp := ServerInfoResp{
Version: version.Full(),
BindPort: svr.cfg.BindPort,
@@ -186,7 +187,7 @@ func (svr *Service) ApiProxyByType(w http.ResponseWriter, r *http.Request) {
}
func (svr *Service) getProxyStatsByType(proxyType string) (proxyInfos []*ProxyStatsInfo) {
proxyStats := svr.statsCollector.GetProxiesByType(proxyType)
proxyStats := mem.StatsCollector.GetProxiesByType(proxyType)
proxyInfos = make([]*ProxyStatsInfo, 0, len(proxyStats))
for _, ps := range proxyStats {
proxyInfo := &ProxyStatsInfo{}
@@ -256,7 +257,7 @@ func (svr *Service) ApiProxyByTypeAndName(w http.ResponseWriter, r *http.Request
func (svr *Service) getProxyStatsByTypeAndName(proxyType string, proxyName string) (proxyInfo GetProxyStatsResp, code int, msg string) {
proxyInfo.Name = proxyName
ps := svr.statsCollector.GetProxiesByTypeAndName(proxyType, proxyName)
ps := mem.StatsCollector.GetProxiesByTypeAndName(proxyType, proxyName)
if ps == nil {
code = 404
msg = "no proxy info found"
@@ -314,7 +315,7 @@ func (svr *Service) ApiProxyTraffic(w http.ResponseWriter, r *http.Request) {
trafficResp := GetProxyTrafficResp{}
trafficResp.Name = name
proxyTrafficInfo := svr.statsCollector.GetProxyTraffic(name)
proxyTrafficInfo := mem.StatsCollector.GetProxyTraffic(name)
if proxyTrafficInfo == nil {
res.Code = 404

37
server/metrics/metrics.go Normal file
View File

@@ -0,0 +1,37 @@
package metrics
import (
"sync"
)
type ServerMetrics interface {
NewClient()
CloseClient()
NewProxy(name string, proxyType string)
CloseProxy(name string, proxyType string)
OpenConnection(name string, proxyType string)
CloseConnection(name string, proxyType string)
AddTrafficIn(name string, proxyType string, trafficBytes int64)
AddTrafficOut(name string, proxyType string, trafficBytes int64)
}
var Server ServerMetrics = noopServerMetrics{}
var registerMetrics sync.Once
func Register(m ServerMetrics) {
registerMetrics.Do(func() {
Server = m
})
}
type noopServerMetrics struct{}
func (noopServerMetrics) NewClient() {}
func (noopServerMetrics) CloseClient() {}
func (noopServerMetrics) NewProxy(name string, proxyType string) {}
func (noopServerMetrics) CloseProxy(name string, proxyType string) {}
func (noopServerMetrics) OpenConnection(name string, proxyType string) {}
func (noopServerMetrics) CloseConnection(name string, proxyType string) {}
func (noopServerMetrics) AddTrafficIn(name string, proxyType string, trafficBytes int64) {}
func (noopServerMetrics) AddTrafficOut(name string, proxyType string, trafficBytes int64) {}

View File

@@ -20,7 +20,7 @@ import (
"strings"
"github.com/fatedier/frp/models/config"
"github.com/fatedier/frp/server/stats"
"github.com/fatedier/frp/server/metrics"
frpNet "github.com/fatedier/frp/utils/net"
"github.com/fatedier/frp/utils/util"
"github.com/fatedier/frp/utils/vhost"
@@ -159,21 +159,16 @@ func (pxy *HttpProxy) GetRealConn(remoteAddr string) (workConn net.Conn, err err
}
workConn = frpNet.WrapReadWriteCloserToConn(rwc, tmpConn)
workConn = frpNet.WrapStatsConn(workConn, pxy.updateStatsAfterClosedConn)
pxy.statsCollector.Mark(stats.TypeOpenConnection, &stats.OpenConnectionPayload{ProxyName: pxy.GetName()})
metrics.Server.OpenConnection(pxy.GetName(), pxy.GetConf().GetBaseInfo().ProxyType)
return
}
func (pxy *HttpProxy) updateStatsAfterClosedConn(totalRead, totalWrite int64) {
name := pxy.GetName()
pxy.statsCollector.Mark(stats.TypeCloseProxy, &stats.CloseConnectionPayload{ProxyName: name})
pxy.statsCollector.Mark(stats.TypeAddTrafficIn, &stats.AddTrafficInPayload{
ProxyName: name,
TrafficBytes: totalWrite,
})
pxy.statsCollector.Mark(stats.TypeAddTrafficOut, &stats.AddTrafficOutPayload{
ProxyName: name,
TrafficBytes: totalRead,
})
proxyType := pxy.GetConf().GetBaseInfo().ProxyType
metrics.Server.CloseConnection(name, proxyType)
metrics.Server.AddTrafficIn(name, proxyType, totalWrite)
metrics.Server.AddTrafficOut(name, proxyType, totalRead)
}
func (pxy *HttpProxy) Close() {

View File

@@ -25,7 +25,7 @@ import (
"github.com/fatedier/frp/models/config"
"github.com/fatedier/frp/models/msg"
"github.com/fatedier/frp/server/controller"
"github.com/fatedier/frp/server/stats"
"github.com/fatedier/frp/server/metrics"
frpNet "github.com/fatedier/frp/utils/net"
"github.com/fatedier/frp/utils/xlog"
@@ -45,14 +45,13 @@ type Proxy interface {
}
type BaseProxy struct {
name string
rc *controller.ResourceController
statsCollector stats.Collector
listeners []net.Listener
usedPortsNum int
poolCount int
getWorkConnFn GetWorkConnFn
serverCfg config.ServerCommonConf
name string
rc *controller.ResourceController
listeners []net.Listener
usedPortsNum int
poolCount int
getWorkConnFn GetWorkConnFn
serverCfg config.ServerCommonConf
mu sync.RWMutex
xl *xlog.Logger
@@ -136,7 +135,7 @@ func (pxy *BaseProxy) GetWorkConnFromPool(src, dst net.Addr) (workConn net.Conn,
// startListenHandler start a goroutine handler for each listener.
// p: p will just be passed to handler(Proxy, frpNet.Conn).
// handler: each proxy type can set different handler function to deal with connections accepted from listeners.
func (pxy *BaseProxy) startListenHandler(p Proxy, handler func(Proxy, net.Conn, stats.Collector, config.ServerCommonConf)) {
func (pxy *BaseProxy) startListenHandler(p Proxy, handler func(Proxy, net.Conn, config.ServerCommonConf)) {
xl := xlog.FromContextSafe(pxy.ctx)
for _, listener := range pxy.listeners {
go func(l net.Listener) {
@@ -149,26 +148,25 @@ func (pxy *BaseProxy) startListenHandler(p Proxy, handler func(Proxy, net.Conn,
return
}
xl.Debug("get a user connection [%s]", c.RemoteAddr().String())
go handler(p, c, pxy.statsCollector, pxy.serverCfg)
go handler(p, c, pxy.serverCfg)
}
}(listener)
}
}
func NewProxy(ctx context.Context, runId string, rc *controller.ResourceController, statsCollector stats.Collector, poolCount int,
func NewProxy(ctx context.Context, runId string, rc *controller.ResourceController, poolCount int,
getWorkConnFn GetWorkConnFn, pxyConf config.ProxyConf, serverCfg config.ServerCommonConf) (pxy Proxy, err error) {
xl := xlog.FromContextSafe(ctx).Spawn().AppendPrefix(pxyConf.GetBaseInfo().ProxyName)
basePxy := BaseProxy{
name: pxyConf.GetBaseInfo().ProxyName,
rc: rc,
statsCollector: statsCollector,
listeners: make([]net.Listener, 0),
poolCount: poolCount,
getWorkConnFn: getWorkConnFn,
serverCfg: serverCfg,
xl: xl,
ctx: xlog.NewContext(ctx, xl),
name: pxyConf.GetBaseInfo().ProxyName,
rc: rc,
listeners: make([]net.Listener, 0),
poolCount: poolCount,
getWorkConnFn: getWorkConnFn,
serverCfg: serverCfg,
xl: xl,
ctx: xlog.NewContext(ctx, xl),
}
switch cfg := pxyConf.(type) {
case *config.TcpProxyConf:
@@ -216,7 +214,7 @@ func NewProxy(ctx context.Context, runId string, rc *controller.ResourceControll
// HandleUserTcpConnection is used for incoming tcp user connections.
// It can be used for tcp, http, https type.
func HandleUserTcpConnection(pxy Proxy, userConn net.Conn, statsCollector stats.Collector, serverCfg config.ServerCommonConf) {
func HandleUserTcpConnection(pxy Proxy, userConn net.Conn, serverCfg config.ServerCommonConf) {
xl := xlog.FromContextSafe(pxy.Context())
defer userConn.Close()
@@ -243,17 +241,13 @@ func HandleUserTcpConnection(pxy Proxy, userConn net.Conn, statsCollector stats.
xl.Debug("join connections, workConn(l[%s] r[%s]) userConn(l[%s] r[%s])", workConn.LocalAddr().String(),
workConn.RemoteAddr().String(), userConn.LocalAddr().String(), userConn.RemoteAddr().String())
statsCollector.Mark(stats.TypeOpenConnection, &stats.OpenConnectionPayload{ProxyName: pxy.GetName()})
name := pxy.GetName()
proxyType := pxy.GetConf().GetBaseInfo().ProxyType
metrics.Server.OpenConnection(name, proxyType)
inCount, outCount := frpIo.Join(local, userConn)
statsCollector.Mark(stats.TypeCloseConnection, &stats.CloseConnectionPayload{ProxyName: pxy.GetName()})
statsCollector.Mark(stats.TypeAddTrafficIn, &stats.AddTrafficInPayload{
ProxyName: pxy.GetName(),
TrafficBytes: inCount,
})
statsCollector.Mark(stats.TypeAddTrafficOut, &stats.AddTrafficOutPayload{
ProxyName: pxy.GetName(),
TrafficBytes: outCount,
})
metrics.Server.CloseConnection(name, proxyType)
metrics.Server.AddTrafficIn(name, proxyType, inCount)
metrics.Server.AddTrafficOut(name, proxyType, outCount)
xl.Debug("join connections closed")
}

View File

@@ -23,7 +23,7 @@ import (
"github.com/fatedier/frp/models/config"
"github.com/fatedier/frp/models/msg"
"github.com/fatedier/frp/models/proto/udp"
"github.com/fatedier/frp/server/stats"
"github.com/fatedier/frp/server/metrics"
"github.com/fatedier/golib/errors"
)
@@ -114,10 +114,11 @@ func (pxy *UdpProxy) Run() (remoteAddr string, err error) {
if errRet := errors.PanicToError(func() {
xl.Trace("get udp message from workConn: %s", m.Content)
pxy.readCh <- m
pxy.statsCollector.Mark(stats.TypeAddTrafficOut, &stats.AddTrafficOutPayload{
ProxyName: pxy.GetName(),
TrafficBytes: int64(len(m.Content)),
})
metrics.Server.AddTrafficOut(
pxy.GetName(),
pxy.GetConf().GetBaseInfo().ProxyType,
int64(len(m.Content)),
)
}); errRet != nil {
conn.Close()
xl.Info("reader goroutine for udp work connection closed")
@@ -143,10 +144,11 @@ func (pxy *UdpProxy) Run() (remoteAddr string, err error) {
return
} else {
xl.Trace("send message to udp workConn: %s", udpMsg.Content)
pxy.statsCollector.Mark(stats.TypeAddTrafficIn, &stats.AddTrafficInPayload{
ProxyName: pxy.GetName(),
TrafficBytes: int64(len(udpMsg.Content)),
})
metrics.Server.AddTrafficIn(
pxy.GetName(),
pxy.GetConf().GetBaseInfo().ProxyType,
int64(len(udpMsg.Content)),
)
continue
}
case <-ctx.Done():

View File

@@ -32,14 +32,15 @@ import (
"github.com/fatedier/frp/assets"
"github.com/fatedier/frp/models/auth"
"github.com/fatedier/frp/models/config"
modelmetrics "github.com/fatedier/frp/models/metrics"
"github.com/fatedier/frp/models/msg"
"github.com/fatedier/frp/models/nathole"
plugin "github.com/fatedier/frp/models/plugin/server"
"github.com/fatedier/frp/server/controller"
"github.com/fatedier/frp/server/group"
"github.com/fatedier/frp/server/metrics"
"github.com/fatedier/frp/server/ports"
"github.com/fatedier/frp/server/proxy"
"github.com/fatedier/frp/server/stats"
"github.com/fatedier/frp/utils/log"
frpNet "github.com/fatedier/frp/utils/net"
"github.com/fatedier/frp/utils/tcpmux"
@@ -92,9 +93,6 @@ type Service struct {
// Verifies authentication based on selected method
authVerifier auth.Verifier
// stats collector to store server and proxies stats info
statsCollector stats.Collector
tlsConfig *tls.Config
cfg config.ServerCommonConf
@@ -275,8 +273,12 @@ func NewService(cfg config.ServerCommonConf) (svr *Service, err error) {
log.Info("Dashboard listen on %s:%d", cfg.DashboardAddr, cfg.DashboardPort)
statsEnable = true
}
svr.statsCollector = stats.NewInternalCollector(statsEnable)
if statsEnable {
modelmetrics.EnableMem()
if cfg.EnablePrometheus {
modelmetrics.EnablePrometheus()
}
}
return
}
@@ -429,8 +431,7 @@ func (svr *Service) RegisterControl(ctlConn net.Conn, loginMsg *msg.Login) (err
return
}
ctl := NewControl(ctx, svr.rc, svr.pxyManager, svr.pluginManager, svr.statsCollector, svr.authVerifier, ctlConn, loginMsg, svr.cfg)
ctl := NewControl(ctx, svr.rc, svr.pxyManager, svr.pluginManager, svr.authVerifier, ctlConn, loginMsg, svr.cfg)
if oldCtl := svr.ctlManager.Add(loginMsg.RunId, ctl); oldCtl != nil {
oldCtl.allShutdown.WaitDone()
}
@@ -438,7 +439,7 @@ func (svr *Service) RegisterControl(ctlConn net.Conn, loginMsg *msg.Login) (err
ctl.Start()
// for statistics
svr.statsCollector.Mark(stats.TypeNewClient, &stats.NewClientPayload{})
metrics.Server.NewClient()
go func() {
// block until control closed

View File

@@ -1,277 +0,0 @@
// Copyright 2019 fatedier, fatedier@gmail.com
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package stats
import (
"sync"
"time"
"github.com/fatedier/frp/utils/log"
"github.com/fatedier/frp/utils/metric"
)
type internalCollector struct {
enable bool
info *ServerStatistics
mu sync.Mutex
}
func NewInternalCollector(enable bool) Collector {
return &internalCollector{
enable: enable,
info: &ServerStatistics{
TotalTrafficIn: metric.NewDateCounter(ReserveDays),
TotalTrafficOut: metric.NewDateCounter(ReserveDays),
CurConns: metric.NewCounter(),
ClientCounts: metric.NewCounter(),
ProxyTypeCounts: make(map[string]metric.Counter),
ProxyStatistics: make(map[string]*ProxyStatistics),
},
}
}
func (collector *internalCollector) Run() error {
go func() {
for {
time.Sleep(12 * time.Hour)
log.Debug("start to clear useless proxy statistics data...")
collector.ClearUselessInfo()
log.Debug("finish to clear useless proxy statistics data")
}
}()
return nil
}
func (collector *internalCollector) ClearUselessInfo() {
// To check if there are proxies that closed than 7 days and drop them.
collector.mu.Lock()
defer collector.mu.Unlock()
for name, data := range collector.info.ProxyStatistics {
if !data.LastCloseTime.IsZero() && time.Since(data.LastCloseTime) > time.Duration(7*24)*time.Hour {
delete(collector.info.ProxyStatistics, name)
log.Trace("clear proxy [%s]'s statistics data, lastCloseTime: [%s]", name, data.LastCloseTime.String())
}
}
}
func (collector *internalCollector) Mark(statsType StatsType, payload interface{}) {
if !collector.enable {
return
}
switch v := payload.(type) {
case *NewClientPayload:
collector.newClient(v)
case *CloseClientPayload:
collector.closeClient(v)
case *NewProxyPayload:
collector.newProxy(v)
case *CloseProxyPayload:
collector.closeProxy(v)
case *OpenConnectionPayload:
collector.openConnection(v)
case *CloseConnectionPayload:
collector.closeConnection(v)
case *AddTrafficInPayload:
collector.addTrafficIn(v)
case *AddTrafficOutPayload:
collector.addTrafficOut(v)
}
}
func (collector *internalCollector) newClient(payload *NewClientPayload) {
collector.info.ClientCounts.Inc(1)
}
func (collector *internalCollector) closeClient(payload *CloseClientPayload) {
collector.info.ClientCounts.Dec(1)
}
func (collector *internalCollector) newProxy(payload *NewProxyPayload) {
collector.mu.Lock()
defer collector.mu.Unlock()
counter, ok := collector.info.ProxyTypeCounts[payload.ProxyType]
if !ok {
counter = metric.NewCounter()
}
counter.Inc(1)
collector.info.ProxyTypeCounts[payload.ProxyType] = counter
proxyStats, ok := collector.info.ProxyStatistics[payload.Name]
if !(ok && proxyStats.ProxyType == payload.ProxyType) {
proxyStats = &ProxyStatistics{
Name: payload.Name,
ProxyType: payload.ProxyType,
CurConns: metric.NewCounter(),
TrafficIn: metric.NewDateCounter(ReserveDays),
TrafficOut: metric.NewDateCounter(ReserveDays),
}
collector.info.ProxyStatistics[payload.Name] = proxyStats
}
proxyStats.LastStartTime = time.Now()
}
func (collector *internalCollector) closeProxy(payload *CloseProxyPayload) {
collector.mu.Lock()
defer collector.mu.Unlock()
if counter, ok := collector.info.ProxyTypeCounts[payload.ProxyType]; ok {
counter.Dec(1)
}
if proxyStats, ok := collector.info.ProxyStatistics[payload.Name]; ok {
proxyStats.LastCloseTime = time.Now()
}
}
func (collector *internalCollector) openConnection(payload *OpenConnectionPayload) {
collector.info.CurConns.Inc(1)
collector.mu.Lock()
defer collector.mu.Unlock()
proxyStats, ok := collector.info.ProxyStatistics[payload.ProxyName]
if ok {
proxyStats.CurConns.Inc(1)
collector.info.ProxyStatistics[payload.ProxyName] = proxyStats
}
}
func (collector *internalCollector) closeConnection(payload *CloseConnectionPayload) {
collector.info.CurConns.Dec(1)
collector.mu.Lock()
defer collector.mu.Unlock()
proxyStats, ok := collector.info.ProxyStatistics[payload.ProxyName]
if ok {
proxyStats.CurConns.Dec(1)
collector.info.ProxyStatistics[payload.ProxyName] = proxyStats
}
}
func (collector *internalCollector) addTrafficIn(payload *AddTrafficInPayload) {
collector.info.TotalTrafficIn.Inc(payload.TrafficBytes)
collector.mu.Lock()
defer collector.mu.Unlock()
proxyStats, ok := collector.info.ProxyStatistics[payload.ProxyName]
if ok {
proxyStats.TrafficIn.Inc(payload.TrafficBytes)
collector.info.ProxyStatistics[payload.ProxyName] = proxyStats
}
}
func (collector *internalCollector) addTrafficOut(payload *AddTrafficOutPayload) {
collector.info.TotalTrafficOut.Inc(payload.TrafficBytes)
collector.mu.Lock()
defer collector.mu.Unlock()
proxyStats, ok := collector.info.ProxyStatistics[payload.ProxyName]
if ok {
proxyStats.TrafficOut.Inc(payload.TrafficBytes)
collector.info.ProxyStatistics[payload.ProxyName] = proxyStats
}
}
func (collector *internalCollector) GetServer() *ServerStats {
collector.mu.Lock()
defer collector.mu.Unlock()
s := &ServerStats{
TotalTrafficIn: collector.info.TotalTrafficIn.TodayCount(),
TotalTrafficOut: collector.info.TotalTrafficOut.TodayCount(),
CurConns: collector.info.CurConns.Count(),
ClientCounts: collector.info.ClientCounts.Count(),
ProxyTypeCounts: make(map[string]int64),
}
for k, v := range collector.info.ProxyTypeCounts {
s.ProxyTypeCounts[k] = v.Count()
}
return s
}
func (collector *internalCollector) GetProxiesByType(proxyType string) []*ProxyStats {
res := make([]*ProxyStats, 0)
collector.mu.Lock()
defer collector.mu.Unlock()
for name, proxyStats := range collector.info.ProxyStatistics {
if proxyStats.ProxyType != proxyType {
continue
}
ps := &ProxyStats{
Name: name,
Type: proxyStats.ProxyType,
TodayTrafficIn: proxyStats.TrafficIn.TodayCount(),
TodayTrafficOut: proxyStats.TrafficOut.TodayCount(),
CurConns: proxyStats.CurConns.Count(),
}
if !proxyStats.LastStartTime.IsZero() {
ps.LastStartTime = proxyStats.LastStartTime.Format("01-02 15:04:05")
}
if !proxyStats.LastCloseTime.IsZero() {
ps.LastCloseTime = proxyStats.LastCloseTime.Format("01-02 15:04:05")
}
res = append(res, ps)
}
return res
}
func (collector *internalCollector) GetProxiesByTypeAndName(proxyType string, proxyName string) (res *ProxyStats) {
collector.mu.Lock()
defer collector.mu.Unlock()
for name, proxyStats := range collector.info.ProxyStatistics {
if proxyStats.ProxyType != proxyType {
continue
}
if name != proxyName {
continue
}
res = &ProxyStats{
Name: name,
Type: proxyStats.ProxyType,
TodayTrafficIn: proxyStats.TrafficIn.TodayCount(),
TodayTrafficOut: proxyStats.TrafficOut.TodayCount(),
CurConns: proxyStats.CurConns.Count(),
}
if !proxyStats.LastStartTime.IsZero() {
res.LastStartTime = proxyStats.LastStartTime.Format("01-02 15:04:05")
}
if !proxyStats.LastCloseTime.IsZero() {
res.LastCloseTime = proxyStats.LastCloseTime.Format("01-02 15:04:05")
}
break
}
return
}
func (collector *internalCollector) GetProxyTraffic(name string) (res *ProxyTrafficInfo) {
collector.mu.Lock()
defer collector.mu.Unlock()
proxyStats, ok := collector.info.ProxyStatistics[name]
if ok {
res = &ProxyTrafficInfo{
Name: name,
}
res.TrafficIn = proxyStats.TrafficIn.GetLastDaysCount(ReserveDays)
res.TrafficOut = proxyStats.TrafficOut.GetLastDaysCount(ReserveDays)
}
return
}

View File

@@ -1,129 +0,0 @@
// Copyright 2017 fatedier, fatedier@gmail.com
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package stats
import (
"time"
"github.com/fatedier/frp/utils/metric"
)
const (
ReserveDays = 7
)
type StatsType int
const (
TypeNewClient StatsType = iota
TypeCloseClient
TypeNewProxy
TypeCloseProxy
TypeOpenConnection
TypeCloseConnection
TypeAddTrafficIn
TypeAddTrafficOut
)
type ServerStats struct {
TotalTrafficIn int64
TotalTrafficOut int64
CurConns int64
ClientCounts int64
ProxyTypeCounts map[string]int64
}
type ProxyStats struct {
Name string
Type string
TodayTrafficIn int64
TodayTrafficOut int64
LastStartTime string
LastCloseTime string
CurConns int64
}
type ProxyTrafficInfo struct {
Name string
TrafficIn []int64
TrafficOut []int64
}
type ProxyStatistics struct {
Name string
ProxyType string
TrafficIn metric.DateCounter
TrafficOut metric.DateCounter
CurConns metric.Counter
LastStartTime time.Time
LastCloseTime time.Time
}
type ServerStatistics struct {
TotalTrafficIn metric.DateCounter
TotalTrafficOut metric.DateCounter
CurConns metric.Counter
// counter for clients
ClientCounts metric.Counter
// counter for proxy types
ProxyTypeCounts map[string]metric.Counter
// statistics for different proxies
// key is proxy name
ProxyStatistics map[string]*ProxyStatistics
}
type Collector interface {
Mark(statsType StatsType, payload interface{})
Run() error
GetServer() *ServerStats
GetProxiesByType(proxyType string) []*ProxyStats
GetProxiesByTypeAndName(proxyType string, proxyName string) *ProxyStats
GetProxyTraffic(name string) *ProxyTrafficInfo
}
type NewClientPayload struct{}
type CloseClientPayload struct{}
type NewProxyPayload struct {
Name string
ProxyType string
}
type CloseProxyPayload struct {
Name string
ProxyType string
}
type OpenConnectionPayload struct {
ProxyName string
}
type CloseConnectionPayload struct {
ProxyName string
}
type AddTrafficInPayload struct {
ProxyName string
TrafficBytes int64
}
type AddTrafficOutPayload struct {
ProxyName string
TrafficBytes int64
}