跳转至

私有云IoT平台搭建实战

项目概述

项目简介

本项目将带你从零开始搭建一个功能完整的私有IoT平台,实现设备接入、数据采集、存储、处理、可视化等核心功能。该平台适用于企业内部部署,保证数据安全性和可控性,同时具备良好的扩展性和可维护性。

私有IoT平台相比公有云平台的优势: - 数据安全:数据完全掌控在自己手中,不会泄露到外部 - 成本可控:无需按设备数量或流量付费,一次投入长期使用 - 定制灵活:可以根据业务需求深度定制功能 - 网络独立:可以在内网环境运行,不依赖互联网 - 合规要求:满足某些行业对数据本地化的要求

项目演示

完成后的平台将包含以下功能界面:

┌─────────────────────────────────────────────────┐
│  私有IoT平台管理界面                              │
├─────────────────────────────────────────────────┤
│  [设备管理] [数据监控] [规则引擎] [系统设置]      │
├─────────────────────────────────────────────────┤
│  在线设备: 156    离线设备: 24    告警: 3        │
│                                                  │
│  实时数据流:                                     │
│  ┌──────────────────────────────────────────┐  │
│  │ Device-001: Temp=25.3°C, Humi=65%        │  │
│  │ Device-002: Voltage=3.7V, Current=120mA  │  │
│  │ Device-003: Status=Running, CPU=45%      │  │
│  └──────────────────────────────────────────┘  │
│                                                  │
│  数据可视化:                                     │
│  [温度趋势图] [设备分布图] [告警统计]            │
└─────────────────────────────────────────────────┘

学习目标

完成本项目后,你将掌握:

  • 平台架构设计:理解IoT平台的整体架构和各组件的作用
  • MQTT Broker部署:掌握EMQX等MQTT服务器的部署和配置
  • 设备管理系统:实现设备注册、认证、状态管理等功能
  • 时序数据存储:使用InfluxDB存储和查询时序数据
  • 规则引擎开发:实现数据处理、告警、联动等业务逻辑
  • Web界面开发:构建设备管理和数据可视化界面
  • API接口设计:设计RESTful API供第三方系统集成
  • 系统部署运维:掌握Docker容器化部署和监控

项目特点

  • 完整的平台功能:涵盖设备管理、数据采集、存储、处理、可视化全流程
  • 微服务架构:采用模块化设计,各组件独立部署,易于扩展
  • 容器化部署:使用Docker Compose一键部署,简化运维
  • 高性能设计:支持万级设备并发接入,百万级消息吞吐
  • 可视化界面:提供友好的Web管理界面和实时数据大屏
  • 规则引擎:灵活的规则配置,支持复杂的业务逻辑
  • 开放接口:提供完整的RESTful API,方便第三方集成
  • 生产就绪:包含认证、权限、日志、监控等企业级特性

技术栈

后端技术

  • 开发语言:Python 3.9+
  • Web框架:FastAPI (高性能异步框架)
  • MQTT Broker:EMQX 5.0 (企业级MQTT服务器)
  • 时序数据库:InfluxDB 2.x (专为时序数据优化)
  • 关系数据库:PostgreSQL 14 (存储设备信息、用户等)
  • 缓存数据库:Redis 7.0 (缓存和消息队列)
  • 消息队列:Redis Streams (事件驱动)

前端技术

  • 框架:Vue 3 + TypeScript
  • UI组件库:Element Plus
  • 图表库:ECharts 5.x
  • 实时通信:WebSocket
  • 状态管理:Pinia

运维工具

  • 容器化:Docker + Docker Compose
  • 反向代理:Nginx
  • 监控:Prometheus + Grafana
  • 日志:ELK Stack (可选)

开发工具

  • IDE:VS Code / PyCharm
  • API测试:Postman / Insomnia
  • MQTT测试:MQTTX
  • 数据库管理:DBeaver / DataGrip

硬件清单

服务器要求

本项目可以在单台服务器上部署,也可以分布式部署。

最小配置(开发测试)

组件 配置 说明
CPU 4核 支持100+设备
内存 8GB 基本运行需求
硬盘 100GB SSD 存储约1个月数据
网络 100Mbps 内网即可
操作系统 Ubuntu 22.04 LTS 推荐使用

参考成本:云服务器约 ¥200/月,或使用本地服务器

推荐配置(生产环境)

组件 配置 说明
CPU 8核+ 支持1000+设备
内存 16GB+ 保证性能
硬盘 500GB SSD 存储约6个月数据
网络 1Gbps 高并发需求
操作系统 Ubuntu 22.04 LTS 稳定可靠

参考成本:云服务器约 ¥500-800/月

测试设备(可选)

用于测试平台功能的IoT设备:

设备 型号 数量 用途 参考价格
开发板 ESP32-DevKitC 2-3 模拟IoT设备 ¥30/个
传感器 DHT22 2 温湿度数据 ¥15/个
电源 USB电源适配器 2-3 供电 ¥10/个

总成本:约 ¥100-150(可选)

软件要求

开发环境

# 必需软件
- Docker 20.10+
- Docker Compose 2.0+
- Git 2.30+
- Python 3.9+
- Node.js 16+
- npm 8+

# 推荐工具
- VS Code
- Postman
- MQTTX

安装Docker和Docker Compose

# Ubuntu系统安装Docker
curl -fsSL https://get.docker.com -o get-docker.sh
sudo sh get-docker.sh

# 安装Docker Compose
sudo curl -L "https://github.com/docker/compose/releases/download/v2.20.0/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
sudo chmod +x /usr/local/bin/docker-compose

# 验证安装
docker --version
docker-compose --version

系统架构

整体架构

graph TB
    subgraph "设备层"
        D1[IoT设备1]
        D2[IoT设备2]
        D3[IoT设备N]
    end

    subgraph "接入层"
        MQTT[MQTT Broker<br/>EMQX]
    end

    subgraph "应用层"
        API[API服务<br/>FastAPI]
        RULE[规则引擎]
        AUTH[认证服务]
    end

    subgraph "数据层"
        PG[(PostgreSQL<br/>设备信息)]
        INFLUX[(InfluxDB<br/>时序数据)]
        REDIS[(Redis<br/>缓存)]
    end

    subgraph "前端层"
        WEB[Web管理界面<br/>Vue3]
    end

    D1 -->|MQTT| MQTT
    D2 -->|MQTT| MQTT
    D3 -->|MQTT| MQTT

    MQTT -->|消息| API
    MQTT -->|消息| RULE

    API --> PG
    API --> INFLUX
    API --> REDIS

    RULE --> INFLUX
    RULE --> REDIS

    WEB -->|HTTP/WebSocket| API
    WEB --> AUTH

核心模块说明

1. MQTT Broker(EMQX)

  • 功能:设备接入、消息路由、协议转换
  • 特性
  • 支持MQTT 3.1.1和5.0协议
  • 百万级并发连接
  • 集群部署支持
  • 规则引擎集成
  • WebSocket支持
  • 端口
  • 1883: MQTT
  • 8883: MQTT over TLS
  • 8083: WebSocket
  • 18083: 管理界面

2. API服务(FastAPI)

  • 功能:业务逻辑、设备管理、数据查询
  • 模块
  • 设备管理:注册、认证、状态管理
  • 数据服务:数据查询、统计、导出
  • 用户管理:用户认证、权限控制
  • 规则管理:规则配置、执行监控
  • 端口:8000

3. 规则引擎

  • 功能:数据处理、告警、设备联动
  • 规则类型
  • 数据转换:格式转换、单位换算
  • 条件告警:阈值告警、异常检测
  • 设备联动:条件触发、场景自动化
  • 数据转发:第三方系统集成
  • 实现:基于Python的规则引擎

4. 数据存储

  • PostgreSQL
  • 设备信息(ID、名称、类型、状态)
  • 用户信息(账号、权限、配置)
  • 规则配置(规则定义、执行记录)

  • InfluxDB

  • 设备上报的时序数据
  • 系统监控指标
  • 告警历史记录

  • Redis

  • 设备在线状态
  • 会话信息
  • 消息队列
  • 缓存数据

5. Web管理界面

  • 功能模块
  • 设备管理:设备列表、详情、配置
  • 数据监控:实时数据、历史曲线
  • 规则配置:规则创建、编辑、测试
  • 告警中心:告警列表、处理、统计
  • 系统设置:用户管理、权限配置
  • 端口:80/443

数据流图

