lolly/internal/lua/engine.go
xfy fd4e164ae6 refactor(security): 滑动窗口限流器使用分段锁优化并发性能
将单一 counters map + 全局 mutex 改为 16 buckets 分段锁结构:

- 新增 limiterBucket 结构体,每个桶独立持有 RW 锁和计数器 map
- 使用 FNV-1a 哈希算法将键均匀分布到 16 个桶中
- 各方法修改为按 bucket 分发操作:
  - Allow() / allowApproximate() / allowPrecise()
  - Reset() / ResetAll() / Cleanup()
  - GetStats() / GetCount()

收益:
- 并发场景下锁竞争降低约 94% (16 个桶并行)
- 基准测试显示并行 Allow 操作约 89ns/op

测试验证:
- go test -race 通过并发安全测试
- 基准测试显示吞吐提升

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-14 14:26:01 +08:00

339 lines
8.6 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// Package lua 提供 Lua 脚本嵌入能力
package lua
import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/valyala/fasthttp"
glua "github.com/yuin/gopher-lua"
)
// LuaEngine 全局 Lua 引擎
// 每个 HTTP Server 实例持有一个 LuaEngine
//
// 类型命名说明:虽然 lua.LuaEngine 存在 stuttering但保持此命名以
// 1) 与 LuaContext/LuaCoroutine 保持一致的 API 命名风格
// 2) 明确区分 Lua 引擎与其他引擎类型
// 3) 保持向后兼容性
type LuaEngine struct {
coroutinePool sync.Pool
ctx context.Context
codeCache *CodeCache
L *glua.LState
config *Config
schedulerLState *glua.LState
cancel context.CancelFunc
sharedDictManager *SharedDictManager
timerManager *TimerManager
locationManager *LocationManager
callbackQueue chan *CallbackEntry
stats EngineStats
maxCoroutines int
activeCount int32
}
// EngineStats 引擎统计信息
type EngineStats struct {
CoroutinesCreated uint64
CoroutinesClosed uint64
ScriptsExecuted uint64
ScriptsErrors uint64
}
// NewEngine 创建 Lua 引擎
func NewEngine(config *Config) (*LuaEngine, error) {
if config == nil {
config = DefaultConfig()
}
// 创建主 LState使用优化后的栈配置
// 协程通过 NewThread 继承这些配置
L := glua.NewState(glua.Options{
SkipOpenLibs: true, // 禁用默认库,手动加载安全库
CallStackSize: config.CoroutineStackSize,
MinimizeStackMemory: config.MinimizeStackMemory,
})
// 加载安全的标准库
glua.OpenBase(L)
glua.OpenTable(L)
glua.OpenString(L)
glua.OpenMath(L)
glua.OpenCoroutine(L) // 加载 coroutine 库支持 yield
// 可选加载危险库
if config.EnableOSLib {
glua.OpenOs(L)
}
if config.EnableIOLib {
glua.OpenIo(L)
}
// 注意package 库默认不加载,禁止 require 外部模块
ctx, cancel := context.WithCancel(context.Background())
engine := &LuaEngine{
L: L,
config: config,
codeCache: NewCodeCache(config.CodeCacheSize, config.CodeCacheTTL, config.EnableFileWatch),
maxCoroutines: config.MaxConcurrentCoroutines,
ctx: ctx,
cancel: cancel,
sharedDictManager: NewSharedDictManager(),
coroutinePool: sync.Pool{
New: func() interface{} {
// 注意:这里只是创建空的协程对象结构
// 实际的协程通过 L.NewThread() 创建
return &LuaCoroutine{}
},
},
}
// 创建定时器管理器(需要在 engine 创建后初始化)
engine.timerManager = NewTimerManager(engine)
// 创建 location 管理器
engine.locationManager = NewLocationManager()
// 协程池预热:预创建 LuaCoroutine 结构体对象
if config.CoroutinePoolWarmup > 0 {
for i := 0; i < config.CoroutinePoolWarmup; i++ {
engine.coroutinePool.Put(&LuaCoroutine{})
}
}
return engine, nil
}
// Close 关闭引擎
func (e *LuaEngine) Close() {
e.cancel()
if e.timerManager != nil {
e.timerManager.Close()
}
if e.sharedDictManager != nil {
e.sharedDictManager.Close()
}
if e.L != nil {
e.L.Close()
}
}
// NewCoroutine 创建临时协程
// 注意:协程在 ResumeOK 后变成 dead 状态,不能复用
func (e *LuaEngine) NewCoroutine(req *fasthttp.RequestCtx) (*LuaCoroutine, error) {
// 检查并发限制
current := atomic.AddInt32(&e.activeCount, 1)
if current > int32(e.maxCoroutines) {
atomic.AddInt32(&e.activeCount, -1)
return nil, fmt.Errorf("max concurrent coroutines exceeded: %d/%d", current, e.maxCoroutines)
}
// 通过 NewThread 创建协程
// 协程继承主 LState 的全局环境
co, cancel := e.L.NewThread()
if co == nil {
atomic.AddInt32(&e.activeCount, -1)
return nil, fmt.Errorf("failed to create coroutine")
}
// 从池中获取协程对象结构(复用内存,不复用协程状态)
coro, ok := e.coroutinePool.Get().(*LuaCoroutine)
if !ok {
coro = &LuaCoroutine{}
}
coro.Engine = e
coro.Co = co
coro.Cancel = cancel
coro.RequestCtx = req
coro.CreatedAt = time.Now()
coro.ExecutionContext, coro.executionCancel = context.WithTimeout(e.ctx, e.config.MaxExecutionTime)
// 设置 LState 的上下文为执行上下文(用于超时控制)
// 注意:不直接使用 RequestCtx因为 RequestCtx.Done() 依赖服务器连接
// RequestCtx 通过 coro.RequestCtx 字段访问,而不是 L.Context()
co.SetContext(coro.ExecutionContext)
atomic.AddUint64(&e.stats.CoroutinesCreated, 1)
return coro, nil
}
// releaseCoroutine 释放协程(内部方法)
func (e *LuaEngine) releaseCoroutine(coro *LuaCoroutine) {
if coro == nil {
return
}
// 取消执行上下文
if coro.executionCancel != nil {
coro.executionCancel()
}
// 取消协程
if coro.Cancel != nil {
coro.Cancel()
}
// 清理状态
coro.Co = nil
coro.Cancel = nil
coro.RequestCtx = nil
coro.ExecutionContext = nil
coro.executionCancel = nil
// 更新计数
atomic.AddInt32(&e.activeCount, -1)
atomic.AddUint64(&e.stats.CoroutinesClosed, 1)
// 放回池中(仅复用 LuaCoroutine 结构体内存)
e.coroutinePool.Put(coro)
}
// CodeCache 返回字节码缓存
func (e *LuaEngine) CodeCache() *CodeCache {
return e.codeCache
}
// Stats 返回引擎统计
func (e *LuaEngine) Stats() EngineStats {
return EngineStats{
CoroutinesCreated: atomic.LoadUint64(&e.stats.CoroutinesCreated),
CoroutinesClosed: atomic.LoadUint64(&e.stats.CoroutinesClosed),
ScriptsExecuted: atomic.LoadUint64(&e.stats.ScriptsExecuted),
ScriptsErrors: atomic.LoadUint64(&e.stats.ScriptsErrors),
}
}
// ActiveCoroutines 返回活跃协程数
func (e *LuaEngine) ActiveCoroutines() int32 {
return atomic.LoadInt32(&e.activeCount)
}
// SharedDictManager 返回共享字典管理器
func (e *LuaEngine) SharedDictManager() *SharedDictManager {
return e.sharedDictManager
}
// CreateSharedDict 创建共享字典
func (e *LuaEngine) CreateSharedDict(name string, maxItems int) *SharedDict {
return e.sharedDictManager.CreateDict(name, maxItems)
}
// TimerManager 返回定时器管理器
func (e *LuaEngine) TimerManager() *TimerManager {
return e.timerManager
}
// LocationManager 返回 location 管理器
func (e *LuaEngine) LocationManager() *LocationManager {
return e.locationManager
}
// InitSchedulerLState 初始化调度器 LState
// 创建专用的 LState 用于定时器回调执行,线程隔离
func (e *LuaEngine) InitSchedulerLState() error {
// 创建调度器 LState
e.schedulerLState = glua.NewState(glua.Options{
SkipOpenLibs: true, // 禁用默认库,手动加载安全库
})
// 加载安全的标准库
glua.OpenBase(e.schedulerLState)
glua.OpenTable(e.schedulerLState)
glua.OpenString(e.schedulerLState)
glua.OpenMath(e.schedulerLState)
// 创建 ngx 表
ngx := e.schedulerLState.NewTable()
e.schedulerLState.SetGlobal("ngx", ngx)
// 注册共享字典 API与主引擎共享同一个管理器
RegisterSharedDictAPI(e.schedulerLState, e.sharedDictManager, ngx)
// 注册日志 API
RegisterNgxLogAPI(e.schedulerLState, nil)
// 注册定时器 API仅安全函数
RegisterTimerAPI(e.schedulerLState, e.timerManager, ngx)
// 创建回调队列
e.callbackQueue = make(chan *CallbackEntry, 1024)
// 启动调度器 goroutine
go e.SchedulerLoop()
return nil
}
// SchedulerLoop 调度器循环
// 在独立的 goroutine 中运行,处理定时器回调
func (e *LuaEngine) SchedulerLoop() {
for {
select {
case entry, ok := <-e.callbackQueue:
if !ok {
// 通道已关闭,退出调度器
return
}
e.executeCallback(entry)
case <-e.ctx.Done():
// 引擎关闭信号
return
}
}
}
// executeCallback 执行定时器回调
func (e *LuaEngine) executeCallback(entry *CallbackEntry) {
defer func() {
if r := recover(); r != nil {
// 捕获 panic防止调度器崩溃
_ = r
}
}()
if e.schedulerLState == nil {
return
}
// 从 FunctionProto 创建函数
fn := e.schedulerLState.NewFunctionFromProto(entry.proto)
// 调用回调函数(不添加额外的 fn 参数)
_ = e.schedulerLState.CallByParam(glua.P{
Fn: fn,
NRet: 0,
Protect: true,
}, entry.args...)
// 错误已在 Protect 模式下被捕获
}
// EnqueueCallback 将回调加入调度队列
// 由 TimerManager 在定时器触发时调用
func (e *LuaEngine) EnqueueCallback(entry *CallbackEntry) bool {
select {
case e.callbackQueue <- entry:
return true
default:
// 队列已满
return false
}
}
// CloseScheduler 关闭调度器
func (e *LuaEngine) CloseScheduler() {
if e.callbackQueue != nil {
close(e.callbackQueue)
}
if e.schedulerLState != nil {
e.schedulerLState.Close()
e.schedulerLState = nil
}
}