介绍基于 AVR 的 RTOS 的设计和实现,名为 ROS。

GitHub 地址

我想要怎么的RTOS

ROS作为一个RTOS(Real-Time Operating System)的实现,它的核心是 scheduler(调度器),负责任务/协程(下文统一使用「任务」来表述)的调度。而其他的系统工具比如: Semaphore、 Queue 都可以在此基础上实现。

对于嵌入式系统来说,如果需要一个操作系统,那么它应该是轻量级的。因为嵌入式系统的内存通常有限制。对于我来说,去实现一个复杂的,重量级的操作系统,也不太可能。

所以我的RTOS应该是这样(实际上也确实是这样):

  1. 轻量:实现 priority-based and cooperative scheduler、semaphore,即任务调度和任务通信
  2. 简单:基于优先队列的非抢占式(Cooperative multitasking)的协程调度器。使用排序链表实现,实现简单。插入O(N),取出最高优先级元素O(1)。

注意此处 priority-based 和 cooperative 并不冲突。ROS 归根到底是一个 cooperative 调度器,而 priority 仅仅是任务被启动的顺序,也就是说一个低优先级的任务不会被高优先级的无条件抢占(scheduler never interrupts a process unexpectedly),除非低优先级任务主动让出CPU时间(unless processes voluntarily yield control periodically or when idle or logically blocked)。

需求分析

嵌入式系统上真的需要操作系统吗?或者说任务调度有什么优点?

回答这个问题之前,我们先回想一下日常使用的手机,是不是都搭载了一个系统(比如 Android 或者 iOS),有了操作系统我们可以一边听歌一边聊天,也就是 Multitasking。

如果需要实现的业务比较简单,那么不使用操作系统也可以轻松的完成;对于复杂的逻辑和业务,也有几个解决方法:使用有限状态机或者使用操作系统。

相比于有限状态机的方法,或者采用普通的方式编程,使用操作系统有下面这些优势:

  • 大幅简化业务逻辑和代码。RTOS 提供多任务(和现代操作系统的多线程类似)运行的功能。而使用状态机则会有很多 if、switch 的状态判断。操作系统很好的隐藏了底层细节,提供给用户简单的接口以供使用。

  • 可以快速,一致的响应中断。快速的响应中断和处理中断是 RTOS 的实现要求,所谓的“实时”就是对中断的快速响应。

  • 不需要 busy-wating(忙等)。RTOS可以提供 Semaphore、Queue 之类的任务通信工具。比如使用 Semaphore 之后,就不需要忙等一个 Flag,而当条件不满足时,当前任务会从就绪队列中移除。当条件满足时,唤醒该任务。

技术方案

ROS 主要用 C 语言开发,只有scheduler 的核心部分用 Inline Assembly 实现。开发过程中的工具如下:

  • AVR-GCC:AVR平台的GCC编译器

  • ARV-GDB:AVR平台的 Debugger

  • SimAVR:AVR 模拟器,可以将程序在模拟器上测试

  • Make:Unix下的构建工具,用于构建整个系统

开发流程

  1. 阅读 FreeRTOS 的实现和 Atomthreads 的代码
  2. 编写代码
  3. 编写测试
  4. 使用 Makefile 构建上传
  5. 在 simavr 上测试
  6. 真机测试
  7. 作为 Arduino Library,在 Arduino IDE 中使用

思路:利用 timer 产生周期性中断,在系统中加入一个默认任务。在ISR中调用 scheduler,选择优先级最高的任务(对于相同优先级的任务,使用 round-robin),之后进行上下文切换。同时任务可以主动让出 CPU,让其他低优先级的任务切入。

当一个任务主动让出 CPU,即调用系统函数 ros_delay 时,当前任务不再入队(就绪队列),而是保存在一个阻塞队列中(阻塞任务)。在ISR中检查是否有任务的delay到期,则重新将其加入就绪队列(唤醒任务)。

同时我为每个函数都写了详细的英文注解,在贴代码的同时我会连着注解一起贴上来。

下面简要说明ROS提供的内核API:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
// 初始化系统
bool ros_init();
// 获取当前在运行的任务
ROS_TCB *ros_current_tcb();
// 创建任务
status_t ros_create_task(ROS_TCB *tcb, task_func task, uint8_t priority,stack_t *stack, int stack_size);
// 任务调度
void ros_schedule();

// 阻塞任务,以 System tick 为单位
status_t ros_delay(uint32_t ticks);

// 下面四个函数,是移植 ROS 时需要实现的
// 初始化 Timer
extern void ros_init_timer();
// 默认任务
extern void ros_idle_task();
// 初始化任务栈
extern void ros_task_context_init(ROS_TCB *tcb_ptr, task_func task_f,void *stack_top);
// 上下文切换
extern void ros_switch_context(ROS_TCB *old_tcb, ROS_TCB *new_tcb);

