Merge 5e08b2e9f6bb3f8391faa8fdf7acdc167c3a6f01 into fe4ca1b54e9045ea91ffa4645ac8f39bb2e770ef

This commit is contained in:
IvanLam 2024-09-12 13:08:38 +00:00 committed by GitHub
commit b5a2fa6258
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 7 additions and 6 deletions

View File

@ -97,7 +97,7 @@ func NewControl(ctx context.Context, sessionCtx *SessionContext) (*Control, erro
ctl.msgDispatcher = msg.NewDispatcher(sessionCtx.Conn)
}
ctl.registerMsgHandlers()
ctl.msgTransporter = transport.NewMessageTransporter(ctl.msgDispatcher.SendChannel())
ctl.msgTransporter = transport.NewMessageTransporter(ctl.msgDispatcher.Send)
ctl.pm = proxy.NewManager(ctl.ctx, sessionCtx.Common, ctl.msgTransporter)
ctl.vm = visitor.NewManager(ctl.ctx, sessionCtx.RunID, sessionCtx.Common, ctl.connectServer, ctl.msgTransporter)

View File

@ -66,6 +66,7 @@ func (d *Dispatcher) readLoop() {
m, err := ReadMsg(d.rw)
if err != nil {
close(d.doneCh)
close(d.sendCh)
return
}

View File

@ -35,15 +35,15 @@ type MessageTransporter interface {
DispatchWithType(m msg.Message, msgType, laneKey string) bool
}
func NewMessageTransporter(sendCh chan msg.Message) MessageTransporter {
func NewMessageTransporter(sendFn func(msg.Message) error) MessageTransporter {
return &transporterImpl{
sendCh: sendCh,
sendFn: sendFn,
registry: make(map[string]map[string]chan msg.Message),
}
}
type transporterImpl struct {
sendCh chan msg.Message
sendFn func(msg.Message) error
// First key is message type and second key is lane key.
// Dispatch will dispatch message to related channel by its message type
@ -54,7 +54,7 @@ type transporterImpl struct {
func (impl *transporterImpl) Send(m msg.Message) error {
return errors.PanicToError(func() {
impl.sendCh <- m
impl.sendFn(m)
})
}

View File

@ -195,7 +195,7 @@ func NewControl(
ctl.msgDispatcher = msg.NewDispatcher(ctl.conn)
}
ctl.registerMsgHandlers()
ctl.msgTransporter = transport.NewMessageTransporter(ctl.msgDispatcher.SendChannel())
ctl.msgTransporter = transport.NewMessageTransporter(ctl.msgDispatcher.Send)
return ctl, nil
}