Xray-core/app/proxyman/outbound/handler.go
yuhan6665 079d0bd8a9
Refactor log (#3446)
* Refactor log

* Add new log methods

* Fix logger test

* Change all logging code

* Clean up pathObj

* Rebase to latest main

* Remove invoking method name after the dot
2024-06-29 14:32:57 -04:00

339 lines
9.9 KiB
Go

package outbound
import (
"context"
"crypto/rand"
goerrors "errors"
"io"
"math/big"
gonet "net"
"os"
"github.com/xtls/xray-core/app/proxyman"
"github.com/xtls/xray-core/common"
"github.com/xtls/xray-core/common/errors"
"github.com/xtls/xray-core/common/buf"
"github.com/xtls/xray-core/common/mux"
"github.com/xtls/xray-core/common/net"
"github.com/xtls/xray-core/common/net/cnc"
"github.com/xtls/xray-core/common/session"
"github.com/xtls/xray-core/core"
"github.com/xtls/xray-core/features/outbound"
"github.com/xtls/xray-core/features/policy"
"github.com/xtls/xray-core/features/stats"
"github.com/xtls/xray-core/proxy"
"github.com/xtls/xray-core/transport"
"github.com/xtls/xray-core/transport/internet"
"github.com/xtls/xray-core/transport/internet/stat"
"github.com/xtls/xray-core/transport/internet/tls"
"github.com/xtls/xray-core/transport/pipe"
)
func getStatCounter(v *core.Instance, tag string) (stats.Counter, stats.Counter) {
var uplinkCounter stats.Counter
var downlinkCounter stats.Counter
policy := v.GetFeature(policy.ManagerType()).(policy.Manager)
if len(tag) > 0 && policy.ForSystem().Stats.OutboundUplink {
statsManager := v.GetFeature(stats.ManagerType()).(stats.Manager)
name := "outbound>>>" + tag + ">>>traffic>>>uplink"
c, _ := stats.GetOrRegisterCounter(statsManager, name)
if c != nil {
uplinkCounter = c
}
}
if len(tag) > 0 && policy.ForSystem().Stats.OutboundDownlink {
statsManager := v.GetFeature(stats.ManagerType()).(stats.Manager)
name := "outbound>>>" + tag + ">>>traffic>>>downlink"
c, _ := stats.GetOrRegisterCounter(statsManager, name)
if c != nil {
downlinkCounter = c
}
}
return uplinkCounter, downlinkCounter
}
// Handler is an implements of outbound.Handler.
type Handler struct {
tag string
senderSettings *proxyman.SenderConfig
streamSettings *internet.MemoryStreamConfig
proxy proxy.Outbound
outboundManager outbound.Manager
mux *mux.ClientManager
xudp *mux.ClientManager
udp443 string
uplinkCounter stats.Counter
downlinkCounter stats.Counter
}
// NewHandler creates a new Handler based on the given configuration.
func NewHandler(ctx context.Context, config *core.OutboundHandlerConfig) (outbound.Handler, error) {
v := core.MustFromContext(ctx)
uplinkCounter, downlinkCounter := getStatCounter(v, config.Tag)
h := &Handler{
tag: config.Tag,
outboundManager: v.GetFeature(outbound.ManagerType()).(outbound.Manager),
uplinkCounter: uplinkCounter,
downlinkCounter: downlinkCounter,
}
if config.SenderSettings != nil {
senderSettings, err := config.SenderSettings.GetInstance()
if err != nil {
return nil, err
}
switch s := senderSettings.(type) {
case *proxyman.SenderConfig:
h.senderSettings = s
mss, err := internet.ToMemoryStreamConfig(s.StreamSettings)
if err != nil {
return nil, errors.New("failed to parse stream settings").Base(err).AtWarning()
}
h.streamSettings = mss
default:
return nil, errors.New("settings is not SenderConfig")
}
}
proxyConfig, err := config.ProxySettings.GetInstance()
if err != nil {
return nil, err
}
rawProxyHandler, err := common.CreateObject(ctx, proxyConfig)
if err != nil {
return nil, err
}
proxyHandler, ok := rawProxyHandler.(proxy.Outbound)
if !ok {
return nil, errors.New("not an outbound handler")
}
if h.senderSettings != nil && h.senderSettings.MultiplexSettings != nil {
if config := h.senderSettings.MultiplexSettings; config.Enabled {
if config.Concurrency < 0 {
h.mux = &mux.ClientManager{Enabled: false}
}
if config.Concurrency == 0 {
config.Concurrency = 8 // same as before
}
if config.Concurrency > 0 {
h.mux = &mux.ClientManager{
Enabled: true,
Picker: &mux.IncrementalWorkerPicker{
Factory: &mux.DialingWorkerFactory{
Proxy: proxyHandler,
Dialer: h,
Strategy: mux.ClientStrategy{
MaxConcurrency: uint32(config.Concurrency),
MaxConnection: 128,
},
},
},
}
}
if config.XudpConcurrency < 0 {
h.xudp = &mux.ClientManager{Enabled: false}
}
if config.XudpConcurrency == 0 {
h.xudp = nil // same as before
}
if config.XudpConcurrency > 0 {
h.xudp = &mux.ClientManager{
Enabled: true,
Picker: &mux.IncrementalWorkerPicker{
Factory: &mux.DialingWorkerFactory{
Proxy: proxyHandler,
Dialer: h,
Strategy: mux.ClientStrategy{
MaxConcurrency: uint32(config.XudpConcurrency),
MaxConnection: 128,
},
},
},
}
}
h.udp443 = config.XudpProxyUDP443
}
}
h.proxy = proxyHandler
return h, nil
}
// Tag implements outbound.Handler.
func (h *Handler) Tag() string {
return h.tag
}
// Dispatch implements proxy.Outbound.Dispatch.
func (h *Handler) Dispatch(ctx context.Context, link *transport.Link) {
outbounds := session.OutboundsFromContext(ctx)
ob := outbounds[len(outbounds)-1]
if ob.Target.Network == net.Network_UDP && ob.OriginalTarget.Address != nil && ob.OriginalTarget.Address != ob.Target.Address {
link.Reader = &buf.EndpointOverrideReader{Reader: link.Reader, Dest: ob.Target.Address, OriginalDest: ob.OriginalTarget.Address}
link.Writer = &buf.EndpointOverrideWriter{Writer: link.Writer, Dest: ob.Target.Address, OriginalDest: ob.OriginalTarget.Address}
}
if h.mux != nil {
test := func(err error) {
if err != nil {
err := errors.New("failed to process mux outbound traffic").Base(err)
session.SubmitOutboundErrorToOriginator(ctx, err)
errors.LogInfo(ctx, err.Error())
common.Interrupt(link.Writer)
}
}
if ob.Target.Network == net.Network_UDP && ob.Target.Port == 443 {
switch h.udp443 {
case "reject":
test(errors.New("XUDP rejected UDP/443 traffic").AtInfo())
return
case "skip":
goto out
}
}
if h.xudp != nil && ob.Target.Network == net.Network_UDP {
if !h.xudp.Enabled {
goto out
}
test(h.xudp.Dispatch(ctx, link))
return
}
if h.mux.Enabled {
test(h.mux.Dispatch(ctx, link))
return
}
}
out:
err := h.proxy.Process(ctx, link, h)
if err != nil {
if goerrors.Is(err, io.EOF) || goerrors.Is(err, io.ErrClosedPipe) || goerrors.Is(err, context.Canceled) {
err = nil
}
}
if err != nil {
// Ensure outbound ray is properly closed.
err := errors.New("failed to process outbound traffic").Base(err)
session.SubmitOutboundErrorToOriginator(ctx, err)
errors.LogInfo(ctx, err.Error())
common.Interrupt(link.Writer)
} else {
common.Close(link.Writer)
}
common.Interrupt(link.Reader)
}
// Address implements internet.Dialer.
func (h *Handler) Address() net.Address {
if h.senderSettings == nil || h.senderSettings.Via == nil {
return nil
}
return h.senderSettings.Via.AsAddress()
}
func (h *Handler) DestIpAddress() net.IP {
return internet.DestIpAddress()
}
// Dial implements internet.Dialer.
func (h *Handler) Dial(ctx context.Context, dest net.Destination) (stat.Connection, error) {
if h.senderSettings != nil {
if h.senderSettings.ProxySettings.HasTag() {
tag := h.senderSettings.ProxySettings.Tag
handler := h.outboundManager.GetHandler(tag)
if handler != nil {
errors.LogDebug(ctx, "proxying to ", tag, " for dest ", dest)
outbounds := session.OutboundsFromContext(ctx)
ctx = session.ContextWithOutbounds(ctx, append(outbounds, &session.Outbound{
Target: dest,
Tag: tag,
})) // add another outbound in session ctx
opts := pipe.OptionsFromContext(ctx)
uplinkReader, uplinkWriter := pipe.New(opts...)
downlinkReader, downlinkWriter := pipe.New(opts...)
go handler.Dispatch(ctx, &transport.Link{Reader: uplinkReader, Writer: downlinkWriter})
conn := cnc.NewConnection(cnc.ConnectionInputMulti(uplinkWriter), cnc.ConnectionOutputMulti(downlinkReader))
if config := tls.ConfigFromStreamSettings(h.streamSettings); config != nil {
tlsConfig := config.GetTLSConfig(tls.WithDestination(dest))
conn = tls.Client(conn, tlsConfig)
}
return h.getStatCouterConnection(conn), nil
}
errors.LogWarning(ctx, "failed to get outbound handler with tag: ", tag)
}
if h.senderSettings.Via != nil {
outbounds := session.OutboundsFromContext(ctx)
ob := outbounds[len(outbounds)-1]
if h.senderSettings.ViaCidr == "" {
ob.Gateway = h.senderSettings.Via.AsAddress()
} else { //Get a random address.
ob.Gateway = ParseRandomIPv6(h.senderSettings.Via.AsAddress(), h.senderSettings.ViaCidr)
}
}
}
if conn, err := h.getUoTConnection(ctx, dest); err != os.ErrInvalid {
return conn, err
}
conn, err := internet.Dial(ctx, dest, h.streamSettings)
conn = h.getStatCouterConnection(conn)
outbounds := session.OutboundsFromContext(ctx)
ob := outbounds[len(outbounds)-1]
ob.Conn = conn
return conn, err
}
func (h *Handler) getStatCouterConnection(conn stat.Connection) stat.Connection {
if h.uplinkCounter != nil || h.downlinkCounter != nil {
return &stat.CounterConnection{
Connection: conn,
ReadCounter: h.downlinkCounter,
WriteCounter: h.uplinkCounter,
}
}
return conn
}
// GetOutbound implements proxy.GetOutbound.
func (h *Handler) GetOutbound() proxy.Outbound {
return h.proxy
}
// Start implements common.Runnable.
func (h *Handler) Start() error {
return nil
}
// Close implements common.Closable.
func (h *Handler) Close() error {
common.Close(h.mux)
return nil
}
func ParseRandomIPv6(address net.Address, prefix string) net.Address {
_, network, _ := gonet.ParseCIDR(address.IP().String() + "/" + prefix)
maskSize, totalBits := network.Mask.Size()
subnetSize := big.NewInt(1).Lsh(big.NewInt(1), uint(totalBits-maskSize))
// random
randomBigInt, _ := rand.Int(rand.Reader, subnetSize)
startIPBigInt := big.NewInt(0).SetBytes(network.IP.To16())
randomIPBigInt := big.NewInt(0).Add(startIPBigInt, randomBigInt)
randomIPBytes := randomIPBigInt.Bytes()
randomIPBytes = append(make([]byte, 16-len(randomIPBytes)), randomIPBytes...)
return net.ParseAddress(gonet.IP(randomIPBytes).String())
}