mirror of
https://github.com/XTLS/Xray-core.git
synced 2025-01-22 17:44:06 +00:00
017f53b5fc
* Add session context outbounds as a slice. A slice is needed for the dialer proxy, where two outbounds work on top of each other; there are two sets of target addresses, for example. It also enables XTLS to correctly do splice copy by checking that both outbounds are ready to do direct copy * Fill outbound tag info * Splice now checks capability from all outbounds * Fix unit tests
270 lines
5.5 KiB
Go
270 lines
5.5 KiB
Go
package reverse
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/xtls/xray-core/common"
|
|
"github.com/xtls/xray-core/common/buf"
|
|
"github.com/xtls/xray-core/common/mux"
|
|
"github.com/xtls/xray-core/common/net"
|
|
"github.com/xtls/xray-core/common/session"
|
|
"github.com/xtls/xray-core/common/task"
|
|
"github.com/xtls/xray-core/features/outbound"
|
|
"github.com/xtls/xray-core/transport"
|
|
"github.com/xtls/xray-core/transport/pipe"
|
|
"google.golang.org/protobuf/proto"
|
|
)
|
|
|
|
type Portal struct {
|
|
ohm outbound.Manager
|
|
tag string
|
|
domain string
|
|
picker *StaticMuxPicker
|
|
client *mux.ClientManager
|
|
}
|
|
|
|
func NewPortal(config *PortalConfig, ohm outbound.Manager) (*Portal, error) {
|
|
if config.Tag == "" {
|
|
return nil, newError("portal tag is empty")
|
|
}
|
|
|
|
if config.Domain == "" {
|
|
return nil, newError("portal domain is empty")
|
|
}
|
|
|
|
picker, err := NewStaticMuxPicker()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &Portal{
|
|
ohm: ohm,
|
|
tag: config.Tag,
|
|
domain: config.Domain,
|
|
picker: picker,
|
|
client: &mux.ClientManager{
|
|
Picker: picker,
|
|
},
|
|
}, nil
|
|
}
|
|
|
|
func (p *Portal) Start() error {
|
|
return p.ohm.AddHandler(context.Background(), &Outbound{
|
|
portal: p,
|
|
tag: p.tag,
|
|
})
|
|
}
|
|
|
|
func (p *Portal) Close() error {
|
|
return p.ohm.RemoveHandler(context.Background(), p.tag)
|
|
}
|
|
|
|
func (p *Portal) HandleConnection(ctx context.Context, link *transport.Link) error {
|
|
outbounds := session.OutboundsFromContext(ctx)
|
|
ob := outbounds[len(outbounds) - 1]
|
|
if ob == nil {
|
|
return newError("outbound metadata not found").AtError()
|
|
}
|
|
|
|
if isDomain(ob.Target, p.domain) {
|
|
muxClient, err := mux.NewClientWorker(*link, mux.ClientStrategy{})
|
|
if err != nil {
|
|
return newError("failed to create mux client worker").Base(err).AtWarning()
|
|
}
|
|
|
|
worker, err := NewPortalWorker(muxClient)
|
|
if err != nil {
|
|
return newError("failed to create portal worker").Base(err)
|
|
}
|
|
|
|
p.picker.AddWorker(worker)
|
|
return nil
|
|
}
|
|
|
|
return p.client.Dispatch(ctx, link)
|
|
}
|
|
|
|
type Outbound struct {
|
|
portal *Portal
|
|
tag string
|
|
}
|
|
|
|
func (o *Outbound) Tag() string {
|
|
return o.tag
|
|
}
|
|
|
|
func (o *Outbound) Dispatch(ctx context.Context, link *transport.Link) {
|
|
if err := o.portal.HandleConnection(ctx, link); err != nil {
|
|
newError("failed to process reverse connection").Base(err).WriteToLog(session.ExportIDToError(ctx))
|
|
common.Interrupt(link.Writer)
|
|
}
|
|
}
|
|
|
|
func (o *Outbound) Start() error {
|
|
return nil
|
|
}
|
|
|
|
func (o *Outbound) Close() error {
|
|
return nil
|
|
}
|
|
|
|
type StaticMuxPicker struct {
|
|
access sync.Mutex
|
|
workers []*PortalWorker
|
|
cTask *task.Periodic
|
|
}
|
|
|
|
func NewStaticMuxPicker() (*StaticMuxPicker, error) {
|
|
p := &StaticMuxPicker{}
|
|
p.cTask = &task.Periodic{
|
|
Execute: p.cleanup,
|
|
Interval: time.Second * 30,
|
|
}
|
|
p.cTask.Start()
|
|
return p, nil
|
|
}
|
|
|
|
func (p *StaticMuxPicker) cleanup() error {
|
|
p.access.Lock()
|
|
defer p.access.Unlock()
|
|
|
|
var activeWorkers []*PortalWorker
|
|
for _, w := range p.workers {
|
|
if !w.Closed() {
|
|
activeWorkers = append(activeWorkers, w)
|
|
}
|
|
}
|
|
|
|
if len(activeWorkers) != len(p.workers) {
|
|
p.workers = activeWorkers
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (p *StaticMuxPicker) PickAvailable() (*mux.ClientWorker, error) {
|
|
p.access.Lock()
|
|
defer p.access.Unlock()
|
|
|
|
if len(p.workers) == 0 {
|
|
return nil, newError("empty worker list")
|
|
}
|
|
|
|
var minIdx int = -1
|
|
var minConn uint32 = 9999
|
|
for i, w := range p.workers {
|
|
if w.draining {
|
|
continue
|
|
}
|
|
if w.client.Closed() {
|
|
continue
|
|
}
|
|
if w.client.ActiveConnections() < minConn {
|
|
minConn = w.client.ActiveConnections()
|
|
minIdx = i
|
|
}
|
|
}
|
|
|
|
if minIdx == -1 {
|
|
for i, w := range p.workers {
|
|
if w.IsFull() {
|
|
continue
|
|
}
|
|
if w.client.ActiveConnections() < minConn {
|
|
minConn = w.client.ActiveConnections()
|
|
minIdx = i
|
|
}
|
|
}
|
|
}
|
|
|
|
if minIdx != -1 {
|
|
return p.workers[minIdx].client, nil
|
|
}
|
|
|
|
return nil, newError("no mux client worker available")
|
|
}
|
|
|
|
func (p *StaticMuxPicker) AddWorker(worker *PortalWorker) {
|
|
p.access.Lock()
|
|
defer p.access.Unlock()
|
|
|
|
p.workers = append(p.workers, worker)
|
|
}
|
|
|
|
type PortalWorker struct {
|
|
client *mux.ClientWorker
|
|
control *task.Periodic
|
|
writer buf.Writer
|
|
reader buf.Reader
|
|
draining bool
|
|
}
|
|
|
|
func NewPortalWorker(client *mux.ClientWorker) (*PortalWorker, error) {
|
|
opt := []pipe.Option{pipe.WithSizeLimit(16 * 1024)}
|
|
uplinkReader, uplinkWriter := pipe.New(opt...)
|
|
downlinkReader, downlinkWriter := pipe.New(opt...)
|
|
|
|
ctx := context.Background()
|
|
outbounds := []*session.Outbound{{
|
|
Target: net.UDPDestination(net.DomainAddress(internalDomain), 0),
|
|
}}
|
|
ctx = session.ContextWithOutbounds(ctx, outbounds)
|
|
f := client.Dispatch(ctx, &transport.Link{
|
|
Reader: uplinkReader,
|
|
Writer: downlinkWriter,
|
|
})
|
|
if !f {
|
|
return nil, newError("unable to dispatch control connection")
|
|
}
|
|
w := &PortalWorker{
|
|
client: client,
|
|
reader: downlinkReader,
|
|
writer: uplinkWriter,
|
|
}
|
|
w.control = &task.Periodic{
|
|
Execute: w.heartbeat,
|
|
Interval: time.Second * 2,
|
|
}
|
|
w.control.Start()
|
|
return w, nil
|
|
}
|
|
|
|
func (w *PortalWorker) heartbeat() error {
|
|
if w.client.Closed() {
|
|
return newError("client worker stopped")
|
|
}
|
|
|
|
if w.draining || w.writer == nil {
|
|
return newError("already disposed")
|
|
}
|
|
|
|
msg := &Control{}
|
|
msg.FillInRandom()
|
|
|
|
if w.client.TotalConnections() > 256 {
|
|
w.draining = true
|
|
msg.State = Control_DRAIN
|
|
|
|
defer func() {
|
|
common.Close(w.writer)
|
|
common.Interrupt(w.reader)
|
|
w.writer = nil
|
|
}()
|
|
}
|
|
|
|
b, err := proto.Marshal(msg)
|
|
common.Must(err)
|
|
mb := buf.MergeBytes(nil, b)
|
|
return w.writer.WriteMultiBuffer(mb)
|
|
}
|
|
|
|
func (w *PortalWorker) IsFull() bool {
|
|
return w.client.IsFull()
|
|
}
|
|
|
|
func (w *PortalWorker) Closed() bool {
|
|
return w.client.Closed()
|
|
}
|