feat(proxy,loadbalance,config): 新增故障转移 (next_upstream) 支持

- 配置新增 NextUpstreamConfig 支持 tries 和 http_codes 参数
- 负载均衡器新增 SelectExcluding 方法用于故障转移排除选择
- 代理请求失败时自动尝试下一个健康后端
- 健康检查新增 MarkHealthy 方法用于故障转移成功后恢复状态

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
xfy 2026-04-07 17:49:10 +08:00
parent d8ac807cb7
commit f6245c19e0
6 changed files with 556 additions and 77 deletions

View File

@ -188,6 +188,12 @@ type ServerConfig struct {
// MaxRequestsPerConn 每连接最大请求数
// 达到后连接将被优雅关闭
MaxRequestsPerConn int `yaml:"max_requests_per_conn"`
// ClientMaxBodySize 客户端请求体大小限制
// 限制请求体最大字节数,超过返回 413 错误
// 支持单位b, kb, mb, gb 或纯数字表示字节
// 默认值为 1MB
ClientMaxBodySize string `yaml:"client_max_body_size"`
}
// StaticConfig 静态文件服务配置。
@ -222,6 +228,16 @@ type StaticConfig struct {
// 访问目录时依次查找这些文件作为默认页面
// 默认为 ["index.html", "index.htm"]
Index []string `yaml:"index"`
// TryFiles 按顺序尝试查找的文件列表
// 支持 $uri 和 $uri/ 占位符,用于 SPA 部署
// 示例: ["$uri", "$uri/", "/index.html"]
TryFiles []string `yaml:"try_files"`
// TryFilesPass 内部重定向是否触发中间件
// 默认为 false内部重定向不触发中间件
// 设置为 true 时try_files 回退会重新进入中间件链
TryFilesPass bool `yaml:"try_files_pass"`
}
// ProxyConfig 反向代理配置,支持负载均衡和健康检查。
@ -284,6 +300,15 @@ type ProxyConfig struct {
// Cache 代理缓存配置
// 启用后缓存后端响应减少重复请求
Cache ProxyCacheConfig `yaml:"cache"`
// ClientMaxBodySize 请求体大小限制
// 限制此代理路径的请求体最大字节数,覆盖全局配置
// 支持单位b, kb, mb, gb 或纯数字表示字节
ClientMaxBodySize string `yaml:"client_max_body_size"`
// NextUpstream 故障转移配置
// 配置后端故障时的自动重试行为
NextUpstream NextUpstreamConfig `yaml:"next_upstream"`
}
// ProxyTarget 后端目标配置。
@ -436,6 +461,30 @@ type ProxyCacheConfig struct {
StaleWhileRevalidate time.Duration `yaml:"stale_while_revalidate"`
}
// NextUpstreamConfig 故障转移配置,定义后端失败时的自动重试行为。
//
// 当后端返回特定错误状态码或连接失败时,自动尝试下一个可用后端。
//
// 注意事项:
// - Tries 为 1 时禁用故障转移
// - 空 NextUpstream 使用默认值Tries=1禁用故障转移
// - 建议根据后端数量合理设置 Tries 值
//
// 使用示例:
//
// next_upstream:
// tries: 3
// http_codes: [502, 503, 504]
type NextUpstreamConfig struct {
// Tries 最大尝试次数
// 包括第一次尝试在内的总请求次数
Tries int `yaml:"tries"`
// HTTPCodes 触发重试的 HTTP 状态码列表
// 后端返回这些状态码时自动尝试下一个
HTTPCodes []int `yaml:"http_codes"`
}
// SSLConfig SSL/TLS 配置。
//
// 用于配置 HTTPS 服务所需的证书和加密参数。
@ -562,6 +611,10 @@ type SecurityConfig struct {
// Headers 安全头部
// 添加安全相关的 HTTP 响应头
Headers SecurityHeaders `yaml:"headers"`
// ErrorPage 自定义错误页面配置
// 允许为特定 HTTP 状态码配置自定义错误页面
ErrorPage ErrorPageConfig `yaml:"error_page"`
}
// AccessConfig IP 访问控制配置。
@ -760,6 +813,41 @@ type SecurityHeaders struct {
PermissionsPolicy string `yaml:"permissions_policy"`
}
// ErrorPageConfig 自定义错误页面配置。
//
// 允许为特定 HTTP 状态码配置自定义错误页面。
// 错误页面文件在启动时预加载到内存中,运行时不进行文件 I/O。
//
// 注意事项:
// - 错误页面文件路径可以是相对路径或绝对路径
// - 所有错误页面加载失败时会阻止服务器启动
// - 部分错误页面加载失败会记录警告但允许启动
// - 支持可选的响应状态码覆盖
//
// 使用示例:
//
// error_page:
// pages:
// 404: "/var/www/errors/404.html"
// 500: "/var/www/errors/500.html"
// 503: "/var/www/errors/503.html"
// default: "/var/www/errors/error.html"
// response_code: 200 # 可选:覆盖响应状态码
type ErrorPageConfig struct {
// Pages 状态码到错误页面文件的映射
// key 为 HTTP 状态码(如 404, 500value 为文件路径
Pages map[int]string `yaml:"pages"`
// Default 默认错误页面
// 当特定状态码没有配置时使用
Default string `yaml:"default"`
// ResponseCode 响应状态码覆盖
// 如果不为 0所有错误页面响应将使用此状态码
// 例如设置为 200 时,即使发生错误也返回 200 OK
ResponseCode int `yaml:"response_code"`
}
// RewriteRule URL 重写规则。
//
// 用于在代理或静态文件服务前修改请求 URL。

