跳转至

MQTT协议应用开发实战

学习目标

完成本教程后,你将能够:

  • 理解MQTT协议的工作原理和核心概念
  • 掌握发布/订阅模式的设计思想
  • 了解MQTT的三种QoS级别及其应用场景
  • 使用ESP32实现MQTT客户端功能
  • 将设备连接到MQTT云平台进行数据通信

前置要求

在开始本教程之前,你需要:

知识要求: - 了解TCP/IP协议基础知识 - 熟悉C语言编程 - 理解客户端-服务器架构

技能要求: - 能够使用Arduino IDE或ESP-IDF开发ESP32 - 会配置WiFi网络连接 - 了解基本的JSON数据格式

准备工作

硬件准备

名称 数量 说明 参考链接
ESP32开发板 1 ESP32-DevKitC或类似型号 -
DHT11温湿度传感器 1 可选,用于数据采集 -
LED灯 1 用于演示远程控制 -
电阻 1 220Ω,LED限流电阻 -
Micro USB数据线 1 用于供电和程序下载 -

软件准备

  • 开发环境:Arduino IDE 2.0+ 或 ESP-IDF v4.4+
  • MQTT库:PubSubClient (Arduino) 或 ESP-MQTT (ESP-IDF)
  • MQTT测试工具:MQTT.fx 或 MQTTX
  • MQTT服务器:可使用公共测试服务器 broker.emqx.io 或自建Mosquitto

环境配置

1. 安装Arduino IDE和ESP32支持

在Arduino IDE中添加ESP32开发板支持: - 打开 文件 -> 首选项 -> 附加开发板管理器网址 - 添加URL: https://raw.githubusercontent.com/espressif/arduino-esp32/gh-pages/package_esp32_index.json

2. 安装MQTT库

在Arduino IDE中: - 打开 工具 -> 管理库 - 搜索 "PubSubClient" - 点击安装

或使用ESP-IDF(已内置ESP-MQTT组件)

3. 测试WiFi连接

确保ESP32能够连接到WiFi网络。

MQTT协议基础

什么是MQTT?

MQTT (Message Queuing Telemetry Transport) 是一种轻量级的发布/订阅消息传输协议,专为物联网设备设计。

核心特点: - 轻量级:协议开销小,适合低带宽网络 - 发布/订阅模式:解耦消息发送者和接收者 - QoS保证:提供三种消息质量等级 - 持久会话:支持断线重连和消息缓存 - 遗嘱消息:设备异常断开时自动发送通知

MQTT架构

MQTT采用发布/订阅模式,通过中央Broker进行消息路由:

  • Broker(代理):消息中转服务器,负责接收、过滤和分发消息
  • Publisher(发布者):发布消息到特定主题的客户端
  • Subscriber(订阅者):订阅主题并接收消息的客户端
  • Client(客户端):可同时是发布者和订阅者

主题(Topic)

主题是MQTT消息路由的核心,采用层级结构:

home/livingroom/temperature
home/bedroom/humidity
device/sensor/001/data

通配符: - +:单层通配符,例如 home/+/temperature 匹配所有房间的温度 - #:多层通配符,例如 home/# 匹配home下的所有主题

QoS质量等级

QoS级别 名称 保证 应用场景
QoS 0 至多一次 消息可能丢失 环境监测、实时数据流
QoS 1 至少一次 消息至少送达一次,可能重复 日志记录、一般数据上报
QoS 2 恰好一次 消息恰好送达一次 计费系统、关键控制指令

步骤1:搭建MQTT测试环境

1.1 使用公共MQTT服务器

我们使用EMQX提供的免费公共服务器进行测试:

服务器地址: broker.emqx.io
端口: 1883 (TCP)
WebSocket端口: 8083
SSL端口: 8883

1.2 安装MQTT测试工具

下载并安装MQTTX客户端工具: - 访问 https://mqttx.app/ - 下载适合你操作系统的版本 - 安装并启动

