From c2a8ec0778b2ca0d360ba9b5cac7fcd5ddfe798f Mon Sep 17 00:00:00 2001 From: "Paul E. McKenney" Date: Fri, 10 Mar 2017 15:31:55 -0800 Subject: srcu: Move to state-based grace-period sequencing The current SRCU grace-period processing might never reach the last portion of srcu_advance_batches(). This is OK given the current implementation, as the first portion, up to the try_check_zero() following the srcu_flip() is sufficient to drive grace periods forward. However, it has the unfortunate side-effect of making it impossible to determine when a given grace period has ended, and it will be necessary to efficiently trace ends of grace periods in order to efficiently handle per-CPU SRCU callback lists. This commit therefore adds states to the SRCU grace-period processing, so that the end of a given SRCU grace period is marked by the transition to the SRCU_STATE_DONE state. Signed-off-by: Paul E. McKenney --- include/linux/srcu.h | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) (limited to 'include/linux/srcu.h') diff --git a/include/linux/srcu.h b/include/linux/srcu.h index a598cf3ac70c..f149a685896c 100644 --- a/include/linux/srcu.h +++ b/include/linux/srcu.h @@ -48,7 +48,7 @@ struct srcu_struct { unsigned long completed; struct srcu_array __percpu *per_cpu_ref; spinlock_t queue_lock; /* protect ->batch_queue, ->running */ - bool running; + int srcu_state; /* callbacks just queued */ struct rcu_batch batch_queue; /* callbacks try to do the first check_zero */ @@ -62,6 +62,12 @@ struct srcu_struct { #endif /* #ifdef CONFIG_DEBUG_LOCK_ALLOC */ }; +/* Values for -> state variable. */ +#define SRCU_STATE_IDLE 0 +#define SRCU_STATE_SCAN1 1 +#define SRCU_STATE_SCAN2 2 +#define SRCU_STATE_DONE 3 + #ifdef CONFIG_DEBUG_LOCK_ALLOC int __init_srcu_struct(struct srcu_struct *sp, const char *name, @@ -89,7 +95,7 @@ void process_srcu(struct work_struct *work); .completed = -300, \ .per_cpu_ref = &name##_srcu_array, \ .queue_lock = __SPIN_LOCK_UNLOCKED(name.queue_lock), \ - .running = false, \ + .srcu_state = SRCU_STATE_IDLE, \ .batch_queue = RCU_BATCH_INIT(name.batch_queue), \ .batch_check0 = RCU_BATCH_INIT(name.batch_check0), \ .batch_check1 = RCU_BATCH_INIT(name.batch_check1), \ -- cgit v1.2.3 From ac367c1c621b75689f6d5cd8301d364ba2c9f292 Mon Sep 17 00:00:00 2001 From: "Paul E. McKenney" Date: Sat, 11 Mar 2017 07:14:06 -0800 Subject: srcu: Add grace-period sequence numbers This commit adds grace-period sequence numbers, which will be used to handle mid-boot grace periods and per-CPU callback lists. Signed-off-by: Paul E. McKenney --- include/linux/srcu.h | 1 + kernel/rcu/srcu.c | 27 +++++++++++++++++++++++---- 2 files changed, 24 insertions(+), 4 deletions(-) (limited to 'include/linux/srcu.h') diff --git a/include/linux/srcu.h b/include/linux/srcu.h index f149a685896c..047ac8c28a4e 100644 --- a/include/linux/srcu.h +++ b/include/linux/srcu.h @@ -46,6 +46,7 @@ struct rcu_batch { struct srcu_struct { unsigned long completed; + unsigned long srcu_gp_seq; struct srcu_array __percpu *per_cpu_ref; spinlock_t queue_lock; /* protect ->batch_queue, ->running */ int srcu_state; diff --git a/kernel/rcu/srcu.c b/kernel/rcu/srcu.c index 70286f68f3a1..d464986d82b6 100644 --- a/kernel/rcu/srcu.c +++ b/kernel/rcu/srcu.c @@ -110,6 +110,7 @@ static inline void rcu_batch_move(struct rcu_batch *to, struct rcu_batch *from) static int init_srcu_struct_fields(struct srcu_struct *sp) { sp->completed = 0; + sp->srcu_gp_seq = 0; spin_lock_init(&sp->queue_lock); sp->srcu_state = SRCU_STATE_IDLE; rcu_batch_init(&sp->batch_queue); @@ -318,6 +319,15 @@ EXPORT_SYMBOL_GPL(__srcu_read_unlock); #define SYNCHRONIZE_SRCU_TRYCOUNT 2 #define SYNCHRONIZE_SRCU_EXP_TRYCOUNT 12 +/* + * Start an SRCU grace period. + */ +static void srcu_gp_start(struct srcu_struct *sp) +{ + WRITE_ONCE(sp->srcu_state, SRCU_STATE_SCAN1); + rcu_seq_start(&sp->srcu_gp_seq); +} + /* * @@@ Wait until all pre-existing readers complete. Such readers * will have used the index specified by "idx". @@ -354,6 +364,15 @@ static void srcu_flip(struct srcu_struct *sp) smp_mb(); /* D */ /* Pairs with C. */ } +/* + * End an SRCU grace period. + */ +static void srcu_gp_end(struct srcu_struct *sp) +{ + rcu_seq_end(&sp->srcu_gp_seq); + WRITE_ONCE(sp->srcu_state, SRCU_STATE_DONE); +} + /* * Enqueue an SRCU callback on the specified srcu_struct structure, * initiating grace-period processing if it is not already running. @@ -392,7 +411,7 @@ void call_srcu(struct srcu_struct *sp, struct rcu_head *head, smp_mb__after_unlock_lock(); /* Caller's prior accesses before GP. */ rcu_batch_queue(&sp->batch_queue, head); if (READ_ONCE(sp->srcu_state) == SRCU_STATE_IDLE) { - WRITE_ONCE(sp->srcu_state, SRCU_STATE_SCAN1); + srcu_gp_start(sp); queue_delayed_work(system_power_efficient_wq, &sp->work, 0); } spin_unlock_irqrestore(&sp->queue_lock, flags); @@ -426,7 +445,7 @@ static void __synchronize_srcu(struct srcu_struct *sp, int trycount) smp_mb__after_unlock_lock(); /* Caller's prior accesses before GP. */ if (READ_ONCE(sp->srcu_state) == SRCU_STATE_IDLE) { /* steal the processing owner */ - WRITE_ONCE(sp->srcu_state, SRCU_STATE_SCAN1); + srcu_gp_start(sp); rcu_batch_queue(&sp->batch_check0, head); spin_unlock_irq(&sp->queue_lock); /* give the processing owner to work_struct */ @@ -561,7 +580,7 @@ static void srcu_advance_batches(struct srcu_struct *sp, int trycount) */ if (sp->srcu_state == SRCU_STATE_DONE) - WRITE_ONCE(sp->srcu_state, SRCU_STATE_SCAN1); + srcu_gp_start(sp); if (sp->srcu_state == SRCU_STATE_SCAN1) { idx = 1 ^ (sp->completed & 1); @@ -608,7 +627,7 @@ static void srcu_advance_batches(struct srcu_struct *sp, int trycount) */ rcu_batch_move(&sp->batch_done, &sp->batch_check1); - WRITE_ONCE(sp->srcu_state, SRCU_STATE_DONE); + srcu_gp_end(sp); } } -- cgit v1.2.3 From 8660b7d8a545227fd9ee80508aa82528ea9947d7 Mon Sep 17 00:00:00 2001 From: "Paul E. McKenney" Date: Mon, 13 Mar 2017 16:48:18 -0700 Subject: srcu: Use rcu_segcblist to track SRCU callbacks This commit switches SRCU from custom-built callback queues to the new rcu_segcblist structure. This change associates grace-period sequence numbers with groups of callbacks, which will be needed for efficient processing of per-CPU callbacks. Signed-off-by: Paul E. McKenney --- include/linux/rcu_segcblist.h | 678 ++++++++++++++++++++++++++++++++++++++++++ include/linux/srcu.h | 24 +- kernel/rcu/rcu.h | 6 + kernel/rcu/rcu_segcblist.h | 670 ----------------------------------------- kernel/rcu/srcu.c | 159 ++-------- kernel/rcu/tree.h | 2 +- 6 files changed, 721 insertions(+), 818 deletions(-) create mode 100644 include/linux/rcu_segcblist.h delete mode 100644 kernel/rcu/rcu_segcblist.h (limited to 'include/linux/srcu.h') diff --git a/include/linux/rcu_segcblist.h b/include/linux/rcu_segcblist.h new file mode 100644 index 000000000000..74b1e7243955 --- /dev/null +++ b/include/linux/rcu_segcblist.h @@ -0,0 +1,678 @@ +/* + * RCU segmented callback lists + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, you can access it online at + * http://www.gnu.org/licenses/gpl-2.0.html. + * + * Copyright IBM Corporation, 2017 + * + * Authors: Paul E. McKenney + */ + +#ifndef __KERNEL_RCU_SEGCBLIST_H +#define __KERNEL_RCU_SEGCBLIST_H + +/* Simple unsegmented callback lists. */ +struct rcu_cblist { + struct rcu_head *head; + struct rcu_head **tail; + long len; + long len_lazy; +}; + +#define RCU_CBLIST_INITIALIZER(n) { .head = NULL, .tail = &n.head } + +/* Initialize simple callback list. */ +static inline void rcu_cblist_init(struct rcu_cblist *rclp) +{ + rclp->head = NULL; + rclp->tail = &rclp->head; + rclp->len = 0; + rclp->len_lazy = 0; +} + +/* Is simple callback list empty? */ +static inline bool rcu_cblist_empty(struct rcu_cblist *rclp) +{ + return !rclp->head; +} + +/* Return number of callbacks in simple callback list. */ +static inline long rcu_cblist_n_cbs(struct rcu_cblist *rclp) +{ + return rclp->len; +} + +/* Return number of lazy callbacks in simple callback list. */ +static inline long rcu_cblist_n_lazy_cbs(struct rcu_cblist *rclp) +{ + return rclp->len_lazy; +} + +/* + * Debug function to actually count the number of callbacks. + * If the number exceeds the limit specified, return -1. + */ +static inline long rcu_cblist_count_cbs(struct rcu_cblist *rclp, long lim) +{ + int cnt = 0; + struct rcu_head **rhpp = &rclp->head; + + for (;;) { + if (!*rhpp) + return cnt; + if (++cnt > lim) + return -1; + rhpp = &(*rhpp)->next; + } +} + +/* + * Dequeue the oldest rcu_head structure from the specified callback + * list. This function assumes that the callback is non-lazy, but + * the caller can later invoke rcu_cblist_dequeued_lazy() if it + * finds otherwise (and if it cares about laziness). This allows + * different users to have different ways of determining laziness. + */ +static inline struct rcu_head *rcu_cblist_dequeue(struct rcu_cblist *rclp) +{ + struct rcu_head *rhp; + + rhp = rclp->head; + if (!rhp) + return NULL; + rclp->len--; + rclp->head = rhp->next; + if (!rclp->head) + rclp->tail = &rclp->head; + return rhp; +} + +/* + * Account for the fact that a previously dequeued callback turned out + * to be marked as lazy. + */ +static inline void rcu_cblist_dequeued_lazy(struct rcu_cblist *rclp) +{ + rclp->len_lazy--; +} + +/* + * Interim function to return rcu_cblist head pointer. Longer term, the + * rcu_cblist will be used more pervasively, removing the need for this + * function. + */ +static inline struct rcu_head *rcu_cblist_head(struct rcu_cblist *rclp) +{ + return rclp->head; +} + +/* + * Interim function to return rcu_cblist head pointer. Longer term, the + * rcu_cblist will be used more pervasively, removing the need for this + * function. + */ +static inline struct rcu_head **rcu_cblist_tail(struct rcu_cblist *rclp) +{ + WARN_ON_ONCE(rcu_cblist_empty(rclp)); + return rclp->tail; +} + +/* Complicated segmented callback lists. ;-) */ + +/* + * Index values for segments in rcu_segcblist structure. + * + * The segments are as follows: + * + * [head, *tails[RCU_DONE_TAIL]): + * Callbacks whose grace period has elapsed, and thus can be invoked. + * [*tails[RCU_DONE_TAIL], *tails[RCU_WAIT_TAIL]): + * Callbacks waiting for the current GP from the current CPU's viewpoint. + * [*tails[RCU_WAIT_TAIL], *tails[RCU_NEXT_READY_TAIL]): + * Callbacks that arrived before the next GP started, again from + * the current CPU's viewpoint. These can be handled by the next GP. + * [*tails[RCU_NEXT_READY_TAIL], *tails[RCU_NEXT_TAIL]): + * Callbacks that might have arrived after the next GP started. + * There is some uncertainty as to when a given GP starts and + * ends, but a CPU knows the exact times if it is the one starting + * or ending the GP. Other CPUs know that the previous GP ends + * before the next one starts. + * + * Note that RCU_WAIT_TAIL cannot be empty unless RCU_NEXT_READY_TAIL is also + * empty. + * + * The ->gp_seq[] array contains the grace-period number at which the + * corresponding segment of callbacks will be ready to invoke. A given + * element of this array is meaningful only when the corresponding segment + * is non-empty, and it is never valid for RCU_DONE_TAIL (whose callbacks + * are already ready to invoke) or for RCU_NEXT_TAIL (whose callbacks have + * not yet been assigned a grace-period number). + */ +#define RCU_DONE_TAIL 0 /* Also RCU_WAIT head. */ +#define RCU_WAIT_TAIL 1 /* Also RCU_NEXT_READY head. */ +#define RCU_NEXT_READY_TAIL 2 /* Also RCU_NEXT head. */ +#define RCU_NEXT_TAIL 3 +#define RCU_CBLIST_NSEGS 4 + +struct rcu_segcblist { + struct rcu_head *head; + struct rcu_head **tails[RCU_CBLIST_NSEGS]; + unsigned long gp_seq[RCU_CBLIST_NSEGS]; + long len; + long len_lazy; +}; + +#define RCU_SEGCBLIST_INITIALIZER(n) \ +{ \ + .head = NULL, \ + .tails[RCU_DONE_TAIL] = &n.head, \ + .tails[RCU_WAIT_TAIL] = &n.head, \ + .tails[RCU_NEXT_READY_TAIL] = &n.head, \ + .tails[RCU_NEXT_TAIL] = &n.head, \ +} + +/* + * Initialize an rcu_segcblist structure. + */ +static inline void rcu_segcblist_init(struct rcu_segcblist *rsclp) +{ + int i; + + BUILD_BUG_ON(RCU_NEXT_TAIL + 1 != ARRAY_SIZE(rsclp->gp_seq)); + BUILD_BUG_ON(ARRAY_SIZE(rsclp->tails) != ARRAY_SIZE(rsclp->gp_seq)); + rsclp->head = NULL; + for (i = 0; i < RCU_CBLIST_NSEGS; i++) + rsclp->tails[i] = &rsclp->head; + rsclp->len = 0; + rsclp->len_lazy = 0; +} + +/* + * Is the specified rcu_segcblist structure empty? + * + * But careful! The fact that the ->head field is NULL does not + * necessarily imply that there are no callbacks associated with + * this structure. When callbacks are being invoked, they are + * removed as a group. If callback invocation must be preempted, + * the remaining callbacks will be added back to the list. Either + * way, the counts are updated later. + * + * So it is often the case that rcu_segcblist_n_cbs() should be used + * instead. + */ +static inline bool rcu_segcblist_empty(struct rcu_segcblist *rsclp) +{ + return !rsclp->head; +} + +/* Return number of callbacks in segmented callback list. */ +static inline long rcu_segcblist_n_cbs(struct rcu_segcblist *rsclp) +{ + return READ_ONCE(rsclp->len); +} + +/* Return number of lazy callbacks in segmented callback list. */ +static inline long rcu_segcblist_n_lazy_cbs(struct rcu_segcblist *rsclp) +{ + return rsclp->len_lazy; +} + +/* Return number of lazy callbacks in segmented callback list. */ +static inline long rcu_segcblist_n_nonlazy_cbs(struct rcu_segcblist *rsclp) +{ + return rsclp->len - rsclp->len_lazy; +} + +/* + * Is the specified rcu_segcblist enabled, for example, not corresponding + * to an offline or callback-offloaded CPU? + */ +static inline bool rcu_segcblist_is_enabled(struct rcu_segcblist *rsclp) +{ + return !!rsclp->tails[RCU_NEXT_TAIL]; +} + +/* + * Disable the specified rcu_segcblist structure, so that callbacks can + * no longer be posted to it. This structure must be empty. + */ +static inline void rcu_segcblist_disable(struct rcu_segcblist *rsclp) +{ + WARN_ON_ONCE(!rcu_segcblist_empty(rsclp)); + WARN_ON_ONCE(rcu_segcblist_n_cbs(rsclp)); + WARN_ON_ONCE(rcu_segcblist_n_lazy_cbs(rsclp)); + rsclp->tails[RCU_NEXT_TAIL] = NULL; +} + +/* + * Is the specified segment of the specified rcu_segcblist structure + * empty of callbacks? + */ +static inline bool rcu_segcblist_segempty(struct rcu_segcblist *rsclp, int seg) +{ + if (seg == RCU_DONE_TAIL) + return &rsclp->head == rsclp->tails[RCU_DONE_TAIL]; + return rsclp->tails[seg - 1] == rsclp->tails[seg]; +} + +/* + * Are all segments following the specified segment of the specified + * rcu_segcblist structure empty of callbacks? (The specified + * segment might well contain callbacks.) + */ +static inline bool rcu_segcblist_restempty(struct rcu_segcblist *rsclp, int seg) +{ + return !*rsclp->tails[seg]; +} + +/* + * Does the specified rcu_segcblist structure contain callbacks that + * are ready to be invoked? + */ +static inline bool rcu_segcblist_ready_cbs(struct rcu_segcblist *rsclp) +{ + return rcu_segcblist_is_enabled(rsclp) && + &rsclp->head != rsclp->tails[RCU_DONE_TAIL]; +} + +/* + * Does the specified rcu_segcblist structure contain callbacks that + * are still pending, that is, not yet ready to be invoked? + */ +static inline bool rcu_segcblist_pend_cbs(struct rcu_segcblist *rsclp) +{ + return rcu_segcblist_is_enabled(rsclp) && + !rcu_segcblist_restempty(rsclp, RCU_DONE_TAIL); +} + +/* + * Dequeue and return the first ready-to-invoke callback. If there + * are no ready-to-invoke callbacks, return NULL. Disables interrupts + * to avoid interference. Does not protect from interference from other + * CPUs or tasks. + */ +static inline struct rcu_head * +rcu_segcblist_dequeue(struct rcu_segcblist *rsclp) +{ + unsigned long flags; + int i; + struct rcu_head *rhp; + + local_irq_save(flags); + if (!rcu_segcblist_ready_cbs(rsclp)) { + local_irq_restore(flags); + return NULL; + } + rhp = rsclp->head; + BUG_ON(!rhp); + rsclp->head = rhp->next; + for (i = RCU_DONE_TAIL; i < RCU_CBLIST_NSEGS; i++) { + if (rsclp->tails[i] != &rhp->next) + break; + rsclp->tails[i] = &rsclp->head; + } + smp_mb(); /* Dequeue before decrement for rcu_barrier(). */ + WRITE_ONCE(rsclp->len, rsclp->len - 1); + local_irq_restore(flags); + return rhp; +} + +/* + * Account for the fact that a previously dequeued callback turned out + * to be marked as lazy. + */ +static inline void rcu_segcblist_dequeued_lazy(struct rcu_segcblist *rsclp) +{ + unsigned long flags; + + local_irq_save(flags); + rsclp->len_lazy--; + local_irq_restore(flags); +} + +/* + * Return a pointer to the first callback in the specified rcu_segcblist + * structure. This is useful for diagnostics. + */ +static inline struct rcu_head * +rcu_segcblist_first_cb(struct rcu_segcblist *rsclp) +{ + if (rcu_segcblist_is_enabled(rsclp)) + return rsclp->head; + return NULL; +} + +/* + * Return a pointer to the first pending callback in the specified + * rcu_segcblist structure. This is useful just after posting a given + * callback -- if that callback is the first pending callback, then + * you cannot rely on someone else having already started up the required + * grace period. + */ +static inline struct rcu_head * +rcu_segcblist_first_pend_cb(struct rcu_segcblist *rsclp) +{ + if (rcu_segcblist_is_enabled(rsclp)) + return *rsclp->tails[RCU_DONE_TAIL]; + return NULL; +} + +/* + * Does the specified rcu_segcblist structure contain callbacks that + * have not yet been processed beyond having been posted, that is, + * does it contain callbacks in its last segment? + */ +static inline bool rcu_segcblist_new_cbs(struct rcu_segcblist *rsclp) +{ + return rcu_segcblist_is_enabled(rsclp) && + !rcu_segcblist_restempty(rsclp, RCU_NEXT_READY_TAIL); +} + +/* + * Enqueue the specified callback onto the specified rcu_segcblist + * structure, updating accounting as needed. Note that the ->len + * field may be accessed locklessly, hence the WRITE_ONCE(). + * The ->len field is used by rcu_barrier() and friends to determine + * if it must post a callback on this structure, and it is OK + * for rcu_barrier() to sometimes post callbacks needlessly, but + * absolutely not OK for it to ever miss posting a callback. + */ +static inline void rcu_segcblist_enqueue(struct rcu_segcblist *rsclp, + struct rcu_head *rhp, bool lazy) +{ + WRITE_ONCE(rsclp->len, rsclp->len + 1); /* ->len sampled locklessly. */ + if (lazy) + rsclp->len_lazy++; + smp_mb(); /* Ensure counts are updated before callback is enqueued. */ + rhp->next = NULL; + *rsclp->tails[RCU_NEXT_TAIL] = rhp; + rsclp->tails[RCU_NEXT_TAIL] = &rhp->next; +} + +/* + * Extract only the counts from the specified rcu_segcblist structure, + * and place them in the specified rcu_cblist structure. This function + * supports both callback orphaning and invocation, hence the separation + * of counts and callbacks. (Callbacks ready for invocation must be + * orphaned and adopted separately from pending callbacks, but counts + * apply to all callbacks. Locking must be used to make sure that + * both orphaned-callbacks lists are consistent.) + */ +static inline void rcu_segcblist_extract_count(struct rcu_segcblist *rsclp, + struct rcu_cblist *rclp) +{ + rclp->len_lazy += rsclp->len_lazy; + rclp->len += rsclp->len; + rsclp->len_lazy = 0; + WRITE_ONCE(rsclp->len, 0); /* ->len sampled locklessly. */ +} + +/* + * Extract only those callbacks ready to be invoked from the specified + * rcu_segcblist structure and place them in the specified rcu_cblist + * structure. + */ +static inline void rcu_segcblist_extract_done_cbs(struct rcu_segcblist *rsclp, + struct rcu_cblist *rclp) +{ + int i; + + if (!rcu_segcblist_ready_cbs(rsclp)) + return; /* Nothing to do. */ + *rclp->tail = rsclp->head; + rsclp->head = *rsclp->tails[RCU_DONE_TAIL]; + *rsclp->tails[RCU_DONE_TAIL] = NULL; + rclp->tail = rsclp->tails[RCU_DONE_TAIL]; + for (i = RCU_CBLIST_NSEGS - 1; i >= RCU_DONE_TAIL; i--) + if (rsclp->tails[i] == rsclp->tails[RCU_DONE_TAIL]) + rsclp->tails[i] = &rsclp->head; +} + +/* + * Extract only those callbacks still pending (not yet ready to be + * invoked) from the specified rcu_segcblist structure and place them in + * the specified rcu_cblist structure. Note that this loses information + * about any callbacks that might have been partway done waiting for + * their grace period. Too bad! They will have to start over. + */ +static inline void +rcu_segcblist_extract_pend_cbs(struct rcu_segcblist *rsclp, + struct rcu_cblist *rclp) +{ + int i; + + if (!rcu_segcblist_pend_cbs(rsclp)) + return; /* Nothing to do. */ + *rclp->tail = *rsclp->tails[RCU_DONE_TAIL]; + rclp->tail = rsclp->tails[RCU_NEXT_TAIL]; + *rsclp->tails[RCU_DONE_TAIL] = NULL; + for (i = RCU_DONE_TAIL + 1; i < RCU_CBLIST_NSEGS; i++) + rsclp->tails[i] = rsclp->tails[RCU_DONE_TAIL]; +} + +/* + * Move the entire contents of the specified rcu_segcblist structure, + * counts, callbacks, and all, to the specified rcu_cblist structure. + * @@@ Why do we need this??? Moving early-boot CBs to NOCB lists? + * @@@ Memory barrier needed? (Not if only used at boot time...) + */ +static inline void rcu_segcblist_extract_all(struct rcu_segcblist *rsclp, + struct rcu_cblist *rclp) +{ + rcu_segcblist_extract_done_cbs(rsclp, rclp); + rcu_segcblist_extract_pend_cbs(rsclp, rclp); + rcu_segcblist_extract_count(rsclp, rclp); +} + +/* + * Insert counts from the specified rcu_cblist structure in the + * specified rcu_segcblist structure. + */ +static inline void rcu_segcblist_insert_count(struct rcu_segcblist *rsclp, + struct rcu_cblist *rclp) +{ + rsclp->len_lazy += rclp->len_lazy; + /* ->len sampled locklessly. */ + WRITE_ONCE(rsclp->len, rsclp->len + rclp->len); + rclp->len_lazy = 0; + rclp->len = 0; +} + +/* + * Move callbacks from the specified rcu_cblist to the beginning of the + * done-callbacks segment of the specified rcu_segcblist. + */ +static inline void rcu_segcblist_insert_done_cbs(struct rcu_segcblist *rsclp, + struct rcu_cblist *rclp) +{ + int i; + + if (!rclp->head) + return; /* No callbacks to move. */ + *rclp->tail = rsclp->head; + rsclp->head = rclp->head; + for (i = RCU_DONE_TAIL; i < RCU_CBLIST_NSEGS; i++) + if (&rsclp->head == rsclp->tails[i]) + rsclp->tails[i] = rclp->tail; + else + break; + rclp->head = NULL; + rclp->tail = &rclp->head; +} + +/* + * Move callbacks from the specified rcu_cblist to the end of the + * new-callbacks segment of the specified rcu_segcblist. + */ +static inline void rcu_segcblist_insert_pend_cbs(struct rcu_segcblist *rsclp, + struct rcu_cblist *rclp) +{ + if (!rclp->head) + return; /* Nothing to do. */ + *rsclp->tails[RCU_NEXT_TAIL] = rclp->head; + rsclp->tails[RCU_NEXT_TAIL] = rclp->tail; + rclp->head = NULL; + rclp->tail = &rclp->head; +} + +/* + * Advance the callbacks in the specified rcu_segcblist structure based + * on the current value passed in for the grace-period counter. + */ +static inline void rcu_segcblist_advance(struct rcu_segcblist *rsclp, + unsigned long seq) +{ + int i, j; + + WARN_ON_ONCE(!rcu_segcblist_is_enabled(rsclp)); + WARN_ON_ONCE(rcu_segcblist_restempty(rsclp, RCU_DONE_TAIL)); + + /* + * Find all callbacks whose ->gp_seq numbers indicate that they + * are ready to invoke, and put them into the RCU_DONE_TAIL segment. + */ + for (i = RCU_WAIT_TAIL; i < RCU_NEXT_TAIL; i++) { + if (ULONG_CMP_LT(seq, rsclp->gp_seq[i])) + break; + rsclp->tails[RCU_DONE_TAIL] = rsclp->tails[i]; + } + + /* If no callbacks moved, nothing more need be done. */ + if (i == RCU_WAIT_TAIL) + return; + + /* Clean up tail pointers that might have been misordered above. */ + for (j = RCU_WAIT_TAIL; j < i; j++) + rsclp->tails[j] = rsclp->tails[RCU_DONE_TAIL]; + + /* + * Callbacks moved, so clean up the misordered ->tails[] pointers + * that now point into the middle of the list of ready-to-invoke + * callbacks. The overall effect is to copy down the later pointers + * into the gap that was created by the now-ready segments. + */ + for (j = RCU_WAIT_TAIL; i < RCU_NEXT_TAIL; i++, j++) { + if (rsclp->tails[j] == rsclp->tails[RCU_NEXT_TAIL]) + break; /* No more callbacks. */ + rsclp->tails[j] = rsclp->tails[i]; + rsclp->gp_seq[j] = rsclp->gp_seq[i]; + } +} + +/* + * "Accelerate" callbacks based on more-accurate grace-period information. + * The reason for this is that RCU does not synchronize the beginnings and + * ends of grace periods, and that callbacks are posted locally. This in + * turn means that the callbacks must be labelled conservatively early + * on, as getting exact information would degrade both performance and + * scalability. When more accurate grace-period information becomes + * available, previously posted callbacks can be "accelerated", marking + * them to complete at the end of the earlier grace period. + * + * This function operates on an rcu_segcblist structure, and also the + * grace-period sequence number at which new callbacks would become + * ready to invoke. + */ +static inline bool rcu_segcblist_accelerate(struct rcu_segcblist *rsclp, + unsigned long seq) +{ + int i; + + WARN_ON_ONCE(!rcu_segcblist_is_enabled(rsclp)); + WARN_ON_ONCE(rcu_segcblist_restempty(rsclp, RCU_DONE_TAIL)); + + /* + * Find the segment preceding the oldest segment of callbacks + * whose ->gp_seq[] completion is at or after that passed in via + * "seq", skipping any empty segments. This oldest segment, along + * with any later segments, can be merged in with any newly arrived + * callbacks in the RCU_NEXT_TAIL segment, and assigned "seq" + * as their ->gp_seq[] grace-period completion sequence number. + */ + for (i = RCU_NEXT_READY_TAIL; i > RCU_DONE_TAIL; i--) + if (rsclp->tails[i] != rsclp->tails[i - 1] && + ULONG_CMP_LT(rsclp->gp_seq[i], seq)) + break; + + /* + * If all the segments contain callbacks that correspond to + * earlier grace-period sequence numbers than "seq", leave. + * Assuming that the rcu_segcblist structure has enough + * segments in its arrays, this can only happen if some of + * the non-done segments contain callbacks that really are + * ready to invoke. This situation will get straightened + * out by the next call to rcu_segcblist_advance(). + * + * Also advance to the oldest segment of callbacks whose + * ->gp_seq[] completion is at or after that passed in via "seq", + * skipping any empty segments. + */ + if (++i >= RCU_NEXT_TAIL) + return false; + + /* + * Merge all later callbacks, including newly arrived callbacks, + * into the segment located by the for-loop above. Assign "seq" + * as the ->gp_seq[] value in order to correctly handle the case + * where there were no pending callbacks in the rcu_segcblist + * structure other than in the RCU_NEXT_TAIL segment. + */ + for (; i < RCU_NEXT_TAIL; i++) { + rsclp->tails[i] = rsclp->tails[RCU_NEXT_TAIL]; + rsclp->gp_seq[i] = seq; + } + return true; +} + +/* + * Scan the specified rcu_segcblist structure for callbacks that need + * a grace period later than the one specified by "seq". We don't look + * at the RCU_DONE_TAIL or RCU_NEXT_TAIL segments because they don't + * have a grace-period sequence number. + */ +static inline bool rcu_segcblist_future_gp_needed(struct rcu_segcblist *rsclp, + unsigned long seq) +{ + int i; + + for (i = RCU_WAIT_TAIL; i < RCU_NEXT_TAIL; i++) + if (rsclp->tails[i - 1] != rsclp->tails[i] && + ULONG_CMP_LT(seq, rsclp->gp_seq[i])) + return true; + return false; +} + +/* + * Interim function to return rcu_segcblist head pointer. Longer term, the + * rcu_segcblist will be used more pervasively, removing the need for this + * function. + */ +static inline struct rcu_head *rcu_segcblist_head(struct rcu_segcblist *rsclp) +{ + return rsclp->head; +} + +/* + * Interim function to return rcu_segcblist head pointer. Longer term, the + * rcu_segcblist will be used more pervasively, removing the need for this + * function. + */ +static inline struct rcu_head **rcu_segcblist_tail(struct rcu_segcblist *rsclp) +{ + WARN_ON_ONCE(rcu_segcblist_empty(rsclp)); + return rsclp->tails[RCU_NEXT_TAIL]; +} + +#endif /* __KERNEL_RCU_SEGCBLIST_H */ diff --git a/include/linux/srcu.h b/include/linux/srcu.h index 047ac8c28a4e..ad154a7bc114 100644 --- a/include/linux/srcu.h +++ b/include/linux/srcu.h @@ -22,7 +22,7 @@ * Lai Jiangshan * * For detailed explanation of Read-Copy Update mechanism see - - * Documentation/RCU/ *.txt + * Documentation/RCU/ *.txt * */ @@ -32,31 +32,20 @@ #include #include #include +#include struct srcu_array { unsigned long lock_count[2]; unsigned long unlock_count[2]; }; -struct rcu_batch { - struct rcu_head *head, **tail; -}; - -#define RCU_BATCH_INIT(name) { NULL, &(name.head) } - struct srcu_struct { unsigned long completed; unsigned long srcu_gp_seq; struct srcu_array __percpu *per_cpu_ref; - spinlock_t queue_lock; /* protect ->batch_queue, ->running */ + spinlock_t queue_lock; /* protect ->srcu_cblist, ->srcu_state */ int srcu_state; - /* callbacks just queued */ - struct rcu_batch batch_queue; - /* callbacks try to do the first check_zero */ - struct rcu_batch batch_check0; - /* callbacks done with the first check_zero and the flip */ - struct rcu_batch batch_check1; - struct rcu_batch batch_done; + struct rcu_segcblist srcu_cblist; struct delayed_work work; #ifdef CONFIG_DEBUG_LOCK_ALLOC struct lockdep_map dep_map; @@ -97,10 +86,7 @@ void process_srcu(struct work_struct *work); .per_cpu_ref = &name##_srcu_array, \ .queue_lock = __SPIN_LOCK_UNLOCKED(name.queue_lock), \ .srcu_state = SRCU_STATE_IDLE, \ - .batch_queue = RCU_BATCH_INIT(name.batch_queue), \ - .batch_check0 = RCU_BATCH_INIT(name.batch_check0), \ - .batch_check1 = RCU_BATCH_INIT(name.batch_check1), \ - .batch_done = RCU_BATCH_INIT(name.batch_done), \ + .srcu_cblist = RCU_SEGCBLIST_INITIALIZER(name.srcu_cblist),\ .work = __DELAYED_WORK_INITIALIZER(name.work, process_srcu, 0),\ __SRCU_DEP_MAP_INIT(name) \ } diff --git a/kernel/rcu/rcu.h b/kernel/rcu/rcu.h index 0bc1313c49e2..a943b42a9cf7 100644 --- a/kernel/rcu/rcu.h +++ b/kernel/rcu/rcu.h @@ -87,6 +87,12 @@ static inline unsigned long rcu_seq_snap(unsigned long *sp) return s; } +/* Return the current value the update side's sequence number, no ordering. */ +static inline unsigned long rcu_seq_current(unsigned long *sp) +{ + return READ_ONCE(*sp); +} + /* * Given a snapshot from rcu_seq_snap(), determine whether or not a * full update-side operation has occurred. diff --git a/kernel/rcu/rcu_segcblist.h b/kernel/rcu/rcu_segcblist.h deleted file mode 100644 index 982e3e05b22a..000000000000 --- a/kernel/rcu/rcu_segcblist.h +++ /dev/null @@ -1,670 +0,0 @@ -/* - * RCU segmented callback lists - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 2 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, you can access it online at - * http://www.gnu.org/licenses/gpl-2.0.html. - * - * Copyright IBM Corporation, 2017 - * - * Authors: Paul E. McKenney - */ - -#ifndef __KERNEL_RCU_SEGCBLIST_H -#define __KERNEL_RCU_SEGCBLIST_H - -/* Simple unsegmented callback lists. */ -struct rcu_cblist { - struct rcu_head *head; - struct rcu_head **tail; - long len; - long len_lazy; -}; - -#define RCU_CBLIST_INITIALIZER(n) { .head = NULL, .tail = &n.head } - -/* Initialize simple callback list. */ -static inline void rcu_cblist_init(struct rcu_cblist *rclp) -{ - rclp->head = NULL; - rclp->tail = &rclp->head; - rclp->len = 0; - rclp->len_lazy = 0; -} - -/* Is simple callback list empty? */ -static inline bool rcu_cblist_empty(struct rcu_cblist *rclp) -{ - return !rclp->head; -} - -/* Return number of callbacks in simple callback list. */ -static inline long rcu_cblist_n_cbs(struct rcu_cblist *rclp) -{ - return rclp->len; -} - -/* Return number of lazy callbacks in simple callback list. */ -static inline long rcu_cblist_n_lazy_cbs(struct rcu_cblist *rclp) -{ - return rclp->len_lazy; -} - -/* - * Debug function to actually count the number of callbacks. - * If the number exceeds the limit specified, return -1. - */ -static inline long rcu_cblist_count_cbs(struct rcu_cblist *rclp, long lim) -{ - int cnt = 0; - struct rcu_head **rhpp = &rclp->head; - - for (;;) { - if (!*rhpp) - return cnt; - if (++cnt > lim) - return -1; - rhpp = &(*rhpp)->next; - } -} - -/* - * Dequeue the oldest rcu_head structure from the specified callback - * list. This function assumes that the callback is non-lazy, but - * the caller can later invoke rcu_cblist_dequeued_lazy() if it - * finds otherwise (and if it cares about laziness). This allows - * different users to have different ways of determining laziness. - */ -static inline struct rcu_head *rcu_cblist_dequeue(struct rcu_cblist *rclp) -{ - struct rcu_head *rhp; - - rhp = rclp->head; - if (!rhp) - return NULL; - prefetch(rhp); - rclp->len--; - rclp->head = rhp->next; - if (!rclp->head) - rclp->tail = &rclp->head; - return rhp; -} - -/* - * Account for the fact that a previously dequeued callback turned out - * to be marked as lazy. - */ -static inline void rcu_cblist_dequeued_lazy(struct rcu_cblist *rclp) -{ - rclp->len_lazy--; -} - -/* - * Interim function to return rcu_cblist head pointer. Longer term, the - * rcu_cblist will be used more pervasively, removing the need for this - * function. - */ -static inline struct rcu_head *rcu_cblist_head(struct rcu_cblist *rclp) -{ - return rclp->head; -} - -/* - * Interim function to return rcu_cblist head pointer. Longer term, the - * rcu_cblist will be used more pervasively, removing the need for this - * function. - */ -static inline struct rcu_head **rcu_cblist_tail(struct rcu_cblist *rclp) -{ - WARN_ON_ONCE(rcu_cblist_empty(rclp)); - return rclp->tail; -} - -/* Complicated segmented callback lists. ;-) */ - -/* - * Index values for segments in rcu_segcblist structure. - * - * The segments are as follows: - * - * [head, *tails[RCU_DONE_TAIL]): - * Callbacks whose grace period has elapsed, and thus can be invoked. - * [*tails[RCU_DONE_TAIL], *tails[RCU_WAIT_TAIL]): - * Callbacks waiting for the current GP from the current CPU's viewpoint. - * [*tails[RCU_WAIT_TAIL], *tails[RCU_NEXT_READY_TAIL]): - * Callbacks that arrived before the next GP started, again from - * the current CPU's viewpoint. These can be handled by the next GP. - * [*tails[RCU_NEXT_READY_TAIL], *tails[RCU_NEXT_TAIL]): - * Callbacks that might have arrived after the next GP started. - * There is some uncertainty as to when a given GP starts and - * ends, but a CPU knows the exact times if it is the one starting - * or ending the GP. Other CPUs know that the previous GP ends - * before the next one starts. - * - * Note that RCU_WAIT_TAIL cannot be empty unless RCU_NEXT_READY_TAIL is also - * empty. - * - * The ->gp_seq[] array contains the grace-period number at which the - * corresponding segment of callbacks will be ready to invoke. A given - * element of this array is meaningful only when the corresponding segment - * is non-empty, and it is never valid for RCU_DONE_TAIL (whose callbacks - * are already ready to invoke) or for RCU_NEXT_TAIL (whose callbacks have - * not yet been assigned a grace-period number). - */ -#define RCU_DONE_TAIL 0 /* Also RCU_WAIT head. */ -#define RCU_WAIT_TAIL 1 /* Also RCU_NEXT_READY head. */ -#define RCU_NEXT_READY_TAIL 2 /* Also RCU_NEXT head. */ -#define RCU_NEXT_TAIL 3 -#define RCU_CBLIST_NSEGS 4 - -struct rcu_segcblist { - struct rcu_head *head; - struct rcu_head **tails[RCU_CBLIST_NSEGS]; - unsigned long gp_seq[RCU_CBLIST_NSEGS]; - long len; - long len_lazy; -}; - -/* - * Initialize an rcu_segcblist structure. - */ -static inline void rcu_segcblist_init(struct rcu_segcblist *rsclp) -{ - int i; - - BUILD_BUG_ON(RCU_NEXT_TAIL + 1 != ARRAY_SIZE(rsclp->gp_seq)); - BUILD_BUG_ON(ARRAY_SIZE(rsclp->tails) != ARRAY_SIZE(rsclp->gp_seq)); - rsclp->head = NULL; - for (i = 0; i < RCU_CBLIST_NSEGS; i++) - rsclp->tails[i] = &rsclp->head; - rsclp->len = 0; - rsclp->len_lazy = 0; -} - -/* - * Is the specified rcu_segcblist structure empty? - * - * But careful! The fact that the ->head field is NULL does not - * necessarily imply that there are no callbacks associated with - * this structure. When callbacks are being invoked, they are - * removed as a group. If callback invocation must be preempted, - * the remaining callbacks will be added back to the list. Either - * way, the counts are updated later. - * - * So it is often the case that rcu_segcblist_n_cbs() should be used - * instead. - */ -static inline bool rcu_segcblist_empty(struct rcu_segcblist *rsclp) -{ - return !rsclp->head; -} - -/* Return number of callbacks in segmented callback list. */ -static inline long rcu_segcblist_n_cbs(struct rcu_segcblist *rsclp) -{ - return READ_ONCE(rsclp->len); -} - -/* Return number of lazy callbacks in segmented callback list. */ -static inline long rcu_segcblist_n_lazy_cbs(struct rcu_segcblist *rsclp) -{ - return rsclp->len_lazy; -} - -/* Return number of lazy callbacks in segmented callback list. */ -static inline long rcu_segcblist_n_nonlazy_cbs(struct rcu_segcblist *rsclp) -{ - return rsclp->len - rsclp->len_lazy; -} - -/* - * Is the specified rcu_segcblist enabled, for example, not corresponding - * to an offline or callback-offloaded CPU? - */ -static inline bool rcu_segcblist_is_enabled(struct rcu_segcblist *rsclp) -{ - return !!rsclp->tails[RCU_NEXT_TAIL]; -} - -/* - * Disable the specified rcu_segcblist structure, so that callbacks can - * no longer be posted to it. This structure must be empty. - */ -static inline void rcu_segcblist_disable(struct rcu_segcblist *rsclp) -{ - WARN_ON_ONCE(!rcu_segcblist_empty(rsclp)); - WARN_ON_ONCE(rcu_segcblist_n_cbs(rsclp)); - WARN_ON_ONCE(rcu_segcblist_n_lazy_cbs(rsclp)); - rsclp->tails[RCU_NEXT_TAIL] = NULL; -} - -/* - * Is the specified segment of the specified rcu_segcblist structure - * empty of callbacks? - */ -static inline bool rcu_segcblist_segempty(struct rcu_segcblist *rsclp, int seg) -{ - if (seg == RCU_DONE_TAIL) - return &rsclp->head == rsclp->tails[RCU_DONE_TAIL]; - return rsclp->tails[seg - 1] == rsclp->tails[seg]; -} - -/* - * Are all segments following the specified segment of the specified - * rcu_segcblist structure empty of callbacks? (The specified - * segment might well contain callbacks.) - */ -static inline bool rcu_segcblist_restempty(struct rcu_segcblist *rsclp, int seg) -{ - return !*rsclp->tails[seg]; -} - -/* - * Does the specified rcu_segcblist structure contain callbacks that - * are ready to be invoked? - */ -static inline bool rcu_segcblist_ready_cbs(struct rcu_segcblist *rsclp) -{ - return rcu_segcblist_is_enabled(rsclp) && - &rsclp->head != rsclp->tails[RCU_DONE_TAIL]; -} - -/* - * Does the specified rcu_segcblist structure contain callbacks that - * are still pending, that is, not yet ready to be invoked? - */ -static inline bool rcu_segcblist_pend_cbs(struct rcu_segcblist *rsclp) -{ - return rcu_segcblist_is_enabled(rsclp) && - !rcu_segcblist_restempty(rsclp, RCU_DONE_TAIL); -} - -/* - * Dequeue and return the first ready-to-invoke callback. If there - * are no ready-to-invoke callbacks, return NULL. Disables interrupts - * to avoid interference. Does not protect from interference from other - * CPUs or tasks. - */ -static inline struct rcu_head * -rcu_segcblist_dequeue(struct rcu_segcblist *rsclp) -{ - unsigned long flags; - int i; - struct rcu_head *rhp; - - local_irq_save(flags); - if (!rcu_segcblist_ready_cbs(rsclp)) { - local_irq_restore(flags); - return NULL; - } - rhp = rsclp->head; - BUG_ON(!rhp); - rsclp->head = rhp->next; - for (i = RCU_DONE_TAIL; i < RCU_CBLIST_NSEGS; i++) { - if (rsclp->tails[i] != &rhp->next) - break; - rsclp->tails[i] = &rsclp->head; - } - smp_mb(); /* Dequeue before decrement for rcu_barrier(). */ - WRITE_ONCE(rsclp->len, rsclp->len - 1); - local_irq_restore(flags); - return rhp; -} - -/* - * Account for the fact that a previously dequeued callback turned out - * to be marked as lazy. - */ -static inline void rcu_segcblist_dequeued_lazy(struct rcu_segcblist *rsclp) -{ - unsigned long flags; - - local_irq_save(flags); - rsclp->len_lazy--; - local_irq_restore(flags); -} - -/* - * Return a pointer to the first callback in the specified rcu_segcblist - * structure. This is useful for diagnostics. - */ -static inline struct rcu_head * -rcu_segcblist_first_cb(struct rcu_segcblist *rsclp) -{ - if (rcu_segcblist_is_enabled(rsclp)) - return rsclp->head; - return NULL; -} - -/* - * Return a pointer to the first pending callback in the specified - * rcu_segcblist structure. This is useful just after posting a given - * callback -- if that callback is the first pending callback, then - * you cannot rely on someone else having already started up the required - * grace period. - */ -static inline struct rcu_head * -rcu_segcblist_first_pend_cb(struct rcu_segcblist *rsclp) -{ - if (rcu_segcblist_is_enabled(rsclp)) - return *rsclp->tails[RCU_DONE_TAIL]; - return NULL; -} - -/* - * Does the specified rcu_segcblist structure contain callbacks that - * have not yet been processed beyond having been posted, that is, - * does it contain callbacks in its last segment? - */ -static inline bool rcu_segcblist_new_cbs(struct rcu_segcblist *rsclp) -{ - return rcu_segcblist_is_enabled(rsclp) && - !rcu_segcblist_restempty(rsclp, RCU_NEXT_READY_TAIL); -} - -/* - * Enqueue the specified callback onto the specified rcu_segcblist - * structure, updating accounting as needed. Note that the ->len - * field may be accessed locklessly, hence the WRITE_ONCE(). - * The ->len field is used by rcu_barrier() and friends to determine - * if it must post a callback on this structure, and it is OK - * for rcu_barrier() to sometimes post callbacks needlessly, but - * absolutely not OK for it to ever miss posting a callback. - */ -static inline void rcu_segcblist_enqueue(struct rcu_segcblist *rsclp, - struct rcu_head *rhp, bool lazy) -{ - WRITE_ONCE(rsclp->len, rsclp->len + 1); /* ->len sampled locklessly. */ - if (lazy) - rsclp->len_lazy++; - smp_mb(); /* Ensure counts are updated before callback is enqueued. */ - rhp->next = NULL; - *rsclp->tails[RCU_NEXT_TAIL] = rhp; - rsclp->tails[RCU_NEXT_TAIL] = &rhp->next; -} - -/* - * Extract only the counts from the specified rcu_segcblist structure, - * and place them in the specified rcu_cblist structure. This function - * supports both callback orphaning and invocation, hence the separation - * of counts and callbacks. (Callbacks ready for invocation must be - * orphaned and adopted separately from pending callbacks, but counts - * apply to all callbacks. Locking must be used to make sure that - * both orphaned-callbacks lists are consistent.) - */ -static inline void rcu_segcblist_extract_count(struct rcu_segcblist *rsclp, - struct rcu_cblist *rclp) -{ - rclp->len_lazy += rsclp->len_lazy; - rclp->len += rsclp->len; - rsclp->len_lazy = 0; - WRITE_ONCE(rsclp->len, 0); /* ->len sampled locklessly. */ -} - -/* - * Extract only those callbacks ready to be invoked from the specified - * rcu_segcblist structure and place them in the specified rcu_cblist - * structure. - */ -static inline void rcu_segcblist_extract_done_cbs(struct rcu_segcblist *rsclp, - struct rcu_cblist *rclp) -{ - int i; - - if (!rcu_segcblist_ready_cbs(rsclp)) - return; /* Nothing to do. */ - *rclp->tail = rsclp->head; - rsclp->head = *rsclp->tails[RCU_DONE_TAIL]; - *rsclp->tails[RCU_DONE_TAIL] = NULL; - rclp->tail = rsclp->tails[RCU_DONE_TAIL]; - for (i = RCU_CBLIST_NSEGS - 1; i >= RCU_DONE_TAIL; i--) - if (rsclp->tails[i] == rsclp->tails[RCU_DONE_TAIL]) - rsclp->tails[i] = &rsclp->head; -} - -/* - * Extract only those callbacks still pending (not yet ready to be - * invoked) from the specified rcu_segcblist structure and place them in - * the specified rcu_cblist structure. Note that this loses information - * about any callbacks that might have been partway done waiting for - * their grace period. Too bad! They will have to start over. - */ -static inline void -rcu_segcblist_extract_pend_cbs(struct rcu_segcblist *rsclp, - struct rcu_cblist *rclp) -{ - int i; - - if (!rcu_segcblist_pend_cbs(rsclp)) - return; /* Nothing to do. */ - *rclp->tail = *rsclp->tails[RCU_DONE_TAIL]; - rclp->tail = rsclp->tails[RCU_NEXT_TAIL]; - *rsclp->tails[RCU_DONE_TAIL] = NULL; - for (i = RCU_DONE_TAIL + 1; i < RCU_CBLIST_NSEGS; i++) - rsclp->tails[i] = rsclp->tails[RCU_DONE_TAIL]; -} - -/* - * Move the entire contents of the specified rcu_segcblist structure, - * counts, callbacks, and all, to the specified rcu_cblist structure. - * @@@ Why do we need this??? Moving early-boot CBs to NOCB lists? - * @@@ Memory barrier needed? (Not if only used at boot time...) - */ -static inline void rcu_segcblist_extract_all(struct rcu_segcblist *rsclp, - struct rcu_cblist *rclp) -{ - rcu_segcblist_extract_done_cbs(rsclp, rclp); - rcu_segcblist_extract_pend_cbs(rsclp, rclp); - rcu_segcblist_extract_count(rsclp, rclp); -} - -/* - * Insert counts from the specified rcu_cblist structure in the - * specified rcu_segcblist structure. - */ -static inline void rcu_segcblist_insert_count(struct rcu_segcblist *rsclp, - struct rcu_cblist *rclp) -{ - rsclp->len_lazy += rclp->len_lazy; - /* ->len sampled locklessly. */ - WRITE_ONCE(rsclp->len, rsclp->len + rclp->len); - rclp->len_lazy = 0; - rclp->len = 0; -} - -/* - * Move callbacks from the specified rcu_cblist to the beginning of the - * done-callbacks segment of the specified rcu_segcblist. - */ -static inline void rcu_segcblist_insert_done_cbs(struct rcu_segcblist *rsclp, - struct rcu_cblist *rclp) -{ - int i; - - if (!rclp->head) - return; /* No callbacks to move. */ - *rclp->tail = rsclp->head; - rsclp->head = rclp->head; - for (i = RCU_DONE_TAIL; i < RCU_CBLIST_NSEGS; i++) - if (&rsclp->head == rsclp->tails[i]) - rsclp->tails[i] = rclp->tail; - else - break; - rclp->head = NULL; - rclp->tail = &rclp->head; -} - -/* - * Move callbacks from the specified rcu_cblist to the end of the - * new-callbacks segment of the specified rcu_segcblist. - */ -static inline void rcu_segcblist_insert_pend_cbs(struct rcu_segcblist *rsclp, - struct rcu_cblist *rclp) -{ - if (!rclp->head) - return; /* Nothing to do. */ - *rsclp->tails[RCU_NEXT_TAIL] = rclp->head; - rsclp->tails[RCU_NEXT_TAIL] = rclp->tail; - rclp->head = NULL; - rclp->tail = &rclp->head; -} - -/* - * Advance the callbacks in the specified rcu_segcblist structure based - * on the current value passed in for the grace-period counter. - */ -static inline void rcu_segcblist_advance(struct rcu_segcblist *rsclp, - unsigned long seq) -{ - int i, j; - - WARN_ON_ONCE(!rcu_segcblist_is_enabled(rsclp)); - WARN_ON_ONCE(rcu_segcblist_restempty(rsclp, RCU_DONE_TAIL)); - - /* - * Find all callbacks whose ->gp_seq numbers indicate that they - * are ready to invoke, and put them into the RCU_DONE_TAIL segment. - */ - for (i = RCU_WAIT_TAIL; i < RCU_NEXT_TAIL; i++) { - if (ULONG_CMP_LT(seq, rsclp->gp_seq[i])) - break; - rsclp->tails[RCU_DONE_TAIL] = rsclp->tails[i]; - } - - /* If no callbacks moved, nothing more need be done. */ - if (i == RCU_WAIT_TAIL) - return; - - /* Clean up tail pointers that might have been misordered above. */ - for (j = RCU_WAIT_TAIL; j < i; j++) - rsclp->tails[j] = rsclp->tails[RCU_DONE_TAIL]; - - /* - * Callbacks moved, so clean up the misordered ->tails[] pointers - * that now point into the middle of the list of ready-to-invoke - * callbacks. The overall effect is to copy down the later pointers - * into the gap that was created by the now-ready segments. - */ - for (j = RCU_WAIT_TAIL; i < RCU_NEXT_TAIL; i++, j++) { - if (rsclp->tails[j] == rsclp->tails[RCU_NEXT_TAIL]) - break; /* No more callbacks. */ - rsclp->tails[j] = rsclp->tails[i]; - rsclp->gp_seq[j] = rsclp->gp_seq[i]; - } -} - -/* - * "Accelerate" callbacks based on more-accurate grace-period information. - * The reason for this is that RCU does not synchronize the beginnings and - * ends of grace periods, and that callbacks are posted locally. This in - * turn means that the callbacks must be labelled conservatively early - * on, as getting exact information would degrade both performance and - * scalability. When more accurate grace-period information becomes - * available, previously posted callbacks can be "accelerated", marking - * them to complete at the end of the earlier grace period. - * - * This function operates on an rcu_segcblist structure, and also the - * grace-period sequence number at which new callbacks would become - * ready to invoke. - */ -static inline bool rcu_segcblist_accelerate(struct rcu_segcblist *rsclp, - unsigned long seq) -{ - int i; - - WARN_ON_ONCE(!rcu_segcblist_is_enabled(rsclp)); - WARN_ON_ONCE(rcu_segcblist_restempty(rsclp, RCU_DONE_TAIL)); - - /* - * Find the segment preceding the oldest segment of callbacks - * whose ->gp_seq[] completion is at or after that passed in via - * "seq", skipping any empty segments. This oldest segment, along - * with any later segments, can be merged in with any newly arrived - * callbacks in the RCU_NEXT_TAIL segment, and assigned "seq" - * as their ->gp_seq[] grace-period completion sequence number. - */ - for (i = RCU_NEXT_READY_TAIL; i > RCU_DONE_TAIL; i--) - if (rsclp->tails[i] != rsclp->tails[i - 1] && - ULONG_CMP_LT(rsclp->gp_seq[i], seq)) - break; - - /* - * If all the segments contain callbacks that correspond to - * earlier grace-period sequence numbers than "seq", leave. - * Assuming that the rcu_segcblist structure has enough - * segments in its arrays, this can only happen if some of - * the non-done segments contain callbacks that really are - * ready to invoke. This situation will get straightened - * out by the next call to rcu_segcblist_advance(). - * - * Also advance to the oldest segment of callbacks whose - * ->gp_seq[] completion is at or after that passed in via "seq", - * skipping any empty segments. - */ - if (++i >= RCU_NEXT_TAIL) - return false; - - /* - * Merge all later callbacks, including newly arrived callbacks, - * into the segment located by the for-loop above. Assign "seq" - * as the ->gp_seq[] value in order to correctly handle the case - * where there were no pending callbacks in the rcu_segcblist - * structure other than in the RCU_NEXT_TAIL segment. - */ - for (; i < RCU_NEXT_TAIL; i++) { - rsclp->tails[i] = rsclp->tails[RCU_NEXT_TAIL]; - rsclp->gp_seq[i] = seq; - } - return true; -} - -/* - * Scan the specified rcu_segcblist structure for callbacks that need - * a grace period later than the one specified by "seq". We don't look - * at the RCU_DONE_TAIL or RCU_NEXT_TAIL segments because they don't - * have a grace-period sequence number. - */ -static inline bool rcu_segcblist_future_gp_needed(struct rcu_segcblist *rsclp, - unsigned long seq) -{ - int i; - - for (i = RCU_WAIT_TAIL; i < RCU_NEXT_TAIL; i++) - if (rsclp->tails[i - 1] != rsclp->tails[i] && - ULONG_CMP_LT(seq, rsclp->gp_seq[i])) - return true; - return false; -} - -/* - * Interim function to return rcu_segcblist head pointer. Longer term, the - * rcu_segcblist will be used more pervasively, removing the need for this - * function. - */ -static inline struct rcu_head *rcu_segcblist_head(struct rcu_segcblist *rsclp) -{ - return rsclp->head; -} - -/* - * Interim function to return rcu_segcblist head pointer. Longer term, the - * rcu_segcblist will be used more pervasively, removing the need for this - * function. - */ -static inline struct rcu_head **rcu_segcblist_tail(struct rcu_segcblist *rsclp) -{ - WARN_ON_ONCE(rcu_segcblist_empty(rsclp)); - return rsclp->tails[RCU_NEXT_TAIL]; -} - -#endif /* __KERNEL_RCU_SEGCBLIST_H */ diff --git a/kernel/rcu/srcu.c b/kernel/rcu/srcu.c index d464986d82b6..56fd30862122 100644 --- a/kernel/rcu/srcu.c +++ b/kernel/rcu/srcu.c @@ -22,7 +22,7 @@ * Lai Jiangshan * * For detailed explanation of Read-Copy Update mechanism see - - * Documentation/RCU/ *.txt + * Documentation/RCU/ *.txt * */ @@ -38,85 +38,13 @@ #include "rcu.h" -/* - * Initialize an rcu_batch structure to empty. - */ -static inline void rcu_batch_init(struct rcu_batch *b) -{ - b->head = NULL; - b->tail = &b->head; -} - -/* - * Enqueue a callback onto the tail of the specified rcu_batch structure. - */ -static inline void rcu_batch_queue(struct rcu_batch *b, struct rcu_head *head) -{ - *b->tail = head; - b->tail = &head->next; -} - -/* - * Is the specified rcu_batch structure empty? - */ -static inline bool rcu_batch_empty(struct rcu_batch *b) -{ - return b->tail == &b->head; -} - -/* - * Are all batches empty for the specified srcu_struct? - */ -static inline bool rcu_all_batches_empty(struct srcu_struct *sp) -{ - return rcu_batch_empty(&sp->batch_done) && - rcu_batch_empty(&sp->batch_check1) && - rcu_batch_empty(&sp->batch_check0) && - rcu_batch_empty(&sp->batch_queue); -} - -/* - * Remove the callback at the head of the specified rcu_batch structure - * and return a pointer to it, or return NULL if the structure is empty. - */ -static inline struct rcu_head *rcu_batch_dequeue(struct rcu_batch *b) -{ - struct rcu_head *head; - - if (rcu_batch_empty(b)) - return NULL; - - head = b->head; - b->head = head->next; - if (b->tail == &head->next) - rcu_batch_init(b); - - return head; -} - -/* - * Move all callbacks from the rcu_batch structure specified by "from" to - * the structure specified by "to". - */ -static inline void rcu_batch_move(struct rcu_batch *to, struct rcu_batch *from) -{ - if (!rcu_batch_empty(from)) { - *to->tail = from->head; - to->tail = from->tail; - rcu_batch_init(from); - } -} - static int init_srcu_struct_fields(struct srcu_struct *sp) { sp->completed = 0; sp->srcu_gp_seq = 0; spin_lock_init(&sp->queue_lock); sp->srcu_state = SRCU_STATE_IDLE; - rcu_batch_init(&sp->batch_queue); - rcu_batch_init(&sp->batch_check0); - rcu_batch_init(&sp->batch_check1); - rcu_batch_init(&sp->batch_done); + rcu_segcblist_init(&sp->srcu_cblist); INIT_DELAYED_WORK(&sp->work, process_srcu); sp->per_cpu_ref = alloc_percpu(struct srcu_array); return sp->per_cpu_ref ? 0 : -ENOMEM; @@ -268,7 +196,7 @@ void cleanup_srcu_struct(struct srcu_struct *sp) { if (WARN_ON(srcu_readers_active(sp))) return; /* Leakage unless caller handles error. */ - if (WARN_ON(!rcu_all_batches_empty(sp))) + if (WARN_ON(!rcu_segcblist_empty(&sp->srcu_cblist))) return; /* Leakage unless caller handles error. */ flush_delayed_work(&sp->work); if (WARN_ON(READ_ONCE(sp->srcu_state) != SRCU_STATE_IDLE)) @@ -324,6 +252,8 @@ EXPORT_SYMBOL_GPL(__srcu_read_unlock); */ static void srcu_gp_start(struct srcu_struct *sp) { + rcu_segcblist_accelerate(&sp->srcu_cblist, + rcu_seq_snap(&sp->srcu_gp_seq)); WRITE_ONCE(sp->srcu_state, SRCU_STATE_SCAN1); rcu_seq_start(&sp->srcu_gp_seq); } @@ -371,6 +301,11 @@ static void srcu_gp_end(struct srcu_struct *sp) { rcu_seq_end(&sp->srcu_gp_seq); WRITE_ONCE(sp->srcu_state, SRCU_STATE_DONE); + + spin_lock_irq(&sp->queue_lock); + rcu_segcblist_advance(&sp->srcu_cblist, + rcu_seq_current(&sp->srcu_gp_seq)); + spin_unlock_irq(&sp->queue_lock); } /* @@ -409,7 +344,7 @@ void call_srcu(struct srcu_struct *sp, struct rcu_head *head, head->func = func; spin_lock_irqsave(&sp->queue_lock, flags); smp_mb__after_unlock_lock(); /* Caller's prior accesses before GP. */ - rcu_batch_queue(&sp->batch_queue, head); + rcu_segcblist_enqueue(&sp->srcu_cblist, head, false); if (READ_ONCE(sp->srcu_state) == SRCU_STATE_IDLE) { srcu_gp_start(sp); queue_delayed_work(system_power_efficient_wq, &sp->work, 0); @@ -445,13 +380,13 @@ static void __synchronize_srcu(struct srcu_struct *sp, int trycount) smp_mb__after_unlock_lock(); /* Caller's prior accesses before GP. */ if (READ_ONCE(sp->srcu_state) == SRCU_STATE_IDLE) { /* steal the processing owner */ + rcu_segcblist_enqueue(&sp->srcu_cblist, head, false); srcu_gp_start(sp); - rcu_batch_queue(&sp->batch_check0, head); spin_unlock_irq(&sp->queue_lock); /* give the processing owner to work_struct */ srcu_reschedule(sp, 0); } else { - rcu_batch_queue(&sp->batch_queue, head); + rcu_segcblist_enqueue(&sp->srcu_cblist, head, false); spin_unlock_irq(&sp->queue_lock); } @@ -548,19 +483,6 @@ EXPORT_SYMBOL_GPL(srcu_batches_completed); #define SRCU_CALLBACK_BATCH 10 #define SRCU_INTERVAL 1 -/* - * Move any new SRCU callbacks to the first stage of the SRCU grace - * period pipeline. - */ -static void srcu_collect_new(struct srcu_struct *sp) -{ - if (!rcu_batch_empty(&sp->batch_queue)) { - spin_lock_irq(&sp->queue_lock); - rcu_batch_move(&sp->batch_check0, &sp->batch_queue); - spin_unlock_irq(&sp->queue_lock); - } -} - /* * Core SRCU state machine. Advance callbacks from ->batch_check0 to * ->batch_check1 and then to ->batch_done as readers drain. @@ -586,26 +508,7 @@ static void srcu_advance_batches(struct srcu_struct *sp, int trycount) idx = 1 ^ (sp->completed & 1); if (!try_check_zero(sp, idx, trycount)) return; /* readers present, retry after SRCU_INTERVAL */ - - /* - * The callbacks in ->batch_check1 have already done - * with their first zero check and flip back when they were - * enqueued on ->batch_check0 in a previous invocation of - * srcu_advance_batches(). (Presumably try_check_zero() - * returned false during that invocation, leaving the - * callbacks stranded on ->batch_check1.) They are therefore - * ready to invoke, so move them to ->batch_done. - */ - rcu_batch_move(&sp->batch_done, &sp->batch_check1); srcu_flip(sp); - - /* - * The callbacks in ->batch_check0 just finished their - * first check zero and flip, so move them to ->batch_check1 - * for future checking on the other idx. - */ - rcu_batch_move(&sp->batch_check1, &sp->batch_check0); - WRITE_ONCE(sp->srcu_state, SRCU_STATE_SCAN2); } @@ -619,14 +522,6 @@ static void srcu_advance_batches(struct srcu_struct *sp, int trycount) trycount = trycount < 2 ? 2 : trycount; if (!try_check_zero(sp, idx, trycount)) return; /* readers present, retry after SRCU_INTERVAL */ - - /* - * The callbacks in ->batch_check1 have now waited for - * all pre-existing readers using both idx values. They are - * therefore ready to invoke, so move them to ->batch_done. - */ - rcu_batch_move(&sp->batch_done, &sp->batch_check1); - srcu_gp_end(sp); } } @@ -639,17 +534,26 @@ static void srcu_advance_batches(struct srcu_struct *sp, int trycount) */ static void srcu_invoke_callbacks(struct srcu_struct *sp) { - int i; - struct rcu_head *head; + struct rcu_cblist ready_cbs; + struct rcu_head *rhp; - for (i = 0; i < SRCU_CALLBACK_BATCH; i++) { - head = rcu_batch_dequeue(&sp->batch_done); - if (!head) - break; + spin_lock_irq(&sp->queue_lock); + if (!rcu_segcblist_ready_cbs(&sp->srcu_cblist)) { + spin_unlock_irq(&sp->queue_lock); + return; + } + rcu_cblist_init(&ready_cbs); + rcu_segcblist_extract_done_cbs(&sp->srcu_cblist, &ready_cbs); + spin_unlock_irq(&sp->queue_lock); + rhp = rcu_cblist_dequeue(&ready_cbs); + for (; rhp != NULL; rhp = rcu_cblist_dequeue(&ready_cbs)) { local_bh_disable(); - head->func(head); + rhp->func(rhp); local_bh_enable(); } + spin_lock_irq(&sp->queue_lock); + rcu_segcblist_insert_count(&sp->srcu_cblist, &ready_cbs); + spin_unlock_irq(&sp->queue_lock); } /* @@ -660,9 +564,9 @@ static void srcu_reschedule(struct srcu_struct *sp, unsigned long delay) { bool pending = true; - if (rcu_all_batches_empty(sp)) { + if (rcu_segcblist_empty(&sp->srcu_cblist)) { spin_lock_irq(&sp->queue_lock); - if (rcu_all_batches_empty(sp) && + if (rcu_segcblist_empty(&sp->srcu_cblist) && READ_ONCE(sp->srcu_state) == SRCU_STATE_DONE) { WRITE_ONCE(sp->srcu_state, SRCU_STATE_IDLE); pending = false; @@ -683,7 +587,6 @@ void process_srcu(struct work_struct *work) sp = container_of(work, struct srcu_struct, work.work); - srcu_collect_new(sp); srcu_advance_batches(sp, 1); srcu_invoke_callbacks(sp); srcu_reschedule(sp, SRCU_INTERVAL); diff --git a/kernel/rcu/tree.h b/kernel/rcu/tree.h index 93889ff21dbb..4f62651588ea 100644 --- a/kernel/rcu/tree.h +++ b/kernel/rcu/tree.h @@ -30,7 +30,7 @@ #include #include #include -#include "rcu_segcblist.h" +#include /* * Define shape of hierarchy based on NR_CPUS, CONFIG_RCU_FANOUT, and -- cgit v1.2.3 From 80a7956fe36c2ee40c6ff12c77926d267802b7c8 Mon Sep 17 00:00:00 2001 From: "Paul E. McKenney" Date: Wed, 22 Mar 2017 15:26:18 -0700 Subject: srcu: Merge ->srcu_state into ->srcu_gp_seq Updating ->srcu_state and ->srcu_gp_seq will lead to extremely complex race conditions given multiple callback queues, so this commit takes advantage of the two-bit state now available in rcu_seq counters to store the state in the bottom two bits of ->srcu_gp_seq. Signed-off-by: Paul E. McKenney --- include/linux/srcu.h | 5 +---- kernel/rcu/rcu.h | 10 ++++++++++ kernel/rcu/srcu.c | 55 +++++++++++++++++++++++++++++++++------------------- 3 files changed, 46 insertions(+), 24 deletions(-) (limited to 'include/linux/srcu.h') diff --git a/include/linux/srcu.h b/include/linux/srcu.h index ad154a7bc114..e7dbc01b61a1 100644 --- a/include/linux/srcu.h +++ b/include/linux/srcu.h @@ -43,8 +43,7 @@ struct srcu_struct { unsigned long completed; unsigned long srcu_gp_seq; struct srcu_array __percpu *per_cpu_ref; - spinlock_t queue_lock; /* protect ->srcu_cblist, ->srcu_state */ - int srcu_state; + spinlock_t queue_lock; /* protect ->srcu_cblist */ struct rcu_segcblist srcu_cblist; struct delayed_work work; #ifdef CONFIG_DEBUG_LOCK_ALLOC @@ -56,7 +55,6 @@ struct srcu_struct { #define SRCU_STATE_IDLE 0 #define SRCU_STATE_SCAN1 1 #define SRCU_STATE_SCAN2 2 -#define SRCU_STATE_DONE 3 #ifdef CONFIG_DEBUG_LOCK_ALLOC @@ -85,7 +83,6 @@ void process_srcu(struct work_struct *work); .completed = -300, \ .per_cpu_ref = &name##_srcu_array, \ .queue_lock = __SPIN_LOCK_UNLOCKED(name.queue_lock), \ - .srcu_state = SRCU_STATE_IDLE, \ .srcu_cblist = RCU_SEGCBLIST_INITIALIZER(name.srcu_cblist),\ .work = __DELAYED_WORK_INITIALIZER(name.work, process_srcu, 0),\ __SRCU_DEP_MAP_INIT(name) \ diff --git a/kernel/rcu/rcu.h b/kernel/rcu/rcu.h index 87a0ac95b551..73e16ec4054b 100644 --- a/kernel/rcu/rcu.h +++ b/kernel/rcu/rcu.h @@ -82,6 +82,16 @@ static inline int rcu_seq_state(unsigned long s) return s & RCU_SEQ_STATE_MASK; } +/* + * Set the state portion of the pointed-to sequence number. + * The caller is responsible for preventing conflicting updates. + */ +static inline void rcu_seq_set_state(unsigned long *sp, int newstate) +{ + WARN_ON_ONCE(newstate & ~RCU_SEQ_STATE_MASK); + WRITE_ONCE(*sp, (*sp & ~RCU_SEQ_STATE_MASK) + newstate); +} + /* Adjust sequence number for start of update-side operation. */ static inline void rcu_seq_start(unsigned long *sp) { diff --git a/kernel/rcu/srcu.c b/kernel/rcu/srcu.c index 1a2dc74bb625..90ffea31b188 100644 --- a/kernel/rcu/srcu.c +++ b/kernel/rcu/srcu.c @@ -44,7 +44,6 @@ static int init_srcu_struct_fields(struct srcu_struct *sp) sp->completed = 0; sp->srcu_gp_seq = 0; spin_lock_init(&sp->queue_lock); - sp->srcu_state = SRCU_STATE_IDLE; rcu_segcblist_init(&sp->srcu_cblist); INIT_DELAYED_WORK(&sp->work, process_srcu); sp->per_cpu_ref = alloc_percpu(struct srcu_array); @@ -180,6 +179,9 @@ static bool srcu_readers_active(struct srcu_struct *sp) return sum; } +#define SRCU_CALLBACK_BATCH 10 +#define SRCU_INTERVAL 1 + /** * cleanup_srcu_struct - deconstruct a sleep-RCU structure * @sp: structure to clean up. @@ -200,8 +202,10 @@ void cleanup_srcu_struct(struct srcu_struct *sp) if (WARN_ON(!rcu_segcblist_empty(&sp->srcu_cblist))) return; /* Leakage unless caller handles error. */ flush_delayed_work(&sp->work); - if (WARN_ON(READ_ONCE(sp->srcu_state) != SRCU_STATE_IDLE)) + if (WARN_ON(rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)) != SRCU_STATE_IDLE)) { + pr_info("cleanup_srcu_struct: Active srcu_struct %lu CBs %c state: %d\n", rcu_segcblist_n_cbs(&sp->srcu_cblist), ".E"[rcu_segcblist_empty(&sp->srcu_cblist)], rcu_seq_state(READ_ONCE(sp->srcu_gp_seq))); return; /* Caller forgot to stop doing call_srcu()? */ + } free_percpu(sp->per_cpu_ref); sp->per_cpu_ref = NULL; } @@ -253,10 +257,13 @@ EXPORT_SYMBOL_GPL(__srcu_read_unlock); */ static void srcu_gp_start(struct srcu_struct *sp) { + int state; + rcu_segcblist_accelerate(&sp->srcu_cblist, rcu_seq_snap(&sp->srcu_gp_seq)); - WRITE_ONCE(sp->srcu_state, SRCU_STATE_SCAN1); rcu_seq_start(&sp->srcu_gp_seq); + state = rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)); + WARN_ON_ONCE(state != SRCU_STATE_SCAN1); } /* @@ -300,7 +307,6 @@ static void srcu_flip(struct srcu_struct *sp) static void srcu_gp_end(struct srcu_struct *sp) { rcu_seq_end(&sp->srcu_gp_seq); - WRITE_ONCE(sp->srcu_state, SRCU_STATE_DONE); spin_lock_irq(&sp->queue_lock); rcu_segcblist_advance(&sp->srcu_cblist, @@ -345,7 +351,7 @@ void call_srcu(struct srcu_struct *sp, struct rcu_head *head, spin_lock_irqsave(&sp->queue_lock, flags); smp_mb__after_unlock_lock(); /* Caller's prior accesses before GP. */ rcu_segcblist_enqueue(&sp->srcu_cblist, head, false); - if (READ_ONCE(sp->srcu_state) == SRCU_STATE_IDLE) { + if (rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)) == SRCU_STATE_IDLE) { srcu_gp_start(sp); queue_delayed_work(system_power_efficient_wq, &sp->work, 0); } @@ -378,7 +384,7 @@ static void __synchronize_srcu(struct srcu_struct *sp, int trycount) head->func = wakeme_after_rcu; spin_lock_irq(&sp->queue_lock); smp_mb__after_unlock_lock(); /* Caller's prior accesses before GP. */ - if (READ_ONCE(sp->srcu_state) == SRCU_STATE_IDLE) { + if (rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)) == SRCU_STATE_IDLE) { /* steal the processing owner */ rcu_segcblist_enqueue(&sp->srcu_cblist, head, false); srcu_gp_start(sp); @@ -480,9 +486,6 @@ unsigned long srcu_batches_completed(struct srcu_struct *sp) } EXPORT_SYMBOL_GPL(srcu_batches_completed); -#define SRCU_CALLBACK_BATCH 10 -#define SRCU_INTERVAL 1 - /* * Core SRCU state machine. Advance callbacks from ->batch_check0 to * ->batch_check1 and then to ->batch_done as readers drain. @@ -491,28 +494,40 @@ static void srcu_advance_batches(struct srcu_struct *sp, int trycount) { int idx; - WARN_ON_ONCE(sp->srcu_state == SRCU_STATE_IDLE); - /* * Because readers might be delayed for an extended period after * fetching ->completed for their index, at any point in time there * might well be readers using both idx=0 and idx=1. We therefore * need to wait for readers to clear from both index values before * invoking a callback. + * + * The load-acquire ensures that we see the accesses performed + * by the prior grace period. */ + idx = rcu_seq_state(smp_load_acquire(&sp->srcu_gp_seq)); /* ^^^ */ + if (idx == SRCU_STATE_IDLE) { + spin_lock_irq(&sp->queue_lock); + if (rcu_segcblist_empty(&sp->srcu_cblist)) { + spin_unlock_irq(&sp->queue_lock); + return; + } + idx = rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)); + if (idx == SRCU_STATE_IDLE) + srcu_gp_start(sp); + spin_unlock_irq(&sp->queue_lock); + if (idx != SRCU_STATE_IDLE) + return; /* Someone else started the grace period. */ + } - if (sp->srcu_state == SRCU_STATE_DONE) - srcu_gp_start(sp); - - if (sp->srcu_state == SRCU_STATE_SCAN1) { + if (rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)) == SRCU_STATE_SCAN1) { idx = 1 ^ (sp->completed & 1); if (!try_check_zero(sp, idx, trycount)) return; /* readers present, retry after SRCU_INTERVAL */ srcu_flip(sp); - WRITE_ONCE(sp->srcu_state, SRCU_STATE_SCAN2); + rcu_seq_set_state(&sp->srcu_gp_seq, SRCU_STATE_SCAN2); } - if (sp->srcu_state == SRCU_STATE_SCAN2) { + if (rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)) == SRCU_STATE_SCAN2) { /* * SRCU read-side critical sections are normally short, @@ -563,14 +578,14 @@ static void srcu_invoke_callbacks(struct srcu_struct *sp) static void srcu_reschedule(struct srcu_struct *sp, unsigned long delay) { bool pending = true; + int state; if (rcu_segcblist_empty(&sp->srcu_cblist)) { spin_lock_irq(&sp->queue_lock); + state = rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)); if (rcu_segcblist_empty(&sp->srcu_cblist) && - READ_ONCE(sp->srcu_state) == SRCU_STATE_DONE) { - WRITE_ONCE(sp->srcu_state, SRCU_STATE_IDLE); + state == SRCU_STATE_IDLE) pending = false; - } spin_unlock_irq(&sp->queue_lock); } -- cgit v1.2.3 From f60d231a87c5c9f23f10e69996f396d46f5bf901 Mon Sep 17 00:00:00 2001 From: "Paul E. McKenney" Date: Fri, 24 Mar 2017 13:46:33 -0700 Subject: srcu: Crude control of expedited grace periods SRCU's implementation of expedited grace periods has always assumed that the SRCU instance is idle when the expedited request arrives. This commit improves this a bit by maintaining a count of the number of outstanding expedited requests, thus allowing prior non-expedited grace periods accommodate these requests by shifting to expedited mode. However, any non-expedited wait already in progress will still wait for the full duration. Improved control of expedited grace periods is planned, but one step at a time. Signed-off-by: Paul E. McKenney --- include/linux/srcu.h | 1 + kernel/rcu/srcu.c | 84 ++++++++++++++++++++++++++++------------------------ 2 files changed, 47 insertions(+), 38 deletions(-) (limited to 'include/linux/srcu.h') diff --git a/include/linux/srcu.h b/include/linux/srcu.h index e7dbc01b61a1..73a1b6296224 100644 --- a/include/linux/srcu.h +++ b/include/linux/srcu.h @@ -42,6 +42,7 @@ struct srcu_array { struct srcu_struct { unsigned long completed; unsigned long srcu_gp_seq; + atomic_t srcu_exp_cnt; struct srcu_array __percpu *per_cpu_ref; spinlock_t queue_lock; /* protect ->srcu_cblist */ struct rcu_segcblist srcu_cblist; diff --git a/kernel/rcu/srcu.c b/kernel/rcu/srcu.c index 90ffea31b188..3cfcc59bddf3 100644 --- a/kernel/rcu/srcu.c +++ b/kernel/rcu/srcu.c @@ -43,6 +43,7 @@ static int init_srcu_struct_fields(struct srcu_struct *sp) { sp->completed = 0; sp->srcu_gp_seq = 0; + atomic_set(&sp->srcu_exp_cnt, 0); spin_lock_init(&sp->queue_lock); rcu_segcblist_init(&sp->srcu_cblist); INIT_DELAYED_WORK(&sp->work, process_srcu); @@ -179,7 +180,6 @@ static bool srcu_readers_active(struct srcu_struct *sp) return sum; } -#define SRCU_CALLBACK_BATCH 10 #define SRCU_INTERVAL 1 /** @@ -197,6 +197,7 @@ static bool srcu_readers_active(struct srcu_struct *sp) */ void cleanup_srcu_struct(struct srcu_struct *sp) { + WARN_ON_ONCE(atomic_read(&sp->srcu_exp_cnt)); if (WARN_ON(srcu_readers_active(sp))) return; /* Leakage unless caller handles error. */ if (WARN_ON(!rcu_segcblist_empty(&sp->srcu_cblist))) @@ -244,13 +245,10 @@ EXPORT_SYMBOL_GPL(__srcu_read_unlock); * We use an adaptive strategy for synchronize_srcu() and especially for * synchronize_srcu_expedited(). We spin for a fixed time period * (defined below) to allow SRCU readers to exit their read-side critical - * sections. If there are still some readers after 10 microseconds, - * we repeatedly block for 1-millisecond time periods. This approach - * has done well in testing, so there is no need for a config parameter. + * sections. If there are still some readers after a few microseconds, + * we repeatedly block for 1-millisecond time periods. */ #define SRCU_RETRY_CHECK_DELAY 5 -#define SYNCHRONIZE_SRCU_TRYCOUNT 2 -#define SYNCHRONIZE_SRCU_EXP_TRYCOUNT 12 /* * Start an SRCU grace period. @@ -267,16 +265,16 @@ static void srcu_gp_start(struct srcu_struct *sp) } /* - * Wait until all readers counted by array index idx complete, but loop - * a maximum of trycount times. The caller must ensure that ->completed - * is not changed while checking. + * Wait until all readers counted by array index idx complete, but + * loop an additional time if there is an expedited grace period pending. + * The caller must ensure that ->completed is not changed while checking. */ static bool try_check_zero(struct srcu_struct *sp, int idx, int trycount) { for (;;) { if (srcu_readers_active_idx_check(sp, idx)) return true; - if (--trycount <= 0) + if (--trycount + !!atomic_read(&sp->srcu_exp_cnt) <= 0) return false; udelay(SRCU_RETRY_CHECK_DELAY); } @@ -364,7 +362,7 @@ static void srcu_reschedule(struct srcu_struct *sp, unsigned long delay); /* * Helper function for synchronize_srcu() and synchronize_srcu_expedited(). */ -static void __synchronize_srcu(struct srcu_struct *sp, int trycount) +static void __synchronize_srcu(struct srcu_struct *sp) { struct rcu_synchronize rcu; struct rcu_head *head = &rcu.head; @@ -400,6 +398,32 @@ static void __synchronize_srcu(struct srcu_struct *sp, int trycount) smp_mb(); /* Caller's later accesses after GP. */ } +/** + * synchronize_srcu_expedited - Brute-force SRCU grace period + * @sp: srcu_struct with which to synchronize. + * + * Wait for an SRCU grace period to elapse, but be more aggressive about + * spinning rather than blocking when waiting. + * + * Note that synchronize_srcu_expedited() has the same deadlock and + * memory-ordering properties as does synchronize_srcu(). + */ +void synchronize_srcu_expedited(struct srcu_struct *sp) +{ + bool do_norm = rcu_gp_is_normal(); + + if (!do_norm) { + atomic_inc(&sp->srcu_exp_cnt); + smp_mb__after_atomic(); /* increment before GP. */ + } + __synchronize_srcu(sp); + if (!do_norm) { + smp_mb__before_atomic(); /* GP before decrement. */ + atomic_dec(&sp->srcu_exp_cnt); + } +} +EXPORT_SYMBOL_GPL(synchronize_srcu_expedited); + /** * synchronize_srcu - wait for prior SRCU read-side critical-section completion * @sp: srcu_struct with which to synchronize. @@ -441,28 +465,13 @@ static void __synchronize_srcu(struct srcu_struct *sp, int trycount) */ void synchronize_srcu(struct srcu_struct *sp) { - __synchronize_srcu(sp, (rcu_gp_is_expedited() && !rcu_gp_is_normal()) - ? SYNCHRONIZE_SRCU_EXP_TRYCOUNT - : SYNCHRONIZE_SRCU_TRYCOUNT); + if (rcu_gp_is_expedited()) + synchronize_srcu_expedited(sp); + else + __synchronize_srcu(sp); } EXPORT_SYMBOL_GPL(synchronize_srcu); -/** - * synchronize_srcu_expedited - Brute-force SRCU grace period - * @sp: srcu_struct with which to synchronize. - * - * Wait for an SRCU grace period to elapse, but be more aggressive about - * spinning rather than blocking when waiting. - * - * Note that synchronize_srcu_expedited() has the same deadlock and - * memory-ordering properties as does synchronize_srcu(). - */ -void synchronize_srcu_expedited(struct srcu_struct *sp) -{ - __synchronize_srcu(sp, SYNCHRONIZE_SRCU_EXP_TRYCOUNT); -} -EXPORT_SYMBOL_GPL(synchronize_srcu_expedited); - /** * srcu_barrier - Wait until all in-flight call_srcu() callbacks complete. * @sp: srcu_struct on which to wait for in-flight callbacks. @@ -490,7 +499,7 @@ EXPORT_SYMBOL_GPL(srcu_batches_completed); * Core SRCU state machine. Advance callbacks from ->batch_check0 to * ->batch_check1 and then to ->batch_done as readers drain. */ -static void srcu_advance_batches(struct srcu_struct *sp, int trycount) +static void srcu_advance_batches(struct srcu_struct *sp) { int idx; @@ -521,8 +530,8 @@ static void srcu_advance_batches(struct srcu_struct *sp, int trycount) if (rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)) == SRCU_STATE_SCAN1) { idx = 1 ^ (sp->completed & 1); - if (!try_check_zero(sp, idx, trycount)) - return; /* readers present, retry after SRCU_INTERVAL */ + if (!try_check_zero(sp, idx, 1)) + return; /* readers present, retry later. */ srcu_flip(sp); rcu_seq_set_state(&sp->srcu_gp_seq, SRCU_STATE_SCAN2); } @@ -534,9 +543,8 @@ static void srcu_advance_batches(struct srcu_struct *sp, int trycount) * so check at least twice in quick succession after a flip. */ idx = 1 ^ (sp->completed & 1); - trycount = trycount < 2 ? 2 : trycount; - if (!try_check_zero(sp, idx, trycount)) - return; /* readers present, retry after SRCU_INTERVAL */ + if (!try_check_zero(sp, idx, 2)) + return; /* readers present, retry after later. */ srcu_gp_end(sp); } } @@ -602,8 +610,8 @@ void process_srcu(struct work_struct *work) sp = container_of(work, struct srcu_struct, work.work); - srcu_advance_batches(sp, 1); + srcu_advance_batches(sp); srcu_invoke_callbacks(sp); - srcu_reschedule(sp, SRCU_INTERVAL); + srcu_reschedule(sp, atomic_read(&sp->srcu_exp_cnt) ? 0 : SRCU_INTERVAL); } EXPORT_SYMBOL_GPL(process_srcu); -- cgit v1.2.3 From d8be81735aa89413b333de488251f0e64e2be591 Mon Sep 17 00:00:00 2001 From: "Paul E. McKenney" Date: Sat, 25 Mar 2017 09:59:38 -0700 Subject: srcu: Create a tiny SRCU In response to automated complaints about modifications to SRCU increasing its size, this commit creates a tiny SRCU that is used in SMP=n && PREEMPT=n builds. Signed-off-by: Paul E. McKenney --- include/linux/srcu.h | 69 ++------------- include/linux/srcutiny.h | 81 ++++++++++++++++++ include/linux/srcutree.h | 91 ++++++++++++++++++++ init/Kconfig | 12 +++ kernel/rcu/Makefile | 3 +- kernel/rcu/rcutorture.c | 2 + kernel/rcu/srcutiny.c | 215 +++++++++++++++++++++++++++++++++++++++++++++++ 7 files changed, 411 insertions(+), 62 deletions(-) create mode 100644 include/linux/srcutiny.h create mode 100644 include/linux/srcutree.h create mode 100644 kernel/rcu/srcutiny.c (limited to 'include/linux/srcu.h') diff --git a/include/linux/srcu.h b/include/linux/srcu.h index 73a1b6296224..907f09b14eda 100644 --- a/include/linux/srcu.h +++ b/include/linux/srcu.h @@ -34,28 +34,7 @@ #include #include -struct srcu_array { - unsigned long lock_count[2]; - unsigned long unlock_count[2]; -}; - -struct srcu_struct { - unsigned long completed; - unsigned long srcu_gp_seq; - atomic_t srcu_exp_cnt; - struct srcu_array __percpu *per_cpu_ref; - spinlock_t queue_lock; /* protect ->srcu_cblist */ - struct rcu_segcblist srcu_cblist; - struct delayed_work work; -#ifdef CONFIG_DEBUG_LOCK_ALLOC - struct lockdep_map dep_map; -#endif /* #ifdef CONFIG_DEBUG_LOCK_ALLOC */ -}; - -/* Values for -> state variable. */ -#define SRCU_STATE_IDLE 0 -#define SRCU_STATE_SCAN1 1 -#define SRCU_STATE_SCAN2 2 +struct srcu_struct; #ifdef CONFIG_DEBUG_LOCK_ALLOC @@ -77,42 +56,13 @@ int init_srcu_struct(struct srcu_struct *sp); #define __SRCU_DEP_MAP_INIT(srcu_name) #endif /* #else #ifdef CONFIG_DEBUG_LOCK_ALLOC */ -void process_srcu(struct work_struct *work); - -#define __SRCU_STRUCT_INIT(name) \ - { \ - .completed = -300, \ - .per_cpu_ref = &name##_srcu_array, \ - .queue_lock = __SPIN_LOCK_UNLOCKED(name.queue_lock), \ - .srcu_cblist = RCU_SEGCBLIST_INITIALIZER(name.srcu_cblist),\ - .work = __DELAYED_WORK_INITIALIZER(name.work, process_srcu, 0),\ - __SRCU_DEP_MAP_INIT(name) \ - } - -/* - * Define and initialize a srcu struct at build time. - * Do -not- call init_srcu_struct() nor cleanup_srcu_struct() on it. - * - * Note that although DEFINE_STATIC_SRCU() hides the name from other - * files, the per-CPU variable rules nevertheless require that the - * chosen name be globally unique. These rules also prohibit use of - * DEFINE_STATIC_SRCU() within a function. If these rules are too - * restrictive, declare the srcu_struct manually. For example, in - * each file: - * - * static struct srcu_struct my_srcu; - * - * Then, before the first use of each my_srcu, manually initialize it: - * - * init_srcu_struct(&my_srcu); - * - * See include/linux/percpu-defs.h for the rules on per-CPU variables. - */ -#define __DEFINE_SRCU(name, is_static) \ - static DEFINE_PER_CPU(struct srcu_array, name##_srcu_array);\ - is_static struct srcu_struct name = __SRCU_STRUCT_INIT(name) -#define DEFINE_SRCU(name) __DEFINE_SRCU(name, /* not static */) -#define DEFINE_STATIC_SRCU(name) __DEFINE_SRCU(name, static) +#ifdef CONFIG_TINY_SRCU +#include +#elif defined(CONFIG_TREE_SRCU) +#include +#else +#error "Unknown SRCU implementation specified to kernel configuration" +#endif /** * call_srcu() - Queue a callback for invocation after an SRCU grace period @@ -138,9 +88,6 @@ void cleanup_srcu_struct(struct srcu_struct *sp); int __srcu_read_lock(struct srcu_struct *sp) __acquires(sp); void __srcu_read_unlock(struct srcu_struct *sp, int idx) __releases(sp); void synchronize_srcu(struct srcu_struct *sp); -void synchronize_srcu_expedited(struct srcu_struct *sp); -unsigned long srcu_batches_completed(struct srcu_struct *sp); -void srcu_barrier(struct srcu_struct *sp); #ifdef CONFIG_DEBUG_LOCK_ALLOC diff --git a/include/linux/srcutiny.h b/include/linux/srcutiny.h new file mode 100644 index 000000000000..4f284e4f4d8c --- /dev/null +++ b/include/linux/srcutiny.h @@ -0,0 +1,81 @@ +/* + * Sleepable Read-Copy Update mechanism for mutual exclusion, + * tiny variant. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, you can access it online at + * http://www.gnu.org/licenses/gpl-2.0.html. + * + * Copyright (C) IBM Corporation, 2017 + * + * Author: Paul McKenney + */ + +#ifndef _LINUX_SRCU_TINY_H +#define _LINUX_SRCU_TINY_H + +#include + +struct srcu_struct { + int srcu_lock_nesting[2]; /* srcu_read_lock() nesting depth. */ + struct swait_queue_head srcu_wq; + /* Last srcu_read_unlock() wakes GP. */ + unsigned long srcu_gp_seq; /* GP seq # for callback tagging. */ + struct rcu_segcblist srcu_cblist; + /* Pending SRCU callbacks. */ + int srcu_idx; /* Current reader array element. */ + bool srcu_gp_running; /* GP workqueue running? */ + bool srcu_gp_waiting; /* GP waiting for readers? */ + struct work_struct srcu_work; /* For driving grace periods. */ +#ifdef CONFIG_DEBUG_LOCK_ALLOC + struct lockdep_map dep_map; +#endif /* #ifdef CONFIG_DEBUG_LOCK_ALLOC */ +}; + +void srcu_drive_gp(struct work_struct *wp); + +#define __SRCU_STRUCT_INIT(name) \ +{ \ + .srcu_wq = __SWAIT_QUEUE_HEAD_INITIALIZER(name.srcu_wq), \ + .srcu_cblist = RCU_SEGCBLIST_INITIALIZER(name.srcu_cblist), \ + .srcu_work = __WORK_INITIALIZER(name.srcu_work, srcu_drive_gp), \ + __SRCU_DEP_MAP_INIT(name) \ +} + +/* + * This odd _STATIC_ arrangement is needed for API compatibility with + * Tree SRCU, which needs some per-CPU data. + */ +#define DEFINE_SRCU(name) \ + struct srcu_struct name = __SRCU_STRUCT_INIT(name) +#define DEFINE_STATIC_SRCU(name) \ + static struct srcu_struct name = __SRCU_STRUCT_INIT(name) + +void synchronize_srcu(struct srcu_struct *sp); + +static inline void synchronize_srcu_expedited(struct srcu_struct *sp) +{ + synchronize_srcu(sp); +} + +static inline void srcu_barrier(struct srcu_struct *sp) +{ + synchronize_srcu(sp); +} + +static inline unsigned long srcu_batches_completed(struct srcu_struct *sp) +{ + return 0; +} + +#endif diff --git a/include/linux/srcutree.h b/include/linux/srcutree.h new file mode 100644 index 000000000000..f2b3bd6c6bc2 --- /dev/null +++ b/include/linux/srcutree.h @@ -0,0 +1,91 @@ +/* + * Sleepable Read-Copy Update mechanism for mutual exclusion, + * tree variant. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, you can access it online at + * http://www.gnu.org/licenses/gpl-2.0.html. + * + * Copyright (C) IBM Corporation, 2017 + * + * Author: Paul McKenney + */ + +#ifndef _LINUX_SRCU_TREE_H +#define _LINUX_SRCU_TREE_H + +struct srcu_array { + unsigned long lock_count[2]; + unsigned long unlock_count[2]; +}; + +struct srcu_struct { + unsigned long completed; + unsigned long srcu_gp_seq; + atomic_t srcu_exp_cnt; + struct srcu_array __percpu *per_cpu_ref; + spinlock_t queue_lock; /* protect ->srcu_cblist */ + struct rcu_segcblist srcu_cblist; + struct delayed_work work; +#ifdef CONFIG_DEBUG_LOCK_ALLOC + struct lockdep_map dep_map; +#endif /* #ifdef CONFIG_DEBUG_LOCK_ALLOC */ +}; + +/* Values for -> state variable. */ +#define SRCU_STATE_IDLE 0 +#define SRCU_STATE_SCAN1 1 +#define SRCU_STATE_SCAN2 2 + +void process_srcu(struct work_struct *work); + +#define __SRCU_STRUCT_INIT(name) \ + { \ + .completed = -300, \ + .per_cpu_ref = &name##_srcu_array, \ + .queue_lock = __SPIN_LOCK_UNLOCKED(name.queue_lock), \ + .srcu_cblist = RCU_SEGCBLIST_INITIALIZER(name.srcu_cblist),\ + .work = __DELAYED_WORK_INITIALIZER(name.work, process_srcu, 0),\ + __SRCU_DEP_MAP_INIT(name) \ + } + +/* + * Define and initialize a srcu struct at build time. + * Do -not- call init_srcu_struct() nor cleanup_srcu_struct() on it. + * + * Note that although DEFINE_STATIC_SRCU() hides the name from other + * files, the per-CPU variable rules nevertheless require that the + * chosen name be globally unique. These rules also prohibit use of + * DEFINE_STATIC_SRCU() within a function. If these rules are too + * restrictive, declare the srcu_struct manually. For example, in + * each file: + * + * static struct srcu_struct my_srcu; + * + * Then, before the first use of each my_srcu, manually initialize it: + * + * init_srcu_struct(&my_srcu); + * + * See include/linux/percpu-defs.h for the rules on per-CPU variables. + */ +#define __DEFINE_SRCU(name, is_static) \ + static DEFINE_PER_CPU(struct srcu_array, name##_srcu_array);\ + is_static struct srcu_struct name = __SRCU_STRUCT_INIT(name) +#define DEFINE_SRCU(name) __DEFINE_SRCU(name, /* not static */) +#define DEFINE_STATIC_SRCU(name) __DEFINE_SRCU(name, static) + +void synchronize_srcu_expedited(struct srcu_struct *sp); +void srcu_barrier(struct srcu_struct *sp); +unsigned long srcu_batches_completed(struct srcu_struct *sp); + +#endif diff --git a/init/Kconfig b/init/Kconfig index a92f27da4a27..d269f2ca17b8 100644 --- a/init/Kconfig +++ b/init/Kconfig @@ -526,6 +526,18 @@ config SRCU permits arbitrary sleeping or blocking within RCU read-side critical sections. +config TINY_SRCU + bool + default y if TINY_RCU + help + This option selects the single-CPU non-preemptible version of SRCU. + +config TREE_SRCU + bool + default y if !TINY_RCU + help + This option selects the full-fledged version of SRCU. + config TASKS_RCU bool default n diff --git a/kernel/rcu/Makefile b/kernel/rcu/Makefile index 18dfc485225c..b853214a2b99 100644 --- a/kernel/rcu/Makefile +++ b/kernel/rcu/Makefile @@ -3,7 +3,8 @@ KCOV_INSTRUMENT := n obj-y += update.o sync.o -obj-$(CONFIG_SRCU) += srcu.o +obj-$(CONFIG_TREE_SRCU) += srcu.o +obj-$(CONFIG_TINY_SRCU) += srcutiny.o obj-$(CONFIG_RCU_TORTURE_TEST) += rcutorture.o obj-$(CONFIG_RCU_PERF_TEST) += rcuperf.o obj-$(CONFIG_TREE_RCU) += tree.o diff --git a/kernel/rcu/rcutorture.c b/kernel/rcu/rcutorture.c index cccc417a8135..98591e16db1a 100644 --- a/kernel/rcu/rcutorture.c +++ b/kernel/rcu/rcutorture.c @@ -559,6 +559,7 @@ static void srcu_torture_barrier(void) static void srcu_torture_stats(void) { +#ifdef CONFIG_TREE_SRCU int cpu; int idx = srcu_ctlp->completed & 0x1; @@ -587,6 +588,7 @@ static void srcu_torture_stats(void) pr_cont(" %d(%ld,%ld)", cpu, c0, c1); } pr_cont("\n"); +#endif } static void srcu_torture_synchronize_expedited(void) diff --git a/kernel/rcu/srcutiny.c b/kernel/rcu/srcutiny.c new file mode 100644 index 000000000000..b8293527ee18 --- /dev/null +++ b/kernel/rcu/srcutiny.c @@ -0,0 +1,215 @@ +/* + * Sleepable Read-Copy Update mechanism for mutual exclusion, + * tiny version for non-preemptible single-CPU use. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, you can access it online at + * http://www.gnu.org/licenses/gpl-2.0.html. + * + * Copyright (C) IBM Corporation, 2017 + * + * Author: Paul McKenney + */ + +#include +#include +#include +#include +#include +#include +#include + +#include +#include "rcu.h" + +static int init_srcu_struct_fields(struct srcu_struct *sp) +{ + sp->srcu_lock_nesting[0] = 0; + sp->srcu_lock_nesting[1] = 0; + init_swait_queue_head(&sp->srcu_wq); + sp->srcu_gp_seq = 0; + rcu_segcblist_init(&sp->srcu_cblist); + sp->srcu_gp_running = false; + sp->srcu_gp_waiting = false; + sp->srcu_idx = 0; + INIT_WORK(&sp->srcu_work, srcu_drive_gp); + return 0; +} + +#ifdef CONFIG_DEBUG_LOCK_ALLOC + +int __init_srcu_struct(struct srcu_struct *sp, const char *name, + struct lock_class_key *key) +{ + /* Don't re-initialize a lock while it is held. */ + debug_check_no_locks_freed((void *)sp, sizeof(*sp)); + lockdep_init_map(&sp->dep_map, name, key, 0); + return init_srcu_struct_fields(sp); +} +EXPORT_SYMBOL_GPL(__init_srcu_struct); + +#else /* #ifdef CONFIG_DEBUG_LOCK_ALLOC */ + +/* + * init_srcu_struct - initialize a sleep-RCU structure + * @sp: structure to initialize. + * + * Must invoke this on a given srcu_struct before passing that srcu_struct + * to any other function. Each srcu_struct represents a separate domain + * of SRCU protection. + */ +int init_srcu_struct(struct srcu_struct *sp) +{ + return init_srcu_struct_fields(sp); +} +EXPORT_SYMBOL_GPL(init_srcu_struct); + +#endif /* #else #ifdef CONFIG_DEBUG_LOCK_ALLOC */ + +/* + * cleanup_srcu_struct - deconstruct a sleep-RCU structure + * @sp: structure to clean up. + * + * Must invoke this after you are finished using a given srcu_struct that + * was initialized via init_srcu_struct(), else you leak memory. + */ +void cleanup_srcu_struct(struct srcu_struct *sp) +{ + WARN_ON(sp->srcu_lock_nesting[0] || sp->srcu_lock_nesting[1]); + flush_work(&sp->srcu_work); + WARN_ON(rcu_seq_state(sp->srcu_gp_seq)); + WARN_ON(sp->srcu_gp_running); + WARN_ON(sp->srcu_gp_waiting); + WARN_ON(!rcu_segcblist_empty(&sp->srcu_cblist)); +} +EXPORT_SYMBOL_GPL(cleanup_srcu_struct); + +/* + * Counts the new reader in the appropriate per-CPU element of the + * srcu_struct. Must be called from process context. + * Returns an index that must be passed to the matching srcu_read_unlock(). + */ +int __srcu_read_lock(struct srcu_struct *sp) +{ + int idx; + + idx = READ_ONCE(sp->srcu_idx); + WRITE_ONCE(sp->srcu_lock_nesting[idx], sp->srcu_lock_nesting[idx] + 1); + return idx; +} +EXPORT_SYMBOL_GPL(__srcu_read_lock); + +/* + * Removes the count for the old reader from the appropriate element of + * the srcu_struct. Must be called from process context. + */ +void __srcu_read_unlock(struct srcu_struct *sp, int idx) +{ + int newval = sp->srcu_lock_nesting[idx] - 1; + + WRITE_ONCE(sp->srcu_lock_nesting[idx], newval); + if (!newval && READ_ONCE(sp->srcu_gp_waiting)) + swake_up(&sp->srcu_wq); +} +EXPORT_SYMBOL_GPL(__srcu_read_unlock); + +/* + * Workqueue handler to drive one grace period and invoke any callbacks + * that become ready as a result. Single-CPU and !PREEMPT operation + * means that we get away with murder on synchronization. ;-) + */ +void srcu_drive_gp(struct work_struct *wp) +{ + int idx; + struct rcu_cblist ready_cbs; + struct srcu_struct *sp; + struct rcu_head *rhp; + + sp = container_of(wp, struct srcu_struct, srcu_work); + if (sp->srcu_gp_running || rcu_segcblist_empty(&sp->srcu_cblist)) + return; /* Already running or nothing to do. */ + + /* Tag recently arrived callbacks and wait for readers. */ + WRITE_ONCE(sp->srcu_gp_running, true); + rcu_segcblist_accelerate(&sp->srcu_cblist, + rcu_seq_snap(&sp->srcu_gp_seq)); + rcu_seq_start(&sp->srcu_gp_seq); + idx = sp->srcu_idx; + WRITE_ONCE(sp->srcu_idx, !sp->srcu_idx); + WRITE_ONCE(sp->srcu_gp_waiting, true); /* srcu_read_unlock() wakes! */ + swait_event(sp->srcu_wq, !READ_ONCE(sp->srcu_lock_nesting[idx])); + WRITE_ONCE(sp->srcu_gp_waiting, false); /* srcu_read_unlock() cheap. */ + rcu_seq_end(&sp->srcu_gp_seq); + + /* Update callback list based on GP, and invoke ready callbacks. */ + rcu_segcblist_advance(&sp->srcu_cblist, + rcu_seq_current(&sp->srcu_gp_seq)); + if (rcu_segcblist_ready_cbs(&sp->srcu_cblist)) { + rcu_cblist_init(&ready_cbs); + local_irq_disable(); + rcu_segcblist_extract_done_cbs(&sp->srcu_cblist, &ready_cbs); + local_irq_enable(); + rhp = rcu_cblist_dequeue(&ready_cbs); + for (; rhp != NULL; rhp = rcu_cblist_dequeue(&ready_cbs)) { + local_bh_disable(); + rhp->func(rhp); + local_bh_enable(); + } + local_irq_disable(); + rcu_segcblist_insert_count(&sp->srcu_cblist, &ready_cbs); + local_irq_enable(); + } + WRITE_ONCE(sp->srcu_gp_running, false); + + /* + * If more callbacks, reschedule ourselves. This can race with + * a call_srcu() at interrupt level, but the ->srcu_gp_running + * checks will straighten that out. + */ + if (!rcu_segcblist_empty(&sp->srcu_cblist)) + schedule_work(&sp->srcu_work); +} +EXPORT_SYMBOL_GPL(srcu_drive_gp); + +/* + * Enqueue an SRCU callback on the specified srcu_struct structure, + * initiating grace-period processing if it is not already running. + */ +void call_srcu(struct srcu_struct *sp, struct rcu_head *head, + rcu_callback_t func) +{ + unsigned long flags; + + head->func = func; + local_irq_save(flags); + rcu_segcblist_enqueue(&sp->srcu_cblist, head, false); + local_irq_restore(flags); + if (!READ_ONCE(sp->srcu_gp_running)) + schedule_work(&sp->srcu_work); +} +EXPORT_SYMBOL_GPL(call_srcu); + +/* + * synchronize_srcu - wait for prior SRCU read-side critical-section completion + */ +void synchronize_srcu(struct srcu_struct *sp) +{ + struct rcu_synchronize rs; + + init_rcu_head_on_stack(&rs.head); + init_completion(&rs.completion); + call_srcu(sp, &rs.head, wakeme_after_rcu); + wait_for_completion(&rs.completion); + destroy_rcu_head_on_stack(&rs.head); +} +EXPORT_SYMBOL_GPL(synchronize_srcu); -- cgit v1.2.3 From dad81a2026841b5e2651aab58a7398c13cc05847 Mon Sep 17 00:00:00 2001 From: "Paul E. McKenney" Date: Sat, 25 Mar 2017 17:23:44 -0700 Subject: srcu: Introduce CLASSIC_SRCU Kconfig option The TREE_SRCU rewrite is large and a bit on the non-simple side, so this commit helps reduce risk by allowing the old v4.11 SRCU algorithm to be selected using a new CLASSIC_SRCU Kconfig option that depends on RCU_EXPERT. The default is to use the new TREE_SRCU and TINY_SRCU algorithms, in order to help get these the testing that they need. However, if your users do not require the update-side scalability that is to be provided by TREE_SRCU, select RCU_EXPERT and then CLASSIC_SRCU to revert back to the old classic SRCU algorithm. Signed-off-by: Paul E. McKenney --- include/linux/srcu.h | 2 + include/linux/srcuclassic.h | 101 ++++++++ init/Kconfig | 21 +- kernel/rcu/Makefile | 3 +- kernel/rcu/rcutorture.c | 2 +- kernel/rcu/srcu.c | 347 ++++++++++++++----------- kernel/rcu/srcutree.c | 613 ++++++++++++++++++++++++++++++++++++++++++++ 7 files changed, 934 insertions(+), 155 deletions(-) create mode 100644 include/linux/srcuclassic.h create mode 100644 kernel/rcu/srcutree.c (limited to 'include/linux/srcu.h') diff --git a/include/linux/srcu.h b/include/linux/srcu.h index 907f09b14eda..167ad8831aaf 100644 --- a/include/linux/srcu.h +++ b/include/linux/srcu.h @@ -60,6 +60,8 @@ int init_srcu_struct(struct srcu_struct *sp); #include #elif defined(CONFIG_TREE_SRCU) #include +#elif defined(CONFIG_CLASSIC_SRCU) +#include #else #error "Unknown SRCU implementation specified to kernel configuration" #endif diff --git a/include/linux/srcuclassic.h b/include/linux/srcuclassic.h new file mode 100644 index 000000000000..41cf99930f34 --- /dev/null +++ b/include/linux/srcuclassic.h @@ -0,0 +1,101 @@ +/* + * Sleepable Read-Copy Update mechanism for mutual exclusion, + * classic v4.11 variant. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, you can access it online at + * http://www.gnu.org/licenses/gpl-2.0.html. + * + * Copyright (C) IBM Corporation, 2017 + * + * Author: Paul McKenney + */ + +#ifndef _LINUX_SRCU_CLASSIC_H +#define _LINUX_SRCU_CLASSIC_H + +struct srcu_array { + unsigned long lock_count[2]; + unsigned long unlock_count[2]; +}; + +struct rcu_batch { + struct rcu_head *head, **tail; +}; + +#define RCU_BATCH_INIT(name) { NULL, &(name.head) } + +struct srcu_struct { + unsigned long completed; + struct srcu_array __percpu *per_cpu_ref; + spinlock_t queue_lock; /* protect ->batch_queue, ->running */ + bool running; + /* callbacks just queued */ + struct rcu_batch batch_queue; + /* callbacks try to do the first check_zero */ + struct rcu_batch batch_check0; + /* callbacks done with the first check_zero and the flip */ + struct rcu_batch batch_check1; + struct rcu_batch batch_done; + struct delayed_work work; +#ifdef CONFIG_DEBUG_LOCK_ALLOC + struct lockdep_map dep_map; +#endif /* #ifdef CONFIG_DEBUG_LOCK_ALLOC */ +}; + +void process_srcu(struct work_struct *work); + +#define __SRCU_STRUCT_INIT(name) \ + { \ + .completed = -300, \ + .per_cpu_ref = &name##_srcu_array, \ + .queue_lock = __SPIN_LOCK_UNLOCKED(name.queue_lock), \ + .running = false, \ + .batch_queue = RCU_BATCH_INIT(name.batch_queue), \ + .batch_check0 = RCU_BATCH_INIT(name.batch_check0), \ + .batch_check1 = RCU_BATCH_INIT(name.batch_check1), \ + .batch_done = RCU_BATCH_INIT(name.batch_done), \ + .work = __DELAYED_WORK_INITIALIZER(name.work, process_srcu, 0),\ + __SRCU_DEP_MAP_INIT(name) \ + } + +/* + * Define and initialize a srcu struct at build time. + * Do -not- call init_srcu_struct() nor cleanup_srcu_struct() on it. + * + * Note that although DEFINE_STATIC_SRCU() hides the name from other + * files, the per-CPU variable rules nevertheless require that the + * chosen name be globally unique. These rules also prohibit use of + * DEFINE_STATIC_SRCU() within a function. If these rules are too + * restrictive, declare the srcu_struct manually. For example, in + * each file: + * + * static struct srcu_struct my_srcu; + * + * Then, before the first use of each my_srcu, manually initialize it: + * + * init_srcu_struct(&my_srcu); + * + * See include/linux/percpu-defs.h for the rules on per-CPU variables. + */ +#define __DEFINE_SRCU(name, is_static) \ + static DEFINE_PER_CPU(struct srcu_array, name##_srcu_array);\ + is_static struct srcu_struct name = __SRCU_STRUCT_INIT(name) +#define DEFINE_SRCU(name) __DEFINE_SRCU(name, /* not static */) +#define DEFINE_STATIC_SRCU(name) __DEFINE_SRCU(name, static) + +void synchronize_srcu_expedited(struct srcu_struct *sp); +void srcu_barrier(struct srcu_struct *sp); +unsigned long srcu_batches_completed(struct srcu_struct *sp); + +#endif diff --git a/init/Kconfig b/init/Kconfig index d269f2ca17b8..558cc3638ab9 100644 --- a/init/Kconfig +++ b/init/Kconfig @@ -526,15 +526,32 @@ config SRCU permits arbitrary sleeping or blocking within RCU read-side critical sections. +config CLASSIC_SRCU + bool "Use v4.11 classic SRCU implementation" + default n + depends on RCU_EXPERT && SRCU + help + This option selects the traditional well-tested classic SRCU + implementation from v4.11, as might be desired for enterprise + Linux distributions. Without this option, the shiny new + Tiny SRCU and Tree SRCU implementations are used instead. + At some point, it is hoped that Tiny SRCU and Tree SRCU + will accumulate enough test time and confidence to allow + Classic SRCU to be dropped entirely. + + Say Y if you need a rock-solid SRCU. + + Say N if you would like help test Tree SRCU. + config TINY_SRCU bool - default y if TINY_RCU + default y if TINY_RCU && !CLASSIC_SRCU help This option selects the single-CPU non-preemptible version of SRCU. config TREE_SRCU bool - default y if !TINY_RCU + default y if !TINY_RCU && !CLASSIC_SRCU help This option selects the full-fledged version of SRCU. diff --git a/kernel/rcu/Makefile b/kernel/rcu/Makefile index b853214a2b99..158e6593d58c 100644 --- a/kernel/rcu/Makefile +++ b/kernel/rcu/Makefile @@ -3,7 +3,8 @@ KCOV_INSTRUMENT := n obj-y += update.o sync.o -obj-$(CONFIG_TREE_SRCU) += srcu.o +obj-$(CONFIG_CLASSIC_SRCU) += srcu.o +obj-$(CONFIG_TREE_SRCU) += srcutree.o obj-$(CONFIG_TINY_SRCU) += srcutiny.o obj-$(CONFIG_RCU_TORTURE_TEST) += rcutorture.o obj-$(CONFIG_RCU_PERF_TEST) += rcuperf.o diff --git a/kernel/rcu/rcutorture.c b/kernel/rcu/rcutorture.c index 9cbb8a7b909d..6f344b6748a8 100644 --- a/kernel/rcu/rcutorture.c +++ b/kernel/rcu/rcutorture.c @@ -562,7 +562,7 @@ static void srcu_torture_stats(void) int __maybe_unused cpu; int idx; -#ifdef CONFIG_TREE_SRCU +#if defined(CONFIG_TREE_SRCU) || defined(CONFIG_CLASSIC_SRCU) idx = srcu_ctlp->completed & 0x1; pr_alert("%s%s Tree SRCU per-CPU(idx=%d):", torture_type, TORTURE_FLAG, idx); diff --git a/kernel/rcu/srcu.c b/kernel/rcu/srcu.c index 3cfcc59bddf3..584d8a983883 100644 --- a/kernel/rcu/srcu.c +++ b/kernel/rcu/srcu.c @@ -36,16 +36,75 @@ #include #include -#include #include "rcu.h" +/* + * Initialize an rcu_batch structure to empty. + */ +static inline void rcu_batch_init(struct rcu_batch *b) +{ + b->head = NULL; + b->tail = &b->head; +} + +/* + * Enqueue a callback onto the tail of the specified rcu_batch structure. + */ +static inline void rcu_batch_queue(struct rcu_batch *b, struct rcu_head *head) +{ + *b->tail = head; + b->tail = &head->next; +} + +/* + * Is the specified rcu_batch structure empty? + */ +static inline bool rcu_batch_empty(struct rcu_batch *b) +{ + return b->tail == &b->head; +} + +/* + * Remove the callback at the head of the specified rcu_batch structure + * and return a pointer to it, or return NULL if the structure is empty. + */ +static inline struct rcu_head *rcu_batch_dequeue(struct rcu_batch *b) +{ + struct rcu_head *head; + + if (rcu_batch_empty(b)) + return NULL; + + head = b->head; + b->head = head->next; + if (b->tail == &head->next) + rcu_batch_init(b); + + return head; +} + +/* + * Move all callbacks from the rcu_batch structure specified by "from" to + * the structure specified by "to". + */ +static inline void rcu_batch_move(struct rcu_batch *to, struct rcu_batch *from) +{ + if (!rcu_batch_empty(from)) { + *to->tail = from->head; + to->tail = from->tail; + rcu_batch_init(from); + } +} + static int init_srcu_struct_fields(struct srcu_struct *sp) { sp->completed = 0; - sp->srcu_gp_seq = 0; - atomic_set(&sp->srcu_exp_cnt, 0); spin_lock_init(&sp->queue_lock); - rcu_segcblist_init(&sp->srcu_cblist); + sp->running = false; + rcu_batch_init(&sp->batch_queue); + rcu_batch_init(&sp->batch_check0); + rcu_batch_init(&sp->batch_check1); + rcu_batch_init(&sp->batch_done); INIT_DELAYED_WORK(&sp->work, process_srcu); sp->per_cpu_ref = alloc_percpu(struct srcu_array); return sp->per_cpu_ref ? 0 : -ENOMEM; @@ -180,8 +239,6 @@ static bool srcu_readers_active(struct srcu_struct *sp) return sum; } -#define SRCU_INTERVAL 1 - /** * cleanup_srcu_struct - deconstruct a sleep-RCU structure * @sp: structure to clean up. @@ -197,16 +254,8 @@ static bool srcu_readers_active(struct srcu_struct *sp) */ void cleanup_srcu_struct(struct srcu_struct *sp) { - WARN_ON_ONCE(atomic_read(&sp->srcu_exp_cnt)); if (WARN_ON(srcu_readers_active(sp))) return; /* Leakage unless caller handles error. */ - if (WARN_ON(!rcu_segcblist_empty(&sp->srcu_cblist))) - return; /* Leakage unless caller handles error. */ - flush_delayed_work(&sp->work); - if (WARN_ON(rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)) != SRCU_STATE_IDLE)) { - pr_info("cleanup_srcu_struct: Active srcu_struct %lu CBs %c state: %d\n", rcu_segcblist_n_cbs(&sp->srcu_cblist), ".E"[rcu_segcblist_empty(&sp->srcu_cblist)], rcu_seq_state(READ_ONCE(sp->srcu_gp_seq))); - return; /* Caller forgot to stop doing call_srcu()? */ - } free_percpu(sp->per_cpu_ref); sp->per_cpu_ref = NULL; } @@ -245,36 +294,26 @@ EXPORT_SYMBOL_GPL(__srcu_read_unlock); * We use an adaptive strategy for synchronize_srcu() and especially for * synchronize_srcu_expedited(). We spin for a fixed time period * (defined below) to allow SRCU readers to exit their read-side critical - * sections. If there are still some readers after a few microseconds, - * we repeatedly block for 1-millisecond time periods. + * sections. If there are still some readers after 10 microseconds, + * we repeatedly block for 1-millisecond time periods. This approach + * has done well in testing, so there is no need for a config parameter. */ #define SRCU_RETRY_CHECK_DELAY 5 +#define SYNCHRONIZE_SRCU_TRYCOUNT 2 +#define SYNCHRONIZE_SRCU_EXP_TRYCOUNT 12 /* - * Start an SRCU grace period. - */ -static void srcu_gp_start(struct srcu_struct *sp) -{ - int state; - - rcu_segcblist_accelerate(&sp->srcu_cblist, - rcu_seq_snap(&sp->srcu_gp_seq)); - rcu_seq_start(&sp->srcu_gp_seq); - state = rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)); - WARN_ON_ONCE(state != SRCU_STATE_SCAN1); -} - -/* - * Wait until all readers counted by array index idx complete, but - * loop an additional time if there is an expedited grace period pending. - * The caller must ensure that ->completed is not changed while checking. + * @@@ Wait until all pre-existing readers complete. Such readers + * will have used the index specified by "idx". + * the caller should ensures the ->completed is not changed while checking + * and idx = (->completed & 1) ^ 1 */ static bool try_check_zero(struct srcu_struct *sp, int idx, int trycount) { for (;;) { if (srcu_readers_active_idx_check(sp, idx)) return true; - if (--trycount + !!atomic_read(&sp->srcu_exp_cnt) <= 0) + if (--trycount <= 0) return false; udelay(SRCU_RETRY_CHECK_DELAY); } @@ -299,19 +338,6 @@ static void srcu_flip(struct srcu_struct *sp) smp_mb(); /* D */ /* Pairs with C. */ } -/* - * End an SRCU grace period. - */ -static void srcu_gp_end(struct srcu_struct *sp) -{ - rcu_seq_end(&sp->srcu_gp_seq); - - spin_lock_irq(&sp->queue_lock); - rcu_segcblist_advance(&sp->srcu_cblist, - rcu_seq_current(&sp->srcu_gp_seq)); - spin_unlock_irq(&sp->queue_lock); -} - /* * Enqueue an SRCU callback on the specified srcu_struct structure, * initiating grace-period processing if it is not already running. @@ -348,24 +374,26 @@ void call_srcu(struct srcu_struct *sp, struct rcu_head *head, head->func = func; spin_lock_irqsave(&sp->queue_lock, flags); smp_mb__after_unlock_lock(); /* Caller's prior accesses before GP. */ - rcu_segcblist_enqueue(&sp->srcu_cblist, head, false); - if (rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)) == SRCU_STATE_IDLE) { - srcu_gp_start(sp); + rcu_batch_queue(&sp->batch_queue, head); + if (!sp->running) { + sp->running = true; queue_delayed_work(system_power_efficient_wq, &sp->work, 0); } spin_unlock_irqrestore(&sp->queue_lock, flags); } EXPORT_SYMBOL_GPL(call_srcu); -static void srcu_reschedule(struct srcu_struct *sp, unsigned long delay); +static void srcu_advance_batches(struct srcu_struct *sp, int trycount); +static void srcu_reschedule(struct srcu_struct *sp); /* * Helper function for synchronize_srcu() and synchronize_srcu_expedited(). */ -static void __synchronize_srcu(struct srcu_struct *sp) +static void __synchronize_srcu(struct srcu_struct *sp, int trycount) { struct rcu_synchronize rcu; struct rcu_head *head = &rcu.head; + bool done = false; RCU_LOCKDEP_WARN(lock_is_held(&sp->dep_map) || lock_is_held(&rcu_bh_lock_map) || @@ -373,8 +401,6 @@ static void __synchronize_srcu(struct srcu_struct *sp) lock_is_held(&rcu_sched_lock_map), "Illegal synchronize_srcu() in same-type SRCU (or in RCU) read-side critical section"); - if (rcu_scheduler_active == RCU_SCHEDULER_INACTIVE) - return; might_sleep(); init_completion(&rcu.completion); @@ -382,47 +408,31 @@ static void __synchronize_srcu(struct srcu_struct *sp) head->func = wakeme_after_rcu; spin_lock_irq(&sp->queue_lock); smp_mb__after_unlock_lock(); /* Caller's prior accesses before GP. */ - if (rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)) == SRCU_STATE_IDLE) { + if (!sp->running) { /* steal the processing owner */ - rcu_segcblist_enqueue(&sp->srcu_cblist, head, false); - srcu_gp_start(sp); + sp->running = true; + rcu_batch_queue(&sp->batch_check0, head); spin_unlock_irq(&sp->queue_lock); + + srcu_advance_batches(sp, trycount); + if (!rcu_batch_empty(&sp->batch_done)) { + BUG_ON(sp->batch_done.head != head); + rcu_batch_dequeue(&sp->batch_done); + done = true; + } /* give the processing owner to work_struct */ - srcu_reschedule(sp, 0); + srcu_reschedule(sp); } else { - rcu_segcblist_enqueue(&sp->srcu_cblist, head, false); + rcu_batch_queue(&sp->batch_queue, head); spin_unlock_irq(&sp->queue_lock); } - wait_for_completion(&rcu.completion); - smp_mb(); /* Caller's later accesses after GP. */ -} - -/** - * synchronize_srcu_expedited - Brute-force SRCU grace period - * @sp: srcu_struct with which to synchronize. - * - * Wait for an SRCU grace period to elapse, but be more aggressive about - * spinning rather than blocking when waiting. - * - * Note that synchronize_srcu_expedited() has the same deadlock and - * memory-ordering properties as does synchronize_srcu(). - */ -void synchronize_srcu_expedited(struct srcu_struct *sp) -{ - bool do_norm = rcu_gp_is_normal(); - - if (!do_norm) { - atomic_inc(&sp->srcu_exp_cnt); - smp_mb__after_atomic(); /* increment before GP. */ - } - __synchronize_srcu(sp); - if (!do_norm) { - smp_mb__before_atomic(); /* GP before decrement. */ - atomic_dec(&sp->srcu_exp_cnt); + if (!done) { + wait_for_completion(&rcu.completion); + smp_mb(); /* Caller's later accesses after GP. */ } + } -EXPORT_SYMBOL_GPL(synchronize_srcu_expedited); /** * synchronize_srcu - wait for prior SRCU read-side critical-section completion @@ -465,13 +475,28 @@ EXPORT_SYMBOL_GPL(synchronize_srcu_expedited); */ void synchronize_srcu(struct srcu_struct *sp) { - if (rcu_gp_is_expedited()) - synchronize_srcu_expedited(sp); - else - __synchronize_srcu(sp); + __synchronize_srcu(sp, (rcu_gp_is_expedited() && !rcu_gp_is_normal()) + ? SYNCHRONIZE_SRCU_EXP_TRYCOUNT + : SYNCHRONIZE_SRCU_TRYCOUNT); } EXPORT_SYMBOL_GPL(synchronize_srcu); +/** + * synchronize_srcu_expedited - Brute-force SRCU grace period + * @sp: srcu_struct with which to synchronize. + * + * Wait for an SRCU grace period to elapse, but be more aggressive about + * spinning rather than blocking when waiting. + * + * Note that synchronize_srcu_expedited() has the same deadlock and + * memory-ordering properties as does synchronize_srcu(). + */ +void synchronize_srcu_expedited(struct srcu_struct *sp) +{ + __synchronize_srcu(sp, SYNCHRONIZE_SRCU_EXP_TRYCOUNT); +} +EXPORT_SYMBOL_GPL(synchronize_srcu_expedited); + /** * srcu_barrier - Wait until all in-flight call_srcu() callbacks complete. * @sp: srcu_struct on which to wait for in-flight callbacks. @@ -495,13 +520,29 @@ unsigned long srcu_batches_completed(struct srcu_struct *sp) } EXPORT_SYMBOL_GPL(srcu_batches_completed); +#define SRCU_CALLBACK_BATCH 10 +#define SRCU_INTERVAL 1 + +/* + * Move any new SRCU callbacks to the first stage of the SRCU grace + * period pipeline. + */ +static void srcu_collect_new(struct srcu_struct *sp) +{ + if (!rcu_batch_empty(&sp->batch_queue)) { + spin_lock_irq(&sp->queue_lock); + rcu_batch_move(&sp->batch_check0, &sp->batch_queue); + spin_unlock_irq(&sp->queue_lock); + } +} + /* * Core SRCU state machine. Advance callbacks from ->batch_check0 to * ->batch_check1 and then to ->batch_done as readers drain. */ -static void srcu_advance_batches(struct srcu_struct *sp) +static void srcu_advance_batches(struct srcu_struct *sp, int trycount) { - int idx; + int idx = 1 ^ (sp->completed & 1); /* * Because readers might be delayed for an extended period after @@ -509,44 +550,50 @@ static void srcu_advance_batches(struct srcu_struct *sp) * might well be readers using both idx=0 and idx=1. We therefore * need to wait for readers to clear from both index values before * invoking a callback. - * - * The load-acquire ensures that we see the accesses performed - * by the prior grace period. */ - idx = rcu_seq_state(smp_load_acquire(&sp->srcu_gp_seq)); /* ^^^ */ - if (idx == SRCU_STATE_IDLE) { - spin_lock_irq(&sp->queue_lock); - if (rcu_segcblist_empty(&sp->srcu_cblist)) { - spin_unlock_irq(&sp->queue_lock); - return; - } - idx = rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)); - if (idx == SRCU_STATE_IDLE) - srcu_gp_start(sp); - spin_unlock_irq(&sp->queue_lock); - if (idx != SRCU_STATE_IDLE) - return; /* Someone else started the grace period. */ - } - if (rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)) == SRCU_STATE_SCAN1) { - idx = 1 ^ (sp->completed & 1); - if (!try_check_zero(sp, idx, 1)) - return; /* readers present, retry later. */ - srcu_flip(sp); - rcu_seq_set_state(&sp->srcu_gp_seq, SRCU_STATE_SCAN2); - } + if (rcu_batch_empty(&sp->batch_check0) && + rcu_batch_empty(&sp->batch_check1)) + return; /* no callbacks need to be advanced */ - if (rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)) == SRCU_STATE_SCAN2) { + if (!try_check_zero(sp, idx, trycount)) + return; /* failed to advance, will try after SRCU_INTERVAL */ - /* - * SRCU read-side critical sections are normally short, - * so check at least twice in quick succession after a flip. - */ - idx = 1 ^ (sp->completed & 1); - if (!try_check_zero(sp, idx, 2)) - return; /* readers present, retry after later. */ - srcu_gp_end(sp); - } + /* + * The callbacks in ->batch_check1 have already done with their + * first zero check and flip back when they were enqueued on + * ->batch_check0 in a previous invocation of srcu_advance_batches(). + * (Presumably try_check_zero() returned false during that + * invocation, leaving the callbacks stranded on ->batch_check1.) + * They are therefore ready to invoke, so move them to ->batch_done. + */ + rcu_batch_move(&sp->batch_done, &sp->batch_check1); + + if (rcu_batch_empty(&sp->batch_check0)) + return; /* no callbacks need to be advanced */ + srcu_flip(sp); + + /* + * The callbacks in ->batch_check0 just finished their + * first check zero and flip, so move them to ->batch_check1 + * for future checking on the other idx. + */ + rcu_batch_move(&sp->batch_check1, &sp->batch_check0); + + /* + * SRCU read-side critical sections are normally short, so check + * at least twice in quick succession after a flip. + */ + trycount = trycount < 2 ? 2 : trycount; + if (!try_check_zero(sp, idx^1, trycount)) + return; /* failed to advance, will try after SRCU_INTERVAL */ + + /* + * The callbacks in ->batch_check1 have now waited for all + * pre-existing readers using both idx values. They are therefore + * ready to invoke, so move them to ->batch_done. + */ + rcu_batch_move(&sp->batch_done, &sp->batch_check1); } /* @@ -557,48 +604,45 @@ static void srcu_advance_batches(struct srcu_struct *sp) */ static void srcu_invoke_callbacks(struct srcu_struct *sp) { - struct rcu_cblist ready_cbs; - struct rcu_head *rhp; + int i; + struct rcu_head *head; - spin_lock_irq(&sp->queue_lock); - if (!rcu_segcblist_ready_cbs(&sp->srcu_cblist)) { - spin_unlock_irq(&sp->queue_lock); - return; - } - rcu_cblist_init(&ready_cbs); - rcu_segcblist_extract_done_cbs(&sp->srcu_cblist, &ready_cbs); - spin_unlock_irq(&sp->queue_lock); - rhp = rcu_cblist_dequeue(&ready_cbs); - for (; rhp != NULL; rhp = rcu_cblist_dequeue(&ready_cbs)) { + for (i = 0; i < SRCU_CALLBACK_BATCH; i++) { + head = rcu_batch_dequeue(&sp->batch_done); + if (!head) + break; local_bh_disable(); - rhp->func(rhp); + head->func(head); local_bh_enable(); } - spin_lock_irq(&sp->queue_lock); - rcu_segcblist_insert_count(&sp->srcu_cblist, &ready_cbs); - spin_unlock_irq(&sp->queue_lock); } /* * Finished one round of SRCU grace period. Start another if there are * more SRCU callbacks queued, otherwise put SRCU into not-running state. */ -static void srcu_reschedule(struct srcu_struct *sp, unsigned long delay) +static void srcu_reschedule(struct srcu_struct *sp) { bool pending = true; - int state; - if (rcu_segcblist_empty(&sp->srcu_cblist)) { + if (rcu_batch_empty(&sp->batch_done) && + rcu_batch_empty(&sp->batch_check1) && + rcu_batch_empty(&sp->batch_check0) && + rcu_batch_empty(&sp->batch_queue)) { spin_lock_irq(&sp->queue_lock); - state = rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)); - if (rcu_segcblist_empty(&sp->srcu_cblist) && - state == SRCU_STATE_IDLE) + if (rcu_batch_empty(&sp->batch_done) && + rcu_batch_empty(&sp->batch_check1) && + rcu_batch_empty(&sp->batch_check0) && + rcu_batch_empty(&sp->batch_queue)) { + sp->running = false; pending = false; + } spin_unlock_irq(&sp->queue_lock); } if (pending) - queue_delayed_work(system_power_efficient_wq, &sp->work, delay); + queue_delayed_work(system_power_efficient_wq, + &sp->work, SRCU_INTERVAL); } /* @@ -610,8 +654,9 @@ void process_srcu(struct work_struct *work) sp = container_of(work, struct srcu_struct, work.work); - srcu_advance_batches(sp); + srcu_collect_new(sp); + srcu_advance_batches(sp, 1); srcu_invoke_callbacks(sp); - srcu_reschedule(sp, atomic_read(&sp->srcu_exp_cnt) ? 0 : SRCU_INTERVAL); + srcu_reschedule(sp); } EXPORT_SYMBOL_GPL(process_srcu); diff --git a/kernel/rcu/srcutree.c b/kernel/rcu/srcutree.c new file mode 100644 index 000000000000..da676b0d016b --- /dev/null +++ b/kernel/rcu/srcutree.c @@ -0,0 +1,613 @@ +/* + * Sleepable Read-Copy Update mechanism for mutual exclusion. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, you can access it online at + * http://www.gnu.org/licenses/gpl-2.0.html. + * + * Copyright (C) IBM Corporation, 2006 + * Copyright (C) Fujitsu, 2012 + * + * Author: Paul McKenney + * Lai Jiangshan + * + * For detailed explanation of Read-Copy Update mechanism see - + * Documentation/RCU/ *.txt + * + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include "rcu.h" + +static int init_srcu_struct_fields(struct srcu_struct *sp) +{ + sp->completed = 0; + sp->srcu_gp_seq = 0; + atomic_set(&sp->srcu_exp_cnt, 0); + spin_lock_init(&sp->queue_lock); + rcu_segcblist_init(&sp->srcu_cblist); + INIT_DELAYED_WORK(&sp->work, process_srcu); + sp->per_cpu_ref = alloc_percpu(struct srcu_array); + return sp->per_cpu_ref ? 0 : -ENOMEM; +} + +#ifdef CONFIG_DEBUG_LOCK_ALLOC + +int __init_srcu_struct(struct srcu_struct *sp, const char *name, + struct lock_class_key *key) +{ + /* Don't re-initialize a lock while it is held. */ + debug_check_no_locks_freed((void *)sp, sizeof(*sp)); + lockdep_init_map(&sp->dep_map, name, key, 0); + return init_srcu_struct_fields(sp); +} +EXPORT_SYMBOL_GPL(__init_srcu_struct); + +#else /* #ifdef CONFIG_DEBUG_LOCK_ALLOC */ + +/** + * init_srcu_struct - initialize a sleep-RCU structure + * @sp: structure to initialize. + * + * Must invoke this on a given srcu_struct before passing that srcu_struct + * to any other function. Each srcu_struct represents a separate domain + * of SRCU protection. + */ +int init_srcu_struct(struct srcu_struct *sp) +{ + return init_srcu_struct_fields(sp); +} +EXPORT_SYMBOL_GPL(init_srcu_struct); + +#endif /* #else #ifdef CONFIG_DEBUG_LOCK_ALLOC */ + +/* + * Returns approximate total of the readers' ->lock_count[] values for the + * rank of per-CPU counters specified by idx. + */ +static unsigned long srcu_readers_lock_idx(struct srcu_struct *sp, int idx) +{ + int cpu; + unsigned long sum = 0; + + for_each_possible_cpu(cpu) { + struct srcu_array *cpuc = per_cpu_ptr(sp->per_cpu_ref, cpu); + + sum += READ_ONCE(cpuc->lock_count[idx]); + } + return sum; +} + +/* + * Returns approximate total of the readers' ->unlock_count[] values for the + * rank of per-CPU counters specified by idx. + */ +static unsigned long srcu_readers_unlock_idx(struct srcu_struct *sp, int idx) +{ + int cpu; + unsigned long sum = 0; + + for_each_possible_cpu(cpu) { + struct srcu_array *cpuc = per_cpu_ptr(sp->per_cpu_ref, cpu); + + sum += READ_ONCE(cpuc->unlock_count[idx]); + } + return sum; +} + +/* + * Return true if the number of pre-existing readers is determined to + * be zero. + */ +static bool srcu_readers_active_idx_check(struct srcu_struct *sp, int idx) +{ + unsigned long unlocks; + + unlocks = srcu_readers_unlock_idx(sp, idx); + + /* + * Make sure that a lock is always counted if the corresponding + * unlock is counted. Needs to be a smp_mb() as the read side may + * contain a read from a variable that is written to before the + * synchronize_srcu() in the write side. In this case smp_mb()s + * A and B act like the store buffering pattern. + * + * This smp_mb() also pairs with smp_mb() C to prevent accesses + * after the synchronize_srcu() from being executed before the + * grace period ends. + */ + smp_mb(); /* A */ + + /* + * If the locks are the same as the unlocks, then there must have + * been no readers on this index at some time in between. This does + * not mean that there are no more readers, as one could have read + * the current index but not have incremented the lock counter yet. + * + * Possible bug: There is no guarantee that there haven't been + * ULONG_MAX increments of ->lock_count[] since the unlocks were + * counted, meaning that this could return true even if there are + * still active readers. Since there are no memory barriers around + * srcu_flip(), the CPU is not required to increment ->completed + * before running srcu_readers_unlock_idx(), which means that there + * could be an arbitrarily large number of critical sections that + * execute after srcu_readers_unlock_idx() but use the old value + * of ->completed. + */ + return srcu_readers_lock_idx(sp, idx) == unlocks; +} + +/** + * srcu_readers_active - returns true if there are readers. and false + * otherwise + * @sp: which srcu_struct to count active readers (holding srcu_read_lock). + * + * Note that this is not an atomic primitive, and can therefore suffer + * severe errors when invoked on an active srcu_struct. That said, it + * can be useful as an error check at cleanup time. + */ +static bool srcu_readers_active(struct srcu_struct *sp) +{ + int cpu; + unsigned long sum = 0; + + for_each_possible_cpu(cpu) { + struct srcu_array *cpuc = per_cpu_ptr(sp->per_cpu_ref, cpu); + + sum += READ_ONCE(cpuc->lock_count[0]); + sum += READ_ONCE(cpuc->lock_count[1]); + sum -= READ_ONCE(cpuc->unlock_count[0]); + sum -= READ_ONCE(cpuc->unlock_count[1]); + } + return sum; +} + +#define SRCU_INTERVAL 1 + +/** + * cleanup_srcu_struct - deconstruct a sleep-RCU structure + * @sp: structure to clean up. + * + * Must invoke this after you are finished using a given srcu_struct that + * was initialized via init_srcu_struct(), else you leak memory. + */ +void cleanup_srcu_struct(struct srcu_struct *sp) +{ + WARN_ON_ONCE(atomic_read(&sp->srcu_exp_cnt)); + if (WARN_ON(srcu_readers_active(sp))) + return; /* Leakage unless caller handles error. */ + if (WARN_ON(!rcu_segcblist_empty(&sp->srcu_cblist))) + return; /* Leakage unless caller handles error. */ + flush_delayed_work(&sp->work); + if (WARN_ON(rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)) != SRCU_STATE_IDLE)) { + pr_info("cleanup_srcu_struct: Active srcu_struct %lu CBs %c state: %d\n", rcu_segcblist_n_cbs(&sp->srcu_cblist), ".E"[rcu_segcblist_empty(&sp->srcu_cblist)], rcu_seq_state(READ_ONCE(sp->srcu_gp_seq))); + return; /* Caller forgot to stop doing call_srcu()? */ + } + free_percpu(sp->per_cpu_ref); + sp->per_cpu_ref = NULL; +} +EXPORT_SYMBOL_GPL(cleanup_srcu_struct); + +/* + * Counts the new reader in the appropriate per-CPU element of the + * srcu_struct. Must be called from process context. + * Returns an index that must be passed to the matching srcu_read_unlock(). + */ +int __srcu_read_lock(struct srcu_struct *sp) +{ + int idx; + + idx = READ_ONCE(sp->completed) & 0x1; + __this_cpu_inc(sp->per_cpu_ref->lock_count[idx]); + smp_mb(); /* B */ /* Avoid leaking the critical section. */ + return idx; +} +EXPORT_SYMBOL_GPL(__srcu_read_lock); + +/* + * Removes the count for the old reader from the appropriate per-CPU + * element of the srcu_struct. Note that this may well be a different + * CPU than that which was incremented by the corresponding srcu_read_lock(). + * Must be called from process context. + */ +void __srcu_read_unlock(struct srcu_struct *sp, int idx) +{ + smp_mb(); /* C */ /* Avoid leaking the critical section. */ + this_cpu_inc(sp->per_cpu_ref->unlock_count[idx]); +} +EXPORT_SYMBOL_GPL(__srcu_read_unlock); + +/* + * We use an adaptive strategy for synchronize_srcu() and especially for + * synchronize_srcu_expedited(). We spin for a fixed time period + * (defined below) to allow SRCU readers to exit their read-side critical + * sections. If there are still some readers after a few microseconds, + * we repeatedly block for 1-millisecond time periods. + */ +#define SRCU_RETRY_CHECK_DELAY 5 + +/* + * Start an SRCU grace period. + */ +static void srcu_gp_start(struct srcu_struct *sp) +{ + int state; + + rcu_segcblist_accelerate(&sp->srcu_cblist, + rcu_seq_snap(&sp->srcu_gp_seq)); + rcu_seq_start(&sp->srcu_gp_seq); + state = rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)); + WARN_ON_ONCE(state != SRCU_STATE_SCAN1); +} + +/* + * Wait until all readers counted by array index idx complete, but + * loop an additional time if there is an expedited grace period pending. + * The caller must ensure that ->completed is not changed while checking. + */ +static bool try_check_zero(struct srcu_struct *sp, int idx, int trycount) +{ + for (;;) { + if (srcu_readers_active_idx_check(sp, idx)) + return true; + if (--trycount + !!atomic_read(&sp->srcu_exp_cnt) <= 0) + return false; + udelay(SRCU_RETRY_CHECK_DELAY); + } +} + +/* + * Increment the ->completed counter so that future SRCU readers will + * use the other rank of the ->(un)lock_count[] arrays. This allows + * us to wait for pre-existing readers in a starvation-free manner. + */ +static void srcu_flip(struct srcu_struct *sp) +{ + WRITE_ONCE(sp->completed, sp->completed + 1); + + /* + * Ensure that if the updater misses an __srcu_read_unlock() + * increment, that task's next __srcu_read_lock() will see the + * above counter update. Note that both this memory barrier + * and the one in srcu_readers_active_idx_check() provide the + * guarantee for __srcu_read_lock(). + */ + smp_mb(); /* D */ /* Pairs with C. */ +} + +/* + * End an SRCU grace period. + */ +static void srcu_gp_end(struct srcu_struct *sp) +{ + rcu_seq_end(&sp->srcu_gp_seq); + + spin_lock_irq(&sp->queue_lock); + rcu_segcblist_advance(&sp->srcu_cblist, + rcu_seq_current(&sp->srcu_gp_seq)); + spin_unlock_irq(&sp->queue_lock); +} + +/* + * Enqueue an SRCU callback on the specified srcu_struct structure, + * initiating grace-period processing if it is not already running. + * + * Note that all CPUs must agree that the grace period extended beyond + * all pre-existing SRCU read-side critical section. On systems with + * more than one CPU, this means that when "func()" is invoked, each CPU + * is guaranteed to have executed a full memory barrier since the end of + * its last corresponding SRCU read-side critical section whose beginning + * preceded the call to call_rcu(). It also means that each CPU executing + * an SRCU read-side critical section that continues beyond the start of + * "func()" must have executed a memory barrier after the call_rcu() + * but before the beginning of that SRCU read-side critical section. + * Note that these guarantees include CPUs that are offline, idle, or + * executing in user mode, as well as CPUs that are executing in the kernel. + * + * Furthermore, if CPU A invoked call_rcu() and CPU B invoked the + * resulting SRCU callback function "func()", then both CPU A and CPU + * B are guaranteed to execute a full memory barrier during the time + * interval between the call to call_rcu() and the invocation of "func()". + * This guarantee applies even if CPU A and CPU B are the same CPU (but + * again only if the system has more than one CPU). + * + * Of course, these guarantees apply only for invocations of call_srcu(), + * srcu_read_lock(), and srcu_read_unlock() that are all passed the same + * srcu_struct structure. + */ +void call_srcu(struct srcu_struct *sp, struct rcu_head *head, + rcu_callback_t func) +{ + unsigned long flags; + + head->next = NULL; + head->func = func; + spin_lock_irqsave(&sp->queue_lock, flags); + smp_mb__after_unlock_lock(); /* Caller's prior accesses before GP. */ + rcu_segcblist_enqueue(&sp->srcu_cblist, head, false); + if (rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)) == SRCU_STATE_IDLE) { + srcu_gp_start(sp); + queue_delayed_work(system_power_efficient_wq, &sp->work, 0); + } + spin_unlock_irqrestore(&sp->queue_lock, flags); +} +EXPORT_SYMBOL_GPL(call_srcu); + +static void srcu_reschedule(struct srcu_struct *sp, unsigned long delay); + +/* + * Helper function for synchronize_srcu() and synchronize_srcu_expedited(). + */ +static void __synchronize_srcu(struct srcu_struct *sp) +{ + struct rcu_synchronize rcu; + struct rcu_head *head = &rcu.head; + + RCU_LOCKDEP_WARN(lock_is_held(&sp->dep_map) || + lock_is_held(&rcu_bh_lock_map) || + lock_is_held(&rcu_lock_map) || + lock_is_held(&rcu_sched_lock_map), + "Illegal synchronize_srcu() in same-type SRCU (or in RCU) read-side critical section"); + + if (rcu_scheduler_active == RCU_SCHEDULER_INACTIVE) + return; + might_sleep(); + init_completion(&rcu.completion); + + head->next = NULL; + head->func = wakeme_after_rcu; + spin_lock_irq(&sp->queue_lock); + smp_mb__after_unlock_lock(); /* Caller's prior accesses before GP. */ + if (rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)) == SRCU_STATE_IDLE) { + /* steal the processing owner */ + rcu_segcblist_enqueue(&sp->srcu_cblist, head, false); + srcu_gp_start(sp); + spin_unlock_irq(&sp->queue_lock); + /* give the processing owner to work_struct */ + srcu_reschedule(sp, 0); + } else { + rcu_segcblist_enqueue(&sp->srcu_cblist, head, false); + spin_unlock_irq(&sp->queue_lock); + } + + wait_for_completion(&rcu.completion); + smp_mb(); /* Caller's later accesses after GP. */ +} + +/** + * synchronize_srcu_expedited - Brute-force SRCU grace period + * @sp: srcu_struct with which to synchronize. + * + * Wait for an SRCU grace period to elapse, but be more aggressive about + * spinning rather than blocking when waiting. + * + * Note that synchronize_srcu_expedited() has the same deadlock and + * memory-ordering properties as does synchronize_srcu(). + */ +void synchronize_srcu_expedited(struct srcu_struct *sp) +{ + bool do_norm = rcu_gp_is_normal(); + + if (!do_norm) { + atomic_inc(&sp->srcu_exp_cnt); + smp_mb__after_atomic(); /* increment before GP. */ + } + __synchronize_srcu(sp); + if (!do_norm) { + smp_mb__before_atomic(); /* GP before decrement. */ + atomic_dec(&sp->srcu_exp_cnt); + } +} +EXPORT_SYMBOL_GPL(synchronize_srcu_expedited); + +/** + * synchronize_srcu - wait for prior SRCU read-side critical-section completion + * @sp: srcu_struct with which to synchronize. + * + * Wait for the count to drain to zero of both indexes. To avoid the + * possible starvation of synchronize_srcu(), it waits for the count of + * the index=((->completed & 1) ^ 1) to drain to zero at first, + * and then flip the completed and wait for the count of the other index. + * + * Can block; must be called from process context. + * + * Note that it is illegal to call synchronize_srcu() from the corresponding + * SRCU read-side critical section; doing so will result in deadlock. + * However, it is perfectly legal to call synchronize_srcu() on one + * srcu_struct from some other srcu_struct's read-side critical section, + * as long as the resulting graph of srcu_structs is acyclic. + * + * There are memory-ordering constraints implied by synchronize_srcu(). + * On systems with more than one CPU, when synchronize_srcu() returns, + * each CPU is guaranteed to have executed a full memory barrier since + * the end of its last corresponding SRCU-sched read-side critical section + * whose beginning preceded the call to synchronize_srcu(). In addition, + * each CPU having an SRCU read-side critical section that extends beyond + * the return from synchronize_srcu() is guaranteed to have executed a + * full memory barrier after the beginning of synchronize_srcu() and before + * the beginning of that SRCU read-side critical section. Note that these + * guarantees include CPUs that are offline, idle, or executing in user mode, + * as well as CPUs that are executing in the kernel. + * + * Furthermore, if CPU A invoked synchronize_srcu(), which returned + * to its caller on CPU B, then both CPU A and CPU B are guaranteed + * to have executed a full memory barrier during the execution of + * synchronize_srcu(). This guarantee applies even if CPU A and CPU B + * are the same CPU, but again only if the system has more than one CPU. + * + * Of course, these memory-ordering guarantees apply only when + * synchronize_srcu(), srcu_read_lock(), and srcu_read_unlock() are + * passed the same srcu_struct structure. + */ +void synchronize_srcu(struct srcu_struct *sp) +{ + if (rcu_gp_is_expedited()) + synchronize_srcu_expedited(sp); + else + __synchronize_srcu(sp); +} +EXPORT_SYMBOL_GPL(synchronize_srcu); + +/** + * srcu_barrier - Wait until all in-flight call_srcu() callbacks complete. + * @sp: srcu_struct on which to wait for in-flight callbacks. + */ +void srcu_barrier(struct srcu_struct *sp) +{ + synchronize_srcu(sp); +} +EXPORT_SYMBOL_GPL(srcu_barrier); + +/** + * srcu_batches_completed - return batches completed. + * @sp: srcu_struct on which to report batch completion. + * + * Report the number of batches, correlated with, but not necessarily + * precisely the same as, the number of grace periods that have elapsed. + */ +unsigned long srcu_batches_completed(struct srcu_struct *sp) +{ + return sp->completed; +} +EXPORT_SYMBOL_GPL(srcu_batches_completed); + +/* + * Core SRCU state machine. Advance callbacks from ->batch_check0 to + * ->batch_check1 and then to ->batch_done as readers drain. + */ +static void srcu_advance_batches(struct srcu_struct *sp) +{ + int idx; + + /* + * Because readers might be delayed for an extended period after + * fetching ->completed for their index, at any point in time there + * might well be readers using both idx=0 and idx=1. We therefore + * need to wait for readers to clear from both index values before + * invoking a callback. + * + * The load-acquire ensures that we see the accesses performed + * by the prior grace period. + */ + idx = rcu_seq_state(smp_load_acquire(&sp->srcu_gp_seq)); /* ^^^ */ + if (idx == SRCU_STATE_IDLE) { + spin_lock_irq(&sp->queue_lock); + if (rcu_segcblist_empty(&sp->srcu_cblist)) { + spin_unlock_irq(&sp->queue_lock); + return; + } + idx = rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)); + if (idx == SRCU_STATE_IDLE) + srcu_gp_start(sp); + spin_unlock_irq(&sp->queue_lock); + if (idx != SRCU_STATE_IDLE) + return; /* Someone else started the grace period. */ + } + + if (rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)) == SRCU_STATE_SCAN1) { + idx = 1 ^ (sp->completed & 1); + if (!try_check_zero(sp, idx, 1)) + return; /* readers present, retry later. */ + srcu_flip(sp); + rcu_seq_set_state(&sp->srcu_gp_seq, SRCU_STATE_SCAN2); + } + + if (rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)) == SRCU_STATE_SCAN2) { + + /* + * SRCU read-side critical sections are normally short, + * so check at least twice in quick succession after a flip. + */ + idx = 1 ^ (sp->completed & 1); + if (!try_check_zero(sp, idx, 2)) + return; /* readers present, retry after later. */ + srcu_gp_end(sp); + } +} + +/* + * Invoke a limited number of SRCU callbacks that have passed through + * their grace period. If there are more to do, SRCU will reschedule + * the workqueue. Note that needed memory barriers have been executed + * in this task's context by srcu_readers_active_idx_check(). + */ +static void srcu_invoke_callbacks(struct srcu_struct *sp) +{ + struct rcu_cblist ready_cbs; + struct rcu_head *rhp; + + spin_lock_irq(&sp->queue_lock); + if (!rcu_segcblist_ready_cbs(&sp->srcu_cblist)) { + spin_unlock_irq(&sp->queue_lock); + return; + } + rcu_cblist_init(&ready_cbs); + rcu_segcblist_extract_done_cbs(&sp->srcu_cblist, &ready_cbs); + spin_unlock_irq(&sp->queue_lock); + rhp = rcu_cblist_dequeue(&ready_cbs); + for (; rhp != NULL; rhp = rcu_cblist_dequeue(&ready_cbs)) { + local_bh_disable(); + rhp->func(rhp); + local_bh_enable(); + } + spin_lock_irq(&sp->queue_lock); + rcu_segcblist_insert_count(&sp->srcu_cblist, &ready_cbs); + spin_unlock_irq(&sp->queue_lock); +} + +/* + * Finished one round of SRCU grace period. Start another if there are + * more SRCU callbacks queued, otherwise put SRCU into not-running state. + */ +static void srcu_reschedule(struct srcu_struct *sp, unsigned long delay) +{ + bool pending = true; + int state; + + if (rcu_segcblist_empty(&sp->srcu_cblist)) { + spin_lock_irq(&sp->queue_lock); + state = rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)); + if (rcu_segcblist_empty(&sp->srcu_cblist) && + state == SRCU_STATE_IDLE) + pending = false; + spin_unlock_irq(&sp->queue_lock); + } + + if (pending) + queue_delayed_work(system_power_efficient_wq, &sp->work, delay); +} + +/* + * This is the work-queue function that handles SRCU grace periods. + */ +void process_srcu(struct work_struct *work) +{ + struct srcu_struct *sp; + + sp = container_of(work, struct srcu_struct, work.work); + + srcu_advance_batches(sp); + srcu_invoke_callbacks(sp); + srcu_reschedule(sp, atomic_read(&sp->srcu_exp_cnt) ? 0 : SRCU_INTERVAL); +} +EXPORT_SYMBOL_GPL(process_srcu); -- cgit v1.2.3 From 1123a6041654e8f889014659593bad4168e542c2 Mon Sep 17 00:00:00 2001 From: Paolo Bonzini Date: Wed, 31 May 2017 14:03:11 +0200 Subject: srcu: Allow use of Classic SRCU from both process and interrupt context Linu Cherian reported a WARN in cleanup_srcu_struct() when shutting down a guest running iperf on a VFIO assigned device. This happens because irqfd_wakeup() calls srcu_read_lock(&kvm->irq_srcu) in interrupt context, while a worker thread does the same inside kvm_set_irq(). If the interrupt happens while the worker thread is executing __srcu_read_lock(), updates to the Classic SRCU ->lock_count[] field or the Tree SRCU ->srcu_lock_count[] field can be lost. The docs say you are not supposed to call srcu_read_lock() and srcu_read_unlock() from irq context, but KVM interrupt injection happens from (host) interrupt context and it would be nice if SRCU supported the use case. KVM is using SRCU here not really for the "sleepable" part, but rather due to its IPI-free fast detection of grace periods. It is therefore not desirable to switch back to RCU, which would effectively revert commit 719d93cd5f5c ("kvm/irqchip: Speed up KVM_SET_GSI_ROUTING", 2014-01-16). However, the docs are overly conservative. You can have an SRCU instance only has users in irq context, and you can mix process and irq context as long as process context users disable interrupts. In addition, __srcu_read_unlock() actually uses this_cpu_dec() on both Tree SRCU and Classic SRCU. For those two implementations, only srcu_read_lock() is unsafe. When Classic SRCU's __srcu_read_unlock() was changed to use this_cpu_dec(), in commit 5a41344a3d83 ("srcu: Simplify __srcu_read_unlock() via this_cpu_dec()", 2012-11-29), __srcu_read_lock() did two increments. Therefore it kept __this_cpu_inc(), with preempt_disable/enable in the caller. Tree SRCU however only does one increment, so on most architectures it is more efficient for __srcu_read_lock() to use this_cpu_inc(), and any performance differences appear to be down in the noise. Cc: stable@vger.kernel.org Fixes: 719d93cd5f5c ("kvm/irqchip: Speed up KVM_SET_GSI_ROUTING") Reported-by: Linu Cherian Suggested-by: Linu Cherian Cc: kvm@vger.kernel.org Signed-off-by: Paolo Bonzini Cc: Linus Torvalds Signed-off-by: Paul E. McKenney --- include/linux/srcu.h | 2 -- kernel/rcu/srcu.c | 5 ++--- 2 files changed, 2 insertions(+), 5 deletions(-) (limited to 'include/linux/srcu.h') diff --git a/include/linux/srcu.h b/include/linux/srcu.h index 167ad8831aaf..4c1d5f7e62c4 100644 --- a/include/linux/srcu.h +++ b/include/linux/srcu.h @@ -172,9 +172,7 @@ static inline int srcu_read_lock(struct srcu_struct *sp) __acquires(sp) { int retval; - preempt_disable(); retval = __srcu_read_lock(sp); - preempt_enable(); rcu_lock_acquire(&(sp)->dep_map); return retval; } diff --git a/kernel/rcu/srcu.c b/kernel/rcu/srcu.c index 584d8a983883..dea03614263f 100644 --- a/kernel/rcu/srcu.c +++ b/kernel/rcu/srcu.c @@ -263,7 +263,7 @@ EXPORT_SYMBOL_GPL(cleanup_srcu_struct); /* * Counts the new reader in the appropriate per-CPU element of the - * srcu_struct. Must be called from process context. + * srcu_struct. * Returns an index that must be passed to the matching srcu_read_unlock(). */ int __srcu_read_lock(struct srcu_struct *sp) @@ -271,7 +271,7 @@ int __srcu_read_lock(struct srcu_struct *sp) int idx; idx = READ_ONCE(sp->completed) & 0x1; - __this_cpu_inc(sp->per_cpu_ref->lock_count[idx]); + this_cpu_inc(sp->per_cpu_ref->lock_count[idx]); smp_mb(); /* B */ /* Avoid leaking the critical section. */ return idx; } @@ -281,7 +281,6 @@ EXPORT_SYMBOL_GPL(__srcu_read_lock); * Removes the count for the old reader from the appropriate per-CPU * element of the srcu_struct. Note that this may well be a different * CPU than that which was incremented by the corresponding srcu_read_lock(). - * Must be called from process context. */ void __srcu_read_unlock(struct srcu_struct *sp, int idx) { -- cgit v1.2.3