mv folders

This commit is contained in:
fatedier
2018-12-09 22:06:22 +08:00
parent aea9f9fbcc
commit 35278ad17f
8 changed files with 35 additions and 30 deletions

455
client/proxy/proxy.go Normal file
View File

@@ -0,0 +1,455 @@
// Copyright 2017 fatedier, fatedier@gmail.com
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package proxy
import (
"bytes"
"fmt"
"io"
"net"
"sync"
"time"
"github.com/fatedier/frp/g"
"github.com/fatedier/frp/models/config"
"github.com/fatedier/frp/models/msg"
"github.com/fatedier/frp/models/plugin"
"github.com/fatedier/frp/models/proto/udp"
"github.com/fatedier/frp/utils/log"
frpNet "github.com/fatedier/frp/utils/net"
"github.com/fatedier/golib/errors"
frpIo "github.com/fatedier/golib/io"
"github.com/fatedier/golib/pool"
)
// Proxy defines how to handle work connections for different proxy type.
type Proxy interface {
Run() error
// InWorkConn accept work connections registered to server.
InWorkConn(conn frpNet.Conn)
Close()
log.Logger
}
func NewProxy(pxyConf config.ProxyConf) (pxy Proxy) {
baseProxy := BaseProxy{
Logger: log.NewPrefixLogger(pxyConf.GetBaseInfo().ProxyName),
}
switch cfg := pxyConf.(type) {
case *config.TcpProxyConf:
pxy = &TcpProxy{
BaseProxy: baseProxy,
cfg: cfg,
}
case *config.UdpProxyConf:
pxy = &UdpProxy{
BaseProxy: baseProxy,
cfg: cfg,
}
case *config.HttpProxyConf:
pxy = &HttpProxy{
BaseProxy: baseProxy,
cfg: cfg,
}
case *config.HttpsProxyConf:
pxy = &HttpsProxy{
BaseProxy: baseProxy,
cfg: cfg,
}
case *config.StcpProxyConf:
pxy = &StcpProxy{
BaseProxy: baseProxy,
cfg: cfg,
}
case *config.XtcpProxyConf:
pxy = &XtcpProxy{
BaseProxy: baseProxy,
cfg: cfg,
}
}
return
}
type BaseProxy struct {
closed bool
mu sync.RWMutex
log.Logger
}
// TCP
type TcpProxy struct {
BaseProxy
cfg *config.TcpProxyConf
proxyPlugin plugin.Plugin
}
func (pxy *TcpProxy) Run() (err error) {
if pxy.cfg.Plugin != "" {
pxy.proxyPlugin, err = plugin.Create(pxy.cfg.Plugin, pxy.cfg.PluginParams)
if err != nil {
return
}
}
return
}
func (pxy *TcpProxy) Close() {
if pxy.proxyPlugin != nil {
pxy.proxyPlugin.Close()
}
}
func (pxy *TcpProxy) InWorkConn(conn frpNet.Conn) {
HandleTcpWorkConnection(&pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, conn,
[]byte(g.GlbClientCfg.Token))
}
// HTTP
type HttpProxy struct {
BaseProxy
cfg *config.HttpProxyConf
proxyPlugin plugin.Plugin
}
func (pxy *HttpProxy) Run() (err error) {
if pxy.cfg.Plugin != "" {
pxy.proxyPlugin, err = plugin.Create(pxy.cfg.Plugin, pxy.cfg.PluginParams)
if err != nil {
return
}
}
return
}
func (pxy *HttpProxy) Close() {
if pxy.proxyPlugin != nil {
pxy.proxyPlugin.Close()
}
}
func (pxy *HttpProxy) InWorkConn(conn frpNet.Conn) {
HandleTcpWorkConnection(&pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, conn,
[]byte(g.GlbClientCfg.Token))
}
// HTTPS
type HttpsProxy struct {
BaseProxy
cfg *config.HttpsProxyConf
proxyPlugin plugin.Plugin
}
func (pxy *HttpsProxy) Run() (err error) {
if pxy.cfg.Plugin != "" {
pxy.proxyPlugin, err = plugin.Create(pxy.cfg.Plugin, pxy.cfg.PluginParams)
if err != nil {
return
}
}
return
}
func (pxy *HttpsProxy) Close() {
if pxy.proxyPlugin != nil {
pxy.proxyPlugin.Close()
}
}
func (pxy *HttpsProxy) InWorkConn(conn frpNet.Conn) {
HandleTcpWorkConnection(&pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, conn,
[]byte(g.GlbClientCfg.Token))
}
// STCP
type StcpProxy struct {
BaseProxy
cfg *config.StcpProxyConf
proxyPlugin plugin.Plugin
}
func (pxy *StcpProxy) Run() (err error) {
if pxy.cfg.Plugin != "" {
pxy.proxyPlugin, err = plugin.Create(pxy.cfg.Plugin, pxy.cfg.PluginParams)
if err != nil {
return
}
}
return
}
func (pxy *StcpProxy) Close() {
if pxy.proxyPlugin != nil {
pxy.proxyPlugin.Close()
}
}
func (pxy *StcpProxy) InWorkConn(conn frpNet.Conn) {
HandleTcpWorkConnection(&pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, conn,
[]byte(g.GlbClientCfg.Token))
}
// XTCP
type XtcpProxy struct {
BaseProxy
cfg *config.XtcpProxyConf
proxyPlugin plugin.Plugin
}
func (pxy *XtcpProxy) Run() (err error) {
if pxy.cfg.Plugin != "" {
pxy.proxyPlugin, err = plugin.Create(pxy.cfg.Plugin, pxy.cfg.PluginParams)
if err != nil {
return
}
}
return
}
func (pxy *XtcpProxy) Close() {
if pxy.proxyPlugin != nil {
pxy.proxyPlugin.Close()
}
}
func (pxy *XtcpProxy) InWorkConn(conn frpNet.Conn) {
defer conn.Close()
var natHoleSidMsg msg.NatHoleSid
err := msg.ReadMsgInto(conn, &natHoleSidMsg)
if err != nil {
pxy.Error("xtcp read from workConn error: %v", err)
return
}
natHoleClientMsg := &msg.NatHoleClient{
ProxyName: pxy.cfg.ProxyName,
Sid: natHoleSidMsg.Sid,
}
raddr, _ := net.ResolveUDPAddr("udp",
fmt.Sprintf("%s:%d", g.GlbClientCfg.ServerAddr, g.GlbClientCfg.ServerUdpPort))
clientConn, err := net.DialUDP("udp", nil, raddr)
defer clientConn.Close()
err = msg.WriteMsg(clientConn, natHoleClientMsg)
if err != nil {
pxy.Error("send natHoleClientMsg to server error: %v", err)
return
}
// Wait for client address at most 5 seconds.
var natHoleRespMsg msg.NatHoleResp
clientConn.SetReadDeadline(time.Now().Add(5 * time.Second))
buf := pool.GetBuf(1024)
n, err := clientConn.Read(buf)
if err != nil {
pxy.Error("get natHoleRespMsg error: %v", err)
return
}
err = msg.ReadMsgInto(bytes.NewReader(buf[:n]), &natHoleRespMsg)
if err != nil {
pxy.Error("get natHoleRespMsg error: %v", err)
return
}
clientConn.SetReadDeadline(time.Time{})
clientConn.Close()
if natHoleRespMsg.Error != "" {
pxy.Error("natHoleRespMsg get error info: %s", natHoleRespMsg.Error)
return
}
pxy.Trace("get natHoleRespMsg, sid [%s], client address [%s]", natHoleRespMsg.Sid, natHoleRespMsg.ClientAddr)
// Send sid to visitor udp address.
time.Sleep(time.Second)
laddr, _ := net.ResolveUDPAddr("udp", clientConn.LocalAddr().String())
daddr, err := net.ResolveUDPAddr("udp", natHoleRespMsg.VisitorAddr)
if err != nil {
pxy.Error("resolve visitor udp address error: %v", err)
return
}
lConn, err := net.DialUDP("udp", laddr, daddr)
if err != nil {
pxy.Error("dial visitor udp address error: %v", err)
return
}
lConn.Write([]byte(natHoleRespMsg.Sid))
kcpConn, err := frpNet.NewKcpConnFromUdp(lConn, true, natHoleRespMsg.VisitorAddr)
if err != nil {
pxy.Error("create kcp connection from udp connection error: %v", err)
return
}
HandleTcpWorkConnection(&pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf,
frpNet.WrapConn(kcpConn), []byte(pxy.cfg.Sk))
}
// UDP
type UdpProxy struct {
BaseProxy
cfg *config.UdpProxyConf
localAddr *net.UDPAddr
readCh chan *msg.UdpPacket
// include msg.UdpPacket and msg.Ping
sendCh chan msg.Message
workConn frpNet.Conn
}
func (pxy *UdpProxy) Run() (err error) {
pxy.localAddr, err = net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", pxy.cfg.LocalIp, pxy.cfg.LocalPort))
if err != nil {
return
}
return
}
func (pxy *UdpProxy) Close() {
pxy.mu.Lock()
defer pxy.mu.Unlock()
if !pxy.closed {
pxy.closed = true
if pxy.workConn != nil {
pxy.workConn.Close()
}
if pxy.readCh != nil {
close(pxy.readCh)
}
if pxy.sendCh != nil {
close(pxy.sendCh)
}
}
}
func (pxy *UdpProxy) InWorkConn(conn frpNet.Conn) {
pxy.Info("incoming a new work connection for udp proxy, %s", conn.RemoteAddr().String())
// close resources releated with old workConn
pxy.Close()
pxy.mu.Lock()
pxy.workConn = conn
pxy.readCh = make(chan *msg.UdpPacket, 1024)
pxy.sendCh = make(chan msg.Message, 1024)
pxy.closed = false
pxy.mu.Unlock()
workConnReaderFn := func(conn net.Conn, readCh chan *msg.UdpPacket) {
for {
var udpMsg msg.UdpPacket
if errRet := msg.ReadMsgInto(conn, &udpMsg); errRet != nil {
pxy.Warn("read from workConn for udp error: %v", errRet)
return
}
if errRet := errors.PanicToError(func() {
pxy.Trace("get udp package from workConn: %s", udpMsg.Content)
readCh <- &udpMsg
}); errRet != nil {
pxy.Info("reader goroutine for udp work connection closed: %v", errRet)
return
}
}
}
workConnSenderFn := func(conn net.Conn, sendCh chan msg.Message) {
defer func() {
pxy.Info("writer goroutine for udp work connection closed")
}()
var errRet error
for rawMsg := range sendCh {
switch m := rawMsg.(type) {
case *msg.UdpPacket:
pxy.Trace("send udp package to workConn: %s", m.Content)
case *msg.Ping:
pxy.Trace("send ping message to udp workConn")
}
if errRet = msg.WriteMsg(conn, rawMsg); errRet != nil {
pxy.Error("udp work write error: %v", errRet)
return
}
}
}
heartbeatFn := func(conn net.Conn, sendCh chan msg.Message) {
var errRet error
for {
time.Sleep(time.Duration(30) * time.Second)
if errRet = errors.PanicToError(func() {
sendCh <- &msg.Ping{}
}); errRet != nil {
pxy.Trace("heartbeat goroutine for udp work connection closed")
break
}
}
}
go workConnSenderFn(pxy.workConn, pxy.sendCh)
go workConnReaderFn(pxy.workConn, pxy.readCh)
go heartbeatFn(pxy.workConn, pxy.sendCh)
udp.Forwarder(pxy.localAddr, pxy.readCh, pxy.sendCh)
}
// Common handler for tcp work connections.
func HandleTcpWorkConnection(localInfo *config.LocalSvrConf, proxyPlugin plugin.Plugin,
baseInfo *config.BaseProxyConf, workConn frpNet.Conn, encKey []byte) {
var (
remote io.ReadWriteCloser
err error
)
remote = workConn
if baseInfo.UseEncryption {
remote, err = frpIo.WithEncryption(remote, encKey)
if err != nil {
workConn.Close()
workConn.Error("create encryption stream error: %v", err)
return
}
}
if baseInfo.UseCompression {
remote = frpIo.WithCompression(remote)
}
if proxyPlugin != nil {
// if plugin is set, let plugin handle connections first
workConn.Debug("handle by plugin: %s", proxyPlugin.Name())
proxyPlugin.Handle(remote, workConn)
workConn.Debug("handle by plugin finished")
return
} else {
localConn, err := frpNet.ConnectServer("tcp", fmt.Sprintf("%s:%d", localInfo.LocalIp, localInfo.LocalPort))
if err != nil {
workConn.Close()
workConn.Error("connect to local service [%s:%d] error: %v", localInfo.LocalIp, localInfo.LocalPort, err)
return
}
workConn.Debug("join connections, localConn(l[%s] r[%s]) workConn(l[%s] r[%s])", localConn.LocalAddr().String(),
localConn.RemoteAddr().String(), workConn.LocalAddr().String(), workConn.RemoteAddr().String())
frpIo.Join(localConn, remote)
workConn.Debug("join connections closed")
}
}

