mirror of
https://github.com/XTLS/Xray-core.git
synced 2024-12-21 21:19:49 +00:00
079d0bd8a9
* Refactor log * Add new log methods * Fix logger test * Change all logging code * Clean up pathObj * Rebase to latest main * Remove invoking method name after the dot
242 lines
4.1 KiB
Go
242 lines
4.1 KiB
Go
package mux
|
|
|
|
import (
|
|
"context"
|
|
"io"
|
|
"runtime"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/xtls/xray-core/common"
|
|
"github.com/xtls/xray-core/common/buf"
|
|
"github.com/xtls/xray-core/common/errors"
|
|
"github.com/xtls/xray-core/common/net"
|
|
"github.com/xtls/xray-core/common/protocol"
|
|
"github.com/xtls/xray-core/transport/pipe"
|
|
)
|
|
|
|
type SessionManager struct {
|
|
sync.RWMutex
|
|
sessions map[uint16]*Session
|
|
count uint16
|
|
closed bool
|
|
}
|
|
|
|
func NewSessionManager() *SessionManager {
|
|
return &SessionManager{
|
|
count: 0,
|
|
sessions: make(map[uint16]*Session, 16),
|
|
}
|
|
}
|
|
|
|
func (m *SessionManager) Closed() bool {
|
|
m.RLock()
|
|
defer m.RUnlock()
|
|
|
|
return m.closed
|
|
}
|
|
|
|
func (m *SessionManager) Size() int {
|
|
m.RLock()
|
|
defer m.RUnlock()
|
|
|
|
return len(m.sessions)
|
|
}
|
|
|
|
func (m *SessionManager) Count() int {
|
|
m.RLock()
|
|
defer m.RUnlock()
|
|
|
|
return int(m.count)
|
|
}
|
|
|
|
func (m *SessionManager) Allocate() *Session {
|
|
m.Lock()
|
|
defer m.Unlock()
|
|
|
|
if m.closed {
|
|
return nil
|
|
}
|
|
|
|
m.count++
|
|
s := &Session{
|
|
ID: m.count,
|
|
parent: m,
|
|
}
|
|
m.sessions[s.ID] = s
|
|
return s
|
|
}
|
|
|
|
func (m *SessionManager) Add(s *Session) bool {
|
|
m.Lock()
|
|
defer m.Unlock()
|
|
|
|
if m.closed {
|
|
return false
|
|
}
|
|
|
|
m.count++
|
|
m.sessions[s.ID] = s
|
|
return true
|
|
}
|
|
|
|
func (m *SessionManager) Remove(locked bool, id uint16) {
|
|
if !locked {
|
|
m.Lock()
|
|
defer m.Unlock()
|
|
}
|
|
locked = true
|
|
|
|
if m.closed {
|
|
return
|
|
}
|
|
|
|
delete(m.sessions, id)
|
|
|
|
/*
|
|
if len(m.sessions) == 0 {
|
|
m.sessions = make(map[uint16]*Session, 16)
|
|
}
|
|
*/
|
|
}
|
|
|
|
func (m *SessionManager) Get(id uint16) (*Session, bool) {
|
|
m.RLock()
|
|
defer m.RUnlock()
|
|
|
|
if m.closed {
|
|
return nil, false
|
|
}
|
|
|
|
s, found := m.sessions[id]
|
|
return s, found
|
|
}
|
|
|
|
func (m *SessionManager) CloseIfNoSession() bool {
|
|
m.Lock()
|
|
defer m.Unlock()
|
|
|
|
if m.closed {
|
|
return true
|
|
}
|
|
|
|
if len(m.sessions) != 0 {
|
|
return false
|
|
}
|
|
|
|
m.closed = true
|
|
return true
|
|
}
|
|
|
|
func (m *SessionManager) Close() error {
|
|
m.Lock()
|
|
defer m.Unlock()
|
|
|
|
if m.closed {
|
|
return nil
|
|
}
|
|
|
|
m.closed = true
|
|
|
|
for _, s := range m.sessions {
|
|
s.Close(true)
|
|
}
|
|
|
|
m.sessions = nil
|
|
return nil
|
|
}
|
|
|
|
// Session represents a client connection in a Mux connection.
|
|
type Session struct {
|
|
input buf.Reader
|
|
output buf.Writer
|
|
parent *SessionManager
|
|
ID uint16
|
|
transferType protocol.TransferType
|
|
closed bool
|
|
XUDP *XUDP
|
|
}
|
|
|
|
// Close closes all resources associated with this session.
|
|
func (s *Session) Close(locked bool) error {
|
|
if !locked {
|
|
s.parent.Lock()
|
|
defer s.parent.Unlock()
|
|
}
|
|
locked = true
|
|
if s.closed {
|
|
return nil
|
|
}
|
|
s.closed = true
|
|
if s.XUDP == nil {
|
|
common.Interrupt(s.input)
|
|
common.Close(s.output)
|
|
} else {
|
|
// Stop existing handle(), then trigger writer.Close().
|
|
// Note that s.output may be dispatcher.SizeStatWriter.
|
|
s.input.(*pipe.Reader).ReturnAnError(io.EOF)
|
|
runtime.Gosched()
|
|
// If the error set by ReturnAnError still exists, clear it.
|
|
s.input.(*pipe.Reader).Recover()
|
|
XUDPManager.Lock()
|
|
if s.XUDP.Status == Active {
|
|
s.XUDP.Expire = time.Now().Add(time.Minute)
|
|
s.XUDP.Status = Expiring
|
|
errors.LogDebug(context.Background(), "XUDP put ", s.XUDP.GlobalID)
|
|
}
|
|
XUDPManager.Unlock()
|
|
}
|
|
s.parent.Remove(locked, s.ID)
|
|
return nil
|
|
}
|
|
|
|
// NewReader creates a buf.Reader based on the transfer type of this Session.
|
|
func (s *Session) NewReader(reader *buf.BufferedReader, dest *net.Destination) buf.Reader {
|
|
if s.transferType == protocol.TransferTypeStream {
|
|
return NewStreamReader(reader)
|
|
}
|
|
return NewPacketReader(reader, dest)
|
|
}
|
|
|
|
const (
|
|
Initializing = 0
|
|
Active = 1
|
|
Expiring = 2
|
|
)
|
|
|
|
type XUDP struct {
|
|
GlobalID [8]byte
|
|
Status uint64
|
|
Expire time.Time
|
|
Mux *Session
|
|
}
|
|
|
|
func (x *XUDP) Interrupt() {
|
|
common.Interrupt(x.Mux.input)
|
|
common.Close(x.Mux.output)
|
|
}
|
|
|
|
var XUDPManager struct {
|
|
sync.Mutex
|
|
Map map[[8]byte]*XUDP
|
|
}
|
|
|
|
func init() {
|
|
XUDPManager.Map = make(map[[8]byte]*XUDP)
|
|
go func() {
|
|
for {
|
|
time.Sleep(time.Minute)
|
|
now := time.Now()
|
|
XUDPManager.Lock()
|
|
for id, x := range XUDPManager.Map {
|
|
if x.Status == Expiring && now.After(x.Expire) {
|
|
x.Interrupt()
|
|
delete(XUDPManager.Map, id)
|
|
errors.LogDebug(context.Background(), "XUDP del ", id)
|
|
}
|
|
}
|
|
XUDPManager.Unlock()
|
|
}
|
|
}()
|
|
}
|