LK 博客
RTOS设计与开发(8):最小消息队列,环形缓冲、双等待链表与deadline重试
嵌入式
约 1 分钟阅读 0 赞 0 条评论 鸿蒙黑体

RTOS设计与开发(8):最小消息队列,环形缓冲、双等待链表与deadline重试

Yukikaze
Yukikaze @Yukikaze
累计点赞 0 登录后每个账号只能点一次
内容长度 0 正文词元数
正文
目录会跟随阅读位置移动。
阅读进度

RTOS设计与开发(8):最小消息队列,环形缓冲、双等待链表与deadline重试

信号量解决的是“有没有一个可用资源”的问题,消息队列解决的则是“有没有一条具体数据可以交给别人”。

当前仓库里的 os_queue 不是一个大而全的消息系统,而是一个很克制的最小静态消息队列实现:固定消息大小、静态缓冲区、环形存储、双等待链表、线程态阻塞和 ISR 发送。

它很小,但设计思路非常清晰。

先看对象结构:为什么是静态环形缓冲

当前队列对象定义如下:

typedef struct os_queue {
    uint32_t magic;
    uint8_t *buffer;
    uint32_t msg_size;
    uint32_t capacity;
    uint32_t count;
    uint32_t head_index;
    uint32_t tail_index;
    list_t   send_wait_list;
    list_t   recv_wait_list;
} os_queue_t;

这份结构说明了当前设计的几个明确取舍:

  1. 缓冲区由调用方静态提供,内核不自己分配内存。
  2. 每条消息大小固定,所以槽位地址可以直接用 index * msg_size 算出来。
  3. 发送和接收分别维护各自等待链表,而不是用一条链表混装两种 waiter。

这里最值得注意的是最后一点。

队列“满”和“空”是两种完全不同的阻塞原因。发送者是在等“出现空槽”,接收者是在等“出现消息”。如果把它们塞进同一条链表,对象满足路径就会变得很混乱:你每次成功发完或收完之后,都得先辨认“现在该唤醒的到底是哪一类任务”。

所以当前实现直接拆成了:

  1. send_wait_list
  2. recv_wait_list

这个设计非常对。

为什么 head_indextail_indexcount 要同时存在

环形队列通常会有两种写法:

  1. 只保存 headtail,用它们的关系推导空/满。
  2. 保存 headtail,再额外保存 count

当前代码选了第二种:

static os_status_t os_queue_send_copy_locked(os_queue_t *queue, const void *msg)
{
    ...
    slot = os_queue_get_slot_ptr(queue, queue->tail_index);
    ...
    queue->tail_index = os_queue_advance_index(queue, queue->tail_index);
    queue->count++;
    return OS_STATUS_OK;
}

static os_status_t os_queue_recv_copy_locked(os_queue_t *queue, void *msg)
{
    ...
    slot = os_queue_get_slot_ptr(queue, queue->head_index);
    ...
    queue->head_index = os_queue_advance_index(queue, queue->head_index);
    queue->count--;
    return OS_STATUS_OK;
}

为什么这个取舍适合当前阶段?

因为它把“空”和“满”的判断写得最直白:

  1. count == 0 就是空。
  2. count >= capacity 就是满。

这让对象层代码可以把更多注意力放在阻塞/唤醒语义,而不是去处理“头尾相等时到底是空还是满”的额外判定规则。

为什么 timeout 不是每次重试都重新开始算

当前队列实现里,一个特别值得写进博客的设计点,是 deadline 的处理方式。

在发送路径里,有限等待会在第一次进入 API 时就固定绝对 deadline:

wait_forever = (uint8_t)(timeout_ticks == OS_WAIT_FOREVER);
if (wait_forever == 0U)
{
    deadline_tick = (os_tick_t)(os_tick_get() + timeout_ticks);
}

之后如果任务被唤醒,但再次尝试时队列仍然不满足条件,代码并不会重新给它一个完整 timeout,而是重新结算剩余时间:

if (wait_forever == 0U)
{
    current_tick = os_tick_get();

    if (os_queue_tick_is_due(current_tick, deadline_tick) != 0U)
    {
        os_port_exit_critical(primask);
        return OS_STATUS_TIMEOUT;
    }

    remaining_timeout = (os_tick_t)(deadline_tick - current_tick);
}
else
{
    remaining_timeout = OS_WAIT_FOREVER;
}

这个设计非常重要。

否则只要任务每次被唤醒后都“重新拿到一个完整 timeout”,等待上限就会被无限刷新,最终 API 表面上写的是“最多等 10 tick”,实际效果却可能是永远等下去。

当前实现把 timeout 明确解释成“从第一次调用开始,总共最多等多久”,这才是更符合直觉的语义。

