support xtcp for making nat hole

This commit is contained in:
fatedier
2017-10-24 18:20:07 +08:00
parent 6320f15a7c
commit 0559865fe5
15 changed files with 676 additions and 24 deletions

View File

@@ -97,9 +97,10 @@ func NewControl(svr *Service, ctlConn net.Conn, loginMsg *msg.Login) *Control {
// Start send a login success message to client and start working.
func (ctl *Control) Start() {
loginRespMsg := &msg.LoginResp{
Version: version.Full(),
RunId: ctl.runId,
Error: "",
Version: version.Full(),
RunId: ctl.runId,
ServerUdpPort: config.ServerCommonCfg.BindUdpPort,
Error: "",
}
msg.WriteMsg(ctl.conn, loginRespMsg)

182
server/nathole.go Normal file
View File

@@ -0,0 +1,182 @@
package server
import (
"bytes"
"fmt"
"net"
"sync"
"time"
"github.com/fatedier/frp/models/msg"
"github.com/fatedier/frp/utils/errors"
"github.com/fatedier/frp/utils/log"
"github.com/fatedier/frp/utils/pool"
"github.com/fatedier/frp/utils/util"
)
// Timeout seconds.
var NatHoleTimeout int64 = 10
type NatHoleController struct {
listener *net.UDPConn
clientCfgs map[string]*NatHoleClientCfg
sessions map[string]*NatHoleSession
mu sync.RWMutex
}
func NewNatHoleController(udpBindAddr string) (nc *NatHoleController, err error) {
addr, err := net.ResolveUDPAddr("udp", udpBindAddr)
if err != nil {
return nil, err
}
lconn, err := net.ListenUDP("udp", addr)
if err != nil {
return nil, err
}
nc = &NatHoleController{
listener: lconn,
clientCfgs: make(map[string]*NatHoleClientCfg),
sessions: make(map[string]*NatHoleSession),
}
return nc, nil
}
func (nc *NatHoleController) ListenClient(name string, sk string) (sidCh chan string) {
clientCfg := &NatHoleClientCfg{
Name: name,
Sk: sk,
SidCh: make(chan string),
}
nc.mu.Lock()
nc.clientCfgs[name] = clientCfg
nc.mu.Unlock()
return clientCfg.SidCh
}
func (nc *NatHoleController) CloseClient(name string) {
nc.mu.Lock()
defer nc.mu.Unlock()
delete(nc.clientCfgs, name)
}
func (nc *NatHoleController) Run() {
for {
buf := pool.GetBuf(1024)
n, raddr, err := nc.listener.ReadFromUDP(buf)
if err != nil {
log.Trace("nat hole listener read from udp error: %v", err)
return
}
rd := bytes.NewReader(buf[:n])
rawMsg, err := msg.ReadMsg(rd)
if err != nil {
log.Trace("read nat hole message error: %v", err)
continue
}
switch m := rawMsg.(type) {
case *msg.NatHoleVistor:
go nc.HandleVistor(m, raddr)
case *msg.NatHoleClient:
go nc.HandleClient(m, raddr)
default:
log.Trace("error nat hole message type")
continue
}
pool.PutBuf(buf)
}
}
func (nc *NatHoleController) GenSid() string {
t := time.Now().Unix()
id, _ := util.RandId()
return fmt.Sprintf("%d%s", t, id)
}
func (nc *NatHoleController) HandleVistor(m *msg.NatHoleVistor, raddr *net.UDPAddr) {
sid := nc.GenSid()
session := &NatHoleSession{
Sid: sid,
VistorAddr: raddr,
NotifyCh: make(chan struct{}, 0),
}
nc.mu.Lock()
clientCfg, ok := nc.clientCfgs[m.ProxyName]
if !ok || m.SignKey != util.GetAuthKey(clientCfg.Sk, m.Timestamp) {
nc.mu.Unlock()
return
}
nc.sessions[sid] = session
nc.mu.Unlock()
log.Trace("handle vistor message, sid [%s]", sid)
defer func() {
nc.mu.Lock()
delete(nc.sessions, sid)
nc.mu.Unlock()
}()
err := errors.PanicToError(func() {
clientCfg.SidCh <- sid
})
if err != nil {
return
}
// Wait client connections.
select {
case <-session.NotifyCh:
resp := nc.GenNatHoleResponse(raddr, session)
log.Trace("send nat hole response to vistor")
nc.listener.WriteToUDP(resp, raddr)
case <-time.After(time.Duration(NatHoleTimeout) * time.Second):
return
}
}
func (nc *NatHoleController) HandleClient(m *msg.NatHoleClient, raddr *net.UDPAddr) {
nc.mu.RLock()
session, ok := nc.sessions[m.Sid]
nc.mu.RUnlock()
if !ok {
return
}
log.Trace("handle client message, sid [%s]", session.Sid)
session.ClientAddr = raddr
session.NotifyCh <- struct{}{}
resp := nc.GenNatHoleResponse(raddr, session)
log.Trace("send nat hole response to client")
nc.listener.WriteToUDP(resp, raddr)
}
func (nc *NatHoleController) GenNatHoleResponse(raddr *net.UDPAddr, session *NatHoleSession) []byte {
m := &msg.NatHoleResp{
Sid: session.Sid,
VistorAddr: session.VistorAddr.String(),
ClientAddr: session.ClientAddr.String(),
}
b := bytes.NewBuffer(nil)
err := msg.WriteMsg(b, m)
if err != nil {
return []byte("")
}
return b.Bytes()
}
type NatHoleSession struct {
Sid string
VistorAddr *net.UDPAddr
ClientAddr *net.UDPAddr
NotifyCh chan struct{}
}
type NatHoleClientCfg struct {
Name string
Sk string
SidCh chan string
}

View File

@@ -148,6 +148,11 @@ func NewProxy(ctl *Control, pxyConf config.ProxyConf) (pxy Proxy, err error) {
BaseProxy: basePxy,
cfg: cfg,
}
case *config.XtcpProxyConf:
pxy = &XtcpProxy{
BaseProxy: basePxy,
cfg: cfg,
}
default:
return pxy, fmt.Errorf("proxy type not support")
}
@@ -306,6 +311,54 @@ func (pxy *StcpProxy) Close() {
pxy.ctl.svr.vistorManager.CloseListener(pxy.GetName())
}
type XtcpProxy struct {
BaseProxy
cfg *config.XtcpProxyConf
closeCh chan struct{}
}
func (pxy *XtcpProxy) Run() error {
if pxy.ctl.svr.natHoleController == nil {
pxy.Error("udp port for xtcp is not specified.")
return fmt.Errorf("xtcp is not supported in frps")
}
sidCh := pxy.ctl.svr.natHoleController.ListenClient(pxy.GetName(), pxy.cfg.Sk)
go func() {
for {
select {
case <-pxy.closeCh:
break
case sid := <-sidCh:
workConn, err := pxy.GetWorkConnFromPool()
if err != nil {
continue
}
m := &msg.NatHoleSid{
Sid: sid,
}
err = msg.WriteMsg(workConn, m)
if err != nil {
pxy.Warn("write nat hole sid package error, %v", err)
}
}
}
}()
return nil
}
func (pxy *XtcpProxy) GetConf() config.ProxyConf {
return pxy.cfg
}
func (pxy *XtcpProxy) Close() {
pxy.BaseProxy.Close()
pxy.ctl.svr.natHoleController.CloseClient(pxy.GetName())
errors.PanicToError(func() {
close(pxy.closeCh)
})
}
type UdpProxy struct {
BaseProxy
cfg *config.UdpProxyConf

View File

@@ -58,6 +58,9 @@ type Service struct {
// Manage all vistor listeners.
vistorManager *VistorManager
// Controller for nat hole connections.
natHoleController *NatHoleController
}
func NewService() (svr *Service, err error) {
@@ -66,36 +69,37 @@ func NewService() (svr *Service, err error) {
pxyManager: NewProxyManager(),
vistorManager: NewVistorManager(),
}
cfg := config.ServerCommonCfg
// Init assets.
err = assets.Load(config.ServerCommonCfg.AssetsDir)
err = assets.Load(cfg.AssetsDir)
if err != nil {
err = fmt.Errorf("Load assets error: %v", err)
return
}
// Listen for accepting connections from client.
svr.listener, err = frpNet.ListenTcp(config.ServerCommonCfg.BindAddr, config.ServerCommonCfg.BindPort)
svr.listener, err = frpNet.ListenTcp(cfg.BindAddr, cfg.BindPort)
if err != nil {
err = fmt.Errorf("Create server listener error, %v", err)
return
}
log.Info("frps tcp listen on %s:%d", config.ServerCommonCfg.BindAddr, config.ServerCommonCfg.BindPort)
log.Info("frps tcp listen on %s:%d", cfg.BindAddr, cfg.BindPort)
// Listen for accepting connections from client using kcp protocol.
if config.ServerCommonCfg.KcpBindPort > 0 {
svr.kcpListener, err = frpNet.ListenKcp(config.ServerCommonCfg.BindAddr, config.ServerCommonCfg.KcpBindPort)
if cfg.KcpBindPort > 0 {
svr.kcpListener, err = frpNet.ListenKcp(cfg.BindAddr, cfg.KcpBindPort)
if err != nil {
err = fmt.Errorf("Listen on kcp address udp [%s:%d] error: %v", config.ServerCommonCfg.BindAddr, config.ServerCommonCfg.KcpBindPort, err)
err = fmt.Errorf("Listen on kcp address udp [%s:%d] error: %v", cfg.BindAddr, cfg.KcpBindPort, err)
return
}
log.Info("frps kcp listen on udp %s:%d", config.ServerCommonCfg.BindAddr, config.ServerCommonCfg.BindPort)
log.Info("frps kcp listen on udp %s:%d", cfg.BindAddr, cfg.BindPort)
}
// Create http vhost muxer.
if config.ServerCommonCfg.VhostHttpPort > 0 {
if cfg.VhostHttpPort > 0 {
var l frpNet.Listener
l, err = frpNet.ListenTcp(config.ServerCommonCfg.ProxyBindAddr, config.ServerCommonCfg.VhostHttpPort)
l, err = frpNet.ListenTcp(cfg.ProxyBindAddr, cfg.VhostHttpPort)
if err != nil {
err = fmt.Errorf("Create vhost http listener error, %v", err)
return
@@ -105,13 +109,13 @@ func NewService() (svr *Service, err error) {
err = fmt.Errorf("Create vhost httpMuxer error, %v", err)
return
}
log.Info("http service listen on %s:%d", config.ServerCommonCfg.ProxyBindAddr, config.ServerCommonCfg.VhostHttpPort)
log.Info("http service listen on %s:%d", cfg.ProxyBindAddr, cfg.VhostHttpPort)
}
// Create https vhost muxer.
if config.ServerCommonCfg.VhostHttpsPort > 0 {
if cfg.VhostHttpsPort > 0 {
var l frpNet.Listener
l, err = frpNet.ListenTcp(config.ServerCommonCfg.ProxyBindAddr, config.ServerCommonCfg.VhostHttpsPort)
l, err = frpNet.ListenTcp(cfg.ProxyBindAddr, cfg.VhostHttpsPort)
if err != nil {
err = fmt.Errorf("Create vhost https listener error, %v", err)
return
@@ -121,22 +125,38 @@ func NewService() (svr *Service, err error) {
err = fmt.Errorf("Create vhost httpsMuxer error, %v", err)
return
}
log.Info("https service listen on %s:%d", config.ServerCommonCfg.ProxyBindAddr, config.ServerCommonCfg.VhostHttpsPort)
log.Info("https service listen on %s:%d", cfg.ProxyBindAddr, cfg.VhostHttpsPort)
}
// Create nat hole controller.
if cfg.BindUdpPort > 0 {
var nc *NatHoleController
addr := fmt.Sprintf("%s:%d", cfg.BindAddr, cfg.BindUdpPort)
nc, err = NewNatHoleController(addr)
if err != nil {
err = fmt.Errorf("Create nat hole controller error, %v", err)
return
}
svr.natHoleController = nc
log.Info("nat hole udp service listen on %s:%d", cfg.BindAddr, cfg.BindUdpPort)
}
// Create dashboard web server.
if config.ServerCommonCfg.DashboardPort > 0 {
err = RunDashboardServer(config.ServerCommonCfg.BindAddr, config.ServerCommonCfg.DashboardPort)
if cfg.DashboardPort > 0 {
err = RunDashboardServer(cfg.BindAddr, cfg.DashboardPort)
if err != nil {
err = fmt.Errorf("Create dashboard web server error, %v", err)
return
}
log.Info("Dashboard listen on %s:%d", config.ServerCommonCfg.BindAddr, config.ServerCommonCfg.DashboardPort)
log.Info("Dashboard listen on %s:%d", cfg.BindAddr, cfg.DashboardPort)
}
return
}
func (svr *Service) Run() {
if svr.natHoleController != nil {
go svr.natHoleController.Run()
}
if config.ServerCommonCfg.KcpBindPort > 0 {
go svr.HandleListener(svr.kcpListener)
}