XHTTP client: Refactor "packet-up" mode, chasing "stream-up" (#4150)

* Add wroteRequest (waiting for new quic-go)

* Use XTLS/quic-go instead

* Client doesn't need `scMaxConcurrentPosts` anymore

* GotConn is available in H3

* `scMaxConcurrentPosts` -> `scMaxBufferedPosts` (server only, 30 by default)

Fixes https://github.com/XTLS/Xray-core/issues/4100
RPRX, 2024-12-11 14:05:39 +00:00 (committed by GitHub)
parent 6be3c35db8
commit 8cd9a74376
10 changed files with 235 additions and 250 deletions
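
Background on the wroteRequest signal referenced in the bullets above: httptrace.ClientTrace.WroteRequest fires once a request's body has been fully written to the connection, which lets the client pace sequential POSTs without waiting for server responses. The following is a minimal standalone sketch of that pattern, not code from this commit; the sendChunk helper and the plain channel are illustrative stand-ins for the dialer's upload loop and xray's signal/done.

package main

import (
    "bytes"
    "context"
    "net/http"
    "net/http/httptrace"
)

// sendChunk POSTs one chunk and returns as soon as the request body has been
// written (or the request fails first), mirroring how the refactored uploader
// gates the next chunk on WroteRequest rather than on the server's response.
func sendChunk(ctx context.Context, url string, chunk []byte) error {
    wrote := make(chan struct{})
    ctx = httptrace.WithClientTrace(ctx, &httptrace.ClientTrace{
        WroteRequest: func(httptrace.WroteRequestInfo) { close(wrote) },
    })

    req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(chunk))
    if err != nil {
        return err
    }

    result := make(chan error, 1)
    go func() {
        resp, err := http.DefaultClient.Do(req)
        if resp != nil {
            resp.Body.Close()
        }
        result <- err
    }()

    select {
    case <-wrote: // body has left the client; safe to prepare the next POST
        return nil
    case err := <-result: // request failed before the body was written
        return err
    }
}

func main() {
    _ = sendChunk(context.Background(), "http://127.0.0.1:8080/upload/0", []byte("example chunk"))
}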


@@ -5,6 +5,7 @@ import (
     gotls "crypto/tls"
     "io"
     "net/http"
+    "net/http/httptrace"
     "net/url"
     "strconv"
     "sync"
@@ -16,7 +17,7 @@ import (
     "github.com/xtls/xray-core/common/buf"
     "github.com/xtls/xray-core/common/errors"
     "github.com/xtls/xray-core/common/net"
-    "github.com/xtls/xray-core/common/signal/semaphore"
+    "github.com/xtls/xray-core/common/signal/done"
     "github.com/xtls/xray-core/common/uuid"
     "github.com/xtls/xray-core/transport/internet"
     "github.com/xtls/xray-core/transport/internet/browser_dialer"
@@ -249,7 +250,6 @@ func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.Me
     tlsConfig := tls.ConfigFromStreamSettings(streamSettings)
     realityConfig := reality.ConfigFromStreamSettings(streamSettings)
-    scMaxConcurrentPosts := transportConfiguration.GetNormalizedScMaxConcurrentPosts()
     scMaxEachPostBytes := transportConfiguration.GetNormalizedScMaxEachPostBytes()
     scMinPostsIntervalMs := transportConfiguration.GetNormalizedScMinPostsIntervalMs()
@@ -386,54 +386,60 @@ func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.Me
     }
     go func() {
-        requestsLimiter := semaphore.New(int(scMaxConcurrentPosts.roll()))
-        var requestCounter int64
+        var seq int64
+        var lastWrite time.Time
-        lastWrite := time.Now()
-        // by offloading the uploads into a buffered pipe, multiple conn.Write
-        // calls get automatically batched together into larger POST requests.
-        // without batching, bandwidth is extremely limited.
         for {
+            wroteRequest := done.New()
+            ctx := httptrace.WithClientTrace(ctx, &httptrace.ClientTrace{
+                WroteRequest: func(httptrace.WroteRequestInfo) {
+                    wroteRequest.Close()
+                },
+            })
+            // this intentionally makes a shallow-copy of the struct so we
+            // can reassign Path (potentially concurrently)
+            url := requestURL
+            url.Path += "/" + strconv.FormatInt(seq, 10)
+            // reassign query to get different padding
+            url.RawQuery = transportConfiguration.GetNormalizedQuery()
+            seq += 1
+            if scMinPostsIntervalMs.From > 0 {
+                sleep := time.Duration(scMinPostsIntervalMs.roll())*time.Millisecond - time.Since(lastWrite)
+                if sleep > 0 {
+                    time.Sleep(sleep)
+                }
+            }
+            // by offloading the uploads into a buffered pipe, multiple conn.Write
+            // calls get automatically batched together into larger POST requests.
+            // without batching, bandwidth is extremely limited.
             chunk, err := uploadPipeReader.ReadMultiBuffer()
             if err != nil {
                 break
             }
-            <-requestsLimiter.Wait()
-            seq := requestCounter
-            requestCounter += 1
+            lastWrite = time.Now()
             go func() {
-                defer requestsLimiter.Signal()
-                // this intentionally makes a shallow-copy of the struct so we
-                // can reassign Path (potentially concurrently)
-                url := requestURL
-                url.Path += "/" + strconv.FormatInt(seq, 10)
-                // reassign query to get different padding
-                url.RawQuery = transportConfiguration.GetNormalizedQuery()
                 err := httpClient.SendUploadRequest(
                     context.WithoutCancel(ctx),
                     url.String(),
                     &buf.MultiBufferContainer{MultiBuffer: chunk},
                     int64(chunk.Len()),
                 )
+                wroteRequest.Close()
                 if err != nil {
                     errors.LogInfoInner(ctx, err, "failed to send upload")
                     uploadPipeReader.Interrupt()
                 }
             }()
-            if scMinPostsIntervalMs.From > 0 {
-                roll := time.Duration(scMinPostsIntervalMs.roll()) * time.Millisecond
-                if time.Since(lastWrite) < roll {
-                    time.Sleep(roll)
-                }
-                lastWrite = time.Now()
+            if _, ok := httpClient.(*DefaultDialerClient); ok {
+                <-wroteRequest.Wait()
             }
         }
     }()
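
The server-side half of the last commit-message bullet (scMaxConcurrentPosts -> scMaxBufferedPosts, 30 by default) is not among the dialer hunks shown above. Below is a rough sketch of what a "default to 30" normalization accessor could look like; the field and method names are assumptions for illustration, not copied from the commit.

// Illustrative fragment; not the commit's actual server-side code.
package splithttp

type Config struct {
    ScMaxBufferedPosts int32
}

// GetNormalizedScMaxBufferedPosts falls back to the default of 30 mentioned
// in the commit message when the setting is unset or non-positive.
func (c *Config) GetNormalizedScMaxBufferedPosts() int {
    if c == nil || c.ScMaxBufferedPosts <= 0 {
        return 30
    }
    return int(c.ScMaxBufferedPosts)
}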