本文是为那些希望非常深层次的理解RCU的骨灰级黑客准备的。这些黑客应当首先阅读《深入理解RCU》系列文章的第1~6篇。骨灰级代码狂也可能有兴趣直接看看本文。

本文分别描述如下内容:

1、数据结构和内核参数

2、外部函数接口

3、初始化过程

4、CPU热插拨接口

5、一些杂项函数

6、优雅周期检测机制

7、dynticks-idle接口

8、处理离线及dynticks-idle CPU的函数

9、报告CPU卡顿的函数

10、报告可能的设计缺陷和问题。

1. 数据结构及内核参数

全面的理解分级RCU数据结构,对于理解其算法而言是十分重要的。具体包含如下数据结构:

1、用来跟踪每一个CPU的dyntick-idle 状态的数据结构

2、per-node数据结构的每一个字段

3、每CPU rcu_data 结构

4、全局rcu_state 数据结构

5、控制分级RCU的内核参数。

1.1. 跟踪 dyn-tick 状态的数据结构

每CPUrcu_dynticks数据结构使用下面的字段跟踪dynticks 状态:

1. dynticks_nesting:这个整型值是嵌套计数,表示在相应的CPU中,应当被监控的RCU读端临界区的数量。如果CPU处于dynticks-idle模式,那么就不应当存在RCU读端临界区。此时这个值是中断嵌套级别,否则它比irq中断嵌套级别更大。

2. dynticks:如果相应的CPU处于dynticks-idle模式,并且没有中断处理函数正在该CPU上运行,则这个计数值是偶数,否则是奇数。换句话说,如果计数值是奇数,那么相应的CPU可能处于RCU读端临界区中。

3. dynticks_nmi:如果相应的CPU 处于NMI处理函数中,则这个整型计数器的值是奇数。否则,该计数器是偶数。

rcu和rcu_bh共享这个值。

1.2. 分级RCU实现中的节点

rcu_node是被放到 rcu_state结构内的。每一个rcu_node有下面的字段:

1. lock:这个spinlock保护这个结构中的不是常量的字段。这个锁在软中断上下文中获取,因此必须禁止中断。

根rcu_node的lock字段还有另外的作用:

ü 串行化CPU卡顿检测,这样仅仅只会有一个CPU报告CPU卡顿事件。这在上千个CPU的系统中是很重要的!

ü 串行化启动一个新的优雅周期,这样多个CPU不会同时开始相冲突的优雅周期。

ü 在代码需要限制在单个优雅周期中运行时,防止开始一个新的优雅周期。

ü 将状态机中强制产生静止状态的行为串行化,这样可以将重新调度IPI降低到适当的数目。

2. qsmask:这个位图掩码跟踪哪些CPU(rcu_node 叶子节点)或者CPU组(rcu_node 非叶子节点)仍然需要经历一个静止状态,以结束当前优雅周期。

3. qsmaskinit:这个位图掩码跟踪哪些CPU (rcu_node 叶子节点)或者CPU组(rcu_node 非叶子节点)仍然需要经历一个静止状态,以结束后续的优雅周期。CPU热插拔代码维护qsmaskinit 字段,在开始每一个优雅周期时,将它们复制到相应的qsmask字段。这个复制操作,是优雅周期初始化过程需要与CPU热插拨代码互斥的原因之一。

4. grpmask:这个位图掩码中的位,与这个rcu_node结构在父rcu_node结构中qsmask和qsmaskinit中的位置是一致的。使用这个字段简化了静止状态处理,这是 Manfred Spraul建议的。

5. grplo:这个字段表示这个rcu_node包含的编号最小的CPU或者组。

6. grphi:这个字段表示这个rcu_node包含的编号最大的CPU或者组。

7. grpnum:这个字段包含与这个rcu_node对应的父 rcu_node结构的qsmask和qsmaskinit字段的位编号。换句话说,给定一个rcu_node结构指针rnp,总有1UL<< rnp->grpnum == rnp->grpmask。grpnum字段用于调试目的。

8. level:对根rcu_node结构来说,这个字段是0。根的子结点是1,依此类推。

9. parent:这个字段指向父rcu_node结构,对根结点来说,其值为NULL。

1.3. 每CPU数据

