From 18e5b0963f2c0f128ca0668ce237533bc8c7e71d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=96=E7=95=8C?= Date: Sun, 23 Apr 2023 19:31:41 +0800 Subject: [PATCH] Update dependencies --- app/dispatcher/default.go | 40 +++--- app/proxyman/outbound/handler.go | 2 +- common/singbridge/destination.go | 46 +++++++ common/singbridge/dialer.go | 59 +++++++++ common/singbridge/error.go | 10 ++ common/singbridge/handler.go | 51 ++++++++ common/singbridge/logger.go | 71 +++++++++++ common/singbridge/packet.go | 82 ++++++++++++ common/singbridge/pipe.go | 61 +++++++++ common/singbridge/reader.go | 66 ++++++++++ go.mod | 2 +- go.sum | 2 + proxy/shadowsocks_2022/inbound.go | 18 +-- proxy/shadowsocks_2022/inbound_multi.go | 18 +-- proxy/shadowsocks_2022/inbound_relay.go | 20 ++- proxy/shadowsocks_2022/outbound.go | 24 +--- proxy/shadowsocks_2022/shadowsocks_2022.go | 142 --------------------- transport/internet/system_dialer.go | 16 +-- transport/internet/system_listener.go | 21 ++- transport/internet/system_listener_test.go | 10 +- 20 files changed, 523 insertions(+), 238 deletions(-) create mode 100644 common/singbridge/destination.go create mode 100644 common/singbridge/dialer.go create mode 100644 common/singbridge/error.go create mode 100644 common/singbridge/handler.go create mode 100644 common/singbridge/logger.go create mode 100644 common/singbridge/packet.go create mode 100644 common/singbridge/pipe.go create mode 100644 common/singbridge/reader.go diff --git a/app/dispatcher/default.go b/app/dispatcher/default.go index 7328d213..35307cef 100644 --- a/app/dispatcher/default.go +++ b/app/dispatcher/default.go @@ -342,29 +342,27 @@ func (d *DefaultDispatcher) DispatchLink(ctx context.Context, destination net.De } sniffingRequest := content.SniffingRequest if !sniffingRequest.Enabled { - go d.routedDispatch(ctx, outbound, destination) + d.routedDispatch(ctx, outbound, destination) } else { - go func() { - cReader := &cachedReader{ - reader: outbound.Reader.(*pipe.Reader), + cReader := &cachedReader{ + reader: outbound.Reader.(*pipe.Reader), + } + outbound.Reader = cReader + result, err := sniffer(ctx, cReader, sniffingRequest.MetadataOnly, destination.Network) + if err == nil { + content.Protocol = result.Protocol() + } + if err == nil && d.shouldOverride(ctx, result, sniffingRequest, destination) { + domain := result.Domain() + newError("sniffed domain: ", domain).WriteToLog(session.ExportIDToError(ctx)) + destination.Address = net.ParseAddress(domain) + if sniffingRequest.RouteOnly && result.Protocol() != "fakedns" { + ob.RouteTarget = destination + } else { + ob.Target = destination } - outbound.Reader = cReader - result, err := sniffer(ctx, cReader, sniffingRequest.MetadataOnly, destination.Network) - if err == nil { - content.Protocol = result.Protocol() - } - if err == nil && d.shouldOverride(ctx, result, sniffingRequest, destination) { - domain := result.Domain() - newError("sniffed domain: ", domain).WriteToLog(session.ExportIDToError(ctx)) - destination.Address = net.ParseAddress(domain) - if sniffingRequest.RouteOnly && result.Protocol() != "fakedns" { - ob.RouteTarget = destination - } else { - ob.Target = destination - } - } - d.routedDispatch(ctx, outbound, destination) - }() + } + d.routedDispatch(ctx, outbound, destination) } return nil diff --git a/app/proxyman/outbound/handler.go b/app/proxyman/outbound/handler.go index e91bcb07..b477dd6b 100644 --- a/app/proxyman/outbound/handler.go +++ b/app/proxyman/outbound/handler.go @@ -211,7 +211,7 @@ out: err.WriteToLog(session.ExportIDToError(ctx)) common.Interrupt(link.Writer) } else { - common.Must(common.Close(link.Writer)) + common.Close(link.Writer) } common.Interrupt(link.Reader) } diff --git a/common/singbridge/destination.go b/common/singbridge/destination.go new file mode 100644 index 00000000..7a89c9ef --- /dev/null +++ b/common/singbridge/destination.go @@ -0,0 +1,46 @@ +package singbridge + +import ( + M "github.com/sagernet/sing/common/metadata" + N "github.com/sagernet/sing/common/network" + "github.com/xtls/xray-core/common/net" +) + +func ToNetwork(network string) net.Network { + switch N.NetworkName(network) { + case N.NetworkTCP: + return net.Network_TCP + case N.NetworkUDP: + return net.Network_UDP + default: + return net.Network_Unknown + } +} + +func ToDestination(socksaddr M.Socksaddr, network net.Network) net.Destination { + if socksaddr.IsFqdn() { + return net.Destination{ + Network: network, + Address: net.DomainAddress(socksaddr.Fqdn), + Port: net.Port(socksaddr.Port), + } + } else { + return net.Destination{ + Network: network, + Address: net.IPAddress(socksaddr.Addr.AsSlice()), + Port: net.Port(socksaddr.Port), + } + } +} + +func ToSocksaddr(destination net.Destination) M.Socksaddr { + var addr M.Socksaddr + switch destination.Address.Family() { + case net.AddressFamilyDomain: + addr.Fqdn = destination.Address.Domain() + default: + addr.Addr = M.AddrFromIP(destination.Address.IP()) + } + addr.Port = uint16(destination.Port) + return addr +} diff --git a/common/singbridge/dialer.go b/common/singbridge/dialer.go new file mode 100644 index 00000000..dfc128d8 --- /dev/null +++ b/common/singbridge/dialer.go @@ -0,0 +1,59 @@ +package singbridge + +import ( + "context" + "os" + + M "github.com/sagernet/sing/common/metadata" + N "github.com/sagernet/sing/common/network" + "github.com/xtls/xray-core/common/net" + "github.com/xtls/xray-core/common/net/cnc" + "github.com/xtls/xray-core/common/session" + "github.com/xtls/xray-core/proxy" + "github.com/xtls/xray-core/transport" + "github.com/xtls/xray-core/transport/internet" + "github.com/xtls/xray-core/transport/pipe" +) + +var _ N.Dialer = (*XrayDialer)(nil) + +type XrayDialer struct { + internet.Dialer +} + +func NewDialer(dialer internet.Dialer) *XrayDialer { + return &XrayDialer{dialer} +} + +func (d *XrayDialer) DialContext(ctx context.Context, network string, destination M.Socksaddr) (net.Conn, error) { + return d.Dialer.Dial(ctx, ToDestination(destination, ToNetwork(network))) +} + +func (d *XrayDialer) ListenPacket(ctx context.Context, destination M.Socksaddr) (net.PacketConn, error) { + return nil, os.ErrInvalid +} + +type XrayOutboundDialer struct { + outbound proxy.Outbound + dialer internet.Dialer +} + +func NewOutboundDialer(outbound proxy.Outbound, dialer internet.Dialer) *XrayOutboundDialer { + return &XrayOutboundDialer{outbound, dialer} +} + +func (d *XrayOutboundDialer) DialContext(ctx context.Context, network string, destination M.Socksaddr) (net.Conn, error) { + ctx = session.ContextWithOutbound(context.Background(), &session.Outbound{ + Target: ToDestination(destination, ToNetwork(network)), + }) + opts := []pipe.Option{pipe.WithSizeLimit(64 * 1024)} + uplinkReader, uplinkWriter := pipe.New(opts...) + downlinkReader, downlinkWriter := pipe.New(opts...) + conn := cnc.NewConnection(cnc.ConnectionInputMulti(downlinkWriter), cnc.ConnectionOutputMulti(uplinkReader)) + go d.outbound.Process(ctx, &transport.Link{Reader: downlinkReader, Writer: uplinkWriter}, d.dialer) + return conn, nil +} + +func (d *XrayOutboundDialer) ListenPacket(ctx context.Context, destination M.Socksaddr) (net.PacketConn, error) { + return nil, os.ErrInvalid +} diff --git a/common/singbridge/error.go b/common/singbridge/error.go new file mode 100644 index 00000000..ac9e6351 --- /dev/null +++ b/common/singbridge/error.go @@ -0,0 +1,10 @@ +package singbridge + +import E "github.com/sagernet/sing/common/exceptions" + +func ReturnError(err error) error { + if E.IsClosedOrCanceled(err) { + return nil + } + return err +} diff --git a/common/singbridge/handler.go b/common/singbridge/handler.go new file mode 100644 index 00000000..18d4ad71 --- /dev/null +++ b/common/singbridge/handler.go @@ -0,0 +1,51 @@ +package singbridge + +import ( + "context" + "io" + + M "github.com/sagernet/sing/common/metadata" + N "github.com/sagernet/sing/common/network" + "github.com/xtls/xray-core/common/buf" + "github.com/xtls/xray-core/common/errors" + "github.com/xtls/xray-core/common/net" + "github.com/xtls/xray-core/common/session" + "github.com/xtls/xray-core/features/routing" + "github.com/xtls/xray-core/transport" +) + +var ( + _ N.TCPConnectionHandler = (*Dispatcher)(nil) + _ N.UDPConnectionHandler = (*Dispatcher)(nil) +) + +type Dispatcher struct { + upstream routing.Dispatcher + newErrorFunc func(values ...any) *errors.Error +} + +func NewDispatcher(dispatcher routing.Dispatcher, newErrorFunc func(values ...any) *errors.Error) *Dispatcher { + return &Dispatcher{ + upstream: dispatcher, + newErrorFunc: newErrorFunc, + } +} + +func (d *Dispatcher) NewConnection(ctx context.Context, conn net.Conn, metadata M.Metadata) error { + xConn := NewConn(conn) + return d.upstream.DispatchLink(ctx, ToDestination(metadata.Destination, net.Network_TCP), &transport.Link{ + Reader: xConn, + Writer: xConn, + }) +} + +func (d *Dispatcher) NewPacketConnection(ctx context.Context, conn N.PacketConn, metadata M.Metadata) error { + return d.upstream.DispatchLink(ctx, ToDestination(metadata.Destination, net.Network_UDP), &transport.Link{ + Reader: buf.NewPacketReader(conn.(io.Reader)), + Writer: buf.NewWriter(conn.(io.Writer)), + }) +} + +func (d *Dispatcher) NewError(ctx context.Context, err error) { + d.newErrorFunc(err).WriteToLog(session.ExportIDToError(ctx)) +} diff --git a/common/singbridge/logger.go b/common/singbridge/logger.go new file mode 100644 index 00000000..c1702363 --- /dev/null +++ b/common/singbridge/logger.go @@ -0,0 +1,71 @@ +package singbridge + +import ( + "context" + + "github.com/sagernet/sing/common/logger" + "github.com/xtls/xray-core/common/errors" + "github.com/xtls/xray-core/common/session" +) + +var _ logger.ContextLogger = (*XrayLogger)(nil) + +type XrayLogger struct { + newError func(values ...any) *errors.Error +} + +func NewLogger(newErrorFunc func(values ...any) *errors.Error) *XrayLogger { + return &XrayLogger{ + newErrorFunc, + } +} + +func (l *XrayLogger) Trace(args ...any) { +} + +func (l *XrayLogger) Debug(args ...any) { + l.newError(args...).AtDebug().WriteToLog() +} + +func (l *XrayLogger) Info(args ...any) { + l.newError(args...).AtInfo().WriteToLog() +} + +func (l *XrayLogger) Warn(args ...any) { + l.newError(args...).AtWarning().WriteToLog() +} + +func (l *XrayLogger) Error(args ...any) { + l.newError(args...).AtError().WriteToLog() +} + +func (l *XrayLogger) Fatal(args ...any) { +} + +func (l *XrayLogger) Panic(args ...any) { +} + +func (l *XrayLogger) TraceContext(ctx context.Context, args ...any) { +} + +func (l *XrayLogger) DebugContext(ctx context.Context, args ...any) { + l.newError(args...).AtDebug().WriteToLog(session.ExportIDToError(ctx)) +} + +func (l *XrayLogger) InfoContext(ctx context.Context, args ...any) { + l.newError(args...).AtInfo().WriteToLog(session.ExportIDToError(ctx)) +} + +func (l *XrayLogger) WarnContext(ctx context.Context, args ...any) { + l.newError(args...).AtWarning().WriteToLog(session.ExportIDToError(ctx)) +} + +func (l *XrayLogger) ErrorContext(ctx context.Context, args ...any) { + l.newError(args...).AtError().WriteToLog(session.ExportIDToError(ctx)) +} + +func (l *XrayLogger) FatalContext(ctx context.Context, args ...any) { +} + +func (l *XrayLogger) PanicContext(ctx context.Context, args ...any) { +} diff --git a/common/singbridge/packet.go b/common/singbridge/packet.go new file mode 100644 index 00000000..fef955e7 --- /dev/null +++ b/common/singbridge/packet.go @@ -0,0 +1,82 @@ +package singbridge + +import ( + "context" + + B "github.com/sagernet/sing/common/buf" + "github.com/sagernet/sing/common/bufio" + M "github.com/sagernet/sing/common/metadata" + "github.com/xtls/xray-core/common/buf" + "github.com/xtls/xray-core/common/net" + "github.com/xtls/xray-core/transport" +) + +func CopyPacketConn(ctx context.Context, inboundConn net.Conn, link *transport.Link, destination net.Destination, serverConn net.PacketConn) error { + conn := &PacketConnWrapper{ + Reader: link.Reader, + Writer: link.Writer, + Dest: destination, + Conn: inboundConn, + } + return ReturnError(bufio.CopyPacketConn(ctx, conn, bufio.NewPacketConn(serverConn))) +} + +type PacketConnWrapper struct { + buf.Reader + buf.Writer + net.Conn + Dest net.Destination + cached buf.MultiBuffer +} + +func (w *PacketConnWrapper) ReadPacket(buffer *B.Buffer) (M.Socksaddr, error) { + if w.cached != nil { + mb, bb := buf.SplitFirst(w.cached) + if bb == nil { + w.cached = nil + } else { + buffer.Write(bb.Bytes()) + w.cached = mb + var destination net.Destination + if bb.UDP != nil { + destination = *bb.UDP + } else { + destination = w.Dest + } + bb.Release() + return ToSocksaddr(destination), nil + } + } + mb, err := w.ReadMultiBuffer() + if err != nil { + return M.Socksaddr{}, err + } + nb, bb := buf.SplitFirst(mb) + if bb == nil { + return M.Socksaddr{}, nil + } else { + buffer.Write(bb.Bytes()) + w.cached = nb + var destination net.Destination + if bb.UDP != nil { + destination = *bb.UDP + } else { + destination = w.Dest + } + bb.Release() + return ToSocksaddr(destination), nil + } +} + +func (w *PacketConnWrapper) WritePacket(buffer *B.Buffer, destination M.Socksaddr) error { + vBuf := buf.New() + vBuf.Write(buffer.Bytes()) + endpoint := ToDestination(destination, net.Network_UDP) + vBuf.UDP = &endpoint + return w.Writer.WriteMultiBuffer(buf.MultiBuffer{vBuf}) +} + +func (w *PacketConnWrapper) Close() error { + buf.ReleaseMulti(w.cached) + return nil +} diff --git a/common/singbridge/pipe.go b/common/singbridge/pipe.go new file mode 100644 index 00000000..d04ebda4 --- /dev/null +++ b/common/singbridge/pipe.go @@ -0,0 +1,61 @@ +package singbridge + +import ( + "context" + "io" + "net" + + "github.com/sagernet/sing/common/bufio" + "github.com/xtls/xray-core/common/buf" + "github.com/xtls/xray-core/transport" +) + +func CopyConn(ctx context.Context, inboundConn net.Conn, link *transport.Link, serverConn net.Conn) error { + conn := &PipeConnWrapper{ + W: link.Writer, + Conn: inboundConn, + } + if ir, ok := link.Reader.(io.Reader); ok { + conn.R = ir + } else { + conn.R = &buf.BufferedReader{Reader: link.Reader} + } + return ReturnError(bufio.CopyConn(ctx, conn, serverConn)) +} + +type PipeConnWrapper struct { + R io.Reader + W buf.Writer + net.Conn +} + +func (w *PipeConnWrapper) Close() error { + return nil +} + +func (w *PipeConnWrapper) Read(b []byte) (n int, err error) { + return w.R.Read(b) +} + +func (w *PipeConnWrapper) Write(p []byte) (n int, err error) { + n = len(p) + var mb buf.MultiBuffer + pLen := len(p) + for pLen > 0 { + buffer := buf.New() + if pLen > buf.Size { + _, err = buffer.Write(p[:buf.Size]) + p = p[buf.Size:] + } else { + buffer.Write(p) + } + pLen -= int(buffer.Len()) + mb = append(mb, buffer) + } + err = w.W.WriteMultiBuffer(mb) + if err != nil { + n = 0 + buf.ReleaseMulti(mb) + } + return +} diff --git a/common/singbridge/reader.go b/common/singbridge/reader.go new file mode 100644 index 00000000..1ace1845 --- /dev/null +++ b/common/singbridge/reader.go @@ -0,0 +1,66 @@ +package singbridge + +import ( + "time" + + "github.com/sagernet/sing/common" + "github.com/sagernet/sing/common/bufio" + N "github.com/sagernet/sing/common/network" + "github.com/xtls/xray-core/common/buf" + "github.com/xtls/xray-core/common/net" +) + +var ( + _ buf.Reader = (*Conn)(nil) + _ buf.TimeoutReader = (*Conn)(nil) + _ buf.Writer = (*Conn)(nil) +) + +type Conn struct { + net.Conn + writer N.VectorisedWriter +} + +func NewConn(conn net.Conn) *Conn { + writer, _ := bufio.CreateVectorisedWriter(conn) + return &Conn{ + Conn: conn, + writer: writer, + } +} + +func (c *Conn) ReadMultiBuffer() (buf.MultiBuffer, error) { + buffer, err := buf.ReadBuffer(c.Conn) + if err != nil { + return nil, err + } + return buf.MultiBuffer{buffer}, nil +} + +func (c *Conn) ReadMultiBufferTimeout(duration time.Duration) (buf.MultiBuffer, error) { + err := c.SetReadDeadline(time.Now().Add(duration)) + if err != nil { + return nil, err + } + defer c.SetReadDeadline(time.Time{}) + return c.ReadMultiBuffer() +} + +func (c *Conn) WriteMultiBuffer(bufferList buf.MultiBuffer) error { + defer buf.ReleaseMulti(bufferList) + if c.writer != nil { + bytesList := make([][]byte, len(bufferList)) + for i, buffer := range bufferList { + bytesList[i] = buffer.Bytes() + } + return common.Error(bufio.WriteVectorised(c.writer, bytesList)) + } + // Since this conn is only used by tun, we don't force buffer writes to merge. + for _, buffer := range bufferList { + _, err := c.Conn.Write(buffer.Bytes()) + if err != nil { + return err + } + } + return nil +} diff --git a/go.mod b/go.mod index 36733221..bf6a6c3a 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/pires/go-proxyproto v0.7.0 github.com/quic-go/quic-go v0.34.0 github.com/refraction-networking/utls v1.3.2 - github.com/sagernet/sing v0.2.3 + github.com/sagernet/sing v0.2.4 github.com/sagernet/sing-shadowsocks v0.2.1 github.com/sagernet/wireguard-go v0.0.0-20221116151939-c99467f53f2c github.com/seiflotfy/cuckoofilter v0.0.0-20220411075957-e3b120b3f5fb diff --git a/go.sum b/go.sum index 4e681dc1..f8df0400 100644 --- a/go.sum +++ b/go.sum @@ -145,6 +145,8 @@ github.com/riobard/go-bloom v0.0.0-20200614022211-cdc8013cb5b3/go.mod h1:HgjTstv github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= github.com/sagernet/sing v0.2.3 h1:V50MvZ4c3Iij2lYFWPlzL1PyipwSzjGeN9x+Ox89vpk= github.com/sagernet/sing v0.2.3/go.mod h1:Ta8nHnDLAwqySzKhGoKk4ZIB+vJ3GTKj7UPrWYvM+4w= +github.com/sagernet/sing v0.2.4 h1:gC8BR5sglbJZX23RtMyFa8EETP9YEUADhfbEzU1yVbo= +github.com/sagernet/sing v0.2.4/go.mod h1:Ta8nHnDLAwqySzKhGoKk4ZIB+vJ3GTKj7UPrWYvM+4w= github.com/sagernet/sing-shadowsocks v0.2.1 h1:FvdLQOqpvxHBJUcUe4fvgiYP2XLLwH5i1DtXQviVEPw= github.com/sagernet/sing-shadowsocks v0.2.1/go.mod h1:T/OgurSjsAe+Ug3+6PprXjmgHFmJidjOvQcjXGTKb3I= github.com/sagernet/wireguard-go v0.0.0-20221116151939-c99467f53f2c h1:vK2wyt9aWYHHvNLWniwijBu/n4pySypiKRhN32u/JGo= diff --git a/proxy/shadowsocks_2022/inbound.go b/proxy/shadowsocks_2022/inbound.go index 1c2ae1d2..bb298c09 100644 --- a/proxy/shadowsocks_2022/inbound.go +++ b/proxy/shadowsocks_2022/inbound.go @@ -17,6 +17,7 @@ import ( "github.com/xtls/xray-core/common/net" "github.com/xtls/xray-core/common/protocol" "github.com/xtls/xray-core/common/session" + "github.com/xtls/xray-core/common/singbridge" "github.com/xtls/xray-core/features/routing" "github.com/xtls/xray-core/transport/internet/stat" ) @@ -74,7 +75,7 @@ func (i *Inbound) Process(ctx context.Context, network net.Network, connection s ctx = session.ContextWithDispatcher(ctx, dispatcher) if network == net.Network_TCP { - return returnError(i.service.NewConnection(ctx, connection, metadata)) + return singbridge.ReturnError(i.service.NewConnection(ctx, connection, metadata)) } else { reader := buf.NewReader(connection) pc := &natPacketConn{connection} @@ -82,7 +83,7 @@ func (i *Inbound) Process(ctx context.Context, network net.Network, connection s mb, err := reader.ReadMultiBuffer() if err != nil { buf.ReleaseMulti(mb) - return returnError(err) + return singbridge.ReturnError(err) } for _, buffer := range mb { packet := B.As(buffer.Bytes()).ToOwned() @@ -112,16 +113,11 @@ func (i *Inbound) NewConnection(ctx context.Context, conn net.Conn, metadata M.M }) newError("tunnelling request to tcp:", metadata.Destination).WriteToLog(session.ExportIDToError(ctx)) dispatcher := session.DispatcherFromContext(ctx) - link, err := dispatcher.Dispatch(ctx, toDestination(metadata.Destination, net.Network_TCP)) + link, err := dispatcher.Dispatch(ctx, singbridge.ToDestination(metadata.Destination, net.Network_TCP)) if err != nil { return err } - outConn := &pipeConnWrapper{ - &buf.BufferedReader{Reader: link.Reader}, - link.Writer, - conn, - } - return bufio.CopyConn(ctx, conn, outConn) + return singbridge.CopyConn(ctx, nil, link, conn) } func (i *Inbound) NewPacketConnection(ctx context.Context, conn N.PacketConn, metadata M.Metadata) error { @@ -138,12 +134,12 @@ func (i *Inbound) NewPacketConnection(ctx context.Context, conn N.PacketConn, me }) newError("tunnelling request to udp:", metadata.Destination).WriteToLog(session.ExportIDToError(ctx)) dispatcher := session.DispatcherFromContext(ctx) - destination := toDestination(metadata.Destination, net.Network_UDP) + destination := singbridge.ToDestination(metadata.Destination, net.Network_UDP) link, err := dispatcher.Dispatch(ctx, destination) if err != nil { return err } - outConn := &packetConnWrapper{ + outConn := &singbridge.PacketConnWrapper{ Reader: link.Reader, Writer: link.Writer, Dest: destination, diff --git a/proxy/shadowsocks_2022/inbound_multi.go b/proxy/shadowsocks_2022/inbound_multi.go index 77a34427..04cac573 100644 --- a/proxy/shadowsocks_2022/inbound_multi.go +++ b/proxy/shadowsocks_2022/inbound_multi.go @@ -21,6 +21,7 @@ import ( "github.com/xtls/xray-core/common/net" "github.com/xtls/xray-core/common/protocol" "github.com/xtls/xray-core/common/session" + "github.com/xtls/xray-core/common/singbridge" "github.com/xtls/xray-core/common/uuid" "github.com/xtls/xray-core/features/routing" "github.com/xtls/xray-core/transport/internet/stat" @@ -163,7 +164,7 @@ func (i *MultiUserInbound) Process(ctx context.Context, network net.Network, con ctx = session.ContextWithDispatcher(ctx, dispatcher) if network == net.Network_TCP { - return returnError(i.service.NewConnection(ctx, connection, metadata)) + return singbridge.ReturnError(i.service.NewConnection(ctx, connection, metadata)) } else { reader := buf.NewReader(connection) pc := &natPacketConn{connection} @@ -171,7 +172,7 @@ func (i *MultiUserInbound) Process(ctx context.Context, network net.Network, con mb, err := reader.ReadMultiBuffer() if err != nil { buf.ReleaseMulti(mb) - return returnError(err) + return singbridge.ReturnError(err) } for _, buffer := range mb { packet := B.As(buffer.Bytes()).ToOwned() @@ -203,16 +204,11 @@ func (i *MultiUserInbound) NewConnection(ctx context.Context, conn net.Conn, met }) newError("tunnelling request to tcp:", metadata.Destination).WriteToLog(session.ExportIDToError(ctx)) dispatcher := session.DispatcherFromContext(ctx) - link, err := dispatcher.Dispatch(ctx, toDestination(metadata.Destination, net.Network_TCP)) + link, err := dispatcher.Dispatch(ctx, singbridge.ToDestination(metadata.Destination, net.Network_TCP)) if err != nil { return err } - outConn := &pipeConnWrapper{ - &buf.BufferedReader{Reader: link.Reader}, - link.Writer, - conn, - } - return bufio.CopyConn(ctx, conn, outConn) + return singbridge.CopyConn(ctx, conn, link, conn) } func (i *MultiUserInbound) NewPacketConnection(ctx context.Context, conn N.PacketConn, metadata M.Metadata) error { @@ -231,12 +227,12 @@ func (i *MultiUserInbound) NewPacketConnection(ctx context.Context, conn N.Packe }) newError("tunnelling request to udp:", metadata.Destination).WriteToLog(session.ExportIDToError(ctx)) dispatcher := session.DispatcherFromContext(ctx) - destination := toDestination(metadata.Destination, net.Network_UDP) + destination := singbridge.ToDestination(metadata.Destination, net.Network_UDP) link, err := dispatcher.Dispatch(ctx, destination) if err != nil { return err } - outConn := &packetConnWrapper{ + outConn := &singbridge.PacketConnWrapper{ Reader: link.Reader, Writer: link.Writer, Dest: destination, diff --git a/proxy/shadowsocks_2022/inbound_relay.go b/proxy/shadowsocks_2022/inbound_relay.go index d07babb8..c3f8e675 100644 --- a/proxy/shadowsocks_2022/inbound_relay.go +++ b/proxy/shadowsocks_2022/inbound_relay.go @@ -19,6 +19,7 @@ import ( "github.com/xtls/xray-core/common/net" "github.com/xtls/xray-core/common/protocol" "github.com/xtls/xray-core/common/session" + "github.com/xtls/xray-core/common/singbridge" "github.com/xtls/xray-core/common/uuid" "github.com/xtls/xray-core/features/routing" "github.com/xtls/xray-core/transport/internet/stat" @@ -66,7 +67,7 @@ func NewRelayServer(ctx context.Context, config *RelayServerConfig) (*RelayInbou C.MapIndexed(config.Destinations, func(index int, it *RelayDestination) int { return index }), C.Map(config.Destinations, func(it *RelayDestination) string { return it.Key }), C.Map(config.Destinations, func(it *RelayDestination) M.Socksaddr { - return toSocksaddr(net.Destination{ + return singbridge.ToSocksaddr(net.Destination{ Address: it.Address.AsAddress(), Port: net.Port(it.Port), }) @@ -95,7 +96,7 @@ func (i *RelayInbound) Process(ctx context.Context, network net.Network, connect ctx = session.ContextWithDispatcher(ctx, dispatcher) if network == net.Network_TCP { - return returnError(i.service.NewConnection(ctx, connection, metadata)) + return singbridge.ReturnError(i.service.NewConnection(ctx, connection, metadata)) } else { reader := buf.NewReader(connection) pc := &natPacketConn{connection} @@ -103,7 +104,7 @@ func (i *RelayInbound) Process(ctx context.Context, network net.Network, connect mb, err := reader.ReadMultiBuffer() if err != nil { buf.ReleaseMulti(mb) - return returnError(err) + return singbridge.ReturnError(err) } for _, buffer := range mb { packet := B.As(buffer.Bytes()).ToOwned() @@ -135,16 +136,11 @@ func (i *RelayInbound) NewConnection(ctx context.Context, conn net.Conn, metadat }) newError("tunnelling request to tcp:", metadata.Destination).WriteToLog(session.ExportIDToError(ctx)) dispatcher := session.DispatcherFromContext(ctx) - link, err := dispatcher.Dispatch(ctx, toDestination(metadata.Destination, net.Network_TCP)) + link, err := dispatcher.Dispatch(ctx, singbridge.ToDestination(metadata.Destination, net.Network_TCP)) if err != nil { return err } - outConn := &pipeConnWrapper{ - &buf.BufferedReader{Reader: link.Reader}, - link.Writer, - conn, - } - return bufio.CopyConn(ctx, conn, outConn) + return singbridge.CopyConn(ctx, nil, link, conn) } func (i *RelayInbound) NewPacketConnection(ctx context.Context, conn N.PacketConn, metadata M.Metadata) error { @@ -163,12 +159,12 @@ func (i *RelayInbound) NewPacketConnection(ctx context.Context, conn N.PacketCon }) newError("tunnelling request to udp:", metadata.Destination).WriteToLog(session.ExportIDToError(ctx)) dispatcher := session.DispatcherFromContext(ctx) - destination := toDestination(metadata.Destination, net.Network_UDP) + destination := singbridge.ToDestination(metadata.Destination, net.Network_UDP) link, err := dispatcher.Dispatch(ctx, destination) if err != nil { return err } - outConn := &packetConnWrapper{ + outConn := &singbridge.PacketConnWrapper{ Reader: link.Reader, Writer: link.Writer, Dest: destination, diff --git a/proxy/shadowsocks_2022/outbound.go b/proxy/shadowsocks_2022/outbound.go index 41e239dc..151ea0e2 100644 --- a/proxy/shadowsocks_2022/outbound.go +++ b/proxy/shadowsocks_2022/outbound.go @@ -2,7 +2,6 @@ package shadowsocks_2022 import ( "context" - "io" "runtime" "time" @@ -17,6 +16,7 @@ import ( "github.com/xtls/xray-core/common/buf" "github.com/xtls/xray-core/common/net" "github.com/xtls/xray-core/common/session" + "github.com/xtls/xray-core/common/singbridge" "github.com/xtls/xray-core/transport" "github.com/xtls/xray-core/transport/internet" ) @@ -93,7 +93,7 @@ func (o *Outbound) Process(ctx context.Context, link *transport.Link, dialer int } if network == net.Network_TCP { - serverConn := o.method.DialEarlyConn(connection, toSocksaddr(destination)) + serverConn := o.method.DialEarlyConn(connection, singbridge.ToSocksaddr(destination)) var handshake bool if timeoutReader, isTimeoutReader := link.Reader.(buf.TimeoutReader); isTimeoutReader { mb, err := timeoutReader.ReadMultiBufferTimeout(time.Millisecond * 100) @@ -128,17 +128,7 @@ func (o *Outbound) Process(ctx context.Context, link *transport.Link, dialer int return newError("client handshake").Base(err) } } - conn := &pipeConnWrapper{ - W: link.Writer, - Conn: inboundConn, - } - if ir, ok := link.Reader.(io.Reader); ok { - conn.R = ir - } else { - conn.R = &buf.BufferedReader{Reader: link.Reader} - } - - return returnError(bufio.CopyConn(ctx, conn, serverConn)) + return singbridge.CopyConn(ctx, inboundConn, link, serverConn) } else { var packetConn N.PacketConn if pc, isPacketConn := inboundConn.(N.PacketConn); isPacketConn { @@ -146,7 +136,7 @@ func (o *Outbound) Process(ctx context.Context, link *transport.Link, dialer int } else if nc, isNetPacket := inboundConn.(net.PacketConn); isNetPacket { packetConn = bufio.NewPacketConn(nc) } else { - packetConn = &packetConnWrapper{ + packetConn = &singbridge.PacketConnWrapper{ Reader: link.Reader, Writer: link.Writer, Conn: inboundConn, @@ -155,14 +145,14 @@ func (o *Outbound) Process(ctx context.Context, link *transport.Link, dialer int } if o.uotClient != nil { - uConn, err := o.uotClient.DialEarlyConn(o.method.DialEarlyConn(connection, uot.RequestDestination(o.uotClient.Version)), false, toSocksaddr(destination)) + uConn, err := o.uotClient.DialEarlyConn(o.method.DialEarlyConn(connection, uot.RequestDestination(o.uotClient.Version)), false, singbridge.ToSocksaddr(destination)) if err != nil { return err } - return returnError(bufio.CopyPacketConn(ctx, packetConn, uConn)) + return singbridge.ReturnError(bufio.CopyPacketConn(ctx, packetConn, uConn)) } else { serverConn := o.method.DialPacketConn(connection) - return returnError(bufio.CopyPacketConn(ctx, packetConn, serverConn)) + return singbridge.ReturnError(bufio.CopyPacketConn(ctx, packetConn, serverConn)) } } } diff --git a/proxy/shadowsocks_2022/shadowsocks_2022.go b/proxy/shadowsocks_2022/shadowsocks_2022.go index 945c4499..4f8d88ab 100644 --- a/proxy/shadowsocks_2022/shadowsocks_2022.go +++ b/proxy/shadowsocks_2022/shadowsocks_2022.go @@ -1,145 +1,3 @@ package shadowsocks_2022 -import ( - "io" - - B "github.com/sagernet/sing/common/buf" - E "github.com/sagernet/sing/common/exceptions" - M "github.com/sagernet/sing/common/metadata" - "github.com/xtls/xray-core/common/buf" - "github.com/xtls/xray-core/common/net" -) - //go:generate go run github.com/xtls/xray-core/common/errors/errorgen - -func toDestination(socksaddr M.Socksaddr, network net.Network) net.Destination { - if socksaddr.IsFqdn() { - return net.Destination{ - Network: network, - Address: net.DomainAddress(socksaddr.Fqdn), - Port: net.Port(socksaddr.Port), - } - } else { - return net.Destination{ - Network: network, - Address: net.IPAddress(socksaddr.Addr.AsSlice()), - Port: net.Port(socksaddr.Port), - } - } -} - -func toSocksaddr(destination net.Destination) M.Socksaddr { - var addr M.Socksaddr - switch destination.Address.Family() { - case net.AddressFamilyDomain: - addr.Fqdn = destination.Address.Domain() - default: - addr.Addr = M.AddrFromIP(destination.Address.IP()) - } - addr.Port = uint16(destination.Port) - return addr -} - -type pipeConnWrapper struct { - R io.Reader - W buf.Writer - net.Conn -} - -func (w *pipeConnWrapper) Close() error { - return nil -} - -func (w *pipeConnWrapper) Read(b []byte) (n int, err error) { - return w.R.Read(b) -} - -func (w *pipeConnWrapper) Write(p []byte) (n int, err error) { - n = len(p) - var mb buf.MultiBuffer - pLen := len(p) - for pLen > 0 { - buffer := buf.New() - if pLen > buf.Size { - _, err = buffer.Write(p[:buf.Size]) - p = p[buf.Size:] - } else { - buffer.Write(p) - } - pLen -= int(buffer.Len()) - mb = append(mb, buffer) - } - err = w.W.WriteMultiBuffer(mb) - if err != nil { - n = 0 - buf.ReleaseMulti(mb) - } - return -} - -type packetConnWrapper struct { - buf.Reader - buf.Writer - net.Conn - Dest net.Destination - cached buf.MultiBuffer -} - -func (w *packetConnWrapper) ReadPacket(buffer *B.Buffer) (M.Socksaddr, error) { - if w.cached != nil { - mb, bb := buf.SplitFirst(w.cached) - if bb == nil { - w.cached = nil - } else { - buffer.Write(bb.Bytes()) - w.cached = mb - var destination net.Destination - if bb.UDP != nil { - destination = *bb.UDP - } else { - destination = w.Dest - } - bb.Release() - return toSocksaddr(destination), nil - } - } - mb, err := w.ReadMultiBuffer() - if err != nil { - return M.Socksaddr{}, err - } - nb, bb := buf.SplitFirst(mb) - if bb == nil { - return M.Socksaddr{}, nil - } else { - buffer.Write(bb.Bytes()) - w.cached = nb - var destination net.Destination - if bb.UDP != nil { - destination = *bb.UDP - } else { - destination = w.Dest - } - bb.Release() - return toSocksaddr(destination), nil - } -} - -func (w *packetConnWrapper) WritePacket(buffer *B.Buffer, destination M.Socksaddr) error { - vBuf := buf.New() - vBuf.Write(buffer.Bytes()) - endpoint := toDestination(destination, net.Network_UDP) - vBuf.UDP = &endpoint - return w.Writer.WriteMultiBuffer(buf.MultiBuffer{vBuf}) -} - -func (w *packetConnWrapper) Close() error { - buf.ReleaseMulti(w.cached) - return nil -} - -func returnError(err error) error { - if E.IsClosed(err) { - return nil - } - return err -} diff --git a/transport/internet/system_dialer.go b/transport/internet/system_dialer.go index 93cf404e..5a68144d 100644 --- a/transport/internet/system_dialer.go +++ b/transport/internet/system_dialer.go @@ -5,6 +5,7 @@ import ( "syscall" "time" + "github.com/sagernet/sing/common/control" "github.com/xtls/xray-core/common/net" "github.com/xtls/xray-core/common/session" "github.com/xtls/xray-core/features/dns" @@ -18,7 +19,7 @@ type SystemDialer interface { } type DefaultSystemDialer struct { - controllers []controller + controllers []control.Func dns dns.Client obm outbound.Manager } @@ -81,6 +82,11 @@ func (d *DefaultSystemDialer) Dial(ctx context.Context, src net.Address, dest ne if sockopt != nil || len(d.controllers) > 0 { dialer.Control = func(network, address string, c syscall.RawConn) error { + for _, ctl := range d.controllers { + if err := ctl(network, address, c); err != nil { + newError("failed to apply external controller").Base(err).WriteToLog(session.ExportIDToError(ctx)) + } + } return c.Control(func(fd uintptr) { if sockopt != nil { if err := applyOutboundSocketOptions(network, address, fd, sockopt); err != nil { @@ -92,12 +98,6 @@ func (d *DefaultSystemDialer) Dial(ctx context.Context, src net.Address, dest ne } } } - - for _, ctl := range d.controllers { - if err := ctl(network, address, fd); err != nil { - newError("failed to apply external controller").Base(err).WriteToLog(session.ExportIDToError(ctx)) - } - } }) } } @@ -185,7 +185,7 @@ func UseAlternativeSystemDialer(dialer SystemDialer) { // It only works when effective dialer is the default dialer. // // xray:api:beta -func RegisterDialerController(ctl func(network, address string, fd uintptr) error) error { +func RegisterDialerController(ctl control.Func) error { if ctl == nil { return newError("nil listener controller") } diff --git a/transport/internet/system_listener.go b/transport/internet/system_listener.go index 04694383..60979062 100644 --- a/transport/internet/system_listener.go +++ b/transport/internet/system_listener.go @@ -10,21 +10,26 @@ import ( "time" "github.com/pires/go-proxyproto" + "github.com/sagernet/sing/common/control" "github.com/xtls/xray-core/common/net" "github.com/xtls/xray-core/common/session" ) var effectiveListener = DefaultListener{} -type controller func(network, address string, fd uintptr) error - type DefaultListener struct { - controllers []controller + controllers []control.Func } -func getControlFunc(ctx context.Context, sockopt *SocketConfig, controllers []controller) func(network, address string, c syscall.RawConn) error { +func getControlFunc(ctx context.Context, sockopt *SocketConfig, controllers []control.Func) func(network, address string, c syscall.RawConn) error { return func(network, address string, c syscall.RawConn) error { return c.Control(func(fd uintptr) { + for _, controller := range controllers { + if err := controller(network, address, c); err != nil { + newError("failed to apply external controller").Base(err).WriteToLog(session.ExportIDToError(ctx)) + } + } + if sockopt != nil { if err := applyInboundSocketOptions(network, fd, sockopt); err != nil { newError("failed to apply socket options to incoming connection").Base(err).WriteToLog(session.ExportIDToError(ctx)) @@ -32,12 +37,6 @@ func getControlFunc(ctx context.Context, sockopt *SocketConfig, controllers []co } setReusePort(fd) - - for _, controller := range controllers { - if err := controller(network, address, fd); err != nil { - newError("failed to apply external controller").Base(err).WriteToLog(session.ExportIDToError(ctx)) - } - } }) } } @@ -117,7 +116,7 @@ func (dl *DefaultListener) ListenPacket(ctx context.Context, addr net.Addr, sock // The controller can be used to operate on file descriptors before they are put into use. // // xray:api:beta -func RegisterListenerController(controller func(network, address string, fd uintptr) error) error { +func RegisterListenerController(controller control.Func) error { if controller == nil { return newError("nil listener controller") } diff --git a/transport/internet/system_listener_test.go b/transport/internet/system_listener_test.go index 0fcc9a95..390888e7 100644 --- a/transport/internet/system_listener_test.go +++ b/transport/internet/system_listener_test.go @@ -3,8 +3,10 @@ package internet_test import ( "context" "net" + "syscall" "testing" + "github.com/sagernet/sing/common/control" "github.com/xtls/xray-core/common" "github.com/xtls/xray-core/transport/internet" ) @@ -12,9 +14,11 @@ import ( func TestRegisterListenerController(t *testing.T) { var gotFd uintptr - common.Must(internet.RegisterListenerController(func(network string, addr string, fd uintptr) error { - gotFd = fd - return nil + common.Must(internet.RegisterListenerController(func(network, address string, conn syscall.RawConn) error { + return control.Raw(conn, func(fd uintptr) error { + gotFd = fd + return nil + }) })) conn, err := internet.ListenSystemPacket(context.Background(), &net.UDPAddr{