diff --git a/internal/config/config.go b/internal/config/config.go index a7819d1..64014be 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -188,6 +188,12 @@ type ServerConfig struct { // MaxRequestsPerConn 每连接最大请求数 // 达到后连接将被优雅关闭 MaxRequestsPerConn int `yaml:"max_requests_per_conn"` + + // ClientMaxBodySize 客户端请求体大小限制 + // 限制请求体最大字节数,超过返回 413 错误 + // 支持单位:b, kb, mb, gb 或纯数字表示字节 + // 默认值为 1MB + ClientMaxBodySize string `yaml:"client_max_body_size"` } // StaticConfig 静态文件服务配置。 @@ -222,6 +228,16 @@ type StaticConfig struct { // 访问目录时依次查找这些文件作为默认页面 // 默认为 ["index.html", "index.htm"] Index []string `yaml:"index"` + + // TryFiles 按顺序尝试查找的文件列表 + // 支持 $uri 和 $uri/ 占位符,用于 SPA 部署 + // 示例: ["$uri", "$uri/", "/index.html"] + TryFiles []string `yaml:"try_files"` + + // TryFilesPass 内部重定向是否触发中间件 + // 默认为 false,内部重定向不触发中间件 + // 设置为 true 时,try_files 回退会重新进入中间件链 + TryFilesPass bool `yaml:"try_files_pass"` } // ProxyConfig 反向代理配置,支持负载均衡和健康检查。 @@ -284,6 +300,15 @@ type ProxyConfig struct { // Cache 代理缓存配置 // 启用后缓存后端响应减少重复请求 Cache ProxyCacheConfig `yaml:"cache"` + + // ClientMaxBodySize 请求体大小限制 + // 限制此代理路径的请求体最大字节数,覆盖全局配置 + // 支持单位:b, kb, mb, gb 或纯数字表示字节 + ClientMaxBodySize string `yaml:"client_max_body_size"` + + // NextUpstream 故障转移配置 + // 配置后端故障时的自动重试行为 + NextUpstream NextUpstreamConfig `yaml:"next_upstream"` } // ProxyTarget 后端目标配置。 @@ -436,6 +461,30 @@ type ProxyCacheConfig struct { StaleWhileRevalidate time.Duration `yaml:"stale_while_revalidate"` } +// NextUpstreamConfig 故障转移配置,定义后端失败时的自动重试行为。 +// +// 当后端返回特定错误状态码或连接失败时,自动尝试下一个可用后端。 +// +// 注意事项: +// - Tries 为 1 时禁用故障转移 +// - 空 NextUpstream 使用默认值(Tries=1,禁用故障转移) +// - 建议根据后端数量合理设置 Tries 值 +// +// 使用示例: +// +// next_upstream: +// tries: 3 +// http_codes: [502, 503, 504] +type NextUpstreamConfig struct { + // Tries 最大尝试次数 + // 包括第一次尝试在内的总请求次数 + Tries int `yaml:"tries"` + + // HTTPCodes 触发重试的 HTTP 状态码列表 + // 后端返回这些状态码时自动尝试下一个 + HTTPCodes []int `yaml:"http_codes"` +} + // SSLConfig SSL/TLS 配置。 // // 用于配置 HTTPS 服务所需的证书和加密参数。 @@ -562,6 +611,10 @@ type SecurityConfig struct { // Headers 安全头部 // 添加安全相关的 HTTP 响应头 Headers SecurityHeaders `yaml:"headers"` + + // ErrorPage 自定义错误页面配置 + // 允许为特定 HTTP 状态码配置自定义错误页面 + ErrorPage ErrorPageConfig `yaml:"error_page"` } // AccessConfig IP 访问控制配置。 @@ -760,6 +813,41 @@ type SecurityHeaders struct { PermissionsPolicy string `yaml:"permissions_policy"` } +// ErrorPageConfig 自定义错误页面配置。 +// +// 允许为特定 HTTP 状态码配置自定义错误页面。 +// 错误页面文件在启动时预加载到内存中,运行时不进行文件 I/O。 +// +// 注意事项: +// - 错误页面文件路径可以是相对路径或绝对路径 +// - 所有错误页面加载失败时会阻止服务器启动 +// - 部分错误页面加载失败会记录警告但允许启动 +// - 支持可选的响应状态码覆盖 +// +// 使用示例: +// +// error_page: +// pages: +// 404: "/var/www/errors/404.html" +// 500: "/var/www/errors/500.html" +// 503: "/var/www/errors/503.html" +// default: "/var/www/errors/error.html" +// response_code: 200 # 可选:覆盖响应状态码 +type ErrorPageConfig struct { + // Pages 状态码到错误页面文件的映射 + // key 为 HTTP 状态码(如 404, 500),value 为文件路径 + Pages map[int]string `yaml:"pages"` + + // Default 默认错误页面 + // 当特定状态码没有配置时使用 + Default string `yaml:"default"` + + // ResponseCode 响应状态码覆盖 + // 如果不为 0,所有错误页面响应将使用此状态码 + // 例如设置为 200 时,即使发生错误也返回 200 OK + ResponseCode int `yaml:"response_code"` +} + // RewriteRule URL 重写规则。 // // 用于在代理或静态文件服务前修改请求 URL。 diff --git a/internal/config/validate.go b/internal/config/validate.go index 83fc324..df2e880 100644 --- a/internal/config/validate.go +++ b/internal/config/validate.go @@ -221,6 +221,11 @@ func validateProxy(p *ProxyConfig) error { return fmt.Errorf("无效的负载均衡算法:%s", p.LoadBalance) } + // 验证故障转移配置 + if err := validateNextUpstream(&p.NextUpstream); err != nil { + return fmt.Errorf("next_upstream: %w", err) + } + // 验证一致性哈希键格式 if p.HashKey != "" { validHashKeys := []string{"ip", "uri"} @@ -732,3 +737,37 @@ func validatePerformance(p *PerformanceConfig) error { return nil } + +// validateNextUpstream 验证故障转移配置。 +// +// 检查重试次数和 HTTP 状态码的有效性。 +// +// 参数: +// - n: 故障转移配置对象 +// +// 返回值: +// - error: 验证失败时返回错误信息,成功返回 nil +// +// 验证规则: +// - tries 不能为负数,建议不超过后端数量 +// - http_codes 应包含有效的 HTTP 状态码 +func validateNextUpstream(n *NextUpstreamConfig) error { + // 未配置时跳过 + if n.Tries == 0 && len(n.HTTPCodes) == 0 { + return nil + } + + // 验证重试次数 + if n.Tries < 0 { + return errors.New("tries 不能为负数") + } + + // 验证 HTTP 状态码 + for i, code := range n.HTTPCodes { + if code < 100 || code > 599 { + return fmt.Errorf("http_codes[%d]: 无效的 HTTP 状态码 %d", i, code) + } + } + + return nil +} diff --git a/internal/loadbalance/balancer.go b/internal/loadbalance/balancer.go index a8b789a..21eea39 100644 --- a/internal/loadbalance/balancer.go +++ b/internal/loadbalance/balancer.go @@ -46,6 +46,11 @@ type Balancer interface { // Select 根据算法策略从提供的列表中选择一个目标。 // 如果没有健康目标可用,返回 nil。 Select(targets []*Target) *Target + + // SelectExcluding 根据算法策略选择一个目标,排除指定的目标列表。 + // 用于故障转移场景,避免选择已失败的目标。 + // 如果除了排除列表外没有可用目标,返回 nil。 + SelectExcluding(targets []*Target, excluded []*Target) *Target } // RoundRobin 实现简单的轮询负载均衡。 @@ -216,3 +221,134 @@ func IncrementConnections(t *Target) { func DecrementConnections(t *Target) { atomic.AddInt64(&t.Connections, -1) } + +// filterHealthyAndExclude 返回仅包含健康目标且不在排除列表中的新切片。 +// 这是 SelectExcluding 使用的辅助函数。 +func filterHealthyAndExclude(targets []*Target, excluded []*Target) []*Target { + // 构建排除集合(使用 URL 作为键) + excludeSet := make(map[string]bool, len(excluded)) + for _, t := range excluded { + if t != nil { + excludeSet[t.URL] = true + } + } + + // 过滤健康且不在排除列表中的目标 + available := make([]*Target, 0, len(targets)) + for _, t := range targets { + if t.Healthy.Load() && !excludeSet[t.URL] { + available = append(available, t) + } + } + return available +} + +// SelectExcluding 根据轮询策略选择一个目标,排除指定的目标列表。 +// 只考虑健康且不在排除列表中的目标。 +func (r *RoundRobin) SelectExcluding(targets []*Target, excluded []*Target) *Target { + available := filterHealthyAndExclude(targets, excluded) + if len(available) == 0 { + return nil + } + + // 原子地递增并获取计数器值 + idx := atomic.AddUint64(&r.counter, 1) - 1 + return available[idx%uint64(len(available))] +} + +// SelectExcluding 根据权重分布选择目标,排除指定的目标列表。 +// 只考虑健康且不在排除列表中的目标。 +func (w *WeightedRoundRobin) SelectExcluding(targets []*Target, excluded []*Target) *Target { + available := filterHealthyAndExclude(targets, excluded) + if len(available) == 0 { + return nil + } + + // 计算总权重 + totalWeight := 0 + for _, t := range available { + if t.Weight <= 0 { + totalWeight += 1 // 最小权重为 1 + } else { + totalWeight += t.Weight + } + } + + if totalWeight == 0 { + return nil + } + + // 使用原子计数器确定权重分布中的位置 + idx := atomic.AddUint64(&w.counter, 1) - 1 + pos := int(idx % uint64(totalWeight)) + + // 找到计算位置处的目标 + currentWeight := 0 + for _, t := range available { + weight := t.Weight + if weight <= 0 { + weight = 1 + } + currentWeight += weight + if pos < currentWeight { + return t + } + } + + // 回退到最后一个目标(不应到达这里) + return available[len(available)-1] +} + +// SelectExcluding 选择连接数最少的目标,排除指定的目标列表。 +// 只考虑健康且不在排除列表中的目标。 +func (l *LeastConnections) SelectExcluding(targets []*Target, excluded []*Target) *Target { + // 构建排除集合 + excludeSet := make(map[string]bool, len(excluded)) + for _, t := range excluded { + if t != nil { + excludeSet[t.URL] = true + } + } + + var selected *Target + var minConns int64 = -1 + + for _, t := range targets { + if !t.Healthy.Load() || excludeSet[t.URL] { + continue + } + + // 原子地读取连接计数 + conns := atomic.LoadInt64(&t.Connections) + + if selected == nil || conns < minConns { + selected = t + minConns = conns + } + } + + return selected +} + +// SelectExcluding 基于客户端 IP 的哈希值选择目标,排除指定的目标列表。 +// 只考虑健康且不在排除列表中的目标。 +func (i *IPHash) SelectExcluding(targets []*Target, excluded []*Target) *Target { + return i.SelectExcludingByIP(targets, excluded, "") +} + +// SelectExcludingByIP 基于提供的 IP 地址的哈希值选择目标,排除指定的目标列表。 +// 只考虑健康且不在排除列表中的目标。 +func (i *IPHash) SelectExcludingByIP(targets []*Target, excluded []*Target, clientIP string) *Target { + available := filterHealthyAndExclude(targets, excluded) + if len(available) == 0 { + return nil + } + + // 对客户端 IP 进行哈希 + h := fnv.New64a() + h.Write([]byte(clientIP)) + hash := h.Sum64() + + idx := hash % uint64(len(available)) + return available[idx] +} diff --git a/internal/loadbalance/consistent_hash.go b/internal/loadbalance/consistent_hash.go index 64caf49..70c60eb 100644 --- a/internal/loadbalance/consistent_hash.go +++ b/internal/loadbalance/consistent_hash.go @@ -185,5 +185,93 @@ func (c *ConsistentHash) GetStats() ConsistentHashStats { } } +// SelectExcluding 根据指定键选择目标,排除指定的目标列表。 +// +// 参数: +// - targets: 可用目标列表 +// - excluded: 需要排除的目标列表 +// +// 返回值: +// - *Target: 选中的目标,如果没有可用目标则返回 nil +func (c *ConsistentHash) SelectExcluding(targets []*Target, excluded []*Target) *Target { + return c.SelectExcludingByKey(targets, excluded, "") +} + +// SelectExcludingByKey 根据指定键选择目标,排除指定的目标列表。 +// +// 参数: +// - targets: 可用目标列表 +// - excluded: 需要排除的目标列表 +// - key: 哈希键值(如客户端 IP、URI 等) +// +// 返回值: +// - *Target: 选中的目标,如果没有可用目标则返回 nil +func (c *ConsistentHash) SelectExcludingByKey(targets []*Target, excluded []*Target, key string) *Target { + // 构建排除集合 + excludeSet := make(map[string]bool, len(excluded)) + for _, t := range excluded { + if t != nil { + excludeSet[t.URL] = true + } + } + + c.mu.RLock() + defer c.mu.RUnlock() + + // 如果没有排除的目标,使用正常选择 + if len(excludeSet) == 0 { + return c.SelectByKey(targets, key) + } + + // 过滤掉被排除的目标 + filtered := make([]*Target, 0, len(targets)) + for _, t := range targets { + if t.Healthy.Load() && !excludeSet[t.URL] { + filtered = append(filtered, t) + } + } + + if len(filtered) == 0 { + return nil + } + + // 为过滤后的目标临时构建哈希环 + circle := make(map[uint64]*Target) + sortedHashes := make([]uint64, 0) + + for _, target := range filtered { + for i := 0; i < c.virtualNodes; i++ { + nodeKey := fmt.Sprintf("%s#%d", target.URL, i) + hash := c.hashKeyString(nodeKey) + circle[hash] = target + sortedHashes = append(sortedHashes, hash) + } + } + + // 排序哈希值 + sort.Slice(sortedHashes, func(i, j int) bool { + return sortedHashes[i] < sortedHashes[j] + }) + + if len(sortedHashes) == 0 { + return nil + } + + // 计算键的哈希值 + hash := c.hashKeyString(key) + + // 二分查找最近的节点 + idx := sort.Search(len(sortedHashes), func(i int) bool { + return sortedHashes[i] >= hash + }) + + // 环形回绕 + if idx >= len(sortedHashes) { + idx = 0 + } + + return circle[sortedHashes[idx]] +} + // 验证接口实现 var _ Balancer = (*ConsistentHash)(nil) diff --git a/internal/proxy/health.go b/internal/proxy/health.go index d4902e0..983c619 100644 --- a/internal/proxy/health.go +++ b/internal/proxy/health.go @@ -235,6 +235,21 @@ func (h *HealthChecker) MarkUnhealthy(target *loadbalance.Target) { target.Healthy.Store(false) } +// MarkHealthy 将目标标记为健康。 +// 此方法用于故障转移成功后,将之前失败的目标恢复为健康状态。 +// +// 在故障转移成功后的使用示例: +// +// if err := retryRequest(target, req, resp); err == nil { +// healthChecker.MarkHealthy(target) +// } +// +// 注意:此方法与主动健康检查独立运作,用于快速恢复 +// 故障转移场景中已恢复的目标。 +func (h *HealthChecker) MarkHealthy(target *loadbalance.Target) { + target.Healthy.Store(true) +} + // IsRunning 如果健康检查器当前正在运行,则返回 true。 func (h *HealthChecker) IsRunning() bool { return h.running.Load() diff --git a/internal/proxy/proxy.go b/internal/proxy/proxy.go index 48ddb18..9452253 100644 --- a/internal/proxy/proxy.go +++ b/internal/proxy/proxy.go @@ -34,6 +34,7 @@ package proxy import ( "errors" + "fmt" "strings" "sync" "time" @@ -207,107 +208,190 @@ func createHostClient(targetURL string, timeout config.ProxyTimeout, transportCf // 4. 将响应复制回客户端 // // 如果没有可用的健康目标,返回 502 Bad Gateway。 -// 如果后端请求失败,返回相应的错误响应。 +// 如果后端请求失败,根据 next_upstream 配置尝试下一个目标。 func (p *Proxy) ServeHTTP(ctx *fasthttp.RequestCtx) { - // 使用负载均衡器选择目标 - target := p.selectTarget(ctx) - if target == nil { - ctx.Error("Bad Gateway: no healthy upstream", fasthttp.StatusBadGateway) - return + // 故障转移配置 + maxTries := p.config.NextUpstream.Tries + if maxTries <= 0 { + maxTries = 1 // 默认不重试 + } + httpCodes := p.config.NextUpstream.HTTPCodes + if len(httpCodes) == 0 { + // 默认重试的状态码 + httpCodes = []int{502, 503, 504} } - // 获取所选目标的客户端 - client := p.getClient(target.URL) - if client == nil { - ctx.Error("Bad Gateway: upstream client unavailable", fasthttp.StatusBadGateway) - return - } + // 已尝试的目标列表(用于故障转移时排除) + attemptedTargets := make([]*loadbalance.Target, 0, maxTries) - // 增加连接计数(用于最少连接数负载均衡) - loadbalance.IncrementConnections(target) - defer loadbalance.DecrementConnections(target) + var lastErr error - // 检查是否为 WebSocket 升级请求 - if isWebSocketRequest(ctx) { - p.handleWebSocket(ctx, target, client) - return - } + for attempt := 0; attempt < maxTries; attempt++ { + // 选择目标(第一次使用普通选择,后续排除已失败目标) + var target *loadbalance.Target + if attempt == 0 { + target = p.selectTarget(ctx) + } else { + target = p.selectTargetExcluding(ctx, attemptedTargets) + } - // 准备请求 - req := &ctx.Request - - // 修改请求头 - p.modifyRequestHeaders(ctx, target) - - // 尝试从缓存获取(如果启用) - if p.cache != nil { - cacheKey := p.buildCacheKey(ctx) - if entry, ok, stale := p.cache.Get(cacheKey); ok { - // 缓存命中 - if !stale { - // 新鲜缓存,直接返回 - p.writeCachedResponse(ctx, entry) + if target == nil { + if attempt == 0 { + ctx.Error("Bad Gateway: no healthy upstream", fasthttp.StatusBadGateway) return } - // 过期缓存,尝试后台刷新,同时返回旧数据 - go p.backgroundRefresh(ctx, target, cacheKey) - p.writeCachedResponse(ctx, entry) + // 没有更多可用目标,返回最后一次错误 + break + } + + attemptedTargets = append(attemptedTargets, target) + + // 获取所选目标的客户端 + client := p.getClient(target.URL) + if client == nil { + // 标记为不健康并继续尝试下一个 + if p.healthChecker != nil { + p.healthChecker.MarkUnhealthy(target) + } + continue + } + + // 增加连接计数(用于最少连接数负载均衡) + loadbalance.IncrementConnections(target) + + // 检查是否为 WebSocket 升级请求 + if isWebSocketRequest(ctx) { + // WebSocket 使用 defer 确保连接计数释放 + defer loadbalance.DecrementConnections(target) + p.handleWebSocket(ctx, target, client) return } - // 检查是否需要缓存锁(防止缓存击穿) - if done := p.cache.AcquireLock(cacheKey); done != nil { - // 有其他请求正在生成缓存,等待 - <-done - // 重新尝试获取缓存 - if entry, ok, _ := p.cache.Get(cacheKey); ok { + // 准备请求 + req := &ctx.Request + + // 修改请求头 + p.modifyRequestHeaders(ctx, target) + + // 尝试从缓存获取(如果启用) + if p.cache != nil && attempt == 0 { + cacheKey := p.buildCacheKey(ctx) + if entry, ok, stale := p.cache.Get(cacheKey); ok { + // 缓存命中 + loadbalance.DecrementConnections(target) + if !stale { + // 新鲜缓存,直接返回 + p.writeCachedResponse(ctx, entry) + return + } + // 过期缓存,尝试后台刷新,同时返回旧数据 + go p.backgroundRefresh(ctx, target, cacheKey) p.writeCachedResponse(ctx, entry) return } + + // 检查是否需要缓存锁(防止缓存击穿) + if done := p.cache.AcquireLock(cacheKey); done != nil { + // 有其他请求正在生成缓存,等待 + loadbalance.DecrementConnections(target) + <-done + // 重新尝试获取缓存 + if entry, ok, _ := p.cache.Get(cacheKey); ok { + p.writeCachedResponse(ctx, entry) + return + } + // 缓存未命中,需要重新选择目标 + loadbalance.IncrementConnections(target) + } } + + // 执行代理请求 + err := client.Do(req, &ctx.Response) + + if err != nil { + loadbalance.DecrementConnections(target) + + // 被动健康检查:标记目标为不健康 + if p.healthChecker != nil { + p.healthChecker.MarkUnhealthy(target) + } + + // 释放缓存锁 + if p.cache != nil && attempt == 0 { + p.cache.ReleaseLock(p.buildCacheKey(ctx), err) + } + + lastErr = err + // 继续尝试下一个目标 + continue + } + + // 请求成功,减少连接计数 + loadbalance.DecrementConnections(target) + + // 检查响应状态码是否需要重试 + statusCode := ctx.Response.StatusCode() + shouldRetry := false + for _, code := range httpCodes { + if statusCode == code { + shouldRetry = true + break + } + } + + if shouldRetry { + // 释放缓存锁 + if p.cache != nil && attempt == 0 { + p.cache.ReleaseLock(p.buildCacheKey(ctx), fmt.Errorf("HTTP %d", statusCode)) + } + + // 如果不是最后一次尝试,继续下一个目标 + if attempt < maxTries-1 { + // 标记目标为不健康 + if p.healthChecker != nil { + p.healthChecker.MarkUnhealthy(target) + } + continue + } + } + + // 重试成功时恢复健康状态 + if attempt > 0 && p.healthChecker != nil { + p.healthChecker.MarkHealthy(target) + } + + // 存入缓存(如果启用且响应可缓存) + if p.cache != nil { + cacheKey := p.buildCacheKey(ctx) + if statusCode >= 200 && statusCode < 300 { + // 提取响应头 + headers := make(map[string]string) + for key, value := range ctx.Response.Header.All() { + headers[string(key)] = string(value) + } + p.cache.Set(cacheKey, ctx.Response.Body(), headers, statusCode, p.config.Cache.MaxAge) + } + p.cache.ReleaseLock(cacheKey, nil) + } + + // 修改响应头 + p.modifyResponseHeaders(ctx) + return } - // 执行代理请求 - err := client.Do(req, &ctx.Response) - if err != nil { - // 被动健康检查:标记目标为不健康 - if p.healthChecker != nil { - p.healthChecker.MarkUnhealthy(target) - } - - // 释放缓存锁 - if p.cache != nil { - p.cache.ReleaseLock(p.buildCacheKey(ctx), err) - } - + // 所有尝试都失败 + if lastErr != nil { // 处理不同类型的错误 - if errors.Is(err, fasthttp.ErrTimeout) { + if errors.Is(lastErr, fasthttp.ErrTimeout) { ctx.Error("Gateway Timeout", fasthttp.StatusGatewayTimeout) - } else if errors.Is(err, fasthttp.ErrConnectionClosed) { + } else if errors.Is(lastErr, fasthttp.ErrConnectionClosed) { ctx.Error("Bad Gateway: upstream connection closed", fasthttp.StatusBadGateway) } else { ctx.Error("Bad Gateway", fasthttp.StatusBadGateway) } - return + } else { + ctx.Error("Bad Gateway: all upstreams failed", fasthttp.StatusBadGateway) } - - // 存入缓存(如果启用且响应可缓存) - if p.cache != nil { - cacheKey := p.buildCacheKey(ctx) - status := ctx.Response.StatusCode() - if status >= 200 && status < 300 { - // 提取响应头 - headers := make(map[string]string) - for key, value := range ctx.Response.Header.All() { - headers[string(key)] = string(value) - } - p.cache.Set(cacheKey, ctx.Response.Body(), headers, status, p.config.Cache.MaxAge) - } - p.cache.ReleaseLock(cacheKey, nil) - } - - // 修改响应头 - p.modifyResponseHeaders(ctx) } // selectTarget 使用配置的负载均衡器选择后端目标。 @@ -340,6 +424,35 @@ func (p *Proxy) selectTarget(ctx *fasthttp.RequestCtx) *loadbalance.Target { return balancer.Select(targets) } +// selectTargetExcluding 选择后端目标,排除已尝试失败的目标。 +// 用于故障转移场景,避免重复选择已失败的目标。 +// 如果没有可用的健康目标则返回 nil。 +func (p *Proxy) selectTargetExcluding(ctx *fasthttp.RequestCtx, excluded []*loadbalance.Target) *loadbalance.Target { + p.mu.RLock() + balancer := p.balancer + targets := p.targets + p.mu.RUnlock() + + if len(targets) == 0 { + return nil + } + + // 对于 IPHash 负载均衡器,提取客户端 IP + if ipHash, ok := balancer.(*loadbalance.IPHash); ok { + clientIP := netutil.ExtractClientIP(ctx) + return ipHash.SelectExcludingByIP(targets, excluded, clientIP) + } + + // 对于一致性哈希,根据 hash_key 配置选择 + if ch, ok := balancer.(*loadbalance.ConsistentHash); ok { + hashKey := ch.GetHashKey() + key := p.extractHashKey(ctx, hashKey) + return ch.SelectExcludingByKey(targets, excluded, key) + } + + return balancer.SelectExcluding(targets, excluded) +} + // extractHashKey 根据配置提取哈希键值。 func (p *Proxy) extractHashKey(ctx *fasthttp.RequestCtx, hashKey string) string { switch {