frp/server/proxy.go

600 lines
15 KiB
Go
Raw Normal View History

2017-03-22 18:01:25 +00:00
// Copyright 2017 fatedier, fatedier@gmail.com
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
2017-03-08 18:03:47 +00:00
package server
import (
2017-03-12 18:44:47 +00:00
"context"
2017-03-08 18:03:47 +00:00
"fmt"
"io"
2017-03-12 18:44:47 +00:00
"net"
2017-04-24 16:34:14 +00:00
"sync"
2017-03-12 18:44:47 +00:00
"time"
2017-03-08 18:03:47 +00:00
"github.com/fatedier/frp/models/config"
"github.com/fatedier/frp/models/msg"
2017-03-12 18:44:47 +00:00
"github.com/fatedier/frp/models/proto/udp"
"github.com/fatedier/frp/utils/errors"
2017-06-06 10:48:40 +00:00
frpIo "github.com/fatedier/frp/utils/io"
2017-03-08 18:03:47 +00:00
"github.com/fatedier/frp/utils/log"
2017-03-12 18:44:47 +00:00
frpNet "github.com/fatedier/frp/utils/net"
2017-03-09 18:01:17 +00:00
"github.com/fatedier/frp/utils/vhost"
2017-03-08 18:03:47 +00:00
)
type Proxy interface {
Run() error
GetControl() *Control
GetName() string
GetConf() config.ProxyConf
2017-03-12 18:44:47 +00:00
GetWorkConnFromPool() (workConn frpNet.Conn, err error)
2017-03-08 18:03:47 +00:00
Close()
log.Logger
}
type BaseProxy struct {
name string
ctl *Control
2017-03-12 18:44:47 +00:00
listeners []frpNet.Listener
2017-04-24 16:34:14 +00:00
mu sync.RWMutex
2017-03-08 18:03:47 +00:00
log.Logger
}
// GetName returns the name the proxy was created with.
func (pxy *BaseProxy) GetName() string {
	return pxy.name
}
// GetControl returns the control this proxy was created for.
func (pxy *BaseProxy) GetControl() *Control {
	return pxy.ctl
}
// Close logs the shutdown and closes every listener attached to the proxy.
func (pxy *BaseProxy) Close() {
	pxy.Info("proxy closing")
	for i := range pxy.listeners {
		pxy.listeners[i].Close()
	}
}
2017-03-12 18:44:47 +00:00
// GetWorkConnFromPool takes a work connection from the control and assigns it
// to this proxy by sending a StartWorkConn message over it.
//
// A connection coming out of the pool may already be broken, so up to
// poolCount+1 connections are tried. The first connection that accepts the
// message is returned. If the control cannot supply a connection, or sending
// the message failed on every connection tried, a nil connection and the last
// error are returned.
func (pxy *BaseProxy) GetWorkConnFromPool() (workConn frpNet.Conn, err error) {
	ctl := pxy.GetControl()
	// try all connections from the pool
	for i := 0; i < ctl.poolCount+1; i++ {
		if workConn, err = ctl.GetWorkConn(); err != nil {
			pxy.Warn("failed to get work connection: %v", err)
			return nil, err
		}
		pxy.Info("get a new work connection: [%s]", workConn.RemoteAddr().String())
		workConn.AddLogPrefix(pxy.GetName())

		// Assign to the named result instead of declaring a new err with ":=".
		// A shadowed err hid the failure from the check after the loop, so a
		// closed connection was returned together with a nil error.
		err = msg.WriteMsg(workConn, &msg.StartWorkConn{
			ProxyName: pxy.GetName(),
		})
		if err == nil {
			return workConn, nil
		}
		workConn.Warn("failed to send message to work connection from pool: %v, times: %d", err, i)
		workConn.Close()
	}

	if err == nil {
		// Only reachable when the loop body never ran (poolCount+1 <= 0).
		err = fmt.Errorf("no work connection available")
	}
	pxy.Error("try to get work connection failed in the end")
	return nil, err
}
2017-03-09 18:01:17 +00:00
// startListenHandler start a goroutine handler for each listener.
2017-03-12 18:44:47 +00:00
// p: p will just be passed to handler(Proxy, frpNet.Conn).
2017-03-09 18:01:17 +00:00
// handler: each proxy type can set different handler function to deal with connections accepted from listeners.
2017-03-12 18:44:47 +00:00
func (pxy *BaseProxy) startListenHandler(p Proxy, handler func(Proxy, frpNet.Conn)) {
2017-03-09 18:01:17 +00:00
for _, listener := range pxy.listeners {
2017-03-12 18:44:47 +00:00
go func(l frpNet.Listener) {
2017-03-09 18:01:17 +00:00
for {
// block
// if listener is closed, err returned
c, err := l.Accept()
if err != nil {
pxy.Info("listener is closed")
return
}
pxy.Debug("get a user connection [%s]", c.RemoteAddr().String())
go handler(p, c)
}
}(listener)
}
}
2017-03-08 18:03:47 +00:00
func NewProxy(ctl *Control, pxyConf config.ProxyConf) (pxy Proxy, err error) {
basePxy := BaseProxy{
name: pxyConf.GetName(),
ctl: ctl,
2017-03-12 18:44:47 +00:00
listeners: make([]frpNet.Listener, 0),
2017-03-08 18:03:47 +00:00
Logger: log.NewPrefixLogger(ctl.runId),
}
switch cfg := pxyConf.(type) {
case *config.TcpProxyConf:
pxy = &TcpProxy{
BaseProxy: basePxy,
cfg: cfg,
}
case *config.HttpProxyConf:
pxy = &HttpProxy{
BaseProxy: basePxy,
cfg: cfg,
}
case *config.HttpsProxyConf:
pxy = &HttpsProxy{
BaseProxy: basePxy,
cfg: cfg,
}
case *config.UdpProxyConf:
pxy = &UdpProxy{
BaseProxy: basePxy,
cfg: cfg,
}
2017-06-25 19:02:33 +00:00
case *config.StcpProxyConf:
pxy = &StcpProxy{
BaseProxy: basePxy,
cfg: cfg,
}
2017-10-24 10:20:07 +00:00
case *config.XtcpProxyConf:
pxy = &XtcpProxy{
BaseProxy: basePxy,
cfg: cfg,
}
2017-03-08 18:03:47 +00:00
default:
return pxy, fmt.Errorf("proxy type not support")
}
pxy.AddLogPrefix(pxy.GetName())
return
}
// TcpProxy serves user connections accepted on a TCP port of the server.
type TcpProxy struct {
	BaseProxy

	// cfg is the tcp proxy configuration; Run listens on cfg.RemotePort.
	cfg *config.TcpProxyConf
}
func (pxy *TcpProxy) Run() error {
2017-07-04 17:40:01 +00:00
listener, err := frpNet.ListenTcp(config.ServerCommonCfg.ProxyBindAddr, pxy.cfg.RemotePort)
2017-03-08 18:03:47 +00:00
if err != nil {
return err
}
2017-03-11 18:03:24 +00:00
listener.AddLogPrefix(pxy.name)
2017-03-08 18:03:47 +00:00
pxy.listeners = append(pxy.listeners, listener)
2017-03-09 18:01:17 +00:00
pxy.Info("tcp proxy listen port [%d]", pxy.cfg.RemotePort)
2017-03-08 18:03:47 +00:00
2017-03-09 18:01:17 +00:00
pxy.startListenHandler(pxy, HandleUserTcpConnection)
2017-03-08 18:03:47 +00:00
return nil
}
// GetConf returns the tcp proxy configuration this proxy was created with.
func (pxy *TcpProxy) GetConf() config.ProxyConf {
	return pxy.cfg
}
// Close shuts the proxy down; a tcp proxy has nothing beyond the base
// listeners to release.
func (pxy *TcpProxy) Close() {
	pxy.BaseProxy.Close()
}
type HttpProxy struct {
BaseProxy
cfg *config.HttpProxyConf
2017-12-12 19:27:43 +00:00
closeFuncs []func()
2017-03-08 18:03:47 +00:00
}
func (pxy *HttpProxy) Run() (err error) {
2017-12-13 15:44:27 +00:00
routeConfig := vhost.VhostRouteConfig{
RewriteHost: pxy.cfg.HostHeaderRewrite,
Username: pxy.cfg.HttpUser,
Password: pxy.cfg.HttpPwd,
CreateConnFn: pxy.GetRealConn,
2017-03-09 18:01:17 +00:00
}
locations := pxy.cfg.Locations
if len(locations) == 0 {
locations = []string{""}
}
for _, domain := range pxy.cfg.CustomDomains {
routeConfig.Domain = domain
for _, location := range locations {
routeConfig.Location = location
2017-12-13 15:44:27 +00:00
err := pxy.ctl.svr.httpReverseProxy.Register(routeConfig)
2017-03-09 18:01:17 +00:00
if err != nil {
return err
}
2017-12-12 19:27:43 +00:00
tmpDomain := routeConfig.Domain
tmpLocation := routeConfig.Location
pxy.closeFuncs = append(pxy.closeFuncs, func() {
pxy.ctl.svr.httpReverseProxy.UnRegister(tmpDomain, tmpLocation)
})
2017-03-09 18:01:17 +00:00
pxy.Info("http proxy listen for host [%s] location [%s]", routeConfig.Domain, routeConfig.Location)
}
}
if pxy.cfg.SubDomain != "" {
routeConfig.Domain = pxy.cfg.SubDomain + "." + config.ServerCommonCfg.SubDomainHost
for _, location := range locations {
routeConfig.Location = location
2017-12-13 15:44:27 +00:00
err := pxy.ctl.svr.httpReverseProxy.Register(routeConfig)
2017-03-09 18:01:17 +00:00
if err != nil {
return err
}
2017-12-12 19:27:43 +00:00
tmpDomain := routeConfig.Domain
tmpLocation := routeConfig.Location
pxy.closeFuncs = append(pxy.closeFuncs, func() {
pxy.ctl.svr.httpReverseProxy.UnRegister(tmpDomain, tmpLocation)
})
2017-03-09 18:01:17 +00:00
pxy.Info("http proxy listen for host [%s] location [%s]", routeConfig.Domain, routeConfig.Location)
}
}
2017-03-08 18:03:47 +00:00
return
}
// GetConf returns the http proxy configuration this proxy was created with.
func (pxy *HttpProxy) GetConf() config.ProxyConf {
	return pxy.cfg
}
// GetRealConn returns a work connection ready to carry an http request to the
// client. It is handed to the http reverse proxy as the route's CreateConnFn.
//
// The connection is taken from the pool and, depending on the proxy
// configuration, wrapped with encryption (keyed with the server's privilege
// token) and/or compression before being turned back into a frpNet.Conn.
// On failure a nil connection and the error are returned.
func (pxy *HttpProxy) GetRealConn() (workConn frpNet.Conn, err error) {
	tmpConn, err := pxy.GetWorkConnFromPool()
	if err != nil {
		return nil, err
	}

	var rwc io.ReadWriteCloser = tmpConn
	if pxy.cfg.UseEncryption {
		rwc, err = frpIo.WithEncryption(rwc, []byte(config.ServerCommonCfg.PrivilegeToken))
		if err != nil {
			pxy.Error("create encryption stream error: %v", err)
			// The caller never sees this connection, so close it here
			// instead of leaking it.
			tmpConn.Close()
			return nil, err
		}
	}
	if pxy.cfg.UseCompression {
		rwc = frpIo.WithCompression(rwc)
	}
	return frpNet.WrapReadWriteCloserToConn(rwc, tmpConn), nil
}
2017-03-08 18:03:47 +00:00
func (pxy *HttpProxy) Close() {
pxy.BaseProxy.Close()
2017-12-12 19:27:43 +00:00
for _, closeFn := range pxy.closeFuncs {
closeFn()
}
2017-03-08 18:03:47 +00:00
}
// HttpsProxy serves user connections that the server's https vhost muxer
// dispatches to the proxy's domains.
type HttpsProxy struct {
	BaseProxy

	// cfg is the https proxy configuration.
	cfg *config.HttpsProxyConf
}
func (pxy *HttpsProxy) Run() (err error) {
2017-03-09 18:01:17 +00:00
routeConfig := &vhost.VhostRouteConfig{}
for _, domain := range pxy.cfg.CustomDomains {
routeConfig.Domain = domain
l, err := pxy.ctl.svr.VhostHttpsMuxer.Listen(routeConfig)
if err != nil {
return err
}
2017-03-11 18:03:24 +00:00
l.AddLogPrefix(pxy.name)
2017-03-09 18:01:17 +00:00
pxy.Info("https proxy listen for host [%s]", routeConfig.Domain)
pxy.listeners = append(pxy.listeners, l)
}
if pxy.cfg.SubDomain != "" {
routeConfig.Domain = pxy.cfg.SubDomain + "." + config.ServerCommonCfg.SubDomainHost
l, err := pxy.ctl.svr.VhostHttpsMuxer.Listen(routeConfig)
if err != nil {
return err
}
2017-03-11 18:03:24 +00:00
l.AddLogPrefix(pxy.name)
2017-03-09 18:01:17 +00:00
pxy.Info("https proxy listen for host [%s]", routeConfig.Domain)
pxy.listeners = append(pxy.listeners, l)
}
pxy.startListenHandler(pxy, HandleUserTcpConnection)
2017-03-08 18:03:47 +00:00
return
}
// GetConf returns the https proxy configuration this proxy was created with.
func (pxy *HttpsProxy) GetConf() config.ProxyConf {
	return pxy.cfg
}
// Close shuts the proxy down; an https proxy has nothing beyond the base
// listeners to release.
func (pxy *HttpsProxy) Close() {
	pxy.BaseProxy.Close()
}
2017-06-25 19:02:33 +00:00
// StcpProxy serves connections delivered by the server's visitor manager
// under the proxy's name.
type StcpProxy struct {
	BaseProxy

	// cfg is the stcp proxy configuration; cfg.Sk is passed to the visitor
	// manager when listening.
	cfg *config.StcpProxyConf
}
func (pxy *StcpProxy) Run() error {
2017-12-04 17:34:33 +00:00
listener, err := pxy.ctl.svr.visitorManager.Listen(pxy.GetName(), pxy.cfg.Sk)
2017-06-25 19:02:33 +00:00
if err != nil {
return err
}
listener.AddLogPrefix(pxy.name)
pxy.listeners = append(pxy.listeners, listener)
pxy.Info("stcp proxy custom listen success")
pxy.startListenHandler(pxy, HandleUserTcpConnection)
return nil
}
// GetConf returns the stcp proxy configuration this proxy was created with.
func (pxy *StcpProxy) GetConf() config.ProxyConf {
	return pxy.cfg
}
func (pxy *StcpProxy) Close() {
pxy.BaseProxy.Close()
2017-12-04 17:34:33 +00:00
pxy.ctl.svr.visitorManager.CloseListener(pxy.GetName())
2017-06-25 19:02:33 +00:00
}
2017-10-24 10:20:07 +00:00
// XtcpProxy relays nat hole session ids from the server's nat hole controller
// to the client over work connections.
type XtcpProxy struct {
	BaseProxy

	// cfg is the xtcp proxy configuration; cfg.Sk is passed to the nat hole
	// controller when registering.
	cfg *config.XtcpProxyConf

	// closeCh is closed by Close; the goroutine started by Run selects on it.
	closeCh chan struct{}
}
// Run registers the proxy with the server's nat hole controller and starts a
// goroutine that forwards every session id received from the controller to
// the client as a NatHoleSid message on a fresh work connection.
//
// It returns an error when the server has no nat hole controller. The
// goroutine exits once closeCh is closed (see Close) or the session id channel
// is closed.
func (pxy *XtcpProxy) Run() error {
	if pxy.ctl.svr.natHoleController == nil {
		pxy.Error("udp port for xtcp is not specified.")
		return fmt.Errorf("xtcp is not supported in frps")
	}

	// closeCh is not allocated anywhere else in this file. With a nil channel
	// the receive below would block forever and Close could never signal the
	// goroutine.
	if pxy.closeCh == nil {
		pxy.closeCh = make(chan struct{})
	}
	closeCh := pxy.closeCh

	sidCh := pxy.ctl.svr.natHoleController.ListenClient(pxy.GetName(), pxy.cfg.Sk)
	go func() {
		for {
			select {
			case <-closeCh:
				// A bare break would only leave the select and keep looping.
				return
			case sid, ok := <-sidCh:
				if !ok {
					// A closed channel would otherwise deliver zero values forever.
					return
				}
				workConn, err := pxy.GetWorkConnFromPool()
				if err != nil {
					continue
				}
				m := &msg.NatHoleSid{
					Sid: sid,
				}
				if err = msg.WriteMsg(workConn, m); err != nil {
					pxy.Warn("write nat hole sid package error, %v", err)
					// The connection is unusable after a failed write.
					workConn.Close()
				}
			}
		}
	}()
	return nil
}
// GetConf returns the xtcp proxy configuration this proxy was created with.
func (pxy *XtcpProxy) GetConf() config.ProxyConf {
	return pxy.cfg
}
// Close closes the base proxy, removes the proxy from the nat hole controller
// and closes closeCh. The close runs inside errors.PanicToError, so a panic
// from closing the channel is not propagated to the caller.
func (pxy *XtcpProxy) Close() {
	pxy.BaseProxy.Close()

	name := pxy.GetName()
	pxy.ctl.svr.natHoleController.CloseClient(name)

	errors.PanicToError(func() {
		close(pxy.closeCh)
	})
}
2017-03-08 18:03:47 +00:00
type UdpProxy struct {
BaseProxy
cfg *config.UdpProxyConf
2017-03-12 18:44:47 +00:00
2017-04-24 16:34:14 +00:00
// udpConn is the listener of udp packages
udpConn *net.UDPConn
// there are always only one workConn at the same time
// get another one if it closed
workConn net.Conn
// sendCh is used for sending packages to workConn
sendCh chan *msg.UdpPacket
// readCh is used for reading packages from workConn
readCh chan *msg.UdpPacket
// checkCloseCh is used for watching if workConn is closed
2017-03-12 18:44:47 +00:00
checkCloseCh chan int
2017-04-24 16:34:14 +00:00
isClosed bool
2017-03-08 18:03:47 +00:00
}
func (pxy *UdpProxy) Run() (err error) {
2017-07-04 17:40:01 +00:00
addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", config.ServerCommonCfg.ProxyBindAddr, pxy.cfg.RemotePort))
2017-03-12 18:44:47 +00:00
if err != nil {
return err
}
udpConn, err := net.ListenUDP("udp", addr)
if err != nil {
pxy.Warn("listen udp port error: %v", err)
return err
}
pxy.Info("udp proxy listen port [%d]", pxy.cfg.RemotePort)
pxy.udpConn = udpConn
pxy.sendCh = make(chan *msg.UdpPacket, 1024)
pxy.readCh = make(chan *msg.UdpPacket, 1024)
2017-03-12 18:44:47 +00:00
pxy.checkCloseCh = make(chan int)
2017-04-24 16:34:14 +00:00
// read message from workConn, if it returns any error, notify proxy to start a new workConn
2017-03-12 18:44:47 +00:00
workConnReaderFn := func(conn net.Conn) {
for {
var (
rawMsg msg.Message
errRet error
)
2017-04-24 16:34:14 +00:00
pxy.Trace("loop waiting message from udp workConn")
// client will send heartbeat in workConn for keeping alive
conn.SetReadDeadline(time.Now().Add(time.Duration(60) * time.Second))
if rawMsg, errRet = msg.ReadMsg(conn); errRet != nil {
2017-03-12 18:44:47 +00:00
pxy.Warn("read from workConn for udp error: %v", errRet)
conn.Close()
2017-03-26 17:39:05 +00:00
// notify proxy to start a new work connection
2017-04-24 16:34:14 +00:00
// ignore error here, it means the proxy is closed
2017-03-12 18:44:47 +00:00
errors.PanicToError(func() {
pxy.checkCloseCh <- 1
})
return
}
conn.SetReadDeadline(time.Time{})
switch m := rawMsg.(type) {
case *msg.Ping:
pxy.Trace("udp work conn get ping message")
continue
case *msg.UdpPacket:
if errRet := errors.PanicToError(func() {
pxy.Trace("get udp message from workConn: %s", m.Content)
pxy.readCh <- m
StatsAddTrafficOut(pxy.GetName(), int64(len(m.Content)))
}); errRet != nil {
conn.Close()
pxy.Info("reader goroutine for udp work connection closed")
return
}
2017-03-12 18:44:47 +00:00
}
}
}
2017-04-24 16:34:14 +00:00
// send message to workConn
2017-03-12 18:44:47 +00:00
workConnSenderFn := func(conn net.Conn, ctx context.Context) {
var errRet error
for {
select {
case udpMsg, ok := <-pxy.sendCh:
if !ok {
pxy.Info("sender goroutine for udp work connection closed")
2017-03-12 18:44:47 +00:00
return
}
if errRet = msg.WriteMsg(conn, udpMsg); errRet != nil {
pxy.Info("sender goroutine for udp work connection closed: %v", errRet)
2017-04-24 16:34:14 +00:00
conn.Close()
2017-03-12 18:44:47 +00:00
return
} else {
2017-04-24 16:34:14 +00:00
pxy.Trace("send message to udp workConn: %s", udpMsg.Content)
2017-03-26 17:39:05 +00:00
StatsAddTrafficIn(pxy.GetName(), int64(len(udpMsg.Content)))
2017-03-12 18:44:47 +00:00
continue
}
case <-ctx.Done():
pxy.Info("sender goroutine for udp work connection closed")
return
}
}
}
go func() {
2017-04-24 16:34:14 +00:00
// Sleep a while for waiting control send the NewProxyResp to client.
time.Sleep(500 * time.Millisecond)
2017-03-12 18:44:47 +00:00
for {
workConn, err := pxy.GetWorkConnFromPool()
if err != nil {
2017-04-24 16:34:14 +00:00
time.Sleep(1 * time.Second)
2017-03-12 18:44:47 +00:00
// check if proxy is closed
select {
case _, ok := <-pxy.checkCloseCh:
if !ok {
return
}
default:
}
continue
}
2017-04-24 16:34:14 +00:00
// close the old workConn and replac it with a new one
if pxy.workConn != nil {
pxy.workConn.Close()
}
2017-03-12 18:44:47 +00:00
pxy.workConn = workConn
ctx, cancel := context.WithCancel(context.Background())
go workConnReaderFn(workConn)
go workConnSenderFn(workConn, ctx)
_, ok := <-pxy.checkCloseCh
cancel()
if !ok {
return
}
}
}()
2017-04-24 16:34:14 +00:00
// Read from user connections and send wrapped udp message to sendCh (forwarded by workConn).
2017-03-12 18:44:47 +00:00
// Client will transfor udp message to local udp service and waiting for response for a while.
2017-04-24 16:34:14 +00:00
// Response will be wrapped to be forwarded by work connection to server.
// Close readCh and sendCh at the end.
go func() {
udp.ForwardUserConn(udpConn, pxy.readCh, pxy.sendCh)
pxy.Close()
}()
2017-03-12 18:44:47 +00:00
return nil
2017-03-08 18:03:47 +00:00
}
// GetConf returns the udp proxy configuration this proxy was created with.
func (pxy *UdpProxy) GetConf() config.ProxyConf {
	return pxy.cfg
}
func (pxy *UdpProxy) Close() {
2017-04-24 16:34:14 +00:00
pxy.mu.Lock()
defer pxy.mu.Unlock()
if !pxy.isClosed {
pxy.isClosed = true
pxy.BaseProxy.Close()
if pxy.workConn != nil {
pxy.workConn.Close()
}
2017-04-24 16:34:14 +00:00
pxy.udpConn.Close()
// all channels only closed here
close(pxy.checkCloseCh)
close(pxy.readCh)
close(pxy.sendCh)
}
2017-03-08 18:03:47 +00:00
}
// HandleUserTcpConnection is used for incoming tcp user connections.
2017-03-09 17:42:06 +00:00
// It can be used for tcp, http, https type.
2017-03-12 18:44:47 +00:00
func HandleUserTcpConnection(pxy Proxy, userConn frpNet.Conn) {
2017-03-08 18:03:47 +00:00
defer userConn.Close()
2017-03-12 18:44:47 +00:00
// try all connections from the pool
workConn, err := pxy.GetWorkConnFromPool()
2017-03-08 18:03:47 +00:00
if err != nil {
return
}
2017-03-12 18:44:47 +00:00
defer workConn.Close()
2017-03-08 18:03:47 +00:00
2017-03-09 18:01:17 +00:00
var local io.ReadWriteCloser = workConn
2017-03-08 18:03:47 +00:00
cfg := pxy.GetConf().GetBaseInfo()
if cfg.UseEncryption {
2017-06-06 10:48:40 +00:00
local, err = frpIo.WithEncryption(local, []byte(config.ServerCommonCfg.PrivilegeToken))
2017-03-08 18:03:47 +00:00
if err != nil {
pxy.Error("create encryption stream error: %v", err)
return
}
}
if cfg.UseCompression {
2017-06-06 10:48:40 +00:00
local = frpIo.WithCompression(local)
2017-03-08 18:03:47 +00:00
}
2017-03-09 18:01:17 +00:00
pxy.Debug("join connections, workConn(l[%s] r[%s]) userConn(l[%s] r[%s])", workConn.LocalAddr().String(),
workConn.RemoteAddr().String(), userConn.LocalAddr().String(), userConn.RemoteAddr().String())
2017-03-22 18:01:25 +00:00
StatsOpenConnection(pxy.GetName())
2017-06-06 10:48:40 +00:00
inCount, outCount := frpIo.Join(local, userConn)
2017-03-22 18:01:25 +00:00
StatsCloseConnection(pxy.GetName())
2017-03-26 17:39:05 +00:00
StatsAddTrafficIn(pxy.GetName(), inCount)
StatsAddTrafficOut(pxy.GetName(), outCount)
2017-03-09 18:01:17 +00:00
pxy.Debug("join connections closed")
2017-03-08 18:03:47 +00:00
}