diff --git a/proxy/proxy.go b/proxy/proxy.go index 0b1a5824..bdfb6cd2 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -155,7 +155,7 @@ func NewTrafficState(userUUID []byte, flow string) *TrafficState { return &state } -// VisionReader is used to read xtls vision protocol +// VisionReader is used to read seed protocol // Note Vision probably only make sense as the inner most layer of reader, since it need assess traffic state from origin proxy traffic type VisionReader struct { buf.Reader @@ -208,7 +208,7 @@ func (w *VisionReader) ReadMultiBuffer() (buf.MultiBuffer, error) { return buffer, err } -// VisionWriter is used to write xtls vision protocol +// VisionWriter is used to write seed protocol // Note Vision probably only make sense as the inner most layer of writer, since it need assess traffic state from origin proxy traffic type VisionWriter struct { buf.Writer @@ -216,6 +216,7 @@ type VisionWriter struct { trafficState *TrafficState ctx context.Context writeOnceUserUUID []byte + scheduler *Scheduler } func NewVisionWriter(writer buf.Writer, addon *Addons, state *TrafficState, context context.Context) *VisionWriter { @@ -227,6 +228,7 @@ func NewVisionWriter(writer buf.Writer, addon *Addons, state *TrafficState, cont trafficState: state, ctx: context, writeOnceUserUUID: w, + scheduler: NewScheduler(writer, addon, state, context), } } @@ -279,7 +281,14 @@ func (w *VisionWriter) WriteMultiBuffer(mb buf.MultiBuffer) error { if w.trafficState.StartTime.IsZero() { w.trafficState.StartTime = time.Now() } - return w.Writer.WriteMultiBuffer(mb) + w.scheduler.Buffer <- mb + if w.addons.Scheduler == nil { + w.scheduler.Trigger <- -1 // send all buffers + } + if len(w.scheduler.Error) > 0 { + return <-w.scheduler.Error + } + return nil } // ReshapeMultiBuffer prepare multi buffer for padding structure (max 21 bytes) diff --git a/proxy/scheduler.go b/proxy/scheduler.go new file mode 100644 index 00000000..204a04db --- /dev/null +++ b/proxy/scheduler.go @@ -0,0 +1,74 @@ +package proxy + +import ( + "context" + "crypto/rand" + "math/big" + "sync" + "time" + + "github.com/xtls/xray-core/common/buf" + "github.com/xtls/xray-core/common/session" +) + +type Scheduler struct { + Buffer chan buf.MultiBuffer + Trigger chan int + Error chan error + bufferReadLock *sync.Mutex + writer buf.Writer + addons *Addons + trafficState *TrafficState + ctx context.Context +} + +func NewScheduler(w buf.Writer, addon *Addons, state *TrafficState, context context.Context) *Scheduler { + var s = Scheduler{ + Buffer: make(chan buf.MultiBuffer, 100), + Trigger: make(chan int), + Error: make(chan error, 100), + bufferReadLock: new(sync.Mutex), + writer: w, + addons: addon, + trafficState: state, + ctx: context, + } + go s.mainLoop() + if s.addons.Scheduler != nil { + go s.exampleIndependentScheduler() + } + return &s +} + +func(s *Scheduler) mainLoop() { + for trigger := range s.Trigger { + go func() { // each trigger has independent delay, trigger does not block + var d = 0 * time.Millisecond + if s.addons.Delay != nil { + l, err := rand.Int(rand.Reader, big.NewInt(int64(s.addons.Delay.MaxMillis - s.addons.Delay.MinMillis))) + if err != nil { + newError("failed to generate delay", trigger).Base(err).WriteToLog(session.ExportIDToError(s.ctx)) + } + d = time.Duration(uint32(l.Int64()) + s.addons.Delay.MinMillis) + time.Sleep(d * time.Millisecond) + } + + s.bufferReadLock.Lock() // guard against multiple trigger threads + var sending = len(s.Buffer) + if sending > 0 { + newError("Scheduler Trigger for ", sending, " buffer(s) with ", d, " ", trigger).AtDebug().WriteToLog(session.ExportIDToError(s.ctx)) + } + for i := 0; i