跳转至

硬件在环(HIL)测试系统完整实现指南

项目概述

本项目将指导你构建一个完整的硬件在环(Hardware-in-Loop, HIL)测试系统,用于自动化测试嵌入式设备。HIL测试是一种将真实硬件与模拟环境相结合的测试方法,能够在接近真实工作条件下验证系统功能。

项目目标

完成本项目后,你将拥有:

  • 完整的HIL测试架构设计
  • 自动化测试平台实现
  • 测试用例管理系统
  • 实时数据采集和分析工具
  • 测试报告生成系统
  • CI/CD集成方案
  • 可扩展的测试框架

项目特点

技术亮点: - 模块化架构设计 - 支持多种通信协议 - 实时数据采集和分析 - 自动化测试执行 - 详细的测试报告 - 易于扩展和维护

应用场景: - 汽车电子ECU测试 - 工业控制器验证 - 物联网设备测试 - 机器人系统测试 - 航空航天系统验证

前置要求

知识要求

必备知识: - 精通C/C++或Python编程 - 深入理解嵌入式系统 - 熟悉测试理论和方法 - 了解硬件接口(UART, SPI, I2C, CAN等) - 掌握通信协议

推荐知识: - 了解测试自动化框架 - 熟悉数据库操作 - 了解Web开发基础 - 掌握版本控制(Git) - 了解CI/CD流程

技能要求

  • 能够设计系统架构
  • 能够编写自动化脚本
  • 能够调试硬件问题
  • 能够分析测试数据
  • 能够编写技术文档

准备工作

硬件准备

名称 数量 说明 参考价格
测试主机 1 PC或工控机,运行测试软件 ¥3000-10000
被测设备(DUT) 1+ 需要测试的嵌入式设备 根据项目
数据采集卡 1 USB/PCIe数据采集卡 ¥500-5000
串口转换器 2-4 USB转UART/RS485 ¥50-200
CAN分析仪 1 用于CAN总线测试(可选) ¥500-3000
逻辑分析仪 1 用于信号分析(可选) ¥200-2000
电源 1-2 可调电源,用于供电测试 ¥300-1000
继电器模块 1 用于电源控制 ¥50-200
信号发生器 1 模拟传感器信号(可选) ¥500-3000

软件准备

开发环境: - 操作系统: Linux (Ubuntu 20.04+) 或 Windows 10+ - 编程语言: Python 3.8+ 或 C++17 - IDE: VS Code, PyCharm 或 Visual Studio - 版本控制: Git

必需软件:

# Python环境
sudo apt install python3 python3-pip python3-venv

# 测试框架
pip install pytest pytest-html pytest-cov

# 串口通信
pip install pyserial

# 数据处理
pip install numpy pandas matplotlib

# Web界面(可选)
pip install flask flask-socketio

# 数据库(可选)
pip install sqlalchemy sqlite3

可选软件: - LabVIEW (用于复杂信号处理) - MATLAB (用于数据分析) - Jenkins (用于CI/CD) - Grafana (用于数据可视化)

系统要求

最低配置: - CPU: Intel i5 或同等性能 - 内存: 8GB RAM - 存储: 100GB 可用空间 - 网络: 以太网接口

推荐配置: - CPU: Intel i7 或更高 - 内存: 16GB+ RAM - 存储: 256GB SSD - 网络: 千兆以太网 - 显示: 双显示器

步骤1: 系统架构设计

1.1 HIL测试系统概述

什么是HIL测试?

硬件在环(Hardware-in-Loop)测试是一种将真实硬件与模拟环境相结合的测试方法。系统的一部分使用真实硬件,其他部分通过软件模拟。

HIL测试的优势: - 在安全环境中测试危险场景 - 降低测试成本 - 提高测试覆盖率 - 支持自动化测试 - 可重复性好 - 缩短开发周期

典型应用:

汽车ECU测试:
真实ECU ←→ HIL系统(模拟传感器、执行器、总线)

工业控制器测试:
真实PLC ←→ HIL系统(模拟工艺过程、I/O信号)

物联网设备测试:
真实设备 ←→ HIL系统(模拟云端、传感器、网络)

1.2 系统架构

整体架构图:

┌─────────────────────────────────────────────────────────┐
│                    测试管理层                            │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐             │
│  │测试调度器│  │用例管理器│  │报告生成器│             │
│  └──────────┘  └──────────┘  └──────────┘             │
└─────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────┐
│                    测试执行层                            │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐             │
│  │测试引擎  │  │数据采集  │  │结果验证  │             │
│  └──────────┘  └──────────┘  └──────────┘             │
└─────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────┐
│                    硬件抽象层                            │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐             │
│  │串口驱动  │  │CAN驱动   │  │GPIO控制  │             │
│  └──────────┘  └──────────┘  └──────────┘             │
└─────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────┐
│                    硬件接口层                            │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐             │
│  │数据采集卡│  │通信接口  │  │电源控制  │             │
│  └──────────┘  └──────────┘  └──────────┘             │
└─────────────────────────────────────────────────────────┘
                  ┌──────────┐
                  │ 被测设备  │
                  │  (DUT)   │
                  └──────────┘

1.3 核心模块设计

1. 测试管理模块

功能: - 测试用例管理 - 测试计划调度 - 测试执行控制 - 结果收集和报告

2. 硬件接口模块

功能: - 串口通信 - CAN总线通信 - GPIO控制 - 模拟信号输入/输出 - 数字信号输入/输出

3. 数据采集模块

功能: - 实时数据采集 - 数据缓存 - 数据预处理 - 时间戳管理

4. 测试执行模块

功能: - 测试用例执行 - 激励信号生成 - 响应数据采集 - 超时控制

5. 结果验证模块

功能: - 数据对比 - 阈值检查 - 时序验证 - 协议验证

6. 报告生成模块

功能: - 测试结果汇总 - 图表生成 - HTML/PDF报告 - 数据导出

1.4 技术选型

编程语言选择:

语言 优势 劣势 适用场景
Python 开发快速、库丰富、易维护 性能较低 测试管理、数据分析
C++ 性能高、实时性好 开发复杂 实时数据采集、硬件控制
LabVIEW 图形化编程、硬件支持好 成本高、不开源 复杂信号处理

本项目选择: Python (主要) + C++ (性能关键部分)

通信协议: - UART/RS232/RS485: 通用串口通信 - CAN: 汽车和工业应用 - Ethernet: 高速数据传输 - USB: 设备连接

数据存储: - SQLite: 轻量级数据库 - CSV: 简单数据导出 - HDF5: 大量数据存储

测试框架: - pytest: Python测试框架 - unittest: Python标准库 - Robot Framework: 关键字驱动测试

1.5 目录结构设计

hil_test_system/
├── config/                 # 配置文件
│   ├── hardware.yaml      # 硬件配置
│   ├── test_config.yaml   # 测试配置
│   └── logging.yaml       # 日志配置
├── src/                    # 源代码
│   ├── core/              # 核心模块
│   │   ├── test_manager.py
│   │   ├── test_executor.py
│   │   └── result_validator.py
│   ├── hardware/          # 硬件接口
│   │   ├── serial_interface.py
│   │   ├── can_interface.py
│   │   └── gpio_interface.py
│   ├── utils/             # 工具函数
│   │   ├── logger.py
│   │   ├── data_processor.py
│   │   └── report_generator.py
│   └── models/            # 数据模型
│       ├── test_case.py
│       └── test_result.py
├── tests/                  # 测试用例
│   ├── test_cases/        # 具体测试用例
│   ├── test_data/         # 测试数据
│   └── test_scripts/      # 测试脚本
├── reports/                # 测试报告
├── logs/                   # 日志文件
├── docs/                   # 文档
├── requirements.txt        # Python依赖
├── setup.py               # 安装脚本
└── README.md              # 项目说明

步骤2: 硬件接口层实现

2.1 串口通信接口

串口接口类设计 (serial_interface.py):

import serial
import threading
import queue
import time
from typing import Optional, Callable
import logging