1.3 测试连接

  1. 打开MQTTX
  2. 创建新连接:
  3. Name: Test Connection
  4. Host: broker.emqx.io
  5. Port: 1883
  6. Client ID: 自动生成
  7. 点击"Connect"
  8. 订阅测试主题:test/topic
  9. 发布测试消息

预期结果: - 连接状态显示为"Connected" - 能够成功发布和接收消息

步骤2:ESP32基础MQTT客户端

2.1 创建Arduino项目

创建新的Arduino项目:ESP32_MQTT_Basic

2.2 WiFi和MQTT配置

#include <WiFi.h>
#include <PubSubClient.h>

// WiFi配置
const char* ssid = "你的WiFi名称";
const char* password = "你的WiFi密码";

// MQTT配置
const char* mqtt_server = "broker.emqx.io";
const int mqtt_port = 1883;
const char* mqtt_client_id = "ESP32_Client_001";  // 客户端ID,需唯一

// 主题定义
const char* topic_pub = "esp32/sensor/data";      // 发布主题
const char* topic_sub = "esp32/control/led";      // 订阅主题

// 创建对象
WiFiClient espClient;
PubSubClient client(espClient);

// LED引脚
const int LED_PIN = 2;  // ESP32内置LED

配置说明: - ssidpassword:替换为你的WiFi信息 - mqtt_client_id:每个设备需要唯一的客户端ID - topic_pub:设备发布数据的主题 - topic_sub:设备订阅控制命令的主题

2.3 WiFi连接函数

void setup_wifi() {
    delay(10);
    Serial.println();
    Serial.print("连接到WiFi: ");
    Serial.println(ssid);

    WiFi.mode(WIFI_STA);  // 设置为Station模式
    WiFi.begin(ssid, password);

    // 等待连接
    while (WiFi.status() != WL_CONNECTED) {
        delay(500);
        Serial.print(".");
    }

    Serial.println("");
    Serial.println("WiFi连接成功");
    Serial.print("IP地址: ");
    Serial.println(WiFi.localIP());
}

代码说明: - WiFi.mode(WIFI_STA):设置为Station(客户端)模式 - WiFi.begin():开始连接WiFi - 循环等待直到连接成功 - 打印IP地址用于调试

2.4 MQTT回调函数

// MQTT消息接收回调函数
void callback(char* topic, byte* payload, unsigned int length) {
    Serial.print("收到消息 [");
    Serial.print(topic);
    Serial.print("]: ");

    // 将payload转换为字符串
    String message = "";
    for (int i = 0; i < length; i++) {
        message += (char)payload[i];
    }
    Serial.println(message);

    // 处理LED控制命令
    if (String(topic) == topic_sub) {
        if (message == "ON") {
            digitalWrite(LED_PIN, HIGH);
            Serial.println("LED已打开");
        } else if (message == "OFF") {
            digitalWrite(LED_PIN, LOW);
            Serial.println("LED已关闭");
        }
    }
}

回调函数说明: - 当收到订阅主题的消息时自动调用 - topic:消息的主题 - payload:消息内容(字节数组) - length:消息长度 - 根据消息内容控制LED开关

2.5 MQTT连接和重连

void reconnect() {
    // 循环直到重新连接
    while (!client.connected()) {
        Serial.print("尝试MQTT连接...");

        // 尝试连接
        if (client.connect(mqtt_client_id)) {
            Serial.println("已连接");

            // 连接成功后订阅主题
            client.subscribe(topic_sub);
            Serial.print("已订阅主题: ");
            Serial.println(topic_sub);

            // 发布上线消息
            client.publish(topic_pub, "ESP32 Online");
        } else {
            Serial.print("连接失败, rc=");
            Serial.print(client.state());
            Serial.println(" 5秒后重试");
            delay(5000);
        }
    }
}

连接逻辑: - 检查连接状态,断开则重连 - client.connect():连接到MQTT服务器 - 连接成功后订阅控制主题 - 发布上线消息通知服务器 - 连接失败则等待5秒后重试