View File

@ -221,6 +221,11 @@ func validateProxy(p *ProxyConfig) error {
return fmt.Errorf("无效的负载均衡算法:%s", p.LoadBalance)
}
// 验证故障转移配置
if err := validateNextUpstream(&p.NextUpstream); err != nil {
return fmt.Errorf("next_upstream: %w", err)
}
// 验证一致性哈希键格式
if p.HashKey != "" {
validHashKeys := []string{"ip", "uri"}
@ -732,3 +737,37 @@ func validatePerformance(p *PerformanceConfig) error {
return nil
}
// validateNextUpstream 验证故障转移配置。
//
// 检查重试次数和 HTTP 状态码的有效性。
//
// 参数:
// - n: 故障转移配置对象
//
// 返回值:
// - error: 验证失败时返回错误信息,成功返回 nil
//
// 验证规则:
// - tries 不能为负数,建议不超过后端数量
// - http_codes 应包含有效的 HTTP 状态码
func validateNextUpstream(n *NextUpstreamConfig) error {
// 未配置时跳过
if n.Tries == 0 && len(n.HTTPCodes) == 0 {
return nil
}
// 验证重试次数
if n.Tries < 0 {
return errors.New("tries 不能为负数")
}
// 验证 HTTP 状态码
for i, code := range n.HTTPCodes {
if code < 100 || code > 599 {
return fmt.Errorf("http_codes[%d]: 无效的 HTTP 状态码 %d", i, code)
}
}
return nil
}

View File

@ -46,6 +46,11 @@ type Balancer interface {
// Select 根据算法策略从提供的列表中选择一个目标。
// 如果没有健康目标可用,返回 nil。
Select(targets []*Target) *Target
// SelectExcluding 根据算法策略选择一个目标,排除指定的目标列表。
// 用于故障转移场景,避免选择已失败的目标。
// 如果除了排除列表外没有可用目标,返回 nil。
SelectExcluding(targets []*Target, excluded []*Target) *Target
}
// RoundRobin 实现简单的轮询负载均衡。
@ -216,3 +221,134 @@ func IncrementConnections(t *Target) {
func DecrementConnections(t *Target) {
atomic.AddInt64(&t.Connections, -1)
}
// filterHealthyAndExclude 返回仅包含健康目标且不在排除列表中的新切片。
// 这是 SelectExcluding 使用的辅助函数。
func filterHealthyAndExclude(targets []*Target, excluded []*Target) []*Target {
// 构建排除集合(使用 URL 作为键)
excludeSet := make(map[string]bool, len(excluded))
for _, t := range excluded {
if t != nil {
excludeSet[t.URL] = true
}
}
// 过滤健康且不在排除列表中的目标
available := make([]*Target, 0, len(targets))
for _, t := range targets {
if t.Healthy.Load() && !excludeSet[t.URL] {
available = append(available, t)
}
}
return available
}
// SelectExcluding 根据轮询策略选择一个目标,排除指定的目标列表。
// 只考虑健康且不在排除列表中的目标。
func (r *RoundRobin) SelectExcluding(targets []*Target, excluded []*Target) *Target {
available := filterHealthyAndExclude(targets, excluded)
if len(available) == 0 {
return nil
}
// 原子地递增并获取计数器值
idx := atomic.AddUint64(&r.counter, 1) - 1
return available[idx%uint64(len(available))]
}
// SelectExcluding 根据权重分布选择目标,排除指定的目标列表。
// 只考虑健康且不在排除列表中的目标。
func (w *WeightedRoundRobin) SelectExcluding(targets []*Target, excluded []*Target) *Target {
available := filterHealthyAndExclude(targets, excluded)
if len(available) == 0 {
return nil
}
// 计算总权重
totalWeight := 0
for _, t := range available {
if t.Weight <= 0 {
totalWeight += 1 // 最小权重为 1
} else {
totalWeight += t.Weight
}
}
if totalWeight == 0 {
return nil
}
// 使用原子计数器确定权重分布中的位置
idx := atomic.AddUint64(&w.counter, 1) - 1
pos := int(idx % uint64(totalWeight))
// 找到计算位置处的目标
currentWeight := 0
for _, t := range available {
weight := t.Weight
if weight <= 0 {
weight = 1
}
currentWeight += weight
if pos < currentWeight {
return t
}
}
// 回退到最后一个目标(不应到达这里)
return available[len(available)-1]
}
// SelectExcluding 选择连接数最少的目标,排除指定的目标列表。
// 只考虑健康且不在排除列表中的目标。
func (l *LeastConnections) SelectExcluding(targets []*Target, excluded []*Target) *Target {
// 构建排除集合
excludeSet := make(map[string]bool, len(excluded))
for _, t := range excluded {
if t != nil {
excludeSet[t.URL] = true
}
}
var selected *Target
var minConns int64 = -1
for _, t := range targets {
if !t.Healthy.Load() || excludeSet[t.URL] {
continue
}
// 原子地读取连接计数
conns := atomic.LoadInt64(&t.Connections)
if selected == nil || conns < minConns {
selected = t
minConns = conns
}
}
return selected
}
// SelectExcluding 基于客户端 IP 的哈希值选择目标,排除指定的目标列表。
// 只考虑健康且不在排除列表中的目标。
func (i *IPHash) SelectExcluding(targets []*Target, excluded []*Target) *Target {
return i.SelectExcludingByIP(targets, excluded, "")
}
// SelectExcludingByIP 基于提供的 IP 地址的哈希值选择目标,排除指定的目标列表。
// 只考虑健康且不在排除列表中的目标。
func (i *IPHash) SelectExcludingByIP(targets []*Target, excluded []*Target, clientIP string) *Target {
available := filterHealthyAndExclude(targets, excluded)
if len(available) == 0 {
return nil
}
// 对客户端 IP 进行哈希
h := fnv.New64a()
h.Write([]byte(clientIP))
hash := h.Sum64()
idx := hash % uint64(len(available))
return available[idx]
}

View File

@ -185,5 +185,93 @@ func (c *ConsistentHash) GetStats() ConsistentHashStats {
}
}
// SelectExcluding 根据指定键选择目标,排除指定的目标列表。
//
// 参数:
// - targets: 可用目标列表
// - excluded: 需要排除的目标列表
//
// 返回值:
// - *Target: 选中的目标,如果没有可用目标则返回 nil
func (c *ConsistentHash) SelectExcluding(targets []*Target, excluded []*Target) *Target {
return c.SelectExcludingByKey(targets, excluded, "")
}
// SelectExcludingByKey 根据指定键选择目标,排除指定的目标列表。
//
// 参数:
// - targets: 可用目标列表
// - excluded: 需要排除的目标列表
// - key: 哈希键值(如客户端 IP、URI 等)
//
// 返回值:
// - *Target: 选中的目标,如果没有可用目标则返回 nil
func (c *ConsistentHash) SelectExcludingByKey(targets []*Target, excluded []*Target, key string) *Target {
// 构建排除集合
excludeSet := make(map[string]bool, len(excluded))
for _, t := range excluded {
if t != nil {
excludeSet[t.URL] = true
}
}
c.mu.RLock()
defer c.mu.RUnlock()
// 如果没有排除的目标,使用正常选择
if len(excludeSet) == 0 {
return c.SelectByKey(targets, key)
}
// 过滤掉被排除的目标
filtered := make([]*Target, 0, len(targets))
for _, t := range targets {
if t.Healthy.Load() && !excludeSet[t.URL] {
filtered = append(filtered, t)
}
}
if len(filtered) == 0 {
return nil
}
// 为过滤后的目标临时构建哈希环
circle := make(map[uint64]*Target)
sortedHashes := make([]uint64, 0)
for _, target := range filtered {
for i := 0; i < c.virtualNodes; i++ {
nodeKey := fmt.Sprintf("%s#%d", target.URL, i)
hash := c.hashKeyString(nodeKey)
circle[hash] = target
sortedHashes = append(sortedHashes, hash)
}
}
// 排序哈希值
sort.Slice(sortedHashes, func(i, j int) bool {
return sortedHashes[i] < sortedHashes[j]
})
if len(sortedHashes) == 0 {
return nil
}
// 计算键的哈希值
hash := c.hashKeyString(key)
// 二分查找最近的节点
idx := sort.Search(len(sortedHashes), func(i int) bool {
return sortedHashes[i] >= hash
})
// 环形回绕
if idx >= len(sortedHashes) {
idx = 0
}
return circle[sortedHashes[idx]]
}
// 验证接口实现
var _ Balancer = (*ConsistentHash)(nil)

View File

@ -235,6 +235,21 @@ func (h *HealthChecker) MarkUnhealthy(target *loadbalance.Target) {
target.Healthy.Store(false)
}
// MarkHealthy 将目标标记为健康。
// 此方法用于故障转移成功后,将之前失败的目标恢复为健康状态。
//
// 在故障转移成功后的使用示例:
//
// if err := retryRequest(target, req, resp); err == nil {
// healthChecker.MarkHealthy(target)
// }
//
// 注意:此方法与主动健康检查独立运作,用于快速恢复
// 故障转移场景中已恢复的目标。
func (h *HealthChecker) MarkHealthy(target *loadbalance.Target) {
target.Healthy.Store(true)
}
// IsRunning 如果健康检查器当前正在运行,则返回 true。
func (h *HealthChecker) IsRunning() bool {
return h.running.Load()

View File

@ -34,6 +34,7 @@ package proxy
import (
"errors"
"fmt"
"strings"
"sync"
"time"
@ -207,107 +208,190 @@ func createHostClient(targetURL string, timeout config.ProxyTimeout, transportCf
// 4. 将响应复制回客户端
//
// 如果没有可用的健康目标,返回 502 Bad Gateway。
// 如果后端请求失败,返回相应的错误响应
// 如果后端请求失败,根据 next_upstream 配置尝试下一个目标
func (p *Proxy) ServeHTTP(ctx *fasthttp.RequestCtx) {
// 使用负载均衡器选择目标
target := p.selectTarget(ctx)
if target == nil {
ctx.Error("Bad Gateway: no healthy upstream", fasthttp.StatusBadGateway)
return
// 故障转移配置
maxTries := p.config.NextUpstream.Tries
if maxTries <= 0 {
maxTries = 1 // 默认不重试
}
httpCodes := p.config.NextUpstream.HTTPCodes
if len(httpCodes) == 0 {
// 默认重试的状态码
httpCodes = []int{502, 503, 504}
}
// 获取所选目标的客户端
client := p.getClient(target.URL)
if client == nil {
ctx.Error("Bad Gateway: upstream client unavailable", fasthttp.StatusBadGateway)
return
}
// 已尝试的目标列表(用于故障转移时排除)
attemptedTargets := make([]*loadbalance.Target, 0, maxTries)
// 增加连接计数(用于最少连接数负载均衡)
loadbalance.IncrementConnections(target)
defer loadbalance.DecrementConnections(target)
var lastErr error
// 检查是否为 WebSocket 升级请求
if isWebSocketRequest(ctx) {
p.handleWebSocket(ctx, target, client)
return
}
for attempt := 0; attempt < maxTries; attempt++ {
// 选择目标(第一次使用普通选择,后续排除已失败目标)
var target *loadbalance.Target
if attempt == 0 {
target = p.selectTarget(ctx)
} else {
target = p.selectTargetExcluding(ctx, attemptedTargets)
}
// 准备请求
req := &ctx.Request
// 修改请求头
p.modifyRequestHeaders(ctx, target)
// 尝试从缓存获取(如果启用)
if p.cache != nil {
cacheKey := p.buildCacheKey(ctx)
if entry, ok, stale := p.cache.Get(cacheKey); ok {
// 缓存命中
if !stale {
// 新鲜缓存,直接返回
p.writeCachedResponse(ctx, entry)
if target == nil {
if attempt == 0 {
ctx.Error("Bad Gateway: no healthy upstream", fasthttp.StatusBadGateway)
return
}
// 过期缓存,尝试后台刷新,同时返回旧数据
go p.backgroundRefresh(ctx, target, cacheKey)
p.writeCachedResponse(ctx, entry)
// 没有更多可用目标,返回最后一次错误
break
}
attemptedTargets = append(attemptedTargets, target)
// 获取所选目标的客户端
client := p.getClient(target.URL)
if client == nil {
// 标记为不健康并继续尝试下一个
if p.healthChecker != nil {
p.healthChecker.MarkUnhealthy(target)
}
continue
}
// 增加连接计数(用于最少连接数负载均衡)
loadbalance.IncrementConnections(target)
// 检查是否为 WebSocket 升级请求
if isWebSocketRequest(ctx) {
// WebSocket 使用 defer 确保连接计数释放
defer loadbalance.DecrementConnections(target)
p.handleWebSocket(ctx, target, client)
return
}
// 检查是否需要缓存锁(防止缓存击穿)
if done := p.cache.AcquireLock(cacheKey); done != nil {
// 有其他请求正在生成缓存,等待
<-done
// 重新尝试获取缓存
if entry, ok, _ := p.cache.Get(cacheKey); ok {
// 准备请求
req := &ctx.Request
// 修改请求头
p.modifyRequestHeaders(ctx, target)
// 尝试从缓存获取(如果启用)
if p.cache != nil && attempt == 0 {
cacheKey := p.buildCacheKey(ctx)
if entry, ok, stale := p.cache.Get(cacheKey); ok {
// 缓存命中
loadbalance.DecrementConnections(target)
if !stale {
// 新鲜缓存,直接返回
p.writeCachedResponse(ctx, entry)
return
}
// 过期缓存,尝试后台刷新,同时返回旧数据
go p.backgroundRefresh(ctx, target, cacheKey)
p.writeCachedResponse(ctx, entry)
return
}
// 检查是否需要缓存锁(防止缓存击穿)
if done := p.cache.AcquireLock(cacheKey); done != nil {
// 有其他请求正在生成缓存,等待
loadbalance.DecrementConnections(target)
<-done
// 重新尝试获取缓存
if entry, ok, _ := p.cache.Get(cacheKey); ok {
p.writeCachedResponse(ctx, entry)
return
}
// 缓存未命中,需要重新选择目标
loadbalance.IncrementConnections(target)
}
}
// 执行代理请求
err := client.Do(req, &ctx.Response)
if err != nil {
loadbalance.DecrementConnections(target)
// 被动健康检查:标记目标为不健康
if p.healthChecker != nil {
p.healthChecker.MarkUnhealthy(target)
}
// 释放缓存锁
if p.cache != nil && attempt == 0 {
p.cache.ReleaseLock(p.buildCacheKey(ctx), err)
}
lastErr = err
// 继续尝试下一个目标
continue
}
// 请求成功,减少连接计数
loadbalance.DecrementConnections(target)
// 检查响应状态码是否需要重试
statusCode := ctx.Response.StatusCode()
shouldRetry := false
for _, code := range httpCodes {
if statusCode == code {
shouldRetry = true
break
}
}
if shouldRetry {
// 释放缓存锁
if p.cache != nil && attempt == 0 {
p.cache.ReleaseLock(p.buildCacheKey(ctx), fmt.Errorf("HTTP %d", statusCode))
}
// 如果不是最后一次尝试,继续下一个目标
if attempt < maxTries-1 {
// 标记目标为不健康
if p.healthChecker != nil {
p.healthChecker.MarkUnhealthy(target)
}
continue
}
}
// 重试成功时恢复健康状态
if attempt > 0 && p.healthChecker != nil {
p.healthChecker.MarkHealthy(target)
}
// 存入缓存(如果启用且响应可缓存)
if p.cache != nil {
cacheKey := p.buildCacheKey(ctx)
if statusCode >= 200 && statusCode < 300 {
// 提取响应头
headers := make(map[string]string)
for key, value := range ctx.Response.Header.All() {
headers[string(key)] = string(value)
}
p.cache.Set(cacheKey, ctx.Response.Body(), headers, statusCode, p.config.Cache.MaxAge)
}
p.cache.ReleaseLock(cacheKey, nil)
}
// 修改响应头
p.modifyResponseHeaders(ctx)
return
}
// 执行代理请求
err := client.Do(req, &ctx.Response)
if err != nil {
// 被动健康检查:标记目标为不健康
if p.healthChecker != nil {
p.healthChecker.MarkUnhealthy(target)
}
// 释放缓存锁
if p.cache != nil {
p.cache.ReleaseLock(p.buildCacheKey(ctx), err)
}
// 所有尝试都失败
if lastErr != nil {
// 处理不同类型的错误
if errors.Is(err, fasthttp.ErrTimeout) {
if errors.Is(lastErr, fasthttp.ErrTimeout) {
ctx.Error("Gateway Timeout", fasthttp.StatusGatewayTimeout)
} else if errors.Is(err, fasthttp.ErrConnectionClosed) {
} else if errors.Is(lastErr, fasthttp.ErrConnectionClosed) {
ctx.Error("Bad Gateway: upstream connection closed", fasthttp.StatusBadGateway)
} else {
ctx.Error("Bad Gateway", fasthttp.StatusBadGateway)
}
return
} else {
ctx.Error("Bad Gateway: all upstreams failed", fasthttp.StatusBadGateway)
}
// 存入缓存(如果启用且响应可缓存)
if p.cache != nil {
cacheKey := p.buildCacheKey(ctx)
status := ctx.Response.StatusCode()
if status >= 200 && status < 300 {
// 提取响应头
headers := make(map[string]string)
for key, value := range ctx.Response.Header.All() {
headers[string(key)] = string(value)
}
p.cache.Set(cacheKey, ctx.Response.Body(), headers, status, p.config.Cache.MaxAge)
}
p.cache.ReleaseLock(cacheKey, nil)
}
// 修改响应头
p.modifyResponseHeaders(ctx)
}
// selectTarget 使用配置的负载均衡器选择后端目标。
@ -340,6 +424,35 @@ func (p *Proxy) selectTarget(ctx *fasthttp.RequestCtx) *loadbalance.Target {
return balancer.Select(targets)
}
// selectTargetExcluding 选择后端目标,排除已尝试失败的目标。
// 用于故障转移场景,避免重复选择已失败的目标。
// 如果没有可用的健康目标则返回 nil。
func (p *Proxy) selectTargetExcluding(ctx *fasthttp.RequestCtx, excluded []*loadbalance.Target) *loadbalance.Target {
p.mu.RLock()
balancer := p.balancer
targets := p.targets
p.mu.RUnlock()
if len(targets) == 0 {
return nil
}
// 对于 IPHash 负载均衡器,提取客户端 IP
if ipHash, ok := balancer.(*loadbalance.IPHash); ok {
clientIP := netutil.ExtractClientIP(ctx)
return ipHash.SelectExcludingByIP(targets, excluded, clientIP)
}
// 对于一致性哈希,根据 hash_key 配置选择
if ch, ok := balancer.(*loadbalance.ConsistentHash); ok {
hashKey := ch.GetHashKey()
key := p.extractHashKey(ctx, hashKey)
return ch.SelectExcludingByKey(targets, excluded, key)
}
return balancer.SelectExcluding(targets, excluded)
}
// extractHashKey 根据配置提取哈希键值。
func (p *Proxy) extractHashKey(ctx *fasthttp.RequestCtx, hashKey string) string {
switch {