将 docs/ 根目录下的 nginx 相关文档统一移动到 docs/nginx/ 子目录, 提高文档组织性和可维护性。 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
26 KiB
NGINX MQTT 模块指南
1. MQTT Preread 模块概述
1.1 模块介绍
ngx_stream_mqtt_preread_module 模块用于在 preread 阶段从 MQTT CONNECT 消息中提取客户端信息,而无需终止 MQTT 连接。
1.2 版本要求
- 自 1.23.4 版本起可用
- 属于 NGINX Plus 商业订阅功能
- 支持 MQTT 协议版本 3.1.1 和 5.0
1.3 核心用途
MQTT Preread 模块主要用于以下场景:
- 基于 Client ID 的路由 - 根据客户端 ID 将连接路由到不同的 MQTT Broker
- 基于用户名的路由 - 根据用户名进行负载均衡或访问控制
- 透明代理 - 在不解密 MQTT 连接的情况下获取连接元数据
- 连接分析 - 记录和分析 MQTT 客户端连接信息
1.4 工作原理
客户端 MQTT 连接
|
v
[NGINX Stream 模块]
|
v
[Preread 阶段] <--- mqtt_preread 在此阶段读取 CONNECT 消息
|
v
[变量提取] <--- $mqtt_preread_clientid, $mqtt_preread_username
|
v
[路由决策] <--- 基于变量进行 upstream 选择
|
v
[代理到后端 MQTT Broker]
在 preread 阶段,NGINX 读取 MQTT CONNECT 消息的前几个字节(不超过 16KB),解析其中的 Client ID 和 Username,然后基于这些信息进行路由决策。
2. MQTT Filter 模块概述
2.1 模块介绍
ngx_stream_mqtt_filter_module 模块提供完整的 MQTT 协议支持,允许修改 CONNECT 消息中的字段。
2.2 版本要求
- 自 1.23.4 版本起可用
- 属于 NGINX Plus 商业订阅功能
- 支持 MQTT 协议版本 3.1.1 和 5.0
2.3 核心用途
MQTT Filter 模块主要用于以下场景:
- 修改 Client ID - 为客户端分配统一的 Client ID 格式
- 修改用户名/密码 - 进行身份认证信息的转换或注入
- 代理认证 - 在代理层添加统一的认证信息
- 会话管理 - 控制客户端会话标识
2.4 与 Preread 模块的区别
| 特性 | Preread 模块 | Filter 模块 |
|---|---|---|
| 主要功能 | 读取 CONNECT 信息 | 修改 CONNECT 字段 |
| 处理阶段 | Preread 阶段 | 代理阶段 |
| 是否修改数据 | 否(只读) | 是(读写) |
| 使用场景 | 路由、分析 | 认证、转换 |
| 性能开销 | 极低 | 较低 |
3. 指令详解
3.1 MQTT Preread 模块指令
mqtt_preread
启用或禁用从 MQTT CONNECT 消息中提取信息。
语法:
mqtt_preread on | off;
默认值:off
上下文:stream, server
说明:
- 在 preread 阶段启用 MQTT CONNECT 消息解析
- 启用后,可以通过变量
$mqtt_preread_clientid和$mqtt_preread_username访问提取的信息 - 仅解析 CONNECT 消息,不修改数据流
配置示例:
stream {
server {
listen 1883;
# 启用 MQTT preread
mqtt_preread on;
# 基于 clientid 路由
proxy_pass $mqtt_backend;
}
}
3.2 MQTT Filter 模块指令
mqtt
为给定虚拟服务器启用 MQTT 协议支持。
语法:
mqtt on | off;
默认值:off
上下文:stream, server
说明:
- 启用后可以使用其他 MQTT 相关指令
- 必须在
mqtt_set_connect之前启用
配置示例:
stream {
server {
listen 1883;
# 启用 MQTT 支持
mqtt on;
proxy_pass backend;
}
}
mqtt_buffers
设置单个连接处理 MQTT 消息的缓冲区数量和大小。
语法:
mqtt_buffers number size;
默认值:100 1k;
上下文:stream, server
版本要求:1.25.1+
说明:
- 控制用于处理 MQTT 消息的缓冲区配置
- 较大的缓冲区可以处理更大的 MQTT 消息
- 根据预期的消息大小和并发连接数调整
配置示例:
stream {
server {
listen 1883;
mqtt on;
# 设置缓冲区:50 个缓冲区,每个 4KB
mqtt_buffers 50 4k;
proxy_pass backend;
}
}
mqtt_rewrite_buffer_size
设置用于写入修改后消息的缓冲区大小。
语法:
mqtt_rewrite_buffer_size size;
默认值:4k 或 8k(取决于平台内存页大小)
上下文:server
版本要求:1.25.1+
废弃状态:已废弃,建议使用 mqtt_buffers
说明:
- 该指令在 1.25.1 版本中已被废弃
- 请使用
mqtt_buffers替代
mqtt_set_connect
设置 CONNECT 消息的字段为给定值。
语法:
mqtt_set_connect field value;
默认值:无
上下文:server
说明:
- 支持修改的字段:
clientid,username,password - 值可以包含文本、变量及其组合
- 可以在同一级别指定多个指令
可用字段:
| 字段 | 说明 |
|---|---|
clientid |
MQTT 客户端标识符 |
username |
连接用户名 |
password |
连接密码 |
配置示例:
stream {
server {
listen 18883;
proxy_pass backend;
proxy_buffer_size 16k;
mqtt on;
# 设置 Client ID
mqtt_set_connect clientid "$client";
# 设置用户名
mqtt_set_connect username "$name";
# 设置密码(从变量获取)
mqtt_set_connect password "$mqtt_password";
}
}
4. 嵌入变量
4.1 MQTT Preread 变量
| 变量 | 说明 |
|---|---|
$mqtt_preread_clientid |
CONNECT 消息中的 Client ID 值 |
$mqtt_preread_username |
CONNECT 消息中的 Username 值 |
变量使用示例:
stream {
# 使用 map 基于 clientid 路由
map $mqtt_preread_clientid $backend_pool {
~^device-1 backend_1;
~^device-2 backend_2;
~^sensor-.* sensors_backend;
default default_backend;
}
server {
listen 1883;
mqtt_preread on;
proxy_pass $backend_pool;
}
}
5. 配置示例
5.1 基础 MQTT 代理
简单的 MQTT 代理配置,将所有连接转发到后端 Broker:
stream {
upstream mqtt_backend {
server 192.168.1.10:1883;
server 192.168.1.11:1883 backup;
}
server {
listen 1883;
proxy_pass mqtt_backend;
proxy_timeout 30s;
proxy_connect_timeout 5s;
}
}
5.2 基于 Client ID 的路由
根据 MQTT Client ID 将连接路由到不同的后端:
stream {
# 设备组 1:工业传感器
upstream sensors_backend {
server 10.0.1.10:1883 weight=5;
server 10.0.1.11:1883;
}
# 设备组 2:智能家居设备
upstream home_backend {
server 10.0.2.10:1883;
server 10.0.2.11:1883;
}
# 设备组 3:车联网
upstream vehicle_backend {
server 10.0.3.10:1883;
server 10.0.3.11:1883;
}
# 默认后端
upstream default_backend {
server 10.0.0.10:1883;
}
# 基于 Client ID 的路由映射
map $mqtt_preread_clientid $target_backend {
# 传感器设备(匹配 sensor- 开头的 Client ID)
~^sensor- sensors_backend;
# 智能家居设备(匹配 home- 开头的 Client ID)
~^home- home_backend;
# 车载设备(匹配 vehicle- 或 car- 开头的 Client ID)
~^vehicle- vehicle_backend;
~^car- vehicle_backend;
# 默认后端
default default_backend;
}
server {
listen 1883;
# 启用 MQTT preread
mqtt_preread on;
# 基于 Client ID 路由
proxy_pass $target_backend;
# 连接超时配置
proxy_timeout 300s;
proxy_connect_timeout 10s;
# 启用 TCP keepalive
proxy_socket_keepalive on;
}
}
5.3 基于用户名的负载均衡
根据用户名进行路由,适用于多租户场景:
stream {
# 租户 A 集群
upstream tenant_a_backend {
server 10.0.10.10:1883;
server 10.0.10.11:1883;
}
# 租户 B 集群
upstream tenant_b_backend {
server 10.0.20.10:1883;
server 10.0.20.11:1883;
}
# 管理员集群
upstream admin_backend {
server 10.0.0.10:1883;
}
# 基于用户名路由
map $mqtt_preread_username $tenant_backend {
"tenant-a-user" tenant_a_backend;
"tenant-a-admin" tenant_a_backend;
"tenant-b-user" tenant_b_backend;
"tenant-b-admin" tenant_b_backend;
~^admin-.* admin_backend;
default tenant_a_backend;
}
server {
listen 1883;
mqtt_preread on;
proxy_pass $tenant_backend;
proxy_timeout 60s;
}
}
5.4 修改 CONNECT 消息(Filter 模块)
在代理层修改 MQTT CONNECT 消息中的字段:
stream {
upstream mqtt_backend {
server 192.168.1.10:1883;
}
server {
listen 1883;
proxy_pass mqtt_backend;
proxy_buffer_size 16k;
# 启用 MQTT Filter
mqtt on;
# 设置固定的 Client ID 前缀(追加原始 ID)
mqtt_set_connect clientid "ng-$mqtt_preread_clientid";
# 注入代理认证用户名
mqtt_set_connect username "nginx-proxy";
# 设置代理密码(从文件或环境变量获取)
mqtt_set_connect password "$proxy_mqtt_password";
}
}
5.5 综合配置示例
结合 Preread 和 Filter 模块的完整配置:
stream {
# 日志格式
log_format mqtt_log '$remote_addr [$time_local] '
'clientid:$mqtt_preread_clientid '
'username:$mqtt_preread_username '
'upstream:$upstream_addr '
'status:$status';
# 设备专用集群
upstream device_cluster_a {
zone devices 64k;
server 10.0.1.10:1883 weight=5;
server 10.0.1.11:1883;
server 10.0.1.12:1883 backup;
}
upstream device_cluster_b {
zone devices 64k;
server 10.0.2.10:1883;
server 10.0.2.11:1883;
}
# 普通设备集群
upstream default_devices {
zone devices 64k;
least_conn;
server 10.0.3.10:1883;
server 10.0.3.11:1883;
server 10.0.3.12:1883;
}
# Client ID 到后端映射
map $mqtt_preread_clientid $backend_pool {
~^dev-a- device_cluster_a;
~^device-a- device_cluster_a;
~^dev-b- device_cluster_b;
~^device-b- device_cluster_b;
default default_devices;
}
# 服务器配置
server {
listen 1883;
access_log /var/log/nginx/mqtt-access.log mqtt_log;
# 启用 MQTT Preread 获取 Client ID 和用户名
mqtt_preread on;
# 启用 MQTT Filter(可选,需要修改 CONNECT 时启用)
mqtt on;
mqtt_buffers 50 4k;
# 可选:修改 CONNECT 消息
# mqtt_set_connect clientid "proxy-$mqtt_preread_clientid";
# 基于 Client ID 路由
proxy_pass $backend_pool;
# 超时配置
proxy_timeout 300s;
proxy_connect_timeout 10s;
# 启用 TCP keepalive
proxy_socket_keepalive on;
# 连接限制
limit_conn mqtt_conn 100;
}
# TLS MQTT 端口
server {
listen 8883 ssl;
ssl_certificate /etc/nginx/ssl/mqtt.crt;
ssl_certificate_key /etc/nginx/ssl/mqtt.key;
ssl_protocols TLSv1.2 TLSv1.3;
mqtt_preread on;
proxy_pass $backend_pool;
proxy_timeout 300s;
}
}
# 连接限制共享内存
limit_conn_zone $binary_remote_addr zone=mqtt_conn:10m;
5.6 与 SSL/TLS 结合
MQTT over TLS 的配置:
stream {
upstream mqtt_ssl_backend {
server 192.168.1.10:8883;
server 192.168.1.11:8883;
}
# 终止 TLS 并读取 MQTT 信息
server {
listen 8883 ssl;
ssl_certificate /etc/nginx/ssl/server.crt;
ssl_certificate_key /etc/nginx/ssl/server.key;
ssl_protocols TLSv1.2 TLSv1.3;
# 在解密后读取 MQTT 信息
mqtt_preread on;
mqtt on;
proxy_pass mqtt_ssl_backend;
# 上游也使用 SSL
proxy_ssl on;
proxy_ssl_protocols TLSv1.2 TLSv1.3;
}
}
6. 与 Lolly 项目的关系
6.1 Lolly 项目简介
Lolly 是一个使用 Go 语言编写的高性能 HTTP 服务器与反向代理,基于 fasthttp 构建。它提供了 HTTP/3、WebSocket、TCP/UDP Stream 代理等功能。
6.2 功能对比
| 特性 | NGINX Plus (MQTT) | Lolly |
|---|---|---|
| MQTT Preread | 支持(商业版) | 未实现 |
| MQTT Filter | 支持(商业版) | 未实现 |
| TCP Stream 代理 | 支持 | 支持 |
| 基于内容路由 | 支持 | 有限支持 |
| SSL/TLS 终端 | 支持 | 支持 |
| 负载均衡 | 丰富算法 | 轮询/加权/最少连接/IP哈希 |
6.3 Lolly 中的 Stream 代理
Lolly 目前支持基础的 TCP/UDP Stream 代理:
// Lolly Stream 代理示例配置(YAML)
stream:
- listen: ":1883"
protocol: "tcp"
upstream:
targets:
- addr: "mqtt1:1883"
weight: 3
- addr: "mqtt2:1883"
weight: 1
load_balance: "round_robin"
当前 Lolly 的 Stream 实现位于 internal/stream/stream.go,提供:
- TCP/UDP 代理
- 负载均衡(轮询、加权轮询、最少连接、IP 哈希)
- 健康检查
- 会话管理(UDP)
6.4 在 Lolly 中实现 MQTT 支持的方案
方案 1:独立 MQTT Preread 中间件
在 Lolly 的 Stream 模块中添加 MQTT Preread 功能:
// internal/stream/mqtt_preread.go
package stream
import (
"bufio"
"encoding/binary"
"io"
"net"
)
// MQTTPrereadConfig MQTT Preread 配置
type MQTTPrereadConfig struct {
Enabled bool
OnClientID func(clientID string) string // 路由回调
OnUsername func(username string) string // 认证回调
}
// MQTTConnectInfo 解析后的 MQTT CONNECT 信息
type MQTTConnectInfo struct {
ClientID string
Username string
Password []byte
Protocol byte
}
// ParseMQTTConnect 从连接读取并解析 MQTT CONNECT 消息
func ParseMQTTConnect(conn net.Conn) (*MQTTConnectInfo, error) {
// 1. 读取固定头(2-5 字节)
// 2. 读取剩余长度
// 3. 读取可变头(协议名、协议级别、连接标志)
// 4. 读取 Payload(Client ID、Will Topic、Will Message、Username、Password)
// 5. 返回解析结果
}
方案 2:基于配置的路由
在 Lolly 配置中添加 MQTT 路由规则:
stream:
- listen: ":1883"
protocol: "tcp"
mqtt_preread: true
routes:
- match: "clientid =~ ^sensor-"
upstream: sensors
- match: "clientid =~ ^home-"
upstream: home_devices
- match: "username == admin"
upstream: admin_cluster
upstreams:
sensors:
targets:
- addr: "mqtt-sensors-1:1883"
- addr: "mqtt-sensors-2:1883"
home_devices:
targets:
- addr: "mqtt-home-1:1883"
admin_cluster:
targets:
- addr: "mqtt-admin-1:1883"
方案 3:与现有 Stream 模块集成
扩展现有的 internal/stream/stream.go:
// Target MQTT 扩展
type Target struct {
addr string
weight int
healthy atomic.Bool
conns int64
// MQTT 特定字段
mqttMatcher func(*MQTTConnectInfo) bool // 匹配函数
}
// Upstream 添加 MQTT 选择支持
type Upstream struct {
name string
targets []*Target
balancer Balancer
// MQTT 路由表
mqttRoutes map[string][]*Target // 标签 -> 目标列表
}
// SelectByMQTT 基于 MQTT CONNECT 信息选择目标
func (u *Upstream) SelectByMQTT(info *MQTTConnectInfo) *Target {
// 1. 检查 MQTT 路由表
// 2. 回退到默认负载均衡
}
6.5 实现建议
短期建议(PoC 验证)
-
实现基础 MQTT CONNECT 解析器
- 支持 MQTT 3.1.1 和 5.0
- 提取 Client ID、Username、Password
- 位置:
internal/stream/mqtt.go
-
添加基于 Client ID 的路由
- 简单的正则匹配
- 配置文件支持
- 与现有 upstream 集成
-
性能测试
- 对比有无 preread 的性能差异
- 内存占用分析
中期建议(功能完善)
-
完整 MQTT Filter 支持
- 支持修改 CONNECT 字段
- 支持消息重写
- 配置热重载
-
监控与日志
- MQTT 特定指标(连接数、消息数)
- 结构化日志输出
-
安全增强
- 基于 Client ID 的访问控制
- 速率限制
长期建议(企业级功能)
-
MQTT 5.0 完整支持
- 用户属性处理
- 共享订阅支持
- 消息过期处理
-
高级路由
- 基于 Topic 的路由(需要解析 PUBLISH/SUBSCRIBE)
- 动态后端发现
-
与 HTTP 层的集成
- 统一的配置管理
- 共享的健康检查
6.6 代码示例
以下是在 Lolly 中实现 MQTT Preread 的参考代码:
// internal/stream/mqtt.go
package stream
import (
"bufio"
"encoding/binary"
"fmt"
"io"
"net"
)
const (
// MQTT 控制包类型
mqttCONNECT = 1
// MQTT 协议名
mqttProtocol311 = "MQTT" // v3.1.1
mqttProtocol31 = "MQIsdp" // v3.1
)
// MQTTPrereadHandler MQTT Preread 处理器
type MQTTPrereadHandler struct {
config *MQTTPrereadConfig
}
// NewMQTTPrereadHandler 创建处理器
func NewMQTTPrereadHandler(config *MQTTPrereadConfig) *MQTTPrereadHandler {
return &MQTTPrereadHandler{config: config}
}
// Handle 处理连接,解析 MQTT CONNECT 并返回信息
func (h *MQTTPrereadHandler) Handle(conn net.Conn) (*MQTTConnectInfo, net.Conn, error) {
// 使用 bufio 预读数据
reader := bufio.NewReaderSize(conn, 4096)
// 读取固定头第一个字节
firstByte, err := reader.Peek(1)
if err != nil {
return nil, nil, err
}
// 检查是否为 CONNECT 包
packetType := (firstByte[0] >> 4) & 0x0F
if packetType != mqttCONNECT {
return nil, nil, fmt.Errorf("expected CONNECT packet, got %d", packetType)
}
// 读取剩余长度(可变长度编码)
remainingLen, headerLen, err := decodeRemainingLength(reader)
if err != nil {
return nil, nil, err
}
// 读取完整的 CONNECT 包
totalLen := 1 + headerLen + remainingLen
packet, err := reader.Peek(totalLen)
if err != nil {
return nil, nil, err
}
// 解析 CONNECT 包
info, err := parseConnectPacket(packet[1+headerLen:])
if err != nil {
return nil, nil, err
}
// 创建包装连接,将预读的数据返回给后续处理
wrappedConn := &mqttConn{
Conn: conn,
reader: reader,
peeked: totalLen,
}
return info, wrappedConn, nil
}
// decodeRemainingLength 解码 MQTT 剩余长度字段
func decodeRemainingLength(r *bufio.Reader) (int, int, error) {
var value int
var multiplier int = 1
var headerLen int
for {
b, err := r.ReadByte()
if err != nil {
return 0, 0, err
}
headerLen++
value += int(b&0x7F) * multiplier
multiplier *= 128
if (b & 0x80) == 0 {
break
}
if multiplier > 128*128*128 {
return 0, 0, fmt.Errorf("malformed remaining length")
}
}
return value, headerLen, nil
}
// parseConnectPacket 解析 CONNECT 包体
func parseConnectPacket(data []byte) (*MQTTConnectInfo, error) {
info := &MQTTConnectInfo{}
offset := 0
// 读取协议名长度
protoLen := binary.BigEndian.Uint16(data[offset:])
offset += 2
// 读取协议名
protoName := string(data[offset : offset+int(protoLen)])
offset += int(protoLen)
// 判断协议版本
if protoName == mqttProtocol311 {
info.Protocol = 4 // MQTT 3.1.1
} else if protoName == mqttProtocol31 {
info.Protocol = 3 // MQTT 3.1
}
// 读取协议级别
protocolLevel := data[offset]
offset++
_ = protocolLevel
// 读取连接标志
connectFlags := data[offset]
offset++
usernameFlag := (connectFlags >> 7) & 1
passwordFlag := (connectFlags >> 6) & 1
// willRetain := (connectFlags >> 5) & 1
willQoS := (connectFlags >> 3) & 3
willFlag := (connectFlags >> 2) & 1
// cleanSession := (connectFlags >> 1) & 1
// 读取保持连接时间(跳过)
offset += 2
// 读取 Client ID
clientIDLen := binary.BigEndian.Uint16(data[offset:])
offset += 2
info.ClientID = string(data[offset : offset+int(clientIDLen)])
offset += int(clientIDLen)
// 读取 Will Topic 和 Will Message(如果有)
if willFlag == 1 {
// Will Topic
willTopicLen := binary.BigEndian.Uint16(data[offset:])
offset += 2
offset += int(willTopicLen)
// Will Message
willMsgLen := binary.BigEndian.Uint16(data[offset:])
offset += 2
offset += int(willMsgLen)
}
// 读取 Username(如果有)
if usernameFlag == 1 {
usernameLen := binary.BigEndian.Uint16(data[offset:])
offset += 2
info.Username = string(data[offset : offset+int(usernameLen)])
offset += int(usernameLen)
}
// 读取 Password(如果有)
if passwordFlag == 1 {
passwordLen := binary.BigEndian.Uint16(data[offset:])
offset += 2
info.Password = data[offset : offset+int(passwordLen)]
}
return info, nil
}
// mqttConn 包装连接,支持预读数据回退
type mqttConn struct {
net.Conn
reader *bufio.Reader
peeked int
}
func (c *mqttConn) Read(p []byte) (n int, err error) {
return c.reader.Read(p)
}
6.7 测试验证
添加 MQTT Preread 的单元测试:
// internal/stream/mqtt_test.go
package stream
import (
"bytes"
"testing"
)
func TestParseConnectPacket(t *testing.T) {
// 构造一个 MQTT 3.1.1 CONNECT 包
// CONNECT + 剩余长度 + 协议名长度(4) + "MQTT" + 协议级别(4) + 连接标志 + 保持连接 + Client ID
packet := []byte{
// 可变头开始
0x00, 0x04, // 协议名长度 = 4
'M', 'Q', 'T', 'T', // 协议名 "MQTT"
0x04, // 协议级别 4 (3.1.1)
0xC2, // 连接标志: 用户名(1) + 密码(1) + Clean Session(0)
0x00, 0x3C, // 保持连接 = 60 秒
// Payload
0x00, 0x0A, // Client ID 长度 = 10
't', 'e', 's', 't', '-', 'c', 'l', 'i', 'e', 'n', 't', // Client ID
0x00, 0x05, // 用户名长度 = 5
'a', 'd', 'm', 'i', 'n', // 用户名
0x00, 0x08, // 密码长度 = 8
'p', 'a', 's', 's', 'w', 'o', 'r', 'd', // 密码
}
info, err := parseConnectPacket(packet)
if err != nil {
t.Fatalf("parseConnectPacket failed: %v", err)
}
if info.ClientID != "test-client" {
t.Errorf("ClientID = %q, want %q", info.ClientID, "test-client")
}
if info.Username != "admin" {
t.Errorf("Username = %q, want %q", info.Username, "admin")
}
if info.Protocol != 4 {
t.Errorf("Protocol = %d, want %d", info.Protocol, 4)
}
}
func TestDecodeRemainingLength(t *testing.T) {
tests := []struct {
input []byte
expected int
}{
{[]byte{0x00}, 0},
{[]byte{0x7F}, 127},
{[]byte{0x80, 0x01}, 128},
{[]byte{0xFF, 0x7F}, 16383},
{[]byte{0x80, 0x80, 0x01}, 16384},
}
for _, tt := range tests {
r := bufio.NewReader(bytes.NewReader(tt.input))
value, _, err := decodeRemainingLength(r)
if err != nil {
t.Errorf("decodeRemainingLength(%v) error: %v", tt.input, err)
continue
}
if value != tt.expected {
t.Errorf("decodeRemainingLength(%v) = %d, want %d", tt.input, value, tt.expected)
}
}
}
7. 总结
7.1 NGINX MQTT 模块要点
-
Preread 模块 (
ngx_stream_mqtt_preread_module)- 只读解析 MQTT CONNECT 消息
- 提取 Client ID 和 Username 用于路由
- 适用于基于内容的负载均衡
-
Filter 模块 (
ngx_stream_mqtt_filter_module)- 支持修改 MQTT CONNECT 字段
- 可用于代理认证和会话管理
- 与 Preread 模块可以配合使用
-
商业订阅限制
- 两个模块都需要 NGINX Plus 商业订阅
- 自 1.23.4 版本起可用
7.2 适用场景
- 物联网平台 - 海量设备接入和路由
- 多租户 MQTT - 基于用户名隔离租户
- 边缘网关 - 设备接入层代理
- MQTT 迁移 - 平滑迁移到新 Broker
7.3 Lolly 实现建议优先级
- P0 - 基础 MQTT CONNECT 解析器
- P1 - 基于 Client ID 的路由
- P2 - MQTT Filter(修改 CONNECT)
- P3 - MQTT 5.0 高级特性