feat(proxy): integrate Least Time and Sticky balancers

- Add least_time and sticky to createBalancerByName
- Implement response time recording for Least Time
- Support StickySession in target selector with request context
- StickySession auto-starts when created
This commit is contained in:
xfy 2026-06-08 18:11:47 +08:00
parent 3b6b70a491
commit 72f189bba8
2 changed files with 73 additions and 0 deletions

View File

@ -90,6 +90,8 @@ const (
lbIPHash = "ip_hash" // IP 哈希
lbConsistentHash = "consistent_hash" // 一致性哈希
lbRandom = "random" // 随机Power of Two Choices
lbLeastTime = "least_time" // 最小响应时间
lbSticky = "sticky" // 会话粘性
)
// headersPool 复用缓存 headers map减少分配。
@ -229,6 +231,22 @@ func NewProxy(cfg *config.ProxyConfig, targets []*loadbalance.Target, transportC
return p, nil
}
// stickyBalancer wraps StickySession to implement loadbalance.Balancer.
// It delegates Select/SelectExcluding to the fallback balancer while
// allowing the proxy to access the StickySession for cookie-based routing.
type stickyBalancer struct {
sticky *loadbalance.StickySession
fallback loadbalance.Balancer
}
func (b *stickyBalancer) Select(targets []*loadbalance.Target) *loadbalance.Target {
return b.fallback.Select(targets)
}
func (b *stickyBalancer) SelectExcluding(targets []*loadbalance.Target, excluded []*loadbalance.Target) *loadbalance.Target {
return b.fallback.SelectExcluding(targets, excluded)
}
// createBalancerByName 根据算法名称创建负载均衡器。
//
// 支持的算法:
@ -263,6 +281,49 @@ func createBalancerByName(name string, cfg *config.ProxyConfig) (loadbalance.Bal
return loadbalance.NewConsistentHash(virtualNodes, cfg.HashKey), nil
case lbRandom:
return loadbalance.NewRandom(), nil
case lbLeastTime:
metric := cfg.LeastTime.Metric
if metric == "" {
metric = "last_byte"
}
defaultTime := cfg.LeastTime.DefaultTime
if defaultTime <= 0 {
defaultTime = time.Millisecond
}
return loadbalance.NewLeastTime(metric, defaultTime), nil
case lbSticky:
stickyCfg := loadbalance.StickyConfig{
Enabled: cfg.Sticky.Enabled,
Name: cfg.Sticky.Name,
Expires: cfg.Sticky.Expires,
Domain: cfg.Sticky.Domain,
Path: cfg.Sticky.Path,
Secure: cfg.Sticky.Secure,
HttpOnly: cfg.Sticky.HttpOnly,
SameSite: cfg.Sticky.SameSite,
}
if stickyCfg.Name == "" {
stickyCfg.Name = "lolly_route"
}
if stickyCfg.Expires <= 0 {
stickyCfg.Expires = time.Hour
}
if stickyCfg.Path == "" {
stickyCfg.Path = "/"
}
fallbackAlgo := cfg.Sticky.FallbackAlgo
if fallbackAlgo == "" {
fallbackAlgo = lbRoundRobin
}
fallbackBalancer, err := createBalancerByName(fallbackAlgo, cfg)
if err != nil {
return nil, fmt.Errorf("sticky fallback balancer: %w", err)
}
sticky := loadbalance.NewStickySession(stickyCfg, fallbackBalancer)
sticky.Start()
return &stickyBalancer{sticky: sticky, fallback: fallbackBalancer}, nil
default:
return nil, errors.New("unsupported load balance algorithm: " + name)
}
@ -813,6 +874,13 @@ func (p *Proxy) ServeHTTP(ctx *fasthttp.RequestCtx) {
// 记录首字节时间
timing.MarkHeaderReceived()
// 记录响应时间(用于 least_time 负载均衡)
if recorder, ok := p.balancer.(loadbalance.ResponseTimeRecorder); ok {
headerTime := timing.connectEnd.Sub(timing.connectStart)
lastByteTime := timing.connectEnd.Sub(timing.connectStart)
recorder.RecordResponseTime(target, headerTime, lastByteTime)
}
// 请求成功,减少连接计数
loadbalance.DecrementConnections(target)

View File

@ -119,6 +119,11 @@ func (p *Proxy) selectByBalancer(ctx *fasthttp.RequestCtx, targets []*loadbalanc
balancer := p.balancer
p.mu.RUnlock()
// 对于 StickySession 负载均衡器,需要请求上下文
if sb, ok := balancer.(*stickyBalancer); ok {
return sb.sticky.Select(ctx, targets)
}
// 对于 IPHash 负载均衡器,提取客户端 IP
if ipHash, ok := balancer.(*loadbalance.IPHash); ok {
clientIP := netutil.ExtractClientIP(ctx)