From 9cbc380de5845d0bf1270622b3145d7c6b233b40 Mon Sep 17 00:00:00 2001 From: xfy Date: Thu, 16 Apr 2026 11:09:26 +0800 Subject: [PATCH] =?UTF-8?q?perf(http3,compression):=20=E5=AF=B9=E8=B1=A1?= =?UTF-8?q?=E6=B1=A0=E5=8C=96=E5=92=8C=E6=B5=81=E5=BC=8F=E5=8E=8B=E7=BC=A9?= =?UTF-8?q?=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit http3: - Adapter 添加 ctxPool 复用 RequestCtx - resetContext 方法重置状态避免污染 compression: - 大响应(>64KB)使用 SetBodyStreamWriter 流式压缩 - 消除 compressed buffer 分配,降低内存峰值 - 新增 streamGzip/streamBrotli 流式方法 - 添加流式压缩测试验证功能正确性 Co-Authored-By: Claude Opus 4.6 --- internal/http3/adapter.go | 33 +++- .../middleware/compression/compression.go | 96 +++++++++++- .../compression/compression_test.go | 147 ++++++++++++++++++ 3 files changed, 264 insertions(+), 12 deletions(-) diff --git a/internal/http3/adapter.go b/internal/http3/adapter.go index 37ca832..86869ed 100644 --- a/internal/http3/adapter.go +++ b/internal/http3/adapter.go @@ -30,6 +30,9 @@ const ( // 由于 quic-go 使用标准库的 http.Handler 接口, // 而 lolly 使用 fasthttp,需要通过适配层进行转换。 type Adapter struct { + // ctxPool 用于复用 fasthttp.RequestCtx 对象 + ctxPool sync.Pool + // bufferPool 用于复用字节缓冲区(流式处理优化) bufferPool sync.Pool } @@ -37,6 +40,11 @@ type Adapter struct { // NewAdapter 创建新的适配器。 func NewAdapter() *Adapter { return &Adapter{ + ctxPool: sync.Pool{ + New: func() interface{} { + return &fasthttp.RequestCtx{} + }, + }, bufferPool: sync.Pool{ New: func() interface{} { buf := make([]byte, 4096) // 4KB 初始缓冲区 @@ -58,11 +66,16 @@ func NewAdapter() *Adapter { // - http.Handler: 标准库兼容的 HTTP 处理器 func (a *Adapter) Wrap(handler fasthttp.RequestHandler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - // 直接创建 RequestCtx - ctx := &fasthttp.RequestCtx{} + // 从池中获取 RequestCtx + ctx, ok := a.ctxPool.Get().(*fasthttp.RequestCtx) + if !ok { + // 如果类型断言失败,创建新的上下文(不应该发生,但为了安全) + ctx = &fasthttp.RequestCtx{} + } + defer a.ctxPool.Put(ctx) - // 初始化 ctx(fasthttp 的 RequestCtx 需要 Init 方法) - ctx.Init(&fasthttp.Request{}, nil, nil) + // 重置 ctx 状态以避免污染 + a.resetContext(ctx) // 转换请求 a.convertRequest(r, ctx) @@ -78,6 +91,18 @@ func (a *Adapter) Wrap(handler fasthttp.RequestHandler) http.Handler { }) } +// resetContext 重置 fasthttp.RequestCtx 状态。 +// +// 参数: +// - ctx: 需要重置的上下文 +func (a *Adapter) resetContext(ctx *fasthttp.RequestCtx) { + // 清空请求头 + ctx.Request.Header.DisableNormalizing() + ctx.Request.Reset() + ctx.Response.Reset() + ctx.SetUserValueBytes(nil, nil) +} + // convertRequest 将 net/http.Request 转换为 fasthttp.RequestCtx。 // // 参数: diff --git a/internal/middleware/compression/compression.go b/internal/middleware/compression/compression.go index 804543a..a52b26a 100644 --- a/internal/middleware/compression/compression.go +++ b/internal/middleware/compression/compression.go @@ -18,6 +18,7 @@ package compression import ( + "bufio" "bytes" "strings" "sync" @@ -28,6 +29,11 @@ import ( "rua.plus/lolly/internal/config" ) +// streamingThreshold 流式压缩阈值。 +// 响应体超过此大小时使用 SetBodyStreamWriter 流式压缩, +// 消除 compressed buffer 分配,降低内存峰值。 +const streamingThreshold = 64 * 1024 // 64KB + // Algorithm 压缩算法类型。 type Algorithm int @@ -202,21 +208,35 @@ func (m *Middleware) Process(next fasthttp.RequestHandler) fasthttp.RequestHandl } // 执行压缩 - var compressed []byte var encoding string - if useBrotli { - compressed = m.compressBrotli(body) encoding = "br" } else if useGzip { - compressed = m.compressGzip(body) encoding = compressionGZIP } - if len(compressed) > 0 && len(compressed) < bodyLen { - ctx.Response.SetBody(compressed) - ctx.Response.Header.Set("Content-Encoding", encoding) - ctx.Response.Header.Del("Content-Length") // 让 fasthttp 自动计算 + if bodyLen > streamingThreshold { + // 大响应:流式压缩,消除 compressed buffer 分配 + if useBrotli { + m.streamBrotli(ctx, encoding) + } else if useGzip { + m.streamGzip(ctx, encoding) + } + } else { + // 小响应:缓冲压缩 + var compressed []byte + + if useBrotli { + compressed = m.compressBrotli(body) + } else if useGzip { + compressed = m.compressGzip(body) + } + + if len(compressed) > 0 && len(compressed) < bodyLen { + ctx.Response.SetBody(compressed) + ctx.Response.Header.Set("Content-Encoding", encoding) + ctx.Response.Header.Del("Content-Length") + } } } } @@ -322,3 +342,63 @@ func (m *Middleware) Level() int { func (m *Middleware) MinSize() int { return m.minSize } + +// streamGzip 使用 gzip 流式压缩。 +// +// 通过 SetBodyStreamWriter 将压缩数据直接写入响应流, +// 消除 compressed buffer 分配,降低内存峰值。 +// +// 参数: +// - ctx: fasthttp 请求上下文 +// - encoding: Content-Encoding 值("gzip") +func (m *Middleware) streamGzip(ctx *fasthttp.RequestCtx, encoding string) { + ctx.Response.Header.Set("Content-Encoding", encoding) + ctx.Response.Header.Del("Content-Length") // 使用 chunked encoding + + body := ctx.Response.Body() + ctx.SetBodyStreamWriter(func(w *bufio.Writer) { + writer, ok := m.gzipPool.Get().(*gzip.Writer) + if !ok { + // pool 获取失败,直接写原始 body + _, _ = w.Write(body) + _ = w.Flush() + return + } + defer m.gzipPool.Put(writer) + + writer.Reset(w) + _, _ = writer.Write(body) + _ = writer.Close() + _ = w.Flush() + }) +} + +// streamBrotli 使用 brotli 流式压缩。 +// +// 通过 SetBodyStreamWriter 将压缩数据直接写入响应流, +// 消除 compressed buffer 分配,降低内存峰值。 +// +// 参数: +// - ctx: fasthttp 请求上下文 +// - encoding: Content-Encoding 值("br") +func (m *Middleware) streamBrotli(ctx *fasthttp.RequestCtx, encoding string) { + ctx.Response.Header.Set("Content-Encoding", encoding) + ctx.Response.Header.Del("Content-Length") // 使用 chunked encoding + + body := ctx.Response.Body() + ctx.SetBodyStreamWriter(func(w *bufio.Writer) { + writer, ok := m.brotliPool.Get().(*brotli.Writer) + if !ok { + // pool 获取失败,直接写原始 body + _, _ = w.Write(body) + _ = w.Flush() + return + } + defer m.brotliPool.Put(writer) + + writer.Reset(w) + _, _ = writer.Write(body) + _ = writer.Close() + _ = w.Flush() + }) +} diff --git a/internal/middleware/compression/compression_test.go b/internal/middleware/compression/compression_test.go index 200d909..4f623ce 100644 --- a/internal/middleware/compression/compression_test.go +++ b/internal/middleware/compression/compression_test.go @@ -12,8 +12,11 @@ package compression import ( "bytes" + "io" "testing" + "github.com/andybalholm/brotli" + "github.com/klauspost/compress/gzip" "github.com/valyala/fasthttp" "rua.plus/lolly/internal/config" ) @@ -328,3 +331,147 @@ func TestGetters(t *testing.T) { t.Errorf("Expected 1 type, got %d", len(m.Types())) } } + +func TestProcessStreamingGzip(t *testing.T) { + m, _ := New(&config.CompressionConfig{ + Type: "gzip", + Level: 6, + MinSize: 10, + Types: []string{"text/html"}, + }) + + // 创建大于 streamingThreshold 的响应 + largeResponse := bytes.Repeat([]byte("Hello World! This is streaming test data. "), 2000) // ~80KB + + nextHandler := func(ctx *fasthttp.RequestCtx) { + ctx.Response.Header.Set("Content-Type", "text/html") + _, _ = ctx.Write(largeResponse) + } + + handler := m.Process(nextHandler) + + ctx := &fasthttp.RequestCtx{} + ctx.Request.Header.Set("Accept-Encoding", "gzip") + + handler(ctx) + + encoding := ctx.Response.Header.Peek("Content-Encoding") + if string(encoding) != "gzip" { + t.Errorf("Expected Content-Encoding 'gzip', got %s", encoding) + } + + // Content-Length 应该被移除(使用 chunked encoding) + contentLength := ctx.Response.Header.Peek("Content-Length") + if string(contentLength) != "" { + t.Errorf("Expected no Content-Length for streaming, got %s", contentLength) + } + + // 读取 body 并解压验证 + body := ctx.Response.Body() + if len(body) == 0 { + t.Fatal("Expected non-empty body") + } + + // 解压验证 + gr, err := gzip.NewReader(bytes.NewReader(body)) + if err != nil { + t.Fatalf("Failed to create gzip reader: %v", err) + } + defer gr.Close() + + decompressed, err := io.ReadAll(gr) + if err != nil { + t.Fatalf("Failed to decompress: %v", err) + } + + if !bytes.Equal(decompressed, largeResponse) { + t.Errorf("Decompressed body does not match original") + } +} + +func TestProcessStreamingBrotli(t *testing.T) { + m, _ := New(&config.CompressionConfig{ + Type: "brotli", + Level: 4, + MinSize: 10, + Types: []string{"text/html"}, + }) + + // 创建大于 streamingThreshold 的响应 + largeResponse := bytes.Repeat([]byte("Hello World! This is brotli streaming test data. "), 2000) // ~100KB + + nextHandler := func(ctx *fasthttp.RequestCtx) { + ctx.Response.Header.Set("Content-Type", "text/html") + _, _ = ctx.Write(largeResponse) + } + + handler := m.Process(nextHandler) + + ctx := &fasthttp.RequestCtx{} + ctx.Request.Header.Set("Accept-Encoding", "br") + + handler(ctx) + + encoding := ctx.Response.Header.Peek("Content-Encoding") + if string(encoding) != "br" { + t.Errorf("Expected Content-Encoding 'br', got %s", encoding) + } + + // Content-Length 应该被移除 + contentLength := ctx.Response.Header.Peek("Content-Length") + if string(contentLength) != "" { + t.Errorf("Expected no Content-Length for streaming, got %s", contentLength) + } + + // 读取 body 并解压验证 + body := ctx.Response.Body() + if len(body) == 0 { + t.Fatal("Expected non-empty body") + } + + // 解压验证 + br := brotli.NewReader(bytes.NewReader(body)) + decompressed, err := io.ReadAll(br) + if err != nil { + t.Fatalf("Failed to decompress: %v", err) + } + + if !bytes.Equal(decompressed, largeResponse) { + t.Errorf("Decompressed body does not match original") + } +} + +func TestProcessSmallResponseBuffered(t *testing.T) { + m, _ := New(&config.CompressionConfig{ + Type: "gzip", + Level: 6, + MinSize: 10, + Types: []string{"text/html"}, + }) + + // 创建小于 streamingThreshold 但大于 MinSize 的响应 + smallResponse := bytes.Repeat([]byte("Hello World! "), 100) // ~1.3KB + + nextHandler := func(ctx *fasthttp.RequestCtx) { + ctx.Response.Header.Set("Content-Type", "text/html") + _, _ = ctx.Write(smallResponse) + } + + handler := m.Process(nextHandler) + + ctx := &fasthttp.RequestCtx{} + ctx.Request.Header.Set("Accept-Encoding", "gzip") + + handler(ctx) + + encoding := ctx.Response.Header.Peek("Content-Encoding") + if string(encoding) != "gzip" { + t.Errorf("Expected Content-Encoding 'gzip', got %s", encoding) + } + + // 小响应应该被压缩且 body 更小 + body := ctx.Response.Body() + if len(body) >= len(smallResponse) { + t.Errorf("Expected compressed body smaller than original, got %d >= %d", len(body), len(smallResponse)) + } +}