lolly/internal/cache/tiered_cache.go
xfy 8deda73b24 feat(cache): 添加 stale_if_error 和 stale_if_timeout 缓存接口和实现
在 CacheBackend 接口新增 GetStale 方法,支持上游错误时按错误类型
(超时 vs 其他错误)检查对应的 stale 窗口返回过期缓存。
ProxyCache、DiskCache、TieredCache 均实现该方法。

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-24 10:05:39 +08:00

265 lines
6.3 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// Package cache 提供文件缓存和代理缓存功能。
//
// 该文件实现 TieredCache 分层缓存,支持:
// - L1 内存缓存(热点数据)
// - L2 磁盘缓存(持久化)
// - 热点提升机制
// - L2 过期检查
//
// 主要用途:
//
// 作为代理缓存的主要实现平衡性能L1和容量L2
//
// 作者xfy
package cache
import (
"sync"
"sync/atomic"
"time"
)
// TieredCacheConfig 分层缓存配置。
type TieredCacheConfig struct {
// L1 配置
L1MaxEntries int64
L1MaxSize int64
// L2 配置
L2Config *DiskCacheConfig
// Stale 配置
StaleIfError time.Duration // 错误时使用过期缓存的窗口
StaleIfTimeout time.Duration // 超时时使用过期缓存的窗口
// 热点提升配置
PromoteThreshold int // 访问次数阈值,超过后提升到 L1
PromoteInterval time.Duration // 提升检查间隔
}
// TieredCache 分层缓存L1 内存 + L2 磁盘)。
type TieredCache struct {
l1 *ProxyCache // L1 内存缓存(热点数据)
l2 *DiskCache // L2 磁盘缓存(持久化)
l1Ratio float64 // L1 容量占 L2 的比例
promoter *promoter // 热点提升器
stopCh chan struct{}
// 统计
l1Hits atomic.Int64
l2Hits atomic.Int64
misses atomic.Int64
promotes atomic.Int64
}
// promoter 热点提升器。
type promoter struct {
threshold int // 访问次数阈值
interval time.Duration // 检查间隔
accessMap map[uint64]*accessInfo
mu sync.RWMutex
}
// accessInfo 访问信息。
type accessInfo struct {
count int
lastAccess time.Time
origKey string
}
// NewTieredCache 创建分层缓存实例。
func NewTieredCache(cfg *TieredCacheConfig) (*TieredCache, error) {
// 创建 L1 内存缓存
l1 := NewProxyCache(nil, true, 0, cfg.StaleIfError, cfg.StaleIfTimeout)
// 创建 L2 磁盘缓存
l2, err := NewDiskCache(cfg.L2Config)
if err != nil {
return nil, err
}
tc := &TieredCache{
l1: l1,
l2: l2,
l1Ratio: 0.1, // 默认 L1 占 10%
promoter: &promoter{
threshold: cfg.PromoteThreshold,
interval: cfg.PromoteInterval,
accessMap: make(map[uint64]*accessInfo),
},
stopCh: make(chan struct{}),
}
// 启动热点提升检查
if tc.promoter.threshold > 0 {
go tc.promoteLoop()
}
return tc, nil
}
// Get 获取缓存条目(实现 CacheBackend 接口)。
func (tc *TieredCache) Get(hashKey uint64, origKey string) (*ProxyCacheEntry, bool, bool) {
// 1. 先查 L1
entry, exists, stale := tc.l1.Get(hashKey, origKey)
if exists {
tc.l1Hits.Add(1)
return entry, true, stale
}
// 2. 查 L2必须验证 max_age
entry, exists, stale = tc.l2.Get(hashKey, origKey)
if !exists {
tc.misses.Add(1)
return nil, false, false
}
tc.l2Hits.Add(1)
// 3. 记录访问(用于热点提升)
tc.recordAccess(hashKey, origKey)
// 4. L2 命中且未过期,异步提升到 L1
if !stale {
go tc.promoteToL1(hashKey, entry)
}
return entry, true, stale
}
// GetStale 在上游错误时获取可用的过期缓存。
//
// 先查 L1再查 L2。
func (tc *TieredCache) GetStale(hashKey uint64, origKey string, isTimeout bool) (*ProxyCacheEntry, bool) {
// 1. 先查 L1
if entry, ok := tc.l1.GetStale(hashKey, origKey, isTimeout); ok {
tc.l1Hits.Add(1)
return entry, true
}
// 2. 查 L2
if entry, ok := tc.l2.GetStale(hashKey, origKey, isTimeout); ok {
tc.l2Hits.Add(1)
return entry, true
}
tc.misses.Add(1)
return nil, false
}
// Set 设置缓存条目(实现 CacheBackend 接口)。
func (tc *TieredCache) Set(hashKey uint64, origKey string, data []byte, headers map[string]string, status int, maxAge time.Duration) {
// 同时写入 L1 和 L2
tc.l1.Set(hashKey, origKey, data, headers, status, maxAge)
// L2 异步写入(在 goroutine 中执行)
go tc.l2.Set(hashKey, origKey, data, headers, status, maxAge)
}
// Delete 删除缓存条目(实现 CacheBackend 接口)。
func (tc *TieredCache) Delete(hashKey uint64) error {
_ = tc.l1.Delete(hashKey)
return tc.l2.Delete(hashKey)
}
// CacheStats 返回缓存统计信息(实现 CacheBackend 接口)。
func (tc *TieredCache) CacheStats() CacheStats {
l1Stats := tc.l1.CacheStats()
l2Stats := tc.l2.CacheStats()
return CacheStats{
Entries: l1Stats.Entries + l2Stats.Entries,
Size: l1Stats.Size + l2Stats.Size,
HitCount: tc.l1Hits.Load() + tc.l2Hits.Load(),
MissCount: tc.misses.Load(),
Evictions: l2Stats.Evictions,
}
}
// TieredCacheStats 返回分层缓存详细统计。
func (tc *TieredCache) TieredCacheStats() TieredCacheStats {
return TieredCacheStats{
L1Hits: tc.l1Hits.Load(),
L2Hits: tc.l2Hits.Load(),
Misses: tc.misses.Load(),
Promotes: tc.promotes.Load(),
L1Entries: tc.l1.CacheStats().Entries,
L2Entries: tc.l2.CacheStats().Entries,
}
}
// TieredCacheStats 分层缓存详细统计。
type TieredCacheStats struct {
L1Hits int64
L2Hits int64
Misses int64
Promotes int64
L1Entries int64
L2Entries int64
}
// Stop 停止分层缓存。
func (tc *TieredCache) Stop() {
close(tc.stopCh)
if tc.l2 != nil {
tc.l2.Stop()
}
}
// recordAccess 记录访问(用于热点提升)。
func (tc *TieredCache) recordAccess(hashKey uint64, origKey string) {
if tc.promoter.threshold <= 0 {
return
}
tc.promoter.mu.Lock()
defer tc.promoter.mu.Unlock()
info, exists := tc.promoter.accessMap[hashKey]
if !exists {
info = &accessInfo{origKey: origKey}
tc.promoter.accessMap[hashKey] = info
}
info.count++
info.lastAccess = time.Now()
}
// promoteToL1 提升条目到 L1。
func (tc *TieredCache) promoteToL1(hashKey uint64, entry *ProxyCacheEntry) {
tc.l1.Set(hashKey, entry.OrigKey, entry.Data, entry.Headers, entry.Status, entry.MaxAge)
tc.promotes.Add(1)
}
// promoteLoop 热点提升检查循环。
func (tc *TieredCache) promoteLoop() {
ticker := time.NewTicker(tc.promoter.interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
tc.checkAndPromote()
case <-tc.stopCh:
return
}
}
}
// checkAndPromote 检查并提升热点数据。
func (tc *TieredCache) checkAndPromote() {
tc.promoter.mu.Lock()
defer tc.promoter.mu.Unlock()
for hashKey, info := range tc.promoter.accessMap {
if info.count >= tc.promoter.threshold {
// 从 L2 获取并提升到 L1
entry, exists, _ := tc.l2.Get(hashKey, info.origKey)
if exists {
tc.promoteToL1(hashKey, entry)
}
// 重置计数
info.count = 0
}
}
}