lolly/internal/lua/api_timer.go
xfy 8b382606df Merge branch 'lint-fix' - resolve sendfile.go conflict
Conflict: sendfile.go (!linux build tag) was incorrectly modified to
include linuxSendfile and getSocketFd functions which already exist
in sendfile_linux.go.

Resolution: Keep HEAD version (simple fallback returning ENOTSUP) as
Linux implementation is properly separated in sendfile_linux.go.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-13 09:26:48 +08:00

415 lines
9.2 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// Package lua 提供 Lua 脚本嵌入能力
package lua
import (
"fmt"
"log"
"sync"
"sync/atomic"
"time"
glua "github.com/yuin/gopher-lua"
)
// CallbackEntry 回调队列条目
type CallbackEntry struct {
proto *glua.FunctionProto
args []glua.LValue
}
// TimerManager 定时器管理器
type TimerManager struct {
timers map[uint64]*TimerEntry
engine *LuaEngine
callbackQueue chan *CallbackEntry
schedulerDone chan struct{}
schedulerL *glua.LState
nextID uint64
mu sync.Mutex
queueMu sync.Mutex
active int32
stopping int32
queueClosed bool
}
// TimerEntry 定时器条目
type TimerEntry struct {
callback *glua.LFunction
callbackProto *glua.FunctionProto
timer *time.Timer
cancel chan struct{}
done chan struct{}
args []glua.LValue
id uint64
delay time.Duration
}
// TimerHandle 定时器句柄Lua userdata
type TimerHandle struct {
manager *TimerManager
id uint64
}
// NewTimerManager 创建定时器管理器
func NewTimerManager(engine *LuaEngine) *TimerManager {
m := &TimerManager{
timers: make(map[uint64]*TimerEntry),
engine: engine,
callbackQueue: make(chan *CallbackEntry, 1024),
schedulerDone: make(chan struct{}),
}
// 创建专用调度器 LState
m.schedulerL = glua.NewState(glua.Options{
SkipOpenLibs: true,
})
glua.OpenBase(m.schedulerL)
glua.OpenTable(m.schedulerL)
glua.OpenString(m.schedulerL)
glua.OpenMath(m.schedulerL)
// 注册调度器可用的安全 API
if engine != nil {
RegisterSharedDictAPI(m.schedulerL, engine.SharedDictManager(), m.schedulerL.NewTable())
RegisterNgxLogAPI(m.schedulerL, nil)
}
// 启动调度器 goroutine
go m.schedulerLoop()
return m
}
// At 创建定时器
// 返回定时器句柄和错误
func (m *TimerManager) At(delay time.Duration, callback *glua.LFunction, args []glua.LValue) (*TimerHandle, error) {
if atomic.LoadInt32(&m.stopping) != 0 {
return nil, nil // 服务器正在关闭,不接受新定时器
}
m.mu.Lock()
defer m.mu.Unlock()
id := atomic.AddUint64(&m.nextID, 1)
// 编译回调为 FunctionProto
var proto *glua.FunctionProto
if callback != nil && callback.Proto != nil {
proto = callback.Proto
// 拒绝带有 upvalue 的回调
if proto.NumUpvalues > 0 {
return nil, fmt.Errorf("timer callback cannot capture upvalues (closure variables); use shared dict instead")
}
}
entry := &TimerEntry{
id: id,
delay: delay,
callback: callback,
callbackProto: proto,
args: args,
cancel: make(chan struct{}),
done: make(chan struct{}),
}
// 设置定时器
entry.timer = time.AfterFunc(delay, func() {
m.executeTimer(entry)
})
m.timers[id] = entry
atomic.AddInt32(&m.active, 1)
return &TimerHandle{id: id, manager: m}, nil
}
// executeTimer 执行定时器回调
// 通过 channel 将回调调度到调度器 goroutine 执行
func (m *TimerManager) executeTimer(entry *TimerEntry) {
defer func() {
atomic.AddInt32(&m.active, -1)
close(entry.done)
}()
// 检查是否被取消
select {
case <-entry.cancel:
return // 已取消
default:
}
// 清理定时器条目
m.mu.Lock()
if m.timers != nil {
delete(m.timers, entry.id)
}
m.mu.Unlock()
// 将回调入队到调度器
if entry.callbackProto != nil {
cbEntry := &CallbackEntry{
proto: entry.callbackProto,
args: entry.args,
}
m.queueMu.Lock()
if m.queueClosed {
m.queueMu.Unlock()
return // 通道已关闭,放弃回调
}
select {
case m.callbackQueue <- cbEntry:
m.queueMu.Unlock()
default:
m.queueMu.Unlock()
log.Printf("[lua] timer callback dropped: queue full")
}
}
}
// schedulerLoop 调度器循环,在专用 goroutine 中执行 Lua 回调
func (m *TimerManager) schedulerLoop() {
defer close(m.schedulerDone)
for entry := range m.callbackQueue {
// 从字节码重建函数并执行
fn := m.schedulerL.NewFunctionFromProto(entry.proto)
if fn == nil {
log.Printf("[lua] timer callback: failed to create function from proto")
continue
}
// 调用函数
if err := m.schedulerL.CallByParam(glua.P{
Fn: fn,
NRet: 0,
}, entry.args...); err != nil {
log.Printf("[lua] timer callback error: %v", err)
}
}
}
// Cancel 取消定时器
func (m *TimerManager) Cancel(handle *TimerHandle) bool {
m.mu.Lock()
defer m.mu.Unlock()
entry, ok := m.timers[handle.id]
if !ok {
return false // 定时器不存在或已执行
}
// 停止定时器
if entry.timer != nil {
entry.timer.Stop()
}
// 发送取消信号
close(entry.cancel)
// 清理
delete(m.timers, entry.id)
atomic.AddInt32(&m.active, -1)
return true
}
// WaitAll 等待所有定时器完成
func (m *TimerManager) WaitAll(timeout time.Duration) bool {
// 设置停止标志
atomic.StoreInt32(&m.stopping, 1)
// 等待所有定时器完成
start := time.Now()
for atomic.LoadInt32(&m.active) > 0 {
if time.Since(start) > timeout {
// 超时,强制取消所有
m.mu.Lock()
for _, entry := range m.timers {
if entry.timer != nil {
entry.timer.Stop()
}
close(entry.cancel)
}
m.timers = make(map[uint64]*TimerEntry)
m.mu.Unlock()
return false
}
time.Sleep(10 * time.Millisecond)
}
return true
}
// Close 关闭定时器管理器
func (m *TimerManager) Close() {
// 1. 停止接受新定时器
atomic.StoreInt32(&m.stopping, 1)
// 2. 优雅关闭:等待回调队列排空
m.gracefulShutdown(5 * time.Second)
// 3. 关闭调度器 LState
if m.schedulerL != nil {
m.schedulerL.Close()
m.schedulerL = nil
}
}
// gracefulShutdown 优雅关闭:排空回调队列,超时后放弃
func (m *TimerManager) gracefulShutdown(timeout time.Duration) {
m.queueMu.Lock()
m.queueClosed = true
close(m.callbackQueue)
m.queueMu.Unlock()
// 等待调度器 goroutine 退出
select {
case <-m.schedulerDone:
case <-time.After(timeout):
abandoned := len(m.callbackQueue)
if abandoned > 0 {
log.Printf("[lua] shutdown timeout: %d callbacks abandoned", abandoned)
}
}
}
// ActiveCount 返回活跃定时器数
func (m *TimerManager) ActiveCount() int32 {
return atomic.LoadInt32(&m.active)
}
// RegisterTimerAPI 注册 ngx.timer API
func RegisterTimerAPI(L *glua.LState, manager *TimerManager, ngx *glua.LTable) {
// 创建 ngx.timer 表
timer := L.NewTable()
// ngx.timer.at(delay, callback, ...)
L.SetField(timer, "at", L.NewFunction(func(L *glua.LState) int {
// 检查参数
delay := float64(L.CheckNumber(1))
callback := L.CheckFunction(2)
// 收集额外参数
args := []glua.LValue{}
for i := 3; i <= L.GetTop(); i++ {
args = append(args, L.Get(i))
}
// 创建定时器
handle, err := manager.At(time.Duration(delay)*time.Second, callback, args)
if err != nil {
L.Push(glua.LNil)
L.Push(glua.LString(err.Error()))
return 2
}
if handle == nil {
L.Push(glua.LNil)
L.Push(glua.LString("server shutting down"))
return 2
}
// 返回定时器句柄
ud := L.NewUserData()
ud.Value = handle
L.SetMetatable(ud, L.GetTypeMetatable("ngx.timer.handle"))
L.Push(ud)
return 1
}))
// ngx.timer.running_count()
L.SetField(timer, "running_count", L.NewFunction(func(L *glua.LState) int {
L.Push(glua.LNumber(manager.ActiveCount()))
return 1
}))
L.SetField(ngx, "timer", timer)
// 创建定时器句柄元表
mt := L.NewTypeMetatable("ngx.timer.handle")
L.SetField(mt, "__index", L.NewFunction(timerHandleIndex))
L.SetField(mt, "__tostring", L.NewFunction(timerHandleToString))
// 注册方法
methods := L.NewTable()
L.SetField(methods, "cancel", L.NewFunction(timerHandleCancel))
L.SetField(mt, "methods", methods)
}
// timerHandleIndex 定时器句柄索引
func timerHandleIndex(L *glua.LState) int {
ud := L.CheckUserData(1)
_, ok := ud.Value.(*TimerHandle)
if !ok {
L.RaiseError("invalid timer handle")
return 0
}
// 检查是否是方法
//nolint:errcheck // 类型断言检查
methods := L.GetField(L.Get(1).(*glua.LUserData).Metatable, "methods")
if method := L.GetField(methods, L.CheckString(2)); method != glua.LNil {
L.Push(method)
return 1
}
L.Push(glua.LNil)
return 1
}
// timerHandleToString 定时器句柄字符串表示
func timerHandleToString(L *glua.LState) int {
ud := L.CheckUserData(1)
handle, ok := ud.Value.(*TimerHandle)
if !ok {
L.Push(glua.LString("invalid timer handle"))
return 1
}
L.Push(glua.LString("ngx.timer.handle:" + uint64ToStr(handle.id)))
return 1
}
// timerHandleCancel 取消定时器
func timerHandleCancel(L *glua.LState) int {
ud := L.CheckUserData(1)
handle, ok := ud.Value.(*TimerHandle)
if !ok {
L.RaiseError("invalid timer handle")
return 0
}
if handle.manager == nil {
L.Push(glua.LFalse)
L.Push(glua.LString("timer manager not available"))
return 2
}
ok = handle.manager.Cancel(handle)
if ok {
L.Push(glua.LTrue)
return 1
}
L.Push(glua.LFalse)
L.Push(glua.LString("timer not found or already executed"))
return 2
}
// uint64ToStr 整数转字符串
func uint64ToStr(n uint64) string {
if n == 0 {
return "0"
}
var buf []byte
for n > 0 {
buf = append(buf, byte('0'+n%10))
n /= 10
}
// 反转
for i, j := 0, len(buf)-1; i < j; i, j = i+1, j-1 {
buf[i], buf[j] = buf[j], buf[i]
}
return string(buf)
}