Replace "only" with "xudpConcurrency" in Mux config

This commit is contained in:
RPRX 2023-04-10 10:36:07 +08:00 committed by GitHub
parent 29d7865d78
commit 24a2be43ef
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 112 additions and 90 deletions

View file

@ -57,6 +57,7 @@ type Handler struct {
proxy proxy.Outbound
outboundManager outbound.Manager
mux *mux.ClientManager
xudp *mux.ClientManager
uplinkCounter stats.Counter
downlinkCounter stats.Counter
}
@ -106,23 +107,49 @@ func NewHandler(ctx context.Context, config *core.OutboundHandlerConfig) (outbou
}
if h.senderSettings != nil && h.senderSettings.MultiplexSettings != nil {
config := h.senderSettings.MultiplexSettings
if config.Concurrency < 1 || config.Concurrency > 1024 {
return nil, newError("invalid mux concurrency: ", config.Concurrency).AtWarning()
}
h.mux = &mux.ClientManager{
Enabled: config.Enabled,
Picker: &mux.IncrementalWorkerPicker{
Factory: &mux.DialingWorkerFactory{
Proxy: proxyHandler,
Dialer: h,
Strategy: mux.ClientStrategy{
MaxConcurrency: config.Concurrency,
MaxConnection: 128,
if config := h.senderSettings.MultiplexSettings; config.Enabled {
if config.Concurrency < 0 {
h.mux = &mux.ClientManager{Enabled: false}
}
if config.Concurrency == 0 {
config.Concurrency = 8 // same as before
}
if config.Concurrency > 0 {
h.mux = &mux.ClientManager{
Enabled: true,
Picker: &mux.IncrementalWorkerPicker{
Factory: &mux.DialingWorkerFactory{
Proxy: proxyHandler,
Dialer: h,
Strategy: mux.ClientStrategy{
MaxConcurrency: uint32(config.Concurrency),
MaxConnection: 128,
},
},
},
},
},
Only: config.Only,
}
}
if config.XudpConcurrency < 0 {
h.xudp = &mux.ClientManager{Enabled: false}
}
if config.XudpConcurrency == 0 {
h.xudp = nil // same as before
}
if config.XudpConcurrency > 0 {
h.xudp = &mux.ClientManager{
Enabled: true,
Picker: &mux.IncrementalWorkerPicker{
Factory: &mux.DialingWorkerFactory{
Proxy: proxyHandler,
Dialer: h,
Strategy: mux.ClientStrategy{
MaxConcurrency: uint32(config.XudpConcurrency),
MaxConnection: 128,
},
},
},
}
}
}
}
@ -137,33 +164,44 @@ func (h *Handler) Tag() string {
// Dispatch implements proxy.Outbound.Dispatch.
func (h *Handler) Dispatch(ctx context.Context, link *transport.Link) {
outbound := session.OutboundFromContext(ctx)
if h.mux != nil && (h.mux.Enabled || session.MuxPreferedFromContext(ctx)) &&
(h.mux.Only == 0 || (outbound != nil && h.mux.Only == uint32(outbound.Target.Network))) {
if err := h.mux.Dispatch(ctx, link); err != nil {
err := newError("failed to process mux outbound traffic").Base(err)
session.SubmitOutboundErrorToOriginator(ctx, err)
err.WriteToLog(session.ExportIDToError(ctx))
common.Interrupt(link.Writer)
}
} else {
err := h.proxy.Process(ctx, link, h)
if err != nil {
if errors.Is(err, io.EOF) || errors.Is(err, io.ErrClosedPipe) || errors.Is(err, context.Canceled) {
err = nil
if h.mux != nil {
test := func(err error) {
if err != nil {
err := newError("failed to process mux outbound traffic").Base(err)
session.SubmitOutboundErrorToOriginator(ctx, err)
err.WriteToLog(session.ExportIDToError(ctx))
common.Interrupt(link.Writer)
}
}
if err != nil {
// Ensure outbound ray is properly closed.
err := newError("failed to process outbound traffic").Base(err)
session.SubmitOutboundErrorToOriginator(ctx, err)
err.WriteToLog(session.ExportIDToError(ctx))
common.Interrupt(link.Writer)
} else {
common.Must(common.Close(link.Writer))
if h.xudp != nil && session.OutboundFromContext(ctx).Target.Network == net.Network_UDP {
if !h.xudp.Enabled {
goto out
}
test(h.xudp.Dispatch(ctx, link))
return
}
if h.mux.Enabled {
test(h.mux.Dispatch(ctx, link))
return
}
common.Interrupt(link.Reader)
}
out:
err := h.proxy.Process(ctx, link, h)
if err != nil {
if errors.Is(err, io.EOF) || errors.Is(err, io.ErrClosedPipe) || errors.Is(err, context.Canceled) {
err = nil
}
}
if err != nil {
// Ensure outbound ray is properly closed.
err := newError("failed to process outbound traffic").Base(err)
session.SubmitOutboundErrorToOriginator(ctx, err)
err.WriteToLog(session.ExportIDToError(ctx))
common.Interrupt(link.Writer)
} else {
common.Must(common.Close(link.Writer))
}
common.Interrupt(link.Reader)
}
// Address implements internet.Dialer.