数据结构

就绪队列

根据上面的思路,我们需要两个队列:就绪队列和阻塞队列。就绪队列保存所有就绪的任务,并且按照优先级降序;阻塞队列保存阻塞的任务。

首先定义任务的数据结构:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
typedef enum {
TASK_READY = 0,
TASK_BLOCKED,
TASK_TERMINATED
} Task_Status;

typedef void (*task_func)();
typedef struct ros_tcb {
void *sp;
Task_Status status;
uint8_t priority; // 0~255
task_func task_entry;
struct ros_tcb *next_tcb;
} ROS_TCB;

代码中定义了三种任务状态:READY、BLOCKED 和 TERMINATED。如图 2所示,其实还有一种 RUNNING 状态,这种状态存在,但是不需要显式的表明,因为我们会用 current_tcb 来存储当前运行的任务,所以我在编码的后期删除了 RUNNING 状态。

图2 task status

任务的优先级为0~255,规定0为优先级最高,255为优先级最低。所以越小的数字,表示优先级越高。

sp 保存任务的栈指针,我们为每个任务都分配一个单独的栈。我们把 sp 放在结构体的首位也是别有用意的,因为在 3.3.4上下文切换 时,我们需要改变系统的栈指针,将系统栈指针指向一个 ROS_TCB 结构体,也就指向了该任务的栈指针。

next_tcb 指针保存下一个任务,和FreeRTOS和AtomThreads等RTOS不同,ROS的就绪列表是一个单链表。使用双向链表的优势在这里不明显,并且会增加一个指针的内存使用。使用单链表使得链表的操作简单,不容易出错。

接下来就是任务的本体,一个任务函数指针,我们的业务逻辑也是卸载这个函数中。下面是一个任务函数的例子:

1
2
3
4
5
6
void task1() {
for(;;) {
// task code here
}
// should never return, once return the task will be deleted from the ready list
}

一个典型的任务是把逻辑写在无限循环中。当然你也可以从这个任务中 return,这样的话它就是一个 run-to-compeletion 任务,任务状态变为 TERMINATED,很快会被从就绪列表中删除,并永远不再运行。

阻塞队列

1
2
3
4
5
typedef struct ros_timer {
ROS_TCB *blocked_tcb;
uint32_t ticks;
struct ros_timer *next_timer;
} ROS_TIMER

和就绪队列类似,阻塞队列也是一个单链表。但是阻塞队列不是有序的。Ticks 保存任务需要延迟的 System tick 数,当 ticks 数递减为 0 时,唤醒该任务。

初始化

初始化系统

初始化系统分为两步骤,初始化过程中关闭中断:

  1. 配置 timer

  2. 创建一个 idle(默认) 任务到 ready list,保证系统总有任务在运行

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
/**
 * @brief  Start the os:
 * 1. init the system timer, start ticking
 * 2. add a idle task into the list
 * @retval ture if os started
 */
bool ros_init() {
  CRITICAL_STORE;
  CRITICAL_START();
  ros_init_timer();
  status_t ok =
      ros_create_task(&idle_tcb, ros_idle_task, MIN_TASK_PRIORITY, idle_task_stack, ROS_IDLE_STACK_SIZE);
  ROS_STARTED = ok == ROS_OK;
  CRITICAL_END();
  return ROS_STARTED;
}

配置Timer

Timer(定时器)的配置根据平台的不同而不同,对与 Arduino Uno,它拥有三个 timer,我这里选择 Timer1 作为ROS产生 system tick 的 timer,因为使用了 Timer1,而 Timer1 也是驱动 PWM 9 和 10 号引脚的定时器。所以当你使用了ROS,就不能把 PWM 9,10做模拟输出。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
static void init_timer1() {
  // Set prescaler 256
  TCCR1B = _BV(CS12) | _BV(WGM12);
  // For Arduino Uno CPU 16000000 HZ, so the OCR1A should count from 0 to 624
  // x * 1/16M * 256 = 10 ms = 0.01 s
  // x = 16 M / 100 / 256 = 625
  OCR1A = (F_CPU / 256 / ROS_SYS_TICK) - 1;
  // enable compare match 1A interrupt
  TIMSK1 = _BV(OCIE1A);
}

首先对 Timer1 进行 256 的预分频,我们设置ROS_SYS_TICK为100,表示每秒钟发出100次 System tick,即每次 tick 间隔 10 ms。我们设置Timer1的运行模式为CTC模式(Clear TImer on COmpare Match),所以每当达到这个计数值时,产生一次中断,并重新开始计数。