步骤3:完整的主程序

3.1 setup()函数

void setup() {
    // 初始化串口
    Serial.begin(115200);
    delay(1000);

    // 初始化LED引脚
    pinMode(LED_PIN, OUTPUT);
    digitalWrite(LED_PIN, LOW);

    // 连接WiFi
    setup_wifi();

    // 配置MQTT服务器
    client.setServer(mqtt_server, mqtt_port);
    client.setCallback(callback);

    Serial.println("初始化完成");
}

3.2 loop()函数

void loop() {
    // 确保MQTT连接
    if (!client.connected()) {
        reconnect();
    }
    client.loop();  // 处理MQTT消息

    // 每5秒发布一次传感器数据
    static unsigned long lastMsg = 0;
    unsigned long now = millis();

    if (now - lastMsg > 5000) {
        lastMsg = now;

        // 模拟传感器数据
        float temperature = 20.0 + random(0, 100) / 10.0;  // 20.0-30.0°C
        float humidity = 50.0 + random(0, 300) / 10.0;     // 50.0-80.0%

        // 构建JSON消息
        String payload = "{";
        payload += "\"temperature\":";
        payload += String(temperature, 1);
        payload += ",\"humidity\":";
        payload += String(humidity, 1);
        payload += ",\"device\":\"ESP32_001\"";
        payload += "}";

        // 发布消息
        Serial.print("发布消息: ");
        Serial.println(payload);
        client.publish(topic_pub, payload.c_str());
    }
}

主循环说明: - client.loop():必须定期调用,处理接收消息和保持连接 - 每5秒发布一次模拟的传感器数据 - 使用JSON格式组织数据 - client.publish():发布消息到指定主题

步骤4:编译和测试

4.1 编译上传

  1. 修改WiFi配置为你的网络信息
  2. 选择开发板:工具 -> 开发板 -> ESP32 Dev Module
  3. 选择端口:工具 -> 端口
  4. 点击上传按钮
  5. 打开串口监视器(波特率115200)

预期输出

连接到WiFi: YourWiFi
....
WiFi连接成功
IP地址: 192.168.1.100
尝试MQTT连接...已连接
已订阅主题: esp32/control/led
初始化完成
发布消息: {"temperature":25.3,"humidity":65.2,"device":"ESP32_001"}

4.2 使用MQTTX测试

测试数据接收:

  1. 在MQTTX中订阅主题:esp32/sensor/data
  2. 观察ESP32发送的数据
  3. 应该每5秒收到一条JSON格式的消息

测试远程控制:

  1. 在MQTTX中发布消息到主题:esp32/control/led
  2. 消息内容:ON
  3. 观察ESP32的LED是否点亮
  4. 发送消息:OFF
  5. 观察LED是否熄灭

步骤5:添加QoS和遗嘱消息

5.1 使用不同的QoS级别

// 修改发布函数,添加QoS参数
void publishWithQoS(const char* topic, const char* payload, int qos) {
    bool retained = false;  // 是否保留消息
    client.publish(topic, payload, retained, qos);
}

// 在loop()中使用
publishWithQoS(topic_pub, payload.c_str(), 1);  // 使用QoS 1

QoS选择建议: - 环境数据(温湿度):QoS 0(允许偶尔丢失) - 状态上报:QoS 1(确保送达) - 控制命令:QoS 1或2(确保执行)

5.2 配置遗嘱消息(Last Will)

void reconnect() {
    while (!client.connected()) {
        Serial.print("尝试MQTT连接...");

        // 配置遗嘱消息
        const char* willTopic = "esp32/status";
        const char* willMessage = "ESP32 Offline";
        int willQoS = 1;
        bool willRetain = true;

        // 带遗嘱消息的连接
        if (client.connect(mqtt_client_id, willTopic, willQoS, willRetain, willMessage)) {
            Serial.println("已连接");

            // 发布在线状态
            client.publish("esp32/status", "ESP32 Online", true);

            // 订阅控制主题
            client.subscribe(topic_sub, 1);  // QoS 1
        } else {
            Serial.print("连接失败, rc=");
            Serial.print(client.state());
            Serial.println(" 5秒后重试");
            delay(5000);
        }
    }
}

