跳转至

性能分析工具开发:构建专业的嵌入式性能分析系统

项目概述

本项目将指导你开发一个完整的嵌入式性能分析工具,用于实时监控、分析和可视化嵌入式系统的性能数据。该工具能够帮助开发者快速识别性能瓶颈,优化系统性能,提高开发效率。

项目目标

完成本项目后,你将拥有:

  • 完整的性能数据采集系统
  • 实时性能监控界面
  • 多维度性能分析算法
  • 交互式数据可视化工具
  • 性能报告生成系统
  • 历史数据管理功能
  • 可扩展的插件架构

项目特点

技术亮点: - 低开销的数据采集机制 - 实时数据处理和分析 - 丰富的可视化图表 - 智能性能瓶颈识别 - 支持多种嵌入式平台 - 模块化设计,易于扩展

应用场景: - 实时系统性能监控 - 性能瓶颈定位 - 功耗分析 - 内存使用分析 - 任务调度分析 - 中断响应分析

前置要求

知识要求

必备知识: - 精通C/C++或Python编程 - 深入理解嵌入式系统架构 - 熟悉性能优化原理 - 了解数据结构和算法 - 掌握串口通信协议

推荐知识: - 了解Web开发基础(HTML/CSS/JavaScript) - 熟悉数据可视化库(Matplotlib, Chart.js等) - 了解数据库操作 - 掌握多线程编程 - 了解统计分析方法

技能要求

  • 能够设计系统架构
  • 能够编写高效的数据处理代码
  • 能够创建用户界面
  • 能够分析性能数据
  • 能够编写技术文档

准备工作

硬件准备

名称 数量 说明 参考价格
开发主机 1 PC或笔记本,运行分析工具 ¥3000-8000
目标开发板 1 STM32/ESP32等嵌入式开发板 ¥50-500
调试器 1 J-Link/ST-Link等 ¥200-1000
USB转串口 1 用于数据传输 ¥20-50
逻辑分析仪 1 可选,用于信号分析 ¥200-2000

软件准备

开发环境: - 操作系统: Windows 10+, Linux (Ubuntu 20.04+) 或 macOS - 编程语言: Python 3.8+ (主机端), C/C++ (嵌入式端) - IDE: VS Code, PyCharm 或 Visual Studio - 版本控制: Git

Python依赖:

# 核心库
pip install pyserial numpy pandas

# 数据可视化
pip install matplotlib plotly dash

# Web界面
pip install flask flask-socketio

# 数据处理
pip install scipy scikit-learn

# 数据库
pip install sqlalchemy sqlite3

嵌入式工具链: - ARM GCC 或对应平台的编译器 - OpenOCD 或 J-Link 软件 - 串口调试工具

系统要求

最低配置: - CPU: Intel i5 或同等性能 - 内存: 8GB RAM - 存储: 50GB 可用空间 - 显示: 1920x1080分辨率

推荐配置: - CPU: Intel i7 或更高 - 内存: 16GB+ RAM - 存储: 256GB SSD - 显示: 双显示器,2K分辨率

步骤1: 系统架构设计

1.1 性能分析工具概述

什么是性能分析工具?

性能分析工具是用于监控、采集、分析和可视化嵌入式系统运行时性能数据的软件系统。它能够帮助开发者: - 实时监控系统运行状态 - 识别性能瓶颈 - 分析资源使用情况 - 优化系统性能 - 生成性能报告

核心功能: - 数据采集: 从目标设备采集性能数据 - 实时监控: 实时显示系统运行状态 - 数据分析: 分析性能数据,识别问题 - 可视化: 以图表形式展示数据 - 报告生成: 生成详细的性能报告

1.2 系统架构

整体架构图:

┌─────────────────────────────────────────────────────────┐
│                    用户界面层                            │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐             │
│  │实时监控  │  │数据分析  │  │报告生成  │             │
│  └──────────┘  └──────────┘  └──────────┘             │
└─────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────┐
│                    数据处理层                            │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐             │
│  │数据解析  │  │数据分析  │  │数据存储  │             │
│  └──────────┘  └──────────┘  └──────────┘             │
└─────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────┐
│                    通信层                                │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐             │
│  │串口通信  │  │网络通信  │  │调试接口  │             │
│  └──────────┘  └──────────┘  └──────────┘             │
└─────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────┐
│                    嵌入式端                              │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐             │
│  │数据采集  │  │数据编码  │  │数据发送  │             │
│  └──────────┘  └──────────┘  └──────────┘             │
└─────────────────────────────────────────────────────────┘

1.3 核心模块设计

1. 嵌入式端数据采集模块

功能: - CPU使用率采集 - 内存使用情况采集 - 任务执行时间采集 - 中断响应时间采集 - 函数调用统计 - 自定义性能计数器

2. 主机端通信模块

功能: - 串口数据接收 - 数据包解析 - 数据缓冲管理 - 通信错误处理

3. 数据处理模块

功能: - 数据解码和验证 - 数据聚合和统计 - 性能指标计算 - 异常检测

4. 数据存储模块

功能: - 实时数据缓存 - 历史数据存储 - 数据查询接口 - 数据导出功能

5. 可视化模块

功能: - 实时曲线图 - 柱状图和饼图 - 热力图 - 时间线视图 - 交互式图表

6. 分析模块

功能: - 性能瓶颈识别 - 趋势分析 - 对比分析 - 统计分析 - 智能建议

1.4 数据格式设计

性能数据包格式:

┌────────┬────────┬────────┬────────────┬────────┬────────┐
│ Header │ Length │  Type  │    Data    │  CRC   │  Tail  │
│ (2B)   │ (2B)   │ (1B)   │  (N bytes) │ (2B)   │ (1B)   │
└────────┴────────┴────────┴────────────┴────────┴────────┘

Header: 0xAA55 (固定)
Length: 数据长度
Type: 数据类型
  0x01: CPU使用率
  0x02: 内存使用
  0x03: 任务信息
  0x04: 函数性能
  0x05: 中断统计
  0x06: 自定义数据
Data: 实际数据
CRC: CRC16校验
Tail: 0x0D (固定)

数据类型定义:

// CPU使用率数据
typedef struct {
    uint32_t timestamp;      // 时间戳(ms)
    uint8_t  cpu_usage;      // CPU使用率(0-100)
    uint32_t idle_time;      // 空闲时间(us)
    uint32_t total_time;     // 总时间(us)
} CPUData_t;

// 内存使用数据
typedef struct {
    uint32_t timestamp;      // 时间戳(ms)
    uint32_t total_heap;     // 总堆大小
    uint32_t used_heap;      // 已用堆大小
    uint32_t free_heap;      // 空闲堆大小
    uint32_t peak_usage;     // 峰值使用
} MemoryData_t;

// 任务性能数据
typedef struct {
    uint32_t timestamp;      // 时间戳(ms)
    uint8_t  task_id;        // 任务ID
    char     task_name[16];  // 任务名称
    uint32_t exec_time;      // 执行时间(us)
    uint32_t exec_count;     // 执行次数
    uint8_t  priority;       // 优先级
    uint8_t  state;          // 状态
} TaskData_t;

// 函数性能数据
typedef struct {
    uint32_t timestamp;      // 时间戳(ms)
    uint16_t func_id;        // 函数ID
    char     func_name[32];  // 函数名称
    uint32_t call_count;     // 调用次数
    uint32_t total_time;     // 总时间(us)
    uint32_t min_time;       // 最小时间(us)
    uint32_t max_time;       // 最大时间(us)
    uint32_t avg_time;       // 平均时间(us)
} FunctionData_t;

1.5 目录结构设计

performance_analyzer/
├── embedded/               # 嵌入式端代码
│   ├── inc/               # 头文件
│   │   ├── perf_monitor.h
│   │   ├── perf_config.h
│   │   └── perf_protocol.h
│   ├── src/               # 源文件
│   │   ├── perf_monitor.c
│   │   ├── perf_cpu.c
│   │   ├── perf_memory.c
│   │   ├── perf_task.c
│   │   └── perf_comm.c
│   └── examples/          # 示例代码
├── host/                   # 主机端代码
│   ├── core/              # 核心模块
│   │   ├── data_receiver.py
│   │   ├── data_parser.py
│   │   ├── data_analyzer.py
│   │   └── data_storage.py
│   ├── ui/                # 用户界面
│   │   ├── main_window.py
│   │   ├── monitor_panel.py
│   │   ├── analysis_panel.py
│   │   └── report_panel.py
│   ├── visualization/     # 可视化
│   │   ├── realtime_chart.py
│   │   ├── histogram.py
│   │   └── timeline.py
│   └── utils/             # 工具函数
│       ├── logger.py
│       ├── config.py
│       └── report_gen.py
├── config/                 # 配置文件
│   ├── analyzer.yaml
│   └── logging.yaml
├── data/                   # 数据目录
│   ├── realtime/          # 实时数据
│   └── history/           # 历史数据
├── reports/                # 报告目录
├── docs/                   # 文档
├── tests/                  # 测试代码
├── requirements.txt        # Python依赖
└── README.md              # 项目说明

步骤2: 嵌入式端数据采集实现

2.1 性能监控核心模块

性能监控头文件 (perf_monitor.h):

#ifndef PERF_MONITOR_H
#define PERF_MONITOR_H

#include <stdint.h>
#include <stdbool.h>

// 配置参数
#define PERF_MAX_FUNCTIONS    50    // 最大监控函数数
#define PERF_MAX_TASKS        10    // 最大监控任务数
#define PERF_BUFFER_SIZE      1024  // 数据缓冲区大小
#define PERF_SAMPLE_INTERVAL  100   // 采样间隔(ms)

// 数据类型
typedef enum {
    PERF_TYPE_CPU = 0x01,
    PERF_TYPE_MEMORY = 0x02,
    PERF_TYPE_TASK = 0x03,
    PERF_TYPE_FUNCTION = 0x04,
    PERF_TYPE_INTERRUPT = 0x05,
    PERF_TYPE_CUSTOM = 0x06
} PerfDataType_t;

// CPU性能数据
typedef struct {
    uint32_t timestamp;
    uint8_t  cpu_usage;
    uint32_t idle_time;
    uint32_t total_time;
} PerfCPUData_t;

// 内存性能数据
typedef struct {
    uint32_t timestamp;
    uint32_t total_heap;
    uint32_t used_heap;
    uint32_t free_heap;
    uint32_t peak_usage;
} PerfMemoryData_t;

