跳转至

测试自动化

学习目标

通过本文档的学习,你将能够:

  • 理解核心概念和原理
  • 掌握实际应用方法
  • 了解最佳实践和注意事项

前置知识

在学习本文档之前,建议你已经掌握:

  • 基础的嵌入式系统知识
  • C/C++编程基础
  • 相关领域的基本概念

概述

测试自动化是使用软件工具自动执行测试用例、比较实际结果与预期结果、生成测试报告的过程。对于医疗设备软件,自动化测试可以提高测试效率、确保测试一致性、支持持续集成和快速迭代。

为什么需要测试自动化?

医疗设备软件的挑战

  • 频繁的回归测试: 每次代码变更都需要验证现有功能
  • 复杂的测试场景: 需要测试大量的输入组合和边界条件
  • 监管要求: 需要可追溯的测试记录和文档
  • 质量保证: 确保每个版本都经过充分测试

自动化的优势

方面 手动测试 自动化测试
执行速度
可重复性
人为错误
成本(长期)
覆盖率 有限 广泛
CI/CD集成 困难 容易

测试自动化策略

测试金字塔

        /\
       /  \  UI测试 (10%)
      /____\  手动/自动化
     /      \
    / API测试 \ (30%)
   /___________\  自动化
  /             \
 /   单元测试    \ (60%)
/_________________\  自动化

自动化测试框架选择

Python测试框架

pytest - 推荐用于医疗设备软件

# test_ecg_processor.py
import pytest
from medical_device.ecg import ECGProcessor

class TestECGProcessor:
    """ECG处理器测试"""

    @pytest.fixture
    def processor(self):
        """测试夹具 - 创建处理器实例"""
        return ECGProcessor(sample_rate=500)

    @pytest.fixture
    def sample_ecg_data(self):
        """测试夹具 - 生成样本ECG数据"""
        import numpy as np
        # 生成60秒的模拟ECG数据
        t = np.linspace(0, 60, 30000)
        ecg = np.sin(2 * np.pi * 1.2 * t)  # 72 bpm
        return ecg

    def test_heart_rate_calculation(self, processor, sample_ecg_data):
        """测试心率计算"""
        heart_rate = processor.calculate_heart_rate(sample_ecg_data)
        assert 70 <= heart_rate <= 75, f"心率应在70-75之间,实际: {heart_rate}"

    def test_r_peak_detection(self, processor, sample_ecg_data):
        """测试R波检测"""
        r_peaks = processor.detect_r_peaks(sample_ecg_data)
        assert len(r_peaks) > 0, "应该检测到R波"
        assert len(r_peaks) >= 70, "60秒应检测到至少70个R波"

    @pytest.mark.parametrize("noise_level", [0.1, 0.2, 0.5])
    def test_noise_tolerance(self, processor, sample_ecg_data, noise_level):
        """测试噪声容忍度"""
        import numpy as np
        noisy_data = sample_ecg_data + np.random.normal(0, noise_level, len(sample_ecg_data))
        heart_rate = processor.calculate_heart_rate(noisy_data)
        assert 60 <= heart_rate <= 85, f"噪声水平{noise_level}下心率计算失败"

    def test_invalid_input(self, processor):
        """测试无效输入处理"""
        with pytest.raises(ValueError):
            processor.calculate_heart_rate([])  # 空数据

        with pytest.raises(ValueError):
            processor.calculate_heart_rate(None)  # None

    @pytest.mark.slow
    def test_long_duration_processing(self, processor):
        """测试长时间数据处理"""
        import numpy as np
        # 24小时的数据
        long_data = np.random.randn(43200000)
        result = processor.process_continuous(long_data)
        assert result is not None

# pytest配置文件
# pytest.ini
[pytest]
markers =
    slow: marks tests as slow (deselect with '-m "not slow"')
    integration: marks tests as integration tests
    unit: marks tests as unit tests
    critical: marks tests as critical for patient safety

testpaths = tests
python_files = test_*.py
python_classes = Test*
python_functions = test_*

# 覆盖率配置
addopts = 
    --cov=medical_device
    --cov-report=html
    --cov-report=term
    --cov-fail-under=80
    --strict-markers
    -v

Java测试框架

JUnit 5 + Mockito

// ECGProcessorTest.java
import org.junit.jupiter.api.*;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.Mockito.*;

@DisplayName("ECG处理器测试")
class ECGProcessorTest {

    private ECGProcessor processor;

    @Mock
    private DataLogger logger;

    @BeforeEach
    void setUp() {
        MockitoAnnotations.openMocks(this);
        processor = new ECGProcessor(500, logger);
    }

