主要变更: - WebSocket 代理支持 (internal/proxy/websocket.go) - OCSP stapling 实现 (internal/ssl/ocsp.go) - 监控状态端点 (internal/server/status.go) - 新增 nginx 模块文档 (19-24) - UDP 代理超时配置支持 - 多模块代码注释完善和功能增强 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
218 lines
5.7 KiB
Go
218 lines
5.7 KiB
Go
// Package loadbalance 提供负载均衡算法实现。
|
||
//
|
||
// 该文件包含负载均衡相关的核心定义,包括:
|
||
// - Target 目标结构体定义
|
||
// - Balancer 接口定义
|
||
// - ValidAlgorithms 有效算法列表
|
||
//
|
||
// 主要用途:
|
||
// 用于定义负载均衡的标准接口和目标结构,支持多种负载均衡算法。
|
||
//
|
||
// 注意事项:
|
||
// - 所有实现必须并发安全
|
||
// - 目标的健康状态使用 atomic.Bool
|
||
//
|
||
// 作者:xfy
|
||
package loadbalance
|
||
|
||
import (
|
||
"hash/fnv"
|
||
"sync/atomic"
|
||
)
|
||
|
||
// Target 表示负载均衡的后端服务器目标。
|
||
// 所有字段都设计为使用原子操作进行并发访问(如适用)。
|
||
type Target struct {
|
||
// URL 是目标地址,例如 "http://backend1:8080"
|
||
URL string
|
||
|
||
// Weight 是此目标在权重算法中的权重值。
|
||
// 权重越高,表示有更多请求会被路由到此目标。
|
||
Weight int
|
||
|
||
// Healthy 表示此目标是否健康可用。
|
||
// 使用 atomic.Bool 保证并发安全。
|
||
Healthy atomic.Bool
|
||
|
||
// Connections 跟踪当前活跃连接数。
|
||
// 并发修改此字段时应使用原子操作。
|
||
Connections int64
|
||
}
|
||
|
||
// Balancer 是负载均衡算法的接口。
|
||
// 实现必须是并发安全的。
|
||
type Balancer interface {
|
||
// Select 根据算法策略从提供的列表中选择一个目标。
|
||
// 如果没有健康目标可用,返回 nil。
|
||
Select(targets []*Target) *Target
|
||
}
|
||
|
||
// RoundRobin 实现简单的轮询负载均衡。
|
||
// 它按顺序将请求均匀分配到所有健康目标上。
|
||
type RoundRobin struct {
|
||
// counter 原子地为每个请求递增
|
||
counter uint64
|
||
}
|
||
|
||
// NewRoundRobin 创建一个新的轮询负载均衡器。
|
||
func NewRoundRobin() *RoundRobin {
|
||
return &RoundRobin{}
|
||
}
|
||
|
||
// Select 选择轮询顺序中的下一个目标。
|
||
// 只考虑健康目标。如果没有健康目标则返回 nil。
|
||
func (r *RoundRobin) Select(targets []*Target) *Target {
|
||
healthy := filterHealthy(targets)
|
||
if len(healthy) == 0 {
|
||
return nil
|
||
}
|
||
|
||
// 原子地递增并获取计数器值
|
||
idx := atomic.AddUint64(&r.counter, 1) - 1
|
||
return healthy[idx%uint64(len(healthy))]
|
||
}
|
||
|
||
// WeightedRoundRobin 实现权重轮询负载均衡。
|
||
// 权重越高的目标接收成比例更多的请求。
|
||
type WeightedRoundRobin struct {
|
||
// counter 原子地为每个请求递增
|
||
counter uint64
|
||
}
|
||
|
||
// NewWeightedRoundRobin 创建一个新的权重轮询负载均衡器。
|
||
func NewWeightedRoundRobin() *WeightedRoundRobin {
|
||
return &WeightedRoundRobin{}
|
||
}
|
||
|
||
// Select 基于权重分布选择目标。
|
||
// 只考虑健康目标。如果没有健康目标则返回 nil。
|
||
func (w *WeightedRoundRobin) Select(targets []*Target) *Target {
|
||
healthy := filterHealthy(targets)
|
||
if len(healthy) == 0 {
|
||
return nil
|
||
}
|
||
|
||
// 计算总权重
|
||
totalWeight := 0
|
||
for _, t := range healthy {
|
||
if t.Weight <= 0 {
|
||
totalWeight += 1 // 最小权重为 1
|
||
} else {
|
||
totalWeight += t.Weight
|
||
}
|
||
}
|
||
|
||
if totalWeight == 0 {
|
||
return nil
|
||
}
|
||
|
||
// 使用原子计数器确定权重分布中的位置
|
||
idx := atomic.AddUint64(&w.counter, 1) - 1
|
||
pos := int(idx % uint64(totalWeight))
|
||
|
||
// 找到计算位置处的目标
|
||
currentWeight := 0
|
||
for _, t := range healthy {
|
||
weight := t.Weight
|
||
if weight <= 0 {
|
||
weight = 1
|
||
}
|
||
currentWeight += weight
|
||
if pos < currentWeight {
|
||
return t
|
||
}
|
||
}
|
||
|
||
// 回退到最后一个目标(不应到达这里)
|
||
return healthy[len(healthy)-1]
|
||
}
|
||
|
||
// LeastConnections 实现最少连接负载均衡。
|
||
// 它选择活跃连接数最少的目标。
|
||
type LeastConnections struct{}
|
||
|
||
// NewLeastConnections 创建一个新的最少连接负载均衡器。
|
||
func NewLeastConnections() *LeastConnections {
|
||
return &LeastConnections{}
|
||
}
|
||
|
||
// Select 选择连接数最少的目标。
|
||
// 只考虑健康目标。如果没有健康目标则返回 nil。
|
||
func (l *LeastConnections) Select(targets []*Target) *Target {
|
||
var selected *Target
|
||
var minConns int64 = -1
|
||
|
||
for _, t := range targets {
|
||
if !t.Healthy.Load() {
|
||
continue
|
||
}
|
||
|
||
// 原子地读取连接计数
|
||
conns := atomic.LoadInt64(&t.Connections)
|
||
|
||
if selected == nil || conns < minConns {
|
||
selected = t
|
||
minConns = conns
|
||
}
|
||
}
|
||
|
||
return selected
|
||
}
|
||
|
||
// IPHash 实现基于 IP 哈希的负载均衡。
|
||
// 它将来自同一客户端 IP 的请求始终路由到同一目标。
|
||
type IPHash struct{}
|
||
|
||
// NewIPHash 创建一个新的 IP 哈希负载均衡器。
|
||
func NewIPHash() *IPHash {
|
||
return &IPHash{}
|
||
}
|
||
|
||
// Select 基于客户端 IP 的哈希值选择目标。
|
||
// 只考虑健康目标。如果没有健康目标则返回 nil。
|
||
// clientIP 参数应该是客户端的 IP 地址字符串。
|
||
func (i *IPHash) Select(targets []*Target) *Target {
|
||
return i.SelectByIP(targets, "")
|
||
}
|
||
|
||
// SelectByIP 基于提供的 IP 地址的哈希值选择目标。
|
||
// 只考虑健康目标。如果没有健康目标则返回 nil。
|
||
func (i *IPHash) SelectByIP(targets []*Target, clientIP string) *Target {
|
||
healthy := filterHealthy(targets)
|
||
if len(healthy) == 0 {
|
||
return nil
|
||
}
|
||
|
||
// 对客户端 IP 进行哈希
|
||
h := fnv.New64a()
|
||
h.Write([]byte(clientIP))
|
||
hash := h.Sum64()
|
||
|
||
idx := hash % uint64(len(healthy))
|
||
return healthy[idx]
|
||
}
|
||
|
||
// filterHealthy 返回仅包含健康目标的新切片。
|
||
// 这是负载均衡实现使用的辅助函数。
|
||
func filterHealthy(targets []*Target) []*Target {
|
||
healthy := make([]*Target, 0, len(targets))
|
||
for _, t := range targets {
|
||
if t.Healthy.Load() {
|
||
healthy = append(healthy, t)
|
||
}
|
||
}
|
||
return healthy
|
||
}
|
||
|
||
// IncrementConnections 原子地增加目标的连接计数。
|
||
// 当新连接建立时应调用此函数。
|
||
func IncrementConnections(t *Target) {
|
||
atomic.AddInt64(&t.Connections, 1)
|
||
}
|
||
|
||
// DecrementConnections 原子地减少目标的连接计数。
|
||
// 当连接关闭时应调用此函数。
|
||
func DecrementConnections(t *Target) {
|
||
atomic.AddInt64(&t.Connections, -1)
|
||
}
|