fix(stream): use atomic operations for counters and fix UDP conns leak

- Server.connCount and Target.conns now use atomic.AddInt64/LoadInt64
  instead of non-atomic ++ and --, fixing data races under concurrency
- UDP sessions now store a reference to their target and decrement
  target.conns in close(), preventing monotonically increasing counts
  that would break least_conn load balancing over time
This commit is contained in:
xfy 2026-06-03 01:16:37 +08:00
parent eb404f98a2
commit 0359d4c477

View File

@ -515,7 +515,7 @@ func (s *Server) acceptLoop(addr string, listener net.Listener) {
continue continue
} }
s.connCount++ atomic.AddInt64(&s.connCount, 1)
go s.handleConnection(conn, addr) go s.handleConnection(conn, addr)
} }
} }
@ -534,7 +534,7 @@ func (s *Server) acceptLoop(addr string, listener net.Listener) {
func (s *Server) handleConnection(clientConn net.Conn, _ string) { func (s *Server) handleConnection(clientConn net.Conn, _ string) {
defer func() { defer func() {
_ = clientConn.Close() _ = clientConn.Close()
s.connCount-- atomic.AddInt64(&s.connCount, -1)
}() }()
s.mu.RLock() s.mu.RLock()
@ -556,8 +556,8 @@ func (s *Server) handleConnection(clientConn net.Conn, _ string) {
return // 无可用目标 return // 无可用目标
} }
target.conns++ atomic.AddInt64(&target.conns, 1)
defer func() { target.conns-- }() defer func() { atomic.AddInt64(&target.conns, -1) }()
// 连接目标 // 连接目标
targetConn, err := net.DialTimeout("tcp", target.addr, 10*time.Second) targetConn, err := net.DialTimeout("tcp", target.addr, 10*time.Second)
@ -675,7 +675,7 @@ func (s *Server) Stats() Stats {
defer s.mu.RUnlock() defer s.mu.RUnlock()
return Stats{ return Stats{
Connections: s.connCount, Connections: atomic.LoadInt64(&s.connCount),
Listeners: len(s.listeners) + len(s.udpServers), Listeners: len(s.listeners) + len(s.udpServers),
Upstreams: len(s.upstreams), Upstreams: len(s.upstreams),
} }
@ -701,6 +701,7 @@ type Stats struct {
type udpSession struct { type udpSession struct {
lastActive time.Time lastActive time.Time
targetConn net.Conn targetConn net.Conn
target *Target
clientAddr *net.UDPAddr clientAddr *net.UDPAddr
srv *udpServer srv *udpServer
mu sync.RWMutex mu sync.RWMutex
@ -838,12 +839,13 @@ func (s *udpServer) getOrCreateSession(clientAddr *net.UDPAddr) (*udpSession, er
return nil, err return nil, err
} }
target.conns++ atomic.AddInt64(&target.conns, 1)
// 创建新会话 // 创建新会话
session = &udpSession{ session = &udpSession{
clientAddr: clientAddr, clientAddr: clientAddr,
targetConn: targetConn, targetConn: targetConn,
target: target,
lastActive: time.Now(), lastActive: time.Now(),
srv: s, srv: s,
} }
@ -883,6 +885,9 @@ func (sess *udpSession) close() {
if sess.targetConn != nil { if sess.targetConn != nil {
_ = sess.targetConn.Close() _ = sess.targetConn.Close()
} }
if sess.target != nil {
atomic.AddInt64(&sess.target.conns, -1)
}
}) })
} }