fix(lua): add schedulerMu to protect scheduler LState and callback queue

This commit is contained in:
xfy 2026-06-05 11:38:52 +08:00
parent 2be04f3fb9
commit 76257a7859

View File

@ -88,6 +88,8 @@ type LuaEngine struct {
// 引擎统计
stats EngineStats
schedulerMu sync.RWMutex
}
// EngineStats 引擎统计信息。
@ -419,34 +421,29 @@ func (e *LuaEngine) LocationManager() *LocationManager {
// - 该方法应在引擎启动后、定时器使用前调用
// - 调度器 LState 与主 LState 共享同一个共享字典管理器
func (e *LuaEngine) InitSchedulerLState() error {
// 步骤1: 创建调度器 LState
e.schedulerLState = glua.NewState(glua.Options{
SkipOpenLibs: true, // 禁用默认库,手动加载安全库
L := glua.NewState(glua.Options{
SkipOpenLibs: true,
})
// 步骤2: 加载安全的标准库
glua.OpenBase(e.schedulerLState)
glua.OpenTable(e.schedulerLState)
glua.OpenString(e.schedulerLState)
glua.OpenMath(e.schedulerLState)
glua.OpenBase(L)
glua.OpenTable(L)
glua.OpenString(L)
glua.OpenMath(L)
// 步骤3: 创建 ngx 表并注册安全 API
ngx := e.schedulerLState.NewTable()
e.schedulerLState.SetGlobal("ngx", ngx)
ngx := L.NewTable()
L.SetGlobal("ngx", ngx)
// 注册共享字典 API与主引擎共享同一个管理器
RegisterSharedDictAPI(e.schedulerLState, e.sharedDictManager, ngx)
RegisterSharedDictAPI(L, e.sharedDictManager, ngx)
RegisterNgxLogAPI(L, nil)
RegisterTimerAPI(L, e.timerManager, ngx)
// 注册日志 API
RegisterNgxLogAPI(e.schedulerLState, nil)
q := make(chan *CallbackEntry, 1024)
// 注册定时器 API仅安全函数
RegisterTimerAPI(e.schedulerLState, e.timerManager, ngx)
e.schedulerMu.Lock()
e.schedulerLState = L
e.callbackQueue = q
e.schedulerMu.Unlock()
// 步骤4: 创建回调队列
e.callbackQueue = make(chan *CallbackEntry, 1024)
// 步骤5: 启动调度器 goroutine
go e.SchedulerLoop()
return nil
@ -461,16 +458,22 @@ func (e *LuaEngine) InitSchedulerLState() error {
// 注意:该方法由 InitSchedulerLState 自动启动,不应手动调用。
func (e *LuaEngine) SchedulerLoop() {
for {
e.schedulerMu.RLock()
q := e.callbackQueue
ctx := e.ctx
e.schedulerMu.RUnlock()
if q == nil {
return
}
select {
case entry, ok := <-e.callbackQueue:
case entry, ok := <-q:
if !ok {
// 通道已关闭,退出调度器
return
}
e.executeCallback(entry)
case <-e.ctx.Done():
// 引擎关闭信号
case <-ctx.Done():
return
}
}
@ -490,25 +493,25 @@ func (e *LuaEngine) SchedulerLoop() {
func (e *LuaEngine) executeCallback(entry *CallbackEntry) {
defer func() {
if r := recover(); r != nil {
// 捕获 panic防止调度器崩溃
_ = r
}
}()
if e.schedulerLState == nil {
e.schedulerMu.RLock()
L := e.schedulerLState
e.schedulerMu.RUnlock()
if L == nil {
return
}
// 从 FunctionProto 创建函数
fn := e.schedulerLState.NewFunctionFromProto(entry.proto)
fn := L.NewFunctionFromProto(entry.proto)
// 调用回调函数(不添加额外的 fn 参数)
_ = e.schedulerLState.CallByParam(glua.P{
_ = L.CallByParam(glua.P{
Fn: fn,
NRet: 0,
Protect: true,
}, entry.args...)
// 错误已在 Protect 模式下被捕获
}
// EnqueueCallback 将回调加入调度队列。
@ -525,11 +528,18 @@ func (e *LuaEngine) executeCallback(entry *CallbackEntry) {
// - 使用非阻塞发送,队列满时直接返回 false
// - 丢弃的回调不会被重试
func (e *LuaEngine) EnqueueCallback(entry *CallbackEntry) bool {
e.schedulerMu.RLock()
q := e.callbackQueue
e.schedulerMu.RUnlock()
if q == nil {
return false
}
select {
case e.callbackQueue <- entry:
case q <- entry:
return true
default:
// 队列已满
return false
}
}
@ -542,6 +552,7 @@ func (e *LuaEngine) EnqueueCallback(entry *CallbackEntry) bool {
//
// 注意:该方法是幂等的,可安全调用多次。
func (e *LuaEngine) CloseScheduler() {
e.schedulerMu.Lock()
if e.callbackQueue != nil {
close(e.callbackQueue)
e.callbackQueue = nil
@ -550,6 +561,7 @@ func (e *LuaEngine) CloseScheduler() {
e.schedulerLState.Close()
e.schedulerLState = nil
}
e.schedulerMu.Unlock()
}
// GetLStateForTest 从池中获取一个 LState 用于测试。