lolly/internal/lua/api_timer.go
xfy e8fbbf368c fix(config,server): merge defaults on Load and fix monitoring registration
Two related fixes that must land together:

1. config.Load() now starts from DefaultConfig() before unmarshaling
   YAML. This ensures missing top-level fields (Performance,
   Monitoring, Resolver) use their documented defaults instead of
   zero values. Most importantly, file_cache is no longer silently
   disabled when users omit the performance: section.

2. startSingleMode() now checks Monitoring.Status.Enabled instead of
   Path/Allow to decide whether to register the status endpoint.
   Without this change, fix #1 would have caused a regression where
   the status handler is registered even when monitoring is disabled,
   because DefaultConfig() sets Path and Allow defaults.

Also replace remaining log.Printf in status.go and lua/api_timer.go
with zerolog to follow project logging conventions.

Added tests:
- config/load_test.go: verifies defaults are applied, explicit values
  override defaults, and monitoring stays disabled by default.
- server/monitoring_registration_test.go: verifies /_status is only
  registered when enabled and remains reachable with static handler
  on path: /.
2026-06-11 15:08:57 +08:00

571 lines
14 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// Package lua 提供 ngx.timer API 实现。
//
// 该文件实现定时器相关的 Lua API兼容 OpenResty/ngx_lua 语义。
// 包括:
// - TimerManager定时器管理器支持延迟执行和取消
// - TimerEntry单个定时器条目包含回调和参数
// - TimerHandleLua 可见的定时器句柄userdata
// - 调度器 goroutine在专用 LState 中安全执行 Lua 回调
//
// 设计说明:
// - 回调通过 FunctionProto 编译后在调度器 goroutine 中执行
// - 不允许回调捕获 upvalue闭包变量防止内存泄漏
// - 优雅关闭:等待回调队列排空,超时后放弃剩余回调
//
// 注意事项:
// - 定时器回调中不可用 ngx.req/ngx.resp/ngx.var/ngx.ctx 等请求级 API
// - 队列满时回调被丢弃(不重试)
//
// 作者xfy
package lua
import (
"fmt"
"sync"
"sync/atomic"
"time"
glua "github.com/yuin/gopher-lua"
"rua.plus/lolly/internal/logging"
)
// CallbackEntry 回调队列条目,封装定时器触发的 Lua 回调。
//
// 包含编译后的 FunctionProto 和调用参数,
// 由 TimerManager 推入 engine 的 callbackQueue 供调度器执行。
type CallbackEntry struct {
// proto 编译后的 Lua 函数原型
proto *glua.FunctionProto
// args 调用参数
args []glua.LValue
}
// TimerManager 定时器管理器。
//
// 负责创建、取消和执行 Lua 定时器。
// 回调在专用调度器 goroutine 的 LState 中执行,实现线程隔离。
//
// 并发安全说明:
// - mu 保护 timers 映射的读写
// - queueMu 保护队列关闭状态
// - active/stopping 使用 atomic 操作
type TimerManager struct {
// timers 活跃定时器映射ID -> 条目)
timers map[uint64]*TimerEntry
// engine 关联的 Lua 引擎
engine *LuaEngine
// callbackQueue 回调队列,推入调度器执行
callbackQueue chan *CallbackEntry
// schedulerDone 调度器 goroutine 退出信号
schedulerDone chan struct{}
// schedulerL 调度器专用 LState
schedulerL *glua.LState
// nextID 下一个定时器 ID原子操作
nextID atomic.Uint64
// mu 定时器映射读写锁
mu sync.Mutex
// queueMu 队列状态锁
queueMu sync.Mutex
// active 活跃定时器计数(原子操作)
active atomic.Int32
// stopping 停止标记(原子操作)
stopping atomic.Int32
// queueClosed 队列是否已关闭
queueClosed bool
}
// TimerEntry 定时器条目。
//
// 封装单个定时器的回调、参数、生命周期和取消信号。
type TimerEntry struct {
// callback 原始回调函数
callback *glua.LFunction
// callbackProto 编译后的回调原型(无 upvalue 限制)
callbackProto *glua.FunctionProto
// timer 底层 time.Timer
timer *time.Timer
// cancel 取消信号通道
cancel chan struct{}
// done 完成信号通道
done chan struct{}
// args 回调参数
args []glua.LValue
// ID 定时器唯一标识
id uint64
// delay 延迟时间
delay time.Duration
}
// TimerHandle 定时器句柄,暴露给 Lua 的 userdata。
//
// 通过该句柄可在 Lua 中取消定时器。
type TimerHandle struct {
// manager 关联的定时器管理器
manager *TimerManager
// id 定时器 ID
id uint64
}
// NewTimerManager 创建定时器管理器实例。
//
// 初始化定时器映射、回调队列、调度器 LState
// 并启动调度器 goroutine。
//
// 参数:
// - engine: Lua 引擎实例
//
// 返回值:
// - *TimerManager: 初始化的管理器实例
func NewTimerManager(engine *LuaEngine) *TimerManager {
m := &TimerManager{
timers: make(map[uint64]*TimerEntry),
engine: engine,
callbackQueue: make(chan *CallbackEntry, 1024),
schedulerDone: make(chan struct{}),
}
// 创建专用调度器 LState
m.schedulerL = glua.NewState(glua.Options{
SkipOpenLibs: true,
})
glua.OpenBase(m.schedulerL)
glua.OpenTable(m.schedulerL)
glua.OpenString(m.schedulerL)
glua.OpenMath(m.schedulerL)
// 注册调度器可用的安全 API
if engine != nil {
RegisterSharedDictAPI(m.schedulerL, engine.SharedDictManager(), m.schedulerL.NewTable())
RegisterNgxLogAPI(m.schedulerL, nil)
}
// 启动调度器 goroutine
go m.schedulerLoop()
return m
}
// At 创建延迟定时器。
//
// 在指定延迟后执行回调函数。回调在调度器 goroutine 的 LState 中执行。
//
// 参数:
// - delay: 延迟时间
// - callback: Lua 回调函数(不能捕获 upvalue
// - args: 回调参数
//
// 返回值:
// - *TimerHandle: 定时器句柄
// - error: 创建失败或服务器正在关闭时返回错误
//
// 安全说明:
// - 回调不能捕获 upvalue闭包变量因为它们会在不同 goroutine 中执行
// - 跨协程数据共享应使用 shared dict
func (m *TimerManager) At(delay time.Duration, callback *glua.LFunction, args []glua.LValue) (*TimerHandle, error) {
if m.stopping.Load() != 0 {
return nil, nil // 服务器正在关闭,不接受新定时器
}
m.mu.Lock()
defer m.mu.Unlock()
id := m.nextID.Add(1)
// 编译回调为 FunctionProto
var proto *glua.FunctionProto
if callback != nil && callback.Proto != nil {
proto = callback.Proto
// 拒绝带有 upvalue 的回调
if proto.NumUpvalues > 0 {
return nil, fmt.Errorf("timer callback cannot capture upvalues (closure variables); use shared dict instead")
}
}
entry := &TimerEntry{
id: id,
delay: delay,
callback: callback,
callbackProto: proto,
args: args,
cancel: make(chan struct{}),
done: make(chan struct{}),
}
// 设置定时器
entry.timer = time.AfterFunc(delay, func() {
m.executeTimer(entry)
})
m.timers[id] = entry
m.active.Add(1)
return &TimerHandle{id: id, manager: m}, nil
}
// executeTimer 执行定时器回调。
//
// 定时器到期时由 time.AfterFunc 调用,将回调推入调度器队列。
// 检查取消信号,清理条目,并在队列满时丢弃回调。
func (m *TimerManager) executeTimer(entry *TimerEntry) {
defer func() {
m.active.Add(-1)
close(entry.done)
}()
// 检查是否被取消
select {
case <-entry.cancel:
return // 已取消
default:
}
// 清理定时器条目
m.mu.Lock()
if m.timers != nil {
delete(m.timers, entry.id)
}
m.mu.Unlock()
// 将回调入队到调度器
if entry.callbackProto != nil {
cbEntry := &CallbackEntry{
proto: entry.callbackProto,
args: entry.args,
}
m.queueMu.Lock()
if m.queueClosed {
m.queueMu.Unlock()
return // 通道已关闭,放弃回调
}
select {
case m.callbackQueue <- cbEntry:
m.queueMu.Unlock()
default:
m.queueMu.Unlock()
logging.Warn().Msg("[lua] timer callback dropped: queue full")
}
}
}
// schedulerLoop 调度器循环,在专用 goroutine 中执行 Lua 回调。
//
// 从 callbackQueue 中读取回调条目,从 FunctionProto 重建 Lua 函数并执行。
// 队列关闭时退出循环。
func (m *TimerManager) schedulerLoop() {
defer close(m.schedulerDone)
for entry := range m.callbackQueue {
// 从字节码重建函数并执行
fn := m.schedulerL.NewFunctionFromProto(entry.proto)
if fn == nil {
logging.Error().Msg("[lua] timer callback: failed to create function from proto")
continue
}
// 调用函数
if err := m.schedulerL.CallByParam(glua.P{
Fn: fn,
NRet: 0,
}, entry.args...); err != nil {
logging.Error().Err(err).Msg("[lua] timer callback error")
}
}
}
// Cancel 取消定时器。
//
// 停止底层 time.Timer发送取消信号清理定时器条目。
//
// 参数:
// - handle: 定时器句柄
//
// 返回值:
// - bool: true 表示成功取消false 表示定时器不存在或已执行
func (m *TimerManager) Cancel(handle *TimerHandle) bool {
m.mu.Lock()
defer m.mu.Unlock()
entry, ok := m.timers[handle.id]
if !ok {
return false // 定时器不存在或已执行
}
// 停止定时器
if entry.timer != nil {
entry.timer.Stop()
}
// 发送取消信号
close(entry.cancel)
// 清理
delete(m.timers, entry.id)
m.active.Add(-1)
return true
}
// WaitAll 等待所有定时器完成。
//
// 设置停止标志(拒绝新定时器),轮询等待活跃计数器归零。
// 超时后强制取消所有剩余定时器。
//
// 参数:
// - timeout: 最大等待时间
//
// 返回值:
// - bool: true 表示所有定时器已正常完成false 表示超时
func (m *TimerManager) WaitAll(timeout time.Duration) bool {
// 设置停止标志
m.stopping.Store(1)
// 等待所有定时器完成
start := time.Now()
for m.active.Load() > 0 {
if time.Since(start) > timeout {
// 超时,强制取消所有
m.mu.Lock()
for _, entry := range m.timers {
if entry.timer != nil {
entry.timer.Stop()
}
close(entry.cancel)
}
m.timers = make(map[uint64]*TimerEntry)
m.mu.Unlock()
return false
}
time.Sleep(10 * time.Millisecond)
}
return true
}
// Close 关闭定时器管理器。
//
// 执行顺序:
// 1. 设置停止标志,拒绝新定时器
// 2. 优雅关闭等待回调队列排空5 秒超时)
// 3. 关闭调度器 LState
//
// 注意:该方法是幂等的,可安全调用多次。
func (m *TimerManager) Close() {
if m == nil || m.stopping.Load() != 0 {
return // 已关闭或 nil
}
// 1. 停止接受新定时器
m.stopping.Store(1)
// 2. 优雅关闭:等待回调队列排空
m.gracefulShutdown(5 * time.Second)
// 3. 关闭调度器 LState
if m.schedulerL != nil {
m.schedulerL.Close()
m.schedulerL = nil
}
}
// gracefulShutdown 优雅关闭定时器管理器。
//
// 关闭回调队列,等待调度器 goroutine 退出。
// 超时后记录被丢弃的回调数量。
//
// 参数:
// - timeout: 最大等待时间
func (m *TimerManager) gracefulShutdown(timeout time.Duration) {
m.queueMu.Lock()
m.queueClosed = true
close(m.callbackQueue)
m.queueMu.Unlock()
// 等待调度器 goroutine 退出
select {
case <-m.schedulerDone:
case <-time.After(timeout):
abandoned := len(m.callbackQueue)
if abandoned > 0 {
logging.Warn().Int("abandoned", abandoned).Msg("[lua] shutdown timeout: callbacks abandoned")
}
}
}
// ActiveCount 返回当前活跃定时器数量。
//
// 返回值:
// - int32: 活跃定时器数量
func (m *TimerManager) ActiveCount() int32 {
return m.active.Load()
}
// RegisterTimerAPI 注册 ngx.timer API 到 Lua 状态机。
//
// 在 ngx 表下创建 timer 子表,注册以下方法:
// - at(delay, callback, ...):创建延迟定时器,返回 userdata 句柄
// - running_count():返回活跃定时器数量
//
// 同时创建定时器句柄元表,支持 cancel 方法。
//
// 参数:
// - L: Lua 状态
// - manager: 定时器管理器实例
// - ngx: ngx 全局表
func RegisterTimerAPI(L *glua.LState, manager *TimerManager, ngx *glua.LTable) {
// 创建 ngx.timer 表
timer := L.NewTable()
// ngx.timer.at(delay, callback, ...)
L.SetField(timer, "at", L.NewFunction(func(L *glua.LState) int {
// 检查参数
delay := float64(L.CheckNumber(1))
callback := L.CheckFunction(2)
// 收集额外参数
args := []glua.LValue{}
for i := 3; i <= L.GetTop(); i++ {
args = append(args, L.Get(i))
}
// 创建定时器
handle, err := manager.At(time.Duration(delay)*time.Second, callback, args)
if err != nil {
L.Push(glua.LNil)
L.Push(glua.LString(err.Error()))
return 2
}
if handle == nil {
L.Push(glua.LNil)
L.Push(glua.LString("server shutting down"))
return 2
}
// 返回定时器句柄
ud := L.NewUserData()
ud.Value = handle
L.SetMetatable(ud, L.GetTypeMetatable("ngx.timer.handle"))
L.Push(ud)
return 1
}))
// ngx.timer.running_count()
L.SetField(timer, "running_count", L.NewFunction(func(L *glua.LState) int {
L.Push(glua.LNumber(manager.ActiveCount()))
return 1
}))
L.SetField(ngx, "timer", timer)
// 创建定时器句柄元表
mt := L.NewTypeMetatable("ngx.timer.handle")
L.SetField(mt, "__index", L.NewFunction(timerHandleIndex))
L.SetField(mt, "__tostring", L.NewFunction(timerHandleToString))
// 注册方法
methods := L.NewTable()
L.SetField(methods, "cancel", L.NewFunction(timerHandleCancel))
L.SetField(mt, "methods", methods)
}
// timerHandleIndex 定时器句柄索引
func timerHandleIndex(L *glua.LState) int {
ud := L.CheckUserData(1)
_, ok := ud.Value.(*TimerHandle)
if !ok {
L.RaiseError("invalid timer handle")
return 0
}
// 检查是否是方法
// 类型断言检查 - 使用已声明的 ud
userdata, ok := L.Get(1).(*glua.LUserData)
if !ok {
L.Push(glua.LNil)
return 1
}
methods := L.GetField(userdata.Metatable, "methods")
if method := L.GetField(methods, L.CheckString(2)); method != glua.LNil {
L.Push(method)
return 1
}
L.Push(glua.LNil)
return 1
}
// timerHandleToString 定时器句柄字符串表示
func timerHandleToString(L *glua.LState) int {
ud := L.CheckUserData(1)
handle, ok := ud.Value.(*TimerHandle)
if !ok {
L.Push(glua.LString("invalid timer handle"))
return 1
}
L.Push(glua.LString("ngx.timer.handle:" + uint64ToStr(handle.id)))
return 1
}
// timerHandleCancel 取消定时器
func timerHandleCancel(L *glua.LState) int {
ud := L.CheckUserData(1)
handle, ok := ud.Value.(*TimerHandle)
if !ok {
L.RaiseError("invalid timer handle")
return 0
}
if handle.manager == nil {
L.Push(glua.LFalse)
L.Push(glua.LString("timer manager not available"))
return 2
}
ok = handle.manager.Cancel(handle)
if ok {
L.Push(glua.LTrue)
return 1
}
L.Push(glua.LFalse)
L.Push(glua.LString("timer not found or already executed"))
return 2
}
// uint64ToStr 整数转字符串
func uint64ToStr(n uint64) string {
if n == 0 {
return "0"
}
var buf []byte
for n > 0 {
buf = append(buf, byte('0'+n%10))
n /= 10
}
// 反转
for i, j := 0, len(buf)-1; i < j; i, j = i+1, j-1 {
buf[i], buf[j] = buf[j], buf[i]
}
return string(buf)
}