path: root/kernel/rcu
Diffstat (limited to 'kernel/rcu')
-rw-r--r--   kernel/rcu/Kconfig            8
-rw-r--r--   kernel/rcu/Kconfig.debug     11
-rw-r--r--   kernel/rcu/rcu.h              1
-rw-r--r--   kernel/rcu/rcu_segcblist.c  174
-rw-r--r--   kernel/rcu/rcu_segcblist.h   54
-rw-r--r--   kernel/rcu/rcuperf.c         10
-rw-r--r--   kernel/rcu/rcutorture.c      30
-rw-r--r--   kernel/rcu/srcutree.c         5
-rw-r--r--   kernel/rcu/tree.c           217
-rw-r--r--   kernel/rcu/tree.h            81
-rw-r--r--   kernel/rcu/tree_exp.h         8
-rw-r--r--   kernel/rcu/tree_plugin.h   1195
-rw-r--r--   kernel/rcu/tree_stall.h      15
-rw-r--r--   kernel/rcu/update.c         105
14 files changed, 1217 insertions, 697 deletions
diff --git a/kernel/rcu/Kconfig b/kernel/rcu/Kconfig
index 480edf328b51..7644eda17d62 100644
--- a/kernel/rcu/Kconfig
+++ b/kernel/rcu/Kconfig
@@ -7,7 +7,7 @@ menu "RCU Subsystem"
config TREE_RCU
bool
- default y if !PREEMPT && SMP
+ default y if !PREEMPTION && SMP
help
This option selects the RCU implementation that is
designed for very large SMP systems with hundreds or
@@ -16,7 +16,7 @@ config TREE_RCU
config PREEMPT_RCU
bool
- default y if PREEMPT
+ default y if PREEMPTION
help
This option selects the RCU implementation that is
designed for very large SMP systems with hundreds or
@@ -28,7 +28,7 @@ config PREEMPT_RCU
config TINY_RCU
bool
- default y if !PREEMPT && !SMP
+ default y if !PREEMPTION && !SMP
help
This option selects the RCU implementation that is
designed for UP systems from which real-time response
@@ -70,7 +70,7 @@ config TREE_SRCU
This option selects the full-fledged version of SRCU.
config TASKS_RCU
- def_bool PREEMPT
+ def_bool PREEMPTION
select SRCU
help
This option enables a task-based RCU implementation that uses
diff --git a/kernel/rcu/Kconfig.debug b/kernel/rcu/Kconfig.debug
index 5ec3ea4028e2..4aa02eee8f6c 100644
--- a/kernel/rcu/Kconfig.debug
+++ b/kernel/rcu/Kconfig.debug
@@ -8,6 +8,17 @@ menu "RCU Debugging"
config PROVE_RCU
def_bool PROVE_LOCKING
+config PROVE_RCU_LIST
+ bool "RCU list lockdep debugging"
+ depends on PROVE_RCU && RCU_EXPERT
+ default n
+ help
+ Enable RCU lockdep checking for list usages. By default it is
+ turned off since there are several list RCU users that still
+ need to be converted to pass a lockdep expression. To prevent
+ false-positive splats, we keep it default disabled but once all
+ users are converted, we can remove this config option.
+
config TORTURE_TEST
tristate
default n
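
As an aside, the "lockdep expression" that the new PROVE_RCU_LIST help text refers to is the optional condition argument accepted by the RCU list iterators. A minimal sketch of a caller that would satisfy the new check follows; the foo, foo_list, foo_lock, and foo_sum names are hypothetical, not part of this patch.

#include <linux/rculist.h>
#include <linux/spinlock.h>

struct foo {
	int val;
	struct list_head node;
};

static LIST_HEAD(foo_list);
static DEFINE_SPINLOCK(foo_lock);	/* hypothetical update-side lock */

static int foo_sum(void)
{
	struct foo *p;
	int sum = 0;

	rcu_read_lock();
	/*
	 * The fourth argument is the lockdep expression: with
	 * CONFIG_PROVE_RCU_LIST=y, lockdep verifies that either an RCU
	 * read-side critical section or foo_lock protects this walk.
	 */
	list_for_each_entry_rcu(p, &foo_list, node,
				lockdep_is_held(&foo_lock))
		sum += p->val;
	rcu_read_unlock();
	return sum;
}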
diff --git a/kernel/rcu/rcu.h b/kernel/rcu/rcu.h
index 5290b01de534..8fd4f82c9b3d 100644
--- a/kernel/rcu/rcu.h
+++ b/kernel/rcu/rcu.h
@@ -227,6 +227,7 @@ static inline bool __rcu_reclaim(const char *rn, struct rcu_head *head)
#ifdef CONFIG_RCU_STALL_COMMON
+extern int rcu_cpu_stall_ftrace_dump;
extern int rcu_cpu_stall_suppress;
extern int rcu_cpu_stall_timeout;
int rcu_jiffies_till_stall_check(void);
diff --git a/kernel/rcu/rcu_segcblist.c b/kernel/rcu/rcu_segcblist.c
index 9bd5f6023c21..495c58ce1640 100644
--- a/kernel/rcu/rcu_segcblist.c
+++ b/kernel/rcu/rcu_segcblist.c
@@ -24,6 +24,49 @@ void rcu_cblist_init(struct rcu_cblist *rclp)
}
/*
+ * Enqueue an rcu_head structure onto the specified callback list.
+ * This function assumes that the callback is non-lazy because it
+ * is intended for use by no-CBs CPUs, which do not distinguish
+ * between lazy and non-lazy RCU callbacks.
+ */
+void rcu_cblist_enqueue(struct rcu_cblist *rclp, struct rcu_head *rhp)
+{
+ *rclp->tail = rhp;
+ rclp->tail = &rhp->next;
+ WRITE_ONCE(rclp->len, rclp->len + 1);
+}
+
+/*
+ * Flush the second rcu_cblist structure onto the first one, obliterating
+ * any contents of the first. If rhp is non-NULL, enqueue it as the sole
+ * element of the second rcu_cblist structure, but ensuring that the second
+ * rcu_cblist structure, if initially non-empty, always appears non-empty
+ * throughout the process. If rhp is NULL, the second rcu_cblist structure
+ * is instead initialized to empty.
+ */
+void rcu_cblist_flush_enqueue(struct rcu_cblist *drclp,
+ struct rcu_cblist *srclp,
+ struct rcu_head *rhp)
+{
+ drclp->head = srclp->head;
+ if (drclp->head)
+ drclp->tail = srclp->tail;
+ else
+ drclp->tail = &drclp->head;
+ drclp->len = srclp->len;
+ drclp->len_lazy = srclp->len_lazy;
+ if (!rhp) {
+ rcu_cblist_init(srclp);
+ } else {
+ rhp->next = NULL;
+ srclp->head = rhp;
+ srclp->tail = &rhp->next;
+ WRITE_ONCE(srclp->len, 1);
+ srclp->len_lazy = 0;
+ }
+}
+
+/*
* Dequeue the oldest rcu_head structure from the specified callback
* list. This function assumes that the callback is non-lazy, but
* the caller can later invoke rcu_cblist_dequeued_lazy() if it
@@ -44,6 +87,67 @@ struct rcu_head *rcu_cblist_dequeue(struct rcu_cblist *rclp)
return rhp;
}
+/* Set the length of an rcu_segcblist structure. */
+void rcu_segcblist_set_len(struct rcu_segcblist *rsclp, long v)
+{
+#ifdef CONFIG_RCU_NOCB_CPU
+ atomic_long_set(&rsclp->len, v);
+#else
+ WRITE_ONCE(rsclp->len, v);
+#endif
+}
+
+/*
+ * Increase the numeric length of an rcu_segcblist structure by the
+ * specified amount, which can be negative. This can cause the ->len
+ * field to disagree with the actual number of callbacks on the structure.
+ * This increase is fully ordered with respect to the caller's accesses
+ * both before and after.
+ */
+void rcu_segcblist_add_len(struct rcu_segcblist *rsclp, long v)
+{
+#ifdef CONFIG_RCU_NOCB_CPU
+ smp_mb__before_atomic(); /* Up to the caller! */
+ atomic_long_add(v, &rsclp->len);
+ smp_mb__after_atomic(); /* Up to the caller! */
+#else
+ smp_mb(); /* Up to the caller! */
+ WRITE_ONCE(rsclp->len, rsclp->len + v);
+ smp_mb(); /* Up to the caller! */
+#endif
+}
+
+/*
+ * Increase the numeric length of an rcu_segcblist structure by one.
+ * This can cause the ->len field to disagree with the actual number of
+ * callbacks on the structure. This increase is fully ordered with respect
+ * to the caller's accesses both before and after.
+ */
+void rcu_segcblist_inc_len(struct rcu_segcblist *rsclp)
+{
+ rcu_segcblist_add_len(rsclp, 1);
+}
+
+/*
+ * Exchange the numeric length of the specified rcu_segcblist structure
+ * with the specified value. This can cause the ->len field to disagree
+ * with the actual number of callbacks on the structure. This exchange is
+ * fully ordered with respect to the caller's accesses both before and after.
+ */
+long rcu_segcblist_xchg_len(struct rcu_segcblist *rsclp, long v)
+{
+#ifdef CONFIG_RCU_NOCB_CPU
+ return atomic_long_xchg(&rsclp->len, v);
+#else
+ long ret = rsclp->len;
+
+ smp_mb(); /* Up to the caller! */
+ WRITE_ONCE(rsclp->len, v);
+ smp_mb(); /* Up to the caller! */
+ return ret;
+#endif
+}
+
/*
* Initialize an rcu_segcblist structure.
*/
@@ -56,8 +160,9 @@ void rcu_segcblist_init(struct rcu_segcblist *rsclp)
rsclp->head = NULL;
for (i = 0; i < RCU_CBLIST_NSEGS; i++)
rsclp->tails[i] = &rsclp->head;
- rsclp->len = 0;
+ rcu_segcblist_set_len(rsclp, 0);
rsclp->len_lazy = 0;
+ rsclp->enabled = 1;
}
/*
@@ -69,7 +174,16 @@ void rcu_segcblist_disable(struct rcu_segcblist *rsclp)
WARN_ON_ONCE(!rcu_segcblist_empty(rsclp));
WARN_ON_ONCE(rcu_segcblist_n_cbs(rsclp));
WARN_ON_ONCE(rcu_segcblist_n_lazy_cbs(rsclp));
- rsclp->tails[RCU_NEXT_TAIL] = NULL;
+ rsclp->enabled = 0;
+}
+
+/*
+ * Mark the specified rcu_segcblist structure as offloaded. This
+ * structure must be empty.
+ */
+void rcu_segcblist_offload(struct rcu_segcblist *rsclp)
+{
+ rsclp->offloaded = 1;
}
/*
@@ -118,6 +232,18 @@ struct rcu_head *rcu_segcblist_first_pend_cb(struct rcu_segcblist *rsclp)
}
/*
+ * Return false if there are no CBs awaiting grace periods, otherwise,
+ * return true and store the nearest waited-upon grace period into *lp.
+ */
+bool rcu_segcblist_nextgp(struct rcu_segcblist *rsclp, unsigned long *lp)
+{
+ if (!rcu_segcblist_pend_cbs(rsclp))
+ return false;
+ *lp = rsclp->gp_seq[RCU_WAIT_TAIL];
+ return true;
+}
+
+/*
* Enqueue the specified callback onto the specified rcu_segcblist
* structure, updating accounting as needed. Note that the ->len
* field may be accessed locklessly, hence the WRITE_ONCE().
@@ -129,13 +255,13 @@ struct rcu_head *rcu_segcblist_first_pend_cb(struct rcu_segcblist *rsclp)
void rcu_segcblist_enqueue(struct rcu_segcblist *rsclp,
struct rcu_head *rhp, bool lazy)
{
- WRITE_ONCE(rsclp->len, rsclp->len + 1); /* ->len sampled locklessly. */
+ rcu_segcblist_inc_len(rsclp);
if (lazy)
rsclp->len_lazy++;
smp_mb(); /* Ensure counts are updated before callback is enqueued. */
rhp->next = NULL;
- *rsclp->tails[RCU_NEXT_TAIL] = rhp;
- rsclp->tails[RCU_NEXT_TAIL] = &rhp->next;
+ WRITE_ONCE(*rsclp->tails[RCU_NEXT_TAIL], rhp);
+ WRITE_ONCE(rsclp->tails[RCU_NEXT_TAIL], &rhp->next);
}
/*
@@ -155,7 +281,7 @@ bool rcu_segcblist_entrain(struct rcu_segcblist *rsclp,
if (rcu_segcblist_n_cbs(rsclp) == 0)
return false;
- WRITE_ONCE(rsclp->len, rsclp->len + 1);
+ rcu_segcblist_inc_len(rsclp);
if (lazy)
rsclp->len_lazy++;
smp_mb(); /* Ensure counts are updated before callback is entrained. */
@@ -163,9 +289,9 @@ bool rcu_segcblist_entrain(struct rcu_segcblist *rsclp,
for (i = RCU_NEXT_TAIL; i > RCU_DONE_TAIL; i--)
if (rsclp->tails[i] != rsclp->tails[i - 1])
break;
- *rsclp->tails[i] = rhp;
+ WRITE_ONCE(*rsclp->tails[i], rhp);
for (; i <= RCU_NEXT_TAIL; i++)
- rsclp->tails[i] = &rhp->next;
+ WRITE_ONCE(rsclp->tails[i], &rhp->next);
return true;
}
@@ -182,9 +308,8 @@ void rcu_segcblist_extract_count(struct rcu_segcblist *rsclp,
struct rcu_cblist *rclp)
{
rclp->len_lazy += rsclp->len_lazy;
- rclp->len += rsclp->len;
rsclp->len_lazy = 0;
- WRITE_ONCE(rsclp->len, 0); /* ->len sampled locklessly. */
+ rclp->len = rcu_segcblist_xchg_len(rsclp, 0);
}
/*
@@ -200,12 +325,12 @@ void rcu_segcblist_extract_done_cbs(struct rcu_segcblist *rsclp,
if (!rcu_segcblist_ready_cbs(rsclp))
return; /* Nothing to do. */
*rclp->tail = rsclp->head;
- rsclp->head = *rsclp->tails[RCU_DONE_TAIL];
- *rsclp->tails[RCU_DONE_TAIL] = NULL;
+ WRITE_ONCE(rsclp->head, *rsclp->tails[RCU_DONE_TAIL]);
+ WRITE_ONCE(*rsclp->tails[RCU_DONE_TAIL], NULL);
rclp->tail = rsclp->tails[RCU_DONE_TAIL];
for (i = RCU_CBLIST_NSEGS - 1; i >= RCU_DONE_TAIL; i--)
if (rsclp->tails[i] == rsclp->tails[RCU_DONE_TAIL])
- rsclp->tails[i] = &rsclp->head;
+ WRITE_ONCE(rsclp->tails[i], &rsclp->head);
}
/*
@@ -224,9 +349,9 @@ void rcu_segcblist_extract_pend_cbs(struct rcu_segcblist *rsclp,
return; /* Nothing to do. */
*rclp->tail = *rsclp->tails[RCU_DONE_TAIL];
rclp->tail = rsclp->tails[RCU_NEXT_TAIL];
- *rsclp->tails[RCU_DONE_TAIL] = NULL;
+ WRITE_ONCE(*rsclp->tails[RCU_DONE_TAIL], NULL);
for (i = RCU_DONE_TAIL + 1; i < RCU_CBLIST_NSEGS; i++)
- rsclp->tails[i] = rsclp->tails[RCU_DONE_TAIL];
+ WRITE_ONCE(rsclp->tails[i], rsclp->tails[RCU_DONE_TAIL]);
}
/*
@@ -237,8 +362,7 @@ void rcu_segcblist_insert_count(struct rcu_segcblist *rsclp,
struct rcu_cblist *rclp)
{
rsclp->len_lazy += rclp->len_lazy;
- /* ->len sampled locklessly. */
- WRITE_ONCE(rsclp->len, rsclp->len + rclp->len);
+ rcu_segcblist_add_len(rsclp, rclp->len);
rclp->len_lazy = 0;
rclp->len = 0;
}
@@ -255,10 +379,10 @@ void rcu_segcblist_insert_done_cbs(struct rcu_segcblist *rsclp,
if (!rclp->head)
return; /* No callbacks to move. */
*rclp->tail = rsclp->head;
- rsclp->head = rclp->head;
+ WRITE_ONCE(rsclp->head, rclp->head);
for (i = RCU_DONE_TAIL; i < RCU_CBLIST_NSEGS; i++)
if (&rsclp->head == rsclp->tails[i])
- rsclp->tails[i] = rclp->tail;
+ WRITE_ONCE(rsclp->tails[i], rclp->tail);
else
break;
rclp->head = NULL;
@@ -274,8 +398,8 @@ void rcu_segcblist_insert_pend_cbs(struct rcu_segcblist *rsclp,
{
if (!rclp->head)
return; /* Nothing to do. */
- *rsclp->tails[RCU_NEXT_TAIL] = rclp->head;
- rsclp->tails[RCU_NEXT_TAIL] = rclp->tail;
+ WRITE_ONCE(*rsclp->tails[RCU_NEXT_TAIL], rclp->head);
+ WRITE_ONCE(rsclp->tails[RCU_NEXT_TAIL], rclp->tail);
rclp->head = NULL;
rclp->tail = &rclp->head;
}
@@ -299,7 +423,7 @@ void rcu_segcblist_advance(struct rcu_segcblist *rsclp, unsigned long seq)
for (i = RCU_WAIT_TAIL; i < RCU_NEXT_TAIL; i++) {
if (ULONG_CMP_LT(seq, rsclp->gp_seq[i]))
break;
- rsclp->tails[RCU_DONE_TAIL] = rsclp->tails[i];
+ WRITE_ONCE(rsclp->tails[RCU_DONE_TAIL], rsclp->tails[i]);
}
/* If no callbacks moved, nothing more need be done. */
@@ -308,7 +432,7 @@ void rcu_segcblist_advance(struct rcu_segcblist *rsclp, unsigned long seq)
/* Clean up tail pointers that might have been misordered above. */
for (j = RCU_WAIT_TAIL; j < i; j++)
- rsclp->tails[j] = rsclp->tails[RCU_DONE_TAIL];
+ WRITE_ONCE(rsclp->tails[j], rsclp->tails[RCU_DONE_TAIL]);
/*
* Callbacks moved, so clean up the misordered ->tails[] pointers
@@ -319,7 +443,7 @@ void rcu_segcblist_advance(struct rcu_segcblist *rsclp, unsigned long seq)
for (j = RCU_WAIT_TAIL; i < RCU_NEXT_TAIL; i++, j++) {
if (rsclp->tails[j] == rsclp->tails[RCU_NEXT_TAIL])
break; /* No more callbacks. */
- rsclp->tails[j] = rsclp->tails[i];
+ WRITE_ONCE(rsclp->tails[j], rsclp->tails[i]);
rsclp->gp_seq[j] = rsclp->gp_seq[i];
}
}
@@ -384,7 +508,7 @@ bool rcu_segcblist_accelerate(struct rcu_segcblist *rsclp, unsigned long seq)
* structure other than in the RCU_NEXT_TAIL segment.
*/
for (; i < RCU_NEXT_TAIL; i++) {
- rsclp->tails[i] = rsclp->tails[RCU_NEXT_TAIL];
+ WRITE_ONCE(rsclp->tails[i], rsclp->tails[RCU_NEXT_TAIL]);
rsclp->gp_seq[i] = seq;
}
return true;
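
For readers unfamiliar with the tail-pointer representation used by the new rcu_cblist_enqueue() above (*tail = rhp; tail = &rhp->next), here is a simplified, single-threaded user-space model of the same idea. Names and types are illustrative only, not the kernel's.

#include <stdio.h>

struct cb {
	struct cb *next;
	int id;
};

struct cblist {
	struct cb *head;
	struct cb **tail;	/* address of the last ->next pointer (or of head) */
	long len;
};

static void cblist_init(struct cblist *l)
{
	l->head = NULL;
	l->tail = &l->head;
	l->len = 0;
}

static void cblist_enqueue(struct cblist *l, struct cb *c)
{
	c->next = NULL;
	*l->tail = c;		/* link the new element at the end */
	l->tail = &c->next;	/* remember the new last ->next pointer */
	l->len++;
}

int main(void)
{
	struct cblist l;
	struct cb a = { .id = 1 }, b = { .id = 2 };
	struct cb *p;

	cblist_init(&l);
	cblist_enqueue(&l, &a);
	cblist_enqueue(&l, &b);
	for (p = l.head; p; p = p->next)
		printf("cb %d\n", p->id);
	return 0;
}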
diff --git a/kernel/rcu/rcu_segcblist.h b/kernel/rcu/rcu_segcblist.h
index 71b64648464e..815c2fdd3fcc 100644
--- a/kernel/rcu/rcu_segcblist.h
+++ b/kernel/rcu/rcu_segcblist.h
@@ -9,6 +9,12 @@
#include <linux/rcu_segcblist.h>
+/* Return number of callbacks in the specified callback list. */
+static inline long rcu_cblist_n_cbs(struct rcu_cblist *rclp)
+{
+ return READ_ONCE(rclp->len);
+}
+
/*
* Account for the fact that a previously dequeued callback turned out
* to be marked as lazy.
@@ -19,6 +25,10 @@ static inline void rcu_cblist_dequeued_lazy(struct rcu_cblist *rclp)
}
void rcu_cblist_init(struct rcu_cblist *rclp);
+void rcu_cblist_enqueue(struct rcu_cblist *rclp, struct rcu_head *rhp);
+void rcu_cblist_flush_enqueue(struct rcu_cblist *drclp,
+ struct rcu_cblist *srclp,
+ struct rcu_head *rhp);
struct rcu_head *rcu_cblist_dequeue(struct rcu_cblist *rclp);
/*
@@ -36,13 +46,17 @@ struct rcu_head *rcu_cblist_dequeue(struct rcu_cblist *rclp);
*/
static inline bool rcu_segcblist_empty(struct rcu_segcblist *rsclp)
{
- return !rsclp->head;
+ return !READ_ONCE(rsclp->head);
}
/* Return number of callbacks in segmented callback list. */
static inline long rcu_segcblist_n_cbs(struct rcu_segcblist *rsclp)
{
+#ifdef CONFIG_RCU_NOCB_CPU
+ return atomic_long_read(&rsclp->len);
+#else
return READ_ONCE(rsclp->len);
+#endif
}
/* Return number of lazy callbacks in segmented callback list. */
@@ -54,16 +68,22 @@ static inline long rcu_segcblist_n_lazy_cbs(struct rcu_segcblist *rsclp)
/* Return number of non-lazy callbacks in segmented callback list. */
static inline long rcu_segcblist_n_nonlazy_cbs(struct rcu_segcblist *rsclp)
{
- return rsclp->len - rsclp->len_lazy;
+ return rcu_segcblist_n_cbs(rsclp) - rsclp->len_lazy;
}
/*
* Is the specified rcu_segcblist enabled, for example, not corresponding
- * to an offline or callback-offloaded CPU?
+ * to an offline CPU?
*/
static inline bool rcu_segcblist_is_enabled(struct rcu_segcblist *rsclp)
{
- return !!rsclp->tails[RCU_NEXT_TAIL];
+ return rsclp->enabled;
+}
+
+/* Is the specified rcu_segcblist offloaded? */
+static inline bool rcu_segcblist_is_offloaded(struct rcu_segcblist *rsclp)
+{
+ return rsclp->offloaded;
}
/*
@@ -73,36 +93,18 @@ static inline bool rcu_segcblist_is_enabled(struct rcu_segcblist *rsclp)
*/
static inline bool rcu_segcblist_restempty(struct rcu_segcblist *rsclp, int seg)
{
- return !*rsclp->tails[seg];
-}
-
-/*
- * Interim function to return rcu_segcblist head pointer. Longer term, the
- * rcu_segcblist will be used more pervasively, removing the need for this
- * function.
- */
-static inline struct rcu_head *rcu_segcblist_head(struct rcu_segcblist *rsclp)
-{
- return rsclp->head;
-}
-
-/*
- * Interim function to return rcu_segcblist head pointer. Longer term, the
- * rcu_segcblist will be used more pervasively, removing the need for this
- * function.
- */
-static inline struct rcu_head **rcu_segcblist_tail(struct rcu_segcblist *rsclp)
-{
- WARN_ON_ONCE(rcu_segcblist_empty(rsclp));
- return rsclp->tails[RCU_NEXT_TAIL];
+ return !READ_ONCE(*READ_ONCE(rsclp->tails[seg]));
}
+void rcu_segcblist_inc_len(struct rcu_segcblist *rsclp);
void rcu_segcblist_init(struct rcu_segcblist *rsclp);
void rcu_segcblist_disable(struct rcu_segcblist *rsclp);
+void rcu_segcblist_offload(struct rcu_segcblist *rsclp);
bool rcu_segcblist_ready_cbs(struct rcu_segcblist *rsclp);
bool rcu_segcblist_pend_cbs(struct rcu_segcblist *rsclp);
struct rcu_head *rcu_segcblist_first_cb(struct rcu_segcblist *rsclp);
struct rcu_head *rcu_segcblist_first_pend_cb(struct rcu_segcblist *rsclp);
+bool rcu_segcblist_nextgp(struct rcu_segcblist *rsclp, unsigned long *lp);
void rcu_segcblist_enqueue(struct rcu_segcblist *rsclp,
struct rcu_head *rhp, bool lazy);
bool rcu_segcblist_entrain(struct rcu_segcblist *rsclp,
diff --git a/kernel/rcu/rcuperf.c b/kernel/rcu/rcuperf.c
index 7a6890b23c5f..5a879d073c1c 100644
--- a/kernel/rcu/rcuperf.c
+++ b/kernel/rcu/rcuperf.c
@@ -89,7 +89,7 @@ torture_param(int, writer_holdoff, 0, "Holdoff (us) between GPs, zero to disable
static char *perf_type = "rcu";
module_param(perf_type, charp, 0444);
-MODULE_PARM_DESC(perf_type, "Type of RCU to performance-test (rcu, rcu_bh, ...)");
+MODULE_PARM_DESC(perf_type, "Type of RCU to performance-test (rcu, srcu, ...)");
static int nrealreaders;
static int nrealwriters;
@@ -375,6 +375,14 @@ rcu_perf_writer(void *arg)
if (holdoff)
schedule_timeout_uninterruptible(holdoff * HZ);
+ /*
+ * Wait until rcu_end_inkernel_boot() is called for normal GP tests
+ * so that RCU is not always expedited for normal GP tests.
+ * The system_state test is approximate, but works well in practice.
+ */
+ while (!gp_exp && system_state != SYSTEM_RUNNING)
+ schedule_timeout_uninterruptible(1);
+
t = ktime_get_mono_fast_ns();
if (atomic_inc_return(&n_rcu_perf_writer_started) >= nrealwriters) {
t_rcu_perf_writer_started = t;
diff --git a/kernel/rcu/rcutorture.c b/kernel/rcu/rcutorture.c
index fce4e7e6f502..3c9feca1eab1 100644
--- a/kernel/rcu/rcutorture.c
+++ b/kernel/rcu/rcutorture.c
@@ -161,6 +161,7 @@ static atomic_long_t n_rcu_torture_timers;
static long n_barrier_attempts;
static long n_barrier_successes; /* did rcu_barrier test succeed? */
static struct list_head rcu_torture_removed;
+static unsigned long shutdown_jiffies;
static int rcu_torture_writer_state;
#define RTWS_FIXED_DELAY 0
@@ -228,6 +229,15 @@ static u64 notrace rcu_trace_clock_local(void)
}
#endif /* #else #ifdef CONFIG_RCU_TRACE */
+/*
+ * Stop aggressive CPU-hog tests a bit before the end of the test in order
+ * to avoid interfering with test shutdown.
+ */
+static bool shutdown_time_arrived(void)
+{
+ return shutdown_secs && time_after(jiffies, shutdown_jiffies - 30 * HZ);
+}
+
static unsigned long boost_starttime; /* jiffies of next boost test start. */
static DEFINE_MUTEX(boost_mutex); /* protect setting boost_starttime */
/* and boost task create/destroy. */
@@ -1713,12 +1723,14 @@ static void rcu_torture_fwd_cb_cr(struct rcu_head *rhp)
}
// Give the scheduler a chance, even on nohz_full CPUs.
-static void rcu_torture_fwd_prog_cond_resched(void)
+static void rcu_torture_fwd_prog_cond_resched(unsigned long iter)
{
if (IS_ENABLED(CONFIG_PREEMPT) && IS_ENABLED(CONFIG_NO_HZ_FULL)) {
- if (need_resched())
+ // Real call_rcu() floods hit userspace, so emulate that.
+ if (need_resched() || (iter & 0xfff))
schedule();
} else {
+ // No userspace emulation: CB invocation throttles call_rcu()
cond_resched();
}
}
@@ -1746,7 +1758,7 @@ static unsigned long rcu_torture_fwd_prog_cbfree(void)
spin_unlock_irqrestore(&rcu_fwd_lock, flags);
kfree(rfcp);
freed++;
- rcu_torture_fwd_prog_cond_resched();
+ rcu_torture_fwd_prog_cond_resched(freed);
}
return freed;
}
@@ -1785,15 +1797,17 @@ static void rcu_torture_fwd_prog_nr(int *tested, int *tested_tries)
WRITE_ONCE(rcu_fwd_startat, jiffies);
stopat = rcu_fwd_startat + dur;
while (time_before(jiffies, stopat) &&
+ !shutdown_time_arrived() &&
!READ_ONCE(rcu_fwd_emergency_stop) && !torture_must_stop()) {
idx = cur_ops->readlock();
udelay(10);
cur_ops->readunlock(idx);
if (!fwd_progress_need_resched || need_resched())
- rcu_torture_fwd_prog_cond_resched();
+ rcu_torture_fwd_prog_cond_resched(1);
}
(*tested_tries)++;
if (!time_before(jiffies, stopat) &&
+ !shutdown_time_arrived() &&
!READ_ONCE(rcu_fwd_emergency_stop) && !torture_must_stop()) {
(*tested)++;
cver = READ_ONCE(rcu_torture_current_version) - cver;
@@ -1852,6 +1866,7 @@ static void rcu_torture_fwd_prog_cr(void)
gps = cur_ops->get_gp_seq();
rcu_launder_gp_seq_start = gps;
while (time_before(jiffies, stopat) &&
+ !shutdown_time_arrived() &&
!READ_ONCE(rcu_fwd_emergency_stop) && !torture_must_stop()) {
rfcp = READ_ONCE(rcu_fwd_cb_head);
rfcpn = NULL;
@@ -1875,7 +1890,7 @@ static void rcu_torture_fwd_prog_cr(void)
rfcp->rfc_gps = 0;
}
cur_ops->call(&rfcp->rh, rcu_torture_fwd_cb_cr);
- rcu_torture_fwd_prog_cond_resched();
+ rcu_torture_fwd_prog_cond_resched(n_launders + n_max_cbs);
}
stoppedat = jiffies;
n_launders_cb_snap = READ_ONCE(n_launders_cb);
@@ -1884,7 +1899,8 @@ static void rcu_torture_fwd_prog_cr(void)
cur_ops->cb_barrier(); /* Wait for callbacks to be invoked. */
(void)rcu_torture_fwd_prog_cbfree();
- if (!torture_must_stop() && !READ_ONCE(rcu_fwd_emergency_stop)) {
+ if (!torture_must_stop() && !READ_ONCE(rcu_fwd_emergency_stop) &&
+ !shutdown_time_arrived()) {
WARN_ON(n_max_gps < MIN_FWD_CBS_LAUNDERED);
pr_alert("%s Duration %lu barrier: %lu pending %ld n_launders: %ld n_launders_sa: %ld n_max_gps: %ld n_max_cbs: %ld cver %ld gps %ld\n",
__func__,
@@ -2160,6 +2176,7 @@ rcu_torture_cleanup(void)
return;
}
+ show_rcu_gp_kthreads();
rcu_torture_barrier_cleanup();
torture_stop_kthread(rcu_torture_fwd_prog, fwd_prog_task);
torture_stop_kthread(rcu_torture_stall, stall_task);
@@ -2465,6 +2482,7 @@ rcu_torture_init(void)
goto unwind;
rcutor_hp = firsterr;
}
+ shutdown_jiffies = jiffies + shutdown_secs * HZ;
firsterr = torture_shutdown_init(shutdown_secs, rcu_torture_cleanup);
if (firsterr)
goto unwind;
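
The new shutdown_time_arrived() helper above relies on the kernel's time_after() macro, which compares jiffies values in a wrap-safe way via signed subtraction. A small user-space model of that comparison (simplified, without the kernel's type checking) looks like this:

#include <stdio.h>

#define time_after(a, b)  ((long)((b) - (a)) < 0)

int main(void)
{
	unsigned long jiffies = 10UL;	/* counter has just wrapped */
	unsigned long deadline = -5UL;	/* deadline set shortly before the wrap */

	/* Prints "yes": 10 is after the deadline despite the wrap. */
	printf("deadline passed: %s\n",
	       time_after(jiffies, deadline) ? "yes" : "no");
	return 0;
}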
diff --git a/kernel/rcu/srcutree.c b/kernel/rcu/srcutree.c
index cf0e886314f2..5dffade2d7cd 100644
--- a/kernel/rcu/srcutree.c
+++ b/kernel/rcu/srcutree.c
@@ -1279,8 +1279,9 @@ void srcu_torture_stats_print(struct srcu_struct *ssp, char *tt, char *tf)
c0 = l0 - u0;
c1 = l1 - u1;
- pr_cont(" %d(%ld,%ld %1p)",
- cpu, c0, c1, rcu_segcblist_head(&sdp->srcu_cblist));
+ pr_cont(" %d(%ld,%ld %c)",
+ cpu, c0, c1,
+ "C."[rcu_segcblist_empty(&sdp->srcu_cblist)]);
s0 += c0;
s1 += c1;
}
diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
index a14e5fbbea46..81105141b6a8 100644
--- a/kernel/rcu/tree.c
+++ b/kernel/rcu/tree.c
@@ -56,6 +56,7 @@
#include <linux/smpboot.h>
#include <linux/jiffies.h>
#include <linux/sched/isolation.h>
+#include <linux/sched/clock.h>
#include "../time/tick-internal.h"
#include "tree.h"
@@ -210,9 +211,9 @@ static long rcu_get_n_cbs_cpu(int cpu)
{
struct rcu_data *rdp = per_cpu_ptr(&rcu_data, cpu);
- if (rcu_segcblist_is_enabled(&rdp->cblist)) /* Online normal CPU? */
+ if (rcu_segcblist_is_enabled(&rdp->cblist))
return rcu_segcblist_n_cbs(&rdp->cblist);
- return rcu_get_n_cbs_nocb_cpu(rdp); /* Works for offline, too. */
+ return 0;
}
void rcu_softirq_qs(void)
@@ -416,6 +417,12 @@ module_param(qlowmark, long, 0444);
static ulong jiffies_till_first_fqs = ULONG_MAX;
static ulong jiffies_till_next_fqs = ULONG_MAX;
static bool rcu_kick_kthreads;
+static int rcu_divisor = 7;
+module_param(rcu_divisor, int, 0644);
+
+/* Force an exit from rcu_do_batch() after 3 milliseconds. */
+static long rcu_resched_ns = 3 * NSEC_PER_MSEC;
+module_param(rcu_resched_ns, long, 0644);
/*
* How long the grace period must be before we start recruiting
@@ -1251,6 +1258,7 @@ static bool rcu_accelerate_cbs(struct rcu_node *rnp, struct rcu_data *rdp)
unsigned long gp_seq_req;
bool ret = false;
+ rcu_lockdep_assert_cblist_protected(rdp);
raw_lockdep_assert_held_rcu_node(rnp);
/* If no pending (not yet ready to invoke) callbacks, nothing to do. */
@@ -1292,7 +1300,7 @@ static void rcu_accelerate_cbs_unlocked(struct rcu_node *rnp,
unsigned long c;
bool needwake;
- lockdep_assert_irqs_disabled();
+ rcu_lockdep_assert_cblist_protected(rdp);
c = rcu_seq_snap(&rcu_state.gp_seq);
if (!rdp->gpwrap && ULONG_CMP_GE(rdp->gp_seq_needed, c)) {
/* Old request still live, so mark recent callbacks. */
@@ -1318,6 +1326,7 @@ static void rcu_accelerate_cbs_unlocked(struct rcu_node *rnp,
*/
static bool rcu_advance_cbs(struct rcu_node *rnp, struct rcu_data *rdp)
{
+ rcu_lockdep_assert_cblist_protected(rdp);
raw_lockdep_assert_held_rcu_node(rnp);
/* If no pending (not yet ready to invoke) callbacks, nothing to do. */
@@ -1335,6 +1344,21 @@ static bool rcu_advance_cbs(struct rcu_node *rnp, struct rcu_data *rdp)
}
/*
+ * Move and classify callbacks, but only if doing so won't require
+ * that the RCU grace-period kthread be awakened.
+ */
+static void __maybe_unused rcu_advance_cbs_nowake(struct rcu_node *rnp,
+ struct rcu_data *rdp)
+{
+ rcu_lockdep_assert_cblist_protected(rdp);
+ if (!rcu_seq_state(rcu_seq_current(&rnp->gp_seq)) ||
+ !raw_spin_trylock_rcu_node(rnp))
+ return;
+ WARN_ON_ONCE(rcu_advance_cbs(rnp, rdp));
+ raw_spin_unlock_rcu_node(rnp);
+}
+
+/*
* Update CPU-local rcu_data state to record the beginnings and ends of
* grace periods. The caller must hold the ->lock of the leaf rcu_node
* structure corresponding to the current CPU, and must have irqs disabled.
@@ -1342,8 +1366,10 @@ static bool rcu_advance_cbs(struct rcu_node *rnp, struct rcu_data *rdp)
*/
static bool __note_gp_changes(struct rcu_node *rnp, struct rcu_data *rdp)
{
- bool ret;
+ bool ret = false;
bool need_gp;
+ const bool offloaded = IS_ENABLED(CONFIG_RCU_NOCB_CPU) &&
+ rcu_segcblist_is_offloaded(&rdp->cblist);
raw_lockdep_assert_held_rcu_node(rnp);
@@ -1353,10 +1379,12 @@ static bool __note_gp_changes(struct rcu_node *rnp, struct rcu_data *rdp)
/* Handle the ends of any preceding grace periods first. */
if (rcu_seq_completed_gp(rdp->gp_seq, rnp->gp_seq) ||
unlikely(READ_ONCE(rdp->gpwrap))) {
- ret = rcu_advance_cbs(rnp, rdp); /* Advance callbacks. */
+ if (!offloaded)
+ ret = rcu_advance_cbs(rnp, rdp); /* Advance CBs. */
trace_rcu_grace_period(rcu_state.name, rdp->gp_seq, TPS("cpuend"));
} else {
- ret = rcu_accelerate_cbs(rnp, rdp); /* Recent callbacks. */
+ if (!offloaded)
+ ret = rcu_accelerate_cbs(rnp, rdp); /* Recent CBs. */
}
/* Now handle the beginnings of any new-to-this-CPU grace periods. */
@@ -1657,6 +1685,7 @@ static void rcu_gp_cleanup(void)
unsigned long gp_duration;
bool needgp = false;
unsigned long new_gp_seq;
+ bool offloaded;
struct rcu_data *rdp;
struct rcu_node *rnp = rcu_get_root();
struct swait_queue_head *sq;
@@ -1722,7 +1751,9 @@ static void rcu_gp_cleanup(void)
needgp = true;
}
/* Advance CBs to reduce false positives below. */
- if (!rcu_accelerate_cbs(rnp, rdp) && needgp) {
+ offloaded = IS_ENABLED(CONFIG_RCU_NOCB_CPU) &&
+ rcu_segcblist_is_offloaded(&rdp->cblist);
+ if ((offloaded || !rcu_accelerate_cbs(rnp, rdp)) && needgp) {
WRITE_ONCE(rcu_state.gp_flags, RCU_GP_FLAG_INIT);
rcu_state.gp_req_activity = jiffies;
trace_rcu_grace_period(rcu_state.name,
@@ -1881,7 +1912,7 @@ rcu_report_unblock_qs_rnp(struct rcu_node *rnp, unsigned long flags)
struct rcu_node *rnp_p;
raw_lockdep_assert_held_rcu_node(rnp);
- if (WARN_ON_ONCE(!IS_ENABLED(CONFIG_PREEMPT)) ||
+ if (WARN_ON_ONCE(!IS_ENABLED(CONFIG_PREEMPTION)) ||
WARN_ON_ONCE(rcu_preempt_blocked_readers_cgp(rnp)) ||
rnp->qsmask != 0) {
raw_spin_unlock_irqrestore_rcu_node(rnp, flags);
@@ -1916,7 +1947,9 @@ rcu_report_qs_rdp(int cpu, struct rcu_data *rdp)
{
unsigned long flags;
unsigned long mask;
- bool needwake;
+ bool needwake = false;
+ const bool offloaded = IS_ENABLED(CONFIG_RCU_NOCB_CPU) &&
+ rcu_segcblist_is_offloaded(&rdp->cblist);
struct rcu_node *rnp;
rnp = rdp->mynode;
@@ -1943,7 +1976,8 @@ rcu_report_qs_rdp(int cpu, struct rcu_data *rdp)
* This GP can't end until cpu checks in, so all of our
* callbacks can be processed during the next GP.
*/
- needwake = rcu_accelerate_cbs(rnp, rdp);
+ if (!offloaded)
+ needwake = rcu_accelerate_cbs(rnp, rdp);
rcu_report_qs_rnp(mask, rnp, rnp->gp_seq, flags);
/* ^^^ Released rnp->lock */
@@ -2077,9 +2111,12 @@ int rcutree_dead_cpu(unsigned int cpu)
static void rcu_do_batch(struct rcu_data *rdp)
{
unsigned long flags;
+ const bool offloaded = IS_ENABLED(CONFIG_RCU_NOCB_CPU) &&
+ rcu_segcblist_is_offloaded(&rdp->cblist);
struct rcu_head *rhp;
struct rcu_cblist rcl = RCU_CBLIST_INITIALIZER(rcl);
long bl, count;
+ long pending, tlimit = 0;
/* If no callbacks are ready, just return. */
if (!rcu_segcblist_ready_cbs(&rdp->cblist)) {
@@ -2099,13 +2136,19 @@ static void rcu_do_batch(struct rcu_data *rdp)
* callback counts, as rcu_barrier() needs to be conservative.
*/
local_irq_save(flags);
+ rcu_nocb_lock(rdp);
WARN_ON_ONCE(cpu_is_offline(smp_processor_id()));
- bl = rdp->blimit;
+ pending = rcu_segcblist_n_cbs(&rdp->cblist);
+ bl = max(rdp->blimit, pending >> rcu_divisor);
+ if (unlikely(bl > 100))
+ tlimit = local_clock() + rcu_resched_ns;
trace_rcu_batch_start(rcu_state.name,
rcu_segcblist_n_lazy_cbs(&rdp->cblist),
rcu_segcblist_n_cbs(&rdp->cblist), bl);
rcu_segcblist_extract_done_cbs(&rdp->cblist, &rcl);
- local_irq_restore(flags);
+ if (offloaded)
+ rdp->qlen_last_fqs_check = rcu_segcblist_n_cbs(&rdp->cblist);
+ rcu_nocb_unlock_irqrestore(rdp, flags);
/* Invoke callbacks. */
rhp = rcu_cblist_dequeue(&rcl);
@@ -2117,13 +2160,29 @@ static void rcu_do_batch(struct rcu_data *rdp)
* Stop only if limit reached and CPU has something to do.
* Note: The rcl structure counts down from zero.
*/
- if (-rcl.len >= bl &&
+ if (-rcl.len >= bl && !offloaded &&
(need_resched() ||
(!is_idle_task(current) && !rcu_is_callbacks_kthread())))
break;
+ if (unlikely(tlimit)) {
+ /* only call local_clock() every 32 callbacks */
+ if (likely((-rcl.len & 31) || local_clock() < tlimit))
+ continue;
+ /* Exceeded the time limit, so leave. */
+ break;
+ }
+ if (offloaded) {
+ WARN_ON_ONCE(in_serving_softirq());
+ local_bh_enable();
+ lockdep_assert_irqs_enabled();
+ cond_resched_tasks_rcu_qs();
+ lockdep_assert_irqs_enabled();
+ local_bh_disable();
+ }
}
local_irq_save(flags);
+ rcu_nocb_lock(rdp);
count = -rcl.len;
trace_rcu_batch_end(rcu_state.name, count, !!rcl.head, need_resched(),
is_idle_task(current), rcu_is_callbacks_kthread());
@@ -2149,12 +2208,14 @@ static void rcu_do_batch(struct rcu_data *rdp)
* The following usually indicates a double call_rcu(). To track
* this down, try building with CONFIG_DEBUG_OBJECTS_RCU_HEAD=y.
*/
- WARN_ON_ONCE(rcu_segcblist_empty(&rdp->cblist) != (count == 0));
+ WARN_ON_ONCE(count == 0 && !rcu_segcblist_empty(&rdp->cblist));
+ WARN_ON_ONCE(!IS_ENABLED(CONFIG_RCU_NOCB_CPU) &&
+ count != 0 && rcu_segcblist_empty(&rdp->cblist));
- local_irq_restore(flags);
+ rcu_nocb_unlock_irqrestore(rdp, flags);
/* Re-invoke RCU core processing if there are callbacks remaining. */
- if (rcu_segcblist_ready_cbs(&rdp->cblist))
+ if (!offloaded && rcu_segcblist_ready_cbs(&rdp->cblist))
invoke_rcu_core();
}
@@ -2205,7 +2266,7 @@ static void force_qs_rnp(int (*f)(struct rcu_data *rdp))
mask = 0;
raw_spin_lock_irqsave_rcu_node(rnp, flags);
if (rnp->qsmask == 0) {
- if (!IS_ENABLED(CONFIG_PREEMPT) ||
+ if (!IS_ENABLED(CONFIG_PREEMPTION) ||
rcu_preempt_blocked_readers_cgp(rnp)) {
/*
* No point in scanning bits because they
@@ -2280,6 +2341,8 @@ static __latent_entropy void rcu_core(void)
unsigned long flags;
struct rcu_data *rdp = raw_cpu_ptr(&rcu_data);
struct rcu_node *rnp = rdp->mynode;
+ const bool offloaded = IS_ENABLED(CONFIG_RCU_NOCB_CPU) &&
+ rcu_segcblist_is_offloaded(&rdp->cblist);
if (cpu_is_offline(smp_processor_id()))
return;
@@ -2299,7 +2362,7 @@ static __latent_entropy void rcu_core(void)
/* No grace period and unregistered callbacks? */
if (!rcu_gp_in_progress() &&
- rcu_segcblist_is_enabled(&rdp->cblist)) {
+ rcu_segcblist_is_enabled(&rdp->cblist) && !offloaded) {
local_irq_save(flags);
if (!rcu_segcblist_restempty(&rdp->cblist, RCU_NEXT_READY_TAIL))
rcu_accelerate_cbs_unlocked(rnp, rdp);
@@ -2309,7 +2372,7 @@ static __latent_entropy void rcu_core(void)
rcu_check_gp_start_stall(rnp, rdp, rcu_jiffies_till_stall_check());
/* If there are callbacks ready, invoke them. */
- if (rcu_segcblist_ready_cbs(&rdp->cblist) &&
+ if (!offloaded && rcu_segcblist_ready_cbs(&rdp->cblist) &&
likely(READ_ONCE(rcu_scheduler_fully_active)))
rcu_do_batch(rdp);
@@ -2489,10 +2552,11 @@ static void rcu_leak_callback(struct rcu_head *rhp)
* is expected to specify a CPU.
*/
static void
-__call_rcu(struct rcu_head *head, rcu_callback_t func, int cpu, bool lazy)
+__call_rcu(struct rcu_head *head, rcu_callback_t func, bool lazy)
{
unsigned long flags;
struct rcu_data *rdp;
+ bool was_alldone;
/* Misaligned rcu_head! */
WARN_ON_ONCE((unsigned long)head & (sizeof(void *) - 1));
@@ -2514,28 +2578,18 @@ __call_rcu(struct rcu_head *head, rcu_callback_t func, int cpu, bool lazy)
rdp = this_cpu_ptr(&rcu_data);
/* Add the callback to our list. */
- if (unlikely(!rcu_segcblist_is_enabled(&rdp->cblist)) || cpu != -1) {
- int offline;
-
- if (cpu != -1)
- rdp = per_cpu_ptr(&rcu_data, cpu);
- if (likely(rdp->mynode)) {
- /* Post-boot, so this should be for a no-CBs CPU. */
- offline = !__call_rcu_nocb(rdp, head, lazy, flags);
- WARN_ON_ONCE(offline);
- /* Offline CPU, _call_rcu() illegal, leak callback. */
- local_irq_restore(flags);
- return;
- }
- /*
- * Very early boot, before rcu_init(). Initialize if needed
- * and then drop through to queue the callback.
- */
- WARN_ON_ONCE(cpu != -1);
+ if (unlikely(!rcu_segcblist_is_enabled(&rdp->cblist))) {
+ // This can trigger due to call_rcu() from offline CPU:
+ WARN_ON_ONCE(rcu_scheduler_active != RCU_SCHEDULER_INACTIVE);
WARN_ON_ONCE(!rcu_is_watching());
+ // Very early boot, before rcu_init(). Initialize if needed
+ // and then drop through to queue the callback.
if (rcu_segcblist_empty(&rdp->cblist))
rcu_segcblist_init(&rdp->cblist);
}
+ if (rcu_nocb_try_bypass(rdp, head, &was_alldone, flags))
+ return; // Enqueued onto ->nocb_bypass, so just leave.
+ /* If we get here, rcu_nocb_try_bypass() acquired ->nocb_lock. */
rcu_segcblist_enqueue(&rdp->cblist, head, lazy);
if (__is_kfree_rcu_offset((unsigned long)func))
trace_rcu_kfree_callback(rcu_state.name, head,
@@ -2548,8 +2602,13 @@ __call_rcu(struct rcu_head *head, rcu_callback_t func, int cpu, bool lazy)
rcu_segcblist_n_cbs(&rdp->cblist));
/* Go handle any RCU core processing required. */
- __call_rcu_core(rdp, head, flags);
- local_irq_restore(flags);
+ if (IS_ENABLED(CONFIG_RCU_NOCB_CPU) &&
+ unlikely(rcu_segcblist_is_offloaded(&rdp->cblist))) {
+ __call_rcu_nocb_wake(rdp, was_alldone, flags); /* unlocks */
+ } else {
+ __call_rcu_core(rdp, head, flags);
+ local_irq_restore(flags);
+ }
}
/**
@@ -2589,7 +2648,7 @@ __call_rcu(struct rcu_head *head, rcu_callback_t func, int cpu, bool lazy)
*/
void call_rcu(struct rcu_head *head, rcu_callback_t func)
{
- __call_rcu(head, func, -1, 0);
+ __call_rcu(head, func, 0);
}
EXPORT_SYMBOL_GPL(call_rcu);
@@ -2602,7 +2661,7 @@ EXPORT_SYMBOL_GPL(call_rcu);
*/
void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func)
{
- __call_rcu(head, func, -1, 1);
+ __call_rcu(head, func, 1);
}
EXPORT_SYMBOL_GPL(kfree_call_rcu);
@@ -2622,7 +2681,7 @@ static int rcu_blocking_is_gp(void)
{
int ret;
- if (IS_ENABLED(CONFIG_PREEMPT))
+ if (IS_ENABLED(CONFIG_PREEMPTION))
return rcu_scheduler_active == RCU_SCHEDULER_INACTIVE;
might_sleep(); /* Check for RCU read-side critical section. */
preempt_disable();
@@ -2735,6 +2794,10 @@ static int rcu_pending(void)
/* Check for CPU stalls, if enabled. */
check_cpu_stall(rdp);
+ /* Does this CPU need a deferred NOCB wakeup? */
+ if (rcu_nocb_need_deferred_wakeup(rdp))
+ return 1;
+
/* Is this CPU a NO_HZ_FULL CPU that should ignore RCU? */
if (rcu_nohz_full_cpu())
return 0;
@@ -2750,6 +2813,8 @@ static int rcu_pending(void)
/* Has RCU gone idle with this CPU needing another grace period? */
if (!rcu_gp_in_progress() &&
rcu_segcblist_is_enabled(&rdp->cblist) &&
+ (!IS_ENABLED(CONFIG_RCU_NOCB_CPU) ||
+ !rcu_segcblist_is_offloaded(&rdp->cblist)) &&
!rcu_segcblist_restempty(&rdp->cblist, RCU_NEXT_READY_TAIL))
return 1;
@@ -2758,10 +2823,6 @@ static int rcu_pending(void)
unlikely(READ_ONCE(rdp->gpwrap))) /* outside lock */
return 1;
- /* Does this CPU need a deferred NOCB wakeup? */
- if (rcu_nocb_need_deferred_wakeup(rdp))
- return 1;
-
/* nothing to do */
return 0;
}
@@ -2801,6 +2862,8 @@ static void rcu_barrier_func(void *unused)
rcu_barrier_trace(TPS("IRQ"), -1, rcu_state.barrier_sequence);
rdp->barrier_head.func = rcu_barrier_callback;
debug_rcu_head_queue(&rdp->barrier_head);
+ rcu_nocb_lock(rdp);
+ WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, jiffies));
if (rcu_segcblist_entrain(&rdp->cblist, &rdp->barrier_head, 0)) {
atomic_inc(&rcu_state.barrier_cpu_count);
} else {
@@ -2808,6 +2871,7 @@ static void rcu_barrier_func(void *unused)
rcu_barrier_trace(TPS("IRQNQ"), -1,
rcu_state.barrier_sequence);
}
+ rcu_nocb_unlock(rdp);
}
/**
@@ -2858,22 +2922,11 @@ void rcu_barrier(void)
* corresponding CPU's preceding callbacks have been invoked.
*/
for_each_possible_cpu(cpu) {
- if (!cpu_online(cpu) && !rcu_is_nocb_cpu(cpu))
- continue;
rdp = per_cpu_ptr(&rcu_data, cpu);
- if (rcu_is_nocb_cpu(cpu)) {
- if (!rcu_nocb_cpu_needs_barrier(cpu)) {
- rcu_barrier_trace(TPS("OfflineNoCB"), cpu,
- rcu_state.barrier_sequence);
- } else {
- rcu_barrier_trace(TPS("OnlineNoCB"), cpu,
- rcu_state.barrier_sequence);
- smp_mb__before_atomic();
- atomic_inc(&rcu_state.barrier_cpu_count);
- __call_rcu(&rdp->barrier_head,
- rcu_barrier_callback, cpu, 0);
- }
- } else if (rcu_segcblist_n_cbs(&rdp->cblist)) {
+ if (!cpu_online(cpu) &&
+ !rcu_segcblist_is_offloaded(&rdp->cblist))
+ continue;
+ if (rcu_segcblist_n_cbs(&rdp->cblist)) {
rcu_barrier_trace(TPS("OnlineQ"), cpu,
rcu_state.barrier_sequence);
smp_call_function_single(cpu, rcu_barrier_func, NULL, 1);
@@ -2958,7 +3011,8 @@ rcu_boot_init_percpu_data(int cpu)
* Initializes a CPU's per-CPU RCU data. Note that only one online or
* offline event can be happening at a given time. Note also that we can
* accept some slop in the rsp->gp_seq access due to the fact that this
- * CPU cannot possibly have any RCU callbacks in flight yet.
+ * CPU cannot possibly have any non-offloaded RCU callbacks in flight yet.
+ * And any offloaded callbacks are being numbered elsewhere.
*/
int rcutree_prepare_cpu(unsigned int cpu)
{
@@ -2972,7 +3026,7 @@ int rcutree_prepare_cpu(unsigned int cpu)
rdp->n_force_qs_snap = rcu_state.n_force_qs;
rdp->blimit = blimit;
if (rcu_segcblist_empty(&rdp->cblist) && /* No early-boot CBs? */
- !init_nocb_callback_list(rdp))
+ !rcu_segcblist_is_offloaded(&rdp->cblist))
rcu_segcblist_init(&rdp->cblist); /* Re-enable callbacks. */
rdp->dynticks_nesting = 1; /* CPU not up, no tearing. */
rcu_dynticks_eqs_online();
@@ -3151,29 +3205,38 @@ void rcutree_migrate_callbacks(int cpu)
{
unsigned long flags;
struct rcu_data *my_rdp;
+ struct rcu_node *my_rnp;
struct rcu_data *rdp = per_cpu_ptr(&rcu_data, cpu);
- struct rcu_node *rnp_root = rcu_get_root();
bool needwake;
- if (rcu_is_nocb_cpu(cpu) || rcu_segcblist_empty(&rdp->cblist))
+ if (rcu_segcblist_is_offloaded(&rdp->cblist) ||
+ rcu_segcblist_empty(&rdp->cblist))
return; /* No callbacks to migrate. */
local_irq_save(flags);
my_rdp = this_cpu_ptr(&rcu_data);
- if (rcu_nocb_adopt_orphan_cbs(my_rdp, rdp, flags)) {
- local_irq_restore(flags);
- return;
- }
- raw_spin_lock_rcu_node(rnp_root); /* irqs already disabled. */
+ my_rnp = my_rdp->mynode;
+ rcu_nocb_lock(my_rdp); /* irqs already disabled. */
+ WARN_ON_ONCE(!rcu_nocb_flush_bypass(my_rdp, NULL, jiffies));
+ raw_spin_lock_rcu_node(my_rnp); /* irqs already disabled. */
/* Leverage recent GPs and set GP for new callbacks. */
- needwake = rcu_advance_cbs(rnp_root, rdp) ||
- rcu_advance_cbs(rnp_root, my_rdp);
+ needwake = rcu_advance_cbs(my_rnp, rdp) ||
+ rcu_advance_cbs(my_rnp, my_rdp);
rcu_segcblist_merge(&my_rdp->cblist, &rdp->cblist);
+ needwake = needwake || rcu_advance_cbs(my_rnp, my_rdp);
+ rcu_segcblist_disable(&rdp->cblist);
WARN_ON_ONCE(rcu_segcblist_empty(&my_rdp->cblist) !=
!rcu_segcblist_n_cbs(&my_rdp->cblist));
- raw_spin_unlock_irqrestore_rcu_node(rnp_root, flags);
+ if (rcu_segcblist_is_offloaded(&my_rdp->cblist)) {
+ raw_spin_unlock_rcu_node(my_rnp); /* irqs remain disabled. */
+ __call_rcu_nocb_wake(my_rdp, true, flags);
+ } else {
+ rcu_nocb_unlock(my_rdp); /* irqs remain disabled. */
+ raw_spin_unlock_irqrestore_rcu_node(my_rnp, flags);
+ }
if (needwake)
rcu_gp_kthread_wake();
+ lockdep_assert_irqs_enabled();
WARN_ONCE(rcu_segcblist_n_cbs(&rdp->cblist) != 0 ||
!rcu_segcblist_empty(&rdp->cblist),
"rcu_cleanup_dead_cpu: Callbacks on offline CPU %d: qlen=%lu, 1stCB=%p\n",
@@ -3234,13 +3297,13 @@ static int __init rcu_spawn_gp_kthread(void)
t = kthread_create(rcu_gp_kthread, NULL, "%s", rcu_state.name);
if (WARN_ONCE(IS_ERR(t), "%s: Could not start grace-period kthread, OOM is now expected behavior\n", __func__))
return 0;
- rnp = rcu_get_root();
- raw_spin_lock_irqsave_rcu_node(rnp, flags);
- rcu_state.gp_kthread = t;
if (kthread_prio) {
sp.sched_priority = kthread_prio;
sched_setscheduler_nocheck(t, SCHED_FIFO, &sp);
}
+ rnp = rcu_get_root();
+ raw_spin_lock_irqsave_rcu_node(rnp, flags);
+ rcu_state.gp_kthread = t;
raw_spin_unlock_irqrestore_rcu_node(rnp, flags);
wake_up_process(t);
rcu_spawn_nocb_kthreads();
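
The rcu_do_batch() changes above bound callback invocation both by count (bl, scaled by the new rcu_divisor parameter) and by time (tlimit, derived from the new rcu_resched_ns parameter), sampling the clock only once every 32 callbacks to keep its cost low. A user-space sketch of that pattern follows; the constants mirror the new module parameters but are illustrative, not the kernel's tuning, and process_batch()/now_ns() are hypothetical names.

#include <stdio.h>
#include <time.h>

#define RESCHED_NS	(3 * 1000 * 1000L)	/* analogous to rcu_resched_ns */
#define DIVISOR		7			/* analogous to rcu_divisor */

static long long now_ns(void)
{
	struct timespec ts;

	clock_gettime(CLOCK_MONOTONIC, &ts);
	return ts.tv_sec * 1000000000LL + ts.tv_nsec;
}

static long process_batch(long pending, long blimit)
{
	long bl = blimit > (pending >> DIVISOR) ? blimit : pending >> DIVISOR;
	long long tlimit = bl > 100 ? now_ns() + RESCHED_NS : 0;
	long done = 0;

	while (done < pending) {
		/* ... invoke one callback here ... */
		done++;
		if (done >= bl)
			break;			/* count limit reached */
		/* Only consult the clock every 32 callbacks. */
		if (tlimit && !(done & 31) && now_ns() >= tlimit)
			break;			/* time limit reached */
	}
	return done;
}

int main(void)
{
	printf("processed %ld callbacks\n", process_batch(100000, 10));
	return 0;
}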
diff --git a/kernel/rcu/tree.h b/kernel/rcu/tree.h
index 7acaf3a62d39..c612f306fe89 100644
--- a/kernel/rcu/tree.h
+++ b/kernel/rcu/tree.h
@@ -194,29 +194,38 @@ struct rcu_data {
/* 5) Callback offloading. */
#ifdef CONFIG_RCU_NOCB_CPU
- struct rcu_head *nocb_head; /* CBs waiting for kthread. */
- struct rcu_head **nocb_tail;
- atomic_long_t nocb_q_count; /* # CBs waiting for nocb */
- atomic_long_t nocb_q_count_lazy; /* invocation (all stages). */
- struct rcu_head *nocb_follower_head; /* CBs ready to invoke. */
- struct rcu_head **nocb_follower_tail;
- struct swait_queue_head nocb_wq; /* For nocb kthreads to sleep on. */
- struct task_struct *nocb_kthread;
+ struct swait_queue_head nocb_cb_wq; /* For nocb kthreads to sleep on. */
+ struct task_struct *nocb_gp_kthread;
raw_spinlock_t nocb_lock; /* Guard following pair of fields. */
+ atomic_t nocb_lock_contended; /* Contention experienced. */
int nocb_defer_wakeup; /* Defer wakeup of nocb_kthread. */
struct timer_list nocb_timer; /* Enforce finite deferral. */
-
- /* The following fields are used by the leader, hence own cacheline. */
- struct rcu_head *nocb_gp_head ____cacheline_internodealigned_in_smp;
- /* CBs waiting for GP. */
- struct rcu_head **nocb_gp_tail;
- bool nocb_leader_sleep; /* Is the nocb leader thread asleep? */
- struct rcu_data *nocb_next_follower;
- /* Next follower in wakeup chain. */
-
- /* The following fields are used by the follower, hence new cachline. */
- struct rcu_data *nocb_leader ____cacheline_internodealigned_in_smp;
- /* Leader CPU takes GP-end wakeups. */
+ unsigned long nocb_gp_adv_time; /* Last call_rcu() CB adv (jiffies). */
+
+ /* The following fields are used by call_rcu, hence own cacheline. */
+ raw_spinlock_t nocb_bypass_lock ____cacheline_internodealigned_in_smp;
+ struct rcu_cblist nocb_bypass; /* Lock-contention-bypass CB list. */
+ unsigned long nocb_bypass_first; /* Time (jiffies) of first enqueue. */
+ unsigned long nocb_nobypass_last; /* Last ->cblist enqueue (jiffies). */
+ int nocb_nobypass_count; /* # ->cblist enqueues at ^^^ time. */
+
+ /* The following fields are used by GP kthread, hence own cacheline. */
+ raw_spinlock_t nocb_gp_lock ____cacheline_internodealigned_in_smp;
+ struct timer_list nocb_bypass_timer; /* Force nocb_bypass flush. */
+ u8 nocb_gp_sleep; /* Is the nocb GP thread asleep? */
+ u8 nocb_gp_bypass; /* Found a bypass on last scan? */
+ u8 nocb_gp_gp; /* GP to wait for on last scan? */
+ unsigned long nocb_gp_seq; /* If so, ->gp_seq to wait for. */
+ unsigned long nocb_gp_loops; /* # passes through wait code. */
+ struct swait_queue_head nocb_gp_wq; /* For nocb kthreads to sleep on. */
+ bool nocb_cb_sleep; /* Is the nocb CB thread asleep? */
+ struct task_struct *nocb_cb_kthread;
+ struct rcu_data *nocb_next_cb_rdp;
+ /* Next rcu_data in wakeup chain. */
+
+ /* The following fields are used by CB kthread, hence new cacheline. */
+ struct rcu_data *nocb_gp_rdp ____cacheline_internodealigned_in_smp;
+ /* GP rdp takes GP-end wakeups. */
#endif /* #ifdef CONFIG_RCU_NOCB_CPU */
/* 6) RCU priority boosting. */
@@ -419,25 +428,39 @@ static bool rcu_preempt_has_tasks(struct rcu_node *rnp);
static bool rcu_preempt_need_deferred_qs(struct task_struct *t);
static void rcu_preempt_deferred_qs(struct task_struct *t);
static void zero_cpu_stall_ticks(struct rcu_data *rdp);
-static bool rcu_nocb_cpu_needs_barrier(int cpu);
static struct swait_queue_head *rcu_nocb_gp_get(struct rcu_node *rnp);
static void rcu_nocb_gp_cleanup(struct swait_queue_head *sq);
static void rcu_init_one_nocb(struct rcu_node *rnp);
-static bool __call_rcu_nocb(struct rcu_data *rdp, struct rcu_head *rhp,
- bool lazy, unsigned long flags);
-static bool rcu_nocb_adopt_orphan_cbs(struct rcu_data *my_rdp,
- struct rcu_data *rdp,
- unsigned long flags);
+static bool rcu_nocb_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
+ unsigned long j);
+static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
+ bool *was_alldone, unsigned long flags);
+static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_empty,
+ unsigned long flags);
static int rcu_nocb_need_deferred_wakeup(struct rcu_data *rdp);
static void do_nocb_deferred_wakeup(struct rcu_data *rdp);
static void rcu_boot_init_nocb_percpu_data(struct rcu_data *rdp);
static void rcu_spawn_cpu_nocb_kthread(int cpu);
static void __init rcu_spawn_nocb_kthreads(void);
+static void show_rcu_nocb_state(struct rcu_data *rdp);
+static void rcu_nocb_lock(struct rcu_data *rdp);
+static void rcu_nocb_unlock(struct rcu_data *rdp);
+static void rcu_nocb_unlock_irqrestore(struct rcu_data *rdp,
+ unsigned long flags);
+static void rcu_lockdep_assert_cblist_protected(struct rcu_data *rdp);
#ifdef CONFIG_RCU_NOCB_CPU
static void __init rcu_organize_nocb_kthreads(void);
-#endif /* #ifdef CONFIG_RCU_NOCB_CPU */
-static bool init_nocb_callback_list(struct rcu_data *rdp);
-static unsigned long rcu_get_n_cbs_nocb_cpu(struct rcu_data *rdp);
+#define rcu_nocb_lock_irqsave(rdp, flags) \
+do { \
+ if (!rcu_segcblist_is_offloaded(&(rdp)->cblist)) \
+ local_irq_save(flags); \
+ else \
+ raw_spin_lock_irqsave(&(rdp)->nocb_lock, (flags)); \
+} while (0)
+#else /* #ifdef CONFIG_RCU_NOCB_CPU */
+#define rcu_nocb_lock_irqsave(rdp, flags) local_irq_save(flags)
+#endif /* #else #ifdef CONFIG_RCU_NOCB_CPU */
+
static void rcu_bind_gp_kthread(void);
static bool rcu_nohz_full_cpu(void);
static void rcu_dynticks_task_enter(void);
diff --git a/kernel/rcu/tree_exp.h b/kernel/rcu/tree_exp.h
index af7e7b9c86af..d632cd019597 100644
--- a/kernel/rcu/tree_exp.h
+++ b/kernel/rcu/tree_exp.h
@@ -781,7 +781,7 @@ static int rcu_print_task_exp_stall(struct rcu_node *rnp)
* other hand, if the CPU is not in an RCU read-side critical section,
* the IPI handler reports the quiescent state immediately.
*
- * Although this is a greate improvement over previous expedited
+ * Although this is a great improvement over previous expedited
* implementations, it is still unfriendly to real-time workloads, so is
* thus not recommended for any sort of common-case code. In fact, if
* you are using synchronize_rcu_expedited() in a loop, please restructure
@@ -792,6 +792,7 @@ static int rcu_print_task_exp_stall(struct rcu_node *rnp)
*/
void synchronize_rcu_expedited(void)
{
+ bool boottime = (rcu_scheduler_active == RCU_SCHEDULER_INIT);
struct rcu_exp_work rew;
struct rcu_node *rnp;
unsigned long s;
@@ -817,7 +818,7 @@ void synchronize_rcu_expedited(void)
return; /* Someone else did our work for us. */
/* Ensure that load happens before action based on it. */
- if (unlikely(rcu_scheduler_active == RCU_SCHEDULER_INIT)) {
+ if (unlikely(boottime)) {
/* Direct call during scheduler init and early_initcalls(). */
rcu_exp_sel_wait_wake(s);
} else {
@@ -835,5 +836,8 @@ void synchronize_rcu_expedited(void)
/* Let the next expedited grace period start. */
mutex_unlock(&rcu_state.exp_mutex);
+
+ if (likely(!boottime))
+ destroy_work_on_stack(&rew.rew_work);
}
EXPORT_SYMBOL_GPL(synchronize_rcu_expedited);
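
The destroy_work_on_stack() call added above pairs with the INIT_WORK_ONSTACK() initialization of the on-stack work item on the non-boot path, so that CONFIG_DEBUG_OBJECTS_WORK does not warn when the stack frame is reused. A sketch of the general pattern, with hypothetical my_fn() and my_wq names not taken from this patch:

#include <linux/workqueue.h>

static void my_fn(struct work_struct *work)
{
	/* deferred work goes here */
}

static void run_onstack_work(struct workqueue_struct *my_wq)
{
	struct work_struct w;

	INIT_WORK_ONSTACK(&w, my_fn);
	queue_work(my_wq, &w);
	flush_work(&w);			/* must complete before w goes out of scope */
	destroy_work_on_stack(&w);	/* silence CONFIG_DEBUG_OBJECTS_WORK */
}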
diff --git a/kernel/rcu/tree_plugin.h b/kernel/rcu/tree_plugin.h
index acb225023ed1..2defc7fe74c3 100644
--- a/kernel/rcu/tree_plugin.h
+++ b/kernel/rcu/tree_plugin.h
@@ -288,7 +288,6 @@ void rcu_note_context_switch(bool preempt)
struct rcu_data *rdp = this_cpu_ptr(&rcu_data);
struct rcu_node *rnp;
- barrier(); /* Avoid RCU read-side critical sections leaking down. */
trace_rcu_utilization(TPS("Start context switch"));
lockdep_assert_irqs_disabled();
WARN_ON_ONCE(!preempt && t->rcu_read_lock_nesting > 0);
@@ -314,15 +313,6 @@ void rcu_note_context_switch(bool preempt)
? rnp->gp_seq
: rcu_seq_snap(&rnp->gp_seq));
rcu_preempt_ctxt_queue(rnp, rdp);
- } else if (t->rcu_read_lock_nesting < 0 &&
- t->rcu_read_unlock_special.s) {
-
- /*
- * Complete exit from RCU read-side critical section on
- * behalf of preempted instance of __rcu_read_unlock().
- */
- rcu_read_unlock_special(t);
- rcu_preempt_deferred_qs(t);
} else {
rcu_preempt_deferred_qs(t);
}
@@ -340,7 +330,6 @@ void rcu_note_context_switch(bool preempt)
if (rdp->exp_deferred_qs)
rcu_report_exp_rdp(rdp);
trace_rcu_utilization(TPS("End context switch"));
- barrier(); /* Avoid RCU read-side critical sections leaking up. */
}
EXPORT_SYMBOL_GPL(rcu_note_context_switch);
@@ -626,22 +615,18 @@ static void rcu_read_unlock_special(struct task_struct *t)
(rdp->grpmask & rnp->expmask) ||
tick_nohz_full_cpu(rdp->cpu);
// Need to defer quiescent state until everything is enabled.
- if ((exp || in_irq()) && irqs_were_disabled && use_softirq &&
- (in_irq() || !t->rcu_read_unlock_special.b.deferred_qs)) {
+ if (irqs_were_disabled && use_softirq &&
+ (in_interrupt() ||
+ (exp && !t->rcu_read_unlock_special.b.deferred_qs))) {
// Using softirq, safe to awaken, and we get
// no help from enabling irqs, unlike bh/preempt.
raise_softirq_irqoff(RCU_SOFTIRQ);
- } else if (exp && irqs_were_disabled && !use_softirq &&
- !t->rcu_read_unlock_special.b.deferred_qs) {
- // Safe to awaken and we get no help from enabling
- // irqs, unlike bh/preempt.
- invoke_rcu_core();
} else {
// Enabling BH or preempt does reschedule, so...
// Also if no expediting or NO_HZ_FULL, slow is OK.
set_tsk_need_resched(current);
set_preempt_need_resched();
- if (IS_ENABLED(CONFIG_IRQ_WORK) &&
+ if (IS_ENABLED(CONFIG_IRQ_WORK) && irqs_were_disabled &&
!rdp->defer_qs_iw_pending && exp) {
// Get scheduler to re-evaluate and call hooks.
// If !IRQ_WORK, FQS scan will eventually IPI.
@@ -828,11 +813,6 @@ static void rcu_qs(void)
* dyntick-idle quiescent state visible to other CPUs, which will in
* some cases serve for expedited as well as normal grace periods.
* Either way, register a lightweight quiescent state.
- *
- * The barrier() calls are redundant in the common case when this is
- * called externally, but just in case this is called from within this
- * file.
- *
*/
void rcu_all_qs(void)
{
@@ -847,14 +827,12 @@ void rcu_all_qs(void)
return;
}
this_cpu_write(rcu_data.rcu_urgent_qs, false);
- barrier(); /* Avoid RCU read-side critical sections leaking down. */
if (unlikely(raw_cpu_read(rcu_data.rcu_need_heavy_qs))) {
local_irq_save(flags);
rcu_momentary_dyntick_idle();
local_irq_restore(flags);
}
rcu_qs();
- barrier(); /* Avoid RCU read-side critical sections leaking up. */
preempt_enable();
}
EXPORT_SYMBOL_GPL(rcu_all_qs);
@@ -864,7 +842,6 @@ EXPORT_SYMBOL_GPL(rcu_all_qs);
*/
void rcu_note_context_switch(bool preempt)
{
- barrier(); /* Avoid RCU read-side critical sections leaking down. */
trace_rcu_utilization(TPS("Start context switch"));
rcu_qs();
/* Load rcu_urgent_qs before other flags. */
@@ -877,7 +854,6 @@ void rcu_note_context_switch(bool preempt)
rcu_tasks_qs(current);
out:
trace_rcu_utilization(TPS("End context switch"));
- barrier(); /* Avoid RCU read-side critical sections leaking up. */
}
EXPORT_SYMBOL_GPL(rcu_note_context_switch);
@@ -1134,7 +1110,7 @@ static void rcu_preempt_boost_start_gp(struct rcu_node *rnp)
* already exist. We only create this kthread for preemptible RCU.
* Returns zero if all is well, a negated errno otherwise.
*/
-static int rcu_spawn_one_boost_kthread(struct rcu_node *rnp)
+static void rcu_spawn_one_boost_kthread(struct rcu_node *rnp)
{
int rnp_index = rnp - rcu_get_root();
unsigned long flags;
@@ -1142,25 +1118,27 @@ static int rcu_spawn_one_boost_kthread(struct rcu_node *rnp)
struct task_struct *t;
if (!IS_ENABLED(CONFIG_PREEMPT_RCU))
- return 0;
+ return;
if (!rcu_scheduler_fully_active || rcu_rnp_online_cpus(rnp) == 0)
- return 0;
+ return;
rcu_state.boost = 1;
+
if (rnp->boost_kthread_task != NULL)
- return 0;
+ return;
+
t = kthread_create(rcu_boost_kthread, (void *)rnp,
"rcub/%d", rnp_index);
- if (IS_ERR(t))
- return PTR_ERR(t);
+ if (WARN_ON_ONCE(IS_ERR(t)))
+ return;
+
raw_spin_lock_irqsave_rcu_node(rnp, flags);
rnp->boost_kthread_task = t;
raw_spin_unlock_irqrestore_rcu_node(rnp, flags);
sp.sched_priority = kthread_prio;
sched_setscheduler_nocheck(t, SCHED_FIFO, &sp);
wake_up_process(t); /* get to TASK_INTERRUPTIBLE quickly. */
- return 0;
}
/*
@@ -1201,7 +1179,7 @@ static void __init rcu_spawn_boost_kthreads(void)
struct rcu_node *rnp;
rcu_for_each_leaf_node(rnp)
- (void)rcu_spawn_one_boost_kthread(rnp);
+ rcu_spawn_one_boost_kthread(rnp);
}
static void rcu_prepare_kthreads(int cpu)
@@ -1211,7 +1189,7 @@ static void rcu_prepare_kthreads(int cpu)
/* Fire up the incoming CPU's kthread and leaf rcu_node kthread. */
if (rcu_scheduler_fully_active)
- (void)rcu_spawn_one_boost_kthread(rnp);
+ rcu_spawn_one_boost_kthread(rnp);
}
#else /* #ifdef CONFIG_RCU_BOOST */
@@ -1248,10 +1226,10 @@ static void rcu_prepare_kthreads(int cpu)
#if !defined(CONFIG_RCU_FAST_NO_HZ)
/*
- * Check to see if any future RCU-related work will need to be done
- * by the current CPU, even if none need be done immediately, returning
- * 1 if so. This function is part of the RCU implementation; it is -not-
- * an exported member of the RCU API.
+ * Check to see if any future non-offloaded RCU-related work will need
+ * to be done by the current CPU, even if none need be done immediately,
+ * returning 1 if so. This function is part of the RCU implementation;
+ * it is -not- an exported member of the RCU API.
*
 * Because we do not have RCU_FAST_NO_HZ, just check whether or not this
* CPU has RCU callbacks queued.
@@ -1259,7 +1237,8 @@ static void rcu_prepare_kthreads(int cpu)
int rcu_needs_cpu(u64 basemono, u64 *nextevt)
{
*nextevt = KTIME_MAX;
- return !rcu_segcblist_empty(&this_cpu_ptr(&rcu_data)->cblist);
+ return !rcu_segcblist_empty(&this_cpu_ptr(&rcu_data)->cblist) &&
+ !rcu_segcblist_is_offloaded(&this_cpu_ptr(&rcu_data)->cblist);
}
/*
@@ -1360,8 +1339,9 @@ int rcu_needs_cpu(u64 basemono, u64 *nextevt)
lockdep_assert_irqs_disabled();
- /* If no callbacks, RCU doesn't need the CPU. */
- if (rcu_segcblist_empty(&rdp->cblist)) {
+ /* If no non-offloaded callbacks, RCU doesn't need the CPU. */
+ if (rcu_segcblist_empty(&rdp->cblist) ||
+ rcu_segcblist_is_offloaded(&this_cpu_ptr(&rcu_data)->cblist)) {
*nextevt = KTIME_MAX;
return 0;
}
@@ -1404,7 +1384,7 @@ static void rcu_prepare_for_idle(void)
int tne;
lockdep_assert_irqs_disabled();
- if (rcu_is_nocb_cpu(smp_processor_id()))
+ if (rcu_segcblist_is_offloaded(&rdp->cblist))
return;
/* Handle nohz enablement switches conservatively. */
@@ -1453,8 +1433,10 @@ static void rcu_prepare_for_idle(void)
*/
static void rcu_cleanup_after_idle(void)
{
+ struct rcu_data *rdp = this_cpu_ptr(&rcu_data);
+
lockdep_assert_irqs_disabled();
- if (rcu_is_nocb_cpu(smp_processor_id()))
+ if (rcu_segcblist_is_offloaded(&rdp->cblist))
return;
if (rcu_try_advance_all_cbs())
invoke_rcu_core();
@@ -1469,10 +1451,10 @@ static void rcu_cleanup_after_idle(void)
* specified by rcu_nocb_mask. For the CPUs in the set, there are kthreads
* created that pull the callbacks from the corresponding CPU, wait for
* a grace period to elapse, and invoke the callbacks. These kthreads
- * are organized into leaders, which manage incoming callbacks, wait for
- * grace periods, and awaken followers, and the followers, which only
- * invoke callbacks. Each leader is its own follower. The no-CBs CPUs
- * do a wake_up() on their kthread when they insert a callback into any
+ * are organized into GP kthreads, which manage incoming callbacks, wait for
+ * grace periods, and awaken CB kthreads, and the CB kthreads, which only
+ * invoke callbacks. Each GP kthread invokes its own CBs. The no-CBs CPUs
+ * do a wake_up() on their GP kthread when they insert a callback into any
* empty list, unless the rcu_nocb_poll boot parameter has been specified,
* in which case each kthread actively polls its CPU. (Which isn't so great
* for energy efficiency, but which does reduce RCU's overhead on that CPU.)
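
To make the new division of labor concrete, here is a rough sketch of one no-CBs group, using only the ->nocb_gp_rdp and ->nocb_next_cb_rdp fields introduced later in this patch (the four-CPU layout is just an example, not anything mandated by the code):

/*
 * Example layout (four no-CBs CPUs forming one group):
 *
 *   rdp(cpu0) -> rdp(cpu1) -> rdp(cpu2) -> rdp(cpu3)   via ->nocb_next_cb_rdp
 *
 * The group's GP kthread ("rcuog") is attached to rdp(cpu0); every rdp in
 * the group, including rdp(cpu0) itself, points back at rdp(cpu0) through
 * ->nocb_gp_rdp.  The GP kthread walks the chain above to gather callbacks
 * and wait for grace periods, then wakes the per-CPU CB kthreads ("rcuo")
 * to invoke them, as nocb_gp_wait() further down in this patch does.
 */
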
@@ -1515,6 +1497,116 @@ static int __init parse_rcu_nocb_poll(char *arg)
early_param("rcu_nocb_poll", parse_rcu_nocb_poll);
/*
+ * Don't bother bypassing ->cblist if the call_rcu() rate is low.
+ * After all, the main point of bypassing is to avoid lock contention
+ * on ->nocb_lock, which only can happen at high call_rcu() rates.
+ */
+int nocb_nobypass_lim_per_jiffy = 16 * 1000 / HZ;
+module_param(nocb_nobypass_lim_per_jiffy, int, 0);
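
For a sense of scale, the default limit works out as follows (simple arithmetic on the expression above; the HZ values are just common configurations):

/*
 * nocb_nobypass_lim_per_jiffy = 16 * 1000 / HZ, so:
 *
 *   HZ=1000:  16 direct ->cblist enqueues allowed per 1 ms jiffy
 *   HZ=250:   64 direct ->cblist enqueues allowed per 4 ms jiffy
 *   HZ=100:  160 direct ->cblist enqueues allowed per 10 ms jiffy
 *
 * In every case that is roughly 16,000 call_rcu() invocations per second
 * per CPU before further callbacks start being diverted to ->nocb_bypass.
 */
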
+
+/*
+ * Acquire the specified rcu_data structure's ->nocb_bypass_lock. If the
+ * lock isn't immediately available, increment ->nocb_lock_contended to
+ * flag the contention.
+ */
+static void rcu_nocb_bypass_lock(struct rcu_data *rdp)
+{
+ lockdep_assert_irqs_disabled();
+ if (raw_spin_trylock(&rdp->nocb_bypass_lock))
+ return;
+ atomic_inc(&rdp->nocb_lock_contended);
+ WARN_ON_ONCE(smp_processor_id() != rdp->cpu);
+ smp_mb__after_atomic(); /* atomic_inc() before lock. */
+ raw_spin_lock(&rdp->nocb_bypass_lock);
+ smp_mb__before_atomic(); /* atomic_dec() after lock. */
+ atomic_dec(&rdp->nocb_lock_contended);
+}
+
+/*
+ * Spinwait until the specified rcu_data structure's ->nocb_lock is
+ * not contended. Please note that this is extremely special-purpose,
+ * relying on the fact that at most two kthreads and one CPU contend for
+ * this lock, and also that the two kthreads are guaranteed to have frequent
+ * grace-period-duration time intervals between successive acquisitions
+ * of the lock. This allows us to use an extremely simple throttling
+ * mechanism, and further to apply it only to the CPU doing floods of
+ * call_rcu() invocations. Don't try this at home!
+ */
+static void rcu_nocb_wait_contended(struct rcu_data *rdp)
+{
+ WARN_ON_ONCE(smp_processor_id() != rdp->cpu);
+ while (WARN_ON_ONCE(atomic_read(&rdp->nocb_lock_contended)))
+ cpu_relax();
+}
+
+/*
+ * Conditionally acquire the specified rcu_data structure's
+ * ->nocb_bypass_lock.
+ */
+static bool rcu_nocb_bypass_trylock(struct rcu_data *rdp)
+{
+ lockdep_assert_irqs_disabled();
+ return raw_spin_trylock(&rdp->nocb_bypass_lock);
+}
+
+/*
+ * Release the specified rcu_data structure's ->nocb_bypass_lock.
+ */
+static void rcu_nocb_bypass_unlock(struct rcu_data *rdp)
+{
+ lockdep_assert_irqs_disabled();
+ raw_spin_unlock(&rdp->nocb_bypass_lock);
+}
+
+/*
+ * Acquire the specified rcu_data structure's ->nocb_lock, but only
+ * if it corresponds to a no-CBs CPU.
+ */
+static void rcu_nocb_lock(struct rcu_data *rdp)
+{
+ lockdep_assert_irqs_disabled();
+ if (!rcu_segcblist_is_offloaded(&rdp->cblist))
+ return;
+ raw_spin_lock(&rdp->nocb_lock);
+}
+
+/*
+ * Release the specified rcu_data structure's ->nocb_lock, but only
+ * if it corresponds to a no-CBs CPU.
+ */
+static void rcu_nocb_unlock(struct rcu_data *rdp)
+{
+ if (rcu_segcblist_is_offloaded(&rdp->cblist)) {
+ lockdep_assert_irqs_disabled();
+ raw_spin_unlock(&rdp->nocb_lock);
+ }
+}
+
+/*
+ * Release the specified rcu_data structure's ->nocb_lock and restore
+ * interrupts, but only if it corresponds to a no-CBs CPU.
+ */
+static void rcu_nocb_unlock_irqrestore(struct rcu_data *rdp,
+ unsigned long flags)
+{
+ if (rcu_segcblist_is_offloaded(&rdp->cblist)) {
+ lockdep_assert_irqs_disabled();
+ raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags);
+ } else {
+ local_irq_restore(flags);
+ }
+}
+
+/* Lockdep check that ->cblist may be safely accessed. */
+static void rcu_lockdep_assert_cblist_protected(struct rcu_data *rdp)
+{
+ lockdep_assert_irqs_disabled();
+ if (rcu_segcblist_is_offloaded(&rdp->cblist) &&
+ cpu_online(rdp->cpu))
+ lockdep_assert_held(&rdp->nocb_lock);
+}
+
+/*
* Wake up any no-CBs CPUs' kthreads that were waiting on the just-ended
* grace period.
*/
@@ -1543,440 +1635,514 @@ bool rcu_is_nocb_cpu(int cpu)
}
/*
- * Kick the leader kthread for this NOCB group. Caller holds ->nocb_lock
+ * Kick the GP kthread for this NOCB group. Caller holds ->nocb_lock
* and this function releases it.
*/
-static void __wake_nocb_leader(struct rcu_data *rdp, bool force,
- unsigned long flags)
+static void wake_nocb_gp(struct rcu_data *rdp, bool force,
+ unsigned long flags)
__releases(rdp->nocb_lock)
{
- struct rcu_data *rdp_leader = rdp->nocb_leader;
+ bool needwake = false;
+ struct rcu_data *rdp_gp = rdp->nocb_gp_rdp;
lockdep_assert_held(&rdp->nocb_lock);
- if (!READ_ONCE(rdp_leader->nocb_kthread)) {
- raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags);
+ if (!READ_ONCE(rdp_gp->nocb_gp_kthread)) {
+ trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
+ TPS("AlreadyAwake"));
+ rcu_nocb_unlock_irqrestore(rdp, flags);
return;
}
- if (rdp_leader->nocb_leader_sleep || force) {
- /* Prior smp_mb__after_atomic() orders against prior enqueue. */
- WRITE_ONCE(rdp_leader->nocb_leader_sleep, false);
- del_timer(&rdp->nocb_timer);
- raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags);
- smp_mb(); /* ->nocb_leader_sleep before swake_up_one(). */
- swake_up_one(&rdp_leader->nocb_wq);
- } else {
- raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags);
+ del_timer(&rdp->nocb_timer);
+ rcu_nocb_unlock_irqrestore(rdp, flags);
+ raw_spin_lock_irqsave(&rdp_gp->nocb_gp_lock, flags);
+ if (force || READ_ONCE(rdp_gp->nocb_gp_sleep)) {
+ WRITE_ONCE(rdp_gp->nocb_gp_sleep, false);
+ needwake = true;
+ trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("DoWake"));
}
+ raw_spin_unlock_irqrestore(&rdp_gp->nocb_gp_lock, flags);
+ if (needwake)
+ wake_up_process(rdp_gp->nocb_gp_kthread);
}
/*
- * Kick the leader kthread for this NOCB group, but caller has not
- * acquired locks.
+ * Arrange to wake the GP kthread for this NOCB group at some future
+ * time when it is safe to do so.
*/
-static void wake_nocb_leader(struct rcu_data *rdp, bool force)
+static void wake_nocb_gp_defer(struct rcu_data *rdp, int waketype,
+ const char *reason)
{
- unsigned long flags;
+ if (rdp->nocb_defer_wakeup == RCU_NOCB_WAKE_NOT)
+ mod_timer(&rdp->nocb_timer, jiffies + 1);
+ if (rdp->nocb_defer_wakeup < waketype)
+ WRITE_ONCE(rdp->nocb_defer_wakeup, waketype);
+ trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, reason);
+}
+
+/*
+ * Flush the ->nocb_bypass queue into ->cblist, enqueuing rhp if non-NULL.
+ * However, if there is a callback to be enqueued and if ->nocb_bypass
+ * proves to be initially empty, just return false because the no-CB GP
+ * kthread may need to be awakened in this case.
+ *
+ * Note that this function always returns true if rhp is NULL.
+ */
+static bool rcu_nocb_do_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
+ unsigned long j)
+{
+ struct rcu_cblist rcl;
- raw_spin_lock_irqsave(&rdp->nocb_lock, flags);
- __wake_nocb_leader(rdp, force, flags);
+ WARN_ON_ONCE(!rcu_segcblist_is_offloaded(&rdp->cblist));
+ rcu_lockdep_assert_cblist_protected(rdp);
+ lockdep_assert_held(&rdp->nocb_bypass_lock);
+ if (rhp && !rcu_cblist_n_cbs(&rdp->nocb_bypass)) {
+ raw_spin_unlock(&rdp->nocb_bypass_lock);
+ return false;
+ }
+ /* Note: ->cblist.len already accounts for ->nocb_bypass contents. */
+ if (rhp)
+ rcu_segcblist_inc_len(&rdp->cblist); /* Must precede enqueue. */
+ rcu_cblist_flush_enqueue(&rcl, &rdp->nocb_bypass, rhp);
+ rcu_segcblist_insert_pend_cbs(&rdp->cblist, &rcl);
+ WRITE_ONCE(rdp->nocb_bypass_first, j);
+ rcu_nocb_bypass_unlock(rdp);
+ return true;
}
/*
- * Arrange to wake the leader kthread for this NOCB group at some
- * future time when it is safe to do so.
+ * Flush the ->nocb_bypass queue into ->cblist, enqueuing rhp if non-NULL.
+ * However, if there is a callback to be enqueued and if ->nocb_bypass
+ * proves to be initially empty, just return false because the no-CB GP
+ * kthread may need to be awakened in this case.
+ *
+ * Note that this function always returns true if rhp is NULL.
*/
-static void wake_nocb_leader_defer(struct rcu_data *rdp, int waketype,
- const char *reason)
+static bool rcu_nocb_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
+ unsigned long j)
{
- unsigned long flags;
+ if (!rcu_segcblist_is_offloaded(&rdp->cblist))
+ return true;
+ rcu_lockdep_assert_cblist_protected(rdp);
+ rcu_nocb_bypass_lock(rdp);
+ return rcu_nocb_do_flush_bypass(rdp, rhp, j);
+}
- raw_spin_lock_irqsave(&rdp->nocb_lock, flags);
- if (rdp->nocb_defer_wakeup == RCU_NOCB_WAKE_NOT)
- mod_timer(&rdp->nocb_timer, jiffies + 1);
- WRITE_ONCE(rdp->nocb_defer_wakeup, waketype);
- trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, reason);
- raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags);
+/*
+ * If the ->nocb_bypass_lock is immediately available, flush the
+ * ->nocb_bypass queue into ->cblist.
+ */
+static void rcu_nocb_try_flush_bypass(struct rcu_data *rdp, unsigned long j)
+{
+ rcu_lockdep_assert_cblist_protected(rdp);
+ if (!rcu_segcblist_is_offloaded(&rdp->cblist) ||
+ !rcu_nocb_bypass_trylock(rdp))
+ return;
+ WARN_ON_ONCE(!rcu_nocb_do_flush_bypass(rdp, NULL, j));
}
-/* Does rcu_barrier need to queue an RCU callback on the specified CPU? */
-static bool rcu_nocb_cpu_needs_barrier(int cpu)
+/*
+ * See whether it is appropriate to use the ->nocb_bypass list in order
+ * to control contention on ->nocb_lock. A limited number of direct
+ * enqueues are permitted into ->cblist per jiffy. If ->nocb_bypass
+ * is non-empty, further callbacks must be placed into ->nocb_bypass,
+ * otherwise rcu_barrier() breaks. Use rcu_nocb_flush_bypass() to switch
+ * back to direct use of ->cblist. However, ->nocb_bypass should not be
+ * used if ->cblist is empty, because otherwise callbacks can be stranded
+ * on ->nocb_bypass because we cannot count on the current CPU ever again
+ * invoking call_rcu(). The general rule is that if ->nocb_bypass is
+ * non-empty, the corresponding no-CBs grace-period kthread must not be
+ * in an indefinite sleep state.
+ *
+ * Finally, it is not permitted to use the bypass during early boot,
+ * as doing so would confuse the auto-initialization code. Besides
+ * which, there is no point in worrying about lock contention while
+ * there is only one CPU in operation.
+ */
+static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
+ bool *was_alldone, unsigned long flags)
{
- struct rcu_data *rdp = per_cpu_ptr(&rcu_data, cpu);
- unsigned long ret;
-#ifdef CONFIG_PROVE_RCU
- struct rcu_head *rhp;
-#endif /* #ifdef CONFIG_PROVE_RCU */
+ unsigned long c;
+ unsigned long cur_gp_seq;
+ unsigned long j = jiffies;
+ long ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);
- /*
- * Check count of all no-CBs callbacks awaiting invocation.
- * There needs to be a barrier before this function is called,
- * but associated with a prior determination that no more
- * callbacks would be posted. In the worst case, the first
- * barrier in rcu_barrier() suffices (but the caller cannot
- * necessarily rely on this, not a substitute for the caller
- * getting the concurrency design right!). There must also be a
- * barrier between the following load and posting of a callback
- * (if a callback is in fact needed). This is associated with an
- * atomic_inc() in the caller.
- */
- ret = rcu_get_n_cbs_nocb_cpu(rdp);
-
-#ifdef CONFIG_PROVE_RCU
- rhp = READ_ONCE(rdp->nocb_head);
- if (!rhp)
- rhp = READ_ONCE(rdp->nocb_gp_head);
- if (!rhp)
- rhp = READ_ONCE(rdp->nocb_follower_head);
-
- /* Having no rcuo kthread but CBs after scheduler starts is bad! */
- if (!READ_ONCE(rdp->nocb_kthread) && rhp &&
- rcu_scheduler_fully_active) {
- /* RCU callback enqueued before CPU first came online??? */
- pr_err("RCU: Never-onlined no-CBs CPU %d has CB %p\n",
- cpu, rhp->func);
- WARN_ON_ONCE(1);
+ if (!rcu_segcblist_is_offloaded(&rdp->cblist)) {
+ *was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist);
+ return false; /* Not offloaded, no bypassing. */
+ }
+ lockdep_assert_irqs_disabled();
+
+ // Don't use ->nocb_bypass during early boot.
+ if (rcu_scheduler_active != RCU_SCHEDULER_RUNNING) {
+ rcu_nocb_lock(rdp);
+ WARN_ON_ONCE(rcu_cblist_n_cbs(&rdp->nocb_bypass));
+ *was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist);
+ return false;
+ }
+
+ // If we have advanced to a new jiffy, reset counts to allow
+ // moving back from ->nocb_bypass to ->cblist.
+ if (j == rdp->nocb_nobypass_last) {
+ c = rdp->nocb_nobypass_count + 1;
+ } else {
+ WRITE_ONCE(rdp->nocb_nobypass_last, j);
+ c = rdp->nocb_nobypass_count - nocb_nobypass_lim_per_jiffy;
+ if (ULONG_CMP_LT(rdp->nocb_nobypass_count,
+ nocb_nobypass_lim_per_jiffy))
+ c = 0;
+ else if (c > nocb_nobypass_lim_per_jiffy)
+ c = nocb_nobypass_lim_per_jiffy;
+ }
+ WRITE_ONCE(rdp->nocb_nobypass_count, c);
+
+ // If there hasn't yet been all that many ->cblist enqueues
+ // this jiffy, tell the caller to enqueue onto ->cblist. But flush
+ // ->nocb_bypass first.
+ if (rdp->nocb_nobypass_count < nocb_nobypass_lim_per_jiffy) {
+ rcu_nocb_lock(rdp);
+ *was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist);
+ if (*was_alldone)
+ trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
+ TPS("FirstQ"));
+ WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j));
+ WARN_ON_ONCE(rcu_cblist_n_cbs(&rdp->nocb_bypass));
+ return false; // Caller must enqueue the callback.
+ }
+
+ // If ->nocb_bypass has been used too long or is too full,
+ // flush ->nocb_bypass to ->cblist.
+ if ((ncbs && j != READ_ONCE(rdp->nocb_bypass_first)) ||
+ ncbs >= qhimark) {
+ rcu_nocb_lock(rdp);
+ if (!rcu_nocb_flush_bypass(rdp, rhp, j)) {
+ *was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist);
+ if (*was_alldone)
+ trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
+ TPS("FirstQ"));
+ WARN_ON_ONCE(rcu_cblist_n_cbs(&rdp->nocb_bypass));
+ return false; // Caller must enqueue the callback.
+ }
+ if (j != rdp->nocb_gp_adv_time &&
+ rcu_segcblist_nextgp(&rdp->cblist, &cur_gp_seq) &&
+ rcu_seq_done(&rdp->mynode->gp_seq, cur_gp_seq)) {
+ rcu_advance_cbs_nowake(rdp->mynode, rdp);
+ rdp->nocb_gp_adv_time = j;
+ }
+ rcu_nocb_unlock_irqrestore(rdp, flags);
+ return true; // Callback already enqueued.
}
-#endif /* #ifdef CONFIG_PROVE_RCU */
- return !!ret;
+ // We need to use the bypass.
+ rcu_nocb_wait_contended(rdp);
+ rcu_nocb_bypass_lock(rdp);
+ ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);
+ rcu_segcblist_inc_len(&rdp->cblist); /* Must precede enqueue. */
+ rcu_cblist_enqueue(&rdp->nocb_bypass, rhp);
+ if (!ncbs) {
+ WRITE_ONCE(rdp->nocb_bypass_first, j);
+ trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("FirstBQ"));
+ }
+ rcu_nocb_bypass_unlock(rdp);
+ smp_mb(); /* Order enqueue before wake. */
+ if (ncbs) {
+ local_irq_restore(flags);
+ } else {
+ // No-CBs GP kthread might be indefinitely asleep, if so, wake.
+ rcu_nocb_lock(rdp); // Rare during call_rcu() flood.
+ if (!rcu_segcblist_pend_cbs(&rdp->cblist)) {
+ trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
+ TPS("FirstBQwake"));
+ __call_rcu_nocb_wake(rdp, true, flags);
+ } else {
+ trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
+ TPS("FirstBQnoWake"));
+ rcu_nocb_unlock_irqrestore(rdp, flags);
+ }
+ }
+ return true; // Callback already enqueued.
}
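
In condensed form, the decision made by rcu_nocb_try_bypass() above is roughly the following (a paraphrase of the code, which remains authoritative):

/*
 * rcu_nocb_try_bypass() in brief:
 *
 *  1. Not offloaded, or still in early boot?  Return false so that the
 *     caller enqueues rhp directly onto ->cblist.
 *  2. Fewer than nocb_nobypass_lim_per_jiffy enqueues so far this jiffy?
 *     Flush any existing ->nocb_bypass callbacks into ->cblist and return
 *     false, again letting the caller enqueue directly.
 *  3. ->nocb_bypass has aged past the jiffy in which it was started, or
 *     holds at least qhimark callbacks?  Flush it with rhp appended and
 *     return true, unless the bypass turned out to be empty, in which
 *     case return false so the caller enqueues rhp itself.
 *  4. Otherwise append rhp to ->nocb_bypass and return true, considering
 *     a wakeup of the GP kthread only when this is the first bypass
 *     callback and ->cblist has no pending callbacks.
 */
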
/*
- * Enqueue the specified string of rcu_head structures onto the specified
- * CPU's no-CBs lists. The CPU is specified by rdp, the head of the
- * string by rhp, and the tail of the string by rhtp. The non-lazy/lazy
- * counts are supplied by rhcount and rhcount_lazy.
+ * Awaken the no-CBs grace-period kthread if needed, either due to it
+ * legitimately being asleep or due to overload conditions.
 *
 * If warranted, also wake up the kthread servicing this CPU's queues.
*/
-static void __call_rcu_nocb_enqueue(struct rcu_data *rdp,
- struct rcu_head *rhp,
- struct rcu_head **rhtp,
- int rhcount, int rhcount_lazy,
- unsigned long flags)
+static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_alldone,
+ unsigned long flags)
+ __releases(rdp->nocb_lock)
{
- int len;
- struct rcu_head **old_rhpp;
+ unsigned long cur_gp_seq;
+ unsigned long j;
+ long len;
struct task_struct *t;
- /* Enqueue the callback on the nocb list and update counts. */
- atomic_long_add(rhcount, &rdp->nocb_q_count);
- /* rcu_barrier() relies on ->nocb_q_count add before xchg. */
- old_rhpp = xchg(&rdp->nocb_tail, rhtp);
- WRITE_ONCE(*old_rhpp, rhp);
- atomic_long_add(rhcount_lazy, &rdp->nocb_q_count_lazy);
- smp_mb__after_atomic(); /* Store *old_rhpp before _wake test. */
-
- /* If we are not being polled and there is a kthread, awaken it ... */
- t = READ_ONCE(rdp->nocb_kthread);
+ // If we are being polled or there is no kthread, just leave.
+ t = READ_ONCE(rdp->nocb_gp_kthread);
if (rcu_nocb_poll || !t) {
trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
TPS("WakeNotPoll"));
+ rcu_nocb_unlock_irqrestore(rdp, flags);
return;
}
- len = rcu_get_n_cbs_nocb_cpu(rdp);
- if (old_rhpp == &rdp->nocb_head) {
+ // Need to actually do a wakeup.
+ len = rcu_segcblist_n_cbs(&rdp->cblist);
+ if (was_alldone) {
+ rdp->qlen_last_fqs_check = len;
if (!irqs_disabled_flags(flags)) {
/* ... if queue was empty ... */
- wake_nocb_leader(rdp, false);
+ wake_nocb_gp(rdp, false, flags);
trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
TPS("WakeEmpty"));
} else {
- wake_nocb_leader_defer(rdp, RCU_NOCB_WAKE,
- TPS("WakeEmptyIsDeferred"));
+ wake_nocb_gp_defer(rdp, RCU_NOCB_WAKE,
+ TPS("WakeEmptyIsDeferred"));
+ rcu_nocb_unlock_irqrestore(rdp, flags);
}
- rdp->qlen_last_fqs_check = 0;
} else if (len > rdp->qlen_last_fqs_check + qhimark) {
/* ... or if many callbacks queued. */
- if (!irqs_disabled_flags(flags)) {
- wake_nocb_leader(rdp, true);
- trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
- TPS("WakeOvf"));
- } else {
- wake_nocb_leader_defer(rdp, RCU_NOCB_WAKE_FORCE,
- TPS("WakeOvfIsDeferred"));
+ rdp->qlen_last_fqs_check = len;
+ j = jiffies;
+ if (j != rdp->nocb_gp_adv_time &&
+ rcu_segcblist_nextgp(&rdp->cblist, &cur_gp_seq) &&
+ rcu_seq_done(&rdp->mynode->gp_seq, cur_gp_seq)) {
+ rcu_advance_cbs_nowake(rdp->mynode, rdp);
+ rdp->nocb_gp_adv_time = j;
}
- rdp->qlen_last_fqs_check = LONG_MAX / 2;
+ smp_mb(); /* Enqueue before timer_pending(). */
+ if ((rdp->nocb_cb_sleep ||
+ !rcu_segcblist_ready_cbs(&rdp->cblist)) &&
+ !timer_pending(&rdp->nocb_bypass_timer))
+ wake_nocb_gp_defer(rdp, RCU_NOCB_WAKE_FORCE,
+ TPS("WakeOvfIsDeferred"));
+ rcu_nocb_unlock_irqrestore(rdp, flags);
} else {
trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("WakeNot"));
+ rcu_nocb_unlock_irqrestore(rdp, flags);
}
return;
}
-/*
- * This is a helper for __call_rcu(), which invokes this when the normal
- * callback queue is inoperable. If this is not a no-CBs CPU, this
- * function returns failure back to __call_rcu(), which can complain
- * appropriately.
- *
- * Otherwise, this function queues the callback where the corresponding
- * "rcuo" kthread can find it.
- */
-static bool __call_rcu_nocb(struct rcu_data *rdp, struct rcu_head *rhp,
- bool lazy, unsigned long flags)
+/* Wake up the no-CBs GP kthread to flush ->nocb_bypass. */
+static void do_nocb_bypass_wakeup_timer(struct timer_list *t)
{
+ unsigned long flags;
+ struct rcu_data *rdp = from_timer(rdp, t, nocb_bypass_timer);
- if (!rcu_is_nocb_cpu(rdp->cpu))
- return false;
- __call_rcu_nocb_enqueue(rdp, rhp, &rhp->next, 1, lazy, flags);
- if (__is_kfree_rcu_offset((unsigned long)rhp->func))
- trace_rcu_kfree_callback(rcu_state.name, rhp,
- (unsigned long)rhp->func,
- -atomic_long_read(&rdp->nocb_q_count_lazy),
- -rcu_get_n_cbs_nocb_cpu(rdp));
- else
- trace_rcu_callback(rcu_state.name, rhp,
- -atomic_long_read(&rdp->nocb_q_count_lazy),
- -rcu_get_n_cbs_nocb_cpu(rdp));
-
- /*
- * If called from an extended quiescent state with interrupts
- * disabled, invoke the RCU core in order to allow the idle-entry
- * deferred-wakeup check to function.
- */
- if (irqs_disabled_flags(flags) &&
- !rcu_is_watching() &&
- cpu_online(smp_processor_id()))
- invoke_rcu_core();
-
- return true;
-}
-
-/*
- * Adopt orphaned callbacks on a no-CBs CPU, or return 0 if this is
- * not a no-CBs CPU.
- */
-static bool __maybe_unused rcu_nocb_adopt_orphan_cbs(struct rcu_data *my_rdp,
- struct rcu_data *rdp,
- unsigned long flags)
-{
- lockdep_assert_irqs_disabled();
- if (!rcu_is_nocb_cpu(smp_processor_id()))
- return false; /* Not NOCBs CPU, caller must migrate CBs. */
- __call_rcu_nocb_enqueue(my_rdp, rcu_segcblist_head(&rdp->cblist),
- rcu_segcblist_tail(&rdp->cblist),
- rcu_segcblist_n_cbs(&rdp->cblist),
- rcu_segcblist_n_lazy_cbs(&rdp->cblist), flags);
- rcu_segcblist_init(&rdp->cblist);
- rcu_segcblist_disable(&rdp->cblist);
- return true;
+ trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("Timer"));
+ rcu_nocb_lock_irqsave(rdp, flags);
+ smp_mb__after_spinlock(); /* Timer expire before wakeup. */
+ __call_rcu_nocb_wake(rdp, true, flags);
}
/*
- * If necessary, kick off a new grace period, and either way wait
- * for a subsequent grace period to complete.
+ * No-CBs GP kthreads come here to wait for additional callbacks to show up
+ * or for grace periods to end.
*/
-static void rcu_nocb_wait_gp(struct rcu_data *rdp)
+static void nocb_gp_wait(struct rcu_data *my_rdp)
{
- unsigned long c;
- bool d;
+ bool bypass = false;
+ long bypass_ncbs;
+ int __maybe_unused cpu = my_rdp->cpu;
+ unsigned long cur_gp_seq;
unsigned long flags;
+ bool gotcbs = false;
+ unsigned long j = jiffies;
+ bool needwait_gp = false; // This prevents actual uninitialized use.
bool needwake;
- struct rcu_node *rnp = rdp->mynode;
+ bool needwake_gp;
+ struct rcu_data *rdp;
+ struct rcu_node *rnp;
+ unsigned long wait_gp_seq = 0; // Suppress "use uninitialized" warning.
- local_irq_save(flags);
- c = rcu_seq_snap(&rcu_state.gp_seq);
- if (!rdp->gpwrap && ULONG_CMP_GE(rdp->gp_seq_needed, c)) {
- local_irq_restore(flags);
- } else {
- raw_spin_lock_rcu_node(rnp); /* irqs already disabled. */
- needwake = rcu_start_this_gp(rnp, rdp, c);
- raw_spin_unlock_irqrestore_rcu_node(rnp, flags);
- if (needwake)
+ /*
+ * Each pass through the following loop checks for CBs and for the
+ * nearest grace period (if any) to wait for next. The CB kthreads
+ * and the global grace-period kthread are awakened if needed.
+ */
+ for (rdp = my_rdp; rdp; rdp = rdp->nocb_next_cb_rdp) {
+ trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("Check"));
+ rcu_nocb_lock_irqsave(rdp, flags);
+ bypass_ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);
+ if (bypass_ncbs &&
+ (time_after(j, READ_ONCE(rdp->nocb_bypass_first) + 1) ||
+ bypass_ncbs > 2 * qhimark)) {
+ // Bypass full or old, so flush it.
+ (void)rcu_nocb_try_flush_bypass(rdp, j);
+ bypass_ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);
+ } else if (!bypass_ncbs && rcu_segcblist_empty(&rdp->cblist)) {
+ rcu_nocb_unlock_irqrestore(rdp, flags);
+ continue; /* No callbacks here, try next. */
+ }
+ if (bypass_ncbs) {
+ trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
+ TPS("Bypass"));
+ bypass = true;
+ }
+ rnp = rdp->mynode;
+ if (bypass) { // Avoid race with first bypass CB.
+ WRITE_ONCE(my_rdp->nocb_defer_wakeup,
+ RCU_NOCB_WAKE_NOT);
+ del_timer(&my_rdp->nocb_timer);
+ }
+ // Advance callbacks if helpful and low contention.
+ needwake_gp = false;
+ if (!rcu_segcblist_restempty(&rdp->cblist,
+ RCU_NEXT_READY_TAIL) ||
+ (rcu_segcblist_nextgp(&rdp->cblist, &cur_gp_seq) &&
+ rcu_seq_done(&rnp->gp_seq, cur_gp_seq))) {
+ raw_spin_lock_rcu_node(rnp); /* irqs disabled. */
+ needwake_gp = rcu_advance_cbs(rnp, rdp);
+ raw_spin_unlock_rcu_node(rnp); /* irqs disabled. */
+ }
+ // Need to wait on some grace period?
+ WARN_ON_ONCE(!rcu_segcblist_restempty(&rdp->cblist,
+ RCU_NEXT_READY_TAIL));
+ if (rcu_segcblist_nextgp(&rdp->cblist, &cur_gp_seq)) {
+ if (!needwait_gp ||
+ ULONG_CMP_LT(cur_gp_seq, wait_gp_seq))
+ wait_gp_seq = cur_gp_seq;
+ needwait_gp = true;
+ trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
+ TPS("NeedWaitGP"));
+ }
+ if (rcu_segcblist_ready_cbs(&rdp->cblist)) {
+ needwake = rdp->nocb_cb_sleep;
+ WRITE_ONCE(rdp->nocb_cb_sleep, false);
+ smp_mb(); /* CB invocation -after- GP end. */
+ } else {
+ needwake = false;
+ }
+ rcu_nocb_unlock_irqrestore(rdp, flags);
+ if (needwake) {
+ swake_up_one(&rdp->nocb_cb_wq);
+ gotcbs = true;
+ }
+ if (needwake_gp)
rcu_gp_kthread_wake();
}
- /*
- * Wait for the grace period. Do so interruptibly to avoid messing
- * up the load average.
- */
- trace_rcu_this_gp(rnp, rdp, c, TPS("StartWait"));
- for (;;) {
+ my_rdp->nocb_gp_bypass = bypass;
+ my_rdp->nocb_gp_gp = needwait_gp;
+ my_rdp->nocb_gp_seq = needwait_gp ? wait_gp_seq : 0;
+ if (bypass && !rcu_nocb_poll) {
+ // At least one child with non-empty ->nocb_bypass, so set
+ // timer in order to avoid stranding its callbacks.
+ raw_spin_lock_irqsave(&my_rdp->nocb_gp_lock, flags);
+ mod_timer(&my_rdp->nocb_bypass_timer, j + 2);
+ raw_spin_unlock_irqrestore(&my_rdp->nocb_gp_lock, flags);
+ }
+ if (rcu_nocb_poll) {
+ /* Polling, so trace if first poll in the series. */
+ if (gotcbs)
+ trace_rcu_nocb_wake(rcu_state.name, cpu, TPS("Poll"));
+ schedule_timeout_interruptible(1);
+ } else if (!needwait_gp) {
+ /* Wait for callbacks to appear. */
+ trace_rcu_nocb_wake(rcu_state.name, cpu, TPS("Sleep"));
+ swait_event_interruptible_exclusive(my_rdp->nocb_gp_wq,
+ !READ_ONCE(my_rdp->nocb_gp_sleep));
+ trace_rcu_nocb_wake(rcu_state.name, cpu, TPS("EndSleep"));
+ } else {
+ rnp = my_rdp->mynode;
+ trace_rcu_this_gp(rnp, my_rdp, wait_gp_seq, TPS("StartWait"));
swait_event_interruptible_exclusive(
- rnp->nocb_gp_wq[rcu_seq_ctr(c) & 0x1],
- (d = rcu_seq_done(&rnp->gp_seq, c)));
- if (likely(d))
- break;
- WARN_ON(signal_pending(current));
- trace_rcu_this_gp(rnp, rdp, c, TPS("ResumeWait"));
+ rnp->nocb_gp_wq[rcu_seq_ctr(wait_gp_seq) & 0x1],
+ rcu_seq_done(&rnp->gp_seq, wait_gp_seq) ||
+ !READ_ONCE(my_rdp->nocb_gp_sleep));
+ trace_rcu_this_gp(rnp, my_rdp, wait_gp_seq, TPS("EndWait"));
}
- trace_rcu_this_gp(rnp, rdp, c, TPS("EndWait"));
- smp_mb(); /* Ensure that CB invocation happens after GP end. */
+ if (!rcu_nocb_poll) {
+ raw_spin_lock_irqsave(&my_rdp->nocb_gp_lock, flags);
+ if (bypass)
+ del_timer(&my_rdp->nocb_bypass_timer);
+ WRITE_ONCE(my_rdp->nocb_gp_sleep, true);
+ raw_spin_unlock_irqrestore(&my_rdp->nocb_gp_lock, flags);
+ }
+ my_rdp->nocb_gp_seq = -1;
+ WARN_ON(signal_pending(current));
}
/*
- * Leaders come here to wait for additional callbacks to show up.
- * This function does not return until callbacks appear.
+ * No-CBs grace-period-wait kthread. There is one of these per group
+ * of CPUs, but only once at least one CPU in that group has come online
+ * at least once since boot. This kthread checks for newly posted
+ * callbacks from any of the CPUs it is responsible for, waits for a
+ * grace period, then awakens all of the rcu_nocb_cb_kthread() instances
+ * that then have callback-invocation work to do.
*/
-static void nocb_leader_wait(struct rcu_data *my_rdp)
+static int rcu_nocb_gp_kthread(void *arg)
{
- bool firsttime = true;
- unsigned long flags;
- bool gotcbs;
- struct rcu_data *rdp;
- struct rcu_head **tail;
-
-wait_again:
-
- /* Wait for callbacks to appear. */
- if (!rcu_nocb_poll) {
- trace_rcu_nocb_wake(rcu_state.name, my_rdp->cpu, TPS("Sleep"));
- swait_event_interruptible_exclusive(my_rdp->nocb_wq,
- !READ_ONCE(my_rdp->nocb_leader_sleep));
- raw_spin_lock_irqsave(&my_rdp->nocb_lock, flags);
- my_rdp->nocb_leader_sleep = true;
- WRITE_ONCE(my_rdp->nocb_defer_wakeup, RCU_NOCB_WAKE_NOT);
- del_timer(&my_rdp->nocb_timer);
- raw_spin_unlock_irqrestore(&my_rdp->nocb_lock, flags);
- } else if (firsttime) {
- firsttime = false; /* Don't drown trace log with "Poll"! */
- trace_rcu_nocb_wake(rcu_state.name, my_rdp->cpu, TPS("Poll"));
- }
-
- /*
- * Each pass through the following loop checks a follower for CBs.
- * We are our own first follower. Any CBs found are moved to
- * nocb_gp_head, where they await a grace period.
- */
- gotcbs = false;
- smp_mb(); /* wakeup and _sleep before ->nocb_head reads. */
- for (rdp = my_rdp; rdp; rdp = rdp->nocb_next_follower) {
- rdp->nocb_gp_head = READ_ONCE(rdp->nocb_head);
- if (!rdp->nocb_gp_head)
- continue; /* No CBs here, try next follower. */
-
- /* Move callbacks to wait-for-GP list, which is empty. */
- WRITE_ONCE(rdp->nocb_head, NULL);
- rdp->nocb_gp_tail = xchg(&rdp->nocb_tail, &rdp->nocb_head);
- gotcbs = true;
- }
-
- /* No callbacks? Sleep a bit if polling, and go retry. */
- if (unlikely(!gotcbs)) {
- WARN_ON(signal_pending(current));
- if (rcu_nocb_poll) {
- schedule_timeout_interruptible(1);
- } else {
- trace_rcu_nocb_wake(rcu_state.name, my_rdp->cpu,
- TPS("WokeEmpty"));
- }
- goto wait_again;
- }
+ struct rcu_data *rdp = arg;
- /* Wait for one grace period. */
- rcu_nocb_wait_gp(my_rdp);
-
- /* Each pass through the following loop wakes a follower, if needed. */
- for (rdp = my_rdp; rdp; rdp = rdp->nocb_next_follower) {
- if (!rcu_nocb_poll &&
- READ_ONCE(rdp->nocb_head) &&
- READ_ONCE(my_rdp->nocb_leader_sleep)) {
- raw_spin_lock_irqsave(&my_rdp->nocb_lock, flags);
- my_rdp->nocb_leader_sleep = false;/* No need to sleep.*/
- raw_spin_unlock_irqrestore(&my_rdp->nocb_lock, flags);
- }
- if (!rdp->nocb_gp_head)
- continue; /* No CBs, so no need to wake follower. */
-
- /* Append callbacks to follower's "done" list. */
- raw_spin_lock_irqsave(&rdp->nocb_lock, flags);
- tail = rdp->nocb_follower_tail;
- rdp->nocb_follower_tail = rdp->nocb_gp_tail;
- *tail = rdp->nocb_gp_head;
- raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags);
- if (rdp != my_rdp && tail == &rdp->nocb_follower_head) {
- /* List was empty, so wake up the follower. */
- swake_up_one(&rdp->nocb_wq);
- }
+ for (;;) {
+ WRITE_ONCE(rdp->nocb_gp_loops, rdp->nocb_gp_loops + 1);
+ nocb_gp_wait(rdp);
+ cond_resched_tasks_rcu_qs();
}
-
- /* If we (the leader) don't have CBs, go wait some more. */
- if (!my_rdp->nocb_follower_head)
- goto wait_again;
+ return 0;
}
/*
- * Followers come here to wait for additional callbacks to show up.
- * This function does not return until callbacks appear.
+ * Invoke any ready callbacks from the corresponding no-CBs CPU,
+ * then, if there are no more, wait for more to appear.
*/
-static void nocb_follower_wait(struct rcu_data *rdp)
+static void nocb_cb_wait(struct rcu_data *rdp)
{
- for (;;) {
- trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("FollowerSleep"));
- swait_event_interruptible_exclusive(rdp->nocb_wq,
- READ_ONCE(rdp->nocb_follower_head));
- if (smp_load_acquire(&rdp->nocb_follower_head)) {
- /* ^^^ Ensure CB invocation follows _head test. */
- return;
- }
- WARN_ON(signal_pending(current));
- trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("WokeEmpty"));
+ unsigned long cur_gp_seq;
+ unsigned long flags;
+ bool needwake_gp = false;
+ struct rcu_node *rnp = rdp->mynode;
+
+ local_irq_save(flags);
+ rcu_momentary_dyntick_idle();
+ local_irq_restore(flags);
+ local_bh_disable();
+ rcu_do_batch(rdp);
+ local_bh_enable();
+ lockdep_assert_irqs_enabled();
+ rcu_nocb_lock_irqsave(rdp, flags);
+ if (rcu_segcblist_nextgp(&rdp->cblist, &cur_gp_seq) &&
+ rcu_seq_done(&rnp->gp_seq, cur_gp_seq) &&
+ raw_spin_trylock_rcu_node(rnp)) { /* irqs already disabled. */
+ needwake_gp = rcu_advance_cbs(rdp->mynode, rdp);
+ raw_spin_unlock_rcu_node(rnp); /* irqs remain disabled. */
+ }
+ if (rcu_segcblist_ready_cbs(&rdp->cblist)) {
+ rcu_nocb_unlock_irqrestore(rdp, flags);
+ if (needwake_gp)
+ rcu_gp_kthread_wake();
+ return;
+ }
+
+ trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("CBSleep"));
+ WRITE_ONCE(rdp->nocb_cb_sleep, true);
+ rcu_nocb_unlock_irqrestore(rdp, flags);
+ if (needwake_gp)
+ rcu_gp_kthread_wake();
+ swait_event_interruptible_exclusive(rdp->nocb_cb_wq,
+ !READ_ONCE(rdp->nocb_cb_sleep));
+ if (!smp_load_acquire(&rdp->nocb_cb_sleep)) { /* VVV */
+ /* ^^^ Ensure CB invocation follows _sleep test. */
+ return;
}
+ WARN_ON(signal_pending(current));
+ trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("WokeEmpty"));
}
/*
- * Per-rcu_data kthread, but only for no-CBs CPUs. Each kthread invokes
- * callbacks queued by the corresponding no-CBs CPU, however, there is
- * an optional leader-follower relationship so that the grace-period
- * kthreads don't have to do quite so many wakeups.
+ * Per-rcu_data kthread, but only for no-CBs CPUs. Repeatedly invoke
+ * nocb_cb_wait() to do the dirty work.
*/
-static int rcu_nocb_kthread(void *arg)
+static int rcu_nocb_cb_kthread(void *arg)
{
- int c, cl;
- unsigned long flags;
- struct rcu_head *list;
- struct rcu_head *next;
- struct rcu_head **tail;
struct rcu_data *rdp = arg;
- /* Each pass through this loop invokes one batch of callbacks */
+ // Each pass through this loop does one callback batch, and,
+ // if there are no more ready callbacks, waits for them.
for (;;) {
- /* Wait for callbacks. */
- if (rdp->nocb_leader == rdp)
- nocb_leader_wait(rdp);
- else
- nocb_follower_wait(rdp);
-
- /* Pull the ready-to-invoke callbacks onto local list. */
- raw_spin_lock_irqsave(&rdp->nocb_lock, flags);
- list = rdp->nocb_follower_head;
- rdp->nocb_follower_head = NULL;
- tail = rdp->nocb_follower_tail;
- rdp->nocb_follower_tail = &rdp->nocb_follower_head;
- raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags);
- if (WARN_ON_ONCE(!list))
- continue;
- trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("WokeNonEmpty"));
-
- /* Each pass through the following loop invokes a callback. */
- trace_rcu_batch_start(rcu_state.name,
- atomic_long_read(&rdp->nocb_q_count_lazy),
- rcu_get_n_cbs_nocb_cpu(rdp), -1);
- c = cl = 0;
- while (list) {
- next = list->next;
- /* Wait for enqueuing to complete, if needed. */
- while (next == NULL && &list->next != tail) {
- trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
- TPS("WaitQueue"));
- schedule_timeout_interruptible(1);
- trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
- TPS("WokeQueue"));
- next = list->next;
- }
- debug_rcu_head_unqueue(list);
- local_bh_disable();
- if (__rcu_reclaim(rcu_state.name, list))
- cl++;
- c++;
- local_bh_enable();
- cond_resched_tasks_rcu_qs();
- list = next;
- }
- trace_rcu_batch_end(rcu_state.name, c, !!list, 0, 0, 1);
- smp_mb__before_atomic(); /* _add after CB invocation. */
- atomic_long_add(-c, &rdp->nocb_q_count);
- atomic_long_add(-cl, &rdp->nocb_q_count_lazy);
+ nocb_cb_wait(rdp);
+ cond_resched_tasks_rcu_qs();
}
return 0;
}
@@ -1993,14 +2159,14 @@ static void do_nocb_deferred_wakeup_common(struct rcu_data *rdp)
unsigned long flags;
int ndw;
- raw_spin_lock_irqsave(&rdp->nocb_lock, flags);
+ rcu_nocb_lock_irqsave(rdp, flags);
if (!rcu_nocb_need_deferred_wakeup(rdp)) {
- raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags);
+ rcu_nocb_unlock_irqrestore(rdp, flags);
return;
}
ndw = READ_ONCE(rdp->nocb_defer_wakeup);
WRITE_ONCE(rdp->nocb_defer_wakeup, RCU_NOCB_WAKE_NOT);
- __wake_nocb_leader(rdp, ndw == RCU_NOCB_WAKE_FORCE, flags);
+ wake_nocb_gp(rdp, ndw == RCU_NOCB_WAKE_FORCE, flags);
trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("DeferredWake"));
}
@@ -2027,6 +2193,7 @@ void __init rcu_init_nohz(void)
{
int cpu;
bool need_rcu_nocb_mask = false;
+ struct rcu_data *rdp;
#if defined(CONFIG_NO_HZ_FULL)
if (tick_nohz_full_running && cpumask_weight(tick_nohz_full_mask))
@@ -2060,67 +2227,63 @@ void __init rcu_init_nohz(void)
if (rcu_nocb_poll)
pr_info("\tPoll for callbacks from no-CBs CPUs.\n");
- for_each_cpu(cpu, rcu_nocb_mask)
- init_nocb_callback_list(per_cpu_ptr(&rcu_data, cpu));
+ for_each_cpu(cpu, rcu_nocb_mask) {
+ rdp = per_cpu_ptr(&rcu_data, cpu);
+ if (rcu_segcblist_empty(&rdp->cblist))
+ rcu_segcblist_init(&rdp->cblist);
+ rcu_segcblist_offload(&rdp->cblist);
+ }
rcu_organize_nocb_kthreads();
}
/* Initialize per-rcu_data variables for no-CBs CPUs. */
static void __init rcu_boot_init_nocb_percpu_data(struct rcu_data *rdp)
{
- rdp->nocb_tail = &rdp->nocb_head;
- init_swait_queue_head(&rdp->nocb_wq);
- rdp->nocb_follower_tail = &rdp->nocb_follower_head;
+ init_swait_queue_head(&rdp->nocb_cb_wq);
+ init_swait_queue_head(&rdp->nocb_gp_wq);
raw_spin_lock_init(&rdp->nocb_lock);
+ raw_spin_lock_init(&rdp->nocb_bypass_lock);
+ raw_spin_lock_init(&rdp->nocb_gp_lock);
timer_setup(&rdp->nocb_timer, do_nocb_deferred_wakeup_timer, 0);
+ timer_setup(&rdp->nocb_bypass_timer, do_nocb_bypass_wakeup_timer, 0);
+ rcu_cblist_init(&rdp->nocb_bypass);
}
/*
* If the specified CPU is a no-CBs CPU that does not already have its
- * rcuo kthread, spawn it. If the CPUs are brought online out of order,
- * this can require re-organizing the leader-follower relationships.
+ * rcuo CB kthread, spawn it. Additionally, if the rcuo GP kthread
+ * for this CPU's group has not yet been created, spawn it as well.
*/
static void rcu_spawn_one_nocb_kthread(int cpu)
{
- struct rcu_data *rdp;
- struct rcu_data *rdp_last;
- struct rcu_data *rdp_old_leader;
- struct rcu_data *rdp_spawn = per_cpu_ptr(&rcu_data, cpu);
+ struct rcu_data *rdp = per_cpu_ptr(&rcu_data, cpu);
+ struct rcu_data *rdp_gp;
struct task_struct *t;
/*
* If this isn't a no-CBs CPU or if it already has an rcuo kthread,
* then nothing to do.
*/
- if (!rcu_is_nocb_cpu(cpu) || rdp_spawn->nocb_kthread)
+ if (!rcu_is_nocb_cpu(cpu) || rdp->nocb_cb_kthread)
return;
- /* If we didn't spawn the leader first, reorganize! */
- rdp_old_leader = rdp_spawn->nocb_leader;
- if (rdp_old_leader != rdp_spawn && !rdp_old_leader->nocb_kthread) {
- rdp_last = NULL;
- rdp = rdp_old_leader;
- do {
- rdp->nocb_leader = rdp_spawn;
- if (rdp_last && rdp != rdp_spawn)
- rdp_last->nocb_next_follower = rdp;
- if (rdp == rdp_spawn) {
- rdp = rdp->nocb_next_follower;
- } else {
- rdp_last = rdp;
- rdp = rdp->nocb_next_follower;
- rdp_last->nocb_next_follower = NULL;
- }
- } while (rdp);
- rdp_spawn->nocb_next_follower = rdp_old_leader;
+ /* If this CPU's group does not yet have its GP kthread, spawn it first. */
+ rdp_gp = rdp->nocb_gp_rdp;
+ if (!rdp_gp->nocb_gp_kthread) {
+ t = kthread_run(rcu_nocb_gp_kthread, rdp_gp,
+ "rcuog/%d", rdp_gp->cpu);
+ if (WARN_ONCE(IS_ERR(t), "%s: Could not start rcuo GP kthread, OOM is now expected behavior\n", __func__))
+ return;
+ WRITE_ONCE(rdp_gp->nocb_gp_kthread, t);
}
/* Spawn the kthread for this CPU. */
- t = kthread_run(rcu_nocb_kthread, rdp_spawn,
+ t = kthread_run(rcu_nocb_cb_kthread, rdp,
"rcuo%c/%d", rcu_state.abbr, cpu);
- if (WARN_ONCE(IS_ERR(t), "%s: Could not start rcuo kthread, OOM is now expected behavior\n", __func__))
+ if (WARN_ONCE(IS_ERR(t), "%s: Could not start rcuo CB kthread, OOM is now expected behavior\n", __func__))
return;
- WRITE_ONCE(rdp_spawn->nocb_kthread, t);
+ WRITE_ONCE(rdp->nocb_cb_kthread, t);
+ WRITE_ONCE(rdp->nocb_gp_kthread, rdp_gp->nocb_gp_kthread);
}
/*
@@ -2147,27 +2310,28 @@ static void __init rcu_spawn_nocb_kthreads(void)
rcu_spawn_cpu_nocb_kthread(cpu);
}
-/* How many follower CPU IDs per leader? Default of -1 for sqrt(nr_cpu_ids). */
-static int rcu_nocb_leader_stride = -1;
-module_param(rcu_nocb_leader_stride, int, 0444);
+/* How many CB CPU IDs per GP kthread? Default of -1 for sqrt(nr_cpu_ids). */
+static int rcu_nocb_gp_stride = -1;
+module_param(rcu_nocb_gp_stride, int, 0444);
/*
- * Initialize leader-follower relationships for all no-CBs CPU.
+ * Initialize GP-CB relationships for all no-CBs CPUs.
*/
static void __init rcu_organize_nocb_kthreads(void)
{
int cpu;
- int ls = rcu_nocb_leader_stride;
- int nl = 0; /* Next leader. */
+ bool firsttime = true;
+ int ls = rcu_nocb_gp_stride;
+ int nl = 0; /* Next GP kthread. */
struct rcu_data *rdp;
- struct rcu_data *rdp_leader = NULL; /* Suppress misguided gcc warn. */
+ struct rcu_data *rdp_gp = NULL; /* Suppress misguided gcc warn. */
struct rcu_data *rdp_prev = NULL;
if (!cpumask_available(rcu_nocb_mask))
return;
if (ls == -1) {
- ls = int_sqrt(nr_cpu_ids);
- rcu_nocb_leader_stride = ls;
+ ls = nr_cpu_ids / int_sqrt(nr_cpu_ids);
+ rcu_nocb_gp_stride = ls;
}
/*
@@ -2178,39 +2342,24 @@ static void __init rcu_organize_nocb_kthreads(void)
for_each_cpu(cpu, rcu_nocb_mask) {
rdp = per_cpu_ptr(&rcu_data, cpu);
if (rdp->cpu >= nl) {
- /* New leader, set up for followers & next leader. */
+ /* New GP kthread, set up for CBs & next GP. */
nl = DIV_ROUND_UP(rdp->cpu + 1, ls) * ls;
- rdp->nocb_leader = rdp;
- rdp_leader = rdp;
+ rdp->nocb_gp_rdp = rdp;
+ rdp_gp = rdp;
+ if (!firsttime && dump_tree)
+ pr_cont("\n");
+ firsttime = false;
+ pr_alert("%s: No-CB GP kthread CPU %d:", __func__, cpu);
} else {
- /* Another follower, link to previous leader. */
- rdp->nocb_leader = rdp_leader;
- rdp_prev->nocb_next_follower = rdp;
+ /* Another CB kthread, link to previous GP kthread. */
+ rdp->nocb_gp_rdp = rdp_gp;
+ rdp_prev->nocb_next_cb_rdp = rdp;
+ pr_alert(" %d", cpu);
}
rdp_prev = rdp;
}
}
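
The practical effect of the new default stride can be checked with a small stand-alone program (an illustration only: int_sqrt_demo() stands in for the kernel's int_sqrt(), the 96-CPU count is arbitrary, and every CPU is assumed to be a no-CBs CPU):

#include <stdio.h>

/* Plain integer square root, mimicking the kernel's int_sqrt(). */
static unsigned long int_sqrt_demo(unsigned long x)
{
	unsigned long r = 0;

	while ((r + 1) * (r + 1) <= x)
		r++;
	return r;
}

int main(void)
{
	unsigned long nr_cpu_ids = 96;
	unsigned long old_ls = int_sqrt_demo(nr_cpu_ids);               /* 9 */
	unsigned long new_ls = nr_cpu_ids / int_sqrt_demo(nr_cpu_ids);  /* 10 */

	/* With all CPUs offloaded, rcu_organize_nocb_kthreads() creates
	 * one GP ("rcuog") kthread per stride-sized block of CPU IDs,
	 * i.e. ceil(nr_cpu_ids / ls) of them. */
	printf("old default: stride %lu -> %lu GP kthreads\n",
	       old_ls, (nr_cpu_ids + old_ls - 1) / old_ls);             /* 11 */
	printf("new default: stride %lu -> %lu GP kthreads\n",
	       new_ls, (nr_cpu_ids + new_ls - 1) / new_ls);             /* 10 */
	return 0;
}

For 96 CPUs the old truncated stride of 9 would have produced eleven rcuog kthreads where roughly sqrt(96) of them are intended; the rounded-up stride of 10 brings that back to ten.
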
-/* Prevent __call_rcu() from enqueuing callbacks on no-CBs CPUs */
-static bool init_nocb_callback_list(struct rcu_data *rdp)
-{
- if (!rcu_is_nocb_cpu(rdp->cpu))
- return false;
-
- /* If there are early-boot callbacks, move them to nocb lists. */
- if (!rcu_segcblist_empty(&rdp->cblist)) {
- rdp->nocb_head = rcu_segcblist_head(&rdp->cblist);
- rdp->nocb_tail = rcu_segcblist_tail(&rdp->cblist);
- atomic_long_set(&rdp->nocb_q_count,
- rcu_segcblist_n_cbs(&rdp->cblist));
- atomic_long_set(&rdp->nocb_q_count_lazy,
- rcu_segcblist_n_lazy_cbs(&rdp->cblist));
- rcu_segcblist_init(&rdp->cblist);
- }
- rcu_segcblist_disable(&rdp->cblist);
- return true;
-}
-
/*
* Bind the current task to the offloaded CPUs. If there are no offloaded
* CPUs, leave the task unbound. Splat if the bind attempt fails.
@@ -2223,20 +2372,101 @@ void rcu_bind_current_to_nocb(void)
EXPORT_SYMBOL_GPL(rcu_bind_current_to_nocb);
/*
- * Return the number of RCU callbacks still queued from the specified
- * CPU, which must be a nocbs CPU.
+ * Dump out nocb grace-period kthread state for the specified rcu_data
+ * structure.
*/
-static unsigned long rcu_get_n_cbs_nocb_cpu(struct rcu_data *rdp)
+static void show_rcu_nocb_gp_state(struct rcu_data *rdp)
{
- return atomic_long_read(&rdp->nocb_q_count);
+ struct rcu_node *rnp = rdp->mynode;
+
+ pr_info("nocb GP %d %c%c%c%c%c%c %c[%c%c] %c%c:%ld rnp %d:%d %lu\n",
+ rdp->cpu,
+ "kK"[!!rdp->nocb_gp_kthread],
+ "lL"[raw_spin_is_locked(&rdp->nocb_gp_lock)],
+ "dD"[!!rdp->nocb_defer_wakeup],
+ "tT"[timer_pending(&rdp->nocb_timer)],
+ "bB"[timer_pending(&rdp->nocb_bypass_timer)],
+ "sS"[!!rdp->nocb_gp_sleep],
+ ".W"[swait_active(&rdp->nocb_gp_wq)],
+ ".W"[swait_active(&rnp->nocb_gp_wq[0])],
+ ".W"[swait_active(&rnp->nocb_gp_wq[1])],
+ ".B"[!!rdp->nocb_gp_bypass],
+ ".G"[!!rdp->nocb_gp_gp],
+ (long)rdp->nocb_gp_seq,
+ rnp->grplo, rnp->grphi, READ_ONCE(rdp->nocb_gp_loops));
+}
+
+/* Dump out nocb kthread state for the specified rcu_data structure. */
+static void show_rcu_nocb_state(struct rcu_data *rdp)
+{
+ struct rcu_segcblist *rsclp = &rdp->cblist;
+ bool waslocked;
+ bool wastimer;
+ bool wassleep;
+
+ if (rdp->nocb_gp_rdp == rdp)
+ show_rcu_nocb_gp_state(rdp);
+
+ pr_info(" CB %d->%d %c%c%c%c%c%c F%ld L%ld C%d %c%c%c%c%c q%ld\n",
+ rdp->cpu, rdp->nocb_gp_rdp->cpu,
+ "kK"[!!rdp->nocb_cb_kthread],
+ "bB"[raw_spin_is_locked(&rdp->nocb_bypass_lock)],
+ "cC"[!!atomic_read(&rdp->nocb_lock_contended)],
+ "lL"[raw_spin_is_locked(&rdp->nocb_lock)],
+ "sS"[!!rdp->nocb_cb_sleep],
+ ".W"[swait_active(&rdp->nocb_cb_wq)],
+ jiffies - rdp->nocb_bypass_first,
+ jiffies - rdp->nocb_nobypass_last,
+ rdp->nocb_nobypass_count,
+ ".D"[rcu_segcblist_ready_cbs(rsclp)],
+ ".W"[!rcu_segcblist_restempty(rsclp, RCU_DONE_TAIL)],
+ ".R"[!rcu_segcblist_restempty(rsclp, RCU_WAIT_TAIL)],
+ ".N"[!rcu_segcblist_restempty(rsclp, RCU_NEXT_READY_TAIL)],
+ ".B"[!!rcu_cblist_n_cbs(&rdp->nocb_bypass)],
+ rcu_segcblist_n_cbs(&rdp->cblist));
+
+ /* It is OK for GP kthreads to have GP state. */
+ if (rdp->nocb_gp_rdp == rdp)
+ return;
+
+ waslocked = raw_spin_is_locked(&rdp->nocb_gp_lock);
+ wastimer = timer_pending(&rdp->nocb_timer);
+ wassleep = swait_active(&rdp->nocb_gp_wq);
+ if (!rdp->nocb_defer_wakeup && !rdp->nocb_gp_sleep &&
+ !waslocked && !wastimer && !wassleep)
+ return; /* Nothing untowards. */
+
+ pr_info(" !!! %c%c%c%c %c\n",
+ "lL"[waslocked],
+ "dD"[!!rdp->nocb_defer_wakeup],
+ "tT"[wastimer],
+ "sS"[!!rdp->nocb_gp_sleep],
+ ".W"[wassleep]);
}
#else /* #ifdef CONFIG_RCU_NOCB_CPU */
-static bool rcu_nocb_cpu_needs_barrier(int cpu)
+/* No ->nocb_lock to acquire. */
+static void rcu_nocb_lock(struct rcu_data *rdp)
+{
+}
+
+/* No ->nocb_lock to release. */
+static void rcu_nocb_unlock(struct rcu_data *rdp)
{
- WARN_ON_ONCE(1); /* Should be dead code. */
- return false;
+}
+
+/* No ->nocb_lock to release. */
+static void rcu_nocb_unlock_irqrestore(struct rcu_data *rdp,
+ unsigned long flags)
+{
+ local_irq_restore(flags);
+}
+
+/* Lockdep check that ->cblist may be safely accessed. */
+static void rcu_lockdep_assert_cblist_protected(struct rcu_data *rdp)
+{
+ lockdep_assert_irqs_disabled();
}
static void rcu_nocb_gp_cleanup(struct swait_queue_head *sq)
@@ -2252,19 +2482,24 @@ static void rcu_init_one_nocb(struct rcu_node *rnp)
{
}
-static bool __call_rcu_nocb(struct rcu_data *rdp, struct rcu_head *rhp,
- bool lazy, unsigned long flags)
+static bool rcu_nocb_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
+ unsigned long j)
{
- return false;
+ return true;
}
-static bool __maybe_unused rcu_nocb_adopt_orphan_cbs(struct rcu_data *my_rdp,
- struct rcu_data *rdp,
- unsigned long flags)
+static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
+ bool *was_alldone, unsigned long flags)
{
return false;
}
+static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_empty,
+ unsigned long flags)
+{
+ WARN_ON_ONCE(1); /* Should be dead code! */
+}
+
static void __init rcu_boot_init_nocb_percpu_data(struct rcu_data *rdp)
{
}
@@ -2286,14 +2521,8 @@ static void __init rcu_spawn_nocb_kthreads(void)
{
}
-static bool init_nocb_callback_list(struct rcu_data *rdp)
-{
- return false;
-}
-
-static unsigned long rcu_get_n_cbs_nocb_cpu(struct rcu_data *rdp)
+static void show_rcu_nocb_state(struct rcu_data *rdp)
{
- return 0;
}
#endif /* #else #ifdef CONFIG_RCU_NOCB_CPU */
diff --git a/kernel/rcu/tree_stall.h b/kernel/rcu/tree_stall.h
index 065183391f75..c0b8c458d8a6 100644
--- a/kernel/rcu/tree_stall.h
+++ b/kernel/rcu/tree_stall.h
@@ -163,7 +163,7 @@ static void rcu_iw_handler(struct irq_work *iwp)
//
// Printing RCU CPU stall warnings
-#ifdef CONFIG_PREEMPT
+#ifdef CONFIG_PREEMPTION
/*
* Dump detailed information for all tasks blocking the current RCU
@@ -215,7 +215,7 @@ static int rcu_print_task_stall(struct rcu_node *rnp)
return ndetected;
}
-#else /* #ifdef CONFIG_PREEMPT */
+#else /* #ifdef CONFIG_PREEMPTION */
/*
* Because preemptible RCU does not exist, we never have to check for
@@ -233,7 +233,7 @@ static int rcu_print_task_stall(struct rcu_node *rnp)
{
return 0;
}
-#endif /* #else #ifdef CONFIG_PREEMPT */
+#endif /* #else #ifdef CONFIG_PREEMPTION */
/*
* Dump stacks of all tasks running on stalled CPUs. First try using
@@ -527,6 +527,8 @@ static void check_cpu_stall(struct rcu_data *rdp)
/* We haven't checked in, so go dump stack. */
print_cpu_stall();
+ if (rcu_cpu_stall_ftrace_dump)
+ rcu_ftrace_dump(DUMP_ALL);
} else if (rcu_gp_in_progress() &&
ULONG_CMP_GE(j, js + RCU_STALL_RAT_DELAY) &&
@@ -534,6 +536,8 @@ static void check_cpu_stall(struct rcu_data *rdp)
/* They had a few time units to dump stack, so complain. */
print_other_cpu_stall(gs2);
+ if (rcu_cpu_stall_ftrace_dump)
+ rcu_ftrace_dump(DUMP_ALL);
}
}
@@ -585,6 +589,11 @@ void show_rcu_gp_kthreads(void)
cpu, (long)rdp->gp_seq_needed);
}
}
+ for_each_possible_cpu(cpu) {
+ rdp = per_cpu_ptr(&rcu_data, cpu);
+ if (rcu_segcblist_is_offloaded(&rdp->cblist))
+ show_rcu_nocb_state(rdp);
+ }
/* sched_show_task(rcu_state.gp_kthread); */
}
EXPORT_SYMBOL_GPL(show_rcu_gp_kthreads);
diff --git a/kernel/rcu/update.c b/kernel/rcu/update.c
index 61df2bf08563..1861103662db 100644
--- a/kernel/rcu/update.c
+++ b/kernel/rcu/update.c
@@ -61,9 +61,15 @@ module_param(rcu_normal_after_boot, int, 0);
#ifdef CONFIG_DEBUG_LOCK_ALLOC
/**
- * rcu_read_lock_sched_held() - might we be in RCU-sched read-side critical section?
+ * rcu_read_lock_held_common() - might we be in RCU-sched read-side critical section?
+ * @ret: Best guess answer if lockdep cannot be relied on
*
- * If CONFIG_DEBUG_LOCK_ALLOC is selected, returns nonzero iff in an
+ * Returns true if lockdep must be ignored, in which case *ret contains
+ * the best guess described below. Otherwise returns false, in which
+ * case *ret tells the caller nothing and the caller should instead
+ * consult lockdep.
+ *
+ * If CONFIG_DEBUG_LOCK_ALLOC is selected, set *ret to nonzero iff in an
* RCU-sched read-side critical section. In absence of
* CONFIG_DEBUG_LOCK_ALLOC, this assumes we are in an RCU-sched read-side
* critical section unless it can prove otherwise. Note that disabling
@@ -75,35 +81,45 @@ module_param(rcu_normal_after_boot, int, 0);
* Check debug_lockdep_rcu_enabled() to prevent false positives during boot
* and while lockdep is disabled.
*
- * Note that if the CPU is in the idle loop from an RCU point of
- * view (ie: that we are in the section between rcu_idle_enter() and
- * rcu_idle_exit()) then rcu_read_lock_held() returns false even if the CPU
- * did an rcu_read_lock(). The reason for this is that RCU ignores CPUs
- * that are in such a section, considering these as in extended quiescent
- * state, so such a CPU is effectively never in an RCU read-side critical
- * section regardless of what RCU primitives it invokes. This state of
- * affairs is required --- we need to keep an RCU-free window in idle
- * where the CPU may possibly enter into low power mode. This way we can
- * notice an extended quiescent state to other CPUs that started a grace
- * period. Otherwise we would delay any grace period as long as we run in
- * the idle task.
+ * Note that if the CPU is in the idle loop from an RCU point of view (ie:
+ * that we are in the section between rcu_idle_enter() and rcu_idle_exit())
+ * then rcu_read_lock_held() sets *ret to false even if the CPU did an
+ * rcu_read_lock(). The reason for this is that RCU ignores CPUs that are
+ * in such a section, considering these as in extended quiescent state,
+ * so such a CPU is effectively never in an RCU read-side critical section
+ * regardless of what RCU primitives it invokes. This state of affairs is
+ * required --- we need to keep an RCU-free window in idle where the CPU may
+ * possibly enter into low power mode. This way we can notice an extended
+ * quiescent state to other CPUs that started a grace period. Otherwise
+ * we would delay any grace period as long as we run in the idle task.
*
- * Similarly, we avoid claiming an SRCU read lock held if the current
+ * Similarly, we avoid claiming an RCU read lock held if the current
* CPU is offline.
*/
+static bool rcu_read_lock_held_common(bool *ret)
+{
+ if (!debug_lockdep_rcu_enabled()) {
+ *ret = 1;
+ return true;
+ }
+ if (!rcu_is_watching()) {
+ *ret = 0;
+ return true;
+ }
+ if (!rcu_lockdep_current_cpu_online()) {
+ *ret = 0;
+ return true;
+ }
+ return false;
+}
+
int rcu_read_lock_sched_held(void)
{
- int lockdep_opinion = 0;
+ bool ret;
- if (!debug_lockdep_rcu_enabled())
- return 1;
- if (!rcu_is_watching())
- return 0;
- if (!rcu_lockdep_current_cpu_online())
- return 0;
- if (debug_locks)
- lockdep_opinion = lock_is_held(&rcu_sched_lock_map);
- return lockdep_opinion || !preemptible();
+ if (rcu_read_lock_held_common(&ret))
+ return ret;
+ return lock_is_held(&rcu_sched_lock_map) || !preemptible();
}
EXPORT_SYMBOL(rcu_read_lock_sched_held);
#endif
@@ -136,8 +152,7 @@ static atomic_t rcu_expedited_nesting = ATOMIC_INIT(1);
*/
bool rcu_gp_is_expedited(void)
{
- return rcu_expedited || atomic_read(&rcu_expedited_nesting) ||
- rcu_scheduler_active == RCU_SCHEDULER_INIT;
+ return rcu_expedited || atomic_read(&rcu_expedited_nesting);
}
EXPORT_SYMBOL_GPL(rcu_gp_is_expedited);
@@ -261,12 +276,10 @@ NOKPROBE_SYMBOL(debug_lockdep_rcu_enabled);
*/
int rcu_read_lock_held(void)
{
- if (!debug_lockdep_rcu_enabled())
- return 1;
- if (!rcu_is_watching())
- return 0;
- if (!rcu_lockdep_current_cpu_online())
- return 0;
+ bool ret;
+
+ if (rcu_read_lock_held_common(&ret))
+ return ret;
return lock_is_held(&rcu_lock_map);
}
EXPORT_SYMBOL_GPL(rcu_read_lock_held);
@@ -288,16 +301,28 @@ EXPORT_SYMBOL_GPL(rcu_read_lock_held);
*/
int rcu_read_lock_bh_held(void)
{
- if (!debug_lockdep_rcu_enabled())
- return 1;
- if (!rcu_is_watching())
- return 0;
- if (!rcu_lockdep_current_cpu_online())
- return 0;
+ bool ret;
+
+ if (rcu_read_lock_held_common(&ret))
+ return ret;
return in_softirq() || irqs_disabled();
}
EXPORT_SYMBOL_GPL(rcu_read_lock_bh_held);
+int rcu_read_lock_any_held(void)
+{
+ bool ret;
+
+ if (rcu_read_lock_held_common(&ret))
+ return ret;
+ if (lock_is_held(&rcu_lock_map) ||
+ lock_is_held(&rcu_bh_lock_map) ||
+ lock_is_held(&rcu_sched_lock_map))
+ return 1;
+ return !preemptible();
+}
+EXPORT_SYMBOL_GPL(rcu_read_lock_any_held);
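
As a usage sketch (hypothetical caller, not part of this patch: the demo_item structure, list, and lookup function are invented, while RCU_LOCKDEP_WARN() and list_for_each_entry_rcu() are existing kernel facilities):

#include <linux/rculist.h>

struct demo_item {
	int key;
	struct list_head node;
};

/* May legitimately be called under rcu_read_lock(), rcu_read_lock_bh(),
 * or rcu_read_lock_sched(); complain only if none of them is held. */
static struct demo_item *demo_lookup(struct list_head *head, int key)
{
	struct demo_item *p;

	RCU_LOCKDEP_WARN(!rcu_read_lock_any_held(),
			 "demo_lookup() called without any RCU read-side protection");
	list_for_each_entry_rcu(p, head, node)
		if (p->key == key)
			return p;
	return NULL;
}
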
+
#endif /* #ifdef CONFIG_DEBUG_LOCK_ALLOC */
/**
@@ -437,6 +462,8 @@ EXPORT_SYMBOL_GPL(rcutorture_sched_setaffinity);
#endif
#ifdef CONFIG_RCU_STALL_COMMON
+int rcu_cpu_stall_ftrace_dump __read_mostly;
+module_param(rcu_cpu_stall_ftrace_dump, int, 0644);
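
Assuming this parameter follows the same conventions as the neighboring rcu_cpu_stall_suppress (a reasonable but unverified assumption here), it would typically be enabled at boot as rcupdate.rcu_cpu_stall_ftrace_dump=1 or toggled at run time through /sys/module/rcupdate/parameters/rcu_cpu_stall_ftrace_dump, causing the ftrace buffer to be dumped whenever an RCU CPU stall warning is printed.
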
int rcu_cpu_stall_suppress __read_mostly; /* 1 = suppress stall warnings. */
EXPORT_SYMBOL_GPL(rcu_cpu_stall_suppress);
module_param(rcu_cpu_stall_suppress, int, 0644);