lolly/docs/lua/api-gateway/upstream.lua
xfy 6543422281 docs: 添加 Nginx 配置和 Lua 脚本示例文档
- config: 反向代理、缓存、负载均衡、安全、SSL 等配置模板
- lua: API 网关、认证、动态路由、限流、WebSocket 等脚本示例

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-10 17:59:22 +08:00

222 lines
6.7 KiB
Lua
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

-- upstream.lua - 上游服务管理
-- 负责:动态节点选择、健康检查、故障剔除、负载均衡
local cjson = require("cjson.safe")
local _M = {}

-- ============================================================
-- Upstream service definitions
-- ============================================================
-- Maps each service name to its node list plus health tuning:
--   health_check_interval  seconds between health-check passes
--   max_fails              failure count at which a node is ejected
--   fail_timeout           seconds an ejected node stays out before recovery
local upstreams
do
    --- Build one node descriptor { host, port, weight }.
    local function endpoint(host, port, weight)
        return { host = host, port = port, weight = weight }
    end

    upstreams = {
        user_service = {
            nodes = {
                endpoint("10.0.0.10", 8080, 3),
                endpoint("10.0.0.11", 8080, 3),
                endpoint("10.0.0.12", 8080, 1), -- low-weight standby
            },
            health_check_interval = 10,
            max_fails = 3,
            fail_timeout = 30,
        },
        order_service = {
            nodes = {
                endpoint("10.0.0.20", 8081, 1),
                endpoint("10.0.0.21", 8081, 1),
            },
            health_check_interval = 10,
            max_fails = 5,
            fail_timeout = 60,
        },
        product_service = {
            nodes = {
                endpoint("10.0.0.30", 8082, 1),
            },
            health_check_interval = 15,
            max_fails = 3,
            fail_timeout = 30,
        },
    }
end
-- ============================================================
-- Health state storage (shared dictionary)
-- ============================================================
-- Health records are kept in the `upstream_health` shared dict so every
-- nginx worker reads and writes the same state.
-- NOTE(review): this needs a matching `lua_shared_dict upstream_health <size>;`
-- in nginx.conf. If that directive is missing, health_dict is nil and the
-- first call that uses it will raise an error.
local health_dict = ngx.shared.upstream_health
--- Ensure a health record exists for one upstream node.
-- The record is stored under "<upstream_name>:<node_idx>" as a JSON object
-- { fails, last_fail, last_check, healthy } and starts out healthy.
-- Creation uses `add`, which only stores the value when the key is absent.
-- If several workers race to create the same record, the first write wins,
-- and an existing record (for example one already carrying failure counts)
-- is never overwritten.
-- @param upstream_name upstream service name
-- @param node_idx 1-based index into that upstream's `nodes` list
local function init_health(upstream_name, node_idx)
    local key = upstream_name .. ":" .. node_idx
    -- Fast path: skip the JSON encoding when the record already exists.
    if health_dict:get(key) then
        return
    end
    local ok, err = health_dict:add(key, cjson.encode({
        fails = 0,
        last_fail = 0,
        last_check = ngx.now(),
        healthy = true,
    }))
    -- "exists" only means another worker created the record first.
    -- Anything else (e.g. "no memory") deserves a log line.
    if not ok and err ~= "exists" then
        ngx.log(ngx.WARN, "failed to init health state for ", key, ": ", err)
    end
end
-- ============================================================
-- Load balancer: smooth weighted round-robin
-- ============================================================
-- Balancer state keyed by upstream name. Each entry holds the running
-- "current weight" of every node, keyed by node index. This is an ordinary
-- module-level Lua table, so every nginx worker balances independently.
local lb_state = {}

