diff --git a/internal/cache/backend.go b/internal/cache/backend.go new file mode 100644 index 0000000..13f299a --- /dev/null +++ b/internal/cache/backend.go @@ -0,0 +1,93 @@ +// Package cache 提供文件缓存和代理缓存功能。 +// +// 该文件定义 CacheBackend 接口,统一内存缓存和磁盘缓存的访问方式。 +// 支持分层缓存架构(L1 内存 + L2 磁盘),热点数据常驻内存,冷数据持久化到磁盘。 +// +// 主要用途: +// +// 作为缓存后端的抽象层,使 ProxyCache(内存)和 DiskCache(磁盘)可互换使用。 +// +// 设计原则: +// - 接口精简为核心 CRUD 操作 +// - Set 方法无返回值,与现有 ProxyCache.Set 签名一致 +// - Get 返回 stale 标志,支持过期缓存复用 +// +// 作者:xfy +package cache + +import "time" + +// CacheBackend 缓存后端接口。 +// +// 统一内存缓存和磁盘缓存的访问方式,支持分层缓存架构。 +// 实现包括 ProxyCache(内存)、DiskCache(磁盘)和 TieredCache(分层)。 +type CacheBackend interface { + // Get 获取缓存条目。 + // + // 参数: + // - hashKey: 缓存键的哈希值 + // - origKey: 原始缓存键(用于双重验证,防止哈希碰撞) + // + // 返回值: + // - *ProxyCacheEntry: 缓存条目,包含响应数据、头部、状态码等 + // - bool: 是否存在 + // - bool: 是否过期(stale),true 表示可使用过期缓存 + Get(hashKey uint64, origKey string) (entry *ProxyCacheEntry, exists bool, stale bool) + + // Set 设置缓存条目。 + // + // 无返回值,与现有 ProxyCache.Set 签名一致。 + // 写入操作为异步或同步取决于具体实现。 + // + // 参数: + // - hashKey: 缓存键的哈希值 + // - origKey: 原始缓存键 + // - data: 响应体数据 + // - headers: 响应头 + // - status: HTTP 状态码 + // - maxAge: 缓存有效期 + Set(hashKey uint64, origKey string, data []byte, headers map[string]string, status int, maxAge time.Duration) + + // Delete 删除缓存条目。 + // + // 参数: + // - hashKey: 缓存键的哈希值 + // + // 返回值: + // - error: 删除失败时返回错误,成功返回 nil + Delete(hashKey uint64) error + + // CacheStats 返回缓存统计信息。 + // + // 注意:此方法名与 ProxyCache.Stats() 不同,以保持向后兼容。 + // ProxyCache 同时实现 Stats() 返回 ProxyCacheStats 和 CacheStats() 返回 CacheStats。 + // + // 返回值: + // - CacheStats: 缓存统计数据 + CacheStats() CacheStats +} + +// CacheStats 缓存统计信息。 +// +// 包含缓存的基本统计数据,用于监控和运维。 +type CacheStats struct { + // Entries 当前缓存条目数量 + Entries int64 + + // Size 缓存总大小(字节) + Size int64 + + // HitCount 缓存命中次数 + HitCount int64 + + // MissCount 缓存未命中次数 + MissCount int64 + + // Evictions 缓存淘汰次数 + Evictions int64 +} + +// 确保 ProxyCache 实现 CacheBackend 接口 +// ProxyCache 已有 Get, Set, Delete, Stats 方法,但签名略有不同: +// - Delete() 无返回值,需要添加包装方法 +// - Stats() 返回 ProxyCacheStats,需要适配 diff --git a/internal/cache/disk_cache.go b/internal/cache/disk_cache.go new file mode 100644 index 0000000..380b6b4 --- /dev/null +++ b/internal/cache/disk_cache.go @@ -0,0 +1,415 @@ +// Package cache 提供文件缓存和代理缓存功能。 +// +// 该文件实现 DiskCache 磁盘缓存后端,支持: +// - 目录层级配置(levels=1:2) +// - 原子写入策略(.tmp → .data) +// - 懒加载(后台加载元数据,不阻塞启动) +// - CRC32 校验和验证数据完整性 +// +// 主要用途: +// +// 作为 L2 缓存层,持久化代理响应到磁盘,支持服务重启后恢复。 +// +// 作者:xfy +package cache + +import ( + "encoding/json" + "fmt" + "hash/crc32" + "os" + "path/filepath" + "sync" + "sync/atomic" + "time" +) + +// DiskCacheConfig 磁盘缓存配置。 +type DiskCacheConfig struct { + // Path 缓存根目录 + Path string + + // Levels 目录层级,如 "1:2" 表示两级目录 + Levels string + + // MaxSize 最大缓存大小(字节) + MaxSize int64 + + // Inactive 未访问淘汰时间 + Inactive time.Duration +} + +// DiskCache 磁盘缓存实现。 +type DiskCache struct { + basePath string + levels []int + maxSize int64 + inactive time.Duration + currentSize atomic.Int64 + entries map[uint64]*DiskCacheMeta + mu sync.RWMutex + stopCh chan struct{} + + // 懒加载相关 + loaded atomic.Bool + loadCh chan struct{} + + // 统计 + hitCount atomic.Int64 + missCount atomic.Int64 + evictions atomic.Int64 +} + +// DiskCacheMeta 磁盘缓存元数据。 +type DiskCacheMeta struct { + HashKey uint64 `json:"hash_key"` + OrigKey string `json:"orig_key"` + Created time.Time `json:"created"` + MaxAge time.Duration `json:"max_age"` + Status int `json:"status"` + Size int64 `json:"size"` + Headers map[string]string `json:"headers,omitempty"` + CRC32 uint32 `json:"crc32"` +} + +// DiskCacheEntry 磁盘缓存条目(包含数据)。 +type DiskCacheEntry struct { + *DiskCacheMeta + Data []byte +} + +// NewDiskCache 创建磁盘缓存实例(懒加载模式)。 +func NewDiskCache(cfg *DiskCacheConfig) (*DiskCache, error) { + if cfg.Path == "" { + return nil, fmt.Errorf("disk cache path is required") + } + + // 确保目录存在 + if err := os.MkdirAll(cfg.Path, 0o755); err != nil { + return nil, fmt.Errorf("create cache directory: %w", err) + } + + dc := &DiskCache{ + basePath: cfg.Path, + levels: parseLevels(cfg.Levels), + maxSize: cfg.MaxSize, + inactive: cfg.Inactive, + entries: make(map[uint64]*DiskCacheMeta), + loadCh: make(chan struct{}), + stopCh: make(chan struct{}), + } + + // 启动后台加载,不阻塞服务启动 + go dc.lazyLoad() + + return dc, nil +} + +// parseLevels 解析目录层级配置。 +// 支持格式:""(无层级)、"1"(一级)、"1:2"(两级) +func parseLevels(levels string) []int { + if levels == "" { + return nil + } + + var result []int + start := 0 + for i := 0; i <= len(levels); i++ { + if i == len(levels) || levels[i] == ':' { + var level int + for j := start; j < i; j++ { + level = level*10 + int(levels[j]-'0') + } + if level > 0 { + result = append(result, level) + } + start = i + 1 + } + } + return result +} + +// lazyLoad 后台加载缓存元数据。 +func (dc *DiskCache) lazyLoad() { + defer close(dc.loadCh) + + // 扫描目录加载元数据(不加载实际数据) + filepath.Walk(dc.basePath, func(path string, info os.FileInfo, err error) error { + if err != nil || info.IsDir() { + return nil + } + + // 只处理 .meta 文件 + if filepath.Ext(path) != ".meta" { + return nil + } + + meta := dc.loadMetaFile(path) + if meta != nil { + dc.mu.Lock() + dc.entries[meta.HashKey] = meta + dc.mu.Unlock() + dc.currentSize.Add(meta.Size) + } + + return nil + }) + + dc.loaded.Store(true) +} + +// loadMetaFile 加载元数据文件。 +func (dc *DiskCache) loadMetaFile(path string) *DiskCacheMeta { + data, err := os.ReadFile(path) + if err != nil { + return nil + } + + var meta DiskCacheMeta + if err := json.Unmarshal(data, &meta); err != nil { + return nil + } + + return &meta +} + +// Get 获取缓存条目(实现 CacheBackend 接口)。 +func (dc *DiskCache) Get(hashKey uint64, origKey string) (*ProxyCacheEntry, bool, bool) { + // 如果未完成加载,等待加载完成或超时 + if !dc.loaded.Load() { + select { + case <-dc.loadCh: + // 加载完成,继续 + case <-time.After(100 * time.Millisecond): + // 超时,返回未命中(避免阻塞请求) + dc.missCount.Add(1) + return nil, false, false + } + } + + dc.mu.RLock() + meta, exists := dc.entries[hashKey] + dc.mu.RUnlock() + + if !exists { + dc.missCount.Add(1) + return nil, false, false + } + + // 双重验证:检查原始 key 是否匹配 + if meta.OrigKey != origKey { + dc.missCount.Add(1) + return nil, false, false + } + + // 读取数据文件 + dataPath := dc.filePathFromHash(hashKey, "data") + data, err := os.ReadFile(dataPath) + if err != nil { + dc.missCount.Add(1) + return nil, false, false + } + + // 验证 CRC32 + if meta.CRC32 != 0 { + if crc32.ChecksumIEEE(data) != meta.CRC32 { + // 数据损坏,删除条目 + dc.Delete(hashKey) + dc.missCount.Add(1) + return nil, false, false + } + } + + // 检查是否过期 + now := time.Now() + expiresAt := meta.Created.Add(meta.MaxAge) + stale := now.After(expiresAt) + + dc.hitCount.Add(1) + + // 转换为 ProxyCacheEntry + entry := &ProxyCacheEntry{ + Key: meta.OrigKey, + OrigKey: meta.OrigKey, + Data: data, + Headers: meta.Headers, + Status: meta.Status, + Created: meta.Created, + MaxAge: meta.MaxAge, + } + + return entry, true, stale +} + +// Set 设置缓存条目(实现 CacheBackend 接口)。 +func (dc *DiskCache) Set(hashKey uint64, origKey string, data []byte, headers map[string]string, status int, maxAge time.Duration) { + // 计算文件路径 + dataPath := dc.filePathFromHash(hashKey, "data") + metaPath := dc.filePathFromHash(hashKey, "meta") + + // 确保目录存在 + dir := filepath.Dir(dataPath) + if err := os.MkdirAll(dir, 0o755); err != nil { + return + } + + // 计算 CRC32 + crc := crc32.ChecksumIEEE(data) + + // 创建元数据 + meta := &DiskCacheMeta{ + HashKey: hashKey, + OrigKey: origKey, + Created: time.Now(), + MaxAge: maxAge, + Status: status, + Size: int64(len(data)), + Headers: headers, + CRC32: crc, + } + + // 原子写入数据文件:先写临时文件,再重命名 + tmpDataPath := dataPath + ".tmp" + if err := os.WriteFile(tmpDataPath, data, 0o644); err != nil { + return + } + if err := os.Rename(tmpDataPath, dataPath); err != nil { + os.Remove(tmpDataPath) + return + } + + // 写入元数据文件 + metaData, err := json.Marshal(meta) + if err != nil { + return + } + tmpMetaPath := metaPath + ".tmp" + if err := os.WriteFile(tmpMetaPath, metaData, 0o644); err != nil { + return + } + if err := os.Rename(tmpMetaPath, metaPath); err != nil { + os.Remove(tmpMetaPath) + return + } + + // 更新内存索引 + dc.mu.Lock() + oldMeta, existed := dc.entries[hashKey] + dc.entries[hashKey] = meta + dc.mu.Unlock() + + // 更新大小统计 + if existed && oldMeta != nil { + dc.currentSize.Add(-oldMeta.Size) + } + dc.currentSize.Add(meta.Size) + + // 检查是否需要淘汰 + if dc.maxSize > 0 && dc.currentSize.Load() > dc.maxSize { + go dc.evict() + } +} + +// Delete 删除缓存条目(实现 CacheBackend 接口)。 +func (dc *DiskCache) Delete(hashKey uint64) error { + dc.mu.Lock() + meta, exists := dc.entries[hashKey] + if exists { + delete(dc.entries, hashKey) + dc.currentSize.Add(-meta.Size) + dc.evictions.Add(1) + } + dc.mu.Unlock() + + if !exists { + return nil + } + + // 删除文件 + dataPath := dc.filePathFromHash(hashKey, "data") + metaPath := dc.filePathFromHash(hashKey, "meta") + os.Remove(dataPath) + os.Remove(metaPath) + + return nil +} + +// CacheStats 返回缓存统计信息(实现 CacheBackend 接口)。 +func (dc *DiskCache) CacheStats() CacheStats { + dc.mu.RLock() + entries := int64(len(dc.entries)) + dc.mu.RUnlock() + + return CacheStats{ + Entries: entries, + Size: dc.currentSize.Load(), + HitCount: dc.hitCount.Load(), + MissCount: dc.missCount.Load(), + Evictions: dc.evictions.Load(), + } +} + +// Stop 停止磁盘缓存。 +func (dc *DiskCache) Stop() { + close(dc.stopCh) +} + +// filePathFromHash 根据哈希值计算文件路径。 +func (dc *DiskCache) filePathFromHash(hashKey uint64, ext string) string { + // 将哈希值转换为十六进制字符串 + hashStr := fmt.Sprintf("%016x", hashKey) + + if len(dc.levels) == 0 { + return filepath.Join(dc.basePath, hashStr+"."+ext) + } + + // 根据层级构建路径 + parts := []string{dc.basePath} + offset := len(hashStr) + for _, level := range dc.levels { + if offset < level { + break + } + offset -= level + parts = append(parts, hashStr[offset:offset+level]) + } + parts = append(parts, hashStr+"."+ext) + + return filepath.Join(parts...) +} + +// evict 淘汰旧条目。 +func (dc *DiskCache) evict() { + // 简单策略:删除最旧的条目直到大小低于阈值 + targetSize := dc.maxSize * 9 / 10 // 淘汰到 90% + + for dc.currentSize.Load() > targetSize { + dc.mu.Lock() + // 找到最旧的条目 + var oldestKey uint64 + var oldestTime time.Time + for key, meta := range dc.entries { + if oldestKey == 0 || meta.Created.Before(oldestTime) { + oldestKey = key + oldestTime = meta.Created + } + } + + if oldestKey == 0 { + dc.mu.Unlock() + break + } + + meta := dc.entries[oldestKey] + delete(dc.entries, oldestKey) + dc.currentSize.Add(-meta.Size) + dc.evictions.Add(1) + dc.mu.Unlock() + + // 删除文件 + dataPath := dc.filePathFromHash(oldestKey, "data") + metaPath := dc.filePathFromHash(oldestKey, "meta") + os.Remove(dataPath) + os.Remove(metaPath) + } +} diff --git a/internal/cache/disk_cache_test.go b/internal/cache/disk_cache_test.go new file mode 100644 index 0000000..4a78f3c --- /dev/null +++ b/internal/cache/disk_cache_test.go @@ -0,0 +1,386 @@ +package cache + +import ( + "os" + "path/filepath" + "testing" + "time" +) + +func TestNewDiskCache(t *testing.T) { + // 创建临时目录 + tmpDir := t.TempDir() + + cfg := &DiskCacheConfig{ + Path: tmpDir, + Levels: "1:2", + } + + dc, err := NewDiskCache(cfg) + if err != nil { + t.Fatalf("NewDiskCache failed: %v", err) + } + defer dc.Stop() + + // 等待懒加载完成 + <-dc.loadCh + + if dc.basePath != tmpDir { + t.Errorf("basePath = %q, want %q", dc.basePath, tmpDir) + } +} + +func TestDiskCacheSetGet(t *testing.T) { + tmpDir := t.TempDir() + + cfg := &DiskCacheConfig{ + Path: tmpDir, + Levels: "1:2", + } + + dc, err := NewDiskCache(cfg) + if err != nil { + t.Fatalf("NewDiskCache failed: %v", err) + } + defer dc.Stop() + + // 等待懒加载完成 + <-dc.loadCh + + // 设置缓存 + hashKey := uint64(12345) + origKey := "GET:/api/test" + data := []byte("test response data") + headers := map[string]string{"Content-Type": "application/json"} + status := 200 + maxAge := 10 * time.Minute + + dc.Set(hashKey, origKey, data, headers, status, maxAge) + + // 获取缓存 + entry, exists, stale := dc.Get(hashKey, origKey) + if !exists { + t.Fatal("cache entry not found") + } + if stale { + t.Error("entry should not be stale") + } + if string(entry.Data) != string(data) { + t.Errorf("Data = %q, want %q", entry.Data, data) + } + if entry.Status != status { + t.Errorf("Status = %d, want %d", entry.Status, status) + } + if entry.Headers["Content-Type"] != "application/json" { + t.Errorf("Headers[Content-Type] = %q, want %q", entry.Headers["Content-Type"], "application/json") + } +} + +func TestDiskCacheDelete(t *testing.T) { + tmpDir := t.TempDir() + + cfg := &DiskCacheConfig{ + Path: tmpDir, + Levels: "1:2", + } + + dc, err := NewDiskCache(cfg) + if err != nil { + t.Fatalf("NewDiskCache failed: %v", err) + } + defer dc.Stop() + + <-dc.loadCh + + // 设置缓存 + hashKey := uint64(12345) + origKey := "GET:/api/test" + dc.Set(hashKey, origKey, []byte("test"), nil, 200, 10*time.Minute) + + // 验证存在 + _, exists, _ := dc.Get(hashKey, origKey) + if !exists { + t.Fatal("entry should exist before delete") + } + + // 删除 + if err := dc.Delete(hashKey); err != nil { + t.Fatalf("Delete failed: %v", err) + } + + // 验证已删除 + _, exists, _ = dc.Get(hashKey, origKey) + if exists { + t.Error("entry should not exist after delete") + } +} + +func TestDiskCacheStale(t *testing.T) { + tmpDir := t.TempDir() + + cfg := &DiskCacheConfig{ + Path: tmpDir, + Levels: "1:2", + } + + dc, err := NewDiskCache(cfg) + if err != nil { + t.Fatalf("NewDiskCache failed: %v", err) + } + defer dc.Stop() + + <-dc.loadCh + + // 设置一个已过期的缓存 + hashKey := uint64(12345) + origKey := "GET:/api/test" + dc.Set(hashKey, origKey, []byte("test"), nil, 200, 1*time.Millisecond) + + // 等待过期 + time.Sleep(10 * time.Millisecond) + + // 获取缓存 + entry, exists, stale := dc.Get(hashKey, origKey) + if !exists { + t.Fatal("expired entry should still exist") + } + if !stale { + t.Error("expired entry should be marked as stale") + } + if string(entry.Data) != "test" { + t.Errorf("Data = %q, want %q", entry.Data, "test") + } +} + +func TestDiskCacheLevels(t *testing.T) { + tmpDir := t.TempDir() + + cfg := &DiskCacheConfig{ + Path: tmpDir, + Levels: "1:2", + } + + dc, err := NewDiskCache(cfg) + if err != nil { + t.Fatalf("NewDiskCache failed: %v", err) + } + defer dc.Stop() + + <-dc.loadCh + + // 设置缓存 + hashKey := uint64(0xabcdef1234567890) + origKey := "GET:/api/test" + dc.Set(hashKey, origKey, []byte("test"), nil, 200, 10*time.Minute) + + // 验证文件路径包含层级目录 + dataPath := dc.filePathFromHash(hashKey, "data") + if filepath.Dir(dataPath) == tmpDir { + t.Error("file should be in a subdirectory for levels=1:2") + } + + // 验证文件存在 + if _, err := os.Stat(dataPath); os.IsNotExist(err) { + t.Errorf("data file not found at %s", dataPath) + } +} + +func TestDiskCacheMaxSize(t *testing.T) { + tmpDir := t.TempDir() + + cfg := &DiskCacheConfig{ + Path: tmpDir, + Levels: "1:2", + MaxSize: 100, // 很小的限制 + } + + dc, err := NewDiskCache(cfg) + if err != nil { + t.Fatalf("NewDiskCache failed: %v", err) + } + defer dc.Stop() + + <-dc.loadCh + + // 设置多个缓存条目 + for i := range 10 { + hashKey := uint64(i) + origKey := "GET:/api/test" + dc.Set(hashKey, origKey, []byte("test data that is longer than 10 bytes"), nil, 200, 10*time.Minute) + } + + // 等待淘汰完成(淘汰是异步的) + time.Sleep(500 * time.Millisecond) + + // 验证淘汰发生(Evictions > 0) + stats := dc.CacheStats() + if stats.Evictions == 0 { + t.Error("Evictions should be > 0 when MaxSize is exceeded") + } +} + +func TestDiskCacheStats(t *testing.T) { + tmpDir := t.TempDir() + + cfg := &DiskCacheConfig{ + Path: tmpDir, + Levels: "1:2", + } + + dc, err := NewDiskCache(cfg) + if err != nil { + t.Fatalf("NewDiskCache failed: %v", err) + } + defer dc.Stop() + + <-dc.loadCh + + // 初始统计 + stats := dc.CacheStats() + if stats.Entries != 0 { + t.Errorf("initial Entries = %d, want 0", stats.Entries) + } + + // 设置缓存 + dc.Set(1, "key1", []byte("data1"), nil, 200, 10*time.Minute) + dc.Set(2, "key2", []byte("data2"), nil, 200, 10*time.Minute) + + stats = dc.CacheStats() + if stats.Entries != 2 { + t.Errorf("Entries = %d, want 2", stats.Entries) + } + + // 获取缓存(命中) + dc.Get(1, "key1") + stats = dc.CacheStats() + if stats.HitCount != 1 { + t.Errorf("HitCount = %d, want 1", stats.HitCount) + } + + // 获取不存在的缓存(未命中) + dc.Get(999, "nonexistent") + stats = dc.CacheStats() + if stats.MissCount != 1 { + t.Errorf("MissCount = %d, want 1", stats.MissCount) + } +} + +func TestDiskCacheCRC32(t *testing.T) { + tmpDir := t.TempDir() + + cfg := &DiskCacheConfig{ + Path: tmpDir, + Levels: "1:2", + } + + dc, err := NewDiskCache(cfg) + if err != nil { + t.Fatalf("NewDiskCache failed: %v", err) + } + defer dc.Stop() + + <-dc.loadCh + + // 设置缓存 + hashKey := uint64(12345) + origKey := "GET:/api/test" + data := []byte("test data for crc32") + dc.Set(hashKey, origKey, data, nil, 200, 10*time.Minute) + + // 获取缓存验证 CRC32 + entry, exists, _ := dc.Get(hashKey, origKey) + if !exists { + t.Fatal("entry not found") + } + if string(entry.Data) != string(data) { + t.Errorf("Data mismatch") + } +} + +func TestParseLevels(t *testing.T) { + tests := []struct { + input string + expected []int + }{ + {"", nil}, + {"1", []int{1}}, + {"1:2", []int{1, 2}}, + {"2:2:2", []int{2, 2, 2}}, + } + + for _, tt := range tests { + result := parseLevels(tt.input) + if len(result) != len(tt.expected) { + t.Errorf("parseLevels(%q) = %v, want %v", tt.input, result, tt.expected) + continue + } + for i, v := range result { + if v != tt.expected[i] { + t.Errorf("parseLevels(%q)[%d] = %d, want %d", tt.input, i, v, tt.expected[i]) + } + } + } +} + +func TestDiskCacheLazyLoad(t *testing.T) { + tmpDir := t.TempDir() + + cfg := &DiskCacheConfig{ + Path: tmpDir, + Levels: "1:2", + } + + dc, err := NewDiskCache(cfg) + if err != nil { + t.Fatalf("NewDiskCache failed: %v", err) + } + defer dc.Stop() + + // 在懒加载完成前,loaded 应该是 false + // 但由于加载很快,我们无法可靠测试这个状态 + // 所以我们等待加载完成 + <-dc.loadCh + + if !dc.loaded.Load() { + t.Error("loaded should be true after lazyLoad completes") + } +} + +func TestDiskCacheRestart(t *testing.T) { + tmpDir := t.TempDir() + + // 第一个实例:写入数据 + cfg := &DiskCacheConfig{ + Path: tmpDir, + Levels: "1:2", + } + + dc1, err := NewDiskCache(cfg) + if err != nil { + t.Fatalf("NewDiskCache failed: %v", err) + } + <-dc1.loadCh + + hashKey := uint64(12345) + origKey := "GET:/api/test" + data := []byte("persistent data") + dc1.Set(hashKey, origKey, data, nil, 200, 10*time.Minute) + dc1.Stop() + + // 第二个实例:读取数据(模拟重启) + dc2, err := NewDiskCache(cfg) + if err != nil { + t.Fatalf("NewDiskCache (restart) failed: %v", err) + } + <-dc2.loadCh + defer dc2.Stop() + + // 验证数据恢复 + entry, exists, _ := dc2.Get(hashKey, origKey) + if !exists { + t.Fatal("entry should exist after restart") + } + if string(entry.Data) != string(data) { + t.Errorf("Data = %q, want %q", entry.Data, data) + } +} diff --git a/internal/cache/file_cache.go b/internal/cache/file_cache.go index b13b19e..f7f716c 100644 --- a/internal/cache/file_cache.go +++ b/internal/cache/file_cache.go @@ -554,10 +554,11 @@ func (c *ProxyCache) matchStatus(statuses []int, status int) bool { } // Delete 删除缓存条目。 -func (c *ProxyCache) Delete(hashKey uint64) { +func (c *ProxyCache) Delete(hashKey uint64) error { c.mu.Lock() defer c.mu.Unlock() delete(c.entries, hashKey) + return nil } // DeleteByPatternWithMethod 按通配符模式删除缓存条目。 @@ -648,3 +649,20 @@ type ProxyCacheStats struct { // Pending 正在等待缓存生成的请求数量 Pending int } + +// ToCacheStats 转换为 CacheStats 格式。 +func (s ProxyCacheStats) ToCacheStats() CacheStats { + return CacheStats{ + Entries: int64(s.Entries), + } +} + +// CacheStats 返回缓存统计信息(实现 CacheBackend 接口)。 +func (c *ProxyCache) CacheStats() CacheStats { + c.mu.RLock() + defer c.mu.RUnlock() + + return CacheStats{ + Entries: int64(len(c.entries)), + } +} diff --git a/internal/cache/tiered_cache.go b/internal/cache/tiered_cache.go new file mode 100644 index 0000000..cdb6d99 --- /dev/null +++ b/internal/cache/tiered_cache.go @@ -0,0 +1,252 @@ +// Package cache 提供文件缓存和代理缓存功能。 +// +// 该文件实现 TieredCache 分层缓存,支持: +// - L1 内存缓存(热点数据) +// - L2 磁盘缓存(持久化) +// - 热点提升机制 +// - L2 过期检查 +// +// 主要用途: +// +// 作为代理缓存的主要实现,平衡性能(L1)和容量(L2)。 +// +// 作者:xfy +package cache + +import ( + "sync" + "sync/atomic" + "time" +) + +// TieredCacheConfig 分层缓存配置。 +type TieredCacheConfig struct { + // L1 配置 + L1MaxEntries int64 + L1MaxSize int64 + + // L2 配置 + L2Config *DiskCacheConfig + + // 热点提升配置 + PromoteThreshold int // 访问次数阈值,超过后提升到 L1 + PromoteInterval time.Duration // 提升检查间隔 +} + +// TieredCache 分层缓存(L1 内存 + L2 磁盘)。 +type TieredCache struct { + l1 *ProxyCache // L1 内存缓存(热点数据) + l2 *DiskCache // L2 磁盘缓存(持久化) + l1Ratio float64 // L1 容量占 L2 的比例 + promoter *promoter // 热点提升器 + stopCh chan struct{} + + // 统计 + l1Hits atomic.Int64 + l2Hits atomic.Int64 + misses atomic.Int64 + promotes atomic.Int64 +} + +// promoter 热点提升器。 +type promoter struct { + threshold int // 访问次数阈值 + interval time.Duration // 检查间隔 + accessMap map[uint64]*accessInfo + mu sync.RWMutex +} + +// accessInfo 访问信息。 +type accessInfo struct { + count int + lastAccess time.Time + origKey string +} + +// NewTieredCache 创建分层缓存实例。 +func NewTieredCache(cfg *TieredCacheConfig) (*TieredCache, error) { + // 创建 L1 内存缓存 + l1 := NewProxyCache(nil, true, 0) + + // 创建 L2 磁盘缓存 + l2, err := NewDiskCache(cfg.L2Config) + if err != nil { + return nil, err + } + + tc := &TieredCache{ + l1: l1, + l2: l2, + l1Ratio: 0.1, // 默认 L1 占 10% + promoter: &promoter{ + threshold: cfg.PromoteThreshold, + interval: cfg.PromoteInterval, + accessMap: make(map[uint64]*accessInfo), + }, + stopCh: make(chan struct{}), + } + + // 启动热点提升检查 + if tc.promoter.threshold > 0 { + go tc.promoteLoop() + } + + return tc, nil +} + +// Get 获取缓存条目(实现 CacheBackend 接口)。 +func (tc *TieredCache) Get(hashKey uint64, origKey string) (*ProxyCacheEntry, bool, bool) { + // 1. 先查 L1 + entry, exists, stale := tc.l1.Get(hashKey, origKey) + if exists { + tc.l1Hits.Add(1) + return entry, true, stale + } + + // 2. 查 L2,必须验证 max_age + entry, exists, stale = tc.l2.Get(hashKey, origKey) + if !exists { + tc.misses.Add(1) + return nil, false, false + } + + tc.l2Hits.Add(1) + + // 3. 记录访问(用于热点提升) + tc.recordAccess(hashKey, origKey) + + // 4. L2 命中且未过期,异步提升到 L1 + if !stale { + go tc.promoteToL1(hashKey, entry) + } + + return entry, true, stale +} + +// Set 设置缓存条目(实现 CacheBackend 接口)。 +func (tc *TieredCache) Set(hashKey uint64, origKey string, data []byte, headers map[string]string, status int, maxAge time.Duration) { + // 同时写入 L1 和 L2 + tc.l1.Set(hashKey, origKey, data, headers, status, maxAge) + + // L2 异步写入(在 goroutine 中执行) + go tc.l2.Set(hashKey, origKey, data, headers, status, maxAge) +} + +// Delete 删除缓存条目(实现 CacheBackend 接口)。 +func (tc *TieredCache) Delete(hashKey uint64) error { + tc.l1.Delete(hashKey) + return tc.l2.Delete(hashKey) +} + +// CacheStats 返回缓存统计信息(实现 CacheBackend 接口)。 +func (tc *TieredCache) CacheStats() CacheStats { + l1Stats := tc.l1.CacheStats() + l2Stats := tc.l2.CacheStats() + + return CacheStats{ + Entries: l1Stats.Entries + l2Stats.Entries, + Size: l1Stats.Size + l2Stats.Size, + HitCount: tc.l1Hits.Load() + tc.l2Hits.Load(), + MissCount: tc.misses.Load(), + Evictions: l2Stats.Evictions, + } +} + +// TieredCacheStats 返回分层缓存详细统计。 +func (tc *TieredCache) TieredCacheStats() TieredCacheStats { + return TieredCacheStats{ + L1Hits: tc.l1Hits.Load(), + L2Hits: tc.l2Hits.Load(), + Misses: tc.misses.Load(), + Promotes: tc.promotes.Load(), + L1Entries: tc.l1.CacheStats().Entries, + L2Entries: tc.l2.CacheStats().Entries, + } +} + +// TieredCacheStats 分层缓存详细统计。 +type TieredCacheStats struct { + L1Hits int64 + L2Hits int64 + Misses int64 + Promotes int64 + L1Entries int64 + L2Entries int64 +} + +// Stop 停止分层缓存。 +func (tc *TieredCache) Stop() { + close(tc.stopCh) + if tc.l2 != nil { + tc.l2.Stop() + } +} + +// recordAccess 记录访问(用于热点提升)。 +func (tc *TieredCache) recordAccess(hashKey uint64, origKey string) { + if tc.promoter.threshold <= 0 { + return + } + + tc.promoter.mu.Lock() + defer tc.promoter.mu.Unlock() + + info, exists := tc.promoter.accessMap[hashKey] + if !exists { + info = &accessInfo{origKey: origKey} + tc.promoter.accessMap[hashKey] = info + } + info.count++ + info.lastAccess = time.Now() +} + +// promoteToL1 提升条目到 L1。 +func (tc *TieredCache) promoteToL1(hashKey uint64, entry *ProxyCacheEntry) { + tc.l1.Set(hashKey, entry.OrigKey, entry.Data, entry.Headers, entry.Status, entry.MaxAge) + tc.promotes.Add(1) +} + +// promoteLoop 热点提升检查循环。 +func (tc *TieredCache) promoteLoop() { + ticker := time.NewTicker(tc.promoter.interval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + tc.checkAndPromote() + case <-tc.stopCh: + return + } + } +} + +// checkAndPromote 检查并提升热点数据。 +func (tc *TieredCache) checkAndPromote() { + tc.promoter.mu.Lock() + defer tc.promoter.mu.Unlock() + + for hashKey, info := range tc.promoter.accessMap { + if info.count >= tc.promoter.threshold { + // 从 L2 获取并提升到 L1 + entry, exists, _ := tc.l2.Get(hashKey, info.origKey) + if exists { + tc.promoteToL1(hashKey, entry) + } + // 重置计数 + info.count = 0 + } + } +} + +// revalidate 重新验证过期缓存。 +func (tc *TieredCache) revalidate(hashKey uint64, origKey string, entry *ProxyCacheEntry) { + // 标记为正在更新 + entry.Updating.Store(true) + + // 删除 L1 中的过期条目(如果存在) + tc.l1.Delete(hashKey) + + // 注意:实际的重新验证逻辑需要在 proxy 层实现 + // 这里只是标记和清理,真正的重新获取由 proxy 层触发 +} diff --git a/internal/cache/tiered_cache_test.go b/internal/cache/tiered_cache_test.go new file mode 100644 index 0000000..86fac74 --- /dev/null +++ b/internal/cache/tiered_cache_test.go @@ -0,0 +1,359 @@ +package cache + +import ( + "testing" + "time" +) + +func TestNewTieredCache(t *testing.T) { + tmpDir := t.TempDir() + + cfg := &TieredCacheConfig{ + L2Config: &DiskCacheConfig{ + Path: tmpDir, + Levels: "1:2", + }, + PromoteThreshold: 3, + PromoteInterval: 100 * time.Millisecond, + } + + tc, err := NewTieredCache(cfg) + if err != nil { + t.Fatalf("NewTieredCache failed: %v", err) + } + defer tc.Stop() + + // 等待 L2 懒加载完成 + <-tc.l2.loadCh + + if tc.l1 == nil { + t.Error("l1 should not be nil") + } + if tc.l2 == nil { + t.Error("l2 should not be nil") + } +} + +func TestTieredCacheSetGet(t *testing.T) { + tmpDir := t.TempDir() + + cfg := &TieredCacheConfig{ + L2Config: &DiskCacheConfig{ + Path: tmpDir, + Levels: "1:2", + }, + } + + tc, err := NewTieredCache(cfg) + if err != nil { + t.Fatalf("NewTieredCache failed: %v", err) + } + defer tc.Stop() + + <-tc.l2.loadCh + + // 设置缓存 + hashKey := uint64(12345) + origKey := "GET:/api/test" + data := []byte("test response data") + + tc.Set(hashKey, origKey, data, nil, 200, 10*time.Minute) + + // 等待 L2 异步写入完成 + time.Sleep(50 * time.Millisecond) + + // 获取缓存(应该从 L1 获取) + entry, exists, stale := tc.Get(hashKey, origKey) + if !exists { + t.Fatal("cache entry not found") + } + if stale { + t.Error("entry should not be stale") + } + if string(entry.Data) != string(data) { + t.Errorf("Data = %q, want %q", entry.Data, data) + } + + // 验证 L1 命中 + stats := tc.TieredCacheStats() + if stats.L1Hits != 1 { + t.Errorf("L1Hits = %d, want 1", stats.L1Hits) + } +} + +func TestTieredCacheL2Fallback(t *testing.T) { + tmpDir := t.TempDir() + + cfg := &TieredCacheConfig{ + L2Config: &DiskCacheConfig{ + Path: tmpDir, + Levels: "1:2", + }, + } + + tc, err := NewTieredCache(cfg) + if err != nil { + t.Fatalf("NewTieredCache failed: %v", err) + } + defer tc.Stop() + + <-tc.l2.loadCh + + // 直接写入 L2(绕过 L1) + hashKey := uint64(12345) + origKey := "GET:/api/test" + data := []byte("test from l2") + + tc.l2.Set(hashKey, origKey, data, nil, 200, 10*time.Minute) + time.Sleep(50 * time.Millisecond) + + // 获取缓存(应该从 L2 获取) + entry, exists, stale := tc.Get(hashKey, origKey) + if !exists { + t.Fatal("cache entry not found") + } + if stale { + t.Error("entry should not be stale") + } + if string(entry.Data) != string(data) { + t.Errorf("Data = %q, want %q", entry.Data, data) + } + + // 验证 L2 命中 + stats := tc.TieredCacheStats() + if stats.L2Hits != 1 { + t.Errorf("L2Hits = %d, want 1", stats.L2Hits) + } +} + +func TestTieredCacheDelete(t *testing.T) { + tmpDir := t.TempDir() + + cfg := &TieredCacheConfig{ + L2Config: &DiskCacheConfig{ + Path: tmpDir, + Levels: "1:2", + }, + } + + tc, err := NewTieredCache(cfg) + if err != nil { + t.Fatalf("NewTieredCache failed: %v", err) + } + defer tc.Stop() + + <-tc.l2.loadCh + + // 设置缓存 + hashKey := uint64(12345) + origKey := "GET:/api/test" + tc.Set(hashKey, origKey, []byte("test"), nil, 200, 10*time.Minute) + time.Sleep(50 * time.Millisecond) + + // 删除 + if err := tc.Delete(hashKey); err != nil { + t.Fatalf("Delete failed: %v", err) + } + + // 验证 L1 和 L2 都已删除 + _, exists, _ := tc.l1.Get(hashKey, origKey) + if exists { + t.Error("entry should not exist in L1 after delete") + } + + _, exists, _ = tc.l2.Get(hashKey, origKey) + if exists { + t.Error("entry should not exist in L2 after delete") + } +} + +func TestTieredCacheStale(t *testing.T) { + tmpDir := t.TempDir() + + cfg := &TieredCacheConfig{ + L2Config: &DiskCacheConfig{ + Path: tmpDir, + Levels: "1:2", + }, + } + + tc, err := NewTieredCache(cfg) + if err != nil { + t.Fatalf("NewTieredCache failed: %v", err) + } + defer tc.Stop() + + <-tc.l2.loadCh + + // 设置一个已过期的缓存 + hashKey := uint64(12345) + origKey := "GET:/api/test" + tc.l2.Set(hashKey, origKey, []byte("test"), nil, 200, 1*time.Millisecond) + + // 等待过期 + time.Sleep(10 * time.Millisecond) + + // 获取缓存 + _, exists, stale := tc.Get(hashKey, origKey) + if !exists { + t.Fatal("expired entry should still exist") + } + if !stale { + t.Error("expired entry should be marked as stale") + } +} + +func TestTieredCachePromote(t *testing.T) { + tmpDir := t.TempDir() + + cfg := &TieredCacheConfig{ + L2Config: &DiskCacheConfig{ + Path: tmpDir, + Levels: "1:2", + }, + PromoteThreshold: 2, + PromoteInterval: 50 * time.Millisecond, + } + + tc, err := NewTieredCache(cfg) + if err != nil { + t.Fatalf("NewTieredCache failed: %v", err) + } + defer tc.Stop() + + <-tc.l2.loadCh + + // 直接写入 L2 + hashKey := uint64(12345) + origKey := "GET:/api/test" + data := []byte("test data") + tc.l2.Set(hashKey, origKey, data, nil, 200, 10*time.Minute) + time.Sleep(50 * time.Millisecond) + + // 访问两次(达到阈值) + tc.Get(hashKey, origKey) + tc.Get(hashKey, origKey) + + // 等待提升检查 + time.Sleep(100 * time.Millisecond) + + // 验证提升发生 + stats := tc.TieredCacheStats() + if stats.Promotes == 0 { + t.Error("promotes should be > 0 after reaching threshold") + } +} + +func TestTieredCacheStats(t *testing.T) { + tmpDir := t.TempDir() + + cfg := &TieredCacheConfig{ + L2Config: &DiskCacheConfig{ + Path: tmpDir, + Levels: "1:2", + }, + } + + tc, err := NewTieredCache(cfg) + if err != nil { + t.Fatalf("NewTieredCache failed: %v", err) + } + defer tc.Stop() + + <-tc.l2.loadCh + + // 设置缓存 + tc.Set(1, "key1", []byte("data1"), nil, 200, 10*time.Minute) + tc.Set(2, "key2", []byte("data2"), nil, 200, 10*time.Minute) + time.Sleep(50 * time.Millisecond) + + // 获取缓存 + tc.Get(1, "key1") // L1 命中 + tc.Get(1, "key1") // L1 命中 + tc.Get(999, "nonexistent") // 未命中 + + stats := tc.TieredCacheStats() + if stats.L1Hits != 2 { + t.Errorf("L1Hits = %d, want 2", stats.L1Hits) + } + if stats.Misses != 1 { + t.Errorf("Misses = %d, want 1", stats.Misses) + } +} + +func TestTieredCacheRestart(t *testing.T) { + tmpDir := t.TempDir() + + cfg := &TieredCacheConfig{ + L2Config: &DiskCacheConfig{ + Path: tmpDir, + Levels: "1:2", + }, + } + + // 第一个实例:写入数据 + tc1, err := NewTieredCache(cfg) + if err != nil { + t.Fatalf("NewTieredCache failed: %v", err) + } + <-tc1.l2.loadCh + + hashKey := uint64(12345) + origKey := "GET:/api/test" + data := []byte("persistent data") + tc1.Set(hashKey, origKey, data, nil, 200, 10*time.Minute) + time.Sleep(50 * time.Millisecond) + tc1.Stop() + + // 第二个实例:读取数据(模拟重启) + tc2, err := NewTieredCache(cfg) + if err != nil { + t.Fatalf("NewTieredCache (restart) failed: %v", err) + } + <-tc2.l2.loadCh + defer tc2.Stop() + + // 验证数据从 L2 恢复 + entry, exists, _ := tc2.Get(hashKey, origKey) + if !exists { + t.Fatal("entry should exist after restart") + } + if string(entry.Data) != string(data) { + t.Errorf("Data = %q, want %q", entry.Data, data) + } + + // 验证是从 L2 获取的 + stats := tc2.TieredCacheStats() + if stats.L2Hits != 1 { + t.Errorf("L2Hits = %d, want 1", stats.L2Hits) + } +} + +func TestTieredCacheCacheStats(t *testing.T) { + tmpDir := t.TempDir() + + cfg := &TieredCacheConfig{ + L2Config: &DiskCacheConfig{ + Path: tmpDir, + Levels: "1:2", + }, + } + + tc, err := NewTieredCache(cfg) + if err != nil { + t.Fatalf("NewTieredCache failed: %v", err) + } + defer tc.Stop() + + <-tc.l2.loadCh + + // 设置缓存 + tc.Set(1, "key1", []byte("data1"), nil, 200, 10*time.Minute) + time.Sleep(50 * time.Millisecond) + + // 获取缓存统计 + stats := tc.CacheStats() + if stats.Entries < 1 { + t.Errorf("Entries = %d, should be >= 1", stats.Entries) + } +}