From cf555240a291c331980b09d68c35c558369092bd Mon Sep 17 00:00:00 2001 From: patterniha <71074308+patterniha@users.noreply.github.com> Date: Tue, 15 Jul 2025 06:28:38 +0330 Subject: [PATCH] Update dispatcher.go --- transport/internet/udp/dispatcher.go | 36 ++++++++++------------------ 1 file changed, 12 insertions(+), 24 deletions(-) diff --git a/transport/internet/udp/dispatcher.go b/transport/internet/udp/dispatcher.go index 998b77f4..b695d6a5 100644 --- a/transport/internet/udp/dispatcher.go +++ b/transport/internet/udp/dispatcher.go @@ -27,14 +27,6 @@ type connEntry struct { closed bool } -func (c *connEntry) Close() error { - c.closed = true - c.cancel() - common.Interrupt(c.link.Reader) - common.Close(c.link.Writer) - return nil -} - type Dispatcher struct { sync.RWMutex conn *connEntry @@ -54,7 +46,8 @@ func (v *Dispatcher) RemoveRay() { v.Lock() defer v.Unlock() if v.conn != nil { - v.conn.Close() + common.Interrupt(v.conn.link.Reader) + common.Close(v.conn.link.Writer) v.conn = nil } } @@ -64,33 +57,28 @@ func (v *Dispatcher) getInboundRay(ctx context.Context, dest net.Destination) (* defer v.Unlock() if v.conn != nil { - if v.conn.closed { - v.conn = nil - } else { - return v.conn, nil - } + return v.conn, nil } errors.LogInfo(ctx, "establishing new connection for ", dest) ctx, cancel := context.WithCancel(ctx) + removeRay := func() { + cancel() + v.RemoveRay() + } + timer := signal.CancelAfterInactivity(ctx, removeRay, time.Minute) link, err := v.dispatcher.Dispatch(ctx, dest) if err != nil { - cancel() return nil, errors.New("failed to dispatch request to ", dest).Base(err) } entry := &connEntry{ link: link, - cancel: cancel, + timer: timer, + cancel: removeRay, } - entry.cancel() - //entryClose := func() { - // entry.Close() - //} - - entry.timer = signal.CancelAfterInactivity(ctx, cancel, time.Minute) v.conn = entry go handleInput(ctx, entry, dest, v.callback, v.callClose) return entry, nil @@ -109,7 +97,7 @@ func (v *Dispatcher) Dispatch(ctx context.Context, destination net.Destination, if outputStream != nil { if err := outputStream.WriteMultiBuffer(buf.MultiBuffer{payload}); err != nil { errors.LogInfoInner(ctx, err, "failed to write first UDP payload") - // conn.Close() + conn.cancel() return } } @@ -117,7 +105,7 @@ func (v *Dispatcher) Dispatch(ctx context.Context, destination net.Destination, func handleInput(ctx context.Context, conn *connEntry, dest net.Destination, callback ResponseCallback, callClose func() error) { defer func() { - // conn.Close() + conn.cancel() if callClose != nil { callClose() }