Least load balancer (#2999)

* v5: Health Check & LeastLoad Strategy (rebased from 2c5a71490368500a982018a74a6d519c7e121816)

Some changes will be necessary to integrate it into V2Ray

* Update proto

* parse duration conf with time.Parse()

* moving health ping to observatory as a standalone component

* moving health ping to observatory as a standalone component: auto generated file

* add initialization for health ping

* incorporate changes in router implementation

* support principle target output

* add v4 json support for BurstObservatory & fix balancer reference

* update API command

* remove cancelled API

* return zero length value when observer is not found

* remove duplicated targeted dispatch

* adjust test with updated structure

* bug fix for observer

* fix strategy selector

* fix strategy least load

* Fix ticker usage

ticker.Close does not close ticker.C

* feat: Replace default Health Ping URL to HTTPS (#1991)

* fix selectLeastLoad() returns wrong number of nodes (#2083)

* Test: fix leastload strategy unit test

* fix(router): panic caused by concurrent map read and write (#2678)

* Clean up code

---------

Co-authored-by: Jebbs <qjebbs@gmail.com>
Co-authored-by: Shelikhoo <xiaokangwang@outlook.com>
Co-authored-by: 世界 <i@sekai.icu>
Co-authored-by: Bernd Eichelberger <46166740+4-FLOSS-Free-Libre-Open-Source-Software@users.noreply.github.com>
Co-authored-by: 秋のかえで <autmaple@protonmail.com>
Co-authored-by: Rinka <kujourinka@gmail.com>
This commit is contained in:
yuhan6665 2024-02-17 22:51:37 -05:00 committed by GitHub
parent bf02392969
commit fa5d7a255b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
105 changed files with 3523 additions and 429 deletions

View file

@ -7,6 +7,7 @@ import (
loggerservice "github.com/xtls/xray-core/app/log/command"
observatoryservice "github.com/xtls/xray-core/app/observatory/command"
handlerservice "github.com/xtls/xray-core/app/proxyman/command"
routerservice "github.com/xtls/xray-core/app/router/command"
statsservice "github.com/xtls/xray-core/app/stats/command"
"github.com/xtls/xray-core/common/serial"
)
@ -34,6 +35,8 @@ func (c *APIConfig) Build() (*commander.Config, error) {
services = append(services, serial.ToTypedMessage(&statsservice.Config{}))
case "observatoryservice":
services = append(services, serial.ToTypedMessage(&observatoryservice.Config{}))
case "routingservice":
services = append(services, serial.ToTypedMessage(&routerservice.Config{}))
}
}

View file

@ -1,9 +1,11 @@
package conf
import (
"github.com/xtls/xray-core/app/observatory"
"github.com/xtls/xray-core/infra/conf/cfgcommon/duration"
"google.golang.org/protobuf/proto"
"github.com/xtls/xray-core/app/observatory"
"github.com/xtls/xray-core/app/observatory/burst"
"github.com/xtls/xray-core/infra/conf/cfgcommon/duration"
)
type ObservatoryConfig struct {
@ -16,3 +18,17 @@ type ObservatoryConfig struct {
func (o *ObservatoryConfig) Build() (proto.Message, error) {
return &observatory.Config{SubjectSelector: o.SubjectSelector, ProbeUrl: o.ProbeURL, ProbeInterval: int64(o.ProbeInterval), EnableConcurrency: o.EnableConcurrency}, nil
}
type BurstObservatoryConfig struct {
SubjectSelector []string `json:"subjectSelector"`
// health check settings
HealthCheck *healthCheckSettings `json:"pingConfig,omitempty"`
}
func (b BurstObservatoryConfig) Build() (proto.Message, error) {
if result, err := b.HealthCheck.Build(); err == nil {
return &burst.Config{SubjectSelector: b.SubjectSelector, PingConfig: result.(*burst.HealthPingConfig)}, nil
} else {
return nil, err
}
}

View file

@ -8,6 +8,7 @@ import (
"github.com/xtls/xray-core/app/router"
"github.com/xtls/xray-core/common/net"
"github.com/xtls/xray-core/common/serial"
"github.com/xtls/xray-core/common/platform/filesystem"
"google.golang.org/protobuf/proto"
)
@ -24,11 +25,13 @@ type StrategyConfig struct {
}
type BalancingRule struct {
Tag string `json:"tag"`
Selectors StringList `json:"selector"`
Strategy StrategyConfig `json:"strategy"`
Tag string `json:"tag"`
Selectors StringList `json:"selector"`
Strategy StrategyConfig `json:"strategy"`
FallbackTag string `json:"fallbackTag"`
}
// Build builds the balancing rule
func (r *BalancingRule) Build() (*router.BalancingRule, error) {
if r.Tag == "" {
return nil, newError("empty balancer tag")
@ -37,22 +40,37 @@ func (r *BalancingRule) Build() (*router.BalancingRule, error) {
return nil, newError("empty selector list")
}
var strategy string
switch strings.ToLower(r.Strategy.Type) {
case strategyRandom, "":
strategy = strategyRandom
case strategyLeastPing:
strategy = "leastPing"
case strategyRoundRobin:
strategy = "roundRobin"
r.Strategy.Type = strings.ToLower(r.Strategy.Type)
switch r.Strategy.Type {
case "":
r.Strategy.Type = strategyRandom
case strategyRandom, strategyLeastLoad, strategyLeastPing, strategyRoundRobin:
default:
return nil, newError("unknown balancing strategy: " + r.Strategy.Type)
}
settings := []byte("{}")
if r.Strategy.Settings != nil {
settings = ([]byte)(*r.Strategy.Settings)
}
rawConfig, err := strategyConfigLoader.LoadWithID(settings, r.Strategy.Type)
if err != nil {
return nil, newError("failed to parse to strategy config.").Base(err)
}
var ts proto.Message
if builder, ok := rawConfig.(Buildable); ok {
ts, err = builder.Build()
if err != nil {
return nil, err
}
}
return &router.BalancingRule{
Strategy: r.Strategy.Type,
StrategySettings: serial.ToTypedMessage(ts),
FallbackTag: r.FallbackTag,
OutboundSelector: r.Selectors,
Tag: r.Tag,
OutboundSelector: []string(r.Selectors),
Strategy: strategy,
}, nil
}

View file

@ -1,7 +1,93 @@
package conf
import (
"google.golang.org/protobuf/proto"
"github.com/xtls/xray-core/app/router"
"github.com/xtls/xray-core/app/observatory/burst"
"github.com/xtls/xray-core/infra/conf/cfgcommon/duration"
)
const (
strategyRandom string = "random"
strategyLeastPing string = "leastping"
strategyRoundRobin string = "roundrobin"
strategyLeastLoad string = "leastload"
)
var (
strategyConfigLoader = NewJSONConfigLoader(ConfigCreatorCache{
strategyRandom: func() interface{} { return new(strategyEmptyConfig) },
strategyLeastPing: func() interface{} { return new(strategyEmptyConfig) },
strategyRoundRobin: func() interface{} { return new(strategyEmptyConfig) },
strategyLeastLoad: func() interface{} { return new(strategyLeastLoadConfig) },
}, "type", "settings")
)
type strategyEmptyConfig struct {
}
func (v *strategyEmptyConfig) Build() (proto.Message, error) {
return nil, nil
}
type strategyLeastLoadConfig struct {
// weight settings
Costs []*router.StrategyWeight `json:"costs,omitempty"`
// ping rtt baselines
Baselines []duration.Duration `json:"baselines,omitempty"`
// expected nodes count to select
Expected int32 `json:"expected,omitempty"`
// max acceptable rtt, filter away high delay nodes. defalut 0
MaxRTT duration.Duration `json:"maxRTT,omitempty"`
// acceptable failure rate
Tolerance float64 `json:"tolerance,omitempty"`
}
// healthCheckSettings holds settings for health Checker
type healthCheckSettings struct {
Destination string `json:"destination"`
Connectivity string `json:"connectivity"`
Interval duration.Duration `json:"interval"`
SamplingCount int `json:"sampling"`
Timeout duration.Duration `json:"timeout"`
}
func (h healthCheckSettings) Build() (proto.Message, error) {
return &burst.HealthPingConfig{
Destination: h.Destination,
Connectivity: h.Connectivity,
Interval: int64(h.Interval),
Timeout: int64(h.Timeout),
SamplingCount: int32(h.SamplingCount),
}, nil
}
// Build implements Buildable.
func (v *strategyLeastLoadConfig) Build() (proto.Message, error) {
config := &router.StrategyLeastLoadConfig{}
config.Costs = v.Costs
config.Tolerance = float32(v.Tolerance)
if config.Tolerance < 0 {
config.Tolerance = 0
}
if config.Tolerance > 1 {
config.Tolerance = 1
}
config.Expected = v.Expected
if config.Expected < 0 {
config.Expected = 0
}
config.MaxRTT = int64(v.MaxRTT)
if config.MaxRTT < 0 {
config.MaxRTT = 0
}
config.Baselines = make([]int64, 0)
for _, b := range v.Baselines {
if b <= 0 {
continue
}
config.Baselines = append(config.Baselines, int64(b))
}
return config, nil
}

View file

@ -5,6 +5,7 @@ import (
"os"
"path/filepath"
"testing"
"time"
_ "unsafe"
"github.com/xtls/xray-core/app/router"
@ -12,6 +13,7 @@ import (
"github.com/xtls/xray-core/common/net"
"github.com/xtls/xray-core/common/platform"
"github.com/xtls/xray-core/common/platform/filesystem"
"github.com/xtls/xray-core/common/serial"
. "github.com/xtls/xray-core/infra/conf"
"google.golang.org/protobuf/proto"
)
@ -96,6 +98,34 @@ func TestRouterConfig(t *testing.T) {
{
"tag": "b1",
"selector": ["test"]
},
{
"tag": "b2",
"selector": ["test"],
"strategy": {
"type": "leastload",
"settings": {
"healthCheck": {
"interval": "5m0s",
"sampling": 2,
"timeout": "5s",
"destination": "dest",
"connectivity": "conn"
},
"costs": [
{
"regexp": true,
"match": "\\d+(\\.\\d+)",
"value": 5
}
],
"baselines": ["400ms", "600ms"],
"expected": 6,
"maxRTT": "1000ms",
"tolerance": 0.5
}
},
"fallbackTag": "fall"
}
]
}`,
@ -108,6 +138,28 @@ func TestRouterConfig(t *testing.T) {
OutboundSelector: []string{"test"},
Strategy: "random",
},
{
Tag: "b2",
OutboundSelector: []string{"test"},
Strategy: "leastload",
StrategySettings: serial.ToTypedMessage(&router.StrategyLeastLoadConfig{
Costs: []*router.StrategyWeight{
{
Regexp: true,
Match: "\\d+(\\.\\d+)",
Value: 5,
},
},
Baselines: []int64{
int64(time.Duration(400) * time.Millisecond),
int64(time.Duration(600) * time.Millisecond),
},
Expected: 6,
MaxRTT: int64(time.Duration(1000) * time.Millisecond),
Tolerance: 0.5,
}),
FallbackTag: "fall",
},
},
Rule: []*router.RoutingRule{
{

View file

@ -410,6 +410,7 @@ type Config struct {
Reverse *ReverseConfig `json:"reverse"`
FakeDNS *FakeDNSConfig `json:"fakeDns"`
Observatory *ObservatoryConfig `json:"observatory"`
BurstObservatory *BurstObservatoryConfig `json:"burstObservatory"`
}
func (c *Config) findInboundTag(tag string) int {
@ -639,6 +640,14 @@ func (c *Config) Build() (*core.Config, error) {
config.App = append(config.App, serial.ToTypedMessage(r))
}
if c.BurstObservatory != nil {
r, err := c.BurstObservatory.Build()
if err != nil {
return nil, err
}
config.App = append(config.App, serial.ToTypedMessage(r))
}
var inbounds []InboundDetourConfig
if c.InboundConfig != nil {