frp/server/group/tcpmux.go

220 lines
5.4 KiB
Go
Raw Normal View History

2020-04-20 05:35:47 +00:00
// Copyright 2020 guylewin, guy@lewin.co.il
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package group
import (
"context"
"fmt"
"net"
"sync"
2020-09-23 05:49:14 +00:00
"github.com/fatedier/frp/pkg/consts"
"github.com/fatedier/frp/pkg/util/tcpmux"
"github.com/fatedier/frp/pkg/util/vhost"
2020-04-20 05:35:47 +00:00
gerr "github.com/fatedier/golib/errors"
)
2020-05-24 09:48:37 +00:00
// TCPMuxGroupCtl manage all TCPMuxGroups
type TCPMuxGroupCtl struct {
groups map[string]*TCPMuxGroup
2020-04-20 05:35:47 +00:00
// portManager is used to manage port
2020-05-24 09:48:37 +00:00
tcpMuxHTTPConnectMuxer *tcpmux.HTTPConnectTCPMuxer
2020-04-20 05:35:47 +00:00
mu sync.Mutex
}
2020-05-24 09:48:37 +00:00
// NewTCPMuxGroupCtl return a new TCPMuxGroupCtl
func NewTCPMuxGroupCtl(tcpMuxHTTPConnectMuxer *tcpmux.HTTPConnectTCPMuxer) *TCPMuxGroupCtl {
return &TCPMuxGroupCtl{
groups: make(map[string]*TCPMuxGroup),
tcpMuxHTTPConnectMuxer: tcpMuxHTTPConnectMuxer,
2020-04-20 05:35:47 +00:00
}
}
2020-05-24 09:48:37 +00:00
// Listen is the wrapper for TCPMuxGroup's Listen
2020-04-20 05:35:47 +00:00
// If there are no group, we will create one here
2020-05-24 09:48:37 +00:00
func (tmgc *TCPMuxGroupCtl) Listen(ctx context.Context, multiplexer string, group string, groupKey string,
domain string) (l net.Listener, err error) {
2020-04-20 05:35:47 +00:00
tmgc.mu.Lock()
tcpMuxGroup, ok := tmgc.groups[group]
if !ok {
2020-05-24 09:48:37 +00:00
tcpMuxGroup = NewTCPMuxGroup(tmgc)
2020-04-20 05:35:47 +00:00
tmgc.groups[group] = tcpMuxGroup
}
tmgc.mu.Unlock()
switch multiplexer {
2020-05-24 09:48:37 +00:00
case consts.HTTPConnectTCPMultiplexer:
return tcpMuxGroup.HTTPConnectListen(ctx, group, groupKey, domain)
2020-04-20 05:35:47 +00:00
default:
err = fmt.Errorf("unknown multiplexer [%s]", multiplexer)
return
}
}
2020-05-24 09:48:37 +00:00
// RemoveGroup remove TCPMuxGroup from controller
func (tmgc *TCPMuxGroupCtl) RemoveGroup(group string) {
2020-04-20 05:35:47 +00:00
tmgc.mu.Lock()
defer tmgc.mu.Unlock()
delete(tmgc.groups, group)
}
2020-05-24 09:48:37 +00:00
// TCPMuxGroup route connections to different proxies
type TCPMuxGroup struct {
2020-04-20 05:35:47 +00:00
group string
groupKey string
domain string
acceptCh chan net.Conn
index uint64
tcpMuxLn net.Listener
2020-05-24 09:48:37 +00:00
lns []*TCPMuxGroupListener
ctl *TCPMuxGroupCtl
2020-04-20 05:35:47 +00:00
mu sync.Mutex
}
2020-05-24 09:48:37 +00:00
// NewTCPMuxGroup return a new TCPMuxGroup
func NewTCPMuxGroup(ctl *TCPMuxGroupCtl) *TCPMuxGroup {
return &TCPMuxGroup{
lns: make([]*TCPMuxGroupListener, 0),
2020-04-20 05:35:47 +00:00
ctl: ctl,
acceptCh: make(chan net.Conn),
}
}
2020-05-24 09:48:37 +00:00
// Listen will return a new TCPMuxGroupListener
// if TCPMuxGroup already has a listener, just add a new TCPMuxGroupListener to the queues
2020-04-20 05:35:47 +00:00
// otherwise, listen on the real address
2020-05-24 09:48:37 +00:00
func (tmg *TCPMuxGroup) HTTPConnectListen(ctx context.Context, group string, groupKey string, domain string) (ln *TCPMuxGroupListener, err error) {
2020-04-20 05:35:47 +00:00
tmg.mu.Lock()
defer tmg.mu.Unlock()
if len(tmg.lns) == 0 {
// the first listener, listen on the real address
2020-05-24 09:48:37 +00:00
routeConfig := &vhost.RouteConfig{
2020-04-20 05:35:47 +00:00
Domain: domain,
}
2020-05-24 09:48:37 +00:00
tcpMuxLn, errRet := tmg.ctl.tcpMuxHTTPConnectMuxer.Listen(ctx, routeConfig)
2020-04-20 05:35:47 +00:00
if errRet != nil {
return nil, errRet
}
2020-05-24 09:48:37 +00:00
ln = newTCPMuxGroupListener(group, tmg, tcpMuxLn.Addr())
2020-04-20 05:35:47 +00:00
tmg.group = group
tmg.groupKey = groupKey
tmg.domain = domain
tmg.tcpMuxLn = tcpMuxLn
tmg.lns = append(tmg.lns, ln)
if tmg.acceptCh == nil {
tmg.acceptCh = make(chan net.Conn)
}
go tmg.worker()
} else {
// domain in the same group must be equal
if tmg.group != group || tmg.domain != domain {
return nil, ErrGroupParamsInvalid
}
if tmg.groupKey != groupKey {
return nil, ErrGroupAuthFailed
}
2020-05-24 09:48:37 +00:00
ln = newTCPMuxGroupListener(group, tmg, tmg.lns[0].Addr())
2020-04-20 05:35:47 +00:00
tmg.lns = append(tmg.lns, ln)
}
return
}
2020-05-24 09:48:37 +00:00
// worker is called when the real TCP listener has been created
func (tmg *TCPMuxGroup) worker() {
2020-04-20 05:35:47 +00:00
for {
c, err := tmg.tcpMuxLn.Accept()
if err != nil {
return
}
err = gerr.PanicToError(func() {
tmg.acceptCh <- c
})
if err != nil {
return
}
}
}
2020-05-24 09:48:37 +00:00
func (tmg *TCPMuxGroup) Accept() <-chan net.Conn {
2020-04-20 05:35:47 +00:00
return tmg.acceptCh
}
2020-05-24 09:48:37 +00:00
// CloseListener remove the TCPMuxGroupListener from the TCPMuxGroup
func (tmg *TCPMuxGroup) CloseListener(ln *TCPMuxGroupListener) {
2020-04-20 05:35:47 +00:00
tmg.mu.Lock()
defer tmg.mu.Unlock()
for i, tmpLn := range tmg.lns {
if tmpLn == ln {
tmg.lns = append(tmg.lns[:i], tmg.lns[i+1:]...)
break
}
}
if len(tmg.lns) == 0 {
close(tmg.acceptCh)
tmg.tcpMuxLn.Close()
tmg.ctl.RemoveGroup(tmg.group)
}
}
2020-05-24 09:48:37 +00:00
// TCPMuxGroupListener
type TCPMuxGroupListener struct {
2020-04-20 05:35:47 +00:00
groupName string
2020-05-24 09:48:37 +00:00
group *TCPMuxGroup
2020-04-20 05:35:47 +00:00
addr net.Addr
closeCh chan struct{}
}
2020-05-24 09:48:37 +00:00
func newTCPMuxGroupListener(name string, group *TCPMuxGroup, addr net.Addr) *TCPMuxGroupListener {
return &TCPMuxGroupListener{
2020-04-20 05:35:47 +00:00
groupName: name,
group: group,
addr: addr,
closeCh: make(chan struct{}),
}
}
2020-05-24 09:48:37 +00:00
// Accept will accept connections from TCPMuxGroup
func (ln *TCPMuxGroupListener) Accept() (c net.Conn, err error) {
2020-04-20 05:35:47 +00:00
var ok bool
select {
case <-ln.closeCh:
return nil, ErrListenerClosed
case c, ok = <-ln.group.Accept():
if !ok {
return nil, ErrListenerClosed
}
return c, nil
}
}
2020-05-24 09:48:37 +00:00
func (ln *TCPMuxGroupListener) Addr() net.Addr {
2020-04-20 05:35:47 +00:00
return ln.addr
}
// Close close the listener
2020-05-24 09:48:37 +00:00
func (ln *TCPMuxGroupListener) Close() (err error) {
2020-04-20 05:35:47 +00:00
close(ln.closeCh)
// remove self from TcpMuxGroup
ln.group.CloseListener(ln)
return
}