// 函数性能计数器
typedef struct {
    const char *name;
    uint16_t func_id;
    uint32_t call_count;
    uint32_t total_cycles;
    uint32_t min_cycles;
    uint32_t max_cycles;
    bool enabled;
} PerfFunctionCounter_t;

// 性能监控配置
typedef struct {
    bool cpu_monitor_enabled;
    bool memory_monitor_enabled;
    bool task_monitor_enabled;
    bool function_monitor_enabled;
    uint32_t sample_interval_ms;
    void (*send_callback)(uint8_t *data, uint16_t len);
} PerfMonitorConfig_t;

// 初始化和控制
void Perf_Init(PerfMonitorConfig_t *config);
void Perf_Start(void);
void Perf_Stop(void);
void Perf_Reset(void);

// CPU监控
void Perf_CPU_Update(void);
uint8_t Perf_CPU_GetUsage(void);

// 内存监控
void Perf_Memory_Update(void);
uint32_t Perf_Memory_GetUsed(void);
uint32_t Perf_Memory_GetFree(void);

// 函数性能监控
int Perf_Function_Register(const char *name);
void Perf_Function_Start(int func_id);
void Perf_Function_End(int func_id);
void Perf_Function_GetStats(int func_id, PerfFunctionCounter_t *stats);

// 数据发送
void Perf_SendData(PerfDataType_t type, void *data, uint16_t len);

