diff --git a/client/control.go b/client/control.go index 29dca60c..b6c4d481 100644 --- a/client/control.go +++ b/client/control.go @@ -271,9 +271,10 @@ func (ctl *Control) login() (err error) { ctl.conn = conn // update runId got from server ctl.setRunId(loginRespMsg.RunId) + config.ClientCommonCfg.ServerUdpPort = loginRespMsg.ServerUdpPort ctl.ClearLogPrefix() ctl.AddLogPrefix(loginRespMsg.RunId) - ctl.Info("login to server success, get run id [%s]", loginRespMsg.RunId) + ctl.Info("login to server success, get run id [%s], server udp port [%d]", loginRespMsg.RunId, loginRespMsg.ServerUdpPort) // login success, so we let closedCh available again ctl.closedCh = make(chan int) diff --git a/client/proxy.go b/client/proxy.go index 147a3fbd..d7034592 100644 --- a/client/proxy.go +++ b/client/proxy.go @@ -72,6 +72,11 @@ func NewProxy(ctl *Control, pxyConf config.ProxyConf) (pxy Proxy) { BaseProxy: baseProxy, cfg: cfg, } + case *config.XtcpProxyConf: + pxy = &XtcpProxy{ + BaseProxy: baseProxy, + cfg: cfg, + } } return } @@ -195,6 +200,90 @@ func (pxy *StcpProxy) InWorkConn(conn frpNet.Conn) { HandleTcpWorkConnection(&pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, conn) } +// XTCP +type XtcpProxy struct { + BaseProxy + + cfg *config.XtcpProxyConf + proxyPlugin plugin.Plugin +} + +func (pxy *XtcpProxy) Run() (err error) { + if pxy.cfg.Plugin != "" { + pxy.proxyPlugin, err = plugin.Create(pxy.cfg.Plugin, pxy.cfg.PluginParams) + if err != nil { + return + } + } + return +} + +func (pxy *XtcpProxy) Close() { + if pxy.proxyPlugin != nil { + pxy.proxyPlugin.Close() + } +} + +func (pxy *XtcpProxy) InWorkConn(conn frpNet.Conn) { + defer conn.Close() + var natHoleSidMsg msg.NatHoleSid + err := msg.ReadMsgInto(conn, &natHoleSidMsg) + if err != nil { + pxy.Error("xtcp read from workConn error: %v", err) + return + } + + natHoleClientMsg := &msg.NatHoleClient{ + ProxyName: pxy.cfg.ProxyName, + Sid: natHoleSidMsg.Sid, + } + raddr, _ := net.ResolveUDPAddr("udp", + fmt.Sprintf("%s:%d", config.ClientCommonCfg.ServerAddr, config.ClientCommonCfg.ServerUdpPort)) + clientConn, err := net.DialUDP("udp", nil, raddr) + defer clientConn.Close() + + err = msg.WriteMsg(clientConn, natHoleClientMsg) + if err != nil { + pxy.Error("send natHoleClientMsg to server error: %v", err) + return + } + + // Wait for client address at most 10 seconds. + var natHoleRespMsg msg.NatHoleResp + clientConn.SetReadDeadline(time.Now().Add(10 * time.Second)) + err = msg.ReadMsgInto(clientConn, &natHoleRespMsg) + if err != nil { + pxy.Error("get natHoleRespMsg error: %v", err) + return + } + clientConn.SetReadDeadline(time.Time{}) + clientConn.Close() + + // Send sid to vistor udp address. + time.Sleep(time.Second) + laddr, _ := net.ResolveUDPAddr("udp", clientConn.LocalAddr().String()) + daddr, err := net.ResolveUDPAddr("udp", natHoleRespMsg.VistorAddr) + if err != nil { + pxy.Error("resolve vistor udp address error: %v", err) + return + } + + lConn, err := net.DialUDP("udp", laddr, daddr) + if err != nil { + pxy.Error("dial vistor udp address error: %v", err) + return + } + lConn.Write([]byte(natHoleRespMsg.Sid)) + + kcpConn, err := frpNet.NewKcpConnFromUdp(lConn, true, natHoleRespMsg.VistorAddr) + if err != nil { + pxy.Error("create kcp connection from udp connection error: %v", err) + return + } + + HandleTcpWorkConnection(&pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, frpNet.WrapConn(kcpConn)) +} + // UDP type UdpProxy struct { BaseProxy diff --git a/client/vistor.go b/client/vistor.go index 8787ebfe..7721eb30 100644 --- a/client/vistor.go +++ b/client/vistor.go @@ -15,15 +15,21 @@ package client import ( + "fmt" "io" + "net" + "strings" "sync" "time" + "golang.org/x/net/ipv4" + "github.com/fatedier/frp/models/config" "github.com/fatedier/frp/models/msg" frpIo "github.com/fatedier/frp/utils/io" "github.com/fatedier/frp/utils/log" frpNet "github.com/fatedier/frp/utils/net" + "github.com/fatedier/frp/utils/pool" "github.com/fatedier/frp/utils/util" ) @@ -45,6 +51,11 @@ func NewVistor(ctl *Control, pxyConf config.ProxyConf) (vistor Vistor) { BaseVistor: baseVistor, cfg: cfg, } + case *config.XtcpProxyConf: + vistor = &XtcpVistor{ + BaseVistor: baseVistor, + cfg: cfg, + } } return } @@ -143,3 +154,142 @@ func (sv *StcpVistor) handleConn(userConn frpNet.Conn) { frpIo.Join(userConn, remote) } + +type XtcpVistor struct { + BaseVistor + + cfg *config.XtcpProxyConf +} + +func (sv *XtcpVistor) Run() (err error) { + sv.l, err = frpNet.ListenTcp(sv.cfg.BindAddr, int64(sv.cfg.BindPort)) + if err != nil { + return + } + + go sv.worker() + return +} + +func (sv *XtcpVistor) Close() { + sv.l.Close() +} + +func (sv *XtcpVistor) worker() { + for { + conn, err := sv.l.Accept() + if err != nil { + sv.Warn("stcp local listener closed") + return + } + + go sv.handleConn(conn) + } +} + +func (sv *XtcpVistor) handleConn(userConn frpNet.Conn) { + defer userConn.Close() + + sv.Debug("get a new xtcp user connection") + if config.ClientCommonCfg.ServerUdpPort == 0 { + sv.Error("xtcp is not supported by server") + return + } + + raddr, err := net.ResolveUDPAddr("udp", + fmt.Sprintf("%s:%d", config.ClientCommonCfg.ServerAddr, config.ClientCommonCfg.ServerUdpPort)) + vistorConn, err := net.DialUDP("udp", nil, raddr) + defer vistorConn.Close() + + now := time.Now().Unix() + natHoleVistorMsg := &msg.NatHoleVistor{ + ProxyName: sv.cfg.ServerName, + SignKey: util.GetAuthKey(sv.cfg.Sk, now), + Timestamp: now, + } + err = msg.WriteMsg(vistorConn, natHoleVistorMsg) + if err != nil { + sv.Warn("send natHoleVistorMsg to server error: %v", err) + return + } + + // Wait for client address at most 10 seconds. + var natHoleResp msg.NatHoleResp + vistorConn.SetReadDeadline(time.Now().Add(10 * time.Second)) + err = msg.ReadMsgInto(vistorConn, &natHoleResp) + if err != nil { + sv.Warn("get natHoleRespMsg error: %v", err) + return + } + vistorConn.SetReadDeadline(time.Time{}) + + // Close vistorConn, so we can use it's local address. + vistorConn.Close() + + // Send detect message for all ports of client in case different NAT type. + array := strings.Split(natHoleResp.ClientAddr, ":") + if len(array) <= 0 { + sv.Error("get natHoleResp client address error: %s", natHoleResp.ClientAddr) + return + } + laddr, _ := net.ResolveUDPAddr("udp", vistorConn.LocalAddr().String()) + for i := 1000; i < 65000; i++ { + sv.sendDetectMsg(array[0], int64(i), laddr) + } + + // Listen for vistorConn's address and wait for client connection. + lConn, _ := net.ListenUDP("udp", laddr) + lConn.SetReadDeadline(time.Now().Add(10 * time.Second)) + sidBuf := pool.GetBuf(1024) + n, _, err := lConn.ReadFromUDP(sidBuf) + if err != nil { + sv.Warn("get sid from client error: %v", err) + return + } + lConn.SetReadDeadline(time.Time{}) + if string(sidBuf[:n]) != natHoleResp.Sid { + sv.Warn("incorrect sid from client") + return + } + pool.PutBuf(sidBuf) + + var remote io.ReadWriteCloser + remote, err = frpNet.NewKcpConnFromUdp(lConn, false, natHoleResp.ClientAddr) + if err != nil { + sv.Error("create kcp connection from udp connection error: %v", err) + return + } + + if sv.cfg.UseEncryption { + remote, err = frpIo.WithEncryption(remote, []byte(sv.cfg.Sk)) + if err != nil { + sv.Error("create encryption stream error: %v", err) + return + } + } + + if sv.cfg.UseCompression { + remote = frpIo.WithCompression(remote) + } + + frpIo.Join(userConn, remote) +} + +func (sv *XtcpVistor) sendDetectMsg(addr string, port int64, laddr *net.UDPAddr) (err error) { + daddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", addr, port)) + if err != nil { + return err + } + + tConn, err := net.DialUDP("udp", laddr, daddr) + if err != nil { + return err + } + + uConn := ipv4.NewConn(tConn) + uConn.SetTTL(3) + + tConn.Write([]byte(fmt.Sprintf("%d", port))) + tConn.Close() + return nil +} diff --git a/conf/frpc_full.ini b/conf/frpc_full.ini index 5ae97447..e8e08644 100644 --- a/conf/frpc_full.ini +++ b/conf/frpc_full.ini @@ -141,3 +141,19 @@ bind_addr = 127.0.0.1 bind_port = 9000 use_encryption = false use_compression = false + +[p2p_tcp] +type = xtcp +sk = abcdefg +local_ip = 127.0.0.1 +local_port = 22 +use_encryption = false +use_compression = false + +[p2p_tcp_vistor] +role = vistor +type = xtcp +server_name = p2p_tcp +sk = abcdefg +bind_addr = 127.0.0.1 +bind_port = 9001 diff --git a/conf/frps_full.ini b/conf/frps_full.ini index 3b4740ed..94f481c0 100644 --- a/conf/frps_full.ini +++ b/conf/frps_full.ini @@ -5,6 +5,9 @@ bind_addr = 0.0.0.0 bind_port = 7000 +# udp port to help make udp hole to penetrate nat +bind_udp_port = 7001 + # udp port used for kcp protocol, it can be same with 'bind_port' # if not set, kcp is disabled in frps kcp_bind_port = 7000 diff --git a/models/config/client_common.go b/models/config/client_common.go index 749b6b13..f98169e7 100644 --- a/models/config/client_common.go +++ b/models/config/client_common.go @@ -30,6 +30,7 @@ type ClientCommonConf struct { ConfigFile string ServerAddr string ServerPort int64 + ServerUdpPort int64 // this is specified by login response message from frps HttpProxy string LogFile string LogWay string @@ -55,6 +56,7 @@ func GetDeaultClientCommonConf() *ClientCommonConf { ConfigFile: "./frpc.ini", ServerAddr: "0.0.0.0", ServerPort: 7000, + ServerUdpPort: 0, HttpProxy: "", LogFile: "console", LogWay: "console", diff --git a/models/config/proxy.go b/models/config/proxy.go index b42f416c..2246a844 100644 --- a/models/config/proxy.go +++ b/models/config/proxy.go @@ -36,6 +36,7 @@ func init() { proxyConfTypeMap[consts.HttpProxy] = reflect.TypeOf(HttpProxyConf{}) proxyConfTypeMap[consts.HttpsProxy] = reflect.TypeOf(HttpsProxyConf{}) proxyConfTypeMap[consts.StcpProxy] = reflect.TypeOf(StcpProxyConf{}) + proxyConfTypeMap[consts.XtcpProxy] = reflect.TypeOf(XtcpProxyConf{}) } // NewConfByType creates a empty ProxyConf object by proxyType. @@ -672,6 +673,95 @@ func (cfg *StcpProxyConf) Check() (err error) { return } +// XTCP +type XtcpProxyConf struct { + BaseProxyConf + + Role string `json:"role"` + Sk string `json:"sk"` + + // used in role server + LocalSvrConf + PluginConf + + // used in role vistor + ServerName string `json:"server_name"` + BindAddr string `json:"bind_addr"` + BindPort int `json:"bind_port"` +} + +func (cfg *XtcpProxyConf) Compare(cmp ProxyConf) bool { + cmpConf, ok := cmp.(*XtcpProxyConf) + if !ok { + return false + } + + if !cfg.BaseProxyConf.compare(&cmpConf.BaseProxyConf) || + !cfg.LocalSvrConf.compare(&cmpConf.LocalSvrConf) || + !cfg.PluginConf.compare(&cmpConf.PluginConf) || + cfg.Role != cmpConf.Role || + cfg.Sk != cmpConf.Sk || + cfg.ServerName != cmpConf.ServerName || + cfg.BindAddr != cmpConf.BindAddr || + cfg.BindPort != cmpConf.BindPort { + return false + } + return true +} + +// Only for role server. +func (cfg *XtcpProxyConf) LoadFromMsg(pMsg *msg.NewProxy) { + cfg.BaseProxyConf.LoadFromMsg(pMsg) + cfg.Sk = pMsg.Sk +} + +func (cfg *XtcpProxyConf) LoadFromFile(name string, section ini.Section) (err error) { + if err = cfg.BaseProxyConf.LoadFromFile(name, section); err != nil { + return + } + + tmpStr := section["role"] + if tmpStr == "server" || tmpStr == "vistor" { + cfg.Role = tmpStr + } else { + cfg.Role = "server" + } + + cfg.Sk = section["sk"] + + if tmpStr == "vistor" { + prefix := section["prefix"] + cfg.ServerName = prefix + section["server_name"] + if cfg.BindAddr = section["bind_addr"]; cfg.BindAddr == "" { + cfg.BindAddr = "127.0.0.1" + } + + if tmpStr, ok := section["bind_port"]; ok { + if cfg.BindPort, err = strconv.Atoi(tmpStr); err != nil { + return fmt.Errorf("Parse conf error: proxy [%s] bind_port error", name) + } + } else { + return fmt.Errorf("Parse conf error: proxy [%s] bind_port not found", name) + } + } else { + if err = cfg.PluginConf.LoadFromFile(name, section); err != nil { + if err = cfg.LocalSvrConf.LoadFromFile(name, section); err != nil { + return + } + } + } + return +} + +func (cfg *XtcpProxyConf) UnMarshalToMsg(pMsg *msg.NewProxy) { + cfg.BaseProxyConf.UnMarshalToMsg(pMsg) + pMsg.Sk = cfg.Sk +} + +func (cfg *XtcpProxyConf) Check() (err error) { + return +} + // if len(startProxy) is 0, start all // otherwise just start proxies in startProxy map func LoadProxyConfFromFile(prefix string, conf ini.File, startProxy map[string]struct{}) ( diff --git a/models/config/server_common.go b/models/config/server_common.go index 1795a2a5..91a9b8bc 100644 --- a/models/config/server_common.go +++ b/models/config/server_common.go @@ -30,6 +30,7 @@ type ServerCommonConf struct { ConfigFile string BindAddr string BindPort int64 + BindUdpPort int64 KcpBindPort int64 ProxyBindAddr string @@ -66,6 +67,7 @@ func GetDefaultServerCommonConf() *ServerCommonConf { ConfigFile: "./frps.ini", BindAddr: "0.0.0.0", BindPort: 7000, + BindUdpPort: 0, KcpBindPort: 0, ProxyBindAddr: "0.0.0.0", VhostHttpPort: 0, @@ -111,6 +113,14 @@ func LoadServerCommonConf(conf ini.File) (cfg *ServerCommonConf, err error) { } } + tmpStr, ok = conf.Get("common", "bind_udp_port") + if ok { + v, err = strconv.ParseInt(tmpStr, 10, 64) + if err == nil { + cfg.BindUdpPort = v + } + } + tmpStr, ok = conf.Get("common", "kcp_bind_port") if ok { v, err = strconv.ParseInt(tmpStr, 10, 64) diff --git a/models/consts/consts.go b/models/consts/consts.go index 5a4bc264..9bf5880b 100644 --- a/models/consts/consts.go +++ b/models/consts/consts.go @@ -28,4 +28,5 @@ var ( HttpProxy string = "http" HttpsProxy string = "https" StcpProxy string = "stcp" + XtcpProxy string = "xtcp" ) diff --git a/models/msg/msg.go b/models/msg/msg.go index 59736b6d..fc2ab2f1 100644 --- a/models/msg/msg.go +++ b/models/msg/msg.go @@ -33,6 +33,10 @@ const ( TypePing = 'h' TypePong = '4' TypeUdpPacket = 'u' + TypeNatHoleVistor = 'i' + TypeNatHoleClient = 'n' + TypeNatHoleResp = 'm' + TypeNatHoleSid = '5' ) var ( @@ -57,6 +61,10 @@ func init() { TypeMap[TypePing] = reflect.TypeOf(Ping{}) TypeMap[TypePong] = reflect.TypeOf(Pong{}) TypeMap[TypeUdpPacket] = reflect.TypeOf(UdpPacket{}) + TypeMap[TypeNatHoleVistor] = reflect.TypeOf(NatHoleVistor{}) + TypeMap[TypeNatHoleClient] = reflect.TypeOf(NatHoleClient{}) + TypeMap[TypeNatHoleResp] = reflect.TypeOf(NatHoleResp{}) + TypeMap[TypeNatHoleSid] = reflect.TypeOf(NatHoleSid{}) for k, v := range TypeMap { TypeStringMap[v] = k @@ -82,9 +90,10 @@ type Login struct { } type LoginResp struct { - Version string `json:"version"` - RunId string `json:"run_id"` - Error string `json:"error"` + Version string `json:"version"` + RunId string `json:"run_id"` + ServerUdpPort int64 `json:"server_udp_port"` + Error string `json:"error"` } // When frpc login success, send this message to frps for running a new proxy. @@ -153,3 +162,24 @@ type UdpPacket struct { LocalAddr *net.UDPAddr `json:"l"` RemoteAddr *net.UDPAddr `json:"r"` } + +type NatHoleVistor struct { + ProxyName string `json:"proxy_name"` + SignKey string `json:"sign_key"` + Timestamp int64 `json:"timestamp"` +} + +type NatHoleClient struct { + ProxyName string `json:"proxy_name"` + Sid string `json:"sid"` +} + +type NatHoleResp struct { + Sid string `json:"sid"` + VistorAddr string `json:"vistor_addr"` + ClientAddr string `json:"client_addr"` +} + +type NatHoleSid struct { + Sid string `json;"sid"` +} diff --git a/server/control.go b/server/control.go index 5a84394a..2833277b 100644 --- a/server/control.go +++ b/server/control.go @@ -97,9 +97,10 @@ func NewControl(svr *Service, ctlConn net.Conn, loginMsg *msg.Login) *Control { // Start send a login success message to client and start working. func (ctl *Control) Start() { loginRespMsg := &msg.LoginResp{ - Version: version.Full(), - RunId: ctl.runId, - Error: "", + Version: version.Full(), + RunId: ctl.runId, + ServerUdpPort: config.ServerCommonCfg.BindUdpPort, + Error: "", } msg.WriteMsg(ctl.conn, loginRespMsg) diff --git a/server/nathole.go b/server/nathole.go new file mode 100644 index 00000000..933d0773 --- /dev/null +++ b/server/nathole.go @@ -0,0 +1,182 @@ +package server + +import ( + "bytes" + "fmt" + "net" + "sync" + "time" + + "github.com/fatedier/frp/models/msg" + "github.com/fatedier/frp/utils/errors" + "github.com/fatedier/frp/utils/log" + "github.com/fatedier/frp/utils/pool" + "github.com/fatedier/frp/utils/util" +) + +// Timeout seconds. +var NatHoleTimeout int64 = 10 + +type NatHoleController struct { + listener *net.UDPConn + + clientCfgs map[string]*NatHoleClientCfg + sessions map[string]*NatHoleSession + + mu sync.RWMutex +} + +func NewNatHoleController(udpBindAddr string) (nc *NatHoleController, err error) { + addr, err := net.ResolveUDPAddr("udp", udpBindAddr) + if err != nil { + return nil, err + } + lconn, err := net.ListenUDP("udp", addr) + if err != nil { + return nil, err + } + nc = &NatHoleController{ + listener: lconn, + clientCfgs: make(map[string]*NatHoleClientCfg), + sessions: make(map[string]*NatHoleSession), + } + return nc, nil +} + +func (nc *NatHoleController) ListenClient(name string, sk string) (sidCh chan string) { + clientCfg := &NatHoleClientCfg{ + Name: name, + Sk: sk, + SidCh: make(chan string), + } + nc.mu.Lock() + nc.clientCfgs[name] = clientCfg + nc.mu.Unlock() + return clientCfg.SidCh +} + +func (nc *NatHoleController) CloseClient(name string) { + nc.mu.Lock() + defer nc.mu.Unlock() + delete(nc.clientCfgs, name) +} + +func (nc *NatHoleController) Run() { + for { + buf := pool.GetBuf(1024) + n, raddr, err := nc.listener.ReadFromUDP(buf) + if err != nil { + log.Trace("nat hole listener read from udp error: %v", err) + return + } + + rd := bytes.NewReader(buf[:n]) + rawMsg, err := msg.ReadMsg(rd) + if err != nil { + log.Trace("read nat hole message error: %v", err) + continue + } + + switch m := rawMsg.(type) { + case *msg.NatHoleVistor: + go nc.HandleVistor(m, raddr) + case *msg.NatHoleClient: + go nc.HandleClient(m, raddr) + default: + log.Trace("error nat hole message type") + continue + } + pool.PutBuf(buf) + } +} + +func (nc *NatHoleController) GenSid() string { + t := time.Now().Unix() + id, _ := util.RandId() + return fmt.Sprintf("%d%s", t, id) +} + +func (nc *NatHoleController) HandleVistor(m *msg.NatHoleVistor, raddr *net.UDPAddr) { + sid := nc.GenSid() + session := &NatHoleSession{ + Sid: sid, + VistorAddr: raddr, + NotifyCh: make(chan struct{}, 0), + } + nc.mu.Lock() + clientCfg, ok := nc.clientCfgs[m.ProxyName] + if !ok || m.SignKey != util.GetAuthKey(clientCfg.Sk, m.Timestamp) { + nc.mu.Unlock() + return + } + nc.sessions[sid] = session + nc.mu.Unlock() + log.Trace("handle vistor message, sid [%s]", sid) + + defer func() { + nc.mu.Lock() + delete(nc.sessions, sid) + nc.mu.Unlock() + }() + + err := errors.PanicToError(func() { + clientCfg.SidCh <- sid + }) + if err != nil { + return + } + + // Wait client connections. + select { + case <-session.NotifyCh: + resp := nc.GenNatHoleResponse(raddr, session) + log.Trace("send nat hole response to vistor") + nc.listener.WriteToUDP(resp, raddr) + case <-time.After(time.Duration(NatHoleTimeout) * time.Second): + return + } +} + +func (nc *NatHoleController) HandleClient(m *msg.NatHoleClient, raddr *net.UDPAddr) { + nc.mu.RLock() + session, ok := nc.sessions[m.Sid] + nc.mu.RUnlock() + if !ok { + return + } + log.Trace("handle client message, sid [%s]", session.Sid) + session.ClientAddr = raddr + session.NotifyCh <- struct{}{} + + resp := nc.GenNatHoleResponse(raddr, session) + log.Trace("send nat hole response to client") + nc.listener.WriteToUDP(resp, raddr) +} + +func (nc *NatHoleController) GenNatHoleResponse(raddr *net.UDPAddr, session *NatHoleSession) []byte { + m := &msg.NatHoleResp{ + Sid: session.Sid, + VistorAddr: session.VistorAddr.String(), + ClientAddr: session.ClientAddr.String(), + } + b := bytes.NewBuffer(nil) + err := msg.WriteMsg(b, m) + if err != nil { + return []byte("") + } + return b.Bytes() +} + +type NatHoleSession struct { + Sid string + VistorAddr *net.UDPAddr + ClientAddr *net.UDPAddr + + NotifyCh chan struct{} +} + +type NatHoleClientCfg struct { + Name string + Sk string + SidCh chan string +} diff --git a/server/proxy.go b/server/proxy.go index ecc58751..a4fdcd26 100644 --- a/server/proxy.go +++ b/server/proxy.go @@ -148,6 +148,11 @@ func NewProxy(ctl *Control, pxyConf config.ProxyConf) (pxy Proxy, err error) { BaseProxy: basePxy, cfg: cfg, } + case *config.XtcpProxyConf: + pxy = &XtcpProxy{ + BaseProxy: basePxy, + cfg: cfg, + } default: return pxy, fmt.Errorf("proxy type not support") } @@ -306,6 +311,54 @@ func (pxy *StcpProxy) Close() { pxy.ctl.svr.vistorManager.CloseListener(pxy.GetName()) } +type XtcpProxy struct { + BaseProxy + cfg *config.XtcpProxyConf + + closeCh chan struct{} +} + +func (pxy *XtcpProxy) Run() error { + if pxy.ctl.svr.natHoleController == nil { + pxy.Error("udp port for xtcp is not specified.") + return fmt.Errorf("xtcp is not supported in frps") + } + sidCh := pxy.ctl.svr.natHoleController.ListenClient(pxy.GetName(), pxy.cfg.Sk) + go func() { + for { + select { + case <-pxy.closeCh: + break + case sid := <-sidCh: + workConn, err := pxy.GetWorkConnFromPool() + if err != nil { + continue + } + m := &msg.NatHoleSid{ + Sid: sid, + } + err = msg.WriteMsg(workConn, m) + if err != nil { + pxy.Warn("write nat hole sid package error, %v", err) + } + } + } + }() + return nil +} + +func (pxy *XtcpProxy) GetConf() config.ProxyConf { + return pxy.cfg +} + +func (pxy *XtcpProxy) Close() { + pxy.BaseProxy.Close() + pxy.ctl.svr.natHoleController.CloseClient(pxy.GetName()) + errors.PanicToError(func() { + close(pxy.closeCh) + }) +} + type UdpProxy struct { BaseProxy cfg *config.UdpProxyConf diff --git a/server/service.go b/server/service.go index e06447dc..78021703 100644 --- a/server/service.go +++ b/server/service.go @@ -58,6 +58,9 @@ type Service struct { // Manage all vistor listeners. vistorManager *VistorManager + + // Controller for nat hole connections. + natHoleController *NatHoleController } func NewService() (svr *Service, err error) { @@ -66,36 +69,37 @@ func NewService() (svr *Service, err error) { pxyManager: NewProxyManager(), vistorManager: NewVistorManager(), } + cfg := config.ServerCommonCfg // Init assets. - err = assets.Load(config.ServerCommonCfg.AssetsDir) + err = assets.Load(cfg.AssetsDir) if err != nil { err = fmt.Errorf("Load assets error: %v", err) return } // Listen for accepting connections from client. - svr.listener, err = frpNet.ListenTcp(config.ServerCommonCfg.BindAddr, config.ServerCommonCfg.BindPort) + svr.listener, err = frpNet.ListenTcp(cfg.BindAddr, cfg.BindPort) if err != nil { err = fmt.Errorf("Create server listener error, %v", err) return } - log.Info("frps tcp listen on %s:%d", config.ServerCommonCfg.BindAddr, config.ServerCommonCfg.BindPort) + log.Info("frps tcp listen on %s:%d", cfg.BindAddr, cfg.BindPort) // Listen for accepting connections from client using kcp protocol. - if config.ServerCommonCfg.KcpBindPort > 0 { - svr.kcpListener, err = frpNet.ListenKcp(config.ServerCommonCfg.BindAddr, config.ServerCommonCfg.KcpBindPort) + if cfg.KcpBindPort > 0 { + svr.kcpListener, err = frpNet.ListenKcp(cfg.BindAddr, cfg.KcpBindPort) if err != nil { - err = fmt.Errorf("Listen on kcp address udp [%s:%d] error: %v", config.ServerCommonCfg.BindAddr, config.ServerCommonCfg.KcpBindPort, err) + err = fmt.Errorf("Listen on kcp address udp [%s:%d] error: %v", cfg.BindAddr, cfg.KcpBindPort, err) return } - log.Info("frps kcp listen on udp %s:%d", config.ServerCommonCfg.BindAddr, config.ServerCommonCfg.BindPort) + log.Info("frps kcp listen on udp %s:%d", cfg.BindAddr, cfg.BindPort) } // Create http vhost muxer. - if config.ServerCommonCfg.VhostHttpPort > 0 { + if cfg.VhostHttpPort > 0 { var l frpNet.Listener - l, err = frpNet.ListenTcp(config.ServerCommonCfg.ProxyBindAddr, config.ServerCommonCfg.VhostHttpPort) + l, err = frpNet.ListenTcp(cfg.ProxyBindAddr, cfg.VhostHttpPort) if err != nil { err = fmt.Errorf("Create vhost http listener error, %v", err) return @@ -105,13 +109,13 @@ func NewService() (svr *Service, err error) { err = fmt.Errorf("Create vhost httpMuxer error, %v", err) return } - log.Info("http service listen on %s:%d", config.ServerCommonCfg.ProxyBindAddr, config.ServerCommonCfg.VhostHttpPort) + log.Info("http service listen on %s:%d", cfg.ProxyBindAddr, cfg.VhostHttpPort) } // Create https vhost muxer. - if config.ServerCommonCfg.VhostHttpsPort > 0 { + if cfg.VhostHttpsPort > 0 { var l frpNet.Listener - l, err = frpNet.ListenTcp(config.ServerCommonCfg.ProxyBindAddr, config.ServerCommonCfg.VhostHttpsPort) + l, err = frpNet.ListenTcp(cfg.ProxyBindAddr, cfg.VhostHttpsPort) if err != nil { err = fmt.Errorf("Create vhost https listener error, %v", err) return @@ -121,22 +125,38 @@ func NewService() (svr *Service, err error) { err = fmt.Errorf("Create vhost httpsMuxer error, %v", err) return } - log.Info("https service listen on %s:%d", config.ServerCommonCfg.ProxyBindAddr, config.ServerCommonCfg.VhostHttpsPort) + log.Info("https service listen on %s:%d", cfg.ProxyBindAddr, cfg.VhostHttpsPort) + } + + // Create nat hole controller. + if cfg.BindUdpPort > 0 { + var nc *NatHoleController + addr := fmt.Sprintf("%s:%d", cfg.BindAddr, cfg.BindUdpPort) + nc, err = NewNatHoleController(addr) + if err != nil { + err = fmt.Errorf("Create nat hole controller error, %v", err) + return + } + svr.natHoleController = nc + log.Info("nat hole udp service listen on %s:%d", cfg.BindAddr, cfg.BindUdpPort) } // Create dashboard web server. - if config.ServerCommonCfg.DashboardPort > 0 { - err = RunDashboardServer(config.ServerCommonCfg.BindAddr, config.ServerCommonCfg.DashboardPort) + if cfg.DashboardPort > 0 { + err = RunDashboardServer(cfg.BindAddr, cfg.DashboardPort) if err != nil { err = fmt.Errorf("Create dashboard web server error, %v", err) return } - log.Info("Dashboard listen on %s:%d", config.ServerCommonCfg.BindAddr, config.ServerCommonCfg.DashboardPort) + log.Info("Dashboard listen on %s:%d", cfg.BindAddr, cfg.DashboardPort) } return } func (svr *Service) Run() { + if svr.natHoleController != nil { + go svr.natHoleController.Run() + } if config.ServerCommonCfg.KcpBindPort > 0 { go svr.HandleListener(svr.kcpListener) } diff --git a/utils/net/kcp.go b/utils/net/kcp.go index 862d3846..4a38fc03 100644 --- a/utils/net/kcp.go +++ b/utils/net/kcp.go @@ -20,7 +20,7 @@ import ( "github.com/fatedier/frp/utils/log" - kcp "github.com/xtaci/kcp-go" + kcp "github.com/fatedier/kcp-go" ) type KcpListener struct { @@ -85,3 +85,7 @@ func (l *KcpListener) Close() error { } return nil } + +func NewKcpConnFromUdp(conn *net.UDPConn, connected bool, raddr string) (net.Conn, error) { + return kcp.NewConnEx(1, connected, raddr, nil, 10, 3, conn) +}