计算公式: x * 1/16M * 256 = 10 ms = 0.01 s,计算得到 x=625,因为OCR1A是从 0 开始计数的,我们把OCR1A的值设定为 625 - 1 = 624。这样配置 Timer1,就可以让它每 10 ms 产生一次中断,符合我们的System tick。

最后,打开“compare match 1A” 这个中断,将TIMSK1 的 OCIE1A 位 置1 即可。_BV(bit) 是一个宏定义:(1 « (bit))。

创建默认任务

系统初始化时,创建一个idle任务,保证系统在任何时候都有一个任务在运行。Idle任务的优先级最低,为255。

同时idle任务利用了Atmega328p的省电功能,在默认任务中调用睡眠模式以节省能耗。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
/**
 * @brief The idle task takes advantage of atmega328p's sleep mode, sleep when
 * there is no task to run
 */
void ros_idle_task() {
  set_sleep_mode(SLEEP_MODE_IDLE);
  while (1) {
    sleep_mode();
  }
}

上下文是什么?

在开始初始化任务的“上下文”之前,我们先介绍一下在ROS中上下文指的是什么。

ROS 为每个任务都单独分配一个栈,而程序(函数)是运行在栈上的,那么它的上下文就是栈(一块连续的内存空间)。

栈上都有些什么?函数运行过程中,比如我们想要相加两个数,首先这两个数会被存放在寄存器上,然后 CPU 对他们进行计算,再将结果保存在寄存器中。函数的运行实际上就是内存(栈)、寄存器、CPU三者的交互。

如果我们要打断一个函数的运行,而去运行另一个函数(将系统栈指针指向另一个函数的入口)。因为另一个函数也要使用寄存器,所以我们需要在栈上保存寄存器中的值。

初始化上下文

栈空间由用户分配和定义,栈就是一片连续的内存空间,所以可以用数组来存储。函数在栈中运行,PUSH 和 POP 指令也作用于栈。如图 3所示,因为在系统中栈是从内存地址高的位置增长到内存地址低的位置,也就是说 PUSH 指令将递减栈指针,POP指令则递增栈指针。所以在初始化栈的时候,我们传入的是数组末端的地址,这样在 PUSH 和 POP 的时候才不会越界。

图3 栈和数组

需要保存的寄存器也是有讲究的,因为我们用 C 写代码,所以这和 C 编译器有关,不同的编译器规则也不同。ROS 主要针对 AVR 平台,使用avr-gcc作为 C 编译器。

avr-gcc 的 wiki 中有一些规则:

R18-R27,R30,R31,R0,SREG 状态寄存器的T-Flag 都是 Called-Used Register。所谓Called-Used Register在调用函数进入和结束的时候,GCC会生成保存和恢复这些寄存器的指令(PUSH 和POP)。对于ISR,在进入和退出时会保存和恢复这些寄存器。

R2-R17,R28,R29,R1 是Called-Saved Register。如果函数使用了这些寄存器,那么调用者需要负责保存和恢复这些寄存器。所以我们需要手动保存的寄存器就是Called-Saved Register。而 R1是 Fixed Register,GCC也会保存和恢复。

因此在创建一个新任务时,初始化它的栈:留下空间给R2-R17,R28-R29,并赋值为0。因此栈的大小至少为 18 bytes。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
void ros_task_context_init(ROS_TCB *tcb_ptr, task_func task_f, void *sp) {
  uint8_t *stack_top = (uint8_t *)sp;
  // pc
  // the function pointer is uint16_t in avr
  *stack_top-- = (uint8_t)((uint16_t)task_shell & 0xFF);         // the LSB
  *stack_top-- = (uint8_t)(((uint16_t)task_shell >> 8) & 0xFF);  // THE MSB
  // Make space for R2-R17, R28-R29
  *stack_top-- = 0x00; // R2
  *stack_top-- = 0x00; // R3
  *stack_top-- = 0x00; // R4
  *stack_top-- = 0x00; // R5
  *stack_top-- = 0x00; // R6
  *stack_top-- = 0x00; // R7
  *stack_top-- = 0x00; // R8
  *stack_top-- = 0x00; // R9
  *stack_top-- = 0x00; // R10
  *stack_top-- = 0x00; // R11
  *stack_top-- = 0x00; // R12
  *stack_top-- = 0x00; // R13
  *stack_top-- = 0x00; // R14
  *stack_top-- = 0x00; // R15
  *stack_top-- = 0x00; // R16
  *stack_top-- = 0x00; // R17
  *stack_top-- = 0x00; // R28
  *stack_top-- = 0x00; // R29
  tcb_ptr->sp = stack_top;
}

