- disk_cache: 忽略 filepath.Walk 和 Delete 返回值 - tiered_cache: 忽略 l1.Delete 返回值,删除未使用的 revalidate 函数 - config: 简化 multiplier 变量声明 Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
241 lines
5.7 KiB
Go
241 lines
5.7 KiB
Go
// Package cache 提供文件缓存和代理缓存功能。
|
||
//
|
||
// 该文件实现 TieredCache 分层缓存,支持:
|
||
// - L1 内存缓存(热点数据)
|
||
// - L2 磁盘缓存(持久化)
|
||
// - 热点提升机制
|
||
// - L2 过期检查
|
||
//
|
||
// 主要用途:
|
||
//
|
||
// 作为代理缓存的主要实现,平衡性能(L1)和容量(L2)。
|
||
//
|
||
// 作者:xfy
|
||
package cache
|
||
|
||
import (
|
||
"sync"
|
||
"sync/atomic"
|
||
"time"
|
||
)
|
||
|
||
// TieredCacheConfig 分层缓存配置。
|
||
type TieredCacheConfig struct {
|
||
// L1 配置
|
||
L1MaxEntries int64
|
||
L1MaxSize int64
|
||
|
||
// L2 配置
|
||
L2Config *DiskCacheConfig
|
||
|
||
// 热点提升配置
|
||
PromoteThreshold int // 访问次数阈值,超过后提升到 L1
|
||
PromoteInterval time.Duration // 提升检查间隔
|
||
}
|
||
|
||
// TieredCache 分层缓存(L1 内存 + L2 磁盘)。
|
||
type TieredCache struct {
|
||
l1 *ProxyCache // L1 内存缓存(热点数据)
|
||
l2 *DiskCache // L2 磁盘缓存(持久化)
|
||
l1Ratio float64 // L1 容量占 L2 的比例
|
||
promoter *promoter // 热点提升器
|
||
stopCh chan struct{}
|
||
|
||
// 统计
|
||
l1Hits atomic.Int64
|
||
l2Hits atomic.Int64
|
||
misses atomic.Int64
|
||
promotes atomic.Int64
|
||
}
|
||
|
||
// promoter 热点提升器。
|
||
type promoter struct {
|
||
threshold int // 访问次数阈值
|
||
interval time.Duration // 检查间隔
|
||
accessMap map[uint64]*accessInfo
|
||
mu sync.RWMutex
|
||
}
|
||
|
||
// accessInfo 访问信息。
|
||
type accessInfo struct {
|
||
count int
|
||
lastAccess time.Time
|
||
origKey string
|
||
}
|
||
|
||
// NewTieredCache 创建分层缓存实例。
|
||
func NewTieredCache(cfg *TieredCacheConfig) (*TieredCache, error) {
|
||
// 创建 L1 内存缓存
|
||
l1 := NewProxyCache(nil, true, 0)
|
||
|
||
// 创建 L2 磁盘缓存
|
||
l2, err := NewDiskCache(cfg.L2Config)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
tc := &TieredCache{
|
||
l1: l1,
|
||
l2: l2,
|
||
l1Ratio: 0.1, // 默认 L1 占 10%
|
||
promoter: &promoter{
|
||
threshold: cfg.PromoteThreshold,
|
||
interval: cfg.PromoteInterval,
|
||
accessMap: make(map[uint64]*accessInfo),
|
||
},
|
||
stopCh: make(chan struct{}),
|
||
}
|
||
|
||
// 启动热点提升检查
|
||
if tc.promoter.threshold > 0 {
|
||
go tc.promoteLoop()
|
||
}
|
||
|
||
return tc, nil
|
||
}
|
||
|
||
// Get 获取缓存条目(实现 CacheBackend 接口)。
|
||
func (tc *TieredCache) Get(hashKey uint64, origKey string) (*ProxyCacheEntry, bool, bool) {
|
||
// 1. 先查 L1
|
||
entry, exists, stale := tc.l1.Get(hashKey, origKey)
|
||
if exists {
|
||
tc.l1Hits.Add(1)
|
||
return entry, true, stale
|
||
}
|
||
|
||
// 2. 查 L2,必须验证 max_age
|
||
entry, exists, stale = tc.l2.Get(hashKey, origKey)
|
||
if !exists {
|
||
tc.misses.Add(1)
|
||
return nil, false, false
|
||
}
|
||
|
||
tc.l2Hits.Add(1)
|
||
|
||
// 3. 记录访问(用于热点提升)
|
||
tc.recordAccess(hashKey, origKey)
|
||
|
||
// 4. L2 命中且未过期,异步提升到 L1
|
||
if !stale {
|
||
go tc.promoteToL1(hashKey, entry)
|
||
}
|
||
|
||
return entry, true, stale
|
||
}
|
||
|
||
// Set 设置缓存条目(实现 CacheBackend 接口)。
|
||
func (tc *TieredCache) Set(hashKey uint64, origKey string, data []byte, headers map[string]string, status int, maxAge time.Duration) {
|
||
// 同时写入 L1 和 L2
|
||
tc.l1.Set(hashKey, origKey, data, headers, status, maxAge)
|
||
|
||
// L2 异步写入(在 goroutine 中执行)
|
||
go tc.l2.Set(hashKey, origKey, data, headers, status, maxAge)
|
||
}
|
||
|
||
// Delete 删除缓存条目(实现 CacheBackend 接口)。
|
||
func (tc *TieredCache) Delete(hashKey uint64) error {
|
||
_ = tc.l1.Delete(hashKey)
|
||
return tc.l2.Delete(hashKey)
|
||
}
|
||
|
||
// CacheStats 返回缓存统计信息(实现 CacheBackend 接口)。
|
||
func (tc *TieredCache) CacheStats() CacheStats {
|
||
l1Stats := tc.l1.CacheStats()
|
||
l2Stats := tc.l2.CacheStats()
|
||
|
||
return CacheStats{
|
||
Entries: l1Stats.Entries + l2Stats.Entries,
|
||
Size: l1Stats.Size + l2Stats.Size,
|
||
HitCount: tc.l1Hits.Load() + tc.l2Hits.Load(),
|
||
MissCount: tc.misses.Load(),
|
||
Evictions: l2Stats.Evictions,
|
||
}
|
||
}
|
||
|
||
// TieredCacheStats 返回分层缓存详细统计。
|
||
func (tc *TieredCache) TieredCacheStats() TieredCacheStats {
|
||
return TieredCacheStats{
|
||
L1Hits: tc.l1Hits.Load(),
|
||
L2Hits: tc.l2Hits.Load(),
|
||
Misses: tc.misses.Load(),
|
||
Promotes: tc.promotes.Load(),
|
||
L1Entries: tc.l1.CacheStats().Entries,
|
||
L2Entries: tc.l2.CacheStats().Entries,
|
||
}
|
||
}
|
||
|
||
// TieredCacheStats 分层缓存详细统计。
|
||
type TieredCacheStats struct {
|
||
L1Hits int64
|
||
L2Hits int64
|
||
Misses int64
|
||
Promotes int64
|
||
L1Entries int64
|
||
L2Entries int64
|
||
}
|
||
|
||
// Stop 停止分层缓存。
|
||
func (tc *TieredCache) Stop() {
|
||
close(tc.stopCh)
|
||
if tc.l2 != nil {
|
||
tc.l2.Stop()
|
||
}
|
||
}
|
||
|
||
// recordAccess 记录访问(用于热点提升)。
|
||
func (tc *TieredCache) recordAccess(hashKey uint64, origKey string) {
|
||
if tc.promoter.threshold <= 0 {
|
||
return
|
||
}
|
||
|
||
tc.promoter.mu.Lock()
|
||
defer tc.promoter.mu.Unlock()
|
||
|
||
info, exists := tc.promoter.accessMap[hashKey]
|
||
if !exists {
|
||
info = &accessInfo{origKey: origKey}
|
||
tc.promoter.accessMap[hashKey] = info
|
||
}
|
||
info.count++
|
||
info.lastAccess = time.Now()
|
||
}
|
||
|
||
// promoteToL1 提升条目到 L1。
|
||
func (tc *TieredCache) promoteToL1(hashKey uint64, entry *ProxyCacheEntry) {
|
||
tc.l1.Set(hashKey, entry.OrigKey, entry.Data, entry.Headers, entry.Status, entry.MaxAge)
|
||
tc.promotes.Add(1)
|
||
}
|
||
|
||
// promoteLoop 热点提升检查循环。
|
||
func (tc *TieredCache) promoteLoop() {
|
||
ticker := time.NewTicker(tc.promoter.interval)
|
||
defer ticker.Stop()
|
||
|
||
for {
|
||
select {
|
||
case <-ticker.C:
|
||
tc.checkAndPromote()
|
||
case <-tc.stopCh:
|
||
return
|
||
}
|
||
}
|
||
}
|
||
|
||
// checkAndPromote 检查并提升热点数据。
|
||
func (tc *TieredCache) checkAndPromote() {
|
||
tc.promoter.mu.Lock()
|
||
defer tc.promoter.mu.Unlock()
|
||
|
||
for hashKey, info := range tc.promoter.accessMap {
|
||
if info.count >= tc.promoter.threshold {
|
||
// 从 L2 获取并提升到 L1
|
||
entry, exists, _ := tc.l2.Get(hashKey, info.origKey)
|
||
if exists {
|
||
tc.promoteToL1(hashKey, entry)
|
||
}
|
||
// 重置计数
|
||
info.count = 0
|
||
}
|
||
}
|
||
}
|