- Add missing newlines at end of files - Fix indentation in ssl.go - Remove extra blank lines Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
453 lines
9.7 KiB
Go
453 lines
9.7 KiB
Go
// Package server 提供 Goroutine 池功能的测试。
|
||
//
|
||
// 该文件测试 Goroutine 池的各项功能,包括:
|
||
// - 池的创建和配置
|
||
// - 启动和停止控制
|
||
// - 任务提交和执行
|
||
// - 并发提交处理
|
||
// - Handler 包装功能
|
||
// - 统计信息收集
|
||
//
|
||
// 作者:xfy
|
||
package server
|
||
|
||
import (
|
||
"sync"
|
||
"sync/atomic"
|
||
"testing"
|
||
"time"
|
||
|
||
"github.com/valyala/fasthttp"
|
||
)
|
||
|
||
func TestNewGoroutinePool(t *testing.T) {
|
||
cfg := PoolConfig{
|
||
MaxWorkers: 100,
|
||
MinWorkers: 10,
|
||
IdleTimeout: 30 * time.Second,
|
||
QueueSize: 50,
|
||
}
|
||
|
||
p := NewGoroutinePool(cfg)
|
||
if p == nil {
|
||
t.Fatal("Expected non-nil pool")
|
||
}
|
||
|
||
// 检查配置
|
||
if p.maxWorkers != 100 {
|
||
t.Errorf("Expected maxWorkers 100, got %d", p.maxWorkers)
|
||
}
|
||
if p.minWorkers != 10 {
|
||
t.Errorf("Expected minWorkers 10, got %d", p.minWorkers)
|
||
}
|
||
}
|
||
|
||
func TestPoolDefaults(t *testing.T) {
|
||
p := NewGoroutinePool(PoolConfig{})
|
||
|
||
// 应该使用默认值
|
||
if p.maxWorkers != 10000 {
|
||
t.Errorf("Expected default maxWorkers 10000, got %d", p.maxWorkers)
|
||
}
|
||
if p.minWorkers != 100 {
|
||
t.Errorf("Expected default minWorkers 100, got %d", p.minWorkers)
|
||
}
|
||
}
|
||
|
||
func TestPoolStartStop(t *testing.T) {
|
||
p := NewGoroutinePool(PoolConfig{
|
||
MaxWorkers: 10,
|
||
MinWorkers: 2,
|
||
})
|
||
|
||
p.Start()
|
||
if !p.running.Load() {
|
||
t.Error("Expected pool to be running")
|
||
}
|
||
|
||
p.Stop()
|
||
if p.running.Load() {
|
||
t.Error("Expected pool to be stopped")
|
||
}
|
||
}
|
||
|
||
func TestPoolSubmit(t *testing.T) {
|
||
p := NewGoroutinePool(PoolConfig{
|
||
MaxWorkers: 10,
|
||
MinWorkers: 2,
|
||
QueueSize: 10,
|
||
IdleTimeout: 5 * time.Second,
|
||
})
|
||
|
||
p.Start()
|
||
defer p.Stop()
|
||
|
||
var executed atomic.Bool
|
||
task := func(*fasthttp.RequestCtx) {
|
||
executed.Store(true)
|
||
}
|
||
|
||
err := p.Submit(nil, task)
|
||
if err != nil {
|
||
t.Errorf("Submit failed: %v", err)
|
||
}
|
||
|
||
// 等待任务执行
|
||
time.Sleep(100 * time.Millisecond)
|
||
|
||
if !executed.Load() {
|
||
t.Error("Expected task to be executed")
|
||
}
|
||
}
|
||
|
||
func TestPoolStats(t *testing.T) {
|
||
p := NewGoroutinePool(PoolConfig{
|
||
MaxWorkers: 100,
|
||
MinWorkers: 10,
|
||
QueueSize: 50,
|
||
IdleTimeout: 30 * time.Second,
|
||
})
|
||
|
||
p.Start()
|
||
defer p.Stop()
|
||
|
||
stats := p.Stats()
|
||
|
||
if stats.MaxWorkers != 100 {
|
||
t.Errorf("Expected MaxWorkers 100, got %d", stats.MaxWorkers)
|
||
}
|
||
if stats.MinWorkers != 10 {
|
||
t.Errorf("Expected MinWorkers 10, got %d", stats.MinWorkers)
|
||
}
|
||
if stats.QueueCap != 50 {
|
||
t.Errorf("Expected QueueCap 50, got %d", stats.QueueCap)
|
||
}
|
||
}
|
||
|
||
func TestPoolConcurrentSubmit(t *testing.T) {
|
||
p := NewGoroutinePool(PoolConfig{
|
||
MaxWorkers: 50,
|
||
MinWorkers: 5,
|
||
QueueSize: 100,
|
||
IdleTimeout: 5 * time.Second,
|
||
})
|
||
|
||
p.Start()
|
||
defer p.Stop()
|
||
|
||
var counter atomic.Int32
|
||
var wg sync.WaitGroup
|
||
|
||
for range 100 {
|
||
wg.Go(func() {
|
||
_ = p.Submit(nil, func(_ *fasthttp.RequestCtx) {
|
||
counter.Add(1)
|
||
})
|
||
})
|
||
}
|
||
|
||
wg.Wait()
|
||
|
||
// 等待所有任务执行
|
||
time.Sleep(500 * time.Millisecond)
|
||
|
||
if counter.Load() != 100 {
|
||
t.Errorf("Expected 100 executions, got %d", counter.Load())
|
||
}
|
||
}
|
||
|
||
func TestPoolSubmitWhenStopped(t *testing.T) {
|
||
p := NewGoroutinePool(PoolConfig{
|
||
MaxWorkers: 10,
|
||
})
|
||
|
||
// 不启动池
|
||
var executed atomic.Bool
|
||
task := func(_ *fasthttp.RequestCtx) {
|
||
executed.Store(true)
|
||
}
|
||
|
||
err := p.Submit(nil, task)
|
||
if err != nil {
|
||
t.Errorf("Submit should not fail when stopped: %v", err)
|
||
}
|
||
|
||
// 任务应该直接执行
|
||
if !executed.Load() {
|
||
t.Error("Expected task to be executed directly when pool is stopped")
|
||
}
|
||
}
|
||
|
||
func TestPoolWrapHandler(t *testing.T) {
|
||
p := NewGoroutinePool(PoolConfig{
|
||
MaxWorkers: 10,
|
||
MinWorkers: 2,
|
||
QueueSize: 10,
|
||
IdleTimeout: 5 * time.Second,
|
||
})
|
||
|
||
p.Start()
|
||
defer p.Stop()
|
||
|
||
var executed atomic.Bool
|
||
originalHandler := func(ctx *fasthttp.RequestCtx) {
|
||
executed.Store(true)
|
||
ctx.SetBodyString("wrapped response")
|
||
}
|
||
|
||
wrappedHandler := p.WrapHandler(originalHandler)
|
||
|
||
ctx := &fasthttp.RequestCtx{}
|
||
ctx.Init(&fasthttp.Request{}, nil, nil)
|
||
|
||
wrappedHandler(ctx)
|
||
|
||
// 等待异步执行
|
||
time.Sleep(100 * time.Millisecond)
|
||
|
||
if !executed.Load() {
|
||
t.Error("Expected wrapped handler to be executed")
|
||
}
|
||
}
|
||
|
||
func TestPoolWrapHandler_WhenStopped(t *testing.T) {
|
||
p := NewGoroutinePool(PoolConfig{
|
||
MaxWorkers: 10,
|
||
})
|
||
// 不启动池
|
||
|
||
var executed atomic.Bool
|
||
originalHandler := func(*fasthttp.RequestCtx) {
|
||
executed.Store(true)
|
||
}
|
||
|
||
wrappedHandler := p.WrapHandler(originalHandler)
|
||
|
||
ctx := &fasthttp.RequestCtx{}
|
||
ctx.Init(&fasthttp.Request{}, nil, nil)
|
||
|
||
wrappedHandler(ctx)
|
||
|
||
// 池停止时应该直接执行
|
||
if !executed.Load() {
|
||
t.Error("Expected handler to be executed directly when pool is stopped")
|
||
}
|
||
}
|
||
|
||
func TestPoolSubmit_QueueFull_StartNewWorker(t *testing.T) {
|
||
// 测试队列满时启动新 worker
|
||
p := NewGoroutinePool(PoolConfig{
|
||
MaxWorkers: 10,
|
||
MinWorkers: 1,
|
||
QueueSize: 1, // 小队列,容易满
|
||
IdleTimeout: 5 * time.Second,
|
||
})
|
||
|
||
p.Start()
|
||
defer p.Stop()
|
||
|
||
// 填满队列,让后续提交触发 default 分支
|
||
var executedCount atomic.Int32
|
||
blockTask := func(*fasthttp.RequestCtx) {
|
||
time.Sleep(200 * time.Millisecond) // 阻塞任务
|
||
executedCount.Add(1)
|
||
}
|
||
|
||
// 先提交一个阻塞任务,让 worker 忙碌
|
||
_ = p.Submit(nil, blockTask)
|
||
time.Sleep(50 * time.Millisecond) // 等待 worker 开始执行
|
||
|
||
// 填满队列
|
||
_ = p.Submit(nil, func(*fasthttp.RequestCtx) { executedCount.Add(1) })
|
||
|
||
// 此时队列满,应该启动新 worker
|
||
_ = p.Submit(nil, func(*fasthttp.RequestCtx) { executedCount.Add(1) })
|
||
|
||
// 等待所有任务完成
|
||
time.Sleep(500 * time.Millisecond)
|
||
|
||
if executedCount.Load() < 2 {
|
||
t.Errorf("Expected at least 2 executions, got %d", executedCount.Load())
|
||
}
|
||
}
|
||
|
||
func TestPoolSubmit_QueueFull_MaxWorkers_Fallback(t *testing.T) {
|
||
// 测试队列满且达到最大 worker 时直接执行(fallback)
|
||
p := NewGoroutinePool(PoolConfig{
|
||
MaxWorkers: 1, // 只有 1 个 worker
|
||
MinWorkers: 1,
|
||
QueueSize: 1, // 队列大小 1
|
||
IdleTimeout: 5 * time.Second,
|
||
})
|
||
|
||
p.Start()
|
||
defer p.Stop()
|
||
|
||
// 使用 channel 阻塞唯一的 worker
|
||
blockCh := make(chan struct{})
|
||
started := make(chan struct{})
|
||
_ = p.Submit(nil, func(*fasthttp.RequestCtx) {
|
||
close(started) // 通知 worker 已开始
|
||
<-blockCh // 阻塞直到测试结束
|
||
})
|
||
|
||
// 等待 worker 开始执行阻塞任务
|
||
<-started
|
||
|
||
// 填满队列
|
||
_ = p.Submit(nil, func(*fasthttp.RequestCtx) {})
|
||
|
||
// 现在唯一的 worker 在阻塞,队列已满
|
||
// 提交新任务应该触发 fallback 直接执行
|
||
var fallbackExecuted atomic.Bool
|
||
_ = p.Submit(nil, func(*fasthttp.RequestCtx) {
|
||
fallbackExecuted.Store(true)
|
||
})
|
||
|
||
// fallback 执行是同步的直接执行
|
||
if !fallbackExecuted.Load() {
|
||
t.Error("Expected task to be executed directly (fallback) when max workers reached")
|
||
}
|
||
|
||
// 释放阻塞的 worker
|
||
close(blockCh)
|
||
}
|
||
|
||
func TestPoolSubmit_WithIdleWorkers(t *testing.T) {
|
||
// 测试有空闲 worker 时不启动新 worker
|
||
p := NewGoroutinePool(PoolConfig{
|
||
MaxWorkers: 10,
|
||
MinWorkers: 5, // 预热 5 个 worker
|
||
QueueSize: 10,
|
||
IdleTimeout: 5 * time.Second,
|
||
})
|
||
|
||
p.Start()
|
||
defer p.Stop()
|
||
|
||
// 等待预热完成,worker 应该是空闲的
|
||
time.Sleep(50 * time.Millisecond)
|
||
|
||
initialWorkers := atomic.LoadInt32(&p.workers)
|
||
|
||
// 提交任务,应该复用空闲 worker
|
||
var executed atomic.Bool
|
||
_ = p.Submit(nil, func(*fasthttp.RequestCtx) {
|
||
executed.Store(true)
|
||
})
|
||
|
||
time.Sleep(100 * time.Millisecond)
|
||
|
||
if !executed.Load() {
|
||
t.Error("Expected task to be executed")
|
||
}
|
||
|
||
// worker 数量应该保持稳定或更少(不应该增加)
|
||
finalWorkers := atomic.LoadInt32(&p.workers)
|
||
if finalWorkers > initialWorkers {
|
||
t.Errorf("Worker count should not increase when idle workers available: %d -> %d", initialWorkers, finalWorkers)
|
||
}
|
||
}
|
||
|
||
func TestPoolSubmit_NilTask(t *testing.T) {
|
||
// 测试提交 nil 任务不会 panic
|
||
p := NewGoroutinePool(PoolConfig{
|
||
MaxWorkers: 10,
|
||
})
|
||
|
||
p.Start()
|
||
defer p.Stop()
|
||
|
||
// 提交 nil 任务 - 这会导致 panic,所以不测试
|
||
// 但可以测试空任务函数
|
||
var executed atomic.Bool
|
||
emptyTask := func(*fasthttp.RequestCtx) {
|
||
executed.Store(true)
|
||
}
|
||
|
||
err := p.Submit(nil, emptyTask)
|
||
if err != nil {
|
||
t.Errorf("Submit failed: %v", err)
|
||
}
|
||
|
||
time.Sleep(50 * time.Millisecond)
|
||
|
||
if !executed.Load() {
|
||
t.Error("Expected empty task to be executed")
|
||
}
|
||
}
|
||
|
||
func TestPoolSubmit_MultipleQueuedTasks(t *testing.T) {
|
||
// 测试多个任务入队
|
||
p := NewGoroutinePool(PoolConfig{
|
||
MaxWorkers: 5,
|
||
MinWorkers: 2,
|
||
QueueSize: 10,
|
||
IdleTimeout: 5 * time.Second,
|
||
})
|
||
|
||
p.Start()
|
||
defer p.Stop()
|
||
|
||
var counter atomic.Int32
|
||
|
||
// 快速提交多个任务
|
||
for range 5 {
|
||
_ = p.Submit(nil, func(*fasthttp.RequestCtx) {
|
||
counter.Add(1)
|
||
})
|
||
}
|
||
|
||
time.Sleep(200 * time.Millisecond)
|
||
|
||
if counter.Load() != 5 {
|
||
t.Errorf("Expected 5 executions, got %d", counter.Load())
|
||
}
|
||
}
|
||
|
||
func TestPoolSubmit_StartWorkerWhenNoIdle(t *testing.T) {
|
||
// 测试当没有空闲 worker 时启动新 worker
|
||
// 使用 MinWorkers=1 让池只预热 1 个 worker
|
||
p := NewGoroutinePool(PoolConfig{
|
||
MaxWorkers: 5,
|
||
MinWorkers: 1, // 只预热 1 个 worker
|
||
QueueSize: 10,
|
||
IdleTimeout: 5 * time.Second,
|
||
})
|
||
|
||
p.Start()
|
||
defer p.Stop()
|
||
|
||
// 等待预热完成
|
||
time.Sleep(50 * time.Millisecond)
|
||
|
||
// 用阻塞任务让唯一的 worker 忙碌
|
||
blockCh := make(chan struct{})
|
||
started := make(chan struct{})
|
||
_ = p.Submit(nil, func(*fasthttp.RequestCtx) {
|
||
close(started)
|
||
<-blockCh
|
||
})
|
||
<-started // 等待 worker 开始执行
|
||
|
||
// 现在唯一的 worker 在忙碌,idleWorkers == 0
|
||
// 提交新任务应该启动新 worker
|
||
var executed atomic.Bool
|
||
_ = p.Submit(nil, func(*fasthttp.RequestCtx) {
|
||
executed.Store(true)
|
||
})
|
||
|
||
time.Sleep(100 * time.Millisecond)
|
||
|
||
if !executed.Load() {
|
||
t.Error("Expected task to be executed by new worker")
|
||
}
|
||
|
||
// 检查是否启动了新 worker
|
||
if atomic.LoadInt32(&p.workers) < 2 {
|
||
t.Errorf("Expected at least 2 workers, got %d", atomic.LoadInt32(&p.workers))
|
||
}
|
||
|
||
close(blockCh)
|
||
}
|