Compare commits

...

5 Commits

Author SHA1 Message Date
xfy
f33117b940 fix(handler,http2,loadbalance,logging,resolver,ssl): fix high severity issues
- handler/static.go: add sync.RWMutex to StaticHandler; protect Handle
  with RLock and all setters with Lock to prevent data races
- http2/server.go: delete empty connection slice keys from pool map to
  prevent memory leak under high client churn
- loadbalance/slow_start.go: recreate stopCh in Start() to support
  Start-Stop-Start cycles; guard double-close in Stop()
- resolver/resolver.go: recreate stopCh in Start() to support restart
- logging/logging.go: save *os.File handles from getOutput so Close()
  actually closes log files; exclude os.Stdout/os.Stderr from closing
- ssl/session_tickets.go: protect started/rotateTimer access in
  scheduleRotation with mu; support Start-Stop-Start cycles
- ssl/ssl.go: cache parsed default certificate to avoid re-parsing on
  every TLS handshake for OCSP stapling
2026-06-11 17:03:17 +08:00
xfy
27e00b84a8 fix(proxy,handler,server,stream,ratelimit): fix resource leaks and functional bugs
- proxy/proxy.go: decrement connection count on dangerous path rejection
  (line 724) to prevent connection count leak
- handler/sendfile_linux.go: return *os.File from getSocketFile and let
  linuxSendfile close it, fixing EBADF from deferred close in getSocketFd
- proxy/websocket.go: return bufio.Reader from readWebSocketUpgradeResponse
  and wrap targetConn with bufferedConn to consume pre-buffered frame data,
  preventing first-frame loss
- server/pool.go: use non-blocking send after starting new worker to avoid
  deadlock when queue is full
- stream/stream.go: check stopCh on non-timeout UDP read errors to prevent
  infinite loop and shutdown deadlock
- middleware/ratelimit: replace select-based close guard with sync.Once in
  StopCleanup to prevent double-close panic
2026-06-11 16:35:10 +08:00
xfy
fe0dee4da3 fix(compression,ssl,server,lua): resolve data races and concurrency bugs
- compression: move sync.Pool.New initialization into constructors to
  eliminate lazy-init race in Get()
- ssl/ocsp: copy response fields under RLock before releasing, preventing
  race with concurrent writers in refreshAll
- server: change proxiesMu from sync.Mutex to sync.RWMutex; protect
  getProxyCacheStats and purge handlers with RLock to prevent races
  with proxy registration
- lua/api_timer: fix double-decrement race in Cancel vs executeTimer
  by using timer.Stop() result to determine who decrements active
- lua/api_socket_tcp: fix nil pointer race in ConnectAsync by checking
  currentOp under lock before Connect returns
2026-06-11 16:30:11 +08:00
xfy
e733273139 fix(server,app,proxy,resolver,middleware,lua): add nil guards and safe defaults
- server: reject Start() when config is nil to prevent panic
- app_common: guard empty Servers slice in initHTTP2/3 and logServerAddresses
- proxy/health: handle nil HealthCheckConfig with defaults
- resolver: handle nil ResolverConfig by returning noopResolver
- middleware/headers: skip UpdateConfig when cfg is nil
- middleware/sliding_window: enforce minimum window duration of 1s
- lua/api_log: map EMERG/ALERT/CRIT to Error() instead of Fatal()
  to prevent Lua scripts from killing the entire server process
2026-06-11 16:23:04 +08:00
xfy
818aa23739 fix(logging,mimeutil,variable): correct data corruption and behavior bugs
- logging: pre-allocate fresh slice for request field to avoid mutating
  fasthttp internal buffers via append into slices with excess capacity
- mimeutil: move defaultMIME fallback before cache insertion so unknown
  extensions are consistently cached as application/octet-stream
- builtin: use -0700 timezone format instead of hardcoded +0800; cache
  generated request_id in UserValue to prevent different IDs per expansion
- variable: initialize maps in fallback Context to prevent nil map panic
2026-06-11 16:22:55 +08:00
32 changed files with 436 additions and 107 deletions

View File

