perf(http3,compression): 对象池化和流式压缩优化
http3: - Adapter 添加 ctxPool 复用 RequestCtx - resetContext 方法重置状态避免污染 compression: - 大响应(>64KB)使用 SetBodyStreamWriter 流式压缩 - 消除 compressed buffer 分配,降低内存峰值 - 新增 streamGzip/streamBrotli 流式方法 - 添加流式压缩测试验证功能正确性 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
6dd651af5f
commit
9cbc380de5
@ -30,6 +30,9 @@ const (
|
||||
// 由于 quic-go 使用标准库的 http.Handler 接口,
|
||||
// 而 lolly 使用 fasthttp,需要通过适配层进行转换。
|
||||
type Adapter struct {
|
||||
// ctxPool 用于复用 fasthttp.RequestCtx 对象
|
||||
ctxPool sync.Pool
|
||||
|
||||
// bufferPool 用于复用字节缓冲区(流式处理优化)
|
||||
bufferPool sync.Pool
|
||||
}
|
||||
@ -37,6 +40,11 @@ type Adapter struct {
|
||||
// NewAdapter 创建新的适配器。
|
||||
func NewAdapter() *Adapter {
|
||||
return &Adapter{
|
||||
ctxPool: sync.Pool{
|
||||
New: func() interface{} {
|
||||
return &fasthttp.RequestCtx{}
|
||||
},
|
||||
},
|
||||
bufferPool: sync.Pool{
|
||||
New: func() interface{} {
|
||||
buf := make([]byte, 4096) // 4KB 初始缓冲区
|
||||
@ -58,11 +66,16 @@ func NewAdapter() *Adapter {
|
||||
// - http.Handler: 标准库兼容的 HTTP 处理器
|
||||
func (a *Adapter) Wrap(handler fasthttp.RequestHandler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
// 直接创建 RequestCtx
|
||||
ctx := &fasthttp.RequestCtx{}
|
||||
// 从池中获取 RequestCtx
|
||||
ctx, ok := a.ctxPool.Get().(*fasthttp.RequestCtx)
|
||||
if !ok {
|
||||
// 如果类型断言失败,创建新的上下文(不应该发生,但为了安全)
|
||||
ctx = &fasthttp.RequestCtx{}
|
||||
}
|
||||
defer a.ctxPool.Put(ctx)
|
||||
|
||||
// 初始化 ctx(fasthttp 的 RequestCtx 需要 Init 方法)
|
||||
ctx.Init(&fasthttp.Request{}, nil, nil)
|
||||
// 重置 ctx 状态以避免污染
|
||||
a.resetContext(ctx)
|
||||
|
||||
// 转换请求
|
||||
a.convertRequest(r, ctx)
|
||||
@ -78,6 +91,18 @@ func (a *Adapter) Wrap(handler fasthttp.RequestHandler) http.Handler {
|
||||
})
|
||||
}
|
||||
|
||||
// resetContext 重置 fasthttp.RequestCtx 状态。
|
||||
//
|
||||
// 参数:
|
||||
// - ctx: 需要重置的上下文
|
||||
func (a *Adapter) resetContext(ctx *fasthttp.RequestCtx) {
|
||||
// 清空请求头
|
||||
ctx.Request.Header.DisableNormalizing()
|
||||
ctx.Request.Reset()
|
||||
ctx.Response.Reset()
|
||||
ctx.SetUserValueBytes(nil, nil)
|
||||
}
|
||||
|
||||
// convertRequest 将 net/http.Request 转换为 fasthttp.RequestCtx。
|
||||
//
|
||||
// 参数:
|
||||
|
||||
@ -18,6 +18,7 @@
|
||||
package compression
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"strings"
|
||||
"sync"
|
||||
@ -28,6 +29,11 @@ import (
|
||||
"rua.plus/lolly/internal/config"
|
||||
)
|
||||
|
||||
// streamingThreshold 流式压缩阈值。
|
||||
// 响应体超过此大小时使用 SetBodyStreamWriter 流式压缩,
|
||||
// 消除 compressed buffer 分配,降低内存峰值。
|
||||
const streamingThreshold = 64 * 1024 // 64KB
|
||||
|
||||
// Algorithm 压缩算法类型。
|
||||
type Algorithm int
|
||||
|
||||
@ -202,21 +208,35 @@ func (m *Middleware) Process(next fasthttp.RequestHandler) fasthttp.RequestHandl
|
||||
}
|
||||
|
||||
// 执行压缩
|
||||
var compressed []byte
|
||||
var encoding string
|
||||
|
||||
if useBrotli {
|
||||
compressed = m.compressBrotli(body)
|
||||
encoding = "br"
|
||||
} else if useGzip {
|
||||
compressed = m.compressGzip(body)
|
||||
encoding = compressionGZIP
|
||||
}
|
||||
|
||||
if len(compressed) > 0 && len(compressed) < bodyLen {
|
||||
ctx.Response.SetBody(compressed)
|
||||
ctx.Response.Header.Set("Content-Encoding", encoding)
|
||||
ctx.Response.Header.Del("Content-Length") // 让 fasthttp 自动计算
|
||||
if bodyLen > streamingThreshold {
|
||||
// 大响应:流式压缩,消除 compressed buffer 分配
|
||||
if useBrotli {
|
||||
m.streamBrotli(ctx, encoding)
|
||||
} else if useGzip {
|
||||
m.streamGzip(ctx, encoding)
|
||||
}
|
||||
} else {
|
||||
// 小响应:缓冲压缩
|
||||
var compressed []byte
|
||||
|
||||
if useBrotli {
|
||||
compressed = m.compressBrotli(body)
|
||||
} else if useGzip {
|
||||
compressed = m.compressGzip(body)
|
||||
}
|
||||
|
||||
if len(compressed) > 0 && len(compressed) < bodyLen {
|
||||
ctx.Response.SetBody(compressed)
|
||||
ctx.Response.Header.Set("Content-Encoding", encoding)
|
||||
ctx.Response.Header.Del("Content-Length")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -322,3 +342,63 @@ func (m *Middleware) Level() int {
|
||||
func (m *Middleware) MinSize() int {
|
||||
return m.minSize
|
||||
}
|
||||
|
||||
// streamGzip 使用 gzip 流式压缩。
|
||||
//
|
||||
// 通过 SetBodyStreamWriter 将压缩数据直接写入响应流,
|
||||
// 消除 compressed buffer 分配,降低内存峰值。
|
||||
//
|
||||
// 参数:
|
||||
// - ctx: fasthttp 请求上下文
|
||||
// - encoding: Content-Encoding 值("gzip")
|
||||
func (m *Middleware) streamGzip(ctx *fasthttp.RequestCtx, encoding string) {
|
||||
ctx.Response.Header.Set("Content-Encoding", encoding)
|
||||
ctx.Response.Header.Del("Content-Length") // 使用 chunked encoding
|
||||
|
||||
body := ctx.Response.Body()
|
||||
ctx.SetBodyStreamWriter(func(w *bufio.Writer) {
|
||||
writer, ok := m.gzipPool.Get().(*gzip.Writer)
|
||||
if !ok {
|
||||
// pool 获取失败,直接写原始 body
|
||||
_, _ = w.Write(body)
|
||||
_ = w.Flush()
|
||||
return
|
||||
}
|
||||
defer m.gzipPool.Put(writer)
|
||||
|
||||
writer.Reset(w)
|
||||
_, _ = writer.Write(body)
|
||||
_ = writer.Close()
|
||||
_ = w.Flush()
|
||||
})
|
||||
}
|
||||
|
||||
// streamBrotli 使用 brotli 流式压缩。
|
||||
//
|
||||
// 通过 SetBodyStreamWriter 将压缩数据直接写入响应流,
|
||||
// 消除 compressed buffer 分配,降低内存峰值。
|
||||
//
|
||||
// 参数:
|
||||
// - ctx: fasthttp 请求上下文
|
||||
// - encoding: Content-Encoding 值("br")
|
||||
func (m *Middleware) streamBrotli(ctx *fasthttp.RequestCtx, encoding string) {
|
||||
ctx.Response.Header.Set("Content-Encoding", encoding)
|
||||
ctx.Response.Header.Del("Content-Length") // 使用 chunked encoding
|
||||
|
||||
body := ctx.Response.Body()
|
||||
ctx.SetBodyStreamWriter(func(w *bufio.Writer) {
|
||||
writer, ok := m.brotliPool.Get().(*brotli.Writer)
|
||||
if !ok {
|
||||
// pool 获取失败,直接写原始 body
|
||||
_, _ = w.Write(body)
|
||||
_ = w.Flush()
|
||||
return
|
||||
}
|
||||
defer m.brotliPool.Put(writer)
|
||||
|
||||
writer.Reset(w)
|
||||
_, _ = writer.Write(body)
|
||||
_ = writer.Close()
|
||||
_ = w.Flush()
|
||||
})
|
||||
}
|
||||
|
||||
@ -12,8 +12,11 @@ package compression
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"io"
|
||||
"testing"
|
||||
|
||||
"github.com/andybalholm/brotli"
|
||||
"github.com/klauspost/compress/gzip"
|
||||
"github.com/valyala/fasthttp"
|
||||
"rua.plus/lolly/internal/config"
|
||||
)
|
||||
@ -328,3 +331,147 @@ func TestGetters(t *testing.T) {
|
||||
t.Errorf("Expected 1 type, got %d", len(m.Types()))
|
||||
}
|
||||
}
|
||||
|
||||
func TestProcessStreamingGzip(t *testing.T) {
|
||||
m, _ := New(&config.CompressionConfig{
|
||||
Type: "gzip",
|
||||
Level: 6,
|
||||
MinSize: 10,
|
||||
Types: []string{"text/html"},
|
||||
})
|
||||
|
||||
// 创建大于 streamingThreshold 的响应
|
||||
largeResponse := bytes.Repeat([]byte("Hello World! This is streaming test data. "), 2000) // ~80KB
|
||||
|
||||
nextHandler := func(ctx *fasthttp.RequestCtx) {
|
||||
ctx.Response.Header.Set("Content-Type", "text/html")
|
||||
_, _ = ctx.Write(largeResponse)
|
||||
}
|
||||
|
||||
handler := m.Process(nextHandler)
|
||||
|
||||
ctx := &fasthttp.RequestCtx{}
|
||||
ctx.Request.Header.Set("Accept-Encoding", "gzip")
|
||||
|
||||
handler(ctx)
|
||||
|
||||
encoding := ctx.Response.Header.Peek("Content-Encoding")
|
||||
if string(encoding) != "gzip" {
|
||||
t.Errorf("Expected Content-Encoding 'gzip', got %s", encoding)
|
||||
}
|
||||
|
||||
// Content-Length 应该被移除(使用 chunked encoding)
|
||||
contentLength := ctx.Response.Header.Peek("Content-Length")
|
||||
if string(contentLength) != "" {
|
||||
t.Errorf("Expected no Content-Length for streaming, got %s", contentLength)
|
||||
}
|
||||
|
||||
// 读取 body 并解压验证
|
||||
body := ctx.Response.Body()
|
||||
if len(body) == 0 {
|
||||
t.Fatal("Expected non-empty body")
|
||||
}
|
||||
|
||||
// 解压验证
|
||||
gr, err := gzip.NewReader(bytes.NewReader(body))
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create gzip reader: %v", err)
|
||||
}
|
||||
defer gr.Close()
|
||||
|
||||
decompressed, err := io.ReadAll(gr)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to decompress: %v", err)
|
||||
}
|
||||
|
||||
if !bytes.Equal(decompressed, largeResponse) {
|
||||
t.Errorf("Decompressed body does not match original")
|
||||
}
|
||||
}
|
||||
|
||||
func TestProcessStreamingBrotli(t *testing.T) {
|
||||
m, _ := New(&config.CompressionConfig{
|
||||
Type: "brotli",
|
||||
Level: 4,
|
||||
MinSize: 10,
|
||||
Types: []string{"text/html"},
|
||||
})
|
||||
|
||||
// 创建大于 streamingThreshold 的响应
|
||||
largeResponse := bytes.Repeat([]byte("Hello World! This is brotli streaming test data. "), 2000) // ~100KB
|
||||
|
||||
nextHandler := func(ctx *fasthttp.RequestCtx) {
|
||||
ctx.Response.Header.Set("Content-Type", "text/html")
|
||||
_, _ = ctx.Write(largeResponse)
|
||||
}
|
||||
|
||||
handler := m.Process(nextHandler)
|
||||
|
||||
ctx := &fasthttp.RequestCtx{}
|
||||
ctx.Request.Header.Set("Accept-Encoding", "br")
|
||||
|
||||
handler(ctx)
|
||||
|
||||
encoding := ctx.Response.Header.Peek("Content-Encoding")
|
||||
if string(encoding) != "br" {
|
||||
t.Errorf("Expected Content-Encoding 'br', got %s", encoding)
|
||||
}
|
||||
|
||||
// Content-Length 应该被移除
|
||||
contentLength := ctx.Response.Header.Peek("Content-Length")
|
||||
if string(contentLength) != "" {
|
||||
t.Errorf("Expected no Content-Length for streaming, got %s", contentLength)
|
||||
}
|
||||
|
||||
// 读取 body 并解压验证
|
||||
body := ctx.Response.Body()
|
||||
if len(body) == 0 {
|
||||
t.Fatal("Expected non-empty body")
|
||||
}
|
||||
|
||||
// 解压验证
|
||||
br := brotli.NewReader(bytes.NewReader(body))
|
||||
decompressed, err := io.ReadAll(br)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to decompress: %v", err)
|
||||
}
|
||||
|
||||
if !bytes.Equal(decompressed, largeResponse) {
|
||||
t.Errorf("Decompressed body does not match original")
|
||||
}
|
||||
}
|
||||
|
||||
func TestProcessSmallResponseBuffered(t *testing.T) {
|
||||
m, _ := New(&config.CompressionConfig{
|
||||
Type: "gzip",
|
||||
Level: 6,
|
||||
MinSize: 10,
|
||||
Types: []string{"text/html"},
|
||||
})
|
||||
|
||||
// 创建小于 streamingThreshold 但大于 MinSize 的响应
|
||||
smallResponse := bytes.Repeat([]byte("Hello World! "), 100) // ~1.3KB
|
||||
|
||||
nextHandler := func(ctx *fasthttp.RequestCtx) {
|
||||
ctx.Response.Header.Set("Content-Type", "text/html")
|
||||
_, _ = ctx.Write(smallResponse)
|
||||
}
|
||||
|
||||
handler := m.Process(nextHandler)
|
||||
|
||||
ctx := &fasthttp.RequestCtx{}
|
||||
ctx.Request.Header.Set("Accept-Encoding", "gzip")
|
||||
|
||||
handler(ctx)
|
||||
|
||||
encoding := ctx.Response.Header.Peek("Content-Encoding")
|
||||
if string(encoding) != "gzip" {
|
||||
t.Errorf("Expected Content-Encoding 'gzip', got %s", encoding)
|
||||
}
|
||||
|
||||
// 小响应应该被压缩且 body 更小
|
||||
body := ctx.Response.Body()
|
||||
if len(body) >= len(smallResponse) {
|
||||
t.Errorf("Expected compressed body smaller than original, got %d >= %d", len(body), len(smallResponse))
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user