diff --git a/internal/stream/stream.go b/internal/stream/stream.go index 32a1262..a9174d0 100644 --- a/internal/stream/stream.go +++ b/internal/stream/stream.go @@ -63,7 +63,8 @@ type Balancer interface { // 使用原子计数器实现线程安全的轮询选择,每次选择后计数器递增, // 确保请求均匀分布到所有健康目标。 type roundRobin struct { - counter uint64 + counter uint64 + healthyPool sync.Pool } // newRoundRobin 创建轮询均衡器。 @@ -73,7 +74,14 @@ type roundRobin struct { // 返回值: // - Balancer: 轮询负载均衡器实例 func newRoundRobin() Balancer { - return &roundRobin{} + rr := &roundRobin{} + rr.healthyPool = sync.Pool{ + New: func() any { + // 预分配合理容量(典型场景 64 个后端) + return make([]*Target, 0, 64) + }, + } + return rr } // Select 选择下一个目标。 @@ -87,18 +95,22 @@ func newRoundRobin() Balancer { // 返回值: // - *Target: 选中的目标服务器,无可用目标时返回 nil func (r *roundRobin) Select(targets []*Target) *Target { - // 过滤健康目标 - healthy := make([]*Target, 0) + // 从池中获取 healthy slice 并复用 + healthy := r.healthyPool.Get().([]*Target) + healthy = healthy[:0] // 清空但保留容量 for _, t := range targets { if t.healthy.Load() { healthy = append(healthy, t) } } if len(healthy) == 0 { + r.healthyPool.Put(healthy) return nil } idx := atomic.AddUint64(&r.counter, 1) - 1 - return healthy[idx%uint64(len(healthy))] + result := healthy[idx%uint64(len(healthy))] + r.healthyPool.Put(healthy) + return result } // leastConn 最少连接负载均衡器。 @@ -148,7 +160,8 @@ func (l *leastConn) Select(targets []*Target) *Target { // 根据目标服务器的权重分配请求,权重高的目标获得更多请求。 // 使用原子计数器确保线程安全,支持不同权重的目标混合使用。 type weightedRoundRobin struct { - counter uint64 + counter uint64 + healthyPool sync.Pool } // newWeightedRoundRobin 创建加权轮询均衡器。 @@ -158,7 +171,13 @@ type weightedRoundRobin struct { // 返回值: // - Balancer: 加权轮询负载均衡器实例 func newWeightedRoundRobin() Balancer { - return &weightedRoundRobin{} + w := &weightedRoundRobin{} + w.healthyPool = sync.Pool{ + New: func() any { + return make([]*Target, 0, 64) + }, + } + return w } // Select 基于权重分布选择目标。 @@ -173,13 +192,15 @@ func newWeightedRoundRobin() Balancer { // 返回值: // - *Target: 选中的目标服务器,无可用目标时返回 nil func (w *weightedRoundRobin) Select(targets []*Target) *Target { - healthy := make([]*Target, 0) + healthy := w.healthyPool.Get().([]*Target) + healthy = healthy[:0] for _, t := range targets { if t.healthy.Load() { healthy = append(healthy, t) } } if len(healthy) == 0 { + w.healthyPool.Put(healthy) return nil } @@ -206,18 +227,23 @@ func (w *weightedRoundRobin) Select(targets []*Target) *Target { } currentWeight += weight if pos < currentWeight { + w.healthyPool.Put(healthy) return t } } - return healthy[len(healthy)-1] + result := healthy[len(healthy)-1] + w.healthyPool.Put(healthy) + return result } // ipHash IP 哈希负载均衡器。 // // 基于客户端 IP 地址计算哈希值,将同一 IP 的请求分配到固定目标。 // 适合需要会话保持的场景,确保相同客户端总是被路由到同一服务器。 -type ipHash struct{} +type ipHash struct { + healthyPool sync.Pool +} // newIPHash 创建 IP 哈希均衡器。 // @@ -226,7 +252,13 @@ type ipHash struct{} // 返回值: // - Balancer: IP 哈希负载均衡器实例 func newIPHash() Balancer { - return &ipHash{} + ih := &ipHash{} + ih.healthyPool = sync.Pool{ + New: func() any { + return make([]*Target, 0, 64) + }, + } + return ih } // Select 默认选择(IP Hash 需要具体 IP)。 @@ -256,13 +288,15 @@ func (i *ipHash) Select(targets []*Target) *Target { // 返回值: // - *Target: 选中的目标服务器,无可用目标时返回 nil func (i *ipHash) SelectByIP(targets []*Target, clientIP string) *Target { - healthy := make([]*Target, 0) + healthy := i.healthyPool.Get().([]*Target) + healthy = healthy[:0] for _, t := range targets { if t.healthy.Load() { healthy = append(healthy, t) } } if len(healthy) == 0 { + i.healthyPool.Put(healthy) return nil } @@ -272,7 +306,9 @@ func (i *ipHash) SelectByIP(targets []*Target, clientIP string) *Target { hash := h.Sum64() idx := hash % uint64(len(healthy)) - return healthy[idx] + result := healthy[idx] + i.healthyPool.Put(healthy) + return result } // Server TCP/UDP Stream 代理服务器。