feat(variable,proxy): 新增上游变量支持,集成到代理请求处理
- 新增 upstream_addr、upstream_status、upstream_response_time 等变量 - 新增 UpstreamTiming 结构体捕获连接、首字节、响应时间 - Proxy.ServeHTTP 集成变量上下文,记录上游时间 - 新增测试覆盖上游变量和计时功能 Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
parent
b72394eb3b
commit
61455412eb
@ -211,6 +211,80 @@ func createHostClient(targetURL string, timeout config.ProxyTimeout, transportCf
|
|||||||
return client
|
return client
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// UpstreamTiming 上游时间记录,用于捕获各种时间戳
|
||||||
|
type UpstreamTiming struct {
|
||||||
|
start time.Time
|
||||||
|
connectStart time.Time
|
||||||
|
connectEnd time.Time
|
||||||
|
headerReceived time.Time
|
||||||
|
responseEnd time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewUpstreamTiming 创建新的上游时间记录器
|
||||||
|
func NewUpstreamTiming() *UpstreamTiming {
|
||||||
|
return &UpstreamTiming{
|
||||||
|
start: time.Now(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// MarkConnectStart 标记连接开始
|
||||||
|
func (t *UpstreamTiming) MarkConnectStart() {
|
||||||
|
t.connectStart = time.Now()
|
||||||
|
}
|
||||||
|
|
||||||
|
// MarkConnectEnd 标记连接完成
|
||||||
|
func (t *UpstreamTiming) MarkConnectEnd() {
|
||||||
|
t.connectEnd = time.Now()
|
||||||
|
}
|
||||||
|
|
||||||
|
// MarkHeaderReceived 标记接收到响应头
|
||||||
|
func (t *UpstreamTiming) MarkHeaderReceived() {
|
||||||
|
t.headerReceived = time.Now()
|
||||||
|
}
|
||||||
|
|
||||||
|
// MarkResponseEnd 标记响应完成
|
||||||
|
func (t *UpstreamTiming) MarkResponseEnd() {
|
||||||
|
t.responseEnd = time.Now()
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetConnectTime 获取连接时间(秒)
|
||||||
|
func (t *UpstreamTiming) GetConnectTime() float64 {
|
||||||
|
if t.connectStart.IsZero() || t.connectEnd.IsZero() {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
return t.connectEnd.Sub(t.connectStart).Seconds()
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetHeaderTime 获取首字节时间(秒)
|
||||||
|
func (t *UpstreamTiming) GetHeaderTime() float64 {
|
||||||
|
if t.connectEnd.IsZero() || t.headerReceived.IsZero() {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
return t.headerReceived.Sub(t.connectEnd).Seconds()
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetResponseTime 获取响应时间(秒)
|
||||||
|
func (t *UpstreamTiming) GetResponseTime() float64 {
|
||||||
|
if t.connectEnd.IsZero() || t.responseEnd.IsZero() {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
return t.responseEnd.Sub(t.connectEnd).Seconds()
|
||||||
|
}
|
||||||
|
|
||||||
|
// FinalizeUpstreamVars 在请求处理结束时设置上游变量到 VariableContext
|
||||||
|
// 这个函数应该在 ServeHTTP 的 defer 中调用
|
||||||
|
func FinalizeUpstreamVars(vc *variable.VariableContext, upstreamAddr string, upstreamStatus int, timing *UpstreamTiming) {
|
||||||
|
if vc == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
connectTime := timing.GetConnectTime()
|
||||||
|
headerTime := timing.GetHeaderTime()
|
||||||
|
responseTime := timing.GetResponseTime()
|
||||||
|
|
||||||
|
vc.SetUpstreamVars(upstreamAddr, upstreamStatus, responseTime, connectTime, headerTime)
|
||||||
|
}
|
||||||
|
|
||||||
// ServeHTTP 通过将传入的 HTTP 请求转发到选定的后端目标来处理请求。
|
// ServeHTTP 通过将传入的 HTTP 请求转发到选定的后端目标来处理请求。
|
||||||
// 实现了 fasthttp 请求处理器接口。
|
// 实现了 fasthttp 请求处理器接口。
|
||||||
//
|
//
|
||||||
@ -223,6 +297,24 @@ func createHostClient(targetURL string, timeout config.ProxyTimeout, transportCf
|
|||||||
// 如果没有可用的健康目标,返回 502 Bad Gateway。
|
// 如果没有可用的健康目标,返回 502 Bad Gateway。
|
||||||
// 如果后端请求失败,根据 next_upstream 配置尝试下一个目标。
|
// 如果后端请求失败,根据 next_upstream 配置尝试下一个目标。
|
||||||
func (p *Proxy) ServeHTTP(ctx *fasthttp.RequestCtx) {
|
func (p *Proxy) ServeHTTP(ctx *fasthttp.RequestCtx) {
|
||||||
|
// 上游变量捕获
|
||||||
|
var upstreamAddr string
|
||||||
|
var upstreamStatus int
|
||||||
|
timing := NewUpstreamTiming()
|
||||||
|
|
||||||
|
// 创建变量上下文用于设置上游变量
|
||||||
|
vc := variable.NewVariableContext(ctx)
|
||||||
|
defer func() {
|
||||||
|
// 确保记录了响应结束时间
|
||||||
|
if timing.responseEnd.IsZero() {
|
||||||
|
timing.MarkResponseEnd()
|
||||||
|
}
|
||||||
|
// 设置上游变量
|
||||||
|
FinalizeUpstreamVars(vc, upstreamAddr, upstreamStatus, timing)
|
||||||
|
// 释放变量上下文
|
||||||
|
variable.ReleaseVariableContext(vc)
|
||||||
|
}()
|
||||||
|
|
||||||
// 故障转移配置
|
// 故障转移配置
|
||||||
maxTries := p.config.NextUpstream.Tries
|
maxTries := p.config.NextUpstream.Tries
|
||||||
if maxTries <= 0 {
|
if maxTries <= 0 {
|
||||||
@ -250,6 +342,9 @@ func (p *Proxy) ServeHTTP(ctx *fasthttp.RequestCtx) {
|
|||||||
|
|
||||||
if target == nil {
|
if target == nil {
|
||||||
if attempt == 0 {
|
if attempt == 0 {
|
||||||
|
// 没有可用后端
|
||||||
|
upstreamAddr = "FAILED"
|
||||||
|
upstreamStatus = 502
|
||||||
ctx.Error("Bad Gateway: no healthy upstream", fasthttp.StatusBadGateway)
|
ctx.Error("Bad Gateway: no healthy upstream", fasthttp.StatusBadGateway)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -272,11 +367,23 @@ func (p *Proxy) ServeHTTP(ctx *fasthttp.RequestCtx) {
|
|||||||
// 增加连接计数(用于最少连接数负载均衡)
|
// 增加连接计数(用于最少连接数负载均衡)
|
||||||
loadbalance.IncrementConnections(target)
|
loadbalance.IncrementConnections(target)
|
||||||
|
|
||||||
|
// 设置上游地址
|
||||||
|
upstreamAddr = target.URL
|
||||||
|
|
||||||
// 检查是否为 WebSocket 升级请求
|
// 检查是否为 WebSocket 升级请求
|
||||||
if isWebSocketRequest(ctx) {
|
if isWebSocketRequest(ctx) {
|
||||||
// WebSocket 使用 defer 确保连接计数释放
|
// WebSocket 使用 defer 确保连接计数释放
|
||||||
defer loadbalance.DecrementConnections(target)
|
defer loadbalance.DecrementConnections(target)
|
||||||
p.handleWebSocket(ctx, target, client)
|
timing.MarkConnectStart()
|
||||||
|
err := ProxyWebSocket(ctx, target, p.config.Timeout.Connect)
|
||||||
|
timing.MarkConnectEnd()
|
||||||
|
if err != nil {
|
||||||
|
upstreamStatus = 502
|
||||||
|
logging.Error().Msgf("WebSocket proxy error: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// WebSocket 成功
|
||||||
|
upstreamStatus = 101
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -294,11 +401,15 @@ func (p *Proxy) ServeHTTP(ctx *fasthttp.RequestCtx) {
|
|||||||
loadbalance.DecrementConnections(target)
|
loadbalance.DecrementConnections(target)
|
||||||
if !stale {
|
if !stale {
|
||||||
// 新鲜缓存,直接返回
|
// 新鲜缓存,直接返回
|
||||||
|
upstreamAddr = "CACHE"
|
||||||
|
upstreamStatus = entry.Status
|
||||||
p.writeCachedResponse(ctx, entry)
|
p.writeCachedResponse(ctx, entry)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// 过期缓存,尝试后台刷新,同时返回旧数据
|
// 过期缓存,尝试后台刷新,同时返回旧数据
|
||||||
go p.backgroundRefresh(ctx, target, cacheKey)
|
go p.backgroundRefresh(ctx, target, cacheKey)
|
||||||
|
upstreamAddr = "CACHE"
|
||||||
|
upstreamStatus = entry.Status
|
||||||
p.writeCachedResponse(ctx, entry)
|
p.writeCachedResponse(ctx, entry)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -310,6 +421,8 @@ func (p *Proxy) ServeHTTP(ctx *fasthttp.RequestCtx) {
|
|||||||
<-done
|
<-done
|
||||||
// 重新尝试获取缓存
|
// 重新尝试获取缓存
|
||||||
if entry, ok, _ := p.cache.Get(cacheKey); ok {
|
if entry, ok, _ := p.cache.Get(cacheKey); ok {
|
||||||
|
upstreamAddr = "CACHE"
|
||||||
|
upstreamStatus = entry.Status
|
||||||
p.writeCachedResponse(ctx, entry)
|
p.writeCachedResponse(ctx, entry)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -319,7 +432,9 @@ func (p *Proxy) ServeHTTP(ctx *fasthttp.RequestCtx) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 执行代理请求
|
// 执行代理请求
|
||||||
|
timing.MarkConnectStart()
|
||||||
err := client.Do(req, &ctx.Response)
|
err := client.Do(req, &ctx.Response)
|
||||||
|
timing.MarkConnectEnd()
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
loadbalance.DecrementConnections(target)
|
loadbalance.DecrementConnections(target)
|
||||||
@ -334,16 +449,28 @@ func (p *Proxy) ServeHTTP(ctx *fasthttp.RequestCtx) {
|
|||||||
p.cache.ReleaseLock(p.buildCacheKey(ctx), err)
|
p.cache.ReleaseLock(p.buildCacheKey(ctx), err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 设置失败状态
|
||||||
|
if errors.Is(err, fasthttp.ErrTimeout) {
|
||||||
|
upstreamStatus = 504
|
||||||
|
} else {
|
||||||
|
upstreamStatus = 502
|
||||||
|
}
|
||||||
|
|
||||||
lastErr = err
|
lastErr = err
|
||||||
// 继续尝试下一个目标
|
// 继续尝试下一个目标
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 记录首字节时间
|
||||||
|
timing.MarkHeaderReceived()
|
||||||
|
|
||||||
// 请求成功,减少连接计数
|
// 请求成功,减少连接计数
|
||||||
loadbalance.DecrementConnections(target)
|
loadbalance.DecrementConnections(target)
|
||||||
|
|
||||||
// 检查响应状态码是否需要重试
|
// 检查响应状态码是否需要重试
|
||||||
statusCode := ctx.Response.StatusCode()
|
statusCode := ctx.Response.StatusCode()
|
||||||
|
upstreamStatus = statusCode
|
||||||
|
|
||||||
shouldRetry := false
|
shouldRetry := false
|
||||||
for _, code := range httpCodes {
|
for _, code := range httpCodes {
|
||||||
if statusCode == code {
|
if statusCode == code {
|
||||||
@ -396,13 +523,18 @@ func (p *Proxy) ServeHTTP(ctx *fasthttp.RequestCtx) {
|
|||||||
if lastErr != nil {
|
if lastErr != nil {
|
||||||
// 处理不同类型的错误
|
// 处理不同类型的错误
|
||||||
if errors.Is(lastErr, fasthttp.ErrTimeout) {
|
if errors.Is(lastErr, fasthttp.ErrTimeout) {
|
||||||
|
upstreamStatus = 504
|
||||||
ctx.Error("Gateway Timeout", fasthttp.StatusGatewayTimeout)
|
ctx.Error("Gateway Timeout", fasthttp.StatusGatewayTimeout)
|
||||||
} else if errors.Is(lastErr, fasthttp.ErrConnectionClosed) {
|
} else if errors.Is(lastErr, fasthttp.ErrConnectionClosed) {
|
||||||
|
upstreamStatus = 502
|
||||||
ctx.Error("Bad Gateway: upstream connection closed", fasthttp.StatusBadGateway)
|
ctx.Error("Bad Gateway: upstream connection closed", fasthttp.StatusBadGateway)
|
||||||
} else {
|
} else {
|
||||||
|
upstreamStatus = 502
|
||||||
ctx.Error("Bad Gateway", fasthttp.StatusBadGateway)
|
ctx.Error("Bad Gateway", fasthttp.StatusBadGateway)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
upstreamAddr = "FAILED"
|
||||||
|
upstreamStatus = 502
|
||||||
ctx.Error("Bad Gateway: all upstreams failed", fasthttp.StatusBadGateway)
|
ctx.Error("Bad Gateway: all upstreams failed", fasthttp.StatusBadGateway)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -572,8 +704,9 @@ func isWebSocketRequest(ctx *fasthttp.RequestCtx) bool {
|
|||||||
return strings.EqualFold(string(upgrade), "websocket")
|
return strings.EqualFold(string(upgrade), "websocket")
|
||||||
}
|
}
|
||||||
|
|
||||||
// handleWebSocket 处理 WebSocket 升级请求。
|
// handleWebSocket 处理 WebSocket 升级请求(保留用于兼容性,实际逻辑在 ServeHTTP 中)
|
||||||
func (p *Proxy) handleWebSocket(ctx *fasthttp.RequestCtx, target *loadbalance.Target, client *fasthttp.HostClient) {
|
// nolint:unused // 保留用于未来 WebSocket 功能扩展
|
||||||
|
func (p *Proxy) handleWebSocket(ctx *fasthttp.RequestCtx, target *loadbalance.Target, _ *fasthttp.HostClient) {
|
||||||
timeout := p.config.Timeout.Connect
|
timeout := p.config.Timeout.Connect
|
||||||
if timeout == 0 {
|
if timeout == 0 {
|
||||||
timeout = 30 * time.Second
|
timeout = 30 * time.Second
|
||||||
|
|||||||
@ -19,6 +19,7 @@
|
|||||||
package proxy
|
package proxy
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"net"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -28,6 +29,7 @@ import (
|
|||||||
"rua.plus/lolly/internal/config"
|
"rua.plus/lolly/internal/config"
|
||||||
"rua.plus/lolly/internal/loadbalance"
|
"rua.plus/lolly/internal/loadbalance"
|
||||||
"rua.plus/lolly/internal/netutil"
|
"rua.plus/lolly/internal/netutil"
|
||||||
|
"rua.plus/lolly/internal/variable"
|
||||||
)
|
)
|
||||||
|
|
||||||
// TestNewProxy 测试 NewProxy 函数
|
// TestNewProxy 测试 NewProxy 函数
|
||||||
@ -1130,3 +1132,207 @@ func containsAt(s, substr string, start int) bool {
|
|||||||
}
|
}
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestUpstreamVariablesCapture 测试上游变量捕获
|
||||||
|
func TestUpstreamVariablesCapture(t *testing.T) {
|
||||||
|
// 创建后端服务器
|
||||||
|
backend := &fasthttp.Server{
|
||||||
|
Handler: func(ctx *fasthttp.RequestCtx) {
|
||||||
|
ctx.SetStatusCode(200)
|
||||||
|
ctx.SetBodyString("OK")
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
// 在随机端口启动后端
|
||||||
|
backendLn, err := net.Listen("tcp", "127.0.0.1:0")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to create listener: %v", err)
|
||||||
|
}
|
||||||
|
defer func() { _ = backendLn.Close() }()
|
||||||
|
|
||||||
|
go func() { _ = backend.Serve(backendLn) }()
|
||||||
|
|
||||||
|
// 等待后端启动
|
||||||
|
time.Sleep(50 * time.Millisecond)
|
||||||
|
|
||||||
|
backendAddr := "http://" + backendLn.Addr().String()
|
||||||
|
|
||||||
|
// 创建代理
|
||||||
|
targets := []*loadbalance.Target{
|
||||||
|
{URL: backendAddr, Weight: 1},
|
||||||
|
}
|
||||||
|
targets[0].Healthy.Store(true)
|
||||||
|
|
||||||
|
cfg := &config.ProxyConfig{
|
||||||
|
Path: "/",
|
||||||
|
LoadBalance: "round_robin",
|
||||||
|
Timeout: config.ProxyTimeout{
|
||||||
|
Connect: 5 * time.Second,
|
||||||
|
Read: 30 * time.Second,
|
||||||
|
Write: 30 * time.Second,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
p, err := NewProxy(cfg, targets, nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to create proxy: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 创建请求
|
||||||
|
ctx := &fasthttp.RequestCtx{}
|
||||||
|
ctx.Request.Header.SetMethod("GET")
|
||||||
|
ctx.Request.Header.SetRequestURI("/test")
|
||||||
|
ctx.Request.Header.SetHost("example.com")
|
||||||
|
|
||||||
|
// 执行代理请求
|
||||||
|
p.ServeHTTP(ctx)
|
||||||
|
|
||||||
|
// 验证响应
|
||||||
|
if ctx.Response.StatusCode() != 200 {
|
||||||
|
t.Errorf("expected status 200, got %d", ctx.Response.StatusCode())
|
||||||
|
}
|
||||||
|
|
||||||
|
// 测试 UpstreamTiming
|
||||||
|
timing := NewUpstreamTiming()
|
||||||
|
if timing == nil {
|
||||||
|
t.Error("NewUpstreamTiming() returned nil")
|
||||||
|
}
|
||||||
|
|
||||||
|
// 测试时间标记
|
||||||
|
timing.MarkConnectStart()
|
||||||
|
timing.MarkConnectEnd()
|
||||||
|
timing.MarkHeaderReceived()
|
||||||
|
timing.MarkResponseEnd()
|
||||||
|
|
||||||
|
// 验证时间计算
|
||||||
|
if timing.GetConnectTime() < 0 {
|
||||||
|
t.Error("GetConnectTime() should be >= 0")
|
||||||
|
}
|
||||||
|
if timing.GetHeaderTime() < 0 {
|
||||||
|
t.Error("GetHeaderTime() should be >= 0")
|
||||||
|
}
|
||||||
|
if timing.GetResponseTime() < 0 {
|
||||||
|
t.Error("GetResponseTime() should be >= 0")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestUpstreamVariablesErrorPaths 测试上游变量错误路径
|
||||||
|
func TestUpstreamVariablesErrorPaths(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
backendAddr string
|
||||||
|
expectedAddr string
|
||||||
|
expectedCode int
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "no healthy backend",
|
||||||
|
backendAddr: "",
|
||||||
|
expectedAddr: "FAILED",
|
||||||
|
expectedCode: 502,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
var targets []*loadbalance.Target
|
||||||
|
if tt.backendAddr != "" {
|
||||||
|
targets = []*loadbalance.Target{
|
||||||
|
{URL: tt.backendAddr, Weight: 1},
|
||||||
|
}
|
||||||
|
targets[0].Healthy.Store(true)
|
||||||
|
} else {
|
||||||
|
// 创建一个不健康目标
|
||||||
|
targets = []*loadbalance.Target{
|
||||||
|
{URL: "http://127.0.0.1:1", Weight: 1},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
cfg := &config.ProxyConfig{
|
||||||
|
Path: "/",
|
||||||
|
LoadBalance: "round_robin",
|
||||||
|
Timeout: config.ProxyTimeout{
|
||||||
|
Connect: 1 * time.Millisecond, // 超短超时
|
||||||
|
Read: 1 * time.Millisecond,
|
||||||
|
Write: 1 * time.Millisecond,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
p, err := NewProxy(cfg, targets, nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to create proxy: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx := &fasthttp.RequestCtx{}
|
||||||
|
ctx.Request.Header.SetMethod("GET")
|
||||||
|
ctx.Request.Header.SetRequestURI("/test")
|
||||||
|
ctx.Request.Header.SetHost("example.com")
|
||||||
|
|
||||||
|
p.ServeHTTP(ctx)
|
||||||
|
|
||||||
|
// 验证错误状态码
|
||||||
|
if ctx.Response.StatusCode() != tt.expectedCode &&
|
||||||
|
ctx.Response.StatusCode() != 502 &&
|
||||||
|
ctx.Response.StatusCode() != 504 {
|
||||||
|
t.Errorf("expected status %d or 502/504, got %d", tt.expectedCode, ctx.Response.StatusCode())
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestFinalizeUpstreamVars 测试 FinalizeUpstreamVars 函数
|
||||||
|
func TestFinalizeUpstreamVars(t *testing.T) {
|
||||||
|
ctx := &fasthttp.RequestCtx{}
|
||||||
|
ctx.Request.Header.SetMethod("GET")
|
||||||
|
ctx.Request.Header.SetRequestURI("/test")
|
||||||
|
|
||||||
|
vc := variable.NewVariableContext(ctx)
|
||||||
|
defer variable.ReleaseVariableContext(vc)
|
||||||
|
|
||||||
|
timing := NewUpstreamTiming()
|
||||||
|
timing.MarkConnectStart()
|
||||||
|
time.Sleep(1 * time.Millisecond)
|
||||||
|
timing.MarkConnectEnd()
|
||||||
|
timing.MarkHeaderReceived()
|
||||||
|
time.Sleep(1 * time.Millisecond)
|
||||||
|
timing.MarkResponseEnd()
|
||||||
|
|
||||||
|
// 测试 FinalizeUpstreamVars
|
||||||
|
FinalizeUpstreamVars(vc, "http://backend:8080", 200, timing)
|
||||||
|
|
||||||
|
// 验证变量已设置
|
||||||
|
addr, ok := vc.Get("upstream_addr")
|
||||||
|
if !ok || addr != "http://backend:8080" {
|
||||||
|
t.Errorf("upstream_addr = %q, want 'http://backend:8080'", addr)
|
||||||
|
}
|
||||||
|
|
||||||
|
status, ok := vc.Get("upstream_status")
|
||||||
|
if !ok || status != "200" {
|
||||||
|
t.Errorf("upstream_status = %q, want '200'", status)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 测试 nil vc
|
||||||
|
FinalizeUpstreamVars(nil, "http://backend:8080", 200, timing)
|
||||||
|
// 不应该 panic
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestUpstreamTimingZero 测试 UpstreamTiming 零值处理
|
||||||
|
func TestUpstreamTimingZero(t *testing.T) {
|
||||||
|
timing := NewUpstreamTiming()
|
||||||
|
|
||||||
|
// 未标记时应该返回 0
|
||||||
|
if timing.GetConnectTime() != 0 {
|
||||||
|
t.Errorf("GetConnectTime() = %v, want 0", timing.GetConnectTime())
|
||||||
|
}
|
||||||
|
if timing.GetHeaderTime() != 0 {
|
||||||
|
t.Errorf("GetHeaderTime() = %v, want 0", timing.GetHeaderTime())
|
||||||
|
}
|
||||||
|
if timing.GetResponseTime() != 0 {
|
||||||
|
t.Errorf("GetResponseTime() = %v, want 0", timing.GetResponseTime())
|
||||||
|
}
|
||||||
|
|
||||||
|
// 只标记开始
|
||||||
|
timing.MarkConnectStart()
|
||||||
|
if timing.GetConnectTime() != 0 {
|
||||||
|
t.Errorf("GetConnectTime() after MarkConnectStart = %v, want 0", timing.GetConnectTime())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@ -31,6 +31,12 @@ const (
|
|||||||
VarTimeLocal = "time_local"
|
VarTimeLocal = "time_local"
|
||||||
VarTimeISO8601 = "time_iso8601"
|
VarTimeISO8601 = "time_iso8601"
|
||||||
VarRequestID = "request_id"
|
VarRequestID = "request_id"
|
||||||
|
// 上游变量
|
||||||
|
VarUpstreamAddr = "upstream_addr"
|
||||||
|
VarUpstreamStatus = "upstream_status"
|
||||||
|
VarUpstreamResponseTime = "upstream_response_time"
|
||||||
|
VarUpstreamConnectTime = "upstream_connect_time"
|
||||||
|
VarUpstreamHeaderTime = "upstream_header_time"
|
||||||
)
|
)
|
||||||
|
|
||||||
// init 注册所有内置变量
|
// init 注册所有内置变量
|
||||||
|
|||||||
@ -49,6 +49,12 @@ type VariableContext struct {
|
|||||||
bodySize int64 // 响应体大小(由外部设置)
|
bodySize int64 // 响应体大小(由外部设置)
|
||||||
duration int64 // 请求处理时间纳秒(由外部设置)
|
duration int64 // 请求处理时间纳秒(由外部设置)
|
||||||
serverName string // 服务器名称
|
serverName string // 服务器名称
|
||||||
|
// 上游变量
|
||||||
|
upstreamAddr string // 上游服务器地址
|
||||||
|
upstreamStatus int // 上游响应状态码
|
||||||
|
upstreamResponseTime float64 // 上游响应时间(秒)
|
||||||
|
upstreamConnectTime float64 // 上游连接时间(秒)
|
||||||
|
upstreamHeaderTime float64 // 上游首字节时间(秒)
|
||||||
}
|
}
|
||||||
|
|
||||||
// pool 用于复用 VariableContext
|
// pool 用于复用 VariableContext
|
||||||
@ -82,6 +88,11 @@ func NewVariableContext(ctx *fasthttp.RequestCtx) *VariableContext {
|
|||||||
vc.bodySize = 0
|
vc.bodySize = 0
|
||||||
vc.duration = 0
|
vc.duration = 0
|
||||||
vc.serverName = ""
|
vc.serverName = ""
|
||||||
|
vc.upstreamAddr = ""
|
||||||
|
vc.upstreamStatus = 0
|
||||||
|
vc.upstreamResponseTime = 0
|
||||||
|
vc.upstreamConnectTime = 0
|
||||||
|
vc.upstreamHeaderTime = 0
|
||||||
// 清空缓存
|
// 清空缓存
|
||||||
for k := range vc.cache {
|
for k := range vc.cache {
|
||||||
delete(vc.cache, k)
|
delete(vc.cache, k)
|
||||||
@ -103,6 +114,11 @@ func ReleaseVariableContext(vc *VariableContext) {
|
|||||||
vc.bodySize = 0
|
vc.bodySize = 0
|
||||||
vc.duration = 0
|
vc.duration = 0
|
||||||
vc.serverName = ""
|
vc.serverName = ""
|
||||||
|
vc.upstreamAddr = ""
|
||||||
|
vc.upstreamStatus = 0
|
||||||
|
vc.upstreamResponseTime = 0
|
||||||
|
vc.upstreamConnectTime = 0
|
||||||
|
vc.upstreamHeaderTime = 0
|
||||||
pool.Put(vc)
|
pool.Put(vc)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -118,6 +134,15 @@ func (vc *VariableContext) SetServerName(name string) {
|
|||||||
vc.serverName = name
|
vc.serverName = name
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetUpstreamVars 设置上游变量
|
||||||
|
func (vc *VariableContext) SetUpstreamVars(addr string, status int, responseTime, connectTime, headerTime float64) {
|
||||||
|
vc.upstreamAddr = addr
|
||||||
|
vc.upstreamStatus = status
|
||||||
|
vc.upstreamResponseTime = responseTime
|
||||||
|
vc.upstreamConnectTime = connectTime
|
||||||
|
vc.upstreamHeaderTime = headerTime
|
||||||
|
}
|
||||||
|
|
||||||
// Get 获取变量值(优先自定义变量,再查内置变量)
|
// Get 获取变量值(优先自定义变量,再查内置变量)
|
||||||
func (vc *VariableContext) Get(name string) (string, bool) {
|
func (vc *VariableContext) Get(name string) (string, bool) {
|
||||||
// 1. 先查自定义变量
|
// 1. 先查自定义变量
|
||||||
@ -164,6 +189,32 @@ func (vc *VariableContext) Get(name string) (string, bool) {
|
|||||||
if vc.serverName != "" {
|
if vc.serverName != "" {
|
||||||
return vc.serverName, true
|
return vc.serverName, true
|
||||||
}
|
}
|
||||||
|
// 上游变量
|
||||||
|
case VarUpstreamAddr:
|
||||||
|
if vc.upstreamAddr != "" {
|
||||||
|
return vc.upstreamAddr, true
|
||||||
|
}
|
||||||
|
return "-", true
|
||||||
|
case VarUpstreamStatus:
|
||||||
|
if vc.upstreamStatus > 0 {
|
||||||
|
return strconv.Itoa(vc.upstreamStatus), true
|
||||||
|
}
|
||||||
|
return "-", true
|
||||||
|
case VarUpstreamResponseTime:
|
||||||
|
if vc.upstreamResponseTime > 0 {
|
||||||
|
return strconv.FormatFloat(vc.upstreamResponseTime, 'f', 3, 64), true
|
||||||
|
}
|
||||||
|
return "-", true
|
||||||
|
case VarUpstreamConnectTime:
|
||||||
|
if vc.upstreamConnectTime > 0 {
|
||||||
|
return strconv.FormatFloat(vc.upstreamConnectTime, 'f', 3, 64), true
|
||||||
|
}
|
||||||
|
return "-", true
|
||||||
|
case VarUpstreamHeaderTime:
|
||||||
|
if vc.upstreamHeaderTime > 0 {
|
||||||
|
return strconv.FormatFloat(vc.upstreamHeaderTime, 'f', 3, 64), true
|
||||||
|
}
|
||||||
|
return "-", true
|
||||||
}
|
}
|
||||||
|
|
||||||
// 3. 查内置变量缓存
|
// 3. 查内置变量缓存
|
||||||
|
|||||||
@ -831,3 +831,207 @@ func TestBuiltinVarNames(t *testing.T) {
|
|||||||
t.Error("BuiltinVarNames() missing 'remote_addr'")
|
t.Error("BuiltinVarNames() missing 'remote_addr'")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestUpstreamVariables 测试上游变量
|
||||||
|
func TestUpstreamVariables(t *testing.T) {
|
||||||
|
ctx := mockRequestCtx(t)
|
||||||
|
vc := NewVariableContext(ctx)
|
||||||
|
defer ReleaseVariableContext(vc)
|
||||||
|
|
||||||
|
// 未设置时应该返回默认值 "-"
|
||||||
|
tests := []struct {
|
||||||
|
varName string
|
||||||
|
expected string
|
||||||
|
}{
|
||||||
|
{VarUpstreamAddr, "-"},
|
||||||
|
{VarUpstreamStatus, "-"},
|
||||||
|
{VarUpstreamResponseTime, "-"},
|
||||||
|
{VarUpstreamConnectTime, "-"},
|
||||||
|
{VarUpstreamHeaderTime, "-"},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.varName+"_default", func(t *testing.T) {
|
||||||
|
value, ok := vc.Get(tt.varName)
|
||||||
|
if !ok {
|
||||||
|
t.Errorf("expected variable %s to exist", tt.varName)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if value != tt.expected {
|
||||||
|
t.Errorf("%s = %q, want %q", tt.varName, value, tt.expected)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// 设置上游变量
|
||||||
|
vc.SetUpstreamVars("http://backend:8080", 200, 0.123, 0.001, 0.045)
|
||||||
|
|
||||||
|
// 验证设置后的值
|
||||||
|
testsAfter := []struct {
|
||||||
|
varName string
|
||||||
|
expected string
|
||||||
|
}{
|
||||||
|
{VarUpstreamAddr, "http://backend:8080"},
|
||||||
|
{VarUpstreamStatus, "200"},
|
||||||
|
{VarUpstreamResponseTime, "0.123"},
|
||||||
|
{VarUpstreamConnectTime, "0.001"},
|
||||||
|
{VarUpstreamHeaderTime, "0.045"},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range testsAfter {
|
||||||
|
t.Run(tt.varName+"_set", func(t *testing.T) {
|
||||||
|
value, ok := vc.Get(tt.varName)
|
||||||
|
if !ok {
|
||||||
|
t.Errorf("expected variable %s to exist", tt.varName)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if value != tt.expected {
|
||||||
|
t.Errorf("%s = %q, want %q", tt.varName, value, tt.expected)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestUpstreamVariablesInExpand 测试在模板中展开上游变量
|
||||||
|
func TestUpstreamVariablesInExpand(t *testing.T) {
|
||||||
|
ctx := mockRequestCtx(t)
|
||||||
|
vc := NewVariableContext(ctx)
|
||||||
|
defer ReleaseVariableContext(vc)
|
||||||
|
|
||||||
|
// 设置上游变量
|
||||||
|
vc.SetUpstreamVars("http://backend:8080", 200, 0.123, 0.001, 0.045)
|
||||||
|
|
||||||
|
// 测试展开
|
||||||
|
template := "$upstream_addr $upstream_status $upstream_response_time"
|
||||||
|
result := vc.Expand(template)
|
||||||
|
expected := "http://backend:8080 200 0.123"
|
||||||
|
if result != expected {
|
||||||
|
t.Errorf("Expand = %q, want %q", result, expected)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestUpstreamVariablesErrorCases 测试上游变量错误情况
|
||||||
|
func TestUpstreamVariablesErrorCases(t *testing.T) {
|
||||||
|
ctx := mockRequestCtx(t)
|
||||||
|
vc := NewVariableContext(ctx)
|
||||||
|
defer ReleaseVariableContext(vc)
|
||||||
|
|
||||||
|
// 测试各种错误场景
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
addr string
|
||||||
|
status int
|
||||||
|
expected map[string]string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "no backend",
|
||||||
|
addr: "FAILED",
|
||||||
|
status: 502,
|
||||||
|
expected: map[string]string{
|
||||||
|
VarUpstreamAddr: "FAILED",
|
||||||
|
VarUpstreamStatus: "502",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "timeout",
|
||||||
|
addr: "http://backend:8080",
|
||||||
|
status: 504,
|
||||||
|
expected: map[string]string{
|
||||||
|
VarUpstreamAddr: "http://backend:8080",
|
||||||
|
VarUpstreamStatus: "504",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "cache hit",
|
||||||
|
addr: "CACHE",
|
||||||
|
status: 200,
|
||||||
|
expected: map[string]string{
|
||||||
|
VarUpstreamAddr: "CACHE",
|
||||||
|
VarUpstreamStatus: "200",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "websocket success",
|
||||||
|
addr: "ws://backend:8080",
|
||||||
|
status: 101,
|
||||||
|
expected: map[string]string{
|
||||||
|
VarUpstreamAddr: "ws://backend:8080",
|
||||||
|
VarUpstreamStatus: "101",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
vc.SetUpstreamVars(tt.addr, tt.status, 0, 0, 0)
|
||||||
|
|
||||||
|
for varName, expected := range tt.expected {
|
||||||
|
value, ok := vc.Get(varName)
|
||||||
|
if !ok {
|
||||||
|
t.Errorf("expected variable %s to exist", varName)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if value != expected {
|
||||||
|
t.Errorf("%s = %q, want %q", varName, value, expected)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestUpstreamVariablesZeroValues 测试上游变量零值处理
|
||||||
|
func TestUpstreamVariablesZeroValues(t *testing.T) {
|
||||||
|
ctx := mockRequestCtx(t)
|
||||||
|
vc := NewVariableContext(ctx)
|
||||||
|
defer ReleaseVariableContext(vc)
|
||||||
|
|
||||||
|
// 测试零值应该返回 "-"
|
||||||
|
vc.SetUpstreamVars("", 0, 0, 0, 0)
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
varName string
|
||||||
|
expected string
|
||||||
|
}{
|
||||||
|
{VarUpstreamAddr, "-"},
|
||||||
|
{VarUpstreamStatus, "-"},
|
||||||
|
{VarUpstreamResponseTime, "-"},
|
||||||
|
{VarUpstreamConnectTime, "-"},
|
||||||
|
{VarUpstreamHeaderTime, "-"},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.varName, func(t *testing.T) {
|
||||||
|
value, ok := vc.Get(tt.varName)
|
||||||
|
if !ok {
|
||||||
|
t.Errorf("expected variable %s to exist", tt.varName)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if value != tt.expected {
|
||||||
|
t.Errorf("%s = %q, want %q", tt.varName, value, tt.expected)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// BenchmarkUpstreamVariables 基准测试:上游变量
|
||||||
|
func BenchmarkUpstreamVariables(b *testing.B) {
|
||||||
|
ctx := &fasthttp.RequestCtx{}
|
||||||
|
ctx.Request.Header.SetHost("example.com")
|
||||||
|
ctx.Request.Header.SetMethod("GET")
|
||||||
|
ctx.Request.Header.SetRequestURI("/test")
|
||||||
|
|
||||||
|
vc := NewVariableContext(ctx)
|
||||||
|
defer ReleaseVariableContext(vc)
|
||||||
|
|
||||||
|
// 设置上游变量
|
||||||
|
vc.SetUpstreamVars("http://backend:8080", 200, 0.123, 0.001, 0.045)
|
||||||
|
|
||||||
|
b.ResetTimer()
|
||||||
|
b.ReportAllocs()
|
||||||
|
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
_, _ = vc.Get(VarUpstreamAddr)
|
||||||
|
_, _ = vc.Get(VarUpstreamStatus)
|
||||||
|
_, _ = vc.Get(VarUpstreamResponseTime)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user