diff --git a/common/buf/buffer.go b/common/buf/buffer.go index 9b32c6c7..ec0c32bd 100644 --- a/common/buf/buffer.go +++ b/common/buf/buffer.go @@ -31,6 +31,22 @@ func New() *Buffer { } } +func NewExisted(b []byte) *Buffer { + if cap(b) < Size { + panic("Invalid buffer") + } + + oLen := len(b) + if oLen < Size { + b = append(b, make([]byte, Size-oLen)...) + } + + return &Buffer{ + v: b, + end: int32(oLen), + } +} + // StackNew creates a new Buffer object on stack. // This method is for buffers that is released in the same function. func StackNew() Buffer { diff --git a/infra/conf/grpc.go b/infra/conf/grpc.go new file mode 100644 index 00000000..e39d314d --- /dev/null +++ b/infra/conf/grpc.go @@ -0,0 +1,16 @@ +package conf + +import ( + "github.com/golang/protobuf/proto" + + "github.com/xtls/xray-core/transport/internet/grpc" +) + +type GRPCConfig struct { + ServiceName string `json:"serviceName"` + MultiMode bool `json:"multiMode"` +} + +func (g GRPCConfig) Build() (proto.Message, error) { + return &grpc.Config{ServiceName: g.ServiceName, MultiMode: g.MultiMode}, nil +} diff --git a/infra/conf/transport.go b/infra/conf/transport.go index 1108167b..fa8a3886 100644 --- a/infra/conf/transport.go +++ b/infra/conf/transport.go @@ -13,6 +13,8 @@ type TransportConfig struct { HTTPConfig *HTTPConfig `json:"httpSettings"` DSConfig *DomainSocketConfig `json:"dsSettings"` QUICConfig *QUICConfig `json:"quicSettings"` + GRPCConfig *GRPCConfig `json:"grpcSettings"` + GUNConfig *GRPCConfig `json:"gunSettings"` } // Build implements Buildable. @@ -85,5 +87,19 @@ func (c *TransportConfig) Build() (*global.Config, error) { }) } + if c.GRPCConfig == nil { + c.GRPCConfig = c.GUNConfig + } + if c.GRPCConfig != nil { + gs, err := c.GRPCConfig.Build() + if err != nil { + return nil, newError("Failed to build gRPC config.").Base(err) + } + config.TransportSettings = append(config.TransportSettings, &internet.TransportConfig{ + ProtocolName: "grpc", + Settings: serial.ToTypedMessage(gs), + }) + } + return config, nil } diff --git a/infra/conf/transport_internet.go b/infra/conf/transport_internet.go index c8f5c18f..15d5dd32 100644 --- a/infra/conf/transport_internet.go +++ b/infra/conf/transport_internet.go @@ -458,6 +458,8 @@ func (p TransportProtocol) Build() (string, error) { return "domainsocket", nil case "quic": return "quic", nil + case "grpc", "gun": + return "grpc", nil default: return "", newError("Config: unknown transport protocol: ", p) } @@ -534,6 +536,8 @@ type StreamConfig struct { DSSettings *DomainSocketConfig `json:"dsSettings"` QUICSettings *QUICConfig `json:"quicSettings"` SocketSettings *SocketConfig `json:"sockopt"` + GRPCConfig *GRPCConfig `json:"grpcSettings"` + GUNConfig *GRPCConfig `json:"gunSettings"` } // Build implements Buildable. @@ -643,6 +647,19 @@ func (c *StreamConfig) Build() (*internet.StreamConfig, error) { Settings: serial.ToTypedMessage(qs), }) } + if c.GRPCConfig == nil { + c.GRPCConfig = c.GUNConfig + } + if c.GRPCConfig != nil { + gs, err := c.GRPCConfig.Build() + if err != nil { + return nil, newError("Failed to build gRPC config.").Base(err) + } + config.TransportSettings = append(config.TransportSettings, &internet.TransportConfig{ + ProtocolName: "grpc", + Settings: serial.ToTypedMessage(gs), + }) + } if c.SocketSettings != nil { ss, err := c.SocketSettings.Build() if err != nil { diff --git a/infra/conf/transport_test.go b/infra/conf/transport_test.go index acc15dd0..a3e9cc74 100644 --- a/infra/conf/transport_test.go +++ b/infra/conf/transport_test.go @@ -10,6 +10,7 @@ import ( . "github.com/xtls/xray-core/infra/conf" "github.com/xtls/xray-core/transport/global" "github.com/xtls/xray-core/transport/internet" + "github.com/xtls/xray-core/transport/internet/grpc" "github.com/xtls/xray-core/transport/internet/headers/http" "github.com/xtls/xray-core/transport/internet/headers/noop" "github.com/xtls/xray-core/transport/internet/headers/tls" @@ -120,6 +121,10 @@ func TestTransportConfig(t *testing.T) { "header": { "type": "dtls" } + }, + "grpcSettings": { + "serviceName": "name", + "multiMode": true } }`, Parser: createParser(), @@ -190,6 +195,31 @@ func TestTransportConfig(t *testing.T) { Header: serial.ToTypedMessage(&tls.PacketConfig{}), }), }, + { + ProtocolName: "grpc", + Settings: serial.ToTypedMessage(&grpc.Config{ + ServiceName: "name", + MultiMode: true, + }), + }, + }, + }, + }, + { + Input: `{ + "gunSettings": { + "serviceName": "name" + } + }`, + Parser: createParser(), + Output: &global.Config{ + TransportSettings: []*internet.TransportConfig{ + { + ProtocolName: "grpc", + Settings: serial.ToTypedMessage(&grpc.Config{ + ServiceName: "name", + }), + }, }, }, }, diff --git a/main/distro/all/all.go b/main/distro/all/all.go index 0284b33a..301987ce 100644 --- a/main/distro/all/all.go +++ b/main/distro/all/all.go @@ -40,6 +40,7 @@ import ( // Transports _ "github.com/xtls/xray-core/transport/internet/domainsocket" + _ "github.com/xtls/xray-core/transport/internet/grpc" _ "github.com/xtls/xray-core/transport/internet/http" _ "github.com/xtls/xray-core/transport/internet/kcp" _ "github.com/xtls/xray-core/transport/internet/quic" diff --git a/testing/scenarios/tls_test.go b/testing/scenarios/tls_test.go index cb420821..83699f2d 100644 --- a/testing/scenarios/tls_test.go +++ b/testing/scenarios/tls_test.go @@ -24,6 +24,7 @@ import ( "github.com/xtls/xray-core/testing/servers/tcp" "github.com/xtls/xray-core/testing/servers/udp" "github.com/xtls/xray-core/transport/internet" + "github.com/xtls/xray-core/transport/internet/grpc" "github.com/xtls/xray-core/transport/internet/http" "github.com/xtls/xray-core/transport/internet/tls" "github.com/xtls/xray-core/transport/internet/websocket" @@ -589,3 +590,239 @@ func TestHTTP2(t *testing.T) { t.Error(err) } } + +func TestGRPC(t *testing.T) { + tcpServer := tcp.Server{ + MsgProcessor: xor, + } + dest, err := tcpServer.Start() + common.Must(err) + defer tcpServer.Close() + + userID := protocol.NewID(uuid.New()) + serverPort := tcp.PickPort() + serverConfig := &core.Config{ + Inbound: []*core.InboundHandlerConfig{ + { + ReceiverSettings: serial.ToTypedMessage(&proxyman.ReceiverConfig{ + PortRange: net.SinglePortRange(serverPort), + Listen: net.NewIPOrDomain(net.LocalHostIP), + StreamSettings: &internet.StreamConfig{ + ProtocolName: "grpc", + TransportSettings: []*internet.TransportConfig{ + { + ProtocolName: "grpc", + Settings: serial.ToTypedMessage(&grpc.Config{ServiceName: "🍉"}), + }, + }, + SecurityType: serial.GetMessageType(&tls.Config{}), + SecuritySettings: []*serial.TypedMessage{ + serial.ToTypedMessage(&tls.Config{ + Certificate: []*tls.Certificate{tls.ParseCertificate(cert.MustGenerate(nil))}, + }), + }, + }, + }), + ProxySettings: serial.ToTypedMessage(&inbound.Config{ + User: []*protocol.User{ + { + Account: serial.ToTypedMessage(&vmess.Account{ + Id: userID.String(), + }), + }, + }, + }), + }, + }, + Outbound: []*core.OutboundHandlerConfig{ + { + ProxySettings: serial.ToTypedMessage(&freedom.Config{}), + }, + }, + } + + clientPort := tcp.PickPort() + clientConfig := &core.Config{ + Inbound: []*core.InboundHandlerConfig{ + { + ReceiverSettings: serial.ToTypedMessage(&proxyman.ReceiverConfig{ + PortRange: net.SinglePortRange(clientPort), + Listen: net.NewIPOrDomain(net.LocalHostIP), + }), + ProxySettings: serial.ToTypedMessage(&dokodemo.Config{ + Address: net.NewIPOrDomain(dest.Address), + Port: uint32(dest.Port), + NetworkList: &net.NetworkList{ + Network: []net.Network{net.Network_TCP}, + }, + }), + }, + }, + Outbound: []*core.OutboundHandlerConfig{ + { + ProxySettings: serial.ToTypedMessage(&outbound.Config{ + Receiver: []*protocol.ServerEndpoint{ + { + Address: net.NewIPOrDomain(net.LocalHostIP), + Port: uint32(serverPort), + User: []*protocol.User{ + { + Account: serial.ToTypedMessage(&vmess.Account{ + Id: userID.String(), + }), + }, + }, + }, + }, + }), + SenderSettings: serial.ToTypedMessage(&proxyman.SenderConfig{ + StreamSettings: &internet.StreamConfig{ + ProtocolName: "grpc", + TransportSettings: []*internet.TransportConfig{ + { + ProtocolName: "grpc", + Settings: serial.ToTypedMessage(&grpc.Config{ServiceName: "🍉"}), + }, + }, + SecurityType: serial.GetMessageType(&tls.Config{}), + SecuritySettings: []*serial.TypedMessage{ + serial.ToTypedMessage(&tls.Config{ + AllowInsecure: true, + }), + }, + }, + }), + }, + }, + } + + servers, err := InitializeServerConfigs(serverConfig, clientConfig) + common.Must(err) + defer CloseAllServers(servers) + + var errg errgroup.Group + for i := 0; i < 10; i++ { + errg.Go(testTCPConn(clientPort, 1024*10240, time.Second*40)) + } + if err := errg.Wait(); err != nil { + t.Error(err) + } +} + +func TestGRPCMultiMode(t *testing.T) { + tcpServer := tcp.Server{ + MsgProcessor: xor, + } + dest, err := tcpServer.Start() + common.Must(err) + defer tcpServer.Close() + + userID := protocol.NewID(uuid.New()) + serverPort := tcp.PickPort() + serverConfig := &core.Config{ + Inbound: []*core.InboundHandlerConfig{ + { + ReceiverSettings: serial.ToTypedMessage(&proxyman.ReceiverConfig{ + PortRange: net.SinglePortRange(serverPort), + Listen: net.NewIPOrDomain(net.LocalHostIP), + StreamSettings: &internet.StreamConfig{ + ProtocolName: "grpc", + TransportSettings: []*internet.TransportConfig{ + { + ProtocolName: "grpc", + Settings: serial.ToTypedMessage(&grpc.Config{ServiceName: "🍉"}), + }, + }, + SecurityType: serial.GetMessageType(&tls.Config{}), + SecuritySettings: []*serial.TypedMessage{ + serial.ToTypedMessage(&tls.Config{ + Certificate: []*tls.Certificate{tls.ParseCertificate(cert.MustGenerate(nil))}, + }), + }, + }, + }), + ProxySettings: serial.ToTypedMessage(&inbound.Config{ + User: []*protocol.User{ + { + Account: serial.ToTypedMessage(&vmess.Account{ + Id: userID.String(), + }), + }, + }, + }), + }, + }, + Outbound: []*core.OutboundHandlerConfig{ + { + ProxySettings: serial.ToTypedMessage(&freedom.Config{}), + }, + }, + } + + clientPort := tcp.PickPort() + clientConfig := &core.Config{ + Inbound: []*core.InboundHandlerConfig{ + { + ReceiverSettings: serial.ToTypedMessage(&proxyman.ReceiverConfig{ + PortRange: net.SinglePortRange(clientPort), + Listen: net.NewIPOrDomain(net.LocalHostIP), + }), + ProxySettings: serial.ToTypedMessage(&dokodemo.Config{ + Address: net.NewIPOrDomain(dest.Address), + Port: uint32(dest.Port), + NetworkList: &net.NetworkList{ + Network: []net.Network{net.Network_TCP}, + }, + }), + }, + }, + Outbound: []*core.OutboundHandlerConfig{ + { + ProxySettings: serial.ToTypedMessage(&outbound.Config{ + Receiver: []*protocol.ServerEndpoint{ + { + Address: net.NewIPOrDomain(net.LocalHostIP), + Port: uint32(serverPort), + User: []*protocol.User{ + { + Account: serial.ToTypedMessage(&vmess.Account{ + Id: userID.String(), + }), + }, + }, + }, + }, + }), + SenderSettings: serial.ToTypedMessage(&proxyman.SenderConfig{ + StreamSettings: &internet.StreamConfig{ + ProtocolName: "grpc", + TransportSettings: []*internet.TransportConfig{ + { + ProtocolName: "grpc", + Settings: serial.ToTypedMessage(&grpc.Config{ServiceName: "🍉", MultiMode: true}), + }, + }, + SecurityType: serial.GetMessageType(&tls.Config{}), + SecuritySettings: []*serial.TypedMessage{ + serial.ToTypedMessage(&tls.Config{ + AllowInsecure: true, + }), + }, + }, + }), + }, + }, + } + + servers, err := InitializeServerConfigs(serverConfig, clientConfig) + common.Must(err) + defer CloseAllServers(servers) + + var errg errgroup.Group + for i := 0; i < 10; i++ { + errg.Go(testTCPConn(clientPort, 1024*10240, time.Second*40)) + } + if err := errg.Wait(); err != nil { + t.Error(err) + } +} diff --git a/transport/internet/grpc/config.go b/transport/internet/grpc/config.go new file mode 100644 index 00000000..4be47ec2 --- /dev/null +++ b/transport/internet/grpc/config.go @@ -0,0 +1,14 @@ +package grpc + +import ( + "github.com/xtls/xray-core/common" + "github.com/xtls/xray-core/transport/internet" +) + +const protocolName = "grpc" + +func init() { + common.Must(internet.RegisterProtocolConfigCreator(protocolName, func() interface{} { + return new(Config) + })) +} diff --git a/transport/internet/grpc/config.pb.go b/transport/internet/grpc/config.pb.go new file mode 100644 index 00000000..4f8fcb81 --- /dev/null +++ b/transport/internet/grpc/config.pb.go @@ -0,0 +1,172 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.25.0 +// protoc v3.15.6 +// source: transport/internet/grpc/config.proto + +package grpc + +import ( + proto "github.com/golang/protobuf/proto" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// This is a compile-time assertion that a sufficiently up-to-date version +// of the legacy proto package is being used. +const _ = proto.ProtoPackageIsVersion4 + +type Config struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Host string `protobuf:"bytes,1,opt,name=host,proto3" json:"host,omitempty"` + ServiceName string `protobuf:"bytes,2,opt,name=service_name,json=serviceName,proto3" json:"service_name,omitempty"` + MultiMode bool `protobuf:"varint,3,opt,name=multi_mode,json=multiMode,proto3" json:"multi_mode,omitempty"` +} + +func (x *Config) Reset() { + *x = Config{} + if protoimpl.UnsafeEnabled { + mi := &file_transport_internet_grpc_config_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Config) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Config) ProtoMessage() {} + +func (x *Config) ProtoReflect() protoreflect.Message { + mi := &file_transport_internet_grpc_config_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Config.ProtoReflect.Descriptor instead. +func (*Config) Descriptor() ([]byte, []int) { + return file_transport_internet_grpc_config_proto_rawDescGZIP(), []int{0} +} + +func (x *Config) GetHost() string { + if x != nil { + return x.Host + } + return "" +} + +func (x *Config) GetServiceName() string { + if x != nil { + return x.ServiceName + } + return "" +} + +func (x *Config) GetMultiMode() bool { + if x != nil { + return x.MultiMode + } + return false +} + +var File_transport_internet_grpc_config_proto protoreflect.FileDescriptor + +var file_transport_internet_grpc_config_proto_rawDesc = []byte{ + 0x0a, 0x24, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x2f, 0x69, 0x6e, 0x74, 0x65, + 0x72, 0x6e, 0x65, 0x74, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x2f, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, + 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x25, 0x78, 0x72, 0x61, 0x79, 0x2e, 0x74, 0x72, 0x61, + 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x65, 0x74, 0x2e, + 0x67, 0x72, 0x70, 0x63, 0x2e, 0x65, 0x6e, 0x63, 0x6f, 0x64, 0x69, 0x6e, 0x67, 0x22, 0x5e, 0x0a, + 0x06, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x12, 0x0a, 0x04, 0x68, 0x6f, 0x73, 0x74, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x68, 0x6f, 0x73, 0x74, 0x12, 0x21, 0x0a, 0x0c, 0x73, + 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x0b, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x1d, + 0x0a, 0x0a, 0x6d, 0x75, 0x6c, 0x74, 0x69, 0x5f, 0x6d, 0x6f, 0x64, 0x65, 0x18, 0x03, 0x20, 0x01, + 0x28, 0x08, 0x52, 0x09, 0x6d, 0x75, 0x6c, 0x74, 0x69, 0x4d, 0x6f, 0x64, 0x65, 0x42, 0x33, 0x5a, + 0x31, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x78, 0x74, 0x6c, 0x73, + 0x2f, 0x78, 0x72, 0x61, 0x79, 0x2d, 0x63, 0x6f, 0x72, 0x65, 0x2f, 0x74, 0x72, 0x61, 0x6e, 0x73, + 0x70, 0x6f, 0x72, 0x74, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x65, 0x74, 0x2f, 0x67, 0x72, + 0x70, 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_transport_internet_grpc_config_proto_rawDescOnce sync.Once + file_transport_internet_grpc_config_proto_rawDescData = file_transport_internet_grpc_config_proto_rawDesc +) + +func file_transport_internet_grpc_config_proto_rawDescGZIP() []byte { + file_transport_internet_grpc_config_proto_rawDescOnce.Do(func() { + file_transport_internet_grpc_config_proto_rawDescData = protoimpl.X.CompressGZIP(file_transport_internet_grpc_config_proto_rawDescData) + }) + return file_transport_internet_grpc_config_proto_rawDescData +} + +var file_transport_internet_grpc_config_proto_msgTypes = make([]protoimpl.MessageInfo, 1) +var file_transport_internet_grpc_config_proto_goTypes = []interface{}{ + (*Config)(nil), // 0: xray.transport.internet.grpc.encoding.Config +} +var file_transport_internet_grpc_config_proto_depIdxs = []int32{ + 0, // [0:0] is the sub-list for method output_type + 0, // [0:0] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_transport_internet_grpc_config_proto_init() } +func file_transport_internet_grpc_config_proto_init() { + if File_transport_internet_grpc_config_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_transport_internet_grpc_config_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Config); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_transport_internet_grpc_config_proto_rawDesc, + NumEnums: 0, + NumMessages: 1, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_transport_internet_grpc_config_proto_goTypes, + DependencyIndexes: file_transport_internet_grpc_config_proto_depIdxs, + MessageInfos: file_transport_internet_grpc_config_proto_msgTypes, + }.Build() + File_transport_internet_grpc_config_proto = out.File + file_transport_internet_grpc_config_proto_rawDesc = nil + file_transport_internet_grpc_config_proto_goTypes = nil + file_transport_internet_grpc_config_proto_depIdxs = nil +} diff --git a/transport/internet/grpc/config.proto b/transport/internet/grpc/config.proto new file mode 100644 index 00000000..9c4330b2 --- /dev/null +++ b/transport/internet/grpc/config.proto @@ -0,0 +1,10 @@ +syntax = "proto3"; + +package xray.transport.internet.grpc.encoding; +option go_package = "github.com/xtls/xray-core/transport/internet/grpc"; + +message Config { + string host = 1; + string service_name = 2; + bool multi_mode = 3; +} diff --git a/transport/internet/grpc/dial.go b/transport/internet/grpc/dial.go new file mode 100644 index 00000000..457f67df --- /dev/null +++ b/transport/internet/grpc/dial.go @@ -0,0 +1,130 @@ +package grpc + +import ( + "context" + gonet "net" + "sync" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/backoff" + "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/credentials" + + "github.com/xtls/xray-core/common" + "github.com/xtls/xray-core/common/net" + "github.com/xtls/xray-core/common/session" + "github.com/xtls/xray-core/transport/internet" + "github.com/xtls/xray-core/transport/internet/grpc/encoding" + "github.com/xtls/xray-core/transport/internet/tls" +) + +func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.MemoryStreamConfig) (internet.Connection, error) { + newError("creating connection to ", dest).WriteToLog(session.ExportIDToError(ctx)) + + conn, err := dialgRPC(ctx, dest, streamSettings) + if err != nil { + return nil, newError("failed to dial gRPC").Base(err) + } + return internet.Connection(conn), nil +} + +func init() { + common.Must(internet.RegisterTransportDialer(protocolName, Dial)) +} + +type dialerConf struct { + net.Destination + *internet.SocketConfig +} + +var ( + globalDialerMap map[dialerConf]*grpc.ClientConn + globalDialerAccess sync.Mutex +) + +func dialgRPC(ctx context.Context, dest net.Destination, streamSettings *internet.MemoryStreamConfig) (net.Conn, error) { + grpcSettings := streamSettings.ProtocolSettings.(*Config) + + config := tls.ConfigFromStreamSettings(streamSettings) + var dialOption = grpc.WithInsecure() + + if config != nil { + dialOption = grpc.WithTransportCredentials(credentials.NewTLS(config.GetTLSConfig())) + } + + conn, err := getGrpcClient(ctx, dest, dialOption, streamSettings.SocketSettings) + + if err != nil { + return nil, newError("Cannot dial gRPC").Base(err) + } + client := encoding.NewGRPCServiceClient(conn) + if grpcSettings.MultiMode { + newError("using gRPC multi mode").AtDebug().WriteToLog() + grpcService, err := client.(encoding.GRPCServiceClientX).TunMultiCustomName(ctx, grpcSettings.ServiceName) + if err != nil { + return nil, newError("Cannot dial gRPC").Base(err) + } + return encoding.NewMultiHunkConn(grpcService, nil), nil + } + + grpcService, err := client.(encoding.GRPCServiceClientX).TunCustomName(ctx, grpcSettings.ServiceName) + if err != nil { + return nil, newError("Cannot dial gRPC").Base(err) + } + + return encoding.NewHunkConn(grpcService, nil), nil +} + +func getGrpcClient(ctx context.Context, dest net.Destination, dialOption grpc.DialOption, sockopt *internet.SocketConfig) (*grpc.ClientConn, error) { + globalDialerAccess.Lock() + defer globalDialerAccess.Unlock() + + if globalDialerMap == nil { + globalDialerMap = make(map[dialerConf]*grpc.ClientConn) + } + + if client, found := globalDialerMap[dialerConf{dest, sockopt}]; found && client.GetState() != connectivity.Shutdown { + return client, nil + } + + conn, err := grpc.Dial( + gonet.JoinHostPort(dest.Address.String(), dest.Port.String()), + dialOption, + grpc.WithConnectParams(grpc.ConnectParams{ + Backoff: backoff.Config{ + BaseDelay: 500 * time.Millisecond, + Multiplier: 1.5, + Jitter: 0.2, + MaxDelay: 19 * time.Second, + }, + MinConnectTimeout: 5 * time.Second, + }), + grpc.WithContextDialer(func(gctx context.Context, s string) (gonet.Conn, error) { + gctx = session.ContextWithID(gctx, session.IDFromContext(ctx)) + gctx = session.ContextWithOutbound(gctx, session.OutboundFromContext(ctx)) + + rawHost, rawPort, err := net.SplitHostPort(s) + select { + case <-gctx.Done(): + return nil, gctx.Err() + default: + } + + if err != nil { + return nil, err + } + if len(rawPort) == 0 { + rawPort = "443" + } + port, err := net.PortFromString(rawPort) + if err != nil { + return nil, err + } + address := net.ParseAddress(rawHost) + return internet.DialSystem(gctx, net.TCPDestination(address, port), sockopt) + }), + ) + globalDialerMap[dialerConf{dest, sockopt}] = conn + return conn, err +} diff --git a/transport/internet/grpc/encoding/customSeviceName.go b/transport/internet/grpc/encoding/customSeviceName.go new file mode 100644 index 00000000..d37d61f8 --- /dev/null +++ b/transport/internet/grpc/encoding/customSeviceName.go @@ -0,0 +1,60 @@ +package encoding + +import ( + "context" + + "google.golang.org/grpc" +) + +func ServerDesc(name string) grpc.ServiceDesc { + return grpc.ServiceDesc{ + ServiceName: name, + HandlerType: (*GRPCServiceServer)(nil), + Methods: []grpc.MethodDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "Tun", + Handler: _GRPCService_Tun_Handler, + ServerStreams: true, + ClientStreams: true, + }, + { + StreamName: "TunMulti", + Handler: _GRPCService_TunMulti_Handler, + ServerStreams: true, + ClientStreams: true, + }, + }, + Metadata: "grpc.proto", + } +} + +func (c *gRPCServiceClient) TunCustomName(ctx context.Context, name string, opts ...grpc.CallOption) (GRPCService_TunClient, error) { + stream, err := c.cc.NewStream(ctx, &ServerDesc(name).Streams[0], "/"+name+"/Tun", opts...) + if err != nil { + return nil, err + } + x := &gRPCServiceTunClient{stream} + return x, nil +} + +func (c *gRPCServiceClient) TunMultiCustomName(ctx context.Context, name string, opts ...grpc.CallOption) (GRPCService_TunMultiClient, error) { + stream, err := c.cc.NewStream(ctx, &ServerDesc(name).Streams[0], "/"+name+"/TunMulti", opts...) + if err != nil { + return nil, err + } + x := &gRPCServiceTunMultiClient{stream} + return x, nil +} + +type GRPCServiceClientX interface { + TunCustomName(ctx context.Context, name string, opts ...grpc.CallOption) (GRPCService_TunClient, error) + TunMultiCustomName(ctx context.Context, name string, opts ...grpc.CallOption) (GRPCService_TunMultiClient, error) + Tun(ctx context.Context, opts ...grpc.CallOption) (GRPCService_TunClient, error) + TunMulti(ctx context.Context, opts ...grpc.CallOption) (GRPCService_TunMultiClient, error) +} + +func RegisterGRPCServiceServerX(s *grpc.Server, srv GRPCServiceServer, name string) { + desc := ServerDesc(name) + s.RegisterService(&desc, srv) +} diff --git a/transport/internet/grpc/encoding/encoding.go b/transport/internet/grpc/encoding/encoding.go new file mode 100644 index 00000000..f8e3e368 --- /dev/null +++ b/transport/internet/grpc/encoding/encoding.go @@ -0,0 +1,3 @@ +package encoding + +//go:generate go run github.com/xtls/xray-core/common/errors/errorgen diff --git a/transport/internet/grpc/encoding/errors.generated.go b/transport/internet/grpc/encoding/errors.generated.go new file mode 100644 index 00000000..267711d9 --- /dev/null +++ b/transport/internet/grpc/encoding/errors.generated.go @@ -0,0 +1,9 @@ +package encoding + +import "github.com/xtls/xray-core/common/errors" + +type errPathObjHolder struct{} + +func newError(values ...interface{}) *errors.Error { + return errors.New(values...).WithPathObj(errPathObjHolder{}) +} diff --git a/transport/internet/grpc/encoding/hunkconn.go b/transport/internet/grpc/encoding/hunkconn.go new file mode 100644 index 00000000..50815144 --- /dev/null +++ b/transport/internet/grpc/encoding/hunkconn.go @@ -0,0 +1,123 @@ +package encoding + +import ( + "context" + "io" + "net" + + "github.com/xtls/xray-core/common/buf" + "github.com/xtls/xray-core/common/net/cnc" + "github.com/xtls/xray-core/common/signal/done" +) + +type HunkConn interface { + Send(*Hunk) error + Recv() (*Hunk, error) + SendMsg(m interface{}) error + RecvMsg(m interface{}) error +} + +type StreamCloser interface { + CloseSend() error +} + +type HunkReaderWriter struct { + hc HunkConn + cancel context.CancelFunc + done *done.Instance + + buf []byte + index int +} + +func NewHunkReadWriter(hc HunkConn, cancel context.CancelFunc) *HunkReaderWriter { + return &HunkReaderWriter{hc, cancel, done.New(), nil, 0} +} + +func NewHunkConn(hc HunkConn, cancel context.CancelFunc) net.Conn { + wrc := NewHunkReadWriter(hc, cancel) + return cnc.NewConnection( + cnc.ConnectionInput(wrc), + cnc.ConnectionOutput(wrc), + cnc.ConnectionOnClose(wrc), + ) +} + +func (h *HunkReaderWriter) forceFetch() error { + hunk, err := h.hc.Recv() + if err != nil { + if err == io.EOF { + return err + } + + return newError("failed to fetch hunk from gRPC tunnel").Base(err) + } + + h.buf = hunk.Data + h.index = 0 + + return nil +} + +func (h *HunkReaderWriter) Read(buf []byte) (int, error) { + if h.done.Done() { + return 0, io.EOF + } + + if h.index >= len(h.buf) { + if err := h.forceFetch(); err != nil { + return 0, err + } + } + n := copy(buf, h.buf[h.index:]) + h.index += n + + return n, nil +} + +func (h *HunkReaderWriter) ReadMultiBuffer() (buf.MultiBuffer, error) { + if h.done.Done() { + return nil, io.EOF + } + if h.index >= len(h.buf) { + if err := h.forceFetch(); err != nil { + return nil, err + } + } + + if cap(h.buf) == buf.Size { + b := h.buf + h.index = len(h.buf) + return buf.MultiBuffer{buf.NewExisted(b)}, nil + } + + b := buf.New() + _, err := b.ReadFrom(h) + if err != nil { + return nil, err + } + return buf.MultiBuffer{b}, nil +} + +func (h *HunkReaderWriter) Write(buf []byte) (int, error) { + if h.done.Done() { + return 0, io.ErrClosedPipe + } + + err := h.hc.Send(&Hunk{Data: buf[:]}) + if err != nil { + return 0, newError("failed to send data over gRPC tunnel").Base(err) + } + return len(buf), nil +} + +func (h *HunkReaderWriter) Close() error { + if h.cancel != nil { + h.cancel() + } + if sc, match := h.hc.(StreamCloser); match { + return sc.CloseSend() + } + + return h.done.Close() +} diff --git a/transport/internet/grpc/encoding/multiconn.go b/transport/internet/grpc/encoding/multiconn.go new file mode 100644 index 00000000..b2b865ab --- /dev/null +++ b/transport/internet/grpc/encoding/multiconn.go @@ -0,0 +1,108 @@ +package encoding + +import ( + "context" + "io" + "net" + + "github.com/xtls/xray-core/common/buf" + "github.com/xtls/xray-core/common/net/cnc" + "github.com/xtls/xray-core/common/signal/done" +) + +type MultiHunkConn interface { + Send(*MultiHunk) error + Recv() (*MultiHunk, error) + SendMsg(m interface{}) error + RecvMsg(m interface{}) error +} + +type MultiHunkReaderWriter struct { + hc MultiHunkConn + cancel context.CancelFunc + done *done.Instance + + buf [][]byte +} + +func NewMultiHunkReadWriter(hc MultiHunkConn, cancel context.CancelFunc) *MultiHunkReaderWriter { + return &MultiHunkReaderWriter{hc, cancel, done.New(), nil} +} + +func NewMultiHunkConn(hc MultiHunkConn, cancel context.CancelFunc) net.Conn { + wrc := NewMultiHunkReadWriter(hc, cancel) + return cnc.NewConnection( + cnc.ConnectionInputMulti(wrc), + cnc.ConnectionOutputMulti(wrc), + cnc.ConnectionOnClose(wrc), + ) +} + +func (h *MultiHunkReaderWriter) forceFetch() error { + hunk, err := h.hc.Recv() + if err != nil { + if err == io.EOF { + return err + } + + return newError("failed to fetch hunk from gRPC tunnel").Base(err) + } + + h.buf = hunk.Data + + return nil +} + +func (h *MultiHunkReaderWriter) ReadMultiBuffer() (buf.MultiBuffer, error) { + if h.done.Done() { + return nil, io.EOF + } + + if err := h.forceFetch(); err != nil { + return nil, err + } + + var mb = make(buf.MultiBuffer, 0, len(h.buf)) + for _, b := range h.buf { + if cap(b) >= buf.Size { + mb = append(mb, buf.NewExisted(b)) + continue + } + + nb := buf.New() + nb.Extend(int32(len(b))) + copy(nb.Bytes(), b) + + mb = append(mb, nb) + } + return mb, nil +} + +func (h *MultiHunkReaderWriter) WriteMultiBuffer(mb buf.MultiBuffer) error { + defer buf.ReleaseMulti(mb) + if h.done.Done() { + return io.ErrClosedPipe + } + + hunk := &MultiHunk{Data: make([][]byte, len(mb))} + for _, b := range mb { + hunk.Data = append(hunk.Data, b.Bytes()) + } + + err := h.hc.Send(hunk) + if err != nil { + return err + } + return nil +} + +func (h *MultiHunkReaderWriter) Close() error { + if h.cancel != nil { + h.cancel() + } + if sc, match := h.hc.(StreamCloser); match { + return sc.CloseSend() + } + + return h.done.Close() +} diff --git a/transport/internet/grpc/encoding/stream.pb.go b/transport/internet/grpc/encoding/stream.pb.go new file mode 100644 index 00000000..122ab50e --- /dev/null +++ b/transport/internet/grpc/encoding/stream.pb.go @@ -0,0 +1,234 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.25.0 +// protoc v3.15.6 +// source: transport/internet/grpc/encoding/stream.proto + +package encoding + +import ( + proto "github.com/golang/protobuf/proto" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// This is a compile-time assertion that a sufficiently up-to-date version +// of the legacy proto package is being used. +const _ = proto.ProtoPackageIsVersion4 + +type Hunk struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Data []byte `protobuf:"bytes,1,opt,name=data,proto3" json:"data,omitempty"` +} + +func (x *Hunk) Reset() { + *x = Hunk{} + if protoimpl.UnsafeEnabled { + mi := &file_transport_internet_grpc_encoding_stream_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Hunk) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Hunk) ProtoMessage() {} + +func (x *Hunk) ProtoReflect() protoreflect.Message { + mi := &file_transport_internet_grpc_encoding_stream_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Hunk.ProtoReflect.Descriptor instead. +func (*Hunk) Descriptor() ([]byte, []int) { + return file_transport_internet_grpc_encoding_stream_proto_rawDescGZIP(), []int{0} +} + +func (x *Hunk) GetData() []byte { + if x != nil { + return x.Data + } + return nil +} + +type MultiHunk struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Data [][]byte `protobuf:"bytes,1,rep,name=data,proto3" json:"data,omitempty"` +} + +func (x *MultiHunk) Reset() { + *x = MultiHunk{} + if protoimpl.UnsafeEnabled { + mi := &file_transport_internet_grpc_encoding_stream_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *MultiHunk) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*MultiHunk) ProtoMessage() {} + +func (x *MultiHunk) ProtoReflect() protoreflect.Message { + mi := &file_transport_internet_grpc_encoding_stream_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use MultiHunk.ProtoReflect.Descriptor instead. +func (*MultiHunk) Descriptor() ([]byte, []int) { + return file_transport_internet_grpc_encoding_stream_proto_rawDescGZIP(), []int{1} +} + +func (x *MultiHunk) GetData() [][]byte { + if x != nil { + return x.Data + } + return nil +} + +var File_transport_internet_grpc_encoding_stream_proto protoreflect.FileDescriptor + +var file_transport_internet_grpc_encoding_stream_proto_rawDesc = []byte{ + 0x0a, 0x2d, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x2f, 0x69, 0x6e, 0x74, 0x65, + 0x72, 0x6e, 0x65, 0x74, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x2f, 0x65, 0x6e, 0x63, 0x6f, 0x64, 0x69, + 0x6e, 0x67, 0x2f, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, + 0x25, 0x78, 0x72, 0x61, 0x79, 0x2e, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x2e, + 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x65, 0x74, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x65, 0x6e, + 0x63, 0x6f, 0x64, 0x69, 0x6e, 0x67, 0x22, 0x1a, 0x0a, 0x04, 0x48, 0x75, 0x6e, 0x6b, 0x12, 0x12, + 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, + 0x74, 0x61, 0x22, 0x1f, 0x0a, 0x09, 0x4d, 0x75, 0x6c, 0x74, 0x69, 0x48, 0x75, 0x6e, 0x6b, 0x12, + 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0c, 0x52, 0x04, 0x64, + 0x61, 0x74, 0x61, 0x32, 0xe6, 0x01, 0x0a, 0x0b, 0x47, 0x52, 0x50, 0x43, 0x53, 0x65, 0x72, 0x76, + 0x69, 0x63, 0x65, 0x12, 0x63, 0x0a, 0x03, 0x54, 0x75, 0x6e, 0x12, 0x2b, 0x2e, 0x78, 0x72, 0x61, + 0x79, 0x2e, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x2e, 0x69, 0x6e, 0x74, 0x65, + 0x72, 0x6e, 0x65, 0x74, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x65, 0x6e, 0x63, 0x6f, 0x64, 0x69, + 0x6e, 0x67, 0x2e, 0x48, 0x75, 0x6e, 0x6b, 0x1a, 0x2b, 0x2e, 0x78, 0x72, 0x61, 0x79, 0x2e, 0x74, + 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x65, + 0x74, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x65, 0x6e, 0x63, 0x6f, 0x64, 0x69, 0x6e, 0x67, 0x2e, + 0x48, 0x75, 0x6e, 0x6b, 0x28, 0x01, 0x30, 0x01, 0x12, 0x72, 0x0a, 0x08, 0x54, 0x75, 0x6e, 0x4d, + 0x75, 0x6c, 0x74, 0x69, 0x12, 0x30, 0x2e, 0x78, 0x72, 0x61, 0x79, 0x2e, 0x74, 0x72, 0x61, 0x6e, + 0x73, 0x70, 0x6f, 0x72, 0x74, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x65, 0x74, 0x2e, 0x67, + 0x72, 0x70, 0x63, 0x2e, 0x65, 0x6e, 0x63, 0x6f, 0x64, 0x69, 0x6e, 0x67, 0x2e, 0x4d, 0x75, 0x6c, + 0x74, 0x69, 0x48, 0x75, 0x6e, 0x6b, 0x1a, 0x30, 0x2e, 0x78, 0x72, 0x61, 0x79, 0x2e, 0x74, 0x72, + 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x65, 0x74, + 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x65, 0x6e, 0x63, 0x6f, 0x64, 0x69, 0x6e, 0x67, 0x2e, 0x4d, + 0x75, 0x6c, 0x74, 0x69, 0x48, 0x75, 0x6e, 0x6b, 0x28, 0x01, 0x30, 0x01, 0x42, 0x3c, 0x5a, 0x3a, + 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x78, 0x74, 0x6c, 0x73, 0x2f, + 0x78, 0x72, 0x61, 0x79, 0x2d, 0x63, 0x6f, 0x72, 0x65, 0x2f, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x70, + 0x6f, 0x72, 0x74, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x65, 0x74, 0x2f, 0x67, 0x72, 0x70, + 0x63, 0x2f, 0x65, 0x6e, 0x63, 0x6f, 0x64, 0x69, 0x6e, 0x67, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x33, +} + +var ( + file_transport_internet_grpc_encoding_stream_proto_rawDescOnce sync.Once + file_transport_internet_grpc_encoding_stream_proto_rawDescData = file_transport_internet_grpc_encoding_stream_proto_rawDesc +) + +func file_transport_internet_grpc_encoding_stream_proto_rawDescGZIP() []byte { + file_transport_internet_grpc_encoding_stream_proto_rawDescOnce.Do(func() { + file_transport_internet_grpc_encoding_stream_proto_rawDescData = protoimpl.X.CompressGZIP(file_transport_internet_grpc_encoding_stream_proto_rawDescData) + }) + return file_transport_internet_grpc_encoding_stream_proto_rawDescData +} + +var file_transport_internet_grpc_encoding_stream_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_transport_internet_grpc_encoding_stream_proto_goTypes = []interface{}{ + (*Hunk)(nil), // 0: xray.transport.internet.grpc.encoding.Hunk + (*MultiHunk)(nil), // 1: xray.transport.internet.grpc.encoding.MultiHunk +} +var file_transport_internet_grpc_encoding_stream_proto_depIdxs = []int32{ + 0, // 0: xray.transport.internet.grpc.encoding.GRPCService.Tun:input_type -> xray.transport.internet.grpc.encoding.Hunk + 1, // 1: xray.transport.internet.grpc.encoding.GRPCService.TunMulti:input_type -> xray.transport.internet.grpc.encoding.MultiHunk + 0, // 2: xray.transport.internet.grpc.encoding.GRPCService.Tun:output_type -> xray.transport.internet.grpc.encoding.Hunk + 1, // 3: xray.transport.internet.grpc.encoding.GRPCService.TunMulti:output_type -> xray.transport.internet.grpc.encoding.MultiHunk + 2, // [2:4] is the sub-list for method output_type + 0, // [0:2] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_transport_internet_grpc_encoding_stream_proto_init() } +func file_transport_internet_grpc_encoding_stream_proto_init() { + if File_transport_internet_grpc_encoding_stream_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_transport_internet_grpc_encoding_stream_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Hunk); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_transport_internet_grpc_encoding_stream_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*MultiHunk); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_transport_internet_grpc_encoding_stream_proto_rawDesc, + NumEnums: 0, + NumMessages: 2, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_transport_internet_grpc_encoding_stream_proto_goTypes, + DependencyIndexes: file_transport_internet_grpc_encoding_stream_proto_depIdxs, + MessageInfos: file_transport_internet_grpc_encoding_stream_proto_msgTypes, + }.Build() + File_transport_internet_grpc_encoding_stream_proto = out.File + file_transport_internet_grpc_encoding_stream_proto_rawDesc = nil + file_transport_internet_grpc_encoding_stream_proto_goTypes = nil + file_transport_internet_grpc_encoding_stream_proto_depIdxs = nil +} diff --git a/transport/internet/grpc/encoding/stream.proto b/transport/internet/grpc/encoding/stream.proto new file mode 100644 index 00000000..63898a77 --- /dev/null +++ b/transport/internet/grpc/encoding/stream.proto @@ -0,0 +1,17 @@ +syntax = "proto3"; + +package xray.transport.internet.grpc.encoding; +option go_package = "github.com/xtls/xray-core/transport/internet/grpc/encoding"; + +message Hunk { + bytes data = 1; +} + +message MultiHunk { + repeated bytes data = 1; +} + +service GRPCService { + rpc Tun (stream Hunk) returns (stream Hunk); + rpc TunMulti (stream MultiHunk) returns (stream MultiHunk); +} diff --git a/transport/internet/grpc/encoding/stream_grpc.pb.go b/transport/internet/grpc/encoding/stream_grpc.pb.go new file mode 100644 index 00000000..42e440a6 --- /dev/null +++ b/transport/internet/grpc/encoding/stream_grpc.pb.go @@ -0,0 +1,201 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. + +package encoding + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +// GRPCServiceClient is the client API for GRPCService service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type GRPCServiceClient interface { + Tun(ctx context.Context, opts ...grpc.CallOption) (GRPCService_TunClient, error) + TunMulti(ctx context.Context, opts ...grpc.CallOption) (GRPCService_TunMultiClient, error) +} + +type gRPCServiceClient struct { + cc grpc.ClientConnInterface +} + +func NewGRPCServiceClient(cc grpc.ClientConnInterface) GRPCServiceClient { + return &gRPCServiceClient{cc} +} + +func (c *gRPCServiceClient) Tun(ctx context.Context, opts ...grpc.CallOption) (GRPCService_TunClient, error) { + stream, err := c.cc.NewStream(ctx, &GRPCService_ServiceDesc.Streams[0], "/xray.transport.internet.grpc.encoding.GRPCService/Tun", opts...) + if err != nil { + return nil, err + } + x := &gRPCServiceTunClient{stream} + return x, nil +} + +type GRPCService_TunClient interface { + Send(*Hunk) error + Recv() (*Hunk, error) + grpc.ClientStream +} + +type gRPCServiceTunClient struct { + grpc.ClientStream +} + +func (x *gRPCServiceTunClient) Send(m *Hunk) error { + return x.ClientStream.SendMsg(m) +} + +func (x *gRPCServiceTunClient) Recv() (*Hunk, error) { + m := new(Hunk) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func (c *gRPCServiceClient) TunMulti(ctx context.Context, opts ...grpc.CallOption) (GRPCService_TunMultiClient, error) { + stream, err := c.cc.NewStream(ctx, &GRPCService_ServiceDesc.Streams[1], "/xray.transport.internet.grpc.encoding.GRPCService/TunMulti", opts...) + if err != nil { + return nil, err + } + x := &gRPCServiceTunMultiClient{stream} + return x, nil +} + +type GRPCService_TunMultiClient interface { + Send(*MultiHunk) error + Recv() (*MultiHunk, error) + grpc.ClientStream +} + +type gRPCServiceTunMultiClient struct { + grpc.ClientStream +} + +func (x *gRPCServiceTunMultiClient) Send(m *MultiHunk) error { + return x.ClientStream.SendMsg(m) +} + +func (x *gRPCServiceTunMultiClient) Recv() (*MultiHunk, error) { + m := new(MultiHunk) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// GRPCServiceServer is the server API for GRPCService service. +// All implementations must embed UnimplementedGRPCServiceServer +// for forward compatibility +type GRPCServiceServer interface { + Tun(GRPCService_TunServer) error + TunMulti(GRPCService_TunMultiServer) error + mustEmbedUnimplementedGRPCServiceServer() +} + +// UnimplementedGRPCServiceServer must be embedded to have forward compatible implementations. +type UnimplementedGRPCServiceServer struct { +} + +func (UnimplementedGRPCServiceServer) Tun(GRPCService_TunServer) error { + return status.Errorf(codes.Unimplemented, "method Tun not implemented") +} +func (UnimplementedGRPCServiceServer) TunMulti(GRPCService_TunMultiServer) error { + return status.Errorf(codes.Unimplemented, "method TunMulti not implemented") +} +func (UnimplementedGRPCServiceServer) mustEmbedUnimplementedGRPCServiceServer() {} + +// UnsafeGRPCServiceServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to GRPCServiceServer will +// result in compilation errors. +type UnsafeGRPCServiceServer interface { + mustEmbedUnimplementedGRPCServiceServer() +} + +func RegisterGRPCServiceServer(s grpc.ServiceRegistrar, srv GRPCServiceServer) { + s.RegisterService(&GRPCService_ServiceDesc, srv) +} + +func _GRPCService_Tun_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(GRPCServiceServer).Tun(&gRPCServiceTunServer{stream}) +} + +type GRPCService_TunServer interface { + Send(*Hunk) error + Recv() (*Hunk, error) + grpc.ServerStream +} + +type gRPCServiceTunServer struct { + grpc.ServerStream +} + +func (x *gRPCServiceTunServer) Send(m *Hunk) error { + return x.ServerStream.SendMsg(m) +} + +func (x *gRPCServiceTunServer) Recv() (*Hunk, error) { + m := new(Hunk) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func _GRPCService_TunMulti_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(GRPCServiceServer).TunMulti(&gRPCServiceTunMultiServer{stream}) +} + +type GRPCService_TunMultiServer interface { + Send(*MultiHunk) error + Recv() (*MultiHunk, error) + grpc.ServerStream +} + +type gRPCServiceTunMultiServer struct { + grpc.ServerStream +} + +func (x *gRPCServiceTunMultiServer) Send(m *MultiHunk) error { + return x.ServerStream.SendMsg(m) +} + +func (x *gRPCServiceTunMultiServer) Recv() (*MultiHunk, error) { + m := new(MultiHunk) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// GRPCService_ServiceDesc is the grpc.ServiceDesc for GRPCService service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var GRPCService_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "xray.transport.internet.grpc.encoding.GRPCService", + HandlerType: (*GRPCServiceServer)(nil), + Methods: []grpc.MethodDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "Tun", + Handler: _GRPCService_Tun_Handler, + ServerStreams: true, + ClientStreams: true, + }, + { + StreamName: "TunMulti", + Handler: _GRPCService_TunMulti_Handler, + ServerStreams: true, + ClientStreams: true, + }, + }, + Metadata: "transport/internet/grpc/encoding/stream.proto", +} diff --git a/transport/internet/grpc/errors.generated.go b/transport/internet/grpc/errors.generated.go new file mode 100644 index 00000000..290cc82f --- /dev/null +++ b/transport/internet/grpc/errors.generated.go @@ -0,0 +1,9 @@ +package grpc + +import "github.com/xtls/xray-core/common/errors" + +type errPathObjHolder struct{} + +func newError(values ...interface{}) *errors.Error { + return errors.New(values...).WithPathObj(errPathObjHolder{}) +} diff --git a/transport/internet/grpc/grpc.go b/transport/internet/grpc/grpc.go new file mode 100644 index 00000000..6caf155b --- /dev/null +++ b/transport/internet/grpc/grpc.go @@ -0,0 +1,3 @@ +package grpc + +//go:generate go run github.com/xtls/xray-core/common/errors/errorgen diff --git a/transport/internet/grpc/hub.go b/transport/internet/grpc/hub.go new file mode 100644 index 00000000..0f7ab6ba --- /dev/null +++ b/transport/internet/grpc/hub.go @@ -0,0 +1,129 @@ +package grpc + +import ( + "context" + + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + + "github.com/xtls/xray-core/common" + "github.com/xtls/xray-core/common/net" + "github.com/xtls/xray-core/common/session" + "github.com/xtls/xray-core/transport/internet" + "github.com/xtls/xray-core/transport/internet/grpc/encoding" + "github.com/xtls/xray-core/transport/internet/tls" +) + +type Listener struct { + encoding.UnimplementedGRPCServiceServer + ctx context.Context + handler internet.ConnHandler + local net.Addr + config *Config + locker *internet.FileLocker // for unix domain socket + + s *grpc.Server +} + +func (l Listener) Tun(server encoding.GRPCService_TunServer) error { + tunCtx, cancel := context.WithCancel(l.ctx) + l.handler(encoding.NewHunkConn(server, cancel)) + <-tunCtx.Done() + return nil +} + +func (l Listener) TunMulti(server encoding.GRPCService_TunMultiServer) error { + tunCtx, cancel := context.WithCancel(l.ctx) + l.handler(encoding.NewMultiHunkConn(server, cancel)) + <-tunCtx.Done() + return nil +} + +func (l Listener) Close() error { + l.s.Stop() + return nil +} + +func (l Listener) Addr() net.Addr { + return l.local +} + +func Listen(ctx context.Context, address net.Address, port net.Port, settings *internet.MemoryStreamConfig, handler internet.ConnHandler) (internet.Listener, error) { + grpcSettings := settings.ProtocolSettings.(*Config) + var listener *Listener + if port == net.Port(0) { // unix + listener = &Listener{ + handler: handler, + local: &net.UnixAddr{ + Name: address.Domain(), + Net: "unix", + }, + config: grpcSettings, + } + } else { // tcp + listener = &Listener{ + handler: handler, + local: &net.TCPAddr{ + IP: address.IP(), + Port: int(port), + }, + config: grpcSettings, + } + } + + listener.ctx = ctx + + config := tls.ConfigFromStreamSettings(settings) + + var s *grpc.Server + if config == nil { + s = grpc.NewServer() + } else { + s = grpc.NewServer(grpc.Creds(credentials.NewTLS(config.GetTLSConfig(tls.WithNextProto("h2"))))) + } + listener.s = s + + if settings.SocketSettings != nil && settings.SocketSettings.AcceptProxyProtocol { + newError("accepting PROXY protocol").AtWarning().WriteToLog(session.ExportIDToError(ctx)) + } + + go func() { + var streamListener net.Listener + var err error + if port == net.Port(0) { // unix + streamListener, err = internet.ListenSystem(ctx, &net.UnixAddr{ + Name: address.Domain(), + Net: "unix", + }, settings.SocketSettings) + if err != nil { + newError("failed to listen on ", address).Base(err).AtError().WriteToLog(session.ExportIDToError(ctx)) + return + } + locker := ctx.Value(address.Domain()) + if locker != nil { + listener.locker = locker.(*internet.FileLocker) + } + } else { // tcp + streamListener, err = internet.ListenSystem(ctx, &net.TCPAddr{ + IP: address.IP(), + Port: int(port), + }, settings.SocketSettings) + if err != nil { + newError("failed to listen on ", address, ":", port).Base(err).AtError().WriteToLog(session.ExportIDToError(ctx)) + return + } + } + + encoding.RegisterGRPCServiceServerX(s, listener, grpcSettings.ServiceName) + + if err = s.Serve(streamListener); err != nil { + newError("Listener for gRPC ended").Base(err).WriteToLog() + } + }() + + return listener, nil +} + +func init() { + common.Must(internet.RegisterTransportListener(protocolName, Listen)) +} diff --git a/transport/internet/http/dialer.go b/transport/internet/http/dialer.go index e7d6242a..dc2cd8ab 100644 --- a/transport/internet/http/dialer.go +++ b/transport/internet/http/dialer.go @@ -18,8 +18,13 @@ import ( "golang.org/x/net/http2" ) +type dialerConf struct { + net.Destination + *internet.SocketConfig +} + var ( - globalDialerMap map[net.Destination]*http.Client + globalDialerMap map[dialerConf]*http.Client globalDialerAccess sync.Mutex ) @@ -28,10 +33,10 @@ func getHTTPClient(ctx context.Context, dest net.Destination, tlsSettings *tls.C defer globalDialerAccess.Unlock() if globalDialerMap == nil { - globalDialerMap = make(map[net.Destination]*http.Client) + globalDialerMap = make(map[dialerConf]*http.Client) } - if client, found := globalDialerMap[dest]; found { + if client, found := globalDialerMap[dialerConf{dest, sockopt}]; found { return client, nil } @@ -87,7 +92,7 @@ func getHTTPClient(ctx context.Context, dest net.Destination, tlsSettings *tls.C Transport: transport, } - globalDialerMap[dest] = client + globalDialerMap[dialerConf{dest, sockopt}] = client return client, nil }