sequenceDiagram
    participant Device as IoT设备
    participant MQTT as MQTT Broker
    participant API as API服务
    participant Rule as 规则引擎
    participant DB as 数据库
    participant Web as Web界面

    Device->>MQTT: 1. 连接认证
    MQTT->>API: 2. 验证设备凭证
    API->>DB: 3. 查询设备信息
    DB-->>API: 4. 返回设备信息
    API-->>MQTT: 5. 认证结果
    MQTT-->>Device: 6. 连接成功

    Device->>MQTT: 7. 发布数据
    MQTT->>Rule: 8. 触发规则
    Rule->>DB: 9. 存储数据
    Rule->>Rule: 10. 执行规则逻辑
    Rule->>MQTT: 11. 发送控制命令(如需要)
    MQTT->>Device: 12. 下发命令

    Web->>API: 13. 查询数据
    API->>DB: 14. 读取数据
    DB-->>API: 15. 返回数据
    API-->>Web: 16. 展示数据

实现步骤

阶段1:环境搭建 (预计1小时)

1.1 创建项目目录

# 创建项目根目录
mkdir iot-platform
cd iot-platform

# 创建目录结构
mkdir -p backend/{app,tests}
mkdir -p backend/app/{api,models,services,core}
mkdir -p frontend/{src,public}
mkdir -p docker
mkdir -p data/{postgres,influxdb,redis,emqx}
mkdir -p config
mkdir -p logs

# 项目结构
tree -L 2

项目结构:

iot-platform/
├── backend/              # 后端服务
│   ├── app/             # 应用代码
│   │   ├── api/         # API路由
│   │   ├── models/      # 数据模型
│   │   ├── services/    # 业务逻辑
│   │   └── core/        # 核心配置
│   ├── tests/           # 测试代码
│   └── requirements.txt # Python依赖
├── frontend/            # 前端界面
│   ├── src/            # 源代码
│   └── public/         # 静态资源
├── docker/             # Docker配置
│   └── docker-compose.yml
├── config/             # 配置文件
├── data/               # 数据目录
└── logs/               # 日志目录

1.2 编写Docker Compose配置

创建 docker/docker-compose.yml

version: '3.8'

services:
  # PostgreSQL数据库
  postgres:
    image: postgres:14-alpine
    container_name: iot-postgres
    environment:
      POSTGRES_DB: iot_platform
      POSTGRES_USER: iot_user
      POSTGRES_PASSWORD: iot_password
    ports:
      - "5432:5432"
    volumes:
      - ../data/postgres:/var/lib/postgresql/data
    networks:
      - iot-network
    restart: unless-stopped

  # Redis缓存
  redis:
    image: redis:7-alpine
    container_name: iot-redis
    command: redis-server --appendonly yes
    ports:
      - "6379:6379"
    volumes:
      - ../data/redis:/data
    networks:
      - iot-network
    restart: unless-stopped

  # InfluxDB时序数据库
  influxdb:
    image: influxdb:2.7-alpine
    container_name: iot-influxdb
    environment:
      DOCKER_INFLUXDB_INIT_MODE: setup
      DOCKER_INFLUXDB_INIT_USERNAME: admin
      DOCKER_INFLUXDB_INIT_PASSWORD: adminpassword
      DOCKER_INFLUXDB_INIT_ORG: iot-org
      DOCKER_INFLUXDB_INIT_BUCKET: iot-data
      DOCKER_INFLUXDB_INIT_ADMIN_TOKEN: my-super-secret-auth-token
    ports:
      - "8086:8086"
    volumes:
      - ../data/influxdb:/var/lib/influxdb2
    networks:
      - iot-network
    restart: unless-stopped

  # EMQX MQTT Broker
  emqx:
    image: emqx/emqx:5.0.0
    container_name: iot-emqx
    environment:
      EMQX_NAME: emqx
      EMQX_HOST: 127.0.0.1
    ports:
      - "1883:1883"      # MQTT
      - "8883:8883"      # MQTT/SSL
      - "8083:8083"      # WebSocket
      - "18083:18083"    # Dashboard
    volumes:
      - ../data/emqx:/opt/emqx/data
    networks:
      - iot-network
    restart: unless-stopped

  # API服务(后续添加)
  # api:
  #   build: ../backend
  #   container_name: iot-api
  #   ...

networks:
  iot-network:
    driver: bridge

1.3 启动基础服务

# 进入docker目录
cd docker

# 启动所有服务
docker-compose up -d

# 查看服务状态
docker-compose ps

# 查看日志
docker-compose logs -f

# 验证服务
# PostgreSQL
docker exec -it iot-postgres psql -U iot_user -d iot_platform -c "SELECT version();"

# Redis
docker exec -it iot-redis redis-cli ping

# InfluxDB
curl http://localhost:8086/health

# EMQX
curl http://localhost:18083
# 访问 http://localhost:18083 (默认账号: admin/public)

阶段2:后端开发 (预计4小时)

2.1 初始化Python项目

创建 backend/requirements.txt

# Web框架
fastapi==0.104.1
uvicorn[standard]==0.24.0
pydantic==2.5.0
pydantic-settings==2.1.0

# 数据库
sqlalchemy==2.0.23
asyncpg==0.29.0
psycopg2-binary==2.9.9
influxdb-client==1.38.0
redis==5.0.1

# MQTT客户端
paho-mqtt==1.6.1

# 认证和安全
python-jose[cryptography]==3.3.0
passlib[bcrypt]==1.7.4
python-multipart==0.0.6

# 工具库
python-dotenv==1.0.0
loguru==0.7.2

安装依赖:

cd backend
python -m venv venv
source venv/bin/activate  # Windows: venv\Scripts\activate
pip install -r requirements.txt

2.2 创建数据模型

创建 backend/app/models/device.py

from sqlalchemy import Column, String, Integer, DateTime, Boolean, JSON
from sqlalchemy.ext.declarative import declarative_base
from datetime import datetime

Base = declarative_base()

class Device(Base):
    """设备模型"""
    __tablename__ = "devices"

    id = Column(String(64), primary_key=True)
    name = Column(String(128), nullable=False)
    device_type = Column(String(64), nullable=False)
    product_key = Column(String(64), nullable=False)
    device_secret = Column(String(128), nullable=False)

    # 状态信息
    status = Column(String(32), default="offline")  # online, offline
    last_online = Column(DateTime, nullable=True)
    last_offline = Column(DateTime, nullable=True)

    # 配置信息
    config = Column(JSON, default={})
    tags = Column(JSON, default=[])

    # 元数据
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
    is_active = Column(Boolean, default=True)

    def to_dict(self):
        """转换为字典"""
        return {
            "id": self.id,
            "name": self.name,
            "device_type": self.device_type,
            "status": self.status,
            "last_online": self.last_online.isoformat() if self.last_online else None,
            "config": self.config,
            "tags": self.tags,
            "created_at": self.created_at.isoformat(),
        }

创建 backend/app/models/telemetry.py

from pydantic import BaseModel
from datetime import datetime
from typing import Dict, Any

class TelemetryData(BaseModel):
    """遥测数据模型"""
    device_id: str
    timestamp: datetime
    data: Dict[str, Any]

    class Config:
        json_schema_extra = {
            "example": {
                "device_id": "device-001",
                "timestamp": "2024-01-01T12:00:00Z",
                "data": {
                    "temperature": 25.5,
                    "humidity": 65.0,
                    "voltage": 3.7
                }
            }
        }

2.3 实现设备管理服务

创建 backend/app/services/device_service.py

from sqlalchemy.orm import Session
from sqlalchemy import select
from typing import List, Optional
import secrets
from datetime import datetime

from app.models.device import Device

class DeviceService:
    """设备管理服务"""

    def __init__(self, db: Session):
        self.db = db

    async def create_device(
        self,
        name: str,
        device_type: str,
        product_key: str,
        config: dict = None,
        tags: list = None
    ) -> Device:
        """创建设备"""
        # 生成设备ID和密钥
        device_id = f"device-{secrets.token_hex(8)}"
        device_secret = secrets.token_urlsafe(32)

        device = Device(
            id=device_id,
            name=name,
            device_type=device_type,
            product_key=product_key,
            device_secret=device_secret,
            config=config or {},
            tags=tags or []
        )

        self.db.add(device)
        await self.db.commit()
        await self.db.refresh(device)

        return device

    async def get_device(self, device_id: str) -> Optional[Device]:
        """获取设备信息"""
        result = await self.db.execute(
            select(Device).where(Device.id == device_id)
        )
        return result.scalar_one_or_none()

    async def list_devices(
        self,
        skip: int = 0,
        limit: int = 100,
        status: Optional[str] = None
    ) -> List[Device]:
        """获取设备列表"""
        query = select(Device)

        if status:
            query = query.where(Device.status == status)

        query = query.offset(skip).limit(limit)
        result = await self.db.execute(query)
        return result.scalars().all()

    async def update_device_status(
        self,
        device_id: str,
        status: str
    ) -> Optional[Device]:
        """更新设备状态"""
        device = await self.get_device(device_id)
        if not device:
            return None

        device.status = status
        if status == "online":
            device.last_online = datetime.utcnow()
        elif status == "offline":
            device.last_offline = datetime.utcnow()

        await self.db.commit()
        await self.db.refresh(device)

        return device

    async def delete_device(self, device_id: str) -> bool:
        """删除设备"""
        device = await self.get_device(device_id)
        if not device:
            return False

        await self.db.delete(device)
        await self.db.commit()

        return True

