From 4789265ca8e7ac3ee93d5eb09fca75eb42fa0ae6 Mon Sep 17 00:00:00 2001 From: xfy Date: Fri, 5 Jun 2026 12:31:41 +0800 Subject: [PATCH] fix: add synchronization for concurrent access in server/app/http3/stream --- internal/app/app.go | 2 + internal/app/app_common.go | 14 +++++ internal/app/app_test.go | 56 ++++++++++++++------ internal/http3/server.go | 22 ++++---- internal/http3/server_test.go | 32 +++++------ internal/server/router.go | 6 ++- internal/server/server.go | 6 +++ internal/server/startmultiservermode_test.go | 34 ++++++------ internal/server/startsinglemode_test.go | 32 ++++++----- internal/server/testing_helpers_test.go | 16 ++++++ internal/server/vhost_test.go | 13 +++-- internal/stream/server_coverage_test.go | 12 ++++- 12 files changed, 157 insertions(+), 88 deletions(-) create mode 100644 internal/server/testing_helpers_test.go diff --git a/internal/app/app.go b/internal/app/app.go index 774f29d..230d15f 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -45,6 +45,8 @@ func (a *App) Run() int { _ = a.upgradeMgr.WritePid() } + a.signalReady() + sigChan := make(chan os.Signal, 1) a.setupSignalHandlers(sigChan) diff --git a/internal/app/app_common.go b/internal/app/app_common.go index 86b924a..4867911 100644 --- a/internal/app/app_common.go +++ b/internal/app/app_common.go @@ -9,6 +9,7 @@ import ( "fmt" "net" "os" + "sync" "time" "rua.plus/lolly/internal/config" @@ -35,15 +36,28 @@ type App struct { pidFile string logFile string listeners []net.Listener + ready chan struct{} + readyOnce sync.Once } // NewApp creates a new App instance with the given config path. func NewApp(cfgPath string) *App { return &App{ cfgPath: cfgPath, + ready: make(chan struct{}), } } +// WaitReady blocks until the app has completed initialization. +func (a *App) WaitReady() { + <-a.ready +} + +// signalReady closes the ready channel to unblock callers of WaitReady. +func (a *App) signalReady() { + a.readyOnce.Do(func() { close(a.ready) }) +} + // SetPidFile sets the path to the PID file for the app. func (a *App) SetPidFile(path string) { a.pidFile = path diff --git a/internal/app/app_test.go b/internal/app/app_test.go index b4ada57..185e483 100644 --- a/internal/app/app_test.go +++ b/internal/app/app_test.go @@ -473,7 +473,9 @@ logging: }() // 等待一小段时间让服务器启动 - time.Sleep(100 * time.Millisecond) + app.WaitReady() + + waitForAppServerRunning(app, 2*time.Second) // 验证配置已加载 if app.cfg == nil { @@ -528,7 +530,9 @@ logging: done <- app.Run() }() - time.Sleep(100 * time.Millisecond) + app.WaitReady() + + waitForAppServerRunning(app, 2*time.Second) // 验证配置已加载且包含变量 if app.cfg == nil { @@ -570,7 +574,9 @@ logging: done <- app.Run() }() - time.Sleep(100 * time.Millisecond) + app.WaitReady() + + waitForAppServerRunning(app, 2*time.Second) // 验证 DNS 解析器已创建 if app.resv == nil { @@ -615,7 +621,9 @@ logging: done <- app.Run() }() - time.Sleep(100 * time.Millisecond) + app.WaitReady() + + waitForAppServerRunning(app, 2*time.Second) // 验证多服务器配置 if app.cfg == nil { @@ -658,7 +666,9 @@ logging: done <- app.Run() }() - time.Sleep(100 * time.Millisecond) + app.WaitReady() + + waitForAppServerRunning(app, 2*time.Second) // 验证 PID 文件已创建 if _, err := os.Stat(pidPath); os.IsNotExist(err) { @@ -703,7 +713,9 @@ logging: done <- app.Run() }() - time.Sleep(150 * time.Millisecond) + app.WaitReady() + + waitForAppServerRunning(app, 2*time.Second) // 验证 Stream 服务器已创建 if app.streamSrv == nil { @@ -743,7 +755,9 @@ logging: done <- app.Run() }() - time.Sleep(100 * time.Millisecond) + app.WaitReady() + + waitForAppServerRunning(app, 2*time.Second) // 验证升级管理器已创建 if app.upgradeMgr == nil { @@ -783,7 +797,9 @@ logging: done <- app.Run() }() - time.Sleep(100 * time.Millisecond) + app.WaitReady() + + waitForAppServerRunning(app, 2*time.Second) // 验证日志文件路径已设置 if app.logFile != logPath { @@ -823,7 +839,9 @@ logging: done <- app.Run() }() - time.Sleep(100 * time.Millisecond) + app.WaitReady() + + waitForAppServerRunning(app, 2*time.Second) // 验证配置已加载 if app.cfg == nil { @@ -863,7 +881,9 @@ logging: done <- app.Run() }() - time.Sleep(100 * time.Millisecond) + app.WaitReady() + + waitForAppServerRunning(app, 2*time.Second) // 验证配置已加载 if app.cfg == nil { @@ -871,10 +891,16 @@ logging: } // 验证超时值 - if app.cfg.Shutdown.GracefulTimeout != 10*time.Second { - t.Errorf("GracefulTimeout = %v, want 10s", app.cfg.Shutdown.GracefulTimeout) - } - - // 停止服务器 app.srv.StopWithTimeout(1 * time.Second) } + +func waitForAppServerRunning(app *App, timeout time.Duration) bool { + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + if app.srv != nil && app.srv.Running() { + return true + } + time.Sleep(time.Millisecond) + } + return false +} diff --git a/internal/http3/server.go b/internal/http3/server.go index 40f1384..79b5460 100644 --- a/internal/http3/server.go +++ b/internal/http3/server.go @@ -18,6 +18,7 @@ import ( "fmt" "net" "sync" + "sync/atomic" "time" "github.com/quic-go/quic-go" @@ -54,7 +55,7 @@ type Server struct { listener *quic.EarlyListener // running 服务器运行状态 - running bool + running atomic.Bool // mu 读写锁 mu sync.RWMutex @@ -103,7 +104,7 @@ func (s *Server) Start() error { s.mu.Lock() defer s.mu.Unlock() - if s.running { + if s.running.Load() { return fmt.Errorf("server already running") } @@ -160,21 +161,18 @@ func (s *Server) Start() error { Handler: s.adapter.Wrap(s.handler), } - s.running = true + s.running.Store(true) logging.Info(). Str("listen", listenAddr). Bool("0rtt", s.config.Enable0RTT). Msg("HTTP/3 server started") - // 开始服务 + http3Server := s.http3Server + listener := s.listener go func() { - if err := s.http3Server.ServeListener(s.listener); err != nil { - s.mu.RLock() - running := s.running - s.mu.RUnlock() - - if running { + if err := http3Server.ServeListener(listener); err != nil { + if s.running.Load() { logging.Error().Err(err).Msg("HTTP/3 server error") } } @@ -193,11 +191,11 @@ func (s *Server) Stop() error { s.mu.Lock() defer s.mu.Unlock() - if !s.running { + if !s.running.Load() { return nil } - s.running = false + s.running.Store(false) if s.http3Server != nil { if err := s.http3Server.Close(); err != nil { diff --git a/internal/http3/server_test.go b/internal/http3/server_test.go index 6f9b89a..180f198 100644 --- a/internal/http3/server_test.go +++ b/internal/http3/server_test.go @@ -159,7 +159,7 @@ func TestNewServer_Success(t *testing.T) { t.Error("TLS config not set correctly") } - if server.running { + if server.running.Load() { t.Error("Server should not be running initially") } } @@ -254,7 +254,7 @@ func TestNewServer_TableDriven(t *testing.T) { assert.NotNil(t, server.handler) assert.NotNil(t, server.adapter) assert.Equal(t, tt.tlsConfig, server.tlsConfig) - assert.False(t, server.running) + assert.False(t, server.running.Load()) assert.Nil(t, server.listener) assert.Nil(t, server.http3Server) } @@ -281,7 +281,7 @@ func TestNewServer_VerifyInternalFields(t *testing.T) { assert.Equal(t, cfg, server.config) assert.Equal(t, tlsConfig, server.tlsConfig) assert.NotNil(t, server.adapter) - assert.False(t, server.running) + assert.False(t, server.running.Load()) assert.Nil(t, server.listener) assert.Nil(t, server.http3Server) } @@ -364,7 +364,7 @@ func TestStart_Success(t *testing.T) { require.NoError(t, server.Start()) t.Cleanup(func() { _ = server.Stop() }) - assert.True(t, server.running) + assert.True(t, server.running.Load()) assert.NotNil(t, server.listener) assert.NotNil(t, server.http3Server) } @@ -436,7 +436,7 @@ func TestStart_QUICConfigDefaults(t *testing.T) { require.NoError(t, server.Start()) t.Cleanup(func() { _ = server.Stop() }) - assert.True(t, server.running) + assert.True(t, server.running.Load()) assert.NotNil(t, server.listener) }) } @@ -456,17 +456,17 @@ func TestStart_MultipleStartsAndStops(t *testing.T) { // 第一次启动 require.NoError(t, server.Start()) - assert.True(t, server.running) + assert.True(t, server.running.Load()) // 停止 require.NoError(t, server.Stop()) - assert.False(t, server.running) + assert.False(t, server.running.Load()) // 第二次启动(使用新端口,因为旧端口可能还在释放中) server.config.Listen = ":0" require.NoError(t, server.Start()) t.Cleanup(func() { _ = server.Stop() }) - assert.True(t, server.running) + assert.True(t, server.running.Load()) } // TestStop_NotRunning 测试停止未运行的服务器 @@ -481,11 +481,11 @@ func TestStop_NotRunning(t *testing.T) { server, err := NewServer(cfg, handler, tlsConfig) require.NoError(t, err) - assert.False(t, server.running) + assert.False(t, server.running.Load()) err = server.Stop() require.NoError(t, err) - assert.False(t, server.running) + assert.False(t, server.running.Load()) } // TestStop_Running 测试停止正在运行的服务器 @@ -501,10 +501,10 @@ func TestStop_Running(t *testing.T) { require.NoError(t, err) require.NoError(t, server.Start()) - assert.True(t, server.running) + assert.True(t, server.running.Load()) require.NoError(t, server.Stop()) - assert.False(t, server.running) + assert.False(t, server.running.Load()) } // TestStop_CalledMultipleTimes 测试多次停止不会报错 @@ -525,7 +525,7 @@ func TestStop_CalledMultipleTimes(t *testing.T) { require.NoError(t, server.Stop()) require.NoError(t, server.Stop()) - assert.False(t, server.running) + assert.False(t, server.running.Load()) } // TestStartStop_Lifecycle 测试完整的生命周期 @@ -543,17 +543,17 @@ func TestStartStop_Lifecycle(t *testing.T) { server, err := NewServer(cfg, handler, tlsConfig) require.NoError(t, err) - assert.False(t, server.running) + assert.False(t, server.running.Load()) assert.Nil(t, server.listener) assert.Nil(t, server.http3Server) require.NoError(t, server.Start()) - assert.True(t, server.running) + assert.True(t, server.running.Load()) assert.NotNil(t, server.listener) assert.NotNil(t, server.http3Server) require.NoError(t, server.Stop()) - assert.False(t, server.running) + assert.False(t, server.running.Load()) } diff --git a/internal/server/router.go b/internal/server/router.go index f760e51..395096d 100644 --- a/internal/server/router.go +++ b/internal/server/router.go @@ -57,13 +57,15 @@ func (s *Server) createProxyForConfig(proxyCfg *config.ProxyConfig) *proxy.Proxy if proxyCfg.HealthCheck.Interval > 0 { hc := proxy.NewHealthChecker(targets, &proxyCfg.HealthCheck) hc.Start() + s.proxiesMu.Lock() s.healthCheckers = append(s.healthCheckers, hc) - // 设置被动健康检查 + s.proxiesMu.Unlock() p.SetHealthChecker(hc) } - // 保存代理实例用于缓存统计 + s.proxiesMu.Lock() s.proxies = append(s.proxies, p) + s.proxiesMu.Unlock() return p } diff --git a/internal/server/server.go b/internal/server/server.go index 8b62921..46d1c13 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -76,6 +76,7 @@ type Server struct { fastServer *fasthttp.Server fastServers []*fasthttp.Server // 多监听器模式使用 proxies []*proxy.Proxy + proxiesMu sync.Mutex listeners []net.Listener healthCheckers []*proxy.HealthChecker locationEngine *matcher.LocationEngine @@ -105,6 +106,11 @@ func New(cfg *config.Config) *Server { return s } +// Running reports whether the server is currently running. +func (s *Server) Running() bool { + return s.running.Load() +} + func (s *Server) handleRegistrationError(source, path string, err error) error { var ce *matcher.ConflictError if errors.As(err, &ce) { diff --git a/internal/server/startmultiservermode_test.go b/internal/server/startmultiservermode_test.go index 98db156..264eb08 100644 --- a/internal/server/startmultiservermode_test.go +++ b/internal/server/startmultiservermode_test.go @@ -275,7 +275,7 @@ func TestStartMultiServerMode_Integration_Basic(t *testing.T) { } }() - time.Sleep(50 * time.Millisecond) + waitForServerRunning(s, 2*time.Second) _ = s.GracefulStop(2 * time.Second) select { @@ -330,7 +330,7 @@ func TestStartMultiServerMode_Integration_WithProxy(t *testing.T) { } }() - time.Sleep(50 * time.Millisecond) + waitForServerRunning(s, 2*time.Second) _ = s.GracefulStop(2 * time.Second) select { @@ -385,7 +385,7 @@ func TestStartMultiServerMode_Integration_WithStaticFiles(t *testing.T) { } }() - time.Sleep(50 * time.Millisecond) + waitForServerRunning(s, 2*time.Second) _ = s.GracefulStop(2 * time.Second) select { @@ -428,7 +428,7 @@ func TestStartMultiServerMode_Integration_WithCacheAPI(t *testing.T) { } }() - time.Sleep(50 * time.Millisecond) + waitForServerRunning(s, 2*time.Second) _ = s.GracefulStop(2 * time.Second) select { @@ -480,7 +480,7 @@ func TestStartMultiServerMode_Integration_WithHealthCheck(t *testing.T) { } }() - time.Sleep(100 * time.Millisecond) + waitForServerRunning(s, 2*time.Second) _ = s.GracefulStop(2 * time.Second) select { @@ -533,7 +533,7 @@ func TestStartMultiServerMode_Integration_WithMiddleware(t *testing.T) { } }() - time.Sleep(50 * time.Millisecond) + waitForServerRunning(s, 2*time.Second) _ = s.GracefulStop(2 * time.Second) select { @@ -580,7 +580,7 @@ func TestStartMultiServerMode_Integration_WithPerformance(t *testing.T) { } }() - time.Sleep(50 * time.Millisecond) + waitForServerRunning(s, 2*time.Second) _ = s.GracefulStop(2 * time.Second) select { @@ -616,7 +616,7 @@ func TestStartMultiServerMode_Integration_ThreeServers(t *testing.T) { } }() - time.Sleep(50 * time.Millisecond) + waitForServerRunning(s, 2*time.Second) _ = s.GracefulStop(2 * time.Second) select { @@ -663,7 +663,7 @@ func TestStartMultiServerMode_Integration_WithCompression(t *testing.T) { } }() - time.Sleep(50 * time.Millisecond) + waitForServerRunning(s, 2*time.Second) _ = s.GracefulStop(2 * time.Second) select { @@ -705,7 +705,7 @@ func TestStartMultiServerMode_Integration_WithRewrite(t *testing.T) { } }() - time.Sleep(50 * time.Millisecond) + waitForServerRunning(s, 2*time.Second) _ = s.GracefulStop(2 * time.Second) select { @@ -749,7 +749,7 @@ func TestStartMultiServerMode_Integration_WithConnLimiter(t *testing.T) { } }() - time.Sleep(50 * time.Millisecond) + waitForServerRunning(s, 2*time.Second) _ = s.GracefulStop(2 * time.Second) select { @@ -821,7 +821,7 @@ func TestStartMultiServerMode_Integration_MixedConfigs(t *testing.T) { } }() - time.Sleep(50 * time.Millisecond) + waitForServerRunning(s, 2*time.Second) _ = s.GracefulStop(2 * time.Second) select { @@ -862,7 +862,7 @@ func TestStartMultiServerMode_GracefulUpgradeFallback(t *testing.T) { } }() - time.Sleep(50 * time.Millisecond) + waitForServerRunning(s, 2*time.Second) _ = s.GracefulStop(2 * time.Second) select { @@ -957,7 +957,7 @@ func TestStartMultiServerMode_Integration_WithErrorPage(t *testing.T) { } }() - time.Sleep(50 * time.Millisecond) + waitForServerRunning(s, 2*time.Second) _ = s.GracefulStop(2 * time.Second) select { @@ -1002,7 +1002,7 @@ func TestStartMultiServerMode_Integration_WithMIMETypes(t *testing.T) { } }() - time.Sleep(50 * time.Millisecond) + waitForServerRunning(s, 2*time.Second) _ = s.GracefulStop(2 * time.Second) select { @@ -1123,7 +1123,7 @@ func TestStartMultiServerMode_Integration_WithAuthRequest(t *testing.T) { } }() - time.Sleep(50 * time.Millisecond) + waitForServerRunning(s, 2*time.Second) _ = s.GracefulStop(2 * time.Second) select { @@ -1192,7 +1192,7 @@ func TestStartMultiServerMode_Integration_WithDefaultServer(t *testing.T) { } }() - time.Sleep(50 * time.Millisecond) + waitForServerRunning(s, 2*time.Second) _ = s.GracefulStop(2 * time.Second) select { diff --git a/internal/server/startsinglemode_test.go b/internal/server/startsinglemode_test.go index a435887..14d69c9 100644 --- a/internal/server/startsinglemode_test.go +++ b/internal/server/startsinglemode_test.go @@ -52,9 +52,7 @@ func TestStartSingleMode_Integration_WithStaticFiles(t *testing.T) { }() // 等待服务器启动 - time.Sleep(50 * time.Millisecond) - - // 停止服务器 + waitForServerRunning(s, 2*time.Second) _ = s.GracefulStop(2 * time.Second) select { @@ -97,7 +95,7 @@ func TestStartSingleMode_Integration_WithProxy(t *testing.T) { } }() - time.Sleep(50 * time.Millisecond) + waitForServerRunning(s, 2*time.Second) _ = s.GracefulStop(2 * time.Second) select { @@ -141,7 +139,7 @@ func TestStartSingleMode_Integration_WithMonitoring(t *testing.T) { } }() - time.Sleep(50 * time.Millisecond) + waitForServerRunning(s, 2*time.Second) _ = s.GracefulStop(2 * time.Second) select { @@ -178,7 +176,7 @@ func TestStartSingleMode_Integration_WithCacheAPI(t *testing.T) { } }() - time.Sleep(50 * time.Millisecond) + waitForServerRunning(s, 2*time.Second) _ = s.GracefulStop(2 * time.Second) select { @@ -215,7 +213,7 @@ func TestStartSingleMode_Integration_WithCompression(t *testing.T) { } }() - time.Sleep(50 * time.Millisecond) + waitForServerRunning(s, 2*time.Second) _ = s.GracefulStop(2 * time.Second) select { @@ -260,7 +258,7 @@ func TestStartSingleMode_Integration_WithSecurity(t *testing.T) { } }() - time.Sleep(50 * time.Millisecond) + waitForServerRunning(s, 2*time.Second) _ = s.GracefulStop(2 * time.Second) select { @@ -296,7 +294,7 @@ func TestStartSingleMode_Integration_WithRewrite(t *testing.T) { } }() - time.Sleep(50 * time.Millisecond) + waitForServerRunning(s, 2*time.Second) _ = s.GracefulStop(2 * time.Second) select { @@ -341,7 +339,7 @@ func TestStartSingleMode_Integration_WithPerformance(t *testing.T) { } }() - time.Sleep(50 * time.Millisecond) + waitForServerRunning(s, 2*time.Second) _ = s.GracefulStop(2 * time.Second) select { @@ -391,7 +389,7 @@ func TestStartSingleMode_Integration_WithProxyLocationTypes(t *testing.T) { } }() - time.Sleep(50 * time.Millisecond) + waitForServerRunning(s, 2*time.Second) _ = s.GracefulStop(2 * time.Second) select { @@ -438,7 +436,7 @@ func TestStartSingleMode_Integration_WithStaticLocationTypes(t *testing.T) { } }() - time.Sleep(50 * time.Millisecond) + waitForServerRunning(s, 2*time.Second) _ = s.GracefulStop(2 * time.Second) select { @@ -484,7 +482,7 @@ func TestStartSingleMode_Integration_WithHealthCheck(t *testing.T) { } }() - time.Sleep(100 * time.Millisecond) // 给健康检查一些时间启动 + waitForServerRunning(s, 2*time.Second) _ = s.GracefulStop(2 * time.Second) select { @@ -523,7 +521,7 @@ func TestStartSingleMode_Integration_WithMIMETypes(t *testing.T) { } }() - time.Sleep(50 * time.Millisecond) + waitForServerRunning(s, 2*time.Second) _ = s.GracefulStop(2 * time.Second) select { @@ -568,7 +566,7 @@ func TestStartSingleMode_Integration_WithErrorPage(t *testing.T) { } }() - time.Sleep(50 * time.Millisecond) + waitForServerRunning(s, 2*time.Second) _ = s.GracefulStop(2 * time.Second) select { @@ -606,7 +604,7 @@ func TestStartSingleMode_Integration_WithConnLimiter(t *testing.T) { } }() - time.Sleep(50 * time.Millisecond) + waitForServerRunning(s, 2*time.Second) _ = s.GracefulStop(2 * time.Second) select { @@ -646,7 +644,7 @@ func TestStartSingleMode_Integration_WithAuthRequest(t *testing.T) { } }() - time.Sleep(50 * time.Millisecond) + waitForServerRunning(s, 2*time.Second) _ = s.GracefulStop(2 * time.Second) select { diff --git a/internal/server/testing_helpers_test.go b/internal/server/testing_helpers_test.go new file mode 100644 index 0000000..97ac404 --- /dev/null +++ b/internal/server/testing_helpers_test.go @@ -0,0 +1,16 @@ +package server + +import ( + "time" +) + +func waitForServerRunning(s *Server, timeout time.Duration) bool { + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + if s.running.Load() { + return true + } + time.Sleep(time.Millisecond) + } + return false +} diff --git a/internal/server/vhost_test.go b/internal/server/vhost_test.go index 394833b..7a3cb33 100644 --- a/internal/server/vhost_test.go +++ b/internal/server/vhost_test.go @@ -1782,8 +1782,7 @@ func TestStartVHostMode_ActualExecution(t *testing.T) { errCh <- s.Start() }() - // 等待一小段时间让服务器启动 - time.Sleep(50 * time.Millisecond) + waitForServerRunning(s, 2*time.Second) // 停止服务器 _ = s.GracefulStop(1 * time.Second) @@ -2146,7 +2145,7 @@ func TestStartVHostMode_ActualServerStart(t *testing.T) { errCh <- s.Start() }() - time.Sleep(50 * time.Millisecond) + waitForServerRunning(s, 2*time.Second) _ = s.GracefulStop(1 * time.Second) select { @@ -2183,7 +2182,7 @@ func TestStartVHostMode_ActualServerStart(t *testing.T) { errCh <- s.Start() }() - time.Sleep(50 * time.Millisecond) + waitForServerRunning(s, 2*time.Second) _ = s.GracefulStop(1 * time.Second) select { @@ -2222,7 +2221,7 @@ func TestStartVHostMode_ActualServerStart(t *testing.T) { errCh <- s.Start() }() - time.Sleep(50 * time.Millisecond) + waitForServerRunning(s, 2*time.Second) _ = s.GracefulStop(1 * time.Second) select { @@ -2255,7 +2254,7 @@ func TestStartVHostMode_ActualServerStart(t *testing.T) { errCh <- s.Start() }() - time.Sleep(50 * time.Millisecond) + waitForServerRunning(s, 2*time.Second) _ = s.GracefulStop(1 * time.Second) select { @@ -2292,7 +2291,7 @@ func TestStartVHostMode_ActualServerStart(t *testing.T) { errCh <- s.Start() }() - time.Sleep(50 * time.Millisecond) + waitForServerRunning(s, 2*time.Second) _ = s.GracefulStop(1 * time.Second) select { diff --git a/internal/stream/server_coverage_test.go b/internal/stream/server_coverage_test.go index 530cf55..8fc419a 100644 --- a/internal/stream/server_coverage_test.go +++ b/internal/stream/server_coverage_test.go @@ -573,8 +573,16 @@ func TestServe_ReceivesAndForwards(t *testing.T) { upstream.targets[0].healthy.Store(true) srv := newUDPServer(proxyConn, upstream, 10*time.Second) - go srv.serve() - go srv.startCleanupTicker() + srv.wg.Add(1) + go func() { + defer srv.wg.Done() + srv.serve() + }() + srv.wg.Add(1) + go func() { + defer srv.wg.Done() + srv.startCleanupTicker() + }() clientConn, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 0}) require.NoError(t, err)