需要注意的是,我们在栈上最先PUSH的是task_shell的函数指针,而函数指针在AVR中一般为 16 位,所以需要两个字节,分别存储它的低8位和高8位。使用task_shell,而不直接用任务的task_entry意图很明显:我们可以在task_shell中标记结束的任务,因为ROS是支持run-to-completion的任务的;同时也可以支持传递任务函数的参数,虽然ROS的任务没有参数,但也有考虑到未来系统的扩展。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
/**
 * @brief Wrapper of task function, which can pass param to task(for furture
 * usage), and terminated it and re-schedule when a task run to compeletion
 */
static void task_shell() {
  ROS_TCB *cur_tcb = ros_current_tcb();

  // enable interrupt after context switching finished.
  sei();

  if (cur_tcb && cur_tcb->task_entry) {
    cur_tcb->task_entry();
    // when the task terminated(task return), remove it from ready list and
    cur_tcb->status = TASK_TERMINATED;
  }
  // re-schedule
  ros_schedule();
}

任务调度算法

ROS采用 priority-based 和 Round-robin的调度算法,代码中有三个地方会调用scheduler:

  • Timer ISR 中周期性调用 scheduler

  • 任务主动调用 ros_delay,阻塞自己

  • run-to-completion的任务,在任务结束后需要调用scheduler

ISR

当中断发生时,系统会调用ISR(Interrupt Service Routine)函数来处理中断。Scheduler的实现依赖于Timer的周期性中断和ISR,因此中断是实现Multitask的基础。每个中断名称都定义在中断向量表中,编写ISR函数的基本格式如下:

1
2
3
ISR(interrupt_name_in_vect) {
 //your code
}

AVR平台在进入ISR时会默认关闭中断。但是对于支持嵌套中断的平台,如果我们在嵌套的ISR中进行上下文切换,可能会破坏队列的结构,使得任务的运行顺序和期望的不同,这就和多线程中的与时间相关的错误类似。我们需要一种机制来让在所有的嵌套ISR快结束时才调用scheduler。通过一个计数就可以解决这个问题:

1
2
3
4
5
void ros_int_enter() { ros_int_cnt++; }
void ros_int_exit() {
	ros_int_cnt--;
	ros_schedule();
}

如果 ros_int_cnt 不为 0,则当前处于ISR context中,为0时处于 task context。在 task context 中可以安全的进行任务调度。而这里我们需要处理的中断为Timer1的“compare match 1A”中断。

1
2
3
4
5
6
7
// interrupt every SYS_TICK to re-schedule tasks
ISR(TIMER1_COMPA_vect) {
  ros_int_enter();
  ros_sys_tick();
  // exit ISR, ready to call scheduler
  ros_int_exit();
}
1
2
3
4
void ros_schedule() {
	if (ros_int_cnt != 0 || !ROS_STARTED) return
	// core scheduler code
}

队列操作

对于队列的操作只有两个:入队和出队。

1
2
void ros_tcb_enqueue(ROS_TCB *tcb);
ROS_TCB *ros_tcb_dequeue(uint8_t lowest_priority);

维护一个优先队列,使用排序链表实现。入队操作的时间复杂度为O(N),因为入队时需要把TCB按优先级按序插入;而出队最高优先级的任务时间复杂度为O(1)。Round-robin(轮转调度)算法的实现也体现在入队操作中。对于相同优先级的任务,该任务会入队到其他相同优先级任务的尾部:next_ptr->priority > tcb->priority,配合系统的周期性中断,实现相同优先级的任务按时间片轮转调度。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
 * @brief enqueue tcb to list order by priority. Do round-robin when priority is
 * same.
 * @param  *tcb: the tcb to insert
 */
void ros_tcb_enqueue(ROS_TCB *tcb) {
  if (tcb == NULL) return;
  ROS_TCB *prev_ptr, *next_ptr;
  prev_ptr = next_ptr = tcb_ready_list;
  do {
    // Insert when:
    // next == NULL
    // next tcb's priority is lower than than the one we enqueuing
    // same priority task will do round-bobin
    if ((next_ptr == NULL) || (next_ptr->priority > tcb->priority)) {
      // list is empty or insert to head
      if (next_ptr == tcb_ready_list) {
        tcb_ready_list = tcb;
        tcb->next_tcb = next_ptr;  // next_ptr maybe NULL
      } else {                     // insert between tow tcb or tail
        tcb->next_tcb = next_ptr;  // next_ptr maybe NUL
        prev_ptr->next_tcb = tcb;
      }
      break;
    } else {
      prev_ptr = next_ptr;
      next_ptr = next_ptr->next_tcb;
    }
  } while (prev_ptr != NULL);
}