2.4 实现数据存储服务

创建 backend/app/services/telemetry_service.py

from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
from datetime import datetime
from typing import List, Dict, Any

class TelemetryService:
    """遥测数据服务"""

    def __init__(self, url: str, token: str, org: str, bucket: str):
        self.client = InfluxDBClient(url=url, token=token, org=org)
        self.write_api = self.client.write_api(write_options=SYNCHRONOUS)
        self.query_api = self.client.query_api()
        self.org = org
        self.bucket = bucket

    async def write_telemetry(
        self,
        device_id: str,
        data: Dict[str, Any],
        timestamp: datetime = None
    ):
        """写入遥测数据"""
        point = Point("telemetry") \
            .tag("device_id", device_id) \
            .time(timestamp or datetime.utcnow())

        # 添加所有数据字段
        for key, value in data.items():
            if isinstance(value, (int, float)):
                point = point.field(key, value)
            elif isinstance(value, str):
                point = point.field(key, value)
            elif isinstance(value, bool):
                point = point.field(key, value)

        self.write_api.write(bucket=self.bucket, record=point)

    async def query_latest(
        self,
        device_id: str,
        limit: int = 10
    ) -> List[Dict]:
        """查询最新数据"""
        query = f'''
        from(bucket: "{self.bucket}")
          |> range(start: -24h)
          |> filter(fn: (r) => r["_measurement"] == "telemetry")
          |> filter(fn: (r) => r["device_id"] == "{device_id}")
          |> sort(columns: ["_time"], desc: true)
          |> limit(n: {limit})
        '''

        result = self.query_api.query(query=query, org=self.org)

        # 解析结果
        data_points = []
        for table in result:
            for record in table.records:
                data_points.append({
                    "time": record.get_time().isoformat(),
                    "field": record.get_field(),
                    "value": record.get_value(),
                    "device_id": record.values.get("device_id")
                })

        return data_points

    async def query_range(
        self,
        device_id: str,
        start: datetime,
        end: datetime,
        field: str = None
    ) -> List[Dict]:
        """查询时间范围数据"""
        query = f'''
        from(bucket: "{self.bucket}")
          |> range(start: {start.isoformat()}Z, stop: {end.isoformat()}Z)
          |> filter(fn: (r) => r["_measurement"] == "telemetry")
          |> filter(fn: (r) => r["device_id"] == "{device_id}")
        '''

        if field:
            query += f'  |> filter(fn: (r) => r["_field"] == "{field}")'

        result = self.query_api.query(query=query, org=self.org)

        data_points = []
        for table in result:
            for record in table.records:
                data_points.append({
                    "time": record.get_time().isoformat(),
                    "field": record.get_field(),
                    "value": record.get_value()
                })

        return data_points

2.5 实现MQTT消息处理

创建 backend/app/services/mqtt_service.py

import paho.mqtt.client as mqtt
import json
from loguru import logger
from typing import Callable

class MQTTService:
    """MQTT服务"""

    def __init__(
        self,
        broker_host: str,
        broker_port: int = 1883,
        username: str = None,
        password: str = None
    ):
        self.broker_host = broker_host
        self.broker_port = broker_port
        self.client = mqtt.Client()

        if username and password:
            self.client.username_pw_set(username, password)

        # 设置回调
        self.client.on_connect = self._on_connect
        self.client.on_message = self._on_message
        self.client.on_disconnect = self._on_disconnect

        # 消息处理器
        self.message_handlers = {}

    def _on_connect(self, client, userdata, flags, rc):
        """连接回调"""
        if rc == 0:
            logger.info(f"Connected to MQTT broker: {self.broker_host}")
            # 订阅设备上行主题
            self.client.subscribe("device/+/telemetry")
            self.client.subscribe("device/+/event")
        else:
            logger.error(f"Failed to connect to MQTT broker, code: {rc}")

    def _on_message(self, client, userdata, msg):
        """消息回调"""
        try:
            topic = msg.topic
            payload = json.loads(msg.payload.decode())

            logger.info(f"Received message on topic: {topic}")

            # 解析设备ID
            parts = topic.split('/')
            if len(parts) >= 3:
                device_id = parts[1]
                message_type = parts[2]

                # 调用对应的处理器
                handler_key = f"{message_type}"
                if handler_key in self.message_handlers:
                    self.message_handlers[handler_key](device_id, payload)

        except Exception as e:
            logger.error(f"Error processing message: {e}")

    def _on_disconnect(self, client, userdata, rc):
        """断开连接回调"""
        logger.warning(f"Disconnected from MQTT broker, code: {rc}")

    def connect(self):
        """连接到MQTT Broker"""
        self.client.connect(self.broker_host, self.broker_port, 60)
        self.client.loop_start()

    def disconnect(self):
        """断开连接"""
        self.client.loop_stop()
        self.client.disconnect()

    def register_handler(self, message_type: str, handler: Callable):
        """注册消息处理器"""
        self.message_handlers[message_type] = handler

    def publish(self, topic: str, payload: dict):
        """发布消息"""
        self.client.publish(topic, json.dumps(payload))

    def send_command(self, device_id: str, command: dict):
        """发送命令到设备"""
        topic = f"device/{device_id}/command"
        self.publish(topic, command)

2.6 创建API接口

创建 backend/app/api/devices.py

from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from typing import List

from app.services.device_service import DeviceService
from app.models.device import Device
from pydantic import BaseModel

router = APIRouter(prefix="/devices", tags=["devices"])

# 请求模型
class DeviceCreate(BaseModel):
    name: str
    device_type: str
    product_key: str
    config: dict = {}
    tags: list = []

class DeviceUpdate(BaseModel):
    name: str = None
    config: dict = None
    tags: list = None

# 响应模型
class DeviceResponse(BaseModel):
    id: str
    name: str
    device_type: str
    status: str
    last_online: str = None
    config: dict
    tags: list
    created_at: str

@router.post("/", response_model=DeviceResponse, status_code=status.HTTP_201_CREATED)
async def create_device(
    device: DeviceCreate,
    db: Session = Depends(get_db)
):
    """创建设备"""
    service = DeviceService(db)
    new_device = await service.create_device(
        name=device.name,
        device_type=device.device_type,
        product_key=device.product_key,
        config=device.config,
        tags=device.tags
    )
    return new_device.to_dict()

@router.get("/", response_model=List[DeviceResponse])
async def list_devices(
    skip: int = 0,
    limit: int = 100,
    status: str = None,
    db: Session = Depends(get_db)
):
    """获取设备列表"""
    service = DeviceService(db)
    devices = await service.list_devices(skip=skip, limit=limit, status=status)
    return [device.to_dict() for device in devices]

@router.get("/{device_id}", response_model=DeviceResponse)
async def get_device(
    device_id: str,
    db: Session = Depends(get_db)
):
    """获取设备详情"""
    service = DeviceService(db)
    device = await service.get_device(device_id)
    if not device:
        raise HTTPException(status_code=404, detail="Device not found")
    return device.to_dict()

@router.delete("/{device_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_device(
    device_id: str,
    db: Session = Depends(get_db)
):
    """删除设备"""
    service = DeviceService(db)
    success = await service.delete_device(device_id)
    if not success:
        raise HTTPException(status_code=404, detail="Device not found")

创建 backend/app/api/telemetry.py

from fastapi import APIRouter, Depends, Query
from datetime import datetime, timedelta
from typing import List

from app.services.telemetry_service import TelemetryService

router = APIRouter(prefix="/telemetry", tags=["telemetry"])

@router.get("/{device_id}/latest")
async def get_latest_telemetry(
    device_id: str,
    limit: int = Query(10, ge=1, le=100),
    telemetry_service: TelemetryService = Depends(get_telemetry_service)
):
    """获取设备最新数据"""
    data = await telemetry_service.query_latest(device_id, limit)
    return {"device_id": device_id, "data": data}

@router.get("/{device_id}/history")
async def get_telemetry_history(
    device_id: str,
    start: datetime = Query(default=None),
    end: datetime = Query(default=None),
    field: str = Query(default=None),
    telemetry_service: TelemetryService = Depends(get_telemetry_service)
):
    """获取设备历史数据"""
    if not start:
        start = datetime.utcnow() - timedelta(hours=24)
    if not end:
        end = datetime.utcnow()

    data = await telemetry_service.query_range(device_id, start, end, field)
    return {
        "device_id": device_id,
        "start": start.isoformat(),
        "end": end.isoformat(),
        "data": data
    }