View File

@@ -0,0 +1,138 @@
package proxy
import (
"fmt"
"sync"
"github.com/fatedier/frp/client/event"
"github.com/fatedier/frp/models/config"
"github.com/fatedier/frp/models/msg"
"github.com/fatedier/frp/utils/log"
frpNet "github.com/fatedier/frp/utils/net"
"github.com/fatedier/golib/errors"
)
type ProxyManager struct {
sendCh chan (msg.Message)
proxies map[string]*ProxyWrapper
closed bool
mu sync.RWMutex
logPrefix string
log.Logger
}
func NewProxyManager(msgSendCh chan (msg.Message), logPrefix string) *ProxyManager {
return &ProxyManager{
proxies: make(map[string]*ProxyWrapper),
sendCh: msgSendCh,
closed: false,
logPrefix: logPrefix,
Logger: log.NewPrefixLogger(logPrefix),
}
}
func (pm *ProxyManager) StartProxy(name string, remoteAddr string, serverRespErr string) error {
pm.mu.RLock()
pxy, ok := pm.proxies[name]
pm.mu.RUnlock()
if !ok {
return fmt.Errorf("proxy [%s] not found", name)
}
err := pxy.SetRunningStatus(remoteAddr, serverRespErr)
if err != nil {
return err
}
return nil
}
func (pm *ProxyManager) Close() {
pm.mu.RLock()
defer pm.mu.RUnlock()
for _, pxy := range pm.proxies {
pxy.Stop()
}
}
func (pm *ProxyManager) HandleWorkConn(name string, workConn frpNet.Conn) {
pm.mu.RLock()
pw, ok := pm.proxies[name]
pm.mu.RUnlock()
if ok {
pw.InWorkConn(workConn)
} else {
workConn.Close()
}
}
func (pm *ProxyManager) HandleEvent(evType event.EventType, payload interface{}) error {
var m msg.Message
switch e := payload.(type) {
case *event.StartProxyPayload:
m = e.NewProxyMsg
case *event.CloseProxyPayload:
m = e.CloseProxyMsg
default:
return event.ErrPayloadType
}
err := errors.PanicToError(func() {
pm.sendCh <- m
})
return err
}
func (pm *ProxyManager) GetAllProxyStatus() []*ProxyStatus {
ps := make([]*ProxyStatus, 0)
pm.mu.RLock()
defer pm.mu.RUnlock()
for _, pxy := range pm.proxies {
ps = append(ps, pxy.GetStatus())
}
return ps
}
func (pm *ProxyManager) Reload(pxyCfgs map[string]config.ProxyConf) {
pm.mu.Lock()
defer pm.mu.Unlock()
delPxyNames := make([]string, 0)
for name, pxy := range pm.proxies {
del := false
cfg, ok := pxyCfgs[name]
if !ok {
del = true
} else {
if !pxy.Cfg.Compare(cfg) {
del = true
}
}
if del {
delPxyNames = append(delPxyNames, name)
delete(pm.proxies, name)
pxy.Stop()
}
}
if len(delPxyNames) > 0 {
pm.Info("proxy removed: %v", delPxyNames)
}
addPxyNames := make([]string, 0)
for name, cfg := range pxyCfgs {
if _, ok := pm.proxies[name]; !ok {
pxy := NewProxyWrapper(cfg, pm.HandleEvent, pm.logPrefix)
pm.proxies[name] = pxy
addPxyNames = append(addPxyNames, name)
pxy.Start()
}
}
if len(addPxyNames) > 0 {
pm.Info("proxy added: %v", addPxyNames)
}
}

