lolly/docs/nginx/33-nginx-mqtt-module.md
xfy 972eab4267 refactor(docs): 重构文档目录结构,nginx 文档移至子目录
将 docs/ 根目录下的 nginx 相关文档统一移动到 docs/nginx/ 子目录,
提高文档组织性和可维护性。

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-16 10:48:14 +08:00

26 KiB
Raw Permalink Blame History

NGINX MQTT 模块指南

1. MQTT Preread 模块概述

1.1 模块介绍

ngx_stream_mqtt_preread_module 模块用于在 preread 阶段从 MQTT CONNECT 消息中提取客户端信息,而无需终止 MQTT 连接。

1.2 版本要求

  • 1.23.4 版本起可用
  • 属于 NGINX Plus 商业订阅功能
  • 支持 MQTT 协议版本 3.1.15.0

1.3 核心用途

MQTT Preread 模块主要用于以下场景:

  1. 基于 Client ID 的路由 - 根据客户端 ID 将连接路由到不同的 MQTT Broker
  2. 基于用户名的路由 - 根据用户名进行负载均衡或访问控制
  3. 透明代理 - 在不解密 MQTT 连接的情况下获取连接元数据
  4. 连接分析 - 记录和分析 MQTT 客户端连接信息

1.4 工作原理

客户端 MQTT 连接
       |
       v
[NGINX Stream 模块]
       |
       v
[Preread 阶段] <--- mqtt_preread 在此阶段读取 CONNECT 消息
       |
       v
[变量提取] <--- $mqtt_preread_clientid, $mqtt_preread_username
       |
       v
[路由决策] <--- 基于变量进行 upstream 选择
       |
       v
[代理到后端 MQTT Broker]