// 辅助宏
#define PERF_MEASURE_START(id) \
    static int __perf_id_##id = -1; \
    if (__perf_id_##id == -1) { \
        __perf_id_##id = Perf_Function_Register(#id); \
    } \
    Perf_Function_Start(__perf_id_##id)

#define PERF_MEASURE_END(id) \
    Perf_Function_End(__perf_id_##id)

#endif // PERF_MONITOR_H

2.2 CPU使用率监控

CPU监控实现 (perf_cpu.c):

#include "perf_monitor.h"
#include <string.h>

// DWT寄存器定义(Cortex-M)
#define DWT_CTRL    (*(volatile uint32_t *)0xE0001000)
#define DWT_CYCCNT  (*(volatile uint32_t *)0xE0001004)
#define DEM_CR      (*(volatile uint32_t *)0xE000EDFC)
#define DEM_CR_TRCENA (1 << 24)

// CPU监控数据
static struct {
    uint32_t last_total_cycles;
    uint32_t last_idle_cycles;
    uint32_t last_timestamp;
    uint8_t  cpu_usage;
    bool initialized;
} cpu_monitor;

/**
 * @brief 初始化DWT周期计数器
 */
static void DWT_Init(void) {
    // 使能DWT
    DEM_CR |= DEM_CR_TRCENA;

    // 复位周期计数器
    DWT_CYCCNT = 0;

    // 使能周期计数器
    DWT_CTRL |= 1;
}

/**
 * @brief 获取当前周期计数
 */
static inline uint32_t DWT_GetCycles(void) {
    return DWT_CYCCNT;
}

/**
 * @brief 初始化CPU监控
 */
void Perf_CPU_Init(void) {
    DWT_Init();

    cpu_monitor.last_total_cycles = 0;
    cpu_monitor.last_idle_cycles = 0;
    cpu_monitor.last_timestamp = 0;
    cpu_monitor.cpu_usage = 0;
    cpu_monitor.initialized = true;
}

/**
 * @brief 更新CPU使用率
 * 
 * 应在空闲任务中调用,记录空闲时间
 */
void Perf_CPU_Update(void) {
    if (!cpu_monitor.initialized) {
        return;
    }

    uint32_t current_cycles = DWT_GetCycles();
    uint32_t current_time = HAL_GetTick();

    // 计算时间差
    uint32_t delta_time = current_time - cpu_monitor.last_timestamp;

    if (delta_time >= PERF_SAMPLE_INTERVAL) {
        // 计算周期差
        uint32_t total_cycles = current_cycles - cpu_monitor.last_total_cycles;
        uint32_t idle_cycles = cpu_monitor.last_idle_cycles;

        // 计算CPU使用率
        if (total_cycles > 0) {
            uint32_t busy_cycles = total_cycles - idle_cycles;
            cpu_monitor.cpu_usage = (busy_cycles * 100) / total_cycles;

            // 限制在0-100范围
            if (cpu_monitor.cpu_usage > 100) {
                cpu_monitor.cpu_usage = 100;
            }
        }

        // 发送数据
        PerfCPUData_t data = {
            .timestamp = current_time,
            .cpu_usage = cpu_monitor.cpu_usage,
            .idle_time = idle_cycles,
            .total_time = total_cycles
        };
        Perf_SendData(PERF_TYPE_CPU, &data, sizeof(data));

        // 更新记录
        cpu_monitor.last_total_cycles = current_cycles;
        cpu_monitor.last_idle_cycles = 0;
        cpu_monitor.last_timestamp = current_time;
    }
}

/**
 * @brief 记录空闲时间
 * 
 * 在空闲任务中调用
 */
void Perf_CPU_RecordIdle(void) {
    static uint32_t idle_start = 0;

    if (idle_start == 0) {
        idle_start = DWT_GetCycles();
    } else {
        uint32_t idle_end = DWT_GetCycles();
        cpu_monitor.last_idle_cycles += (idle_end - idle_start);
        idle_start = idle_end;
    }
}

/**
 * @brief 获取CPU使用率
 */
uint8_t Perf_CPU_GetUsage(void) {
    return cpu_monitor.cpu_usage;
}

2.3 内存使用监控

内存监控实现 (perf_memory.c):

#include "perf_monitor.h"
#include <stdlib.h>
#include <string.h>

// 内存监控数据
static struct {
    uint32_t total_heap;
    uint32_t used_heap;
    uint32_t free_heap;
    uint32_t peak_usage;
    uint32_t alloc_count;
    uint32_t free_count;
    bool initialized;
} memory_monitor;

// 内存块头部
typedef struct MemBlock {
    size_t size;
    struct MemBlock *next;
    uint32_t magic;  // 用于检测内存损坏
} MemBlock_t;

#define MEM_MAGIC 0xDEADBEEF

// 内存块链表
static MemBlock_t *mem_list_head = NULL;

/**
 * @brief 初始化内存监控
 */
void Perf_Memory_Init(uint32_t heap_size) {
    memory_monitor.total_heap = heap_size;
    memory_monitor.used_heap = 0;
    memory_monitor.free_heap = heap_size;
    memory_monitor.peak_usage = 0;
    memory_monitor.alloc_count = 0;
    memory_monitor.free_count = 0;
    memory_monitor.initialized = true;

    mem_list_head = NULL;
}

/**
 * @brief 包装的malloc函数
 */
void* Perf_Malloc(size_t size) {
    if (!memory_monitor.initialized) {
        return malloc(size);
    }

    // 分配额外空间存储头部
    size_t total_size = size + sizeof(MemBlock_t);
    MemBlock_t *block = (MemBlock_t*)malloc(total_size);

    if (block) {
        // 填充头部
        block->size = size;
        block->magic = MEM_MAGIC;
        block->next = mem_list_head;
        mem_list_head = block;

        // 更新统计
        memory_monitor.used_heap += size;
        memory_monitor.free_heap -= size;
        memory_monitor.alloc_count++;

        // 更新峰值
        if (memory_monitor.used_heap > memory_monitor.peak_usage) {
            memory_monitor.peak_usage = memory_monitor.used_heap;
        }

        // 返回用户数据指针
        return (void*)((uint8_t*)block + sizeof(MemBlock_t));
    }

    return NULL;
}

/**
 * @brief 包装的free函数
 */
void Perf_Free(void *ptr) {
    if (!ptr || !memory_monitor.initialized) {
        free(ptr);
        return;
    }

    // 获取块头部
    MemBlock_t *block = (MemBlock_t*)((uint8_t*)ptr - sizeof(MemBlock_t));

    // 检查魔数
    if (block->magic != MEM_MAGIC) {
        // 内存损坏
        return;
    }

    // 从链表中移除
    MemBlock_t **current = &mem_list_head;
    while (*current) {
        if (*current == block) {
            *current = block->next;
            break;
        }
        current = &(*current)->next;
    }

    // 更新统计
    memory_monitor.used_heap -= block->size;
    memory_monitor.free_heap += block->size;
    memory_monitor.free_count++;

    // 释放内存
    free(block);
}

/**
 * @brief 更新内存监控数据
 */
void Perf_Memory_Update(void) {
    if (!memory_monitor.initialized) {
        return;
    }

    // 发送数据
    PerfMemoryData_t data = {
        .timestamp = HAL_GetTick(),
        .total_heap = memory_monitor.total_heap,
        .used_heap = memory_monitor.used_heap,
        .free_heap = memory_monitor.free_heap,
        .peak_usage = memory_monitor.peak_usage
    };

    Perf_SendData(PERF_TYPE_MEMORY, &data, sizeof(data));
}

/**
 * @brief 获取已用内存
 */
uint32_t Perf_Memory_GetUsed(void) {
    return memory_monitor.used_heap;
}

/**
 * @brief 获取空闲内存
 */
uint32_t Perf_Memory_GetFree(void) {
    return memory_monitor.free_heap;
}

/**
 * @brief 检测内存泄漏
 */
bool Perf_Memory_CheckLeaks(void) {
    return (memory_monitor.alloc_count != memory_monitor.free_count);
}

2.4 函数性能监控

函数性能监控实现 (perf_function.c):

#include "perf_monitor.h"
#include <string.h>

// 函数性能计数器数组
static PerfFunctionCounter_t function_counters[PERF_MAX_FUNCTIONS];
static int function_count = 0;

// 当前测量状态
typedef struct {
    int func_id;
    uint32_t start_cycles;
} PerfMeasureState_t;

static PerfMeasureState_t measure_stack[10];
static int measure_stack_top = 0;

/**
 * @brief 初始化函数性能监控
 */
void Perf_Function_Init(void) {
    memset(function_counters, 0, sizeof(function_counters));
    function_count = 0;
    measure_stack_top = 0;
}

/**
 * @brief 注册函数
 */
int Perf_Function_Register(const char *name) {
    if (function_count >= PERF_MAX_FUNCTIONS) {
        return -1;
    }

    // 检查是否已注册
    for (int i = 0; i < function_count; i++) {
        if (strcmp(function_counters[i].name, name) == 0) {
            return i;
        }
    }

    // 注册新函数
    int id = function_count++;
    function_counters[id].name = name;
    function_counters[id].func_id = id;
    function_counters[id].call_count = 0;
    function_counters[id].total_cycles = 0;
    function_counters[id].min_cycles = UINT32_MAX;
    function_counters[id].max_cycles = 0;
    function_counters[id].enabled = true;

    return id;
}

/**
 * @brief 开始测量函数
 */
void Perf_Function_Start(int func_id) {
    if (func_id < 0 || func_id >= function_count) {
        return;
    }

    if (!function_counters[func_id].enabled) {
        return;
    }

    // 压栈
    if (measure_stack_top < 10) {
        measure_stack[measure_stack_top].func_id = func_id;
        measure_stack[measure_stack_top].start_cycles = DWT_CYCCNT;
        measure_stack_top++;
    }
}

/**
 * @brief 结束测量函数
 */
void Perf_Function_End(int func_id) {
    if (func_id < 0 || func_id >= function_count) {
        return;
    }

    if (!function_counters[func_id].enabled) {
        return;
    }

    // 出栈
    if (measure_stack_top > 0) {
        measure_stack_top--;

        if (measure_stack[measure_stack_top].func_id == func_id) {
            uint32_t end_cycles = DWT_CYCCNT;
            uint32_t elapsed = end_cycles - measure_stack[measure_stack_top].start_cycles;

            // 更新统计
            PerfFunctionCounter_t *counter = &function_counters[func_id];
            counter->call_count++;
            counter->total_cycles += elapsed;

            if (elapsed < counter->min_cycles) {
                counter->min_cycles = elapsed;
            }
            if (elapsed > counter->max_cycles) {
                counter->max_cycles = elapsed;
            }
        }
    }
}

/**
 * @brief 获取函数统计信息
 */
void Perf_Function_GetStats(int func_id, PerfFunctionCounter_t *stats) {
    if (func_id < 0 || func_id >= function_count || !stats) {
        return;
    }

    memcpy(stats, &function_counters[func_id], sizeof(PerfFunctionCounter_t));
}

/**
 * @brief 发送所有函数统计数据
 */
void Perf_Function_SendAll(void) {
    for (int i = 0; i < function_count; i++) {
        if (function_counters[i].call_count > 0) {
            // 计算平均值
            uint32_t avg_cycles = function_counters[i].total_cycles / 
                                 function_counters[i].call_count;

            // 构造数据包
            typedef struct {
                uint32_t timestamp;
                uint16_t func_id;
                char func_name[32];
                uint32_t call_count;
                uint32_t total_cycles;
                uint32_t min_cycles;
                uint32_t max_cycles;
                uint32_t avg_cycles;
            } FunctionPerfData_t;

            FunctionPerfData_t data = {
                .timestamp = HAL_GetTick(),
                .func_id = i,
                .call_count = function_counters[i].call_count,
                .total_cycles = function_counters[i].total_cycles,
                .min_cycles = function_counters[i].min_cycles,
                .max_cycles = function_counters[i].max_cycles,
                .avg_cycles = avg_cycles
            };

            strncpy(data.func_name, function_counters[i].name, 31);
            data.func_name[31] = '\0';

            Perf_SendData(PERF_TYPE_FUNCTION, &data, sizeof(data));
        }
    }
}

/**
 * @brief 重置函数统计
 */
void Perf_Function_Reset(int func_id) {
    if (func_id < 0) {
        // 重置所有
        for (int i = 0; i < function_count; i++) {
            function_counters[i].call_count = 0;
            function_counters[i].total_cycles = 0;
            function_counters[i].min_cycles = UINT32_MAX;
            function_counters[i].max_cycles = 0;
        }
    } else if (func_id < function_count) {
        // 重置指定函数
        function_counters[func_id].call_count = 0;
        function_counters[func_id].total_cycles = 0;
        function_counters[func_id].min_cycles = UINT32_MAX;
        function_counters[func_id].max_cycles = 0;
    }
}

2.5 数据通信协议

通信协议实现 (perf_comm.c):

#include "perf_monitor.h"
#include <string.h>

// 协议定义
#define PERF_HEADER  0xAA55
#define PERF_TAIL    0x0D

// 数据包结构
typedef struct __attribute__((packed)) {
    uint16_t header;
    uint16_t length;
    uint8_t  type;
    uint8_t  data[256];
    uint16_t crc;
    uint8_t  tail;
} PerfPacket_t;

// 发送回调
static void (*send_callback)(uint8_t *data, uint16_t len) = NULL;

/**
 * @brief 计算CRC16
 */
static uint16_t Calculate_CRC16(uint8_t *data, uint16_t len) {
    uint16_t crc = 0xFFFF;

    for (uint16_t i = 0; i < len; i++) {
        crc ^= data[i];
        for (uint8_t j = 0; j < 8; j++) {
            if (crc & 0x0001) {
                crc = (crc >> 1) ^ 0xA001;
            } else {
                crc >>= 1;
            }
        }
    }

    return crc;
}

/**
 * @brief 设置发送回调
 */
void Perf_Comm_SetCallback(void (*callback)(uint8_t *data, uint16_t len)) {
    send_callback = callback;
}

/**
 * @brief 发送性能数据
 */
void Perf_SendData(PerfDataType_t type, void *data, uint16_t len) {
    if (!send_callback || len > 256) {
        return;
    }

    PerfPacket_t packet;

    // 填充包头
    packet.header = PERF_HEADER;
    packet.length = len;
    packet.type = type;

    // 复制数据
    memcpy(packet.data, data, len);

    // 计算CRC (header + length + type + data)
    uint8_t *crc_data = (uint8_t*)&packet;
    uint16_t crc_len = 5 + len;  // 2(header) + 2(length) + 1(type) + len
    packet.crc = Calculate_CRC16(crc_data, crc_len);

    // 填充包尾
    packet.tail = PERF_TAIL;

    // 发送数据包
    uint16_t packet_len = 5 + len + 3;  // header+length+type+data+crc+tail
    send_callback((uint8_t*)&packet, packet_len);
}

/**
 * @brief 串口发送回调实现示例
 */
void UART_SendCallback(uint8_t *data, uint16_t len) {
    // 使用HAL库发送
    HAL_UART_Transmit(&huart1, data, len, 100);
}

2.6 主监控模块

主监控模块实现 (perf_monitor.c):

#include "perf_monitor.h"
#include <string.h>

// 监控配置
static PerfMonitorConfig_t monitor_config;
static bool monitor_running = false;
static uint32_t last_update_time = 0;

/**
 * @brief 初始化性能监控
 */
void Perf_Init(PerfMonitorConfig_t *config) {
    if (!config) {
        return;
    }

    // 保存配置
    memcpy(&monitor_config, config, sizeof(PerfMonitorConfig_t));

    // 初始化各模块
    if (monitor_config.cpu_monitor_enabled) {
        Perf_CPU_Init();
    }

    if (monitor_config.memory_monitor_enabled) {
        Perf_Memory_Init(64 * 1024);  // 假设64KB堆
    }

    if (monitor_config.function_monitor_enabled) {
        Perf_Function_Init();
    }

    // 设置通信回调
    if (monitor_config.send_callback) {
        Perf_Comm_SetCallback(monitor_config.send_callback);
    }

    last_update_time = HAL_GetTick();
}

/**
 * @brief 启动性能监控
 */
void Perf_Start(void) {
    monitor_running = true;
}

/**
 * @brief 停止性能监控
 */
void Perf_Stop(void) {
    monitor_running = false;
}

/**
 * @brief 重置性能监控
 */
void Perf_Reset(void) {
    if (monitor_config.function_monitor_enabled) {
        Perf_Function_Reset(-1);  // 重置所有函数
    }

    last_update_time = HAL_GetTick();
}

/**
 * @brief 性能监控更新
 * 
 * 应在主循环或定时器中周期性调用
 */
void Perf_Update(void) {
    if (!monitor_running) {
        return;
    }

    uint32_t current_time = HAL_GetTick();
    uint32_t elapsed = current_time - last_update_time;

    if (elapsed >= monitor_config.sample_interval_ms) {
        // 更新CPU监控
        if (monitor_config.cpu_monitor_enabled) {
            Perf_CPU_Update();
        }

        // 更新内存监控
        if (monitor_config.memory_monitor_enabled) {
            Perf_Memory_Update();
        }

        // 发送函数性能数据
        if (monitor_config.function_monitor_enabled) {
            Perf_Function_SendAll();
        }

        last_update_time = current_time;
    }
}

2.7 使用示例

示例代码 (main.c):

#include "perf_monitor.h"

// 外部UART句柄
extern UART_HandleTypeDef huart1;

// 串口发送回调
void UART_SendData(uint8_t *data, uint16_t len) {
    HAL_UART_Transmit(&huart1, data, len, 100);
}

// 测试函数1
void TestFunction1(void) {
    PERF_MEASURE_START(TestFunction1);

    // 模拟工作
    for (volatile int i = 0; i < 10000; i++);

    PERF_MEASURE_END(TestFunction1);
}

// 测试函数2
void TestFunction2(void) {
    PERF_MEASURE_START(TestFunction2);

    // 模拟工作
    HAL_Delay(10);

    PERF_MEASURE_END(TestFunction2);
}

// 空闲任务(FreeRTOS)
void vApplicationIdleHook(void) {
    Perf_CPU_RecordIdle();
}

int main(void) {
    // 硬件初始化
    HAL_Init();
    SystemClock_Config();
    MX_UART1_Init();

    // 配置性能监控
    PerfMonitorConfig_t config = {
        .cpu_monitor_enabled = true,
        .memory_monitor_enabled = true,
        .task_monitor_enabled = false,
        .function_monitor_enabled = true,
        .sample_interval_ms = 100,
        .send_callback = UART_SendData
    };

    // 初始化性能监控
    Perf_Init(&config);
    Perf_Start();

    // 主循环
    while (1) {
        // 更新性能监控
        Perf_Update();

        // 调用测试函数
        TestFunction1();
        TestFunction2();

        // 使用监控的内存分配
        void *ptr = Perf_Malloc(100);
        if (ptr) {
            // 使用内存
            Perf_Free(ptr);
        }

        HAL_Delay(50);
    }
}

步骤3: 主机端数据接收与解析

3.1 数据接收模块

数据接收器 (data_receiver.py):

import serial
import threading
import queue
import struct
import logging
from typing import Optional, Callable
from dataclasses import dataclass

@dataclass
class PerfPacket:
    """性能数据包"""
    header: int
    length: int
    type: int
    data: bytes
    crc: int
    tail: int
    timestamp: float  # 接收时间戳

class DataReceiver:
    """数据接收器"""

    # 数据类型
    TYPE_CPU = 0x01
    TYPE_MEMORY = 0x02
    TYPE_TASK = 0x03
    TYPE_FUNCTION = 0x04
    TYPE_INTERRUPT = 0x05
    TYPE_CUSTOM = 0x06

    def __init__(self, port: str, baudrate: int = 115200):
        """
        初始化数据接收器

        Args:
            port: 串口名称
            baudrate: 波特率
        """
        self.port = port
        self.baudrate = baudrate
        self.serial_port: Optional[serial.Serial] = None
        self.running = False
        self.rx_thread: Optional[threading.Thread] = None
        self.packet_queue = queue.Queue()
        self.callbacks = {}
        self.logger = logging.getLogger(__name__)

        # 统计信息
        self.stats = {
            'packets_received': 0,
            'packets_valid': 0,
            'packets_invalid': 0,
            'crc_errors': 0,
            'bytes_received': 0
        }

    def open(self) -> bool:
        """打开串口"""
        try:
            self.serial_port = serial.Serial(
                port=self.port,
                baudrate=self.baudrate,
                bytesize=serial.EIGHTBITS,
                parity=serial.PARITY_NONE,
                stopbits=serial.STOPBITS_ONE,
                timeout=0.1
            )
            self.logger.info(f"串口 {self.port} 打开成功")
            return True
        except serial.SerialException as e:
            self.logger.error(f"打开串口失败: {e}")
            return False

    def close(self):
        """关闭串口"""
        self.stop()
        if self.serial_port and self.serial_port.is_open:
            self.serial_port.close()
            self.logger.info("串口已关闭")

    def start(self):
        """启动接收线程"""
        if not self.serial_port or not self.serial_port.is_open:
            self.logger.error("串口未打开")
            return

        self.running = True
        self.rx_thread = threading.Thread(target=self._rx_thread_func, daemon=True)
        self.rx_thread.start()
        self.logger.info("接收线程已启动")

    def stop(self):
        """停止接收线程"""
        self.running = False
        if self.rx_thread:
            self.rx_thread.join(timeout=2.0)
            self.logger.info("接收线程已停止")

    def register_callback(self, data_type: int, callback: Callable):
        """
        注册数据类型回调

        Args:
            data_type: 数据类型
            callback: 回调函数
        """
        self.callbacks[data_type] = callback

    def get_packet(self, timeout: float = 0.1) -> Optional[PerfPacket]:
        """
        从队列获取数据包

        Args:
            timeout: 超时时间(秒)

        Returns:
            数据包或None
        """
        try:
            return self.packet_queue.get(timeout=timeout)
        except queue.Empty:
            return None

    def _calculate_crc16(self, data: bytes) -> int:
        """计算CRC16"""
        crc = 0xFFFF
        for byte in data:
            crc ^= byte
            for _ in range(8):
                if crc & 0x0001:
                    crc = (crc >> 1) ^ 0xA001
                else:
                    crc >>= 1
        return crc

    def _parse_packet(self, buffer: bytes) -> Optional[PerfPacket]:
        """
        解析数据包

        Args:
            buffer: 数据缓冲区

        Returns:
            解析的数据包或None
        """
        if len(buffer) < 8:  # 最小包长度
            return None

        try:
            # 查找包头
            header_pos = buffer.find(b'\x55\xAA')
            if header_pos == -1:
                return None

            buffer = buffer[header_pos:]

            # 解析包头
            header = struct.unpack('<H', buffer[0:2])[0]
            length = struct.unpack('<H', buffer[2:4])[0]
            data_type = buffer[4]

            # 检查长度
            packet_len = 5 + length + 3  # header+length+type+data+crc+tail
            if len(buffer) < packet_len:
                return None

            # 提取数据
            data = buffer[5:5+length]
            crc_received = struct.unpack('<H', buffer[5+length:7+length])[0]
            tail = buffer[7+length]

            # 验证CRC
            crc_data = buffer[0:5+length]
            crc_calculated = self._calculate_crc16(crc_data)

            if crc_received != crc_calculated:
                self.stats['crc_errors'] += 1
                self.logger.warning(f"CRC错误: 接收={crc_received:04X}, 计算={crc_calculated:04X}")
                return None

            # 验证包尾
            if tail != 0x0D:
                self.logger.warning(f"包尾错误: {tail:02X}")
                return None

            # 创建数据包
            import time
            packet = PerfPacket(
                header=header,
                length=length,
                type=data_type,
                data=data,
                crc=crc_received,
                tail=tail,
                timestamp=time.time()
            )

            self.stats['packets_valid'] += 1
            return packet

        except Exception as e:
            self.logger.error(f"解析数据包失败: {e}")
            self.stats['packets_invalid'] += 1
            return None

    def _rx_thread_func(self):
        """接收线程函数"""
        buffer = bytearray()

        while self.running:
            try:
                # 读取数据
                if self.serial_port.in_waiting > 0:
                    data = self.serial_port.read(self.serial_port.in_waiting)
                    buffer.extend(data)
                    self.stats['bytes_received'] += len(data)

                    # 解析数据包
                    while len(buffer) >= 8:
                        packet = self._parse_packet(bytes(buffer))

                        if packet:
                            # 放入队列
                            self.packet_queue.put(packet)
                            self.stats['packets_received'] += 1

                            # 调用回调
                            if packet.type in self.callbacks:
                                self.callbacks[packet.type](packet)

                            # 移除已处理的数据
                            packet_len = 5 + packet.length + 3
                            buffer = buffer[packet_len:]
                        else:
                            # 移除第一个字节,继续查找
                            if len(buffer) > 0:
                                buffer = buffer[1:]
                            break

            except Exception as e:
                self.logger.error(f"接收线程错误: {e}")

    def get_stats(self) -> dict:
        """获取统计信息"""
        return self.stats.copy()

3.2 数据解析模块

数据解析器 (data_parser.py):

import struct
from dataclasses import dataclass
from typing import Optional
import logging

@dataclass
class CPUData:
    """CPU性能数据"""
    timestamp: int
    cpu_usage: int
    idle_time: int
    total_time: int

@dataclass
class MemoryData:
    """内存性能数据"""
    timestamp: int
    total_heap: int
    used_heap: int
    free_heap: int
    peak_usage: int

@dataclass
class FunctionData:
    """函数性能数据"""
    timestamp: int
    func_id: int
    func_name: str
    call_count: int
    total_cycles: int
    min_cycles: int
    max_cycles: int
    avg_cycles: int

class DataParser:
    """数据解析器"""

    def __init__(self):
        self.logger = logging.getLogger(__name__)

    def parse_cpu_data(self, data: bytes) -> Optional[CPUData]:
        """
        解析CPU数据

        Args:
            data: 原始数据

        Returns:
            CPU数据对象
        """
        try:
            if len(data) < 13:
                return None

            timestamp, cpu_usage, idle_time, total_time = struct.unpack('<IBII', data)

            return CPUData(
                timestamp=timestamp,
                cpu_usage=cpu_usage,
                idle_time=idle_time,
                total_time=total_time
            )
        except Exception as e:
            self.logger.error(f"解析CPU数据失败: {e}")
            return None

    def parse_memory_data(self, data: bytes) -> Optional[MemoryData]:
        """
        解析内存数据

        Args:
            data: 原始数据

        Returns:
            内存数据对象
        """
        try:
            if len(data) < 20:
                return None

            timestamp, total_heap, used_heap, free_heap, peak_usage = \
                struct.unpack('<IIIII', data)

            return MemoryData(
                timestamp=timestamp,
                total_heap=total_heap,
                used_heap=used_heap,
                free_heap=free_heap,
                peak_usage=peak_usage
            )
        except Exception as e:
            self.logger.error(f"解析内存数据失败: {e}")
            return None

    def parse_function_data(self, data: bytes) -> Optional[FunctionData]:
        """
        解析函数性能数据

        Args:
            data: 原始数据

        Returns:
            函数数据对象
        """
        try:
            if len(data) < 58:
                return None

            # 解析固定部分
            timestamp, func_id = struct.unpack('<IH', data[0:6])
            func_name = data[6:38].decode('utf-8', errors='ignore').rstrip('\x00')
            call_count, total_cycles, min_cycles, max_cycles, avg_cycles = \
                struct.unpack('<IIIII', data[38:58])

            return FunctionData(
                timestamp=timestamp,
                func_id=func_id,
                func_name=func_name,
                call_count=call_count,
                total_cycles=total_cycles,
                min_cycles=min_cycles,
                max_cycles=max_cycles,
                avg_cycles=avg_cycles
            )
        except Exception as e:
            self.logger.error(f"解析函数数据失败: {e}")
            return None

3.3 数据存储模块

数据存储器 (data_storage.py):

import sqlite3
import json
from typing import List, Optional, Dict, Any
from datetime import datetime
import logging

class DataStorage:
    """数据存储器"""

    def __init__(self, db_path: str = "performance_data.db"):
        """
        初始化数据存储

        Args:
            db_path: 数据库文件路径
        """
        self.db_path = db_path
        self.conn: Optional[sqlite3.Connection] = None
        self.logger = logging.getLogger(__name__)
        self._init_database()

    def _init_database(self):
        """初始化数据库"""
        try:
            self.conn = sqlite3.connect(self.db_path, check_same_thread=False)
            cursor = self.conn.cursor()

            # 创建CPU数据表
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS cpu_data (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp INTEGER,
                    cpu_usage INTEGER,
                    idle_time INTEGER,
                    total_time INTEGER,
                    recorded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''')

            # 创建内存数据表
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS memory_data (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp INTEGER,
                    total_heap INTEGER,
                    used_heap INTEGER,
                    free_heap INTEGER,
                    peak_usage INTEGER,
                    recorded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''')

            # 创建函数性能表
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS function_data (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp INTEGER,
                    func_id INTEGER,
                    func_name TEXT,
                    call_count INTEGER,
                    total_cycles INTEGER,
                    min_cycles INTEGER,
                    max_cycles INTEGER,
                    avg_cycles INTEGER,
                    recorded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''')

            # 创建会话表
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS sessions (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    session_name TEXT,
                    start_time TIMESTAMP,
                    end_time TIMESTAMP,
                    description TEXT,
                    metadata TEXT
                )
            ''')

            self.conn.commit()
            self.logger.info("数据库初始化成功")

        except sqlite3.Error as e:
            self.logger.error(f"数据库初始化失败: {e}")

    def save_cpu_data(self, data: 'CPUData'):
        """保存CPU数据"""
        try:
            cursor = self.conn.cursor()
            cursor.execute('''
                INSERT INTO cpu_data (timestamp, cpu_usage, idle_time, total_time)
                VALUES (?, ?, ?, ?)
            ''', (data.timestamp, data.cpu_usage, data.idle_time, data.total_time))
            self.conn.commit()
        except sqlite3.Error as e:
            self.logger.error(f"保存CPU数据失败: {e}")

    def save_memory_data(self, data: 'MemoryData'):
        """保存内存数据"""
        try:
            cursor = self.conn.cursor()
            cursor.execute('''
                INSERT INTO memory_data 
                (timestamp, total_heap, used_heap, free_heap, peak_usage)
                VALUES (?, ?, ?, ?, ?)
            ''', (data.timestamp, data.total_heap, data.used_heap, 
                  data.free_heap, data.peak_usage))
            self.conn.commit()
        except sqlite3.Error as e:
            self.logger.error(f"保存内存数据失败: {e}")

    def save_function_data(self, data: 'FunctionData'):
        """保存函数性能数据"""
        try:
            cursor = self.conn.cursor()
            cursor.execute('''
                INSERT INTO function_data 
                (timestamp, func_id, func_name, call_count, total_cycles,
                 min_cycles, max_cycles, avg_cycles)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            ''', (data.timestamp, data.func_id, data.func_name, data.call_count,
                  data.total_cycles, data.min_cycles, data.max_cycles, data.avg_cycles))
            self.conn.commit()
        except sqlite3.Error as e:
            self.logger.error(f"保存函数数据失败: {e}")

    def query_cpu_data(self, start_time: int = 0, end_time: int = None, 
                       limit: int = 1000) -> List[Dict[str, Any]]:
        """
        查询CPU数据

        Args:
            start_time: 开始时间戳
            end_time: 结束时间戳
            limit: 最大记录数

        Returns:
            数据列表
        """
        try:
            cursor = self.conn.cursor()

            if end_time:
                cursor.execute('''
                    SELECT * FROM cpu_data 
                    WHERE timestamp >= ? AND timestamp <= ?
                    ORDER BY timestamp DESC LIMIT ?
                ''', (start_time, end_time, limit))
            else:
                cursor.execute('''
                    SELECT * FROM cpu_data 
                    WHERE timestamp >= ?
                    ORDER BY timestamp DESC LIMIT ?
                ''', (start_time, limit))

            columns = [desc[0] for desc in cursor.description]
            results = []
            for row in cursor.fetchall():
                results.append(dict(zip(columns, row)))

            return results

        except sqlite3.Error as e:
            self.logger.error(f"查询CPU数据失败: {e}")
            return []

    def query_memory_data(self, start_time: int = 0, end_time: int = None,
                          limit: int = 1000) -> List[Dict[str, Any]]:
        """查询内存数据"""
        try:
            cursor = self.conn.cursor()

            if end_time:
                cursor.execute('''
                    SELECT * FROM memory_data 
                    WHERE timestamp >= ? AND timestamp <= ?
                    ORDER BY timestamp DESC LIMIT ?
                ''', (start_time, end_time, limit))
            else:
                cursor.execute('''
                    SELECT * FROM memory_data 
                    WHERE timestamp >= ?
                    ORDER BY timestamp DESC LIMIT ?
                ''', (start_time, limit))

            columns = [desc[0] for desc in cursor.description]
            results = []
            for row in cursor.fetchall():
                results.append(dict(zip(columns, row)))

            return results

        except sqlite3.Error as e:
            self.logger.error(f"查询内存数据失败: {e}")
            return []

    def query_function_data(self, func_name: str = None, 
                           limit: int = 100) -> List[Dict[str, Any]]:
        """查询函数性能数据"""
        try:
            cursor = self.conn.cursor()

            if func_name:
                cursor.execute('''
                    SELECT * FROM function_data 
                    WHERE func_name = ?
                    ORDER BY timestamp DESC LIMIT ?
                ''', (func_name, limit))
            else:
                cursor.execute('''
                    SELECT * FROM function_data 
                    ORDER BY timestamp DESC LIMIT ?
                ''', (limit,))

            columns = [desc[0] for desc in cursor.description]
            results = []
            for row in cursor.fetchall():
                results.append(dict(zip(columns, row)))

            return results

        except sqlite3.Error as e:
            self.logger.error(f"查询函数数据失败: {e}")
            return []

    def get_statistics(self) -> Dict[str, Any]:
        """获取统计信息"""
        try:
            cursor = self.conn.cursor()

            stats = {}

            # CPU统计
            cursor.execute('SELECT COUNT(*), AVG(cpu_usage), MAX(cpu_usage) FROM cpu_data')
            row = cursor.fetchone()
            stats['cpu'] = {
                'count': row[0],
                'avg_usage': row[1],
                'max_usage': row[2]
            }

            # 内存统计
            cursor.execute('SELECT COUNT(*), AVG(used_heap), MAX(peak_usage) FROM memory_data')
            row = cursor.fetchone()
            stats['memory'] = {
                'count': row[0],
                'avg_used': row[1],
                'peak_usage': row[2]
            }

            # 函数统计
            cursor.execute('SELECT COUNT(DISTINCT func_name) FROM function_data')
            stats['functions'] = {
                'unique_count': cursor.fetchone()[0]
            }

            return stats

        except sqlite3.Error as e:
            self.logger.error(f"获取统计信息失败: {e}")
            return {}

    def clear_data(self, table: str = None):
        """
        清空数据

        Args:
            table: 表名,None表示清空所有表
        """
        try:
            cursor = self.conn.cursor()

            if table:
                cursor.execute(f'DELETE FROM {table}')
            else:
                cursor.execute('DELETE FROM cpu_data')
                cursor.execute('DELETE FROM memory_data')
                cursor.execute('DELETE FROM function_data')

            self.conn.commit()
            self.logger.info(f"数据已清空: {table or '所有表'}")

        except sqlite3.Error as e:
            self.logger.error(f"清空数据失败: {e}")

    def close(self):
        """关闭数据库连接"""
        if self.conn:
            self.conn.close()
            self.logger.info("数据库连接已关闭")

步骤4: 数据可视化实现

4.1 实时图表绘制

实时图表类 (realtime_chart.py):

import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
from collections import deque
import numpy as np
from typing import List, Tuple
import logging

class RealtimeChart:
    """实时图表"""

    def __init__(self, title: str, ylabel: str, max_points: int = 100):
        """
        初始化实时图表

        Args:
            title: 图表标题
            ylabel: Y轴标签
            max_points: 最大数据点数
        """
        self.title = title
        self.ylabel = ylabel
        self.max_points = max_points

        # 数据缓冲区
        self.time_data = deque(maxlen=max_points)
        self.value_data = deque(maxlen=max_points)

        # 创建图表
        self.fig, self.ax = plt.subplots(figsize=(10, 6))
        self.line, = self.ax.plot([], [], 'b-', linewidth=2)

        # 配置图表
        self.ax.set_title(title, fontsize=14, fontweight='bold')
        self.ax.set_xlabel('时间 (s)', fontsize=12)
        self.ax.set_ylabel(ylabel, fontsize=12)
        self.ax.grid(True, alpha=0.3)

        self.logger = logging.getLogger(__name__)

    def add_data(self, time: float, value: float):
        """
        添加数据点

        Args:
            time: 时间
            value: 值
        """
        self.time_data.append(time)
        self.value_data.append(value)

    def update(self, frame):
        """更新图表"""
        if len(self.time_data) > 0:
            self.line.set_data(list(self.time_data), list(self.value_data))

            # 自动调整坐标轴
            self.ax.relim()
            self.ax.autoscale_view()

        return self.line,

    def start(self, interval: int = 100):
        """
        启动动画

        Args:
            interval: 更新间隔(ms)
        """
        self.anim = FuncAnimation(
            self.fig, 
            self.update, 
            interval=interval,
            blit=True
        )
        plt.show()

    def save(self, filename: str):
        """保存图表"""
        self.fig.savefig(filename, dpi=300, bbox_inches='tight')
        self.logger.info(f"图表已保存: {filename}")

class MultiLineChart:
    """多线图表"""

    def __init__(self, title: str, ylabel: str, 
                 line_labels: List[str], max_points: int = 100):
        """
        初始化多线图表

        Args:
            title: 图表标题
            ylabel: Y轴标签
            line_labels: 线条标签列表
            max_points: 最大数据点数
        """
        self.title = title
        self.ylabel = ylabel
        self.line_labels = line_labels
        self.max_points = max_points

        # 数据缓冲区
        self.time_data = deque(maxlen=max_points)
        self.value_data = {label: deque(maxlen=max_points) 
                          for label in line_labels}

        # 创建图表
        self.fig, self.ax = plt.subplots(figsize=(12, 6))

        # 创建线条
        colors = ['b', 'r', 'g', 'orange', 'purple']
        self.lines = {}
        for i, label in enumerate(line_labels):
            color = colors[i % len(colors)]
            line, = self.ax.plot([], [], color=color, linewidth=2, label=label)
            self.lines[label] = line

        # 配置图表
        self.ax.set_title(title, fontsize=14, fontweight='bold')
        self.ax.set_xlabel('时间 (s)', fontsize=12)
        self.ax.set_ylabel(ylabel, fontsize=12)
        self.ax.legend(loc='upper right')
        self.ax.grid(True, alpha=0.3)

        self.logger = logging.getLogger(__name__)

    def add_data(self, time: float, values: dict):
        """
        添加数据点

        Args:
            time: 时间
            values: 值字典 {label: value}
        """
        self.time_data.append(time)
        for label, value in values.items():
            if label in self.value_data:
                self.value_data[label].append(value)

    def update(self, frame):
        """更新图表"""
        if len(self.time_data) > 0:
            time_list = list(self.time_data)

            for label, line in self.lines.items():
                value_list = list(self.value_data[label])
                line.set_data(time_list, value_list)

            # 自动调整坐标轴
            self.ax.relim()
            self.ax.autoscale_view()

        return list(self.lines.values())

    def start(self, interval: int = 100):
        """启动动画"""
        self.anim = FuncAnimation(
            self.fig,
            self.update,
            interval=interval,
            blit=True
        )
        plt.show()

4.2 统计图表

统计图表类 (histogram.py):

import matplotlib.pyplot as plt
import numpy as np
from typing import List, Dict, Any
import logging

class PerformanceHistogram:
    """性能直方图"""

    def __init__(self):
        self.logger = logging.getLogger(__name__)

    def plot_function_performance(self, function_data: List[Dict[str, Any]],
                                  top_n: int = 10):
        """
        绘制函数性能对比图

        Args:
            function_data: 函数性能数据列表
            top_n: 显示前N个函数
        """
        if not function_data:
            self.logger.warning("没有函数数据")
            return

        # 按平均执行时间排序
        sorted_data = sorted(function_data, 
                           key=lambda x: x['avg_cycles'], 
                           reverse=True)[:top_n]

        func_names = [d['func_name'] for d in sorted_data]
        avg_cycles = [d['avg_cycles'] for d in sorted_data]
        call_counts = [d['call_count'] for d in sorted_data]

        # 创建图表
        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))

        # 平均执行时间
        bars1 = ax1.barh(func_names, avg_cycles, color='steelblue')
        ax1.set_xlabel('平均周期数', fontsize=12)
        ax1.set_title('函数平均执行时间 (Top 10)', fontsize=14, fontweight='bold')
        ax1.grid(axis='x', alpha=0.3)

        # 添加数值标签
        for bar in bars1:
            width = bar.get_width()
            ax1.text(width, bar.get_y() + bar.get_height()/2,
                    f'{int(width)}',
                    ha='left', va='center', fontsize=10)

        # 调用次数
        bars2 = ax2.barh(func_names, call_counts, color='coral')
        ax2.set_xlabel('调用次数', fontsize=12)
        ax2.set_title('函数调用次数 (Top 10)', fontsize=14, fontweight='bold')
        ax2.grid(axis='x', alpha=0.3)

        # 添加数值标签
        for bar in bars2:
            width = bar.get_width()
            ax2.text(width, bar.get_y() + bar.get_height()/2,
                    f'{int(width)}',
                    ha='left', va='center', fontsize=10)

        plt.tight_layout()
        plt.show()

    def plot_memory_usage(self, memory_data: List[Dict[str, Any]]):
        """
        绘制内存使用情况

        Args:
            memory_data: 内存数据列表
        """
        if not memory_data:
            self.logger.warning("没有内存数据")
            return

        # 提取数据
        timestamps = [d['timestamp'] / 1000.0 for d in memory_data]  # 转换为秒
        used_heap = [d['used_heap'] / 1024 for d in memory_data]  # 转换为KB
        free_heap = [d['free_heap'] / 1024 for d in memory_data]

        # 创建图表
        fig, ax = plt.subplots(figsize=(12, 6))

        # 堆叠面积图
        ax.fill_between(timestamps, 0, used_heap, 
                        alpha=0.6, color='steelblue', label='已用内存')
        ax.fill_between(timestamps, used_heap, 
                        [u + f for u, f in zip(used_heap, free_heap)],
                        alpha=0.6, color='lightgreen', label='空闲内存')

        ax.set_xlabel('时间 (s)', fontsize=12)
        ax.set_ylabel('内存 (KB)', fontsize=12)
        ax.set_title('内存使用情况', fontsize=14, fontweight='bold')
        ax.legend(loc='upper right')
        ax.grid(True, alpha=0.3)

        plt.tight_layout()
        plt.show()

    def plot_cpu_usage_distribution(self, cpu_data: List[Dict[str, Any]]):
        """
        绘制CPU使用率分布

        Args:
            cpu_data: CPU数据列表
        """
        if not cpu_data:
            self.logger.warning("没有CPU数据")
            return

        # 提取CPU使用率
        cpu_usage = [d['cpu_usage'] for d in cpu_data]

        # 创建图表
        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 5))

        # 直方图
        ax1.hist(cpu_usage, bins=20, color='steelblue', alpha=0.7, edgecolor='black')
        ax1.set_xlabel('CPU使用率 (%)', fontsize=12)
        ax1.set_ylabel('频次', fontsize=12)
        ax1.set_title('CPU使用率分布', fontsize=14, fontweight='bold')
        ax1.grid(axis='y', alpha=0.3)

        # 箱线图
        ax2.boxplot(cpu_usage, vert=True, patch_artist=True,
                   boxprops=dict(facecolor='lightblue'))
        ax2.set_ylabel('CPU使用率 (%)', fontsize=12)
        ax2.set_title('CPU使用率统计', fontsize=14, fontweight='bold')
        ax2.grid(axis='y', alpha=0.3)

        # 添加统计信息
        stats_text = f'平均: {np.mean(cpu_usage):.1f}%\n'
        stats_text += f'中位数: {np.median(cpu_usage):.1f}%\n'
        stats_text += f'最大: {np.max(cpu_usage):.1f}%\n'
        stats_text += f'最小: {np.min(cpu_usage):.1f}%'
        ax2.text(1.15, 0.5, stats_text, transform=ax2.transAxes,
                fontsize=10, verticalalignment='center',
                bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.5))

        plt.tight_layout()
        plt.show()

