diff --git a/internal/proxy/proxy.go b/internal/proxy/proxy.go index 7c2bb99..42361b7 100644 --- a/internal/proxy/proxy.go +++ b/internal/proxy/proxy.go @@ -90,6 +90,8 @@ const ( lbIPHash = "ip_hash" // IP 哈希 lbConsistentHash = "consistent_hash" // 一致性哈希 lbRandom = "random" // 随机(Power of Two Choices) + lbLeastTime = "least_time" // 最小响应时间 + lbSticky = "sticky" // 会话粘性 ) // headersPool 复用缓存 headers map,减少分配。 @@ -229,6 +231,22 @@ func NewProxy(cfg *config.ProxyConfig, targets []*loadbalance.Target, transportC return p, nil } +// stickyBalancer wraps StickySession to implement loadbalance.Balancer. +// It delegates Select/SelectExcluding to the fallback balancer while +// allowing the proxy to access the StickySession for cookie-based routing. +type stickyBalancer struct { + sticky *loadbalance.StickySession + fallback loadbalance.Balancer +} + +func (b *stickyBalancer) Select(targets []*loadbalance.Target) *loadbalance.Target { + return b.fallback.Select(targets) +} + +func (b *stickyBalancer) SelectExcluding(targets []*loadbalance.Target, excluded []*loadbalance.Target) *loadbalance.Target { + return b.fallback.SelectExcluding(targets, excluded) +} + // createBalancerByName 根据算法名称创建负载均衡器。 // // 支持的算法: @@ -263,6 +281,49 @@ func createBalancerByName(name string, cfg *config.ProxyConfig) (loadbalance.Bal return loadbalance.NewConsistentHash(virtualNodes, cfg.HashKey), nil case lbRandom: return loadbalance.NewRandom(), nil + case lbLeastTime: + metric := cfg.LeastTime.Metric + if metric == "" { + metric = "last_byte" + } + defaultTime := cfg.LeastTime.DefaultTime + if defaultTime <= 0 { + defaultTime = time.Millisecond + } + return loadbalance.NewLeastTime(metric, defaultTime), nil + case lbSticky: + stickyCfg := loadbalance.StickyConfig{ + Enabled: cfg.Sticky.Enabled, + Name: cfg.Sticky.Name, + Expires: cfg.Sticky.Expires, + Domain: cfg.Sticky.Domain, + Path: cfg.Sticky.Path, + Secure: cfg.Sticky.Secure, + HttpOnly: cfg.Sticky.HttpOnly, + SameSite: cfg.Sticky.SameSite, + } + if stickyCfg.Name == "" { + stickyCfg.Name = "lolly_route" + } + if stickyCfg.Expires <= 0 { + stickyCfg.Expires = time.Hour + } + if stickyCfg.Path == "" { + stickyCfg.Path = "/" + } + + fallbackAlgo := cfg.Sticky.FallbackAlgo + if fallbackAlgo == "" { + fallbackAlgo = lbRoundRobin + } + fallbackBalancer, err := createBalancerByName(fallbackAlgo, cfg) + if err != nil { + return nil, fmt.Errorf("sticky fallback balancer: %w", err) + } + + sticky := loadbalance.NewStickySession(stickyCfg, fallbackBalancer) + sticky.Start() + return &stickyBalancer{sticky: sticky, fallback: fallbackBalancer}, nil default: return nil, errors.New("unsupported load balance algorithm: " + name) } @@ -813,6 +874,13 @@ func (p *Proxy) ServeHTTP(ctx *fasthttp.RequestCtx) { // 记录首字节时间 timing.MarkHeaderReceived() + // 记录响应时间(用于 least_time 负载均衡) + if recorder, ok := p.balancer.(loadbalance.ResponseTimeRecorder); ok { + headerTime := timing.connectEnd.Sub(timing.connectStart) + lastByteTime := timing.connectEnd.Sub(timing.connectStart) + recorder.RecordResponseTime(target, headerTime, lastByteTime) + } + // 请求成功,减少连接计数 loadbalance.DecrementConnections(target) diff --git a/internal/proxy/target_selector.go b/internal/proxy/target_selector.go index 79a2f24..67626c7 100644 --- a/internal/proxy/target_selector.go +++ b/internal/proxy/target_selector.go @@ -119,6 +119,11 @@ func (p *Proxy) selectByBalancer(ctx *fasthttp.RequestCtx, targets []*loadbalanc balancer := p.balancer p.mu.RUnlock() + // 对于 StickySession 负载均衡器,需要请求上下文 + if sb, ok := balancer.(*stickyBalancer); ok { + return sb.sticky.Select(ctx, targets) + } + // 对于 IPHash 负载均衡器,提取客户端 IP if ipHash, ok := balancer.(*loadbalance.IPHash); ok { clientIP := netutil.ExtractClientIP(ctx)