fix: add synchronization for concurrent access in server/app/http3/stream

This commit is contained in:
xfy 2026-06-05 12:31:41 +08:00
parent 5e3196c37e
commit 4789265ca8
12 changed files with 157 additions and 88 deletions

View File

@ -45,6 +45,8 @@ func (a *App) Run() int {
_ = a.upgradeMgr.WritePid()
}
a.signalReady()
sigChan := make(chan os.Signal, 1)
a.setupSignalHandlers(sigChan)

View File

@ -9,6 +9,7 @@ import (
"fmt"
"net"
"os"
"sync"
"time"
"rua.plus/lolly/internal/config"
@ -35,15 +36,28 @@ type App struct {
pidFile string
logFile string
listeners []net.Listener
ready chan struct{}
readyOnce sync.Once
}
// NewApp creates a new App instance with the given config path.
func NewApp(cfgPath string) *App {
return &App{
cfgPath: cfgPath,
ready: make(chan struct{}),
}
}
// WaitReady blocks until the app has completed initialization.
func (a *App) WaitReady() {
<-a.ready
}
// signalReady closes the ready channel to unblock callers of WaitReady.
func (a *App) signalReady() {
a.readyOnce.Do(func() { close(a.ready) })
}
// SetPidFile sets the path to the PID file for the app.
func (a *App) SetPidFile(path string) {
a.pidFile = path

View File

@ -473,7 +473,9 @@ logging:
}()
// 等待一小段时间让服务器启动
time.Sleep(100 * time.Millisecond)
app.WaitReady()
waitForAppServerRunning(app, 2*time.Second)
// 验证配置已加载
if app.cfg == nil {
@ -528,7 +530,9 @@ logging:
done <- app.Run()
}()
time.Sleep(100 * time.Millisecond)
app.WaitReady()
waitForAppServerRunning(app, 2*time.Second)
// 验证配置已加载且包含变量
if app.cfg == nil {
@ -570,7 +574,9 @@ logging:
done <- app.Run()
}()
time.Sleep(100 * time.Millisecond)
app.WaitReady()
waitForAppServerRunning(app, 2*time.Second)
// 验证 DNS 解析器已创建
if app.resv == nil {
@ -615,7 +621,9 @@ logging:
done <- app.Run()
}()
time.Sleep(100 * time.Millisecond)
app.WaitReady()
waitForAppServerRunning(app, 2*time.Second)
// 验证多服务器配置
if app.cfg == nil {
@ -658,7 +666,9 @@ logging:
done <- app.Run()
}()
time.Sleep(100 * time.Millisecond)
app.WaitReady()
waitForAppServerRunning(app, 2*time.Second)
// 验证 PID 文件已创建
if _, err := os.Stat(pidPath); os.IsNotExist(err) {
@ -703,7 +713,9 @@ logging:
done <- app.Run()
}()
time.Sleep(150 * time.Millisecond)
app.WaitReady()
waitForAppServerRunning(app, 2*time.Second)
// 验证 Stream 服务器已创建
if app.streamSrv == nil {
@ -743,7 +755,9 @@ logging:
done <- app.Run()
}()
time.Sleep(100 * time.Millisecond)
app.WaitReady()
waitForAppServerRunning(app, 2*time.Second)
// 验证升级管理器已创建
if app.upgradeMgr == nil {
@ -783,7 +797,9 @@ logging:
done <- app.Run()
}()
time.Sleep(100 * time.Millisecond)
app.WaitReady()
waitForAppServerRunning(app, 2*time.Second)
// 验证日志文件路径已设置
if app.logFile != logPath {
@ -823,7 +839,9 @@ logging:
done <- app.Run()
}()
time.Sleep(100 * time.Millisecond)
app.WaitReady()
waitForAppServerRunning(app, 2*time.Second)
// 验证配置已加载
if app.cfg == nil {
@ -863,7 +881,9 @@ logging:
done <- app.Run()
}()
time.Sleep(100 * time.Millisecond)
app.WaitReady()
waitForAppServerRunning(app, 2*time.Second)
// 验证配置已加载
if app.cfg == nil {
@ -871,10 +891,16 @@ logging:
}
// 验证超时值
if app.cfg.Shutdown.GracefulTimeout != 10*time.Second {
t.Errorf("GracefulTimeout = %v, want 10s", app.cfg.Shutdown.GracefulTimeout)
}
// 停止服务器
app.srv.StopWithTimeout(1 * time.Second)
}
func waitForAppServerRunning(app *App, timeout time.Duration) bool {
deadline := time.Now().Add(timeout)
for time.Now().Before(deadline) {
if app.srv != nil && app.srv.Running() {
return true
}
time.Sleep(time.Millisecond)
}
return false
}

