fix(handler): 修复 sendfile 高并发下的连接断开处理
- EPIPE/ECONNRESET 不再 fallback,直接返回错误避免响应混乱 - 正确处理 EAGAIN/EWOULDBLOCK socket 缓冲区满,等待重试 - EINTR 信号中断正确重试 - 改用 SetBodyStream 确保 HTTP 头先发送再 sendfile - 添加重试限制(100次)防止无限循环 测试结果:0 错误/100000 并发请求 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
0c20c62b5c
commit
d874f97765
@ -12,6 +12,7 @@ import (
|
|||||||
"net"
|
"net"
|
||||||
"os"
|
"os"
|
||||||
"syscall"
|
"syscall"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/valyala/fasthttp"
|
"github.com/valyala/fasthttp"
|
||||||
)
|
)
|
||||||
@ -20,6 +21,12 @@ const (
|
|||||||
// MinSendfileSize 使用 sendfile 的最小文件大小(8KB)。
|
// MinSendfileSize 使用 sendfile 的最小文件大小(8KB)。
|
||||||
// 小于该值的文件使用普通 io.Copy,避免系统调用开销。
|
// 小于该值的文件使用普通 io.Copy,避免系统调用开销。
|
||||||
MinSendfileSize = 8 * 1024
|
MinSendfileSize = 8 * 1024
|
||||||
|
|
||||||
|
// sendfile 最大重试次数
|
||||||
|
sendfileMaxRetries = 100
|
||||||
|
|
||||||
|
// sendfile 重试等待时间
|
||||||
|
sendfileRetryDelay = 1 * time.Millisecond
|
||||||
)
|
)
|
||||||
|
|
||||||
// SendFile 零拷贝文件传输。
|
// SendFile 零拷贝文件传输。
|
||||||
@ -50,7 +57,12 @@ func SendFile(ctx *fasthttp.RequestCtx, file *os.File, offset, length int64) err
|
|||||||
// Linux 平台使用 sendfile 系统调用
|
// Linux 平台使用 sendfile 系统调用
|
||||||
err := linuxSendfile(conn, file.Fd(), offset, length)
|
err := linuxSendfile(conn, file.Fd(), offset, length)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// sendfile 失败,fallback 到 io.Copy
|
// EPIPE/ECONNRESET 表示客户端已断开,不应 fallback
|
||||||
|
// 因为 HTTP 头可能已发送,fallback 会造成响应混乱
|
||||||
|
if err == syscall.EPIPE || err == syscall.ECONNRESET {
|
||||||
|
return err // 直接返回错误,不 fallback
|
||||||
|
}
|
||||||
|
// 其他错误尝试 fallback 到 io.Copy
|
||||||
return copyFile(ctx, file, offset, length)
|
return copyFile(ctx, file, offset, length)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -82,6 +94,7 @@ func copyFile(ctx *fasthttp.RequestCtx, file *os.File, offset, length int64) err
|
|||||||
// linuxSendfile Linux sendfile 系统调用。
|
// linuxSendfile Linux sendfile 系统调用。
|
||||||
//
|
//
|
||||||
// 使用 Linux 特有的 sendfile 系统调用实现零拷贝传输。
|
// 使用 Linux 特有的 sendfile 系统调用实现零拷贝传输。
|
||||||
|
// 正确处理临时错误(EAGAIN、EINTR)和连接断开(EPIPE、ECONNRESET)。
|
||||||
func linuxSendfile(conn net.Conn, fileFd uintptr, _, length int64) error {
|
func linuxSendfile(conn net.Conn, fileFd uintptr, _, length int64) error {
|
||||||
socketFd, err := getSocketFd(conn)
|
socketFd, err := getSocketFd(conn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -91,15 +104,48 @@ func linuxSendfile(conn net.Conn, fileFd uintptr, _, length int64) error {
|
|||||||
// Linux sendfile: sendfile(out_fd, in_fd, offset, count)
|
// Linux sendfile: sendfile(out_fd, in_fd, offset, count)
|
||||||
var sent int64
|
var sent int64
|
||||||
remain := length
|
remain := length
|
||||||
|
retries := 0
|
||||||
|
|
||||||
for remain > 0 {
|
for remain > 0 {
|
||||||
n, err := syscall.Sendfile(int(socketFd), int(fileFd), nil, int(remain))
|
n, err := syscall.Sendfile(int(socketFd), int(fileFd), nil, int(remain))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
// 处理临时错误:socket 缓冲区满,等待后重试
|
||||||
|
if err == syscall.EAGAIN || err == syscall.EWOULDBLOCK {
|
||||||
|
retries++
|
||||||
|
if retries > sendfileMaxRetries {
|
||||||
|
// 超过最大重试次数,返回错误
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// socket 缓冲区满,短暂等待后重试
|
||||||
|
time.Sleep(sendfileRetryDelay)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// 被信号中断,重试
|
||||||
|
if err == syscall.EINTR {
|
||||||
|
retries++
|
||||||
|
if retries > sendfileMaxRetries {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// 客户端断开连接,返回错误让 fasthttp 知道请求未完成
|
||||||
|
// 注意:不要返回 nil,否则 fasthttp 会发送 200 + 空 body
|
||||||
|
if err == syscall.EPIPE || err == syscall.ECONNRESET {
|
||||||
|
return err // 返回错误,让 fasthttp 处理连接断开
|
||||||
|
}
|
||||||
|
|
||||||
|
// 其他错误直接返回
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if n == 0 {
|
if n == 0 {
|
||||||
break // EOF
|
break // EOF 或连接关闭
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 成功发送数据,重置重试计数
|
||||||
|
retries = 0
|
||||||
sent += int64(n)
|
sent += int64(n)
|
||||||
remain -= int64(n)
|
remain -= int64(n)
|
||||||
}
|
}
|
||||||
|
|||||||
@ -41,15 +41,19 @@ import (
|
|||||||
// - 大文件(>= 8KB)自动启用零拷贝传输
|
// - 大文件(>= 8KB)自动启用零拷贝传输
|
||||||
// - alias 与 root 互斥,同时配置时 alias 优先
|
// - alias 与 root 互斥,同时配置时 alias 优先
|
||||||
type StaticHandler struct {
|
type StaticHandler struct {
|
||||||
fileCache *cache.FileCache
|
// 指针类型字段(按大小排列)
|
||||||
|
fileCache *cache.FileCache
|
||||||
|
gzipStatic *compression.GzipStatic
|
||||||
|
router *Router
|
||||||
|
// 字符串字段
|
||||||
|
root string
|
||||||
|
alias string
|
||||||
|
pathPrefix string
|
||||||
|
// 切片字段
|
||||||
|
index []string
|
||||||
|
tryFiles []string
|
||||||
|
// 基本类型字段
|
||||||
cacheTTL time.Duration // 缓存新鲜度 TTL(默认 5s,0 表示每次验证 ModTime)
|
cacheTTL time.Duration // 缓存新鲜度 TTL(默认 5s,0 表示每次验证 ModTime)
|
||||||
gzipStatic *compression.GzipStatic
|
|
||||||
router *Router
|
|
||||||
root string
|
|
||||||
alias string
|
|
||||||
pathPrefix string
|
|
||||||
index []string
|
|
||||||
tryFiles []string
|
|
||||||
useSendfile bool
|
useSendfile bool
|
||||||
tryFilesPass bool
|
tryFilesPass bool
|
||||||
symlinkCheck bool
|
symlinkCheck bool
|
||||||
@ -541,19 +545,20 @@ func (h *StaticHandler) serveFile(ctx *fasthttp.RequestCtx, filePath string, inf
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 大文件使用零拷贝传输
|
// 大文件使用零拷贝传输
|
||||||
|
// 使用 fasthttp 的 SetBodyStream,它会:
|
||||||
|
// 1. 先写 HTTP 头到 bufio.Writer
|
||||||
|
// 2. Flush HTTP 头到 socket(关键步骤)
|
||||||
|
// 3. copyZeroAlloc → ReadFrom → sendfile
|
||||||
|
// 这样保证 HTTP 头先发送,避免顺序错乱导致的 "200 0" malformed response
|
||||||
if h.useSendfile && info.Size() >= MinSendfileSize {
|
if h.useSendfile && info.Size() >= MinSendfileSize {
|
||||||
// 设置 Content-Type (sendfile 不会自动设置)
|
|
||||||
ctx.Response.Header.SetContentType(mimeutil.DetectContentType(filePath))
|
ctx.Response.Header.SetContentType(mimeutil.DetectContentType(filePath))
|
||||||
|
|
||||||
file, err := os.Open(filePath)
|
file, err := os.Open(filePath)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
defer func() {
|
// SetBodyStream 会在 handler 返回后由 fasthttp 统一处理
|
||||||
_ = file.Close()
|
// HTTP 头写入、Flush 和 sendfile 的顺序
|
||||||
}()
|
ctx.Response.SetBodyStream(file, int(info.Size()))
|
||||||
if err := SendFile(ctx, file, 0, info.Size()); err == nil {
|
return
|
||||||
return
|
|
||||||
}
|
|
||||||
// sendfile 失败,fallback 到 ServeFile
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user