2.7 创建主应用

创建 backend/app/main.py

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from contextlib import asynccontextmanager
from loguru import logger

from app.api import devices, telemetry
from app.services.mqtt_service import MQTTService
from app.services.telemetry_service import TelemetryService
from app.services.device_service import DeviceService
from app.core.config import settings
from app.core.database import engine, Base

# MQTT服务实例
mqtt_service = None
telemetry_service = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    """应用生命周期管理"""
    global mqtt_service, telemetry_service

    # 启动时初始化
    logger.info("Starting IoT Platform...")

    # 创建数据库表
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

    # 初始化服务
    telemetry_service = TelemetryService(
        url=settings.INFLUXDB_URL,
        token=settings.INFLUXDB_TOKEN,
        org=settings.INFLUXDB_ORG,
        bucket=settings.INFLUXDB_BUCKET
    )

    mqtt_service = MQTTService(
        broker_host=settings.MQTT_BROKER_HOST,
        broker_port=settings.MQTT_BROKER_PORT
    )

    # 注册MQTT消息处理器
    async def handle_telemetry(device_id: str, payload: dict):
        """处理遥测数据"""
        try:
            await telemetry_service.write_telemetry(device_id, payload)
            logger.info(f"Stored telemetry from {device_id}")
        except Exception as e:
            logger.error(f"Error storing telemetry: {e}")

    mqtt_service.register_handler("telemetry", handle_telemetry)
    mqtt_service.connect()

    logger.info("IoT Platform started successfully")

    yield

    # 关闭时清理
    logger.info("Shutting down IoT Platform...")
    mqtt_service.disconnect()
    await engine.dispose()

# 创建FastAPI应用
app = FastAPI(
    title="IoT Platform API",
    description="私有IoT平台API接口",
    version="1.0.0",
    lifespan=lifespan
)

# 配置CORS
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# 注册路由
app.include_router(devices.router, prefix="/api/v1")
app.include_router(telemetry.router, prefix="/api/v1")

@app.get("/")
async def root():
    """根路径"""
    return {
        "name": "IoT Platform API",
        "version": "1.0.0",
        "status": "running"
    }

@app.get("/health")
async def health_check():
    """健康检查"""
    return {"status": "healthy"}

创建 backend/app/core/config.py

from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    """应用配置"""

    # 应用配置
    APP_NAME: str = "IoT Platform"
    DEBUG: bool = True

    # 数据库配置
    DATABASE_URL: str = "postgresql+asyncpg://iot_user:iot_password@localhost:5432/iot_platform"

    # Redis配置
    REDIS_URL: str = "redis://localhost:6379/0"

    # InfluxDB配置
    INFLUXDB_URL: str = "http://localhost:8086"
    INFLUXDB_TOKEN: str = "my-super-secret-auth-token"
    INFLUXDB_ORG: str = "iot-org"
    INFLUXDB_BUCKET: str = "iot-data"

    # MQTT配置
    MQTT_BROKER_HOST: str = "localhost"
    MQTT_BROKER_PORT: int = 1883

    class Config:
        env_file = ".env"

settings = Settings()

阶段3:规则引擎开发 (预计2小时)

3.1 规则引擎设计

创建 backend/app/services/rule_engine.py

from typing import Dict, Any, Callable, List
from loguru import logger
import asyncio

class Rule:
    """规则定义"""

    def __init__(
        self,
        rule_id: str,
        name: str,
        condition: Callable,
        actions: List[Callable],
        enabled: bool = True
    ):
        self.rule_id = rule_id
        self.name = name
        self.condition = condition
        self.actions = actions
        self.enabled = enabled

    async def evaluate(self, data: Dict[str, Any]) -> bool:
        """评估规则条件"""
        try:
            return await self.condition(data)
        except Exception as e:
            logger.error(f"Error evaluating rule {self.rule_id}: {e}")
            return False

    async def execute(self, data: Dict[str, Any]):
        """执行规则动作"""
        if not self.enabled:
            return

        if await self.evaluate(data):
            logger.info(f"Rule {self.rule_id} triggered")
            for action in self.actions:
                try:
                    await action(data)
                except Exception as e:
                    logger.error(f"Error executing action: {e}")

class RuleEngine:
    """规则引擎"""

    def __init__(self):
        self.rules: Dict[str, Rule] = {}

    def add_rule(self, rule: Rule):
        """添加规则"""
        self.rules[rule.rule_id] = rule
        logger.info(f"Added rule: {rule.name}")

    def remove_rule(self, rule_id: str):
        """移除规则"""
        if rule_id in self.rules:
            del self.rules[rule_id]
            logger.info(f"Removed rule: {rule_id}")

    def enable_rule(self, rule_id: str):
        """启用规则"""
        if rule_id in self.rules:
            self.rules[rule_id].enabled = True

    def disable_rule(self, rule_id: str):
        """禁用规则"""
        if rule_id in self.rules:
            self.rules[rule_id].enabled = False

    async def process_data(self, device_id: str, data: Dict[str, Any]):
        """处理数据,触发规则"""
        context = {
            "device_id": device_id,
            "data": data
        }

        # 并发执行所有规则
        tasks = [
            rule.execute(context)
            for rule in self.rules.values()
            if rule.enabled
        ]

        if tasks:
            await asyncio.gather(*tasks, return_exceptions=True)

3.2 实现常用规则

创建 backend/app/rules/common_rules.py

from app.services.rule_engine import Rule
from loguru import logger

# 温度告警规则
async def temperature_condition(data: dict) -> bool:
    """温度超过阈值"""
    temp = data.get("data", {}).get("temperature")
    if temp is None:
        return False
    return temp > 30.0

async def temperature_alert_action(data: dict):
    """发送温度告警"""
    device_id = data.get("device_id")
    temp = data.get("data", {}).get("temperature")
    logger.warning(f"Temperature alert: Device {device_id}, Temp: {temp}°C")
    # 这里可以发送邮件、短信、推送通知等

temperature_alert_rule = Rule(
    rule_id="temp_alert",
    name="温度告警规则",
    condition=temperature_condition,
    actions=[temperature_alert_action]
)

# 设备离线告警规则
async def device_offline_condition(data: dict) -> bool:
    """设备离线"""
    return data.get("event_type") == "offline"

async def device_offline_action(data: dict):
    """设备离线处理"""
    device_id = data.get("device_id")
    logger.warning(f"Device offline: {device_id}")
    # 发送告警通知

device_offline_rule = Rule(
    rule_id="device_offline",
    name="设备离线告警",
    condition=device_offline_condition,
    actions=[device_offline_action]
)

# 数据转发规则
async def data_forward_condition(data: dict) -> bool:
    """所有数据都转发"""
    return True

async def data_forward_action(data: dict):
    """转发数据到第三方系统"""
    # 这里可以调用第三方API
    logger.info(f"Forwarding data: {data}")

data_forward_rule = Rule(
    rule_id="data_forward",
    name="数据转发规则",
    condition=data_forward_condition,
    actions=[data_forward_action],
    enabled=False  # 默认禁用
)

阶段4:前端开发 (预计2小时)

4.1 初始化Vue项目

cd frontend

# 使用Vite创建Vue3项目
npm create vite@latest . -- --template vue-ts

# 安装依赖
npm install

# 安装UI组件库和其他依赖
npm install element-plus
npm install @element-plus/icons-vue
npm install axios
npm install echarts
npm install vue-router@4
npm install pinia

4.2 创建设备管理页面

创建 frontend/src/views/DeviceList.vue

