fix: memory leak

This commit is contained in:
linsongzheng 2024-09-12 19:52:57 +08:00
parent fe4ca1b54e
commit 57eba762d8
4 changed files with 11 additions and 6 deletions

View File

@ -97,7 +97,7 @@ func NewControl(ctx context.Context, sessionCtx *SessionContext) (*Control, erro
ctl.msgDispatcher = msg.NewDispatcher(sessionCtx.Conn) ctl.msgDispatcher = msg.NewDispatcher(sessionCtx.Conn)
} }
ctl.registerMsgHandlers() ctl.registerMsgHandlers()
ctl.msgTransporter = transport.NewMessageTransporter(ctl.msgDispatcher.SendChannel()) ctl.msgTransporter = transport.NewMessageTransporter(ctl.msgDispatcher.Send)
ctl.pm = proxy.NewManager(ctl.ctx, sessionCtx.Common, ctl.msgTransporter) ctl.pm = proxy.NewManager(ctl.ctx, sessionCtx.Common, ctl.msgTransporter)
ctl.vm = visitor.NewManager(ctl.ctx, sessionCtx.RunID, sessionCtx.Common, ctl.connectServer, ctl.msgTransporter) ctl.vm = visitor.NewManager(ctl.ctx, sessionCtx.RunID, sessionCtx.Common, ctl.connectServer, ctl.msgTransporter)

View File

@ -54,6 +54,10 @@ func (d *Dispatcher) sendLoop() {
for { for {
select { select {
case <-d.doneCh: case <-d.doneCh:
// need to clear all message
// Otherwise, it will memory leak when sendCh is full.
for range d.sendCh {
}
return return
case m := <-d.sendCh: case m := <-d.sendCh:
_ = WriteMsg(d.rw, m) _ = WriteMsg(d.rw, m)
@ -66,6 +70,7 @@ func (d *Dispatcher) readLoop() {
m, err := ReadMsg(d.rw) m, err := ReadMsg(d.rw)
if err != nil { if err != nil {
close(d.doneCh) close(d.doneCh)
close(d.sendCh)
return return
} }

View File

@ -35,15 +35,15 @@ type MessageTransporter interface {
DispatchWithType(m msg.Message, msgType, laneKey string) bool DispatchWithType(m msg.Message, msgType, laneKey string) bool
} }
func NewMessageTransporter(sendCh chan msg.Message) MessageTransporter { func NewMessageTransporter(sendFn func(msg.Message) error) MessageTransporter {
return &transporterImpl{ return &transporterImpl{
sendCh: sendCh, sendFn: sendFn,
registry: make(map[string]map[string]chan msg.Message), registry: make(map[string]map[string]chan msg.Message),
} }
} }
type transporterImpl struct { type transporterImpl struct {
sendCh chan msg.Message sendFn func(msg.Message) error
// First key is message type and second key is lane key. // First key is message type and second key is lane key.
// Dispatch will dispatch message to related channel by its message type // Dispatch will dispatch message to related channel by its message type
@ -54,7 +54,7 @@ type transporterImpl struct {
func (impl *transporterImpl) Send(m msg.Message) error { func (impl *transporterImpl) Send(m msg.Message) error {
return errors.PanicToError(func() { return errors.PanicToError(func() {
impl.sendCh <- m impl.sendFn(m)
}) })
} }

View File

@ -195,7 +195,7 @@ func NewControl(
ctl.msgDispatcher = msg.NewDispatcher(ctl.conn) ctl.msgDispatcher = msg.NewDispatcher(ctl.conn)
} }
ctl.registerMsgHandlers() ctl.registerMsgHandlers()
ctl.msgTransporter = transport.NewMessageTransporter(ctl.msgDispatcher.SendChannel()) ctl.msgTransporter = transport.NewMessageTransporter(ctl.msgDispatcher.Send)
return ctl, nil return ctl, nil
} }