feat(benchmark): 新增分层性能回归检测策略

- PR 趋势监控使用宽松阈值,仅警告不阻塞合并
- 定期完整检测使用严格阈值,生成统计报告
- 新增阈值配置文件支持分环境配置
- 回归检测脚本支持 YAML 配置和环境参数
- 新增方差分析脚本用于推导阈值
This commit is contained in:
xfy 2026-04-08 18:25:22 +08:00
parent 8e27ac0f77
commit f46b0dee07
4 changed files with 893 additions and 50 deletions

129
.benchmark-thresholds.yaml Normal file
View File

@ -0,0 +1,129 @@
# 性能回归阈值配置
#
# 阈值推导方法论:
# 1. 运行基准测试 50 次获取样本
# 2. 计算每个测试的变异系数 (CV = stdev / mean * 100)
# 3. threshold_warning = 2 * CV
# 4. threshold_block = 3 * CV
#
# 注意: 以下为示例值,实际值需从 Phase 0 数据推导
# 分环境阈值配置
environments:
# 本地稳定环境 - 严格阈值
local:
description: "本地稳定环境delta <5% 可达"
thresholds:
# 默认阈值
default:
warning: -5.0 # 性能下降 5% 警告
block: -12.0 # 性能下降 12% 阻塞
# 变量展开 - 高频操作,严格阈值
variable:
warning: -3.0
block: -8.0
# 缓存操作 - 高频操作
cache:
warning: -3.0
block: -8.0
# 负载均衡
loadbalance:
warning: -5.0
block: -12.0
# 代理转发
proxy:
warning: -5.0
block: -12.0
# 压缩 - 受数据特征影响较大
compression:
warning: -8.0
block: -15.0
# 限流
ratelimit:
warning: -5.0
block: -12.0
# 滑动窗口
sliding_window:
warning: -8.0
block: -15.0
# 静态文件服务
static:
warning: -5.0
block: -12.0
# DNS 解析 - 网络波动影响大
resolver:
warning: -10.0
block: -20.0
# CI 共享 runner 环境 - 宽松阈值
ci:
description: "CI 共享 runner波动较大delta <20% 作为警告阈值"
thresholds:
# 默认阈值 - CI 环境波动大
default:
warning: -15.0
block: -25.0
# 核心模块相对更稳定
variable:
warning: -10.0
block: -20.0
cache:
warning: -10.0
block: -20.0
loadbalance:
warning: -15.0
block: -25.0
proxy:
warning: -15.0
block: -25.0
# 压缩受数据特征影响
compression:
warning: -20.0
block: -30.0
ratelimit:
warning: -15.0
block: -25.0
sliding_window:
warning: -20.0
block: -30.0
static:
warning: -15.0
block: -25.0
# DNS 解析在 CI 中波动更大
resolver:
warning: -25.0
block: -35.0
# 全局配置
global:
# 最小样本数
min_samples: 10
# P 值阈值(统计显著性)
p_value_threshold: 0.05
# 内存分配增长阈值(独立于时间阈值)
memory_warning: 10.0 # 内存分配增长 10% 警告
memory_block: 25.0 # 内存分配增长 25% 阻塞
# 忽略列表(不检测回归的测试)
ignore:
# - BenchmarkSomeFlakyTest

View File

