fix(lua): 修复 Lua 引擎并发安全问题

- 缓存 coroutine.yield/status 函数,避免并发读取全局 Lua 状态机
- 添加 ngxRegisterMu 锁保护共享 ngx 表的并发写入
- 各 API 注册函数检查字段是否已存在,避免重复写入
- TCPSocket.currentOp 字段添加锁保护

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
xfy 2026-04-29 14:19:11 +08:00
parent dbeb2217a5
commit 247fa81c00
11 changed files with 161 additions and 125 deletions

View File

@ -31,7 +31,7 @@ ulw 运行 make lint并修复
ulw 深度分析下当前项目的性能
ulw 完善性能基准测试
ulw @Makefile 完善性能基准测试
ulw 深度分析下代码质量

View File

@ -578,15 +578,13 @@ ngx.header["X-Lua-Processed"] = "true"`
b.ResetTimer()
b.ReportAllocs()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
ctx := &fasthttp.RequestCtx{}
ctx.Request.SetRequestURI("/api/test")
ctx.Request.Header.SetMethod(fasthttp.MethodGet)
ctx.Request.Header.Set("Host", "example.com")
finalHandler(ctx)
}
})
for b.Loop() {
ctx := &fasthttp.RequestCtx{}
ctx.Request.SetRequestURI("/api/test")
ctx.Request.Header.SetMethod(fasthttp.MethodGet)
ctx.Request.Header.Set("Host", "example.com")
finalHandler(ctx)
}
}
// BenchmarkE2EMultiLuaPhase 基准测试多 Lua 阶段执行路径。
@ -650,15 +648,13 @@ func BenchmarkE2EMultiLuaPhase(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
ctx := &fasthttp.RequestCtx{}
ctx.Request.SetRequestURI("/api/test")
ctx.Request.Header.SetMethod(fasthttp.MethodGet)
ctx.Request.Header.Set("Host", "example.com")
finalHandler(ctx)
}
})
for b.Loop() {
ctx := &fasthttp.RequestCtx{}
ctx.Request.SetRequestURI("/api/test")
ctx.Request.Header.SetMethod(fasthttp.MethodGet)
ctx.Request.Header.Set("Host", "example.com")
finalHandler(ctx)
}
}
// ============================================================
@ -1231,8 +1227,9 @@ func BenchmarkE2EStaticWithCompression(b *testing.B) {
// 创建可压缩的内容
jsonContent := make([]byte, 20*1024) // 20KB JSON
template := `{"key":"value","data":"repeat"}`
for i := range jsonContent {
jsonContent[i] = `{"key":"value","data":"repeat"}`[i%32]
jsonContent[i] = template[i%len(template)]
}
jsonPath := filepath.Join(staticDir, "compressible.json")
if err := os.WriteFile(jsonPath, jsonContent, 0o644); err != nil {

View File

@ -23,6 +23,11 @@ import (
// - L: Lua 状态
// - ngxTable: ngx 全局表
func RegisterNgxCtxAPI(L *glua.LState, ngxTable *glua.LTable) {
// 检查 ngx.ctx 是否已存在,避免并发写入
if existing := ngxTable.RawGetString("ctx"); existing != glua.LNil {
return
}
// 创建请求级的 ctx table
ctxTable := L.NewTable()

View File

@ -108,7 +108,10 @@ func newNgxLogAPI(ctx *fasthttp.RequestCtx, luaCtx *LuaContext, logger *zerolog.
}
}
// RegisterNgxLogAPI 在 Lua 状态机中注册 ngx.log 和输出控制 API
// RegisterNgxLogAPI 在 Lua 状态机中注册 ngx.log 和输出控制 API。
//
// 常量日志级别、HTTP状态码等只在首次注册时写入避免并发写入冲突。
// 每次请求都会重新注册请求特定的函数log, say, print, flush, exit, redirect
func RegisterNgxLogAPI(L *glua.LState, api *ngxLogAPI) {
// 获取或创建 ngx 表
var ngx *glua.LTable
@ -124,60 +127,64 @@ func RegisterNgxLogAPI(L *glua.LState, api *ngxLogAPI) {
ngx = L.NewTable()
}
// 注册日志级别常量
ngx.RawSetString("STDERR", glua.LNumber(LogStderr))
ngx.RawSetString("EMERG", glua.LNumber(LogEmerg))
ngx.RawSetString("ALERT", glua.LNumber(LogAlert))
ngx.RawSetString("CRIT", glua.LNumber(LogCrit))
ngx.RawSetString("ERR", glua.LNumber(LogErr))
ngx.RawSetString("WARN", glua.LNumber(LogWarn))
ngx.RawSetString("NOTICE", glua.LNumber(LogNotice))
ngx.RawSetString("INFO", glua.LNumber(LogInfo))
ngx.RawSetString("DEBUG", glua.LNumber(LogDebug))
// 检查常量是否已注册(通过 STDERR 常量判断)
// 如果已注册,跳过常量写入,避免并发写入全局表
if ngx.RawGetString("STDERR") == glua.LNil {
// 注册日志级别常量
ngx.RawSetString("STDERR", glua.LNumber(LogStderr))
ngx.RawSetString("EMERG", glua.LNumber(LogEmerg))
ngx.RawSetString("ALERT", glua.LNumber(LogAlert))
ngx.RawSetString("CRIT", glua.LNumber(LogCrit))
ngx.RawSetString("ERR", glua.LNumber(LogErr))
ngx.RawSetString("WARN", glua.LNumber(LogWarn))
ngx.RawSetString("NOTICE", glua.LNumber(LogNotice))
ngx.RawSetString("INFO", glua.LNumber(LogInfo))
ngx.RawSetString("DEBUG", glua.LNumber(LogDebug))
// 注册 HTTP 状态码常量
ngx.RawSetString("HTTP_CONTINUE", glua.LNumber(HTTPContinue))
ngx.RawSetString("HTTP_SWITCHING_PROTOCOLS", glua.LNumber(HTTPSwitchingProtocols))
ngx.RawSetString("HTTP_OK", glua.LNumber(HTTPOK))
ngx.RawSetString("HTTP_CREATED", glua.LNumber(HTTPCreated))
ngx.RawSetString("HTTP_ACCEPTED", glua.LNumber(HTTPAccepted))
ngx.RawSetString("HTTP_NO_CONTENT", glua.LNumber(HTTPNoContent))
ngx.RawSetString("HTTP_PARTIAL_CONTENT", glua.LNumber(HTTPPartialContent))
ngx.RawSetString("HTTP_MOVED_PERMANENTLY", glua.LNumber(HTTPMovedPermanently))
ngx.RawSetString("HTTP_FOUND", glua.LNumber(HTTPFound))
ngx.RawSetString("HTTP_SEE_OTHER", glua.LNumber(HTTPSeeOther))
ngx.RawSetString("HTTP_NOT_MODIFIED", glua.LNumber(HTTPNotModified))
ngx.RawSetString("HTTP_TEMPORARY_REDIRECT", glua.LNumber(HTTPTemporaryRedirect))
ngx.RawSetString("HTTP_PERMANENT_REDIRECT", glua.LNumber(HTTPPermanentRedirect))
ngx.RawSetString("HTTP_BAD_REQUEST", glua.LNumber(HTTPBadRequest))
ngx.RawSetString("HTTP_UNAUTHORIZED", glua.LNumber(HTTPUnauthorized))
ngx.RawSetString("HTTP_FORBIDDEN", glua.LNumber(HTTPForbidden))
ngx.RawSetString("HTTP_NOT_FOUND", glua.LNumber(HTTPNotFound))
ngx.RawSetString("HTTP_METHOD_NOT_ALLOWED", glua.LNumber(HTTPMethodNotAllowed))
ngx.RawSetString("HTTP_REQUEST_TIMEOUT", glua.LNumber(HTTPRequestTimeout))
ngx.RawSetString("HTTP_CONFLICT", glua.LNumber(HTTPConflict))
ngx.RawSetString("HTTP_GONE", glua.LNumber(HTTPGone))
ngx.RawSetString("HTTP_LENGTH_REQUIRED", glua.LNumber(HTTPLengthRequired))
ngx.RawSetString("HTTP_PAYLOAD_TOO_LARGE", glua.LNumber(HTTPPayloadTooLarge))
ngx.RawSetString("HTTP_URI_TOO_LONG", glua.LNumber(HTTPURITooLong))
ngx.RawSetString("HTTP_UNSUPPORTED_MEDIA_TYPE", glua.LNumber(HTTPUnsupportedMedia))
ngx.RawSetString("HTTP_RANGE_NOT_SATISFIABLE", glua.LNumber(HTTPRangeNotSatisfiable))
ngx.RawSetString("HTTP_TOO_MANY_REQUESTS", glua.LNumber(HTTPTooManyRequests))
ngx.RawSetString("HTTP_INTERNAL_SERVER_ERROR", glua.LNumber(HTTPInternalServerError))
ngx.RawSetString("HTTP_NOT_IMPLEMENTED", glua.LNumber(HTTPNotImplemented))
ngx.RawSetString("HTTP_BAD_GATEWAY", glua.LNumber(HTTPBadGateway))
ngx.RawSetString("HTTP_SERVICE_UNAVAILABLE", glua.LNumber(HTTPServiceUnavailable))
ngx.RawSetString("HTTP_GATEWAY_TIMEOUT", glua.LNumber(HTTPGatewayTimeout))
ngx.RawSetString("HTTP_VERSION_NOT_SUPPORTED", glua.LNumber(HTTPHTTPVersionNotSupported))
// 注册 HTTP 状态码常量
ngx.RawSetString("HTTP_CONTINUE", glua.LNumber(HTTPContinue))
ngx.RawSetString("HTTP_SWITCHING_PROTOCOLS", glua.LNumber(HTTPSwitchingProtocols))
ngx.RawSetString("HTTP_OK", glua.LNumber(HTTPOK))
ngx.RawSetString("HTTP_CREATED", glua.LNumber(HTTPCreated))
ngx.RawSetString("HTTP_ACCEPTED", glua.LNumber(HTTPAccepted))
ngx.RawSetString("HTTP_NO_CONTENT", glua.LNumber(HTTPNoContent))
ngx.RawSetString("HTTP_PARTIAL_CONTENT", glua.LNumber(HTTPPartialContent))
ngx.RawSetString("HTTP_MOVED_PERMANENTLY", glua.LNumber(HTTPMovedPermanently))
ngx.RawSetString("HTTP_FOUND", glua.LNumber(HTTPFound))
ngx.RawSetString("HTTP_SEE_OTHER", glua.LNumber(HTTPSeeOther))
ngx.RawSetString("HTTP_NOT_MODIFIED", glua.LNumber(HTTPNotModified))
ngx.RawSetString("HTTP_TEMPORARY_REDIRECT", glua.LNumber(HTTPTemporaryRedirect))
ngx.RawSetString("HTTP_PERMANENT_REDIRECT", glua.LNumber(HTTPPermanentRedirect))
ngx.RawSetString("HTTP_BAD_REQUEST", glua.LNumber(HTTPBadRequest))
ngx.RawSetString("HTTP_UNAUTHORIZED", glua.LNumber(HTTPUnauthorized))
ngx.RawSetString("HTTP_FORBIDDEN", glua.LNumber(HTTPForbidden))
ngx.RawSetString("HTTP_NOT_FOUND", glua.LNumber(HTTPNotFound))
ngx.RawSetString("HTTP_METHOD_NOT_ALLOWED", glua.LNumber(HTTPMethodNotAllowed))
ngx.RawSetString("HTTP_REQUEST_TIMEOUT", glua.LNumber(HTTPRequestTimeout))
ngx.RawSetString("HTTP_CONFLICT", glua.LNumber(HTTPConflict))
ngx.RawSetString("HTTP_GONE", glua.LNumber(HTTPGone))
ngx.RawSetString("HTTP_LENGTH_REQUIRED", glua.LNumber(HTTPLengthRequired))
ngx.RawSetString("HTTP_PAYLOAD_TOO_LARGE", glua.LNumber(HTTPPayloadTooLarge))
ngx.RawSetString("HTTP_URI_TOO_LONG", glua.LNumber(HTTPURITooLong))
ngx.RawSetString("HTTP_UNSUPPORTED_MEDIA_TYPE", glua.LNumber(HTTPUnsupportedMedia))
ngx.RawSetString("HTTP_RANGE_NOT_SATISFIABLE", glua.LNumber(HTTPRangeNotSatisfiable))
ngx.RawSetString("HTTP_TOO_MANY_REQUESTS", glua.LNumber(HTTPTooManyRequests))
ngx.RawSetString("HTTP_INTERNAL_SERVER_ERROR", glua.LNumber(HTTPInternalServerError))
ngx.RawSetString("HTTP_NOT_IMPLEMENTED", glua.LNumber(HTTPNotImplemented))
ngx.RawSetString("HTTP_BAD_GATEWAY", glua.LNumber(HTTPBadGateway))
ngx.RawSetString("HTTP_SERVICE_UNAVAILABLE", glua.LNumber(HTTPServiceUnavailable))
ngx.RawSetString("HTTP_GATEWAY_TIMEOUT", glua.LNumber(HTTPGatewayTimeout))
ngx.RawSetString("HTTP_VERSION_NOT_SUPPORTED", glua.LNumber(HTTPHTTPVersionNotSupported))
// 特殊常量
ngx.RawSetString("ERROR", glua.LNumber(-1))
ngx.RawSetString("OK", glua.LNumber(0))
ngx.RawSetString("AGAIN", glua.LNumber(-2))
ngx.RawSetString("DONE", glua.LNumber(-4))
ngx.RawSetString("DECLINED", glua.LNumber(-5))
// 特殊常量
ngx.RawSetString("ERROR", glua.LNumber(-1))
ngx.RawSetString("OK", glua.LNumber(0))
ngx.RawSetString("AGAIN", glua.LNumber(-2))
ngx.RawSetString("DONE", glua.LNumber(-4))
ngx.RawSetString("DECLINED", glua.LNumber(-5))
}
// 注册 ngx.log 函数
// 注册 ngx.log 函数(每次请求重新注册以绑定正确的 ctx
ngx.RawSetString("log", L.NewFunction(api.luaLog))
// 注册输出控制函数

View File

@ -105,8 +105,15 @@ func newNgxReqAPI(ctx *fasthttp.RequestCtx) *ngxReqAPI {
// RegisterNgxReqAPI 在 Lua 状态机中注册 ngx.req API
// 这是主入口函数,由 LuaEngine 在初始化时调用
func RegisterNgxReqAPI(L *glua.LState, api *ngxReqAPI, ngxTable *glua.LTable) {
// 创建 ngx.req 子表
ngxReq := L.NewTable()
// 检查 ngx.req 是否已存在,避免并发写入
var ngxReq *glua.LTable
if existingReq := ngxTable.RawGetString("req"); existingReq == glua.LNil {
// 首次创建 ngx.req 子表
ngxReq = L.NewTable()
ngxTable.RawSetString("req", ngxReq)
} else {
ngxReq = existingReq.(*glua.LTable)
}
// 直接映射层 APIget_method
// 特点:直接访问 fasthttp.RequestCtx零拷贝最小开销
@ -148,9 +155,6 @@ func RegisterNgxReqAPI(L *glua.LState, api *ngxReqAPI, ngxTable *glua.LTable) {
// 伪非阻塞层 APIread_body
// 特点确保请求体已被读取fasthttp 已预读)
ngxReq.RawSetString("read_body", L.NewFunction(api.luaReadBody))
// 将 ngx.req 添加到 ngx 表
ngxTable.RawSetString("req", ngxReq)
}
// ==================== 直接映射层 API ====================

View File

@ -54,31 +54,28 @@ func RegisterNgxRespAPI(L *glua.LState, api *ngxRespAPI) {
L.SetGlobal("ngx", ngx)
}
// 创建 ngx.resp 子表
ngxResp := L.NewTable()
// ngx.resp.get_status() - 获取响应状态码
ngxResp.RawSetString("get_status", L.NewFunction(api.luaGetStatus))
// ngx.resp.set_status(code) - 设置响应状态码
ngxResp.RawSetString("set_status", L.NewFunction(api.luaSetStatus))
// ngx.resp.get_headers(max_headers?) - 获取响应头表
ngxResp.RawSetString("get_headers", L.NewFunction(api.luaGetHeaders))
// ngx.resp.set_header(key, value) - 设置响应头
ngxResp.RawSetString("set_header", L.NewFunction(api.luaSetHeader))
// ngx.resp.clear_header(key) - 清除响应头
ngxResp.RawSetString("clear_header", L.NewFunction(api.luaClearHeader))
// 将 ngx.resp 添加到 ngx
// 类型断言检查
ngxTable, ok := ngx.(*glua.LTable)
if !ok {
return
}
ngxTable.RawSetString("resp", ngxResp)
// 检查 ngx.resp 是否已存在,避免并发写入
var ngxResp *glua.LTable
if existingResp := ngxTable.RawGetString("resp"); existingResp == glua.LNil {
// 首次创建 ngx.resp 子表
ngxResp = L.NewTable()
ngxTable.RawSetString("resp", ngxResp)
} else {
ngxResp = existingResp.(*glua.LTable)
}
// 每次请求更新函数以绑定正确的 ctx
ngxResp.RawSetString("get_status", L.NewFunction(api.luaGetStatus))
ngxResp.RawSetString("set_status", L.NewFunction(api.luaSetStatus))
ngxResp.RawSetString("get_headers", L.NewFunction(api.luaGetHeaders))
ngxResp.RawSetString("set_header", L.NewFunction(api.luaSetHeader))
ngxResp.RawSetString("clear_header", L.NewFunction(api.luaClearHeader))
}
// ==================== API 实现 ====================

View File

@ -135,12 +135,16 @@ func (s *TCPSocket) Connect(host string, port int) error {
// 开始操作
op := s.manager.StartOperation(s, OpConnect, s.connectTimeout)
s.mu.Lock()
s.currentOp = op
s.mu.Unlock()
// 在 goroutine 中执行连接
go func() {
defer func() {
s.mu.Lock()
s.currentOp = nil
s.mu.Unlock()
}()
dialer := &net.Dialer{
@ -533,12 +537,6 @@ const tcpSocketMT = "tcp_socket"
// RegisterTCPSocketAPI 注册 TCP socket API
func RegisterTCPSocketAPI(L *glua.LState, engine *LuaEngine) {
// 创建 ngx.socket 表
socket := L.NewTable()
// ngx.socket.tcp()
socket.RawSetString("tcp", L.NewFunction(newTCPSocketFunc(engine)))
// 确保 ngx 表存在
ngx := L.GetGlobal("ngx")
var ngxTbl *glua.LTable
@ -549,9 +547,21 @@ func RegisterTCPSocketAPI(L *glua.LState, engine *LuaEngine) {
ngxTbl = L.NewTable()
L.SetGlobal("ngx", ngxTbl)
}
ngxTbl.RawSetString("socket", socket)
// 注册元表
// 检查 ngx.socket 是否已存在,避免并发写入
var socket *glua.LTable
if existing := ngxTbl.RawGetString("socket"); existing == glua.LNil {
// 首次创建 ngx.socket 表
socket = L.NewTable()
ngxTbl.RawSetString("socket", socket)
} else {
socket = existing.(*glua.LTable)
}
// 每次请求更新 tcp 函数以绑定正确的 engine
socket.RawSetString("tcp", L.NewFunction(newTCPSocketFunc(engine)))
// 注册元表(仅首次)
registerTCPSocketMetaTable(L)
}

View File

@ -57,6 +57,11 @@ func newNgxVarAPI(ctx *fasthttp.RequestCtx) *ngxVarAPI {
// RegisterNgxVarAPI 在 Lua 状态机中注册 ngx.var API
// 使用元表实现动态读写ngx.var.key 和 ngx.var[key]
func RegisterNgxVarAPI(L *glua.LState, api *ngxVarAPI, ngxTable *glua.LTable) {
// 检查 ngx 表是否已存在 var 字段,避免重复写入
if existing := ngxTable.RawGetString("var"); existing != glua.LNil {
return
}
// 创建 ngx.var 表(使用元表实现动态访问)
ngxVar := L.NewTable()

View File

@ -173,26 +173,15 @@ func (c *LuaCoroutine) SetupSandbox() error {
// 仅保留安全的 yield 和 status 函数。
// 被拦截的函数返回友好错误消息,而非直接崩溃。
func (c *LuaCoroutine) setupSecureCoroutineLib() {
// 获取原始 coroutine 表
originalCoroutine := c.Engine.L.GetGlobal("coroutine")
if originalCoroutine == glua.LNil {
return // coroutine 库未加载
}
origTable, ok := originalCoroutine.(*glua.LTable)
if !ok {
return
}
// 创建安全的 coroutine 表
// 创建安全的 coroutine 表(使用 Engine 缓存的函数,避免并发读取)
safeCoroutine := c.Co.NewTable()
// 仅保留安全的函数yield 和 status
if yield := origTable.RawGetString("yield"); yield != glua.LNil {
safeCoroutine.RawSetString("yield", yield)
// 使用缓存的函数(在 Engine.NewEngine 时已获取)
if c.Engine.coroYieldFn != nil && c.Engine.coroYieldFn != glua.LNil {
safeCoroutine.RawSetString("yield", c.Engine.coroYieldFn)
}
if status := origTable.RawGetString("status"); status != glua.LNil {
safeCoroutine.RawSetString("status", status)
if c.Engine.coroStatusFn != nil && c.Engine.coroStatusFn != glua.LNil {
safeCoroutine.RawSetString("status", c.Engine.coroStatusFn)
}
// 拦截函数 - 返回友好错误
@ -226,6 +215,10 @@ func (c *LuaCoroutine) setupSecureCoroutineLib() {
// - ngx.timer定时器
// - ngx.location子请求
func (c *LuaCoroutine) setupNgxAPI() {
// 加锁保护对共享 ngx 表的并发写入
c.Engine.ngxRegisterMu.Lock()
defer c.Engine.ngxRegisterMu.Unlock()
// 创建 ngx 表
ngx := c.Co.NewTable()

View File

@ -77,6 +77,13 @@ type LuaEngine struct {
// 回调队列,定时器触发后将回调入队
callbackQueue chan *CallbackEntry
// 缓存coroutine 库函数(避免并发读取 Engine.L
coroYieldFn glua.LValue
coroStatusFn glua.LValue
// ngx 表注册锁(保护并发写入共享的全局 ngx 表)
ngxRegisterMu sync.Mutex
// 上下文及取消函数
ctx context.Context
cancel context.CancelFunc
@ -195,6 +202,15 @@ func NewEngine(config *Config) (*LuaEngine, error) {
}
}
// 步骤7: 缓存 coroutine 库的安全函数(避免并发读取 Engine.L
coroTable := L.GetGlobal("coroutine")
if coroTable != glua.LNil {
if tbl, ok := coroTable.(*glua.LTable); ok {
engine.coroYieldFn = tbl.RawGetString("yield")
engine.coroStatusFn = tbl.RawGetString("status")
}
}
return engine, nil
}

View File

@ -175,13 +175,15 @@ func TestDelayedResponseWriter_WithLuaEngine(t *testing.T) {
_ = err
}
// BenchmarkResponseInterceptor 基准测试响应拦截器
// BenchmarkResponseInterceptor 基准测试响应拦截器。
//
// 注意:每个 goroutine 必须创建独立的 RequestCtx因为 fasthttp.RequestCtx
// 不是并发安全的。Flush() 会修改 ResponseHeader 的内部 map。
func BenchmarkResponseInterceptor(b *testing.B) {
ctx := mockRequestCtx()
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
ctx := mockRequestCtx()
ri := NewResponseInterceptor(ctx)
ri.Enable()
ri.WriteString("Hello, World!")