出队操作则只需要检查队首的TCB,因为队列是有序的,所以队首的TCB总是优先级最高的,如果队首不满足条件,那么接下来的TCB更不会满足。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
/**
 * @brief  dequeue a tcb to swap in, requeir its priority no lower than
 * lowest_priority Because the list ordered by priority, we just check the head,
 * if the head is lower than lowest_priority, return NULL. use
 * ros_tcb_dequeue(MIN_TASK_PRIORITY) to dequeue head unconditionally
 * @param lowest_priority: the lowest priority of dequeue tcb or NULL if no such
 * tcb
 */
ROS_TCB *ros_tcb_dequeue(uint8_t lowest_priority) {
  if (tcb_ready_list == NULL || tcb_ready_list->priority > lowest_priority) {
    return NULL;
  } else {
    ROS_TCB *tcb = tcb_ready_list;
    tcb_ready_list = tcb_ready_list->next_tcb;
    if (tcb_ready_list) {
      // make return tcb isolated
      tcb->next_tcb = NULL;
    }
    return tcb;
  }
}

ros_schedule

ros_scheduler函数实现了调度器的主要逻辑:

如果当然在 ISR 中,则直接返回。否则,关闭中断,进入临界区。

如果当前的任务已经结束(run to compeletion)或者被阻塞(任务主动让出 CPU),则无条件调进下一个优先级最高的任务。

否则,从链表中dequeue 一个 new_tcb,如果任务已经终结,则删除任务继续dequeue,直到一个任务符合要求或者为NULL。

如果new_tcb不为 NULL, enqueue当前的任务,调用switch_context。

Scheduler依赖于两个队列操作,操作队列时需要关中断。一个基于优先级的非抢占式调度器基本实现,同时还支持删除 run to complete 的任务。详细的代码和逻辑描述如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
void ros_schedule() {
  // no schedule and context switch util the very end of ISR
  if (ros_int_cnt != 0 || !ROS_STARTED) return;
  CRITICAL_STORE;
  ROS_TCB *new_tcb = NULL;
  CRITICAL_START();
  // if current task is NULL or suspend or terminated, a new task will swap in
  // unconditionally
  if (current_tcb == NULL || current_tcb->status == TASK_BLOCKED ||
      current_tcb->status == TASK_TERMINATED) {
    // task with any priority(0~255) can be swap in
    new_tcb = ros_tcb_dequeue(MIN_TASK_PRIORITY);
    if (new_tcb) {
      // Do not enqueue curren_tcb here, when the task is blocked, it is added
      // to timer_queue, it will enqueue when the ticks due.
      ros_switch_context_shell(current_tcb, new_tcb);
    } else {
      // but you can't block the idle task
      if (current_tcb == &idle_tcb) current_tcb->status = TASK_READY;
    }
  } else {
    // remove terminated task
    do {
      new_tcb = ros_tcb_dequeue(current_tcb->priority);
    } while (new_tcb && new_tcb->status == TASK_TERMINATED);
    if (new_tcb) {
      ros_tcb_enqueue(current_tcb);
      ros_switch_context_shell(current_tcb, new_tcb);
    }
  }
  CRITICAL_END();
}

上下文切换

如 初始化上下文 所述,我们在初始化时为Call-Saved Registers(R1,R2-R17,R28,R29) 预留了空间来保存和恢复他们。上下文切换的代码需要用汇编代码编写,因为需要操作指定的寄存器。方便起见,我直接使用 inline assembly 将汇编代码嵌入到 C 函数中,使用的是Intel风格的汇编,source总是在右边。

上下文切换的步骤可以分为四步,如图 4所示:

  1. 保存当前任务的上下文

  2. 更新当前任务的栈指针(保存系统栈指针到old_tcb->sp中)

  3. 将系统栈指针指向新任务的栈指针(将 new_tcb->sp 赋值给系统栈指针)

  4. 恢复新任务的上下文(因为我们现在已经在新任务的栈上)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