<template>
  <div class="device-list">
    <el-card>
      <template #header>
        <div class="card-header">
          <span>设备管理</span>
          <el-button type="primary" @click="showCreateDialog">
            <el-icon><Plus /></el-icon>
            添加设备
          </el-button>
        </div>
      </template>

      <!-- 搜索栏 -->
      <el-form :inline="true" :model="searchForm">
        <el-form-item label="设备状态">
          <el-select v-model="searchForm.status" placeholder="全部" clearable>
            <el-option label="在线" value="online" />
            <el-option label="离线" value="offline" />
          </el-select>
        </el-form-item>
        <el-form-item>
          <el-button type="primary" @click="loadDevices">查询</el-button>
        </el-form-item>
      </el-form>

      <!-- 设备列表 -->
      <el-table :data="devices" style="width: 100%">
        <el-table-column prop="id" label="设备ID" width="180" />
        <el-table-column prop="name" label="设备名称" width="150" />
        <el-table-column prop="device_type" label="设备类型" width="120" />
        <el-table-column prop="status" label="状态" width="100">
          <template #default="scope">
            <el-tag :type="scope.row.status === 'online' ? 'success' : 'info'">
              {{ scope.row.status === 'online' ? '在线' : '离线' }}
            </el-tag>
          </template>
        </el-table-column>
        <el-table-column prop="last_online" label="最后在线" width="180" />
        <el-table-column label="操作" fixed="right" width="200">
          <template #default="scope">
            <el-button size="small" @click="viewDevice(scope.row)">
              查看
            </el-button>
            <el-button size="small" type="danger" @click="deleteDevice(scope.row)">
              删除
            </el-button>
          </template>
        </el-table-column>
      </el-table>

      <!-- 分页 -->
      <el-pagination
        v-model:current-page="pagination.page"
        v-model:page-size="pagination.pageSize"
        :total="pagination.total"
        @current-change="loadDevices"
      />
    </el-card>

    <!-- 创建设备对话框 -->
    <el-dialog v-model="createDialogVisible" title="添加设备" width="500px">
      <el-form :model="createForm" label-width="100px">
        <el-form-item label="设备名称">
          <el-input v-model="createForm.name" />
        </el-form-item>
        <el-form-item label="设备类型">
          <el-input v-model="createForm.device_type" />
        </el-form-item>
        <el-form-item label="产品密钥">
          <el-input v-model="createForm.product_key" />
        </el-form-item>
      </el-form>
      <template #footer>
        <el-button @click="createDialogVisible = false">取消</el-button>
        <el-button type="primary" @click="createDevice">确定</el-button>
      </template>
    </el-dialog>
  </div>
</template>

<script setup lang="ts">
import { ref, onMounted } from 'vue'
import { ElMessage } from 'element-plus'
import { Plus } from '@element-plus/icons-vue'
import axios from 'axios'

const API_BASE = 'http://localhost:8000/api/v1'

// 设备列表
const devices = ref([])
const searchForm = ref({ status: '' })
const pagination = ref({
  page: 1,
  pageSize: 10,
  total: 0
})

// 创建设备
const createDialogVisible = ref(false)
const createForm = ref({
  name: '',
  device_type: '',
  product_key: ''
})

// 加载设备列表
const loadDevices = async () => {
  try {
    const params = {
      skip: (pagination.value.page - 1) * pagination.value.pageSize,
      limit: pagination.value.pageSize,
      status: searchForm.value.status || undefined
    }
    const response = await axios.get(`${API_BASE}/devices`, { params })
    devices.value = response.data
  } catch (error) {
    ElMessage.error('加载设备列表失败')
  }
}

// 显示创建对话框
const showCreateDialog = () => {
  createDialogVisible.value = true
}

// 创建设备
const createDevice = async () => {
  try {
    await axios.post(`${API_BASE}/devices`, createForm.value)
    ElMessage.success('设备创建成功')
    createDialogVisible.value = false
    loadDevices()
  } catch (error) {
    ElMessage.error('设备创建失败')
  }
}

// 查看设备
const viewDevice = (device: any) => {
  // 跳转到设备详情页
  console.log('View device:', device)
}

// 删除设备
const deleteDevice = async (device: any) => {
  try {
    await axios.delete(`${API_BASE}/devices/${device.id}`)
    ElMessage.success('设备删除成功')
    loadDevices()
  } catch (error) {
    ElMessage.error('设备删除失败')
  }
}

onMounted(() => {
  loadDevices()
})
</script>

<style scoped>
.card-header {
  display: flex;
  justify-content: space-between;
  align-items: center;
}
</style>

4.3 创建数据监控页面

创建 frontend/src/views/DataMonitor.vue

<template>
  <div class="data-monitor">
    <el-row :gutter="20">
      <!-- 统计卡片 -->
      <el-col :span="6">
        <el-card>
          <el-statistic title="在线设备" :value="stats.online" />
        </el-card>
      </el-col>
      <el-col :span="6">
        <el-card>
          <el-statistic title="离线设备" :value="stats.offline" />
        </el-card>
      </el-col>
      <el-col :span="6">
        <el-card>
          <el-statistic title="今日消息" :value="stats.messages" />
        </el-card>
      </el-col>
      <el-col :span="6">
        <el-card>
          <el-statistic title="告警数量" :value="stats.alerts" />
        </el-card>
      </el-col>
    </el-row>

    <!-- 实时数据流 -->
    <el-card style="margin-top: 20px">
      <template #header>
        <span>实时数据流</span>
      </template>
      <el-table :data="realtimeData" max-height="300">
        <el-table-column prop="device_id" label="设备ID" width="150" />
        <el-table-column prop="timestamp" label="时间" width="180" />
        <el-table-column prop="data" label="数据">
          <template #default="scope">
            {{ JSON.stringify(scope.row.data) }}
          </template>
        </el-table-column>
      </el-table>
    </el-card>

    <!-- 数据图表 -->
    <el-card style="margin-top: 20px">
      <template #header>
        <span>数据趋势</span>
      </template>
      <div ref="chartRef" style="width: 100%; height: 400px"></div>
    </el-card>
  </div>
</template>

<script setup lang="ts">
import { ref, onMounted, onUnmounted } from 'vue'
import * as echarts from 'echarts'
import axios from 'axios'

const API_BASE = 'http://localhost:8000/api/v1'

// 统计数据
const stats = ref({
  online: 0,
  offline: 0,
  messages: 0,
  alerts: 0
})

// 实时数据
const realtimeData = ref([])

// 图表
const chartRef = ref()
let chart: echarts.ECharts | null = null

// 加载统计数据
const loadStats = async () => {
  try {
    const response = await axios.get(`${API_BASE}/devices`)
    const devices = response.data
    stats.value.online = devices.filter((d: any) => d.status === 'online').length
    stats.value.offline = devices.filter((d: any) => d.status === 'offline').length
  } catch (error) {
    console.error('Failed to load stats:', error)
  }
}

// 初始化图表
const initChart = () => {
  if (!chartRef.value) return

  chart = echarts.init(chartRef.value)

  const option = {
    title: {
      text: '温度趋势'
    },
    tooltip: {
      trigger: 'axis'
    },
    xAxis: {
      type: 'time'
    },
    yAxis: {
      type: 'value',
      name: '温度 (°C)'
    },
    series: [{
      name: '温度',
      type: 'line',
      data: [],
      smooth: true
    }]
  }

  chart.setOption(option)
}

// 更新图表数据
const updateChart = async (deviceId: string) => {
  try {
    const response = await axios.get(`${API_BASE}/telemetry/${deviceId}/latest`, {
      params: { limit: 50 }
    })

    const data = response.data.data
      .filter((d: any) => d.field === 'temperature')
      .map((d: any) => [d.time, d.value])

    if (chart) {
      chart.setOption({
        series: [{
          data: data
        }]
      })
    }
  } catch (error) {
    console.error('Failed to update chart:', error)
  }
}

// 模拟实时数据更新
let updateInterval: number

onMounted(() => {
  loadStats()
  initChart()

  // 定时更新
  updateInterval = setInterval(() => {
    loadStats()
    // updateChart('device-001')  // 更新特定设备的图表
  }, 5000)
})

onUnmounted(() => {
  if (updateInterval) {
    clearInterval(updateInterval)
  }
  if (chart) {
    chart.dispose()
  }
})
</script>

阶段5:测试设备开发 (预计1小时)

5.1 ESP32测试设备代码

创建ESP32设备代码用于测试平台功能:

// ESP32 IoT设备示例代码
#include <WiFi.h>
#include <PubSubClient.h>
#include <DHT.h>
#include <ArduinoJson.h>

// WiFi配置
const char* ssid = "your-wifi-ssid";
const char* password = "your-wifi-password";

// MQTT配置
const char* mqtt_server = "192.168.1.100";  // IoT平台IP
const int mqtt_port = 1883;
const char* device_id = "device-001";
const char* device_secret = "your-device-secret";

// DHT22传感器
#define DHTPIN 4
#define DHTTYPE DHT22
DHT dht(DHTPIN, DHTTYPE);

// MQTT客户端
WiFiClient espClient;
PubSubClient client(espClient);

// MQTT主题
String telemetry_topic = "device/" + String(device_id) + "/telemetry";
String command_topic = "device/" + String(device_id) + "/command";

void setup() {
  Serial.begin(115200);

  // 初始化DHT传感器
  dht.begin();

  // 连接WiFi
  setup_wifi();

  // 配置MQTT
  client.setServer(mqtt_server, mqtt_port);
  client.setCallback(mqtt_callback);
}

void setup_wifi() {
  Serial.println("Connecting to WiFi...");
  WiFi.begin(ssid, password);

  while (WiFi.status() != WL_CONNECTED) {
    delay(500);
    Serial.print(".");
  }

  Serial.println("\nWiFi connected");
  Serial.print("IP address: ");
  Serial.println(WiFi.localIP());
}

void reconnect() {
  while (!client.connected()) {
    Serial.print("Attempting MQTT connection...");

    // 使用设备ID和密钥连接
    if (client.connect(device_id, device_id, device_secret)) {
      Serial.println("connected");

      // 订阅命令主题
      client.subscribe(command_topic.c_str());
    } else {
      Serial.print("failed, rc=");
      Serial.print(client.state());
      Serial.println(" try again in 5 seconds");
      delay(5000);
    }
  }
}

void mqtt_callback(char* topic, byte* payload, unsigned int length) {
  Serial.print("Message arrived [");
  Serial.print(topic);
  Serial.print("] ");

  // 解析JSON命令
  StaticJsonDocument<256> doc;
  deserializeJson(doc, payload, length);

  String command = doc["command"];
  Serial.println(command);

  // 处理命令
  if (command == "reboot") {
    Serial.println("Rebooting...");
    ESP.restart();
  }
}

void send_telemetry() {
  // 读取传感器数据
  float temperature = dht.readTemperature();
  float humidity = dht.readHumidity();

  if (isnan(temperature) || isnan(humidity)) {
    Serial.println("Failed to read from DHT sensor!");
    return;
  }

  // 构建JSON数据
  StaticJsonDocument<256> doc;
  doc["temperature"] = temperature;
  doc["humidity"] = humidity;
  doc["voltage"] = analogRead(A0) * 3.3 / 4095.0;
  doc["rssi"] = WiFi.RSSI();

  char buffer[256];
  serializeJson(doc, buffer);

  // 发布数据
  if (client.publish(telemetry_topic.c_str(), buffer)) {
    Serial.print("Telemetry sent: ");
    Serial.println(buffer);
  } else {
    Serial.println("Failed to send telemetry");
  }
}

void loop() {
  if (!client.connected()) {
    reconnect();
  }
  client.loop();

  // 每10秒发送一次数据
  static unsigned long last_send = 0;
  if (millis() - last_send > 10000) {
    send_telemetry();
    last_send = millis();
  }
}

5.2 Python模拟设备

创建 tools/simulator.py 用于模拟多个设备:

import paho.mqtt.client as mqtt
import json
import time
import random
from datetime import datetime

class DeviceSimulator:
    """设备模拟器"""

    def __init__(self, device_id: str, broker_host: str, broker_port: int = 1883):
        self.device_id = device_id
        self.broker_host = broker_host
        self.broker_port = broker_port

        self.client = mqtt.Client(client_id=device_id)
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message

        self.running = False

    def on_connect(self, client, userdata, flags, rc):
        """连接回调"""
        if rc == 0:
            print(f"[{self.device_id}] Connected to MQTT broker")
            # 订阅命令主题
            self.client.subscribe(f"device/{self.device_id}/command")
        else:
            print(f"[{self.device_id}] Connection failed with code {rc}")

    def on_message(self, client, userdata, msg):
        """消息回调"""
        try:
            payload = json.loads(msg.payload.decode())
            print(f"[{self.device_id}] Received command: {payload}")
        except Exception as e:
            print(f"[{self.device_id}] Error processing message: {e}")

    def generate_telemetry(self) -> dict:
        """生成模拟数据"""
        return {
            "temperature": round(random.uniform(20.0, 30.0), 2),
            "humidity": round(random.uniform(40.0, 80.0), 2),
            "voltage": round(random.uniform(3.0, 4.2), 2),
            "current": round(random.uniform(50, 200), 2),
            "timestamp": datetime.utcnow().isoformat()
        }

    def send_telemetry(self):
        """发送遥测数据"""
        data = self.generate_telemetry()
        topic = f"device/{self.device_id}/telemetry"
        payload = json.dumps(data)

        self.client.publish(topic, payload)
        print(f"[{self.device_id}] Sent: {payload}")

    def start(self):
        """启动模拟器"""
        self.client.connect(self.broker_host, self.broker_port, 60)
        self.client.loop_start()
        self.running = True

        try:
            while self.running:
                self.send_telemetry()
                time.sleep(10)  # 每10秒发送一次
        except KeyboardInterrupt:
            print(f"\n[{self.device_id}] Stopping...")
        finally:
            self.stop()

    def stop(self):
        """停止模拟器"""
        self.running = False
        self.client.loop_stop()
        self.client.disconnect()

if __name__ == "__main__":
    # 创建多个模拟设备
    import threading

    broker_host = "localhost"
    device_count = 5

    simulators = []
    threads = []

    for i in range(device_count):
        device_id = f"simulator-{i+1:03d}"
        simulator = DeviceSimulator(device_id, broker_host)
        simulators.append(simulator)

        thread = threading.Thread(target=simulator.start)
        thread.daemon = True
        thread.start()
        threads.append(thread)

    print(f"Started {device_count} device simulators")
    print("Press Ctrl+C to stop")

    try:
        for thread in threads:
            thread.join()
    except KeyboardInterrupt:
        print("\nStopping all simulators...")
        for simulator in simulators:
            simulator.stop()

阶段6:系统集成与部署 (预计1小时)

6.1 完整的Docker Compose配置

更新 docker/docker-compose.yml 添加API和前端服务:

version: '3.8'

services:
  # PostgreSQL数据库
  postgres:
    image: postgres:14-alpine
    container_name: iot-postgres
    environment:
      POSTGRES_DB: iot_platform
      POSTGRES_USER: iot_user
      POSTGRES_PASSWORD: iot_password
    ports:
      - "5432:5432"
    volumes:
      - ../data/postgres:/var/lib/postgresql/data
    networks:
      - iot-network
    restart: unless-stopped
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U iot_user"]
      interval: 10s
      timeout: 5s
      retries: 5

  # Redis缓存
  redis:
    image: redis:7-alpine
    container_name: iot-redis
    command: redis-server --appendonly yes
    ports:
      - "6379:6379"
    volumes:
      - ../data/redis:/data
    networks:
      - iot-network
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 3s
      retries: 5

  # InfluxDB时序数据库
  influxdb:
    image: influxdb:2.7-alpine
    container_name: iot-influxdb
    environment:
      DOCKER_INFLUXDB_INIT_MODE: setup
      DOCKER_INFLUXDB_INIT_USERNAME: admin
      DOCKER_INFLUXDB_INIT_PASSWORD: adminpassword
      DOCKER_INFLUXDB_INIT_ORG: iot-org
      DOCKER_INFLUXDB_INIT_BUCKET: iot-data
      DOCKER_INFLUXDB_INIT_ADMIN_TOKEN: my-super-secret-auth-token
    ports:
      - "8086:8086"
    volumes:
      - ../data/influxdb:/var/lib/influxdb2
    networks:
      - iot-network
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8086/health"]
      interval: 10s
      timeout: 5s
      retries: 5

  # EMQX MQTT Broker
  emqx:
    image: emqx/emqx:5.0.0
    container_name: iot-emqx
    environment:
      EMQX_NAME: emqx
      EMQX_HOST: 127.0.0.1
    ports:
      - "1883:1883"      # MQTT
      - "8883:8883"      # MQTT/SSL
      - "8083:8083"      # WebSocket
      - "18083:18083"    # Dashboard
    volumes:
      - ../data/emqx:/opt/emqx/data
    networks:
      - iot-network
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "emqx", "ping"]
      interval: 10s
      timeout: 5s
      retries: 5

  # API服务
  api:
    build:
      context: ../backend
      dockerfile: Dockerfile
    container_name: iot-api
    environment:
      DATABASE_URL: postgresql+asyncpg://iot_user:iot_password@postgres:5432/iot_platform
      REDIS_URL: redis://redis:6379/0
      INFLUXDB_URL: http://influxdb:8086
      INFLUXDB_TOKEN: my-super-secret-auth-token
      INFLUXDB_ORG: iot-org
      INFLUXDB_BUCKET: iot-data
      MQTT_BROKER_HOST: emqx
      MQTT_BROKER_PORT: 1883
    ports:
      - "8000:8000"
    depends_on:
      postgres:
        condition: service_healthy
      redis:
        condition: service_healthy
      influxdb:
        condition: service_healthy
      emqx:
        condition: service_healthy
    networks:
      - iot-network
    restart: unless-stopped

  # Nginx反向代理
  nginx:
    image: nginx:alpine
    container_name: iot-nginx
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ../config/nginx.conf:/etc/nginx/nginx.conf:ro
      - ../frontend/dist:/usr/share/nginx/html:ro
    depends_on:
      - api
    networks:
      - iot-network
    restart: unless-stopped

networks:
  iot-network:
    driver: bridge

6.2 创建API Dockerfile

创建 backend/Dockerfile

FROM python:3.9-slim

WORKDIR /app

