Add SplitHTTP Browser Dialer support (#3484)

This commit is contained in:
mmmray 2024-07-11 09:56:20 +02:00 committed by GitHub
parent 308f0c64c3
commit c8f6ba9ff0
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
11 changed files with 536 additions and 294 deletions

View file

@ -0,0 +1,39 @@
package splithttp
import (
"context"
"io"
"io/ioutil"
gonet "net"
"github.com/xtls/xray-core/transport/internet/browser_dialer"
"github.com/xtls/xray-core/transport/internet/websocket"
)
// implements splithttp.DialerClient in terms of browser dialer
// has no fields because everything is global state :O)
type BrowserDialerClient struct{}
func (c *BrowserDialerClient) OpenDownload(ctx context.Context, baseURL string) (io.ReadCloser, gonet.Addr, gonet.Addr, error) {
conn, err := browser_dialer.DialGet(baseURL)
dummyAddr := &gonet.IPAddr{}
if err != nil {
return nil, dummyAddr, dummyAddr, err
}
return websocket.NewConnection(conn, dummyAddr, nil), conn.RemoteAddr(), conn.LocalAddr(), nil
}
func (c *BrowserDialerClient) SendUploadRequest(ctx context.Context, url string, payload io.ReadWriteCloser, contentLength int64) error {
bytes, err := ioutil.ReadAll(payload)
if err != nil {
return err
}
err = browser_dialer.DialPost(url, bytes)
if err != nil {
return err
}
return nil
}

View file

@ -0,0 +1,169 @@
package splithttp
import (
"bytes"
"context"
"io"
gonet "net"
"net/http"
"net/http/httptrace"
"sync"
"github.com/xtls/xray-core/common"
"github.com/xtls/xray-core/common/errors"
"github.com/xtls/xray-core/common/net"
"github.com/xtls/xray-core/common/signal/done"
)
// interface to abstract between use of browser dialer, vs net/http
type DialerClient interface {
// (ctx, baseURL, payload) -> err
// baseURL already contains sessionId and seq
SendUploadRequest(context.Context, string, io.ReadWriteCloser, int64) error
// (ctx, baseURL) -> (downloadReader, remoteAddr, localAddr)
// baseURL already contains sessionId
OpenDownload(context.Context, string) (io.ReadCloser, net.Addr, net.Addr, error)
}
// implements splithttp.DialerClient in terms of direct network connections
type DefaultDialerClient struct {
transportConfig *Config
download *http.Client
upload *http.Client
isH2 bool
// pool of net.Conn, created using dialUploadConn
uploadRawPool *sync.Pool
dialUploadConn func(ctxInner context.Context) (net.Conn, error)
}
func (c *DefaultDialerClient) OpenDownload(ctx context.Context, baseURL string) (io.ReadCloser, gonet.Addr, gonet.Addr, error) {
var remoteAddr gonet.Addr
var localAddr gonet.Addr
// this is done when the TCP/UDP connection to the server was established,
// and we can unblock the Dial function and print correct net addresses in
// logs
gotConn := done.New()
var downResponse io.ReadCloser
gotDownResponse := done.New()
go func() {
trace := &httptrace.ClientTrace{
GotConn: func(connInfo httptrace.GotConnInfo) {
remoteAddr = connInfo.Conn.RemoteAddr()
localAddr = connInfo.Conn.LocalAddr()
gotConn.Close()
},
}
// in case we hit an error, we want to unblock this part
defer gotConn.Close()
req, err := http.NewRequestWithContext(
httptrace.WithClientTrace(ctx, trace),
"GET",
baseURL,
nil,
)
if err != nil {
errors.LogInfoInner(ctx, err, "failed to construct download http request")
gotDownResponse.Close()
return
}
req.Header = c.transportConfig.GetRequestHeader()
response, err := c.download.Do(req)
gotConn.Close()
if err != nil {
errors.LogInfoInner(ctx, err, "failed to send download http request")
gotDownResponse.Close()
return
}
if response.StatusCode != 200 {
response.Body.Close()
errors.LogInfo(ctx, "invalid status code on download:", response.Status)
gotDownResponse.Close()
return
}
downResponse = response.Body
gotDownResponse.Close()
}()
// we want to block Dial until we know the remote address of the server,
// for logging purposes
<-gotConn.Wait()
lazyDownload := &LazyReader{
CreateReader: func() (io.ReadCloser, error) {
<-gotDownResponse.Wait()
if downResponse == nil {
return nil, errors.New("downResponse failed")
}
return downResponse, nil
},
}
return lazyDownload, remoteAddr, localAddr, nil
}
func (c *DefaultDialerClient) SendUploadRequest(ctx context.Context, url string, payload io.ReadWriteCloser, contentLength int64) error {
req, err := http.NewRequest("POST", url, payload)
req.ContentLength = contentLength
if err != nil {
return err
}
req.Header = c.transportConfig.GetRequestHeader()
if c.isH2 {
resp, err := c.upload.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
return errors.New("bad status code:", resp.Status)
}
} else {
// stringify the entire HTTP/1.1 request so it can be
// safely retried. if instead req.Write is called multiple
// times, the body is already drained after the first
// request
requestBytes := new(bytes.Buffer)
common.Must(req.Write(requestBytes))
var uploadConn any
for {
uploadConn = c.uploadRawPool.Get()
newConnection := uploadConn == nil
if newConnection {
uploadConn, err = c.dialUploadConn(context.WithoutCancel(ctx))
if err != nil {
return err
}
}
_, err = uploadConn.(net.Conn).Write(requestBytes.Bytes())
// if the write failed, we try another connection from
// the pool, until the write on a new connection fails.
// failed writes to a pooled connection are normal when
// the connection has been closed in the meantime.
if err == nil {
break
} else if newConnection {
return err
}
}
c.uploadRawPool.Put(uploadConn)
}
return nil
}

