frpc: add status command

This commit is contained in:
fatedier 2018-01-17 01:09:33 +08:00
parent 3bb404dfb5
commit 584e098e8e
11 changed files with 722 additions and 318 deletions

View File

@ -39,6 +39,7 @@ func (svr *Service) RunAdminServer(addr string, port int64) (err error) {
// api, see dashboard_api.go // api, see dashboard_api.go
router.GET("/api/reload", frpNet.HttprouterBasicAuth(svr.apiReload, user, passwd)) router.GET("/api/reload", frpNet.HttprouterBasicAuth(svr.apiReload, user, passwd))
router.GET("/api/status", frpNet.HttprouterBasicAuth(svr.apiStatus, user, passwd))
address := fmt.Sprintf("%s:%d", addr, port) address := fmt.Sprintf("%s:%d", addr, port)
server := &http.Server{ server := &http.Server{

View File

@ -16,7 +16,10 @@ package client
import ( import (
"encoding/json" "encoding/json"
"fmt"
"net/http" "net/http"
"sort"
"strings"
"github.com/julienschmidt/httprouter" "github.com/julienschmidt/httprouter"
ini "github.com/vaughan0/go-ini" ini "github.com/vaughan0/go-ini"
@ -72,7 +75,127 @@ func (svr *Service) apiReload(w http.ResponseWriter, r *http.Request, _ httprout
return return
} }
svr.ctl.reloadConf(pxyCfgs, visitorCfgs) err = svr.ctl.reloadConf(pxyCfgs, visitorCfgs)
if err != nil {
res.Code = 4
res.Msg = err.Error()
log.Error("reload frpc proxy config error: %v", err)
return
}
log.Info("success reload conf") log.Info("success reload conf")
return return
} }
type StatusResp struct {
Tcp []ProxyStatusResp `json:"tcp"`
Udp []ProxyStatusResp `json:"udp"`
Http []ProxyStatusResp `json:"http"`
Https []ProxyStatusResp `json:"https"`
Stcp []ProxyStatusResp `json:"stcp"`
Xtcp []ProxyStatusResp `json:"xtcp"`
}
type ProxyStatusResp struct {
Name string `json:"name"`
Type string `json:"type"`
Status string `json:"status"`
Err string `json:"err"`
LocalAddr string `json:"local_addr"`
Plugin string `json:"plugin"`
RemoteAddr string `json:"remote_addr"`
}
type ByProxyStatusResp []ProxyStatusResp
func (a ByProxyStatusResp) Len() int { return len(a) }
func (a ByProxyStatusResp) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a ByProxyStatusResp) Less(i, j int) bool { return strings.Compare(a[i].Name, a[j].Name) < 0 }
func NewProxyStatusResp(status *ProxyStatus) ProxyStatusResp {
psr := ProxyStatusResp{
Name: status.Name,
Type: status.Type,
Status: status.Status,
Err: status.Err,
}
switch cfg := status.Cfg.(type) {
case *config.TcpProxyConf:
if cfg.LocalPort != 0 {
psr.LocalAddr = fmt.Sprintf("%s:%d", cfg.LocalIp, cfg.LocalPort)
}
psr.Plugin = cfg.Plugin
psr.RemoteAddr = fmt.Sprintf(":%d", cfg.RemotePort)
case *config.UdpProxyConf:
if cfg.LocalPort != 0 {
psr.LocalAddr = fmt.Sprintf("%s:%d", cfg.LocalIp, cfg.LocalPort)
}
psr.RemoteAddr = fmt.Sprintf(":%d", cfg.RemotePort)
case *config.HttpProxyConf:
if cfg.LocalPort != 0 {
psr.LocalAddr = fmt.Sprintf("%s:%d", cfg.LocalIp, cfg.LocalPort)
}
psr.Plugin = cfg.Plugin
case *config.HttpsProxyConf:
if cfg.LocalPort != 0 {
psr.LocalAddr = fmt.Sprintf("%s:%d", cfg.LocalIp, cfg.LocalPort)
}
psr.Plugin = cfg.Plugin
case *config.StcpProxyConf:
if cfg.LocalPort != 0 {
psr.LocalAddr = fmt.Sprintf("%s:%d", cfg.LocalIp, cfg.LocalPort)
}
psr.Plugin = cfg.Plugin
case *config.XtcpProxyConf:
if cfg.LocalPort != 0 {
psr.LocalAddr = fmt.Sprintf("%s:%d", cfg.LocalIp, cfg.LocalPort)
}
psr.Plugin = cfg.Plugin
}
return psr
}
// api/status
func (svr *Service) apiStatus(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
var (
buf []byte
res StatusResp
)
res.Tcp = make([]ProxyStatusResp, 0)
res.Udp = make([]ProxyStatusResp, 0)
res.Http = make([]ProxyStatusResp, 0)
res.Https = make([]ProxyStatusResp, 0)
res.Stcp = make([]ProxyStatusResp, 0)
res.Xtcp = make([]ProxyStatusResp, 0)
defer func() {
log.Info("Http response [/api/status]")
buf, _ = json.Marshal(&res)
w.Write(buf)
}()
log.Info("Http request: [/api/status]")
ps := svr.ctl.pm.GetAllProxyStatus()
for _, status := range ps {
switch status.Type {
case "tcp":
res.Tcp = append(res.Tcp, NewProxyStatusResp(status))
case "udp":
res.Udp = append(res.Udp, NewProxyStatusResp(status))
case "http":
res.Http = append(res.Http, NewProxyStatusResp(status))
case "https":
res.Https = append(res.Https, NewProxyStatusResp(status))
case "stcp":
res.Stcp = append(res.Stcp, NewProxyStatusResp(status))
case "xtcp":
res.Xtcp = append(res.Xtcp, NewProxyStatusResp(status))
}
}
sort.Sort(ByProxyStatusResp(res.Tcp))
sort.Sort(ByProxyStatusResp(res.Udp))
sort.Sort(ByProxyStatusResp(res.Http))
sort.Sort(ByProxyStatusResp(res.Https))
sort.Sort(ByProxyStatusResp(res.Stcp))
sort.Sort(ByProxyStatusResp(res.Xtcp))
return
}

View File

@ -24,9 +24,9 @@ import (
"github.com/fatedier/frp/models/config" "github.com/fatedier/frp/models/config"
"github.com/fatedier/frp/models/msg" "github.com/fatedier/frp/models/msg"
"github.com/fatedier/frp/utils/crypto" "github.com/fatedier/frp/utils/crypto"
"github.com/fatedier/frp/utils/errors"
"github.com/fatedier/frp/utils/log" "github.com/fatedier/frp/utils/log"
frpNet "github.com/fatedier/frp/utils/net" frpNet "github.com/fatedier/frp/utils/net"
"github.com/fatedier/frp/utils/shutdown"
"github.com/fatedier/frp/utils/util" "github.com/fatedier/frp/utils/util"
"github.com/fatedier/frp/utils/version" "github.com/fatedier/frp/utils/version"
"github.com/xtaci/smux" "github.com/xtaci/smux"
@ -40,20 +40,10 @@ type Control struct {
// frpc service // frpc service
svr *Service svr *Service
// login message to server // login message to server, only used
loginMsg *msg.Login loginMsg *msg.Login
// proxy configures pm *ProxyManager
pxyCfgs map[string]config.ProxyConf
// proxies
proxies map[string]Proxy
// visitor configures
visitorCfgs map[string]config.ProxyConf
// visitors
visitors map[string]Visitor
// control connection // control connection
conn frpNet.Conn conn frpNet.Conn
@ -79,6 +69,10 @@ type Control struct {
// last time got the Pong message // last time got the Pong message
lastPong time.Time lastPong time.Time
readerShutdown *shutdown.Shutdown
writerShutdown *shutdown.Shutdown
msgHandlerShutdown *shutdown.Shutdown
mu sync.RWMutex mu sync.RWMutex
log.Logger log.Logger
@ -92,28 +86,22 @@ func NewControl(svr *Service, pxyCfgs map[string]config.ProxyConf, visitorCfgs m
User: config.ClientCommonCfg.User, User: config.ClientCommonCfg.User,
Version: version.Full(), Version: version.Full(),
} }
return &Control{ ctl := &Control{
svr: svr, svr: svr,
loginMsg: loginMsg, loginMsg: loginMsg,
pxyCfgs: pxyCfgs, sendCh: make(chan msg.Message, 10),
visitorCfgs: visitorCfgs, readCh: make(chan msg.Message, 10),
proxies: make(map[string]Proxy), closedCh: make(chan int),
visitors: make(map[string]Visitor), readerShutdown: shutdown.New(),
sendCh: make(chan msg.Message, 10), writerShutdown: shutdown.New(),
readCh: make(chan msg.Message, 10), msgHandlerShutdown: shutdown.New(),
closedCh: make(chan int), Logger: log.NewPrefixLogger(""),
Logger: log.NewPrefixLogger(""),
} }
ctl.pm = NewProxyManager(ctl, ctl.sendCh, "")
ctl.pm.Reload(pxyCfgs, visitorCfgs)
return ctl
} }
// 1. login
// 2. start reader() writer() manager()
// 3. connection closed
// 4. In reader(): close closedCh and exit, controler() get it
// 5. In controler(): close readCh and sendCh, manager() and writer() will exit
// 6. In controler(): ini readCh, sendCh, closedCh
// 7. In controler(): start new reader(), writer(), manager()
// controler() will keep running
func (ctl *Control) Run() (err error) { func (ctl *Control) Run() (err error) {
for { for {
err = ctl.login() err = ctl.login()
@ -125,47 +113,29 @@ func (ctl *Control) Run() (err error) {
if config.ClientCommonCfg.LoginFailExit { if config.ClientCommonCfg.LoginFailExit {
return return
} else { } else {
time.Sleep(30 * time.Second) time.Sleep(10 * time.Second)
} }
} else { } else {
break break
} }
} }
go ctl.controler() go ctl.worker()
go ctl.manager()
go ctl.writer()
go ctl.reader()
// start all local visitors // start all local visitors and send NewProxy message for all configured proxies
for _, cfg := range ctl.visitorCfgs { ctl.pm.Reset(ctl.sendCh, ctl.runId)
visitor := NewVisitor(ctl, cfg) ctl.pm.CheckAndStartProxy()
err = visitor.Run()
if err != nil {
visitor.Warn("start error: %v", err)
continue
}
ctl.visitors[cfg.GetName()] = visitor
visitor.Info("start visitor success")
}
// send NewProxy message for all configured proxies
for _, cfg := range ctl.pxyCfgs {
var newProxyMsg msg.NewProxy
cfg.UnMarshalToMsg(&newProxyMsg)
ctl.sendCh <- &newProxyMsg
}
return nil return nil
} }
func (ctl *Control) NewWorkConn() { func (ctl *Control) HandleReqWorkConn(inMsg *msg.ReqWorkConn) {
workConn, err := ctl.connectServer() workConn, err := ctl.connectServer()
if err != nil { if err != nil {
return return
} }
m := &msg.NewWorkConn{ m := &msg.NewWorkConn{
RunId: ctl.getRunId(), RunId: ctl.runId,
} }
if err = msg.WriteMsg(workConn, m); err != nil { if err = msg.WriteMsg(workConn, m); err != nil {
ctl.Warn("work connection write to server error: %v", err) ctl.Warn("work connection write to server error: %v", err)
@ -182,33 +152,26 @@ func (ctl *Control) NewWorkConn() {
workConn.AddLogPrefix(startMsg.ProxyName) workConn.AddLogPrefix(startMsg.ProxyName)
// dispatch this work connection to related proxy // dispatch this work connection to related proxy
pxy, ok := ctl.getProxy(startMsg.ProxyName) ctl.pm.HandleWorkConn(startMsg.ProxyName, workConn)
if ok { }
workConn.Debug("start a new work connection, localAddr: %s remoteAddr: %s", workConn.LocalAddr().String(), workConn.RemoteAddr().String())
go pxy.InWorkConn(workConn) func (ctl *Control) HandleNewProxyResp(inMsg *msg.NewProxyResp) {
// Server will return NewProxyResp message to each NewProxy message.
// Start a new proxy handler if no error got
err := ctl.pm.StartProxy(inMsg.ProxyName, inMsg.Error)
if err != nil {
ctl.Warn("[%s] start error: %v", inMsg.ProxyName, err)
} else { } else {
workConn.Close() ctl.Info("[%s] start proxy success", inMsg.ProxyName)
} }
} }
func (ctl *Control) Close() error { func (ctl *Control) Close() error {
ctl.mu.Lock() ctl.mu.Lock()
defer ctl.mu.Unlock()
ctl.exit = true ctl.exit = true
err := errors.PanicToError(func() { ctl.pm.CloseProxies()
for name, _ := range ctl.proxies { return nil
ctl.sendCh <- &msg.CloseProxy{
ProxyName: name,
}
}
})
ctl.mu.Unlock()
return err
}
func (ctl *Control) init() {
ctl.sendCh = make(chan msg.Message, 10)
ctl.readCh = make(chan msg.Message, 10)
ctl.closedCh = make(chan int)
} }
// login send a login message to server and wait for a loginResp message. // login send a login message to server and wait for a loginResp message.
@ -249,7 +212,7 @@ func (ctl *Control) login() (err error) {
now := time.Now().Unix() now := time.Now().Unix()
ctl.loginMsg.PrivilegeKey = util.GetAuthKey(config.ClientCommonCfg.PrivilegeToken, now) ctl.loginMsg.PrivilegeKey = util.GetAuthKey(config.ClientCommonCfg.PrivilegeToken, now)
ctl.loginMsg.Timestamp = now ctl.loginMsg.Timestamp = now
ctl.loginMsg.RunId = ctl.getRunId() ctl.loginMsg.RunId = ctl.runId
if err = msg.WriteMsg(conn, ctl.loginMsg); err != nil { if err = msg.WriteMsg(conn, ctl.loginMsg); err != nil {
return err return err
@ -270,16 +233,11 @@ func (ctl *Control) login() (err error) {
ctl.conn = conn ctl.conn = conn
// update runId got from server // update runId got from server
ctl.setRunId(loginRespMsg.RunId) ctl.runId = loginRespMsg.RunId
config.ClientCommonCfg.ServerUdpPort = loginRespMsg.ServerUdpPort config.ClientCommonCfg.ServerUdpPort = loginRespMsg.ServerUdpPort
ctl.ClearLogPrefix() ctl.ClearLogPrefix()
ctl.AddLogPrefix(loginRespMsg.RunId) ctl.AddLogPrefix(loginRespMsg.RunId)
ctl.Info("login to server success, get run id [%s], server udp port [%d]", loginRespMsg.RunId, loginRespMsg.ServerUdpPort) ctl.Info("login to server success, get run id [%s], server udp port [%d]", loginRespMsg.RunId, loginRespMsg.ServerUdpPort)
// login success, so we let closedCh available again
ctl.closedCh = make(chan int)
ctl.lastPong = time.Now()
return nil return nil
} }
@ -292,7 +250,6 @@ func (ctl *Control) connectServer() (conn frpNet.Conn, err error) {
return return
} }
conn = frpNet.WrapConn(stream) conn = frpNet.WrapConn(stream)
} else { } else {
conn, err = frpNet.ConnectServerByHttpProxy(config.ClientCommonCfg.HttpProxy, config.ClientCommonCfg.Protocol, conn, err = frpNet.ConnectServerByHttpProxy(config.ClientCommonCfg.HttpProxy, config.ClientCommonCfg.Protocol,
fmt.Sprintf("%s:%d", config.ClientCommonCfg.ServerAddr, config.ClientCommonCfg.ServerPort)) fmt.Sprintf("%s:%d", config.ClientCommonCfg.ServerAddr, config.ClientCommonCfg.ServerPort))
@ -304,12 +261,14 @@ func (ctl *Control) connectServer() (conn frpNet.Conn, err error) {
return return
} }
// reader read all messages from frps and send to readCh
func (ctl *Control) reader() { func (ctl *Control) reader() {
defer func() { defer func() {
if err := recover(); err != nil { if err := recover(); err != nil {
ctl.Error("panic error: %v", err) ctl.Error("panic error: %v", err)
} }
}() }()
defer ctl.readerShutdown.Done()
defer close(ctl.closedCh) defer close(ctl.closedCh)
encReader := crypto.NewReader(ctl.conn, []byte(config.ClientCommonCfg.PrivilegeToken)) encReader := crypto.NewReader(ctl.conn, []byte(config.ClientCommonCfg.PrivilegeToken))
@ -328,7 +287,9 @@ func (ctl *Control) reader() {
} }
} }
// writer writes messages got from sendCh to frps
func (ctl *Control) writer() { func (ctl *Control) writer() {
defer ctl.writerShutdown.Done()
encWriter, err := crypto.NewWriter(ctl.conn, []byte(config.ClientCommonCfg.PrivilegeToken)) encWriter, err := crypto.NewWriter(ctl.conn, []byte(config.ClientCommonCfg.PrivilegeToken))
if err != nil { if err != nil {
ctl.conn.Error("crypto new writer error: %v", err) ctl.conn.Error("crypto new writer error: %v", err)
@ -348,19 +309,22 @@ func (ctl *Control) writer() {
} }
} }
// manager handles all channel events and do corresponding process // msgHandler handles all channel events and do corresponding operations.
func (ctl *Control) manager() { func (ctl *Control) msgHandler() {
defer func() { defer func() {
if err := recover(); err != nil { if err := recover(); err != nil {
ctl.Error("panic error: %v", err) ctl.Error("panic error: %v", err)
} }
}() }()
defer ctl.msgHandlerShutdown.Done()
hbSend := time.NewTicker(time.Duration(config.ClientCommonCfg.HeartBeatInterval) * time.Second) hbSend := time.NewTicker(time.Duration(config.ClientCommonCfg.HeartBeatInterval) * time.Second)
defer hbSend.Stop() defer hbSend.Stop()
hbCheck := time.NewTicker(time.Second) hbCheck := time.NewTicker(time.Second)
defer hbCheck.Stop() defer hbCheck.Stop()
ctl.lastPong = time.Now()
for { for {
select { select {
case <-hbSend.C: case <-hbSend.C:
@ -381,35 +345,9 @@ func (ctl *Control) manager() {
switch m := rawMsg.(type) { switch m := rawMsg.(type) {
case *msg.ReqWorkConn: case *msg.ReqWorkConn:
go ctl.NewWorkConn() go ctl.HandleReqWorkConn(m)
case *msg.NewProxyResp: case *msg.NewProxyResp:
// Server will return NewProxyResp message to each NewProxy message. ctl.HandleNewProxyResp(m)
// Start a new proxy handler if no error got
if m.Error != "" {
ctl.Warn("[%s] start error: %s", m.ProxyName, m.Error)
continue
}
cfg, ok := ctl.getProxyConf(m.ProxyName)
if !ok {
// it will never go to this branch now
ctl.Warn("[%s] no proxy conf found", m.ProxyName)
continue
}
oldPxy, ok := ctl.getProxy(m.ProxyName)
if ok {
oldPxy.Close()
}
pxy := NewProxy(ctl, cfg)
if err := pxy.Run(); err != nil {
ctl.Warn("[%s] proxy start running error: %v", m.ProxyName, err)
ctl.sendCh <- &msg.CloseProxy{
ProxyName: m.ProxyName,
}
continue
}
ctl.addProxy(m.ProxyName, pxy)
ctl.Info("[%s] start proxy success", m.ProxyName)
case *msg.Pong: case *msg.Pong:
ctl.lastPong = time.Now() ctl.lastPong = time.Now()
ctl.Debug("receive heartbeat from server") ctl.Debug("receive heartbeat from server")
@ -419,10 +357,14 @@ func (ctl *Control) manager() {
} }
// controler keep watching closedCh, start a new connection if previous control connection is closed. // controler keep watching closedCh, start a new connection if previous control connection is closed.
// If controler is notified by closedCh, reader and writer and manager will exit, then recall these functions. // If controler is notified by closedCh, reader and writer and handler will exit, then recall these functions.
func (ctl *Control) controler() { func (ctl *Control) worker() {
go ctl.msgHandler()
go ctl.writer()
go ctl.reader()
var err error var err error
maxDelayTime := 30 * time.Second maxDelayTime := 20 * time.Second
delayTime := time.Second delayTime := time.Second
checkInterval := 10 * time.Second checkInterval := 10 * time.Second
@ -430,41 +372,20 @@ func (ctl *Control) controler() {
for { for {
select { select {
case <-checkProxyTicker.C: case <-checkProxyTicker.C:
// Every 10 seconds, check which proxy registered failed and reregister it to server. // every 10 seconds, check which proxy registered failed and reregister it to server
ctl.mu.RLock() ctl.pm.CheckAndStartProxy()
for _, cfg := range ctl.pxyCfgs {
if _, exist := ctl.proxies[cfg.GetName()]; !exist {
ctl.Info("try to register proxy [%s]", cfg.GetName())
var newProxyMsg msg.NewProxy
cfg.UnMarshalToMsg(&newProxyMsg)
ctl.sendCh <- &newProxyMsg
}
}
for _, cfg := range ctl.visitorCfgs {
if _, exist := ctl.visitors[cfg.GetName()]; !exist {
ctl.Info("try to start visitor [%s]", cfg.GetName())
visitor := NewVisitor(ctl, cfg)
err = visitor.Run()
if err != nil {
visitor.Warn("start error: %v", err)
continue
}
ctl.visitors[cfg.GetName()] = visitor
visitor.Info("start visitor success")
}
}
ctl.mu.RUnlock()
case _, ok := <-ctl.closedCh: case _, ok := <-ctl.closedCh:
// we won't get any variable from this channel // we won't get any variable from this channel
if !ok { if !ok {
// close related channels // close related channels and wait until other goroutines done
close(ctl.readCh) close(ctl.readCh)
close(ctl.sendCh) ctl.readerShutdown.WaitDone()
ctl.msgHandlerShutdown.WaitDone()
for _, pxy := range ctl.proxies { close(ctl.sendCh)
pxy.Close() ctl.writerShutdown.WaitDone()
}
ctl.pm.CloseProxies()
// if ctl.exit is true, just exit // if ctl.exit is true, just exit
ctl.mu.RLock() ctl.mu.RLock()
exit := ctl.exit exit := ctl.exit
@ -473,9 +394,7 @@ func (ctl *Control) controler() {
return return
} }
time.Sleep(time.Second) // loop util reconnecting to server success
// loop util reconnect to server success
for { for {
ctl.Info("try to reconnect to server...") ctl.Info("try to reconnect to server...")
err = ctl.login() err = ctl.login()
@ -488,27 +407,27 @@ func (ctl *Control) controler() {
} }
continue continue
} }
// reconnect success, init the delayTime // reconnect success, init delayTime
delayTime = time.Second delayTime = time.Second
break break
} }
// init related channels and variables // init related channels and variables
ctl.init() ctl.sendCh = make(chan msg.Message, 10)
ctl.readCh = make(chan msg.Message, 10)
ctl.closedCh = make(chan int)
ctl.readerShutdown = shutdown.New()
ctl.writerShutdown = shutdown.New()
ctl.msgHandlerShutdown = shutdown.New()
ctl.pm.Reset(ctl.sendCh, ctl.runId)
// previous work goroutines should be closed and start them here // previous work goroutines should be closed and start them here
go ctl.manager() go ctl.msgHandler()
go ctl.writer() go ctl.writer()
go ctl.reader() go ctl.reader()
// send NewProxy message for all configured proxies // start all configured proxies
ctl.mu.RLock() ctl.pm.CheckAndStartProxy()
for _, cfg := range ctl.pxyCfgs {
var newProxyMsg msg.NewProxy
cfg.UnMarshalToMsg(&newProxyMsg)
ctl.sendCh <- &newProxyMsg
}
ctl.mu.RUnlock()
checkProxyTicker.Stop() checkProxyTicker.Stop()
checkProxyTicker = time.NewTicker(checkInterval) checkProxyTicker = time.NewTicker(checkInterval)
@ -517,106 +436,7 @@ func (ctl *Control) controler() {
} }
} }
func (ctl *Control) setRunId(runId string) { func (ctl *Control) reloadConf(pxyCfgs map[string]config.ProxyConf, visitorCfgs map[string]config.ProxyConf) error {
ctl.mu.Lock() err := ctl.pm.Reload(pxyCfgs, visitorCfgs)
defer ctl.mu.Unlock() return err
ctl.runId = runId
}
func (ctl *Control) getRunId() string {
ctl.mu.RLock()
defer ctl.mu.RUnlock()
return ctl.runId
}
func (ctl *Control) getProxy(name string) (pxy Proxy, ok bool) {
ctl.mu.RLock()
defer ctl.mu.RUnlock()
pxy, ok = ctl.proxies[name]
return
}
func (ctl *Control) addProxy(name string, pxy Proxy) {
ctl.mu.Lock()
defer ctl.mu.Unlock()
ctl.proxies[name] = pxy
}
func (ctl *Control) getProxyConf(name string) (conf config.ProxyConf, ok bool) {
ctl.mu.RLock()
defer ctl.mu.RUnlock()
conf, ok = ctl.pxyCfgs[name]
return
}
func (ctl *Control) reloadConf(pxyCfgs map[string]config.ProxyConf, visitorCfgs map[string]config.ProxyConf) {
ctl.mu.Lock()
defer ctl.mu.Unlock()
removedPxyNames := make([]string, 0)
for name, oldCfg := range ctl.pxyCfgs {
del := false
cfg, ok := pxyCfgs[name]
if !ok {
del = true
} else {
if !oldCfg.Compare(cfg) {
del = true
}
}
if del {
removedPxyNames = append(removedPxyNames, name)
delete(ctl.pxyCfgs, name)
if pxy, ok := ctl.proxies[name]; ok {
pxy.Close()
}
delete(ctl.proxies, name)
ctl.sendCh <- &msg.CloseProxy{
ProxyName: name,
}
}
}
ctl.Info("proxy removed: %v", removedPxyNames)
addedPxyNames := make([]string, 0)
for name, cfg := range pxyCfgs {
if _, ok := ctl.pxyCfgs[name]; !ok {
ctl.pxyCfgs[name] = cfg
addedPxyNames = append(addedPxyNames, name)
}
}
ctl.Info("proxy added: %v", addedPxyNames)
removedVisitorName := make([]string, 0)
for name, oldVisitorCfg := range ctl.visitorCfgs {
del := false
cfg, ok := visitorCfgs[name]
if !ok {
del = true
} else {
if !oldVisitorCfg.Compare(cfg) {
del = true
}
}
if del {
removedVisitorName = append(removedVisitorName, name)
delete(ctl.visitorCfgs, name)
if visitor, ok := ctl.visitors[name]; ok {
visitor.Close()
}
delete(ctl.visitors, name)
}
}
ctl.Info("visitor removed: %v", removedVisitorName)
addedVisitorName := make([]string, 0)
for name, visitorCfg := range visitorCfgs {
if _, ok := ctl.visitorCfgs[name]; !ok {
ctl.visitorCfgs[name] = visitorCfg
addedVisitorName = append(addedVisitorName, name)
}
}
ctl.Info("visitor added: %v", addedVisitorName)
} }

View File

@ -39,13 +39,13 @@ type Proxy interface {
// InWorkConn accept work connections registered to server. // InWorkConn accept work connections registered to server.
InWorkConn(conn frpNet.Conn) InWorkConn(conn frpNet.Conn)
Close() Close()
log.Logger log.Logger
} }
func NewProxy(ctl *Control, pxyConf config.ProxyConf) (pxy Proxy) { func NewProxy(pxyConf config.ProxyConf) (pxy Proxy) {
baseProxy := BaseProxy{ baseProxy := BaseProxy{
ctl: ctl,
Logger: log.NewPrefixLogger(pxyConf.GetName()), Logger: log.NewPrefixLogger(pxyConf.GetName()),
} }
switch cfg := pxyConf.(type) { switch cfg := pxyConf.(type) {
@ -84,7 +84,6 @@ func NewProxy(ctl *Control, pxyConf config.ProxyConf) (pxy Proxy) {
} }
type BaseProxy struct { type BaseProxy struct {
ctl *Control
closed bool closed bool
mu sync.RWMutex mu sync.RWMutex
log.Logger log.Logger

340
client/proxy_manager.go Normal file
View File

@ -0,0 +1,340 @@
package client
import (
"fmt"
"sync"
"github.com/fatedier/frp/models/config"
"github.com/fatedier/frp/models/msg"
"github.com/fatedier/frp/utils/errors"
"github.com/fatedier/frp/utils/log"
frpNet "github.com/fatedier/frp/utils/net"
)
const (
ProxyStatusNew = "new"
ProxyStatusStartErr = "start error"
ProxyStatusRunning = "running"
ProxyStatusClosed = "closed"
)
type ProxyManager struct {
ctl *Control
proxies map[string]*ProxyWrapper
visitorCfgs map[string]config.ProxyConf
visitors map[string]Visitor
sendCh chan (msg.Message)
closed bool
mu sync.RWMutex
log.Logger
}
type ProxyWrapper struct {
Name string
Type string
Status string
Err string
Cfg config.ProxyConf
pxy Proxy
mu sync.RWMutex
}
type ProxyStatus struct {
Name string `json:"name"`
Type string `json:"type"`
Status string `json:"status"`
Err string `json:"err"`
Cfg config.ProxyConf `json:"cfg"`
}
func NewProxyWrapper(cfg config.ProxyConf) *ProxyWrapper {
return &ProxyWrapper{
Name: cfg.GetName(),
Type: cfg.GetType(),
Status: ProxyStatusNew,
Cfg: cfg,
pxy: nil,
}
}
func (pw *ProxyWrapper) IsRunning() bool {
pw.mu.RLock()
defer pw.mu.RUnlock()
if pw.Status == ProxyStatusRunning {
return true
} else {
return false
}
}
func (pw *ProxyWrapper) GetStatus() *ProxyStatus {
pw.mu.RLock()
defer pw.mu.RUnlock()
ps := &ProxyStatus{
Name: pw.Name,
Type: pw.Type,
Status: pw.Status,
Err: pw.Err,
Cfg: pw.Cfg,
}
return ps
}
func (pw *ProxyWrapper) Start(serverRespErr string) error {
if pw.pxy != nil {
pw.pxy.Close()
pw.pxy = nil
}
if serverRespErr != "" {
pw.mu.Lock()
pw.Status = ProxyStatusStartErr
pw.Err = serverRespErr
pw.mu.Unlock()
return fmt.Errorf(serverRespErr)
}
pxy := NewProxy(pw.Cfg)
pw.mu.Lock()
defer pw.mu.Unlock()
if err := pxy.Run(); err != nil {
pw.Status = ProxyStatusStartErr
pw.Err = err.Error()
return err
}
pw.Status = ProxyStatusRunning
pw.Err = ""
pw.pxy = pxy
return nil
}
func (pw *ProxyWrapper) InWorkConn(workConn frpNet.Conn) {
pw.mu.RLock()
pxy := pw.pxy
pw.mu.RUnlock()
if pxy != nil {
workConn.Debug("start a new work connection, localAddr: %s remoteAddr: %s", workConn.LocalAddr().String(), workConn.RemoteAddr().String())
go pxy.InWorkConn(workConn)
} else {
workConn.Close()
}
}
func (pw *ProxyWrapper) Close() {
pw.mu.Lock()
defer pw.mu.Unlock()
if pw.pxy != nil {
pw.pxy.Close()
pw.pxy = nil
}
pw.Status = ProxyStatusClosed
}
func NewProxyManager(ctl *Control, msgSendCh chan (msg.Message), logPrefix string) *ProxyManager {
return &ProxyManager{
proxies: make(map[string]*ProxyWrapper),
visitorCfgs: make(map[string]config.ProxyConf),
visitors: make(map[string]Visitor),
sendCh: msgSendCh,
closed: false,
Logger: log.NewPrefixLogger(logPrefix),
}
}
func (pm *ProxyManager) Reset(msgSendCh chan (msg.Message), logPrefix string) {
pm.mu.Lock()
defer pm.mu.Unlock()
pm.closed = false
pm.sendCh = msgSendCh
pm.ClearLogPrefix()
pm.AddLogPrefix(logPrefix)
}
// Must hold the lock before calling this function.
func (pm *ProxyManager) sendMsg(m msg.Message) error {
err := errors.PanicToError(func() {
pm.sendCh <- m
})
if err != nil {
pm.closed = true
}
return err
}
func (pm *ProxyManager) StartProxy(name string, serverRespErr string) error {
pm.mu.Lock()
defer pm.mu.Unlock()
if pm.closed {
return fmt.Errorf("ProxyManager is closed now")
}
pxy, ok := pm.proxies[name]
if !ok {
return fmt.Errorf("no proxy found")
}
if err := pxy.Start(serverRespErr); err != nil {
errRet := err
err = pm.sendMsg(&msg.CloseProxy{
ProxyName: name,
})
if err != nil {
errRet = fmt.Errorf("send CloseProxy message error")
}
return errRet
}
return nil
}
func (pm *ProxyManager) CloseProxies() {
pm.mu.RLock()
defer pm.mu.RUnlock()
for _, pxy := range pm.proxies {
pxy.Close()
}
}
func (pm *ProxyManager) CheckAndStartProxy() {
pm.mu.RLock()
defer pm.mu.RUnlock()
if pm.closed {
pm.Warn("CheckAndStartProxy error: ProxyManager is closed now")
return
}
for _, pxy := range pm.proxies {
if !pxy.IsRunning() {
var newProxyMsg msg.NewProxy
pxy.Cfg.UnMarshalToMsg(&newProxyMsg)
err := pm.sendMsg(&newProxyMsg)
if err != nil {
pm.Warn("[%s] proxy send NewProxy message error")
return
}
}
}
for _, cfg := range pm.visitorCfgs {
if _, exist := pm.visitors[cfg.GetName()]; !exist {
pm.Info("try to start visitor [%s]", cfg.GetName())
visitor := NewVisitor(pm.ctl, cfg)
err := visitor.Run()
if err != nil {
visitor.Warn("start error: %v", err)
continue
}
pm.visitors[cfg.GetName()] = visitor
visitor.Info("start visitor success")
}
}
}
func (pm *ProxyManager) Reload(pxyCfgs map[string]config.ProxyConf, visitorCfgs map[string]config.ProxyConf) error {
pm.mu.Lock()
defer pm.mu.Unlock()
if pm.closed {
err := fmt.Errorf("Reload error: ProxyManager is closed now")
pm.Warn(err.Error())
return err
}
delPxyNames := make([]string, 0)
for name, pxy := range pm.proxies {
del := false
cfg, ok := pxyCfgs[name]
if !ok {
del = true
} else {
if !pxy.Cfg.Compare(cfg) {
del = true
}
}
if del {
delPxyNames = append(delPxyNames, name)
delete(pm.proxies, name)
pxy.Close()
err := pm.sendMsg(&msg.CloseProxy{
ProxyName: name,
})
if err != nil {
err = fmt.Errorf("Reload error: ProxyManager is closed now")
pm.Warn(err.Error())
return err
}
}
}
pm.Info("proxy removed: %v", delPxyNames)
addPxyNames := make([]string, 0)
for name, cfg := range pxyCfgs {
if _, ok := pm.proxies[name]; !ok {
pxy := NewProxyWrapper(cfg)
pm.proxies[name] = pxy
addPxyNames = append(addPxyNames, name)
}
}
pm.Info("proxy added: %v", addPxyNames)
delVisitorName := make([]string, 0)
for name, oldVisitorCfg := range pm.visitorCfgs {
del := false
cfg, ok := visitorCfgs[name]
if !ok {
del = true
} else {
if !oldVisitorCfg.Compare(cfg) {
del = true
}
}
if del {
delVisitorName = append(delVisitorName, name)
delete(pm.visitorCfgs, name)
if visitor, ok := pm.visitors[name]; ok {
visitor.Close()
}
delete(pm.visitors, name)
}
}
pm.Info("visitor removed: %v", delVisitorName)
addVisitorName := make([]string, 0)
for name, visitorCfg := range visitorCfgs {
if _, ok := pm.visitorCfgs[name]; !ok {
pm.visitorCfgs[name] = visitorCfg
addVisitorName = append(addVisitorName, name)
}
}
pm.Info("visitor added: %v", addVisitorName)
return nil
}
func (pm *ProxyManager) HandleWorkConn(name string, workConn frpNet.Conn) {
pm.mu.RLock()
pw, ok := pm.proxies[name]
pm.mu.RUnlock()
if ok {
pw.InWorkConn(workConn)
} else {
workConn.Close()
}
}
func (pm *ProxyManager) GetAllProxyStatus() []*ProxyStatus {
ps := make([]*ProxyStatus, 0)
pm.mu.RLock()
defer pm.mu.RUnlock()
for _, pxy := range pm.proxies {
ps = append(ps, pxy.GetStatus())
}
return ps
}

View File

@ -53,6 +53,6 @@ func (svr *Service) Run() error {
return nil return nil
} }
func (svr *Service) Close() error { func (svr *Service) Close() {
return svr.ctl.Close() svr.ctl.Close()
} }

View File

@ -28,6 +28,7 @@ import (
"time" "time"
docopt "github.com/docopt/docopt-go" docopt "github.com/docopt/docopt-go"
"github.com/rodaine/table"
ini "github.com/vaughan0/go-ini" ini "github.com/vaughan0/go-ini"
"github.com/fatedier/frp/client" "github.com/fatedier/frp/client"
@ -44,7 +45,8 @@ var usage string = `frpc is the client of frp
Usage: Usage:
frpc [-c config_file] [-L log_file] [--log-level=<log_level>] [--server-addr=<server_addr>] frpc [-c config_file] [-L log_file] [--log-level=<log_level>] [--server-addr=<server_addr>]
frpc [-c config_file] --reload frpc reload [-c config_file]
frpc status [-c config_file]
frpc -h | --help frpc -h | --help
frpc -v | --version frpc -v | --version
@ -53,7 +55,6 @@ Options:
-L log_file set output log file, including console -L log_file set output log file, including console
--log-level=<log_level> set log level: debug, info, warn, error --log-level=<log_level> set log level: debug, info, warn, error
--server-addr=<server_addr> addr which frps is listening for, example: 0.0.0.0:7000 --server-addr=<server_addr> addr which frps is listening for, example: 0.0.0.0:7000
--reload reload configure file without program exit
-h --help show this screen -h --help show this screen
-v --version show version -v --version show version
` `
@ -82,40 +83,25 @@ func main() {
config.ClientCommonCfg.ConfigFile = confFile config.ClientCommonCfg.ConfigFile = confFile
// check if reload command // check if reload command
if args["--reload"] != nil { if args["reload"] != nil {
if args["--reload"].(bool) { if args["reload"].(bool) {
req, err := http.NewRequest("GET", "http://"+ if err = CmdReload(); err != nil {
config.ClientCommonCfg.AdminAddr+":"+fmt.Sprintf("%d", config.ClientCommonCfg.AdminPort)+"/api/reload", nil)
if err != nil {
fmt.Printf("frps reload error: %v\n", err) fmt.Printf("frps reload error: %v\n", err)
os.Exit(1) os.Exit(1)
} else {
fmt.Printf("reload success\n")
os.Exit(0)
} }
}
}
authStr := "Basic " + base64.StdEncoding.EncodeToString([]byte(config.ClientCommonCfg.AdminUser+":"+ // check if status command
config.ClientCommonCfg.AdminPwd)) if args["status"] != nil {
if args["status"].(bool) {
req.Header.Add("Authorization", authStr) if err = CmdStatus(); err != nil {
resp, err := http.DefaultClient.Do(req) fmt.Println("frps get status error: %v\n", err)
if err != nil {
fmt.Printf("frpc reload error: %v\n", err)
os.Exit(1) os.Exit(1)
} else { } else {
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
fmt.Printf("frpc reload error: %v\n", err)
os.Exit(1)
}
res := &client.GeneralResponse{}
err = json.Unmarshal(body, &res)
if err != nil {
fmt.Printf("http response error: %s\n", strings.TrimSpace(string(body)))
os.Exit(1)
} else if res.Code != 0 {
fmt.Printf("reload error: %s\n", res.Msg)
os.Exit(1)
}
fmt.Printf("reload success\n")
os.Exit(0) os.Exit(0)
} }
} }
@ -187,3 +173,133 @@ func HandleSignal(svr *client.Service) {
time.Sleep(250 * time.Millisecond) time.Sleep(250 * time.Millisecond)
os.Exit(0) os.Exit(0)
} }
func CmdReload() error {
if config.ClientCommonCfg.AdminPort == 0 {
return fmt.Errorf("admin_port shoud be set if you want to use reload feature")
}
req, err := http.NewRequest("GET", "http://"+
config.ClientCommonCfg.AdminAddr+":"+fmt.Sprintf("%d", config.ClientCommonCfg.AdminPort)+"/api/reload", nil)
if err != nil {
return err
}
authStr := "Basic " + base64.StdEncoding.EncodeToString([]byte(config.ClientCommonCfg.AdminUser+":"+
config.ClientCommonCfg.AdminPwd))
req.Header.Add("Authorization", authStr)
resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
} else {
if resp.StatusCode != 200 {
return fmt.Errorf("admin api status code [%d]", resp.StatusCode)
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
}
res := &client.GeneralResponse{}
err = json.Unmarshal(body, &res)
if err != nil {
return fmt.Errorf("unmarshal http response error: %s", strings.TrimSpace(string(body)))
} else if res.Code != 0 {
return fmt.Errorf(res.Msg)
}
}
return nil
}
func CmdStatus() error {
if config.ClientCommonCfg.AdminPort == 0 {
return fmt.Errorf("admin_port shoud be set if you want to get proxy status")
}
req, err := http.NewRequest("GET", "http://"+
config.ClientCommonCfg.AdminAddr+":"+fmt.Sprintf("%d", config.ClientCommonCfg.AdminPort)+"/api/status", nil)
if err != nil {
return err
}
authStr := "Basic " + base64.StdEncoding.EncodeToString([]byte(config.ClientCommonCfg.AdminUser+":"+
config.ClientCommonCfg.AdminPwd))
req.Header.Add("Authorization", authStr)
resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
} else {
if resp.StatusCode != 200 {
return fmt.Errorf("admin api status code [%d]", resp.StatusCode)
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
}
res := &client.StatusResp{}
err = json.Unmarshal(body, &res)
if err != nil {
return fmt.Errorf("unmarshal http response error: %s", strings.TrimSpace(string(body)))
}
fmt.Println("Proxy Status...")
if len(res.Tcp) > 0 {
fmt.Printf("TCP")
tbl := table.New("Name", "Status", "LocalAddr", "Plugin", "RemoteAddr", "Error")
for _, ps := range res.Tcp {
tbl.AddRow(ps.Name, ps.Status, ps.LocalAddr, ps.Plugin, ps.RemoteAddr, ps.Err)
}
tbl.Print()
fmt.Println("")
}
if len(res.Udp) > 0 {
fmt.Printf("UDP")
tbl := table.New("Name", "Status", "LocalAddr", "Plugin", "RemoteAddr", "Error")
for _, ps := range res.Udp {
tbl.AddRow(ps.Name, ps.Status, ps.LocalAddr, ps.Plugin, ps.RemoteAddr, ps.Err)
}
tbl.Print()
fmt.Println("")
}
if len(res.Http) > 0 {
fmt.Printf("HTTP")
tbl := table.New("Name", "Status", "LocalAddr", "Plugin", "RemoteAddr", "Error")
for _, ps := range res.Http {
tbl.AddRow(ps.Name, ps.Status, ps.LocalAddr, ps.Plugin, ps.RemoteAddr, ps.Err)
}
tbl.Print()
fmt.Println("")
}
if len(res.Https) > 0 {
fmt.Printf("HTTPS")
tbl := table.New("Name", "Status", "LocalAddr", "Plugin", "RemoteAddr", "Error")
for _, ps := range res.Https {
tbl.AddRow(ps.Name, ps.Status, ps.LocalAddr, ps.Plugin, ps.RemoteAddr, ps.Err)
}
tbl.Print()
fmt.Println("")
}
if len(res.Stcp) > 0 {
fmt.Printf("STCP")
tbl := table.New("Name", "Status", "LocalAddr", "Plugin", "RemoteAddr", "Error")
for _, ps := range res.Stcp {
tbl.AddRow(ps.Name, ps.Status, ps.LocalAddr, ps.Plugin, ps.RemoteAddr, ps.Err)
}
tbl.Print()
fmt.Println("")
}
if len(res.Xtcp) > 0 {
fmt.Printf("XTCP")
tbl := table.New("Name", "Status", "LocalAddr", "Plugin", "RemoteAddr", "Error")
for _, ps := range res.Xtcp {
tbl.AddRow(ps.Name, ps.Status, ps.LocalAddr, ps.Plugin, ps.RemoteAddr, ps.Err)
}
tbl.Print()
fmt.Println("")
}
}
return nil
}

View File

@ -52,6 +52,7 @@ func NewConfByType(proxyType string) ProxyConf {
type ProxyConf interface { type ProxyConf interface {
GetName() string GetName() string
GetType() string
GetBaseInfo() *BaseProxyConf GetBaseInfo() *BaseProxyConf
LoadFromMsg(pMsg *msg.NewProxy) LoadFromMsg(pMsg *msg.NewProxy)
LoadFromFile(name string, conf ini.Section) error LoadFromFile(name string, conf ini.Section) error
@ -103,6 +104,10 @@ func (cfg *BaseProxyConf) GetName() string {
return cfg.ProxyName return cfg.ProxyName
} }
func (cfg *BaseProxyConf) GetType() string {
return cfg.ProxyType
}
func (cfg *BaseProxyConf) GetBaseInfo() *BaseProxyConf { func (cfg *BaseProxyConf) GetBaseInfo() *BaseProxyConf {
return cfg return cfg
} }

View File

@ -253,13 +253,13 @@ func (ctl *Control) stoper() {
ctl.allShutdown.WaitStart() ctl.allShutdown.WaitStart()
close(ctl.readCh) close(ctl.readCh)
ctl.managerShutdown.WaitDown() ctl.managerShutdown.WaitDone()
close(ctl.sendCh) close(ctl.sendCh)
ctl.writerShutdown.WaitDown() ctl.writerShutdown.WaitDone()
ctl.conn.Close() ctl.conn.Close()
ctl.readerShutdown.WaitDown() ctl.readerShutdown.WaitDone()
close(ctl.workConnCh) close(ctl.workConnCh)
for workConn := range ctl.workConnCh { for workConn := range ctl.workConnCh {

View File

@ -283,7 +283,7 @@ func (svr *Service) RegisterControl(ctlConn frpNet.Conn, loginMsg *msg.Login) (e
ctl := NewControl(svr, ctlConn, loginMsg) ctl := NewControl(svr, ctlConn, loginMsg)
if oldCtl := svr.ctlManager.Add(loginMsg.RunId, ctl); oldCtl != nil { if oldCtl := svr.ctlManager.Add(loginMsg.RunId, ctl); oldCtl != nil {
oldCtl.allShutdown.WaitDown() oldCtl.allShutdown.WaitDone()
} }
ctlConn.AddLogPrefix(loginMsg.RunId) ctlConn.AddLogPrefix(loginMsg.RunId)

View File

@ -19,19 +19,19 @@ import (
) )
type Shutdown struct { type Shutdown struct {
doing bool doing bool
ending bool ending bool
start chan struct{} startCh chan struct{}
down chan struct{} doneCh chan struct{}
mu sync.Mutex mu sync.Mutex
} }
func New() *Shutdown { func New() *Shutdown {
return &Shutdown{ return &Shutdown{
doing: false, doing: false,
ending: false, ending: false,
start: make(chan struct{}), startCh: make(chan struct{}),
down: make(chan struct{}), doneCh: make(chan struct{}),
} }
} }
@ -40,12 +40,12 @@ func (s *Shutdown) Start() {
defer s.mu.Unlock() defer s.mu.Unlock()
if !s.doing { if !s.doing {
s.doing = true s.doing = true
close(s.start) close(s.startCh)
} }
} }
func (s *Shutdown) WaitStart() { func (s *Shutdown) WaitStart() {
<-s.start <-s.startCh
} }
func (s *Shutdown) Done() { func (s *Shutdown) Done() {
@ -53,10 +53,10 @@ func (s *Shutdown) Done() {
defer s.mu.Unlock() defer s.mu.Unlock()
if !s.ending { if !s.ending {
s.ending = true s.ending = true
close(s.down) close(s.doneCh)
} }
} }
func (s *Shutdown) WaitDown() { func (s *Shutdown) WaitDone() {
<-s.down <-s.doneCh
} }