perf(proxy): pre-build cacheIgnoreSet, single-pass cache key, pool UpstreamTiming
Three optimizations in the proxy cache hot path: - Pre-build cacheIgnoreSet map once at Proxy creation instead of per-response. Eliminates map allocation + linear scan per cached response. - Compute cache key once per request via computeCacheKey() closure. Previously buildCacheKeyHash was called up to 5 times per request; now computed on first access and reused. - Pool UpstreamTiming objects with sync.Pool. Eliminates one heap allocation per proxied request.
This commit is contained in:
parent
02775de641
commit
0a53622351
@ -102,6 +102,12 @@ var headersPool = sync.Pool{
|
||||
},
|
||||
}
|
||||
|
||||
var upstreamTimingPool = sync.Pool{
|
||||
New: func() any {
|
||||
return NewUpstreamTiming()
|
||||
},
|
||||
}
|
||||
|
||||
// Proxy 表示反向代理实例,负责将 HTTP 请求转发到后端目标。
|
||||
//
|
||||
// 它为每个后端目标管理连接池(HostClient),并提供负载均衡、
|
||||
@ -124,6 +130,7 @@ type Proxy struct {
|
||||
targets []*loadbalance.Target // 后端目标列表
|
||||
mu sync.RWMutex // 保护并发访问的读写锁
|
||||
started atomic.Bool // 代理启动标志
|
||||
cacheIgnoreSet map[string]bool // 缓存时忽略的响应头集合
|
||||
}
|
||||
|
||||
// NewProxy 使用给定的配置和后台目标创建一个新的反向代理实例。
|
||||
@ -213,6 +220,12 @@ func NewProxy(cfg *config.ProxyConfig, targets []*loadbalance.Target, transportC
|
||||
}
|
||||
p.redirectRewriter = rewriter
|
||||
|
||||
cacheIgnoreSet := make(map[string]bool, len(cfg.Cache.CacheIgnoreHeaders))
|
||||
for _, h := range cfg.Cache.CacheIgnoreHeaders {
|
||||
cacheIgnoreSet[strings.ToLower(h)] = true
|
||||
}
|
||||
p.cacheIgnoreSet = cacheIgnoreSet
|
||||
|
||||
return p, nil
|
||||
}
|
||||
|
||||
@ -394,6 +407,10 @@ func NewUpstreamTiming() *UpstreamTiming {
|
||||
}
|
||||
}
|
||||
|
||||
func (t *UpstreamTiming) reset() {
|
||||
*t = UpstreamTiming{}
|
||||
}
|
||||
|
||||
// MarkConnectStart 标记连接开始时间点。
|
||||
func (t *UpstreamTiming) MarkConnectStart() {
|
||||
t.connectStart = time.Now()
|
||||
@ -501,19 +518,32 @@ func (p *Proxy) ServeHTTP(ctx *fasthttp.RequestCtx) {
|
||||
// 上游变量捕获
|
||||
var upstreamAddr string
|
||||
var upstreamStatus int
|
||||
timing := NewUpstreamTiming()
|
||||
timing, ok := upstreamTimingPool.Get().(*UpstreamTiming)
|
||||
if !ok {
|
||||
timing = NewUpstreamTiming()
|
||||
}
|
||||
timing.reset()
|
||||
|
||||
var cacheHashKey uint64
|
||||
var cacheOrigKey string
|
||||
cacheKeyComputed := false
|
||||
computeCacheKey := func() (uint64, string) {
|
||||
if !cacheKeyComputed {
|
||||
cacheHashKey, cacheOrigKey = p.buildCacheKeyHash(ctx)
|
||||
cacheKeyComputed = true
|
||||
}
|
||||
return cacheHashKey, cacheOrigKey
|
||||
}
|
||||
|
||||
// 创建变量上下文用于设置上游变量
|
||||
vc := variable.NewContext(ctx)
|
||||
defer func() {
|
||||
// 确保记录了响应结束时间
|
||||
if timing.responseEnd.IsZero() {
|
||||
timing.MarkResponseEnd()
|
||||
}
|
||||
// 设置上游变量
|
||||
FinalizeUpstreamVars(vc, upstreamAddr, upstreamStatus, timing)
|
||||
// 释放变量上下文
|
||||
variable.ReleaseContext(vc)
|
||||
upstreamTimingPool.Put(timing)
|
||||
}()
|
||||
|
||||
// 故障转移配置
|
||||
@ -659,7 +689,7 @@ func (p *Proxy) ServeHTTP(ctx *fasthttp.RequestCtx) {
|
||||
path := string(ctx.Request.URI().Path())
|
||||
rule := p.cache.MatchRule(path, method, 0)
|
||||
if rule != nil {
|
||||
hashKey, origKey := p.buildCacheKeyHash(ctx)
|
||||
hashKey, origKey := computeCacheKey()
|
||||
if entry, ok, stale := p.cache.Get(hashKey, origKey); ok {
|
||||
// 缓存命中
|
||||
loadbalance.DecrementConnections(target)
|
||||
@ -750,7 +780,7 @@ func (p *Proxy) ServeHTTP(ctx *fasthttp.RequestCtx) {
|
||||
|
||||
// 尝试使用 stale 缓存
|
||||
if p.cache != nil {
|
||||
hashKey, origKey := p.buildCacheKeyHash(ctx)
|
||||
hashKey, origKey := computeCacheKey()
|
||||
isTimeout := errors.Is(err, fasthttp.ErrTimeout)
|
||||
if staleEntry, ok := p.cache.GetStale(hashKey, origKey, isTimeout); ok {
|
||||
logging.Info().Msgf("[PROXY] 使用 stale 缓存: key=%s, isTimeout=%v", origKey, isTimeout)
|
||||
@ -763,7 +793,8 @@ func (p *Proxy) ServeHTTP(ctx *fasthttp.RequestCtx) {
|
||||
|
||||
// 释放缓存锁
|
||||
if p.cache != nil && attempt == 0 {
|
||||
hashKey := p.buildCacheKeyHashValue(ctx)
|
||||
computeCacheKey()
|
||||
hashKey := cacheHashKey
|
||||
p.cache.ReleaseLock(hashKey, err)
|
||||
}
|
||||
|
||||
@ -807,7 +838,8 @@ func (p *Proxy) ServeHTTP(ctx *fasthttp.RequestCtx) {
|
||||
if shouldRetry {
|
||||
// 释放缓存锁
|
||||
if p.cache != nil && attempt == 0 {
|
||||
hashKey := p.buildCacheKeyHashValue(ctx)
|
||||
computeCacheKey()
|
||||
hashKey := cacheHashKey
|
||||
p.cache.ReleaseLock(hashKey, fmt.Errorf("HTTP %d", statusCode))
|
||||
}
|
||||
|
||||
@ -836,7 +868,7 @@ func (p *Proxy) ServeHTTP(ctx *fasthttp.RequestCtx) {
|
||||
return
|
||||
}
|
||||
|
||||
hashKey, origKey := p.buildCacheKeyHash(ctx)
|
||||
hashKey, origKey := computeCacheKey()
|
||||
if statusCode >= 200 && statusCode < 300 {
|
||||
// 检查 MinUses 阈值
|
||||
if entry, ok, _ := p.cache.Get(hashKey, origKey); ok {
|
||||
@ -855,29 +887,13 @@ func (p *Proxy) ServeHTTP(ctx *fasthttp.RequestCtx) {
|
||||
for k := range headers {
|
||||
delete(headers, k)
|
||||
}
|
||||
// 构建忽略头部查找表(大小写不敏感)
|
||||
ignoreSet := make(map[string]bool, len(p.config.Cache.CacheIgnoreHeaders))
|
||||
for _, h := range p.config.Cache.CacheIgnoreHeaders {
|
||||
ignoreSet[strings.ToLower(h)] = true
|
||||
}
|
||||
|
||||
var lastModified, etag string
|
||||
for key, value := range ctx.Response.Header.All() {
|
||||
// 使用 bytes.EqualFold 进行大小写不敏感比较,避免 strings.ToLower 分配
|
||||
shouldIgnore := false
|
||||
for _, h := range p.config.Cache.CacheIgnoreHeaders {
|
||||
if bytes.EqualFold(key, s2b(h)) {
|
||||
shouldIgnore = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if shouldIgnore {
|
||||
if p.cacheIgnoreSet[b2s(bytes.ToLower(key))] {
|
||||
continue
|
||||
}
|
||||
// 使用 b2s 零分配转换
|
||||
headers[b2s(key)] = b2s(value)
|
||||
|
||||
// 检查特定头部(使用 bytes.EqualFold)
|
||||
if bytes.EqualFold(key, []byte("last-modified")) {
|
||||
lastModified = b2s(value)
|
||||
} else if bytes.EqualFold(key, []byte("etag")) {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user