class SerialInterface:
    """串口通信接口类"""

    def __init__(self, port: str, baudrate: int = 115200, 
                 timeout: float = 1.0):
        """
        初始化串口接口

        Args:
            port: 串口名称 (如 'COM1' 或 '/dev/ttyUSB0')
            baudrate: 波特率
            timeout: 超时时间(秒)
        """
        self.port = port
        self.baudrate = baudrate
        self.timeout = timeout
        self.serial_port: Optional[serial.Serial] = None
        self.is_open = False

        # 接收线程相关
        self.rx_thread: Optional[threading.Thread] = None
        self.rx_queue = queue.Queue()
        self.rx_callback: Optional[Callable] = None
        self.running = False

        self.logger = logging.getLogger(__name__)

    def open(self) -> bool:
        """
        打开串口

        Returns:
            bool: 成功返回True,失败返回False
        """
        try:
            self.serial_port = serial.Serial(
                port=self.port,
                baudrate=self.baudrate,
                bytesize=serial.EIGHTBITS,
                parity=serial.PARITY_NONE,
                stopbits=serial.STOPBITS_ONE,
                timeout=self.timeout
            )
            self.is_open = True
            self.logger.info(f"串口 {self.port} 打开成功")

            # 启动接收线程
            self.running = True
            self.rx_thread = threading.Thread(
                target=self._rx_thread_func,
                daemon=True
            )
            self.rx_thread.start()

            return True

        except serial.SerialException as e:
            self.logger.error(f"打开串口失败: {e}")
            return False

    def close(self):
        """关闭串口"""
        self.running = False

        if self.rx_thread:
            self.rx_thread.join(timeout=2.0)

        if self.serial_port and self.is_open:
            self.serial_port.close()
            self.is_open = False
            self.logger.info(f"串口 {self.port} 已关闭")

    def write(self, data: bytes) -> int:
        """
        发送数据

        Args:
            data: 要发送的字节数据

        Returns:
            int: 实际发送的字节数
        """
        if not self.is_open or not self.serial_port:
            self.logger.error("串口未打开")
            return 0

        try:
            bytes_written = self.serial_port.write(data)
            self.logger.debug(f"发送 {bytes_written} 字节: {data.hex()}")
            return bytes_written

        except serial.SerialException as e:
            self.logger.error(f"发送数据失败: {e}")
            return 0

    def read(self, size: int = 1, timeout: Optional[float] = None) -> bytes:
        """
        读取数据

        Args:
            size: 要读取的字节数
            timeout: 超时时间(秒),None使用默认超时

        Returns:
            bytes: 读取到的数据
        """
        if not self.is_open or not self.serial_port:
            self.logger.error("串口未打开")
            return b''

        try:
            if timeout is not None:
                old_timeout = self.serial_port.timeout
                self.serial_port.timeout = timeout

            data = self.serial_port.read(size)

            if timeout is not None:
                self.serial_port.timeout = old_timeout

            if data:
                self.logger.debug(f"接收 {len(data)} 字节: {data.hex()}")

            return data

        except serial.SerialException as e:
            self.logger.error(f"读取数据失败: {e}")
            return b''

    def read_line(self, timeout: Optional[float] = None) -> str:
        """
        读取一行数据(以\\n结尾)

        Args:
            timeout: 超时时间(秒)

        Returns:
            str: 读取到的字符串
        """
        if not self.is_open or not self.serial_port:
            return ""

        try:
            if timeout is not None:
                old_timeout = self.serial_port.timeout
                self.serial_port.timeout = timeout

            line = self.serial_port.readline().decode('utf-8', errors='ignore')

            if timeout is not None:
                self.serial_port.timeout = old_timeout

            return line.strip()

        except Exception as e:
            self.logger.error(f"读取行失败: {e}")
            return ""

    def flush(self):
        """清空缓冲区"""
        if self.serial_port and self.is_open:
            self.serial_port.reset_input_buffer()
            self.serial_port.reset_output_buffer()

    def set_rx_callback(self, callback: Callable[[bytes], None]):
        """
        设置接收回调函数

        Args:
            callback: 回调函数,参数为接收到的数据
        """
        self.rx_callback = callback

    def _rx_thread_func(self):
        """接收线程函数"""
        while self.running and self.is_open:
            try:
                if self.serial_port.in_waiting > 0:
                    data = self.serial_port.read(self.serial_port.in_waiting)

                    # 放入队列
                    self.rx_queue.put(data)

                    # 调用回调
                    if self.rx_callback:
                        self.rx_callback(data)
                else:
                    time.sleep(0.01)  # 避免CPU占用过高

            except Exception as e:
                self.logger.error(f"接收线程错误: {e}")
                time.sleep(0.1)

    def get_rx_data(self, timeout: float = 0.1) -> Optional[bytes]:
        """
        从接收队列获取数据

        Args:
            timeout: 超时时间(秒)

        Returns:
            bytes: 接收到的数据,超时返回None
        """
        try:
            return self.rx_queue.get(timeout=timeout)
        except queue.Empty:
            return None

2.2 CAN总线接口

CAN接口类设计 (can_interface.py):

import can
import threading
import queue
from typing import Optional, Callable, List
import logging

class CANInterface:
    """CAN总线接口类"""

    def __init__(self, channel: str, bustype: str = 'socketcan',
                 bitrate: int = 500000):
        """
        初始化CAN接口

        Args:
            channel: CAN通道 (如 'can0', 'PCAN_USBBUS1')
            bustype: 总线类型 ('socketcan', 'pcan', 'vector'等)
            bitrate: 波特率
        """
        self.channel = channel
        self.bustype = bustype
        self.bitrate = bitrate
        self.bus: Optional[can.Bus] = None
        self.is_open = False

        # 接收相关
        self.rx_thread: Optional[threading.Thread] = None
        self.rx_queue = queue.Queue()
        self.rx_callback: Optional[Callable] = None
        self.running = False

        # 过滤器
        self.filters: List[dict] = []

        self.logger = logging.getLogger(__name__)

    def open(self) -> bool:
        """打开CAN总线"""
        try:
            self.bus = can.Bus(
                channel=self.channel,
                bustype=self.bustype,
                bitrate=self.bitrate
            )
            self.is_open = True
            self.logger.info(f"CAN总线 {self.channel} 打开成功")

            # 启动接收线程
            self.running = True
            self.rx_thread = threading.Thread(
                target=self._rx_thread_func,
                daemon=True
            )
            self.rx_thread.start()

            return True

        except can.CanError as e:
            self.logger.error(f"打开CAN总线失败: {e}")
            return False

    def close(self):
        """关闭CAN总线"""
        self.running = False

        if self.rx_thread:
            self.rx_thread.join(timeout=2.0)

        if self.bus and self.is_open:
            self.bus.shutdown()
            self.is_open = False
            self.logger.info(f"CAN总线 {self.channel} 已关闭")

    def send(self, can_id: int, data: bytes, 
             is_extended: bool = False) -> bool:
        """
        发送CAN消息

        Args:
            can_id: CAN ID
            data: 数据(最多8字节)
            is_extended: 是否为扩展帧

        Returns:
            bool: 成功返回True
        """
        if not self.is_open or not self.bus:
            self.logger.error("CAN总线未打开")
            return False

        if len(data) > 8:
            self.logger.error("CAN数据长度不能超过8字节")
            return False

        try:
            msg = can.Message(
                arbitration_id=can_id,
                data=data,
                is_extended_id=is_extended
            )
            self.bus.send(msg)
            self.logger.debug(f"发送CAN消息: ID=0x{can_id:X}, Data={data.hex()}")
            return True

        except can.CanError as e:
            self.logger.error(f"发送CAN消息失败: {e}")
            return False

    def receive(self, timeout: float = 1.0) -> Optional[can.Message]:
        """
        接收CAN消息

        Args:
            timeout: 超时时间(秒)

        Returns:
            can.Message: 接收到的消息,超时返回None
        """
        if not self.is_open or not self.bus:
            return None

        try:
            msg = self.bus.recv(timeout=timeout)
            if msg:
                self.logger.debug(
                    f"接收CAN消息: ID=0x{msg.arbitration_id:X}, "
                    f"Data={msg.data.hex()}"
                )
            return msg

        except can.CanError as e:
            self.logger.error(f"接收CAN消息失败: {e}")
            return None

    def set_filters(self, filters: List[dict]):
        """
        设置CAN过滤器

        Args:
            filters: 过滤器列表
                例如: [{"can_id": 0x123, "can_mask": 0x7FF}]
        """
        self.filters = filters
        if self.bus and self.is_open:
            try:
                self.bus.set_filters(filters)
                self.logger.info(f"设置CAN过滤器: {filters}")
            except can.CanError as e:
                self.logger.error(f"设置过滤器失败: {e}")

    def set_rx_callback(self, callback: Callable[[can.Message], None]):
        """设置接收回调函数"""
        self.rx_callback = callback

    def _rx_thread_func(self):
        """接收线程函数"""
        while self.running and self.is_open:
            try:
                msg = self.bus.recv(timeout=0.1)
                if msg:
                    # 放入队列
                    self.rx_queue.put(msg)

                    # 调用回调
                    if self.rx_callback:
                        self.rx_callback(msg)

            except Exception as e:
                self.logger.error(f"CAN接收线程错误: {e}")

    def get_rx_message(self, timeout: float = 0.1) -> Optional[can.Message]:
        """从接收队列获取消息"""
        try:
            return self.rx_queue.get(timeout=timeout)
        except queue.Empty:
            return None

2.3 GPIO控制接口

GPIO接口类设计 (gpio_interface.py):

import time
from typing import Dict, Optional
import logging