View File

@@ -0,0 +1,244 @@
package proxy
import (
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/fatedier/frp/client/event"
"github.com/fatedier/frp/client/health"
"github.com/fatedier/frp/models/config"
"github.com/fatedier/frp/models/msg"
"github.com/fatedier/frp/utils/log"
frpNet "github.com/fatedier/frp/utils/net"
"github.com/fatedier/golib/errors"
)
const (
ProxyStatusNew = "new"
ProxyStatusWaitStart = "wait start"
ProxyStatusStartErr = "start error"
ProxyStatusRunning = "running"
ProxyStatusCheckFailed = "check failed"
ProxyStatusClosed = "closed"
)
var (
statusCheckInterval time.Duration = 3 * time.Second
waitResponseTimeout = 20 * time.Second
startErrTimeout = 30 * time.Second
)
type ProxyStatus struct {
Name string `json:"name"`
Type string `json:"type"`
Status string `json:"status"`
Err string `json:"err"`
Cfg config.ProxyConf `json:"cfg"`
// Got from server.
RemoteAddr string `json:"remote_addr"`
}
type ProxyWrapper struct {
ProxyStatus
// underlying proxy
pxy Proxy
// if ProxyConf has healcheck config
// monitor will watch if it is alive
monitor *health.HealthCheckMonitor
// event handler
handler event.EventHandler
health uint32
lastSendStartMsg time.Time
lastStartErr time.Time
closeCh chan struct{}
healthNotifyCh chan struct{}
mu sync.RWMutex
log.Logger
}
func NewProxyWrapper(cfg config.ProxyConf, eventHandler event.EventHandler, logPrefix string) *ProxyWrapper {
baseInfo := cfg.GetBaseInfo()
pw := &ProxyWrapper{
ProxyStatus: ProxyStatus{
Name: baseInfo.ProxyName,
Type: baseInfo.ProxyType,
Status: ProxyStatusNew,
Cfg: cfg,
},
closeCh: make(chan struct{}),
healthNotifyCh: make(chan struct{}),
handler: eventHandler,
Logger: log.NewPrefixLogger(logPrefix),
}
pw.AddLogPrefix(pw.Name)
if baseInfo.HealthCheckType != "" {
pw.health = 1 // means failed
pw.monitor = health.NewHealthCheckMonitor(baseInfo.HealthCheckType, baseInfo.HealthCheckIntervalS,
baseInfo.HealthCheckTimeoutS, baseInfo.HealthCheckMaxFailed, baseInfo.HealthCheckAddr,
baseInfo.HealthCheckUrl, pw.statusNormalCallback, pw.statusFailedCallback)
pw.monitor.SetLogger(pw.Logger)
pw.Trace("enable health check monitor")
}
pw.pxy = NewProxy(pw.Cfg)
return pw
}
func (pw *ProxyWrapper) SetRunningStatus(remoteAddr string, respErr string) error {
pw.mu.Lock()
defer pw.mu.Unlock()
if pw.Status != ProxyStatusWaitStart {
return fmt.Errorf("status not wait start, ignore start message")
}
pw.RemoteAddr = remoteAddr
if respErr != "" {
pw.Status = ProxyStatusStartErr
pw.Err = respErr
pw.lastStartErr = time.Now()
return fmt.Errorf(pw.Err)
}
if err := pw.pxy.Run(); err != nil {
pw.Status = ProxyStatusStartErr
pw.Err = err.Error()
pw.lastStartErr = time.Now()
return err
}
pw.Status = ProxyStatusRunning
pw.Err = ""
return nil
}
func (pw *ProxyWrapper) Start() {
go pw.checkWorker()
if pw.monitor != nil {
go pw.monitor.Start()
}
}
func (pw *ProxyWrapper) Stop() {
pw.mu.Lock()
defer pw.mu.Unlock()
close(pw.closeCh)
close(pw.healthNotifyCh)
pw.pxy.Close()
if pw.monitor != nil {
pw.monitor.Stop()
}
pw.Status = ProxyStatusClosed
pw.handler(event.EvCloseProxy, &event.CloseProxyPayload{
CloseProxyMsg: &msg.CloseProxy{
ProxyName: pw.Name,
},
})
}
func (pw *ProxyWrapper) checkWorker() {
if pw.monitor != nil {
// let monitor do check request first
time.Sleep(500 * time.Millisecond)
}
for {
// check proxy status
now := time.Now()
if atomic.LoadUint32(&pw.health) == 0 {
pw.mu.Lock()
if pw.Status == ProxyStatusNew ||
pw.Status == ProxyStatusCheckFailed ||
(pw.Status == ProxyStatusWaitStart && now.After(pw.lastSendStartMsg.Add(waitResponseTimeout))) ||
(pw.Status == ProxyStatusStartErr && now.After(pw.lastStartErr.Add(startErrTimeout))) {
pw.Trace("change status from [%s] to [%s]", pw.Status, ProxyStatusWaitStart)
pw.Status = ProxyStatusWaitStart
var newProxyMsg msg.NewProxy
pw.Cfg.MarshalToMsg(&newProxyMsg)
pw.lastSendStartMsg = now
pw.handler(event.EvStartProxy, &event.StartProxyPayload{
NewProxyMsg: &newProxyMsg,
})
}
pw.mu.Unlock()
} else {
pw.mu.Lock()
if pw.Status == ProxyStatusRunning || pw.Status == ProxyStatusWaitStart {
pw.handler(event.EvCloseProxy, &event.CloseProxyPayload{
CloseProxyMsg: &msg.CloseProxy{
ProxyName: pw.Name,
},
})
pw.Trace("change status from [%s] to [%s]", pw.Status, ProxyStatusCheckFailed)
pw.Status = ProxyStatusCheckFailed
}
pw.mu.Unlock()
}
select {
case <-pw.closeCh:
return
case <-time.After(statusCheckInterval):
case <-pw.healthNotifyCh:
}
}
}
func (pw *ProxyWrapper) statusNormalCallback() {
atomic.StoreUint32(&pw.health, 0)
errors.PanicToError(func() {
select {
case pw.healthNotifyCh <- struct{}{}:
default:
}
})
pw.Info("health check success")
}
func (pw *ProxyWrapper) statusFailedCallback() {
atomic.StoreUint32(&pw.health, 1)
errors.PanicToError(func() {
select {
case pw.healthNotifyCh <- struct{}{}:
default:
}
})
pw.Info("health check failed")
}
func (pw *ProxyWrapper) InWorkConn(workConn frpNet.Conn) {
pw.mu.RLock()
pxy := pw.pxy
pw.mu.RUnlock()
if pxy != nil {
workConn.Debug("start a new work connection, localAddr: %s remoteAddr: %s", workConn.LocalAddr().String(), workConn.RemoteAddr().String())
go pxy.InWorkConn(workConn)
} else {
workConn.Close()
}
}
func (pw *ProxyWrapper) GetStatus() *ProxyStatus {
pw.mu.RLock()
defer pw.mu.RUnlock()
ps := &ProxyStatus{
Name: pw.Name,
Type: pw.Type,
Status: pw.Status,
Err: pw.Err,
Cfg: pw.Cfg,
RemoteAddr: pw.RemoteAddr,
}
return ps
}