lolly/internal/server/server.go
xfy b0380f8798 feat(server): 改用前缀匹配注册代理路由
使用 fasthttp/router 通配符路由替代精确匹配:
- path: / 匹配所有子路径如 /sorry/index
- path: /api/ 匹配 /api/* 所有子路径
- 路由格式: {path}/{path:*}

此改动确保代理路径下所有子请求都能正确路由到代理处理器,
支持 redirect_rewrite 功能正确处理子路径的重定向改写。

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-15 17:01:30 +08:00

1149 lines
32 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// Package server 提供 HTTP 服务器的核心实现,支持单服务器和虚拟主机两种运行模式。
//
// 该文件包含服务器相关的核心逻辑,包括:
// - HTTP 服务器的创建和生命周期管理
// - 中间件链的构建和应用
// - 代理路由的注册和处理
// - 静态文件服务的集成
// - Goroutine 池的性能优化
//
// 主要用途:
//
// 用于启动和管理 HTTP 服务器,处理客户端请求并转发到上游服务或静态文件。
//
// 注意事项:
// - 服务器支持优雅关闭和热升级
// - 所有公开方法均为并发安全
// - 使用前需确保配置已正确加载
//
// 作者xfy
package server
import (
"context"
"crypto/tls"
"fmt"
"net"
"os"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/valyala/fasthttp"
"rua.plus/lolly/internal/cache"
"rua.plus/lolly/internal/config"
"rua.plus/lolly/internal/handler"
"rua.plus/lolly/internal/loadbalance"
"rua.plus/lolly/internal/logging"
"rua.plus/lolly/internal/lua"
"rua.plus/lolly/internal/middleware"
"rua.plus/lolly/internal/middleware/accesslog"
"rua.plus/lolly/internal/middleware/bodylimit"
"rua.plus/lolly/internal/middleware/compression"
"rua.plus/lolly/internal/middleware/errorintercept"
"rua.plus/lolly/internal/middleware/rewrite"
"rua.plus/lolly/internal/middleware/security"
"rua.plus/lolly/internal/proxy"
"rua.plus/lolly/internal/resolver"
"rua.plus/lolly/internal/ssl"
)
// Server HTTP 服务器,封装 fasthttp.Server 并提供中间件链和生命周期管理。
//
// 该结构体是服务器的核心实体,负责:
// - 管理配置和 fasthttp.Server 实例
// - 构建和应用中间件链
// - 维护健康检查器和访问日志中间件
// - 可选的 Goroutine 池和文件缓存
//
// 注意事项:
// - 创建后需调用 Start 方法启动服务器
// - 关闭时建议使用 GracefulStop 实现优雅关闭
type Server struct {
startTime time.Time
resolver resolver.Resolver
tlsManager *ssl.TLSManager
handler fasthttp.RequestHandler
accessLogMiddleware *accesslog.AccessLog
luaEngine *lua.LuaEngine
accessControl *security.AccessControl
errorPageManager *handler.ErrorPageManager
fileCache *cache.FileCache
pool *GoroutinePool
config *config.Config
fastServer *fasthttp.Server
fastServers []*fasthttp.Server // 多监听器模式使用
proxies []*proxy.Proxy
listeners []net.Listener
healthCheckers []*proxy.HealthChecker
connections atomic.Int64
requests atomic.Int64
bytesSent atomic.Int64
bytesReceived atomic.Int64
running bool
}
// New 创建 HTTP 服务器实例。
//
// 根据提供的配置创建服务器对象,但不启动服务器。
// 服务器创建后需调用 Start 方法才能开始处理请求。
//
// 参数:
// - cfg: 服务器配置对象,包含监听地址、代理、静态文件、安全等配置
//
// 返回值:
// - *Server: 创建的服务器实例
func New(cfg *config.Config) *Server {
return &Server{config: cfg}
}
// trackStats 包装处理器以统计请求数和数据传输量。
//
// 在每个请求处理前后更新统计字段requests、bytesSent、bytesReceived。
//
// 参数:
// - handler: 原始的请求处理器
//
// 返回值:
// - fasthttp.RequestHandler: 包装后的处理器
func (s *Server) trackStats(handler fasthttp.RequestHandler) fasthttp.RequestHandler {
return func(ctx *fasthttp.RequestCtx) {
s.requests.Add(1)
s.bytesReceived.Add(int64(len(ctx.Request.Body())))
handler(ctx)
s.bytesSent.Add(int64(len(ctx.Response.Body())))
}
}
// GetListeners 获取服务器监听器列表。
//
// 返回当前服务器使用的监听器,用于热升级时传递给子进程。
//
// 返回值:
// - []net.Listener: 监听器列表
func (s *Server) GetListeners() []net.Listener {
return s.listeners
}
// SetListeners 设置服务器监听器列表。
//
// 用于热升级时,子进程从父进程继承监听器。
//
// 参数:
// - listeners: 要设置的监听器列表
func (s *Server) SetListeners(listeners []net.Listener) {
s.listeners = listeners
}
// GetTLSConfig 获取 TLS 配置。
//
// 返回服务器的 TLS 配置,用于 HTTP/3 等需要 TLS 的协议。
//
// 返回值:
// - *tls.Config: TLS 配置对象
// - error: 未配置 TLS 或配置无效时返回错误
func (s *Server) GetTLSConfig() (*tls.Config, error) {
if s.tlsManager == nil {
return nil, fmt.Errorf("TLS not configured")
}
return s.tlsManager.GetTLSConfig(), nil
}
// GetHandler 获取请求处理器。
//
// 返回服务器的请求处理器,用于 HTTP/3 等需要复用处理器的场景。
//
// 返回值:
// - fasthttp.RequestHandler: 请求处理器
func (s *Server) GetHandler() fasthttp.RequestHandler {
return s.handler
}
// buildMiddlewareChain 构建中间件链。
//
// 根据服务器配置按顺序构建中间件链,顺序为:
//
// AccessLog -> AccessControl -> RateLimiter -> BasicAuth -> Rewrite -> Compression -> SecurityHeaders
//
// 参数:
// - serverCfg: 单个服务器的配置对象
//
// 返回值:
// - *middleware.Chain: 构建完成的中间件链
// - error: 构建过程中遇到的错误,如中间件创建失败
//
// 注意事项:
// - 各中间件按顺序依次包装请求处理器
// - 未配置的中间件不会添加到链中
func (s *Server) buildMiddlewareChain(serverCfg *config.ServerConfig) (*middleware.Chain, error) {
var middlewares []middleware.Middleware
// 1. AccessLog (已集成)
s.accessLogMiddleware = accesslog.New(&s.config.Logging)
middlewares = append(middlewares, s.accessLogMiddleware)
// 2. Security: AccessControl (IP 访问控制)
if len(serverCfg.Security.Access.Allow) > 0 || len(serverCfg.Security.Access.Deny) > 0 {
ac, err := security.NewAccessControl(&serverCfg.Security.Access)
if err != nil {
return nil, fmt.Errorf("创建访问控制中间件失败: %w", err)
}
middlewares = append(middlewares, ac)
s.accessControl = ac
}
// 3. Security: RateLimiter (速率限制)
if serverCfg.Security.RateLimit.RequestRate > 0 {
rl, err := security.NewRateLimiter(&serverCfg.Security.RateLimit)
if err != nil {
return nil, fmt.Errorf("创建限流中间件失败: %w", err)
}
middlewares = append(middlewares, rl)
}
// 3.5 Security: ConnLimiter (连接数限制)
if serverCfg.Security.RateLimit.ConnLimit > 0 {
cl, err := security.NewConnLimiter(serverCfg.Security.RateLimit.ConnLimit, true, serverCfg.Security.RateLimit.Key)
if err != nil {
return nil, fmt.Errorf("创建连接限制中间件失败: %w", err)
}
middlewares = append(middlewares, cl.Middleware())
}
// 4. Security: BasicAuth (认证)
if len(serverCfg.Security.Auth.Users) > 0 {
auth, err := security.NewBasicAuth(&serverCfg.Security.Auth)
if err != nil {
return nil, fmt.Errorf("创建认证中间件失败: %w", err)
}
middlewares = append(middlewares, auth)
}
// 4.3 Security: AuthRequest (外部认证子请求)
if serverCfg.Security.AuthRequest.Enabled && serverCfg.Security.AuthRequest.URI != "" {
authReq, err := security.NewAuthRequest(serverCfg.Security.AuthRequest)
if err != nil {
return nil, fmt.Errorf("创建外部认证中间件失败: %w", err)
}
middlewares = append(middlewares, authReq)
}
// 4.5 BodyLimit (请求体大小限制)
// 创建 bodylimit 中间件,使用全局配置或默认值
bodyLimitMiddleware := bodylimit.NewWithDefault()
if serverCfg.ClientMaxBodySize != "" {
bl, err := bodylimit.New(serverCfg.ClientMaxBodySize)
if err != nil {
return nil, fmt.Errorf("创建请求体限制中间件失败: %w", err)
}
bodyLimitMiddleware = bl
}
// 添加路径级别的限制配置
for i := range serverCfg.Proxy {
if serverCfg.Proxy[i].ClientMaxBodySize != "" {
if err := bodyLimitMiddleware.AddPathLimit(
serverCfg.Proxy[i].Path,
serverCfg.Proxy[i].ClientMaxBodySize,
); err != nil {
return nil, fmt.Errorf("添加路径请求体限制失败: %w", err)
}
}
}
middlewares = append(middlewares, bodyLimitMiddleware)
// 5. Rewrite (URL 重写)
if len(serverCfg.Rewrite) > 0 {
rw, err := rewrite.New(serverCfg.Rewrite)
if err != nil {
return nil, fmt.Errorf("创建重写中间件失败: %w", err)
}
middlewares = append(middlewares, rw)
}
// 6. Compression (响应压缩)
if serverCfg.Compression.Type != "" {
comp, err := compression.New(&serverCfg.Compression)
if err != nil {
return nil, fmt.Errorf("创建压缩中间件失败: %w", err)
}
middlewares = append(middlewares, comp)
}
// 7. SecurityHeaders (安全头部)
// 如果有任何安全头部配置,则启用
if serverCfg.Security.Headers.XFrameOptions != "" ||
serverCfg.Security.Headers.XContentTypeOptions != "" ||
serverCfg.Security.Headers.ContentSecurityPolicy != "" ||
serverCfg.Security.Headers.ReferrerPolicy != "" ||
serverCfg.Security.Headers.PermissionsPolicy != "" {
headers := security.NewHeadersWithHSTS(&serverCfg.Security.Headers, &serverCfg.SSL.HSTS)
middlewares = append(middlewares, headers)
}
// 8. ErrorIntercept (错误页面拦截)
// 如果配置了错误页面,添加错误拦截中间件
if s.errorPageManager != nil && s.errorPageManager.IsConfigured() {
ei := errorintercept.New(s.errorPageManager)
middlewares = append(middlewares, ei)
}
// Lua 中间件(可选)
if s.luaEngine != nil && serverCfg.Lua != nil && serverCfg.Lua.Enabled {
luaMiddlewares, err := s.buildLuaMiddlewares(serverCfg.Lua)
if err != nil {
return nil, fmt.Errorf("创建 Lua 中间件失败: %w", err)
}
middlewares = append(middlewares, luaMiddlewares...)
}
return middleware.NewChain(middlewares...), nil
}
// buildLuaMiddlewares 根据 Lua 配置创建中间件。
//
// 根据 Scripts 配置创建 LuaMiddleware 或 MultiPhaseLuaMiddleware。
// 支持单脚本和多阶段脚本配置。
//
// 参数:
// - luaCfg: Lua 配置对象
//
// 返回值:
// - []middleware.Middleware: 创建的中间件列表
// - error: 创建过程中遇到的错误
func (s *Server) buildLuaMiddlewares(luaCfg *config.LuaMiddlewareConfig) ([]middleware.Middleware, error) {
if s.luaEngine == nil {
return nil, nil
}
// 按阶段分组脚本
phaseScripts := make(map[string][]config.LuaScriptConfig)
for _, script := range luaCfg.Scripts {
// 默认启用
enabled := script.Enabled
if !enabled && script.Timeout == 0 && script.Path != "" {
enabled = true // 零值时默认启用
}
if enabled {
phaseScripts[script.Phase] = append(phaseScripts[script.Phase], script)
}
}
var middlewares []middleware.Middleware
// 为每个阶段创建中间件
for phase, scripts := range phaseScripts {
if len(scripts) == 0 {
continue
}
// 单脚本:直接创建 LuaMiddleware
if len(scripts) == 1 {
script := scripts[0]
luaPhase, err := lua.ParsePhase(phase)
if err != nil {
return nil, fmt.Errorf("无效的阶段 '%s': %w", phase, err)
}
timeout := script.Timeout
if timeout == 0 {
timeout = 30 * time.Second
}
cfg := lua.LuaMiddlewareConfig{
ScriptPath: script.Path,
Phase: luaPhase,
Timeout: timeout,
Name: fmt.Sprintf("lua-%s", phase),
}
mw, err := lua.NewLuaMiddleware(s.luaEngine, cfg)
if err != nil {
return nil, fmt.Errorf("创建 Lua 中间件失败 (phase=%s): %w", phase, err)
}
middlewares = append(middlewares, mw)
} else {
// 多脚本:创建 MultiPhaseLuaMiddleware
multi := lua.NewMultiPhaseLuaMiddleware(s.luaEngine, fmt.Sprintf("lua-multi-%s", phase))
for _, script := range scripts {
luaPhase, err := lua.ParsePhase(phase)
if err != nil {
return nil, fmt.Errorf("无效的阶段 '%s': %w", phase, err)
}
timeout := script.Timeout
if timeout == 0 {
timeout = 30 * time.Second
}
err = multi.AddPhase(luaPhase, script.Path, timeout)
if err != nil {
return nil, fmt.Errorf("添加 Lua 阶段失败 (phase=%s): %w", phase, err)
}
}
middlewares = append(middlewares, multi)
}
}
return middlewares, nil
}
// Start 启动 HTTP 服务器。
//
// 初始化日志系统、性能优化组件Goroutine池、文件缓存
// 根据配置选择单服务器模式或虚拟主机模式启动。
//
// 返回值:
// - error: 启动过程中遇到的错误,如监听地址绑定失败
//
// 注意事项:
// - 该方法会阻塞运行,直到服务器停止
// - 调用前需确保配置已正确加载
// - Goroutine池和文件缓存根据配置自动启用
func (s *Server) Start() error {
logging.Init(s.config.Logging.Error.Level, true)
// 记录启动时间
s.startTime = time.Now()
// 初始化 GoroutinePool
s.pool = initGoroutinePool(&s.config.Performance)
// 初始化文件缓存
s.fileCache = initFileCache(&s.config.Performance)
// 初始化错误页面管理器
var err error
if len(s.config.Servers) > 0 {
s.errorPageManager, err = initErrorPageManager(&s.config.Servers[0].Security.ErrorPage)
if err != nil {
return err
}
// 初始化 Lua 引擎
s.luaEngine, err = initLuaEngine(s.config.Servers[0].Lua)
if err != nil {
return err
}
}
// 根据模式选择启动方式
mode := s.config.GetMode()
switch mode {
case config.ServerModeSingle:
return s.startSingleMode()
case config.ServerModeVHost:
return s.startVHostMode()
case config.ServerModeMultiServer:
return s.startMultiServerMode()
default:
// 默认使用单服务器模式
return s.startSingleMode()
}
}
// startSingleMode 单服务器模式启动。
//
// 在单服务器模式下,创建单一路由器,注册代理路由和静态文件服务,
// 应用中间件链后启动 fasthttp 服务器。
//
// 返回值:
// - error: 启动过程中遇到的错误
//
// 注意事项:
// - 静态文件服务作为 fallback 处理非代理路径的请求
// - 使用零拷贝传输优化大文件传输
func (s *Server) startSingleMode() error {
// 使用 Servers[0] 配置(迁移后 Server 字段为空)
serverCfg := &s.config.Servers[0]
router := handler.NewRouter()
// 注册状态监控端点(如果配置)
if s.config.Monitoring.Status.Path != "" || len(s.config.Monitoring.Status.Allow) > 0 {
statusHandler, err := NewStatusHandler(s, &s.config.Monitoring.Status)
if err != nil {
logging.Error().Msg("创建状态处理器失败: " + err.Error())
} else {
router.GET(statusHandler.Path(), statusHandler.ServeHTTP)
}
}
// 注册 pprof 性能分析端点(如果配置)
if s.config.Monitoring.Pprof.Enabled {
pprofHandler, err := NewPprofHandler(&s.config.Monitoring.Pprof)
if err != nil {
logging.Error().Msg("创建 pprof 处理器失败: " + err.Error())
} else {
router.GET(pprofHandler.Path(), pprofHandler.ServeHTTP)
router.GET(pprofHandler.Path()+"/{profile:*}", pprofHandler.ServeHTTP)
}
}
// 注册代理路由
s.registerProxyRoutes(router, serverCfg)
// 静态文件服务
s.registerStaticHandlers(router, serverCfg)
// 构建中间件链
chain, err := s.buildMiddlewareChain(serverCfg)
if err != nil {
return err
}
// 应用 GoroutinePool如果启用
handler := chain.Apply(router.Handler())
if s.pool != nil {
handler = s.pool.WrapHandler(handler)
}
// 包装统计追踪
handler = s.trackStats(handler)
s.handler = handler
s.fastServer = &fasthttp.Server{
Name: "lolly",
Handler: s.handler,
ReadTimeout: serverCfg.ReadTimeout,
WriteTimeout: serverCfg.WriteTimeout,
IdleTimeout: serverCfg.IdleTimeout,
MaxConnsPerIP: serverCfg.MaxConnsPerIP,
MaxRequestsPerConn: serverCfg.MaxRequestsPerConn,
CloseOnShutdown: true,
}
s.running = true
// 创建监听器并保存,用于热升级
ln, err := net.Listen("tcp", serverCfg.Listen)
if err != nil {
return fmt.Errorf("failed to listen: %w", err)
}
s.listeners = []net.Listener{ln}
// 检查是否配置了 SSL/TLS
if serverCfg.SSL.Cert != "" && serverCfg.SSL.Key != "" {
var err error
s.tlsManager, err = ssl.NewTLSManager(&serverCfg.SSL)
if err != nil {
return fmt.Errorf("创建 TLS 管理器失败: %w", err)
}
s.fastServer.TLSConfig = s.tlsManager.GetTLSConfig()
return s.fastServer.ServeTLS(ln, "", "")
}
return s.fastServer.Serve(ln)
}
// startVHostMode 虚拟主机模式启动。
//
// 在虚拟主机模式下,为每个配置的服务器创建独立的路由器和中间件链,
// 通过虚拟主机管理器根据 Host 头分发请求。
//
// 返回值:
// - error: 启动过程中遇到的错误
//
// 注意事项:
// - 每个虚拟主机有独立的中间件配置
// - 未匹配的 Host 头请求由默认主机处理
func (s *Server) startVHostMode() error {
vhostMgr := NewVHostManager()
for i := range s.config.Servers {
router := handler.NewRouter()
s.registerProxyRoutes(router, &s.config.Servers[i])
// 静态文件
s.registerStaticHandlers(router, &s.config.Servers[i])
// 为每个虚拟主机构建独立的中间件链
chain, err := s.buildMiddlewareChain(&s.config.Servers[i])
if err != nil {
return err
}
handler := chain.Apply(router.Handler())
if s.pool != nil {
handler = s.pool.WrapHandler(handler)
}
vhostMgr.AddHost(s.config.Servers[i].Name, handler)
}
// 默认主机
if s.config.GetDefaultServerFromList() != nil {
router := handler.NewRouter()
// 注册状态监控端点(如果配置)
if s.config.Monitoring.Status.Path != "" || len(s.config.Monitoring.Status.Allow) > 0 {
statusHandler, err := NewStatusHandler(s, &s.config.Monitoring.Status)
if err != nil {
logging.Error().Msg("创建状态处理器失败: " + err.Error())
} else {
router.GET(statusHandler.Path(), statusHandler.ServeHTTP)
}
}
// 注册 pprof 性能分析端点(如果配置)
if s.config.Monitoring.Pprof.Enabled {
pprofHandler, err := NewPprofHandler(&s.config.Monitoring.Pprof)
if err != nil {
logging.Error().Msg("创建 pprof 处理器失败: " + err.Error())
} else {
router.GET(pprofHandler.Path(), pprofHandler.ServeHTTP)
router.GET(pprofHandler.Path()+"/{profile:*}", pprofHandler.ServeHTTP)
}
}
defaultSrv := s.config.GetDefaultServerFromList()
s.registerProxyRoutes(router, defaultSrv)
// 静态文件
s.registerStaticHandlers(router, defaultSrv)
chain, err := s.buildMiddlewareChain(defaultSrv)
if err != nil {
return err
}
handler := chain.Apply(router.Handler())
if s.pool != nil {
handler = s.pool.WrapHandler(handler)
}
vhostMgr.SetDefault(handler)
}
s.handler = vhostMgr.Handler()
// 包装统计追踪
s.handler = s.trackStats(s.handler)
// 使用 Servers[0] 配置(迁移后 Server 字段为空)
serverCfg := &s.config.Servers[0]
s.fastServer = &fasthttp.Server{
Name: "lolly",
Handler: s.handler,
ReadTimeout: serverCfg.ReadTimeout,
WriteTimeout: serverCfg.WriteTimeout,
IdleTimeout: serverCfg.IdleTimeout,
MaxConnsPerIP: serverCfg.MaxConnsPerIP,
MaxRequestsPerConn: serverCfg.MaxRequestsPerConn,
CloseOnShutdown: true,
}
s.running = true
// 创建监听器并保存,用于热升级
ln, err := net.Listen("tcp", serverCfg.Listen)
if err != nil {
return fmt.Errorf("failed to listen: %w", err)
}
s.listeners = []net.Listener{ln}
// 检查是否配置了 SSL/TLS
if serverCfg.SSL.Cert != "" && serverCfg.SSL.Key != "" {
var err error
s.tlsManager, err = ssl.NewTLSManager(&serverCfg.SSL)
if err != nil {
return fmt.Errorf("创建 TLS 管理器失败: %w", err)
}
s.fastServer.TLSConfig = s.tlsManager.GetTLSConfig()
return s.fastServer.ServeTLS(ln, "", "")
}
return s.fastServer.Serve(ln)
}
// startMultiServerMode 多服务器模式启动。
//
// 为每个配置的服务器创建独立的 fasthttp.Server 实例,
// 每个实例监听各自的地址并运行在独立的 goroutine 中。
//
// 返回值:
// - error: 启动过程中遇到的第一个错误(或全部成功时返回 nil
//
// 注意事项:
// - 每个服务器有独立的中间件配置
// - 热升级场景下回退到虚拟主机模式
// - 使用 goroutine 并行启动多个服务器
func (s *Server) startMultiServerMode() error {
// 热升级检测multi_server 热升级未实现,回退到 vhost 模式
if os.Getenv("GRACEFUL_UPGRADE") == "1" {
logging.Warn().Msg("热升级模式下 multi_server 模式未实现,回退到虚拟主机模式")
return s.startVHostMode()
}
s.fastServers = make([]*fasthttp.Server, len(s.config.Servers))
s.listeners = make([]net.Listener, len(s.config.Servers))
var wg sync.WaitGroup
errCh := make(chan error, len(s.config.Servers))
// 并行创建监听器和 fasthttp.Server
for i := range s.config.Servers {
wg.Add(1)
go func(idx int) {
defer wg.Done()
serverCfg := &s.config.Servers[idx]
// 创建监听器
ln, err := net.Listen("tcp", serverCfg.Listen)
if err != nil {
errCh <- fmt.Errorf("监听地址 %s 失败: %w", serverCfg.Listen, err)
return
}
s.listeners[idx] = ln
// 创建路由器
router := handler.NewRouter()
s.registerProxyRoutes(router, serverCfg)
// 静态文件服务
s.registerStaticHandlers(router, serverCfg)
// 构建独立的中间件链
chain, err := s.buildMiddlewareChain(serverCfg)
if err != nil {
errCh <- fmt.Errorf("构建中间件链失败 (server[%d]): %w", idx, err)
return
}
// 应用中间件
h := chain.Apply(router.Handler())
if s.pool != nil {
h = s.pool.WrapHandler(h)
}
h = s.trackStats(h)
// 创建 fasthttp.Server
fastSrv := &fasthttp.Server{
Name: "lolly",
Handler: h,
ReadTimeout: serverCfg.ReadTimeout,
WriteTimeout: serverCfg.WriteTimeout,
IdleTimeout: serverCfg.IdleTimeout,
MaxConnsPerIP: serverCfg.MaxConnsPerIP,
MaxRequestsPerConn: serverCfg.MaxRequestsPerConn,
CloseOnShutdown: true,
}
// 检查 SSL 配置
if serverCfg.SSL.Cert != "" && serverCfg.SSL.Key != "" {
tlsManager, err := ssl.NewTLSManager(&serverCfg.SSL)
if err != nil {
errCh <- fmt.Errorf("创建 TLS 管理器失败 (server[%d]): %w", idx, err)
return
}
fastSrv.TLSConfig = tlsManager.GetTLSConfig()
}
s.fastServers[idx] = fastSrv
}(i)
}
// 等待所有 goroutine 完成
wg.Wait()
close(errCh)
// 检查是否有错误
var firstErr error
for err := range errCh {
if firstErr == nil {
firstErr = err
}
}
// 如果有错误,清理已创建的监听器
if firstErr != nil {
for _, ln := range s.listeners {
if ln != nil {
_ = ln.Close()
}
}
return firstErr
}
s.running = true
// 启动所有服务器
for idx, fastSrv := range s.fastServers {
ln := s.listeners[idx]
if fastSrv == nil || ln == nil {
continue
}
wg.Add(1)
go func(f *fasthttp.Server, l net.Listener, i int) {
defer wg.Done()
var serveErr error
if f.TLSConfig != nil {
serveErr = f.ServeTLS(l, "", "")
} else {
serveErr = f.Serve(l)
}
if serveErr != nil {
logging.Error().Err(serveErr).Msgf("服务器 [%d] 监听 %s 时发生错误", i, l.Addr())
}
}(fastSrv, ln, idx)
}
// 等待服务器停止(阻塞)
wg.Wait()
return nil
}
// registerProxyRoutes 注册代理路由。
//
// 根据配置为路由器注册代理路径,创建代理处理器和健康检查器。
// 支持 GET、POST、PUT、DELETE、HEAD 等 HTTP 方法。
//
// 参数:
// - router: 路由器实例,用于注册路由规则
// - serverCfg: 服务器配置,包含代理目标、负载均衡、健康检查等设置
//
// 注意事项:
// - 代理目标初始状态默认为健康
// - 健康检查根据配置自动启动
func (s *Server) registerProxyRoutes(router *handler.Router, serverCfg *config.ServerConfig) {
for i := range serverCfg.Proxy {
proxyCfg := &serverCfg.Proxy[i]
// 转换目标
targets := make([]*loadbalance.Target, len(proxyCfg.Targets))
for j, t := range proxyCfg.Targets {
targets[j] = loadbalance.NewTargetFromConfig(t.URL, t.Weight)
}
// 传递 Transport 配置和 Lua 引擎
p, err := proxy.NewProxy(proxyCfg, targets, &s.config.Performance.Transport, s.luaEngine)
if err != nil {
logging.Error().Msg("创建代理失败: " + err.Error())
continue
}
// 设置 DNS 解析器(如果已配置)
if s.resolver != nil {
p.SetResolver(s.resolver)
if err := p.Start(); err != nil {
logging.Error().Err(err).Msg("启动代理失败")
}
}
// 启动健康检查
if proxyCfg.HealthCheck.Interval > 0 {
hc := proxy.NewHealthChecker(targets, &proxyCfg.HealthCheck)
hc.Start()
s.healthCheckers = append(s.healthCheckers, hc)
// 设置被动健康检查
p.SetHealthChecker(hc)
}
// 保存代理实例用于缓存统计
s.proxies = append(s.proxies, p)
// 使用前缀匹配(通配符)注册代理路由
// path: / 匹配所有子路径如 /sorry/index
// path: /api/ 匹配 /api/* 所有子路径
routePath := proxyCfg.Path
// 确保通配符路由格式正确
if !strings.HasSuffix(routePath, "/") && routePath != "/" {
routePath += "/"
}
wildcardPath := routePath + "{path:*}"
router.GET(wildcardPath, p.ServeHTTP)
router.POST(wildcardPath, p.ServeHTTP)
router.PUT(wildcardPath, p.ServeHTTP)
router.DELETE(wildcardPath, p.ServeHTTP)
router.HEAD(wildcardPath, p.ServeHTTP)
}
}
// shutdownServers 并行关闭多个 fasthttp.Server 实例。
//
// 使用 goroutine 并行关闭所有服务器,收集所有错误并返回聚合错误。
// 部分服务器关闭失败不会影响其他服务器的关闭。
//
// 参数:
// - ctx: 关闭上下文,用于控制超时和取消
// - servers: 要关闭的 fasthttp.Server 实例列表
//
// 返回值:
// - error: 聚合错误,无错误或全部成功时返回 nil
func shutdownServers(ctx context.Context, servers []*fasthttp.Server) error {
if len(servers) == 0 {
return nil
}
var (
mu sync.Mutex
errs []error
wg sync.WaitGroup
)
for _, srv := range servers {
if srv == nil {
continue
}
wg.Add(1)
go func(s *fasthttp.Server) {
defer wg.Done()
if err := s.Shutdown(); err != nil {
mu.Lock()
errs = append(errs, err)
mu.Unlock()
}
}(srv)
}
// 等待所有关闭完成或上下文取消
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
select {
case <-done:
if len(errs) == 0 {
return nil
}
if len(errs) == 1 {
return errs[0]
}
msgs := make([]string, len(errs))
for i, e := range errs {
msgs[i] = e.Error()
}
return fmt.Errorf("关闭服务器时发生 %d 个错误: %s", len(errs), strings.Join(msgs, "; "))
case <-ctx.Done():
return ctx.Err()
}
}
// StopWithTimeout 快速停止服务器(支持自定义超时)。
//
// 立即停止服务器,不等待正在处理的请求完成。
// 停止所有健康检查器和访问日志中间件。
//
// 参数:
// - timeout: 快速关闭的最大等待时间
//
// 返回值:
// - error: 停止过程中遇到的错误
//
// 注意事项:
// - 对于生产环境,建议使用 GracefulStop 实现优雅关闭
// - timeout <= 0 时会使用默认 5s 超时
func (s *Server) StopWithTimeout(timeout time.Duration) error {
// 防御性检查:如果 timeout <= 0使用默认值
if timeout <= 0 {
timeout = 5 * time.Second
}
s.running = false
// 停止 Goroutine 池
if s.pool != nil {
s.pool.Stop()
}
// 停止健康检查器
for _, hc := range s.healthCheckers {
hc.Stop()
}
// 关闭访问日志
if s.accessLogMiddleware != nil {
_ = s.accessLogMiddleware.Close()
}
// 关闭 TLS 管理器
if s.tlsManager != nil {
s.tlsManager.Close()
}
// 关闭 AccessControl (释放 GeoIP 资源)
if s.accessControl != nil {
if err := s.accessControl.Close(); err != nil {
logging.Warn().Err(err).Msg("关闭 AccessControl 失败")
}
}
// 关闭 Lua 引擎
if s.luaEngine != nil {
s.luaEngine.Close()
logging.Info().Msg("Lua 引擎已关闭")
}
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
// 多服务器模式:并行关闭所有 fasthttp.Server
if len(s.fastServers) > 0 {
return shutdownServers(ctx, s.fastServers)
}
// 单服务器模式:关闭单个 fasthttp.Server
if s.fastServer != nil {
done := make(chan struct{})
go func() {
_ = s.fastServer.Shutdown()
close(done)
}()
select {
case <-done:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
return nil
}
// GracefulStop 优雅停止服务器。
//
// 等待正在处理的请求完成后再停止服务器,确保连接正常关闭。
// 如果超时时间到达仍有请求未完成,将返回超时错误。
//
// 参数:
// - timeout: 优雅关闭的最大等待时间
//
// 返回值:
// - error: 停止过程中遇到的错误,超时返回 context.DeadlineExceeded
//
// 注意事项:
// - 推荐在生产环境使用此方法关闭服务器
// - 超时后会强制关闭,可能导致部分请求中断
func (s *Server) GracefulStop(timeout time.Duration) error {
s.running = false
// 停止 Goroutine 池
if s.pool != nil {
s.pool.Stop()
}
// 停止健康检查器
for _, hc := range s.healthCheckers {
hc.Stop()
}
// 关闭访问日志
if s.accessLogMiddleware != nil {
_ = s.accessLogMiddleware.Close()
}
// 关闭 TLS 管理器
if s.tlsManager != nil {
s.tlsManager.Close()
}
// 关闭 AccessControl (释放 GeoIP 资源)
if s.accessControl != nil {
if err := s.accessControl.Close(); err != nil {
logging.Warn().Err(err).Msg("关闭 AccessControl 失败")
}
}
// 关闭 Lua 引擎
if s.luaEngine != nil {
s.luaEngine.Close()
logging.Info().Msg("Lua 引擎已关闭")
}
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
// 多服务器模式:并行关闭所有 fasthttp.Server
if len(s.fastServers) > 0 {
return shutdownServers(ctx, s.fastServers)
}
// 单服务器模式:关闭单个 fasthttp.Server
if s.fastServer != nil {
done := make(chan struct{})
go func() {
_ = s.fastServer.Shutdown()
close(done)
}()
select {
case <-done:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
return nil
}
// getProxyCacheStats 收集所有代理缓存的统计信息。
func (s *Server) getProxyCacheStats() ProxyCacheStats {
var total ProxyCacheStats
for _, p := range s.proxies {
if stats := p.GetCacheStats(); stats != nil {
total.Entries += stats.Entries
total.Pending += stats.Pending
}
}
return total
}
// registerStaticHandlers 注册静态文件处理器。
//
// 为路由器注册静态文件服务,支持多个静态目录、文件缓存和预压缩文件。
//
// 参数:
// - router: 路由器实例,用于注册路由规则
// - cfg: 服务器配置,包含静态文件和压缩设置
func (s *Server) registerStaticHandlers(router *handler.Router, cfg *config.ServerConfig) {
for _, static := range cfg.Static {
path := static.Path
if path == "" {
path = "/"
}
staticHandler := handler.NewStaticHandler(
static.Root,
path,
static.Index,
true, // useSendfile
)
if s.fileCache != nil {
staticHandler.SetFileCache(s.fileCache)
}
if cfg.Compression.GzipStatic {
staticHandler.SetGzipStatic(true, cfg.Compression.GzipStaticExtensions)
}
// 设置 try_files 配置
if len(static.TryFiles) > 0 {
// 注意tryFilesPass 需要路由器支持,当前实现传入 nil
// 如果 tryFilesPass 为 true需要额外处理
staticHandler.SetTryFiles(static.TryFiles, static.TryFilesPass, router)
}
// 注册路由:确保路径以 / 结尾
routePath := path
if !strings.HasSuffix(routePath, "/") {
routePath += "/"
}
router.GET(routePath+"{filepath:*}", staticHandler.Handle)
router.HEAD(routePath+"{filepath:*}", staticHandler.Handle)
}
}
// SetResolver 设置 DNS 解析器。
func (s *Server) SetResolver(r resolver.Resolver) {
s.resolver = r
}
// GetResolver 返回 DNS 解析器。
func (s *Server) GetResolver() resolver.Resolver {
return s.resolver
}