lolly/docs/nginx/33-nginx-mqtt-module.md
xfy 972eab4267 refactor(docs): 重构文档目录结构,nginx 文档移至子目录
将 docs/ 根目录下的 nginx 相关文档统一移动到 docs/nginx/ 子目录,
提高文档组织性和可维护性。

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-16 10:48:14 +08:00

1088 lines
26 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# NGINX MQTT 模块指南
## 1. MQTT Preread 模块概述
### 1.1 模块介绍
`ngx_stream_mqtt_preread_module` 模块用于在 preread 阶段从 MQTT CONNECT 消息中提取客户端信息,而无需终止 MQTT 连接。
### 1.2 版本要求
-**1.23.4** 版本起可用
- 属于 **NGINX Plus 商业订阅**功能
- 支持 MQTT 协议版本 **3.1.1****5.0**
### 1.3 核心用途
MQTT Preread 模块主要用于以下场景:
1. **基于 Client ID 的路由** - 根据客户端 ID 将连接路由到不同的 MQTT Broker
2. **基于用户名的路由** - 根据用户名进行负载均衡或访问控制
3. **透明代理** - 在不解密 MQTT 连接的情况下获取连接元数据
4. **连接分析** - 记录和分析 MQTT 客户端连接信息
### 1.4 工作原理
```
客户端 MQTT 连接
|
v
[NGINX Stream 模块]
|
v
[Preread 阶段] <--- mqtt_preread 在此阶段读取 CONNECT 消息
|
v
[变量提取] <--- $mqtt_preread_clientid, $mqtt_preread_username
|
v
[路由决策] <--- 基于变量进行 upstream 选择
|
v
[代理到后端 MQTT Broker]
```
在 preread 阶段NGINX 读取 MQTT CONNECT 消息的前几个字节(不超过 16KB解析其中的 Client ID 和 Username然后基于这些信息进行路由决策。
---
## 2. MQTT Filter 模块概述
### 2.1 模块介绍
`ngx_stream_mqtt_filter_module` 模块提供完整的 MQTT 协议支持,允许修改 CONNECT 消息中的字段。
### 2.2 版本要求
-**1.23.4** 版本起可用
- 属于 **NGINX Plus 商业订阅**功能
- 支持 MQTT 协议版本 **3.1.1****5.0**
### 2.3 核心用途
MQTT Filter 模块主要用于以下场景:
1. **修改 Client ID** - 为客户端分配统一的 Client ID 格式
2. **修改用户名/密码** - 进行身份认证信息的转换或注入
3. **代理认证** - 在代理层添加统一的认证信息
4. **会话管理** - 控制客户端会话标识
### 2.4 与 Preread 模块的区别
| 特性 | Preread 模块 | Filter 模块 |
|------|--------------|-------------|
| 主要功能 | 读取 CONNECT 信息 | 修改 CONNECT 字段 |
| 处理阶段 | Preread 阶段 | 代理阶段 |
| 是否修改数据 | 否(只读) | 是(读写) |
| 使用场景 | 路由、分析 | 认证、转换 |
| 性能开销 | 极低 | 较低 |
---
## 3. 指令详解
### 3.1 MQTT Preread 模块指令
#### mqtt_preread
启用或禁用从 MQTT CONNECT 消息中提取信息。
**语法**
```nginx
mqtt_preread on | off;
```
**默认值**`off`
**上下文**`stream`, `server`
**说明**
- 在 preread 阶段启用 MQTT CONNECT 消息解析
- 启用后,可以通过变量 `$mqtt_preread_clientid``$mqtt_preread_username` 访问提取的信息
- 仅解析 CONNECT 消息,不修改数据流
**配置示例**
```nginx
stream {
server {
listen 1883;
# 启用 MQTT preread
mqtt_preread on;
# 基于 clientid 路由
proxy_pass $mqtt_backend;
}
}
```
---
### 3.2 MQTT Filter 模块指令
#### mqtt
为给定虚拟服务器启用 MQTT 协议支持。
**语法**
```nginx
mqtt on | off;
```
**默认值**`off`
**上下文**`stream`, `server`
**说明**
- 启用后可以使用其他 MQTT 相关指令
- 必须在 `mqtt_set_connect` 之前启用
**配置示例**
```nginx
stream {
server {
listen 1883;
# 启用 MQTT 支持
mqtt on;
proxy_pass backend;
}
}
```
---
#### mqtt_buffers
设置单个连接处理 MQTT 消息的缓冲区数量和大小。
**语法**
```nginx
mqtt_buffers number size;
```
**默认值**`100 1k;`
**上下文**`stream`, `server`
**版本要求**1.25.1+
**说明**
- 控制用于处理 MQTT 消息的缓冲区配置
- 较大的缓冲区可以处理更大的 MQTT 消息
- 根据预期的消息大小和并发连接数调整
**配置示例**
```nginx
stream {
server {
listen 1883;
mqtt on;
# 设置缓冲区50 个缓冲区,每个 4KB
mqtt_buffers 50 4k;
proxy_pass backend;
}
}
```
---
#### mqtt_rewrite_buffer_size
设置用于写入修改后消息的缓冲区大小。
**语法**
```nginx
mqtt_rewrite_buffer_size size;
```
**默认值**`4k``8k`(取决于平台内存页大小)
**上下文**`server`
**版本要求**1.25.1+
**废弃状态**:已废弃,建议使用 `mqtt_buffers`
**说明**
- 该指令在 1.25.1 版本中已被废弃
- 请使用 `mqtt_buffers` 替代
---
#### mqtt_set_connect
设置 CONNECT 消息的字段为给定值。
**语法**
```nginx
mqtt_set_connect field value;
```
**默认值**:无
**上下文**`server`
**说明**
- 支持修改的字段:`clientid`, `username`, `password`
- 值可以包含文本、变量及其组合
- 可以在同一级别指定多个指令
**可用字段**
| 字段 | 说明 |
|------|------|
| `clientid` | MQTT 客户端标识符 |
| `username` | 连接用户名 |
| `password` | 连接密码 |
**配置示例**
```nginx
stream {
server {
listen 18883;
proxy_pass backend;
proxy_buffer_size 16k;
mqtt on;
# 设置 Client ID
mqtt_set_connect clientid "$client";
# 设置用户名
mqtt_set_connect username "$name";
# 设置密码(从变量获取)
mqtt_set_connect password "$mqtt_password";
}
}
```
---
## 4. 嵌入变量
### 4.1 MQTT Preread 变量
| 变量 | 说明 |
|------|------|
| `$mqtt_preread_clientid` | CONNECT 消息中的 Client ID 值 |
| `$mqtt_preread_username` | CONNECT 消息中的 Username 值 |
**变量使用示例**
```nginx
stream {
# 使用 map 基于 clientid 路由
map $mqtt_preread_clientid $backend_pool {
~^device-1 backend_1;
~^device-2 backend_2;
~^sensor-.* sensors_backend;
default default_backend;
}
server {
listen 1883;
mqtt_preread on;
proxy_pass $backend_pool;
}
}
```
---
## 5. 配置示例
### 5.1 基础 MQTT 代理
简单的 MQTT 代理配置,将所有连接转发到后端 Broker
```nginx
stream {
upstream mqtt_backend {
server 192.168.1.10:1883;
server 192.168.1.11:1883 backup;
}
server {
listen 1883;
proxy_pass mqtt_backend;
proxy_timeout 30s;
proxy_connect_timeout 5s;
}
}
```
### 5.2 基于 Client ID 的路由
根据 MQTT Client ID 将连接路由到不同的后端:
```nginx
stream {
# 设备组 1工业传感器
upstream sensors_backend {
server 10.0.1.10:1883 weight=5;
server 10.0.1.11:1883;
}
# 设备组 2智能家居设备
upstream home_backend {
server 10.0.2.10:1883;
server 10.0.2.11:1883;
}
# 设备组 3车联网
upstream vehicle_backend {
server 10.0.3.10:1883;
server 10.0.3.11:1883;
}
# 默认后端
upstream default_backend {
server 10.0.0.10:1883;
}
# 基于 Client ID 的路由映射
map $mqtt_preread_clientid $target_backend {
# 传感器设备(匹配 sensor- 开头的 Client ID
~^sensor- sensors_backend;
# 智能家居设备(匹配 home- 开头的 Client ID
~^home- home_backend;
# 车载设备(匹配 vehicle- 或 car- 开头的 Client ID
~^vehicle- vehicle_backend;
~^car- vehicle_backend;
# 默认后端
default default_backend;
}
server {
listen 1883;
# 启用 MQTT preread
mqtt_preread on;
# 基于 Client ID 路由
proxy_pass $target_backend;
# 连接超时配置
proxy_timeout 300s;
proxy_connect_timeout 10s;
# 启用 TCP keepalive
proxy_socket_keepalive on;
}
}
```
### 5.3 基于用户名的负载均衡
根据用户名进行路由,适用于多租户场景:
```nginx
stream {
# 租户 A 集群
upstream tenant_a_backend {
server 10.0.10.10:1883;
server 10.0.10.11:1883;
}
# 租户 B 集群
upstream tenant_b_backend {
server 10.0.20.10:1883;
server 10.0.20.11:1883;
}
# 管理员集群
upstream admin_backend {
server 10.0.0.10:1883;
}
# 基于用户名路由
map $mqtt_preread_username $tenant_backend {
"tenant-a-user" tenant_a_backend;
"tenant-a-admin" tenant_a_backend;
"tenant-b-user" tenant_b_backend;
"tenant-b-admin" tenant_b_backend;
~^admin-.* admin_backend;
default tenant_a_backend;
}
server {
listen 1883;
mqtt_preread on;
proxy_pass $tenant_backend;
proxy_timeout 60s;
}
}
```
### 5.4 修改 CONNECT 消息Filter 模块)
在代理层修改 MQTT CONNECT 消息中的字段:
```nginx
stream {
upstream mqtt_backend {
server 192.168.1.10:1883;
}
server {
listen 1883;
proxy_pass mqtt_backend;
proxy_buffer_size 16k;
# 启用 MQTT Filter
mqtt on;
# 设置固定的 Client ID 前缀(追加原始 ID
mqtt_set_connect clientid "ng-$mqtt_preread_clientid";
# 注入代理认证用户名
mqtt_set_connect username "nginx-proxy";
# 设置代理密码(从文件或环境变量获取)
mqtt_set_connect password "$proxy_mqtt_password";
}
}
```
### 5.5 综合配置示例
结合 Preread 和 Filter 模块的完整配置:
```nginx
stream {
# 日志格式
log_format mqtt_log '$remote_addr [$time_local] '
'clientid:$mqtt_preread_clientid '
'username:$mqtt_preread_username '
'upstream:$upstream_addr '
'status:$status';
# 设备专用集群
upstream device_cluster_a {
zone devices 64k;
server 10.0.1.10:1883 weight=5;
server 10.0.1.11:1883;
server 10.0.1.12:1883 backup;
}
upstream device_cluster_b {
zone devices 64k;
server 10.0.2.10:1883;
server 10.0.2.11:1883;
}
# 普通设备集群
upstream default_devices {
zone devices 64k;
least_conn;
server 10.0.3.10:1883;
server 10.0.3.11:1883;
server 10.0.3.12:1883;
}
# Client ID 到后端映射
map $mqtt_preread_clientid $backend_pool {
~^dev-a- device_cluster_a;
~^device-a- device_cluster_a;
~^dev-b- device_cluster_b;
~^device-b- device_cluster_b;
default default_devices;
}
# 服务器配置
server {
listen 1883;
access_log /var/log/nginx/mqtt-access.log mqtt_log;
# 启用 MQTT Preread 获取 Client ID 和用户名
mqtt_preread on;
# 启用 MQTT Filter可选需要修改 CONNECT 时启用)
mqtt on;
mqtt_buffers 50 4k;
# 可选:修改 CONNECT 消息
# mqtt_set_connect clientid "proxy-$mqtt_preread_clientid";
# 基于 Client ID 路由
proxy_pass $backend_pool;
# 超时配置
proxy_timeout 300s;
proxy_connect_timeout 10s;
# 启用 TCP keepalive
proxy_socket_keepalive on;
# 连接限制
limit_conn mqtt_conn 100;
}
# TLS MQTT 端口
server {
listen 8883 ssl;
ssl_certificate /etc/nginx/ssl/mqtt.crt;
ssl_certificate_key /etc/nginx/ssl/mqtt.key;
ssl_protocols TLSv1.2 TLSv1.3;
mqtt_preread on;
proxy_pass $backend_pool;
proxy_timeout 300s;
}
}
# 连接限制共享内存
limit_conn_zone $binary_remote_addr zone=mqtt_conn:10m;
```
### 5.6 与 SSL/TLS 结合
MQTT over TLS 的配置:
```nginx
stream {
upstream mqtt_ssl_backend {
server 192.168.1.10:8883;
server 192.168.1.11:8883;
}
# 终止 TLS 并读取 MQTT 信息
server {
listen 8883 ssl;
ssl_certificate /etc/nginx/ssl/server.crt;
ssl_certificate_key /etc/nginx/ssl/server.key;
ssl_protocols TLSv1.2 TLSv1.3;
# 在解密后读取 MQTT 信息
mqtt_preread on;
mqtt on;
proxy_pass mqtt_ssl_backend;
# 上游也使用 SSL
proxy_ssl on;
proxy_ssl_protocols TLSv1.2 TLSv1.3;
}
}
```
---
## 6. 与 Lolly 项目的关系
### 6.1 Lolly 项目简介
Lolly 是一个使用 Go 语言编写的高性能 HTTP 服务器与反向代理,基于 fasthttp 构建。它提供了 HTTP/3、WebSocket、TCP/UDP Stream 代理等功能。
### 6.2 功能对比
| 特性 | NGINX Plus (MQTT) | Lolly |
|------|-------------------|-------|
| MQTT Preread | 支持(商业版) | 未实现 |
| MQTT Filter | 支持(商业版) | 未实现 |
| TCP Stream 代理 | 支持 | 支持 |
| 基于内容路由 | 支持 | 有限支持 |
| SSL/TLS 终端 | 支持 | 支持 |
| 负载均衡 | 丰富算法 | 轮询/加权/最少连接/IP哈希 |
### 6.3 Lolly 中的 Stream 代理
Lolly 目前支持基础的 TCP/UDP Stream 代理:
```go
// Lolly Stream 代理示例配置YAML
stream:
- listen: ":1883"
protocol: "tcp"
upstream:
targets:
- addr: "mqtt1:1883"
weight: 3
- addr: "mqtt2:1883"
weight: 1
load_balance: "round_robin"
```
当前 Lolly 的 Stream 实现位于 `internal/stream/stream.go`,提供:
- TCP/UDP 代理
- 负载均衡轮询、加权轮询、最少连接、IP 哈希)
- 健康检查
- 会话管理UDP
### 6.4 在 Lolly 中实现 MQTT 支持的方案
#### 方案 1独立 MQTT Preread 中间件
在 Lolly 的 Stream 模块中添加 MQTT Preread 功能:
```go
// internal/stream/mqtt_preread.go
package stream
import (
"bufio"
"encoding/binary"
"io"
"net"
)
// MQTTPrereadConfig MQTT Preread 配置
type MQTTPrereadConfig struct {
Enabled bool
OnClientID func(clientID string) string // 路由回调
OnUsername func(username string) string // 认证回调
}
// MQTTConnectInfo 解析后的 MQTT CONNECT 信息
type MQTTConnectInfo struct {
ClientID string
Username string
Password []byte
Protocol byte
}
// ParseMQTTConnect 从连接读取并解析 MQTT CONNECT 消息
func ParseMQTTConnect(conn net.Conn) (*MQTTConnectInfo, error) {
// 1. 读取固定头2-5 字节)
// 2. 读取剩余长度
// 3. 读取可变头(协议名、协议级别、连接标志)
// 4. 读取 PayloadClient ID、Will Topic、Will Message、Username、Password
// 5. 返回解析结果
}
```
#### 方案 2基于配置的路由
在 Lolly 配置中添加 MQTT 路由规则:
```yaml
stream:
- listen: ":1883"
protocol: "tcp"
mqtt_preread: true
routes:
- match: "clientid =~ ^sensor-"
upstream: sensors
- match: "clientid =~ ^home-"
upstream: home_devices
- match: "username == admin"
upstream: admin_cluster
upstreams:
sensors:
targets:
- addr: "mqtt-sensors-1:1883"
- addr: "mqtt-sensors-2:1883"
home_devices:
targets:
- addr: "mqtt-home-1:1883"
admin_cluster:
targets:
- addr: "mqtt-admin-1:1883"
```
#### 方案 3与现有 Stream 模块集成
扩展现有的 `internal/stream/stream.go`
```go
// Target MQTT 扩展
type Target struct {
addr string
weight int
healthy atomic.Bool
conns int64
// MQTT 特定字段
mqttMatcher func(*MQTTConnectInfo) bool // 匹配函数
}
// Upstream 添加 MQTT 选择支持
type Upstream struct {
name string
targets []*Target
balancer Balancer
// MQTT 路由表
mqttRoutes map[string][]*Target // 标签 -> 目标列表
}
// SelectByMQTT 基于 MQTT CONNECT 信息选择目标
func (u *Upstream) SelectByMQTT(info *MQTTConnectInfo) *Target {
// 1. 检查 MQTT 路由表
// 2. 回退到默认负载均衡
}
```
### 6.5 实现建议
#### 短期建议PoC 验证)
1. **实现基础 MQTT CONNECT 解析器**
- 支持 MQTT 3.1.1 和 5.0
- 提取 Client ID、Username、Password
- 位置:`internal/stream/mqtt.go`
2. **添加基于 Client ID 的路由**
- 简单的正则匹配
- 配置文件支持
- 与现有 upstream 集成
3. **性能测试**
- 对比有无 preread 的性能差异
- 内存占用分析
#### 中期建议(功能完善)
1. **完整 MQTT Filter 支持**
- 支持修改 CONNECT 字段
- 支持消息重写
- 配置热重载
2. **监控与日志**
- MQTT 特定指标(连接数、消息数)
- 结构化日志输出
3. **安全增强**
- 基于 Client ID 的访问控制
- 速率限制
#### 长期建议(企业级功能)
1. **MQTT 5.0 完整支持**
- 用户属性处理
- 共享订阅支持
- 消息过期处理
2. **高级路由**
- 基于 Topic 的路由(需要解析 PUBLISH/SUBSCRIBE
- 动态后端发现
3. **与 HTTP 层的集成**
- 统一的配置管理
- 共享的健康检查
### 6.6 代码示例
以下是在 Lolly 中实现 MQTT Preread 的参考代码:
```go
// internal/stream/mqtt.go
package stream
import (
"bufio"
"encoding/binary"
"fmt"
"io"
"net"
)
const (
// MQTT 控制包类型
mqttCONNECT = 1
// MQTT 协议名
mqttProtocol311 = "MQTT" // v3.1.1
mqttProtocol31 = "MQIsdp" // v3.1
)
// MQTTPrereadHandler MQTT Preread 处理器
type MQTTPrereadHandler struct {
config *MQTTPrereadConfig
}
// NewMQTTPrereadHandler 创建处理器
func NewMQTTPrereadHandler(config *MQTTPrereadConfig) *MQTTPrereadHandler {
return &MQTTPrereadHandler{config: config}
}
// Handle 处理连接,解析 MQTT CONNECT 并返回信息
func (h *MQTTPrereadHandler) Handle(conn net.Conn) (*MQTTConnectInfo, net.Conn, error) {
// 使用 bufio 预读数据
reader := bufio.NewReaderSize(conn, 4096)
// 读取固定头第一个字节
firstByte, err := reader.Peek(1)
if err != nil {
return nil, nil, err
}
// 检查是否为 CONNECT 包
packetType := (firstByte[0] >> 4) & 0x0F
if packetType != mqttCONNECT {
return nil, nil, fmt.Errorf("expected CONNECT packet, got %d", packetType)
}
// 读取剩余长度(可变长度编码)
remainingLen, headerLen, err := decodeRemainingLength(reader)
if err != nil {
return nil, nil, err
}
// 读取完整的 CONNECT 包
totalLen := 1 + headerLen + remainingLen
packet, err := reader.Peek(totalLen)
if err != nil {
return nil, nil, err
}
// 解析 CONNECT 包
info, err := parseConnectPacket(packet[1+headerLen:])
if err != nil {
return nil, nil, err
}
// 创建包装连接,将预读的数据返回给后续处理
wrappedConn := &mqttConn{
Conn: conn,
reader: reader,
peeked: totalLen,
}
return info, wrappedConn, nil
}
// decodeRemainingLength 解码 MQTT 剩余长度字段
func decodeRemainingLength(r *bufio.Reader) (int, int, error) {
var value int
var multiplier int = 1
var headerLen int
for {
b, err := r.ReadByte()
if err != nil {
return 0, 0, err
}
headerLen++
value += int(b&0x7F) * multiplier
multiplier *= 128
if (b & 0x80) == 0 {
break
}
if multiplier > 128*128*128 {
return 0, 0, fmt.Errorf("malformed remaining length")
}
}
return value, headerLen, nil
}
// parseConnectPacket 解析 CONNECT 包体
func parseConnectPacket(data []byte) (*MQTTConnectInfo, error) {
info := &MQTTConnectInfo{}
offset := 0
// 读取协议名长度
protoLen := binary.BigEndian.Uint16(data[offset:])
offset += 2
// 读取协议名
protoName := string(data[offset : offset+int(protoLen)])
offset += int(protoLen)
// 判断协议版本
if protoName == mqttProtocol311 {
info.Protocol = 4 // MQTT 3.1.1
} else if protoName == mqttProtocol31 {
info.Protocol = 3 // MQTT 3.1
}
// 读取协议级别
protocolLevel := data[offset]
offset++
_ = protocolLevel
// 读取连接标志
connectFlags := data[offset]
offset++
usernameFlag := (connectFlags >> 7) & 1
passwordFlag := (connectFlags >> 6) & 1
// willRetain := (connectFlags >> 5) & 1
willQoS := (connectFlags >> 3) & 3
willFlag := (connectFlags >> 2) & 1
// cleanSession := (connectFlags >> 1) & 1
// 读取保持连接时间(跳过)
offset += 2
// 读取 Client ID
clientIDLen := binary.BigEndian.Uint16(data[offset:])
offset += 2
info.ClientID = string(data[offset : offset+int(clientIDLen)])
offset += int(clientIDLen)
// 读取 Will Topic 和 Will Message如果有
if willFlag == 1 {
// Will Topic
willTopicLen := binary.BigEndian.Uint16(data[offset:])
offset += 2
offset += int(willTopicLen)
// Will Message
willMsgLen := binary.BigEndian.Uint16(data[offset:])
offset += 2
offset += int(willMsgLen)
}
// 读取 Username如果有
if usernameFlag == 1 {
usernameLen := binary.BigEndian.Uint16(data[offset:])
offset += 2
info.Username = string(data[offset : offset+int(usernameLen)])
offset += int(usernameLen)
}
// 读取 Password如果有
if passwordFlag == 1 {
passwordLen := binary.BigEndian.Uint16(data[offset:])
offset += 2
info.Password = data[offset : offset+int(passwordLen)]
}
return info, nil
}
// mqttConn 包装连接,支持预读数据回退
type mqttConn struct {
net.Conn
reader *bufio.Reader
peeked int
}
func (c *mqttConn) Read(p []byte) (n int, err error) {
return c.reader.Read(p)
}
```
### 6.7 测试验证
添加 MQTT Preread 的单元测试:
```go
// internal/stream/mqtt_test.go
package stream
import (
"bytes"
"testing"
)
func TestParseConnectPacket(t *testing.T) {
// 构造一个 MQTT 3.1.1 CONNECT 包
// CONNECT + 剩余长度 + 协议名长度(4) + "MQTT" + 协议级别(4) + 连接标志 + 保持连接 + Client ID
packet := []byte{
// 可变头开始
0x00, 0x04, // 协议名长度 = 4
'M', 'Q', 'T', 'T', // 协议名 "MQTT"
0x04, // 协议级别 4 (3.1.1)
0xC2, // 连接标志: 用户名(1) + 密码(1) + Clean Session(0)
0x00, 0x3C, // 保持连接 = 60 秒
// Payload
0x00, 0x0A, // Client ID 长度 = 10
't', 'e', 's', 't', '-', 'c', 'l', 'i', 'e', 'n', 't', // Client ID
0x00, 0x05, // 用户名长度 = 5
'a', 'd', 'm', 'i', 'n', // 用户名
0x00, 0x08, // 密码长度 = 8
'p', 'a', 's', 's', 'w', 'o', 'r', 'd', // 密码
}
info, err := parseConnectPacket(packet)
if err != nil {
t.Fatalf("parseConnectPacket failed: %v", err)
}
if info.ClientID != "test-client" {
t.Errorf("ClientID = %q, want %q", info.ClientID, "test-client")
}
if info.Username != "admin" {
t.Errorf("Username = %q, want %q", info.Username, "admin")
}
if info.Protocol != 4 {
t.Errorf("Protocol = %d, want %d", info.Protocol, 4)
}
}
func TestDecodeRemainingLength(t *testing.T) {
tests := []struct {
input []byte
expected int
}{
{[]byte{0x00}, 0},
{[]byte{0x7F}, 127},
{[]byte{0x80, 0x01}, 128},
{[]byte{0xFF, 0x7F}, 16383},
{[]byte{0x80, 0x80, 0x01}, 16384},
}
for _, tt := range tests {
r := bufio.NewReader(bytes.NewReader(tt.input))
value, _, err := decodeRemainingLength(r)
if err != nil {
t.Errorf("decodeRemainingLength(%v) error: %v", tt.input, err)
continue
}
if value != tt.expected {
t.Errorf("decodeRemainingLength(%v) = %d, want %d", tt.input, value, tt.expected)
}
}
}
```
---
## 7. 总结
### 7.1 NGINX MQTT 模块要点
1. **Preread 模块** (`ngx_stream_mqtt_preread_module`)
- 只读解析 MQTT CONNECT 消息
- 提取 Client ID 和 Username 用于路由
- 适用于基于内容的负载均衡
2. **Filter 模块** (`ngx_stream_mqtt_filter_module`)
- 支持修改 MQTT CONNECT 字段
- 可用于代理认证和会话管理
- 与 Preread 模块可以配合使用
3. **商业订阅限制**
- 两个模块都需要 NGINX Plus 商业订阅
- 自 1.23.4 版本起可用
### 7.2 适用场景
- **物联网平台** - 海量设备接入和路由
- **多租户 MQTT** - 基于用户名隔离租户
- **边缘网关** - 设备接入层代理
- **MQTT 迁移** - 平滑迁移到新 Broker
### 7.3 Lolly 实现建议优先级
1. **P0** - 基础 MQTT CONNECT 解析器
2. **P1** - 基于 Client ID 的路由
3. **P2** - MQTT Filter修改 CONNECT
4. **P3** - MQTT 5.0 高级特性
### 7.4 参考资料
- [NGINX MQTT Preread Module](https://nginx.org/en/docs/stream/ngx_stream_mqtt_preread_module.html)
- [NGINX MQTT Filter Module](https://nginx.org/en/docs/stream/ngx_stream_mqtt_filter_module.html)
- [MQTT 3.1.1 Specification](http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html)
- [MQTT 5.0 Specification](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html)