Enable splice for freedom outbound (downlink only)

- Add outbound name
- Add outbound conn in ctx
- Refactor splice: it can be turn on from all inbounds and outbounds
- Refactor splice: Add splice copy to vless inbound
- Fix http error test
- Add freedom splice toggle via env var
- Populate outbound obj in context
- Use CanSpliceCopy to mark a connection
- Turn off splice by default
This commit is contained in:
yuhan6665 2023-05-03 22:21:45 -04:00
parent ae2fa30e01
commit efd32b0fb2
32 changed files with 282 additions and 168 deletions

View file

@ -8,9 +8,7 @@ import (
"crypto/rand"
"io"
"math/big"
"runtime"
"strconv"
"syscall"
"time"
"github.com/xtls/xray-core/common/buf"
@ -20,10 +18,8 @@ import (
"github.com/xtls/xray-core/common/session"
"github.com/xtls/xray-core/common/signal"
"github.com/xtls/xray-core/features/stats"
"github.com/xtls/xray-core/proxy"
"github.com/xtls/xray-core/proxy/vless"
"github.com/xtls/xray-core/transport/internet/reality"
"github.com/xtls/xray-core/transport/internet/stat"
"github.com/xtls/xray-core/transport/internet/tls"
)
const (
@ -206,13 +202,11 @@ func DecodeResponseHeader(reader io.Reader, request *protocol.RequestHeader) (*A
}
// XtlsRead filter and read xtls protocol
func XtlsRead(reader buf.Reader, writer buf.Writer, timer signal.ActivityUpdater, conn net.Conn, rawConn syscall.RawConn,
input *bytes.Reader, rawInput *bytes.Buffer,
counter stats.Counter, ctx context.Context, userUUID []byte, numberOfPacketToFilter *int, enableXtls *bool,
func XtlsRead(reader buf.Reader, writer buf.Writer, timer signal.ActivityUpdater, conn net.Conn, input *bytes.Reader, rawInput *bytes.Buffer,
ctx context.Context, userUUID []byte, numberOfPacketToFilter *int, enableXtls *bool,
isTLS12orAbove *bool, isTLS *bool, cipher *uint16, remainingServerHello *int32,
) error {
err := func() error {
var ct stats.Counter
withinPaddingBuffers := true
shouldSwitchToDirectCopy := false
var remainingContent int32 = -1
@ -220,40 +214,14 @@ func XtlsRead(reader buf.Reader, writer buf.Writer, timer signal.ActivityUpdater
currentCommand := 0
for {
if shouldSwitchToDirectCopy {
shouldSwitchToDirectCopy = false
if inbound := session.InboundFromContext(ctx); inbound != nil && inbound.Conn != nil && (runtime.GOOS == "linux" || runtime.GOOS == "android") {
if _, ok := inbound.User.Account.(*vless.MemoryAccount); inbound.User.Account == nil || ok {
iConn := inbound.Conn
statConn, ok := iConn.(*stat.CounterConnection)
if ok {
iConn = statConn.Connection
}
if tlsConn, ok := iConn.(*tls.Conn); ok {
iConn = tlsConn.NetConn()
} else if realityConn, ok := iConn.(*reality.Conn); ok {
iConn = realityConn.NetConn()
}
if tc, ok := iConn.(*net.TCPConn); ok {
newError("XtlsRead splice").WriteToLog(session.ExportIDToError(ctx))
runtime.Gosched() // necessary
w, err := tc.ReadFrom(conn)
if counter != nil {
counter.Add(w)
}
if statConn != nil && statConn.WriteCounter != nil {
statConn.WriteCounter.Add(w)
}
return err
}
var writerConn net.Conn
if inbound := session.InboundFromContext(ctx); inbound != nil && inbound.Conn != nil {
writerConn = inbound.Conn
if inbound.CanSpliceCopy == 2 {
inbound.CanSpliceCopy = 1 // force the value to 1, don't use setter
}
}
if rawConn != nil {
reader = buf.NewReadVReader(conn, rawConn, nil)
} else {
reader = buf.NewReader(conn)
}
ct = counter
newError("XtlsRead readV").WriteToLog(session.ExportIDToError(ctx))
return proxy.CopyRawConnIfExist(ctx, conn, writerConn, writer, timer)
}
buffer, err := reader.ReadMultiBuffer()
if !buffer.IsEmpty() {
@ -292,9 +260,6 @@ func XtlsRead(reader buf.Reader, writer buf.Writer, timer signal.ActivityUpdater
if *numberOfPacketToFilter > 0 {
XtlsFilterTls(buffer, numberOfPacketToFilter, enableXtls, isTLS12orAbove, isTLS, cipher, remainingServerHello, ctx)
}
if ct != nil {
ct.Add(int64(buffer.Len()))
}
timer.Update()
if werr := writer.WriteMultiBuffer(buffer); werr != nil {
return werr
@ -312,7 +277,7 @@ func XtlsRead(reader buf.Reader, writer buf.Writer, timer signal.ActivityUpdater
}
// XtlsWrite filter and write xtls protocol
func XtlsWrite(reader buf.Reader, writer buf.Writer, timer signal.ActivityUpdater, conn net.Conn, counter stats.Counter,
func XtlsWrite(reader buf.Reader, writer buf.Writer, timer signal.ActivityUpdater, conn net.Conn,
ctx context.Context, numberOfPacketToFilter *int, enableXtls *bool, isTLS12orAbove *bool, isTLS *bool,
cipher *uint16, remainingServerHello *int32,
) error {
@ -349,18 +314,21 @@ func XtlsWrite(reader buf.Reader, writer buf.Writer, timer signal.ActivityUpdate
}
if shouldSwitchToDirectCopy {
encryptBuffer, directBuffer := buf.SplitMulti(buffer, xtlsSpecIndex+1)
length := encryptBuffer.Len()
if !encryptBuffer.IsEmpty() {
timer.Update()
if werr := writer.WriteMultiBuffer(encryptBuffer); werr != nil {
return werr
}
}
buffer = directBuffer
writer = buf.NewWriter(conn)
ct = counter
newError("XtlsWrite writeV ", xtlsSpecIndex, " ", length, " ", buffer.Len()).WriteToLog(session.ExportIDToError(ctx))
time.Sleep(5 * time.Millisecond) // for some device, the first xtls direct packet fails without this delay
if inbound := session.InboundFromContext(ctx); inbound != nil && inbound.CanSpliceCopy == 2 {
inbound.CanSpliceCopy = 1 // force the value to 1, don't use setter
}
buffer = directBuffer
rawConn, _, writerCounter := proxy.UnwrapRawConn(conn)
writer = buf.NewWriter(rawConn)
ct = writerCounter
}
}
if !buffer.IsEmpty() {

View file

@ -10,11 +10,9 @@ import (
"reflect"
"strconv"
"strings"
"syscall"
"time"
"unsafe"
"github.com/pires/go-proxyproto"
"github.com/xtls/xray-core/common"
"github.com/xtls/xray-core/common/buf"
"github.com/xtls/xray-core/common/errors"
@ -30,7 +28,6 @@ import (
feature_inbound "github.com/xtls/xray-core/features/inbound"
"github.com/xtls/xray-core/features/policy"
"github.com/xtls/xray-core/features/routing"
"github.com/xtls/xray-core/features/stats"
"github.com/xtls/xray-core/proxy/vless"
"github.com/xtls/xray-core/proxy/vless/encoding"
"github.com/xtls/xray-core/transport/internet/reality"
@ -182,8 +179,7 @@ func (h *Handler) Process(ctx context.Context, network net.Network, connection s
sid := session.ExportIDToError(ctx)
iConn := connection
statConn, ok := iConn.(*stat.CounterConnection)
if ok {
if statConn, ok := iConn.(*stat.CounterConnection); ok {
iConn = statConn.Connection
}
@ -447,14 +443,12 @@ func (h *Handler) Process(ctx context.Context, network net.Network, connection s
// Flow: requestAddons.Flow,
}
var netConn net.Conn
var rawConn syscall.RawConn
var input *bytes.Reader
var rawInput *bytes.Buffer
switch requestAddons.Flow {
case vless.XRV:
if account.Flow == requestAddons.Flow {
inbound.SetCanSpliceCopy(2)
switch request.Command {
case protocol.RequestCommandUDP:
return newError(requestAddons.Flow + " doesn't support UDP").AtWarning()
@ -467,23 +461,14 @@ func (h *Handler) Process(ctx context.Context, network net.Network, connection s
if tlsConn.ConnectionState().Version != gotls.VersionTLS13 {
return newError(`failed to use `+requestAddons.Flow+`, found outer tls version `, tlsConn.ConnectionState().Version).AtWarning()
}
netConn = tlsConn.NetConn()
t = reflect.TypeOf(tlsConn.Conn).Elem()
p = uintptr(unsafe.Pointer(tlsConn.Conn))
} else if realityConn, ok := iConn.(*reality.Conn); ok {
netConn = realityConn.NetConn()
t = reflect.TypeOf(realityConn.Conn).Elem()
p = uintptr(unsafe.Pointer(realityConn.Conn))
} else {
return newError("XTLS only supports TLS and REALITY directly for now.").AtWarning()
}
if pc, ok := netConn.(*proxyproto.Conn); ok {
netConn = pc.Raw()
// 8192 > 4096, there is no need to process pc's bufReader
}
if sc, ok := netConn.(syscall.Conn); ok {
rawConn, _ = sc.SyscallConn()
}
i, _ := t.FieldByName("input")
r, _ := t.FieldByName("rawInput")
input = (*bytes.Reader)(unsafe.Pointer(p + i.Offset))
@ -493,6 +478,7 @@ func (h *Handler) Process(ctx context.Context, network net.Network, connection s
return newError(account.ID.String() + " is not able to use " + requestAddons.Flow).AtWarning()
}
case "":
inbound.SetCanSpliceCopy(3)
if account.Flow == vless.XRV && (request.Command == protocol.RequestCommandTCP || isMuxAndNotXUDP(request, first)) {
return newError(account.ID.String() + " is not able to use \"\". Note that the pure TLS proxy has certain TLS in TLS characters.").AtWarning()
}
@ -540,13 +526,8 @@ func (h *Handler) Process(ctx context.Context, network net.Network, connection s
var err error
if requestAddons.Flow == vless.XRV {
var counter stats.Counter
if statConn != nil {
counter = statConn.ReadCounter
}
// TODO enable splice
ctx = session.ContextWithInbound(ctx, nil)
err = encoding.XtlsRead(clientReader, serverWriter, timer, netConn, rawConn, input, rawInput, counter, ctx, account.ID.Bytes(),
ctx1 := session.ContextWithInbound(ctx, nil) // TODO enable splice
err = encoding.XtlsRead(clientReader, serverWriter, timer, connection, input, rawInput, ctx1, account.ID.Bytes(),
&numberOfPacketToFilter, &enableXtls, &isTLS12orAbove, &isTLS, &cipher, &remainingServerHello)
} else {
// from clientReader.ReadMultiBuffer to serverWriter.WriteMultiBufer
@ -592,11 +573,7 @@ func (h *Handler) Process(ctx context.Context, network net.Network, connection s
var err error
if requestAddons.Flow == vless.XRV {
var counter stats.Counter
if statConn != nil {
counter = statConn.WriteCounter
}
err = encoding.XtlsWrite(serverReader, clientWriter, timer, netConn, counter, ctx, &numberOfPacketToFilter,
err = encoding.XtlsWrite(serverReader, clientWriter, timer, connection, ctx, &numberOfPacketToFilter,
&enableXtls, &isTLS12orAbove, &isTLS, &cipher, &remainingServerHello)
} else {
// from serverReader.ReadMultiBuffer to clientWriter.WriteMultiBufer

View file

@ -7,7 +7,6 @@ import (
"context"
gotls "crypto/tls"
"reflect"
"syscall"
"time"
"unsafe"
@ -23,7 +22,6 @@ import (
"github.com/xtls/xray-core/common/xudp"
"github.com/xtls/xray-core/core"
"github.com/xtls/xray-core/features/policy"
"github.com/xtls/xray-core/features/stats"
"github.com/xtls/xray-core/proxy/vless"
"github.com/xtls/xray-core/proxy/vless/encoding"
"github.com/xtls/xray-core/transport"
@ -71,9 +69,15 @@ func New(ctx context.Context, config *Config) (*Handler, error) {
// Process implements proxy.Outbound.Process().
func (h *Handler) Process(ctx context.Context, link *transport.Link, dialer internet.Dialer) error {
outbound := session.OutboundFromContext(ctx)
if outbound == nil || !outbound.Target.IsValid() {
return newError("target not specified").AtError()
}
outbound.Name = "vless"
inbound := session.InboundFromContext(ctx)
var rec *protocol.ServerSpec
var conn stat.Connection
if err := retry.ExponentialBackoff(5, 200).On(func() error {
rec = h.serverPicker.PickServer()
var err error
@ -88,16 +92,9 @@ func (h *Handler) Process(ctx context.Context, link *transport.Link, dialer inte
defer conn.Close()
iConn := conn
statConn, ok := iConn.(*stat.CounterConnection)
if ok {
if statConn, ok := iConn.(*stat.CounterConnection); ok {
iConn = statConn.Connection
}
outbound := session.OutboundFromContext(ctx)
if outbound == nil || !outbound.Target.IsValid() {
return newError("target not specified").AtError()
}
target := outbound.Target
newError("tunneling request to ", target, " via ", rec.Destination().NetAddr()).AtInfo().WriteToLog(session.ExportIDToError(ctx))
@ -123,8 +120,6 @@ func (h *Handler) Process(ctx context.Context, link *transport.Link, dialer inte
Flow: account.Flow,
}
var netConn net.Conn
var rawConn syscall.RawConn
var input *bytes.Reader
var rawInput *bytes.Buffer
allowUDP443 := false
@ -134,6 +129,9 @@ func (h *Handler) Process(ctx context.Context, link *transport.Link, dialer inte
requestAddons.Flow = requestAddons.Flow[:16]
fallthrough
case vless.XRV:
if inbound != nil {
inbound.SetCanSpliceCopy(2)
}
switch request.Command {
case protocol.RequestCommandUDP:
if !allowUDP443 && request.Port == 443 {
@ -146,28 +144,26 @@ func (h *Handler) Process(ctx context.Context, link *transport.Link, dialer inte
var t reflect.Type
var p uintptr
if tlsConn, ok := iConn.(*tls.Conn); ok {
netConn = tlsConn.NetConn()
t = reflect.TypeOf(tlsConn.Conn).Elem()
p = uintptr(unsafe.Pointer(tlsConn.Conn))
} else if utlsConn, ok := iConn.(*tls.UConn); ok {
netConn = utlsConn.NetConn()
t = reflect.TypeOf(utlsConn.Conn).Elem()
p = uintptr(unsafe.Pointer(utlsConn.Conn))
} else if realityConn, ok := iConn.(*reality.UConn); ok {
netConn = realityConn.NetConn()
t = reflect.TypeOf(realityConn.Conn).Elem()
p = uintptr(unsafe.Pointer(realityConn.Conn))
} else {
return newError("XTLS only supports TLS and REALITY directly for now.").AtWarning()
}
if sc, ok := netConn.(syscall.Conn); ok {
rawConn, _ = sc.SyscallConn()
}
i, _ := t.FieldByName("input")
r, _ := t.FieldByName("rawInput")
input = (*bytes.Reader)(unsafe.Pointer(p + i.Offset))
rawInput = (*bytes.Buffer)(unsafe.Pointer(p + r.Offset))
}
default:
if inbound != nil {
inbound.SetCanSpliceCopy(3)
}
}
var newCtx context.Context
@ -257,11 +253,8 @@ func (h *Handler) Process(ctx context.Context, link *transport.Link, dialer inte
return newError(`failed to use `+requestAddons.Flow+`, found outer tls version `, utlsConn.ConnectionState().Version).AtWarning()
}
}
var counter stats.Counter
if statConn != nil {
counter = statConn.WriteCounter
}
err = encoding.XtlsWrite(clientReader, serverWriter, timer, netConn, counter, ctx, &numberOfPacketToFilter,
ctx1 := session.ContextWithOutbound(ctx, nil) // TODO enable splice
err = encoding.XtlsWrite(clientReader, serverWriter, timer, conn, ctx1, &numberOfPacketToFilter,
&enableXtls, &isTLS12orAbove, &isTLS, &cipher, &remainingServerHello)
} else {
// from clientReader.ReadMultiBuffer to serverWriter.WriteMultiBufer
@ -293,11 +286,7 @@ func (h *Handler) Process(ctx context.Context, link *transport.Link, dialer inte
}
if requestAddons.Flow == vless.XRV {
var counter stats.Counter
if statConn != nil {
counter = statConn.ReadCounter
}
err = encoding.XtlsRead(serverReader, clientWriter, timer, netConn, rawConn, input, rawInput, counter, ctx, account.ID.Bytes(),
err = encoding.XtlsRead(serverReader, clientWriter, timer, conn, input, rawInput, ctx, account.ID.Bytes(),
&numberOfPacketToFilter, &enableXtls, &isTLS12orAbove, &isTLS, &cipher, &remainingServerHello)
} else {
// from serverReader.ReadMultiBuffer to clientWriter.WriteMultiBufer