mirror of
https://github.com/XTLS/Xray-core.git
synced 2025-05-09 21:58:41 +00:00
Use capsulate protocol for large UDP packet
- make datagram transport without mux functionality - it is now recommended to always pair with mux-cool (XUDP new tunnel non-zero session id)
This commit is contained in:
parent
358bdc258e
commit
129b2be9c1
4 changed files with 140 additions and 89 deletions
|
@ -6,22 +6,83 @@ import (
|
|||
|
||||
"github.com/quic-go/quic-go"
|
||||
"github.com/xtls/xray-core/common/buf"
|
||||
"github.com/xtls/xray-core/common/errors"
|
||||
"github.com/xtls/xray-core/common/net"
|
||||
"github.com/xtls/xray-core/common/signal/done"
|
||||
)
|
||||
|
||||
var MaxIncomingStreams = 2
|
||||
var currentStream = 0
|
||||
|
||||
type interConn struct {
|
||||
ctx context.Context
|
||||
quicConn quic.Connection
|
||||
local net.Addr
|
||||
remote net.Addr
|
||||
ctx context.Context
|
||||
quicConn quic.Connection // small udp packet can be sent with Datagram directly
|
||||
streams []quic.Stream // other packets can be sent via steam, it offer mux, reliability, fragmentation and ordering
|
||||
readChannel chan readResult
|
||||
done *done.Instance
|
||||
local net.Addr
|
||||
remote net.Addr
|
||||
}
|
||||
|
||||
func (c *interConn) Read(b []byte) (int, error) {
|
||||
received, e := c.quicConn.ReceiveDatagram(c.ctx)
|
||||
if e != nil {
|
||||
return 0, e
|
||||
type readResult struct {
|
||||
buffer []byte
|
||||
err error
|
||||
}
|
||||
|
||||
func NewConnInitReader(ctx context.Context, quicConn quic.Connection, done *done.Instance, remote net.Addr) *interConn {
|
||||
c := &interConn{
|
||||
ctx: ctx,
|
||||
quicConn: quicConn,
|
||||
readChannel: make(chan readResult),
|
||||
done: done,
|
||||
local: quicConn.LocalAddr(),
|
||||
remote: remote,
|
||||
}
|
||||
nBytes := copy(b, received[:])
|
||||
go func() {
|
||||
for {
|
||||
received, e := c.quicConn.ReceiveDatagram(c.ctx)
|
||||
c.readChannel <- readResult{buffer: received, err: e}
|
||||
}
|
||||
}()
|
||||
go c.acceptStreams()
|
||||
return c
|
||||
}
|
||||
|
||||
func (c *interConn) acceptStreams() {
|
||||
for {
|
||||
stream, err := c.quicConn.AcceptStream(context.Background())
|
||||
if err != nil {
|
||||
errors.LogInfoInner(context.Background(), err, "failed to accept stream")
|
||||
select {
|
||||
case <-c.quicConn.Context().Done():
|
||||
return
|
||||
case <-c.done.Wait():
|
||||
if err := c.quicConn.CloseWithError(0, ""); err != nil {
|
||||
errors.LogInfoInner(context.Background(), err, "failed to close connection")
|
||||
}
|
||||
return
|
||||
default:
|
||||
time.Sleep(time.Second)
|
||||
continue
|
||||
}
|
||||
}
|
||||
go func() {
|
||||
for {
|
||||
received := make([]byte, buf.Size)
|
||||
i, e := stream.Read(received)
|
||||
c.readChannel <- readResult{buffer: received[:i], err: e}
|
||||
}
|
||||
}()
|
||||
c.streams = append(c.streams, stream)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *interConn) Read(b []byte) (int, error) {
|
||||
received := <- c.readChannel
|
||||
if received.err != nil {
|
||||
return 0, received.err
|
||||
}
|
||||
nBytes := copy(b, received.buffer[:])
|
||||
return nBytes, nil
|
||||
}
|
||||
|
||||
|
@ -33,11 +94,37 @@ func (c *interConn) WriteMultiBuffer(mb buf.MultiBuffer) error {
|
|||
}
|
||||
|
||||
func (c *interConn) Write(b []byte) (int, error) {
|
||||
return len(b), c.quicConn.SendDatagram(b)
|
||||
var err = c.quicConn.SendDatagram(b)
|
||||
if _, ok := err.(*quic.DatagramTooLargeError); ok {
|
||||
if len(c.streams) < MaxIncomingStreams {
|
||||
stream, err := c.quicConn.OpenStream()
|
||||
if err == nil {
|
||||
c.streams = append(c.streams, stream)
|
||||
} else {
|
||||
errors.LogInfoInner(c.ctx, err, "failed to openStream: ")
|
||||
}
|
||||
}
|
||||
currentStream++;
|
||||
if currentStream > len(c.streams) - 1 {
|
||||
currentStream = 0;
|
||||
}
|
||||
return c.streams[currentStream].Write(b)
|
||||
}
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return len(b), nil
|
||||
}
|
||||
|
||||
func (c *interConn) Close() error {
|
||||
return nil
|
||||
var err error
|
||||
for _, s := range c.streams {
|
||||
e := s.Close()
|
||||
if e != nil {
|
||||
err = e
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *interConn) LocalAddr() net.Addr {
|
||||
|
@ -49,13 +136,34 @@ func (c *interConn) RemoteAddr() net.Addr {
|
|||
}
|
||||
|
||||
func (c *interConn) SetDeadline(t time.Time) error {
|
||||
return nil
|
||||
var err error
|
||||
for _, s := range c.streams {
|
||||
e := s.SetDeadline(t)
|
||||
if e != nil {
|
||||
err = e
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *interConn) SetReadDeadline(t time.Time) error {
|
||||
return nil
|
||||
var err error
|
||||
for _, s := range c.streams {
|
||||
e := s.SetReadDeadline(t)
|
||||
if e != nil {
|
||||
err = e
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *interConn) SetWriteDeadline(t time.Time) error {
|
||||
return nil
|
||||
var err error
|
||||
for _, s := range c.streams {
|
||||
e := s.SetWriteDeadline(t)
|
||||
if e != nil {
|
||||
err = e
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue