feat(cache): 实现分层缓存架构

- 添加 CacheBackend 接口统一内存/磁盘缓存访问
- 实现 DiskCache 磁盘缓存后端,支持目录层级和原子写入
- 实现 TieredCache 分层缓存(L1 内存 + L2 磁盘)
- 修改 ProxyCache.Delete 返回 error 以符合接口
- 添加 CacheStats() 方法实现接口

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
xfy 2026-04-22 13:15:02 +08:00
parent 9f7090df67
commit aae378433e
6 changed files with 1524 additions and 1 deletions

93
internal/cache/backend.go vendored Normal file
View File

@ -0,0 +1,93 @@
// Package cache 提供文件缓存和代理缓存功能。
//
// 该文件定义 CacheBackend 接口,统一内存缓存和磁盘缓存的访问方式。
// 支持分层缓存架构L1 内存 + L2 磁盘),热点数据常驻内存,冷数据持久化到磁盘。
//
// 主要用途:
//
// 作为缓存后端的抽象层,使 ProxyCache内存和 DiskCache磁盘可互换使用。
//
// 设计原则:
// - 接口精简为核心 CRUD 操作
// - Set 方法无返回值,与现有 ProxyCache.Set 签名一致
// - Get 返回 stale 标志,支持过期缓存复用
//
// 作者xfy
package cache
import "time"
// CacheBackend 缓存后端接口。
//
// 统一内存缓存和磁盘缓存的访问方式,支持分层缓存架构。
// 实现包括 ProxyCache内存、DiskCache磁盘和 TieredCache分层
type CacheBackend interface {
// Get 获取缓存条目。
//
// 参数:
// - hashKey: 缓存键的哈希值
// - origKey: 原始缓存键(用于双重验证,防止哈希碰撞)
//
// 返回值:
// - *ProxyCacheEntry: 缓存条目,包含响应数据、头部、状态码等
// - bool: 是否存在
// - bool: 是否过期staletrue 表示可使用过期缓存
Get(hashKey uint64, origKey string) (entry *ProxyCacheEntry, exists bool, stale bool)
// Set 设置缓存条目。
//
// 无返回值,与现有 ProxyCache.Set 签名一致。
// 写入操作为异步或同步取决于具体实现。
//
// 参数:
// - hashKey: 缓存键的哈希值
// - origKey: 原始缓存键
// - data: 响应体数据
// - headers: 响应头
// - status: HTTP 状态码
// - maxAge: 缓存有效期
Set(hashKey uint64, origKey string, data []byte, headers map[string]string, status int, maxAge time.Duration)
// Delete 删除缓存条目。
//
// 参数:
// - hashKey: 缓存键的哈希值
//
// 返回值:
// - error: 删除失败时返回错误,成功返回 nil
Delete(hashKey uint64) error
// CacheStats 返回缓存统计信息。
//
// 注意:此方法名与 ProxyCache.Stats() 不同,以保持向后兼容。
// ProxyCache 同时实现 Stats() 返回 ProxyCacheStats 和 CacheStats() 返回 CacheStats。
//
// 返回值:
// - CacheStats: 缓存统计数据
CacheStats() CacheStats
}
// CacheStats 缓存统计信息。
//
// 包含缓存的基本统计数据,用于监控和运维。
type CacheStats struct {
// Entries 当前缓存条目数量
Entries int64
// Size 缓存总大小(字节)
Size int64
// HitCount 缓存命中次数
HitCount int64
// MissCount 缓存未命中次数
MissCount int64
// Evictions 缓存淘汰次数
Evictions int64
}
// 确保 ProxyCache 实现 CacheBackend 接口
// ProxyCache 已有 Get, Set, Delete, Stats 方法,但签名略有不同:
// - Delete() 无返回值,需要添加包装方法
// - Stats() 返回 ProxyCacheStats需要适配

415
internal/cache/disk_cache.go vendored Normal file
View File