void ros_switch_context(ROS_TCB *old_tcb, ROS_TCB *new_tcb) {
  // The assembly code is in intel style, source is always on the right
  // Y-reg is R28 and R29
  __asm__ __volatile__(
      "push r2\n\t"
      "push r3\n\t"
      "push r4\n\t"
      "push r5\n\t"
      "push r6\n\t"
      "push r7\n\t"
      "push r8\n\t"
      "push r9\n\t"
      "push r10\n\t"
      "push r11\n\t"
      "push r12\n\t"
      "push r13\n\t"
      "push r14\n\t"
      "push r15\n\t"
      "push r16\n\t"
      "push r17\n\t"
      "push r28\n\t"
      "push r29\n\t"
      // r16, r17, r28 and r29 is saved, we're safe to use them
      "mov r28, %A[_old_tcb_]\n\t" // move old tcb(LSB) to Y-regs
      "mov r29, %B[_old_tcb_]\n\t" // MSB
      "sbiw r28, 0\n\t"        // subract 0 from r29:r28, we need this to set SREG-Z if result is zero
      "breq restore\n\t"           // if old_tcb is NULL, jump to restore
      "in r16, %[_SPL_]\n\t"       // get stack pointer to r17:r16
      "in r17, %[_SPH_]\n\t"
      "st Y, r16\n\t"              // set old_tcb->sp to stack pointer
      "std Y+1, r17\n\t"           // because sp is the first member of the TCB struct
      "restore:"
      "mov r28, %A[_new_tcb_]\n\t"
      "mov r29, %B[_new_tcb_]\n\t"
      "ld r16, Y\n\t"              //load new_tcb->sp to r17:r16
      "ldd r17, Y+1\n\t"
      "out %[_SPL_], r16\n\t"      //change the stack pointer to new_tcb->sp
      "out %[_SPH_], r17\n\t"
      "pop r29\n\t"                // restore new_tcb's context
      "pop r28\n\t"
      "pop r17\n\t"
      "pop r16\n\t"
      "pop r15\n\t"
      "pop r14\n\t"
      "pop r13\n\t"
      "pop r12\n\t"
      "pop r11\n\t"
      "pop r10\n\t"
      "pop r9\n\t"
      "pop r8\n\t"
      "pop r7\n\t"
      "pop r6\n\t"
      "pop r5\n\t"
      "pop r4\n\t"
      "pop r3\n\t"
      "pop r2\n\t"
      "ret\n\t"
      "" ::
      [_SPL_] "i" _SFR_IO_ADDR(SPL),
      [_SPH_] "i" _SFR_IO_ADDR(SPH),
      [_old_tcb_] "r"(old_tcb),
      [_new_tcb_] "r" (new_tcb)
  );
}

保存和恢复寄存器的值分别使用 PUSH 和POP 指令。

在更新当前任务的栈指针时,如果是第一个进行任务调度,那么old_tcb会是 NULL,所以我用了sbiw 和 breq 指令进行判断和跳转。使用 sbiw 从 Y 寄存器中减去0,这样Y寄存器中的值不变,但是如果结果为0的话,就说明Y寄存器是NULL,即old_tcb是NULL。那么SREG的Z标记会被置0。而breq指令会在Z标记置零时,进行跳转。

获取和更新系统栈指针( _SFR_IO_ADDR(SPL) 和 _SFR_IO_ADDR(SPH))时,同样的,因为栈指针有16位,所以需要两个寄存器来保存它的值。又因系统栈指针在IO地址空间内,所以要用in和out指令来获取和更新。

st指令用来把数据从寄存器存回内存,ld则是把内存地址上的数据加载到寄存器。因为我们的old_tcb和new_tcb都是指针(即内存地址),所以要用st和ld指令来读写任务的栈指针。

图 4为从 task1 切换到 task2 的栈示意图:

图4 switch context

ros_delay

到此,我们已经完成了scheduler的核心功能,ROS已经具备运行多任务的能力,但是现在还不能运行多个任务。如果一个高优先级的任务存在,那么其他低优先级的任务永远不会运行,这种行为对于非抢占式调度器是正确的。通过定时中断进行任务调度,而因为任务是有优先级的,如图 2ROS的任务四态模型 所示,只有将高优先级的任务设置为不可调度的状态Blocked,那么低优先级的任务才能调入。

阻塞和唤醒任务的思路如下:

ROS 默认 system tick 为每 10 ms 一次,所以可以 delay 的精度也只能精确到 10。ros_delay()的原理就是利用timer产生的周期性中断。

  1. ros_delay()设置 tcb 的状态为TASK_BLOCKED,并调用 scheduler,则会无条件的调入下一个任务;
  2. 而当前的任务则保存在timer_queue中,每次 system tick,在 ISR 中递减需要延迟的 tick 数;
  3. timer_queue中遍历出 tick 等于 0 的任务,更新任务状态为TASK_READY,并重新enqueue

阻塞任务:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// delay current tcb
status_t ros_delay(uint32_t ticks) {
  ROS_TIMER timer;
  ROS_TCB *cur_tcb;
  uint8_t status;
  CRITICAL_STORE;
  cur_tcb = ros_current_tcb();
  if (ticks == 0) {
    status = ROS_ERR_PARAM;
  } else if (cur_tcb == NULL) {
    status = ROS_ERR_CONTEXT;
  } else {
    CRITICAL_START();
    cur_tcb->status = TASK_BLOCKED;
    timer.ticks = ticks;
    timer.blocked_tcb = cur_tcb;
    if (ros_register_timer(&timer) != ROS_OK) {
      status = ROS_ERR_TIMER;
      CRITICAL_END();
    } else {
      status = ROS_OK;
      CRITICAL_END();
      // call scheduler to swap out current task
      ros_schedule();
    }
  }
  return status;
}