View file

@ -1,13 +1,10 @@
package splithttp
import (
"bytes"
"context"
gotls "crypto/tls"
"io"
gonet "net"
"net/http"
"net/http/httptrace"
"net/url"
"strconv"
"sync"
@ -17,10 +14,10 @@ import (
"github.com/xtls/xray-core/common/buf"
"github.com/xtls/xray-core/common/errors"
"github.com/xtls/xray-core/common/net"
"github.com/xtls/xray-core/common/signal/done"
"github.com/xtls/xray-core/common/signal/semaphore"
"github.com/xtls/xray-core/common/uuid"
"github.com/xtls/xray-core/transport/internet"
"github.com/xtls/xray-core/transport/internet/browser_dialer"
"github.com/xtls/xray-core/transport/internet/stat"
"github.com/xtls/xray-core/transport/internet/tls"
"github.com/xtls/xray-core/transport/pipe"
@ -32,32 +29,31 @@ type dialerConf struct {
*internet.MemoryStreamConfig
}
type reusedClient struct {
download *http.Client
upload *http.Client
isH2 bool
// pool of net.Conn, created using dialUploadConn
uploadRawPool *sync.Pool
dialUploadConn func(ctxInner context.Context) (net.Conn, error)
}
var (
globalDialerMap map[dialerConf]reusedClient
globalDialerMap map[dialerConf]DialerClient
globalDialerAccess sync.Mutex
)
func getHTTPClient(ctx context.Context, dest net.Destination, streamSettings *internet.MemoryStreamConfig) reusedClient {
func getHTTPClient(ctx context.Context, dest net.Destination, streamSettings *internet.MemoryStreamConfig) DialerClient {
if browser_dialer.HasBrowserDialer() {
return &BrowserDialerClient{}
}
globalDialerAccess.Lock()
defer globalDialerAccess.Unlock()
if globalDialerMap == nil {
globalDialerMap = make(map[dialerConf]reusedClient)
globalDialerMap = make(map[dialerConf]DialerClient)
}
if client, found := globalDialerMap[dialerConf{dest, streamSettings}]; found {
return client
}
if browser_dialer.HasBrowserDialer() {
return &BrowserDialerClient{}
}
tlsConfig := tls.ConfigFromStreamSettings(streamSettings)
isH2 := tlsConfig != nil && !(len(tlsConfig.NextProtocol) == 1 && tlsConfig.NextProtocol[0] == "http/1.1")
@ -116,7 +112,8 @@ func getHTTPClient(ctx context.Context, dest net.Destination, streamSettings *in
uploadTransport = nil
}
client := reusedClient{
client := &DefaultDialerClient{
transportConfig: streamSettings.ProtocolSettings.(*Config),
download: &http.Client{
Transport: downloadTransport,
},
@ -160,80 +157,9 @@ func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.Me
httpClient := getHTTPClient(ctx, dest, streamSettings)
var remoteAddr gonet.Addr
var localAddr gonet.Addr
// this is done when the TCP/UDP connection to the server was established,
// and we can unblock the Dial function and print correct net addresses in
// logs
gotConn := done.New()
var downResponse io.ReadCloser
gotDownResponse := done.New()
sessionIdUuid := uuid.New()
sessionId := sessionIdUuid.String()
go func() {
trace := &httptrace.ClientTrace{
GotConn: func(connInfo httptrace.GotConnInfo) {
remoteAddr = connInfo.Conn.RemoteAddr()
localAddr = connInfo.Conn.LocalAddr()
gotConn.Close()
},
}
// in case we hit an error, we want to unblock this part
defer gotConn.Close()
req, err := http.NewRequestWithContext(
httptrace.WithClientTrace(context.WithoutCancel(ctx), trace),
"GET",
requestURL.String()+sessionId,
nil,
)
if err != nil {
errors.LogInfoInner(ctx, err, "failed to construct download http request")
gotDownResponse.Close()
return
}
req.Header = transportConfiguration.GetRequestHeader()
response, err := httpClient.download.Do(req)
gotConn.Close()
if err != nil {
errors.LogInfoInner(ctx, err, "failed to send download http request")
gotDownResponse.Close()
return
}
if response.StatusCode != 200 {
response.Body.Close()
errors.LogInfo(ctx, "invalid status code on download:", response.Status)
gotDownResponse.Close()
return
}
// skip "ooooooooook" response
trashHeader := []byte{0}
for {
_, err = io.ReadFull(response.Body, trashHeader)
if err != nil {
response.Body.Close()
errors.LogInfoInner(ctx, err, "failed to read initial response")
gotDownResponse.Close()
return
}
if trashHeader[0] == 'k' {
break
}
}
downResponse = response.Body
gotDownResponse.Close()
}()
uploadUrl := requestURL.String() + sessionId + "/"
baseURL := requestURL.String() + sessionId
uploadPipeReader, uploadPipeWriter := pipe.New(pipe.WithSizeLimit(maxUploadSize))
@ -252,97 +178,55 @@ func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.Me
<-requestsLimiter.Wait()
url := uploadUrl + strconv.FormatInt(requestCounter, 10)
seq := requestCounter
requestCounter += 1
go func() {
defer requestsLimiter.Signal()
req, err := http.NewRequest("POST", url, &buf.MultiBufferContainer{MultiBuffer: chunk})
err := httpClient.SendUploadRequest(
context.WithoutCancel(ctx),
baseURL+"/"+strconv.FormatInt(seq, 10),
&buf.MultiBufferContainer{MultiBuffer: chunk},
int64(chunk.Len()),
)
if err != nil {
errors.LogInfoInner(ctx, err, "failed to send upload")
uploadPipeReader.Interrupt()
return
}
req.ContentLength = int64(chunk.Len())
req.Header = transportConfiguration.GetRequestHeader()
if httpClient.isH2 {
resp, err := httpClient.upload.Do(req)
if err != nil {
errors.LogInfoInner(ctx, err, "failed to send upload")
uploadPipeReader.Interrupt()
return
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
errors.LogInfo(ctx, "failed to send upload, bad status code:", resp.Status)
uploadPipeReader.Interrupt()
return
}
} else {
var uploadConn any
// stringify the entire HTTP/1.1 request so it can be
// safely retried. if instead req.Write is called multiple
// times, the body is already drained after the first
// request
requestBytes := new(bytes.Buffer)
common.Must(req.Write(requestBytes))
for {
uploadConn = httpClient.uploadRawPool.Get()
newConnection := uploadConn == nil
if newConnection {
uploadConn, err = httpClient.dialUploadConn(context.WithoutCancel(ctx))
if err != nil {
errors.LogInfoInner(ctx, err, "failed to connect upload")
uploadPipeReader.Interrupt()
return
}
}
_, err = uploadConn.(net.Conn).Write(requestBytes.Bytes())
// if the write failed, we try another connection from
// the pool, until the write on a new connection fails.
// failed writes to a pooled connection are normal when
// the connection has been closed in the meantime.
if err == nil {
break
} else if newConnection {
errors.LogInfoInner(ctx, err, "failed to send upload")
uploadPipeReader.Interrupt()
return
}
}
httpClient.uploadRawPool.Put(uploadConn)
}
}()
}
}()
// we want to block Dial until we know the remote address of the server,
// for logging purposes
<-gotConn.Wait()
lazyRawDownload, remoteAddr, localAddr, err := httpClient.OpenDownload(context.WithoutCancel(ctx), baseURL)
if err != nil {
return nil, err
}
lazyDownload := &LazyReader{
CreateReader: func() (io.ReadCloser, error) {
// skip "ooooooooook" response
trashHeader := []byte{0}
for {
_, err := io.ReadFull(lazyRawDownload, trashHeader)
if err != nil {
return nil, errors.New("failed to read initial response").Base(err)
}
if trashHeader[0] == 'k' {
break
}
}
return lazyRawDownload, nil
},
}
// necessary in order to send larger chunks in upload
bufferedUploadPipeWriter := buf.NewBufferedWriter(uploadPipeWriter)
bufferedUploadPipeWriter.SetBuffered(false)
lazyDownload := &LazyReader{
CreateReader: func() (io.ReadCloser, error) {
<-gotDownResponse.Wait()
if downResponse == nil {
return nil, errors.New("downResponse failed")
}
return downResponse, nil
},
}
conn := splitConn{
writer: bufferedUploadPipeWriter,
reader: lazyDownload,

View file

@ -32,7 +32,7 @@ type requestHandler struct {
}
type httpSession struct {
uploadQueue *UploadQueue
uploadQueue *uploadQueue
// for as long as the GET request is not opened by the client, this will be
// open ("undone"), and the session may be expired within a certain TTL.
// after the client connects, this becomes "done" and the session lives as
@ -163,7 +163,7 @@ func (h *requestHandler) ServeHTTP(writer http.ResponseWriter, request *http.Req
writer.Header().Set("X-Accel-Buffering", "no")
// magic header to make the HTTP middle box consider this as SSE to disable buffer
writer.Header().Set("Content-Type", "text/event-stream")
writer.WriteHeader(http.StatusOK)
// send a chunk immediately to enable CDN streaming.
// many CDN buffer the response headers until the origin starts sending

View file

@ -15,7 +15,7 @@ type Packet struct {
Seq uint64
}
type UploadQueue struct {
type uploadQueue struct {
pushedPackets chan Packet
heap uploadHeap
nextSeq uint64
@ -23,8 +23,8 @@ type UploadQueue struct {
maxPackets int
}
func NewUploadQueue(maxPackets int) *UploadQueue {
return &UploadQueue{
func NewUploadQueue(maxPackets int) *uploadQueue {
return &uploadQueue{
pushedPackets: make(chan Packet, maxPackets),
heap: uploadHeap{},
nextSeq: 0,
@ -33,7 +33,7 @@ func NewUploadQueue(maxPackets int) *UploadQueue {
}
}
func (h *UploadQueue) Push(p Packet) error {
func (h *uploadQueue) Push(p Packet) error {
if h.closed {
return errors.New("splithttp packet queue closed")
}
@ -42,13 +42,13 @@ func (h *UploadQueue) Push(p Packet) error {
return nil
}
func (h *UploadQueue) Close() error {
func (h *uploadQueue) Close() error {
h.closed = true
close(h.pushedPackets)
return nil
}
func (h *UploadQueue) Read(b []byte) (int, error) {
func (h *uploadQueue) Read(b []byte) (int, error) {
if h.closed {
return 0, io.EOF
}