    @Test
    @DisplayName("应该正确计算心率")
    @Tag("unit")
    void shouldCalculateHeartRateCorrectly() {
        // Arrange
        double[] ecgData = generateSampleECG(60, 72);

        // Act
        int heartRate = processor.calculateHeartRate(ecgData);

        // Assert
        assertTrue(heartRate >= 70 && heartRate <= 75,
            "心率应在70-75之间,实际: " + heartRate);
        verify(logger).log(contains("Heart rate calculated"));
    }

    @ParameterizedTest
    @ValueSource(ints = {60, 80, 100, 120})
    @DisplayName("应该处理不同心率")
    void shouldHandleDifferentHeartRates(int expectedHR) {
        double[] ecgData = generateSampleECG(60, expectedHR);
        int actualHR = processor.calculateHeartRate(ecgData);
        assertEquals(expectedHR, actualHR, 5);
    }

    @Test
    @DisplayName("应该抛出异常当输入为空")
    void shouldThrowExceptionForEmptyInput() {
        assertThrows(IllegalArgumentException.class, () -> {
            processor.calculateHeartRate(new double[0]);
        });
    }

    @RepeatedTest(10)
    @DisplayName("应该产生一致的结果")
    void shouldProduceConsistentResults() {
        double[] ecgData = generateSampleECG(60, 72);
        int firstResult = processor.calculateHeartRate(ecgData);
        int secondResult = processor.calculateHeartRate(ecgData);
        assertEquals(firstResult, secondResult);
    }

    @Nested
    @DisplayName("R波检测测试")
    class RPeakDetectionTests {

        @Test
        @DisplayName("应该检测到所有R波")
        void shouldDetectAllRPeaks() {
            double[] ecgData = generateSampleECG(60, 72);
            List<Integer> rPeaks = processor.detectRPeaks(ecgData);
            assertTrue(rPeaks.size() >= 70);
        }

        @Test
        @DisplayName("R波间隔应该合理")
        void rPeakIntervalsShouldBeReasonable() {
            double[] ecgData = generateSampleECG(60, 72);
            List<Integer> rPeaks = processor.detectRPeaks(ecgData);

            for (int i = 1; i < rPeaks.size(); i++) {
                int interval = rPeaks.get(i) - rPeaks.get(i-1);
                assertTrue(interval > 200 && interval < 600,
                    "R-R间隔应该合理");
            }
        }
    }

    private double[] generateSampleECG(int durationSeconds, int heartRate) {
        // 生成模拟ECG数据
        int sampleRate = 500;
        int numSamples = durationSeconds * sampleRate;
        double[] data = new double[numSamples];

        double frequency = heartRate / 60.0;
        for (int i = 0; i < numSamples; i++) {
            double t = (double) i / sampleRate;
            data[i] = Math.sin(2 * Math.PI * frequency * t);
        }

        return data;
    }
}

CI/CD集成

GitHub Actions工作流

# .github/workflows/medical-device-ci.yml
name: Medical Device CI/CD

on:
  push:
    branches: [ main, develop ]
  pull_request:
    branches: [ main ]

env:
  PYTHON_VERSION: '3.9'

