diff --git a/internal/config/config.go b/internal/config/config.go index 227ebe7..593b474 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -340,6 +340,7 @@ type ProxyConfig struct { RedirectRewrite *RedirectRewriteConfig `yaml:"redirect_rewrite"` ProxySSL *ProxySSLConfig `yaml:"proxy_ssl"` CacheValid *ProxyCacheValidConfig `yaml:"cache_valid"` + Buffering *ProxyBufferingConfig `yaml:"buffering"` // 切片字段 Targets []ProxyTarget `yaml:"targets"` // 字符串字段 @@ -347,6 +348,7 @@ type ProxyConfig struct { LoadBalance string `yaml:"load_balance"` HashKey string `yaml:"hash_key"` ClientMaxBodySize string `yaml:"client_max_body_size"` + ProxyBind string `yaml:"proxy_bind"` // 结构体字段 Headers ProxyHeaders `yaml:"headers"` BalancerByLua BalancerByLuaConfig `yaml:"balancer_by_lua"` @@ -370,6 +372,26 @@ type ProxyConfig struct { Internal bool `yaml:"internal"` } +// ProxyBufferingConfig 代理缓冲配置。 +// +// 控制代理响应的缓冲行为: +// - "default" 或 "on": 缓冲响应到内存/临时文件 +// - "off": 流式转发响应,不缓冲 +// +// 使用示例: +// +// buffering: +// mode: "off" +type ProxyBufferingConfig struct { + // Mode 缓冲模式 + // 可选值:"default"(默认缓冲), "on"(强制缓冲), "off"(关闭缓冲) + Mode string `yaml:"mode"` + + // BufferSize 响应缓冲区大小(字节) + // 0 表示使用默认值 + BufferSize int `yaml:"buffer_size"` +} + // BalancerByLuaConfig Lua 负载均衡配置 // // 使用 Lua 脚本动态选择后端目标,支持自定义负载均衡逻辑。 @@ -425,6 +447,30 @@ type ProxyTarget struct { // Weight 权重 // 用于加权轮询算法,值越大分配的请求越多 Weight int `yaml:"weight"` + + // MaxConns 最大并发连接数 + // 0 表示不限制 + MaxConns int `yaml:"max_conns"` + + // MaxFails 最大失败次数 + // 在 FailTimeout 期间失败次数达到此值后标记为不可用 + // 0 表示不进行被动失败检测 + MaxFails int `yaml:"max_fails"` + + // FailTimeout 失败超时时间 + // 达到 MaxFails 后,目标在此时间内被视为不可用 + FailTimeout time.Duration `yaml:"fail_timeout"` + + // Backup 备份服务器 + // 仅当所有非备份服务器不可用时才使用 + Backup bool `yaml:"backup"` + + // Down 标记服务器为永久不可用 + Down bool `yaml:"down"` + + // ProxyURI 代理传递的 URI 路径 + // 设置后替换请求路径,支持 nginx proxy_pass URI 语义 + ProxyURI string `yaml:"proxy_uri"` } // HealthCheckConfig 健康检查配置。 @@ -508,6 +554,26 @@ type ProxyHeaders struct { // Remove 移除的头部 // 从发送到后端的请求中移除的头部列表 Remove []string `yaml:"remove"` + + // HideResponse 隐藏的响应头 + // 从返回给客户端的响应中移除的头部列表 + HideResponse []string `yaml:"hide_response"` + + // PassResponse 允许传递的响应头 + // 仅传递列出的头部,其他全部隐藏(白名单模式) + PassResponse []string `yaml:"pass_response"` + + // IgnoreHeaders 忽略的头部 + // 代理时完全忽略这些头部,不转发到后端也不返回给客户端 + IgnoreHeaders []string `yaml:"ignore_headers"` + + // CookieDomain Cookie 域重写 + // 将响应中 Set-Cookie 的 domain 替换为此值 + CookieDomain string `yaml:"cookie_domain"` + + // CookiePath Cookie 路径重写 + // 将响应中 Set-Cookie 的 path 替换为此值 + CookiePath string `yaml:"cookie_path"` } // ProxyCacheConfig 代理缓存配置。 diff --git a/internal/loadbalance/algorithms.go b/internal/loadbalance/algorithms.go index 7027ed6..ad073f8 100644 --- a/internal/loadbalance/algorithms.go +++ b/internal/loadbalance/algorithms.go @@ -31,6 +31,7 @@ var ValidAlgorithms = []string{ "least_conn", "ip_hash", "consistent_hash", + "random", } // IsValidAlgorithm 检查给定的算法名称是否有效。 diff --git a/internal/loadbalance/balancer.go b/internal/loadbalance/balancer.go index 2174e17..e7e2f44 100644 --- a/internal/loadbalance/balancer.go +++ b/internal/loadbalance/balancer.go @@ -49,6 +49,24 @@ type Target struct { lastResolved atomic.Int64 hostnameOnce sync.Once Healthy atomic.Bool + + // MaxConns 最大并发连接数,0 表示不限制 + MaxConns int64 + // MaxFails 最大失败次数,0 表示不检测 + MaxFails int64 + // FailTimeout 失败冷却时间 + FailTimeout time.Duration + // Backup 备份服务器标记 + Backup bool + // Down 永久不可用标记 + Down bool + // ProxyURI 代理传递的 URI 路径 + ProxyURI string + + // failCount 失败计数(原子操作) + failCount atomic.Int64 + // failedUntil 失败冷却截止时间(UnixNano,原子操作) + failedUntil atomic.Int64 } // Balancer 是 HTTP 代理(L7 层)负载均衡算法的接口。 @@ -248,24 +266,103 @@ func (i *IPHash) SelectByIP(targets []*Target, clientIP string) *Target { return healthy[idx] } -// filterHealthy 从目标列表中筛选出所有健康的目标,返回新切片。 -// -// 该函数遍历输入切片,仅保留 Healthy 状态为 true 的目标。 -// 返回的切片容量与输入相同,避免多次内存分配。 -// -// 参数: -// - targets: 原始目标列表 -// -// 返回值: -// - 仅包含健康目标的新切片,当无健康目标时返回空切片 -func filterHealthy(targets []*Target) []*Target { - healthy := make([]*Target, 0, len(targets)) - for _, t := range targets { - if t.Healthy.Load() { - healthy = append(healthy, t) +// IsAvailable 检查目标是否可用。 +// 目标不可用的条件(优先级从高到低): +// - Healthy 为 false(硬性不可用,由健康检查器设置) +// - Down 为 true(配置标记永久不可用) +// - 超过 MaxConns 限制 +// - 失败冷却期内(failCount >= MaxFails 且未超过 FailTimeout) +func (t *Target) IsAvailable() bool { + if !t.Healthy.Load() || t.Down { + return false + } + if t.MaxConns > 0 && atomic.LoadInt64(&t.Connections) >= t.MaxConns { + return false + } + if t.MaxFails > 0 { + deadline := t.failedUntil.Load() + if deadline > 0 { + if time.Now().UnixNano() < deadline { + return false + } + // 冷却已过期,CAS 重置防止丢失并发的 RecordFailure + if t.failedUntil.CompareAndSwap(deadline, 0) { + t.failCount.Store(0) + } } } - return healthy + return true +} + +// RecordFailure 记录一次失败。 +// 使用原子递增 failCount,当达到 MaxFails 时设置冷却截止时间。 +// 返回当前失败计数。 +func (t *Target) RecordFailure() int64 { + if t.MaxFails <= 0 { + return 0 + } + count := t.failCount.Add(1) + if count >= t.MaxFails { + timeout := t.FailTimeout + if timeout <= 0 { + timeout = 10 * time.Second + } + t.failedUntil.Store(time.Now().Add(timeout).UnixNano()) + } + return count +} + +// RecordSuccess 记录一次成功,重置软失败状态。 +// 仅重置 failCount 和 failedUntil,不修改 Healthy(健康检查器权威)。 +func (t *Target) RecordSuccess() { + if t.MaxFails <= 0 { + return + } + // CAS 重置:仅在当前 goroutine 持有 deadline 时才清零, + // 防止丢失并发的 RecordFailure 设置的新 deadline。 + for { + deadline := t.failedUntil.Load() + if deadline == 0 { + break + } + if t.failedUntil.CompareAndSwap(deadline, 0) { + break + } + } + t.failCount.Store(0) +} + +// IsBackup 返回目标是否为备份服务器。 +func (t *Target) IsBackup() bool { + return t.Backup +} + +// filterHealthy 从目标列表中筛选出所有可用的目标,返回新切片。 +// +// 选择逻辑: +// 1. 优先选择非备份且可用的目标(IsAvailable() == true && !IsBackup()) +// 2. 如果没有非备份目标可用,则选择可用的备份目标 +// +// 返回的切片容量与输入相同,避免多次内存分配。 +func filterHealthy(targets []*Target) []*Target { + available := make([]*Target, 0, len(targets)) + var backups []*Target + + for _, t := range targets { + if !t.IsAvailable() { + continue + } + if t.IsBackup() { + backups = append(backups, t) + } else { + available = append(available, t) + } + } + + if len(available) > 0 { + return available + } + return backups } // IncrementConnections 原子地增加目标的连接计数。 @@ -280,19 +377,14 @@ func DecrementConnections(t *Target) { atomic.AddInt64(&t.Connections, -1) } -// filterHealthyAndExclude 从目标列表中筛选出健康且不在排除列表中的目标,返回新切片。 +// filterHealthyAndExclude 从目标列表中筛选出可用且不在排除列表中的目标,返回新切片。 +// +// 选择逻辑与 filterHealthy 相同: +// 1. 优先选择非备份且可用的目标 +// 2. 如果没有非备份目标可用,则选择可用的备份目标 // -// 该函数用于 SelectExcluding 方法,同时处理健康过滤和排除逻辑。 // 排除判断基于目标的 URL 进行匹配。 -// -// 参数: -// - targets: 原始目标列表 -// - excluded: 需要排除的目标列表(nil 值会被安全忽略) -// -// 返回值: -// - 包含健康且非排除目标的新切片 func filterHealthyAndExclude(targets []*Target, excluded []*Target) []*Target { - // 构建排除集合(使用 URL 作为键) excludeSet := make(map[string]bool, len(excluded)) for _, t := range excluded { if t != nil { @@ -300,14 +392,24 @@ func filterHealthyAndExclude(targets []*Target, excluded []*Target) []*Target { } } - // 过滤健康且不在排除列表中的目标 available := make([]*Target, 0, len(targets)) + var backups []*Target + for _, t := range targets { - if t.Healthy.Load() && !excludeSet[t.URL] { + if !t.IsAvailable() || excludeSet[t.URL] { + continue + } + if t.IsBackup() { + backups = append(backups, t) + } else { available = append(available, t) } } - return available + + if len(available) > 0 { + return available + } + return backups } // SelectExcluding 根据轮询策略选择一个目标,排除指定的目标列表。 @@ -367,9 +469,8 @@ func (w *WeightedRoundRobin) SelectExcluding(targets []*Target, excluded []*Targ } // SelectExcluding 选择连接数最少的目标,排除指定的目标列表。 -// 只考虑健康且不在排除列表中的目标。 +// 优先选择非备份目标,仅当无可用非备份目标时选择备份目标。 func (l *LeastConnections) SelectExcluding(targets []*Target, excluded []*Target) *Target { - // 构建排除集合 excludeSet := make(map[string]bool, len(excluded)) for _, t := range excluded { if t != nil { @@ -378,23 +479,34 @@ func (l *LeastConnections) SelectExcluding(targets []*Target, excluded []*Target } var selected *Target + var selectedBackup *Target var minConns int64 = -1 + var minBackupConns int64 = -1 for _, t := range targets { - if !t.Healthy.Load() || excludeSet[t.URL] { + if !t.IsAvailable() || excludeSet[t.URL] { continue } - // 原子地读取连接计数 conns := atomic.LoadInt64(&t.Connections) - if selected == nil || conns < minConns { - selected = t - minConns = conns + if t.IsBackup() { + if selectedBackup == nil || conns < minBackupConns { + selectedBackup = t + minBackupConns = conns + } + } else { + if selected == nil || conns < minConns { + selected = t + minConns = conns + } } } - return selected + if selected != nil { + return selected + } + return selectedBackup } // SelectExcluding 基于客户端 IP 的哈希值选择目标,排除指定的目标列表。 @@ -485,14 +597,22 @@ func (t *Target) initHostname() { } // NewTargetFromConfig 从配置创建 Target(推荐入口)。 -// 自动初始化 hostname 和 Healthy 状态。 -func NewTargetFromConfig(url string, weight int) *Target { +// 自动初始化 hostname 和 Healthy 状态,设置上游参数。 +func NewTargetFromConfig(url string, weight int, maxConns int64, maxFails int64, failTimeout time.Duration, backup bool, down bool, proxyURI string) *Target { t := &Target{ - URL: url, - Weight: weight, + URL: url, + Weight: weight, + MaxConns: maxConns, + MaxFails: maxFails, + FailTimeout: failTimeout, + Backup: backup, + Down: down, + ProxyURI: proxyURI, } t.initHostname() - t.Healthy.Store(true) + if !down { + t.Healthy.Store(true) + } return t } diff --git a/internal/loadbalance/random.go b/internal/loadbalance/random.go new file mode 100644 index 0000000..02acbeb --- /dev/null +++ b/internal/loadbalance/random.go @@ -0,0 +1,72 @@ +package loadbalance + +import ( + "math/rand/v2" + "sync/atomic" +) + +// Random 实现随机负载均衡(Power of Two Choices 算法)。 +// +// 随机选择两个候选目标,从中选择连接数较少的那个。 +// 相比纯随机,Power of Two Choices 能更好地均衡负载, +// 同时保持 O(1) 的选择复杂度。 +// +// 当只有一个候选目标时直接返回;当 MaxConns 限制生效时 +// 自动跳过已满的目标。 +type Random struct{} + +// NewRandom 创建一个新的随机负载均衡器。 +func NewRandom() *Random { + return &Random{} +} + +// Select 使用 Power of Two Choices 算法选择目标。 +// 随机选择两个候选,返回连接数较少的那个。 +// 只考虑可用目标。如果没有可用目标则返回 nil。 +func (r *Random) Select(targets []*Target) *Target { + available := filterHealthy(targets) + if len(available) == 0 { + return nil + } + + if len(available) == 1 { + return available[0] + } + + // Power of Two Choices + i := rand.IntN(len(available)) + j := rand.IntN(len(available) - 1) + if j >= i { + j++ + } + + a, b := available[i], available[j] + if atomic.LoadInt64(&a.Connections) <= atomic.LoadInt64(&b.Connections) { + return a + } + return b +} + +// SelectExcluding 使用 Power of Two Choices 算法选择目标,排除指定的目标列表。 +func (r *Random) SelectExcluding(targets []*Target, excluded []*Target) *Target { + available := filterHealthyAndExclude(targets, excluded) + if len(available) == 0 { + return nil + } + + if len(available) == 1 { + return available[0] + } + + i := rand.IntN(len(available)) + j := rand.IntN(len(available) - 1) + if j >= i { + j++ + } + + a, b := available[i], available[j] + if atomic.LoadInt64(&a.Connections) <= atomic.LoadInt64(&b.Connections) { + return a + } + return b +}