diff --git a/app/metrics/config.pb.go b/app/metrics/config.pb.go new file mode 100644 index 00000000..7869fdf3 --- /dev/null +++ b/app/metrics/config.pb.go @@ -0,0 +1,148 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.27.1 +// protoc v3.18.0 +// source: config.proto + +package metrics + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// Config is the settings for metrics. +type Config struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // Tag of the outbound handler that handles metrics http connections. + Tag string `protobuf:"bytes,1,opt,name=tag,proto3" json:"tag,omitempty"` +} + +func (x *Config) Reset() { + *x = Config{} + if protoimpl.UnsafeEnabled { + mi := &file_config_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Config) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Config) ProtoMessage() {} + +func (x *Config) ProtoReflect() protoreflect.Message { + mi := &file_config_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Config.ProtoReflect.Descriptor instead. +func (*Config) Descriptor() ([]byte, []int) { + return file_config_proto_rawDescGZIP(), []int{0} +} + +func (x *Config) GetTag() string { + if x != nil { + return x.Tag + } + return "" +} + +var File_config_proto protoreflect.FileDescriptor + +var file_config_proto_rawDesc = []byte{ + 0x0a, 0x0c, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x10, + 0x78, 0x72, 0x61, 0x79, 0x2e, 0x61, 0x70, 0x70, 0x2e, 0x6d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, + 0x22, 0x1a, 0x0a, 0x06, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x10, 0x0a, 0x03, 0x74, 0x61, + 0x67, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x74, 0x61, 0x67, 0x42, 0x52, 0x0a, 0x14, + 0x63, 0x6f, 0x6d, 0x2e, 0x78, 0x72, 0x61, 0x79, 0x2e, 0x61, 0x70, 0x70, 0x2e, 0x6d, 0x65, 0x74, + 0x72, 0x69, 0x63, 0x73, 0x50, 0x01, 0x5a, 0x25, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, + 0x6f, 0x6d, 0x2f, 0x78, 0x74, 0x6c, 0x73, 0x2f, 0x78, 0x72, 0x61, 0x79, 0x2d, 0x63, 0x6f, 0x72, + 0x65, 0x2f, 0x61, 0x70, 0x70, 0x2f, 0x6d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0xaa, 0x02, 0x10, + 0x58, 0x72, 0x61, 0x79, 0x2e, 0x41, 0x70, 0x70, 0x2e, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, + 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_config_proto_rawDescOnce sync.Once + file_config_proto_rawDescData = file_config_proto_rawDesc +) + +func file_config_proto_rawDescGZIP() []byte { + file_config_proto_rawDescOnce.Do(func() { + file_config_proto_rawDescData = protoimpl.X.CompressGZIP(file_config_proto_rawDescData) + }) + return file_config_proto_rawDescData +} + +var file_config_proto_msgTypes = make([]protoimpl.MessageInfo, 1) +var file_config_proto_goTypes = []interface{}{ + (*Config)(nil), // 0: xray.app.metrics.Config +} +var file_config_proto_depIdxs = []int32{ + 0, // [0:0] is the sub-list for method output_type + 0, // [0:0] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_config_proto_init() } +func file_config_proto_init() { + if File_config_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_config_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Config); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_config_proto_rawDesc, + NumEnums: 0, + NumMessages: 1, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_config_proto_goTypes, + DependencyIndexes: file_config_proto_depIdxs, + MessageInfos: file_config_proto_msgTypes, + }.Build() + File_config_proto = out.File + file_config_proto_rawDesc = nil + file_config_proto_goTypes = nil + file_config_proto_depIdxs = nil +} diff --git a/app/metrics/config.proto b/app/metrics/config.proto new file mode 100644 index 00000000..5e6880f1 --- /dev/null +++ b/app/metrics/config.proto @@ -0,0 +1,13 @@ +syntax = "proto3"; + +package xray.app.metrics; +option csharp_namespace = "Xray.App.Metrics"; +option go_package = "github.com/xtls/xray-core/app/metrics"; +option java_package = "com.xray.app.metrics"; +option java_multiple_files = true; + +// Config is the settings for metrics. +message Config { + // Tag of the outbound handler that handles metrics http connections. + string tag = 1; +} diff --git a/app/metrics/errors.generated.go b/app/metrics/errors.generated.go new file mode 100644 index 00000000..dee6b316 --- /dev/null +++ b/app/metrics/errors.generated.go @@ -0,0 +1,9 @@ +package metrics + +import "github.com/xtls/xray-core/common/errors" + +type errPathObjHolder struct{} + +func newError(values ...interface{}) *errors.Error { + return errors.New(values...).WithPathObj(errPathObjHolder{}) +} diff --git a/app/metrics/metrics.go b/app/metrics/metrics.go new file mode 100644 index 00000000..2f1d6baa --- /dev/null +++ b/app/metrics/metrics.go @@ -0,0 +1,115 @@ +package metrics + +import ( + "context" + "expvar" + "net/http" + _ "net/http/pprof" + "strings" + + "github.com/xtls/xray-core/app/observatory" + "github.com/xtls/xray-core/app/stats" + "github.com/xtls/xray-core/common" + "github.com/xtls/xray-core/common/net" + "github.com/xtls/xray-core/common/signal/done" + "github.com/xtls/xray-core/core" + "github.com/xtls/xray-core/features/extension" + "github.com/xtls/xray-core/features/outbound" + feature_stats "github.com/xtls/xray-core/features/stats" +) + +type MetricsHandler struct { + ohm outbound.Manager + statsManager feature_stats.Manager + observatory extension.Observatory + tag string +} + +// NewMetricsHandler creates a new MetricsHandler based on the given config. +func NewMetricsHandler(ctx context.Context, config *Config) (*MetricsHandler, error) { + c := &MetricsHandler{ + tag: config.Tag, + } + common.Must(core.RequireFeatures(ctx, func(om outbound.Manager, sm feature_stats.Manager) { + c.statsManager = sm + c.ohm = om + })) + expvar.Publish("stats", expvar.Func(func() interface{} { + manager, ok := c.statsManager.(*stats.Manager) + if !ok { + return nil + } + var resp = map[string]map[string]map[string]int64{ + "inbound": {}, + "outbound": {}, + "user": {}, + } + manager.VisitCounters(func(name string, counter feature_stats.Counter) bool { + nameSplit := strings.Split(name, ">>>") + typeName, tagOrUser, direction := nameSplit[0], nameSplit[1], nameSplit[3] + if item, found := resp[typeName][tagOrUser]; found { + item[direction] = counter.Value() + } else { + resp[typeName][tagOrUser] = map[string]int64{ + direction: counter.Value(), + } + } + return true + }) + return resp + })) + expvar.Publish("observatory", expvar.Func(func() interface{} { + if c.observatory == nil { + common.Must(core.RequireFeatures(ctx, func(observatory extension.Observatory) error { + c.observatory = observatory + return nil + })) + } + var resp = map[string]*observatory.OutboundStatus{} + if o, err := c.observatory.GetObservation(context.Background()); err != nil { + return err + } else { + for _, x := range o.(*observatory.ObservationResult).GetStatus() { + resp[x.OutboundTag] = x + } + } + return resp + })) + return c, nil +} + +func (p *MetricsHandler) Type() interface{} { + return (*MetricsHandler)(nil) +} + +func (p *MetricsHandler) Start() error { + listener := &OutboundListener{ + buffer: make(chan net.Conn, 4), + done: done.New(), + } + + go func() { + if err := http.Serve(listener, http.DefaultServeMux); err != nil { + newError("failed to start metrics server").Base(err).AtError().WriteToLog() + } + }() + + if err := p.ohm.RemoveHandler(context.Background(), p.tag); err != nil { + newError("failed to remove existing handler").WriteToLog() + } + + return p.ohm.AddHandler(context.Background(), &Outbound{ + tag: p.tag, + listener: listener, + }) +} + +func (p *MetricsHandler) Close() error { + return nil +} + +func init() { + common.Must(common.RegisterConfig((*Config)(nil), func(ctx context.Context, cfg interface{}) (interface{}, error) { + return NewMetricsHandler(ctx, cfg.(*Config)) + })) +} diff --git a/app/metrics/outbound.go b/app/metrics/outbound.go new file mode 100644 index 00000000..7505e2bf --- /dev/null +++ b/app/metrics/outbound.go @@ -0,0 +1,109 @@ +package metrics + +import ( + "context" + "sync" + + "github.com/xtls/xray-core/common" + "github.com/xtls/xray-core/common/net" + "github.com/xtls/xray-core/common/net/cnc" + "github.com/xtls/xray-core/common/signal/done" + "github.com/xtls/xray-core/transport" +) + +// OutboundListener is a net.Listener for listening pprof http connections. +type OutboundListener struct { + buffer chan net.Conn + done *done.Instance +} + +func (l *OutboundListener) add(conn net.Conn) { + select { + case l.buffer <- conn: + case <-l.done.Wait(): + conn.Close() + default: + conn.Close() + } +} + +// Accept implements net.Listener. +func (l *OutboundListener) Accept() (net.Conn, error) { + select { + case <-l.done.Wait(): + return nil, newError("listen closed") + case c := <-l.buffer: + return c, nil + } +} + +// Close implement net.Listener. +func (l *OutboundListener) Close() error { + common.Must(l.done.Close()) +L: + for { + select { + case c := <-l.buffer: + c.Close() + default: + break L + } + } + return nil +} + +// Addr implements net.Listener. +func (l *OutboundListener) Addr() net.Addr { + return &net.TCPAddr{ + IP: net.IP{0, 0, 0, 0}, + Port: 0, + } +} + +// Outbound is a outbound.Handler that handles pprof http connections. +type Outbound struct { + tag string + listener *OutboundListener + access sync.RWMutex + closed bool +} + +// Dispatch implements outbound.Handler. +func (co *Outbound) Dispatch(ctx context.Context, link *transport.Link) { + co.access.RLock() + + if co.closed { + common.Interrupt(link.Reader) + common.Interrupt(link.Writer) + co.access.RUnlock() + return + } + + closeSignal := done.New() + c := cnc.NewConnection(cnc.ConnectionInputMulti(link.Writer), cnc.ConnectionOutputMulti(link.Reader), cnc.ConnectionOnClose(closeSignal)) + co.listener.add(c) + co.access.RUnlock() + <-closeSignal.Wait() +} + +// Tag implements outbound.Handler. +func (co *Outbound) Tag() string { + return co.tag +} + +// Start implements common.Runnable. +func (co *Outbound) Start() error { + co.access.Lock() + co.closed = false + co.access.Unlock() + return nil +} + +// Close implements common.Closable. +func (co *Outbound) Close() error { + co.access.Lock() + defer co.access.Unlock() + + co.closed = true + return co.listener.Close() +} diff --git a/infra/conf/metrics.go b/infra/conf/metrics.go new file mode 100644 index 00000000..2a0c8ef9 --- /dev/null +++ b/infra/conf/metrics.go @@ -0,0 +1,19 @@ +package conf + +import ( + "github.com/xtls/xray-core/app/metrics" +) + +type MetricsConfig struct { + Tag string `json:"tag"` +} + +func (c *MetricsConfig) Build() (*metrics.Config, error) { + if c.Tag == "" { + return nil, newError("metrics tag can't be empty.") + } + + return &metrics.Config{ + Tag: c.Tag, + }, nil +} diff --git a/infra/conf/xray.go b/infra/conf/xray.go index 6a007074..c67e6e09 100644 --- a/infra/conf/xray.go +++ b/infra/conf/xray.go @@ -411,6 +411,7 @@ type Config struct { Transport *TransportConfig `json:"transport"` Policy *PolicyConfig `json:"policy"` API *APIConfig `json:"api"` + Metrics *MetricsConfig `json:"metrics"` Stats *StatsConfig `json:"stats"` Reverse *ReverseConfig `json:"reverse"` FakeDNS *FakeDNSConfig `json:"fakeDns"` @@ -461,6 +462,9 @@ func (c *Config) Override(o *Config, fn string) { if o.API != nil { c.API = o.API } + if o.Metrics != nil { + c.Metrics = o.Metrics + } if o.Stats != nil { c.Stats = o.Stats } @@ -566,7 +570,13 @@ func (c *Config) Build() (*core.Config, error) { } config.App = append(config.App, serial.ToTypedMessage(apiConf)) } - + if c.Metrics != nil { + metricsConf, err := c.Metrics.Build() + if err != nil { + return nil, err + } + config.App = append(config.App, serial.ToTypedMessage(metricsConf)) + } if c.Stats != nil { statsConf, err := c.Stats.Build() if err != nil { diff --git a/main/distro/all/all.go b/main/distro/all/all.go index cc1dcacf..db9d5c4d 100644 --- a/main/distro/all/all.go +++ b/main/distro/all/all.go @@ -21,6 +21,7 @@ import ( _ "github.com/xtls/xray-core/app/dns" _ "github.com/xtls/xray-core/app/dns/fakedns" _ "github.com/xtls/xray-core/app/log" + _ "github.com/xtls/xray-core/app/metrics" _ "github.com/xtls/xray-core/app/policy" _ "github.com/xtls/xray-core/app/reverse" _ "github.com/xtls/xray-core/app/router" diff --git a/testing/scenarios/metrics_test.go b/testing/scenarios/metrics_test.go new file mode 100644 index 00000000..8c30c514 --- /dev/null +++ b/testing/scenarios/metrics_test.go @@ -0,0 +1,105 @@ +package scenarios + +import ( + "fmt" + "io/ioutil" + "net/http" + "testing" + + "github.com/xtls/xray-core/app/metrics" + "github.com/xtls/xray-core/app/proxyman" + "github.com/xtls/xray-core/app/router" + "github.com/xtls/xray-core/common" + "github.com/xtls/xray-core/common/net" + "github.com/xtls/xray-core/common/serial" + "github.com/xtls/xray-core/core" + "github.com/xtls/xray-core/proxy/dokodemo" + "github.com/xtls/xray-core/proxy/freedom" + "github.com/xtls/xray-core/testing/servers/tcp" +) + +const expectedMessage = "goroutine profile: total" + +func TestMetrics(t *testing.T) { + tcpServer := tcp.Server{ + MsgProcessor: xor, + } + dest, err := tcpServer.Start() + common.Must(err) + defer tcpServer.Close() + + metricsPort := tcp.PickPort() + clientConfig := &core.Config{ + App: []*serial.TypedMessage{ + serial.ToTypedMessage(&metrics.Config{ + Tag: "metrics_out", + }), + serial.ToTypedMessage(&router.Config{ + Rule: []*router.RoutingRule{ + { + InboundTag: []string{"metrics_in"}, + TargetTag: &router.RoutingRule_Tag{ + Tag: "metrics_out", + }, + }, + }, + }), + }, + Inbound: []*core.InboundHandlerConfig{ + { + Tag: "metrics_in", + ReceiverSettings: serial.ToTypedMessage(&proxyman.ReceiverConfig{ + PortList: &net.PortList{Range: []*net.PortRange{net.SinglePortRange(metricsPort)}}, + Listen: net.NewIPOrDomain(net.LocalHostIP), + }), + ProxySettings: serial.ToTypedMessage(&dokodemo.Config{ + Address: net.NewIPOrDomain(dest.Address), + Port: uint32(dest.Port), + Networks: []net.Network{net.Network_TCP}, + }), + }, + }, + Outbound: []*core.OutboundHandlerConfig{ + { + Tag: "default-outbound", + ProxySettings: serial.ToTypedMessage(&freedom.Config{}), + }, + }, + } + + servers, err := InitializeServerConfigs(clientConfig) + common.Must(err) + defer CloseAllServers(servers) + + resp, err := http.Get(fmt.Sprintf("http://127.0.0.1:%d/debug/pprof/goroutine?debug=1", metricsPort)) + common.Must(err) + if resp == nil { + t.Error("unexpected pprof nil response") + } + if resp.StatusCode != http.StatusOK { + t.Error("unexpected pprof status code") + } + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + t.Fatal(err) + } + if string(body)[0:len(expectedMessage)] != expectedMessage { + t.Error("unexpected response body from pprof handler") + } + + resp2, err2 := http.Get(fmt.Sprintf("http://127.0.0.1:%d/debug/vars", metricsPort)) + common.Must(err2) + if resp2 == nil { + t.Error("unexpected expvars nil response") + } + if resp2.StatusCode != http.StatusOK { + t.Error("unexpected expvars status code") + } + body2, err2 := ioutil.ReadAll(resp2.Body) + if err2 != nil { + t.Fatal(err2) + } + if string(body2)[0] != '{' { + t.Error("unexpected response body from expvars handler") + } +}