rcu_data 数据结构包含RCU的每CPU状态。它包含管理优雅周期和静止状态(completed, gpnum, passed_quiesc_completed, passed_quiesc,qs_pending, beenonline, mynode, 和 grpmask)的控制变量。rcu_data数据结构也包含关于RCU回调的控制变量 (nxtlist, nxttail, qlen,和 blimit)。打开dynticks的内核在rcu_data数据结构中也有相关的控制变量(dynticks, dynticks_snap, 和dynticks_nmi_snap)。rcu_data数据结构包含用于跟踪的事件计数器 (dynticks_fqs, offline_fqs, resched_ipi)。最后,还有一对字段,对调用rcu_pending()进行计数,以确定何时强制进行静止状态(n_rcu_pending),以及一个cpu字段标识哪一个CPU与该rcu_data结构对应。

每一个字段描述如下:

1. completed:这个字段包含本CPU已经完成的优雅周期编号。

2. gpnum:这个字段包含本CPU最近启动的优雅周期编号。

3. passed_quiesc_completed:本字段包含本CPU最近经过静止状态时,已经完成的优雅周期编号。最近完成的编号,要从该CPU经历静止状态的角度来看:如果CPU没有观察到优雅周期42完成,它将记录其值为41。这是对的,因为优雅周期能够完成的唯一方法就是这个CPU已经经历过一次静止状态。这个字段被初始化为一个虚构的次数,以避免在boot和CPU上线时产生竞争条件。

4. passed_quiesc:这个字段表示自从存储在passed_quiesc_completed的优雅周期完成以来,本CPU是否经历过一次静止状态。

5. qs_pending:这个字段表示本CPU已经注意到RCU核心机制正在等待它经历一次静止状态。当CPU检测到一个新的优雅周期,或者一个CPU上线时,将这个字段设置为1。

6. beenonline:这个字段初始化为0,当相应的CPU上线时被设置为1,这用于避免处理从没有上线的CPU,当NR_CPUS大大超过实际的CPU数量时,这是有用的。

7. mynode:这个字段指向处理相应CPU的rcu_node叶子节点。

8. grpmask:这个掩码字段只有一个位,表示mynode->qsmask 中哪一位与相应的CPU对应。

9. nxtlist:这个字段指向本CPU最老的RCU回调(rcu_head 结构),如果本CPU最近没有这样的回调,就设置为NULL。其他的回调可能通过它们的next指针链接在一起。

10. nxttail:这是一个二次间接指针数组,每个元素指向nxtlist 回调链表尾部。如果nxtlist是空的,那么所有nxttail 指针直接指向 nxtlist 字段。每一个nxttail数组节点有如下意义:

ü RCU_DONE_TAIL=0:这个元素是CPU在它经历优雅周期时最后调用的回调函数的->next 字段。如果没有这样的回调函数,则它指向nxtlist 字段。

ü RCU_WAIT_TAIL=1:等待当前优雅周期结束时,最后一个回调函数的->next指针,如果没有这样的回调函数,就等于RCU_DONE_TAIL 元素。

ü RCU_NEXT_READY_TAIL=2:等待下一个优雅周期时的回调函数的next字段,如果没有这样的回调函数,则等于RCU_WAIT_TAIL 元素。

ü RCU_NEXT_TAIL=3:链表中的最后一个回调函数的next指针,如果链表为空,就是nxtlist字段。

11. qlen:在nxtlist链表中排队的回调函数数量。

12. blimit:在某一个时刻可以调用的回调函数最大值。在高负载情况下,这个限制增强了系统响应性能。

13. dynticks:与cpu对应的rcu_dynticks结构。

14. dynticks_snap:dynticks->dynticks曾经经历过的值,用于中断处理函数中检查CPU何时经历过一次dynticks idle状态。

15. dynticks_nmi_snap:dynticks->dynticks_nmi曾经经过的值,在CPU NMI处理函数中检查CPU何时经历过一次dynticks idle状态。

16. dynticks_fqs:其他CPU由于dynticksidle而标记一次静止状态的次数。

17. offline_fqs:其他CPU由于离线而标记一次静止状态的次数。

18. resched_ipi:向相应CPU发送的重新调度IPI次数。这些IPI被发向那些没有及时报告自己经历过静止状态的CPU,但既不向离线CPU,也不向处于dynticksidle 状态的CPU发送这样的IPI。

19. n_rcu_pending:调用rcu_pending()的次数,在一个非dynticks-idle 的CPU上,每一个jiffy将调用一次这个函数。

20. n_rcu_pending_force_qs:n_rcu_pending上限。如果 n_rcu_pending达到此值,表示当前优雅周期延迟太久,则调用force_quiescent_state()。本字段在2.6.32版本中已经取消,但是不影响我们理解RCU代码。

