diff options
Diffstat (limited to 'drivers/md/bcache/journal.c')
-rw-r--r-- | drivers/md/bcache/journal.c | 293 |
1 files changed, 143 insertions, 150 deletions
diff --git a/drivers/md/bcache/journal.c b/drivers/md/bcache/journal.c index 8435f81e5d85..ecdaa671bd50 100644 --- a/drivers/md/bcache/journal.c +++ b/drivers/md/bcache/journal.c @@ -7,7 +7,6 @@ #include "bcache.h" #include "btree.h" #include "debug.h" -#include "request.h" #include <trace/events/bcache.h> @@ -31,17 +30,20 @@ static void journal_read_endio(struct bio *bio, int error) } static int journal_read_bucket(struct cache *ca, struct list_head *list, - struct btree_op *op, unsigned bucket_index) + unsigned bucket_index) { struct journal_device *ja = &ca->journal; struct bio *bio = &ja->bio; struct journal_replay *i; struct jset *j, *data = ca->set->journal.w[0].data; + struct closure cl; unsigned len, left, offset = 0; int ret = 0; sector_t bucket = bucket_to_sector(ca->set, ca->sb.d[bucket_index]); + closure_init_stack(&cl); + pr_debug("reading %llu", (uint64_t) bucket); while (offset < ca->sb.bucket_size) { @@ -55,11 +57,11 @@ reread: left = ca->sb.bucket_size - offset; bio->bi_size = len << 9; bio->bi_end_io = journal_read_endio; - bio->bi_private = &op->cl; + bio->bi_private = &cl; bch_bio_map(bio, data); - closure_bio_submit(bio, &op->cl, ca); - closure_sync(&op->cl); + closure_bio_submit(bio, &cl, ca); + closure_sync(&cl); /* This function could be simpler now since we no longer write * journal entries that overlap bucket boundaries; this means @@ -72,7 +74,7 @@ reread: left = ca->sb.bucket_size - offset; struct list_head *where; size_t blocks, bytes = set_bytes(j); - if (j->magic != jset_magic(ca->set)) + if (j->magic != jset_magic(&ca->sb)) return ret; if (bytes > left << 9) @@ -129,12 +131,11 @@ next_set: return ret; } -int bch_journal_read(struct cache_set *c, struct list_head *list, - struct btree_op *op) +int bch_journal_read(struct cache_set *c, struct list_head *list) { #define read_bucket(b) \ ({ \ - int ret = journal_read_bucket(ca, list, op, b); \ + int ret = journal_read_bucket(ca, list, b); \ __set_bit(b, bitmap); \ if (ret < 0) \ return ret; \ @@ -292,8 +293,7 @@ void bch_journal_mark(struct cache_set *c, struct list_head *list) } } -int bch_journal_replay(struct cache_set *s, struct list_head *list, - struct btree_op *op) +int bch_journal_replay(struct cache_set *s, struct list_head *list) { int ret = 0, keys = 0, entries = 0; struct bkey *k; @@ -301,31 +301,30 @@ int bch_journal_replay(struct cache_set *s, struct list_head *list, list_entry(list->prev, struct journal_replay, list); uint64_t start = i->j.last_seq, end = i->j.seq, n = start; + struct keylist keylist; + + bch_keylist_init(&keylist); list_for_each_entry(i, list, list) { BUG_ON(i->pin && atomic_read(i->pin) != 1); - if (n != i->j.seq) - pr_err( - "journal entries %llu-%llu missing! (replaying %llu-%llu)\n", - n, i->j.seq - 1, start, end); + cache_set_err_on(n != i->j.seq, s, +"bcache: journal entries %llu-%llu missing! (replaying %llu-%llu)", + n, i->j.seq - 1, start, end); for (k = i->j.start; k < end(&i->j); k = bkey_next(k)) { trace_bcache_journal_replay_key(k); - bkey_copy(op->keys.top, k); - bch_keylist_push(&op->keys); - - op->journal = i->pin; - atomic_inc(op->journal); + bkey_copy(keylist.top, k); + bch_keylist_push(&keylist); - ret = bch_btree_insert(op, s); + ret = bch_btree_insert(s, &keylist, i->pin, NULL); if (ret) goto err; - BUG_ON(!bch_keylist_empty(&op->keys)); + BUG_ON(!bch_keylist_empty(&keylist)); keys++; cond_resched(); @@ -339,14 +338,13 @@ int bch_journal_replay(struct cache_set *s, struct list_head *list, pr_info("journal replay done, %i keys in %i entries, seq %llu", keys, entries, end); - +err: while (!list_empty(list)) { i = list_first_entry(list, struct journal_replay, list); list_del(&i->list); kfree(i); } -err: - closure_sync(&op->cl); + return ret; } @@ -358,48 +356,35 @@ static void btree_flush_write(struct cache_set *c) * Try to find the btree node with that references the oldest journal * entry, best is our current candidate and is locked if non NULL: */ - struct btree *b, *best = NULL; - unsigned iter; + struct btree *b, *best; + unsigned i; +retry: + best = NULL; + + for_each_cached_btree(b, c, i) + if (btree_current_write(b)->journal) { + if (!best) + best = b; + else if (journal_pin_cmp(c, + btree_current_write(best)->journal, + btree_current_write(b)->journal)) { + best = b; + } + } - for_each_cached_btree(b, c, iter) { - if (!down_write_trylock(&b->lock)) - continue; + b = best; + if (b) { + rw_lock(true, b, b->level); - if (!btree_node_dirty(b) || - !btree_current_write(b)->journal) { + if (!btree_current_write(b)->journal) { rw_unlock(true, b); - continue; + /* We raced */ + goto retry; } - if (!best) - best = b; - else if (journal_pin_cmp(c, - btree_current_write(best), - btree_current_write(b))) { - rw_unlock(true, best); - best = b; - } else - rw_unlock(true, b); + bch_btree_node_write(b, NULL); + rw_unlock(true, b); } - - if (best) - goto out; - - /* We can't find the best btree node, just pick the first */ - list_for_each_entry(b, &c->btree_cache, list) - if (!b->level && btree_node_dirty(b)) { - best = b; - rw_lock(true, best, best->level); - goto found; - } - -out: - if (!best) - return; -found: - if (btree_node_dirty(best)) - bch_btree_node_write(best, NULL); - rw_unlock(true, best); } #define last_seq(j) ((j)->seq - fifo_used(&(j)->pin) + 1) @@ -495,7 +480,7 @@ static void journal_reclaim(struct cache_set *c) do_journal_discard(ca); if (c->journal.blocks_free) - return; + goto out; /* * Allocate: @@ -521,7 +506,7 @@ static void journal_reclaim(struct cache_set *c) if (n) c->journal.blocks_free = c->sb.bucket_size >> c->block_bits; - +out: if (!journal_full(&c->journal)) __closure_wake_up(&c->journal.wait); } @@ -554,32 +539,26 @@ static void journal_write_endio(struct bio *bio, int error) struct journal_write *w = bio->bi_private; cache_set_err_on(error, w->c, "journal io error"); - closure_put(&w->c->journal.io.cl); + closure_put(&w->c->journal.io); } static void journal_write(struct closure *); static void journal_write_done(struct closure *cl) { - struct journal *j = container_of(cl, struct journal, io.cl); - struct cache_set *c = container_of(j, struct cache_set, journal); - + struct journal *j = container_of(cl, struct journal, io); struct journal_write *w = (j->cur == j->w) ? &j->w[1] : &j->w[0]; __closure_wake_up(&w->wait); - - if (c->journal_delay_ms) - closure_delay(&j->io, msecs_to_jiffies(c->journal_delay_ms)); - - continue_at(cl, journal_write, system_wq); + continue_at_nobarrier(cl, journal_write, system_wq); } static void journal_write_unlocked(struct closure *cl) __releases(c->journal.lock) { - struct cache_set *c = container_of(cl, struct cache_set, journal.io.cl); + struct cache_set *c = container_of(cl, struct cache_set, journal.io); struct cache *ca; struct journal_write *w = c->journal.cur; struct bkey *k = &c->journal.key; @@ -617,7 +596,7 @@ static void journal_write_unlocked(struct closure *cl) for_each_cache(ca, c, i) w->data->prio_bucket[ca->sb.nr_this_dev] = ca->prio_buckets[0]; - w->data->magic = jset_magic(c); + w->data->magic = jset_magic(&c->sb); w->data->version = BCACHE_JSET_VERSION; w->data->last_seq = last_seq(&c->journal); w->data->csum = csum_set(w->data); @@ -660,121 +639,134 @@ static void journal_write_unlocked(struct closure *cl) static void journal_write(struct closure *cl) { - struct cache_set *c = container_of(cl, struct cache_set, journal.io.cl); + struct cache_set *c = container_of(cl, struct cache_set, journal.io); spin_lock(&c->journal.lock); journal_write_unlocked(cl); } -static void __journal_try_write(struct cache_set *c, bool noflush) +static void journal_try_write(struct cache_set *c) __releases(c->journal.lock) { - struct closure *cl = &c->journal.io.cl; + struct closure *cl = &c->journal.io; + struct journal_write *w = c->journal.cur; - if (!closure_trylock(cl, &c->cl)) - spin_unlock(&c->journal.lock); - else if (noflush && journal_full(&c->journal)) { - spin_unlock(&c->journal.lock); - continue_at(cl, journal_write, system_wq); - } else + w->need_write = true; + + if (closure_trylock(cl, &c->cl)) journal_write_unlocked(cl); + else + spin_unlock(&c->journal.lock); } -#define journal_try_write(c) __journal_try_write(c, false) - -void bch_journal_meta(struct cache_set *c, struct closure *cl) +static struct journal_write *journal_wait_for_write(struct cache_set *c, + unsigned nkeys) { - struct journal_write *w; + size_t sectors; + struct closure cl; - if (CACHE_SYNC(&c->sb)) { - spin_lock(&c->journal.lock); + closure_init_stack(&cl); + + spin_lock(&c->journal.lock); - w = c->journal.cur; - w->need_write = true; + while (1) { + struct journal_write *w = c->journal.cur; - if (cl) - BUG_ON(!closure_wait(&w->wait, cl)); + sectors = __set_blocks(w->data, w->data->keys + nkeys, + c) * c->sb.block_size; - closure_flush(&c->journal.io); - __journal_try_write(c, true); + if (sectors <= min_t(size_t, + c->journal.blocks_free * c->sb.block_size, + PAGE_SECTORS << JSET_BITS)) + return w; + + /* XXX: tracepoint */ + if (!journal_full(&c->journal)) { + trace_bcache_journal_entry_full(c); + + /* + * XXX: If we were inserting so many keys that they + * won't fit in an _empty_ journal write, we'll + * deadlock. For now, handle this in + * bch_keylist_realloc() - but something to think about. + */ + BUG_ON(!w->data->keys); + + closure_wait(&w->wait, &cl); + journal_try_write(c); /* unlocks */ + } else { + trace_bcache_journal_full(c); + + closure_wait(&c->journal.wait, &cl); + journal_reclaim(c); + spin_unlock(&c->journal.lock); + + btree_flush_write(c); + } + + closure_sync(&cl); + spin_lock(&c->journal.lock); } } +static void journal_write_work(struct work_struct *work) +{ + struct cache_set *c = container_of(to_delayed_work(work), + struct cache_set, + journal.work); + spin_lock(&c->journal.lock); + journal_try_write(c); +} + /* * Entry point to the journalling code - bio_insert() and btree_invalidate() * pass bch_journal() a list of keys to be journalled, and then * bch_journal() hands those same keys off to btree_insert_async() */ -void bch_journal(struct closure *cl) +atomic_t *bch_journal(struct cache_set *c, + struct keylist *keys, + struct closure *parent) { - struct btree_op *op = container_of(cl, struct btree_op, cl); - struct cache_set *c = op->c; struct journal_write *w; - size_t b, n = ((uint64_t *) op->keys.top) - op->keys.list; - - if (op->type != BTREE_INSERT || - !CACHE_SYNC(&c->sb)) - goto out; + atomic_t *ret; - /* - * If we're looping because we errored, might already be waiting on - * another journal write: - */ - while (atomic_read(&cl->parent->remaining) & CLOSURE_WAITING) - closure_sync(cl->parent); + if (!CACHE_SYNC(&c->sb)) + return NULL; - spin_lock(&c->journal.lock); + w = journal_wait_for_write(c, bch_keylist_nkeys(keys)); - if (journal_full(&c->journal)) { - trace_bcache_journal_full(c); + memcpy(end(w->data), keys->keys, bch_keylist_bytes(keys)); + w->data->keys += bch_keylist_nkeys(keys); - closure_wait(&c->journal.wait, cl); + ret = &fifo_back(&c->journal.pin); + atomic_inc(ret); - journal_reclaim(c); + if (parent) { + closure_wait(&w->wait, parent); + journal_try_write(c); + } else if (!w->need_write) { + schedule_delayed_work(&c->journal.work, + msecs_to_jiffies(c->journal_delay_ms)); + spin_unlock(&c->journal.lock); + } else { spin_unlock(&c->journal.lock); - - btree_flush_write(c); - continue_at(cl, bch_journal, bcache_wq); } - w = c->journal.cur; - w->need_write = true; - b = __set_blocks(w->data, w->data->keys + n, c); - - if (b * c->sb.block_size > PAGE_SECTORS << JSET_BITS || - b > c->journal.blocks_free) { - trace_bcache_journal_entry_full(c); - - /* - * XXX: If we were inserting so many keys that they won't fit in - * an _empty_ journal write, we'll deadlock. For now, handle - * this in bch_keylist_realloc() - but something to think about. - */ - BUG_ON(!w->data->keys); - - BUG_ON(!closure_wait(&w->wait, cl)); - - closure_flush(&c->journal.io); - journal_try_write(c); - continue_at(cl, bch_journal, bcache_wq); - } - - memcpy(end(w->data), op->keys.list, n * sizeof(uint64_t)); - w->data->keys += n; + return ret; +} - op->journal = &fifo_back(&c->journal.pin); - atomic_inc(op->journal); +void bch_journal_meta(struct cache_set *c, struct closure *cl) +{ + struct keylist keys; + atomic_t *ref; - if (op->flush_journal) { - closure_flush(&c->journal.io); - closure_wait(&w->wait, cl->parent); - } + bch_keylist_init(&keys); - journal_try_write(c); -out: - bch_btree_insert_async(cl); + ref = bch_journal(c, &keys, cl); + if (ref) + atomic_dec_bug(ref); } void bch_journal_free(struct cache_set *c) @@ -790,6 +782,7 @@ int bch_journal_alloc(struct cache_set *c) closure_init_unlocked(&j->io); spin_lock_init(&j->lock); + INIT_DELAYED_WORK(&j->work, journal_write_work); c->journal_delay_ms = 100; |