- 04-proxy-loadbalancing: 新增第18节主动健康检查详解 - 被动检查vs主动检查对比、NGINX Plus health_check/match指令 - Stream健康检查、gRPC健康检查、开源替代方案 - 新增 MQTT 模块文档 (33): broker负载均衡、Client ID路由 - 新增 OIDC 模块文档 (34): OpenID Connect认证、JWT验证 - 新增 Keyval 模块文档 (35): 动态键值存储、API管理接口 - 新增 流媒体模块文档 (36): HLS/FLV/MP4伪流媒体配置 - 新增 WebDAV 模块文档 (37): 文件共享服务器配置 - 新增 Zone Sync 模块文档 (38): 多节点状态同步 - 新增 HTTP Tunnel 模块文档 (39): HTTP CONNECT代理隧道 - 更新 README.md 目录索引 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1088 lines
26 KiB
Markdown
1088 lines
26 KiB
Markdown
# NGINX MQTT 模块指南
|
||
|
||
## 1. MQTT Preread 模块概述
|
||
|
||
### 1.1 模块介绍
|
||
|
||
`ngx_stream_mqtt_preread_module` 模块用于在 preread 阶段从 MQTT CONNECT 消息中提取客户端信息,而无需终止 MQTT 连接。
|
||
|
||
### 1.2 版本要求
|
||
|
||
- 自 **1.23.4** 版本起可用
|
||
- 属于 **NGINX Plus 商业订阅**功能
|
||
- 支持 MQTT 协议版本 **3.1.1** 和 **5.0**
|
||
|
||
### 1.3 核心用途
|
||
|
||
MQTT Preread 模块主要用于以下场景:
|
||
|
||
1. **基于 Client ID 的路由** - 根据客户端 ID 将连接路由到不同的 MQTT Broker
|
||
2. **基于用户名的路由** - 根据用户名进行负载均衡或访问控制
|
||
3. **透明代理** - 在不解密 MQTT 连接的情况下获取连接元数据
|
||
4. **连接分析** - 记录和分析 MQTT 客户端连接信息
|
||
|
||
### 1.4 工作原理
|
||
|
||
```
|
||
客户端 MQTT 连接
|
||
|
|
||
v
|
||
[NGINX Stream 模块]
|
||
|
|
||
v
|
||
[Preread 阶段] <--- mqtt_preread 在此阶段读取 CONNECT 消息
|
||
|
|
||
v
|
||
[变量提取] <--- $mqtt_preread_clientid, $mqtt_preread_username
|
||
|
|
||
v
|
||
[路由决策] <--- 基于变量进行 upstream 选择
|
||
|
|
||
v
|
||
[代理到后端 MQTT Broker]
|
||
```
|
||
|
||
在 preread 阶段,NGINX 读取 MQTT CONNECT 消息的前几个字节(不超过 16KB),解析其中的 Client ID 和 Username,然后基于这些信息进行路由决策。
|
||
|
||
---
|
||
|
||
## 2. MQTT Filter 模块概述
|
||
|
||
### 2.1 模块介绍
|
||
|
||
`ngx_stream_mqtt_filter_module` 模块提供完整的 MQTT 协议支持,允许修改 CONNECT 消息中的字段。
|
||
|
||
### 2.2 版本要求
|
||
|
||
- 自 **1.23.4** 版本起可用
|
||
- 属于 **NGINX Plus 商业订阅**功能
|
||
- 支持 MQTT 协议版本 **3.1.1** 和 **5.0**
|
||
|
||
### 2.3 核心用途
|
||
|
||
MQTT Filter 模块主要用于以下场景:
|
||
|
||
1. **修改 Client ID** - 为客户端分配统一的 Client ID 格式
|
||
2. **修改用户名/密码** - 进行身份认证信息的转换或注入
|
||
3. **代理认证** - 在代理层添加统一的认证信息
|
||
4. **会话管理** - 控制客户端会话标识
|
||
|
||
### 2.4 与 Preread 模块的区别
|
||
|
||
| 特性 | Preread 模块 | Filter 模块 |
|
||
|------|--------------|-------------|
|
||
| 主要功能 | 读取 CONNECT 信息 | 修改 CONNECT 字段 |
|
||
| 处理阶段 | Preread 阶段 | 代理阶段 |
|
||
| 是否修改数据 | 否(只读) | 是(读写) |
|
||
| 使用场景 | 路由、分析 | 认证、转换 |
|
||
| 性能开销 | 极低 | 较低 |
|
||
|
||
---
|
||
|
||
## 3. 指令详解
|
||
|
||
### 3.1 MQTT Preread 模块指令
|
||
|
||
#### mqtt_preread
|
||
|
||
启用或禁用从 MQTT CONNECT 消息中提取信息。
|
||
|
||
**语法**:
|
||
```nginx
|
||
mqtt_preread on | off;
|
||
```
|
||
|
||
**默认值**:`off`
|
||
|
||
**上下文**:`stream`, `server`
|
||
|
||
**说明**:
|
||
- 在 preread 阶段启用 MQTT CONNECT 消息解析
|
||
- 启用后,可以通过变量 `$mqtt_preread_clientid` 和 `$mqtt_preread_username` 访问提取的信息
|
||
- 仅解析 CONNECT 消息,不修改数据流
|
||
|
||
**配置示例**:
|
||
```nginx
|
||
stream {
|
||
server {
|
||
listen 1883;
|
||
|
||
# 启用 MQTT preread
|
||
mqtt_preread on;
|
||
|
||
# 基于 clientid 路由
|
||
proxy_pass $mqtt_backend;
|
||
}
|
||
}
|
||
```
|
||
|
||
---
|
||
|
||
### 3.2 MQTT Filter 模块指令
|
||
|
||
#### mqtt
|
||
|
||
为给定虚拟服务器启用 MQTT 协议支持。
|
||
|
||
**语法**:
|
||
```nginx
|
||
mqtt on | off;
|
||
```
|
||
|
||
**默认值**:`off`
|
||
|
||
**上下文**:`stream`, `server`
|
||
|
||
**说明**:
|
||
- 启用后可以使用其他 MQTT 相关指令
|
||
- 必须在 `mqtt_set_connect` 之前启用
|
||
|
||
**配置示例**:
|
||
```nginx
|
||
stream {
|
||
server {
|
||
listen 1883;
|
||
|
||
# 启用 MQTT 支持
|
||
mqtt on;
|
||
|
||
proxy_pass backend;
|
||
}
|
||
}
|
||
```
|
||
|
||
---
|
||
|
||
#### mqtt_buffers
|
||
|
||
设置单个连接处理 MQTT 消息的缓冲区数量和大小。
|
||
|
||
**语法**:
|
||
```nginx
|
||
mqtt_buffers number size;
|
||
```
|
||
|
||
**默认值**:`100 1k;`
|
||
|
||
**上下文**:`stream`, `server`
|
||
|
||
**版本要求**:1.25.1+
|
||
|
||
**说明**:
|
||
- 控制用于处理 MQTT 消息的缓冲区配置
|
||
- 较大的缓冲区可以处理更大的 MQTT 消息
|
||
- 根据预期的消息大小和并发连接数调整
|
||
|
||
**配置示例**:
|
||
```nginx
|
||
stream {
|
||
server {
|
||
listen 1883;
|
||
mqtt on;
|
||
|
||
# 设置缓冲区:50 个缓冲区,每个 4KB
|
||
mqtt_buffers 50 4k;
|
||
|
||
proxy_pass backend;
|
||
}
|
||
}
|
||
```
|
||
|
||
---
|
||
|
||
#### mqtt_rewrite_buffer_size
|
||
|
||
设置用于写入修改后消息的缓冲区大小。
|
||
|
||
**语法**:
|
||
```nginx
|
||
mqtt_rewrite_buffer_size size;
|
||
```
|
||
|
||
**默认值**:`4k` 或 `8k`(取决于平台内存页大小)
|
||
|
||
**上下文**:`server`
|
||
|
||
**版本要求**:1.25.1+
|
||
|
||
**废弃状态**:已废弃,建议使用 `mqtt_buffers`
|
||
|
||
**说明**:
|
||
- 该指令在 1.25.1 版本中已被废弃
|
||
- 请使用 `mqtt_buffers` 替代
|
||
|
||
---
|
||
|
||
#### mqtt_set_connect
|
||
|
||
设置 CONNECT 消息的字段为给定值。
|
||
|
||
**语法**:
|
||
```nginx
|
||
mqtt_set_connect field value;
|
||
```
|
||
|
||
**默认值**:无
|
||
|
||
**上下文**:`server`
|
||
|
||
**说明**:
|
||
- 支持修改的字段:`clientid`, `username`, `password`
|
||
- 值可以包含文本、变量及其组合
|
||
- 可以在同一级别指定多个指令
|
||
|
||
**可用字段**:
|
||
|
||
| 字段 | 说明 |
|
||
|------|------|
|
||
| `clientid` | MQTT 客户端标识符 |
|
||
| `username` | 连接用户名 |
|
||
| `password` | 连接密码 |
|
||
|
||
**配置示例**:
|
||
```nginx
|
||
stream {
|
||
server {
|
||
listen 18883;
|
||
proxy_pass backend;
|
||
proxy_buffer_size 16k;
|
||
|
||
mqtt on;
|
||
|
||
# 设置 Client ID
|
||
mqtt_set_connect clientid "$client";
|
||
|
||
# 设置用户名
|
||
mqtt_set_connect username "$name";
|
||
|
||
# 设置密码(从变量获取)
|
||
mqtt_set_connect password "$mqtt_password";
|
||
}
|
||
}
|
||
```
|
||
|
||
---
|
||
|
||
## 4. 嵌入变量
|
||
|
||
### 4.1 MQTT Preread 变量
|
||
|
||
| 变量 | 说明 |
|
||
|------|------|
|
||
| `$mqtt_preread_clientid` | CONNECT 消息中的 Client ID 值 |
|
||
| `$mqtt_preread_username` | CONNECT 消息中的 Username 值 |
|
||
|
||
**变量使用示例**:
|
||
```nginx
|
||
stream {
|
||
# 使用 map 基于 clientid 路由
|
||
map $mqtt_preread_clientid $backend_pool {
|
||
~^device-1 backend_1;
|
||
~^device-2 backend_2;
|
||
~^sensor-.* sensors_backend;
|
||
default default_backend;
|
||
}
|
||
|
||
server {
|
||
listen 1883;
|
||
mqtt_preread on;
|
||
|
||
proxy_pass $backend_pool;
|
||
}
|
||
}
|
||
```
|
||
|
||
---
|
||
|
||
## 5. 配置示例
|
||
|
||
### 5.1 基础 MQTT 代理
|
||
|
||
简单的 MQTT 代理配置,将所有连接转发到后端 Broker:
|
||
|
||
```nginx
|
||
stream {
|
||
upstream mqtt_backend {
|
||
server 192.168.1.10:1883;
|
||
server 192.168.1.11:1883 backup;
|
||
}
|
||
|
||
server {
|
||
listen 1883;
|
||
proxy_pass mqtt_backend;
|
||
proxy_timeout 30s;
|
||
proxy_connect_timeout 5s;
|
||
}
|
||
}
|
||
```
|
||
|
||
### 5.2 基于 Client ID 的路由
|
||
|
||
根据 MQTT Client ID 将连接路由到不同的后端:
|
||
|
||
```nginx
|
||
stream {
|
||
# 设备组 1:工业传感器
|
||
upstream sensors_backend {
|
||
server 10.0.1.10:1883 weight=5;
|
||
server 10.0.1.11:1883;
|
||
}
|
||
|
||
# 设备组 2:智能家居设备
|
||
upstream home_backend {
|
||
server 10.0.2.10:1883;
|
||
server 10.0.2.11:1883;
|
||
}
|
||
|
||
# 设备组 3:车联网
|
||
upstream vehicle_backend {
|
||
server 10.0.3.10:1883;
|
||
server 10.0.3.11:1883;
|
||
}
|
||
|
||
# 默认后端
|
||
upstream default_backend {
|
||
server 10.0.0.10:1883;
|
||
}
|
||
|
||
# 基于 Client ID 的路由映射
|
||
map $mqtt_preread_clientid $target_backend {
|
||
# 传感器设备(匹配 sensor- 开头的 Client ID)
|
||
~^sensor- sensors_backend;
|
||
|
||
# 智能家居设备(匹配 home- 开头的 Client ID)
|
||
~^home- home_backend;
|
||
|
||
# 车载设备(匹配 vehicle- 或 car- 开头的 Client ID)
|
||
~^vehicle- vehicle_backend;
|
||
~^car- vehicle_backend;
|
||
|
||
# 默认后端
|
||
default default_backend;
|
||
}
|
||
|
||
server {
|
||
listen 1883;
|
||
|
||
# 启用 MQTT preread
|
||
mqtt_preread on;
|
||
|
||
# 基于 Client ID 路由
|
||
proxy_pass $target_backend;
|
||
|
||
# 连接超时配置
|
||
proxy_timeout 300s;
|
||
proxy_connect_timeout 10s;
|
||
|
||
# 启用 TCP keepalive
|
||
proxy_socket_keepalive on;
|
||
}
|
||
}
|
||
```
|
||
|
||
### 5.3 基于用户名的负载均衡
|
||
|
||
根据用户名进行路由,适用于多租户场景:
|
||
|
||
```nginx
|
||
stream {
|
||
# 租户 A 集群
|
||
upstream tenant_a_backend {
|
||
server 10.0.10.10:1883;
|
||
server 10.0.10.11:1883;
|
||
}
|
||
|
||
# 租户 B 集群
|
||
upstream tenant_b_backend {
|
||
server 10.0.20.10:1883;
|
||
server 10.0.20.11:1883;
|
||
}
|
||
|
||
# 管理员集群
|
||
upstream admin_backend {
|
||
server 10.0.0.10:1883;
|
||
}
|
||
|
||
# 基于用户名路由
|
||
map $mqtt_preread_username $tenant_backend {
|
||
"tenant-a-user" tenant_a_backend;
|
||
"tenant-a-admin" tenant_a_backend;
|
||
"tenant-b-user" tenant_b_backend;
|
||
"tenant-b-admin" tenant_b_backend;
|
||
~^admin-.* admin_backend;
|
||
default tenant_a_backend;
|
||
}
|
||
|
||
server {
|
||
listen 1883;
|
||
mqtt_preread on;
|
||
|
||
proxy_pass $tenant_backend;
|
||
proxy_timeout 60s;
|
||
}
|
||
}
|
||
```
|
||
|
||
### 5.4 修改 CONNECT 消息(Filter 模块)
|
||
|
||
在代理层修改 MQTT CONNECT 消息中的字段:
|
||
|
||
```nginx
|
||
stream {
|
||
upstream mqtt_backend {
|
||
server 192.168.1.10:1883;
|
||
}
|
||
|
||
server {
|
||
listen 1883;
|
||
proxy_pass mqtt_backend;
|
||
proxy_buffer_size 16k;
|
||
|
||
# 启用 MQTT Filter
|
||
mqtt on;
|
||
|
||
# 设置固定的 Client ID 前缀(追加原始 ID)
|
||
mqtt_set_connect clientid "ng-$mqtt_preread_clientid";
|
||
|
||
# 注入代理认证用户名
|
||
mqtt_set_connect username "nginx-proxy";
|
||
|
||
# 设置代理密码(从文件或环境变量获取)
|
||
mqtt_set_connect password "$proxy_mqtt_password";
|
||
}
|
||
}
|
||
```
|
||
|
||
### 5.5 综合配置示例
|
||
|
||
结合 Preread 和 Filter 模块的完整配置:
|
||
|
||
```nginx
|
||
stream {
|
||
# 日志格式
|
||
log_format mqtt_log '$remote_addr [$time_local] '
|
||
'clientid:$mqtt_preread_clientid '
|
||
'username:$mqtt_preread_username '
|
||
'upstream:$upstream_addr '
|
||
'status:$status';
|
||
|
||
# 设备专用集群
|
||
upstream device_cluster_a {
|
||
zone devices 64k;
|
||
server 10.0.1.10:1883 weight=5;
|
||
server 10.0.1.11:1883;
|
||
server 10.0.1.12:1883 backup;
|
||
}
|
||
|
||
upstream device_cluster_b {
|
||
zone devices 64k;
|
||
server 10.0.2.10:1883;
|
||
server 10.0.2.11:1883;
|
||
}
|
||
|
||
# 普通设备集群
|
||
upstream default_devices {
|
||
zone devices 64k;
|
||
least_conn;
|
||
server 10.0.3.10:1883;
|
||
server 10.0.3.11:1883;
|
||
server 10.0.3.12:1883;
|
||
}
|
||
|
||
# Client ID 到后端映射
|
||
map $mqtt_preread_clientid $backend_pool {
|
||
~^dev-a- device_cluster_a;
|
||
~^device-a- device_cluster_a;
|
||
~^dev-b- device_cluster_b;
|
||
~^device-b- device_cluster_b;
|
||
default default_devices;
|
||
}
|
||
|
||
# 服务器配置
|
||
server {
|
||
listen 1883;
|
||
access_log /var/log/nginx/mqtt-access.log mqtt_log;
|
||
|
||
# 启用 MQTT Preread 获取 Client ID 和用户名
|
||
mqtt_preread on;
|
||
|
||
# 启用 MQTT Filter(可选,需要修改 CONNECT 时启用)
|
||
mqtt on;
|
||
mqtt_buffers 50 4k;
|
||
|
||
# 可选:修改 CONNECT 消息
|
||
# mqtt_set_connect clientid "proxy-$mqtt_preread_clientid";
|
||
|
||
# 基于 Client ID 路由
|
||
proxy_pass $backend_pool;
|
||
|
||
# 超时配置
|
||
proxy_timeout 300s;
|
||
proxy_connect_timeout 10s;
|
||
|
||
# 启用 TCP keepalive
|
||
proxy_socket_keepalive on;
|
||
|
||
# 连接限制
|
||
limit_conn mqtt_conn 100;
|
||
}
|
||
|
||
# TLS MQTT 端口
|
||
server {
|
||
listen 8883 ssl;
|
||
|
||
ssl_certificate /etc/nginx/ssl/mqtt.crt;
|
||
ssl_certificate_key /etc/nginx/ssl/mqtt.key;
|
||
ssl_protocols TLSv1.2 TLSv1.3;
|
||
|
||
mqtt_preread on;
|
||
proxy_pass $backend_pool;
|
||
proxy_timeout 300s;
|
||
}
|
||
}
|
||
|
||
# 连接限制共享内存
|
||
limit_conn_zone $binary_remote_addr zone=mqtt_conn:10m;
|
||
```
|
||
|
||
### 5.6 与 SSL/TLS 结合
|
||
|
||
MQTT over TLS 的配置:
|
||
|
||
```nginx
|
||
stream {
|
||
upstream mqtt_ssl_backend {
|
||
server 192.168.1.10:8883;
|
||
server 192.168.1.11:8883;
|
||
}
|
||
|
||
# 终止 TLS 并读取 MQTT 信息
|
||
server {
|
||
listen 8883 ssl;
|
||
|
||
ssl_certificate /etc/nginx/ssl/server.crt;
|
||
ssl_certificate_key /etc/nginx/ssl/server.key;
|
||
ssl_protocols TLSv1.2 TLSv1.3;
|
||
|
||
# 在解密后读取 MQTT 信息
|
||
mqtt_preread on;
|
||
mqtt on;
|
||
|
||
proxy_pass mqtt_ssl_backend;
|
||
|
||
# 上游也使用 SSL
|
||
proxy_ssl on;
|
||
proxy_ssl_protocols TLSv1.2 TLSv1.3;
|
||
}
|
||
}
|
||
```
|
||
|
||
---
|
||
|
||
## 6. 与 Lolly 项目的关系
|
||
|
||
### 6.1 Lolly 项目简介
|
||
|
||
Lolly 是一个使用 Go 语言编写的高性能 HTTP 服务器与反向代理,基于 fasthttp 构建。它提供了 HTTP/3、WebSocket、TCP/UDP Stream 代理等功能。
|
||
|
||
### 6.2 功能对比
|
||
|
||
| 特性 | NGINX Plus (MQTT) | Lolly |
|
||
|------|-------------------|-------|
|
||
| MQTT Preread | 支持(商业版) | 未实现 |
|
||
| MQTT Filter | 支持(商业版) | 未实现 |
|
||
| TCP Stream 代理 | 支持 | 支持 |
|
||
| 基于内容路由 | 支持 | 有限支持 |
|
||
| SSL/TLS 终端 | 支持 | 支持 |
|
||
| 负载均衡 | 丰富算法 | 轮询/加权/最少连接/IP哈希 |
|
||
|
||
### 6.3 Lolly 中的 Stream 代理
|
||
|
||
Lolly 目前支持基础的 TCP/UDP Stream 代理:
|
||
|
||
```go
|
||
// Lolly Stream 代理示例配置(YAML)
|
||
stream:
|
||
- listen: ":1883"
|
||
protocol: "tcp"
|
||
upstream:
|
||
targets:
|
||
- addr: "mqtt1:1883"
|
||
weight: 3
|
||
- addr: "mqtt2:1883"
|
||
weight: 1
|
||
load_balance: "round_robin"
|
||
```
|
||
|
||
当前 Lolly 的 Stream 实现位于 `internal/stream/stream.go`,提供:
|
||
- TCP/UDP 代理
|
||
- 负载均衡(轮询、加权轮询、最少连接、IP 哈希)
|
||
- 健康检查
|
||
- 会话管理(UDP)
|
||
|
||
### 6.4 在 Lolly 中实现 MQTT 支持的方案
|
||
|
||
#### 方案 1:独立 MQTT Preread 中间件
|
||
|
||
在 Lolly 的 Stream 模块中添加 MQTT Preread 功能:
|
||
|
||
```go
|
||
// internal/stream/mqtt_preread.go
|
||
package stream
|
||
|
||
import (
|
||
"bufio"
|
||
"encoding/binary"
|
||
"io"
|
||
"net"
|
||
)
|
||
|
||
// MQTTPrereadConfig MQTT Preread 配置
|
||
type MQTTPrereadConfig struct {
|
||
Enabled bool
|
||
OnClientID func(clientID string) string // 路由回调
|
||
OnUsername func(username string) string // 认证回调
|
||
}
|
||
|
||
// MQTTConnectInfo 解析后的 MQTT CONNECT 信息
|
||
type MQTTConnectInfo struct {
|
||
ClientID string
|
||
Username string
|
||
Password []byte
|
||
Protocol byte
|
||
}
|
||
|
||
// ParseMQTTConnect 从连接读取并解析 MQTT CONNECT 消息
|
||
func ParseMQTTConnect(conn net.Conn) (*MQTTConnectInfo, error) {
|
||
// 1. 读取固定头(2-5 字节)
|
||
// 2. 读取剩余长度
|
||
// 3. 读取可变头(协议名、协议级别、连接标志)
|
||
// 4. 读取 Payload(Client ID、Will Topic、Will Message、Username、Password)
|
||
// 5. 返回解析结果
|
||
}
|
||
```
|
||
|
||
#### 方案 2:基于配置的路由
|
||
|
||
在 Lolly 配置中添加 MQTT 路由规则:
|
||
|
||
```yaml
|
||
stream:
|
||
- listen: ":1883"
|
||
protocol: "tcp"
|
||
mqtt_preread: true
|
||
routes:
|
||
- match: "clientid =~ ^sensor-"
|
||
upstream: sensors
|
||
- match: "clientid =~ ^home-"
|
||
upstream: home_devices
|
||
- match: "username == admin"
|
||
upstream: admin_cluster
|
||
upstreams:
|
||
sensors:
|
||
targets:
|
||
- addr: "mqtt-sensors-1:1883"
|
||
- addr: "mqtt-sensors-2:1883"
|
||
home_devices:
|
||
targets:
|
||
- addr: "mqtt-home-1:1883"
|
||
admin_cluster:
|
||
targets:
|
||
- addr: "mqtt-admin-1:1883"
|
||
```
|
||
|
||
#### 方案 3:与现有 Stream 模块集成
|
||
|
||
扩展现有的 `internal/stream/stream.go`:
|
||
|
||
```go
|
||
// Target MQTT 扩展
|
||
type Target struct {
|
||
addr string
|
||
weight int
|
||
healthy atomic.Bool
|
||
conns int64
|
||
|
||
// MQTT 特定字段
|
||
mqttMatcher func(*MQTTConnectInfo) bool // 匹配函数
|
||
}
|
||
|
||
// Upstream 添加 MQTT 选择支持
|
||
type Upstream struct {
|
||
name string
|
||
targets []*Target
|
||
balancer Balancer
|
||
|
||
// MQTT 路由表
|
||
mqttRoutes map[string][]*Target // 标签 -> 目标列表
|
||
}
|
||
|
||
// SelectByMQTT 基于 MQTT CONNECT 信息选择目标
|
||
func (u *Upstream) SelectByMQTT(info *MQTTConnectInfo) *Target {
|
||
// 1. 检查 MQTT 路由表
|
||
// 2. 回退到默认负载均衡
|
||
}
|
||
```
|
||
|
||
### 6.5 实现建议
|
||
|
||
#### 短期建议(PoC 验证)
|
||
|
||
1. **实现基础 MQTT CONNECT 解析器**
|
||
- 支持 MQTT 3.1.1 和 5.0
|
||
- 提取 Client ID、Username、Password
|
||
- 位置:`internal/stream/mqtt.go`
|
||
|
||
2. **添加基于 Client ID 的路由**
|
||
- 简单的正则匹配
|
||
- 配置文件支持
|
||
- 与现有 upstream 集成
|
||
|
||
3. **性能测试**
|
||
- 对比有无 preread 的性能差异
|
||
- 内存占用分析
|
||
|
||
#### 中期建议(功能完善)
|
||
|
||
1. **完整 MQTT Filter 支持**
|
||
- 支持修改 CONNECT 字段
|
||
- 支持消息重写
|
||
- 配置热重载
|
||
|
||
2. **监控与日志**
|
||
- MQTT 特定指标(连接数、消息数)
|
||
- 结构化日志输出
|
||
|
||
3. **安全增强**
|
||
- 基于 Client ID 的访问控制
|
||
- 速率限制
|
||
|
||
#### 长期建议(企业级功能)
|
||
|
||
1. **MQTT 5.0 完整支持**
|
||
- 用户属性处理
|
||
- 共享订阅支持
|
||
- 消息过期处理
|
||
|
||
2. **高级路由**
|
||
- 基于 Topic 的路由(需要解析 PUBLISH/SUBSCRIBE)
|
||
- 动态后端发现
|
||
|
||
3. **与 HTTP 层的集成**
|
||
- 统一的配置管理
|
||
- 共享的健康检查
|
||
|
||
### 6.6 代码示例
|
||
|
||
以下是在 Lolly 中实现 MQTT Preread 的参考代码:
|
||
|
||
```go
|
||
// internal/stream/mqtt.go
|
||
package stream
|
||
|
||
import (
|
||
"bufio"
|
||
"encoding/binary"
|
||
"fmt"
|
||
"io"
|
||
"net"
|
||
)
|
||
|
||
const (
|
||
// MQTT 控制包类型
|
||
mqttCONNECT = 1
|
||
|
||
// MQTT 协议名
|
||
mqttProtocol311 = "MQTT" // v3.1.1
|
||
mqttProtocol31 = "MQIsdp" // v3.1
|
||
)
|
||
|
||
// MQTTPrereadHandler MQTT Preread 处理器
|
||
type MQTTPrereadHandler struct {
|
||
config *MQTTPrereadConfig
|
||
}
|
||
|
||
// NewMQTTPrereadHandler 创建处理器
|
||
func NewMQTTPrereadHandler(config *MQTTPrereadConfig) *MQTTPrereadHandler {
|
||
return &MQTTPrereadHandler{config: config}
|
||
}
|
||
|
||
// Handle 处理连接,解析 MQTT CONNECT 并返回信息
|
||
func (h *MQTTPrereadHandler) Handle(conn net.Conn) (*MQTTConnectInfo, net.Conn, error) {
|
||
// 使用 bufio 预读数据
|
||
reader := bufio.NewReaderSize(conn, 4096)
|
||
|
||
// 读取固定头第一个字节
|
||
firstByte, err := reader.Peek(1)
|
||
if err != nil {
|
||
return nil, nil, err
|
||
}
|
||
|
||
// 检查是否为 CONNECT 包
|
||
packetType := (firstByte[0] >> 4) & 0x0F
|
||
if packetType != mqttCONNECT {
|
||
return nil, nil, fmt.Errorf("expected CONNECT packet, got %d", packetType)
|
||
}
|
||
|
||
// 读取剩余长度(可变长度编码)
|
||
remainingLen, headerLen, err := decodeRemainingLength(reader)
|
||
if err != nil {
|
||
return nil, nil, err
|
||
}
|
||
|
||
// 读取完整的 CONNECT 包
|
||
totalLen := 1 + headerLen + remainingLen
|
||
packet, err := reader.Peek(totalLen)
|
||
if err != nil {
|
||
return nil, nil, err
|
||
}
|
||
|
||
// 解析 CONNECT 包
|
||
info, err := parseConnectPacket(packet[1+headerLen:])
|
||
if err != nil {
|
||
return nil, nil, err
|
||
}
|
||
|
||
// 创建包装连接,将预读的数据返回给后续处理
|
||
wrappedConn := &mqttConn{
|
||
Conn: conn,
|
||
reader: reader,
|
||
peeked: totalLen,
|
||
}
|
||
|
||
return info, wrappedConn, nil
|
||
}
|
||
|
||
// decodeRemainingLength 解码 MQTT 剩余长度字段
|
||
func decodeRemainingLength(r *bufio.Reader) (int, int, error) {
|
||
var value int
|
||
var multiplier int = 1
|
||
var headerLen int
|
||
|
||
for {
|
||
b, err := r.ReadByte()
|
||
if err != nil {
|
||
return 0, 0, err
|
||
}
|
||
headerLen++
|
||
|
||
value += int(b&0x7F) * multiplier
|
||
multiplier *= 128
|
||
|
||
if (b & 0x80) == 0 {
|
||
break
|
||
}
|
||
|
||
if multiplier > 128*128*128 {
|
||
return 0, 0, fmt.Errorf("malformed remaining length")
|
||
}
|
||
}
|
||
|
||
return value, headerLen, nil
|
||
}
|
||
|
||
// parseConnectPacket 解析 CONNECT 包体
|
||
func parseConnectPacket(data []byte) (*MQTTConnectInfo, error) {
|
||
info := &MQTTConnectInfo{}
|
||
offset := 0
|
||
|
||
// 读取协议名长度
|
||
protoLen := binary.BigEndian.Uint16(data[offset:])
|
||
offset += 2
|
||
|
||
// 读取协议名
|
||
protoName := string(data[offset : offset+int(protoLen)])
|
||
offset += int(protoLen)
|
||
|
||
// 判断协议版本
|
||
if protoName == mqttProtocol311 {
|
||
info.Protocol = 4 // MQTT 3.1.1
|
||
} else if protoName == mqttProtocol31 {
|
||
info.Protocol = 3 // MQTT 3.1
|
||
}
|
||
|
||
// 读取协议级别
|
||
protocolLevel := data[offset]
|
||
offset++
|
||
_ = protocolLevel
|
||
|
||
// 读取连接标志
|
||
connectFlags := data[offset]
|
||
offset++
|
||
|
||
usernameFlag := (connectFlags >> 7) & 1
|
||
passwordFlag := (connectFlags >> 6) & 1
|
||
// willRetain := (connectFlags >> 5) & 1
|
||
willQoS := (connectFlags >> 3) & 3
|
||
willFlag := (connectFlags >> 2) & 1
|
||
// cleanSession := (connectFlags >> 1) & 1
|
||
|
||
// 读取保持连接时间(跳过)
|
||
offset += 2
|
||
|
||
// 读取 Client ID
|
||
clientIDLen := binary.BigEndian.Uint16(data[offset:])
|
||
offset += 2
|
||
info.ClientID = string(data[offset : offset+int(clientIDLen)])
|
||
offset += int(clientIDLen)
|
||
|
||
// 读取 Will Topic 和 Will Message(如果有)
|
||
if willFlag == 1 {
|
||
// Will Topic
|
||
willTopicLen := binary.BigEndian.Uint16(data[offset:])
|
||
offset += 2
|
||
offset += int(willTopicLen)
|
||
|
||
// Will Message
|
||
willMsgLen := binary.BigEndian.Uint16(data[offset:])
|
||
offset += 2
|
||
offset += int(willMsgLen)
|
||
}
|
||
|
||
// 读取 Username(如果有)
|
||
if usernameFlag == 1 {
|
||
usernameLen := binary.BigEndian.Uint16(data[offset:])
|
||
offset += 2
|
||
info.Username = string(data[offset : offset+int(usernameLen)])
|
||
offset += int(usernameLen)
|
||
}
|
||
|
||
// 读取 Password(如果有)
|
||
if passwordFlag == 1 {
|
||
passwordLen := binary.BigEndian.Uint16(data[offset:])
|
||
offset += 2
|
||
info.Password = data[offset : offset+int(passwordLen)]
|
||
}
|
||
|
||
return info, nil
|
||
}
|
||
|
||
// mqttConn 包装连接,支持预读数据回退
|
||
type mqttConn struct {
|
||
net.Conn
|
||
reader *bufio.Reader
|
||
peeked int
|
||
}
|
||
|
||
func (c *mqttConn) Read(p []byte) (n int, err error) {
|
||
return c.reader.Read(p)
|
||
}
|
||
```
|
||
|
||
### 6.7 测试验证
|
||
|
||
添加 MQTT Preread 的单元测试:
|
||
|
||
```go
|
||
// internal/stream/mqtt_test.go
|
||
package stream
|
||
|
||
import (
|
||
"bytes"
|
||
"testing"
|
||
)
|
||
|
||
func TestParseConnectPacket(t *testing.T) {
|
||
// 构造一个 MQTT 3.1.1 CONNECT 包
|
||
// CONNECT + 剩余长度 + 协议名长度(4) + "MQTT" + 协议级别(4) + 连接标志 + 保持连接 + Client ID
|
||
packet := []byte{
|
||
// 可变头开始
|
||
0x00, 0x04, // 协议名长度 = 4
|
||
'M', 'Q', 'T', 'T', // 协议名 "MQTT"
|
||
0x04, // 协议级别 4 (3.1.1)
|
||
0xC2, // 连接标志: 用户名(1) + 密码(1) + Clean Session(0)
|
||
0x00, 0x3C, // 保持连接 = 60 秒
|
||
// Payload
|
||
0x00, 0x0A, // Client ID 长度 = 10
|
||
't', 'e', 's', 't', '-', 'c', 'l', 'i', 'e', 'n', 't', // Client ID
|
||
0x00, 0x05, // 用户名长度 = 5
|
||
'a', 'd', 'm', 'i', 'n', // 用户名
|
||
0x00, 0x08, // 密码长度 = 8
|
||
'p', 'a', 's', 's', 'w', 'o', 'r', 'd', // 密码
|
||
}
|
||
|
||
info, err := parseConnectPacket(packet)
|
||
if err != nil {
|
||
t.Fatalf("parseConnectPacket failed: %v", err)
|
||
}
|
||
|
||
if info.ClientID != "test-client" {
|
||
t.Errorf("ClientID = %q, want %q", info.ClientID, "test-client")
|
||
}
|
||
|
||
if info.Username != "admin" {
|
||
t.Errorf("Username = %q, want %q", info.Username, "admin")
|
||
}
|
||
|
||
if info.Protocol != 4 {
|
||
t.Errorf("Protocol = %d, want %d", info.Protocol, 4)
|
||
}
|
||
}
|
||
|
||
func TestDecodeRemainingLength(t *testing.T) {
|
||
tests := []struct {
|
||
input []byte
|
||
expected int
|
||
}{
|
||
{[]byte{0x00}, 0},
|
||
{[]byte{0x7F}, 127},
|
||
{[]byte{0x80, 0x01}, 128},
|
||
{[]byte{0xFF, 0x7F}, 16383},
|
||
{[]byte{0x80, 0x80, 0x01}, 16384},
|
||
}
|
||
|
||
for _, tt := range tests {
|
||
r := bufio.NewReader(bytes.NewReader(tt.input))
|
||
value, _, err := decodeRemainingLength(r)
|
||
if err != nil {
|
||
t.Errorf("decodeRemainingLength(%v) error: %v", tt.input, err)
|
||
continue
|
||
}
|
||
if value != tt.expected {
|
||
t.Errorf("decodeRemainingLength(%v) = %d, want %d", tt.input, value, tt.expected)
|
||
}
|
||
}
|
||
}
|
||
```
|
||
|
||
---
|
||
|
||
## 7. 总结
|
||
|
||
### 7.1 NGINX MQTT 模块要点
|
||
|
||
1. **Preread 模块** (`ngx_stream_mqtt_preread_module`)
|
||
- 只读解析 MQTT CONNECT 消息
|
||
- 提取 Client ID 和 Username 用于路由
|
||
- 适用于基于内容的负载均衡
|
||
|
||
2. **Filter 模块** (`ngx_stream_mqtt_filter_module`)
|
||
- 支持修改 MQTT CONNECT 字段
|
||
- 可用于代理认证和会话管理
|
||
- 与 Preread 模块可以配合使用
|
||
|
||
3. **商业订阅限制**
|
||
- 两个模块都需要 NGINX Plus 商业订阅
|
||
- 自 1.23.4 版本起可用
|
||
|
||
### 7.2 适用场景
|
||
|
||
- **物联网平台** - 海量设备接入和路由
|
||
- **多租户 MQTT** - 基于用户名隔离租户
|
||
- **边缘网关** - 设备接入层代理
|
||
- **MQTT 迁移** - 平滑迁移到新 Broker
|
||
|
||
### 7.3 Lolly 实现建议优先级
|
||
|
||
1. **P0** - 基础 MQTT CONNECT 解析器
|
||
2. **P1** - 基于 Client ID 的路由
|
||
3. **P2** - MQTT Filter(修改 CONNECT)
|
||
4. **P3** - MQTT 5.0 高级特性
|
||
|
||
### 7.4 参考资料
|
||
|
||
- [NGINX MQTT Preread Module](https://nginx.org/en/docs/stream/ngx_stream_mqtt_preread_module.html)
|
||
- [NGINX MQTT Filter Module](https://nginx.org/en/docs/stream/ngx_stream_mqtt_filter_module.html)
|
||
- [MQTT 3.1.1 Specification](http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html)
|
||
- [MQTT 5.0 Specification](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html)
|