diff options
Diffstat (limited to 'kernel/rcu')
-rw-r--r-- | kernel/rcu/Kconfig | 8 | ||||
-rw-r--r-- | kernel/rcu/Kconfig.debug | 11 | ||||
-rw-r--r-- | kernel/rcu/rcu.h | 1 | ||||
-rw-r--r-- | kernel/rcu/rcu_segcblist.c | 174 | ||||
-rw-r--r-- | kernel/rcu/rcu_segcblist.h | 54 | ||||
-rw-r--r-- | kernel/rcu/rcuperf.c | 10 | ||||
-rw-r--r-- | kernel/rcu/rcutorture.c | 30 | ||||
-rw-r--r-- | kernel/rcu/srcutree.c | 5 | ||||
-rw-r--r-- | kernel/rcu/tree.c | 217 | ||||
-rw-r--r-- | kernel/rcu/tree.h | 81 | ||||
-rw-r--r-- | kernel/rcu/tree_exp.h | 8 | ||||
-rw-r--r-- | kernel/rcu/tree_plugin.h | 1195 | ||||
-rw-r--r-- | kernel/rcu/tree_stall.h | 15 | ||||
-rw-r--r-- | kernel/rcu/update.c | 105 |
14 files changed, 1217 insertions, 697 deletions
diff --git a/kernel/rcu/Kconfig b/kernel/rcu/Kconfig index 480edf328b51..7644eda17d62 100644 --- a/kernel/rcu/Kconfig +++ b/kernel/rcu/Kconfig @@ -7,7 +7,7 @@ menu "RCU Subsystem" config TREE_RCU bool - default y if !PREEMPT && SMP + default y if !PREEMPTION && SMP help This option selects the RCU implementation that is designed for very large SMP system with hundreds or @@ -16,7 +16,7 @@ config TREE_RCU config PREEMPT_RCU bool - default y if PREEMPT + default y if PREEMPTION help This option selects the RCU implementation that is designed for very large SMP systems with hundreds or @@ -28,7 +28,7 @@ config PREEMPT_RCU config TINY_RCU bool - default y if !PREEMPT && !SMP + default y if !PREEMPTION && !SMP help This option selects the RCU implementation that is designed for UP systems from which real-time response @@ -70,7 +70,7 @@ config TREE_SRCU This option selects the full-fledged version of SRCU. config TASKS_RCU - def_bool PREEMPT + def_bool PREEMPTION select SRCU help This option enables a task-based RCU implementation that uses diff --git a/kernel/rcu/Kconfig.debug b/kernel/rcu/Kconfig.debug index 5ec3ea4028e2..4aa02eee8f6c 100644 --- a/kernel/rcu/Kconfig.debug +++ b/kernel/rcu/Kconfig.debug @@ -8,6 +8,17 @@ menu "RCU Debugging" config PROVE_RCU def_bool PROVE_LOCKING +config PROVE_RCU_LIST + bool "RCU list lockdep debugging" + depends on PROVE_RCU && RCU_EXPERT + default n + help + Enable RCU lockdep checking for list usages. By default it is + turned off since there are several list RCU users that still + need to be converted to pass a lockdep expression. To prevent + false-positive splats, we keep it default disabled but once all + users are converted, we can remove this config option. + config TORTURE_TEST tristate default n diff --git a/kernel/rcu/rcu.h b/kernel/rcu/rcu.h index 5290b01de534..8fd4f82c9b3d 100644 --- a/kernel/rcu/rcu.h +++ b/kernel/rcu/rcu.h @@ -227,6 +227,7 @@ static inline bool __rcu_reclaim(const char *rn, struct rcu_head *head) #ifdef CONFIG_RCU_STALL_COMMON +extern int rcu_cpu_stall_ftrace_dump; extern int rcu_cpu_stall_suppress; extern int rcu_cpu_stall_timeout; int rcu_jiffies_till_stall_check(void); diff --git a/kernel/rcu/rcu_segcblist.c b/kernel/rcu/rcu_segcblist.c index 9bd5f6023c21..495c58ce1640 100644 --- a/kernel/rcu/rcu_segcblist.c +++ b/kernel/rcu/rcu_segcblist.c @@ -24,6 +24,49 @@ void rcu_cblist_init(struct rcu_cblist *rclp) } /* + * Enqueue an rcu_head structure onto the specified callback list. + * This function assumes that the callback is non-lazy because it + * is intended for use by no-CBs CPUs, which do not distinguish + * between lazy and non-lazy RCU callbacks. + */ +void rcu_cblist_enqueue(struct rcu_cblist *rclp, struct rcu_head *rhp) +{ + *rclp->tail = rhp; + rclp->tail = &rhp->next; + WRITE_ONCE(rclp->len, rclp->len + 1); +} + +/* + * Flush the second rcu_cblist structure onto the first one, obliterating + * any contents of the first. If rhp is non-NULL, enqueue it as the sole + * element of the second rcu_cblist structure, but ensuring that the second + * rcu_cblist structure, if initially non-empty, always appears non-empty + * throughout the process. If rdp is NULL, the second rcu_cblist structure + * is instead initialized to empty. + */ +void rcu_cblist_flush_enqueue(struct rcu_cblist *drclp, + struct rcu_cblist *srclp, + struct rcu_head *rhp) +{ + drclp->head = srclp->head; + if (drclp->head) + drclp->tail = srclp->tail; + else + drclp->tail = &drclp->head; + drclp->len = srclp->len; + drclp->len_lazy = srclp->len_lazy; + if (!rhp) { + rcu_cblist_init(srclp); + } else { + rhp->next = NULL; + srclp->head = rhp; + srclp->tail = &rhp->next; + WRITE_ONCE(srclp->len, 1); + srclp->len_lazy = 0; + } +} + +/* * Dequeue the oldest rcu_head structure from the specified callback * list. This function assumes that the callback is non-lazy, but * the caller can later invoke rcu_cblist_dequeued_lazy() if it @@ -44,6 +87,67 @@ struct rcu_head *rcu_cblist_dequeue(struct rcu_cblist *rclp) return rhp; } +/* Set the length of an rcu_segcblist structure. */ +void rcu_segcblist_set_len(struct rcu_segcblist *rsclp, long v) +{ +#ifdef CONFIG_RCU_NOCB_CPU + atomic_long_set(&rsclp->len, v); +#else + WRITE_ONCE(rsclp->len, v); +#endif +} + +/* + * Increase the numeric length of an rcu_segcblist structure by the + * specified amount, which can be negative. This can cause the ->len + * field to disagree with the actual number of callbacks on the structure. + * This increase is fully ordered with respect to the callers accesses + * both before and after. + */ +void rcu_segcblist_add_len(struct rcu_segcblist *rsclp, long v) +{ +#ifdef CONFIG_RCU_NOCB_CPU + smp_mb__before_atomic(); /* Up to the caller! */ + atomic_long_add(v, &rsclp->len); + smp_mb__after_atomic(); /* Up to the caller! */ +#else + smp_mb(); /* Up to the caller! */ + WRITE_ONCE(rsclp->len, rsclp->len + v); + smp_mb(); /* Up to the caller! */ +#endif +} + +/* + * Increase the numeric length of an rcu_segcblist structure by one. + * This can cause the ->len field to disagree with the actual number of + * callbacks on the structure. This increase is fully ordered with respect + * to the callers accesses both before and after. + */ +void rcu_segcblist_inc_len(struct rcu_segcblist *rsclp) +{ + rcu_segcblist_add_len(rsclp, 1); +} + +/* + * Exchange the numeric length of the specified rcu_segcblist structure + * with the specified value. This can cause the ->len field to disagree + * with the actual number of callbacks on the structure. This exchange is + * fully ordered with respect to the callers accesses both before and after. + */ +long rcu_segcblist_xchg_len(struct rcu_segcblist *rsclp, long v) +{ +#ifdef CONFIG_RCU_NOCB_CPU + return atomic_long_xchg(&rsclp->len, v); +#else + long ret = rsclp->len; + + smp_mb(); /* Up to the caller! */ + WRITE_ONCE(rsclp->len, v); + smp_mb(); /* Up to the caller! */ + return ret; +#endif +} + /* * Initialize an rcu_segcblist structure. */ @@ -56,8 +160,9 @@ void rcu_segcblist_init(struct rcu_segcblist *rsclp) rsclp->head = NULL; for (i = 0; i < RCU_CBLIST_NSEGS; i++) rsclp->tails[i] = &rsclp->head; - rsclp->len = 0; + rcu_segcblist_set_len(rsclp, 0); rsclp->len_lazy = 0; + rsclp->enabled = 1; } /* @@ -69,7 +174,16 @@ void rcu_segcblist_disable(struct rcu_segcblist *rsclp) WARN_ON_ONCE(!rcu_segcblist_empty(rsclp)); WARN_ON_ONCE(rcu_segcblist_n_cbs(rsclp)); WARN_ON_ONCE(rcu_segcblist_n_lazy_cbs(rsclp)); - rsclp->tails[RCU_NEXT_TAIL] = NULL; + rsclp->enabled = 0; +} + +/* + * Mark the specified rcu_segcblist structure as offloaded. This + * structure must be empty. + */ +void rcu_segcblist_offload(struct rcu_segcblist *rsclp) +{ + rsclp->offloaded = 1; } /* @@ -118,6 +232,18 @@ struct rcu_head *rcu_segcblist_first_pend_cb(struct rcu_segcblist *rsclp) } /* + * Return false if there are no CBs awaiting grace periods, otherwise, + * return true and store the nearest waited-upon grace period into *lp. + */ +bool rcu_segcblist_nextgp(struct rcu_segcblist *rsclp, unsigned long *lp) +{ + if (!rcu_segcblist_pend_cbs(rsclp)) + return false; + *lp = rsclp->gp_seq[RCU_WAIT_TAIL]; + return true; +} + +/* * Enqueue the specified callback onto the specified rcu_segcblist * structure, updating accounting as needed. Note that the ->len * field may be accessed locklessly, hence the WRITE_ONCE(). @@ -129,13 +255,13 @@ struct rcu_head *rcu_segcblist_first_pend_cb(struct rcu_segcblist *rsclp) void rcu_segcblist_enqueue(struct rcu_segcblist *rsclp, struct rcu_head *rhp, bool lazy) { - WRITE_ONCE(rsclp->len, rsclp->len + 1); /* ->len sampled locklessly. */ + rcu_segcblist_inc_len(rsclp); if (lazy) rsclp->len_lazy++; smp_mb(); /* Ensure counts are updated before callback is enqueued. */ rhp->next = NULL; - *rsclp->tails[RCU_NEXT_TAIL] = rhp; - rsclp->tails[RCU_NEXT_TAIL] = &rhp->next; + WRITE_ONCE(*rsclp->tails[RCU_NEXT_TAIL], rhp); + WRITE_ONCE(rsclp->tails[RCU_NEXT_TAIL], &rhp->next); } /* @@ -155,7 +281,7 @@ bool rcu_segcblist_entrain(struct rcu_segcblist *rsclp, if (rcu_segcblist_n_cbs(rsclp) == 0) return false; - WRITE_ONCE(rsclp->len, rsclp->len + 1); + rcu_segcblist_inc_len(rsclp); if (lazy) rsclp->len_lazy++; smp_mb(); /* Ensure counts are updated before callback is entrained. */ @@ -163,9 +289,9 @@ bool rcu_segcblist_entrain(struct rcu_segcblist *rsclp, for (i = RCU_NEXT_TAIL; i > RCU_DONE_TAIL; i--) if (rsclp->tails[i] != rsclp->tails[i - 1]) break; - *rsclp->tails[i] = rhp; + WRITE_ONCE(*rsclp->tails[i], rhp); for (; i <= RCU_NEXT_TAIL; i++) - rsclp->tails[i] = &rhp->next; + WRITE_ONCE(rsclp->tails[i], &rhp->next); return true; } @@ -182,9 +308,8 @@ void rcu_segcblist_extract_count(struct rcu_segcblist *rsclp, struct rcu_cblist *rclp) { rclp->len_lazy += rsclp->len_lazy; - rclp->len += rsclp->len; rsclp->len_lazy = 0; - WRITE_ONCE(rsclp->len, 0); /* ->len sampled locklessly. */ + rclp->len = rcu_segcblist_xchg_len(rsclp, 0); } /* @@ -200,12 +325,12 @@ void rcu_segcblist_extract_done_cbs(struct rcu_segcblist *rsclp, if (!rcu_segcblist_ready_cbs(rsclp)) return; /* Nothing to do. */ *rclp->tail = rsclp->head; - rsclp->head = *rsclp->tails[RCU_DONE_TAIL]; - *rsclp->tails[RCU_DONE_TAIL] = NULL; + WRITE_ONCE(rsclp->head, *rsclp->tails[RCU_DONE_TAIL]); + WRITE_ONCE(*rsclp->tails[RCU_DONE_TAIL], NULL); rclp->tail = rsclp->tails[RCU_DONE_TAIL]; for (i = RCU_CBLIST_NSEGS - 1; i >= RCU_DONE_TAIL; i--) if (rsclp->tails[i] == rsclp->tails[RCU_DONE_TAIL]) - rsclp->tails[i] = &rsclp->head; + WRITE_ONCE(rsclp->tails[i], &rsclp->head); } /* @@ -224,9 +349,9 @@ void rcu_segcblist_extract_pend_cbs(struct rcu_segcblist *rsclp, return; /* Nothing to do. */ *rclp->tail = *rsclp->tails[RCU_DONE_TAIL]; rclp->tail = rsclp->tails[RCU_NEXT_TAIL]; - *rsclp->tails[RCU_DONE_TAIL] = NULL; + WRITE_ONCE(*rsclp->tails[RCU_DONE_TAIL], NULL); for (i = RCU_DONE_TAIL + 1; i < RCU_CBLIST_NSEGS; i++) - rsclp->tails[i] = rsclp->tails[RCU_DONE_TAIL]; + WRITE_ONCE(rsclp->tails[i], rsclp->tails[RCU_DONE_TAIL]); } /* @@ -237,8 +362,7 @@ void rcu_segcblist_insert_count(struct rcu_segcblist *rsclp, struct rcu_cblist *rclp) { rsclp->len_lazy += rclp->len_lazy; - /* ->len sampled locklessly. */ - WRITE_ONCE(rsclp->len, rsclp->len + rclp->len); + rcu_segcblist_add_len(rsclp, rclp->len); rclp->len_lazy = 0; rclp->len = 0; } @@ -255,10 +379,10 @@ void rcu_segcblist_insert_done_cbs(struct rcu_segcblist *rsclp, if (!rclp->head) return; /* No callbacks to move. */ *rclp->tail = rsclp->head; - rsclp->head = rclp->head; + WRITE_ONCE(rsclp->head, rclp->head); for (i = RCU_DONE_TAIL; i < RCU_CBLIST_NSEGS; i++) if (&rsclp->head == rsclp->tails[i]) - rsclp->tails[i] = rclp->tail; + WRITE_ONCE(rsclp->tails[i], rclp->tail); else break; rclp->head = NULL; @@ -274,8 +398,8 @@ void rcu_segcblist_insert_pend_cbs(struct rcu_segcblist *rsclp, { if (!rclp->head) return; /* Nothing to do. */ - *rsclp->tails[RCU_NEXT_TAIL] = rclp->head; - rsclp->tails[RCU_NEXT_TAIL] = rclp->tail; + WRITE_ONCE(*rsclp->tails[RCU_NEXT_TAIL], rclp->head); + WRITE_ONCE(rsclp->tails[RCU_NEXT_TAIL], rclp->tail); rclp->head = NULL; rclp->tail = &rclp->head; } @@ -299,7 +423,7 @@ void rcu_segcblist_advance(struct rcu_segcblist *rsclp, unsigned long seq) for (i = RCU_WAIT_TAIL; i < RCU_NEXT_TAIL; i++) { if (ULONG_CMP_LT(seq, rsclp->gp_seq[i])) break; - rsclp->tails[RCU_DONE_TAIL] = rsclp->tails[i]; + WRITE_ONCE(rsclp->tails[RCU_DONE_TAIL], rsclp->tails[i]); } /* If no callbacks moved, nothing more need be done. */ @@ -308,7 +432,7 @@ void rcu_segcblist_advance(struct rcu_segcblist *rsclp, unsigned long seq) /* Clean up tail pointers that might have been misordered above. */ for (j = RCU_WAIT_TAIL; j < i; j++) - rsclp->tails[j] = rsclp->tails[RCU_DONE_TAIL]; + WRITE_ONCE(rsclp->tails[j], rsclp->tails[RCU_DONE_TAIL]); /* * Callbacks moved, so clean up the misordered ->tails[] pointers @@ -319,7 +443,7 @@ void rcu_segcblist_advance(struct rcu_segcblist *rsclp, unsigned long seq) for (j = RCU_WAIT_TAIL; i < RCU_NEXT_TAIL; i++, j++) { if (rsclp->tails[j] == rsclp->tails[RCU_NEXT_TAIL]) break; /* No more callbacks. */ - rsclp->tails[j] = rsclp->tails[i]; + WRITE_ONCE(rsclp->tails[j], rsclp->tails[i]); rsclp->gp_seq[j] = rsclp->gp_seq[i]; } } @@ -384,7 +508,7 @@ bool rcu_segcblist_accelerate(struct rcu_segcblist *rsclp, unsigned long seq) * structure other than in the RCU_NEXT_TAIL segment. */ for (; i < RCU_NEXT_TAIL; i++) { - rsclp->tails[i] = rsclp->tails[RCU_NEXT_TAIL]; + WRITE_ONCE(rsclp->tails[i], rsclp->tails[RCU_NEXT_TAIL]); rsclp->gp_seq[i] = seq; } return true; diff --git a/kernel/rcu/rcu_segcblist.h b/kernel/rcu/rcu_segcblist.h index 71b64648464e..815c2fdd3fcc 100644 --- a/kernel/rcu/rcu_segcblist.h +++ b/kernel/rcu/rcu_segcblist.h @@ -9,6 +9,12 @@ #include <linux/rcu_segcblist.h> +/* Return number of callbacks in the specified callback list. */ +static inline long rcu_cblist_n_cbs(struct rcu_cblist *rclp) +{ + return READ_ONCE(rclp->len); +} + /* * Account for the fact that a previously dequeued callback turned out * to be marked as lazy. @@ -19,6 +25,10 @@ static inline void rcu_cblist_dequeued_lazy(struct rcu_cblist *rclp) } void rcu_cblist_init(struct rcu_cblist *rclp); +void rcu_cblist_enqueue(struct rcu_cblist *rclp, struct rcu_head *rhp); +void rcu_cblist_flush_enqueue(struct rcu_cblist *drclp, + struct rcu_cblist *srclp, + struct rcu_head *rhp); struct rcu_head *rcu_cblist_dequeue(struct rcu_cblist *rclp); /* @@ -36,13 +46,17 @@ struct rcu_head *rcu_cblist_dequeue(struct rcu_cblist *rclp); */ static inline bool rcu_segcblist_empty(struct rcu_segcblist *rsclp) { - return !rsclp->head; + return !READ_ONCE(rsclp->head); } /* Return number of callbacks in segmented callback list. */ static inline long rcu_segcblist_n_cbs(struct rcu_segcblist *rsclp) { +#ifdef CONFIG_RCU_NOCB_CPU + return atomic_long_read(&rsclp->len); +#else return READ_ONCE(rsclp->len); +#endif } /* Return number of lazy callbacks in segmented callback list. */ @@ -54,16 +68,22 @@ static inline long rcu_segcblist_n_lazy_cbs(struct rcu_segcblist *rsclp) /* Return number of lazy callbacks in segmented callback list. */ static inline long rcu_segcblist_n_nonlazy_cbs(struct rcu_segcblist *rsclp) { - return rsclp->len - rsclp->len_lazy; + return rcu_segcblist_n_cbs(rsclp) - rsclp->len_lazy; } /* * Is the specified rcu_segcblist enabled, for example, not corresponding - * to an offline or callback-offloaded CPU? + * to an offline CPU? */ static inline bool rcu_segcblist_is_enabled(struct rcu_segcblist *rsclp) { - return !!rsclp->tails[RCU_NEXT_TAIL]; + return rsclp->enabled; +} + +/* Is the specified rcu_segcblist offloaded? */ +static inline bool rcu_segcblist_is_offloaded(struct rcu_segcblist *rsclp) +{ + return rsclp->offloaded; } /* @@ -73,36 +93,18 @@ static inline bool rcu_segcblist_is_enabled(struct rcu_segcblist *rsclp) */ static inline bool rcu_segcblist_restempty(struct rcu_segcblist *rsclp, int seg) { - return !*rsclp->tails[seg]; -} - -/* - * Interim function to return rcu_segcblist head pointer. Longer term, the - * rcu_segcblist will be used more pervasively, removing the need for this - * function. - */ -static inline struct rcu_head *rcu_segcblist_head(struct rcu_segcblist *rsclp) -{ - return rsclp->head; -} - -/* - * Interim function to return rcu_segcblist head pointer. Longer term, the - * rcu_segcblist will be used more pervasively, removing the need for this - * function. - */ -static inline struct rcu_head **rcu_segcblist_tail(struct rcu_segcblist *rsclp) -{ - WARN_ON_ONCE(rcu_segcblist_empty(rsclp)); - return rsclp->tails[RCU_NEXT_TAIL]; + return !READ_ONCE(*READ_ONCE(rsclp->tails[seg])); } +void rcu_segcblist_inc_len(struct rcu_segcblist *rsclp); void rcu_segcblist_init(struct rcu_segcblist *rsclp); void rcu_segcblist_disable(struct rcu_segcblist *rsclp); +void rcu_segcblist_offload(struct rcu_segcblist *rsclp); bool rcu_segcblist_ready_cbs(struct rcu_segcblist *rsclp); bool rcu_segcblist_pend_cbs(struct rcu_segcblist *rsclp); struct rcu_head *rcu_segcblist_first_cb(struct rcu_segcblist *rsclp); struct rcu_head *rcu_segcblist_first_pend_cb(struct rcu_segcblist *rsclp); +bool rcu_segcblist_nextgp(struct rcu_segcblist *rsclp, unsigned long *lp); void rcu_segcblist_enqueue(struct rcu_segcblist *rsclp, struct rcu_head *rhp, bool lazy); bool rcu_segcblist_entrain(struct rcu_segcblist *rsclp, diff --git a/kernel/rcu/rcuperf.c b/kernel/rcu/rcuperf.c index 7a6890b23c5f..5a879d073c1c 100644 --- a/kernel/rcu/rcuperf.c +++ b/kernel/rcu/rcuperf.c @@ -89,7 +89,7 @@ torture_param(int, writer_holdoff, 0, "Holdoff (us) between GPs, zero to disable static char *perf_type = "rcu"; module_param(perf_type, charp, 0444); -MODULE_PARM_DESC(perf_type, "Type of RCU to performance-test (rcu, rcu_bh, ...)"); +MODULE_PARM_DESC(perf_type, "Type of RCU to performance-test (rcu, srcu, ...)"); static int nrealreaders; static int nrealwriters; @@ -375,6 +375,14 @@ rcu_perf_writer(void *arg) if (holdoff) schedule_timeout_uninterruptible(holdoff * HZ); + /* + * Wait until rcu_end_inkernel_boot() is called for normal GP tests + * so that RCU is not always expedited for normal GP tests. + * The system_state test is approximate, but works well in practice. + */ + while (!gp_exp && system_state != SYSTEM_RUNNING) + schedule_timeout_uninterruptible(1); + t = ktime_get_mono_fast_ns(); if (atomic_inc_return(&n_rcu_perf_writer_started) >= nrealwriters) { t_rcu_perf_writer_started = t; diff --git a/kernel/rcu/rcutorture.c b/kernel/rcu/rcutorture.c index fce4e7e6f502..3c9feca1eab1 100644 --- a/kernel/rcu/rcutorture.c +++ b/kernel/rcu/rcutorture.c @@ -161,6 +161,7 @@ static atomic_long_t n_rcu_torture_timers; static long n_barrier_attempts; static long n_barrier_successes; /* did rcu_barrier test succeed? */ static struct list_head rcu_torture_removed; +static unsigned long shutdown_jiffies; static int rcu_torture_writer_state; #define RTWS_FIXED_DELAY 0 @@ -228,6 +229,15 @@ static u64 notrace rcu_trace_clock_local(void) } #endif /* #else #ifdef CONFIG_RCU_TRACE */ +/* + * Stop aggressive CPU-hog tests a bit before the end of the test in order + * to avoid interfering with test shutdown. + */ +static bool shutdown_time_arrived(void) +{ + return shutdown_secs && time_after(jiffies, shutdown_jiffies - 30 * HZ); +} + static unsigned long boost_starttime; /* jiffies of next boost test start. */ static DEFINE_MUTEX(boost_mutex); /* protect setting boost_starttime */ /* and boost task create/destroy. */ @@ -1713,12 +1723,14 @@ static void rcu_torture_fwd_cb_cr(struct rcu_head *rhp) } // Give the scheduler a chance, even on nohz_full CPUs. -static void rcu_torture_fwd_prog_cond_resched(void) +static void rcu_torture_fwd_prog_cond_resched(unsigned long iter) { if (IS_ENABLED(CONFIG_PREEMPT) && IS_ENABLED(CONFIG_NO_HZ_FULL)) { - if (need_resched()) + // Real call_rcu() floods hit userspace, so emulate that. + if (need_resched() || (iter & 0xfff)) schedule(); } else { + // No userspace emulation: CB invocation throttles call_rcu() cond_resched(); } } @@ -1746,7 +1758,7 @@ static unsigned long rcu_torture_fwd_prog_cbfree(void) spin_unlock_irqrestore(&rcu_fwd_lock, flags); kfree(rfcp); freed++; - rcu_torture_fwd_prog_cond_resched(); + rcu_torture_fwd_prog_cond_resched(freed); } return freed; } @@ -1785,15 +1797,17 @@ static void rcu_torture_fwd_prog_nr(int *tested, int *tested_tries) WRITE_ONCE(rcu_fwd_startat, jiffies); stopat = rcu_fwd_startat + dur; while (time_before(jiffies, stopat) && + !shutdown_time_arrived() && !READ_ONCE(rcu_fwd_emergency_stop) && !torture_must_stop()) { idx = cur_ops->readlock(); udelay(10); cur_ops->readunlock(idx); if (!fwd_progress_need_resched || need_resched()) - rcu_torture_fwd_prog_cond_resched(); + rcu_torture_fwd_prog_cond_resched(1); } (*tested_tries)++; if (!time_before(jiffies, stopat) && + !shutdown_time_arrived() && !READ_ONCE(rcu_fwd_emergency_stop) && !torture_must_stop()) { (*tested)++; cver = READ_ONCE(rcu_torture_current_version) - cver; @@ -1852,6 +1866,7 @@ static void rcu_torture_fwd_prog_cr(void) gps = cur_ops->get_gp_seq(); rcu_launder_gp_seq_start = gps; while (time_before(jiffies, stopat) && + !shutdown_time_arrived() && !READ_ONCE(rcu_fwd_emergency_stop) && !torture_must_stop()) { rfcp = READ_ONCE(rcu_fwd_cb_head); rfcpn = NULL; @@ -1875,7 +1890,7 @@ static void rcu_torture_fwd_prog_cr(void) rfcp->rfc_gps = 0; } cur_ops->call(&rfcp->rh, rcu_torture_fwd_cb_cr); - rcu_torture_fwd_prog_cond_resched(); + rcu_torture_fwd_prog_cond_resched(n_launders + n_max_cbs); } stoppedat = jiffies; n_launders_cb_snap = READ_ONCE(n_launders_cb); @@ -1884,7 +1899,8 @@ static void rcu_torture_fwd_prog_cr(void) cur_ops->cb_barrier(); /* Wait for callbacks to be invoked. */ (void)rcu_torture_fwd_prog_cbfree(); - if (!torture_must_stop() && !READ_ONCE(rcu_fwd_emergency_stop)) { + if (!torture_must_stop() && !READ_ONCE(rcu_fwd_emergency_stop) && + !shutdown_time_arrived()) { WARN_ON(n_max_gps < MIN_FWD_CBS_LAUNDERED); pr_alert("%s Duration %lu barrier: %lu pending %ld n_launders: %ld n_launders_sa: %ld n_max_gps: %ld n_max_cbs: %ld cver %ld gps %ld\n", __func__, @@ -2160,6 +2176,7 @@ rcu_torture_cleanup(void) return; } + show_rcu_gp_kthreads(); rcu_torture_barrier_cleanup(); torture_stop_kthread(rcu_torture_fwd_prog, fwd_prog_task); torture_stop_kthread(rcu_torture_stall, stall_task); @@ -2465,6 +2482,7 @@ rcu_torture_init(void) goto unwind; rcutor_hp = firsterr; } + shutdown_jiffies = jiffies + shutdown_secs * HZ; firsterr = torture_shutdown_init(shutdown_secs, rcu_torture_cleanup); if (firsterr) goto unwind; diff --git a/kernel/rcu/srcutree.c b/kernel/rcu/srcutree.c index cf0e886314f2..5dffade2d7cd 100644 --- a/kernel/rcu/srcutree.c +++ b/kernel/rcu/srcutree.c @@ -1279,8 +1279,9 @@ void srcu_torture_stats_print(struct srcu_struct *ssp, char *tt, char *tf) c0 = l0 - u0; c1 = l1 - u1; - pr_cont(" %d(%ld,%ld %1p)", - cpu, c0, c1, rcu_segcblist_head(&sdp->srcu_cblist)); + pr_cont(" %d(%ld,%ld %c)", + cpu, c0, c1, + "C."[rcu_segcblist_empty(&sdp->srcu_cblist)]); s0 += c0; s1 += c1; } diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c index a14e5fbbea46..81105141b6a8 100644 --- a/kernel/rcu/tree.c +++ b/kernel/rcu/tree.c @@ -56,6 +56,7 @@ #include <linux/smpboot.h> #include <linux/jiffies.h> #include <linux/sched/isolation.h> +#include <linux/sched/clock.h> #include "../time/tick-internal.h" #include "tree.h" @@ -210,9 +211,9 @@ static long rcu_get_n_cbs_cpu(int cpu) { struct rcu_data *rdp = per_cpu_ptr(&rcu_data, cpu); - if (rcu_segcblist_is_enabled(&rdp->cblist)) /* Online normal CPU? */ + if (rcu_segcblist_is_enabled(&rdp->cblist)) return rcu_segcblist_n_cbs(&rdp->cblist); - return rcu_get_n_cbs_nocb_cpu(rdp); /* Works for offline, too. */ + return 0; } void rcu_softirq_qs(void) @@ -416,6 +417,12 @@ module_param(qlowmark, long, 0444); static ulong jiffies_till_first_fqs = ULONG_MAX; static ulong jiffies_till_next_fqs = ULONG_MAX; static bool rcu_kick_kthreads; +static int rcu_divisor = 7; +module_param(rcu_divisor, int, 0644); + +/* Force an exit from rcu_do_batch() after 3 milliseconds. */ +static long rcu_resched_ns = 3 * NSEC_PER_MSEC; +module_param(rcu_resched_ns, long, 0644); /* * How long the grace period must be before we start recruiting @@ -1251,6 +1258,7 @@ static bool rcu_accelerate_cbs(struct rcu_node *rnp, struct rcu_data *rdp) unsigned long gp_seq_req; bool ret = false; + rcu_lockdep_assert_cblist_protected(rdp); raw_lockdep_assert_held_rcu_node(rnp); /* If no pending (not yet ready to invoke) callbacks, nothing to do. */ @@ -1292,7 +1300,7 @@ static void rcu_accelerate_cbs_unlocked(struct rcu_node *rnp, unsigned long c; bool needwake; - lockdep_assert_irqs_disabled(); + rcu_lockdep_assert_cblist_protected(rdp); c = rcu_seq_snap(&rcu_state.gp_seq); if (!rdp->gpwrap && ULONG_CMP_GE(rdp->gp_seq_needed, c)) { /* Old request still live, so mark recent callbacks. */ @@ -1318,6 +1326,7 @@ static void rcu_accelerate_cbs_unlocked(struct rcu_node *rnp, */ static bool rcu_advance_cbs(struct rcu_node *rnp, struct rcu_data *rdp) { + rcu_lockdep_assert_cblist_protected(rdp); raw_lockdep_assert_held_rcu_node(rnp); /* If no pending (not yet ready to invoke) callbacks, nothing to do. */ @@ -1335,6 +1344,21 @@ static bool rcu_advance_cbs(struct rcu_node *rnp, struct rcu_data *rdp) } /* + * Move and classify callbacks, but only if doing so won't require + * that the RCU grace-period kthread be awakened. + */ +static void __maybe_unused rcu_advance_cbs_nowake(struct rcu_node *rnp, + struct rcu_data *rdp) +{ + rcu_lockdep_assert_cblist_protected(rdp); + if (!rcu_seq_state(rcu_seq_current(&rnp->gp_seq)) || + !raw_spin_trylock_rcu_node(rnp)) + return; + WARN_ON_ONCE(rcu_advance_cbs(rnp, rdp)); + raw_spin_unlock_rcu_node(rnp); +} + +/* * Update CPU-local rcu_data state to record the beginnings and ends of * grace periods. The caller must hold the ->lock of the leaf rcu_node * structure corresponding to the current CPU, and must have irqs disabled. @@ -1342,8 +1366,10 @@ static bool rcu_advance_cbs(struct rcu_node *rnp, struct rcu_data *rdp) */ static bool __note_gp_changes(struct rcu_node *rnp, struct rcu_data *rdp) { - bool ret; + bool ret = false; bool need_gp; + const bool offloaded = IS_ENABLED(CONFIG_RCU_NOCB_CPU) && + rcu_segcblist_is_offloaded(&rdp->cblist); raw_lockdep_assert_held_rcu_node(rnp); @@ -1353,10 +1379,12 @@ static bool __note_gp_changes(struct rcu_node *rnp, struct rcu_data *rdp) /* Handle the ends of any preceding grace periods first. */ if (rcu_seq_completed_gp(rdp->gp_seq, rnp->gp_seq) || unlikely(READ_ONCE(rdp->gpwrap))) { - ret = rcu_advance_cbs(rnp, rdp); /* Advance callbacks. */ + if (!offloaded) + ret = rcu_advance_cbs(rnp, rdp); /* Advance CBs. */ trace_rcu_grace_period(rcu_state.name, rdp->gp_seq, TPS("cpuend")); } else { - ret = rcu_accelerate_cbs(rnp, rdp); /* Recent callbacks. */ + if (!offloaded) + ret = rcu_accelerate_cbs(rnp, rdp); /* Recent CBs. */ } /* Now handle the beginnings of any new-to-this-CPU grace periods. */ @@ -1657,6 +1685,7 @@ static void rcu_gp_cleanup(void) unsigned long gp_duration; bool needgp = false; unsigned long new_gp_seq; + bool offloaded; struct rcu_data *rdp; struct rcu_node *rnp = rcu_get_root(); struct swait_queue_head *sq; @@ -1722,7 +1751,9 @@ static void rcu_gp_cleanup(void) needgp = true; } /* Advance CBs to reduce false positives below. */ - if (!rcu_accelerate_cbs(rnp, rdp) && needgp) { + offloaded = IS_ENABLED(CONFIG_RCU_NOCB_CPU) && + rcu_segcblist_is_offloaded(&rdp->cblist); + if ((offloaded || !rcu_accelerate_cbs(rnp, rdp)) && needgp) { WRITE_ONCE(rcu_state.gp_flags, RCU_GP_FLAG_INIT); rcu_state.gp_req_activity = jiffies; trace_rcu_grace_period(rcu_state.name, @@ -1881,7 +1912,7 @@ rcu_report_unblock_qs_rnp(struct rcu_node *rnp, unsigned long flags) struct rcu_node *rnp_p; raw_lockdep_assert_held_rcu_node(rnp); - if (WARN_ON_ONCE(!IS_ENABLED(CONFIG_PREEMPT)) || + if (WARN_ON_ONCE(!IS_ENABLED(CONFIG_PREEMPTION)) || WARN_ON_ONCE(rcu_preempt_blocked_readers_cgp(rnp)) || rnp->qsmask != 0) { raw_spin_unlock_irqrestore_rcu_node(rnp, flags); @@ -1916,7 +1947,9 @@ rcu_report_qs_rdp(int cpu, struct rcu_data *rdp) { unsigned long flags; unsigned long mask; - bool needwake; + bool needwake = false; + const bool offloaded = IS_ENABLED(CONFIG_RCU_NOCB_CPU) && + rcu_segcblist_is_offloaded(&rdp->cblist); struct rcu_node *rnp; rnp = rdp->mynode; @@ -1943,7 +1976,8 @@ rcu_report_qs_rdp(int cpu, struct rcu_data *rdp) * This GP can't end until cpu checks in, so all of our * callbacks can be processed during the next GP. */ - needwake = rcu_accelerate_cbs(rnp, rdp); + if (!offloaded) + needwake = rcu_accelerate_cbs(rnp, rdp); rcu_report_qs_rnp(mask, rnp, rnp->gp_seq, flags); /* ^^^ Released rnp->lock */ @@ -2077,9 +2111,12 @@ int rcutree_dead_cpu(unsigned int cpu) static void rcu_do_batch(struct rcu_data *rdp) { unsigned long flags; + const bool offloaded = IS_ENABLED(CONFIG_RCU_NOCB_CPU) && + rcu_segcblist_is_offloaded(&rdp->cblist); struct rcu_head *rhp; struct rcu_cblist rcl = RCU_CBLIST_INITIALIZER(rcl); long bl, count; + long pending, tlimit = 0; /* If no callbacks are ready, just return. */ if (!rcu_segcblist_ready_cbs(&rdp->cblist)) { @@ -2099,13 +2136,19 @@ static void rcu_do_batch(struct rcu_data *rdp) * callback counts, as rcu_barrier() needs to be conservative. */ local_irq_save(flags); + rcu_nocb_lock(rdp); WARN_ON_ONCE(cpu_is_offline(smp_processor_id())); - bl = rdp->blimit; + pending = rcu_segcblist_n_cbs(&rdp->cblist); + bl = max(rdp->blimit, pending >> rcu_divisor); + if (unlikely(bl > 100)) + tlimit = local_clock() + rcu_resched_ns; trace_rcu_batch_start(rcu_state.name, rcu_segcblist_n_lazy_cbs(&rdp->cblist), rcu_segcblist_n_cbs(&rdp->cblist), bl); rcu_segcblist_extract_done_cbs(&rdp->cblist, &rcl); - local_irq_restore(flags); + if (offloaded) + rdp->qlen_last_fqs_check = rcu_segcblist_n_cbs(&rdp->cblist); + rcu_nocb_unlock_irqrestore(rdp, flags); /* Invoke callbacks. */ rhp = rcu_cblist_dequeue(&rcl); @@ -2117,13 +2160,29 @@ static void rcu_do_batch(struct rcu_data *rdp) * Stop only if limit reached and CPU has something to do. * Note: The rcl structure counts down from zero. */ - if (-rcl.len >= bl && + if (-rcl.len >= bl && !offloaded && (need_resched() || (!is_idle_task(current) && !rcu_is_callbacks_kthread()))) break; + if (unlikely(tlimit)) { + /* only call local_clock() every 32 callbacks */ + if (likely((-rcl.len & 31) || local_clock() < tlimit)) + continue; + /* Exceeded the time limit, so leave. */ + break; + } + if (offloaded) { + WARN_ON_ONCE(in_serving_softirq()); + local_bh_enable(); + lockdep_assert_irqs_enabled(); + cond_resched_tasks_rcu_qs(); + lockdep_assert_irqs_enabled(); + local_bh_disable(); + } } local_irq_save(flags); + rcu_nocb_lock(rdp); count = -rcl.len; trace_rcu_batch_end(rcu_state.name, count, !!rcl.head, need_resched(), is_idle_task(current), rcu_is_callbacks_kthread()); @@ -2149,12 +2208,14 @@ static void rcu_do_batch(struct rcu_data *rdp) * The following usually indicates a double call_rcu(). To track * this down, try building with CONFIG_DEBUG_OBJECTS_RCU_HEAD=y. */ - WARN_ON_ONCE(rcu_segcblist_empty(&rdp->cblist) != (count == 0)); + WARN_ON_ONCE(count == 0 && !rcu_segcblist_empty(&rdp->cblist)); + WARN_ON_ONCE(!IS_ENABLED(CONFIG_RCU_NOCB_CPU) && + count != 0 && rcu_segcblist_empty(&rdp->cblist)); - local_irq_restore(flags); + rcu_nocb_unlock_irqrestore(rdp, flags); /* Re-invoke RCU core processing if there are callbacks remaining. */ - if (rcu_segcblist_ready_cbs(&rdp->cblist)) + if (!offloaded && rcu_segcblist_ready_cbs(&rdp->cblist)) invoke_rcu_core(); } @@ -2205,7 +2266,7 @@ static void force_qs_rnp(int (*f)(struct rcu_data *rdp)) mask = 0; raw_spin_lock_irqsave_rcu_node(rnp, flags); if (rnp->qsmask == 0) { - if (!IS_ENABLED(CONFIG_PREEMPT) || + if (!IS_ENABLED(CONFIG_PREEMPTION) || rcu_preempt_blocked_readers_cgp(rnp)) { /* * No point in scanning bits because they @@ -2280,6 +2341,8 @@ static __latent_entropy void rcu_core(void) unsigned long flags; struct rcu_data *rdp = raw_cpu_ptr(&rcu_data); struct rcu_node *rnp = rdp->mynode; + const bool offloaded = IS_ENABLED(CONFIG_RCU_NOCB_CPU) && + rcu_segcblist_is_offloaded(&rdp->cblist); if (cpu_is_offline(smp_processor_id())) return; @@ -2299,7 +2362,7 @@ static __latent_entropy void rcu_core(void) /* No grace period and unregistered callbacks? */ if (!rcu_gp_in_progress() && - rcu_segcblist_is_enabled(&rdp->cblist)) { + rcu_segcblist_is_enabled(&rdp->cblist) && !offloaded) { local_irq_save(flags); if (!rcu_segcblist_restempty(&rdp->cblist, RCU_NEXT_READY_TAIL)) rcu_accelerate_cbs_unlocked(rnp, rdp); @@ -2309,7 +2372,7 @@ static __latent_entropy void rcu_core(void) rcu_check_gp_start_stall(rnp, rdp, rcu_jiffies_till_stall_check()); /* If there are callbacks ready, invoke them. */ - if (rcu_segcblist_ready_cbs(&rdp->cblist) && + if (!offloaded && rcu_segcblist_ready_cbs(&rdp->cblist) && likely(READ_ONCE(rcu_scheduler_fully_active))) rcu_do_batch(rdp); @@ -2489,10 +2552,11 @@ static void rcu_leak_callback(struct rcu_head *rhp) * is expected to specify a CPU. */ static void -__call_rcu(struct rcu_head *head, rcu_callback_t func, int cpu, bool lazy) +__call_rcu(struct rcu_head *head, rcu_callback_t func, bool lazy) { unsigned long flags; struct rcu_data *rdp; + bool was_alldone; /* Misaligned rcu_head! */ WARN_ON_ONCE((unsigned long)head & (sizeof(void *) - 1)); @@ -2514,28 +2578,18 @@ __call_rcu(struct rcu_head *head, rcu_callback_t func, int cpu, bool lazy) rdp = this_cpu_ptr(&rcu_data); /* Add the callback to our list. */ - if (unlikely(!rcu_segcblist_is_enabled(&rdp->cblist)) || cpu != -1) { - int offline; - - if (cpu != -1) - rdp = per_cpu_ptr(&rcu_data, cpu); - if (likely(rdp->mynode)) { - /* Post-boot, so this should be for a no-CBs CPU. */ - offline = !__call_rcu_nocb(rdp, head, lazy, flags); - WARN_ON_ONCE(offline); - /* Offline CPU, _call_rcu() illegal, leak callback. */ - local_irq_restore(flags); - return; - } - /* - * Very early boot, before rcu_init(). Initialize if needed - * and then drop through to queue the callback. - */ - WARN_ON_ONCE(cpu != -1); + if (unlikely(!rcu_segcblist_is_enabled(&rdp->cblist))) { + // This can trigger due to call_rcu() from offline CPU: + WARN_ON_ONCE(rcu_scheduler_active != RCU_SCHEDULER_INACTIVE); WARN_ON_ONCE(!rcu_is_watching()); + // Very early boot, before rcu_init(). Initialize if needed + // and then drop through to queue the callback. if (rcu_segcblist_empty(&rdp->cblist)) rcu_segcblist_init(&rdp->cblist); } + if (rcu_nocb_try_bypass(rdp, head, &was_alldone, flags)) + return; // Enqueued onto ->nocb_bypass, so just leave. + /* If we get here, rcu_nocb_try_bypass() acquired ->nocb_lock. */ rcu_segcblist_enqueue(&rdp->cblist, head, lazy); if (__is_kfree_rcu_offset((unsigned long)func)) trace_rcu_kfree_callback(rcu_state.name, head, @@ -2548,8 +2602,13 @@ __call_rcu(struct rcu_head *head, rcu_callback_t func, int cpu, bool lazy) rcu_segcblist_n_cbs(&rdp->cblist)); /* Go handle any RCU core processing required. */ - __call_rcu_core(rdp, head, flags); - local_irq_restore(flags); + if (IS_ENABLED(CONFIG_RCU_NOCB_CPU) && + unlikely(rcu_segcblist_is_offloaded(&rdp->cblist))) { + __call_rcu_nocb_wake(rdp, was_alldone, flags); /* unlocks */ + } else { + __call_rcu_core(rdp, head, flags); + local_irq_restore(flags); + } } /** @@ -2589,7 +2648,7 @@ __call_rcu(struct rcu_head *head, rcu_callback_t func, int cpu, bool lazy) */ void call_rcu(struct rcu_head *head, rcu_callback_t func) { - __call_rcu(head, func, -1, 0); + __call_rcu(head, func, 0); } EXPORT_SYMBOL_GPL(call_rcu); @@ -2602,7 +2661,7 @@ EXPORT_SYMBOL_GPL(call_rcu); */ void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func) { - __call_rcu(head, func, -1, 1); + __call_rcu(head, func, 1); } EXPORT_SYMBOL_GPL(kfree_call_rcu); @@ -2622,7 +2681,7 @@ static int rcu_blocking_is_gp(void) { int ret; - if (IS_ENABLED(CONFIG_PREEMPT)) + if (IS_ENABLED(CONFIG_PREEMPTION)) return rcu_scheduler_active == RCU_SCHEDULER_INACTIVE; might_sleep(); /* Check for RCU read-side critical section. */ preempt_disable(); @@ -2735,6 +2794,10 @@ static int rcu_pending(void) /* Check for CPU stalls, if enabled. */ check_cpu_stall(rdp); + /* Does this CPU need a deferred NOCB wakeup? */ + if (rcu_nocb_need_deferred_wakeup(rdp)) + return 1; + /* Is this CPU a NO_HZ_FULL CPU that should ignore RCU? */ if (rcu_nohz_full_cpu()) return 0; @@ -2750,6 +2813,8 @@ static int rcu_pending(void) /* Has RCU gone idle with this CPU needing another grace period? */ if (!rcu_gp_in_progress() && rcu_segcblist_is_enabled(&rdp->cblist) && + (!IS_ENABLED(CONFIG_RCU_NOCB_CPU) || + !rcu_segcblist_is_offloaded(&rdp->cblist)) && !rcu_segcblist_restempty(&rdp->cblist, RCU_NEXT_READY_TAIL)) return 1; @@ -2758,10 +2823,6 @@ static int rcu_pending(void) unlikely(READ_ONCE(rdp->gpwrap))) /* outside lock */ return 1; - /* Does this CPU need a deferred NOCB wakeup? */ - if (rcu_nocb_need_deferred_wakeup(rdp)) - return 1; - /* nothing to do */ return 0; } @@ -2801,6 +2862,8 @@ static void rcu_barrier_func(void *unused) rcu_barrier_trace(TPS("IRQ"), -1, rcu_state.barrier_sequence); rdp->barrier_head.func = rcu_barrier_callback; debug_rcu_head_queue(&rdp->barrier_head); + rcu_nocb_lock(rdp); + WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, jiffies)); if (rcu_segcblist_entrain(&rdp->cblist, &rdp->barrier_head, 0)) { atomic_inc(&rcu_state.barrier_cpu_count); } else { @@ -2808,6 +2871,7 @@ static void rcu_barrier_func(void *unused) rcu_barrier_trace(TPS("IRQNQ"), -1, rcu_state.barrier_sequence); } + rcu_nocb_unlock(rdp); } /** @@ -2858,22 +2922,11 @@ void rcu_barrier(void) * corresponding CPU's preceding callbacks have been invoked. */ for_each_possible_cpu(cpu) { - if (!cpu_online(cpu) && !rcu_is_nocb_cpu(cpu)) - continue; rdp = per_cpu_ptr(&rcu_data, cpu); - if (rcu_is_nocb_cpu(cpu)) { - if (!rcu_nocb_cpu_needs_barrier(cpu)) { - rcu_barrier_trace(TPS("OfflineNoCB"), cpu, - rcu_state.barrier_sequence); - } else { - rcu_barrier_trace(TPS("OnlineNoCB"), cpu, - rcu_state.barrier_sequence); - smp_mb__before_atomic(); - atomic_inc(&rcu_state.barrier_cpu_count); - __call_rcu(&rdp->barrier_head, - rcu_barrier_callback, cpu, 0); - } - } else if (rcu_segcblist_n_cbs(&rdp->cblist)) { + if (!cpu_online(cpu) && + !rcu_segcblist_is_offloaded(&rdp->cblist)) + continue; + if (rcu_segcblist_n_cbs(&rdp->cblist)) { rcu_barrier_trace(TPS("OnlineQ"), cpu, rcu_state.barrier_sequence); smp_call_function_single(cpu, rcu_barrier_func, NULL, 1); @@ -2958,7 +3011,8 @@ rcu_boot_init_percpu_data(int cpu) * Initializes a CPU's per-CPU RCU data. Note that only one online or * offline event can be happening at a given time. Note also that we can * accept some slop in the rsp->gp_seq access due to the fact that this - * CPU cannot possibly have any RCU callbacks in flight yet. + * CPU cannot possibly have any non-offloaded RCU callbacks in flight yet. + * And any offloaded callbacks are being numbered elsewhere. */ int rcutree_prepare_cpu(unsigned int cpu) { @@ -2972,7 +3026,7 @@ int rcutree_prepare_cpu(unsigned int cpu) rdp->n_force_qs_snap = rcu_state.n_force_qs; rdp->blimit = blimit; if (rcu_segcblist_empty(&rdp->cblist) && /* No early-boot CBs? */ - !init_nocb_callback_list(rdp)) + !rcu_segcblist_is_offloaded(&rdp->cblist)) rcu_segcblist_init(&rdp->cblist); /* Re-enable callbacks. */ rdp->dynticks_nesting = 1; /* CPU not up, no tearing. */ rcu_dynticks_eqs_online(); @@ -3151,29 +3205,38 @@ void rcutree_migrate_callbacks(int cpu) { unsigned long flags; struct rcu_data *my_rdp; + struct rcu_node *my_rnp; struct rcu_data *rdp = per_cpu_ptr(&rcu_data, cpu); - struct rcu_node *rnp_root = rcu_get_root(); bool needwake; - if (rcu_is_nocb_cpu(cpu) || rcu_segcblist_empty(&rdp->cblist)) + if (rcu_segcblist_is_offloaded(&rdp->cblist) || + rcu_segcblist_empty(&rdp->cblist)) return; /* No callbacks to migrate. */ local_irq_save(flags); my_rdp = this_cpu_ptr(&rcu_data); - if (rcu_nocb_adopt_orphan_cbs(my_rdp, rdp, flags)) { - local_irq_restore(flags); - return; - } - raw_spin_lock_rcu_node(rnp_root); /* irqs already disabled. */ + my_rnp = my_rdp->mynode; + rcu_nocb_lock(my_rdp); /* irqs already disabled. */ + WARN_ON_ONCE(!rcu_nocb_flush_bypass(my_rdp, NULL, jiffies)); + raw_spin_lock_rcu_node(my_rnp); /* irqs already disabled. */ /* Leverage recent GPs and set GP for new callbacks. */ - needwake = rcu_advance_cbs(rnp_root, rdp) || - rcu_advance_cbs(rnp_root, my_rdp); + needwake = rcu_advance_cbs(my_rnp, rdp) || + rcu_advance_cbs(my_rnp, my_rdp); rcu_segcblist_merge(&my_rdp->cblist, &rdp->cblist); + needwake = needwake || rcu_advance_cbs(my_rnp, my_rdp); + rcu_segcblist_disable(&rdp->cblist); WARN_ON_ONCE(rcu_segcblist_empty(&my_rdp->cblist) != !rcu_segcblist_n_cbs(&my_rdp->cblist)); - raw_spin_unlock_irqrestore_rcu_node(rnp_root, flags); + if (rcu_segcblist_is_offloaded(&my_rdp->cblist)) { + raw_spin_unlock_rcu_node(my_rnp); /* irqs remain disabled. */ + __call_rcu_nocb_wake(my_rdp, true, flags); + } else { + rcu_nocb_unlock(my_rdp); /* irqs remain disabled. */ + raw_spin_unlock_irqrestore_rcu_node(my_rnp, flags); + } if (needwake) rcu_gp_kthread_wake(); + lockdep_assert_irqs_enabled(); WARN_ONCE(rcu_segcblist_n_cbs(&rdp->cblist) != 0 || !rcu_segcblist_empty(&rdp->cblist), "rcu_cleanup_dead_cpu: Callbacks on offline CPU %d: qlen=%lu, 1stCB=%p\n", @@ -3234,13 +3297,13 @@ static int __init rcu_spawn_gp_kthread(void) t = kthread_create(rcu_gp_kthread, NULL, "%s", rcu_state.name); if (WARN_ONCE(IS_ERR(t), "%s: Could not start grace-period kthread, OOM is now expected behavior\n", __func__)) return 0; - rnp = rcu_get_root(); - raw_spin_lock_irqsave_rcu_node(rnp, flags); - rcu_state.gp_kthread = t; if (kthread_prio) { sp.sched_priority = kthread_prio; sched_setscheduler_nocheck(t, SCHED_FIFO, &sp); } + rnp = rcu_get_root(); + raw_spin_lock_irqsave_rcu_node(rnp, flags); + rcu_state.gp_kthread = t; raw_spin_unlock_irqrestore_rcu_node(rnp, flags); wake_up_process(t); rcu_spawn_nocb_kthreads(); diff --git a/kernel/rcu/tree.h b/kernel/rcu/tree.h index 7acaf3a62d39..c612f306fe89 100644 --- a/kernel/rcu/tree.h +++ b/kernel/rcu/tree.h @@ -194,29 +194,38 @@ struct rcu_data { /* 5) Callback offloading. */ #ifdef CONFIG_RCU_NOCB_CPU - struct rcu_head *nocb_head; /* CBs waiting for kthread. */ - struct rcu_head **nocb_tail; - atomic_long_t nocb_q_count; /* # CBs waiting for nocb */ - atomic_long_t nocb_q_count_lazy; /* invocation (all stages). */ - struct rcu_head *nocb_follower_head; /* CBs ready to invoke. */ - struct rcu_head **nocb_follower_tail; - struct swait_queue_head nocb_wq; /* For nocb kthreads to sleep on. */ - struct task_struct *nocb_kthread; + struct swait_queue_head nocb_cb_wq; /* For nocb kthreads to sleep on. */ + struct task_struct *nocb_gp_kthread; raw_spinlock_t nocb_lock; /* Guard following pair of fields. */ + atomic_t nocb_lock_contended; /* Contention experienced. */ int nocb_defer_wakeup; /* Defer wakeup of nocb_kthread. */ struct timer_list nocb_timer; /* Enforce finite deferral. */ - - /* The following fields are used by the leader, hence own cacheline. */ - struct rcu_head *nocb_gp_head ____cacheline_internodealigned_in_smp; - /* CBs waiting for GP. */ - struct rcu_head **nocb_gp_tail; - bool nocb_leader_sleep; /* Is the nocb leader thread asleep? */ - struct rcu_data *nocb_next_follower; - /* Next follower in wakeup chain. */ - - /* The following fields are used by the follower, hence new cachline. */ - struct rcu_data *nocb_leader ____cacheline_internodealigned_in_smp; - /* Leader CPU takes GP-end wakeups. */ + unsigned long nocb_gp_adv_time; /* Last call_rcu() CB adv (jiffies). */ + + /* The following fields are used by call_rcu, hence own cacheline. */ + raw_spinlock_t nocb_bypass_lock ____cacheline_internodealigned_in_smp; + struct rcu_cblist nocb_bypass; /* Lock-contention-bypass CB list. */ + unsigned long nocb_bypass_first; /* Time (jiffies) of first enqueue. */ + unsigned long nocb_nobypass_last; /* Last ->cblist enqueue (jiffies). */ + int nocb_nobypass_count; /* # ->cblist enqueues at ^^^ time. */ + + /* The following fields are used by GP kthread, hence own cacheline. */ + raw_spinlock_t nocb_gp_lock ____cacheline_internodealigned_in_smp; + struct timer_list nocb_bypass_timer; /* Force nocb_bypass flush. */ + u8 nocb_gp_sleep; /* Is the nocb GP thread asleep? */ + u8 nocb_gp_bypass; /* Found a bypass on last scan? */ + u8 nocb_gp_gp; /* GP to wait for on last scan? */ + unsigned long nocb_gp_seq; /* If so, ->gp_seq to wait for. */ + unsigned long nocb_gp_loops; /* # passes through wait code. */ + struct swait_queue_head nocb_gp_wq; /* For nocb kthreads to sleep on. */ + bool nocb_cb_sleep; /* Is the nocb CB thread asleep? */ + struct task_struct *nocb_cb_kthread; + struct rcu_data *nocb_next_cb_rdp; + /* Next rcu_data in wakeup chain. */ + + /* The following fields are used by CB kthread, hence new cacheline. */ + struct rcu_data *nocb_gp_rdp ____cacheline_internodealigned_in_smp; + /* GP rdp takes GP-end wakeups. */ #endif /* #ifdef CONFIG_RCU_NOCB_CPU */ /* 6) RCU priority boosting. */ @@ -419,25 +428,39 @@ static bool rcu_preempt_has_tasks(struct rcu_node *rnp); static bool rcu_preempt_need_deferred_qs(struct task_struct *t); static void rcu_preempt_deferred_qs(struct task_struct *t); static void zero_cpu_stall_ticks(struct rcu_data *rdp); -static bool rcu_nocb_cpu_needs_barrier(int cpu); static struct swait_queue_head *rcu_nocb_gp_get(struct rcu_node *rnp); static void rcu_nocb_gp_cleanup(struct swait_queue_head *sq); static void rcu_init_one_nocb(struct rcu_node *rnp); -static bool __call_rcu_nocb(struct rcu_data *rdp, struct rcu_head *rhp, - bool lazy, unsigned long flags); -static bool rcu_nocb_adopt_orphan_cbs(struct rcu_data *my_rdp, - struct rcu_data *rdp, - unsigned long flags); +static bool rcu_nocb_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp, + unsigned long j); +static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp, + bool *was_alldone, unsigned long flags); +static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_empty, + unsigned long flags); static int rcu_nocb_need_deferred_wakeup(struct rcu_data *rdp); static void do_nocb_deferred_wakeup(struct rcu_data *rdp); static void rcu_boot_init_nocb_percpu_data(struct rcu_data *rdp); static void rcu_spawn_cpu_nocb_kthread(int cpu); static void __init rcu_spawn_nocb_kthreads(void); +static void show_rcu_nocb_state(struct rcu_data *rdp); +static void rcu_nocb_lock(struct rcu_data *rdp); +static void rcu_nocb_unlock(struct rcu_data *rdp); +static void rcu_nocb_unlock_irqrestore(struct rcu_data *rdp, + unsigned long flags); +static void rcu_lockdep_assert_cblist_protected(struct rcu_data *rdp); #ifdef CONFIG_RCU_NOCB_CPU static void __init rcu_organize_nocb_kthreads(void); -#endif /* #ifdef CONFIG_RCU_NOCB_CPU */ -static bool init_nocb_callback_list(struct rcu_data *rdp); -static unsigned long rcu_get_n_cbs_nocb_cpu(struct rcu_data *rdp); +#define rcu_nocb_lock_irqsave(rdp, flags) \ +do { \ + if (!rcu_segcblist_is_offloaded(&(rdp)->cblist)) \ + local_irq_save(flags); \ + else \ + raw_spin_lock_irqsave(&(rdp)->nocb_lock, (flags)); \ +} while (0) +#else /* #ifdef CONFIG_RCU_NOCB_CPU */ +#define rcu_nocb_lock_irqsave(rdp, flags) local_irq_save(flags) +#endif /* #else #ifdef CONFIG_RCU_NOCB_CPU */ + static void rcu_bind_gp_kthread(void); static bool rcu_nohz_full_cpu(void); static void rcu_dynticks_task_enter(void); diff --git a/kernel/rcu/tree_exp.h b/kernel/rcu/tree_exp.h index af7e7b9c86af..d632cd019597 100644 --- a/kernel/rcu/tree_exp.h +++ b/kernel/rcu/tree_exp.h @@ -781,7 +781,7 @@ static int rcu_print_task_exp_stall(struct rcu_node *rnp) * other hand, if the CPU is not in an RCU read-side critical section, * the IPI handler reports the quiescent state immediately. * - * Although this is a greate improvement over previous expedited + * Although this is a great improvement over previous expedited * implementations, it is still unfriendly to real-time workloads, so is * thus not recommended for any sort of common-case code. In fact, if * you are using synchronize_rcu_expedited() in a loop, please restructure @@ -792,6 +792,7 @@ static int rcu_print_task_exp_stall(struct rcu_node *rnp) */ void synchronize_rcu_expedited(void) { + bool boottime = (rcu_scheduler_active == RCU_SCHEDULER_INIT); struct rcu_exp_work rew; struct rcu_node *rnp; unsigned long s; @@ -817,7 +818,7 @@ void synchronize_rcu_expedited(void) return; /* Someone else did our work for us. */ /* Ensure that load happens before action based on it. */ - if (unlikely(rcu_scheduler_active == RCU_SCHEDULER_INIT)) { + if (unlikely(boottime)) { /* Direct call during scheduler init and early_initcalls(). */ rcu_exp_sel_wait_wake(s); } else { @@ -835,5 +836,8 @@ void synchronize_rcu_expedited(void) /* Let the next expedited grace period start. */ mutex_unlock(&rcu_state.exp_mutex); + + if (likely(!boottime)) + destroy_work_on_stack(&rew.rew_work); } EXPORT_SYMBOL_GPL(synchronize_rcu_expedited); diff --git a/kernel/rcu/tree_plugin.h b/kernel/rcu/tree_plugin.h index acb225023ed1..2defc7fe74c3 100644 --- a/kernel/rcu/tree_plugin.h +++ b/kernel/rcu/tree_plugin.h @@ -288,7 +288,6 @@ void rcu_note_context_switch(bool preempt) struct rcu_data *rdp = this_cpu_ptr(&rcu_data); struct rcu_node *rnp; - barrier(); /* Avoid RCU read-side critical sections leaking down. */ trace_rcu_utilization(TPS("Start context switch")); lockdep_assert_irqs_disabled(); WARN_ON_ONCE(!preempt && t->rcu_read_lock_nesting > 0); @@ -314,15 +313,6 @@ void rcu_note_context_switch(bool preempt) ? rnp->gp_seq : rcu_seq_snap(&rnp->gp_seq)); rcu_preempt_ctxt_queue(rnp, rdp); - } else if (t->rcu_read_lock_nesting < 0 && - t->rcu_read_unlock_special.s) { - - /* - * Complete exit from RCU read-side critical section on - * behalf of preempted instance of __rcu_read_unlock(). - */ - rcu_read_unlock_special(t); - rcu_preempt_deferred_qs(t); } else { rcu_preempt_deferred_qs(t); } @@ -340,7 +330,6 @@ void rcu_note_context_switch(bool preempt) if (rdp->exp_deferred_qs) rcu_report_exp_rdp(rdp); trace_rcu_utilization(TPS("End context switch")); - barrier(); /* Avoid RCU read-side critical sections leaking up. */ } EXPORT_SYMBOL_GPL(rcu_note_context_switch); @@ -626,22 +615,18 @@ static void rcu_read_unlock_special(struct task_struct *t) (rdp->grpmask & rnp->expmask) || tick_nohz_full_cpu(rdp->cpu); // Need to defer quiescent state until everything is enabled. - if ((exp || in_irq()) && irqs_were_disabled && use_softirq && - (in_irq() || !t->rcu_read_unlock_special.b.deferred_qs)) { + if (irqs_were_disabled && use_softirq && + (in_interrupt() || + (exp && !t->rcu_read_unlock_special.b.deferred_qs))) { // Using softirq, safe to awaken, and we get // no help from enabling irqs, unlike bh/preempt. raise_softirq_irqoff(RCU_SOFTIRQ); - } else if (exp && irqs_were_disabled && !use_softirq && - !t->rcu_read_unlock_special.b.deferred_qs) { - // Safe to awaken and we get no help from enabling - // irqs, unlike bh/preempt. - invoke_rcu_core(); } else { // Enabling BH or preempt does reschedule, so... // Also if no expediting or NO_HZ_FULL, slow is OK. set_tsk_need_resched(current); set_preempt_need_resched(); - if (IS_ENABLED(CONFIG_IRQ_WORK) && + if (IS_ENABLED(CONFIG_IRQ_WORK) && irqs_were_disabled && !rdp->defer_qs_iw_pending && exp) { // Get scheduler to re-evaluate and call hooks. // If !IRQ_WORK, FQS scan will eventually IPI. @@ -828,11 +813,6 @@ static void rcu_qs(void) * dyntick-idle quiescent state visible to other CPUs, which will in * some cases serve for expedited as well as normal grace periods. * Either way, register a lightweight quiescent state. - * - * The barrier() calls are redundant in the common case when this is - * called externally, but just in case this is called from within this - * file. - * */ void rcu_all_qs(void) { @@ -847,14 +827,12 @@ void rcu_all_qs(void) return; } this_cpu_write(rcu_data.rcu_urgent_qs, false); - barrier(); /* Avoid RCU read-side critical sections leaking down. */ if (unlikely(raw_cpu_read(rcu_data.rcu_need_heavy_qs))) { local_irq_save(flags); rcu_momentary_dyntick_idle(); local_irq_restore(flags); } rcu_qs(); - barrier(); /* Avoid RCU read-side critical sections leaking up. */ preempt_enable(); } EXPORT_SYMBOL_GPL(rcu_all_qs); @@ -864,7 +842,6 @@ EXPORT_SYMBOL_GPL(rcu_all_qs); */ void rcu_note_context_switch(bool preempt) { - barrier(); /* Avoid RCU read-side critical sections leaking down. */ trace_rcu_utilization(TPS("Start context switch")); rcu_qs(); /* Load rcu_urgent_qs before other flags. */ @@ -877,7 +854,6 @@ void rcu_note_context_switch(bool preempt) rcu_tasks_qs(current); out: trace_rcu_utilization(TPS("End context switch")); - barrier(); /* Avoid RCU read-side critical sections leaking up. */ } EXPORT_SYMBOL_GPL(rcu_note_context_switch); @@ -1134,7 +1110,7 @@ static void rcu_preempt_boost_start_gp(struct rcu_node *rnp) * already exist. We only create this kthread for preemptible RCU. * Returns zero if all is well, a negated errno otherwise. */ -static int rcu_spawn_one_boost_kthread(struct rcu_node *rnp) +static void rcu_spawn_one_boost_kthread(struct rcu_node *rnp) { int rnp_index = rnp - rcu_get_root(); unsigned long flags; @@ -1142,25 +1118,27 @@ static int rcu_spawn_one_boost_kthread(struct rcu_node *rnp) struct task_struct *t; if (!IS_ENABLED(CONFIG_PREEMPT_RCU)) - return 0; + return; if (!rcu_scheduler_fully_active || rcu_rnp_online_cpus(rnp) == 0) - return 0; + return; rcu_state.boost = 1; + if (rnp->boost_kthread_task != NULL) - return 0; + return; + t = kthread_create(rcu_boost_kthread, (void *)rnp, "rcub/%d", rnp_index); - if (IS_ERR(t)) - return PTR_ERR(t); + if (WARN_ON_ONCE(IS_ERR(t))) + return; + raw_spin_lock_irqsave_rcu_node(rnp, flags); rnp->boost_kthread_task = t; raw_spin_unlock_irqrestore_rcu_node(rnp, flags); sp.sched_priority = kthread_prio; sched_setscheduler_nocheck(t, SCHED_FIFO, &sp); wake_up_process(t); /* get to TASK_INTERRUPTIBLE quickly. */ - return 0; } /* @@ -1201,7 +1179,7 @@ static void __init rcu_spawn_boost_kthreads(void) struct rcu_node *rnp; rcu_for_each_leaf_node(rnp) - (void)rcu_spawn_one_boost_kthread(rnp); + rcu_spawn_one_boost_kthread(rnp); } static void rcu_prepare_kthreads(int cpu) @@ -1211,7 +1189,7 @@ static void rcu_prepare_kthreads(int cpu) /* Fire up the incoming CPU's kthread and leaf rcu_node kthread. */ if (rcu_scheduler_fully_active) - (void)rcu_spawn_one_boost_kthread(rnp); + rcu_spawn_one_boost_kthread(rnp); } #else /* #ifdef CONFIG_RCU_BOOST */ @@ -1248,10 +1226,10 @@ static void rcu_prepare_kthreads(int cpu) #if !defined(CONFIG_RCU_FAST_NO_HZ) /* - * Check to see if any future RCU-related work will need to be done - * by the current CPU, even if none need be done immediately, returning - * 1 if so. This function is part of the RCU implementation; it is -not- - * an exported member of the RCU API. + * Check to see if any future non-offloaded RCU-related work will need + * to be done by the current CPU, even if none need be done immediately, + * returning 1 if so. This function is part of the RCU implementation; + * it is -not- an exported member of the RCU API. * * Because we not have RCU_FAST_NO_HZ, just check whether or not this * CPU has RCU callbacks queued. @@ -1259,7 +1237,8 @@ static void rcu_prepare_kthreads(int cpu) int rcu_needs_cpu(u64 basemono, u64 *nextevt) { *nextevt = KTIME_MAX; - return !rcu_segcblist_empty(&this_cpu_ptr(&rcu_data)->cblist); + return !rcu_segcblist_empty(&this_cpu_ptr(&rcu_data)->cblist) && + !rcu_segcblist_is_offloaded(&this_cpu_ptr(&rcu_data)->cblist); } /* @@ -1360,8 +1339,9 @@ int rcu_needs_cpu(u64 basemono, u64 *nextevt) lockdep_assert_irqs_disabled(); - /* If no callbacks, RCU doesn't need the CPU. */ - if (rcu_segcblist_empty(&rdp->cblist)) { + /* If no non-offloaded callbacks, RCU doesn't need the CPU. */ + if (rcu_segcblist_empty(&rdp->cblist) || + rcu_segcblist_is_offloaded(&this_cpu_ptr(&rcu_data)->cblist)) { *nextevt = KTIME_MAX; return 0; } @@ -1404,7 +1384,7 @@ static void rcu_prepare_for_idle(void) int tne; lockdep_assert_irqs_disabled(); - if (rcu_is_nocb_cpu(smp_processor_id())) + if (rcu_segcblist_is_offloaded(&rdp->cblist)) return; /* Handle nohz enablement switches conservatively. */ @@ -1453,8 +1433,10 @@ static void rcu_prepare_for_idle(void) */ static void rcu_cleanup_after_idle(void) { + struct rcu_data *rdp = this_cpu_ptr(&rcu_data); + lockdep_assert_irqs_disabled(); - if (rcu_is_nocb_cpu(smp_processor_id())) + if (rcu_segcblist_is_offloaded(&rdp->cblist)) return; if (rcu_try_advance_all_cbs()) invoke_rcu_core(); @@ -1469,10 +1451,10 @@ static void rcu_cleanup_after_idle(void) * specified by rcu_nocb_mask. For the CPUs in the set, there are kthreads * created that pull the callbacks from the corresponding CPU, wait for * a grace period to elapse, and invoke the callbacks. These kthreads - * are organized into leaders, which manage incoming callbacks, wait for - * grace periods, and awaken followers, and the followers, which only - * invoke callbacks. Each leader is its own follower. The no-CBs CPUs - * do a wake_up() on their kthread when they insert a callback into any + * are organized into GP kthreads, which manage incoming callbacks, wait for + * grace periods, and awaken CB kthreads, and the CB kthreads, which only + * invoke callbacks. Each GP kthread invokes its own CBs. The no-CBs CPUs + * do a wake_up() on their GP kthread when they insert a callback into any * empty list, unless the rcu_nocb_poll boot parameter has been specified, * in which case each kthread actively polls its CPU. (Which isn't so great * for energy efficiency, but which does reduce RCU's overhead on that CPU.) @@ -1515,6 +1497,116 @@ static int __init parse_rcu_nocb_poll(char *arg) early_param("rcu_nocb_poll", parse_rcu_nocb_poll); /* + * Don't bother bypassing ->cblist if the call_rcu() rate is low. + * After all, the main point of bypassing is to avoid lock contention + * on ->nocb_lock, which only can happen at high call_rcu() rates. + */ +int nocb_nobypass_lim_per_jiffy = 16 * 1000 / HZ; +module_param(nocb_nobypass_lim_per_jiffy, int, 0); + +/* + * Acquire the specified rcu_data structure's ->nocb_bypass_lock. If the + * lock isn't immediately available, increment ->nocb_lock_contended to + * flag the contention. + */ +static void rcu_nocb_bypass_lock(struct rcu_data *rdp) +{ + lockdep_assert_irqs_disabled(); + if (raw_spin_trylock(&rdp->nocb_bypass_lock)) + return; + atomic_inc(&rdp->nocb_lock_contended); + WARN_ON_ONCE(smp_processor_id() != rdp->cpu); + smp_mb__after_atomic(); /* atomic_inc() before lock. */ + raw_spin_lock(&rdp->nocb_bypass_lock); + smp_mb__before_atomic(); /* atomic_dec() after lock. */ + atomic_dec(&rdp->nocb_lock_contended); +} + +/* + * Spinwait until the specified rcu_data structure's ->nocb_lock is + * not contended. Please note that this is extremely special-purpose, + * relying on the fact that at most two kthreads and one CPU contend for + * this lock, and also that the two kthreads are guaranteed to have frequent + * grace-period-duration time intervals between successive acquisitions + * of the lock. This allows us to use an extremely simple throttling + * mechanism, and further to apply it only to the CPU doing floods of + * call_rcu() invocations. Don't try this at home! + */ +static void rcu_nocb_wait_contended(struct rcu_data *rdp) +{ + WARN_ON_ONCE(smp_processor_id() != rdp->cpu); + while (WARN_ON_ONCE(atomic_read(&rdp->nocb_lock_contended))) + cpu_relax(); +} + +/* + * Conditionally acquire the specified rcu_data structure's + * ->nocb_bypass_lock. + */ +static bool rcu_nocb_bypass_trylock(struct rcu_data *rdp) +{ + lockdep_assert_irqs_disabled(); + return raw_spin_trylock(&rdp->nocb_bypass_lock); +} + +/* + * Release the specified rcu_data structure's ->nocb_bypass_lock. + */ +static void rcu_nocb_bypass_unlock(struct rcu_data *rdp) +{ + lockdep_assert_irqs_disabled(); + raw_spin_unlock(&rdp->nocb_bypass_lock); +} + +/* + * Acquire the specified rcu_data structure's ->nocb_lock, but only + * if it corresponds to a no-CBs CPU. + */ +static void rcu_nocb_lock(struct rcu_data *rdp) +{ + lockdep_assert_irqs_disabled(); + if (!rcu_segcblist_is_offloaded(&rdp->cblist)) + return; + raw_spin_lock(&rdp->nocb_lock); +} + +/* + * Release the specified rcu_data structure's ->nocb_lock, but only + * if it corresponds to a no-CBs CPU. + */ +static void rcu_nocb_unlock(struct rcu_data *rdp) +{ + if (rcu_segcblist_is_offloaded(&rdp->cblist)) { + lockdep_assert_irqs_disabled(); + raw_spin_unlock(&rdp->nocb_lock); + } +} + +/* + * Release the specified rcu_data structure's ->nocb_lock and restore + * interrupts, but only if it corresponds to a no-CBs CPU. + */ +static void rcu_nocb_unlock_irqrestore(struct rcu_data *rdp, + unsigned long flags) +{ + if (rcu_segcblist_is_offloaded(&rdp->cblist)) { + lockdep_assert_irqs_disabled(); + raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags); + } else { + local_irq_restore(flags); + } +} + +/* Lockdep check that ->cblist may be safely accessed. */ +static void rcu_lockdep_assert_cblist_protected(struct rcu_data *rdp) +{ + lockdep_assert_irqs_disabled(); + if (rcu_segcblist_is_offloaded(&rdp->cblist) && + cpu_online(rdp->cpu)) + lockdep_assert_held(&rdp->nocb_lock); +} + +/* * Wake up any no-CBs CPUs' kthreads that were waiting on the just-ended * grace period. */ @@ -1543,440 +1635,514 @@ bool rcu_is_nocb_cpu(int cpu) } /* - * Kick the leader kthread for this NOCB group. Caller holds ->nocb_lock + * Kick the GP kthread for this NOCB group. Caller holds ->nocb_lock * and this function releases it. */ -static void __wake_nocb_leader(struct rcu_data *rdp, bool force, - unsigned long flags) +static void wake_nocb_gp(struct rcu_data *rdp, bool force, + unsigned long flags) __releases(rdp->nocb_lock) { - struct rcu_data *rdp_leader = rdp->nocb_leader; + bool needwake = false; + struct rcu_data *rdp_gp = rdp->nocb_gp_rdp; lockdep_assert_held(&rdp->nocb_lock); - if (!READ_ONCE(rdp_leader->nocb_kthread)) { - raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags); + if (!READ_ONCE(rdp_gp->nocb_gp_kthread)) { + trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, + TPS("AlreadyAwake")); + rcu_nocb_unlock_irqrestore(rdp, flags); return; } - if (rdp_leader->nocb_leader_sleep || force) { - /* Prior smp_mb__after_atomic() orders against prior enqueue. */ - WRITE_ONCE(rdp_leader->nocb_leader_sleep, false); - del_timer(&rdp->nocb_timer); - raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags); - smp_mb(); /* ->nocb_leader_sleep before swake_up_one(). */ - swake_up_one(&rdp_leader->nocb_wq); - } else { - raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags); + del_timer(&rdp->nocb_timer); + rcu_nocb_unlock_irqrestore(rdp, flags); + raw_spin_lock_irqsave(&rdp_gp->nocb_gp_lock, flags); + if (force || READ_ONCE(rdp_gp->nocb_gp_sleep)) { + WRITE_ONCE(rdp_gp->nocb_gp_sleep, false); + needwake = true; + trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("DoWake")); } + raw_spin_unlock_irqrestore(&rdp_gp->nocb_gp_lock, flags); + if (needwake) + wake_up_process(rdp_gp->nocb_gp_kthread); } /* - * Kick the leader kthread for this NOCB group, but caller has not - * acquired locks. + * Arrange to wake the GP kthread for this NOCB group at some future + * time when it is safe to do so. */ -static void wake_nocb_leader(struct rcu_data *rdp, bool force) +static void wake_nocb_gp_defer(struct rcu_data *rdp, int waketype, + const char *reason) { - unsigned long flags; + if (rdp->nocb_defer_wakeup == RCU_NOCB_WAKE_NOT) + mod_timer(&rdp->nocb_timer, jiffies + 1); + if (rdp->nocb_defer_wakeup < waketype) + WRITE_ONCE(rdp->nocb_defer_wakeup, waketype); + trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, reason); +} + +/* + * Flush the ->nocb_bypass queue into ->cblist, enqueuing rhp if non-NULL. + * However, if there is a callback to be enqueued and if ->nocb_bypass + * proves to be initially empty, just return false because the no-CB GP + * kthread may need to be awakened in this case. + * + * Note that this function always returns true if rhp is NULL. + */ +static bool rcu_nocb_do_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp, + unsigned long j) +{ + struct rcu_cblist rcl; - raw_spin_lock_irqsave(&rdp->nocb_lock, flags); - __wake_nocb_leader(rdp, force, flags); + WARN_ON_ONCE(!rcu_segcblist_is_offloaded(&rdp->cblist)); + rcu_lockdep_assert_cblist_protected(rdp); + lockdep_assert_held(&rdp->nocb_bypass_lock); + if (rhp && !rcu_cblist_n_cbs(&rdp->nocb_bypass)) { + raw_spin_unlock(&rdp->nocb_bypass_lock); + return false; + } + /* Note: ->cblist.len already accounts for ->nocb_bypass contents. */ + if (rhp) + rcu_segcblist_inc_len(&rdp->cblist); /* Must precede enqueue. */ + rcu_cblist_flush_enqueue(&rcl, &rdp->nocb_bypass, rhp); + rcu_segcblist_insert_pend_cbs(&rdp->cblist, &rcl); + WRITE_ONCE(rdp->nocb_bypass_first, j); + rcu_nocb_bypass_unlock(rdp); + return true; } /* - * Arrange to wake the leader kthread for this NOCB group at some - * future time when it is safe to do so. + * Flush the ->nocb_bypass queue into ->cblist, enqueuing rhp if non-NULL. + * However, if there is a callback to be enqueued and if ->nocb_bypass + * proves to be initially empty, just return false because the no-CB GP + * kthread may need to be awakened in this case. + * + * Note that this function always returns true if rhp is NULL. */ -static void wake_nocb_leader_defer(struct rcu_data *rdp, int waketype, - const char *reason) +static bool rcu_nocb_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp, + unsigned long j) { - unsigned long flags; + if (!rcu_segcblist_is_offloaded(&rdp->cblist)) + return true; + rcu_lockdep_assert_cblist_protected(rdp); + rcu_nocb_bypass_lock(rdp); + return rcu_nocb_do_flush_bypass(rdp, rhp, j); +} - raw_spin_lock_irqsave(&rdp->nocb_lock, flags); - if (rdp->nocb_defer_wakeup == RCU_NOCB_WAKE_NOT) - mod_timer(&rdp->nocb_timer, jiffies + 1); - WRITE_ONCE(rdp->nocb_defer_wakeup, waketype); - trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, reason); - raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags); +/* + * If the ->nocb_bypass_lock is immediately available, flush the + * ->nocb_bypass queue into ->cblist. + */ +static void rcu_nocb_try_flush_bypass(struct rcu_data *rdp, unsigned long j) +{ + rcu_lockdep_assert_cblist_protected(rdp); + if (!rcu_segcblist_is_offloaded(&rdp->cblist) || + !rcu_nocb_bypass_trylock(rdp)) + return; + WARN_ON_ONCE(!rcu_nocb_do_flush_bypass(rdp, NULL, j)); } -/* Does rcu_barrier need to queue an RCU callback on the specified CPU? */ -static bool rcu_nocb_cpu_needs_barrier(int cpu) +/* + * See whether it is appropriate to use the ->nocb_bypass list in order + * to control contention on ->nocb_lock. A limited number of direct + * enqueues are permitted into ->cblist per jiffy. If ->nocb_bypass + * is non-empty, further callbacks must be placed into ->nocb_bypass, + * otherwise rcu_barrier() breaks. Use rcu_nocb_flush_bypass() to switch + * back to direct use of ->cblist. However, ->nocb_bypass should not be + * used if ->cblist is empty, because otherwise callbacks can be stranded + * on ->nocb_bypass because we cannot count on the current CPU ever again + * invoking call_rcu(). The general rule is that if ->nocb_bypass is + * non-empty, the corresponding no-CBs grace-period kthread must not be + * in an indefinite sleep state. + * + * Finally, it is not permitted to use the bypass during early boot, + * as doing so would confuse the auto-initialization code. Besides + * which, there is no point in worrying about lock contention while + * there is only one CPU in operation. + */ +static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp, + bool *was_alldone, unsigned long flags) { - struct rcu_data *rdp = per_cpu_ptr(&rcu_data, cpu); - unsigned long ret; -#ifdef CONFIG_PROVE_RCU - struct rcu_head *rhp; -#endif /* #ifdef CONFIG_PROVE_RCU */ + unsigned long c; + unsigned long cur_gp_seq; + unsigned long j = jiffies; + long ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass); - /* - * Check count of all no-CBs callbacks awaiting invocation. - * There needs to be a barrier before this function is called, - * but associated with a prior determination that no more - * callbacks would be posted. In the worst case, the first - * barrier in rcu_barrier() suffices (but the caller cannot - * necessarily rely on this, not a substitute for the caller - * getting the concurrency design right!). There must also be a - * barrier between the following load and posting of a callback - * (if a callback is in fact needed). This is associated with an - * atomic_inc() in the caller. - */ - ret = rcu_get_n_cbs_nocb_cpu(rdp); - -#ifdef CONFIG_PROVE_RCU - rhp = READ_ONCE(rdp->nocb_head); - if (!rhp) - rhp = READ_ONCE(rdp->nocb_gp_head); - if (!rhp) - rhp = READ_ONCE(rdp->nocb_follower_head); - - /* Having no rcuo kthread but CBs after scheduler starts is bad! */ - if (!READ_ONCE(rdp->nocb_kthread) && rhp && - rcu_scheduler_fully_active) { - /* RCU callback enqueued before CPU first came online??? */ - pr_err("RCU: Never-onlined no-CBs CPU %d has CB %p\n", - cpu, rhp->func); - WARN_ON_ONCE(1); + if (!rcu_segcblist_is_offloaded(&rdp->cblist)) { + *was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist); + return false; /* Not offloaded, no bypassing. */ + } + lockdep_assert_irqs_disabled(); + + // Don't use ->nocb_bypass during early boot. + if (rcu_scheduler_active != RCU_SCHEDULER_RUNNING) { + rcu_nocb_lock(rdp); + WARN_ON_ONCE(rcu_cblist_n_cbs(&rdp->nocb_bypass)); + *was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist); + return false; + } + + // If we have advanced to a new jiffy, reset counts to allow + // moving back from ->nocb_bypass to ->cblist. + if (j == rdp->nocb_nobypass_last) { + c = rdp->nocb_nobypass_count + 1; + } else { + WRITE_ONCE(rdp->nocb_nobypass_last, j); + c = rdp->nocb_nobypass_count - nocb_nobypass_lim_per_jiffy; + if (ULONG_CMP_LT(rdp->nocb_nobypass_count, + nocb_nobypass_lim_per_jiffy)) + c = 0; + else if (c > nocb_nobypass_lim_per_jiffy) + c = nocb_nobypass_lim_per_jiffy; + } + WRITE_ONCE(rdp->nocb_nobypass_count, c); + + // If there hasn't yet been all that many ->cblist enqueues + // this jiffy, tell the caller to enqueue onto ->cblist. But flush + // ->nocb_bypass first. + if (rdp->nocb_nobypass_count < nocb_nobypass_lim_per_jiffy) { + rcu_nocb_lock(rdp); + *was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist); + if (*was_alldone) + trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, + TPS("FirstQ")); + WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j)); + WARN_ON_ONCE(rcu_cblist_n_cbs(&rdp->nocb_bypass)); + return false; // Caller must enqueue the callback. + } + + // If ->nocb_bypass has been used too long or is too full, + // flush ->nocb_bypass to ->cblist. + if ((ncbs && j != READ_ONCE(rdp->nocb_bypass_first)) || + ncbs >= qhimark) { + rcu_nocb_lock(rdp); + if (!rcu_nocb_flush_bypass(rdp, rhp, j)) { + *was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist); + if (*was_alldone) + trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, + TPS("FirstQ")); + WARN_ON_ONCE(rcu_cblist_n_cbs(&rdp->nocb_bypass)); + return false; // Caller must enqueue the callback. + } + if (j != rdp->nocb_gp_adv_time && + rcu_segcblist_nextgp(&rdp->cblist, &cur_gp_seq) && + rcu_seq_done(&rdp->mynode->gp_seq, cur_gp_seq)) { + rcu_advance_cbs_nowake(rdp->mynode, rdp); + rdp->nocb_gp_adv_time = j; + } + rcu_nocb_unlock_irqrestore(rdp, flags); + return true; // Callback already enqueued. } -#endif /* #ifdef CONFIG_PROVE_RCU */ - return !!ret; + // We need to use the bypass. + rcu_nocb_wait_contended(rdp); + rcu_nocb_bypass_lock(rdp); + ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass); + rcu_segcblist_inc_len(&rdp->cblist); /* Must precede enqueue. */ + rcu_cblist_enqueue(&rdp->nocb_bypass, rhp); + if (!ncbs) { + WRITE_ONCE(rdp->nocb_bypass_first, j); + trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("FirstBQ")); + } + rcu_nocb_bypass_unlock(rdp); + smp_mb(); /* Order enqueue before wake. */ + if (ncbs) { + local_irq_restore(flags); + } else { + // No-CBs GP kthread might be indefinitely asleep, if so, wake. + rcu_nocb_lock(rdp); // Rare during call_rcu() flood. + if (!rcu_segcblist_pend_cbs(&rdp->cblist)) { + trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, + TPS("FirstBQwake")); + __call_rcu_nocb_wake(rdp, true, flags); + } else { + trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, + TPS("FirstBQnoWake")); + rcu_nocb_unlock_irqrestore(rdp, flags); + } + } + return true; // Callback already enqueued. } /* - * Enqueue the specified string of rcu_head structures onto the specified - * CPU's no-CBs lists. The CPU is specified by rdp, the head of the - * string by rhp, and the tail of the string by rhtp. The non-lazy/lazy - * counts are supplied by rhcount and rhcount_lazy. + * Awaken the no-CBs grace-period kthead if needed, either due to it + * legitimately being asleep or due to overload conditions. * * If warranted, also wake up the kthread servicing this CPUs queues. */ -static void __call_rcu_nocb_enqueue(struct rcu_data *rdp, - struct rcu_head *rhp, - struct rcu_head **rhtp, - int rhcount, int rhcount_lazy, - unsigned long flags) +static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_alldone, + unsigned long flags) + __releases(rdp->nocb_lock) { - int len; - struct rcu_head **old_rhpp; + unsigned long cur_gp_seq; + unsigned long j; + long len; struct task_struct *t; - /* Enqueue the callback on the nocb list and update counts. */ - atomic_long_add(rhcount, &rdp->nocb_q_count); - /* rcu_barrier() relies on ->nocb_q_count add before xchg. */ - old_rhpp = xchg(&rdp->nocb_tail, rhtp); - WRITE_ONCE(*old_rhpp, rhp); - atomic_long_add(rhcount_lazy, &rdp->nocb_q_count_lazy); - smp_mb__after_atomic(); /* Store *old_rhpp before _wake test. */ - - /* If we are not being polled and there is a kthread, awaken it ... */ - t = READ_ONCE(rdp->nocb_kthread); + // If we are being polled or there is no kthread, just leave. + t = READ_ONCE(rdp->nocb_gp_kthread); if (rcu_nocb_poll || !t) { trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("WakeNotPoll")); + rcu_nocb_unlock_irqrestore(rdp, flags); return; } - len = rcu_get_n_cbs_nocb_cpu(rdp); - if (old_rhpp == &rdp->nocb_head) { + // Need to actually to a wakeup. + len = rcu_segcblist_n_cbs(&rdp->cblist); + if (was_alldone) { + rdp->qlen_last_fqs_check = len; if (!irqs_disabled_flags(flags)) { /* ... if queue was empty ... */ - wake_nocb_leader(rdp, false); + wake_nocb_gp(rdp, false, flags); trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("WakeEmpty")); } else { - wake_nocb_leader_defer(rdp, RCU_NOCB_WAKE, - TPS("WakeEmptyIsDeferred")); + wake_nocb_gp_defer(rdp, RCU_NOCB_WAKE, + TPS("WakeEmptyIsDeferred")); + rcu_nocb_unlock_irqrestore(rdp, flags); } - rdp->qlen_last_fqs_check = 0; } else if (len > rdp->qlen_last_fqs_check + qhimark) { /* ... or if many callbacks queued. */ - if (!irqs_disabled_flags(flags)) { - wake_nocb_leader(rdp, true); - trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, - TPS("WakeOvf")); - } else { - wake_nocb_leader_defer(rdp, RCU_NOCB_WAKE_FORCE, - TPS("WakeOvfIsDeferred")); + rdp->qlen_last_fqs_check = len; + j = jiffies; + if (j != rdp->nocb_gp_adv_time && + rcu_segcblist_nextgp(&rdp->cblist, &cur_gp_seq) && + rcu_seq_done(&rdp->mynode->gp_seq, cur_gp_seq)) { + rcu_advance_cbs_nowake(rdp->mynode, rdp); + rdp->nocb_gp_adv_time = j; } - rdp->qlen_last_fqs_check = LONG_MAX / 2; + smp_mb(); /* Enqueue before timer_pending(). */ + if ((rdp->nocb_cb_sleep || + !rcu_segcblist_ready_cbs(&rdp->cblist)) && + !timer_pending(&rdp->nocb_bypass_timer)) + wake_nocb_gp_defer(rdp, RCU_NOCB_WAKE_FORCE, + TPS("WakeOvfIsDeferred")); + rcu_nocb_unlock_irqrestore(rdp, flags); } else { trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("WakeNot")); + rcu_nocb_unlock_irqrestore(rdp, flags); } return; } -/* - * This is a helper for __call_rcu(), which invokes this when the normal - * callback queue is inoperable. If this is not a no-CBs CPU, this - * function returns failure back to __call_rcu(), which can complain - * appropriately. - * - * Otherwise, this function queues the callback where the corresponding - * "rcuo" kthread can find it. - */ -static bool __call_rcu_nocb(struct rcu_data *rdp, struct rcu_head *rhp, - bool lazy, unsigned long flags) +/* Wake up the no-CBs GP kthread to flush ->nocb_bypass. */ +static void do_nocb_bypass_wakeup_timer(struct timer_list *t) { + unsigned long flags; + struct rcu_data *rdp = from_timer(rdp, t, nocb_bypass_timer); - if (!rcu_is_nocb_cpu(rdp->cpu)) - return false; - __call_rcu_nocb_enqueue(rdp, rhp, &rhp->next, 1, lazy, flags); - if (__is_kfree_rcu_offset((unsigned long)rhp->func)) - trace_rcu_kfree_callback(rcu_state.name, rhp, - (unsigned long)rhp->func, - -atomic_long_read(&rdp->nocb_q_count_lazy), - -rcu_get_n_cbs_nocb_cpu(rdp)); - else - trace_rcu_callback(rcu_state.name, rhp, - -atomic_long_read(&rdp->nocb_q_count_lazy), - -rcu_get_n_cbs_nocb_cpu(rdp)); - - /* - * If called from an extended quiescent state with interrupts - * disabled, invoke the RCU core in order to allow the idle-entry - * deferred-wakeup check to function. - */ - if (irqs_disabled_flags(flags) && - !rcu_is_watching() && - cpu_online(smp_processor_id())) - invoke_rcu_core(); - - return true; -} - -/* - * Adopt orphaned callbacks on a no-CBs CPU, or return 0 if this is - * not a no-CBs CPU. - */ -static bool __maybe_unused rcu_nocb_adopt_orphan_cbs(struct rcu_data *my_rdp, - struct rcu_data *rdp, - unsigned long flags) -{ - lockdep_assert_irqs_disabled(); - if (!rcu_is_nocb_cpu(smp_processor_id())) - return false; /* Not NOCBs CPU, caller must migrate CBs. */ - __call_rcu_nocb_enqueue(my_rdp, rcu_segcblist_head(&rdp->cblist), - rcu_segcblist_tail(&rdp->cblist), - rcu_segcblist_n_cbs(&rdp->cblist), - rcu_segcblist_n_lazy_cbs(&rdp->cblist), flags); - rcu_segcblist_init(&rdp->cblist); - rcu_segcblist_disable(&rdp->cblist); - return true; + trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("Timer")); + rcu_nocb_lock_irqsave(rdp, flags); + smp_mb__after_spinlock(); /* Timer expire before wakeup. */ + __call_rcu_nocb_wake(rdp, true, flags); } /* - * If necessary, kick off a new grace period, and either way wait - * for a subsequent grace period to complete. + * No-CBs GP kthreads come here to wait for additional callbacks to show up + * or for grace periods to end. */ -static void rcu_nocb_wait_gp(struct rcu_data *rdp) +static void nocb_gp_wait(struct rcu_data *my_rdp) { - unsigned long c; - bool d; + bool bypass = false; + long bypass_ncbs; + int __maybe_unused cpu = my_rdp->cpu; + unsigned long cur_gp_seq; unsigned long flags; + bool gotcbs; + unsigned long j = jiffies; + bool needwait_gp = false; // This prevents actual uninitialized use. bool needwake; - struct rcu_node *rnp = rdp->mynode; + bool needwake_gp; + struct rcu_data *rdp; + struct rcu_node *rnp; + unsigned long wait_gp_seq = 0; // Suppress "use uninitialized" warning. - local_irq_save(flags); - c = rcu_seq_snap(&rcu_state.gp_seq); - if (!rdp->gpwrap && ULONG_CMP_GE(rdp->gp_seq_needed, c)) { - local_irq_restore(flags); - } else { - raw_spin_lock_rcu_node(rnp); /* irqs already disabled. */ - needwake = rcu_start_this_gp(rnp, rdp, c); - raw_spin_unlock_irqrestore_rcu_node(rnp, flags); - if (needwake) + /* + * Each pass through the following loop checks for CBs and for the + * nearest grace period (if any) to wait for next. The CB kthreads + * and the global grace-period kthread are awakened if needed. + */ + for (rdp = my_rdp; rdp; rdp = rdp->nocb_next_cb_rdp) { + trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("Check")); + rcu_nocb_lock_irqsave(rdp, flags); + bypass_ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass); + if (bypass_ncbs && + (time_after(j, READ_ONCE(rdp->nocb_bypass_first) + 1) || + bypass_ncbs > 2 * qhimark)) { + // Bypass full or old, so flush it. + (void)rcu_nocb_try_flush_bypass(rdp, j); + bypass_ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass); + } else if (!bypass_ncbs && rcu_segcblist_empty(&rdp->cblist)) { + rcu_nocb_unlock_irqrestore(rdp, flags); + continue; /* No callbacks here, try next. */ + } + if (bypass_ncbs) { + trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, + TPS("Bypass")); + bypass = true; + } + rnp = rdp->mynode; + if (bypass) { // Avoid race with first bypass CB. + WRITE_ONCE(my_rdp->nocb_defer_wakeup, + RCU_NOCB_WAKE_NOT); + del_timer(&my_rdp->nocb_timer); + } + // Advance callbacks if helpful and low contention. + needwake_gp = false; + if (!rcu_segcblist_restempty(&rdp->cblist, + RCU_NEXT_READY_TAIL) || + (rcu_segcblist_nextgp(&rdp->cblist, &cur_gp_seq) && + rcu_seq_done(&rnp->gp_seq, cur_gp_seq))) { + raw_spin_lock_rcu_node(rnp); /* irqs disabled. */ + needwake_gp = rcu_advance_cbs(rnp, rdp); + raw_spin_unlock_rcu_node(rnp); /* irqs disabled. */ + } + // Need to wait on some grace period? + WARN_ON_ONCE(!rcu_segcblist_restempty(&rdp->cblist, + RCU_NEXT_READY_TAIL)); + if (rcu_segcblist_nextgp(&rdp->cblist, &cur_gp_seq)) { + if (!needwait_gp || + ULONG_CMP_LT(cur_gp_seq, wait_gp_seq)) + wait_gp_seq = cur_gp_seq; + needwait_gp = true; + trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, + TPS("NeedWaitGP")); + } + if (rcu_segcblist_ready_cbs(&rdp->cblist)) { + needwake = rdp->nocb_cb_sleep; + WRITE_ONCE(rdp->nocb_cb_sleep, false); + smp_mb(); /* CB invocation -after- GP end. */ + } else { + needwake = false; + } + rcu_nocb_unlock_irqrestore(rdp, flags); + if (needwake) { + swake_up_one(&rdp->nocb_cb_wq); + gotcbs = true; + } + if (needwake_gp) rcu_gp_kthread_wake(); } - /* - * Wait for the grace period. Do so interruptibly to avoid messing - * up the load average. - */ - trace_rcu_this_gp(rnp, rdp, c, TPS("StartWait")); - for (;;) { + my_rdp->nocb_gp_bypass = bypass; + my_rdp->nocb_gp_gp = needwait_gp; + my_rdp->nocb_gp_seq = needwait_gp ? wait_gp_seq : 0; + if (bypass && !rcu_nocb_poll) { + // At least one child with non-empty ->nocb_bypass, so set + // timer in order to avoid stranding its callbacks. + raw_spin_lock_irqsave(&my_rdp->nocb_gp_lock, flags); + mod_timer(&my_rdp->nocb_bypass_timer, j + 2); + raw_spin_unlock_irqrestore(&my_rdp->nocb_gp_lock, flags); + } + if (rcu_nocb_poll) { + /* Polling, so trace if first poll in the series. */ + if (gotcbs) + trace_rcu_nocb_wake(rcu_state.name, cpu, TPS("Poll")); + schedule_timeout_interruptible(1); + } else if (!needwait_gp) { + /* Wait for callbacks to appear. */ + trace_rcu_nocb_wake(rcu_state.name, cpu, TPS("Sleep")); + swait_event_interruptible_exclusive(my_rdp->nocb_gp_wq, + !READ_ONCE(my_rdp->nocb_gp_sleep)); + trace_rcu_nocb_wake(rcu_state.name, cpu, TPS("EndSleep")); + } else { + rnp = my_rdp->mynode; + trace_rcu_this_gp(rnp, my_rdp, wait_gp_seq, TPS("StartWait")); swait_event_interruptible_exclusive( - rnp->nocb_gp_wq[rcu_seq_ctr(c) & 0x1], - (d = rcu_seq_done(&rnp->gp_seq, c))); - if (likely(d)) - break; - WARN_ON(signal_pending(current)); - trace_rcu_this_gp(rnp, rdp, c, TPS("ResumeWait")); + rnp->nocb_gp_wq[rcu_seq_ctr(wait_gp_seq) & 0x1], + rcu_seq_done(&rnp->gp_seq, wait_gp_seq) || + !READ_ONCE(my_rdp->nocb_gp_sleep)); + trace_rcu_this_gp(rnp, my_rdp, wait_gp_seq, TPS("EndWait")); } - trace_rcu_this_gp(rnp, rdp, c, TPS("EndWait")); - smp_mb(); /* Ensure that CB invocation happens after GP end. */ + if (!rcu_nocb_poll) { + raw_spin_lock_irqsave(&my_rdp->nocb_gp_lock, flags); + if (bypass) + del_timer(&my_rdp->nocb_bypass_timer); + WRITE_ONCE(my_rdp->nocb_gp_sleep, true); + raw_spin_unlock_irqrestore(&my_rdp->nocb_gp_lock, flags); + } + my_rdp->nocb_gp_seq = -1; + WARN_ON(signal_pending(current)); } /* - * Leaders come here to wait for additional callbacks to show up. - * This function does not return until callbacks appear. + * No-CBs grace-period-wait kthread. There is one of these per group + * of CPUs, but only once at least one CPU in that group has come online + * at least once since boot. This kthread checks for newly posted + * callbacks from any of the CPUs it is responsible for, waits for a + * grace period, then awakens all of the rcu_nocb_cb_kthread() instances + * that then have callback-invocation work to do. */ -static void nocb_leader_wait(struct rcu_data *my_rdp) +static int rcu_nocb_gp_kthread(void *arg) { - bool firsttime = true; - unsigned long flags; - bool gotcbs; - struct rcu_data *rdp; - struct rcu_head **tail; - -wait_again: - - /* Wait for callbacks to appear. */ - if (!rcu_nocb_poll) { - trace_rcu_nocb_wake(rcu_state.name, my_rdp->cpu, TPS("Sleep")); - swait_event_interruptible_exclusive(my_rdp->nocb_wq, - !READ_ONCE(my_rdp->nocb_leader_sleep)); - raw_spin_lock_irqsave(&my_rdp->nocb_lock, flags); - my_rdp->nocb_leader_sleep = true; - WRITE_ONCE(my_rdp->nocb_defer_wakeup, RCU_NOCB_WAKE_NOT); - del_timer(&my_rdp->nocb_timer); - raw_spin_unlock_irqrestore(&my_rdp->nocb_lock, flags); - } else if (firsttime) { - firsttime = false; /* Don't drown trace log with "Poll"! */ - trace_rcu_nocb_wake(rcu_state.name, my_rdp->cpu, TPS("Poll")); - } - - /* - * Each pass through the following loop checks a follower for CBs. - * We are our own first follower. Any CBs found are moved to - * nocb_gp_head, where they await a grace period. - */ - gotcbs = false; - smp_mb(); /* wakeup and _sleep before ->nocb_head reads. */ - for (rdp = my_rdp; rdp; rdp = rdp->nocb_next_follower) { - rdp->nocb_gp_head = READ_ONCE(rdp->nocb_head); - if (!rdp->nocb_gp_head) - continue; /* No CBs here, try next follower. */ - - /* Move callbacks to wait-for-GP list, which is empty. */ - WRITE_ONCE(rdp->nocb_head, NULL); - rdp->nocb_gp_tail = xchg(&rdp->nocb_tail, &rdp->nocb_head); - gotcbs = true; - } - - /* No callbacks? Sleep a bit if polling, and go retry. */ - if (unlikely(!gotcbs)) { - WARN_ON(signal_pending(current)); - if (rcu_nocb_poll) { - schedule_timeout_interruptible(1); - } else { - trace_rcu_nocb_wake(rcu_state.name, my_rdp->cpu, - TPS("WokeEmpty")); - } - goto wait_again; - } + struct rcu_data *rdp = arg; - /* Wait for one grace period. */ - rcu_nocb_wait_gp(my_rdp); - - /* Each pass through the following loop wakes a follower, if needed. */ - for (rdp = my_rdp; rdp; rdp = rdp->nocb_next_follower) { - if (!rcu_nocb_poll && - READ_ONCE(rdp->nocb_head) && - READ_ONCE(my_rdp->nocb_leader_sleep)) { - raw_spin_lock_irqsave(&my_rdp->nocb_lock, flags); - my_rdp->nocb_leader_sleep = false;/* No need to sleep.*/ - raw_spin_unlock_irqrestore(&my_rdp->nocb_lock, flags); - } - if (!rdp->nocb_gp_head) - continue; /* No CBs, so no need to wake follower. */ - - /* Append callbacks to follower's "done" list. */ - raw_spin_lock_irqsave(&rdp->nocb_lock, flags); - tail = rdp->nocb_follower_tail; - rdp->nocb_follower_tail = rdp->nocb_gp_tail; - *tail = rdp->nocb_gp_head; - raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags); - if (rdp != my_rdp && tail == &rdp->nocb_follower_head) { - /* List was empty, so wake up the follower. */ - swake_up_one(&rdp->nocb_wq); - } + for (;;) { + WRITE_ONCE(rdp->nocb_gp_loops, rdp->nocb_gp_loops + 1); + nocb_gp_wait(rdp); + cond_resched_tasks_rcu_qs(); } - - /* If we (the leader) don't have CBs, go wait some more. */ - if (!my_rdp->nocb_follower_head) - goto wait_again; + return 0; } /* - * Followers come here to wait for additional callbacks to show up. - * This function does not return until callbacks appear. + * Invoke any ready callbacks from the corresponding no-CBs CPU, + * then, if there are no more, wait for more to appear. */ -static void nocb_follower_wait(struct rcu_data *rdp) +static void nocb_cb_wait(struct rcu_data *rdp) { - for (;;) { - trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("FollowerSleep")); - swait_event_interruptible_exclusive(rdp->nocb_wq, - READ_ONCE(rdp->nocb_follower_head)); - if (smp_load_acquire(&rdp->nocb_follower_head)) { - /* ^^^ Ensure CB invocation follows _head test. */ - return; - } - WARN_ON(signal_pending(current)); - trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("WokeEmpty")); + unsigned long cur_gp_seq; + unsigned long flags; + bool needwake_gp = false; + struct rcu_node *rnp = rdp->mynode; + + local_irq_save(flags); + rcu_momentary_dyntick_idle(); + local_irq_restore(flags); + local_bh_disable(); + rcu_do_batch(rdp); + local_bh_enable(); + lockdep_assert_irqs_enabled(); + rcu_nocb_lock_irqsave(rdp, flags); + if (rcu_segcblist_nextgp(&rdp->cblist, &cur_gp_seq) && + rcu_seq_done(&rnp->gp_seq, cur_gp_seq) && + raw_spin_trylock_rcu_node(rnp)) { /* irqs already disabled. */ + needwake_gp = rcu_advance_cbs(rdp->mynode, rdp); + raw_spin_unlock_rcu_node(rnp); /* irqs remain disabled. */ + } + if (rcu_segcblist_ready_cbs(&rdp->cblist)) { + rcu_nocb_unlock_irqrestore(rdp, flags); + if (needwake_gp) + rcu_gp_kthread_wake(); + return; + } + + trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("CBSleep")); + WRITE_ONCE(rdp->nocb_cb_sleep, true); + rcu_nocb_unlock_irqrestore(rdp, flags); + if (needwake_gp) + rcu_gp_kthread_wake(); + swait_event_interruptible_exclusive(rdp->nocb_cb_wq, + !READ_ONCE(rdp->nocb_cb_sleep)); + if (!smp_load_acquire(&rdp->nocb_cb_sleep)) { /* VVV */ + /* ^^^ Ensure CB invocation follows _sleep test. */ + return; } + WARN_ON(signal_pending(current)); + trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("WokeEmpty")); } /* - * Per-rcu_data kthread, but only for no-CBs CPUs. Each kthread invokes - * callbacks queued by the corresponding no-CBs CPU, however, there is - * an optional leader-follower relationship so that the grace-period - * kthreads don't have to do quite so many wakeups. + * Per-rcu_data kthread, but only for no-CBs CPUs. Repeatedly invoke + * nocb_cb_wait() to do the dirty work. */ -static int rcu_nocb_kthread(void *arg) +static int rcu_nocb_cb_kthread(void *arg) { - int c, cl; - unsigned long flags; - struct rcu_head *list; - struct rcu_head *next; - struct rcu_head **tail; struct rcu_data *rdp = arg; - /* Each pass through this loop invokes one batch of callbacks */ + // Each pass through this loop does one callback batch, and, + // if there are no more ready callbacks, waits for them. for (;;) { - /* Wait for callbacks. */ - if (rdp->nocb_leader == rdp) - nocb_leader_wait(rdp); - else - nocb_follower_wait(rdp); - - /* Pull the ready-to-invoke callbacks onto local list. */ - raw_spin_lock_irqsave(&rdp->nocb_lock, flags); - list = rdp->nocb_follower_head; - rdp->nocb_follower_head = NULL; - tail = rdp->nocb_follower_tail; - rdp->nocb_follower_tail = &rdp->nocb_follower_head; - raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags); - if (WARN_ON_ONCE(!list)) - continue; - trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("WokeNonEmpty")); - - /* Each pass through the following loop invokes a callback. */ - trace_rcu_batch_start(rcu_state.name, - atomic_long_read(&rdp->nocb_q_count_lazy), - rcu_get_n_cbs_nocb_cpu(rdp), -1); - c = cl = 0; - while (list) { - next = list->next; - /* Wait for enqueuing to complete, if needed. */ - while (next == NULL && &list->next != tail) { - trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, - TPS("WaitQueue")); - schedule_timeout_interruptible(1); - trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, - TPS("WokeQueue")); - next = list->next; - } - debug_rcu_head_unqueue(list); - local_bh_disable(); - if (__rcu_reclaim(rcu_state.name, list)) - cl++; - c++; - local_bh_enable(); - cond_resched_tasks_rcu_qs(); - list = next; - } - trace_rcu_batch_end(rcu_state.name, c, !!list, 0, 0, 1); - smp_mb__before_atomic(); /* _add after CB invocation. */ - atomic_long_add(-c, &rdp->nocb_q_count); - atomic_long_add(-cl, &rdp->nocb_q_count_lazy); + nocb_cb_wait(rdp); + cond_resched_tasks_rcu_qs(); } return 0; } @@ -1993,14 +2159,14 @@ static void do_nocb_deferred_wakeup_common(struct rcu_data *rdp) unsigned long flags; int ndw; - raw_spin_lock_irqsave(&rdp->nocb_lock, flags); + rcu_nocb_lock_irqsave(rdp, flags); if (!rcu_nocb_need_deferred_wakeup(rdp)) { - raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags); + rcu_nocb_unlock_irqrestore(rdp, flags); return; } ndw = READ_ONCE(rdp->nocb_defer_wakeup); WRITE_ONCE(rdp->nocb_defer_wakeup, RCU_NOCB_WAKE_NOT); - __wake_nocb_leader(rdp, ndw == RCU_NOCB_WAKE_FORCE, flags); + wake_nocb_gp(rdp, ndw == RCU_NOCB_WAKE_FORCE, flags); trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("DeferredWake")); } @@ -2027,6 +2193,7 @@ void __init rcu_init_nohz(void) { int cpu; bool need_rcu_nocb_mask = false; + struct rcu_data *rdp; #if defined(CONFIG_NO_HZ_FULL) if (tick_nohz_full_running && cpumask_weight(tick_nohz_full_mask)) @@ -2060,67 +2227,63 @@ void __init rcu_init_nohz(void) if (rcu_nocb_poll) pr_info("\tPoll for callbacks from no-CBs CPUs.\n"); - for_each_cpu(cpu, rcu_nocb_mask) - init_nocb_callback_list(per_cpu_ptr(&rcu_data, cpu)); + for_each_cpu(cpu, rcu_nocb_mask) { + rdp = per_cpu_ptr(&rcu_data, cpu); + if (rcu_segcblist_empty(&rdp->cblist)) + rcu_segcblist_init(&rdp->cblist); + rcu_segcblist_offload(&rdp->cblist); + } rcu_organize_nocb_kthreads(); } /* Initialize per-rcu_data variables for no-CBs CPUs. */ static void __init rcu_boot_init_nocb_percpu_data(struct rcu_data *rdp) { - rdp->nocb_tail = &rdp->nocb_head; - init_swait_queue_head(&rdp->nocb_wq); - rdp->nocb_follower_tail = &rdp->nocb_follower_head; + init_swait_queue_head(&rdp->nocb_cb_wq); + init_swait_queue_head(&rdp->nocb_gp_wq); raw_spin_lock_init(&rdp->nocb_lock); + raw_spin_lock_init(&rdp->nocb_bypass_lock); + raw_spin_lock_init(&rdp->nocb_gp_lock); timer_setup(&rdp->nocb_timer, do_nocb_deferred_wakeup_timer, 0); + timer_setup(&rdp->nocb_bypass_timer, do_nocb_bypass_wakeup_timer, 0); + rcu_cblist_init(&rdp->nocb_bypass); } /* * If the specified CPU is a no-CBs CPU that does not already have its - * rcuo kthread, spawn it. If the CPUs are brought online out of order, - * this can require re-organizing the leader-follower relationships. + * rcuo CB kthread, spawn it. Additionally, if the rcuo GP kthread + * for this CPU's group has not yet been created, spawn it as well. */ static void rcu_spawn_one_nocb_kthread(int cpu) { - struct rcu_data *rdp; - struct rcu_data *rdp_last; - struct rcu_data *rdp_old_leader; - struct rcu_data *rdp_spawn = per_cpu_ptr(&rcu_data, cpu); + struct rcu_data *rdp = per_cpu_ptr(&rcu_data, cpu); + struct rcu_data *rdp_gp; struct task_struct *t; /* * If this isn't a no-CBs CPU or if it already has an rcuo kthread, * then nothing to do. */ - if (!rcu_is_nocb_cpu(cpu) || rdp_spawn->nocb_kthread) + if (!rcu_is_nocb_cpu(cpu) || rdp->nocb_cb_kthread) return; - /* If we didn't spawn the leader first, reorganize! */ - rdp_old_leader = rdp_spawn->nocb_leader; - if (rdp_old_leader != rdp_spawn && !rdp_old_leader->nocb_kthread) { - rdp_last = NULL; - rdp = rdp_old_leader; - do { - rdp->nocb_leader = rdp_spawn; - if (rdp_last && rdp != rdp_spawn) - rdp_last->nocb_next_follower = rdp; - if (rdp == rdp_spawn) { - rdp = rdp->nocb_next_follower; - } else { - rdp_last = rdp; - rdp = rdp->nocb_next_follower; - rdp_last->nocb_next_follower = NULL; - } - } while (rdp); - rdp_spawn->nocb_next_follower = rdp_old_leader; + /* If we didn't spawn the GP kthread first, reorganize! */ + rdp_gp = rdp->nocb_gp_rdp; + if (!rdp_gp->nocb_gp_kthread) { + t = kthread_run(rcu_nocb_gp_kthread, rdp_gp, + "rcuog/%d", rdp_gp->cpu); + if (WARN_ONCE(IS_ERR(t), "%s: Could not start rcuo GP kthread, OOM is now expected behavior\n", __func__)) + return; + WRITE_ONCE(rdp_gp->nocb_gp_kthread, t); } /* Spawn the kthread for this CPU. */ - t = kthread_run(rcu_nocb_kthread, rdp_spawn, + t = kthread_run(rcu_nocb_cb_kthread, rdp, "rcuo%c/%d", rcu_state.abbr, cpu); - if (WARN_ONCE(IS_ERR(t), "%s: Could not start rcuo kthread, OOM is now expected behavior\n", __func__)) + if (WARN_ONCE(IS_ERR(t), "%s: Could not start rcuo CB kthread, OOM is now expected behavior\n", __func__)) return; - WRITE_ONCE(rdp_spawn->nocb_kthread, t); + WRITE_ONCE(rdp->nocb_cb_kthread, t); + WRITE_ONCE(rdp->nocb_gp_kthread, rdp_gp->nocb_gp_kthread); } /* @@ -2147,27 +2310,28 @@ static void __init rcu_spawn_nocb_kthreads(void) rcu_spawn_cpu_nocb_kthread(cpu); } -/* How many follower CPU IDs per leader? Default of -1 for sqrt(nr_cpu_ids). */ -static int rcu_nocb_leader_stride = -1; -module_param(rcu_nocb_leader_stride, int, 0444); +/* How many CB CPU IDs per GP kthread? Default of -1 for sqrt(nr_cpu_ids). */ +static int rcu_nocb_gp_stride = -1; +module_param(rcu_nocb_gp_stride, int, 0444); /* - * Initialize leader-follower relationships for all no-CBs CPU. + * Initialize GP-CB relationships for all no-CBs CPU. */ static void __init rcu_organize_nocb_kthreads(void) { int cpu; - int ls = rcu_nocb_leader_stride; - int nl = 0; /* Next leader. */ + bool firsttime = true; + int ls = rcu_nocb_gp_stride; + int nl = 0; /* Next GP kthread. */ struct rcu_data *rdp; - struct rcu_data *rdp_leader = NULL; /* Suppress misguided gcc warn. */ + struct rcu_data *rdp_gp = NULL; /* Suppress misguided gcc warn. */ struct rcu_data *rdp_prev = NULL; if (!cpumask_available(rcu_nocb_mask)) return; if (ls == -1) { - ls = int_sqrt(nr_cpu_ids); - rcu_nocb_leader_stride = ls; + ls = nr_cpu_ids / int_sqrt(nr_cpu_ids); + rcu_nocb_gp_stride = ls; } /* @@ -2178,39 +2342,24 @@ static void __init rcu_organize_nocb_kthreads(void) for_each_cpu(cpu, rcu_nocb_mask) { rdp = per_cpu_ptr(&rcu_data, cpu); if (rdp->cpu >= nl) { - /* New leader, set up for followers & next leader. */ + /* New GP kthread, set up for CBs & next GP. */ nl = DIV_ROUND_UP(rdp->cpu + 1, ls) * ls; - rdp->nocb_leader = rdp; - rdp_leader = rdp; + rdp->nocb_gp_rdp = rdp; + rdp_gp = rdp; + if (!firsttime && dump_tree) + pr_cont("\n"); + firsttime = false; + pr_alert("%s: No-CB GP kthread CPU %d:", __func__, cpu); } else { - /* Another follower, link to previous leader. */ - rdp->nocb_leader = rdp_leader; - rdp_prev->nocb_next_follower = rdp; + /* Another CB kthread, link to previous GP kthread. */ + rdp->nocb_gp_rdp = rdp_gp; + rdp_prev->nocb_next_cb_rdp = rdp; + pr_alert(" %d", cpu); } rdp_prev = rdp; } } -/* Prevent __call_rcu() from enqueuing callbacks on no-CBs CPUs */ -static bool init_nocb_callback_list(struct rcu_data *rdp) -{ - if (!rcu_is_nocb_cpu(rdp->cpu)) - return false; - - /* If there are early-boot callbacks, move them to nocb lists. */ - if (!rcu_segcblist_empty(&rdp->cblist)) { - rdp->nocb_head = rcu_segcblist_head(&rdp->cblist); - rdp->nocb_tail = rcu_segcblist_tail(&rdp->cblist); - atomic_long_set(&rdp->nocb_q_count, - rcu_segcblist_n_cbs(&rdp->cblist)); - atomic_long_set(&rdp->nocb_q_count_lazy, - rcu_segcblist_n_lazy_cbs(&rdp->cblist)); - rcu_segcblist_init(&rdp->cblist); - } - rcu_segcblist_disable(&rdp->cblist); - return true; -} - /* * Bind the current task to the offloaded CPUs. If there are no offloaded * CPUs, leave the task unbound. Splat if the bind attempt fails. @@ -2223,20 +2372,101 @@ void rcu_bind_current_to_nocb(void) EXPORT_SYMBOL_GPL(rcu_bind_current_to_nocb); /* - * Return the number of RCU callbacks still queued from the specified - * CPU, which must be a nocbs CPU. + * Dump out nocb grace-period kthread state for the specified rcu_data + * structure. */ -static unsigned long rcu_get_n_cbs_nocb_cpu(struct rcu_data *rdp) +static void show_rcu_nocb_gp_state(struct rcu_data *rdp) { - return atomic_long_read(&rdp->nocb_q_count); + struct rcu_node *rnp = rdp->mynode; + + pr_info("nocb GP %d %c%c%c%c%c%c %c[%c%c] %c%c:%ld rnp %d:%d %lu\n", + rdp->cpu, + "kK"[!!rdp->nocb_gp_kthread], + "lL"[raw_spin_is_locked(&rdp->nocb_gp_lock)], + "dD"[!!rdp->nocb_defer_wakeup], + "tT"[timer_pending(&rdp->nocb_timer)], + "bB"[timer_pending(&rdp->nocb_bypass_timer)], + "sS"[!!rdp->nocb_gp_sleep], + ".W"[swait_active(&rdp->nocb_gp_wq)], + ".W"[swait_active(&rnp->nocb_gp_wq[0])], + ".W"[swait_active(&rnp->nocb_gp_wq[1])], + ".B"[!!rdp->nocb_gp_bypass], + ".G"[!!rdp->nocb_gp_gp], + (long)rdp->nocb_gp_seq, + rnp->grplo, rnp->grphi, READ_ONCE(rdp->nocb_gp_loops)); +} + +/* Dump out nocb kthread state for the specified rcu_data structure. */ +static void show_rcu_nocb_state(struct rcu_data *rdp) +{ + struct rcu_segcblist *rsclp = &rdp->cblist; + bool waslocked; + bool wastimer; + bool wassleep; + + if (rdp->nocb_gp_rdp == rdp) + show_rcu_nocb_gp_state(rdp); + + pr_info(" CB %d->%d %c%c%c%c%c%c F%ld L%ld C%d %c%c%c%c%c q%ld\n", + rdp->cpu, rdp->nocb_gp_rdp->cpu, + "kK"[!!rdp->nocb_cb_kthread], + "bB"[raw_spin_is_locked(&rdp->nocb_bypass_lock)], + "cC"[!!atomic_read(&rdp->nocb_lock_contended)], + "lL"[raw_spin_is_locked(&rdp->nocb_lock)], + "sS"[!!rdp->nocb_cb_sleep], + ".W"[swait_active(&rdp->nocb_cb_wq)], + jiffies - rdp->nocb_bypass_first, + jiffies - rdp->nocb_nobypass_last, + rdp->nocb_nobypass_count, + ".D"[rcu_segcblist_ready_cbs(rsclp)], + ".W"[!rcu_segcblist_restempty(rsclp, RCU_DONE_TAIL)], + ".R"[!rcu_segcblist_restempty(rsclp, RCU_WAIT_TAIL)], + ".N"[!rcu_segcblist_restempty(rsclp, RCU_NEXT_READY_TAIL)], + ".B"[!!rcu_cblist_n_cbs(&rdp->nocb_bypass)], + rcu_segcblist_n_cbs(&rdp->cblist)); + + /* It is OK for GP kthreads to have GP state. */ + if (rdp->nocb_gp_rdp == rdp) + return; + + waslocked = raw_spin_is_locked(&rdp->nocb_gp_lock); + wastimer = timer_pending(&rdp->nocb_timer); + wassleep = swait_active(&rdp->nocb_gp_wq); + if (!rdp->nocb_defer_wakeup && !rdp->nocb_gp_sleep && + !waslocked && !wastimer && !wassleep) + return; /* Nothing untowards. */ + + pr_info(" !!! %c%c%c%c %c\n", + "lL"[waslocked], + "dD"[!!rdp->nocb_defer_wakeup], + "tT"[wastimer], + "sS"[!!rdp->nocb_gp_sleep], + ".W"[wassleep]); } #else /* #ifdef CONFIG_RCU_NOCB_CPU */ -static bool rcu_nocb_cpu_needs_barrier(int cpu) +/* No ->nocb_lock to acquire. */ +static void rcu_nocb_lock(struct rcu_data *rdp) +{ +} + +/* No ->nocb_lock to release. */ +static void rcu_nocb_unlock(struct rcu_data *rdp) { - WARN_ON_ONCE(1); /* Should be dead code. */ - return false; +} + +/* No ->nocb_lock to release. */ +static void rcu_nocb_unlock_irqrestore(struct rcu_data *rdp, + unsigned long flags) +{ + local_irq_restore(flags); +} + +/* Lockdep check that ->cblist may be safely accessed. */ +static void rcu_lockdep_assert_cblist_protected(struct rcu_data *rdp) +{ + lockdep_assert_irqs_disabled(); } static void rcu_nocb_gp_cleanup(struct swait_queue_head *sq) @@ -2252,19 +2482,24 @@ static void rcu_init_one_nocb(struct rcu_node *rnp) { } -static bool __call_rcu_nocb(struct rcu_data *rdp, struct rcu_head *rhp, - bool lazy, unsigned long flags) +static bool rcu_nocb_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp, + unsigned long j) { - return false; + return true; } -static bool __maybe_unused rcu_nocb_adopt_orphan_cbs(struct rcu_data *my_rdp, - struct rcu_data *rdp, - unsigned long flags) +static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp, + bool *was_alldone, unsigned long flags) { return false; } +static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_empty, + unsigned long flags) +{ + WARN_ON_ONCE(1); /* Should be dead code! */ +} + static void __init rcu_boot_init_nocb_percpu_data(struct rcu_data *rdp) { } @@ -2286,14 +2521,8 @@ static void __init rcu_spawn_nocb_kthreads(void) { } -static bool init_nocb_callback_list(struct rcu_data *rdp) -{ - return false; -} - -static unsigned long rcu_get_n_cbs_nocb_cpu(struct rcu_data *rdp) +static void show_rcu_nocb_state(struct rcu_data *rdp) { - return 0; } #endif /* #else #ifdef CONFIG_RCU_NOCB_CPU */ diff --git a/kernel/rcu/tree_stall.h b/kernel/rcu/tree_stall.h index 065183391f75..c0b8c458d8a6 100644 --- a/kernel/rcu/tree_stall.h +++ b/kernel/rcu/tree_stall.h @@ -163,7 +163,7 @@ static void rcu_iw_handler(struct irq_work *iwp) // // Printing RCU CPU stall warnings -#ifdef CONFIG_PREEMPT +#ifdef CONFIG_PREEMPTION /* * Dump detailed information for all tasks blocking the current RCU @@ -215,7 +215,7 @@ static int rcu_print_task_stall(struct rcu_node *rnp) return ndetected; } -#else /* #ifdef CONFIG_PREEMPT */ +#else /* #ifdef CONFIG_PREEMPTION */ /* * Because preemptible RCU does not exist, we never have to check for @@ -233,7 +233,7 @@ static int rcu_print_task_stall(struct rcu_node *rnp) { return 0; } -#endif /* #else #ifdef CONFIG_PREEMPT */ +#endif /* #else #ifdef CONFIG_PREEMPTION */ /* * Dump stacks of all tasks running on stalled CPUs. First try using @@ -527,6 +527,8 @@ static void check_cpu_stall(struct rcu_data *rdp) /* We haven't checked in, so go dump stack. */ print_cpu_stall(); + if (rcu_cpu_stall_ftrace_dump) + rcu_ftrace_dump(DUMP_ALL); } else if (rcu_gp_in_progress() && ULONG_CMP_GE(j, js + RCU_STALL_RAT_DELAY) && @@ -534,6 +536,8 @@ static void check_cpu_stall(struct rcu_data *rdp) /* They had a few time units to dump stack, so complain. */ print_other_cpu_stall(gs2); + if (rcu_cpu_stall_ftrace_dump) + rcu_ftrace_dump(DUMP_ALL); } } @@ -585,6 +589,11 @@ void show_rcu_gp_kthreads(void) cpu, (long)rdp->gp_seq_needed); } } + for_each_possible_cpu(cpu) { + rdp = per_cpu_ptr(&rcu_data, cpu); + if (rcu_segcblist_is_offloaded(&rdp->cblist)) + show_rcu_nocb_state(rdp); + } /* sched_show_task(rcu_state.gp_kthread); */ } EXPORT_SYMBOL_GPL(show_rcu_gp_kthreads); diff --git a/kernel/rcu/update.c b/kernel/rcu/update.c index 61df2bf08563..1861103662db 100644 --- a/kernel/rcu/update.c +++ b/kernel/rcu/update.c @@ -61,9 +61,15 @@ module_param(rcu_normal_after_boot, int, 0); #ifdef CONFIG_DEBUG_LOCK_ALLOC /** - * rcu_read_lock_sched_held() - might we be in RCU-sched read-side critical section? + * rcu_read_lock_held_common() - might we be in RCU-sched read-side critical section? + * @ret: Best guess answer if lockdep cannot be relied on * - * If CONFIG_DEBUG_LOCK_ALLOC is selected, returns nonzero iff in an + * Returns true if lockdep must be ignored, in which case *ret contains + * the best guess described below. Otherwise returns false, in which + * case *ret tells the caller nothing and the caller should instead + * consult lockdep. + * + * If CONFIG_DEBUG_LOCK_ALLOC is selected, set *ret to nonzero iff in an * RCU-sched read-side critical section. In absence of * CONFIG_DEBUG_LOCK_ALLOC, this assumes we are in an RCU-sched read-side * critical section unless it can prove otherwise. Note that disabling @@ -75,35 +81,45 @@ module_param(rcu_normal_after_boot, int, 0); * Check debug_lockdep_rcu_enabled() to prevent false positives during boot * and while lockdep is disabled. * - * Note that if the CPU is in the idle loop from an RCU point of - * view (ie: that we are in the section between rcu_idle_enter() and - * rcu_idle_exit()) then rcu_read_lock_held() returns false even if the CPU - * did an rcu_read_lock(). The reason for this is that RCU ignores CPUs - * that are in such a section, considering these as in extended quiescent - * state, so such a CPU is effectively never in an RCU read-side critical - * section regardless of what RCU primitives it invokes. This state of - * affairs is required --- we need to keep an RCU-free window in idle - * where the CPU may possibly enter into low power mode. This way we can - * notice an extended quiescent state to other CPUs that started a grace - * period. Otherwise we would delay any grace period as long as we run in - * the idle task. + * Note that if the CPU is in the idle loop from an RCU point of view (ie: + * that we are in the section between rcu_idle_enter() and rcu_idle_exit()) + * then rcu_read_lock_held() sets *ret to false even if the CPU did an + * rcu_read_lock(). The reason for this is that RCU ignores CPUs that are + * in such a section, considering these as in extended quiescent state, + * so such a CPU is effectively never in an RCU read-side critical section + * regardless of what RCU primitives it invokes. This state of affairs is + * required --- we need to keep an RCU-free window in idle where the CPU may + * possibly enter into low power mode. This way we can notice an extended + * quiescent state to other CPUs that started a grace period. Otherwise + * we would delay any grace period as long as we run in the idle task. * - * Similarly, we avoid claiming an SRCU read lock held if the current + * Similarly, we avoid claiming an RCU read lock held if the current * CPU is offline. */ +static bool rcu_read_lock_held_common(bool *ret) +{ + if (!debug_lockdep_rcu_enabled()) { + *ret = 1; + return true; + } + if (!rcu_is_watching()) { + *ret = 0; + return true; + } + if (!rcu_lockdep_current_cpu_online()) { + *ret = 0; + return true; + } + return false; +} + int rcu_read_lock_sched_held(void) { - int lockdep_opinion = 0; + bool ret; - if (!debug_lockdep_rcu_enabled()) - return 1; - if (!rcu_is_watching()) - return 0; - if (!rcu_lockdep_current_cpu_online()) - return 0; - if (debug_locks) - lockdep_opinion = lock_is_held(&rcu_sched_lock_map); - return lockdep_opinion || !preemptible(); + if (rcu_read_lock_held_common(&ret)) + return ret; + return lock_is_held(&rcu_sched_lock_map) || !preemptible(); } EXPORT_SYMBOL(rcu_read_lock_sched_held); #endif @@ -136,8 +152,7 @@ static atomic_t rcu_expedited_nesting = ATOMIC_INIT(1); */ bool rcu_gp_is_expedited(void) { - return rcu_expedited || atomic_read(&rcu_expedited_nesting) || - rcu_scheduler_active == RCU_SCHEDULER_INIT; + return rcu_expedited || atomic_read(&rcu_expedited_nesting); } EXPORT_SYMBOL_GPL(rcu_gp_is_expedited); @@ -261,12 +276,10 @@ NOKPROBE_SYMBOL(debug_lockdep_rcu_enabled); */ int rcu_read_lock_held(void) { - if (!debug_lockdep_rcu_enabled()) - return 1; - if (!rcu_is_watching()) - return 0; - if (!rcu_lockdep_current_cpu_online()) - return 0; + bool ret; + + if (rcu_read_lock_held_common(&ret)) + return ret; return lock_is_held(&rcu_lock_map); } EXPORT_SYMBOL_GPL(rcu_read_lock_held); @@ -288,16 +301,28 @@ EXPORT_SYMBOL_GPL(rcu_read_lock_held); */ int rcu_read_lock_bh_held(void) { - if (!debug_lockdep_rcu_enabled()) - return 1; - if (!rcu_is_watching()) - return 0; - if (!rcu_lockdep_current_cpu_online()) - return 0; + bool ret; + + if (rcu_read_lock_held_common(&ret)) + return ret; return in_softirq() || irqs_disabled(); } EXPORT_SYMBOL_GPL(rcu_read_lock_bh_held); +int rcu_read_lock_any_held(void) +{ + bool ret; + + if (rcu_read_lock_held_common(&ret)) + return ret; + if (lock_is_held(&rcu_lock_map) || + lock_is_held(&rcu_bh_lock_map) || + lock_is_held(&rcu_sched_lock_map)) + return 1; + return !preemptible(); +} +EXPORT_SYMBOL_GPL(rcu_read_lock_any_held); + #endif /* #ifdef CONFIG_DEBUG_LOCK_ALLOC */ /** @@ -437,6 +462,8 @@ EXPORT_SYMBOL_GPL(rcutorture_sched_setaffinity); #endif #ifdef CONFIG_RCU_STALL_COMMON +int rcu_cpu_stall_ftrace_dump __read_mostly; +module_param(rcu_cpu_stall_ftrace_dump, int, 0644); int rcu_cpu_stall_suppress __read_mostly; /* 1 = suppress stall warnings. */ EXPORT_SYMBOL_GPL(rcu_cpu_stall_suppress); module_param(rcu_cpu_stall_suppress, int, 0644); |