步骤5: 性能分析算法

5.1 性能分析器

性能分析器 (data_analyzer.py):

import numpy as np
from typing import List, Dict, Any, Tuple
from dataclasses import dataclass
import logging

@dataclass
class PerformanceIssue:
    """性能问题"""
    severity: str  # 'critical', 'warning', 'info'
    category: str  # 'cpu', 'memory', 'function'
    description: str
    recommendation: str
    data: Dict[str, Any]

class PerformanceAnalyzer:
    """性能分析器"""

    def __init__(self):
        self.logger = logging.getLogger(__name__)

        # 阈值配置
        self.thresholds = {
            'cpu_high': 80,  # CPU使用率高阈值
            'cpu_critical': 95,  # CPU使用率临界阈值
            'memory_high': 80,  # 内存使用率高阈值
            'memory_critical': 95,  # 内存使用率临界阈值
            'function_slow': 10000,  # 函数慢阈值(周期数)
        }

    def analyze_cpu_usage(self, cpu_data: List[Dict[str, Any]]) -> List[PerformanceIssue]:
        """
        分析CPU使用情况

        Args:
            cpu_data: CPU数据列表

        Returns:
            性能问题列表
        """
        issues = []

        if not cpu_data:
            return issues

        # 提取CPU使用率
        cpu_usage = [d['cpu_usage'] for d in cpu_data]

        # 计算统计信息
        avg_usage = np.mean(cpu_usage)
        max_usage = np.max(cpu_usage)
        std_usage = np.std(cpu_usage)

        # 检查平均使用率
        if avg_usage > self.thresholds['cpu_critical']:
            issues.append(PerformanceIssue(
                severity='critical',
                category='cpu',
                description=f'CPU平均使用率过高: {avg_usage:.1f}%',
                recommendation='考虑优化算法或降低任务频率',
                data={'avg_usage': avg_usage, 'max_usage': max_usage}
            ))
        elif avg_usage > self.thresholds['cpu_high']:
            issues.append(PerformanceIssue(
                severity='warning',
                category='cpu',
                description=f'CPU平均使用率较高: {avg_usage:.1f}%',
                recommendation='建议进行性能优化',
                data={'avg_usage': avg_usage, 'max_usage': max_usage}
            ))

        # 检查峰值使用率
        if max_usage > self.thresholds['cpu_critical']:
            issues.append(PerformanceIssue(
                severity='critical',
                category='cpu',
                description=f'CPU峰值使用率达到: {max_usage:.1f}%',
                recommendation='检查是否有突发性高负载任务',
                data={'max_usage': max_usage}
            ))

        # 检查使用率波动
        if std_usage > 20:
            issues.append(PerformanceIssue(
                severity='info',
                category='cpu',
                description=f'CPU使用率波动较大: 标准差={std_usage:.1f}%',
                recommendation='考虑负载均衡或任务调度优化',
                data={'std_usage': std_usage}
            ))

        return issues

    def analyze_memory_usage(self, memory_data: List[Dict[str, Any]]) -> List[PerformanceIssue]:
        """
        分析内存使用情况

        Args:
            memory_data: 内存数据列表

        Returns:
            性能问题列表
        """
        issues = []

        if not memory_data:
            return issues

        # 计算内存使用率
        usage_rates = []
        for d in memory_data:
            if d['total_heap'] > 0:
                rate = (d['used_heap'] / d['total_heap']) * 100
                usage_rates.append(rate)

        if not usage_rates:
            return issues

        # 统计信息
        avg_usage = np.mean(usage_rates)
        max_usage = np.max(usage_rates)
        peak_usage = memory_data[-1]['peak_usage']
        total_heap = memory_data[-1]['total_heap']
        peak_rate = (peak_usage / total_heap) * 100 if total_heap > 0 else 0

        # 检查平均使用率
        if avg_usage > self.thresholds['memory_critical']:
            issues.append(PerformanceIssue(
                severity='critical',
                category='memory',
                description=f'内存平均使用率过高: {avg_usage:.1f}%',
                recommendation='检查内存泄漏,优化内存使用',
                data={'avg_usage': avg_usage, 'peak_rate': peak_rate}
            ))
        elif avg_usage > self.thresholds['memory_high']:
            issues.append(PerformanceIssue(
                severity='warning',
                category='memory',
                description=f'内存平均使用率较高: {avg_usage:.1f}%',
                recommendation='考虑优化数据结构或增加内存',
                data={'avg_usage': avg_usage, 'peak_rate': peak_rate}
            ))

        # 检查峰值使用率
        if peak_rate > self.thresholds['memory_critical']:
            issues.append(PerformanceIssue(
                severity='critical',
                category='memory',
                description=f'内存峰值使用率: {peak_rate:.1f}%',
                recommendation='存在内存不足风险,需要优化',
                data={'peak_rate': peak_rate, 'peak_usage': peak_usage}
            ))

        # 检查内存增长趋势
        if len(usage_rates) > 10:
            # 简单线性回归检测趋势
            x = np.arange(len(usage_rates))
            y = np.array(usage_rates)
            slope = np.polyfit(x, y, 1)[0]

            if slope > 0.1:  # 增长趋势
                issues.append(PerformanceIssue(
                    severity='warning',
                    category='memory',
                    description='检测到内存使用持续增长',
                    recommendation='可能存在内存泄漏,建议检查',
                    data={'growth_rate': slope}
                ))

        return issues

    def analyze_function_performance(self, function_data: List[Dict[str, Any]]) -> List[PerformanceIssue]:
        """
        分析函数性能

        Args:
            function_data: 函数性能数据列表

        Returns:
            性能问题列表
        """
        issues = []

        if not function_data:
            return issues

        # 按函数分组
        func_stats = {}
        for d in function_data:
            func_name = d['func_name']
            if func_name not in func_stats:
                func_stats[func_name] = []
            func_stats[func_name].append(d)

        # 分析每个函数
        for func_name, data_list in func_stats.items():
            # 获取最新数据
            latest = data_list[-1]
            avg_cycles = latest['avg_cycles']
            max_cycles = latest['max_cycles']
            call_count = latest['call_count']

            # 检查执行时间
            if avg_cycles > self.thresholds['function_slow']:
                issues.append(PerformanceIssue(
                    severity='warning',
                    category='function',
                    description=f'函数 {func_name} 执行较慢: 平均{avg_cycles}周期',
                    recommendation='考虑优化算法或减少计算量',
                    data={
                        'func_name': func_name,
                        'avg_cycles': avg_cycles,
                        'max_cycles': max_cycles
                    }
                ))

            # 检查执行时间波动
            if max_cycles > avg_cycles * 3:
                issues.append(PerformanceIssue(
                    severity='info',
                    category='function',
                    description=f'函数 {func_name} 执行时间波动大',
                    recommendation='检查是否有条件分支导致性能差异',
                    data={
                        'func_name': func_name,
                        'avg_cycles': avg_cycles,
                        'max_cycles': max_cycles
                    }
                ))

            # 检查调用频率
            if call_count > 1000:
                issues.append(PerformanceIssue(
                    severity='info',
                    category='function',
                    description=f'函数 {func_name} 调用频繁: {call_count}次',
                    recommendation='如果性能有问题,优先优化此函数',
                    data={
                        'func_name': func_name,
                        'call_count': call_count
                    }
                ))

        return issues

    def generate_report(self, cpu_data: List[Dict[str, Any]],
                       memory_data: List[Dict[str, Any]],
                       function_data: List[Dict[str, Any]]) -> Dict[str, Any]:
        """
        生成性能分析报告

        Args:
            cpu_data: CPU数据
            memory_data: 内存数据
            function_data: 函数数据

        Returns:
            分析报告
        """
        report = {
            'summary': {},
            'issues': [],
            'recommendations': []
        }

        # CPU分析
        cpu_issues = self.analyze_cpu_usage(cpu_data)
        report['issues'].extend(cpu_issues)

        if cpu_data:
            cpu_usage = [d['cpu_usage'] for d in cpu_data]
            report['summary']['cpu'] = {
                'avg_usage': np.mean(cpu_usage),
                'max_usage': np.max(cpu_usage),
                'min_usage': np.min(cpu_usage),
                'std_usage': np.std(cpu_usage)
            }

        # 内存分析
        memory_issues = self.analyze_memory_usage(memory_data)
        report['issues'].extend(memory_issues)

        if memory_data:
            latest_mem = memory_data[-1]
            report['summary']['memory'] = {
                'total_heap': latest_mem['total_heap'],
                'used_heap': latest_mem['used_heap'],
                'free_heap': latest_mem['free_heap'],
                'peak_usage': latest_mem['peak_usage'],
                'usage_rate': (latest_mem['used_heap'] / latest_mem['total_heap'] * 100)
                              if latest_mem['total_heap'] > 0 else 0
            }

        # 函数分析
        function_issues = self.analyze_function_performance(function_data)
        report['issues'].extend(function_issues)

        if function_data:
            # 找出最慢的函数
            sorted_funcs = sorted(function_data, 
                                key=lambda x: x['avg_cycles'], 
                                reverse=True)[:5]
            report['summary']['top_slow_functions'] = [
                {
                    'name': f['func_name'],
                    'avg_cycles': f['avg_cycles'],
                    'call_count': f['call_count']
                }
                for f in sorted_funcs
            ]

        # 生成建议
        critical_issues = [i for i in report['issues'] if i.severity == 'critical']
        warning_issues = [i for i in report['issues'] if i.severity == 'warning']

        if critical_issues:
            report['recommendations'].append(
                '发现严重性能问题,需要立即处理'
            )
        if warning_issues:
            report['recommendations'].append(
                '存在性能警告,建议进行优化'
            )
        if not report['issues']:
            report['recommendations'].append(
                '系统性能良好,无明显问题'
            )

        return report

