diff --git a/internal/loadbalance/balancer.go b/internal/loadbalance/balancer.go
index f0e63e7..5eb6463 100644
--- a/internal/loadbalance/balancer.go
+++ b/internal/loadbalance/balancer.go
@@ -67,6 +67,12 @@ type Target struct {
 	failMu      sync.Mutex
 	failCount   int64
 	failedUntil int64
+
+	// Slow-start fields.
+	// EffectiveWeight is the weight currently in effect (it changes during slow start).
+	EffectiveWeight atomic.Int64
+	// SlowStart is the configured slow-start duration.
+	SlowStart time.Duration `yaml:"slow_start"`
 }
 
 // Balancer 是 HTTP 代理(L7 层)负载均衡算法的接口。
@@ -266,6 +272,18 @@ func (i *IPHash) SelectByIP(targets []*Target, clientIP string) *Target {
 	return healthy[idx]
 }
 
+// GetEffectiveWeight returns the effective weight of the target.
+//
+// It returns the configured weight when slow start is not configured or not in progress.
+// During slow start the weight grows gradually from 1 to the configured weight.
+func (t *Target) GetEffectiveWeight() int {
+	ew := t.EffectiveWeight.Load()
+	if ew == 0 {
+		return t.Weight // not in slow start: use the configured weight
+	}
+	return int(ew)
+}
+
 // IsAvailable 检查目标是否可用。
 // 目标不可用的条件(优先级从高到低):
 // - Healthy 为 false(硬性不可用,由健康检查器设置)
