Core: Add mutex to injection resolution (#4206)

* Revert "Add RequireFeaturesAsync() that works regardless order of app init"

* Add mutex to injection resolution

- Turns out we already support async DI resolution regardless of feature ordering
Previous code contain a race condition causing some resolution is lost
- Note that the new mutex cover s.pendingResolutions and s.features
but must not cover callbackResolution() due to deadlock
- Refactor some method names and simplify code

* Add OptionalFeatures injection

For example OptionalFeatures() is useful for fakedns module
This commit is contained in:
yuhan6665 2024-12-26 07:55:12 -05:00 committed by GitHub
parent a7909f8671
commit 42aea01fb5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 107 additions and 93 deletions

View File

@ -106,7 +106,7 @@ func init() {
common.Must(common.RegisterConfig((*Config)(nil), func(ctx context.Context, config interface{}) (interface{}, error) { common.Must(common.RegisterConfig((*Config)(nil), func(ctx context.Context, config interface{}) (interface{}, error) {
d := new(DefaultDispatcher) d := new(DefaultDispatcher)
if err := core.RequireFeatures(ctx, func(om outbound.Manager, router routing.Router, pm policy.Manager, sm stats.Manager, dc dns.Client) error { if err := core.RequireFeatures(ctx, func(om outbound.Manager, router routing.Router, pm policy.Manager, sm stats.Manager, dc dns.Client) error {
core.RequireFeatures(ctx, func(fdns dns.FakeDNSEngine) { // FakeDNSEngine is optional core.OptionalFeatures(ctx, func(fdns dns.FakeDNSEngine) {
d.fdns = fdns d.fdns = fdns
}) })
return d.Init(config.(*Config), om, router, pm, sm, dc) return d.Init(config.(*Config), om, router, pm, sm, dc)

View File

@ -56,7 +56,7 @@ func NewServer(ctx context.Context, dest net.Destination, dispatcher routing.Dis
return NewTCPLocalNameServer(u, queryStrategy) return NewTCPLocalNameServer(u, queryStrategy)
case strings.EqualFold(u.String(), "fakedns"): case strings.EqualFold(u.String(), "fakedns"):
var fd dns.FakeDNSEngine var fd dns.FakeDNSEngine
core.RequireFeatures(ctx, func(fdns dns.FakeDNSEngine) { // FakeDNSEngine is optional core.RequireFeatures(ctx, func(fdns dns.FakeDNSEngine) {
fd = fdns fd = fdns
}) })
return NewFakeDNSServer(fd), nil return NewFakeDNSServer(fd), nil

View File

@ -38,7 +38,7 @@ func init() {
sv := &service{v: s} sv := &service{v: s}
err := s.RequireFeatures(func(Observatory extension.Observatory) { err := s.RequireFeatures(func(Observatory extension.Observatory) {
sv.observatory = Observatory sv.observatory = Observatory
}) }, false)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -177,7 +177,7 @@ func (s *service) Register(server *grpc.Server) {
common.Must(s.v.RequireFeatures(func(im inbound.Manager, om outbound.Manager) { common.Must(s.v.RequireFeatures(func(im inbound.Manager, om outbound.Manager) {
hs.ihm = im hs.ihm = im
hs.ohm = om hs.ohm = om
})) }, false))
RegisterHandlerServiceServer(server, hs) RegisterHandlerServiceServer(server, hs)
// For compatibility purposes // For compatibility purposes

View File

@ -5,6 +5,7 @@ import (
sync "sync" sync "sync"
"github.com/xtls/xray-core/app/observatory" "github.com/xtls/xray-core/app/observatory"
"github.com/xtls/xray-core/common"
"github.com/xtls/xray-core/common/errors" "github.com/xtls/xray-core/common/errors"
"github.com/xtls/xray-core/core" "github.com/xtls/xray-core/core"
"github.com/xtls/xray-core/features/extension" "github.com/xtls/xray-core/features/extension"
@ -31,9 +32,10 @@ type RoundRobinStrategy struct {
func (s *RoundRobinStrategy) InjectContext(ctx context.Context) { func (s *RoundRobinStrategy) InjectContext(ctx context.Context) {
s.ctx = ctx s.ctx = ctx
if len(s.FallbackTag) > 0 { if len(s.FallbackTag) > 0 {
core.RequireFeaturesAsync(s.ctx, func(observatory extension.Observatory) { common.Must(core.RequireFeatures(s.ctx, func(observatory extension.Observatory) error {
s.observatory = observatory s.observatory = observatory
}) return nil
}))
} }
} }

View File

@ -135,7 +135,7 @@ func (s *service) Register(server *grpc.Server) {
vCoreDesc := RoutingService_ServiceDesc vCoreDesc := RoutingService_ServiceDesc
vCoreDesc.ServiceName = "v2ray.core.app.router.command.RoutingService" vCoreDesc.ServiceName = "v2ray.core.app.router.command.RoutingService"
server.RegisterService(&vCoreDesc, rs) server.RegisterService(&vCoreDesc, rs)
})) }, false))
} }
func init() { func init() {

View File

@ -7,6 +7,7 @@ import (
"time" "time"
"github.com/xtls/xray-core/app/observatory" "github.com/xtls/xray-core/app/observatory"
"github.com/xtls/xray-core/common"
"github.com/xtls/xray-core/common/dice" "github.com/xtls/xray-core/common/dice"
"github.com/xtls/xray-core/common/errors" "github.com/xtls/xray-core/common/errors"
"github.com/xtls/xray-core/core" "github.com/xtls/xray-core/core"
@ -59,9 +60,10 @@ type node struct {
func (s *LeastLoadStrategy) InjectContext(ctx context.Context) { func (s *LeastLoadStrategy) InjectContext(ctx context.Context) {
s.ctx = ctx s.ctx = ctx
core.RequireFeaturesAsync(s.ctx, func(observatory extension.Observatory) { common.Must(core.RequireFeatures(s.ctx, func(observatory extension.Observatory) error {
s.observer = observatory s.observer = observatory
}) return nil
}))
} }
func (s *LeastLoadStrategy) PickOutbound(candidates []string) string { func (s *LeastLoadStrategy) PickOutbound(candidates []string) string {

View File

@ -4,6 +4,7 @@ import (
"context" "context"
"github.com/xtls/xray-core/app/observatory" "github.com/xtls/xray-core/app/observatory"
"github.com/xtls/xray-core/common"
"github.com/xtls/xray-core/common/errors" "github.com/xtls/xray-core/common/errors"
"github.com/xtls/xray-core/core" "github.com/xtls/xray-core/core"
"github.com/xtls/xray-core/features/extension" "github.com/xtls/xray-core/features/extension"
@ -20,9 +21,10 @@ func (l *LeastPingStrategy) GetPrincipleTarget(strings []string) []string {
func (l *LeastPingStrategy) InjectContext(ctx context.Context) { func (l *LeastPingStrategy) InjectContext(ctx context.Context) {
l.ctx = ctx l.ctx = ctx
core.RequireFeaturesAsync(l.ctx, func(observatory extension.Observatory) { common.Must(core.RequireFeatures(l.ctx, func(observatory extension.Observatory) error {
l.observatory = observatory l.observatory = observatory
}) return nil
}))
} }
func (l *LeastPingStrategy) PickOutbound(strings []string) string { func (l *LeastPingStrategy) PickOutbound(strings []string) string {

View File

@ -4,6 +4,7 @@ import (
"context" "context"
"github.com/xtls/xray-core/app/observatory" "github.com/xtls/xray-core/app/observatory"
"github.com/xtls/xray-core/common"
"github.com/xtls/xray-core/common/dice" "github.com/xtls/xray-core/common/dice"
"github.com/xtls/xray-core/core" "github.com/xtls/xray-core/core"
"github.com/xtls/xray-core/features/extension" "github.com/xtls/xray-core/features/extension"
@ -20,9 +21,10 @@ type RandomStrategy struct {
func (s *RandomStrategy) InjectContext(ctx context.Context) { func (s *RandomStrategy) InjectContext(ctx context.Context) {
s.ctx = ctx s.ctx = ctx
if len(s.FallbackTag) > 0 { if len(s.FallbackTag) > 0 {
core.RequireFeaturesAsync(s.ctx, func(observatory extension.Observatory) { common.Must(core.RequireFeatures(s.ctx, func(observatory extension.Observatory) error {
s.observatory = observatory s.observatory = observatory
}) return nil
}))
} }
} }

View File

@ -4,7 +4,6 @@ import (
"context" "context"
"reflect" "reflect"
"sync" "sync"
"time"
"github.com/xtls/xray-core/common" "github.com/xtls/xray-core/common"
"github.com/xtls/xray-core/common/errors" "github.com/xtls/xray-core/common/errors"
@ -45,22 +44,13 @@ func getFeature(allFeatures []features.Feature, t reflect.Type) features.Feature
return nil return nil
} }
func (r *resolution) resolve(allFeatures []features.Feature) (bool, error) { func (r *resolution) callbackResolution(allFeatures []features.Feature) error {
var fs []features.Feature
for _, d := range r.deps {
f := getFeature(allFeatures, d)
if f == nil {
return false, nil
}
fs = append(fs, f)
}
callback := reflect.ValueOf(r.callback) callback := reflect.ValueOf(r.callback)
var input []reflect.Value var input []reflect.Value
callbackType := callback.Type() callbackType := callback.Type()
for i := 0; i < callbackType.NumIn(); i++ { for i := 0; i < callbackType.NumIn(); i++ {
pt := callbackType.In(i) pt := callbackType.In(i)
for _, f := range fs { for _, f := range allFeatures {
if reflect.TypeOf(f).AssignableTo(pt) { if reflect.TypeOf(f).AssignableTo(pt) {
input = append(input, reflect.ValueOf(f)) input = append(input, reflect.ValueOf(f))
break break
@ -85,15 +75,17 @@ func (r *resolution) resolve(allFeatures []features.Feature) (bool, error) {
} }
} }
return true, err return err
} }
// Instance combines all Xray features. // Instance combines all Xray features.
type Instance struct { type Instance struct {
access sync.Mutex statusLock sync.Mutex
features []features.Feature features []features.Feature
featureResolutions []resolution pendingResolutions []resolution
running bool pendingOptionalResolutions []resolution
running bool
resolveLock sync.Mutex
ctx context.Context ctx context.Context
} }
@ -154,13 +146,14 @@ func addOutboundHandlers(server *Instance, configs []*OutboundHandlerConfig) err
// See Instance.RequireFeatures for more information. // See Instance.RequireFeatures for more information.
func RequireFeatures(ctx context.Context, callback interface{}) error { func RequireFeatures(ctx context.Context, callback interface{}) error {
v := MustFromContext(ctx) v := MustFromContext(ctx)
return v.RequireFeatures(callback) return v.RequireFeatures(callback, false)
} }
// RequireFeaturesAsync registers a callback, which will be called when all dependent features are registered. The order of app init doesn't matter // OptionalFeatures is a helper function to aquire features from Instance in context.
func RequireFeaturesAsync(ctx context.Context, callback interface{}) { // See Instance.RequireFeatures for more information.
func OptionalFeatures(ctx context.Context, callback interface{}) error {
v := MustFromContext(ctx) v := MustFromContext(ctx)
v.RequireFeaturesAsync(callback) return v.RequireFeatures(callback, true)
} }
// New returns a new Xray instance based on given configuration. // New returns a new Xray instance based on given configuration.
@ -234,9 +227,12 @@ func initInstanceWithConfig(config *Config, server *Instance) (bool, error) {
}(), }(),
) )
if server.featureResolutions != nil { server.resolveLock.Lock()
if server.pendingResolutions != nil {
server.resolveLock.Unlock()
return true, errors.New("not all dependencies are resolved.") return true, errors.New("not all dependencies are resolved.")
} }
server.resolveLock.Unlock()
if err := addInboundHandlers(server, config.Inbound); err != nil { if err := addInboundHandlers(server, config.Inbound); err != nil {
return true, err return true, err
@ -255,8 +251,8 @@ func (s *Instance) Type() interface{} {
// Close shutdown the Xray instance. // Close shutdown the Xray instance.
func (s *Instance) Close() error { func (s *Instance) Close() error {
s.access.Lock() s.statusLock.Lock()
defer s.access.Unlock() defer s.statusLock.Unlock()
s.running = false s.running = false
@ -275,7 +271,7 @@ func (s *Instance) Close() error {
// RequireFeatures registers a callback, which will be called when all dependent features are registered. // RequireFeatures registers a callback, which will be called when all dependent features are registered.
// The callback must be a func(). All its parameters must be features.Feature. // The callback must be a func(). All its parameters must be features.Feature.
func (s *Instance) RequireFeatures(callback interface{}) error { func (s *Instance) RequireFeatures(callback interface{}, optional bool) error {
callbackType := reflect.TypeOf(callback) callbackType := reflect.TypeOf(callback)
if callbackType.Kind() != reflect.Func { if callbackType.Kind() != reflect.Func {
panic("not a function") panic("not a function")
@ -290,47 +286,32 @@ func (s *Instance) RequireFeatures(callback interface{}) error {
deps: featureTypes, deps: featureTypes,
callback: callback, callback: callback,
} }
if finished, err := r.resolve(s.features); finished {
return err
}
s.featureResolutions = append(s.featureResolutions, r)
return nil
}
// RequireFeaturesAsync registers a callback, which will be called when all dependent features are registered. The order of app init doesn't matter s.resolveLock.Lock()
func (s *Instance) RequireFeaturesAsync(callback interface{}) { foundAll := true
callbackType := reflect.TypeOf(callback) for _, d := range r.deps {
if callbackType.Kind() != reflect.Func { f := getFeature(s.features, d)
panic("not a function") if f == nil {
} foundAll = false
break
var featureTypes []reflect.Type
for i := 0; i < callbackType.NumIn(); i++ {
featureTypes = append(featureTypes, reflect.PtrTo(callbackType.In(i)))
}
r := resolution{
deps: featureTypes,
callback: callback,
}
go func() {
var finished = false
for i := 0; !finished; i++ {
if i > 100000 {
errors.LogError(s.ctx, "RequireFeaturesAsync failed after count ", i)
break;
}
finished, _ = r.resolve(s.features)
time.Sleep(time.Millisecond)
} }
s.featureResolutions = append(s.featureResolutions, r) }
}() if foundAll {
s.resolveLock.Unlock()
return r.callbackResolution(s.features)
} else {
if optional {
s.pendingOptionalResolutions = append(s.pendingOptionalResolutions, r)
} else {
s.pendingResolutions = append(s.pendingResolutions, r)
}
s.resolveLock.Unlock()
return nil
}
} }
// AddFeature registers a feature into current Instance. // AddFeature registers a feature into current Instance.
func (s *Instance) AddFeature(feature features.Feature) error { func (s *Instance) AddFeature(feature features.Feature) error {
s.features = append(s.features, feature)
if s.running { if s.running {
if err := feature.Start(); err != nil { if err := feature.Start(); err != nil {
errors.LogInfoInner(s.ctx, err, "failed to start feature") errors.LogInfoInner(s.ctx, err, "failed to start feature")
@ -338,27 +319,52 @@ func (s *Instance) AddFeature(feature features.Feature) error {
return nil return nil
} }
if s.featureResolutions == nil { s.resolveLock.Lock()
return nil s.features = append(s.features, feature)
}
var pendingResolutions []resolution var availableResolution []resolution
for _, r := range s.featureResolutions { var pending []resolution
finished, err := r.resolve(s.features) for _, r := range s.pendingResolutions {
if finished && err != nil { foundAll := true
return err for _, d := range r.deps {
f := getFeature(s.features, d)
if f == nil {
foundAll = false
break
}
} }
if !finished { if foundAll {
pendingResolutions = append(pendingResolutions, r) availableResolution = append(availableResolution, r)
} else {
pending = append(pending, r)
} }
} }
if len(pendingResolutions) == 0 { s.pendingResolutions = pending
s.featureResolutions = nil
} else if len(pendingResolutions) < len(s.featureResolutions) {
s.featureResolutions = pendingResolutions
}
return nil var pendingOptional []resolution
for _, r := range s.pendingOptionalResolutions {
foundAll := true
for _, d := range r.deps {
f := getFeature(s.features, d)
if f == nil {
foundAll = false
break
}
}
if foundAll {
availableResolution = append(availableResolution, r)
} else {
pendingOptional = append(pendingOptional, r)
}
}
s.pendingOptionalResolutions = pendingOptional
s.resolveLock.Unlock()
var err error
for _, r := range availableResolution {
err = r.callbackResolution(s.features) // only return the last error for now
}
return err
} }
// GetFeature returns a feature of the given type, or nil if such feature is not registered. // GetFeature returns a feature of the given type, or nil if such feature is not registered.
@ -371,8 +377,8 @@ func (s *Instance) GetFeature(featureType interface{}) features.Feature {
// //
// xray:api:stable // xray:api:stable
func (s *Instance) Start() error { func (s *Instance) Start() error {
s.access.Lock() s.statusLock.Lock()
defer s.access.Unlock() defer s.statusLock.Unlock()
s.running = true s.running = true
for _, f := range s.features { for _, f := range s.features {

View File

@ -30,7 +30,7 @@ func TestXrayDependency(t *testing.T) {
t.Error("expected dns client fulfilled, but actually nil") t.Error("expected dns client fulfilled, but actually nil")
} }
wait <- true wait <- true
}) }, false)
instance.AddFeature(localdns.New()) instance.AddFeature(localdns.New())
<-wait <-wait
} }

View File

@ -27,7 +27,7 @@ func init() {
common.Must(common.RegisterConfig((*Config)(nil), func(ctx context.Context, config interface{}) (interface{}, error) { common.Must(common.RegisterConfig((*Config)(nil), func(ctx context.Context, config interface{}) (interface{}, error) {
h := new(Handler) h := new(Handler)
if err := core.RequireFeatures(ctx, func(dnsClient dns.Client, policyManager policy.Manager) error { if err := core.RequireFeatures(ctx, func(dnsClient dns.Client, policyManager policy.Manager) error {
core.RequireFeatures(ctx, func(fdns dns.FakeDNSEngine) { // FakeDNSEngine is optional core.OptionalFeatures(ctx, func(fdns dns.FakeDNSEngine) {
h.fdns = fdns h.fdns = fdns
}) })
return h.Init(config.(*Config), dnsClient, policyManager) return h.Init(config.(*Config), dnsClient, policyManager)