lolly/internal/lua/socket_manager.go
xfy f145a8770e refactor: modernize code with Go 1.22+ features
Apply modern Go patterns across the codebase:
- Replace `interface{}` with `any` (Go 1.18+)
- Use `for range n` instead of `for i := 0; i < n; i++` (Go 1.22+)
- Replace `sort.Slice` with `slices.Sort` from slices package
- Simplify sync.WaitGroup patterns with errgroup where appropriate
- Add Makefile targets for modernize analyzer

Total: 84 files updated, net reduction of 79 lines

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-30 10:37:45 +08:00

457 lines
11 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// Package lua 提供 Cosocket 管理功能。
//
// 该文件实现 TCP Cosocket 管理器,包括:
// - SocketStateSocket 生命周期状态机
// - SocketOperation单个 Socket 操作的封装,支持异步等待
// - CosocketManager操作生命周期管理、超时检测、统计追踪
//
// 特性:
// - 原子操作标记完成状态,避免竞态条件
// - 后台清理循环定期检测并取消超时操作
// - 统计信息使用 atomic 操作保证并发安全
//
// 注意事项:
// - 操作一旦标记完成CompareAndSwap不可重复完成
// - 管理器关闭时会取消所有未完成的操作
//
// 作者xfy
package lua
import (
"context"
"net"
"sync"
"sync/atomic"
"time"
)
// SocketState 表示 Socket 操作状态。
//
// 状态机流转Idle -> Connecting -> Connected -> Sending/Receiving -> Closing -> Closed
type SocketState int
// Socket 生命周期状态常量
const (
// SocketStateIdle 空闲状态
SocketStateIdle SocketState = iota
// SocketStateConnecting 连接中
SocketStateConnecting
// SocketStateConnected 已连接
SocketStateConnected
// SocketStateSending 发送中
SocketStateSending
// SocketStateReceiving 接收中
SocketStateReceiving
// SocketStateClosing 关闭中
SocketStateClosing
// SocketStateClosed 已关闭
SocketStateClosed
// SocketStateError 错误状态
SocketStateError
)
// String 返回状态的字符串表示
func (s SocketState) String() string {
switch s {
case SocketStateIdle:
return "idle"
case SocketStateConnecting:
return "connecting"
case SocketStateConnected:
return "connected"
case SocketStateSending:
return "sending"
case SocketStateReceiving:
return "receiving"
case SocketStateClosing:
return "closing"
case SocketStateClosed:
return "closed"
case SocketStateError:
return "error"
default:
return "unknown"
}
}
// OperationType 操作类型
type OperationType string
// 操作类型常量
const (
// OpConnect 连接操作
OpConnect OperationType = "connect"
// OpSend 发送操作
OpSend OperationType = "send"
// OpReceive 接收操作
OpReceive OperationType = "receive"
// OpClose 关闭操作
OpClose OperationType = "close"
)
// SocketOperation 表示一个 Socket 操作。
//
// 封装单个异步操作的生命周期,包括创建、执行、完成和等待。
// 使用 atomic 操作标记完成状态,通过 Done channel 通知等待方。
type SocketOperation struct {
// ID 操作唯一标识
ID uint64
// Type 操作类型
Type OperationType
// State 当前 Socket 状态
State SocketState
// Socket 关联的 TCP Socket
Socket *TCPSocket
// Timeout 操作超时时间
Timeout time.Duration
// CreatedAt 操作创建时间
CreatedAt time.Time
// LastActivity 最后活动时间(用于超时检测)
LastActivity time.Time
// Result 操作结果
Result any
// Error 操作错误
Error error
// Done 完成信号 channel操作完成时关闭
Done chan struct{}
// completed 原子标记1=已完成0=未完成
completed atomic.Int32
}
// IsCompleted 检查操作是否已完成。
//
// 返回值:
// - bool: true 表示已完成
func (op *SocketOperation) IsCompleted() bool {
return op.completed.Load() == 1
}
// Complete 标记操作完成。
//
// 使用 CompareAndSwap 确保只完成一次,完成后关闭 Done channel 通知等待方。
//
// 参数:
// - result: 操作结果
// - err: 操作错误nil 表示成功)
func (op *SocketOperation) Complete(result any, err error) {
if op.completed.CompareAndSwap(0, 1) {
op.Result = result
op.Error = err
close(op.Done)
}
}
// Wait 等待操作完成。
//
// 阻塞直到操作完成或上下文取消。
//
// 参数:
// - ctx: 取消上下文
//
// 返回值:
// - interface{}: 操作结果
// - error: 操作错误或上下文取消错误
func (op *SocketOperation) Wait(ctx context.Context) (any, error) {
select {
case <-op.Done:
return op.Result, op.Error
case <-ctx.Done():
return nil, ctx.Err()
}
}
// Touch 更新活动时间(用于超时检测)
func (op *SocketOperation) Touch() {
op.LastActivity = time.Now()
}
// CosocketStats Cosocket 统计信息。
//
// 包含操作和 Socket 的创建、活跃、超时、错误等统计。
type CosocketStats struct {
// TotalOperations 总操作数
TotalOperations uint64
// ActiveOperations 当前活跃操作数
ActiveOperations uint64
// TimeoutOperations 超时操作数
TimeoutOperations uint64
// ErrorOperations 错误操作数
ErrorOperations uint64
// ActiveSockets 当前活跃 Socket 数
ActiveSockets uint64
// TotalSocketsCreated 累计创建的 Socket 总数
TotalSocketsCreated uint64
// TotalSocketsClosed 累计关闭的 Socket 总数
TotalSocketsClosed uint64
}
// CosocketManager Cosocket 管理器。
//
// 负责管理 Socket 操作的生命周期,包括:
// - 创建和跟踪异步操作
// - 检测并清理超时操作
// - 统计操作和 Socket 的使用情况
type CosocketManager struct {
// ctx 上下文(用于控制清理循环)
ctx context.Context
// cancel 取消函数
cancel context.CancelFunc
// operations 进行中的操作映射ID -> 操作)
operations map[uint64]*SocketOperation
// mu 读写锁
mu sync.RWMutex
// nextID 下一个操作 ID
nextID uint64
// defaultTimeout 默认超时时间
defaultTimeout time.Duration
// timeoutChecker 超时检查定时器
timeoutChecker *time.Ticker
// cleanupInterval 清理间隔
cleanupInterval time.Duration
// stats 统计信息
stats CosocketStats
// DisableSSRFGuard 禁用 SSRF 防护(仅用于测试)
DisableSSRFGuard bool
}
// DefaultCosocketManager 全局默认 Cosocket 管理器
var DefaultCosocketManager = NewCosocketManager()
// testingSSRFGuardDisabled 测试模式下启用,允许本地回环连接。
// 由 *_test.go 中的 init() 函数设置。
var testingSSRFGuardDisabled bool
// NewCosocketManager 创建新的 Cosocket 管理器。
//
// 启动后台清理循环,每 30 秒检查一次超时操作。
//
// 返回值:
// - *CosocketManager: 初始化的管理器实例
func NewCosocketManager() *CosocketManager {
ctx, cancel := context.WithCancel(context.Background())
cm := &CosocketManager{
operations: make(map[uint64]*SocketOperation),
nextID: 0,
timeoutChecker: time.NewTicker(30 * time.Second),
ctx: ctx,
cancel: cancel,
defaultTimeout: 60 * time.Second,
cleanupInterval: 30 * time.Second,
DisableSSRFGuard: testingSSRFGuardDisabled,
}
// 启动清理循环
go cm.cleanupLoop()
return cm
}
// StartOperation 开始一个新的 Socket 操作。
//
// 创建操作实例,分配唯一 ID注册到管理器中。
//
// 参数:
// - socket: 关联的 TCP Socket
// - opType: 操作类型
// - timeout: 超时时间,零值时使用默认超时
//
// 返回值:
// - *SocketOperation: 新创建的操作实例
func (cm *CosocketManager) StartOperation(socket *TCPSocket, opType OperationType, timeout time.Duration) *SocketOperation {
if timeout <= 0 {
timeout = cm.defaultTimeout
}
id := atomic.AddUint64(&cm.nextID, 1)
now := time.Now()
op := &SocketOperation{
ID: id,
Socket: socket,
Type: opType,
State: SocketStateIdle,
CreatedAt: now,
LastActivity: now,
Timeout: timeout,
Done: make(chan struct{}),
}
cm.mu.Lock()
cm.operations[id] = op
cm.mu.Unlock()
atomic.AddUint64(&cm.stats.TotalOperations, 1)
atomic.AddUint64(&cm.stats.ActiveOperations, 1)
return op
}
// CompleteOperation 完成指定 ID 的操作。
//
// 从管理器中移除操作,标记完成,更新统计。
//
// 参数:
// - id: 操作 ID
// - result: 操作结果
// - err: 操作错误
func (cm *CosocketManager) CompleteOperation(id uint64, result any, err error) {
cm.mu.Lock()
op, exists := cm.operations[id]
if exists {
delete(cm.operations, id)
}
cm.mu.Unlock()
if exists && op != nil {
op.Complete(result, err)
atomic.AddUint64(&cm.stats.ActiveOperations, ^uint64(0))
if err != nil {
atomic.AddUint64(&cm.stats.ErrorOperations, 1)
}
}
}
// GetOperation 获取指定 ID 的操作。
//
// 参数:
// - id: 操作 ID
//
// 返回值:
// - *SocketOperation: 操作实例,不存在时返回 nil
func (cm *CosocketManager) GetOperation(id uint64) *SocketOperation {
cm.mu.RLock()
defer cm.mu.RUnlock()
return cm.operations[id]
}
// cleanupLoop 清理循环,定期检测超时操作。
func (cm *CosocketManager) cleanupLoop() {
for {
select {
case <-cm.ctx.Done():
return
case <-cm.timeoutChecker.C:
cm.cleanup()
}
}
}
// cleanup 清理超时操作。
//
// 扫描所有未完成的操作,标记超过 LastActivity + Timeout 的操作为超时。
func (cm *CosocketManager) cleanup() {
now := time.Now()
timeoutOps := make([]*SocketOperation, 0)
cm.mu.RLock()
for _, op := range cm.operations {
if !op.IsCompleted() && now.Sub(op.LastActivity) > op.Timeout {
timeoutOps = append(timeoutOps, op)
}
}
cm.mu.RUnlock()
for _, op := range timeoutOps {
cm.CompleteOperation(op.ID, nil, context.DeadlineExceeded)
atomic.AddUint64(&cm.stats.TimeoutOperations, 1)
}
}
// Stats 获取 Cosocket 统计信息。
//
// 返回值:
// - CosocketStats: 当前统计快照
func (cm *CosocketManager) Stats() CosocketStats {
return CosocketStats{
TotalOperations: atomic.LoadUint64(&cm.stats.TotalOperations),
ActiveOperations: atomic.LoadUint64(&cm.stats.ActiveOperations),
TimeoutOperations: atomic.LoadUint64(&cm.stats.TimeoutOperations),
ErrorOperations: atomic.LoadUint64(&cm.stats.ErrorOperations),
ActiveSockets: atomic.LoadUint64(&cm.stats.ActiveSockets),
TotalSocketsCreated: atomic.LoadUint64(&cm.stats.TotalSocketsCreated),
TotalSocketsClosed: atomic.LoadUint64(&cm.stats.TotalSocketsClosed),
}
}
// SetDefaultTimeout 设置默认超时时间。
//
// 参数:
// - timeout: 新的默认超时
func (cm *CosocketManager) SetDefaultTimeout(timeout time.Duration) {
cm.defaultTimeout = timeout
}
// Close 关闭 Cosocket 管理器。
//
// 停止清理循环,取消所有未完成的操作。
func (cm *CosocketManager) Close() {
cm.cancel()
cm.timeoutChecker.Stop()
// 取消所有未完成操作
cm.mu.Lock()
ops := make([]*SocketOperation, 0, len(cm.operations))
for _, op := range cm.operations {
ops = append(ops, op)
}
cm.operations = make(map[uint64]*SocketOperation)
cm.mu.Unlock()
for _, op := range ops {
op.Complete(nil, context.Canceled)
}
}
// TrackSocketCreated 跟踪 Socket 创建(更新统计)。
func (cm *CosocketManager) TrackSocketCreated() {
atomic.AddUint64(&cm.stats.TotalSocketsCreated, 1)
atomic.AddUint64(&cm.stats.ActiveSockets, 1)
}
// TrackSocketClosed 跟踪 Socket 关闭(更新统计)。
func (cm *CosocketManager) TrackSocketClosed() {
atomic.AddUint64(&cm.stats.TotalSocketsClosed, 1)
atomic.AddUint64(&cm.stats.ActiveSockets, ^uint64(0))
}
// TCPAddr 解析 TCP 地址。
//
// 参数:
// - host: 主机地址
// - port: 端口号
//
// 返回值:
// - *net.TCPAddr: 解析后的 TCP 地址
// - error: 解析失败时返回错误
func (cm *CosocketManager) TCPAddr(host string, port int) (*net.TCPAddr, error) {
return &net.TCPAddr{
IP: net.ParseIP(host),
Port: port,
}, nil
}