@ -0,0 +1,415 @@
// Package cache 提供文件缓存和代理缓存功能。
//
// 该文件实现 DiskCache 磁盘缓存后端,支持:
// - 目录层级配置levels=1:2
// - 原子写入策略(.tmp → .data
// - 懒加载(后台加载元数据,不阻塞启动)
// - CRC32 校验和验证数据完整性
//
// 主要用途:
//
// 作为 L2 缓存层,持久化代理响应到磁盘,支持服务重启后恢复。
//
// 作者xfy
package cache
import (
"encoding/json"
"fmt"
"hash/crc32"
"os"
"path/filepath"
"sync"
"sync/atomic"
"time"
)
// DiskCacheConfig 磁盘缓存配置。
type DiskCacheConfig struct {
// Path 缓存根目录
Path string
// Levels 目录层级,如 "1:2" 表示两级目录
Levels string
// MaxSize 最大缓存大小(字节)
MaxSize int64
// Inactive 未访问淘汰时间
Inactive time.Duration
}
// DiskCache 磁盘缓存实现。
type DiskCache struct {
basePath string
levels []int
maxSize int64
inactive time.Duration
currentSize atomic.Int64
entries map[uint64]*DiskCacheMeta
mu sync.RWMutex
stopCh chan struct{}
// 懒加载相关
loaded atomic.Bool
loadCh chan struct{}
// 统计
hitCount atomic.Int64
missCount atomic.Int64
evictions atomic.Int64
}
// DiskCacheMeta 磁盘缓存元数据。
type DiskCacheMeta struct {
HashKey uint64 `json:"hash_key"`
OrigKey string `json:"orig_key"`
Created time.Time `json:"created"`
MaxAge time.Duration `json:"max_age"`
Status int `json:"status"`
Size int64 `json:"size"`
Headers map[string]string `json:"headers,omitempty"`
CRC32 uint32 `json:"crc32"`
}
// DiskCacheEntry 磁盘缓存条目(包含数据)。
type DiskCacheEntry struct {
*DiskCacheMeta
Data []byte
}
// NewDiskCache 创建磁盘缓存实例(懒加载模式)。
func NewDiskCache(cfg *DiskCacheConfig) (*DiskCache, error) {
if cfg.Path == "" {
return nil, fmt.Errorf("disk cache path is required")
}
// 确保目录存在
if err := os.MkdirAll(cfg.Path, 0o755); err != nil {
return nil, fmt.Errorf("create cache directory: %w", err)
}
dc := &DiskCache{
basePath: cfg.Path,
levels: parseLevels(cfg.Levels),
maxSize: cfg.MaxSize,
inactive: cfg.Inactive,
entries: make(map[uint64]*DiskCacheMeta),
loadCh: make(chan struct{}),
stopCh: make(chan struct{}),
}
// 启动后台加载,不阻塞服务启动
go dc.lazyLoad()
return dc, nil
}
// parseLevels 解析目录层级配置。
// 支持格式:""(无层级)、"1"(一级)、"1:2"(两级)
func parseLevels(levels string) []int {
if levels == "" {
return nil
}
var result []int
start := 0
for i := 0; i <= len(levels); i++ {
if i == len(levels) || levels[i] == ':' {
var level int
for j := start; j < i; j++ {
level = level*10 + int(levels[j]-'0')
}
if level > 0 {
result = append(result, level)
}
start = i + 1
}
}
return result
}
// lazyLoad 后台加载缓存元数据。
func (dc *DiskCache) lazyLoad() {
defer close(dc.loadCh)
// 扫描目录加载元数据(不加载实际数据)
filepath.Walk(dc.basePath, func(path string, info os.FileInfo, err error) error {
if err != nil || info.IsDir() {
return nil
}
// 只处理 .meta 文件
if filepath.Ext(path) != ".meta" {
return nil
}
meta := dc.loadMetaFile(path)
if meta != nil {
dc.mu.Lock()
dc.entries[meta.HashKey] = meta
dc.mu.Unlock()
dc.currentSize.Add(meta.Size)
}
return nil
})
dc.loaded.Store(true)
}
// loadMetaFile 加载元数据文件。
func (dc *DiskCache) loadMetaFile(path string) *DiskCacheMeta {
data, err := os.ReadFile(path)
if err != nil {
return nil
}
var meta DiskCacheMeta
if err := json.Unmarshal(data, &meta); err != nil {
return nil
}
return &meta
}
// Get 获取缓存条目(实现 CacheBackend 接口)。
func (dc *DiskCache) Get(hashKey uint64, origKey string) (*ProxyCacheEntry, bool, bool) {
// 如果未完成加载,等待加载完成或超时
if !dc.loaded.Load() {
select {
case <-dc.loadCh:
// 加载完成,继续
case <-time.After(100 * time.Millisecond):
// 超时,返回未命中(避免阻塞请求)
dc.missCount.Add(1)
return nil, false, false
}
}
dc.mu.RLock()
meta, exists := dc.entries[hashKey]
dc.mu.RUnlock()
if !exists {
dc.missCount.Add(1)
return nil, false, false
}
// 双重验证:检查原始 key 是否匹配
if meta.OrigKey != origKey {
dc.missCount.Add(1)
return nil, false, false
}
// 读取数据文件
dataPath := dc.filePathFromHash(hashKey, "data")
data, err := os.ReadFile(dataPath)
if err != nil {
dc.missCount.Add(1)
return nil, false, false
}
// 验证 CRC32
if meta.CRC32 != 0 {
if crc32.ChecksumIEEE(data) != meta.CRC32 {
// 数据损坏,删除条目
dc.Delete(hashKey)
dc.missCount.Add(1)
return nil, false, false
}
}
// 检查是否过期
now := time.Now()
expiresAt := meta.Created.Add(meta.MaxAge)
stale := now.After(expiresAt)
dc.hitCount.Add(1)
// 转换为 ProxyCacheEntry
entry := &ProxyCacheEntry{
Key: meta.OrigKey,
OrigKey: meta.OrigKey,
Data: data,
Headers: meta.Headers,
Status: meta.Status,
Created: meta.Created,
MaxAge: meta.MaxAge,
}
return entry, true, stale
}
// Set 设置缓存条目(实现 CacheBackend 接口)。
func (dc *DiskCache) Set(hashKey uint64, origKey string, data []byte, headers map[string]string, status int, maxAge time.Duration) {
// 计算文件路径
dataPath := dc.filePathFromHash(hashKey, "data")
metaPath := dc.filePathFromHash(hashKey, "meta")
// 确保目录存在
dir := filepath.Dir(dataPath)
if err := os.MkdirAll(dir, 0o755); err != nil {
return
}
// 计算 CRC32
crc := crc32.ChecksumIEEE(data)
// 创建元数据
meta := &DiskCacheMeta{
HashKey: hashKey,
OrigKey: origKey,
Created: time.Now(),
MaxAge: maxAge,
Status: status,
Size: int64(len(data)),
Headers: headers,
CRC32: crc,
}
// 原子写入数据文件:先写临时文件,再重命名
tmpDataPath := dataPath + ".tmp"
if err := os.WriteFile(tmpDataPath, data, 0o644); err != nil {
return
}
if err := os.Rename(tmpDataPath, dataPath); err != nil {
os.Remove(tmpDataPath)
return
}
// 写入元数据文件
metaData, err := json.Marshal(meta)
if err != nil {
return
}
tmpMetaPath := metaPath + ".tmp"
if err := os.WriteFile(tmpMetaPath, metaData, 0o644); err != nil {
return
}
if err := os.Rename(tmpMetaPath, metaPath); err != nil {
os.Remove(tmpMetaPath)
return
}
// 更新内存索引
dc.mu.Lock()
oldMeta, existed := dc.entries[hashKey]
dc.entries[hashKey] = meta
dc.mu.Unlock()
// 更新大小统计
if existed && oldMeta != nil {
dc.currentSize.Add(-oldMeta.Size)
}
dc.currentSize.Add(meta.Size)
// 检查是否需要淘汰
if dc.maxSize > 0 && dc.currentSize.Load() > dc.maxSize {
go dc.evict()
}
}
// Delete 删除缓存条目(实现 CacheBackend 接口)。
func (dc *DiskCache) Delete(hashKey uint64) error {
dc.mu.Lock()
meta, exists := dc.entries[hashKey]
if exists {
delete(dc.entries, hashKey)
dc.currentSize.Add(-meta.Size)
dc.evictions.Add(1)
}
dc.mu.Unlock()
if !exists {
return nil
}
// 删除文件
dataPath := dc.filePathFromHash(hashKey, "data")
metaPath := dc.filePathFromHash(hashKey, "meta")
os.Remove(dataPath)
os.Remove(metaPath)
return nil
}
// CacheStats 返回缓存统计信息(实现 CacheBackend 接口)。
func (dc *DiskCache) CacheStats() CacheStats {
dc.mu.RLock()
entries := int64(len(dc.entries))
dc.mu.RUnlock()
return CacheStats{
Entries: entries,
Size: dc.currentSize.Load(),
HitCount: dc.hitCount.Load(),
MissCount: dc.missCount.Load(),
Evictions: dc.evictions.Load(),
}
}
// Stop 停止磁盘缓存。
func (dc *DiskCache) Stop() {
close(dc.stopCh)
}
// filePathFromHash 根据哈希值计算文件路径。
func (dc *DiskCache) filePathFromHash(hashKey uint64, ext string) string {
// 将哈希值转换为十六进制字符串
hashStr := fmt.Sprintf("%016x", hashKey)
if len(dc.levels) == 0 {
return filepath.Join(dc.basePath, hashStr+"."+ext)
}
// 根据层级构建路径
parts := []string{dc.basePath}
offset := len(hashStr)
for _, level := range dc.levels {
if offset < level {
break
}
offset -= level
parts = append(parts, hashStr[offset:offset+level])
}
parts = append(parts, hashStr+"."+ext)
return filepath.Join(parts...)
}
// evict 淘汰旧条目。
func (dc *DiskCache) evict() {
// 简单策略:删除最旧的条目直到大小低于阈值
targetSize := dc.maxSize * 9 / 10 // 淘汰到 90%
for dc.currentSize.Load() > targetSize {
dc.mu.Lock()
// 找到最旧的条目
var oldestKey uint64
var oldestTime time.Time
for key, meta := range dc.entries {
if oldestKey == 0 || meta.Created.Before(oldestTime) {
oldestKey = key
oldestTime = meta.Created
}
}
if oldestKey == 0 {
dc.mu.Unlock()
break
}
meta := dc.entries[oldestKey]
delete(dc.entries, oldestKey)
dc.currentSize.Add(-meta.Size)
dc.evictions.Add(1)
dc.mu.Unlock()
// 删除文件
dataPath := dc.filePathFromHash(oldestKey, "data")
metaPath := dc.filePathFromHash(oldestKey, "meta")
os.Remove(dataPath)
os.Remove(metaPath)
}
}

