mirror of
https://github.com/fatedier/frp.git
synced 2025-06-16 08:08:20 +00:00
Merge 5e08b2e9f6bb3f8391faa8fdf7acdc167c3a6f01 into 2855ac71e3fc3fb2859f4c75f97f97e99f131f1b
This commit is contained in:
commit
3c63641574
@ -97,7 +97,7 @@ func NewControl(ctx context.Context, sessionCtx *SessionContext) (*Control, erro
|
|||||||
ctl.msgDispatcher = msg.NewDispatcher(sessionCtx.Conn)
|
ctl.msgDispatcher = msg.NewDispatcher(sessionCtx.Conn)
|
||||||
}
|
}
|
||||||
ctl.registerMsgHandlers()
|
ctl.registerMsgHandlers()
|
||||||
ctl.msgTransporter = transport.NewMessageTransporter(ctl.msgDispatcher.SendChannel())
|
ctl.msgTransporter = transport.NewMessageTransporter(ctl.msgDispatcher.Send)
|
||||||
|
|
||||||
ctl.pm = proxy.NewManager(ctl.ctx, sessionCtx.Common, ctl.msgTransporter)
|
ctl.pm = proxy.NewManager(ctl.ctx, sessionCtx.Common, ctl.msgTransporter)
|
||||||
ctl.vm = visitor.NewManager(ctl.ctx, sessionCtx.RunID, sessionCtx.Common, ctl.connectServer, ctl.msgTransporter)
|
ctl.vm = visitor.NewManager(ctl.ctx, sessionCtx.RunID, sessionCtx.Common, ctl.connectServer, ctl.msgTransporter)
|
||||||
|
@ -66,6 +66,7 @@ func (d *Dispatcher) readLoop() {
|
|||||||
m, err := ReadMsg(d.rw)
|
m, err := ReadMsg(d.rw)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
close(d.doneCh)
|
close(d.doneCh)
|
||||||
|
close(d.sendCh)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -35,15 +35,15 @@ type MessageTransporter interface {
|
|||||||
DispatchWithType(m msg.Message, msgType, laneKey string) bool
|
DispatchWithType(m msg.Message, msgType, laneKey string) bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewMessageTransporter(sendCh chan msg.Message) MessageTransporter {
|
func NewMessageTransporter(sendFn func(msg.Message) error) MessageTransporter {
|
||||||
return &transporterImpl{
|
return &transporterImpl{
|
||||||
sendCh: sendCh,
|
sendFn: sendFn,
|
||||||
registry: make(map[string]map[string]chan msg.Message),
|
registry: make(map[string]map[string]chan msg.Message),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type transporterImpl struct {
|
type transporterImpl struct {
|
||||||
sendCh chan msg.Message
|
sendFn func(msg.Message) error
|
||||||
|
|
||||||
// First key is message type and second key is lane key.
|
// First key is message type and second key is lane key.
|
||||||
// Dispatch will dispatch message to related channel by its message type
|
// Dispatch will dispatch message to related channel by its message type
|
||||||
@ -54,7 +54,7 @@ type transporterImpl struct {
|
|||||||
|
|
||||||
func (impl *transporterImpl) Send(m msg.Message) error {
|
func (impl *transporterImpl) Send(m msg.Message) error {
|
||||||
return errors.PanicToError(func() {
|
return errors.PanicToError(func() {
|
||||||
impl.sendCh <- m
|
impl.sendFn(m)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -195,7 +195,7 @@ func NewControl(
|
|||||||
ctl.msgDispatcher = msg.NewDispatcher(ctl.conn)
|
ctl.msgDispatcher = msg.NewDispatcher(ctl.conn)
|
||||||
}
|
}
|
||||||
ctl.registerMsgHandlers()
|
ctl.registerMsgHandlers()
|
||||||
ctl.msgTransporter = transport.NewMessageTransporter(ctl.msgDispatcher.SendChannel())
|
ctl.msgTransporter = transport.NewMessageTransporter(ctl.msgDispatcher.Send)
|
||||||
return ctl, nil
|
return ctl, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user