feat(server): 添加缓存清理 API 支持

新增 PurgeHandler 处理器,支持:
- 按精确路径和通配符模式清理缓存
- HTTP 方法过滤(默认 GET)
- IP 白名单访问控制(CIDR/单 IP/localhost)
- Token 认证保护
- 三种启动模式路由注册

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
xfy 2026-04-16 16:47:10 +08:00
parent 87cd41a81f
commit bec8932561
6 changed files with 708 additions and 5 deletions

View File

@ -20,6 +20,7 @@ package cache
import (
"container/list"
"slices"
"strings"
"sync"
"time"
)
@ -457,6 +458,24 @@ func (c *ProxyCache) Delete(hashKey uint64) {
delete(c.entries, hashKey)
}
// DeleteByPatternWithMethod 按通配符模式删除缓存条目。
// method 过滤:检查 entry.OrigKey 是否以 "method:" 前缀开头。
// 空 method 匹配所有条目。
func (c *ProxyCache) DeleteByPatternWithMethod(pattern string, method string) int {
c.mu.Lock()
defer c.mu.Unlock()
deleted := 0
for hashKey, entry := range c.entries {
if MatchPattern(pattern, entry.OrigKey) {
if method == "" || strings.HasPrefix(entry.OrigKey, method+":") {
delete(c.entries, hashKey)
deleted++
}
}
}
return deleted
}
// Clear 清空代理缓存。
func (c *ProxyCache) Clear() {
c.mu.Lock()

View File

@ -44,6 +44,9 @@ type PurgeRequest struct {
// Pattern 通配符模式(支持 * 通配符)
Pattern string `json:"pattern,omitempty"`
// Method HTTP 方法,默认 "GET"
Method string `json:"method,omitempty"`
}
// PurgeResponse 清理响应结构。
@ -249,15 +252,23 @@ func (p *PurgeAPI) purgeByPattern(pattern string) int {
return deleted
}
// HashPathWithMethod 使用 FNV-64a 计算缓存键的哈希值。
// method 为空时默认使用 "GET"。
func HashPathWithMethod(path string, method string) uint64 {
if method == "" {
method = "GET"
}
key := method + ":" + path
h := fnv.New64a()
h.Write([]byte(key))
return h.Sum64()
}
// hashPath 使用 FNV-64a 计算路径的哈希值。
// 与代理层 buildCacheKeyHash 使用相同的算法,确保一致性。
// 注意:代理层的 key 格式为 "METHOD:URI"purge 时默认使用 GET 方法。
func hashPath(path string) uint64 {
// 默认使用 GET 方法,与代理层 key 格式一致
key := "GET:" + path
h := fnv.New64a()
h.Write([]byte(key))
return h.Sum64()
return HashPathWithMethod(path, "GET")
}
// MatchPattern 检查路径是否匹配通配符模式。

View File

@ -1005,6 +1005,12 @@ func (p *Proxy) backgroundRefresh(ctx *fasthttp.RequestCtx, target *loadbalance.
p.cache.Set(hashKey, origKey, resp.Body(), headers, resp.StatusCode(), p.getCacheDuration(resp.StatusCode()))
}
// GetCache 返回代理的 ProxyCache 实例(用于 purge handler
// 如果缓存未启用,返回 nil。
func (p *Proxy) GetCache() *cache.ProxyCache {
return p.cache
}
// GetCacheStats 返回代理缓存的统计信息。
// 如果缓存未启用,返回 nil。
func (p *Proxy) GetCacheStats() *cache.ProxyCacheStats {

229
internal/server/purge.go Normal file
View File

@ -0,0 +1,229 @@
// Package server 提供 HTTP 服务器核心功能。
//
// 该文件实现缓存清理 API 处理器,支持主动清理代理缓存。
package server
import (
"encoding/json"
"net"
"net/netip"
"strings"
"github.com/valyala/fasthttp"
"rua.plus/lolly/internal/cache"
"rua.plus/lolly/internal/config"
"rua.plus/lolly/internal/netutil"
)
// PurgeHandler 缓存清理 API 处理器。
//
// 持有 Server 引用以访问所有代理实例的缓存。
// 支持 IP 白名单和 Token 认证保护。
//
// 注意事项:
// - 仅处理 POST 请求
// - 支持按路径和按模式两种清理方式
// - method 参数支持指定 HTTP 方法(默认 GET
type PurgeHandler struct {
server *Server
auth config.CacheAPIAuthConfig
path string
allowed []net.IPNet
}
// NewPurgeHandler 创建缓存清理 API 处理器。
//
// 解析 IP 白名单配置,支持 CIDR 格式和单个 IP。
// localhost 特殊处理为 127.0.0.1 和 ::1。
//
// 参数:
// - server: Server 实例,用于访问代理缓存
// - cfg: CacheAPI 配置
//
// 返回值:
// - *PurgeHandler: 配置好的处理器
// - error: IP 解析失败时返回非 nil 错误
func NewPurgeHandler(server *Server, cfg *config.CacheAPIConfig) (*PurgeHandler, error) {
h := &PurgeHandler{
server: server,
path: cfg.Path,
auth: cfg.Auth,
}
// 默认路径
if h.path == "" {
h.path = "/_cache/purge"
}
// 解析允许的 IP 列表
for _, cidr := range cfg.Allow {
// 处理 localhost 特殊情况
if cidr == "localhost" {
_, v4Network, _ := net.ParseCIDR("127.0.0.1/32")
_, v6Network, _ := net.ParseCIDR("::1/128")
if v4Network != nil {
h.allowed = append(h.allowed, *v4Network)
}
if v6Network != nil {
h.allowed = append(h.allowed, *v6Network)
}
continue
}
_, network, err := net.ParseCIDR(cidr)
if err != nil {
// 尝试作为单个 IP 解析
ip, err := netip.ParseAddr(cidr)
if err != nil {
return nil, err
}
// 转换为 CIDR 格式
if ip.Is4() {
_, network, _ = net.ParseCIDR(cidr + "/32")
} else {
_, network, _ = net.ParseCIDR(cidr + "/128")
}
}
if network != nil {
h.allowed = append(h.allowed, *network)
}
}
return h, nil
}
// Path 返回 API 端点路径。
func (h *PurgeHandler) Path() string {
return h.path
}
// ServeHTTP 处理缓存清理请求。
//
// 仅处理 POST 请求,支持精确路径和通配符模式清理。
// 返回 JSON 格式的响应。
func (h *PurgeHandler) ServeHTTP(ctx *fasthttp.RequestCtx) {
// 仅允许 POST 方法
if string(ctx.Method()) != "POST" {
h.sendError(ctx, fasthttp.StatusMethodNotAllowed, "method not allowed")
return
}
// 检查 IP 访问权限
if !h.checkAccess(ctx) {
h.sendError(ctx, fasthttp.StatusForbidden, "forbidden")
return
}
// 检查认证
if !h.checkAuth(ctx) {
h.sendError(ctx, fasthttp.StatusUnauthorized, "unauthorized")
return
}
// 解析请求体
var req cache.PurgeRequest
if err := json.Unmarshal(ctx.PostBody(), &req); err != nil {
h.sendError(ctx, fasthttp.StatusBadRequest, "invalid request body")
return
}
// 执行清理
deleted := 0
if req.Path != "" {
deleted = h.purgeByPath(req.Path, req.Method)
} else if req.Pattern != "" {
deleted = h.purgeByPattern(req.Pattern, req.Method)
} else {
h.sendError(ctx, fasthttp.StatusBadRequest, "missing path or pattern")
return
}
// 返回响应
ctx.SetContentType("application/json; charset=utf-8")
ctx.SetStatusCode(fasthttp.StatusOK)
_ = json.NewEncoder(ctx).Encode(cache.PurgeResponse{Deleted: deleted})
}
// checkAccess 检查客户端 IP 是否在允许列表中。
func (h *PurgeHandler) checkAccess(ctx *fasthttp.RequestCtx) bool {
// 如果没有配置允许列表,允许所有访问
if len(h.allowed) == 0 {
return true
}
clientIP := netutil.ExtractClientIPNet(ctx)
if clientIP == nil {
return false
}
// 检查是否在允许列表中
for _, network := range h.allowed {
if network.Contains(clientIP) {
return true
}
}
return false
}
// checkAuth 检查认证。
func (h *PurgeHandler) checkAuth(ctx *fasthttp.RequestCtx) bool {
// 无需认证
if h.auth.Type == "" || h.auth.Type == "none" {
return true
}
// Token 认证
if h.auth.Type == "token" {
authHeader := ctx.Request.Header.Peek("Authorization")
if len(authHeader) == 0 {
return false
}
authStr := string(authHeader)
// 支持 Bearer token 格式
if token, ok := strings.CutPrefix(authStr, "Bearer "); ok {
return token == h.auth.Token
}
// 也支持直接传递 token
return authStr == h.auth.Token
}
return false
}
// purgeByPath 按精确路径清理缓存。
func (h *PurgeHandler) purgeByPath(path string, method string) int {
hashKey := cache.HashPathWithMethod(path, method)
deleted := 0
for _, p := range h.server.proxies {
if pcache := p.GetCache(); pcache != nil {
pcache.Delete(hashKey)
deleted++
}
}
return deleted
}
// purgeByPattern 按通配符模式清理缓存。
func (h *PurgeHandler) purgeByPattern(pattern string, method string) int {
deleted := 0
for _, p := range h.server.proxies {
if pcache := p.GetCache(); pcache != nil {
deleted += pcache.DeleteByPatternWithMethod(pattern, method)
}
}
return deleted
}
// sendError 发送错误响应。
func (h *PurgeHandler) sendError(ctx *fasthttp.RequestCtx, status int, errMsg string) {
ctx.SetContentType("application/json; charset=utf-8")
ctx.SetStatusCode(status)
_ = json.NewEncoder(ctx).Encode(cache.PurgeErrorResponse{Error: errMsg})
}

View File

@ -0,0 +1,408 @@
// Package server 提供缓存清理处理器功能的测试。
//
// 该文件测试 PurgeHandler 模块的各项功能,包括:
// - 路径配置(默认和自定义)
// - localhost 特殊处理和 CIDR 解析
// - IP 白名单访问控制
// - Token 认证
//
// 作者xfy
package server
import (
"net"
"testing"
"github.com/valyala/fasthttp"
"rua.plus/lolly/internal/config"
)
func TestPurgeHandler_Path(t *testing.T) {
tests := []struct {
name string
cfgPath string
wantPath string
}{
{
name: "default path",
cfgPath: "",
wantPath: "/_cache/purge",
},
{
name: "custom path",
cfgPath: "/api/purge",
wantPath: "/api/purge",
},
{
name: "custom path with version prefix",
cfgPath: "/api/v1/cache/purge",
wantPath: "/api/v1/cache/purge",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cfg := &config.CacheAPIConfig{
Path: tt.cfgPath,
Allow: []string{},
}
h, err := NewPurgeHandler(nil, cfg)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if h.Path() != tt.wantPath {
t.Errorf("expected path %s, got %s", tt.wantPath, h.Path())
}
})
}
}
func TestPurgeHandler_NewPurgeHandler(t *testing.T) {
t.Run("localhost special handling", func(t *testing.T) {
cfg := &config.CacheAPIConfig{
Path: "/_cache/purge",
Allow: []string{"localhost"},
}
h, err := NewPurgeHandler(nil, cfg)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if len(h.allowed) != 2 {
t.Fatalf("expected 2 allowed networks for localhost, got %d", len(h.allowed))
}
// 验证包含 127.0.0.1/32
_, v4Net, _ := net.ParseCIDR("127.0.0.1/32")
if v4Net == nil {
t.Fatal("failed to parse 127.0.0.1/32")
}
foundV4 := false
for _, n := range h.allowed {
if n.String() == v4Net.String() {
foundV4 = true
break
}
}
if !foundV4 {
t.Error("expected 127.0.0.1/32 in allowed networks")
}
// 验证包含 ::1/128
_, v6Net, _ := net.ParseCIDR("::1/128")
if v6Net == nil {
t.Fatal("failed to parse ::1/128")
}
foundV6 := false
for _, n := range h.allowed {
if n.String() == v6Net.String() {
foundV6 = true
break
}
}
if !foundV6 {
t.Error("expected ::1/128 in allowed networks")
}
})
t.Run("CIDR parsing", func(t *testing.T) {
cfg := &config.CacheAPIConfig{
Path: "/_cache/purge",
Allow: []string{"10.0.0.0/8", "172.16.0.0/12"},
}
h, err := NewPurgeHandler(nil, cfg)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if len(h.allowed) != 2 {
t.Errorf("expected 2 allowed networks, got %d", len(h.allowed))
}
})
t.Run("single IP parsed as CIDR", func(t *testing.T) {
cfg := &config.CacheAPIConfig{
Path: "/_cache/purge",
Allow: []string{"192.168.1.100"},
}
h, err := NewPurgeHandler(nil, cfg)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if len(h.allowed) != 1 {
t.Fatalf("expected 1 allowed network, got %d", len(h.allowed))
}
// 单 IP 应转换为 /32 CIDR
if h.allowed[0].String() != "192.168.1.100/32" {
t.Errorf("expected 192.168.1.100/32, got %s", h.allowed[0].String())
}
})
t.Run("invalid IP returns error", func(t *testing.T) {
cfg := &config.CacheAPIConfig{
Path: "/_cache/purge",
Allow: []string{"not-an-ip"},
}
_, err := NewPurgeHandler(nil, cfg)
if err == nil {
t.Error("expected error for invalid IP, got nil")
}
})
}
func TestPurgeHandler_checkAccess(t *testing.T) {
tests := []struct {
name string
allow []string
clientIP string
wantAccess bool
}{
{
name: "no allow list - open access",
allow: []string{},
clientIP: "1.2.3.4",
wantAccess: true,
},
{
name: "CIDR match",
allow: []string{"192.168.0.0/16"},
clientIP: "192.168.1.100",
wantAccess: true,
},
{
name: "CIDR no match",
allow: []string{"10.0.0.0/8"},
clientIP: "192.168.1.100",
wantAccess: false,
},
{
name: "single IP match",
allow: []string{"127.0.0.1"},
clientIP: "127.0.0.1",
wantAccess: true,
},
{
name: "single IP no match",
allow: []string{"127.0.0.1"},
clientIP: "127.0.0.2",
wantAccess: false,
},
{
name: "localhost allows 127.0.0.1",
allow: []string{"localhost"},
clientIP: "127.0.0.1",
wantAccess: true,
},
{
name: "localhost allows ::1",
allow: []string{"localhost"},
clientIP: "::1",
wantAccess: true,
},
{
name: "localhost denies other IP",
allow: []string{"localhost"},
clientIP: "10.0.0.1",
wantAccess: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cfg := &config.CacheAPIConfig{
Path: "/_cache/purge",
Allow: tt.allow,
}
h, err := NewPurgeHandler(nil, cfg)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if len(h.allowed) == 0 {
// 无白名单时应允许所有访问
if !h.checkAccess(nil) {
t.Error("expected access to be true when no allow list configured")
}
return
}
// 直接测试 IP 是否在 allowed 列表中
ip := net.ParseIP(tt.clientIP)
if ip == nil {
t.Fatalf("failed to parse client IP: %s", tt.clientIP)
}
found := false
for _, network := range h.allowed {
if network.Contains(ip) {
found = true
break
}
}
if found != tt.wantAccess {
t.Errorf("expected access %v, got %v", tt.wantAccess, found)
}
})
}
}
func TestPurgeHandler_checkAuth(t *testing.T) {
t.Run("no auth configured", func(t *testing.T) {
cfg := &config.CacheAPIConfig{
Path: "/_cache/purge",
Auth: config.CacheAPIAuthConfig{
Type: "",
Token: "",
},
}
h, err := NewPurgeHandler(nil, cfg)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
ctx := &fasthttp.RequestCtx{}
if !h.checkAuth(ctx) {
t.Error("expected auth to pass when no auth configured")
}
})
t.Run("auth type none", func(t *testing.T) {
cfg := &config.CacheAPIConfig{
Path: "/_cache/purge",
Auth: config.CacheAPIAuthConfig{
Type: "none",
Token: "",
},
}
h, err := NewPurgeHandler(nil, cfg)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
ctx := &fasthttp.RequestCtx{}
if !h.checkAuth(ctx) {
t.Error("expected auth to pass when type is none")
}
})
t.Run("token auth - correct Bearer token", func(t *testing.T) {
cfg := &config.CacheAPIConfig{
Path: "/_cache/purge",
Auth: config.CacheAPIAuthConfig{
Type: "token",
Token: "secret-token",
},
}
h, err := NewPurgeHandler(nil, cfg)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
ctx := &fasthttp.RequestCtx{}
ctx.Request.Header.Set("Authorization", "Bearer secret-token")
if !h.checkAuth(ctx) {
t.Error("expected auth to pass with correct Bearer token")
}
})
t.Run("token auth - correct direct token", func(t *testing.T) {
cfg := &config.CacheAPIConfig{
Path: "/_cache/purge",
Auth: config.CacheAPIAuthConfig{
Type: "token",
Token: "secret-token",
},
}
h, err := NewPurgeHandler(nil, cfg)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
ctx := &fasthttp.RequestCtx{}
ctx.Request.Header.Set("Authorization", "secret-token")
if !h.checkAuth(ctx) {
t.Error("expected auth to pass with correct direct token")
}
})
t.Run("token auth - wrong token", func(t *testing.T) {
cfg := &config.CacheAPIConfig{
Path: "/_cache/purge",
Auth: config.CacheAPIAuthConfig{
Type: "token",
Token: "secret-token",
},
}
h, err := NewPurgeHandler(nil, cfg)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
ctx := &fasthttp.RequestCtx{}
ctx.Request.Header.Set("Authorization", "Bearer wrong-token")
if h.checkAuth(ctx) {
t.Error("expected auth to fail with wrong token")
}
})
t.Run("token auth - missing header", func(t *testing.T) {
cfg := &config.CacheAPIConfig{
Path: "/_cache/purge",
Auth: config.CacheAPIAuthConfig{
Type: "token",
Token: "secret-token",
},
}
h, err := NewPurgeHandler(nil, cfg)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
ctx := &fasthttp.RequestCtx{}
if h.checkAuth(ctx) {
t.Error("expected auth to fail when Authorization header is missing")
}
})
t.Run("token auth - unknown type", func(t *testing.T) {
cfg := &config.CacheAPIConfig{
Path: "/_cache/purge",
Auth: config.CacheAPIAuthConfig{
Type: "basic",
Token: "secret-token",
},
}
h, err := NewPurgeHandler(nil, cfg)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
ctx := &fasthttp.RequestCtx{}
ctx.Request.Header.Set("Authorization", "Bearer secret-token")
if h.checkAuth(ctx) {
t.Error("expected auth to fail for unknown auth type")
}
})
}