try:
    import RPi.GPIO as GPIO  # 树莓派
    HAS_RPI_GPIO = True
except ImportError:
    HAS_RPI_GPIO = False

class GPIOInterface:
    """GPIO控制接口类"""

    # GPIO模式
    MODE_INPUT = 'input'
    MODE_OUTPUT = 'output'

    # 电平
    LOW = 0
    HIGH = 1

    def __init__(self, use_bcm: bool = True):
        """
        初始化GPIO接口

        Args:
            use_bcm: 使用BCM编号(True)或BOARD编号(False)
        """
        self.use_bcm = use_bcm
        self.pin_modes: Dict[int, str] = {}
        self.is_initialized = False
        self.logger = logging.getLogger(__name__)

        if not HAS_RPI_GPIO:
            self.logger.warning("RPi.GPIO未安装,使用模拟模式")
            self.simulated = True
            self.pin_states: Dict[int, int] = {}
        else:
            self.simulated = False

    def initialize(self) -> bool:
        """初始化GPIO"""
        if self.simulated:
            self.is_initialized = True
            return True

        try:
            if self.use_bcm:
                GPIO.setmode(GPIO.BCM)
            else:
                GPIO.setmode(GPIO.BOARD)

            GPIO.setwarnings(False)
            self.is_initialized = True
            self.logger.info("GPIO初始化成功")
            return True

        except Exception as e:
            self.logger.error(f"GPIO初始化失败: {e}")
            return False

    def cleanup(self):
        """清理GPIO"""
        if not self.simulated and self.is_initialized:
            GPIO.cleanup()
            self.is_initialized = False
            self.logger.info("GPIO已清理")

    def setup_pin(self, pin: int, mode: str, 
                  pull_up_down: Optional[str] = None) -> bool:
        """
        配置GPIO引脚

        Args:
            pin: 引脚编号
            mode: 模式 ('input' 或 'output')
            pull_up_down: 上拉/下拉 ('up', 'down', None)

        Returns:
            bool: 成功返回True
        """
        if not self.is_initialized:
            self.logger.error("GPIO未初始化")
            return False

        if self.simulated:
            self.pin_modes[pin] = mode
            if mode == self.MODE_OUTPUT:
                self.pin_states[pin] = self.LOW
            return True

        try:
            gpio_mode = GPIO.IN if mode == self.MODE_INPUT else GPIO.OUT

            if pull_up_down:
                if pull_up_down == 'up':
                    pud = GPIO.PUD_UP
                elif pull_up_down == 'down':
                    pud = GPIO.PUD_DOWN
                else:
                    pud = GPIO.PUD_OFF
                GPIO.setup(pin, gpio_mode, pull_up_down=pud)
            else:
                GPIO.setup(pin, gpio_mode)

            self.pin_modes[pin] = mode
            self.logger.debug(f"配置引脚 {pin}{mode}")
            return True

        except Exception as e:
            self.logger.error(f"配置引脚失败: {e}")
            return False

    def write_pin(self, pin: int, value: int) -> bool:
        """
        写GPIO引脚

        Args:
            pin: 引脚编号
            value: 电平值 (0或1)

        Returns:
            bool: 成功返回True
        """
        if pin not in self.pin_modes:
            self.logger.error(f"引脚 {pin} 未配置")
            return False

        if self.pin_modes[pin] != self.MODE_OUTPUT:
            self.logger.error(f"引脚 {pin} 不是输出模式")
            return False

        if self.simulated:
            self.pin_states[pin] = value
            self.logger.debug(f"写引脚 {pin} = {value}")
            return True

        try:
            GPIO.output(pin, value)
            self.logger.debug(f"写引脚 {pin} = {value}")
            return True

        except Exception as e:
            self.logger.error(f"写引脚失败: {e}")
            return False

    def read_pin(self, pin: int) -> Optional[int]:
        """
        读GPIO引脚

        Args:
            pin: 引脚编号

        Returns:
            int: 电平值 (0或1),失败返回None
        """
        if pin not in self.pin_modes:
            self.logger.error(f"引脚 {pin} 未配置")
            return None

        if self.simulated:
            if self.pin_modes[pin] == self.MODE_OUTPUT:
                return self.pin_states.get(pin, self.LOW)
            else:
                # 模拟输入,返回随机值或固定值
                return self.LOW

        try:
            value = GPIO.input(pin)
            self.logger.debug(f"读引脚 {pin} = {value}")
            return value

        except Exception as e:
            self.logger.error(f"读引脚失败: {e}")
            return None

    def toggle_pin(self, pin: int) -> bool:
        """翻转GPIO引脚电平"""
        current = self.read_pin(pin)
        if current is None:
            return False

        new_value = self.HIGH if current == self.LOW else self.LOW
        return self.write_pin(pin, new_value)

    def pulse_pin(self, pin: int, duration: float = 0.1) -> bool:
        """
        产生脉冲信号

        Args:
            pin: 引脚编号
            duration: 脉冲持续时间(秒)

        Returns:
            bool: 成功返回True
        """
        if not self.write_pin(pin, self.HIGH):
            return False

        time.sleep(duration)

        return self.write_pin(pin, self.LOW)

步骤3: 测试用例管理

3.1 测试用例数据模型

测试用例类设计 (test_case.py):

from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional
from enum import Enum
import json

class TestStatus(Enum):
    """测试状态枚举"""
    NOT_RUN = "not_run"
    RUNNING = "running"
    PASSED = "passed"
    FAILED = "failed"
    ERROR = "error"
    SKIPPED = "skipped"

class TestPriority(Enum):
    """测试优先级"""
    LOW = 1
    MEDIUM = 2
    HIGH = 3
    CRITICAL = 4

@dataclass
class TestStep:
    """测试步骤"""
    step_id: int
    description: str
    action: str  # 动作类型: 'send', 'receive', 'wait', 'check'
    parameters: Dict[str, Any]
    expected_result: Optional[Dict[str, Any]] = None
    actual_result: Optional[Dict[str, Any]] = None
    status: TestStatus = TestStatus.NOT_RUN
    duration: float = 0.0
    error_message: str = ""

