feat(cache): 添加 stale_if_error 和 stale_if_timeout 缓存接口和实现
在 CacheBackend 接口新增 GetStale 方法,支持上游错误时按错误类型 (超时 vs 其他错误)检查对应的 stale 窗口返回过期缓存。 ProxyCache、DiskCache、TieredCache 均实现该方法。 Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
parent
c6df735caa
commit
8deda73b24
15
internal/cache/backend.go
vendored
15
internal/cache/backend.go
vendored
@ -34,6 +34,21 @@ type CacheBackend interface {
|
|||||||
// - bool: 是否过期(stale),true 表示可使用过期缓存
|
// - bool: 是否过期(stale),true 表示可使用过期缓存
|
||||||
Get(hashKey uint64, origKey string) (entry *ProxyCacheEntry, exists bool, stale bool)
|
Get(hashKey uint64, origKey string) (entry *ProxyCacheEntry, exists bool, stale bool)
|
||||||
|
|
||||||
|
// GetStale 在上游错误时获取可用的过期缓存。
|
||||||
|
//
|
||||||
|
// 与 Get 不同,GetStale 只在错误发生时使用,根据错误类型检查对应的 stale 窗口。
|
||||||
|
// 超时错误检查 staleIfTimeout 窗口,其他错误检查 staleIfError 窗口。
|
||||||
|
//
|
||||||
|
// 参数:
|
||||||
|
// - hashKey: 缓存键的哈希值
|
||||||
|
// - origKey: 原始缓存键(用于双重验证)
|
||||||
|
// - isTimeout: 是否为超时错误
|
||||||
|
//
|
||||||
|
// 返回值:
|
||||||
|
// - *ProxyCacheEntry: 缓存条目
|
||||||
|
// - bool: 是否存在可用的过期缓存
|
||||||
|
GetStale(hashKey uint64, origKey string, isTimeout bool) (entry *ProxyCacheEntry, exists bool)
|
||||||
|
|
||||||
// Set 设置缓存条目。
|
// Set 设置缓存条目。
|
||||||
//
|
//
|
||||||
// 无返回值,与现有 ProxyCache.Set 签名一致。
|
// 无返回值,与现有 ProxyCache.Set 签名一致。
|
||||||
|
|||||||
126
internal/cache/disk_cache.go
vendored
126
internal/cache/disk_cache.go
vendored
@ -37,18 +37,26 @@ type DiskCacheConfig struct {
|
|||||||
|
|
||||||
// Inactive 未访问淘汰时间
|
// Inactive 未访问淘汰时间
|
||||||
Inactive time.Duration
|
Inactive time.Duration
|
||||||
|
|
||||||
|
// StaleIfError 错误时使用过期缓存的窗口
|
||||||
|
StaleIfError time.Duration
|
||||||
|
|
||||||
|
// StaleIfTimeout 超时时使用过期缓存的窗口
|
||||||
|
StaleIfTimeout time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
// DiskCache 磁盘缓存实现。
|
// DiskCache 磁盘缓存实现。
|
||||||
type DiskCache struct {
|
type DiskCache struct {
|
||||||
basePath string
|
basePath string
|
||||||
levels []int
|
levels []int
|
||||||
maxSize int64
|
maxSize int64
|
||||||
inactive time.Duration
|
inactive time.Duration
|
||||||
currentSize atomic.Int64
|
staleIfError time.Duration
|
||||||
entries map[uint64]*DiskCacheMeta
|
staleIfTimeout time.Duration
|
||||||
mu sync.RWMutex
|
currentSize atomic.Int64
|
||||||
stopCh chan struct{}
|
entries map[uint64]*DiskCacheMeta
|
||||||
|
mu sync.RWMutex
|
||||||
|
stopCh chan struct{}
|
||||||
|
|
||||||
// 懒加载相关
|
// 懒加载相关
|
||||||
loaded atomic.Bool
|
loaded atomic.Bool
|
||||||
@ -90,13 +98,15 @@ func NewDiskCache(cfg *DiskCacheConfig) (*DiskCache, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
dc := &DiskCache{
|
dc := &DiskCache{
|
||||||
basePath: cfg.Path,
|
basePath: cfg.Path,
|
||||||
levels: parseLevels(cfg.Levels),
|
levels: parseLevels(cfg.Levels),
|
||||||
maxSize: cfg.MaxSize,
|
maxSize: cfg.MaxSize,
|
||||||
inactive: cfg.Inactive,
|
inactive: cfg.Inactive,
|
||||||
entries: make(map[uint64]*DiskCacheMeta),
|
staleIfError: cfg.StaleIfError,
|
||||||
loadCh: make(chan struct{}),
|
staleIfTimeout: cfg.StaleIfTimeout,
|
||||||
stopCh: make(chan struct{}),
|
entries: make(map[uint64]*DiskCacheMeta),
|
||||||
|
loadCh: make(chan struct{}),
|
||||||
|
stopCh: make(chan struct{}),
|
||||||
}
|
}
|
||||||
|
|
||||||
// 启动后台加载,不阻塞服务启动
|
// 启动后台加载,不阻塞服务启动
|
||||||
@ -241,6 +251,92 @@ func (dc *DiskCache) Get(hashKey uint64, origKey string) (*ProxyCacheEntry, bool
|
|||||||
return entry, true, stale
|
return entry, true, stale
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetStale 在上游错误时获取可用的过期缓存。
|
||||||
|
//
|
||||||
|
// 与 Get 不同,GetStale 只在错误发生时使用,根据错误类型检查对应的 stale 窗口。
|
||||||
|
func (dc *DiskCache) GetStale(hashKey uint64, origKey string, isTimeout bool) (*ProxyCacheEntry, bool) {
|
||||||
|
// 等待懒加载完成
|
||||||
|
if !dc.loaded.Load() {
|
||||||
|
select {
|
||||||
|
case <-dc.loadCh:
|
||||||
|
case <-time.After(100 * time.Millisecond):
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
dc.mu.RLock()
|
||||||
|
meta, ok := dc.entries[hashKey]
|
||||||
|
dc.mu.RUnlock()
|
||||||
|
|
||||||
|
if !ok {
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
|
||||||
|
// 双重验证:检查原始 key 是否匹配
|
||||||
|
if meta.OrigKey != origKey {
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
|
||||||
|
// 读取数据文件
|
||||||
|
dataPath := dc.filePathFromHash(hashKey, "data")
|
||||||
|
data, err := os.ReadFile(dataPath)
|
||||||
|
if err != nil {
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
|
||||||
|
// 验证 CRC32
|
||||||
|
crc := crc32.ChecksumIEEE(data)
|
||||||
|
if crc != meta.CRC32 {
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
|
||||||
|
now := time.Now()
|
||||||
|
expiresAt := meta.Created.Add(meta.MaxAge)
|
||||||
|
|
||||||
|
// 未过期,直接返回
|
||||||
|
if !now.After(expiresAt) {
|
||||||
|
entry := &ProxyCacheEntry{
|
||||||
|
Key: meta.OrigKey,
|
||||||
|
OrigKey: meta.OrigKey,
|
||||||
|
Data: data,
|
||||||
|
Headers: meta.Headers,
|
||||||
|
Status: meta.Status,
|
||||||
|
Created: meta.Created,
|
||||||
|
MaxAge: meta.MaxAge,
|
||||||
|
}
|
||||||
|
return entry, true
|
||||||
|
}
|
||||||
|
|
||||||
|
// 已过期,检查 stale 窗口
|
||||||
|
var staleWindow time.Duration
|
||||||
|
if isTimeout {
|
||||||
|
staleWindow = dc.staleIfTimeout
|
||||||
|
} else {
|
||||||
|
staleWindow = dc.staleIfError
|
||||||
|
}
|
||||||
|
|
||||||
|
if staleWindow <= 0 {
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
|
||||||
|
// 检查是否在 stale 窗口内
|
||||||
|
if now.Sub(expiresAt) > staleWindow {
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
|
||||||
|
entry := &ProxyCacheEntry{
|
||||||
|
Key: meta.OrigKey,
|
||||||
|
OrigKey: meta.OrigKey,
|
||||||
|
Data: data,
|
||||||
|
Headers: meta.Headers,
|
||||||
|
Status: meta.Status,
|
||||||
|
Created: meta.Created,
|
||||||
|
MaxAge: meta.MaxAge,
|
||||||
|
}
|
||||||
|
|
||||||
|
return entry, true
|
||||||
|
}
|
||||||
|
|
||||||
// Set 设置缓存条目(实现 CacheBackend 接口)。
|
// Set 设置缓存条目(实现 CacheBackend 接口)。
|
||||||
func (dc *DiskCache) Set(hashKey uint64, origKey string, data []byte, headers map[string]string, status int, maxAge time.Duration) {
|
func (dc *DiskCache) Set(hashKey uint64, origKey string, data []byte, headers map[string]string, status int, maxAge time.Duration) {
|
||||||
// 计算文件路径
|
// 计算文件路径
|
||||||
|
|||||||
87
internal/cache/file_cache.go
vendored
87
internal/cache/file_cache.go
vendored
@ -310,12 +310,14 @@ type ProxyCacheEntry struct {
|
|||||||
|
|
||||||
// ProxyCache 代理响应缓存,支持缓存锁防击穿。
|
// ProxyCache 代理响应缓存,支持缓存锁防击穿。
|
||||||
type ProxyCache struct {
|
type ProxyCache struct {
|
||||||
entries map[uint64]*ProxyCacheEntry
|
entries map[uint64]*ProxyCacheEntry
|
||||||
pending map[uint64]*pendingRequest
|
pending map[uint64]*pendingRequest
|
||||||
rules []ProxyCacheRule
|
rules []ProxyCacheRule
|
||||||
staleTime time.Duration
|
staleTime time.Duration // StaleWhileRevalidate 窗口
|
||||||
mu sync.RWMutex
|
staleIfError time.Duration // 错误时使用过期缓存的窗口
|
||||||
cacheLock bool
|
staleIfTimeout time.Duration // 超时时使用过期缓存的窗口
|
||||||
|
mu sync.RWMutex
|
||||||
|
cacheLock bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// pendingRequest 等待中的缓存请求。
|
// pendingRequest 等待中的缓存请求。
|
||||||
@ -332,17 +334,21 @@ type pendingRequest struct {
|
|||||||
// 参数:
|
// 参数:
|
||||||
// - rules: 代理缓存规则列表,定义可缓存的路径、方法、状态码等
|
// - rules: 代理缓存规则列表,定义可缓存的路径、方法、状态码等
|
||||||
// - cacheLock: 是否启用缓存生成锁,防止多个请求同时生成缓存
|
// - cacheLock: 是否启用缓存生成锁,防止多个请求同时生成缓存
|
||||||
// - staleTime: 过期缓存可复用的额外时间,设为 0 表示不启用
|
// - staleTime: StaleWhileRevalidate 窗口,过期后后台刷新期间可复用
|
||||||
|
// - staleIfError: 错误时使用过期缓存的窗口(上游 5xx/连接失败)
|
||||||
|
// - staleIfTimeout: 超时时使用过期缓存的窗口
|
||||||
//
|
//
|
||||||
// 返回值:
|
// 返回值:
|
||||||
// - *ProxyCache: 初始化的代理缓存实例
|
// - *ProxyCache: 初始化的代理缓存实例
|
||||||
func NewProxyCache(rules []ProxyCacheRule, cacheLock bool, staleTime time.Duration) *ProxyCache {
|
func NewProxyCache(rules []ProxyCacheRule, cacheLock bool, staleTime, staleIfError, staleIfTimeout time.Duration) *ProxyCache {
|
||||||
return &ProxyCache{
|
return &ProxyCache{
|
||||||
rules: rules,
|
rules: rules,
|
||||||
entries: make(map[uint64]*ProxyCacheEntry),
|
entries: make(map[uint64]*ProxyCacheEntry),
|
||||||
cacheLock: cacheLock,
|
cacheLock: cacheLock,
|
||||||
pending: make(map[uint64]*pendingRequest),
|
pending: make(map[uint64]*pendingRequest),
|
||||||
staleTime: staleTime,
|
staleTime: staleTime,
|
||||||
|
staleIfError: staleIfError,
|
||||||
|
staleIfTimeout: staleIfTimeout,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -380,6 +386,61 @@ func (c *ProxyCache) Get(hashKey uint64, origKey string) (*ProxyCacheEntry, bool
|
|||||||
return entry, true, false
|
return entry, true, false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetStale 在上游错误时获取可用的过期缓存。
|
||||||
|
//
|
||||||
|
// 与 Get 不同,GetStale 只在错误发生时使用,根据错误类型检查对应的 stale 窗口。
|
||||||
|
// 超时错误检查 staleIfTimeout,其他错误检查 staleIfError。
|
||||||
|
//
|
||||||
|
// 参数:
|
||||||
|
// - hashKey: 缓存键的哈希值
|
||||||
|
// - origKey: 原始缓存键(用于双重验证)
|
||||||
|
// - isTimeout: 是否为超时错误
|
||||||
|
//
|
||||||
|
// 返回值:
|
||||||
|
// - *ProxyCacheEntry: 缓存条目
|
||||||
|
// - bool: 是否存在可用的过期缓存
|
||||||
|
func (c *ProxyCache) GetStale(hashKey uint64, origKey string, isTimeout bool) (*ProxyCacheEntry, bool) {
|
||||||
|
c.mu.RLock()
|
||||||
|
defer c.mu.RUnlock()
|
||||||
|
|
||||||
|
entry, ok := c.entries[hashKey]
|
||||||
|
if !ok {
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
|
||||||
|
// 双重验证:检查原始 key 是否匹配
|
||||||
|
if entry.OrigKey != origKey {
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
|
||||||
|
now := time.Now()
|
||||||
|
expiresAt := entry.Created.Add(entry.MaxAge)
|
||||||
|
|
||||||
|
// 未过期,直接返回
|
||||||
|
if !now.After(expiresAt) {
|
||||||
|
return entry, true
|
||||||
|
}
|
||||||
|
|
||||||
|
// 已过期,检查 stale 窗口
|
||||||
|
var staleWindow time.Duration
|
||||||
|
if isTimeout {
|
||||||
|
staleWindow = c.staleIfTimeout
|
||||||
|
} else {
|
||||||
|
staleWindow = c.staleIfError
|
||||||
|
}
|
||||||
|
|
||||||
|
if staleWindow <= 0 {
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
|
||||||
|
// 检查是否在 stale 窗口内
|
||||||
|
if now.Sub(expiresAt) > staleWindow {
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
|
||||||
|
return entry, true
|
||||||
|
}
|
||||||
|
|
||||||
// Set 设置代理缓存条目。
|
// Set 设置代理缓存条目。
|
||||||
func (c *ProxyCache) Set(hashKey uint64, origKey string, data []byte, headers map[string]string, status int, maxAge time.Duration) {
|
func (c *ProxyCache) Set(hashKey uint64, origKey string, data []byte, headers map[string]string, status int, maxAge time.Duration) {
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
|
|||||||
26
internal/cache/tiered_cache.go
vendored
26
internal/cache/tiered_cache.go
vendored
@ -28,6 +28,10 @@ type TieredCacheConfig struct {
|
|||||||
// L2 配置
|
// L2 配置
|
||||||
L2Config *DiskCacheConfig
|
L2Config *DiskCacheConfig
|
||||||
|
|
||||||
|
// Stale 配置
|
||||||
|
StaleIfError time.Duration // 错误时使用过期缓存的窗口
|
||||||
|
StaleIfTimeout time.Duration // 超时时使用过期缓存的窗口
|
||||||
|
|
||||||
// 热点提升配置
|
// 热点提升配置
|
||||||
PromoteThreshold int // 访问次数阈值,超过后提升到 L1
|
PromoteThreshold int // 访问次数阈值,超过后提升到 L1
|
||||||
PromoteInterval time.Duration // 提升检查间隔
|
PromoteInterval time.Duration // 提升检查间隔
|
||||||
@ -66,7 +70,7 @@ type accessInfo struct {
|
|||||||
// NewTieredCache 创建分层缓存实例。
|
// NewTieredCache 创建分层缓存实例。
|
||||||
func NewTieredCache(cfg *TieredCacheConfig) (*TieredCache, error) {
|
func NewTieredCache(cfg *TieredCacheConfig) (*TieredCache, error) {
|
||||||
// 创建 L1 内存缓存
|
// 创建 L1 内存缓存
|
||||||
l1 := NewProxyCache(nil, true, 0)
|
l1 := NewProxyCache(nil, true, 0, cfg.StaleIfError, cfg.StaleIfTimeout)
|
||||||
|
|
||||||
// 创建 L2 磁盘缓存
|
// 创建 L2 磁盘缓存
|
||||||
l2, err := NewDiskCache(cfg.L2Config)
|
l2, err := NewDiskCache(cfg.L2Config)
|
||||||
@ -123,6 +127,26 @@ func (tc *TieredCache) Get(hashKey uint64, origKey string) (*ProxyCacheEntry, bo
|
|||||||
return entry, true, stale
|
return entry, true, stale
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetStale 在上游错误时获取可用的过期缓存。
|
||||||
|
//
|
||||||
|
// 先查 L1,再查 L2。
|
||||||
|
func (tc *TieredCache) GetStale(hashKey uint64, origKey string, isTimeout bool) (*ProxyCacheEntry, bool) {
|
||||||
|
// 1. 先查 L1
|
||||||
|
if entry, ok := tc.l1.GetStale(hashKey, origKey, isTimeout); ok {
|
||||||
|
tc.l1Hits.Add(1)
|
||||||
|
return entry, true
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2. 查 L2
|
||||||
|
if entry, ok := tc.l2.GetStale(hashKey, origKey, isTimeout); ok {
|
||||||
|
tc.l2Hits.Add(1)
|
||||||
|
return entry, true
|
||||||
|
}
|
||||||
|
|
||||||
|
tc.misses.Add(1)
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
|
||||||
// Set 设置缓存条目(实现 CacheBackend 接口)。
|
// Set 设置缓存条目(实现 CacheBackend 接口)。
|
||||||
func (tc *TieredCache) Set(hashKey uint64, origKey string, data []byte, headers map[string]string, status int, maxAge time.Duration) {
|
func (tc *TieredCache) Set(hashKey uint64, origKey string, data []byte, headers map[string]string, status int, maxAge time.Duration) {
|
||||||
// 同时写入 L1 和 L2
|
// 同时写入 L1 和 L2
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user