@ -0,0 +1,173 @@
# 深度审查修复计划
## 目标
修复 6 个 review subagent 发现的 Critical 和 High 级别问题。
## 策略
按影响范围和修复复杂度分 4 个批次执行,每批独立提交。
## 批次 1明确的崩溃/数据损坏Critical最容易验证
### 1.1 logging.go: append 污染 fasthttp buffer
- **文件**: `internal/logging/logging.go:147`
- **问题**: `append(append(ctx.Method(), ' '), ctx.Path()...)` 可能 mutate fasthttp 内部 buffer
- **修复**: 预先分配新的 slice`req := make([]byte, 0, len(method)+1+len(path))`
- **测试**: 验证 request 字段不修改原始 buffer
### 1.2 mimeutil/detect.go: 缓存空字符串 bug
- **文件**: `internal/mimeutil/detect.go:150`
- **问题**: 未知扩展名时先插入空字符串再替换成 defaultMIME
- **修复**: 在缓存插入前完成 defaultMIME 回退,或用 RLock 读 + Lock 写
- **测试**: 多次查找未知扩展名都应返回 `application/octet-stream`
### 1.3 server.go nil/empty config panic
- **文件**: `internal/server/server.go:270-295, 470`
- **问题**: `Start()` / `startSingleMode()` 访问 `s.config.*``Servers[0]` 无前置检查
- **修复**: `Start()` 开头加 nil check 和 `len(Servers) > 0` check`startSingleMode` 同理
- **测试**: `New(nil).Start()` 应返回 error 而非 panic
### 1.4 app_common.go empty servers panic
- **文件**: `internal/app/app_common.go:176, 202`
- **问题**: `initHTTP3()`, `initHTTP2()` 访问 `a.cfg.Servers[0]` 未检查
- **修复**: 加 `len(a.cfg.Servers) > 0` guard
- **测试**: 空 servers 配置应跳过 HTTP2/3 初始化
### 1.5 proxy/health.go nil cfg panic
- **文件**: `internal/proxy/health.go:80`
- **问题**: `NewHealthChecker` 直接读 `cfg.Interval`
- **修复**: `if cfg == nil { cfg = &config.HealthCheckConfig{} }`
- **测试**: `NewHealthChecker(targets, nil)` 不 panic
### 1.6 resolver.go nil cfg panic
- **文件**: `internal/resolver/resolver.go:134`
- **问题**: `New(cfg)``cfg.Enabled`
- **修复**: `if cfg == nil || !cfg.Enabled { return &noopResolver{} }`
- **测试**: `New(nil)` 不 panic
### 1.7 lua/api_log.go Fatal kills server
- **文件**: `internal/lua/api_log.go:250`
- **问题**: `EMERG/ALERT/CRIT` 调用 `logger.Fatal().Msg()``os.Exit(1)`
- **修复**: 全部映射到非致命的 `Error()`Lua 日志级别不应 kill 进程)
- **测试**: 调用 `ngx.log(ngx.EMERG, ...)` 不退出进程
---
## 批次 2并发 / Race
### 2.1 compression pool.New race
- **文件**: `internal/middleware/compression/compression.go:75-83`
- **问题**: `compressorPool.Get()` 懒初始化 `pool.New`
- **修复**: 在 `newGzipPool`/`newBrotliPool` 构造时就设置 `pool.New`
- **测试**: `go test -race` 多次 Get 不报错
### 2.2 ssl/ocsp data race
- **文件**: `internal/ssl/ocsp.go:397-426`
- **问题**: RUnlock 后读 `resp.status/nextUpdate`
- **修复**: 整个函数体在 RLock 保护下完成,或把可变字段改为 atomic
- **测试**: `go test -race ./internal/ssl/...`
### 2.3 server lifecycle/purge race on proxies slice
- **文件**: `internal/server/lifecycle.go:222-230`, `internal/server/purge.go:127,145`
- **问题**: 无锁读 `s.proxies`
- **修复**: 用 `s.proxiesMu.RLock()` 包裹读循环
- **测试**: 并行 stats/purge 和 create proxy 时不触发 race
### 2.4 lua/api_timer active race
- **文件**: `internal/lua/api_timer.go:228-265, 303-325`
- **问题**: `executeTimer``Cancel` 都递减 active向 closed callbackQueue 发送会 panic
- **修复**: 只在 `executeTimer` defer 中递减 active`Cancel` 仅关闭 cancel channelcallbackQueue 发送加锁保护
- **测试**: 并发 Cancel 和触发不 panic`active` 不会为负
### 2.5 lua/api_socket_tcp ConnectAsync race
- **文件**: `internal/lua/api_socket_tcp.go:196-205, 640-642`
- **问题**: goroutine 设置 `s.currentOp = nil`caller 访问 op.ID
- **修复**: `Connect` 直接返回 `*SocketOperation` 给 caller不要通过字段传递
- **测试**: 快速完成连接的 localhost 不触发 nil panic
---
## 批次 3资源泄漏 / 功能损坏
### 3.1 proxy connection count leak (2 places)
- **文件**: `internal/proxy/proxy.go:889-896, 898-919`
- **问题**: X-Accel-Redirect 和重试路径漏了 `DecrementConnections`
- **修复**: 两个 return/continue 前都调用 `loadbalance.DecrementConnections(target)`
- **测试**: 内部 redirect 和 retry 后连接计数归零
### 3.2 server/pool.go deadlock
- **文件**: `internal/server/pool.go:178-184`
- **问题**: queue full 时启动 worker 后立即 blocking send可能死锁
- **修复**: 用非阻塞 send或在启动 worker 时直接把 task 传给 worker
- **测试**: 持续 Submit 满载任务不阻塞
### 3.3 handler/sendfile_linux.go FD closed before use
- **文件**: `internal/handler/sendfile_linux.go:151-169`
- **问题**: `getSocketFd` 返回 FD 后 `defer file.Close()` 已执行
- **修复**: 移除 defer`linuxSendfile` 负责关闭;或重构 syscall 实现
- **测试**: Linux sendfile 实际生效
### 3.4 proxy/websocket.go bufio data loss
- **文件**: `internal/proxy/websocket.go:335-349, 415-427`
- **问题**: `bufio.Reader` 缓冲的 frame 数据在返回后被丢弃
- **修复**: 桥接前 drain `resp.Body`,或复用同一个 `bufio.Reader`
- **测试**: WebSocket 连接不丢首帧
### 3.5 stream/stream.go UDP shutdown deadlock
- **文件**: `internal/stream/stream.go:966-1001`
- **问题**: UDP serve 在 `Stop()` 后不退出,`wg.Wait()` 死锁
- **修复**: 非 timeout 错误时检查 `stopCh``Stop()` 设置 `udpSrv.running.Store(false)`
- **测试**: UDP server Stop 能正常完成
### 3.6 stream/stream.go upstream name mismatch
- **文件**: `internal/stream/stream.go:596-608`
- **问题**: `AddUpstream` 用 name`handleConnection` 用 listener addr 查表
- **修复**: 统一 key或建立 listener→upstream 映射
- **测试**: TCP stream proxy 转发到后端
### 3.7 ratelimit token bucket cleanup leak
- **文件**: `internal/middleware/security/ratelimit.go:132-162, 476-512`
- **问题**: cleanup goroutine 不停止;`StopCleanup` double-close panic
- **修复**: `sync.Once` 保护 closereload 时调用 StopCleanup
- **测试**: 多次创建/销毁 limiter 不泄漏 goroutine不 panic
---
## 批次 4High severity
### 4.1 middleware/security/headers.go nil cfg
- 在 `UpdateConfig``addHeaders` 中做 nil guard
### 4.2 middleware/security/sliding_window.go div by zero
- 构造函数中校验 `window > 0`
### 4.3 handler/static.go data race
- setter 和 Handle 并发访问字段;加 `sync.RWMutex`
### 4.4 proxy/proxy_dns.go HostClient.Addr race
- DNS 更新时不能并发修改 live client 的 Addr
### 4.5 loadbalance/slow_start.go Start/Stop bug
- `stopCh` 重建、`findTarget` 未初始化
### 4.6 resolver restart broken
- `stopCh` 关闭后不可重用;`Start()` 重新创建
### 4.7 variable/variable.go nil map panic
- fallback Context 中初始化所有 map
### 4.8 variable/builtin.go request_id/time_local
- request_id 只生成一次并存入 UserValuetime_local 用 `-0700`
### 4.9 logging.go file handle leak
- `New()` 中保存 `*os.File` 以便 `Close()` 关闭
---
## 验证
每个批次完成后:
1. `go test -race` 针对修改的包
2. `make lint`
3. `go test ./internal/...`
4. `make build`
全部完成后做一次全量验证。

