mirror of
https://github.com/XTLS/Xray-core.git
synced 2025-01-27 03:54:12 +00:00
Add Datagram transport
This commit is contained in:
parent
79d020f4d7
commit
8882937bce
@ -18,6 +18,7 @@ import (
|
||||
"github.com/xtls/xray-core/transport/internet"
|
||||
"github.com/xtls/xray-core/transport/internet/httpupgrade"
|
||||
"github.com/xtls/xray-core/transport/internet/kcp"
|
||||
"github.com/xtls/xray-core/transport/internet/quic"
|
||||
"github.com/xtls/xray-core/transport/internet/reality"
|
||||
"github.com/xtls/xray-core/transport/internet/splithttp"
|
||||
"github.com/xtls/xray-core/transport/internet/tcp"
|
||||
@ -326,6 +327,22 @@ func (c *SplitHTTPConfig) Build() (proto.Message, error) {
|
||||
return config, nil
|
||||
}
|
||||
|
||||
type QUICConfig struct {
|
||||
// Header json.RawMessage `json:"header"`
|
||||
// Security string `json:"security"`
|
||||
// Key string `json:"key"`
|
||||
|
||||
Fec bool `json:"fec"`
|
||||
}
|
||||
|
||||
// Build implements Buildable.
|
||||
func (c *QUICConfig) Build() (proto.Message, error) {
|
||||
config := &quic.Config{
|
||||
Fec: c.Fec,
|
||||
}
|
||||
return config, nil
|
||||
}
|
||||
|
||||
func readFileOrString(f string, s []string) ([]byte, error) {
|
||||
if len(f) > 0 {
|
||||
return filesystem.ReadFile(f)
|
||||
@ -659,8 +676,8 @@ func (p TransportProtocol) Build() (string, error) {
|
||||
return "httpupgrade", nil
|
||||
case "h2", "h3", "http":
|
||||
return "", errors.PrintRemovedFeatureError("HTTP transport (without header padding, etc.)", "XHTTP stream-one H2 & H3")
|
||||
case "quic":
|
||||
return "", errors.PrintRemovedFeatureError("QUIC transport (without web service, etc.)", "XHTTP stream-one H3")
|
||||
case "quic", "datagram":
|
||||
return "quic", nil
|
||||
default:
|
||||
return "", errors.New("Config: unknown transport protocol: ", p)
|
||||
}
|
||||
@ -793,6 +810,7 @@ type StreamConfig struct {
|
||||
XHTTPSettings *SplitHTTPConfig `json:"xhttpSettings"`
|
||||
SplitHTTPSettings *SplitHTTPConfig `json:"splithttpSettings"`
|
||||
KCPSettings *KCPConfig `json:"kcpSettings"`
|
||||
QUICSettings *QUICConfig `json:"quicSettings"`
|
||||
GRPCSettings *GRPCConfig `json:"grpcSettings"`
|
||||
WSSettings *WebSocketConfig `json:"wsSettings"`
|
||||
HTTPUPGRADESettings *HttpUpgradeConfig `json:"httpupgradeSettings"`
|
||||
@ -884,6 +902,16 @@ func (c *StreamConfig) Build() (*internet.StreamConfig, error) {
|
||||
Settings: serial.ToTypedMessage(ts),
|
||||
})
|
||||
}
|
||||
if c.QUICSettings != nil {
|
||||
qs, err := c.QUICSettings.Build()
|
||||
if err != nil {
|
||||
return nil, errors.New("Failed to build QUIC config").Base(err)
|
||||
}
|
||||
config.TransportSettings = append(config.TransportSettings, &internet.TransportConfig{
|
||||
ProtocolName: "quic",
|
||||
Settings: serial.ToTypedMessage(qs),
|
||||
})
|
||||
}
|
||||
if c.GRPCSettings != nil {
|
||||
gs, err := c.GRPCSettings.Build()
|
||||
if err != nil {
|
||||
|
@ -53,6 +53,7 @@ import (
|
||||
_ "github.com/xtls/xray-core/transport/internet/grpc"
|
||||
_ "github.com/xtls/xray-core/transport/internet/httpupgrade"
|
||||
_ "github.com/xtls/xray-core/transport/internet/kcp"
|
||||
_ "github.com/xtls/xray-core/transport/internet/quic"
|
||||
_ "github.com/xtls/xray-core/transport/internet/reality"
|
||||
_ "github.com/xtls/xray-core/transport/internet/splithttp"
|
||||
_ "github.com/xtls/xray-core/transport/internet/tcp"
|
||||
|
61
transport/internet/quic/conn.go
Normal file
61
transport/internet/quic/conn.go
Normal file
@ -0,0 +1,61 @@
|
||||
package quic
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/xtls/quic-go"
|
||||
"github.com/xtls/xray-core/common/buf"
|
||||
"github.com/xtls/xray-core/common/net"
|
||||
)
|
||||
|
||||
type interConn struct {
|
||||
ctx context.Context
|
||||
quicConn quic.Connection
|
||||
local net.Addr
|
||||
remote net.Addr
|
||||
}
|
||||
|
||||
func (c *interConn) Read(b []byte) (int, error) {
|
||||
received, e := c.quicConn.ReceiveDatagram(c.ctx)
|
||||
if e != nil {
|
||||
return 0, e
|
||||
}
|
||||
nBytes := copy(b, received[:])
|
||||
return nBytes, nil
|
||||
}
|
||||
|
||||
func (c *interConn) WriteMultiBuffer(mb buf.MultiBuffer) error {
|
||||
mb = buf.Compact(mb)
|
||||
mb, err := buf.WriteMultiBuffer(c, mb)
|
||||
buf.ReleaseMulti(mb)
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *interConn) Write(b []byte) (int, error) {
|
||||
return len(b), c.quicConn.SendDatagram(b)
|
||||
}
|
||||
|
||||
func (c *interConn) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *interConn) LocalAddr() net.Addr {
|
||||
return c.local
|
||||
}
|
||||
|
||||
func (c *interConn) RemoteAddr() net.Addr {
|
||||
return c.remote
|
||||
}
|
||||
|
||||
func (c *interConn) SetDeadline(t time.Time) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *interConn) SetReadDeadline(t time.Time) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *interConn) SetWriteDeadline(t time.Time) error {
|
||||
return nil
|
||||
}
|
154
transport/internet/quic/dialer.go
Normal file
154
transport/internet/quic/dialer.go
Normal file
@ -0,0 +1,154 @@
|
||||
package quic
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/xtls/quic-go"
|
||||
"github.com/xtls/xray-core/common"
|
||||
"github.com/xtls/xray-core/common/errors"
|
||||
"github.com/xtls/xray-core/common/net"
|
||||
"github.com/xtls/xray-core/transport/internet"
|
||||
"github.com/xtls/xray-core/transport/internet/stat"
|
||||
"github.com/xtls/xray-core/transport/internet/tls"
|
||||
)
|
||||
|
||||
type connectionContext struct {
|
||||
rawConn *net.UDPConn
|
||||
conn quic.Connection
|
||||
}
|
||||
|
||||
type clientConnections struct {
|
||||
access sync.Mutex
|
||||
conns map[net.Destination][]*connectionContext
|
||||
// cleanup *task.Periodic
|
||||
}
|
||||
|
||||
func isActive(s quic.Connection) bool {
|
||||
select {
|
||||
case <-s.Context().Done():
|
||||
return false
|
||||
default:
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.MemoryStreamConfig) (stat.Connection, error) {
|
||||
tlsConfig := tls.ConfigFromStreamSettings(streamSettings)
|
||||
if tlsConfig == nil {
|
||||
tlsConfig = &tls.Config{
|
||||
ServerName: internalDomain,
|
||||
AllowInsecure: true,
|
||||
}
|
||||
}
|
||||
|
||||
var destAddr *net.UDPAddr
|
||||
if dest.Address.Family().IsIP() {
|
||||
destAddr = &net.UDPAddr{
|
||||
IP: dest.Address.IP(),
|
||||
Port: int(dest.Port),
|
||||
}
|
||||
} else {
|
||||
dialerIp := internet.DestIpAddress()
|
||||
if dialerIp != nil {
|
||||
destAddr = &net.UDPAddr{
|
||||
IP: dialerIp,
|
||||
Port: int(dest.Port),
|
||||
}
|
||||
errors.LogInfo(ctx, "quic Dial use dialer dest addr: ", destAddr)
|
||||
} else {
|
||||
addr, err := net.ResolveUDPAddr("udp", dest.NetAddr())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
destAddr = addr
|
||||
}
|
||||
}
|
||||
|
||||
config := streamSettings.ProtocolSettings.(*Config)
|
||||
|
||||
return client.openConnection(ctx, destAddr, config, tlsConfig, streamSettings.SocketSettings)
|
||||
}
|
||||
|
||||
func (s *clientConnections) openConnection(ctx context.Context, destAddr net.Addr, config *Config, tlsConfig *tls.Config, sockopt *internet.SocketConfig) (stat.Connection, error) {
|
||||
s.access.Lock()
|
||||
defer s.access.Unlock()
|
||||
|
||||
if s.conns == nil {
|
||||
s.conns = make(map[net.Destination][]*connectionContext)
|
||||
}
|
||||
|
||||
dest := net.DestinationFromAddr(destAddr)
|
||||
|
||||
var conns []*connectionContext
|
||||
if s, found := s.conns[dest]; found {
|
||||
conns = s
|
||||
}
|
||||
|
||||
if len(conns) > 0 {
|
||||
s := conns[len(conns)-1]
|
||||
if isActive(s.conn) {
|
||||
return &interConn{
|
||||
ctx: ctx,
|
||||
quicConn: s.conn,
|
||||
local: s.conn.LocalAddr(),
|
||||
remote: destAddr,
|
||||
}, nil
|
||||
} else {
|
||||
errors.LogInfo(ctx, "current quic connection is not active!")
|
||||
}
|
||||
}
|
||||
|
||||
errors.LogInfo(ctx, "dialing quic to ", dest)
|
||||
rawConn, err := internet.DialSystem(ctx, dest, sockopt)
|
||||
if err != nil {
|
||||
return nil, errors.New("failed to dial to dest: ", err).AtWarning().Base(err)
|
||||
}
|
||||
|
||||
quicConfig := &quic.Config{
|
||||
KeepAlivePeriod: 0,
|
||||
HandshakeIdleTimeout: time.Second * 8,
|
||||
MaxIdleTimeout: time.Second * 300,
|
||||
EnableDatagrams: true,
|
||||
}
|
||||
|
||||
var udpConn *net.UDPConn
|
||||
switch conn := rawConn.(type) {
|
||||
case *net.UDPConn:
|
||||
udpConn = conn
|
||||
case *internet.PacketConnWrapper:
|
||||
udpConn = conn.Conn.(*net.UDPConn)
|
||||
default:
|
||||
rawConn.Close()
|
||||
return nil, errors.New("QUIC with sockopt is unsupported").AtWarning()
|
||||
}
|
||||
|
||||
tr := quic.Transport{
|
||||
ConnectionIDLength: 12,
|
||||
Conn: udpConn,
|
||||
}
|
||||
conn, err := tr.Dial(context.Background(), destAddr, tlsConfig.GetTLSConfig(tls.WithDestination(dest)), quicConfig)
|
||||
if err != nil {
|
||||
udpConn.Close()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
context := &connectionContext{
|
||||
conn: conn,
|
||||
rawConn: udpConn,
|
||||
}
|
||||
s.conns[dest] = append(conns, context)
|
||||
return &interConn{
|
||||
ctx: ctx,
|
||||
quicConn: context.conn,
|
||||
local: context.conn.LocalAddr(),
|
||||
remote: destAddr,
|
||||
}, nil
|
||||
}
|
||||
|
||||
var client clientConnections
|
||||
|
||||
func init() {
|
||||
common.Must(internet.RegisterTransportDialer(protocolName, Dial))
|
||||
}
|
113
transport/internet/quic/hub.go
Normal file
113
transport/internet/quic/hub.go
Normal file
@ -0,0 +1,113 @@
|
||||
package quic
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/xtls/quic-go"
|
||||
"github.com/xtls/xray-core/common"
|
||||
"github.com/xtls/xray-core/common/errors"
|
||||
"github.com/xtls/xray-core/common/net"
|
||||
"github.com/xtls/xray-core/common/protocol/tls/cert"
|
||||
"github.com/xtls/xray-core/common/signal/done"
|
||||
"github.com/xtls/xray-core/transport/internet"
|
||||
"github.com/xtls/xray-core/transport/internet/tls"
|
||||
)
|
||||
|
||||
// Listener is an internet.Listener that listens for TCP connections.
|
||||
type Listener struct {
|
||||
rawConn *net.UDPConn
|
||||
listener *quic.Listener
|
||||
done *done.Instance
|
||||
addConn internet.ConnHandler
|
||||
}
|
||||
|
||||
func (l *Listener) keepAccepting(ctx context.Context) {
|
||||
for {
|
||||
conn, err := l.listener.Accept(context.Background())
|
||||
if err != nil {
|
||||
errors.LogInfoInner(context.Background(), err, "failed to accept QUIC connection")
|
||||
if l.done.Done() {
|
||||
break
|
||||
}
|
||||
time.Sleep(time.Second)
|
||||
continue
|
||||
}
|
||||
l.addConn(&interConn{
|
||||
ctx: ctx,
|
||||
quicConn: conn,
|
||||
local: conn.LocalAddr(),
|
||||
remote: conn.RemoteAddr(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// Addr implements internet.Listener.Addr.
|
||||
func (l *Listener) Addr() net.Addr {
|
||||
return l.listener.Addr()
|
||||
}
|
||||
|
||||
// Close implements internet.Listener.Close.
|
||||
func (l *Listener) Close() error {
|
||||
l.done.Close()
|
||||
l.listener.Close()
|
||||
l.rawConn.Close()
|
||||
return nil
|
||||
}
|
||||
|
||||
// Listen creates a new Listener based on configurations.
|
||||
func Listen(ctx context.Context, address net.Address, port net.Port, streamSettings *internet.MemoryStreamConfig, handler internet.ConnHandler) (internet.Listener, error) {
|
||||
if address.Family().IsDomain() {
|
||||
return nil, errors.New("domain address is not allows for listening quic")
|
||||
}
|
||||
|
||||
tlsConfig := tls.ConfigFromStreamSettings(streamSettings)
|
||||
if tlsConfig == nil {
|
||||
tlsConfig = &tls.Config{
|
||||
Certificate: []*tls.Certificate{tls.ParseCertificate(cert.MustGenerate(nil, cert.DNSNames(internalDomain), cert.CommonName(internalDomain)))},
|
||||
}
|
||||
}
|
||||
|
||||
//config := streamSettings.ProtocolSettings.(*Config)
|
||||
rawConn, err := internet.ListenSystemPacket(context.Background(), &net.UDPAddr{
|
||||
IP: address.IP(),
|
||||
Port: int(port),
|
||||
}, streamSettings.SocketSettings)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
quicConfig := &quic.Config{
|
||||
KeepAlivePeriod: 0,
|
||||
HandshakeIdleTimeout: time.Second * 8,
|
||||
MaxIdleTimeout: time.Second * 300,
|
||||
MaxIncomingStreams: 32,
|
||||
MaxIncomingUniStreams: -1,
|
||||
EnableDatagrams: true,
|
||||
}
|
||||
|
||||
tr := quic.Transport{
|
||||
ConnectionIDLength: 12,
|
||||
Conn: rawConn.(*net.UDPConn),
|
||||
}
|
||||
qListener, err := tr.Listen(tlsConfig.GetTLSConfig(), quicConfig)
|
||||
if err != nil {
|
||||
rawConn.Close()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
listener := &Listener{
|
||||
done: done.New(),
|
||||
rawConn: rawConn.(*net.UDPConn),
|
||||
listener: qListener,
|
||||
addConn: handler,
|
||||
}
|
||||
|
||||
go listener.keepAccepting(ctx)
|
||||
|
||||
return listener, nil
|
||||
}
|
||||
|
||||
func init() {
|
||||
common.Must(internet.RegisterTransportListener(protocolName, Listen))
|
||||
}
|
17
transport/internet/quic/quic.go
Normal file
17
transport/internet/quic/quic.go
Normal file
@ -0,0 +1,17 @@
|
||||
package quic
|
||||
|
||||
import (
|
||||
"github.com/xtls/xray-core/common"
|
||||
"github.com/xtls/xray-core/transport/internet"
|
||||
)
|
||||
|
||||
const (
|
||||
protocolName = "quic"
|
||||
internalDomain = "quic.internal.example.com"
|
||||
)
|
||||
|
||||
func init() {
|
||||
common.Must(internet.RegisterProtocolConfigCreator(protocolName, func() interface{} {
|
||||
return new(Config)
|
||||
}))
|
||||
}
|
92
transport/internet/quic/quic_test.go
Normal file
92
transport/internet/quic/quic_test.go
Normal file
@ -0,0 +1,92 @@
|
||||
package quic_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/xtls/xray-core/common"
|
||||
"github.com/xtls/xray-core/common/buf"
|
||||
"github.com/xtls/xray-core/common/net"
|
||||
"github.com/xtls/xray-core/common/protocol/tls/cert"
|
||||
"github.com/xtls/xray-core/testing/servers/udp"
|
||||
"github.com/xtls/xray-core/transport/internet"
|
||||
"github.com/xtls/xray-core/transport/internet/quic"
|
||||
"github.com/xtls/xray-core/transport/internet/stat"
|
||||
"github.com/xtls/xray-core/transport/internet/tls"
|
||||
)
|
||||
|
||||
func TestQuicConnection(t *testing.T) {
|
||||
port := udp.PickPort()
|
||||
|
||||
listener, err := quic.Listen(context.Background(), net.LocalHostIP, port, &internet.MemoryStreamConfig{
|
||||
ProtocolName: "quic",
|
||||
ProtocolSettings: &quic.Config{},
|
||||
SecurityType: "tls",
|
||||
SecuritySettings: &tls.Config{
|
||||
Certificate: []*tls.Certificate{
|
||||
tls.ParseCertificate(
|
||||
cert.MustGenerate(nil,
|
||||
cert.DNSNames("www.example.com"),
|
||||
),
|
||||
),
|
||||
},
|
||||
},
|
||||
}, func(conn stat.Connection) {
|
||||
go func() {
|
||||
defer conn.Close()
|
||||
|
||||
b := buf.New()
|
||||
defer b.Release()
|
||||
|
||||
for {
|
||||
b.Clear()
|
||||
if _, err := b.ReadFrom(conn); err != nil {
|
||||
return
|
||||
}
|
||||
common.Must2(conn.Write(b.Bytes()))
|
||||
}
|
||||
}()
|
||||
})
|
||||
common.Must(err)
|
||||
|
||||
defer listener.Close()
|
||||
|
||||
time.Sleep(time.Second)
|
||||
|
||||
dctx := context.Background()
|
||||
conn, err := quic.Dial(dctx, net.TCPDestination(net.LocalHostIP, port), &internet.MemoryStreamConfig{
|
||||
ProtocolName: "quic",
|
||||
ProtocolSettings: &quic.Config{},
|
||||
SecurityType: "tls",
|
||||
SecuritySettings: &tls.Config{
|
||||
ServerName: "www.example.com",
|
||||
AllowInsecure: true,
|
||||
},
|
||||
})
|
||||
common.Must(err)
|
||||
defer conn.Close()
|
||||
|
||||
const N = 1024
|
||||
b1 := make([]byte, N)
|
||||
common.Must2(rand.Read(b1))
|
||||
b2 := buf.New()
|
||||
|
||||
common.Must2(conn.Write(b1))
|
||||
|
||||
b2.Clear()
|
||||
common.Must2(b2.ReadFullFrom(conn, N))
|
||||
if r := cmp.Diff(b2.Bytes(), b1); r != "" {
|
||||
t.Error(r)
|
||||
}
|
||||
|
||||
common.Must2(conn.Write(b1))
|
||||
|
||||
b2.Clear()
|
||||
common.Must2(b2.ReadFullFrom(conn, N))
|
||||
if r := cmp.Diff(b2.Bytes(), b1); r != "" {
|
||||
t.Error(r)
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user