在 preread 阶段NGINX 读取 MQTT CONNECT 消息的前几个字节(不超过 16KB解析其中的 Client ID 和 Username然后基于这些信息进行路由决策。


2. MQTT Filter 模块概述

2.1 模块介绍

ngx_stream_mqtt_filter_module 模块提供完整的 MQTT 协议支持,允许修改 CONNECT 消息中的字段。

2.2 版本要求

  • 1.23.4 版本起可用
  • 属于 NGINX Plus 商业订阅功能
  • 支持 MQTT 协议版本 3.1.15.0

2.3 核心用途

MQTT Filter 模块主要用于以下场景:

  1. 修改 Client ID - 为客户端分配统一的 Client ID 格式
  2. 修改用户名/密码 - 进行身份认证信息的转换或注入
  3. 代理认证 - 在代理层添加统一的认证信息
  4. 会话管理 - 控制客户端会话标识

2.4 与 Preread 模块的区别

特性 Preread 模块 Filter 模块
主要功能 读取 CONNECT 信息 修改 CONNECT 字段
处理阶段 Preread 阶段 代理阶段
是否修改数据 否(只读) 是(读写)
使用场景 路由、分析 认证、转换
性能开销 极低 较低

3. 指令详解

3.1 MQTT Preread 模块指令

mqtt_preread

启用或禁用从 MQTT CONNECT 消息中提取信息。

语法

mqtt_preread on | off;

默认值off

上下文stream, server

说明

  • 在 preread 阶段启用 MQTT CONNECT 消息解析
  • 启用后,可以通过变量 $mqtt_preread_clientid$mqtt_preread_username 访问提取的信息
  • 仅解析 CONNECT 消息,不修改数据流

配置示例

stream {
    server {
        listen 1883;
        
        # 启用 MQTT preread
        mqtt_preread on;
        
        # 基于 clientid 路由
        proxy_pass $mqtt_backend;
    }
}

3.2 MQTT Filter 模块指令

mqtt

为给定虚拟服务器启用 MQTT 协议支持。

语法

mqtt on | off;

默认值off

上下文stream, server

说明

  • 启用后可以使用其他 MQTT 相关指令
  • 必须在 mqtt_set_connect 之前启用

配置示例

stream {
    server {
        listen 1883;
        
        # 启用 MQTT 支持
        mqtt on;
        
        proxy_pass backend;
    }
}

mqtt_buffers

设置单个连接处理 MQTT 消息的缓冲区数量和大小。

语法

mqtt_buffers number size;

默认值100 1k;

上下文stream, server

版本要求1.25.1+

说明

  • 控制用于处理 MQTT 消息的缓冲区配置
  • 较大的缓冲区可以处理更大的 MQTT 消息
  • 根据预期的消息大小和并发连接数调整

配置示例

stream {
    server {
        listen 1883;
        mqtt on;
        
        # 设置缓冲区50 个缓冲区,每个 4KB
        mqtt_buffers 50 4k;
        
        proxy_pass backend;
    }
}

mqtt_rewrite_buffer_size

设置用于写入修改后消息的缓冲区大小。

语法

mqtt_rewrite_buffer_size size;

默认值4k8k(取决于平台内存页大小)

上下文server

版本要求1.25.1+

废弃状态:已废弃,建议使用 mqtt_buffers

说明

  • 该指令在 1.25.1 版本中已被废弃
  • 请使用 mqtt_buffers 替代

mqtt_set_connect

设置 CONNECT 消息的字段为给定值。

语法

mqtt_set_connect field value;

默认值:无

上下文server

说明

  • 支持修改的字段:clientid, username, password
  • 值可以包含文本、变量及其组合
  • 可以在同一级别指定多个指令

可用字段

字段 说明
clientid MQTT 客户端标识符
username 连接用户名
password 连接密码

配置示例

stream {
    server {
        listen 18883;
        proxy_pass backend;
        proxy_buffer_size 16k;
        
        mqtt on;
        
        # 设置 Client ID
        mqtt_set_connect clientid "$client";
        
        # 设置用户名
        mqtt_set_connect username "$name";
        
        # 设置密码(从变量获取)
        mqtt_set_connect password "$mqtt_password";
    }
}

4. 嵌入变量

4.1 MQTT Preread 变量

变量 说明
$mqtt_preread_clientid CONNECT 消息中的 Client ID 值
$mqtt_preread_username CONNECT 消息中的 Username 值

变量使用示例

stream {
    # 使用 map 基于 clientid 路由
    map $mqtt_preread_clientid $backend_pool {
        ~^device-1    backend_1;
        ~^device-2    backend_2;
        ~^sensor-.*   sensors_backend;
        default       default_backend;
    }
    
    server {
        listen 1883;
        mqtt_preread on;
        
        proxy_pass $backend_pool;
    }
}

5. 配置示例

5.1 基础 MQTT 代理

简单的 MQTT 代理配置,将所有连接转发到后端 Broker

stream {
    upstream mqtt_backend {
        server 192.168.1.10:1883;
        server 192.168.1.11:1883 backup;
    }
    
    server {
        listen 1883;
        proxy_pass mqtt_backend;
        proxy_timeout 30s;
        proxy_connect_timeout 5s;
    }
}

5.2 基于 Client ID 的路由

根据 MQTT Client ID 将连接路由到不同的后端:

stream {
    # 设备组 1工业传感器
    upstream sensors_backend {
        server 10.0.1.10:1883 weight=5;
        server 10.0.1.11:1883;
    }
    
    # 设备组 2智能家居设备
    upstream home_backend {
        server 10.0.2.10:1883;
        server 10.0.2.11:1883;
    }
    
    # 设备组 3车联网
    upstream vehicle_backend {
        server 10.0.3.10:1883;
        server 10.0.3.11:1883;
    }
    
    # 默认后端
    upstream default_backend {
        server 10.0.0.10:1883;
    }
    
    # 基于 Client ID 的路由映射
    map $mqtt_preread_clientid $target_backend {
        # 传感器设备(匹配 sensor- 开头的 Client ID
        ~^sensor-    sensors_backend;
        
        # 智能家居设备(匹配 home- 开头的 Client ID
        ~^home-      home_backend;
        
        # 车载设备(匹配 vehicle- 或 car- 开头的 Client ID
        ~^vehicle-   vehicle_backend;
        ~^car-       vehicle_backend;
        
        # 默认后端
        default      default_backend;
    }
    
    server {
        listen 1883;
        
        # 启用 MQTT preread
        mqtt_preread on;
        
        # 基于 Client ID 路由
        proxy_pass $target_backend;
        
        # 连接超时配置
        proxy_timeout 300s;
        proxy_connect_timeout 10s;
        
        # 启用 TCP keepalive
        proxy_socket_keepalive on;
    }
}

5.3 基于用户名的负载均衡

根据用户名进行路由,适用于多租户场景:

stream {
    # 租户 A 集群
    upstream tenant_a_backend {
        server 10.0.10.10:1883;
        server 10.0.10.11:1883;
    }
    
    # 租户 B 集群
    upstream tenant_b_backend {
        server 10.0.20.10:1883;
        server 10.0.20.11:1883;
    }
    
    # 管理员集群
    upstream admin_backend {
        server 10.0.0.10:1883;
    }
    
    # 基于用户名路由
    map $mqtt_preread_username $tenant_backend {
        "tenant-a-user"    tenant_a_backend;
        "tenant-a-admin"   tenant_a_backend;
        "tenant-b-user"    tenant_b_backend;
        "tenant-b-admin"   tenant_b_backend;
        ~^admin-.*         admin_backend;
        default            tenant_a_backend;
    }
    
    server {
        listen 1883;
        mqtt_preread on;
        
        proxy_pass $tenant_backend;
        proxy_timeout 60s;
    }
}

5.4 修改 CONNECT 消息Filter 模块)