View File

@ -94,6 +94,9 @@ func (a *App) initVariables() {
func (a *App) logServerAddresses() {
a.logger.LogStartup("Config loaded successfully", map[string]string{"config_path": a.cfgPath})
if len(a.cfg.Servers) == 0 {
return
}
mode := a.cfg.GetMode()
if mode == config.ServerModeMultiServer {
for i, srv := range a.cfg.Servers {
@ -173,7 +176,7 @@ func (a *App) initStreamServers() {
// initHTTP3 starts the HTTP/3 server if enabled.
func (a *App) initHTTP3() {
if !a.cfg.HTTP3.Enabled || a.cfg.Servers[0].SSL.Cert == "" {
if len(a.cfg.Servers) == 0 || !a.cfg.HTTP3.Enabled || a.cfg.Servers[0].SSL.Cert == "" {
return
}
@ -199,7 +202,7 @@ func (a *App) initHTTP3() {
// initHTTP2 starts the HTTP/2 server if enabled.
func (a *App) initHTTP2() {
if !a.cfg.Servers[0].SSL.HTTP2.Enabled || a.cfg.Servers[0].SSL.Cert == "" {
if len(a.cfg.Servers) == 0 || !a.cfg.Servers[0].SSL.HTTP2.Enabled || a.cfg.Servers[0].SSL.Cert == "" {
return
}

View File

@ -80,10 +80,12 @@ func SendFile(ctx *fasthttp.RequestCtx, file *os.File, offset, length int64) err
// 返回值:
// - error: 传输过程中的错误nil 表示成功
func linuxSendfile(conn net.Conn, fileFd uintptr, _, length int64) error {
socketFd, err := getSocketFd(conn)
socketFile, err := getSocketFile(conn)
if err != nil {
return err
}
defer func() { _ = socketFile.Close() }()
socketFd := socketFile.Fd()
// Linux sendfile: sendfile(out_fd, in_fd, offset, count)
var sent int64
@ -148,23 +150,13 @@ func linuxSendfile(conn net.Conn, fileFd uintptr, _, length int64) error {
// 返回值:
// - uintptr: socket 文件描述符,失败时返回 0
// - error: 获取失败时的错误,不支持的连接类型返回 ENOTSUP
func getSocketFd(conn net.Conn) (uintptr, error) {
func getSocketFile(conn net.Conn) (*os.File, error) {
switch c := conn.(type) {
case *net.TCPConn:
file, err := c.File()
if err != nil {
return 0, err
}
defer func() { _ = file.Close() }()
return file.Fd(), nil
return c.File()
case *net.UnixConn:
file, err := c.File()
if err != nil {
return 0, err
}
defer func() { _ = file.Close() }()
return file.Fd(), nil
return c.File()
default:
return 0, syscall.ENOTSUP
return nil, syscall.ENOTSUP
}
}

View File

@ -114,7 +114,7 @@ func TestCopyFile(t *testing.T) {
// TestGetSocketFd_NilConn 测试 nil 连接的情况
func TestGetSocketFd_NilConn(t *testing.T) {
_, err := getSocketFd(nil)
_, err := getSocketFile(nil)
if err == nil {
t.Error("expected error for nil connection")
}
@ -123,7 +123,7 @@ func TestGetSocketFd_NilConn(t *testing.T) {
// TestGetSocketFd_UnsupportedType 测试不支持的连接类型
func TestGetSocketFd_UnsupportedType(t *testing.T) {
conn := &mockConn{}
_, err := getSocketFd(conn)
_, err := getSocketFile(conn)
if err != syscall.ENOTSUP {
t.Errorf("expected ENOTSUP for unsupported conn type, got: %v", err)
}
@ -131,7 +131,7 @@ func TestGetSocketFd_UnsupportedType(t *testing.T) {
// mockConn 是一个不实现 TCPConn/UnixConn 的模拟连接。
//
// 用于测试 getSocketFd 对不支持连接类型的处理。
// 用于测试 getSocketFile 对不支持连接类型的处理。
type mockConn struct{}
func (m *mockConn) Read([]byte) (n int, err error) { return 0, nil }
@ -449,11 +449,11 @@ func TestGetSocketFd_UnixConn(t *testing.T) {
}
defer clientConn.Close()
fd, err := getSocketFd(clientConn)
f, err := getSocketFile(clientConn)
if err != nil {
t.Errorf("getSocketFd failed: %v", err)
t.Errorf("getSocketFile failed: %v", err)
}
if fd == 0 {
if f == nil {
t.Error("Expected non-zero fd")
}
@ -616,11 +616,11 @@ func TestGetSocketFd_TCPConn(t *testing.T) {
}
defer clientConn.Close()
fd, err := getSocketFd(clientConn)
f, err := getSocketFile(clientConn)
if err != nil {
t.Errorf("getSocketFd failed for TCPConn: %v", err)
t.Errorf("getSocketFile failed for TCPConn: %v", err)
}
if fd == 0 {
if f == nil {
t.Error("Expected non-zero fd for TCPConn")
}

View File

@ -23,6 +23,7 @@ import (
"os"
"path/filepath"
"strings"
"sync"
"time"
"github.com/valyala/fasthttp"
@ -84,6 +85,7 @@ func (h *StaticHandler) statWithCache(filePath string) (os.FileInfo, bool, error
// - 大文件(>= 8KB自动启用零拷贝传输
// - alias 与 root 互斥,同时配置时 alias 优先
type StaticHandler struct {
mu sync.RWMutex // 保护以下字段的并发访问
// 指针类型字段(按大小排列)
fileCache *cache.FileCache
fileInfoCache *FileInfoCache // FileInfo 缓存,减少 os.Stat 调用
@ -161,6 +163,8 @@ func NewStaticHandler(root, pathPrefix string, index []string, useSendfile bool)
// 参数:
// - alias: 路径别名
func (h *StaticHandler) SetAlias(alias string) {
h.mu.Lock()
defer h.mu.Unlock()
h.alias = alias
if alias != "" {
h.root = ""
@ -201,6 +205,8 @@ func (h *StaticHandler) buildFilePath(relPath string) string {
// - 仅对小于 1MB 的文件启用缓存
// - 缓存会自动检测文件修改并更新
func (h *StaticHandler) SetFileCache(fc *cache.FileCache) {
h.mu.Lock()
defer h.mu.Unlock()
h.fileCache = fc
}
@ -217,6 +223,8 @@ func (h *StaticHandler) SetFileCache(fc *cache.FileCache) {
//
// handler.SetGzipStatic(true, nil, []string{".gz", ".br"})
func (h *StaticHandler) SetGzipStatic(enabled bool, extensions, precompressedExtensions []string) {
h.mu.Lock()
defer h.mu.Unlock()
if enabled {
h.gzipStatic = compression.NewGzipStatic(true, h.root, extensions, precompressedExtensions)
}
@ -236,6 +244,8 @@ func (h *StaticHandler) SetGzipStatic(enabled bool, extensions, precompressedExt
//
// handler.SetTryFiles([]string{"$uri", "$uri/", "/index.html"}, false, nil)
func (h *StaticHandler) SetTryFiles(tryFiles []string, tryFilesPass bool, router *Router) {
h.mu.Lock()
defer h.mu.Unlock()
h.tryFiles = tryFiles
h.tryFilesPass = tryFilesPass
h.router = router
@ -249,6 +259,8 @@ func (h *StaticHandler) SetTryFiles(tryFiles []string, tryFilesPass bool, router
// 参数:
// - enabled: 是否启用符号链接安全检查
func (h *StaticHandler) SetSymlinkCheck(enabled bool) {
h.mu.Lock()
defer h.mu.Unlock()
h.symlinkCheck = enabled
}
@ -260,6 +272,8 @@ func (h *StaticHandler) SetSymlinkCheck(enabled bool) {
// 参数:
// - enabled: 是否启用内部访问限制
func (h *StaticHandler) SetInternal(enabled bool) {
h.mu.Lock()
defer h.mu.Unlock()
h.internal = enabled
}
@ -271,6 +285,8 @@ func (h *StaticHandler) SetInternal(enabled bool) {
// 参数:
// - expires: 过期时间字符串
func (h *StaticHandler) SetExpires(expires string) {
h.mu.Lock()
defer h.mu.Unlock()
h.expires = expires
}
@ -284,6 +300,8 @@ func (h *StaticHandler) SetExpires(expires string) {
// - localtime: 使用本地时间
// - exactSize: 显示精确大小
func (h *StaticHandler) SetAutoIndex(enabled bool, format string, localtime, exactSize bool) {
h.mu.Lock()
defer h.mu.Unlock()
h.autoIndex = enabled
h.autoIndexFormat = format
h.autoIndexLocaltime = localtime
@ -304,6 +322,8 @@ func (h *StaticHandler) SetAutoIndex(enabled bool, format string, localtime, exa
//
// 默认 TTL 为 5 秒。
func (h *StaticHandler) SetCacheTTL(ttl time.Duration) {
h.mu.Lock()
defer h.mu.Unlock()
h.cacheTTL = ttl
if h.fileInfoCache != nil {
h.fileInfoCache.SetTTL(ttl)
@ -328,6 +348,9 @@ func (h *StaticHandler) SetCacheTTL(ttl time.Duration) {
// 7. 大文件使用零拷贝传输
// 8. 读取文件并存入缓存
func (h *StaticHandler) Handle(ctx *fasthttp.RequestCtx) {
h.mu.RLock()
defer h.mu.RUnlock()
reqPath := string(ctx.Path())
// 检查 internal 限制

View File

@ -308,7 +308,12 @@ func (p *connectionPool) remove(key string, conn net.Conn) {
conns := p.conns[key]
for i, c := range conns {
if c == conn {
p.conns[key] = append(conns[:i], conns[i+1:]...)
conns = append(conns[:i], conns[i+1:]...)
if len(conns) == 0 {
delete(p.conns, key)
} else {
p.conns[key] = conns
}
break
}
}

View File

@ -94,6 +94,13 @@ func (m *SlowStartManager) Start() {
return // 已经在运行
}
// 重建 stopCh 以支持 Start-Stop-Start 周期
select {
case <-m.stopCh:
m.stopCh = make(chan struct{})
default:
}
go m.updateLoop()
}
@ -102,7 +109,12 @@ func (m *SlowStartManager) Stop() {
if !m.running.Swap(false) {
return
}
close(m.stopCh)
select {
case <-m.stopCh:
// 已经关闭
default:
close(m.stopCh)
}
}
// updateLoop 后台更新循环。

View File

@ -94,12 +94,15 @@ func New(cfg *config.LoggingConfig) *Logger {
}
accessWriter := getOutput(cfg.Access.Path)
errorWriter := getOutput(cfg.Error.Path)
logger := &Logger{
accessFormat: cfg.Access.Format,
accessWriter: accessWriter,
accessFile: writerFile(accessWriter),
errorFile: writerFile(errorWriter),
accessLog: zerolog.New(accessWriter).With().Timestamp().Logger(),
errorLog: zerolog.New(getOutput(cfg.Error.Path)).Level(parseLevel(cfg.Error.Level)).With().Timestamp().Logger(),
errorLog: zerolog.New(errorWriter).Level(parseLevel(cfg.Error.Level)).With().Timestamp().Logger(),
}
return logger
@ -133,6 +136,14 @@ func getOutput(path string) io.Writer {
return f
}
// writerFile 从 io.Writer 中提取底层的 *os.File如果不是或为标准流则返回 nil。
func writerFile(w io.Writer) *os.File {
if f, ok := w.(*os.File); ok && f != os.Stdout && f != os.Stderr {
return f
}
return nil
}
// LogAccess 记录访问日志(全局实例)。
//
// 使用全局 log 实例记录 HTTP 请求的基本信息,包括方法、路径、状态码、
@ -142,9 +153,15 @@ func getOutput(path string) io.Writer {
func (l *Logger) LogAccess(ctx *fasthttp.RequestCtx, status int, size int64, duration time.Duration) {
// JSON 格式或空格式:输出结构化 JSON
if l.accessFormat == formatJSON || l.accessFormat == "" {
method := ctx.Method()
path := ctx.Path()
req := make([]byte, 0, len(method)+1+len(path))
req = append(req, method...)
req = append(req, ' ')
req = append(req, path...)
l.accessLog.Info().
Str("remote_addr", netutil.FormatRemoteAddr(ctx)).
Bytes("request", append(append(ctx.Method(), ' '), ctx.Path()...)).
Bytes("request", req).
Int("status", status).
Int64("body_bytes_sent", size).
Dur("request_time", duration).

View File

@ -248,7 +248,7 @@ func (api *ngxLogAPI) luaLog(L *glua.LState) int {
if api.logger != nil {
switch level {
case LogEmerg, LogAlert, LogCrit:
api.logger.Fatal().Msg(msg)
api.logger.Error().Str("lua_level", "critical").Msg(msg)
case LogErr:
api.logger.Error().Msg(msg)
case LogWarn:

View File

@ -194,6 +194,13 @@ func (s *TCPSocket) Connect(host string, port int) error {
// - *SocketOperation: 连接操作实例
// - error: 连接失败时返回错误
func (s *TCPSocket) ConnectAsync(_ *glua.LState, host string, port int) (*SocketOperation, error) {
s.mu.Lock()
if s.currentOp != nil {
s.mu.Unlock()
return nil, fmt.Errorf("socket already has an active operation")
}
s.mu.Unlock()
err := s.Connect(host, port)
if err != nil {
return nil, err

View File

@ -309,9 +309,10 @@ func (m *TimerManager) Cancel(handle *TimerHandle) bool {
return false // 定时器不存在或已执行
}
// 停止定时器
// 停止定时器;如果成功停止,回调不会执行
stopped := true
if entry.timer != nil {
entry.timer.Stop()
stopped = entry.timer.Stop()
}
// 发送取消信号
@ -319,7 +320,11 @@ func (m *TimerManager) Cancel(handle *TimerHandle) bool {
// 清理
delete(m.timers, entry.id)
m.active.Add(-1)
// 如果成功停止定时器(回调不会执行),递减 active
if stopped {
m.active.Add(-1)
}
return true
}

View File

@ -47,7 +47,7 @@ type compressorPool struct {
// newGzipPool 创建 gzip writer 池。
func newGzipPool(level int) *compressorPool {
return &compressorPool{
p := &compressorPool{
level: level,
factory: func(level int) resettableWriteCloser {
w, err := gzip.NewWriterLevel(nil, level)
@ -57,11 +57,13 @@ func newGzipPool(level int) *compressorPool {
return w
},
}
p.pool.New = func() any { return p.factory(p.level) }
return p
}
// newBrotliPool 创建 brotli writer 池。
func newBrotliPool(level int) *compressorPool {
return &compressorPool{
p := &compressorPool{
level: level,
factory: func(level int) resettableWriteCloser {
return brotli.NewWriterOptions(nil, brotli.WriterOptions{
@ -69,15 +71,12 @@ func newBrotliPool(level int) *compressorPool {
})
},
}
p.pool.New = func() any { return p.factory(p.level) }
return p
}
// Get 从池中获取 writer。
func (p *compressorPool) Get() (resettableWriteCloser, bool) {
if p.pool.New == nil {
p.pool.New = func() any {
return p.factory(p.level)
}
}
v, ok := p.pool.Get().(resettableWriteCloser)
return v, ok
}

View File

@ -211,6 +211,9 @@ func formatHSTSValue(maxAge int, includeSubDomains bool, preload bool) string {
// 参数:
// - cfg: 新的安全头配置
func (sh *HeadersMiddleware) UpdateConfig(cfg *config.SecurityHeaders) {
if cfg == nil {
return
}
sh.mu.Lock()
sh.config = cfg
sh.formatHSTS()

View File

@ -70,6 +70,7 @@ type RateLimiter struct {
cleanupTicker *time.Ticker
stopCleanupCh chan struct{}
cleanupDone chan struct{}
stopOnce sync.Once
rate float64
burst float64
}
@ -494,18 +495,9 @@ func (rl *RateLimiter) startCleanup(interval time.Duration) {
// 发送停止信号并等待 goroutine 完成,确保资源正确释放。
// 该方法应在限流器不再使用时调用(如服务器关闭时)。
func (rl *RateLimiter) StopCleanup() {
// 使用原子操作或简单的标志检查来避免竞争
// 关闭 stopCleanupCh 会广播给所有等待的 goroutine
select {
case <-rl.stopCleanupCh:
// 已经关闭
return
default:
}
if rl.cleanupTicker != nil {
rl.cleanupTicker.Stop()
close(rl.stopCleanupCh)
rl.stopOnce.Do(func() { close(rl.stopCleanupCh) })
<-rl.cleanupDone
rl.cleanupTicker = nil // 防止重复关闭
}

View File

@ -87,6 +87,9 @@ type windowCounter struct {
// 返回值:
// - *SlidingWindowLimiter: 创建的限流器实例
func NewSlidingWindowLimiter(window time.Duration, limit int, precise bool) *SlidingWindowLimiter {
if window <= 0 {
window = time.Second
}
s := &SlidingWindowLimiter{
window: window,
limit: limit,

View File

@ -146,16 +146,17 @@ func DetectContentType(filePath string) string {
}
}
// 插入新条目
entry := &mimeCacheEntry{ext: ext, mimeType: mimeType}
entry.element = mimeLRU.PushFront(entry)
mimeCache[ext] = entry
// 未知扩展名回退到默认值
if mimeType == "" {
defaultMutex.RLock()
mimeType = defaultMIME
defaultMutex.RUnlock()
}
// 插入新条目
entry := &mimeCacheEntry{ext: ext, mimeType: mimeType}
entry.element = mimeLRU.PushFront(entry)
mimeCache[ext] = entry
return mimeType
}

View File

@ -78,6 +78,9 @@ type HealthChecker struct {
//
// 返回的 HealthChecker 尚未启动;调用 Start() 开始健康检查。
func NewHealthChecker(targets []*loadbalance.Target, cfg *config.HealthCheckConfig) *HealthChecker {
if cfg == nil {
cfg = &config.HealthCheckConfig{}
}
interval := cfg.Interval
if interval <= 0 {
interval = 10 * time.Second

View File

@ -721,6 +721,7 @@ func (p *Proxy) ServeHTTP(ctx *fasthttp.RequestCtx) {
if bytes.ContainsAny(path, "@\r\n") {
logging.Warn().Msgf("rejected suspicious proxy path containing dangerous chars: %s", path)
upstreamStatus = 502
loadbalance.DecrementConnections(target)
utils.SendErrorWithDetail(ctx, utils.ErrBadGateway, "invalid proxy path")
return
}

View File

@ -752,7 +752,7 @@ func TestWebSocket_UpgradeRejected(t *testing.T) {
// 发送一个请求让服务端触发
_, _ = conn.Write([]byte("GET /ws HTTP/1.1\r\nHost: localhost\r\n\r\n"))
resp, err := readWebSocketUpgradeResponse(conn, 1*time.Second)
resp, _, err := readWebSocketUpgradeResponse(conn, 1*time.Second)
require.NoError(t, err)
assert.Equal(t, 400, resp.StatusCode)
}
@ -1274,7 +1274,7 @@ func TestReadWebSocketUpgradeResponse_ReadError(t *testing.T) {
conn1, conn2 := net.Pipe()
_ = conn2.Close()
_, err := readWebSocketUpgradeResponse(conn1, 100*time.Millisecond)
_, _, err := readWebSocketUpgradeResponse(conn1, 100*time.Millisecond)
assert.Error(t, err)
_ = conn1.Close()
}

View File

@ -332,20 +332,20 @@ func buildWebSocketUpgradeRequest(ctx *fasthttp.RequestCtx, targetHost string, h
// 返回值:
// - *http.Response: HTTP 响应对象
// - error: 读取失败时返回错误
func readWebSocketUpgradeResponse(conn net.Conn, timeout time.Duration) (*http.Response, error) {
func readWebSocketUpgradeResponse(conn net.Conn, timeout time.Duration) (*http.Response, *bufio.Reader, error) {
// 设置读取超时
if err := conn.SetReadDeadline(time.Now().Add(timeout)); err != nil {
return nil, err
return nil, nil, err
}
// 使用 bufio.Reader 读取 HTTP 响应
reader := bufio.NewReader(conn)
resp, err := http.ReadResponse(reader, nil)
if err != nil {
return nil, fmt.Errorf("failed to read upgrade response: %w", err)
return nil, nil, fmt.Errorf("failed to read upgrade response: %w", err)
}
return resp, nil
return resp, reader, nil
}
// WebSocket 处理 WebSocket 代理请求。
@ -400,7 +400,7 @@ func WebSocket(ctx *fasthttp.RequestCtx, target *loadbalance.Target, timeout tim
}
// 步骤4: 读取升级响应
resp, err := readWebSocketUpgradeResponse(targetConn, timeout)
resp, bufferedReader, err := readWebSocketUpgradeResponse(targetConn, timeout)
if err != nil {
return fmt.Errorf("failed to read upgrade response: %w", err)
}
@ -422,6 +422,16 @@ func WebSocket(ctx *fasthttp.RequestCtx, target *loadbalance.Target, timeout tim
// 注意: WebSocket 升级成功后resp.Body 不需要显式关闭
// 因为底层连接已被 bridge 用于双向数据传输
// 如果 bufferedReader 已经缓冲了 WebSocket frame 数据,
// 需要包装连接使后续读取先消耗缓冲区
if bufferedReader != nil && bufferedReader.Buffered() > 0 {
targetConn = &bufferedConn{
Conn: targetConn,
reader: bufferedReader,
}
bridge.targetConn = targetConn
}
// 步骤7: 启动桥接(阻塞直到连接关闭)
return bridge.Bridge()
}
@ -469,3 +479,19 @@ func writeUpgradeResponse(conn net.Conn, resp *http.Response) error {
return nil
}
// bufferedConn 包装 net.Conn优先从 bufio.Reader 的缓冲区读取数据。
//
// 用于 WebSocket 升级响应后,消耗 bufio.Reader 可能已缓冲的 frame 数据。
type bufferedConn struct {
net.Conn
reader *bufio.Reader
}
// Read 优先从内部 bufio.Reader 读取,若缓冲区为空则回退到原始连接。
func (bc *bufferedConn) Read(p []byte) (int, error) {
if bc.reader.Buffered() > 0 {
return bc.reader.Read(p)
}
return bc.Conn.Read(p)
}

View File

@ -557,7 +557,7 @@ func TestReadWebSocketUpgradeResponse(t *testing.T) {
}()
// 读取响应
resp, err := readWebSocketUpgradeResponse(conn1, 1*time.Second)
resp, _, err := readWebSocketUpgradeResponse(conn1, 1*time.Second)
if err != nil {
t.Fatalf("readWebSocketUpgradeResponse failed: %v", err)
}
@ -579,7 +579,7 @@ func TestReadWebSocketUpgradeResponse_Timeout(t *testing.T) {
defer func() { _ = conn2.Close() }()
// 使用很短的超时
_, err := readWebSocketUpgradeResponse(conn1, 10*time.Millisecond)
_, _, err := readWebSocketUpgradeResponse(conn1, 10*time.Millisecond)
if err == nil {
t.Error("Expected timeout error, got nil")
}

View File

@ -131,7 +131,7 @@ type DNSCacheEntry struct {
// 返回值:
// - Resolver: DNS 解析器接口实现,禁用时返回 noopResolver
func New(cfg *config.ResolverConfig) Resolver {
if !cfg.Enabled {
if cfg == nil || !cfg.Enabled {
return &noopResolver{}
}
@ -357,6 +357,13 @@ func (r *DNSResolver) Start() error {
r.started.Store(true)
// 重建 stopCh 以支持 Start-Stop-Start 周期
select {
case <-r.stopCh:
r.stopCh = make(chan struct{})
default:
}
// 启动后台刷新协程
go r.refreshLoop()

View File

@ -221,11 +221,13 @@ func (s *Server) GracefulStop(timeout time.Duration) error {
// getProxyCacheStats 收集所有代理缓存的统计信息。
func (s *Server) getProxyCacheStats() ProxyCacheStats {
var total ProxyCacheStats
s.proxiesMu.RLock()
for _, p := range s.proxies {
if stats := p.GetCacheStats(); stats != nil {
total.Entries += stats.Entries
total.Pending += stats.Pending
}
}
s.proxiesMu.RUnlock()
return total
}

View File

@ -179,9 +179,14 @@ func (p *GoroutinePool) Submit(ctx *fasthttp.RequestCtx, task Task) error {
// 队列满,需要启动新 worker 或直接执行
if atomic.LoadInt32(&p.workers) < p.maxWorkers {
p.startWorker()
// 重新尝试入队
p.taskQueue <- task
return nil
// 非阻塞尝试入队,避免新 worker 尚未就绪时死锁
select {
case p.taskQueue <- task:
return nil
default:
task(ctx)
return nil
}
}
// 达到最大 worker直接执行fallback

View File

@ -124,12 +124,14 @@ func (h *PurgeHandler) purgeByPath(path string, method string) int {
hashKey := cache.HashPathWithMethod(path, method)
deleted := 0
h.server.proxiesMu.RLock()
for _, p := range h.server.proxies {
if pcache := p.GetCache(); pcache != nil {
_ = pcache.Delete(hashKey)
deleted++
}
}
h.server.proxiesMu.RUnlock()
return deleted
}
@ -142,11 +144,13 @@ func (h *PurgeHandler) purgeByPattern(pattern string, method string) int {
deleted := 0
h.server.proxiesMu.RLock()
for _, p := range h.server.proxies {
if pcache := p.GetCache(); pcache != nil {
deleted += pcache.DeleteByPatternWithMethod(pattern, method)
}
}
h.server.proxiesMu.RUnlock()
return deleted
}

View File

@ -76,7 +76,7 @@ type Server struct {
fastServer *fasthttp.Server
fastServers []*fasthttp.Server // 多监听器模式使用
proxies []*proxy.Proxy
proxiesMu sync.Mutex
proxiesMu sync.RWMutex
listeners []net.Listener
healthCheckers []*proxy.HealthChecker
locationEngine *matcher.LocationEngine
@ -268,6 +268,9 @@ func (s *Server) GetHandler() fasthttp.RequestHandler {
// - 调用前需确保配置已正确加载
// - Goroutine池和文件缓存根据配置自动启用
func (s *Server) Start() error {
if s.config == nil {
return fmt.Errorf("server config is nil")
}
logging.Init(s.config.Logging.Error.Level, s.config.Logging.Format)
// 记录启动时间

View File

@ -397,27 +397,32 @@ func (m *OCSPManager) singleOCSPAttempt(url string, req []byte) ([]byte, error)
func (m *OCSPManager) GetOCSPResponse(serial string) []byte {
m.mu.RLock()
resp, ok := m.responses[serial]
m.mu.RUnlock()
if !ok || resp == nil {
m.mu.RUnlock()
return nil
}
status := resp.status
nextUpdate := resp.nextUpdate
response := resp.response
m.mu.RUnlock()
// 检查响应是否仍可用
switch resp.status {
switch status {
case statusValid:
// 检查过期
if time.Now().After(resp.nextUpdate) {
if time.Now().After(nextUpdate) {
// 标记为过期但仍返回(优雅降级)
m.mu.Lock()
resp.status = statusStale
if r, exists := m.responses[serial]; exists && r == resp {
r.status = statusStale
}
m.mu.Unlock()
}
return resp.response
return response
case statusStale:
// 返回过期响应用于优雅降级
// 这允许即使 OCSP 刷新失败也能继续 TLS 握手
return resp.response
return response
case statusFailed:
// 无响应可用
return nil

View File

@ -117,15 +117,21 @@ func NewSessionTicketManager(cfg config.SessionTicketsConfig) (*SessionTicketMan
// 必须在调用 GetKeys 之前启动。
func (m *SessionTicketManager) Start() {
m.mu.Lock()
defer m.mu.Unlock()
if m.started {
m.mu.Unlock()
return
}
m.started = true
m.mu.Unlock()
// 重建 stopCh 以支持 Start-Stop-Start 周期
select {
case <-m.stopCh:
m.stopCh = make(chan struct{})
default:
}
// 启动轮换定时器
m.scheduleRotation()
m.scheduleRotationLocked()
}
// Stop 停止密钥轮换定时器。
@ -138,12 +144,17 @@ func (m *SessionTicketManager) Stop() {
return
}
m.started = false
m.mu.Unlock()
close(m.stopCh)
if m.rotateTimer != nil {
m.rotateTimer.Stop()
m.rotateTimer = nil
}
m.mu.Unlock()
select {
case <-m.stopCh:
// 已经关闭
default:
close(m.stopCh)
}
}
@ -230,6 +241,12 @@ func (m *SessionTicketManager) ApplyToTLSConfig(tlsCfg *tls.Config) {
//
// 使用定时器在指定间隔后执行密钥轮换。
func (m *SessionTicketManager) scheduleRotation() {
m.mu.Lock()
defer m.mu.Unlock()
m.scheduleRotationLocked()
}
func (m *SessionTicketManager) scheduleRotationLocked() {
if !m.started {
return
}

View File

@ -74,6 +74,9 @@ type TLSManager struct {
// certificates 解析后的证书映射,用于 OCSP
certificates map[string]*x509.Certificate
// defaultCert 默认证书的解析结果,避免每次握手重新解析
defaultCert *x509.Certificate
// issuers 颁发者证书映射,用于 OCSP
issuers map[string]*x509.Certificate
@ -163,27 +166,30 @@ func NewTLSManager(cfg *config.SSLConfig) (*TLSManager, error) {
// 解析证书用于 OCSP
if len(cert.Certificate) > 0 {
parsedCert, err := x509.ParseCertificate(cert.Certificate[0])
if err == nil && len(parsedCert.OCSPServer) > 0 {
// 存储证书用于 OCSP 查询
serial := parsedCert.SerialNumber.String()
manager.certificates[serial] = parsedCert
if err == nil {
manager.defaultCert = parsedCert
if len(parsedCert.OCSPServer) > 0 {
// 存储证书用于 OCSP 查询
serial := parsedCert.SerialNumber.String()
manager.certificates[serial] = parsedCert
// 尝试从证书链解析颁发者证书
if len(cert.Certificate) > 1 {
issuerCert, err := x509.ParseCertificate(cert.Certificate[1])
if err == nil {
manager.issuers[serial] = issuerCert
if err := ocspMgr.RegisterCertificate(parsedCert, issuerCert); err != nil {
logging.Warn().Err(err).Msg("OCSP Stapling 注册失败")
// 尝试从证书链解析颁发者证书
if len(cert.Certificate) > 1 {
issuerCert, err := x509.ParseCertificate(cert.Certificate[1])
if err == nil {
manager.issuers[serial] = issuerCert
if err := ocspMgr.RegisterCertificate(parsedCert, issuerCert); err != nil {
logging.Warn().Err(err).Msg("OCSP Stapling 注册失败")
}
}
}
}
// 设置 GetConfigForClient 回调用于 OCSP Stapling
tlsCfg.GetConfigForClient = manager.getConfigForClientWithOCSP
}
}
// 设置 GetConfigForClient 回调用于 OCSP Stapling
tlsCfg.GetConfigForClient = manager.getConfigForClientWithOCSP
ocspMgr.Start()
}
@ -262,10 +268,12 @@ func (m *TLSManager) getConfigForClientWithOCSP(hello *tls.ClientHelloInfo) (*tl
// 将 OCSP 响应附加到证书
cert := &cfgCopy.Certificates[0]
if len(cert.Certificate) > 0 {
// 解析叶子证书以获取序列号
leafCert, err := x509.ParseCertificate(cert.Certificate[0])
if err == nil {
serial := leafCert.SerialNumber.String()
// 使用已缓存的证书解析结果获取序列号
m.mu.RLock()
parsedCert := m.defaultCert
m.mu.RUnlock()
if parsedCert != nil {
serial := parsedCert.SerialNumber.String()
ocspResp := m.ocspManager.GetOCSPResponse(serial)
if ocspResp != nil {
// 将 OCSP 响应附加到证书

View File

@ -982,7 +982,13 @@ func (s *udpServer) serve() {
continue
}
}
continue
// 非超时错误(如连接关闭),检查 stopCh 后退出
select {
case <-s.stopCh:
return
default:
continue
}
}
// 获取或创建会话

View File

@ -255,7 +255,7 @@ func init() {
Name: VarTimeLocal,
Description: "本地时间格式02/Jan/2024:15:04:05 +0800",
Getter: func(_ *fasthttp.RequestCtx) string {
return time.Now().Format("02/Jan/2006:15:04:05 +0800")
return time.Now().Format("02/Jan/2006:15:04:05 -0700")
},
})
@ -273,13 +273,15 @@ func init() {
Name: VarRequestID,
Description: "唯一请求标识符",
Getter: func(ctx *fasthttp.RequestCtx) string {
// 先从 UserValue 获取,如果没有则生成
// 先从 UserValue 获取,如果没有则生成并缓存
if v := ctx.UserValue(VarRequestID); v != nil {
if s, ok := v.(string); ok {
return s
}
}
return uuid.New().String()
id := uuid.New().String()
ctx.SetUserValue(VarRequestID, id)
return id
},
})
}

View File

@ -117,7 +117,12 @@ func NewContext(ctx *fasthttp.RequestCtx) *Context {
vc, ok := pool.Get().(*Context)
if !ok {
// 池中类型不正确时返回新 Context
return &Context{ctx: ctx}
return &Context{
ctx: ctx,
store: make(map[string]string),
cache: make(map[string]string),
bytesCache: make(map[string][]byte),
}
}
vc.ctx = ctx
vc.status = 0