From c157be1ce585041caf6fb3fa384f1d5a0fd0602d Mon Sep 17 00:00:00 2001 From: xfy Date: Sat, 9 May 2026 17:26:19 +0800 Subject: [PATCH] refactor(cache): remove unused disk/tiered cache and add helper functions Remove unused disk cache, tiered cache, purge, and config loader code. Add HashPathWithMethod and MatchPattern helpers for future cache purge API. Update test to use new mock backend API with ResponseBody field. Co-Authored-By: Claude Opus 4.7 --- internal/benchmark/tools/tools.go | 162 +++++ internal/cache/disk_cache.go | 519 --------------- internal/cache/disk_cache_test.go | 497 -------------- internal/cache/file_cache.go | 69 ++ internal/cache/purge.go | 268 -------- internal/cache/purge_test.go | 763 ---------------------- internal/cache/tiered_cache.go | 264 -------- internal/cache/tiered_cache_test.go | 431 ------------ internal/config/loader.go | 155 ----- internal/config/loader_test.go | 442 ------------- internal/config/validate.go | 7 - internal/lua/api_log.go | 1 - internal/server/start_integration_test.go | 7 +- 13 files changed, 235 insertions(+), 3350 deletions(-) create mode 100644 internal/benchmark/tools/tools.go delete mode 100644 internal/cache/disk_cache.go delete mode 100644 internal/cache/disk_cache_test.go delete mode 100644 internal/cache/purge.go delete mode 100644 internal/cache/purge_test.go delete mode 100644 internal/cache/tiered_cache.go delete mode 100644 internal/cache/tiered_cache_test.go delete mode 100644 internal/config/loader.go delete mode 100644 internal/config/loader_test.go diff --git a/internal/benchmark/tools/tools.go b/internal/benchmark/tools/tools.go new file mode 100644 index 0000000..0f07639 --- /dev/null +++ b/internal/benchmark/tools/tools.go @@ -0,0 +1,162 @@ +// Package tools 提供基准测试和集成测试的辅助工具。 +package tools + +import ( + "net" + "time" + + "github.com/valyala/fasthttp" +) + +// 预定义的测试数据大小常量 +const ( + Size100B = 100 + Size1KB = 1024 + Size10KB = 10 * 1024 + Size100KB = 100 * 1024 + Size1MB = 1024 * 1024 +) + +// MockBackendConfig Mock 后端配置 +type MockBackendConfig struct { + // Mode 运行模式 + Mode string + // StatusCode 响应状态码 + StatusCode int + // ResponseBody 响应体 + ResponseBody []byte + // ErrorRate 错误率 (0.0 - 1.0) + ErrorRate float64 + // Delay 响应延迟 + Delay time.Duration +} + +// Mock 后端运行模式 +const ( + ModeNormalResponse = "normal" + ModeRandomResponse = "random" + ModeErrorResponse = "error" + ModeDelayedResponse = "delayed" +) + +// GenerateTestData 生成指定大小的测试数据。 +func GenerateTestData(size int) []byte { + data := make([]byte, size) + for i := range data { + data[i] = byte(i % 256) + } + return data +} + +// SimpleMockBackend 创建一个简单的 Mock HTTP 后端。 +// 返回监听地址和清理函数。 +func SimpleMockBackend(statusCode int, responseBody []byte) (string, func()) { + ln, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + panic(err) + } + + handler := func(ctx *fasthttp.RequestCtx) { + ctx.SetStatusCode(statusCode) + ctx.SetBody(responseBody) + } + + go fasthttp.Serve(ln, handler) + + return ln.Addr().String(), func() { + ln.Close() + } +} + +// ErrorMockBackend 创建一个返回错误的 Mock HTTP 后端。 +func ErrorMockBackend(errorRate float64, errorBody []byte) (string, func()) { + ln, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + panic(err) + } + + var requestCount int + handler := func(ctx *fasthttp.RequestCtx) { + requestCount++ + if float64(requestCount%100)/100 < errorRate { + ctx.SetStatusCode(fasthttp.StatusInternalServerError) + ctx.SetBody(errorBody) + return + } + ctx.SetStatusCode(fasthttp.StatusOK) + ctx.SetBody([]byte("OK")) + } + + go fasthttp.Serve(ln, handler) + + return ln.Addr().String(), func() { + ln.Close() + } +} + +// DelayedMockBackend 创建一个带延迟的 Mock HTTP 后端。 +func DelayedMockBackend(delay time.Duration, statusCode int, responseBody []byte) (string, func()) { + ln, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + panic(err) + } + + handler := func(ctx *fasthttp.RequestCtx) { + time.Sleep(delay) + ctx.SetStatusCode(statusCode) + ctx.SetBody(responseBody) + } + + go fasthttp.Serve(ln, handler) + + return ln.Addr().String(), func() { + ln.Close() + } +} + +// StartMockFasthttpBackend 根据配置启动 Mock HTTP 后端。 +func StartMockFasthttpBackend(config MockBackendConfig) (string, func()) { + ln, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + panic(err) + } + + var requestCount int + handler := func(ctx *fasthttp.RequestCtx) { + requestCount++ + + switch config.Mode { + case ModeDelayedResponse: + time.Sleep(config.Delay) + ctx.SetStatusCode(config.StatusCode) + ctx.SetBody(config.ResponseBody) + + case ModeErrorResponse: + if float64(requestCount%100)/100 < config.ErrorRate { + ctx.SetStatusCode(fasthttp.StatusInternalServerError) + ctx.SetBody([]byte(`{"error": "internal error"}`)) + return + } + ctx.SetStatusCode(config.StatusCode) + ctx.SetBody(config.ResponseBody) + + case ModeRandomResponse: + if requestCount%2 == 0 { + ctx.SetStatusCode(fasthttp.StatusOK) + } else { + ctx.SetStatusCode(fasthttp.StatusCreated) + } + ctx.SetBody(config.ResponseBody) + + default: + ctx.SetStatusCode(config.StatusCode) + ctx.SetBody(config.ResponseBody) + } + } + + go fasthttp.Serve(ln, handler) + + return ln.Addr().String(), func() { + ln.Close() + } +} diff --git a/internal/cache/disk_cache.go b/internal/cache/disk_cache.go deleted file mode 100644 index 54bdc9f..0000000 --- a/internal/cache/disk_cache.go +++ /dev/null @@ -1,519 +0,0 @@ -// Package cache 提供文件缓存和代理缓存功能。 -// -// 该文件实现 DiskCache 磁盘缓存后端,支持: -// - 目录层级配置(levels=1:2) -// - 原子写入策略(.tmp → .data) -// - 懒加载(后台加载元数据,不阻塞启动) -// - CRC32 校验和验证数据完整性 -// -// 主要用途: -// -// 作为 L2 缓存层,持久化代理响应到磁盘,支持服务重启后恢复。 -// -// 作者:xfy -package cache - -import ( - "encoding/json" - "fmt" - "hash/crc32" - "os" - "path/filepath" - "sync" - "sync/atomic" - "time" - - "rua.plus/lolly/internal/logging" -) - -// DiskCacheConfig 磁盘缓存配置。 -type DiskCacheConfig struct { - // Path 缓存根目录 - Path string - - // Levels 目录层级,如 "1:2" 表示两级目录 - Levels string - - // MaxSize 最大缓存大小(字节) - MaxSize int64 - - // Inactive 未访问淘汰时间 - Inactive time.Duration - - // StaleIfError 错误时使用过期缓存的窗口 - StaleIfError time.Duration - - // StaleIfTimeout 超时时使用过期缓存的窗口 - StaleIfTimeout time.Duration -} - -// DiskCache 磁盘缓存实现。 -type DiskCache struct { - basePath string - levels []int - maxSize int64 - inactive time.Duration - staleIfError time.Duration - staleIfTimeout time.Duration - currentSize atomic.Int64 - entries map[uint64]*DiskCacheMeta - mu sync.RWMutex - stopCh chan struct{} - - // 懒加载相关 - loaded atomic.Bool - loadCh chan struct{} - - // 统计 - hitCount atomic.Int64 - missCount atomic.Int64 - evictions atomic.Int64 -} - -// DiskCacheMeta 磁盘缓存元数据。 -type DiskCacheMeta struct { - HashKey uint64 `json:"hash_key"` - OrigKey string `json:"orig_key"` - Created time.Time `json:"created"` - MaxAge time.Duration `json:"max_age"` - Status int `json:"status"` - Size int64 `json:"size"` - Headers map[string]string `json:"headers,omitempty"` - CRC32 uint32 `json:"crc32"` -} - -// DiskCacheEntry 磁盘缓存条目(包含数据)。 -type DiskCacheEntry struct { - *DiskCacheMeta - Data []byte -} - -// NewDiskCache 创建磁盘缓存实例(懒加载模式)。 -func NewDiskCache(cfg *DiskCacheConfig) (*DiskCache, error) { - if cfg.Path == "" { - return nil, fmt.Errorf("disk cache path is required") - } - - // 确保目录存在 - if err := os.MkdirAll(cfg.Path, 0o755); err != nil { - return nil, fmt.Errorf("create cache directory: %w", err) - } - - dc := &DiskCache{ - basePath: cfg.Path, - levels: parseLevels(cfg.Levels), - maxSize: cfg.MaxSize, - inactive: cfg.Inactive, - staleIfError: cfg.StaleIfError, - staleIfTimeout: cfg.StaleIfTimeout, - entries: make(map[uint64]*DiskCacheMeta), - loadCh: make(chan struct{}), - stopCh: make(chan struct{}), - } - - // 启动后台加载,不阻塞服务启动 - go dc.lazyLoad() - - return dc, nil -} - -// parseLevels 解析目录层级配置。 -// 支持格式:""(无层级)、"1"(一级)、"1:2"(两级) -func parseLevels(levels string) []int { - if levels == "" { - return nil - } - - var result []int - start := 0 - for i := 0; i <= len(levels); i++ { - if i == len(levels) || levels[i] == ':' { - var level int - for j := start; j < i; j++ { - level = level*10 + int(levels[j]-'0') - } - if level > 0 { - result = append(result, level) - } - start = i + 1 - } - } - return result -} - -// lazyLoad 后台加载缓存元数据。 -func (dc *DiskCache) lazyLoad() { - defer close(dc.loadCh) - - // 扫描目录加载元数据(不加载实际数据) - _ = filepath.Walk(dc.basePath, func(path string, info os.FileInfo, err error) error { - if err != nil || info.IsDir() { - return nil - } - - // 只处理 .meta 文件 - if filepath.Ext(path) != ".meta" { - return nil - } - - meta := dc.loadMetaFile(path) - if meta != nil { - dc.mu.Lock() - dc.entries[meta.HashKey] = meta - dc.mu.Unlock() - dc.currentSize.Add(meta.Size) - } - - return nil - }) - - dc.loaded.Store(true) -} - -// loadMetaFile 加载元数据文件。 -func (dc *DiskCache) loadMetaFile(path string) *DiskCacheMeta { - data, err := os.ReadFile(path) - if err != nil { - return nil - } - - var meta DiskCacheMeta - if err := json.Unmarshal(data, &meta); err != nil { - return nil - } - - return &meta -} - -// Get 获取缓存条目(实现 CacheBackend 接口)。 -func (dc *DiskCache) Get(hashKey uint64, origKey string) (*ProxyCacheEntry, bool, bool) { - // 如果未完成加载,等待加载完成或超时 - if !dc.loaded.Load() { - select { - case <-dc.loadCh: - // 加载完成,继续 - case <-time.After(100 * time.Millisecond): - // 超时,返回未命中(避免阻塞请求) - dc.missCount.Add(1) - return nil, false, false - } - } - - dc.mu.RLock() - meta, exists := dc.entries[hashKey] - dc.mu.RUnlock() - - if !exists { - dc.missCount.Add(1) - return nil, false, false - } - - // 双重验证:检查原始 key 是否匹配 - if meta.OrigKey != origKey { - dc.missCount.Add(1) - return nil, false, false - } - - // 读取数据文件 - dataPath := dc.filePathFromHash(hashKey, "data") - data, err := os.ReadFile(dataPath) - if err != nil { - dc.missCount.Add(1) - return nil, false, false - } - - // 验证 CRC32 - if meta.CRC32 != 0 { - if crc32.ChecksumIEEE(data) != meta.CRC32 { - // 数据损坏,删除条目 - _ = dc.Delete(hashKey) - dc.missCount.Add(1) - return nil, false, false - } - } - - // 检查是否过期 - now := time.Now() - expiresAt := meta.Created.Add(meta.MaxAge) - stale := now.After(expiresAt) - - dc.hitCount.Add(1) - - // 转换为 ProxyCacheEntry - entry := &ProxyCacheEntry{ - Key: meta.OrigKey, - OrigKey: meta.OrigKey, - Data: data, - Headers: meta.Headers, - Status: meta.Status, - Created: meta.Created, - MaxAge: meta.MaxAge, - } - - return entry, true, stale -} - -// GetStale 在上游错误时获取可用的过期缓存。 -// -// 与 Get 不同,GetStale 只在错误发生时使用,根据错误类型检查对应的 stale 窗口。 -func (dc *DiskCache) GetStale(hashKey uint64, origKey string, isTimeout bool) (*ProxyCacheEntry, bool) { - // 等待懒加载完成 - if !dc.loaded.Load() { - select { - case <-dc.loadCh: - case <-time.After(100 * time.Millisecond): - return nil, false - } - } - - dc.mu.RLock() - meta, ok := dc.entries[hashKey] - dc.mu.RUnlock() - - if !ok { - return nil, false - } - - // 双重验证:检查原始 key 是否匹配 - if meta.OrigKey != origKey { - return nil, false - } - - // 读取数据文件 - dataPath := dc.filePathFromHash(hashKey, "data") - data, err := os.ReadFile(dataPath) - if err != nil { - return nil, false - } - - // 验证 CRC32 - crc := crc32.ChecksumIEEE(data) - if crc != meta.CRC32 { - return nil, false - } - - now := time.Now() - expiresAt := meta.Created.Add(meta.MaxAge) - - // 未过期,直接返回 - if !now.After(expiresAt) { - entry := &ProxyCacheEntry{ - Key: meta.OrigKey, - OrigKey: meta.OrigKey, - Data: data, - Headers: meta.Headers, - Status: meta.Status, - Created: meta.Created, - MaxAge: meta.MaxAge, - } - return entry, true - } - - // 已过期,检查 stale 窗口 - var staleWindow time.Duration - if isTimeout { - staleWindow = dc.staleIfTimeout - } else { - staleWindow = dc.staleIfError - } - - if staleWindow <= 0 { - return nil, false - } - - // 检查是否在 stale 窗口内 - if now.Sub(expiresAt) > staleWindow { - return nil, false - } - - entry := &ProxyCacheEntry{ - Key: meta.OrigKey, - OrigKey: meta.OrigKey, - Data: data, - Headers: meta.Headers, - Status: meta.Status, - Created: meta.Created, - MaxAge: meta.MaxAge, - } - - return entry, true -} - -// Set 设置缓存条目(实现 CacheBackend 接口)。 -func (dc *DiskCache) Set(hashKey uint64, origKey string, data []byte, headers map[string]string, status int, maxAge time.Duration) { - // 计算文件路径 - dataPath := dc.filePathFromHash(hashKey, "data") - metaPath := dc.filePathFromHash(hashKey, "meta") - - // 确保目录存在 - dir := filepath.Dir(dataPath) - if err := os.MkdirAll(dir, 0o755); err != nil { - logging.Error().Err(err).Str("dir", dir).Msg("disk cache mkdir failed") - return - } - - // 计算 CRC32 - crc := crc32.ChecksumIEEE(data) - - // 创建元数据 - meta := &DiskCacheMeta{ - HashKey: hashKey, - OrigKey: origKey, - Created: time.Now(), - MaxAge: maxAge, - Status: status, - Size: int64(len(data)), - Headers: headers, - CRC32: crc, - } - - // 原子写入数据文件:先写临时文件,再重命名 - tmpDataPath := dataPath + ".tmp" - if err := os.WriteFile(tmpDataPath, data, 0o644); err != nil { - logging.Error().Err(err).Str("path", tmpDataPath).Msg("disk cache write failed") - return - } - if err := os.Rename(tmpDataPath, dataPath); err != nil { - _ = os.Remove(tmpDataPath) - logging.Error().Err(err).Str("from", tmpDataPath).Str("to", dataPath).Msg("disk cache rename failed") - return - } - - // 写入元数据文件 - metaData, err := json.Marshal(meta) - if err != nil { - logging.Error().Err(err).Msg("disk cache json marshal failed") - return - } - tmpMetaPath := metaPath + ".tmp" - if err := os.WriteFile(tmpMetaPath, metaData, 0o644); err != nil { - logging.Error().Err(err).Str("path", tmpMetaPath).Msg("disk cache write meta failed") - return - } - if err := os.Rename(tmpMetaPath, metaPath); err != nil { - _ = os.Remove(tmpMetaPath) - logging.Error().Err(err).Str("from", tmpMetaPath).Str("to", metaPath).Msg("disk cache rename meta failed") - return - } - - // 更新内存索引 - dc.mu.Lock() - oldMeta, existed := dc.entries[hashKey] - dc.entries[hashKey] = meta - dc.mu.Unlock() - - // 更新大小统计 - if existed && oldMeta != nil { - dc.currentSize.Add(-oldMeta.Size) - } - dc.currentSize.Add(meta.Size) - - // 检查是否需要淘汰 - if dc.maxSize > 0 && dc.currentSize.Load() > dc.maxSize { - go dc.evict() - } -} - -// Delete 删除缓存条目(实现 CacheBackend 接口)。 -func (dc *DiskCache) Delete(hashKey uint64) error { - dc.mu.Lock() - meta, exists := dc.entries[hashKey] - if exists { - delete(dc.entries, hashKey) - dc.currentSize.Add(-meta.Size) - dc.evictions.Add(1) - } - dc.mu.Unlock() - - if !exists { - return nil - } - - // 删除文件 - dataPath := dc.filePathFromHash(hashKey, "data") - metaPath := dc.filePathFromHash(hashKey, "meta") - _ = os.Remove(dataPath) - _ = os.Remove(metaPath) - - return nil -} - -// CacheStats 返回缓存统计信息(实现 CacheBackend 接口)。 -func (dc *DiskCache) CacheStats() CacheStats { - dc.mu.RLock() - entries := int64(len(dc.entries)) - dc.mu.RUnlock() - - return CacheStats{ - Entries: entries, - Size: dc.currentSize.Load(), - HitCount: dc.hitCount.Load(), - MissCount: dc.missCount.Load(), - Evictions: dc.evictions.Load(), - } -} - -// Stop 停止磁盘缓存。 -func (dc *DiskCache) Stop() { - close(dc.stopCh) -} - -// filePathFromHash 根据哈希值计算文件路径。 -func (dc *DiskCache) filePathFromHash(hashKey uint64, ext string) string { - // 将哈希值转换为十六进制字符串 - hashStr := fmt.Sprintf("%016x", hashKey) - - if len(dc.levels) == 0 { - return filepath.Join(dc.basePath, hashStr+"."+ext) - } - - // 根据层级构建路径 - parts := []string{dc.basePath} - offset := len(hashStr) - for _, level := range dc.levels { - if offset < level { - break - } - offset -= level - parts = append(parts, hashStr[offset:offset+level]) - } - parts = append(parts, hashStr+"."+ext) - - return filepath.Join(parts...) -} - -// evict 淘汰旧条目。 -func (dc *DiskCache) evict() { - // 简单策略:删除最旧的条目直到大小低于阈值 - targetSize := dc.maxSize * 9 / 10 // 淘汰到 90% - - for dc.currentSize.Load() > targetSize { - dc.mu.Lock() - // 找到最旧的条目 - var oldestKey uint64 - var oldestTime time.Time - for key, meta := range dc.entries { - if oldestKey == 0 || meta.Created.Before(oldestTime) { - oldestKey = key - oldestTime = meta.Created - } - } - - if oldestKey == 0 { - dc.mu.Unlock() - break - } - - meta := dc.entries[oldestKey] - delete(dc.entries, oldestKey) - dc.currentSize.Add(-meta.Size) - dc.evictions.Add(1) - dc.mu.Unlock() - - // 删除文件 - dataPath := dc.filePathFromHash(oldestKey, "data") - metaPath := dc.filePathFromHash(oldestKey, "meta") - _ = os.Remove(dataPath) - _ = os.Remove(metaPath) - } -} diff --git a/internal/cache/disk_cache_test.go b/internal/cache/disk_cache_test.go deleted file mode 100644 index 2d01299..0000000 --- a/internal/cache/disk_cache_test.go +++ /dev/null @@ -1,497 +0,0 @@ -package cache - -import ( - "os" - "path/filepath" - "testing" - "time" -) - -func TestNewDiskCache(t *testing.T) { - // 创建临时目录 - tmpDir := t.TempDir() - - cfg := &DiskCacheConfig{ - Path: tmpDir, - Levels: "1:2", - } - - dc, err := NewDiskCache(cfg) - if err != nil { - t.Fatalf("NewDiskCache failed: %v", err) - } - defer dc.Stop() - - // 等待懒加载完成 - <-dc.loadCh - - if dc.basePath != tmpDir { - t.Errorf("basePath = %q, want %q", dc.basePath, tmpDir) - } -} - -func TestDiskCacheSetGet(t *testing.T) { - tmpDir := t.TempDir() - - cfg := &DiskCacheConfig{ - Path: tmpDir, - Levels: "1:2", - } - - dc, err := NewDiskCache(cfg) - if err != nil { - t.Fatalf("NewDiskCache failed: %v", err) - } - defer dc.Stop() - - // 等待懒加载完成 - <-dc.loadCh - - // 设置缓存 - hashKey := uint64(12345) - origKey := "GET:/api/test" - data := []byte("test response data") - headers := map[string]string{"Content-Type": "application/json"} - status := 200 - maxAge := 10 * time.Minute - - dc.Set(hashKey, origKey, data, headers, status, maxAge) - - // 获取缓存 - entry, exists, stale := dc.Get(hashKey, origKey) - if !exists { - t.Fatal("cache entry not found") - } - if stale { - t.Error("entry should not be stale") - } - if string(entry.Data) != string(data) { - t.Errorf("Data = %q, want %q", entry.Data, data) - } - if entry.Status != status { - t.Errorf("Status = %d, want %d", entry.Status, status) - } - if entry.Headers["Content-Type"] != "application/json" { - t.Errorf("Headers[Content-Type] = %q, want %q", entry.Headers["Content-Type"], "application/json") - } -} - -func TestDiskCacheDelete(t *testing.T) { - tmpDir := t.TempDir() - - cfg := &DiskCacheConfig{ - Path: tmpDir, - Levels: "1:2", - } - - dc, err := NewDiskCache(cfg) - if err != nil { - t.Fatalf("NewDiskCache failed: %v", err) - } - defer dc.Stop() - - <-dc.loadCh - - // 设置缓存 - hashKey := uint64(12345) - origKey := "GET:/api/test" - dc.Set(hashKey, origKey, []byte("test"), nil, 200, 10*time.Minute) - - // 验证存在 - _, exists, _ := dc.Get(hashKey, origKey) - if !exists { - t.Fatal("entry should exist before delete") - } - - // 删除 - if err := dc.Delete(hashKey); err != nil { - t.Fatalf("Delete failed: %v", err) - } - - // 验证已删除 - _, exists, _ = dc.Get(hashKey, origKey) - if exists { - t.Error("entry should not exist after delete") - } -} - -func TestDiskCacheStale(t *testing.T) { - tmpDir := t.TempDir() - - cfg := &DiskCacheConfig{ - Path: tmpDir, - Levels: "1:2", - } - - dc, err := NewDiskCache(cfg) - if err != nil { - t.Fatalf("NewDiskCache failed: %v", err) - } - defer dc.Stop() - - <-dc.loadCh - - // 设置一个已过期的缓存 - hashKey := uint64(12345) - origKey := "GET:/api/test" - dc.Set(hashKey, origKey, []byte("test"), nil, 200, 1*time.Millisecond) - - // 等待过期 - time.Sleep(10 * time.Millisecond) - - // 获取缓存 - entry, exists, stale := dc.Get(hashKey, origKey) - if !exists { - t.Fatal("expired entry should still exist") - } - if !stale { - t.Error("expired entry should be marked as stale") - } - if string(entry.Data) != "test" { - t.Errorf("Data = %q, want %q", entry.Data, "test") - } -} - -func TestDiskCacheLevels(t *testing.T) { - tmpDir := t.TempDir() - - cfg := &DiskCacheConfig{ - Path: tmpDir, - Levels: "1:2", - } - - dc, err := NewDiskCache(cfg) - if err != nil { - t.Fatalf("NewDiskCache failed: %v", err) - } - defer dc.Stop() - - <-dc.loadCh - - // 设置缓存 - hashKey := uint64(0xabcdef1234567890) - origKey := "GET:/api/test" - dc.Set(hashKey, origKey, []byte("test"), nil, 200, 10*time.Minute) - - // 验证文件路径包含层级目录 - dataPath := dc.filePathFromHash(hashKey, "data") - if filepath.Dir(dataPath) == tmpDir { - t.Error("file should be in a subdirectory for levels=1:2") - } - - // 验证文件存在 - if _, err := os.Stat(dataPath); os.IsNotExist(err) { - t.Errorf("data file not found at %s", dataPath) - } -} - -func TestDiskCacheMaxSize(t *testing.T) { - tmpDir := t.TempDir() - - cfg := &DiskCacheConfig{ - Path: tmpDir, - Levels: "1:2", - MaxSize: 100, // 很小的限制 - } - - dc, err := NewDiskCache(cfg) - if err != nil { - t.Fatalf("NewDiskCache failed: %v", err) - } - defer dc.Stop() - - <-dc.loadCh - - // 设置多个缓存条目 - for i := range 10 { - hashKey := uint64(i) - origKey := "GET:/api/test" - dc.Set(hashKey, origKey, []byte("test data that is longer than 10 bytes"), nil, 200, 10*time.Minute) - } - - // 等待淘汰完成(淘汰是异步的) - time.Sleep(500 * time.Millisecond) - - // 验证淘汰发生(Evictions > 0) - stats := dc.CacheStats() - if stats.Evictions == 0 { - t.Error("Evictions should be > 0 when MaxSize is exceeded") - } -} - -func TestDiskCacheStats(t *testing.T) { - tmpDir := t.TempDir() - - cfg := &DiskCacheConfig{ - Path: tmpDir, - Levels: "1:2", - } - - dc, err := NewDiskCache(cfg) - if err != nil { - t.Fatalf("NewDiskCache failed: %v", err) - } - defer dc.Stop() - - <-dc.loadCh - - // 初始统计 - stats := dc.CacheStats() - if stats.Entries != 0 { - t.Errorf("initial Entries = %d, want 0", stats.Entries) - } - - // 设置缓存 - dc.Set(1, "key1", []byte("data1"), nil, 200, 10*time.Minute) - dc.Set(2, "key2", []byte("data2"), nil, 200, 10*time.Minute) - - stats = dc.CacheStats() - if stats.Entries != 2 { - t.Errorf("Entries = %d, want 2", stats.Entries) - } - - // 获取缓存(命中) - dc.Get(1, "key1") - stats = dc.CacheStats() - if stats.HitCount != 1 { - t.Errorf("HitCount = %d, want 1", stats.HitCount) - } - - // 获取不存在的缓存(未命中) - dc.Get(999, "nonexistent") - stats = dc.CacheStats() - if stats.MissCount != 1 { - t.Errorf("MissCount = %d, want 1", stats.MissCount) - } -} - -func TestDiskCacheCRC32(t *testing.T) { - tmpDir := t.TempDir() - - cfg := &DiskCacheConfig{ - Path: tmpDir, - Levels: "1:2", - } - - dc, err := NewDiskCache(cfg) - if err != nil { - t.Fatalf("NewDiskCache failed: %v", err) - } - defer dc.Stop() - - <-dc.loadCh - - // 设置缓存 - hashKey := uint64(12345) - origKey := "GET:/api/test" - data := []byte("test data for crc32") - dc.Set(hashKey, origKey, data, nil, 200, 10*time.Minute) - - // 获取缓存验证 CRC32 - entry, exists, _ := dc.Get(hashKey, origKey) - if !exists { - t.Fatal("entry not found") - } - if string(entry.Data) != string(data) { - t.Errorf("Data mismatch") - } -} - -func TestParseLevels(t *testing.T) { - tests := []struct { - input string - expected []int - }{ - {"", nil}, - {"1", []int{1}}, - {"1:2", []int{1, 2}}, - {"2:2:2", []int{2, 2, 2}}, - } - - for _, tt := range tests { - result := parseLevels(tt.input) - if len(result) != len(tt.expected) { - t.Errorf("parseLevels(%q) = %v, want %v", tt.input, result, tt.expected) - continue - } - for i, v := range result { - if v != tt.expected[i] { - t.Errorf("parseLevels(%q)[%d] = %d, want %d", tt.input, i, v, tt.expected[i]) - } - } - } -} - -func TestDiskCacheLazyLoad(t *testing.T) { - tmpDir := t.TempDir() - - cfg := &DiskCacheConfig{ - Path: tmpDir, - Levels: "1:2", - } - - dc, err := NewDiskCache(cfg) - if err != nil { - t.Fatalf("NewDiskCache failed: %v", err) - } - defer dc.Stop() - - // 在懒加载完成前,loaded 应该是 false - // 但由于加载很快,我们无法可靠测试这个状态 - // 所以我们等待加载完成 - <-dc.loadCh - - if !dc.loaded.Load() { - t.Error("loaded should be true after lazyLoad completes") - } -} - -func TestDiskCacheRestart(t *testing.T) { - tmpDir := t.TempDir() - - // 第一个实例:写入数据 - cfg := &DiskCacheConfig{ - Path: tmpDir, - Levels: "1:2", - } - - dc1, err := NewDiskCache(cfg) - if err != nil { - t.Fatalf("NewDiskCache failed: %v", err) - } - <-dc1.loadCh - - hashKey := uint64(12345) - origKey := "GET:/api/test" - data := []byte("persistent data") - dc1.Set(hashKey, origKey, data, nil, 200, 10*time.Minute) - dc1.Stop() - - // 第二个实例:读取数据(模拟重启) - dc2, err := NewDiskCache(cfg) - if err != nil { - t.Fatalf("NewDiskCache (restart) failed: %v", err) - } - <-dc2.loadCh - defer dc2.Stop() - - // 验证数据恢复 - entry, exists, _ := dc2.Get(hashKey, origKey) - if !exists { - t.Fatal("entry should exist after restart") - } - if string(entry.Data) != string(data) { - t.Errorf("Data = %q, want %q", entry.Data, data) - } -} - -func TestDiskCacheGetStaleIfError(t *testing.T) { - tmpDir := t.TempDir() - - cfg := &DiskCacheConfig{ - Path: tmpDir, - Levels: "1:2", - StaleIfError: 200 * time.Millisecond, - StaleIfTimeout: 0, - } - - dc, err := NewDiskCache(cfg) - if err != nil { - t.Fatalf("NewDiskCache failed: %v", err) - } - defer dc.Stop() - <-dc.loadCh - - hashKey := uint64(12345) - origKey := "GET:/api/test" - dc.Set(hashKey, origKey, []byte("data"), nil, 200, 100*time.Millisecond) - - // 等待过期但仍在 stale_if_error 窗口内 - time.Sleep(150 * time.Millisecond) - - // isTimeout=false,应该使用 staleIfError 窗口 - entry, ok := dc.GetStale(hashKey, origKey, false) - if !ok { - t.Error("stale entry should be usable on error") - } - if entry == nil || string(entry.Data) != "data" { - t.Errorf("entry.Data = %v, want %q", entry, "data") - } - - // isTimeout=true,staleIfTimeout=0,不应该可用 - if _, ok2 := dc.GetStale(hashKey, origKey, true); ok2 { - t.Error("stale entry should NOT be usable on timeout when staleIfTimeout=0") - } -} - -func TestDiskCacheGetStaleIfTimeout(t *testing.T) { - tmpDir := t.TempDir() - - cfg := &DiskCacheConfig{ - Path: tmpDir, - Levels: "1:2", - StaleIfError: 0, - StaleIfTimeout: 300 * time.Millisecond, - } - - dc, err := NewDiskCache(cfg) - if err != nil { - t.Fatalf("NewDiskCache failed: %v", err) - } - defer dc.Stop() - <-dc.loadCh - - hashKey := uint64(12345) - origKey := "GET:/api/test" - dc.Set(hashKey, origKey, []byte("data"), nil, 200, 100*time.Millisecond) - - // 等待过期但仍在 stale_if_timeout 窗口内 - time.Sleep(250 * time.Millisecond) - - // isTimeout=true,应该使用 staleIfTimeout 窗口 - entry, ok := dc.GetStale(hashKey, origKey, true) - if !ok { - t.Error("stale entry should be usable on timeout") - } - if entry == nil { - t.Error("expected stale entry data") - } - - // isTimeout=false,staleIfError=0,不应该可用 - if _, ok2 := dc.GetStale(hashKey, origKey, false); ok2 { - t.Error("stale entry should NOT be usable on error when staleIfError=0") - } -} - -func TestDiskCacheGetStaleExpired(t *testing.T) { - tmpDir := t.TempDir() - - cfg := &DiskCacheConfig{ - Path: tmpDir, - Levels: "1:2", - StaleIfError: 100 * time.Millisecond, - StaleIfTimeout: 100 * time.Millisecond, - } - - dc, err := NewDiskCache(cfg) - if err != nil { - t.Fatalf("NewDiskCache failed: %v", err) - } - defer dc.Stop() - <-dc.loadCh - - hashKey := uint64(12345) - origKey := "GET:/api/test" - dc.Set(hashKey, origKey, []byte("data"), nil, 200, 50*time.Millisecond) - - // 等待超过 stale 窗口 - time.Sleep(200 * time.Millisecond) - - if _, ok := dc.GetStale(hashKey, origKey, false); ok { - t.Error("stale entry should NOT be usable after stale window expired") - } - - if _, ok2 := dc.GetStale(hashKey, origKey, true); ok2 { - t.Error("stale entry should NOT be usable on timeout after stale window expired") - } -} diff --git a/internal/cache/file_cache.go b/internal/cache/file_cache.go index 2e954eb..da1298f 100644 --- a/internal/cache/file_cache.go +++ b/internal/cache/file_cache.go @@ -19,6 +19,7 @@ package cache import ( "container/list" + "hash/fnv" "slices" "strings" "sync" @@ -787,3 +788,71 @@ func (c *ProxyCache) CacheStats() CacheStats { Entries: int64(len(c.entries)), } } + +// HashPathWithMethod 使用 FNV-64a 计算路径和方法的哈希值。 +func HashPathWithMethod(path string, method string) uint64 { + if method == "" { + method = "GET" + } + key := method + ":" + path + h := fnv.New64a() + h.Write([]byte(key)) + return h.Sum64() +} + +// MatchPattern 检查路径是否匹配通配符模式。 +// +// 支持以下匹配模式: +// - "*":匹配所有路径 +// - 以 "*" 结尾:前缀匹配(如 "/api/*" 匹配 "/api/xxx") +// - 以 "/" 结尾:目录前缀匹配 +// - 中间通配符:"/api/*/users" 匹配 "/api/v1/users" +// - 其他:精确匹配 +func MatchPattern(pattern, path string) bool { + // 特殊情况:* 匹配所有 + if pattern == "*" { + return true + } + + // 目录前缀匹配(pattern 以 / 结尾) + if strings.HasSuffix(pattern, "/") { + return strings.HasPrefix(path, pattern) + } + + // 检查是否有通配符 + if !strings.Contains(pattern, "*") { + return path == pattern + } + + // 简单的前缀匹配:/api/users/* 匹配 /api/users/123 + if prefix, ok := strings.CutSuffix(pattern, "*"); ok { + return strings.HasPrefix(path, prefix) + } + + // 中间通配符:/api/*/users 匹配 /api/v1/users + parts := strings.Split(pattern, "*") + if len(parts) == 2 { + return strings.HasPrefix(path, parts[0]) && strings.HasSuffix(path, parts[1]) + } + + // 复杂模式不支持,返回 false + return false +} + +// PurgeRequest 缓存清理请求结构。 +type PurgeRequest struct { + // Path 精确路径 + Path string `json:"path,omitempty"` + + // Pattern 通配符模式(支持 * 通配符) + Pattern string `json:"pattern,omitempty"` + + // Method HTTP 方法,默认 "GET" + Method string `json:"method,omitempty"` +} + +// PurgeResponse 缓存清理响应结构。 +type PurgeResponse struct { + // Deleted 被删除的缓存条目数 + Deleted int `json:"deleted"` +} diff --git a/internal/cache/purge.go b/internal/cache/purge.go deleted file mode 100644 index e4213fc..0000000 --- a/internal/cache/purge.go +++ /dev/null @@ -1,268 +0,0 @@ -// Package cache 提供文件缓存和代理缓存功能,支持 LRU 淘汰和缓存锁防击穿。 -// -// 该文件实现了缓存清理 API,用于主动清理代理缓存。 -// -// 主要功能: -// - 精确路径清理:删除指定路径的缓存条目 -// - 通配符模式清理:按模式批量删除缓存条目 -// - IP 白名单访问控制 -// - Token 认证支持 -// -// 作者:xfy -package cache - -import ( - "encoding/json" - "hash/fnv" - "net" - "strings" - - "github.com/valyala/fasthttp" - "rua.plus/lolly/internal/config" - "rua.plus/lolly/internal/utils" -) - -// PurgeAPI 缓存清理 API 处理器。 -// -// 提供 HTTP API 用于主动清理代理缓存,支持精确路径和通配符模式清理。 -// -// 注意事项: -// - 所有方法均为并发安全 -// - 支持 IP 白名单和 Token 认证 -// - 仅处理 POST 请求 -type PurgeAPI struct { - cache *ProxyCache - auth config.CacheAPIAuthConfig - path string - allowed []net.IPNet -} - -// PurgeRequest 清理请求结构。 -type PurgeRequest struct { - // Path 精确路径 - Path string `json:"path,omitempty"` - - // Pattern 通配符模式(支持 * 通配符) - Pattern string `json:"pattern,omitempty"` - - // Method HTTP 方法,默认 "GET" - Method string `json:"method,omitempty"` -} - -// PurgeResponse 清理响应结构。 -type PurgeResponse struct { - // Deleted 被删除的缓存条目数 - Deleted int `json:"deleted"` -} - -// PurgeErrorResponse 错误响应结构。 -type PurgeErrorResponse struct { - // Error 错误信息 - Error string `json:"error"` -} - -// NewPurgeAPI 创建缓存清理 API 处理器。 -// -// 参数: -// - cache: 代理缓存实例 -// - cfg: 缓存 API 配置 -// -// 返回值: -// - *PurgeAPI: 配置好的处理器 -// - error: IP 解析失败时返回非 nil 错误 -func NewPurgeAPI(cache *ProxyCache, cfg *config.CacheAPIConfig) (*PurgeAPI, error) { - p := &PurgeAPI{ - cache: cache, - auth: cfg.Auth, - path: cfg.Path, - } - - // 解析允许的 IP 列表 - for _, cidr := range cfg.Allow { - _, network, err := net.ParseCIDR(cidr) - if err != nil { - // 尝试作为单个 IP 解析 - ip := net.ParseIP(cidr) - if ip == nil { - return nil, err - } - // 转换为 CIDR 格式 - if ip.To4() != nil { - _, network, _ = net.ParseCIDR(cidr + "/32") - } else { - _, network, _ = net.ParseCIDR(cidr + "/128") - } - } - if network != nil { - p.allowed = append(p.allowed, *network) - } - } - - return p, nil -} - -// Path 返回 API 端点路径。 -func (p *PurgeAPI) Path() string { - if p.path == "" { - return "/_cache/purge" - } - return p.path -} - -// ServeHTTP 处理缓存清理请求。 -// -// 仅处理 POST 请求,支持精确路径和通配符模式清理。 -// 返回 JSON 格式的响应。 -func (p *PurgeAPI) ServeHTTP(ctx *fasthttp.RequestCtx) { - // 仅允许 POST 方法 - if string(ctx.Method()) != "POST" { - utils.SendJSONError(ctx, fasthttp.StatusMethodNotAllowed, "method not allowed") - return - } - - // 检查 IP 访问权限 - if !utils.CheckIPAccess(ctx, p.allowed) { - utils.SendJSONError(ctx, fasthttp.StatusForbidden, "forbidden") - return - } - - // 检查认证 - if !utils.CheckTokenAuth(ctx, p.auth) { - utils.SendJSONError(ctx, fasthttp.StatusUnauthorized, "unauthorized") - return - } - - // 解析请求体 - var req PurgeRequest - if err := json.Unmarshal(ctx.PostBody(), &req); err != nil { - utils.SendJSONError(ctx, fasthttp.StatusBadRequest, "invalid request body") - return - } - - // 执行清理 - deleted := 0 - if req.Path != "" { - deleted = p.purgeByPath(req.Path) - } else if req.Pattern != "" { - deleted = p.purgeByPattern(req.Pattern) - } else { - utils.SendJSONError(ctx, fasthttp.StatusBadRequest, "missing path or pattern") - return - } - - // 返回响应 - ctx.SetContentType("application/json; charset=utf-8") - ctx.SetStatusCode(fasthttp.StatusOK) - _ = json.NewEncoder(ctx).Encode(PurgeResponse{Deleted: deleted}) -} - -// purgeByPath 按精确路径清理缓存。 -func (p *PurgeAPI) purgeByPath(path string) int { - if p.cache == nil { - return 0 - } - - // 计算缓存键的哈希值 - hashKey := hashPath(path) - - // 尝试删除 - p.cache.mu.Lock() - defer p.cache.mu.Unlock() - - if _, ok := p.cache.entries[hashKey]; ok { - delete(p.cache.entries, hashKey) - return 1 - } - - return 0 -} - -// purgeByPattern 按通配符模式清理缓存。 -func (p *PurgeAPI) purgeByPattern(pattern string) int { - if p.cache == nil { - return 0 - } - - p.cache.mu.Lock() - defer p.cache.mu.Unlock() - - deleted := 0 - for hashKey, entry := range p.cache.entries { - if matchPattern(pattern, entry.OrigKey) { - delete(p.cache.entries, hashKey) - deleted++ - } - } - - return deleted -} - -// HashPathWithMethod 使用 FNV-64a 计算缓存键的哈希值。 -// method 为空时默认使用 "GET"。 -func HashPathWithMethod(path string, method string) uint64 { - if method == "" { - method = "GET" - } - key := method + ":" + path - h := fnv.New64a() - h.Write([]byte(key)) - return h.Sum64() -} - -// hashPath 使用 FNV-64a 计算路径的哈希值。 -// 与代理层 buildCacheKeyHash 使用相同的算法,确保一致性。 -// 注意:代理层的 key 格式为 "METHOD:URI",purge 时默认使用 GET 方法。 -func hashPath(path string) uint64 { - return HashPathWithMethod(path, "GET") -} - -// MatchPattern 检查路径是否匹配通配符模式。 -// -// 支持以下匹配模式: -// - "*":匹配所有路径 -// - 以 "*" 结尾:前缀匹配(如 "/api/*" 匹配 "/api/xxx") -// - 以 "/" 结尾:目录前缀匹配 -// - 中间通配符:"/api/*/users" 匹配 "/api/v1/users" -// - 其他:精确匹配 -// -// 参数: -// - pattern: 匹配模式,支持通配符 -// - path: 待检查的路径 -// -// 返回值: -// - bool: true 表示匹配,false 表示不匹配 -func MatchPattern(pattern, path string) bool { - // 特殊情况:* 匹配所有 - if pattern == "*" { - return true - } - - // 目录前缀匹配(pattern 以 / 结尾) - if strings.HasSuffix(pattern, "/") { - return strings.HasPrefix(path, pattern) - } - - // 检查是否有通配符 - if !strings.Contains(pattern, "*") { - return path == pattern - } - - // 简单的前缀匹配:/api/users/* 匹配 /api/users/123 - if prefix, ok := strings.CutSuffix(pattern, "*"); ok { - return strings.HasPrefix(path, prefix) - } - - // 中间通配符:/api/*/users 匹配 /api/v1/users - parts := strings.Split(pattern, "*") - if len(parts) == 2 { - return strings.HasPrefix(path, parts[0]) && strings.HasSuffix(path, parts[1]) - } - - // 复杂模式不支持,返回 false - return false -} - -// matchPattern 是 MatchPattern 的内部别名,保持向后兼容。 -func matchPattern(pattern, path string) bool { - return MatchPattern(pattern, path) -} diff --git a/internal/cache/purge_test.go b/internal/cache/purge_test.go deleted file mode 100644 index ba149a9..0000000 --- a/internal/cache/purge_test.go +++ /dev/null @@ -1,763 +0,0 @@ -// Package cache 提供缓存清理 API 的测试。 -// -// 该文件测试 purge.go 中的各项功能,包括: -// - PurgeAPI 创建和配置 -// - Path() 默认和自定义路径 -// - ServeHTTP 完整请求处理 -// - IP 白名单访问控制 -// - Token 认证 -// - 按路径和模式清理缓存 -// - HashPathWithMethod 哈希计算 -// - MatchPattern 通配符匹配 -// -// 作者:xfy -package cache - -import ( - "encoding/json" - "net" - "testing" - "time" - - "github.com/valyala/fasthttp" - "rua.plus/lolly/internal/config" -) - -func TestHashPathWithMethod(t *testing.T) { - t.Run("GET method default", func(t *testing.T) { - h1 := HashPathWithMethod("/api/users", "") - h2 := HashPathWithMethod("/api/users", "GET") - if h1 != h2 { - t.Errorf("Expected empty method to default to GET, got %d vs %d", h1, h2) - } - }) - - t.Run("different methods different hashes", func(t *testing.T) { - hGet := HashPathWithMethod("/api/users", "GET") - hPost := HashPathWithMethod("/api/users", "POST") - if hGet == hPost { - t.Error("Expected different hashes for different methods") - } - }) - - t.Run("different paths different hashes", func(t *testing.T) { - h1 := HashPathWithMethod("/api/users", "GET") - h2 := HashPathWithMethod("/api/posts", "GET") - if h1 == h2 { - t.Error("Expected different hashes for different paths") - } - }) -} - -func TestMatchPattern(t *testing.T) { - tests := []struct { - name string - pattern string - path string - want bool - }{ - // Wildcard - {"wildcard matches all", "*", "/anything/goes", true}, - {"wildcard matches empty", "*", "/", true}, - - // Prefix with trailing * - {"prefix star match", "/api/*", "/api/users", true}, - {"prefix star match nested", "/api/*", "/api/v1/users", true}, - {"prefix star no match", "/api/*", "/other/path", false}, - {"prefix star match base", "/api/*", "/api/", true}, - - // Directory prefix (pattern ends with /) - {"dir prefix match", "/api/", "/api/users", true}, - {"dir prefix no match", "/api/", "/other/path", false}, - - // Exact match - {"exact match", "/api/users", "/api/users", true}, - {"exact no match", "/api/users", "/api/users/extra", false}, - - // Middle wildcard - {"middle wildcard", "/api/*/users", "/api/v1/users", true}, - {"middle wildcard no match prefix", "/api/*/users", "/other/v1/users", false}, - {"middle wildcard no match suffix", "/api/*/users", "/api/v1/posts", false}, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - result := MatchPattern(tt.pattern, tt.path) - if result != tt.want { - t.Errorf("MatchPattern(%s, %s) = %v, want %v", tt.pattern, tt.path, result, tt.want) - } - }) - } -} - -func TestNewPurgeAPI(t *testing.T) { - t.Run("nil cache", func(t *testing.T) { - cfg := &config.CacheAPIConfig{ - Path: "/_cache/purge", - Allow: []string{}, - } - api, err := NewPurgeAPI(nil, cfg) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - if api == nil { - t.Fatal("expected non-nil PurgeAPI") - } - if api.cache != nil { - t.Error("expected nil cache") - } - }) - - t.Run("with cache", func(t *testing.T) { - pc := NewProxyCache(nil, false, 0, 0, 0) - cfg := &config.CacheAPIConfig{ - Path: "/custom/purge", - Allow: []string{"127.0.0.1"}, - Auth: config.CacheAPIAuthConfig{ - Type: "token", - Token: "test-token", - }, - } - api, err := NewPurgeAPI(pc, cfg) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - if api.cache != pc { - t.Error("expected cache to match") - } - if api.path != "/custom/purge" { - t.Errorf("expected path /custom/purge, got %s", api.path) - } - if api.auth.Token != "test-token" { - t.Errorf("expected token test-token, got %s", api.auth.Token) - } - }) - - t.Run("CIDR parsing", func(t *testing.T) { - cfg := &config.CacheAPIConfig{ - Allow: []string{"10.0.0.0/8", "172.16.0.0/12"}, - } - api, err := NewPurgeAPI(nil, cfg) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - if len(api.allowed) != 2 { - t.Errorf("expected 2 allowed networks, got %d", len(api.allowed)) - } - }) - - t.Run("single IP converted to CIDR", func(t *testing.T) { - cfg := &config.CacheAPIConfig{ - Allow: []string{"192.168.1.100"}, - } - api, err := NewPurgeAPI(nil, cfg) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - if len(api.allowed) != 1 { - t.Fatalf("expected 1 allowed network, got %d", len(api.allowed)) - } - if api.allowed[0].String() != "192.168.1.100/32" { - t.Errorf("expected 192.168.1.100/32, got %s", api.allowed[0].String()) - } - }) - - t.Run("single IPv6 converted to CIDR", func(t *testing.T) { - cfg := &config.CacheAPIConfig{ - Allow: []string{"::1"}, - } - api, err := NewPurgeAPI(nil, cfg) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - if len(api.allowed) != 1 { - t.Fatalf("expected 1 allowed network, got %d", len(api.allowed)) - } - if api.allowed[0].String() != "::1/128" { - t.Errorf("expected ::1/128, got %s", api.allowed[0].String()) - } - }) - - t.Run("invalid IP returns error", func(t *testing.T) { - cfg := &config.CacheAPIConfig{ - Allow: []string{"not-an-ip"}, - } - _, err := NewPurgeAPI(nil, cfg) - if err == nil { - t.Error("expected error for invalid IP, got nil") - } - }) - - t.Run("mixed valid and CIDR", func(t *testing.T) { - cfg := &config.CacheAPIConfig{ - Allow: []string{"10.0.0.0/8", "192.168.1.1"}, - } - api, err := NewPurgeAPI(nil, cfg) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - if len(api.allowed) != 2 { - t.Errorf("expected 2 allowed networks, got %d", len(api.allowed)) - } - }) -} - -func TestPurgeAPI_Path(t *testing.T) { - tests := []struct { - name string - cfgPath string - wantPath string - }{ - {"default path", "", "/_cache/purge"}, - {"custom path", "/api/purge", "/api/purge"}, - {"custom path with version", "/api/v1/cache/purge", "/api/v1/cache/purge"}, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - cfg := &config.CacheAPIConfig{ - Path: tt.cfgPath, - Allow: []string{}, - } - api, err := NewPurgeAPI(nil, cfg) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - if api.Path() != tt.wantPath { - t.Errorf("expected path %s, got %s", tt.wantPath, api.Path()) - } - }) - } -} - -func TestPurgeAPI_ServeHTTP_MethodNotAllowed(t *testing.T) { - cfg := &config.CacheAPIConfig{ - Allow: []string{}, - } - api, err := NewPurgeAPI(nil, cfg) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - - tests := []string{"GET", "PUT", "DELETE", "PATCH", "OPTIONS"} - for _, method := range tests { - t.Run(method, func(t *testing.T) { - ctx := &fasthttp.RequestCtx{} - ctx.Request.Header.SetMethod(method) - api.ServeHTTP(ctx) - - if ctx.Response.StatusCode() != fasthttp.StatusMethodNotAllowed { - t.Errorf("expected status 405, got %d", ctx.Response.StatusCode()) - } - }) - } -} - -func TestPurgeAPI_ServeHTTP_AccessForbidden(t *testing.T) { - cfg := &config.CacheAPIConfig{ - Allow: []string{"10.0.0.0/8"}, - } - api, err := NewPurgeAPI(nil, cfg) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - - ctx := &fasthttp.RequestCtx{} - ctx.Request.Header.SetMethod("POST") - ctx.Request.SetBodyString(`{"path": "/test"}`) - // Set RemoteAddr to 192.168.1.1 (not in 10.0.0.0/8) - ctx.SetRemoteAddr(&net.TCPAddr{IP: net.ParseIP("192.168.1.1"), Port: 12345}) - - api.ServeHTTP(ctx) - - if ctx.Response.StatusCode() != fasthttp.StatusForbidden { - t.Errorf("expected status 403, got %d", ctx.Response.StatusCode()) - } -} - -func TestPurgeAPI_ServeHTTP_Unauthorized(t *testing.T) { - pc := NewProxyCache(nil, false, 0, 0, 0) - cfg := &config.CacheAPIConfig{ - Allow: []string{"127.0.0.1"}, - Auth: config.CacheAPIAuthConfig{ - Type: "token", - Token: "secret", - }, - } - api, err := NewPurgeAPI(pc, cfg) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - - ctx := &fasthttp.RequestCtx{} - ctx.Request.Header.SetMethod("POST") - ctx.Request.SetBodyString(`{"path": "/test"}`) - ctx.SetRemoteAddr(&net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: 12345}) - // No Authorization header - - api.ServeHTTP(ctx) - - if ctx.Response.StatusCode() != fasthttp.StatusUnauthorized { - t.Errorf("expected status 401, got %d", ctx.Response.StatusCode()) - } -} - -func TestPurgeAPI_ServeHTTP_BadRequest(t *testing.T) { - pc := NewProxyCache(nil, false, 0, 0, 0) - cfg := &config.CacheAPIConfig{ - Allow: []string{}, - } - api, err := NewPurgeAPI(pc, cfg) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - - t.Run("invalid JSON", func(t *testing.T) { - ctx := &fasthttp.RequestCtx{} - ctx.Request.Header.SetMethod("POST") - ctx.Request.SetBodyString(`{invalid json}`) - - api.ServeHTTP(ctx) - - if ctx.Response.StatusCode() != fasthttp.StatusBadRequest { - t.Errorf("expected status 400, got %d", ctx.Response.StatusCode()) - } - }) - - t.Run("missing path and pattern", func(t *testing.T) { - ctx := &fasthttp.RequestCtx{} - ctx.Request.Header.SetMethod("POST") - ctx.Request.SetBodyString(`{}`) - - api.ServeHTTP(ctx) - - if ctx.Response.StatusCode() != fasthttp.StatusBadRequest { - t.Errorf("expected status 400, got %d", ctx.Response.StatusCode()) - } - }) -} - -func TestPurgeAPI_ServeHTTP_PurgeByPath(t *testing.T) { - pc := NewProxyCache(nil, false, 0, 0, 0) - key := "GET:/api/test" - pc.Set(hashKey(key), key, []byte("data"), nil, 200, 10*60*time.Second) - - cfg := &config.CacheAPIConfig{ - Allow: []string{}, - } - api, err := NewPurgeAPI(pc, cfg) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - - ctx := &fasthttp.RequestCtx{} - ctx.Request.Header.SetMethod("POST") - ctx.Request.SetBodyString(`{"path": "/api/test"}`) - - api.ServeHTTP(ctx) - - if ctx.Response.StatusCode() != fasthttp.StatusOK { - t.Fatalf("expected status 200, got %d", ctx.Response.StatusCode()) - } - - var resp PurgeResponse - if err := json.Unmarshal(ctx.Response.Body(), &resp); err != nil { - t.Fatalf("failed to parse response: %v", err) - } - if resp.Deleted != 1 { - t.Errorf("expected 1 deleted, got %d", resp.Deleted) - } - - // Verify cache is gone - _, ok, _ := pc.Get(hashKey(key), key) - if ok { - t.Error("expected cache entry to be purged") - } -} - -func TestPurgeAPI_ServeHTTP_PurgeByPath_NotFound(t *testing.T) { - pc := NewProxyCache(nil, false, 0, 0, 0) - cfg := &config.CacheAPIConfig{ - Allow: []string{}, - } - api, err := NewPurgeAPI(pc, cfg) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - - ctx := &fasthttp.RequestCtx{} - ctx.Request.Header.SetMethod("POST") - ctx.Request.SetBodyString(`{"path": "/nonexistent"}`) - - api.ServeHTTP(ctx) - - if ctx.Response.StatusCode() != fasthttp.StatusOK { - t.Fatalf("expected status 200, got %d", ctx.Response.StatusCode()) - } - - var resp PurgeResponse - if err := json.Unmarshal(ctx.Response.Body(), &resp); err != nil { - t.Fatalf("failed to parse response: %v", err) - } - if resp.Deleted != 0 { - t.Errorf("expected 0 deleted, got %d", resp.Deleted) - } -} - -func TestPurgeAPI_ServeHTTP_PurgeByPattern(t *testing.T) { - pc := NewProxyCache(nil, false, 0, 0, 0) - pc.Set(hashKey("GET:/api/users"), "GET:/api/users", []byte("users"), nil, 200, 10*60*time.Second) - pc.Set(hashKey("GET:/api/posts"), "GET:/api/posts", []byte("posts"), nil, 200, 10*60*time.Second) - pc.Set(hashKey("GET:/static/css"), "GET:/static/css", []byte("css"), nil, 200, 10*60*time.Second) - - cfg := &config.CacheAPIConfig{ - Allow: []string{}, - } - api, err := NewPurgeAPI(pc, cfg) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - - ctx := &fasthttp.RequestCtx{} - ctx.Request.Header.SetMethod("POST") - ctx.Request.SetBodyString(`{"pattern": "GET:/api/*"}`) - - api.ServeHTTP(ctx) - - if ctx.Response.StatusCode() != fasthttp.StatusOK { - t.Fatalf("expected status 200, got %d", ctx.Response.StatusCode()) - } - - var resp PurgeResponse - if err := json.Unmarshal(ctx.Response.Body(), &resp); err != nil { - t.Fatalf("failed to parse response: %v", err) - } - if resp.Deleted != 2 { - t.Errorf("expected 2 deleted (api/users and api/posts), got %d", resp.Deleted) - } - - // Verify /static/css is still there - _, ok, _ := pc.Get(hashKey("GET:/static/css"), "GET:/static/css") - if !ok { - t.Error("expected /static/css to still exist") - } -} - -func TestPurgeAPI_ServeHTTP_PurgeByPattern_Wildcard(t *testing.T) { - pc := NewProxyCache(nil, false, 0, 0, 0) - pc.Set(hashKey("GET:/a"), "GET:/a", []byte("a"), nil, 200, 10*60*time.Second) - pc.Set(hashKey("GET:/b"), "GET:/b", []byte("b"), nil, 200, 10*60*time.Second) - - cfg := &config.CacheAPIConfig{ - Allow: []string{}, - } - api, err := NewPurgeAPI(pc, cfg) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - - ctx := &fasthttp.RequestCtx{} - ctx.Request.Header.SetMethod("POST") - ctx.Request.SetBodyString(`{"pattern": "*"}`) - - api.ServeHTTP(ctx) - - var resp PurgeResponse - if err := json.Unmarshal(ctx.Response.Body(), &resp); err != nil { - t.Fatalf("failed to parse response: %v", err) - } - if resp.Deleted != 2 { - t.Errorf("expected 2 deleted, got %d", resp.Deleted) - } -} - -func TestPurgeAPI_ServeHTTP_PurgeByPattern_DirPrefix(t *testing.T) { - pc := NewProxyCache(nil, false, 0, 0, 0) - pc.Set(hashKey("GET:/api/v1/users"), "GET:/api/v1/users", []byte("u"), nil, 200, 10*60*time.Second) - pc.Set(hashKey("GET:/api/v2/posts"), "GET:/api/v2/posts", []byte("p"), nil, 200, 10*60*time.Second) - pc.Set(hashKey("GET:/other"), "GET:/other", []byte("o"), nil, 200, 10*60*time.Second) - - cfg := &config.CacheAPIConfig{ - Allow: []string{}, - } - api, err := NewPurgeAPI(pc, cfg) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - - ctx := &fasthttp.RequestCtx{} - ctx.Request.Header.SetMethod("POST") - ctx.Request.SetBodyString(`{"pattern": "GET:/api/"}`) - - api.ServeHTTP(ctx) - - var resp PurgeResponse - if err := json.Unmarshal(ctx.Response.Body(), &resp); err != nil { - t.Fatalf("failed to parse response: %v", err) - } - if resp.Deleted != 2 { - t.Errorf("expected 2 deleted, got %d", resp.Deleted) - } -} - -func TestPurgeAPI_ServeHTTP_ContentType(t *testing.T) { - pc := NewProxyCache(nil, false, 0, 0, 0) - cfg := &config.CacheAPIConfig{ - Allow: []string{}, - } - api, err := NewPurgeAPI(pc, cfg) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - - t.Run("success response content type", func(t *testing.T) { - ctx := &fasthttp.RequestCtx{} - ctx.Request.Header.SetMethod("POST") - ctx.Request.SetBodyString(`{"path": "/test"}`) - api.ServeHTTP(ctx) - - ct := string(ctx.Response.Header.Peek("Content-Type")) - if ct != "application/json; charset=utf-8" { - t.Errorf("expected content-type application/json; charset=utf-8, got %s", ct) - } - }) - - t.Run("error response content type", func(t *testing.T) { - ctx := &fasthttp.RequestCtx{} - ctx.Request.Header.SetMethod("GET") - api.ServeHTTP(ctx) - - ct := string(ctx.Response.Header.Peek("Content-Type")) - if ct != "application/json; charset=utf-8" { - t.Errorf("expected content-type application/json; charset=utf-8, got %s", ct) - } - }) -} - -func TestPurgeAPI_ServeHTTP_AccessAllowed(t *testing.T) { - pc := NewProxyCache(nil, false, 0, 0, 0) - cfg := &config.CacheAPIConfig{ - Allow: []string{"10.0.0.0/8"}, - } - api, err := NewPurgeAPI(pc, cfg) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - - ctx := &fasthttp.RequestCtx{} - ctx.Request.Header.SetMethod("POST") - ctx.Request.SetBodyString(`{"path": "/api/test"}`) - ctx.SetRemoteAddr(&net.TCPAddr{IP: net.ParseIP("10.1.2.3"), Port: 12345}) - - api.ServeHTTP(ctx) - - // Should succeed (access allowed, no auth required) - if ctx.Response.StatusCode() != fasthttp.StatusOK { - t.Errorf("expected status 200, got %d", ctx.Response.StatusCode()) - } -} - -func TestPurgeAPI_ServeHTTP_TokenAuth(t *testing.T) { - pc := NewProxyCache(nil, false, 0, 0, 0) - cfg := &config.CacheAPIConfig{ - Allow: []string{}, - Auth: config.CacheAPIAuthConfig{ - Type: "token", - Token: "my-secret", - }, - } - api, err := NewPurgeAPI(pc, cfg) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - - t.Run("Bearer token", func(t *testing.T) { - ctx := &fasthttp.RequestCtx{} - ctx.Request.Header.SetMethod("POST") - ctx.Request.SetBodyString(`{"path": "/test"}`) - ctx.Request.Header.Set("Authorization", "Bearer my-secret") - - api.ServeHTTP(ctx) - - if ctx.Response.StatusCode() != fasthttp.StatusOK { - t.Errorf("expected status 200, got %d", ctx.Response.StatusCode()) - } - }) - - t.Run("Direct token", func(t *testing.T) { - ctx := &fasthttp.RequestCtx{} - ctx.Request.Header.SetMethod("POST") - ctx.Request.SetBodyString(`{"path": "/test"}`) - ctx.Request.Header.Set("Authorization", "my-secret") - - api.ServeHTTP(ctx) - - if ctx.Response.StatusCode() != fasthttp.StatusOK { - t.Errorf("expected status 200, got %d", ctx.Response.StatusCode()) - } - }) -} - -func TestPurgeAPI_ServeHTTP_AuthTypeNone(t *testing.T) { - pc := NewProxyCache(nil, false, 0, 0, 0) - cfg := &config.CacheAPIConfig{ - Allow: []string{}, - Auth: config.CacheAPIAuthConfig{ - Type: "none", - }, - } - api, err := NewPurgeAPI(pc, cfg) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - - ctx := &fasthttp.RequestCtx{} - ctx.Request.Header.SetMethod("POST") - ctx.Request.SetBodyString(`{"path": "/test"}`) - - api.ServeHTTP(ctx) - - if ctx.Response.StatusCode() != fasthttp.StatusOK { - t.Errorf("expected status 200, got %d", ctx.Response.StatusCode()) - } -} - -func TestPurgeAPI_PurgeByPath_NilCache(t *testing.T) { - cfg := &config.CacheAPIConfig{ - Allow: []string{}, - } - api, err := NewPurgeAPI(nil, cfg) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - - // Directly test purgeByPath with nil cache - result := api.purgeByPath("/test") - if result != 0 { - t.Errorf("expected 0 deleted with nil cache, got %d", result) - } -} - -func TestPurgeAPI_PurgeByPattern_NilCache(t *testing.T) { - cfg := &config.CacheAPIConfig{ - Allow: []string{}, - } - api, err := NewPurgeAPI(nil, cfg) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - - // Directly test purgeByPattern with nil cache - result := api.purgeByPattern("*") - if result != 0 { - t.Errorf("expected 0 deleted with nil cache, got %d", result) - } -} - -func TestPurgeAPI_ErrorResponse(t *testing.T) { - cfg := &config.CacheAPIConfig{ - Allow: []string{}, - } - api, err := NewPurgeAPI(nil, cfg) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - - t.Run("method not allowed", func(t *testing.T) { - ctx := &fasthttp.RequestCtx{} - ctx.Request.Header.SetMethod("DELETE") - api.ServeHTTP(ctx) - - if ctx.Response.StatusCode() != fasthttp.StatusMethodNotAllowed { - t.Fatalf("expected status 405, got %d", ctx.Response.StatusCode()) - } - - var errResp PurgeErrorResponse - if err := json.Unmarshal(ctx.Response.Body(), &errResp); err != nil { - t.Fatalf("failed to parse error response: %v", err) - } - if errResp.Error != "method not allowed" { - t.Errorf("expected error 'method not allowed', got %s", errResp.Error) - } - }) - - t.Run("forbidden", func(t *testing.T) { - cfg2 := &config.CacheAPIConfig{ - Allow: []string{"10.0.0.0/8"}, - } - api2, err := NewPurgeAPI(nil, cfg2) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - - ctx := &fasthttp.RequestCtx{} - ctx.Request.Header.SetMethod("POST") - ctx.SetRemoteAddr(&net.TCPAddr{IP: net.ParseIP("192.168.1.1"), Port: 12345}) - api2.ServeHTTP(ctx) - - if ctx.Response.StatusCode() != fasthttp.StatusForbidden { - t.Fatalf("expected status 403, got %d", ctx.Response.StatusCode()) - } - - var errResp PurgeErrorResponse - if err := json.Unmarshal(ctx.Response.Body(), &errResp); err != nil { - t.Fatalf("failed to parse error response: %v", err) - } - if errResp.Error != "forbidden" { - t.Errorf("expected error 'forbidden', got %s", errResp.Error) - } - }) -} - -func TestPurgeAPI_PurgeByPath_WrongMethod(t *testing.T) { - // Test that hashPath only uses GET, so purging a POST-cached entry won't work - pc := NewProxyCache(nil, false, 0, 0, 0) - // Set a cache entry with GET:/api/test key - pc.Set(hashKey("GET:/api/test"), "GET:/api/test", []byte("data"), nil, 200, 10*60*time.Second) - - cfg := &config.CacheAPIConfig{ - Allow: []string{}, - } - api, err := NewPurgeAPI(pc, cfg) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - - ctx := &fasthttp.RequestCtx{} - ctx.Request.Header.SetMethod("POST") - ctx.Request.SetBodyString(`{"path": "/api/test"}`) - - api.ServeHTTP(ctx) - - var resp PurgeResponse - if err := json.Unmarshal(ctx.Response.Body(), &resp); err != nil { - t.Fatalf("failed to parse response: %v", err) - } - if resp.Deleted != 1 { - t.Errorf("expected 1 deleted, got %d", resp.Deleted) - } -} - -func TestMatchPattern_ComplexPatterns(t *testing.T) { - t.Run("multiple wildcards not supported", func(t *testing.T) { - // Pattern with multiple * in the middle is not supported - result := MatchPattern("/api/*/users/*", "/api/v1/users/123") - if result { - t.Error("expected complex pattern with multiple wildcards to return false") - } - }) - - t.Run("pattern without wildcard exact match", func(t *testing.T) { - result := MatchPattern("/api/users", "/api/users") - if !result { - t.Error("expected exact match") - } - }) - - t.Run("pattern without wildcard no match", func(t *testing.T) { - result := MatchPattern("/api/users", "/api/users/123") - if result { - t.Error("expected no match for non-exact") - } - }) -} diff --git a/internal/cache/tiered_cache.go b/internal/cache/tiered_cache.go deleted file mode 100644 index 7228233..0000000 --- a/internal/cache/tiered_cache.go +++ /dev/null @@ -1,264 +0,0 @@ -// Package cache 提供文件缓存和代理缓存功能。 -// -// 该文件实现 TieredCache 分层缓存,支持: -// - L1 内存缓存(热点数据) -// - L2 磁盘缓存(持久化) -// - 热点提升机制 -// - L2 过期检查 -// -// 主要用途: -// -// 作为代理缓存的主要实现,平衡性能(L1)和容量(L2)。 -// -// 作者:xfy -package cache - -import ( - "sync" - "sync/atomic" - "time" -) - -// TieredCacheConfig 分层缓存配置。 -type TieredCacheConfig struct { - // L1 配置 - L1MaxEntries int64 - L1MaxSize int64 - - // L2 配置 - L2Config *DiskCacheConfig - - // Stale 配置 - StaleIfError time.Duration // 错误时使用过期缓存的窗口 - StaleIfTimeout time.Duration // 超时时使用过期缓存的窗口 - - // 热点提升配置 - PromoteThreshold int // 访问次数阈值,超过后提升到 L1 - PromoteInterval time.Duration // 提升检查间隔 -} - -// TieredCache 分层缓存(L1 内存 + L2 磁盘)。 -type TieredCache struct { - l1 *ProxyCache // L1 内存缓存(热点数据) - l2 *DiskCache // L2 磁盘缓存(持久化) - l1Ratio float64 // L1 容量占 L2 的比例 - promoter *promoter // 热点提升器 - stopCh chan struct{} - - // 统计 - l1Hits atomic.Int64 - l2Hits atomic.Int64 - misses atomic.Int64 - promotes atomic.Int64 -} - -// promoter 热点提升器。 -type promoter struct { - threshold int // 访问次数阈值 - interval time.Duration // 检查间隔 - accessMap map[uint64]*accessInfo - mu sync.RWMutex -} - -// accessInfo 访问信息。 -type accessInfo struct { - count int - lastAccess time.Time - origKey string -} - -// NewTieredCache 创建分层缓存实例。 -func NewTieredCache(cfg *TieredCacheConfig) (*TieredCache, error) { - // 创建 L1 内存缓存 - l1 := NewProxyCache(nil, true, 0, cfg.StaleIfError, cfg.StaleIfTimeout) - - // 创建 L2 磁盘缓存 - l2, err := NewDiskCache(cfg.L2Config) - if err != nil { - return nil, err - } - - tc := &TieredCache{ - l1: l1, - l2: l2, - l1Ratio: 0.1, // 默认 L1 占 10% - promoter: &promoter{ - threshold: cfg.PromoteThreshold, - interval: cfg.PromoteInterval, - accessMap: make(map[uint64]*accessInfo), - }, - stopCh: make(chan struct{}), - } - - // 启动热点提升检查 - if tc.promoter.threshold > 0 { - go tc.promoteLoop() - } - - return tc, nil -} - -// Get 获取缓存条目(实现 CacheBackend 接口)。 -func (tc *TieredCache) Get(hashKey uint64, origKey string) (*ProxyCacheEntry, bool, bool) { - // 1. 先查 L1 - entry, exists, stale := tc.l1.Get(hashKey, origKey) - if exists { - tc.l1Hits.Add(1) - return entry, true, stale - } - - // 2. 查 L2,必须验证 max_age - entry, exists, stale = tc.l2.Get(hashKey, origKey) - if !exists { - tc.misses.Add(1) - return nil, false, false - } - - tc.l2Hits.Add(1) - - // 3. 记录访问(用于热点提升) - tc.recordAccess(hashKey, origKey) - - // 4. L2 命中且未过期,异步提升到 L1 - if !stale { - go tc.promoteToL1(hashKey, entry) - } - - return entry, true, stale -} - -// GetStale 在上游错误时获取可用的过期缓存。 -// -// 先查 L1,再查 L2。 -func (tc *TieredCache) GetStale(hashKey uint64, origKey string, isTimeout bool) (*ProxyCacheEntry, bool) { - // 1. 先查 L1 - if entry, ok := tc.l1.GetStale(hashKey, origKey, isTimeout); ok { - tc.l1Hits.Add(1) - return entry, true - } - - // 2. 查 L2 - if entry, ok := tc.l2.GetStale(hashKey, origKey, isTimeout); ok { - tc.l2Hits.Add(1) - return entry, true - } - - tc.misses.Add(1) - return nil, false -} - -// Set 设置缓存条目(实现 CacheBackend 接口)。 -func (tc *TieredCache) Set(hashKey uint64, origKey string, data []byte, headers map[string]string, status int, maxAge time.Duration) { - // 同时写入 L1 和 L2 - tc.l1.Set(hashKey, origKey, data, headers, status, maxAge) - - // L2 异步写入(在 goroutine 中执行) - go tc.l2.Set(hashKey, origKey, data, headers, status, maxAge) -} - -// Delete 删除缓存条目(实现 CacheBackend 接口)。 -func (tc *TieredCache) Delete(hashKey uint64) error { - _ = tc.l1.Delete(hashKey) - return tc.l2.Delete(hashKey) -} - -// CacheStats 返回缓存统计信息(实现 CacheBackend 接口)。 -func (tc *TieredCache) CacheStats() CacheStats { - l1Stats := tc.l1.CacheStats() - l2Stats := tc.l2.CacheStats() - - return CacheStats{ - Entries: l1Stats.Entries + l2Stats.Entries, - Size: l1Stats.Size + l2Stats.Size, - HitCount: tc.l1Hits.Load() + tc.l2Hits.Load(), - MissCount: tc.misses.Load(), - Evictions: l2Stats.Evictions, - } -} - -// TieredCacheStats 返回分层缓存详细统计。 -func (tc *TieredCache) TieredCacheStats() TieredCacheStats { - return TieredCacheStats{ - L1Hits: tc.l1Hits.Load(), - L2Hits: tc.l2Hits.Load(), - Misses: tc.misses.Load(), - Promotes: tc.promotes.Load(), - L1Entries: tc.l1.CacheStats().Entries, - L2Entries: tc.l2.CacheStats().Entries, - } -} - -// TieredCacheStats 分层缓存详细统计。 -type TieredCacheStats struct { - L1Hits int64 - L2Hits int64 - Misses int64 - Promotes int64 - L1Entries int64 - L2Entries int64 -} - -// Stop 停止分层缓存。 -func (tc *TieredCache) Stop() { - close(tc.stopCh) - if tc.l2 != nil { - tc.l2.Stop() - } -} - -// recordAccess 记录访问(用于热点提升)。 -func (tc *TieredCache) recordAccess(hashKey uint64, origKey string) { - if tc.promoter.threshold <= 0 { - return - } - - tc.promoter.mu.Lock() - defer tc.promoter.mu.Unlock() - - info, exists := tc.promoter.accessMap[hashKey] - if !exists { - info = &accessInfo{origKey: origKey} - tc.promoter.accessMap[hashKey] = info - } - info.count++ - info.lastAccess = time.Now() -} - -// promoteToL1 提升条目到 L1。 -func (tc *TieredCache) promoteToL1(hashKey uint64, entry *ProxyCacheEntry) { - tc.l1.Set(hashKey, entry.OrigKey, entry.Data, entry.Headers, entry.Status, entry.MaxAge) - tc.promotes.Add(1) -} - -// promoteLoop 热点提升检查循环。 -func (tc *TieredCache) promoteLoop() { - ticker := time.NewTicker(tc.promoter.interval) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - tc.checkAndPromote() - case <-tc.stopCh: - return - } - } -} - -// checkAndPromote 检查并提升热点数据。 -func (tc *TieredCache) checkAndPromote() { - tc.promoter.mu.Lock() - defer tc.promoter.mu.Unlock() - - for hashKey, info := range tc.promoter.accessMap { - if info.count >= tc.promoter.threshold { - // 从 L2 获取并提升到 L1 - entry, exists, _ := tc.l2.Get(hashKey, info.origKey) - if exists { - tc.promoteToL1(hashKey, entry) - } - // 重置计数 - info.count = 0 - } - } -} diff --git a/internal/cache/tiered_cache_test.go b/internal/cache/tiered_cache_test.go deleted file mode 100644 index 578f513..0000000 --- a/internal/cache/tiered_cache_test.go +++ /dev/null @@ -1,431 +0,0 @@ -package cache - -import ( - "testing" - "time" -) - -func TestNewTieredCache(t *testing.T) { - tmpDir := t.TempDir() - - cfg := &TieredCacheConfig{ - L2Config: &DiskCacheConfig{ - Path: tmpDir, - Levels: "1:2", - }, - PromoteThreshold: 3, - PromoteInterval: 100 * time.Millisecond, - } - - tc, err := NewTieredCache(cfg) - if err != nil { - t.Fatalf("NewTieredCache failed: %v", err) - } - defer tc.Stop() - - // 等待 L2 懒加载完成 - <-tc.l2.loadCh - - if tc.l1 == nil { - t.Error("l1 should not be nil") - } - if tc.l2 == nil { - t.Error("l2 should not be nil") - } -} - -func TestTieredCacheSetGet(t *testing.T) { - tmpDir := t.TempDir() - - cfg := &TieredCacheConfig{ - L2Config: &DiskCacheConfig{ - Path: tmpDir, - Levels: "1:2", - }, - } - - tc, err := NewTieredCache(cfg) - if err != nil { - t.Fatalf("NewTieredCache failed: %v", err) - } - defer tc.Stop() - - <-tc.l2.loadCh - - // 设置缓存 - hashKey := uint64(12345) - origKey := "GET:/api/test" - data := []byte("test response data") - - tc.Set(hashKey, origKey, data, nil, 200, 10*time.Minute) - - // 等待 L2 异步写入完成 - time.Sleep(50 * time.Millisecond) - - // 获取缓存(应该从 L1 获取) - entry, exists, stale := tc.Get(hashKey, origKey) - if !exists { - t.Fatal("cache entry not found") - } - if stale { - t.Error("entry should not be stale") - } - if string(entry.Data) != string(data) { - t.Errorf("Data = %q, want %q", entry.Data, data) - } - - // 验证 L1 命中 - stats := tc.TieredCacheStats() - if stats.L1Hits != 1 { - t.Errorf("L1Hits = %d, want 1", stats.L1Hits) - } -} - -func TestTieredCacheL2Fallback(t *testing.T) { - tmpDir := t.TempDir() - - cfg := &TieredCacheConfig{ - L2Config: &DiskCacheConfig{ - Path: tmpDir, - Levels: "1:2", - }, - } - - tc, err := NewTieredCache(cfg) - if err != nil { - t.Fatalf("NewTieredCache failed: %v", err) - } - defer tc.Stop() - - <-tc.l2.loadCh - - // 直接写入 L2(绕过 L1) - hashKey := uint64(12345) - origKey := "GET:/api/test" - data := []byte("test from l2") - - tc.l2.Set(hashKey, origKey, data, nil, 200, 10*time.Minute) - time.Sleep(50 * time.Millisecond) - - // 获取缓存(应该从 L2 获取) - entry, exists, stale := tc.Get(hashKey, origKey) - if !exists { - t.Fatal("cache entry not found") - } - if stale { - t.Error("entry should not be stale") - } - if string(entry.Data) != string(data) { - t.Errorf("Data = %q, want %q", entry.Data, data) - } - - // 验证 L2 命中 - stats := tc.TieredCacheStats() - if stats.L2Hits != 1 { - t.Errorf("L2Hits = %d, want 1", stats.L2Hits) - } -} - -func TestTieredCacheDelete(t *testing.T) { - tmpDir := t.TempDir() - - cfg := &TieredCacheConfig{ - L2Config: &DiskCacheConfig{ - Path: tmpDir, - Levels: "1:2", - }, - } - - tc, err := NewTieredCache(cfg) - if err != nil { - t.Fatalf("NewTieredCache failed: %v", err) - } - defer tc.Stop() - - <-tc.l2.loadCh - - // 设置缓存 - hashKey := uint64(12345) - origKey := "GET:/api/test" - tc.Set(hashKey, origKey, []byte("test"), nil, 200, 10*time.Minute) - time.Sleep(50 * time.Millisecond) - - // 删除 - if err := tc.Delete(hashKey); err != nil { - t.Fatalf("Delete failed: %v", err) - } - - // 验证 L1 和 L2 都已删除 - _, exists, _ := tc.l1.Get(hashKey, origKey) - if exists { - t.Error("entry should not exist in L1 after delete") - } - - _, exists, _ = tc.l2.Get(hashKey, origKey) - if exists { - t.Error("entry should not exist in L2 after delete") - } -} - -func TestTieredCacheStale(t *testing.T) { - tmpDir := t.TempDir() - - cfg := &TieredCacheConfig{ - L2Config: &DiskCacheConfig{ - Path: tmpDir, - Levels: "1:2", - }, - } - - tc, err := NewTieredCache(cfg) - if err != nil { - t.Fatalf("NewTieredCache failed: %v", err) - } - defer tc.Stop() - - <-tc.l2.loadCh - - // 设置一个已过期的缓存 - hashKey := uint64(12345) - origKey := "GET:/api/test" - tc.l2.Set(hashKey, origKey, []byte("test"), nil, 200, 1*time.Millisecond) - - // 等待过期 - time.Sleep(10 * time.Millisecond) - - // 获取缓存 - _, exists, stale := tc.Get(hashKey, origKey) - if !exists { - t.Fatal("expired entry should still exist") - } - if !stale { - t.Error("expired entry should be marked as stale") - } -} - -func TestTieredCachePromote(t *testing.T) { - tmpDir := t.TempDir() - - cfg := &TieredCacheConfig{ - L2Config: &DiskCacheConfig{ - Path: tmpDir, - Levels: "1:2", - }, - PromoteThreshold: 2, - PromoteInterval: 50 * time.Millisecond, - } - - tc, err := NewTieredCache(cfg) - if err != nil { - t.Fatalf("NewTieredCache failed: %v", err) - } - defer tc.Stop() - - <-tc.l2.loadCh - - // 直接写入 L2 - hashKey := uint64(12345) - origKey := "GET:/api/test" - data := []byte("test data") - tc.l2.Set(hashKey, origKey, data, nil, 200, 10*time.Minute) - time.Sleep(50 * time.Millisecond) - - // 访问两次(达到阈值) - tc.Get(hashKey, origKey) - tc.Get(hashKey, origKey) - - // 等待提升检查 - time.Sleep(100 * time.Millisecond) - - // 验证提升发生 - stats := tc.TieredCacheStats() - if stats.Promotes == 0 { - t.Error("promotes should be > 0 after reaching threshold") - } -} - -func TestTieredCacheStats(t *testing.T) { - tmpDir := t.TempDir() - - cfg := &TieredCacheConfig{ - L2Config: &DiskCacheConfig{ - Path: tmpDir, - Levels: "1:2", - }, - } - - tc, err := NewTieredCache(cfg) - if err != nil { - t.Fatalf("NewTieredCache failed: %v", err) - } - defer tc.Stop() - - <-tc.l2.loadCh - - // 设置缓存 - tc.Set(1, "key1", []byte("data1"), nil, 200, 10*time.Minute) - tc.Set(2, "key2", []byte("data2"), nil, 200, 10*time.Minute) - time.Sleep(50 * time.Millisecond) - - // 获取缓存 - tc.Get(1, "key1") // L1 命中 - tc.Get(1, "key1") // L1 命中 - tc.Get(999, "nonexistent") // 未命中 - - stats := tc.TieredCacheStats() - if stats.L1Hits != 2 { - t.Errorf("L1Hits = %d, want 2", stats.L1Hits) - } - if stats.Misses != 1 { - t.Errorf("Misses = %d, want 1", stats.Misses) - } -} - -func TestTieredCacheRestart(t *testing.T) { - tmpDir := t.TempDir() - - cfg := &TieredCacheConfig{ - L2Config: &DiskCacheConfig{ - Path: tmpDir, - Levels: "1:2", - }, - } - - // 第一个实例:写入数据 - tc1, err := NewTieredCache(cfg) - if err != nil { - t.Fatalf("NewTieredCache failed: %v", err) - } - <-tc1.l2.loadCh - - hashKey := uint64(12345) - origKey := "GET:/api/test" - data := []byte("persistent data") - tc1.Set(hashKey, origKey, data, nil, 200, 10*time.Minute) - time.Sleep(50 * time.Millisecond) - tc1.Stop() - - // 第二个实例:读取数据(模拟重启) - tc2, err := NewTieredCache(cfg) - if err != nil { - t.Fatalf("NewTieredCache (restart) failed: %v", err) - } - <-tc2.l2.loadCh - defer tc2.Stop() - - // 验证数据从 L2 恢复 - entry, exists, _ := tc2.Get(hashKey, origKey) - if !exists { - t.Fatal("entry should exist after restart") - } - if string(entry.Data) != string(data) { - t.Errorf("Data = %q, want %q", entry.Data, data) - } - - // 验证是从 L2 获取的 - stats := tc2.TieredCacheStats() - if stats.L2Hits != 1 { - t.Errorf("L2Hits = %d, want 1", stats.L2Hits) - } -} - -func TestTieredCacheCacheStats(t *testing.T) { - tmpDir := t.TempDir() - - cfg := &TieredCacheConfig{ - L2Config: &DiskCacheConfig{ - Path: tmpDir, - Levels: "1:2", - }, - } - - tc, err := NewTieredCache(cfg) - if err != nil { - t.Fatalf("NewTieredCache failed: %v", err) - } - defer tc.Stop() - - <-tc.l2.loadCh - - // 设置缓存 - tc.Set(1, "key1", []byte("data1"), nil, 200, 10*time.Minute) - time.Sleep(50 * time.Millisecond) - - // 获取缓存统计 - stats := tc.CacheStats() - if stats.Entries < 1 { - t.Errorf("Entries = %d, should be >= 1", stats.Entries) - } -} - -func TestTieredCacheGetStaleL1Hit(t *testing.T) { - tmpDir := t.TempDir() - - cfg := &TieredCacheConfig{ - L1MaxEntries: 100, - L1MaxSize: 1024 * 1024, - L2Config: &DiskCacheConfig{ - Path: tmpDir, - Levels: "1:2", - }, - StaleIfError: 200 * time.Millisecond, - StaleIfTimeout: 0, - } - - tc, err := NewTieredCache(cfg) - if err != nil { - t.Fatalf("NewTieredCache failed: %v", err) - } - defer tc.Stop() - - hashKey := uint64(54321) - origKey := "GET:/api/tiered" - tc.Set(hashKey, origKey, []byte("l1data"), nil, 200, 100*time.Millisecond) - - // 等待过期但仍在 stale_if_error 窗口内 - time.Sleep(150 * time.Millisecond) - - // isTimeout=false,应该从 L1 获取 stale 缓存 - entry, ok := tc.GetStale(hashKey, origKey, false) - if !ok { - t.Error("stale entry should be usable on error from L1") - } - if entry == nil || string(entry.Data) != "l1data" { - t.Errorf("entry.Data = %v, want %q", entry, "l1data") - } - - // isTimeout=true,staleIfTimeout=0,不应该可用 - if _, ok2 := tc.GetStale(hashKey, origKey, true); ok2 { - t.Error("stale entry should NOT be usable on timeout when staleIfTimeout=0") - } -} - -func TestTieredCacheGetStaleMiss(t *testing.T) { - tmpDir := t.TempDir() - - cfg := &TieredCacheConfig{ - L1MaxEntries: 100, - L1MaxSize: 1024 * 1024, - L2Config: &DiskCacheConfig{ - Path: tmpDir, - Levels: "1:2", - }, - StaleIfError: 200 * time.Millisecond, - StaleIfTimeout: 200 * time.Millisecond, - } - - tc, err := NewTieredCache(cfg) - if err != nil { - t.Fatalf("NewTieredCache failed: %v", err) - } - defer tc.Stop() - - // 不存在的 key - if _, ok := tc.GetStale(99999, "nonexistent", false); ok { - t.Error("should not find nonexistent key") - } - - if _, ok2 := tc.GetStale(99999, "nonexistent", true); ok2 { - t.Error("should not find nonexistent key on timeout") - } -} diff --git a/internal/config/loader.go b/internal/config/loader.go deleted file mode 100644 index bac2cdb..0000000 --- a/internal/config/loader.go +++ /dev/null @@ -1,155 +0,0 @@ -// Package config 提供 YAML 配置文件的解析、验证和默认配置生成功能。 -// -// 该文件包含配置加载器相关的核心逻辑,包括: -// - 多文件配置合并(include 指令支持) -// - DAG-safe 循环检测(防止配置文件循环引用) -// - Glob 模式展开(支持通配符路径) -// - 深度限制(防止无限递归) -// -// 主要用途: -// -// 用于加载主配置文件及其引用的子配置文件,合并为完整配置。 -// -// 注意事项: -// - 最大 include 深度为 10 层 -// - 循环引用会返回错误 -// - DAG 共享子配置允许(同一文件可被多处引用) -// -// 作者:xfy -package config - -import ( - "fmt" - "os" - "path/filepath" - - "gopkg.in/yaml.v3" -) - -const maxIncludeDepth = 10 - -// ConfigLoader 配置加载器 -type ConfigLoader struct { - loadedFiles map[string]bool // 所有已加载文件(用于跳过重复处理) - stack map[string]bool // 当前调用栈(用于 DAG 循环检测) - baseDir string - depth int -} - -// NewConfigLoader 构造函数 -func NewConfigLoader(mainConfigPath string) *ConfigLoader { - absPath, err := filepath.Abs(mainConfigPath) - if err != nil { - absPath = mainConfigPath - } - - return &ConfigLoader{ - baseDir: filepath.Dir(absPath), - loadedFiles: make(map[string]bool), - stack: make(map[string]bool), - depth: 0, - } -} - -// Load 加载配置(含 DAG-safe 循环检测) -func (l *ConfigLoader) Load(path string) (*Config, error) { - // 深度限制 - if l.depth > maxIncludeDepth { - return nil, fmt.Errorf("include depth exceeds maximum (%d)", maxIncludeDepth) - } - - absPath, err := filepath.Abs(path) - if err != nil { - return nil, fmt.Errorf("resolve path failed: %w", err) - } - - // DAG-safe 循环检测 - // 使用 stack 检测真正的循环(当前调用栈中的文件) - // 使用 loadedFiles 跳过已处理的文件(允许 DAG 共享子配置) - if l.stack[absPath] { - return nil, fmt.Errorf("circular include detected: '%s' is in current include chain", absPath) - } - - // 如果文件已处理过,跳过(不报错) - if l.loadedFiles[absPath] { - return &Config{}, nil // 返回空配置,跳过重复处理 - } - - l.stack[absPath] = true // 加入调用栈 - l.loadedFiles[absPath] = true // 标记已处理 - l.depth++ - - // 加载文件 - data, err := os.ReadFile(absPath) - if err != nil { - return nil, fmt.Errorf("read file failed: %w", err) - } - - var cfg Config - if err := yaml.Unmarshal(data, &cfg); err != nil { - return nil, fmt.Errorf("parse yaml failed: %w", err) - } - - // 处理 include - for _, inc := range cfg.Include { - files, err := l.expandGlob(inc.Path) - if err != nil { - return nil, err - } - - for _, f := range files { - subCfg, err := l.Load(f) // 递归 - if err != nil { - return nil, fmt.Errorf("include %s: %w", f, err) - } - - if err := l.merge(&cfg, subCfg, f); err != nil { - return nil, err - } - } - } - - // 清理调用栈 - delete(l.stack, absPath) - l.depth-- - return &cfg, nil -} - -// merge 合并配置 -func (l *ConfigLoader) merge(dst, src *Config, _ string) error { - // Server name collision(listen collision 由 validate.go 处理) - for _, newServer := range src.Servers { - for _, existing := range dst.Servers { - if newServer.Name == existing.Name { - return fmt.Errorf("server name collision: '%s'", newServer.Name) - } - } - } - dst.Servers = append(dst.Servers, src.Servers...) - - // Stream collision - for _, newStream := range src.Stream { - for _, existing := range dst.Stream { - if newStream.Listen == existing.Listen { - return fmt.Errorf("stream listen collision: '%s'", newStream.Listen) - } - } - } - dst.Stream = append(dst.Stream, src.Stream...) - - return nil -} - -// expandGlob 展开 glob 模式 -func (l *ConfigLoader) expandGlob(pattern string) ([]string, error) { - absPattern := l.resolvePath(pattern) - return filepath.Glob(absPattern) -} - -// resolvePath 解析路径 -func (l *ConfigLoader) resolvePath(path string) string { - if filepath.IsAbs(path) { - return path - } - return filepath.Join(l.baseDir, path) -} diff --git a/internal/config/loader_test.go b/internal/config/loader_test.go deleted file mode 100644 index 65d2298..0000000 --- a/internal/config/loader_test.go +++ /dev/null @@ -1,442 +0,0 @@ -// Package config 提供配置加载器测试。 -package config - -import ( - "os" - "path/filepath" - "testing" -) - -// TestNewConfigLoader 测试 ConfigLoader 构造函数。 -func TestNewConfigLoader(t *testing.T) { - t.Run("相对路径转换", func(t *testing.T) { - tmpDir := t.TempDir() - configPath := filepath.Join(tmpDir, "config.yaml") - - loader := NewConfigLoader(configPath) - if loader == nil { - t.Fatal("NewConfigLoader() returned nil") - } - - // 验证 baseDir 是配置文件所在目录 - expectedDir, _ := filepath.Abs(tmpDir) - if loader.baseDir != expectedDir { - t.Errorf("baseDir = %q, want %q", loader.baseDir, expectedDir) - } - }) - - t.Run("绝对路径保持不变", func(t *testing.T) { - absPath := "/etc/lolly/config.yaml" - loader := NewConfigLoader(absPath) - if loader == nil { - t.Fatal("NewConfigLoader() returned nil") - } - - if loader.baseDir != "/etc/lolly" { - t.Errorf("baseDir = %q, want /etc/lolly", loader.baseDir) - } - }) - - t.Run("初始化状态", func(t *testing.T) { - loader := NewConfigLoader("config.yaml") - - if loader.loadedFiles == nil { - t.Error("loadedFiles not initialized") - } - if loader.stack == nil { - t.Error("stack not initialized") - } - if loader.depth != 0 { - t.Errorf("depth = %d, want 0", loader.depth) - } - }) -} - -// TestConfigLoader_Load 测试配置加载。 -func TestConfigLoader_Load(t *testing.T) { - t.Run("单文件配置加载", func(t *testing.T) { - content := ` -servers: - - listen: ":8080" - name: "main" - static: - - path: "/" - root: "/var/www" -` - tmpDir := t.TempDir() - configPath := filepath.Join(tmpDir, "config.yaml") - if err := os.WriteFile(configPath, []byte(content), 0o644); err != nil { - t.Fatalf("写入配置文件失败: %v", err) - } - - loader := NewConfigLoader(configPath) - cfg, err := loader.Load(configPath) - if err != nil { - t.Fatalf("Load() 失败: %v", err) - } - - if len(cfg.Servers) != 1 { - t.Errorf("len(Servers) = %d, want 1", len(cfg.Servers)) - } - if cfg.Servers[0].Listen != ":8080" { - t.Errorf("Listen = %q, want :8080", cfg.Servers[0].Listen) - } - }) - - t.Run("文件不存在", func(t *testing.T) { - loader := NewConfigLoader("/nonexistent/config.yaml") - _, err := loader.Load("/nonexistent/config.yaml") - if err == nil { - t.Error("Load() 期望返回错误,但返回 nil") - } - }) - - t.Run("无效YAML", func(t *testing.T) { - content := `servers: [invalid yaml` - tmpDir := t.TempDir() - configPath := filepath.Join(tmpDir, "invalid.yaml") - if err := os.WriteFile(configPath, []byte(content), 0o644); err != nil { - t.Fatalf("写入配置文件失败: %v", err) - } - - loader := NewConfigLoader(configPath) - _, err := loader.Load(configPath) - if err == nil { - t.Error("Load() 期望返回错误,但返回 nil") - } - }) -} - -// TestConfigLoader_Include 测试 include 指令。 -func TestConfigLoader_Include(t *testing.T) { - t.Run("多文件include合并", func(t *testing.T) { - tmpDir := t.TempDir() - - // 主配置文件 - mainConfig := ` -servers: - - listen: ":8080" - name: "main" -include: - - path: "servers/*.yaml" -` - mainPath := filepath.Join(tmpDir, "main.yaml") - if err := os.WriteFile(mainPath, []byte(mainConfig), 0o644); err != nil { - t.Fatalf("写入主配置文件失败: %v", err) - } - - // 创建 servers 目录 - serversDir := filepath.Join(tmpDir, "servers") - if err := os.Mkdir(serversDir, 0o755); err != nil { - t.Fatalf("创建 servers 目录失败: %v", err) - } - - // 子配置文件1 - server1 := ` -servers: - - listen: ":8081" - name: "server1" -` - if err := os.WriteFile(filepath.Join(serversDir, "server1.yaml"), []byte(server1), 0o644); err != nil { - t.Fatalf("写入 server1.yaml 失败: %v", err) - } - - // 子配置文件2 - server2 := ` -servers: - - listen: ":8082" - name: "server2" -` - if err := os.WriteFile(filepath.Join(serversDir, "server2.yaml"), []byte(server2), 0o644); err != nil { - t.Fatalf("写入 server2.yaml 失败: %v", err) - } - - loader := NewConfigLoader(mainPath) - cfg, err := loader.Load(mainPath) - if err != nil { - t.Fatalf("Load() 失败: %v", err) - } - - // 应该有3个server: main + server1 + server2 - if len(cfg.Servers) != 3 { - t.Errorf("len(Servers) = %d, want 3", len(cfg.Servers)) - } - }) - - t.Run("循环引用检测", func(t *testing.T) { - tmpDir := t.TempDir() - - // a.yaml includes b.yaml - configA := ` -servers: - - listen: ":8080" - name: "a" -include: - - path: "b.yaml" -` - // b.yaml includes a.yaml (循环) - configB := ` -servers: - - listen: ":8081" - name: "b" -include: - - path: "a.yaml" -` - if err := os.WriteFile(filepath.Join(tmpDir, "a.yaml"), []byte(configA), 0o644); err != nil { - t.Fatalf("写入 a.yaml 失败: %v", err) - } - if err := os.WriteFile(filepath.Join(tmpDir, "b.yaml"), []byte(configB), 0o644); err != nil { - t.Fatalf("写入 b.yaml 失败: %v", err) - } - - loader := NewConfigLoader(filepath.Join(tmpDir, "a.yaml")) - _, err := loader.Load(filepath.Join(tmpDir, "a.yaml")) - if err == nil { - t.Error("Load() 期望返回循环引用错误,但返回 nil") - } - }) - - t.Run("深度超限", func(t *testing.T) { - tmpDir := t.TempDir() - - // 创建深度嵌套的配置文件链 - for i := range 12 { - config := ` -servers: - - listen: ":8080" -` - if i < 11 { - config += `include: - - path: "next.yaml" -` - } - filename := "config.yaml" - if i > 0 { - filename = "next.yaml" - } - // 每个层级创建子目录 - subDir := filepath.Join(tmpDir, "level"+string(rune('0'+i))) - _ = os.Mkdir(subDir, 0o755) - if i == 0 { - if err := os.WriteFile(filepath.Join(tmpDir, filename), []byte(config), 0o644); err != nil { - t.Fatalf("写入配置文件失败: %v", err) - } - } - } - - // 简化测试:直接测试深度限制 - loader := NewConfigLoader(filepath.Join(tmpDir, "config.yaml")) - loader.depth = 11 // 超过 maxIncludeDepth (10) - - _, err := loader.Load(filepath.Join(tmpDir, "config.yaml")) - if err == nil { - t.Error("Load() 期望返回深度超限错误,但返回 nil") - } - }) - - t.Run("DAG共享子配置", func(t *testing.T) { - tmpDir := t.TempDir() - - // shared.yaml - 被多处引用 - shared := ` -servers: - - listen: ":9090" - name: "shared" -` - if err := os.WriteFile(filepath.Join(tmpDir, "shared.yaml"), []byte(shared), 0o644); err != nil { - t.Fatalf("写入 shared.yaml 失败: %v", err) - } - - // main.yaml - 引用 shared.yaml 两次(应该只处理一次) - main := ` -servers: - - listen: ":8080" - name: "main" -include: - - path: "shared.yaml" - - path: "shared.yaml" -` - if err := os.WriteFile(filepath.Join(tmpDir, "main.yaml"), []byte(main), 0o644); err != nil { - t.Fatalf("写入 main.yaml 失败: %v", err) - } - - loader := NewConfigLoader(filepath.Join(tmpDir, "main.yaml")) - cfg, err := loader.Load(filepath.Join(tmpDir, "main.yaml")) - if err != nil { - t.Fatalf("Load() 失败: %v", err) - } - - // shared 应该只被处理一次 - sharedCount := 0 - for _, s := range cfg.Servers { - if s.Name == "shared" { - sharedCount++ - } - } - if sharedCount != 1 { - t.Errorf("shared server count = %d, want 1", sharedCount) - } - }) -} - -// TestConfigLoader_Merge 测试配置合并。 -func TestConfigLoader_Merge(t *testing.T) { - t.Run("server name冲突", func(t *testing.T) { - tmpDir := t.TempDir() - - main := ` -servers: - - listen: ":8080" - name: "duplicate" -include: - - path: "sub.yaml" -` - sub := ` -servers: - - listen: ":8081" - name: "duplicate" -` - if err := os.WriteFile(filepath.Join(tmpDir, "main.yaml"), []byte(main), 0o644); err != nil { - t.Fatalf("写入 main.yaml 失败: %v", err) - } - if err := os.WriteFile(filepath.Join(tmpDir, "sub.yaml"), []byte(sub), 0o644); err != nil { - t.Fatalf("写入 sub.yaml 失败: %v", err) - } - - loader := NewConfigLoader(filepath.Join(tmpDir, "main.yaml")) - _, err := loader.Load(filepath.Join(tmpDir, "main.yaml")) - if err == nil { - t.Error("Load() 期望返回 server name 冲突错误,但返回 nil") - } - }) - - t.Run("stream listen冲突", func(t *testing.T) { - tmpDir := t.TempDir() - - main := ` -stream: - - listen: "12345" - proxy_pass: "backend:54321" -include: - - path: "sub.yaml" -` - sub := ` -stream: - - listen: "12345" - proxy_pass: "backend2:54321" -` - if err := os.WriteFile(filepath.Join(tmpDir, "main.yaml"), []byte(main), 0o644); err != nil { - t.Fatalf("写入 main.yaml 失败: %v", err) - } - if err := os.WriteFile(filepath.Join(tmpDir, "sub.yaml"), []byte(sub), 0o644); err != nil { - t.Fatalf("写入 sub.yaml 失败: %v", err) - } - - loader := NewConfigLoader(filepath.Join(tmpDir, "main.yaml")) - _, err := loader.Load(filepath.Join(tmpDir, "main.yaml")) - if err == nil { - t.Error("Load() 期望返回 stream listen 冲突错误,但返回 nil") - } - }) - - t.Run("正常合并", func(t *testing.T) { - tmpDir := t.TempDir() - - main := ` -servers: - - listen: ":8080" - name: "main" -include: - - path: "sub.yaml" -` - sub := ` -servers: - - listen: ":8081" - name: "sub" -stream: - - listen: "12345" - proxy_pass: "backend:54321" -` - if err := os.WriteFile(filepath.Join(tmpDir, "main.yaml"), []byte(main), 0o644); err != nil { - t.Fatalf("写入 main.yaml 失败: %v", err) - } - if err := os.WriteFile(filepath.Join(tmpDir, "sub.yaml"), []byte(sub), 0o644); err != nil { - t.Fatalf("写入 sub.yaml 失败: %v", err) - } - - loader := NewConfigLoader(filepath.Join(tmpDir, "main.yaml")) - cfg, err := loader.Load(filepath.Join(tmpDir, "main.yaml")) - if err != nil { - t.Fatalf("Load() 失败: %v", err) - } - - if len(cfg.Servers) != 2 { - t.Errorf("len(Servers) = %d, want 2", len(cfg.Servers)) - } - if len(cfg.Stream) != 1 { - t.Errorf("len(Stream) = %d, want 1", len(cfg.Stream)) - } - }) -} - -// TestConfigLoader_Glob 测试 glob 模式展开。 -func TestConfigLoader_Glob(t *testing.T) { - t.Run("glob模式匹配", func(t *testing.T) { - tmpDir := t.TempDir() - - // 创建多个配置文件 - for i := range 3 { - content := ` -servers: - - listen: ":8080" -` - filename := filepath.Join(tmpDir, "server"+string(rune('0'+i+1))+".yaml") - if err := os.WriteFile(filename, []byte(content), 0o644); err != nil { - t.Fatalf("写入配置文件失败: %v", err) - } - } - - loader := NewConfigLoader(tmpDir) - files, err := loader.expandGlob(filepath.Join(tmpDir, "server*.yaml")) - if err != nil { - t.Fatalf("expandGlob() 失败: %v", err) - } - - if len(files) != 3 { - t.Errorf("len(files) = %d, want 3", len(files)) - } - }) - - t.Run("无匹配文件", func(t *testing.T) { - loader := NewConfigLoader("/tmp") - files, err := loader.expandGlob("/nonexistent/*.yaml") - if err != nil { - t.Fatalf("expandGlob() 失败: %v", err) - } - if len(files) != 0 { - t.Errorf("len(files) = %d, want 0", len(files)) - } - }) -} - -// TestConfigLoader_ResolvePath 测试路径解析。 -func TestConfigLoader_ResolvePath(t *testing.T) { - t.Run("相对路径解析", func(t *testing.T) { - loader := NewConfigLoader("/etc/lolly/config.yaml") - - result := loader.resolvePath("servers/config.yaml") - expected := "/etc/lolly/servers/config.yaml" - if result != expected { - t.Errorf("resolvePath() = %q, want %q", result, expected) - } - }) - - t.Run("绝对路径保持不变", func(t *testing.T) { - loader := NewConfigLoader("/etc/lolly/config.yaml") - - result := loader.resolvePath("/absolute/path.yaml") - if result != "/absolute/path.yaml" { - t.Errorf("resolvePath() = %q, want /absolute/path.yaml", result) - } - }) -} diff --git a/internal/config/validate.go b/internal/config/validate.go index 2148821..69d9a6e 100644 --- a/internal/config/validate.go +++ b/internal/config/validate.go @@ -129,14 +129,7 @@ func ValidateNonNegative[T SignedInteger](value T, fieldName string) error { return nil } -// ValidateNonNegativeInt64 验证 int64 值为非负数 -// Deprecated: 使用 ValidateNonNegative[int64] 代替 -func ValidateNonNegativeInt64(value int64, fieldName string) error { - return ValidateNonNegative(value, fieldName) -} - // ValidateNonNegativeDuration 验证 time.Duration 值为非负数 -// Deprecated: 使用 ValidateNonNegative[int64] 代替(time.Duration 是 int64 别名) func ValidateNonNegativeDuration(value time.Duration, fieldName string) error { return ValidateNonNegative(int64(value), fieldName) } diff --git a/internal/lua/api_log.go b/internal/lua/api_log.go index e2f951a..f9e8967 100644 --- a/internal/lua/api_log.go +++ b/internal/lua/api_log.go @@ -407,7 +407,6 @@ func luaSchedulerLog(L *glua.LState) int { // 在实际实现中,可以通过 engine 的 logger 输出 _ = level _ = msg - // fmt.Printf("[timer] %s\n", msg) return 0 } diff --git a/internal/server/start_integration_test.go b/internal/server/start_integration_test.go index cbeeeb5..bfe0a16 100644 --- a/internal/server/start_integration_test.go +++ b/internal/server/start_integration_test.go @@ -514,6 +514,7 @@ func TestStart_WithDelayedBackend(t *testing.T) { // 启动延迟的 mock 服务器 backendAddr, cleanup := tools.DelayedMockBackend( 100*time.Millisecond, + fasthttp.StatusOK, []byte(`{"message": "delayed response"}`), ) defer cleanup() @@ -544,9 +545,9 @@ func TestStart_WithDelayedBackend(t *testing.T) { func TestStart_WithRandomResponse(t *testing.T) { // 启动随机响应的 mock 服务器 backendAddr, cleanup := tools.StartMockFasthttpBackend(tools.MockBackendConfig{ - Mode: tools.ModeRandomResponse, - StatusCode: fasthttp.StatusOK, - Body: []byte(`{"random": true}`), + Mode: tools.ModeRandomResponse, + StatusCode: fasthttp.StatusOK, + ResponseBody: []byte(`{"random": true}`), }) defer cleanup()