在代理层修改 MQTT CONNECT 消息中的字段:

stream {
    upstream mqtt_backend {
        server 192.168.1.10:1883;
    }
    
    server {
        listen 1883;
        proxy_pass mqtt_backend;
        proxy_buffer_size 16k;
        
        # 启用 MQTT Filter
        mqtt on;
        
        # 设置固定的 Client ID 前缀(追加原始 ID
        mqtt_set_connect clientid "ng-$mqtt_preread_clientid";
        
        # 注入代理认证用户名
        mqtt_set_connect username "nginx-proxy";
        
        # 设置代理密码(从文件或环境变量获取)
        mqtt_set_connect password "$proxy_mqtt_password";
    }
}

5.5 综合配置示例

结合 Preread 和 Filter 模块的完整配置:

stream {
    # 日志格式
    log_format mqtt_log '$remote_addr [$time_local] '
                        'clientid:$mqtt_preread_clientid '
                        'username:$mqtt_preread_username '
                        'upstream:$upstream_addr '
                        'status:$status';
    
    # 设备专用集群
    upstream device_cluster_a {
        zone devices 64k;
        server 10.0.1.10:1883 weight=5;
        server 10.0.1.11:1883;
        server 10.0.1.12:1883 backup;
    }
    
    upstream device_cluster_b {
        zone devices 64k;
        server 10.0.2.10:1883;
        server 10.0.2.11:1883;
    }
    
    # 普通设备集群
    upstream default_devices {
        zone devices 64k;
        least_conn;
        server 10.0.3.10:1883;
        server 10.0.3.11:1883;
        server 10.0.3.12:1883;
    }
    
    # Client ID 到后端映射
    map $mqtt_preread_clientid $backend_pool {
        ~^dev-a-    device_cluster_a;
        ~^device-a- device_cluster_a;
        ~^dev-b-    device_cluster_b;
        ~^device-b- device_cluster_b;
        default     default_devices;
    }
    
    # 服务器配置
    server {
        listen 1883;
        access_log /var/log/nginx/mqtt-access.log mqtt_log;
        
        # 启用 MQTT Preread 获取 Client ID 和用户名
        mqtt_preread on;
        
        # 启用 MQTT Filter可选需要修改 CONNECT 时启用)
        mqtt on;
        mqtt_buffers 50 4k;
        
        # 可选:修改 CONNECT 消息
        # mqtt_set_connect clientid "proxy-$mqtt_preread_clientid";
        
        # 基于 Client ID 路由
        proxy_pass $backend_pool;
        
        # 超时配置
        proxy_timeout 300s;
        proxy_connect_timeout 10s;
        
        # 启用 TCP keepalive
        proxy_socket_keepalive on;
        
        # 连接限制
        limit_conn mqtt_conn 100;
    }
    
    # TLS MQTT 端口
    server {
        listen 8883 ssl;
        
        ssl_certificate     /etc/nginx/ssl/mqtt.crt;
        ssl_certificate_key /etc/nginx/ssl/mqtt.key;
        ssl_protocols       TLSv1.2 TLSv1.3;
        
        mqtt_preread on;
        proxy_pass $backend_pool;
        proxy_timeout 300s;
    }
}