@ -1,27 +1,109 @@
# Benchmark CI Workflow
# 自动化运行 Go 基准测试并进行性能回归检测
# 分层策略:
# - PR 趋势监控: 宽松阈值,不阻塞合并
# - 定期完整检测: 严格阈值,生成报告
#
# 作者: xfy
name: Benchmark
on:
push:
branches: [main, master]
branches: [master]
pull_request:
branches: [main, master]
branches: [master]
schedule:
# 每周一凌晨 2 点运行完整检测
- cron: '0 2 * * 1'
workflow_dispatch:
inputs:
full:
description: 'Run full benchmark suite'
required: false
default: 'false'
env:
GO_VERSION: '1.23'
BENCH_COUNT: 10
jobs:
benchmark:
name: Run Benchmarks
# PR 趋势监控 - 宽松阈值,仅警告
benchmark-pr:
name: PR Benchmark
if: github.event_name == 'pull_request'
runs-on: ubuntu-latest
timeout-minutes: 15
steps:
- name: Checkout code
uses: actions/checkout@v4
with:
fetch-depth: 0 # 需要完整历史进行基准线对比
fetch-depth: 0
- name: Setup Go
uses: actions/setup-go@v5
with:
go-version: ${{ env.GO_VERSION }}
- name: Run core benchmarks
id: bench
run: |
# 运行核心模块基准测试
go test -bench='Benchmark(Variable|Compression|RateLimiter|SlidingWindow|AccessLog|Static|Cache|Proxy|LoadBalance)' \
-benchmem -count=${{ env.BENCH_COUNT }} -timeout=10m ./... 2>&1 | tee benchmark-pr.txt
# 统计测试数量
echo "test_count=$(grep -c 'ns/op' benchmark-pr.txt || echo 0)" >> $GITHUB_OUTPUT
- name: Check regression (warning only)
run: |
# 宽松阈值 ±20%,仅作警告
python3 scripts/check_regression.py \
--warning-threshold 20 \
--block-threshold 30 \
benchmark-pr.txt || \
echo "::warning::Potential performance change detected (±20% threshold)"
- name: Comment on PR
uses: actions/github-script@v7
with:
script: |
const fs = require('fs');
const output = fs.readFileSync('benchmark-pr.txt', 'utf8');
const lines = output.split('\n').filter(l => l.includes('ns/op')).slice(0, 25);
const body = `## 📊 Benchmark Results
\`\`\`
${lines.join('\n')}
\`\`\`
- Tests run: ${{ steps.bench.outputs.test_count }}
- Threshold: ±20% (warning only)
`;
github.rest.issues.createComment({
owner: context.repo.owner,
repo: context.repo.repo,
issue_number: context.issue.number,
body: body
});
- name: Upload results
uses: actions/upload-artifact@v4
with:
name: benchmark-pr-results
path: benchmark-pr.txt
retention-days: 7
# 定期完整检测 - 严格阈值
benchmark-weekly:
name: Weekly Full Benchmark
if: github.event_name == 'schedule' || github.event_name == 'workflow_dispatch'
runs-on: ubuntu-latest
timeout-minutes: 30
steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Setup Go
uses: actions/setup-go@v5
@ -31,59 +113,64 @@ jobs:
- name: Install benchstat
run: go install golang.org/x/perf/cmd/benchstat@latest
- name: Run benchmarks (current)
- name: Run full benchmarks
id: bench
run: |
go test -bench=. -benchmem -count=${{ env.BENCH_COUNT }} ./... > benchmark-current.txt
cat benchmark-current.txt
echo "Running full benchmark suite..."
go test -bench=. -benchmem -count=20 -timeout=25m ./... 2>&1 | tee benchmark-full.txt
- name: Upload current benchmark results
# 生成统计报告
benchstat benchmark-full.txt > benchmark-stat.txt || true
echo "test_count=$(grep -c 'ns/op' benchmark-full.txt || echo 0)" >> $GITHUB_OUTPUT
- name: Check regression with config
run: |
if [ -f .benchmark-thresholds.yaml ]; then
python3 scripts/check_regression.py \
--config .benchmark-thresholds.yaml \
--environment ci \
benchmark-full.txt || true
else
python3 scripts/check_regression.py \
--warning-threshold 15 \
--block-threshold 25 \
benchmark-full.txt || true
fi
- name: Upload baseline
uses: actions/upload-artifact@v4
with:
name: benchmark-current
path: benchmark-current.txt
name: benchmark-baseline
path: benchmark-full.txt
retention-days: 30
- name: Checkout main branch (for comparison)
if: github.ref != 'refs/heads/main' && github.ref != 'refs/heads/master'
run: |
git stash
git checkout main || git checkout master || echo "No main/master branch"
git stash pop || true
- name: Run benchmarks (baseline)
if: github.ref != 'refs/heads/main' && github.ref != 'refs/heads/master'
run: |
go test -bench=. -benchmem -count=${{ env.BENCH_COUNT }} ./... > benchmark-baseline.txt || echo "Baseline failed" > benchmark-baseline.txt
- name: Compare benchmarks
if: github.ref != 'refs/heads/main' && github.ref != 'refs/heads/master'
run: |
if [ -f benchmark-baseline.txt ] && [ -s benchmark-baseline.txt ]; then
benchstat benchmark-baseline.txt benchmark-current.txt > benchmark-comparison.txt
cat benchmark-comparison.txt
else
echo "No baseline for comparison" > benchmark-comparison.txt
fi
- name: Upload comparison results
if: github.ref != 'refs/heads/main' && github.ref != 'refs/heads/master'
- name: Upload report
uses: actions/upload-artifact@v4
with:
name: benchmark-comparison
path: benchmark-comparison.txt
retention-days: 7
name: benchmark-report
path: |
benchmark-stat.txt
retention-days: 90
- name: Check regression
if: github.ref != 'refs/heads/main' && github.ref != 'refs/heads/master'
run: |
if [ -f benchmark-comparison.txt ]; then
python3 scripts/check_regression.py benchmark-comparison.txt || true
fi
- name: Create issue on regression
if: failure()
uses: actions/github-script@v7
with:
script: |
github.rest.issues.create({
owner: context.repo.owner,
repo: context.repo.repo,
title: `🚨 Performance Regression Detected - ${new Date().toISOString().split('T')[0]}`,
body: `Weekly benchmark detected performance regression.\n\nSee [workflow run](${process.env.GITHUB_SERVER_URL}/${context.repo.owner}/${context.repo.repo}/actions/runs/${context.runId})`,
labels: ['performance', 'regression']
});
# 保存基准线
benchmark-save:
name: Save Benchmark Baseline
if: github.ref == 'refs/heads/master'
runs-on: ubuntu-latest
if: github.ref == 'refs/heads/main' || github.ref == 'refs/heads/master'
steps:
- name: Checkout code
uses: actions/checkout@v4

