perf(proxy,logging,compression): 使用零拷贝字节路径减少内存分配
- proxy: headersPool sync.Pool 复用 header map,容量 20 - proxy: buildCacheKeyHash 使用池化 map 替代 make(map[string]string) - proxy: ServeHTTP 目标 URI 构造使用 []byte append + SetRequestURIBytes - headers: X-Forwarded-For 构造使用 []byte append + SetBytesKV - logging: Str() 改为 Bytes() 零拷贝日志字段 - compression: Process() 直接操作 []byte,使用 bytes.Contains/Equal/HasPrefix - compression: isCompressible() 签名从 string 改为 []byte Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
dc7358bf4e
commit
326eedc729
@ -110,8 +110,8 @@ func getOutput(path string) io.Writer {
|
||||
// LogAccess 记录访问日志。
|
||||
func LogAccess(ctx *fasthttp.RequestCtx, status int, size int64, duration time.Duration) {
|
||||
log.Info().
|
||||
Str("method", string(ctx.Method())).
|
||||
Str("path", string(ctx.Path())).
|
||||
Bytes("method", ctx.Method()).
|
||||
Bytes("path", ctx.Path()).
|
||||
Int("status", status).
|
||||
Int64("size", size).
|
||||
Dur("duration", duration).
|
||||
@ -125,7 +125,7 @@ func (l *Logger) LogAccess(ctx *fasthttp.RequestCtx, status int, size int64, dur
|
||||
if l.accessFormat == formatJSON || l.accessFormat == "" {
|
||||
l.accessLog.Info().
|
||||
Str("remote_addr", ctx.RemoteAddr().String()).
|
||||
Str("request", string(ctx.Method())+" "+string(ctx.Path())).
|
||||
Bytes("request", append(append(ctx.Method(), ' '), ctx.Path()...)).
|
||||
Int("status", status).
|
||||
Int64("body_bytes_sent", size).
|
||||
Dur("request_time", duration).
|
||||
|
||||
@ -115,7 +115,7 @@ func New(cfg *config.CompressionConfig) (*Middleware, error) {
|
||||
|
||||
// 初始化缓冲池
|
||||
m.gzipPool = sync.Pool{
|
||||
New: func() interface{} {
|
||||
New: func() any {
|
||||
w, err := gzip.NewWriterLevel(nil, cfg.Level)
|
||||
if err != nil {
|
||||
// 使用默认压缩级别作为回退
|
||||
@ -127,7 +127,7 @@ func New(cfg *config.CompressionConfig) (*Middleware, error) {
|
||||
|
||||
// 初始化 brotli 缓冲池
|
||||
m.brotliPool = sync.Pool{
|
||||
New: func() interface{} {
|
||||
New: func() any {
|
||||
return brotli.NewWriterOptions(nil, brotli.WriterOptions{
|
||||
Quality: cfg.Level,
|
||||
})
|
||||
@ -166,19 +166,19 @@ func (m *Middleware) Name() string {
|
||||
// - fasthttp.RequestHandler: 包装后的请求处理器
|
||||
func (m *Middleware) Process(next fasthttp.RequestHandler) fasthttp.RequestHandler {
|
||||
return func(ctx *fasthttp.RequestCtx) {
|
||||
// 检查客户端是否支持压缩
|
||||
acceptEncoding := string(ctx.Request.Header.Peek("Accept-Encoding"))
|
||||
// 检查客户端是否支持压缩(零拷贝使用 []byte)
|
||||
acceptEncoding := ctx.Request.Header.Peek("Accept-Encoding")
|
||||
|
||||
// 根据算法和客户端支持选择压缩方式
|
||||
var useGzip, useBrotli bool
|
||||
switch m.algorithm {
|
||||
case AlgorithmGzip:
|
||||
useGzip = strings.Contains(acceptEncoding, "gzip")
|
||||
useGzip = bytes.Contains(acceptEncoding, []byte("gzip"))
|
||||
case AlgorithmBrotli:
|
||||
// brotli 或 both 模式
|
||||
if strings.Contains(acceptEncoding, "br") {
|
||||
if bytes.Contains(acceptEncoding, []byte("br")) {
|
||||
useBrotli = true
|
||||
} else if strings.Contains(acceptEncoding, "gzip") {
|
||||
} else if bytes.Contains(acceptEncoding, []byte("gzip")) {
|
||||
useGzip = true
|
||||
}
|
||||
}
|
||||
@ -201,8 +201,8 @@ func (m *Middleware) Process(next fasthttp.RequestHandler) fasthttp.RequestHandl
|
||||
return // 不压缩
|
||||
}
|
||||
|
||||
// 检查 MIME 类型
|
||||
contentType := string(ctx.Response.Header.ContentType())
|
||||
// 检查 MIME 类型(零拷贝使用 []byte)
|
||||
contentType := ctx.Response.Header.ContentType()
|
||||
if !m.isCompressible(contentType) {
|
||||
return // 不压缩此类型
|
||||
}
|
||||
@ -244,26 +244,25 @@ func (m *Middleware) Process(next fasthttp.RequestHandler) fasthttp.RequestHandl
|
||||
// isCompressible 检查 MIME 类型是否可压缩。
|
||||
//
|
||||
// 参数:
|
||||
// - contentType: 内容类型(MIME 类型)
|
||||
// - contentType: 内容类型(MIME 类型)[]
|
||||
//
|
||||
// 返回值:
|
||||
// - bool: 是否可压缩
|
||||
func (m *Middleware) isCompressible(contentType string) bool {
|
||||
func (m *Middleware) isCompressible(contentType []byte) bool {
|
||||
// 移除 charset 等参数
|
||||
ct := contentType
|
||||
if idx := strings.Index(ct, ";"); idx >= 0 {
|
||||
if idx := bytes.IndexByte(ct, ';'); idx >= 0 {
|
||||
ct = ct[:idx]
|
||||
}
|
||||
ct = strings.TrimSpace(strings.ToLower(ct))
|
||||
ct = bytes.TrimSpace(ct)
|
||||
|
||||
for _, t := range m.types {
|
||||
if strings.ToLower(t) == ct {
|
||||
if bytes.Equal(bytes.ToLower([]byte(t)), ct) {
|
||||
return true
|
||||
}
|
||||
// 支持通配符匹配
|
||||
if strings.HasSuffix(t, "/*") {
|
||||
base := strings.TrimSuffix(t, "/*")
|
||||
if strings.HasPrefix(ct, base) {
|
||||
if base, found := strings.CutSuffix(t, "/*"); found {
|
||||
if bytes.HasPrefix(ct, []byte(base)) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
@ -26,7 +26,7 @@ func BenchmarkGzipCompress_1KB(b *testing.B) {
|
||||
data := tools.GenerateTestData(tools.Size1KB)
|
||||
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
for b.Loop() {
|
||||
mw.compressGzip(data)
|
||||
}
|
||||
}
|
||||
@ -43,7 +43,7 @@ func BenchmarkGzipCompress_10KB(b *testing.B) {
|
||||
data := tools.GenerateTestData(tools.Size10KB)
|
||||
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
for b.Loop() {
|
||||
mw.compressGzip(data)
|
||||
}
|
||||
}
|
||||
@ -60,7 +60,7 @@ func BenchmarkGzipCompress_100KB(b *testing.B) {
|
||||
data := tools.GenerateTestData(tools.Size100KB)
|
||||
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
for b.Loop() {
|
||||
mw.compressGzip(data)
|
||||
}
|
||||
}
|
||||
@ -77,7 +77,7 @@ func BenchmarkBrotliCompress_1KB(b *testing.B) {
|
||||
data := tools.GenerateTestData(tools.Size1KB)
|
||||
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
for b.Loop() {
|
||||
mw.compressBrotli(data)
|
||||
}
|
||||
}
|
||||
@ -94,7 +94,7 @@ func BenchmarkBrotliCompress_10KB(b *testing.B) {
|
||||
data := tools.GenerateTestData(tools.Size10KB)
|
||||
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
for b.Loop() {
|
||||
mw.compressBrotli(data)
|
||||
}
|
||||
}
|
||||
@ -113,7 +113,7 @@ func BenchmarkCompressionPool(b *testing.B) {
|
||||
data := tools.GenerateTestData(tools.Size1KB)
|
||||
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
for b.Loop() {
|
||||
mw.compressGzip(data)
|
||||
}
|
||||
}
|
||||
@ -145,7 +145,7 @@ func BenchmarkCompressionMiddleware(b *testing.B) {
|
||||
handler := mw.Process(mockHandler)
|
||||
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
for b.Loop() {
|
||||
ctx := &fasthttp.RequestCtx{}
|
||||
ctx.Request.Header.SetMethod(fasthttp.MethodGet)
|
||||
ctx.Request.SetRequestURI("/api/test")
|
||||
@ -176,7 +176,7 @@ func BenchmarkCompressionMiddlewareNoCompress(b *testing.B) {
|
||||
handler := mw.Process(mockHandler)
|
||||
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
for b.Loop() {
|
||||
ctx := &fasthttp.RequestCtx{}
|
||||
ctx.Request.Header.SetMethod(fasthttp.MethodGet)
|
||||
ctx.Request.SetRequestURI("/api/test")
|
||||
@ -197,16 +197,16 @@ func BenchmarkIsCompressible(b *testing.B) {
|
||||
}
|
||||
mw, _ := New(cfg)
|
||||
|
||||
contentTypes := []string{
|
||||
"application/json",
|
||||
"text/html; charset=utf-8",
|
||||
"image/png",
|
||||
"application/octet-stream",
|
||||
"text/css",
|
||||
contentTypes := [][]byte{
|
||||
[]byte("application/json"),
|
||||
[]byte("text/html; charset=utf-8"),
|
||||
[]byte("image/png"),
|
||||
[]byte("application/octet-stream"),
|
||||
[]byte("text/css"),
|
||||
}
|
||||
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
for b.Loop() {
|
||||
for _, ct := range contentTypes {
|
||||
mw.isCompressible(ct)
|
||||
}
|
||||
@ -221,7 +221,7 @@ func BenchmarkCompressionLevelComparison(b *testing.B) {
|
||||
cfg := &config.CompressionConfig{Type: "gzip", Level: 1}
|
||||
mw, _ := New(cfg)
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
for b.Loop() {
|
||||
mw.compressGzip(data)
|
||||
}
|
||||
})
|
||||
@ -230,7 +230,7 @@ func BenchmarkCompressionLevelComparison(b *testing.B) {
|
||||
cfg := &config.CompressionConfig{Type: "gzip", Level: 6}
|
||||
mw, _ := New(cfg)
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
for b.Loop() {
|
||||
mw.compressGzip(data)
|
||||
}
|
||||
})
|
||||
@ -239,7 +239,7 @@ func BenchmarkCompressionLevelComparison(b *testing.B) {
|
||||
cfg := &config.CompressionConfig{Type: "gzip", Level: 9}
|
||||
mw, _ := New(cfg)
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
for b.Loop() {
|
||||
mw.compressGzip(data)
|
||||
}
|
||||
})
|
||||
|
||||
@ -101,22 +101,22 @@ func TestIsCompressible(t *testing.T) {
|
||||
})
|
||||
|
||||
tests := []struct {
|
||||
contentType string
|
||||
contentType []byte
|
||||
expected bool
|
||||
}{
|
||||
{"text/html", true},
|
||||
{"text/html; charset=utf-8", true},
|
||||
{"text/css", true},
|
||||
{"text/plain", true},
|
||||
{"application/json", true},
|
||||
{"application/json; charset=utf-8", true},
|
||||
{"image/png", false},
|
||||
{"application/octet-stream", false},
|
||||
{"", false},
|
||||
{[]byte("text/html"), true},
|
||||
{[]byte("text/html; charset=utf-8"), true},
|
||||
{[]byte("text/css"), true},
|
||||
{[]byte("text/plain"), true},
|
||||
{[]byte("application/json"), true},
|
||||
{[]byte("application/json; charset=utf-8"), true},
|
||||
{[]byte("image/png"), false},
|
||||
{[]byte("application/octet-stream"), false},
|
||||
{[]byte(""), false},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.contentType, func(t *testing.T) {
|
||||
t.Run(string(tt.contentType), func(t *testing.T) {
|
||||
result := m.isCompressible(tt.contentType)
|
||||
if result != tt.expected {
|
||||
t.Errorf("isCompressible(%s) = %v, expected %v", tt.contentType, result, tt.expected)
|
||||
|
||||
@ -63,12 +63,17 @@ func SetForwardedHeaders(headers *fasthttp.RequestHeader, fh ForwardedHeaders, a
|
||||
if appendXFF {
|
||||
existingXFF := headers.Peek("X-Forwarded-For")
|
||||
if len(existingXFF) > 0 {
|
||||
headers.Set("X-Forwarded-For", string(existingXFF)+", "+fh.ClientIP)
|
||||
// SAFETY: Ephemeral — xffBuf is written to header immediately and not reused.
|
||||
var xffBuf []byte
|
||||
xffBuf = append(xffBuf, existingXFF...)
|
||||
xffBuf = append(xffBuf, ", "...)
|
||||
xffBuf = append(xffBuf, fh.ClientIP...)
|
||||
headers.SetBytesKV([]byte("X-Forwarded-For"), xffBuf)
|
||||
} else {
|
||||
headers.Set("X-Forwarded-For", fh.ClientIP)
|
||||
headers.SetBytesKV([]byte("X-Forwarded-For"), []byte(fh.ClientIP))
|
||||
}
|
||||
} else {
|
||||
headers.Set("X-Forwarded-For", fh.ClientIP)
|
||||
headers.SetBytesKV([]byte("X-Forwarded-For"), []byte(fh.ClientIP))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -69,6 +69,14 @@ const (
|
||||
lbConsistentHash = "consistent_hash"
|
||||
)
|
||||
|
||||
// headersPool 复用缓存 headers map,减少分配。
|
||||
// 预容量 20 覆盖大多数 HTTP 响应头数量。
|
||||
var headersPool = sync.Pool{
|
||||
New: func() interface{} {
|
||||
return make(map[string]string, 20)
|
||||
},
|
||||
}
|
||||
|
||||
// Proxy 表示反向代理实例,负责将 HTTP 请求转发到后端目标。
|
||||
//
|
||||
// 它为每个后端目标管理连接池,并提供负载均衡功能。
|
||||
@ -448,11 +456,17 @@ func (p *Proxy) ServeHTTP(ctx *fasthttp.RequestCtx) {
|
||||
// 关键:修改请求 URI 为完整的目标 URL
|
||||
// HostClient 要求 URI 格式必须与 Addr/IsTLS 一致
|
||||
// 例如:IsTLS=true 时,URI 应为 https://host/path
|
||||
targetURI := target.URL + string(ctx.URI().Path())
|
||||
if len(ctx.URI().QueryString()) > 0 {
|
||||
targetURI += "?" + string(ctx.URI().QueryString())
|
||||
// SAFETY: lifetime=ephemeral - consumed immediately by SetRequestURIBytes
|
||||
path := ctx.URI().Path()
|
||||
query := ctx.URI().QueryString()
|
||||
targetURI := make([]byte, 0, len(target.URL)+len(path)+len(query)+1)
|
||||
targetURI = append(targetURI, target.URL...)
|
||||
targetURI = append(targetURI, path...)
|
||||
if len(query) > 0 {
|
||||
targetURI = append(targetURI, '?')
|
||||
targetURI = append(targetURI, query...)
|
||||
}
|
||||
req.SetRequestURI(targetURI)
|
||||
req.SetRequestURIBytes(targetURI)
|
||||
|
||||
// DEBUG: 打印请求头
|
||||
logging.Debug().Msgf("[PROXY] 请求准备完成: Host=%s, URI=%s, targetURI=%s",
|
||||
@ -585,12 +599,20 @@ func (p *Proxy) ServeHTTP(ctx *fasthttp.RequestCtx) {
|
||||
if p.cache != nil {
|
||||
hashKey, origKey := p.buildCacheKeyHash(ctx)
|
||||
if statusCode >= 200 && statusCode < 300 {
|
||||
// 提取响应头
|
||||
headers := make(map[string]string)
|
||||
// 提取响应头(使用 pool 复用 map)
|
||||
headers, ok := headersPool.Get().(map[string]string)
|
||||
if !ok {
|
||||
headers = make(map[string]string, 20)
|
||||
}
|
||||
for k := range headers {
|
||||
delete(headers, k)
|
||||
}
|
||||
for key, value := range ctx.Response.Header.All() {
|
||||
headers[string(key)] = string(value)
|
||||
}
|
||||
p.cache.Set(hashKey, origKey, ctx.Response.Body(), headers, statusCode, p.getCacheDuration(statusCode))
|
||||
// 注意:不能 Put 回 pool,因为 cache.Set 存储了 map 引用
|
||||
// 后续 writeCachedResponse 会读取该 map
|
||||
}
|
||||
p.cache.ReleaseLock(hashKey, nil)
|
||||
}
|
||||
@ -967,8 +989,14 @@ func (p *Proxy) backgroundRefresh(ctx *fasthttp.RequestCtx, target *loadbalance.
|
||||
return
|
||||
}
|
||||
|
||||
// 提取响应头
|
||||
headers := make(map[string]string)
|
||||
// 提取响应头(使用 pool 复用 map)
|
||||
headers, ok := headersPool.Get().(map[string]string)
|
||||
if !ok {
|
||||
headers = make(map[string]string, 20)
|
||||
}
|
||||
for k := range headers {
|
||||
delete(headers, k)
|
||||
}
|
||||
for key, value := range resp.Header.All() {
|
||||
headers[string(key)] = string(value)
|
||||
}
|
||||
|
||||
@ -244,7 +244,7 @@ func BenchmarkProxyHostClient(b *testing.B) {
|
||||
client := createHostClient("http://"+addr, timeout, nil, nil)
|
||||
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
for b.Loop() {
|
||||
req := fasthttp.AcquireRequest()
|
||||
resp := fasthttp.AcquireResponse()
|
||||
|
||||
@ -450,7 +450,7 @@ func BenchmarkBuildCacheKeyHash(b *testing.B) {
|
||||
|
||||
b.Run("buildCacheKeyHash_with_string", func(b *testing.B) {
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
for b.Loop() {
|
||||
hashKey, _ := p.buildCacheKeyHash(ctx)
|
||||
_ = hashKey
|
||||
}
|
||||
@ -458,7 +458,7 @@ func BenchmarkBuildCacheKeyHash(b *testing.B) {
|
||||
|
||||
b.Run("buildCacheKeyHashValue_direct", func(b *testing.B) {
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
for b.Loop() {
|
||||
hashKey := p.buildCacheKeyHashValue(ctx)
|
||||
_ = hashKey
|
||||
}
|
||||
@ -494,7 +494,7 @@ func BenchmarkProxyObjectPoolGetRelease(b *testing.B) {
|
||||
b.Run("UpstreamTiming_Pooled", func(b *testing.B) {
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
for b.Loop() {
|
||||
timing := NewUpstreamTiming()
|
||||
timing.MarkConnectStart()
|
||||
time.Sleep(time.Microsecond)
|
||||
@ -515,7 +515,7 @@ func BenchmarkProxyObjectPoolGetRelease(b *testing.B) {
|
||||
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
for b.Loop() {
|
||||
vc := variable.NewContext(ctx)
|
||||
vc.Set("key", "value")
|
||||
_ = vc.Expand("$key")
|
||||
@ -584,7 +584,7 @@ func BenchmarkProxyZeroAllocPath(b *testing.B) {
|
||||
b.Run("ZeroAlloc_buildCacheKeyHashValue", func(b *testing.B) {
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
for b.Loop() {
|
||||
hash := p.buildCacheKeyHashValue(ctx)
|
||||
_ = hash
|
||||
}
|
||||
@ -593,7 +593,7 @@ func BenchmarkProxyZeroAllocPath(b *testing.B) {
|
||||
b.Run("WithAlloc_buildCacheKeyHash", func(b *testing.B) {
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
for b.Loop() {
|
||||
hash, key := p.buildCacheKeyHash(ctx)
|
||||
_ = hash
|
||||
_ = key
|
||||
@ -612,7 +612,7 @@ func BenchmarkProxyZeroAllocPath(b *testing.B) {
|
||||
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
for b.Loop() {
|
||||
fh := ExtractForwardedHeaders(ctx)
|
||||
_ = fh
|
||||
}
|
||||
|
||||
@ -34,7 +34,7 @@ func BenchmarkWebSocketHandshake(b *testing.B) {
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
for b.Loop() {
|
||||
ctx := &fasthttp.RequestCtx{}
|
||||
ctx.Request.SetRequestURI("/ws?token=abc123&channel=default")
|
||||
ctx.Request.Header.SetHost("client.example.com")
|
||||
@ -305,7 +305,7 @@ func BenchmarkWebSocketConcurrent(b *testing.B) {
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
for b.Loop() {
|
||||
client1, client2 := net.Pipe()
|
||||
target1, target2 := net.Pipe()
|
||||
|
||||
@ -370,7 +370,7 @@ func BenchmarkWebSocketWriteUpgradeResponse(b *testing.B) {
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
for b.Loop() {
|
||||
conn1, conn2 := net.Pipe()
|
||||
|
||||
done := make(chan error, 1)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user