From 57eba762d8ed2e61dbccf95ffc8cbd6d0881193f Mon Sep 17 00:00:00 2001 From: linsongzheng Date: Thu, 12 Sep 2024 19:52:57 +0800 Subject: [PATCH 1/2] fix: memory leak --- client/control.go | 2 +- pkg/msg/handler.go | 5 +++++ pkg/transport/message.go | 8 ++++---- server/control.go | 2 +- 4 files changed, 11 insertions(+), 6 deletions(-) diff --git a/client/control.go b/client/control.go index 3e20c312..9607f95c 100644 --- a/client/control.go +++ b/client/control.go @@ -97,7 +97,7 @@ func NewControl(ctx context.Context, sessionCtx *SessionContext) (*Control, erro ctl.msgDispatcher = msg.NewDispatcher(sessionCtx.Conn) } ctl.registerMsgHandlers() - ctl.msgTransporter = transport.NewMessageTransporter(ctl.msgDispatcher.SendChannel()) + ctl.msgTransporter = transport.NewMessageTransporter(ctl.msgDispatcher.Send) ctl.pm = proxy.NewManager(ctl.ctx, sessionCtx.Common, ctl.msgTransporter) ctl.vm = visitor.NewManager(ctl.ctx, sessionCtx.RunID, sessionCtx.Common, ctl.connectServer, ctl.msgTransporter) diff --git a/pkg/msg/handler.go b/pkg/msg/handler.go index cb1eb15a..dc211173 100644 --- a/pkg/msg/handler.go +++ b/pkg/msg/handler.go @@ -54,6 +54,10 @@ func (d *Dispatcher) sendLoop() { for { select { case <-d.doneCh: + // need to clear all message + // Otherwise, it will memory leak when sendCh is full. + for range d.sendCh { + } return case m := <-d.sendCh: _ = WriteMsg(d.rw, m) @@ -66,6 +70,7 @@ func (d *Dispatcher) readLoop() { m, err := ReadMsg(d.rw) if err != nil { close(d.doneCh) + close(d.sendCh) return } diff --git a/pkg/transport/message.go b/pkg/transport/message.go index dd43fbdc..40dd6c98 100644 --- a/pkg/transport/message.go +++ b/pkg/transport/message.go @@ -35,15 +35,15 @@ type MessageTransporter interface { DispatchWithType(m msg.Message, msgType, laneKey string) bool } -func NewMessageTransporter(sendCh chan msg.Message) MessageTransporter { +func NewMessageTransporter(sendFn func(msg.Message) error) MessageTransporter { return &transporterImpl{ - sendCh: sendCh, + sendFn: sendFn, registry: make(map[string]map[string]chan msg.Message), } } type transporterImpl struct { - sendCh chan msg.Message + sendFn func(msg.Message) error // First key is message type and second key is lane key. // Dispatch will dispatch message to related channel by its message type @@ -54,7 +54,7 @@ type transporterImpl struct { func (impl *transporterImpl) Send(m msg.Message) error { return errors.PanicToError(func() { - impl.sendCh <- m + impl.sendFn(m) }) } diff --git a/server/control.go b/server/control.go index 0b6b3174..3edcdaf0 100644 --- a/server/control.go +++ b/server/control.go @@ -195,7 +195,7 @@ func NewControl( ctl.msgDispatcher = msg.NewDispatcher(ctl.conn) } ctl.registerMsgHandlers() - ctl.msgTransporter = transport.NewMessageTransporter(ctl.msgDispatcher.SendChannel()) + ctl.msgTransporter = transport.NewMessageTransporter(ctl.msgDispatcher.Send) return ctl, nil } From 5e08b2e9f6bb3f8391faa8fdf7acdc167c3a6f01 Mon Sep 17 00:00:00 2001 From: linsongzheng Date: Thu, 12 Sep 2024 21:08:31 +0800 Subject: [PATCH 2/2] remove unnecessary code --- pkg/msg/handler.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/pkg/msg/handler.go b/pkg/msg/handler.go index dc211173..8673b0cd 100644 --- a/pkg/msg/handler.go +++ b/pkg/msg/handler.go @@ -54,10 +54,6 @@ func (d *Dispatcher) sendLoop() { for { select { case <-d.doneCh: - // need to clear all message - // Otherwise, it will memory leak when sendCh is full. - for range d.sendCh { - } return case m := <-d.sendCh: _ = WriteMsg(d.rw, m)