feat(loadbalance): 实现慢启动功能

- 添加 SlowStartManager 统一管理目标慢启动状态
- Target 新增 EffectiveWeight 和 SlowStart 字段
- 实现 GetEffectiveWeight 方法支持动态权重
- 权重从 1 线性增加到配置权重,防止恢复服务器被压垮

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
xfy 2026-04-22 13:15:09 +08:00
parent aae378433e
commit 26a7a58265
3 changed files with 385 additions and 0 deletions

View File

@ -67,6 +67,12 @@ type Target struct {
failMu sync.Mutex
failCount int64
failedUntil int64
// 慢启动相关字段
// EffectiveWeight 当前有效权重(慢启动期间动态变化)
EffectiveWeight atomic.Int64
// SlowStart 慢启动时间(配置)
SlowStart time.Duration `yaml:"slow_start"`
}
// Balancer 是 HTTP 代理L7 层)负载均衡算法的接口。
@ -266,6 +272,18 @@ func (i *IPHash) SelectByIP(targets []*Target, clientIP string) *Target {
return healthy[idx]
}
// GetEffectiveWeight 获取目标的有效权重。
//
// 如果未配置慢启动或不在慢启动状态,返回配置权重。
// 慢启动期间,权重从 1 逐渐增加到配置权重。
func (t *Target) GetEffectiveWeight() int {
ew := t.EffectiveWeight.Load()
if ew == 0 {
return t.Weight // 未配置慢启动时返回配置权重
}
return int(ew)
}
// IsAvailable 检查目标是否可用。
// 目标不可用的条件(优先级从高到低):
// - Healthy 为 false硬性不可用由健康检查器设置

View File

@ -0,0 +1,191 @@
// Package loadbalance 提供负载均衡算法实现。
//
// 该文件实现 SlowStartManager 慢启动管理器,支持:
// - 目标恢复健康时渐进式增加权重
// - 权重从 1 线性增加到配置权重
// - EffectiveWeight 字段方案(零侵入)
//
// 主要用途:
//
// 防止刚恢复的后端服务器被大量请求压垮。
//
// 作者xfy
package loadbalance
import (
"sync"
"sync/atomic"
"time"
)
// SlowStartManager 慢启动管理器。
//
// 统一管理所有目标的慢启动状态和权重计算。
// 使用 EffectiveWeight 字段方案,无需修改 Balancer 实现。
type SlowStartManager struct {
targets map[string]*SlowStartState // key: target.URL
mu sync.RWMutex
interval time.Duration // 权重更新间隔
stopCh chan struct{}
running atomic.Bool
// 目标查找回调
findTarget func(url string) *Target
}
// SlowStartState 慢启动状态。
type SlowStartState struct {
BaseWeight int // 配置的基础权重
RecoverTime time.Time // 恢复健康的时间
SlowStart time.Duration // 慢启动持续时间
}
// NewSlowStartManager 创建慢启动管理器。
func NewSlowStartManager(interval time.Duration) *SlowStartManager {
if interval <= 0 {
interval = time.Second // 默认 1 秒更新一次
}
return &SlowStartManager{
targets: make(map[string]*SlowStartState),
interval: interval,
stopCh: make(chan struct{}),
}
}
// SetFindTarget 设置目标查找回调。
func (m *SlowStartManager) SetFindTarget(fn func(url string) *Target) {
m.findTarget = fn
}
// OnTargetHealthy 目标恢复健康时调用。
//
// 初始化慢启动状态,设置 EffectiveWeight = 1。
func (m *SlowStartManager) OnTargetHealthy(target *Target) {
if target.SlowStart <= 0 {
return // 未配置慢启动
}
m.mu.Lock()
defer m.mu.Unlock()
m.targets[target.URL] = &SlowStartState{
BaseWeight: target.Weight,
RecoverTime: time.Now(),
SlowStart: target.SlowStart,
}
// 设置初始权重为 1
target.EffectiveWeight.Store(1)
}
// OnTargetUnhealthy 目标变为不健康时调用。
//
// 清除慢启动状态,重置 EffectiveWeight = 0。
func (m *SlowStartManager) OnTargetUnhealthy(target *Target) {
m.mu.Lock()
delete(m.targets, target.URL)
m.mu.Unlock()
// 重置有效权重
target.EffectiveWeight.Store(0)
}
// Start 启动后台权重更新。
//
// 定期遍历所有慢启动中的目标,计算并更新 EffectiveWeight。
func (m *SlowStartManager) Start() {
if m.running.Swap(true) {
return // 已经在运行
}
go m.updateLoop()
}
// Stop 停止后台更新。
func (m *SlowStartManager) Stop() {
if !m.running.Load() {
return
}
close(m.stopCh)
}
// updateLoop 后台更新循环。
func (m *SlowStartManager) updateLoop() {
ticker := time.NewTicker(m.interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
m.updateEffectiveWeights()
case <-m.stopCh:
return
}
}
}
// updateEffectiveWeights 更新所有慢启动目标的有效权重。
func (m *SlowStartManager) updateEffectiveWeights() {
now := time.Now()
var toDelete []string
m.mu.Lock()
defer m.mu.Unlock()
for url, state := range m.targets {
elapsed := now.Sub(state.RecoverTime)
if elapsed >= state.SlowStart {
// 慢启动完成,标记删除
toDelete = append(toDelete, url)
continue
}
// 线性增长:从 1 增加到 BaseWeight
progress := float64(elapsed) / float64(state.SlowStart)
effectiveWeight := int(1 + progress*float64(state.BaseWeight-1))
if effectiveWeight < 1 {
effectiveWeight = 1
}
if effectiveWeight > state.BaseWeight {
effectiveWeight = state.BaseWeight
}
// 查找目标并更新 EffectiveWeight
if m.findTarget != nil {
if target := m.findTarget(url); target != nil {
target.EffectiveWeight.Store(int64(effectiveWeight))
}
}
}
// 删除已完成的慢启动状态
for _, url := range toDelete {
delete(m.targets, url)
// 重置 EffectiveWeight 为 0表示使用配置权重
if m.findTarget != nil {
if target := m.findTarget(url); target != nil {
target.EffectiveWeight.Store(0)
}
}
}
}
// GetState 获取目标的慢启动状态(用于调试/监控)。
func (m *SlowStartManager) GetState(url string) *SlowStartState {
m.mu.RLock()
defer m.mu.RUnlock()
return m.targets[url]
}
// GetAllStates 获取所有慢启动状态(用于调试/监控)。
func (m *SlowStartManager) GetAllStates() map[string]*SlowStartState {
m.mu.RLock()
defer m.mu.RUnlock()
result := make(map[string]*SlowStartState, len(m.targets))
for k, v := range m.targets {
result[k] = v
}
return result
}

