一、库总体功能与目的

1.1 核心功能

Sys_UDL(System Universal Data Link)是一个基于发布 – 订阅模式的跨线程安全数据分发库,专为 ESP32 Arduino 平台设计。其核心功能包括:

  • 支持多数据源(服务)的注册与管理,每个服务对应一个发布方和多个订阅方;
  • 提供线程安全的数据分发机制,发布方推送数据时无阻塞,通过独立线程异步处理分发;
  • 为每个订阅方维护独立的缓冲队列,支持数据的有序存储与获取;
  • 内置多种滤波算法(均值滤波、高通 / 低通 / 带通滤波、卡尔曼滤波),可在数据分发过程中自动处理噪声;
  • 采用内存预分配机制,所有内存开销在初始化时计算并申请,避免运行时动态内存分配导致的碎片问题。

1.2 设计目的

  • 跨线程数据安全交互:解决 ESP32 多任务(FreeRTOS 任务)间数据共享的线程安全问题,避免竞态条件;
  • 低资源占用:适配 ESP32 嵌入式平台的资源限制(内存、CPU),通过轻量级设计实现高效数据分发;
  • 灵活的滤波支持:针对传感器数据(如 ECG 心电信号、温度、加速度等)的噪声问题,提供可配置的滤波功能;
  • 解耦发布与订阅:通过发布 – 订阅模式分离数据生产者(发布方)和消费者(订阅方),降低模块间耦合度。

1.3 适用场景

  • 多传感器数据采集与分发(如环境监测、生理信号监测);
  • ESP32 多任务间的数据通信(如采集任务与处理 / 显示任务的交互);
  • 需要噪声抑制的数据预处理场景(如 ECG 信号滤波、振动信号分析)。

二、核心数据结构

库的核心功能依赖以下数据结构实现,需在使用前了解其作用:

结构体名作用描述
SysUdl_State库全局状态结构体,存储所有服务、发布队列、线程同步对象等核心数据
SysUdl_Service服务结构体,对应一个数据源,包含服务名称、订阅者列表、滤波配置等
SysUdl_Subscriber订阅者结构体,包含订阅者 ID、数据缓冲队列、队列同步锁等
SysUdl_Data数据结构体,存储实际数据(整型 / 浮点型)、数据类型及时间戳
SysUdl_FilterParams滤波参数结构体,存储各类滤波器的配置(如截止频率、窗口大小、噪声协方差)
SysUdl_FilterState滤波状态结构体,存储滤波器的运行时状态(如历史数据、中间计算结果)

三、库接口函数功能

3.1 初始化与销毁

size_t SysUdl_CalculateMemorySize(uint16_t publish_queue_size)

  • 功能:计算库初始化所需的内存大小(字节),用于预分配内存池。
  • 参数publish_queue_size – 发布任务队列的最大长度(需根据发布频率配置)。
  • 返回值:所需内存总大小(字节)。

bool SysUdl_Init(SysUdl_State* state, void* memory_pool, uint16_t publish_queue_size)

  • 功能:初始化库,创建分发线程、信号量及数据结构,绑定预分配的内存池。
  • 参数
    • state – 库状态结构体指针(需用户定义并传入);
    • memory_pool – 预分配的内存池指针(大小由 SysUdl_CalculateMemorySize 计算);
    • publish_queue_size – 发布任务队列的最大长度。
  • 返回值true 表示初始化成功,false 表示失败(如内存不足、线程创建失败)。

void SysUdl_Deinit(SysUdl_State* state)

  • 功能:销毁库,释放线程、信号量及资源,重置状态。
  • 参数state – 库状态结构体指针。

3.2 服务管理

bool SysUdl_RegisterService(SysUdl_State* state, const char* service_name, SysUdl_FilterType filter_type)

  • 功能:注册一个数据服务(数据源),指定服务名称和默认滤波类型。
  • 参数
    • state – 库状态结构体指针;
    • service_name – 服务名称(唯一标识,如 “ecg_signal”、”temperature”);
    • filter_type – 滤波类型(枚举值,如 SYSUDL_FILTER_LOW_PASSSYSUDL_FILTER_KALMAN)。
  • 返回值true 表示注册成功,false 表示失败(如服务已存在、超过最大服务数)。

void SysUdl_UnregisterService(SysUdl_State* state, const char* service_name)

  • 功能:注销指定服务,释放其关联的订阅者和资源。
  • 参数state – 库状态结构体指针;service_name – 服务名称。

3.3 订阅管理

bool SysUdl_Subscribe(SysUdl_State* state, const char* service_name, const char* subscriber_id)

  • 功能:订阅指定服务,为订阅者创建独立的数据缓冲队列。
  • 参数
    • state – 库状态结构体指针;
    • service_name – 服务名称;
    • subscriber_id – 订阅者唯一标识(如 “display_task”、”analysis_module”)。
  • 返回值true 表示订阅成功,false 表示失败(如服务不存在、订阅者已存在)。