步骤6: 主程序集成

6.1 主程序

主程序 (main.py):

import sys
import time
import logging
from pathlib import Path

from core.data_receiver import DataReceiver
from core.data_parser import DataParser
from core.data_storage import DataStorage
from core.data_analyzer import PerformanceAnalyzer
from visualization.realtime_chart import RealtimeChart, MultiLineChart
from visualization.histogram import PerformanceHistogram

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('performance_analyzer.log'),
        logging.StreamHandler()
    ]
)

class PerformanceAnalyzerApp:
    """性能分析工具主程序"""

    def __init__(self, port: str, baudrate: int = 115200):
        """
        初始化应用

        Args:
            port: 串口名称
            baudrate: 波特率
        """
        self.logger = logging.getLogger(__name__)

        # 初始化模块
        self.receiver = DataReceiver(port, baudrate)
        self.parser = DataParser()
        self.storage = DataStorage()
        self.analyzer = PerformanceAnalyzer()

        # 实时图表
        self.cpu_chart = RealtimeChart('CPU使用率', 'CPU (%)')
        self.memory_chart = MultiLineChart(
            '内存使用情况', 
            '内存 (KB)',
            ['已用', '空闲']
        )

        # 统计图表
        self.histogram = PerformanceHistogram()

        # 数据缓存
        self.cpu_data_cache = []
        self.memory_data_cache = []
        self.function_data_cache = []

        # 起始时间
        self.start_time = time.time()

    def setup(self) -> bool:
        """设置应用"""
        # 打开串口
        if not self.receiver.open():
            self.logger.error("无法打开串口")
            return False

        # 注册回调
        self.receiver.register_callback(
            DataReceiver.TYPE_CPU,
            self._on_cpu_data
        )
        self.receiver.register_callback(
            DataReceiver.TYPE_MEMORY,
            self._on_memory_data
        )
        self.receiver.register_callback(
            DataReceiver.TYPE_FUNCTION,
            self._on_function_data
        )

        # 启动接收
        self.receiver.start()

        self.logger.info("应用设置完成")
        return True

    def _on_cpu_data(self, packet):
        """CPU数据回调"""
        data = self.parser.parse_cpu_data(packet.data)
        if data:
            # 保存到数据库
            self.storage.save_cpu_data(data)

            # 缓存数据
            self.cpu_data_cache.append({
                'timestamp': data.timestamp,
                'cpu_usage': data.cpu_usage,
                'idle_time': data.idle_time,
                'total_time': data.total_time
            })

            # 更新实时图表
            elapsed = time.time() - self.start_time
            self.cpu_chart.add_data(elapsed, data.cpu_usage)

            self.logger.debug(f"CPU使用率: {data.cpu_usage}%")

    def _on_memory_data(self, packet):
        """内存数据回调"""
        data = self.parser.parse_memory_data(packet.data)
        if data:
            # 保存到数据库
            self.storage.save_memory_data(data)

            # 缓存数据
            self.memory_data_cache.append({
                'timestamp': data.timestamp,
                'total_heap': data.total_heap,
                'used_heap': data.used_heap,
                'free_heap': data.free_heap,
                'peak_usage': data.peak_usage
            })

            # 更新实时图表
            elapsed = time.time() - self.start_time
            self.memory_chart.add_data(elapsed, {
                '已用': data.used_heap / 1024,
                '空闲': data.free_heap / 1024
            })

            self.logger.debug(f"内存使用: {data.used_heap}/{data.total_heap}")

    def _on_function_data(self, packet):
        """函数数据回调"""
        data = self.parser.parse_function_data(packet.data)
        if data:
            # 保存到数据库
            self.storage.save_function_data(data)

            # 缓存数据
            self.function_data_cache.append({
                'timestamp': data.timestamp,
                'func_id': data.func_id,
                'func_name': data.func_name,
                'call_count': data.call_count,
                'total_cycles': data.total_cycles,
                'min_cycles': data.min_cycles,
                'max_cycles': data.max_cycles,
                'avg_cycles': data.avg_cycles
            })

            self.logger.debug(f"函数 {data.func_name}: 平均{data.avg_cycles}周期")

    def run_realtime_monitor(self, duration: int = 60):
        """
        运行实时监控

        Args:
            duration: 监控时长(秒)
        """
        self.logger.info(f"开始实时监控,时长: {duration}秒")

        # 启动实时图表(在单独线程中)
        import threading

        def show_cpu_chart():
            self.cpu_chart.start(interval=100)

        def show_memory_chart():
            self.memory_chart.start(interval=100)

        cpu_thread = threading.Thread(target=show_cpu_chart, daemon=True)
        memory_thread = threading.Thread(target=show_memory_chart, daemon=True)

        cpu_thread.start()
        memory_thread.start()

        # 等待指定时长
        time.sleep(duration)

        self.logger.info("实时监控结束")

    def run_analysis(self):
        """运行性能分析"""
        self.logger.info("开始性能分析")

        # 生成分析报告
        report = self.analyzer.generate_report(
            self.cpu_data_cache,
            self.memory_data_cache,
            self.function_data_cache
        )

        # 打印报告
        print("\n" + "="*60)
        print("性能分析报告")
        print("="*60)

        # 摘要
        print("\n摘要:")
        if 'cpu' in report['summary']:
            cpu = report['summary']['cpu']
            print(f"  CPU使用率: 平均={cpu['avg_usage']:.1f}%, "
                  f"最大={cpu['max_usage']:.1f}%, "
                  f"最小={cpu['min_usage']:.1f}%")

        if 'memory' in report['summary']:
            mem = report['summary']['memory']
            print(f"  内存使用: {mem['used_heap']}/{mem['total_heap']} "
                  f"({mem['usage_rate']:.1f}%), "
                  f"峰值={mem['peak_usage']}")

        if 'top_slow_functions' in report['summary']:
            print("\n  最慢的5个函数:")
            for func in report['summary']['top_slow_functions']:
                print(f"    - {func['name']}: {func['avg_cycles']}周期 "
                      f"(调用{func['call_count']}次)")

        # 问题
        if report['issues']:
            print("\n发现的问题:")
            for issue in report['issues']:
                severity_icon = {
                    'critical': '🔴',
                    'warning': '🟡',
                    'info': '🔵'
                }.get(issue.severity, '⚪')

                print(f"  {severity_icon} [{issue.severity.upper()}] "
                      f"{issue.description}")
                print(f"     建议: {issue.recommendation}")

        # 建议
        if report['recommendations']:
            print("\n总体建议:")
            for rec in report['recommendations']:
                print(f"  - {rec}")

        print("\n" + "="*60 + "\n")

        # 显示统计图表
        if self.function_data_cache:
            self.histogram.plot_function_performance(self.function_data_cache)

        if self.memory_data_cache:
            self.histogram.plot_memory_usage(self.memory_data_cache)

        if self.cpu_data_cache:
            self.histogram.plot_cpu_usage_distribution(self.cpu_data_cache)

    def cleanup(self):
        """清理资源"""
        self.receiver.close()
        self.storage.close()
        self.logger.info("资源已清理")

