From 885a6c661ead98fdc030070c0751c0c7e1546e99 Mon Sep 17 00:00:00 2001 From: patterniha <71074308+patterniha@users.noreply.github.com> Date: Tue, 15 Jul 2025 01:52:02 +0330 Subject: [PATCH] UDP-dispatcher: fix activity timer --- app/proxyman/inbound/worker.go | 7 +++++- proxy/shadowsocks/server.go | 1 + proxy/socks/server.go | 4 ++- proxy/trojan/server.go | 1 + transport/internet/udp/dispatcher.go | 37 +++++++++++++++++++--------- 5 files changed, 36 insertions(+), 14 deletions(-) diff --git a/app/proxyman/inbound/worker.go b/app/proxyman/inbound/worker.go index 12e29876..46902e81 100644 --- a/app/proxyman/inbound/worker.go +++ b/app/proxyman/inbound/worker.go @@ -161,6 +161,7 @@ type udpConn struct { uplink stats.Counter downlink stats.Counter inactive bool + cancel context.CancelFunc } func (c *udpConn) setInactive() { @@ -203,6 +204,9 @@ func (c *udpConn) Write(buf []byte) (int, error) { } func (c *udpConn) Close() error { + if c.cancel != nil { + c.cancel() + } common.Must(c.done.Close()) common.Must(common.Close(c.writer)) return nil @@ -306,7 +310,8 @@ func (w *udpWorker) callback(b *buf.Buffer, source net.Destination, originalDest common.Must(w.checker.Start()) go func() { - ctx := w.ctx + ctx, cancel := context.WithCancel(w.ctx) + conn.cancel = cancel sid := session.NewID() ctx = c.ContextWithID(ctx, sid) diff --git a/proxy/shadowsocks/server.go b/proxy/shadowsocks/server.go index 356868e4..ec022084 100644 --- a/proxy/shadowsocks/server.go +++ b/proxy/shadowsocks/server.go @@ -128,6 +128,7 @@ func (s *Server) handleUDPPayload(ctx context.Context, conn stat.Connection, dis conn.Write(data.Bytes()) }) + defer udpServer.RemoveRay() inbound := session.InboundFromContext(ctx) var dest *net.Destination diff --git a/proxy/socks/server.go b/proxy/socks/server.go index dd6f3953..8a159fad 100644 --- a/proxy/socks/server.go +++ b/proxy/socks/server.go @@ -245,13 +245,15 @@ func (s *Server) handleUDPPayload(ctx context.Context, conn stat.Connection, dis udpMessage, err := EncodeUDPPacket(request, payload.Bytes()) payload.Release() - defer udpMessage.Release() if err != nil { errors.LogWarningInner(ctx, err, "failed to write UDP response") + return } + defer udpMessage.Release() conn.Write(udpMessage.Bytes()) }) + defer udpServer.RemoveRay() inbound := session.InboundFromContext(ctx) if inbound != nil && inbound.Source.IsValid() { diff --git a/proxy/trojan/server.go b/proxy/trojan/server.go index 44662ac3..0ce14408 100644 --- a/proxy/trojan/server.go +++ b/proxy/trojan/server.go @@ -259,6 +259,7 @@ func (s *Server) handleUDPPayload(ctx context.Context, clientReader *PacketReade errors.LogWarningInner(ctx, err, "failed to write response") } }) + defer udpServer.RemoveRay() inbound := session.InboundFromContext(ctx) user := inbound.User diff --git a/transport/internet/udp/dispatcher.go b/transport/internet/udp/dispatcher.go index e6267d2f..f8884003 100644 --- a/transport/internet/udp/dispatcher.go +++ b/transport/internet/udp/dispatcher.go @@ -24,6 +24,15 @@ type connEntry struct { link *transport.Link timer signal.ActivityUpdater cancel context.CancelFunc + closed bool +} + +func (c *connEntry) Close() error { + c.closed = true + c.cancel() + common.Interrupt(c.link.Reader) + common.Close(c.link.Writer) + return nil } type Dispatcher struct { @@ -45,8 +54,7 @@ func (v *Dispatcher) RemoveRay() { v.Lock() defer v.Unlock() if v.conn != nil { - common.Interrupt(v.conn.link.Reader) - common.Close(v.conn.link.Writer) + v.conn.Close() v.conn = nil } } @@ -56,28 +64,32 @@ func (v *Dispatcher) getInboundRay(ctx context.Context, dest net.Destination) (* defer v.Unlock() if v.conn != nil { - return v.conn, nil + if v.conn.closed { + v.conn = nil + } else { + return v.conn, nil + } } errors.LogInfo(ctx, "establishing new connection for ", dest) ctx, cancel := context.WithCancel(ctx) - removeRay := func() { - cancel() - v.RemoveRay() - } - timer := signal.CancelAfterInactivity(ctx, removeRay, time.Minute) link, err := v.dispatcher.Dispatch(ctx, dest) if err != nil { + cancel() return nil, errors.New("failed to dispatch request to ", dest).Base(err) } entry := &connEntry{ link: link, - timer: timer, - cancel: removeRay, + cancel: cancel, } + entryClose := func() { + entry.Close() + } + + entry.timer = signal.CancelAfterInactivity(ctx, entryClose, time.Minute) v.conn = entry go handleInput(ctx, entry, dest, v.callback, v.callClose) return entry, nil @@ -96,7 +108,7 @@ func (v *Dispatcher) Dispatch(ctx context.Context, destination net.Destination, if outputStream != nil { if err := outputStream.WriteMultiBuffer(buf.MultiBuffer{payload}); err != nil { errors.LogInfoInner(ctx, err, "failed to write first UDP payload") - conn.cancel() + conn.Close() return } } @@ -104,7 +116,7 @@ func (v *Dispatcher) Dispatch(ctx context.Context, destination net.Destination, func handleInput(ctx context.Context, conn *connEntry, dest net.Destination, callback ResponseCallback, callClose func() error) { defer func() { - conn.cancel() + conn.Close() if callClose != nil { callClose() } @@ -136,6 +148,7 @@ func handleInput(ctx context.Context, conn *connEntry, dest net.Destination, cal Payload: b, Source: dest, }) + b.Release() } } }