1.4. RCU全局状态

rcu_state 结构包含每一个RCU实例(rcu 和 rcu_bh)的全局状态。包括与分级rcu_node相关的字段,包括结点数组本身,分级数组包含指向分级级别的指针,levelcnt数组包含每一级结点计数,levelspread数组包含每一个分级的子结点数量。rda 数组是每一个CPU的rcu_data结构指针。rcu_state也包含一定数量的,与当前优雅周期相关的字段,以及与其他机制交互的字段(signaled、gpnum、completed、onofflock、fqslock、jiffies_force_qs、n_force_qs、n_force_qs_lh、n_force_qs_ngp、gp_start、jiffies_stall、and dynticks_completed)。

每一个字段描述如下:

1. node:这个字段是rcu_node结构数组,根节点位于->node[0]。其长度由 NUM_RCU_NODES C-预处理宏指定,根据 NR_CPUS 和 CONFIG_RCU_FANOUT计算确定。注意,从元素0开始遍历->node 数组可以达到宽度优先搜索rcu_node分级树的效果。

2. level:指向node数组的指针数组。分级树的根节点由->level[0]引用,第二级节点的第一个节点(如果有的话)由->level[1]引用,依此类推。第一个叶子节点由 ->level[NUM_RCU_LVLS-1]引用,level 数组的长度由 NUM_RCU_LVLS指定。->level字段通常与->node 字段一起使用,用于分级扫描rcu_node,例如,扫描所有叶子节点。->level由启动时的rcu_init_one()函数填充。

3. levelcnt:这是一个数组,包含每一层rcu_node 结构的数量,包括引用rcu_node 叶子结构的rcu_data数据结构的数量,因此这个数组的元素比->level数组多。注意 ->levelcnt[0]总是包含值1,表示分级结构的根rcu_node 只有一个。这个数组的值被初始化为NUM_RCU_LVL_0,NUM_RCU_LVL_1,NUM_RCU_LVL_2和 NUM_RCU_LVL_3。->levelcnt 字段用于初始化分级结构的其他部分,并用于调试目的。

4. levelspread:这个字段的每一个元素包含rcu_node分级结构每一层期望的子节点数量。这个数组的值由两个rcu_init_levelspread()函数中的其中一个在运行时计算,具体选择哪一个函数由CONFIG_RCU_FANOUT_EXACT 内核参数确定。

5. rda:该字段的每一个元素包含相应CPU的rcu_data指针。这个数组在启动时由 RCU_DATA_PTR_INIT()宏初始化。

6. signaled:这个字段用于维护force_quiescent_state() 函数所使用的状态。这个字段可以有如下值:

ü RCU_GP_INIT:这个值表示当前优雅周期仍然在初始化过程中。因此force_quiescent_state()不应采取任何动作。当然,在与该函数产生竞争前,优雅周期初始化有三个jiffies的时间避免产生竞争,如果你有大量的CPU,这个竞争才可能会真实的发生。一旦完成优雅周期初始化,这个值要么设置成 RCU_SAVE_DYNTICK (如果配置了 CONFIG_NO_HZ) 要么设置成 RCU_FORCE_QS。

ü RCU_SAVE_DYNTICK:这个值表示force_quiescent_state()应当检查所有还没有为当前优雅周期报告静止状态的CPU的dynticks状态。这些CPU以dyntick-idle 模式表示其静止状态。

ü RCU_FORCE_QS:这个值表示force_quiescent_state() 应当与在线、离线状态一起,重新检查还没有报告静止状态的CPU的dynticks 状态。重新检测dynticks状态可以处理这样一种情况:一个特定CPU可能处于dynticks-idle 状态,但是在检测时,它正处于中断和NMI处理函数中。此时,将会发送一个重新调度IPI给这些CPU。

这个字段由根rcu_node的锁保护。

7. gpnum:当前优雅周期的值,如果当前没有优雅周期,则是上一个优雅周期的值。该字段由根rcu_node结构的锁保护。但是频繁的在没有这个锁保护的情况下进行访问(但是不修改)。

8. completed:上一次已经完成的优雅周期的值。当前没有优雅周期正在处理时,它等于->gpnum。如果当前有优雅周期在处理,则比->gpnum少1。在某些LINUX版本的经典RCU中,这一对字段可能是一个布尔变量。这个字段由根rcu_node 结构的锁进行保护。但是频繁的在没有这个锁保护的情况下访问(但是不修改)。

