From 171bc8dd223da4b28db7a3d237c148f4c4d8c69e Mon Sep 17 00:00:00 2001 From: fatedier Date: Mon, 26 Jun 2017 03:02:33 +0800 Subject: [PATCH] new proxy type: stcp(secret tcp) --- client/control.go | 98 +++++++++++++++++---------- client/proxy.go | 35 +++++++++- client/service.go | 4 +- client/vistor.go | 145 ++++++++++++++++++++++++++++++++++++++++ cmd/frpc/main.go | 4 +- conf/frpc_full.ini | 21 ++++++ models/config/proxy.go | 100 +++++++++++++++++++++++++-- models/consts/consts.go | 1 + models/msg/msg.go | 42 +++++++++--- server/manager.go | 74 ++++++++++++++++++++ server/proxy.go | 32 +++++++++ server/service.go | 30 +++++++-- utils/log/log.go | 5 ++ utils/net/listener.go | 53 +++++++++++++++ 14 files changed, 581 insertions(+), 63 deletions(-) create mode 100644 client/vistor.go diff --git a/client/control.go b/client/control.go index c7020580..45f617b3 100644 --- a/client/control.go +++ b/client/control.go @@ -25,7 +25,7 @@ import ( "github.com/fatedier/frp/models/msg" "github.com/fatedier/frp/utils/crypto" "github.com/fatedier/frp/utils/log" - "github.com/fatedier/frp/utils/net" + frpNet "github.com/fatedier/frp/utils/net" "github.com/fatedier/frp/utils/util" "github.com/fatedier/frp/utils/version" "github.com/xtaci/smux" @@ -48,8 +48,14 @@ type Control struct { // proxies proxies map[string]Proxy + // vistor configures + vistorCfgs map[string]config.ProxyConf + + // vistors + vistors map[string]Vistor + // control connection - conn net.Conn + conn frpNet.Conn // tcp stream multiplexing, if enabled session *smux.Session @@ -77,7 +83,7 @@ type Control struct { log.Logger } -func NewControl(svr *Service, pxyCfgs map[string]config.ProxyConf) *Control { +func NewControl(svr *Service, pxyCfgs map[string]config.ProxyConf, vistorCfgs map[string]config.ProxyConf) *Control { loginMsg := &msg.Login{ Arch: runtime.GOARCH, Os: runtime.GOOS, @@ -86,14 +92,16 @@ func NewControl(svr *Service, pxyCfgs map[string]config.ProxyConf) *Control { Version: version.Full(), } return &Control{ - svr: svr, - loginMsg: loginMsg, - pxyCfgs: pxyCfgs, - proxies: make(map[string]Proxy), - sendCh: make(chan msg.Message, 10), - readCh: make(chan msg.Message, 10), - closedCh: make(chan int), - Logger: log.NewPrefixLogger(""), + svr: svr, + loginMsg: loginMsg, + pxyCfgs: pxyCfgs, + vistorCfgs: vistorCfgs, + proxies: make(map[string]Proxy), + vistors: make(map[string]Vistor), + sendCh: make(chan msg.Message, 10), + readCh: make(chan msg.Message, 10), + closedCh: make(chan int), + Logger: log.NewPrefixLogger(""), } } @@ -105,16 +113,17 @@ func NewControl(svr *Service, pxyCfgs map[string]config.ProxyConf) *Control { // 6. In controler(): ini readCh, sendCh, closedCh // 7. In controler(): start new reader(), writer(), manager() // controler() will keep running -func (ctl *Control) Run() error { +func (ctl *Control) Run() (err error) { for { - err := ctl.login() + err = ctl.login() if err != nil { + ctl.Warn("login to server failed: %v", err) + // if login_fail_exit is true, just exit this program // otherwise sleep a while and continues relogin to server if config.ClientCommonCfg.LoginFailExit { - return err + return } else { - ctl.Warn("login to server fail: %v", err) time.Sleep(30 * time.Second) } } else { @@ -133,29 +142,25 @@ func (ctl *Control) Run() error { cfg.UnMarshalToMsg(&newProxyMsg) ctl.sendCh <- &newProxyMsg } + + // start all local vistors + for _, cfg := range ctl.vistorCfgs { + vistor := NewVistor(ctl, cfg) + err = vistor.Run() + if err != nil { + vistor.Warn("start error: %v", err) + continue + } + ctl.vistors[cfg.GetName()] = vistor + vistor.Info("start vistor success") + } return nil } func (ctl *Control) NewWorkConn() { - var ( - workConn net.Conn - err error - ) - if config.ClientCommonCfg.TcpMux { - stream, err := ctl.session.OpenStream() - if err != nil { - ctl.Warn("start new work connection error: %v", err) - return - } - workConn = net.WrapConn(stream) - - } else { - workConn, err = net.ConnectServerByHttpProxy(config.ClientCommonCfg.HttpProxy, config.ClientCommonCfg.Protocol, - fmt.Sprintf("%s:%d", config.ClientCommonCfg.ServerAddr, config.ClientCommonCfg.ServerPort)) - if err != nil { - ctl.Warn("start new work connection error: %v", err) - return - } + workConn, err := ctl.connectServer() + if err != nil { + return } m := &msg.NewWorkConn{ @@ -199,7 +204,7 @@ func (ctl *Control) login() (err error) { ctl.session.Close() } - conn, err := net.ConnectServerByHttpProxy(config.ClientCommonCfg.HttpProxy, config.ClientCommonCfg.Protocol, + conn, err := frpNet.ConnectServerByHttpProxy(config.ClientCommonCfg.HttpProxy, config.ClientCommonCfg.Protocol, fmt.Sprintf("%s:%d", config.ClientCommonCfg.ServerAddr, config.ClientCommonCfg.ServerPort)) if err != nil { return err @@ -221,7 +226,7 @@ func (ctl *Control) login() (err error) { session.Close() return errRet } - conn = net.WrapConn(stream) + conn = frpNet.WrapConn(stream) ctl.session = session } @@ -261,6 +266,27 @@ func (ctl *Control) login() (err error) { return nil } +func (ctl *Control) connectServer() (conn frpNet.Conn, err error) { + if config.ClientCommonCfg.TcpMux { + stream, errRet := ctl.session.OpenStream() + if errRet != nil { + err = errRet + ctl.Warn("start new connection to server error: %v", err) + return + } + conn = frpNet.WrapConn(stream) + + } else { + conn, err = frpNet.ConnectServerByHttpProxy(config.ClientCommonCfg.HttpProxy, config.ClientCommonCfg.Protocol, + fmt.Sprintf("%s:%d", config.ClientCommonCfg.ServerAddr, config.ClientCommonCfg.ServerPort)) + if err != nil { + ctl.Warn("start new connection to server error: %v", err) + return + } + } + return +} + func (ctl *Control) reader() { defer func() { if err := recover(); err != nil { diff --git a/client/proxy.go b/client/proxy.go index cd7994c6..147a3fbd 100644 --- a/client/proxy.go +++ b/client/proxy.go @@ -31,7 +31,7 @@ import ( frpNet "github.com/fatedier/frp/utils/net" ) -// Proxy defines how to work for different proxy type. +// Proxy defines how to deal with work connections for different proxy type. type Proxy interface { Run() error @@ -67,6 +67,11 @@ func NewProxy(ctl *Control, pxyConf config.ProxyConf) (pxy Proxy) { BaseProxy: baseProxy, cfg: cfg, } + case *config.StcpProxyConf: + pxy = &StcpProxy{ + BaseProxy: baseProxy, + cfg: cfg, + } } return } @@ -162,6 +167,34 @@ func (pxy *HttpsProxy) InWorkConn(conn frpNet.Conn) { HandleTcpWorkConnection(&pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, conn) } +// STCP +type StcpProxy struct { + BaseProxy + + cfg *config.StcpProxyConf + proxyPlugin plugin.Plugin +} + +func (pxy *StcpProxy) Run() (err error) { + if pxy.cfg.Plugin != "" { + pxy.proxyPlugin, err = plugin.Create(pxy.cfg.Plugin, pxy.cfg.PluginParams) + if err != nil { + return + } + } + return +} + +func (pxy *StcpProxy) Close() { + if pxy.proxyPlugin != nil { + pxy.proxyPlugin.Close() + } +} + +func (pxy *StcpProxy) InWorkConn(conn frpNet.Conn) { + HandleTcpWorkConnection(&pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, conn) +} + // UDP type UdpProxy struct { BaseProxy diff --git a/client/service.go b/client/service.go index 36d0a7e8..776951aa 100644 --- a/client/service.go +++ b/client/service.go @@ -23,11 +23,11 @@ type Service struct { closedCh chan int } -func NewService(pxyCfgs map[string]config.ProxyConf) (svr *Service) { +func NewService(pxyCfgs map[string]config.ProxyConf, vistorCfgs map[string]config.ProxyConf) (svr *Service) { svr = &Service{ closedCh: make(chan int), } - ctl := NewControl(svr, pxyCfgs) + ctl := NewControl(svr, pxyCfgs, vistorCfgs) svr.ctl = ctl return } diff --git a/client/vistor.go b/client/vistor.go new file mode 100644 index 00000000..8787ebfe --- /dev/null +++ b/client/vistor.go @@ -0,0 +1,145 @@ +// Copyright 2017 fatedier, fatedier@gmail.com +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package client + +import ( + "io" + "sync" + "time" + + "github.com/fatedier/frp/models/config" + "github.com/fatedier/frp/models/msg" + frpIo "github.com/fatedier/frp/utils/io" + "github.com/fatedier/frp/utils/log" + frpNet "github.com/fatedier/frp/utils/net" + "github.com/fatedier/frp/utils/util" +) + +// Vistor is used for forward traffics from local port tot remote service. +type Vistor interface { + Run() error + Close() + log.Logger +} + +func NewVistor(ctl *Control, pxyConf config.ProxyConf) (vistor Vistor) { + baseVistor := BaseVistor{ + ctl: ctl, + Logger: log.NewPrefixLogger(pxyConf.GetName()), + } + switch cfg := pxyConf.(type) { + case *config.StcpProxyConf: + vistor = &StcpVistor{ + BaseVistor: baseVistor, + cfg: cfg, + } + } + return +} + +type BaseVistor struct { + ctl *Control + l frpNet.Listener + closed bool + mu sync.RWMutex + log.Logger +} + +type StcpVistor struct { + BaseVistor + + cfg *config.StcpProxyConf +} + +func (sv *StcpVistor) Run() (err error) { + sv.l, err = frpNet.ListenTcp(sv.cfg.BindAddr, int64(sv.cfg.BindPort)) + if err != nil { + return + } + + go sv.worker() + return +} + +func (sv *StcpVistor) Close() { + sv.l.Close() +} + +func (sv *StcpVistor) worker() { + for { + conn, err := sv.l.Accept() + if err != nil { + sv.Warn("stcp local listener closed") + return + } + + go sv.handleConn(conn) + } +} + +func (sv *StcpVistor) handleConn(userConn frpNet.Conn) { + defer userConn.Close() + + sv.Debug("get a new stcp user connection") + vistorConn, err := sv.ctl.connectServer() + if err != nil { + return + } + defer vistorConn.Close() + + now := time.Now().Unix() + newVistorConnMsg := &msg.NewVistorConn{ + ProxyName: sv.cfg.ServerName, + SignKey: util.GetAuthKey(sv.cfg.Sk, now), + Timestamp: now, + UseEncryption: sv.cfg.UseEncryption, + UseCompression: sv.cfg.UseCompression, + } + err = msg.WriteMsg(vistorConn, newVistorConnMsg) + if err != nil { + sv.Warn("send newVistorConnMsg to server error: %v", err) + return + } + + var newVistorConnRespMsg msg.NewVistorConnResp + vistorConn.SetReadDeadline(time.Now().Add(10 * time.Second)) + err = msg.ReadMsgInto(vistorConn, &newVistorConnRespMsg) + if err != nil { + sv.Warn("get newVistorConnRespMsg error: %v", err) + return + } + vistorConn.SetReadDeadline(time.Time{}) + + if newVistorConnRespMsg.Error != "" { + sv.Warn("start new vistor connection error: %s", newVistorConnRespMsg.Error) + return + } + + var remote io.ReadWriteCloser + remote = vistorConn + if sv.cfg.UseEncryption { + remote, err = frpIo.WithEncryption(remote, []byte(sv.cfg.Sk)) + if err != nil { + sv.Error("create encryption stream error: %v", err) + return + } + } + + if sv.cfg.UseCompression { + remote = frpIo.WithCompression(remote) + } + + frpIo.Join(userConn, remote) +} diff --git a/cmd/frpc/main.go b/cmd/frpc/main.go index dcbc3a53..870eeae1 100644 --- a/cmd/frpc/main.go +++ b/cmd/frpc/main.go @@ -106,7 +106,7 @@ func main() { } } - pxyCfgs, err := config.LoadProxyConfFromFile(config.ClientCommonCfg.User, conf, config.ClientCommonCfg.Start) + pxyCfgs, vistorCfgs, err := config.LoadProxyConfFromFile(config.ClientCommonCfg.User, conf, config.ClientCommonCfg.Start) if err != nil { fmt.Println(err) os.Exit(1) @@ -115,7 +115,7 @@ func main() { log.InitLog(config.ClientCommonCfg.LogWay, config.ClientCommonCfg.LogFile, config.ClientCommonCfg.LogLevel, config.ClientCommonCfg.LogMaxDays) - svr := client.NewService(pxyCfgs) + svr := client.NewService(pxyCfgs, vistorCfgs) err = svr.Run() if err != nil { fmt.Println(err) diff --git a/conf/frpc_full.ini b/conf/frpc_full.ini index a653fc24..4cc6dc17 100644 --- a/conf/frpc_full.ini +++ b/conf/frpc_full.ini @@ -110,3 +110,24 @@ remote_port = 6004 plugin = http_proxy plugin_http_user = abc plugin_http_passwd = abc + +[secret_tcp] +# If the type is secret tcp, remote_port is useless +# Who want to connect local port should deploy another frpc with stcp proxy and role is vistor +type = stcp +# sk used for authentication for vistors +sk = abcdefg +local_ip = 127.0.0.1 +local_port = 22 +use_encryption = false +use_compression = false + +[secret_tcp_vistor] +# frpc role vistor -> frps -> frpc role server +role = vistor +type = stcp +sk = abcdefg +bind_addr = 127.0.0.1 +bind_port = 9000 +use_encryption = false +use_compression = false diff --git a/models/config/proxy.go b/models/config/proxy.go index 27fd3f64..90c982bf 100644 --- a/models/config/proxy.go +++ b/models/config/proxy.go @@ -35,6 +35,7 @@ func init() { proxyConfTypeMap[consts.UdpProxy] = reflect.TypeOf(UdpProxyConf{}) proxyConfTypeMap[consts.HttpProxy] = reflect.TypeOf(HttpProxyConf{}) proxyConfTypeMap[consts.HttpsProxy] = reflect.TypeOf(HttpsProxyConf{}) + proxyConfTypeMap[consts.StcpProxy] = reflect.TypeOf(StcpProxyConf{}) } // NewConfByType creates a empty ProxyConf object by proxyType. @@ -388,8 +389,10 @@ func (cfg *HttpProxyConf) LoadFromFile(name string, section ini.Section) (err er if err = cfg.DomainConf.LoadFromFile(name, section); err != nil { return } - if err = cfg.LocalSvrConf.LoadFromFile(name, section); err != nil { - return + if err = cfg.PluginConf.LoadFromFile(name, section); err != nil { + if err = cfg.LocalSvrConf.LoadFromFile(name, section); err != nil { + return + } } var ( @@ -447,8 +450,10 @@ func (cfg *HttpsProxyConf) LoadFromFile(name string, section ini.Section) (err e if err = cfg.DomainConf.LoadFromFile(name, section); err != nil { return } - if err = cfg.LocalSvrConf.LoadFromFile(name, section); err != nil { - return + if err = cfg.PluginConf.LoadFromFile(name, section); err != nil { + if err = cfg.LocalSvrConf.LoadFromFile(name, section); err != nil { + return + } } return } @@ -466,9 +471,81 @@ func (cfg *HttpsProxyConf) Check() (err error) { return } +// STCP +type StcpProxyConf struct { + BaseProxyConf + + Role string `json:"role"` + Sk string `json:"sk"` + + // used in role server + LocalSvrConf + PluginConf + + // used in role vistor + ServerName string `json:"server_name"` + BindAddr string `json:"bind_addr"` + BindPort int `json:"bind_port"` +} + +// Only for role server. +func (cfg *StcpProxyConf) LoadFromMsg(pMsg *msg.NewProxy) { + cfg.BaseProxyConf.LoadFromMsg(pMsg) + cfg.Sk = pMsg.Sk +} + +func (cfg *StcpProxyConf) LoadFromFile(name string, section ini.Section) (err error) { + if err = cfg.BaseProxyConf.LoadFromFile(name, section); err != nil { + return + } + + tmpStr := section["role"] + if tmpStr == "server" || tmpStr == "vistor" { + cfg.Role = tmpStr + } else { + cfg.Role = "server" + } + + cfg.Sk = section["sk"] + + if tmpStr == "vistor" { + prefix := section["prefix"] + cfg.ServerName = prefix + section["server_name"] + if cfg.BindAddr = section["bind_addr"]; cfg.BindAddr == "" { + cfg.BindAddr = "127.0.0.1" + } + + if tmpStr, ok := section["bind_port"]; ok { + if cfg.BindPort, err = strconv.Atoi(tmpStr); err != nil { + return fmt.Errorf("Parse conf error: proxy [%s] bind_port error", name) + } + } else { + return fmt.Errorf("Parse conf error: proxy [%s] bind_port not found", name) + } + } else { + if err = cfg.PluginConf.LoadFromFile(name, section); err != nil { + if err = cfg.LocalSvrConf.LoadFromFile(name, section); err != nil { + return + } + } + } + return +} + +func (cfg *StcpProxyConf) UnMarshalToMsg(pMsg *msg.NewProxy) { + cfg.BaseProxyConf.UnMarshalToMsg(pMsg) + pMsg.Sk = cfg.Sk +} + +func (cfg *StcpProxyConf) Check() (err error) { + return +} + // if len(startProxy) is 0, start all // otherwise just start proxies in startProxy map -func LoadProxyConfFromFile(prefix string, conf ini.File, startProxy map[string]struct{}) (proxyConfs map[string]ProxyConf, err error) { +func LoadProxyConfFromFile(prefix string, conf ini.File, startProxy map[string]struct{}) ( + proxyConfs map[string]ProxyConf, vistorConfs map[string]ProxyConf, err error) { + if prefix != "" { prefix += "." } @@ -478,14 +555,23 @@ func LoadProxyConfFromFile(prefix string, conf ini.File, startProxy map[string]s startAll = false } proxyConfs = make(map[string]ProxyConf) + vistorConfs = make(map[string]ProxyConf) for name, section := range conf { _, shouldStart := startProxy[name] if name != "common" && (startAll || shouldStart) { + // some proxy or visotr configure may be used this prefix + section["prefix"] = prefix cfg, err := NewProxyConfFromFile(name, section) if err != nil { - return proxyConfs, err + return proxyConfs, vistorConfs, err + } + + role := section["role"] + if role == "vistor" { + vistorConfs[prefix+name] = cfg + } else { + proxyConfs[prefix+name] = cfg } - proxyConfs[prefix+name] = cfg } } return diff --git a/models/consts/consts.go b/models/consts/consts.go index 170bd240..5a4bc264 100644 --- a/models/consts/consts.go +++ b/models/consts/consts.go @@ -27,4 +27,5 @@ var ( UdpProxy string = "udp" HttpProxy string = "http" HttpsProxy string = "https" + StcpProxy string = "stcp" ) diff --git a/models/msg/msg.go b/models/msg/msg.go index d961befa..59736b6d 100644 --- a/models/msg/msg.go +++ b/models/msg/msg.go @@ -20,17 +20,19 @@ import ( ) const ( - TypeLogin = 'o' - TypeLoginResp = '1' - TypeNewProxy = 'p' - TypeNewProxyResp = '2' - TypeCloseProxy = 'c' - TypeNewWorkConn = 'w' - TypeReqWorkConn = 'r' - TypeStartWorkConn = 's' - TypePing = 'h' - TypePong = '4' - TypeUdpPacket = 'u' + TypeLogin = 'o' + TypeLoginResp = '1' + TypeNewProxy = 'p' + TypeNewProxyResp = '2' + TypeCloseProxy = 'c' + TypeNewWorkConn = 'w' + TypeReqWorkConn = 'r' + TypeStartWorkConn = 's' + TypeNewVistorConn = 'v' + TypeNewVistorConnResp = '3' + TypePing = 'h' + TypePong = '4' + TypeUdpPacket = 'u' ) var ( @@ -50,6 +52,8 @@ func init() { TypeMap[TypeNewWorkConn] = reflect.TypeOf(NewWorkConn{}) TypeMap[TypeReqWorkConn] = reflect.TypeOf(ReqWorkConn{}) TypeMap[TypeStartWorkConn] = reflect.TypeOf(StartWorkConn{}) + TypeMap[TypeNewVistorConn] = reflect.TypeOf(NewVistorConn{}) + TypeMap[TypeNewVistorConnResp] = reflect.TypeOf(NewVistorConnResp{}) TypeMap[TypePing] = reflect.TypeOf(Ping{}) TypeMap[TypePong] = reflect.TypeOf(Pong{}) TypeMap[TypeUdpPacket] = reflect.TypeOf(UdpPacket{}) @@ -100,6 +104,9 @@ type NewProxy struct { HostHeaderRewrite string `json:"host_header_rewrite"` HttpUser string `json:"http_user"` HttpPwd string `json:"http_pwd"` + + // stcp + Sk string `json:"sk"` } type NewProxyResp struct { @@ -122,6 +129,19 @@ type StartWorkConn struct { ProxyName string `json:"proxy_name"` } +type NewVistorConn struct { + ProxyName string `json:"proxy_name"` + SignKey string `json:"sign_key"` + Timestamp int64 `json:"timestamp"` + UseEncryption bool `json:"use_encryption"` + UseCompression bool `json:"use_compression"` +} + +type NewVistorConnResp struct { + ProxyName string `json:"proxy_name"` + Error string `json:"error"` +} + type Ping struct { } diff --git a/server/manager.go b/server/manager.go index 47456c00..c78037dd 100644 --- a/server/manager.go +++ b/server/manager.go @@ -16,7 +16,12 @@ package server import ( "fmt" + "io" "sync" + + frpIo "github.com/fatedier/frp/utils/io" + frpNet "github.com/fatedier/frp/utils/net" + "github.com/fatedier/frp/utils/util" ) type ControlManager struct { @@ -87,3 +92,72 @@ func (pm *ProxyManager) GetByName(name string) (pxy Proxy, ok bool) { pxy, ok = pm.pxys[name] return } + +// Manager for vistor listeners. +type VistorManager struct { + vistorListeners map[string]*frpNet.CustomListener + skMap map[string]string + + mu sync.RWMutex +} + +func NewVistorManager() *VistorManager { + return &VistorManager{ + vistorListeners: make(map[string]*frpNet.CustomListener), + skMap: make(map[string]string), + } +} + +func (vm *VistorManager) Listen(name string, sk string) (l *frpNet.CustomListener, err error) { + vm.mu.Lock() + defer vm.mu.Unlock() + + if _, ok := vm.vistorListeners[name]; ok { + err = fmt.Errorf("custom listener for [%s] is repeated", name) + return + } + + l = frpNet.NewCustomListener() + vm.vistorListeners[name] = l + vm.skMap[name] = sk + return +} + +func (vm *VistorManager) NewConn(name string, conn frpNet.Conn, timestamp int64, signKey string, + useEncryption bool, useCompression bool) (err error) { + + vm.mu.RLock() + defer vm.mu.RUnlock() + + if l, ok := vm.vistorListeners[name]; ok { + var sk string + if sk = vm.skMap[name]; util.GetAuthKey(sk, timestamp) != signKey { + err = fmt.Errorf("vistor connection of [%s] auth failed", name) + return + } + + var rwc io.ReadWriteCloser = conn + if useEncryption { + if rwc, err = frpIo.WithEncryption(rwc, []byte(sk)); err != nil { + err = fmt.Errorf("create encryption connection failed: %v", err) + return + } + } + if useCompression { + rwc = frpIo.WithCompression(rwc) + } + err = l.PutConn(frpNet.WrapReadWriteCloserToConn(rwc)) + } else { + err = fmt.Errorf("custom listener for [%s] doesn't exist", name) + return + } + return +} + +func (vm *VistorManager) CloseListener(name string) { + vm.mu.Lock() + defer vm.mu.Unlock() + + delete(vm.vistorListeners, name) + delete(vm.skMap, name) +} diff --git a/server/proxy.go b/server/proxy.go index 6751e245..ac8ddc07 100644 --- a/server/proxy.go +++ b/server/proxy.go @@ -143,6 +143,11 @@ func NewProxy(ctl *Control, pxyConf config.ProxyConf) (pxy Proxy, err error) { BaseProxy: basePxy, cfg: cfg, } + case *config.StcpProxyConf: + pxy = &StcpProxy{ + BaseProxy: basePxy, + cfg: cfg, + } default: return pxy, fmt.Errorf("proxy type not support") } @@ -274,6 +279,33 @@ func (pxy *HttpsProxy) Close() { pxy.BaseProxy.Close() } +type StcpProxy struct { + BaseProxy + cfg *config.StcpProxyConf +} + +func (pxy *StcpProxy) Run() error { + listener, err := pxy.ctl.svr.vistorManager.Listen(pxy.GetName(), pxy.cfg.Sk) + if err != nil { + return err + } + listener.AddLogPrefix(pxy.name) + pxy.listeners = append(pxy.listeners, listener) + pxy.Info("stcp proxy custom listen success") + + pxy.startListenHandler(pxy, HandleUserTcpConnection) + return nil +} + +func (pxy *StcpProxy) GetConf() config.ProxyConf { + return pxy.cfg +} + +func (pxy *StcpProxy) Close() { + pxy.BaseProxy.Close() + pxy.ctl.svr.vistorManager.CloseListener(pxy.GetName()) +} + type UdpProxy struct { BaseProxy cfg *config.UdpProxyConf diff --git a/server/service.go b/server/service.go index 6583b1c6..78e7762e 100644 --- a/server/service.go +++ b/server/service.go @@ -55,12 +55,16 @@ type Service struct { // Manage all proxies. pxyManager *ProxyManager + + // Manage all vistor listeners. + vistorManager *VistorManager } func NewService() (svr *Service, err error) { svr = &Service{ - ctlManager: NewControlManager(), - pxyManager: NewProxyManager(), + ctlManager: NewControlManager(), + pxyManager: NewProxyManager(), + vistorManager: NewVistorManager(), } // Init assets. @@ -176,6 +180,20 @@ func (svr *Service) HandleListener(l frpNet.Listener) { } case *msg.NewWorkConn: svr.RegisterWorkConn(conn, m) + case *msg.NewVistorConn: + if err = svr.RegisterVistorConn(conn, m); err != nil { + conn.Warn("%v", err) + msg.WriteMsg(conn, &msg.NewVistorConnResp{ + ProxyName: m.ProxyName, + Error: err.Error(), + }) + conn.Close() + } else { + msg.WriteMsg(conn, &msg.NewVistorConnResp{ + ProxyName: m.ProxyName, + Error: "", + }) + } default: log.Warn("Error message type for the new connection [%s]", conn.RemoteAddr().String()) conn.Close() @@ -262,9 +280,13 @@ func (svr *Service) RegisterWorkConn(workConn frpNet.Conn, newMsg *msg.NewWorkCo return } +func (svr *Service) RegisterVistorConn(vistorConn frpNet.Conn, newMsg *msg.NewVistorConn) error { + return svr.vistorManager.NewConn(newMsg.ProxyName, vistorConn, newMsg.Timestamp, newMsg.SignKey, + newMsg.UseEncryption, newMsg.UseCompression) +} + func (svr *Service) RegisterProxy(name string, pxy Proxy) error { - err := svr.pxyManager.Add(name, pxy) - return err + return svr.pxyManager.Add(name, pxy) } func (svr *Service) DelProxy(name string) { diff --git a/utils/log/log.go b/utils/log/log.go index ec6e0775..a0e42b8f 100644 --- a/utils/log/log.go +++ b/utils/log/log.go @@ -88,6 +88,7 @@ func Trace(format string, v ...interface{}) { // Logger type Logger interface { AddLogPrefix(string) + GetPrefixStr() string GetAllPrefix() []string ClearLogPrefix() Error(string, ...interface{}) @@ -119,6 +120,10 @@ func (pl *PrefixLogger) AddLogPrefix(prefix string) { pl.allPrefix = append(pl.allPrefix, prefix) } +func (pl *PrefixLogger) GetPrefixStr() string { + return pl.prefix +} + func (pl *PrefixLogger) GetAllPrefix() []string { return pl.allPrefix } diff --git a/utils/net/listener.go b/utils/net/listener.go index f9345b92..cb3847e5 100644 --- a/utils/net/listener.go +++ b/utils/net/listener.go @@ -15,8 +15,11 @@ package net import ( + "fmt" "net" + "sync" + "github.com/fatedier/frp/utils/errors" "github.com/fatedier/frp/utils/log" ) @@ -44,3 +47,53 @@ func (logL *LogListener) Accept() (Conn, error) { c, err := logL.l.Accept() return WrapConn(c), err } + +// Custom listener +type CustomListener struct { + conns chan Conn + closed bool + mu sync.Mutex + + log.Logger +} + +func NewCustomListener() *CustomListener { + return &CustomListener{ + conns: make(chan Conn, 64), + Logger: log.NewPrefixLogger(""), + } +} + +func (l *CustomListener) Accept() (Conn, error) { + conn, ok := <-l.conns + if !ok { + return nil, fmt.Errorf("listener closed") + } + conn.AddLogPrefix(l.GetPrefixStr()) + return conn, nil +} + +func (l *CustomListener) PutConn(conn Conn) error { + err := errors.PanicToError(func() { + select { + case l.conns <- conn: + default: + conn.Close() + } + }) + return err +} + +func (l *CustomListener) Close() error { + l.mu.Lock() + defer l.mu.Unlock() + if !l.closed { + close(l.conns) + l.closed = true + } + return nil +} + +func (l *CustomListener) Addr() net.Addr { + return (*net.TCPAddr)(nil) +}