frp/server/control.go

418 lines
9.7 KiB
Go
Raw Normal View History

2017-03-22 18:01:25 +00:00
// Copyright 2017 fatedier, fatedier@gmail.com
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
2017-03-08 18:03:47 +00:00
package server
import (
"fmt"
"io"
"sync"
"time"
"github.com/fatedier/frp/models/config"
"github.com/fatedier/frp/models/consts"
"github.com/fatedier/frp/models/msg"
2017-03-09 17:42:06 +00:00
"github.com/fatedier/frp/utils/crypto"
2017-03-08 18:03:47 +00:00
"github.com/fatedier/frp/utils/errors"
"github.com/fatedier/frp/utils/net"
"github.com/fatedier/frp/utils/shutdown"
"github.com/fatedier/frp/utils/version"
)
type Control struct {
// frps service
svr *Service
// login message
loginMsg *msg.Login
// control connection
conn net.Conn
// put a message in this channel to send it over control connection to client
sendCh chan (msg.Message)
// read from this channel to get the next message sent by client
readCh chan (msg.Message)
// work connections
workConnCh chan net.Conn
// proxies in one client
2017-06-11 09:22:05 +00:00
proxies map[string]Proxy
2017-03-08 18:03:47 +00:00
// pool count
poolCount int
2018-01-26 06:56:55 +00:00
// ports used, for limitations
portsUsedNum int
2017-03-08 18:03:47 +00:00
// last time got the Ping message
lastPing time.Time
// A new run id will be generated when a new client login.
// If run id got from login message has same run id, it means it's the same client, so we can
// replace old controller instantly.
runId string
// control status
status string
readerShutdown *shutdown.Shutdown
writerShutdown *shutdown.Shutdown
managerShutdown *shutdown.Shutdown
allShutdown *shutdown.Shutdown
mu sync.RWMutex
}
func NewControl(svr *Service, ctlConn net.Conn, loginMsg *msg.Login) *Control {
return &Control{
svr: svr,
conn: ctlConn,
loginMsg: loginMsg,
sendCh: make(chan msg.Message, 10),
readCh: make(chan msg.Message, 10),
workConnCh: make(chan net.Conn, loginMsg.PoolCount+10),
2017-06-11 09:22:05 +00:00
proxies: make(map[string]Proxy),
2017-03-08 18:03:47 +00:00
poolCount: loginMsg.PoolCount,
2018-01-26 06:56:55 +00:00
portsUsedNum: 0,
2017-03-08 18:03:47 +00:00
lastPing: time.Now(),
runId: loginMsg.RunId,
status: consts.Working,
readerShutdown: shutdown.New(),
writerShutdown: shutdown.New(),
managerShutdown: shutdown.New(),
allShutdown: shutdown.New(),
}
}
// Start send a login success message to client and start working.
func (ctl *Control) Start() {
2017-03-09 17:42:06 +00:00
loginRespMsg := &msg.LoginResp{
2017-10-24 10:20:07 +00:00
Version: version.Full(),
RunId: ctl.runId,
ServerUdpPort: config.ServerCommonCfg.BindUdpPort,
Error: "",
2017-03-08 18:03:47 +00:00
}
2017-03-09 17:42:06 +00:00
msg.WriteMsg(ctl.conn, loginRespMsg)
2017-03-08 18:03:47 +00:00
2017-03-09 17:42:06 +00:00
go ctl.writer()
2017-03-08 18:03:47 +00:00
for i := 0; i < ctl.poolCount; i++ {
ctl.sendCh <- &msg.ReqWorkConn{}
}
go ctl.manager()
go ctl.reader()
go ctl.stoper()
}
func (ctl *Control) RegisterWorkConn(conn net.Conn) {
defer func() {
if err := recover(); err != nil {
ctl.conn.Error("panic error: %v", err)
}
}()
select {
case ctl.workConnCh <- conn:
2017-03-11 18:03:24 +00:00
ctl.conn.Debug("new work connection registered")
2017-03-08 18:03:47 +00:00
default:
2017-03-11 18:03:24 +00:00
ctl.conn.Debug("work connection pool is full, discarding")
2017-03-08 18:03:47 +00:00
conn.Close()
}
}
// When frps get one user connection, we get one work connection from the pool and return it.
// If no workConn available in the pool, send message to frpc to get one or more
// and wait until it is available.
// return an error if wait timeout
func (ctl *Control) GetWorkConn() (workConn net.Conn, err error) {
defer func() {
if err := recover(); err != nil {
ctl.conn.Error("panic error: %v", err)
}
}()
var ok bool
// get a work connection from the pool
select {
case workConn, ok = <-ctl.workConnCh:
if !ok {
2017-03-12 18:44:47 +00:00
err = errors.ErrCtlClosed
2017-03-08 18:03:47 +00:00
return
}
ctl.conn.Debug("get work connection from pool")
default:
// no work connections available in the poll, send message to frpc to get more
err = errors.PanicToError(func() {
ctl.sendCh <- &msg.ReqWorkConn{}
})
if err != nil {
ctl.conn.Error("%v", err)
return
}
select {
case workConn, ok = <-ctl.workConnCh:
if !ok {
2017-03-12 18:44:47 +00:00
err = errors.ErrCtlClosed
ctl.conn.Warn("no work connections avaiable, %v", err)
2017-03-08 18:03:47 +00:00
return
}
case <-time.After(time.Duration(config.ServerCommonCfg.UserConnTimeout) * time.Second):
err = fmt.Errorf("timeout trying to get work connection")
ctl.conn.Warn("%v", err)
return
}
}
// When we get a work connection from pool, replace it with a new one.
errors.PanicToError(func() {
ctl.sendCh <- &msg.ReqWorkConn{}
})
return
}
func (ctl *Control) Replaced(newCtl *Control) {
ctl.conn.Info("Replaced by client [%s]", newCtl.runId)
ctl.runId = ""
ctl.allShutdown.Start()
}
func (ctl *Control) writer() {
defer func() {
if err := recover(); err != nil {
ctl.conn.Error("panic error: %v", err)
}
}()
defer ctl.allShutdown.Start()
defer ctl.writerShutdown.Done()
2017-03-09 17:42:06 +00:00
encWriter, err := crypto.NewWriter(ctl.conn, []byte(config.ServerCommonCfg.PrivilegeToken))
if err != nil {
ctl.conn.Error("crypto new writer error: %v", err)
ctl.allShutdown.Start()
return
}
2017-03-08 18:03:47 +00:00
for {
if m, ok := <-ctl.sendCh; !ok {
ctl.conn.Info("control writer is closing")
return
} else {
2017-03-09 17:42:06 +00:00
if err := msg.WriteMsg(encWriter, m); err != nil {
2017-03-08 18:03:47 +00:00
ctl.conn.Warn("write message to control connection error: %v", err)
return
}
}
}
}
func (ctl *Control) reader() {
defer func() {
if err := recover(); err != nil {
ctl.conn.Error("panic error: %v", err)
}
}()
defer ctl.allShutdown.Start()
defer ctl.readerShutdown.Done()
2017-03-09 18:01:17 +00:00
encReader := crypto.NewReader(ctl.conn, []byte(config.ServerCommonCfg.PrivilegeToken))
2017-03-08 18:03:47 +00:00
for {
2017-03-09 17:42:06 +00:00
if m, err := msg.ReadMsg(encReader); err != nil {
2017-03-08 18:03:47 +00:00
if err == io.EOF {
ctl.conn.Debug("control connection closed")
return
} else {
ctl.conn.Warn("read error: %v", err)
return
}
} else {
ctl.readCh <- m
}
}
}
func (ctl *Control) stoper() {
defer func() {
if err := recover(); err != nil {
ctl.conn.Error("panic error: %v", err)
}
}()
ctl.allShutdown.WaitStart()
close(ctl.readCh)
2018-01-16 17:09:33 +00:00
ctl.managerShutdown.WaitDone()
2017-03-08 18:03:47 +00:00
close(ctl.sendCh)
2018-01-16 17:09:33 +00:00
ctl.writerShutdown.WaitDone()
2017-03-08 18:03:47 +00:00
ctl.conn.Close()
2018-01-16 17:09:33 +00:00
ctl.readerShutdown.WaitDone()
2017-03-08 18:03:47 +00:00
2018-03-19 12:22:15 +00:00
ctl.mu.Lock()
defer ctl.mu.Unlock()
2017-03-08 18:03:47 +00:00
close(ctl.workConnCh)
for workConn := range ctl.workConnCh {
workConn.Close()
}
for _, pxy := range ctl.proxies {
pxy.Close()
2017-03-12 18:44:47 +00:00
ctl.svr.DelProxy(pxy.GetName())
StatsCloseProxy(pxy.GetName(), pxy.GetConf().GetBaseInfo().ProxyType)
2017-03-08 18:03:47 +00:00
}
ctl.allShutdown.Done()
2017-03-09 18:01:17 +00:00
ctl.conn.Info("client exit success")
2017-03-22 18:01:25 +00:00
StatsCloseClient()
2017-03-08 18:03:47 +00:00
}
func (ctl *Control) manager() {
defer func() {
if err := recover(); err != nil {
ctl.conn.Error("panic error: %v", err)
}
}()
defer ctl.allShutdown.Start()
defer ctl.managerShutdown.Done()
heartbeat := time.NewTicker(time.Second)
defer heartbeat.Stop()
for {
select {
case <-heartbeat.C:
if time.Since(ctl.lastPing) > time.Duration(config.ServerCommonCfg.HeartBeatTimeout)*time.Second {
ctl.conn.Warn("heartbeat timeout")
ctl.allShutdown.Start()
2018-03-19 12:22:15 +00:00
return
2017-03-08 18:03:47 +00:00
}
case rawMsg, ok := <-ctl.readCh:
if !ok {
return
}
switch m := rawMsg.(type) {
case *msg.NewProxy:
// register proxy in this control
2018-01-17 06:40:08 +00:00
remoteAddr, err := ctl.RegisterProxy(m)
2017-03-08 18:03:47 +00:00
resp := &msg.NewProxyResp{
ProxyName: m.ProxyName,
}
if err != nil {
resp.Error = err.Error()
ctl.conn.Warn("new proxy [%s] error: %v", m.ProxyName, err)
} else {
2018-01-17 06:40:08 +00:00
resp.RemoteAddr = remoteAddr
2017-03-08 18:03:47 +00:00
ctl.conn.Info("new proxy [%s] success", m.ProxyName)
2017-03-22 18:01:25 +00:00
StatsNewProxy(m.ProxyName, m.ProxyType)
2017-03-08 18:03:47 +00:00
}
ctl.sendCh <- resp
2017-06-11 09:22:05 +00:00
case *msg.CloseProxy:
ctl.CloseProxy(m)
ctl.conn.Info("close proxy [%s] success", m.ProxyName)
2017-03-08 18:03:47 +00:00
case *msg.Ping:
ctl.lastPing = time.Now()
2017-03-27 09:25:25 +00:00
ctl.conn.Debug("receive heartbeat")
2017-03-08 18:03:47 +00:00
ctl.sendCh <- &msg.Pong{}
}
}
}
}
2018-01-17 06:40:08 +00:00
func (ctl *Control) RegisterProxy(pxyMsg *msg.NewProxy) (remoteAddr string, err error) {
2017-03-08 18:03:47 +00:00
var pxyConf config.ProxyConf
// Load configures from NewProxy message and check.
pxyConf, err = config.NewProxyConf(pxyMsg)
if err != nil {
2018-01-17 06:40:08 +00:00
return
2017-03-08 18:03:47 +00:00
}
// NewProxy will return a interface Proxy.
// In fact it create different proxies by different proxy type, we just call run() here.
pxy, err := NewProxy(ctl, pxyConf)
if err != nil {
2018-01-17 06:40:08 +00:00
return remoteAddr, err
2017-03-08 18:03:47 +00:00
}
2018-01-26 06:56:55 +00:00
// Check ports used number in each client
if config.ServerCommonCfg.MaxPortsPerClient > 0 {
ctl.mu.Lock()
if ctl.portsUsedNum+pxy.GetUsedPortsNum() > int(config.ServerCommonCfg.MaxPortsPerClient) {
ctl.mu.Unlock()
err = fmt.Errorf("exceed the max_ports_per_client")
return
}
ctl.portsUsedNum = ctl.portsUsedNum + pxy.GetUsedPortsNum()
ctl.mu.Unlock()
defer func() {
if err != nil {
ctl.mu.Lock()
ctl.portsUsedNum = ctl.portsUsedNum - pxy.GetUsedPortsNum()
ctl.mu.Unlock()
}
}()
}
2018-01-17 06:40:08 +00:00
remoteAddr, err = pxy.Run()
2017-03-08 18:03:47 +00:00
if err != nil {
2018-01-17 06:40:08 +00:00
return
2017-03-08 18:03:47 +00:00
}
defer func() {
if err != nil {
pxy.Close()
}
}()
err = ctl.svr.RegisterProxy(pxyMsg.ProxyName, pxy)
if err != nil {
2018-01-17 06:40:08 +00:00
return
2017-03-08 18:03:47 +00:00
}
2017-06-11 09:22:05 +00:00
ctl.mu.Lock()
ctl.proxies[pxy.GetName()] = pxy
ctl.mu.Unlock()
2018-01-17 06:40:08 +00:00
return
2017-03-08 18:03:47 +00:00
}
2017-06-11 09:22:05 +00:00
func (ctl *Control) CloseProxy(closeMsg *msg.CloseProxy) (err error) {
ctl.mu.Lock()
pxy, ok := ctl.proxies[closeMsg.ProxyName]
if !ok {
2018-01-26 06:56:55 +00:00
ctl.mu.Unlock()
2017-06-11 09:22:05 +00:00
return
}
2018-01-26 06:56:55 +00:00
if config.ServerCommonCfg.MaxPortsPerClient > 0 {
ctl.portsUsedNum = ctl.portsUsedNum - pxy.GetUsedPortsNum()
}
2017-06-11 09:22:05 +00:00
pxy.Close()
ctl.svr.DelProxy(pxy.GetName())
2017-06-26 17:59:30 +00:00
delete(ctl.proxies, closeMsg.ProxyName)
2018-01-26 06:56:55 +00:00
ctl.mu.Unlock()
2017-06-11 09:22:05 +00:00
StatsCloseProxy(pxy.GetName(), pxy.GetConf().GetBaseInfo().ProxyType)
return
}