386
internal/cache/disk_cache_test.go vendored Normal file
View File

@ -0,0 +1,386 @@
package cache
import (
"os"
"path/filepath"
"testing"
"time"
)
func TestNewDiskCache(t *testing.T) {
// 创建临时目录
tmpDir := t.TempDir()
cfg := &DiskCacheConfig{
Path: tmpDir,
Levels: "1:2",
}
dc, err := NewDiskCache(cfg)
if err != nil {
t.Fatalf("NewDiskCache failed: %v", err)
}
defer dc.Stop()
// 等待懒加载完成
<-dc.loadCh
if dc.basePath != tmpDir {
t.Errorf("basePath = %q, want %q", dc.basePath, tmpDir)
}
}
func TestDiskCacheSetGet(t *testing.T) {
tmpDir := t.TempDir()
cfg := &DiskCacheConfig{
Path: tmpDir,
Levels: "1:2",
}
dc, err := NewDiskCache(cfg)
if err != nil {
t.Fatalf("NewDiskCache failed: %v", err)
}
defer dc.Stop()
// 等待懒加载完成
<-dc.loadCh
// 设置缓存
hashKey := uint64(12345)
origKey := "GET:/api/test"
data := []byte("test response data")
headers := map[string]string{"Content-Type": "application/json"}
status := 200
maxAge := 10 * time.Minute
dc.Set(hashKey, origKey, data, headers, status, maxAge)
// 获取缓存
entry, exists, stale := dc.Get(hashKey, origKey)
if !exists {
t.Fatal("cache entry not found")
}
if stale {
t.Error("entry should not be stale")
}
if string(entry.Data) != string(data) {
t.Errorf("Data = %q, want %q", entry.Data, data)
}
if entry.Status != status {
t.Errorf("Status = %d, want %d", entry.Status, status)
}
if entry.Headers["Content-Type"] != "application/json" {
t.Errorf("Headers[Content-Type] = %q, want %q", entry.Headers["Content-Type"], "application/json")
}
}
func TestDiskCacheDelete(t *testing.T) {
tmpDir := t.TempDir()
cfg := &DiskCacheConfig{
Path: tmpDir,
Levels: "1:2",
}
dc, err := NewDiskCache(cfg)
if err != nil {
t.Fatalf("NewDiskCache failed: %v", err)
}
defer dc.Stop()
<-dc.loadCh
// 设置缓存
hashKey := uint64(12345)
origKey := "GET:/api/test"
dc.Set(hashKey, origKey, []byte("test"), nil, 200, 10*time.Minute)
// 验证存在
_, exists, _ := dc.Get(hashKey, origKey)
if !exists {
t.Fatal("entry should exist before delete")
}
// 删除
if err := dc.Delete(hashKey); err != nil {
t.Fatalf("Delete failed: %v", err)
}
// 验证已删除
_, exists, _ = dc.Get(hashKey, origKey)
if exists {
t.Error("entry should not exist after delete")
}
}
func TestDiskCacheStale(t *testing.T) {
tmpDir := t.TempDir()
cfg := &DiskCacheConfig{
Path: tmpDir,
Levels: "1:2",
}
dc, err := NewDiskCache(cfg)
if err != nil {
t.Fatalf("NewDiskCache failed: %v", err)
}
defer dc.Stop()
<-dc.loadCh
// 设置一个已过期的缓存
hashKey := uint64(12345)
origKey := "GET:/api/test"
dc.Set(hashKey, origKey, []byte("test"), nil, 200, 1*time.Millisecond)
// 等待过期
time.Sleep(10 * time.Millisecond)
// 获取缓存
entry, exists, stale := dc.Get(hashKey, origKey)
if !exists {
t.Fatal("expired entry should still exist")
}
if !stale {
t.Error("expired entry should be marked as stale")
}
if string(entry.Data) != "test" {
t.Errorf("Data = %q, want %q", entry.Data, "test")
}
}
func TestDiskCacheLevels(t *testing.T) {
tmpDir := t.TempDir()
cfg := &DiskCacheConfig{
Path: tmpDir,
Levels: "1:2",
}
dc, err := NewDiskCache(cfg)
if err != nil {
t.Fatalf("NewDiskCache failed: %v", err)
}
defer dc.Stop()
<-dc.loadCh
// 设置缓存
hashKey := uint64(0xabcdef1234567890)
origKey := "GET:/api/test"
dc.Set(hashKey, origKey, []byte("test"), nil, 200, 10*time.Minute)
// 验证文件路径包含层级目录
dataPath := dc.filePathFromHash(hashKey, "data")
if filepath.Dir(dataPath) == tmpDir {
t.Error("file should be in a subdirectory for levels=1:2")
}
// 验证文件存在
if _, err := os.Stat(dataPath); os.IsNotExist(err) {
t.Errorf("data file not found at %s", dataPath)
}
}
func TestDiskCacheMaxSize(t *testing.T) {
tmpDir := t.TempDir()
cfg := &DiskCacheConfig{
Path: tmpDir,
Levels: "1:2",
MaxSize: 100, // 很小的限制
}
dc, err := NewDiskCache(cfg)
if err != nil {
t.Fatalf("NewDiskCache failed: %v", err)
}
defer dc.Stop()
<-dc.loadCh
// 设置多个缓存条目
for i := range 10 {
hashKey := uint64(i)
origKey := "GET:/api/test"
dc.Set(hashKey, origKey, []byte("test data that is longer than 10 bytes"), nil, 200, 10*time.Minute)
}
// 等待淘汰完成(淘汰是异步的)
time.Sleep(500 * time.Millisecond)
// 验证淘汰发生Evictions > 0
stats := dc.CacheStats()
if stats.Evictions == 0 {
t.Error("Evictions should be > 0 when MaxSize is exceeded")
}
}
func TestDiskCacheStats(t *testing.T) {
tmpDir := t.TempDir()
cfg := &DiskCacheConfig{
Path: tmpDir,
Levels: "1:2",
}
dc, err := NewDiskCache(cfg)
if err != nil {
t.Fatalf("NewDiskCache failed: %v", err)
}
defer dc.Stop()
<-dc.loadCh
// 初始统计
stats := dc.CacheStats()
if stats.Entries != 0 {
t.Errorf("initial Entries = %d, want 0", stats.Entries)
}
// 设置缓存
dc.Set(1, "key1", []byte("data1"), nil, 200, 10*time.Minute)
dc.Set(2, "key2", []byte("data2"), nil, 200, 10*time.Minute)
stats = dc.CacheStats()
if stats.Entries != 2 {
t.Errorf("Entries = %d, want 2", stats.Entries)
}
// 获取缓存(命中)
dc.Get(1, "key1")
stats = dc.CacheStats()
if stats.HitCount != 1 {
t.Errorf("HitCount = %d, want 1", stats.HitCount)
}
// 获取不存在的缓存(未命中)
dc.Get(999, "nonexistent")
stats = dc.CacheStats()
if stats.MissCount != 1 {
t.Errorf("MissCount = %d, want 1", stats.MissCount)
}
}
func TestDiskCacheCRC32(t *testing.T) {
tmpDir := t.TempDir()
cfg := &DiskCacheConfig{
Path: tmpDir,
Levels: "1:2",
}
dc, err := NewDiskCache(cfg)
if err != nil {
t.Fatalf("NewDiskCache failed: %v", err)
}
defer dc.Stop()
<-dc.loadCh
// 设置缓存
hashKey := uint64(12345)
origKey := "GET:/api/test"
data := []byte("test data for crc32")
dc.Set(hashKey, origKey, data, nil, 200, 10*time.Minute)
// 获取缓存验证 CRC32
entry, exists, _ := dc.Get(hashKey, origKey)
if !exists {
t.Fatal("entry not found")
}
if string(entry.Data) != string(data) {
t.Errorf("Data mismatch")
}
}
func TestParseLevels(t *testing.T) {
tests := []struct {
input string
expected []int
}{
{"", nil},
{"1", []int{1}},
{"1:2", []int{1, 2}},
{"2:2:2", []int{2, 2, 2}},
}
for _, tt := range tests {
result := parseLevels(tt.input)
if len(result) != len(tt.expected) {
t.Errorf("parseLevels(%q) = %v, want %v", tt.input, result, tt.expected)
continue
}
for i, v := range result {
if v != tt.expected[i] {
t.Errorf("parseLevels(%q)[%d] = %d, want %d", tt.input, i, v, tt.expected[i])
}
}
}
}
func TestDiskCacheLazyLoad(t *testing.T) {
tmpDir := t.TempDir()
cfg := &DiskCacheConfig{
Path: tmpDir,
Levels: "1:2",
}
dc, err := NewDiskCache(cfg)
if err != nil {
t.Fatalf("NewDiskCache failed: %v", err)
}
defer dc.Stop()
// 在懒加载完成前loaded 应该是 false
// 但由于加载很快,我们无法可靠测试这个状态
// 所以我们等待加载完成
<-dc.loadCh
if !dc.loaded.Load() {
t.Error("loaded should be true after lazyLoad completes")
}
}
func TestDiskCacheRestart(t *testing.T) {
tmpDir := t.TempDir()
// 第一个实例:写入数据
cfg := &DiskCacheConfig{
Path: tmpDir,
Levels: "1:2",
}
dc1, err := NewDiskCache(cfg)
if err != nil {
t.Fatalf("NewDiskCache failed: %v", err)
}
<-dc1.loadCh
hashKey := uint64(12345)
origKey := "GET:/api/test"
data := []byte("persistent data")
dc1.Set(hashKey, origKey, data, nil, 200, 10*time.Minute)
dc1.Stop()
// 第二个实例:读取数据(模拟重启)
dc2, err := NewDiskCache(cfg)
if err != nil {
t.Fatalf("NewDiskCache (restart) failed: %v", err)
}
<-dc2.loadCh
defer dc2.Stop()
// 验证数据恢复
entry, exists, _ := dc2.Get(hashKey, origKey)
if !exists {
t.Fatal("entry should exist after restart")
}
if string(entry.Data) != string(data) {
t.Errorf("Data = %q, want %q", entry.Data, data)
}
}