def main():
    """主函数"""
    # 检查命令行参数
    if len(sys.argv) < 2:
        print("用法: python main.py <串口名称> [波特率]")
        print("示例: python main.py COM3 115200")
        sys.exit(1)

    port = sys.argv[1]
    baudrate = int(sys.argv[2]) if len(sys.argv) > 2 else 115200

    # 创建应用
    app = PerformanceAnalyzerApp(port, baudrate)

    try:
        # 设置应用
        if not app.setup():
            sys.exit(1)

        # 运行实时监控
        print("正在监控性能数据...")
        print("按Ctrl+C停止监控并生成分析报告")

        # 持续监控
        while True:
            time.sleep(1)

            # 显示统计信息
            stats = app.receiver.get_stats()
            print(f"\r接收: {stats['packets_received']}包, "
                  f"有效: {stats['packets_valid']}包, "
                  f"CRC错误: {stats['crc_errors']}包", end='')

    except KeyboardInterrupt:
        print("\n\n停止监控...")

        # 运行分析
        app.run_analysis()

    finally:
        # 清理
        app.cleanup()

if __name__ == '__main__':
    main()

6.2 配置文件

配置文件 (config/analyzer.yaml):

# 性能分析工具配置

# 串口配置
serial:
  port: "COM3"  # Windows: COM3, Linux: /dev/ttyUSB0
  baudrate: 115200
  timeout: 1.0