void SysUdl_Unsubscribe(SysUdl_State* state, const char* service_name, const char* subscriber_id)

  • 功能:取消订阅指定服务,清除订阅者的缓冲队列。
  • 参数state – 库状态结构体指针;service_name – 服务名称;subscriber_id – 订阅者 ID。

3.4 数据发布

bool SysUdl_PushInt(SysUdl_State* state, const char* service_name, int32_t data)

  • 功能:发布整型数据到指定服务,数据会被加入发布队列,由分发线程异步处理。
  • 参数
    • state – 库状态结构体指针;
    • service_name – 服务名称;
    • data – 待发布的整型数据(如传感器原始值)。
  • 返回值true 表示发布成功,false 表示失败(如服务不存在、发布队列满)。

bool SysUdl_PushFloat(SysUdl_State* state, const char* service_name, float data)

  • 功能:发布浮点型数据到指定服务,逻辑同 SysUdl_PushInt
  • 参数state – 库状态结构体指针;service_name – 服务名称;data – 待发布的浮点型数据。
  • 返回值true 表示发布成功,false 表示失败。

3.5 数据获取

bool SysUdl_Get(SysUdl_State* state, const char* service_name, const char* subscriber_id, SysUdl_Data* out_data)

  • 功能:从订阅者的缓冲队列中获取最旧数据(FIFO 顺序),并移除该数据。
  • 参数
    • state – 库状态结构体指针;
    • service_name – 服务名称;
    • subscriber_id – 订阅者 ID;
    • out_data – 输出参数,存储获取到的数据(含类型、值、时间戳)。
  • 返回值true 表示获取成功(数据存在),false 表示失败(队列为空)。

bool SysUdl_GetNew(SysUdl_State* state, const char* service_name, const char* subscriber_id, SysUdl_Data* out_data)

  • 功能:获取订阅者队列中最新的数据,并清空队列(仅保留最新值)。
  • 参数:同 SysUdl_Get
  • 返回值true 表示获取成功,false 表示失败(队列为空)。

3.6 滤波参数设置

bool SysUdl_SetAverageFilterParams(...)

  • 功能:为均值滤波服务设置参数。
  • 参数window_size – 滑动窗口大小(如 5、10,需 ≥2)。

bool SysUdl_SetHighPassFilterParams(...) / SysUdl_SetLowPassFilterParams(...)

  • 功能:为高通 / 低通滤波服务设置参数。
  • 参数cutoff_freq – 截止频率(Hz);sample_freq – 采样频率(Hz,需 > 2× 截止频率)。

bool SysUdl_SetBandPassFilterParams(...)

  • 功能:为带通滤波服务设置参数。
  • 参数low_cutoff – 低截止频率;high_cutoff – 高截止频率;sample_freq – 采样频率。

bool SysUdl_SetKalmanFilterParams(...)

  • 功能:为卡尔曼滤波服务设置参数(适用于动态信号如 ECG)。
  • 参数process_noise(Q)- 过程噪声协方差;measure_noise(R)- 测量噪声协方差;error_estimate(P₀)- 初始估计误差。

bool SysUdl_InitEcgKalmanFilter(...)

  • 功能:ECG 信号专用卡尔曼滤波初始化(内置优化参数:Q=0.08、R=0.2、P₀=5.0)。

四、代码核心逻辑讲解

4.1 线程模型与数据分发流程

库采用 “发布队列 + 独立分发线程” 的异步架构,避免发布方阻塞:

  1. 发布阶段:发布方调用 SysUdl_PushInt/SysUdl_PushFloat 时,数据被封装为 SysUdl_PublishTask 加入全局发布队列,同时通过信号量(publish_semaphore)通知分发线程。此过程仅涉及队列操作,耗时极短(微秒级),不会阻塞发布方任务。
  2. 分发阶段:独立的分发线程(SysUdl_DispatchTask)循环等待发布队列信号,取出任务后根据服务名称查找对应的服务,对数据应用配置的滤波算法,最后将滤波后的数据分发到该服务所有订阅者的缓冲队列中。
  3. 订阅阶段:订阅方通过 SysUdl_Get/SysUdl_GetNew 从自己的缓冲队列中获取数据,队列操作通过互斥锁(queue_mutex)保证线程安全。

4.2 内存管理机制

为避免 ESP32 内存碎片问题,库采用 “预分配 + 静态管理” 策略:

  1. 内存计算SysUdl_CalculateMemorySize 函数根据发布队列大小、最大服务数、最大订阅者数等参数,预先计算所需总内存(包括发布队列、信号量、滤波缓冲区等)。
  2. 内存绑定:用户需根据计算结果分配内存池(如通过 malloc 或静态数组),并在 SysUdl_Init 时传入,库内部所有数据结构均从该内存池分配,无运行时动态内存操作。

