
RTOS设计与开发(8):最小消息队列,环形缓冲、双等待链表与deadline重试
RTOS设计与开发(8):最小消息队列,环形缓冲、双等待链表与deadline重试
信号量解决的是“有没有一个可用资源”的问题,消息队列解决的则是“有没有一条具体数据可以交给别人”。
当前仓库里的 os_queue 不是一个大而全的消息系统,而是一个很克制的最小静态消息队列实现:固定消息大小、静态缓冲区、环形存储、双等待链表、线程态阻塞和 ISR 发送。
它很小,但设计思路非常清晰。
先看对象结构:为什么是静态环形缓冲
当前队列对象定义如下:
typedef struct os_queue {
uint32_t magic;
uint8_t *buffer;
uint32_t msg_size;
uint32_t capacity;
uint32_t count;
uint32_t head_index;
uint32_t tail_index;
list_t send_wait_list;
list_t recv_wait_list;
} os_queue_t;
这份结构说明了当前设计的几个明确取舍:
- 缓冲区由调用方静态提供,内核不自己分配内存。
- 每条消息大小固定,所以槽位地址可以直接用
index * msg_size算出来。 - 发送和接收分别维护各自等待链表,而不是用一条链表混装两种 waiter。
这里最值得注意的是最后一点。
队列“满”和“空”是两种完全不同的阻塞原因。发送者是在等“出现空槽”,接收者是在等“出现消息”。如果把它们塞进同一条链表,对象满足路径就会变得很混乱:你每次成功发完或收完之后,都得先辨认“现在该唤醒的到底是哪一类任务”。
所以当前实现直接拆成了:
send_wait_listrecv_wait_list
这个设计非常对。
为什么 head_index、tail_index 和 count 要同时存在
环形队列通常会有两种写法:
- 只保存
head和tail,用它们的关系推导空/满。 - 保存
head、tail,再额外保存count。
当前代码选了第二种:
static os_status_t os_queue_send_copy_locked(os_queue_t *queue, const void *msg)
{
...
slot = os_queue_get_slot_ptr(queue, queue->tail_index);
...
queue->tail_index = os_queue_advance_index(queue, queue->tail_index);
queue->count++;
return OS_STATUS_OK;
}
static os_status_t os_queue_recv_copy_locked(os_queue_t *queue, void *msg)
{
...
slot = os_queue_get_slot_ptr(queue, queue->head_index);
...
queue->head_index = os_queue_advance_index(queue, queue->head_index);
queue->count--;
return OS_STATUS_OK;
}
为什么这个取舍适合当前阶段?
因为它把“空”和“满”的判断写得最直白:
count == 0就是空。count >= capacity就是满。
这让对象层代码可以把更多注意力放在阻塞/唤醒语义,而不是去处理“头尾相等时到底是空还是满”的额外判定规则。
为什么 timeout 不是每次重试都重新开始算
当前队列实现里,一个特别值得写进博客的设计点,是 deadline 的处理方式。
在发送路径里,有限等待会在第一次进入 API 时就固定绝对 deadline:
wait_forever = (uint8_t)(timeout_ticks == OS_WAIT_FOREVER);
if (wait_forever == 0U)
{
deadline_tick = (os_tick_t)(os_tick_get() + timeout_ticks);
}
之后如果任务被唤醒,但再次尝试时队列仍然不满足条件,代码并不会重新给它一个完整 timeout,而是重新结算剩余时间:
if (wait_forever == 0U)
{
current_tick = os_tick_get();
if (os_queue_tick_is_due(current_tick, deadline_tick) != 0U)
{
os_port_exit_critical(primask);
return OS_STATUS_TIMEOUT;
}
remaining_timeout = (os_tick_t)(deadline_tick - current_tick);
}
else
{
remaining_timeout = OS_WAIT_FOREVER;
}
这个设计非常重要。
否则只要任务每次被唤醒后都“重新拿到一个完整 timeout”,等待上限就会被无限刷新,最终 API 表面上写的是“最多等 10 tick”,实际效果却可能是永远等下去。
当前实现把 timeout 明确解释成“从第一次调用开始,总共最多等多久”,这才是更符合直觉的语义。
为什么发送成功后只唤醒 receiver
看发送快路径:
if (queue->count < queue->capacity)
{
status = os_queue_send_copy_locked(queue, msg);
...
status = os_queue_wake_first_receiver_locked(queue);
...
if (status == OS_STATUS_SWITCH_REQUIRED)
{
os_port_trigger_pendsv();
}
os_port_exit_critical(primask);
return OS_STATUS_OK;
}
这里的策略非常明确:
发送成功以后,只尝试唤醒等待接收的任务,不碰 send_wait_list。
为什么?
因为一次成功发送带来的新条件是“队列里多了一条消息”,它只可能让 receiver 的等待条件成立;它不会让 sender 获得新的发送空间。反过来,接收成功以后只唤醒 sender,也是同一个逻辑。
这种一一对应的唤醒策略,让对象层状态变化和等待条件之间的因果关系非常干净。
为什么 ISR 版本目前只提供 send_from_isr
当前公开的 ISR 接口只有一个:
os_status_t os_queue_send_from_isr(os_queue_t *queue, const void *msg)
它的语义也很清楚:
- ISR 不能阻塞,所以队列满了直接返回
OS_STATUS_TIMEOUT。 - 发送成功后,若唤醒了更高优先级 receiver,就 pend 一次
PendSV。
核心代码如下:
if (queue->count >= queue->capacity)
{
os_port_exit_critical(primask);
return OS_STATUS_TIMEOUT;
}
status = os_queue_send_copy_locked(queue, msg);
...
status = os_queue_wake_first_receiver_locked(queue);
...
if (status == OS_STATUS_SWITCH_REQUIRED)
{
os_port_trigger_pendsv();
}
为什么当前没有 recv_from_isr?
因为现阶段更常见、也更合理的中断通信方向是“中断把数据投递给线程处理”,而不是“中断自己从队列里取消息”。所以实现上优先把最有实际价值的一半做扎实,是合理的。
双等待链表为什么能和任务层自然配合
队列对象本身并不自己负责“让任务睡眠”或“让任务恢复运行”,它只是按当前条件把任务分别挂进:
send_wait_listrecv_wait_list
然后把真正的阻塞和唤醒动作交给任务层:
status = task_wait_list_insert_priority_ordered(&queue->send_wait_list, current_task);
...
status = task_block_current(queue, remaining_timeout);
以及:
waiter = task_wait_list_peek_head_task(&queue->recv_wait_list);
...
status = task_unblock(waiter, TASK_WAIT_RESULT_OBJECT);
这说明当前实现是很克制的。
队列层只维护“谁在等发送、谁在等接收、环形缓冲里现在有什么”,任务层只维护“谁 runnable、谁 blocked、谁 timeout”。这两层没有互相越权,所以代码虽然已经不短,但语义依然清晰。
当前这版消息队列刻意保持了什么边界
既然要写成系列博客,就要把当前版本的边界说透。
这版 os_queue 目前仍然是一个“最小消息队列”,它还没有:
- 可变长度消息。
- 多生产者/多消费者之外的更复杂投递策略。
- ISR 接收接口。
- 零拷贝模式。
但它已经把最重要的骨架搭起来了:
- 静态固定槽位存储。
- 环形 head/tail 推进。
- 发送等待者与接收等待者分离。
- timeout 采用 deadline + remaining timeout 语义。
- ISR 可以直接向线程投递消息。
对一个正在成长中的 RTOS 来说,这已经是非常扎实的一步。
小结
当前这版最小消息队列最值得称道的地方,不是“终于能发消息了”,而是它在复杂度明显上升之后,仍然保持了很干净的边界:
- 数据存储是环形缓冲的责任。
- 等待排序是对象等待链表的责任。
- 阻塞、超时和恢复 runnable 是任务层的责任。
- 抢占切换最终仍然落到
PendSV。
这说明当前 RTOS 已经不再是“几个模块各写各的”了,而是开始形成一套能彼此拼接的内核结构。
如果后面继续往下做,最自然的方向会是:
- 计数型信号量或 mutex。
- 更完整的 ISR 到线程通信模式。
- 更丰富的对象等待超时接口。
但无论往哪个方向扩,当前这一版的任务层、对象等待层和 port 层边界,已经足够支撑后续继续演进。