硬件在环(HIL)测试系统完整实现指南¶
项目概述¶
本项目将指导你构建一个完整的硬件在环(Hardware-in-Loop, HIL)测试系统,用于自动化测试嵌入式设备。HIL测试是一种将真实硬件与模拟环境相结合的测试方法,能够在接近真实工作条件下验证系统功能。
项目目标¶
完成本项目后,你将拥有:
- 完整的HIL测试架构设计
- 自动化测试平台实现
- 测试用例管理系统
- 实时数据采集和分析工具
- 测试报告生成系统
- CI/CD集成方案
- 可扩展的测试框架
项目特点¶
技术亮点: - 模块化架构设计 - 支持多种通信协议 - 实时数据采集和分析 - 自动化测试执行 - 详细的测试报告 - 易于扩展和维护
应用场景: - 汽车电子ECU测试 - 工业控制器验证 - 物联网设备测试 - 机器人系统测试 - 航空航天系统验证
前置要求¶
知识要求¶
必备知识: - 精通C/C++或Python编程 - 深入理解嵌入式系统 - 熟悉测试理论和方法 - 了解硬件接口(UART, SPI, I2C, CAN等) - 掌握通信协议
推荐知识: - 了解测试自动化框架 - 熟悉数据库操作 - 了解Web开发基础 - 掌握版本控制(Git) - 了解CI/CD流程
技能要求¶
- 能够设计系统架构
- 能够编写自动化脚本
- 能够调试硬件问题
- 能够分析测试数据
- 能够编写技术文档
准备工作¶
硬件准备¶
| 名称 | 数量 | 说明 | 参考价格 |
|---|---|---|---|
| 测试主机 | 1 | PC或工控机,运行测试软件 | ¥3000-10000 |
| 被测设备(DUT) | 1+ | 需要测试的嵌入式设备 | 根据项目 |
| 数据采集卡 | 1 | USB/PCIe数据采集卡 | ¥500-5000 |
| 串口转换器 | 2-4 | USB转UART/RS485 | ¥50-200 |
| CAN分析仪 | 1 | 用于CAN总线测试(可选) | ¥500-3000 |
| 逻辑分析仪 | 1 | 用于信号分析(可选) | ¥200-2000 |
| 电源 | 1-2 | 可调电源,用于供电测试 | ¥300-1000 |
| 继电器模块 | 1 | 用于电源控制 | ¥50-200 |
| 信号发生器 | 1 | 模拟传感器信号(可选) | ¥500-3000 |
软件准备¶
开发环境: - 操作系统: Linux (Ubuntu 20.04+) 或 Windows 10+ - 编程语言: Python 3.8+ 或 C++17 - IDE: VS Code, PyCharm 或 Visual Studio - 版本控制: Git
必需软件:
# Python环境
sudo apt install python3 python3-pip python3-venv
# 测试框架
pip install pytest pytest-html pytest-cov
# 串口通信
pip install pyserial
# 数据处理
pip install numpy pandas matplotlib
# Web界面(可选)
pip install flask flask-socketio
# 数据库(可选)
pip install sqlalchemy sqlite3
可选软件: - LabVIEW (用于复杂信号处理) - MATLAB (用于数据分析) - Jenkins (用于CI/CD) - Grafana (用于数据可视化)
系统要求¶
最低配置: - CPU: Intel i5 或同等性能 - 内存: 8GB RAM - 存储: 100GB 可用空间 - 网络: 以太网接口
推荐配置: - CPU: Intel i7 或更高 - 内存: 16GB+ RAM - 存储: 256GB SSD - 网络: 千兆以太网 - 显示: 双显示器
步骤1: 系统架构设计¶
1.1 HIL测试系统概述¶
什么是HIL测试?
硬件在环(Hardware-in-Loop)测试是一种将真实硬件与模拟环境相结合的测试方法。系统的一部分使用真实硬件,其他部分通过软件模拟。
HIL测试的优势: - 在安全环境中测试危险场景 - 降低测试成本 - 提高测试覆盖率 - 支持自动化测试 - 可重复性好 - 缩短开发周期
典型应用:
汽车ECU测试:
真实ECU ←→ HIL系统(模拟传感器、执行器、总线)
工业控制器测试:
真实PLC ←→ HIL系统(模拟工艺过程、I/O信号)
物联网设备测试:
真实设备 ←→ HIL系统(模拟云端、传感器、网络)
1.2 系统架构¶
整体架构图:
┌─────────────────────────────────────────────────────────┐
│ 测试管理层 │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │测试调度器│ │用例管理器│ │报告生成器│ │
│ └──────────┘ └──────────┘ └──────────┘ │
└─────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────┐
│ 测试执行层 │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │测试引擎 │ │数据采集 │ │结果验证 │ │
│ └──────────┘ └──────────┘ └──────────┘ │
└─────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────┐
│ 硬件抽象层 │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │串口驱动 │ │CAN驱动 │ │GPIO控制 │ │
│ └──────────┘ └──────────┘ └──────────┘ │
└─────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────┐
│ 硬件接口层 │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │数据采集卡│ │通信接口 │ │电源控制 │ │
│ └──────────┘ └──────────┘ └──────────┘ │
└─────────────────────────────────────────────────────────┘
↓
┌──────────┐
│ 被测设备 │
│ (DUT) │
└──────────┘
1.3 核心模块设计¶
1. 测试管理模块
功能: - 测试用例管理 - 测试计划调度 - 测试执行控制 - 结果收集和报告
2. 硬件接口模块
功能: - 串口通信 - CAN总线通信 - GPIO控制 - 模拟信号输入/输出 - 数字信号输入/输出
3. 数据采集模块
功能: - 实时数据采集 - 数据缓存 - 数据预处理 - 时间戳管理
4. 测试执行模块
功能: - 测试用例执行 - 激励信号生成 - 响应数据采集 - 超时控制
5. 结果验证模块
功能: - 数据对比 - 阈值检查 - 时序验证 - 协议验证
6. 报告生成模块
功能: - 测试结果汇总 - 图表生成 - HTML/PDF报告 - 数据导出
1.4 技术选型¶
编程语言选择:
| 语言 | 优势 | 劣势 | 适用场景 |
|---|---|---|---|
| Python | 开发快速、库丰富、易维护 | 性能较低 | 测试管理、数据分析 |
| C++ | 性能高、实时性好 | 开发复杂 | 实时数据采集、硬件控制 |
| LabVIEW | 图形化编程、硬件支持好 | 成本高、不开源 | 复杂信号处理 |
本项目选择: Python (主要) + C++ (性能关键部分)
通信协议: - UART/RS232/RS485: 通用串口通信 - CAN: 汽车和工业应用 - Ethernet: 高速数据传输 - USB: 设备连接
数据存储: - SQLite: 轻量级数据库 - CSV: 简单数据导出 - HDF5: 大量数据存储
测试框架: - pytest: Python测试框架 - unittest: Python标准库 - Robot Framework: 关键字驱动测试
1.5 目录结构设计¶
hil_test_system/
├── config/ # 配置文件
│ ├── hardware.yaml # 硬件配置
│ ├── test_config.yaml # 测试配置
│ └── logging.yaml # 日志配置
├── src/ # 源代码
│ ├── core/ # 核心模块
│ │ ├── test_manager.py
│ │ ├── test_executor.py
│ │ └── result_validator.py
│ ├── hardware/ # 硬件接口
│ │ ├── serial_interface.py
│ │ ├── can_interface.py
│ │ └── gpio_interface.py
│ ├── utils/ # 工具函数
│ │ ├── logger.py
│ │ ├── data_processor.py
│ │ └── report_generator.py
│ └── models/ # 数据模型
│ ├── test_case.py
│ └── test_result.py
├── tests/ # 测试用例
│ ├── test_cases/ # 具体测试用例
│ ├── test_data/ # 测试数据
│ └── test_scripts/ # 测试脚本
├── reports/ # 测试报告
├── logs/ # 日志文件
├── docs/ # 文档
├── requirements.txt # Python依赖
├── setup.py # 安装脚本
└── README.md # 项目说明
步骤2: 硬件接口层实现¶
2.1 串口通信接口¶
串口接口类设计 (serial_interface.py):
import serial
import threading
import queue
import time
from typing import Optional, Callable
import logging
class SerialInterface:
"""串口通信接口类"""
def __init__(self, port: str, baudrate: int = 115200,
timeout: float = 1.0):
"""
初始化串口接口
Args:
port: 串口名称 (如 'COM1' 或 '/dev/ttyUSB0')
baudrate: 波特率
timeout: 超时时间(秒)
"""
self.port = port
self.baudrate = baudrate
self.timeout = timeout
self.serial_port: Optional[serial.Serial] = None
self.is_open = False
# 接收线程相关
self.rx_thread: Optional[threading.Thread] = None
self.rx_queue = queue.Queue()
self.rx_callback: Optional[Callable] = None
self.running = False
self.logger = logging.getLogger(__name__)
def open(self) -> bool:
"""
打开串口
Returns:
bool: 成功返回True,失败返回False
"""
try:
self.serial_port = serial.Serial(
port=self.port,
baudrate=self.baudrate,
bytesize=serial.EIGHTBITS,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
timeout=self.timeout
)
self.is_open = True
self.logger.info(f"串口 {self.port} 打开成功")
# 启动接收线程
self.running = True
self.rx_thread = threading.Thread(
target=self._rx_thread_func,
daemon=True
)
self.rx_thread.start()
return True
except serial.SerialException as e:
self.logger.error(f"打开串口失败: {e}")
return False
def close(self):
"""关闭串口"""
self.running = False
if self.rx_thread:
self.rx_thread.join(timeout=2.0)
if self.serial_port and self.is_open:
self.serial_port.close()
self.is_open = False
self.logger.info(f"串口 {self.port} 已关闭")
def write(self, data: bytes) -> int:
"""
发送数据
Args:
data: 要发送的字节数据
Returns:
int: 实际发送的字节数
"""
if not self.is_open or not self.serial_port:
self.logger.error("串口未打开")
return 0
try:
bytes_written = self.serial_port.write(data)
self.logger.debug(f"发送 {bytes_written} 字节: {data.hex()}")
return bytes_written
except serial.SerialException as e:
self.logger.error(f"发送数据失败: {e}")
return 0
def read(self, size: int = 1, timeout: Optional[float] = None) -> bytes:
"""
读取数据
Args:
size: 要读取的字节数
timeout: 超时时间(秒),None使用默认超时
Returns:
bytes: 读取到的数据
"""
if not self.is_open or not self.serial_port:
self.logger.error("串口未打开")
return b''
try:
if timeout is not None:
old_timeout = self.serial_port.timeout
self.serial_port.timeout = timeout
data = self.serial_port.read(size)
if timeout is not None:
self.serial_port.timeout = old_timeout
if data:
self.logger.debug(f"接收 {len(data)} 字节: {data.hex()}")
return data
except serial.SerialException as e:
self.logger.error(f"读取数据失败: {e}")
return b''
def read_line(self, timeout: Optional[float] = None) -> str:
"""
读取一行数据(以\\n结尾)
Args:
timeout: 超时时间(秒)
Returns:
str: 读取到的字符串
"""
if not self.is_open or not self.serial_port:
return ""
try:
if timeout is not None:
old_timeout = self.serial_port.timeout
self.serial_port.timeout = timeout
line = self.serial_port.readline().decode('utf-8', errors='ignore')
if timeout is not None:
self.serial_port.timeout = old_timeout
return line.strip()
except Exception as e:
self.logger.error(f"读取行失败: {e}")
return ""
def flush(self):
"""清空缓冲区"""
if self.serial_port and self.is_open:
self.serial_port.reset_input_buffer()
self.serial_port.reset_output_buffer()
def set_rx_callback(self, callback: Callable[[bytes], None]):
"""
设置接收回调函数
Args:
callback: 回调函数,参数为接收到的数据
"""
self.rx_callback = callback
def _rx_thread_func(self):
"""接收线程函数"""
while self.running and self.is_open:
try:
if self.serial_port.in_waiting > 0:
data = self.serial_port.read(self.serial_port.in_waiting)
# 放入队列
self.rx_queue.put(data)
# 调用回调
if self.rx_callback:
self.rx_callback(data)
else:
time.sleep(0.01) # 避免CPU占用过高
except Exception as e:
self.logger.error(f"接收线程错误: {e}")
time.sleep(0.1)
def get_rx_data(self, timeout: float = 0.1) -> Optional[bytes]:
"""
从接收队列获取数据
Args:
timeout: 超时时间(秒)
Returns:
bytes: 接收到的数据,超时返回None
"""
try:
return self.rx_queue.get(timeout=timeout)
except queue.Empty:
return None
2.2 CAN总线接口¶
CAN接口类设计 (can_interface.py):
import can
import threading
import queue
from typing import Optional, Callable, List
import logging
class CANInterface:
"""CAN总线接口类"""
def __init__(self, channel: str, bustype: str = 'socketcan',
bitrate: int = 500000):
"""
初始化CAN接口
Args:
channel: CAN通道 (如 'can0', 'PCAN_USBBUS1')
bustype: 总线类型 ('socketcan', 'pcan', 'vector'等)
bitrate: 波特率
"""
self.channel = channel
self.bustype = bustype
self.bitrate = bitrate
self.bus: Optional[can.Bus] = None
self.is_open = False
# 接收相关
self.rx_thread: Optional[threading.Thread] = None
self.rx_queue = queue.Queue()
self.rx_callback: Optional[Callable] = None
self.running = False
# 过滤器
self.filters: List[dict] = []
self.logger = logging.getLogger(__name__)
def open(self) -> bool:
"""打开CAN总线"""
try:
self.bus = can.Bus(
channel=self.channel,
bustype=self.bustype,
bitrate=self.bitrate
)
self.is_open = True
self.logger.info(f"CAN总线 {self.channel} 打开成功")
# 启动接收线程
self.running = True
self.rx_thread = threading.Thread(
target=self._rx_thread_func,
daemon=True
)
self.rx_thread.start()
return True
except can.CanError as e:
self.logger.error(f"打开CAN总线失败: {e}")
return False
def close(self):
"""关闭CAN总线"""
self.running = False
if self.rx_thread:
self.rx_thread.join(timeout=2.0)
if self.bus and self.is_open:
self.bus.shutdown()
self.is_open = False
self.logger.info(f"CAN总线 {self.channel} 已关闭")
def send(self, can_id: int, data: bytes,
is_extended: bool = False) -> bool:
"""
发送CAN消息
Args:
can_id: CAN ID
data: 数据(最多8字节)
is_extended: 是否为扩展帧
Returns:
bool: 成功返回True
"""
if not self.is_open or not self.bus:
self.logger.error("CAN总线未打开")
return False
if len(data) > 8:
self.logger.error("CAN数据长度不能超过8字节")
return False
try:
msg = can.Message(
arbitration_id=can_id,
data=data,
is_extended_id=is_extended
)
self.bus.send(msg)
self.logger.debug(f"发送CAN消息: ID=0x{can_id:X}, Data={data.hex()}")
return True
except can.CanError as e:
self.logger.error(f"发送CAN消息失败: {e}")
return False
def receive(self, timeout: float = 1.0) -> Optional[can.Message]:
"""
接收CAN消息
Args:
timeout: 超时时间(秒)
Returns:
can.Message: 接收到的消息,超时返回None
"""
if not self.is_open or not self.bus:
return None
try:
msg = self.bus.recv(timeout=timeout)
if msg:
self.logger.debug(
f"接收CAN消息: ID=0x{msg.arbitration_id:X}, "
f"Data={msg.data.hex()}"
)
return msg
except can.CanError as e:
self.logger.error(f"接收CAN消息失败: {e}")
return None
def set_filters(self, filters: List[dict]):
"""
设置CAN过滤器
Args:
filters: 过滤器列表
例如: [{"can_id": 0x123, "can_mask": 0x7FF}]
"""
self.filters = filters
if self.bus and self.is_open:
try:
self.bus.set_filters(filters)
self.logger.info(f"设置CAN过滤器: {filters}")
except can.CanError as e:
self.logger.error(f"设置过滤器失败: {e}")
def set_rx_callback(self, callback: Callable[[can.Message], None]):
"""设置接收回调函数"""
self.rx_callback = callback
def _rx_thread_func(self):
"""接收线程函数"""
while self.running and self.is_open:
try:
msg = self.bus.recv(timeout=0.1)
if msg:
# 放入队列
self.rx_queue.put(msg)
# 调用回调
if self.rx_callback:
self.rx_callback(msg)
except Exception as e:
self.logger.error(f"CAN接收线程错误: {e}")
def get_rx_message(self, timeout: float = 0.1) -> Optional[can.Message]:
"""从接收队列获取消息"""
try:
return self.rx_queue.get(timeout=timeout)
except queue.Empty:
return None
2.3 GPIO控制接口¶
GPIO接口类设计 (gpio_interface.py):
import time
from typing import Dict, Optional
import logging
try:
import RPi.GPIO as GPIO # 树莓派
HAS_RPI_GPIO = True
except ImportError:
HAS_RPI_GPIO = False
class GPIOInterface:
"""GPIO控制接口类"""
# GPIO模式
MODE_INPUT = 'input'
MODE_OUTPUT = 'output'
# 电平
LOW = 0
HIGH = 1
def __init__(self, use_bcm: bool = True):
"""
初始化GPIO接口
Args:
use_bcm: 使用BCM编号(True)或BOARD编号(False)
"""
self.use_bcm = use_bcm
self.pin_modes: Dict[int, str] = {}
self.is_initialized = False
self.logger = logging.getLogger(__name__)
if not HAS_RPI_GPIO:
self.logger.warning("RPi.GPIO未安装,使用模拟模式")
self.simulated = True
self.pin_states: Dict[int, int] = {}
else:
self.simulated = False
def initialize(self) -> bool:
"""初始化GPIO"""
if self.simulated:
self.is_initialized = True
return True
try:
if self.use_bcm:
GPIO.setmode(GPIO.BCM)
else:
GPIO.setmode(GPIO.BOARD)
GPIO.setwarnings(False)
self.is_initialized = True
self.logger.info("GPIO初始化成功")
return True
except Exception as e:
self.logger.error(f"GPIO初始化失败: {e}")
return False
def cleanup(self):
"""清理GPIO"""
if not self.simulated and self.is_initialized:
GPIO.cleanup()
self.is_initialized = False
self.logger.info("GPIO已清理")
def setup_pin(self, pin: int, mode: str,
pull_up_down: Optional[str] = None) -> bool:
"""
配置GPIO引脚
Args:
pin: 引脚编号
mode: 模式 ('input' 或 'output')
pull_up_down: 上拉/下拉 ('up', 'down', None)
Returns:
bool: 成功返回True
"""
if not self.is_initialized:
self.logger.error("GPIO未初始化")
return False
if self.simulated:
self.pin_modes[pin] = mode
if mode == self.MODE_OUTPUT:
self.pin_states[pin] = self.LOW
return True
try:
gpio_mode = GPIO.IN if mode == self.MODE_INPUT else GPIO.OUT
if pull_up_down:
if pull_up_down == 'up':
pud = GPIO.PUD_UP
elif pull_up_down == 'down':
pud = GPIO.PUD_DOWN
else:
pud = GPIO.PUD_OFF
GPIO.setup(pin, gpio_mode, pull_up_down=pud)
else:
GPIO.setup(pin, gpio_mode)
self.pin_modes[pin] = mode
self.logger.debug(f"配置引脚 {pin} 为 {mode}")
return True
except Exception as e:
self.logger.error(f"配置引脚失败: {e}")
return False
def write_pin(self, pin: int, value: int) -> bool:
"""
写GPIO引脚
Args:
pin: 引脚编号
value: 电平值 (0或1)
Returns:
bool: 成功返回True
"""
if pin not in self.pin_modes:
self.logger.error(f"引脚 {pin} 未配置")
return False
if self.pin_modes[pin] != self.MODE_OUTPUT:
self.logger.error(f"引脚 {pin} 不是输出模式")
return False
if self.simulated:
self.pin_states[pin] = value
self.logger.debug(f"写引脚 {pin} = {value}")
return True
try:
GPIO.output(pin, value)
self.logger.debug(f"写引脚 {pin} = {value}")
return True
except Exception as e:
self.logger.error(f"写引脚失败: {e}")
return False
def read_pin(self, pin: int) -> Optional[int]:
"""
读GPIO引脚
Args:
pin: 引脚编号
Returns:
int: 电平值 (0或1),失败返回None
"""
if pin not in self.pin_modes:
self.logger.error(f"引脚 {pin} 未配置")
return None
if self.simulated:
if self.pin_modes[pin] == self.MODE_OUTPUT:
return self.pin_states.get(pin, self.LOW)
else:
# 模拟输入,返回随机值或固定值
return self.LOW
try:
value = GPIO.input(pin)
self.logger.debug(f"读引脚 {pin} = {value}")
return value
except Exception as e:
self.logger.error(f"读引脚失败: {e}")
return None
def toggle_pin(self, pin: int) -> bool:
"""翻转GPIO引脚电平"""
current = self.read_pin(pin)
if current is None:
return False
new_value = self.HIGH if current == self.LOW else self.LOW
return self.write_pin(pin, new_value)
def pulse_pin(self, pin: int, duration: float = 0.1) -> bool:
"""
产生脉冲信号
Args:
pin: 引脚编号
duration: 脉冲持续时间(秒)
Returns:
bool: 成功返回True
"""
if not self.write_pin(pin, self.HIGH):
return False
time.sleep(duration)
return self.write_pin(pin, self.LOW)
步骤3: 测试用例管理¶
3.1 测试用例数据模型¶
测试用例类设计 (test_case.py):
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional
from enum import Enum
import json
class TestStatus(Enum):
"""测试状态枚举"""
NOT_RUN = "not_run"
RUNNING = "running"
PASSED = "passed"
FAILED = "failed"
ERROR = "error"
SKIPPED = "skipped"
class TestPriority(Enum):
"""测试优先级"""
LOW = 1
MEDIUM = 2
HIGH = 3
CRITICAL = 4
@dataclass
class TestStep:
"""测试步骤"""
step_id: int
description: str
action: str # 动作类型: 'send', 'receive', 'wait', 'check'
parameters: Dict[str, Any]
expected_result: Optional[Dict[str, Any]] = None
actual_result: Optional[Dict[str, Any]] = None
status: TestStatus = TestStatus.NOT_RUN
duration: float = 0.0
error_message: str = ""
@dataclass
class TestCase:
"""测试用例"""
test_id: str
name: str
description: str
category: str
priority: TestPriority
steps: List[TestStep] = field(default_factory=list)
# 测试配置
timeout: float = 30.0
retry_count: int = 0
setup_required: bool = True
teardown_required: bool = True
# 测试结果
status: TestStatus = TestStatus.NOT_RUN
start_time: Optional[float] = None
end_time: Optional[float] = None
duration: float = 0.0
error_message: str = ""
# 元数据
tags: List[str] = field(default_factory=list)
author: str = ""
created_date: str = ""
modified_date: str = ""
def to_dict(self) -> Dict[str, Any]:
"""转换为字典"""
return {
'test_id': self.test_id,
'name': self.name,
'description': self.description,
'category': self.category,
'priority': self.priority.name,
'steps': [
{
'step_id': step.step_id,
'description': step.description,
'action': step.action,
'parameters': step.parameters,
'expected_result': step.expected_result,
'actual_result': step.actual_result,
'status': step.status.value,
'duration': step.duration,
'error_message': step.error_message
}
for step in self.steps
],
'timeout': self.timeout,
'retry_count': self.retry_count,
'status': self.status.value,
'start_time': self.start_time,
'end_time': self.end_time,
'duration': self.duration,
'error_message': self.error_message,
'tags': self.tags,
'author': self.author
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'TestCase':
"""从字典创建"""
steps = [
TestStep(
step_id=s['step_id'],
description=s['description'],
action=s['action'],
parameters=s['parameters'],
expected_result=s.get('expected_result'),
actual_result=s.get('actual_result'),
status=TestStatus(s.get('status', 'not_run')),
duration=s.get('duration', 0.0),
error_message=s.get('error_message', '')
)
for s in data.get('steps', [])
]
return cls(
test_id=data['test_id'],
name=data['name'],
description=data['description'],
category=data['category'],
priority=TestPriority[data['priority']],
steps=steps,
timeout=data.get('timeout', 30.0),
retry_count=data.get('retry_count', 0),
status=TestStatus(data.get('status', 'not_run')),
start_time=data.get('start_time'),
end_time=data.get('end_time'),
duration=data.get('duration', 0.0),
error_message=data.get('error_message', ''),
tags=data.get('tags', []),
author=data.get('author', '')
)
def to_json(self) -> str:
"""转换为JSON字符串"""
return json.dumps(self.to_dict(), indent=2)
@classmethod
def from_json(cls, json_str: str) -> 'TestCase':
"""从JSON字符串创建"""
data = json.loads(json_str)
return cls.from_dict(data)
3.2 测试用例管理器¶
测试用例管理器 (test_manager.py):
import os
import json
import yaml
from typing import List, Dict, Optional
from pathlib import Path
import logging
from models.test_case import TestCase, TestStatus, TestPriority
class TestCaseManager:
"""测试用例管理器"""
def __init__(self, test_case_dir: str):
"""
初始化测试用例管理器
Args:
test_case_dir: 测试用例目录
"""
self.test_case_dir = Path(test_case_dir)
self.test_cases: Dict[str, TestCase] = {}
self.logger = logging.getLogger(__name__)
# 确保目录存在
self.test_case_dir.mkdir(parents=True, exist_ok=True)
def load_test_cases(self) -> int:
"""
加载所有测试用例
Returns:
int: 加载的测试用例数量
"""
count = 0
# 支持JSON和YAML格式
for ext in ['*.json', '*.yaml', '*.yml']:
for file_path in self.test_case_dir.glob(ext):
try:
test_case = self._load_test_case_file(file_path)
if test_case:
self.test_cases[test_case.test_id] = test_case
count += 1
self.logger.info(f"加载测试用例: {test_case.test_id}")
except Exception as e:
self.logger.error(f"加载测试用例失败 {file_path}: {e}")
self.logger.info(f"共加载 {count} 个测试用例")
return count
def _load_test_case_file(self, file_path: Path) -> Optional[TestCase]:
"""加载单个测试用例文件"""
with open(file_path, 'r', encoding='utf-8') as f:
if file_path.suffix == '.json':
data = json.load(f)
else: # YAML
data = yaml.safe_load(f)
return TestCase.from_dict(data)
def save_test_case(self, test_case: TestCase,
format: str = 'json') -> bool:
"""
保存测试用例
Args:
test_case: 测试用例对象
format: 保存格式 ('json' 或 'yaml')
Returns:
bool: 成功返回True
"""
try:
ext = '.json' if format == 'json' else '.yaml'
file_path = self.test_case_dir / f"{test_case.test_id}{ext}"
with open(file_path, 'w', encoding='utf-8') as f:
if format == 'json':
json.dump(test_case.to_dict(), f, indent=2)
else:
yaml.dump(test_case.to_dict(), f,
default_flow_style=False)
self.logger.info(f"保存测试用例: {test_case.test_id}")
return True
except Exception as e:
self.logger.error(f"保存测试用例失败: {e}")
return False
def get_test_case(self, test_id: str) -> Optional[TestCase]:
"""获取测试用例"""
return self.test_cases.get(test_id)
def get_all_test_cases(self) -> List[TestCase]:
"""获取所有测试用例"""
return list(self.test_cases.values())
def filter_test_cases(self,
category: Optional[str] = None,
priority: Optional[TestPriority] = None,
tags: Optional[List[str]] = None,
status: Optional[TestStatus] = None) -> List[TestCase]:
"""
过滤测试用例
Args:
category: 类别
priority: 优先级
tags: 标签列表
status: 状态
Returns:
List[TestCase]: 符合条件的测试用例列表
"""
filtered = self.get_all_test_cases()
if category:
filtered = [tc for tc in filtered if tc.category == category]
if priority:
filtered = [tc for tc in filtered if tc.priority == priority]
if tags:
filtered = [
tc for tc in filtered
if any(tag in tc.tags for tag in tags)
]
if status:
filtered = [tc for tc in filtered if tc.status == status]
return filtered
def get_statistics(self) -> Dict[str, Any]:
"""
获取测试用例统计信息
Returns:
Dict: 统计信息
"""
total = len(self.test_cases)
status_count = {}
for status in TestStatus:
count = len([
tc for tc in self.test_cases.values()
if tc.status == status
])
status_count[status.value] = count
priority_count = {}
for priority in TestPriority:
count = len([
tc for tc in self.test_cases.values()
if tc.priority == priority
])
priority_count[priority.name] = count
category_count = {}
for tc in self.test_cases.values():
category_count[tc.category] = category_count.get(tc.category, 0) + 1
return {
'total': total,
'by_status': status_count,
'by_priority': priority_count,
'by_category': category_count
}
3.3 测试用例示例¶
串口通信测试用例 (test_uart_communication.yaml):
test_id: "UART_001"
name: "串口基本通信测试"
description: "测试串口发送和接收功能"
category: "communication"
priority: "HIGH"
timeout: 10.0
retry_count: 1
tags:
- uart
- communication
- basic
author: "测试工程师"
created_date: "2024-01-15"
steps:
- step_id: 1
description: "打开串口"
action: "setup"
parameters:
port: "/dev/ttyUSB0"
baudrate: 115200
expected_result:
status: "success"
- step_id: 2
description: "发送测试命令"
action: "send"
parameters:
interface: "uart"
data: "AT\\r\\n"
encoding: "ascii"
expected_result:
sent_bytes: 4
- step_id: 3
description: "等待响应"
action: "wait"
parameters:
duration: 1.0
- step_id: 4
description: "接收响应"
action: "receive"
parameters:
interface: "uart"
timeout: 2.0
expected_result:
data_contains: "OK"
min_length: 2
- step_id: 5
description: "验证响应"
action: "check"
parameters:
check_type: "string_match"
pattern: "OK"
expected_result:
match: true
- step_id: 6
description: "关闭串口"
action: "teardown"
parameters:
interface: "uart"
expected_result:
status: "success"
CAN总线测试用例 (test_can_communication.yaml):
test_id: "CAN_001"
name: "CAN总线消息收发测试"
description: "测试CAN总线发送和接收功能"
category: "communication"
priority: "HIGH"
timeout: 15.0
tags:
- can
- communication
author: "测试工程师"
steps:
- step_id: 1
description: "初始化CAN总线"
action: "setup"
parameters:
channel: "can0"
bitrate: 500000
expected_result:
status: "success"
- step_id: 2
description: "发送CAN消息"
action: "send"
parameters:
interface: "can"
can_id: 0x123
data: [0x01, 0x02, 0x03, 0x04]
is_extended: false
expected_result:
status: "success"
- step_id: 3
description: "接收CAN响应"
action: "receive"
parameters:
interface: "can"
timeout: 3.0
filter_id: 0x456
expected_result:
can_id: 0x456
data_length: 8
- step_id: 4
description: "验证数据"
action: "check"
parameters:
check_type: "data_match"
expected_data: [0x05, 0x06, 0x07, 0x08, 0x00, 0x00, 0x00, 0x00]
expected_result:
match: true
步骤4: 测试执行引擎¶
4.1 测试执行器设计¶
测试执行器 (test_executor.py):
import time
from typing import Dict, Any, Optional
import logging
from models.test_case import TestCase, TestStep, TestStatus
from hardware.serial_interface import SerialInterface
from hardware.can_interface import CANInterface
from hardware.gpio_interface import GPIOInterface
class TestExecutor:
"""测试执行器"""
def __init__(self, config: Dict[str, Any]):
"""
初始化测试执行器
Args:
config: 配置字典
"""
self.config = config
self.logger = logging.getLogger(__name__)
# 硬件接口
self.serial_interface: Optional[SerialInterface] = None
self.can_interface: Optional[CANInterface] = None
self.gpio_interface: Optional[GPIOInterface] = None
# 测试上下文
self.context: Dict[str, Any] = {}
def execute_test_case(self, test_case: TestCase) -> bool:
"""
执行测试用例
Args:
test_case: 测试用例对象
Returns:
bool: 测试通过返回True
"""
self.logger.info(f"开始执行测试: {test_case.test_id} - {test_case.name}")
test_case.status = TestStatus.RUNNING
test_case.start_time = time.time()
try:
# 执行setup
if test_case.setup_required:
if not self._execute_setup(test_case):
test_case.status = TestStatus.ERROR
test_case.error_message = "Setup失败"
return False
# 执行测试步骤
for step in test_case.steps:
if not self._execute_step(step):
test_case.status = TestStatus.FAILED
test_case.error_message = f"步骤 {step.step_id} 失败: {step.error_message}"
return False
# 所有步骤通过
test_case.status = TestStatus.PASSED
self.logger.info(f"测试通过: {test_case.test_id}")
return True
except Exception as e:
test_case.status = TestStatus.ERROR
test_case.error_message = str(e)
self.logger.error(f"测试执行异常: {e}")
return False
finally:
# 执行teardown
if test_case.teardown_required:
self._execute_teardown(test_case)
test_case.end_time = time.time()
test_case.duration = test_case.end_time - test_case.start_time
self.logger.info(f"测试完成,耗时: {test_case.duration:.2f}秒")
def _execute_setup(self, test_case: TestCase) -> bool:
"""执行测试准备"""
self.logger.debug("执行Setup")
# 初始化硬件接口
if 'uart' in test_case.tags:
self.serial_interface = SerialInterface(
port=self.config.get('uart_port', '/dev/ttyUSB0'),
baudrate=self.config.get('uart_baudrate', 115200)
)
if not self.serial_interface.open():
return False
if 'can' in test_case.tags:
self.can_interface = CANInterface(
channel=self.config.get('can_channel', 'can0'),
bitrate=self.config.get('can_bitrate', 500000)
)
if not self.can_interface.open():
return False
if 'gpio' in test_case.tags:
self.gpio_interface = GPIOInterface()
if not self.gpio_interface.initialize():
return False
return True
def _execute_teardown(self, test_case: TestCase):
"""执行测试清理"""
self.logger.debug("执行Teardown")
if self.serial_interface:
self.serial_interface.close()
self.serial_interface = None
if self.can_interface:
self.can_interface.close()
self.can_interface = None
if self.gpio_interface:
self.gpio_interface.cleanup()
self.gpio_interface = None
def _execute_step(self, step: TestStep) -> bool:
"""
执行单个测试步骤
Args:
step: 测试步骤对象
Returns:
bool: 步骤通过返回True
"""
self.logger.info(f"执行步骤 {step.step_id}: {step.description}")
step.status = TestStatus.RUNNING
start_time = time.time()
try:
# 根据动作类型执行
if step.action == 'send':
result = self._action_send(step.parameters)
elif step.action == 'receive':
result = self._action_receive(step.parameters)
elif step.action == 'wait':
result = self._action_wait(step.parameters)
elif step.action == 'check':
result = self._action_check(step.parameters)
elif step.action == 'setup':
result = self._action_setup(step.parameters)
elif step.action == 'teardown':
result = self._action_teardown(step.parameters)
else:
self.logger.error(f"未知动作类型: {step.action}")
step.status = TestStatus.ERROR
step.error_message = f"未知动作: {step.action}"
return False
step.actual_result = result
# 验证结果
if step.expected_result:
if not self._verify_result(step.expected_result, result):
step.status = TestStatus.FAILED
step.error_message = "结果验证失败"
return False
step.status = TestStatus.PASSED
return True
except Exception as e:
step.status = TestStatus.ERROR
step.error_message = str(e)
self.logger.error(f"步骤执行异常: {e}")
return False
finally:
step.duration = time.time() - start_time
def _action_send(self, params: Dict[str, Any]) -> Dict[str, Any]:
"""发送动作"""
interface = params.get('interface')
if interface == 'uart':
data = params.get('data', '')
encoding = params.get('encoding', 'utf-8')
if isinstance(data, str):
data_bytes = data.encode(encoding)
else:
data_bytes = bytes(data)
sent = self.serial_interface.write(data_bytes)
return {'sent_bytes': sent, 'status': 'success'}
elif interface == 'can':
can_id = params.get('can_id')
data = params.get('data', [])
is_extended = params.get('is_extended', False)
success = self.can_interface.send(
can_id, bytes(data), is_extended
)
return {'status': 'success' if success else 'failed'}
else:
raise ValueError(f"不支持的接口: {interface}")
def _action_receive(self, params: Dict[str, Any]) -> Dict[str, Any]:
"""接收动作"""
interface = params.get('interface')
timeout = params.get('timeout', 1.0)
if interface == 'uart':
data = self.serial_interface.read_line(timeout=timeout)
return {
'data': data,
'length': len(data),
'status': 'success' if data else 'timeout'
}
elif interface == 'can':
msg = self.can_interface.receive(timeout=timeout)
if msg:
return {
'can_id': msg.arbitration_id,
'data': list(msg.data),
'data_length': len(msg.data),
'status': 'success'
}
else:
return {'status': 'timeout'}
else:
raise ValueError(f"不支持的接口: {interface}")
def _action_wait(self, params: Dict[str, Any]) -> Dict[str, Any]:
"""等待动作"""
duration = params.get('duration', 1.0)
time.sleep(duration)
return {'status': 'success'}
def _action_check(self, params: Dict[str, Any]) -> Dict[str, Any]:
"""检查动作"""
check_type = params.get('check_type')
if check_type == 'string_match':
pattern = params.get('pattern')
data = self.context.get('last_received_data', '')
match = pattern in data
return {'match': match, 'status': 'success'}
elif check_type == 'data_match':
expected = params.get('expected_data', [])
actual = self.context.get('last_received_data', [])
match = expected == actual
return {'match': match, 'status': 'success'}
else:
raise ValueError(f"不支持的检查类型: {check_type}")
def _action_setup(self, params: Dict[str, Any]) -> Dict[str, Any]:
"""Setup动作"""
# 在这里可以执行特定的setup操作
return {'status': 'success'}
def _action_teardown(self, params: Dict[str, Any]) -> Dict[str, Any]:
"""Teardown动作"""
# 在这里可以执行特定的teardown操作
return {'status': 'success'}
def _verify_result(self, expected: Dict[str, Any],
actual: Dict[str, Any]) -> bool:
"""
验证结果
Args:
expected: 期望结果
actual: 实际结果
Returns:
bool: 验证通过返回True
"""
for key, expected_value in expected.items():
if key not in actual:
self.logger.error(f"缺少字段: {key}")
return False
actual_value = actual[key]
# 特殊处理
if key == 'data_contains':
if expected_value not in actual.get('data', ''):
self.logger.error(f"数据不包含: {expected_value}")
return False
elif key == 'min_length':
if actual.get('length', 0) < expected_value:
self.logger.error(f"长度不足: {actual.get('length')} < {expected_value}")
return False
else:
if actual_value != expected_value:
self.logger.error(
f"值不匹配: {key} = {actual_value}, 期望 {expected_value}"
)
return False
return True
步骤5: 测试报告生成¶
5.1 报告生成器¶
HTML报告生成器 (report_generator.py):
import os
from datetime import datetime
from typing import List, Dict, Any
from pathlib import Path
import json
from models.test_case import TestCase, TestStatus
class ReportGenerator:
"""测试报告生成器"""
def __init__(self, output_dir: str):
"""
初始化报告生成器
Args:
output_dir: 报告输出目录
"""
self.output_dir = Path(output_dir)
self.output_dir.mkdir(parents=True, exist_ok=True)
def generate_html_report(self, test_cases: List[TestCase],
report_name: str = "test_report") -> str:
"""
生成HTML测试报告
Args:
test_cases: 测试用例列表
report_name: 报告名称
Returns:
str: 报告文件路径
"""
# 统计信息
stats = self._calculate_statistics(test_cases)
# 生成HTML
html_content = self._generate_html_content(test_cases, stats)
# 保存文件
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"{report_name}_{timestamp}.html"
filepath = self.output_dir / filename
with open(filepath, 'w', encoding='utf-8') as f:
f.write(html_content)
return str(filepath)
def _calculate_statistics(self, test_cases: List[TestCase]) -> Dict[str, Any]:
"""计算统计信息"""
total = len(test_cases)
passed = sum(1 for tc in test_cases if tc.status == TestStatus.PASSED)
failed = sum(1 for tc in test_cases if tc.status == TestStatus.FAILED)
error = sum(1 for tc in test_cases if tc.status == TestStatus.ERROR)
skipped = sum(1 for tc in test_cases if tc.status == TestStatus.SKIPPED)
pass_rate = (passed / total * 100) if total > 0 else 0
total_duration = sum(tc.duration for tc in test_cases)
return {
'total': total,
'passed': passed,
'failed': failed,
'error': error,
'skipped': skipped,
'pass_rate': pass_rate,
'total_duration': total_duration
}
def _generate_html_content(self, test_cases: List[TestCase],
stats: Dict[str, Any]) -> str:
"""生成HTML内容"""
html = f"""
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>HIL测试报告</title>
<style>
* {{
margin: 0;
padding: 0;
box-sizing: border-box;
}}
body {{
font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
background-color: #f5f5f5;
padding: 20px;
}}
.container {{
max-width: 1200px;
margin: 0 auto;
background-color: white;
padding: 30px;
border-radius: 8px;
box-shadow: 0 2px 4px rgba(0,0,0,0.1);
}}
h1 {{
color: #333;
margin-bottom: 10px;
}}
.timestamp {{
color: #666;
margin-bottom: 30px;
}}
.summary {{
display: grid;
grid-template-columns: repeat(auto-fit, minmax(150px, 1fr));
gap: 20px;
margin-bottom: 30px;
}}
.stat-card {{
padding: 20px;
border-radius: 6px;
text-align: center;
}}
.stat-card.total {{
background-color: #e3f2fd;
border-left: 4px solid #2196f3;
}}
.stat-card.passed {{
background-color: #e8f5e9;
border-left: 4px solid #4caf50;
}}
.stat-card.failed {{
background-color: #ffebee;
border-left: 4px solid #f44336;
}}
.stat-card.error {{
background-color: #fff3e0;
border-left: 4px solid #ff9800;
}}
.stat-number {{
font-size: 32px;
font-weight: bold;
margin-bottom: 5px;
}}
.stat-label {{
color: #666;
font-size: 14px;
}}
.pass-rate {{
font-size: 48px;
font-weight: bold;
color: #4caf50;
text-align: center;
margin: 30px 0;
}}
table {{
width: 100%;
border-collapse: collapse;
margin-top: 20px;
}}
th, td {{
padding: 12px;
text-align: left;
border-bottom: 1px solid #ddd;
}}
th {{
background-color: #f5f5f5;
font-weight: 600;
color: #333;
}}
tr:hover {{
background-color: #f9f9f9;
}}
.status {{
padding: 4px 12px;
border-radius: 12px;
font-size: 12px;
font-weight: 600;
display: inline-block;
}}
.status.passed {{
background-color: #4caf50;
color: white;
}}
.status.failed {{
background-color: #f44336;
color: white;
}}
.status.error {{
background-color: #ff9800;
color: white;
}}
.status.skipped {{
background-color: #9e9e9e;
color: white;
}}
.priority {{
padding: 4px 8px;
border-radius: 4px;
font-size: 11px;
font-weight: 600;
}}
.priority.CRITICAL {{
background-color: #f44336;
color: white;
}}
.priority.HIGH {{
background-color: #ff9800;
color: white;
}}
.priority.MEDIUM {{
background-color: #2196f3;
color: white;
}}
.priority.LOW {{
background-color: #9e9e9e;
color: white;
}}
.error-message {{
color: #f44336;
font-size: 12px;
margin-top: 4px;
}}
</style>
</head>
<body>
<div class="container">
<h1>HIL测试报告</h1>
<div class="timestamp">生成时间: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}</div>
<div class="summary">
<div class="stat-card total">
<div class="stat-number">{stats['total']}</div>
<div class="stat-label">总计</div>
</div>
<div class="stat-card passed">
<div class="stat-number">{stats['passed']}</div>
<div class="stat-label">通过</div>
</div>
<div class="stat-card failed">
<div class="stat-number">{stats['failed']}</div>
<div class="stat-label">失败</div>
</div>
<div class="stat-card error">
<div class="stat-number">{stats['error']}</div>
<div class="stat-label">错误</div>
</div>
</div>
<div class="pass-rate">
通过率: {stats['pass_rate']:.1f}%
</div>
<div style="text-align: center; color: #666; margin-bottom: 30px;">
总耗时: {stats['total_duration']:.2f} 秒
</div>
<h2>测试用例详情</h2>
<table>
<thead>
<tr>
<th>测试ID</th>
<th>名称</th>
<th>类别</th>
<th>优先级</th>
<th>状态</th>
<th>耗时(秒)</th>
</tr>
</thead>
<tbody>
"""
# 添加测试用例行
for tc in test_cases:
status_class = tc.status.value
priority_class = tc.priority.name
error_msg = ""
if tc.error_message:
error_msg = f'<div class="error-message">{tc.error_message}</div>'
html += f"""
<tr>
<td>{tc.test_id}</td>
<td>
{tc.name}
{error_msg}
</td>
<td>{tc.category}</td>
<td><span class="priority {priority_class}">{priority_class}</span></td>
<td><span class="status {status_class}">{status_class.upper()}</span></td>
<td>{tc.duration:.2f}</td>
</tr>
"""
html += """
</tbody>
</table>
</div>
</body>
</html>
"""
return html
def generate_json_report(self, test_cases: List[TestCase],
report_name: str = "test_report") -> str:
"""生成JSON格式报告"""
stats = self._calculate_statistics(test_cases)
report_data = {
'timestamp': datetime.now().isoformat(),
'statistics': stats,
'test_cases': [tc.to_dict() for tc in test_cases]
}
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"{report_name}_{timestamp}.json"
filepath = self.output_dir / filename
with open(filepath, 'w', encoding='utf-8') as f:
json.dump(report_data, f, indent=2, ensure_ascii=False)
return str(filepath)
步骤6: 主程序集成¶
6.1 主程序实现¶
主程序 (main.py):
import argparse
import logging
import sys
from pathlib import Path
from core.test_manager import TestCaseManager
from core.test_executor import TestExecutor
from utils.report_generator import ReportGenerator
from utils.logger import setup_logging
def main():
"""主函数"""
# 解析命令行参数
parser = argparse.ArgumentParser(description='HIL测试系统')
parser.add_argument('--test-dir', default='tests/test_cases',
help='测试用例目录')
parser.add_argument('--report-dir', default='reports',
help='报告输出目录')
parser.add_argument('--config', default='config/test_config.yaml',
help='配置文件路径')
parser.add_argument('--filter-category', help='按类别过滤测试')
parser.add_argument('--filter-priority', help='按优先级过滤测试')
parser.add_argument('--filter-tags', nargs='+', help='按标签过滤测试')
parser.add_argument('--verbose', action='store_true',
help='详细输出')
args = parser.parse_args()
# 设置日志
log_level = logging.DEBUG if args.verbose else logging.INFO
setup_logging(log_level)
logger = logging.getLogger(__name__)
logger.info("=" * 60)
logger.info("HIL测试系统启动")
logger.info("=" * 60)
try:
# 加载配置
config = load_config(args.config)
# 初始化测试管理器
test_manager = TestCaseManager(args.test_dir)
test_count = test_manager.load_test_cases()
if test_count == 0:
logger.error("未找到测试用例")
return 1
# 过滤测试用例
test_cases = test_manager.get_all_test_cases()
if args.filter_category:
test_cases = [tc for tc in test_cases
if tc.category == args.filter_category]
if args.filter_priority:
from models.test_case import TestPriority
priority = TestPriority[args.filter_priority.upper()]
test_cases = [tc for tc in test_cases
if tc.priority == priority]
if args.filter_tags:
test_cases = [tc for tc in test_cases
if any(tag in tc.tags for tag in args.filter_tags)]
logger.info(f"将执行 {len(test_cases)} 个测试用例")
# 初始化测试执行器
executor = TestExecutor(config)
# 执行测试
passed_count = 0
failed_count = 0
for i, test_case in enumerate(test_cases, 1):
logger.info(f"\n[{i}/{len(test_cases)}] 执行测试: {test_case.test_id}")
if executor.execute_test_case(test_case):
passed_count += 1
else:
failed_count += 1
# 生成报告
logger.info("\n生成测试报告...")
report_gen = ReportGenerator(args.report_dir)
html_report = report_gen.generate_html_report(test_cases)
logger.info(f"HTML报告: {html_report}")
json_report = report_gen.generate_json_report(test_cases)
logger.info(f"JSON报告: {json_report}")
# 输出统计
logger.info("\n" + "=" * 60)
logger.info("测试完成")
logger.info("=" * 60)
logger.info(f"总计: {len(test_cases)}")
logger.info(f"通过: {passed_count}")
logger.info(f"失败: {failed_count}")
logger.info(f"通过率: {passed_count/len(test_cases)*100:.1f}%")
# 返回状态码
return 0 if failed_count == 0 else 1
except Exception as e:
logger.error(f"程序异常: {e}", exc_info=True)
return 1
def load_config(config_path: str) -> dict:
"""加载配置文件"""
import yaml
with open(config_path, 'r', encoding='utf-8') as f:
config = yaml.safe_load(f)
return config
if __name__ == '__main__':
sys.exit(main())
6.2 配置文件¶
测试配置 (config/test_config.yaml):
# HIL测试系统配置
# 硬件配置
hardware:
# 串口配置
uart:
port: "/dev/ttyUSB0"
baudrate: 115200
timeout: 1.0
# CAN配置
can:
channel: "can0"
bustype: "socketcan"
bitrate: 500000
# GPIO配置
gpio:
use_bcm: true
# 测试配置
test:
# 默认超时(秒)
default_timeout: 30.0
# 失败重试次数
retry_count: 1
# 测试间隔(秒)
test_interval: 1.0
# 并行执行
parallel: false
max_workers: 4
# 日志配置
logging:
level: "INFO"
format: "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
file: "logs/hil_test.log"
max_bytes: 10485760 # 10MB
backup_count: 5
# 报告配置
report:
output_dir: "reports"
formats:
- html
- json
include_screenshots: false
步骤7: CI/CD集成¶
7.1 GitHub Actions集成¶
GitHub Actions工作流 (.github/workflows/hil-test.yml):
name: HIL Testing
on:
push:
branches: [ main, develop ]
pull_request:
branches: [ main ]
schedule:
# 每天凌晨2点运行
- cron: '0 2 * * *'
jobs:
hil-test:
runs-on: self-hosted # 使用自托管运行器(连接硬件)
steps:
- name: Checkout代码
uses: actions/checkout@v3
- name: 设置Python环境
uses: actions/setup-python@v4
with:
python-version: '3.9'
- name: 安装依赖
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
- name: 检查硬件连接
run: |
python scripts/check_hardware.py
- name: 运行HIL测试
run: |
python main.py --verbose
continue-on-error: false
- name: 上传测试报告
if: always()
uses: actions/upload-artifact@v3
with:
name: test-reports
path: reports/
- name: 发送通知
if: failure()
uses: 8398a7/action-slack@v3
with:
status: ${{ job.status }}
text: 'HIL测试失败'
webhook_url: ${{ secrets.SLACK_WEBHOOK }}
7.2 Jenkins集成¶
Jenkinsfile:
pipeline {
agent {
label 'hil-test-node' // 连接硬件的节点
}
parameters {
choice(
name: 'TEST_CATEGORY',
choices: ['all', 'communication', 'control', 'safety'],
description: '测试类别'
)
choice(
name: 'TEST_PRIORITY',
choices: ['all', 'CRITICAL', 'HIGH', 'MEDIUM', 'LOW'],
description: '测试优先级'
)
}
environment {
PYTHON_ENV = 'venv'
}
stages {
stage('准备环境') {
steps {
sh '''
python3 -m venv ${PYTHON_ENV}
. ${PYTHON_ENV}/bin/activate
pip install -r requirements.txt
'''
}
}
stage('检查硬件') {
steps {
sh '''
. ${PYTHON_ENV}/bin/activate
python scripts/check_hardware.py
'''
}
}
stage('运行测试') {
steps {
script {
def filterArgs = ""
if (params.TEST_CATEGORY != 'all') {
filterArgs += "--filter-category ${params.TEST_CATEGORY} "
}
if (params.TEST_PRIORITY != 'all') {
filterArgs += "--filter-priority ${params.TEST_PRIORITY} "
}
sh """
. ${PYTHON_ENV}/bin/activate
python main.py ${filterArgs} --verbose
"""
}
}
}
stage('生成报告') {
steps {
publishHTML([
reportDir: 'reports',
reportFiles: '*.html',
reportName: 'HIL Test Report'
])
}
}
}
post {
always {
archiveArtifacts artifacts: 'reports/**/*', allowEmptyArchive: true
archiveArtifacts artifacts: 'logs/**/*', allowEmptyArchive: true
}
failure {
emailext (
subject: "HIL测试失败: ${env.JOB_NAME} - ${env.BUILD_NUMBER}",
body: """
<p>HIL测试执行失败</p>
<p>项目: ${env.JOB_NAME}</p>
<p>构建号: ${env.BUILD_NUMBER}</p>
<p>查看详情: ${env.BUILD_URL}</p>
""",
to: '${DEFAULT_RECIPIENTS}',
mimeType: 'text/html'
)
}
success {
echo '测试全部通过!'
}
}
}
7.3 硬件检查脚本¶
硬件连接检查 (scripts/check_hardware.py):
#!/usr/bin/env python3
"""检查硬件连接状态"""
import sys
import serial
import serial.tools.list_ports
def check_serial_ports():
"""检查串口"""
print("检查串口连接...")
ports = list(serial.tools.list_ports.comports())
if not ports:
print("❌ 未找到串口设备")
return False
print(f"✓ 找到 {len(ports)} 个串口设备:")
for port in ports:
print(f" - {port.device}: {port.description}")
return True
def check_can_interface():
"""检查CAN接口"""
print("\n检查CAN接口...")
try:
import can
# 尝试打开CAN接口
bus = can.Bus(channel='can0', bustype='socketcan', bitrate=500000)
bus.shutdown()
print("✓ CAN接口正常")
return True
except Exception as e:
print(f"❌ CAN接口异常: {e}")
return False
def check_gpio():
"""检查GPIO"""
print("\n检查GPIO...")
try:
import RPi.GPIO as GPIO
GPIO.setmode(GPIO.BCM)
GPIO.setwarnings(False)
GPIO.cleanup()
print("✓ GPIO正常")
return True
except Exception as e:
print(f"⚠ GPIO不可用: {e}")
return True # GPIO不是必需的
def main():
"""主函数"""
print("=" * 50)
print("HIL测试系统硬件检查")
print("=" * 50)
results = []
results.append(check_serial_ports())
results.append(check_can_interface())
results.append(check_gpio())
print("\n" + "=" * 50)
if all(results):
print("✓ 所有硬件检查通过")
return 0
else:
print("❌ 部分硬件检查失败")
return 1
if __name__ == '__main__':
sys.exit(main())
故障排除¶
问题1: 串口无法打开¶
现象:
原因: - 设备未连接 - 权限不足 - 端口被占用 - 驱动未安装
解决方法:
# 1. 检查设备是否存在
ls -l /dev/ttyUSB*
# 2. 添加用户到dialout组
sudo usermod -a -G dialout $USER
# 注销后重新登录
# 3. 检查端口占用
lsof /dev/ttyUSB0
# 4. 临时修改权限
sudo chmod 666 /dev/ttyUSB0
问题2: CAN接口初始化失败¶
现象:
原因: - CAN接口未启动 - 驱动未加载 - 波特率不匹配
解决方法:
# 1. 加载CAN驱动
sudo modprobe can
sudo modprobe can_raw
sudo modprobe vcan
# 2. 配置CAN接口
sudo ip link set can0 type can bitrate 500000
sudo ip link set can0 up
# 3. 检查状态
ip -details link show can0
# 4. 测试CAN通信
cansend can0 123#DEADBEEF
candump can0
问题3: 测试超时¶
现象: 测试用例执行超时,无响应
原因: - 设备未响应 - 超时设置过短 - 通信协议不匹配
解决方法:
-
增加超时时间
-
添加调试日志
-
检查设备状态
问题4: 测试结果不稳定¶
现象: 同一测试用例有时通过,有时失败
原因: - 时序问题 - 资源竞争 - 环境干扰
解决方法:
-
添加延迟
-
添加重试机制
-
隔离测试环境
最佳实践¶
测试设计原则¶
- 独立性
- 每个测试用例独立运行
- 不依赖其他测试的结果
-
使用setUp和tearDown确保环境一致
-
可重复性
- 测试结果应该可重复
- 避免依赖随机数或时间
-
使用固定的测试数据
-
清晰性
- 测试用例命名清晰
- 测试步骤描述详细
-
期望结果明确
-
完整性
- 覆盖正常场景
- 覆盖边界条件
- 覆盖异常情况
系统维护¶
- 定期校准
- 定期校准测试设备
- 验证测试基准
-
更新测试数据
-
版本管理
- 测试用例版本化
- 配置文件版本化
-
测试结果归档
-
文档更新
- 及时更新测试文档
- 记录已知问题
-
维护FAQ
-
性能优化
- 优化测试执行时间
- 并行执行测试
- 缓存测试数据
安全考虑¶
- 硬件保护
- 添加过流保护
- 添加过压保护
-
使用隔离电路
-
软件保护
- 输入验证
- 异常处理
-
超时保护
-
数据安全
- 敏感数据加密
- 访问权限控制
- 审计日志
项目扩展¶
扩展方向¶
- 支持更多协议
- Modbus
- Ethernet/IP
- OPC UA
-
MQTT
-
增强数据分析
- 实时波形显示
- 统计分析
- 趋势预测
-
异常检测
-
Web界面
- 测试用例管理
- 实时监控
- 报告查看
-
远程控制
-
分布式测试
- 多节点测试
- 负载均衡
- 结果聚合
Web界面示例¶
Flask Web服务 (web_server.py):
from flask import Flask, render_template, jsonify, request
from flask_socketio import SocketIO, emit
import threading
app = Flask(__name__)
socketio = SocketIO(app)
# 全局测试状态
test_status = {
'running': False,
'current_test': None,
'progress': 0
}
@app.route('/')
def index():
"""主页"""
return render_template('index.html')
@app.route('/api/tests')
def get_tests():
"""获取测试列表"""
test_manager = TestCaseManager('tests/test_cases')
test_manager.load_test_cases()
tests = [tc.to_dict() for tc in test_manager.get_all_test_cases()]
return jsonify(tests)
@app.route('/api/run', methods=['POST'])
def run_tests():
"""运行测试"""
test_ids = request.json.get('test_ids', [])
# 在后台线程运行测试
thread = threading.Thread(
target=run_tests_background,
args=(test_ids,)
)
thread.start()
return jsonify({'status': 'started'})
def run_tests_background(test_ids):
"""后台运行测试"""
global test_status
test_status['running'] = True
for i, test_id in enumerate(test_ids):
test_status['current_test'] = test_id
test_status['progress'] = (i + 1) / len(test_ids) * 100
# 发送进度更新
socketio.emit('test_progress', test_status)
# 执行测试...
time.sleep(2) # 模拟测试执行
test_status['running'] = False
socketio.emit('test_complete', {'status': 'success'})
if __name__ == '__main__':
socketio.run(app, host='0.0.0.0', port=5000)
项目总结¶
项目成果¶
通过本项目,你已经构建了一个完整的HIL测试系统,包括:
✅ 硬件接口层 - 串口通信接口 - CAN总线接口 - GPIO控制接口 - 可扩展的接口架构
✅ 测试管理层 - 测试用例数据模型 - 测试用例管理器 - 灵活的过滤和查询
✅ 测试执行层 - 自动化测试执行 - 多种测试动作支持 - 结果验证机制
✅ 报告生成 - HTML格式报告 - JSON格式报告 - 详细的统计信息
✅ CI/CD集成 - GitHub Actions支持 - Jenkins支持 - 自动化测试流程
关键技术点¶
- 模块化设计
- 清晰的层次结构
- 松耦合的模块
-
易于扩展和维护
-
硬件抽象
- 统一的接口定义
- 支持多种通信协议
-
模拟模式支持
-
测试自动化
- 声明式测试用例
- 自动化执行
-
结果自动验证
-
可扩展性
- 插件式架构
- 配置驱动
- 支持自定义扩展
实际应用建议¶
- 从简单开始
- 先实现基本功能
- 逐步添加复杂特性
-
持续优化改进
-
重视文档
- 编写详细的使用文档
- 记录设计决策
-
维护测试用例文档
-
持续改进
- 收集用户反馈
- 优化测试效率
-
增强系统稳定性
-
团队协作
- 建立测试规范
- 代码审查
- 知识分享
性能指标¶
典型性能: - 测试用例加载: < 1秒 - 单个测试执行: 5-30秒 - 报告生成: < 2秒 - 支持测试数量: 1000+
可靠性: - 测试成功率: > 99% - 系统稳定性: 24/7运行 - 错误恢复: 自动重试
成本分析¶
硬件成本: ¥5,000 - ¥20,000 - 基础配置: ¥5,000 - 标准配置: ¥10,000 - 高级配置: ¥20,000
开发成本: 约40-80人天 - 基础功能: 20人天 - 完整系统: 40人天 - 高级特性: 80人天
维护成本: 约5-10人天/月 - 日常维护: 5人天/月 - 功能扩展: 10人天/月
投资回报: - 减少人工测试时间: 70-90% - 提高测试覆盖率: 50-100% - 缩短开发周期: 20-40% - 提高产品质量: 显著提升
下一步学习¶
相关主题¶
建议继续学习以下内容:
- 测试理论
- 单元测试框架搭建
- 自动化测试集成
-
硬件接口
- 串口调试技巧大全
- CAN协议实战
-
系统集成
- Docker化开发环境
- 微服务架构
进阶方向¶
- 实时系统测试
- RTOS测试方法
- 实时性能分析
-
时序验证
-
安全测试
- 渗透测试
- 模糊测试
-
安全审计
-
性能测试
- 压力测试
- 负载测试
-
基准测试
-
AI辅助测试
- 智能测试用例生成
- 异常检测
- 预测性维护
参考资料¶
书籍推荐¶
- "Hardware-in-the-Loop Simulation" - Hanselmann
- HIL测试理论和实践
-
系统建模方法
-
"Embedded Software Testing" - Broekman & Notenboom
- 嵌入式测试策略
-
测试用例设计
-
"Continuous Delivery" - Humble & Farley
- 持续交付实践
- 自动化测试
在线资源¶
- 官方文档
- PySerial文档
- Python-CAN文档
-
开源项目
- Robot Framework
-
技术博客
- Embedded Artistry
- Interrupt Blog
工具和平台¶
- 商业工具
- dSPACE HIL系统
- NI LabVIEW
-
Vector CANoe
-
开源工具
- pytest
- Robot Framework
-
Jenkins
-
硬件平台
- 树莓派
- BeagleBone
- Arduino
练习项目¶
基础练习¶
- 串口回环测试
- 实现串口发送和接收
- 验证数据完整性
-
测试不同波特率
-
CAN消息测试
- 发送CAN消息
- 接收并解析消息
-
实现过滤功能
-
GPIO控制测试
- 控制LED闪烁
- 读取按键状态
- 实现PWM输出
进阶练习¶
- 协议测试
- 实现Modbus协议测试
- 测试协议时序
-
验证错误处理
-
性能测试
- 测试通信吞吐量
- 测试响应时间
-
分析性能瓶颈
-
压力测试
- 长时间运行测试
- 高频率消息测试
- 资源占用监控
综合项目¶
智能家居设备HIL测试系统
要求: - 支持WiFi、蓝牙、Zigbee通信 - 测试设备联动功能 - 模拟各种传感器输入 - 生成详细测试报告 - 集成到CI/CD流程
恭喜你完成了HIL测试系统项目!
这个系统将帮助你: - 提高测试效率 - 保证产品质量 - 加速开发进度 - 降低测试成本
继续探索和优化你的测试系统,祝你在嵌入式测试领域取得成功!