refactor(proxy,server): 改进健康检查 goroutine 退出机制

- health: 使用 WaitGroup 确保 run goroutine 完全退出后再继续
- health_match: 忽略未使用参数,预分配 slice 容量
- proxy_coverage_extra_test: 使用 atomic.Int32 修复测试竞态条件
- purge: 忽略 Delete 返回值

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
xfy 2026-04-22 13:35:03 +08:00
parent 2bdc8f3b3b
commit 00c5319819
4 changed files with 19 additions and 13 deletions

View File

@ -65,6 +65,7 @@ type HealthChecker struct {
running atomic.Bool running atomic.Bool
matcher HealthMatch // 健康检查匹配器 matcher HealthMatch // 健康检查匹配器
slowStartManager *loadbalance.SlowStartManager // 慢启动管理器 slowStartManager *loadbalance.SlowStartManager // 慢启动管理器
wg sync.WaitGroup // 等待 run goroutine 退出
} }
// NewHealthChecker 使用指定的目标和配置创建一个新的 HealthChecker。 // NewHealthChecker 使用指定的目标和配置创建一个新的 HealthChecker。
@ -140,6 +141,7 @@ func (h *HealthChecker) Start() {
if h.slowStartManager != nil { if h.slowStartManager != nil {
h.slowStartManager.Start() h.slowStartManager.Start()
} }
h.wg.Add(1)
go h.run() go h.run()
} }
@ -151,10 +153,11 @@ func (h *HealthChecker) Stop() {
if !h.running.CompareAndSwap(true, false) { if !h.running.CompareAndSwap(true, false) {
return // 已经停止,直接返回 return // 已经停止,直接返回
} }
close(h.stopCh)
h.wg.Wait() // 等待 run goroutine 退出
if h.slowStartManager != nil { if h.slowStartManager != nil {
h.slowStartManager.Stop() h.slowStartManager.Stop()
} }
close(h.stopCh)
// 重新创建 stopCh 以支持后续 Start // 重新创建 stopCh 以支持后续 Start
h.stopCh = make(chan struct{}) h.stopCh = make(chan struct{})
} }
@ -163,6 +166,8 @@ func (h *HealthChecker) Stop() {
// 它对所有目标执行初始检查,然后进入循环, // 它对所有目标执行初始检查,然后进入循环,
// 以固定间隔检查目标,直到被停止。 // 以固定间隔检查目标,直到被停止。
func (h *HealthChecker) run() { func (h *HealthChecker) run() {
defer h.wg.Done()
// 执行初始健康检查 // 执行初始健康检查
h.checkAll() h.checkAll()

View File

@ -42,7 +42,7 @@ type HealthMatch interface {
type defaultHealthMatch struct{} type defaultHealthMatch struct{}
// Match 实现 HealthMatch 接口。 // Match 实现 HealthMatch 接口。
func (m *defaultHealthMatch) Match(status int, body []byte, headers map[string]string) bool { func (m *defaultHealthMatch) Match(status int, _ []byte, _ map[string]string) bool {
return status >= 200 && status < 300 return status >= 200 && status < 300
} }
@ -121,7 +121,7 @@ func NewHealthMatch(cfg *HealthMatchConfig) HealthMatch {
} }
// 解析状态码范围 // 解析状态码范围
var ranges []statusRange ranges := make([]statusRange, 0, len(cfg.Status))
for _, s := range cfg.Status { for _, s := range cfg.Status {
r, err := parseStatusRange(s) r, err := parseStatusRange(s)
if err != nil { if err != nil {
@ -142,7 +142,7 @@ func NewHealthMatch(cfg *HealthMatchConfig) HealthMatch {
} }
// 解析响应头匹配 // 解析响应头匹配
var headerMatches []headerMatch headerMatches := make([]headerMatch, 0, len(cfg.Headers))
for k, v := range cfg.Headers { for k, v := range cfg.Headers {
headerMatches = append(headerMatches, headerMatch{ headerMatches = append(headerMatches, headerMatch{
key: strings.ToLower(k), // 统一小写 key: strings.ToLower(k), // 统一小写

View File

@ -20,6 +20,7 @@ import (
"net/http/httptest" "net/http/httptest"
"os" "os"
"strings" "strings"
"sync/atomic"
"testing" "testing"
"time" "time"
@ -114,9 +115,9 @@ func TestHealthChecker_Run(t *testing.T) {
}) })
t.Run("定时检查执行", func(t *testing.T) { t.Run("定时检查执行", func(t *testing.T) {
requestCount := 0 var requestCount atomic.Int32
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
requestCount++ requestCount.Add(1)
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
})) }))
defer server.Close() defer server.Close()
@ -135,15 +136,15 @@ func TestHealthChecker_Run(t *testing.T) {
checker.Stop() checker.Stop()
// 应该至少执行初始检查 + 2 次定时检查 // 应该至少执行初始检查 + 2 次定时检查
if requestCount < 2 { if requestCount.Load() < 2 {
t.Errorf("期望至少 2 次检查,实际 %d 次", requestCount) t.Errorf("期望至少 2 次检查,实际 %d 次", requestCount.Load())
} }
}) })
t.Run("停止后不再检查", func(t *testing.T) { t.Run("停止后不再检查", func(t *testing.T) {
requestCount := 0 var requestCount atomic.Int32
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
requestCount++ requestCount.Add(1)
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
})) }))
defer server.Close() defer server.Close()
@ -159,12 +160,12 @@ func TestHealthChecker_Run(t *testing.T) {
checker.Start() checker.Start()
time.Sleep(60 * time.Millisecond) time.Sleep(60 * time.Millisecond)
checker.Stop() checker.Stop()
countAfterStop := requestCount countAfterStop := requestCount.Load()
// 等待一段时间,确认不再有检查 // 等待一段时间,确认不再有检查
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
if requestCount != countAfterStop { if requestCount.Load() != countAfterStop {
t.Error("停止后不应再执行检查") t.Error("停止后不应再执行检查")
} }
}) })

View File

@ -204,7 +204,7 @@ func (h *PurgeHandler) purgeByPath(path string, method string) int {
for _, p := range h.server.proxies { for _, p := range h.server.proxies {
if pcache := p.GetCache(); pcache != nil { if pcache := p.GetCache(); pcache != nil {
pcache.Delete(hashKey) _ = pcache.Delete(hashKey)
deleted++ deleted++
} }
} }