feat(proxy,config): 增强健康检查和缓存配置
健康检查增强: - 添加 HealthMatch 接口支持自定义健康判断逻辑 - 支持状态码范围、响应体正则、响应头匹配 - 集成 SlowStartManager 实现慢启动 配置增强: - HealthCheckConfig 新增 Match 和 SlowStart 字段 - ProxyBufferingConfig 支持 Buffers 配置字符串格式 - 新增 ProxyCachePathConfig 磁盘缓存路径配置 - 添加 StaleIfError/StaleIfTimeout 缓存配置 Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
parent
26a7a58265
commit
92b7040a5f
@ -22,6 +22,8 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
"os"
|
"os"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"gopkg.in/yaml.v3"
|
"gopkg.in/yaml.v3"
|
||||||
@ -72,17 +74,18 @@ const (
|
|||||||
// // 处理每个服务器配置
|
// // 处理每个服务器配置
|
||||||
// }
|
// }
|
||||||
type Config struct {
|
type Config struct {
|
||||||
Mode ServerMode `yaml:"mode"`
|
Mode ServerMode `yaml:"mode"`
|
||||||
Variables VariablesConfig `yaml:"variables"`
|
Variables VariablesConfig `yaml:"variables"`
|
||||||
Logging LoggingConfig `yaml:"logging"`
|
Logging LoggingConfig `yaml:"logging"`
|
||||||
Servers []ServerConfig `yaml:"servers"`
|
Servers []ServerConfig `yaml:"servers"`
|
||||||
Stream []StreamConfig `yaml:"stream"`
|
Stream []StreamConfig `yaml:"stream"`
|
||||||
Monitoring MonitoringConfig `yaml:"monitoring"`
|
Monitoring MonitoringConfig `yaml:"monitoring"`
|
||||||
HTTP3 HTTP3Config `yaml:"http3"`
|
HTTP3 HTTP3Config `yaml:"http3"`
|
||||||
Resolver ResolverConfig `yaml:"resolver"`
|
Resolver ResolverConfig `yaml:"resolver"`
|
||||||
Performance PerformanceConfig `yaml:"performance"`
|
Performance PerformanceConfig `yaml:"performance"`
|
||||||
Shutdown ShutdownConfig `yaml:"shutdown"`
|
Shutdown ShutdownConfig `yaml:"shutdown"`
|
||||||
Include []IncludeConfig `yaml:"include"` // 配置引入,支持从其他文件引入配置片段
|
Include []IncludeConfig `yaml:"include"` // 配置引入,支持从其他文件引入配置片段
|
||||||
|
CachePath *ProxyCachePathConfig `yaml:"cache_path"` // 缓存路径配置(磁盘持久化)
|
||||||
}
|
}
|
||||||
|
|
||||||
// IncludeConfig 配置引入配置。
|
// IncludeConfig 配置引入配置。
|
||||||
@ -390,6 +393,85 @@ type ProxyBufferingConfig struct {
|
|||||||
// BufferSize 响应缓冲区大小(字节)
|
// BufferSize 响应缓冲区大小(字节)
|
||||||
// 0 表示使用默认值
|
// 0 表示使用默认值
|
||||||
BufferSize int `yaml:"buffer_size"`
|
BufferSize int `yaml:"buffer_size"`
|
||||||
|
|
||||||
|
// Buffers 多缓冲区配置字符串
|
||||||
|
// 格式:"数量 大小" 或 "数量1 大小1 数量2 大小2 ..."
|
||||||
|
// 例如:"8 16k" 表示 8 个 16KB 缓冲区
|
||||||
|
// 例如:"4 4k 8 16k" 表示 4 个 4KB + 8 个 16KB 缓冲区
|
||||||
|
Buffers string `yaml:"buffers"`
|
||||||
|
|
||||||
|
// BufferCount 缓冲区数量(解析后)
|
||||||
|
BufferCount int `yaml:"-"`
|
||||||
|
|
||||||
|
// BufferSizeEach 每个缓冲区大小(字节,解析后)
|
||||||
|
BufferSizeEach int `yaml:"-"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// ParseBuffers 解析 Buffers 配置字符串。
|
||||||
|
//
|
||||||
|
// 支持格式:
|
||||||
|
// - "8 16k" → 8 个 16KB 缓冲区
|
||||||
|
// - "4 4k" → 4 个 4KB 缓冲区
|
||||||
|
//
|
||||||
|
// 大小单位:
|
||||||
|
// - k 或 K: KB (1024 字节)
|
||||||
|
// - m 或 M: MB (1024 * 1024 字节)
|
||||||
|
// - 无单位: 字节
|
||||||
|
func (c *ProxyBufferingConfig) ParseBuffers() {
|
||||||
|
if c.Buffers == "" {
|
||||||
|
// 向后兼容:使用 BufferSize
|
||||||
|
if c.BufferSize > 0 {
|
||||||
|
c.BufferCount = 1
|
||||||
|
c.BufferSizeEach = c.BufferSize
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
parts := strings.Fields(c.Buffers)
|
||||||
|
if len(parts) < 2 {
|
||||||
|
return // 无效格式
|
||||||
|
}
|
||||||
|
|
||||||
|
count, err := strconv.Atoi(parts[0])
|
||||||
|
if err != nil || count <= 0 {
|
||||||
|
return // 无效数量
|
||||||
|
}
|
||||||
|
|
||||||
|
sizeEach, err := parseSize(parts[1])
|
||||||
|
if err != nil || sizeEach <= 0 {
|
||||||
|
return // 无效大小
|
||||||
|
}
|
||||||
|
|
||||||
|
c.BufferCount = count
|
||||||
|
c.BufferSizeEach = sizeEach
|
||||||
|
}
|
||||||
|
|
||||||
|
// parseSize 解析大小字符串(支持 k, m 单位)。
|
||||||
|
func parseSize(s string) (int, error) {
|
||||||
|
s = strings.TrimSpace(s)
|
||||||
|
if s == "" {
|
||||||
|
return 0, strconv.ErrSyntax
|
||||||
|
}
|
||||||
|
|
||||||
|
// 提取单位
|
||||||
|
unit := strings.ToLower(s[len(s)-1:])
|
||||||
|
var multiplier int = 1
|
||||||
|
numStr := s
|
||||||
|
|
||||||
|
if unit == "k" {
|
||||||
|
multiplier = 1024
|
||||||
|
numStr = s[:len(s)-1]
|
||||||
|
} else if unit == "m" {
|
||||||
|
multiplier = 1024 * 1024
|
||||||
|
numStr = s[:len(s)-1]
|
||||||
|
}
|
||||||
|
|
||||||
|
value, err := strconv.Atoi(numStr)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return value * multiplier, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// BalancerByLuaConfig Lua 负载均衡配置
|
// BalancerByLuaConfig Lua 负载均衡配置
|
||||||
@ -489,9 +571,18 @@ type ProxyTarget struct {
|
|||||||
// path: "/health"
|
// path: "/health"
|
||||||
// timeout: 5s
|
// timeout: 5s
|
||||||
type HealthCheckConfig struct {
|
type HealthCheckConfig struct {
|
||||||
Path string `yaml:"path"`
|
Path string `yaml:"path"`
|
||||||
Interval time.Duration `yaml:"interval"`
|
Interval time.Duration `yaml:"interval"`
|
||||||
Timeout time.Duration `yaml:"timeout"`
|
Timeout time.Duration `yaml:"timeout"`
|
||||||
|
Match *HealthMatchConfig `yaml:"match"` // 健康检查匹配配置
|
||||||
|
SlowStart time.Duration `yaml:"slow_start"` // 慢启动时间
|
||||||
|
}
|
||||||
|
|
||||||
|
// HealthMatchConfig 健康检查匹配配置。
|
||||||
|
type HealthMatchConfig struct {
|
||||||
|
Status []string `yaml:"status"` // 状态码范围列表
|
||||||
|
Body string `yaml:"body"` // 响应体正则表达式
|
||||||
|
Headers map[string]string `yaml:"headers"` // 响应头匹配
|
||||||
}
|
}
|
||||||
|
|
||||||
// ProxyTimeout 代理超时配置。
|
// ProxyTimeout 代理超时配置。
|
||||||
@ -576,6 +667,54 @@ type ProxyHeaders struct {
|
|||||||
CookiePath string `yaml:"cookie_path"`
|
CookiePath string `yaml:"cookie_path"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ProxyCachePathConfig 缓存路径配置(磁盘持久化)。
|
||||||
|
//
|
||||||
|
// 配置磁盘缓存路径和相关参数,支持 L1/L2 分层缓存架构。
|
||||||
|
// 配置后,代理缓存将持久化到磁盘,服务重启后可恢复。
|
||||||
|
//
|
||||||
|
// 注意事项:
|
||||||
|
// - Path 为必填项,指定缓存根目录
|
||||||
|
// - Levels 支持最多 3 级目录(如 "1:2:2")
|
||||||
|
// - MaxSize 为 0 表示不限制大小
|
||||||
|
// - L1MaxEntries/L1MaxSize 为 0 时使用默认值
|
||||||
|
//
|
||||||
|
// 使用示例:
|
||||||
|
//
|
||||||
|
// cache_path:
|
||||||
|
// path: "/var/cache/lolly"
|
||||||
|
// levels: "1:2"
|
||||||
|
// max_size: "1GB"
|
||||||
|
// inactive: "60m"
|
||||||
|
// l1_max_entries: 10000
|
||||||
|
type ProxyCachePathConfig struct {
|
||||||
|
// Path 缓存根目录
|
||||||
|
Path string `yaml:"path"`
|
||||||
|
|
||||||
|
// Levels 目录层级,如 "1:2" 表示两级目录
|
||||||
|
Levels string `yaml:"levels"`
|
||||||
|
|
||||||
|
// MaxSize 最大缓存大小(字节)
|
||||||
|
MaxSize int64 `yaml:"max_size"`
|
||||||
|
|
||||||
|
// Inactive 未访问淘汰时间
|
||||||
|
Inactive time.Duration `yaml:"inactive"`
|
||||||
|
|
||||||
|
// Purger 是否启用后台清理
|
||||||
|
Purger bool `yaml:"purger"`
|
||||||
|
|
||||||
|
// PurgerInterval 清理间隔
|
||||||
|
PurgerInterval time.Duration `yaml:"purger_interval"`
|
||||||
|
|
||||||
|
// L1MaxEntries L1 最大条目数
|
||||||
|
L1MaxEntries int64 `yaml:"l1_max_entries"`
|
||||||
|
|
||||||
|
// L1MaxSize L1 最大内存大小
|
||||||
|
L1MaxSize int64 `yaml:"l1_max_size"`
|
||||||
|
|
||||||
|
// PromoteThreshold 提升到 L1 的访问阈值
|
||||||
|
PromoteThreshold int `yaml:"promote_threshold"`
|
||||||
|
}
|
||||||
|
|
||||||
// ProxyCacheConfig 代理缓存配置。
|
// ProxyCacheConfig 代理缓存配置。
|
||||||
//
|
//
|
||||||
// 缓存后端响应,减少重复请求,提高响应速度。
|
// 缓存后端响应,减少重复请求,提高响应速度。
|
||||||
@ -596,6 +735,8 @@ type ProxyHeaders struct {
|
|||||||
type ProxyCacheConfig struct {
|
type ProxyCacheConfig struct {
|
||||||
MaxAge time.Duration `yaml:"max_age"`
|
MaxAge time.Duration `yaml:"max_age"`
|
||||||
StaleWhileRevalidate time.Duration `yaml:"stale_while_revalidate"`
|
StaleWhileRevalidate time.Duration `yaml:"stale_while_revalidate"`
|
||||||
|
StaleIfError time.Duration `yaml:"stale_if_error"` // 错误时使用过期缓存
|
||||||
|
StaleIfTimeout time.Duration `yaml:"stale_if_timeout"` // 超时时使用过期缓存
|
||||||
Enabled bool `yaml:"enabled"`
|
Enabled bool `yaml:"enabled"`
|
||||||
CacheLock bool `yaml:"cache_lock"`
|
CacheLock bool `yaml:"cache_lock"`
|
||||||
Methods []string `yaml:"methods"`
|
Methods []string `yaml:"methods"`
|
||||||
|
|||||||
@ -414,3 +414,80 @@ func TestConfigMethods(t *testing.T) {
|
|||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestProxyBufferingConfig_ParseBuffers(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
buffers string
|
||||||
|
bufferSize int
|
||||||
|
wantCount int
|
||||||
|
wantSizeEach int
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "empty uses buffer_size",
|
||||||
|
buffers: "",
|
||||||
|
bufferSize: 4096,
|
||||||
|
wantCount: 1,
|
||||||
|
wantSizeEach: 4096,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "8 16k format",
|
||||||
|
buffers: "8 16k",
|
||||||
|
wantCount: 8,
|
||||||
|
wantSizeEach: 16 * 1024,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "4 4k format",
|
||||||
|
buffers: "4 4k",
|
||||||
|
wantCount: 4,
|
||||||
|
wantSizeEach: 4 * 1024,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "2 1m format",
|
||||||
|
buffers: "2 1m",
|
||||||
|
wantCount: 2,
|
||||||
|
wantSizeEach: 1024 * 1024,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "bytes without unit",
|
||||||
|
buffers: "4 8192",
|
||||||
|
wantCount: 4,
|
||||||
|
wantSizeEach: 8192,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "uppercase K",
|
||||||
|
buffers: "8 16K",
|
||||||
|
wantCount: 8,
|
||||||
|
wantSizeEach: 16 * 1024,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "invalid format",
|
||||||
|
buffers: "invalid",
|
||||||
|
wantCount: 0,
|
||||||
|
wantSizeEach: 0,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "missing size",
|
||||||
|
buffers: "8",
|
||||||
|
wantCount: 0,
|
||||||
|
wantSizeEach: 0,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
cfg := &ProxyBufferingConfig{
|
||||||
|
Buffers: tt.buffers,
|
||||||
|
BufferSize: tt.bufferSize,
|
||||||
|
}
|
||||||
|
cfg.ParseBuffers()
|
||||||
|
|
||||||
|
if cfg.BufferCount != tt.wantCount {
|
||||||
|
t.Errorf("BufferCount = %d, want %d", cfg.BufferCount, tt.wantCount)
|
||||||
|
}
|
||||||
|
if cfg.BufferSizeEach != tt.wantSizeEach {
|
||||||
|
t.Errorf("BufferSizeEach = %d, want %d", cfg.BufferSizeEach, tt.wantSizeEach)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@ -56,13 +56,15 @@ const healthPath = "/health"
|
|||||||
// checker.Start()
|
// checker.Start()
|
||||||
// defer checker.Stop()
|
// defer checker.Stop()
|
||||||
type HealthChecker struct {
|
type HealthChecker struct {
|
||||||
stopCh chan struct{}
|
stopCh chan struct{}
|
||||||
client *fasthttp.Client
|
client *fasthttp.Client
|
||||||
path string
|
path string
|
||||||
targets []*loadbalance.Target
|
targets []*loadbalance.Target
|
||||||
interval time.Duration
|
interval time.Duration
|
||||||
timeout time.Duration
|
timeout time.Duration
|
||||||
running atomic.Bool
|
running atomic.Bool
|
||||||
|
matcher HealthMatch // 健康检查匹配器
|
||||||
|
slowStartManager *loadbalance.SlowStartManager // 慢启动管理器
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewHealthChecker 使用指定的目标和配置创建一个新的 HealthChecker。
|
// NewHealthChecker 使用指定的目标和配置创建一个新的 HealthChecker。
|
||||||
@ -90,12 +92,33 @@ func NewHealthChecker(targets []*loadbalance.Target, cfg *config.HealthCheckConf
|
|||||||
path = healthPath
|
path = healthPath
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 创建健康检查匹配器
|
||||||
|
var matcher HealthMatch
|
||||||
|
if cfg.Match != nil {
|
||||||
|
matcher = NewHealthMatch(&HealthMatchConfig{
|
||||||
|
Status: cfg.Match.Status,
|
||||||
|
Body: cfg.Match.Body,
|
||||||
|
Headers: cfg.Match.Headers,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
if matcher == nil {
|
||||||
|
matcher = DefaultHealthMatch()
|
||||||
|
}
|
||||||
|
|
||||||
|
// 创建慢启动管理器
|
||||||
|
var slowStartManager *loadbalance.SlowStartManager
|
||||||
|
if cfg.SlowStart > 0 {
|
||||||
|
slowStartManager = loadbalance.NewSlowStartManager(cfg.SlowStart)
|
||||||
|
}
|
||||||
|
|
||||||
return &HealthChecker{
|
return &HealthChecker{
|
||||||
targets: targets,
|
targets: targets,
|
||||||
interval: interval,
|
interval: interval,
|
||||||
timeout: timeout,
|
timeout: timeout,
|
||||||
path: path,
|
path: path,
|
||||||
stopCh: make(chan struct{}),
|
stopCh: make(chan struct{}),
|
||||||
|
matcher: matcher,
|
||||||
|
slowStartManager: slowStartManager,
|
||||||
client: &fasthttp.Client{
|
client: &fasthttp.Client{
|
||||||
ReadTimeout: timeout,
|
ReadTimeout: timeout,
|
||||||
WriteTimeout: timeout,
|
WriteTimeout: timeout,
|
||||||
@ -114,6 +137,9 @@ func (h *HealthChecker) Start() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
h.running.Store(true)
|
h.running.Store(true)
|
||||||
|
if h.slowStartManager != nil {
|
||||||
|
h.slowStartManager.Start()
|
||||||
|
}
|
||||||
go h.run()
|
go h.run()
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -125,6 +151,9 @@ func (h *HealthChecker) Stop() {
|
|||||||
if !h.running.CompareAndSwap(true, false) {
|
if !h.running.CompareAndSwap(true, false) {
|
||||||
return // 已经停止,直接返回
|
return // 已经停止,直接返回
|
||||||
}
|
}
|
||||||
|
if h.slowStartManager != nil {
|
||||||
|
h.slowStartManager.Stop()
|
||||||
|
}
|
||||||
close(h.stopCh)
|
close(h.stopCh)
|
||||||
// 重新创建 stopCh 以支持后续 Start
|
// 重新创建 stopCh 以支持后续 Start
|
||||||
h.stopCh = make(chan struct{})
|
h.stopCh = make(chan struct{})
|
||||||
@ -172,12 +201,12 @@ func (h *HealthChecker) checkAll() {
|
|||||||
//
|
//
|
||||||
// 目标被认为健康,如果满足以下条件:
|
// 目标被认为健康,如果满足以下条件:
|
||||||
// - HTTP 请求成功
|
// - HTTP 请求成功
|
||||||
// - 响应状态码在 200 到 299 之间
|
// - matcher.Match 返回 true
|
||||||
//
|
//
|
||||||
// 目标被标记为不健康,如果满足以下条件:
|
// 目标被标记为不健康,如果满足以下条件:
|
||||||
// - 连接失败
|
// - 连接失败
|
||||||
// - 请求超时
|
// - 请求超时
|
||||||
// - 响应状态码不是 2xx
|
// - matcher.Match 返回 false
|
||||||
func (h *HealthChecker) checkTarget(target *loadbalance.Target) {
|
func (h *HealthChecker) checkTarget(target *loadbalance.Target) {
|
||||||
// 构建健康检查 URL
|
// 构建健康检查 URL
|
||||||
url := target.URL + h.path
|
url := target.URL + h.path
|
||||||
@ -196,16 +225,23 @@ func (h *HealthChecker) checkTarget(target *loadbalance.Target) {
|
|||||||
err := h.client.DoTimeout(req, resp, h.timeout)
|
err := h.client.DoTimeout(req, resp, h.timeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// 连接失败或超时 - 标记为不健康
|
// 连接失败或超时 - 标记为不健康
|
||||||
target.Healthy.Store(false)
|
h.MarkUnhealthy(target)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// 检查状态码 - 2xx 为健康
|
// 提取响应头(小写 key)
|
||||||
|
headers := make(map[string]string)
|
||||||
|
for key, value := range resp.Header.All() {
|
||||||
|
headers[string(key)] = string(value)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 使用 matcher 判断健康状态
|
||||||
statusCode := resp.StatusCode()
|
statusCode := resp.StatusCode()
|
||||||
if statusCode >= 200 && statusCode < 300 {
|
body := resp.Body()
|
||||||
target.Healthy.Store(true)
|
if h.matcher.Match(statusCode, body, headers) {
|
||||||
|
h.MarkHealthy(target)
|
||||||
} else {
|
} else {
|
||||||
target.Healthy.Store(false)
|
h.MarkUnhealthy(target)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -215,9 +251,13 @@ func (h *HealthChecker) checkTarget(target *loadbalance.Target) {
|
|||||||
//
|
//
|
||||||
// 同时调用 RecordFailure 记录软失败状态,配合 MaxFails/FailTimeout
|
// 同时调用 RecordFailure 记录软失败状态,配合 MaxFails/FailTimeout
|
||||||
// 实现失败计数和冷却机制。
|
// 实现失败计数和冷却机制。
|
||||||
|
// 同时通知 SlowStartManager 清除慢启动状态。
|
||||||
func (h *HealthChecker) MarkUnhealthy(target *loadbalance.Target) {
|
func (h *HealthChecker) MarkUnhealthy(target *loadbalance.Target) {
|
||||||
target.Healthy.Store(false)
|
target.Healthy.Store(false)
|
||||||
target.RecordFailure()
|
target.RecordFailure()
|
||||||
|
if h.slowStartManager != nil {
|
||||||
|
h.slowStartManager.OnTargetUnhealthy(target)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// MarkHealthy 将目标标记为健康。
|
// MarkHealthy 将目标标记为健康。
|
||||||
@ -225,9 +265,13 @@ func (h *HealthChecker) MarkUnhealthy(target *loadbalance.Target) {
|
|||||||
//
|
//
|
||||||
// 同时调用 RecordSuccess 重置软失败状态(failCount/failedUntil),
|
// 同时调用 RecordSuccess 重置软失败状态(failCount/failedUntil),
|
||||||
// 但不修改 Healthy 标志——健康检查器对 Healthy 拥有权威。
|
// 但不修改 Healthy 标志——健康检查器对 Healthy 拥有权威。
|
||||||
|
// 同时通知 SlowStartManager 开始慢启动。
|
||||||
func (h *HealthChecker) MarkHealthy(target *loadbalance.Target) {
|
func (h *HealthChecker) MarkHealthy(target *loadbalance.Target) {
|
||||||
target.Healthy.Store(true)
|
target.Healthy.Store(true)
|
||||||
target.RecordSuccess()
|
target.RecordSuccess()
|
||||||
|
if h.slowStartManager != nil {
|
||||||
|
h.slowStartManager.OnTargetHealthy(target)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// IsRunning 如果健康检查器当前正在运行,则返回 true。
|
// IsRunning 如果健康检查器当前正在运行,则返回 true。
|
||||||
|
|||||||
196
internal/proxy/health_match.go
Normal file
196
internal/proxy/health_match.go
Normal file
@ -0,0 +1,196 @@
|
|||||||
|
// Package proxy 提供 HTTP 代理功能。
|
||||||
|
//
|
||||||
|
// 该文件实现 HealthMatch 健康检查匹配接口,支持:
|
||||||
|
// - 默认 2xx 状态码判断
|
||||||
|
// - 自定义状态码范围匹配
|
||||||
|
// - 响应体正则匹配
|
||||||
|
// - 响应头匹配
|
||||||
|
//
|
||||||
|
// 主要用途:
|
||||||
|
//
|
||||||
|
// 灵活定义后端服务器健康判断逻辑,替代硬编码的 2xx 判断。
|
||||||
|
//
|
||||||
|
// 作者:xfy
|
||||||
|
package proxy
|
||||||
|
|
||||||
|
import (
|
||||||
|
"regexp"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
)
|
||||||
|
|
||||||
|
// HealthMatch 定义健康检查匹配接口。
|
||||||
|
//
|
||||||
|
// 用于判断健康检查响应是否表示目标健康。
|
||||||
|
type HealthMatch interface {
|
||||||
|
// Match 判断健康检查响应是否表示目标健康。
|
||||||
|
//
|
||||||
|
// 参数:
|
||||||
|
// - status: HTTP 状态码
|
||||||
|
// - body: 响应体内容
|
||||||
|
// - headers: 响应头(key 为小写)
|
||||||
|
//
|
||||||
|
// 返回值:
|
||||||
|
// - true: 目标健康
|
||||||
|
// - false: 目标不健康
|
||||||
|
Match(status int, body []byte, headers map[string]string) bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// defaultHealthMatch 默认健康检查匹配器。
|
||||||
|
//
|
||||||
|
// 判断逻辑:状态码为 2xx 即健康。
|
||||||
|
type defaultHealthMatch struct{}
|
||||||
|
|
||||||
|
// Match 实现 HealthMatch 接口。
|
||||||
|
func (m *defaultHealthMatch) Match(status int, body []byte, headers map[string]string) bool {
|
||||||
|
return status >= 200 && status < 300
|
||||||
|
}
|
||||||
|
|
||||||
|
// customHealthMatch 自定义健康检查匹配器。
|
||||||
|
//
|
||||||
|
// 支持状态码范围、响应体正则、响应头匹配。
|
||||||
|
type customHealthMatch struct {
|
||||||
|
statusRanges []statusRange // 状态码范围列表
|
||||||
|
bodyRegex *regexp.Regexp // 响应体正则(可选)
|
||||||
|
headerMatches []headerMatch // 响应头匹配列表(可选)
|
||||||
|
}
|
||||||
|
|
||||||
|
// statusRange 表示状态码范围。
|
||||||
|
type statusRange struct {
|
||||||
|
min int
|
||||||
|
max int
|
||||||
|
}
|
||||||
|
|
||||||
|
// headerMatch 表示响应头匹配条件。
|
||||||
|
type headerMatch struct {
|
||||||
|
key string
|
||||||
|
value string
|
||||||
|
}
|
||||||
|
|
||||||
|
// Match 实现 HealthMatch 接口。
|
||||||
|
func (m *customHealthMatch) Match(status int, body []byte, headers map[string]string) bool {
|
||||||
|
// 1. 检查状态码
|
||||||
|
if !m.matchStatus(status) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2. 检查响应体正则(如果配置)
|
||||||
|
if m.bodyRegex != nil && !m.bodyRegex.Match(body) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// 3. 检查响应头(如果配置)
|
||||||
|
for _, hm := range m.headerMatches {
|
||||||
|
value, exists := headers[hm.key]
|
||||||
|
if !exists || value != hm.value {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
// matchStatus 检查状态码是否匹配任一范围。
|
||||||
|
func (m *customHealthMatch) matchStatus(status int) bool {
|
||||||
|
for _, r := range m.statusRanges {
|
||||||
|
if status >= r.min && status <= r.max {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// HealthMatchConfig 健康检查匹配配置。
|
||||||
|
type HealthMatchConfig struct {
|
||||||
|
// Status 状态码范围列表,如 ["200-299", "301"]
|
||||||
|
Status []string `yaml:"status"`
|
||||||
|
|
||||||
|
// Body 响应体正则表达式
|
||||||
|
Body string `yaml:"body"`
|
||||||
|
|
||||||
|
// Headers 响应头匹配,如 {"Content-Type": "application/json"}
|
||||||
|
Headers map[string]string `yaml:"headers"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewHealthMatch 从配置创建健康检查匹配器。
|
||||||
|
//
|
||||||
|
// 如果配置为空或无效,返回默认匹配器(2xx 判断)。
|
||||||
|
func NewHealthMatch(cfg *HealthMatchConfig) HealthMatch {
|
||||||
|
if cfg == nil {
|
||||||
|
return &defaultHealthMatch{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 解析状态码范围
|
||||||
|
var ranges []statusRange
|
||||||
|
for _, s := range cfg.Status {
|
||||||
|
r, err := parseStatusRange(s)
|
||||||
|
if err != nil {
|
||||||
|
continue // 忽略无效范围
|
||||||
|
}
|
||||||
|
ranges = append(ranges, r)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 如果没有有效状态码范围,使用默认 2xx
|
||||||
|
if len(ranges) == 0 {
|
||||||
|
ranges = []statusRange{{min: 200, max: 299}}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 解析响应体正则
|
||||||
|
var bodyRegex *regexp.Regexp
|
||||||
|
if cfg.Body != "" {
|
||||||
|
bodyRegex = regexp.MustCompile(cfg.Body) // 配置加载时预编译
|
||||||
|
}
|
||||||
|
|
||||||
|
// 解析响应头匹配
|
||||||
|
var headerMatches []headerMatch
|
||||||
|
for k, v := range cfg.Headers {
|
||||||
|
headerMatches = append(headerMatches, headerMatch{
|
||||||
|
key: strings.ToLower(k), // 统一小写
|
||||||
|
value: v,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
return &customHealthMatch{
|
||||||
|
statusRanges: ranges,
|
||||||
|
bodyRegex: bodyRegex,
|
||||||
|
headerMatches: headerMatches,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// parseStatusRange 解析状态码范围字符串。
|
||||||
|
//
|
||||||
|
// 支持格式:
|
||||||
|
// - "200" → 单个状态码
|
||||||
|
// - "200-299" → 范围
|
||||||
|
func parseStatusRange(s string) (statusRange, error) {
|
||||||
|
s = strings.TrimSpace(s)
|
||||||
|
|
||||||
|
// 尝试解析范围
|
||||||
|
if strings.Contains(s, "-") {
|
||||||
|
parts := strings.Split(s, "-")
|
||||||
|
if len(parts) != 2 {
|
||||||
|
return statusRange{}, strconv.ErrSyntax
|
||||||
|
}
|
||||||
|
|
||||||
|
min, err1 := strconv.Atoi(strings.TrimSpace(parts[0]))
|
||||||
|
max, err2 := strconv.Atoi(strings.TrimSpace(parts[1]))
|
||||||
|
if err1 != nil || err2 != nil {
|
||||||
|
return statusRange{}, strconv.ErrSyntax
|
||||||
|
}
|
||||||
|
|
||||||
|
return statusRange{min: min, max: max}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// 单个状态码
|
||||||
|
code, err := strconv.Atoi(s)
|
||||||
|
if err != nil {
|
||||||
|
return statusRange{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return statusRange{min: code, max: code}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// DefaultHealthMatch 返回默认健康检查匹配器。
|
||||||
|
func DefaultHealthMatch() HealthMatch {
|
||||||
|
return &defaultHealthMatch{}
|
||||||
|
}
|
||||||
282
internal/proxy/health_match_test.go
Normal file
282
internal/proxy/health_match_test.go
Normal file
@ -0,0 +1,282 @@
|
|||||||
|
package proxy
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestDefaultHealthMatch(t *testing.T) {
|
||||||
|
m := DefaultHealthMatch()
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
status int
|
||||||
|
want bool
|
||||||
|
}{
|
||||||
|
{200, true},
|
||||||
|
{201, true},
|
||||||
|
{299, true},
|
||||||
|
{300, false},
|
||||||
|
{400, false},
|
||||||
|
{500, false},
|
||||||
|
{199, false},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run("", func(t *testing.T) {
|
||||||
|
got := m.Match(tt.status, nil, nil)
|
||||||
|
if got != tt.want {
|
||||||
|
t.Errorf("Match(%d) = %v, want %v", tt.status, got, tt.want)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCustomHealthMatch_StatusRange(t *testing.T) {
|
||||||
|
cfg := &HealthMatchConfig{
|
||||||
|
Status: []string{"200-299", "301", "302"},
|
||||||
|
}
|
||||||
|
m := NewHealthMatch(cfg)
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
status int
|
||||||
|
want bool
|
||||||
|
}{
|
||||||
|
{200, true},
|
||||||
|
{250, true},
|
||||||
|
{299, true},
|
||||||
|
{301, true},
|
||||||
|
{302, true},
|
||||||
|
{300, false}, // 不在范围内
|
||||||
|
{303, false},
|
||||||
|
{400, false},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run("", func(t *testing.T) {
|
||||||
|
got := m.Match(tt.status, nil, nil)
|
||||||
|
if got != tt.want {
|
||||||
|
t.Errorf("Match(%d) = %v, want %v", tt.status, got, tt.want)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCustomHealthMatch_BodyRegex(t *testing.T) {
|
||||||
|
cfg := &HealthMatchConfig{
|
||||||
|
Status: []string{"200"},
|
||||||
|
Body: `"status":"ok"`,
|
||||||
|
}
|
||||||
|
m := NewHealthMatch(cfg)
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
status int
|
||||||
|
body string
|
||||||
|
want bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "matching body",
|
||||||
|
status: 200,
|
||||||
|
body: `{"status":"ok","data":{}}`,
|
||||||
|
want: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "non-matching body",
|
||||||
|
status: 200,
|
||||||
|
body: `{"status":"error"}`,
|
||||||
|
want: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "wrong status",
|
||||||
|
status: 500,
|
||||||
|
body: `{"status":"ok"}`,
|
||||||
|
want: false,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
got := m.Match(tt.status, []byte(tt.body), nil)
|
||||||
|
if got != tt.want {
|
||||||
|
t.Errorf("Match() = %v, want %v", got, tt.want)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCustomHealthMatch_Headers(t *testing.T) {
|
||||||
|
cfg := &HealthMatchConfig{
|
||||||
|
Status: []string{"200"},
|
||||||
|
Headers: map[string]string{
|
||||||
|
"X-Health": "ok",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
m := NewHealthMatch(cfg)
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
status int
|
||||||
|
headers map[string]string
|
||||||
|
want bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "matching header",
|
||||||
|
status: 200,
|
||||||
|
headers: map[string]string{
|
||||||
|
"x-health": "ok",
|
||||||
|
},
|
||||||
|
want: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "missing header",
|
||||||
|
status: 200,
|
||||||
|
headers: map[string]string{
|
||||||
|
"content-type": "application/json",
|
||||||
|
},
|
||||||
|
want: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "wrong value",
|
||||||
|
status: 200,
|
||||||
|
headers: map[string]string{
|
||||||
|
"x-health": "error",
|
||||||
|
},
|
||||||
|
want: false,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
got := m.Match(tt.status, nil, tt.headers)
|
||||||
|
if got != tt.want {
|
||||||
|
t.Errorf("Match() = %v, want %v", got, tt.want)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestNewHealthMatch_NilConfig(t *testing.T) {
|
||||||
|
m := NewHealthMatch(nil)
|
||||||
|
|
||||||
|
// 应该返回默认匹配器
|
||||||
|
if !m.Match(200, nil, nil) {
|
||||||
|
t.Error("nil config should return default matcher")
|
||||||
|
}
|
||||||
|
if m.Match(300, nil, nil) {
|
||||||
|
t.Error("default matcher should not match 300")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestNewHealthMatch_EmptyStatus(t *testing.T) {
|
||||||
|
cfg := &HealthMatchConfig{
|
||||||
|
Status: []string{}, // 空
|
||||||
|
}
|
||||||
|
m := NewHealthMatch(cfg)
|
||||||
|
|
||||||
|
// 应该使用默认 2xx 范围
|
||||||
|
if !m.Match(200, nil, nil) {
|
||||||
|
t.Error("empty status should default to 2xx")
|
||||||
|
}
|
||||||
|
if m.Match(300, nil, nil) {
|
||||||
|
t.Error("empty status should default to 2xx, not match 300")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestParseStatusRange(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
input string
|
||||||
|
min int
|
||||||
|
max int
|
||||||
|
wantErr bool
|
||||||
|
}{
|
||||||
|
{"200", 200, 200, false},
|
||||||
|
{"200-299", 200, 299, false},
|
||||||
|
{" 200-299 ", 200, 299, false},
|
||||||
|
{"200 - 299", 200, 299, false},
|
||||||
|
{"abc", 0, 0, true},
|
||||||
|
{"200-abc", 0, 0, true},
|
||||||
|
{"200-300-400", 0, 0, true},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.input, func(t *testing.T) {
|
||||||
|
r, err := parseStatusRange(tt.input)
|
||||||
|
if tt.wantErr {
|
||||||
|
if err == nil {
|
||||||
|
t.Error("expected error, got nil")
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
if r.min != tt.min || r.max != tt.max {
|
||||||
|
t.Errorf("range = {%d, %d}, want {%d, %d}", r.min, r.max, tt.min, tt.max)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCustomHealthMatch_Combined(t *testing.T) {
|
||||||
|
cfg := &HealthMatchConfig{
|
||||||
|
Status: []string{"200-299"},
|
||||||
|
Body: `"healthy":true`,
|
||||||
|
Headers: map[string]string{
|
||||||
|
"Content-Type": "application/json",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
m := NewHealthMatch(cfg)
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
status int
|
||||||
|
body string
|
||||||
|
headers map[string]string
|
||||||
|
want bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "all match",
|
||||||
|
status: 200,
|
||||||
|
body: `{"healthy":true,"status":"ok"}`,
|
||||||
|
headers: map[string]string{
|
||||||
|
"content-type": "application/json",
|
||||||
|
},
|
||||||
|
want: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "status mismatch",
|
||||||
|
status: 400,
|
||||||
|
body: `{"healthy":true}`,
|
||||||
|
headers: map[string]string{
|
||||||
|
"content-type": "application/json",
|
||||||
|
},
|
||||||
|
want: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "body mismatch",
|
||||||
|
status: 200,
|
||||||
|
body: `{"healthy":false}`,
|
||||||
|
headers: map[string]string{
|
||||||
|
"content-type": "application/json",
|
||||||
|
},
|
||||||
|
want: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "header mismatch",
|
||||||
|
status: 200,
|
||||||
|
body: `{"healthy":true}`,
|
||||||
|
headers: map[string]string{
|
||||||
|
"content-type": "text/plain",
|
||||||
|
},
|
||||||
|
want: false,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
got := m.Match(tt.status, []byte(tt.body), tt.headers)
|
||||||
|
if got != tt.want {
|
||||||
|
t.Errorf("Match() = %v, want %v", got, tt.want)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
x
Reference in New Issue
Block a user