From 559f9badd11ddf399f88b18b4c0f110fd511ae53 Mon Sep 17 00:00:00 2001 From: Dave Jones Date: Wed, 14 Mar 2012 22:17:39 -0400 Subject: rcu: List-debug variants of rcu list routines. * Make __list_add_rcu check the next->prev and prev->next pointers just like __list_add does. * Make list_del_rcu use __list_del_entry, which does the same checking at deletion time. Has been running for a week here without anything being tripped up, but it seems worth adding for completeness just in case something ever does corrupt those lists. Signed-off-by: Dave Jones Signed-off-by: Paul E. McKenney --- include/linux/rculist.h | 7 ++++++- lib/list_debug.c | 22 ++++++++++++++++++++++ 2 files changed, 28 insertions(+), 1 deletion(-) diff --git a/include/linux/rculist.h b/include/linux/rculist.h index d079290843a9..a20c05096231 100644 --- a/include/linux/rculist.h +++ b/include/linux/rculist.h @@ -30,6 +30,7 @@ * This is only for internal list manipulation where we know * the prev/next entries already! */ +#ifndef CONFIG_DEBUG_LIST static inline void __list_add_rcu(struct list_head *new, struct list_head *prev, struct list_head *next) { @@ -38,6 +39,10 @@ static inline void __list_add_rcu(struct list_head *new, rcu_assign_pointer(list_next_rcu(prev), new); next->prev = new; } +#else +extern void __list_add_rcu(struct list_head *new, + struct list_head *prev, struct list_head *next); +#endif /** * list_add_rcu - add a new entry to rcu-protected list @@ -108,7 +113,7 @@ static inline void list_add_tail_rcu(struct list_head *new, */ static inline void list_del_rcu(struct list_head *entry) { - __list_del(entry->prev, entry->next); + __list_del_entry(entry); entry->prev = LIST_POISON2; } diff --git a/lib/list_debug.c b/lib/list_debug.c index 982b850d4e7a..3810b481f940 100644 --- a/lib/list_debug.c +++ b/lib/list_debug.c @@ -10,6 +10,7 @@ #include #include #include +#include /* * Insert a new entry between two known consecutive entries. @@ -75,3 +76,24 @@ void list_del(struct list_head *entry) entry->prev = LIST_POISON2; } EXPORT_SYMBOL(list_del); + +/* + * RCU variants. + */ +void __list_add_rcu(struct list_head *new, + struct list_head *prev, struct list_head *next) +{ + WARN(next->prev != prev, + "list_add_rcu corruption. next->prev should be " + "prev (%p), but was %p. (next=%p).\n", + prev, next->prev, next); + WARN(prev->next != next, + "list_add_rcu corruption. prev->next should be " + "next (%p), but was %p. (prev=%p).\n", + next, prev->next, prev); + new->next = next; + new->prev = prev; + rcu_assign_pointer(list_next_rcu(prev), new); + next->prev = new; +} +EXPORT_SYMBOL(__list_add_rcu); -- cgit v1.2.3 From f88022a4f650ac1778cafcc17d2e522283bdf590 Mon Sep 17 00:00:00 2001 From: Michel Machado Date: Tue, 10 Apr 2012 14:07:40 -0400 Subject: rcu: Replace list_first_entry_rcu() with list_first_or_null_rcu() The list_first_entry_rcu() macro is inherently unsafe because it cannot be applied to an empty list. But because RCU readers do not exclude updaters, a list might become empty between the time that list_empty() claimed it was non-empty and the time that list_first_entry_rcu() is invoked. Therefore, the list_empty() test cannot be separated from the list_first_entry_rcu() call. This commit therefore combines these to macros to create a new list_first_or_null_rcu() macro that replaces the old (and unsafe) list_first_entry_rcu() macro. This patch incorporates Paul's review comments on the previous version of this patch available here: https://lkml.org/lkml/2012/4/2/536 This patch cannot break any upstream code because list_first_entry_rcu() is not being used anywhere in the kernel (tested with grep(1)), and any external code using it is probably broken as a result of using it. Signed-off-by: Michel Machado CC: "Paul E. McKenney" CC: Dipankar Sarma Signed-off-by: Paul E. McKenney --- include/linux/rculist.h | 33 +++++++++++++++++++++++++++++---- 1 file changed, 29 insertions(+), 4 deletions(-) diff --git a/include/linux/rculist.h b/include/linux/rculist.h index a20c05096231..e0f0fab20415 100644 --- a/include/linux/rculist.h +++ b/include/linux/rculist.h @@ -233,18 +233,43 @@ static inline void list_splice_init_rcu(struct list_head *list, }) /** - * list_first_entry_rcu - get the first element from a list + * Where are list_empty_rcu() and list_first_entry_rcu()? + * + * Implementing those functions following their counterparts list_empty() and + * list_first_entry() is not advisable because they lead to subtle race + * conditions as the following snippet shows: + * + * if (!list_empty_rcu(mylist)) { + * struct foo *bar = list_first_entry_rcu(mylist, struct foo, list_member); + * do_something(bar); + * } + * + * The list may not be empty when list_empty_rcu checks it, but it may be when + * list_first_entry_rcu rereads the ->next pointer. + * + * Rereading the ->next pointer is not a problem for list_empty() and + * list_first_entry() because they would be protected by a lock that blocks + * writers. + * + * See list_first_or_null_rcu for an alternative. + */ + +/** + * list_first_or_null_rcu - get the first element from a list * @ptr: the list head to take the element from. * @type: the type of the struct this is embedded in. * @member: the name of the list_struct within the struct. * - * Note, that list is expected to be not empty. + * Note that if the list is empty, it returns NULL. * * This primitive may safely run concurrently with the _rcu list-mutation * primitives such as list_add_rcu() as long as it's guarded by rcu_read_lock(). */ -#define list_first_entry_rcu(ptr, type, member) \ - list_entry_rcu((ptr)->next, type, member) +#define list_first_or_null_rcu(ptr, type, member) \ + ({struct list_head *__ptr = (ptr); \ + struct list_head __rcu *__next = list_next_rcu(__ptr); \ + likely(__ptr != __next) ? container_of(__next, type, member) : NULL; \ + }) /** * list_for_each_entry_rcu - iterate over rcu list of given type -- cgit v1.2.3 From c9336643e1440f4dfc89ad4ac6185813619abb8c Mon Sep 17 00:00:00 2001 From: "Paul E. McKenney" Date: Wed, 18 Apr 2012 16:20:18 -0700 Subject: rcu: Clarify help text for RCU_BOOST_PRIO MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The old text confused real-time applications with real-time threads, so that you pretty much needed to understand how this kernel configuration parameter worked to understand the help text. This commit therefore attempts to make the help text human-readable. Reported-by: Jörn Engel Signed-off-by: Paul E. McKenney Signed-off-by: Paul E. McKenney --- init/Kconfig | 23 +++++++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/init/Kconfig b/init/Kconfig index 6cfd71d06463..85c6870ed476 100644 --- a/init/Kconfig +++ b/init/Kconfig @@ -515,10 +515,25 @@ config RCU_BOOST_PRIO depends on RCU_BOOST default 1 help - This option specifies the real-time priority to which preempted - RCU readers are to be boosted. If you are working with CPU-bound - real-time applications, you should specify a priority higher then - the highest-priority CPU-bound application. + This option specifies the real-time priority to which long-term + preempted RCU readers are to be boosted. If you are working + with a real-time application that has one or more CPU-bound + threads running at a real-time priority level, you should set + RCU_BOOST_PRIO to a priority higher then the highest-priority + real-time CPU-bound thread. The default RCU_BOOST_PRIO value + of 1 is appropriate in the common case, which is real-time + applications that do not have any CPU-bound threads. + + Some real-time applications might not have a single real-time + thread that saturates a given CPU, but instead might have + multiple real-time threads that, taken together, fully utilize + that CPU. In this case, you should set RCU_BOOST_PRIO to + a priority higher than the lowest-priority thread that is + conspiring to prevent the CPU from running any non-real-time + tasks. For example, if one thread at priority 10 and another + thread at priority 5 are between themselves fully consuming + the CPU time on a given CPU, then RCU_BOOST_PRIO should be + set to priority 6 or higher. Specify the real-time priority, or take the default if unsure. -- cgit v1.2.3 From d8169d4c369e8aa2fda10df705a4957331b5a4db Mon Sep 17 00:00:00 2001 From: Jan Engelhardt Date: Thu, 19 Apr 2012 11:44:39 -0700 Subject: rcu: Make __kfree_rcu() less dependent on compiler choices Currently, __kfree_rcu() is implemented as an inline function, and contains a BUILD_BUG_ON() that malfunctions if __kfree_rcu() is compiled as an out-of-line function. Unfortunately, there are compiler settings (e.g., -O0) that can result in __kfree_rcu() being compiled out of line, resulting in annoying build breakage. This commit therefore converts both __kfree_rcu() and __is_kfree_rcu_offset() from inline functions to macros to prevent such misbehavior on the part of the compiler. Signed-off-by: Jan Engelhardt Signed-off-by: Paul E. McKenney Reviewed-by: Josh Triplett --- include/linux/rcupdate.h | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/include/linux/rcupdate.h b/include/linux/rcupdate.h index 20fb776a1d4a..d5dfb109dfe1 100644 --- a/include/linux/rcupdate.h +++ b/include/linux/rcupdate.h @@ -922,6 +922,21 @@ void __kfree_rcu(struct rcu_head *head, unsigned long offset) kfree_call_rcu(head, (rcu_callback)offset); } +/* + * Does the specified offset indicate that the corresponding rcu_head + * structure can be handled by kfree_rcu()? + */ +#define __is_kfree_rcu_offset(offset) ((offset) < 4096) + +/* + * Helper macro for kfree_rcu() to prevent argument-expansion eyestrain. + */ +#define __kfree_rcu(head, offset) \ + do { \ + BUILD_BUG_ON(!__is_kfree_rcu_offset(offset)); \ + call_rcu(head, (void (*)(struct rcu_head *))(unsigned long)(offset)); \ + } while (0) + /** * kfree_rcu() - kfree an object after a grace period. * @ptr: pointer to kfree @@ -944,6 +959,9 @@ void __kfree_rcu(struct rcu_head *head, unsigned long offset) * * Note that the allowable offset might decrease in the future, for example, * to allow something like kmem_cache_free_rcu(). + * + * The BUILD_BUG_ON check must not involve any function calls, hence the + * checks are done in macros here. */ #define kfree_rcu(ptr, rcu_head) \ __kfree_rcu(&((ptr)->rcu_head), offsetof(typeof(*(ptr)), rcu_head)) -- cgit v1.2.3 From 8932a63d5edb02f714d50c26583152fe0a97a69c Mon Sep 17 00:00:00 2001 From: "Paul E. McKenney" Date: Thu, 19 Apr 2012 12:20:14 -0700 Subject: rcu: Reduce cache-miss initialization latencies for large systems Commit #0209f649 (rcu: limit rcu_node leaf-level fanout) set an upper limit of 16 on the leaf-level fanout for the rcu_node tree. This was needed to reduce lock contention that was induced by the synchronization of scheduling-clock interrupts, which was in turn needed to improve energy efficiency for moderate-sized lightly loaded servers. However, reducing the leaf-level fanout means that there are more leaf-level rcu_node structures in the tree, which in turn means that RCU's grace-period initialization incurs more cache misses. This is not a problem on moderate-sized servers with only a few tens of CPUs, but becomes a major source of real-time latency spikes on systems with many hundreds of CPUs. In addition, the workloads running on these large systems tend to be CPU-bound, which eliminates the energy-efficiency advantages of synchronizing scheduling-clock interrupts. Therefore, these systems need maximal values for the rcu_node leaf-level fanout. This commit addresses this problem by introducing a new kernel parameter named RCU_FANOUT_LEAF that directly controls the leaf-level fanout. This parameter defaults to 16 to handle the common case of a moderate sized lightly loaded servers, but may be set higher on larger systems. Reported-by: Mike Galbraith Reported-by: Dimitri Sivanich Signed-off-by: Paul E. McKenney --- init/Kconfig | 27 +++++++++++++++++++++++++++ kernel/rcutree.c | 2 +- kernel/rcutree.h | 10 +++------- 3 files changed, 31 insertions(+), 8 deletions(-) diff --git a/init/Kconfig b/init/Kconfig index 85c6870ed476..6d18ef8071b5 100644 --- a/init/Kconfig +++ b/init/Kconfig @@ -458,6 +458,33 @@ config RCU_FANOUT Select a specific number if testing RCU itself. Take the default if unsure. +config RCU_FANOUT_LEAF + int "Tree-based hierarchical RCU leaf-level fanout value" + range 2 RCU_FANOUT if 64BIT + range 2 RCU_FANOUT if !64BIT + depends on TREE_RCU || TREE_PREEMPT_RCU + default 16 + help + This option controls the leaf-level fanout of hierarchical + implementations of RCU, and allows trading off cache misses + against lock contention. Systems that synchronize their + scheduling-clock interrupts for energy-efficiency reasons will + want the default because the smaller leaf-level fanout keeps + lock contention levels acceptably low. Very large systems + (hundreds or thousands of CPUs) will instead want to set this + value to the maximum value possible in order to reduce the + number of cache misses incurred during RCU's grace-period + initialization. These systems tend to run CPU-bound, and thus + are not helped by synchronized interrupts, and thus tend to + skew them, which reduces lock contention enough that large + leaf-level fanouts work well. + + Select a specific number if testing RCU itself. + + Select the maximum permissible value for large systems. + + Take the default if unsure. + config RCU_FANOUT_EXACT bool "Disable tree-based hierarchical RCU auto-balancing" depends on TREE_RCU || TREE_PREEMPT_RCU diff --git a/kernel/rcutree.c b/kernel/rcutree.c index 1050d6d3922c..780acf8e15e9 100644 --- a/kernel/rcutree.c +++ b/kernel/rcutree.c @@ -2418,7 +2418,7 @@ static void __init rcu_init_levelspread(struct rcu_state *rsp) for (i = NUM_RCU_LVLS - 1; i > 0; i--) rsp->levelspread[i] = CONFIG_RCU_FANOUT; - rsp->levelspread[0] = RCU_FANOUT_LEAF; + rsp->levelspread[0] = CONFIG_RCU_FANOUT_LEAF; } #else /* #ifdef CONFIG_RCU_FANOUT_EXACT */ static void __init rcu_init_levelspread(struct rcu_state *rsp) diff --git a/kernel/rcutree.h b/kernel/rcutree.h index cdd1be0a4072..a905c200405c 100644 --- a/kernel/rcutree.h +++ b/kernel/rcutree.h @@ -29,18 +29,14 @@ #include /* - * Define shape of hierarchy based on NR_CPUS and CONFIG_RCU_FANOUT. + * Define shape of hierarchy based on NR_CPUS, CONFIG_RCU_FANOUT, and + * CONFIG_RCU_FANOUT_LEAF. * In theory, it should be possible to add more levels straightforwardly. * In practice, this did work well going from three levels to four. * Of course, your mileage may vary. */ #define MAX_RCU_LVLS 4 -#if CONFIG_RCU_FANOUT > 16 -#define RCU_FANOUT_LEAF 16 -#else /* #if CONFIG_RCU_FANOUT > 16 */ -#define RCU_FANOUT_LEAF (CONFIG_RCU_FANOUT) -#endif /* #else #if CONFIG_RCU_FANOUT > 16 */ -#define RCU_FANOUT_1 (RCU_FANOUT_LEAF) +#define RCU_FANOUT_1 (CONFIG_RCU_FANOUT_LEAF) #define RCU_FANOUT_2 (RCU_FANOUT_1 * CONFIG_RCU_FANOUT) #define RCU_FANOUT_3 (RCU_FANOUT_2 * CONFIG_RCU_FANOUT) #define RCU_FANOUT_4 (RCU_FANOUT_3 * CONFIG_RCU_FANOUT) -- cgit v1.2.3 From dabb8aa96020bde8359bc73e76c484dd7ff9b7f2 Mon Sep 17 00:00:00 2001 From: "Paul E. McKenney" Date: Mon, 23 Apr 2012 10:54:45 -0700 Subject: rcu: Document kernel command-line parameters Bring RCU's kernel command-line parameter documentation up to date. Signed-off-by: Paul E. McKenney Signed-off-by: Paul E. McKenney --- Documentation/kernel-parameters.txt | 88 +++++++++++++++++++++++++++++++++++-- 1 file changed, 85 insertions(+), 3 deletions(-) diff --git a/Documentation/kernel-parameters.txt b/Documentation/kernel-parameters.txt index c1601e5a8b71..ab84a01c8d68 100644 --- a/Documentation/kernel-parameters.txt +++ b/Documentation/kernel-parameters.txt @@ -2330,18 +2330,100 @@ bytes respectively. Such letter suffixes can also be entirely omitted. ramdisk_size= [RAM] Sizes of RAM disks in kilobytes See Documentation/blockdev/ramdisk.txt. - rcupdate.blimit= [KNL,BOOT] + rcutree.blimit= [KNL,BOOT] Set maximum number of finished RCU callbacks to process in one batch. - rcupdate.qhimark= [KNL,BOOT] + rcutree.qhimark= [KNL,BOOT] Set threshold of queued RCU callbacks over which batch limiting is disabled. - rcupdate.qlowmark= [KNL,BOOT] + rcutree.qlowmark= [KNL,BOOT] Set threshold of queued RCU callbacks below which batch limiting is re-enabled. + rcutree.rcu_cpu_stall_suppress= [KNL,BOOT] + Suppress RCU CPU stall warning messages. + + rcutree.rcu_cpu_stall_timeout= [KNL,BOOT] + Set timeout for RCU CPU stall warning messages. + + rcutorture.fqs_duration= [KNL,BOOT] + Set duration of force_quiescent_state bursts. + + rcutorture.fqs_holdoff= [KNL,BOOT] + Set holdoff time within force_quiescent_state bursts. + + rcutorture.fqs_stutter= [KNL,BOOT] + Set wait time between force_quiescent_state bursts. + + rcutorture.irqreader= [KNL,BOOT] + Test RCU readers from irq handlers. + + rcutorture.n_barrier_cbs= [KNL,BOOT] + Set callbacks/threads for rcu_barrier() testing. + + rcutorture.nfakewriters= [KNL,BOOT] + Set number of concurrent RCU writers. These just + stress RCU, they don't participate in the actual + test, hence the "fake". + + rcutorture.nreaders= [KNL,BOOT] + Set number of RCU readers. + + rcutorture.onoff_holdoff= [KNL,BOOT] + Set time (s) after boot for CPU-hotplug testing. + + rcutorture.onoff_interval= [KNL,BOOT] + Set time (s) between CPU-hotplug operations, or + zero to disable CPU-hotplug testing. + + rcutorture.shuffle_interval= [KNL,BOOT] + Set task-shuffle interval (s). Shuffling tasks + allows some CPUs to go into dyntick-idle mode + during the rcutorture test. + + rcutorture.shutdown_secs= [KNL,BOOT] + Set time (s) after boot system shutdown. This + is useful for hands-off automated testing. + + rcutorture.stall_cpu= [KNL,BOOT] + Duration of CPU stall (s) to test RCU CPU stall + warnings, zero to disable. + + rcutorture.stall_cpu_holdoff= [KNL,BOOT] + Time to wait (s) after boot before inducing stall. + + rcutorture.stat_interval= [KNL,BOOT] + Time (s) between statistics printk()s. + + rcutorture.stutter= [KNL,BOOT] + Time (s) to stutter testing, for example, specifying + five seconds causes the test to run for five seconds, + wait for five seconds, and so on. This tests RCU's + ability to transition abruptly to and from idle. + + rcutorture.test_boost= [KNL,BOOT] + Test RCU priority boosting? 0=no, 1=maybe, 2=yes. + "Maybe" means test if the RCU implementation + under test support RCU priority boosting. + + rcutorture.test_boost_duration= [KNL,BOOT] + Duration (s) of each individual boost test. + + rcutorture.test_boost_interval= [KNL,BOOT] + Interval (s) between each boost test. + + rcutorture.test_no_idle_hz= [KNL,BOOT] + Test RCU's dyntick-idle handling. See also the + rcutorture.shuffle_interval parameter. + + rcutorture.torture_type= [KNL,BOOT] + Specify the RCU implementation to test. + + rcutorture.verbose= [KNL,BOOT] + Enable additional printk() statements. + rdinit= [KNL] Format: Run specified binary instead of /init from the ramdisk, -- cgit v1.2.3 From 6d8133919bac4270883b24328500875a49e71b36 Mon Sep 17 00:00:00 2001 From: "Paul E. McKenney" Date: Thu, 23 Feb 2012 13:30:16 -0800 Subject: rcu: Document why rcu_blocking_is_gp() is safe The rcu_blocking_is_gp() function tests to see if there is only one online CPU, and if so, synchronize_sched() and friends become no-ops. However, for larger systems, num_online_cpus() scans a large vector, and might be preempted while doing so. While preempted, any number of CPUs might come online and go offline, potentially resulting in num_online_cpus() returning 1 when there never had only been one CPU online. This could result in a too-short RCU grace period, which could in turn result in total failure, except that the only way that the grace period is too short is if there is an RCU read-side critical section spanning it. For RCU-sched and RCU-bh (which are the only cases using rcu_blocking_is_gp()), RCU read-side critical sections have either preemption or bh disabled, which prevents CPUs from going offline. This in turn prevents actual failures from occurring. This commit therefore adds a large block comment to rcu_blocking_is_gp() documenting why it is safe. This commit also moves rcu_blocking_is_gp() into kernel/rcutree.c, which should help prevent unwary developers from mistaking it for a generally useful function. Signed-off-by: Paul E. McKenney Signed-off-by: Paul E. McKenney --- include/linux/rcutree.h | 7 ------- kernel/rcutree.c | 32 ++++++++++++++++++++++++++++++++ 2 files changed, 32 insertions(+), 7 deletions(-) diff --git a/include/linux/rcutree.h b/include/linux/rcutree.h index e8ee5dd0854c..b06363055ef8 100644 --- a/include/linux/rcutree.h +++ b/include/linux/rcutree.h @@ -98,13 +98,6 @@ extern void rcu_force_quiescent_state(void); extern void rcu_bh_force_quiescent_state(void); extern void rcu_sched_force_quiescent_state(void); -/* A context switch is a grace period for RCU-sched and RCU-bh. */ -static inline int rcu_blocking_is_gp(void) -{ - might_sleep(); /* Check for RCU read-side critical section. */ - return num_online_cpus() == 1; -} - extern void rcu_scheduler_starting(void); extern int rcu_scheduler_active __read_mostly; diff --git a/kernel/rcutree.c b/kernel/rcutree.c index 780acf8e15e9..8f6a344306e6 100644 --- a/kernel/rcutree.c +++ b/kernel/rcutree.c @@ -1894,6 +1894,38 @@ void call_rcu_bh(struct rcu_head *head, void (*func)(struct rcu_head *rcu)) } EXPORT_SYMBOL_GPL(call_rcu_bh); +/* + * Because a context switch is a grace period for RCU-sched and RCU-bh, + * any blocking grace-period wait automatically implies a grace period + * if there is only one CPU online at any point time during execution + * of either synchronize_sched() or synchronize_rcu_bh(). It is OK to + * occasionally incorrectly indicate that there are multiple CPUs online + * when there was in fact only one the whole time, as this just adds + * some overhead: RCU still operates correctly. + * + * Of course, sampling num_online_cpus() with preemption enabled can + * give erroneous results if there are concurrent CPU-hotplug operations. + * For example, given a demonic sequence of preemptions in num_online_cpus() + * and CPU-hotplug operations, there could be two or more CPUs online at + * all times, but num_online_cpus() might well return one (or even zero). + * + * However, all such demonic sequences require at least one CPU-offline + * operation. Furthermore, rcu_blocking_is_gp() giving the wrong answer + * is only a problem if there is an RCU read-side critical section executing + * throughout. But RCU-sched and RCU-bh read-side critical sections + * disable either preemption or bh, which prevents a CPU from going offline. + * Therefore, the only way that rcu_blocking_is_gp() can incorrectly return + * that there is only one CPU when in fact there was more than one throughout + * is when there were no RCU readers in the system. If there are no + * RCU readers, the grace period by definition can be of zero length, + * regardless of the number of online CPUs. + */ +static inline int rcu_blocking_is_gp(void) +{ + might_sleep(); /* Check for RCU read-side critical section. */ + return num_online_cpus() <= 1; +} + /** * synchronize_sched - wait until an rcu-sched grace period has elapsed. * -- cgit v1.2.3 From 2fdbb31b662787f78bb78b3e4e18f1a072058ffc Mon Sep 17 00:00:00 2001 From: "Paul E. McKenney" Date: Thu, 23 Feb 2012 15:58:29 -0800 Subject: rcu: Add RCU_FAST_NO_HZ tracing for idle exit Traces of rcu_prep_idle events can be confusing because rcu_cleanup_after_idle() does no tracing. This commit therefore adds this tracing. Signed-off-by: Paul E. McKenney Signed-off-by: Paul E. McKenney --- include/trace/events/rcu.h | 1 + kernel/rcutree_plugin.h | 1 + 2 files changed, 2 insertions(+) diff --git a/include/trace/events/rcu.h b/include/trace/events/rcu.h index 337099783f37..aaa55e1b8c48 100644 --- a/include/trace/events/rcu.h +++ b/include/trace/events/rcu.h @@ -292,6 +292,7 @@ TRACE_EVENT(rcu_dyntick, * "More callbacks": Still more callbacks, try again to clear them out. * "Callbacks drained": All callbacks processed, off to dyntick idle! * "Timer": Timer fired to cause CPU to continue processing callbacks. + * "Cleanup after idle": Idle exited, timer canceled. */ TRACE_EVENT(rcu_prep_idle, diff --git a/kernel/rcutree_plugin.h b/kernel/rcutree_plugin.h index c023464816be..1e561ab952a3 100644 --- a/kernel/rcutree_plugin.h +++ b/kernel/rcutree_plugin.h @@ -2085,6 +2085,7 @@ static void rcu_prepare_for_idle_init(int cpu) static void rcu_cleanup_after_idle(int cpu) { hrtimer_cancel(&per_cpu(rcu_idle_gp_timer, cpu)); + trace_rcu_prep_idle("Cleanup after idle"); } /* -- cgit v1.2.3 From 2ee3dc80660ac8285a37e662fd91b2e45c46f06a Mon Sep 17 00:00:00 2001 From: "Paul E. McKenney" Date: Thu, 23 Feb 2012 17:13:19 -0800 Subject: rcu: Make RCU_FAST_NO_HZ use timer rather than hrtimer The RCU_FAST_NO_HZ facility uses an hrtimer to wake up a CPU when it is allowed to go into dyntick-idle mode, which is almost always cancelled soon after. This is not what hrtimers are good at, so this commit switches to the timer wheel. Reported-by: Steven Rostedt Signed-off-by: Paul E. McKenney Signed-off-by: Paul E. McKenney --- kernel/rcutree_plugin.h | 40 ++++++++++++---------------------------- 1 file changed, 12 insertions(+), 28 deletions(-) diff --git a/kernel/rcutree_plugin.h b/kernel/rcutree_plugin.h index 1e561ab952a3..0f007b363dba 100644 --- a/kernel/rcutree_plugin.h +++ b/kernel/rcutree_plugin.h @@ -1980,9 +1980,7 @@ static void rcu_prepare_for_idle(int cpu) static DEFINE_PER_CPU(int, rcu_dyntick_drain); static DEFINE_PER_CPU(unsigned long, rcu_dyntick_holdoff); -static DEFINE_PER_CPU(struct hrtimer, rcu_idle_gp_timer); -static ktime_t rcu_idle_gp_wait; /* If some non-lazy callbacks. */ -static ktime_t rcu_idle_lazy_gp_wait; /* If only lazy callbacks. */ +static DEFINE_PER_CPU(struct timer_list, rcu_idle_gp_timer); /* * Allow the CPU to enter dyntick-idle mode if either: (1) There are no @@ -2051,10 +2049,9 @@ static bool rcu_cpu_has_nonlazy_callbacks(int cpu) * real work is done upon re-entry to idle, or by the next scheduling-clock * interrupt should idle not be re-entered. */ -static enum hrtimer_restart rcu_idle_gp_timer_func(struct hrtimer *hrtp) +static void rcu_idle_gp_timer_func(unsigned long unused) { trace_rcu_prep_idle("Timer"); - return HRTIMER_NORESTART; } /* @@ -2062,19 +2059,8 @@ static enum hrtimer_restart rcu_idle_gp_timer_func(struct hrtimer *hrtp) */ static void rcu_prepare_for_idle_init(int cpu) { - static int firsttime = 1; - struct hrtimer *hrtp = &per_cpu(rcu_idle_gp_timer, cpu); - - hrtimer_init(hrtp, CLOCK_MONOTONIC, HRTIMER_MODE_REL); - hrtp->function = rcu_idle_gp_timer_func; - if (firsttime) { - unsigned int upj = jiffies_to_usecs(RCU_IDLE_GP_DELAY); - - rcu_idle_gp_wait = ns_to_ktime(upj * (u64)1000); - upj = jiffies_to_usecs(RCU_IDLE_LAZY_GP_DELAY); - rcu_idle_lazy_gp_wait = ns_to_ktime(upj * (u64)1000); - firsttime = 0; - } + setup_timer(&per_cpu(rcu_idle_gp_timer, cpu), + rcu_idle_gp_timer_func, 0); } /* @@ -2084,7 +2070,7 @@ static void rcu_prepare_for_idle_init(int cpu) */ static void rcu_cleanup_after_idle(int cpu) { - hrtimer_cancel(&per_cpu(rcu_idle_gp_timer, cpu)); + del_timer(&per_cpu(rcu_idle_gp_timer, cpu)); trace_rcu_prep_idle("Cleanup after idle"); } @@ -2141,11 +2127,11 @@ static void rcu_prepare_for_idle(int cpu) per_cpu(rcu_dyntick_drain, cpu) = 0; per_cpu(rcu_dyntick_holdoff, cpu) = jiffies; if (rcu_cpu_has_nonlazy_callbacks(cpu)) - hrtimer_start(&per_cpu(rcu_idle_gp_timer, cpu), - rcu_idle_gp_wait, HRTIMER_MODE_REL); + mod_timer(&per_cpu(rcu_idle_gp_timer, cpu), + jiffies + RCU_IDLE_GP_DELAY); else - hrtimer_start(&per_cpu(rcu_idle_gp_timer, cpu), - rcu_idle_lazy_gp_wait, HRTIMER_MODE_REL); + mod_timer(&per_cpu(rcu_idle_gp_timer, cpu), + jiffies + RCU_IDLE_LAZY_GP_DELAY); return; /* Nothing more to do immediately. */ } else if (--per_cpu(rcu_dyntick_drain, cpu) <= 0) { /* We have hit the limit, so time to give up. */ @@ -2193,14 +2179,12 @@ static void rcu_prepare_for_idle(int cpu) static void print_cpu_stall_fast_no_hz(char *cp, int cpu) { - struct hrtimer *hrtp = &per_cpu(rcu_idle_gp_timer, cpu); + struct timer_list *tltp = &per_cpu(rcu_idle_gp_timer, cpu); - sprintf(cp, "drain=%d %c timer=%lld", + sprintf(cp, "drain=%d %c timer=%lu", per_cpu(rcu_dyntick_drain, cpu), per_cpu(rcu_dyntick_holdoff, cpu) == jiffies ? 'H' : '.', - hrtimer_active(hrtp) - ? ktime_to_us(hrtimer_get_remaining(hrtp)) - : -1); + timer_pending(tltp) ? tltp->expires - jiffies : -1); } #else /* #ifdef CONFIG_RCU_FAST_NO_HZ */ -- cgit v1.2.3 From c57afe80db4e169135eb675acc2d241e26cc064e Mon Sep 17 00:00:00 2001 From: "Paul E. McKenney" Date: Tue, 28 Feb 2012 11:02:21 -0800 Subject: rcu: Make RCU_FAST_NO_HZ account for pauses out of idle Both Steven Rostedt's new idle-capable trace macros and the RCU_NONIDLE() macro can cause RCU to momentarily pause out of idle without the rest of the system being involved. This can cause rcu_prepare_for_idle() to run through its state machine too quickly, which can in turn result in needless scheduling-clock interrupts. This commit therefore adds code to enable rcu_prepare_for_idle() to distinguish between an initial entry to idle on the one hand (which needs to advance the rcu_prepare_for_idle() state machine) and an idle reentry due to idle-capable trace macros and RCU_NONIDLE() on the other hand (which should avoid advancing the rcu_prepare_for_idle() state machine). Additional state is maintained to allow the timer to be correctly reposted when returning after a momentary pause out of idle, and even more state is maintained to detect when new non-lazy callbacks have been enqueued (which may require re-evaluation of the approach to idleness). Signed-off-by: Paul E. McKenney Signed-off-by: Paul E. McKenney --- kernel/rcutree.c | 2 ++ kernel/rcutree.h | 1 + kernel/rcutree_plugin.h | 57 +++++++++++++++++++++++++++++++++++++++++++++---- 3 files changed, 56 insertions(+), 4 deletions(-) diff --git a/kernel/rcutree.c b/kernel/rcutree.c index 1050d6d3922c..403306b86e78 100644 --- a/kernel/rcutree.c +++ b/kernel/rcutree.c @@ -1829,6 +1829,8 @@ __call_rcu(struct rcu_head *head, void (*func)(struct rcu_head *rcu), rdp->qlen++; if (lazy) rdp->qlen_lazy++; + else + rcu_idle_count_callbacks_posted(); if (__is_kfree_rcu_offset((unsigned long)func)) trace_rcu_kfree_callback(rsp->name, head, (unsigned long)func, diff --git a/kernel/rcutree.h b/kernel/rcutree.h index cdd1be0a4072..36ca28ecedc6 100644 --- a/kernel/rcutree.h +++ b/kernel/rcutree.h @@ -471,6 +471,7 @@ static void __cpuinit rcu_prepare_kthreads(int cpu); static void rcu_prepare_for_idle_init(int cpu); static void rcu_cleanup_after_idle(int cpu); static void rcu_prepare_for_idle(int cpu); +static void rcu_idle_count_callbacks_posted(void); static void print_cpu_stall_info_begin(void); static void print_cpu_stall_info(struct rcu_state *rsp, int cpu); static void print_cpu_stall_info_end(void); diff --git a/kernel/rcutree_plugin.h b/kernel/rcutree_plugin.h index 0f007b363dba..50c17975d4f4 100644 --- a/kernel/rcutree_plugin.h +++ b/kernel/rcutree_plugin.h @@ -1938,6 +1938,14 @@ static void rcu_prepare_for_idle(int cpu) { } +/* + * Don't bother keeping a running count of the number of RCU callbacks + * posted because CONFIG_RCU_FAST_NO_HZ=n. + */ +static void rcu_idle_count_callbacks_posted(void) +{ +} + #else /* #if !defined(CONFIG_RCU_FAST_NO_HZ) */ /* @@ -1981,6 +1989,10 @@ static void rcu_prepare_for_idle(int cpu) static DEFINE_PER_CPU(int, rcu_dyntick_drain); static DEFINE_PER_CPU(unsigned long, rcu_dyntick_holdoff); static DEFINE_PER_CPU(struct timer_list, rcu_idle_gp_timer); +static DEFINE_PER_CPU(unsigned long, rcu_idle_gp_timer_expires); +static DEFINE_PER_CPU(bool, rcu_idle_first_pass); +static DEFINE_PER_CPU(unsigned long, rcu_nonlazy_posted); +static DEFINE_PER_CPU(unsigned long, rcu_nonlazy_posted_snap); /* * Allow the CPU to enter dyntick-idle mode if either: (1) There are no @@ -1993,6 +2005,8 @@ static DEFINE_PER_CPU(struct timer_list, rcu_idle_gp_timer); */ int rcu_needs_cpu(int cpu) { + /* Flag a new idle sojourn to the idle-entry state machine. */ + per_cpu(rcu_idle_first_pass, cpu) = 1; /* If no callbacks, RCU doesn't need the CPU. */ if (!rcu_cpu_has_callbacks(cpu)) return 0; @@ -2095,6 +2109,26 @@ static void rcu_cleanup_after_idle(int cpu) */ static void rcu_prepare_for_idle(int cpu) { + /* + * If this is an idle re-entry, for example, due to use of + * RCU_NONIDLE() or the new idle-loop tracing API within the idle + * loop, then don't take any state-machine actions, unless the + * momentary exit from idle queued additional non-lazy callbacks. + * Instead, repost the rcu_idle_gp_timer if this CPU has callbacks + * pending. + */ + if (!per_cpu(rcu_idle_first_pass, cpu) && + (per_cpu(rcu_nonlazy_posted, cpu) == + per_cpu(rcu_nonlazy_posted_snap, cpu))) { + if (rcu_cpu_has_callbacks(cpu)) + mod_timer(&per_cpu(rcu_idle_gp_timer, cpu), + per_cpu(rcu_idle_gp_timer_expires, cpu)); + return; + } + per_cpu(rcu_idle_first_pass, cpu) = 0; + per_cpu(rcu_nonlazy_posted_snap, cpu) = + per_cpu(rcu_nonlazy_posted, cpu) - 1; + /* * If there are no callbacks on this CPU, enter dyntick-idle mode. * Also reset state to avoid prejudicing later attempts. @@ -2127,11 +2161,15 @@ static void rcu_prepare_for_idle(int cpu) per_cpu(rcu_dyntick_drain, cpu) = 0; per_cpu(rcu_dyntick_holdoff, cpu) = jiffies; if (rcu_cpu_has_nonlazy_callbacks(cpu)) - mod_timer(&per_cpu(rcu_idle_gp_timer, cpu), - jiffies + RCU_IDLE_GP_DELAY); + per_cpu(rcu_idle_gp_timer_expires, cpu) = + jiffies + RCU_IDLE_GP_DELAY; else - mod_timer(&per_cpu(rcu_idle_gp_timer, cpu), - jiffies + RCU_IDLE_LAZY_GP_DELAY); + per_cpu(rcu_idle_gp_timer_expires, cpu) = + jiffies + RCU_IDLE_LAZY_GP_DELAY; + mod_timer(&per_cpu(rcu_idle_gp_timer, cpu), + per_cpu(rcu_idle_gp_timer_expires, cpu)); + per_cpu(rcu_nonlazy_posted_snap, cpu) = + per_cpu(rcu_nonlazy_posted, cpu); return; /* Nothing more to do immediately. */ } else if (--per_cpu(rcu_dyntick_drain, cpu) <= 0) { /* We have hit the limit, so time to give up. */ @@ -2171,6 +2209,17 @@ static void rcu_prepare_for_idle(int cpu) trace_rcu_prep_idle("Callbacks drained"); } +/* + * Keep a running count of callbacks posted so that rcu_prepare_for_idle() + * can detect when something out of the idle loop posts a callback. + * Of course, it had better do so either from a trace event designed to + * be called from idle or from within RCU_NONIDLE(). + */ +static void rcu_idle_count_callbacks_posted(void) +{ + __this_cpu_add(rcu_nonlazy_posted, 1); +} + #endif /* #else #if !defined(CONFIG_RCU_FAST_NO_HZ) */ #ifdef CONFIG_RCU_CPU_STALL_INFO -- cgit v1.2.3 From 37e377d2823e03528cb64f435d7c0e30b1c668eb Mon Sep 17 00:00:00 2001 From: "Paul E. McKenney" Date: Fri, 17 Feb 2012 22:12:18 -0800 Subject: rcu: Fixes to rcutorture error handling and cleanup The rcutorture initialization code ignored the error returns from rcu_torture_onoff_init() and rcu_torture_stall_init(). The rcutorture cleanup code failed to NULL out a number of pointers. These bugs will normally have no effect, but this commit fixes them nevertheless. Signed-off-by: Paul E. McKenney --- kernel/rcutorture.c | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/kernel/rcutorture.c b/kernel/rcutorture.c index a89b381a8c6e..1463a0636443 100644 --- a/kernel/rcutorture.c +++ b/kernel/rcutorture.c @@ -1337,6 +1337,7 @@ static void rcutorture_booster_cleanup(int cpu) /* This must be outside of the mutex, otherwise deadlock! */ kthread_stop(t); + boost_tasks[cpu] = NULL; } static int rcutorture_booster_init(int cpu) @@ -1484,13 +1485,15 @@ static void rcu_torture_onoff_cleanup(void) return; VERBOSE_PRINTK_STRING("Stopping rcu_torture_onoff task"); kthread_stop(onoff_task); + onoff_task = NULL; } #else /* #ifdef CONFIG_HOTPLUG_CPU */ -static void +static int rcu_torture_onoff_init(void) { + return 0; } static void rcu_torture_onoff_cleanup(void) @@ -1554,6 +1557,7 @@ static void rcu_torture_stall_cleanup(void) return; VERBOSE_PRINTK_STRING("Stopping rcu_torture_stall_task."); kthread_stop(stall_task); + stall_task = NULL; } static int rcutorture_cpu_notify(struct notifier_block *self, @@ -1665,6 +1669,7 @@ rcu_torture_cleanup(void) VERBOSE_PRINTK_STRING("Stopping rcu_torture_shutdown task"); kthread_stop(shutdown_task); } + shutdown_task = NULL; rcu_torture_onoff_cleanup(); /* Wait for all RCU callbacks to fire. */ @@ -1897,9 +1902,17 @@ rcu_torture_init(void) goto unwind; } } - rcu_torture_onoff_init(); + i = rcu_torture_onoff_init(); + if (i != 0) { + firsterr = i; + goto unwind; + } register_reboot_notifier(&rcutorture_shutdown_nb); - rcu_torture_stall_init(); + i = rcu_torture_stall_init(); + if (i != 0) { + firsterr = i; + goto unwind; + } rcutorture_record_test_transition(); mutex_unlock(&fullstop_mutex); return 0; -- cgit v1.2.3 From 79b9a75fb703b6a2670e46b9dc495af5bc7029b3 Mon Sep 17 00:00:00 2001 From: "Paul E. McKenney" Date: Mon, 23 Apr 2012 10:06:39 -0700 Subject: rcu: Add warning for RCU_FAST_NO_HZ timer firing RCU_FAST_NO_HZ uses a timer to limit the time that a CPU with callbacks can remain in dyntick-idle mode. This timer is cancelled when the CPU exits idle, and therefore should never fire. However, if the timer were migrated to some other CPU for whatever reason (1) the timer could actually fire and (2) firing on some other CPU would fail to wake up the CPU with callbacks, possibly resulting in sluggishness or a system hang. This commit therfore adds a WARN_ON_ONCE() to the timer handler in order to detect this condition. Signed-off-by: Paul E. McKenney Signed-off-by: Paul E. McKenney --- kernel/rcutree_plugin.h | 1 + 1 file changed, 1 insertion(+) diff --git a/kernel/rcutree_plugin.h b/kernel/rcutree_plugin.h index 50c17975d4f4..ad61da79b311 100644 --- a/kernel/rcutree_plugin.h +++ b/kernel/rcutree_plugin.h @@ -2065,6 +2065,7 @@ static bool rcu_cpu_has_nonlazy_callbacks(int cpu) */ static void rcu_idle_gp_timer_func(unsigned long unused) { + WARN_ON_ONCE(1); /* Getting here can hang the system... */ trace_rcu_prep_idle("Timer"); } -- cgit v1.2.3 From 048a0e8f5e1d94c01a5fc70f5b2f2fd2f4527326 Mon Sep 17 00:00:00 2001 From: "Paul E. McKenney" Date: Thu, 26 Apr 2012 10:52:27 -0700 Subject: timer: Fix mod_timer_pinned() header comment The mod_timer_pinned() header comment states that it prevents timers from being migrated to a different CPU. This is not the case, instead, it ensures that the timer is posted to the current CPU, but does nothing to prevent CPU-hotplug operations from migrating the timer. This commit therefore brings the comment header into alignment with reality. Signed-off-by: Paul E. McKenney Signed-off-by: Paul E. McKenney Acked-by: Steven Rostedt --- kernel/timer.c | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/kernel/timer.c b/kernel/timer.c index a297ffcf888e..837c552fe838 100644 --- a/kernel/timer.c +++ b/kernel/timer.c @@ -861,7 +861,13 @@ EXPORT_SYMBOL(mod_timer); * * mod_timer_pinned() is a way to update the expire field of an * active timer (if the timer is inactive it will be activated) - * and not allow the timer to be migrated to a different CPU. + * and to ensure that the timer is scheduled on the current CPU. + * + * Note that this does not prevent the timer from being migrated + * when the current CPU goes offline. If this is a problem for + * you, use CPU-hotplug notifiers to handle it correctly, for + * example, cancelling the timer when the corresponding CPU goes + * offline. * * mod_timer_pinned(timer, expires) is equivalent to: * -- cgit v1.2.3 From fae4b54f28f034d228fa3bfc98858c698b64e89c Mon Sep 17 00:00:00 2001 From: "Paul E. McKenney" Date: Mon, 20 Feb 2012 17:51:45 -0800 Subject: rcu: Introduce rcutorture testing for rcu_barrier() Although rcutorture does invoke rcu_barrier() and friends, it cannot really be called a torture test given that it invokes them only once at the end of the test. This commit therefore introduces heavy-duty rcutorture testing for rcu_barrier(), which may be carried out concurrently with normal rcutorture testing. Signed-off-by: Paul E. McKenney Signed-off-by: Paul E. McKenney --- Documentation/RCU/torture.txt | 15 +++- kernel/rcutorture.c | 194 ++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 200 insertions(+), 9 deletions(-) diff --git a/Documentation/RCU/torture.txt b/Documentation/RCU/torture.txt index 375d3fb71437..4ddf3913fd8c 100644 --- a/Documentation/RCU/torture.txt +++ b/Documentation/RCU/torture.txt @@ -47,6 +47,16 @@ irqreader Says to invoke RCU readers from irq level. This is currently permit this. (Or, more accurately, variants of RCU that do -not- permit this know to ignore this variable.) +n_barrier_cbs If this is nonzero, RCU barrier testing will be conducted, + in which case n_barrier_cbs specifies the number of + RCU callbacks (and corresponding kthreads) to use for + this testing. The value cannot be negative. If you + specify this to be non-zero when torture_type indicates a + synchronous RCU implementation (one for which a member of + the synchronize_rcu() rather than the call_rcu() family is + used -- see the documentation for torture_type below), an + error will be reported and no testing will be carried out. + nfakewriters This is the number of RCU fake writer threads to run. Fake writer threads repeatedly use the synchronous "wait for current readers" function of the interface selected by @@ -188,7 +198,7 @@ OUTPUT The statistics output is as follows: rcu-torture:--- Start of test: nreaders=16 nfakewriters=4 stat_interval=30 verbose=0 test_no_idle_hz=1 shuffle_interval=3 stutter=5 irqreader=1 fqs_duration=0 fqs_holdoff=0 fqs_stutter=3 test_boost=1/0 test_boost_interval=7 test_boost_duration=4 - rcu-torture: rtc: (null) ver: 155441 tfle: 0 rta: 155441 rtaf: 8884 rtf: 155440 rtmbe: 0 rtbke: 0 rtbre: 0 rtbf: 0 rtb: 0 nt: 3055767 + rcu-torture: rtc: (null) ver: 155441 tfle: 0 rta: 155441 rtaf: 8884 rtf: 155440 rtmbe: 0 rtbe: 0 rtbke: 0 rtbre: 0 rtbf: 0 rtb: 0 nt: 3055767 rcu-torture: Reader Pipe: 727860534 34213 0 0 0 0 0 0 0 0 0 rcu-torture: Reader Batch: 727877838 17003 0 0 0 0 0 0 0 0 0 rcu-torture: Free-Block Circulation: 155440 155440 155440 155440 155440 155440 155440 155440 155440 155440 0 @@ -230,6 +240,9 @@ o "rtmbe": A non-zero value indicates that rcutorture believes that rcu_assign_pointer() and rcu_dereference() are not working correctly. This value should be zero. +o "rtbe": A non-zero value indicates that one of the rcu_barrier() + family of functions is not working correctly. + o "rtbke": rcutorture was unable to create the real-time kthreads used to force RCU priority inversion. This value should be zero. diff --git a/kernel/rcutorture.c b/kernel/rcutorture.c index 1463a0636443..8cd262b41499 100644 --- a/kernel/rcutorture.c +++ b/kernel/rcutorture.c @@ -64,6 +64,7 @@ static int irqreader = 1; /* RCU readers from irq (timers). */ static int fqs_duration; /* Duration of bursts (us), 0 to disable. */ static int fqs_holdoff; /* Hold time within burst (us). */ static int fqs_stutter = 3; /* Wait time between bursts (s). */ +static int n_barrier_cbs; /* Number of callbacks to test RCU barriers. */ static int onoff_interval; /* Wait time between CPU hotplugs, 0=disable. */ static int onoff_holdoff; /* Seconds after boot before CPU hotplugs. */ static int shutdown_secs; /* Shutdown time (s). <=0 for no shutdown. */ @@ -96,6 +97,8 @@ module_param(fqs_holdoff, int, 0444); MODULE_PARM_DESC(fqs_holdoff, "Holdoff time within fqs bursts (us)"); module_param(fqs_stutter, int, 0444); MODULE_PARM_DESC(fqs_stutter, "Wait time between fqs bursts (s)"); +module_param(n_barrier_cbs, int, 0444); +MODULE_PARM_DESC(n_barrier_cbs, "# of callbacks/kthreads for barrier testing"); module_param(onoff_interval, int, 0444); MODULE_PARM_DESC(onoff_interval, "Time between CPU hotplugs (s), 0=disable"); module_param(onoff_holdoff, int, 0444); @@ -139,6 +142,8 @@ static struct task_struct *shutdown_task; static struct task_struct *onoff_task; #endif /* #ifdef CONFIG_HOTPLUG_CPU */ static struct task_struct *stall_task; +static struct task_struct **barrier_cbs_tasks; +static struct task_struct *barrier_task; #define RCU_TORTURE_PIPE_LEN 10 @@ -164,6 +169,7 @@ static atomic_t n_rcu_torture_alloc_fail; static atomic_t n_rcu_torture_free; static atomic_t n_rcu_torture_mberror; static atomic_t n_rcu_torture_error; +static long n_rcu_torture_barrier_error; static long n_rcu_torture_boost_ktrerror; static long n_rcu_torture_boost_rterror; static long n_rcu_torture_boost_failure; @@ -173,6 +179,8 @@ static long n_offline_attempts; static long n_offline_successes; static long n_online_attempts; static long n_online_successes; +static long n_barrier_attempts; +static long n_barrier_successes; static struct list_head rcu_torture_removed; static cpumask_var_t shuffle_tmp_mask; @@ -197,6 +205,10 @@ static unsigned long shutdown_time; /* jiffies to system shutdown. */ static unsigned long boost_starttime; /* jiffies of next boost test start. */ DEFINE_MUTEX(boost_mutex); /* protect setting boost_starttime */ /* and boost task create/destroy. */ +static atomic_t barrier_cbs_count; /* Barrier callbacks registered. */ +static atomic_t barrier_cbs_invoked; /* Barrier callbacks invoked. */ +static wait_queue_head_t *barrier_cbs_wq; /* Coordinate barrier testing. */ +static DECLARE_WAIT_QUEUE_HEAD(barrier_wq); /* Mediate rmmod and system shutdown. Concurrent rmmod & shutdown illegal! */ @@ -327,6 +339,7 @@ struct rcu_torture_ops { int (*completed)(void); void (*deferred_free)(struct rcu_torture *p); void (*sync)(void); + void (*call)(struct rcu_head *head, void (*func)(struct rcu_head *rcu)); void (*cb_barrier)(void); void (*fqs)(void); int (*stats)(char *page); @@ -417,6 +430,7 @@ static struct rcu_torture_ops rcu_ops = { .completed = rcu_torture_completed, .deferred_free = rcu_torture_deferred_free, .sync = synchronize_rcu, + .call = call_rcu, .cb_barrier = rcu_barrier, .fqs = rcu_force_quiescent_state, .stats = NULL, @@ -460,6 +474,7 @@ static struct rcu_torture_ops rcu_sync_ops = { .completed = rcu_torture_completed, .deferred_free = rcu_sync_torture_deferred_free, .sync = synchronize_rcu, + .call = NULL, .cb_barrier = NULL, .fqs = rcu_force_quiescent_state, .stats = NULL, @@ -477,6 +492,7 @@ static struct rcu_torture_ops rcu_expedited_ops = { .completed = rcu_no_completed, .deferred_free = rcu_sync_torture_deferred_free, .sync = synchronize_rcu_expedited, + .call = NULL, .cb_barrier = NULL, .fqs = rcu_force_quiescent_state, .stats = NULL, @@ -519,6 +535,7 @@ static struct rcu_torture_ops rcu_bh_ops = { .completed = rcu_bh_torture_completed, .deferred_free = rcu_bh_torture_deferred_free, .sync = synchronize_rcu_bh, + .call = call_rcu_bh, .cb_barrier = rcu_barrier_bh, .fqs = rcu_bh_force_quiescent_state, .stats = NULL, @@ -535,6 +552,7 @@ static struct rcu_torture_ops rcu_bh_sync_ops = { .completed = rcu_bh_torture_completed, .deferred_free = rcu_sync_torture_deferred_free, .sync = synchronize_rcu_bh, + .call = NULL, .cb_barrier = NULL, .fqs = rcu_bh_force_quiescent_state, .stats = NULL, @@ -551,6 +569,7 @@ static struct rcu_torture_ops rcu_bh_expedited_ops = { .completed = rcu_bh_torture_completed, .deferred_free = rcu_sync_torture_deferred_free, .sync = synchronize_rcu_bh_expedited, + .call = NULL, .cb_barrier = NULL, .fqs = rcu_bh_force_quiescent_state, .stats = NULL, @@ -637,6 +656,7 @@ static struct rcu_torture_ops srcu_ops = { .completed = srcu_torture_completed, .deferred_free = rcu_sync_torture_deferred_free, .sync = srcu_torture_synchronize, + .call = NULL, .cb_barrier = NULL, .stats = srcu_torture_stats, .name = "srcu" @@ -661,6 +681,7 @@ static struct rcu_torture_ops srcu_raw_ops = { .completed = srcu_torture_completed, .deferred_free = rcu_sync_torture_deferred_free, .sync = srcu_torture_synchronize, + .call = NULL, .cb_barrier = NULL, .stats = srcu_torture_stats, .name = "srcu_raw" @@ -680,6 +701,7 @@ static struct rcu_torture_ops srcu_expedited_ops = { .completed = srcu_torture_completed, .deferred_free = rcu_sync_torture_deferred_free, .sync = srcu_torture_synchronize_expedited, + .call = NULL, .cb_barrier = NULL, .stats = srcu_torture_stats, .name = "srcu_expedited" @@ -1129,7 +1151,8 @@ rcu_torture_printk(char *page) "rtc: %p ver: %lu tfle: %d rta: %d rtaf: %d rtf: %d " "rtmbe: %d rtbke: %ld rtbre: %ld " "rtbf: %ld rtb: %ld nt: %ld " - "onoff: %ld/%ld:%ld/%ld", + "onoff: %ld/%ld:%ld/%ld " + "barrier: %ld/%ld:%ld", rcu_torture_current, rcu_torture_current_version, list_empty(&rcu_torture_freelist), @@ -1145,14 +1168,17 @@ rcu_torture_printk(char *page) n_online_successes, n_online_attempts, n_offline_successes, - n_offline_attempts); + n_offline_attempts, + n_barrier_successes, + n_barrier_attempts, + n_rcu_torture_barrier_error); + cnt += sprintf(&page[cnt], "\n%s%s ", torture_type, TORTURE_FLAG); if (atomic_read(&n_rcu_torture_mberror) != 0 || + n_rcu_torture_barrier_error != 0 || n_rcu_torture_boost_ktrerror != 0 || n_rcu_torture_boost_rterror != 0 || - n_rcu_torture_boost_failure != 0) - cnt += sprintf(&page[cnt], " !!!"); - cnt += sprintf(&page[cnt], "\n%s%s ", torture_type, TORTURE_FLAG); - if (i > 1) { + n_rcu_torture_boost_failure != 0 || + i > 1) { cnt += sprintf(&page[cnt], "!!! "); atomic_inc(&n_rcu_torture_error); WARN_ON_ONCE(1); @@ -1560,6 +1586,151 @@ static void rcu_torture_stall_cleanup(void) stall_task = NULL; } +/* Callback function for RCU barrier testing. */ +void rcu_torture_barrier_cbf(struct rcu_head *rcu) +{ + atomic_inc(&barrier_cbs_invoked); +} + +/* kthread function to register callbacks used to test RCU barriers. */ +static int rcu_torture_barrier_cbs(void *arg) +{ + long myid = (long)arg; + struct rcu_head rcu; + + init_rcu_head_on_stack(&rcu); + VERBOSE_PRINTK_STRING("rcu_torture_barrier_cbs task started"); + set_user_nice(current, 19); + do { + wait_event(barrier_cbs_wq[myid], + atomic_read(&barrier_cbs_count) == n_barrier_cbs || + kthread_should_stop() || + fullstop != FULLSTOP_DONTSTOP); + if (kthread_should_stop() || fullstop != FULLSTOP_DONTSTOP) + break; + cur_ops->call(&rcu, rcu_torture_barrier_cbf); + if (atomic_dec_and_test(&barrier_cbs_count)) + wake_up(&barrier_wq); + } while (!kthread_should_stop() && fullstop == FULLSTOP_DONTSTOP); + VERBOSE_PRINTK_STRING("rcu_torture_barrier_cbs task stopping"); + rcutorture_shutdown_absorb("rcu_torture_barrier_cbs"); + while (!kthread_should_stop()) + schedule_timeout_interruptible(1); + cur_ops->cb_barrier(); + destroy_rcu_head_on_stack(&rcu); + return 0; +} + +/* kthread function to drive and coordinate RCU barrier testing. */ +static int rcu_torture_barrier(void *arg) +{ + int i; + + VERBOSE_PRINTK_STRING("rcu_torture_barrier task starting"); + do { + atomic_set(&barrier_cbs_invoked, 0); + atomic_set(&barrier_cbs_count, n_barrier_cbs); + /* wake_up() path contains the required barriers. */ + for (i = 0; i < n_barrier_cbs; i++) + wake_up(&barrier_cbs_wq[i]); + wait_event(barrier_wq, + atomic_read(&barrier_cbs_count) == 0 || + kthread_should_stop() || + fullstop != FULLSTOP_DONTSTOP); + if (kthread_should_stop() || fullstop != FULLSTOP_DONTSTOP) + break; + n_barrier_attempts++; + cur_ops->cb_barrier(); + if (atomic_read(&barrier_cbs_invoked) != n_barrier_cbs) { + n_rcu_torture_barrier_error++; + WARN_ON_ONCE(1); + } + n_barrier_successes++; + schedule_timeout_interruptible(HZ / 10); + } while (!kthread_should_stop() && fullstop == FULLSTOP_DONTSTOP); + VERBOSE_PRINTK_STRING("rcu_torture_barrier task stopping"); + rcutorture_shutdown_absorb("rcu_torture_barrier_cbs"); + while (!kthread_should_stop()) + schedule_timeout_interruptible(1); + return 0; +} + +/* Initialize RCU barrier testing. */ +static int rcu_torture_barrier_init(void) +{ + int i; + int ret; + + if (n_barrier_cbs == 0) + return 0; + if (cur_ops->call == NULL || cur_ops->cb_barrier == NULL) { + printk(KERN_ALERT "%s" TORTURE_FLAG + " Call or barrier ops missing for %s,\n", + torture_type, cur_ops->name); + printk(KERN_ALERT "%s" TORTURE_FLAG + " RCU barrier testing omitted from run.\n", + torture_type); + return 0; + } + atomic_set(&barrier_cbs_count, 0); + atomic_set(&barrier_cbs_invoked, 0); + barrier_cbs_tasks = + kzalloc(n_barrier_cbs * sizeof(barrier_cbs_tasks[0]), + GFP_KERNEL); + barrier_cbs_wq = + kzalloc(n_barrier_cbs * sizeof(barrier_cbs_wq[0]), + GFP_KERNEL); + if (barrier_cbs_tasks == NULL || barrier_cbs_wq == 0) + return -ENOMEM; + for (i = 0; i < n_barrier_cbs; i++) { + init_waitqueue_head(&barrier_cbs_wq[i]); + barrier_cbs_tasks[i] = kthread_run(rcu_torture_barrier_cbs, + (void *)i, + "rcu_torture_barrier_cbs"); + if (IS_ERR(barrier_cbs_tasks[i])) { + ret = PTR_ERR(barrier_cbs_tasks[i]); + VERBOSE_PRINTK_ERRSTRING("Failed to create rcu_torture_barrier_cbs"); + barrier_cbs_tasks[i] = NULL; + return ret; + } + } + barrier_task = kthread_run(rcu_torture_barrier, NULL, + "rcu_torture_barrier"); + if (IS_ERR(barrier_task)) { + ret = PTR_ERR(barrier_task); + VERBOSE_PRINTK_ERRSTRING("Failed to create rcu_torture_barrier"); + barrier_task = NULL; + } + return 0; +} + +/* Clean up after RCU barrier testing. */ +static void rcu_torture_barrier_cleanup(void) +{ + int i; + + if (barrier_task != NULL) { + VERBOSE_PRINTK_STRING("Stopping rcu_torture_barrier task"); + kthread_stop(barrier_task); + barrier_task = NULL; + } + if (barrier_cbs_tasks != NULL) { + for (i = 0; i < n_barrier_cbs; i++) { + if (barrier_cbs_tasks[i] != NULL) { + VERBOSE_PRINTK_STRING("Stopping rcu_torture_barrier_cbs task"); + kthread_stop(barrier_cbs_tasks[i]); + barrier_cbs_tasks[i] = NULL; + } + } + kfree(barrier_cbs_tasks); + barrier_cbs_tasks = NULL; + } + if (barrier_cbs_wq != NULL) { + kfree(barrier_cbs_wq); + barrier_cbs_wq = NULL; + } +} + static int rcutorture_cpu_notify(struct notifier_block *self, unsigned long action, void *hcpu) { @@ -1602,6 +1773,7 @@ rcu_torture_cleanup(void) fullstop = FULLSTOP_RMMOD; mutex_unlock(&fullstop_mutex); unregister_reboot_notifier(&rcutorture_shutdown_nb); + rcu_torture_barrier_cleanup(); rcu_torture_stall_cleanup(); if (stutter_task) { VERBOSE_PRINTK_STRING("Stopping rcu_torture_stutter task"); @@ -1681,7 +1853,7 @@ rcu_torture_cleanup(void) if (cur_ops->cleanup) cur_ops->cleanup(); - if (atomic_read(&n_rcu_torture_error)) + if (atomic_read(&n_rcu_torture_error) || n_rcu_torture_barrier_error) rcu_torture_print_module_parms(cur_ops, "End of test: FAILURE"); else if (n_online_successes != n_online_attempts || n_offline_successes != n_offline_attempts) @@ -1697,6 +1869,7 @@ rcu_torture_init(void) int i; int cpu; int firsterr = 0; + int retval; static struct rcu_torture_ops *torture_ops[] = { &rcu_ops, &rcu_sync_ops, &rcu_expedited_ops, &rcu_bh_ops, &rcu_bh_sync_ops, &rcu_bh_expedited_ops, @@ -1754,6 +1927,7 @@ rcu_torture_init(void) atomic_set(&n_rcu_torture_free, 0); atomic_set(&n_rcu_torture_mberror, 0); atomic_set(&n_rcu_torture_error, 0); + n_rcu_torture_barrier_error = 0; n_rcu_torture_boost_ktrerror = 0; n_rcu_torture_boost_rterror = 0; n_rcu_torture_boost_failure = 0; @@ -1877,7 +2051,6 @@ rcu_torture_init(void) test_boost_duration = 2; if ((test_boost == 1 && cur_ops->can_boost) || test_boost == 2) { - int retval; boost_starttime = jiffies + test_boost_interval * HZ; register_cpu_notifier(&rcutorture_cpu_nb); @@ -1913,6 +2086,11 @@ rcu_torture_init(void) firsterr = i; goto unwind; } + retval = rcu_torture_barrier_init(); + if (retval != 0) { + firsterr = retval; + goto unwind; + } rcutorture_record_test_transition(); mutex_unlock(&fullstop_mutex); return 0; -- cgit v1.2.3 From cef50120b61c2af4ce34bc165e19cad66296f93d Mon Sep 17 00:00:00 2001 From: "Paul E. McKenney" Date: Sun, 5 Feb 2012 07:42:44 -0800 Subject: rcu: Direct algorithmic SRCU implementation The current implementation of synchronize_srcu_expedited() can cause severe OS jitter due to its use of synchronize_sched(), which in turn invokes try_stop_cpus(), which causes each CPU to be sent an IPI. This can result in severe performance degradation for real-time workloads and especially for short-interation-length HPC workloads. Furthermore, because only one instance of try_stop_cpus() can be making forward progress at a given time, only one instance of synchronize_srcu_expedited() can make forward progress at a time, even if they are all operating on distinct srcu_struct structures. This commit, inspired by an earlier implementation by Peter Zijlstra (https://lkml.org/lkml/2012/1/31/211) and by further offline discussions, takes a strictly algorithmic bits-in-memory approach. This has the disadvantage of requiring one explicit memory-barrier instruction in each of srcu_read_lock() and srcu_read_unlock(), but on the other hand completely dispenses with OS jitter and furthermore allows SRCU to be used freely by CPUs that RCU believes to be idle or offline. The update-side implementation handles the single read-side memory barrier by rechecking the per-CPU counters after summing them and by running through the update-side state machine twice. This implementation has passed moderate rcutorture testing on both x86 and Power. Also updated to use this_cpu_ptr() instead of per_cpu_ptr(), as suggested by Peter Zijlstra. Reported-by: Peter Zijlstra Signed-off-by: Paul E. McKenney Signed-off-by: Paul E. McKenney Acked-by: Peter Zijlstra Reviewed-by: Lai Jiangshan --- include/linux/srcu.h | 10 +- kernel/rcutorture.c | 2 +- kernel/srcu.c | 284 ++++++++++++++++++++++++++++++++++----------------- 3 files changed, 198 insertions(+), 98 deletions(-) diff --git a/include/linux/srcu.h b/include/linux/srcu.h index d3d5fa54f25e..a478c8eb8479 100644 --- a/include/linux/srcu.h +++ b/include/linux/srcu.h @@ -31,13 +31,19 @@ #include struct srcu_struct_array { - int c[2]; + unsigned long c[2]; }; +/* Bit definitions for field ->c above and ->snap below. */ +#define SRCU_USAGE_BITS 2 +#define SRCU_REF_MASK (ULONG_MAX >> SRCU_USAGE_BITS) +#define SRCU_USAGE_COUNT (SRCU_REF_MASK + 1) + struct srcu_struct { - int completed; + unsigned completed; struct srcu_struct_array __percpu *per_cpu_ref; struct mutex mutex; + unsigned long snap[NR_CPUS]; #ifdef CONFIG_DEBUG_LOCK_ALLOC struct lockdep_map dep_map; #endif /* #ifdef CONFIG_DEBUG_LOCK_ALLOC */ diff --git a/kernel/rcutorture.c b/kernel/rcutorture.c index 8cd262b41499..d10b179dea83 100644 --- a/kernel/rcutorture.c +++ b/kernel/rcutorture.c @@ -639,7 +639,7 @@ static int srcu_torture_stats(char *page) cnt += sprintf(&page[cnt], "%s%s per-CPU(idx=%d):", torture_type, TORTURE_FLAG, idx); for_each_possible_cpu(cpu) { - cnt += sprintf(&page[cnt], " %d(%d,%d)", cpu, + cnt += sprintf(&page[cnt], " %d(%lu,%lu)", cpu, per_cpu_ptr(srcu_ctl.per_cpu_ref, cpu)->c[!idx], per_cpu_ptr(srcu_ctl.per_cpu_ref, cpu)->c[idx]); } diff --git a/kernel/srcu.c b/kernel/srcu.c index ba35f3a4a1f4..84c9b97dc3d9 100644 --- a/kernel/srcu.c +++ b/kernel/srcu.c @@ -73,19 +73,102 @@ EXPORT_SYMBOL_GPL(init_srcu_struct); #endif /* #else #ifdef CONFIG_DEBUG_LOCK_ALLOC */ /* - * srcu_readers_active_idx -- returns approximate number of readers - * active on the specified rank of per-CPU counters. + * Returns approximate number of readers active on the specified rank + * of per-CPU counters. Also snapshots each counter's value in the + * corresponding element of sp->snap[] for later use validating + * the sum. */ +static unsigned long srcu_readers_active_idx(struct srcu_struct *sp, int idx) +{ + int cpu; + unsigned long sum = 0; + unsigned long t; -static int srcu_readers_active_idx(struct srcu_struct *sp, int idx) + for_each_possible_cpu(cpu) { + t = ACCESS_ONCE(per_cpu_ptr(sp->per_cpu_ref, cpu)->c[idx]); + sum += t; + sp->snap[cpu] = t; + } + return sum & SRCU_REF_MASK; +} + +/* + * To be called from the update side after an index flip. Returns true + * if the modulo sum of the counters is stably zero, false if there is + * some possibility of non-zero. + */ +static bool srcu_readers_active_idx_check(struct srcu_struct *sp, int idx) { int cpu; - int sum; - sum = 0; + /* + * Note that srcu_readers_active_idx() can incorrectly return + * zero even though there is a pre-existing reader throughout. + * To see this, suppose that task A is in a very long SRCU + * read-side critical section that started on CPU 0, and that + * no other reader exists, so that the modulo sum of the counters + * is equal to one. Then suppose that task B starts executing + * srcu_readers_active_idx(), summing up to CPU 1, and then that + * task C starts reading on CPU 0, so that its increment is not + * summed, but finishes reading on CPU 2, so that its decrement + * -is- summed. Then when task B completes its sum, it will + * incorrectly get zero, despite the fact that task A has been + * in its SRCU read-side critical section the whole time. + * + * We therefore do a validation step should srcu_readers_active_idx() + * return zero. + */ + if (srcu_readers_active_idx(sp, idx) != 0) + return false; + + /* + * Since the caller recently flipped ->completed, we can see at + * most one increment of each CPU's counter from this point + * forward. The reason for this is that the reader CPU must have + * fetched the index before srcu_readers_active_idx checked + * that CPU's counter, but not yet incremented its counter. + * Its eventual counter increment will follow the read in + * srcu_readers_active_idx(), and that increment is immediately + * followed by smp_mb() B. Because smp_mb() D is between + * the ->completed flip and srcu_readers_active_idx()'s read, + * that CPU's subsequent load of ->completed must see the new + * value, and therefore increment the counter in the other rank. + */ + smp_mb(); /* A */ + + /* + * Now, we check the ->snap array that srcu_readers_active_idx() + * filled in from the per-CPU counter values. Since both + * __srcu_read_lock() and __srcu_read_unlock() increment the + * upper bits of the per-CPU counter, an increment/decrement + * pair will change the value of the counter. Since there is + * only one possible increment, the only way to wrap the counter + * is to have a huge number of counter decrements, which requires + * a huge number of tasks and huge SRCU read-side critical-section + * nesting levels, even on 32-bit systems. + * + * All of the ways of confusing the readings require that the scan + * in srcu_readers_active_idx() see the read-side task's decrement, + * but not its increment. However, between that decrement and + * increment are smb_mb() B and C. Either or both of these pair + * with smp_mb() A above to ensure that the scan below will see + * the read-side tasks's increment, thus noting a difference in + * the counter values between the two passes. + * + * Therefore, if srcu_readers_active_idx() returned zero, and + * none of the counters changed, we know that the zero was the + * correct sum. + * + * Of course, it is possible that a task might be delayed + * for a very long time in __srcu_read_lock() after fetching + * the index but before incrementing its counter. This + * possibility will be dealt with in __synchronize_srcu(). + */ for_each_possible_cpu(cpu) - sum += per_cpu_ptr(sp->per_cpu_ref, cpu)->c[idx]; - return sum; + if (sp->snap[cpu] != + ACCESS_ONCE(per_cpu_ptr(sp->per_cpu_ref, cpu)->c[idx])) + return false; /* False zero reading! */ + return true; } /** @@ -131,10 +214,11 @@ int __srcu_read_lock(struct srcu_struct *sp) int idx; preempt_disable(); - idx = sp->completed & 0x1; - barrier(); /* ensure compiler looks -once- at sp->completed. */ - per_cpu_ptr(sp->per_cpu_ref, smp_processor_id())->c[idx]++; - srcu_barrier(); /* ensure compiler won't misorder critical section. */ + idx = rcu_dereference_index_check(sp->completed, + rcu_read_lock_sched_held()) & 0x1; + ACCESS_ONCE(this_cpu_ptr(sp->per_cpu_ref)->c[idx]) += + SRCU_USAGE_COUNT + 1; + smp_mb(); /* B */ /* Avoid leaking the critical section. */ preempt_enable(); return idx; } @@ -149,8 +233,9 @@ EXPORT_SYMBOL_GPL(__srcu_read_lock); void __srcu_read_unlock(struct srcu_struct *sp, int idx) { preempt_disable(); - srcu_barrier(); /* ensure compiler won't misorder critical section. */ - per_cpu_ptr(sp->per_cpu_ref, smp_processor_id())->c[idx]--; + smp_mb(); /* C */ /* Avoid leaking the critical section. */ + ACCESS_ONCE(this_cpu_ptr(sp->per_cpu_ref)->c[idx]) += + SRCU_USAGE_COUNT - 1; preempt_enable(); } EXPORT_SYMBOL_GPL(__srcu_read_unlock); @@ -163,12 +248,65 @@ EXPORT_SYMBOL_GPL(__srcu_read_unlock); * we repeatedly block for 1-millisecond time periods. This approach * has done well in testing, so there is no need for a config parameter. */ -#define SYNCHRONIZE_SRCU_READER_DELAY 10 +#define SYNCHRONIZE_SRCU_READER_DELAY 5 + +/* + * Flip the readers' index by incrementing ->completed, then wait + * until there are no more readers using the counters referenced by + * the old index value. (Recall that the index is the bottom bit + * of ->completed.) + * + * Of course, it is possible that a reader might be delayed for the + * full duration of flip_idx_and_wait() between fetching the + * index and incrementing its counter. This possibility is handled + * by __synchronize_srcu() invoking flip_idx_and_wait() twice. + */ +static void flip_idx_and_wait(struct srcu_struct *sp, bool expedited) +{ + int idx; + int trycount = 0; + + idx = sp->completed++ & 0x1; + + /* + * If a reader fetches the index before the above increment, + * but increments its counter after srcu_readers_active_idx_check() + * sums it, then smp_mb() D will pair with __srcu_read_lock()'s + * smp_mb() B to ensure that the SRCU read-side critical section + * will see any updates that the current task performed before its + * call to synchronize_srcu(), or to synchronize_srcu_expedited(), + * as the case may be. + */ + smp_mb(); /* D */ + + /* + * SRCU read-side critical sections are normally short, so wait + * a small amount of time before possibly blocking. + */ + if (!srcu_readers_active_idx_check(sp, idx)) { + udelay(SYNCHRONIZE_SRCU_READER_DELAY); + while (!srcu_readers_active_idx_check(sp, idx)) { + if (expedited && ++ trycount < 10) + udelay(SYNCHRONIZE_SRCU_READER_DELAY); + else + schedule_timeout_interruptible(1); + } + } + + /* + * The following smp_mb() E pairs with srcu_read_unlock()'s + * smp_mb C to ensure that if srcu_readers_active_idx_check() + * sees srcu_read_unlock()'s counter decrement, then any + * of the current task's subsequent code will happen after + * that SRCU read-side critical section. + */ + smp_mb(); /* E */ +} /* * Helper function for synchronize_srcu() and synchronize_srcu_expedited(). */ -static void __synchronize_srcu(struct srcu_struct *sp, void (*sync_func)(void)) +static void __synchronize_srcu(struct srcu_struct *sp, bool expedited) { int idx; @@ -178,90 +316,53 @@ static void __synchronize_srcu(struct srcu_struct *sp, void (*sync_func)(void)) !lock_is_held(&rcu_sched_lock_map), "Illegal synchronize_srcu() in same-type SRCU (or RCU) read-side critical section"); - idx = sp->completed; + smp_mb(); /* Ensure prior action happens before grace period. */ + idx = ACCESS_ONCE(sp->completed); + smp_mb(); /* Access to ->completed before lock acquisition. */ mutex_lock(&sp->mutex); /* * Check to see if someone else did the work for us while we were - * waiting to acquire the lock. We need -two- advances of + * waiting to acquire the lock. We need -three- advances of * the counter, not just one. If there was but one, we might have * shown up -after- our helper's first synchronize_sched(), thus * having failed to prevent CPU-reordering races with concurrent - * srcu_read_unlock()s on other CPUs (see comment below). So we - * either (1) wait for two or (2) supply the second ourselves. + * srcu_read_unlock()s on other CPUs (see comment below). If there + * was only two, we are guaranteed to have waited through only one + * full index-flip phase. So we either (1) wait for three or + * (2) supply the additional ones we need. */ - if ((sp->completed - idx) >= 2) { + if (sp->completed == idx + 2) + idx = 1; + else if (sp->completed == idx + 3) { mutex_unlock(&sp->mutex); return; - } - - sync_func(); /* Force memory barrier on all CPUs. */ + } else + idx = 0; /* - * The preceding synchronize_sched() ensures that any CPU that - * sees the new value of sp->completed will also see any preceding - * changes to data structures made by this CPU. This prevents - * some other CPU from reordering the accesses in its SRCU - * read-side critical section to precede the corresponding - * srcu_read_lock() -- ensuring that such references will in - * fact be protected. + * If there were no helpers, then we need to do two flips of + * the index. The first flip is required if there are any + * outstanding SRCU readers even if there are no new readers + * running concurrently with the first counter flip. * - * So it is now safe to do the flip. - */ - - idx = sp->completed & 0x1; - sp->completed++; - - sync_func(); /* Force memory barrier on all CPUs. */ - - /* - * At this point, because of the preceding synchronize_sched(), - * all srcu_read_lock() calls using the old counters have completed. - * Their corresponding critical sections might well be still - * executing, but the srcu_read_lock() primitives themselves - * will have finished executing. We initially give readers - * an arbitrarily chosen 10 microseconds to get out of their - * SRCU read-side critical sections, then loop waiting 1/HZ - * seconds per iteration. The 10-microsecond value has done - * very well in testing. - */ - - if (srcu_readers_active_idx(sp, idx)) - udelay(SYNCHRONIZE_SRCU_READER_DELAY); - while (srcu_readers_active_idx(sp, idx)) - schedule_timeout_interruptible(1); - - sync_func(); /* Force memory barrier on all CPUs. */ - - /* - * The preceding synchronize_sched() forces all srcu_read_unlock() - * primitives that were executing concurrently with the preceding - * for_each_possible_cpu() loop to have completed by this point. - * More importantly, it also forces the corresponding SRCU read-side - * critical sections to have also completed, and the corresponding - * references to SRCU-protected data items to be dropped. + * The second flip is required when a new reader picks up + * the old value of the index, but does not increment its + * counter until after its counters is summed/rechecked by + * srcu_readers_active_idx_check(). In this case, the current SRCU + * grace period would be OK because the SRCU read-side critical + * section started after this SRCU grace period started, so the + * grace period is not required to wait for the reader. * - * Note: - * - * Despite what you might think at first glance, the - * preceding synchronize_sched() -must- be within the - * critical section ended by the following mutex_unlock(). - * Otherwise, a task taking the early exit can race - * with a srcu_read_unlock(), which might have executed - * just before the preceding srcu_readers_active() check, - * and whose CPU might have reordered the srcu_read_unlock() - * with the preceding critical section. In this case, there - * is nothing preventing the synchronize_sched() task that is - * taking the early exit from freeing a data structure that - * is still being referenced (out of order) by the task - * doing the srcu_read_unlock(). - * - * Alternatively, the comparison with "2" on the early exit - * could be changed to "3", but this increases synchronize_srcu() - * latency for bulk loads. So the current code is preferred. + * However, the next SRCU grace period would be waiting for the + * other set of counters to go to zero, and therefore would not + * wait for the reader, which would be very bad. To avoid this + * bad scenario, we flip and wait twice, clearing out both sets + * of counters. */ - + for (; idx < 2; idx++) + flip_idx_and_wait(sp, expedited); mutex_unlock(&sp->mutex); } @@ -281,7 +382,7 @@ static void __synchronize_srcu(struct srcu_struct *sp, void (*sync_func)(void)) */ void synchronize_srcu(struct srcu_struct *sp) { - __synchronize_srcu(sp, synchronize_sched); + __synchronize_srcu(sp, 0); } EXPORT_SYMBOL_GPL(synchronize_srcu); @@ -289,18 +390,11 @@ EXPORT_SYMBOL_GPL(synchronize_srcu); * synchronize_srcu_expedited - Brute-force SRCU grace period * @sp: srcu_struct with which to synchronize. * - * Wait for an SRCU grace period to elapse, but use a "big hammer" - * approach to force the grace period to end quickly. This consumes - * significant time on all CPUs and is unfriendly to real-time workloads, - * so is thus not recommended for any sort of common-case code. In fact, - * if you are using synchronize_srcu_expedited() in a loop, please - * restructure your code to batch your updates, and then use a single - * synchronize_srcu() instead. + * Wait for an SRCU grace period to elapse, but be more aggressive about + * spinning rather than blocking when waiting. * * Note that it is illegal to call this function while holding any lock - * that is acquired by a CPU-hotplug notifier. And yes, it is also illegal - * to call this function from a CPU-hotplug notifier. Failing to observe - * these restriction will result in deadlock. It is also illegal to call + * that is acquired by a CPU-hotplug notifier. It is also illegal to call * synchronize_srcu_expedited() from the corresponding SRCU read-side * critical section; doing so will result in deadlock. However, it is * perfectly legal to call synchronize_srcu_expedited() on one srcu_struct @@ -309,7 +403,7 @@ EXPORT_SYMBOL_GPL(synchronize_srcu); */ void synchronize_srcu_expedited(struct srcu_struct *sp) { - __synchronize_srcu(sp, synchronize_sched_expedited); + __synchronize_srcu(sp, 1); } EXPORT_SYMBOL_GPL(synchronize_srcu_expedited); -- cgit v1.2.3 From 4b7a3e9e32114a09c61995048f055615b5d4c26d Mon Sep 17 00:00:00 2001 From: Lai Jiangshan Date: Wed, 22 Feb 2012 13:06:51 -0800 Subject: rcu: Remove fast check path from __synchronize_srcu() The fastpath in __synchronize_srcu() is designed to handle cases where there are a large number of concurrent calls for the same srcu_struct structure. However, the Linux kernel currently does not use SRCU in this manner, so remove the fastpath checks for simplicity. Signed-off-by: Lai Jiangshan Signed-off-by: Paul E. McKenney --- kernel/srcu.c | 25 +------------------------ 1 file changed, 1 insertion(+), 24 deletions(-) diff --git a/kernel/srcu.c b/kernel/srcu.c index 84c9b97dc3d9..17e95bcc901c 100644 --- a/kernel/srcu.c +++ b/kernel/srcu.c @@ -308,7 +308,7 @@ static void flip_idx_and_wait(struct srcu_struct *sp, bool expedited) */ static void __synchronize_srcu(struct srcu_struct *sp, bool expedited) { - int idx; + int idx = 0; rcu_lockdep_assert(!lock_is_held(&sp->dep_map) && !lock_is_held(&rcu_bh_lock_map) && @@ -316,31 +316,8 @@ static void __synchronize_srcu(struct srcu_struct *sp, bool expedited) !lock_is_held(&rcu_sched_lock_map), "Illegal synchronize_srcu() in same-type SRCU (or RCU) read-side critical section"); - smp_mb(); /* Ensure prior action happens before grace period. */ - idx = ACCESS_ONCE(sp->completed); - smp_mb(); /* Access to ->completed before lock acquisition. */ mutex_lock(&sp->mutex); - /* - * Check to see if someone else did the work for us while we were - * waiting to acquire the lock. We need -three- advances of - * the counter, not just one. If there was but one, we might have - * shown up -after- our helper's first synchronize_sched(), thus - * having failed to prevent CPU-reordering races with concurrent - * srcu_read_unlock()s on other CPUs (see comment below). If there - * was only two, we are guaranteed to have waited through only one - * full index-flip phase. So we either (1) wait for three or - * (2) supply the additional ones we need. - */ - - if (sp->completed == idx + 2) - idx = 1; - else if (sp->completed == idx + 3) { - mutex_unlock(&sp->mutex); - return; - } else - idx = 0; - /* * If there were no helpers, then we need to do two flips of * the index. The first flip is required if there are any -- cgit v1.2.3 From 440253c17fc4ed41d778492a7fb44dc0d756eccc Mon Sep 17 00:00:00 2001 From: Lai Jiangshan Date: Wed, 22 Feb 2012 13:29:06 -0800 Subject: rcu: Increment upper bit only for srcu_read_lock() The purpose of the upper bit of SRCU's per-CPU counters is to guarantee that no reasonable series of srcu_read_lock() and srcu_read_unlock() operations can return the value of the counter to its original value. This guarantee is require only after the index has been switched to the other set of counters, so at most one srcu_read_lock() can affect a given CPU's counter. The number of srcu_read_unlock() operations on a given counter is limited to the number of tasks in the system, which given the Linux kernel's current structure is limited to far less than 2^30 on 32-bit systems and far less than 2^62 on 64-bit systems. (Something about a limited number of bytes in the kernel's address space.) Therefore, if srcu_read_lock() increments the upper bits, then srcu_read_unlock() need not do so. In this case, an srcu_read_lock() and an srcu_read_unlock() will flip the lower bit of the upper field of the counter. An unreasonably large additional number of srcu_read_unlock() operations would be required to return the counter to its initial value, thus preserving the guarantee. This commit takes this approach, which further allows it to shrink the size of the upper field to one bit, making the number of srcu_read_unlock() operations required to return the counter to its initial value even more unreasonable than before. Signed-off-by: Lai Jiangshan Signed-off-by: Paul E. McKenney --- include/linux/srcu.h | 2 +- kernel/srcu.c | 19 +++++++++---------- 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/include/linux/srcu.h b/include/linux/srcu.h index a478c8eb8479..5b49d41868c8 100644 --- a/include/linux/srcu.h +++ b/include/linux/srcu.h @@ -35,7 +35,7 @@ struct srcu_struct_array { }; /* Bit definitions for field ->c above and ->snap below. */ -#define SRCU_USAGE_BITS 2 +#define SRCU_USAGE_BITS 1 #define SRCU_REF_MASK (ULONG_MAX >> SRCU_USAGE_BITS) #define SRCU_USAGE_COUNT (SRCU_REF_MASK + 1) diff --git a/kernel/srcu.c b/kernel/srcu.c index 17e95bcc901c..43f1d61e513e 100644 --- a/kernel/srcu.c +++ b/kernel/srcu.c @@ -138,14 +138,14 @@ static bool srcu_readers_active_idx_check(struct srcu_struct *sp, int idx) /* * Now, we check the ->snap array that srcu_readers_active_idx() - * filled in from the per-CPU counter values. Since both - * __srcu_read_lock() and __srcu_read_unlock() increment the - * upper bits of the per-CPU counter, an increment/decrement - * pair will change the value of the counter. Since there is - * only one possible increment, the only way to wrap the counter - * is to have a huge number of counter decrements, which requires - * a huge number of tasks and huge SRCU read-side critical-section - * nesting levels, even on 32-bit systems. + * filled in from the per-CPU counter values. Since + * __srcu_read_lock() increments the upper bits of the per-CPU + * counter, an increment/decrement pair will change the value + * of the counter. Since there is only one possible increment, + * the only way to wrap the counter is to have a huge number of + * counter decrements, which requires a huge number of tasks and + * huge SRCU read-side critical-section nesting levels, even on + * 32-bit systems. * * All of the ways of confusing the readings require that the scan * in srcu_readers_active_idx() see the read-side task's decrement, @@ -234,8 +234,7 @@ void __srcu_read_unlock(struct srcu_struct *sp, int idx) { preempt_disable(); smp_mb(); /* C */ /* Avoid leaking the critical section. */ - ACCESS_ONCE(this_cpu_ptr(sp->per_cpu_ref)->c[idx]) += - SRCU_USAGE_COUNT - 1; + ACCESS_ONCE(this_cpu_ptr(sp->per_cpu_ref)->c[idx]) -= 1; preempt_enable(); } EXPORT_SYMBOL_GPL(__srcu_read_unlock); -- cgit v1.2.3 From 944ce9af4767ca085d465e4add69df11a8faa9ef Mon Sep 17 00:00:00 2001 From: Lai Jiangshan Date: Wed, 22 Feb 2012 16:43:55 -0800 Subject: rcu: Flip ->completed only once per SRCU grace period This is an optimization of the SRCU grace period. To guard against preempted readers with old values of the counter, it suffices to scan the old counters once more, then flip ->completed only one time. The reason this works is that the old readers must have incremented the old set of counters (if they have not yet incremented, then their critical section starts after this grace period, so they may be safely ignored). This commit therefore optimizes the second flip out in favor of a simple rescan. Signed-off-by: Lai Jiangshan Signed-off-by: Paul E. McKenney --- kernel/srcu.c | 92 ++++++++++++++++++++++++++++++++++++----------------------- 1 file changed, 56 insertions(+), 36 deletions(-) diff --git a/kernel/srcu.c b/kernel/srcu.c index 43f1d61e513e..b6b9ea2eb51c 100644 --- a/kernel/srcu.c +++ b/kernel/srcu.c @@ -249,26 +249,12 @@ EXPORT_SYMBOL_GPL(__srcu_read_unlock); */ #define SYNCHRONIZE_SRCU_READER_DELAY 5 -/* - * Flip the readers' index by incrementing ->completed, then wait - * until there are no more readers using the counters referenced by - * the old index value. (Recall that the index is the bottom bit - * of ->completed.) - * - * Of course, it is possible that a reader might be delayed for the - * full duration of flip_idx_and_wait() between fetching the - * index and incrementing its counter. This possibility is handled - * by __synchronize_srcu() invoking flip_idx_and_wait() twice. - */ -static void flip_idx_and_wait(struct srcu_struct *sp, bool expedited) +static void wait_idx(struct srcu_struct *sp, int idx, bool expedited) { - int idx; int trycount = 0; - idx = sp->completed++ & 0x1; - /* - * If a reader fetches the index before the above increment, + * If a reader fetches the index before the ->completed increment, * but increments its counter after srcu_readers_active_idx_check() * sums it, then smp_mb() D will pair with __srcu_read_lock()'s * smp_mb() B to ensure that the SRCU read-side critical section @@ -298,17 +284,38 @@ static void flip_idx_and_wait(struct srcu_struct *sp, bool expedited) * sees srcu_read_unlock()'s counter decrement, then any * of the current task's subsequent code will happen after * that SRCU read-side critical section. + * + * It also ensures the order between the above waiting and + * the next flipping. */ smp_mb(); /* E */ } +/* + * Flip the readers' index by incrementing ->completed, then wait + * until there are no more readers using the counters referenced by + * the old index value. (Recall that the index is the bottom bit + * of ->completed.) + * + * Of course, it is possible that a reader might be delayed for the + * full duration of flip_idx_and_wait() between fetching the + * index and incrementing its counter. This possibility is handled + * by the next __synchronize_srcu() invoking wait_idx() for such readers + * before starting a new grace period. + */ +static void flip_idx_and_wait(struct srcu_struct *sp, bool expedited) +{ + int idx; + + idx = sp->completed++ & 0x1; + wait_idx(sp, idx, expedited); +} + /* * Helper function for synchronize_srcu() and synchronize_srcu_expedited(). */ static void __synchronize_srcu(struct srcu_struct *sp, bool expedited) { - int idx = 0; - rcu_lockdep_assert(!lock_is_held(&sp->dep_map) && !lock_is_held(&rcu_bh_lock_map) && !lock_is_held(&rcu_lock_map) && @@ -318,27 +325,40 @@ static void __synchronize_srcu(struct srcu_struct *sp, bool expedited) mutex_lock(&sp->mutex); /* - * If there were no helpers, then we need to do two flips of - * the index. The first flip is required if there are any - * outstanding SRCU readers even if there are no new readers - * running concurrently with the first counter flip. + * Suppose that during the previous grace period, a reader + * picked up the old value of the index, but did not increment + * its counter until after the previous instance of + * __synchronize_srcu() did the counter summation and recheck. + * That previous grace period was OK because the reader did + * not start until after the grace period started, so the grace + * period was not obligated to wait for that reader. + * + * However, the current SRCU grace period does have to wait for + * that reader. This is handled by invoking wait_idx() on the + * non-active set of counters (hence sp->completed - 1). Once + * wait_idx() returns, we know that all readers that picked up + * the old value of ->completed and that already incremented their + * counter will have completed. * - * The second flip is required when a new reader picks up - * the old value of the index, but does not increment its - * counter until after its counters is summed/rechecked by - * srcu_readers_active_idx_check(). In this case, the current SRCU - * grace period would be OK because the SRCU read-side critical - * section started after this SRCU grace period started, so the - * grace period is not required to wait for the reader. + * But what about readers that picked up the old value of + * ->completed, but -still- have not managed to increment their + * counter? We do not need to wait for those readers, because + * they will have started their SRCU read-side critical section + * after the current grace period starts. * - * However, the next SRCU grace period would be waiting for the - * other set of counters to go to zero, and therefore would not - * wait for the reader, which would be very bad. To avoid this - * bad scenario, we flip and wait twice, clearing out both sets - * of counters. + * Because it is unlikely that readers will be preempted between + * fetching ->completed and incrementing their counter, wait_idx() + * will normally not need to wait. */ - for (; idx < 2; idx++) - flip_idx_and_wait(sp, expedited); + wait_idx(sp, (sp->completed - 1) & 0x1, expedited); + + /* + * Now that wait_idx() has waited for the really old readers, + * invoke flip_idx_and_wait() to flip the counter and wait + * for current SRCU readers. + */ + flip_idx_and_wait(sp, expedited); + mutex_unlock(&sp->mutex); } -- cgit v1.2.3 From 18108ebfebe9e871d0a9af830baf8f5df69eb5fc Mon Sep 17 00:00:00 2001 From: Lai Jiangshan Date: Mon, 27 Feb 2012 09:28:10 -0800 Subject: rcu: Improve SRCU's wait_idx() comments The safety of SRCU is provided byy wait_idx() rather than flipping. The flipping actually prevents starvation. This commit therefore updates the comments to more accurately and precisely describe what is going on. Signed-off-by: Lai Jiangshan Signed-off-by: Paul E. McKenney --- kernel/srcu.c | 77 ++++++++++++++++++++++++++++------------------------------- 1 file changed, 37 insertions(+), 40 deletions(-) diff --git a/kernel/srcu.c b/kernel/srcu.c index b6b9ea2eb51c..1fecb4d858ed 100644 --- a/kernel/srcu.c +++ b/kernel/srcu.c @@ -249,6 +249,10 @@ EXPORT_SYMBOL_GPL(__srcu_read_unlock); */ #define SYNCHRONIZE_SRCU_READER_DELAY 5 +/* + * Wait until all pre-existing readers complete. Such readers + * will have used the index specified by "idx". + */ static void wait_idx(struct srcu_struct *sp, int idx, bool expedited) { int trycount = 0; @@ -291,24 +295,9 @@ static void wait_idx(struct srcu_struct *sp, int idx, bool expedited) smp_mb(); /* E */ } -/* - * Flip the readers' index by incrementing ->completed, then wait - * until there are no more readers using the counters referenced by - * the old index value. (Recall that the index is the bottom bit - * of ->completed.) - * - * Of course, it is possible that a reader might be delayed for the - * full duration of flip_idx_and_wait() between fetching the - * index and incrementing its counter. This possibility is handled - * by the next __synchronize_srcu() invoking wait_idx() for such readers - * before starting a new grace period. - */ -static void flip_idx_and_wait(struct srcu_struct *sp, bool expedited) +static void srcu_flip(struct srcu_struct *sp) { - int idx; - - idx = sp->completed++ & 0x1; - wait_idx(sp, idx, expedited); + sp->completed++; } /* @@ -316,6 +305,8 @@ static void flip_idx_and_wait(struct srcu_struct *sp, bool expedited) */ static void __synchronize_srcu(struct srcu_struct *sp, bool expedited) { + int busy_idx; + rcu_lockdep_assert(!lock_is_held(&sp->dep_map) && !lock_is_held(&rcu_bh_lock_map) && !lock_is_held(&rcu_lock_map) && @@ -323,8 +314,28 @@ static void __synchronize_srcu(struct srcu_struct *sp, bool expedited) "Illegal synchronize_srcu() in same-type SRCU (or RCU) read-side critical section"); mutex_lock(&sp->mutex); + busy_idx = sp->completed & 0X1UL; /* + * If we recently flipped the index, there will be some readers + * using idx=0 and others using idx=1. Therefore, two calls to + * wait_idx()s suffice to ensure that all pre-existing readers + * have completed: + * + * __synchronize_srcu() { + * wait_idx(sp, 0, expedited); + * wait_idx(sp, 1, expedited); + * } + * + * Starvation is prevented by the fact that we flip the index. + * While we wait on one index to clear out, almost all new readers + * will be using the other index. The number of new readers using the + * index we are waiting on is sharply bounded by roughly the number + * of CPUs. + * + * How can new readers possibly using the old pre-flip value of + * the index? Consider the following sequence of events: + * * Suppose that during the previous grace period, a reader * picked up the old value of the index, but did not increment * its counter until after the previous instance of @@ -333,31 +344,17 @@ static void __synchronize_srcu(struct srcu_struct *sp, bool expedited) * not start until after the grace period started, so the grace * period was not obligated to wait for that reader. * - * However, the current SRCU grace period does have to wait for - * that reader. This is handled by invoking wait_idx() on the - * non-active set of counters (hence sp->completed - 1). Once - * wait_idx() returns, we know that all readers that picked up - * the old value of ->completed and that already incremented their - * counter will have completed. - * - * But what about readers that picked up the old value of - * ->completed, but -still- have not managed to increment their - * counter? We do not need to wait for those readers, because - * they will have started their SRCU read-side critical section - * after the current grace period starts. - * - * Because it is unlikely that readers will be preempted between - * fetching ->completed and incrementing their counter, wait_idx() - * will normally not need to wait. + * However, this sequence of events is quite improbable, so + * this call to wait_idx(), which waits on really old readers + * describe in this comment above, will almost never need to wait. */ - wait_idx(sp, (sp->completed - 1) & 0x1, expedited); + wait_idx(sp, 1 - busy_idx, expedited); - /* - * Now that wait_idx() has waited for the really old readers, - * invoke flip_idx_and_wait() to flip the counter and wait - * for current SRCU readers. - */ - flip_idx_and_wait(sp, expedited); + /* Flip the index to avoid reader-induced starvation. */ + srcu_flip(sp); + + /* Wait for recent pre-existing readers. */ + wait_idx(sp, busy_idx, expedited); mutex_unlock(&sp->mutex); } -- cgit v1.2.3 From b52ce066c55a6a53cf1f8d71308d74f908e31b99 Mon Sep 17 00:00:00 2001 From: Lai Jiangshan Date: Mon, 27 Feb 2012 09:29:09 -0800 Subject: rcu: Implement a variant of Peter's SRCU algorithm This commit implements a variant of Peter's algorithm, which may be found at https://lkml.org/lkml/2012/2/1/119. o Make the checking lock-free to enable parallel checking. Parallel checking is required when (1) the original checking task is preempted for a long time, (2) sychronize_srcu_expedited() starts during an ongoing SRCU grace period, or (3) we wish to avoid acquiring a lock. o Since the checking is lock-free, we avoid a mutex in state machine for call_srcu(). o Remove the SRCU_REF_MASK and remove the coupling with the flipping. This might allow us to remove the preempt_disable() in future versions, though such removal will need great care because it rescinds the one-old-reader-per-CPU guarantee. o Remove a smp_mb(), simplify the comments and make the smp_mb() pairs more intuitive. Inspired-by: Peter Zijlstra Signed-off-by: Lai Jiangshan Signed-off-by: Paul E. McKenney --- include/linux/srcu.h | 7 +-- kernel/srcu.c | 149 ++++++++++++++++++++++++--------------------------- 2 files changed, 70 insertions(+), 86 deletions(-) diff --git a/include/linux/srcu.h b/include/linux/srcu.h index 5b49d41868c8..15354db3e865 100644 --- a/include/linux/srcu.h +++ b/include/linux/srcu.h @@ -32,18 +32,13 @@ struct srcu_struct_array { unsigned long c[2]; + unsigned long seq[2]; }; -/* Bit definitions for field ->c above and ->snap below. */ -#define SRCU_USAGE_BITS 1 -#define SRCU_REF_MASK (ULONG_MAX >> SRCU_USAGE_BITS) -#define SRCU_USAGE_COUNT (SRCU_REF_MASK + 1) - struct srcu_struct { unsigned completed; struct srcu_struct_array __percpu *per_cpu_ref; struct mutex mutex; - unsigned long snap[NR_CPUS]; #ifdef CONFIG_DEBUG_LOCK_ALLOC struct lockdep_map dep_map; #endif /* #ifdef CONFIG_DEBUG_LOCK_ALLOC */ diff --git a/kernel/srcu.c b/kernel/srcu.c index 1fecb4d858ed..e0139a274856 100644 --- a/kernel/srcu.c +++ b/kernel/srcu.c @@ -72,11 +72,26 @@ EXPORT_SYMBOL_GPL(init_srcu_struct); #endif /* #else #ifdef CONFIG_DEBUG_LOCK_ALLOC */ +/* + * Returns approximate total of the readers' ->seq[] values for the + * rank of per-CPU counters specified by idx. + */ +static unsigned long srcu_readers_seq_idx(struct srcu_struct *sp, int idx) +{ + int cpu; + unsigned long sum = 0; + unsigned long t; + + for_each_possible_cpu(cpu) { + t = ACCESS_ONCE(per_cpu_ptr(sp->per_cpu_ref, cpu)->seq[idx]); + sum += t; + } + return sum; +} + /* * Returns approximate number of readers active on the specified rank - * of per-CPU counters. Also snapshots each counter's value in the - * corresponding element of sp->snap[] for later use validating - * the sum. + * of the per-CPU ->c[] counters. */ static unsigned long srcu_readers_active_idx(struct srcu_struct *sp, int idx) { @@ -87,26 +102,45 @@ static unsigned long srcu_readers_active_idx(struct srcu_struct *sp, int idx) for_each_possible_cpu(cpu) { t = ACCESS_ONCE(per_cpu_ptr(sp->per_cpu_ref, cpu)->c[idx]); sum += t; - sp->snap[cpu] = t; } - return sum & SRCU_REF_MASK; + return sum; } /* - * To be called from the update side after an index flip. Returns true - * if the modulo sum of the counters is stably zero, false if there is - * some possibility of non-zero. + * Return true if the number of pre-existing readers is determined to + * be stably zero. An example unstable zero can occur if the call + * to srcu_readers_active_idx() misses an __srcu_read_lock() increment, + * but due to task migration, sees the corresponding __srcu_read_unlock() + * decrement. This can happen because srcu_readers_active_idx() takes + * time to sum the array, and might in fact be interrupted or preempted + * partway through the summation. */ static bool srcu_readers_active_idx_check(struct srcu_struct *sp, int idx) { - int cpu; + unsigned long seq; + + seq = srcu_readers_seq_idx(sp, idx); + + /* + * The following smp_mb() A pairs with the smp_mb() B located in + * __srcu_read_lock(). This pairing ensures that if an + * __srcu_read_lock() increments its counter after the summation + * in srcu_readers_active_idx(), then the corresponding SRCU read-side + * critical section will see any changes made prior to the start + * of the current SRCU grace period. + * + * Also, if the above call to srcu_readers_seq_idx() saw the + * increment of ->seq[], then the call to srcu_readers_active_idx() + * must see the increment of ->c[]. + */ + smp_mb(); /* A */ /* * Note that srcu_readers_active_idx() can incorrectly return * zero even though there is a pre-existing reader throughout. * To see this, suppose that task A is in a very long SRCU * read-side critical section that started on CPU 0, and that - * no other reader exists, so that the modulo sum of the counters + * no other reader exists, so that the sum of the counters * is equal to one. Then suppose that task B starts executing * srcu_readers_active_idx(), summing up to CPU 1, and then that * task C starts reading on CPU 0, so that its increment is not @@ -122,53 +156,31 @@ static bool srcu_readers_active_idx_check(struct srcu_struct *sp, int idx) return false; /* - * Since the caller recently flipped ->completed, we can see at - * most one increment of each CPU's counter from this point - * forward. The reason for this is that the reader CPU must have - * fetched the index before srcu_readers_active_idx checked - * that CPU's counter, but not yet incremented its counter. - * Its eventual counter increment will follow the read in - * srcu_readers_active_idx(), and that increment is immediately - * followed by smp_mb() B. Because smp_mb() D is between - * the ->completed flip and srcu_readers_active_idx()'s read, - * that CPU's subsequent load of ->completed must see the new - * value, and therefore increment the counter in the other rank. - */ - smp_mb(); /* A */ - - /* - * Now, we check the ->snap array that srcu_readers_active_idx() - * filled in from the per-CPU counter values. Since - * __srcu_read_lock() increments the upper bits of the per-CPU - * counter, an increment/decrement pair will change the value - * of the counter. Since there is only one possible increment, - * the only way to wrap the counter is to have a huge number of - * counter decrements, which requires a huge number of tasks and - * huge SRCU read-side critical-section nesting levels, even on - * 32-bit systems. + * The remainder of this function is the validation step. + * The following smp_mb() D pairs with the smp_mb() C in + * __srcu_read_unlock(). If the __srcu_read_unlock() was seen + * by srcu_readers_active_idx() above, then any destructive + * operation performed after the grace period will happen after + * the corresponding SRCU read-side critical section. * - * All of the ways of confusing the readings require that the scan - * in srcu_readers_active_idx() see the read-side task's decrement, - * but not its increment. However, between that decrement and - * increment are smb_mb() B and C. Either or both of these pair - * with smp_mb() A above to ensure that the scan below will see - * the read-side tasks's increment, thus noting a difference in - * the counter values between the two passes. - * - * Therefore, if srcu_readers_active_idx() returned zero, and - * none of the counters changed, we know that the zero was the - * correct sum. - * - * Of course, it is possible that a task might be delayed - * for a very long time in __srcu_read_lock() after fetching - * the index but before incrementing its counter. This - * possibility will be dealt with in __synchronize_srcu(). + * Note that there can be at most NR_CPUS worth of readers using + * the old index, which is not enough to overflow even a 32-bit + * integer. (Yes, this does mean that systems having more than + * a billion or so CPUs need to be 64-bit systems.) Therefore, + * the sum of the ->seq[] counters cannot possibly overflow. + * Therefore, the only way that the return values of the two + * calls to srcu_readers_seq_idx() can be equal is if there were + * no increments of the corresponding rank of ->seq[] counts + * in the interim. But the missed-increment scenario laid out + * above includes an increment of the ->seq[] counter by + * the corresponding __srcu_read_lock(). Therefore, if this + * scenario occurs, the return values from the two calls to + * srcu_readers_seq_idx() will differ, and thus the validation + * step below suffices. */ - for_each_possible_cpu(cpu) - if (sp->snap[cpu] != - ACCESS_ONCE(per_cpu_ptr(sp->per_cpu_ref, cpu)->c[idx])) - return false; /* False zero reading! */ - return true; + smp_mb(); /* D */ + + return srcu_readers_seq_idx(sp, idx) == seq; } /** @@ -216,9 +228,9 @@ int __srcu_read_lock(struct srcu_struct *sp) preempt_disable(); idx = rcu_dereference_index_check(sp->completed, rcu_read_lock_sched_held()) & 0x1; - ACCESS_ONCE(this_cpu_ptr(sp->per_cpu_ref)->c[idx]) += - SRCU_USAGE_COUNT + 1; + ACCESS_ONCE(this_cpu_ptr(sp->per_cpu_ref)->c[idx]) += 1; smp_mb(); /* B */ /* Avoid leaking the critical section. */ + ACCESS_ONCE(this_cpu_ptr(sp->per_cpu_ref)->seq[idx]) += 1; preempt_enable(); return idx; } @@ -257,17 +269,6 @@ static void wait_idx(struct srcu_struct *sp, int idx, bool expedited) { int trycount = 0; - /* - * If a reader fetches the index before the ->completed increment, - * but increments its counter after srcu_readers_active_idx_check() - * sums it, then smp_mb() D will pair with __srcu_read_lock()'s - * smp_mb() B to ensure that the SRCU read-side critical section - * will see any updates that the current task performed before its - * call to synchronize_srcu(), or to synchronize_srcu_expedited(), - * as the case may be. - */ - smp_mb(); /* D */ - /* * SRCU read-side critical sections are normally short, so wait * a small amount of time before possibly blocking. @@ -281,18 +282,6 @@ static void wait_idx(struct srcu_struct *sp, int idx, bool expedited) schedule_timeout_interruptible(1); } } - - /* - * The following smp_mb() E pairs with srcu_read_unlock()'s - * smp_mb C to ensure that if srcu_readers_active_idx_check() - * sees srcu_read_unlock()'s counter decrement, then any - * of the current task's subsequent code will happen after - * that SRCU read-side critical section. - * - * It also ensures the order between the above waiting and - * the next flipping. - */ - smp_mb(); /* E */ } static void srcu_flip(struct srcu_struct *sp) -- cgit v1.2.3 From 966f58c2f6df826f385706673a9bb1edcfd3499a Mon Sep 17 00:00:00 2001 From: Lai Jiangshan Date: Tue, 6 Mar 2012 17:57:33 +0800 Subject: rcu: Remove unused srcu_barrier() The old srcu_barrier() macro is now unused. This commit removes it so that it may be used for the SRCU flavor of rcu_barrier(), which will in turn be needed to allow the upcoming call_srcu() to be used from within modules. Signed-off-by: Lai Jiangshan Signed-off-by: Paul E. McKenney --- include/linux/srcu.h | 6 ------ 1 file changed, 6 deletions(-) diff --git a/include/linux/srcu.h b/include/linux/srcu.h index 15354db3e865..e5ce80452b62 100644 --- a/include/linux/srcu.h +++ b/include/linux/srcu.h @@ -44,12 +44,6 @@ struct srcu_struct { #endif /* #ifdef CONFIG_DEBUG_LOCK_ALLOC */ }; -#ifndef CONFIG_PREEMPT -#define srcu_barrier() barrier() -#else /* #ifndef CONFIG_PREEMPT */ -#define srcu_barrier() -#endif /* #else #ifndef CONFIG_PREEMPT */ - #ifdef CONFIG_DEBUG_LOCK_ALLOC int __init_srcu_struct(struct srcu_struct *sp, const char *name, -- cgit v1.2.3 From dc87917501e324701dbfb249def44054b5220187 Mon Sep 17 00:00:00 2001 From: Lai Jiangshan Date: Tue, 6 Mar 2012 17:57:34 +0800 Subject: rcu: Improve srcu_readers_active_idx()'s cache locality Expand the calls to srcu_readers_active_idx() from srcu_readers_active() inline. This change improves cache locality by interating over the CPUs once rather than twice. Signed-off-by: Lai Jiangshan Signed-off-by: Paul E. McKenney --- kernel/srcu.c | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/kernel/srcu.c b/kernel/srcu.c index e0139a274856..a43211c92863 100644 --- a/kernel/srcu.c +++ b/kernel/srcu.c @@ -193,7 +193,14 @@ static bool srcu_readers_active_idx_check(struct srcu_struct *sp, int idx) */ static int srcu_readers_active(struct srcu_struct *sp) { - return srcu_readers_active_idx(sp, 0) + srcu_readers_active_idx(sp, 1); + int cpu; + unsigned long sum = 0; + + for_each_possible_cpu(cpu) { + sum += ACCESS_ONCE(per_cpu_ptr(sp->per_cpu_ref, cpu)->c[0]); + sum += ACCESS_ONCE(per_cpu_ptr(sp->per_cpu_ref, cpu)->c[1]); + } + return sum; } /** -- cgit v1.2.3 From d9792edd7a9a0858a3b1df92cf8beb31e4191e3c Mon Sep 17 00:00:00 2001 From: Lai Jiangshan Date: Mon, 19 Mar 2012 16:12:12 +0800 Subject: rcu: Use single value to handle expedited SRCU grace periods The earlier algorithm used an "expedited" flag combined with a "trycount" counter to differentiate between normal and expedited SRCU grace periods. However, the difference can be encoded into a single counter with a cutoff value and different initial values for expedited and normal SRCU grace periods. This commit makes that change. Signed-off-by: Lai Jiangshan Signed-off-by: Paul E. McKenney Conflicts: kernel/srcu.c --- kernel/srcu.c | 27 ++++++++++++++------------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/kernel/srcu.c b/kernel/srcu.c index a43211c92863..b9088524935a 100644 --- a/kernel/srcu.c +++ b/kernel/srcu.c @@ -266,16 +266,16 @@ EXPORT_SYMBOL_GPL(__srcu_read_unlock); * we repeatedly block for 1-millisecond time periods. This approach * has done well in testing, so there is no need for a config parameter. */ -#define SYNCHRONIZE_SRCU_READER_DELAY 5 +#define SYNCHRONIZE_SRCU_READER_DELAY 5 +#define SYNCHRONIZE_SRCU_TRYCOUNT 2 +#define SYNCHRONIZE_SRCU_EXP_TRYCOUNT 12 /* * Wait until all pre-existing readers complete. Such readers * will have used the index specified by "idx". */ -static void wait_idx(struct srcu_struct *sp, int idx, bool expedited) +static void wait_idx(struct srcu_struct *sp, int idx, int trycount) { - int trycount = 0; - /* * SRCU read-side critical sections are normally short, so wait * a small amount of time before possibly blocking. @@ -283,9 +283,10 @@ static void wait_idx(struct srcu_struct *sp, int idx, bool expedited) if (!srcu_readers_active_idx_check(sp, idx)) { udelay(SYNCHRONIZE_SRCU_READER_DELAY); while (!srcu_readers_active_idx_check(sp, idx)) { - if (expedited && ++ trycount < 10) + if (trycount > 0) { + trycount--; udelay(SYNCHRONIZE_SRCU_READER_DELAY); - else + } else schedule_timeout_interruptible(1); } } @@ -299,7 +300,7 @@ static void srcu_flip(struct srcu_struct *sp) /* * Helper function for synchronize_srcu() and synchronize_srcu_expedited(). */ -static void __synchronize_srcu(struct srcu_struct *sp, bool expedited) +static void __synchronize_srcu(struct srcu_struct *sp, int trycount) { int busy_idx; @@ -319,8 +320,8 @@ static void __synchronize_srcu(struct srcu_struct *sp, bool expedited) * have completed: * * __synchronize_srcu() { - * wait_idx(sp, 0, expedited); - * wait_idx(sp, 1, expedited); + * wait_idx(sp, 0, trycount); + * wait_idx(sp, 1, trycount); * } * * Starvation is prevented by the fact that we flip the index. @@ -344,13 +345,13 @@ static void __synchronize_srcu(struct srcu_struct *sp, bool expedited) * this call to wait_idx(), which waits on really old readers * describe in this comment above, will almost never need to wait. */ - wait_idx(sp, 1 - busy_idx, expedited); + wait_idx(sp, 1 - busy_idx, trycount); /* Flip the index to avoid reader-induced starvation. */ srcu_flip(sp); /* Wait for recent pre-existing readers. */ - wait_idx(sp, busy_idx, expedited); + wait_idx(sp, busy_idx, trycount); mutex_unlock(&sp->mutex); } @@ -371,7 +372,7 @@ static void __synchronize_srcu(struct srcu_struct *sp, bool expedited) */ void synchronize_srcu(struct srcu_struct *sp) { - __synchronize_srcu(sp, 0); + __synchronize_srcu(sp, SYNCHRONIZE_SRCU_TRYCOUNT); } EXPORT_SYMBOL_GPL(synchronize_srcu); @@ -392,7 +393,7 @@ EXPORT_SYMBOL_GPL(synchronize_srcu); */ void synchronize_srcu_expedited(struct srcu_struct *sp) { - __synchronize_srcu(sp, 1); + __synchronize_srcu(sp, SYNCHRONIZE_SRCU_EXP_TRYCOUNT); } EXPORT_SYMBOL_GPL(synchronize_srcu_expedited); -- cgit v1.2.3 From 931ea9d1a6e06a5e3af03aa4aaaa7c7fd90e163f Mon Sep 17 00:00:00 2001 From: Lai Jiangshan Date: Mon, 19 Mar 2012 16:12:13 +0800 Subject: rcu: Implement per-domain single-threaded call_srcu() state machine This commit implements an SRCU state machine in support of call_srcu(). The state machine is preemptible, light-weight, and single-threaded, minimizing synchronization overhead. In particular, there is no longer any need for synchronize_srcu() to be guarded by a mutex. Expedited processing is handled, at least in the absence of concurrent grace-period operations on that same srcu_struct structure, by having the synchronize_srcu_expedited() thread take on the role of the workqueue thread for one iteration. There is a reasonable probability that a given SRCU callback will be invoked on the same CPU that registered it, however, there is no guarantee. Concurrent SRCU grace-period primitives can cause callbacks to be executed elsewhere, even in absence of CPU-hotplug operations. Callbacks execute in process context, but under the influence of local_bh_disable(), so it is illegal to sleep in an SRCU callback function. Signed-off-by: Lai Jiangshan Acked-by: Peter Zijlstra Signed-off-by: Paul E. McKenney --- include/linux/srcu.h | 37 +++++- kernel/srcu.c | 362 ++++++++++++++++++++++++++++++++++++++++++--------- 2 files changed, 336 insertions(+), 63 deletions(-) diff --git a/include/linux/srcu.h b/include/linux/srcu.h index e5ce80452b62..55a5c52cbb25 100644 --- a/include/linux/srcu.h +++ b/include/linux/srcu.h @@ -29,16 +29,30 @@ #include #include +#include struct srcu_struct_array { unsigned long c[2]; unsigned long seq[2]; }; +struct rcu_batch { + struct rcu_head *head, **tail; +}; + struct srcu_struct { unsigned completed; struct srcu_struct_array __percpu *per_cpu_ref; - struct mutex mutex; + spinlock_t queue_lock; /* protect ->batch_queue, ->running */ + bool running; + /* callbacks just queued */ + struct rcu_batch batch_queue; + /* callbacks try to do the first check_zero */ + struct rcu_batch batch_check0; + /* callbacks done with the first check_zero and the flip */ + struct rcu_batch batch_check1; + struct rcu_batch batch_done; + struct delayed_work work; #ifdef CONFIG_DEBUG_LOCK_ALLOC struct lockdep_map dep_map; #endif /* #ifdef CONFIG_DEBUG_LOCK_ALLOC */ @@ -62,12 +76,33 @@ int init_srcu_struct(struct srcu_struct *sp); #endif /* #else #ifdef CONFIG_DEBUG_LOCK_ALLOC */ +/** + * call_srcu() - Queue a callback for invocation after an SRCU grace period + * @sp: srcu_struct in queue the callback + * @head: structure to be used for queueing the SRCU callback. + * @func: function to be invoked after the SRCU grace period + * + * The callback function will be invoked some time after a full SRCU + * grace period elapses, in other words after all pre-existing SRCU + * read-side critical sections have completed. However, the callback + * function might well execute concurrently with other SRCU read-side + * critical sections that started after call_srcu() was invoked. SRCU + * read-side critical sections are delimited by srcu_read_lock() and + * srcu_read_unlock(), and may be nested. + * + * The callback will be invoked from process context, but must nevertheless + * be fast and must not block. + */ +void call_srcu(struct srcu_struct *sp, struct rcu_head *head, + void (*func)(struct rcu_head *head)); + void cleanup_srcu_struct(struct srcu_struct *sp); int __srcu_read_lock(struct srcu_struct *sp) __acquires(sp); void __srcu_read_unlock(struct srcu_struct *sp, int idx) __releases(sp); void synchronize_srcu(struct srcu_struct *sp); void synchronize_srcu_expedited(struct srcu_struct *sp); long srcu_batches_completed(struct srcu_struct *sp); +void srcu_barrier(struct srcu_struct *sp); #ifdef CONFIG_DEBUG_LOCK_ALLOC diff --git a/kernel/srcu.c b/kernel/srcu.c index b9088524935a..2095be3318d5 100644 --- a/kernel/srcu.c +++ b/kernel/srcu.c @@ -34,10 +34,77 @@ #include #include +/* + * Initialize an rcu_batch structure to empty. + */ +static inline void rcu_batch_init(struct rcu_batch *b) +{ + b->head = NULL; + b->tail = &b->head; +} + +/* + * Enqueue a callback onto the tail of the specified rcu_batch structure. + */ +static inline void rcu_batch_queue(struct rcu_batch *b, struct rcu_head *head) +{ + *b->tail = head; + b->tail = &head->next; +} + +/* + * Is the specified rcu_batch structure empty? + */ +static inline bool rcu_batch_empty(struct rcu_batch *b) +{ + return b->tail == &b->head; +} + +/* + * Remove the callback at the head of the specified rcu_batch structure + * and return a pointer to it, or return NULL if the structure is empty. + */ +static inline struct rcu_head *rcu_batch_dequeue(struct rcu_batch *b) +{ + struct rcu_head *head; + + if (rcu_batch_empty(b)) + return NULL; + + head = b->head; + b->head = head->next; + if (b->tail == &head->next) + rcu_batch_init(b); + + return head; +} + +/* + * Move all callbacks from the rcu_batch structure specified by "from" to + * the structure specified by "to". + */ +static inline void rcu_batch_move(struct rcu_batch *to, struct rcu_batch *from) +{ + if (!rcu_batch_empty(from)) { + *to->tail = from->head; + to->tail = from->tail; + rcu_batch_init(from); + } +} + +/* single-thread state-machine */ +static void process_srcu(struct work_struct *work); + static int init_srcu_struct_fields(struct srcu_struct *sp) { sp->completed = 0; - mutex_init(&sp->mutex); + spin_lock_init(&sp->queue_lock); + sp->running = false; + rcu_batch_init(&sp->batch_queue); + rcu_batch_init(&sp->batch_check0); + rcu_batch_init(&sp->batch_check1); + rcu_batch_init(&sp->batch_done); + INIT_DELAYED_WORK(&sp->work, process_srcu); sp->per_cpu_ref = alloc_percpu(struct srcu_struct_array); return sp->per_cpu_ref ? 0 : -ENOMEM; } @@ -266,43 +333,86 @@ EXPORT_SYMBOL_GPL(__srcu_read_unlock); * we repeatedly block for 1-millisecond time periods. This approach * has done well in testing, so there is no need for a config parameter. */ -#define SYNCHRONIZE_SRCU_READER_DELAY 5 +#define SRCU_RETRY_CHECK_DELAY 5 #define SYNCHRONIZE_SRCU_TRYCOUNT 2 #define SYNCHRONIZE_SRCU_EXP_TRYCOUNT 12 /* - * Wait until all pre-existing readers complete. Such readers + * @@@ Wait until all pre-existing readers complete. Such readers * will have used the index specified by "idx". + * the caller should ensures the ->completed is not changed while checking + * and idx = (->completed & 1) ^ 1 */ -static void wait_idx(struct srcu_struct *sp, int idx, int trycount) +static bool try_check_zero(struct srcu_struct *sp, int idx, int trycount) { - /* - * SRCU read-side critical sections are normally short, so wait - * a small amount of time before possibly blocking. - */ - if (!srcu_readers_active_idx_check(sp, idx)) { - udelay(SYNCHRONIZE_SRCU_READER_DELAY); - while (!srcu_readers_active_idx_check(sp, idx)) { - if (trycount > 0) { - trycount--; - udelay(SYNCHRONIZE_SRCU_READER_DELAY); - } else - schedule_timeout_interruptible(1); - } + for (;;) { + if (srcu_readers_active_idx_check(sp, idx)) + return true; + if (--trycount <= 0) + return false; + udelay(SRCU_RETRY_CHECK_DELAY); } } +/* + * Increment the ->completed counter so that future SRCU readers will + * use the other rank of the ->c[] and ->seq[] arrays. This allows + * us to wait for pre-existing readers in a starvation-free manner. + */ static void srcu_flip(struct srcu_struct *sp) { sp->completed++; } +/* + * Enqueue an SRCU callback on the specified srcu_struct structure, + * initiating grace-period processing if it is not already running. + */ +void call_srcu(struct srcu_struct *sp, struct rcu_head *head, + void (*func)(struct rcu_head *head)) +{ + unsigned long flags; + + head->next = NULL; + head->func = func; + spin_lock_irqsave(&sp->queue_lock, flags); + rcu_batch_queue(&sp->batch_queue, head); + if (!sp->running) { + sp->running = true; + queue_delayed_work(system_nrt_wq, &sp->work, 0); + } + spin_unlock_irqrestore(&sp->queue_lock, flags); +} +EXPORT_SYMBOL_GPL(call_srcu); + +struct rcu_synchronize { + struct rcu_head head; + struct completion completion; +}; + +/* + * Awaken the corresponding synchronize_srcu() instance now that a + * grace period has elapsed. + */ +static void wakeme_after_rcu(struct rcu_head *head) +{ + struct rcu_synchronize *rcu; + + rcu = container_of(head, struct rcu_synchronize, head); + complete(&rcu->completion); +} + +static void srcu_advance_batches(struct srcu_struct *sp, int trycount); +static void srcu_reschedule(struct srcu_struct *sp); + /* * Helper function for synchronize_srcu() and synchronize_srcu_expedited(). */ static void __synchronize_srcu(struct srcu_struct *sp, int trycount) { - int busy_idx; + struct rcu_synchronize rcu; + struct rcu_head *head = &rcu.head; + bool done = false; rcu_lockdep_assert(!lock_is_held(&sp->dep_map) && !lock_is_held(&rcu_bh_lock_map) && @@ -310,50 +420,32 @@ static void __synchronize_srcu(struct srcu_struct *sp, int trycount) !lock_is_held(&rcu_sched_lock_map), "Illegal synchronize_srcu() in same-type SRCU (or RCU) read-side critical section"); - mutex_lock(&sp->mutex); - busy_idx = sp->completed & 0X1UL; - - /* - * If we recently flipped the index, there will be some readers - * using idx=0 and others using idx=1. Therefore, two calls to - * wait_idx()s suffice to ensure that all pre-existing readers - * have completed: - * - * __synchronize_srcu() { - * wait_idx(sp, 0, trycount); - * wait_idx(sp, 1, trycount); - * } - * - * Starvation is prevented by the fact that we flip the index. - * While we wait on one index to clear out, almost all new readers - * will be using the other index. The number of new readers using the - * index we are waiting on is sharply bounded by roughly the number - * of CPUs. - * - * How can new readers possibly using the old pre-flip value of - * the index? Consider the following sequence of events: - * - * Suppose that during the previous grace period, a reader - * picked up the old value of the index, but did not increment - * its counter until after the previous instance of - * __synchronize_srcu() did the counter summation and recheck. - * That previous grace period was OK because the reader did - * not start until after the grace period started, so the grace - * period was not obligated to wait for that reader. - * - * However, this sequence of events is quite improbable, so - * this call to wait_idx(), which waits on really old readers - * describe in this comment above, will almost never need to wait. - */ - wait_idx(sp, 1 - busy_idx, trycount); - - /* Flip the index to avoid reader-induced starvation. */ - srcu_flip(sp); - - /* Wait for recent pre-existing readers. */ - wait_idx(sp, busy_idx, trycount); + init_completion(&rcu.completion); + + head->next = NULL; + head->func = wakeme_after_rcu; + spin_lock_irq(&sp->queue_lock); + if (!sp->running) { + /* steal the processing owner */ + sp->running = true; + rcu_batch_queue(&sp->batch_check0, head); + spin_unlock_irq(&sp->queue_lock); + + srcu_advance_batches(sp, trycount); + if (!rcu_batch_empty(&sp->batch_done)) { + BUG_ON(sp->batch_done.head != head); + rcu_batch_dequeue(&sp->batch_done); + done = true; + } + /* give the processing owner to work_struct */ + srcu_reschedule(sp); + } else { + rcu_batch_queue(&sp->batch_queue, head); + spin_unlock_irq(&sp->queue_lock); + } - mutex_unlock(&sp->mutex); + if (!done) + wait_for_completion(&rcu.completion); } /** @@ -397,6 +489,15 @@ void synchronize_srcu_expedited(struct srcu_struct *sp) } EXPORT_SYMBOL_GPL(synchronize_srcu_expedited); +/** + * srcu_barrier - Wait until all in-flight call_srcu() callbacks complete. + */ +void srcu_barrier(struct srcu_struct *sp) +{ + synchronize_srcu(sp); +} +EXPORT_SYMBOL_GPL(srcu_barrier); + /** * srcu_batches_completed - return batches completed. * @sp: srcu_struct on which to report batch completion. @@ -404,9 +505,146 @@ EXPORT_SYMBOL_GPL(synchronize_srcu_expedited); * Report the number of batches, correlated with, but not necessarily * precisely the same as, the number of grace periods that have elapsed. */ - long srcu_batches_completed(struct srcu_struct *sp) { return sp->completed; } EXPORT_SYMBOL_GPL(srcu_batches_completed); + +#define SRCU_CALLBACK_BATCH 10 +#define SRCU_INTERVAL 1 + +/* + * Move any new SRCU callbacks to the first stage of the SRCU grace + * period pipeline. + */ +static void srcu_collect_new(struct srcu_struct *sp) +{ + if (!rcu_batch_empty(&sp->batch_queue)) { + spin_lock_irq(&sp->queue_lock); + rcu_batch_move(&sp->batch_check0, &sp->batch_queue); + spin_unlock_irq(&sp->queue_lock); + } +} + +/* + * Core SRCU state machine. Advance callbacks from ->batch_check0 to + * ->batch_check1 and then to ->batch_done as readers drain. + */ +static void srcu_advance_batches(struct srcu_struct *sp, int trycount) +{ + int idx = 1 ^ (sp->completed & 1); + + /* + * Because readers might be delayed for an extended period after + * fetching ->completed for their index, at any point in time there + * might well be readers using both idx=0 and idx=1. We therefore + * need to wait for readers to clear from both index values before + * invoking a callback. + */ + + if (rcu_batch_empty(&sp->batch_check0) && + rcu_batch_empty(&sp->batch_check1)) + return; /* no callbacks need to be advanced */ + + if (!try_check_zero(sp, idx, trycount)) + return; /* failed to advance, will try after SRCU_INTERVAL */ + + /* + * The callbacks in ->batch_check1 have already done with their + * first zero check and flip back when they were enqueued on + * ->batch_check0 in a previous invocation of srcu_advance_batches(). + * (Presumably try_check_zero() returned false during that + * invocation, leaving the callbacks stranded on ->batch_check1.) + * They are therefore ready to invoke, so move them to ->batch_done. + */ + rcu_batch_move(&sp->batch_done, &sp->batch_check1); + + if (rcu_batch_empty(&sp->batch_check0)) + return; /* no callbacks need to be advanced */ + srcu_flip(sp); + + /* + * The callbacks in ->batch_check0 just finished their + * first check zero and flip, so move them to ->batch_check1 + * for future checking on the other idx. + */ + rcu_batch_move(&sp->batch_check1, &sp->batch_check0); + + /* + * SRCU read-side critical sections are normally short, so check + * at least twice in quick succession after a flip. + */ + trycount = trycount < 2 ? 2 : trycount; + if (!try_check_zero(sp, idx^1, trycount)) + return; /* failed to advance, will try after SRCU_INTERVAL */ + + /* + * The callbacks in ->batch_check1 have now waited for all + * pre-existing readers using both idx values. They are therefore + * ready to invoke, so move them to ->batch_done. + */ + rcu_batch_move(&sp->batch_done, &sp->batch_check1); +} + +/* + * Invoke a limited number of SRCU callbacks that have passed through + * their grace period. If there are more to do, SRCU will reschedule + * the workqueue. + */ +static void srcu_invoke_callbacks(struct srcu_struct *sp) +{ + int i; + struct rcu_head *head; + + for (i = 0; i < SRCU_CALLBACK_BATCH; i++) { + head = rcu_batch_dequeue(&sp->batch_done); + if (!head) + break; + local_bh_disable(); + head->func(head); + local_bh_enable(); + } +} + +/* + * Finished one round of SRCU grace period. Start another if there are + * more SRCU callbacks queued, otherwise put SRCU into not-running state. + */ +static void srcu_reschedule(struct srcu_struct *sp) +{ + bool pending = true; + + if (rcu_batch_empty(&sp->batch_done) && + rcu_batch_empty(&sp->batch_check1) && + rcu_batch_empty(&sp->batch_check0) && + rcu_batch_empty(&sp->batch_queue)) { + spin_lock_irq(&sp->queue_lock); + if (rcu_batch_empty(&sp->batch_done) && + rcu_batch_empty(&sp->batch_check1) && + rcu_batch_empty(&sp->batch_check0) && + rcu_batch_empty(&sp->batch_queue)) { + sp->running = false; + pending = false; + } + spin_unlock_irq(&sp->queue_lock); + } + + if (pending) + queue_delayed_work(system_nrt_wq, &sp->work, SRCU_INTERVAL); +} + +/* + * This is the work-queue function that handles SRCU grace periods. + */ +static void process_srcu(struct work_struct *work) +{ + struct srcu_struct *sp; + + sp = container_of(work, struct srcu_struct, work.work); + + srcu_collect_new(sp); + srcu_advance_batches(sp, 1); + srcu_invoke_callbacks(sp); + srcu_reschedule(sp); +} -- cgit v1.2.3 From 9059c94017f748d9e20c3b089188a7abb27f6233 Mon Sep 17 00:00:00 2001 From: Lai Jiangshan Date: Mon, 19 Mar 2012 16:12:14 +0800 Subject: rcu: Add rcutorture test for call_srcu() Add srcu_torture_deferred_free() for srcu_ops so as to test the new call_srcu(). Rename the original srcu_ops to srcu_sync_ops. Signed-off-by: Lai Jiangshan Signed-off-by: Paul E. McKenney --- kernel/rcutorture.c | 44 ++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 40 insertions(+), 4 deletions(-) diff --git a/kernel/rcutorture.c b/kernel/rcutorture.c index d10b179dea83..e66b34ab7555 100644 --- a/kernel/rcutorture.c +++ b/kernel/rcutorture.c @@ -625,6 +625,11 @@ static int srcu_torture_completed(void) return srcu_batches_completed(&srcu_ctl); } +static void srcu_torture_deferred_free(struct rcu_torture *rp) +{ + call_srcu(&srcu_ctl, &rp->rtort_rcu, rcu_torture_cb); +} + static void srcu_torture_synchronize(void) { synchronize_srcu(&srcu_ctl); @@ -654,7 +659,7 @@ static struct rcu_torture_ops srcu_ops = { .read_delay = srcu_read_delay, .readunlock = srcu_torture_read_unlock, .completed = srcu_torture_completed, - .deferred_free = rcu_sync_torture_deferred_free, + .deferred_free = srcu_torture_deferred_free, .sync = srcu_torture_synchronize, .call = NULL, .cb_barrier = NULL, @@ -662,6 +667,21 @@ static struct rcu_torture_ops srcu_ops = { .name = "srcu" }; +static struct rcu_torture_ops srcu_sync_ops = { + .init = srcu_torture_init, + .cleanup = srcu_torture_cleanup, + .readlock = srcu_torture_read_lock, + .read_delay = srcu_read_delay, + .readunlock = srcu_torture_read_unlock, + .completed = srcu_torture_completed, + .deferred_free = rcu_sync_torture_deferred_free, + .sync = srcu_torture_synchronize, + .call = NULL, + .cb_barrier = NULL, + .stats = srcu_torture_stats, + .name = "srcu_sync" +}; + static int srcu_torture_read_lock_raw(void) __acquires(&srcu_ctl) { return srcu_read_lock_raw(&srcu_ctl); @@ -679,7 +699,7 @@ static struct rcu_torture_ops srcu_raw_ops = { .read_delay = srcu_read_delay, .readunlock = srcu_torture_read_unlock_raw, .completed = srcu_torture_completed, - .deferred_free = rcu_sync_torture_deferred_free, + .deferred_free = srcu_torture_deferred_free, .sync = srcu_torture_synchronize, .call = NULL, .cb_barrier = NULL, @@ -687,6 +707,21 @@ static struct rcu_torture_ops srcu_raw_ops = { .name = "srcu_raw" }; +static struct rcu_torture_ops srcu_raw_sync_ops = { + .init = srcu_torture_init, + .cleanup = srcu_torture_cleanup, + .readlock = srcu_torture_read_lock_raw, + .read_delay = srcu_read_delay, + .readunlock = srcu_torture_read_unlock_raw, + .completed = srcu_torture_completed, + .deferred_free = rcu_sync_torture_deferred_free, + .sync = srcu_torture_synchronize, + .call = NULL, + .cb_barrier = NULL, + .stats = srcu_torture_stats, + .name = "srcu_raw_sync" +}; + static void srcu_torture_synchronize_expedited(void) { synchronize_srcu_expedited(&srcu_ctl); @@ -1685,7 +1720,7 @@ static int rcu_torture_barrier_init(void) for (i = 0; i < n_barrier_cbs; i++) { init_waitqueue_head(&barrier_cbs_wq[i]); barrier_cbs_tasks[i] = kthread_run(rcu_torture_barrier_cbs, - (void *)i, + (void *)(long)i, "rcu_torture_barrier_cbs"); if (IS_ERR(barrier_cbs_tasks[i])) { ret = PTR_ERR(barrier_cbs_tasks[i]); @@ -1873,7 +1908,8 @@ rcu_torture_init(void) static struct rcu_torture_ops *torture_ops[] = { &rcu_ops, &rcu_sync_ops, &rcu_expedited_ops, &rcu_bh_ops, &rcu_bh_sync_ops, &rcu_bh_expedited_ops, - &srcu_ops, &srcu_raw_ops, &srcu_expedited_ops, + &srcu_ops, &srcu_sync_ops, &srcu_raw_ops, + &srcu_raw_sync_ops, &srcu_expedited_ops, &sched_ops, &sched_sync_ops, &sched_expedited_ops, }; mutex_lock(&fullstop_mutex); -- cgit v1.2.3 From f511fc624642f0bb8cf65aaa28979737514d4746 Mon Sep 17 00:00:00 2001 From: "Paul E. McKenney" Date: Thu, 15 Mar 2012 12:16:26 -0700 Subject: rcu: Ensure that RCU_FAST_NO_HZ timers expire on correct CPU Timers are subject to migration, which can lead to the following system-hang scenario when CONFIG_RCU_FAST_NO_HZ=y: 1. CPU 0 executes synchronize_rcu(), which posts an RCU callback. 2. CPU 0 then goes idle. It cannot immediately invoke the callback, but there is nothing RCU needs from ti, so it enters dyntick-idle mode after posting a timer. 3. The timer gets migrated to CPU 1. 4. CPU 0 never wakes up, so the synchronize_rcu() never returns, so the system hangs. This commit fixes this problem by using mod_timer_pinned(), as suggested by Peter Zijlstra, to ensure that the timer is actually posted on the running CPU. Reported-by: Dipankar Sarma Signed-off-by: Paul E. McKenney Signed-off-by: Paul E. McKenney --- kernel/rcutree_plugin.h | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/kernel/rcutree_plugin.h b/kernel/rcutree_plugin.h index ad61da79b311..d01e26df55a1 100644 --- a/kernel/rcutree_plugin.h +++ b/kernel/rcutree_plugin.h @@ -2110,6 +2110,8 @@ static void rcu_cleanup_after_idle(int cpu) */ static void rcu_prepare_for_idle(int cpu) { + struct timer_list *tp; + /* * If this is an idle re-entry, for example, due to use of * RCU_NONIDLE() or the new idle-loop tracing API within the idle @@ -2121,9 +2123,10 @@ static void rcu_prepare_for_idle(int cpu) if (!per_cpu(rcu_idle_first_pass, cpu) && (per_cpu(rcu_nonlazy_posted, cpu) == per_cpu(rcu_nonlazy_posted_snap, cpu))) { - if (rcu_cpu_has_callbacks(cpu)) - mod_timer(&per_cpu(rcu_idle_gp_timer, cpu), - per_cpu(rcu_idle_gp_timer_expires, cpu)); + if (rcu_cpu_has_callbacks(cpu)) { + tp = &per_cpu(rcu_idle_gp_timer, cpu); + mod_timer_pinned(tp, per_cpu(rcu_idle_gp_timer_expires, cpu)); + } return; } per_cpu(rcu_idle_first_pass, cpu) = 0; @@ -2167,8 +2170,8 @@ static void rcu_prepare_for_idle(int cpu) else per_cpu(rcu_idle_gp_timer_expires, cpu) = jiffies + RCU_IDLE_LAZY_GP_DELAY; - mod_timer(&per_cpu(rcu_idle_gp_timer, cpu), - per_cpu(rcu_idle_gp_timer_expires, cpu)); + tp = &per_cpu(rcu_idle_gp_timer, cpu); + mod_timer_pinned(tp, per_cpu(rcu_idle_gp_timer_expires, cpu)); per_cpu(rcu_nonlazy_posted_snap, cpu) = per_cpu(rcu_nonlazy_posted, cpu); return; /* Nothing more to do immediately. */ -- cgit v1.2.3 From 616c310e83b872024271c915c1b9ab505b9efad9 Mon Sep 17 00:00:00 2001 From: "Paul E. McKenney" Date: Tue, 27 Mar 2012 16:02:08 -0700 Subject: rcu: Move PREEMPT_RCU preemption to switch_to() invocation Currently, PREEMPT_RCU readers are enqueued upon entry to the scheduler. This is inefficient because enqueuing is required only if there is a context switch, and entry to the scheduler does not guarantee a context switch. The commit therefore moves the enqueuing to immediately precede the call to switch_to() from the scheduler. Signed-off-by: Paul E. McKenney Signed-off-by: Paul E. McKenney Tested-by: Linus Torvalds --- arch/um/drivers/mconsole_kern.c | 1 + include/linux/rcupdate.h | 1 + include/linux/rcutiny.h | 6 ------ include/linux/sched.h | 10 ++++++++++ kernel/rcutree.c | 1 - kernel/rcutree.h | 1 - kernel/rcutree_plugin.h | 14 +++----------- kernel/sched/core.c | 1 + 8 files changed, 16 insertions(+), 19 deletions(-) diff --git a/arch/um/drivers/mconsole_kern.c b/arch/um/drivers/mconsole_kern.c index 43b39d61b538..88e466b159dc 100644 --- a/arch/um/drivers/mconsole_kern.c +++ b/arch/um/drivers/mconsole_kern.c @@ -705,6 +705,7 @@ static void stack_proc(void *arg) struct task_struct *from = current, *to = arg; to->thread.saved_task = from; + rcu_switch_from(from); switch_to(from, to, from); } diff --git a/include/linux/rcupdate.h b/include/linux/rcupdate.h index 20fb776a1d4a..bbfe7854a6a6 100644 --- a/include/linux/rcupdate.h +++ b/include/linux/rcupdate.h @@ -184,6 +184,7 @@ static inline int rcu_preempt_depth(void) /* Internal to kernel */ extern void rcu_sched_qs(int cpu); extern void rcu_bh_qs(int cpu); +extern void rcu_preempt_note_context_switch(void); extern void rcu_check_callbacks(int cpu, int user); struct notifier_block; extern void rcu_idle_enter(void); diff --git a/include/linux/rcutiny.h b/include/linux/rcutiny.h index e93df77176d1..080b5bdda28e 100644 --- a/include/linux/rcutiny.h +++ b/include/linux/rcutiny.h @@ -87,10 +87,6 @@ static inline void kfree_call_rcu(struct rcu_head *head, #ifdef CONFIG_TINY_RCU -static inline void rcu_preempt_note_context_switch(void) -{ -} - static inline void exit_rcu(void) { } @@ -102,7 +98,6 @@ static inline int rcu_needs_cpu(int cpu) #else /* #ifdef CONFIG_TINY_RCU */ -void rcu_preempt_note_context_switch(void); extern void exit_rcu(void); int rcu_preempt_needs_cpu(void); @@ -116,7 +111,6 @@ static inline int rcu_needs_cpu(int cpu) static inline void rcu_note_context_switch(int cpu) { rcu_sched_qs(cpu); - rcu_preempt_note_context_switch(); } /* diff --git a/include/linux/sched.h b/include/linux/sched.h index 81a173c0897d..8f3fd945070f 100644 --- a/include/linux/sched.h +++ b/include/linux/sched.h @@ -1905,12 +1905,22 @@ static inline void rcu_copy_process(struct task_struct *p) INIT_LIST_HEAD(&p->rcu_node_entry); } +static inline void rcu_switch_from(struct task_struct *prev) +{ + if (prev->rcu_read_lock_nesting != 0) + rcu_preempt_note_context_switch(); +} + #else static inline void rcu_copy_process(struct task_struct *p) { } +static inline void rcu_switch_from(struct task_struct *prev) +{ +} + #endif #ifdef CONFIG_SMP diff --git a/kernel/rcutree.c b/kernel/rcutree.c index 1050d6d3922c..61351505ec78 100644 --- a/kernel/rcutree.c +++ b/kernel/rcutree.c @@ -192,7 +192,6 @@ void rcu_note_context_switch(int cpu) { trace_rcu_utilization("Start context switch"); rcu_sched_qs(cpu); - rcu_preempt_note_context_switch(cpu); trace_rcu_utilization("End context switch"); } EXPORT_SYMBOL_GPL(rcu_note_context_switch); diff --git a/kernel/rcutree.h b/kernel/rcutree.h index cdd1be0a4072..d6b70b08a01a 100644 --- a/kernel/rcutree.h +++ b/kernel/rcutree.h @@ -423,7 +423,6 @@ DECLARE_PER_CPU(char, rcu_cpu_has_work); /* Forward declarations for rcutree_plugin.h */ static void rcu_bootup_announce(void); long rcu_batches_completed(void); -static void rcu_preempt_note_context_switch(int cpu); static int rcu_preempt_blocked_readers_cgp(struct rcu_node *rnp); #ifdef CONFIG_HOTPLUG_CPU static void rcu_report_unblock_qs_rnp(struct rcu_node *rnp, diff --git a/kernel/rcutree_plugin.h b/kernel/rcutree_plugin.h index c023464816be..b1ac22e6fa31 100644 --- a/kernel/rcutree_plugin.h +++ b/kernel/rcutree_plugin.h @@ -153,7 +153,7 @@ static void rcu_preempt_qs(int cpu) * * Caller must disable preemption. */ -static void rcu_preempt_note_context_switch(int cpu) +void rcu_preempt_note_context_switch(void) { struct task_struct *t = current; unsigned long flags; @@ -164,7 +164,7 @@ static void rcu_preempt_note_context_switch(int cpu) (t->rcu_read_unlock_special & RCU_READ_UNLOCK_BLOCKED) == 0) { /* Possibly blocking in an RCU read-side critical section. */ - rdp = per_cpu_ptr(rcu_preempt_state.rda, cpu); + rdp = __this_cpu_ptr(rcu_preempt_state.rda); rnp = rdp->mynode; raw_spin_lock_irqsave(&rnp->lock, flags); t->rcu_read_unlock_special |= RCU_READ_UNLOCK_BLOCKED; @@ -228,7 +228,7 @@ static void rcu_preempt_note_context_switch(int cpu) * means that we continue to block the current grace period. */ local_irq_save(flags); - rcu_preempt_qs(cpu); + rcu_preempt_qs(smp_processor_id()); local_irq_restore(flags); } @@ -1017,14 +1017,6 @@ void rcu_force_quiescent_state(void) } EXPORT_SYMBOL_GPL(rcu_force_quiescent_state); -/* - * Because preemptible RCU does not exist, we never have to check for - * CPUs being in quiescent states. - */ -static void rcu_preempt_note_context_switch(int cpu) -{ -} - /* * Because preemptible RCU does not exist, there are never any preempted * RCU readers. diff --git a/kernel/sched/core.c b/kernel/sched/core.c index 4603b9d8f30a..5d89eb93f7e4 100644 --- a/kernel/sched/core.c +++ b/kernel/sched/core.c @@ -2083,6 +2083,7 @@ context_switch(struct rq *rq, struct task_struct *prev, #endif /* Here we just switch the register state and the stack. */ + rcu_switch_from(prev); switch_to(prev, next, prev); barrier(); -- cgit v1.2.3 From 9dd8fb16c36178df2066387d2abd44d8b4dca8c8 Mon Sep 17 00:00:00 2001 From: "Paul E. McKenney" Date: Fri, 13 Apr 2012 12:54:22 -0700 Subject: rcu: Make exit_rcu() more precise and consolidate When running preemptible RCU, if a task exits in an RCU read-side critical section having blocked within that same RCU read-side critical section, the task must be removed from the list of tasks blocking a grace period (perhaps the current grace period, perhaps the next grace period, depending on timing). The exit() path invokes exit_rcu() to do this cleanup. However, the current implementation of exit_rcu() needlessly does the cleanup even if the task did not block within the current RCU read-side critical section, which wastes time and needlessly increases the size of the state space. Fix this by only doing the cleanup if the current task is actually on the list of tasks blocking some grace period. While we are at it, consolidate the two identical exit_rcu() functions into a single function. Signed-off-by: Paul E. McKenney Signed-off-by: Paul E. McKenney Tested-by: Linus Torvalds Conflicts: kernel/rcupdate.c --- include/linux/rcupdate.h | 1 + include/linux/rcutiny.h | 5 ----- include/linux/rcutree.h | 12 ------------ kernel/rcupdate.c | 28 ++++++++++++++++++++++++++++ kernel/rcutiny_plugin.h | 16 ---------------- kernel/rcutree_plugin.h | 16 ---------------- 6 files changed, 29 insertions(+), 49 deletions(-) diff --git a/include/linux/rcupdate.h b/include/linux/rcupdate.h index bbfe7854a6a6..29665a3b3ac5 100644 --- a/include/linux/rcupdate.h +++ b/include/linux/rcupdate.h @@ -191,6 +191,7 @@ extern void rcu_idle_enter(void); extern void rcu_idle_exit(void); extern void rcu_irq_enter(void); extern void rcu_irq_exit(void); +extern void exit_rcu(void); /** * RCU_NONIDLE - Indicate idle-loop code that needs RCU readers diff --git a/include/linux/rcutiny.h b/include/linux/rcutiny.h index 080b5bdda28e..adb5e5a38cae 100644 --- a/include/linux/rcutiny.h +++ b/include/linux/rcutiny.h @@ -87,10 +87,6 @@ static inline void kfree_call_rcu(struct rcu_head *head, #ifdef CONFIG_TINY_RCU -static inline void exit_rcu(void) -{ -} - static inline int rcu_needs_cpu(int cpu) { return 0; @@ -98,7 +94,6 @@ static inline int rcu_needs_cpu(int cpu) #else /* #ifdef CONFIG_TINY_RCU */ -extern void exit_rcu(void); int rcu_preempt_needs_cpu(void); static inline int rcu_needs_cpu(int cpu) diff --git a/include/linux/rcutree.h b/include/linux/rcutree.h index e8ee5dd0854c..782a8ab51bc1 100644 --- a/include/linux/rcutree.h +++ b/include/linux/rcutree.h @@ -45,18 +45,6 @@ static inline void rcu_virt_note_context_switch(int cpu) rcu_note_context_switch(cpu); } -#ifdef CONFIG_TREE_PREEMPT_RCU - -extern void exit_rcu(void); - -#else /* #ifdef CONFIG_TREE_PREEMPT_RCU */ - -static inline void exit_rcu(void) -{ -} - -#endif /* #else #ifdef CONFIG_TREE_PREEMPT_RCU */ - extern void synchronize_rcu_bh(void); extern void synchronize_sched_expedited(void); extern void synchronize_rcu_expedited(void); diff --git a/kernel/rcupdate.c b/kernel/rcupdate.c index a86f1741cc27..95cba41ce1e9 100644 --- a/kernel/rcupdate.c +++ b/kernel/rcupdate.c @@ -51,6 +51,34 @@ #include "rcu.h" +#ifdef CONFIG_PREEMPT_RCU + +/* + * Check for a task exiting while in a preemptible-RCU read-side + * critical section, clean up if so. No need to issue warnings, + * as debug_check_no_locks_held() already does this if lockdep + * is enabled. + */ +void exit_rcu(void) +{ + struct task_struct *t = current; + + if (likely(list_empty(¤t->rcu_node_entry))) + return; + t->rcu_read_lock_nesting = 1; + barrier(); + t->rcu_read_unlock_special = RCU_READ_UNLOCK_BLOCKED; + __rcu_read_unlock(); +} + +#else /* #ifdef CONFIG_PREEMPT_RCU */ + +void exit_rcu(void) +{ +} + +#endif /* #else #ifdef CONFIG_PREEMPT_RCU */ + #ifdef CONFIG_DEBUG_LOCK_ALLOC static struct lock_class_key rcu_lock_key; struct lockdep_map rcu_lock_map = diff --git a/kernel/rcutiny_plugin.h b/kernel/rcutiny_plugin.h index 22ecea0dfb62..fc31a2d65100 100644 --- a/kernel/rcutiny_plugin.h +++ b/kernel/rcutiny_plugin.h @@ -851,22 +851,6 @@ int rcu_preempt_needs_cpu(void) return rcu_preempt_ctrlblk.rcb.rcucblist != NULL; } -/* - * Check for a task exiting while in a preemptible -RCU read-side - * critical section, clean up if so. No need to issue warnings, - * as debug_check_no_locks_held() already does this if lockdep - * is enabled. - */ -void exit_rcu(void) -{ - struct task_struct *t = current; - - if (t->rcu_read_lock_nesting == 0) - return; - t->rcu_read_lock_nesting = 1; - __rcu_read_unlock(); -} - #else /* #ifdef CONFIG_TINY_PREEMPT_RCU */ #ifdef CONFIG_RCU_TRACE diff --git a/kernel/rcutree_plugin.h b/kernel/rcutree_plugin.h index b1ac22e6fa31..4936fff820eb 100644 --- a/kernel/rcutree_plugin.h +++ b/kernel/rcutree_plugin.h @@ -969,22 +969,6 @@ static void __init __rcu_init_preempt(void) rcu_init_one(&rcu_preempt_state, &rcu_preempt_data); } -/* - * Check for a task exiting while in a preemptible-RCU read-side - * critical section, clean up if so. No need to issue warnings, - * as debug_check_no_locks_held() already does this if lockdep - * is enabled. - */ -void exit_rcu(void) -{ - struct task_struct *t = current; - - if (t->rcu_read_lock_nesting == 0) - return; - t->rcu_read_lock_nesting = 1; - __rcu_read_unlock(); -} - #else /* #ifdef CONFIG_TREE_PREEMPT_RCU */ static struct rcu_state *rcu_state = &rcu_sched_state; -- cgit v1.2.3 From 9fab97876af844c2abb7c39300bff34025926565 Mon Sep 17 00:00:00 2001 From: "Paul E. McKenney" Date: Mon, 7 May 2012 09:36:34 -0700 Subject: rcu: Update RCU maintainership Split SRCU out and add Lai Jiangshan as SRCU co-maintainer. Signed-off-by: Paul E. McKenney --- MAINTAINERS | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/MAINTAINERS b/MAINTAINERS index 1a2f8f5823e0..a1c2ab2c6d60 100644 --- a/MAINTAINERS +++ b/MAINTAINERS @@ -5607,14 +5607,13 @@ F: net/rds/ READ-COPY UPDATE (RCU) M: Dipankar Sarma M: "Paul E. McKenney" -W: http://www.rdrop.com/users/paulmck/rclock/ +W: http://www.rdrop.com/users/paulmck/RCU/ S: Supported T: git git://git.kernel.org/pub/scm/linux/kernel/git/paulmck/linux-rcu.git F: Documentation/RCU/ +X: Documentation/RCU/torture.txt F: include/linux/rcu* -F: include/linux/srcu* F: kernel/rcu* -F: kernel/srcu* X: kernel/rcutorture.c REAL TIME CLOCK (RTC) SUBSYSTEM @@ -6131,6 +6130,15 @@ S: Maintained F: include/linux/sl?b*.h F: mm/sl?b.c +SLEEPABLE READ-COPY UPDATE (SRCU) +M: Lai Jiangshan +M: "Paul E. McKenney" +W: http://www.rdrop.com/users/paulmck/RCU/ +S: Supported +T: git git://git.kernel.org/pub/scm/linux/kernel/git/paulmck/linux-rcu.git +F: include/linux/srcu* +F: kernel/srcu* + SMC91x ETHERNET DRIVER M: Nicolas Pitre S: Odd Fixes -- cgit v1.2.3 From 21e52e15666323078b8517a4312712579176b56f Mon Sep 17 00:00:00 2001 From: "Paul E. McKenney" Date: Mon, 30 Apr 2012 14:16:19 -0700 Subject: rcu: Make RCU_FAST_NO_HZ handle timer migration The current RCU_FAST_NO_HZ assumes that timers do not migrate unless a CPU goes offline, in which case it assumes that the CPU will have to come out of dyntick-idle mode (cancelling the timer) in order to go offline. This is important because when RCU_FAST_NO_HZ permits a CPU to enter dyntick-idle mode despite having RCU callbacks pending, it posts a timer on that CPU to force a wakeup on that CPU. This wakeup ensures that the CPU will eventually handle the end of the grace period, including invoking its RCU callbacks. However, Pascal Chapperon's test setup shows that the timer handler rcu_idle_gp_timer_func() really does get invoked in some cases. This is problematic because this can cause the CPU that entered dyntick-idle mode despite still having RCU callbacks pending to remain in dyntick-idle mode indefinitely, which means that its RCU callbacks might never be invoked. This situation can result in grace-period delays or even system hangs, which matches Pascal's observations of slow boot-up and shutdown (https://lkml.org/lkml/2012/4/5/142). See also the bugzilla: https://bugzilla.redhat.com/show_bug.cgi?id=806548 This commit therefore causes the "should never be invoked" timer handler rcu_idle_gp_timer_func() to use smp_call_function_single() to wake up the CPU for which the timer was intended, allowing that CPU to invoke its RCU callbacks in a timely manner. Reported-by: Pascal Chapperon Signed-off-by: Paul E. McKenney Signed-off-by: Paul E. McKenney --- include/trace/events/rcu.h | 1 + kernel/rcutree_plugin.h | 24 +++++++++++++++++++++--- 2 files changed, 22 insertions(+), 3 deletions(-) diff --git a/include/trace/events/rcu.h b/include/trace/events/rcu.h index aaa55e1b8c48..1480900c511c 100644 --- a/include/trace/events/rcu.h +++ b/include/trace/events/rcu.h @@ -292,6 +292,7 @@ TRACE_EVENT(rcu_dyntick, * "More callbacks": Still more callbacks, try again to clear them out. * "Callbacks drained": All callbacks processed, off to dyntick idle! * "Timer": Timer fired to cause CPU to continue processing callbacks. + * "Demigrate": Timer fired on wrong CPU, woke up correct CPU. * "Cleanup after idle": Idle exited, timer canceled. */ TRACE_EVENT(rcu_prep_idle, diff --git a/kernel/rcutree_plugin.h b/kernel/rcutree_plugin.h index d01e26df55a1..bbb43cad755e 100644 --- a/kernel/rcutree_plugin.h +++ b/kernel/rcutree_plugin.h @@ -2056,17 +2056,35 @@ static bool rcu_cpu_has_nonlazy_callbacks(int cpu) rcu_preempt_cpu_has_nonlazy_callbacks(cpu); } +/* + * Handler for smp_call_function_single(). The only point of this + * handler is to wake the CPU up, so the handler does only tracing. + */ +void rcu_idle_demigrate(void *unused) +{ + trace_rcu_prep_idle("Demigrate"); +} + /* * Timer handler used to force CPU to start pushing its remaining RCU * callbacks in the case where it entered dyntick-idle mode with callbacks * pending. The hander doesn't really need to do anything because the * real work is done upon re-entry to idle, or by the next scheduling-clock * interrupt should idle not be re-entered. + * + * One special case: the timer gets migrated without awakening the CPU + * on which the timer was scheduled on. In this case, we must wake up + * that CPU. We do so with smp_call_function_single(). */ -static void rcu_idle_gp_timer_func(unsigned long unused) +static void rcu_idle_gp_timer_func(unsigned long cpu_in) { - WARN_ON_ONCE(1); /* Getting here can hang the system... */ + int cpu = (int)cpu_in; + trace_rcu_prep_idle("Timer"); + if (cpu != smp_processor_id()) + smp_call_function_single(cpu, rcu_idle_demigrate, NULL, 0); + else + WARN_ON_ONCE(1); /* Getting here can hang the system... */ } /* @@ -2075,7 +2093,7 @@ static void rcu_idle_gp_timer_func(unsigned long unused) static void rcu_prepare_for_idle_init(int cpu) { setup_timer(&per_cpu(rcu_idle_gp_timer, cpu), - rcu_idle_gp_timer_func, 0); + rcu_idle_gp_timer_func, cpu); } /* -- cgit v1.2.3 From 98248a0e24327bc64eb7518145c44bff7bebebc3 Mon Sep 17 00:00:00 2001 From: "Paul E. McKenney" Date: Thu, 3 May 2012 15:38:10 -0700 Subject: rcu: Explicitly initialize RCU_FAST_NO_HZ per-CPU variables The current initialization of the RCU_FAST_NO_HZ per-CPU variables makes needless and fragile assumptions about the initial value of things like the jiffies counter. This commit therefore explicitly initializes all of them that are better started with a non-zero value. It also adds some comments describing the per-CPU state variables. Signed-off-by: Paul E. McKenney Signed-off-by: Paul E. McKenney --- kernel/rcutree_plugin.h | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/kernel/rcutree_plugin.h b/kernel/rcutree_plugin.h index bbb43cad755e..7082ea93566f 100644 --- a/kernel/rcutree_plugin.h +++ b/kernel/rcutree_plugin.h @@ -1986,12 +1986,19 @@ static void rcu_idle_count_callbacks_posted(void) #define RCU_IDLE_GP_DELAY 6 /* Roughly one grace period. */ #define RCU_IDLE_LAZY_GP_DELAY (6 * HZ) /* Roughly six seconds. */ +/* Loop counter for rcu_prepare_for_idle(). */ static DEFINE_PER_CPU(int, rcu_dyntick_drain); +/* If rcu_dyntick_holdoff==jiffies, don't try to enter dyntick-idle mode. */ static DEFINE_PER_CPU(unsigned long, rcu_dyntick_holdoff); +/* Timer to awaken the CPU if it enters dyntick-idle mode with callbacks. */ static DEFINE_PER_CPU(struct timer_list, rcu_idle_gp_timer); +/* Scheduled expiry time for rcu_idle_gp_timer to allow reposting. */ static DEFINE_PER_CPU(unsigned long, rcu_idle_gp_timer_expires); +/* Enable special processing on first attempt to enter dyntick-idle mode. */ static DEFINE_PER_CPU(bool, rcu_idle_first_pass); +/* Running count of non-lazy callbacks posted, never decremented. */ static DEFINE_PER_CPU(unsigned long, rcu_nonlazy_posted); +/* Snapshot of rcu_nonlazy_posted to detect meaningful exits from idle. */ static DEFINE_PER_CPU(unsigned long, rcu_nonlazy_posted_snap); /* @@ -2092,8 +2099,11 @@ static void rcu_idle_gp_timer_func(unsigned long cpu_in) */ static void rcu_prepare_for_idle_init(int cpu) { + per_cpu(rcu_dyntick_holdoff, cpu) = jiffies - 1; setup_timer(&per_cpu(rcu_idle_gp_timer, cpu), rcu_idle_gp_timer_func, cpu); + per_cpu(rcu_idle_gp_timer_expires, cpu) = jiffies - 1; + per_cpu(rcu_idle_first_pass, cpu) = 1; } /* @@ -2232,10 +2242,12 @@ static void rcu_prepare_for_idle(int cpu) } /* - * Keep a running count of callbacks posted so that rcu_prepare_for_idle() - * can detect when something out of the idle loop posts a callback. - * Of course, it had better do so either from a trace event designed to - * be called from idle or from within RCU_NONIDLE(). + * Keep a running count of the number of non-lazy callbacks posted + * on this CPU. This running counter (which is never decremented) allows + * rcu_prepare_for_idle() to detect when something out of the idle loop + * posts a callback, even if an equal number of callbacks are invoked. + * Of course, callbacks should only be posted from within a trace event + * designed to be called from idle or from within RCU_NONIDLE(). */ static void rcu_idle_count_callbacks_posted(void) { -- cgit v1.2.3 From b1420f1c8bfc30ecf6380a31d0f686884834b599 Mon Sep 17 00:00:00 2001 From: "Paul E. McKenney" Date: Thu, 1 Mar 2012 13:18:08 -0800 Subject: rcu: Make rcu_barrier() less disruptive The rcu_barrier() primitive interrupts each and every CPU, registering a callback on every CPU. Once all of these callbacks have been invoked, rcu_barrier() knows that every callback that was registered before the call to rcu_barrier() has also been invoked. However, there is no point in registering a callback on a CPU that currently has no callbacks, most especially if that CPU is in a deep idle state. This commit therefore makes rcu_barrier() avoid interrupting CPUs that have no callbacks. Doing this requires reworking the handling of orphaned callbacks, otherwise callbacks could slip through rcu_barrier()'s net by being orphaned from a CPU that rcu_barrier() had not yet interrupted to a CPU that rcu_barrier() had already interrupted. This reworking was needed anyway to take a first step towards weaning RCU from the CPU_DYING notifier's use of stop_cpu(). Signed-off-by: Paul E. McKenney Signed-off-by: Paul E. McKenney --- kernel/rcutree.c | 295 +++++++++++++++++++++++++++++++++++-------------- kernel/rcutree.h | 11 ++ kernel/rcutree_trace.c | 4 +- 3 files changed, 222 insertions(+), 88 deletions(-) diff --git a/kernel/rcutree.c b/kernel/rcutree.c index 403306b86e78..e578dd327c64 100644 --- a/kernel/rcutree.c +++ b/kernel/rcutree.c @@ -75,6 +75,8 @@ static struct lock_class_key rcu_node_class[NUM_RCU_LVLS]; .gpnum = -300, \ .completed = -300, \ .onofflock = __RAW_SPIN_LOCK_UNLOCKED(&structname##_state.onofflock), \ + .orphan_nxttail = &structname##_state.orphan_nxtlist, \ + .orphan_donetail = &structname##_state.orphan_donelist, \ .fqslock = __RAW_SPIN_LOCK_UNLOCKED(&structname##_state.fqslock), \ .n_force_qs = 0, \ .n_force_qs_ngp = 0, \ @@ -145,6 +147,13 @@ static void invoke_rcu_callbacks(struct rcu_state *rsp, struct rcu_data *rdp); unsigned long rcutorture_testseq; unsigned long rcutorture_vernum; +/* State information for rcu_barrier() and friends. */ + +static DEFINE_PER_CPU(struct rcu_head, rcu_barrier_head) = {NULL}; +static atomic_t rcu_barrier_cpu_count; +static DEFINE_MUTEX(rcu_barrier_mutex); +static struct completion rcu_barrier_completion; + /* * Return true if an RCU grace period is in progress. The ACCESS_ONCE()s * permit this function to be invoked without holding the root rcu_node @@ -1311,95 +1320,133 @@ rcu_check_quiescent_state(struct rcu_state *rsp, struct rcu_data *rdp) #ifdef CONFIG_HOTPLUG_CPU /* - * Move a dying CPU's RCU callbacks to online CPU's callback list. - * Also record a quiescent state for this CPU for the current grace period. - * Synchronization and interrupt disabling are not required because - * this function executes in stop_machine() context. Therefore, cleanup - * operations that might block must be done later from the CPU_DEAD - * notifier. - * - * Note that the outgoing CPU's bit has already been cleared in the - * cpu_online_mask. This allows us to randomly pick a callback - * destination from the bits set in that mask. + * Send the specified CPU's RCU callbacks to the orphanage. The + * specified CPU must be offline, and the caller must hold the + * ->onofflock. */ -static void rcu_cleanup_dying_cpu(struct rcu_state *rsp) +static void +rcu_send_cbs_to_orphanage(int cpu, struct rcu_state *rsp, + struct rcu_node *rnp, struct rcu_data *rdp) { int i; - unsigned long mask; - int receive_cpu = cpumask_any(cpu_online_mask); - struct rcu_data *rdp = this_cpu_ptr(rsp->rda); - struct rcu_data *receive_rdp = per_cpu_ptr(rsp->rda, receive_cpu); - RCU_TRACE(struct rcu_node *rnp = rdp->mynode); /* For dying CPU. */ - /* First, adjust the counts. */ + /* + * Orphan the callbacks. First adjust the counts. This is safe + * because ->onofflock excludes _rcu_barrier()'s adoption of + * the callbacks, thus no memory barrier is required. + */ if (rdp->nxtlist != NULL) { - receive_rdp->qlen_lazy += rdp->qlen_lazy; - receive_rdp->qlen += rdp->qlen; + rsp->qlen_lazy += rdp->qlen_lazy; + rsp->qlen += rdp->qlen; + rdp->n_cbs_orphaned += rdp->qlen; rdp->qlen_lazy = 0; rdp->qlen = 0; } /* - * Next, move ready-to-invoke callbacks to be invoked on some - * other CPU. These will not be required to pass through another - * grace period: They are done, regardless of CPU. + * Next, move those callbacks still needing a grace period to + * the orphanage, where some other CPU will pick them up. + * Some of the callbacks might have gone partway through a grace + * period, but that is too bad. They get to start over because we + * cannot assume that grace periods are synchronized across CPUs. + * We don't bother updating the ->nxttail[] array yet, instead + * we just reset the whole thing later on. */ - if (rdp->nxtlist != NULL && - rdp->nxttail[RCU_DONE_TAIL] != &rdp->nxtlist) { - struct rcu_head *oldhead; - struct rcu_head **oldtail; - struct rcu_head **newtail; - - oldhead = rdp->nxtlist; - oldtail = receive_rdp->nxttail[RCU_DONE_TAIL]; - rdp->nxtlist = *rdp->nxttail[RCU_DONE_TAIL]; - *rdp->nxttail[RCU_DONE_TAIL] = *oldtail; - *receive_rdp->nxttail[RCU_DONE_TAIL] = oldhead; - newtail = rdp->nxttail[RCU_DONE_TAIL]; - for (i = RCU_DONE_TAIL; i < RCU_NEXT_SIZE; i++) { - if (receive_rdp->nxttail[i] == oldtail) - receive_rdp->nxttail[i] = newtail; - if (rdp->nxttail[i] == newtail) - rdp->nxttail[i] = &rdp->nxtlist; - } + if (*rdp->nxttail[RCU_DONE_TAIL] != NULL) { + *rsp->orphan_nxttail = *rdp->nxttail[RCU_DONE_TAIL]; + rsp->orphan_nxttail = rdp->nxttail[RCU_NEXT_TAIL]; + *rdp->nxttail[RCU_DONE_TAIL] = NULL; } /* - * Finally, put the rest of the callbacks at the end of the list. - * The ones that made it partway through get to start over: We - * cannot assume that grace periods are synchronized across CPUs. - * (We could splice RCU_WAIT_TAIL into RCU_NEXT_READY_TAIL, but - * this does not seem compelling. Not yet, anyway.) + * Then move the ready-to-invoke callbacks to the orphanage, + * where some other CPU will pick them up. These will not be + * required to pass though another grace period: They are done. */ if (rdp->nxtlist != NULL) { - *receive_rdp->nxttail[RCU_NEXT_TAIL] = rdp->nxtlist; - receive_rdp->nxttail[RCU_NEXT_TAIL] = - rdp->nxttail[RCU_NEXT_TAIL]; - receive_rdp->n_cbs_adopted += rdp->qlen; - rdp->n_cbs_orphaned += rdp->qlen; - - rdp->nxtlist = NULL; - for (i = 0; i < RCU_NEXT_SIZE; i++) - rdp->nxttail[i] = &rdp->nxtlist; + *rsp->orphan_donetail = rdp->nxtlist; + rsp->orphan_donetail = rdp->nxttail[RCU_DONE_TAIL]; } + /* Finally, initialize the rcu_data structure's list to empty. */ + rdp->nxtlist = NULL; + for (i = 0; i < RCU_NEXT_SIZE; i++) + rdp->nxttail[i] = &rdp->nxtlist; +} + +/* + * Adopt the RCU callbacks from the specified rcu_state structure's + * orphanage. The caller must hold the ->onofflock. + */ +static void rcu_adopt_orphan_cbs(struct rcu_state *rsp) +{ + int i; + struct rcu_data *rdp = __this_cpu_ptr(rsp->rda); + /* - * Record a quiescent state for the dying CPU. This is safe - * only because we have already cleared out the callbacks. - * (Otherwise, the RCU core might try to schedule the invocation - * of callbacks on this now-offline CPU, which would be bad.) + * If there is an rcu_barrier() operation in progress, then + * only the task doing that operation is permitted to adopt + * callbacks. To do otherwise breaks rcu_barrier() and friends + * by causing them to fail to wait for the callbacks in the + * orphanage. */ - mask = rdp->grpmask; /* rnp->grplo is constant. */ + if (rsp->rcu_barrier_in_progress && + rsp->rcu_barrier_in_progress != current) + return; + + /* Do the accounting first. */ + rdp->qlen_lazy += rsp->qlen_lazy; + rdp->qlen += rsp->qlen; + rdp->n_cbs_adopted += rsp->qlen; + rsp->qlen_lazy = 0; + rsp->qlen = 0; + + /* + * We do not need a memory barrier here because the only way we + * can get here if there is an rcu_barrier() in flight is if + * we are the task doing the rcu_barrier(). + */ + + /* First adopt the ready-to-invoke callbacks. */ + if (rsp->orphan_donelist != NULL) { + *rsp->orphan_donetail = *rdp->nxttail[RCU_DONE_TAIL]; + *rdp->nxttail[RCU_DONE_TAIL] = rsp->orphan_donelist; + for (i = RCU_NEXT_SIZE - 1; i >= RCU_DONE_TAIL; i--) + if (rdp->nxttail[i] == rdp->nxttail[RCU_DONE_TAIL]) + rdp->nxttail[i] = rsp->orphan_donetail; + rsp->orphan_donelist = NULL; + rsp->orphan_donetail = &rsp->orphan_donelist; + } + + /* And then adopt the callbacks that still need a grace period. */ + if (rsp->orphan_nxtlist != NULL) { + *rdp->nxttail[RCU_NEXT_TAIL] = rsp->orphan_nxtlist; + rdp->nxttail[RCU_NEXT_TAIL] = rsp->orphan_nxttail; + rsp->orphan_nxtlist = NULL; + rsp->orphan_nxttail = &rsp->orphan_nxtlist; + } +} + +/* + * Trace the fact that this CPU is going offline. + */ +static void rcu_cleanup_dying_cpu(struct rcu_state *rsp) +{ + RCU_TRACE(unsigned long mask); + RCU_TRACE(struct rcu_data *rdp = this_cpu_ptr(rsp->rda)); + RCU_TRACE(struct rcu_node *rnp = rdp->mynode); + + RCU_TRACE(mask = rdp->grpmask); trace_rcu_grace_period(rsp->name, rnp->gpnum + 1 - !!(rnp->qsmask & mask), "cpuofl"); - rcu_report_qs_rdp(smp_processor_id(), rsp, rdp, rsp->gpnum); - /* Note that rcu_report_qs_rdp() might call trace_rcu_grace_period(). */ } /* * The CPU has been completely removed, and some other CPU is reporting - * this fact from process context. Do the remainder of the cleanup. + * this fact from process context. Do the remainder of the cleanup, + * including orphaning the outgoing CPU's RCU callbacks, and also + * adopting them, if there is no _rcu_barrier() instance running. * There can only be one CPU hotplug operation at a time, so no other * CPU can be attempting to update rcu_cpu_kthread_task. */ @@ -1409,17 +1456,21 @@ static void rcu_cleanup_dead_cpu(int cpu, struct rcu_state *rsp) unsigned long mask; int need_report = 0; struct rcu_data *rdp = per_cpu_ptr(rsp->rda, cpu); - struct rcu_node *rnp = rdp->mynode; /* Outgoing CPU's rnp. */ + struct rcu_node *rnp = rdp->mynode; /* Outgoing CPU's rdp & rnp. */ /* Adjust any no-longer-needed kthreads. */ rcu_stop_cpu_kthread(cpu); rcu_node_kthread_setaffinity(rnp, -1); - /* Remove the dying CPU from the bitmasks in the rcu_node hierarchy. */ + /* Remove the dead CPU from the bitmasks in the rcu_node hierarchy. */ /* Exclude any attempts to start a new grace period. */ raw_spin_lock_irqsave(&rsp->onofflock, flags); + /* Orphan the dead CPU's callbacks, and adopt them if appropriate. */ + rcu_send_cbs_to_orphanage(cpu, rsp, rnp, rdp); + rcu_adopt_orphan_cbs(rsp); + /* Remove the outgoing CPU from the masks in the rcu_node hierarchy. */ mask = rdp->grpmask; /* rnp->grplo is constant. */ do { @@ -1456,6 +1507,10 @@ static void rcu_cleanup_dead_cpu(int cpu, struct rcu_state *rsp) #else /* #ifdef CONFIG_HOTPLUG_CPU */ +static void rcu_adopt_orphan_cbs(struct rcu_state *rsp) +{ +} + static void rcu_cleanup_dying_cpu(struct rcu_state *rsp) { } @@ -1524,9 +1579,6 @@ static void rcu_do_batch(struct rcu_state *rsp, struct rcu_data *rdp) rcu_is_callbacks_kthread()); /* Update count, and requeue any remaining callbacks. */ - rdp->qlen_lazy -= count_lazy; - rdp->qlen -= count; - rdp->n_cbs_invoked += count; if (list != NULL) { *tail = rdp->nxtlist; rdp->nxtlist = list; @@ -1536,6 +1588,10 @@ static void rcu_do_batch(struct rcu_state *rsp, struct rcu_data *rdp) else break; } + smp_mb(); /* List handling before counting for rcu_barrier(). */ + rdp->qlen_lazy -= count_lazy; + rdp->qlen -= count; + rdp->n_cbs_invoked += count; /* Reinstate batch limit if we have worked down the excess. */ if (rdp->blimit == LONG_MAX && rdp->qlen <= qlowmark) @@ -1824,13 +1880,14 @@ __call_rcu(struct rcu_head *head, void (*func)(struct rcu_head *rcu), rdp = this_cpu_ptr(rsp->rda); /* Add the callback to our list. */ - *rdp->nxttail[RCU_NEXT_TAIL] = head; - rdp->nxttail[RCU_NEXT_TAIL] = &head->next; rdp->qlen++; if (lazy) rdp->qlen_lazy++; else rcu_idle_count_callbacks_posted(); + smp_mb(); /* Count before adding callback for rcu_barrier(). */ + *rdp->nxttail[RCU_NEXT_TAIL] = head; + rdp->nxttail[RCU_NEXT_TAIL] = &head->next; if (__is_kfree_rcu_offset((unsigned long)func)) trace_rcu_kfree_callback(rsp->name, head, (unsigned long)func, @@ -2169,11 +2226,10 @@ static int rcu_cpu_has_callbacks(int cpu) rcu_preempt_cpu_has_callbacks(cpu); } -static DEFINE_PER_CPU(struct rcu_head, rcu_barrier_head) = {NULL}; -static atomic_t rcu_barrier_cpu_count; -static DEFINE_MUTEX(rcu_barrier_mutex); -static struct completion rcu_barrier_completion; - +/* + * RCU callback function for _rcu_barrier(). If we are last, wake + * up the task executing _rcu_barrier(). + */ static void rcu_barrier_callback(struct rcu_head *notused) { if (atomic_dec_and_test(&rcu_barrier_cpu_count)) @@ -2203,27 +2259,94 @@ static void _rcu_barrier(struct rcu_state *rsp, void (*call_rcu_func)(struct rcu_head *head, void (*func)(struct rcu_head *head))) { - BUG_ON(in_interrupt()); + int cpu; + unsigned long flags; + struct rcu_data *rdp; + struct rcu_head rh; + + init_rcu_head_on_stack(&rh); + /* Take mutex to serialize concurrent rcu_barrier() requests. */ mutex_lock(&rcu_barrier_mutex); - init_completion(&rcu_barrier_completion); + + smp_mb(); /* Prevent any prior operations from leaking in. */ + /* - * Initialize rcu_barrier_cpu_count to 1, then invoke - * rcu_barrier_func() on each CPU, so that each CPU also has - * incremented rcu_barrier_cpu_count. Only then is it safe to - * decrement rcu_barrier_cpu_count -- otherwise the first CPU - * might complete its grace period before all of the other CPUs - * did their increment, causing this function to return too - * early. Note that on_each_cpu() disables irqs, which prevents - * any CPUs from coming online or going offline until each online - * CPU has queued its RCU-barrier callback. + * Initialize the count to one rather than to zero in order to + * avoid a too-soon return to zero in case of a short grace period + * (or preemption of this task). Also flag this task as doing + * an rcu_barrier(). This will prevent anyone else from adopting + * orphaned callbacks, which could cause otherwise failure if a + * CPU went offline and quickly came back online. To see this, + * consider the following sequence of events: + * + * 1. We cause CPU 0 to post an rcu_barrier_callback() callback. + * 2. CPU 1 goes offline, orphaning its callbacks. + * 3. CPU 0 adopts CPU 1's orphaned callbacks. + * 4. CPU 1 comes back online. + * 5. We cause CPU 1 to post an rcu_barrier_callback() callback. + * 6. Both rcu_barrier_callback() callbacks are invoked, awakening + * us -- but before CPU 1's orphaned callbacks are invoked!!! */ + init_completion(&rcu_barrier_completion); atomic_set(&rcu_barrier_cpu_count, 1); - on_each_cpu(rcu_barrier_func, (void *)call_rcu_func, 1); + raw_spin_lock_irqsave(&rsp->onofflock, flags); + rsp->rcu_barrier_in_progress = current; + raw_spin_unlock_irqrestore(&rsp->onofflock, flags); + + /* + * Force every CPU with callbacks to register a new callback + * that will tell us when all the preceding callbacks have + * been invoked. If an offline CPU has callbacks, wait for + * it to either come back online or to finish orphaning those + * callbacks. + */ + for_each_possible_cpu(cpu) { + preempt_disable(); + rdp = per_cpu_ptr(rsp->rda, cpu); + if (cpu_is_offline(cpu)) { + preempt_enable(); + while (cpu_is_offline(cpu) && ACCESS_ONCE(rdp->qlen)) + schedule_timeout_interruptible(1); + } else if (ACCESS_ONCE(rdp->qlen)) { + smp_call_function_single(cpu, rcu_barrier_func, + (void *)call_rcu_func, 1); + preempt_enable(); + } else { + preempt_enable(); + } + } + + /* + * Now that all online CPUs have rcu_barrier_callback() callbacks + * posted, we can adopt all of the orphaned callbacks and place + * an rcu_barrier_callback() callback after them. When that is done, + * we are guaranteed to have an rcu_barrier_callback() callback + * following every callback that could possibly have been + * registered before _rcu_barrier() was called. + */ + raw_spin_lock_irqsave(&rsp->onofflock, flags); + rcu_adopt_orphan_cbs(rsp); + rsp->rcu_barrier_in_progress = NULL; + raw_spin_unlock_irqrestore(&rsp->onofflock, flags); + atomic_inc(&rcu_barrier_cpu_count); + smp_mb__after_atomic_inc(); /* Ensure atomic_inc() before callback. */ + call_rcu_func(&rh, rcu_barrier_callback); + + /* + * Now that we have an rcu_barrier_callback() callback on each + * CPU, and thus each counted, remove the initial count. + */ if (atomic_dec_and_test(&rcu_barrier_cpu_count)) complete(&rcu_barrier_completion); + + /* Wait for all rcu_barrier_callback() callbacks to be invoked. */ wait_for_completion(&rcu_barrier_completion); + + /* Other rcu_barrier() invocations can now safely proceed. */ mutex_unlock(&rcu_barrier_mutex); + + destroy_rcu_head_on_stack(&rh); } /** diff --git a/kernel/rcutree.h b/kernel/rcutree.h index 36ca28ecedc6..1e49c5685960 100644 --- a/kernel/rcutree.h +++ b/kernel/rcutree.h @@ -371,6 +371,17 @@ struct rcu_state { raw_spinlock_t onofflock; /* exclude on/offline and */ /* starting new GP. */ + struct rcu_head *orphan_nxtlist; /* Orphaned callbacks that */ + /* need a grace period. */ + struct rcu_head **orphan_nxttail; /* Tail of above. */ + struct rcu_head *orphan_donelist; /* Orphaned callbacks that */ + /* are ready to invoke. */ + struct rcu_head **orphan_donetail; /* Tail of above. */ + long qlen_lazy; /* Number of lazy callbacks. */ + long qlen; /* Total number of callbacks. */ + struct task_struct *rcu_barrier_in_progress; + /* Task doing rcu_barrier(), */ + /* or NULL if no barrier. */ raw_spinlock_t fqslock; /* Only one task forcing */ /* quiescent states. */ unsigned long jiffies_force_qs; /* Time at which to invoke */ diff --git a/kernel/rcutree_trace.c b/kernel/rcutree_trace.c index ed459edeff43..d4bc16ddd1d4 100644 --- a/kernel/rcutree_trace.c +++ b/kernel/rcutree_trace.c @@ -271,13 +271,13 @@ static void print_one_rcu_state(struct seq_file *m, struct rcu_state *rsp) gpnum = rsp->gpnum; seq_printf(m, "c=%lu g=%lu s=%d jfq=%ld j=%x " - "nfqs=%lu/nfqsng=%lu(%lu) fqlh=%lu\n", + "nfqs=%lu/nfqsng=%lu(%lu) fqlh=%lu oqlen=%ld/%ld\n", rsp->completed, gpnum, rsp->fqs_state, (long)(rsp->jiffies_force_qs - jiffies), (int)(jiffies & 0xffff), rsp->n_force_qs, rsp->n_force_qs_ngp, rsp->n_force_qs - rsp->n_force_qs_ngp, - rsp->n_force_qs_lh); + rsp->n_force_qs_lh, rsp->qlen_lazy, rsp->qlen); for (rnp = &rsp->node[0]; rnp - &rsp->node[0] < NUM_RCU_NODES; rnp++) { if (rnp->level != level) { seq_puts(m, "\n"); -- cgit v1.2.3