mirror of
synced 2025-03-14 02:20:44 +00:00
* Add session context outbounds as a slice. A slice is needed for the dialer proxy, where two outbounds work on top of each other; for example, there are two sets of target addresses. It also enables XTLS to correctly do splice copy by checking that both outbounds are ready to do direct copy. * Fill outbound tag info * Splice now checks capability from all outbounds * Fix unit tests
379 lines
9.1 KiB
379 lines
9.1 KiB
package http
import (
type Client struct {
serverPicker protocol.ServerPicker
policyManager policy.Manager
header []*Header
type h2Conn struct {
rawConn net.Conn
h2Conn *http2.ClientConn
var (
cachedH2Mutex sync.Mutex
cachedH2Conns map[net.Destination]h2Conn
// NewClient create a new http client based on the given config.
func NewClient(ctx context.Context, config *ClientConfig) (*Client, error) {
serverList := protocol.NewServerList()
for _, rec := range config.Server {
s, err := protocol.NewServerSpecFromPB(rec)
if err != nil {
return nil, newError("failed to get server spec").Base(err)
if serverList.Size() == 0 {
return nil, newError("0 target server")
v := core.MustFromContext(ctx)
return &Client{
serverPicker: protocol.NewRoundRobinServerPicker(serverList),
policyManager: v.GetFeature(policy.ManagerType()).(policy.Manager),
header: config.Header,
}, nil
// Process implements proxy.Outbound.Process. We first create a socket tunnel via HTTP CONNECT method, then redirect all inbound traffic to that tunnel.
func (c *Client) Process(ctx context.Context, link *transport.Link, dialer internet.Dialer) error {
outbounds := session.OutboundsFromContext(ctx)
ob := outbounds[len(outbounds) - 1]
if !ob.Target.IsValid() {
return newError("target not specified.")
ob.Name = "http"
ob.CanSpliceCopy = 2
target := ob.Target
targetAddr := target.NetAddr()
if target.Network == net.Network_UDP {
return newError("UDP is not supported by HTTP outbound")
var user *protocol.MemoryUser
var conn stat.Connection
mbuf, _ := link.Reader.ReadMultiBuffer()
len := mbuf.Len()
firstPayload := bytespool.Alloc(len)
mbuf, _ = buf.SplitBytes(mbuf, firstPayload)
firstPayload = firstPayload[:len]
defer bytespool.Free(firstPayload)
header, err := fillRequestHeader(ctx, c.header)
if err != nil {
return newError("failed to fill out header").Base(err)
if err := retry.ExponentialBackoff(5, 100).On(func() error {
server := c.serverPicker.PickServer()
dest := server.Destination()
user = server.PickUser()
netConn, err := setUpHTTPTunnel(ctx, dest, targetAddr, user, dialer, header, firstPayload)
if netConn != nil {
if _, ok := netConn.(*http2Conn); !ok {
if _, err := netConn.Write(firstPayload); err != nil {
return err
conn = stat.Connection(netConn)
return err
}); err != nil {
return newError("failed to find an available destination").Base(err)
defer func() {
if err := conn.Close(); err != nil {
newError("failed to closed connection").Base(err).WriteToLog(session.ExportIDToError(ctx))
p := c.policyManager.ForLevel(0)
if user != nil {
p = c.policyManager.ForLevel(user.Level)
var newCtx context.Context
var newCancel context.CancelFunc
if session.TimeoutOnlyFromContext(ctx) {
newCtx, newCancel = context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(ctx)
timer := signal.CancelAfterInactivity(ctx, func() {
if newCancel != nil {
}, p.Timeouts.ConnectionIdle)
requestFunc := func() error {
defer timer.SetTimeout(p.Timeouts.DownlinkOnly)
return buf.Copy(link.Reader, buf.NewWriter(conn), buf.UpdateActivity(timer))
responseFunc := func() error {
defer timer.SetTimeout(p.Timeouts.UplinkOnly)
return buf.Copy(buf.NewReader(conn), link.Writer, buf.UpdateActivity(timer))
if newCtx != nil {
ctx = newCtx
responseDonePost := task.OnSuccess(responseFunc, task.Close(link.Writer))
if err := task.Run(ctx, requestFunc, responseDonePost); err != nil {
return newError("connection ends").Base(err)
return nil
// fillRequestHeader will fill out the template of the headers
func fillRequestHeader(ctx context.Context, header []*Header) ([]*Header, error) {
if len(header) == 0 {
return header, nil
inbound := session.InboundFromContext(ctx)
outbounds := session.OutboundsFromContext(ctx)
ob := outbounds[len(outbounds) - 1]
if inbound == nil || ob == nil {
return nil, newError("missing inbound or outbound metadata from context")
data := struct {
Source net.Destination
Target net.Destination
Source: inbound.Source,
Target: ob.Target,
filled := make([]*Header, len(header))
for i, h := range header {
tmpl, err := template.New(h.Key).Parse(h.Value)
if err != nil {
return nil, err
var buf bytes.Buffer
if err = tmpl.Execute(&buf, data); err != nil {
return nil, err
filled[i] = &Header{Key: h.Key, Value: buf.String()}
return filled, nil
// setUpHTTPTunnel will create a socket tunnel via HTTP CONNECT method
func setUpHTTPTunnel(ctx context.Context, dest net.Destination, target string, user *protocol.MemoryUser, dialer internet.Dialer, header []*Header, firstPayload []byte) (net.Conn, error) {
req := &http.Request{
Method: http.MethodConnect,
URL: &url.URL{Host: target},
Header: make(http.Header),
Host: target,
if user != nil && user.Account != nil {
account := user.Account.(*Account)
auth := account.GetUsername() + ":" + account.GetPassword()
req.Header.Set("Proxy-Authorization", "Basic "+base64.StdEncoding.EncodeToString([]byte(auth)))
for _, h := range header {
req.Header.Set(h.Key, h.Value)
connectHTTP1 := func(rawConn net.Conn) (net.Conn, error) {
req.Header.Set("Proxy-Connection", "Keep-Alive")
err := req.Write(rawConn)
if err != nil {
return nil, err
resp, err := http.ReadResponse(bufio.NewReader(rawConn), req)
if err != nil {
return nil, err
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return nil, newError("Proxy responded with non 200 code: " + resp.Status)
return rawConn, nil
connectHTTP2 := func(rawConn net.Conn, h2clientConn *http2.ClientConn) (net.Conn, error) {
pr, pw := io.Pipe()
req.Body = pr
var pErr error
var wg sync.WaitGroup
go func() {
_, pErr = pw.Write(firstPayload)
resp, err := h2clientConn.RoundTrip(req)
if err != nil {
return nil, err
if pErr != nil {
return nil, pErr
if resp.StatusCode != http.StatusOK {
return nil, newError("Proxy responded with non 200 code: " + resp.Status)
return newHTTP2Conn(rawConn, pw, resp.Body), nil
cachedConn, cachedConnFound := cachedH2Conns[dest]
if cachedConnFound {
rc, cc := cachedConn.rawConn, cachedConn.h2Conn
if cc.CanTakeNewRequest() {
proxyConn, err := connectHTTP2(rc, cc)
if err != nil {
return nil, err
return proxyConn, nil
rawConn, err := dialer.Dial(ctx, dest)
if err != nil {
return nil, err
iConn := rawConn
if statConn, ok := iConn.(*stat.CounterConnection); ok {
iConn = statConn.Connection
nextProto := ""
if tlsConn, ok := iConn.(*tls.Conn); ok {
if err := tlsConn.HandshakeContext(ctx); err != nil {
return nil, err
nextProto = tlsConn.ConnectionState().NegotiatedProtocol
switch nextProto {
case "", "http/1.1":
return connectHTTP1(rawConn)
case "h2":
t := http2.Transport{}
h2clientConn, err := t.NewClientConn(rawConn)
if err != nil {
return nil, err
proxyConn, err := connectHTTP2(rawConn, h2clientConn)
if err != nil {
return nil, err
if cachedH2Conns == nil {
cachedH2Conns = make(map[net.Destination]h2Conn)
cachedH2Conns[dest] = h2Conn{
rawConn: rawConn,
h2Conn: h2clientConn,
return proxyConn, err
return nil, newError("negotiated unsupported application layer protocol: " + nextProto)
// newHTTP2Conn wraps an HTTP/2 CONNECT stream as a net.Conn. c is the
// underlying transport connection, pipedReqBody is the write end of the pipe
// feeding the request body, and respBody is the response body of the stream.
func newHTTP2Conn(c net.Conn, pipedReqBody *io.PipeWriter, respBody io.ReadCloser) net.Conn {
	return &http2Conn{Conn: c, in: pipedReqBody, out: respBody}
}

// http2Conn adapts one HTTP/2 CONNECT stream to the net.Conn interface.
// Read, Write and Close operate on the stream; every other net.Conn method is
// served by the embedded transport connection.
type http2Conn struct {
	net.Conn
	in  *io.PipeWriter
	out io.ReadCloser
}

// Read reads tunneled data from the response body of the stream.
func (h *http2Conn) Read(p []byte) (n int, err error) {
	return h.out.Read(p)
}

// Write sends tunneled data by writing it into the request body pipe.
func (h *http2Conn) Write(p []byte) (n int, err error) {
	return h.in.Write(p)
}

// Close ends the stream by closing both the request body pipe and the
// response body. The embedded transport connection is left open because it
// is handed in from outside and may be shared with other streams.
func (h *http2Conn) Close() error {
	inErr := h.in.Close()
	if err := h.out.Close(); err != nil {
		return err
	}
	return inErr
}
func init() {
common.Must(common.RegisterConfig((*ClientConfig)(nil), func(ctx context.Context, config interface{}) (interface{}, error) {
return NewClient(ctx, config.(*ClientConfig))