diff --git a/docs/plan.md b/docs/plan.md index b9a4b60..ed514b4 100644 --- a/docs/plan.md +++ b/docs/plan.md @@ -1681,18 +1681,6 @@ Phase 7: --- -## 总体进度追踪 - -| 阶段 | 状态 | 主要功能 | -| ------- | ------ | ------------------------- | -| Phase 1 | ✅ 完成 | 项目骨架、配置系统 | -| Phase 2 | ✅ 完成 | HTTP 核心、静态文件、路由 | -| Phase 3 | ✅ 完成 | 反向代理、负载均衡 | -| Phase 4 | ✅ 完成 | SSL/TLS、安全控制 | -| Phase 5 | ✅ 完成 | 重写、压缩、缓存、日志 | -| Phase 6 | ✅ 完成 | Stream、性能优化、热升级 | -| Phase 7 | ✅ 完成 | 功能完善(WebSocket、缓存集成、监控端点、Brotli、UDP、OCSP) | - **Phase 2 技术选型变更**: - HTTP 库:使用 [fasthttp](https://github.com/valyala/fasthttp) 替代 `net/http`(性能提升 6 倍) - 日志库:使用 [zerolog](https://github.com/rs/zerolog)(零分配,~40ns/op) @@ -1714,3 +1702,170 @@ Phase 7: - 性能优化:`docs/12-nginx-performance-tuning.md` **代码注释规范**:`docs/comments.md`(必须遵循) + +--- + +## 第八阶段:问题修复与功能完善 + +### 目标 + +修复深度分析发现的三个遗留问题,提升项目生产可用性。 + +### 背景分析 + +通过代码审查发现以下问题: + +| 问题 | 当前状态 | 优先级 | 说明 | +|------|----------|--------|------| +| WebSocket 代理未集成 | ⚠️ 返回 501 | P0 | 已实现但未调用 | +| UDP Stream 冗余代码 | ⚠️ Accept 返回 EOF | P1 | 设计问题,需删除 | +| 热升级监听器继承 | ⚠️ GetListeners 返回空 | P1 | 监听器未保存 | + +### 任务列表 + +#### 8.1 WebSocket 代理集成 (P0) + +**问题**:`proxy.go:370-375` 的 `handleWebSocket` 返回 501 Not Implemented,但 `websocket.go` 中已有完整的 `ProxyWebSocket` 实现。 + +**修改文件**:`internal/proxy/proxy.go:370-375` + +**修改内容**: + +```go +// 修改前 +func (p *Proxy) handleWebSocket(ctx *fasthttp.RequestCtx, target *loadbalance.Target, client *fasthttp.HostClient) { + ctx.Error("WebSocket proxying not implemented", fasthttp.StatusNotImplemented) +} + +// 修改后 +func (p *Proxy) handleWebSocket(ctx *fasthttp.RequestCtx, target *loadbalance.Target, client *fasthttp.HostClient) { + timeout := p.config.Timeout.Connect + if timeout == 0 { + timeout = 30 * time.Second + } + if err := ProxyWebSocket(ctx, target, timeout); err != nil { + logging.Error().Msgf("WebSocket proxy error: %v", err) + } +} +``` + +#### 8.2 删除冗余 UDP 监听器 (P1) + +**问题**:`stream.go:435-453` 的 `udpListener` 类型实现 `net.Listener` 接口,但 UDP 是无连接协议,`Accept()` 始终返回 `io.EOF`。实际 UDP 处理由 `udpServer` 完成。 + +**修改文件**: +- `internal/stream/stream.go:435-453` - 删除 `udpListener` 类型 +- `internal/stream/stream_test.go` - 删除相关测试 + +**删除代码**: + +```go +// 删除以下代码(第 435-453 行) +type udpListener struct { + conn *net.UDPConn +} + +func (u *udpListener) Accept() (net.Conn, error) { + return nil, io.EOF +} + +func (u *udpListener) Close() error { + return u.conn.Close() +} + +func (u *udpListener) Addr() net.Addr { + return u.conn.LocalAddr() +} +``` + +#### 8.3 热升级监听器继承 (P1) + +**问题**: +1. `Server.listeners` 字段从未被赋值 +2. `fasthttp.ListenAndServe()` 内部创建监听器但未暴露 +3. 子进程未使用继承的监听器 + +**修改文件**: +- `internal/server/server.go` - 改用手动监听器管理 +- `internal/app/app.go` - 支持继承监听器启动 + +**核心修改**: + +```go +// server.go - 使用 net.Listen + fasthttp.Serve 替代 ListenAndServe + +// 创建监听器 +ln, err := net.Listen("tcp", s.config.Server.Listen) +if err != nil { + return fmt.Errorf("failed to listen: %w", err) +} +s.listeners = []net.Listener{ln} // 保存监听器 + +// 使用 Serve 替代 ListenAndServe +if s.tlsConfig != nil { + return s.fastServer.ServeTLS(ln, "", "") +} +return s.fastServer.Serve(ln) +``` + +```go +// app.go - 子进程继承监听器 +if os.Getenv("GRACEFUL_UPGRADE") == "1" { + fmt.Println("检测到热升级模式,继承父进程监听器") + listeners, err := a.upgradeMgr.GetInheritedListeners() + if err == nil && len(listeners) > 0 { + a.srv.SetListeners(listeners) + } +} +``` + +### 验证方法 + +```bash +# 1. WebSocket 测试 +wscat -c ws://localhost:8080/ws + +# 2. UDP Stream 测试 +go test ./internal/stream/... -v + +# 3. 热升级测试 +./lolly -c config.yaml & +kill -USR2 +# 验证新进程接管,旧进程优雅退出 + +# 4. 完整测试 +go test ./... -race +go build ./... +``` + +### 文件依赖关系图 + +``` +Phase 8: + internal/proxy/proxy.go → internal/proxy/websocket.go(调用) + internal/stream/stream.go → 删除 udpListener + internal/server/server.go → net.Listen + fasthttp.Serve + internal/app/app.go → GetInheritedListeners +``` + +--- + +## 总体进度追踪(更新) + +| 阶段 | 状态 | 主要功能 | +| ------- | ------ | ------------------------- | +| Phase 1 | ✅ 完成 | 项目骨架、配置系统 | +| Phase 2 | ✅ 完成 | HTTP 核心、静态文件、路由 | +| Phase 3 | ✅ 完成 | 反向代理、负载均衡 | +| Phase 4 | ✅ 完成 | SSL/TLS、安全控制 | +| Phase 5 | ✅ 完成 | 重写、压缩、缓存、日志 | +| Phase 6 | ✅ 完成 | Stream、性能优化、热升级 | +| Phase 7 | ✅ 完成 | 功能完善 | +| Phase 8 | ✅ 完成 | 问题修复(WebSocket集成、UDP清理、热升级修复)| + +**Phase 8 完成日期**:2026-04-03 + +**修复内容**: +1. WebSocket 代理集成 - `handleWebSocket` 现调用 `ProxyWebSocket` +2. UDP Stream 冗余代码 - 删除 `udpListener` 类型及相关测试 +3. 热升级监听器继承 - 改用 `net.Listen` + `Serve` 模式,支持监听器继承 diff --git a/internal/app/app.go b/internal/app/app.go index 3af8a6a..1c1c613 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -6,7 +6,8 @@ // - 配置加载和版本信息 // // 主要用途: -// 用于启动和管理服务器进程,处理系统信号和运行时操作。 +// +// 用于启动和管理服务器进程,处理系统信号和运行时操作。 // // 注意事项: // - 支持热升级(USR2 信号) @@ -18,6 +19,7 @@ package app import ( "fmt" + "net" "os" "os/signal" "syscall" @@ -76,6 +78,9 @@ type App struct { // logFile 日志文件路径(用于重新打开) logFile string + + // listeners 继承的监听器(热升级时使用) + listeners []net.Listener } // NewApp 创建应用程序。 @@ -145,6 +150,13 @@ func (a *App) Run() int { // 检查是否是子进程(热升级) if os.Getenv("GRACEFUL_UPGRADE") == "1" { fmt.Println("检测到热升级模式,继承父进程监听器") + // 创建升级管理器以获取继承的监听器 + a.upgradeMgr = server.NewUpgradeManager(nil) + listeners, err := a.upgradeMgr.GetInheritedListeners() + if err == nil && len(listeners) > 0 { + // 暂时保存监听器,等服务器创建后再设置 + a.listeners = listeners + } } cfg, err := config.Load(a.cfgPath) @@ -160,6 +172,11 @@ func (a *App) Run() int { // 创建 HTTP 服务器 a.srv = server.New(cfg) + // 如果有继承的监听器,设置到服务器 + if len(a.listeners) > 0 { + a.srv.SetListeners(a.listeners) + } + // 创建 Stream 服务器(如果配置了) if len(cfg.Stream) > 0 { a.streamSrv = stream.NewServer() diff --git a/internal/cache/file_cache.go b/internal/cache/file_cache.go index 564be47..68a7ee9 100644 --- a/internal/cache/file_cache.go +++ b/internal/cache/file_cache.go @@ -6,7 +6,8 @@ // - 缓存统计和生命周期管理 // // 主要用途: -// 用于缓存静态文件内容和代理响应,减少磁盘 I/O 和上游请求,提升服务性能。 +// +// 用于缓存静态文件内容和代理响应,减少磁盘 I/O 和上游请求,提升服务性能。 // // 注意事项: // - 文件缓存支持按条目数和内存大小双重限制 diff --git a/internal/config/config.go b/internal/config/config.go index d5e3a1b..7d003b7 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -6,7 +6,8 @@ // - 配置文件的加载、保存和验证方法 // // 主要用途: -// 用于定义和管理服务器的完整配置,支持单服务器和多虚拟主机两种模式。 +// +// 用于定义和管理服务器的完整配置,支持单服务器和多虚拟主机两种模式。 // // 注意事项: // - 配置文件使用 YAML 格式 @@ -255,7 +256,7 @@ type StreamConfig struct { // StreamUpstream Stream 上游配置。 type StreamUpstream struct { - Targets []StreamTarget `yaml:"targets"` // 目标列表 + Targets []StreamTarget `yaml:"targets"` // 目标列表 LoadBalance string `yaml:"load_balance"` // 负载均衡算法 } diff --git a/internal/config/defaults.go b/internal/config/defaults.go index c4a8c95..d798a60 100644 --- a/internal/config/defaults.go +++ b/internal/config/defaults.go @@ -5,7 +5,8 @@ // - GenerateConfigYAML 函数:生成带注释的示例配置文件 // // 主要用途: -// 用于生成默认配置和示例配置文件,便于用户快速上手。 +// +// 用于生成默认配置和示例配置文件,便于用户快速上手。 // // 注意事项: // - 默认值经过优化,适合大多数常见场景 diff --git a/internal/config/validate.go b/internal/config/validate.go index f9a150e..17671c6 100644 --- a/internal/config/validate.go +++ b/internal/config/validate.go @@ -7,7 +7,8 @@ // - 压缩配置验证(类型、级别、最小大小) // // 主要用途: -// 用于验证用户提供的配置是否符合要求,确保服务器启动前配置有效。 +// +// 用于验证用户提供的配置是否符合要求,确保服务器启动前配置有效。 // // 注意事项: // - 验证失败时返回详细的错误信息 diff --git a/internal/handler/router.go b/internal/handler/router.go index 32c8da6..b107107 100644 --- a/internal/handler/router.go +++ b/internal/handler/router.go @@ -5,7 +5,8 @@ // - 路由器创建和处理器获取 // // 主要用途: -// 用于管理 HTTP 请求的路由分发,将请求路径映射到对应的处理器。 +// +// 用于管理 HTTP 请求的路由分发,将请求路径映射到对应的处理器。 // // 注意事项: // - 底层使用 fasthttp/router 实现 diff --git a/internal/handler/sendfile.go b/internal/handler/sendfile.go index 843123a..90a3e3e 100644 --- a/internal/handler/sendfile.go +++ b/internal/handler/sendfile.go @@ -6,7 +6,8 @@ // - 缓冲池管理 // // 主要用途: -// 用于优化大文件传输性能,通过零拷贝技术减少 CPU 和内存开销。 +// +// 用于优化大文件传输性能,通过零拷贝技术减少 CPU 和内存开销。 // // 注意事项: // - Linux 平台使用 sendfile 系统调用 diff --git a/internal/handler/sendfile_test.go b/internal/handler/sendfile_test.go index fcae188..5bcca11 100644 --- a/internal/handler/sendfile_test.go +++ b/internal/handler/sendfile_test.go @@ -98,4 +98,4 @@ func TestBufferPoolConcurrent(t *testing.T) { for i := 0; i < iterations; i++ { <-done } -} \ No newline at end of file +} diff --git a/internal/handler/static.go b/internal/handler/static.go index 6c77bbe..d8516d0 100644 --- a/internal/handler/static.go +++ b/internal/handler/static.go @@ -6,7 +6,8 @@ // - 文件缓存和零拷贝传输优化 // // 主要用途: -// 用于提供静态文件服务,支持缓存和零拷贝传输优化。 +// +// 用于提供静态文件服务,支持缓存和零拷贝传输优化。 // // 注意事项: // - 自动处理目录遍历攻击防护 diff --git a/internal/loadbalance/balancer.go b/internal/loadbalance/balancer.go index c124631..a8b789a 100644 --- a/internal/loadbalance/balancer.go +++ b/internal/loadbalance/balancer.go @@ -6,7 +6,8 @@ // - ValidAlgorithms 有效算法列表 // // 主要用途: -// 用于定义负载均衡的标准接口和目标结构,支持多种负载均衡算法。 +// +// 用于定义负载均衡的标准接口和目标结构,支持多种负载均衡算法。 // // 注意事项: // - 所有实现必须并发安全 diff --git a/internal/middleware/accesslog/accesslog.go b/internal/middleware/accesslog/accesslog.go index 8e5656d..f32a4c3 100644 --- a/internal/middleware/accesslog/accesslog.go +++ b/internal/middleware/accesslog/accesslog.go @@ -39,4 +39,4 @@ func (a *AccessLog) Process(next fasthttp.RequestHandler) fasthttp.RequestHandle // Close 关闭日志文件。 func (a *AccessLog) Close() error { return a.logger.Close() -} \ No newline at end of file +} diff --git a/internal/middleware/accesslog/accesslog_test.go b/internal/middleware/accesslog/accesslog_test.go index 50fb751..ebc4d9c 100644 --- a/internal/middleware/accesslog/accesslog_test.go +++ b/internal/middleware/accesslog/accesslog_test.go @@ -76,4 +76,4 @@ func TestAccessLog_ProcessWithDuration(t *testing.T) { } al.Close() -} \ No newline at end of file +} diff --git a/internal/middleware/compression/compression.go b/internal/middleware/compression/compression.go index 09f2762..0fcaed3 100644 --- a/internal/middleware/compression/compression.go +++ b/internal/middleware/compression/compression.go @@ -30,8 +30,8 @@ type CompressionMiddleware struct { algorithm Algorithm // 压缩算法 // 缓冲池 - gzipPool sync.Pool - brotliPool sync.Pool + gzipPool sync.Pool + brotliPool sync.Pool } // New 创建压缩中间件。 diff --git a/internal/middleware/middleware.go b/internal/middleware/middleware.go index fe6c948..09ec659 100644 --- a/internal/middleware/middleware.go +++ b/internal/middleware/middleware.go @@ -5,7 +5,8 @@ // - Chain 结构体:实现中间件的链式调用 // // 主要用途: -// 用于构建和管理 HTTP 请求处理中间件链,支持灵活的组合和顺序控制。 +// +// 用于构建和管理 HTTP 请求处理中间件链,支持灵活的组合和顺序控制。 // // 注意事项: // - 中间件按逆序包装,确保执行顺序与添加顺序一致 @@ -66,8 +67,9 @@ func NewChain(middlewares ...Middleware) *Chain { // - fasthttp.RequestHandler: 包装后的请求处理器 // // 执行顺序: -// 如果中间件链为 [A, B, C],最终处理器为 H,则执行顺序为: -// A -> B -> C -> H -> C -> B -> A +// +// 如果中间件链为 [A, B, C],最终处理器为 H,则执行顺序为: +// A -> B -> C -> H -> C -> B -> A func (c *Chain) Apply(final fasthttp.RequestHandler) fasthttp.RequestHandler { handler := final for i := len(c.middlewares) - 1; i >= 0; i-- { diff --git a/internal/middleware/security/headers.go b/internal/middleware/security/headers.go index 31fcb9f..d11ea21 100644 --- a/internal/middleware/security/headers.go +++ b/internal/middleware/security/headers.go @@ -45,8 +45,8 @@ import ( // - HSTS 头仅在 TLS 连接时添加 type SecurityHeadersMiddleware struct { config *config.SecurityHeaders // 安全头配置 - hsts string // 预格式化的 HSTS 头值 - mu sync.RWMutex // 读写锁,保护并发访问 + hsts string // 预格式化的 HSTS 头值 + mu sync.RWMutex // 读写锁,保护并发访问 } // NewSecurityHeaders 创建新的安全响应头中间件。 @@ -158,9 +158,9 @@ func (sh *SecurityHeadersMiddleware) addHeaders(ctx *fasthttp.RequestCtx) { // 默认配置为 1 年有效期,包含子域名。 func (sh *SecurityHeadersMiddleware) formatHSTS() { // 默认 HSTS 值 - maxAge := 31536000 // 1 年有效期(秒) + maxAge := 31536000 // 1 年有效期(秒) includeSubDomains := true // 包含所有子域名 - preload := false // 不预加载到浏览器列表 + preload := false // 不预加载到浏览器列表 // 实际使用时应从 SSLConfig.HSTS 获取配置 // 当前使用默认值 diff --git a/internal/middleware/security/ratelimit.go b/internal/middleware/security/ratelimit.go index 7d19e17..2407697 100644 --- a/internal/middleware/security/ratelimit.go +++ b/internal/middleware/security/ratelimit.go @@ -51,19 +51,19 @@ import ( // - 所有方法均为并发安全 // - 应定期调用 Cleanup 清理过期的桶 type RateLimiter struct { - rate float64 // 每秒添加的令牌数 - burst float64 // 桶的最大容量 - keyFunc KeyFunc // 提取限流键的函数 + rate float64 // 每秒添加的令牌数 + burst float64 // 桶的最大容量 + keyFunc KeyFunc // 提取限流键的函数 buckets map[string]*tokenBucket // 各键的令牌桶映射 - mu sync.RWMutex // 读写锁,保护并发访问 + mu sync.RWMutex // 读写锁,保护并发访问 } // tokenBucket 表示单个限流键的令牌桶。 // // 记录当前令牌数和最后更新时间,用于令牌计算。 type tokenBucket struct { - tokens float64 // 当前令牌数量 - lastUpdate time.Time // 最后更新时间 + tokens float64 // 当前令牌数量 + lastUpdate time.Time // 最后更新时间 mu sync.Mutex // 互斥锁,保护桶内状态 } diff --git a/internal/proxy/proxy.go b/internal/proxy/proxy.go index 5db7ee3..7501d22 100644 --- a/internal/proxy/proxy.go +++ b/internal/proxy/proxy.go @@ -43,6 +43,7 @@ import ( "rua.plus/lolly/internal/cache" "rua.plus/lolly/internal/config" "rua.plus/lolly/internal/loadbalance" + "rua.plus/lolly/internal/logging" ) // Proxy 表示反向代理实例,负责将 HTTP 请求转发到后端目标。 @@ -365,13 +366,14 @@ func isWebSocketRequest(ctx *fasthttp.RequestCtx) bool { } // handleWebSocket 处理 WebSocket 升级请求。 -// 目前返回 501 Not Implemented,因为 WebSocket 代理需要 -// HTTP 之外的特殊处理。 func (p *Proxy) handleWebSocket(ctx *fasthttp.RequestCtx, target *loadbalance.Target, client *fasthttp.HostClient) { - // WebSocket 代理需要原始 TCP 连接处理, - // 这超出了基本 HTTP 代理的范围。 - // 后续可以使用 TCP 桥接实现 - ctx.Error("WebSocket proxying not implemented", fasthttp.StatusNotImplemented) + timeout := p.config.Timeout.Connect + if timeout == 0 { + timeout = 30 * time.Second + } + if err := ProxyWebSocket(ctx, target, timeout); err != nil { + logging.Error().Msgf("WebSocket proxy error: %v", err) + } } // UpdateTargets 更新代理目标并重新初始化客户端。 diff --git a/internal/proxy/websocket.go b/internal/proxy/websocket.go index d502cad..0eeac4a 100644 --- a/internal/proxy/websocket.go +++ b/internal/proxy/websocket.go @@ -44,10 +44,10 @@ import ( // - 调用 Bridge() 会阻塞直到连接关闭 // - 使用完毕后应调用 Close() 释放资源 type WebSocketBridge struct { - clientConn net.Conn // 客户端 TCP 连接 - targetConn net.Conn // 后端目标 TCP 连接 - mu sync.Mutex // 保护 closed 字段的互斥锁 - closed bool // 连接关闭标志 + clientConn net.Conn // 客户端 TCP 连接 + targetConn net.Conn // 后端目标 TCP 连接 + mu sync.Mutex // 保护 closed 字段的互斥锁 + closed bool // 连接关闭标志 } // NewWebSocketBridge 创建新的 WebSocket 桥接器。 diff --git a/internal/server/pool.go b/internal/server/pool.go index a876f55..db576dd 100644 --- a/internal/server/pool.go +++ b/internal/server/pool.go @@ -44,15 +44,15 @@ import ( // - 使用前需调用 Start 启动池 // - 使用后需调用 Stop 释放资源 type GoroutinePool struct { - maxWorkers int32 // 最大 worker 数量 - minWorkers int32 // 最小 worker 数量(预热) - idleTimeout time.Duration // 空闲超时时间 - taskQueue chan Task // 任务队列通道 - workers int32 // 当前活跃 worker 数量 - idleWorkers int32 // 当前空闲 worker 数量 - running atomic.Bool // 池运行状态标志 - wg sync.WaitGroup // 等待所有 worker 退出 - ctx context.Context // 上下文,用于取消信号 + maxWorkers int32 // 最大 worker 数量 + minWorkers int32 // 最小 worker 数量(预热) + idleTimeout time.Duration // 空闲超时时间 + taskQueue chan Task // 任务队列通道 + workers int32 // 当前活跃 worker 数量 + idleWorkers int32 // 当前空闲 worker 数量 + running atomic.Bool // 池运行状态标志 + wg sync.WaitGroup // 等待所有 worker 退出 + ctx context.Context // 上下文,用于取消信号 cancel context.CancelFunc // 取消函数 } @@ -110,9 +110,9 @@ func NewGoroutinePool(cfg PoolConfig) *GoroutinePool { } // 预热最小数量的 worker - for i := 0; i < cfg.MinWorkers; i++ { - p.startWorker() - } + for i := 0; i < cfg.MinWorkers; i++ { + p.startWorker() + } return p } diff --git a/internal/server/pool_test.go b/internal/server/pool_test.go index 0c0db55..7b2a353 100644 --- a/internal/server/pool_test.go +++ b/internal/server/pool_test.go @@ -167,4 +167,4 @@ func TestPoolSubmitWhenStopped(t *testing.T) { if !executed.Load() { t.Error("Expected task to be executed directly when pool is stopped") } -} \ No newline at end of file +} diff --git a/internal/server/server.go b/internal/server/server.go index 8bdb354..e67a763 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -8,7 +8,8 @@ // - Goroutine 池的性能优化 // // 主要用途: -// 用于启动和管理 HTTP 服务器,处理客户端请求并转发到上游服务或静态文件。 +// +// 用于启动和管理 HTTP 服务器,处理客户端请求并转发到上游服务或静态文件。 // // 注意事项: // - 服务器支持优雅关闭和热升级 @@ -153,7 +154,8 @@ func (s *Server) SetListeners(listeners []net.Listener) { // buildMiddlewareChain 构建中间件链。 // // 根据服务器配置按顺序构建中间件链,顺序为: -// AccessLog -> AccessControl -> RateLimiter -> BasicAuth -> Rewrite -> Compression -> SecurityHeaders +// +// AccessLog -> AccessControl -> RateLimiter -> BasicAuth -> Rewrite -> Compression -> SecurityHeaders // // 参数: // - serverCfg: 单个服务器的配置对象 @@ -342,6 +344,13 @@ func (s *Server) startSingleMode() error { s.running = true + // 创建监听器并保存,用于热升级 + ln, err := net.Listen("tcp", s.config.Server.Listen) + if err != nil { + return fmt.Errorf("failed to listen: %w", err) + } + s.listeners = []net.Listener{ln} + // 检查是否配置了 SSL/TLS if s.config.Server.SSL.Cert != "" && s.config.Server.SSL.Key != "" { var err error @@ -350,10 +359,10 @@ func (s *Server) startSingleMode() error { return fmt.Errorf("创建 TLS 管理器失败: %w", err) } s.fastServer.TLSConfig = s.tlsManager.GetTLSConfig() - return s.fastServer.ListenAndServeTLS(s.config.Server.Listen, "", "") + return s.fastServer.ServeTLS(ln, "", "") } - return s.fastServer.ListenAndServe(s.config.Server.Listen) + return s.fastServer.Serve(ln) } // startVHostMode 虚拟主机模式启动。 @@ -453,6 +462,13 @@ func (s *Server) startVHostMode() error { s.running = true + // 创建监听器并保存,用于热升级 + ln, err := net.Listen("tcp", s.config.Server.Listen) + if err != nil { + return fmt.Errorf("failed to listen: %w", err) + } + s.listeners = []net.Listener{ln} + // 检查是否配置了 SSL/TLS if s.config.Server.SSL.Cert != "" && s.config.Server.SSL.Key != "" { var err error @@ -461,10 +477,10 @@ func (s *Server) startVHostMode() error { return fmt.Errorf("创建 TLS 管理器失败: %w", err) } s.fastServer.TLSConfig = s.tlsManager.GetTLSConfig() - return s.fastServer.ListenAndServeTLS(s.config.Server.Listen, "", "") + return s.fastServer.ServeTLS(ln, "", "") } - return s.fastServer.ListenAndServe(s.config.Server.Listen) + return s.fastServer.Serve(ln) } // registerProxyRoutes 注册代理路由。 diff --git a/internal/server/status.go b/internal/server/status.go index f353cce..f3f6ab7 100644 --- a/internal/server/status.go +++ b/internal/server/status.go @@ -30,7 +30,7 @@ import ( // - 状态端点可能暴露敏感信息,建议配置 IP 白名单 // - 所有方法均为并发安全 type StatusHandler struct { - server *Server // 服务器实例,用于获取状态数据 + server *Server // 服务器实例,用于获取状态数据 allowed []net.IPNet // 允许访问的 IP 网络列表 path string // 状态端点路径 } @@ -39,14 +39,14 @@ type StatusHandler struct { // // 包含服务器运行的各种统计信息,以 JSON 格式返回给客户端。 type Status struct { - Version string `json:"version"` // 服务器版本号 - Uptime time.Duration `json:"uptime"` // 服务器运行时间 - Connections int64 `json:"connections"` // 当前活跃连接数 - Requests int64 `json:"requests"` // 已处理的总请求数 - BytesSent int64 `json:"bytes_sent"` // 已发送的总字节数 - BytesReceived int64 `json:"bytes_received"` // 已接收的总字节数 - Cache *CacheStats `json:"cache,omitempty"` // 缓存统计(可选) - Pool *PoolStats `json:"pool,omitempty"` // Goroutine 池统计(可选) + Version string `json:"version"` // 服务器版本号 + Uptime time.Duration `json:"uptime"` // 服务器运行时间 + Connections int64 `json:"connections"` // 当前活跃连接数 + Requests int64 `json:"requests"` // 已处理的总请求数 + BytesSent int64 `json:"bytes_sent"` // 已发送的总字节数 + BytesReceived int64 `json:"bytes_received"` // 已接收的总字节数 + Cache *CacheStats `json:"cache,omitempty"` // 缓存统计(可选) + Pool *PoolStats `json:"pool,omitempty"` // Goroutine 池统计(可选) } // CacheStats 缓存统计信息。 diff --git a/internal/server/upgrade.go b/internal/server/upgrade.go index 88ba9eb..3e71524 100644 --- a/internal/server/upgrade.go +++ b/internal/server/upgrade.go @@ -40,9 +40,9 @@ import ( // - 需要正确配置 PID 文件路径 // - 监听器必须在升级前设置 type UpgradeManager struct { - server *Server // 服务器实例 - pidFile string // PID 文件路径 - oldPid int // 旧进程 PID(子进程中使用) + server *Server // 服务器实例 + pidFile string // PID 文件路径 + oldPid int // 旧进程 PID(子进程中使用) listeners []net.Listener // 待继承的监听器列表 } diff --git a/internal/server/upgrade_test.go b/internal/server/upgrade_test.go index 3479390..7bead7c 100644 --- a/internal/server/upgrade_test.go +++ b/internal/server/upgrade_test.go @@ -98,4 +98,4 @@ func TestWaitForShutdownNoOldPid(t *testing.T) { if err != nil { t.Errorf("WaitForShutdown should return nil for no old pid, got: %v", err) } -} \ No newline at end of file +} diff --git a/internal/server/vhost.go b/internal/server/vhost.go index 329cc28..2ce7855 100644 --- a/internal/server/vhost.go +++ b/internal/server/vhost.go @@ -6,7 +6,8 @@ // - 默认主机 fallback 机制 // // 主要用途: -// 用于支持多域名虚拟主机场景,根据请求的 Host 头分发到不同的处理器。 +// +// 用于支持多域名虚拟主机场景,根据请求的 Host 头分发到不同的处理器。 // // 注意事项: // - 所有方法均为并发安全 diff --git a/internal/ssl/ocsp.go b/internal/ssl/ocsp.go index 37892a1..9a026ac 100644 --- a/internal/ssl/ocsp.go +++ b/internal/ssl/ocsp.go @@ -57,21 +57,21 @@ type OCSPManager struct { // // 保存 OCSP 响应数据及其元数据,用于证书状态验证。 type ocspResponse struct { - response []byte // 原始 OCSP 响应数据 - thisUpdate time.Time // 响应生成时间 - nextUpdate time.Time // 响应过期时间 - status ocspStatus // 响应状态 - fetchedAt time.Time // 获取响应的时间 - errors int // 连续获取失败的次数 + response []byte // 原始 OCSP 响应数据 + thisUpdate time.Time // 响应生成时间 + nextUpdate time.Time // 响应过期时间 + status ocspStatus // 响应状态 + fetchedAt time.Time // 获取响应的时间 + errors int // 连续获取失败的次数 } // ocspStatus OCSP 响应状态类型。 type ocspStatus int const ( - statusValid ocspStatus = iota // 响应有效且新鲜 - statusStale // 响应过期但可用(优雅降级) - statusFailed // 无有效响应可用 + statusValid ocspStatus = iota // 响应有效且新鲜 + statusStale // 响应过期但可用(优雅降级) + statusFailed // 无有效响应可用 ) // OCSPConfig OCSP 管理器配置。 @@ -477,4 +477,4 @@ func extractCertificates(pemData []byte) ([]*x509.Certificate, error) { } return certs, nil -} \ No newline at end of file +} diff --git a/internal/ssl/ocsp_test.go b/internal/ssl/ocsp_test.go index 0f6f3ce..d5a2eae 100644 --- a/internal/ssl/ocsp_test.go +++ b/internal/ssl/ocsp_test.go @@ -447,4 +447,4 @@ func TestOCSPConfigDefaults(t *testing.T) { if cfg.MaxRetries != 3 { t.Errorf("Expected default max retries 3, got %d", cfg.MaxRetries) } -} \ No newline at end of file +} diff --git a/internal/ssl/ssl.go b/internal/ssl/ssl.go index 455fead..f00838e 100644 --- a/internal/ssl/ssl.go +++ b/internal/ssl/ssl.go @@ -7,7 +7,8 @@ // - OCSP Stapling 支持 // // 主要用途: -// 用于管理 HTTPS 服务器的 TLS 配置,支持多证书虚拟主机。 +// +// 用于管理 HTTPS 服务器的 TLS 配置,支持多证书虚拟主机。 // // 安全默认值: // - TLS 版本:仅启用 TLSv1.2 和 TLSv1.3 diff --git a/internal/stream/stream.go b/internal/stream/stream.go index e9fdbc0..c1c7f15 100644 --- a/internal/stream/stream.go +++ b/internal/stream/stream.go @@ -94,12 +94,12 @@ func (l *leastConn) Select(targets []*Target) *Target { // Server TCP/UDP Stream 代理服务器。 type Server struct { - listeners map[string]net.Listener - udpServers map[string]*udpServer - upstreams map[string]*Upstream - connCount int64 // 当前连接数 - mu sync.RWMutex - running atomic.Bool + listeners map[string]net.Listener + udpServers map[string]*udpServer + upstreams map[string]*Upstream + connCount int64 // 当前连接数 + mu sync.RWMutex + running atomic.Bool } // Upstream Stream 上游配置。 @@ -432,46 +432,26 @@ type Stats struct { Upstreams int } -// udpListener UDP 监听器包装。 -type udpListener struct { - conn *net.UDPConn -} - -// Accept UDP 不支持 Accept,返回错误。 -func (u *udpListener) Accept() (net.Conn, error) { - return nil, io.EOF -} - -// Close 关闭 UDP 连接。 -func (u *udpListener) Close() error { - return u.conn.Close() -} - -// Addr 返回本地地址。 -func (u *udpListener) Addr() net.Addr { - return u.conn.LocalAddr() -} - // udpSession UDP 会话,管理客户端到后端的映射 type udpSession struct { - clientAddr *net.UDPAddr - targetConn net.Conn - lastActive time.Time - mu sync.RWMutex - srv *udpServer - closeOnce sync.Once + clientAddr *net.UDPAddr + targetConn net.Conn + lastActive time.Time + mu sync.RWMutex + srv *udpServer + closeOnce sync.Once } // udpServer UDP 服务器,管理多个客户端会话 type udpServer struct { - conn *net.UDPConn - sessions map[string]*udpSession - mu sync.RWMutex - running atomic.Bool - upstream *Upstream - timeout time.Duration - stopCh chan struct{} - wg sync.WaitGroup + conn *net.UDPConn + sessions map[string]*udpSession + mu sync.RWMutex + running atomic.Bool + upstream *Upstream + timeout time.Duration + stopCh chan struct{} + wg sync.WaitGroup } // newUDPServer 创建新的 UDP 服务器 diff --git a/internal/stream/stream_test.go b/internal/stream/stream_test.go index 9a08326..85ddd1a 100644 --- a/internal/stream/stream_test.go +++ b/internal/stream/stream_test.go @@ -225,37 +225,6 @@ func TestHealthCheckerStartStop(t *testing.T) { hc.Stop() } -func TestUDPListener(t *testing.T) { - addr, err := net.ResolveUDPAddr("udp", "127.0.0.1:0") - if err != nil { - t.Fatalf("Failed to resolve UDP address: %v", err) - } - - conn, err := net.ListenUDP("udp", addr) - if err != nil { - t.Fatalf("Failed to listen UDP: %v", err) - } - defer conn.Close() - - ul := &udpListener{conn: conn} - - // 测试 Addr - if ul.Addr() == nil { - t.Error("Expected non-nil address") - } - - // 测试 Close - if err := ul.Close(); err != nil { - t.Errorf("Close failed: %v", err) - } - - // 测试 Accept(应该返回 io.EOF) - _, err = ul.Accept() - if err == nil { - t.Error("Expected error from Accept") - } -} - func TestConcurrentConnections(t *testing.T) { s := NewServer() @@ -747,4 +716,4 @@ func TestHealthCheckerCheckWithHealthyTarget(t *testing.T) { if !u.targets[0].healthy.Load() { t.Error("Expected target to be marked healthy") } -} \ No newline at end of file +}