diff --git a/internal/stream/stream.go b/internal/stream/stream.go index 17d2e33..2a91a39 100644 --- a/internal/stream/stream.go +++ b/internal/stream/stream.go @@ -515,7 +515,7 @@ func (s *Server) acceptLoop(addr string, listener net.Listener) { continue } - s.connCount++ + atomic.AddInt64(&s.connCount, 1) go s.handleConnection(conn, addr) } } @@ -534,7 +534,7 @@ func (s *Server) acceptLoop(addr string, listener net.Listener) { func (s *Server) handleConnection(clientConn net.Conn, _ string) { defer func() { _ = clientConn.Close() - s.connCount-- + atomic.AddInt64(&s.connCount, -1) }() s.mu.RLock() @@ -556,8 +556,8 @@ func (s *Server) handleConnection(clientConn net.Conn, _ string) { return // 无可用目标 } - target.conns++ - defer func() { target.conns-- }() + atomic.AddInt64(&target.conns, 1) + defer func() { atomic.AddInt64(&target.conns, -1) }() // 连接目标 targetConn, err := net.DialTimeout("tcp", target.addr, 10*time.Second) @@ -675,7 +675,7 @@ func (s *Server) Stats() Stats { defer s.mu.RUnlock() return Stats{ - Connections: s.connCount, + Connections: atomic.LoadInt64(&s.connCount), Listeners: len(s.listeners) + len(s.udpServers), Upstreams: len(s.upstreams), } @@ -701,6 +701,7 @@ type Stats struct { type udpSession struct { lastActive time.Time targetConn net.Conn + target *Target clientAddr *net.UDPAddr srv *udpServer mu sync.RWMutex @@ -838,12 +839,13 @@ func (s *udpServer) getOrCreateSession(clientAddr *net.UDPAddr) (*udpSession, er return nil, err } - target.conns++ + atomic.AddInt64(&target.conns, 1) // 创建新会话 session = &udpSession{ clientAddr: clientAddr, targetConn: targetConn, + target: target, lastActive: time.Now(), srv: s, } @@ -883,6 +885,9 @@ func (sess *udpSession) close() { if sess.targetConn != nil { _ = sess.targetConn.Close() } + if sess.target != nil { + atomic.AddInt64(&sess.target.conns, -1) + } }) }