diff --git a/client/admin_api.go b/client/admin_api.go index af61b395..1acbb95b 100644 --- a/client/admin_api.go +++ b/client/admin_api.go @@ -88,6 +88,7 @@ type StatusResp struct { Https []ProxyStatusResp `json:"https"` Stcp []ProxyStatusResp `json:"stcp"` Xtcp []ProxyStatusResp `json:"xtcp"` + Sudp []ProxyStatusResp `json:"sudp"` } type ProxyStatusResp struct { @@ -155,6 +156,11 @@ func NewProxyStatusResp(status *proxy.ProxyStatus, serverAddr string) ProxyStatu psr.LocalAddr = fmt.Sprintf("%s:%d", cfg.LocalIp, cfg.LocalPort) } psr.Plugin = cfg.Plugin + case *config.SudpProxyConf: + if cfg.LocalPort != 0 { + psr.LocalAddr = fmt.Sprintf("%s:%d", cfg.LocalIp, cfg.LocalPort) + } + psr.Plugin = cfg.Plugin } return psr } @@ -171,6 +177,7 @@ func (svr *Service) apiStatus(w http.ResponseWriter, r *http.Request) { res.Https = make([]ProxyStatusResp, 0) res.Stcp = make([]ProxyStatusResp, 0) res.Xtcp = make([]ProxyStatusResp, 0) + res.Sudp = make([]ProxyStatusResp, 0) log.Info("Http request [/api/status]") defer func() { @@ -194,6 +201,8 @@ func (svr *Service) apiStatus(w http.ResponseWriter, r *http.Request) { res.Stcp = append(res.Stcp, NewProxyStatusResp(status, svr.cfg.ServerAddr)) case "xtcp": res.Xtcp = append(res.Xtcp, NewProxyStatusResp(status, svr.cfg.ServerAddr)) + case "sudp": + res.Sudp = append(res.Sudp, NewProxyStatusResp(status, svr.cfg.ServerAddr)) } } sort.Sort(ByProxyStatusResp(res.Tcp)) @@ -202,6 +211,7 @@ func (svr *Service) apiStatus(w http.ResponseWriter, r *http.Request) { sort.Sort(ByProxyStatusResp(res.Https)) sort.Sort(ByProxyStatusResp(res.Stcp)) sort.Sort(ByProxyStatusResp(res.Xtcp)) + sort.Sort(ByProxyStatusResp(res.Sudp)) return } diff --git a/client/proxy/proxy.go b/client/proxy/proxy.go index c263e1d2..0c9ea52f 100644 --- a/client/proxy/proxy.go +++ b/client/proxy/proxy.go @@ -102,6 +102,12 @@ func NewProxy(ctx context.Context, pxyConf config.ProxyConf, clientCfg config.Cl BaseProxy: &baseProxy, cfg: cfg, } + case *config.SudpProxyConf: + pxy = &SudpProxy{ + BaseProxy: &baseProxy, + cfg: cfg, + closeCh: make(chan struct{}), + } } return } @@ -540,6 +546,151 @@ func (pxy *UdpProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) { udp.Forwarder(pxy.localAddr, pxy.readCh, pxy.sendCh) } +type SudpProxy struct { + *BaseProxy + + cfg *config.SudpProxyConf + + localAddr *net.UDPAddr + + closeCh chan struct{} +} + +func (pxy *SudpProxy) Run() (err error) { + pxy.localAddr, err = net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", pxy.cfg.LocalIp, pxy.cfg.LocalPort)) + if err != nil { + return + } + return +} + +func (pxy *SudpProxy) Close() { + pxy.mu.Lock() + defer pxy.mu.Unlock() + select { + case <-pxy.closeCh: + return + default: + close(pxy.closeCh) + } +} + +func (pxy *SudpProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) { + xl := pxy.xl + xl.Info("incoming a new work connection for sudp proxy, %s", conn.RemoteAddr().String()) + + if pxy.limiter != nil { + rwc := frpIo.WrapReadWriteCloser(limit.NewReader(conn, pxy.limiter), limit.NewWriter(conn, pxy.limiter), func() error { + return conn.Close() + }) + conn = frpNet.WrapReadWriteCloserToConn(rwc, conn) + } + + workConn := conn + readCh := make(chan *msg.UdpPacket, 1024) + sendCh := make(chan msg.Message, 1024) + isClose := false + + mu := &sync.Mutex{} + + closeFn := func() { + mu.Lock() + defer mu.Unlock() + if isClose { + return + } + + isClose = true + if workConn != nil { + workConn.Close() + } + close(readCh) + close(sendCh) + } + + // udp service <- frpc <- frps <- frpc visitor <- user + workConnReaderFn := func(conn net.Conn, readCh chan *msg.UdpPacket) { + defer closeFn() + + for { + // first to check sudp proxy is closed or not + select { + case <-pxy.closeCh: + xl.Trace("frpc sudp proxy is closed") + return + default: + } + + var udpMsg msg.UdpPacket + if errRet := msg.ReadMsgInto(conn, &udpMsg); errRet != nil { + xl.Warn("read from workConn for sudp error: %v", errRet) + return + } + + if errRet := errors.PanicToError(func() { + readCh <- &udpMsg + }); errRet != nil { + xl.Warn("reader goroutine for sudp work connection closed: %v", errRet) + return + } + } + } + + // udp service -> frpc -> frps -> frpc visitor -> user + workConnSenderFn := func(conn net.Conn, sendCh chan msg.Message) { + defer func() { + closeFn() + xl.Info("writer goroutine for sudp work connection closed") + }() + + var errRet error + for rawMsg := range sendCh { + switch m := rawMsg.(type) { + case *msg.UdpPacket: + xl.Trace("frpc send udp package to frpc visitor, [udp local: %v, remote: %v], [tcp work conn local: %v, remote: %v]", + m.LocalAddr.String(), m.RemoteAddr.String(), conn.LocalAddr().String(), conn.RemoteAddr().String()) + case *msg.Ping: + xl.Trace("frpc send ping message to frpc visitor") + } + + if errRet = msg.WriteMsg(conn, rawMsg); errRet != nil { + xl.Error("sudp work write error: %v", errRet) + return + } + } + } + + heartbeatFn := func(conn net.Conn, sendCh chan msg.Message) { + ticker := time.NewTicker(30 * time.Second) + defer func() { + ticker.Stop() + closeFn() + }() + + var errRet error + for { + select { + case <-ticker.C: + if errRet = errors.PanicToError(func() { + sendCh <- &msg.Ping{} + }); errRet != nil { + xl.Warn("heartbeat goroutine for sudp work connection closed") + return + } + case <-pxy.closeCh: + xl.Trace("frpc sudp proxy is closed") + return + } + } + } + + go workConnSenderFn(workConn, sendCh) + go workConnReaderFn(workConn, readCh) + go heartbeatFn(workConn, sendCh) + + udp.Forwarder(pxy.localAddr, readCh, sendCh) +} + // Common handler for tcp work connections. func HandleTcpWorkConnection(ctx context.Context, localInfo *config.LocalSvrConf, proxyPlugin plugin.Plugin, baseInfo *config.BaseProxyConf, limiter *rate.Limiter, workConn net.Conn, encKey []byte, m *msg.StartWorkConn) { diff --git a/client/visitor.go b/client/visitor.go index a4900e06..d3004868 100644 --- a/client/visitor.go +++ b/client/visitor.go @@ -26,10 +26,12 @@ import ( "github.com/fatedier/frp/models/config" "github.com/fatedier/frp/models/msg" + "github.com/fatedier/frp/models/proto/udp" frpNet "github.com/fatedier/frp/utils/net" "github.com/fatedier/frp/utils/util" "github.com/fatedier/frp/utils/xlog" + "github.com/fatedier/golib/errors" frpIo "github.com/fatedier/golib/io" "github.com/fatedier/golib/pool" fmux "github.com/hashicorp/yamux" @@ -58,6 +60,12 @@ func NewVisitor(ctx context.Context, ctl *Control, cfg config.VisitorConf) (visi BaseVisitor: &baseVisitor, cfg: cfg, } + case *config.SudpVisitorConf: + visitor = &SudpVisitor{ + BaseVisitor: &baseVisitor, + cfg: cfg, + checkCloseCh: make(chan struct{}), + } } return } @@ -328,3 +336,204 @@ func (sv *XtcpVisitor) handleConn(userConn net.Conn) { frpIo.Join(userConn, muxConnRWCloser) xl.Debug("join connections closed") } + +type SudpVisitor struct { + *BaseVisitor + + checkCloseCh chan struct{} + // udpConn is the listener of udp packet + udpConn *net.UDPConn + readCh chan *msg.UdpPacket + sendCh chan *msg.UdpPacket + + cfg *config.SudpVisitorConf +} + +// SUDP Run start listen a udp port +func (sv *SudpVisitor) Run() (err error) { + xl := xlog.FromContextSafe(sv.ctx) + + addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", sv.cfg.BindAddr, sv.cfg.BindPort)) + if err != nil { + return fmt.Errorf("sudp ResolveUDPAddr error: %v", err) + } + + sv.udpConn, err = net.ListenUDP("udp", addr) + if err != nil { + return fmt.Errorf("listen udp port %s error: %v", addr.String(), err) + } + + sv.sendCh = make(chan *msg.UdpPacket, 1024) + sv.readCh = make(chan *msg.UdpPacket, 1024) + + xl.Info("sudp start to work") + + go sv.dispatcher() + go udp.ForwardUserConn(sv.udpConn, sv.readCh, sv.sendCh) + + return +} + +func (sv *SudpVisitor) dispatcher() { + xl := xlog.FromContextSafe(sv.ctx) + + for { + // loop for get frpc to frps tcp conn + // setup worker + // wait worker to finished + // retry or exit + visitorConn, err := sv.getNewVisitorConn() + if err != nil { + // check if proxy is closed + // if checkCloseCh is close, we will return, other case we will continue to reconnect + select { + case <-sv.checkCloseCh: + xl.Info("frpc sudp visitor proxy is closed") + return + default: + } + + time.Sleep(3 * time.Second) + + xl.Warn("newVisitorConn to frps error: %v, try to reconnect", err) + continue + } + + sv.worker(visitorConn) + + select { + case <-sv.checkCloseCh: + return + default: + } + } +} + +func (sv *SudpVisitor) worker(workConn net.Conn) { + xl := xlog.FromContextSafe(sv.ctx) + xl.Debug("starting sudp proxy worker") + + wg := &sync.WaitGroup{} + wg.Add(2) + closeCh := make(chan struct{}) + + // udp service -> frpc -> frps -> frpc visitor -> user + workConnReaderFn := func(conn net.Conn) { + defer func() { + conn.Close() + close(closeCh) + wg.Done() + }() + + for { + var ( + rawMsg msg.Message + errRet error + ) + + // frpc will send heartbeat in workConn to frpc visitor for keeping alive + conn.SetReadDeadline(time.Now().Add(60 * time.Second)) + if rawMsg, errRet = msg.ReadMsg(conn); errRet != nil { + xl.Warn("read from workconn for user udp conn error: %v", errRet) + return + } + + conn.SetReadDeadline(time.Time{}) + switch m := rawMsg.(type) { + case *msg.Ping: + xl.Debug("frpc visitor get ping message from frpc") + continue + case *msg.UdpPacket: + if errRet := errors.PanicToError(func() { + sv.readCh <- m + xl.Trace("frpc visitor get udp packet from frpc") + }); errRet != nil { + xl.Info("reader goroutine for udp work connection closed") + return + } + } + } + } + + // udp service <- frpc <- frps <- frpc visitor <- user + workConnSenderFn := func(conn net.Conn) { + defer func() { + conn.Close() + wg.Done() + }() + + var errRet error + for { + select { + case udpMsg, ok := <-sv.sendCh: + if !ok { + xl.Info("sender goroutine for udp work connection closed") + return + } + + if errRet = msg.WriteMsg(conn, udpMsg); errRet != nil { + xl.Warn("sender goroutine for udp work connection closed: %v", errRet) + return + } + case <-closeCh: + return + } + } + } + + go workConnReaderFn(workConn) + go workConnSenderFn(workConn) + + wg.Wait() + xl.Info("sudp worker is closed") +} + +func (sv *SudpVisitor) getNewVisitorConn() (visitorConn net.Conn, err error) { + visitorConn, err = sv.ctl.connectServer() + if err != nil { + return nil, fmt.Errorf("frpc connect frps error: %v", err) + } + + now := time.Now().Unix() + newVisitorConnMsg := &msg.NewVisitorConn{ + ProxyName: sv.cfg.ServerName, + SignKey: util.GetAuthKey(sv.cfg.Sk, now), + Timestamp: now, + UseEncryption: sv.cfg.UseEncryption, + UseCompression: sv.cfg.UseCompression, + } + err = msg.WriteMsg(visitorConn, newVisitorConnMsg) + if err != nil { + return nil, fmt.Errorf("frpc send newVisitorConnMsg to frps error: %v", err) + } + + var newVisitorConnRespMsg msg.NewVisitorConnResp + visitorConn.SetReadDeadline(time.Now().Add(10 * time.Second)) + err = msg.ReadMsgInto(visitorConn, &newVisitorConnRespMsg) + if err != nil { + return nil, fmt.Errorf("frpc read newVisitorConnRespMsg error: %v", err) + } + visitorConn.SetReadDeadline(time.Time{}) + + if newVisitorConnRespMsg.Error != "" { + return nil, fmt.Errorf("start new visitor connection error: %s", newVisitorConnRespMsg.Error) + } + return +} + +func (sv *SudpVisitor) Close() { + sv.mu.Lock() + defer sv.mu.Unlock() + + select { + case <-sv.checkCloseCh: + return + default: + close(sv.checkCloseCh) + } + if sv.udpConn != nil { + sv.udpConn.Close() + } + close(sv.readCh) + close(sv.sendCh) +} diff --git a/cmd/frpc/sub/sudp.go b/cmd/frpc/sub/sudp.go new file mode 100644 index 00000000..e3e91abc --- /dev/null +++ b/cmd/frpc/sub/sudp.go @@ -0,0 +1,113 @@ +// Copyright 2018 fatedier, fatedier@gmail.com +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sub + +import ( + "fmt" + "os" + + "github.com/spf13/cobra" + + "github.com/fatedier/frp/models/config" + "github.com/fatedier/frp/models/consts" +) + +func init() { + sudpCmd.PersistentFlags().StringVarP(&serverAddr, "server_addr", "s", "127.0.0.1:7000", "frp server's address") + sudpCmd.PersistentFlags().StringVarP(&user, "user", "u", "", "user") + sudpCmd.PersistentFlags().StringVarP(&protocol, "protocol", "p", "tcp", "tcp or kcp or websocket") + sudpCmd.PersistentFlags().StringVarP(&token, "token", "t", "", "auth token") + sudpCmd.PersistentFlags().StringVarP(&logLevel, "log_level", "", "info", "log level") + sudpCmd.PersistentFlags().StringVarP(&logFile, "log_file", "", "console", "console or file path") + sudpCmd.PersistentFlags().IntVarP(&logMaxDays, "log_max_days", "", 3, "log file reversed days") + sudpCmd.PersistentFlags().BoolVarP(&disableLogColor, "disable_log_color", "", false, "disable log color in console") + + sudpCmd.PersistentFlags().StringVarP(&proxyName, "proxy_name", "n", "", "proxy name") + sudpCmd.PersistentFlags().StringVarP(&role, "role", "", "server", "role") + sudpCmd.PersistentFlags().StringVarP(&sk, "sk", "", "", "secret key") + sudpCmd.PersistentFlags().StringVarP(&serverName, "server_name", "", "", "server name") + sudpCmd.PersistentFlags().StringVarP(&localIp, "local_ip", "i", "127.0.0.1", "local ip") + sudpCmd.PersistentFlags().IntVarP(&localPort, "local_port", "l", 0, "local port") + sudpCmd.PersistentFlags().StringVarP(&bindAddr, "bind_addr", "", "", "bind addr") + sudpCmd.PersistentFlags().IntVarP(&bindPort, "bind_port", "", 0, "bind port") + sudpCmd.PersistentFlags().BoolVarP(&useEncryption, "ue", "", false, "use encryption") + sudpCmd.PersistentFlags().BoolVarP(&useCompression, "uc", "", false, "use compression") + + rootCmd.AddCommand(sudpCmd) +} + +var sudpCmd = &cobra.Command{ + Use: "sudp", + Short: "Run frpc with a single sudp proxy", + RunE: func(cmd *cobra.Command, args []string) error { + clientCfg, err := parseClientCommonCfg(CfgFileTypeCmd, "") + if err != nil { + fmt.Println(err) + os.Exit(1) + } + + proxyConfs := make(map[string]config.ProxyConf) + visitorConfs := make(map[string]config.VisitorConf) + + var prefix string + if user != "" { + prefix = user + "." + } + + if role == "server" { + cfg := &config.SudpProxyConf{} + cfg.ProxyName = prefix + proxyName + cfg.ProxyType = consts.SudpProxy + cfg.UseEncryption = useEncryption + cfg.UseCompression = useCompression + cfg.Role = role + cfg.Sk = sk + cfg.LocalIp = localIp + cfg.LocalPort = localPort + err = cfg.CheckForCli() + if err != nil { + fmt.Println(err) + os.Exit(1) + } + proxyConfs[cfg.ProxyName] = cfg + } else if role == "visitor" { + cfg := &config.SudpVisitorConf{} + cfg.ProxyName = prefix + proxyName + cfg.ProxyType = consts.SudpProxy + cfg.UseEncryption = useEncryption + cfg.UseCompression = useCompression + cfg.Role = role + cfg.Sk = sk + cfg.ServerName = serverName + cfg.BindAddr = bindAddr + cfg.BindPort = bindPort + err = cfg.Check() + if err != nil { + fmt.Println(err) + os.Exit(1) + } + visitorConfs[cfg.ProxyName] = cfg + } else { + fmt.Println("invalid role") + os.Exit(1) + } + + err = startService(clientCfg, proxyConfs, visitorConfs, "") + if err != nil { + os.Exit(1) + } + return nil + }, +} diff --git a/models/config/proxy.go b/models/config/proxy.go index 591011f9..a1e34ce8 100644 --- a/models/config/proxy.go +++ b/models/config/proxy.go @@ -40,6 +40,7 @@ func init() { proxyConfTypeMap[consts.HttpsProxy] = reflect.TypeOf(HttpsProxyConf{}) proxyConfTypeMap[consts.StcpProxy] = reflect.TypeOf(StcpProxyConf{}) proxyConfTypeMap[consts.XtcpProxy] = reflect.TypeOf(XtcpProxyConf{}) + proxyConfTypeMap[consts.SudpProxy] = reflect.TypeOf(SudpProxyConf{}) } // NewConfByType creates a empty ProxyConf object by proxyType. @@ -875,6 +876,72 @@ func (cfg *HttpsProxyConf) CheckForSvr(serverCfg ServerCommonConf) (err error) { return } +// SUDP +type SudpProxyConf struct { + BaseProxyConf + + Role string `json:"role"` + Sk string `json:"sk"` +} + +func (cfg *SudpProxyConf) Compare(cmp ProxyConf) bool { + cmpConf, ok := cmp.(*SudpProxyConf) + if !ok { + return false + } + + if !cfg.BaseProxyConf.compare(&cmpConf.BaseProxyConf) || + cfg.Role != cmpConf.Role || + cfg.Sk != cmpConf.Sk { + return false + } + return true +} + +func (cfg *SudpProxyConf) UnmarshalFromIni(prefix string, name string, section ini.Section) (err error) { + if err = cfg.BaseProxyConf.UnmarshalFromIni(prefix, name, section); err != nil { + return + } + + cfg.Role = section["role"] + if cfg.Role != "server" { + return fmt.Errorf("Parse conf error: proxy [%s] incorrect role [%s]", name, cfg.Role) + } + + cfg.Sk = section["sk"] + + if err = cfg.LocalSvrConf.UnmarshalFromIni(prefix, name, section); err != nil { + return + } + return +} + +func (cfg *SudpProxyConf) MarshalToMsg(pMsg *msg.NewProxy) { + cfg.BaseProxyConf.MarshalToMsg(pMsg) + pMsg.Sk = cfg.Sk +} + +func (cfg *SudpProxyConf) CheckForCli() (err error) { + if err = cfg.BaseProxyConf.checkForCli(); err != nil { + return + } + if cfg.Role != "server" { + err = fmt.Errorf("role should be 'server'") + return + } + return +} + +func (cfg *SudpProxyConf) CheckForSvr(serverCfg ServerCommonConf) (err error) { + return +} + +// Only for role server. +func (cfg *SudpProxyConf) UnmarshalFromMsg(pMsg *msg.NewProxy) { + cfg.BaseProxyConf.UnmarshalFromMsg(pMsg) + cfg.Sk = pMsg.Sk +} + // STCP type StcpProxyConf struct { BaseProxyConf diff --git a/models/config/visitor.go b/models/config/visitor.go index 4233375c..ad9ff841 100644 --- a/models/config/visitor.go +++ b/models/config/visitor.go @@ -32,6 +32,7 @@ func init() { visitorConfTypeMap = make(map[string]reflect.Type) visitorConfTypeMap[consts.StcpProxy] = reflect.TypeOf(StcpVisitorConf{}) visitorConfTypeMap[consts.XtcpProxy] = reflect.TypeOf(XtcpVisitorConf{}) + visitorConfTypeMap[consts.SudpProxy] = reflect.TypeOf(SudpVisitorConf{}) } type VisitorConf interface { @@ -152,6 +153,36 @@ func (cfg *BaseVisitorConf) UnmarshalFromIni(prefix string, name string, section return nil } +type SudpVisitorConf struct { + BaseVisitorConf +} + +func (cfg *SudpVisitorConf) Compare(cmp VisitorConf) bool { + cmpConf, ok := cmp.(*SudpVisitorConf) + if !ok { + return false + } + + if !cfg.BaseVisitorConf.compare(&cmpConf.BaseVisitorConf) { + return false + } + return true +} + +func (cfg *SudpVisitorConf) UnmarshalFromIni(prefix string, name string, section ini.Section) (err error) { + if err = cfg.BaseVisitorConf.UnmarshalFromIni(prefix, name, section); err != nil { + return + } + return +} + +func (cfg *SudpVisitorConf) Check() (err error) { + if err = cfg.BaseVisitorConf.check(); err != nil { + return + } + return +} + type StcpVisitorConf struct { BaseVisitorConf } diff --git a/models/consts/consts.go b/models/consts/consts.go index 4c1ca4c7..e63da54a 100644 --- a/models/consts/consts.go +++ b/models/consts/consts.go @@ -30,6 +30,7 @@ var ( HttpsProxy string = "https" StcpProxy string = "stcp" XtcpProxy string = "xtcp" + SudpProxy string = "sudp" // authentication method TokenAuthMethod string = "token" diff --git a/models/proto/udp/udp.go b/models/proto/udp/udp.go index ed7f95a9..8ae1db64 100644 --- a/models/proto/udp/udp.go +++ b/models/proto/udp/udp.go @@ -57,11 +57,11 @@ func ForwardUserConn(udpConn *net.UDPConn, readCh <-chan *msg.UdpPacket, sendCh for { n, remoteAddr, err := udpConn.ReadFromUDP(buf) if err != nil { - udpConn.Close() return } // buf[:n] will be encoded to string, so the bytes can be reused udpMsg := NewUdpPacket(buf[:n], nil, remoteAddr) + select { case sendCh <- udpMsg: default: diff --git a/server/proxy/proxy.go b/server/proxy/proxy.go index 41d3b493..d3668a17 100644 --- a/server/proxy/proxy.go +++ b/server/proxy/proxy.go @@ -219,6 +219,11 @@ func NewProxy(ctx context.Context, userInfo plugin.UserInfo, rc *controller.Reso BaseProxy: &basePxy, cfg: cfg, } + case *config.SudpProxyConf: + pxy = &SudpProxy{ + BaseProxy: &basePxy, + cfg: cfg, + } default: return pxy, fmt.Errorf("proxy type not support") } diff --git a/server/proxy/sudp.go b/server/proxy/sudp.go new file mode 100644 index 00000000..2916334a --- /dev/null +++ b/server/proxy/sudp.go @@ -0,0 +1,48 @@ +// Copyright 2019 fatedier, fatedier@gmail.com +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package proxy + +import ( + "github.com/fatedier/frp/models/config" +) + +type SudpProxy struct { + *BaseProxy + cfg *config.SudpProxyConf +} + +func (pxy *SudpProxy) Run() (remoteAddr string, err error) { + xl := pxy.xl + + listener, errRet := pxy.rc.VisitorManager.Listen(pxy.GetName(), pxy.cfg.Sk) + if errRet != nil { + err = errRet + return + } + pxy.listeners = append(pxy.listeners, listener) + xl.Info("sudp proxy custom listen success") + + pxy.startListenHandler(pxy, HandleUserTcpConnection) + return +} + +func (pxy *SudpProxy) GetConf() config.ProxyConf { + return pxy.cfg +} + +func (pxy *SudpProxy) Close() { + pxy.BaseProxy.Close() + pxy.rc.VisitorManager.CloseListener(pxy.GetName()) +} diff --git a/tests/ci/auto_test_frpc.ini b/tests/ci/auto_test_frpc.ini index fbcf971a..46f12797 100644 --- a/tests/ci/auto_test_frpc.ini +++ b/tests/ci/auto_test_frpc.ini @@ -72,6 +72,12 @@ local_port = 10701 use_encryption = true use_compression = true +[sudp] +type = sudp +sk = abcdefg +local_ip = 127.0.0.1 +local_port = 10702 + [web01] type = http local_ip = 127.0.0.1 diff --git a/tests/ci/auto_test_frpc_visitor.ini b/tests/ci/auto_test_frpc_visitor.ini index 660c7931..2ed3af56 100644 --- a/tests/ci/auto_test_frpc_visitor.ini +++ b/tests/ci/auto_test_frpc_visitor.ini @@ -23,3 +23,12 @@ bind_addr = 127.0.0.1 bind_port = 10905 use_encryption = true use_compression = true + + +[sudp_visitor] +type = sudp +role = visitor +server_name = sudp +sk = abcdefg +bind_addr = 127.0.0.1 +bind_port = 10816 diff --git a/tests/ci/normal_test.go b/tests/ci/normal_test.go index 572fec09..6f1a8ade 100644 --- a/tests/ci/normal_test.go +++ b/tests/ci/normal_test.go @@ -118,6 +118,16 @@ func TestStcp(t *testing.T) { } } +func TestSudp(t *testing.T) { + assert := assert.New(t) + // Normal + addr := fmt.Sprintf("127.0.0.1:%d", consts.TEST_SUDP_FRP_PORT) + res, err := util.SendUdpMsg(addr, consts.TEST_SUDP_ECHO_STR) + + assert.NoError(err) + assert.Equal(consts.TEST_SUDP_ECHO_STR, res) +} + func TestHttp(t *testing.T) { assert := assert.New(t) // web01 diff --git a/tests/consts/consts.go b/tests/consts/consts.go index 5fc8857d..e7f970e4 100644 --- a/tests/consts/consts.go +++ b/tests/consts/consts.go @@ -46,6 +46,9 @@ var ( TEST_STCP_EC_FRP_PORT int = 10905 TEST_STCP_ECHO_STR string = "stcp type:" + TEST_STR + TEST_SUDP_FRP_PORT int = 10816 + TEST_SUDP_ECHO_STR string = "sudp type:" + TEST_STR + ProxyTcpPortNotAllowed string = "tcp_port_not_allowed" ProxyTcpPortUnavailable string = "tcp_port_unavailable" ProxyTcpPortNormal string = "tcp_port_normal" diff --git a/tests/util/util.go b/tests/util/util.go index 163ddc26..3dfd28f9 100644 --- a/tests/util/util.go +++ b/tests/util/util.go @@ -71,6 +71,11 @@ func GetProxyStatus(statusAddr string, user string, passwd string, name string) return &s, nil } } + for _, s := range allStatus.Sudp { + if s.Name == name { + return &s, nil + } + } return status, errors.New("no proxy status found") }