split visitors from proxies and add health check config

This commit is contained in:
fatedier
2018-06-25 18:22:35 +08:00
parent b6c219aa97
commit c33b5152e7
13 changed files with 641 additions and 287 deletions

View File

@@ -77,7 +77,7 @@ func (svr *Service) apiReload(w http.ResponseWriter, r *http.Request) {
return
}
pxyCfgs, visitorCfgs, err := config.LoadProxyConfFromIni(g.GlbClientCfg.User, conf, newCommonCfg.Start)
pxyCfgs, visitorCfgs, err := config.LoadAllConfFromIni(g.GlbClientCfg.User, conf, newCommonCfg.Start)
if err != nil {
res.Code = 3
res.Msg = err.Error()

View File

@@ -47,8 +47,12 @@ type Control struct {
// login message to server, only used
loginMsg *msg.Login
// manage all proxies
pm *ProxyManager
// manage all visitors
vm *VisitorManager
// control connection
conn frpNet.Conn
@@ -82,7 +86,7 @@ type Control struct {
log.Logger
}
func NewControl(svr *Service, pxyCfgs map[string]config.ProxyConf, visitorCfgs map[string]config.ProxyConf) *Control {
func NewControl(svr *Service, pxyCfgs map[string]config.ProxyConf, visitorCfgs map[string]config.VisitorConf) *Control {
loginMsg := &msg.Login{
Arch: runtime.GOARCH,
Os: runtime.GOOS,
@@ -102,7 +106,9 @@ func NewControl(svr *Service, pxyCfgs map[string]config.ProxyConf, visitorCfgs m
Logger: log.NewPrefixLogger(""),
}
ctl.pm = NewProxyManager(ctl, ctl.sendCh, "")
ctl.pm.Reload(pxyCfgs, visitorCfgs, false)
ctl.pm.Reload(pxyCfgs, false)
ctl.vm = NewVisitorManager(ctl)
ctl.vm.Reload(visitorCfgs)
return ctl
}
@@ -129,6 +135,8 @@ func (ctl *Control) Run() (err error) {
// start all local visitors and send NewProxy message for all configured proxies
ctl.pm.Reset(ctl.sendCh, ctl.runId)
ctl.pm.CheckAndStartProxy([]string{ProxyStatusNew})
go ctl.vm.Run()
return nil
}
@@ -444,7 +452,8 @@ func (ctl *Control) worker() {
}
}
func (ctl *Control) reloadConf(pxyCfgs map[string]config.ProxyConf, visitorCfgs map[string]config.ProxyConf) error {
err := ctl.pm.Reload(pxyCfgs, visitorCfgs, true)
func (ctl *Control) reloadConf(pxyCfgs map[string]config.ProxyConf, visitorCfgs map[string]config.VisitorConf) error {
ctl.vm.Reload(visitorCfgs)
err := ctl.pm.Reload(pxyCfgs, true)
return err
}

32
client/health.go Normal file
View File

@@ -0,0 +1,32 @@
// Copyright 2018 fatedier, fatedier@gmail.com
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package client
import (
"github.com/fatedier/frp/models/config"
)
type HealthCheckMonitor struct {
cfg config.HealthCheckConf
}
func NewHealthCheckMonitor(cfg *config.HealthCheckConf) *HealthCheckMonitor {
return &HealthCheckMonitor{
cfg: *cfg,
}
}
func (monitor *HealthCheckMonitor) Start() {
}

View File

@@ -13,25 +13,21 @@ import (
)
const (
ProxyStatusNew = "new"
ProxyStatusStartErr = "start error"
ProxyStatusWaitStart = "wait start"
ProxyStatusRunning = "running"
ProxyStatusClosed = "closed"
ProxyStatusNew = "new"
ProxyStatusStartErr = "start error"
ProxyStatusWaitStart = "wait start"
ProxyStatusRunning = "running"
ProxyStatusCheckFailed = "check failed"
ProxyStatusCheckSuccess = "check success"
ProxyStatusClosed = "closed"
)
type ProxyManager struct {
ctl *Control
ctl *Control
sendCh chan (msg.Message)
proxies map[string]*ProxyWrapper
visitorCfgs map[string]config.ProxyConf
visitors map[string]Visitor
sendCh chan (msg.Message)
closed bool
mu sync.RWMutex
closed bool
mu sync.RWMutex
log.Logger
}
@@ -151,13 +147,11 @@ func (pw *ProxyWrapper) Close() {
func NewProxyManager(ctl *Control, msgSendCh chan (msg.Message), logPrefix string) *ProxyManager {
return &ProxyManager{
ctl: ctl,
proxies: make(map[string]*ProxyWrapper),
visitorCfgs: make(map[string]config.ProxyConf),
visitors: make(map[string]Visitor),
sendCh: msgSendCh,
closed: false,
Logger: log.NewPrefixLogger(logPrefix),
ctl: ctl,
proxies: make(map[string]*ProxyWrapper),
sendCh: msgSendCh,
closed: false,
Logger: log.NewPrefixLogger(logPrefix),
}
}
@@ -239,24 +233,9 @@ func (pm *ProxyManager) CheckAndStartProxy(pxyStatus []string) {
}
}
}
for _, cfg := range pm.visitorCfgs {
name := cfg.GetBaseInfo().ProxyName
if _, exist := pm.visitors[name]; !exist {
pm.Info("try to start visitor [%s]", name)
visitor := NewVisitor(pm.ctl, cfg)
err := visitor.Run()
if err != nil {
visitor.Warn("start error: %v", err)
continue
}
pm.visitors[name] = visitor
visitor.Info("start visitor success")
}
}
}
func (pm *ProxyManager) Reload(pxyCfgs map[string]config.ProxyConf, visitorCfgs map[string]config.ProxyConf, startNow bool) error {
func (pm *ProxyManager) Reload(pxyCfgs map[string]config.ProxyConf, startNow bool) error {
pm.mu.Lock()
defer func() {
pm.mu.Unlock()
@@ -308,38 +287,6 @@ func (pm *ProxyManager) Reload(pxyCfgs map[string]config.ProxyConf, visitorCfgs
}
}
pm.Info("proxy added: %v", addPxyNames)
delVisitorName := make([]string, 0)
for name, oldVisitorCfg := range pm.visitorCfgs {
del := false
cfg, ok := visitorCfgs[name]
if !ok {
del = true
} else {
if !oldVisitorCfg.Compare(cfg) {
del = true
}
}
if del {
delVisitorName = append(delVisitorName, name)
delete(pm.visitorCfgs, name)
if visitor, ok := pm.visitors[name]; ok {
visitor.Close()
}
delete(pm.visitors, name)
}
}
pm.Info("visitor removed: %v", delVisitorName)
addVisitorName := make([]string, 0)
for name, visitorCfg := range visitorCfgs {
if _, ok := pm.visitorCfgs[name]; !ok {
pm.visitorCfgs[name] = visitorCfg
addVisitorName = append(addVisitorName, name)
}
}
pm.Info("visitor added: %v", addVisitorName)
return nil
}

View File

@@ -27,7 +27,7 @@ type Service struct {
closedCh chan int
}
func NewService(pxyCfgs map[string]config.ProxyConf, visitorCfgs map[string]config.ProxyConf) (svr *Service) {
func NewService(pxyCfgs map[string]config.ProxyConf, visitorCfgs map[string]config.VisitorConf) (svr *Service) {
svr = &Service{
closedCh: make(chan int),
}

View File

@@ -44,18 +44,18 @@ type Visitor interface {
log.Logger
}
func NewVisitor(ctl *Control, pxyConf config.ProxyConf) (visitor Visitor) {
func NewVisitor(ctl *Control, cfg config.VisitorConf) (visitor Visitor) {
baseVisitor := BaseVisitor{
ctl: ctl,
Logger: log.NewPrefixLogger(pxyConf.GetBaseInfo().ProxyName),
Logger: log.NewPrefixLogger(cfg.GetBaseInfo().ProxyName),
}
switch cfg := pxyConf.(type) {
case *config.StcpProxyConf:
switch cfg := cfg.(type) {
case *config.StcpVisitorConf:
visitor = &StcpVisitor{
BaseVisitor: baseVisitor,
cfg: cfg,
}
case *config.XtcpProxyConf:
case *config.XtcpVisitorConf:
visitor = &XtcpVisitor{
BaseVisitor: baseVisitor,
cfg: cfg,
@@ -75,7 +75,7 @@ type BaseVisitor struct {
type StcpVisitor struct {
BaseVisitor
cfg *config.StcpProxyConf
cfg *config.StcpVisitorConf
}
func (sv *StcpVisitor) Run() (err error) {
@@ -162,7 +162,7 @@ func (sv *StcpVisitor) handleConn(userConn frpNet.Conn) {
type XtcpVisitor struct {
BaseVisitor
cfg *config.XtcpProxyConf
cfg *config.XtcpVisitorConf
}
func (sv *XtcpVisitor) Run() (err error) {

111
client/visitor_manager.go Normal file
View File

@@ -0,0 +1,111 @@
// Copyright 2018 fatedier, fatedier@gmail.com
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package client
import (
"sync"
"time"
"github.com/fatedier/frp/models/config"
"github.com/fatedier/frp/utils/log"
)
type VisitorManager struct {
ctl *Control
cfgs map[string]config.VisitorConf
visitors map[string]Visitor
checkInterval time.Duration
mu sync.Mutex
}
func NewVisitorManager(ctl *Control) *VisitorManager {
return &VisitorManager{
ctl: ctl,
cfgs: make(map[string]config.VisitorConf),
visitors: make(map[string]Visitor),
checkInterval: 10 * time.Second,
}
}
func (vm *VisitorManager) Run() {
for {
time.Sleep(vm.checkInterval)
vm.mu.Lock()
for _, cfg := range vm.cfgs {
name := cfg.GetBaseInfo().ProxyName
if _, exist := vm.visitors[name]; !exist {
log.Info("try to start visitor [%s]", name)
vm.startVisitor(cfg)
}
}
vm.mu.Unlock()
}
}
// Hold lock before calling this function.
func (vm *VisitorManager) startVisitor(cfg config.VisitorConf) (err error) {
name := cfg.GetBaseInfo().ProxyName
visitor := NewVisitor(vm.ctl, cfg)
err = visitor.Run()
if err != nil {
visitor.Warn("start error: %v", err)
} else {
vm.visitors[name] = visitor
visitor.Info("start visitor success")
}
return
}
func (vm *VisitorManager) Reload(cfgs map[string]config.VisitorConf) {
vm.mu.Lock()
defer vm.mu.Unlock()
delNames := make([]string, 0)
for name, oldCfg := range vm.cfgs {
del := false
cfg, ok := cfgs[name]
if !ok {
del = true
} else {
if !oldCfg.Compare(cfg) {
del = true
}
}
if del {
delNames = append(delNames, name)
delete(vm.cfgs, name)
if visitor, ok := vm.visitors[name]; ok {
visitor.Close()
}
delete(vm.visitors, name)
}
}
log.Info("visitor removed: %v", delNames)
addNames := make([]string, 0)
for name, cfg := range cfgs {
if _, ok := vm.cfgs[name]; !ok {
vm.cfgs[name] = cfg
addNames = append(addNames, name)
vm.startVisitor(cfg)
}
}
log.Info("visitor added: %v", addNames)
return
}