diff --git a/internal/benchmark/tools/tools.go b/internal/benchmark/tools/tools.go new file mode 100644 index 0000000..0f07639 --- /dev/null +++ b/internal/benchmark/tools/tools.go @@ -0,0 +1,162 @@ +// Package tools 提供基准测试和集成测试的辅助工具。 +package tools + +import ( + "net" + "time" + + "github.com/valyala/fasthttp" +) + +// 预定义的测试数据大小常量 +const ( + Size100B = 100 + Size1KB = 1024 + Size10KB = 10 * 1024 + Size100KB = 100 * 1024 + Size1MB = 1024 * 1024 +) + +// MockBackendConfig Mock 后端配置 +type MockBackendConfig struct { + // Mode 运行模式 + Mode string + // StatusCode 响应状态码 + StatusCode int + // ResponseBody 响应体 + ResponseBody []byte + // ErrorRate 错误率 (0.0 - 1.0) + ErrorRate float64 + // Delay 响应延迟 + Delay time.Duration +} + +// Mock 后端运行模式 +const ( + ModeNormalResponse = "normal" + ModeRandomResponse = "random" + ModeErrorResponse = "error" + ModeDelayedResponse = "delayed" +) + +// GenerateTestData 生成指定大小的测试数据。 +func GenerateTestData(size int) []byte { + data := make([]byte, size) + for i := range data { + data[i] = byte(i % 256) + } + return data +} + +// SimpleMockBackend 创建一个简单的 Mock HTTP 后端。 +// 返回监听地址和清理函数。 +func SimpleMockBackend(statusCode int, responseBody []byte) (string, func()) { + ln, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + panic(err) + } + + handler := func(ctx *fasthttp.RequestCtx) { + ctx.SetStatusCode(statusCode) + ctx.SetBody(responseBody) + } + + go fasthttp.Serve(ln, handler) + + return ln.Addr().String(), func() { + ln.Close() + } +} + +// ErrorMockBackend 创建一个返回错误的 Mock HTTP 后端。 +func ErrorMockBackend(errorRate float64, errorBody []byte) (string, func()) { + ln, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + panic(err) + } + + var requestCount int + handler := func(ctx *fasthttp.RequestCtx) { + requestCount++ + if float64(requestCount%100)/100 < errorRate { + ctx.SetStatusCode(fasthttp.StatusInternalServerError) + ctx.SetBody(errorBody) + return + } + ctx.SetStatusCode(fasthttp.StatusOK) + ctx.SetBody([]byte("OK")) + } + + go fasthttp.Serve(ln, handler) + + return ln.Addr().String(), func() { + ln.Close() + } +} + +// DelayedMockBackend 创建一个带延迟的 Mock HTTP 后端。 +func DelayedMockBackend(delay time.Duration, statusCode int, responseBody []byte) (string, func()) { + ln, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + panic(err) + } + + handler := func(ctx *fasthttp.RequestCtx) { + time.Sleep(delay) + ctx.SetStatusCode(statusCode) + ctx.SetBody(responseBody) + } + + go fasthttp.Serve(ln, handler) + + return ln.Addr().String(), func() { + ln.Close() + } +} + +// StartMockFasthttpBackend 根据配置启动 Mock HTTP 后端。 +func StartMockFasthttpBackend(config MockBackendConfig) (string, func()) { + ln, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + panic(err) + } + + var requestCount int + handler := func(ctx *fasthttp.RequestCtx) { + requestCount++ + + switch config.Mode { + case ModeDelayedResponse: + time.Sleep(config.Delay) + ctx.SetStatusCode(config.StatusCode) + ctx.SetBody(config.ResponseBody) + + case ModeErrorResponse: + if float64(requestCount%100)/100 < config.ErrorRate { + ctx.SetStatusCode(fasthttp.StatusInternalServerError) + ctx.SetBody([]byte(`{"error": "internal error"}`)) + return + } + ctx.SetStatusCode(config.StatusCode) + ctx.SetBody(config.ResponseBody) + + case ModeRandomResponse: + if requestCount%2 == 0 { + ctx.SetStatusCode(fasthttp.StatusOK) + } else { + ctx.SetStatusCode(fasthttp.StatusCreated) + } + ctx.SetBody(config.ResponseBody) + + default: + ctx.SetStatusCode(config.StatusCode) + ctx.SetBody(config.ResponseBody) + } + } + + go fasthttp.Serve(ln, handler) + + return ln.Addr().String(), func() { + ln.Close() + } +} diff --git a/internal/cache/disk_cache.go b/internal/cache/disk_cache.go deleted file mode 100644 index 54bdc9f..0000000 --- a/internal/cache/disk_cache.go +++ /dev/null @@ -1,519 +0,0 @@ -// Package cache 提供文件缓存和代理缓存功能。 -// -// 该文件实现 DiskCache 磁盘缓存后端,支持: -// - 目录层级配置(levels=1:2) -// - 原子写入策略(.tmp → .data) -// - 懒加载(后台加载元数据,不阻塞启动) -// - CRC32 校验和验证数据完整性 -// -// 主要用途: -// -// 作为 L2 缓存层,持久化代理响应到磁盘,支持服务重启后恢复。 -// -// 作者:xfy -package cache - -import ( - "encoding/json" - "fmt" - "hash/crc32" - "os" - "path/filepath" - "sync" - "sync/atomic" - "time" - - "rua.plus/lolly/internal/logging" -) - -// DiskCacheConfig 磁盘缓存配置。 -type DiskCacheConfig struct { - // Path 缓存根目录 - Path string - - // Levels 目录层级,如 "1:2" 表示两级目录 - Levels string - - // MaxSize 最大缓存大小(字节) - MaxSize int64 - - // Inactive 未访问淘汰时间 - Inactive time.Duration - - // StaleIfError 错误时使用过期缓存的窗口 - StaleIfError time.Duration - - // StaleIfTimeout 超时时使用过期缓存的窗口 - StaleIfTimeout time.Duration -} - -// DiskCache 磁盘缓存实现。 -type DiskCache struct { - basePath string - levels []int - maxSize int64 - inactive time.Duration - staleIfError time.Duration - staleIfTimeout time.Duration - currentSize atomic.Int64 - entries map[uint64]*DiskCacheMeta - mu sync.RWMutex - stopCh chan struct{} - - // 懒加载相关 - loaded atomic.Bool - loadCh chan struct{} - - // 统计 - hitCount atomic.Int64 - missCount atomic.Int64 - evictions atomic.Int64 -} - -// DiskCacheMeta 磁盘缓存元数据。 -type DiskCacheMeta struct { - HashKey uint64 `json:"hash_key"` - OrigKey string `json:"orig_key"` - Created time.Time `json:"created"` - MaxAge time.Duration `json:"max_age"` - Status int `json:"status"` - Size int64 `json:"size"` - Headers map[string]string `json:"headers,omitempty"` - CRC32 uint32 `json:"crc32"` -} - -// DiskCacheEntry 磁盘缓存条目(包含数据)。 -type DiskCacheEntry struct { - *DiskCacheMeta - Data []byte -} - -// NewDiskCache 创建磁盘缓存实例(懒加载模式)。 -func NewDiskCache(cfg *DiskCacheConfig) (*DiskCache, error) { - if cfg.Path == "" { - return nil, fmt.Errorf("disk cache path is required") - } - - // 确保目录存在 - if err := os.MkdirAll(cfg.Path, 0o755); err != nil { - return nil, fmt.Errorf("create cache directory: %w", err) - } - - dc := &DiskCache{ - basePath: cfg.Path, - levels: parseLevels(cfg.Levels), - maxSize: cfg.MaxSize, - inactive: cfg.Inactive, - staleIfError: cfg.StaleIfError, - staleIfTimeout: cfg.StaleIfTimeout, - entries: make(map[uint64]*DiskCacheMeta), - loadCh: make(chan struct{}), - stopCh: make(chan struct{}), - } - - // 启动后台加载,不阻塞服务启动 - go dc.lazyLoad() - - return dc, nil -} - -// parseLevels 解析目录层级配置。 -// 支持格式:""(无层级)、"1"(一级)、"1:2"(两级) -func parseLevels(levels string) []int { - if levels == "" { - return nil - } - - var result []int - start := 0 - for i := 0; i <= len(levels); i++ { - if i == len(levels) || levels[i] == ':' { - var level int - for j := start; j < i; j++ { - level = level*10 + int(levels[j]-'0') - } - if level > 0 { - result = append(result, level) - } - start = i + 1 - } - } - return result -} - -// lazyLoad 后台加载缓存元数据。 -func (dc *DiskCache) lazyLoad() { - defer close(dc.loadCh) - - // 扫描目录加载元数据(不加载实际数据) - _ = filepath.Walk(dc.basePath, func(path string, info os.FileInfo, err error) error { - if err != nil || info.IsDir() { - return nil - } - - // 只处理 .meta 文件 - if filepath.Ext(path) != ".meta" { - return nil - } - - meta := dc.loadMetaFile(path) - if meta != nil { - dc.mu.Lock() - dc.entries[meta.HashKey] = meta - dc.mu.Unlock() - dc.currentSize.Add(meta.Size) - } - - return nil - }) - - dc.loaded.Store(true) -} - -// loadMetaFile 加载元数据文件。 -func (dc *DiskCache) loadMetaFile(path string) *DiskCacheMeta { - data, err := os.ReadFile(path) - if err != nil { - return nil - } - - var meta DiskCacheMeta - if err := json.Unmarshal(data, &meta); err != nil { - return nil - } - - return &meta -} - -// Get 获取缓存条目(实现 CacheBackend 接口)。 -func (dc *DiskCache) Get(hashKey uint64, origKey string) (*ProxyCacheEntry, bool, bool) { - // 如果未完成加载,等待加载完成或超时 - if !dc.loaded.Load() { - select { - case <-dc.loadCh: - // 加载完成,继续 - case <-time.After(100 * time.Millisecond): - // 超时,返回未命中(避免阻塞请求) - dc.missCount.Add(1) - return nil, false, false - } - } - - dc.mu.RLock() - meta, exists := dc.entries[hashKey] - dc.mu.RUnlock() - - if !exists { - dc.missCount.Add(1) - return nil, false, false - } - - // 双重验证:检查原始 key 是否匹配 - if meta.OrigKey != origKey { - dc.missCount.Add(1) - return nil, false, false - } - - // 读取数据文件 - dataPath := dc.filePathFromHash(hashKey, "data") - data, err := os.ReadFile(dataPath) - if err != nil { - dc.missCount.Add(1) - return nil, false, false - } - - // 验证 CRC32 - if meta.CRC32 != 0 { - if crc32.ChecksumIEEE(data) != meta.CRC32 { - // 数据损坏,删除条目 - _ = dc.Delete(hashKey) - dc.missCount.Add(1) - return nil, false, false - } - } - - // 检查是否过期 - now := time.Now() - expiresAt := meta.Created.Add(meta.MaxAge) - stale := now.After(expiresAt) - - dc.hitCount.Add(1) - - // 转换为 ProxyCacheEntry - entry := &ProxyCacheEntry{ - Key: meta.OrigKey, - OrigKey: meta.OrigKey, - Data: data, - Headers: meta.Headers, - Status: meta.Status, - Created: meta.Created, - MaxAge: meta.MaxAge, - } - - return entry, true, stale -} - -// GetStale 在上游错误时获取可用的过期缓存。 -// -// 与 Get 不同,GetStale 只在错误发生时使用,根据错误类型检查对应的 stale 窗口。 -func (dc *DiskCache) GetStale(hashKey uint64, origKey string, isTimeout bool) (*ProxyCacheEntry, bool) { - // 等待懒加载完成 - if !dc.loaded.Load() { - select { - case <-dc.loadCh: - case <-time.After(100 * time.Millisecond): - return nil, false - } - } - - dc.mu.RLock() - meta, ok := dc.entries[hashKey] - dc.mu.RUnlock() - - if !ok { - return nil, false - } - - // 双重验证:检查原始 key 是否匹配 - if meta.OrigKey != origKey { - return nil, false - } - - // 读取数据文件 - dataPath := dc.filePathFromHash(hashKey, "data") - data, err := os.ReadFile(dataPath) - if err != nil { - return nil, false - } - - // 验证 CRC32 - crc := crc32.ChecksumIEEE(data) - if crc != meta.CRC32 { - return nil, false - } - - now := time.Now() - expiresAt := meta.Created.Add(meta.MaxAge) - - // 未过期,直接返回 - if !now.After(expiresAt) { - entry := &ProxyCacheEntry{ - Key: meta.OrigKey, - OrigKey: meta.OrigKey, - Data: data, - Headers: meta.Headers, - Status: meta.Status, - Created: meta.Created, - MaxAge: meta.MaxAge, - } - return entry, true - } - - // 已过期,检查 stale 窗口 - var staleWindow time.Duration - if isTimeout { - staleWindow = dc.staleIfTimeout - } else { - staleWindow = dc.staleIfError - } - - if staleWindow <= 0 { - return nil, false - } - - // 检查是否在 stale 窗口内 - if now.Sub(expiresAt) > staleWindow { - return nil, false - } - - entry := &ProxyCacheEntry{ - Key: meta.OrigKey, - OrigKey: meta.OrigKey, - Data: data, - Headers: meta.Headers, - Status: meta.Status, - Created: meta.Created, - MaxAge: meta.MaxAge, - } - - return entry, true -} - -// Set 设置缓存条目(实现 CacheBackend 接口)。 -func (dc *DiskCache) Set(hashKey uint64, origKey string, data []byte, headers map[string]string, status int, maxAge time.Duration) { - // 计算文件路径 - dataPath := dc.filePathFromHash(hashKey, "data") - metaPath := dc.filePathFromHash(hashKey, "meta") - - // 确保目录存在 - dir := filepath.Dir(dataPath) - if err := os.MkdirAll(dir, 0o755); err != nil { - logging.Error().Err(err).Str("dir", dir).Msg("disk cache mkdir failed") - return - } - - // 计算 CRC32 - crc := crc32.ChecksumIEEE(data) - - // 创建元数据 - meta := &DiskCacheMeta{ - HashKey: hashKey, - OrigKey: origKey, - Created: time.Now(), - MaxAge: maxAge, - Status: status, - Size: int64(len(data)), - Headers: headers, - CRC32: crc, - } - - // 原子写入数据文件:先写临时文件,再重命名 - tmpDataPath := dataPath + ".tmp" - if err := os.WriteFile(tmpDataPath, data, 0o644); err != nil { - logging.Error().Err(err).Str("path", tmpDataPath).Msg("disk cache write failed") - return - } - if err := os.Rename(tmpDataPath, dataPath); err != nil { - _ = os.Remove(tmpDataPath) - logging.Error().Err(err).Str("from", tmpDataPath).Str("to", dataPath).Msg("disk cache rename failed") - return - } - - // 写入元数据文件 - metaData, err := json.Marshal(meta) - if err != nil { - logging.Error().Err(err).Msg("disk cache json marshal failed") - return - } - tmpMetaPath := metaPath + ".tmp" - if err := os.WriteFile(tmpMetaPath, metaData, 0o644); err != nil { - logging.Error().Err(err).Str("path", tmpMetaPath).Msg("disk cache write meta failed") - return - } - if err := os.Rename(tmpMetaPath, metaPath); err != nil { - _ = os.Remove(tmpMetaPath) - logging.Error().Err(err).Str("from", tmpMetaPath).Str("to", metaPath).Msg("disk cache rename meta failed") - return - } - - // 更新内存索引 - dc.mu.Lock() - oldMeta, existed := dc.entries[hashKey] - dc.entries[hashKey] = meta - dc.mu.Unlock() - - // 更新大小统计 - if existed && oldMeta != nil { - dc.currentSize.Add(-oldMeta.Size) - } - dc.currentSize.Add(meta.Size) - - // 检查是否需要淘汰 - if dc.maxSize > 0 && dc.currentSize.Load() > dc.maxSize { - go dc.evict() - } -} - -// Delete 删除缓存条目(实现 CacheBackend 接口)。 -func (dc *DiskCache) Delete(hashKey uint64) error { - dc.mu.Lock() - meta, exists := dc.entries[hashKey] - if exists { - delete(dc.entries, hashKey) - dc.currentSize.Add(-meta.Size) - dc.evictions.Add(1) - } - dc.mu.Unlock() - - if !exists { - return nil - } - - // 删除文件 - dataPath := dc.filePathFromHash(hashKey, "data") - metaPath := dc.filePathFromHash(hashKey, "meta") - _ = os.Remove(dataPath) - _ = os.Remove(metaPath) - - return nil -} - -// CacheStats 返回缓存统计信息(实现 CacheBackend 接口)。 -func (dc *DiskCache) CacheStats() CacheStats { - dc.mu.RLock() - entries := int64(len(dc.entries)) - dc.mu.RUnlock() - - return CacheStats{ - Entries: entries, - Size: dc.currentSize.Load(), - HitCount: dc.hitCount.Load(), - MissCount: dc.missCount.Load(), - Evictions: dc.evictions.Load(), - } -} - -// Stop 停止磁盘缓存。 -func (dc *DiskCache) Stop() { - close(dc.stopCh) -} - -// filePathFromHash 根据哈希值计算文件路径。 -func (dc *DiskCache) filePathFromHash(hashKey uint64, ext string) string { - // 将哈希值转换为十六进制字符串 - hashStr := fmt.Sprintf("%016x", hashKey) - - if len(dc.levels) == 0 { - return filepath.Join(dc.basePath, hashStr+"."+ext) - } - - // 根据层级构建路径 - parts := []string{dc.basePath} - offset := len(hashStr) - for _, level := range dc.levels { - if offset < level { - break - } - offset -= level - parts = append(parts, hashStr[offset:offset+level]) - } - parts = append(parts, hashStr+"."+ext) - - return filepath.Join(parts...) -} - -// evict 淘汰旧条目。 -func (dc *DiskCache) evict() { - // 简单策略:删除最旧的条目直到大小低于阈值 - targetSize := dc.maxSize * 9 / 10 // 淘汰到 90% - - for dc.currentSize.Load() > targetSize { - dc.mu.Lock() - // 找到最旧的条目 - var oldestKey uint64 - var oldestTime time.Time - for key, meta := range dc.entries { - if oldestKey == 0 || meta.Created.Before(oldestTime) { - oldestKey = key - oldestTime = meta.Created - } - } - - if oldestKey == 0 { - dc.mu.Unlock() - break - } - - meta := dc.entries[oldestKey] - delete(dc.entries, oldestKey) - dc.currentSize.Add(-meta.Size) - dc.evictions.Add(1) - dc.mu.Unlock() - - // 删除文件 - dataPath := dc.filePathFromHash(oldestKey, "data") - metaPath := dc.filePathFromHash(oldestKey, "meta") - _ = os.Remove(dataPath) - _ = os.Remove(metaPath) - } -} diff --git a/internal/cache/disk_cache_test.go b/internal/cache/disk_cache_test.go deleted file mode 100644 index 2d01299..0000000 --- a/internal/cache/disk_cache_test.go +++ /dev/null @@ -1,497 +0,0 @@ -package cache - -import ( - "os" - "path/filepath" - "testing" - "time" -) - -func TestNewDiskCache(t *testing.T) { - // 创建临时目录 - tmpDir := t.TempDir() - - cfg := &DiskCacheConfig{ - Path: tmpDir, - Levels: "1:2", - } - - dc, err := NewDiskCache(cfg) - if err != nil { - t.Fatalf("NewDiskCache failed: %v", err) - } - defer dc.Stop() - - // 等待懒加载完成 - <-dc.loadCh - - if dc.basePath != tmpDir { - t.Errorf("basePath = %q, want %q", dc.basePath, tmpDir) - } -} - -func TestDiskCacheSetGet(t *testing.T) { - tmpDir := t.TempDir() - - cfg := &DiskCacheConfig{ - Path: tmpDir, - Levels: "1:2", - } - - dc, err := NewDiskCache(cfg) - if err != nil { - t.Fatalf("NewDiskCache failed: %v", err) - } - defer dc.Stop() - - // 等待懒加载完成 - <-dc.loadCh - - // 设置缓存 - hashKey := uint64(12345) - origKey := "GET:/api/test" - data := []byte("test response data") - headers := map[string]string{"Content-Type": "application/json"} - status := 200 - maxAge := 10 * time.Minute - - dc.Set(hashKey, origKey, data, headers, status, maxAge) - - // 获取缓存 - entry, exists, stale := dc.Get(hashKey, origKey) - if !exists { - t.Fatal("cache entry not found") - } - if stale { - t.Error("entry should not be stale") - } - if string(entry.Data) != string(data) { - t.Errorf("Data = %q, want %q", entry.Data, data) - } - if entry.Status != status { - t.Errorf("Status = %d, want %d", entry.Status, status) - } - if entry.Headers["Content-Type"] != "application/json" { - t.Errorf("Headers[Content-Type] = %q, want %q", entry.Headers["Content-Type"], "application/json") - } -} - -func TestDiskCacheDelete(t *testing.T) { - tmpDir := t.TempDir() - - cfg := &DiskCacheConfig{ - Path: tmpDir, - Levels: "1:2", - } - - dc, err := NewDiskCache(cfg) - if err != nil { - t.Fatalf("NewDiskCache failed: %v", err) - } - defer dc.Stop() - - <-dc.loadCh - - // 设置缓存 - hashKey := uint64(12345) - origKey := "GET:/api/test" - dc.Set(hashKey, origKey, []byte("test"), nil, 200, 10*time.Minute) - - // 验证存在 - _, exists, _ := dc.Get(hashKey, origKey) - if !exists { - t.Fatal("entry should exist before delete") - } - - // 删除 - if err := dc.Delete(hashKey); err != nil { - t.Fatalf("Delete failed: %v", err) - } - - // 验证已删除 - _, exists, _ = dc.Get(hashKey, origKey) - if exists { - t.Error("entry should not exist after delete") - } -} - -func TestDiskCacheStale(t *testing.T) { - tmpDir := t.TempDir() - - cfg := &DiskCacheConfig{ - Path: tmpDir, - Levels: "1:2", - } - - dc, err := NewDiskCache(cfg) - if err != nil { - t.Fatalf("NewDiskCache failed: %v", err) - } - defer dc.Stop() - - <-dc.loadCh - - // 设置一个已过期的缓存 - hashKey := uint64(12345) - origKey := "GET:/api/test" - dc.Set(hashKey, origKey, []byte("test"), nil, 200, 1*time.Millisecond) - - // 等待过期 - time.Sleep(10 * time.Millisecond) - - // 获取缓存 - entry, exists, stale := dc.Get(hashKey, origKey) - if !exists { - t.Fatal("expired entry should still exist") - } - if !stale { - t.Error("expired entry should be marked as stale") - } - if string(entry.Data) != "test" { - t.Errorf("Data = %q, want %q", entry.Data, "test") - } -} - -func TestDiskCacheLevels(t *testing.T) { - tmpDir := t.TempDir() - - cfg := &DiskCacheConfig{ - Path: tmpDir, - Levels: "1:2", - } - - dc, err := NewDiskCache(cfg) - if err != nil { - t.Fatalf("NewDiskCache failed: %v", err) - } - defer dc.Stop() - - <-dc.loadCh - - // 设置缓存 - hashKey := uint64(0xabcdef1234567890) - origKey := "GET:/api/test" - dc.Set(hashKey, origKey, []byte("test"), nil, 200, 10*time.Minute) - - // 验证文件路径包含层级目录 - dataPath := dc.filePathFromHash(hashKey, "data") - if filepath.Dir(dataPath) == tmpDir { - t.Error("file should be in a subdirectory for levels=1:2") - } - - // 验证文件存在 - if _, err := os.Stat(dataPath); os.IsNotExist(err) { - t.Errorf("data file not found at %s", dataPath) - } -} - -func TestDiskCacheMaxSize(t *testing.T) { - tmpDir := t.TempDir() - - cfg := &DiskCacheConfig{ - Path: tmpDir, - Levels: "1:2", - MaxSize: 100, // 很小的限制 - } - - dc, err := NewDiskCache(cfg) - if err != nil { - t.Fatalf("NewDiskCache failed: %v", err) - } - defer dc.Stop() - - <-dc.loadCh - - // 设置多个缓存条目 - for i := range 10 { - hashKey := uint64(i) - origKey := "GET:/api/test" - dc.Set(hashKey, origKey, []byte("test data that is longer than 10 bytes"), nil, 200, 10*time.Minute) - } - - // 等待淘汰完成(淘汰是异步的) - time.Sleep(500 * time.Millisecond) - - // 验证淘汰发生(Evictions > 0) - stats := dc.CacheStats() - if stats.Evictions == 0 { - t.Error("Evictions should be > 0 when MaxSize is exceeded") - } -} - -func TestDiskCacheStats(t *testing.T) { - tmpDir := t.TempDir() - - cfg := &DiskCacheConfig{ - Path: tmpDir, - Levels: "1:2", - } - - dc, err := NewDiskCache(cfg) - if err != nil { - t.Fatalf("NewDiskCache failed: %v", err) - } - defer dc.Stop() - - <-dc.loadCh - - // 初始统计 - stats := dc.CacheStats() - if stats.Entries != 0 { - t.Errorf("initial Entries = %d, want 0", stats.Entries) - } - - // 设置缓存 - dc.Set(1, "key1", []byte("data1"), nil, 200, 10*time.Minute) - dc.Set(2, "key2", []byte("data2"), nil, 200, 10*time.Minute) - - stats = dc.CacheStats() - if stats.Entries != 2 { - t.Errorf("Entries = %d, want 2", stats.Entries) - } - - // 获取缓存(命中) - dc.Get(1, "key1") - stats = dc.CacheStats() - if stats.HitCount != 1 { - t.Errorf("HitCount = %d, want 1", stats.HitCount) - } - - // 获取不存在的缓存(未命中) - dc.Get(999, "nonexistent") - stats = dc.CacheStats() - if stats.MissCount != 1 { - t.Errorf("MissCount = %d, want 1", stats.MissCount) - } -} - -func TestDiskCacheCRC32(t *testing.T) { - tmpDir := t.TempDir() - - cfg := &DiskCacheConfig{ - Path: tmpDir, - Levels: "1:2", - } - - dc, err := NewDiskCache(cfg) - if err != nil { - t.Fatalf("NewDiskCache failed: %v", err) - } - defer dc.Stop() - - <-dc.loadCh - - // 设置缓存 - hashKey := uint64(12345) - origKey := "GET:/api/test" - data := []byte("test data for crc32") - dc.Set(hashKey, origKey, data, nil, 200, 10*time.Minute) - - // 获取缓存验证 CRC32 - entry, exists, _ := dc.Get(hashKey, origKey) - if !exists { - t.Fatal("entry not found") - } - if string(entry.Data) != string(data) { - t.Errorf("Data mismatch") - } -} - -func TestParseLevels(t *testing.T) { - tests := []struct { - input string - expected []int - }{ - {"", nil}, - {"1", []int{1}}, - {"1:2", []int{1, 2}}, - {"2:2:2", []int{2, 2, 2}}, - } - - for _, tt := range tests { - result := parseLevels(tt.input) - if len(result) != len(tt.expected) { - t.Errorf("parseLevels(%q) = %v, want %v", tt.input, result, tt.expected) - continue - } - for i, v := range result { - if v != tt.expected[i] { - t.Errorf("parseLevels(%q)[%d] = %d, want %d", tt.input, i, v, tt.expected[i]) - } - } - } -} - -func TestDiskCacheLazyLoad(t *testing.T) { - tmpDir := t.TempDir() - - cfg := &DiskCacheConfig{ - Path: tmpDir, - Levels: "1:2", - } - - dc, err := NewDiskCache(cfg) - if err != nil { - t.Fatalf("NewDiskCache failed: %v", err) - } - defer dc.Stop() - - // 在懒加载完成前,loaded 应该是 false - // 但由于加载很快,我们无法可靠测试这个状态 - // 所以我们等待加载完成 - <-dc.loadCh - - if !dc.loaded.Load() { - t.Error("loaded should be true after lazyLoad completes") - } -} - -func TestDiskCacheRestart(t *testing.T) { - tmpDir := t.TempDir() - - // 第一个实例:写入数据 - cfg := &DiskCacheConfig{ - Path: tmpDir, - Levels: "1:2", - } - - dc1, err := NewDiskCache(cfg) - if err != nil { - t.Fatalf("NewDiskCache failed: %v", err) - } - <-dc1.loadCh - - hashKey := uint64(12345) - origKey := "GET:/api/test" - data := []byte("persistent data") - dc1.Set(hashKey, origKey, data, nil, 200, 10*time.Minute) - dc1.Stop() - - // 第二个实例:读取数据(模拟重启) - dc2, err := NewDiskCache(cfg) - if err != nil { - t.Fatalf("NewDiskCache (restart) failed: %v", err) - } - <-dc2.loadCh - defer dc2.Stop() - - // 验证数据恢复 - entry, exists, _ := dc2.Get(hashKey, origKey) - if !exists { - t.Fatal("entry should exist after restart") - } - if string(entry.Data) != string(data) { - t.Errorf("Data = %q, want %q", entry.Data, data) - } -} - -func TestDiskCacheGetStaleIfError(t *testing.T) { - tmpDir := t.TempDir() - - cfg := &DiskCacheConfig{ - Path: tmpDir, - Levels: "1:2", - StaleIfError: 200 * time.Millisecond, - StaleIfTimeout: 0, - } - - dc, err := NewDiskCache(cfg) - if err != nil { - t.Fatalf("NewDiskCache failed: %v", err) - } - defer dc.Stop() - <-dc.loadCh - - hashKey := uint64(12345) - origKey := "GET:/api/test" - dc.Set(hashKey, origKey, []byte("data"), nil, 200, 100*time.Millisecond) - - // 等待过期但仍在 stale_if_error 窗口内 - time.Sleep(150 * time.Millisecond) - - // isTimeout=false,应该使用 staleIfError 窗口 - entry, ok := dc.GetStale(hashKey, origKey, false) - if !ok { - t.Error("stale entry should be usable on error") - } - if entry == nil || string(entry.Data) != "data" { - t.Errorf("entry.Data = %v, want %q", entry, "data") - } - - // isTimeout=true,staleIfTimeout=0,不应该可用 - if _, ok2 := dc.GetStale(hashKey, origKey, true); ok2 { - t.Error("stale entry should NOT be usable on timeout when staleIfTimeout=0") - } -} - -func TestDiskCacheGetStaleIfTimeout(t *testing.T) { - tmpDir := t.TempDir() - - cfg := &DiskCacheConfig{ - Path: tmpDir, - Levels: "1:2", - StaleIfError: 0, - StaleIfTimeout: 300 * time.Millisecond, - } - - dc, err := NewDiskCache(cfg) - if err != nil { - t.Fatalf("NewDiskCache failed: %v", err) - } - defer dc.Stop() - <-dc.loadCh - - hashKey := uint64(12345) - origKey := "GET:/api/test" - dc.Set(hashKey, origKey, []byte("data"), nil, 200, 100*time.Millisecond) - - // 等待过期但仍在 stale_if_timeout 窗口内 - time.Sleep(250 * time.Millisecond) - - // isTimeout=true,应该使用 staleIfTimeout 窗口 - entry, ok := dc.GetStale(hashKey, origKey, true) - if !ok { - t.Error("stale entry should be usable on timeout") - } - if entry == nil { - t.Error("expected stale entry data") - } - - // isTimeout=false,staleIfError=0,不应该可用 - if _, ok2 := dc.GetStale(hashKey, origKey, false); ok2 { - t.Error("stale entry should NOT be usable on error when staleIfError=0") - } -} - -func TestDiskCacheGetStaleExpired(t *testing.T) { - tmpDir := t.TempDir() - - cfg := &DiskCacheConfig{ - Path: tmpDir, - Levels: "1:2", - StaleIfError: 100 * time.Millisecond, - StaleIfTimeout: 100 * time.Millisecond, - } - - dc, err := NewDiskCache(cfg) - if err != nil { - t.Fatalf("NewDiskCache failed: %v", err) - } - defer dc.Stop() - <-dc.loadCh - - hashKey := uint64(12345) - origKey := "GET:/api/test" - dc.Set(hashKey, origKey, []byte("data"), nil, 200, 50*time.Millisecond) - - // 等待超过 stale 窗口 - time.Sleep(200 * time.Millisecond) - - if _, ok := dc.GetStale(hashKey, origKey, false); ok { - t.Error("stale entry should NOT be usable after stale window expired") - } - - if _, ok2 := dc.GetStale(hashKey, origKey, true); ok2 { - t.Error("stale entry should NOT be usable on timeout after stale window expired") - } -} diff --git a/internal/cache/file_cache.go b/internal/cache/file_cache.go index 2e954eb..da1298f 100644 --- a/internal/cache/file_cache.go +++ b/internal/cache/file_cache.go @@ -19,6 +19,7 @@ package cache import ( "container/list" + "hash/fnv" "slices" "strings" "sync" @@ -787,3 +788,71 @@ func (c *ProxyCache) CacheStats() CacheStats { Entries: int64(len(c.entries)), } } + +// HashPathWithMethod 使用 FNV-64a 计算路径和方法的哈希值。 +func HashPathWithMethod(path string, method string) uint64 { + if method == "" { + method = "GET" + } + key := method + ":" + path + h := fnv.New64a() + h.Write([]byte(key)) + return h.Sum64() +} + +// MatchPattern 检查路径是否匹配通配符模式。 +// +// 支持以下匹配模式: +// - "*":匹配所有路径 +// - 以 "*" 结尾:前缀匹配(如 "/api/*" 匹配 "/api/xxx") +// - 以 "/" 结尾:目录前缀匹配 +// - 中间通配符:"/api/*/users" 匹配 "/api/v1/users" +// - 其他:精确匹配 +func MatchPattern(pattern, path string) bool { + // 特殊情况:* 匹配所有 + if pattern == "*" { + return true + } + + // 目录前缀匹配(pattern 以 / 结尾) + if strings.HasSuffix(pattern, "/") { + return strings.HasPrefix(path, pattern) + } + + // 检查是否有通配符 + if !strings.Contains(pattern, "*") { + return path == pattern + } + + // 简单的前缀匹配:/api/users/* 匹配 /api/users/123 + if prefix, ok := strings.CutSuffix(pattern, "*"); ok { + return strings.HasPrefix(path, prefix) + } + + // 中间通配符:/api/*/users 匹配 /api/v1/users + parts := strings.Split(pattern, "*") + if len(parts) == 2 { + return strings.HasPrefix(path, parts[0]) && strings.HasSuffix(path, parts[1]) + } + + // 复杂模式不支持,返回 false + return false +} + +// PurgeRequest 缓存清理请求结构。 +type PurgeRequest struct { + // Path 精确路径 + Path string `json:"path,omitempty"` + + // Pattern 通配符模式(支持 * 通配符) + Pattern string `json:"pattern,omitempty"` + + // Method HTTP 方法,默认 "GET" + Method string `json:"method,omitempty"` +} + +// PurgeResponse 缓存清理响应结构。 +type PurgeResponse struct { + // Deleted 被删除的缓存条目数 + Deleted int `json:"deleted"` +} diff --git a/internal/cache/purge.go b/internal/cache/purge.go deleted file mode 100644 index e4213fc..0000000 --- a/internal/cache/purge.go +++ /dev/null @@ -1,268 +0,0 @@ -// Package cache 提供文件缓存和代理缓存功能,支持 LRU 淘汰和缓存锁防击穿。 -// -// 该文件实现了缓存清理 API,用于主动清理代理缓存。 -// -// 主要功能: -// - 精确路径清理:删除指定路径的缓存条目 -// - 通配符模式清理:按模式批量删除缓存条目 -// - IP 白名单访问控制 -// - Token 认证支持 -// -// 作者:xfy -package cache - -import ( - "encoding/json" - "hash/fnv" - "net" - "strings" - - "github.com/valyala/fasthttp" - "rua.plus/lolly/internal/config" - "rua.plus/lolly/internal/utils" -) - -// PurgeAPI 缓存清理 API 处理器。 -// -// 提供 HTTP API 用于主动清理代理缓存,支持精确路径和通配符模式清理。 -// -// 注意事项: -// - 所有方法均为并发安全 -// - 支持 IP 白名单和 Token 认证 -// - 仅处理 POST 请求 -type PurgeAPI struct { - cache *ProxyCache - auth config.CacheAPIAuthConfig - path string - allowed []net.IPNet -} - -// PurgeRequest 清理请求结构。 -type PurgeRequest struct { - // Path 精确路径 - Path string `json:"path,omitempty"` - - // Pattern 通配符模式(支持 * 通配符) - Pattern string `json:"pattern,omitempty"` - - // Method HTTP 方法,默认 "GET" - Method string `json:"method,omitempty"` -} - -// PurgeResponse 清理响应结构。 -type PurgeResponse struct { - // Deleted 被删除的缓存条目数 - Deleted int `json:"deleted"` -} - -// PurgeErrorResponse 错误响应结构。 -type PurgeErrorResponse struct { - // Error 错误信息 - Error string `json:"error"` -} - -// NewPurgeAPI 创建缓存清理 API 处理器。 -// -// 参数: -// - cache: 代理缓存实例 -// - cfg: 缓存 API 配置 -// -// 返回值: -// - *PurgeAPI: 配置好的处理器 -// - error: IP 解析失败时返回非 nil 错误 -func NewPurgeAPI(cache *ProxyCache, cfg *config.CacheAPIConfig) (*PurgeAPI, error) { - p := &PurgeAPI{ - cache: cache, - auth: cfg.Auth, - path: cfg.Path, - } - - // 解析允许的 IP 列表 - for _, cidr := range cfg.Allow { - _, network, err := net.ParseCIDR(cidr) - if err != nil { - // 尝试作为单个 IP 解析 - ip := net.ParseIP(cidr) - if ip == nil { - return nil, err - } - // 转换为 CIDR 格式 - if ip.To4() != nil { - _, network, _ = net.ParseCIDR(cidr + "/32") - } else { - _, network, _ = net.ParseCIDR(cidr + "/128") - } - } - if network != nil { - p.allowed = append(p.allowed, *network) - } - } - - return p, nil -} - -// Path 返回 API 端点路径。 -func (p *PurgeAPI) Path() string { - if p.path == "" { - return "/_cache/purge" - } - return p.path -} - -// ServeHTTP 处理缓存清理请求。 -// -// 仅处理 POST 请求,支持精确路径和通配符模式清理。 -// 返回 JSON 格式的响应。 -func (p *PurgeAPI) ServeHTTP(ctx *fasthttp.RequestCtx) { - // 仅允许 POST 方法 - if string(ctx.Method()) != "POST" { - utils.SendJSONError(ctx, fasthttp.StatusMethodNotAllowed, "method not allowed") - return - } - - // 检查 IP 访问权限 - if !utils.CheckIPAccess(ctx, p.allowed) { - utils.SendJSONError(ctx, fasthttp.StatusForbidden, "forbidden") - return - } - - // 检查认证 - if !utils.CheckTokenAuth(ctx, p.auth) { - utils.SendJSONError(ctx, fasthttp.StatusUnauthorized, "unauthorized") - return - } - - // 解析请求体 - var req PurgeRequest - if err := json.Unmarshal(ctx.PostBody(), &req); err != nil { - utils.SendJSONError(ctx, fasthttp.StatusBadRequest, "invalid request body") - return - } - - // 执行清理 - deleted := 0 - if req.Path != "" { - deleted = p.purgeByPath(req.Path) - } else if req.Pattern != "" { - deleted = p.purgeByPattern(req.Pattern) - } else { - utils.SendJSONError(ctx, fasthttp.StatusBadRequest, "missing path or pattern") - return - } - - // 返回响应 - ctx.SetContentType("application/json; charset=utf-8") - ctx.SetStatusCode(fasthttp.StatusOK) - _ = json.NewEncoder(ctx).Encode(PurgeResponse{Deleted: deleted}) -} - -// purgeByPath 按精确路径清理缓存。 -func (p *PurgeAPI) purgeByPath(path string) int { - if p.cache == nil { - return 0 - } - - // 计算缓存键的哈希值 - hashKey := hashPath(path) - - // 尝试删除 - p.cache.mu.Lock() - defer p.cache.mu.Unlock() - - if _, ok := p.cache.entries[hashKey]; ok { - delete(p.cache.entries, hashKey) - return 1 - } - - return 0 -} - -// purgeByPattern 按通配符模式清理缓存。 -func (p *PurgeAPI) purgeByPattern(pattern string) int { - if p.cache == nil { - return 0 - } - - p.cache.mu.Lock() - defer p.cache.mu.Unlock() - - deleted := 0 - for hashKey, entry := range p.cache.entries { - if matchPattern(pattern, entry.OrigKey) { - delete(p.cache.entries, hashKey) - deleted++ - } - } - - return deleted -} - -// HashPathWithMethod 使用 FNV-64a 计算缓存键的哈希值。 -// method 为空时默认使用 "GET"。 -func HashPathWithMethod(path string, method string) uint64 { - if method == "" { - method = "GET" - } - key := method + ":" + path - h := fnv.New64a() - h.Write([]byte(key)) - return h.Sum64() -} - -// hashPath 使用 FNV-64a 计算路径的哈希值。 -// 与代理层 buildCacheKeyHash 使用相同的算法,确保一致性。 -// 注意:代理层的 key 格式为 "METHOD:URI",purge 时默认使用 GET 方法。 -func hashPath(path string) uint64 { - return HashPathWithMethod(path, "GET") -} - -// MatchPattern 检查路径是否匹配通配符模式。 -// -// 支持以下匹配模式: -// - "*":匹配所有路径 -// - 以 "*" 结尾:前缀匹配(如 "/api/*" 匹配 "/api/xxx") -// - 以 "/" 结尾:目录前缀匹配 -// - 中间通配符:"/api/*/users" 匹配 "/api/v1/users" -// - 其他:精确匹配 -// -// 参数: -// - pattern: 匹配模式,支持通配符 -// - path: 待检查的路径 -// -// 返回值: -// - bool: true 表示匹配,false 表示不匹配 -func MatchPattern(pattern, path string) bool { - // 特殊情况:* 匹配所有 - if pattern == "*" { - return true - } - - // 目录前缀匹配(pattern 以 / 结尾) - if strings.HasSuffix(pattern, "/") { - return strings.HasPrefix(path, pattern) - } - - // 检查是否有通配符 - if !strings.Contains(pattern, "*") { - return path == pattern - } - - // 简单的前缀匹配:/api/users/* 匹配 /api/users/123 - if prefix, ok := strings.CutSuffix(pattern, "*"); ok { - return strings.HasPrefix(path, prefix) - } - - // 中间通配符:/api/*/users 匹配 /api/v1/users - parts := strings.Split(pattern, "*") - if len(parts) == 2 { - return strings.HasPrefix(path, parts[0]) && strings.HasSuffix(path, parts[1]) - } - - // 复杂模式不支持,返回 false - return false -} - -// matchPattern 是 MatchPattern 的内部别名,保持向后兼容。 -func matchPattern(pattern, path string) bool { - return MatchPattern(pattern, path) -} diff --git a/internal/cache/purge_test.go b/internal/cache/purge_test.go deleted file mode 100644 index ba149a9..0000000 --- a/internal/cache/purge_test.go +++ /dev/null @@ -1,763 +0,0 @@ -// Package cache 提供缓存清理 API 的测试。 -// -// 该文件测试 purge.go 中的各项功能,包括: -// - PurgeAPI 创建和配置 -// - Path() 默认和自定义路径 -// - ServeHTTP 完整请求处理 -// - IP 白名单访问控制 -// - Token 认证 -// - 按路径和模式清理缓存 -// - HashPathWithMethod 哈希计算 -// - MatchPattern 通配符匹配 -// -// 作者:xfy -package cache - -import ( - "encoding/json" - "net" - "testing" - "time" - - "github.com/valyala/fasthttp" - "rua.plus/lolly/internal/config" -) - -func TestHashPathWithMethod(t *testing.T) { - t.Run("GET method default", func(t *testing.T) { - h1 := HashPathWithMethod("/api/users", "") - h2 := HashPathWithMethod("/api/users", "GET") - if h1 != h2 { - t.Errorf("Expected empty method to default to GET, got %d vs %d", h1, h2) - } - }) - - t.Run("different methods different hashes", func(t *testing.T) { - hGet := HashPathWithMethod("/api/users", "GET") - hPost := HashPathWithMethod("/api/users", "POST") - if hGet == hPost { - t.Error("Expected different hashes for different methods") - } - }) - - t.Run("different paths different hashes", func(t *testing.T) { - h1 := HashPathWithMethod("/api/users", "GET") - h2 := HashPathWithMethod("/api/posts", "GET") - if h1 == h2 { - t.Error("Expected different hashes for different paths") - } - }) -} - -func TestMatchPattern(t *testing.T) { - tests := []struct { - name string - pattern string - path string - want bool - }{ - // Wildcard - {"wildcard matches all", "*", "/anything/goes", true}, - {"wildcard matches empty", "*", "/", true}, - - // Prefix with trailing * - {"prefix star match", "/api/*", "/api/users", true}, - {"prefix star match nested", "/api/*", "/api/v1/users", true}, - {"prefix star no match", "/api/*", "/other/path", false}, - {"prefix star match base", "/api/*", "/api/", true}, - - // Directory prefix (pattern ends with /) - {"dir prefix match", "/api/", "/api/users", true}, - {"dir prefix no match", "/api/", "/other/path", false}, - - // Exact match - {"exact match", "/api/users", "/api/users", true}, - {"exact no match", "/api/users", "/api/users/extra", false}, - - // Middle wildcard - {"middle wildcard", "/api/*/users", "/api/v1/users", true}, - {"middle wildcard no match prefix", "/api/*/users", "/other/v1/users", false}, - {"middle wildcard no match suffix", "/api/*/users", "/api/v1/posts", false}, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - result := MatchPattern(tt.pattern, tt.path) - if result != tt.want { - t.Errorf("MatchPattern(%s, %s) = %v, want %v", tt.pattern, tt.path, result, tt.want) - } - }) - } -} - -func TestNewPurgeAPI(t *testing.T) { - t.Run("nil cache", func(t *testing.T) { - cfg := &config.CacheAPIConfig{ - Path: "/_cache/purge", - Allow: []string{}, - } - api, err := NewPurgeAPI(nil, cfg) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - if api == nil { - t.Fatal("expected non-nil PurgeAPI") - } - if api.cache != nil { - t.Error("expected nil cache") - } - }) - - t.Run("with cache", func(t *testing.T) { - pc := NewProxyCache(nil, false, 0, 0, 0) - cfg := &config.CacheAPIConfig{ - Path: "/custom/purge", - Allow: []string{"127.0.0.1"}, - Auth: config.CacheAPIAuthConfig{ - Type: "token", - Token: "test-token", - }, - } - api, err := NewPurgeAPI(pc, cfg) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - if api.cache != pc { - t.Error("expected cache to match") - } - if api.path != "/custom/purge" { - t.Errorf("expected path /custom/purge, got %s", api.path) - } - if api.auth.Token != "test-token" { - t.Errorf("expected token test-token, got %s", api.auth.Token) - } - }) - - t.Run("CIDR parsing", func(t *testing.T) { - cfg := &config.CacheAPIConfig{ - Allow: []string{"10.0.0.0/8", "172.16.0.0/12"}, - } - api, err := NewPurgeAPI(nil, cfg) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - if len(api.allowed) != 2 { - t.Errorf("expected 2 allowed networks, got %d", len(api.allowed)) - } - }) - - t.Run("single IP converted to CIDR", func(t *testing.T) { - cfg := &config.CacheAPIConfig{ - Allow: []string{"192.168.1.100"}, - } - api, err := NewPurgeAPI(nil, cfg) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - if len(api.allowed) != 1 { - t.Fatalf("expected 1 allowed network, got %d", len(api.allowed)) - } - if api.allowed[0].String() != "192.168.1.100/32" { - t.Errorf("expected 192.168.1.100/32, got %s", api.allowed[0].String()) - } - }) - - t.Run("single IPv6 converted to CIDR", func(t *testing.T) { - cfg := &config.CacheAPIConfig{ - Allow: []string{"::1"}, - } - api, err := NewPurgeAPI(nil, cfg) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - if len(api.allowed) != 1 { - t.Fatalf("expected 1 allowed network, got %d", len(api.allowed)) - } - if api.allowed[0].String() != "::1/128" { - t.Errorf("expected ::1/128, got %s", api.allowed[0].String()) - } - }) - - t.Run("invalid IP returns error", func(t *testing.T) { - cfg := &config.CacheAPIConfig{ - Allow: []string{"not-an-ip"}, - } - _, err := NewPurgeAPI(nil, cfg) - if err == nil { - t.Error("expected error for invalid IP, got nil") - } - }) - - t.Run("mixed valid and CIDR", func(t *testing.T) { - cfg := &config.CacheAPIConfig{ - Allow: []string{"10.0.0.0/8", "192.168.1.1"}, - } - api, err := NewPurgeAPI(nil, cfg) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - if len(api.allowed) != 2 { - t.Errorf("expected 2 allowed networks, got %d", len(api.allowed)) - } - }) -} - -func TestPurgeAPI_Path(t *testing.T) { - tests := []struct { - name string - cfgPath string - wantPath string - }{ - {"default path", "", "/_cache/purge"}, - {"custom path", "/api/purge", "/api/purge"}, - {"custom path with version", "/api/v1/cache/purge", "/api/v1/cache/purge"}, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - cfg := &config.CacheAPIConfig{ - Path: tt.cfgPath, - Allow: []string{}, - } - api, err := NewPurgeAPI(nil, cfg) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - if api.Path() != tt.wantPath { - t.Errorf("expected path %s, got %s", tt.wantPath, api.Path()) - } - }) - } -} - -func TestPurgeAPI_ServeHTTP_MethodNotAllowed(t *testing.T) { - cfg := &config.CacheAPIConfig{ - Allow: []string{}, - } - api, err := NewPurgeAPI(nil, cfg) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - - tests := []string{"GET", "PUT", "DELETE", "PATCH", "OPTIONS"} - for _, method := range tests { - t.Run(method, func(t *testing.T) { - ctx := &fasthttp.RequestCtx{} - ctx.Request.Header.SetMethod(method) - api.ServeHTTP(ctx) - - if ctx.Response.StatusCode() != fasthttp.StatusMethodNotAllowed { - t.Errorf("expected status 405, got %d", ctx.Response.StatusCode()) - } - }) - } -} - -func TestPurgeAPI_ServeHTTP_AccessForbidden(t *testing.T) { - cfg := &config.CacheAPIConfig{ - Allow: []string{"10.0.0.0/8"}, - } - api, err := NewPurgeAPI(nil, cfg) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - - ctx := &fasthttp.RequestCtx{} - ctx.Request.Header.SetMethod("POST") - ctx.Request.SetBodyString(`{"path": "/test"}`) - // Set RemoteAddr to 192.168.1.1 (not in 10.0.0.0/8) - ctx.SetRemoteAddr(&net.TCPAddr{IP: net.ParseIP("192.168.1.1"), Port: 12345}) - - api.ServeHTTP(ctx) - - if ctx.Response.StatusCode() != fasthttp.StatusForbidden { - t.Errorf("expected status 403, got %d", ctx.Response.StatusCode()) - } -} - -func TestPurgeAPI_ServeHTTP_Unauthorized(t *testing.T) { - pc := NewProxyCache(nil, false, 0, 0, 0) - cfg := &config.CacheAPIConfig{ - Allow: []string{"127.0.0.1"}, - Auth: config.CacheAPIAuthConfig{ - Type: "token", - Token: "secret", - }, - } - api, err := NewPurgeAPI(pc, cfg) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - - ctx := &fasthttp.RequestCtx{} - ctx.Request.Header.SetMethod("POST") - ctx.Request.SetBodyString(`{"path": "/test"}`) - ctx.SetRemoteAddr(&net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: 12345}) - // No Authorization header - - api.ServeHTTP(ctx) - - if ctx.Response.StatusCode() != fasthttp.StatusUnauthorized { - t.Errorf("expected status 401, got %d", ctx.Response.StatusCode()) - } -} - -func TestPurgeAPI_ServeHTTP_BadRequest(t *testing.T) { - pc := NewProxyCache(nil, false, 0, 0, 0) - cfg := &config.CacheAPIConfig{ - Allow: []string{}, - } - api, err := NewPurgeAPI(pc, cfg) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - - t.Run("invalid JSON", func(t *testing.T) { - ctx := &fasthttp.RequestCtx{} - ctx.Request.Header.SetMethod("POST") - ctx.Request.SetBodyString(`{invalid json}`) - - api.ServeHTTP(ctx) - - if ctx.Response.StatusCode() != fasthttp.StatusBadRequest { - t.Errorf("expected status 400, got %d", ctx.Response.StatusCode()) - } - }) - - t.Run("missing path and pattern", func(t *testing.T) { - ctx := &fasthttp.RequestCtx{} - ctx.Request.Header.SetMethod("POST") - ctx.Request.SetBodyString(`{}`) - - api.ServeHTTP(ctx) - - if ctx.Response.StatusCode() != fasthttp.StatusBadRequest { - t.Errorf("expected status 400, got %d", ctx.Response.StatusCode()) - } - }) -} - -func TestPurgeAPI_ServeHTTP_PurgeByPath(t *testing.T) { - pc := NewProxyCache(nil, false, 0, 0, 0) - key := "GET:/api/test" - pc.Set(hashKey(key), key, []byte("data"), nil, 200, 10*60*time.Second) - - cfg := &config.CacheAPIConfig{ - Allow: []string{}, - } - api, err := NewPurgeAPI(pc, cfg) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - - ctx := &fasthttp.RequestCtx{} - ctx.Request.Header.SetMethod("POST") - ctx.Request.SetBodyString(`{"path": "/api/test"}`) - - api.ServeHTTP(ctx) - - if ctx.Response.StatusCode() != fasthttp.StatusOK { - t.Fatalf("expected status 200, got %d", ctx.Response.StatusCode()) - } - - var resp PurgeResponse - if err := json.Unmarshal(ctx.Response.Body(), &resp); err != nil { - t.Fatalf("failed to parse response: %v", err) - } - if resp.Deleted != 1 { - t.Errorf("expected 1 deleted, got %d", resp.Deleted) - } - - // Verify cache is gone - _, ok, _ := pc.Get(hashKey(key), key) - if ok { - t.Error("expected cache entry to be purged") - } -} - -func TestPurgeAPI_ServeHTTP_PurgeByPath_NotFound(t *testing.T) { - pc := NewProxyCache(nil, false, 0, 0, 0) - cfg := &config.CacheAPIConfig{ - Allow: []string{}, - } - api, err := NewPurgeAPI(pc, cfg) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - - ctx := &fasthttp.RequestCtx{} - ctx.Request.Header.SetMethod("POST") - ctx.Request.SetBodyString(`{"path": "/nonexistent"}`) - - api.ServeHTTP(ctx) - - if ctx.Response.StatusCode() != fasthttp.StatusOK { - t.Fatalf("expected status 200, got %d", ctx.Response.StatusCode()) - } - - var resp PurgeResponse - if err := json.Unmarshal(ctx.Response.Body(), &resp); err != nil { - t.Fatalf("failed to parse response: %v", err) - } - if resp.Deleted != 0 { - t.Errorf("expected 0 deleted, got %d", resp.Deleted) - } -} - -func TestPurgeAPI_ServeHTTP_PurgeByPattern(t *testing.T) { - pc := NewProxyCache(nil, false, 0, 0, 0) - pc.Set(hashKey("GET:/api/users"), "GET:/api/users", []byte("users"), nil, 200, 10*60*time.Second) - pc.Set(hashKey("GET:/api/posts"), "GET:/api/posts", []byte("posts"), nil, 200, 10*60*time.Second) - pc.Set(hashKey("GET:/static/css"), "GET:/static/css", []byte("css"), nil, 200, 10*60*time.Second) - - cfg := &config.CacheAPIConfig{ - Allow: []string{}, - } - api, err := NewPurgeAPI(pc, cfg) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - - ctx := &fasthttp.RequestCtx{} - ctx.Request.Header.SetMethod("POST") - ctx.Request.SetBodyString(`{"pattern": "GET:/api/*"}`) - - api.ServeHTTP(ctx) - - if ctx.Response.StatusCode() != fasthttp.StatusOK { - t.Fatalf("expected status 200, got %d", ctx.Response.StatusCode()) - } - - var resp PurgeResponse - if err := json.Unmarshal(ctx.Response.Body(), &resp); err != nil { - t.Fatalf("failed to parse response: %v", err) - } - if resp.Deleted != 2 { - t.Errorf("expected 2 deleted (api/users and api/posts), got %d", resp.Deleted) - } - - // Verify /static/css is still there - _, ok, _ := pc.Get(hashKey("GET:/static/css"), "GET:/static/css") - if !ok { - t.Error("expected /static/css to still exist") - } -} - -func TestPurgeAPI_ServeHTTP_PurgeByPattern_Wildcard(t *testing.T) { - pc := NewProxyCache(nil, false, 0, 0, 0) - pc.Set(hashKey("GET:/a"), "GET:/a", []byte("a"), nil, 200, 10*60*time.Second) - pc.Set(hashKey("GET:/b"), "GET:/b", []byte("b"), nil, 200, 10*60*time.Second) - - cfg := &config.CacheAPIConfig{ - Allow: []string{}, - } - api, err := NewPurgeAPI(pc, cfg) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - - ctx := &fasthttp.RequestCtx{} - ctx.Request.Header.SetMethod("POST") - ctx.Request.SetBodyString(`{"pattern": "*"}`) - - api.ServeHTTP(ctx) - - var resp PurgeResponse - if err := json.Unmarshal(ctx.Response.Body(), &resp); err != nil { - t.Fatalf("failed to parse response: %v", err) - } - if resp.Deleted != 2 { - t.Errorf("expected 2 deleted, got %d", resp.Deleted) - } -} - -func TestPurgeAPI_ServeHTTP_PurgeByPattern_DirPrefix(t *testing.T) { - pc := NewProxyCache(nil, false, 0, 0, 0) - pc.Set(hashKey("GET:/api/v1/users"), "GET:/api/v1/users", []byte("u"), nil, 200, 10*60*time.Second) - pc.Set(hashKey("GET:/api/v2/posts"), "GET:/api/v2/posts", []byte("p"), nil, 200, 10*60*time.Second) - pc.Set(hashKey("GET:/other"), "GET:/other", []byte("o"), nil, 200, 10*60*time.Second) - - cfg := &config.CacheAPIConfig{ - Allow: []string{}, - } - api, err := NewPurgeAPI(pc, cfg) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - - ctx := &fasthttp.RequestCtx{} - ctx.Request.Header.SetMethod("POST") - ctx.Request.SetBodyString(`{"pattern": "GET:/api/"}`) - - api.ServeHTTP(ctx) - - var resp PurgeResponse - if err := json.Unmarshal(ctx.Response.Body(), &resp); err != nil { - t.Fatalf("failed to parse response: %v", err) - } - if resp.Deleted != 2 { - t.Errorf("expected 2 deleted, got %d", resp.Deleted) - } -} - -func TestPurgeAPI_ServeHTTP_ContentType(t *testing.T) { - pc := NewProxyCache(nil, false, 0, 0, 0) - cfg := &config.CacheAPIConfig{ - Allow: []string{}, - } - api, err := NewPurgeAPI(pc, cfg) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - - t.Run("success response content type", func(t *testing.T) { - ctx := &fasthttp.RequestCtx{} - ctx.Request.Header.SetMethod("POST") - ctx.Request.SetBodyString(`{"path": "/test"}`) - api.ServeHTTP(ctx) - - ct := string(ctx.Response.Header.Peek("Content-Type")) - if ct != "application/json; charset=utf-8" { - t.Errorf("expected content-type application/json; charset=utf-8, got %s", ct) - } - }) - - t.Run("error response content type", func(t *testing.T) { - ctx := &fasthttp.RequestCtx{} - ctx.Request.Header.SetMethod("GET") - api.ServeHTTP(ctx) - - ct := string(ctx.Response.Header.Peek("Content-Type")) - if ct != "application/json; charset=utf-8" { - t.Errorf("expected content-type application/json; charset=utf-8, got %s", ct) - } - }) -} - -func TestPurgeAPI_ServeHTTP_AccessAllowed(t *testing.T) { - pc := NewProxyCache(nil, false, 0, 0, 0) - cfg := &config.CacheAPIConfig{ - Allow: []string{"10.0.0.0/8"}, - } - api, err := NewPurgeAPI(pc, cfg) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - - ctx := &fasthttp.RequestCtx{} - ctx.Request.Header.SetMethod("POST") - ctx.Request.SetBodyString(`{"path": "/api/test"}`) - ctx.SetRemoteAddr(&net.TCPAddr{IP: net.ParseIP("10.1.2.3"), Port: 12345}) - - api.ServeHTTP(ctx) - - // Should succeed (access allowed, no auth required) - if ctx.Response.StatusCode() != fasthttp.StatusOK { - t.Errorf("expected status 200, got %d", ctx.Response.StatusCode()) - } -} - -func TestPurgeAPI_ServeHTTP_TokenAuth(t *testing.T) { - pc := NewProxyCache(nil, false, 0, 0, 0) - cfg := &config.CacheAPIConfig{ - Allow: []string{}, - Auth: config.CacheAPIAuthConfig{ - Type: "token", - Token: "my-secret", - }, - } - api, err := NewPurgeAPI(pc, cfg) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - - t.Run("Bearer token", func(t *testing.T) { - ctx := &fasthttp.RequestCtx{} - ctx.Request.Header.SetMethod("POST") - ctx.Request.SetBodyString(`{"path": "/test"}`) - ctx.Request.Header.Set("Authorization", "Bearer my-secret") - - api.ServeHTTP(ctx) - - if ctx.Response.StatusCode() != fasthttp.StatusOK { - t.Errorf("expected status 200, got %d", ctx.Response.StatusCode()) - } - }) - - t.Run("Direct token", func(t *testing.T) { - ctx := &fasthttp.RequestCtx{} - ctx.Request.Header.SetMethod("POST") - ctx.Request.SetBodyString(`{"path": "/test"}`) - ctx.Request.Header.Set("Authorization", "my-secret") - - api.ServeHTTP(ctx) - - if ctx.Response.StatusCode() != fasthttp.StatusOK { - t.Errorf("expected status 200, got %d", ctx.Response.StatusCode()) - } - }) -} - -func TestPurgeAPI_ServeHTTP_AuthTypeNone(t *testing.T) { - pc := NewProxyCache(nil, false, 0, 0, 0) - cfg := &config.CacheAPIConfig{ - Allow: []string{}, - Auth: config.CacheAPIAuthConfig{ - Type: "none", - }, - } - api, err := NewPurgeAPI(pc, cfg) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - - ctx := &fasthttp.RequestCtx{} - ctx.Request.Header.SetMethod("POST") - ctx.Request.SetBodyString(`{"path": "/test"}`) - - api.ServeHTTP(ctx) - - if ctx.Response.StatusCode() != fasthttp.StatusOK { - t.Errorf("expected status 200, got %d", ctx.Response.StatusCode()) - } -} - -func TestPurgeAPI_PurgeByPath_NilCache(t *testing.T) { - cfg := &config.CacheAPIConfig{ - Allow: []string{}, - } - api, err := NewPurgeAPI(nil, cfg) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - - // Directly test purgeByPath with nil cache - result := api.purgeByPath("/test") - if result != 0 { - t.Errorf("expected 0 deleted with nil cache, got %d", result) - } -} - -func TestPurgeAPI_PurgeByPattern_NilCache(t *testing.T) { - cfg := &config.CacheAPIConfig{ - Allow: []string{}, - } - api, err := NewPurgeAPI(nil, cfg) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - - // Directly test purgeByPattern with nil cache - result := api.purgeByPattern("*") - if result != 0 { - t.Errorf("expected 0 deleted with nil cache, got %d", result) - } -} - -func TestPurgeAPI_ErrorResponse(t *testing.T) { - cfg := &config.CacheAPIConfig{ - Allow: []string{}, - } - api, err := NewPurgeAPI(nil, cfg) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - - t.Run("method not allowed", func(t *testing.T) { - ctx := &fasthttp.RequestCtx{} - ctx.Request.Header.SetMethod("DELETE") - api.ServeHTTP(ctx) - - if ctx.Response.StatusCode() != fasthttp.StatusMethodNotAllowed { - t.Fatalf("expected status 405, got %d", ctx.Response.StatusCode()) - } - - var errResp PurgeErrorResponse - if err := json.Unmarshal(ctx.Response.Body(), &errResp); err != nil { - t.Fatalf("failed to parse error response: %v", err) - } - if errResp.Error != "method not allowed" { - t.Errorf("expected error 'method not allowed', got %s", errResp.Error) - } - }) - - t.Run("forbidden", func(t *testing.T) { - cfg2 := &config.CacheAPIConfig{ - Allow: []string{"10.0.0.0/8"}, - } - api2, err := NewPurgeAPI(nil, cfg2) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - - ctx := &fasthttp.RequestCtx{} - ctx.Request.Header.SetMethod("POST") - ctx.SetRemoteAddr(&net.TCPAddr{IP: net.ParseIP("192.168.1.1"), Port: 12345}) - api2.ServeHTTP(ctx) - - if ctx.Response.StatusCode() != fasthttp.StatusForbidden { - t.Fatalf("expected status 403, got %d", ctx.Response.StatusCode()) - } - - var errResp PurgeErrorResponse - if err := json.Unmarshal(ctx.Response.Body(), &errResp); err != nil { - t.Fatalf("failed to parse error response: %v", err) - } - if errResp.Error != "forbidden" { - t.Errorf("expected error 'forbidden', got %s", errResp.Error) - } - }) -} - -func TestPurgeAPI_PurgeByPath_WrongMethod(t *testing.T) { - // Test that hashPath only uses GET, so purging a POST-cached entry won't work - pc := NewProxyCache(nil, false, 0, 0, 0) - // Set a cache entry with GET:/api/test key - pc.Set(hashKey("GET:/api/test"), "GET:/api/test", []byte("data"), nil, 200, 10*60*time.Second) - - cfg := &config.CacheAPIConfig{ - Allow: []string{}, - } - api, err := NewPurgeAPI(pc, cfg) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - - ctx := &fasthttp.RequestCtx{} - ctx.Request.Header.SetMethod("POST") - ctx.Request.SetBodyString(`{"path": "/api/test"}`) - - api.ServeHTTP(ctx) - - var resp PurgeResponse - if err := json.Unmarshal(ctx.Response.Body(), &resp); err != nil { - t.Fatalf("failed to parse response: %v", err) - } - if resp.Deleted != 1 { - t.Errorf("expected 1 deleted, got %d", resp.Deleted) - } -} - -func TestMatchPattern_ComplexPatterns(t *testing.T) { - t.Run("multiple wildcards not supported", func(t *testing.T) { - // Pattern with multiple * in the middle is not supported - result := MatchPattern("/api/*/users/*", "/api/v1/users/123") - if result { - t.Error("expected complex pattern with multiple wildcards to return false") - } - }) - - t.Run("pattern without wildcard exact match", func(t *testing.T) { - result := MatchPattern("/api/users", "/api/users") - if !result { - t.Error("expected exact match") - } - }) - - t.Run("pattern without wildcard no match", func(t *testing.T) { - result := MatchPattern("/api/users", "/api/users/123") - if result { - t.Error("expected no match for non-exact") - } - }) -} diff --git a/internal/cache/tiered_cache.go b/internal/cache/tiered_cache.go deleted file mode 100644 index 7228233..0000000 --- a/internal/cache/tiered_cache.go +++ /dev/null @@ -1,264 +0,0 @@ -// Package cache 提供文件缓存和代理缓存功能。 -// -// 该文件实现 TieredCache 分层缓存,支持: -// - L1 内存缓存(热点数据) -// - L2 磁盘缓存(持久化) -// - 热点提升机制 -// - L2 过期检查 -// -// 主要用途: -// -// 作为代理缓存的主要实现,平衡性能(L1)和容量(L2)。 -// -// 作者:xfy -package cache - -import ( - "sync" - "sync/atomic" - "time" -) - -// TieredCacheConfig 分层缓存配置。 -type TieredCacheConfig struct { - // L1 配置 - L1MaxEntries int64 - L1MaxSize int64 - - // L2 配置 - L2Config *DiskCacheConfig - - // Stale 配置 - StaleIfError time.Duration // 错误时使用过期缓存的窗口 - StaleIfTimeout time.Duration // 超时时使用过期缓存的窗口 - - // 热点提升配置 - PromoteThreshold int // 访问次数阈值,超过后提升到 L1 - PromoteInterval time.Duration // 提升检查间隔 -} - -// TieredCache 分层缓存(L1 内存 + L2 磁盘)。 -type TieredCache struct { - l1 *ProxyCache // L1 内存缓存(热点数据) - l2 *DiskCache // L2 磁盘缓存(持久化) - l1Ratio float64 // L1 容量占 L2 的比例 - promoter *promoter // 热点提升器 - stopCh chan struct{} - - // 统计 - l1Hits atomic.Int64 - l2Hits atomic.Int64 - misses atomic.Int64 - promotes atomic.Int64 -} - -// promoter 热点提升器。 -type promoter struct { - threshold int // 访问次数阈值 - interval time.Duration // 检查间隔 - accessMap map[uint64]*accessInfo - mu sync.RWMutex -} - -// accessInfo 访问信息。 -type accessInfo struct { - count int - lastAccess time.Time - origKey string -} - -// NewTieredCache 创建分层缓存实例。 -func NewTieredCache(cfg *TieredCacheConfig) (*TieredCache, error) { - // 创建 L1 内存缓存 - l1 := NewProxyCache(nil, true, 0, cfg.StaleIfError, cfg.StaleIfTimeout) - - // 创建 L2 磁盘缓存 - l2, err := NewDiskCache(cfg.L2Config) - if err != nil { - return nil, err - } - - tc := &TieredCache{ - l1: l1, - l2: l2, - l1Ratio: 0.1, // 默认 L1 占 10% - promoter: &promoter{ - threshold: cfg.PromoteThreshold, - interval: cfg.PromoteInterval, - accessMap: make(map[uint64]*accessInfo), - }, - stopCh: make(chan struct{}), - } - - // 启动热点提升检查 - if tc.promoter.threshold > 0 { - go tc.promoteLoop() - } - - return tc, nil -} - -// Get 获取缓存条目(实现 CacheBackend 接口)。 -func (tc *TieredCache) Get(hashKey uint64, origKey string) (*ProxyCacheEntry, bool, bool) { - // 1. 先查 L1 - entry, exists, stale := tc.l1.Get(hashKey, origKey) - if exists { - tc.l1Hits.Add(1) - return entry, true, stale - } - - // 2. 查 L2,必须验证 max_age - entry, exists, stale = tc.l2.Get(hashKey, origKey) - if !exists { - tc.misses.Add(1) - return nil, false, false - } - - tc.l2Hits.Add(1) - - // 3. 记录访问(用于热点提升) - tc.recordAccess(hashKey, origKey) - - // 4. L2 命中且未过期,异步提升到 L1 - if !stale { - go tc.promoteToL1(hashKey, entry) - } - - return entry, true, stale -} - -// GetStale 在上游错误时获取可用的过期缓存。 -// -// 先查 L1,再查 L2。 -func (tc *TieredCache) GetStale(hashKey uint64, origKey string, isTimeout bool) (*ProxyCacheEntry, bool) { - // 1. 先查 L1 - if entry, ok := tc.l1.GetStale(hashKey, origKey, isTimeout); ok { - tc.l1Hits.Add(1) - return entry, true - } - - // 2. 查 L2 - if entry, ok := tc.l2.GetStale(hashKey, origKey, isTimeout); ok { - tc.l2Hits.Add(1) - return entry, true - } - - tc.misses.Add(1) - return nil, false -} - -// Set 设置缓存条目(实现 CacheBackend 接口)。 -func (tc *TieredCache) Set(hashKey uint64, origKey string, data []byte, headers map[string]string, status int, maxAge time.Duration) { - // 同时写入 L1 和 L2 - tc.l1.Set(hashKey, origKey, data, headers, status, maxAge) - - // L2 异步写入(在 goroutine 中执行) - go tc.l2.Set(hashKey, origKey, data, headers, status, maxAge) -} - -// Delete 删除缓存条目(实现 CacheBackend 接口)。 -func (tc *TieredCache) Delete(hashKey uint64) error { - _ = tc.l1.Delete(hashKey) - return tc.l2.Delete(hashKey) -} - -// CacheStats 返回缓存统计信息(实现 CacheBackend 接口)。 -func (tc *TieredCache) CacheStats() CacheStats { - l1Stats := tc.l1.CacheStats() - l2Stats := tc.l2.CacheStats() - - return CacheStats{ - Entries: l1Stats.Entries + l2Stats.Entries, - Size: l1Stats.Size + l2Stats.Size, - HitCount: tc.l1Hits.Load() + tc.l2Hits.Load(), - MissCount: tc.misses.Load(), - Evictions: l2Stats.Evictions, - } -} - -// TieredCacheStats 返回分层缓存详细统计。 -func (tc *TieredCache) TieredCacheStats() TieredCacheStats { - return TieredCacheStats{ - L1Hits: tc.l1Hits.Load(), - L2Hits: tc.l2Hits.Load(), - Misses: tc.misses.Load(), - Promotes: tc.promotes.Load(), - L1Entries: tc.l1.CacheStats().Entries, - L2Entries: tc.l2.CacheStats().Entries, - } -} - -// TieredCacheStats 分层缓存详细统计。 -type TieredCacheStats struct { - L1Hits int64 - L2Hits int64 - Misses int64 - Promotes int64 - L1Entries int64 - L2Entries int64 -} - -// Stop 停止分层缓存。 -func (tc *TieredCache) Stop() { - close(tc.stopCh) - if tc.l2 != nil { - tc.l2.Stop() - } -} - -// recordAccess 记录访问(用于热点提升)。 -func (tc *TieredCache) recordAccess(hashKey uint64, origKey string) { - if tc.promoter.threshold <= 0 { - return - } - - tc.promoter.mu.Lock() - defer tc.promoter.mu.Unlock() - - info, exists := tc.promoter.accessMap[hashKey] - if !exists { - info = &accessInfo{origKey: origKey} - tc.promoter.accessMap[hashKey] = info - } - info.count++ - info.lastAccess = time.Now() -} - -// promoteToL1 提升条目到 L1。 -func (tc *TieredCache) promoteToL1(hashKey uint64, entry *ProxyCacheEntry) { - tc.l1.Set(hashKey, entry.OrigKey, entry.Data, entry.Headers, entry.Status, entry.MaxAge) - tc.promotes.Add(1) -} - -// promoteLoop 热点提升检查循环。 -func (tc *TieredCache) promoteLoop() { - ticker := time.NewTicker(tc.promoter.interval) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - tc.checkAndPromote() - case <-tc.stopCh: - return - } - } -} - -// checkAndPromote 检查并提升热点数据。 -func (tc *TieredCache) checkAndPromote() { - tc.promoter.mu.Lock() - defer tc.promoter.mu.Unlock() - - for hashKey, info := range tc.promoter.accessMap { - if info.count >= tc.promoter.threshold { - // 从 L2 获取并提升到 L1 - entry, exists, _ := tc.l2.Get(hashKey, info.origKey) - if exists { - tc.promoteToL1(hashKey, entry) - } - // 重置计数 - info.count = 0 - } - } -} diff --git a/internal/cache/tiered_cache_test.go b/internal/cache/tiered_cache_test.go deleted file mode 100644 index 578f513..0000000 --- a/internal/cache/tiered_cache_test.go +++ /dev/null @@ -1,431 +0,0 @@ -package cache - -import ( - "testing" - "time" -) - -func TestNewTieredCache(t *testing.T) { - tmpDir := t.TempDir() - - cfg := &TieredCacheConfig{ - L2Config: &DiskCacheConfig{ - Path: tmpDir, - Levels: "1:2", - }, - PromoteThreshold: 3, - PromoteInterval: 100 * time.Millisecond, - } - - tc, err := NewTieredCache(cfg) - if err != nil { - t.Fatalf("NewTieredCache failed: %v", err) - } - defer tc.Stop() - - // 等待 L2 懒加载完成 - <-tc.l2.loadCh - - if tc.l1 == nil { - t.Error("l1 should not be nil") - } - if tc.l2 == nil { - t.Error("l2 should not be nil") - } -} - -func TestTieredCacheSetGet(t *testing.T) { - tmpDir := t.TempDir() - - cfg := &TieredCacheConfig{ - L2Config: &DiskCacheConfig{ - Path: tmpDir, - Levels: "1:2", - }, - } - - tc, err := NewTieredCache(cfg) - if err != nil { - t.Fatalf("NewTieredCache failed: %v", err) - } - defer tc.Stop() - - <-tc.l2.loadCh - - // 设置缓存 - hashKey := uint64(12345) - origKey := "GET:/api/test" - data := []byte("test response data") - - tc.Set(hashKey, origKey, data, nil, 200, 10*time.Minute) - - // 等待 L2 异步写入完成 - time.Sleep(50 * time.Millisecond) - - // 获取缓存(应该从 L1 获取) - entry, exists, stale := tc.Get(hashKey, origKey) - if !exists { - t.Fatal("cache entry not found") - } - if stale { - t.Error("entry should not be stale") - } - if string(entry.Data) != string(data) { - t.Errorf("Data = %q, want %q", entry.Data, data) - } - - // 验证 L1 命中 - stats := tc.TieredCacheStats() - if stats.L1Hits != 1 { - t.Errorf("L1Hits = %d, want 1", stats.L1Hits) - } -} - -func TestTieredCacheL2Fallback(t *testing.T) { - tmpDir := t.TempDir() - - cfg := &TieredCacheConfig{ - L2Config: &DiskCacheConfig{ - Path: tmpDir, - Levels: "1:2", - }, - } - - tc, err := NewTieredCache(cfg) - if err != nil { - t.Fatalf("NewTieredCache failed: %v", err) - } - defer tc.Stop() - - <-tc.l2.loadCh - - // 直接写入 L2(绕过 L1) - hashKey := uint64(12345) - origKey := "GET:/api/test" - data := []byte("test from l2") - - tc.l2.Set(hashKey, origKey, data, nil, 200, 10*time.Minute) - time.Sleep(50 * time.Millisecond) - - // 获取缓存(应该从 L2 获取) - entry, exists, stale := tc.Get(hashKey, origKey) - if !exists { - t.Fatal("cache entry not found") - } - if stale { - t.Error("entry should not be stale") - } - if string(entry.Data) != string(data) { - t.Errorf("Data = %q, want %q", entry.Data, data) - } - - // 验证 L2 命中 - stats := tc.TieredCacheStats() - if stats.L2Hits != 1 { - t.Errorf("L2Hits = %d, want 1", stats.L2Hits) - } -} - -func TestTieredCacheDelete(t *testing.T) { - tmpDir := t.TempDir() - - cfg := &TieredCacheConfig{ - L2Config: &DiskCacheConfig{ - Path: tmpDir, - Levels: "1:2", - }, - } - - tc, err := NewTieredCache(cfg) - if err != nil { - t.Fatalf("NewTieredCache failed: %v", err) - } - defer tc.Stop() - - <-tc.l2.loadCh - - // 设置缓存 - hashKey := uint64(12345) - origKey := "GET:/api/test" - tc.Set(hashKey, origKey, []byte("test"), nil, 200, 10*time.Minute) - time.Sleep(50 * time.Millisecond) - - // 删除 - if err := tc.Delete(hashKey); err != nil { - t.Fatalf("Delete failed: %v", err) - } - - // 验证 L1 和 L2 都已删除 - _, exists, _ := tc.l1.Get(hashKey, origKey) - if exists { - t.Error("entry should not exist in L1 after delete") - } - - _, exists, _ = tc.l2.Get(hashKey, origKey) - if exists { - t.Error("entry should not exist in L2 after delete") - } -} - -func TestTieredCacheStale(t *testing.T) { - tmpDir := t.TempDir() - - cfg := &TieredCacheConfig{ - L2Config: &DiskCacheConfig{ - Path: tmpDir, - Levels: "1:2", - }, - } - - tc, err := NewTieredCache(cfg) - if err != nil { - t.Fatalf("NewTieredCache failed: %v", err) - } - defer tc.Stop() - - <-tc.l2.loadCh - - // 设置一个已过期的缓存 - hashKey := uint64(12345) - origKey := "GET:/api/test" - tc.l2.Set(hashKey, origKey, []byte("test"), nil, 200, 1*time.Millisecond) - - // 等待过期 - time.Sleep(10 * time.Millisecond) - - // 获取缓存 - _, exists, stale := tc.Get(hashKey, origKey) - if !exists { - t.Fatal("expired entry should still exist") - } - if !stale { - t.Error("expired entry should be marked as stale") - } -} - -func TestTieredCachePromote(t *testing.T) { - tmpDir := t.TempDir() - - cfg := &TieredCacheConfig{ - L2Config: &DiskCacheConfig{ - Path: tmpDir, - Levels: "1:2", - }, - PromoteThreshold: 2, - PromoteInterval: 50 * time.Millisecond, - } - - tc, err := NewTieredCache(cfg) - if err != nil { - t.Fatalf("NewTieredCache failed: %v", err) - } - defer tc.Stop() - - <-tc.l2.loadCh - - // 直接写入 L2 - hashKey := uint64(12345) - origKey := "GET:/api/test" - data := []byte("test data") - tc.l2.Set(hashKey, origKey, data, nil, 200, 10*time.Minute) - time.Sleep(50 * time.Millisecond) - - // 访问两次(达到阈值) - tc.Get(hashKey, origKey) - tc.Get(hashKey, origKey) - - // 等待提升检查 - time.Sleep(100 * time.Millisecond) - - // 验证提升发生 - stats := tc.TieredCacheStats() - if stats.Promotes == 0 { - t.Error("promotes should be > 0 after reaching threshold") - } -} - -func TestTieredCacheStats(t *testing.T) { - tmpDir := t.TempDir() - - cfg := &TieredCacheConfig{ - L2Config: &DiskCacheConfig{ - Path: tmpDir, - Levels: "1:2", - }, - } - - tc, err := NewTieredCache(cfg) - if err != nil { - t.Fatalf("NewTieredCache failed: %v", err) - } - defer tc.Stop() - - <-tc.l2.loadCh - - // 设置缓存 - tc.Set(1, "key1", []byte("data1"), nil, 200, 10*time.Minute) - tc.Set(2, "key2", []byte("data2"), nil, 200, 10*time.Minute) - time.Sleep(50 * time.Millisecond) - - // 获取缓存 - tc.Get(1, "key1") // L1 命中 - tc.Get(1, "key1") // L1 命中 - tc.Get(999, "nonexistent") // 未命中 - - stats := tc.TieredCacheStats() - if stats.L1Hits != 2 { - t.Errorf("L1Hits = %d, want 2", stats.L1Hits) - } - if stats.Misses != 1 { - t.Errorf("Misses = %d, want 1", stats.Misses) - } -} - -func TestTieredCacheRestart(t *testing.T) { - tmpDir := t.TempDir() - - cfg := &TieredCacheConfig{ - L2Config: &DiskCacheConfig{ - Path: tmpDir, - Levels: "1:2", - }, - } - - // 第一个实例:写入数据 - tc1, err := NewTieredCache(cfg) - if err != nil { - t.Fatalf("NewTieredCache failed: %v", err) - } - <-tc1.l2.loadCh - - hashKey := uint64(12345) - origKey := "GET:/api/test" - data := []byte("persistent data") - tc1.Set(hashKey, origKey, data, nil, 200, 10*time.Minute) - time.Sleep(50 * time.Millisecond) - tc1.Stop() - - // 第二个实例:读取数据(模拟重启) - tc2, err := NewTieredCache(cfg) - if err != nil { - t.Fatalf("NewTieredCache (restart) failed: %v", err) - } - <-tc2.l2.loadCh - defer tc2.Stop() - - // 验证数据从 L2 恢复 - entry, exists, _ := tc2.Get(hashKey, origKey) - if !exists { - t.Fatal("entry should exist after restart") - } - if string(entry.Data) != string(data) { - t.Errorf("Data = %q, want %q", entry.Data, data) - } - - // 验证是从 L2 获取的 - stats := tc2.TieredCacheStats() - if stats.L2Hits != 1 { - t.Errorf("L2Hits = %d, want 1", stats.L2Hits) - } -} - -func TestTieredCacheCacheStats(t *testing.T) { - tmpDir := t.TempDir() - - cfg := &TieredCacheConfig{ - L2Config: &DiskCacheConfig{ - Path: tmpDir, - Levels: "1:2", - }, - } - - tc, err := NewTieredCache(cfg) - if err != nil { - t.Fatalf("NewTieredCache failed: %v", err) - } - defer tc.Stop() - - <-tc.l2.loadCh - - // 设置缓存 - tc.Set(1, "key1", []byte("data1"), nil, 200, 10*time.Minute) - time.Sleep(50 * time.Millisecond) - - // 获取缓存统计 - stats := tc.CacheStats() - if stats.Entries < 1 { - t.Errorf("Entries = %d, should be >= 1", stats.Entries) - } -} - -func TestTieredCacheGetStaleL1Hit(t *testing.T) { - tmpDir := t.TempDir() - - cfg := &TieredCacheConfig{ - L1MaxEntries: 100, - L1MaxSize: 1024 * 1024, - L2Config: &DiskCacheConfig{ - Path: tmpDir, - Levels: "1:2", - }, - StaleIfError: 200 * time.Millisecond, - StaleIfTimeout: 0, - } - - tc, err := NewTieredCache(cfg) - if err != nil { - t.Fatalf("NewTieredCache failed: %v", err) - } - defer tc.Stop() - - hashKey := uint64(54321) - origKey := "GET:/api/tiered" - tc.Set(hashKey, origKey, []byte("l1data"), nil, 200, 100*time.Millisecond) - - // 等待过期但仍在 stale_if_error 窗口内 - time.Sleep(150 * time.Millisecond) - - // isTimeout=false,应该从 L1 获取 stale 缓存 - entry, ok := tc.GetStale(hashKey, origKey, false) - if !ok { - t.Error("stale entry should be usable on error from L1") - } - if entry == nil || string(entry.Data) != "l1data" { - t.Errorf("entry.Data = %v, want %q", entry, "l1data") - } - - // isTimeout=true,staleIfTimeout=0,不应该可用 - if _, ok2 := tc.GetStale(hashKey, origKey, true); ok2 { - t.Error("stale entry should NOT be usable on timeout when staleIfTimeout=0") - } -} - -func TestTieredCacheGetStaleMiss(t *testing.T) { - tmpDir := t.TempDir() - - cfg := &TieredCacheConfig{ - L1MaxEntries: 100, - L1MaxSize: 1024 * 1024, - L2Config: &DiskCacheConfig{ - Path: tmpDir, - Levels: "1:2", - }, - StaleIfError: 200 * time.Millisecond, - StaleIfTimeout: 200 * time.Millisecond, - } - - tc, err := NewTieredCache(cfg) - if err != nil { - t.Fatalf("NewTieredCache failed: %v", err) - } - defer tc.Stop() - - // 不存在的 key - if _, ok := tc.GetStale(99999, "nonexistent", false); ok { - t.Error("should not find nonexistent key") - } - - if _, ok2 := tc.GetStale(99999, "nonexistent", true); ok2 { - t.Error("should not find nonexistent key on timeout") - } -} diff --git a/internal/config/loader.go b/internal/config/loader.go deleted file mode 100644 index bac2cdb..0000000 --- a/internal/config/loader.go +++ /dev/null @@ -1,155 +0,0 @@ -// Package config 提供 YAML 配置文件的解析、验证和默认配置生成功能。 -// -// 该文件包含配置加载器相关的核心逻辑,包括: -// - 多文件配置合并(include 指令支持) -// - DAG-safe 循环检测(防止配置文件循环引用) -// - Glob 模式展开(支持通配符路径) -// - 深度限制(防止无限递归) -// -// 主要用途: -// -// 用于加载主配置文件及其引用的子配置文件,合并为完整配置。 -// -// 注意事项: -// - 最大 include 深度为 10 层 -// - 循环引用会返回错误 -// - DAG 共享子配置允许(同一文件可被多处引用) -// -// 作者:xfy -package config - -import ( - "fmt" - "os" - "path/filepath" - - "gopkg.in/yaml.v3" -) - -const maxIncludeDepth = 10 - -// ConfigLoader 配置加载器 -type ConfigLoader struct { - loadedFiles map[string]bool // 所有已加载文件(用于跳过重复处理) - stack map[string]bool // 当前调用栈(用于 DAG 循环检测) - baseDir string - depth int -} - -// NewConfigLoader 构造函数 -func NewConfigLoader(mainConfigPath string) *ConfigLoader { - absPath, err := filepath.Abs(mainConfigPath) - if err != nil { - absPath = mainConfigPath - } - - return &ConfigLoader{ - baseDir: filepath.Dir(absPath), - loadedFiles: make(map[string]bool), - stack: make(map[string]bool), - depth: 0, - } -} - -// Load 加载配置(含 DAG-safe 循环检测) -func (l *ConfigLoader) Load(path string) (*Config, error) { - // 深度限制 - if l.depth > maxIncludeDepth { - return nil, fmt.Errorf("include depth exceeds maximum (%d)", maxIncludeDepth) - } - - absPath, err := filepath.Abs(path) - if err != nil { - return nil, fmt.Errorf("resolve path failed: %w", err) - } - - // DAG-safe 循环检测 - // 使用 stack 检测真正的循环(当前调用栈中的文件) - // 使用 loadedFiles 跳过已处理的文件(允许 DAG 共享子配置) - if l.stack[absPath] { - return nil, fmt.Errorf("circular include detected: '%s' is in current include chain", absPath) - } - - // 如果文件已处理过,跳过(不报错) - if l.loadedFiles[absPath] { - return &Config{}, nil // 返回空配置,跳过重复处理 - } - - l.stack[absPath] = true // 加入调用栈 - l.loadedFiles[absPath] = true // 标记已处理 - l.depth++ - - // 加载文件 - data, err := os.ReadFile(absPath) - if err != nil { - return nil, fmt.Errorf("read file failed: %w", err) - } - - var cfg Config - if err := yaml.Unmarshal(data, &cfg); err != nil { - return nil, fmt.Errorf("parse yaml failed: %w", err) - } - - // 处理 include - for _, inc := range cfg.Include { - files, err := l.expandGlob(inc.Path) - if err != nil { - return nil, err - } - - for _, f := range files { - subCfg, err := l.Load(f) // 递归 - if err != nil { - return nil, fmt.Errorf("include %s: %w", f, err) - } - - if err := l.merge(&cfg, subCfg, f); err != nil { - return nil, err - } - } - } - - // 清理调用栈 - delete(l.stack, absPath) - l.depth-- - return &cfg, nil -} - -// merge 合并配置 -func (l *ConfigLoader) merge(dst, src *Config, _ string) error { - // Server name collision(listen collision 由 validate.go 处理) - for _, newServer := range src.Servers { - for _, existing := range dst.Servers { - if newServer.Name == existing.Name { - return fmt.Errorf("server name collision: '%s'", newServer.Name) - } - } - } - dst.Servers = append(dst.Servers, src.Servers...) - - // Stream collision - for _, newStream := range src.Stream { - for _, existing := range dst.Stream { - if newStream.Listen == existing.Listen { - return fmt.Errorf("stream listen collision: '%s'", newStream.Listen) - } - } - } - dst.Stream = append(dst.Stream, src.Stream...) - - return nil -} - -// expandGlob 展开 glob 模式 -func (l *ConfigLoader) expandGlob(pattern string) ([]string, error) { - absPattern := l.resolvePath(pattern) - return filepath.Glob(absPattern) -} - -// resolvePath 解析路径 -func (l *ConfigLoader) resolvePath(path string) string { - if filepath.IsAbs(path) { - return path - } - return filepath.Join(l.baseDir, path) -} diff --git a/internal/config/loader_test.go b/internal/config/loader_test.go deleted file mode 100644 index 65d2298..0000000 --- a/internal/config/loader_test.go +++ /dev/null @@ -1,442 +0,0 @@ -// Package config 提供配置加载器测试。 -package config - -import ( - "os" - "path/filepath" - "testing" -) - -// TestNewConfigLoader 测试 ConfigLoader 构造函数。 -func TestNewConfigLoader(t *testing.T) { - t.Run("相对路径转换", func(t *testing.T) { - tmpDir := t.TempDir() - configPath := filepath.Join(tmpDir, "config.yaml") - - loader := NewConfigLoader(configPath) - if loader == nil { - t.Fatal("NewConfigLoader() returned nil") - } - - // 验证 baseDir 是配置文件所在目录 - expectedDir, _ := filepath.Abs(tmpDir) - if loader.baseDir != expectedDir { - t.Errorf("baseDir = %q, want %q", loader.baseDir, expectedDir) - } - }) - - t.Run("绝对路径保持不变", func(t *testing.T) { - absPath := "/etc/lolly/config.yaml" - loader := NewConfigLoader(absPath) - if loader == nil { - t.Fatal("NewConfigLoader() returned nil") - } - - if loader.baseDir != "/etc/lolly" { - t.Errorf("baseDir = %q, want /etc/lolly", loader.baseDir) - } - }) - - t.Run("初始化状态", func(t *testing.T) { - loader := NewConfigLoader("config.yaml") - - if loader.loadedFiles == nil { - t.Error("loadedFiles not initialized") - } - if loader.stack == nil { - t.Error("stack not initialized") - } - if loader.depth != 0 { - t.Errorf("depth = %d, want 0", loader.depth) - } - }) -} - -// TestConfigLoader_Load 测试配置加载。 -func TestConfigLoader_Load(t *testing.T) { - t.Run("单文件配置加载", func(t *testing.T) { - content := ` -servers: - - listen: ":8080" - name: "main" - static: - - path: "/" - root: "/var/www" -` - tmpDir := t.TempDir() - configPath := filepath.Join(tmpDir, "config.yaml") - if err := os.WriteFile(configPath, []byte(content), 0o644); err != nil { - t.Fatalf("写入配置文件失败: %v", err) - } - - loader := NewConfigLoader(configPath) - cfg, err := loader.Load(configPath) - if err != nil { - t.Fatalf("Load() 失败: %v", err) - } - - if len(cfg.Servers) != 1 { - t.Errorf("len(Servers) = %d, want 1", len(cfg.Servers)) - } - if cfg.Servers[0].Listen != ":8080" { - t.Errorf("Listen = %q, want :8080", cfg.Servers[0].Listen) - } - }) - - t.Run("文件不存在", func(t *testing.T) { - loader := NewConfigLoader("/nonexistent/config.yaml") - _, err := loader.Load("/nonexistent/config.yaml") - if err == nil { - t.Error("Load() 期望返回错误,但返回 nil") - } - }) - - t.Run("无效YAML", func(t *testing.T) { - content := `servers: [invalid yaml` - tmpDir := t.TempDir() - configPath := filepath.Join(tmpDir, "invalid.yaml") - if err := os.WriteFile(configPath, []byte(content), 0o644); err != nil { - t.Fatalf("写入配置文件失败: %v", err) - } - - loader := NewConfigLoader(configPath) - _, err := loader.Load(configPath) - if err == nil { - t.Error("Load() 期望返回错误,但返回 nil") - } - }) -} - -// TestConfigLoader_Include 测试 include 指令。 -func TestConfigLoader_Include(t *testing.T) { - t.Run("多文件include合并", func(t *testing.T) { - tmpDir := t.TempDir() - - // 主配置文件 - mainConfig := ` -servers: - - listen: ":8080" - name: "main" -include: - - path: "servers/*.yaml" -` - mainPath := filepath.Join(tmpDir, "main.yaml") - if err := os.WriteFile(mainPath, []byte(mainConfig), 0o644); err != nil { - t.Fatalf("写入主配置文件失败: %v", err) - } - - // 创建 servers 目录 - serversDir := filepath.Join(tmpDir, "servers") - if err := os.Mkdir(serversDir, 0o755); err != nil { - t.Fatalf("创建 servers 目录失败: %v", err) - } - - // 子配置文件1 - server1 := ` -servers: - - listen: ":8081" - name: "server1" -` - if err := os.WriteFile(filepath.Join(serversDir, "server1.yaml"), []byte(server1), 0o644); err != nil { - t.Fatalf("写入 server1.yaml 失败: %v", err) - } - - // 子配置文件2 - server2 := ` -servers: - - listen: ":8082" - name: "server2" -` - if err := os.WriteFile(filepath.Join(serversDir, "server2.yaml"), []byte(server2), 0o644); err != nil { - t.Fatalf("写入 server2.yaml 失败: %v", err) - } - - loader := NewConfigLoader(mainPath) - cfg, err := loader.Load(mainPath) - if err != nil { - t.Fatalf("Load() 失败: %v", err) - } - - // 应该有3个server: main + server1 + server2 - if len(cfg.Servers) != 3 { - t.Errorf("len(Servers) = %d, want 3", len(cfg.Servers)) - } - }) - - t.Run("循环引用检测", func(t *testing.T) { - tmpDir := t.TempDir() - - // a.yaml includes b.yaml - configA := ` -servers: - - listen: ":8080" - name: "a" -include: - - path: "b.yaml" -` - // b.yaml includes a.yaml (循环) - configB := ` -servers: - - listen: ":8081" - name: "b" -include: - - path: "a.yaml" -` - if err := os.WriteFile(filepath.Join(tmpDir, "a.yaml"), []byte(configA), 0o644); err != nil { - t.Fatalf("写入 a.yaml 失败: %v", err) - } - if err := os.WriteFile(filepath.Join(tmpDir, "b.yaml"), []byte(configB), 0o644); err != nil { - t.Fatalf("写入 b.yaml 失败: %v", err) - } - - loader := NewConfigLoader(filepath.Join(tmpDir, "a.yaml")) - _, err := loader.Load(filepath.Join(tmpDir, "a.yaml")) - if err == nil { - t.Error("Load() 期望返回循环引用错误,但返回 nil") - } - }) - - t.Run("深度超限", func(t *testing.T) { - tmpDir := t.TempDir() - - // 创建深度嵌套的配置文件链 - for i := range 12 { - config := ` -servers: - - listen: ":8080" -` - if i < 11 { - config += `include: - - path: "next.yaml" -` - } - filename := "config.yaml" - if i > 0 { - filename = "next.yaml" - } - // 每个层级创建子目录 - subDir := filepath.Join(tmpDir, "level"+string(rune('0'+i))) - _ = os.Mkdir(subDir, 0o755) - if i == 0 { - if err := os.WriteFile(filepath.Join(tmpDir, filename), []byte(config), 0o644); err != nil { - t.Fatalf("写入配置文件失败: %v", err) - } - } - } - - // 简化测试:直接测试深度限制 - loader := NewConfigLoader(filepath.Join(tmpDir, "config.yaml")) - loader.depth = 11 // 超过 maxIncludeDepth (10) - - _, err := loader.Load(filepath.Join(tmpDir, "config.yaml")) - if err == nil { - t.Error("Load() 期望返回深度超限错误,但返回 nil") - } - }) - - t.Run("DAG共享子配置", func(t *testing.T) { - tmpDir := t.TempDir() - - // shared.yaml - 被多处引用 - shared := ` -servers: - - listen: ":9090" - name: "shared" -` - if err := os.WriteFile(filepath.Join(tmpDir, "shared.yaml"), []byte(shared), 0o644); err != nil { - t.Fatalf("写入 shared.yaml 失败: %v", err) - } - - // main.yaml - 引用 shared.yaml 两次(应该只处理一次) - main := ` -servers: - - listen: ":8080" - name: "main" -include: - - path: "shared.yaml" - - path: "shared.yaml" -` - if err := os.WriteFile(filepath.Join(tmpDir, "main.yaml"), []byte(main), 0o644); err != nil { - t.Fatalf("写入 main.yaml 失败: %v", err) - } - - loader := NewConfigLoader(filepath.Join(tmpDir, "main.yaml")) - cfg, err := loader.Load(filepath.Join(tmpDir, "main.yaml")) - if err != nil { - t.Fatalf("Load() 失败: %v", err) - } - - // shared 应该只被处理一次 - sharedCount := 0 - for _, s := range cfg.Servers { - if s.Name == "shared" { - sharedCount++ - } - } - if sharedCount != 1 { - t.Errorf("shared server count = %d, want 1", sharedCount) - } - }) -} - -// TestConfigLoader_Merge 测试配置合并。 -func TestConfigLoader_Merge(t *testing.T) { - t.Run("server name冲突", func(t *testing.T) { - tmpDir := t.TempDir() - - main := ` -servers: - - listen: ":8080" - name: "duplicate" -include: - - path: "sub.yaml" -` - sub := ` -servers: - - listen: ":8081" - name: "duplicate" -` - if err := os.WriteFile(filepath.Join(tmpDir, "main.yaml"), []byte(main), 0o644); err != nil { - t.Fatalf("写入 main.yaml 失败: %v", err) - } - if err := os.WriteFile(filepath.Join(tmpDir, "sub.yaml"), []byte(sub), 0o644); err != nil { - t.Fatalf("写入 sub.yaml 失败: %v", err) - } - - loader := NewConfigLoader(filepath.Join(tmpDir, "main.yaml")) - _, err := loader.Load(filepath.Join(tmpDir, "main.yaml")) - if err == nil { - t.Error("Load() 期望返回 server name 冲突错误,但返回 nil") - } - }) - - t.Run("stream listen冲突", func(t *testing.T) { - tmpDir := t.TempDir() - - main := ` -stream: - - listen: "12345" - proxy_pass: "backend:54321" -include: - - path: "sub.yaml" -` - sub := ` -stream: - - listen: "12345" - proxy_pass: "backend2:54321" -` - if err := os.WriteFile(filepath.Join(tmpDir, "main.yaml"), []byte(main), 0o644); err != nil { - t.Fatalf("写入 main.yaml 失败: %v", err) - } - if err := os.WriteFile(filepath.Join(tmpDir, "sub.yaml"), []byte(sub), 0o644); err != nil { - t.Fatalf("写入 sub.yaml 失败: %v", err) - } - - loader := NewConfigLoader(filepath.Join(tmpDir, "main.yaml")) - _, err := loader.Load(filepath.Join(tmpDir, "main.yaml")) - if err == nil { - t.Error("Load() 期望返回 stream listen 冲突错误,但返回 nil") - } - }) - - t.Run("正常合并", func(t *testing.T) { - tmpDir := t.TempDir() - - main := ` -servers: - - listen: ":8080" - name: "main" -include: - - path: "sub.yaml" -` - sub := ` -servers: - - listen: ":8081" - name: "sub" -stream: - - listen: "12345" - proxy_pass: "backend:54321" -` - if err := os.WriteFile(filepath.Join(tmpDir, "main.yaml"), []byte(main), 0o644); err != nil { - t.Fatalf("写入 main.yaml 失败: %v", err) - } - if err := os.WriteFile(filepath.Join(tmpDir, "sub.yaml"), []byte(sub), 0o644); err != nil { - t.Fatalf("写入 sub.yaml 失败: %v", err) - } - - loader := NewConfigLoader(filepath.Join(tmpDir, "main.yaml")) - cfg, err := loader.Load(filepath.Join(tmpDir, "main.yaml")) - if err != nil { - t.Fatalf("Load() 失败: %v", err) - } - - if len(cfg.Servers) != 2 { - t.Errorf("len(Servers) = %d, want 2", len(cfg.Servers)) - } - if len(cfg.Stream) != 1 { - t.Errorf("len(Stream) = %d, want 1", len(cfg.Stream)) - } - }) -} - -// TestConfigLoader_Glob 测试 glob 模式展开。 -func TestConfigLoader_Glob(t *testing.T) { - t.Run("glob模式匹配", func(t *testing.T) { - tmpDir := t.TempDir() - - // 创建多个配置文件 - for i := range 3 { - content := ` -servers: - - listen: ":8080" -` - filename := filepath.Join(tmpDir, "server"+string(rune('0'+i+1))+".yaml") - if err := os.WriteFile(filename, []byte(content), 0o644); err != nil { - t.Fatalf("写入配置文件失败: %v", err) - } - } - - loader := NewConfigLoader(tmpDir) - files, err := loader.expandGlob(filepath.Join(tmpDir, "server*.yaml")) - if err != nil { - t.Fatalf("expandGlob() 失败: %v", err) - } - - if len(files) != 3 { - t.Errorf("len(files) = %d, want 3", len(files)) - } - }) - - t.Run("无匹配文件", func(t *testing.T) { - loader := NewConfigLoader("/tmp") - files, err := loader.expandGlob("/nonexistent/*.yaml") - if err != nil { - t.Fatalf("expandGlob() 失败: %v", err) - } - if len(files) != 0 { - t.Errorf("len(files) = %d, want 0", len(files)) - } - }) -} - -// TestConfigLoader_ResolvePath 测试路径解析。 -func TestConfigLoader_ResolvePath(t *testing.T) { - t.Run("相对路径解析", func(t *testing.T) { - loader := NewConfigLoader("/etc/lolly/config.yaml") - - result := loader.resolvePath("servers/config.yaml") - expected := "/etc/lolly/servers/config.yaml" - if result != expected { - t.Errorf("resolvePath() = %q, want %q", result, expected) - } - }) - - t.Run("绝对路径保持不变", func(t *testing.T) { - loader := NewConfigLoader("/etc/lolly/config.yaml") - - result := loader.resolvePath("/absolute/path.yaml") - if result != "/absolute/path.yaml" { - t.Errorf("resolvePath() = %q, want /absolute/path.yaml", result) - } - }) -} diff --git a/internal/config/validate.go b/internal/config/validate.go index 2148821..69d9a6e 100644 --- a/internal/config/validate.go +++ b/internal/config/validate.go @@ -129,14 +129,7 @@ func ValidateNonNegative[T SignedInteger](value T, fieldName string) error { return nil } -// ValidateNonNegativeInt64 验证 int64 值为非负数 -// Deprecated: 使用 ValidateNonNegative[int64] 代替 -func ValidateNonNegativeInt64(value int64, fieldName string) error { - return ValidateNonNegative(value, fieldName) -} - // ValidateNonNegativeDuration 验证 time.Duration 值为非负数 -// Deprecated: 使用 ValidateNonNegative[int64] 代替(time.Duration 是 int64 别名) func ValidateNonNegativeDuration(value time.Duration, fieldName string) error { return ValidateNonNegative(int64(value), fieldName) } diff --git a/internal/lua/api_log.go b/internal/lua/api_log.go index e2f951a..f9e8967 100644 --- a/internal/lua/api_log.go +++ b/internal/lua/api_log.go @@ -407,7 +407,6 @@ func luaSchedulerLog(L *glua.LState) int { // 在实际实现中,可以通过 engine 的 logger 输出 _ = level _ = msg - // fmt.Printf("[timer] %s\n", msg) return 0 } diff --git a/internal/server/start_integration_test.go b/internal/server/start_integration_test.go index cbeeeb5..bfe0a16 100644 --- a/internal/server/start_integration_test.go +++ b/internal/server/start_integration_test.go @@ -514,6 +514,7 @@ func TestStart_WithDelayedBackend(t *testing.T) { // 启动延迟的 mock 服务器 backendAddr, cleanup := tools.DelayedMockBackend( 100*time.Millisecond, + fasthttp.StatusOK, []byte(`{"message": "delayed response"}`), ) defer cleanup() @@ -544,9 +545,9 @@ func TestStart_WithDelayedBackend(t *testing.T) { func TestStart_WithRandomResponse(t *testing.T) { // 启动随机响应的 mock 服务器 backendAddr, cleanup := tools.StartMockFasthttpBackend(tools.MockBackendConfig{ - Mode: tools.ModeRandomResponse, - StatusCode: fasthttp.StatusOK, - Body: []byte(`{"random": true}`), + Mode: tools.ModeRandomResponse, + StatusCode: fasthttp.StatusOK, + ResponseBody: []byte(`{"random": true}`), }) defer cleanup()