# 数据采集配置
collection:
  sample_interval: 100  # 采样间隔(ms)
  buffer_size: 1024     # 缓冲区大小
  max_points: 1000      # 最大数据点数

# 分析配置
analysis:
  thresholds:
    cpu_high: 80        # CPU使用率高阈值(%)
    cpu_critical: 95    # CPU使用率临界阈值(%)
    memory_high: 80     # 内存使用率高阈值(%)
    memory_critical: 95 # 内存使用率临界阈值(%)
    function_slow: 10000  # 函数慢阈值(周期数)

# 可视化配置
visualization:
  realtime_update_interval: 100  # 实时图表更新间隔(ms)
  chart_max_points: 100          # 图表最大数据点数
  colors:
    cpu: "steelblue"
    memory_used: "coral"
    memory_free: "lightgreen"

# 存储配置
storage:
  database: "performance_data.db"
  auto_save: true
  save_interval: 10  # 自动保存间隔(秒)

# 日志配置
logging:
  level: "INFO"  # DEBUG, INFO, WARNING, ERROR
  file: "performance_analyzer.log"
  max_size: 10485760  # 10MB
  backup_count: 5

步骤7: 测试与验证

7.1 单元测试

测试数据解析 (tests/test_parser.py):

import unittest
import struct
from core.data_parser import DataParser, CPUData, MemoryData, FunctionData