jobs:
  # 单元测试
  unit-tests:
    runs-on: ubuntu-latest

    steps:
    - uses: actions/checkout@v3

    - name: 设置Python
      uses: actions/setup-python@v4
      with:
        python-version: ${{ env.PYTHON_VERSION }}

    - name: 安装依赖
      run: |
        python -m pip install --upgrade pip
        pip install -r requirements.txt
        pip install -r requirements-dev.txt

    - name: 运行单元测试
      run: |
        pytest tests/unit \
          --cov=medical_device \
          --cov-report=xml \
          --cov-report=html \
          --junitxml=junit/test-results.xml

    - name: 上传覆盖率报告
      uses: codecov/codecov-action@v3
      with:
        files: ./coverage.xml
        flags: unittests
        name: codecov-umbrella

    - name: 发布测试结果
      uses: EnricoMi/publish-unit-test-result-action@v2
      if: always()
      with:
        files: junit/test-results.xml

  # 集成测试
  integration-tests:
    runs-on: ubuntu-latest
    needs: unit-tests

    services:
      postgres:
        image: postgres:13
        env:
          POSTGRES_PASSWORD: postgres
          POSTGRES_DB: medical_device_test
        options: >-
          --health-cmd pg_isready
          --health-interval 10s
          --health-timeout 5s
          --health-retries 5
        ports:
          - 5432:5432

      redis:
        image: redis:6
        options: >-
          --health-cmd "redis-cli ping"
          --health-interval 10s
          --health-timeout 5s
          --health-retries 5
        ports:
          - 6379:6379

    steps:
    - uses: actions/checkout@v3

    - name: 设置Python
      uses: actions/setup-python@v4
      with:
        python-version: ${{ env.PYTHON_VERSION }}

    - name: 安装依赖
      run: |
        pip install -r requirements.txt
        pip install -r requirements-dev.txt

    - name: 运行集成测试
      env:
        DATABASE_URL: postgresql://postgres:postgres@localhost:5432/medical_device_test
        REDIS_URL: redis://localhost:6379
      run: |
        pytest tests/integration -v --tb=short

  # 性能测试
  performance-tests:
    runs-on: ubuntu-latest
    needs: integration-tests

    steps:
    - uses: actions/checkout@v3

    - name: 设置Python
      uses: actions/setup-python@v4
      with:
        python-version: ${{ env.PYTHON_VERSION }}

    - name: 安装依赖
      run: |
        pip install -r requirements.txt
        pip install locust

    - name: 运行性能测试
      run: |
        locust -f tests/performance/locustfile.py \
          --headless \
          --users 100 \
          --spawn-rate 10 \
          --run-time 5m \
          --host https://staging.medical-device.com \
          --html performance-report.html

    - name: 上传性能报告
      uses: actions/upload-artifact@v3
      with:
        name: performance-report
        path: performance-report.html

  # 安全扫描
  security-scan:
    runs-on: ubuntu-latest

    steps:
    - uses: actions/checkout@v3

    - name: 运行Bandit安全扫描
      run: |
        pip install bandit
        bandit -r medical_device/ -f json -o bandit-report.json

    - name: 运行Safety依赖检查
      run: |
        pip install safety
        safety check --json > safety-report.json

    - name: 上传安全报告
      uses: actions/upload-artifact@v3
      with:
        name: security-reports
        path: |
          bandit-report.json
          safety-report.json

  # 构建Docker镜像
  build-docker:
    runs-on: ubuntu-latest
    needs: [unit-tests, integration-tests, security-scan]

    steps:
    - uses: actions/checkout@v3

    - name: 设置Docker Buildx
      uses: docker/setup-buildx-action@v2

    - name: 登录Docker Hub
      uses: docker/login-action@v2
      with:
        username: ${{ secrets.DOCKER_USERNAME }}
        password: ${{ secrets.DOCKER_PASSWORD }}

    - name: 构建并推送
      uses: docker/build-push-action@v4
      with:
        context: .
        push: true
        tags: |
          medical-device:${{ github.sha }}
          medical-device:latest
        cache-from: type=gha
        cache-to: type=gha,mode=max

    - name: 扫描Docker镜像
      run: |
        docker run --rm \
          -v /var/run/docker.sock:/var/run/docker.sock \
          aquasec/trivy image medical-device:${{ github.sha }}

GitLab CI/CD配置

# .gitlab-ci.yml
stages:
  - test
  - security
  - build
  - deploy

variables:
  PYTHON_VERSION: "3.9"
  PIP_CACHE_DIR: "$CI_PROJECT_DIR/.cache/pip"

cache:
  paths:
    - .cache/pip
    - venv/

before_script:
  - python -V
  - pip install virtualenv
  - virtualenv venv
  - source venv/bin/activate
  - pip install -r requirements.txt

# 单元测试
unit-test:
  stage: test
  script:
    - pip install pytest pytest-cov
    - pytest tests/unit --cov=medical_device --cov-report=term --cov-report=xml
  coverage: '/TOTAL.*\s+(\d+%)$/'
  artifacts:
    reports:
      coverage_report:
        coverage_format: cobertura
        path: coverage.xml
    paths:
      - htmlcov/
    expire_in: 1 week

# 集成测试
integration-test:
  stage: test
  services:
    - postgres:13
    - redis:6
  variables:
    POSTGRES_DB: medical_device_test
    POSTGRES_USER: postgres
    POSTGRES_PASSWORD: postgres
    DATABASE_URL: postgresql://postgres:postgres@postgres:5432/medical_device_test
  script:
    - pytest tests/integration -v
  only:
    - merge_requests
    - main

# 代码质量检查
code-quality:
  stage: test
  script:
    - pip install pylint flake8 mypy
    - pylint medical_device/
    - flake8 medical_device/
    - mypy medical_device/
  allow_failure: true