9. onofflock:防止在优雅周期初始化时,并发的处理上线、离线。但是有一个例外:如果rcu_node分级结构仅仅由一个单一结构组成,那么由单个结构的锁来代替这个锁。

10. fqslock:这个字段用于在force_quiescent_state()中,防止多个任务同时调用此函数强制产生静止状态。

11. jiffies_force_qs:这是一个 以jiffies计算的时间, 当需要调用force_quiescent_state()以强制CPU进入静止状态,或者报告一个静止状态时使用。这个字段由根rcu_node结构的锁保护。但是频繁的在没有这个锁保护的情况下访问(但是不修改)。

12. n_force_qs:调用force_quiescent_state() ,并真正进行强制产生静止状态的次数。如果相应的CPU已经完成静止周期,而导致force_quiescent_state()过早退出,是不统计在这个字段中的。这个字段用于跟踪和调试,以 ->fqslock锁进行保护。

13. n_force_qs_lh:由于->fqslock 被其他CPU获得而导致force_quiescent_state() 过早返回的次数的近似值。这个字段用于跟踪和调试,由于它是近似值,因此没有用任何锁进行保护。

14. n_force_qs_ngp:force_quiescent_state() 成功获得->fqslock 锁,但是随后发现没有正在处理的优雅周期,然后该函数的退出次数。用于调试和跟踪,由->fqslock进行保护。

15. gp_start:记录最近的优雅周期开始时间,以jiffies计数。这用于检测卡顿的CPU,但是仅仅在CONFIG_RCU_CPU_STALL_DETECTOR 内核参数选中时才有效。这个字段由根rcu_node的->lock锁进行保护,但是有时不使用锁访问它(但是不修改它)。

16. jiffies_stall:这个时间值以 jiffies计算,表示当前优雅周期什么时候会变得太长,此时将开始检查CPU卡顿。与->gp_start一样,仅仅在配置了 CONFIG_RCU_CPU_STALL_DETECTOR 内核参数时,这个字段才存在。这个字段由根rcu_node保护,但是有时不使用这个锁而直接访问(但不修改)。

17. dynticks_completed:当force_quiescent_state()对dyntick 进行快照时,这个字段记录->completed 的值。在每一个优雅周期开始时,这个字段被初始化为前一个优雅周期。这个字段用于防止前一个优雅周期的dyntick-idle 静止状态应用到当前优雅周期。同样的,这个字段仅仅在配置了CONFIG_NO_HZ内核参数时才存在。这个字段由根rcu_node的锁进行保护,但有时也在没有锁保护的情况下进行访问(但不修改)。

1.5. 内核参数

以下内核参数将影响RCU:

l NR_CPUS,系统中最大的CPU数量。

l CONFIG_RCU_FANOUT,在rcu_node分级体系中,期望的每一个节点的子节点数量。

l CONFIG_RCU_FANOUT_EXACT,一个布尔值,防止rcu_node分组体系进行平衡操作。

l CONFIG_HOTPLUG_CPU,允许 CPU动态上线、离线。

l CONFIG_NO_HZ,表示支持 dynticks-idle 模式。

l CONFIG_SMP,表示是否多核CPU。

l CONFIG_RCU_CPU_STALL_DETECTOR,表示 RCU 将在优雅周期太长时检查CPU卡顿。

l CONFIG_RCU_TRACE,表示 RCU 将在debugfs中提供跟踪信息。

1 #define MAX_RCU_LVLS 3

2 #define RCU_FANOUT (CONFIG_RCU_FANOUT)

3 #define RCU_FANOUT_SQ (RCU_FANOUT * RCU_FANOUT)

4 #define RCU_FANOUT_CUBE (RCU_FANOUT_SQ * RCU_FANOUT)

5

6 #if NR_CPUS <= RCU_FANOUT

7 # define NUM_RCU_LVLS 1

8 # define NUM_RCU_LVL_0 1

9 # define NUM_RCU_LVL_1 (NR_CPUS)

10 # define NUM_RCU_LVL_2 0

11 # define NUM_RCU_LVL_3 0

12 #elif NR_CPUS <= RCU_FANOUT_SQ

13 # define NUM_RCU_LVLS 2

14 # define NUM_RCU_LVL_0 1

15 # define NUM_RCU_LVL_1 (((NR_CPUS) + RCU_FANOUT – 1) / RCU_FANOUT)

16 # define NUM_RCU_LVL_2 (NR_CPUS)

17 # define NUM_RCU_LVL_3 0

18 #elif NR_CPUS <= RCU_FANOUT_CUBE

19 # define NUM_RCU_LVLS 3

20 # define NUM_RCU_LVL_0 1

21 # define NUM_RCU_LVL_1 (((NR_CPUS) + RCU_FANOUT_SQ – 1) / RCU_FANOUT_SQ)

22 # define NUM_RCU_LVL_2 (((NR_CPUS) + (RCU_FANOUT) – 1) / (RCU_FANOUT))

23 # define NUM_RCU_LVL_3 NR_CPUS

24 #else

25 # error “CONFIG_RCU_FANOUT insufficient for NR_CPUS”

26 #endif /* #if (NR_CPUS) <= RCU_FANOUT */

27

28 #define RCU_SUM (NUM_RCU_LVL_0 + NUM_RCU_LVL_1 + NUM_RCU_LVL_2 + NUM_RCU_LVL_3)

29 #define NUM_RCU_NODES (RCU_SUM – NR_CPUS)

RCU的宏

CONFIG_RCU_FANOUT和 NR_CPUS 参数用于在编译时确定rcu_node分级体系的形态。第1行定义rcu_node分级体系的最大深度,最大为3级。注意增加最大深度需要修改其他地方,如,添加另外一个路径到第6-26行的#if语句中。第2-4行计算fanout,fanout的平方,fanout的立方。

然后将这些值与NR_CPUS 进行比较,以确定rcu_node需要的深度,将其赋给NUM_RCU_LVLS,用于rcu_state结构的数组长度。在根一层,总是只有一个节点,并且总有NUM_CPUS个rcu_data结构属于叶子节点。如果仅仅只比根层多一层,则叶子层的节点数是用RCU_FANOUT除NR_CPUS(向上对齐)。其他层的节点使用类似的方法进行计算,但是使用RCU_FANOUT_SQ代替 RCU_FANOUT。

随后第28行计算所有层的总和,结果是rcu_node结构的数量加上rcu_data的数量。最后,第29行从总和中减去 NR_CPUS (是rcu_data 结构的数量) ,结果是rcu_node结构的数量,将其值保存在NUM_RCU_NODES。这个值用于rcu_state结构的->nodes数组的长度。

2. 外部接口

RCU的外部接口不仅仅包含标准的RCU API,也包含RCU向其他内核模块提供的内部接口。这些接口是rcu_read_lock()、rcu_read_unlock()、 rcu_read_lock_bh()、rcu_read_unlock_bh()、call_rcu() (是对__call_rcu()的封装)、call_rcu_bh() (ditto)、rcu_check_callbacks()、rcu_process_callbacks()(是对 __rcu_process_callbacks()的封装)、rcu_pending() (是对__rcu_pending()的封装)、rcu_needs_cpu()、rcu_cpu_notify()和__rcu_init()。注意synchronize_rcu()和 rcu_barrier()通常用于所有RCU 实现,并且以call_rcu()的形式定义。类似的, rcu_barrier_bh()通常用于所有RCU实现,并且以call_rcu_bh()的形式定义。

2.1. 读端临界区

1 void __rcu_read_lock(void)

2 {

3 preempt_disable();

4 __acquire(RCU);

5 rcu_read_acquire();

6 }

7

8 void __rcu_read_unlock(void)

9 {

10 rcu_read_release();

11 __release(RCU);

12 preempt_enable();

13 }

14

15 void __rcu_read_lock_bh(void)

16 {

17 local_bh_disable();

18 __acquire(RCU_BH);

19 rcu_read_acquire();

20 }

21

22 void __rcu_read_unlock_bh(void)

23 {

24 rcu_read_release();

25 __release(RCU_BH);

26 local_bh_enable();

27 }

RCU读端临界区

上图显示了RCU读端临界区的函数。请读者注意:这些函数与源代码略有差异。第1-6 行显示了__rcu_read_lock(),它开始一个“rcu”读端临界区。第3行禁止抢占,第4行是一个弱的标记,表示开始一个RCU读端临界区,第5行更新lockdep状态。第8-13行显示了__rcu_read_unlock(),它是与__rcu_read_lock()相对的函数。第15-20显示了 __rcu_read_lock_bh(),第22-27行显示了__rcu_read_unlock_bh(),它们与前两个函数类似,但是它们禁止、打开下半部分而不是而不是禁止打开抢占。

2.2. call_rcu()

1 static void