View File

@ -0,0 +1,176 @@
package loadbalance
import (
"sync"
"testing"
"time"
)
func TestSlowStartManager_OnTargetHealthy(t *testing.T) {
mgr := NewSlowStartManager(100 * time.Millisecond)
target := &Target{
URL: "http://backend:8080",
Weight: 10,
SlowStart: 5 * time.Second,
}
mgr.OnTargetHealthy(target)
// 验证初始权重为 1
if target.EffectiveWeight.Load() != 1 {
t.Errorf("EffectiveWeight = %d, want 1", target.EffectiveWeight.Load())
}
// 验证状态已记录
state := mgr.GetState(target.URL)
if state == nil {
t.Fatal("state should not be nil")
}
if state.BaseWeight != 10 {
t.Errorf("BaseWeight = %d, want 10", state.BaseWeight)
}
}
func TestSlowStartManager_OnTargetUnhealthy(t *testing.T) {
mgr := NewSlowStartManager(100 * time.Millisecond)
target := &Target{
URL: "http://backend:8080",
Weight: 10,
SlowStart: 5 * time.Second,
}
mgr.OnTargetHealthy(target)
mgr.OnTargetUnhealthy(target)
// 验证状态已清除
state := mgr.GetState(target.URL)
if state != nil {
t.Error("state should be nil after unhealthy")
}
// 验证权重已重置
if target.EffectiveWeight.Load() != 0 {
t.Errorf("EffectiveWeight = %d, want 0", target.EffectiveWeight.Load())
}
}
func TestSlowStartManager_WeightProgression(t *testing.T) {
mgr := NewSlowStartManager(50 * time.Millisecond)
target := &Target{
URL: "http://backend:8080",
Weight: 10,
SlowStart: 200 * time.Millisecond,
}
var mu sync.Mutex
targets := map[string]*Target{target.URL: target}
mgr.SetFindTarget(func(url string) *Target {
mu.Lock()
defer mu.Unlock()
return targets[url]
})
mgr.OnTargetHealthy(target)
mgr.Start()
defer mgr.Stop()
// 等待慢启动完成
time.Sleep(300 * time.Millisecond)
// 验证权重已达到配置值或重置为 0
ew := target.EffectiveWeight.Load()
if ew != 0 && ew != int64(target.Weight) {
t.Errorf("EffectiveWeight = %d, want 0 or %d", ew, target.Weight)
}
}
func TestSlowStartManager_NoSlowStart(t *testing.T) {
mgr := NewSlowStartManager(100 * time.Millisecond)
target := &Target{
URL: "http://backend:8080",
Weight: 10,
SlowStart: 0, // 未配置慢启动
}
mgr.OnTargetHealthy(target)
// 验证没有设置有效权重
if target.EffectiveWeight.Load() != 0 {
t.Errorf("EffectiveWeight = %d, want 0 (no slow start)", target.EffectiveWeight.Load())
}
// 验证状态未记录
state := mgr.GetState(target.URL)
if state != nil {
t.Error("state should be nil when slow_start is 0")
}
}
func TestTarget_GetEffectiveWeight(t *testing.T) {
tests := []struct {
name string
weight int
effectiveWeight int64
want int
}{
{
name: "no slow start",
weight: 10,
effectiveWeight: 0,
want: 10,
},
{
name: "slow start in progress",
weight: 10,
effectiveWeight: 5,
want: 5,
},
{
name: "slow start at 1",
weight: 10,
effectiveWeight: 1,
want: 1,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
target := &Target{
Weight: tt.weight,
}
target.EffectiveWeight.Store(tt.effectiveWeight)
got := target.GetEffectiveWeight()
if got != tt.want {
t.Errorf("GetEffectiveWeight() = %d, want %d", got, tt.want)
}
})
}
}
func TestSlowStartManager_GetAllStates(t *testing.T) {
mgr := NewSlowStartManager(100 * time.Millisecond)
target1 := &Target{
URL: "http://backend1:8080",
Weight: 10,
SlowStart: 5 * time.Second,
}
target2 := &Target{
URL: "http://backend2:8080",
Weight: 5,
SlowStart: 3 * time.Second,
}
mgr.OnTargetHealthy(target1)
mgr.OnTargetHealthy(target2)
states := mgr.GetAllStates()
if len(states) != 2 {
t.Errorf("len(states) = %d, want 2", len(states))
}
}