From b9a1eeb54c164ce926b2136c8fb9d10f87166b97 Mon Sep 17 00:00:00 2001 From: patterniha <71074308+patterniha@users.noreply.github.com> Date: Sun, 6 Jul 2025 18:45:28 +0330 Subject: [PATCH 01/11] MUX: ClientWorker-Dispatch needs Thread-Lock --- common/mux/client.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/common/mux/client.go b/common/mux/client.go index df74ef17..b6148fbc 100644 --- a/common/mux/client.go +++ b/common/mux/client.go @@ -174,6 +174,7 @@ type ClientWorker struct { link transport.Link done *done.Instance strategy ClientStrategy + access sync.Mutex } var ( @@ -289,7 +290,9 @@ func (m *ClientWorker) IsFull() bool { } func (m *ClientWorker) Dispatch(ctx context.Context, link *transport.Link) bool { - if m.IsFull() || m.Closed() { + m.access.Lock() + defer m.access.Unlock() + if m.IsFull() { return false } From ec4f48d42903baac4b3e52ec2a2b58f8a4943d0c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=A3=8E=E6=89=87=E6=BB=91=E7=BF=94=E7=BF=BC?= Date: Tue, 8 Jul 2025 22:00:39 +0800 Subject: [PATCH 02/11] Update session.go --- common/mux/session.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/common/mux/session.go b/common/mux/session.go index 5e4b69ca..1530a661 100644 --- a/common/mux/session.go +++ b/common/mux/session.go @@ -50,11 +50,11 @@ func (m *SessionManager) Count() int { return int(m.count) } -func (m *SessionManager) Allocate() *Session { +func (m *SessionManager) Allocate(MaxConcurrency int) *Session { m.Lock() defer m.Unlock() - if m.closed { + if m.closed || (MaxConcurrency > 0 && len(m.sessions) >= MaxConcurrency) { return nil } From fd71c36e2c49f1638f7cbcfb970f30a852535891 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=A3=8E=E6=89=87=E6=BB=91=E7=BF=94=E7=BF=BC?= Date: Tue, 8 Jul 2025 22:01:09 +0800 Subject: [PATCH 03/11] Update client.go --- common/mux/client.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/common/mux/client.go b/common/mux/client.go index b6148fbc..f73fc509 100644 --- a/common/mux/client.go +++ b/common/mux/client.go @@ -174,7 +174,6 @@ type ClientWorker struct { link transport.Link done *done.Instance strategy ClientStrategy - access sync.Mutex } var ( @@ -277,6 +276,8 @@ func (m *ClientWorker) IsClosing() bool { return false } +// IsFull returns true if this ClientWorker is unable to accept more connections. +// it might be because it is closing, or the number of connections has reached the limit. func (m *ClientWorker) IsFull() bool { if m.IsClosing() || m.Closed() { return true @@ -290,14 +291,12 @@ func (m *ClientWorker) IsFull() bool { } func (m *ClientWorker) Dispatch(ctx context.Context, link *transport.Link) bool { - m.access.Lock() - defer m.access.Unlock() if m.IsFull() { return false } sm := m.sessionManager - s := sm.Allocate() + s := sm.Allocate(int(m.strategy.MaxConcurrency)) if s == nil { return false } From f0c643caf4dacd76aa1ef9d0353bee5b9b7d68de Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=A3=8E=E6=89=87=E6=BB=91=E7=BF=94=E7=BF=BC?= Date: Tue, 8 Jul 2025 22:02:06 +0800 Subject: [PATCH 04/11] Update client_test.go --- common/mux/client_test.go | 135 ++++++++++---------------------------- 1 file changed, 35 insertions(+), 100 deletions(-) diff --git a/common/mux/client_test.go b/common/mux/client_test.go index 9626e2a2..9b3e8719 100644 --- a/common/mux/client_test.go +++ b/common/mux/client_test.go @@ -1,116 +1,51 @@ package mux_test import ( - "context" "testing" - "time" - "github.com/golang/mock/gomock" - "github.com/xtls/xray-core/common" - "github.com/xtls/xray-core/common/errors" - "github.com/xtls/xray-core/common/mux" - "github.com/xtls/xray-core/common/net" - "github.com/xtls/xray-core/common/session" - "github.com/xtls/xray-core/testing/mocks" - "github.com/xtls/xray-core/transport" - "github.com/xtls/xray-core/transport/pipe" + . "github.com/xtls/xray-core/common/mux" ) -func TestIncrementalPickerFailure(t *testing.T) { - mockCtl := gomock.NewController(t) - defer mockCtl.Finish() +func TestSessionManagerAdd(t *testing.T) { + m := NewSessionManager() - mockWorkerFactory := mocks.NewMuxClientWorkerFactory(mockCtl) - mockWorkerFactory.EXPECT().Create().Return(nil, errors.New("test")) - - picker := mux.IncrementalWorkerPicker{ - Factory: mockWorkerFactory, + s := m.Allocate(0) + if s.ID != 1 { + t.Error("id: ", s.ID) + } + if m.Size() != 1 { + t.Error("size: ", m.Size()) } - _, err := picker.PickAvailable() - if err == nil { - t.Error("expected error, but nil") + s = m.Allocate(0) + if s.ID != 2 { + t.Error("id: ", s.ID) + } + if m.Size() != 2 { + t.Error("size: ", m.Size()) + } + + s = &Session{ + ID: 4, + } + m.Add(s) + if s.ID != 4 { + t.Error("id: ", s.ID) + } + if m.Size() != 3 { + t.Error("size: ", m.Size()) } } -func TestClientWorkerEOF(t *testing.T) { - reader, writer := pipe.New(pipe.WithoutSizeLimit()) - common.Must(writer.Close()) +func TestSessionManagerClose(t *testing.T) { + m := NewSessionManager() + s := m.Allocate(0) - worker, err := mux.NewClientWorker(transport.Link{Reader: reader, Writer: writer}, mux.ClientStrategy{}) - common.Must(err) - - time.Sleep(time.Millisecond * 500) - - f := worker.Dispatch(context.Background(), nil) - if f { - t.Error("expected failed dispatching, but actually not") + if m.CloseIfNoSession() { + t.Error("able to close") + } + m.Remove(false, s.ID) + if !m.CloseIfNoSession() { + t.Error("not able to close") } } - -func TestClientWorkerClose(t *testing.T) { - mockCtl := gomock.NewController(t) - defer mockCtl.Finish() - - r1, w1 := pipe.New(pipe.WithoutSizeLimit()) - worker1, err := mux.NewClientWorker(transport.Link{ - Reader: r1, - Writer: w1, - }, mux.ClientStrategy{ - MaxConcurrency: 4, - MaxConnection: 4, - }) - common.Must(err) - - r2, w2 := pipe.New(pipe.WithoutSizeLimit()) - worker2, err := mux.NewClientWorker(transport.Link{ - Reader: r2, - Writer: w2, - }, mux.ClientStrategy{ - MaxConcurrency: 4, - MaxConnection: 4, - }) - common.Must(err) - - factory := mocks.NewMuxClientWorkerFactory(mockCtl) - gomock.InOrder( - factory.EXPECT().Create().Return(worker1, nil), - factory.EXPECT().Create().Return(worker2, nil), - ) - - picker := &mux.IncrementalWorkerPicker{ - Factory: factory, - } - manager := &mux.ClientManager{ - Picker: picker, - } - - tr1, tw1 := pipe.New(pipe.WithoutSizeLimit()) - ctx1 := session.ContextWithOutbounds(context.Background(), []*session.Outbound{{ - Target: net.TCPDestination(net.DomainAddress("www.example.com"), 80), - }}) - common.Must(manager.Dispatch(ctx1, &transport.Link{ - Reader: tr1, - Writer: tw1, - })) - defer tw1.Close() - - common.Must(w1.Close()) - - time.Sleep(time.Millisecond * 500) - if !worker1.Closed() { - t.Error("worker1 is not finished") - } - - tr2, tw2 := pipe.New(pipe.WithoutSizeLimit()) - ctx2 := session.ContextWithOutbounds(context.Background(), []*session.Outbound{{ - Target: net.TCPDestination(net.DomainAddress("www.example.com"), 80), - }}) - common.Must(manager.Dispatch(ctx2, &transport.Link{ - Reader: tr2, - Writer: tw2, - })) - defer tw2.Close() - - common.Must(w2.Close()) -} From a77a6f85a61c9e0e19877b1cbdaebfb50e00b9ae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=A3=8E=E6=89=87=E6=BB=91=E7=BF=94=E7=BF=BC?= Date: Tue, 8 Jul 2025 22:02:53 +0800 Subject: [PATCH 05/11] Update client_test.go --- common/mux/client_test.go | 135 ++++++++++++++++++++++++++++---------- 1 file changed, 100 insertions(+), 35 deletions(-) diff --git a/common/mux/client_test.go b/common/mux/client_test.go index 9b3e8719..9626e2a2 100644 --- a/common/mux/client_test.go +++ b/common/mux/client_test.go @@ -1,51 +1,116 @@ package mux_test import ( + "context" "testing" + "time" - . "github.com/xtls/xray-core/common/mux" + "github.com/golang/mock/gomock" + "github.com/xtls/xray-core/common" + "github.com/xtls/xray-core/common/errors" + "github.com/xtls/xray-core/common/mux" + "github.com/xtls/xray-core/common/net" + "github.com/xtls/xray-core/common/session" + "github.com/xtls/xray-core/testing/mocks" + "github.com/xtls/xray-core/transport" + "github.com/xtls/xray-core/transport/pipe" ) -func TestSessionManagerAdd(t *testing.T) { - m := NewSessionManager() +func TestIncrementalPickerFailure(t *testing.T) { + mockCtl := gomock.NewController(t) + defer mockCtl.Finish() - s := m.Allocate(0) - if s.ID != 1 { - t.Error("id: ", s.ID) - } - if m.Size() != 1 { - t.Error("size: ", m.Size()) + mockWorkerFactory := mocks.NewMuxClientWorkerFactory(mockCtl) + mockWorkerFactory.EXPECT().Create().Return(nil, errors.New("test")) + + picker := mux.IncrementalWorkerPicker{ + Factory: mockWorkerFactory, } - s = m.Allocate(0) - if s.ID != 2 { - t.Error("id: ", s.ID) - } - if m.Size() != 2 { - t.Error("size: ", m.Size()) - } - - s = &Session{ - ID: 4, - } - m.Add(s) - if s.ID != 4 { - t.Error("id: ", s.ID) - } - if m.Size() != 3 { - t.Error("size: ", m.Size()) + _, err := picker.PickAvailable() + if err == nil { + t.Error("expected error, but nil") } } -func TestSessionManagerClose(t *testing.T) { - m := NewSessionManager() - s := m.Allocate(0) +func TestClientWorkerEOF(t *testing.T) { + reader, writer := pipe.New(pipe.WithoutSizeLimit()) + common.Must(writer.Close()) - if m.CloseIfNoSession() { - t.Error("able to close") - } - m.Remove(false, s.ID) - if !m.CloseIfNoSession() { - t.Error("not able to close") + worker, err := mux.NewClientWorker(transport.Link{Reader: reader, Writer: writer}, mux.ClientStrategy{}) + common.Must(err) + + time.Sleep(time.Millisecond * 500) + + f := worker.Dispatch(context.Background(), nil) + if f { + t.Error("expected failed dispatching, but actually not") } } + +func TestClientWorkerClose(t *testing.T) { + mockCtl := gomock.NewController(t) + defer mockCtl.Finish() + + r1, w1 := pipe.New(pipe.WithoutSizeLimit()) + worker1, err := mux.NewClientWorker(transport.Link{ + Reader: r1, + Writer: w1, + }, mux.ClientStrategy{ + MaxConcurrency: 4, + MaxConnection: 4, + }) + common.Must(err) + + r2, w2 := pipe.New(pipe.WithoutSizeLimit()) + worker2, err := mux.NewClientWorker(transport.Link{ + Reader: r2, + Writer: w2, + }, mux.ClientStrategy{ + MaxConcurrency: 4, + MaxConnection: 4, + }) + common.Must(err) + + factory := mocks.NewMuxClientWorkerFactory(mockCtl) + gomock.InOrder( + factory.EXPECT().Create().Return(worker1, nil), + factory.EXPECT().Create().Return(worker2, nil), + ) + + picker := &mux.IncrementalWorkerPicker{ + Factory: factory, + } + manager := &mux.ClientManager{ + Picker: picker, + } + + tr1, tw1 := pipe.New(pipe.WithoutSizeLimit()) + ctx1 := session.ContextWithOutbounds(context.Background(), []*session.Outbound{{ + Target: net.TCPDestination(net.DomainAddress("www.example.com"), 80), + }}) + common.Must(manager.Dispatch(ctx1, &transport.Link{ + Reader: tr1, + Writer: tw1, + })) + defer tw1.Close() + + common.Must(w1.Close()) + + time.Sleep(time.Millisecond * 500) + if !worker1.Closed() { + t.Error("worker1 is not finished") + } + + tr2, tw2 := pipe.New(pipe.WithoutSizeLimit()) + ctx2 := session.ContextWithOutbounds(context.Background(), []*session.Outbound{{ + Target: net.TCPDestination(net.DomainAddress("www.example.com"), 80), + }}) + common.Must(manager.Dispatch(ctx2, &transport.Link{ + Reader: tr2, + Writer: tw2, + })) + defer tw2.Close() + + common.Must(w2.Close()) +} From e5927016436244355551cf9abeff9a805105fd8e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=A3=8E=E6=89=87=E6=BB=91=E7=BF=94=E7=BF=BC?= Date: Tue, 8 Jul 2025 22:03:10 +0800 Subject: [PATCH 06/11] Update session_test.go --- common/mux/session_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/common/mux/session_test.go b/common/mux/session_test.go index d81ad8c4..9b3e8719 100644 --- a/common/mux/session_test.go +++ b/common/mux/session_test.go @@ -9,7 +9,7 @@ import ( func TestSessionManagerAdd(t *testing.T) { m := NewSessionManager() - s := m.Allocate() + s := m.Allocate(0) if s.ID != 1 { t.Error("id: ", s.ID) } @@ -17,7 +17,7 @@ func TestSessionManagerAdd(t *testing.T) { t.Error("size: ", m.Size()) } - s = m.Allocate() + s = m.Allocate(0) if s.ID != 2 { t.Error("id: ", s.ID) } @@ -39,7 +39,7 @@ func TestSessionManagerAdd(t *testing.T) { func TestSessionManagerClose(t *testing.T) { m := NewSessionManager() - s := m.Allocate() + s := m.Allocate(0) if m.CloseIfNoSession() { t.Error("able to close") From 0c956420999c9c95db68a0f6c08aa0f35b7e7e3e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=A3=8E=E6=89=87=E6=BB=91=E7=BF=94=E7=BF=BC?= Date: Tue, 8 Jul 2025 22:17:00 +0800 Subject: [PATCH 07/11] Update client.go --- common/mux/client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/mux/client.go b/common/mux/client.go index f73fc509..ebf73e09 100644 --- a/common/mux/client.go +++ b/common/mux/client.go @@ -296,7 +296,7 @@ func (m *ClientWorker) Dispatch(ctx context.Context, link *transport.Link) bool } sm := m.sessionManager - s := sm.Allocate(int(m.strategy.MaxConcurrency)) + s := sm.Allocate(m.strategy) if s == nil { return false } From f14c548451b5ee99cacc9062a6e1c6104466278a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=A3=8E=E6=89=87=E6=BB=91=E7=BF=94=E7=BF=BC?= Date: Tue, 8 Jul 2025 22:18:24 +0800 Subject: [PATCH 08/11] Update session.go --- common/mux/session.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/common/mux/session.go b/common/mux/session.go index 1530a661..f5ca30a3 100644 --- a/common/mux/session.go +++ b/common/mux/session.go @@ -50,11 +50,13 @@ func (m *SessionManager) Count() int { return int(m.count) } -func (m *SessionManager) Allocate(MaxConcurrency int) *Session { +func (m *SessionManager) Allocate(Strategy ClientStrategy) *Session { + MaxConcurrency := int(Strategy.MaxConcurrency) + MaxConnection := uint16(Strategy.MaxConnection) m.Lock() defer m.Unlock() - if m.closed || (MaxConcurrency > 0 && len(m.sessions) >= MaxConcurrency) { + if m.closed || (MaxConcurrency > 0 && len(m.sessions) >= MaxConcurrency) || (MaxConnection > 0 && m.count >= MaxConnection) { return nil } From 67f5e24453c921fc80b339ba587f608bfaec2f6b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=A3=8E=E6=89=87=E6=BB=91=E7=BF=94=E7=BF=BC?= Date: Tue, 8 Jul 2025 22:18:41 +0800 Subject: [PATCH 09/11] Update session_test.go --- common/mux/session_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/common/mux/session_test.go b/common/mux/session_test.go index 9b3e8719..4c2ec189 100644 --- a/common/mux/session_test.go +++ b/common/mux/session_test.go @@ -9,7 +9,7 @@ import ( func TestSessionManagerAdd(t *testing.T) { m := NewSessionManager() - s := m.Allocate(0) + s := m.Allocate(ClientStrategy{}) if s.ID != 1 { t.Error("id: ", s.ID) } @@ -17,7 +17,7 @@ func TestSessionManagerAdd(t *testing.T) { t.Error("size: ", m.Size()) } - s = m.Allocate(0) + s = m.Allocate(ClientStrategy{}) if s.ID != 2 { t.Error("id: ", s.ID) } @@ -39,7 +39,7 @@ func TestSessionManagerAdd(t *testing.T) { func TestSessionManagerClose(t *testing.T) { m := NewSessionManager() - s := m.Allocate(0) + s := m.Allocate(ClientStrategy{}) if m.CloseIfNoSession() { t.Error("able to close") From d9fd84861f90f5756795789313e8818b483c88c3 Mon Sep 17 00:00:00 2001 From: patterniha <71074308+patterniha@users.noreply.github.com> Date: Tue, 8 Jul 2025 18:05:18 +0330 Subject: [PATCH 10/11] fix --- common/mux/client.go | 2 +- common/mux/session.go | 7 ++++--- common/mux/session_test.go | 6 +++--- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/common/mux/client.go b/common/mux/client.go index ebf73e09..6c9e90bc 100644 --- a/common/mux/client.go +++ b/common/mux/client.go @@ -296,7 +296,7 @@ func (m *ClientWorker) Dispatch(ctx context.Context, link *transport.Link) bool } sm := m.sessionManager - s := sm.Allocate(m.strategy) + s := sm.Allocate(&m.strategy) if s == nil { return false } diff --git a/common/mux/session.go b/common/mux/session.go index f5ca30a3..8bcb01bb 100644 --- a/common/mux/session.go +++ b/common/mux/session.go @@ -50,11 +50,12 @@ func (m *SessionManager) Count() int { return int(m.count) } -func (m *SessionManager) Allocate(Strategy ClientStrategy) *Session { - MaxConcurrency := int(Strategy.MaxConcurrency) - MaxConnection := uint16(Strategy.MaxConnection) +func (m *SessionManager) Allocate(Strategy *ClientStrategy) *Session { m.Lock() defer m.Unlock() + + MaxConcurrency := int(Strategy.MaxConcurrency) + MaxConnection := uint16(Strategy.MaxConnection) if m.closed || (MaxConcurrency > 0 && len(m.sessions) >= MaxConcurrency) || (MaxConnection > 0 && m.count >= MaxConnection) { return nil diff --git a/common/mux/session_test.go b/common/mux/session_test.go index 4c2ec189..a8491a9c 100644 --- a/common/mux/session_test.go +++ b/common/mux/session_test.go @@ -9,7 +9,7 @@ import ( func TestSessionManagerAdd(t *testing.T) { m := NewSessionManager() - s := m.Allocate(ClientStrategy{}) + s := m.Allocate(&ClientStrategy{}) if s.ID != 1 { t.Error("id: ", s.ID) } @@ -17,7 +17,7 @@ func TestSessionManagerAdd(t *testing.T) { t.Error("size: ", m.Size()) } - s = m.Allocate(ClientStrategy{}) + s = m.Allocate(&ClientStrategy{}) if s.ID != 2 { t.Error("id: ", s.ID) } @@ -39,7 +39,7 @@ func TestSessionManagerAdd(t *testing.T) { func TestSessionManagerClose(t *testing.T) { m := NewSessionManager() - s := m.Allocate(ClientStrategy{}) + s := m.Allocate(&ClientStrategy{}) if m.CloseIfNoSession() { t.Error("able to close") From 293d84ed930506ac5b26c2898584c3349e724567 Mon Sep 17 00:00:00 2001 From: patterniha <71074308+patterniha@users.noreply.github.com> Date: Wed, 9 Jul 2025 01:34:18 +0330 Subject: [PATCH 11/11] Update server.go --- common/mux/server.go | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/common/mux/server.go b/common/mux/server.go index 8fc0ae09..30dcf06e 100644 --- a/common/mux/server.go +++ b/common/mux/server.go @@ -201,11 +201,12 @@ func (w *ServerWorker) handleStatusNew(ctx context.Context, meta *FrameMetadata, transferType: protocol.TransferTypePacket, XUDP: x, } - go handle(ctx, x.Mux, w.link.Writer) x.Status = Active if !w.sessionManager.Add(x.Mux) { x.Mux.Close(false) + return errors.New("failed to add new session") } + go handle(ctx, x.Mux, w.link.Writer) return nil } @@ -226,18 +227,23 @@ func (w *ServerWorker) handleStatusNew(ctx context.Context, meta *FrameMetadata, if meta.Target.Network == net.Network_UDP { s.transferType = protocol.TransferTypePacket } - w.sessionManager.Add(s) + if !w.sessionManager.Add(s) { + s.Close(false) + return errors.New("failed to add new session") + } go handle(ctx, s, w.link.Writer) if !meta.Option.Has(OptionData) { return nil } rr := s.NewReader(reader, &meta.Target) - if err := buf.Copy(rr, s.output); err != nil { - buf.Copy(rr, buf.Discard) - return s.Close(false) + err = buf.Copy(rr, s.output) + + if err != nil && buf.IsWriteError(err) { + s.Close(false) + return buf.Copy(rr, buf.Discard) } - return nil + return err } func (w *ServerWorker) handleStatusKeep(meta *FrameMetadata, reader *buf.BufferedReader) error { @@ -304,10 +310,11 @@ func (w *ServerWorker) handleFrame(ctx context.Context, reader *buf.BufferedRead } func (w *ServerWorker) run(ctx context.Context) { - input := w.link.Reader - reader := &buf.BufferedReader{Reader: input} + reader := &buf.BufferedReader{Reader: w.link.Reader} defer w.sessionManager.Close() + defer common.Close(w.link.Writer) + defer common.Interrupt(w.link.Reader) for { select { @@ -318,7 +325,6 @@ func (w *ServerWorker) run(ctx context.Context) { if err != nil { if errors.Cause(err) != io.EOF { errors.LogInfoInner(ctx, err, "unexpected EOF") - common.Interrupt(input) } return }