-- Commit note:
--   - config: 反向代理、缓存、负载均衡、安全、SSL 等配置模板
--   - lua: API 网关、认证、动态路由、限流、WebSocket 等脚本示例
--   Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
-- (Source viewer metadata: 222 lines, 6.7 KiB, Lua)
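-- upstream.lua - 上游服务管理 (upstream service management)
-- 负责:动态节点选择、健康检查、故障剔除、负载均衡
-- (Responsibilities: dynamic node selection, health checking,
-- failure ejection, load balancing)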
local cjson = require("cjson.safe")
local _M = {}

-- ============================================================
-- Upstream service definitions
-- ============================================================

-- Each upstream entry carries:
--   health_check_interval  seconds between runs of the health-check timer
--   max_fails              failure count at which a node counts as failed
--   fail_timeout           seconds after the last failure before a failed
--                          node is restored by the health check
--   nodes                  ordered list of backends: host, port and relative
--                          weight of each node
local upstreams = {
    user_service = {
        health_check_interval = 10,
        max_fails = 3,
        fail_timeout = 30,
        nodes = {
            { host = "10.0.0.10", port = 8080, weight = 3 },
            { host = "10.0.0.11", port = 8080, weight = 3 },
            -- low-weight standby node
            { host = "10.0.0.12", port = 8080, weight = 1 },
        },
    },

    order_service = {
        health_check_interval = 10,
        max_fails = 5,
        fail_timeout = 60,
        nodes = {
            { host = "10.0.0.20", port = 8081, weight = 1 },
            { host = "10.0.0.21", port = 8081, weight = 1 },
        },
    },

    product_service = {
        health_check_interval = 15,
        max_fails = 3,
        fail_timeout = 30,
        nodes = {
            { host = "10.0.0.30", port = 8082, weight = 1 },
        },
    },
}

-- ============================================================
-- Health state storage (shared dictionary)
-- ============================================================

-- Per-node health records, JSON-encoded, keyed "<upstream>:<node index>".
-- NOTE(review): presumably declared in nginx.conf as
-- `lua_shared_dict upstream_health <size>;` -- confirm; if it is missing this
-- local is nil and every health lookup will fail.
local health_dict = ngx.shared.upstream_health
--- Ensure a health record exists for one node of an upstream.
-- A new record is a JSON string stored under "<upstream_name>:<node_idx>"
-- that starts healthy with zero failures. An existing record is never
-- overwritten.
-- @param upstream_name upstream service name (used as the key prefix)
-- @param node_idx 1-based index of the node in the upstream's `nodes` list
local function init_health(upstream_name, node_idx)
    local key = upstream_name .. ":" .. node_idx

    -- Fast path: once warmed up the record already exists, so skip building
    -- and encoding a fresh one.
    if health_dict:get(key) then
        return
    end

    -- `add` stores the value only if the key is still absent. A record that
    -- another worker created or updated after the `get` above is therefore
    -- kept, whereas a plain `set` would silently overwrite it.
    health_dict:add(key, cjson.encode({
        fails = 0,
        last_fail = 0,
        last_check = ngx.now(),
        healthy = true,
    }))
end
-- ============================================================
-- Load balancer: smooth weighted round-robin
-- ============================================================

-- Per-worker balancer state (module-level locals are not shared between
-- nginx workers): lb_state[upstream_name][node_index] = running weight.
local lb_state = {}

--- Pick a node from an upstream using smooth weighted round-robin.
-- Only nodes whose health record is not marked `healthy = false` take part.
-- Each selection adds every candidate's weight to its running weight, picks
-- the candidate with the largest running weight, then subtracts the total
-- candidate weight from the winner. Over one cycle each node is chosen in
-- proportion to its weight, with picks spread out rather than bunched.
-- @param upstream_name upstream name (also the health-dict key prefix)
-- @param config upstream definition; `config.nodes` lists {host, port, weight}
-- @return the chosen node table and its 1-based index in `config.nodes`;
--         when no node is healthy, falls back to `nodes[1], 1`
local function weighted_round_robin(upstream_name, config)
    local nodes = config.nodes
    local total_weight = 0
    local candidates = {}

    -- Keep only nodes not currently marked unhealthy.
    for i, node in ipairs(nodes) do
        init_health(upstream_name, i)
        local raw = health_dict:get(upstream_name .. ":" .. i)
        -- cjson.safe returns nil for malformed input. A missing or unreadable
        -- record (e.g. evicted from a full dict) is treated as healthy rather
        -- than failing the request.
        local health = raw and cjson.decode(raw)
        if type(health) ~= "table" or health.healthy ~= false then
            -- Nodes without an explicit weight count as weight 1.
            local weight = node.weight or 1
            candidates[#candidates + 1] = { node = node, index = i, weight = weight }
            total_weight = total_weight + weight
        end
    end

    if #candidates == 0 then
        -- Every node is unhealthy: fall back to the first node rather than
        -- failing the request outright.
        return nodes[1], 1
    end

    local running = lb_state[upstream_name]
    if not running then
        running = {}
        lb_state[upstream_name] = running
    end

    -- Running weights are keyed by node index, so they survive changes in
    -- the set of healthy candidates.
    local best
    for _, c in ipairs(candidates) do
        local cw = (running[c.index] or 0) + c.weight
        running[c.index] = cw
        if best == nil or cw > running[best.index] then
            best = c
        end
    end
    running[best.index] = running[best.index] - total_weight

    return best.node, best.index
end
-- ============================================================
-- Upstream node selection (public API)
-- ============================================================

--- Select one node of the named upstream.
-- Unhealthy nodes are skipped; if every node is unhealthy the first
-- configured node is returned as a last resort.
-- @param upstream_name name of the upstream service (e.g. "user_service")
-- @return node table ({host, port, weight}), or nil when the upstream is
--         unknown (an error is logged) or has no nodes
function _M.select(upstream_name)
    local config = upstreams[upstream_name]
    if not config then
        ngx.log(ngx.ERR, "unknown upstream: ", upstream_name)
        return nil
    end

    -- The balancer also returns the node's index. Only the node is part of
    -- this function's contract, so exactly one value is returned.
    local node = weighted_round_robin(upstream_name, config)
    return node
end
-- ============================================================
-- Health reporting
-- ============================================================

--- Report a successful request to an upstream.
-- Currently a no-op placeholder: this is the hook where finer-grained
-- health scoring could be added.
-- @param upstream_name name of the upstream service (currently unused)
_M.report_success = function(upstream_name)
    -- Intentionally empty: successes do not change any stored health state.
end
--- Report a failed request to an upstream node (passive health check).
-- Each call increments the node's failure counter. When `max_fails`
-- failures accumulate within `fail_timeout` seconds, the node is marked
-- unhealthy and node selection skips it until `_M.health_check` restores
-- it (`fail_timeout` seconds after its last failure).
-- Called without `node` (the original one-argument form) this is a no-op,
-- because the failing node cannot be identified from the upstream name
-- alone. With balancer_by_lua_block, pass the node that was handed to
-- ngx.balancer for the current request.
-- @param upstream_name name of the upstream service
-- @param node (optional) node table returned by `_M.select`, or any table
--        with matching `host` and `port`
function _M.report_failure(upstream_name, node)
    local config = upstreams[upstream_name]
    if not config or type(node) ~= "table" then
        return
    end

    for i, candidate in ipairs(config.nodes) do
        if candidate == node
                or (candidate.host == node.host and candidate.port == node.port) then
            init_health(upstream_name, i)
            local key = upstream_name .. ":" .. i
            local raw = health_dict:get(key)
            -- cjson.safe returns nil on malformed input; start from a clean record.
            local state = raw and cjson.decode(raw)
            if type(state) ~= "table" then
                state = { healthy = true }
            end

            local now = ngx.now()
            -- For a healthy node, failures older than fail_timeout no longer
            -- count toward max_fails. An ejected node keeps its count so the
            -- health check can still recognise and restore it.
            if state.healthy ~= false
                    and now - (state.last_fail or 0) >= config.fail_timeout then
                state.fails = 0
            end
            state.fails = (state.fails or 0) + 1
            state.last_fail = now

            if state.fails >= config.max_fails and state.healthy ~= false then
                state.healthy = false
                ngx.log(ngx.WARN, "marking node ", candidate.host, ":",
                        candidate.port, " unhealthy for ", upstream_name,
                        " after ", state.fails, " failures")
            end

            -- NOTE: this get/modify/set is not atomic across workers; a
            -- concurrent update from another worker may occasionally be lost.
            health_dict:set(key, cjson.encode(state))
            return
        end
    end
end
--- Run one health-check pass over every node of an upstream.
-- Meant to be invoked periodically, e.g. from a timer created in the
-- init_worker phase (ngx.timer.at / ngx.timer.every).
--
-- For each node that already has a stored health record:
--   * if its failure count has reached `max_fails` and at least
--     `fail_timeout` seconds have passed since its last failure, the record
--     is reset to a healthy state with zero failures;
--   * its `last_check` timestamp is refreshed.
-- Nodes with no record, or with an undecodable one, are left untouched.
-- @param upstream_name name of the upstream service; unknown names are ignored
function _M.health_check(upstream_name)
    local config = upstreams[upstream_name]
    if not config then
        return
    end

    for i, node in ipairs(config.nodes) do
        local key = upstream_name .. ":" .. i
        local raw = health_dict:get(key)
        -- cjson.safe returns nil for malformed JSON; such records are skipped.
        local state = raw and cjson.decode(raw)

        if type(state) == "table" then
            -- Taken once per node and shared by the recovery branch and the
            -- timestamp update, so both see the same time.
            local now = ngx.now()

            if state.fails and state.fails >= config.max_fails
                    and now - (state.last_fail or 0) >= config.fail_timeout then
                -- Failure timeout has elapsed: restore the node.
                ngx.log(ngx.INFO, "recovering node ", node.host, ":", node.port,
                        " for ", upstream_name)
                -- Replace `state` so the single write below persists the
                -- recovered record rather than the stale failed one.
                state = { fails = 0, last_fail = 0, healthy = true }
            end

            state.last_check = now
            -- NOTE: get/set is not atomic across workers; a failure recorded
            -- by another worker in between may be overwritten.
            health_dict:set(key, cjson.encode(state))
        end
    end
end
--- Start one recurring health-check timer per configured upstream.
-- Each timer fires every `health_check_interval` seconds and runs
-- `_M.health_check` for its upstream; invocations where the timer reports
-- `premature` are skipped. Timer-creation failures are logged, not raised.
function _M.start_health_checks()
    for upstream_name, upstream_conf in pairs(upstreams) do
        local interval = upstream_conf.health_check_interval

        local ok, err = ngx.timer.every(interval, function(premature)
            if not premature then
                _M.health_check(upstream_name)
            end
        end)

        if ok then
            ngx.log(ngx.INFO, "health check started for ", upstream_name,
                    " (interval: ", interval, "s)")
        else
            ngx.log(ngx.ERR, "failed to create health check timer for ",
                    upstream_name, ": ", err)
        end
    end
end
-- ============================================================
-- Statistics
-- ============================================================

--- Build a JSON snapshot of every upstream and the health of its nodes.
-- Shape: { <upstream_name> = [ {host, port, weight, healthy, fails}, ... ] }.
-- A node with no health record, or an unreadable one, is reported as
-- healthy with 0 failures.
-- @return JSON string, or '{"error":"encode_failed"}' if encoding fails
function _M.stats_json()
    local result = {}

    for name, config in pairs(upstreams) do
        local nodes_info = {}
        for i, node in ipairs(config.nodes) do
            local raw = health_dict:get(name .. ":" .. i)
            -- cjson.safe returns nil on malformed input; use an empty record.
            local state = raw and cjson.decode(raw)
            if type(state) ~= "table" then
                state = {}
            end
            nodes_info[i] = {
                host = node.host,
                port = node.port,
                weight = node.weight,
                healthy = state.healthy ~= false,
                fails = state.fails or 0,
            }
        end
        result[name] = nodes_info
    end

    local json, err = cjson.encode(result)
    if not json then
        ngx.log(ngx.ERR, "failed to encode upstream stats: ", err)
        return '{"error":"encode_failed"}'
    end
    return json
end
-- Export the module table. Public API: select, report_success,
-- report_failure, health_check, start_health_checks, stats_json.
return _M