# 安全扫描
security-sast:
  stage: security
  script:
    - pip install bandit
    - bandit -r medical_device/ -f json -o bandit-report.json
  artifacts:
    reports:
      sast: bandit-report.json

# 依赖扫描
dependency-scanning:
  stage: security
  script:
    - pip install safety
    - safety check --json > safety-report.json
  artifacts:
    paths:
      - safety-report.json

# 构建
build:
  stage: build
  script:
    - python setup.py bdist_wheel
  artifacts:
    paths:
      - dist/
    expire_in: 1 month
  only:
    - tags
    - main

# 部署到测试环境
deploy-staging:
  stage: deploy
  script:
    - echo "Deploying to staging environment"
    - pip install dist/*.whl
    - python scripts/deploy_staging.py
  environment:
    name: staging
    url: https://staging.medical-device.com
  only:
    - develop

# 部署到生产环境
deploy-production:
  stage: deploy
  script:
    - echo "Deploying to production environment"
    - pip install dist/*.whl
    - python scripts/deploy_production.py
  environment:
    name: production
    url: https://medical-device.com
  when: manual
  only:
    - tags

测试数据管理

测试数据生成

# test_data_factory.py
from faker import Faker
import random
from datetime import datetime, timedelta
from dataclasses import dataclass
from typing import List

fake = Faker('zh_CN')

@dataclass
class PatientData:
    """患者数据"""
    patient_id: str
    name: str
    gender: str
    birth_date: str
    phone: str
    address: str

@dataclass
class VitalSigns:
    """生命体征数据"""
    patient_id: str
    timestamp: datetime
    heart_rate: int
    blood_pressure_systolic: int
    blood_pressure_diastolic: int
    temperature: float
    spo2: int

class TestDataFactory:
    """测试数据工厂"""

    @staticmethod
    def create_patient(patient_id=None) -> PatientData:
        """创建患者数据"""
        return PatientData(
            patient_id=patient_id or f"P{random.randint(100000, 999999)}",
            name=fake.name(),
            gender=random.choice(['M', 'F']),
            birth_date=fake.date_of_birth(minimum_age=18, maximum_age=90).isoformat(),
            phone=fake.phone_number(),
            address=fake.address()
        )

    @staticmethod
    def create_vital_signs(patient_id: str, timestamp=None) -> VitalSigns:
        """创建生命体征数据"""
        return VitalSigns(
            patient_id=patient_id,
            timestamp=timestamp or datetime.now(),
            heart_rate=random.randint(60, 100),
            blood_pressure_systolic=random.randint(110, 140),
            blood_pressure_diastolic=random.randint(70, 90),
            temperature=round(random.uniform(36.0, 37.5), 1),
            spo2=random.randint(95, 100)
        )

    @staticmethod
    def create_ecg_data(duration_seconds=60, sample_rate=500, heart_rate=72):
        """创建ECG数据"""
        import numpy as np
        num_samples = duration_seconds * sample_rate
        t = np.linspace(0, duration_seconds, num_samples)
        frequency = heart_rate / 60.0

        # 生成基础正弦波
        ecg = np.sin(2 * np.pi * frequency * t)

        # 添加噪声
        noise = np.random.normal(0, 0.1, num_samples)
        ecg += noise

        return ecg

    @staticmethod
    def create_batch_patients(count=100) -> List[PatientData]:
        """批量创建患者数据"""
        return [TestDataFactory.create_patient() for _ in range(count)]

    @staticmethod
    def create_patient_history(patient_id: str, days=30) -> List[VitalSigns]:
        """创建患者历史数据"""
        history = []
        start_date = datetime.now() - timedelta(days=days)

        for day in range(days):
            for hour in range(0, 24, 4):  # 每4小时一次测量
                timestamp = start_date + timedelta(days=day, hours=hour)
                history.append(
                    TestDataFactory.create_vital_signs(patient_id, timestamp)
                )

        return history

# 使用示例
def test_patient_data_processing():
    """测试患者数据处理"""
    # 创建测试数据
    patient = TestDataFactory.create_patient()
    vitals = TestDataFactory.create_vital_signs(patient.patient_id)

    # 执行测试
    processor = PatientDataProcessor()
    result = processor.process(patient, vitals)

    assert result is not None
    assert result.patient_id == patient.patient_id

测试数据库管理

# test_database.py
import pytest
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from medical_device.models import Base, Patient, VitalSigns

@pytest.fixture(scope='session')
def test_engine():
    """创建测试数据库引擎"""
    engine = create_engine('sqlite:///:memory:')
    Base.metadata.create_all(engine)
    return engine

@pytest.fixture(scope='function')
def db_session(test_engine):
    """创建数据库会话"""
    Session = sessionmaker(bind=test_engine)
    session = Session()

    yield session

    # 清理
    session.rollback()
    session.close()

@pytest.fixture
def sample_patient(db_session):
    """创建样本患者"""
    patient = Patient(
        patient_id='P123456',
        name='张三',
        gender='M',
        birth_date='1980-01-01'
    )
    db_session.add(patient)
    db_session.commit()
    return patient

def test_patient_crud(db_session, sample_patient):
    """测试患者CRUD操作"""
    # Read
    patient = db_session.query(Patient).filter_by(
        patient_id='P123456'
    ).first()
    assert patient is not None
    assert patient.name == '张三'

    # Update
    patient.name = '李四'
    db_session.commit()

    updated_patient = db_session.query(Patient).filter_by(
        patient_id='P123456'
    ).first()
    assert updated_patient.name == '李四'

    # Delete
    db_session.delete(patient)
    db_session.commit()

    deleted_patient = db_session.query(Patient).filter_by(
        patient_id='P123456'
    ).first()
    assert deleted_patient is None

回归测试自动化

回归测试套件

# regression_test_suite.py
import pytest
from medical_device import ECGProcessor, PatientDataManager
from test_data_factory import TestDataFactory

class RegressionTestSuite:
    """回归测试套件"""

    @pytest.mark.regression
    class TestECGProcessing:
        """ECG处理回归测试"""

        def test_heart_rate_calculation_accuracy(self):
            """测试心率计算准确性 - 回归测试"""
            processor = ECGProcessor(sample_rate=500)

            # 测试已知的心率值
            test_cases = [
                (60, 60),   # 60 bpm
                (72, 72),   # 72 bpm
                (100, 100), # 100 bpm
                (120, 120)  # 120 bpm
            ]

            for expected_hr, input_hr in test_cases:
                ecg_data = TestDataFactory.create_ecg_data(
                    duration_seconds=60,
                    heart_rate=input_hr
                )
                calculated_hr = processor.calculate_heart_rate(ecg_data)
                assert abs(calculated_hr - expected_hr) <= 2, \
                    f"心率计算偏差过大: 期望{expected_hr}, 实际{calculated_hr}"

        def test_r_peak_detection_consistency(self):
            """测试R波检测一致性 - 回归测试"""
            processor = ECGProcessor(sample_rate=500)
            ecg_data = TestDataFactory.create_ecg_data(duration_seconds=60)

            # 多次运行应该产生相同结果
            results = [processor.detect_r_peaks(ecg_data) for _ in range(10)]

            # 验证结果一致性
            first_result = results[0]
            for result in results[1:]:
                assert len(result) == len(first_result), \
                    "R波检测结果不一致"

        def test_filter_performance(self):
            """测试滤波器性能 - 回归测试"""
            processor = ECGProcessor(sample_rate=500)
            ecg_data = TestDataFactory.create_ecg_data(duration_seconds=60)

            import time
            start_time = time.time()
            filtered_data = processor.apply_bandpass_filter(ecg_data)
            elapsed_time = time.time() - start_time

            # 性能回归检查
            assert elapsed_time < 0.1, \
                f"滤波器性能退化: {elapsed_time}秒 > 0.1秒"
            assert len(filtered_data) == len(ecg_data), \
                "滤波后数据长度改变"

    @pytest.mark.regression
    class TestPatientDataManagement:
        """患者数据管理回归测试"""

        def test_patient_search_functionality(self, db_session):
            """测试患者搜索功能 - 回归测试"""
            # 创建测试数据
            patients = TestDataFactory.create_batch_patients(100)
            for patient in patients:
                db_session.add(patient)
            db_session.commit()

            manager = PatientDataManager(db_session)

            # 测试各种搜索条件
            results = manager.search_patients(name='张')
            assert len(results) > 0, "按姓名搜索失败"

            results = manager.search_patients(gender='M')
            assert all(p.gender == 'M' for p in results), "按性别搜索失败"

        def test_data_export_format(self, db_session):
            """测试数据导出格式 - 回归测试"""
            patient = TestDataFactory.create_patient()
            db_session.add(patient)
            db_session.commit()

            manager = PatientDataManager(db_session)
            exported_data = manager.export_patient_data(patient.patient_id)

            # 验证导出格式
            required_fields = ['patient_id', 'name', 'gender', 'birth_date']
            for field in required_fields:
                assert field in exported_data, \
                    f"导出数据缺少字段: {field}"

# 运行回归测试
# pytest -m regression -v

视觉回归测试

# visual_regression_test.py
import pytest
from selenium import webdriver
from selenium.webdriver.common.by import By
from PIL import Image, ImageChops
import os

class VisualRegressionTest:
    """视觉回归测试"""

    @pytest.fixture
    def driver(self):
        """创建WebDriver"""
        options = webdriver.ChromeOptions()
        options.add_argument('--headless')
        driver = webdriver.Chrome(options=options)
        driver.set_window_size(1920, 1080)
        yield driver
        driver.quit()

    def capture_screenshot(self, driver, url, filename):
        """捕获截图"""
        driver.get(url)
        driver.save_screenshot(filename)

    def compare_images(self, baseline_path, current_path, threshold=0.01):
        """比较图像"""
        baseline = Image.open(baseline_path)
        current = Image.open(current_path)

        # 计算差异
        diff = ImageChops.difference(baseline, current)

        # 计算差异百分比
        histogram = diff.histogram()
        total_pixels = sum(histogram)
        different_pixels = sum(histogram[1:])
        diff_percentage = different_pixels / total_pixels

        return diff_percentage <= threshold

    @pytest.mark.visual
    def test_dashboard_layout(self, driver):
        """测试仪表板布局 - 视觉回归"""
        url = 'https://medical-device.com/dashboard'
        baseline_path = 'tests/visual/baseline/dashboard.png'
        current_path = 'tests/visual/current/dashboard.png'

        # 捕获当前截图
        self.capture_screenshot(driver, url, current_path)

        # 如果基线不存在,创建基线
        if not os.path.exists(baseline_path):
            os.makedirs(os.path.dirname(baseline_path), exist_ok=True)
            self.capture_screenshot(driver, url, baseline_path)
            pytest.skip("创建基线图像")

        # 比较图像
        is_similar = self.compare_images(baseline_path, current_path)
        assert is_similar, "仪表板布局发生变化"

API自动化测试

REST API测试

# test_api.py
import pytest
import requests
from requests.auth import HTTPBasicAuth

class TestMedicalDeviceAPI:
    """医疗设备API测试"""

    BASE_URL = "https://api.medical-device.com/v1"

    @pytest.fixture(scope='class')
    def auth_token(self):
        """获取认证令牌"""
        response = requests.post(
            f"{self.BASE_URL}/auth/login",
            json={
                "username": "test_user",
                "password": "test_password"
            }
        )
        assert response.status_code == 200
        return response.json()['token']

    @pytest.fixture
    def headers(self, auth_token):
        """请求头"""
        return {
            "Authorization": f"Bearer {auth_token}",
            "Content-Type": "application/json"
        }

    def test_get_patients_list(self, headers):
        """测试获取患者列表"""
        response = requests.get(
            f"{self.BASE_URL}/patients",
            headers=headers
        )

        assert response.status_code == 200
        data = response.json()
        assert 'patients' in data
        assert isinstance(data['patients'], list)

    def test_create_patient(self, headers):
        """测试创建患者"""
        patient_data = {
            "name": "测试患者",
            "gender": "M",
            "birth_date": "1980-01-01",
            "phone": "13800138000"
        }

        response = requests.post(
            f"{self.BASE_URL}/patients",
            json=patient_data,
            headers=headers
        )

        assert response.status_code == 201
        data = response.json()
        assert 'patient_id' in data
        assert data['name'] == patient_data['name']

        # 清理
        patient_id = data['patient_id']
        requests.delete(
            f"{self.BASE_URL}/patients/{patient_id}",
            headers=headers
        )

    def test_get_patient_by_id(self, headers):
        """测试根据ID获取患者"""
        # 先创建患者
        patient_data = {"name": "测试患者", "gender": "M", "birth_date": "1980-01-01"}
        create_response = requests.post(
            f"{self.BASE_URL}/patients",
            json=patient_data,
            headers=headers
        )
        patient_id = create_response.json()['patient_id']

        # 获取患者
        response = requests.get(
            f"{self.BASE_URL}/patients/{patient_id}",
            headers=headers
        )

        assert response.status_code == 200
        data = response.json()
        assert data['patient_id'] == patient_id
        assert data['name'] == patient_data['name']

        # 清理
        requests.delete(
            f"{self.BASE_URL}/patients/{patient_id}",
            headers=headers
        )

    def test_update_patient(self, headers):
        """测试更新患者信息"""
        # 创建患者
        patient_data = {"name": "原始姓名", "gender": "M", "birth_date": "1980-01-01"}
        create_response = requests.post(
            f"{self.BASE_URL}/patients",
            json=patient_data,
            headers=headers
        )
        patient_id = create_response.json()['patient_id']

        # 更新患者
        update_data = {"name": "更新后姓名"}
        response = requests.patch(
            f"{self.BASE_URL}/patients/{patient_id}",
            json=update_data,
            headers=headers
        )

        assert response.status_code == 200
        data = response.json()
        assert data['name'] == update_data['name']

        # 清理
        requests.delete(
            f"{self.BASE_URL}/patients/{patient_id}",
            headers=headers
        )

    def test_delete_patient(self, headers):
        """测试删除患者"""
        # 创建患者
        patient_data = {"name": "待删除患者", "gender": "M", "birth_date": "1980-01-01"}
        create_response = requests.post(
            f"{self.BASE_URL}/patients",
            json=patient_data,
            headers=headers
        )
        patient_id = create_response.json()['patient_id']

        # 删除患者
        response = requests.delete(
            f"{self.BASE_URL}/patients/{patient_id}",
            headers=headers
        )

        assert response.status_code == 204

        # 验证已删除
        get_response = requests.get(
            f"{self.BASE_URL}/patients/{patient_id}",
            headers=headers
        )
        assert get_response.status_code == 404

    @pytest.mark.parametrize("invalid_data", [
        {},  # 空数据
        {"name": ""},  # 空姓名
        {"name": "测试", "gender": "X"},  # 无效性别
        {"name": "测试", "birth_date": "invalid"},  # 无效日期
    ])
    def test_create_patient_validation(self, headers, invalid_data):
        """测试患者创建验证"""
        response = requests.post(
            f"{self.BASE_URL}/patients",
            json=invalid_data,
            headers=headers
        )

        assert response.status_code == 400
        data = response.json()
        assert 'error' in data

使用Tavern进行API测试

# tests/api/test_patients.tavern.yaml
---
test_name: 患者管理API测试

stages:
  - name: 登录获取令牌
    request:
      url: "{base_url}/auth/login"
      method: POST
      json:
        username: test_user
        password: test_password
    response:
      status_code: 200
      save:
        json:
          auth_token: token

  - name: 创建患者
    request:
      url: "{base_url}/patients"
      method: POST
      headers:
        Authorization: "Bearer {auth_token}"
      json:
        name: 测试患者
        gender: M
        birth_date: "1980-01-01"
        phone: "13800138000"
    response:
      status_code: 201
      json:
        name: 测试患者
        gender: M
      save:
        json:
          patient_id: patient_id

  - name: 获取患者信息
    request:
      url: "{base_url}/patients/{patient_id}"
      method: GET
      headers:
        Authorization: "Bearer {auth_token}"
    response:
      status_code: 200
      json:
        patient_id: "{patient_id}"
        name: 测试患者

  - name: 更新患者信息
    request:
      url: "{base_url}/patients/{patient_id}"
      method: PATCH
      headers:
        Authorization: "Bearer {auth_token}"
      json:
        name: 更新后姓名
    response:
      status_code: 200
      json:
        name: 更新后姓名

  - name: 删除患者
    request:
      url: "{base_url}/patients/{patient_id}"
      method: DELETE
      headers:
        Authorization: "Bearer {auth_token}"
    response:
      status_code: 204

  - name: 验证患者已删除
    request:
      url: "{base_url}/patients/{patient_id}"
      method: GET
      headers:
        Authorization: "Bearer {auth_token}"
    response:
      status_code: 404

测试报告和度量

自定义测试报告

# conftest.py
import pytest
from datetime import datetime
import json

class TestReport:
    """测试报告生成器"""

    def __init__(self):
        self.results = {
            'start_time': datetime.now().isoformat(),
            'tests': [],
            'summary': {
                'total': 0,
                'passed': 0,
                'failed': 0,
                'skipped': 0,
                'errors': 0
            }
        }

    def add_test_result(self, nodeid, outcome, duration, error=None):
        """添加测试结果"""
        self.results['tests'].append({
            'test': nodeid,
            'outcome': outcome,
            'duration': duration,
            'error': error
        })

        self.results['summary']['total'] += 1
        self.results['summary'][outcome] += 1

    def generate_report(self, filename='test_report.json'):
        """生成报告"""
        self.results['end_time'] = datetime.now().isoformat()

        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(self.results, f, indent=2, ensure_ascii=False)

@pytest.fixture(scope='session')
def test_report():
    """测试报告夹具"""
    return TestReport()

@pytest.hookimpl(tryfirst=True, hookwrapper=True)
def pytest_runtest_makereport(item, call):
    """钩子:生成测试报告"""
    outcome = yield
    report = outcome.get_result()

    if report.when == 'call':
        test_report = item.config._test_report
        test_report.add_test_result(
            nodeid=item.nodeid,
            outcome=report.outcome,
            duration=report.duration,
            error=str(report.longrepr) if report.failed else None
        )

def pytest_configure(config):
    """配置钩子"""
    config._test_report = TestReport()

def pytest_sessionfinish(session, exitstatus):
    """会话结束钩子"""
    test_report = session.config._test_report
    test_report.generate_report()

HTML测试报告

# 使用pytest-html生成HTML报告
# pytest --html=report.html --self-contained-html

# 自定义HTML报告
from py.xml import html

def pytest_html_report_title(report):
    """自定义报告标题"""
    report.title = "医疗设备软件测试报告"

def pytest_html_results_summary(prefix, summary, postfix):
    """自定义摘要"""
    prefix.extend([
        html.h2("测试环境"),
        html.p("操作系统: Ubuntu 20.04"),
        html.p("Python版本: 3.9"),
        html.p("测试日期: " + datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
    ])

def pytest_html_results_table_header(cells):
    """自定义表头"""
    cells.insert(2, html.th("测试类型"))
    cells.insert(3, html.th("优先级"))

def pytest_html_results_table_row(report, cells):
    """自定义表格行"""
    # 添加测试类型
    test_type = "单元测试"
    if "integration" in report.nodeid:
        test_type = "集成测试"
    elif "e2e" in report.nodeid:
        test_type = "端到端测试"

    cells.insert(2, html.td(test_type))

    # 添加优先级
    priority = "中"
    if "critical" in report.keywords:
        priority = "高"
    elif "low" in report.keywords:
        priority = "低"

    cells.insert(3, html.td(priority))

测试最佳实践

1. 测试命名规范

# 好的测试命名
def test_ecg_processor_calculates_correct_heart_rate_for_normal_rhythm():
    """测试ECG处理器为正常心律计算正确的心率"""
    pass

def test_patient_manager_raises_error_when_patient_id_is_invalid():
    """测试当患者ID无效时患者管理器抛出错误"""
    pass

# 不好的测试命名
def test_1():
    pass

def test_ecg():
    pass

2. 测试独立性

# 好的做法 - 测试独立
class TestPatientManager:
    @pytest.fixture
    def manager(self):
        return PatientManager()

    def test_add_patient(self, manager):
        patient = manager.add_patient("张三")
        assert patient is not None

    def test_remove_patient(self, manager):
        patient = manager.add_patient("李四")
        result = manager.remove_patient(patient.id)
        assert result is True

# 不好的做法 - 测试相互依赖
class TestPatientManager:
    def test_add_patient(self):
        self.patient = manager.add_patient("张三")
        assert self.patient is not None

    def test_remove_patient(self):
        # 依赖于test_add_patient
        result = manager.remove_patient(self.patient.id)
        assert result is True

3. 使用测试夹具

@pytest.fixture(scope='session')
def database():
    """会话级数据库夹具"""
    db = create_test_database()
    yield db
    db.close()

@pytest.fixture(scope='module')
def test_data():
    """模块级测试数据"""
    return load_test_data()

@pytest.fixture(scope='function')
def clean_database(database):
    """函数级数据库清理"""
    yield database
    database.clear()

4. 参数化测试

@pytest.mark.parametrize("heart_rate,expected_category", [
    (40, "bradycardia"),
    (60, "normal"),
    (100, "normal"),
    (120, "tachycardia"),
])
def test_heart_rate_categorization(heart_rate, expected_category):
    """测试心率分类"""
    category = categorize_heart_rate(heart_rate)
    assert category == expected_category

总结

测试自动化是确保医疗设备软件质量的关键实践。通过建立完善的自动化测试体系,可以:

  • ✅ 提高测试效率和覆盖率
  • ✅ 支持持续集成和快速迭代
  • ✅ 确保测试一致性和可重复性
  • ✅ 降低长期测试成本
  • ✅ 满足监管合规要求

相关资源

参考资料

  • pytest文档: https://docs.pytest.org/
  • JUnit 5文档: https://junit.org/junit5/docs/current/user-guide/
  • Selenium文档: https://www.selenium.dev/documentation/
  • GitHub Actions文档: https://docs.github.com/en/actions
  • GitLab CI/CD文档: https://docs.gitlab.com/ee/ci/

💬 讨论区

欢迎在这里分享您的想法、提出问题或参与讨论。需要 GitHub 账号登录。