class TestDataParser(unittest.TestCase):
    """数据解析器测试"""

    def setUp(self):
        self.parser = DataParser()

    def test_parse_cpu_data(self):
        """测试CPU数据解析"""
        # 构造测试数据
        timestamp = 1000
        cpu_usage = 75
        idle_time = 25000
        total_time = 100000

        data = struct.pack('<IBII', timestamp, cpu_usage, idle_time, total_time)

        # 解析
        result = self.parser.parse_cpu_data(data)

        # 验证
        self.assertIsNotNone(result)
        self.assertEqual(result.timestamp, timestamp)
        self.assertEqual(result.cpu_usage, cpu_usage)
        self.assertEqual(result.idle_time, idle_time)
        self.assertEqual(result.total_time, total_time)

    def test_parse_memory_data(self):
        """测试内存数据解析"""
        # 构造测试数据
        timestamp = 2000
        total_heap = 65536
        used_heap = 32768
        free_heap = 32768
        peak_usage = 40000

        data = struct.pack('<IIIII', timestamp, total_heap, used_heap, 
                          free_heap, peak_usage)

        # 解析
        result = self.parser.parse_memory_data(data)

        # 验证
        self.assertIsNotNone(result)
        self.assertEqual(result.timestamp, timestamp)
        self.assertEqual(result.total_heap, total_heap)
        self.assertEqual(result.used_heap, used_heap)
        self.assertEqual(result.free_heap, free_heap)
        self.assertEqual(result.peak_usage, peak_usage)

    def test_parse_function_data(self):
        """测试函数数据解析"""
        # 构造测试数据
        timestamp = 3000
        func_id = 1
        func_name = b"test_function\x00" + b'\x00' * 19  # 32字节
        call_count = 100
        total_cycles = 50000
        min_cycles = 400
        max_cycles = 600
        avg_cycles = 500

        data = struct.pack('<IH', timestamp, func_id)
        data += func_name
        data += struct.pack('<IIIII', call_count, total_cycles, 
                           min_cycles, max_cycles, avg_cycles)

        # 解析
        result = self.parser.parse_function_data(data)

        # 验证
        self.assertIsNotNone(result)
        self.assertEqual(result.timestamp, timestamp)
        self.assertEqual(result.func_id, func_id)
        self.assertEqual(result.func_name, "test_function")
        self.assertEqual(result.call_count, call_count)
        self.assertEqual(result.avg_cycles, avg_cycles)

if __name__ == '__main__':
    unittest.main()

7.2 集成测试

测试完整流程 (tests/test_integration.py):

import unittest
import time
import threading
from core.data_receiver import DataReceiver
from core.data_parser import DataParser
from core.data_storage import DataStorage

class TestIntegration(unittest.TestCase):
    """集成测试"""

    def setUp(self):
        self.storage = DataStorage(':memory:')  # 使用内存数据库
        self.parser = DataParser()

    def tearDown(self):
        self.storage.close()

    def test_data_flow(self):
        """测试数据流"""
        # 模拟CPU数据
        import struct
        cpu_data_raw = struct.pack('<IBII', 1000, 75, 25000, 100000)
        cpu_data = self.parser.parse_cpu_data(cpu_data_raw)

        # 保存到数据库
        self.storage.save_cpu_data(cpu_data)

        # 查询数据
        results = self.storage.query_cpu_data()

        # 验证
        self.assertEqual(len(results), 1)
        self.assertEqual(results[0]['cpu_usage'], 75)

    def test_statistics(self):
        """测试统计功能"""
        # 添加多条数据
        import struct
        for i in range(10):
            cpu_data_raw = struct.pack('<IBII', i*1000, 50+i, 25000, 100000)
            cpu_data = self.parser.parse_cpu_data(cpu_data_raw)
            self.storage.save_cpu_data(cpu_data)

        # 获取统计
        stats = self.storage.get_statistics()

        # 验证
        self.assertEqual(stats['cpu']['count'], 10)
        self.assertGreater(stats['cpu']['avg_usage'], 0)

if __name__ == '__main__':
    unittest.main()

项目总结

完成的功能

通过本项目,我们实现了一个完整的嵌入式性能分析工具,包括:

  • ✅ 嵌入式端数据采集系统
  • CPU使用率监控
  • 内存使用监控
  • 函数性能监控
  • 低开销的数据采集机制

  • ✅ 主机端数据处理系统

  • 串口通信和数据接收
  • 数据包解析和验证
  • 数据存储和查询
  • 性能数据分析

  • ✅ 可视化系统

  • 实时性能曲线图
  • 统计图表和分布图
  • 交互式数据展示

  • ✅ 分析系统

  • 性能瓶颈识别
  • 智能问题检测
  • 优化建议生成

技术亮点

  1. 低开销采集: 使用DWT周期计数器,对系统性能影响小
  2. 实时监控: 支持实时数据采集和显示
  3. 多维分析: 从CPU、内存、函数多个维度分析性能
  4. 智能诊断: 自动识别性能问题并给出建议
  5. 可扩展架构: 模块化设计,易于扩展新功能

应用场景

  • 嵌入式系统性能优化
  • 实时系统调试
  • 产品性能测试
  • 系统资源监控
  • 性能回归测试

扩展方向

功能扩展: - 支持更多通信接口(网络、USB等) - 添加任务调度分析 - 支持多设备同时监控 - 添加性能对比功能 - 实现远程监控

界面扩展: - 开发Web界面 - 添加更多图表类型 - 支持自定义仪表盘 - 实现报警功能

分析扩展: - 机器学习异常检测 - 性能预测 - 自动优化建议 - 性能基准测试

延伸阅读

相关文章

进阶主题

参考资料

工具和库: 1. Matplotlib - Python绘图库 2. PySerial - Python串口库 3. SQLite - 轻量级数据库 4. NumPy - 数值计算库

技术文档: 1. ARM DWT - ARM调试和追踪 2. Cortex-M Performance - Cortex-M性能 3. Python Threading - Python多线程

开源项目: 1. Tracealyzer - RTOS追踪工具 2. SystemView - 系统分析工具 3. Perfetto - 性能追踪工具


项目交付物: - 完整的源代码 - 配置文件和文档 - 测试用例 - 使用说明 - 示例项目

预计开发时间: 180小时

难度等级: 高级

适用人群: 有嵌入式开发经验,熟悉Python编程,了解性能优化的开发者