From 8a4217fdf57d31e7ddbfdccecc36bbfa6b311dfa Mon Sep 17 00:00:00 2001 From: mmmray <142015632+mmmray@users.noreply.github.com> Date: Sat, 27 Jul 2024 14:52:36 +0200 Subject: [PATCH] SplitHTTP client: Add minUploadInterval (#3592) --- infra/conf/common.go | 31 +++++ infra/conf/transport_internet.go | 5 + transport/internet/splithttp/config.go | 23 ++++ transport/internet/splithttp/config.pb.go | 137 ++++++++++++++++++---- transport/internet/splithttp/config.proto | 6 + transport/internet/splithttp/dialer.go | 11 ++ 6 files changed, 188 insertions(+), 25 deletions(-) diff --git a/infra/conf/common.go b/infra/conf/common.go index 9f9c0068..9d31bc0a 100644 --- a/infra/conf/common.go +++ b/infra/conf/common.go @@ -2,6 +2,7 @@ package conf import ( "encoding/json" + "strconv" "strings" "github.com/xtls/xray-core/common/errors" @@ -242,3 +243,33 @@ func (v *User) Build() *protocol.User { Level: uint32(v.LevelByte), } } + +// Int32Range deserializes from "1-2" or 1, so can deserialize from both int and number. +// Negative integers can be passed as sentinel values, but do not parse as ranges. +type Int32Range struct { + From int32 + To int32 +} + +func (v *Int32Range) UnmarshalJSON(data []byte) error { + var stringrange string + var rawint int32 + if err := json.Unmarshal(data, &stringrange); err == nil { + pair := strings.SplitN(stringrange, "-", 2) + if len(pair) == 2 { + from, err := strconv.Atoi(pair[0]) + to, err2 := strconv.Atoi(pair[1]) + if err == nil && err2 == nil { + v.From = int32(from) + v.To = int32(to) + return nil + } + } + } else if err := json.Unmarshal(data, &rawint); err == nil { + v.From = rawint + v.To = rawint + return nil + } + + return errors.New("Invalid integer range, expected either string of form \"1-2\" or plain integer.") +} diff --git a/infra/conf/transport_internet.go b/infra/conf/transport_internet.go index 30fb3e74..bfaa7481 100644 --- a/infra/conf/transport_internet.go +++ b/infra/conf/transport_internet.go @@ -231,6 +231,7 @@ type SplitHTTPConfig struct { Headers map[string]string `json:"headers"` MaxConcurrentUploads int32 `json:"maxConcurrentUploads"` MaxUploadSize int32 `json:"maxUploadSize"` + MinUploadIntervalMs Int32Range `json:"minUploadIntervalMs"` } // Build implements Buildable. @@ -249,6 +250,10 @@ func (c *SplitHTTPConfig) Build() (proto.Message, error) { Header: c.Headers, MaxConcurrentUploads: c.MaxConcurrentUploads, MaxUploadSize: c.MaxUploadSize, + MinUploadIntervalMs: &splithttp.RandRangeConfig{ + From: c.MinUploadIntervalMs.From, + To: c.MinUploadIntervalMs.To, + }, } return config, nil } diff --git a/transport/internet/splithttp/config.go b/transport/internet/splithttp/config.go index 4c44e361..3aa318c8 100644 --- a/transport/internet/splithttp/config.go +++ b/transport/internet/splithttp/config.go @@ -1,6 +1,8 @@ package splithttp import ( + "crypto/rand" + "math/big" "net/http" "github.com/xtls/xray-core/common" @@ -45,8 +47,29 @@ func (c *Config) GetNormalizedMaxUploadSize() int32 { return c.MaxUploadSize } +func (c *Config) GetNormalizedMinUploadInterval() RandRangeConfig { + r := c.MinUploadIntervalMs + + if r == nil { + r = &RandRangeConfig{ + From: 30, + To: 30, + } + } + + return *r +} + func init() { common.Must(internet.RegisterProtocolConfigCreator(protocolName, func() interface{} { return new(Config) })) } + +func (c RandRangeConfig) roll() int32 { + if c.From == c.To { + return c.From + } + bigInt, _ := rand.Int(rand.Reader, big.NewInt(int64(c.To-c.From))) + return c.From + int32(bigInt.Int64()) +} diff --git a/transport/internet/splithttp/config.pb.go b/transport/internet/splithttp/config.pb.go index 3975b9f6..43f57e2f 100644 --- a/transport/internet/splithttp/config.pb.go +++ b/transport/internet/splithttp/config.pb.go @@ -30,6 +30,7 @@ type Config struct { Header map[string]string `protobuf:"bytes,3,rep,name=header,proto3" json:"header,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` MaxConcurrentUploads int32 `protobuf:"varint,4,opt,name=maxConcurrentUploads,proto3" json:"maxConcurrentUploads,omitempty"` MaxUploadSize int32 `protobuf:"varint,5,opt,name=maxUploadSize,proto3" json:"maxUploadSize,omitempty"` + MinUploadIntervalMs *RandRangeConfig `protobuf:"bytes,6,opt,name=minUploadIntervalMs,proto3" json:"minUploadIntervalMs,omitempty"` } func (x *Config) Reset() { @@ -99,6 +100,68 @@ func (x *Config) GetMaxUploadSize() int32 { return 0 } +func (x *Config) GetMinUploadIntervalMs() *RandRangeConfig { + if x != nil { + return x.MinUploadIntervalMs + } + return nil +} + +type RandRangeConfig struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + From int32 `protobuf:"varint,1,opt,name=from,proto3" json:"from,omitempty"` + To int32 `protobuf:"varint,2,opt,name=to,proto3" json:"to,omitempty"` +} + +func (x *RandRangeConfig) Reset() { + *x = RandRangeConfig{} + if protoimpl.UnsafeEnabled { + mi := &file_transport_internet_splithttp_config_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *RandRangeConfig) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*RandRangeConfig) ProtoMessage() {} + +func (x *RandRangeConfig) ProtoReflect() protoreflect.Message { + mi := &file_transport_internet_splithttp_config_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use RandRangeConfig.ProtoReflect.Descriptor instead. +func (*RandRangeConfig) Descriptor() ([]byte, []int) { + return file_transport_internet_splithttp_config_proto_rawDescGZIP(), []int{1} +} + +func (x *RandRangeConfig) GetFrom() int32 { + if x != nil { + return x.From + } + return 0 +} + +func (x *RandRangeConfig) GetTo() int32 { + if x != nil { + return x.To + } + return 0 +} + var File_transport_internet_splithttp_config_proto protoreflect.FileDescriptor var file_transport_internet_splithttp_config_proto_rawDesc = []byte{ @@ -106,7 +169,7 @@ var file_transport_internet_splithttp_config_proto_rawDesc = []byte{ 0x72, 0x6e, 0x65, 0x74, 0x2f, 0x73, 0x70, 0x6c, 0x69, 0x74, 0x68, 0x74, 0x74, 0x70, 0x2f, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x21, 0x78, 0x72, 0x61, 0x79, 0x2e, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x2e, 0x69, 0x6e, 0x74, 0x65, - 0x72, 0x6e, 0x65, 0x74, 0x2e, 0x73, 0x70, 0x6c, 0x69, 0x74, 0x68, 0x74, 0x74, 0x70, 0x22, 0x94, + 0x72, 0x6e, 0x65, 0x74, 0x2e, 0x73, 0x70, 0x6c, 0x69, 0x74, 0x68, 0x74, 0x74, 0x70, 0x22, 0xfa, 0x02, 0x0a, 0x06, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x12, 0x0a, 0x04, 0x68, 0x6f, 0x73, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x68, 0x6f, 0x73, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x70, 0x61, 0x74, 0x68, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x70, 0x61, 0x74, @@ -120,20 +183,30 @@ var file_transport_internet_splithttp_config_proto_rawDesc = []byte{ 0x6d, 0x61, 0x78, 0x43, 0x6f, 0x6e, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x74, 0x55, 0x70, 0x6c, 0x6f, 0x61, 0x64, 0x73, 0x12, 0x24, 0x0a, 0x0d, 0x6d, 0x61, 0x78, 0x55, 0x70, 0x6c, 0x6f, 0x61, 0x64, 0x53, 0x69, 0x7a, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0d, 0x6d, 0x61, 0x78, - 0x55, 0x70, 0x6c, 0x6f, 0x61, 0x64, 0x53, 0x69, 0x7a, 0x65, 0x1a, 0x39, 0x0a, 0x0b, 0x48, 0x65, - 0x61, 0x64, 0x65, 0x72, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, - 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, - 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, - 0x65, 0x3a, 0x02, 0x38, 0x01, 0x42, 0x85, 0x01, 0x0a, 0x25, 0x63, 0x6f, 0x6d, 0x2e, 0x78, 0x72, - 0x61, 0x79, 0x2e, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x2e, 0x69, 0x6e, 0x74, - 0x65, 0x72, 0x6e, 0x65, 0x74, 0x2e, 0x73, 0x70, 0x6c, 0x69, 0x74, 0x68, 0x74, 0x74, 0x70, 0x50, - 0x01, 0x5a, 0x36, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x78, 0x74, - 0x6c, 0x73, 0x2f, 0x78, 0x72, 0x61, 0x79, 0x2d, 0x63, 0x6f, 0x72, 0x65, 0x2f, 0x74, 0x72, 0x61, - 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x65, 0x74, 0x2f, - 0x73, 0x70, 0x6c, 0x69, 0x74, 0x68, 0x74, 0x74, 0x70, 0xaa, 0x02, 0x21, 0x58, 0x72, 0x61, 0x79, - 0x2e, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x2e, 0x49, 0x6e, 0x74, 0x65, 0x72, - 0x6e, 0x65, 0x74, 0x2e, 0x53, 0x70, 0x6c, 0x69, 0x74, 0x48, 0x74, 0x74, 0x70, 0x62, 0x06, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x55, 0x70, 0x6c, 0x6f, 0x61, 0x64, 0x53, 0x69, 0x7a, 0x65, 0x12, 0x64, 0x0a, 0x13, 0x6d, 0x69, + 0x6e, 0x55, 0x70, 0x6c, 0x6f, 0x61, 0x64, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x76, 0x61, 0x6c, 0x4d, + 0x73, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x32, 0x2e, 0x78, 0x72, 0x61, 0x79, 0x2e, 0x74, + 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x65, + 0x74, 0x2e, 0x73, 0x70, 0x6c, 0x69, 0x74, 0x68, 0x74, 0x74, 0x70, 0x2e, 0x52, 0x61, 0x6e, 0x64, + 0x52, 0x61, 0x6e, 0x67, 0x65, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x13, 0x6d, 0x69, 0x6e, + 0x55, 0x70, 0x6c, 0x6f, 0x61, 0x64, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x76, 0x61, 0x6c, 0x4d, 0x73, + 0x1a, 0x39, 0x0a, 0x0b, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, + 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, + 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0x35, 0x0a, 0x0f, 0x52, + 0x61, 0x6e, 0x64, 0x52, 0x61, 0x6e, 0x67, 0x65, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x12, + 0x0a, 0x04, 0x66, 0x72, 0x6f, 0x6d, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x04, 0x66, 0x72, + 0x6f, 0x6d, 0x12, 0x0e, 0x0a, 0x02, 0x74, 0x6f, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x02, + 0x74, 0x6f, 0x42, 0x85, 0x01, 0x0a, 0x25, 0x63, 0x6f, 0x6d, 0x2e, 0x78, 0x72, 0x61, 0x79, 0x2e, + 0x74, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, + 0x65, 0x74, 0x2e, 0x73, 0x70, 0x6c, 0x69, 0x74, 0x68, 0x74, 0x74, 0x70, 0x50, 0x01, 0x5a, 0x36, + 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x78, 0x74, 0x6c, 0x73, 0x2f, + 0x78, 0x72, 0x61, 0x79, 0x2d, 0x63, 0x6f, 0x72, 0x65, 0x2f, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x70, + 0x6f, 0x72, 0x74, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x65, 0x74, 0x2f, 0x73, 0x70, 0x6c, + 0x69, 0x74, 0x68, 0x74, 0x74, 0x70, 0xaa, 0x02, 0x21, 0x58, 0x72, 0x61, 0x79, 0x2e, 0x54, 0x72, + 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x2e, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x65, 0x74, + 0x2e, 0x53, 0x70, 0x6c, 0x69, 0x74, 0x48, 0x74, 0x74, 0x70, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x33, } var ( @@ -148,18 +221,20 @@ func file_transport_internet_splithttp_config_proto_rawDescGZIP() []byte { return file_transport_internet_splithttp_config_proto_rawDescData } -var file_transport_internet_splithttp_config_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_transport_internet_splithttp_config_proto_msgTypes = make([]protoimpl.MessageInfo, 3) var file_transport_internet_splithttp_config_proto_goTypes = []interface{}{ - (*Config)(nil), // 0: xray.transport.internet.splithttp.Config - nil, // 1: xray.transport.internet.splithttp.Config.HeaderEntry + (*Config)(nil), // 0: xray.transport.internet.splithttp.Config + (*RandRangeConfig)(nil), // 1: xray.transport.internet.splithttp.RandRangeConfig + nil, // 2: xray.transport.internet.splithttp.Config.HeaderEntry } var file_transport_internet_splithttp_config_proto_depIdxs = []int32{ - 1, // 0: xray.transport.internet.splithttp.Config.header:type_name -> xray.transport.internet.splithttp.Config.HeaderEntry - 1, // [1:1] is the sub-list for method output_type - 1, // [1:1] is the sub-list for method input_type - 1, // [1:1] is the sub-list for extension type_name - 1, // [1:1] is the sub-list for extension extendee - 0, // [0:1] is the sub-list for field type_name + 2, // 0: xray.transport.internet.splithttp.Config.header:type_name -> xray.transport.internet.splithttp.Config.HeaderEntry + 1, // 1: xray.transport.internet.splithttp.Config.minUploadIntervalMs:type_name -> xray.transport.internet.splithttp.RandRangeConfig + 2, // [2:2] is the sub-list for method output_type + 2, // [2:2] is the sub-list for method input_type + 2, // [2:2] is the sub-list for extension type_name + 2, // [2:2] is the sub-list for extension extendee + 0, // [0:2] is the sub-list for field type_name } func init() { file_transport_internet_splithttp_config_proto_init() } @@ -180,6 +255,18 @@ func file_transport_internet_splithttp_config_proto_init() { return nil } } + file_transport_internet_splithttp_config_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*RandRangeConfig); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } type x struct{} out := protoimpl.TypeBuilder{ @@ -187,7 +274,7 @@ func file_transport_internet_splithttp_config_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_transport_internet_splithttp_config_proto_rawDesc, NumEnums: 0, - NumMessages: 2, + NumMessages: 3, NumExtensions: 0, NumServices: 0, }, diff --git a/transport/internet/splithttp/config.proto b/transport/internet/splithttp/config.proto index 43cecee2..c87ab459 100644 --- a/transport/internet/splithttp/config.proto +++ b/transport/internet/splithttp/config.proto @@ -12,4 +12,10 @@ message Config { map header = 3; int32 maxConcurrentUploads = 4; int32 maxUploadSize = 5; + RandRangeConfig minUploadIntervalMs = 6; +} + +message RandRangeConfig { + int32 from = 1; + int32 to = 2; } diff --git a/transport/internet/splithttp/dialer.go b/transport/internet/splithttp/dialer.go index 99558975..7c677bf9 100644 --- a/transport/internet/splithttp/dialer.go +++ b/transport/internet/splithttp/dialer.go @@ -183,6 +183,7 @@ func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.Me maxConcurrentUploads := transportConfiguration.GetNormalizedMaxConcurrentUploads() maxUploadSize := transportConfiguration.GetNormalizedMaxUploadSize() + minUploadInterval := transportConfiguration.GetNormalizedMinUploadInterval() if tlsConfig != nil { requestURL.Scheme = "https" @@ -207,6 +208,8 @@ func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.Me requestsLimiter := semaphore.New(int(maxConcurrentUploads)) var requestCounter int64 + lastWrite := time.Now() + // by offloading the uploads into a buffered pipe, multiple conn.Write // calls get automatically batched together into larger POST requests. // without batching, bandwidth is extremely limited. @@ -237,6 +240,14 @@ func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.Me } }() + if minUploadInterval.From > 0 { + roll := time.Duration(minUploadInterval.roll()) * time.Millisecond + if time.Since(lastWrite) < roll { + time.Sleep(roll) + } + + lastWrite = time.Now() + } } }()