perf: 零分配优化与 Dial timeout 支持

- 添加 b2s/s2b 零分配字节-字符串转换工具函数
- WebSocket 数据转发使用 sync.Pool 复用 32KB buffer
- 条件化 Debug 日志避免非 Debug 级别的字符串分配
- 缓存键哈希计算直接写入 []byte 避免 string 转换
- 使用 bytes.EqualFold 替代 strings.ToLower 进行大小写不敏感比较
- generateETag 使用 strconv.AppendInt 避免 fmt.Sprintf
- 支持 Dial timeout 配置,区分 TCP 连接建立和总连接超时
- MaxConnsPerHost 默认值改为 512(fasthttp 推荐)

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
xfy 2026-04-28 20:11:20 +08:00
parent cf2fcca7e8
commit 11e22c80b8
14 changed files with 164 additions and 74 deletions

View File

@ -35,6 +35,8 @@ ulw 完善性能基准测试
ulw 深度分析下代码质量 ulw 深度分析下代码质量
ulw 深度分析下代码架构
ulw 分析下 lib/fasthttp/ 的源码,然后看下 lolly 的用法合不合理,有没有性能可以提升的地方 ulw 分析下 lib/fasthttp/ 的源码,然后看下 lolly 的用法合不合理,有没有性能可以提升的地方
## 兼容性 ## 兼容性

View File

@ -9,9 +9,9 @@
// //
// 关键设计决策: // 关键设计决策:
// //
// 1. bufferPool 使用 singleton 模式ctxPool 保持独立 // 1. bufferPool 使用 singleton 模式ctxPool 保持独立
// 2. CommonAdapter 不包含 ConvertResponseHTTP/2/HTTP/3 行为不同) // 2. CommonAdapter 不包含 ConvertResponseHTTP/2/HTTP/3 行为不同)
// 3. 阈值常量统一,避免 HTTP/2 inline 和 HTTP/3 constant 不一致 // 3. 阈值常量统一,避免 HTTP/2 inline 和 HTTP/3 constant 不一致
// //
// 作者xfy // 作者xfy
package adapter package adapter

View File

