xfy c6bb75cffe feat(loadbalance): add atomic EWMA statistics core
- Zero-lock atomic EWMA implementation using fixed-point arithmetic
- Supports header_time and last_byte_time tracking
- Concurrent-safe with CAS retry loop
2026-06-08 17:13:08 +08:00

61 lines
1.3 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package loadbalance
import (
"sync/atomic"
"time"
)
// EWMAStats keeps exponentially weighted moving averages (EWMA) of response
// timings. Every field is an atomic value, so the struct carries no mutex.
type EWMAStats struct {
	// headerTime is the EWMA of the time to first byte, in nanoseconds.
	headerTime atomic.Int64
	// lastByteTime is the EWMA of the full response time, in nanoseconds.
	lastByteTime atomic.Int64
	// sampleCount is the number of samples recorded.
	sampleCount atomic.Int64
}
// defaultAlphaScale is the EWMA smoothing factor alpha expressed in
// thousandths: recordAtomic weights each new sample by defaultAlphaScale/1000
// and the previous average by (1000-defaultAlphaScale)/1000.
const defaultAlphaScale = 300 // alpha = 0.3
// NewEWMAStats returns a pointer to a freshly allocated, zero-valued
// EWMAStats (both averages and the sample count start at 0).
func NewEWMAStats() *EWMAStats {
	return new(EWMAStats)
}
// Record folds one observation into the statistics: headerTime updates the
// first-byte average, lastByteTime updates the full-response average, and the
// sample counter is incremented afterwards.
//
// Each of the three updates is its own atomic operation; they are not applied
// as a single atomic step.
func (e *EWMAStats) Record(headerTime, lastByteTime time.Duration) {
	// Update the two averages first, in the same order as the arguments.
	e.recordAtomic(&e.headerTime, headerTime)
	e.recordAtomic(&e.lastByteTime, lastByteTime)

	// Count the sample only after both averages have been updated.
	e.sampleCount.Add(1)
}
// recordAtomic folds newValue into the EWMA stored in ptr without taking a
// lock.
//
// While the stored value is 0 the sample is stored as-is; afterwards samples
// are blended as
//
//	updated = (alpha*sample + (1000-alpha)*old) / 1000
//
// with alpha = defaultAlphaScale. The update is a load / compute /
// compare-and-swap loop: if ptr changes between the load and the swap, the
// swap fails and the blend is recomputed from the fresh value.
//
// The blend is evaluated on the quotients and remainders (mod 1000) of both
// operands separately. The direct formula multiplies full nanosecond values by
// up to 1000-alpha and silently wraps around int64 for very large durations;
// the split form cannot overflow, and for non-negative values it yields
// exactly the same (truncated) result as the direct formula whenever that
// formula does not overflow.
//
// Note that 0 doubles as the "no sample yet" marker, so an average that has
// reached 0 is simply replaced by the next sample.
func (e *EWMAStats) recordAtomic(ptr *atomic.Int64, newValue time.Duration) {
	const (
		scale     = 1000                      // denominator of the per-mille weights
		oldWeight = scale - defaultAlphaScale // weight of the previous average
	)

	sample := newValue.Nanoseconds()

	// The sample's contribution does not depend on the stored value, so it is
	// computed once outside the retry loop.
	sampleWhole := defaultAlphaScale * (sample / scale)
	sampleFrac := defaultAlphaScale * (sample % scale)

	for {
		old := ptr.Load()
		if old == 0 {
			// Nothing stored yet: seed the average with the raw sample.
			if ptr.CompareAndSwap(0, sample) {
				return
			}
			continue
		}

		// whole carries the multiples of scale, frac the blended remainders;
		// each factor is at most |value|/scale or below scale, so no
		// intermediate product can exceed the int64 range.
		whole := sampleWhole + oldWeight*(old/scale)
		frac := (sampleFrac + oldWeight*(old%scale)) / scale

		if ptr.CompareAndSwap(old, whole+frac) {
			return
		}
	}
}
// HeaderTime returns the current first-byte-time average. The stored
// nanosecond count is loaded atomically and converted to a time.Duration.
func (e *EWMAStats) HeaderTime() time.Duration {
	nanos := e.headerTime.Load()
	return time.Duration(nanos)
}
// LastByteTime returns the current full-response-time average. The stored
// nanosecond count is loaded atomically and converted to a time.Duration.
func (e *EWMAStats) LastByteTime() time.Duration {
	nanos := e.lastByteTime.Load()
	return time.Duration(nanos)
}
// SampleCount returns the current value of the sample counter, read
// atomically.
func (e *EWMAStats) SampleCount() int64 {
	count := e.sampleCount.Load()
	return count
}
// Reset sets both averages and the sample counter back to 0.
//
// The three stores are separate atomic operations rather than one atomic
// step, and the averages are cleared before the counter.
func (e *EWMAStats) Reset() {
	// Clear the averages first.
	e.headerTime.Store(0)
	e.lastByteTime.Store(0)

	// Then clear the counter.
	e.sampleCount.Store(0)
}