feat(config,loadbalance): 添加上游服务器参数和 random 负载均衡算法

Target 新增 MaxConns/MaxFails/FailTimeout/Backup/Down/ProxyURI 字段,
实现 IsAvailable/RecordFailure/RecordSuccess 软失败机制,
filterHealthy 支持备份服务器优先级选择,
新增 random(Power of Two Choices)负载均衡算法。

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
xfy 2026-04-21 11:28:02 +08:00
parent dafe96b5a3
commit c0b7e30bf0
4 changed files with 301 additions and 42 deletions

View File

@ -340,6 +340,7 @@ type ProxyConfig struct {
RedirectRewrite *RedirectRewriteConfig `yaml:"redirect_rewrite"`
ProxySSL *ProxySSLConfig `yaml:"proxy_ssl"`
CacheValid *ProxyCacheValidConfig `yaml:"cache_valid"`
Buffering *ProxyBufferingConfig `yaml:"buffering"`
// 切片字段
Targets []ProxyTarget `yaml:"targets"`
// 字符串字段
@ -347,6 +348,7 @@ type ProxyConfig struct {
LoadBalance string `yaml:"load_balance"`
HashKey string `yaml:"hash_key"`
ClientMaxBodySize string `yaml:"client_max_body_size"`
ProxyBind string `yaml:"proxy_bind"`
// 结构体字段
Headers ProxyHeaders `yaml:"headers"`
BalancerByLua BalancerByLuaConfig `yaml:"balancer_by_lua"`
@ -370,6 +372,26 @@ type ProxyConfig struct {
Internal bool `yaml:"internal"`
}
// ProxyBufferingConfig 代理缓冲配置。
//
// 控制代理响应的缓冲行为:
// - "default" 或 "on": 缓冲响应到内存/临时文件
// - "off": 流式转发响应,不缓冲
//
// 使用示例:
//
// buffering:
// mode: "off"
type ProxyBufferingConfig struct {
// Mode 缓冲模式
// 可选值:"default"(默认缓冲), "on"(强制缓冲), "off"(关闭缓冲)
Mode string `yaml:"mode"`
// BufferSize 响应缓冲区大小(字节)
// 0 表示使用默认值
BufferSize int `yaml:"buffer_size"`
}
// BalancerByLuaConfig Lua 负载均衡配置
//
// 使用 Lua 脚本动态选择后端目标,支持自定义负载均衡逻辑。
@ -425,6 +447,30 @@ type ProxyTarget struct {
// Weight 权重
// 用于加权轮询算法,值越大分配的请求越多
Weight int `yaml:"weight"`
// MaxConns 最大并发连接数
// 0 表示不限制
MaxConns int `yaml:"max_conns"`
// MaxFails 最大失败次数
// 在 FailTimeout 期间失败次数达到此值后标记为不可用
// 0 表示不进行被动失败检测
MaxFails int `yaml:"max_fails"`
// FailTimeout 失败超时时间
// 达到 MaxFails 后,目标在此时间内被视为不可用
FailTimeout time.Duration `yaml:"fail_timeout"`
// Backup 备份服务器
// 仅当所有非备份服务器不可用时才使用
Backup bool `yaml:"backup"`
// Down 标记服务器为永久不可用
Down bool `yaml:"down"`
// ProxyURI 代理传递的 URI 路径
// 设置后替换请求路径,支持 nginx proxy_pass URI 语义
ProxyURI string `yaml:"proxy_uri"`
}
// HealthCheckConfig 健康检查配置。
@ -508,6 +554,26 @@ type ProxyHeaders struct {
// Remove 移除的头部
// 从发送到后端的请求中移除的头部列表
Remove []string `yaml:"remove"`
// HideResponse 隐藏的响应头
// 从返回给客户端的响应中移除的头部列表
HideResponse []string `yaml:"hide_response"`
// PassResponse 允许传递的响应头
// 仅传递列出的头部,其他全部隐藏(白名单模式)
PassResponse []string `yaml:"pass_response"`
// IgnoreHeaders 忽略的头部
// 代理时完全忽略这些头部,不转发到后端也不返回给客户端
IgnoreHeaders []string `yaml:"ignore_headers"`
// CookieDomain Cookie 域重写
// 将响应中 Set-Cookie 的 domain 替换为此值
CookieDomain string `yaml:"cookie_domain"`
// CookiePath Cookie 路径重写
// 将响应中 Set-Cookie 的 path 替换为此值
CookiePath string `yaml:"cookie_path"`
}
// ProxyCacheConfig 代理缓存配置。

View File

@ -31,6 +31,7 @@ var ValidAlgorithms = []string{
"least_conn",
"ip_hash",
"consistent_hash",
"random",
}
// IsValidAlgorithm 检查给定的算法名称是否有效。

View File

@ -49,6 +49,24 @@ type Target struct {
lastResolved atomic.Int64
hostnameOnce sync.Once
Healthy atomic.Bool
// MaxConns 最大并发连接数0 表示不限制
MaxConns int64
// MaxFails 最大失败次数0 表示不检测
MaxFails int64
// FailTimeout 失败冷却时间
FailTimeout time.Duration
// Backup 备份服务器标记
Backup bool
// Down 永久不可用标记
Down bool
// ProxyURI 代理传递的 URI 路径
ProxyURI string
// failCount 失败计数(原子操作)
failCount atomic.Int64
// failedUntil 失败冷却截止时间UnixNano原子操作
failedUntil atomic.Int64
}
// Balancer 是 HTTP 代理L7 层)负载均衡算法的接口。
@ -248,24 +266,103 @@ func (i *IPHash) SelectByIP(targets []*Target, clientIP string) *Target {
return healthy[idx]
}
// filterHealthy 从目标列表中筛选出所有健康的目标,返回新切片。
//
// 该函数遍历输入切片,仅保留 Healthy 状态为 true 的目标。
// 返回的切片容量与输入相同,避免多次内存分配。
//
// 参数:
// - targets: 原始目标列表
//
// 返回值:
// - 仅包含健康目标的新切片,当无健康目标时返回空切片
func filterHealthy(targets []*Target) []*Target {
healthy := make([]*Target, 0, len(targets))
for _, t := range targets {
if t.Healthy.Load() {
healthy = append(healthy, t)
// IsAvailable 检查目标是否可用。
// 目标不可用的条件(优先级从高到低):
// - Healthy 为 false硬性不可用由健康检查器设置
// - Down 为 true配置标记永久不可用
// - 超过 MaxConns 限制
// - 失败冷却期内failCount >= MaxFails 且未超过 FailTimeout
func (t *Target) IsAvailable() bool {
if !t.Healthy.Load() || t.Down {
return false
}
if t.MaxConns > 0 && atomic.LoadInt64(&t.Connections) >= t.MaxConns {
return false
}
if t.MaxFails > 0 {
deadline := t.failedUntil.Load()
if deadline > 0 {
if time.Now().UnixNano() < deadline {
return false
}
// 冷却已过期CAS 重置防止丢失并发的 RecordFailure
if t.failedUntil.CompareAndSwap(deadline, 0) {
t.failCount.Store(0)
}
}
}
return healthy
return true
}
// RecordFailure 记录一次失败。
// 使用原子递增 failCount当达到 MaxFails 时设置冷却截止时间。
// 返回当前失败计数。
func (t *Target) RecordFailure() int64 {
if t.MaxFails <= 0 {
return 0
}
count := t.failCount.Add(1)
if count >= t.MaxFails {
timeout := t.FailTimeout
if timeout <= 0 {
timeout = 10 * time.Second
}
t.failedUntil.Store(time.Now().Add(timeout).UnixNano())
}
return count
}
// RecordSuccess 记录一次成功,重置软失败状态。
// 仅重置 failCount 和 failedUntil不修改 Healthy健康检查器权威
func (t *Target) RecordSuccess() {
if t.MaxFails <= 0 {
return
}
// CAS 重置:仅在当前 goroutine 持有 deadline 时才清零,
// 防止丢失并发的 RecordFailure 设置的新 deadline。
for {
deadline := t.failedUntil.Load()
if deadline == 0 {
break
}
if t.failedUntil.CompareAndSwap(deadline, 0) {
break
}
}
t.failCount.Store(0)
}
// IsBackup 返回目标是否为备份服务器。
func (t *Target) IsBackup() bool {
return t.Backup
}
// filterHealthy 从目标列表中筛选出所有可用的目标,返回新切片。
//
// 选择逻辑:
// 1. 优先选择非备份且可用的目标IsAvailable() == true && !IsBackup()
// 2. 如果没有非备份目标可用,则选择可用的备份目标
//
// 返回的切片容量与输入相同,避免多次内存分配。
func filterHealthy(targets []*Target) []*Target {
available := make([]*Target, 0, len(targets))
var backups []*Target
for _, t := range targets {
if !t.IsAvailable() {
continue
}
if t.IsBackup() {
backups = append(backups, t)
} else {
available = append(available, t)
}
}
if len(available) > 0 {
return available
}
return backups
}
// IncrementConnections 原子地增加目标的连接计数。
@ -280,19 +377,14 @@ func DecrementConnections(t *Target) {
atomic.AddInt64(&t.Connections, -1)
}
// filterHealthyAndExclude 从目标列表中筛选出健康且不在排除列表中的目标,返回新切片。
// filterHealthyAndExclude 从目标列表中筛选出可用且不在排除列表中的目标,返回新切片。
//
// 选择逻辑与 filterHealthy 相同:
// 1. 优先选择非备份且可用的目标
// 2. 如果没有非备份目标可用,则选择可用的备份目标
//
// 该函数用于 SelectExcluding 方法,同时处理健康过滤和排除逻辑。
// 排除判断基于目标的 URL 进行匹配。
//
// 参数:
// - targets: 原始目标列表
// - excluded: 需要排除的目标列表nil 值会被安全忽略)
//
// 返回值:
// - 包含健康且非排除目标的新切片
func filterHealthyAndExclude(targets []*Target, excluded []*Target) []*Target {
// 构建排除集合(使用 URL 作为键)
excludeSet := make(map[string]bool, len(excluded))
for _, t := range excluded {
if t != nil {
@ -300,14 +392,24 @@ func filterHealthyAndExclude(targets []*Target, excluded []*Target) []*Target {
}
}
// 过滤健康且不在排除列表中的目标
available := make([]*Target, 0, len(targets))
var backups []*Target
for _, t := range targets {
if t.Healthy.Load() && !excludeSet[t.URL] {
if !t.IsAvailable() || excludeSet[t.URL] {
continue
}
if t.IsBackup() {
backups = append(backups, t)
} else {
available = append(available, t)
}
}
return available
if len(available) > 0 {
return available
}
return backups
}
// SelectExcluding 根据轮询策略选择一个目标,排除指定的目标列表。
@ -367,9 +469,8 @@ func (w *WeightedRoundRobin) SelectExcluding(targets []*Target, excluded []*Targ
}
// SelectExcluding 选择连接数最少的目标,排除指定的目标列表。
// 只考虑健康且不在排除列表中的目标。
// 优先选择非备份目标,仅当无可用非备份目标时选择备份目标。
func (l *LeastConnections) SelectExcluding(targets []*Target, excluded []*Target) *Target {
// 构建排除集合
excludeSet := make(map[string]bool, len(excluded))
for _, t := range excluded {
if t != nil {
@ -378,23 +479,34 @@ func (l *LeastConnections) SelectExcluding(targets []*Target, excluded []*Target
}
var selected *Target
var selectedBackup *Target
var minConns int64 = -1
var minBackupConns int64 = -1
for _, t := range targets {
if !t.Healthy.Load() || excludeSet[t.URL] {
if !t.IsAvailable() || excludeSet[t.URL] {
continue
}
// 原子地读取连接计数
conns := atomic.LoadInt64(&t.Connections)
if selected == nil || conns < minConns {
selected = t
minConns = conns
if t.IsBackup() {
if selectedBackup == nil || conns < minBackupConns {
selectedBackup = t
minBackupConns = conns
}
} else {
if selected == nil || conns < minConns {
selected = t
minConns = conns
}
}
}
return selected
if selected != nil {
return selected
}
return selectedBackup
}
// SelectExcluding 基于客户端 IP 的哈希值选择目标,排除指定的目标列表。
@ -485,14 +597,22 @@ func (t *Target) initHostname() {
}
// NewTargetFromConfig 从配置创建 Target推荐入口
// 自动初始化 hostname 和 Healthy 状态
func NewTargetFromConfig(url string, weight int) *Target {
// 自动初始化 hostname 和 Healthy 状态,设置上游参数
func NewTargetFromConfig(url string, weight int, maxConns int64, maxFails int64, failTimeout time.Duration, backup bool, down bool, proxyURI string) *Target {
t := &Target{
URL: url,
Weight: weight,
URL: url,
Weight: weight,
MaxConns: maxConns,
MaxFails: maxFails,
FailTimeout: failTimeout,
Backup: backup,
Down: down,
ProxyURI: proxyURI,
}
t.initHostname()
t.Healthy.Store(true)
if !down {
t.Healthy.Store(true)
}
return t
}

View File

@ -0,0 +1,72 @@
package loadbalance
import (
"math/rand/v2"
"sync/atomic"
)
// Random 实现随机负载均衡Power of Two Choices 算法)。
//
// 随机选择两个候选目标,从中选择连接数较少的那个。
// 相比纯随机Power of Two Choices 能更好地均衡负载,
// 同时保持 O(1) 的选择复杂度。
//
// 当只有一个候选目标时直接返回;当 MaxConns 限制生效时
// 自动跳过已满的目标。
type Random struct{}
// NewRandom 创建一个新的随机负载均衡器。
func NewRandom() *Random {
return &Random{}
}
// Select 使用 Power of Two Choices 算法选择目标。
// 随机选择两个候选,返回连接数较少的那个。
// 只考虑可用目标。如果没有可用目标则返回 nil。
func (r *Random) Select(targets []*Target) *Target {
available := filterHealthy(targets)
if len(available) == 0 {
return nil
}
if len(available) == 1 {
return available[0]
}
// Power of Two Choices
i := rand.IntN(len(available))
j := rand.IntN(len(available) - 1)
if j >= i {
j++
}
a, b := available[i], available[j]
if atomic.LoadInt64(&a.Connections) <= atomic.LoadInt64(&b.Connections) {
return a
}
return b
}
// SelectExcluding 使用 Power of Two Choices 算法选择目标,排除指定的目标列表。
func (r *Random) SelectExcluding(targets []*Target, excluded []*Target) *Target {
available := filterHealthyAndExclude(targets, excluded)
if len(available) == 0 {
return nil
}
if len(available) == 1 {
return available[0]
}
i := rand.IntN(len(available))
j := rand.IntN(len(available) - 1)
if j >= i {
j++
}
a, b := available[i], available[j]
if atomic.LoadInt64(&a.Connections) <= atomic.LoadInt64(&b.Connections) {
return a
}
return b
}