mirror of
https://github.com/XTLS/Xray-core.git
synced 2024-12-23 14:09:49 +00:00
017f53b5fc
* Add session context outbounds as a slice: a slice is needed for dialer proxy, where two outbounds work on top of each other (there are two sets of target addr, for example). It also enables Xtls to correctly do splice copy by checking that both outbounds are ready to do direct copy * Fill outbound tag info * Splice now checks capability from all outbounds * Fix unit tests
175 lines
4.9 KiB
Go
175 lines
4.9 KiB
Go
package trojan
|
|
|
|
import (
|
|
"context"
|
|
"time"
|
|
|
|
"github.com/xtls/xray-core/common"
|
|
"github.com/xtls/xray-core/common/buf"
|
|
"github.com/xtls/xray-core/common/errors"
|
|
"github.com/xtls/xray-core/common/net"
|
|
"github.com/xtls/xray-core/common/protocol"
|
|
"github.com/xtls/xray-core/common/retry"
|
|
"github.com/xtls/xray-core/common/session"
|
|
"github.com/xtls/xray-core/common/signal"
|
|
"github.com/xtls/xray-core/common/task"
|
|
core "github.com/xtls/xray-core/core"
|
|
"github.com/xtls/xray-core/features/policy"
|
|
"github.com/xtls/xray-core/transport"
|
|
"github.com/xtls/xray-core/transport/internet"
|
|
"github.com/xtls/xray-core/transport/internet/stat"
|
|
)
|
|
|
|
// Client is a inbound handler for trojan protocol
|
|
type Client struct {
|
|
serverPicker protocol.ServerPicker
|
|
policyManager policy.Manager
|
|
}
|
|
|
|
// NewClient create a new trojan client.
|
|
func NewClient(ctx context.Context, config *ClientConfig) (*Client, error) {
|
|
serverList := protocol.NewServerList()
|
|
for _, rec := range config.Server {
|
|
s, err := protocol.NewServerSpecFromPB(rec)
|
|
if err != nil {
|
|
return nil, newError("failed to parse server spec").Base(err)
|
|
}
|
|
serverList.AddServer(s)
|
|
}
|
|
if serverList.Size() == 0 {
|
|
return nil, newError("0 server")
|
|
}
|
|
|
|
v := core.MustFromContext(ctx)
|
|
client := &Client{
|
|
serverPicker: protocol.NewRoundRobinServerPicker(serverList),
|
|
policyManager: v.GetFeature(policy.ManagerType()).(policy.Manager),
|
|
}
|
|
return client, nil
|
|
}
|
|
|
|
// Process implements OutboundHandler.Process().
|
|
func (c *Client) Process(ctx context.Context, link *transport.Link, dialer internet.Dialer) error {
|
|
outbounds := session.OutboundsFromContext(ctx)
|
|
ob := outbounds[len(outbounds) - 1]
|
|
if !ob.Target.IsValid() {
|
|
return newError("target not specified")
|
|
}
|
|
ob.Name = "trojan"
|
|
ob.CanSpliceCopy = 3
|
|
destination := ob.Target
|
|
network := destination.Network
|
|
|
|
var server *protocol.ServerSpec
|
|
var conn stat.Connection
|
|
|
|
err := retry.ExponentialBackoff(5, 100).On(func() error {
|
|
server = c.serverPicker.PickServer()
|
|
rawConn, err := dialer.Dial(ctx, server.Destination())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
conn = rawConn
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return newError("failed to find an available destination").AtWarning().Base(err)
|
|
}
|
|
newError("tunneling request to ", destination, " via ", server.Destination().NetAddr()).WriteToLog(session.ExportIDToError(ctx))
|
|
|
|
defer conn.Close()
|
|
|
|
user := server.PickUser()
|
|
account, ok := user.Account.(*MemoryAccount)
|
|
if !ok {
|
|
return newError("user account is not valid")
|
|
}
|
|
|
|
var newCtx context.Context
|
|
var newCancel context.CancelFunc
|
|
if session.TimeoutOnlyFromContext(ctx) {
|
|
newCtx, newCancel = context.WithCancel(context.Background())
|
|
}
|
|
|
|
sessionPolicy := c.policyManager.ForLevel(user.Level)
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
timer := signal.CancelAfterInactivity(ctx, func() {
|
|
cancel()
|
|
if newCancel != nil {
|
|
newCancel()
|
|
}
|
|
}, sessionPolicy.Timeouts.ConnectionIdle)
|
|
|
|
postRequest := func() error {
|
|
defer timer.SetTimeout(sessionPolicy.Timeouts.DownlinkOnly)
|
|
|
|
bufferWriter := buf.NewBufferedWriter(buf.NewWriter(conn))
|
|
|
|
connWriter := &ConnWriter{
|
|
Writer: bufferWriter,
|
|
Target: destination,
|
|
Account: account,
|
|
}
|
|
|
|
var bodyWriter buf.Writer
|
|
if destination.Network == net.Network_UDP {
|
|
bodyWriter = &PacketWriter{Writer: connWriter, Target: destination}
|
|
} else {
|
|
bodyWriter = connWriter
|
|
}
|
|
|
|
// write some request payload to buffer
|
|
if err = buf.CopyOnceTimeout(link.Reader, bodyWriter, time.Millisecond*100); err != nil && err != buf.ErrNotTimeoutReader && err != buf.ErrReadTimeout {
|
|
return newError("failed to write A request payload").Base(err).AtWarning()
|
|
}
|
|
|
|
// Flush; bufferWriter.WriteMultiBufer now is bufferWriter.writer.WriteMultiBuffer
|
|
if err = bufferWriter.SetBuffered(false); err != nil {
|
|
return newError("failed to flush payload").Base(err).AtWarning()
|
|
}
|
|
|
|
// Send header if not sent yet
|
|
if _, err = connWriter.Write([]byte{}); err != nil {
|
|
return err.(*errors.Error).AtWarning()
|
|
}
|
|
|
|
if err = buf.Copy(link.Reader, bodyWriter, buf.UpdateActivity(timer)); err != nil {
|
|
return newError("failed to transfer request payload").Base(err).AtInfo()
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
getResponse := func() error {
|
|
defer timer.SetTimeout(sessionPolicy.Timeouts.UplinkOnly)
|
|
|
|
var reader buf.Reader
|
|
if network == net.Network_UDP {
|
|
reader = &PacketReader{
|
|
Reader: conn,
|
|
}
|
|
} else {
|
|
reader = buf.NewReader(conn)
|
|
}
|
|
return buf.Copy(reader, link.Writer, buf.UpdateActivity(timer))
|
|
}
|
|
|
|
if newCtx != nil {
|
|
ctx = newCtx
|
|
}
|
|
|
|
responseDoneAndCloseWriter := task.OnSuccess(getResponse, task.Close(link.Writer))
|
|
if err := task.Run(ctx, postRequest, responseDoneAndCloseWriter); err != nil {
|
|
return newError("connection ends").Base(err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func init() {
|
|
common.Must(common.RegisterConfig((*ClientConfig)(nil), func(ctx context.Context, config interface{}) (interface{}, error) {
|
|
return NewClient(ctx, config.(*ClientConfig))
|
|
}))
|
|
}
|