--- Pick a healthy node using smooth weighted round-robin (the scheme nginx
-- uses for its built-in weighted round-robin balancer).
--
-- On each call:
--   * every healthy node's current weight grows by its configured weight;
--   * the node with the largest current weight is chosen;
--   * the chosen node then has the total healthy weight subtracted.
-- With a stable healthy set, each run of `total_weight` calls picks every
-- node exactly `weight` times, with the picks interleaved.
--
-- A node is skipped only when its shared-dict record says `healthy = false`.
-- A missing or undecodable record counts as healthy, which is also how
-- `_M.stats_json` reports it. A node without `weight` counts as weight 1.
-- @param upstream_name upstream service name
-- @param config upstream definition (uses `config.nodes`)
-- @return the selected node table and its 1-based index. When no node is
--   healthy, falls back to `config.nodes[1]` and index 1.
local function weighted_round_robin(upstream_name, config)
    local nodes = config.nodes
    local candidates = {}
    local total_weight = 0

    for i, node in ipairs(nodes) do
        init_health(upstream_name, i)
        local raw = health_dict:get(upstream_name .. ":" .. i)
        local health = raw and cjson.decode(raw)
        if type(health) ~= "table" or health.healthy ~= false then
            local weight = node.weight or 1
            candidates[#candidates + 1] = { node = node, index = i, weight = weight }
            total_weight = total_weight + weight
        end
    end

    if #candidates == 0 then
        -- Every node is marked unhealthy: fall back to the first node
        -- rather than failing the request outright.
        return nodes[1], 1
    end

    local state = lb_state[upstream_name]
    if not state then
        state = { current = {} }
        lb_state[upstream_name] = state
    end
    local current = state.current

    local best, best_weight
    for _, candidate in ipairs(candidates) do
        local cw = (current[candidate.index] or 0) + candidate.weight
        current[candidate.index] = cw
        -- Strict ">" keeps the earliest node on ties, so picks are deterministic.
        if best == nil or cw > best_weight then
            best, best_weight = candidate, cw
        end
    end
    current[best.index] = best_weight - total_weight

    return best.node, best.index
end
-- ============================================================
-- Upstream node selection (public API)
-- ============================================================
--- Choose a node for the named upstream service.
-- Delegates to the weighted round-robin balancer and returns only the node
-- table; the balancer's node index is discarded.
-- @param upstream_name upstream service name (key of the `upstreams` table)
-- @return the selected node table ({ host, port, weight }), or nil when the
--   upstream is unknown (or has no nodes)
function _M.select(upstream_name)
    local config = upstreams[upstream_name]
    if config then
        -- The extra parentheses truncate the (node, index) pair to one value.
        local chosen = (weighted_round_robin(upstream_name, config))
        return chosen
    end
    ngx.log(ngx.ERR, "unknown upstream: ", upstream_name)
    return nil
end
-- ============================================================
-- Health reporting
-- ============================================================
--- Report that a request to the given upstream succeeded.
-- Currently a no-op placeholder. It is the hook where a finer-grained
-- health score could be maintained later.
-- @param upstream_name upstream service name (currently unused)
_M.report_success = function(upstream_name) -- luacheck: ignore 212
end
--- Report that a request to the given upstream failed.
--
-- When the failing `node` is passed, its failure counter in the shared dict
-- is incremented. It must be the exact table returned by `_M.select`; nodes
-- are matched by reference.
--   * Once the counter reaches the upstream's `max_fails`, the node is marked
--     unhealthy and the balancer stops choosing it.
--   * `_M.health_check` restores it after `fail_timeout` seconds without a
--     new failure.
--   * Failures older than `fail_timeout` are forgotten before a new one is
--     counted, so sporadic errors do not accumulate forever.
-- Called without `node` (the original signature), this remains a no-op.
--
-- NOTE: the shared-dict read-modify-write is not atomic across workers, so
-- concurrent failures may occasionally be under-counted.
-- @param upstream_name upstream service name
-- @param node optional; the node table returned by `_M.select`
function _M.report_failure(upstream_name, node)
    local config = upstreams[upstream_name]
    if not config or node == nil then
        return
    end

    local index
    for i, candidate in ipairs(config.nodes) do
        if candidate == node then
            index = i
            break
        end
    end
    if not index then
        ngx.log(ngx.WARN, "report_failure: node not found in upstream ", upstream_name)
        return
    end

    local key = upstream_name .. ":" .. index
    local raw = health_dict:get(key)
    local state = raw and cjson.decode(raw)
    if type(state) ~= "table" then
        state = {}
    end

    local now = ngx.now()
    local fails = state.fails or 0
    -- Only failures within the last fail_timeout seconds count toward max_fails.
    if now - (state.last_fail or 0) >= config.fail_timeout then
        fails = 0
    end
    fails = fails + 1

    local healthy = fails < config.max_fails
    if not healthy and state.healthy ~= false then
        ngx.log(ngx.WARN, "marking node ", node.host, ":", node.port,
            " unhealthy for ", upstream_name, " after ", fails, " failures")
    end

    local ok, err = health_dict:set(key, cjson.encode({
        fails = fails,
        last_fail = now,
        last_check = state.last_check or now,
        healthy = healthy,
    }))
    if not ok then
        ngx.log(ngx.ERR, "failed to record failure for ", key, ": ", err)
    end
end
--- Run one health-maintenance pass over an upstream's nodes.
-- Meant to be called periodically, e.g. from an ngx.timer started in the
-- init_worker phase.
--
-- It does not probe the nodes; it only works on their shared-dict records:
--   * A node whose `fails` has reached `max_fails`, and whose last failure
--     is at least `fail_timeout` seconds old, is reset to a fresh healthy
--     record.
--   * Every other existing record just has `last_check` refreshed.
-- Nodes without a decodable record are left untouched.
--
-- NOTE(review): the get/set pair is not atomic, so a failure reported by
-- another worker between the two calls can be overwritten.
-- @param upstream_name upstream service name; unknown names are ignored
function _M.health_check(upstream_name)
    local config = upstreams[upstream_name]
    if not config then
        return
    end

    -- Declared at function scope: both the recovery and the plain
    -- last_check refresh below need the same timestamp.
    local now = ngx.now()

    for i, node in ipairs(config.nodes) do
        local key = upstream_name .. ":" .. i
        local raw = health_dict:get(key)
        local state = raw and cjson.decode(raw)
        if type(state) == "table" then
            local tripped = state.fails and state.fails >= config.max_fails
            if tripped and now - (state.last_fail or 0) >= config.fail_timeout then
                -- Fault timeout elapsed: recover the node.
                ngx.log(ngx.INFO, "recovering node ", node.host, ":", node.port,
                    " for ", upstream_name)
                -- Replace the record outright. Writing the stale `state`
                -- back afterwards would undo the recovery.
                state = {
                    fails = 0,
                    last_fail = 0,
                    last_check = now,
                    healthy = true,
                }
            else
                state.last_check = now
            end
            health_dict:set(key, cjson.encode(state))
        end
    end
end
--- Start a recurring health-check timer for every configured upstream.
-- Each upstream gets its own ngx.timer.every timer, firing every
-- `health_check_interval` seconds and calling `_M.health_check` for it.
-- If one timer cannot be created, the error is logged and the remaining
-- upstreams are still started.
-- NOTE(review): timers belong to whichever worker calls this. If it is
-- called from init_worker, every worker runs its own checks.
function _M.start_health_checks()
    for upstream_name, cfg in pairs(upstreams) do
        local interval = cfg.health_check_interval
        local ok, err = ngx.timer.every(interval, function(premature)
            -- premature is true when the worker is shutting down.
            if not premature then
                _M.health_check(upstream_name)
            end
        end)
        if ok then
            ngx.log(ngx.INFO, "health check started for ", upstream_name,
                " (interval: ", interval, "s)")
        else
            ngx.log(ngx.ERR, "failed to create health check timer for ", upstream_name, ": ", err)
        end
    end
end
-- ============================================================
-- Statistics
-- ============================================================
--- Build a JSON snapshot of every upstream and the health of its nodes.
-- Shape: { <upstream_name> = [ { host, port, weight, healthy, fails }, ... ] }.
-- A node with no health record yet, or with a record that does not decode
-- to a table, is reported as healthy with 0 fails. One bad record no longer
-- aborts the whole report.
-- @return JSON string, or '{"error":"encode_failed"}' if encoding fails
function _M.stats_json()
    local result = {}
    for name, config in pairs(upstreams) do
        local nodes_info = {}
        for i, node in ipairs(config.nodes) do
            local raw = health_dict:get(name .. ":" .. i)
            local state = raw and cjson.decode(raw)
            if type(state) ~= "table" then
                state = {}
            end
            nodes_info[i] = {
                host = node.host,
                port = node.port,
                weight = node.weight,
                healthy = state.healthy ~= false,
                fails = state.fails or 0,
            }
        end
        result[name] = nodes_info
    end
    -- cjson.safe returns nil (plus an error) instead of raising.
    local json = cjson.encode(result)
    return json or '{"error":"encode_failed"}'
end

return _M