diff --git a/internal/lua/engine.go b/internal/lua/engine.go index 6bb5319..cdf5f2a 100644 --- a/internal/lua/engine.go +++ b/internal/lua/engine.go @@ -88,6 +88,8 @@ type LuaEngine struct { // 引擎统计 stats EngineStats + + schedulerMu sync.RWMutex } // EngineStats 引擎统计信息。 @@ -419,34 +421,29 @@ func (e *LuaEngine) LocationManager() *LocationManager { // - 该方法应在引擎启动后、定时器使用前调用 // - 调度器 LState 与主 LState 共享同一个共享字典管理器 func (e *LuaEngine) InitSchedulerLState() error { - // 步骤1: 创建调度器 LState - e.schedulerLState = glua.NewState(glua.Options{ - SkipOpenLibs: true, // 禁用默认库,手动加载安全库 + L := glua.NewState(glua.Options{ + SkipOpenLibs: true, }) - // 步骤2: 加载安全的标准库 - glua.OpenBase(e.schedulerLState) - glua.OpenTable(e.schedulerLState) - glua.OpenString(e.schedulerLState) - glua.OpenMath(e.schedulerLState) + glua.OpenBase(L) + glua.OpenTable(L) + glua.OpenString(L) + glua.OpenMath(L) - // 步骤3: 创建 ngx 表并注册安全 API - ngx := e.schedulerLState.NewTable() - e.schedulerLState.SetGlobal("ngx", ngx) + ngx := L.NewTable() + L.SetGlobal("ngx", ngx) - // 注册共享字典 API(与主引擎共享同一个管理器) - RegisterSharedDictAPI(e.schedulerLState, e.sharedDictManager, ngx) + RegisterSharedDictAPI(L, e.sharedDictManager, ngx) + RegisterNgxLogAPI(L, nil) + RegisterTimerAPI(L, e.timerManager, ngx) - // 注册日志 API - RegisterNgxLogAPI(e.schedulerLState, nil) + q := make(chan *CallbackEntry, 1024) - // 注册定时器 API(仅安全函数) - RegisterTimerAPI(e.schedulerLState, e.timerManager, ngx) + e.schedulerMu.Lock() + e.schedulerLState = L + e.callbackQueue = q + e.schedulerMu.Unlock() - // 步骤4: 创建回调队列 - e.callbackQueue = make(chan *CallbackEntry, 1024) - - // 步骤5: 启动调度器 goroutine go e.SchedulerLoop() return nil @@ -461,16 +458,22 @@ func (e *LuaEngine) InitSchedulerLState() error { // 注意:该方法由 InitSchedulerLState 自动启动,不应手动调用。 func (e *LuaEngine) SchedulerLoop() { for { + e.schedulerMu.RLock() + q := e.callbackQueue + ctx := e.ctx + e.schedulerMu.RUnlock() + + if q == nil { + return + } + select { - case entry, ok := <-e.callbackQueue: + case entry, ok := <-q: if !ok { - // 通道已关闭,退出调度器 return } e.executeCallback(entry) - - case <-e.ctx.Done(): - // 引擎关闭信号 + case <-ctx.Done(): return } } @@ -490,25 +493,25 @@ func (e *LuaEngine) SchedulerLoop() { func (e *LuaEngine) executeCallback(entry *CallbackEntry) { defer func() { if r := recover(); r != nil { - // 捕获 panic,防止调度器崩溃 _ = r } }() - if e.schedulerLState == nil { + e.schedulerMu.RLock() + L := e.schedulerLState + e.schedulerMu.RUnlock() + + if L == nil { return } - // 从 FunctionProto 创建函数 - fn := e.schedulerLState.NewFunctionFromProto(entry.proto) + fn := L.NewFunctionFromProto(entry.proto) - // 调用回调函数(不添加额外的 fn 参数) - _ = e.schedulerLState.CallByParam(glua.P{ + _ = L.CallByParam(glua.P{ Fn: fn, NRet: 0, Protect: true, }, entry.args...) - // 错误已在 Protect 模式下被捕获 } // EnqueueCallback 将回调加入调度队列。 @@ -525,11 +528,18 @@ func (e *LuaEngine) executeCallback(entry *CallbackEntry) { // - 使用非阻塞发送,队列满时直接返回 false // - 丢弃的回调不会被重试 func (e *LuaEngine) EnqueueCallback(entry *CallbackEntry) bool { + e.schedulerMu.RLock() + q := e.callbackQueue + e.schedulerMu.RUnlock() + + if q == nil { + return false + } + select { - case e.callbackQueue <- entry: + case q <- entry: return true default: - // 队列已满 return false } } @@ -542,6 +552,7 @@ func (e *LuaEngine) EnqueueCallback(entry *CallbackEntry) bool { // // 注意:该方法是幂等的,可安全调用多次。 func (e *LuaEngine) CloseScheduler() { + e.schedulerMu.Lock() if e.callbackQueue != nil { close(e.callbackQueue) e.callbackQueue = nil @@ -550,6 +561,7 @@ func (e *LuaEngine) CloseScheduler() { e.schedulerLState.Close() e.schedulerLState = nil } + e.schedulerMu.Unlock() } // GetLStateForTest 从池中获取一个 LState 用于测试。