Add concurrency option for outbound observation

Add `enableConcurrency` option, false by default.

If it's set as `true`, start probing outbounds concurrently in every
circle of observation. Wait `probeInterval` between observation circles.
This commit is contained in:
Zhu Sheng Li 2021-08-27 01:01:10 +08:00 committed by 世界
parent abb8ba8b0e
commit 28b17b529d
No known key found for this signature in database
GPG Key ID: CD109927C34A63C4
2 changed files with 38 additions and 14 deletions

View File

@ -66,22 +66,45 @@ func (o *Observer) background() {
} }
outbounds := hs.Select(o.config.SubjectSelector) outbounds := hs.Select(o.config.SubjectSelector)
sort.Strings(outbounds)
o.updateStatus(outbounds) o.updateStatus(outbounds)
sleepTime := time.Second * 10
if o.config.ProbeInterval != 0 {
sleepTime = time.Duration(o.config.ProbeInterval)
}
if !o.config.EnableConcurrency {
sort.Strings(outbounds)
for _, v := range outbounds { for _, v := range outbounds {
result := o.probe(v) result := o.probe(v)
o.updateStatusForResult(v, &result) o.updateStatusForResult(v, &result)
if o.finished.Done() { if o.finished.Done() {
return return
} }
sleepTime := time.Second * 10
if o.config.ProbeInterval != 0 {
sleepTime = time.Duration(o.config.ProbeInterval)
}
time.Sleep(sleepTime) time.Sleep(sleepTime)
} }
continue
}
ch := make(chan struct{}, len(outbounds))
for _, v := range outbounds {
go func(v string) {
result := o.probe(v)
o.updateStatusForResult(v, &result)
ch <- struct{}{}
}(v)
}
for range outbounds {
select {
case <-ch:
case <-o.finished.Wait():
return
}
}
time.Sleep(sleepTime)
} }
} }

View File

@ -11,8 +11,9 @@ type ObservatoryConfig struct {
SubjectSelector []string `json:"subjectSelector"` SubjectSelector []string `json:"subjectSelector"`
ProbeURL string `json:"probeURL"` ProbeURL string `json:"probeURL"`
ProbeInterval duration.Duration `json:"probeInterval"` ProbeInterval duration.Duration `json:"probeInterval"`
EnableConcurrency bool `json:"enableConcurrency"`
} }
func (o *ObservatoryConfig) Build() (proto.Message, error) { func (o *ObservatoryConfig) Build() (proto.Message, error) {
return &observatory.Config{SubjectSelector: o.SubjectSelector, ProbeUrl: o.ProbeURL, ProbeInterval: int64(o.ProbeInterval)}, nil return &observatory.Config{SubjectSelector: o.SubjectSelector, ProbeUrl: o.ProbeURL, ProbeInterval: int64(o.ProbeInterval), EnableConcurrency: o.EnableConcurrency}, nil
} }