perf(cache,proxy): 使用 uint64 哈希键优化代理缓存性能

- ProxyCache 的 entries 和 pending map 从 string 改为 uint64 键
- 新增 buildCacheKeyHash 使用 FNV-64a 计算哈希(零分配)
- 增加原始键碰撞验证,防止哈希冲突误匹配
- 更新相关测试和基准测试

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
xfy 2026-04-08 14:59:29 +08:00
parent 49d33f8b0c
commit 214ea4e9a6
5 changed files with 118 additions and 66 deletions

View File

@ -10,10 +10,18 @@ package cache
import (
"fmt"
"hash/fnv"
"testing"
"time"
)
// hashKeyBench 计算字符串的 FNV-64a 哈希值,用于 benchmark。
func hashKeyBench(s string) uint64 {
h := fnv.New64a()
h.Write([]byte(s))
return h.Sum64()
}
// BenchmarkFileCacheGet 测试热点读取场景下的 Get 性能。
// 模拟缓存命中率高的场景,测试 LRU 链表的访问效率。
func BenchmarkFileCacheGet(b *testing.B) {
@ -190,18 +198,20 @@ func BenchmarkProxyCacheGet(b *testing.B) {
// 预填充缓存
for i := 0; i < 1000; i++ {
key := fmt.Sprintf("key%d", i)
origKey := fmt.Sprintf("key%d", i)
hashKey := hashKeyBench(origKey)
data := []byte("response body")
headers := map[string]string{"Content-Type": "application/json"}
pc.Set(key, data, headers, 200, 10*time.Minute)
pc.Set(hashKey, origKey, data, headers, 200, 10*time.Minute)
}
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
i := 0
for pb.Next() {
key := fmt.Sprintf("key%d", i%1000)
pc.Get(key)
origKey := fmt.Sprintf("key%d", i%1000)
hashKey := hashKeyBench(origKey)
pc.Get(hashKey, origKey)
i++
}
})
@ -215,8 +225,9 @@ func BenchmarkProxyCacheSet(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
key := fmt.Sprintf("key%d", i)
pc.Set(key, data, headers, 200, 10*time.Minute)
origKey := fmt.Sprintf("key%d", i)
hashKey := hashKeyBench(origKey)
pc.Set(hashKey, origKey, data, headers, 200, 10*time.Minute)
}
}
@ -227,10 +238,11 @@ func BenchmarkProxyCacheConcurrent(b *testing.B) {
// 预填充缓存
for i := 0; i < 1000; i++ {
key := fmt.Sprintf("key%d", i)
origKey := fmt.Sprintf("key%d", i)
hashKey := hashKeyBench(origKey)
data := []byte("response body")
headers := map[string]string{"Content-Type": "application/json"}
pc.Set(key, data, headers, 200, 10*time.Minute)
pc.Set(hashKey, origKey, data, headers, 200, 10*time.Minute)
}
b.ResetTimer()
@ -238,13 +250,15 @@ func BenchmarkProxyCacheConcurrent(b *testing.B) {
i := 0
for pb.Next() {
if i%10 == 0 {
key := fmt.Sprintf("newkey%d", i)
origKey := fmt.Sprintf("newkey%d", i)
hashKey := hashKeyBench(origKey)
data := []byte("new response body")
headers := map[string]string{"Content-Type": "application/json"}
pc.Set(key, data, headers, 200, 10*time.Minute)
pc.Set(hashKey, origKey, data, headers, 200, 10*time.Minute)
} else {
key := fmt.Sprintf("key%d", i%1000)
pc.Get(key)
origKey := fmt.Sprintf("key%d", i%1000)
hashKey := hashKeyBench(origKey)
pc.Get(hashKey, origKey)
}
i++
}

View File

@ -11,10 +11,18 @@
package cache
import (
"hash/fnv"
"testing"
"time"
)
// hashKey 计算字符串的 FNV-64a 哈希值,用于测试。
func hashKey(s string) uint64 {
h := fnv.New64a()
h.Write([]byte(s))
return h.Sum64()
}
func TestNewFileCache(t *testing.T) {
fc := NewFileCache(100, 1024*1024, 30*time.Second)
if fc == nil {
@ -164,9 +172,9 @@ func TestProxyCacheSetGet(t *testing.T) {
data := []byte("response body")
headers := map[string]string{"Content-Type": "application/json"}
pc.Set(key, data, headers, 200, 10*time.Minute)
pc.Set(hashKey(key), key, data, headers, 200, 10*time.Minute)
entry, ok, stale := pc.Get(key)
entry, ok, stale := pc.Get(hashKey(key), key)
if !ok {
t.Error("Expected to find cached entry")
}
@ -185,10 +193,10 @@ func TestProxyCacheExpiration(t *testing.T) {
pc := NewProxyCache(nil, false, 0)
key := "expire-test"
pc.Set(key, []byte("data"), nil, 200, 100*time.Millisecond)
pc.Set(hashKey(key), key, []byte("data"), nil, 200, 100*time.Millisecond)
// 立即获取应该成功
_, ok, _ := pc.Get(key)
_, ok, _ := pc.Get(hashKey(key), key)
if !ok {
t.Error("Expected entry to exist")
}
@ -196,7 +204,7 @@ func TestProxyCacheExpiration(t *testing.T) {
// 等待过期
time.Sleep(150 * time.Millisecond)
_, ok, _ = pc.Get(key)
_, ok, _ = pc.Get(hashKey(key), key)
if ok {
t.Error("Expected entry to be expired")
}
@ -206,12 +214,12 @@ func TestProxyCacheStaleWhileRevalidate(t *testing.T) {
pc := NewProxyCache(nil, false, 200*time.Millisecond)
key := "stale-test"
pc.Set(key, []byte("data"), nil, 200, 100*time.Millisecond)
pc.Set(hashKey(key), key, []byte("data"), nil, 200, 100*time.Millisecond)
// 等待过期但仍在 stale 时间内
time.Sleep(150 * time.Millisecond)
entry, ok, stale := pc.Get(key)
entry, ok, stale := pc.Get(hashKey(key), key)
if !ok {
t.Error("Expected stale entry to be usable")
}
@ -229,22 +237,22 @@ func TestProxyCacheLock(t *testing.T) {
key := "lock-test"
// 获取锁
ch := pc.AcquireLock(key)
ch := pc.AcquireLock(hashKey(key))
if ch != nil {
t.Error("Expected to acquire lock (nil chan)")
}
// 第二次获取应该返回等待 chan
ch2 := pc.AcquireLock(key)
ch2 := pc.AcquireLock(hashKey(key))
if ch2 == nil {
t.Error("Expected waiting chan when lock is held")
}
// 设置缓存并释放锁
pc.Set(key, []byte("data"), nil, 200, 10*time.Minute)
pc.Set(hashKey(key), key, []byte("data"), nil, 200, 10*time.Minute)
// 现在应该能获取缓存
_, ok, _ := pc.Get(key)
_, ok, _ := pc.Get(hashKey(key), key)
if !ok {
t.Error("Expected cache entry after lock release")
}
@ -286,10 +294,11 @@ func TestProxyCacheMatchRule(t *testing.T) {
func TestProxyCacheDelete(t *testing.T) {
pc := NewProxyCache(nil, false, 0)
pc.Set("key1", []byte("data"), nil, 200, 10*time.Minute)
pc.Delete("key1")
key := "key1"
pc.Set(hashKey(key), key, []byte("data"), nil, 200, 10*time.Minute)
pc.Delete(hashKey(key))
_, ok, _ := pc.Get("key1")
_, ok, _ := pc.Get(hashKey(key), key)
if ok {
t.Error("Expected entry to be deleted")
}
@ -298,8 +307,8 @@ func TestProxyCacheDelete(t *testing.T) {
func TestProxyCacheClear(t *testing.T) {
pc := NewProxyCache(nil, false, 0)
pc.Set("a", []byte("a"), nil, 200, 10*time.Minute)
pc.Set("b", []byte("b"), nil, 200, 10*time.Minute)
pc.Set(hashKey("a"), "a", []byte("a"), nil, 200, 10*time.Minute)
pc.Set(hashKey("b"), "b", []byte("b"), nil, 200, 10*time.Minute)
pc.Clear()

View File

@ -286,7 +286,8 @@ type ProxyCacheRule struct {
// ProxyCacheEntry 代理缓存条目。
type ProxyCacheEntry struct {
Key string // 缓存 key
Key string // 缓存 key (uint64 哈希值)
OrigKey string // 原始 key 用于碰撞验证
Data []byte // 响应体
Headers map[string]string // 响应头
Status int // 状态码
@ -297,10 +298,10 @@ type ProxyCacheEntry struct {
// ProxyCache 代理响应缓存,支持缓存锁防击穿。
type ProxyCache struct {
rules []ProxyCacheRule
entries map[string]*ProxyCacheEntry
entries map[uint64]*ProxyCacheEntry
mu sync.RWMutex
cacheLock bool // 缓存锁开关
pending map[string]*pendingRequest // 正在生成的缓存项
pending map[uint64]*pendingRequest // 正在生成的缓存项
staleTime time.Duration // 过期缓存复用时间
}
@ -314,23 +315,29 @@ type pendingRequest struct {
func NewProxyCache(rules []ProxyCacheRule, cacheLock bool, staleTime time.Duration) *ProxyCache {
return &ProxyCache{
rules: rules,
entries: make(map[string]*ProxyCacheEntry),
entries: make(map[uint64]*ProxyCacheEntry),
cacheLock: cacheLock,
pending: make(map[string]*pendingRequest),
pending: make(map[uint64]*pendingRequest),
staleTime: staleTime,
}
}
// Get 获取缓存的代理响应。
func (c *ProxyCache) Get(key string) (*ProxyCacheEntry, bool, bool) {
// hashKey 是 uint64 哈希值origKey 是原始 key 用于碰撞验证。
func (c *ProxyCache) Get(hashKey uint64, origKey string) (*ProxyCacheEntry, bool, bool) {
c.mu.RLock()
entry, ok := c.entries[key]
entry, ok := c.entries[hashKey]
c.mu.RUnlock()
if !ok {
return nil, false, false
}
// 双重验证:检查原始 key 是否匹配(防止哈希碰撞)
if entry.OrigKey != origKey {
return nil, false, false
}
// 检查是否过期
now := time.Now()
expired := now.Sub(entry.Created) > entry.MaxAge
@ -347,12 +354,13 @@ func (c *ProxyCache) Get(key string) (*ProxyCacheEntry, bool, bool) {
}
// Set 设置代理缓存条目。
func (c *ProxyCache) Set(key string, data []byte, headers map[string]string, status int, maxAge time.Duration) {
func (c *ProxyCache) Set(hashKey uint64, origKey string, data []byte, headers map[string]string, status int, maxAge time.Duration) {
c.mu.Lock()
defer c.mu.Unlock()
c.entries[key] = &ProxyCacheEntry{
Key: key,
c.entries[hashKey] = &ProxyCacheEntry{
Key: origKey, // 存储原始 key 作为 Key 字段(保持兼容性)
OrigKey: origKey,
Data: data,
Headers: headers,
Status: status,
@ -361,17 +369,17 @@ func (c *ProxyCache) Set(key string, data []byte, headers map[string]string, sta
}
// 如果有等待的请求,通知它们
if pending, ok := c.pending[key]; ok {
if pending, ok := c.pending[hashKey]; ok {
pending.err = nil
close(pending.done)
delete(c.pending, key)
delete(c.pending, hashKey)
}
}
// AcquireLock 获取缓存生成锁(防止击穿)。
// 如果返回 nil表示获得锁应该去生成缓存。
// 如果返回 chan表示有其他请求正在生成应该等待。
func (c *ProxyCache) AcquireLock(key string) <-chan struct{} {
func (c *ProxyCache) AcquireLock(hashKey uint64) <-chan struct{} {
if !c.cacheLock {
return nil // 不使用缓存锁
}
@ -380,12 +388,12 @@ func (c *ProxyCache) AcquireLock(key string) <-chan struct{} {
defer c.mu.Unlock()
// 检查是否已有缓存
if _, ok := c.entries[key]; ok {
if _, ok := c.entries[hashKey]; ok {
return nil
}
// 检查是否有 pending 请求
if pending, ok := c.pending[key]; ok {
if pending, ok := c.pending[hashKey]; ok {
return pending.done // 等待现有请求
}
@ -393,12 +401,12 @@ func (c *ProxyCache) AcquireLock(key string) <-chan struct{} {
pending := &pendingRequest{
done: make(chan struct{}),
}
c.pending[key] = pending
c.pending[hashKey] = pending
return nil // 获得锁,应该生成缓存
}
// ReleaseLock 释放缓存生成锁。
func (c *ProxyCache) ReleaseLock(key string, err error) {
func (c *ProxyCache) ReleaseLock(hashKey uint64, err error) {
if !c.cacheLock {
return
}
@ -406,10 +414,10 @@ func (c *ProxyCache) ReleaseLock(key string, err error) {
c.mu.Lock()
defer c.mu.Unlock()
if pending, ok := c.pending[key]; ok {
if pending, ok := c.pending[hashKey]; ok {
pending.err = err
close(pending.done)
delete(c.pending, key)
delete(c.pending, hashKey)
}
}
@ -502,18 +510,18 @@ func containsInt(slice []int, val int) bool {
}
// Delete 删除缓存条目。
func (c *ProxyCache) Delete(key string) {
func (c *ProxyCache) Delete(hashKey uint64) {
c.mu.Lock()
defer c.mu.Unlock()
delete(c.entries, key)
delete(c.entries, hashKey)
}
// Clear 清空代理缓存。
func (c *ProxyCache) Clear() {
c.mu.Lock()
defer c.mu.Unlock()
c.entries = make(map[string]*ProxyCacheEntry)
c.pending = make(map[string]*pendingRequest)
c.entries = make(map[uint64]*ProxyCacheEntry)
c.pending = make(map[uint64]*pendingRequest)
}
// Stats 返回代理缓存统计。

View File

@ -35,6 +35,7 @@ package proxy
import (
"errors"
"fmt"
"hash/fnv"
"strings"
"sync"
"sync/atomic"
@ -395,8 +396,8 @@ func (p *Proxy) ServeHTTP(ctx *fasthttp.RequestCtx) {
// 尝试从缓存获取(如果启用)
if p.cache != nil && attempt == 0 {
cacheKey := p.buildCacheKey(ctx)
if entry, ok, stale := p.cache.Get(cacheKey); ok {
hashKey, origKey := p.buildCacheKeyHash(ctx)
if entry, ok, stale := p.cache.Get(hashKey, origKey); ok {
// 缓存命中
loadbalance.DecrementConnections(target)
if !stale {
@ -407,22 +408,26 @@ func (p *Proxy) ServeHTTP(ctx *fasthttp.RequestCtx) {
return
}
// 过期缓存,尝试后台刷新,同时返回旧数据
go p.backgroundRefresh(ctx, target, cacheKey)
go p.backgroundRefresh(ctx, target, hashKey, origKey)
upstreamAddr = "CACHE"
upstreamStatus = entry.Status
p.writeCachedResponse(ctx, entry)
return
}
// 检查是否需要缓存锁(防止缓存击穿)
if done := p.cache.AcquireLock(cacheKey); done != nil {
if done := p.cache.AcquireLock(hashKey); done != nil {
// 有其他请求正在生成缓存,等待
loadbalance.DecrementConnections(target)
<-done
// 重新尝试获取缓存
if entry, ok, _ := p.cache.Get(cacheKey); ok {
if entry, ok, _ := p.cache.Get(hashKey, origKey); ok {
upstreamAddr = "CACHE"
upstreamStatus = entry.Status
p.writeCachedResponse(ctx, entry)
return
}
@ -446,7 +451,8 @@ func (p *Proxy) ServeHTTP(ctx *fasthttp.RequestCtx) {
// 释放缓存锁
if p.cache != nil && attempt == 0 {
p.cache.ReleaseLock(p.buildCacheKey(ctx), err)
hashKey, _ := p.buildCacheKeyHash(ctx)
p.cache.ReleaseLock(hashKey, err)
}
// 设置失败状态
@ -482,7 +488,8 @@ func (p *Proxy) ServeHTTP(ctx *fasthttp.RequestCtx) {
if shouldRetry {
// 释放缓存锁
if p.cache != nil && attempt == 0 {
p.cache.ReleaseLock(p.buildCacheKey(ctx), fmt.Errorf("HTTP %d", statusCode))
hashKey, _ := p.buildCacheKeyHash(ctx)
p.cache.ReleaseLock(hashKey, fmt.Errorf("HTTP %d", statusCode))
}
// 如果不是最后一次尝试,继续下一个目标
@ -502,16 +509,16 @@ func (p *Proxy) ServeHTTP(ctx *fasthttp.RequestCtx) {
// 存入缓存(如果启用且响应可缓存)
if p.cache != nil {
cacheKey := p.buildCacheKey(ctx)
hashKey, origKey := p.buildCacheKeyHash(ctx)
if statusCode >= 200 && statusCode < 300 {
// 提取响应头
headers := make(map[string]string)
for key, value := range ctx.Response.Header.All() {
headers[string(key)] = string(value)
}
p.cache.Set(cacheKey, ctx.Response.Body(), headers, statusCode, p.config.Cache.MaxAge)
p.cache.Set(hashKey, origKey, ctx.Response.Body(), headers, statusCode, p.config.Cache.MaxAge)
}
p.cache.ReleaseLock(cacheKey, nil)
p.cache.ReleaseLock(hashKey, nil)
}
// 修改响应头
@ -763,6 +770,18 @@ func (p *Proxy) buildCacheKey(ctx *fasthttp.RequestCtx) string {
return string(ctx.Request.Header.Method()) + ":" + string(ctx.Request.URI().RequestURI())
}
// buildCacheKeyHash 使用 FNV-64a 计算缓存键的 uint64 哈希值。
// 这个函数分配 0 内存,比字符串键更高效。
func (p *Proxy) buildCacheKeyHash(ctx *fasthttp.RequestCtx) (uint64, string) {
// 构建原始 key
origKey := p.buildCacheKey(ctx)
// 使用 FNV-64a 计算哈希
h := fnv.New64a()
h.Write([]byte(origKey))
return h.Sum64(), origKey
}
// writeCachedResponse 写入缓存的响应。
func (p *Proxy) writeCachedResponse(ctx *fasthttp.RequestCtx, entry *cache.ProxyCacheEntry) {
ctx.Response.SetBody(entry.Data)
@ -774,7 +793,7 @@ func (p *Proxy) writeCachedResponse(ctx *fasthttp.RequestCtx, entry *cache.Proxy
}
// backgroundRefresh 后台刷新缓存。
func (p *Proxy) backgroundRefresh(ctx *fasthttp.RequestCtx, target *loadbalance.Target, cacheKey string) {
func (p *Proxy) backgroundRefresh(ctx *fasthttp.RequestCtx, target *loadbalance.Target, hashKey uint64, origKey string) {
// 创建新的请求上下文副本
req := fasthttp.AcquireRequest()
resp := fasthttp.AcquireResponse()
@ -793,7 +812,7 @@ func (p *Proxy) backgroundRefresh(ctx *fasthttp.RequestCtx, target *loadbalance.
// 执行请求
err := client.Do(req, resp)
if err != nil {
p.cache.ReleaseLock(cacheKey, err)
p.cache.ReleaseLock(hashKey, err)
return
}
@ -804,7 +823,7 @@ func (p *Proxy) backgroundRefresh(ctx *fasthttp.RequestCtx, target *loadbalance.
}
// 更新缓存
p.cache.Set(cacheKey, resp.Body(), headers, resp.StatusCode(), p.config.Cache.MaxAge)
p.cache.Set(hashKey, origKey, resp.Body(), headers, resp.StatusCode(), p.config.Cache.MaxAge)
}
// GetCacheStats 返回代理缓存的统计信息。

View File

@ -1046,9 +1046,11 @@ func TestProxyCache(t *testing.T) {
}
// 测试缓存设置和获取
p.cache.Set("/api/test", []byte("test data"), map[string]string{"Content-Type": "text/plain"}, 200, 1*time.Second)
testKey := "/api/test"
hashKey := uint64(0x1234567890abcdef) // 测试用哈希值
p.cache.Set(hashKey, testKey, []byte("test data"), map[string]string{"Content-Type": "text/plain"}, 200, 1*time.Second)
entry, found, stale := p.cache.Get("/api/test")
entry, found, stale := p.cache.Get(hashKey, testKey)
if !found {
t.Error("Cache should find existing entry")
}