本文是为那些希望非常深层次的理解RCU的骨灰级黑客准备的。这些黑客应当首先阅读《深入理解RCU》系列文章的第1~6篇。骨灰级代码狂也可能有兴趣直接看看本文。
本文分别描述如下内容:
1、数据结构和内核参数
2、外部函数接口
3、初始化过程
4、CPU热插拨接口
5、一些杂项函数
6、优雅周期检测机制
7、dynticks-idle接口
8、处理离线及dynticks-idle CPU的函数
9、报告CPU卡顿的函数
10、报告可能的设计缺陷和问题。
1. 数据结构及内核参数
全面的理解分级RCU数据结构,对于理解其算法而言是十分重要的。具体包含如下数据结构:
1、用来跟踪每一个CPU的dyntick-idle 状态的数据结构
2、per-node数据结构的每一个字段
3、每CPU rcu_data 结构
4、全局rcu_state 数据结构
5、控制分级RCU的内核参数。
1.1. 跟踪 dyn-tick 状态的数据结构
每CPUrcu_dynticks数据结构使用下面的字段跟踪dynticks 状态:
1. dynticks_nesting:这个整型值是嵌套计数,表示在相应的CPU中,应当被监控的RCU读端临界区的数量。如果CPU处于dynticks-idle模式,那么就不应当存在RCU读端临界区。此时这个值是中断嵌套级别,否则它比irq中断嵌套级别更大。
2. dynticks:如果相应的CPU处于dynticks-idle模式,并且没有中断处理函数正在该CPU上运行,则这个计数值是偶数,否则是奇数。换句话说,如果计数值是奇数,那么相应的CPU可能处于RCU读端临界区中。
3. dynticks_nmi:如果相应的CPU 处于NMI处理函数中,则这个整型计数器的值是奇数。否则,该计数器是偶数。
rcu和rcu_bh共享这个值。
1.2. 分级RCU实现中的节点
rcu_node是被放到 rcu_state结构内的。每一个rcu_node有下面的字段:
1. lock:这个spinlock保护这个结构中的不是常量的字段。这个锁在软中断上下文中获取,因此必须禁止中断。
根rcu_node的lock字段还有另外的作用:
ü 串行化CPU卡顿检测,这样仅仅只会有一个CPU报告CPU卡顿事件。这在上千个CPU的系统中是很重要的!
ü 串行化启动一个新的优雅周期,这样多个CPU不会同时开始相冲突的优雅周期。
ü 在代码需要限制在单个优雅周期中运行时,防止开始一个新的优雅周期。
ü 将状态机中强制产生静止状态的行为串行化,这样可以将重新调度IPI降低到适当的数目。
2. qsmask:这个位图掩码跟踪哪些CPU(rcu_node 叶子节点)或者CPU组(rcu_node 非叶子节点)仍然需要经历一个静止状态,以结束当前优雅周期。
3. qsmaskinit:这个位图掩码跟踪哪些CPU (rcu_node 叶子节点)或者CPU组(rcu_node 非叶子节点)仍然需要经历一个静止状态,以结束后续的优雅周期。CPU热插拔代码维护qsmaskinit 字段,在开始每一个优雅周期时,将它们复制到相应的qsmask字段。这个复制操作,是优雅周期初始化过程需要与CPU热插拨代码互斥的原因之一。
4. grpmask:这个位图掩码中的位,与这个rcu_node结构在父rcu_node结构中qsmask和qsmaskinit中的位置是一致的。使用这个字段简化了静止状态处理,这是 Manfred Spraul建议的。
5. grplo:这个字段表示这个rcu_node包含的编号最小的CPU或者组。
6. grphi:这个字段表示这个rcu_node包含的编号最大的CPU或者组。
7. grpnum:这个字段包含与这个rcu_node对应的父 rcu_node结构的qsmask和qsmaskinit字段的位编号。换句话说,给定一个rcu_node结构指针rnp,总有1UL<< rnp->grpnum == rnp->grpmask。grpnum字段用于调试目的。
8. level:对根rcu_node结构来说,这个字段是0。根的子结点是1,依此类推。
9. parent:这个字段指向父rcu_node结构,对根结点来说,其值为NULL。
1.3. 每CPU数据
rcu_data 数据结构包含RCU的每CPU状态。它包含管理优雅周期和静止状态(completed, gpnum, passed_quiesc_completed, passed_quiesc,qs_pending, beenonline, mynode, 和 grpmask)的控制变量。rcu_data数据结构也包含关于RCU回调的控制变量 (nxtlist, nxttail, qlen,和 blimit)。打开dynticks的内核在rcu_data数据结构中也有相关的控制变量(dynticks, dynticks_snap, 和dynticks_nmi_snap)。rcu_data数据结构包含用于跟踪的事件计数器 (dynticks_fqs, offline_fqs, resched_ipi)。最后,还有一对字段,对调用rcu_pending()进行计数,以确定何时强制进行静止状态(n_rcu_pending),以及一个cpu字段标识哪一个CPU与该rcu_data结构对应。
每一个字段描述如下:
1. completed:这个字段包含本CPU已经完成的优雅周期编号。
2. gpnum:这个字段包含本CPU最近启动的优雅周期编号。
3. passed_quiesc_completed:本字段包含本CPU最近经过静止状态时,已经完成的优雅周期编号。最近完成的编号,要从该CPU经历静止状态的角度来看:如果CPU没有观察到优雅周期42完成,它将记录其值为41。这是对的,因为优雅周期能够完成的唯一方法就是这个CPU已经经历过一次静止状态。这个字段被初始化为一个虚构的次数,以避免在boot和CPU上线时产生竞争条件。
4. passed_quiesc:这个字段表示自从存储在passed_quiesc_completed的优雅周期完成以来,本CPU是否经历过一次静止状态。
5. qs_pending:这个字段表示本CPU已经注意到RCU核心机制正在等待它经历一次静止状态。当CPU检测到一个新的优雅周期,或者一个CPU上线时,将这个字段设置为1。
6. beenonline:这个字段初始化为0,当相应的CPU上线时被设置为1,这用于避免处理从没有上线的CPU,当NR_CPUS大大超过实际的CPU数量时,这是有用的。
7. mynode:这个字段指向处理相应CPU的rcu_node叶子节点。
8. grpmask:这个掩码字段只有一个位,表示mynode->qsmask 中哪一位与相应的CPU对应。
9. nxtlist:这个字段指向本CPU最老的RCU回调(rcu_head 结构),如果本CPU最近没有这样的回调,就设置为NULL。其他的回调可能通过它们的next指针链接在一起。
10. nxttail:这是一个二次间接指针数组,每个元素指向nxtlist 回调链表尾部。如果nxtlist是空的,那么所有nxttail 指针直接指向 nxtlist 字段。每一个nxttail数组节点有如下意义:
ü RCU_DONE_TAIL=0:这个元素是CPU在它经历优雅周期时最后调用的回调函数的->next 字段。如果没有这样的回调函数,则它指向nxtlist 字段。
ü RCU_WAIT_TAIL=1:等待当前优雅周期结束时,最后一个回调函数的->next指针,如果没有这样的回调函数,就等于RCU_DONE_TAIL 元素。
ü RCU_NEXT_READY_TAIL=2:等待下一个优雅周期时的回调函数的next字段,如果没有这样的回调函数,则等于RCU_WAIT_TAIL 元素。
ü RCU_NEXT_TAIL=3:链表中的最后一个回调函数的next指针,如果链表为空,就是nxtlist字段。
11. qlen:在nxtlist链表中排队的回调函数数量。
12. blimit:在某一个时刻可以调用的回调函数最大值。在高负载情况下,这个限制增强了系统响应性能。
13. dynticks:与cpu对应的rcu_dynticks结构。
14. dynticks_snap:dynticks->dynticks曾经经历过的值,用于中断处理函数中检查CPU何时经历过一次dynticks idle状态。
15. dynticks_nmi_snap:dynticks->dynticks_nmi曾经经过的值,在CPU NMI处理函数中检查CPU何时经历过一次dynticks idle状态。
16. dynticks_fqs:其他CPU由于dynticksidle而标记一次静止状态的次数。
17. offline_fqs:其他CPU由于离线而标记一次静止状态的次数。
18. resched_ipi:向相应CPU发送的重新调度IPI次数。这些IPI被发向那些没有及时报告自己经历过静止状态的CPU,但既不向离线CPU,也不向处于dynticksidle 状态的CPU发送这样的IPI。
19. n_rcu_pending:调用rcu_pending()的次数,在一个非dynticks-idle 的CPU上,每一个jiffy将调用一次这个函数。
20. n_rcu_pending_force_qs:n_rcu_pending上限。如果 n_rcu_pending达到此值,表示当前优雅周期延迟太久,则调用force_quiescent_state()。本字段在2.6.32版本中已经取消,但是不影响我们理解RCU代码。
1.4. RCU全局状态
rcu_state 结构包含每一个RCU实例(rcu 和 rcu_bh)的全局状态。包括与分级rcu_node相关的字段,包括结点数组本身,分级数组包含指向分级级别的指针,levelcnt数组包含每一级结点计数,levelspread数组包含每一个分级的子结点数量。rda 数组是每一个CPU的rcu_data结构指针。rcu_state也包含一定数量的,与当前优雅周期相关的字段,以及与其他机制交互的字段(signaled、gpnum、completed、onofflock、fqslock、jiffies_force_qs、n_force_qs、n_force_qs_lh、n_force_qs_ngp、gp_start、jiffies_stall、and dynticks_completed)。
每一个字段描述如下:
1. node:这个字段是rcu_node结构数组,根节点位于->node[0]。其长度由 NUM_RCU_NODES C-预处理宏指定,根据 NR_CPUS 和 CONFIG_RCU_FANOUT计算确定。注意,从元素0开始遍历->node 数组可以达到宽度优先搜索rcu_node分级树的效果。
2. level:指向node数组的指针数组。分级树的根节点由->level[0]引用,第二级节点的第一个节点(如果有的话)由->level[1]引用,依此类推。第一个叶子节点由 ->level[NUM_RCU_LVLS-1]引用,level 数组的长度由 NUM_RCU_LVLS指定。->level字段通常与->node 字段一起使用,用于分级扫描rcu_node,例如,扫描所有叶子节点。->level由启动时的rcu_init_one()函数填充。
3. levelcnt:这是一个数组,包含每一层rcu_node 结构的数量,包括引用rcu_node 叶子结构的rcu_data数据结构的数量,因此这个数组的元素比->level数组多。注意 ->levelcnt[0]总是包含值1,表示分级结构的根rcu_node 只有一个。这个数组的值被初始化为NUM_RCU_LVL_0,NUM_RCU_LVL_1,NUM_RCU_LVL_2和 NUM_RCU_LVL_3。->levelcnt 字段用于初始化分级结构的其他部分,并用于调试目的。
4. levelspread:这个字段的每一个元素包含rcu_node分级结构每一层期望的子节点数量。这个数组的值由两个rcu_init_levelspread()函数中的其中一个在运行时计算,具体选择哪一个函数由CONFIG_RCU_FANOUT_EXACT 内核参数确定。
5. rda:该字段的每一个元素包含相应CPU的rcu_data指针。这个数组在启动时由 RCU_DATA_PTR_INIT()宏初始化。
6. signaled:这个字段用于维护force_quiescent_state() 函数所使用的状态。这个字段可以有如下值:
ü RCU_GP_INIT:这个值表示当前优雅周期仍然在初始化过程中。因此force_quiescent_state()不应采取任何动作。当然,在与该函数产生竞争前,优雅周期初始化有三个jiffies的时间避免产生竞争,如果你有大量的CPU,这个竞争才可能会真实的发生。一旦完成优雅周期初始化,这个值要么设置成 RCU_SAVE_DYNTICK (如果配置了 CONFIG_NO_HZ) 要么设置成 RCU_FORCE_QS。
ü RCU_SAVE_DYNTICK:这个值表示force_quiescent_state()应当检查所有还没有为当前优雅周期报告静止状态的CPU的dynticks状态。这些CPU以dyntick-idle 模式表示其静止状态。
ü RCU_FORCE_QS:这个值表示force_quiescent_state() 应当与在线、离线状态一起,重新检查还没有报告静止状态的CPU的dynticks 状态。重新检测dynticks状态可以处理这样一种情况:一个特定CPU可能处于dynticks-idle 状态,但是在检测时,它正处于中断和NMI处理函数中。此时,将会发送一个重新调度IPI给这些CPU。
这个字段由根rcu_node的锁保护。
7. gpnum:当前优雅周期的值,如果当前没有优雅周期,则是上一个优雅周期的值。该字段由根rcu_node结构的锁保护。但是频繁的在没有这个锁保护的情况下进行访问(但是不修改)。
8. completed:上一次已经完成的优雅周期的值。当前没有优雅周期正在处理时,它等于->gpnum。如果当前有优雅周期在处理,则比->gpnum少1。在某些LINUX版本的经典RCU中,这一对字段可能是一个布尔变量。这个字段由根rcu_node 结构的锁进行保护。但是频繁的在没有这个锁保护的情况下访问(但是不修改)。
9. onofflock:防止在优雅周期初始化时,并发的处理上线、离线。但是有一个例外:如果rcu_node分级结构仅仅由一个单一结构组成,那么由单个结构的锁来代替这个锁。
10. fqslock:这个字段用于在force_quiescent_state()中,防止多个任务同时调用此函数强制产生静止状态。
11. jiffies_force_qs:这是一个 以jiffies计算的时间, 当需要调用force_quiescent_state()以强制CPU进入静止状态,或者报告一个静止状态时使用。这个字段由根rcu_node结构的锁保护。但是频繁的在没有这个锁保护的情况下访问(但是不修改)。
12. n_force_qs:调用force_quiescent_state() ,并真正进行强制产生静止状态的次数。如果相应的CPU已经完成静止周期,而导致force_quiescent_state()过早退出,是不统计在这个字段中的。这个字段用于跟踪和调试,以 ->fqslock锁进行保护。
13. n_force_qs_lh:由于->fqslock 被其他CPU获得而导致force_quiescent_state() 过早返回的次数的近似值。这个字段用于跟踪和调试,由于它是近似值,因此没有用任何锁进行保护。
14. n_force_qs_ngp:force_quiescent_state() 成功获得->fqslock 锁,但是随后发现没有正在处理的优雅周期,然后该函数的退出次数。用于调试和跟踪,由->fqslock进行保护。
15. gp_start:记录最近的优雅周期开始时间,以jiffies计数。这用于检测卡顿的CPU,但是仅仅在CONFIG_RCU_CPU_STALL_DETECTOR 内核参数选中时才有效。这个字段由根rcu_node的->lock锁进行保护,但是有时不使用锁访问它(但是不修改它)。
16. jiffies_stall:这个时间值以 jiffies计算,表示当前优雅周期什么时候会变得太长,此时将开始检查CPU卡顿。与->gp_start一样,仅仅在配置了 CONFIG_RCU_CPU_STALL_DETECTOR 内核参数时,这个字段才存在。这个字段由根rcu_node保护,但是有时不使用这个锁而直接访问(但不修改)。
17. dynticks_completed:当force_quiescent_state()对dyntick 进行快照时,这个字段记录->completed 的值。在每一个优雅周期开始时,这个字段被初始化为前一个优雅周期。这个字段用于防止前一个优雅周期的dyntick-idle 静止状态应用到当前优雅周期。同样的,这个字段仅仅在配置了CONFIG_NO_HZ内核参数时才存在。这个字段由根rcu_node的锁进行保护,但有时也在没有锁保护的情况下进行访问(但不修改)。
1.5. 内核参数
以下内核参数将影响RCU:
l NR_CPUS,系统中最大的CPU数量。
l CONFIG_RCU_FANOUT,在rcu_node分级体系中,期望的每一个节点的子节点数量。
l CONFIG_RCU_FANOUT_EXACT,一个布尔值,防止rcu_node分组体系进行平衡操作。
l CONFIG_HOTPLUG_CPU,允许 CPU动态上线、离线。
l CONFIG_NO_HZ,表示支持 dynticks-idle 模式。
l CONFIG_SMP,表示是否多核CPU。
l CONFIG_RCU_CPU_STALL_DETECTOR,表示 RCU 将在优雅周期太长时检查CPU卡顿。
l CONFIG_RCU_TRACE,表示 RCU 将在debugfs中提供跟踪信息。
1 #define MAX_RCU_LVLS 3
2 #define RCU_FANOUT (CONFIG_RCU_FANOUT)
3 #define RCU_FANOUT_SQ (RCU_FANOUT * RCU_FANOUT)
4 #define RCU_FANOUT_CUBE (RCU_FANOUT_SQ * RCU_FANOUT)
5
6 #if NR_CPUS <= RCU_FANOUT
7 # define NUM_RCU_LVLS 1
8 # define NUM_RCU_LVL_0 1
9 # define NUM_RCU_LVL_1 (NR_CPUS)
10 # define NUM_RCU_LVL_2 0
11 # define NUM_RCU_LVL_3 0
12 #elif NR_CPUS <= RCU_FANOUT_SQ
13 # define NUM_RCU_LVLS 2
14 # define NUM_RCU_LVL_0 1
15 # define NUM_RCU_LVL_1 (((NR_CPUS) + RCU_FANOUT – 1) / RCU_FANOUT)
16 # define NUM_RCU_LVL_2 (NR_CPUS)
17 # define NUM_RCU_LVL_3 0
18 #elif NR_CPUS <= RCU_FANOUT_CUBE
19 # define NUM_RCU_LVLS 3
20 # define NUM_RCU_LVL_0 1
21 # define NUM_RCU_LVL_1 (((NR_CPUS) + RCU_FANOUT_SQ – 1) / RCU_FANOUT_SQ)
22 # define NUM_RCU_LVL_2 (((NR_CPUS) + (RCU_FANOUT) – 1) / (RCU_FANOUT))
23 # define NUM_RCU_LVL_3 NR_CPUS
24 #else
25 # error “CONFIG_RCU_FANOUT insufficient for NR_CPUS”
26 #endif /* #if (NR_CPUS) <= RCU_FANOUT */
27
28 #define RCU_SUM (NUM_RCU_LVL_0 + NUM_RCU_LVL_1 + NUM_RCU_LVL_2 + NUM_RCU_LVL_3)
29 #define NUM_RCU_NODES (RCU_SUM – NR_CPUS)
RCU的宏 |
---|
CONFIG_RCU_FANOUT和 NR_CPUS 参数用于在编译时确定rcu_node分级体系的形态。第1行定义rcu_node分级体系的最大深度,最大为3级。注意增加最大深度需要修改其他地方,如,添加另外一个路径到第6-26行的#if语句中。第2-4行计算fanout,fanout的平方,fanout的立方。
然后将这些值与NR_CPUS 进行比较,以确定rcu_node需要的深度,将其赋给NUM_RCU_LVLS,用于rcu_state结构的数组长度。在根一层,总是只有一个节点,并且总有NUM_CPUS个rcu_data结构属于叶子节点。如果仅仅只比根层多一层,则叶子层的节点数是用RCU_FANOUT除NR_CPUS(向上对齐)。其他层的节点使用类似的方法进行计算,但是使用RCU_FANOUT_SQ代替 RCU_FANOUT。
随后第28行计算所有层的总和,结果是rcu_node结构的数量加上rcu_data的数量。最后,第29行从总和中减去 NR_CPUS (是rcu_data 结构的数量) ,结果是rcu_node结构的数量,将其值保存在NUM_RCU_NODES。这个值用于rcu_state结构的->nodes数组的长度。
2. 外部接口
RCU的外部接口不仅仅包含标准的RCU API,也包含RCU向其他内核模块提供的内部接口。这些接口是rcu_read_lock()、rcu_read_unlock()、 rcu_read_lock_bh()、rcu_read_unlock_bh()、call_rcu() (是对__call_rcu()的封装)、call_rcu_bh() (ditto)、rcu_check_callbacks()、rcu_process_callbacks()(是对 __rcu_process_callbacks()的封装)、rcu_pending() (是对__rcu_pending()的封装)、rcu_needs_cpu()、rcu_cpu_notify()和__rcu_init()。注意synchronize_rcu()和 rcu_barrier()通常用于所有RCU 实现,并且以call_rcu()的形式定义。类似的, rcu_barrier_bh()通常用于所有RCU实现,并且以call_rcu_bh()的形式定义。
2.1. 读端临界区
1 void __rcu_read_lock(void)
2 {
3 preempt_disable();
4 __acquire(RCU);
5 rcu_read_acquire();
6 }
7
8 void __rcu_read_unlock(void)
9 {
10 rcu_read_release();
11 __release(RCU);
12 preempt_enable();
13 }
14
15 void __rcu_read_lock_bh(void)
16 {
17 local_bh_disable();
18 __acquire(RCU_BH);
19 rcu_read_acquire();
20 }
21
22 void __rcu_read_unlock_bh(void)
23 {
24 rcu_read_release();
25 __release(RCU_BH);
26 local_bh_enable();
27 }
RCU读端临界区
上图显示了RCU读端临界区的函数。请读者注意:这些函数与源代码略有差异。第1-6 行显示了__rcu_read_lock(),它开始一个“rcu”读端临界区。第3行禁止抢占,第4行是一个弱的标记,表示开始一个RCU读端临界区,第5行更新lockdep状态。第8-13行显示了__rcu_read_unlock(),它是与__rcu_read_lock()相对的函数。第15-20显示了 __rcu_read_lock_bh(),第22-27行显示了__rcu_read_unlock_bh(),它们与前两个函数类似,但是它们禁止、打开下半部分而不是而不是禁止打开抢占。
2.2. call_rcu()
1 static void
2 __call_rcu(struct rcu_head *head,
3 void (*func)(struct rcu_head *rcu),
4 struct rcu_state *rsp)
5 {
6 unsigned long flags;
7 struct rcu_data *rdp;
8
9 head->func = func;
10 head->next = NULL;
11 smp_mb();
12 local_irq_save(flags);
13 rdp = rsp->rda[smp_processor_id()];
14 rcu_process_gp_end(rsp, rdp);
15 check_for_new_grace_period(rsp, rdp);
16 *rdp->nxttail[RCU_NEXT_TAIL] = head;
17 rdp->nxttail[RCU_NEXT_TAIL] = &head->next;
18 if (ACCESS_ONCE(rsp->completed) ==
19 ACCESS_ONCE(rsp->gpnum)) {
20 unsigned long nestflag;
21 struct rcu_node *rnp_root = rcu_get_root(rsp);
22
23 spin_lock_irqsave(&rnp_root->lock, nestflag);
24 rcu_start_gp(rsp, nestflag);
25 }
26 if (unlikely(++rdp->qlen > qhimark)) {
27 rdp->blimit = LONG_MAX;
28 force_quiescent_state(rsp, 0);
29 } else if ((long)(ACCESS_ONCE(rsp->jiffies_force_qs) –
30 jiffies) < 0 ||
31 (rdp->n_rcu_pending_force_qs –
32 rdp->n_rcu_pending) < 0)
33 force_quiescent_state(rsp, 1);
34 local_irq_restore(flags);
35 }
36
37 void call_rcu(struct rcu_head *head,
38 void (*func)(struct rcu_head *rcu))
39 {
40 __call_rcu(head, func, &rcu_state);
41 }
42
43 void call_rcu_bh(struct rcu_head *head,
44 void (*func)(struct rcu_head *rcu))
45 {
46 __call_rcu(head, func, &rcu_bh_state);
47 }
call_rcu()代码
上图显示了__call_rcu(),call_rcu()和call_rcu_bh()函数的代码。注意 call_rcu()和call_rcu_bh()是对__call_rcu()的简单封装,因此这里并不过多考虑它们。
将注意力转到__call_rcu(),第9-10行初始化指定的rcu_head。这里假设读者知道call_rcu的作用,并且明白rcu_head的含义。第11 行确保更新RCU保护数据结构的操作,在注册回调函数之前完成,这里也假设读者smp_mb的含义,并且明白这里“完成”一词的“真正”含义,这里需要强调一点,所有认为smp_mb会刷新cache,或者认为smp_mb会阻止随后的指令执行,这两种潜意识都是不正确的。第12、34行禁止并重新打开中断,以防止在一个中断处理函数中调用__call_rcu()而产生灾难性的冲突。第13行得到当前CPU的rcu_data数据结构的引用,第14行调用rcu_process_gp_end(),其目的是在当前优雅周期已经结束时,将回调函数提前执行。如果新的优雅周期已经开始,则第15行调用check_for_new_grace_period()记录状态。
第16、17行将新的回调函数加入队列。第18、19行检查是否有一个优雅周期正在处理中,如果没有,则第23行获得根rcu_node结构的锁,并在第24行调用rcu_start_gp() 开始一个新的优雅周期(也释放锁)。
第 26行检查是否有太多的RCU回调在本CPU中等待,如果是这样,第27行递增->blimit,目的是提高处理回调函数的速度。第28行立即调用force_quiescent_state() ,其目的是试图使相应CPU尽快经历一次静止状态。否则,第29-32行检查自优雅周期开始以来(或者自从上一次调用force_quiescent_state()以来),是否已经经过太长时间,如果是这样,第33行调用非紧急的force_quiescent_state(),也是为了使相应的CPU经历一次静止状态。
2.3. rcu_check_callbacks()
1 static int __rcu_pending(struct rcu_state *rsp,
2 struct rcu_data *rdp)
3 {
4 rdp->n_rcu_pending++;
5
6 check_cpu_stall(rsp, rdp);
7 if (rdp->qs_pending)
8 return 1;
9 if (cpu_has_callbacks_ready_to_invoke(rdp))
10 return 1;
11 if (cpu_needs_another_gp(rsp, rdp))
12 return 1;
13 if (ACCESS_ONCE(rsp->completed) != rdp->completed)
14 return 1;
15 if (ACCESS_ONCE(rsp->gpnum) != rdp->gpnum)
16 return 1;
17 if (ACCESS_ONCE(rsp->completed) !=
18 ACCESS_ONCE(rsp->gpnum) &&
19 ((long)(ACCESS_ONCE(rsp->jiffies_force_qs) –
20 jiffies) < 0 ||
21 (rdp->n_rcu_pending_force_qs –
22 rdp->n_rcu_pending) < 0))
23 return 1;
24 return 0;
25 }
26
27 int rcu_pending(int cpu)
28 {
29 return __rcu_pending(&rcu_state,
30 &per_cpu(rcu_data, cpu)) ||
31 __rcu_pending(&rcu_bh_state,
32 &per_cpu(rcu_bh_data, cpu));
33 }
34
35 void rcu_check_callbacks(int cpu, int user)
36 {
37 if (user ||
38 (idle_cpu(cpu) && !in_softirq() &&
39 hardirq_count() <= (1 << HARDIRQ_SHIFT))) {
40 rcu_qsctr_inc(cpu);
41 rcu_bh_qsctr_inc(cpu);
42 } else if (!in_softirq()) {
43 rcu_bh_qsctr_inc(cpu);
44 }
45 raise_softirq(RCU_SOFTIRQ);
46 }
rcu_check_callbacks()代码
上图显示的代码,在每一个CPU的每jiffy中,都会在调度中断处理函数中调用。rcu_pending() 函数(是__rcu_pending()的一个封装函数)被调用,如果它返回非0,那么调用rcu_check_callbacks()。 (注意有想法将 rcu_pending()合并到rcu_check_callbacks())
从__rcu_pending()开始,第4行记录本调用次数,用于确定何时强制产生静止状态。第6行调用check_cpu_stall(),目的是报告在内核中自旋的CPU,那些CPU也许是存在硬件问题。如果配置了CONFIG_RCU_CPU_STALL_DETECTOR,第7-23执行一系列的检查,如果RCU需要当前CPU做一些事情,就返回非0值。第7行检查当前CPU是否还缺少了一次静止状态,则在第9行调用cpu_has_callbacks_ready_to_invoke()检查当前CPU是否有回调需要处理,并准备调用它们。这些回调是在已经结束的优雅周期中产生的,现在准备调用它们了。第11 行调用cpu_needs_another_gp() 检查当前CPU是否有回调需要在另外的RCU优雅周期结束。第13行检查当优前雅周期是否已经结束,第15行检查一个新优雅周期是否已经开始,最后,第17-22行检查是否应当强制其他CPU经历一次静止状态。最后进行如下一些检查: (1) 第17-18行检查是否一个优雅周期正在处理,如果是,则第19-22行检查是否经历了足够的jiffies时间(第 19-20行) 或者调用rcu_pending()次数足够多(第21-22),以确定是否应当调用 force_quiescent_state()。如果这一系列的检查都没有触发,则第 24 行返回0,表示不必调用rcu_check_callbacks()。
第27-33行展示了rcu_pending(),它简单的调用__rcu_pending() 两次,一次是为“rcu”,另一次是为“rcu_bh”。
第35-48行展示了rcu_check_callbacks(),它检查调度中断是否中止了一个扩展静止状态,然后开始RCU软中断处理 (rcu_process_callbacks())。第37-41行为“rcu” 执行这个检查,而第42-43行为“rcu_bh”执行这个检查。
第37-39行检查调度时钟中断是否打断了用户态执行 (第37行),或者打断了idle循环 (第38行的idle_cpu()),并且没有影响中断层次(第38行的剩余部分及整个第39行)。如果这个检查成功,则调度时钟中断来自于扩展静止状态,由于任何rcu的静止状态也是rcu_bh的静止状态,那么第40、41行报告两种静止状态。
类似于“rcu_bh”,第 42 行检查调度时钟中断是否来自于打开软中断的代码区,如果是,则第43行报告“rcu_bh”静止状态。
其他情况下,第45行触发一个RCU 软中断,将导致在将来的某个时刻,本CPU上的 rcu_process_callbacks()将被调用。
2.4. rcu_process_callbacks()
1 static void
2__rcu_process_callbacks(struct rcu_state *rsp,
3struct rcu_data *rdp)
4 {
5 unsigned long flags;
6
7 if((long)(ACCESS_ONCE(rsp->jiffies_force_qs) –
8 jiffies)< 0 ||
9 (rdp->n_rcu_pending_force_qs-
10 rdp->n_rcu_pending) < 0)
11 force_quiescent_state(rsp,1);
12 rcu_process_gp_end(rsp, rdp);
13 rcu_check_quiescent_state(rsp, rdp);
14 if (cpu_needs_another_gp(rsp, rdp)) {
15 spin_lock_irqsave(&rcu_get_root(rsp)->lock,flags);
16 rcu_start_gp(rsp,flags);
17 }
18 rcu_do_batch(rdp);
19 }
20
21static void
22rcu_process_callbacks(struct softirq_action *unused)
23 {
24 smp_mb();
25 __rcu_process_callbacks(&rcu_state,
26 &__get_cpu_var(rcu_data));
27 __rcu_process_callbacks(&rcu_bh_state,
28 &__get_cpu_var(rcu_bh_data));
29 smp_mb();
30 }
rcu_process_callbacks()代码
上图展示了rcu_process_callbacks()的代码,它是对__rcu_process_callbacks()函数的封装。这些函数是调用raise_softirq(RCU_SOFTIRQ)函数的结果。例如,当有某种原因使得RCU核心认为:需要当前CPU做某些事情的时候,就会触发此函数。
第7-10行检查自当前优雅周期启动以来,是否已经经历了一段时间。如果是这样,第11行调用 force_quiescent_state(),其目的是试图使相应的CPU经历一次静止状态。
在任何情况下,第 12行调用 rcu_process_gp_end(),它检查其他CPU是否结束了本CPU关注的优雅周期。如果是,标记优雅周期结束并且加快调用本CPU的RCU回调。第13行调用rcu_check_quiescent_state(),该函数检查其他CPU是否启动了一个新的优雅周期,并检查当前CPU是否已经为这个优雅周期经历了一次静止状态。如果是这样,就更新相应的状态。第14行检查是否没有正在处理的优雅周期,并且检查是否有另外的优雅周期所需要的回调函数,如果是这样,第15行获得根rcu_node的锁,第17行调用rcu_start_gp(),开始一个新的优雅周期(并且也释放根rcu_node 的锁)。其他情况下,第18行调用 rcu_do_batch(),它调用本CPU的回调,这些回调对应的优雅周期已经完成。
第21-30行是 rcu_process_callbacks(),也是一个对__rcu_process_callbacks()的封装函数。第 24行执行一个内存屏障,以确保前面的RCU读端临界区在随后的RCU处理过程之前完成。第25-26行和第27-28行分别为“rcu”和“rcu_bh”调用__rcu_process_callbacks(),最后,第29行执行一个内存屏障,以确保任何 __rcu_process_callbacks()执行的操作,看起来都早于随后的RCU读端临界区。
2.5. rcu_needs_cpu() 和 rcu_cpu_notify()
1 intrcu_needs_cpu(int cpu)
2 {
3 return per_cpu(rcu_data, cpu).nxtlist ||
4 per_cpu(rcu_bh_data, cpu).nxtlist;
5 }
6
7static int __cpuinit
8rcu_cpu_notify(struct notifier_block *self,
9unsigned long action, void *hcpu)
10 {
11 long cpu = (long)hcpu;
12
13 switch (action) {
14 caseCPU_UP_PREPARE:
15 caseCPU_UP_PREPARE_FROZEN:
16 rcu_online_cpu(cpu);
17 break;
18 caseCPU_DEAD:
19 caseCPU_DEAD_FROZEN:
20 caseCPU_UP_CANCELED:
21 caseCPU_UP_CANCELED_FROZEN:
22 rcu_offline_cpu(cpu);
23 break;
24 default:
25 break;
26 }
27 return NOTIFY_OK;
28 }
rcu_needs_cpu()和rcu_cpu_notify代码
上图展示了rcu_needs_cpu()和rcu_cpu_notify()的代码,它们被LINUX内核调用,以检查 dynticks-idle 模式转换并处理CPU热插拨。
第1-5行显示了rcu_needs_cpu()的代码,它简单的检查特定的CPU是否有“rcu”(第3行)或者“rcu_bh”(第4行)回调。
第7-28显示了rcu_cpu_notify(),它是一个非常典型的、使用switch语句的CPU热插拨通知函数。如果特定的CPU上线,则在第16行调用rcu_online_cpu(),如果特定CPU离线,则在第22行调用rcu_offline_cpu。请特别注意:CPU热插拨操作不是原子的,因此可能穿越多个优雅周期。因此必须小心的处理CPU热插拨事件。
….
(由于篇幅关系,后面的无法贴下,关心原文的,请直接联系谢宝友老师)