View File

@ -485,6 +485,16 @@ func (s *Server) startSingleMode() error {
}
}
// 注册缓存清理 API如果配置
if serverCfg.CacheAPI != nil && serverCfg.CacheAPI.Enabled {
purgeHandler, err := NewPurgeHandler(s, serverCfg.CacheAPI)
if err != nil {
logging.Error().Msg("创建缓存清理处理器失败: " + err.Error())
} else {
router.POST(purgeHandler.Path(), purgeHandler.ServeHTTP)
}
}
// 注册代理路由
s.registerProxyRoutes(router, serverCfg)
@ -605,7 +615,16 @@ func (s *Server) startVHostMode() error {
}
}
// 注册缓存清理 API如果配置
defaultSrv := s.config.GetDefaultServerFromList()
if defaultSrv != nil && defaultSrv.CacheAPI != nil && defaultSrv.CacheAPI.Enabled {
purgeHandler, err := NewPurgeHandler(s, defaultSrv.CacheAPI)
if err != nil {
logging.Error().Msg("创建缓存清理处理器失败: " + err.Error())
} else {
router.POST(purgeHandler.Path(), purgeHandler.ServeHTTP)
}
}
s.registerProxyRoutes(router, defaultSrv)
@ -713,6 +732,17 @@ func (s *Server) startMultiServerMode() error {
// 创建路由器
router := handler.NewRouter()
// 注册缓存清理 API仅第一个服务器
if idx == 0 && serverCfg.CacheAPI != nil && serverCfg.CacheAPI.Enabled {
purgeHandler, purgeErr := NewPurgeHandler(s, serverCfg.CacheAPI)
if purgeErr != nil {
errCh <- fmt.Errorf("创建缓存清理处理器失败 (server[%d]): %w", idx, purgeErr)
return
}
router.POST(purgeHandler.Path(), purgeHandler.ServeHTTP)
}
s.registerProxyRoutes(router, serverCfg)
// 静态文件服务