fix(sticky): guard against double Stop, nil fallback, and multiple Start calls
- Add sync.Once to prevent double close of stopCh in Stop() - Add nil fallback guard in NewStickySession (defaults to RoundRobin) - Add atomic.Bool to make Start() idempotent - Add tests for double Stop() and nil fallback scenarios
This commit is contained in:
parent
360fd0da9d
commit
0a5443f6cf
@ -5,6 +5,7 @@ import (
|
|||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/valyala/fasthttp"
|
"github.com/valyala/fasthttp"
|
||||||
@ -29,10 +30,15 @@ type StickySession struct {
|
|||||||
shards []*stickyShard
|
shards []*stickyShard
|
||||||
stopCh chan struct{}
|
stopCh chan struct{}
|
||||||
wg sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
|
started atomic.Bool
|
||||||
|
stopOnce sync.Once
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewStickySession 创建一个新的会话粘性负载均衡器。
|
// NewStickySession 创建一个新的会话粘性负载均衡器。
|
||||||
func NewStickySession(config StickyConfig, fallback Balancer) *StickySession {
|
func NewStickySession(config StickyConfig, fallback Balancer) *StickySession {
|
||||||
|
if fallback == nil {
|
||||||
|
fallback = NewRoundRobin()
|
||||||
|
}
|
||||||
shards := make([]*stickyShard, stickyShardCount)
|
shards := make([]*stickyShard, stickyShardCount)
|
||||||
for i := range shards {
|
for i := range shards {
|
||||||
shards[i] = &stickyShard{
|
shards[i] = &stickyShard{
|
||||||
@ -50,13 +56,21 @@ func NewStickySession(config StickyConfig, fallback Balancer) *StickySession {
|
|||||||
|
|
||||||
// Start 启动后台清理 goroutine。
|
// Start 启动后台清理 goroutine。
|
||||||
func (s *StickySession) Start() {
|
func (s *StickySession) Start() {
|
||||||
|
if s.started.Swap(true) {
|
||||||
|
return
|
||||||
|
}
|
||||||
s.wg.Add(1)
|
s.wg.Add(1)
|
||||||
go s.cleanupLoop()
|
go s.cleanupLoop()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stop 停止后台清理 goroutine。
|
// Stop 停止后台清理 goroutine。
|
||||||
func (s *StickySession) Stop() {
|
func (s *StickySession) Stop() {
|
||||||
close(s.stopCh)
|
if !s.started.Swap(false) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
s.stopOnce.Do(func() {
|
||||||
|
close(s.stopCh)
|
||||||
|
})
|
||||||
s.wg.Wait()
|
s.wg.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -278,6 +278,39 @@ func TestStickySession_ExpiredCookie(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestStickySession_DoubleStop 测试重复调用 Stop() 不会 panic。
|
||||||
|
func TestStickySession_DoubleStop(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
fallback := NewRoundRobin()
|
||||||
|
config := DefaultStickyConfig()
|
||||||
|
|
||||||
|
sticky := NewStickySession(config, fallback)
|
||||||
|
sticky.Start()
|
||||||
|
sticky.Stop()
|
||||||
|
sticky.Stop() // Should not panic
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestStickySession_NilFallback 测试 nil fallback 会使用默认 RoundRobin。
|
||||||
|
func TestStickySession_NilFallback(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
config := DefaultStickyConfig()
|
||||||
|
config.Enabled = true
|
||||||
|
|
||||||
|
sticky := NewStickySession(config, nil)
|
||||||
|
sticky.Start()
|
||||||
|
defer sticky.Stop()
|
||||||
|
|
||||||
|
targets := []*Target{
|
||||||
|
createHealthyTarget("http://backend1:8080", true),
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx := &fasthttp.RequestCtx{}
|
||||||
|
selected := sticky.Select(ctx, targets)
|
||||||
|
if selected == nil {
|
||||||
|
t.Fatal("expected a target with default fallback")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// TestStickySession_SelectExcluding 测试排除选择委托给 fallback。
|
// TestStickySession_SelectExcluding 测试排除选择委托给 fallback。
|
||||||
func TestStickySession_SelectExcluding(t *testing.T) {
|
func TestStickySession_SelectExcluding(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user