唤醒任务:

1
2
3
4
5
6
7
static void wakeup_task(ROS_TCB *tcb) {
  CRITICAL_STORE;
  CRITICAL_START();
  tcb->status = TASK_READY;
  ros_tcb_enqueue(tcb);
  CRITICAL_END();
}

构建

我使用make来构建ROS。对于一个比较大的C项目,如果每次做一点修改,都要去一个个编译,链接,而且编译和链接是有顺序的,每次如此就很繁琐。

使用make,编写Makefile来构建整个项目,只要编写一次Makefile,接下来就可以实现自动构建。Makefile的基本格式如下:

1
2
3
4
# comment
# (note: the <tab> in the command line is necessary for make to work)
target:  dependency1 dependency2 ...
      <tab> command

一个构建的目标分为依赖和命令。下面贴上ROS的Makefile:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
CC=avr-gcc
OBJCOPY=avr-objcopy
SIZE=avr-size
DUDE=$(ARDUINO_DIR)/hardware/tools/avr/bin/avrdude
SIMAVR=simavr

MCU=atmega328p
FCPU=16000000L

ARDUINO_DIR=/home/leer/program/arduino-1.8.9
DUDE_CONF=$(ARDUINO_DIR)/hardware/tools/avr/etc/avrdude.conf
BUILD_DIR=build

CFLAGS=$(INCLUDES) -g -Wall -Werror -Os -mmcu=$(MCU) -DF_CPU=$(FCPU) -DARDUINO=10809 -DARDUINO_AVR_UNO -DARDUINO_ARCH_AVR

TARGET=ros
SOURCE=$(wildcard *.c)
# *.c -> build/*.o
OBJS=$(addprefix $(BUILD_DIR)/,$(SOURCE:.c=.o))

all: $(BUILD_DIR) $(BUILD_DIR)/$(TARGET).hex

$(BUILD_DIR)/%.o: %.c
  $(CC) $(CFLAGS) -c $< -o $@

$(BUILD_DIR):
  mkdir -p $(BUILD_DIR)

# Build elf, depends on *.o
$(BUILD_DIR)/$(TARGET).elf: $(OBJS)
  @echo Building $@...
  $(CC) $(CFLAGS) $(OBJS) -o $@
  $(SIZE) -C --mcu=$(MCU) $@

# Build hex, depends on elf
$(BUILD_DIR)/$(TARGET).hex: $(BUILD_DIR)/$(TARGET).elf
  @echo Building $@...
  $(OBJCOPY) -j .text -j .data -O ihex $< $@

upload:
  @echo Upload ROS to board...
  $(DUDE) -C $(DUDE_CONF) -v -p $(MCU) -c arduino -P /dev/ttyACM0 -b 115200 -D -U flash:w:$(BUILD_DIR)/$(TARGET).hex:i
.PHONY: clean
clean:
  rm -rf build

其中的默认目标 all 将构建 $(BUILD_DIR)/$(TARGET).hex 而其依赖于$(BUILD_DIR)/$(TARGET).elf,而它又依赖于 OBJS,所以最终的构建顺序为:

OBJS -> $(BUILD_DIR)/$(TARGET).elf -> $(BUILD_DIR)/$(TARGET).hex

Upload 将构建好的 hex 文件通过 avrdude 工具上传到 avr开发板上,这里为atmega328p的Arduino Uno。同时也可以自定义 MCU 变量来为不同的开发板构建和上传。

示例

下面的代码是使用ROS来以不同的频率闪烁两个LED的例子:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
/**
 * Blink example in ROS, DO NOT use any function in Arduino.h
 */
#include <avr/io.h>
#include "ros.h"

// include for simavr
// #include "avr_mcu_section.h"
// AVR_MCU(F_CPU, "atmega328p");
ROS_TCB task1;
ROS_TCB task2;
uint8_t task1_stack[ROS_DEFAULT_STACK_SIZE];
uint8_t task2_stack[ROS_DEFAULT_STACK_SIZE];

#define LED1 13
#define LED2 12
#define bitSet(value, bit) ((value) |= (1UL << (bit)))
#define bitClear(value, bit) ((value) &= ~(1UL << (bit)))

#define TASK1_PRIORITY 1
#define TASK2_PRIORITY 0  // max priority

void t1() {
  while (1) {
    // set LED1 high
    bitSet(PORTB, 5);
    ros_delay(200);
    bitClear(PORTB, 5);
    ros_delay(200);
  }
}

void t2() {
  while (1) {
    bitSet(PORTB, 4);
    // delay a second
    ros_delay(100);
    bitClear(PORTB, 4);
    ros_delay(100);
  }
}

