1351 lines
36 KiB
Go
1351 lines
36 KiB
Go
package lua
|
||
|
||
import (
|
||
"context"
|
||
"fmt"
|
||
"net"
|
||
"sync"
|
||
"testing"
|
||
"time"
|
||
|
||
"github.com/stretchr/testify/assert"
|
||
"github.com/stretchr/testify/require"
|
||
glua "github.com/yuin/gopher-lua"
|
||
)
|
||
|
||
// TestNewTCPSocket_NilManager 测试 nil manager 使用默认管理器
|
||
func TestNewTCPSocket_NilManager(t *testing.T) {
|
||
socket := NewTCPSocket(nil)
|
||
require.NotNil(t, socket)
|
||
assert.Equal(t, SocketStateIdle, socket.State())
|
||
assert.Equal(t, 60*time.Second, socket.readTimeout)
|
||
assert.Equal(t, 60*time.Second, socket.sendTimeout)
|
||
assert.Equal(t, 30*time.Second, socket.connectTimeout)
|
||
assert.False(t, socket.IsClosed())
|
||
socket.Close()
|
||
}
|
||
|
||
// TestNewTCPSocket_ExplicitManager 测试显式管理器
|
||
func TestNewTCPSocket_ExplicitManager(t *testing.T) {
|
||
cm := NewCosocketManager()
|
||
defer cm.Close()
|
||
|
||
socket := NewTCPSocket(cm)
|
||
require.NotNil(t, socket)
|
||
assert.Equal(t, cm, socket.manager)
|
||
socket.Close()
|
||
}
|
||
|
||
// TestTCPSocket_ConnectNotIdle 测试非空闲状态下连接失败
|
||
func TestTCPSocket_ConnectNotIdle(t *testing.T) {
|
||
cm := NewCosocketManager()
|
||
defer cm.Close()
|
||
|
||
socket := NewTCPSocket(cm)
|
||
defer socket.Close()
|
||
|
||
// 设置状态为非空闲
|
||
socket.setState(SocketStateConnected)
|
||
defer socket.setState(SocketStateIdle)
|
||
|
||
err := socket.Connect("127.0.0.1", 9999)
|
||
assert.Error(t, err)
|
||
assert.Contains(t, err.Error(), "socket not idle")
|
||
}
|
||
|
||
// TestTCPSocket_Send_NotConnected 测试未连接时发送失败
|
||
func TestTCPSocket_Send_NotConnected(t *testing.T) {
|
||
cm := NewCosocketManager()
|
||
defer cm.Close()
|
||
|
||
socket := NewTCPSocket(cm)
|
||
defer socket.Close()
|
||
|
||
// 未连接状态下发送
|
||
n, err := socket.Send([]byte("test"))
|
||
assert.Error(t, err)
|
||
assert.Equal(t, 0, n)
|
||
assert.Contains(t, err.Error(), "socket not connected")
|
||
}
|
||
|
||
// TestTCPSocket_SendAsync_NotConnected 测试未连接时异步发送失败
|
||
func TestTCPSocket_SendAsync_NotConnected(t *testing.T) {
|
||
cm := NewCosocketManager()
|
||
defer cm.Close()
|
||
|
||
socket := NewTCPSocket(cm)
|
||
defer socket.Close()
|
||
|
||
op, err := socket.SendAsync([]byte("test"))
|
||
assert.Error(t, err)
|
||
assert.Nil(t, op)
|
||
assert.Contains(t, err.Error(), "socket not connected")
|
||
}
|
||
|
||
// TestTCPSocket_Receive_NotConnected 测试未连接时接收失败
|
||
func TestTCPSocket_Receive_NotConnected(t *testing.T) {
|
||
cm := NewCosocketManager()
|
||
defer cm.Close()
|
||
|
||
socket := NewTCPSocket(cm)
|
||
defer socket.Close()
|
||
|
||
data, err := socket.Receive(1024)
|
||
assert.Error(t, err)
|
||
assert.Nil(t, data)
|
||
assert.Contains(t, err.Error(), "socket not connected")
|
||
}
|
||
|
||
// TestTCPSocket_ReceiveAsync_NotConnected 测试未连接时异步接收失败
|
||
func TestTCPSocket_ReceiveAsync_NotConnected(t *testing.T) {
|
||
cm := NewCosocketManager()
|
||
defer cm.Close()
|
||
|
||
socket := NewTCPSocket(cm)
|
||
defer socket.Close()
|
||
|
||
op, err := socket.ReceiveAsync(1024)
|
||
assert.Error(t, err)
|
||
assert.Nil(t, op)
|
||
assert.Contains(t, err.Error(), "socket not connected")
|
||
}
|
||
|
||
// TestTCPSocket_ReceiveUntil_NotConnected 测试未连接时 ReceiveUntil 失败
|
||
func TestTCPSocket_ReceiveUntil_NotConnected(t *testing.T) {
|
||
cm := NewCosocketManager()
|
||
defer cm.Close()
|
||
|
||
socket := NewTCPSocket(cm)
|
||
defer socket.Close()
|
||
|
||
// 设置为 connected 状态但 conn 为 nil
|
||
socket.setState(SocketStateConnected)
|
||
defer socket.setState(SocketStateIdle)
|
||
|
||
data, err := socket.ReceiveUntil("\n", true)
|
||
assert.Error(t, err)
|
||
assert.Nil(t, data)
|
||
assert.Contains(t, err.Error(), "socket connection is nil")
|
||
}
|
||
|
||
// TestTCPSocket_ReceiveUntil_EmptyPattern 测试空模式错误
|
||
func TestTCPSocket_ReceiveUntil_EmptyPattern(t *testing.T) {
|
||
cm := NewCosocketManager()
|
||
defer cm.Close()
|
||
|
||
socket := NewTCPSocket(cm)
|
||
defer socket.Close()
|
||
|
||
data, err := socket.ReceiveUntil("", true)
|
||
assert.Error(t, err)
|
||
assert.Nil(t, data)
|
||
assert.Contains(t, err.Error(), "pattern cannot be empty")
|
||
}
|
||
|
||
// TestTCPSocket_SetTimeouts 测试设置超时
|
||
func TestTCPSocket_SetTimeouts(t *testing.T) {
|
||
cm := NewCosocketManager()
|
||
defer cm.Close()
|
||
|
||
socket := NewTCPSocket(cm)
|
||
defer socket.Close()
|
||
|
||
socket.SetTimeout(5 * time.Second)
|
||
assert.Equal(t, 5*time.Second, socket.readTimeout)
|
||
assert.Equal(t, 5*time.Second, socket.sendTimeout)
|
||
assert.Equal(t, 5*time.Second, socket.connectTimeout)
|
||
|
||
socket.SetReadTimeout(10 * time.Second)
|
||
assert.Equal(t, 10*time.Second, socket.readTimeout)
|
||
|
||
socket.SetSendTimeout(15 * time.Second)
|
||
assert.Equal(t, 15*time.Second, socket.sendTimeout)
|
||
|
||
socket.SetConnectTimeout(20 * time.Second)
|
||
assert.Equal(t, 20*time.Second, socket.connectTimeout)
|
||
}
|
||
|
||
// TestTCPSocket_StateString 测试状态字符串
|
||
func TestTCPSocket_StateString(t *testing.T) {
|
||
assert.Equal(t, "idle", SocketStateIdle.String())
|
||
assert.Equal(t, "connecting", SocketStateConnecting.String())
|
||
assert.Equal(t, "connected", SocketStateConnected.String())
|
||
assert.Equal(t, "sending", SocketStateSending.String())
|
||
assert.Equal(t, "receiving", SocketStateReceiving.String())
|
||
assert.Equal(t, "closing", SocketStateClosing.String())
|
||
assert.Equal(t, "closed", SocketStateClosed.String())
|
||
assert.Equal(t, "error", SocketStateError.String())
|
||
// 未知状态
|
||
assert.Equal(t, "unknown", SocketState(999).String())
|
||
}
|
||
|
||
// TestTCPSocket_LocalAddr_RemoteAddr_NotConnected 测试未连接时地址返回 nil
|
||
func TestTCPSocket_LocalAddr_RemoteAddr_NotConnected(t *testing.T) {
|
||
cm := NewCosocketManager()
|
||
defer cm.Close()
|
||
|
||
socket := NewTCPSocket(cm)
|
||
defer socket.Close()
|
||
|
||
assert.Nil(t, socket.LocalAddr())
|
||
assert.Nil(t, socket.RemoteAddr())
|
||
}
|
||
|
||
// TestTCPSocket_ConnectAsync 测试 ConnectAsync
|
||
func TestTCPSocket_ConnectAsync(t *testing.T) {
|
||
_, cleanup := mockEchoServer(t, "127.0.0.1:18801")
|
||
defer cleanup()
|
||
|
||
cm := NewCosocketManager()
|
||
defer cm.Close()
|
||
|
||
socket := NewTCPSocket(cm)
|
||
defer socket.Close()
|
||
|
||
L := glua.NewState()
|
||
defer L.Close()
|
||
|
||
op, err := socket.ConnectAsync(L, "127.0.0.1", 18801)
|
||
require.NoError(t, err)
|
||
require.NotNil(t, op)
|
||
|
||
// 等待连接完成
|
||
result, err := op.Wait(context.Background())
|
||
require.NoError(t, err)
|
||
assert.NotNil(t, result)
|
||
}
|
||
|
||
// TestTCPSocket_ConnectAsync_Error 测试 ConnectAsync 错误路径
|
||
func TestTCPSocket_ConnectAsync_Error(t *testing.T) {
|
||
cm := NewCosocketManager()
|
||
defer cm.Close()
|
||
|
||
socket := NewTCPSocket(cm)
|
||
defer socket.Close()
|
||
|
||
// 先设置为非空闲状态,ConnectAsync 应失败
|
||
socket.setState(SocketStateConnected)
|
||
defer socket.setState(SocketStateIdle)
|
||
|
||
L := glua.NewState()
|
||
defer L.Close()
|
||
|
||
op, err := socket.ConnectAsync(L, "127.0.0.1", 18800)
|
||
assert.Error(t, err)
|
||
assert.Nil(t, op)
|
||
}
|
||
|
||
// TestTCPSocket_Connect_Failure 测试连接失败(无服务器)
|
||
func TestTCPSocket_Connect_Failure(t *testing.T) {
|
||
cm := NewCosocketManager()
|
||
defer cm.Close()
|
||
|
||
socket := NewTCPSocket(cm)
|
||
defer socket.Close()
|
||
|
||
// 连接到不存在的端口,应该返回错误
|
||
err := socket.Connect("127.0.0.1", 18899)
|
||
require.NoError(t, err) // Connect 本身不报错
|
||
|
||
// 等待异步连接完成
|
||
socket.mu.RLock()
|
||
op := socket.currentOp
|
||
socket.mu.RUnlock()
|
||
if op != nil {
|
||
_, err := op.Wait(context.Background())
|
||
assert.Error(t, err) // 连接应该失败
|
||
}
|
||
|
||
// 状态应该变为 error
|
||
assert.Equal(t, SocketStateError, socket.State())
|
||
}
|
||
|
||
// TestTCPSocket_Receive_DefaultSize 测试默认读取大小 (size <= 0)
|
||
func TestTCPSocket_Receive_DefaultSize(t *testing.T) {
|
||
_, cleanup := mockEchoServer(t, "127.0.0.1:18802")
|
||
defer cleanup()
|
||
|
||
cm := NewCosocketManager()
|
||
defer cm.Close()
|
||
|
||
socket := NewTCPSocket(cm)
|
||
defer socket.Close()
|
||
|
||
// 连接
|
||
if err := socket.Connect("127.0.0.1", 18802); err != nil {
|
||
t.Fatalf("Connect failed: %v", err)
|
||
}
|
||
time.Sleep(100 * time.Millisecond)
|
||
|
||
// 发送数据
|
||
testData := "Hello"
|
||
_, err := socket.Send([]byte(testData))
|
||
require.NoError(t, err)
|
||
|
||
// 使用 size=0 触发默认 4096
|
||
received, err := socket.Receive(0)
|
||
require.NoError(t, err)
|
||
assert.Equal(t, testData, string(received))
|
||
}
|
||
|
||
// TestTCPSocket_ReceiveAsync_DefaultSize 测试异步接收默认大小
|
||
func TestTCPSocket_ReceiveAsync_DefaultSize(t *testing.T) {
|
||
_, cleanup := mockEchoServer(t, "127.0.0.1:18803")
|
||
defer cleanup()
|
||
|
||
cm := NewCosocketManager()
|
||
defer cm.Close()
|
||
|
||
socket := NewTCPSocket(cm)
|
||
defer socket.Close()
|
||
|
||
if err := socket.Connect("127.0.0.1", 18803); err != nil {
|
||
t.Fatalf("Connect failed: %v", err)
|
||
}
|
||
time.Sleep(100 * time.Millisecond)
|
||
|
||
// 发送
|
||
testData := "AsyncReceive"
|
||
_, err := socket.Send([]byte(testData))
|
||
require.NoError(t, err)
|
||
|
||
// 异步接收,size=-1 触发默认
|
||
op, err := socket.ReceiveAsync(-1)
|
||
require.NoError(t, err)
|
||
require.NotNil(t, op)
|
||
|
||
result, err := op.Wait(context.Background())
|
||
require.NoError(t, err)
|
||
data, ok := result.([]byte)
|
||
require.True(t, ok)
|
||
assert.Equal(t, testData, string(data))
|
||
}
|
||
|
||
// TestTCPSocket_ReceiveUntil_Inclusive 测试 inclusive 模式
|
||
func TestTCPSocket_ReceiveUntil_Inclusive(t *testing.T) {
|
||
_, cleanup := mockEchoServer(t, "127.0.0.1:18804")
|
||
defer cleanup()
|
||
|
||
cm := NewCosocketManager()
|
||
defer cm.Close()
|
||
|
||
socket := NewTCPSocket(cm)
|
||
defer socket.Close()
|
||
|
||
if err := socket.Connect("127.0.0.1", 18804); err != nil {
|
||
t.Fatalf("Connect failed: %v", err)
|
||
}
|
||
time.Sleep(100 * time.Millisecond)
|
||
|
||
// 发送带分隔符的数据
|
||
testData := "hello|world"
|
||
_, err := socket.Send([]byte(testData))
|
||
require.NoError(t, err)
|
||
|
||
// inclusive=true: 包含模式
|
||
data, err := socket.ReceiveUntil("|", true)
|
||
require.NoError(t, err)
|
||
assert.Equal(t, "hello|", string(data))
|
||
}
|
||
|
||
// TestTCPSocket_ReceiveUntil_Exclusive 测试 exclusive 模式
|
||
func TestTCPSocket_ReceiveUntil_Exclusive(t *testing.T) {
|
||
_, cleanup := mockEchoServer(t, "127.0.0.1:18805")
|
||
defer cleanup()
|
||
|
||
cm := NewCosocketManager()
|
||
defer cm.Close()
|
||
|
||
socket := NewTCPSocket(cm)
|
||
defer socket.Close()
|
||
|
||
if err := socket.Connect("127.0.0.1", 18805); err != nil {
|
||
t.Fatalf("Connect failed: %v", err)
|
||
}
|
||
time.Sleep(100 * time.Millisecond)
|
||
|
||
testData := "hello|world"
|
||
_, err := socket.Send([]byte(testData))
|
||
require.NoError(t, err)
|
||
|
||
// inclusive=false: 不包含模式
|
||
data, err := socket.ReceiveUntil("|", false)
|
||
require.NoError(t, err)
|
||
assert.Equal(t, "hello", string(data))
|
||
}
|
||
|
||
// TestTCPSocket_Close_CompletesPendingOp 测试关闭时完成未完成的操作
|
||
func TestTCPSocket_Close_CompletesPendingOp(t *testing.T) {
|
||
// 使用 slow server 模拟延迟
|
||
ln, err := net.Listen("tcp", "127.0.0.1:18806")
|
||
require.NoError(t, err)
|
||
|
||
var wg sync.WaitGroup
|
||
stopCh := make(chan struct{})
|
||
|
||
// slow server: 接受连接但延迟响应
|
||
go func() {
|
||
for {
|
||
conn, err := ln.Accept()
|
||
if err != nil {
|
||
select {
|
||
case <-stopCh:
|
||
return
|
||
default:
|
||
continue
|
||
}
|
||
}
|
||
wg.Add(1)
|
||
go func(c net.Conn) {
|
||
defer wg.Done()
|
||
// 保持连接打开但不发送数据
|
||
buf := make([]byte, 1)
|
||
c.Read(buf)
|
||
c.Close()
|
||
}(conn)
|
||
}
|
||
}()
|
||
|
||
defer func() {
|
||
close(stopCh)
|
||
ln.Close()
|
||
wg.Wait()
|
||
}()
|
||
|
||
cm := NewCosocketManager()
|
||
defer cm.Close()
|
||
|
||
socket := NewTCPSocket(cm)
|
||
defer socket.Close()
|
||
|
||
// 连接
|
||
err = socket.Connect("127.0.0.1", 18806)
|
||
require.NoError(t, err)
|
||
time.Sleep(100 * time.Millisecond)
|
||
|
||
// 此时连接应该建立了
|
||
assert.Equal(t, SocketStateConnected, socket.State())
|
||
|
||
// 启动一个异步接收操作
|
||
op, err := socket.ReceiveAsync(1024)
|
||
require.NoError(t, err)
|
||
require.NotNil(t, op)
|
||
|
||
// 在操作完成前关闭 socket
|
||
err = socket.Close()
|
||
assert.NoError(t, err)
|
||
|
||
// 等待操作完成(应该被取消)
|
||
_, err = op.Wait(context.Background())
|
||
assert.Error(t, err)
|
||
assert.Contains(t, err.Error(), "socket closed")
|
||
}
|
||
|
||
// TestTCPSocket_Close_NilSafety 测试 nil socket 的 Close
|
||
func TestTCPSocket_Close_NilSafety(t *testing.T) {
|
||
var s *TCPSocket
|
||
err := s.Close()
|
||
assert.NoError(t, err)
|
||
}
|
||
|
||
// TestTCPSocket_DoubleClose 测试重复关闭
|
||
func TestTCPSocket_DoubleClose(t *testing.T) {
|
||
cm := NewCosocketManager()
|
||
defer cm.Close()
|
||
|
||
socket := NewTCPSocket(cm)
|
||
|
||
err := socket.Close()
|
||
assert.NoError(t, err)
|
||
|
||
err = socket.Close()
|
||
assert.NoError(t, err) // 第二次关闭不应报错
|
||
|
||
assert.True(t, socket.IsClosed())
|
||
}
|
||
|
||
// TestTCPSocket_Addresses_WhenConnected 测试连接后获取地址
|
||
func TestTCPSocket_Addresses_WhenConnected(t *testing.T) {
|
||
_, cleanup := mockEchoServer(t, "127.0.0.1:18807")
|
||
defer cleanup()
|
||
|
||
cm := NewCosocketManager()
|
||
defer cm.Close()
|
||
|
||
socket := NewTCPSocket(cm)
|
||
defer socket.Close()
|
||
|
||
if err := socket.Connect("127.0.0.1", 18807); err != nil {
|
||
t.Fatalf("Connect failed: %v", err)
|
||
}
|
||
time.Sleep(100 * time.Millisecond)
|
||
|
||
assert.NotNil(t, socket.LocalAddr())
|
||
assert.NotNil(t, socket.RemoteAddr())
|
||
assert.Contains(t, socket.RemoteAddr().String(), "127.0.0.1:18807")
|
||
}
|
||
|
||
// ---- Lua API Tests ----
|
||
|
||
// TestLuaAPI_newTCPSocketFunc 测试 newTCPSocketFunc
|
||
func TestLuaAPI_newTCPSocketFunc(t *testing.T) {
|
||
engine, err := NewEngine(DefaultConfig())
|
||
require.NoError(t, err)
|
||
defer engine.Close()
|
||
|
||
L := engine.GetLStateForTest()
|
||
defer engine.PutLStateForTest(L)
|
||
|
||
ngx := L.NewTable()
|
||
L.SetGlobal("ngx", ngx)
|
||
RegisterTCPSocketAPI(L, engine)
|
||
|
||
err = L.DoString(`
|
||
local sock = ngx.socket.tcp()
|
||
assert(sock ~= nil)
|
||
assert(type(sock) == "userdata")
|
||
`)
|
||
require.NoError(t, err)
|
||
}
|
||
|
||
// TestLuaAPI_tcpSocketConnect 测试 tcpSocketConnect
|
||
func TestLuaAPI_tcpSocketConnect(t *testing.T) {
|
||
engine, err := NewEngine(DefaultConfig())
|
||
require.NoError(t, err)
|
||
defer engine.Close()
|
||
|
||
L := engine.GetLStateForTest()
|
||
defer engine.PutLStateForTest(L)
|
||
|
||
ngx := L.NewTable()
|
||
L.SetGlobal("ngx", ngx)
|
||
RegisterTCPSocketAPI(L, engine)
|
||
|
||
// 测试 connect 返回值结构(不等待实际连接完成,因为没有 yield 处理)
|
||
err = L.DoString(`
|
||
local sock = ngx.socket.tcp()
|
||
local res1, res2 = sock:connect("127.0.0.1", 9999)
|
||
-- res1 应该是 "cosocket_connect",res2 是 op ID
|
||
assert(type(res1) == "string")
|
||
assert(res1 == "cosocket_connect")
|
||
assert(type(res2) == "number")
|
||
`)
|
||
require.NoError(t, err)
|
||
}
|
||
|
||
// TestLuaAPI_tcpSocketConnect_WithError 测试 connect 错误返回
|
||
func TestLuaAPI_tcpSocketConnect_WithError(t *testing.T) {
|
||
engine, err := NewEngine(DefaultConfig())
|
||
require.NoError(t, err)
|
||
defer engine.Close()
|
||
|
||
L := engine.GetLStateForTest()
|
||
defer engine.PutLStateForTest(L)
|
||
|
||
ngx := L.NewTable()
|
||
L.SetGlobal("ngx", ngx)
|
||
RegisterTCPSocketAPI(L, engine)
|
||
|
||
// 尝试连接到非空闲 socket(已连接过但这里没有,用非法端口)
|
||
err = L.DoString(`
|
||
local sock = ngx.socket.tcp()
|
||
-- 先用一个 nil 测试 connect 的 Lua 参数错误
|
||
local res, err = pcall(function()
|
||
sock:connect(123) -- wrong argument types
|
||
end)
|
||
`)
|
||
// Lua 可能 raise error,这取决于实现
|
||
_ = err
|
||
}
|
||
|
||
// TestLuaAPI_tcpSocketSend 测试 tcpSocketSend
|
||
func TestLuaAPI_tcpSocketSend(t *testing.T) {
|
||
_, cleanup := mockEchoServer(t, "127.0.0.1:18809")
|
||
defer cleanup()
|
||
|
||
engine, err := NewEngine(DefaultConfig())
|
||
require.NoError(t, err)
|
||
defer engine.Close()
|
||
|
||
L := engine.GetLStateForTest()
|
||
defer engine.PutLStateForTest(L)
|
||
|
||
ngx := L.NewTable()
|
||
L.SetGlobal("ngx", ngx)
|
||
RegisterTCPSocketAPI(L, engine)
|
||
|
||
// connect 和 send 都会返回 yield 值(cosocket_xxx, op_id)
|
||
// 在没有实际 yield 处理的情况下,只测试不报错
|
||
err = L.DoString(`
|
||
local sock = ngx.socket.tcp()
|
||
local res1, res2 = sock:connect("127.0.0.1", 18809)
|
||
-- res1 应该是 "cosocket_connect",res2 应该是 op ID
|
||
-- 没有 yield 处理,连接实际未完成
|
||
`)
|
||
require.NoError(t, err)
|
||
}
|
||
|
||
// TestLuaAPI_tcpSocketSend_Error 测试 send 错误(未连接时发送)
|
||
func TestLuaAPI_tcpSocketSend_Error(t *testing.T) {
|
||
engine, err := NewEngine(DefaultConfig())
|
||
require.NoError(t, err)
|
||
defer engine.Close()
|
||
|
||
L := engine.GetLStateForTest()
|
||
defer engine.PutLStateForTest(L)
|
||
|
||
ngx := L.NewTable()
|
||
L.SetGlobal("ngx", ngx)
|
||
RegisterTCPSocketAPI(L, engine)
|
||
|
||
err = L.DoString(`
|
||
local sock = ngx.socket.tcp()
|
||
local res, err = sock:send("hello")
|
||
-- 未连接时应该返回 nil + error
|
||
assert(res == nil)
|
||
assert(err ~= nil)
|
||
`)
|
||
require.NoError(t, err)
|
||
}
|
||
|
||
// TestLuaAPI_tcpSocketReceive 测试 tcpSocketReceive
|
||
func TestLuaAPI_tcpSocketReceive(t *testing.T) {
|
||
engine, err := NewEngine(DefaultConfig())
|
||
require.NoError(t, err)
|
||
defer engine.Close()
|
||
|
||
L := engine.GetLStateForTest()
|
||
defer engine.PutLStateForTest(L)
|
||
|
||
ngx := L.NewTable()
|
||
L.SetGlobal("ngx", ngx)
|
||
RegisterTCPSocketAPI(L, engine)
|
||
|
||
// 测试未连接时的 receive 返回错误
|
||
err = L.DoString(`
|
||
local sock = ngx.socket.tcp()
|
||
local res, err = sock:receive(1024)
|
||
-- 未连接时应该返回 nil + error
|
||
assert(res == nil)
|
||
assert(err ~= nil)
|
||
`)
|
||
require.NoError(t, err)
|
||
}
|
||
|
||
// TestLuaAPI_tcpSocketReceive_Error 测试 receive 错误(未连接时接收)
|
||
func TestLuaAPI_tcpSocketReceive_Error(t *testing.T) {
|
||
engine, err := NewEngine(DefaultConfig())
|
||
require.NoError(t, err)
|
||
defer engine.Close()
|
||
|
||
L := engine.GetLStateForTest()
|
||
defer engine.PutLStateForTest(L)
|
||
|
||
ngx := L.NewTable()
|
||
L.SetGlobal("ngx", ngx)
|
||
RegisterTCPSocketAPI(L, engine)
|
||
|
||
err = L.DoString(`
|
||
local sock = ngx.socket.tcp()
|
||
local res, err = sock:receive(1024)
|
||
assert(res == nil)
|
||
assert(err ~= nil)
|
||
`)
|
||
require.NoError(t, err)
|
||
}
|
||
|
||
// TestLuaAPI_tcpSocketReceive_Pattern 测试 receive 带模式
|
||
func TestLuaAPI_tcpSocketReceive_Pattern(t *testing.T) {
|
||
engine, err := NewEngine(DefaultConfig())
|
||
require.NoError(t, err)
|
||
defer engine.Close()
|
||
|
||
L := engine.GetLStateForTest()
|
||
defer engine.PutLStateForTest(L)
|
||
|
||
ngx := L.NewTable()
|
||
L.SetGlobal("ngx", ngx)
|
||
RegisterTCPSocketAPI(L, engine)
|
||
|
||
// 测试未连接时 receive("*a") 返回错误
|
||
err = L.DoString(`
|
||
local sock = ngx.socket.tcp()
|
||
local res, err = sock:receive("*a")
|
||
assert(res == nil)
|
||
assert(err ~= nil)
|
||
`)
|
||
require.NoError(t, err)
|
||
}
|
||
|
||
// TestLuaAPI_tcpSocketReceive_UnknownPattern 测试未知模式
|
||
func TestLuaAPI_tcpSocketReceive_UnknownPattern(t *testing.T) {
|
||
engine, err := NewEngine(DefaultConfig())
|
||
require.NoError(t, err)
|
||
defer engine.Close()
|
||
|
||
cm := NewCosocketManager()
|
||
defer cm.Close()
|
||
|
||
L := engine.GetLStateForTest()
|
||
defer engine.PutLStateForTest(L)
|
||
|
||
ngx := L.NewTable()
|
||
L.SetGlobal("ngx", ngx)
|
||
RegisterTCPSocketAPI(L, engine)
|
||
|
||
// 测试 unknown pattern "*x"
|
||
// 需要先连接才能进入 pattern 匹配,但这里直接测试模式错误路径
|
||
// 实际上 receive("*x") 在未连接时会先报 "not connected"
|
||
err = L.DoString(`
|
||
local sock = ngx.socket.tcp()
|
||
-- 未连接时用 *x 模式会先报 not connected
|
||
local res, err = sock:receive("*x")
|
||
assert(res == nil)
|
||
assert(err ~= nil)
|
||
`)
|
||
require.NoError(t, err)
|
||
}
|
||
|
||
// TestLuaAPI_tcpSocketReceiveWithTable 测试 receive 带 table 参数
|
||
func TestLuaAPI_tcpSocketReceiveWithTable(t *testing.T) {
|
||
engine, err := NewEngine(DefaultConfig())
|
||
require.NoError(t, err)
|
||
defer engine.Close()
|
||
|
||
L := engine.GetLStateForTest()
|
||
defer engine.PutLStateForTest(L)
|
||
|
||
ngx := L.NewTable()
|
||
L.SetGlobal("ngx", ngx)
|
||
RegisterTCPSocketAPI(L, engine)
|
||
|
||
// 未连接时 receive 带 table 参数返回错误
|
||
err = L.DoString(`
|
||
local sock = ngx.socket.tcp()
|
||
local res, err = sock:receive({timeout = 5000})
|
||
assert(res == nil)
|
||
assert(err ~= nil)
|
||
`)
|
||
require.NoError(t, err)
|
||
}
|
||
|
||
// TestLuaAPI_tcpSocketReceiveUntil 测试 receiveuntil
|
||
func TestLuaAPI_tcpSocketReceiveUntil(t *testing.T) {
|
||
engine, err := NewEngine(DefaultConfig())
|
||
require.NoError(t, err)
|
||
defer engine.Close()
|
||
|
||
L := engine.GetLStateForTest()
|
||
defer engine.PutLStateForTest(L)
|
||
|
||
ngx := L.NewTable()
|
||
L.SetGlobal("ngx", ngx)
|
||
RegisterTCPSocketAPI(L, engine)
|
||
|
||
// 未连接时 receiveuntil 会先报 not connected
|
||
err = L.DoString(`
|
||
local sock = ngx.socket.tcp()
|
||
local res, err = sock:receiveuntil("|")
|
||
assert(res == nil)
|
||
assert(err ~= nil)
|
||
`)
|
||
require.NoError(t, err)
|
||
}
|
||
|
||
// TestLuaAPI_tcpSocketReceiveUntil_Inclusive 测试 receiveuntil 带 inclusive 选项
|
||
func TestLuaAPI_tcpSocketReceiveUntil_Inclusive(t *testing.T) {
|
||
engine, err := NewEngine(DefaultConfig())
|
||
require.NoError(t, err)
|
||
defer engine.Close()
|
||
|
||
L := engine.GetLStateForTest()
|
||
defer engine.PutLStateForTest(L)
|
||
|
||
ngx := L.NewTable()
|
||
L.SetGlobal("ngx", ngx)
|
||
RegisterTCPSocketAPI(L, engine)
|
||
|
||
// 未连接时 receiveuntil 会先报 not connected
|
||
err = L.DoString(`
|
||
local sock = ngx.socket.tcp()
|
||
local res, err = sock:receiveuntil("|", {inclusive = true})
|
||
assert(res == nil)
|
||
assert(err ~= nil)
|
||
`)
|
||
require.NoError(t, err)
|
||
}
|
||
|
||
// TestLuaAPI_tcpSocketClose 测试 tcpSocketClose
|
||
func TestLuaAPI_tcpSocketClose(t *testing.T) {
|
||
engine, err := NewEngine(DefaultConfig())
|
||
require.NoError(t, err)
|
||
defer engine.Close()
|
||
|
||
L := engine.GetLStateForTest()
|
||
defer engine.PutLStateForTest(L)
|
||
|
||
ngx := L.NewTable()
|
||
L.SetGlobal("ngx", ngx)
|
||
RegisterTCPSocketAPI(L, engine)
|
||
|
||
err = L.DoString(`
|
||
local sock = ngx.socket.tcp()
|
||
local ok = sock:close()
|
||
assert(ok == true)
|
||
`)
|
||
require.NoError(t, err)
|
||
}
|
||
|
||
// TestLuaAPI_tcpSocketSetTimeout 测试 settimeout
|
||
func TestLuaAPI_tcpSocketSetTimeout(t *testing.T) {
|
||
engine, err := NewEngine(DefaultConfig())
|
||
require.NoError(t, err)
|
||
defer engine.Close()
|
||
|
||
L := engine.GetLStateForTest()
|
||
defer engine.PutLStateForTest(L)
|
||
|
||
ngx := L.NewTable()
|
||
L.SetGlobal("ngx", ngx)
|
||
RegisterTCPSocketAPI(L, engine)
|
||
|
||
err = L.DoString(`
|
||
local sock = ngx.socket.tcp()
|
||
local ok = sock:settimeout(5000)
|
||
assert(ok == true)
|
||
`)
|
||
require.NoError(t, err)
|
||
}
|
||
|
||
// TestLuaAPI_tcpSocketSetTimeouts 测试 settimeouts
|
||
func TestLuaAPI_tcpSocketSetTimeouts(t *testing.T) {
|
||
engine, err := NewEngine(DefaultConfig())
|
||
require.NoError(t, err)
|
||
defer engine.Close()
|
||
|
||
L := engine.GetLStateForTest()
|
||
defer engine.PutLStateForTest(L)
|
||
|
||
ngx := L.NewTable()
|
||
L.SetGlobal("ngx", ngx)
|
||
RegisterTCPSocketAPI(L, engine)
|
||
|
||
err = L.DoString(`
|
||
local sock = ngx.socket.tcp()
|
||
local ok = sock:settimeouts(1000, 2000, 3000)
|
||
assert(ok == true)
|
||
`)
|
||
require.NoError(t, err)
|
||
}
|
||
|
||
// TestLuaAPI_tcpSocketToString 测试 __tostring
|
||
func TestLuaAPI_tcpSocketToString(t *testing.T) {
|
||
engine, err := NewEngine(DefaultConfig())
|
||
require.NoError(t, err)
|
||
defer engine.Close()
|
||
|
||
L := engine.GetLStateForTest()
|
||
defer engine.PutLStateForTest(L)
|
||
|
||
ngx := L.NewTable()
|
||
L.SetGlobal("ngx", ngx)
|
||
RegisterTCPSocketAPI(L, engine)
|
||
|
||
err = L.DoString(`
|
||
local sock = ngx.socket.tcp()
|
||
local str = tostring(sock)
|
||
assert(str:find("tcp_socket"))
|
||
`)
|
||
require.NoError(t, err)
|
||
}
|
||
|
||
// TestLuaAPI_tcpSocketGC 测试 __gc
|
||
func TestLuaAPI_tcpSocketGC(t *testing.T) {
|
||
engine, err := NewEngine(DefaultConfig())
|
||
require.NoError(t, err)
|
||
defer engine.Close()
|
||
|
||
L := engine.GetLStateForTest()
|
||
defer engine.PutLStateForTest(L)
|
||
|
||
ngx := L.NewTable()
|
||
L.SetGlobal("ngx", ngx)
|
||
RegisterTCPSocketAPI(L, engine)
|
||
|
||
// 创建 socket 并触发 GC
|
||
err = L.DoString(`
|
||
local sock = ngx.socket.tcp()
|
||
sock = nil
|
||
-- 强制 GC(Lua GC 可能不会立即触发 __gc)
|
||
collectgarbage("collect")
|
||
`)
|
||
require.NoError(t, err)
|
||
}
|
||
|
||
// TestCheckTCPSocket_ArgError 测试 checkTCPSocket 参数错误
|
||
func TestCheckTCPSocket_ArgError(t *testing.T) {
|
||
L := glua.NewState()
|
||
defer L.Close()
|
||
|
||
// 传入非 userdata 应该 raise error
|
||
L.Push(glua.LString("not a socket"))
|
||
_ = L.PCall(0, 0, nil)
|
||
// 这不会触发 checkTCPSocket,我们需要通过 Lua 脚本来测试
|
||
_ = L.DoString(`
|
||
local sock = ngx.socket.tcp()
|
||
`)
|
||
// 在没有注册 API 的情况下会失败
|
||
}
|
||
|
||
// TestLuaAPI_tcpSocketConnect_WithTimeout 测试 connect 带超时选项
|
||
func TestLuaAPI_tcpSocketConnect_WithTimeout(t *testing.T) {
|
||
engine, err := NewEngine(DefaultConfig())
|
||
require.NoError(t, err)
|
||
defer engine.Close()
|
||
|
||
L := engine.GetLStateForTest()
|
||
defer engine.PutLStateForTest(L)
|
||
|
||
ngx := L.NewTable()
|
||
L.SetGlobal("ngx", ngx)
|
||
RegisterTCPSocketAPI(L, engine)
|
||
|
||
// 测试 connect 带超时选项(不等待实际连接完成)
|
||
err = L.DoString(`
|
||
local sock = ngx.socket.tcp()
|
||
local res1, res2 = sock:connect("127.0.0.1", 9999, {timeout = 100})
|
||
-- 返回值应该是 "cosocket_connect" 和 op ID
|
||
assert(type(res1) == "string")
|
||
assert(type(res2) == "number")
|
||
`)
|
||
require.NoError(t, err)
|
||
}
|
||
|
||
// TestLuaAPI_tcpSocketSend_WithTimeout 测试 send 带超时选项
|
||
func TestLuaAPI_tcpSocketSend_WithTimeout(t *testing.T) {
|
||
engine, err := NewEngine(DefaultConfig())
|
||
require.NoError(t, err)
|
||
defer engine.Close()
|
||
|
||
L := engine.GetLStateForTest()
|
||
defer engine.PutLStateForTest(L)
|
||
|
||
ngx := L.NewTable()
|
||
L.SetGlobal("ngx", ngx)
|
||
RegisterTCPSocketAPI(L, engine)
|
||
|
||
// 未连接时 send 返回 nil + error
|
||
err = L.DoString(`
|
||
local sock = ngx.socket.tcp()
|
||
local res, err = sock:send("hello", {timeout = 5000})
|
||
-- 未连接时应该返回 nil + error
|
||
assert(res == nil)
|
||
assert(err ~= nil)
|
||
`)
|
||
require.NoError(t, err)
|
||
}
|
||
|
||
// TestCosocketYield_Connect 测试 cosocket connect yield 处理
|
||
func TestCosocketYield_Connect(t *testing.T) {
|
||
engine, err := NewEngine(DefaultConfig())
|
||
require.NoError(t, err)
|
||
defer engine.Close()
|
||
|
||
coro, err := engine.NewCoroutine(nil)
|
||
require.NoError(t, err)
|
||
defer coro.Close()
|
||
|
||
err = coro.SetupSandbox()
|
||
require.NoError(t, err)
|
||
|
||
// 测试 connect 返回值结构(不等待实际连接完成)
|
||
err = coro.Execute(`
|
||
local sock = ngx.socket.tcp()
|
||
local res1, res2 = sock:connect("127.0.0.1", 9999)
|
||
-- res1 应该是 "cosocket_connect",res2 是 op ID
|
||
assert(type(res1) == "string")
|
||
assert(res1 == "cosocket_connect")
|
||
assert(type(res2) == "number")
|
||
`)
|
||
require.NoError(t, err)
|
||
}
|
||
|
||
// TestHandleCosocketYield_Unknown 测试未知 yield reason
|
||
func TestHandleCosocketYield_Unknown(t *testing.T) {
|
||
engine, err := NewEngine(DefaultConfig())
|
||
require.NoError(t, err)
|
||
defer engine.Close()
|
||
|
||
coro, err := engine.NewCoroutine(nil)
|
||
require.NoError(t, err)
|
||
defer coro.Close()
|
||
|
||
_, err = coro.HandleCosocketYield("unknown_reason", []glua.LValue{})
|
||
assert.Error(t, err)
|
||
assert.Contains(t, err.Error(), "unknown cosocket yield reason")
|
||
}
|
||
|
||
// TestHandleCosocketConnect_MissingOpID 测试 connect yield 缺少 op ID
|
||
func TestHandleCosocketConnect_MissingOpID(t *testing.T) {
|
||
engine, err := NewEngine(DefaultConfig())
|
||
require.NoError(t, err)
|
||
defer engine.Close()
|
||
|
||
coro, err := engine.NewCoroutine(nil)
|
||
require.NoError(t, err)
|
||
defer coro.Close()
|
||
|
||
_, err = coro.HandleCosocketYield("cosocket_connect", []glua.LValue{})
|
||
assert.Error(t, err)
|
||
assert.Contains(t, err.Error(), "requires operation ID")
|
||
}
|
||
|
||
// TestHandleCosocketConnect_OpNotFound 测试 connect yield op 不存在
|
||
func TestHandleCosocketConnect_OpNotFound(t *testing.T) {
|
||
engine, err := NewEngine(DefaultConfig())
|
||
require.NoError(t, err)
|
||
defer engine.Close()
|
||
|
||
coro, err := engine.NewCoroutine(nil)
|
||
require.NoError(t, err)
|
||
defer coro.Close()
|
||
|
||
_, err = coro.HandleCosocketYield("cosocket_connect", []glua.LValue{glua.LNumber(999999)})
|
||
assert.Error(t, err)
|
||
assert.Contains(t, err.Error(), "not found")
|
||
}
|
||
|
||
// TestHandleCosocketSend_MissingOpID 测试 send yield 缺少 op ID
|
||
func TestHandleCosocketSend_MissingOpID(t *testing.T) {
|
||
engine, err := NewEngine(DefaultConfig())
|
||
require.NoError(t, err)
|
||
defer engine.Close()
|
||
|
||
coro, err := engine.NewCoroutine(nil)
|
||
require.NoError(t, err)
|
||
defer coro.Close()
|
||
|
||
_, err = coro.HandleCosocketYield("cosocket_send", []glua.LValue{})
|
||
assert.Error(t, err)
|
||
assert.Contains(t, err.Error(), "requires operation ID")
|
||
}
|
||
|
||
// TestHandleCosocketSend_OpNotFound 测试 send yield op 不存在
|
||
func TestHandleCosocketSend_OpNotFound(t *testing.T) {
|
||
engine, err := NewEngine(DefaultConfig())
|
||
require.NoError(t, err)
|
||
defer engine.Close()
|
||
|
||
coro, err := engine.NewCoroutine(nil)
|
||
require.NoError(t, err)
|
||
defer coro.Close()
|
||
|
||
_, err = coro.HandleCosocketYield("cosocket_send", []glua.LValue{glua.LNumber(999999)})
|
||
assert.Error(t, err)
|
||
assert.Contains(t, err.Error(), "not found")
|
||
}
|
||
|
||
// TestHandleCosocketReceive_MissingOpID 测试 receive yield 缺少 op ID
|
||
func TestHandleCosocketReceive_MissingOpID(t *testing.T) {
|
||
engine, err := NewEngine(DefaultConfig())
|
||
require.NoError(t, err)
|
||
defer engine.Close()
|
||
|
||
coro, err := engine.NewCoroutine(nil)
|
||
require.NoError(t, err)
|
||
defer coro.Close()
|
||
|
||
_, err = coro.HandleCosocketYield("cosocket_receive", []glua.LValue{})
|
||
assert.Error(t, err)
|
||
assert.Contains(t, err.Error(), "requires operation ID")
|
||
}
|
||
|
||
// TestHandleCosocketReceive_OpNotFound 测试 receive yield op 不存在
|
||
func TestHandleCosocketReceive_OpNotFound(t *testing.T) {
|
||
engine, err := NewEngine(DefaultConfig())
|
||
require.NoError(t, err)
|
||
defer engine.Close()
|
||
|
||
coro, err := engine.NewCoroutine(nil)
|
||
require.NoError(t, err)
|
||
defer coro.Close()
|
||
|
||
_, err = coro.HandleCosocketYield("cosocket_receive", []glua.LValue{glua.LNumber(999999)})
|
||
assert.Error(t, err)
|
||
assert.Contains(t, err.Error(), "not found")
|
||
}
|
||
|
||
// TestHandleCosocketReceive_EmptyData 测试 receive yield 返回空数据
|
||
func TestHandleCosocketReceive_EmptyData(t *testing.T) {
|
||
socket := NewTCPSocket(DefaultCosocketManager)
|
||
defer socket.Close()
|
||
|
||
// 创建一个空的 receive 操作
|
||
op := DefaultCosocketManager.StartOperation(socket, OpReceive, 5*time.Second)
|
||
op.Complete([]byte{}, nil)
|
||
|
||
coro := &LuaCoroutine{
|
||
Engine: nil,
|
||
ExecutionContext: context.Background(),
|
||
}
|
||
|
||
results, err := coro.HandleCosocketYield("cosocket_receive", []glua.LValue{glua.LNumber(op.ID)})
|
||
require.NoError(t, err)
|
||
// 空数据应该返回 nil + "closed"
|
||
assert.Equal(t, 2, len(results))
|
||
assert.Equal(t, glua.LNil, results[0])
|
||
assert.Equal(t, glua.LString("closed"), results[1])
|
||
}
|
||
|
||
// TestHandleCosocketReceive_InvalidResult 测试 receive yield 无效结果类型
|
||
func TestHandleCosocketReceive_InvalidResult(t *testing.T) {
|
||
socket := NewTCPSocket(DefaultCosocketManager)
|
||
defer socket.Close()
|
||
|
||
op := DefaultCosocketManager.StartOperation(socket, OpReceive, 5*time.Second)
|
||
op.Complete("not_bytes", nil) // 非 []byte 类型
|
||
|
||
coro := &LuaCoroutine{
|
||
Engine: nil,
|
||
ExecutionContext: context.Background(),
|
||
}
|
||
|
||
results, err := coro.HandleCosocketYield("cosocket_receive", []glua.LValue{glua.LNumber(op.ID)})
|
||
require.NoError(t, err)
|
||
assert.Equal(t, 2, len(results))
|
||
assert.Equal(t, glua.LNil, results[0])
|
||
assert.Equal(t, glua.LString("invalid result"), results[1])
|
||
}
|
||
|
||
// TestHandleCosocketSend_InvalidResult 测试 send yield 无效结果类型
|
||
func TestHandleCosocketSend_InvalidResult(t *testing.T) {
|
||
socket := NewTCPSocket(DefaultCosocketManager)
|
||
defer socket.Close()
|
||
|
||
op := DefaultCosocketManager.StartOperation(socket, OpSend, 5*time.Second)
|
||
op.Complete("not_int", nil) // 非 int 类型
|
||
|
||
coro := &LuaCoroutine{
|
||
Engine: nil,
|
||
ExecutionContext: context.Background(),
|
||
}
|
||
|
||
results, err := coro.HandleCosocketYield("cosocket_send", []glua.LValue{glua.LNumber(op.ID)})
|
||
require.NoError(t, err)
|
||
assert.Equal(t, 2, len(results))
|
||
assert.Equal(t, glua.LNil, results[0])
|
||
assert.Equal(t, glua.LString("invalid result"), results[1])
|
||
}
|
||
|
||
// TestHandleCosocketConnect_NilResult 测试 connect yield nil result
|
||
func TestHandleCosocketConnect_NilResult(t *testing.T) {
|
||
socket := NewTCPSocket(DefaultCosocketManager)
|
||
defer socket.Close()
|
||
|
||
op := DefaultCosocketManager.StartOperation(socket, OpConnect, 5*time.Second)
|
||
op.Complete(nil, nil)
|
||
|
||
coro := &LuaCoroutine{
|
||
Engine: nil,
|
||
ExecutionContext: context.Background(),
|
||
}
|
||
|
||
results, err := coro.HandleCosocketYield("cosocket_connect", []glua.LValue{glua.LNumber(op.ID)})
|
||
require.NoError(t, err)
|
||
assert.Equal(t, 2, len(results))
|
||
assert.Equal(t, glua.LNil, results[0])
|
||
assert.Equal(t, glua.LNil, results[1])
|
||
}
|
||
|
||
// TestHandleCosocketConnect_WithError 测试 connect yield 带错误
|
||
func TestHandleCosocketConnect_WithError(t *testing.T) {
|
||
socket := NewTCPSocket(DefaultCosocketManager)
|
||
defer socket.Close()
|
||
|
||
op := DefaultCosocketManager.StartOperation(socket, OpConnect, 5*time.Second)
|
||
op.Complete(nil, fmt.Errorf("connection refused"))
|
||
|
||
coro := &LuaCoroutine{
|
||
Engine: nil,
|
||
ExecutionContext: context.Background(),
|
||
}
|
||
|
||
results, err := coro.HandleCosocketYield("cosocket_connect", []glua.LValue{glua.LNumber(op.ID)})
|
||
require.NoError(t, err)
|
||
assert.Equal(t, 2, len(results))
|
||
assert.Equal(t, glua.LNil, results[0])
|
||
assert.Contains(t, string(glua.LVAsString(results[1])), "connection refused")
|
||
}
|
||
|
||
// TestHandleCosocketSend_WithError 测试 send yield 带错误
|
||
func TestHandleCosocketSend_WithError(t *testing.T) {
|
||
socket := NewTCPSocket(DefaultCosocketManager)
|
||
defer socket.Close()
|
||
|
||
op := DefaultCosocketManager.StartOperation(socket, OpSend, 5*time.Second)
|
||
op.Complete(0, fmt.Errorf("write error"))
|
||
|
||
coro := &LuaCoroutine{
|
||
Engine: nil,
|
||
ExecutionContext: context.Background(),
|
||
}
|
||
|
||
results, err := coro.HandleCosocketYield("cosocket_send", []glua.LValue{glua.LNumber(op.ID)})
|
||
require.NoError(t, err)
|
||
assert.Equal(t, 2, len(results))
|
||
assert.Equal(t, glua.LNil, results[0])
|
||
assert.Contains(t, string(glua.LVAsString(results[1])), "write error")
|
||
}
|
||
|
||
// TestHandleCosocketReceive_WithData 测试 receive yield 带数据
|
||
func TestHandleCosocketReceive_WithData(t *testing.T) {
|
||
socket := NewTCPSocket(DefaultCosocketManager)
|
||
defer socket.Close()
|
||
|
||
expected := []byte("hello world")
|
||
op := DefaultCosocketManager.StartOperation(socket, OpReceive, 5*time.Second)
|
||
op.Complete(expected, nil)
|
||
|
||
coro := &LuaCoroutine{
|
||
Engine: nil,
|
||
ExecutionContext: context.Background(),
|
||
}
|
||
|
||
results, err := coro.HandleCosocketYield("cosocket_receive", []glua.LValue{glua.LNumber(op.ID)})
|
||
require.NoError(t, err)
|
||
assert.Equal(t, 1, len(results))
|
||
assert.Equal(t, glua.LString("hello world"), results[0])
|
||
}
|
||
|
||
// TestHandleCosocketSend_Success 测试 send yield 成功
|
||
func TestHandleCosocketSend_Success(t *testing.T) {
|
||
socket := NewTCPSocket(DefaultCosocketManager)
|
||
defer socket.Close()
|
||
|
||
op := DefaultCosocketManager.StartOperation(socket, OpSend, 5*time.Second)
|
||
op.Complete(5, nil)
|
||
|
||
coro := &LuaCoroutine{
|
||
Engine: nil,
|
||
ExecutionContext: context.Background(),
|
||
}
|
||
|
||
results, err := coro.HandleCosocketYield("cosocket_send", []glua.LValue{glua.LNumber(op.ID)})
|
||
require.NoError(t, err)
|
||
assert.Equal(t, 1, len(results))
|
||
assert.Equal(t, glua.LNumber(5), results[0])
|
||
}
|
||
|
||
// TestHandleCosocketConnect_Success 测试 connect yield 成功
|
||
func TestHandleCosocketConnect_Success(t *testing.T) {
|
||
socket := NewTCPSocket(DefaultCosocketManager)
|
||
defer socket.Close()
|
||
|
||
op := DefaultCosocketManager.StartOperation(socket, OpConnect, 5*time.Second)
|
||
op.Complete(&net.TCPConn{}, nil) // 非 nil result
|
||
|
||
coro := &LuaCoroutine{
|
||
Engine: nil,
|
||
ExecutionContext: context.Background(),
|
||
}
|
||
|
||
results, err := coro.HandleCosocketYield("cosocket_connect", []glua.LValue{glua.LNumber(op.ID)})
|
||
require.NoError(t, err)
|
||
assert.Equal(t, 1, len(results))
|
||
assert.Equal(t, glua.LNumber(1), results[0])
|
||
}
|
||
|
||
// TestRegisterTCPSocketMetaTable 测试元表注册
|
||
func TestRegisterTCPSocketMetaTable(t *testing.T) {
|
||
L := glua.NewState()
|
||
defer L.Close()
|
||
|
||
registerTCPSocketMetaTable(L)
|
||
|
||
// 验证元表存在
|
||
mt := L.GetGlobal(tcpSocketMT)
|
||
assert.NotNil(t, mt)
|
||
assert.IsType(t, &glua.LTable{}, mt)
|
||
}
|
||
|
||
// TestRegisterTCPSocketAPI_NgxTableCreation 测试 ngx.socket API 注册时创建 ngx 表
|
||
func TestRegisterTCPSocketAPI_NgxTableCreation(t *testing.T) {
|
||
L := glua.NewState()
|
||
defer L.Close()
|
||
|
||
// 确保 ngx 表不存在
|
||
L.SetGlobal("ngx", glua.LNil)
|
||
|
||
engine, err := NewEngine(DefaultConfig())
|
||
require.NoError(t, err)
|
||
defer engine.Close()
|
||
|
||
RegisterTCPSocketAPI(L, engine)
|
||
|
||
// 验证 ngx.socket.tcp 存在
|
||
ngx := L.GetGlobal("ngx")
|
||
require.IsType(t, &glua.LTable{}, ngx)
|
||
ngxTbl := ngx.(*glua.LTable)
|
||
|
||
socket := ngxTbl.RawGetString("socket")
|
||
require.IsType(t, &glua.LTable{}, socket)
|
||
socketTbl := socket.(*glua.LTable)
|
||
|
||
tcp := socketTbl.RawGetString("tcp")
|
||
assert.IsType(t, &glua.LFunction{}, tcp)
|
||
}
|
||
|
||
// TestOperationType_String 测试操作类型字符串
|
||
func TestOperationType_String(t *testing.T) {
|
||
assert.Equal(t, "connect", string(OpConnect))
|
||
assert.Equal(t, "send", string(OpSend))
|
||
assert.Equal(t, "receive", string(OpReceive))
|
||
assert.Equal(t, "close", string(OpClose))
|
||
}
|
||
|
||
// TestSocketOperation_Complete 测试操作完成
|
||
func TestSocketOperation_Complete(t *testing.T) {
|
||
op := &SocketOperation{
|
||
ID: 1,
|
||
Done: make(chan struct{}),
|
||
}
|
||
|
||
// 完成操作
|
||
op.Complete("result", nil)
|
||
assert.True(t, op.IsCompleted())
|
||
assert.Equal(t, "result", op.Result)
|
||
assert.Nil(t, op.Error)
|
||
|
||
// 重复完成应该无影响
|
||
op.Complete("other", fmt.Errorf("err"))
|
||
assert.Equal(t, "result", op.Result) // 保持第一次的值
|
||
}
|
||
|
||
// TestSocketOperation_Wait_Timeout 测试 Wait 超时
|
||
func TestSocketOperation_Wait_Timeout(t *testing.T) {
|
||
op := &SocketOperation{
|
||
ID: 1,
|
||
Done: make(chan struct{}),
|
||
}
|
||
|
||
ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
|
||
defer cancel()
|
||
|
||
result, err := op.Wait(ctx)
|
||
assert.Nil(t, result)
|
||
assert.Error(t, err) // context deadline exceeded
|
||
}
|
||
|
||
// TestSocketOperation_Touch 测试 Touch
|
||
func TestSocketOperation_Touch(t *testing.T) {
|
||
op := &SocketOperation{
|
||
ID: 1,
|
||
Done: make(chan struct{}),
|
||
LastActivity: time.Now().Add(-time.Hour),
|
||
}
|
||
|
||
oldTime := op.LastActivity
|
||
op.Touch()
|
||
assert.True(t, op.LastActivity.After(oldTime))
|
||
}
|