From c6e7091089cf11e32ec14452cd6e6482b38a5160 Mon Sep 17 00:00:00 2001 From: xfy Date: Thu, 4 Jun 2026 00:19:04 +0800 Subject: [PATCH] perf(loadbalance): eliminate per-request allocations in filterHealthy with sync.Pool MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit filterHealthy() allocated 2 slices (available + backups) per call. filterHealthyAndExclude() allocated 3 (adds excludeSet map). IPHash allocated fnv.New64a() object per call. All triggered on every request's LB selection. Changes: - Add filterContext struct holding reusable buffers, managed by sync.Pool - Replace filterHealthy → filterInto (writes into pooled buffers) - Replace filterHealthyAndExclude → filterIntoExcluding (pooled buffers) - Add inline fnvHash64a() to avoid fnv.New64a() heap allocation - Update all 6 balancer algorithms (RoundRobin, WeightedRoundRobin, LeastConnections, IPHash, Random, ConsistentHash) to use pooled filterContext via acquire/release pattern - ConsistentHash.SelectExcludingByKey also uses pool for targetSet - Remove buildExcludeSet (merged into filterIntoExcluding) Result: allocs/op reduced from 2-3 to 0-1 on all LB Select paths. --- internal/loadbalance/balancer.go | 182 +++++++++++------------- internal/loadbalance/balancer_test.go | 57 +++++--- internal/loadbalance/consistent_hash.go | 26 ++-- internal/loadbalance/random.go | 9 +- 4 files changed, 144 insertions(+), 130 deletions(-) diff --git a/internal/loadbalance/balancer.go b/internal/loadbalance/balancer.go index 88e2dfd..43a9bd9 100644 --- a/internal/loadbalance/balancer.go +++ b/internal/loadbalance/balancer.go @@ -17,7 +17,6 @@ package loadbalance import ( - "hash/fnv" "net" "net/url" "sync" @@ -25,6 +24,44 @@ import ( "time" ) +type filterContext struct { + available []*Target + backups []*Target + excludeSet map[string]bool +} + +var filterContextPool = sync.Pool{ + New: func() any { + return &filterContext{ + available: make([]*Target, 0, 64), + backups: make([]*Target, 0, 64), + excludeSet: make(map[string]bool, 8), + } + }, +} + +func acquireFilterContext() *filterContext { + return filterContextPool.Get().(*filterContext) +} + +func releaseFilterContext(fc *filterContext) { + fc.available = fc.available[:0] + fc.backups = fc.backups[:0] + for k := range fc.excludeSet { + delete(fc.excludeSet, k) + } + filterContextPool.Put(fc) +} + +func fnvHash64a(key string) uint64 { + var h uint64 = 14695981039346656037 + for i := 0; i < len(key); i++ { + h ^= uint64(key[i]) + h *= 1099511628211 + } + return h +} + // Target 表示 HTTP 代理(L7 层)的负载均衡后端服务器目标。 // // HTTP Target 特性(区别于 Stream Target): @@ -120,12 +157,12 @@ func NewRoundRobin() *RoundRobin { // Select 选择轮询顺序中的下一个目标。 // 只考虑健康目标。如果没有健康目标则返回 nil。 func (r *RoundRobin) Select(targets []*Target) *Target { - healthy := filterHealthy(targets) + fc := acquireFilterContext() + defer releaseFilterContext(fc) + healthy := filterInto(fc, targets) if len(healthy) == 0 { return nil } - - // 原子地递增并获取计数器值 idx := r.counter.Add(1) - 1 return healthy[idx%uint64(len(healthy))] } @@ -154,16 +191,17 @@ func NewWeightedRoundRobin() *WeightedRoundRobin { // Select 基于权重分布选择目标。 // 只考虑健康目标。如果没有健康目标则返回 nil。 func (w *WeightedRoundRobin) Select(targets []*Target) *Target { - healthy := filterHealthy(targets) + fc := acquireFilterContext() + defer releaseFilterContext(fc) + healthy := filterInto(fc, targets) if len(healthy) == 0 { return nil } - // 计算总权重 totalWeight := 0 for _, t := range healthy { if t.Weight <= 0 { - totalWeight++ // 最小权重为 1 + totalWeight++ } else { totalWeight += t.Weight } @@ -173,11 +211,9 @@ func (w *WeightedRoundRobin) Select(targets []*Target) *Target { return nil } - // 使用原子计数器确定权重分布中的位置 idx := w.counter.Add(1) - 1 pos := int(idx % uint64(totalWeight)) - // 找到计算位置处的目标 currentWeight := 0 for _, t := range healthy { weight := t.Weight @@ -190,7 +226,6 @@ func (w *WeightedRoundRobin) Select(targets []*Target) *Target { } } - // 回退到最后一个目标(不应到达这里) return healthy[len(healthy)-1] } @@ -256,16 +291,13 @@ func (i *IPHash) Select(targets []*Target) *Target { // SelectByIP 基于提供的 IP 地址的哈希值选择目标。 // 只考虑健康目标。如果没有健康目标则返回 nil。 func (i *IPHash) SelectByIP(targets []*Target, clientIP string) *Target { - healthy := filterHealthy(targets) + fc := acquireFilterContext() + defer releaseFilterContext(fc) + healthy := filterInto(fc, targets) if len(healthy) == 0 { return nil } - - // 对客户端 IP 进行哈希 - h := fnv.New64a() - h.Write([]byte(clientIP)) - hash := h.Sum64() - + hash := fnvHash64a(clientIP) idx := hash % uint64(len(healthy)) return healthy[idx] } @@ -344,32 +376,21 @@ func (t *Target) IsBackup() bool { return t.Backup } -// filterHealthy 从目标列表中筛选出所有可用的目标,返回新切片。 -// -// 选择逻辑: -// 1. 优先选择非备份且可用的目标(IsAvailable() == true && !IsBackup()) -// 2. 如果没有非备份目标可用,则选择可用的备份目标 -// -// 返回的切片容量与输入相同,避免多次内存分配。 -func filterHealthy(targets []*Target) []*Target { - available := make([]*Target, 0, len(targets)) - backups := make([]*Target, 0, len(targets)) - +func filterInto(fc *filterContext, targets []*Target) []*Target { for _, t := range targets { if !t.IsAvailable() { continue } if t.IsBackup() { - backups = append(backups, t) + fc.backups = append(fc.backups, t) } else { - available = append(available, t) + fc.available = append(fc.available, t) } } - - if len(available) > 0 { - return available + if len(fc.available) > 0 { + return fc.available } - return backups + return fc.backups } // IncrementConnections 原子地增加目标的连接计数。 @@ -384,45 +405,37 @@ func DecrementConnections(t *Target) { atomic.AddInt64(&t.Connections, -1) } -// filterHealthyAndExclude 从目标列表中筛选出可用且不在排除列表中的目标,返回新切片。 -// -// 选择逻辑与 filterHealthy 相同: -// 1. 优先选择非备份且可用的目标 -// 2. 如果没有非备份目标可用,则选择可用的备份目标 -// -// 排除判断基于目标的 URL 进行匹配。 -func filterHealthyAndExclude(targets []*Target, excluded []*Target) []*Target { - excludeSet := buildExcludeSet(excluded) - - available := make([]*Target, 0, len(targets)) - backups := make([]*Target, 0, len(targets)) - +func filterIntoExcluding(fc *filterContext, targets []*Target, excluded []*Target) []*Target { + for _, t := range excluded { + if t != nil { + fc.excludeSet[t.URL] = true + } + } for _, t := range targets { - if !t.IsAvailable() || excludeSet[t.URL] { + if !t.IsAvailable() || fc.excludeSet[t.URL] { continue } if t.IsBackup() { - backups = append(backups, t) + fc.backups = append(fc.backups, t) } else { - available = append(available, t) + fc.available = append(fc.available, t) } } - - if len(available) > 0 { - return available + if len(fc.available) > 0 { + return fc.available } - return backups + return fc.backups } // SelectExcluding 根据轮询策略选择一个目标,排除指定的目标列表。 // 只考虑健康且不在排除列表中的目标。 func (r *RoundRobin) SelectExcluding(targets []*Target, excluded []*Target) *Target { - available := filterHealthyAndExclude(targets, excluded) + fc := acquireFilterContext() + defer releaseFilterContext(fc) + available := filterIntoExcluding(fc, targets, excluded) if len(available) == 0 { return nil } - - // 原子地递增并获取计数器值 idx := r.counter.Add(1) - 1 return available[idx%uint64(len(available))] } @@ -430,16 +443,17 @@ func (r *RoundRobin) SelectExcluding(targets []*Target, excluded []*Target) *Tar // SelectExcluding 根据权重分布选择目标,排除指定的目标列表。 // 只考虑健康且不在排除列表中的目标。 func (w *WeightedRoundRobin) SelectExcluding(targets []*Target, excluded []*Target) *Target { - available := filterHealthyAndExclude(targets, excluded) + fc := acquireFilterContext() + defer releaseFilterContext(fc) + available := filterIntoExcluding(fc, targets, excluded) if len(available) == 0 { return nil } - // 计算总权重 totalWeight := 0 for _, t := range available { if t.Weight <= 0 { - totalWeight++ // 最小权重为 1 + totalWeight++ } else { totalWeight += t.Weight } @@ -449,11 +463,9 @@ func (w *WeightedRoundRobin) SelectExcluding(targets []*Target, excluded []*Targ return nil } - // 使用原子计数器确定权重分布中的位置 idx := w.counter.Add(1) - 1 pos := int(idx % uint64(totalWeight)) - // 找到计算位置处的目标 currentWeight := 0 for _, t := range available { weight := t.Weight @@ -466,14 +478,19 @@ func (w *WeightedRoundRobin) SelectExcluding(targets []*Target, excluded []*Targ } } - // 回退到最后一个目标(不应到达这里) return available[len(available)-1] } // SelectExcluding 选择连接数最少的目标,排除指定的目标列表。 // 优先选择非备份目标,仅当无可用非备份目标时选择备份目标。 func (l *LeastConnections) SelectExcluding(targets []*Target, excluded []*Target) *Target { - excludeSet := buildExcludeSet(excluded) + fc := acquireFilterContext() + defer releaseFilterContext(fc) + for _, t := range excluded { + if t != nil { + fc.excludeSet[t.URL] = true + } + } var selected *Target var selectedBackup *Target @@ -481,12 +498,10 @@ func (l *LeastConnections) SelectExcluding(targets []*Target, excluded []*Target var minBackupConns int64 = -1 for _, t := range targets { - if !t.IsAvailable() || excludeSet[t.URL] { + if !t.IsAvailable() || fc.excludeSet[t.URL] { continue } - conns := atomic.LoadInt64(&t.Connections) - if t.IsBackup() { if selectedBackup == nil || conns < minBackupConns { selectedBackup = t @@ -515,16 +530,13 @@ func (i *IPHash) SelectExcluding(targets []*Target, excluded []*Target) *Target // SelectExcludingByIP 基于提供的 IP 地址的哈希值选择目标,排除指定的目标列表。 // 只考虑健康且不在排除列表中的目标。 func (i *IPHash) SelectExcludingByIP(targets []*Target, excluded []*Target, clientIP string) *Target { - available := filterHealthyAndExclude(targets, excluded) + fc := acquireFilterContext() + defer releaseFilterContext(fc) + available := filterIntoExcluding(fc, targets, excluded) if len(available) == 0 { return nil } - - // 对客户端 IP 进行哈希 - h := fnv.New64a() - h.Write([]byte(clientIP)) - hash := h.Sum64() - + hash := fnvHash64a(clientIP) idx := hash % uint64(len(available)) return available[idx] } @@ -622,24 +634,4 @@ func (t *Target) LastResolved() time.Time { return time.Unix(0, nano) } -// buildExcludeSet 从排除列表构建 URL 集合。 -// -// 用于负载均衡算法中快速检查目标是否应被排除。 -// -// 参数: -// - excluded: 需要排除的目标列表 -// -// 返回值: -// - map[string]bool: 目标 URL 到 true 的映射 -func buildExcludeSet(excluded []*Target) map[string]bool { - if len(excluded) == 0 { - return nil - } - excludeSet := make(map[string]bool, len(excluded)) - for _, t := range excluded { - if t != nil { - excludeSet[t.URL] = true - } - } - return excludeSet -} + diff --git a/internal/loadbalance/balancer_test.go b/internal/loadbalance/balancer_test.go index 3fa122b..1c18c2c 100644 --- a/internal/loadbalance/balancer_test.go +++ b/internal/loadbalance/balancer_test.go @@ -575,15 +575,16 @@ func TestFilterHealthy(t *testing.T) { createHealthyTarget("http://backend4:8080", false), } - got := filterHealthy(targets) + fc := acquireFilterContext() + got := filterInto(fc, targets) + defer releaseFilterContext(fc) if len(got) != 2 { - t.Errorf("len(filterHealthy) = %d, want 2", len(got)) + t.Errorf("len(filterInto) = %d, want 2", len(got)) } - // 验证返回的都是健康目标 for _, target := range got { if !target.Healthy.Load() { - t.Errorf("filterHealthy 返回了不健康目标: %q", target.URL) + t.Errorf("filterInto returned unhealthy target: %q", target.URL) } } }) @@ -594,9 +595,11 @@ func TestFilterHealthy(t *testing.T) { createHealthyTarget("http://backend2:8080", true), } - got := filterHealthy(targets) + fc := acquireFilterContext() + got := filterInto(fc, targets) + defer releaseFilterContext(fc) if len(got) != 2 { - t.Errorf("len(filterHealthy) = %d, want 2", len(got)) + t.Errorf("len(filterInto) = %d, want 2", len(got)) } }) @@ -606,23 +609,29 @@ func TestFilterHealthy(t *testing.T) { createHealthyTarget("http://backend2:8080", false), } - got := filterHealthy(targets) + fc := acquireFilterContext() + got := filterInto(fc, targets) + defer releaseFilterContext(fc) if len(got) != 0 { - t.Errorf("len(filterHealthy) = %d, want 0", len(got)) + t.Errorf("len(filterInto) = %d, want 0", len(got)) } }) t.Run("空切片", func(_ *testing.T) { - got := filterHealthy([]*Target{}) + fc := acquireFilterContext() + got := filterInto(fc, []*Target{}) + defer releaseFilterContext(fc) if len(got) != 0 { - t.Errorf("len(filterHealthy) = %d, want 0", len(got)) + t.Errorf("len(filterInto) = %d, want 0", len(got)) } }) t.Run("nil切片", func(_ *testing.T) { - got := filterHealthy(nil) + fc := acquireFilterContext() + got := filterInto(fc, nil) + defer releaseFilterContext(fc) if len(got) != 0 { - t.Errorf("len(filterHealthy) = %d, want 0", len(got)) + t.Errorf("len(filterInto) = %d, want 0", len(got)) } }) } @@ -1255,7 +1264,9 @@ func TestFilterHealthyAndExclude(t *testing.T) { } excluded := []*Target{targets[0]} - got := filterHealthyAndExclude(targets, excluded) + fc := acquireFilterContext() + got := filterIntoExcluding(fc, targets, excluded) + defer releaseFilterContext(fc) if len(got) != 1 { t.Fatalf("len = %d, want 1", len(got)) } @@ -1270,14 +1281,18 @@ func TestFilterHealthyAndExclude(t *testing.T) { createHealthyTarget("http://backend2:8080", true), } - got := filterHealthyAndExclude(targets, nil) + fc := acquireFilterContext() + got := filterIntoExcluding(fc, targets, nil) + defer releaseFilterContext(fc) if len(got) != 2 { t.Fatalf("len = %d, want 2", len(got)) } }) t.Run("空目标列表", func(_ *testing.T) { - got := filterHealthyAndExclude(nil, []*Target{}) + fc := acquireFilterContext() + got := filterIntoExcluding(fc, nil, []*Target{}) + defer releaseFilterContext(fc) if len(got) != 0 { t.Errorf("len = %d, want 0", len(got)) } @@ -1289,7 +1304,9 @@ func TestFilterHealthyAndExclude(t *testing.T) { } excluded := []*Target{nil} - got := filterHealthyAndExclude(targets, excluded) + fc := acquireFilterContext() + got := filterIntoExcluding(fc, targets, excluded) + defer releaseFilterContext(fc) if len(got) != 1 { t.Fatalf("len = %d, want 1", len(got)) } @@ -1835,7 +1852,9 @@ func TestFilterHealthyBackup(t *testing.T) { backup := NewTargetFromConfig("http://backup:8080", 1, 0, 0, 0, true, false, "") targets := []*Target{primary, backup} - result := filterHealthy(targets) + fc := acquireFilterContext() + result := filterInto(fc, targets) + defer releaseFilterContext(fc) if len(result) != 1 || result[0].URL != "http://primary:8080" { t.Error("should prefer non-backup target") } @@ -1847,7 +1866,9 @@ func TestFilterHealthyBackup(t *testing.T) { backup := NewTargetFromConfig("http://backup:8080", 1, 0, 0, 0, true, false, "") targets := []*Target{primary, backup} - result := filterHealthy(targets) + fc := acquireFilterContext() + result := filterInto(fc, targets) + defer releaseFilterContext(fc) if len(result) != 1 || result[0].URL != "http://backup:8080" { t.Error("should fall back to backup target") } diff --git a/internal/loadbalance/consistent_hash.go b/internal/loadbalance/consistent_hash.go index 60caea5..3057ddd 100644 --- a/internal/loadbalance/consistent_hash.go +++ b/internal/loadbalance/consistent_hash.go @@ -16,7 +16,6 @@ package loadbalance import ( "fmt" - "hash/fnv" "slices" "sort" "sync" @@ -144,9 +143,7 @@ func (c *ConsistentHash) rebuildCircle(targets []*Target) { // hashKeyString 计算字符串的哈希值(使用 FNV-64a)。 func (c *ConsistentHash) hashKeyString(key string) uint64 { - h := fnv.New64a() - h.Write([]byte(key)) - return h.Sum64() + return fnvHash64a(key) } // PrecomputeHashes 预计算目标的虚拟节点哈希值。 @@ -246,42 +243,43 @@ func (c *ConsistentHash) SelectExcludingByKey(targets []*Target, excluded []*Tar c.mu.RLock() } - // 构建 targets 集合(用于校验返回的目标是否有效) - targetSet := make(map[string]bool, len(targets)) + fc := acquireFilterContext() + defer releaseFilterContext(fc) + + targetSet := fc.excludeSet for _, t := range targets { if t.Healthy.Load() { targetSet[t.URL] = true } } - - // 构建排除集合 - excludeSet := buildExcludeSet(excluded) + for _, t := range excluded { + if t != nil { + targetSet[t.URL] = false + } + } if len(c.sortedHashes) == 0 { c.mu.RUnlock() return nil } - // 计算键的哈希值 hash := c.hashKeyString(key) - // 二分查找起始位置 idx := sort.Search(len(c.sortedHashes), func(i int) bool { return c.sortedHashes[i] >= hash }) - // 从起始位置开始查找,跳过 excluded 和不在 targetSet 中的目标 for i := 0; i < len(c.sortedHashes); i++ { targetIdx := (idx + i) % len(c.sortedHashes) target := c.circle[c.sortedHashes[targetIdx]] - if targetSet[target.URL] && !excludeSet[target.URL] { + if targetSet[target.URL] { c.mu.RUnlock() return target } } c.mu.RUnlock() - return nil // 所有目标都被排除或不在 targets 列表中 + return nil } // 验证接口实现 diff --git a/internal/loadbalance/random.go b/internal/loadbalance/random.go index 04bead9..fca12c5 100644 --- a/internal/loadbalance/random.go +++ b/internal/loadbalance/random.go @@ -29,7 +29,9 @@ func NewRandom() *Random { // 随机选择两个候选,返回连接数较少的那个。 // 只考虑可用目标。如果没有可用目标则返回 nil。 func (r *Random) Select(targets []*Target) *Target { - available := filterHealthy(targets) + fc := acquireFilterContext() + defer releaseFilterContext(fc) + available := filterInto(fc, targets) if len(available) == 0 { return nil } @@ -38,7 +40,6 @@ func (r *Random) Select(targets []*Target) *Target { return available[0] } - // Power of Two Choices i := rand.IntN(len(available)) j := rand.IntN(len(available) - 1) if j >= i { @@ -54,7 +55,9 @@ func (r *Random) Select(targets []*Target) *Target { // SelectExcluding 使用 Power of Two Choices 算法选择目标,排除指定的目标列表。 func (r *Random) SelectExcluding(targets []*Target, excluded []*Target) *Target { - available := filterHealthyAndExclude(targets, excluded) + fc := acquireFilterContext() + defer releaseFilterContext(fc) + available := filterIntoExcluding(fc, targets, excluded) if len(available) == 0 { return nil }