perf(stream): Balancer healthy slice 池化减少分配
- roundRobin/weightedRoundRobin/ipHash 添加 healthyPool sync.Pool - Select 方法使用 pool.Get/Put 复用 []*Target slice - steady-state allocs/op 从 slice 分配降至 1(仅 pool 接口开销) Benchmark: roundRobin 1 allocs/op, weightedRoundRobin 1 allocs/op Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
parent
58ed02ceac
commit
f20eafbb28
@ -64,6 +64,7 @@ type Balancer interface {
|
|||||||
// 确保请求均匀分布到所有健康目标。
|
// 确保请求均匀分布到所有健康目标。
|
||||||
type roundRobin struct {
|
type roundRobin struct {
|
||||||
counter uint64
|
counter uint64
|
||||||
|
healthyPool sync.Pool
|
||||||
}
|
}
|
||||||
|
|
||||||
// newRoundRobin 创建轮询均衡器。
|
// newRoundRobin 创建轮询均衡器。
|
||||||
@ -73,7 +74,14 @@ type roundRobin struct {
|
|||||||
// 返回值:
|
// 返回值:
|
||||||
// - Balancer: 轮询负载均衡器实例
|
// - Balancer: 轮询负载均衡器实例
|
||||||
func newRoundRobin() Balancer {
|
func newRoundRobin() Balancer {
|
||||||
return &roundRobin{}
|
rr := &roundRobin{}
|
||||||
|
rr.healthyPool = sync.Pool{
|
||||||
|
New: func() any {
|
||||||
|
// 预分配合理容量(典型场景 64 个后端)
|
||||||
|
return make([]*Target, 0, 64)
|
||||||
|
},
|
||||||
|
}
|
||||||
|
return rr
|
||||||
}
|
}
|
||||||
|
|
||||||
// Select 选择下一个目标。
|
// Select 选择下一个目标。
|
||||||
@ -87,18 +95,22 @@ func newRoundRobin() Balancer {
|
|||||||
// 返回值:
|
// 返回值:
|
||||||
// - *Target: 选中的目标服务器,无可用目标时返回 nil
|
// - *Target: 选中的目标服务器,无可用目标时返回 nil
|
||||||
func (r *roundRobin) Select(targets []*Target) *Target {
|
func (r *roundRobin) Select(targets []*Target) *Target {
|
||||||
// 过滤健康目标
|
// 从池中获取 healthy slice 并复用
|
||||||
healthy := make([]*Target, 0)
|
healthy := r.healthyPool.Get().([]*Target)
|
||||||
|
healthy = healthy[:0] // 清空但保留容量
|
||||||
for _, t := range targets {
|
for _, t := range targets {
|
||||||
if t.healthy.Load() {
|
if t.healthy.Load() {
|
||||||
healthy = append(healthy, t)
|
healthy = append(healthy, t)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if len(healthy) == 0 {
|
if len(healthy) == 0 {
|
||||||
|
r.healthyPool.Put(healthy)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
idx := atomic.AddUint64(&r.counter, 1) - 1
|
idx := atomic.AddUint64(&r.counter, 1) - 1
|
||||||
return healthy[idx%uint64(len(healthy))]
|
result := healthy[idx%uint64(len(healthy))]
|
||||||
|
r.healthyPool.Put(healthy)
|
||||||
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
// leastConn 最少连接负载均衡器。
|
// leastConn 最少连接负载均衡器。
|
||||||
@ -149,6 +161,7 @@ func (l *leastConn) Select(targets []*Target) *Target {
|
|||||||
// 使用原子计数器确保线程安全,支持不同权重的目标混合使用。
|
// 使用原子计数器确保线程安全,支持不同权重的目标混合使用。
|
||||||
type weightedRoundRobin struct {
|
type weightedRoundRobin struct {
|
||||||
counter uint64
|
counter uint64
|
||||||
|
healthyPool sync.Pool
|
||||||
}
|
}
|
||||||
|
|
||||||
// newWeightedRoundRobin 创建加权轮询均衡器。
|
// newWeightedRoundRobin 创建加权轮询均衡器。
|
||||||
@ -158,7 +171,13 @@ type weightedRoundRobin struct {
|
|||||||
// 返回值:
|
// 返回值:
|
||||||
// - Balancer: 加权轮询负载均衡器实例
|
// - Balancer: 加权轮询负载均衡器实例
|
||||||
func newWeightedRoundRobin() Balancer {
|
func newWeightedRoundRobin() Balancer {
|
||||||
return &weightedRoundRobin{}
|
w := &weightedRoundRobin{}
|
||||||
|
w.healthyPool = sync.Pool{
|
||||||
|
New: func() any {
|
||||||
|
return make([]*Target, 0, 64)
|
||||||
|
},
|
||||||
|
}
|
||||||
|
return w
|
||||||
}
|
}
|
||||||
|
|
||||||
// Select 基于权重分布选择目标。
|
// Select 基于权重分布选择目标。
|
||||||
@ -173,13 +192,15 @@ func newWeightedRoundRobin() Balancer {
|
|||||||
// 返回值:
|
// 返回值:
|
||||||
// - *Target: 选中的目标服务器,无可用目标时返回 nil
|
// - *Target: 选中的目标服务器,无可用目标时返回 nil
|
||||||
func (w *weightedRoundRobin) Select(targets []*Target) *Target {
|
func (w *weightedRoundRobin) Select(targets []*Target) *Target {
|
||||||
healthy := make([]*Target, 0)
|
healthy := w.healthyPool.Get().([]*Target)
|
||||||
|
healthy = healthy[:0]
|
||||||
for _, t := range targets {
|
for _, t := range targets {
|
||||||
if t.healthy.Load() {
|
if t.healthy.Load() {
|
||||||
healthy = append(healthy, t)
|
healthy = append(healthy, t)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if len(healthy) == 0 {
|
if len(healthy) == 0 {
|
||||||
|
w.healthyPool.Put(healthy)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -206,18 +227,23 @@ func (w *weightedRoundRobin) Select(targets []*Target) *Target {
|
|||||||
}
|
}
|
||||||
currentWeight += weight
|
currentWeight += weight
|
||||||
if pos < currentWeight {
|
if pos < currentWeight {
|
||||||
|
w.healthyPool.Put(healthy)
|
||||||
return t
|
return t
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return healthy[len(healthy)-1]
|
result := healthy[len(healthy)-1]
|
||||||
|
w.healthyPool.Put(healthy)
|
||||||
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
// ipHash IP 哈希负载均衡器。
|
// ipHash IP 哈希负载均衡器。
|
||||||
//
|
//
|
||||||
// 基于客户端 IP 地址计算哈希值,将同一 IP 的请求分配到固定目标。
|
// 基于客户端 IP 地址计算哈希值,将同一 IP 的请求分配到固定目标。
|
||||||
// 适合需要会话保持的场景,确保相同客户端总是被路由到同一服务器。
|
// 适合需要会话保持的场景,确保相同客户端总是被路由到同一服务器。
|
||||||
type ipHash struct{}
|
type ipHash struct {
|
||||||
|
healthyPool sync.Pool
|
||||||
|
}
|
||||||
|
|
||||||
// newIPHash 创建 IP 哈希均衡器。
|
// newIPHash 创建 IP 哈希均衡器。
|
||||||
//
|
//
|
||||||
@ -226,7 +252,13 @@ type ipHash struct{}
|
|||||||
// 返回值:
|
// 返回值:
|
||||||
// - Balancer: IP 哈希负载均衡器实例
|
// - Balancer: IP 哈希负载均衡器实例
|
||||||
func newIPHash() Balancer {
|
func newIPHash() Balancer {
|
||||||
return &ipHash{}
|
ih := &ipHash{}
|
||||||
|
ih.healthyPool = sync.Pool{
|
||||||
|
New: func() any {
|
||||||
|
return make([]*Target, 0, 64)
|
||||||
|
},
|
||||||
|
}
|
||||||
|
return ih
|
||||||
}
|
}
|
||||||
|
|
||||||
// Select 默认选择(IP Hash 需要具体 IP)。
|
// Select 默认选择(IP Hash 需要具体 IP)。
|
||||||
@ -256,13 +288,15 @@ func (i *ipHash) Select(targets []*Target) *Target {
|
|||||||
// 返回值:
|
// 返回值:
|
||||||
// - *Target: 选中的目标服务器,无可用目标时返回 nil
|
// - *Target: 选中的目标服务器,无可用目标时返回 nil
|
||||||
func (i *ipHash) SelectByIP(targets []*Target, clientIP string) *Target {
|
func (i *ipHash) SelectByIP(targets []*Target, clientIP string) *Target {
|
||||||
healthy := make([]*Target, 0)
|
healthy := i.healthyPool.Get().([]*Target)
|
||||||
|
healthy = healthy[:0]
|
||||||
for _, t := range targets {
|
for _, t := range targets {
|
||||||
if t.healthy.Load() {
|
if t.healthy.Load() {
|
||||||
healthy = append(healthy, t)
|
healthy = append(healthy, t)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if len(healthy) == 0 {
|
if len(healthy) == 0 {
|
||||||
|
i.healthyPool.Put(healthy)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -272,7 +306,9 @@ func (i *ipHash) SelectByIP(targets []*Target, clientIP string) *Target {
|
|||||||
hash := h.Sum64()
|
hash := h.Sum64()
|
||||||
|
|
||||||
idx := hash % uint64(len(healthy))
|
idx := hash % uint64(len(healthy))
|
||||||
return healthy[idx]
|
result := healthy[idx]
|
||||||
|
i.healthyPool.Put(healthy)
|
||||||
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
// Server TCP/UDP Stream 代理服务器。
|
// Server TCP/UDP Stream 代理服务器。
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user