2 __call_rcu(struct rcu_head *head,

3 void (*func)(struct rcu_head *rcu),

4 struct rcu_state *rsp)

5 {

6 unsigned long flags;

7 struct rcu_data *rdp;

8

9 head->func = func;

10 head->next = NULL;

11 smp_mb();

12 local_irq_save(flags);

13 rdp = rsp->rda[smp_processor_id()];

14 rcu_process_gp_end(rsp, rdp);

15 check_for_new_grace_period(rsp, rdp);

16 *rdp->nxttail[RCU_NEXT_TAIL] = head;

17 rdp->nxttail[RCU_NEXT_TAIL] = &head->next;

18 if (ACCESS_ONCE(rsp->completed) ==

19 ACCESS_ONCE(rsp->gpnum)) {

20 unsigned long nestflag;

21 struct rcu_node *rnp_root = rcu_get_root(rsp);

22

23 spin_lock_irqsave(&rnp_root->lock, nestflag);

24 rcu_start_gp(rsp, nestflag);

25 }

26 if (unlikely(++rdp->qlen > qhimark)) {

27 rdp->blimit = LONG_MAX;

28 force_quiescent_state(rsp, 0);

29 } else if ((long)(ACCESS_ONCE(rsp->jiffies_force_qs) –

30 jiffies) < 0 ||

31 (rdp->n_rcu_pending_force_qs –

32 rdp->n_rcu_pending) < 0)

33 force_quiescent_state(rsp, 1);

34 local_irq_restore(flags);

35 }

36

37 void call_rcu(struct rcu_head *head,

38 void (*func)(struct rcu_head *rcu))

39 {

40 __call_rcu(head, func, &rcu_state);

41 }

42

43 void call_rcu_bh(struct rcu_head *head,

44 void (*func)(struct rcu_head *rcu))

45 {

46 __call_rcu(head, func, &rcu_bh_state);

47 }

call_rcu()代码

上图显示了__call_rcu(),call_rcu()和call_rcu_bh()函数的代码。注意 call_rcu()和call_rcu_bh()是对__call_rcu()的简单封装,因此这里并不过多考虑它们。

将注意力转到__call_rcu(),第9-10行初始化指定的rcu_head。这里假设读者知道call_rcu的作用,并且明白rcu_head的含义。第11 行确保更新RCU保护数据结构的操作,在注册回调函数之前完成,这里也假设读者smp_mb的含义,并且明白这里“完成”一词的“真正”含义,这里需要强调一点,所有认为smp_mb会刷新cache,或者认为smp_mb会阻止随后的指令执行,这两种潜意识都是不正确的。第12、34行禁止并重新打开中断,以防止在一个中断处理函数中调用__call_rcu()而产生灾难性的冲突。第13行得到当前CPU的rcu_data数据结构的引用,第14行调用rcu_process_gp_end(),其目的是在当前优雅周期已经结束时,将回调函数提前执行。如果新的优雅周期已经开始,则第15行调用check_for_new_grace_period()记录状态。

第16、17行将新的回调函数加入队列。第18、19行检查是否有一个优雅周期正在处理中,如果没有,则第23行获得根rcu_node结构的锁,并在第24行调用rcu_start_gp() 开始一个新的优雅周期(也释放锁)。

第 26行检查是否有太多的RCU回调在本CPU中等待,如果是这样,第27行递增->blimit,目的是提高处理回调函数的速度。第28行立即调用force_quiescent_state() ,其目的是试图使相应CPU尽快经历一次静止状态。否则,第29-32行检查自优雅周期开始以来(或者自从上一次调用force_quiescent_state()以来),是否已经经过太长时间,如果是这样,第33行调用非紧急的force_quiescent_state(),也是为了使相应的CPU经历一次静止状态。

2.3. rcu_check_callbacks()

1 static int __rcu_pending(struct rcu_state *rsp,

2 struct rcu_data *rdp)

3 {

4 rdp->n_rcu_pending++;

5

6 check_cpu_stall(rsp, rdp);

7 if (rdp->qs_pending)

8 return 1;

9 if (cpu_has_callbacks_ready_to_invoke(rdp))

10 return 1;

11 if (cpu_needs_another_gp(rsp, rdp))

12 return 1;

13 if (ACCESS_ONCE(rsp->completed) != rdp->completed)

14 return 1;

15 if (ACCESS_ONCE(rsp->gpnum) != rdp->gpnum)

16 return 1;

17 if (ACCESS_ONCE(rsp->completed) !=

18 ACCESS_ONCE(rsp->gpnum) &&

19 ((long)(ACCESS_ONCE(rsp->jiffies_force_qs) –

20 jiffies) < 0 ||

21 (rdp->n_rcu_pending_force_qs –

22 rdp->n_rcu_pending) < 0))

23 return 1;

24 return 0;

25 }