void setup() {
  // set LED 13 and LED 12 as output
  bitSet(DDRB, 5);
  bitSet(DDRB, 4);
  bool os_started = ros_init();
  if (os_started) {
    ros_create_task(&task1, t1, TASK1_PRIORITY, task1_stack, ROS_DEFAULT_STACK_SIZE);
    ros_create_task(&task2, t2, TASK2_PRIORITY, task2_stack, ROS_DEFAULT_STACK_SIZE);
    ros_schedule();
  }
}

void loop() {
  // nothing
}

int main() {
  setup();
  // never call loop
  loop();
  return 0;
}

两个任务运行流程如下:

  1. 系统初始化和创建任务之后,Task1,Task2和默认任务都在就绪队列中,最高优先级的Task2被选中运行,将LED2 点亮,然后遇到ros_delay(100),Task2将被阻塞1秒。
  2. Task1得以运行,将LED1点亮,然后调用ros_delay(200),将阻塞两秒。
  3. 两个任务都被阻塞,默认任务得以运行。
  4. 一秒之后,Task2被唤醒,继续运行,将LED2熄灭,接着Task2又将被阻塞1秒。
  5. 两秒之后,这时Task1和Task2都被唤醒,优先级较高的Task2运行,将LED2 点亮,调用ros_delay(100)阻塞。
  6. Task2得以运行,将LED1熄灭,调用ros_delay(200)阻塞。

这两个任务就以这样的次序不断运行下去,形成的效果就是LDE1会每隔2秒点亮和熄灭,而LED2则每间隔1秒点亮和熄灭。

当然这只是一个简单基础的例子,使用ROS提供的API和Semaphore、Queue 等任务通讯工具就可以构建复杂的multitask应用。

和FreeRTOS比较

在两个方面和FreeRTOS进行比较:比较ROS和FreeRTOS的代码量(不包括注释);使用FreeRTOS写出和以上示例相同功能的代码,比较它们二进制文件的大小。

对于内存有限的嵌入式系统来说,二进制文件的大小也是一个重要的考虑因素。如 图表 2 所示。FreeRTOS在编译时不包含semaphores、queues、mutexs,只包含了设计任务调度的代码,并且两者编译时使用相同的编译器参数。ROS和FreeRTOS的二进制文件,在数据存储空间相差不大;在程序存储空间ROS占用3484字节,占总空间的10.6%,而FreeRTOS占用8424字节,占总空间的25.7%。

两者在代码量上的比较,FreeRTOS为4722行,ROS为677行。FreeRTOS的代码量是ROS的7倍,这是因为FreeRTOS有很多的功能和为通用性考虑的代码。ROS显得更加简洁,适合小型项目和个人学习。

ros-compare-with-freertos

总结

ROS拥有一个非抢占式的任务调度器,对于优先级相同的任务,使用 round-robin算法。

在实现系统时,架构往往比写代码更加重要,如果一开始的程序设计和思路是错误的,那么接下来做的都是徒劳。架构是为了理清思路,而不去纠结细节。ROS的架构和FreeRTOS类似:利用 timer 产生周期性中断,在ISR中调用 scheduler,选择优先级最高的任务(对于相同优先级的任务,使用 round-robin),之后进行上下文切换。同时任务可以主动让出 CPU,让其他低优先级的任务切入。

而花在架构和写代码上的时间也印证了这一点,我用了两星期的时间学习和阅读RTOS的实现思路和代码和其他工具的使用,而写代码用了一个星期的时间。

关于汇编:在写汇编代码的时候,需要时刻保持谨慎。因为写错了真的很难 debug。

关于测试:单元测试在很好用,单独测试每个模块,可以确保这个模块是没有问题的,那么出现错误的时候,我就不会怀疑这个模块,当然也不是绝对的。

在实现ROS的过程中,书上的理论和实际的实现还是有差距的,也发现自己知识的不足(主要是inline assembly 和上下文切换)。也学到了很多关于 AVR 处理器的知识,比如:程序是如何在栈上运行的和上下文切换的具体实现。即使 ROS没有达到生产环境使用的级别,ROS 也让我更加了解实时操作系统的底层实现。

参考文献

[1]Barry, R. FreeRTOS. https://www.freertos.org.

[2]Lawson, K. AtomThreads. http://atomthreads.com

[3](美)DaleWheat著,翁恺译.Arduino技术内幕[M].人民邮电出版社,2013:332.

[4] [美]Michael Margolis.Arduino权威指南(第2版)[M].人民邮电出版社,2015:607.

[5]费翔林,骆斌.操作系统教程(第5版)[M].高等教育出版社,2014:431.

[6]朱迪. FreeRTOS实时操作系统任务调度优化的研究与实现[D].南京邮电大学,2015.

[7]AVR Assembly Instruction

[8]Avr-gcc wiki