mirror of
https://github.com/XTLS/Xray-core.git
synced 2025-01-01 01:59:47 +00:00
28b17b529d
Add `enableConcurrency` option, false by default. If it's set as `true`, start probing outbounds concurrently in every circle of observation. Wait `probeInterval` between observation circles.
240 lines
5.9 KiB
Go
240 lines
5.9 KiB
Go
package observatory
|
|
|
|
import (
|
|
"context"
|
|
"github.com/xtls/xray-core/core"
|
|
"net"
|
|
"net/http"
|
|
"net/url"
|
|
"sort"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/golang/protobuf/proto"
|
|
|
|
"github.com/xtls/xray-core/common"
|
|
v2net "github.com/xtls/xray-core/common/net"
|
|
"github.com/xtls/xray-core/common/session"
|
|
"github.com/xtls/xray-core/common/signal/done"
|
|
"github.com/xtls/xray-core/common/task"
|
|
"github.com/xtls/xray-core/features/extension"
|
|
"github.com/xtls/xray-core/features/outbound"
|
|
"github.com/xtls/xray-core/transport/internet/tagged"
|
|
)
|
|
|
|
type Observer struct {
|
|
config *Config
|
|
ctx context.Context
|
|
|
|
statusLock sync.Mutex
|
|
status []*OutboundStatus
|
|
|
|
finished *done.Instance
|
|
|
|
ohm outbound.Manager
|
|
}
|
|
|
|
func (o *Observer) GetObservation(ctx context.Context) (proto.Message, error) {
|
|
return &ObservationResult{Status: o.status}, nil
|
|
}
|
|
|
|
func (o *Observer) Type() interface{} {
|
|
return extension.ObservatoryType()
|
|
}
|
|
|
|
func (o *Observer) Start() error {
|
|
if o.config != nil && len(o.config.SubjectSelector) != 0 {
|
|
o.finished = done.New()
|
|
go o.background()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (o *Observer) Close() error {
|
|
if o.finished != nil {
|
|
return o.finished.Close()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (o *Observer) background() {
|
|
for !o.finished.Done() {
|
|
hs, ok := o.ohm.(outbound.HandlerSelector)
|
|
if !ok {
|
|
newError("outbound.Manager is not a HandlerSelector").WriteToLog()
|
|
return
|
|
}
|
|
|
|
outbounds := hs.Select(o.config.SubjectSelector)
|
|
|
|
o.updateStatus(outbounds)
|
|
|
|
sleepTime := time.Second * 10
|
|
if o.config.ProbeInterval != 0 {
|
|
sleepTime = time.Duration(o.config.ProbeInterval)
|
|
}
|
|
|
|
if !o.config.EnableConcurrency {
|
|
sort.Strings(outbounds)
|
|
for _, v := range outbounds {
|
|
result := o.probe(v)
|
|
o.updateStatusForResult(v, &result)
|
|
if o.finished.Done() {
|
|
return
|
|
}
|
|
time.Sleep(sleepTime)
|
|
}
|
|
continue
|
|
}
|
|
|
|
ch := make(chan struct{}, len(outbounds))
|
|
|
|
for _, v := range outbounds {
|
|
go func(v string) {
|
|
result := o.probe(v)
|
|
o.updateStatusForResult(v, &result)
|
|
ch <- struct{}{}
|
|
}(v)
|
|
}
|
|
|
|
for range outbounds {
|
|
select {
|
|
case <-ch:
|
|
case <-o.finished.Wait():
|
|
return
|
|
}
|
|
}
|
|
time.Sleep(sleepTime)
|
|
}
|
|
}
|
|
|
|
func (o *Observer) updateStatus(outbounds []string) {
|
|
o.statusLock.Lock()
|
|
defer o.statusLock.Unlock()
|
|
// TODO should remove old inbound that is removed
|
|
_ = outbounds
|
|
}
|
|
|
|
func (o *Observer) probe(outbound string) ProbeResult {
|
|
errorCollectorForRequest := newErrorCollector()
|
|
|
|
httpTransport := http.Transport{
|
|
Proxy: func(*http.Request) (*url.URL, error) {
|
|
return nil, nil
|
|
},
|
|
DialContext: func(ctx context.Context, network string, addr string) (net.Conn, error) {
|
|
var connection net.Conn
|
|
taskErr := task.Run(ctx, func() error {
|
|
// MUST use Xray's built in context system
|
|
dest, err := v2net.ParseDestination(network + ":" + addr)
|
|
if err != nil {
|
|
return newError("cannot understand address").Base(err)
|
|
}
|
|
trackedCtx := session.TrackedConnectionError(o.ctx, errorCollectorForRequest)
|
|
conn, err := tagged.Dialer(trackedCtx, dest, outbound)
|
|
if err != nil {
|
|
return newError("cannot dial remote address ", dest).Base(err)
|
|
}
|
|
connection = conn
|
|
return nil
|
|
})
|
|
if taskErr != nil {
|
|
return nil, newError("cannot finish connection").Base(taskErr)
|
|
}
|
|
return connection, nil
|
|
},
|
|
TLSHandshakeTimeout: time.Second * 5,
|
|
}
|
|
httpClient := &http.Client{
|
|
Transport: &httpTransport,
|
|
CheckRedirect: func(req *http.Request, via []*http.Request) error {
|
|
return http.ErrUseLastResponse
|
|
},
|
|
Jar: nil,
|
|
Timeout: time.Second * 5,
|
|
}
|
|
var GETTime time.Duration
|
|
err := task.Run(o.ctx, func() error {
|
|
startTime := time.Now()
|
|
probeURL := "https://www.google.com/generate_204"
|
|
if o.config.ProbeUrl != "" {
|
|
probeURL = o.config.ProbeUrl
|
|
}
|
|
response, err := httpClient.Get(probeURL)
|
|
if err != nil {
|
|
return newError("outbound failed to relay connection").Base(err)
|
|
}
|
|
if response.Body != nil {
|
|
response.Body.Close()
|
|
}
|
|
endTime := time.Now()
|
|
GETTime = endTime.Sub(startTime)
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
fullerr := newError("underlying connection failed").Base(errorCollectorForRequest.UnderlyingError())
|
|
fullerr = newError("with outbound handler report").Base(fullerr)
|
|
fullerr = newError("GET request failed:", err).Base(fullerr)
|
|
fullerr = newError("the outbound ", outbound, " is dead:").Base(fullerr)
|
|
fullerr = fullerr.AtInfo()
|
|
fullerr.WriteToLog()
|
|
return ProbeResult{Alive: false, LastErrorReason: fullerr.Error()}
|
|
}
|
|
newError("the outbound ", outbound, " is alive:", GETTime.Seconds()).AtInfo().WriteToLog()
|
|
return ProbeResult{Alive: true, Delay: GETTime.Milliseconds()}
|
|
}
|
|
|
|
func (o *Observer) updateStatusForResult(outbound string, result *ProbeResult) {
|
|
o.statusLock.Lock()
|
|
defer o.statusLock.Unlock()
|
|
var status *OutboundStatus
|
|
if location := o.findStatusLocationLockHolderOnly(outbound); location != -1 {
|
|
status = o.status[location]
|
|
} else {
|
|
status = &OutboundStatus{}
|
|
o.status = append(o.status, status)
|
|
}
|
|
|
|
status.LastTryTime = time.Now().Unix()
|
|
status.OutboundTag = outbound
|
|
status.Alive = result.Alive
|
|
if result.Alive {
|
|
status.Delay = result.Delay
|
|
status.LastSeenTime = status.LastTryTime
|
|
status.LastErrorReason = ""
|
|
} else {
|
|
status.LastErrorReason = result.LastErrorReason
|
|
status.Delay = 99999999
|
|
}
|
|
}
|
|
|
|
func (o *Observer) findStatusLocationLockHolderOnly(outbound string) int {
|
|
for i, v := range o.status {
|
|
if v.OutboundTag == outbound {
|
|
return i
|
|
}
|
|
}
|
|
return -1
|
|
}
|
|
|
|
func New(ctx context.Context, config *Config) (*Observer, error) {
|
|
var outboundManager outbound.Manager
|
|
err := core.RequireFeatures(ctx, func(om outbound.Manager) {
|
|
outboundManager = om
|
|
})
|
|
if err != nil {
|
|
return nil, newError("Cannot get depended features").Base(err)
|
|
}
|
|
return &Observer{
|
|
config: config,
|
|
ctx: ctx,
|
|
ohm: outboundManager,
|
|
}, nil
|
|
}
|
|
|
|
func init() {
|
|
common.Must(common.RegisterConfig((*Config)(nil), func(ctx context.Context, config interface{}) (interface{}, error) {
|
|
return New(ctx, config.(*Config))
|
|
}))
|
|
}
|