遗嘱消息说明: - 当设备异常断开时,Broker自动发布遗嘱消息 - willRetain = true:保留消息,新订阅者也能收到 - 用于监控设备在线状态

步骤6:实际传感器集成

6.1 添加DHT11温湿度传感器

#include <DHT.h>

// DHT11配置
#define DHTPIN 4        // DHT11数据引脚连接到GPIO4
#define DHTTYPE DHT11   // 传感器类型
DHT dht(DHTPIN, DHTTYPE);

void setup() {
    // ... 其他初始化代码 ...

    // 初始化DHT传感器
    dht.begin();
}

void loop() {
    // ... MQTT连接检查 ...

    if (now - lastMsg > 5000) {
        lastMsg = now;

        // 读取真实传感器数据
        float temperature = dht.readTemperature();
        float humidity = dht.readHumidity();

        // 检查读取是否成功
        if (isnan(temperature) || isnan(humidity)) {
            Serial.println("读取DHT传感器失败!");
            return;
        }

        // 构建并发布消息
        String payload = "{";
        payload += "\"temperature\":";
        payload += String(temperature, 1);
        payload += ",\"humidity\":";
        payload += String(humidity, 1);
        payload += ",\"device\":\"ESP32_001\"";
        payload += ",\"timestamp\":";
        payload += String(millis());
        payload += "}";

        Serial.print("发布消息: ");
        Serial.println(payload);
        client.publish(topic_pub, payload.c_str(), true);  // retained = true
    }
}

6.2 电路连接

DHT11引脚 ESP32引脚 说明
VCC 3.3V 电源正极
DATA GPIO4 数据引脚
GND GND 电源负极

注意:DHT11的DATA引脚需要一个4.7kΩ-10kΩ的上拉电阻连接到VCC。

步骤7:连接云平台

7.1 使用阿里云IoT平台

创建产品和设备:

  1. 登录阿里云IoT平台控制台
  2. 创建产品:选择"直连设备"
  3. 创建设备:记录设备证书信息
  4. ProductKey
  5. DeviceName
  6. DeviceSecret

修改连接参数:

// 阿里云IoT平台MQTT配置
const char* mqtt_server = "iot-as-mqtt.cn-shanghai.aliyuncs.com";
const int mqtt_port = 1883;

// 设备证书信息(从控制台获取)
const char* productKey = "a1AbCdEfGhI";
const char* deviceName = "device001";
const char* deviceSecret = "1234567890abcdef";

// 计算客户端ID和用户名密码
String clientId = String(deviceName) + "|securemode=3,signmethod=hmacsha1|";
String username = String(deviceName) + "&" + String(productKey);
String password = calculatePassword();  // 需要实现HMAC-SHA1签名

// 主题格式
String topic_pub = "/" + String(productKey) + "/" + String(deviceName) + "/user/update";
String topic_sub = "/" + String(productKey) + "/" + String(deviceName) + "/user/get";

7.2 使用EMQX Cloud

EMQX Cloud提供免费的MQTT云服务:

  1. 注册EMQX Cloud账号
  2. 创建免费部署
  3. 获取连接地址和端口
  4. 创建认证信息(用户名/密码)
// EMQX Cloud配置
const char* mqtt_server = "your-deployment.emqxsl.com";
const int mqtt_port = 1883;
const char* mqtt_user = "your_username";
const char* mqtt_password = "your_password";

// 带认证的连接
if (client.connect(mqtt_client_id, mqtt_user, mqtt_password)) {
    // 连接成功
}

故障排除

问题1:无法连接到MQTT服务器

