diff --git a/Release.md b/Release.md index c346193f..3775da51 100644 --- a/Release.md +++ b/Release.md @@ -6,3 +6,7 @@ ## Improvements * **VirtualNet**: Implemented intelligent reconnection with exponential backoff. When connection errors occur repeatedly, the reconnect interval increases from 60s to 300s (max), reducing unnecessary reconnection attempts. Normal disconnections still reconnect quickly at 10s intervals. + +## Fixes + +* Fix deadlock issue when TCP connection is closed. Previously, sending messages could block forever if the connection handler had already stopped. diff --git a/client/control.go b/client/control.go index 4bd6a2f7..c18ae07c 100644 --- a/client/control.go +++ b/client/control.go @@ -100,7 +100,7 @@ func NewControl(ctx context.Context, sessionCtx *SessionContext) (*Control, erro ctl.msgDispatcher = msg.NewDispatcher(sessionCtx.Conn) } ctl.registerMsgHandlers() - ctl.msgTransporter = transport.NewMessageTransporter(ctl.msgDispatcher.SendChannel()) + ctl.msgTransporter = transport.NewMessageTransporter(ctl.msgDispatcher) ctl.pm = proxy.NewManager(ctl.ctx, sessionCtx.Common, ctl.msgTransporter, sessionCtx.VnetController) ctl.vm = visitor.NewManager(ctl.ctx, sessionCtx.RunID, sessionCtx.Common, diff --git a/pkg/msg/handler.go b/pkg/msg/handler.go index cb1eb15a..243e599a 100644 --- a/pkg/msg/handler.go +++ b/pkg/msg/handler.go @@ -86,10 +86,6 @@ func (d *Dispatcher) Send(m Message) error { } } -func (d *Dispatcher) SendChannel() chan Message { - return d.sendCh -} - func (d *Dispatcher) RegisterHandler(msg Message, handler func(Message)) { d.msgHandlers[reflect.TypeOf(msg)] = handler } diff --git a/pkg/transport/message.go b/pkg/transport/message.go index dd43fbdc..40165f5d 100644 --- a/pkg/transport/message.go +++ b/pkg/transport/message.go @@ -35,15 +35,19 @@ type MessageTransporter interface { DispatchWithType(m msg.Message, msgType, laneKey string) bool } -func NewMessageTransporter(sendCh chan msg.Message) MessageTransporter { +type MessageSender interface { + Send(msg.Message) error +} + +func NewMessageTransporter(sender MessageSender) MessageTransporter { return &transporterImpl{ - sendCh: sendCh, + sender: sender, registry: make(map[string]map[string]chan msg.Message), } } type transporterImpl struct { - sendCh chan msg.Message + sender MessageSender // First key is message type and second key is lane key. // Dispatch will dispatch message to related channel by its message type @@ -53,9 +57,7 @@ type transporterImpl struct { } func (impl *transporterImpl) Send(m msg.Message) error { - return errors.PanicToError(func() { - impl.sendCh <- m - }) + return impl.sender.Send(m) } func (impl *transporterImpl) Do(ctx context.Context, req msg.Message, laneKey, recvMsgType string) (msg.Message, error) { diff --git a/server/control.go b/server/control.go index b70d8d12..65d52062 100644 --- a/server/control.go +++ b/server/control.go @@ -195,7 +195,7 @@ func NewControl( ctl.msgDispatcher = msg.NewDispatcher(ctl.conn) } ctl.registerMsgHandlers() - ctl.msgTransporter = transport.NewMessageTransporter(ctl.msgDispatcher.SendChannel()) + ctl.msgTransporter = transport.NewMessageTransporter(ctl.msgDispatcher) return ctl, nil }