Merge pull request #1396 from velovix/issue-1387_server-conf-as-argument

Pass server configuration as an argument
This commit is contained in:
fatedier
2019-08-21 00:30:01 +08:00
committed by GitHub
13 changed files with 115 additions and 149 deletions

View File

@@ -21,7 +21,6 @@ import (
"sync"
"time"
"github.com/fatedier/frp/g"
"github.com/fatedier/frp/models/config"
"github.com/fatedier/frp/models/consts"
frpErr "github.com/fatedier/frp/models/errors"
@@ -129,10 +128,14 @@ type Control struct {
allShutdown *shutdown.Shutdown
mu sync.RWMutex
// Server configuration information
serverCfg config.ServerCommonConf
}
func NewControl(rc *controller.ResourceController, pxyManager *proxy.ProxyManager,
statsCollector stats.Collector, ctlConn net.Conn, loginMsg *msg.Login) *Control {
statsCollector stats.Collector, ctlConn net.Conn, loginMsg *msg.Login,
serverCfg config.ServerCommonConf) *Control {
return &Control{
rc: rc,
@@ -153,6 +156,7 @@ func NewControl(rc *controller.ResourceController, pxyManager *proxy.ProxyManage
writerShutdown: shutdown.New(),
managerShutdown: shutdown.New(),
allShutdown: shutdown.New(),
serverCfg: serverCfg,
}
}
@@ -161,7 +165,7 @@ func (ctl *Control) Start() {
loginRespMsg := &msg.LoginResp{
Version: version.Full(),
RunId: ctl.runId,
ServerUdpPort: g.GlbServerCfg.BindUdpPort,
ServerUdpPort: ctl.serverCfg.BindUdpPort,
Error: "",
}
msg.WriteMsg(ctl.conn, loginRespMsg)
@@ -232,7 +236,7 @@ func (ctl *Control) GetWorkConn() (workConn net.Conn, err error) {
return
}
case <-time.After(time.Duration(g.GlbServerCfg.UserConnTimeout) * time.Second):
case <-time.After(time.Duration(ctl.serverCfg.UserConnTimeout) * time.Second):
err = fmt.Errorf("timeout trying to get work connection")
ctl.conn.Warn("%v", err)
return
@@ -263,7 +267,7 @@ func (ctl *Control) writer() {
defer ctl.allShutdown.Start()
defer ctl.writerShutdown.Done()
encWriter, err := crypto.NewWriter(ctl.conn, []byte(g.GlbServerCfg.Token))
encWriter, err := crypto.NewWriter(ctl.conn, []byte(ctl.serverCfg.Token))
if err != nil {
ctl.conn.Error("crypto new writer error: %v", err)
ctl.allShutdown.Start()
@@ -293,7 +297,7 @@ func (ctl *Control) reader() {
defer ctl.allShutdown.Start()
defer ctl.readerShutdown.Done()
encReader := crypto.NewReader(ctl.conn, []byte(g.GlbServerCfg.Token))
encReader := crypto.NewReader(ctl.conn, []byte(ctl.serverCfg.Token))
for {
if m, err := msg.ReadMsg(encReader); err != nil {
if err == io.EOF {
@@ -374,7 +378,7 @@ func (ctl *Control) manager() {
for {
select {
case <-heartbeat.C:
if time.Since(ctl.lastPing) > time.Duration(g.GlbServerCfg.HeartBeatTimeout)*time.Second {
if time.Since(ctl.lastPing) > time.Duration(ctl.serverCfg.HeartBeatTimeout)*time.Second {
ctl.conn.Warn("heartbeat timeout")
return
}
@@ -417,22 +421,22 @@ func (ctl *Control) manager() {
func (ctl *Control) RegisterProxy(pxyMsg *msg.NewProxy) (remoteAddr string, err error) {
var pxyConf config.ProxyConf
// Load configures from NewProxy message and check.
pxyConf, err = config.NewProxyConfFromMsg(pxyMsg)
pxyConf, err = config.NewProxyConfFromMsg(pxyMsg, ctl.serverCfg)
if err != nil {
return
}
// NewProxy will return a interface Proxy.
// In fact it create different proxies by different proxy type, we just call run() here.
pxy, err := proxy.NewProxy(ctl.runId, ctl.rc, ctl.statsCollector, ctl.poolCount, ctl.GetWorkConn, pxyConf)
pxy, err := proxy.NewProxy(ctl.runId, ctl.rc, ctl.statsCollector, ctl.poolCount, ctl.GetWorkConn, pxyConf, ctl.serverCfg)
if err != nil {
return remoteAddr, err
}
// Check ports used number in each client
if g.GlbServerCfg.MaxPortsPerClient > 0 {
if ctl.serverCfg.MaxPortsPerClient > 0 {
ctl.mu.Lock()
if ctl.portsUsedNum+pxy.GetUsedPortsNum() > int(g.GlbServerCfg.MaxPortsPerClient) {
if ctl.portsUsedNum+pxy.GetUsedPortsNum() > int(ctl.serverCfg.MaxPortsPerClient) {
ctl.mu.Unlock()
err = fmt.Errorf("exceed the max_ports_per_client")
return
@@ -478,7 +482,7 @@ func (ctl *Control) CloseProxy(closeMsg *msg.CloseProxy) (err error) {
return
}
if g.GlbServerCfg.MaxPortsPerClient > 0 {
if ctl.serverCfg.MaxPortsPerClient > 0 {
ctl.portsUsedNum = ctl.portsUsedNum - pxy.GetUsedPortsNum()
}
pxy.Close()

View File

@@ -21,7 +21,6 @@ import (
"time"
"github.com/fatedier/frp/assets"
"github.com/fatedier/frp/g"
frpNet "github.com/fatedier/frp/utils/net"
"github.com/gorilla/mux"
@@ -36,7 +35,7 @@ func (svr *Service) RunDashboardServer(addr string, port int) (err error) {
// url router
router := mux.NewRouter()
user, passwd := g.GlbServerCfg.DashboardUser, g.GlbServerCfg.DashboardPwd
user, passwd := svr.cfg.DashboardUser, svr.cfg.DashboardPwd
router.Use(frpNet.NewHttpAuthMiddleware(user, passwd).Middleware)
// api, see dashboard_api.go

View File

@@ -18,7 +18,6 @@ import (
"encoding/json"
"net/http"
"github.com/fatedier/frp/g"
"github.com/fatedier/frp/models/config"
"github.com/fatedier/frp/models/consts"
"github.com/fatedier/frp/utils/log"
@@ -63,19 +62,18 @@ func (svr *Service) ApiServerInfo(w http.ResponseWriter, r *http.Request) {
}()
log.Info("Http request: [%s]", r.URL.Path)
cfg := &g.GlbServerCfg.ServerCommonConf
serverStats := svr.statsCollector.GetServer()
svrResp := ServerInfoResp{
Version: version.Full(),
BindPort: cfg.BindPort,
BindUdpPort: cfg.BindUdpPort,
VhostHttpPort: cfg.VhostHttpPort,
VhostHttpsPort: cfg.VhostHttpsPort,
KcpBindPort: cfg.KcpBindPort,
SubdomainHost: cfg.SubDomainHost,
MaxPoolCount: cfg.MaxPoolCount,
MaxPortsPerClient: cfg.MaxPortsPerClient,
HeartBeatTimeout: cfg.HeartBeatTimeout,
BindPort: svr.cfg.BindPort,
BindUdpPort: svr.cfg.BindUdpPort,
VhostHttpPort: svr.cfg.VhostHttpPort,
VhostHttpsPort: svr.cfg.VhostHttpsPort,
KcpBindPort: svr.cfg.KcpBindPort,
SubdomainHost: svr.cfg.SubDomainHost,
MaxPoolCount: svr.cfg.MaxPoolCount,
MaxPortsPerClient: svr.cfg.MaxPortsPerClient,
HeartBeatTimeout: svr.cfg.HeartBeatTimeout,
TotalTrafficIn: serverStats.TotalTrafficIn,
TotalTrafficOut: serverStats.TotalTrafficOut,

View File

@@ -19,7 +19,6 @@ import (
"net"
"strings"
"github.com/fatedier/frp/g"
"github.com/fatedier/frp/models/config"
"github.com/fatedier/frp/server/stats"
frpNet "github.com/fatedier/frp/utils/net"
@@ -88,13 +87,13 @@ func (pxy *HttpProxy) Run() (remoteAddr string, err error) {
pxy.rc.HttpReverseProxy.UnRegister(tmpDomain, tmpLocation)
})
}
addrs = append(addrs, util.CanonicalAddr(routeConfig.Domain, int(g.GlbServerCfg.VhostHttpPort)))
addrs = append(addrs, util.CanonicalAddr(routeConfig.Domain, int(pxy.serverCfg.VhostHttpPort)))
pxy.Info("http proxy listen for host [%s] location [%s] group [%s]", routeConfig.Domain, routeConfig.Location, pxy.cfg.Group)
}
}
if pxy.cfg.SubDomain != "" {
routeConfig.Domain = pxy.cfg.SubDomain + "." + g.GlbServerCfg.SubDomainHost
routeConfig.Domain = pxy.cfg.SubDomain + "." + pxy.serverCfg.SubDomainHost
for _, location := range locations {
routeConfig.Location = location
tmpDomain := routeConfig.Domain
@@ -119,7 +118,7 @@ func (pxy *HttpProxy) Run() (remoteAddr string, err error) {
pxy.rc.HttpReverseProxy.UnRegister(tmpDomain, tmpLocation)
})
}
addrs = append(addrs, util.CanonicalAddr(tmpDomain, g.GlbServerCfg.VhostHttpPort))
addrs = append(addrs, util.CanonicalAddr(tmpDomain, pxy.serverCfg.VhostHttpPort))
pxy.Info("http proxy listen for host [%s] location [%s] group [%s]", routeConfig.Domain, routeConfig.Location, pxy.cfg.Group)
}
@@ -147,7 +146,7 @@ func (pxy *HttpProxy) GetRealConn(remoteAddr string) (workConn frpNet.Conn, err
var rwc io.ReadWriteCloser = tmpConn
if pxy.cfg.UseEncryption {
rwc, err = frpIo.WithEncryption(rwc, []byte(g.GlbServerCfg.Token))
rwc, err = frpIo.WithEncryption(rwc, []byte(pxy.serverCfg.Token))
if err != nil {
pxy.Error("create encryption stream error: %v", err)
return

View File

@@ -17,7 +17,6 @@ package proxy
import (
"strings"
"github.com/fatedier/frp/g"
"github.com/fatedier/frp/models/config"
"github.com/fatedier/frp/utils/util"
"github.com/fatedier/frp/utils/vhost"
@@ -51,11 +50,11 @@ func (pxy *HttpsProxy) Run() (remoteAddr string, err error) {
l.AddLogPrefix(pxy.name)
pxy.Info("https proxy listen for host [%s]", routeConfig.Domain)
pxy.listeners = append(pxy.listeners, l)
addrs = append(addrs, util.CanonicalAddr(routeConfig.Domain, g.GlbServerCfg.VhostHttpsPort))
addrs = append(addrs, util.CanonicalAddr(routeConfig.Domain, pxy.serverCfg.VhostHttpsPort))
}
if pxy.cfg.SubDomain != "" {
routeConfig.Domain = pxy.cfg.SubDomain + "." + g.GlbServerCfg.SubDomainHost
routeConfig.Domain = pxy.cfg.SubDomain + "." + pxy.serverCfg.SubDomainHost
l, errRet := pxy.rc.VhostHttpsMuxer.Listen(routeConfig)
if errRet != nil {
err = errRet
@@ -64,7 +63,7 @@ func (pxy *HttpsProxy) Run() (remoteAddr string, err error) {
l.AddLogPrefix(pxy.name)
pxy.Info("https proxy listen for host [%s]", routeConfig.Domain)
pxy.listeners = append(pxy.listeners, l)
addrs = append(addrs, util.CanonicalAddr(routeConfig.Domain, int(g.GlbServerCfg.VhostHttpsPort)))
addrs = append(addrs, util.CanonicalAddr(routeConfig.Domain, int(pxy.serverCfg.VhostHttpsPort)))
}
pxy.startListenHandler(pxy, HandleUserTcpConnection)

View File

@@ -21,7 +21,6 @@ import (
"strconv"
"sync"
"github.com/fatedier/frp/g"
"github.com/fatedier/frp/models/config"
"github.com/fatedier/frp/models/msg"
"github.com/fatedier/frp/server/controller"
@@ -52,6 +51,7 @@ type BaseProxy struct {
usedPortsNum int
poolCount int
getWorkConnFn GetWorkConnFn
serverCfg config.ServerCommonConf
mu sync.RWMutex
log.Logger
@@ -126,7 +126,7 @@ func (pxy *BaseProxy) GetWorkConnFromPool(src, dst net.Addr) (workConn frpNet.Co
// startListenHandler start a goroutine handler for each listener.
// p: p will just be passed to handler(Proxy, frpNet.Conn).
// handler: each proxy type can set different handler function to deal with connections accepted from listeners.
func (pxy *BaseProxy) startListenHandler(p Proxy, handler func(Proxy, frpNet.Conn, stats.Collector)) {
func (pxy *BaseProxy) startListenHandler(p Proxy, handler func(Proxy, frpNet.Conn, stats.Collector, config.ServerCommonConf)) {
for _, listener := range pxy.listeners {
go func(l frpNet.Listener) {
for {
@@ -138,14 +138,14 @@ func (pxy *BaseProxy) startListenHandler(p Proxy, handler func(Proxy, frpNet.Con
return
}
pxy.Debug("get a user connection [%s]", c.RemoteAddr().String())
go handler(p, c, pxy.statsCollector)
go handler(p, c, pxy.statsCollector, pxy.serverCfg)
}
}(listener)
}
}
func NewProxy(runId string, rc *controller.ResourceController, statsCollector stats.Collector, poolCount int,
getWorkConnFn GetWorkConnFn, pxyConf config.ProxyConf) (pxy Proxy, err error) {
getWorkConnFn GetWorkConnFn, pxyConf config.ProxyConf, serverCfg config.ServerCommonConf) (pxy Proxy, err error) {
basePxy := BaseProxy{
name: pxyConf.GetBaseInfo().ProxyName,
@@ -155,6 +155,7 @@ func NewProxy(runId string, rc *controller.ResourceController, statsCollector st
poolCount: poolCount,
getWorkConnFn: getWorkConnFn,
Logger: log.NewPrefixLogger(runId),
serverCfg: serverCfg,
}
switch cfg := pxyConf.(type) {
case *config.TcpProxyConf:
@@ -198,7 +199,7 @@ func NewProxy(runId string, rc *controller.ResourceController, statsCollector st
// HandleUserTcpConnection is used for incoming tcp user connections.
// It can be used for tcp, http, https type.
func HandleUserTcpConnection(pxy Proxy, userConn frpNet.Conn, statsCollector stats.Collector) {
func HandleUserTcpConnection(pxy Proxy, userConn frpNet.Conn, statsCollector stats.Collector, serverCfg config.ServerCommonConf) {
defer userConn.Close()
// try all connections from the pool
@@ -211,7 +212,7 @@ func HandleUserTcpConnection(pxy Proxy, userConn frpNet.Conn, statsCollector sta
var local io.ReadWriteCloser = workConn
cfg := pxy.GetConf().GetBaseInfo()
if cfg.UseEncryption {
local, err = frpIo.WithEncryption(local, []byte(g.GlbServerCfg.Token))
local, err = frpIo.WithEncryption(local, []byte(serverCfg.Token))
if err != nil {
pxy.Error("create encryption stream error: %v", err)
return

View File

@@ -17,7 +17,6 @@ package proxy
import (
"fmt"
"github.com/fatedier/frp/g"
"github.com/fatedier/frp/models/config"
frpNet "github.com/fatedier/frp/utils/net"
)
@@ -31,7 +30,7 @@ type TcpProxy struct {
func (pxy *TcpProxy) Run() (remoteAddr string, err error) {
if pxy.cfg.Group != "" {
l, realPort, errRet := pxy.rc.TcpGroupCtl.Listen(pxy.name, pxy.cfg.Group, pxy.cfg.GroupKey, g.GlbServerCfg.ProxyBindAddr, pxy.cfg.RemotePort)
l, realPort, errRet := pxy.rc.TcpGroupCtl.Listen(pxy.name, pxy.cfg.Group, pxy.cfg.GroupKey, pxy.serverCfg.ProxyBindAddr, pxy.cfg.RemotePort)
if errRet != nil {
err = errRet
return
@@ -56,7 +55,7 @@ func (pxy *TcpProxy) Run() (remoteAddr string, err error) {
pxy.rc.TcpPortManager.Release(pxy.realPort)
}
}()
listener, errRet := frpNet.ListenTcp(g.GlbServerCfg.ProxyBindAddr, pxy.realPort)
listener, errRet := frpNet.ListenTcp(pxy.serverCfg.ProxyBindAddr, pxy.realPort)
if errRet != nil {
err = errRet
return

View File

@@ -20,7 +20,6 @@ import (
"net"
"time"
"github.com/fatedier/frp/g"
"github.com/fatedier/frp/models/config"
"github.com/fatedier/frp/models/msg"
"github.com/fatedier/frp/models/proto/udp"
@@ -67,7 +66,7 @@ func (pxy *UdpProxy) Run() (remoteAddr string, err error) {
remoteAddr = fmt.Sprintf(":%d", pxy.realPort)
pxy.cfg.RemotePort = pxy.realPort
addr, errRet := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", g.GlbServerCfg.ProxyBindAddr, pxy.realPort))
addr, errRet := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", pxy.serverCfg.ProxyBindAddr, pxy.realPort))
if errRet != nil {
err = errRet
return

View File

@@ -29,7 +29,7 @@ import (
"time"
"github.com/fatedier/frp/assets"
"github.com/fatedier/frp/g"
"github.com/fatedier/frp/models/config"
"github.com/fatedier/frp/models/msg"
"github.com/fatedier/frp/models/nathole"
"github.com/fatedier/frp/server/controller"
@@ -51,8 +51,6 @@ const (
connReadTimeout time.Duration = 10 * time.Second
)
var ServerService *Service
// Server service
type Service struct {
// Dispatch connections to different handlers listen on same port
@@ -86,10 +84,11 @@ type Service struct {
statsCollector stats.Collector
tlsConfig *tls.Config
cfg config.ServerCommonConf
}
func NewService() (svr *Service, err error) {
cfg := &g.GlbServerCfg.ServerCommonConf
func NewService(cfg config.ServerCommonConf) (svr *Service, err error) {
svr = &Service{
ctlManager: NewControlManager(),
pxyManager: proxy.NewProxyManager(),
@@ -100,6 +99,7 @@ func NewService() (svr *Service, err error) {
},
httpVhostRouter: vhost.NewVhostRouters(),
tlsConfig: generateTLSConfig(),
cfg: cfg,
}
// Init group controller
@@ -248,7 +248,7 @@ func (svr *Service) Run() {
if svr.rc.NatHoleController != nil {
go svr.rc.NatHoleController.Run()
}
if g.GlbServerCfg.KcpBindPort > 0 {
if svr.cfg.KcpBindPort > 0 {
go svr.HandleListener(svr.kcpListener)
}
@@ -324,7 +324,7 @@ func (svr *Service) HandleListener(l frpNet.Listener) {
}
}
if g.GlbServerCfg.TcpMux {
if svr.cfg.TcpMux {
fmuxCfg := fmux.DefaultConfig()
fmuxCfg.KeepAliveInterval = 20 * time.Second
fmuxCfg.LogOutput = ioutil.Discard
@@ -363,7 +363,7 @@ func (svr *Service) RegisterControl(ctlConn frpNet.Conn, loginMsg *msg.Login) (e
}
// Check auth.
if util.GetAuthKey(g.GlbServerCfg.Token, loginMsg.Timestamp) != loginMsg.PrivilegeKey {
if util.GetAuthKey(svr.cfg.Token, loginMsg.Timestamp) != loginMsg.PrivilegeKey {
err = fmt.Errorf("authorization failed")
return
}
@@ -377,7 +377,7 @@ func (svr *Service) RegisterControl(ctlConn frpNet.Conn, loginMsg *msg.Login) (e
}
}
ctl := NewControl(svr.rc, svr.pxyManager, svr.statsCollector, ctlConn, loginMsg)
ctl := NewControl(svr.rc, svr.pxyManager, svr.statsCollector, ctlConn, loginMsg, svr.cfg)
if oldCtl := svr.ctlManager.Add(loginMsg.RunId, ctl); oldCtl != nil {
oldCtl.allShutdown.WaitDone()