From db934f083218dddf40811e8f716074de2c280160 Mon Sep 17 00:00:00 2001 From: RPRX <63339210+RPRX@users.noreply.github.com> Date: Fri, 20 Dec 2024 14:35:33 +0000 Subject: [PATCH] XHTTP client: Merge Open* into OpenStream(), and more https://github.com/XTLS/Xray-core/issues/4148#issuecomment-2557066988 --- go.mod | 2 +- go.sum | 4 +- .../internet/splithttp/browser_client.go | 18 +- transport/internet/splithttp/client.go | 172 ++++-------------- transport/internet/splithttp/dialer.go | 44 ++--- transport/internet/splithttp/lazy_reader.go | 47 ----- 6 files changed, 61 insertions(+), 226 deletions(-) delete mode 100644 transport/internet/splithttp/lazy_reader.go diff --git a/go.mod b/go.mod index ab084d32..3302a075 100644 --- a/go.mod +++ b/go.mod @@ -19,7 +19,7 @@ require ( github.com/stretchr/testify v1.10.0 github.com/v2fly/ss-bloomring v0.0.0-20210312155135-28617310f63e github.com/vishvananda/netlink v1.3.0 - github.com/xtls/quic-go v0.46.0 + github.com/xtls/quic-go v0.0.0-20241220091641-6f5777d1c087 github.com/xtls/reality v0.0.0-20240712055506-48f0b2d5ed6d go4.org/netipx v0.0.0-20231129151722-fdeea329fbba golang.org/x/crypto v0.31.0 diff --git a/go.sum b/go.sum index 87db06b4..9728dfca 100644 --- a/go.sum +++ b/go.sum @@ -68,8 +68,8 @@ github.com/vishvananda/netlink v1.3.0 h1:X7l42GfcV4S6E4vHTsw48qbrV+9PVojNfIhZcwQ github.com/vishvananda/netlink v1.3.0/go.mod h1:i6NetklAujEcC6fK0JPjT8qSwWyO0HLn4UKG+hGqeJs= github.com/vishvananda/netns v0.0.4 h1:Oeaw1EM2JMxD51g9uhtC0D7erkIjgmj8+JZc26m1YX8= github.com/vishvananda/netns v0.0.4/go.mod h1:SpkAiCQRtJ6TvvxPnOSyH3BMl6unz3xZlaprSwhNNJM= -github.com/xtls/quic-go v0.46.0 h1:yfv6h+/+iOeFhFnmJiwlZgnJjr4fPb4N4rQelffbs1U= -github.com/xtls/quic-go v0.46.0/go.mod h1:mN9lAuc8Vt7eHvnQkDIH5+uHh+DcLmTBma9rLqk/rPY= +github.com/xtls/quic-go v0.0.0-20241220091641-6f5777d1c087 h1:kKPg/cJPSKnE50VXVBskDYYSBkl4X3sMCIbTy+XKNGk= +github.com/xtls/quic-go v0.0.0-20241220091641-6f5777d1c087/go.mod h1:mN9lAuc8Vt7eHvnQkDIH5+uHh+DcLmTBma9rLqk/rPY= github.com/xtls/reality v0.0.0-20240712055506-48f0b2d5ed6d h1:+B97uD9uHLgAAulhigmys4BVwZZypzK7gPN3WtpgRJg= github.com/xtls/reality v0.0.0-20240712055506-48f0b2d5ed6d/go.mod h1:dm4y/1QwzjGaK17ofi0Vs6NpKAHegZky8qk6J2JJZAE= github.com/yuin/goldmark v1.4.1/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= diff --git a/transport/internet/splithttp/browser_client.go b/transport/internet/splithttp/browser_client.go index 1b5f6d6b..d5d3d942 100644 --- a/transport/internet/splithttp/browser_client.go +++ b/transport/internet/splithttp/browser_client.go @@ -17,16 +17,12 @@ func (c *BrowserDialerClient) IsClosed() bool { panic("not implemented yet") } -func (c *BrowserDialerClient) Open(ctx context.Context, pureURL string) (io.WriteCloser, io.ReadCloser) { - panic("not implemented yet") -} +func (c *BrowserDialerClient) OpenStream(ctx context.Context, url string, body io.Reader, uploadOnly bool) (io.ReadCloser, gonet.Addr, gonet.Addr, error) { + if body != nil { + panic("not implemented yet") + } -func (c *BrowserDialerClient) OpenUpload(ctx context.Context, baseURL string) io.WriteCloser { - panic("not implemented yet") -} - -func (c *BrowserDialerClient) OpenDownload(ctx context.Context, baseURL string) (io.ReadCloser, gonet.Addr, gonet.Addr, error) { - conn, err := browser_dialer.DialGet(baseURL) + conn, err := browser_dialer.DialGet(url) dummyAddr := &gonet.IPAddr{} if err != nil { return nil, dummyAddr, dummyAddr, err @@ -35,8 +31,8 @@ func (c *BrowserDialerClient) OpenDownload(ctx context.Context, baseURL string) return websocket.NewConnection(conn, dummyAddr, nil, 0), conn.RemoteAddr(), conn.LocalAddr(), nil } -func (c *BrowserDialerClient) SendUploadRequest(ctx context.Context, url string, payload io.ReadWriteCloser, contentLength int64) error { - bytes, err := io.ReadAll(payload) +func (c *BrowserDialerClient) PostPacket(ctx context.Context, url string, body io.Reader, contentLength int64) error { + bytes, err := io.ReadAll(body) if err != nil { return err } diff --git a/transport/internet/splithttp/client.go b/transport/internet/splithttp/client.go index 3baea6f0..c6cfee55 100644 --- a/transport/internet/splithttp/client.go +++ b/transport/internet/splithttp/client.go @@ -20,21 +20,11 @@ import ( type DialerClient interface { IsClosed() bool - // (ctx, baseURL, payload) -> err - // baseURL already contains sessionId and seq - SendUploadRequest(context.Context, string, io.ReadWriteCloser, int64) error + // ctx, url, body, uploadOnly + OpenStream(context.Context, string, io.Reader, bool) (io.ReadCloser, net.Addr, net.Addr, error) - // (ctx, baseURL) -> (downloadReader, remoteAddr, localAddr) - // baseURL already contains sessionId - OpenDownload(context.Context, string) (io.ReadCloser, net.Addr, net.Addr, error) - - // (ctx, baseURL) -> uploadWriter - // baseURL already contains sessionId - OpenUpload(context.Context, string) io.WriteCloser - - // (ctx, pureURL) -> (uploadWriter, downloadReader) - // pureURL can not contain sessionId - Open(context.Context, string) (io.WriteCloser, io.ReadCloser) + // ctx, url, body, contentLength + PostPacket(context.Context, string, io.Reader, int64) error } // implements splithttp.DialerClient in terms of direct network connections @@ -52,136 +42,56 @@ func (c *DefaultDialerClient) IsClosed() bool { return c.closed } -func (c *DefaultDialerClient) Open(ctx context.Context, pureURL string) (io.WriteCloser, io.ReadCloser) { - reader, writer := io.Pipe() - req, _ := http.NewRequestWithContext(ctx, "POST", pureURL, reader) - req.Header = c.transportConfig.GetRequestHeader() - if !c.transportConfig.NoGRPCHeader { - req.Header.Set("Content-Type", "application/grpc") - } - wrc := &WaitReadCloser{Wait: make(chan struct{})} - go func() { - response, err := c.client.Do(req) - if err != nil || response.StatusCode != 200 { - if err != nil { - errors.LogInfoInner(ctx, err, "failed to open ", pureURL) - } else { - // c.closed = true - response.Body.Close() - errors.LogInfo(ctx, "unexpected status ", response.StatusCode) - } - wrc.Close() - return - } - wrc.Set(response.Body) - }() - return writer, wrc -} - -func (c *DefaultDialerClient) OpenUpload(ctx context.Context, baseURL string) io.WriteCloser { - reader, writer := io.Pipe() - req, _ := http.NewRequestWithContext(ctx, "POST", baseURL, reader) - req.Header = c.transportConfig.GetRequestHeader() - if !c.transportConfig.NoGRPCHeader { - req.Header.Set("Content-Type", "application/grpc") - } - go func() { - if resp, err := c.client.Do(req); err == nil { - if resp.StatusCode != 200 { - // c.closed = true - } - resp.Body.Close() - } - }() - return writer -} - -func (c *DefaultDialerClient) OpenDownload(ctx context.Context, baseURL string) (io.ReadCloser, gonet.Addr, gonet.Addr, error) { - var remoteAddr gonet.Addr - var localAddr gonet.Addr +func (c *DefaultDialerClient) OpenStream(ctx context.Context, url string, body io.Reader, uploadOnly bool) (wrc io.ReadCloser, remoteAddr, localAddr gonet.Addr, err error) { // this is done when the TCP/UDP connection to the server was established, // and we can unblock the Dial function and print correct net addresses in // logs gotConn := done.New() + ctx = httptrace.WithClientTrace(ctx, &httptrace.ClientTrace{ + GotConn: func(connInfo httptrace.GotConnInfo) { + remoteAddr = connInfo.Conn.RemoteAddr() + localAddr = connInfo.Conn.LocalAddr() + gotConn.Close() + }, + }) - var downResponse io.ReadCloser - gotDownResponse := done.New() - - ctx, ctxCancel := context.WithCancel(ctx) + method := "GET" + if body != nil { + method = "POST" + } + req, _ := http.NewRequestWithContext(ctx, method, url, body) + req.Header = c.transportConfig.GetRequestHeader() + if method == "POST" && !c.transportConfig.NoGRPCHeader { + req.Header.Set("Content-Type", "application/grpc") + } + wrc = &WaitReadCloser{Wait: make(chan struct{})} go func() { - trace := &httptrace.ClientTrace{ - GotConn: func(connInfo httptrace.GotConnInfo) { - remoteAddr = connInfo.Conn.RemoteAddr() - localAddr = connInfo.Conn.LocalAddr() - gotConn.Close() - }, - } - - // in case we hit an error, we want to unblock this part - defer gotConn.Close() - - ctx = httptrace.WithClientTrace(ctx, trace) - - req, err := http.NewRequestWithContext( - ctx, - "GET", - baseURL, - nil, - ) + resp, err := c.client.Do(req) if err != nil { - errors.LogInfoInner(ctx, err, "failed to construct download http request") - gotDownResponse.Close() + errors.LogInfoInner(ctx, err, "failed to "+method+" "+url) + gotConn.Close() + wrc.Close() return } - - req.Header = c.transportConfig.GetRequestHeader() - - response, err := c.client.Do(req) - gotConn.Close() - if err != nil { - errors.LogInfoInner(ctx, err, "failed to send download http request") - gotDownResponse.Close() - return - } - - if response.StatusCode != 200 { + if resp.StatusCode != 200 && !uploadOnly { // c.closed = true - response.Body.Close() - errors.LogInfo(ctx, "invalid status code on download:", response.Status) - gotDownResponse.Close() + errors.LogInfo(ctx, "unexpected status ", resp.StatusCode) + } + if resp.StatusCode != 200 || uploadOnly { + resp.Body.Close() + wrc.Close() return } - - downResponse = response.Body - gotDownResponse.Close() + wrc.(*WaitReadCloser).Set(resp.Body) }() <-gotConn.Wait() - - lazyDownload := &LazyReader{ - CreateReader: func() (io.Reader, error) { - <-gotDownResponse.Wait() - if downResponse == nil { - return nil, errors.New("downResponse failed") - } - return downResponse, nil - }, - } - - // workaround for https://github.com/quic-go/quic-go/issues/2143 -- - // always cancel request context so that Close cancels any Read. - // Should then match the behavior of http2 and http1. - reader := downloadBody{ - lazyDownload, - ctxCancel, - } - - return reader, remoteAddr, localAddr, nil + return } -func (c *DefaultDialerClient) SendUploadRequest(ctx context.Context, url string, payload io.ReadWriteCloser, contentLength int64) error { - req, err := http.NewRequestWithContext(ctx, "POST", url, payload) +func (c *DefaultDialerClient) PostPacket(ctx context.Context, url string, body io.Reader, contentLength int64) error { + req, err := http.NewRequestWithContext(ctx, "POST", url, body) if err != nil { return err } @@ -257,16 +167,6 @@ func (c *DefaultDialerClient) SendUploadRequest(ctx context.Context, url string, return nil } -type downloadBody struct { - io.Reader - cancel context.CancelFunc -} - -func (c downloadBody) Close() error { - c.cancel() - return nil -} - type WaitReadCloser struct { Wait chan struct{} io.ReadCloser diff --git a/transport/internet/splithttp/dialer.go b/transport/internet/splithttp/dialer.go index aed499c8..d1b3ef09 100644 --- a/transport/internet/splithttp/dialer.go +++ b/transport/internet/splithttp/dialer.go @@ -343,29 +343,6 @@ func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.Me errors.LogInfo(ctx, fmt.Sprintf("XHTTP is downloading from %s, mode %s, HTTP version %s, host %s", dest2, "stream-down", httpVersion2, requestURL2.Host)) } - var writer io.WriteCloser - var reader io.ReadCloser - var remoteAddr, localAddr net.Addr - var err error - - if mode == "stream-one" { - requestURL.Path = transportConfiguration.GetNormalizedPath() - if xmuxClient != nil { - xmuxClient.LeftRequests.Add(-1) - } - writer, reader = httpClient.Open(context.WithoutCancel(ctx), requestURL.String()) - remoteAddr = &net.TCPAddr{} - localAddr = &net.TCPAddr{} - } else { - if xmuxClient2 != nil { - xmuxClient2.LeftRequests.Add(-1) - } - reader, remoteAddr, localAddr, err = httpClient2.OpenDownload(context.WithoutCancel(ctx), requestURL2.String()) - if err != nil { - return nil, err - } - } - if xmuxClient != nil { xmuxClient.OpenUsage.Add(1) } @@ -374,11 +351,9 @@ func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.Me } var closed atomic.Int32 + reader, writer := io.Pipe() conn := splitConn{ - writer: writer, - reader: reader, - remoteAddr: remoteAddr, - localAddr: localAddr, + writer: writer, onClose: func() { if closed.Add(1) > 1 { return @@ -393,16 +368,27 @@ func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.Me } if mode == "stream-one" { + requestURL.Path = transportConfiguration.GetNormalizedPath() if xmuxClient != nil { xmuxClient.LeftRequests.Add(-1) } + conn.reader, conn.remoteAddr, conn.localAddr, _ = httpClient.OpenStream(context.WithoutCancel(ctx), requestURL.String(), reader, false) return stat.Connection(&conn), nil + } else { // stream-down + var err error + if xmuxClient2 != nil { + xmuxClient2.LeftRequests.Add(-1) + } + conn.reader, conn.remoteAddr, conn.localAddr, err = httpClient2.OpenStream(context.WithoutCancel(ctx), requestURL2.String(), nil, false) + if err != nil { // browser dialer only + return nil, err + } } if mode == "stream-up" { if xmuxClient != nil { xmuxClient.LeftRequests.Add(-1) } - conn.writer = httpClient.OpenUpload(ctx, requestURL.String()) + httpClient.OpenStream(ctx, requestURL.String(), reader, true) return stat.Connection(&conn), nil } @@ -466,7 +452,7 @@ func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.Me } go func() { - err := httpClient.SendUploadRequest( + err := httpClient.PostPacket( context.WithoutCancel(ctx), url.String(), &buf.MultiBufferContainer{MultiBuffer: chunk}, diff --git a/transport/internet/splithttp/lazy_reader.go b/transport/internet/splithttp/lazy_reader.go deleted file mode 100644 index 35d1f436..00000000 --- a/transport/internet/splithttp/lazy_reader.go +++ /dev/null @@ -1,47 +0,0 @@ -package splithttp - -import ( - "io" - "sync" -) - -// Close is intentionally not supported by LazyReader because it's not clear -// how CreateReader should be aborted in case of Close. It's best to wrap -// LazyReader in another struct that handles Close correctly, or better, stop -// using LazyReader entirely. -type LazyReader struct { - readerSync sync.Mutex - CreateReader func() (io.Reader, error) - reader io.Reader - readerError error -} - -func (r *LazyReader) getReader() (io.Reader, error) { - r.readerSync.Lock() - defer r.readerSync.Unlock() - if r.reader != nil { - return r.reader, nil - } - - if r.readerError != nil { - return nil, r.readerError - } - - reader, err := r.CreateReader() - if err != nil { - r.readerError = err - return nil, err - } - - r.reader = reader - return reader, nil -} - -func (r *LazyReader) Read(b []byte) (int, error) { - reader, err := r.getReader() - if err != nil { - return 0, err - } - n, err := reader.Read(b) - return n, err -}