diff --git a/internal/loadbalance/slow_start.go b/internal/loadbalance/slow_start.go
new file mode 100644
index 0000000..7799f05
--- /dev/null
+++ b/internal/loadbalance/slow_start.go
@@ -0,0 +1,221 @@
+// Package loadbalance provides load-balancing algorithm implementations.
+//
+// This file implements SlowStartManager, the slow-start manager. It:
+//   - raises a target's weight gradually after the target becomes healthy
+//   - grows the weight linearly from 1 to the configured weight
+//   - publishes the result through the Target.EffectiveWeight field
+//
+// Main use:
+//
+//	Keep a freshly recovered backend from being flooded with requests.
+//
+// Author: xfy
+package loadbalance
+
+import (
+	"sync"
+	"sync/atomic"
+	"time"
+)
+
+// SlowStartManager tracks every target that is in slow start and periodically
+// recomputes its effective weight.
+//
+// Weights are published through Target.EffectiveWeight.
+type SlowStartManager struct {
+	targets  map[string]*SlowStartState // keyed by Target.URL
+	mu       sync.RWMutex               // guards targets, stopCh and findTarget
+	interval time.Duration              // delay between weight updates
+	stopCh   chan struct{}              // closed by Stop, replaced by each Start
+	running  atomic.Bool
+
+	// findTarget optionally resolves a URL to the live Target.
+	findTarget func(url string) *Target
+}
+
+// SlowStartState describes one target that is in slow start.
+type SlowStartState struct {
+	BaseWeight  int           // configured weight the ramp ends at
+	RecoverTime time.Time     // when the target became healthy
+	SlowStart   time.Duration // length of the ramp
+
+	// target is the Target given to OnTargetHealthy. It is used when no lookup
+	// callback is installed, so the ramp still advances and finishes.
+	target *Target
+}
+
+// NewSlowStartManager returns a manager that recomputes weights once per
+// interval. A non-positive interval falls back to one second.
+func NewSlowStartManager(interval time.Duration) *SlowStartManager {
+	if interval <= 0 {
+		interval = time.Second
+	}
+
+	return &SlowStartManager{
+		targets:  make(map[string]*SlowStartState),
+		interval: interval,
+	}
+}
+
+// SetFindTarget installs the callback that resolves a target URL to the live
+// Target. The callback runs while the manager's lock is held, so it must not
+// call back into the manager.
+func (m *SlowStartManager) SetFindTarget(fn func(url string) *Target) {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	m.findTarget = fn
+}
+
+// OnTargetHealthy is called when target becomes healthy again.
+//
+// It records the slow-start state and sets EffectiveWeight to 1. A target
+// without a positive SlowStart is ignored.
+func (m *SlowStartManager) OnTargetHealthy(target *Target) {
+	if target.SlowStart <= 0 {
+		return // slow start is not configured
+	}
+
+	m.mu.Lock()
+	defer m.mu.Unlock()
+
+	m.targets[target.URL] = &SlowStartState{
+		BaseWeight:  target.Weight,
+		RecoverTime: time.Now(),
+		SlowStart:   target.SlowStart,
+		target:      target,
+	}
+	target.EffectiveWeight.Store(1)
+}
+
+// OnTargetUnhealthy is called when target becomes unhealthy.
+//
+// It drops the slow-start state and resets EffectiveWeight to 0.
+func (m *SlowStartManager) OnTargetUnhealthy(target *Target) {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+
+	delete(m.targets, target.URL)
+	// Reset under the lock so this store cannot overwrite the weight written
+	// by a concurrent OnTargetHealthy.
+	target.EffectiveWeight.Store(0)
+}
+
+// Start launches the background loop that refreshes EffectiveWeight for every
+// target in slow start. It does nothing while the manager is running.
+func (m *SlowStartManager) Start() {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+
+	if m.running.Load() {
+		return
+	}
+	// A fresh channel per run lets the manager be started again after Stop.
+	m.stopCh = make(chan struct{})
+	m.running.Store(true)
+	go m.updateLoop(m.stopCh)
+}
+
+// Stop ends the background loop. It is safe to call repeatedly and before
+// Start. It does not wait for the loop goroutine to exit.
+func (m *SlowStartManager) Stop() {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+
+	if !m.running.Load() {
+		return
+	}
+	m.running.Store(false)
+	close(m.stopCh)
+}
+
+// updateLoop refreshes the weights on every tick until stop is closed.
+func (m *SlowStartManager) updateLoop(stop <-chan struct{}) {
+	ticker := time.NewTicker(m.interval)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ticker.C:
+			m.updateEffectiveWeights()
+		case <-stop:
+			return
+		}
+	}
+}
+
+// updateEffectiveWeights recomputes the weight of every target in slow start
+// and retires the targets whose ramp has finished.
+func (m *SlowStartManager) updateEffectiveWeights() {
+	now := time.Now()
+
+	m.mu.Lock()
+	defer m.mu.Unlock()
+
+	for url, state := range m.targets {
+		elapsed := now.Sub(state.RecoverTime)
+
+		// 0 means the ramp is over and the configured weight applies again.
+		var weight int64
+		if elapsed < state.SlowStart {
+			weight = int64(slowStartWeight(state.BaseWeight, elapsed, state.SlowStart))
+		} else {
+			delete(m.targets, url) // deleting while ranging over a map is allowed
+		}
+
+		if target := m.resolveTarget(url, state); target != nil {
+			target.EffectiveWeight.Store(weight)
+		}
+	}
+}
+
+// resolveTarget returns the Target to update for url: the lookup callback's
+// answer when a callback is installed, otherwise the Target that was given to
+// OnTargetHealthy. The caller must hold m.mu.
+func (m *SlowStartManager) resolveTarget(url string, state *SlowStartState) *Target {
+	if m.findTarget != nil {
+		return m.findTarget(url)
+	}
+	return state.target
+}
+
+// slowStartWeight returns the weight of a ramp that has run for elapsed out of
+// total. For base > 1 it grows linearly from 1 towards base and stays within
+// [1, base]; a base of 1 or less is returned unchanged.
+func slowStartWeight(base int, elapsed, total time.Duration) int {
+	if base <= 1 || total <= 0 || elapsed >= total {
+		return base
+	}
+	if elapsed <= 0 {
+		return 1
+	}
+	progress := float64(elapsed) / float64(total)
+	return 1 + int(progress*float64(base-1))
+}
+
+// GetState returns a copy of the slow-start state for url, or nil when url is
+// not in slow start. It is meant for debugging and monitoring.
+func (m *SlowStartManager) GetState(url string) *SlowStartState {
+	m.mu.RLock()
+	defer m.mu.RUnlock()
+
+	state, ok := m.targets[url]
+	if !ok {
+		return nil
+	}
+	snapshot := *state
+	return &snapshot
+}
+
+// GetAllStates returns a copy of every slow-start state, keyed by target URL.
+// It is meant for debugging and monitoring.
+func (m *SlowStartManager) GetAllStates() map[string]*SlowStartState {
+	m.mu.RLock()
+	defer m.mu.RUnlock()
+
+	result := make(map[string]*SlowStartState, len(m.targets))
+	for url, state := range m.targets {
+		snapshot := *state
+		result[url] = &snapshot
+	}
+	return result
+}
diff --git a/internal/loadbalance/slow_start_test.go b/internal/loadbalance/slow_start_test.go
new file mode 100644
index 0000000..c3501e6
--- /dev/null
+++ b/internal/loadbalance/slow_start_test.go
@@ -0,0 +1,231 @@
+package loadbalance
+
+import (
+	"sync"
+	"testing"
+	"time"
+)
+
+func TestSlowStartManager_OnTargetHealthy(t *testing.T) {
+	mgr := NewSlowStartManager(100 * time.Millisecond)
+
+	target := &Target{
+		URL:       "http://backend:8080",
+		Weight:    10,
+		SlowStart: 5 * time.Second,
+	}
+
+	mgr.OnTargetHealthy(target)
+
+	// The initial weight must be 1.
+	if target.EffectiveWeight.Load() != 1 {
+		t.Errorf("EffectiveWeight = %d, want 1", target.EffectiveWeight.Load())
+	}
+
+	// The state must have been recorded.
+	state := mgr.GetState(target.URL)
+	if state == nil {
+		t.Fatal("state should not be nil")
+	}
+	if state.BaseWeight != 10 {
+		t.Errorf("BaseWeight = %d, want 10", state.BaseWeight)
+	}
+}
+
+func TestSlowStartManager_OnTargetUnhealthy(t *testing.T) {
+	mgr := NewSlowStartManager(100 * time.Millisecond)
+
+	target := &Target{
+		URL:       "http://backend:8080",
+		Weight:    10,
+		SlowStart: 5 * time.Second,
+	}
+
+	mgr.OnTargetHealthy(target)
+	mgr.OnTargetUnhealthy(target)
+
+	// The state must have been cleared.
+	state := mgr.GetState(target.URL)
+	if state != nil {
+		t.Error("state should be nil after unhealthy")
+	}
+
+	// The weight must have been reset.
+	if target.EffectiveWeight.Load() != 0 {
+		t.Errorf("EffectiveWeight = %d, want 0", target.EffectiveWeight.Load())
+	}
+}
+
+func TestSlowStartManager_WeightProgression(t *testing.T) {
+	mgr := NewSlowStartManager(50 * time.Millisecond)
+
+	target := &Target{
+		URL:       "http://backend:8080",
+		Weight:    10,
+		SlowStart: 200 * time.Millisecond,
+	}
+
+	var mu sync.Mutex
+	targets := map[string]*Target{target.URL: target}
+	mgr.SetFindTarget(func(url string) *Target {
+		mu.Lock()
+		defer mu.Unlock()
+		return targets[url]
+	})
+
+	mgr.OnTargetHealthy(target)
+	mgr.Start()
+	defer mgr.Stop()
+
+	// Wait for slow start to finish.
+	time.Sleep(300 * time.Millisecond)
+
+	// The weight must have reached the configured value or been reset to 0.
+	ew := target.EffectiveWeight.Load()
+	if ew != 0 && ew != int64(target.Weight) {
+		t.Errorf("EffectiveWeight = %d, want 0 or %d", ew, target.Weight)
+	}
+}
+
+func TestSlowStartManager_NoSlowStart(t *testing.T) {
+	mgr := NewSlowStartManager(100 * time.Millisecond)
+
+	target := &Target{
+		URL:       "http://backend:8080",
+		Weight:    10,
+		SlowStart: 0, // slow start not configured
+	}
+
+	mgr.OnTargetHealthy(target)
+
+	// No effective weight must have been set.
+	if target.EffectiveWeight.Load() != 0 {
+		t.Errorf("EffectiveWeight = %d, want 0 (no slow start)", target.EffectiveWeight.Load())
+	}
+
+	// No state must have been recorded.
+	state := mgr.GetState(target.URL)
+	if state != nil {
+		t.Error("state should be nil when slow_start is 0")
+	}
+}
+
+func TestTarget_GetEffectiveWeight(t *testing.T) {
+	tests := []struct {
+		name            string
+		weight          int
+		effectiveWeight int64
+		want            int
+	}{
+		{
+			name:            "no slow start",
+			weight:          10,
+			effectiveWeight: 0,
+			want:            10,
+		},
+		{
+			name:            "slow start in progress",
+			weight:          10,
+			effectiveWeight: 5,
+			want:            5,
+		},
+		{
+			name:            "slow start at 1",
+			weight:          10,
+			effectiveWeight: 1,
+			want:            1,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			target := &Target{
+				Weight: tt.weight,
+			}
+			target.EffectiveWeight.Store(tt.effectiveWeight)
+
+			got := target.GetEffectiveWeight()
+			if got != tt.want {
+				t.Errorf("GetEffectiveWeight() = %d, want %d", got, tt.want)
+			}
+		})
+	}
+}
+
+func TestSlowStartManager_GetAllStates(t *testing.T) {
+	mgr := NewSlowStartManager(100 * time.Millisecond)
+
+	target1 := &Target{
+		URL:       "http://backend1:8080",
+		Weight:    10,
+		SlowStart: 5 * time.Second,
+	}
+	target2 := &Target{
+		URL:       "http://backend2:8080",
+		Weight:    5,
+		SlowStart: 3 * time.Second,
+	}
+
+	mgr.OnTargetHealthy(target1)
+	mgr.OnTargetHealthy(target2)
+
+	states := mgr.GetAllStates()
+	if len(states) != 2 {
+		t.Errorf("len(states) = %d, want 2", len(states))
+	}
+}
+
+func TestSlowStartManager_StopIsIdempotent(t *testing.T) {
+	mgr := NewSlowStartManager(10 * time.Millisecond)
+
+	mgr.Stop() // before Start: must be a no-op
+	mgr.Start()
+	mgr.Stop()
+	mgr.Stop() // second Stop must not panic
+}
+
+func TestSlowStartManager_CompletesWithoutFindTarget(t *testing.T) {
+	mgr := NewSlowStartManager(time.Second)
+
+	target := &Target{
+		URL:       "http://backend:8080",
+		Weight:    10,
+		SlowStart: time.Nanosecond,
+	}
+
+	mgr.OnTargetHealthy(target)
+	time.Sleep(time.Millisecond) // let the ramp elapse
+	mgr.updateEffectiveWeights()
+
+	if ew := target.EffectiveWeight.Load(); ew != 0 {
+		t.Errorf("EffectiveWeight = %d, want 0", ew)
+	}
+	if state := mgr.GetState(target.URL); state != nil {
+		t.Error("state should be nil after slow start completes")
+	}
+}
+
+func TestSlowStartWeight(t *testing.T) {
+	tests := []struct {
+		name    string
+		base    int
+		elapsed time.Duration
+		total   time.Duration
+		want    int
+	}{
+		{name: "start", base: 10, elapsed: 0, total: 10 * time.Second, want: 1},
+		{name: "half way", base: 11, elapsed: 5 * time.Second, total: 10 * time.Second, want: 6},
+		{name: "finished", base: 10, elapsed: 10 * time.Second, total: 10 * time.Second, want: 10},
+		{name: "negative elapsed", base: 10, elapsed: -time.Second, total: 10 * time.Second, want: 1},
+		{name: "weight of one", base: 1, elapsed: time.Second, total: 10 * time.Second, want: 1},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			got := slowStartWeight(tt.base, tt.elapsed, tt.total)
+			if got != tt.want {
+				t.Errorf("slowStartWeight() = %d, want %d", got, tt.want)
+			}
+		})
+	}
+}