lolly/internal/lua/engine.go

606 lines
16 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// Package lua 提供 Lua 脚本嵌入能力。
//
// 该文件包含 Lua 引擎的核心实现,包括:
// - LuaEngine全局引擎每个 HTTP Server 实例持有一个
// - LuaCoroutine请求级临时协程生命周期与请求绑定
// - CodeCache字节码缓存支持 LRU 淘汰和文件变更检测
// - 调度器:专用的 LState 用于定时器回调执行,实现线程隔离
//
// 架构设计:
//
// 采用 LState Pool 架构,每个请求从池中获取完全独立的 LState。
// 彻底消除共享状态,解决 gopher-lua 的并发竞态问题。
//
// 主要用途:
//
// 用于在 fasthttp 服务中嵌入 Lua 脚本,实现动态请求处理、
// 负载均衡、响应过滤等可编程功能,兼容 OpenResty/ngx_lua API 语义。
//
// 注意事项:
// - LuaEngine 非并发安全NewEngine/Close 应在初始化/关闭阶段调用
// - LuaCoroutine 为请求级独占,不可跨请求复用
// - 每个 LState 独立创建,拥有独立的 Global 表
//
// 作者xfy
package lua
import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/valyala/fasthttp"
glua "github.com/yuin/gopher-lua"
"rua.plus/lolly/gjson"
)
// LuaEngine 全局 Lua 引擎。
//
// 每个 HTTP Server 实例持有一个 LuaEngine负责
// - 管理 LState 池(解决并发竞态问题)
// - 创建和回收请求级协程LuaCoroutine
// - 管理字节码缓存CodeCache
// - 管理共享字典、定时器、location 等子系统
// - 提供调度器 LState 用于定时器回调的线程隔离执行
//
// 类型命名说明:虽然 lua.LuaEngine 存在 stuttering但保持此命名以
// 1) 与 LuaContext/LuaCoroutine 保持一致的 API 命名风格
// 2) 明确区分 Lua 引擎与其他引擎类型
// 3) 保持向后兼容性
type LuaEngine struct {
// 引擎配置
config *Config
// 字节码缓存
codeCache *CodeCache
// 协程池,复用 LuaCoroutine 结构体内存(不复用协程状态)
coroutinePool sync.Pool
// 共享字典管理器
sharedDictManager *SharedDictManager
// 定时器管理器
timerManager *TimerManager
// location 管理器(子请求)
locationManager *LocationManager
// 调度器 LState用于执行定时器回调
schedulerLState *glua.LState
// 回调队列,定时器触发后将回调入队
callbackQueue chan *CallbackEntry
// LState 池(解决并发竞态问题)
lstatePool *LStatePool
// 上下文及取消函数
ctx context.Context
cancel context.CancelFunc
// 并发控制
maxCoroutines int
activeCount atomic.Int32
// 引擎统计
stats EngineStats
schedulerMu sync.RWMutex
schedulerDone chan struct{}
}
// EngineStats 引擎统计信息。
//
// 记录引擎运行期间的关键指标,用于监控和诊断。
// 所有字段均为原子操作,并发安全。
type EngineStats struct {
// CoroutinesCreated 已创建的协程总数
CoroutinesCreated uint64
// CoroutinesClosed 已关闭的协程总数
CoroutinesClosed uint64
// ScriptsExecuted 成功执行的脚本总数
ScriptsExecuted uint64
// ScriptsErrors 执行出错的脚本总数
ScriptsErrors uint64
}
// NewEngine 创建并初始化 Lua 引擎。
//
// 该函数执行以下初始化步骤:
// 1. 创建主 LState配置栈大小和内存优化选项
// 2. 加载安全的标准库base、table、string、math、coroutine
// 3. 按需加载危险库os、io默认禁止 package 库
// 4. 初始化字节码缓存、共享字典、定时器、location 管理器
// 5. 执行协程池预热
//
// 参数:
// - config: 引擎配置,为 nil 时使用 DefaultConfig()
//
// 返回值:
// - *LuaEngine: 初始化完成的引擎实例
// - error: 初始化失败时返回错误
//
// 使用示例:
//
// engine, err := lua.NewEngine(nil) // 使用默认配置
// if err != nil {
// // 处理初始化错误
// }
// defer engine.Close()
//
// 注意事项:
// - 该方法应在服务启动阶段调用,不应在请求处理路径中调用
// - 返回的引擎需要在使用完毕后调用 Close() 释放资源
func NewEngine(config *Config) (*LuaEngine, error) {
if config == nil {
config = DefaultConfig()
}
// 设置池默认值
if config.LStatePoolInitialSize <= 0 {
config.LStatePoolInitialSize = 10
}
if config.LStatePoolMaxSize <= 0 {
config.LStatePoolMaxSize = 100
}
ctx, cancel := context.WithCancel(context.Background())
engine := &LuaEngine{
config: config,
codeCache: NewCodeCache(config.CodeCacheSize, config.CodeCacheTTL, config.EnableFileWatch),
maxCoroutines: config.MaxConcurrentCoroutines,
ctx: ctx,
cancel: cancel,
sharedDictManager: NewSharedDictManager(),
coroutinePool: sync.Pool{
New: func() any {
return &LuaCoroutine{}
},
},
}
// 创建 LState 池工厂函数
poolFactory := func() *glua.LState {
L := glua.NewState(glua.Options{
SkipOpenLibs: true,
CallStackSize: config.CoroutineStackSize,
MinimizeStackMemory: config.MinimizeStackMemory,
})
// 加载安全的标准库
glua.OpenBase(L)
glua.OpenTable(L)
glua.OpenString(L)
glua.OpenMath(L)
glua.OpenCoroutine(L)
glua.OpenPackage(L) // 需要 package 库支持 require
// 可选加载危险库
if config.EnableOSLib {
glua.OpenOs(L)
}
if config.EnableIOLib {
glua.OpenIo(L)
}
// 预加载 gjson 模块
gjson.Preload(L)
return L
}
// 创建 LState 池
engine.lstatePool = NewLStatePool(poolFactory, config.LStatePoolInitialSize, config.LStatePoolMaxSize)
// 创建定时器管理器
engine.timerManager = NewTimerManager(engine)
// 创建 location 管理器
engine.locationManager = NewLocationManager()
// 协程池预热
if config.CoroutinePoolWarmup > 0 {
for i := 0; i < config.CoroutinePoolWarmup; i++ {
engine.coroutinePool.Put(&LuaCoroutine{})
}
}
return engine, nil
}
// Close 关闭 Lua 引擎,释放所有资源。
//
// 关闭顺序:
// 1. 取消引擎上下文,通知所有子 goroutine 退出
// 2. 关闭定时器管理器(等待定时器回调排空)
// 3. 关闭共享字典管理器
// 4. 关闭主 LState
//
// 注意:该方法是幂等的,可安全调用多次。
func (e *LuaEngine) Close() {
if e == nil {
return // 已关闭或 nil
}
e.cancel()
if e.timerManager != nil {
e.timerManager.Close()
}
if e.sharedDictManager != nil {
e.sharedDictManager.Close()
}
if e.lstatePool != nil {
e.lstatePool.Close()
}
// 标记为已关闭
e.lstatePool = nil
}
// NewCoroutine 创建请求级临时协程。
//
// 该方法执行以下操作:
// 1. 检查并发限制,超过最大协程数时返回错误
// 2. 通过主 LState.NewThread() 创建底层 Lua 协程
// 3. 从对象池中获取 LuaCoroutine 结构体(复用内存)
// 4. 设置执行上下文(含超时控制)和请求上下文
//
// 参数:
// - req: fasthttp 请求上下文,用于 API 访问ngx.req、ngx.resp 等)
//
// 返回值:
// - *LuaCoroutine: 新创建的协程实例
// - error: 创建失败时返回错误(如超出并发限制)
//
// 注意事项:
// - 协程在 ResumeOK 后变成 dead 状态,不能复用
// - 使用完毕后必须调用 Close() 或 releaseCoroutine() 释放资源
func (e *LuaEngine) NewCoroutine(req *fasthttp.RequestCtx) (*LuaCoroutine, error) {
// 步骤1: 检查并发限制
current := e.activeCount.Add(1)
if current > int32(e.maxCoroutines) {
e.activeCount.Add(-1)
return nil, fmt.Errorf("max concurrent coroutines exceeded: %d/%d", current, e.maxCoroutines)
}
// 步骤2: 从池中获取独立的 LState
L := e.lstatePool.Get()
if L == nil {
e.activeCount.Add(-1)
return nil, fmt.Errorf("lstate pool exhausted")
}
// 步骤3: 从池中获取协程对象结构
coro, ok := e.coroutinePool.Get().(*LuaCoroutine)
if !ok {
coro = &LuaCoroutine{}
}
coro.Engine = e
coro.Co = L
coro.Cancel = nil // 不再使用 NewThread 的 cancel
coro.RequestCtx = req
coro.CreatedAt = time.Now()
coro.ExecutionContext, coro.executionCancel = context.WithTimeout(e.ctx, e.config.MaxExecutionTime)
// 步骤4: 设置 LState 的上下文为执行上下文
L.SetContext(coro.ExecutionContext)
atomic.AddUint64(&e.stats.CoroutinesCreated, 1)
return coro, nil
}
// releaseCoroutine 释放协程资源并放回对象池。
//
// 该方法执行以下清理操作:
// 1. 取消执行上下文和协程
// 2. 清空所有引用字段,防止内存泄漏
// 3. 更新活跃协程计数和关闭计数
// 4. 将 LuaCoroutine 结构体放回对象池(仅复用内存)
//
// 注意:这是内部方法,外部应通过 LuaCoroutine.Close() 间接调用。
func (e *LuaEngine) releaseCoroutine(coro *LuaCoroutine) {
if coro == nil {
return
}
// 步骤1: 取消执行上下文
if coro.executionCancel != nil {
coro.executionCancel()
}
// 步骤2: 将 LState 归还池中
if coro.Co != nil {
e.lstatePool.Put(coro.Co)
}
// 步骤3: 清理状态,防止内存泄漏
coro.Co = nil
coro.Cancel = nil
coro.RequestCtx = nil
coro.ExecutionContext = nil
coro.executionCancel = nil
// 步骤4: 更新计数
e.activeCount.Add(-1)
atomic.AddUint64(&e.stats.CoroutinesClosed, 1)
// 步骤5: 放回池中(仅复用 LuaCoroutine 结构体内存)
e.coroutinePool.Put(coro)
}
// CodeCache 返回字节码缓存实例。
//
// 返回值:
// - *CodeCache: 字节码缓存,用于脚本编译缓存
func (e *LuaEngine) CodeCache() *CodeCache {
return e.codeCache
}
// Stats 返回引擎运行统计信息。
//
// 返回值:
// - EngineStats: 包含创建/关闭协程数、执行/出错脚本数的统计快照
//
// 注意:返回值为快照副本,非实时引用。
func (e *LuaEngine) Stats() EngineStats {
return EngineStats{
CoroutinesCreated: atomic.LoadUint64(&e.stats.CoroutinesCreated),
CoroutinesClosed: atomic.LoadUint64(&e.stats.CoroutinesClosed),
ScriptsExecuted: atomic.LoadUint64(&e.stats.ScriptsExecuted),
ScriptsErrors: atomic.LoadUint64(&e.stats.ScriptsErrors),
}
}
// ActiveCoroutines 返回当前活跃的协程数量。
//
// 返回值:
// - int32: 当前正在执行的协程数
func (e *LuaEngine) ActiveCoroutines() int32 {
return e.activeCount.Load()
}
// SharedDictManager 返回共享字典管理器实例。
//
// 返回值:
// - *SharedDictManager: 用于管理多个命名的 SharedDict 实例
func (e *LuaEngine) SharedDictManager() *SharedDictManager {
return e.sharedDictManager
}
// CreateSharedDict 创建或获取指定名称的共享字典。
//
// 参数:
// - name: 字典名称
// - maxItems: 字典最大条目数LRU 淘汰阈值)
//
// 返回值:
// - *SharedDict: 共享字典实例
func (e *LuaEngine) CreateSharedDict(name string, maxItems int) *SharedDict {
return e.sharedDictManager.CreateDict(name, maxItems)
}
// TimerManager 返回定时器管理器实例。
//
// 返回值:
// - *TimerManager: 用于管理 ngx.timer.* API 的定时器
func (e *LuaEngine) TimerManager() *TimerManager {
return e.timerManager
}
// LocationManager 返回 location 管理器实例。
//
// 返回值:
// - *LocationManager: 用于管理 ngx.location.capture 子请求
func (e *LuaEngine) LocationManager() *LocationManager {
return e.locationManager
}
// InitSchedulerLState 初始化调度器 LState。
//
// 创建专用的 LState 用于定时器回调执行,实现与请求处理线程的隔离。
// 该调度器 LState 仅加载安全子集库,禁止危险操作。
//
// 初始化步骤:
// 1. 创建 LState跳过默认库
// 2. 加载安全库base、table、string、math
// 3. 注册安全的 APIngx.shared、ngx.log、ngx.timer
// 4. 创建回调队列(容量 1024
// 5. 启动调度器 goroutine
//
// 返回值:
// - error: 初始化失败时返回错误
//
// 注意事项:
// - 该方法应在引擎启动后、定时器使用前调用
// - 调度器 LState 与主 LState 共享同一个共享字典管理器
func (e *LuaEngine) InitSchedulerLState() error {
L := glua.NewState(glua.Options{
SkipOpenLibs: true,
})
glua.OpenBase(L)
glua.OpenTable(L)
glua.OpenString(L)
glua.OpenMath(L)
ngx := L.NewTable()
L.SetGlobal("ngx", ngx)
RegisterSharedDictAPI(L, e.sharedDictManager, ngx)
RegisterNgxLogAPI(L, nil)
RegisterTimerAPI(L, e.timerManager, ngx)
q := make(chan *CallbackEntry, 1024)
done := make(chan struct{})
e.schedulerMu.Lock()
e.schedulerLState = L
e.callbackQueue = q
e.schedulerDone = done
e.schedulerMu.Unlock()
go e.SchedulerLoop()
return nil
}
// SchedulerLoop 调度器循环。
//
// 在独立的 goroutine 中运行,持续监听回调队列和引擎上下文:
// - 从 callbackQueue 接收定时器回调并执行
// - 监听 ctx.Done() 信号,引擎关闭时退出循环
//
// 注意:该方法由 InitSchedulerLState 自动启动,不应手动调用。
func (e *LuaEngine) SchedulerLoop() {
defer func() {
e.schedulerMu.RLock()
d := e.schedulerDone
e.schedulerMu.RUnlock()
if d != nil {
close(d)
}
}()
for {
e.schedulerMu.RLock()
q := e.callbackQueue
ctx := e.ctx
e.schedulerMu.RUnlock()
if q == nil {
return
}
select {
case entry, ok := <-q:
if !ok {
return
}
e.executeCallback(entry)
case <-ctx.Done():
return
}
}
}
// executeCallback 执行单个定时器回调。
//
// 该函数从 FunctionProto 重建 Lua 函数并在调度器 LState 中调用。
// 使用 Protect 模式捕获执行错误,防止回调 panic 导致调度器崩溃。
//
// 参数:
// - entry: 回调队列条目,包含编译后的 FunctionProto 和参数
//
// 注意事项:
// - 使用 defer+recover 捕获 panic保护调度器稳定性
// - 错误在 Protect 模式下被 gopher-lua 内部捕获,不向外传播
func (e *LuaEngine) executeCallback(entry *CallbackEntry) {
defer func() {
if r := recover(); r != nil {
_ = r
}
}()
e.schedulerMu.RLock()
L := e.schedulerLState
e.schedulerMu.RUnlock()
if L == nil {
return
}
fn := L.NewFunctionFromProto(entry.proto)
_ = L.CallByParam(glua.P{
Fn: fn,
NRet: 0,
Protect: true,
}, entry.args...)
}
// EnqueueCallback 将回调加入调度队列。
//
// 由 TimerManager 在定时器触发时调用,将回调推入 callbackQueue。
//
// 参数:
// - entry: 回调条目
//
// 返回值:
// - bool: true 表示入队成功false 表示队列已满(回调被丢弃)
//
// 注意事项:
// - 使用非阻塞发送,队列满时直接返回 false
// - 丢弃的回调不会被重试
func (e *LuaEngine) EnqueueCallback(entry *CallbackEntry) bool {
e.schedulerMu.RLock()
q := e.callbackQueue
e.schedulerMu.RUnlock()
if q == nil {
return false
}
select {
case q <- entry:
return true
default:
return false
}
}
// CloseScheduler 关闭调度器。
//
// 执行以下操作:
// 1. 关闭回调队列(阻止新回调入队)
// 2. 关闭调度器 LState
//
// 注意:该方法是幂等的,可安全调用多次。
func (e *LuaEngine) CloseScheduler() {
e.schedulerMu.Lock()
if e.callbackQueue != nil {
close(e.callbackQueue)
e.callbackQueue = nil
}
done := e.schedulerDone
e.schedulerMu.Unlock()
if done != nil {
<-done
}
e.schedulerMu.Lock()
if e.schedulerLState != nil {
e.schedulerLState.Close()
e.schedulerLState = nil
}
e.schedulerMu.Unlock()
}
// GetLStateForTest 从池中获取一个 LState 用于测试。
//
// 该方法仅用于测试目的,返回一个独立的 LState。
// 使用完毕后应调用 PutLStateForTest 归还。
//
// 返回值:
// - *glua.LState: 可用的 LState 实例
func (e *LuaEngine) GetLStateForTest() *glua.LState {
return e.lstatePool.Get()
}
// PutLStateForTest 归还测试用的 LState。
//
// 参数:
// - L: 要归还的 LState
func (e *LuaEngine) PutLStateForTest(L *glua.LState) {
e.lstatePool.Put(L)
}