26

27 int rcu_pending(int cpu)

28 {

29 return __rcu_pending(&rcu_state,

30 &per_cpu(rcu_data, cpu)) ||

31 __rcu_pending(&rcu_bh_state,

32 &per_cpu(rcu_bh_data, cpu));

33 }

34

35 void rcu_check_callbacks(int cpu, int user)

36 {

37 if (user ||

38 (idle_cpu(cpu) && !in_softirq() &&

39 hardirq_count() <= (1 << HARDIRQ_SHIFT))) {

40 rcu_qsctr_inc(cpu);

41 rcu_bh_qsctr_inc(cpu);

42 } else if (!in_softirq()) {

43 rcu_bh_qsctr_inc(cpu);

44 }

45 raise_softirq(RCU_SOFTIRQ);

46 }

rcu_check_callbacks()代码

上图显示的代码,在每一个CPU的每jiffy中,都会在调度中断处理函数中调用。rcu_pending() 函数(是__rcu_pending()的一个封装函数)被调用,如果它返回非0,那么调用rcu_check_callbacks()。 (注意有想法将 rcu_pending()合并到rcu_check_callbacks())

从__rcu_pending()开始,第4行记录本调用次数,用于确定何时强制产生静止状态。第6行调用check_cpu_stall(),目的是报告在内核中自旋的CPU,那些CPU也许是存在硬件问题。如果配置了CONFIG_RCU_CPU_STALL_DETECTOR,第7-23执行一系列的检查,如果RCU需要当前CPU做一些事情,就返回非0值。第7行检查当前CPU是否还缺少了一次静止状态,则在第9行调用cpu_has_callbacks_ready_to_invoke()检查当前CPU是否有回调需要处理,并准备调用它们。这些回调是在已经结束的优雅周期中产生的,现在准备调用它们了。第11 行调用cpu_needs_another_gp() 检查当前CPU是否有回调需要在另外的RCU优雅周期结束。第13行检查当优前雅周期是否已经结束,第15行检查一个新优雅周期是否已经开始,最后,第17-22行检查是否应当强制其他CPU经历一次静止状态。最后进行如下一些检查: (1) 第17-18行检查是否一个优雅周期正在处理,如果是,则第19-22行检查是否经历了足够的jiffies时间(第 19-20行) 或者调用rcu_pending()次数足够多(第21-22),以确定是否应当调用 force_quiescent_state()。如果这一系列的检查都没有触发,则第 24 行返回0,表示不必调用rcu_check_callbacks()。

第27-33行展示了rcu_pending(),它简单的调用__rcu_pending() 两次,一次是为“rcu”,另一次是为“rcu_bh”。

第35-48行展示了rcu_check_callbacks(),它检查调度中断是否中止了一个扩展静止状态,然后开始RCU软中断处理 (rcu_process_callbacks())。第37-41行为“rcu” 执行这个检查,而第42-43行为“rcu_bh”执行这个检查。

第37-39行检查调度时钟中断是否打断了用户态执行 (第37行),或者打断了idle循环 (第38行的idle_cpu()),并且没有影响中断层次(第38行的剩余部分及整个第39行)。如果这个检查成功,则调度时钟中断来自于扩展静止状态,由于任何rcu的静止状态也是rcu_bh的静止状态,那么第40、41行报告两种静止状态。

类似于“rcu_bh”,第 42 行检查调度时钟中断是否来自于打开软中断的代码区,如果是,则第43行报告“rcu_bh”静止状态。

其他情况下,第45行触发一个RCU 软中断,将导致在将来的某个时刻,本CPU上的 rcu_process_callbacks()将被调用。

2.4. rcu_process_callbacks()

1 static void

2__rcu_process_callbacks(struct rcu_state *rsp,

3struct rcu_data *rdp)

4 {

5 unsigned long flags;

6

7 if((long)(ACCESS_ONCE(rsp->jiffies_force_qs) –

8 jiffies)< 0 ||

9 (rdp->n_rcu_pending_force_qs-

10 rdp->n_rcu_pending) < 0)

11 force_quiescent_state(rsp,1);

12 rcu_process_gp_end(rsp, rdp);

13 rcu_check_quiescent_state(rsp, rdp);

14 if (cpu_needs_another_gp(rsp, rdp)) {

15 spin_lock_irqsave(&rcu_get_root(rsp)->lock,flags);

16 rcu_start_gp(rsp,flags);

17 }

18 rcu_do_batch(rdp);

19 }