# 连接限制共享内存
limit_conn_zone $binary_remote_addr zone=mqtt_conn:10m;

5.6 与 SSL/TLS 结合

MQTT over TLS 的配置:

stream {
    upstream mqtt_ssl_backend {
        server 192.168.1.10:8883;
        server 192.168.1.11:8883;
    }
    
    # 终止 TLS 并读取 MQTT 信息
    server {
        listen 8883 ssl;
        
        ssl_certificate     /etc/nginx/ssl/server.crt;
        ssl_certificate_key /etc/nginx/ssl/server.key;
        ssl_protocols       TLSv1.2 TLSv1.3;
        
        # 在解密后读取 MQTT 信息
        mqtt_preread on;
        mqtt on;
        
        proxy_pass mqtt_ssl_backend;
        
        # 上游也使用 SSL
        proxy_ssl on;
        proxy_ssl_protocols TLSv1.2 TLSv1.3;
    }
}

6. 与 Lolly 项目的关系

6.1 Lolly 项目简介

Lolly 是一个使用 Go 语言编写的高性能 HTTP 服务器与反向代理,基于 fasthttp 构建。它提供了 HTTP/3、WebSocket、TCP/UDP Stream 代理等功能。

6.2 功能对比

特性 NGINX Plus (MQTT) Lolly
MQTT Preread 支持(商业版) 未实现
MQTT Filter 支持(商业版) 未实现
TCP Stream 代理 支持 支持
基于内容路由 支持 有限支持
SSL/TLS 终端 支持 支持
负载均衡 丰富算法 轮询/加权/最少连接/IP哈希

6.3 Lolly 中的 Stream 代理

Lolly 目前支持基础的 TCP/UDP Stream 代理:

// Lolly Stream 代理示例配置YAML
stream:
  - listen: ":1883"
    protocol: "tcp"
    upstream:
      targets:
        - addr: "mqtt1:1883"
          weight: 3
        - addr: "mqtt2:1883"
          weight: 1
      load_balance: "round_robin"

当前 Lolly 的 Stream 实现位于 internal/stream/stream.go,提供:

  • TCP/UDP 代理
  • 负载均衡轮询、加权轮询、最少连接、IP 哈希)
  • 健康检查
  • 会话管理UDP

6.4 在 Lolly 中实现 MQTT 支持的方案

方案 1独立 MQTT Preread 中间件

在 Lolly 的 Stream 模块中添加 MQTT Preread 功能:

// internal/stream/mqtt_preread.go
package stream

import (
    "bufio"
    "encoding/binary"
    "io"
    "net"
)

// MQTTPrereadConfig MQTT Preread 配置
type MQTTPrereadConfig struct {
    Enabled  bool
    OnClientID func(clientID string) string // 路由回调
    OnUsername func(username string) string // 认证回调
}

// MQTTConnectInfo 解析后的 MQTT CONNECT 信息
type MQTTConnectInfo struct {
    ClientID string
    Username string
    Password []byte
    Protocol byte
}

// ParseMQTTConnect 从连接读取并解析 MQTT CONNECT 消息
func ParseMQTTConnect(conn net.Conn) (*MQTTConnectInfo, error) {
    // 1. 读取固定头2-5 字节)
    // 2. 读取剩余长度
    // 3. 读取可变头(协议名、协议级别、连接标志)
    // 4. 读取 PayloadClient ID、Will Topic、Will Message、Username、Password
    // 5. 返回解析结果
}