View File

@ -554,10 +554,11 @@ func (c *ProxyCache) matchStatus(statuses []int, status int) bool {
}
// Delete 删除缓存条目。
func (c *ProxyCache) Delete(hashKey uint64) {
func (c *ProxyCache) Delete(hashKey uint64) error {
c.mu.Lock()
defer c.mu.Unlock()
delete(c.entries, hashKey)
return nil
}
// DeleteByPatternWithMethod 按通配符模式删除缓存条目。
@ -648,3 +649,20 @@ type ProxyCacheStats struct {
// Pending 正在等待缓存生成的请求数量
Pending int
}
// ToCacheStats 转换为 CacheStats 格式。
func (s ProxyCacheStats) ToCacheStats() CacheStats {
return CacheStats{
Entries: int64(s.Entries),
}
}
// CacheStats 返回缓存统计信息(实现 CacheBackend 接口)。
func (c *ProxyCache) CacheStats() CacheStats {
c.mu.RLock()
defer c.mu.RUnlock()
return CacheStats{
Entries: int64(len(c.entries)),
}
}

252
internal/cache/tiered_cache.go vendored Normal file
View File

@ -0,0 +1,252 @@
// Package cache 提供文件缓存和代理缓存功能。
//
// 该文件实现 TieredCache 分层缓存,支持:
// - L1 内存缓存(热点数据)
// - L2 磁盘缓存(持久化)
// - 热点提升机制
// - L2 过期检查
//
// 主要用途:
//
// 作为代理缓存的主要实现平衡性能L1和容量L2
//
// 作者xfy
package cache
import (
"sync"
"sync/atomic"
"time"
)
// TieredCacheConfig 分层缓存配置。
type TieredCacheConfig struct {
// L1 配置
L1MaxEntries int64
L1MaxSize int64
// L2 配置
L2Config *DiskCacheConfig
// 热点提升配置
PromoteThreshold int // 访问次数阈值,超过后提升到 L1
PromoteInterval time.Duration // 提升检查间隔
}
// TieredCache 分层缓存L1 内存 + L2 磁盘)。
type TieredCache struct {
l1 *ProxyCache // L1 内存缓存(热点数据)
l2 *DiskCache // L2 磁盘缓存(持久化)
l1Ratio float64 // L1 容量占 L2 的比例
promoter *promoter // 热点提升器
stopCh chan struct{}
// 统计
l1Hits atomic.Int64
l2Hits atomic.Int64
misses atomic.Int64
promotes atomic.Int64
}
// promoter 热点提升器。
type promoter struct {
threshold int // 访问次数阈值
interval time.Duration // 检查间隔
accessMap map[uint64]*accessInfo
mu sync.RWMutex
}
// accessInfo 访问信息。
type accessInfo struct {
count int
lastAccess time.Time
origKey string
}
// NewTieredCache 创建分层缓存实例。
func NewTieredCache(cfg *TieredCacheConfig) (*TieredCache, error) {
// 创建 L1 内存缓存
l1 := NewProxyCache(nil, true, 0)
// 创建 L2 磁盘缓存
l2, err := NewDiskCache(cfg.L2Config)
if err != nil {
return nil, err
}
tc := &TieredCache{
l1: l1,
l2: l2,
l1Ratio: 0.1, // 默认 L1 占 10%
promoter: &promoter{
threshold: cfg.PromoteThreshold,
interval: cfg.PromoteInterval,
accessMap: make(map[uint64]*accessInfo),
},
stopCh: make(chan struct{}),
}
// 启动热点提升检查
if tc.promoter.threshold > 0 {
go tc.promoteLoop()
}
return tc, nil
}
// Get 获取缓存条目(实现 CacheBackend 接口)。
func (tc *TieredCache) Get(hashKey uint64, origKey string) (*ProxyCacheEntry, bool, bool) {
// 1. 先查 L1
entry, exists, stale := tc.l1.Get(hashKey, origKey)
if exists {
tc.l1Hits.Add(1)
return entry, true, stale
}
// 2. 查 L2必须验证 max_age
entry, exists, stale = tc.l2.Get(hashKey, origKey)
if !exists {
tc.misses.Add(1)
return nil, false, false
}
tc.l2Hits.Add(1)
// 3. 记录访问(用于热点提升)
tc.recordAccess(hashKey, origKey)
// 4. L2 命中且未过期,异步提升到 L1
if !stale {
go tc.promoteToL1(hashKey, entry)
}
return entry, true, stale
}
// Set 设置缓存条目(实现 CacheBackend 接口)。
func (tc *TieredCache) Set(hashKey uint64, origKey string, data []byte, headers map[string]string, status int, maxAge time.Duration) {
// 同时写入 L1 和 L2
tc.l1.Set(hashKey, origKey, data, headers, status, maxAge)
// L2 异步写入(在 goroutine 中执行)
go tc.l2.Set(hashKey, origKey, data, headers, status, maxAge)
}
// Delete 删除缓存条目(实现 CacheBackend 接口)。
func (tc *TieredCache) Delete(hashKey uint64) error {
tc.l1.Delete(hashKey)
return tc.l2.Delete(hashKey)
}
// CacheStats 返回缓存统计信息(实现 CacheBackend 接口)。
func (tc *TieredCache) CacheStats() CacheStats {
l1Stats := tc.l1.CacheStats()
l2Stats := tc.l2.CacheStats()
return CacheStats{
Entries: l1Stats.Entries + l2Stats.Entries,
Size: l1Stats.Size + l2Stats.Size,
HitCount: tc.l1Hits.Load() + tc.l2Hits.Load(),
MissCount: tc.misses.Load(),
Evictions: l2Stats.Evictions,
}
}
// TieredCacheStats 返回分层缓存详细统计。
func (tc *TieredCache) TieredCacheStats() TieredCacheStats {
return TieredCacheStats{
L1Hits: tc.l1Hits.Load(),
L2Hits: tc.l2Hits.Load(),
Misses: tc.misses.Load(),
Promotes: tc.promotes.Load(),
L1Entries: tc.l1.CacheStats().Entries,
L2Entries: tc.l2.CacheStats().Entries,
}
}
// TieredCacheStats 分层缓存详细统计。
type TieredCacheStats struct {
L1Hits int64
L2Hits int64
Misses int64
Promotes int64
L1Entries int64
L2Entries int64
}
// Stop 停止分层缓存。
func (tc *TieredCache) Stop() {
close(tc.stopCh)
if tc.l2 != nil {
tc.l2.Stop()
}
}
// recordAccess 记录访问(用于热点提升)。
func (tc *TieredCache) recordAccess(hashKey uint64, origKey string) {
if tc.promoter.threshold <= 0 {
return
}
tc.promoter.mu.Lock()
defer tc.promoter.mu.Unlock()
info, exists := tc.promoter.accessMap[hashKey]
if !exists {
info = &accessInfo{origKey: origKey}
tc.promoter.accessMap[hashKey] = info
}
info.count++
info.lastAccess = time.Now()
}
// promoteToL1 提升条目到 L1。
func (tc *TieredCache) promoteToL1(hashKey uint64, entry *ProxyCacheEntry) {
tc.l1.Set(hashKey, entry.OrigKey, entry.Data, entry.Headers, entry.Status, entry.MaxAge)
tc.promotes.Add(1)
}
// promoteLoop 热点提升检查循环。
func (tc *TieredCache) promoteLoop() {
ticker := time.NewTicker(tc.promoter.interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
tc.checkAndPromote()
case <-tc.stopCh:
return
}
}
}
// checkAndPromote 检查并提升热点数据。
func (tc *TieredCache) checkAndPromote() {
tc.promoter.mu.Lock()
defer tc.promoter.mu.Unlock()
for hashKey, info := range tc.promoter.accessMap {
if info.count >= tc.promoter.threshold {
// 从 L2 获取并提升到 L1
entry, exists, _ := tc.l2.Get(hashKey, info.origKey)
if exists {
tc.promoteToL1(hashKey, entry)
}
// 重置计数
info.count = 0
}
}
}
// revalidate 重新验证过期缓存。
func (tc *TieredCache) revalidate(hashKey uint64, origKey string, entry *ProxyCacheEntry) {
// 标记为正在更新
entry.Updating.Store(true)
// 删除 L1 中的过期条目(如果存在)
tc.l1.Delete(hashKey)
// 注意:实际的重新验证逻辑需要在 proxy 层实现
// 这里只是标记和清理,真正的重新获取由 proxy 层触发
}