4.3 滤波机制实现

滤波逻辑在数据分发阶段自动执行,核心流程为:

  1. 分发线程取出原始数据后,根据服务配置的 filter_type 调用对应滤波函数(如 SysUdl_LowPassFilterSysUdl_KalmanFilter);
  2. 滤波函数使用 SysUdl_FilterParams 中的配置参数和 SysUdl_FilterState 中的历史状态计算滤波后的值;
  3. 滤波后的数据被封装为 SysUdl_Data(含时间戳),存入订阅者队列。

以卡尔曼滤波为例:针对 ECG 信号的非平稳特性,优化后的实现增加了自适应基线跟踪,在 QRS 波(信号突变)期间暂停基线更新,避免波形失真,同时通过简化标量运算(替代矩阵操作)降低计算开销,适配 ESP32 性能。

4.4 线程安全保障

库通过 FreeRTOS 信号量(SemaphoreHandle_t)替代标准库 mutex,确保多任务访问安全:

  • 服务列表、订阅者列表的修改通过 service_mutex 保护;
  • 发布队列的读写通过 publish_mutex 和 publish_semaphore 同步;
  • 订阅者缓冲队列的操作通过 queue_mutex 隔离;
  • 滤波状态的更新通过 filter_mutex 保护,避免多线程同时修改滤波参数或状态。

五、使用示例

5.1 基础流程(初始化 + 发布 + 订阅)

#include <Sys_UDL.h>

// 配置参数
#define PUBLISH_QUEUE_SIZE 50  // 发布队列大小
#define SYSUDL_QUEUE_SIZE 20   // 订阅者队列大小(在头文件中定义)

// 全局变量
static SysUdl_State udl_state;
static uint8_t* udl_memory_pool;

// 发布任务:模拟 ECG 信号发布
void publish_task(void* param) {
  while (1) {
    float ecg_data = /* 从传感器读取的原始 ECG 数据 */;
    SysUdl_PushFloat(&udl_state, "ecg_signal", ecg_data);  // 发布浮点数据
    vTaskDelay(pdMS_TO_TICKS(10));  // 100Hz 采样
  }
}

// 订阅任务:获取滤波后的 ECG 数据并处理
void subscribe_task(void* param) {
  SysUdl_Data data;
  while (1) {
    if (SysUdl_Get(&udl_state, "ecg_signal", "display", &data)) {  // 获取最旧数据
      Serial.printf("ECG: %.2f, 时间戳: %llu\n", data.data.float_val, data.timestamp);
    }
    vTaskDelay(pdMS_TO_TICKS(5));
  }
}

void setup() {
  Serial.begin(115200);

  // 1. 计算并分配内存池
  size_t required_mem = SysUdl_CalculateMemorySize(PUBLISH_QUEUE_SIZE);
  udl_memory_pool = (uint8_t*)malloc(required_mem);
  if (!udl_memory_pool) {
    Serial.println("内存分配失败!");
    while (1);
  }

  // 2. 初始化库
  if (!SysUdl_Init(&udl_state, udl_memory_pool, PUBLISH_QUEUE_SIZE)) {
    Serial.println("库初始化失败!");
    while (1);
  }

  // 3. 注册服务(ECG 信号 + 卡尔曼滤波)
  SysUdl_RegisterService(&udl_state, "ecg_signal", SYSUDL_FILTER_KALMAN);
  SysUdl_InitEcgKalmanFilter(&udl_state, "ecg_signal");  // 应用 ECG 优化参数

  // 4. 订阅服务
  SysUdl_Subscribe(&udl_state, "ecg_signal", "display");

  // 5. 创建发布和订阅任务
  xTaskCreatePinnedToCore(publish_task, "Publish", 2048, NULL, 5, NULL, 1);
  xTaskCreatePinnedToCore(subscribe_task, "Subscribe", 2048, NULL, 5, NULL, 0);
}

void loop() {
  vTaskDelay(pdMS_TO_TICKS(1000));  // 主任务空转
}

六、总结

Sys_UDL 库通过轻量级设计实现了 ESP32 平台的跨线程安全数据分发,核心优势包括:

  • 异步分发机制确保发布方无阻塞,适合高频率数据采集场景;
  • 内存预分配机制降低嵌入式平台的内存风险;
  • 灵活的滤波支持满足传感器数据预处理需求;
  • 基于 FreeRTOS 的线程安全设计适配多任务环境。

通过合理配置服务、滤波参数和队列大小,可广泛应用于物联网、医疗监测、工业控制等嵌入式数据交互场景。

Avatar photo

作者 skyate

发表回复