方案 2基于配置的路由

在 Lolly 配置中添加 MQTT 路由规则:

stream:
  - listen: ":1883"
    protocol: "tcp"
    mqtt_preread: true
    routes:
      - match: "clientid =~ ^sensor-"
        upstream: sensors
      - match: "clientid =~ ^home-"
        upstream: home_devices
      - match: "username == admin"
        upstream: admin_cluster
    upstreams:
      sensors:
        targets:
          - addr: "mqtt-sensors-1:1883"
          - addr: "mqtt-sensors-2:1883"
      home_devices:
        targets:
          - addr: "mqtt-home-1:1883"
      admin_cluster:
        targets:
          - addr: "mqtt-admin-1:1883"

方案 3与现有 Stream 模块集成

扩展现有的 internal/stream/stream.go

// Target MQTT 扩展
type Target struct {
    addr   string
    weight int
    healthy atomic.Bool
    conns int64
    
    // MQTT 特定字段
    mqttMatcher func(*MQTTConnectInfo) bool // 匹配函数
}

// Upstream 添加 MQTT 选择支持
type Upstream struct {
    name     string
    targets  []*Target
    balancer Balancer
    
    // MQTT 路由表
    mqttRoutes map[string][]*Target // 标签 -> 目标列表
}

// SelectByMQTT 基于 MQTT CONNECT 信息选择目标
func (u *Upstream) SelectByMQTT(info *MQTTConnectInfo) *Target {
    // 1. 检查 MQTT 路由表
    // 2. 回退到默认负载均衡
}

6.5 实现建议

短期建议PoC 验证)

  1. 实现基础 MQTT CONNECT 解析器

    • 支持 MQTT 3.1.1 和 5.0
    • 提取 Client ID、Username、Password
    • 位置:internal/stream/mqtt.go
  2. 添加基于 Client ID 的路由

    • 简单的正则匹配
    • 配置文件支持
    • 与现有 upstream 集成
  3. 性能测试

    • 对比有无 preread 的性能差异
    • 内存占用分析

中期建议(功能完善)

  1. 完整 MQTT Filter 支持

    • 支持修改 CONNECT 字段
    • 支持消息重写
    • 配置热重载
  2. 监控与日志

    • MQTT 特定指标(连接数、消息数)
    • 结构化日志输出
  3. 安全增强

    • 基于 Client ID 的访问控制
    • 速率限制