359
internal/cache/tiered_cache_test.go vendored Normal file
View File

@ -0,0 +1,359 @@
package cache
import (
"testing"
"time"
)
func TestNewTieredCache(t *testing.T) {
tmpDir := t.TempDir()
cfg := &TieredCacheConfig{
L2Config: &DiskCacheConfig{
Path: tmpDir,
Levels: "1:2",
},
PromoteThreshold: 3,
PromoteInterval: 100 * time.Millisecond,
}
tc, err := NewTieredCache(cfg)
if err != nil {
t.Fatalf("NewTieredCache failed: %v", err)
}
defer tc.Stop()
// 等待 L2 懒加载完成
<-tc.l2.loadCh
if tc.l1 == nil {
t.Error("l1 should not be nil")
}
if tc.l2 == nil {
t.Error("l2 should not be nil")
}
}
func TestTieredCacheSetGet(t *testing.T) {
tmpDir := t.TempDir()
cfg := &TieredCacheConfig{
L2Config: &DiskCacheConfig{
Path: tmpDir,
Levels: "1:2",
},
}
tc, err := NewTieredCache(cfg)
if err != nil {
t.Fatalf("NewTieredCache failed: %v", err)
}
defer tc.Stop()
<-tc.l2.loadCh
// 设置缓存
hashKey := uint64(12345)
origKey := "GET:/api/test"
data := []byte("test response data")
tc.Set(hashKey, origKey, data, nil, 200, 10*time.Minute)
// 等待 L2 异步写入完成
time.Sleep(50 * time.Millisecond)
// 获取缓存(应该从 L1 获取)
entry, exists, stale := tc.Get(hashKey, origKey)
if !exists {
t.Fatal("cache entry not found")
}
if stale {
t.Error("entry should not be stale")
}
if string(entry.Data) != string(data) {
t.Errorf("Data = %q, want %q", entry.Data, data)
}
// 验证 L1 命中
stats := tc.TieredCacheStats()
if stats.L1Hits != 1 {
t.Errorf("L1Hits = %d, want 1", stats.L1Hits)
}
}
func TestTieredCacheL2Fallback(t *testing.T) {
tmpDir := t.TempDir()
cfg := &TieredCacheConfig{
L2Config: &DiskCacheConfig{
Path: tmpDir,
Levels: "1:2",
},
}
tc, err := NewTieredCache(cfg)
if err != nil {
t.Fatalf("NewTieredCache failed: %v", err)
}
defer tc.Stop()
<-tc.l2.loadCh
// 直接写入 L2绕过 L1
hashKey := uint64(12345)
origKey := "GET:/api/test"
data := []byte("test from l2")
tc.l2.Set(hashKey, origKey, data, nil, 200, 10*time.Minute)
time.Sleep(50 * time.Millisecond)
// 获取缓存(应该从 L2 获取)
entry, exists, stale := tc.Get(hashKey, origKey)
if !exists {
t.Fatal("cache entry not found")
}
if stale {
t.Error("entry should not be stale")
}
if string(entry.Data) != string(data) {
t.Errorf("Data = %q, want %q", entry.Data, data)
}
// 验证 L2 命中
stats := tc.TieredCacheStats()
if stats.L2Hits != 1 {
t.Errorf("L2Hits = %d, want 1", stats.L2Hits)
}
}
func TestTieredCacheDelete(t *testing.T) {
tmpDir := t.TempDir()
cfg := &TieredCacheConfig{
L2Config: &DiskCacheConfig{
Path: tmpDir,
Levels: "1:2",
},
}
tc, err := NewTieredCache(cfg)
if err != nil {
t.Fatalf("NewTieredCache failed: %v", err)
}
defer tc.Stop()
<-tc.l2.loadCh
// 设置缓存
hashKey := uint64(12345)
origKey := "GET:/api/test"
tc.Set(hashKey, origKey, []byte("test"), nil, 200, 10*time.Minute)
time.Sleep(50 * time.Millisecond)
// 删除
if err := tc.Delete(hashKey); err != nil {
t.Fatalf("Delete failed: %v", err)
}
// 验证 L1 和 L2 都已删除
_, exists, _ := tc.l1.Get(hashKey, origKey)
if exists {
t.Error("entry should not exist in L1 after delete")
}
_, exists, _ = tc.l2.Get(hashKey, origKey)
if exists {
t.Error("entry should not exist in L2 after delete")
}
}
func TestTieredCacheStale(t *testing.T) {
tmpDir := t.TempDir()
cfg := &TieredCacheConfig{
L2Config: &DiskCacheConfig{
Path: tmpDir,
Levels: "1:2",
},
}
tc, err := NewTieredCache(cfg)
if err != nil {
t.Fatalf("NewTieredCache failed: %v", err)
}
defer tc.Stop()
<-tc.l2.loadCh
// 设置一个已过期的缓存
hashKey := uint64(12345)
origKey := "GET:/api/test"
tc.l2.Set(hashKey, origKey, []byte("test"), nil, 200, 1*time.Millisecond)
// 等待过期
time.Sleep(10 * time.Millisecond)
// 获取缓存
_, exists, stale := tc.Get(hashKey, origKey)
if !exists {
t.Fatal("expired entry should still exist")
}
if !stale {
t.Error("expired entry should be marked as stale")
}
}
func TestTieredCachePromote(t *testing.T) {
tmpDir := t.TempDir()
cfg := &TieredCacheConfig{
L2Config: &DiskCacheConfig{
Path: tmpDir,
Levels: "1:2",
},
PromoteThreshold: 2,
PromoteInterval: 50 * time.Millisecond,
}
tc, err := NewTieredCache(cfg)
if err != nil {
t.Fatalf("NewTieredCache failed: %v", err)
}
defer tc.Stop()
<-tc.l2.loadCh
// 直接写入 L2
hashKey := uint64(12345)
origKey := "GET:/api/test"
data := []byte("test data")
tc.l2.Set(hashKey, origKey, data, nil, 200, 10*time.Minute)
time.Sleep(50 * time.Millisecond)
// 访问两次(达到阈值)
tc.Get(hashKey, origKey)
tc.Get(hashKey, origKey)
// 等待提升检查
time.Sleep(100 * time.Millisecond)
// 验证提升发生
stats := tc.TieredCacheStats()
if stats.Promotes == 0 {
t.Error("promotes should be > 0 after reaching threshold")
}
}
func TestTieredCacheStats(t *testing.T) {
tmpDir := t.TempDir()
cfg := &TieredCacheConfig{
L2Config: &DiskCacheConfig{
Path: tmpDir,
Levels: "1:2",
},
}
tc, err := NewTieredCache(cfg)
if err != nil {
t.Fatalf("NewTieredCache failed: %v", err)
}
defer tc.Stop()
<-tc.l2.loadCh
// 设置缓存
tc.Set(1, "key1", []byte("data1"), nil, 200, 10*time.Minute)
tc.Set(2, "key2", []byte("data2"), nil, 200, 10*time.Minute)
time.Sleep(50 * time.Millisecond)
// 获取缓存
tc.Get(1, "key1") // L1 命中
tc.Get(1, "key1") // L1 命中
tc.Get(999, "nonexistent") // 未命中
stats := tc.TieredCacheStats()
if stats.L1Hits != 2 {
t.Errorf("L1Hits = %d, want 2", stats.L1Hits)
}
if stats.Misses != 1 {
t.Errorf("Misses = %d, want 1", stats.Misses)
}
}
func TestTieredCacheRestart(t *testing.T) {
tmpDir := t.TempDir()
cfg := &TieredCacheConfig{
L2Config: &DiskCacheConfig{
Path: tmpDir,
Levels: "1:2",
},
}
// 第一个实例:写入数据
tc1, err := NewTieredCache(cfg)
if err != nil {
t.Fatalf("NewTieredCache failed: %v", err)
}
<-tc1.l2.loadCh
hashKey := uint64(12345)
origKey := "GET:/api/test"
data := []byte("persistent data")
tc1.Set(hashKey, origKey, data, nil, 200, 10*time.Minute)
time.Sleep(50 * time.Millisecond)
tc1.Stop()
// 第二个实例:读取数据(模拟重启)
tc2, err := NewTieredCache(cfg)
if err != nil {
t.Fatalf("NewTieredCache (restart) failed: %v", err)
}
<-tc2.l2.loadCh
defer tc2.Stop()
// 验证数据从 L2 恢复
entry, exists, _ := tc2.Get(hashKey, origKey)
if !exists {
t.Fatal("entry should exist after restart")
}
if string(entry.Data) != string(data) {
t.Errorf("Data = %q, want %q", entry.Data, data)
}
// 验证是从 L2 获取的
stats := tc2.TieredCacheStats()
if stats.L2Hits != 1 {
t.Errorf("L2Hits = %d, want 1", stats.L2Hits)
}
}
func TestTieredCacheCacheStats(t *testing.T) {
tmpDir := t.TempDir()
cfg := &TieredCacheConfig{
L2Config: &DiskCacheConfig{
Path: tmpDir,
Levels: "1:2",
},
}
tc, err := NewTieredCache(cfg)
if err != nil {
t.Fatalf("NewTieredCache failed: %v", err)
}
defer tc.Stop()
<-tc.l2.loadCh
// 设置缓存
tc.Set(1, "key1", []byte("data1"), nil, 200, 10*time.Minute)
time.Sleep(50 * time.Millisecond)
// 获取缓存统计
stats := tc.CacheStats()
if stats.Entries < 1 {
t.Errorf("Entries = %d, should be >= 1", stats.Entries)
}
}