summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJens Axboe <axboe@kernel.dk>2018-05-14 21:17:31 +0300
committerJens Axboe <axboe@kernel.dk>2018-05-14 21:17:31 +0300
commitc854ab5773be1c1a0d3cef0c3a3261f2c48ab7f8 (patch)
treef8736038b49c548fd13327a3d2db26804f027047
parent0eb0b63c1d1a851b4c1606f4170691835d3616a2 (diff)
downloadlinux-c854ab5773be1c1a0d3cef0c3a3261f2c48ab7f8.tar.xz
sbitmap: fix race in wait batch accounting
If we have multiple callers of sbq_wake_up(), we can end up in a situation where the wait_cnt will continually go more and more negative. Consider the case where our wake batch is 1, hence wait_cnt will start out as 1. wait_cnt == 1 CPU0 CPU1 atomic_dec_return(), cnt == 0 atomic_dec_return(), cnt == -1 cmpxchg(-1, 0) (succeeds) [wait_cnt now 0] cmpxchg(0, 1) (fails) This ends up with wait_cnt being 0, we'll wakeup immediately next time. Going through the same loop as above again, and we'll have wait_cnt -1. For the case where we have a larger wake batch, the only difference is that the starting point will be higher. We'll still end up with continually smaller batch wakeups, which defeats the purpose of the rolling wakeups. Always reset the wait_cnt to the batch value. Then it doesn't matter who wins the race. But ensure that whomever does win the race is the one that increments the ws index and wakes up our batch count, loser gets to call __sbq_wake_up() again to account his wakeups towards the next active wait state index. Fixes: 6c0ca7ae292a ("sbitmap: fix wakeup hang after sbq resize") Reviewed-by: Omar Sandoval <osandov@fb.com> Signed-off-by: Jens Axboe <axboe@kernel.dk>
-rw-r--r--lib/sbitmap.c35
1 files changed, 25 insertions, 10 deletions
diff --git a/lib/sbitmap.c b/lib/sbitmap.c
index 8f0950fbaa5c..e6d7d610778d 100644
--- a/lib/sbitmap.c
+++ b/lib/sbitmap.c
@@ -457,7 +457,7 @@ static struct sbq_wait_state *sbq_wake_ptr(struct sbitmap_queue *sbq)
return NULL;
}
-static void sbq_wake_up(struct sbitmap_queue *sbq)
+static bool __sbq_wake_up(struct sbitmap_queue *sbq)
{
struct sbq_wait_state *ws;
unsigned int wake_batch;
@@ -474,28 +474,43 @@ static void sbq_wake_up(struct sbitmap_queue *sbq)
ws = sbq_wake_ptr(sbq);
if (!ws)
- return;
+ return false;
wait_cnt = atomic_dec_return(&ws->wait_cnt);
if (wait_cnt <= 0) {
+ int ret;
+
wake_batch = READ_ONCE(sbq->wake_batch);
+
/*
* Pairs with the memory barrier in sbitmap_queue_resize() to
* ensure that we see the batch size update before the wait
* count is reset.
*/
smp_mb__before_atomic();
+
/*
- * If there are concurrent callers to sbq_wake_up(), the last
- * one to decrement the wait count below zero will bump it back
- * up. If there is a concurrent resize, the count reset will
- * either cause the cmpxchg to fail or overwrite after the
- * cmpxchg.
+ * For concurrent callers of this, the one that failed the
+ * atomic_cmpxhcg() race should call this function again
+ * to wakeup a new batch on a different 'ws'.
*/
- atomic_cmpxchg(&ws->wait_cnt, wait_cnt, wait_cnt + wake_batch);
- sbq_index_atomic_inc(&sbq->wake_index);
- wake_up_nr(&ws->wait, wake_batch);
+ ret = atomic_cmpxchg(&ws->wait_cnt, wait_cnt, wake_batch);
+ if (ret == wait_cnt) {
+ sbq_index_atomic_inc(&sbq->wake_index);
+ wake_up_nr(&ws->wait, wake_batch);
+ return false;
+ }
+
+ return true;
}
+
+ return false;
+}
+
+static void sbq_wake_up(struct sbitmap_queue *sbq)
+{
+ while (__sbq_wake_up(sbq))
+ ;
}
void sbitmap_queue_clear(struct sbitmap_queue *sbq, unsigned int nr,