lolly/internal/lua/socket_manager.go
xfy 8b382606df Merge branch 'lint-fix' - resolve sendfile.go conflict
Conflict: sendfile.go (!linux build tag) was incorrectly modified to
include linuxSendfile and getSocketFd functions which already exist
in sendfile_linux.go.

Resolution: Keep HEAD version (simple fallback returning ENOTSUP) as
Linux implementation is properly separated in sendfile_linux.go.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-13 09:26:48 +08:00

316 lines
7.2 KiB
Go

// Package lua 提供 Cosocket 管理功能
package lua
import (
"context"
"net"
"sync"
"sync/atomic"
"time"
)
// SocketState 表示 socket 操作状态
type SocketState int
const (
// SocketStateIdle 空闲状态
SocketStateIdle SocketState = iota
// SocketStateConnecting 连接中
SocketStateConnecting
// SocketStateConnected 已连接
SocketStateConnected
// SocketStateSending 发送中
SocketStateSending
// SocketStateReceiving 接收中
SocketStateReceiving
// SocketStateClosing 关闭中
SocketStateClosing
// SocketStateClosed 已关闭
SocketStateClosed
// SocketStateError 错误状态
SocketStateError
)
func (s SocketState) String() string {
switch s {
case SocketStateIdle:
return "idle"
case SocketStateConnecting:
return "connecting"
case SocketStateConnected:
return "connected"
case SocketStateSending:
return "sending"
case SocketStateReceiving:
return "receiving"
case SocketStateClosing:
return "closing"
case SocketStateClosed:
return "closed"
case SocketStateError:
return "error"
default:
return "unknown"
}
}
// OperationType 操作类型
type OperationType string
const (
// OpConnect 连接操作
OpConnect OperationType = "connect"
// OpSend 发送操作
OpSend OperationType = "send"
// OpReceive 接收操作
OpReceive OperationType = "receive"
// OpClose 关闭操作
OpClose OperationType = "close"
)
// SocketOperation 表示一个 socket 操作
type SocketOperation struct {
CreatedAt time.Time
LastActivity time.Time
Error error
Result interface{}
Socket *TCPSocket
Done chan struct{}
Type OperationType
ID uint64
State SocketState
Timeout time.Duration
completed int32
}
// IsCompleted 检查操作是否已完成
func (op *SocketOperation) IsCompleted() bool {
return atomic.LoadInt32(&op.completed) == 1
}
// Complete 标记操作完成
func (op *SocketOperation) Complete(result interface{}, err error) {
if atomic.CompareAndSwapInt32(&op.completed, 0, 1) {
op.Result = result
op.Error = err
close(op.Done)
}
}
// Wait 等待操作完成
func (op *SocketOperation) Wait(ctx context.Context) (interface{}, error) {
select {
case <-op.Done:
return op.Result, op.Error
case <-ctx.Done():
return nil, ctx.Err()
}
}
// Touch 更新活动时间
func (op *SocketOperation) Touch() {
op.LastActivity = time.Now()
}
// CosocketStats Cosocket 统计信息
type CosocketStats struct {
// 总操作数
TotalOperations uint64
// 活跃操作数
ActiveOperations uint64
// 超时操作数
TimeoutOperations uint64
// 错误操作数
ErrorOperations uint64
// 当前 socket 数
ActiveSockets uint64
// 总创建 socket 数
TotalSocketsCreated uint64
// 总关闭 socket 数
TotalSocketsClosed uint64
}
// CosocketManager Cosocket 管理器
type CosocketManager struct {
ctx context.Context
operations map[uint64]*SocketOperation
timeoutChecker *time.Ticker
cancel context.CancelFunc
stats CosocketStats
nextID uint64
defaultTimeout time.Duration
cleanupInterval time.Duration
mu sync.RWMutex
}
// DefaultCosocketManager 全局默认管理器
var DefaultCosocketManager = NewCosocketManager()
// NewCosocketManager 创建新的 Cosocket 管理器
func NewCosocketManager() *CosocketManager {
ctx, cancel := context.WithCancel(context.Background())
cm := &CosocketManager{
operations: make(map[uint64]*SocketOperation),
nextID: 0,
timeoutChecker: time.NewTicker(30 * time.Second),
ctx: ctx,
cancel: cancel,
defaultTimeout: 60 * time.Second,
cleanupInterval: 30 * time.Second,
}
// 启动清理循环
go cm.cleanupLoop()
return cm
}
// StartOperation 开始一个新的 socket 操作
func (cm *CosocketManager) StartOperation(socket *TCPSocket, opType OperationType, timeout time.Duration) *SocketOperation {
if timeout <= 0 {
timeout = cm.defaultTimeout
}
id := atomic.AddUint64(&cm.nextID, 1)
now := time.Now()
op := &SocketOperation{
ID: id,
Socket: socket,
Type: opType,
State: SocketStateIdle,
CreatedAt: now,
LastActivity: now,
Timeout: timeout,
Done: make(chan struct{}),
}
cm.mu.Lock()
cm.operations[id] = op
cm.mu.Unlock()
atomic.AddUint64(&cm.stats.TotalOperations, 1)
atomic.AddUint64(&cm.stats.ActiveOperations, 1)
return op
}
// CompleteOperation 完成操作
func (cm *CosocketManager) CompleteOperation(id uint64, result interface{}, err error) {
cm.mu.Lock()
op, exists := cm.operations[id]
if exists {
delete(cm.operations, id)
}
cm.mu.Unlock()
if exists && op != nil {
op.Complete(result, err)
atomic.AddUint64(&cm.stats.ActiveOperations, ^uint64(0))
if err != nil {
atomic.AddUint64(&cm.stats.ErrorOperations, 1)
}
}
}
// GetOperation 获取操作
func (cm *CosocketManager) GetOperation(id uint64) *SocketOperation {
cm.mu.RLock()
defer cm.mu.RUnlock()
return cm.operations[id]
}
// cleanupLoop 清理循环
func (cm *CosocketManager) cleanupLoop() {
for {
select {
case <-cm.ctx.Done():
return
case <-cm.timeoutChecker.C:
cm.cleanup()
}
}
}
// cleanup 清理超时操作
func (cm *CosocketManager) cleanup() {
now := time.Now()
timeoutOps := make([]*SocketOperation, 0)
cm.mu.RLock()
for _, op := range cm.operations {
if !op.IsCompleted() && now.Sub(op.LastActivity) > op.Timeout {
timeoutOps = append(timeoutOps, op)
}
}
cm.mu.RUnlock()
for _, op := range timeoutOps {
cm.CompleteOperation(op.ID, nil, context.DeadlineExceeded)
atomic.AddUint64(&cm.stats.TimeoutOperations, 1)
}
}
// Stats 获取统计信息
func (cm *CosocketManager) Stats() CosocketStats {
return CosocketStats{
TotalOperations: atomic.LoadUint64(&cm.stats.TotalOperations),
ActiveOperations: atomic.LoadUint64(&cm.stats.ActiveOperations),
TimeoutOperations: atomic.LoadUint64(&cm.stats.TimeoutOperations),
ErrorOperations: atomic.LoadUint64(&cm.stats.ErrorOperations),
ActiveSockets: atomic.LoadUint64(&cm.stats.ActiveSockets),
TotalSocketsCreated: atomic.LoadUint64(&cm.stats.TotalSocketsCreated),
TotalSocketsClosed: atomic.LoadUint64(&cm.stats.TotalSocketsClosed),
}
}
// SetDefaultTimeout 设置默认超时
func (cm *CosocketManager) SetDefaultTimeout(timeout time.Duration) {
cm.defaultTimeout = timeout
}
// Close 关闭管理器
func (cm *CosocketManager) Close() {
cm.cancel()
cm.timeoutChecker.Stop()
// 取消所有未完成操作
cm.mu.Lock()
ops := make([]*SocketOperation, 0, len(cm.operations))
for _, op := range cm.operations {
ops = append(ops, op)
}
cm.operations = make(map[uint64]*SocketOperation)
cm.mu.Unlock()
for _, op := range ops {
op.Complete(nil, context.Canceled)
}
}
// TrackSocketCreated 跟踪 socket 创建
func (cm *CosocketManager) TrackSocketCreated() {
atomic.AddUint64(&cm.stats.TotalSocketsCreated, 1)
atomic.AddUint64(&cm.stats.ActiveSockets, 1)
}
// TrackSocketClosed 跟踪 socket 关闭
func (cm *CosocketManager) TrackSocketClosed() {
atomic.AddUint64(&cm.stats.TotalSocketsClosed, 1)
atomic.AddUint64(&cm.stats.ActiveSockets, ^uint64(0))
}
// TCPAddr 解析 TCP 地址
func (cm *CosocketManager) TCPAddr(host string, port int) (*net.TCPAddr, error) {
return &net.TCPAddr{
IP: net.ParseIP(host),
Port: port,
}, nil
}