diff --git a/internal/app/app.go b/internal/app/app.go index 39208a1..774f29d 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -192,7 +192,7 @@ func (a *App) reloadConfig() { for i, ln := range listeners { duped[i], err = server.DupListener(ln) if err != nil { - for j := 0; j < i; j++ { + for j := range i { _ = duped[j].Close() } a.logger.Error().Err(err).Msg("Failed to dup listener for reload") @@ -289,6 +289,8 @@ func (a *App) requiresFullRestart(newCfg *config.Config) bool { return true } } + case config.ServerModeAuto: + return true } return false } diff --git a/internal/lua/api_location.go b/internal/lua/api_location.go index c0dc1c8..54bd2b5 100644 --- a/internal/lua/api_location.go +++ b/internal/lua/api_location.go @@ -180,7 +180,7 @@ func RegisterLocationAPI(L *glua.LState, manager *LocationManager, ngx *glua.LTa optionsTable := L.CheckTable(2) optionsTable.ForEach(func(key, value glua.LValue) { keyStr := glua.LVAsString(key) - //nolint:exhaustive // 只处理特定类型 + //nolint:exhaustive,nolintlint switch value.Type() { case glua.LTString: opts[keyStr] = glua.LVAsString(value) diff --git a/internal/lua/api_req.go b/internal/lua/api_req.go index 48ec1ae..6d8076d 100644 --- a/internal/lua/api_req.go +++ b/internal/lua/api_req.go @@ -308,7 +308,7 @@ func (api *ngxReqAPI) luaSetURIArgs(L *glua.LState) int { // 获取参数类型 argType := L.Get(1) - //nolint:exhaustive // 只处理特定类型 + //nolint:exhaustive,nolintlint switch argType.Type() { case glua.LTString: // 如果是字符串,直接解析并设置 @@ -330,7 +330,7 @@ func (api *ngxReqAPI) luaSetURIArgs(L *glua.LState) int { table.ForEach(func(key, value glua.LValue) { keyStr := glua.LVAsString(key) - //nolint:exhaustive // 只处理特定类型 + //nolint:exhaustive,nolintlint switch value.Type() { case glua.LTString: // 类型断言检查 diff --git a/internal/proxy/proxy.go b/internal/proxy/proxy.go index cd1c10d..292173e 100644 --- a/internal/proxy/proxy.go +++ b/internal/proxy/proxy.go @@ -57,7 +57,7 @@ import ( // proxyDebugLog 在 DEBUG 级别记录代理日志 // 调用者必须先检查 logging.Debug().Enabled() 以避免不必要的内存分配 -func proxyDebugLog(msg string, kv ...interface{}) { +func proxyDebugLog(msg string, kv ...any) { event := logging.Debug() for i := 0; i < len(kv)-1; i += 2 { key, ok := kv[i].(string) diff --git a/internal/server/server.go b/internal/server/server.go index 8e0364b..d95fd08 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -46,6 +46,8 @@ import ( "rua.plus/lolly/internal/version" ) +const networkTCP = "tcp" + // Server HTTP 服务器,封装 fasthttp.Server 并提供中间件链和生命周期管理。 // // 该结构体是服务器的核心实体,负责: @@ -358,7 +360,7 @@ func (s *Server) createListener(cfg *config.ServerConfig) (net.Listener, error) return listener, nil } - return net.Listen("tcp", listenAddr) + return net.Listen(networkTCP, listenAddr) } func (s *Server) matchInheritedListener(inherited []net.Listener, listenAddr string) net.Listener { @@ -383,7 +385,7 @@ func (s *Server) matchInheritedListener(inherited []net.Listener, listenAddr str if ln == nil { continue } - if ln.Addr().Network() != "tcp" { + if ln.Addr().Network() != networkTCP { continue } if s.tcpAddrMatch(ln.Addr().String(), listenAddr) { @@ -426,14 +428,14 @@ func DupListener(ln net.Listener) (net.Listener, error) { if err != nil { return nil, fmt.Errorf("dup tcp listener: %w", err) } - defer file.Close() + defer func() { _ = file.Close() }() return net.FileListener(file) case *net.UnixListener: file, err := l.File() if err != nil { return nil, fmt.Errorf("dup unix listener: %w", err) } - defer file.Close() + defer func() { _ = file.Close() }() return net.FileListener(file) default: return nil, fmt.Errorf("unsupported listener type: %T", ln) @@ -467,8 +469,8 @@ func (s *Server) startSingleMode() error { if err != nil { logging.Error().Msg("Failed to create status handler: " + err.Error()) } else { - if err := s.locationEngine.AddExact(statusHandler.Path(), statusHandler.ServeHTTP, false); err != nil { - if err := s.handleRegistrationError("status", statusHandler.Path(), err); err != nil { + if regErr := s.locationEngine.AddExact(statusHandler.Path(), statusHandler.ServeHTTP, false); regErr != nil { + if err := s.handleRegistrationError("status", statusHandler.Path(), regErr); err != nil { return err } } @@ -480,13 +482,13 @@ func (s *Server) startSingleMode() error { if err != nil { logging.Error().Msg("Failed to create pprof handler: " + err.Error()) } else { - if err := s.locationEngine.AddExact(pprofHandler.Path(), pprofHandler.ServeHTTP, false); err != nil { - if err := s.handleRegistrationError("pprof", pprofHandler.Path(), err); err != nil { + if regErr := s.locationEngine.AddExact(pprofHandler.Path(), pprofHandler.ServeHTTP, false); regErr != nil { + if err := s.handleRegistrationError("pprof", pprofHandler.Path(), regErr); err != nil { return err } } - if err := s.locationEngine.AddPrefixPriority(pprofHandler.Path()+"/", pprofHandler.ServeHTTP, false); err != nil { - if err := s.handleRegistrationError("pprof", pprofHandler.Path()+"/", err); err != nil { + if regErr := s.locationEngine.AddPrefixPriority(pprofHandler.Path()+"/", pprofHandler.ServeHTTP, false); regErr != nil { + if err := s.handleRegistrationError("pprof", pprofHandler.Path()+"/", regErr); err != nil { return err } } @@ -498,8 +500,8 @@ func (s *Server) startSingleMode() error { if err != nil { logging.Error().Msg("Failed to create cache purge handler: " + err.Error()) } else { - if err := s.locationEngine.AddExact(purgeHandler.Path(), purgeHandler.ServeHTTP, false); err != nil { - if err := s.handleRegistrationError("cache-purge", purgeHandler.Path(), err); err != nil { + if regErr := s.locationEngine.AddExact(purgeHandler.Path(), purgeHandler.ServeHTTP, false); regErr != nil { + if err := s.handleRegistrationError("cache-purge", purgeHandler.Path(), regErr); err != nil { return err } } @@ -650,7 +652,7 @@ func (s *Server) startMultiServerMode() error { serverCfg := &s.config.Servers[i] ln, err := s.createListener(serverCfg) if err != nil { - for j := 0; j < i; j++ { + for j := range i { if s.listeners[j] != nil { _ = s.listeners[j].Close() } diff --git a/internal/stream/stream.go b/internal/stream/stream.go index 78659d6..4bb906f 100644 --- a/internal/stream/stream.go +++ b/internal/stream/stream.go @@ -136,7 +136,7 @@ func (l *leastConn) Select(targets []*Target) *Target { if !t.healthy.Load() { continue } - conns := atomic.LoadInt64(&t.conns) + conns := t.conns.Load() if selected == nil || conns < minConns { selected = t minConns = conns @@ -293,7 +293,7 @@ type Server struct { // upstreams 上游配置映射 upstreams map[string]*Upstream // connCount 当前连接数 - connCount int64 + connCount atomic.Int64 // mu 读写锁,保护并发访问 mu sync.RWMutex // running 运行状态标志 @@ -329,7 +329,7 @@ type Target struct { // healthy 健康状态 healthy atomic.Bool // conns 当前连接数 - conns int64 + conns atomic.Int64 } // HealthChecker Stream 健康检查器。 @@ -528,7 +528,7 @@ func (s *Server) acceptLoop(addr string, listener net.Listener) { continue } - atomic.AddInt64(&s.connCount, 1) + s.connCount.Add(1) go s.handleConnection(conn, addr) } } @@ -547,7 +547,7 @@ func (s *Server) acceptLoop(addr string, listener net.Listener) { func (s *Server) handleConnection(clientConn net.Conn, _ string) { defer func() { _ = clientConn.Close() - atomic.AddInt64(&s.connCount, -1) + s.connCount.Add(-1) }() s.mu.RLock() @@ -569,8 +569,8 @@ func (s *Server) handleConnection(clientConn net.Conn, _ string) { return // 无可用目标 } - atomic.AddInt64(&target.conns, 1) - defer func() { atomic.AddInt64(&target.conns, -1) }() + target.conns.Add(1) + defer func() { target.conns.Add(-1) }() // 连接目标 targetConn, err := net.DialTimeout("tcp", target.addr, 10*time.Second) @@ -808,7 +808,7 @@ func (s *udpServer) getOrCreateSession(clientAddr *net.UDPAddr) (*udpSession, er return nil, err } - atomic.AddInt64(&target.conns, 1) + target.conns.Add(1) // 创建新会话 session = &udpSession{ @@ -855,7 +855,7 @@ func (sess *udpSession) close() { _ = sess.targetConn.Close() } if sess.target != nil { - atomic.AddInt64(&sess.target.conns, -1) + sess.target.conns.Add(-1) } }) } diff --git a/internal/stream/stream_bench_test.go b/internal/stream/stream_bench_test.go index e5b39a8..0fe44b6 100644 --- a/internal/stream/stream_bench_test.go +++ b/internal/stream/stream_bench_test.go @@ -260,8 +260,8 @@ func BenchmarkStreamBalancerSelect(b *testing.B) { targets[i] = &Target{ addr: fmt.Sprintf("backend%d:8080", i), weight: i + 1, - conns: int64(i * 10), // 模拟不同连接数 } + targets[i].conns.Store(int64(i * 10)) targets[i].healthy.Store(true) } @@ -353,8 +353,8 @@ func BenchmarkStreamLeastConnWithVaryingConns(b *testing.B) { targets[i] = &Target{ addr: fmt.Sprintf("backend%d:8080", i), weight: 1, - conns: conns, } + targets[i].conns.Store(conns) targets[i].healthy.Store(true) } diff --git a/internal/stream/stream_test.go b/internal/stream/stream_test.go index 387e63b..b00a83a 100644 --- a/internal/stream/stream_test.go +++ b/internal/stream/stream_test.go @@ -13,7 +13,6 @@ package stream import ( "net" "sync" - "sync/atomic" "testing" "time" ) @@ -96,10 +95,13 @@ func TestRoundRobinBalancer(t *testing.T) { func TestLeastConnBalancer(t *testing.T) { targets := []*Target{ - {addr: "localhost:8001", conns: 5}, - {addr: "localhost:8002", conns: 2}, - {addr: "localhost:8003", conns: 8}, + {addr: "localhost:8001"}, + {addr: "localhost:8002"}, + {addr: "localhost:8003"}, } + targets[0].conns.Store(5) + targets[1].conns.Store(2) + targets[2].conns.Store(8) for _, t := range targets { t.healthy.Store(true) } @@ -249,12 +251,12 @@ func TestConcurrentConnections(t *testing.T) { var wg sync.WaitGroup for range 100 { wg.Go(func() { - atomic.AddInt64(&s.connCount, 1) + s.connCount.Add(1) }) } wg.Wait() - if s.connCount != 100 { + if s.connCount.Load() != 100 { t.Errorf("Expected 100 connections, got %d", s.connCount) } } @@ -335,10 +337,13 @@ func TestRoundRobinBalancerWithSingleTarget(t *testing.T) { func TestLeastConnBalancerWithTie(t *testing.T) { lc := newLeastConn() targets := []*Target{ - {addr: "backend1:8080", conns: 5}, - {addr: "backend2:8080", conns: 5}, - {addr: "backend3:8080", conns: 5}, + {addr: "backend1:8080"}, + {addr: "backend2:8080"}, + {addr: "backend3:8080"}, } + targets[0].conns.Store(5) + targets[1].conns.Store(5) + targets[2].conns.Store(5) for _, t := range targets { t.healthy.Store(true) }