@dataclass
class TestCase:
    """测试用例"""
    test_id: str
    name: str
    description: str
    category: str
    priority: TestPriority
    steps: List[TestStep] = field(default_factory=list)

    # 测试配置
    timeout: float = 30.0
    retry_count: int = 0
    setup_required: bool = True
    teardown_required: bool = True

    # 测试结果
    status: TestStatus = TestStatus.NOT_RUN
    start_time: Optional[float] = None
    end_time: Optional[float] = None
    duration: float = 0.0
    error_message: str = ""

    # 元数据
    tags: List[str] = field(default_factory=list)
    author: str = ""
    created_date: str = ""
    modified_date: str = ""

    def to_dict(self) -> Dict[str, Any]:
        """转换为字典"""
        return {
            'test_id': self.test_id,
            'name': self.name,
            'description': self.description,
            'category': self.category,
            'priority': self.priority.name,
            'steps': [
                {
                    'step_id': step.step_id,
                    'description': step.description,
                    'action': step.action,
                    'parameters': step.parameters,
                    'expected_result': step.expected_result,
                    'actual_result': step.actual_result,
                    'status': step.status.value,
                    'duration': step.duration,
                    'error_message': step.error_message
                }
                for step in self.steps
            ],
            'timeout': self.timeout,
            'retry_count': self.retry_count,
            'status': self.status.value,
            'start_time': self.start_time,
            'end_time': self.end_time,
            'duration': self.duration,
            'error_message': self.error_message,
            'tags': self.tags,
            'author': self.author
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'TestCase':
        """从字典创建"""
        steps = [
            TestStep(
                step_id=s['step_id'],
                description=s['description'],
                action=s['action'],
                parameters=s['parameters'],
                expected_result=s.get('expected_result'),
                actual_result=s.get('actual_result'),
                status=TestStatus(s.get('status', 'not_run')),
                duration=s.get('duration', 0.0),
                error_message=s.get('error_message', '')
            )
            for s in data.get('steps', [])
        ]

        return cls(
            test_id=data['test_id'],
            name=data['name'],
            description=data['description'],
            category=data['category'],
            priority=TestPriority[data['priority']],
            steps=steps,
            timeout=data.get('timeout', 30.0),
            retry_count=data.get('retry_count', 0),
            status=TestStatus(data.get('status', 'not_run')),
            start_time=data.get('start_time'),
            end_time=data.get('end_time'),
            duration=data.get('duration', 0.0),
            error_message=data.get('error_message', ''),
            tags=data.get('tags', []),
            author=data.get('author', '')
        )

    def to_json(self) -> str:
        """转换为JSON字符串"""
        return json.dumps(self.to_dict(), indent=2)

    @classmethod
    def from_json(cls, json_str: str) -> 'TestCase':
        """从JSON字符串创建"""
        data = json.loads(json_str)
        return cls.from_dict(data)

3.2 测试用例管理器

测试用例管理器 (test_manager.py):

import os
import json
import yaml
from typing import List, Dict, Optional
from pathlib import Path
import logging

from models.test_case import TestCase, TestStatus, TestPriority

class TestCaseManager:
    """测试用例管理器"""

    def __init__(self, test_case_dir: str):
        """
        初始化测试用例管理器

        Args:
            test_case_dir: 测试用例目录
        """
        self.test_case_dir = Path(test_case_dir)
        self.test_cases: Dict[str, TestCase] = {}
        self.logger = logging.getLogger(__name__)

        # 确保目录存在
        self.test_case_dir.mkdir(parents=True, exist_ok=True)

    def load_test_cases(self) -> int:
        """
        加载所有测试用例

        Returns:
            int: 加载的测试用例数量
        """
        count = 0

        # 支持JSON和YAML格式
        for ext in ['*.json', '*.yaml', '*.yml']:
            for file_path in self.test_case_dir.glob(ext):
                try:
                    test_case = self._load_test_case_file(file_path)
                    if test_case:
                        self.test_cases[test_case.test_id] = test_case
                        count += 1
                        self.logger.info(f"加载测试用例: {test_case.test_id}")

                except Exception as e:
                    self.logger.error(f"加载测试用例失败 {file_path}: {e}")

        self.logger.info(f"共加载 {count} 个测试用例")
        return count

    def _load_test_case_file(self, file_path: Path) -> Optional[TestCase]:
        """加载单个测试用例文件"""
        with open(file_path, 'r', encoding='utf-8') as f:
            if file_path.suffix == '.json':
                data = json.load(f)
            else:  # YAML
                data = yaml.safe_load(f)

        return TestCase.from_dict(data)

    def save_test_case(self, test_case: TestCase, 
                       format: str = 'json') -> bool:
        """
        保存测试用例

        Args:
            test_case: 测试用例对象
            format: 保存格式 ('json' 或 'yaml')

        Returns:
            bool: 成功返回True
        """
        try:
            ext = '.json' if format == 'json' else '.yaml'
            file_path = self.test_case_dir / f"{test_case.test_id}{ext}"

            with open(file_path, 'w', encoding='utf-8') as f:
                if format == 'json':
                    json.dump(test_case.to_dict(), f, indent=2)
                else:
                    yaml.dump(test_case.to_dict(), f, 
                             default_flow_style=False)

            self.logger.info(f"保存测试用例: {test_case.test_id}")
            return True

        except Exception as e:
            self.logger.error(f"保存测试用例失败: {e}")
            return False

    def get_test_case(self, test_id: str) -> Optional[TestCase]:
        """获取测试用例"""
        return self.test_cases.get(test_id)

    def get_all_test_cases(self) -> List[TestCase]:
        """获取所有测试用例"""
        return list(self.test_cases.values())

    def filter_test_cases(self, 
                         category: Optional[str] = None,
                         priority: Optional[TestPriority] = None,
                         tags: Optional[List[str]] = None,
                         status: Optional[TestStatus] = None) -> List[TestCase]:
        """
        过滤测试用例

        Args:
            category: 类别
            priority: 优先级
            tags: 标签列表
            status: 状态

        Returns:
            List[TestCase]: 符合条件的测试用例列表
        """
        filtered = self.get_all_test_cases()

        if category:
            filtered = [tc for tc in filtered if tc.category == category]

        if priority:
            filtered = [tc for tc in filtered if tc.priority == priority]

        if tags:
            filtered = [
                tc for tc in filtered 
                if any(tag in tc.tags for tag in tags)
            ]

        if status:
            filtered = [tc for tc in filtered if tc.status == status]

        return filtered

    def get_statistics(self) -> Dict[str, Any]:
        """
        获取测试用例统计信息

        Returns:
            Dict: 统计信息
        """
        total = len(self.test_cases)

        status_count = {}
        for status in TestStatus:
            count = len([
                tc for tc in self.test_cases.values() 
                if tc.status == status
            ])
            status_count[status.value] = count

        priority_count = {}
        for priority in TestPriority:
            count = len([
                tc for tc in self.test_cases.values() 
                if tc.priority == priority
            ])
            priority_count[priority.name] = count

        category_count = {}
        for tc in self.test_cases.values():
            category_count[tc.category] = category_count.get(tc.category, 0) + 1

        return {
            'total': total,
            'by_status': status_count,
            'by_priority': priority_count,
            'by_category': category_count
        }

3.3 测试用例示例

串口通信测试用例 (test_uart_communication.yaml):

test_id: "UART_001"
name: "串口基本通信测试"
description: "测试串口发送和接收功能"
category: "communication"
priority: "HIGH"
timeout: 10.0
retry_count: 1
tags:
  - uart
  - communication
  - basic
author: "测试工程师"
created_date: "2024-01-15"

steps:
  - step_id: 1
    description: "打开串口"
    action: "setup"
    parameters:
      port: "/dev/ttyUSB0"
      baudrate: 115200
    expected_result:
      status: "success"

  - step_id: 2
    description: "发送测试命令"
    action: "send"
    parameters:
      interface: "uart"
      data: "AT\\r\\n"
      encoding: "ascii"
    expected_result:
      sent_bytes: 4

  - step_id: 3
    description: "等待响应"
    action: "wait"
    parameters:
      duration: 1.0

  - step_id: 4
    description: "接收响应"
    action: "receive"
    parameters:
      interface: "uart"
      timeout: 2.0
    expected_result:
      data_contains: "OK"
      min_length: 2

  - step_id: 5
    description: "验证响应"
    action: "check"
    parameters:
      check_type: "string_match"
      pattern: "OK"
    expected_result:
      match: true

  - step_id: 6
    description: "关闭串口"
    action: "teardown"
    parameters:
      interface: "uart"
    expected_result:
      status: "success"

CAN总线测试用例 (test_can_communication.yaml):

test_id: "CAN_001"
name: "CAN总线消息收发测试"
description: "测试CAN总线发送和接收功能"
category: "communication"
priority: "HIGH"
timeout: 15.0
tags:
  - can
  - communication
author: "测试工程师"

steps:
  - step_id: 1
    description: "初始化CAN总线"
    action: "setup"
    parameters:
      channel: "can0"
      bitrate: 500000
    expected_result:
      status: "success"

  - step_id: 2
    description: "发送CAN消息"
    action: "send"
    parameters:
      interface: "can"
      can_id: 0x123
      data: [0x01, 0x02, 0x03, 0x04]
      is_extended: false
    expected_result:
      status: "success"

  - step_id: 3
    description: "接收CAN响应"
    action: "receive"
    parameters:
      interface: "can"
      timeout: 3.0
      filter_id: 0x456
    expected_result:
      can_id: 0x456
      data_length: 8

  - step_id: 4
    description: "验证数据"
    action: "check"
    parameters:
      check_type: "data_match"
      expected_data: [0x05, 0x06, 0x07, 0x08, 0x00, 0x00, 0x00, 0x00]
    expected_result:
      match: true

步骤4: 测试执行引擎

4.1 测试执行器设计

测试执行器 (test_executor.py):

import time
from typing import Dict, Any, Optional
import logging

from models.test_case import TestCase, TestStep, TestStatus
from hardware.serial_interface import SerialInterface
from hardware.can_interface import CANInterface
from hardware.gpio_interface import GPIOInterface

class TestExecutor:
    """测试执行器"""

    def __init__(self, config: Dict[str, Any]):
        """
        初始化测试执行器

        Args:
            config: 配置字典
        """
        self.config = config
        self.logger = logging.getLogger(__name__)

        # 硬件接口
        self.serial_interface: Optional[SerialInterface] = None
        self.can_interface: Optional[CANInterface] = None
        self.gpio_interface: Optional[GPIOInterface] = None

        # 测试上下文
        self.context: Dict[str, Any] = {}

    def execute_test_case(self, test_case: TestCase) -> bool:
        """
        执行测试用例

        Args:
            test_case: 测试用例对象

        Returns:
            bool: 测试通过返回True
        """
        self.logger.info(f"开始执行测试: {test_case.test_id} - {test_case.name}")

        test_case.status = TestStatus.RUNNING
        test_case.start_time = time.time()

        try:
            # 执行setup
            if test_case.setup_required:
                if not self._execute_setup(test_case):
                    test_case.status = TestStatus.ERROR
                    test_case.error_message = "Setup失败"
                    return False

            # 执行测试步骤
            for step in test_case.steps:
                if not self._execute_step(step):
                    test_case.status = TestStatus.FAILED
                    test_case.error_message = f"步骤 {step.step_id} 失败: {step.error_message}"
                    return False

            # 所有步骤通过
            test_case.status = TestStatus.PASSED
            self.logger.info(f"测试通过: {test_case.test_id}")
            return True

        except Exception as e:
            test_case.status = TestStatus.ERROR
            test_case.error_message = str(e)
            self.logger.error(f"测试执行异常: {e}")
            return False

        finally:
            # 执行teardown
            if test_case.teardown_required:
                self._execute_teardown(test_case)

            test_case.end_time = time.time()
            test_case.duration = test_case.end_time - test_case.start_time
            self.logger.info(f"测试完成,耗时: {test_case.duration:.2f}秒")

    def _execute_setup(self, test_case: TestCase) -> bool:
        """执行测试准备"""
        self.logger.debug("执行Setup")

        # 初始化硬件接口
        if 'uart' in test_case.tags:
            self.serial_interface = SerialInterface(
                port=self.config.get('uart_port', '/dev/ttyUSB0'),
                baudrate=self.config.get('uart_baudrate', 115200)
            )
            if not self.serial_interface.open():
                return False

        if 'can' in test_case.tags:
            self.can_interface = CANInterface(
                channel=self.config.get('can_channel', 'can0'),
                bitrate=self.config.get('can_bitrate', 500000)
            )
            if not self.can_interface.open():
                return False

        if 'gpio' in test_case.tags:
            self.gpio_interface = GPIOInterface()
            if not self.gpio_interface.initialize():
                return False

        return True

    def _execute_teardown(self, test_case: TestCase):
        """执行测试清理"""
        self.logger.debug("执行Teardown")

        if self.serial_interface:
            self.serial_interface.close()
            self.serial_interface = None

        if self.can_interface:
            self.can_interface.close()
            self.can_interface = None

        if self.gpio_interface:
            self.gpio_interface.cleanup()
            self.gpio_interface = None

    def _execute_step(self, step: TestStep) -> bool:
        """
        执行单个测试步骤

        Args:
            step: 测试步骤对象

        Returns:
            bool: 步骤通过返回True
        """
        self.logger.info(f"执行步骤 {step.step_id}: {step.description}")

        step.status = TestStatus.RUNNING
        start_time = time.time()

        try:
            # 根据动作类型执行
            if step.action == 'send':
                result = self._action_send(step.parameters)
            elif step.action == 'receive':
                result = self._action_receive(step.parameters)
            elif step.action == 'wait':
                result = self._action_wait(step.parameters)
            elif step.action == 'check':
                result = self._action_check(step.parameters)
            elif step.action == 'setup':
                result = self._action_setup(step.parameters)
            elif step.action == 'teardown':
                result = self._action_teardown(step.parameters)
            else:
                self.logger.error(f"未知动作类型: {step.action}")
                step.status = TestStatus.ERROR
                step.error_message = f"未知动作: {step.action}"
                return False

            step.actual_result = result

            # 验证结果
            if step.expected_result:
                if not self._verify_result(step.expected_result, result):
                    step.status = TestStatus.FAILED
                    step.error_message = "结果验证失败"
                    return False

            step.status = TestStatus.PASSED
            return True

        except Exception as e:
            step.status = TestStatus.ERROR
            step.error_message = str(e)
            self.logger.error(f"步骤执行异常: {e}")
            return False

        finally:
            step.duration = time.time() - start_time

    def _action_send(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """发送动作"""
        interface = params.get('interface')

        if interface == 'uart':
            data = params.get('data', '')
            encoding = params.get('encoding', 'utf-8')

            if isinstance(data, str):
                data_bytes = data.encode(encoding)
            else:
                data_bytes = bytes(data)

            sent = self.serial_interface.write(data_bytes)
            return {'sent_bytes': sent, 'status': 'success'}

        elif interface == 'can':
            can_id = params.get('can_id')
            data = params.get('data', [])
            is_extended = params.get('is_extended', False)

            success = self.can_interface.send(
                can_id, bytes(data), is_extended
            )
            return {'status': 'success' if success else 'failed'}

        else:
            raise ValueError(f"不支持的接口: {interface}")

    def _action_receive(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """接收动作"""
        interface = params.get('interface')
        timeout = params.get('timeout', 1.0)

        if interface == 'uart':
            data = self.serial_interface.read_line(timeout=timeout)
            return {
                'data': data,
                'length': len(data),
                'status': 'success' if data else 'timeout'
            }

        elif interface == 'can':
            msg = self.can_interface.receive(timeout=timeout)
            if msg:
                return {
                    'can_id': msg.arbitration_id,
                    'data': list(msg.data),
                    'data_length': len(msg.data),
                    'status': 'success'
                }
            else:
                return {'status': 'timeout'}

        else:
            raise ValueError(f"不支持的接口: {interface}")

    def _action_wait(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """等待动作"""
        duration = params.get('duration', 1.0)
        time.sleep(duration)
        return {'status': 'success'}

    def _action_check(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """检查动作"""
        check_type = params.get('check_type')

        if check_type == 'string_match':
            pattern = params.get('pattern')
            data = self.context.get('last_received_data', '')
            match = pattern in data
            return {'match': match, 'status': 'success'}

        elif check_type == 'data_match':
            expected = params.get('expected_data', [])
            actual = self.context.get('last_received_data', [])
            match = expected == actual
            return {'match': match, 'status': 'success'}

        else:
            raise ValueError(f"不支持的检查类型: {check_type}")

    def _action_setup(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """Setup动作"""
        # 在这里可以执行特定的setup操作
        return {'status': 'success'}

    def _action_teardown(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """Teardown动作"""
        # 在这里可以执行特定的teardown操作
        return {'status': 'success'}

    def _verify_result(self, expected: Dict[str, Any], 
                      actual: Dict[str, Any]) -> bool:
        """
        验证结果

        Args:
            expected: 期望结果
            actual: 实际结果

        Returns:
            bool: 验证通过返回True
        """
        for key, expected_value in expected.items():
            if key not in actual:
                self.logger.error(f"缺少字段: {key}")
                return False

            actual_value = actual[key]

            # 特殊处理
            if key == 'data_contains':
                if expected_value not in actual.get('data', ''):
                    self.logger.error(f"数据不包含: {expected_value}")
                    return False
            elif key == 'min_length':
                if actual.get('length', 0) < expected_value:
                    self.logger.error(f"长度不足: {actual.get('length')} < {expected_value}")
                    return False
            else:
                if actual_value != expected_value:
                    self.logger.error(
                        f"值不匹配: {key} = {actual_value}, 期望 {expected_value}"
                    )
                    return False

        return True

步骤5: 测试报告生成

5.1 报告生成器

HTML报告生成器 (report_generator.py):

import os
from datetime import datetime
from typing import List, Dict, Any
from pathlib import Path
import json

from models.test_case import TestCase, TestStatus

class ReportGenerator:
    """测试报告生成器"""

    def __init__(self, output_dir: str):
        """
        初始化报告生成器

        Args:
            output_dir: 报告输出目录
        """
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)

    def generate_html_report(self, test_cases: List[TestCase],
                            report_name: str = "test_report") -> str:
        """
        生成HTML测试报告

        Args:
            test_cases: 测试用例列表
            report_name: 报告名称

        Returns:
            str: 报告文件路径
        """
        # 统计信息
        stats = self._calculate_statistics(test_cases)

        # 生成HTML
        html_content = self._generate_html_content(test_cases, stats)

        # 保存文件
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"{report_name}_{timestamp}.html"
        filepath = self.output_dir / filename

        with open(filepath, 'w', encoding='utf-8') as f:
            f.write(html_content)

        return str(filepath)

    def _calculate_statistics(self, test_cases: List[TestCase]) -> Dict[str, Any]:
        """计算统计信息"""
        total = len(test_cases)
        passed = sum(1 for tc in test_cases if tc.status == TestStatus.PASSED)
        failed = sum(1 for tc in test_cases if tc.status == TestStatus.FAILED)
        error = sum(1 for tc in test_cases if tc.status == TestStatus.ERROR)
        skipped = sum(1 for tc in test_cases if tc.status == TestStatus.SKIPPED)

        pass_rate = (passed / total * 100) if total > 0 else 0

        total_duration = sum(tc.duration for tc in test_cases)

        return {
            'total': total,
            'passed': passed,
            'failed': failed,
            'error': error,
            'skipped': skipped,
            'pass_rate': pass_rate,
            'total_duration': total_duration
        }

    def _generate_html_content(self, test_cases: List[TestCase],
                               stats: Dict[str, Any]) -> str:
        """生成HTML内容"""
        html = f"""
<!DOCTYPE html>
<html lang="zh-CN">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>HIL测试报告</title>
    <style>
        * {{
            margin: 0;
            padding: 0;
            box-sizing: border-box;
        }}

        body {{
            font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
            background-color: #f5f5f5;
            padding: 20px;
        }}

        .container {{
            max-width: 1200px;
            margin: 0 auto;
            background-color: white;
            padding: 30px;
            border-radius: 8px;
            box-shadow: 0 2px 4px rgba(0,0,0,0.1);
        }}

        h1 {{
            color: #333;
            margin-bottom: 10px;
        }}

        .timestamp {{
            color: #666;
            margin-bottom: 30px;
        }}

        .summary {{
            display: grid;
            grid-template-columns: repeat(auto-fit, minmax(150px, 1fr));
            gap: 20px;
            margin-bottom: 30px;
        }}

        .stat-card {{
            padding: 20px;
            border-radius: 6px;
            text-align: center;
        }}

        .stat-card.total {{
            background-color: #e3f2fd;
            border-left: 4px solid #2196f3;
        }}

        .stat-card.passed {{
            background-color: #e8f5e9;
            border-left: 4px solid #4caf50;
        }}

        .stat-card.failed {{
            background-color: #ffebee;
            border-left: 4px solid #f44336;
        }}

        .stat-card.error {{
            background-color: #fff3e0;
            border-left: 4px solid #ff9800;
        }}

        .stat-number {{
            font-size: 32px;
            font-weight: bold;
            margin-bottom: 5px;
        }}

        .stat-label {{
            color: #666;
            font-size: 14px;
        }}

        .pass-rate {{
            font-size: 48px;
            font-weight: bold;
            color: #4caf50;
            text-align: center;
            margin: 30px 0;
        }}

        table {{
            width: 100%;
            border-collapse: collapse;
            margin-top: 20px;
        }}

        th, td {{
            padding: 12px;
            text-align: left;
            border-bottom: 1px solid #ddd;
        }}

        th {{
            background-color: #f5f5f5;
            font-weight: 600;
            color: #333;
        }}

        tr:hover {{
            background-color: #f9f9f9;
        }}

        .status {{
            padding: 4px 12px;
            border-radius: 12px;
            font-size: 12px;
            font-weight: 600;
            display: inline-block;
        }}

        .status.passed {{
            background-color: #4caf50;
            color: white;
        }}

        .status.failed {{
            background-color: #f44336;
            color: white;
        }}

        .status.error {{
            background-color: #ff9800;
            color: white;
        }}

        .status.skipped {{
            background-color: #9e9e9e;
            color: white;
        }}

        .priority {{
            padding: 4px 8px;
            border-radius: 4px;
            font-size: 11px;
            font-weight: 600;
        }}

        .priority.CRITICAL {{
            background-color: #f44336;
            color: white;
        }}

        .priority.HIGH {{
            background-color: #ff9800;
            color: white;
        }}

        .priority.MEDIUM {{
            background-color: #2196f3;
            color: white;
        }}

        .priority.LOW {{
            background-color: #9e9e9e;
            color: white;
        }}

        .error-message {{
            color: #f44336;
            font-size: 12px;
            margin-top: 4px;
        }}
    </style>
</head>
<body>
    <div class="container">
        <h1>HIL测试报告</h1>
        <div class="timestamp">生成时间: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}</div>

        <div class="summary">
            <div class="stat-card total">
                <div class="stat-number">{stats['total']}</div>
                <div class="stat-label">总计</div>
            </div>
            <div class="stat-card passed">
                <div class="stat-number">{stats['passed']}</div>
                <div class="stat-label">通过</div>
            </div>
            <div class="stat-card failed">
                <div class="stat-number">{stats['failed']}</div>
                <div class="stat-label">失败</div>
            </div>
            <div class="stat-card error">
                <div class="stat-number">{stats['error']}</div>
                <div class="stat-label">错误</div>
            </div>
        </div>

        <div class="pass-rate">
            通过率: {stats['pass_rate']:.1f}%
        </div>

        <div style="text-align: center; color: #666; margin-bottom: 30px;">
            总耗时: {stats['total_duration']:.2f}
        </div>

        <h2>测试用例详情</h2>
        <table>
            <thead>
                <tr>
                    <th>测试ID</th>
                    <th>名称</th>
                    <th>类别</th>
                    <th>优先级</th>
                    <th>状态</th>
                    <th>耗时(秒)</th>
                </tr>
            </thead>
            <tbody>
"""

        # 添加测试用例行
        for tc in test_cases:
            status_class = tc.status.value
            priority_class = tc.priority.name

            error_msg = ""
            if tc.error_message:
                error_msg = f'<div class="error-message">{tc.error_message}</div>'

            html += f"""
                <tr>
                    <td>{tc.test_id}</td>
                    <td>
                        {tc.name}
                        {error_msg}
                    </td>
                    <td>{tc.category}</td>
                    <td><span class="priority {priority_class}">{priority_class}</span></td>
                    <td><span class="status {status_class}">{status_class.upper()}</span></td>
                    <td>{tc.duration:.2f}</td>
                </tr>
"""

        html += """
            </tbody>
        </table>
    </div>
</body>
</html>
"""

        return html

    def generate_json_report(self, test_cases: List[TestCase],
                            report_name: str = "test_report") -> str:
        """生成JSON格式报告"""
        stats = self._calculate_statistics(test_cases)

        report_data = {
            'timestamp': datetime.now().isoformat(),
            'statistics': stats,
            'test_cases': [tc.to_dict() for tc in test_cases]
        }

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"{report_name}_{timestamp}.json"
        filepath = self.output_dir / filename

        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(report_data, f, indent=2, ensure_ascii=False)

        return str(filepath)

步骤6: 主程序集成

6.1 主程序实现

主程序 (main.py):

import argparse
import logging
import sys
from pathlib import Path

from core.test_manager import TestCaseManager
from core.test_executor import TestExecutor
from utils.report_generator import ReportGenerator
from utils.logger import setup_logging

def main():
    """主函数"""
    # 解析命令行参数
    parser = argparse.ArgumentParser(description='HIL测试系统')
    parser.add_argument('--test-dir', default='tests/test_cases',
                       help='测试用例目录')
    parser.add_argument('--report-dir', default='reports',
                       help='报告输出目录')
    parser.add_argument('--config', default='config/test_config.yaml',
                       help='配置文件路径')
    parser.add_argument('--filter-category', help='按类别过滤测试')
    parser.add_argument('--filter-priority', help='按优先级过滤测试')
    parser.add_argument('--filter-tags', nargs='+', help='按标签过滤测试')
    parser.add_argument('--verbose', action='store_true',
                       help='详细输出')

    args = parser.parse_args()

    # 设置日志
    log_level = logging.DEBUG if args.verbose else logging.INFO
    setup_logging(log_level)
    logger = logging.getLogger(__name__)

    logger.info("=" * 60)
    logger.info("HIL测试系统启动")
    logger.info("=" * 60)

    try:
        # 加载配置
        config = load_config(args.config)

        # 初始化测试管理器
        test_manager = TestCaseManager(args.test_dir)
        test_count = test_manager.load_test_cases()

        if test_count == 0:
            logger.error("未找到测试用例")
            return 1

        # 过滤测试用例
        test_cases = test_manager.get_all_test_cases()

        if args.filter_category:
            test_cases = [tc for tc in test_cases 
                         if tc.category == args.filter_category]

        if args.filter_priority:
            from models.test_case import TestPriority
            priority = TestPriority[args.filter_priority.upper()]
            test_cases = [tc for tc in test_cases 
                         if tc.priority == priority]

        if args.filter_tags:
            test_cases = [tc for tc in test_cases 
                         if any(tag in tc.tags for tag in args.filter_tags)]

        logger.info(f"将执行 {len(test_cases)} 个测试用例")

        # 初始化测试执行器
        executor = TestExecutor(config)

        # 执行测试
        passed_count = 0
        failed_count = 0

        for i, test_case in enumerate(test_cases, 1):
            logger.info(f"\n[{i}/{len(test_cases)}] 执行测试: {test_case.test_id}")

            if executor.execute_test_case(test_case):
                passed_count += 1
            else:
                failed_count += 1

        # 生成报告
        logger.info("\n生成测试报告...")
        report_gen = ReportGenerator(args.report_dir)

        html_report = report_gen.generate_html_report(test_cases)
        logger.info(f"HTML报告: {html_report}")

        json_report = report_gen.generate_json_report(test_cases)
        logger.info(f"JSON报告: {json_report}")

        # 输出统计
        logger.info("\n" + "=" * 60)
        logger.info("测试完成")
        logger.info("=" * 60)
        logger.info(f"总计: {len(test_cases)}")
        logger.info(f"通过: {passed_count}")
        logger.info(f"失败: {failed_count}")
        logger.info(f"通过率: {passed_count/len(test_cases)*100:.1f}%")

        # 返回状态码
        return 0 if failed_count == 0 else 1

    except Exception as e:
        logger.error(f"程序异常: {e}", exc_info=True)
        return 1

def load_config(config_path: str) -> dict:
    """加载配置文件"""
    import yaml

    with open(config_path, 'r', encoding='utf-8') as f:
        config = yaml.safe_load(f)

    return config

if __name__ == '__main__':
    sys.exit(main())

6.2 配置文件

测试配置 (config/test_config.yaml):

# HIL测试系统配置

# 硬件配置
hardware:
  # 串口配置
  uart:
    port: "/dev/ttyUSB0"
    baudrate: 115200
    timeout: 1.0

  # CAN配置
  can:
    channel: "can0"
    bustype: "socketcan"
    bitrate: 500000

  # GPIO配置
  gpio:
    use_bcm: true

# 测试配置
test:
  # 默认超时(秒)
  default_timeout: 30.0

  # 失败重试次数
  retry_count: 1

  # 测试间隔(秒)
  test_interval: 1.0

  # 并行执行
  parallel: false
  max_workers: 4

# 日志配置
logging:
  level: "INFO"
  format: "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
  file: "logs/hil_test.log"
  max_bytes: 10485760  # 10MB
  backup_count: 5

# 报告配置
report:
  output_dir: "reports"
  formats:
    - html
    - json
  include_screenshots: false

步骤7: CI/CD集成

7.1 GitHub Actions集成

GitHub Actions工作流 (.github/workflows/hil-test.yml):

name: HIL Testing

on:
  push:
    branches: [ main, develop ]
  pull_request:
    branches: [ main ]
  schedule:
    # 每天凌晨2点运行
    - cron: '0 2 * * *'

jobs:
  hil-test:
    runs-on: self-hosted  # 使用自托管运行器(连接硬件)

    steps:
    - name: Checkout代码
      uses: actions/checkout@v3

    - name: 设置Python环境
      uses: actions/setup-python@v4
      with:
        python-version: '3.9'

    - name: 安装依赖
      run: |
        python -m pip install --upgrade pip
        pip install -r requirements.txt

    - name: 检查硬件连接
      run: |
        python scripts/check_hardware.py

    - name: 运行HIL测试
      run: |
        python main.py --verbose
      continue-on-error: false

    - name: 上传测试报告
      if: always()
      uses: actions/upload-artifact@v3
      with:
        name: test-reports
        path: reports/

    - name: 发送通知
      if: failure()
      uses: 8398a7/action-slack@v3
      with:
        status: ${{ job.status }}
        text: 'HIL测试失败'
        webhook_url: ${{ secrets.SLACK_WEBHOOK }}

7.2 Jenkins集成

Jenkinsfile:

pipeline {
    agent {
        label 'hil-test-node'  // 连接硬件的节点
    }

    parameters {
        choice(
            name: 'TEST_CATEGORY',
            choices: ['all', 'communication', 'control', 'safety'],
            description: '测试类别'
        )
        choice(
            name: 'TEST_PRIORITY',
            choices: ['all', 'CRITICAL', 'HIGH', 'MEDIUM', 'LOW'],
            description: '测试优先级'
        )
    }

    environment {
        PYTHON_ENV = 'venv'
    }

    stages {
        stage('准备环境') {
            steps {
                sh '''
                    python3 -m venv ${PYTHON_ENV}
                    . ${PYTHON_ENV}/bin/activate
                    pip install -r requirements.txt
                '''
            }
        }

        stage('检查硬件') {
            steps {
                sh '''
                    . ${PYTHON_ENV}/bin/activate
                    python scripts/check_hardware.py
                '''
            }
        }

        stage('运行测试') {
            steps {
                script {
                    def filterArgs = ""

                    if (params.TEST_CATEGORY != 'all') {
                        filterArgs += "--filter-category ${params.TEST_CATEGORY} "
                    }

                    if (params.TEST_PRIORITY != 'all') {
                        filterArgs += "--filter-priority ${params.TEST_PRIORITY} "
                    }

                    sh """
                        . ${PYTHON_ENV}/bin/activate
                        python main.py ${filterArgs} --verbose
                    """
                }
            }
        }

        stage('生成报告') {
            steps {
                publishHTML([
                    reportDir: 'reports',
                    reportFiles: '*.html',
                    reportName: 'HIL Test Report'
                ])
            }
        }
    }

    post {
        always {
            archiveArtifacts artifacts: 'reports/**/*', allowEmptyArchive: true
            archiveArtifacts artifacts: 'logs/**/*', allowEmptyArchive: true
        }

        failure {
            emailext (
                subject: "HIL测试失败: ${env.JOB_NAME} - ${env.BUILD_NUMBER}",
                body: """
                    <p>HIL测试执行失败</p>
                    <p>项目: ${env.JOB_NAME}</p>
                    <p>构建号: ${env.BUILD_NUMBER}</p>
                    <p>查看详情: ${env.BUILD_URL}</p>
                """,
                to: '${DEFAULT_RECIPIENTS}',
                mimeType: 'text/html'
            )
        }

        success {
            echo '测试全部通过!'
        }
    }
}

7.3 硬件检查脚本

硬件连接检查 (scripts/check_hardware.py):

#!/usr/bin/env python3
"""检查硬件连接状态"""

import sys
import serial
import serial.tools.list_ports

def check_serial_ports():
    """检查串口"""
    print("检查串口连接...")
    ports = list(serial.tools.list_ports.comports())

    if not ports:
        print("❌ 未找到串口设备")
        return False

    print(f"✓ 找到 {len(ports)} 个串口设备:")
    for port in ports:
        print(f"  - {port.device}: {port.description}")

    return True

def check_can_interface():
    """检查CAN接口"""
    print("\n检查CAN接口...")

    try:
        import can

        # 尝试打开CAN接口
        bus = can.Bus(channel='can0', bustype='socketcan', bitrate=500000)
        bus.shutdown()

        print("✓ CAN接口正常")
        return True

    except Exception as e:
        print(f"❌ CAN接口异常: {e}")
        return False

def check_gpio():
    """检查GPIO"""
    print("\n检查GPIO...")

    try:
        import RPi.GPIO as GPIO
        GPIO.setmode(GPIO.BCM)
        GPIO.setwarnings(False)
        GPIO.cleanup()

        print("✓ GPIO正常")
        return True

    except Exception as e:
        print(f"⚠ GPIO不可用: {e}")
        return True  # GPIO不是必需的

def main():
    """主函数"""
    print("=" * 50)
    print("HIL测试系统硬件检查")
    print("=" * 50)

    results = []

    results.append(check_serial_ports())
    results.append(check_can_interface())
    results.append(check_gpio())

    print("\n" + "=" * 50)

    if all(results):
        print("✓ 所有硬件检查通过")
        return 0
    else:
        print("❌ 部分硬件检查失败")
        return 1

if __name__ == '__main__':
    sys.exit(main())

故障排除

问题1: 串口无法打开

现象:

SerialException: could not open port /dev/ttyUSB0

原因: - 设备未连接 - 权限不足 - 端口被占用 - 驱动未安装

解决方法:

# 1. 检查设备是否存在
ls -l /dev/ttyUSB*

# 2. 添加用户到dialout组
sudo usermod -a -G dialout $USER
# 注销后重新登录

# 3. 检查端口占用
lsof /dev/ttyUSB0

# 4. 临时修改权限
sudo chmod 666 /dev/ttyUSB0

问题2: CAN接口初始化失败

现象:

CanError: Failed to create socket

原因: - CAN接口未启动 - 驱动未加载 - 波特率不匹配

解决方法:

# 1. 加载CAN驱动
sudo modprobe can
sudo modprobe can_raw
sudo modprobe vcan

# 2. 配置CAN接口
sudo ip link set can0 type can bitrate 500000
sudo ip link set can0 up

# 3. 检查状态
ip -details link show can0

# 4. 测试CAN通信
cansend can0 123#DEADBEEF
candump can0

问题3: 测试超时

现象: 测试用例执行超时,无响应

原因: - 设备未响应 - 超时设置过短 - 通信协议不匹配

解决方法:

  1. 增加超时时间

    # test_config.yaml
    test:
      default_timeout: 60.0  # 增加到60秒
    

  2. 添加调试日志

    # 在测试执行器中添加
    self.logger.debug(f"等待响应,超时: {timeout}秒")
    

  3. 检查设备状态

    # 添加设备健康检查
    def check_device_health(self):
        response = self.serial_interface.read_line(timeout=5.0)
        if not response:
            raise Exception("设备无响应")
    

问题4: 测试结果不稳定

现象: 同一测试用例有时通过,有时失败

原因: - 时序问题 - 资源竞争 - 环境干扰

解决方法:

  1. 添加延迟

    # 在关键步骤之间添加延迟
    time.sleep(0.1)
    

  2. 添加重试机制

    def execute_with_retry(self, test_case, max_retries=3):
        for attempt in range(max_retries):
            if self.execute_test_case(test_case):
                return True
            self.logger.warning(f"重试 {attempt + 1}/{max_retries}")
        return False
    

  3. 隔离测试环境

    # 每个测试前重置设备
    def reset_device(self):
        self.gpio_interface.pulse_pin(RESET_PIN, duration=0.5)
        time.sleep(2.0)  # 等待设备启动
    

最佳实践

测试设计原则

  1. 独立性
  2. 每个测试用例独立运行
  3. 不依赖其他测试的结果
  4. 使用setUp和tearDown确保环境一致

  5. 可重复性

  6. 测试结果应该可重复
  7. 避免依赖随机数或时间
  8. 使用固定的测试数据

  9. 清晰性

  10. 测试用例命名清晰
  11. 测试步骤描述详细
  12. 期望结果明确

  13. 完整性

  14. 覆盖正常场景
  15. 覆盖边界条件
  16. 覆盖异常情况

系统维护

  1. 定期校准
  2. 定期校准测试设备
  3. 验证测试基准
  4. 更新测试数据

  5. 版本管理

  6. 测试用例版本化
  7. 配置文件版本化
  8. 测试结果归档

  9. 文档更新

  10. 及时更新测试文档
  11. 记录已知问题
  12. 维护FAQ

  13. 性能优化

  14. 优化测试执行时间
  15. 并行执行测试
  16. 缓存测试数据

安全考虑

  1. 硬件保护
  2. 添加过流保护
  3. 添加过压保护
  4. 使用隔离电路

  5. 软件保护

  6. 输入验证
  7. 异常处理
  8. 超时保护

  9. 数据安全

  10. 敏感数据加密
  11. 访问权限控制
  12. 审计日志

项目扩展

扩展方向

  1. 支持更多协议
  2. Modbus
  3. Ethernet/IP
  4. OPC UA
  5. MQTT

  6. 增强数据分析

  7. 实时波形显示
  8. 统计分析
  9. 趋势预测
  10. 异常检测

  11. Web界面

  12. 测试用例管理
  13. 实时监控
  14. 报告查看
  15. 远程控制

  16. 分布式测试

  17. 多节点测试
  18. 负载均衡
  19. 结果聚合

Web界面示例

Flask Web服务 (web_server.py):

from flask import Flask, render_template, jsonify, request
from flask_socketio import SocketIO, emit
import threading

app = Flask(__name__)
socketio = SocketIO(app)

# 全局测试状态
test_status = {
    'running': False,
    'current_test': None,
    'progress': 0
}

@app.route('/')
def index():
    """主页"""
    return render_template('index.html')

@app.route('/api/tests')
def get_tests():
    """获取测试列表"""
    test_manager = TestCaseManager('tests/test_cases')
    test_manager.load_test_cases()

    tests = [tc.to_dict() for tc in test_manager.get_all_test_cases()]
    return jsonify(tests)

@app.route('/api/run', methods=['POST'])
def run_tests():
    """运行测试"""
    test_ids = request.json.get('test_ids', [])

    # 在后台线程运行测试
    thread = threading.Thread(
        target=run_tests_background,
        args=(test_ids,)
    )
    thread.start()

    return jsonify({'status': 'started'})

def run_tests_background(test_ids):
    """后台运行测试"""
    global test_status

    test_status['running'] = True

    for i, test_id in enumerate(test_ids):
        test_status['current_test'] = test_id
        test_status['progress'] = (i + 1) / len(test_ids) * 100

        # 发送进度更新
        socketio.emit('test_progress', test_status)

        # 执行测试...
        time.sleep(2)  # 模拟测试执行

    test_status['running'] = False
    socketio.emit('test_complete', {'status': 'success'})

if __name__ == '__main__':
    socketio.run(app, host='0.0.0.0', port=5000)

项目总结

项目成果

通过本项目,你已经构建了一个完整的HIL测试系统,包括:

硬件接口层 - 串口通信接口 - CAN总线接口 - GPIO控制接口 - 可扩展的接口架构

测试管理层 - 测试用例数据模型 - 测试用例管理器 - 灵活的过滤和查询

测试执行层 - 自动化测试执行 - 多种测试动作支持 - 结果验证机制

报告生成 - HTML格式报告 - JSON格式报告 - 详细的统计信息

CI/CD集成 - GitHub Actions支持 - Jenkins支持 - 自动化测试流程

关键技术点

  1. 模块化设计
  2. 清晰的层次结构
  3. 松耦合的模块
  4. 易于扩展和维护

  5. 硬件抽象

  6. 统一的接口定义
  7. 支持多种通信协议
  8. 模拟模式支持

  9. 测试自动化

  10. 声明式测试用例
  11. 自动化执行
  12. 结果自动验证

  13. 可扩展性

  14. 插件式架构
  15. 配置驱动
  16. 支持自定义扩展

实际应用建议

  1. 从简单开始
  2. 先实现基本功能
  3. 逐步添加复杂特性
  4. 持续优化改进

  5. 重视文档

  6. 编写详细的使用文档
  7. 记录设计决策
  8. 维护测试用例文档

  9. 持续改进

  10. 收集用户反馈
  11. 优化测试效率
  12. 增强系统稳定性

  13. 团队协作

  14. 建立测试规范
  15. 代码审查
  16. 知识分享

性能指标

典型性能: - 测试用例加载: < 1秒 - 单个测试执行: 5-30秒 - 报告生成: < 2秒 - 支持测试数量: 1000+

可靠性: - 测试成功率: > 99% - 系统稳定性: 24/7运行 - 错误恢复: 自动重试

成本分析

硬件成本: ¥5,000 - ¥20,000 - 基础配置: ¥5,000 - 标准配置: ¥10,000 - 高级配置: ¥20,000

开发成本: 约40-80人天 - 基础功能: 20人天 - 完整系统: 40人天 - 高级特性: 80人天

维护成本: 约5-10人天/月 - 日常维护: 5人天/月 - 功能扩展: 10人天/月

投资回报: - 减少人工测试时间: 70-90% - 提高测试覆盖率: 50-100% - 缩短开发周期: 20-40% - 提高产品质量: 显著提升

下一步学习

相关主题

建议继续学习以下内容:

  1. 测试理论
  2. 单元测试框架搭建
  3. 自动化测试集成
  4. 持续集成CI/CD实践

  5. 硬件接口

  6. 串口调试技巧大全
  7. CAN协议实战
  8. 逻辑分析仪使用入门

  9. 系统集成

  10. Docker化开发环境
  11. 微服务架构

进阶方向

  1. 实时系统测试
  2. RTOS测试方法
  3. 实时性能分析
  4. 时序验证

  5. 安全测试

  6. 渗透测试
  7. 模糊测试
  8. 安全审计

  9. 性能测试

  10. 压力测试
  11. 负载测试
  12. 基准测试

  13. AI辅助测试

  14. 智能测试用例生成
  15. 异常检测
  16. 预测性维护

参考资料

书籍推荐

  1. "Hardware-in-the-Loop Simulation" - Hanselmann
  2. HIL测试理论和实践
  3. 系统建模方法

  4. "Embedded Software Testing" - Broekman & Notenboom

  5. 嵌入式测试策略
  6. 测试用例设计

  7. "Continuous Delivery" - Humble & Farley

  8. 持续交付实践
  9. 自动化测试

在线资源

  1. 官方文档
  2. PySerial文档
  3. Python-CAN文档
  4. pytest文档

  5. 开源项目

  6. Robot Framework
  7. Automotive Test Framework

  8. 技术博客

  9. Embedded Artistry
  10. Interrupt Blog

工具和平台

  1. 商业工具
  2. dSPACE HIL系统
  3. NI LabVIEW
  4. Vector CANoe

  5. 开源工具

  6. pytest
  7. Robot Framework
  8. Jenkins

  9. 硬件平台

  10. 树莓派
  11. BeagleBone
  12. Arduino

练习项目

基础练习

  1. 串口回环测试
  2. 实现串口发送和接收
  3. 验证数据完整性
  4. 测试不同波特率

  5. CAN消息测试

  6. 发送CAN消息
  7. 接收并解析消息
  8. 实现过滤功能

  9. GPIO控制测试

  10. 控制LED闪烁
  11. 读取按键状态
  12. 实现PWM输出

进阶练习

  1. 协议测试
  2. 实现Modbus协议测试
  3. 测试协议时序
  4. 验证错误处理

  5. 性能测试

  6. 测试通信吞吐量
  7. 测试响应时间
  8. 分析性能瓶颈

  9. 压力测试

  10. 长时间运行测试
  11. 高频率消息测试
  12. 资源占用监控

综合项目

智能家居设备HIL测试系统

要求: - 支持WiFi、蓝牙、Zigbee通信 - 测试设备联动功能 - 模拟各种传感器输入 - 生成详细测试报告 - 集成到CI/CD流程


恭喜你完成了HIL测试系统项目!

这个系统将帮助你: - 提高测试效率 - 保证产品质量 - 加速开发进度 - 降低测试成本

继续探索和优化你的测试系统,祝你在嵌入式测试领域取得成功!