RT-Thread的位图调度算法分析(最新版)

RT-Thread的内核调度算法

rt-thread的调度算法为基于优先级调度和基于时间片轮转调度共存的策略。rt-thread内核中存在多个线程优先级,并且支持多个线程具有同样的线程优先级。线程级别数目在rtconfig.h中以宏定义的方式配置,

当系统存在多个线程时,可能的情况是,某些线程具有不同的线程优先级,但是还有一些线程具有相同的优先级。rt-thread采用的调度策略是:

  • 不同优先级的线程,采用可抢占的方式:就绪的高优先级的线程会“立刻”抢占低优先级的线程;

  • 同线程优先级别的多个线程则采用时间片轮转,同级线程依次获得CPU时间

 

在上面的情形中,摆在rt-thread面前的问题是,如何从多个线程优先级别中找出当前优先级最高的那个线程,并调度执行。

rt-thread的内核调度算法采用位图(bitmap)实现,算法时间复杂度为O(1)(注,O(1)定义,请参考数据结构相关【书籍】,即每次调度的时间恒定:无论当前的系统中存在多少个线程,多少个优先级,rt-thread的调度函数总可以在恒定的时间内选择出最高优先级的线程执行。

线程结构存储

寻找当前线程优先级最高的线程并调度执行,首先需要解决线程数据结构的存储问题。下面先来分析rt-thread中如何存储多个线程的数据结构。

先做几点说明:

  1. 每个线程的信息用线程控制块(Thread Control-Block,缩写为TCB)表示,它是定义在rtdef.h中的struct结构体,用来描述一个线程所有必要信息;

  2. 线程的优先级别用非负整数(即无符号整数)表示。数值越小,优先级越高;

  3. 系统的线程优先级的数目固定,最多支持256级;

  4. 系统中的线程数目不做任何限制,线程的数目仅受限于系统RAM的大小。

读者不妨思考最后两点,当系统存在多个的线程时,如何存储线程控制块才能满足要求?

  • 线程的优先级别数目固定。使用数组存储TCB,数组的长度即为线程优先级的数目,数组的每个元素为一个指向TCB数据结构的指针。

  • 线程数目不受限制。那当某个线程优先级上存在多个线程时,这些TCB显然没办法存储在上面定义的数组对应的优先级位置上,那么使用链表,链表是一种数据结构,每个元素彼此链接,TCB中有一个链接下一个TCB的“链表数据结构”,如同一个钩子一样。

这样就可以达到上面提及的两点设计要求,不同线程优先级的线程的TCB分别存在线程TCB数组对应优先级的位置上。对于相同优先级别的多个线程,我们只需要将该优先级的第一个就绪线程的TCB存储在线程TCB数组中相关位置,后续同级线程通过链表依次连接。

scheduler.c

...
(1) rt_list_t rt_thread_priority_table[RT_THREAD_PRIORITY_MAX];
(2) struct rt_thread *rt_current_thread;

(3) rt_uint8_t rt_current_priority;

#if RT_THREAD_PRIORITY_MAX > 32
/* maximun priority level, 256 */
(4) rt_uint32_t rt_thread_ready_priority_group;
(5) rt_uint8_t rt_thread_ready_table[32];
#else
/* maximun priority level, 32 */
(6)rt_uint32_t rt_thread_ready_priority_group;
#endif

假定RT_THREAD_PRIORITY_MAX这个宏为256

  • 语句(1)即定义了线程TCB数组。该数组存储rt_list_t类型的元素,实际上这就是个链表。

  • 语句(2)中定义了一个指针,从名称上来看,即当前线程,struct rt_thread就是线程TCB数据结构类型。

  • 语句(3)定了当前的线程优先级

  • 语句(4)、(5)为位图调度算法的必要数据结构,下文详细展开

rt-thread中的线程数据结构的存储问题已经解决,接下来分析位图调度算法实现。

位图调度算法

调度算法首先要找出所有线程优先级中优先级最高的那个线程优先级。系统中某些线程优先级上可能不存在线程。也就说,rt_thread_priority_table数组中某些元素为空,因此要找出该数组中第一个非空的元素。

调度算法1

for(i=0; i<256; i++)
{
    if(rt_thread_priority_table[i] != NULL)
        break;
}
highest_ready_priority = i;

上面策略可以工作,但是它的问题是运行时间并不固定,如果当前系统中具有最高优先级的线程对应的优先级的数字为0级,循环一次就可以找出,如果很不幸,从0级到254级上都没有就绪的线程,仅在255级上有就绪的线程,这个调度函数不得不在检查了数组这256个元素之后,才能找出可以运行的线程。

这个算法虽然直接简单,但是太低效,而且运行时间也不稳定,作为嵌入式实时操作系统这是不可接受的。我们需要寻找一种具有恒定执行时间的调度算法 。

首先来考虑,每一个优先级上是否存在线程,这是一个是/否问题,要么存在线程,要么不存在,这可以用一个bit位来表示。我们规定这个bit为1表示存在线程,为0表示不存在线程。

对于256级的线程,则共需要256个bit位。理想的情况是,创建一个具有256个bit的变量,操作系统使用这个变量来维护整个系统所有对应优先级上是否存在活动的线程。显然,C语言不支持:-(,但是256个bit也就是32个字节,定义一个32字节长的数组,然后将它看作整体。

现在需要约定,这32个字节和256个线程优先级的对应关系。一个字节的最高位为bit7,最低位为bit0,用bit0表示更高的优先级,用BIT7表示稍低的优先级。

来考虑这32个字节中的第一个字节。第一个字节的bit0用来表示优先级0,bit7表示优先级7。第二个字节bit0表示优先级8,bit7表示优先级15。其他依次类推。如下表格描述了这32个字节的各个bit是和系统的256个优先级的对应关系。

       bit7 6   5   4   3   2    1  0
byte0 |007|006|005|004|003|002|001|000|
byte1 |0l5|014|013|012|011|010|009|008|
.................................
byte32|255|254|253|252|251|250|249|248|

每行对应一个字节,每一列为各个bit位,单元格中的内容表示对应的优先级。

上面这32个字节所组成的256个bit,他们的排列方式很像一张图(map),所以这种方式就别称为位图(bit map)。这张图就是scheduler.c中定义的32个字节的数组:

rt_uint8_t rt_thread_ready_table[32];

举个例子,我们创建了一个线程,并且指定了它的优先级是125,然后将它设置为就绪(READY),实际上在我们在调用函数将它变为READY的函数中,RTT就会去上面这256个bit中(也即是这32个字节),找到第125个bit,我称之为位图的BIT125, 也即是字节15的第5个bit,将这个bit置1。 即位图的BIT125,就是rt_thread_ready_table[125/8]的BIT5.我们可以用位代码表示为

BITMPA.BIT_125 = rt_thread_ready_table[125/8].BIT5

优先级125 对应那个字节的哪个bit呢?

这里有个换算关系。其计算公式 :

优先级别除以8的商取整数即对应位图中的字节
优先级别除以8的余数就是对应位图字节中的bit位

优先级125,125/8=15,125%8=5,位图的BIT125就是rt_thread_ready_table[15]的BIT5

为了叙述的方便,做如下说明:

位图,就指的是数组rt_uint8_t rt_thread_ready_table[32]这32个字节组成的256个bit。

内核需要根据各个线程的状态实时的更新这个位图。当优先级为125的不再存在就绪的线程时,操作系统就需要将位图的BIT125清0,当一个线程状态为READY后,则需要将这个线程的优先级在位图中对应的BIT位置1。

寻找优先级最高的线程的问题,就变成从位图中找出第一个为1的bit的位置。比如说,内核中存在三个线程A、B、C, 优先级分别为5、25、125。即位图中BIT5,BIT25,BIT125分别为1,其余bit位全部为0。调度程序得能找出非零的当前优先级最高的BIT位。也就是BIT5,对应的优先级为5。

下面是一种显然的调度思路,即依次遍历数组rt_thread_priority_table,找出第一个非0的bit,这就是当前存在就绪线程的最高优先级。根据指针取出当前线程TCB,进而调度执行。

调度算法2

for(i=0; i<32; i++)
{
    for(j=0; j<8; j++)
    {
        if (rt_thread_priority_table[i] & (1<<j) ) 
            break;//这就是找到最低的那个为1的bit位置了。
    }
    //下面就是我们找到的最高优先级
    highest_ready_priority = i * 8 + j;
}

该调度算法双层for循环可能只循环一次,也可能会循环256次,这取决于位图中位图中为1的最低BIT的位置。如果BIT0为1,则执行一次即跳出循环,如果BIT0-BIT254都是0,仅BIT255为1,则循环256次。 平均来说, 双层for循环的次数大约是 255/2 次。即与优先级数目N成正比。

每次调度函数执行的时间不恒定,取决于当前线程的优先级分布状况。这种调度策略从整体上说执行的时间是O(n)的,即调度算法的平均执行时间跟优先级数目成正比。这种方式本质上跟调度算法1一样,依然不能实现在恒定时间完成调度的目标。

RT-Thread的调度算法

将位图看作一个变量,并假定当前优先级别为8,则位图变量可以用一个字节表示。考虑位图变量的取值范围,当位图所有BIT0全为0时,位图变量的值就是0,当位图所有BIT位都是1时(表示所有线程优先级上都存在就绪的线程,此时最高优先级为0级),则位图变量的值是255。反过来,如果当位图变量为1时,此时位图的BIT0为1,即最高优先级为优先级0,同样,位图变量为255时,最高优先级依然是0。 当位图变量为6时,BIT2=1,BIT1=1,即最高优先级为1。因此当位图变量取0-255之间的任意一个数字时,它的最低为1的BIT位置都是预知的。可以预先将这位图变量的所有取值所对应的最高优先级计算出来,并存成一张表格,然后就可以避免算法2中的for循环,而只需要查表即可,执行时间自然是恒定的。查表法就是一种常用的用空间换取时间的方法。

位图取值 最低为1的bit位

0x01 0 (第0个bit位为1)
0x02 1 (第1个bit位为1)
0x03 0 (第0个bit位为1)
....
0xff 0 (第0个bit为1)

注意0x0比较特殊,全部bit位都是0,返回0但不表示其第0位为1。只是为了数组整齐所以填充0。

可以写个简单的程序来生成位图首BIT表,我写了个python程序,

gettab.py

#coding=gbk
#打印一个字节的最低bit位,可能的值为0,1,2,3,4,5,6,7
samples = 256

def getlowbit(byte):
    c = 0
    for i in range(0,8):
        if(byte & 0x01):
            return c
        c = c+1
        byte = byte >> 1
    return 0

line =""
for i in range(0,samples):
    print "%d," %getlowbit(i),
    if((i+1)%16 == 0):
        print "\n

就可以得到如下的表了:

const rt_uint8_t rt_lowest_bitmap[] =
{
    /* 00 */ 0, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0,
    /* 10 */ 4, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0,
    /* 20 */ 5, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0,
    /* 30 */ 4, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0,
    /* 40 */ 6, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0,
    /* 50 */ 4, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0,
    /* 60 */ 5, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0,
    /* 70 */ 4, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0,
    /* 80 */ 7, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0,
    /* 90 */ 4, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0,
    /* A0 */ 5, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0,
    /* B0 */ 4, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0,
    /* C0 */ 6, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0,
    /* D0 */ 4, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0,
    /* E0 */ 5, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0,
    /* F0 */ 4, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0
};

当进程优先级为8时,直接查表就得到最高优先级了。

当系统存在32个优先级时,如果采用类似方案直接制作表格的话,表格的元素个数将是2**32=4G字节,显然这是不可接受的。32个优先级,即4个字节,正好可以用一个uint32_t变量存储。查找首个非0位,可以将其分拆为4个字节,以此查询上表。

假定u32 rt_thread_priority_bitmap维护着当前系统优先级位图。

调度算法3-32级优先级查找最高优先级算法

//kservice.c
int __rt_ffs(int value)
{
    if (value == 0) return 0;

    if (value & 0xff)
        return __lowest_bit_bitmap[value & 0xff] + 1;

    if (value & 0xff00)
        return __lowest_bit_bitmap[(value & 0xff00) >> 8] + 9;

    if (value & 0xff0000)
        return __lowest_bit_bitmap[(value & 0xff0000) >> 16] + 17;

    return __lowest_bit_bitmap[(value & 0xff000000) >> 24] + 25;
}

这就解决了32个系统优先级时的调度问题,现在来考虑线程优先级为256的情况。读者可能觉得这没什么不同,256个bit=32个字节,依然采用算法3的思路,对着32个字节依次查表。问题是,当位图变量有32个字节时,依次查表耗费的时间就不可以忽略了,为了提升系统实时调度的性能,需要对算法3进行改进。

为了解决这个问题,RT-Thread引入了二级位图。

即256个bit由32个字节存储,每一个字节的8个bit代表着位图变量中的8个优先级,如果某个字节非0,则表示其中必有非0的bit位。

rtt中对应的数组为rt_uint8_t rt_thread_ready_table[32]

所谓二级位图,即先确定32个字节中最低的非0的字节。为了实现这个效果,现对这32个字节引入一个32个bit的位图变量,每一个bit位表示对应的字节是否为0。例如,这个32bit的位图变量的BIT5为0,表示系统线程优先级256bit所分成的32个字节中的第五个字节非0。为了区分,称这个32个bit的位图变量为字节位图变量,这就是rt-thread中使用的是rt_thread_ready_priority_group.

这样查找系统系统最高优先级时,先确定非0的最低字节,这实际上依然是算法3,然后再对该字节进行查表,即得到该字节内最低为1的bit位,然后两者叠加(注意不是简单的加)即可。

根据上面的分析,要想使用这个二级位图算法,rt-thread在跟踪线程的状态转换时,不仅需要维护256bit的位图变量数组rt_thread_ready_table[thread->number] |= thread->high_mask,还需要维护32bit的字节位图变量 rt_thread_ready_priority_group。参看如下代码。

// thread.c
rt_err_t rt_thread_startup(rt_thread_t thread)
{
    ...
    /* set current priority to init priority */
    thread->current_priority = thread->init_priority;

(1) thread->number      = thread->current_priority >> 3; /* 5bit */
(2) thread->number_mask = 1L << thread->number;
(3) thread->high_mask   = 1L << (thread->current_priority & 0x07); /* 3bit */
    ...
}

void rt_schedule_insert_thread(struct rt_thread *thread) 
{
    ...
#if RT_THREAD_PRIORITY_MAX > 32
(4) rt_thread_ready_table[thread->number] |= thread->high_mask;
#endif
(5) rt_thread_ready_priority_group |= thread->number_mask;
    ....
}

初始化线程时,指定了线程的优先级别thread->init_priority,由于线程优先级为0到255,一个字节就可以表示。但是bitmap是32个字节。为了调高效率,最好能快速向位图的对应的bit写1。

  • 语句(1)thread->current_priority >> 3,等价于除以8,移位效率效率更高。

  • 上面除法的余数,就表示这个优先级在上面字节中的第几个bit。这个余数可以使用 (thread->current_priority & 0x07)来表示。

  • 语句(3)是得到该bit对应的权值。例如一个字节的bit7对应的权值即 (1<<7),这样做是为了使用“位与,或,非”等位运算,可以提高运行速度,即语句(4)。

  • 语句(4)表示了这几个变量作用。可见,根据某个表示优先级的数字向位图中相应的bit位写入了1。

  • 那么语句(2)和(5)是做什么用的呢? 这个number_mask实际上是为了加快查找位图的速度而创建的。它将在rt_schedule函数中发挥作用。

thread->number表示当前线程优先级在32个字节的位图数组中的字节位置。为了提高效率,rt-thread另外使用了一个u32类型的变量rt_thread_ready_priority_group来加快速度。如果这32个bit中某一个bit为1,就表示对应的某个字节非0(想想看,这意味着该字节所表示的8个优先级中存在就绪线程)。

rt_thread_ready_priority_group变量为32位宽度,长度上等于4个字节,因此可以对每一个字节查表(上面生成的表格)就可以得到为1的最低的bit位置。

概括起来就是,rt-thread首先确定32个字节的位图中,非0的最低的那个字节,然后再查表得到这个字节非0的最低那个bit。这两步骤正好可以利用两次上面的表格rt_lowest_bitmap

下面是rt_schedule的核心逻辑,非必要的代码被我隐去。读者可以对比下面的代码理解思路

// scheduler.c
void rt_schedule(void)
{
    ....
    register rt_ubase_t highest_ready_priority;

#if RT_THREAD_PRIORITY_MAX == 8
    highest_ready_priority = rt_lowest_bitmap[rt_thread_ready_priority_group];
#else
#if RT_THREAD_PRIORITY_MAX <= 32
        highest_ready_priority = __rt_ffs(rt_thread_ready_priority_group) - 1;
#else
        register rt_ubase_t number;

        number = __rt_ffs(rt_thread_ready_priority_group) - 1;
        highest_ready_priority = (number << 3) + __rt_ffs(rt_thread_ready_table[number]) - 1;
#endif
    ....
}

// kservice.c
int __rt_ffs(int value)
{
    if (value == 0) return 0;

    if (value & 0xff)
        return __lowest_bit_bitmap[value & 0xff] + 1;

    if (value & 0xff00)
        return __lowest_bit_bitmap[(value & 0xff00) >> 8] + 9;

    if (value & 0xff0000)
        return __lowest_bit_bitmap[(value & 0xff0000) >> 16] + 17;

    return __lowest_bit_bitmap[(value & 0xff000000) >> 24] + 25;
}

one more thing

可以看出位图调度算法的核心就是查找字节最低非0 bit位的查表法软件实现,是整个位图调度算法的核心。ARM公司提供专门的指令获取寄存器最低位,只要几条汇编语句就可以完成同样的功能,而且性能更好。

rt-thread作为一款成熟商用的RTOS内核,也支持使用CPU指令实现查找字节最低非0位,这部分代码在libcpu/arm//cpuport.c中,以cortex-m3的为例,代码如下

// libcpu/arm/cortex-m3/cpuport.c

__asm int __rt_ffs(int value)
{
    CMP     r0, #0x00
    BEQ     exit

    RBIT    r0, r0
    CLZ     r0, r0
    ADDS    r0, r0, #0x01

exit
    BX      lr
}

通过编译器提供的内联汇编功能,在C语言程序中直接使用汇编指令实现原本软件查表实现的功能,代码更少,性能更好。