为什么发送成功后只唤醒 receiver

看发送快路径:

if (queue->count < queue->capacity)
{
    status = os_queue_send_copy_locked(queue, msg);
    ...
    status = os_queue_wake_first_receiver_locked(queue);
    ...
    if (status == OS_STATUS_SWITCH_REQUIRED)
    {
        os_port_trigger_pendsv();
    }

    os_port_exit_critical(primask);
    return OS_STATUS_OK;
}

这里的策略非常明确:
发送成功以后,只尝试唤醒等待接收的任务,不碰 send_wait_list

为什么?

因为一次成功发送带来的新条件是“队列里多了一条消息”,它只可能让 receiver 的等待条件成立;它不会让 sender 获得新的发送空间。反过来,接收成功以后只唤醒 sender,也是同一个逻辑。

这种一一对应的唤醒策略,让对象层状态变化和等待条件之间的因果关系非常干净。

为什么 ISR 版本目前只提供 send_from_isr

当前公开的 ISR 接口只有一个:

os_status_t os_queue_send_from_isr(os_queue_t *queue, const void *msg)

它的语义也很清楚:

  1. ISR 不能阻塞,所以队列满了直接返回 OS_STATUS_TIMEOUT
  2. 发送成功后,若唤醒了更高优先级 receiver,就 pend 一次 PendSV

核心代码如下:

if (queue->count >= queue->capacity)
{
    os_port_exit_critical(primask);
    return OS_STATUS_TIMEOUT;
}

status = os_queue_send_copy_locked(queue, msg);
...
status = os_queue_wake_first_receiver_locked(queue);
...
if (status == OS_STATUS_SWITCH_REQUIRED)
{
    os_port_trigger_pendsv();
}

为什么当前没有 recv_from_isr

因为现阶段更常见、也更合理的中断通信方向是“中断把数据投递给线程处理”,而不是“中断自己从队列里取消息”。所以实现上优先把最有实际价值的一半做扎实,是合理的。

双等待链表为什么能和任务层自然配合

队列对象本身并不自己负责“让任务睡眠”或“让任务恢复运行”,它只是按当前条件把任务分别挂进:

  1. send_wait_list
  2. recv_wait_list

然后把真正的阻塞和唤醒动作交给任务层:

status = task_wait_list_insert_priority_ordered(&queue->send_wait_list, current_task);
...
status = task_block_current(queue, remaining_timeout);

以及:

waiter = task_wait_list_peek_head_task(&queue->recv_wait_list);
...
status = task_unblock(waiter, TASK_WAIT_RESULT_OBJECT);

这说明当前实现是很克制的。

队列层只维护“谁在等发送、谁在等接收、环形缓冲里现在有什么”,任务层只维护“谁 runnable、谁 blocked、谁 timeout”。这两层没有互相越权,所以代码虽然已经不短,但语义依然清晰。

当前这版消息队列刻意保持了什么边界

既然要写成系列博客,就要把当前版本的边界说透。

这版 os_queue 目前仍然是一个“最小消息队列”,它还没有:

  1. 可变长度消息。
  2. 多生产者/多消费者之外的更复杂投递策略。
  3. ISR 接收接口。
  4. 零拷贝模式。

但它已经把最重要的骨架搭起来了:

  1. 静态固定槽位存储。
  2. 环形 head/tail 推进。
  3. 发送等待者与接收等待者分离。
  4. timeout 采用 deadline + remaining timeout 语义。
  5. ISR 可以直接向线程投递消息。

对一个正在成长中的 RTOS 来说,这已经是非常扎实的一步。

小结

当前这版最小消息队列最值得称道的地方,不是“终于能发消息了”,而是它在复杂度明显上升之后,仍然保持了很干净的边界:

  1. 数据存储是环形缓冲的责任。
  2. 等待排序是对象等待链表的责任。
  3. 阻塞、超时和恢复 runnable 是任务层的责任。
  4. 抢占切换最终仍然落到 PendSV

这说明当前 RTOS 已经不再是“几个模块各写各的”了,而是开始形成一套能彼此拼接的内核结构。

如果后面继续往下做,最自然的方向会是:

  1. 计数型信号量或 mutex。
  2. 更完整的 ISR 到线程通信模式。
  3. 更丰富的对象等待超时接口。

但无论往哪个方向扩,当前这一版的任务层、对象等待层和 port 层边界,已经足够支撑后续继续演进。

作者名片

Yukikaze
Yukikaze
@Yukikaze

私にできることなら何でもするから

评论区
文章作者和管理员都可以管理这里的评论。
0 条评论
登录后即可参与评论。 去登录
还没有评论,欢迎留下第一条交流内容。