20

21static void

22rcu_process_callbacks(struct softirq_action *unused)

23 {

24 smp_mb();

25 __rcu_process_callbacks(&rcu_state,

26 &__get_cpu_var(rcu_data));

27 __rcu_process_callbacks(&rcu_bh_state,

28 &__get_cpu_var(rcu_bh_data));

29 smp_mb();

30 }

rcu_process_callbacks()代码

上图展示了rcu_process_callbacks()的代码,它是对__rcu_process_callbacks()函数的封装。这些函数是调用raise_softirq(RCU_SOFTIRQ)函数的结果。例如,当有某种原因使得RCU核心认为:需要当前CPU做某些事情的时候,就会触发此函数。

第7-10行检查自当前优雅周期启动以来,是否已经经历了一段时间。如果是这样,第11行调用 force_quiescent_state(),其目的是试图使相应的CPU经历一次静止状态。

在任何情况下,第 12行调用 rcu_process_gp_end(),它检查其他CPU是否结束了本CPU关注的优雅周期。如果是,标记优雅周期结束并且加快调用本CPU的RCU回调。第13行调用rcu_check_quiescent_state(),该函数检查其他CPU是否启动了一个新的优雅周期,并检查当前CPU是否已经为这个优雅周期经历了一次静止状态。如果是这样,就更新相应的状态。第14行检查是否没有正在处理的优雅周期,并且检查是否有另外的优雅周期所需要的回调函数,如果是这样,第15行获得根rcu_node的锁,第17行调用rcu_start_gp(),开始一个新的优雅周期(并且也释放根rcu_node 的锁)。其他情况下,第18行调用 rcu_do_batch(),它调用本CPU的回调,这些回调对应的优雅周期已经完成。

第21-30行是 rcu_process_callbacks(),也是一个对__rcu_process_callbacks()的封装函数。第 24行执行一个内存屏障,以确保前面的RCU读端临界区在随后的RCU处理过程之前完成。第25-26行和第27-28行分别为“rcu”和“rcu_bh”调用__rcu_process_callbacks(),最后,第29行执行一个内存屏障,以确保任何 __rcu_process_callbacks()执行的操作,看起来都早于随后的RCU读端临界区。

2.5. rcu_needs_cpu() 和 rcu_cpu_notify()

1 intrcu_needs_cpu(int cpu)

2 {

3 return per_cpu(rcu_data, cpu).nxtlist ||

4 per_cpu(rcu_bh_data, cpu).nxtlist;

5 }

6

7static int __cpuinit

8rcu_cpu_notify(struct notifier_block *self,

9unsigned long action, void *hcpu)

10 {

11 long cpu = (long)hcpu;

12

13 switch (action) {

14 caseCPU_UP_PREPARE:

15 caseCPU_UP_PREPARE_FROZEN:

16 rcu_online_cpu(cpu);

17 break;

18 caseCPU_DEAD:

19 caseCPU_DEAD_FROZEN:

20 caseCPU_UP_CANCELED:

21 caseCPU_UP_CANCELED_FROZEN:

22 rcu_offline_cpu(cpu);

23 break;

24 default:

25 break;

26 }

27 return NOTIFY_OK;

28 }

rcu_needs_cpu()和rcu_cpu_notify代码

上图展示了rcu_needs_cpu()和rcu_cpu_notify()的代码,它们被LINUX内核调用,以检查 dynticks-idle 模式转换并处理CPU热插拨。

第1-5行显示了rcu_needs_cpu()的代码,它简单的检查特定的CPU是否有“rcu”(第3行)或者“rcu_bh”(第4行)回调。

第7-28显示了rcu_cpu_notify(),它是一个非常典型的、使用switch语句的CPU热插拨通知函数。如果特定的CPU上线,则在第16行调用rcu_online_cpu(),如果特定CPU离线,则在第22行调用rcu_offline_cpu。请特别注意:CPU热插拨操作不是原子的,因此可能穿越多个优雅周期。因此必须小心的处理CPU热插拨事件。

….

(由于篇幅关系,后面的无法贴下,关心原文的,请直接联系谢宝友老师)

from:https://cloud.tencent.com/developer/article/1518203