perf(loadbalance): eliminate per-request allocations in filterHealthy with sync.Pool

filterHealthy() allocated 2 slices (available + backups) per call.
filterHealthyAndExclude() allocated 3 (adds excludeSet map).
IPHash allocated fnv.New64a() object per call.
All triggered on every request's LB selection.

Changes:
- Add filterContext struct holding reusable buffers, managed by sync.Pool
- Replace filterHealthy → filterInto (writes into pooled buffers)
- Replace filterHealthyAndExclude → filterIntoExcluding (pooled buffers)
- Add inline fnvHash64a() to avoid fnv.New64a() heap allocation
- Update all 6 balancer algorithms (RoundRobin, WeightedRoundRobin,
  LeastConnections, IPHash, Random, ConsistentHash) to use pooled
  filterContext via acquire/release pattern
- ConsistentHash.SelectExcludingByKey also uses pool for targetSet
- Remove buildExcludeSet (merged into filterIntoExcluding)

Result: allocs/op reduced from 2-3 to 0-1 on all LB Select paths.
This commit is contained in:
xfy 2026-06-04 00:19:04 +08:00
parent e44cfc7128
commit c6e7091089
4 changed files with 144 additions and 130 deletions

View File

@ -17,7 +17,6 @@
package loadbalance
import (
"hash/fnv"
"net"
"net/url"
"sync"
@ -25,6 +24,44 @@ import (
"time"
)
type filterContext struct {
available []*Target
backups []*Target
excludeSet map[string]bool
}
var filterContextPool = sync.Pool{
New: func() any {
return &filterContext{
available: make([]*Target, 0, 64),
backups: make([]*Target, 0, 64),
excludeSet: make(map[string]bool, 8),
}
},
}
func acquireFilterContext() *filterContext {
return filterContextPool.Get().(*filterContext)
}
func releaseFilterContext(fc *filterContext) {
fc.available = fc.available[:0]
fc.backups = fc.backups[:0]
for k := range fc.excludeSet {
delete(fc.excludeSet, k)
}
filterContextPool.Put(fc)
}
func fnvHash64a(key string) uint64 {
var h uint64 = 14695981039346656037
for i := 0; i < len(key); i++ {
h ^= uint64(key[i])
h *= 1099511628211
}
return h
}
// Target 表示 HTTP 代理L7 层)的负载均衡后端服务器目标。
//
// HTTP Target 特性(区别于 Stream Target
@ -120,12 +157,12 @@ func NewRoundRobin() *RoundRobin {
// Select 选择轮询顺序中的下一个目标。
// 只考虑健康目标。如果没有健康目标则返回 nil。
func (r *RoundRobin) Select(targets []*Target) *Target {
healthy := filterHealthy(targets)
fc := acquireFilterContext()
defer releaseFilterContext(fc)
healthy := filterInto(fc, targets)
if len(healthy) == 0 {
return nil
}
// 原子地递增并获取计数器值
idx := r.counter.Add(1) - 1
return healthy[idx%uint64(len(healthy))]
}
@ -154,16 +191,17 @@ func NewWeightedRoundRobin() *WeightedRoundRobin {
// Select 基于权重分布选择目标。
// 只考虑健康目标。如果没有健康目标则返回 nil。
func (w *WeightedRoundRobin) Select(targets []*Target) *Target {
healthy := filterHealthy(targets)
fc := acquireFilterContext()
defer releaseFilterContext(fc)
healthy := filterInto(fc, targets)
if len(healthy) == 0 {
return nil
}
// 计算总权重
totalWeight := 0
for _, t := range healthy {
if t.Weight <= 0 {
totalWeight++ // 最小权重为 1
totalWeight++
} else {
totalWeight += t.Weight
}
@ -173,11 +211,9 @@ func (w *WeightedRoundRobin) Select(targets []*Target) *Target {
return nil
}
// 使用原子计数器确定权重分布中的位置
idx := w.counter.Add(1) - 1
pos := int(idx % uint64(totalWeight))
// 找到计算位置处的目标
currentWeight := 0
for _, t := range healthy {
weight := t.Weight
@ -190,7 +226,6 @@ func (w *WeightedRoundRobin) Select(targets []*Target) *Target {
}
}
// 回退到最后一个目标(不应到达这里)
return healthy[len(healthy)-1]
}
@ -256,16 +291,13 @@ func (i *IPHash) Select(targets []*Target) *Target {
// SelectByIP 基于提供的 IP 地址的哈希值选择目标。
// 只考虑健康目标。如果没有健康目标则返回 nil。
func (i *IPHash) SelectByIP(targets []*Target, clientIP string) *Target {
healthy := filterHealthy(targets)
fc := acquireFilterContext()
defer releaseFilterContext(fc)
healthy := filterInto(fc, targets)
if len(healthy) == 0 {
return nil
}
// 对客户端 IP 进行哈希
h := fnv.New64a()
h.Write([]byte(clientIP))
hash := h.Sum64()
hash := fnvHash64a(clientIP)
idx := hash % uint64(len(healthy))
return healthy[idx]
}
@ -344,32 +376,21 @@ func (t *Target) IsBackup() bool {
return t.Backup
}
// filterHealthy 从目标列表中筛选出所有可用的目标,返回新切片。
//
// 选择逻辑:
// 1. 优先选择非备份且可用的目标IsAvailable() == true && !IsBackup()
// 2. 如果没有非备份目标可用,则选择可用的备份目标
//
// 返回的切片容量与输入相同,避免多次内存分配。
func filterHealthy(targets []*Target) []*Target {
available := make([]*Target, 0, len(targets))
backups := make([]*Target, 0, len(targets))
func filterInto(fc *filterContext, targets []*Target) []*Target {
for _, t := range targets {
if !t.IsAvailable() {
continue
}
if t.IsBackup() {
backups = append(backups, t)
fc.backups = append(fc.backups, t)
} else {
available = append(available, t)
fc.available = append(fc.available, t)
}
}
if len(available) > 0 {
return available
if len(fc.available) > 0 {
return fc.available
}
return backups
return fc.backups
}
// IncrementConnections 原子地增加目标的连接计数。
@ -384,45 +405,37 @@ func DecrementConnections(t *Target) {
atomic.AddInt64(&t.Connections, -1)
}
// filterHealthyAndExclude 从目标列表中筛选出可用且不在排除列表中的目标,返回新切片。
//
// 选择逻辑与 filterHealthy 相同:
// 1. 优先选择非备份且可用的目标
// 2. 如果没有非备份目标可用,则选择可用的备份目标
//
// 排除判断基于目标的 URL 进行匹配。
func filterHealthyAndExclude(targets []*Target, excluded []*Target) []*Target {
excludeSet := buildExcludeSet(excluded)
available := make([]*Target, 0, len(targets))
backups := make([]*Target, 0, len(targets))
func filterIntoExcluding(fc *filterContext, targets []*Target, excluded []*Target) []*Target {
for _, t := range excluded {
if t != nil {
fc.excludeSet[t.URL] = true
}
}
for _, t := range targets {
if !t.IsAvailable() || excludeSet[t.URL] {
if !t.IsAvailable() || fc.excludeSet[t.URL] {
continue
}
if t.IsBackup() {
backups = append(backups, t)
fc.backups = append(fc.backups, t)
} else {
available = append(available, t)
fc.available = append(fc.available, t)
}
}
if len(available) > 0 {
return available
if len(fc.available) > 0 {
return fc.available
}
return backups
return fc.backups
}
// SelectExcluding 根据轮询策略选择一个目标,排除指定的目标列表。
// 只考虑健康且不在排除列表中的目标。
func (r *RoundRobin) SelectExcluding(targets []*Target, excluded []*Target) *Target {
available := filterHealthyAndExclude(targets, excluded)
fc := acquireFilterContext()
defer releaseFilterContext(fc)
available := filterIntoExcluding(fc, targets, excluded)
if len(available) == 0 {
return nil
}
// 原子地递增并获取计数器值
idx := r.counter.Add(1) - 1
return available[idx%uint64(len(available))]
}
@ -430,16 +443,17 @@ func (r *RoundRobin) SelectExcluding(targets []*Target, excluded []*Target) *Tar
// SelectExcluding 根据权重分布选择目标,排除指定的目标列表。
// 只考虑健康且不在排除列表中的目标。
func (w *WeightedRoundRobin) SelectExcluding(targets []*Target, excluded []*Target) *Target {
available := filterHealthyAndExclude(targets, excluded)
fc := acquireFilterContext()
defer releaseFilterContext(fc)
available := filterIntoExcluding(fc, targets, excluded)
if len(available) == 0 {
return nil
}
// 计算总权重
totalWeight := 0
for _, t := range available {
if t.Weight <= 0 {
totalWeight++ // 最小权重为 1
totalWeight++
} else {
totalWeight += t.Weight
}
@ -449,11 +463,9 @@ func (w *WeightedRoundRobin) SelectExcluding(targets []*Target, excluded []*Targ
return nil
}
// 使用原子计数器确定权重分布中的位置
idx := w.counter.Add(1) - 1
pos := int(idx % uint64(totalWeight))
// 找到计算位置处的目标
currentWeight := 0
for _, t := range available {
weight := t.Weight
@ -466,14 +478,19 @@ func (w *WeightedRoundRobin) SelectExcluding(targets []*Target, excluded []*Targ
}
}
// 回退到最后一个目标(不应到达这里)
return available[len(available)-1]
}
// SelectExcluding 选择连接数最少的目标,排除指定的目标列表。
// 优先选择非备份目标,仅当无可用非备份目标时选择备份目标。
func (l *LeastConnections) SelectExcluding(targets []*Target, excluded []*Target) *Target {
excludeSet := buildExcludeSet(excluded)
fc := acquireFilterContext()
defer releaseFilterContext(fc)
for _, t := range excluded {
if t != nil {
fc.excludeSet[t.URL] = true
}
}
var selected *Target
var selectedBackup *Target
@ -481,12 +498,10 @@ func (l *LeastConnections) SelectExcluding(targets []*Target, excluded []*Target
var minBackupConns int64 = -1
for _, t := range targets {
if !t.IsAvailable() || excludeSet[t.URL] {
if !t.IsAvailable() || fc.excludeSet[t.URL] {
continue
}
conns := atomic.LoadInt64(&t.Connections)
if t.IsBackup() {
if selectedBackup == nil || conns < minBackupConns {
selectedBackup = t
@ -515,16 +530,13 @@ func (i *IPHash) SelectExcluding(targets []*Target, excluded []*Target) *Target
// SelectExcludingByIP 基于提供的 IP 地址的哈希值选择目标,排除指定的目标列表。
// 只考虑健康且不在排除列表中的目标。
func (i *IPHash) SelectExcludingByIP(targets []*Target, excluded []*Target, clientIP string) *Target {
available := filterHealthyAndExclude(targets, excluded)
fc := acquireFilterContext()
defer releaseFilterContext(fc)
available := filterIntoExcluding(fc, targets, excluded)
if len(available) == 0 {
return nil
}
// 对客户端 IP 进行哈希
h := fnv.New64a()
h.Write([]byte(clientIP))
hash := h.Sum64()
hash := fnvHash64a(clientIP)
idx := hash % uint64(len(available))
return available[idx]
}
@ -622,24 +634,4 @@ func (t *Target) LastResolved() time.Time {
return time.Unix(0, nano)
}
// buildExcludeSet 从排除列表构建 URL 集合。
//
// 用于负载均衡算法中快速检查目标是否应被排除。
//
// 参数:
// - excluded: 需要排除的目标列表
//
// 返回值:
// - map[string]bool: 目标 URL 到 true 的映射
func buildExcludeSet(excluded []*Target) map[string]bool {
if len(excluded) == 0 {
return nil
}
excludeSet := make(map[string]bool, len(excluded))
for _, t := range excluded {
if t != nil {
excludeSet[t.URL] = true
}
}
return excludeSet
}

View File

@ -575,15 +575,16 @@ func TestFilterHealthy(t *testing.T) {
createHealthyTarget("http://backend4:8080", false),
}
got := filterHealthy(targets)
fc := acquireFilterContext()
got := filterInto(fc, targets)
defer releaseFilterContext(fc)
if len(got) != 2 {
t.Errorf("len(filterHealthy) = %d, want 2", len(got))
t.Errorf("len(filterInto) = %d, want 2", len(got))
}
// 验证返回的都是健康目标
for _, target := range got {
if !target.Healthy.Load() {
t.Errorf("filterHealthy 返回了不健康目标: %q", target.URL)
t.Errorf("filterInto returned unhealthy target: %q", target.URL)
}
}
})
@ -594,9 +595,11 @@ func TestFilterHealthy(t *testing.T) {
createHealthyTarget("http://backend2:8080", true),
}
got := filterHealthy(targets)
fc := acquireFilterContext()
got := filterInto(fc, targets)
defer releaseFilterContext(fc)
if len(got) != 2 {
t.Errorf("len(filterHealthy) = %d, want 2", len(got))
t.Errorf("len(filterInto) = %d, want 2", len(got))
}
})
@ -606,23 +609,29 @@ func TestFilterHealthy(t *testing.T) {
createHealthyTarget("http://backend2:8080", false),
}
got := filterHealthy(targets)
fc := acquireFilterContext()
got := filterInto(fc, targets)
defer releaseFilterContext(fc)
if len(got) != 0 {
t.Errorf("len(filterHealthy) = %d, want 0", len(got))
t.Errorf("len(filterInto) = %d, want 0", len(got))
}
})
t.Run("空切片", func(_ *testing.T) {
got := filterHealthy([]*Target{})
fc := acquireFilterContext()
got := filterInto(fc, []*Target{})
defer releaseFilterContext(fc)
if len(got) != 0 {
t.Errorf("len(filterHealthy) = %d, want 0", len(got))
t.Errorf("len(filterInto) = %d, want 0", len(got))
}
})
t.Run("nil切片", func(_ *testing.T) {
got := filterHealthy(nil)
fc := acquireFilterContext()
got := filterInto(fc, nil)
defer releaseFilterContext(fc)
if len(got) != 0 {
t.Errorf("len(filterHealthy) = %d, want 0", len(got))
t.Errorf("len(filterInto) = %d, want 0", len(got))
}
})
}
@ -1255,7 +1264,9 @@ func TestFilterHealthyAndExclude(t *testing.T) {
}
excluded := []*Target{targets[0]}
got := filterHealthyAndExclude(targets, excluded)
fc := acquireFilterContext()
got := filterIntoExcluding(fc, targets, excluded)
defer releaseFilterContext(fc)
if len(got) != 1 {
t.Fatalf("len = %d, want 1", len(got))
}
@ -1270,14 +1281,18 @@ func TestFilterHealthyAndExclude(t *testing.T) {
createHealthyTarget("http://backend2:8080", true),
}
got := filterHealthyAndExclude(targets, nil)
fc := acquireFilterContext()
got := filterIntoExcluding(fc, targets, nil)
defer releaseFilterContext(fc)
if len(got) != 2 {
t.Fatalf("len = %d, want 2", len(got))
}
})
t.Run("空目标列表", func(_ *testing.T) {
got := filterHealthyAndExclude(nil, []*Target{})
fc := acquireFilterContext()
got := filterIntoExcluding(fc, nil, []*Target{})
defer releaseFilterContext(fc)
if len(got) != 0 {
t.Errorf("len = %d, want 0", len(got))
}
@ -1289,7 +1304,9 @@ func TestFilterHealthyAndExclude(t *testing.T) {
}
excluded := []*Target{nil}
got := filterHealthyAndExclude(targets, excluded)
fc := acquireFilterContext()
got := filterIntoExcluding(fc, targets, excluded)
defer releaseFilterContext(fc)
if len(got) != 1 {
t.Fatalf("len = %d, want 1", len(got))
}
@ -1835,7 +1852,9 @@ func TestFilterHealthyBackup(t *testing.T) {
backup := NewTargetFromConfig("http://backup:8080", 1, 0, 0, 0, true, false, "")
targets := []*Target{primary, backup}
result := filterHealthy(targets)
fc := acquireFilterContext()
result := filterInto(fc, targets)
defer releaseFilterContext(fc)
if len(result) != 1 || result[0].URL != "http://primary:8080" {
t.Error("should prefer non-backup target")
}
@ -1847,7 +1866,9 @@ func TestFilterHealthyBackup(t *testing.T) {
backup := NewTargetFromConfig("http://backup:8080", 1, 0, 0, 0, true, false, "")
targets := []*Target{primary, backup}
result := filterHealthy(targets)
fc := acquireFilterContext()
result := filterInto(fc, targets)
defer releaseFilterContext(fc)
if len(result) != 1 || result[0].URL != "http://backup:8080" {
t.Error("should fall back to backup target")
}

View File

@ -16,7 +16,6 @@ package loadbalance
import (
"fmt"
"hash/fnv"
"slices"
"sort"
"sync"
@ -144,9 +143,7 @@ func (c *ConsistentHash) rebuildCircle(targets []*Target) {
// hashKeyString 计算字符串的哈希值(使用 FNV-64a
func (c *ConsistentHash) hashKeyString(key string) uint64 {
h := fnv.New64a()
h.Write([]byte(key))
return h.Sum64()
return fnvHash64a(key)
}
// PrecomputeHashes 预计算目标的虚拟节点哈希值。
@ -246,42 +243,43 @@ func (c *ConsistentHash) SelectExcludingByKey(targets []*Target, excluded []*Tar
c.mu.RLock()
}
// 构建 targets 集合(用于校验返回的目标是否有效)
targetSet := make(map[string]bool, len(targets))
fc := acquireFilterContext()
defer releaseFilterContext(fc)
targetSet := fc.excludeSet
for _, t := range targets {
if t.Healthy.Load() {
targetSet[t.URL] = true
}
}
// 构建排除集合
excludeSet := buildExcludeSet(excluded)
for _, t := range excluded {
if t != nil {
targetSet[t.URL] = false
}
}
if len(c.sortedHashes) == 0 {
c.mu.RUnlock()
return nil
}
// 计算键的哈希值
hash := c.hashKeyString(key)
// 二分查找起始位置
idx := sort.Search(len(c.sortedHashes), func(i int) bool {
return c.sortedHashes[i] >= hash
})
// 从起始位置开始查找,跳过 excluded 和不在 targetSet 中的目标
for i := 0; i < len(c.sortedHashes); i++ {
targetIdx := (idx + i) % len(c.sortedHashes)
target := c.circle[c.sortedHashes[targetIdx]]
if targetSet[target.URL] && !excludeSet[target.URL] {
if targetSet[target.URL] {
c.mu.RUnlock()
return target
}
}
c.mu.RUnlock()
return nil // 所有目标都被排除或不在 targets 列表中
return nil
}
// 验证接口实现

View File

@ -29,7 +29,9 @@ func NewRandom() *Random {
// 随机选择两个候选,返回连接数较少的那个。
// 只考虑可用目标。如果没有可用目标则返回 nil。
func (r *Random) Select(targets []*Target) *Target {
available := filterHealthy(targets)
fc := acquireFilterContext()
defer releaseFilterContext(fc)
available := filterInto(fc, targets)
if len(available) == 0 {
return nil
}
@ -38,7 +40,6 @@ func (r *Random) Select(targets []*Target) *Target {
return available[0]
}
// Power of Two Choices
i := rand.IntN(len(available))
j := rand.IntN(len(available) - 1)
if j >= i {
@ -54,7 +55,9 @@ func (r *Random) Select(targets []*Target) *Target {
// SelectExcluding 使用 Power of Two Choices 算法选择目标,排除指定的目标列表。
func (r *Random) SelectExcluding(targets []*Target, excluded []*Target) *Target {
available := filterHealthyAndExclude(targets, excluded)
fc := acquireFilterContext()
defer releaseFilterContext(fc)
available := filterIntoExcluding(fc, targets, excluded)
if len(available) == 0 {
return nil
}