可能原因: - WiFi未连接成功 - MQTT服务器地址或端口错误 - 客户端ID冲突 - 防火墙阻止连接

解决方法: 1. 检查WiFi连接状态 2. 使用MQTTX测试服务器是否可达 3. 确保客户端ID唯一 4. 检查网络防火墙设置

问题2:消息发布失败

可能原因: - 未连接到服务器 - 主题格式错误 - 消息过大 - QoS级别不支持

解决方法: 1. 检查client.connected()状态 2. 验证主题格式(不能包含空格) 3. 减小消息大小或增加缓冲区 4. 使用QoS 0或1

问题3:无法接收订阅消息

可能原因: - 未正确订阅主题 - 回调函数未设置 - 主题通配符使用错误 - client.loop()未调用

解决方法: 1. 确认订阅成功(检查返回值) 2. 确保调用了client.setCallback() 3. 检查主题匹配规则 4. 在loop()中定期调用client.loop()

问题4:频繁断线重连

可能原因: - WiFi信号不稳定 - Keep-Alive时间过短 - 服务器负载过高 - 客户端ID冲突

解决方法: 1. 改善WiFi信号质量 2. 增加Keep-Alive时间:client.setKeepAlive(60) 3. 更换MQTT服务器 4. 使用唯一的客户端ID

总结

通过本教程,你学习了:

  • ✅ MQTT协议的核心概念和工作原理
  • ✅ 发布/订阅模式的实现方法
  • ✅ 三种QoS级别的区别和应用
  • ✅ ESP32 MQTT客户端的完整开发流程
  • ✅ 传感器数据采集和远程控制
  • ✅ 云平台对接的基本方法

关键要点: - MQTT是物联网设备通信的首选协议 - 发布/订阅模式实现了设备间的解耦 - QoS级别根据应用场景选择 - 遗嘱消息用于设备状态监控 - 定期调用client.loop()保持连接

进阶挑战

尝试以下挑战来巩固学习:

  1. 挑战1:实现SSL/TLS加密连接(使用8883端口)
  2. 挑战2:添加断线重连时的消息缓存机制
  3. 挑战3:实现多个传感器数据的批量上报
  4. 挑战4:使用retained消息实现设备配置持久化
  5. 挑战5:开发一个简单的Web控制面板

完整代码

完整的项目代码可以在GitHub上找到:

GitHub仓库: https://github.com/embedded-knowledge/mqtt-esp32-tutorial

项目包含: - 基础MQTT客户端代码 - DHT11传感器集成示例 - 云平台对接示例 - Web控制面板示例

下一步

建议继续学习:

  • CoAP轻量级协议 - 学习另一种物联网协议
  • HTTP/HTTPS协议应用 - 学习Web通信
  • WebSocket实时通信 - 学习双向实时通信
  • AWS IoT平台接入 - 学习云平台集成

参考资料

  1. MQTT 3.1.1协议规范 - http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html
  2. MQTT 5.0协议规范 - https://docs.oasis-open.org/mqtt/mqtt/v5.0/mqtt-v5.0.html
  3. PubSubClient库文档 - https://pubsubclient.knolleary.net/
  4. ESP-MQTT组件文档 - https://docs.espressif.com/projects/esp-idf/en/latest/esp32/api-reference/protocols/mqtt.html
  5. EMQX MQTT Broker - https://www.emqx.io/
  6. Mosquitto MQTT Broker - https://mosquitto.org/

常见MQTT术语

  • Broker:MQTT消息代理服务器
  • Client:MQTT客户端(发布者或订阅者)
  • Topic:消息主题,用于消息路由
  • Payload:消息内容
  • QoS:服务质量等级
  • Retain:保留消息标志
  • Clean Session:清除会话标志
  • Keep Alive:心跳间隔
  • Last Will:遗嘱消息

反馈:如果你在学习过程中遇到问题,欢迎在评论区留言或提交Issue!