Apply modern Go patterns across the codebase:
- Replace `interface{}` with `any` (Go 1.18+)
- Use `for range n` instead of `for i := 0; i < n; i++` (Go 1.22+)
- Replace `sort.Slice` with `slices.Sort` from slices package
- Simplify sync.WaitGroup patterns with errgroup where appropriate
- Add Makefile targets for modernize analyzer
Total: 84 files updated, net reduction of 79 lines
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
457 lines
11 KiB
Go
457 lines
11 KiB
Go
// Package lua 提供 Cosocket 管理功能。
|
||
//
|
||
// 该文件实现 TCP Cosocket 管理器,包括:
|
||
// - SocketState:Socket 生命周期状态机
|
||
// - SocketOperation:单个 Socket 操作的封装,支持异步等待
|
||
// - CosocketManager:操作生命周期管理、超时检测、统计追踪
|
||
//
|
||
// 特性:
|
||
// - 原子操作标记完成状态,避免竞态条件
|
||
// - 后台清理循环定期检测并取消超时操作
|
||
// - 统计信息使用 atomic 操作保证并发安全
|
||
//
|
||
// 注意事项:
|
||
// - 操作一旦标记完成(CompareAndSwap),不可重复完成
|
||
// - 管理器关闭时会取消所有未完成的操作
|
||
//
|
||
// 作者:xfy
|
||
package lua
|
||
|
||
import (
|
||
"context"
|
||
"net"
|
||
"sync"
|
||
"sync/atomic"
|
||
"time"
|
||
)
|
||
|
||
// SocketState 表示 Socket 操作状态。
|
||
//
|
||
// 状态机流转:Idle -> Connecting -> Connected -> Sending/Receiving -> Closing -> Closed
|
||
type SocketState int
|
||
|
||
// Socket 生命周期状态常量
|
||
const (
|
||
// SocketStateIdle 空闲状态
|
||
SocketStateIdle SocketState = iota
|
||
// SocketStateConnecting 连接中
|
||
SocketStateConnecting
|
||
// SocketStateConnected 已连接
|
||
SocketStateConnected
|
||
// SocketStateSending 发送中
|
||
SocketStateSending
|
||
// SocketStateReceiving 接收中
|
||
SocketStateReceiving
|
||
// SocketStateClosing 关闭中
|
||
SocketStateClosing
|
||
// SocketStateClosed 已关闭
|
||
SocketStateClosed
|
||
// SocketStateError 错误状态
|
||
SocketStateError
|
||
)
|
||
|
||
// String 返回状态的字符串表示
|
||
func (s SocketState) String() string {
|
||
switch s {
|
||
case SocketStateIdle:
|
||
return "idle"
|
||
case SocketStateConnecting:
|
||
return "connecting"
|
||
case SocketStateConnected:
|
||
return "connected"
|
||
case SocketStateSending:
|
||
return "sending"
|
||
case SocketStateReceiving:
|
||
return "receiving"
|
||
case SocketStateClosing:
|
||
return "closing"
|
||
case SocketStateClosed:
|
||
return "closed"
|
||
case SocketStateError:
|
||
return "error"
|
||
default:
|
||
return "unknown"
|
||
}
|
||
}
|
||
|
||
// OperationType 操作类型
|
||
type OperationType string
|
||
|
||
// 操作类型常量
|
||
const (
|
||
// OpConnect 连接操作
|
||
OpConnect OperationType = "connect"
|
||
// OpSend 发送操作
|
||
OpSend OperationType = "send"
|
||
// OpReceive 接收操作
|
||
OpReceive OperationType = "receive"
|
||
// OpClose 关闭操作
|
||
OpClose OperationType = "close"
|
||
)
|
||
|
||
// SocketOperation 表示一个 Socket 操作。
|
||
//
|
||
// 封装单个异步操作的生命周期,包括创建、执行、完成和等待。
|
||
// 使用 atomic 操作标记完成状态,通过 Done channel 通知等待方。
|
||
type SocketOperation struct {
|
||
// ID 操作唯一标识
|
||
ID uint64
|
||
|
||
// Type 操作类型
|
||
Type OperationType
|
||
|
||
// State 当前 Socket 状态
|
||
State SocketState
|
||
|
||
// Socket 关联的 TCP Socket
|
||
Socket *TCPSocket
|
||
|
||
// Timeout 操作超时时间
|
||
Timeout time.Duration
|
||
|
||
// CreatedAt 操作创建时间
|
||
CreatedAt time.Time
|
||
|
||
// LastActivity 最后活动时间(用于超时检测)
|
||
LastActivity time.Time
|
||
|
||
// Result 操作结果
|
||
Result any
|
||
|
||
// Error 操作错误
|
||
Error error
|
||
|
||
// Done 完成信号 channel,操作完成时关闭
|
||
Done chan struct{}
|
||
|
||
// completed 原子标记,1=已完成,0=未完成
|
||
completed atomic.Int32
|
||
}
|
||
|
||
// IsCompleted 检查操作是否已完成。
|
||
//
|
||
// 返回值:
|
||
// - bool: true 表示已完成
|
||
func (op *SocketOperation) IsCompleted() bool {
|
||
return op.completed.Load() == 1
|
||
}
|
||
|
||
// Complete 标记操作完成。
|
||
//
|
||
// 使用 CompareAndSwap 确保只完成一次,完成后关闭 Done channel 通知等待方。
|
||
//
|
||
// 参数:
|
||
// - result: 操作结果
|
||
// - err: 操作错误(nil 表示成功)
|
||
func (op *SocketOperation) Complete(result any, err error) {
|
||
if op.completed.CompareAndSwap(0, 1) {
|
||
op.Result = result
|
||
op.Error = err
|
||
close(op.Done)
|
||
}
|
||
}
|
||
|
||
// Wait 等待操作完成。
|
||
//
|
||
// 阻塞直到操作完成或上下文取消。
|
||
//
|
||
// 参数:
|
||
// - ctx: 取消上下文
|
||
//
|
||
// 返回值:
|
||
// - interface{}: 操作结果
|
||
// - error: 操作错误或上下文取消错误
|
||
func (op *SocketOperation) Wait(ctx context.Context) (any, error) {
|
||
select {
|
||
case <-op.Done:
|
||
return op.Result, op.Error
|
||
case <-ctx.Done():
|
||
return nil, ctx.Err()
|
||
}
|
||
}
|
||
|
||
// Touch 更新活动时间(用于超时检测)
|
||
func (op *SocketOperation) Touch() {
|
||
op.LastActivity = time.Now()
|
||
}
|
||
|
||
// CosocketStats Cosocket 统计信息。
|
||
//
|
||
// 包含操作和 Socket 的创建、活跃、超时、错误等统计。
|
||
type CosocketStats struct {
|
||
// TotalOperations 总操作数
|
||
TotalOperations uint64
|
||
|
||
// ActiveOperations 当前活跃操作数
|
||
ActiveOperations uint64
|
||
|
||
// TimeoutOperations 超时操作数
|
||
TimeoutOperations uint64
|
||
|
||
// ErrorOperations 错误操作数
|
||
ErrorOperations uint64
|
||
|
||
// ActiveSockets 当前活跃 Socket 数
|
||
ActiveSockets uint64
|
||
|
||
// TotalSocketsCreated 累计创建的 Socket 总数
|
||
TotalSocketsCreated uint64
|
||
|
||
// TotalSocketsClosed 累计关闭的 Socket 总数
|
||
TotalSocketsClosed uint64
|
||
}
|
||
|
||
// CosocketManager Cosocket 管理器。
|
||
//
|
||
// 负责管理 Socket 操作的生命周期,包括:
|
||
// - 创建和跟踪异步操作
|
||
// - 检测并清理超时操作
|
||
// - 统计操作和 Socket 的使用情况
|
||
type CosocketManager struct {
|
||
// ctx 上下文(用于控制清理循环)
|
||
ctx context.Context
|
||
|
||
// cancel 取消函数
|
||
cancel context.CancelFunc
|
||
|
||
// operations 进行中的操作映射(ID -> 操作)
|
||
operations map[uint64]*SocketOperation
|
||
|
||
// mu 读写锁
|
||
mu sync.RWMutex
|
||
|
||
// nextID 下一个操作 ID
|
||
nextID uint64
|
||
|
||
// defaultTimeout 默认超时时间
|
||
defaultTimeout time.Duration
|
||
|
||
// timeoutChecker 超时检查定时器
|
||
timeoutChecker *time.Ticker
|
||
|
||
// cleanupInterval 清理间隔
|
||
cleanupInterval time.Duration
|
||
|
||
// stats 统计信息
|
||
stats CosocketStats
|
||
|
||
// DisableSSRFGuard 禁用 SSRF 防护(仅用于测试)
|
||
DisableSSRFGuard bool
|
||
}
|
||
|
||
// DefaultCosocketManager 全局默认 Cosocket 管理器
|
||
var DefaultCosocketManager = NewCosocketManager()
|
||
|
||
// testingSSRFGuardDisabled 测试模式下启用,允许本地回环连接。
|
||
// 由 *_test.go 中的 init() 函数设置。
|
||
var testingSSRFGuardDisabled bool
|
||
|
||
// NewCosocketManager 创建新的 Cosocket 管理器。
|
||
//
|
||
// 启动后台清理循环,每 30 秒检查一次超时操作。
|
||
//
|
||
// 返回值:
|
||
// - *CosocketManager: 初始化的管理器实例
|
||
func NewCosocketManager() *CosocketManager {
|
||
ctx, cancel := context.WithCancel(context.Background())
|
||
cm := &CosocketManager{
|
||
operations: make(map[uint64]*SocketOperation),
|
||
nextID: 0,
|
||
timeoutChecker: time.NewTicker(30 * time.Second),
|
||
ctx: ctx,
|
||
cancel: cancel,
|
||
defaultTimeout: 60 * time.Second,
|
||
cleanupInterval: 30 * time.Second,
|
||
DisableSSRFGuard: testingSSRFGuardDisabled,
|
||
}
|
||
|
||
// 启动清理循环
|
||
go cm.cleanupLoop()
|
||
|
||
return cm
|
||
}
|
||
|
||
// StartOperation 开始一个新的 Socket 操作。
|
||
//
|
||
// 创建操作实例,分配唯一 ID,注册到管理器中。
|
||
//
|
||
// 参数:
|
||
// - socket: 关联的 TCP Socket
|
||
// - opType: 操作类型
|
||
// - timeout: 超时时间,零值时使用默认超时
|
||
//
|
||
// 返回值:
|
||
// - *SocketOperation: 新创建的操作实例
|
||
func (cm *CosocketManager) StartOperation(socket *TCPSocket, opType OperationType, timeout time.Duration) *SocketOperation {
|
||
if timeout <= 0 {
|
||
timeout = cm.defaultTimeout
|
||
}
|
||
|
||
id := atomic.AddUint64(&cm.nextID, 1)
|
||
now := time.Now()
|
||
|
||
op := &SocketOperation{
|
||
ID: id,
|
||
Socket: socket,
|
||
Type: opType,
|
||
State: SocketStateIdle,
|
||
CreatedAt: now,
|
||
LastActivity: now,
|
||
Timeout: timeout,
|
||
Done: make(chan struct{}),
|
||
}
|
||
|
||
cm.mu.Lock()
|
||
cm.operations[id] = op
|
||
cm.mu.Unlock()
|
||
|
||
atomic.AddUint64(&cm.stats.TotalOperations, 1)
|
||
atomic.AddUint64(&cm.stats.ActiveOperations, 1)
|
||
|
||
return op
|
||
}
|
||
|
||
// CompleteOperation 完成指定 ID 的操作。
|
||
//
|
||
// 从管理器中移除操作,标记完成,更新统计。
|
||
//
|
||
// 参数:
|
||
// - id: 操作 ID
|
||
// - result: 操作结果
|
||
// - err: 操作错误
|
||
func (cm *CosocketManager) CompleteOperation(id uint64, result any, err error) {
|
||
cm.mu.Lock()
|
||
op, exists := cm.operations[id]
|
||
if exists {
|
||
delete(cm.operations, id)
|
||
}
|
||
cm.mu.Unlock()
|
||
|
||
if exists && op != nil {
|
||
op.Complete(result, err)
|
||
atomic.AddUint64(&cm.stats.ActiveOperations, ^uint64(0))
|
||
if err != nil {
|
||
atomic.AddUint64(&cm.stats.ErrorOperations, 1)
|
||
}
|
||
}
|
||
}
|
||
|
||
// GetOperation 获取指定 ID 的操作。
|
||
//
|
||
// 参数:
|
||
// - id: 操作 ID
|
||
//
|
||
// 返回值:
|
||
// - *SocketOperation: 操作实例,不存在时返回 nil
|
||
func (cm *CosocketManager) GetOperation(id uint64) *SocketOperation {
|
||
cm.mu.RLock()
|
||
defer cm.mu.RUnlock()
|
||
return cm.operations[id]
|
||
}
|
||
|
||
// cleanupLoop 清理循环,定期检测超时操作。
|
||
func (cm *CosocketManager) cleanupLoop() {
|
||
for {
|
||
select {
|
||
case <-cm.ctx.Done():
|
||
return
|
||
case <-cm.timeoutChecker.C:
|
||
cm.cleanup()
|
||
}
|
||
}
|
||
}
|
||
|
||
// cleanup 清理超时操作。
|
||
//
|
||
// 扫描所有未完成的操作,标记超过 LastActivity + Timeout 的操作为超时。
|
||
func (cm *CosocketManager) cleanup() {
|
||
now := time.Now()
|
||
timeoutOps := make([]*SocketOperation, 0)
|
||
|
||
cm.mu.RLock()
|
||
for _, op := range cm.operations {
|
||
if !op.IsCompleted() && now.Sub(op.LastActivity) > op.Timeout {
|
||
timeoutOps = append(timeoutOps, op)
|
||
}
|
||
}
|
||
cm.mu.RUnlock()
|
||
|
||
for _, op := range timeoutOps {
|
||
cm.CompleteOperation(op.ID, nil, context.DeadlineExceeded)
|
||
atomic.AddUint64(&cm.stats.TimeoutOperations, 1)
|
||
}
|
||
}
|
||
|
||
// Stats 获取 Cosocket 统计信息。
|
||
//
|
||
// 返回值:
|
||
// - CosocketStats: 当前统计快照
|
||
func (cm *CosocketManager) Stats() CosocketStats {
|
||
return CosocketStats{
|
||
TotalOperations: atomic.LoadUint64(&cm.stats.TotalOperations),
|
||
ActiveOperations: atomic.LoadUint64(&cm.stats.ActiveOperations),
|
||
TimeoutOperations: atomic.LoadUint64(&cm.stats.TimeoutOperations),
|
||
ErrorOperations: atomic.LoadUint64(&cm.stats.ErrorOperations),
|
||
ActiveSockets: atomic.LoadUint64(&cm.stats.ActiveSockets),
|
||
TotalSocketsCreated: atomic.LoadUint64(&cm.stats.TotalSocketsCreated),
|
||
TotalSocketsClosed: atomic.LoadUint64(&cm.stats.TotalSocketsClosed),
|
||
}
|
||
}
|
||
|
||
// SetDefaultTimeout 设置默认超时时间。
|
||
//
|
||
// 参数:
|
||
// - timeout: 新的默认超时
|
||
func (cm *CosocketManager) SetDefaultTimeout(timeout time.Duration) {
|
||
cm.defaultTimeout = timeout
|
||
}
|
||
|
||
// Close 关闭 Cosocket 管理器。
|
||
//
|
||
// 停止清理循环,取消所有未完成的操作。
|
||
func (cm *CosocketManager) Close() {
|
||
cm.cancel()
|
||
cm.timeoutChecker.Stop()
|
||
|
||
// 取消所有未完成操作
|
||
cm.mu.Lock()
|
||
ops := make([]*SocketOperation, 0, len(cm.operations))
|
||
for _, op := range cm.operations {
|
||
ops = append(ops, op)
|
||
}
|
||
cm.operations = make(map[uint64]*SocketOperation)
|
||
cm.mu.Unlock()
|
||
|
||
for _, op := range ops {
|
||
op.Complete(nil, context.Canceled)
|
||
}
|
||
}
|
||
|
||
// TrackSocketCreated 跟踪 Socket 创建(更新统计)。
|
||
func (cm *CosocketManager) TrackSocketCreated() {
|
||
atomic.AddUint64(&cm.stats.TotalSocketsCreated, 1)
|
||
atomic.AddUint64(&cm.stats.ActiveSockets, 1)
|
||
}
|
||
|
||
// TrackSocketClosed 跟踪 Socket 关闭(更新统计)。
|
||
func (cm *CosocketManager) TrackSocketClosed() {
|
||
atomic.AddUint64(&cm.stats.TotalSocketsClosed, 1)
|
||
atomic.AddUint64(&cm.stats.ActiveSockets, ^uint64(0))
|
||
}
|
||
|
||
// TCPAddr 解析 TCP 地址。
|
||
//
|
||
// 参数:
|
||
// - host: 主机地址
|
||
// - port: 端口号
|
||
//
|
||
// 返回值:
|
||
// - *net.TCPAddr: 解析后的 TCP 地址
|
||
// - error: 解析失败时返回错误
|
||
func (cm *CosocketManager) TCPAddr(host string, port int) (*net.TCPAddr, error) {
|
||
return &net.TCPAddr{
|
||
IP: net.ParseIP(host),
|
||
Port: port,
|
||
}, nil
|
||
}
|