407
scripts/analyze_variance.py Normal file
View File

@ -0,0 +1,407 @@
#!/usr/bin/env python3
"""分析基准测试方差,推导回归阈值。
该脚本用于
1. 解析 benchstat 输出
2. 计算每个测试的方差和阈值建议
3. 支持正态性检验
4. 生成分环境阈值配置
用法:
python scripts/analyze_variance.py benchmark-results.txt
python scripts/analyze_variance.py --format yaml benchmark-results.txt
go test -bench=. -count=50 ./... | tee results.txt | python scripts/analyze_variance.py -
"""
import sys
import re
import statistics
import argparse
from pathlib import Path
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass, field
@dataclass
class BenchmarkResult:
"""单个基准测试的结果。"""
name: str
ns_op_values: List[float] = field(default_factory=list)
b_op_values: List[float] = field(default_factory=list)
allocs_op_values: List[float] = field(default_factory=list)
# 统计量
ns_op_mean: float = 0.0
ns_op_stdev: float = 0.0
b_op_mean: float = 0.0
b_op_stdev: float = 0.0
allocs_op_mean: float = 0.0
allocs_op_stdev: float = 0.0
# 变异系数
ns_op_cv: float = 0.0
b_op_cv: float = 0.0
allocs_op_cv: float = 0.0
# 建议阈值
threshold_warning: float = 0.0
threshold_block: float = 0.0
def parse_benchstat_line(line: str) -> Optional[Tuple[str, float, float, float]]:
"""解析单行 benchstat 输出。
格式示例:
BenchmarkVariableExpand-8 123.4 ± 5% 1024 B/op 32 allocs/op
BenchmarkCacheGet-8 45.67 ± 2% 256 B/op 8 allocs/op
返回: (name, ns_op, b_op, allocs_op) None
"""
# 跳过空行和分隔符
if not line.strip() or line.startswith('name') or line.startswith('---'):
return None
# 匹配基准测试行
# 格式: name ns/op ±% B/op allocs/op
pattern = r'^(\S+)\s+([\d.]+)\s*(?:±\s*([\d.]+)%)?\s+([\d.]+)\s+([\d.]+)'
match = re.match(pattern, line.strip())
if match:
name = match.group(1)
ns_op = float(match.group(2))
b_op = float(match.group(4))
allocs_op = float(match.group(5))
return (name, ns_op, b_op, allocs_op)
return None
def parse_benchstat_output(text: str) -> Dict[str, BenchmarkResult]:
"""解析 benchstat 输出,提取每个测试的统计数据。
Args:
text: benchstat 命令的输出文本
Returns:
字典key 为测试名value BenchmarkResult
"""
results: Dict[str, BenchmarkResult] = {}
for line in text.split('\n'):
parsed = parse_benchstat_line(line)
if parsed:
name, ns_op, b_op, allocs_op = parsed
if name not in results:
results[name] = BenchmarkResult(name=name)
results[name].ns_op_values.append(ns_op)
results[name].b_op_values.append(b_op)
results[name].allocs_op_values.append(allocs_op)
return results
def parse_raw_benchmark_output(text: str) -> Dict[str, BenchmarkResult]:
"""解析原始 go test -bench 输出(非 benchstat 格式)。
格式示例:
BenchmarkVariableExpand-8 1000000 1234 ns/op 1024 B/op 32 allocs/op
Args:
text: go test -bench 命令的原始输出
Returns:
字典key 为测试名value BenchmarkResult
"""
results: Dict[str, BenchmarkResult] = {}
# 匹配基准测试输出行
pattern = r'^(Benchmark\S+)\s+(\d+)\s+([\d.]+)\s+ns/op\s+([\d.]+)\s+B/op\s+([\d.]+)\s+allocs/op'
for line in text.split('\n'):
match = re.match(pattern, line.strip())
if match:
name = match.group(1)
ns_op = float(match.group(3))
b_op = float(match.group(4))
allocs_op = float(match.group(5))
if name not in results:
results[name] = BenchmarkResult(name=name)
results[name].ns_op_values.append(ns_op)
results[name].b_op_values.append(b_op)
results[name].allocs_op_values.append(allocs_op)
return results
def calculate_statistics(results: Dict[str, BenchmarkResult]) -> Dict[str, BenchmarkResult]:
"""计算每个测试的统计量和建议阈值。
阈值推导方法:
threshold_warning = 2 * std_dev / mean * 100 (百分比)
threshold_block = 3 * std_dev / mean * 100
Args:
results: 解析后的基准测试结果
Returns:
更新了统计量的结果字典
"""
for name, result in results.items():
if len(result.ns_op_values) < 2:
continue
# 计算 ns/op 统计量
result.ns_op_mean = statistics.mean(result.ns_op_values)
if len(result.ns_op_values) >= 2:
result.ns_op_stdev = statistics.stdev(result.ns_op_values)
# 计算 B/op 统计量
if result.b_op_values:
result.b_op_mean = statistics.mean(result.b_op_values)
if len(result.b_op_values) >= 2:
result.b_op_stdev = statistics.stdev(result.b_op_values)
# 计算 allocs/op 统计量
if result.allocs_op_values:
result.allocs_op_mean = statistics.mean(result.allocs_op_values)
if len(result.allocs_op_values) >= 2:
result.allocs_op_stdev = statistics.stdev(result.allocs_op_values)
# 计算变异系数 (CV = stdev / mean)
if result.ns_op_mean > 0:
result.ns_op_cv = (result.ns_op_stdev / result.ns_op_mean) * 100
# 建议阈值: warning = 2*CV, block = 3*CV
result.threshold_warning = 2 * result.ns_op_cv
result.threshold_block = 3 * result.ns_op_cv
return results
def check_normality(values: List[float]) -> Tuple[bool, str]:
"""简化的正态性检验。
使用变异系数作为简化的正态性指标
- CV < 10%: 近似正态分布
- CV >= 10%: 可能非正态建议增大样本量
对于严格的正态性检验应使用 Shapiro-Wilk 检验
但那需要 scipy.stats
Args:
values: 样本值列表
Returns:
(is_likely_normal, reason)
"""
if len(values) < 10:
return False, f"样本量不足 ({len(values)} < 10),建议至少 50 次采样"
mean = statistics.mean(values)
if mean == 0:
return False, "均值为零,无法计算变异系数"
stdev = statistics.stdev(values)
cv = (stdev / mean) * 100
if cv < 5:
return True, f"CV={cv:.1f}% < 5%,非常稳定"
elif cv < 10:
return True, f"CV={cv:.1f}% < 10%,近似正态分布"
elif cv < 20:
return True, f"CV={cv:.1f}% < 20%,可接受范围(建议增大样本量)"
else:
return False, f"CV={cv:.1f}% >= 20%,方差过大,检查测试稳定性"
def generate_threshold_config(results: Dict[str, BenchmarkResult],
environment: str = "local") -> str:
"""生成阈值配置文件内容。
Args:
results: 计算过统计量的结果
environment: 环境名称local ci
Returns:
YAML 格式的配置文件内容
"""
lines = [
"# 阈值推导方法论:",
"# 1. 运行基准测试 50 次获取样本",
"# 2. 计算每个测试的变异系数 (CV = stdev / mean * 100)",
"# 3. threshold_warning = 2 * CV",
"# 4. threshold_block = 3 * CV",
"#",
f"# 环境类型: {environment}",
"# 生成时间: 自动生成",
"",
f"environments:",
f" {environment}:",
f" description: \"{'本地稳定环境' if environment == 'local' else 'CI 共享 runner 环境'}\"",
f" thresholds:",
]
# 计算全局默认阈值
all_cvs = [r.ns_op_cv for r in results.values() if r.ns_op_cv > 0]
if all_cvs:
median_cv = statistics.median(all_cvs)
default_warning = round(2 * median_cv, 1)
default_block = round(3 * median_cv, 1)
else:
default_warning = 5.0
default_block = 12.0
lines.append(f" default:")
lines.append(f" warning: -{default_warning}")
lines.append(f" block: -{default_block}")
# 为每个模块生成阈值
module_cvs: Dict[str, List[float]] = {}
for name, result in results.items():
# 提取模块名 (Benchmark<Module>... -> Module)
module_match = re.match(r'Benchmark([A-Z][a-z]+)', name)
if module_match:
module = module_match.group(1).lower()
else:
module = "default"
if module not in module_cvs:
module_cvs[module] = []
if result.ns_op_cv > 0:
module_cvs[module].append(result.ns_op_cv)
for module, cvs in sorted(module_cvs.items()):
if len(cvs) >= 1 and module != "default":
avg_cv = statistics.mean(cvs)
warning = round(2 * avg_cv, 1)
block = round(3 * avg_cv, 1)
lines.append(f" {module}:")
lines.append(f" warning: -{warning}")
lines.append(f" block: -{block}")
return "\n".join(lines)
def print_summary(results: Dict[str, BenchmarkResult]) -> None:
"""打印分析摘要。"""
print("\n" + "=" * 80)
print("基准测试方差分析报告")
print("=" * 80)
print(f"{'测试名称':<45} {'均值(ns)':>12} {'标准差':>10} {'CV%':>8} {'建议阈值':>12}")
print("-" * 80)
# 按 CV 排序
sorted_results = sorted(results.items(),
key=lambda x: x[1].ns_op_cv,
reverse=True)
for name, result in sorted_results:
if result.ns_op_mean > 0:
short_name = name[:44] if len(name) > 44 else name
print(f"{short_name:<45} {result.ns_op_mean:>12.2f} "
f"{result.ns_op_stdev:>10.2f} {result.ns_op_cv:>8.1f} "
f"±{result.threshold_warning:.1f}%/±{result.threshold_block:.1f}%")
print("=" * 80)
# 稳定性摘要
stable = sum(1 for r in results.values() if r.ns_op_cv < 5)
acceptable = sum(1 for r in results.values() if 5 <= r.ns_op_cv < 10)
unstable = sum(1 for r in results.values() if r.ns_op_cv >= 10)
print(f"\n稳定性摘要:")
print(f" 非常稳定 (CV < 5%): {stable} 个测试")
print(f" 稳定 (CV 5-10%): {acceptable} 个测试")
print(f" 不稳定 (CV >= 10%): {unstable} 个测试")
if unstable > 0:
print(f"\n警告: {unstable} 个测试方差过大,建议检查:")
for name, result in sorted_results:
if result.ns_op_cv >= 10:
print(f" - {name} (CV={result.ns_op_cv:.1f}%)")
def main():
parser = argparse.ArgumentParser(
description='分析基准测试方差,推导回归阈值',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
示例:
# 分析 benchstat 输出
python scripts/analyze_variance.py benchmark.txt
# 分析原始 go test 输出
go test -bench=. -count=50 ./... | python scripts/analyze_variance.py -
# 生成 YAML 配置
python scripts/analyze_variance.py --format yaml benchmark.txt
"""
)
parser.add_argument('input', nargs='?', default='-',
help='输入文件路径,- 表示从 stdin 读取')
parser.add_argument('--format', choices=['text', 'yaml', 'json'],
default='text',
help='输出格式 (默认: text)')
parser.add_argument('--environment', choices=['local', 'ci'],
default='local',
help='环境类型 (默认: local)')
parser.add_argument('--output', '-o',
help='输出文件路径 (默认: stdout)')
args = parser.parse_args()
# 读取输入
if args.input == '-':
text = sys.stdin.read()
else:
path = Path(args.input)
if not path.exists():
print(f"错误: 文件不存在: {args.input}", file=sys.stderr)
sys.exit(1)
text = path.read_text()
# 解析输入
# 尝试 benchstat 格式,如果失败则尝试原始格式
results = parse_benchstat_output(text)
if not results:
results = parse_raw_benchmark_output(text)
if not results:
print("错误: 未能解析任何基准测试数据", file=sys.stderr)
sys.exit(1)
# 计算统计量
results = calculate_statistics(results)
# 输出结果
output = ""
if args.format == 'yaml':
output = generate_threshold_config(results, args.environment)
elif args.format == 'json':
import json
output = json.dumps({
name: {
'mean_ns_op': r.ns_op_mean,
'stdev_ns_op': r.ns_op_stdev,
'cv_percent': r.ns_op_cv,
'threshold_warning': r.threshold_warning,
'threshold_block': r.threshold_block,
'mean_b_op': r.b_op_mean,
'mean_allocs_op': r.allocs_op_mean,
}
for name, r in results.items()
}, indent=2)
else:
print_summary(results)
return
# 写入输出
if args.output:
Path(args.output).write_text(output)
print(f"结果已写入: {args.output}")
else:
print(output)
if __name__ == '__main__':
main()

View File

@ -4,19 +4,28 @@
用法:
python check_regression.py <benchstat_output_file>
python check_regression.py --config .benchmark-thresholds.yaml benchmark.txt
python check_regression.py --help
退出码:
0 - 无回归或轻微变化
1 - 检测到 WARNING 级别回归 (-5%)
2 - 检测到 BLOCK 级别回归 (-15%)
1 - 检测到 WARNING 级别回归
2 - 检测到 BLOCK 级别回归
"""
import argparse
import re
import sys
import os
from dataclasses import dataclass
from typing import List, Optional, Tuple
from typing import List, Optional, Tuple, Dict
# 尝试导入 YAML 解析器
try:
import yaml
HAS_YAML = True
except ImportError:
HAS_YAML = False
@dataclass
@ -164,6 +173,200 @@ def check_regressions(results: List[BenchmarkResult]) -> Tuple[int, int, int]:
return ok_count, warning_count, block_count
def extract_module_name(benchmark_name: str) -> str:
"""从基准测试名称提取模块名。
Args:
benchmark_name: 完整的基准测试名称 "BenchmarkCacheGet-8"
Returns:
str: 模块名 "cache"
"""
# 移除 Benchmark 前缀和 -N 后缀
name = benchmark_name
if name.startswith('Benchmark'):
name = name[9:] # 移除 "Benchmark"
# 移除 -N 后缀
if '-' in name:
name = name.split('-')[0]
# 提取模块名(第一个单词的小写形式)
module = ''
for c in name:
if c.isupper() and module:
break
module += c.lower()
# 常见模块名映射
module_map = {
'cache': 'cache',
'proxy': 'proxy',
'loadbalance': 'loadbalance',
'round': 'loadbalance',
'weighted': 'loadbalance',
'consistent': 'loadbalance',
'least': 'loadbalance',
'ip': 'loadbalance',
'variable': 'variable',
'expand': 'variable',
'gzip': 'compression',
'brotli': 'compression',
'compression': 'compression',
'ratelimiter': 'ratelimit',
'rate': 'ratelimit',
'sliding': 'sliding_window',
'accesslog': 'accesslog',
'access': 'accesslog',
'static': 'static',
'resolver': 'resolver',
'dns': 'resolver',
'ssl': 'ssl',
'vhost': 'vhost',
'rewrite': 'rewrite',
'bodylimit': 'bodylimit',
'auth': 'auth',
'headers': 'headers',
}
return module_map.get(module, module or 'default')
def load_threshold_config(config_path: str) -> dict:
"""加载阈值配置文件。
Args:
config_path: 配置文件路径
Returns:
dict: 配置字典
"""
if not HAS_YAML:
print("警告: PyYAML 未安装,无法加载配置文件", file=sys.stderr)
return {}
if not os.path.exists(config_path):
print(f"警告: 配置文件不存在: {config_path}", file=sys.stderr)
return {}
try:
with open(config_path, 'r') as f:
return yaml.safe_load(f) or {}
except Exception as e:
print(f"警告: 加载配置文件失败: {e}", file=sys.stderr)
return {}
def get_thresholds(config: dict, environment: str, module: str,
default_warning: float, default_block: float) -> Tuple[float, float]:
"""获取指定环境和模块的阈值。
Args:
config: 配置字典
environment: 环境名称 ("local" "ci")
module: 模块名
default_warning: 默认警告阈值
default_block: 默认阻塞阈值
Returns:
(warning_threshold, block_threshold)
"""
if not config:
return default_warning, default_block
# 获取环境配置
env_config = config.get('environments', {}).get(environment, {})
thresholds = env_config.get('thresholds', {})
# 先查找模块特定阈值
if module in thresholds:
module_thresholds = thresholds[module]
warning = module_thresholds.get('warning', -default_warning)
block = module_thresholds.get('block', -default_block)
return abs(warning), abs(block)
# 使用默认阈值
if 'default' in thresholds:
default = thresholds['default']
warning = default.get('warning', -default_warning)
block = default.get('block', -default_block)
return abs(warning), abs(block)
return default_warning, default_block
def classify_regression_with_config(result: BenchmarkResult, config: dict,
environment: str, default_warning: float,
default_block: float) -> Tuple[str, float, Optional[float]]:
"""
分类回归级别支持配置文件
返回值: (level, change_pct, p_value)
level: "OK", "WARNING", "BLOCK"
"""
change = result.time_change_pct
if change is None:
return "OK", 0.0, result.p_value
# 获取模块阈值
module = extract_module_name(result.name)
warning_threshold, block_threshold = get_thresholds(
config, environment, module, default_warning, default_block
)
# 正值表示性能提升,负值表示性能下降
if change <= -block_threshold:
return "BLOCK", change, result.p_value
elif change <= -warning_threshold:
return "WARNING", change, result.p_value
else:
return "OK", change, result.p_value
def check_regressions_with_config(results: List[BenchmarkResult], config: dict,
environment: str, default_warning: float,
default_block: float) -> Tuple[int, int, int]:
"""
检查所有基准测试的回归情况支持配置文件
返回: (ok_count, warning_count, block_count)
"""
ok_count = 0
warning_count = 0
block_count = 0
print("=" * 80)
print(f"性能回归检测结果 (环境: {environment})")
print("=" * 80)
print(f"{'基准测试':<40} {'变化':<12} {'P值':<12} {'级别':<10}")
print("-" * 80)
for result in results:
level, change, p_value = classify_regression_with_config(
result, config, environment, default_warning, default_block
)
p_str = f"{p_value:.4f}" if p_value else "N/A"
change_str = f"{change:+.2f}%" if change else "N/A"
if level == "OK":
ok_count += 1
icon = ""
elif level == "WARNING":
warning_count += 1
icon = ""
else:
block_count += 1
icon = ""
print(f"{result.name:<40} {change_str:<12} {p_str:<12} {icon} {level}")
print("-" * 80)
print(f"总结: {ok_count} 正常, {warning_count} 警告, {block_count} 阻断")
print("=" * 80)
return ok_count, warning_count, block_count
def main():
parser = argparse.ArgumentParser(
description='解析 benchstat 输出并检测性能回归',
@ -175,6 +378,7 @@ def main():
示例:
python check_regression.py benchmark-comparison.txt
python check_regression.py --config .benchmark-thresholds.yaml --environment ci benchmark.txt
benchstat old.txt new.txt | python check_regression.py -
'''
)
@ -185,9 +389,19 @@ def main():
help='阻断阈值百分比(默认: 15')
parser.add_argument('--p-value', type=float, default=0.05,
help='统计显著性 P 值阈值(默认: 0.05')
parser.add_argument('--config', '-c', type=str,
help='阈值配置文件路径 (.yaml)')
parser.add_argument('--environment', '-e', type=str, default='local',
choices=['local', 'ci'],
help='环境类型(默认: local')
args = parser.parse_args()
# 加载配置文件
config = {}
if args.config:
config = load_threshold_config(args.config)
# 读取输入
if args.file == '-':
content = sys.stdin.read()
@ -214,7 +428,13 @@ def main():
sys.exit(0)
# 检查回归
ok_count, warning_count, block_count = check_regressions(results)
if config:
ok_count, warning_count, block_count = check_regressions_with_config(
results, config, args.environment,
args.warning_threshold, args.block_threshold
)
else:
ok_count, warning_count, block_count = check_regressions(results)
# 设置退出码
if block_count > 0: