跳转至

Python在嵌入式开发中的应用

学习目标

完成本教程后,你将能够:

  • 理解Python在嵌入式开发中的应用场景和优势
  • 掌握MicroPython的基本使用方法
  • 学会使用CircuitPython进行快速原型开发
  • 使用Python编写测试脚本和自动化工具
  • 开发基于Python的上位机应用程序

前置要求

在开始本教程之前,你需要:

知识要求: - 掌握Python基础语法(变量、函数、类、模块) - 了解基本的嵌入式系统概念 - 熟悉GPIO、I2C、SPI等基本外设接口

技能要求: - 能够使用Python编写简单程序 - 会使用命令行工具 - 了解基本的电路连接

准备工作

硬件准备

名称 数量 说明 参考链接
ESP32开发板 1 支持MicroPython 购买链接
Adafruit Circuit Playground Express 1 用于CircuitPython示例 购买链接
LED灯 3 红、绿、蓝各一个 -
电阻 3 220Ω -
面包板 1 - -
USB数据线 2 Type-C和Micro-USB -

软件准备

  • Python环境:Python 3.8+ (用于上位机开发)
  • MicroPython固件:ESP32 MicroPython固件
  • CircuitPython固件:Circuit Playground Express固件
  • 开发工具
  • Thonny IDE (推荐用于MicroPython)
  • Mu Editor (推荐用于CircuitPython)
  • VS Code + Python扩展
  • 烧录工具:esptool.py (用于ESP32)
  • 串口工具:PuTTY或screen

环境配置

1. 安装Python和必要的库

# 安装Python (如果尚未安装)
# Windows: 从python.org下载安装包
# macOS: brew install python3
# Linux: sudo apt-get install python3 python3-pip

# 安装必要的Python库
pip install esptool adafruit-ampy pyserial

2. 安装Thonny IDE

# Windows/macOS: 从thonny.org下载安装包
# Linux:
sudo apt-get install thonny

3. 烧录MicroPython固件到ESP32

# 下载ESP32 MicroPython固件
# 从 micropython.org/download/esp32/ 下载最新固件

# 擦除Flash
esptool.py --chip esp32 --port /dev/ttyUSB0 erase_flash

# 烧录固件
esptool.py --chip esp32 --port /dev/ttyUSB0 write_flash -z 0x1000 esp32-*.bin

步骤1:MicroPython基础入门

1.1 连接到MicroPython REPL

MicroPython提供了一个交互式的REPL (Read-Eval-Print Loop)环境,类似于标准Python的交互式解释器。

使用串口工具连接

# Linux/macOS
screen /dev/ttyUSB0 115200

# 或使用PuTTY (Windows)
# 配置: 串口 COM3, 波特率 115200

测试连接

>>> print("Hello, MicroPython!")
Hello, MicroPython!

>>> import sys
>>> sys.implementation
(name='micropython', version=(1, 20, 0))

1.2 GPIO控制 - LED闪烁

让我们从最基本的LED控制开始:

from machine import Pin
import time

# 配置GPIO2为输出模式(ESP32板载LED)
led = Pin(2, Pin.OUT)

# LED闪烁循环
while True:
    led.on()      # 点亮LED
    time.sleep(0.5)  # 延时500ms
    led.off()     # 熄灭LED
    time.sleep(0.5)

代码说明: - machine.Pin: MicroPython的GPIO控制类 - Pin.OUT: 配置引脚为输出模式 - led.on() / led.off(): 控制LED状态 - time.sleep(): 延时函数,单位为秒

1.3 读取按键输入

from machine import Pin
import time

# 配置GPIO0为输入模式,启用上拉电阻
button = Pin(0, Pin.IN, Pin.PULL_UP)
led = Pin(2, Pin.OUT)

while True:
    if button.value() == 0:  # 按键按下(低电平)
        led.on()
        print("Button pressed!")
    else:
        led.off()
    time.sleep(0.1)  # 防抖延时

预期结果: - 按下按键时LED点亮 - 释放按键时LED熄灭 - 串口输出"Button pressed!"

步骤2:MicroPython高级功能

2.1 PWM控制 - LED呼吸灯

使用PWM (脉宽调制) 实现LED亮度渐变效果:

from machine import Pin, PWM
import time

# 创建PWM对象,频率1000Hz
led_pwm = PWM(Pin(2), freq=1000)

def breathing_led():
    """呼吸灯效果"""
    while True:
        # 逐渐变亮
        for duty in range(0, 1024, 10):
            led_pwm.duty(duty)
            time.sleep(0.01)

        # 逐渐变暗
        for duty in range(1023, -1, -10):
            led_pwm.duty(duty)
            time.sleep(0.01)

breathing_led()

代码说明: - PWM(): 创建PWM对象 - freq: 设置PWM频率 - duty(): 设置占空比 (0-1023) - 占空比越大,LED越亮

2.2 I2C通信 - 读取传感器

使用I2C接口读取温湿度传感器(以SHT31为例):

from machine import Pin, I2C
import time

# 初始化I2C
i2c = I2C(0, scl=Pin(22), sda=Pin(21), freq=400000)

# 扫描I2C设备
devices = i2c.scan()
print("I2C devices found:", [hex(device) for device in devices])

# SHT31传感器地址
SHT31_ADDR = 0x44

def read_sht31():
    """读取SHT31温湿度数据"""
    # 发送测量命令
    i2c.writeto(SHT31_ADDR, b'\x2C\x06')
    time.sleep(0.5)  # 等待测量完成

    # 读取6字节数据
    data = i2c.readfrom(SHT31_ADDR, 6)

    # 解析温度数据
    temp_raw = (data[0] << 8) | data[1]
    temp = -45 + (175 * temp_raw / 65535.0)

    # 解析湿度数据
    hum_raw = (data[3] << 8) | data[4]
    humidity = 100 * hum_raw / 65535.0

    return temp, humidity

# 持续读取并显示
while True:
    temp, hum = read_sht31()
    print(f"Temperature: {temp:.2f}°C, Humidity: {hum:.2f}%")
    time.sleep(2)

2.3 WiFi连接和HTTP请求

MicroPython支持WiFi连接,可以实现物联网应用:

import network
import urequests
import time

# 连接WiFi
def connect_wifi(ssid, password):
    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)

    if not wlan.isconnected():
        print('Connecting to WiFi...')
        wlan.connect(ssid, password)

        # 等待连接
        while not wlan.isconnected():
            time.sleep(1)
            print('.', end='')

    print('\nWiFi connected!')
    print('IP address:', wlan.ifconfig()[0])

# 发送HTTP GET请求
def get_weather(city):
    url = f"http://api.example.com/weather?city={city}"
    response = urequests.get(url)
    data = response.json()
    response.close()
    return data

# 使用示例
connect_wifi('your_ssid', 'your_password')
weather = get_weather('Beijing')
print(f"Weather: {weather['temp']}°C, {weather['description']}")

步骤3:CircuitPython快速开发

3.1 CircuitPython简介

CircuitPython是Adafruit基于MicroPython开发的Python变体,专注于教育和快速原型开发。

CircuitPython的优势: - 即插即用,无需编译器 - 丰富的库支持 - 友好的错误提示 - 自动重载代码

3.2 第一个CircuitPython程序

将Circuit Playground Express连接到电脑,它会显示为一个USB驱动器。

创建 code.py 文件:

import board
import digitalio
import time

# 配置板载LED
led = digitalio.DigitalInOut(board.D13)
led.direction = digitalio.Direction.OUTPUT

# LED闪烁
while True:
    led.value = True
    time.sleep(0.5)
    led.value = False
    time.sleep(0.5)

保存文件后,程序会自动运行!

3.3 使用NeoPixel彩色LED

Circuit Playground Express板载10个NeoPixel RGB LED:

import board
import neopixel
import time

# 初始化NeoPixel (10个LED)
pixels = neopixel.NeoPixel(board.NEOPIXEL, 10, brightness=0.3)

# 定义颜色 (R, G, B)
RED = (255, 0, 0)
GREEN = (0, 255, 0)
BLUE = (0, 0, 255)
YELLOW = (255, 255, 0)
PURPLE = (255, 0, 255)
CYAN = (0, 255, 255)
WHITE = (255, 255, 255)
OFF = (0, 0, 0)

# 彩虹效果
def rainbow_cycle(wait):
    for j in range(255):
        for i in range(10):
            pixel_index = (i * 256 // 10) + j
            pixels[i] = wheel(pixel_index & 255)
        pixels.show()
        time.sleep(wait)

def wheel(pos):
    """生成彩虹颜色"""
    if pos < 85:
        return (pos * 3, 255 - pos * 3, 0)
    elif pos < 170:
        pos -= 85
        return (255 - pos * 3, 0, pos * 3)
    else:
        pos -= 170
        return (0, pos * 3, 255 - pos * 3)

# 运行彩虹效果
while True:
    rainbow_cycle(0.01)

3.4 读取加速度计和按键

import board
import digitalio
import adafruit_lis3dh
import time

# 初始化加速度计
i2c = board.I2C()
accelerometer = adafruit_lis3dh.LIS3DH_I2C(i2c, address=0x19)

# 初始化按键
button_a = digitalio.DigitalInOut(board.BUTTON_A)
button_a.direction = digitalio.Direction.INPUT
button_a.pull = digitalio.Pull.DOWN

button_b = digitalio.DigitalInOut(board.BUTTON_B)
button_b.direction = digitalio.Direction.INPUT
button_b.pull = digitalio.Pull.DOWN

while True:
    # 读取加速度
    x, y, z = accelerometer.acceleration
    print(f"X: {x:.2f} Y: {y:.2f} Z: {z:.2f} m/s^2")

    # 检测按键
    if button_a.value:
        print("Button A pressed!")
    if button_b.value:
        print("Button B pressed!")

    time.sleep(0.1)

步骤4:Python测试自动化

4.1 串口通信测试脚本

使用Python编写自动化测试脚本,通过串口与嵌入式设备通信:

import serial
import time

class EmbeddedTester:
    """嵌入式设备测试类"""

    def __init__(self, port, baudrate=115200):
        self.ser = serial.Serial(port, baudrate, timeout=1)
        time.sleep(2)  # 等待连接稳定

    def send_command(self, cmd):
        """发送命令"""
        self.ser.write(f"{cmd}\r\n".encode())
        time.sleep(0.1)

    def read_response(self, timeout=1):
        """读取响应"""
        start_time = time.time()
        response = ""

        while time.time() - start_time < timeout:
            if self.ser.in_waiting:
                response += self.ser.read(self.ser.in_waiting).decode()
            time.sleep(0.01)

        return response

    def test_led_control(self):
        """测试LED控制"""
        print("Testing LED control...")

        # 测试LED开
        self.send_command("LED ON")
        response = self.read_response()
        assert "LED: ON" in response, "LED ON command failed"
        print("✓ LED ON test passed")

        # 测试LED关
        self.send_command("LED OFF")
        response = self.read_response()
        assert "LED: OFF" in response, "LED OFF command failed"
        print("✓ LED OFF test passed")

    def test_sensor_reading(self):
        """测试传感器读取"""
        print("Testing sensor reading...")

        self.send_command("READ TEMP")
        response = self.read_response()

        # 解析温度值
        if "TEMP:" in response:
            temp_str = response.split("TEMP:")[1].strip()
            temp = float(temp_str)
            assert -40 < temp < 85, f"Temperature out of range: {temp}"
            print(f"✓ Temperature reading: {temp}°C")
        else:
            raise AssertionError("Failed to read temperature")

    def close(self):
        """关闭串口"""
        self.ser.close()

# 使用示例
if __name__ == "__main__":
    tester = EmbeddedTester('/dev/ttyUSB0')

    try:
        tester.test_led_control()
        tester.test_sensor_reading()
        print("\n✓ All tests passed!")
    except AssertionError as e:
        print(f"\n✗ Test failed: {e}")
    finally:
        tester.close()

4.2 自动化固件烧录脚本

import subprocess
import os
import sys

class FirmwareFlasher:
    """固件烧录工具"""

    def __init__(self, port, chip='esp32'):
        self.port = port
        self.chip = chip

    def erase_flash(self):
        """擦除Flash"""
        print("Erasing flash...")
        cmd = [
            'esptool.py',
            '--chip', self.chip,
            '--port', self.port,
            'erase_flash'
        ]
        result = subprocess.run(cmd, capture_output=True, text=True)

        if result.returncode == 0:
            print("✓ Flash erased successfully")
            return True
        else:
            print(f"✗ Flash erase failed: {result.stderr}")
            return False

    def flash_firmware(self, firmware_path):
        """烧录固件"""
        if not os.path.exists(firmware_path):
            print(f"✗ Firmware file not found: {firmware_path}")
            return False

        print(f"Flashing firmware: {firmware_path}")
        cmd = [
            'esptool.py',
            '--chip', self.chip,
            '--port', self.port,
            'write_flash',
            '-z', '0x1000',
            firmware_path
        ]
        result = subprocess.run(cmd, capture_output=True, text=True)

        if result.returncode == 0:
            print("✓ Firmware flashed successfully")
            return True
        else:
            print(f"✗ Firmware flash failed: {result.stderr}")
            return False

    def verify_firmware(self):
        """验证固件"""
        print("Verifying firmware...")
        # 这里可以添加验证逻辑
        return True

# 使用示例
if __name__ == "__main__":
    flasher = FirmwareFlasher('/dev/ttyUSB0')

    if flasher.erase_flash():
        if flasher.flash_firmware('firmware.bin'):
            flasher.verify_firmware()
            print("\n✓ Firmware update completed!")

步骤5:Python上位机开发

5.1 使用Tkinter创建GUI监控工具

创建一个简单的串口监控工具:

import tkinter as tk
from tkinter import ttk, scrolledtext
import serial
import threading
import time

class SerialMonitor:
    """串口监控GUI应用"""

    def __init__(self, root):
        self.root = root
        self.root.title("嵌入式设备监控工具")
        self.root.geometry("800x600")

        self.serial_port = None
        self.is_running = False

        self.create_widgets()

    def create_widgets(self):
        """创建界面组件"""
        # 连接设置框架
        conn_frame = ttk.LabelFrame(self.root, text="连接设置", padding=10)
        conn_frame.pack(fill=tk.X, padx=10, pady=5)

        ttk.Label(conn_frame, text="串口:").grid(row=0, column=0, padx=5)
        self.port_entry = ttk.Entry(conn_frame, width=20)
        self.port_entry.insert(0, "/dev/ttyUSB0")
        self.port_entry.grid(row=0, column=1, padx=5)

        ttk.Label(conn_frame, text="波特率:").grid(row=0, column=2, padx=5)
        self.baudrate_combo = ttk.Combobox(conn_frame, width=15,
                                           values=[9600, 115200, 230400])
        self.baudrate_combo.set(115200)
        self.baudrate_combo.grid(row=0, column=3, padx=5)

        self.connect_btn = ttk.Button(conn_frame, text="连接",
                                      command=self.toggle_connection)
        self.connect_btn.grid(row=0, column=4, padx=5)

        # 数据显示区域
        data_frame = ttk.LabelFrame(self.root, text="数据监控", padding=10)
        data_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=5)

        self.data_text = scrolledtext.ScrolledText(data_frame, height=20)
        self.data_text.pack(fill=tk.BOTH, expand=True)

        # 控制按钮框架
        ctrl_frame = ttk.Frame(self.root, padding=10)
        ctrl_frame.pack(fill=tk.X, padx=10, pady=5)

        ttk.Button(ctrl_frame, text="清空", 
                  command=self.clear_display).pack(side=tk.LEFT, padx=5)
        ttk.Button(ctrl_frame, text="LED开", 
                  command=lambda: self.send_command("LED ON")).pack(side=tk.LEFT, padx=5)
        ttk.Button(ctrl_frame, text="LED关", 
                  command=lambda: self.send_command("LED OFF")).pack(side=tk.LEFT, padx=5)

        # 状态栏
        self.status_label = ttk.Label(self.root, text="未连接", 
                                     relief=tk.SUNKEN, anchor=tk.W)
        self.status_label.pack(fill=tk.X, side=tk.BOTTOM)

    def toggle_connection(self):
        """切换连接状态"""
        if not self.is_running:
            self.connect()
        else:
            self.disconnect()

    def connect(self):
        """连接串口"""
        try:
            port = self.port_entry.get()
            baudrate = int(self.baudrate_combo.get())

            self.serial_port = serial.Serial(port, baudrate, timeout=0.1)
            self.is_running = True

            # 启动接收线程
            self.receive_thread = threading.Thread(target=self.receive_data)
            self.receive_thread.daemon = True
            self.receive_thread.start()

            self.connect_btn.config(text="断开")
            self.status_label.config(text=f"已连接: {port} @ {baudrate}")
            self.append_text(f"[系统] 已连接到 {port}\n", "green")

        except Exception as e:
            self.append_text(f"[错误] 连接失败: {e}\n", "red")

    def disconnect(self):
        """断开连接"""
        self.is_running = False
        if self.serial_port:
            self.serial_port.close()

        self.connect_btn.config(text="连接")
        self.status_label.config(text="未连接")
        self.append_text("[系统] 已断开连接\n", "orange")

    def receive_data(self):
        """接收数据线程"""
        while self.is_running:
            try:
                if self.serial_port.in_waiting:
                    data = self.serial_port.read(self.serial_port.in_waiting)
                    text = data.decode('utf-8', errors='ignore')
                    self.append_text(text)
            except Exception as e:
                self.append_text(f"[错误] 接收数据失败: {e}\n", "red")
            time.sleep(0.01)

    def send_command(self, cmd):
        """发送命令"""
        if self.serial_port and self.is_running:
            try:
                self.serial_port.write(f"{cmd}\r\n".encode())
                self.append_text(f"[发送] {cmd}\n", "blue")
            except Exception as e:
                self.append_text(f"[错误] 发送失败: {e}\n", "red")

    def append_text(self, text, color="black"):
        """添加文本到显示区域"""
        self.data_text.insert(tk.END, text)
        self.data_text.see(tk.END)

    def clear_display(self):
        """清空显示"""
        self.data_text.delete(1.0, tk.END)

# 运行应用
if __name__ == "__main__":
    root = tk.Tk()
    app = SerialMonitor(root)
    root.mainloop()

5.2 使用PyQt5创建专业级上位机

PyQt5提供更强大的GUI功能:

from PyQt5.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout,
                             QHBoxLayout, QTextEdit, QPushButton, QLabel,
                             QComboBox, QLineEdit, QGroupBox)
from PyQt5.QtCore import QThread, pyqtSignal
import serial
import sys

class SerialThread(QThread):
    """串口接收线程"""
    data_received = pyqtSignal(str)

    def __init__(self, serial_port):
        super().__init__()
        self.serial_port = serial_port
        self.is_running = True

    def run(self):
        while self.is_running:
            if self.serial_port.in_waiting:
                data = self.serial_port.read(self.serial_port.in_waiting)
                text = data.decode('utf-8', errors='ignore')
                self.data_received.emit(text)
            self.msleep(10)

    def stop(self):
        self.is_running = False

class MainWindow(QMainWindow):
    """主窗口"""

    def __init__(self):
        super().__init__()
        self.serial_port = None
        self.serial_thread = None
        self.init_ui()

    def init_ui(self):
        self.setWindowTitle('嵌入式设备监控系统')
        self.setGeometry(100, 100, 900, 700)

        # 中心部件
        central_widget = QWidget()
        self.setCentralWidget(central_widget)
        layout = QVBoxLayout(central_widget)

        # 连接设置组
        conn_group = QGroupBox("连接设置")
        conn_layout = QHBoxLayout()

        conn_layout.addWidget(QLabel("串口:"))
        self.port_input = QLineEdit("/dev/ttyUSB0")
        conn_layout.addWidget(self.port_input)

        conn_layout.addWidget(QLabel("波特率:"))
        self.baudrate_combo = QComboBox()
        self.baudrate_combo.addItems(['9600', '115200', '230400'])
        self.baudrate_combo.setCurrentText('115200')
        conn_layout.addWidget(self.baudrate_combo)

        self.connect_btn = QPushButton("连接")
        self.connect_btn.clicked.connect(self.toggle_connection)
        conn_layout.addWidget(self.connect_btn)

        conn_group.setLayout(conn_layout)
        layout.addWidget(conn_group)

        # 数据显示区
        self.data_display = QTextEdit()
        self.data_display.setReadOnly(True)
        layout.addWidget(self.data_display)

        # 控制按钮组
        ctrl_layout = QHBoxLayout()

        clear_btn = QPushButton("清空")
        clear_btn.clicked.connect(self.data_display.clear)
        ctrl_layout.addWidget(clear_btn)

        led_on_btn = QPushButton("LED开")
        led_on_btn.clicked.connect(lambda: self.send_command("LED ON"))
        ctrl_layout.addWidget(led_on_btn)

        led_off_btn = QPushButton("LED关")
        led_off_btn.clicked.connect(lambda: self.send_command("LED OFF"))
        ctrl_layout.addWidget(led_off_btn)

        layout.addLayout(ctrl_layout)

        # 状态栏
        self.statusBar().showMessage('未连接')

    def toggle_connection(self):
        if self.serial_port is None:
            self.connect()
        else:
            self.disconnect()

    def connect(self):
        try:
            port = self.port_input.text()
            baudrate = int(self.baudrate_combo.currentText())

            self.serial_port = serial.Serial(port, baudrate, timeout=0.1)

            # 启动接收线程
            self.serial_thread = SerialThread(self.serial_port)
            self.serial_thread.data_received.connect(self.on_data_received)
            self.serial_thread.start()

            self.connect_btn.setText("断开")
            self.statusBar().showMessage(f'已连接: {port} @ {baudrate}')
            self.data_display.append(f"[系统] 已连接到 {port}\n")

        except Exception as e:
            self.data_display.append(f"[错误] 连接失败: {e}\n")

    def disconnect(self):
        if self.serial_thread:
            self.serial_thread.stop()
            self.serial_thread.wait()

        if self.serial_port:
            self.serial_port.close()
            self.serial_port = None

        self.connect_btn.setText("连接")
        self.statusBar().showMessage('未连接')
        self.data_display.append("[系统] 已断开连接\n")

    def send_command(self, cmd):
        if self.serial_port:
            try:
                self.serial_port.write(f"{cmd}\r\n".encode())
                self.data_display.append(f"[发送] {cmd}\n")
            except Exception as e:
                self.data_display.append(f"[错误] 发送失败: {e}\n")

    def on_data_received(self, data):
        self.data_display.append(data)
        self.data_display.ensureCursorVisible()

if __name__ == '__main__':
    app = QApplication(sys.argv)
    window = MainWindow()
    window.show()
    sys.exit(app.exec_())

步骤6:实战项目 - 智能温度监控系统

6.1 项目概述

创建一个完整的温度监控系统,包括: - ESP32 + 温度传感器(嵌入式端) - Python上位机(数据采集和可视化) - 数据存储和历史记录

6.2 嵌入式端代码(MicroPython)

# main.py - 运行在ESP32上
from machine import Pin, I2C
import network
import socket
import time
import json

# 初始化I2C和传感器
i2c = I2C(0, scl=Pin(22), sda=Pin(21))
SHT31_ADDR = 0x44

def read_temperature():
    """读取温度"""
    i2c.writeto(SHT31_ADDR, b'\x2C\x06')
    time.sleep(0.5)
    data = i2c.readfrom(SHT31_ADDR, 6)
    temp_raw = (data[0] << 8) | data[1]
    temp = -45 + (175 * temp_raw / 65535.0)
    hum_raw = (data[3] << 8) | data[4]
    humidity = 100 * hum_raw / 65535.0
    return temp, humidity

def connect_wifi(ssid, password):
    """连接WiFi"""
    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)
    wlan.connect(ssid, password)

    while not wlan.isconnected():
        time.sleep(1)

    print('WiFi connected:', wlan.ifconfig()[0])
    return wlan.ifconfig()[0]

def start_server():
    """启动HTTP服务器"""
    addr = socket.getaddrinfo('0.0.0.0', 80)[0][-1]
    s = socket.socket()
    s.bind(addr)
    s.listen(1)
    print('Server listening on', addr)

    while True:
        cl, addr = s.accept()
        print('Client connected from', addr)

        request = cl.recv(1024).decode()

        if '/data' in request:
            # 读取传感器数据
            temp, hum = read_temperature()

            # 构造JSON响应
            data = {
                'temperature': round(temp, 2),
                'humidity': round(hum, 2),
                'timestamp': time.time()
            }

            response = json.dumps(data)

            cl.send('HTTP/1.1 200 OK\r\n')
            cl.send('Content-Type: application/json\r\n')
            cl.send('Access-Control-Allow-Origin: *\r\n')
            cl.send('\r\n')
            cl.send(response)

        cl.close()

# 主程序
if __name__ == '__main__':
    ip = connect_wifi('your_ssid', 'your_password')
    print(f'Access the data at: http://{ip}/data')
    start_server()

6.3 上位机代码(Python + Matplotlib)

# monitor.py - 运行在PC上
import requests
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from datetime import datetime
import json

class TemperatureMonitor:
    """温度监控类"""

    def __init__(self, device_ip):
        self.device_ip = device_ip
        self.url = f"http://{device_ip}/data"

        # 数据存储
        self.timestamps = []
        self.temperatures = []
        self.humidities = []

        # 设置图表
        self.fig, (self.ax1, self.ax2) = plt.subplots(2, 1, figsize=(10, 8))
        self.fig.suptitle('实时温湿度监控')

    def fetch_data(self):
        """获取数据"""
        try:
            response = requests.get(self.url, timeout=2)
            data = response.json()

            # 添加数据
            self.timestamps.append(datetime.now())
            self.temperatures.append(data['temperature'])
            self.humidities.append(data['humidity'])

            # 只保留最近100个数据点
            if len(self.timestamps) > 100:
                self.timestamps.pop(0)
                self.temperatures.pop(0)
                self.humidities.pop(0)

            return True
        except Exception as e:
            print(f"Error fetching data: {e}")
            return False

    def update_plot(self, frame):
        """更新图表"""
        if self.fetch_data():
            # 清空图表
            self.ax1.clear()
            self.ax2.clear()

            # 绘制温度
            self.ax1.plot(self.timestamps, self.temperatures, 'r-', label='温度')
            self.ax1.set_ylabel('温度 (°C)')
            self.ax1.legend()
            self.ax1.grid(True)

            # 绘制湿度
            self.ax2.plot(self.timestamps, self.humidities, 'b-', label='湿度')
            self.ax2.set_ylabel('湿度 (%)')
            self.ax2.set_xlabel('时间')
            self.ax2.legend()
            self.ax2.grid(True)

            # 格式化x轴
            self.fig.autofmt_xdate()

    def start(self):
        """启动监控"""
        ani = animation.FuncAnimation(
            self.fig, self.update_plot, interval=2000, cache_frame_data=False
        )
        plt.show()

    def save_data(self, filename='temperature_log.json'):
        """保存数据到文件"""
        data = {
            'timestamps': [t.isoformat() for t in self.timestamps],
            'temperatures': self.temperatures,
            'humidities': self.humidities
        }

        with open(filename, 'w') as f:
            json.dump(data, f, indent=2)

        print(f"Data saved to {filename}")

# 使用示例
if __name__ == '__main__':
    monitor = TemperatureMonitor('192.168.1.100')  # 替换为你的ESP32 IP

    try:
        monitor.start()
    except KeyboardInterrupt:
        monitor.save_data()
        print("\nMonitoring stopped")

故障排除

问题1:无法连接到MicroPython REPL

可能原因: - 串口驱动未安装 - 串口被其他程序占用 - 波特率设置错误 - USB线缆质量问题

解决方法: 1. 检查设备管理器中的串口设备 2. 关闭其他串口工具(Arduino IDE、PuTTY等) 3. 尝试不同的波特率(115200、9600) 4. 更换USB线缆或USB端口 5. 重新烧录MicroPython固件

问题2:MicroPython程序运行异常

可能原因: - 内存不足 - 语法错误 - 模块导入失败 - 硬件连接问题

解决方法

# 检查可用内存
import gc
gc.collect()
print(f"Free memory: {gc.mem_free()} bytes")

# 捕获异常
try:
    # 你的代码
    pass
except Exception as e:
    print(f"Error: {e}")
    import sys
    sys.print_exception(e)

问题3:CircuitPython代码不自动运行

可能原因: - 文件名不是code.py - 语法错误导致程序崩溃 - 文件保存失败

解决方法: 1. 确保文件名为code.py 2. 检查boot_out.txt查看错误信息 3. 使用串口连接查看详细错误 4. 尝试重新保存文件

问题4:Python上位机无法连接设备

可能原因: - 串口权限不足(Linux) - 防火墙阻止连接 - IP地址错误 - 设备未连接到网络

解决方法

# Linux: 添加用户到dialout组
sudo usermod -a -G dialout $USER
# 需要重新登录

# 检查串口权限
ls -l /dev/ttyUSB0

# 测试网络连接
ping 192.168.1.100

问题5:数据传输不稳定

可能原因: - 串口缓冲区溢出 - 数据格式错误 - 网络延迟

解决方法

# 增加串口缓冲区
ser = serial.Serial(port, baudrate, timeout=1)
ser.set_buffer_size(rx_size=4096, tx_size=4096)

# 添加数据校验
import hashlib

def send_with_checksum(data):
    checksum = hashlib.md5(data.encode()).hexdigest()[:8]
    return f"{data}|{checksum}"

def verify_checksum(received):
    data, checksum = received.rsplit('|', 1)
    expected = hashlib.md5(data.encode()).hexdigest()[:8]
    return checksum == expected

总结

通过本教程,你学习了:

  • ✅ MicroPython的基本使用和GPIO控制
  • ✅ CircuitPython的快速开发方法
  • ✅ Python在测试自动化中的应用
  • ✅ 使用Python开发上位机应用
  • ✅ 完整的温度监控系统实战项目

Python在嵌入式开发中的优势: - 快速原型开发 - 丰富的库支持 - 易于学习和使用 - 强大的数据处理能力 - 跨平台支持

适用场景: - 教育和学习 - 快速原型验证 - 测试和调试工具 - 数据采集和分析 - 上位机应用开发

进阶挑战

尝试以下挑战来巩固学习:

  1. 挑战1:MQTT物联网通信
  2. 在ESP32上实现MQTT客户端
  3. 发布传感器数据到MQTT服务器
  4. 使用Python订阅并处理数据

  5. 挑战2:Web界面控制

  6. 在ESP32上创建Web服务器
  7. 实现HTML/CSS/JavaScript前端
  8. 通过浏览器控制LED和读取传感器

  9. 挑战3:数据库存储

  10. 使用SQLite或MySQL存储历史数据
  11. 实现数据查询和统计功能
  12. 生成数据报表

  13. 挑战4:机器学习应用

  14. 收集传感器数据训练模型
  15. 使用TensorFlow Lite在ESP32上运行模型
  16. 实现异常检测功能

完整代码和资源

代码仓库

完整的项目代码可以在GitHub上找到: - MicroPython示例:github.com/example/micropython-examples - CircuitPython示例:github.com/example/circuitpython-examples - 上位机代码:github.com/example/python-host-tools

推荐资源

官方文档: - MicroPython官方文档 - CircuitPython官方文档 - Adafruit学习系统

开发板推荐: - ESP32系列:性价比高,WiFi/蓝牙支持 - Raspberry Pi Pico:低成本,易于使用 - Circuit Playground Express:教育友好,功能丰富 - PyBoard:MicroPython官方开发板

Python库推荐: - pyserial:串口通信 - matplotlib:数据可视化 - tkinter/PyQt5:GUI开发 - requests:HTTP通信 - paho-mqtt:MQTT通信

下一步

建议继续学习:

参考资料

  1. MicroPython官方文档 - docs.micropython.org
  2. CircuitPython官方文档 - docs.circuitpython.org
  3. ESP32 MicroPython教程 - randomnerdtutorials.com
  4. Adafruit CircuitPython指南 - learn.adafruit.com
  5. Python Serial通信指南 - pyserial.readthedocs.io
  6. 《MicroPython for the Internet of Things》- Charles Bell
  7. 《Programming with MicroPython》- Nicholas H. Tollervey

反馈:如果你在学习过程中遇到问题或有改进建议,欢迎在评论区留言!

版权声明:本教程中的代码示例采用MIT许可证,可自由使用和修改。