From bec8932561bbb6de3e62fa61acb8337b6a512709 Mon Sep 17 00:00:00 2001 From: xfy Date: Thu, 16 Apr 2026 16:47:10 +0800 Subject: [PATCH] =?UTF-8?q?feat(server):=20=E6=B7=BB=E5=8A=A0=E7=BC=93?= =?UTF-8?q?=E5=AD=98=E6=B8=85=E7=90=86=20API=20=E6=94=AF=E6=8C=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 新增 PurgeHandler 处理器,支持: - 按精确路径和通配符模式清理缓存 - HTTP 方法过滤(默认 GET) - IP 白名单访问控制(CIDR/单 IP/localhost) - Token 认证保护 - 三种启动模式路由注册 Co-Authored-By: Claude Opus 4.6 --- internal/cache/file_cache.go | 19 ++ internal/cache/purge.go | 21 +- internal/proxy/proxy.go | 6 + internal/server/purge.go | 229 +++++++++++++++++++ internal/server/purge_test.go | 408 ++++++++++++++++++++++++++++++++++ internal/server/server.go | 30 +++ 6 files changed, 708 insertions(+), 5 deletions(-) create mode 100644 internal/server/purge.go create mode 100644 internal/server/purge_test.go diff --git a/internal/cache/file_cache.go b/internal/cache/file_cache.go index c18e3fd..8ef51d2 100644 --- a/internal/cache/file_cache.go +++ b/internal/cache/file_cache.go @@ -20,6 +20,7 @@ package cache import ( "container/list" "slices" + "strings" "sync" "time" ) @@ -457,6 +458,24 @@ func (c *ProxyCache) Delete(hashKey uint64) { delete(c.entries, hashKey) } +// DeleteByPatternWithMethod 按通配符模式删除缓存条目。 +// method 过滤:检查 entry.OrigKey 是否以 "method:" 前缀开头。 +// 空 method 匹配所有条目。 +func (c *ProxyCache) DeleteByPatternWithMethod(pattern string, method string) int { + c.mu.Lock() + defer c.mu.Unlock() + deleted := 0 + for hashKey, entry := range c.entries { + if MatchPattern(pattern, entry.OrigKey) { + if method == "" || strings.HasPrefix(entry.OrigKey, method+":") { + delete(c.entries, hashKey) + deleted++ + } + } + } + return deleted +} + // Clear 清空代理缓存。 func (c *ProxyCache) Clear() { c.mu.Lock() diff --git a/internal/cache/purge.go b/internal/cache/purge.go index a010d39..b9ff496 100644 --- a/internal/cache/purge.go +++ b/internal/cache/purge.go @@ -44,6 +44,9 @@ type PurgeRequest struct { // Pattern 通配符模式(支持 * 通配符) Pattern string `json:"pattern,omitempty"` + + // Method HTTP 方法,默认 "GET" + Method string `json:"method,omitempty"` } // PurgeResponse 清理响应结构。 @@ -249,15 +252,23 @@ func (p *PurgeAPI) purgeByPattern(pattern string) int { return deleted } +// HashPathWithMethod 使用 FNV-64a 计算缓存键的哈希值。 +// method 为空时默认使用 "GET"。 +func HashPathWithMethod(path string, method string) uint64 { + if method == "" { + method = "GET" + } + key := method + ":" + path + h := fnv.New64a() + h.Write([]byte(key)) + return h.Sum64() +} + // hashPath 使用 FNV-64a 计算路径的哈希值。 // 与代理层 buildCacheKeyHash 使用相同的算法,确保一致性。 // 注意:代理层的 key 格式为 "METHOD:URI",purge 时默认使用 GET 方法。 func hashPath(path string) uint64 { - // 默认使用 GET 方法,与代理层 key 格式一致 - key := "GET:" + path - h := fnv.New64a() - h.Write([]byte(key)) - return h.Sum64() + return HashPathWithMethod(path, "GET") } // MatchPattern 检查路径是否匹配通配符模式。 diff --git a/internal/proxy/proxy.go b/internal/proxy/proxy.go index 748d6e3..cacb6ad 100644 --- a/internal/proxy/proxy.go +++ b/internal/proxy/proxy.go @@ -1005,6 +1005,12 @@ func (p *Proxy) backgroundRefresh(ctx *fasthttp.RequestCtx, target *loadbalance. p.cache.Set(hashKey, origKey, resp.Body(), headers, resp.StatusCode(), p.getCacheDuration(resp.StatusCode())) } +// GetCache 返回代理的 ProxyCache 实例(用于 purge handler)。 +// 如果缓存未启用,返回 nil。 +func (p *Proxy) GetCache() *cache.ProxyCache { + return p.cache +} + // GetCacheStats 返回代理缓存的统计信息。 // 如果缓存未启用,返回 nil。 func (p *Proxy) GetCacheStats() *cache.ProxyCacheStats { diff --git a/internal/server/purge.go b/internal/server/purge.go new file mode 100644 index 0000000..4e72de3 --- /dev/null +++ b/internal/server/purge.go @@ -0,0 +1,229 @@ +// Package server 提供 HTTP 服务器核心功能。 +// +// 该文件实现缓存清理 API 处理器,支持主动清理代理缓存。 +package server + +import ( + "encoding/json" + "net" + "net/netip" + "strings" + + "github.com/valyala/fasthttp" + "rua.plus/lolly/internal/cache" + "rua.plus/lolly/internal/config" + "rua.plus/lolly/internal/netutil" +) + +// PurgeHandler 缓存清理 API 处理器。 +// +// 持有 Server 引用以访问所有代理实例的缓存。 +// 支持 IP 白名单和 Token 认证保护。 +// +// 注意事项: +// - 仅处理 POST 请求 +// - 支持按路径和按模式两种清理方式 +// - method 参数支持指定 HTTP 方法(默认 GET) +type PurgeHandler struct { + server *Server + auth config.CacheAPIAuthConfig + path string + allowed []net.IPNet +} + +// NewPurgeHandler 创建缓存清理 API 处理器。 +// +// 解析 IP 白名单配置,支持 CIDR 格式和单个 IP。 +// localhost 特殊处理为 127.0.0.1 和 ::1。 +// +// 参数: +// - server: Server 实例,用于访问代理缓存 +// - cfg: CacheAPI 配置 +// +// 返回值: +// - *PurgeHandler: 配置好的处理器 +// - error: IP 解析失败时返回非 nil 错误 +func NewPurgeHandler(server *Server, cfg *config.CacheAPIConfig) (*PurgeHandler, error) { + h := &PurgeHandler{ + server: server, + path: cfg.Path, + auth: cfg.Auth, + } + + // 默认路径 + if h.path == "" { + h.path = "/_cache/purge" + } + + // 解析允许的 IP 列表 + for _, cidr := range cfg.Allow { + // 处理 localhost 特殊情况 + if cidr == "localhost" { + _, v4Network, _ := net.ParseCIDR("127.0.0.1/32") + _, v6Network, _ := net.ParseCIDR("::1/128") + if v4Network != nil { + h.allowed = append(h.allowed, *v4Network) + } + if v6Network != nil { + h.allowed = append(h.allowed, *v6Network) + } + continue + } + + _, network, err := net.ParseCIDR(cidr) + if err != nil { + // 尝试作为单个 IP 解析 + ip, err := netip.ParseAddr(cidr) + if err != nil { + return nil, err + } + // 转换为 CIDR 格式 + if ip.Is4() { + _, network, _ = net.ParseCIDR(cidr + "/32") + } else { + _, network, _ = net.ParseCIDR(cidr + "/128") + } + } + if network != nil { + h.allowed = append(h.allowed, *network) + } + } + + return h, nil +} + +// Path 返回 API 端点路径。 +func (h *PurgeHandler) Path() string { + return h.path +} + +// ServeHTTP 处理缓存清理请求。 +// +// 仅处理 POST 请求,支持精确路径和通配符模式清理。 +// 返回 JSON 格式的响应。 +func (h *PurgeHandler) ServeHTTP(ctx *fasthttp.RequestCtx) { + // 仅允许 POST 方法 + if string(ctx.Method()) != "POST" { + h.sendError(ctx, fasthttp.StatusMethodNotAllowed, "method not allowed") + return + } + + // 检查 IP 访问权限 + if !h.checkAccess(ctx) { + h.sendError(ctx, fasthttp.StatusForbidden, "forbidden") + return + } + + // 检查认证 + if !h.checkAuth(ctx) { + h.sendError(ctx, fasthttp.StatusUnauthorized, "unauthorized") + return + } + + // 解析请求体 + var req cache.PurgeRequest + if err := json.Unmarshal(ctx.PostBody(), &req); err != nil { + h.sendError(ctx, fasthttp.StatusBadRequest, "invalid request body") + return + } + + // 执行清理 + deleted := 0 + if req.Path != "" { + deleted = h.purgeByPath(req.Path, req.Method) + } else if req.Pattern != "" { + deleted = h.purgeByPattern(req.Pattern, req.Method) + } else { + h.sendError(ctx, fasthttp.StatusBadRequest, "missing path or pattern") + return + } + + // 返回响应 + ctx.SetContentType("application/json; charset=utf-8") + ctx.SetStatusCode(fasthttp.StatusOK) + _ = json.NewEncoder(ctx).Encode(cache.PurgeResponse{Deleted: deleted}) +} + +// checkAccess 检查客户端 IP 是否在允许列表中。 +func (h *PurgeHandler) checkAccess(ctx *fasthttp.RequestCtx) bool { + // 如果没有配置允许列表,允许所有访问 + if len(h.allowed) == 0 { + return true + } + + clientIP := netutil.ExtractClientIPNet(ctx) + if clientIP == nil { + return false + } + + // 检查是否在允许列表中 + for _, network := range h.allowed { + if network.Contains(clientIP) { + return true + } + } + + return false +} + +// checkAuth 检查认证。 +func (h *PurgeHandler) checkAuth(ctx *fasthttp.RequestCtx) bool { + // 无需认证 + if h.auth.Type == "" || h.auth.Type == "none" { + return true + } + + // Token 认证 + if h.auth.Type == "token" { + authHeader := ctx.Request.Header.Peek("Authorization") + if len(authHeader) == 0 { + return false + } + + authStr := string(authHeader) + // 支持 Bearer token 格式 + if token, ok := strings.CutPrefix(authStr, "Bearer "); ok { + return token == h.auth.Token + } + + // 也支持直接传递 token + return authStr == h.auth.Token + } + + return false +} + +// purgeByPath 按精确路径清理缓存。 +func (h *PurgeHandler) purgeByPath(path string, method string) int { + hashKey := cache.HashPathWithMethod(path, method) + deleted := 0 + + for _, p := range h.server.proxies { + if pcache := p.GetCache(); pcache != nil { + pcache.Delete(hashKey) + deleted++ + } + } + + return deleted +} + +// purgeByPattern 按通配符模式清理缓存。 +func (h *PurgeHandler) purgeByPattern(pattern string, method string) int { + deleted := 0 + + for _, p := range h.server.proxies { + if pcache := p.GetCache(); pcache != nil { + deleted += pcache.DeleteByPatternWithMethod(pattern, method) + } + } + + return deleted +} + +// sendError 发送错误响应。 +func (h *PurgeHandler) sendError(ctx *fasthttp.RequestCtx, status int, errMsg string) { + ctx.SetContentType("application/json; charset=utf-8") + ctx.SetStatusCode(status) + _ = json.NewEncoder(ctx).Encode(cache.PurgeErrorResponse{Error: errMsg}) +} diff --git a/internal/server/purge_test.go b/internal/server/purge_test.go new file mode 100644 index 0000000..31360d4 --- /dev/null +++ b/internal/server/purge_test.go @@ -0,0 +1,408 @@ +// Package server 提供缓存清理处理器功能的测试。 +// +// 该文件测试 PurgeHandler 模块的各项功能,包括: +// - 路径配置(默认和自定义) +// - localhost 特殊处理和 CIDR 解析 +// - IP 白名单访问控制 +// - Token 认证 +// +// 作者:xfy +package server + +import ( + "net" + "testing" + + "github.com/valyala/fasthttp" + "rua.plus/lolly/internal/config" +) + +func TestPurgeHandler_Path(t *testing.T) { + tests := []struct { + name string + cfgPath string + wantPath string + }{ + { + name: "default path", + cfgPath: "", + wantPath: "/_cache/purge", + }, + { + name: "custom path", + cfgPath: "/api/purge", + wantPath: "/api/purge", + }, + { + name: "custom path with version prefix", + cfgPath: "/api/v1/cache/purge", + wantPath: "/api/v1/cache/purge", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cfg := &config.CacheAPIConfig{ + Path: tt.cfgPath, + Allow: []string{}, + } + + h, err := NewPurgeHandler(nil, cfg) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if h.Path() != tt.wantPath { + t.Errorf("expected path %s, got %s", tt.wantPath, h.Path()) + } + }) + } +} + +func TestPurgeHandler_NewPurgeHandler(t *testing.T) { + t.Run("localhost special handling", func(t *testing.T) { + cfg := &config.CacheAPIConfig{ + Path: "/_cache/purge", + Allow: []string{"localhost"}, + } + + h, err := NewPurgeHandler(nil, cfg) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if len(h.allowed) != 2 { + t.Fatalf("expected 2 allowed networks for localhost, got %d", len(h.allowed)) + } + + // 验证包含 127.0.0.1/32 + _, v4Net, _ := net.ParseCIDR("127.0.0.1/32") + if v4Net == nil { + t.Fatal("failed to parse 127.0.0.1/32") + } + foundV4 := false + for _, n := range h.allowed { + if n.String() == v4Net.String() { + foundV4 = true + break + } + } + if !foundV4 { + t.Error("expected 127.0.0.1/32 in allowed networks") + } + + // 验证包含 ::1/128 + _, v6Net, _ := net.ParseCIDR("::1/128") + if v6Net == nil { + t.Fatal("failed to parse ::1/128") + } + foundV6 := false + for _, n := range h.allowed { + if n.String() == v6Net.String() { + foundV6 = true + break + } + } + if !foundV6 { + t.Error("expected ::1/128 in allowed networks") + } + }) + + t.Run("CIDR parsing", func(t *testing.T) { + cfg := &config.CacheAPIConfig{ + Path: "/_cache/purge", + Allow: []string{"10.0.0.0/8", "172.16.0.0/12"}, + } + + h, err := NewPurgeHandler(nil, cfg) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if len(h.allowed) != 2 { + t.Errorf("expected 2 allowed networks, got %d", len(h.allowed)) + } + }) + + t.Run("single IP parsed as CIDR", func(t *testing.T) { + cfg := &config.CacheAPIConfig{ + Path: "/_cache/purge", + Allow: []string{"192.168.1.100"}, + } + + h, err := NewPurgeHandler(nil, cfg) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if len(h.allowed) != 1 { + t.Fatalf("expected 1 allowed network, got %d", len(h.allowed)) + } + + // 单 IP 应转换为 /32 CIDR + if h.allowed[0].String() != "192.168.1.100/32" { + t.Errorf("expected 192.168.1.100/32, got %s", h.allowed[0].String()) + } + }) + + t.Run("invalid IP returns error", func(t *testing.T) { + cfg := &config.CacheAPIConfig{ + Path: "/_cache/purge", + Allow: []string{"not-an-ip"}, + } + + _, err := NewPurgeHandler(nil, cfg) + if err == nil { + t.Error("expected error for invalid IP, got nil") + } + }) +} + +func TestPurgeHandler_checkAccess(t *testing.T) { + tests := []struct { + name string + allow []string + clientIP string + wantAccess bool + }{ + { + name: "no allow list - open access", + allow: []string{}, + clientIP: "1.2.3.4", + wantAccess: true, + }, + { + name: "CIDR match", + allow: []string{"192.168.0.0/16"}, + clientIP: "192.168.1.100", + wantAccess: true, + }, + { + name: "CIDR no match", + allow: []string{"10.0.0.0/8"}, + clientIP: "192.168.1.100", + wantAccess: false, + }, + { + name: "single IP match", + allow: []string{"127.0.0.1"}, + clientIP: "127.0.0.1", + wantAccess: true, + }, + { + name: "single IP no match", + allow: []string{"127.0.0.1"}, + clientIP: "127.0.0.2", + wantAccess: false, + }, + { + name: "localhost allows 127.0.0.1", + allow: []string{"localhost"}, + clientIP: "127.0.0.1", + wantAccess: true, + }, + { + name: "localhost allows ::1", + allow: []string{"localhost"}, + clientIP: "::1", + wantAccess: true, + }, + { + name: "localhost denies other IP", + allow: []string{"localhost"}, + clientIP: "10.0.0.1", + wantAccess: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cfg := &config.CacheAPIConfig{ + Path: "/_cache/purge", + Allow: tt.allow, + } + + h, err := NewPurgeHandler(nil, cfg) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if len(h.allowed) == 0 { + // 无白名单时应允许所有访问 + if !h.checkAccess(nil) { + t.Error("expected access to be true when no allow list configured") + } + return + } + + // 直接测试 IP 是否在 allowed 列表中 + ip := net.ParseIP(tt.clientIP) + if ip == nil { + t.Fatalf("failed to parse client IP: %s", tt.clientIP) + } + + found := false + for _, network := range h.allowed { + if network.Contains(ip) { + found = true + break + } + } + + if found != tt.wantAccess { + t.Errorf("expected access %v, got %v", tt.wantAccess, found) + } + }) + } +} + +func TestPurgeHandler_checkAuth(t *testing.T) { + t.Run("no auth configured", func(t *testing.T) { + cfg := &config.CacheAPIConfig{ + Path: "/_cache/purge", + Auth: config.CacheAPIAuthConfig{ + Type: "", + Token: "", + }, + } + + h, err := NewPurgeHandler(nil, cfg) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + ctx := &fasthttp.RequestCtx{} + if !h.checkAuth(ctx) { + t.Error("expected auth to pass when no auth configured") + } + }) + + t.Run("auth type none", func(t *testing.T) { + cfg := &config.CacheAPIConfig{ + Path: "/_cache/purge", + Auth: config.CacheAPIAuthConfig{ + Type: "none", + Token: "", + }, + } + + h, err := NewPurgeHandler(nil, cfg) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + ctx := &fasthttp.RequestCtx{} + if !h.checkAuth(ctx) { + t.Error("expected auth to pass when type is none") + } + }) + + t.Run("token auth - correct Bearer token", func(t *testing.T) { + cfg := &config.CacheAPIConfig{ + Path: "/_cache/purge", + Auth: config.CacheAPIAuthConfig{ + Type: "token", + Token: "secret-token", + }, + } + + h, err := NewPurgeHandler(nil, cfg) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + ctx := &fasthttp.RequestCtx{} + ctx.Request.Header.Set("Authorization", "Bearer secret-token") + + if !h.checkAuth(ctx) { + t.Error("expected auth to pass with correct Bearer token") + } + }) + + t.Run("token auth - correct direct token", func(t *testing.T) { + cfg := &config.CacheAPIConfig{ + Path: "/_cache/purge", + Auth: config.CacheAPIAuthConfig{ + Type: "token", + Token: "secret-token", + }, + } + + h, err := NewPurgeHandler(nil, cfg) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + ctx := &fasthttp.RequestCtx{} + ctx.Request.Header.Set("Authorization", "secret-token") + + if !h.checkAuth(ctx) { + t.Error("expected auth to pass with correct direct token") + } + }) + + t.Run("token auth - wrong token", func(t *testing.T) { + cfg := &config.CacheAPIConfig{ + Path: "/_cache/purge", + Auth: config.CacheAPIAuthConfig{ + Type: "token", + Token: "secret-token", + }, + } + + h, err := NewPurgeHandler(nil, cfg) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + ctx := &fasthttp.RequestCtx{} + ctx.Request.Header.Set("Authorization", "Bearer wrong-token") + + if h.checkAuth(ctx) { + t.Error("expected auth to fail with wrong token") + } + }) + + t.Run("token auth - missing header", func(t *testing.T) { + cfg := &config.CacheAPIConfig{ + Path: "/_cache/purge", + Auth: config.CacheAPIAuthConfig{ + Type: "token", + Token: "secret-token", + }, + } + + h, err := NewPurgeHandler(nil, cfg) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + ctx := &fasthttp.RequestCtx{} + + if h.checkAuth(ctx) { + t.Error("expected auth to fail when Authorization header is missing") + } + }) + + t.Run("token auth - unknown type", func(t *testing.T) { + cfg := &config.CacheAPIConfig{ + Path: "/_cache/purge", + Auth: config.CacheAPIAuthConfig{ + Type: "basic", + Token: "secret-token", + }, + } + + h, err := NewPurgeHandler(nil, cfg) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + ctx := &fasthttp.RequestCtx{} + ctx.Request.Header.Set("Authorization", "Bearer secret-token") + + if h.checkAuth(ctx) { + t.Error("expected auth to fail for unknown auth type") + } + }) +} diff --git a/internal/server/server.go b/internal/server/server.go index c279ae2..0a3eeef 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -485,6 +485,16 @@ func (s *Server) startSingleMode() error { } } + // 注册缓存清理 API(如果配置) + if serverCfg.CacheAPI != nil && serverCfg.CacheAPI.Enabled { + purgeHandler, err := NewPurgeHandler(s, serverCfg.CacheAPI) + if err != nil { + logging.Error().Msg("创建缓存清理处理器失败: " + err.Error()) + } else { + router.POST(purgeHandler.Path(), purgeHandler.ServeHTTP) + } + } + // 注册代理路由 s.registerProxyRoutes(router, serverCfg) @@ -605,7 +615,16 @@ func (s *Server) startVHostMode() error { } } + // 注册缓存清理 API(如果配置) defaultSrv := s.config.GetDefaultServerFromList() + if defaultSrv != nil && defaultSrv.CacheAPI != nil && defaultSrv.CacheAPI.Enabled { + purgeHandler, err := NewPurgeHandler(s, defaultSrv.CacheAPI) + if err != nil { + logging.Error().Msg("创建缓存清理处理器失败: " + err.Error()) + } else { + router.POST(purgeHandler.Path(), purgeHandler.ServeHTTP) + } + } s.registerProxyRoutes(router, defaultSrv) @@ -713,6 +732,17 @@ func (s *Server) startMultiServerMode() error { // 创建路由器 router := handler.NewRouter() + + // 注册缓存清理 API(仅第一个服务器) + if idx == 0 && serverCfg.CacheAPI != nil && serverCfg.CacheAPI.Enabled { + purgeHandler, purgeErr := NewPurgeHandler(s, serverCfg.CacheAPI) + if purgeErr != nil { + errCh <- fmt.Errorf("创建缓存清理处理器失败 (server[%d]): %w", idx, purgeErr) + return + } + router.POST(purgeHandler.Path(), purgeHandler.ServeHTTP) + } + s.registerProxyRoutes(router, serverCfg) // 静态文件服务