@ -266,4 +266,3 @@ func MatchPattern(pattern, path string) bool {
func matchPattern(pattern, path string) bool { func matchPattern(pattern, path string) bool {
return MatchPattern(pattern, path) return MatchPattern(pattern, path)
} }

View File

@ -182,7 +182,7 @@ func DefaultConfig() *Config {
}, },
Transport: TransportConfig{ Transport: TransportConfig{
IdleConnTimeout: 90 * time.Second, IdleConnTimeout: 90 * time.Second,
MaxConnsPerHost: 0, // 0 表示不限制 MaxConnsPerHost: 512, // fasthttp 推荐值
}, },
}, },
Monitoring: MonitoringConfig{ Monitoring: MonitoringConfig{

View File

@ -171,8 +171,8 @@ func TestDefaultConfigPerformance(t *testing.T) {
if cfg.Performance.Transport.IdleConnTimeout != 90*time.Second { if cfg.Performance.Transport.IdleConnTimeout != 90*time.Second {
t.Errorf("Transport.IdleConnTimeout 期望 90s, 实际 %v", cfg.Performance.Transport.IdleConnTimeout) t.Errorf("Transport.IdleConnTimeout 期望 90s, 实际 %v", cfg.Performance.Transport.IdleConnTimeout)
} }
if cfg.Performance.Transport.MaxConnsPerHost != 0 { if cfg.Performance.Transport.MaxConnsPerHost != 512 {
t.Errorf("Transport.MaxConnsPerHost 期望 0 (不限制), 实际 %d", cfg.Performance.Transport.MaxConnsPerHost) t.Errorf("Transport.MaxConnsPerHost 期望 512 (fasthttp 推荐), 实际 %d", cfg.Performance.Transport.MaxConnsPerHost)
} }
} }

View File

@ -261,12 +261,17 @@ type HealthMatchConfig struct {
// 使用示例: // 使用示例:
// //
// timeout: // timeout:
// connect: 5s // dial: 5s # TCP 连接建立超时
// connect: 30s # 总连接超时(含 DNS/TLS
// read: 30s // read: 30s
// write: 30s // write: 30s
type ProxyTimeout struct { type ProxyTimeout struct {
// Connect 连接超时 // Dial TCP 连接建立超时
// 建立到后端服务器的连接超时 // 建立 TCP 连接的超时时间(不含 DNS 和 TLS
Dial time.Duration `yaml:"dial"`
// Connect 总连接超时
// 从开始连接到连接可用的总超时时间(含 DNS 和 TLS
Connect time.Duration `yaml:"connect"` Connect time.Duration `yaml:"connect"`
// Read 读取超时 // Read 读取超时

View File

@ -22,6 +22,7 @@ import (
"fmt" "fmt"
"os" "os"
"path/filepath" "path/filepath"
"strconv"
"strings" "strings"
"time" "time"
@ -771,8 +772,16 @@ func (h *StaticHandler) validateSymlink(filePath string) error {
} }
// generateETag 基于 ModTime 和 Size 生成 ETag。 // generateETag 基于 ModTime 和 Size 生成 ETag。
// 使用 strconv.AppendInt 避免 fmt.Sprintf 分配。
func generateETag(modTime time.Time, size int64) string { func generateETag(modTime time.Time, size int64) string {
return fmt.Sprintf("\"%x-%x\"", modTime.Unix(), size) var buf [32]byte
b := buf[:0]
b = append(b, '"')
b = strconv.AppendInt(b, modTime.Unix(), 16)
b = append(b, '-')
b = strconv.AppendInt(b, size, 16)
b = append(b, '"')
return string(b)
} }
// isNotModified 检查条件请求是否匹配(返回 true 表示应返回 304 // isNotModified 检查条件请求是否匹配(返回 true 表示应返回 304

View File

@ -16,8 +16,8 @@ import (
"net" "net"
"net/http" "net/http"
"rua.plus/lolly/internal/adapter"
"github.com/valyala/fasthttp" "github.com/valyala/fasthttp"
"rua.plus/lolly/internal/adapter"
) )
// Adapter 将 fasthttp.RequestHandler 适配为 http.Handler。 // Adapter 将 fasthttp.RequestHandler 适配为 http.Handler。

View File

@ -9,33 +9,18 @@ import (
"rua.plus/lolly/internal/loadbalance" "rua.plus/lolly/internal/loadbalance"
) )
// buildCacheKey 构建缓存键字符串。
//
// 使用请求方法和完整请求 URI 作为缓存键。
// 该函数保留用于日志记录和调试场景。
//
// 参数:
// - ctx: FastHTTP 请求上下文
//
// 返回值:
// - string: 缓存键(格式 "METHOD:URI"
func (p *Proxy) buildCacheKey(ctx *fasthttp.RequestCtx) string {
// 使用请求方法和路径作为缓存键
return string(ctx.Request.Header.Method()) + ":" + string(ctx.Request.URI().RequestURI())
}
// buildCacheKeyHash 使用 FNV-64a 计算缓存键的 uint64 哈希值。 // buildCacheKeyHash 使用 FNV-64a 计算缓存键的 uint64 哈希值。
// 返回哈希值和原始字符串键。 // 使用零分配方式构建哈希,避免 []byte(origKey) 转换。
// 注意:此函数会先构建字符串键再哈希,存在双重分配。
// 对于只需要哈希值的场景,使用 buildCacheKeyHashValue 代替。
func (p *Proxy) buildCacheKeyHash(ctx *fasthttp.RequestCtx) (uint64, string) { func (p *Proxy) buildCacheKeyHash(ctx *fasthttp.RequestCtx) (uint64, string) {
// 构建原始 key
origKey := p.buildCacheKey(ctx)
// 使用 FNV-64a 计算哈希
h := fnv.New64a() h := fnv.New64a()
h.Write([]byte(origKey)) h.Write(ctx.Request.Header.Method())
return h.Sum64(), origKey h.Write([]byte(":"))
h.Write(ctx.Request.URI().RequestURI())
hash := h.Sum64()
// 仅在需要 origKey 时构建字符串
origKey := b2s(ctx.Request.Header.Method()) + ":" + b2s(ctx.Request.URI().RequestURI())
return hash, origKey
} }
// buildCacheKeyHashValue 直接计算缓存键的哈希值,零字符串分配。 // buildCacheKeyHashValue 直接计算缓存键的哈希值,零字符串分配。
@ -111,12 +96,12 @@ func (p *Proxy) backgroundRefresh(ctx *fasthttp.RequestCtx, target *loadbalance.
// 处理 304 Not Modified 响应 // 处理 304 Not Modified 响应
if resp.StatusCode() == 304 { if resp.StatusCode() == 304 {
newHeaders := make(map[string]string) newHeaders := make(map[string]string, 5) // 预分配,通常只有 Last-Modified 和 ETag
if lm := resp.Header.Peek("Last-Modified"); len(lm) > 0 { if lm := resp.Header.Peek("Last-Modified"); len(lm) > 0 {
newHeaders["Last-Modified"] = string(lm) newHeaders["Last-Modified"] = b2s(lm)
} }
if et := resp.Header.Peek("ETag"); len(et) > 0 { if et := resp.Header.Peek("ETag"); len(et) > 0 {
newHeaders["ETag"] = string(et) newHeaders["ETag"] = b2s(et)
} }
p.cache.RefreshTTL(hashKey, origKey, newHeaders) p.cache.RefreshTTL(hashKey, origKey, newHeaders)
return return

View File

@ -75,8 +75,9 @@ func (p *Proxy) modifyResponseHeaders(ctx *fasthttp.RequestCtx) {
if len(passSet) > 0 { if len(passSet) > 0 {
var toDelete []string var toDelete []string
for key := range respHeaders.All() { for key := range respHeaders.All() {
if !passSet[string(key)] { // 不在白名单中的应该删除
toDelete = append(toDelete, string(key)) if !isInWhitelist(key, passSet) {
toDelete = append(toDelete, b2s(key))
} }
} }
for _, k := range toDelete { for _, k := range toDelete {

View File

@ -234,8 +234,8 @@ func (h *HealthChecker) checkTarget(target *loadbalance.Target) {
return return
} }
// 提取响应头(小写 key // 提取响应头(小写 key,预分配容量
headers := make(map[string]string) headers := make(map[string]string, 20)
for key, value := range resp.Header.All() { for key, value := range resp.Header.All() {
headers[string(key)] = string(value) headers[string(key)] = string(value)
} }

View File

@ -276,7 +276,7 @@ func createHostClient(targetURL string, timeout config.ProxyTimeout, transportCf
// 默认值 // 默认值
maxIdleConnDuration := 90 * time.Second maxIdleConnDuration := 90 * time.Second
maxConns := 100 maxConns := 512 // fasthttp 推荐值 DefaultMaxConnsPerHost
// 应用 Transport 配置 // 应用 Transport 配置
if transportCfg != nil { if transportCfg != nil {
@ -301,17 +301,25 @@ func createHostClient(targetURL string, timeout config.ProxyTimeout, transportCf
SecureErrorLogMessage: false, SecureErrorLogMessage: false,
} }
// ProxyBind使用指定本地地址作为出站连接源 // Dial timeout如果配置了 Dial使用它作为 TCP 连接建立超时
if proxyBind != "" { // 否则使用 Connect 作为向后兼容
localAddr := proxyBind dialTimeout := timeout.Dial
dialTimeout := client.MaxConnWaitTimeout if dialTimeout <= 0 {
if dialTimeout <= 0 { dialTimeout = timeout.Connect
dialTimeout = 30 * time.Second }
} if dialTimeout <= 0 {
dialTimeout = 30 * time.Second // 最终默认值
}
// 设置自定义 Dial 函数以使用 Dial timeout
// 如果有 ProxyBind 或需要自定义 Dial timeout
if proxyBind != "" || timeout.Dial > 0 {
client.Dial = func(addr string) (net.Conn, error) { client.Dial = func(addr string) (net.Conn, error) {
dialer := &net.Dialer{ dialer := &net.Dialer{
LocalAddr: &net.TCPAddr{IP: net.ParseIP(localAddr)}, Timeout: dialTimeout,
Timeout: dialTimeout, }
if proxyBind != "" {
dialer.LocalAddr = &net.TCPAddr{IP: net.ParseIP(proxyBind)}
} }
return dialer.Dial("tcp", addr) return dialer.Dial("tcp", addr)
} }
@ -456,9 +464,14 @@ func FinalizeUpstreamVars(vc *variable.Context, upstreamAddr string, upstreamSta
// 如果没有可用的健康目标,返回 502 Bad Gateway。 // 如果没有可用的健康目标,返回 502 Bad Gateway。
// 如果后端请求失败,根据 next_upstream 配置尝试下一个目标。 // 如果后端请求失败,根据 next_upstream 配置尝试下一个目标。
func (p *Proxy) ServeHTTP(ctx *fasthttp.RequestCtx) { func (p *Proxy) ServeHTTP(ctx *fasthttp.RequestCtx) {
// DEBUG: 打印请求信息 // DEBUG: 打印请求信息(条件化避免非 Debug 级别的 string() 分配)
logging.Debug().Msgf("[PROXY] 收到请求: path=%s, host=%s, method=%s", if logging.Debug().Enabled() {
string(ctx.Path()), string(ctx.Host()), string(ctx.Method())) logging.Debug().
Str("path", b2s(ctx.Path())).
Str("host", b2s(ctx.Host())).
Str("method", b2s(ctx.Method())).
Msg("[PROXY] 收到请求")
}
// 上游变量捕获 // 上游变量捕获
var upstreamAddr string var upstreamAddr string
@ -517,8 +530,13 @@ func (p *Proxy) ServeHTTP(ctx *fasthttp.RequestCtx) {
attemptedTargets = append(attemptedTargets, target) attemptedTargets = append(attemptedTargets, target)
// DEBUG: 打印选中的目标 // DEBUG: 打印选中的目标(条件化避免分配)
logging.Debug().Msgf("[PROXY] 选中目标: url=%s, healthy=%v", target.URL, target.Healthy.Load()) if logging.Debug().Enabled() {
logging.Debug().
Str("url", target.URL).
Bool("healthy", target.Healthy.Load()).
Msg("[PROXY] 选中目标")
}
// 获取所选目标的客户端 // 获取所选目标的客户端
client := p.getClient(target.URL) client := p.getClient(target.URL)
@ -531,8 +549,13 @@ func (p *Proxy) ServeHTTP(ctx *fasthttp.RequestCtx) {
continue continue
} }
// DEBUG: 打印客户端信息 // DEBUG: 打印客户端信息(条件化避免分配)
logging.Debug().Msgf("[PROXY] client 信息: Addr=%s, IsTLS=%v", client.Addr, client.IsTLS) if logging.Debug().Enabled() {
logging.Debug().
Str("addr", client.Addr).
Bool("isTLS", client.IsTLS).
Msg("[PROXY] client 信息")
}
// 增加连接计数(用于最少连接数负载均衡) // 增加连接计数(用于最少连接数负载均衡)
loadbalance.IncrementConnections(target) loadbalance.IncrementConnections(target)
@ -598,9 +621,14 @@ func (p *Proxy) ServeHTTP(ctx *fasthttp.RequestCtx) {
} }
req.SetRequestURIBytes(targetURI) req.SetRequestURIBytes(targetURI)
// DEBUG: 打印请求头 // DEBUG: 打印请求头(条件化避免分配)
logging.Debug().Msgf("[PROXY] 请求准备完成: Host=%s, URI=%s, targetURI=%s", if logging.Debug().Enabled() {
string(req.Header.Host()), string(req.RequestURI()), targetURI) logging.Debug().
Str("host", b2s(req.Header.Host())).
Str("uri", b2s(req.RequestURI())).
Str("targetURI", b2s(targetURI)).
Msg("[PROXY] 请求准备完成")
}
// 尝试从缓存获取(如果启用) // 尝试从缓存获取(如果启用)
if p.cache != nil && attempt == 0 { if p.cache != nil && attempt == 0 {
@ -679,7 +707,12 @@ func (p *Proxy) ServeHTTP(ctx *fasthttp.RequestCtx) {
if err != nil { if err != nil {
logging.Error().Msgf("[PROXY] 请求失败: url=%s, err=%v, errType=%T", target.URL, err, err) logging.Error().Msgf("[PROXY] 请求失败: url=%s, err=%v, errType=%T", target.URL, err, err)
} else { } else {
logging.Debug().Msgf("[PROXY] 请求成功: url=%s, status=%d", target.URL, ctx.Response.StatusCode()) if logging.Debug().Enabled() {
logging.Debug().
Str("url", target.URL).
Int("status", ctx.Response.StatusCode()).
Msg("[PROXY] 请求成功")
}
} }
if err != nil { if err != nil {
@ -805,17 +838,25 @@ func (p *Proxy) ServeHTTP(ctx *fasthttp.RequestCtx) {
var lastModified, etag string var lastModified, etag string
for key, value := range ctx.Response.Header.All() { for key, value := range ctx.Response.Header.All() {
headerName := strings.ToLower(string(key)) // 使用 bytes.EqualFold 进行大小写不敏感比较,避免 strings.ToLower 分配
if ignoreSet[headerName] { shouldIgnore := false
for _, h := range p.config.Cache.CacheIgnoreHeaders {
if bytes.EqualFold(key, s2b(h)) {
shouldIgnore = true
break
}
}
if shouldIgnore {
continue continue
} }
headers[string(key)] = string(value) // 使用 b2s 零分配转换
headers[b2s(key)] = b2s(value)
switch headerName { // 检查特定头部(使用 bytes.EqualFold
case "last-modified": if bytes.EqualFold(key, []byte("last-modified")) {
lastModified = string(value) lastModified = b2s(value)
case "etag": } else if bytes.EqualFold(key, []byte("etag")) {
etag = string(value) etag = b2s(value)
} }
} }
p.cache.Set(hashKey, origKey, ctx.Response.Body(), headers, statusCode, p.getCacheDuration(statusCode)) p.cache.Set(hashKey, origKey, ctx.Response.Body(), headers, statusCode, p.getCacheDuration(statusCode))
@ -1010,4 +1051,3 @@ func extractHostFromURL(urlStr string) string {
return host return host
} }

37
internal/proxy/utils.go Normal file
View File

@ -0,0 +1,37 @@
package proxy
import (
"bytes"
"unsafe"
)
// b2s converts byte slice to string without allocation.
// WARNING: The returned string shares memory with the original slice.
// Do not modify the slice after calling this function.
func b2s(b []byte) string {
if len(b) == 0 {
return ""
}
return unsafe.String(&b[0], len(b))
}
// s2b converts string to byte slice without allocation.
// WARNING: The returned slice shares memory with the original string.
// Do not modify the slice contents.
func s2b(s string) []byte {
if s == "" {
return nil
}
return unsafe.Slice(unsafe.StringData(s), len(s))
}
// isInWhitelist checks if a header key is in the whitelist.
// Uses bytes.EqualFold for case-insensitive comparison without allocation.
func isInWhitelist(key []byte, whitelist map[string]bool) bool {
for wKey := range whitelist {
if bytes.EqualFold(key, s2b(wKey)) {
return true
}
}
return false
}

View File

@ -36,6 +36,15 @@ import (
"rua.plus/lolly/internal/netutil" "rua.plus/lolly/internal/netutil"
) )
// wsBufPool WebSocket 数据转发 buffer pool。
// 复用 32KB buffer 避免每次 copyData 调用分配。
var wsBufPool = sync.Pool{
New: func() interface{} {
buf := make([]byte, 32*1024)
return &buf
},
}
// WebSocketBridge WebSocket 桥接器。 // WebSocketBridge WebSocket 桥接器。
// //
// 在客户端和后端服务器之间建立双向数据通道,透明转发 WebSocket 数据帧。 // 在客户端和后端服务器之间建立双向数据通道,透明转发 WebSocket 数据帧。
@ -118,7 +127,10 @@ func (b *WebSocketBridge) Bridge() error {
// 返回值: // 返回值:
// - error: 读写错误,连接正常关闭返回 nil // - error: 读写错误,连接正常关闭返回 nil
func (b *WebSocketBridge) copyData(dst, src net.Conn, direction string) error { func (b *WebSocketBridge) copyData(dst, src net.Conn, direction string) error {
buf := make([]byte, 32*1024) // 32KB 缓冲区 bufPtr := wsBufPool.Get().(*[]byte)
buf := *bufPtr
defer wsBufPool.Put(bufPtr)
for { for {
n, err := src.Read(buf) n, err := src.Read(buf)
if err != nil { if err != nil {