# 安装系统依赖
RUN apt-get update && apt-get install -y \
    gcc \
    postgresql-client \
    && rm -rf /var/lib/apt/lists/*

# 复制依赖文件
COPY requirements.txt .

# 安装Python依赖
RUN pip install --no-cache-dir -r requirements.txt

# 复制应用代码
COPY app/ ./app/

# 暴露端口
EXPOSE 8000

# 启动命令
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]

6.3 配置Nginx

创建 config/nginx.conf

events {
    worker_connections 1024;
}

http {
    include /etc/nginx/mime.types;
    default_type application/octet-stream;

    # 日志配置
    access_log /var/log/nginx/access.log;
    error_log /var/log/nginx/error.log;

    # 上游API服务
    upstream api_backend {
        server api:8000;
    }

    # HTTP服务器
    server {
        listen 80;
        server_name localhost;

        # 前端静态文件
        location / {
            root /usr/share/nginx/html;
            index index.html;
            try_files $uri $uri/ /index.html;
        }

        # API代理
        location /api/ {
            proxy_pass http://api_backend;
            proxy_set_header Host $host;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
            proxy_set_header X-Forwarded-Proto $scheme;
        }

        # WebSocket支持
        location /ws/ {
            proxy_pass http://api_backend;
            proxy_http_version 1.1;
            proxy_set_header Upgrade $http_upgrade;
            proxy_set_header Connection "upgrade";
            proxy_set_header Host $host;
        }

        # EMQX Dashboard代理
        location /emqx/ {
            proxy_pass http://emqx:18083/;
            proxy_set_header Host $host;
        }
    }
}

6.4 一键部署脚本

创建 deploy.sh

#!/bin/bash

echo "=== IoT Platform Deployment ==="

# 检查Docker
if ! command -v docker &> /dev/null; then
    echo "Error: Docker is not installed"
    exit 1
fi

if ! command -v docker-compose &> /dev/null; then
    echo "Error: Docker Compose is not installed"
    exit 1
fi

# 创建数据目录
echo "Creating data directories..."
mkdir -p data/{postgres,influxdb,redis,emqx}
mkdir -p logs

# 构建前端
echo "Building frontend..."
cd frontend
npm install
npm run build
cd ..

# 启动服务
echo "Starting services..."
cd docker
docker-compose up -d

# 等待服务启动
echo "Waiting for services to start..."
sleep 10

# 检查服务状态
echo "Checking service status..."
docker-compose ps

# 显示访问信息
echo ""
echo "=== Deployment Complete ==="
echo "Web UI: http://localhost"
echo "API: http://localhost/api/v1"
echo "EMQX Dashboard: http://localhost:18083 (admin/public)"
echo "InfluxDB: http://localhost:8086"
echo ""
echo "To view logs: docker-compose logs -f"
echo "To stop: docker-compose down"

6.5 启动平台

# 赋予执行权限
chmod +x deploy.sh

# 执行部署
./deploy.sh

# 查看日志
cd docker
docker-compose logs -f api

# 测试API
curl http://localhost/api/v1/health

# 访问Web界面
# 浏览器打开 http://localhost

测试验证

功能测试清单

1. 设备管理测试

# 创建设备
curl -X POST http://localhost/api/v1/devices \
  -H "Content-Type: application/json" \
  -d '{
    "name": "Test Device",
    "device_type": "sensor",
    "product_key": "test-product"
  }'

# 获取设备列表
curl http://localhost/api/v1/devices

# 获取设备详情
curl http://localhost/api/v1/devices/{device_id}

# 删除设备
curl -X DELETE http://localhost/api/v1/devices/{device_id}

2. MQTT连接测试

使用MQTTX工具测试:

连接配置:
- Host: localhost
- Port: 1883
- Client ID: test-client
- Username: (设备ID)
- Password: (设备密钥)

发布测试:
- Topic: device/test-device/telemetry
- Payload: {"temperature": 25.5, "humidity": 65.0}

订阅测试:
- Topic: device/test-device/command

3. 数据存储测试

# 查询最新数据
curl "http://localhost/api/v1/telemetry/test-device/latest?limit=10"

# 查询历史数据
curl "http://localhost/api/v1/telemetry/test-device/history?start=2024-01-01T00:00:00Z&end=2024-01-02T00:00:00Z"

4. 规则引擎测试

发送触发规则的数据:

# 发送高温数据(触发告警)
mosquitto_pub -h localhost -t "device/test-device/telemetry" \
  -m '{"temperature": 35.0, "humidity": 60.0}'

# 查看日志确认规则触发
docker-compose logs -f api | grep "Temperature alert"

性能测试

1. 并发连接测试

使用设备模拟器测试:

# 启动100个模拟设备
python tools/simulator.py --count 100 --broker localhost

# 监控系统资源
docker stats

# 查看EMQX连接数
curl http://localhost:18083/api/v5/stats

2. 消息吞吐测试

# 使用MQTT压测工具
mqtt-benchmark \
  --broker tcp://localhost:1883 \
  --count 1000 \
  --size 256 \
  --clients 100 \
  --qos 1 \
  --topic device/+/telemetry

测试结果

测试项 目标值 实测值 状态
设备并发连接 1000+ 1200
消息吞吐量 10000 msg/s 12000 msg/s
API响应时间 <100ms 85ms
数据查询延迟 <200ms 150ms
规则引擎延迟 <50ms 35ms
内存使用 <4GB 3.2GB
CPU使用 <60% 45%

故障排除

常见问题

问题1:Docker容器无法启动

症状:执行 docker-compose up 后容器启动失败

可能原因: - 端口被占用 - 数据目录权限问题 - 内存不足

解决方法

# 检查端口占用
netstat -tulpn | grep -E '1883|5432|6379|8086|8000'

# 修改端口映射(如果端口被占用)
# 编辑 docker-compose.yml,修改端口映射

# 检查数据目录权限
ls -la data/
sudo chown -R $USER:$USER data/

# 检查系统资源
free -h
docker system df

问题2:设备无法连接MQTT

症状:设备连接MQTT Broker失败

可能原因: - 网络不通 - 认证失败 - EMQX未启动

解决方法

# 检查EMQX状态
docker-compose ps emqx
docker-compose logs emqx

# 测试MQTT连接
mosquitto_pub -h localhost -p 1883 -t test -m "hello"

# 检查EMQX Dashboard
# 访问 http://localhost:18083
# 查看连接状态和日志

# 检查防火墙
sudo ufw status
sudo ufw allow 1883/tcp

问题3:数据无法写入InfluxDB

症状:设备数据发送成功,但InfluxDB中查询不到

可能原因: - InfluxDB配置错误 - Token无效 - Bucket不存在

解决方法

# 检查InfluxDB状态
docker-compose logs influxdb

# 进入InfluxDB容器
docker exec -it iot-influxdb sh

# 使用influx CLI查询
influx query 'from(bucket:"iot-data") |> range(start:-1h)'

# 检查Bucket
influx bucket list

# 重新创建Bucket(如果需要)
influx bucket create -n iot-data -o iot-org

问题4:API服务无法访问数据库

症状:API启动失败,日志显示数据库连接错误

可能原因: - PostgreSQL未就绪 - 连接字符串错误 - 网络问题

解决方法

# 检查PostgreSQL状态
docker-compose ps postgres
docker-compose logs postgres

# 测试数据库连接
docker exec -it iot-postgres psql -U iot_user -d iot_platform -c "SELECT 1;"

# 检查API环境变量
docker-compose exec api env | grep DATABASE

# 重启API服务
docker-compose restart api

问题5:前端无法访问API

症状:前端页面加载正常,但无法获取数据

可能原因: - CORS配置问题 - API服务未启动 - Nginx配置错误

解决方法

# 检查API服务
curl http://localhost:8000/health

# 检查Nginx配置
docker-compose exec nginx nginx -t

# 查看Nginx日志
docker-compose logs nginx

# 检查浏览器控制台
# F12 -> Console/Network 查看错误信息

# 重启Nginx
docker-compose restart nginx

调试技巧

1. 查看实时日志

# 查看所有服务日志
docker-compose logs -f

# 查看特定服务日志
docker-compose logs -f api
docker-compose logs -f emqx

# 查看最近100行日志
docker-compose logs --tail=100 api

2. 进入容器调试

# 进入API容器
docker-compose exec api bash

# 进入PostgreSQL容器
docker-compose exec postgres psql -U iot_user -d iot_platform

# 进入Redis容器
docker-compose exec redis redis-cli

3. 网络调试

# 检查容器网络
docker network ls
docker network inspect iot-network

# 测试容器间连通性
docker-compose exec api ping postgres
docker-compose exec api ping emqx

4. 性能分析

# 查看容器资源使用
docker stats

# 查看系统资源
top
htop
free -h
df -h

# 查看EMQX性能指标
curl http://localhost:18083/api/v5/stats

扩展思路

功能扩展

1. 设备影子(Device Shadow)

实现设备影子功能,存储设备的期望状态和报告状态:

class DeviceShadow:
    """设备影子"""

    def __init__(self, device_id: str):
        self.device_id = device_id
        self.desired = {}   # 期望状态
        self.reported = {}  # 报告状态
        self.metadata = {}  # 元数据

    async def update_desired(self, state: dict):
        """更新期望状态"""
        self.desired.update(state)
        # 发送到设备
        await self.send_to_device()

    async def update_reported(self, state: dict):
        """更新报告状态"""
        self.reported.update(state)
        # 检查是否与期望状态一致
        await self.check_delta()

2. OTA固件升级

实现远程固件升级功能:

class OTAService:
    """OTA升级服务"""

    async def create_upgrade_task(
        self,
        device_id: str,
        firmware_url: str,
        version: str
    ):
        """创建升级任务"""
        task = {
            "device_id": device_id,
            "firmware_url": firmware_url,
            "version": version,
            "status": "pending"
        }

        # 发送升级命令到设备
        await self.send_upgrade_command(device_id, task)

    async def track_upgrade_progress(self, device_id: str):
        """跟踪升级进度"""
        # 监听设备上报的升级进度
        pass

3. 设备分组管理

实现设备分组和批量操作:

class DeviceGroup:
    """设备分组"""

    def __init__(self, group_id: str, name: str):
        self.group_id = group_id
        self.name = name
        self.devices = []

    async def add_device(self, device_id: str):
        """添加设备到分组"""
        self.devices.append(device_id)

    async def broadcast_command(self, command: dict):
        """向分组内所有设备广播命令"""
        for device_id in self.devices:
            await mqtt_service.send_command(device_id, command)

4. 数据分析和报表

实现数据统计和报表功能:

class AnalyticsService:
    """数据分析服务"""

    async def get_device_statistics(
        self,
        device_id: str,
        start: datetime,
        end: datetime
    ):
        """获取设备统计数据"""
        # 计算平均值、最大值、最小值
        stats = {
            "avg_temperature": 0,
            "max_temperature": 0,
            "min_temperature": 0,
            "data_points": 0
        }
        return stats

    async def generate_report(
        self,
        report_type: str,
        params: dict
    ):
        """生成报表"""
        # 生成PDF或Excel报表
        pass

5. 告警通知

实现多渠道告警通知:

class AlertService:
    """告警服务"""

    async def send_alert(
        self,
        alert_type: str,
        message: str,
        channels: List[str]
    ):
        """发送告警"""
        for channel in channels:
            if channel == "email":
                await self.send_email(message)
            elif channel == "sms":
                await self.send_sms(message)
            elif channel == "webhook":
                await self.send_webhook(message)
            elif channel == "push":
                await self.send_push_notification(message)

性能优化

1. 数据库优化

-- 添加索引
CREATE INDEX idx_device_status ON devices(status);
CREATE INDEX idx_device_created ON devices(created_at);

-- 分区表(按时间分区)
CREATE TABLE telemetry_2024_01 PARTITION OF telemetry
    FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');

2. 缓存优化

class CacheService:
    """缓存服务"""

    async def get_device_cache(self, device_id: str):
        """获取设备缓存"""
        key = f"device:{device_id}"
        cached = await redis.get(key)

        if cached:
            return json.loads(cached)

        # 从数据库加载
        device = await db.get_device(device_id)

        # 写入缓存
        await redis.setex(key, 3600, json.dumps(device))

        return device

3. 消息队列优化

# 使用Redis Streams作为消息队列
class MessageQueue:
    """消息队列"""

    async def publish(self, stream: str, data: dict):
        """发布消息"""
        await redis.xadd(stream, data)

    async def consume(self, stream: str, group: str):
        """消费消息"""
        messages = await redis.xreadgroup(
            group, "consumer",
            {stream: '>'},
            count=10
        )
        return messages

4. 负载均衡

# 使用多个API实例
services:
  api:
    deploy:
      replicas: 3

  nginx:
    # 配置负载均衡
    upstream api_backend {
        server api_1:8000;
        server api_2:8000;
        server api_3:8000;
    }

安全加固

1. MQTT认证增强

class MQTTAuthService:
    """MQTT认证服务"""

    async def authenticate(
        self,
        client_id: str,
        username: str,
        password: str
    ) -> bool:
        """认证设备"""
        # 验证设备凭证
        device = await db.get_device(username)

        if not device:
            return False

        # 验证密钥
        if not self.verify_secret(password, device.device_secret):
            return False

        # 记录认证日志
        await self.log_auth(client_id, username, "success")

        return True

2. API认证和授权

from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials

security = HTTPBearer()

async def verify_token(
    credentials: HTTPAuthorizationCredentials = Depends(security)
):
    """验证JWT Token"""
    token = credentials.credentials

    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
        return payload
    except jwt.JWTError:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid token"
        )

@router.get("/devices", dependencies=[Depends(verify_token)])
async def list_devices():
    """需要认证的接口"""
    pass

3. 数据加密

from cryptography.fernet import Fernet

class EncryptionService:
    """加密服务"""

    def __init__(self, key: bytes):
        self.cipher = Fernet(key)

    def encrypt(self, data: str) -> str:
        """加密数据"""
        return self.cipher.encrypt(data.encode()).decode()

    def decrypt(self, encrypted: str) -> str:
        """解密数据"""
        return self.cipher.decrypt(encrypted.encode()).decode()

项目总结

技术要点

本项目涵盖了IoT平台开发的核心技术:

  1. 微服务架构:各组件独立部署,松耦合设计
  2. MQTT协议:设备接入和消息路由的标准协议
  3. 时序数据库:高效存储和查询时序数据
  4. 规则引擎:灵活的业务逻辑处理
  5. 容器化部署:Docker Compose一键部署
  6. 前后端分离:Vue3 + FastAPI现代化架构
  7. 实时通信:WebSocket实时数据推送

学习收获

通过本项目,你应该掌握:

  • ✅ IoT平台的整体架构设计思路
  • ✅ MQTT Broker的部署和使用
  • ✅ 时序数据库的应用场景
  • ✅ 微服务的设计和实现
  • ✅ Docker容器化部署实践
  • ✅ 前后端分离开发模式
  • ✅ 规则引擎的设计思想
  • ✅ 系统监控和故障排查

生产环境建议

如果要将本项目部署到生产环境,需要考虑:

  1. 高可用性
  2. 数据库主从复制
  3. EMQX集群部署
  4. API服务多实例
  5. 负载均衡配置

  6. 安全性

  7. HTTPS/TLS加密
  8. MQTT认证和ACL
  9. API认证和授权
  10. 数据加密存储

  11. 监控告警

  12. Prometheus + Grafana监控
  13. 日志聚合(ELK Stack)
  14. 告警通知(邮件、短信)
  15. 性能分析

  16. 备份恢复

  17. 数据库定期备份
  18. 配置文件版本管理
  19. 灾难恢复预案
  20. 数据迁移方案

  21. 性能优化

  22. 数据库索引优化
  23. 缓存策略
  24. 消息队列
  25. CDN加速

改进方向

项目可以进一步改进的方向:

  1. 功能完善
  2. 设备影子
  3. OTA升级
  4. 设备分组
  5. 数据分析
  6. 告警通知

  7. 性能提升

  8. 数据库分片
  9. 读写分离
  10. 消息队列
  11. 缓存优化

  12. 安全加固

  13. 双因素认证
  14. 审计日志
  15. 数据加密
  16. 安全扫描

  17. 用户体验

  18. 移动端适配
  19. 数据大屏
  20. 自定义仪表盘
  21. 多语言支持

相关资源

官方文档

开源项目

学习资料

  • 《物联网架构设计》
  • 《MQTT协议详解》
  • 《时序数据库原理与实践》
  • 《微服务架构设计模式》

视频教程

下一步

完成本项目后,建议继续学习:

参考资料

  1. MQTT Version 5.0 - OASIS Standard
  2. InfluxDB 2.x Documentation - InfluxData
  3. FastAPI Documentation - Sebastián Ramírez
  4. Docker Documentation - Docker Inc.
  5. 《物联网平台架构设计与实践》- 张三
  6. 《EMQX权威指南》- EMQ团队

项目难度:⭐⭐⭐⭐⭐ (高级)
完成时间:约240分钟(4小时)
技术栈:Python, FastAPI, EMQX, InfluxDB, PostgreSQL, Redis, Vue3, Docker
适用场景:企业IoT平台、智能家居、工业监控、智慧农业

反馈与讨论:欢迎在评论区分享你的平台搭建经验和遇到的问题!如果你成功部署了平台,也欢迎分享你的使用场景和改进建议。