长期建议(企业级功能)

  1. MQTT 5.0 完整支持

    • 用户属性处理
    • 共享订阅支持
    • 消息过期处理
  2. 高级路由

    • 基于 Topic 的路由(需要解析 PUBLISH/SUBSCRIBE
    • 动态后端发现
  3. 与 HTTP 层的集成

    • 统一的配置管理
    • 共享的健康检查

6.6 代码示例

以下是在 Lolly 中实现 MQTT Preread 的参考代码:

// internal/stream/mqtt.go
package stream

import (
    "bufio"
    "encoding/binary"
    "fmt"
    "io"
    "net"
)

const (
    // MQTT 控制包类型
    mqttCONNECT = 1
    
    // MQTT 协议名
    mqttProtocol311 = "MQTT"    // v3.1.1
    mqttProtocol31  = "MQIsdp"  // v3.1
)

// MQTTPrereadHandler MQTT Preread 处理器
type MQTTPrereadHandler struct {
    config *MQTTPrereadConfig
}

// NewMQTTPrereadHandler 创建处理器
func NewMQTTPrereadHandler(config *MQTTPrereadConfig) *MQTTPrereadHandler {
    return &MQTTPrereadHandler{config: config}
}

// Handle 处理连接,解析 MQTT CONNECT 并返回信息
func (h *MQTTPrereadHandler) Handle(conn net.Conn) (*MQTTConnectInfo, net.Conn, error) {
    // 使用 bufio 预读数据
    reader := bufio.NewReaderSize(conn, 4096)
    
    // 读取固定头第一个字节
    firstByte, err := reader.Peek(1)
    if err != nil {
        return nil, nil, err
    }
    
    // 检查是否为 CONNECT 包
    packetType := (firstByte[0] >> 4) & 0x0F
    if packetType != mqttCONNECT {
        return nil, nil, fmt.Errorf("expected CONNECT packet, got %d", packetType)
    }
    
    // 读取剩余长度(可变长度编码)
    remainingLen, headerLen, err := decodeRemainingLength(reader)
    if err != nil {
        return nil, nil, err
    }
    
    // 读取完整的 CONNECT 包
    totalLen := 1 + headerLen + remainingLen
    packet, err := reader.Peek(totalLen)
    if err != nil {
        return nil, nil, err
    }
    
    // 解析 CONNECT 包
    info, err := parseConnectPacket(packet[1+headerLen:])
    if err != nil {
        return nil, nil, err
    }
    
    // 创建包装连接,将预读的数据返回给后续处理
    wrappedConn := &mqttConn{
        Conn:   conn,
        reader: reader,
        peeked: totalLen,
    }
    
    return info, wrappedConn, nil
}

// decodeRemainingLength 解码 MQTT 剩余长度字段
func decodeRemainingLength(r *bufio.Reader) (int, int, error) {
    var value int
    var multiplier int = 1
    var headerLen int
    
    for {
        b, err := r.ReadByte()
        if err != nil {
            return 0, 0, err
        }
        headerLen++
        
        value += int(b&0x7F) * multiplier
        multiplier *= 128
        
        if (b & 0x80) == 0 {
            break
        }
        
        if multiplier > 128*128*128 {
            return 0, 0, fmt.Errorf("malformed remaining length")
        }
    }
    
    return value, headerLen, nil
}

// parseConnectPacket 解析 CONNECT 包体
func parseConnectPacket(data []byte) (*MQTTConnectInfo, error) {
    info := &MQTTConnectInfo{}
    offset := 0
    
    // 读取协议名长度
    protoLen := binary.BigEndian.Uint16(data[offset:])
    offset += 2
    
    // 读取协议名
    protoName := string(data[offset : offset+int(protoLen)])
    offset += int(protoLen)
    
    // 判断协议版本
    if protoName == mqttProtocol311 {
        info.Protocol = 4 // MQTT 3.1.1
    } else if protoName == mqttProtocol31 {
        info.Protocol = 3 // MQTT 3.1
    }
    
    // 读取协议级别
    protocolLevel := data[offset]
    offset++
    _ = protocolLevel
    
    // 读取连接标志
    connectFlags := data[offset]
    offset++
    
    usernameFlag := (connectFlags >> 7) & 1
    passwordFlag := (connectFlags >> 6) & 1
    // willRetain := (connectFlags >> 5) & 1
    willQoS := (connectFlags >> 3) & 3
    willFlag := (connectFlags >> 2) & 1
    // cleanSession := (connectFlags >> 1) & 1
    
    // 读取保持连接时间(跳过)
    offset += 2
    
    // 读取 Client ID
    clientIDLen := binary.BigEndian.Uint16(data[offset:])
    offset += 2
    info.ClientID = string(data[offset : offset+int(clientIDLen)])
    offset += int(clientIDLen)
    
    // 读取 Will Topic 和 Will Message如果有
    if willFlag == 1 {
        // Will Topic
        willTopicLen := binary.BigEndian.Uint16(data[offset:])
        offset += 2
        offset += int(willTopicLen)
        
        // Will Message
        willMsgLen := binary.BigEndian.Uint16(data[offset:])
        offset += 2
        offset += int(willMsgLen)
    }
    
    // 读取 Username如果有
    if usernameFlag == 1 {
        usernameLen := binary.BigEndian.Uint16(data[offset:])
        offset += 2
        info.Username = string(data[offset : offset+int(usernameLen)])
        offset += int(usernameLen)
    }
    
    // 读取 Password如果有
    if passwordFlag == 1 {
        passwordLen := binary.BigEndian.Uint16(data[offset:])
        offset += 2
        info.Password = data[offset : offset+int(passwordLen)]
    }
    
    return info, nil
}

// mqttConn 包装连接,支持预读数据回退
type mqttConn struct {
    net.Conn
    reader *bufio.Reader
    peeked int
}

func (c *mqttConn) Read(p []byte) (n int, err error) {
    return c.reader.Read(p)
}

6.7 测试验证

添加 MQTT Preread 的单元测试:

// internal/stream/mqtt_test.go
package stream

import (
    "bytes"
    "testing"
)

func TestParseConnectPacket(t *testing.T) {
    // 构造一个 MQTT 3.1.1 CONNECT 包
    // CONNECT + 剩余长度 + 协议名长度(4) + "MQTT" + 协议级别(4) + 连接标志 + 保持连接 + Client ID
    packet := []byte{
        // 可变头开始
        0x00, 0x04,       // 协议名长度 = 4
        'M', 'Q', 'T', 'T', // 协议名 "MQTT"
        0x04,             // 协议级别 4 (3.1.1)
        0xC2,             // 连接标志: 用户名(1) + 密码(1) + Clean Session(0)
        0x00, 0x3C,       // 保持连接 = 60 秒
        // Payload
        0x00, 0x0A,       // Client ID 长度 = 10
        't', 'e', 's', 't', '-', 'c', 'l', 'i', 'e', 'n', 't', // Client ID
        0x00, 0x05,       // 用户名长度 = 5
        'a', 'd', 'm', 'i', 'n', // 用户名
        0x00, 0x08,       // 密码长度 = 8
        'p', 'a', 's', 's', 'w', 'o', 'r', 'd', // 密码
    }
    
    info, err := parseConnectPacket(packet)
    if err != nil {
        t.Fatalf("parseConnectPacket failed: %v", err)
    }
    
    if info.ClientID != "test-client" {
        t.Errorf("ClientID = %q, want %q", info.ClientID, "test-client")
    }
    
    if info.Username != "admin" {
        t.Errorf("Username = %q, want %q", info.Username, "admin")
    }
    
    if info.Protocol != 4 {
        t.Errorf("Protocol = %d, want %d", info.Protocol, 4)
    }
}

func TestDecodeRemainingLength(t *testing.T) {
    tests := []struct {
        input    []byte
        expected int
    }{
        {[]byte{0x00}, 0},
        {[]byte{0x7F}, 127},
        {[]byte{0x80, 0x01}, 128},
        {[]byte{0xFF, 0x7F}, 16383},
        {[]byte{0x80, 0x80, 0x01}, 16384},
    }
    
    for _, tt := range tests {
        r := bufio.NewReader(bytes.NewReader(tt.input))
        value, _, err := decodeRemainingLength(r)
        if err != nil {
            t.Errorf("decodeRemainingLength(%v) error: %v", tt.input, err)
            continue
        }
        if value != tt.expected {
            t.Errorf("decodeRemainingLength(%v) = %d, want %d", tt.input, value, tt.expected)
        }
    }
}

7. 总结

7.1 NGINX MQTT 模块要点

  1. Preread 模块 (ngx_stream_mqtt_preread_module)

    • 只读解析 MQTT CONNECT 消息
    • 提取 Client ID 和 Username 用于路由
    • 适用于基于内容的负载均衡
  2. Filter 模块 (ngx_stream_mqtt_filter_module)

    • 支持修改 MQTT CONNECT 字段
    • 可用于代理认证和会话管理
    • 与 Preread 模块可以配合使用
  3. 商业订阅限制

    • 两个模块都需要 NGINX Plus 商业订阅
    • 自 1.23.4 版本起可用

7.2 适用场景

  • 物联网平台 - 海量设备接入和路由
  • 多租户 MQTT - 基于用户名隔离租户
  • 边缘网关 - 设备接入层代理
  • MQTT 迁移 - 平滑迁移到新 Broker

7.3 Lolly 实现建议优先级

  1. P0 - 基础 MQTT CONNECT 解析器
  2. P1 - 基于 Client ID 的路由
  3. P2 - MQTT Filter修改 CONNECT
  4. P3 - MQTT 5.0 高级特性

7.4 参考资料