View File

@ -18,6 +18,7 @@ import (
"fmt"
"net"
"sync"
"sync/atomic"
"time"
"github.com/quic-go/quic-go"
@ -54,7 +55,7 @@ type Server struct {
listener *quic.EarlyListener
// running 服务器运行状态
running bool
running atomic.Bool
// mu 读写锁
mu sync.RWMutex
@ -103,7 +104,7 @@ func (s *Server) Start() error {
s.mu.Lock()
defer s.mu.Unlock()
if s.running {
if s.running.Load() {
return fmt.Errorf("server already running")
}
@ -160,21 +161,18 @@ func (s *Server) Start() error {
Handler: s.adapter.Wrap(s.handler),
}
s.running = true
s.running.Store(true)
logging.Info().
Str("listen", listenAddr).
Bool("0rtt", s.config.Enable0RTT).
Msg("HTTP/3 server started")
// 开始服务
http3Server := s.http3Server
listener := s.listener
go func() {
if err := s.http3Server.ServeListener(s.listener); err != nil {
s.mu.RLock()
running := s.running
s.mu.RUnlock()
if running {
if err := http3Server.ServeListener(listener); err != nil {
if s.running.Load() {
logging.Error().Err(err).Msg("HTTP/3 server error")
}
}
@ -193,11 +191,11 @@ func (s *Server) Stop() error {
s.mu.Lock()
defer s.mu.Unlock()
if !s.running {
if !s.running.Load() {
return nil
}
s.running = false
s.running.Store(false)
if s.http3Server != nil {
if err := s.http3Server.Close(); err != nil {

View File

@ -159,7 +159,7 @@ func TestNewServer_Success(t *testing.T) {
t.Error("TLS config not set correctly")
}
if server.running {
if server.running.Load() {
t.Error("Server should not be running initially")
}
}
@ -254,7 +254,7 @@ func TestNewServer_TableDriven(t *testing.T) {
assert.NotNil(t, server.handler)
assert.NotNil(t, server.adapter)
assert.Equal(t, tt.tlsConfig, server.tlsConfig)
assert.False(t, server.running)
assert.False(t, server.running.Load())
assert.Nil(t, server.listener)
assert.Nil(t, server.http3Server)
}
@ -281,7 +281,7 @@ func TestNewServer_VerifyInternalFields(t *testing.T) {
assert.Equal(t, cfg, server.config)
assert.Equal(t, tlsConfig, server.tlsConfig)
assert.NotNil(t, server.adapter)
assert.False(t, server.running)
assert.False(t, server.running.Load())
assert.Nil(t, server.listener)
assert.Nil(t, server.http3Server)
}
@ -364,7 +364,7 @@ func TestStart_Success(t *testing.T) {
require.NoError(t, server.Start())
t.Cleanup(func() { _ = server.Stop() })
assert.True(t, server.running)
assert.True(t, server.running.Load())
assert.NotNil(t, server.listener)
assert.NotNil(t, server.http3Server)
}
@ -436,7 +436,7 @@ func TestStart_QUICConfigDefaults(t *testing.T) {
require.NoError(t, server.Start())
t.Cleanup(func() { _ = server.Stop() })
assert.True(t, server.running)
assert.True(t, server.running.Load())
assert.NotNil(t, server.listener)
})
}
@ -456,17 +456,17 @@ func TestStart_MultipleStartsAndStops(t *testing.T) {
// 第一次启动
require.NoError(t, server.Start())
assert.True(t, server.running)
assert.True(t, server.running.Load())
// 停止
require.NoError(t, server.Stop())
assert.False(t, server.running)
assert.False(t, server.running.Load())
// 第二次启动(使用新端口,因为旧端口可能还在释放中)
server.config.Listen = ":0"
require.NoError(t, server.Start())
t.Cleanup(func() { _ = server.Stop() })
assert.True(t, server.running)
assert.True(t, server.running.Load())
}
// TestStop_NotRunning 测试停止未运行的服务器
@ -481,11 +481,11 @@ func TestStop_NotRunning(t *testing.T) {
server, err := NewServer(cfg, handler, tlsConfig)
require.NoError(t, err)
assert.False(t, server.running)
assert.False(t, server.running.Load())
err = server.Stop()
require.NoError(t, err)
assert.False(t, server.running)
assert.False(t, server.running.Load())
}
// TestStop_Running 测试停止正在运行的服务器
@ -501,10 +501,10 @@ func TestStop_Running(t *testing.T) {
require.NoError(t, err)
require.NoError(t, server.Start())
assert.True(t, server.running)
assert.True(t, server.running.Load())
require.NoError(t, server.Stop())
assert.False(t, server.running)
assert.False(t, server.running.Load())
}
// TestStop_CalledMultipleTimes 测试多次停止不会报错
@ -525,7 +525,7 @@ func TestStop_CalledMultipleTimes(t *testing.T) {
require.NoError(t, server.Stop())
require.NoError(t, server.Stop())
assert.False(t, server.running)
assert.False(t, server.running.Load())
}
// TestStartStop_Lifecycle 测试完整的生命周期
@ -543,17 +543,17 @@ func TestStartStop_Lifecycle(t *testing.T) {
server, err := NewServer(cfg, handler, tlsConfig)
require.NoError(t, err)
assert.False(t, server.running)
assert.False(t, server.running.Load())
assert.Nil(t, server.listener)
assert.Nil(t, server.http3Server)
require.NoError(t, server.Start())
assert.True(t, server.running)
assert.True(t, server.running.Load())
assert.NotNil(t, server.listener)
assert.NotNil(t, server.http3Server)
require.NoError(t, server.Stop())
assert.False(t, server.running)
assert.False(t, server.running.Load())
}

View File

@ -57,13 +57,15 @@ func (s *Server) createProxyForConfig(proxyCfg *config.ProxyConfig) *proxy.Proxy
if proxyCfg.HealthCheck.Interval > 0 {
hc := proxy.NewHealthChecker(targets, &proxyCfg.HealthCheck)
hc.Start()
s.proxiesMu.Lock()
s.healthCheckers = append(s.healthCheckers, hc)
// 设置被动健康检查
s.proxiesMu.Unlock()
p.SetHealthChecker(hc)
}
// 保存代理实例用于缓存统计
s.proxiesMu.Lock()
s.proxies = append(s.proxies, p)
s.proxiesMu.Unlock()
return p
}

View File

@ -76,6 +76,7 @@ type Server struct {
fastServer *fasthttp.Server
fastServers []*fasthttp.Server // 多监听器模式使用
proxies []*proxy.Proxy
proxiesMu sync.Mutex
listeners []net.Listener
healthCheckers []*proxy.HealthChecker
locationEngine *matcher.LocationEngine
@ -105,6 +106,11 @@ func New(cfg *config.Config) *Server {
return s
}
// Running reports whether the server is currently running.
func (s *Server) Running() bool {
return s.running.Load()
}
func (s *Server) handleRegistrationError(source, path string, err error) error {
var ce *matcher.ConflictError
if errors.As(err, &ce) {

View File

@ -275,7 +275,7 @@ func TestStartMultiServerMode_Integration_Basic(t *testing.T) {
}
}()
time.Sleep(50 * time.Millisecond)
waitForServerRunning(s, 2*time.Second)
_ = s.GracefulStop(2 * time.Second)
select {
@ -330,7 +330,7 @@ func TestStartMultiServerMode_Integration_WithProxy(t *testing.T) {
}
}()
time.Sleep(50 * time.Millisecond)
waitForServerRunning(s, 2*time.Second)
_ = s.GracefulStop(2 * time.Second)
select {
@ -385,7 +385,7 @@ func TestStartMultiServerMode_Integration_WithStaticFiles(t *testing.T) {
}
}()
time.Sleep(50 * time.Millisecond)
waitForServerRunning(s, 2*time.Second)
_ = s.GracefulStop(2 * time.Second)
select {
@ -428,7 +428,7 @@ func TestStartMultiServerMode_Integration_WithCacheAPI(t *testing.T) {
}
}()
time.Sleep(50 * time.Millisecond)
waitForServerRunning(s, 2*time.Second)
_ = s.GracefulStop(2 * time.Second)
select {
@ -480,7 +480,7 @@ func TestStartMultiServerMode_Integration_WithHealthCheck(t *testing.T) {
}
}()
time.Sleep(100 * time.Millisecond)
waitForServerRunning(s, 2*time.Second)
_ = s.GracefulStop(2 * time.Second)
select {
@ -533,7 +533,7 @@ func TestStartMultiServerMode_Integration_WithMiddleware(t *testing.T) {
}
}()
time.Sleep(50 * time.Millisecond)
waitForServerRunning(s, 2*time.Second)
_ = s.GracefulStop(2 * time.Second)
select {
@ -580,7 +580,7 @@ func TestStartMultiServerMode_Integration_WithPerformance(t *testing.T) {
}
}()
time.Sleep(50 * time.Millisecond)
waitForServerRunning(s, 2*time.Second)
_ = s.GracefulStop(2 * time.Second)
select {
@ -616,7 +616,7 @@ func TestStartMultiServerMode_Integration_ThreeServers(t *testing.T) {
}
}()
time.Sleep(50 * time.Millisecond)
waitForServerRunning(s, 2*time.Second)
_ = s.GracefulStop(2 * time.Second)
select {
@ -663,7 +663,7 @@ func TestStartMultiServerMode_Integration_WithCompression(t *testing.T) {
}
}()
time.Sleep(50 * time.Millisecond)
waitForServerRunning(s, 2*time.Second)
_ = s.GracefulStop(2 * time.Second)
select {
@ -705,7 +705,7 @@ func TestStartMultiServerMode_Integration_WithRewrite(t *testing.T) {
}
}()
time.Sleep(50 * time.Millisecond)
waitForServerRunning(s, 2*time.Second)
_ = s.GracefulStop(2 * time.Second)
select {
@ -749,7 +749,7 @@ func TestStartMultiServerMode_Integration_WithConnLimiter(t *testing.T) {
}
}()
time.Sleep(50 * time.Millisecond)
waitForServerRunning(s, 2*time.Second)
_ = s.GracefulStop(2 * time.Second)
select {
@ -821,7 +821,7 @@ func TestStartMultiServerMode_Integration_MixedConfigs(t *testing.T) {
}
}()
time.Sleep(50 * time.Millisecond)
waitForServerRunning(s, 2*time.Second)
_ = s.GracefulStop(2 * time.Second)
select {
@ -862,7 +862,7 @@ func TestStartMultiServerMode_GracefulUpgradeFallback(t *testing.T) {
}
}()
time.Sleep(50 * time.Millisecond)
waitForServerRunning(s, 2*time.Second)
_ = s.GracefulStop(2 * time.Second)
select {
@ -957,7 +957,7 @@ func TestStartMultiServerMode_Integration_WithErrorPage(t *testing.T) {
}
}()
time.Sleep(50 * time.Millisecond)
waitForServerRunning(s, 2*time.Second)
_ = s.GracefulStop(2 * time.Second)
select {
@ -1002,7 +1002,7 @@ func TestStartMultiServerMode_Integration_WithMIMETypes(t *testing.T) {
}
}()
time.Sleep(50 * time.Millisecond)
waitForServerRunning(s, 2*time.Second)
_ = s.GracefulStop(2 * time.Second)
select {
@ -1123,7 +1123,7 @@ func TestStartMultiServerMode_Integration_WithAuthRequest(t *testing.T) {
}
}()
time.Sleep(50 * time.Millisecond)
waitForServerRunning(s, 2*time.Second)
_ = s.GracefulStop(2 * time.Second)
select {
@ -1192,7 +1192,7 @@ func TestStartMultiServerMode_Integration_WithDefaultServer(t *testing.T) {
}
}()
time.Sleep(50 * time.Millisecond)
waitForServerRunning(s, 2*time.Second)
_ = s.GracefulStop(2 * time.Second)
select {

View File

@ -52,9 +52,7 @@ func TestStartSingleMode_Integration_WithStaticFiles(t *testing.T) {
}()
// 等待服务器启动
time.Sleep(50 * time.Millisecond)
// 停止服务器
waitForServerRunning(s, 2*time.Second)
_ = s.GracefulStop(2 * time.Second)
select {
@ -97,7 +95,7 @@ func TestStartSingleMode_Integration_WithProxy(t *testing.T) {
}
}()
time.Sleep(50 * time.Millisecond)
waitForServerRunning(s, 2*time.Second)
_ = s.GracefulStop(2 * time.Second)
select {
@ -141,7 +139,7 @@ func TestStartSingleMode_Integration_WithMonitoring(t *testing.T) {
}
}()
time.Sleep(50 * time.Millisecond)
waitForServerRunning(s, 2*time.Second)
_ = s.GracefulStop(2 * time.Second)
select {
@ -178,7 +176,7 @@ func TestStartSingleMode_Integration_WithCacheAPI(t *testing.T) {
}
}()
time.Sleep(50 * time.Millisecond)
waitForServerRunning(s, 2*time.Second)
_ = s.GracefulStop(2 * time.Second)
select {
@ -215,7 +213,7 @@ func TestStartSingleMode_Integration_WithCompression(t *testing.T) {
}
}()
time.Sleep(50 * time.Millisecond)
waitForServerRunning(s, 2*time.Second)
_ = s.GracefulStop(2 * time.Second)
select {
@ -260,7 +258,7 @@ func TestStartSingleMode_Integration_WithSecurity(t *testing.T) {
}
}()
time.Sleep(50 * time.Millisecond)
waitForServerRunning(s, 2*time.Second)
_ = s.GracefulStop(2 * time.Second)
select {
@ -296,7 +294,7 @@ func TestStartSingleMode_Integration_WithRewrite(t *testing.T) {
}
}()
time.Sleep(50 * time.Millisecond)
waitForServerRunning(s, 2*time.Second)
_ = s.GracefulStop(2 * time.Second)
select {
@ -341,7 +339,7 @@ func TestStartSingleMode_Integration_WithPerformance(t *testing.T) {
}
}()
time.Sleep(50 * time.Millisecond)
waitForServerRunning(s, 2*time.Second)
_ = s.GracefulStop(2 * time.Second)
select {
@ -391,7 +389,7 @@ func TestStartSingleMode_Integration_WithProxyLocationTypes(t *testing.T) {
}
}()
time.Sleep(50 * time.Millisecond)
waitForServerRunning(s, 2*time.Second)
_ = s.GracefulStop(2 * time.Second)
select {
@ -438,7 +436,7 @@ func TestStartSingleMode_Integration_WithStaticLocationTypes(t *testing.T) {
}
}()
time.Sleep(50 * time.Millisecond)
waitForServerRunning(s, 2*time.Second)
_ = s.GracefulStop(2 * time.Second)
select {
@ -484,7 +482,7 @@ func TestStartSingleMode_Integration_WithHealthCheck(t *testing.T) {
}
}()
time.Sleep(100 * time.Millisecond) // 给健康检查一些时间启动
waitForServerRunning(s, 2*time.Second)
_ = s.GracefulStop(2 * time.Second)
select {
@ -523,7 +521,7 @@ func TestStartSingleMode_Integration_WithMIMETypes(t *testing.T) {
}
}()
time.Sleep(50 * time.Millisecond)
waitForServerRunning(s, 2*time.Second)
_ = s.GracefulStop(2 * time.Second)
select {
@ -568,7 +566,7 @@ func TestStartSingleMode_Integration_WithErrorPage(t *testing.T) {
}
}()
time.Sleep(50 * time.Millisecond)
waitForServerRunning(s, 2*time.Second)
_ = s.GracefulStop(2 * time.Second)
select {
@ -606,7 +604,7 @@ func TestStartSingleMode_Integration_WithConnLimiter(t *testing.T) {
}
}()
time.Sleep(50 * time.Millisecond)
waitForServerRunning(s, 2*time.Second)
_ = s.GracefulStop(2 * time.Second)
select {
@ -646,7 +644,7 @@ func TestStartSingleMode_Integration_WithAuthRequest(t *testing.T) {
}
}()
time.Sleep(50 * time.Millisecond)
waitForServerRunning(s, 2*time.Second)
_ = s.GracefulStop(2 * time.Second)
select {

View File

@ -0,0 +1,16 @@
package server
import (
"time"
)
func waitForServerRunning(s *Server, timeout time.Duration) bool {
deadline := time.Now().Add(timeout)
for time.Now().Before(deadline) {
if s.running.Load() {
return true
}
time.Sleep(time.Millisecond)
}
return false
}

View File

@ -1782,8 +1782,7 @@ func TestStartVHostMode_ActualExecution(t *testing.T) {
errCh <- s.Start()
}()
// 等待一小段时间让服务器启动
time.Sleep(50 * time.Millisecond)
waitForServerRunning(s, 2*time.Second)
// 停止服务器
_ = s.GracefulStop(1 * time.Second)
@ -2146,7 +2145,7 @@ func TestStartVHostMode_ActualServerStart(t *testing.T) {
errCh <- s.Start()
}()
time.Sleep(50 * time.Millisecond)
waitForServerRunning(s, 2*time.Second)
_ = s.GracefulStop(1 * time.Second)
select {
@ -2183,7 +2182,7 @@ func TestStartVHostMode_ActualServerStart(t *testing.T) {
errCh <- s.Start()
}()
time.Sleep(50 * time.Millisecond)
waitForServerRunning(s, 2*time.Second)
_ = s.GracefulStop(1 * time.Second)
select {
@ -2222,7 +2221,7 @@ func TestStartVHostMode_ActualServerStart(t *testing.T) {
errCh <- s.Start()
}()
time.Sleep(50 * time.Millisecond)
waitForServerRunning(s, 2*time.Second)
_ = s.GracefulStop(1 * time.Second)
select {
@ -2255,7 +2254,7 @@ func TestStartVHostMode_ActualServerStart(t *testing.T) {
errCh <- s.Start()
}()
time.Sleep(50 * time.Millisecond)
waitForServerRunning(s, 2*time.Second)
_ = s.GracefulStop(1 * time.Second)
select {
@ -2292,7 +2291,7 @@ func TestStartVHostMode_ActualServerStart(t *testing.T) {
errCh <- s.Start()
}()
time.Sleep(50 * time.Millisecond)
waitForServerRunning(s, 2*time.Second)
_ = s.GracefulStop(1 * time.Second)
select {

View File

@ -573,8 +573,16 @@ func TestServe_ReceivesAndForwards(t *testing.T) {
upstream.targets[0].healthy.Store(true)
srv := newUDPServer(proxyConn, upstream, 10*time.Second)
go srv.serve()
go srv.startCleanupTicker()
srv.wg.Add(1)
go func() {
defer srv.wg.Done()
srv.serve()
}()
srv.wg.Add(1)
go func() {
defer srv.wg.Done()
srv.startCleanupTicker()
}()
clientConn, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 0})
require.NoError(t, err)