diff options
author | Linus Torvalds <torvalds@linux-foundation.org> | 2014-08-14 19:10:21 +0400 |
---|---|---|
committer | Linus Torvalds <torvalds@linux-foundation.org> | 2014-08-14 19:10:21 +0400 |
commit | d429a3639ca967ce2f35e3e8d4e70caec7149ded (patch) | |
tree | cad1e5602551b6a744f63ef062de2c2e21cfe39a /drivers/block | |
parent | 4a319a490ca59a746b3d36768c0e29ee19832366 (diff) | |
parent | 99d540018caa920b7a54e2d3048f1dff530b294b (diff) | |
download | linux-d429a3639ca967ce2f35e3e8d4e70caec7149ded.tar.xz |
Merge branch 'for-3.17/drivers' of git://git.kernel.dk/linux-block
Pull block driver changes from Jens Axboe:
"Nothing out of the ordinary here, this pull request contains:
- A big round of fixes for bcache from Kent Overstreet, Slava Pestov,
and Surbhi Palande. No new features, just a lot of fixes.
- The usual round of drbd updates from Andreas Gruenbacher, Lars
Ellenberg, and Philipp Reisner.
- virtio_blk was converted to blk-mq back in 3.13, but now Ming Lei
has taken it one step further and added support for actually using
more than one queue.
- Addition of an explicit SG_FLAG_Q_AT_HEAD for block/bsg, to
compliment the the default behavior of adding to the tail of the
queue. From Douglas Gilbert"
* 'for-3.17/drivers' of git://git.kernel.dk/linux-block: (86 commits)
bcache: Drop unneeded blk_sync_queue() calls
bcache: add mutex lock for bch_is_open
bcache: Correct printing of btree_gc_max_duration_ms
bcache: try to set b->parent properly
bcache: fix memory corruption in init error path
bcache: fix crash with incomplete cache set
bcache: Fix more early shutdown bugs
bcache: fix use-after-free in btree_gc_coalesce()
bcache: Fix an infinite loop in journal replay
bcache: fix crash in bcache_btree_node_alloc_fail tracepoint
bcache: bcache_write tracepoint was crashing
bcache: fix typo in bch_bkey_equal_header
bcache: Allocate bounce buffers with GFP_NOWAIT
bcache: Make sure to pass GFP_WAIT to mempool_alloc()
bcache: fix uninterruptible sleep in writeback thread
bcache: wait for buckets when allocating new btree root
bcache: fix crash on shutdown in passthrough mode
bcache: fix lockdep warnings on shutdown
bcache allocator: send discards with correct size
bcache: Fix to remove the rcu_sched stalls.
...
Diffstat (limited to 'drivers/block')
-rw-r--r-- | drivers/block/drbd/Makefile | 1 | ||||
-rw-r--r-- | drivers/block/drbd/drbd_actlog.c | 518 | ||||
-rw-r--r-- | drivers/block/drbd/drbd_bitmap.c | 150 | ||||
-rw-r--r-- | drivers/block/drbd/drbd_debugfs.c | 958 | ||||
-rw-r--r-- | drivers/block/drbd/drbd_debugfs.h | 39 | ||||
-rw-r--r-- | drivers/block/drbd/drbd_int.h | 383 | ||||
-rw-r--r-- | drivers/block/drbd/drbd_interval.h | 4 | ||||
-rw-r--r-- | drivers/block/drbd/drbd_main.c | 302 | ||||
-rw-r--r-- | drivers/block/drbd/drbd_nl.c | 110 | ||||
-rw-r--r-- | drivers/block/drbd/drbd_proc.c | 125 | ||||
-rw-r--r-- | drivers/block/drbd/drbd_receiver.c | 316 | ||||
-rw-r--r-- | drivers/block/drbd/drbd_req.c | 527 | ||||
-rw-r--r-- | drivers/block/drbd/drbd_req.h | 1 | ||||
-rw-r--r-- | drivers/block/drbd/drbd_state.c | 90 | ||||
-rw-r--r-- | drivers/block/drbd/drbd_worker.c | 348 | ||||
-rw-r--r-- | drivers/block/virtio_blk.c | 104 |
16 files changed, 2758 insertions, 1218 deletions
diff --git a/drivers/block/drbd/Makefile b/drivers/block/drbd/Makefile index 8b450338075e..4464e353c1e8 100644 --- a/drivers/block/drbd/Makefile +++ b/drivers/block/drbd/Makefile @@ -3,5 +3,6 @@ drbd-y += drbd_worker.o drbd_receiver.o drbd_req.o drbd_actlog.o drbd-y += drbd_main.o drbd_strings.o drbd_nl.o drbd-y += drbd_interval.o drbd_state.o drbd-y += drbd_nla.o +drbd-$(CONFIG_DEBUG_FS) += drbd_debugfs.o obj-$(CONFIG_BLK_DEV_DRBD) += drbd.o diff --git a/drivers/block/drbd/drbd_actlog.c b/drivers/block/drbd/drbd_actlog.c index 05a1780ffa85..d26a3fa63688 100644 --- a/drivers/block/drbd/drbd_actlog.c +++ b/drivers/block/drbd/drbd_actlog.c @@ -92,34 +92,26 @@ struct __packed al_transaction_on_disk { __be32 context[AL_CONTEXT_PER_TRANSACTION]; }; -struct update_odbm_work { - struct drbd_work w; - struct drbd_device *device; - unsigned int enr; -}; - -struct update_al_work { - struct drbd_work w; - struct drbd_device *device; - struct completion event; - int err; -}; - - -void *drbd_md_get_buffer(struct drbd_device *device) +void *drbd_md_get_buffer(struct drbd_device *device, const char *intent) { int r; wait_event(device->misc_wait, - (r = atomic_cmpxchg(&device->md_io_in_use, 0, 1)) == 0 || + (r = atomic_cmpxchg(&device->md_io.in_use, 0, 1)) == 0 || device->state.disk <= D_FAILED); - return r ? NULL : page_address(device->md_io_page); + if (r) + return NULL; + + device->md_io.current_use = intent; + device->md_io.start_jif = jiffies; + device->md_io.submit_jif = device->md_io.start_jif - 1; + return page_address(device->md_io.page); } void drbd_md_put_buffer(struct drbd_device *device) { - if (atomic_dec_and_test(&device->md_io_in_use)) + if (atomic_dec_and_test(&device->md_io.in_use)) wake_up(&device->misc_wait); } @@ -145,10 +137,11 @@ void wait_until_done_or_force_detached(struct drbd_device *device, struct drbd_b static int _drbd_md_sync_page_io(struct drbd_device *device, struct drbd_backing_dev *bdev, - struct page *page, sector_t sector, - int rw, int size) + sector_t sector, int rw) { struct bio *bio; + /* we do all our meta data IO in aligned 4k blocks. */ + const int size = 4096; int err; device->md_io.done = 0; @@ -156,15 +149,15 @@ static int _drbd_md_sync_page_io(struct drbd_device *device, if ((rw & WRITE) && !test_bit(MD_NO_FUA, &device->flags)) rw |= REQ_FUA | REQ_FLUSH; - rw |= REQ_SYNC; + rw |= REQ_SYNC | REQ_NOIDLE; bio = bio_alloc_drbd(GFP_NOIO); bio->bi_bdev = bdev->md_bdev; bio->bi_iter.bi_sector = sector; err = -EIO; - if (bio_add_page(bio, page, size, 0) != size) + if (bio_add_page(bio, device->md_io.page, size, 0) != size) goto out; - bio->bi_private = &device->md_io; + bio->bi_private = device; bio->bi_end_io = drbd_md_io_complete; bio->bi_rw = rw; @@ -179,7 +172,8 @@ static int _drbd_md_sync_page_io(struct drbd_device *device, } bio_get(bio); /* one bio_put() is in the completion handler */ - atomic_inc(&device->md_io_in_use); /* drbd_md_put_buffer() is in the completion handler */ + atomic_inc(&device->md_io.in_use); /* drbd_md_put_buffer() is in the completion handler */ + device->md_io.submit_jif = jiffies; if (drbd_insert_fault(device, (rw & WRITE) ? DRBD_FAULT_MD_WR : DRBD_FAULT_MD_RD)) bio_endio(bio, -EIO); else @@ -197,9 +191,7 @@ int drbd_md_sync_page_io(struct drbd_device *device, struct drbd_backing_dev *bd sector_t sector, int rw) { int err; - struct page *iop = device->md_io_page; - - D_ASSERT(device, atomic_read(&device->md_io_in_use) == 1); + D_ASSERT(device, atomic_read(&device->md_io.in_use) == 1); BUG_ON(!bdev->md_bdev); @@ -214,8 +206,7 @@ int drbd_md_sync_page_io(struct drbd_device *device, struct drbd_backing_dev *bd current->comm, current->pid, __func__, (unsigned long long)sector, (rw & WRITE) ? "WRITE" : "READ"); - /* we do all our meta data IO in aligned 4k blocks. */ - err = _drbd_md_sync_page_io(device, bdev, iop, sector, rw, 4096); + err = _drbd_md_sync_page_io(device, bdev, sector, rw); if (err) { drbd_err(device, "drbd_md_sync_page_io(,%llus,%s) failed with error %d\n", (unsigned long long)sector, (rw & WRITE) ? "WRITE" : "READ", err); @@ -297,26 +288,12 @@ bool drbd_al_begin_io_prepare(struct drbd_device *device, struct drbd_interval * return need_transaction; } -static int al_write_transaction(struct drbd_device *device, bool delegate); - -/* When called through generic_make_request(), we must delegate - * activity log I/O to the worker thread: a further request - * submitted via generic_make_request() within the same task - * would be queued on current->bio_list, and would only start - * after this function returns (see generic_make_request()). - * - * However, if we *are* the worker, we must not delegate to ourselves. - */ +static int al_write_transaction(struct drbd_device *device); -/* - * @delegate: delegate activity log I/O to the worker thread - */ -void drbd_al_begin_io_commit(struct drbd_device *device, bool delegate) +void drbd_al_begin_io_commit(struct drbd_device *device) { bool locked = false; - BUG_ON(delegate && current == first_peer_device(device)->connection->worker.task); - /* Serialize multiple transactions. * This uses test_and_set_bit, memory barrier is implicit. */ @@ -335,7 +312,7 @@ void drbd_al_begin_io_commit(struct drbd_device *device, bool delegate) rcu_read_unlock(); if (write_al_updates) - al_write_transaction(device, delegate); + al_write_transaction(device); spin_lock_irq(&device->al_lock); /* FIXME if (err) @@ -352,12 +329,10 @@ void drbd_al_begin_io_commit(struct drbd_device *device, bool delegate) /* * @delegate: delegate activity log I/O to the worker thread */ -void drbd_al_begin_io(struct drbd_device *device, struct drbd_interval *i, bool delegate) +void drbd_al_begin_io(struct drbd_device *device, struct drbd_interval *i) { - BUG_ON(delegate && current == first_peer_device(device)->connection->worker.task); - if (drbd_al_begin_io_prepare(device, i)) - drbd_al_begin_io_commit(device, delegate); + drbd_al_begin_io_commit(device); } int drbd_al_begin_io_nonblock(struct drbd_device *device, struct drbd_interval *i) @@ -380,8 +355,19 @@ int drbd_al_begin_io_nonblock(struct drbd_device *device, struct drbd_interval * /* We want all necessary updates for a given request within the same transaction * We could first check how many updates are *actually* needed, * and use that instead of the worst-case nr_al_extents */ - if (available_update_slots < nr_al_extents) - return -EWOULDBLOCK; + if (available_update_slots < nr_al_extents) { + /* Too many activity log extents are currently "hot". + * + * If we have accumulated pending changes already, + * we made progress. + * + * If we cannot get even a single pending change through, + * stop the fast path until we made some progress, + * or requests to "cold" extents could be starved. */ + if (!al->pending_changes) + __set_bit(__LC_STARVING, &device->act_log->flags); + return -ENOBUFS; + } /* Is resync active in this area? */ for (enr = first; enr <= last; enr++) { @@ -452,15 +438,6 @@ static unsigned int al_extent_to_bm_page(unsigned int al_enr) (AL_EXTENT_SHIFT - BM_BLOCK_SHIFT)); } -static unsigned int rs_extent_to_bm_page(unsigned int rs_enr) -{ - return rs_enr >> - /* bit to page */ - ((PAGE_SHIFT + 3) - - /* resync extent number to bit */ - (BM_EXT_SHIFT - BM_BLOCK_SHIFT)); -} - static sector_t al_tr_number_to_on_disk_sector(struct drbd_device *device) { const unsigned int stripes = device->ldev->md.al_stripes; @@ -479,8 +456,7 @@ static sector_t al_tr_number_to_on_disk_sector(struct drbd_device *device) return device->ldev->md.md_offset + device->ldev->md.al_offset + t; } -static int -_al_write_transaction(struct drbd_device *device) +int al_write_transaction(struct drbd_device *device) { struct al_transaction_on_disk *buffer; struct lc_element *e; @@ -505,7 +481,8 @@ _al_write_transaction(struct drbd_device *device) return -EIO; } - buffer = drbd_md_get_buffer(device); /* protects md_io_buffer, al_tr_cycle, ... */ + /* protects md_io_buffer, al_tr_cycle, ... */ + buffer = drbd_md_get_buffer(device, __func__); if (!buffer) { drbd_err(device, "disk failed while waiting for md_io buffer\n"); put_ldev(device); @@ -590,38 +567,6 @@ _al_write_transaction(struct drbd_device *device) return err; } - -static int w_al_write_transaction(struct drbd_work *w, int unused) -{ - struct update_al_work *aw = container_of(w, struct update_al_work, w); - struct drbd_device *device = aw->device; - int err; - - err = _al_write_transaction(device); - aw->err = err; - complete(&aw->event); - - return err != -EIO ? err : 0; -} - -/* Calls from worker context (see w_restart_disk_io()) need to write the - transaction directly. Others came through generic_make_request(), - those need to delegate it to the worker. */ -static int al_write_transaction(struct drbd_device *device, bool delegate) -{ - if (delegate) { - struct update_al_work al_work; - init_completion(&al_work.event); - al_work.w.cb = w_al_write_transaction; - al_work.device = device; - drbd_queue_work_front(&first_peer_device(device)->connection->sender_work, - &al_work.w); - wait_for_completion(&al_work.event); - return al_work.err; - } else - return _al_write_transaction(device); -} - static int _try_lc_del(struct drbd_device *device, struct lc_element *al_ext) { int rv; @@ -682,72 +627,56 @@ int drbd_initialize_al(struct drbd_device *device, void *buffer) return 0; } -static int w_update_odbm(struct drbd_work *w, int unused) -{ - struct update_odbm_work *udw = container_of(w, struct update_odbm_work, w); - struct drbd_device *device = udw->device; - struct sib_info sib = { .sib_reason = SIB_SYNC_PROGRESS, }; - - if (!get_ldev(device)) { - if (__ratelimit(&drbd_ratelimit_state)) - drbd_warn(device, "Can not update on disk bitmap, local IO disabled.\n"); - kfree(udw); - return 0; - } - - drbd_bm_write_page(device, rs_extent_to_bm_page(udw->enr)); - put_ldev(device); - - kfree(udw); - - if (drbd_bm_total_weight(device) <= device->rs_failed) { - switch (device->state.conn) { - case C_SYNC_SOURCE: case C_SYNC_TARGET: - case C_PAUSED_SYNC_S: case C_PAUSED_SYNC_T: - drbd_resync_finished(device); - default: - /* nothing to do */ - break; - } - } - drbd_bcast_event(device, &sib); - - return 0; -} - +static const char *drbd_change_sync_fname[] = { + [RECORD_RS_FAILED] = "drbd_rs_failed_io", + [SET_IN_SYNC] = "drbd_set_in_sync", + [SET_OUT_OF_SYNC] = "drbd_set_out_of_sync" +}; /* ATTENTION. The AL's extents are 4MB each, while the extents in the * resync LRU-cache are 16MB each. * The caller of this function has to hold an get_ldev() reference. * + * Adjusts the caching members ->rs_left (success) or ->rs_failed (!success), + * potentially pulling in (and recounting the corresponding bits) + * this resync extent into the resync extent lru cache. + * + * Returns whether all bits have been cleared for this resync extent, + * precisely: (rs_left <= rs_failed) + * * TODO will be obsoleted once we have a caching lru of the on disk bitmap */ -static void drbd_try_clear_on_disk_bm(struct drbd_device *device, sector_t sector, - int count, int success) +static bool update_rs_extent(struct drbd_device *device, + unsigned int enr, int count, + enum update_sync_bits_mode mode) { struct lc_element *e; - struct update_odbm_work *udw; - - unsigned int enr; D_ASSERT(device, atomic_read(&device->local_cnt)); - /* I simply assume that a sector/size pair never crosses - * a 16 MB extent border. (Currently this is true...) */ - enr = BM_SECT_TO_EXT(sector); - - e = lc_get(device->resync, enr); + /* When setting out-of-sync bits, + * we don't need it cached (lc_find). + * But if it is present in the cache, + * we should update the cached bit count. + * Otherwise, that extent should be in the resync extent lru cache + * already -- or we want to pull it in if necessary -- (lc_get), + * then update and check rs_left and rs_failed. */ + if (mode == SET_OUT_OF_SYNC) + e = lc_find(device->resync, enr); + else + e = lc_get(device->resync, enr); if (e) { struct bm_extent *ext = lc_entry(e, struct bm_extent, lce); if (ext->lce.lc_number == enr) { - if (success) + if (mode == SET_IN_SYNC) ext->rs_left -= count; + else if (mode == SET_OUT_OF_SYNC) + ext->rs_left += count; else ext->rs_failed += count; if (ext->rs_left < ext->rs_failed) { - drbd_warn(device, "BAD! sector=%llus enr=%u rs_left=%d " + drbd_warn(device, "BAD! enr=%u rs_left=%d " "rs_failed=%d count=%d cstate=%s\n", - (unsigned long long)sector, ext->lce.lc_number, ext->rs_left, ext->rs_failed, count, drbd_conn_str(device->state.conn)); @@ -781,34 +710,27 @@ static void drbd_try_clear_on_disk_bm(struct drbd_device *device, sector_t secto ext->lce.lc_number, ext->rs_failed); } ext->rs_left = rs_left; - ext->rs_failed = success ? 0 : count; + ext->rs_failed = (mode == RECORD_RS_FAILED) ? count : 0; /* we don't keep a persistent log of the resync lru, * we can commit any change right away. */ lc_committed(device->resync); } - lc_put(device->resync, &ext->lce); + if (mode != SET_OUT_OF_SYNC) + lc_put(device->resync, &ext->lce); /* no race, we are within the al_lock! */ - if (ext->rs_left == ext->rs_failed) { + if (ext->rs_left <= ext->rs_failed) { ext->rs_failed = 0; - - udw = kmalloc(sizeof(*udw), GFP_ATOMIC); - if (udw) { - udw->enr = ext->lce.lc_number; - udw->w.cb = w_update_odbm; - udw->device = device; - drbd_queue_work_front(&first_peer_device(device)->connection->sender_work, - &udw->w); - } else { - drbd_warn(device, "Could not kmalloc an udw\n"); - } + return true; } - } else { + } else if (mode != SET_OUT_OF_SYNC) { + /* be quiet if lc_find() did not find it. */ drbd_err(device, "lc_get() failed! locked=%d/%d flags=%lu\n", device->resync_locked, device->resync->nr_elements, device->resync->flags); } + return false; } void drbd_advance_rs_marks(struct drbd_device *device, unsigned long still_to_go) @@ -827,105 +749,105 @@ void drbd_advance_rs_marks(struct drbd_device *device, unsigned long still_to_go } } -/* clear the bit corresponding to the piece of storage in question: - * size byte of data starting from sector. Only clear a bits of the affected - * one ore more _aligned_ BM_BLOCK_SIZE blocks. - * - * called by worker on C_SYNC_TARGET and receiver on SyncSource. - * - */ -void __drbd_set_in_sync(struct drbd_device *device, sector_t sector, int size, - const char *file, const unsigned int line) +/* It is called lazy update, so don't do write-out too often. */ +static bool lazy_bitmap_update_due(struct drbd_device *device) { - /* Is called from worker and receiver context _only_ */ - unsigned long sbnr, ebnr, lbnr; - unsigned long count = 0; - sector_t esector, nr_sectors; - int wake_up = 0; - unsigned long flags; + return time_after(jiffies, device->rs_last_bcast + 2*HZ); +} - if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_DISCARD_SIZE) { - drbd_err(device, "drbd_set_in_sync: sector=%llus size=%d nonsense!\n", - (unsigned long long)sector, size); +static void maybe_schedule_on_disk_bitmap_update(struct drbd_device *device, bool rs_done) +{ + if (rs_done) + set_bit(RS_DONE, &device->flags); + /* and also set RS_PROGRESS below */ + else if (!lazy_bitmap_update_due(device)) return; - } - - if (!get_ldev(device)) - return; /* no disk, no metadata, no bitmap to clear bits in */ - - nr_sectors = drbd_get_capacity(device->this_bdev); - esector = sector + (size >> 9) - 1; - - if (!expect(sector < nr_sectors)) - goto out; - if (!expect(esector < nr_sectors)) - esector = nr_sectors - 1; - - lbnr = BM_SECT_TO_BIT(nr_sectors-1); - - /* we clear it (in sync). - * round up start sector, round down end sector. we make sure we only - * clear full, aligned, BM_BLOCK_SIZE (4K) blocks */ - if (unlikely(esector < BM_SECT_PER_BIT-1)) - goto out; - if (unlikely(esector == (nr_sectors-1))) - ebnr = lbnr; - else - ebnr = BM_SECT_TO_BIT(esector - (BM_SECT_PER_BIT-1)); - sbnr = BM_SECT_TO_BIT(sector + BM_SECT_PER_BIT-1); - if (sbnr > ebnr) - goto out; + drbd_device_post_work(device, RS_PROGRESS); +} +static int update_sync_bits(struct drbd_device *device, + unsigned long sbnr, unsigned long ebnr, + enum update_sync_bits_mode mode) +{ /* - * ok, (capacity & 7) != 0 sometimes, but who cares... - * we count rs_{total,left} in bits, not sectors. + * We keep a count of set bits per resync-extent in the ->rs_left + * caching member, so we need to loop and work within the resync extent + * alignment. Typically this loop will execute exactly once. */ - count = drbd_bm_clear_bits(device, sbnr, ebnr); - if (count) { - drbd_advance_rs_marks(device, drbd_bm_total_weight(device)); - spin_lock_irqsave(&device->al_lock, flags); - drbd_try_clear_on_disk_bm(device, sector, count, true); - spin_unlock_irqrestore(&device->al_lock, flags); - - /* just wake_up unconditional now, various lc_chaged(), - * lc_put() in drbd_try_clear_on_disk_bm(). */ - wake_up = 1; + unsigned long flags; + unsigned long count = 0; + unsigned int cleared = 0; + while (sbnr <= ebnr) { + /* set temporary boundary bit number to last bit number within + * the resync extent of the current start bit number, + * but cap at provided end bit number */ + unsigned long tbnr = min(ebnr, sbnr | BM_BLOCKS_PER_BM_EXT_MASK); + unsigned long c; + + if (mode == RECORD_RS_FAILED) + /* Only called from drbd_rs_failed_io(), bits + * supposedly still set. Recount, maybe some + * of the bits have been successfully cleared + * by application IO meanwhile. + */ + c = drbd_bm_count_bits(device, sbnr, tbnr); + else if (mode == SET_IN_SYNC) + c = drbd_bm_clear_bits(device, sbnr, tbnr); + else /* if (mode == SET_OUT_OF_SYNC) */ + c = drbd_bm_set_bits(device, sbnr, tbnr); + + if (c) { + spin_lock_irqsave(&device->al_lock, flags); + cleared += update_rs_extent(device, BM_BIT_TO_EXT(sbnr), c, mode); + spin_unlock_irqrestore(&device->al_lock, flags); + count += c; + } + sbnr = tbnr + 1; } -out: - put_ldev(device); - if (wake_up) + if (count) { + if (mode == SET_IN_SYNC) { + unsigned long still_to_go = drbd_bm_total_weight(device); + bool rs_is_done = (still_to_go <= device->rs_failed); + drbd_advance_rs_marks(device, still_to_go); + if (cleared || rs_is_done) + maybe_schedule_on_disk_bitmap_update(device, rs_is_done); + } else if (mode == RECORD_RS_FAILED) + device->rs_failed += count; wake_up(&device->al_wait); + } + return count; } -/* - * this is intended to set one request worth of data out of sync. - * affects at least 1 bit, - * and at most 1+DRBD_MAX_BIO_SIZE/BM_BLOCK_SIZE bits. +/* clear the bit corresponding to the piece of storage in question: + * size byte of data starting from sector. Only clear a bits of the affected + * one ore more _aligned_ BM_BLOCK_SIZE blocks. + * + * called by worker on C_SYNC_TARGET and receiver on SyncSource. * - * called by tl_clear and drbd_send_dblock (==drbd_make_request). - * so this can be _any_ process. */ -int __drbd_set_out_of_sync(struct drbd_device *device, sector_t sector, int size, - const char *file, const unsigned int line) +int __drbd_change_sync(struct drbd_device *device, sector_t sector, int size, + enum update_sync_bits_mode mode, + const char *file, const unsigned int line) { - unsigned long sbnr, ebnr, flags; + /* Is called from worker and receiver context _only_ */ + unsigned long sbnr, ebnr, lbnr; + unsigned long count = 0; sector_t esector, nr_sectors; - unsigned int enr, count = 0; - struct lc_element *e; - /* this should be an empty REQ_FLUSH */ - if (size == 0) + /* This would be an empty REQ_FLUSH, be silent. */ + if ((mode == SET_OUT_OF_SYNC) && size == 0) return 0; - if (size < 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_DISCARD_SIZE) { - drbd_err(device, "sector: %llus, size: %d\n", - (unsigned long long)sector, size); + if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_DISCARD_SIZE) { + drbd_err(device, "%s: sector=%llus size=%d nonsense!\n", + drbd_change_sync_fname[mode], + (unsigned long long)sector, size); return 0; } if (!get_ldev(device)) - return 0; /* no disk, no metadata, no bitmap to set bits in */ + return 0; /* no disk, no metadata, no bitmap to manipulate bits in */ nr_sectors = drbd_get_capacity(device->this_bdev); esector = sector + (size >> 9) - 1; @@ -935,25 +857,28 @@ int __drbd_set_out_of_sync(struct drbd_device *device, sector_t sector, int size if (!expect(esector < nr_sectors)) esector = nr_sectors - 1; - /* we set it out of sync, - * we do not need to round anything here */ - sbnr = BM_SECT_TO_BIT(sector); - ebnr = BM_SECT_TO_BIT(esector); - - /* ok, (capacity & 7) != 0 sometimes, but who cares... - * we count rs_{total,left} in bits, not sectors. */ - spin_lock_irqsave(&device->al_lock, flags); - count = drbd_bm_set_bits(device, sbnr, ebnr); + lbnr = BM_SECT_TO_BIT(nr_sectors-1); - enr = BM_SECT_TO_EXT(sector); - e = lc_find(device->resync, enr); - if (e) - lc_entry(e, struct bm_extent, lce)->rs_left += count; - spin_unlock_irqrestore(&device->al_lock, flags); + if (mode == SET_IN_SYNC) { + /* Round up start sector, round down end sector. We make sure + * we only clear full, aligned, BM_BLOCK_SIZE blocks. */ + if (unlikely(esector < BM_SECT_PER_BIT-1)) + goto out; + if (unlikely(esector == (nr_sectors-1))) + ebnr = lbnr; + else + ebnr = BM_SECT_TO_BIT(esector - (BM_SECT_PER_BIT-1)); + sbnr = BM_SECT_TO_BIT(sector + BM_SECT_PER_BIT-1); + } else { + /* We set it out of sync, or record resync failure. + * Should not round anything here. */ + sbnr = BM_SECT_TO_BIT(sector); + ebnr = BM_SECT_TO_BIT(esector); + } + count = update_sync_bits(device, sbnr, ebnr, mode); out: put_ldev(device); - return count; } @@ -1075,6 +1000,15 @@ int drbd_try_rs_begin_io(struct drbd_device *device, sector_t sector) struct lc_element *e; struct bm_extent *bm_ext; int i; + bool throttle = drbd_rs_should_slow_down(device, sector, true); + + /* If we need to throttle, a half-locked (only marked BME_NO_WRITES, + * not yet BME_LOCKED) extent needs to be kicked out explicitly if we + * need to throttle. There is at most one such half-locked extent, + * which is remembered in resync_wenr. */ + + if (throttle && device->resync_wenr != enr) + return -EAGAIN; spin_lock_irq(&device->al_lock); if (device->resync_wenr != LC_FREE && device->resync_wenr != enr) { @@ -1098,8 +1032,10 @@ int drbd_try_rs_begin_io(struct drbd_device *device, sector_t sector) D_ASSERT(device, test_bit(BME_NO_WRITES, &bm_ext->flags)); clear_bit(BME_NO_WRITES, &bm_ext->flags); device->resync_wenr = LC_FREE; - if (lc_put(device->resync, &bm_ext->lce) == 0) + if (lc_put(device->resync, &bm_ext->lce) == 0) { + bm_ext->flags = 0; device->resync_locked--; + } wake_up(&device->al_wait); } else { drbd_alert(device, "LOGIC BUG\n"); @@ -1161,8 +1097,20 @@ proceed: return 0; try_again: - if (bm_ext) - device->resync_wenr = enr; + if (bm_ext) { + if (throttle) { + D_ASSERT(device, !test_bit(BME_LOCKED, &bm_ext->flags)); + D_ASSERT(device, test_bit(BME_NO_WRITES, &bm_ext->flags)); + clear_bit(BME_NO_WRITES, &bm_ext->flags); + device->resync_wenr = LC_FREE; + if (lc_put(device->resync, &bm_ext->lce) == 0) { + bm_ext->flags = 0; + device->resync_locked--; + } + wake_up(&device->al_wait); + } else + device->resync_wenr = enr; + } spin_unlock_irq(&device->al_lock); return -EAGAIN; } @@ -1270,69 +1218,3 @@ int drbd_rs_del_all(struct drbd_device *device) return 0; } - -/** - * drbd_rs_failed_io() - Record information on a failure to resync the specified blocks - * @device: DRBD device. - * @sector: The sector number. - * @size: Size of failed IO operation, in byte. - */ -void drbd_rs_failed_io(struct drbd_device *device, sector_t sector, int size) -{ - /* Is called from worker and receiver context _only_ */ - unsigned long sbnr, ebnr, lbnr; - unsigned long count; - sector_t esector, nr_sectors; - int wake_up = 0; - - if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_DISCARD_SIZE) { - drbd_err(device, "drbd_rs_failed_io: sector=%llus size=%d nonsense!\n", - (unsigned long long)sector, size); - return; - } - nr_sectors = drbd_get_capacity(device->this_bdev); - esector = sector + (size >> 9) - 1; - - if (!expect(sector < nr_sectors)) - return; - if (!expect(esector < nr_sectors)) - esector = nr_sectors - 1; - - lbnr = BM_SECT_TO_BIT(nr_sectors-1); - - /* - * round up start sector, round down end sector. we make sure we only - * handle full, aligned, BM_BLOCK_SIZE (4K) blocks */ - if (unlikely(esector < BM_SECT_PER_BIT-1)) - return; - if (unlikely(esector == (nr_sectors-1))) - ebnr = lbnr; - else - ebnr = BM_SECT_TO_BIT(esector - (BM_SECT_PER_BIT-1)); - sbnr = BM_SECT_TO_BIT(sector + BM_SECT_PER_BIT-1); - - if (sbnr > ebnr) - return; - - /* - * ok, (capacity & 7) != 0 sometimes, but who cares... - * we count rs_{total,left} in bits, not sectors. - */ - spin_lock_irq(&device->al_lock); - count = drbd_bm_count_bits(device, sbnr, ebnr); - if (count) { - device->rs_failed += count; - - if (get_ldev(device)) { - drbd_try_clear_on_disk_bm(device, sector, count, false); - put_ldev(device); - } - - /* just wake_up unconditional now, various lc_chaged(), - * lc_put() in drbd_try_clear_on_disk_bm(). */ - wake_up = 1; - } - spin_unlock_irq(&device->al_lock); - if (wake_up) - wake_up(&device->al_wait); -} diff --git a/drivers/block/drbd/drbd_bitmap.c b/drivers/block/drbd/drbd_bitmap.c index 1aa29f8fdfe1..426c97aef900 100644 --- a/drivers/block/drbd/drbd_bitmap.c +++ b/drivers/block/drbd/drbd_bitmap.c @@ -22,6 +22,8 @@ the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA. */ +#define pr_fmt(fmt) KBUILD_MODNAME ": " fmt + #include <linux/bitops.h> #include <linux/vmalloc.h> #include <linux/string.h> @@ -353,9 +355,8 @@ static void bm_free_pages(struct page **pages, unsigned long number) for (i = 0; i < number; i++) { if (!pages[i]) { - printk(KERN_ALERT "drbd: bm_free_pages tried to free " - "a NULL pointer; i=%lu n=%lu\n", - i, number); + pr_alert("bm_free_pages tried to free a NULL pointer; i=%lu n=%lu\n", + i, number); continue; } __free_page(pages[i]); @@ -592,7 +593,7 @@ static void bm_memset(struct drbd_bitmap *b, size_t offset, int c, size_t len) end = offset + len; if (end > b->bm_words) { - printk(KERN_ALERT "drbd: bm_memset end > bm_words\n"); + pr_alert("bm_memset end > bm_words\n"); return; } @@ -602,7 +603,7 @@ static void bm_memset(struct drbd_bitmap *b, size_t offset, int c, size_t len) p_addr = bm_map_pidx(b, idx); bm = p_addr + MLPP(offset); if (bm+do_now > p_addr + LWPP) { - printk(KERN_ALERT "drbd: BUG BUG BUG! p_addr:%p bm:%p do_now:%d\n", + pr_alert("BUG BUG BUG! p_addr:%p bm:%p do_now:%d\n", p_addr, bm, (int)do_now); } else memset(bm, c, do_now * sizeof(long)); @@ -927,22 +928,14 @@ void drbd_bm_clear_all(struct drbd_device *device) spin_unlock_irq(&b->bm_lock); } -struct bm_aio_ctx { - struct drbd_device *device; - atomic_t in_flight; - unsigned int done; - unsigned flags; -#define BM_AIO_COPY_PAGES 1 -#define BM_AIO_WRITE_HINTED 2 -#define BM_WRITE_ALL_PAGES 4 - int error; - struct kref kref; -}; - -static void bm_aio_ctx_destroy(struct kref *kref) +static void drbd_bm_aio_ctx_destroy(struct kref *kref) { - struct bm_aio_ctx *ctx = container_of(kref, struct bm_aio_ctx, kref); + struct drbd_bm_aio_ctx *ctx = container_of(kref, struct drbd_bm_aio_ctx, kref); + unsigned long flags; + spin_lock_irqsave(&ctx->device->resource->req_lock, flags); + list_del(&ctx->list); + spin_unlock_irqrestore(&ctx->device->resource->req_lock, flags); put_ldev(ctx->device); kfree(ctx); } @@ -950,7 +943,7 @@ static void bm_aio_ctx_destroy(struct kref *kref) /* bv_page may be a copy, or may be the original */ static void bm_async_io_complete(struct bio *bio, int error) { - struct bm_aio_ctx *ctx = bio->bi_private; + struct drbd_bm_aio_ctx *ctx = bio->bi_private; struct drbd_device *device = ctx->device; struct drbd_bitmap *b = device->bitmap; unsigned int idx = bm_page_to_idx(bio->bi_io_vec[0].bv_page); @@ -993,17 +986,18 @@ static void bm_async_io_complete(struct bio *bio, int error) if (atomic_dec_and_test(&ctx->in_flight)) { ctx->done = 1; wake_up(&device->misc_wait); - kref_put(&ctx->kref, &bm_aio_ctx_destroy); + kref_put(&ctx->kref, &drbd_bm_aio_ctx_destroy); } } -static void bm_page_io_async(struct bm_aio_ctx *ctx, int page_nr, int rw) __must_hold(local) +static void bm_page_io_async(struct drbd_bm_aio_ctx *ctx, int page_nr) __must_hold(local) { struct bio *bio = bio_alloc_drbd(GFP_NOIO); struct drbd_device *device = ctx->device; struct drbd_bitmap *b = device->bitmap; struct page *page; unsigned int len; + unsigned int rw = (ctx->flags & BM_AIO_READ) ? READ : WRITE; sector_t on_disk_sector = device->ldev->md.md_offset + device->ldev->md.bm_offset; @@ -1049,9 +1043,9 @@ static void bm_page_io_async(struct bm_aio_ctx *ctx, int page_nr, int rw) __must /* * bm_rw: read/write the whole bitmap from/to its on disk location. */ -static int bm_rw(struct drbd_device *device, int rw, unsigned flags, unsigned lazy_writeout_upper_idx) __must_hold(local) +static int bm_rw(struct drbd_device *device, const unsigned int flags, unsigned lazy_writeout_upper_idx) __must_hold(local) { - struct bm_aio_ctx *ctx; + struct drbd_bm_aio_ctx *ctx; struct drbd_bitmap *b = device->bitmap; int num_pages, i, count = 0; unsigned long now; @@ -1067,12 +1061,13 @@ static int bm_rw(struct drbd_device *device, int rw, unsigned flags, unsigned la * as we submit copies of pages anyways. */ - ctx = kmalloc(sizeof(struct bm_aio_ctx), GFP_NOIO); + ctx = kmalloc(sizeof(struct drbd_bm_aio_ctx), GFP_NOIO); if (!ctx) return -ENOMEM; - *ctx = (struct bm_aio_ctx) { + *ctx = (struct drbd_bm_aio_ctx) { .device = device, + .start_jif = jiffies, .in_flight = ATOMIC_INIT(1), .done = 0, .flags = flags, @@ -1080,15 +1075,21 @@ static int bm_rw(struct drbd_device *device, int rw, unsigned flags, unsigned la .kref = { ATOMIC_INIT(2) }, }; - if (!get_ldev_if_state(device, D_ATTACHING)) { /* put is in bm_aio_ctx_destroy() */ + if (!get_ldev_if_state(device, D_ATTACHING)) { /* put is in drbd_bm_aio_ctx_destroy() */ drbd_err(device, "ASSERT FAILED: get_ldev_if_state() == 1 in bm_rw()\n"); kfree(ctx); return -ENODEV; } + /* Here D_ATTACHING is sufficient since drbd_bm_read() is called only from + drbd_adm_attach(), after device->ldev was assigned. */ - if (!ctx->flags) + if (0 == (ctx->flags & ~BM_AIO_READ)) WARN_ON(!(BM_LOCKED_MASK & b->bm_flags)); + spin_lock_irq(&device->resource->req_lock); + list_add_tail(&ctx->list, &device->pending_bitmap_io); + spin_unlock_irq(&device->resource->req_lock); + num_pages = b->bm_number_of_pages; now = jiffies; @@ -1098,13 +1099,13 @@ static int bm_rw(struct drbd_device *device, int rw, unsigned flags, unsigned la /* ignore completely unchanged pages */ if (lazy_writeout_upper_idx && i == lazy_writeout_upper_idx) break; - if (rw & WRITE) { + if (!(flags & BM_AIO_READ)) { if ((flags & BM_AIO_WRITE_HINTED) && !test_and_clear_bit(BM_PAGE_HINT_WRITEOUT, &page_private(b->bm_pages[i]))) continue; - if (!(flags & BM_WRITE_ALL_PAGES) && + if (!(flags & BM_AIO_WRITE_ALL_PAGES) && bm_test_page_unchanged(b->bm_pages[i])) { dynamic_drbd_dbg(device, "skipped bm write for idx %u\n", i); continue; @@ -1118,7 +1119,7 @@ static int bm_rw(struct drbd_device *device, int rw, unsigned flags, unsigned la } } atomic_inc(&ctx->in_flight); - bm_page_io_async(ctx, i, rw); + bm_page_io_async(ctx, i); ++count; cond_resched(); } @@ -1134,12 +1135,12 @@ static int bm_rw(struct drbd_device *device, int rw, unsigned flags, unsigned la if (!atomic_dec_and_test(&ctx->in_flight)) wait_until_done_or_force_detached(device, device->ldev, &ctx->done); else - kref_put(&ctx->kref, &bm_aio_ctx_destroy); + kref_put(&ctx->kref, &drbd_bm_aio_ctx_destroy); /* summary for global bitmap IO */ if (flags == 0) drbd_info(device, "bitmap %s of %u pages took %lu jiffies\n", - rw == WRITE ? "WRITE" : "READ", + (flags & BM_AIO_READ) ? "READ" : "WRITE", count, jiffies - now); if (ctx->error) { @@ -1152,20 +1153,18 @@ static int bm_rw(struct drbd_device *device, int rw, unsigned flags, unsigned la err = -EIO; /* Disk timeout/force-detach during IO... */ now = jiffies; - if (rw == WRITE) { - drbd_md_flush(device); - } else /* rw == READ */ { + if (flags & BM_AIO_READ) { b->bm_set = bm_count_bits(b); drbd_info(device, "recounting of set bits took additional %lu jiffies\n", jiffies - now); } now = b->bm_set; - if (flags == 0) + if ((flags & ~BM_AIO_READ) == 0) drbd_info(device, "%s (%lu bits) marked out-of-sync by on disk bit-map.\n", ppsize(ppb, now << (BM_BLOCK_SHIFT-10)), now); - kref_put(&ctx->kref, &bm_aio_ctx_destroy); + kref_put(&ctx->kref, &drbd_bm_aio_ctx_destroy); return err; } @@ -1175,7 +1174,7 @@ static int bm_rw(struct drbd_device *device, int rw, unsigned flags, unsigned la */ int drbd_bm_read(struct drbd_device *device) __must_hold(local) { - return bm_rw(device, READ, 0, 0); + return bm_rw(device, BM_AIO_READ, 0); } /** @@ -1186,7 +1185,7 @@ int drbd_bm_read(struct drbd_device *device) __must_hold(local) */ int drbd_bm_write(struct drbd_device *device) __must_hold(local) { - return bm_rw(device, WRITE, 0, 0); + return bm_rw(device, 0, 0); } /** @@ -1197,7 +1196,17 @@ int drbd_bm_write(struct drbd_device *device) __must_hold(local) */ int drbd_bm_write_all(struct drbd_device *device) __must_hold(local) { - return bm_rw(device, WRITE, BM_WRITE_ALL_PAGES, 0); + return bm_rw(device, BM_AIO_WRITE_ALL_PAGES, 0); +} + +/** + * drbd_bm_write_lazy() - Write bitmap pages 0 to @upper_idx-1, if they have changed. + * @device: DRBD device. + * @upper_idx: 0: write all changed pages; +ve: page index to stop scanning for changed pages + */ +int drbd_bm_write_lazy(struct drbd_device *device, unsigned upper_idx) __must_hold(local) +{ + return bm_rw(device, BM_AIO_COPY_PAGES, upper_idx); } /** @@ -1213,7 +1222,7 @@ int drbd_bm_write_all(struct drbd_device *device) __must_hold(local) */ int drbd_bm_write_copy_pages(struct drbd_device *device) __must_hold(local) { - return bm_rw(device, WRITE, BM_AIO_COPY_PAGES, 0); + return bm_rw(device, BM_AIO_COPY_PAGES, 0); } /** @@ -1222,62 +1231,7 @@ int drbd_bm_write_copy_pages(struct drbd_device *device) __must_hold(local) */ int drbd_bm_write_hinted(struct drbd_device *device) __must_hold(local) { - return bm_rw(device, WRITE, BM_AIO_WRITE_HINTED | BM_AIO_COPY_PAGES, 0); -} - -/** - * drbd_bm_write_page() - Writes a PAGE_SIZE aligned piece of bitmap - * @device: DRBD device. - * @idx: bitmap page index - * - * We don't want to special case on logical_block_size of the backend device, - * so we submit PAGE_SIZE aligned pieces. - * Note that on "most" systems, PAGE_SIZE is 4k. - * - * In case this becomes an issue on systems with larger PAGE_SIZE, - * we may want to change this again to write 4k aligned 4k pieces. - */ -int drbd_bm_write_page(struct drbd_device *device, unsigned int idx) __must_hold(local) -{ - struct bm_aio_ctx *ctx; - int err; - - if (bm_test_page_unchanged(device->bitmap->bm_pages[idx])) { - dynamic_drbd_dbg(device, "skipped bm page write for idx %u\n", idx); - return 0; - } - - ctx = kmalloc(sizeof(struct bm_aio_ctx), GFP_NOIO); - if (!ctx) - return -ENOMEM; - - *ctx = (struct bm_aio_ctx) { - .device = device, - .in_flight = ATOMIC_INIT(1), - .done = 0, - .flags = BM_AIO_COPY_PAGES, - .error = 0, - .kref = { ATOMIC_INIT(2) }, - }; - - if (!get_ldev_if_state(device, D_ATTACHING)) { /* put is in bm_aio_ctx_destroy() */ - drbd_err(device, "ASSERT FAILED: get_ldev_if_state() == 1 in drbd_bm_write_page()\n"); - kfree(ctx); - return -ENODEV; - } - - bm_page_io_async(ctx, idx, WRITE_SYNC); - wait_until_done_or_force_detached(device, device->ldev, &ctx->done); - - if (ctx->error) - drbd_chk_io_error(device, 1, DRBD_META_IO_ERROR); - /* that causes us to detach, so the in memory bitmap will be - * gone in a moment as well. */ - - device->bm_writ_cnt++; - err = atomic_read(&ctx->in_flight) ? -EIO : ctx->error; - kref_put(&ctx->kref, &bm_aio_ctx_destroy); - return err; + return bm_rw(device, BM_AIO_WRITE_HINTED | BM_AIO_COPY_PAGES, 0); } /* NOTE diff --git a/drivers/block/drbd/drbd_debugfs.c b/drivers/block/drbd/drbd_debugfs.c new file mode 100644 index 000000000000..5c20b18540b8 --- /dev/null +++ b/drivers/block/drbd/drbd_debugfs.c @@ -0,0 +1,958 @@ +#define pr_fmt(fmt) "drbd debugfs: " fmt +#include <linux/kernel.h> +#include <linux/module.h> +#include <linux/debugfs.h> +#include <linux/seq_file.h> +#include <linux/stat.h> +#include <linux/jiffies.h> +#include <linux/list.h> + +#include "drbd_int.h" +#include "drbd_req.h" +#include "drbd_debugfs.h" + + +/********************************************************************** + * Whenever you change the file format, remember to bump the version. * + **********************************************************************/ + +static struct dentry *drbd_debugfs_root; +static struct dentry *drbd_debugfs_version; +static struct dentry *drbd_debugfs_resources; +static struct dentry *drbd_debugfs_minors; + +static void seq_print_age_or_dash(struct seq_file *m, bool valid, unsigned long dt) +{ + if (valid) + seq_printf(m, "\t%d", jiffies_to_msecs(dt)); + else + seq_printf(m, "\t-"); +} + +static void __seq_print_rq_state_bit(struct seq_file *m, + bool is_set, char *sep, const char *set_name, const char *unset_name) +{ + if (is_set && set_name) { + seq_putc(m, *sep); + seq_puts(m, set_name); + *sep = '|'; + } else if (!is_set && unset_name) { + seq_putc(m, *sep); + seq_puts(m, unset_name); + *sep = '|'; + } +} + +static void seq_print_rq_state_bit(struct seq_file *m, + bool is_set, char *sep, const char *set_name) +{ + __seq_print_rq_state_bit(m, is_set, sep, set_name, NULL); +} + +/* pretty print enum drbd_req_state_bits req->rq_state */ +static void seq_print_request_state(struct seq_file *m, struct drbd_request *req) +{ + unsigned int s = req->rq_state; + char sep = ' '; + seq_printf(m, "\t0x%08x", s); + seq_printf(m, "\tmaster: %s", req->master_bio ? "pending" : "completed"); + + /* RQ_WRITE ignored, already reported */ + seq_puts(m, "\tlocal:"); + seq_print_rq_state_bit(m, s & RQ_IN_ACT_LOG, &sep, "in-AL"); + seq_print_rq_state_bit(m, s & RQ_POSTPONED, &sep, "postponed"); + seq_print_rq_state_bit(m, s & RQ_COMPLETION_SUSP, &sep, "suspended"); + sep = ' '; + seq_print_rq_state_bit(m, s & RQ_LOCAL_PENDING, &sep, "pending"); + seq_print_rq_state_bit(m, s & RQ_LOCAL_COMPLETED, &sep, "completed"); + seq_print_rq_state_bit(m, s & RQ_LOCAL_ABORTED, &sep, "aborted"); + seq_print_rq_state_bit(m, s & RQ_LOCAL_OK, &sep, "ok"); + if (sep == ' ') + seq_puts(m, " -"); + + /* for_each_connection ... */ + seq_printf(m, "\tnet:"); + sep = ' '; + seq_print_rq_state_bit(m, s & RQ_NET_PENDING, &sep, "pending"); + seq_print_rq_state_bit(m, s & RQ_NET_QUEUED, &sep, "queued"); + seq_print_rq_state_bit(m, s & RQ_NET_SENT, &sep, "sent"); + seq_print_rq_state_bit(m, s & RQ_NET_DONE, &sep, "done"); + seq_print_rq_state_bit(m, s & RQ_NET_SIS, &sep, "sis"); + seq_print_rq_state_bit(m, s & RQ_NET_OK, &sep, "ok"); + if (sep == ' ') + seq_puts(m, " -"); + + seq_printf(m, " :"); + sep = ' '; + seq_print_rq_state_bit(m, s & RQ_EXP_RECEIVE_ACK, &sep, "B"); + seq_print_rq_state_bit(m, s & RQ_EXP_WRITE_ACK, &sep, "C"); + seq_print_rq_state_bit(m, s & RQ_EXP_BARR_ACK, &sep, "barr"); + if (sep == ' ') + seq_puts(m, " -"); + seq_printf(m, "\n"); +} + +static void seq_print_one_request(struct seq_file *m, struct drbd_request *req, unsigned long now) +{ + /* change anything here, fixup header below! */ + unsigned int s = req->rq_state; + +#define RQ_HDR_1 "epoch\tsector\tsize\trw" + seq_printf(m, "0x%x\t%llu\t%u\t%s", + req->epoch, + (unsigned long long)req->i.sector, req->i.size >> 9, + (s & RQ_WRITE) ? "W" : "R"); + +#define RQ_HDR_2 "\tstart\tin AL\tsubmit" + seq_printf(m, "\t%d", jiffies_to_msecs(now - req->start_jif)); + seq_print_age_or_dash(m, s & RQ_IN_ACT_LOG, now - req->in_actlog_jif); + seq_print_age_or_dash(m, s & RQ_LOCAL_PENDING, now - req->pre_submit_jif); + +#define RQ_HDR_3 "\tsent\tacked\tdone" + seq_print_age_or_dash(m, s & RQ_NET_SENT, now - req->pre_send_jif); + seq_print_age_or_dash(m, (s & RQ_NET_SENT) && !(s & RQ_NET_PENDING), now - req->acked_jif); + seq_print_age_or_dash(m, s & RQ_NET_DONE, now - req->net_done_jif); + +#define RQ_HDR_4 "\tstate\n" + seq_print_request_state(m, req); +} +#define RQ_HDR RQ_HDR_1 RQ_HDR_2 RQ_HDR_3 RQ_HDR_4 + +static void seq_print_minor_vnr_req(struct seq_file *m, struct drbd_request *req, unsigned long now) +{ + seq_printf(m, "%u\t%u\t", req->device->minor, req->device->vnr); + seq_print_one_request(m, req, now); +} + +static void seq_print_resource_pending_meta_io(struct seq_file *m, struct drbd_resource *resource, unsigned long now) +{ + struct drbd_device *device; + unsigned int i; + + seq_puts(m, "minor\tvnr\tstart\tsubmit\tintent\n"); + rcu_read_lock(); + idr_for_each_entry(&resource->devices, device, i) { + struct drbd_md_io tmp; + /* In theory this is racy, + * in the sense that there could have been a + * drbd_md_put_buffer(); drbd_md_get_buffer(); + * between accessing these members here. */ + tmp = device->md_io; + if (atomic_read(&tmp.in_use)) { + seq_printf(m, "%u\t%u\t%d\t", + device->minor, device->vnr, + jiffies_to_msecs(now - tmp.start_jif)); + if (time_before(tmp.submit_jif, tmp.start_jif)) + seq_puts(m, "-\t"); + else + seq_printf(m, "%d\t", jiffies_to_msecs(now - tmp.submit_jif)); + seq_printf(m, "%s\n", tmp.current_use); + } + } + rcu_read_unlock(); +} + +static void seq_print_waiting_for_AL(struct seq_file *m, struct drbd_resource *resource, unsigned long now) +{ + struct drbd_device *device; + unsigned int i; + + seq_puts(m, "minor\tvnr\tage\t#waiting\n"); + rcu_read_lock(); + idr_for_each_entry(&resource->devices, device, i) { + unsigned long jif; + struct drbd_request *req; + int n = atomic_read(&device->ap_actlog_cnt); + if (n) { + spin_lock_irq(&device->resource->req_lock); + req = list_first_entry_or_null(&device->pending_master_completion[1], + struct drbd_request, req_pending_master_completion); + /* if the oldest request does not wait for the activity log + * it is not interesting for us here */ + if (req && !(req->rq_state & RQ_IN_ACT_LOG)) + jif = req->start_jif; + else + req = NULL; + spin_unlock_irq(&device->resource->req_lock); + } + if (n) { + seq_printf(m, "%u\t%u\t", device->minor, device->vnr); + if (req) + seq_printf(m, "%u\t", jiffies_to_msecs(now - jif)); + else + seq_puts(m, "-\t"); + seq_printf(m, "%u\n", n); + } + } + rcu_read_unlock(); +} + +static void seq_print_device_bitmap_io(struct seq_file *m, struct drbd_device *device, unsigned long now) +{ + struct drbd_bm_aio_ctx *ctx; + unsigned long start_jif; + unsigned int in_flight; + unsigned int flags; + spin_lock_irq(&device->resource->req_lock); + ctx = list_first_entry_or_null(&device->pending_bitmap_io, struct drbd_bm_aio_ctx, list); + if (ctx && ctx->done) + ctx = NULL; + if (ctx) { + start_jif = ctx->start_jif; + in_flight = atomic_read(&ctx->in_flight); + flags = ctx->flags; + } + spin_unlock_irq(&device->resource->req_lock); + if (ctx) { + seq_printf(m, "%u\t%u\t%c\t%u\t%u\n", + device->minor, device->vnr, + (flags & BM_AIO_READ) ? 'R' : 'W', + jiffies_to_msecs(now - start_jif), + in_flight); + } +} + +static void seq_print_resource_pending_bitmap_io(struct seq_file *m, struct drbd_resource *resource, unsigned long now) +{ + struct drbd_device *device; + unsigned int i; + + seq_puts(m, "minor\tvnr\trw\tage\t#in-flight\n"); + rcu_read_lock(); + idr_for_each_entry(&resource->devices, device, i) { + seq_print_device_bitmap_io(m, device, now); + } + rcu_read_unlock(); +} + +/* pretty print enum peer_req->flags */ +static void seq_print_peer_request_flags(struct seq_file *m, struct drbd_peer_request *peer_req) +{ + unsigned long f = peer_req->flags; + char sep = ' '; + + __seq_print_rq_state_bit(m, f & EE_SUBMITTED, &sep, "submitted", "preparing"); + __seq_print_rq_state_bit(m, f & EE_APPLICATION, &sep, "application", "internal"); + seq_print_rq_state_bit(m, f & EE_CALL_AL_COMPLETE_IO, &sep, "in-AL"); + seq_print_rq_state_bit(m, f & EE_SEND_WRITE_ACK, &sep, "C"); + seq_print_rq_state_bit(m, f & EE_MAY_SET_IN_SYNC, &sep, "set-in-sync"); + + if (f & EE_IS_TRIM) { + seq_putc(m, sep); + sep = '|'; + if (f & EE_IS_TRIM_USE_ZEROOUT) + seq_puts(m, "zero-out"); + else + seq_puts(m, "trim"); + } + seq_putc(m, '\n'); +} + +static void seq_print_peer_request(struct seq_file *m, + struct drbd_device *device, struct list_head *lh, + unsigned long now) +{ + bool reported_preparing = false; + struct drbd_peer_request *peer_req; + list_for_each_entry(peer_req, lh, w.list) { + if (reported_preparing && !(peer_req->flags & EE_SUBMITTED)) + continue; + + if (device) + seq_printf(m, "%u\t%u\t", device->minor, device->vnr); + + seq_printf(m, "%llu\t%u\t%c\t%u\t", + (unsigned long long)peer_req->i.sector, peer_req->i.size >> 9, + (peer_req->flags & EE_WRITE) ? 'W' : 'R', + jiffies_to_msecs(now - peer_req->submit_jif)); + seq_print_peer_request_flags(m, peer_req); + if (peer_req->flags & EE_SUBMITTED) + break; + else + reported_preparing = true; + } +} + +static void seq_print_device_peer_requests(struct seq_file *m, + struct drbd_device *device, unsigned long now) +{ + seq_puts(m, "minor\tvnr\tsector\tsize\trw\tage\tflags\n"); + spin_lock_irq(&device->resource->req_lock); + seq_print_peer_request(m, device, &device->active_ee, now); + seq_print_peer_request(m, device, &device->read_ee, now); + seq_print_peer_request(m, device, &device->sync_ee, now); + spin_unlock_irq(&device->resource->req_lock); + if (test_bit(FLUSH_PENDING, &device->flags)) { + seq_printf(m, "%u\t%u\t-\t-\tF\t%u\tflush\n", + device->minor, device->vnr, + jiffies_to_msecs(now - device->flush_jif)); + } +} + +static void seq_print_resource_pending_peer_requests(struct seq_file *m, + struct drbd_resource *resource, unsigned long now) +{ + struct drbd_device *device; + unsigned int i; + + rcu_read_lock(); + idr_for_each_entry(&resource->devices, device, i) { + seq_print_device_peer_requests(m, device, now); + } + rcu_read_unlock(); +} + +static void seq_print_resource_transfer_log_summary(struct seq_file *m, + struct drbd_resource *resource, + struct drbd_connection *connection, + unsigned long now) +{ + struct drbd_request *req; + unsigned int count = 0; + unsigned int show_state = 0; + + seq_puts(m, "n\tdevice\tvnr\t" RQ_HDR); + spin_lock_irq(&resource->req_lock); + list_for_each_entry(req, &connection->transfer_log, tl_requests) { + unsigned int tmp = 0; + unsigned int s; + ++count; + + /* don't disable irq "forever" */ + if (!(count & 0x1ff)) { + struct drbd_request *req_next; + kref_get(&req->kref); + spin_unlock_irq(&resource->req_lock); + cond_resched(); + spin_lock_irq(&resource->req_lock); + req_next = list_next_entry(req, tl_requests); + if (kref_put(&req->kref, drbd_req_destroy)) + req = req_next; + if (&req->tl_requests == &connection->transfer_log) + break; + } + + s = req->rq_state; + + /* This is meant to summarize timing issues, to be able to tell + * local disk problems from network problems. + * Skip requests, if we have shown an even older request with + * similar aspects already. */ + if (req->master_bio == NULL) + tmp |= 1; + if ((s & RQ_LOCAL_MASK) && (s & RQ_LOCAL_PENDING)) + tmp |= 2; + if (s & RQ_NET_MASK) { + if (!(s & RQ_NET_SENT)) + tmp |= 4; + if (s & RQ_NET_PENDING) + tmp |= 8; + if (!(s & RQ_NET_DONE)) + tmp |= 16; + } + if ((tmp & show_state) == tmp) + continue; + show_state |= tmp; + seq_printf(m, "%u\t", count); + seq_print_minor_vnr_req(m, req, now); + if (show_state == 0x1f) + break; + } + spin_unlock_irq(&resource->req_lock); +} + +/* TODO: transfer_log and friends should be moved to resource */ +static int in_flight_summary_show(struct seq_file *m, void *pos) +{ + struct drbd_resource *resource = m->private; + struct drbd_connection *connection; + unsigned long jif = jiffies; + + connection = first_connection(resource); + /* This does not happen, actually. + * But be robust and prepare for future code changes. */ + if (!connection || !kref_get_unless_zero(&connection->kref)) + return -ESTALE; + + /* BUMP me if you change the file format/content/presentation */ + seq_printf(m, "v: %u\n\n", 0); + + seq_puts(m, "oldest bitmap IO\n"); + seq_print_resource_pending_bitmap_io(m, resource, jif); + seq_putc(m, '\n'); + + seq_puts(m, "meta data IO\n"); + seq_print_resource_pending_meta_io(m, resource, jif); + seq_putc(m, '\n'); + + seq_puts(m, "socket buffer stats\n"); + /* for each connection ... once we have more than one */ + rcu_read_lock(); + if (connection->data.socket) { + /* open coded SIOCINQ, the "relevant" part */ + struct tcp_sock *tp = tcp_sk(connection->data.socket->sk); + int answ = tp->rcv_nxt - tp->copied_seq; + seq_printf(m, "unread receive buffer: %u Byte\n", answ); + /* open coded SIOCOUTQ, the "relevant" part */ + answ = tp->write_seq - tp->snd_una; + seq_printf(m, "unacked send buffer: %u Byte\n", answ); + } + rcu_read_unlock(); + seq_putc(m, '\n'); + + seq_puts(m, "oldest peer requests\n"); + seq_print_resource_pending_peer_requests(m, resource, jif); + seq_putc(m, '\n'); + + seq_puts(m, "application requests waiting for activity log\n"); + seq_print_waiting_for_AL(m, resource, jif); + seq_putc(m, '\n'); + + seq_puts(m, "oldest application requests\n"); + seq_print_resource_transfer_log_summary(m, resource, connection, jif); + seq_putc(m, '\n'); + + jif = jiffies - jif; + if (jif) + seq_printf(m, "generated in %d ms\n", jiffies_to_msecs(jif)); + kref_put(&connection->kref, drbd_destroy_connection); + return 0; +} + +/* simple_positive(file->f_dentry) respectively debugfs_positive(), + * but neither is "reachable" from here. + * So we have our own inline version of it above. :-( */ +static inline int debugfs_positive(struct dentry *dentry) +{ + return dentry->d_inode && !d_unhashed(dentry); +} + +/* make sure at *open* time that the respective object won't go away. */ +static int drbd_single_open(struct file *file, int (*show)(struct seq_file *, void *), + void *data, struct kref *kref, + void (*release)(struct kref *)) +{ + struct dentry *parent; + int ret = -ESTALE; + + /* Are we still linked, + * or has debugfs_remove() already been called? */ + parent = file->f_dentry->d_parent; + /* not sure if this can happen: */ + if (!parent || !parent->d_inode) + goto out; + /* serialize with d_delete() */ + mutex_lock(&parent->d_inode->i_mutex); + /* Make sure the object is still alive */ + if (debugfs_positive(file->f_dentry) + && kref_get_unless_zero(kref)) + ret = 0; + mutex_unlock(&parent->d_inode->i_mutex); + if (!ret) { + ret = single_open(file, show, data); + if (ret) + kref_put(kref, release); + } +out: + return ret; +} + +static int in_flight_summary_open(struct inode *inode, struct file *file) +{ + struct drbd_resource *resource = inode->i_private; + return drbd_single_open(file, in_flight_summary_show, resource, + &resource->kref, drbd_destroy_resource); +} + +static int in_flight_summary_release(struct inode *inode, struct file *file) +{ + struct drbd_resource *resource = inode->i_private; + kref_put(&resource->kref, drbd_destroy_resource); + return single_release(inode, file); +} + +static const struct file_operations in_flight_summary_fops = { + .owner = THIS_MODULE, + .open = in_flight_summary_open, + .read = seq_read, + .llseek = seq_lseek, + .release = in_flight_summary_release, +}; + +void drbd_debugfs_resource_add(struct drbd_resource *resource) +{ + struct dentry *dentry; + if (!drbd_debugfs_resources) + return; + + dentry = debugfs_create_dir(resource->name, drbd_debugfs_resources); + if (IS_ERR_OR_NULL(dentry)) + goto fail; + resource->debugfs_res = dentry; + + dentry = debugfs_create_dir("volumes", resource->debugfs_res); + if (IS_ERR_OR_NULL(dentry)) + goto fail; + resource->debugfs_res_volumes = dentry; + + dentry = debugfs_create_dir("connections", resource->debugfs_res); + if (IS_ERR_OR_NULL(dentry)) + goto fail; + resource->debugfs_res_connections = dentry; + + dentry = debugfs_create_file("in_flight_summary", S_IRUSR|S_IRGRP, + resource->debugfs_res, resource, + &in_flight_summary_fops); + if (IS_ERR_OR_NULL(dentry)) + goto fail; + resource->debugfs_res_in_flight_summary = dentry; + return; + +fail: + drbd_debugfs_resource_cleanup(resource); + drbd_err(resource, "failed to create debugfs dentry\n"); +} + +static void drbd_debugfs_remove(struct dentry **dp) +{ + debugfs_remove(*dp); + *dp = NULL; +} + +void drbd_debugfs_resource_cleanup(struct drbd_resource *resource) +{ + /* it is ok to call debugfs_remove(NULL) */ + drbd_debugfs_remove(&resource->debugfs_res_in_flight_summary); + drbd_debugfs_remove(&resource->debugfs_res_connections); + drbd_debugfs_remove(&resource->debugfs_res_volumes); + drbd_debugfs_remove(&resource->debugfs_res); +} + +static void seq_print_one_timing_detail(struct seq_file *m, + const struct drbd_thread_timing_details *tdp, + unsigned long now) +{ + struct drbd_thread_timing_details td; + /* No locking... + * use temporary assignment to get at consistent data. */ + do { + td = *tdp; + } while (td.cb_nr != tdp->cb_nr); + if (!td.cb_addr) + return; + seq_printf(m, "%u\t%d\t%s:%u\t%ps\n", + td.cb_nr, + jiffies_to_msecs(now - td.start_jif), + td.caller_fn, td.line, + td.cb_addr); +} + +static void seq_print_timing_details(struct seq_file *m, + const char *title, + unsigned int cb_nr, struct drbd_thread_timing_details *tdp, unsigned long now) +{ + unsigned int start_idx; + unsigned int i; + + seq_printf(m, "%s\n", title); + /* If not much is going on, this will result in natural ordering. + * If it is very busy, we will possibly skip events, or even see wrap + * arounds, which could only be avoided with locking. + */ + start_idx = cb_nr % DRBD_THREAD_DETAILS_HIST; + for (i = start_idx; i < DRBD_THREAD_DETAILS_HIST; i++) + seq_print_one_timing_detail(m, tdp+i, now); + for (i = 0; i < start_idx; i++) + seq_print_one_timing_detail(m, tdp+i, now); +} + +static int callback_history_show(struct seq_file *m, void *ignored) +{ + struct drbd_connection *connection = m->private; + unsigned long jif = jiffies; + + /* BUMP me if you change the file format/content/presentation */ + seq_printf(m, "v: %u\n\n", 0); + + seq_puts(m, "n\tage\tcallsite\tfn\n"); + seq_print_timing_details(m, "worker", connection->w_cb_nr, connection->w_timing_details, jif); + seq_print_timing_details(m, "receiver", connection->r_cb_nr, connection->r_timing_details, jif); + return 0; +} + +static int callback_history_open(struct inode *inode, struct file *file) +{ + struct drbd_connection *connection = inode->i_private; + return drbd_single_open(file, callback_history_show, connection, + &connection->kref, drbd_destroy_connection); +} + +static int callback_history_release(struct inode *inode, struct file *file) +{ + struct drbd_connection *connection = inode->i_private; + kref_put(&connection->kref, drbd_destroy_connection); + return single_release(inode, file); +} + +static const struct file_operations connection_callback_history_fops = { + .owner = THIS_MODULE, + .open = callback_history_open, + .read = seq_read, + .llseek = seq_lseek, + .release = callback_history_release, +}; + +static int connection_oldest_requests_show(struct seq_file *m, void *ignored) +{ + struct drbd_connection *connection = m->private; + unsigned long now = jiffies; + struct drbd_request *r1, *r2; + + /* BUMP me if you change the file format/content/presentation */ + seq_printf(m, "v: %u\n\n", 0); + + spin_lock_irq(&connection->resource->req_lock); + r1 = connection->req_next; + if (r1) + seq_print_minor_vnr_req(m, r1, now); + r2 = connection->req_ack_pending; + if (r2 && r2 != r1) { + r1 = r2; + seq_print_minor_vnr_req(m, r1, now); + } + r2 = connection->req_not_net_done; + if (r2 && r2 != r1) + seq_print_minor_vnr_req(m, r2, now); + spin_unlock_irq(&connection->resource->req_lock); + return 0; +} + +static int connection_oldest_requests_open(struct inode *inode, struct file *file) +{ + struct drbd_connection *connection = inode->i_private; + return drbd_single_open(file, connection_oldest_requests_show, connection, + &connection->kref, drbd_destroy_connection); +} + +static int connection_oldest_requests_release(struct inode *inode, struct file *file) +{ + struct drbd_connection *connection = inode->i_private; + kref_put(&connection->kref, drbd_destroy_connection); + return single_release(inode, file); +} + +static const struct file_operations connection_oldest_requests_fops = { + .owner = THIS_MODULE, + .open = connection_oldest_requests_open, + .read = seq_read, + .llseek = seq_lseek, + .release = connection_oldest_requests_release, +}; + +void drbd_debugfs_connection_add(struct drbd_connection *connection) +{ + struct dentry *conns_dir = connection->resource->debugfs_res_connections; + struct dentry *dentry; + if (!conns_dir) + return; + + /* Once we enable mutliple peers, + * these connections will have descriptive names. + * For now, it is just the one connection to the (only) "peer". */ + dentry = debugfs_create_dir("peer", conns_dir); + if (IS_ERR_OR_NULL(dentry)) + goto fail; + connection->debugfs_conn = dentry; + + dentry = debugfs_create_file("callback_history", S_IRUSR|S_IRGRP, + connection->debugfs_conn, connection, + &connection_callback_history_fops); + if (IS_ERR_OR_NULL(dentry)) + goto fail; + connection->debugfs_conn_callback_history = dentry; + + dentry = debugfs_create_file("oldest_requests", S_IRUSR|S_IRGRP, + connection->debugfs_conn, connection, + &connection_oldest_requests_fops); + if (IS_ERR_OR_NULL(dentry)) + goto fail; + connection->debugfs_conn_oldest_requests = dentry; + return; + +fail: + drbd_debugfs_connection_cleanup(connection); + drbd_err(connection, "failed to create debugfs dentry\n"); +} + +void drbd_debugfs_connection_cleanup(struct drbd_connection *connection) +{ + drbd_debugfs_remove(&connection->debugfs_conn_callback_history); + drbd_debugfs_remove(&connection->debugfs_conn_oldest_requests); + drbd_debugfs_remove(&connection->debugfs_conn); +} + +static void resync_dump_detail(struct seq_file *m, struct lc_element *e) +{ + struct bm_extent *bme = lc_entry(e, struct bm_extent, lce); + + seq_printf(m, "%5d %s %s %s\n", bme->rs_left, + test_bit(BME_NO_WRITES, &bme->flags) ? "NO_WRITES" : "---------", + test_bit(BME_LOCKED, &bme->flags) ? "LOCKED" : "------", + test_bit(BME_PRIORITY, &bme->flags) ? "PRIORITY" : "--------" + ); +} + +static int device_resync_extents_show(struct seq_file *m, void *ignored) +{ + struct drbd_device *device = m->private; + + /* BUMP me if you change the file format/content/presentation */ + seq_printf(m, "v: %u\n\n", 0); + + if (get_ldev_if_state(device, D_FAILED)) { + lc_seq_printf_stats(m, device->resync); + lc_seq_dump_details(m, device->resync, "rs_left flags", resync_dump_detail); + put_ldev(device); + } + return 0; +} + +static int device_act_log_extents_show(struct seq_file *m, void *ignored) +{ + struct drbd_device *device = m->private; + + /* BUMP me if you change the file format/content/presentation */ + seq_printf(m, "v: %u\n\n", 0); + + if (get_ldev_if_state(device, D_FAILED)) { + lc_seq_printf_stats(m, device->act_log); + lc_seq_dump_details(m, device->act_log, "", NULL); + put_ldev(device); + } + return 0; +} + +static int device_oldest_requests_show(struct seq_file *m, void *ignored) +{ + struct drbd_device *device = m->private; + struct drbd_resource *resource = device->resource; + unsigned long now = jiffies; + struct drbd_request *r1, *r2; + int i; + + /* BUMP me if you change the file format/content/presentation */ + seq_printf(m, "v: %u\n\n", 0); + + seq_puts(m, RQ_HDR); + spin_lock_irq(&resource->req_lock); + /* WRITE, then READ */ + for (i = 1; i >= 0; --i) { + r1 = list_first_entry_or_null(&device->pending_master_completion[i], + struct drbd_request, req_pending_master_completion); + r2 = list_first_entry_or_null(&device->pending_completion[i], + struct drbd_request, req_pending_local); + if (r1) + seq_print_one_request(m, r1, now); + if (r2 && r2 != r1) + seq_print_one_request(m, r2, now); + } + spin_unlock_irq(&resource->req_lock); + return 0; +} + +static int device_data_gen_id_show(struct seq_file *m, void *ignored) +{ + struct drbd_device *device = m->private; + struct drbd_md *md; + enum drbd_uuid_index idx; + + if (!get_ldev_if_state(device, D_FAILED)) + return -ENODEV; + + md = &device->ldev->md; + spin_lock_irq(&md->uuid_lock); + for (idx = UI_CURRENT; idx <= UI_HISTORY_END; idx++) { + seq_printf(m, "0x%016llX\n", md->uuid[idx]); + } + spin_unlock_irq(&md->uuid_lock); + put_ldev(device); + return 0; +} + +#define drbd_debugfs_device_attr(name) \ +static int device_ ## name ## _open(struct inode *inode, struct file *file) \ +{ \ + struct drbd_device *device = inode->i_private; \ + return drbd_single_open(file, device_ ## name ## _show, device, \ + &device->kref, drbd_destroy_device); \ +} \ +static int device_ ## name ## _release(struct inode *inode, struct file *file) \ +{ \ + struct drbd_device *device = inode->i_private; \ + kref_put(&device->kref, drbd_destroy_device); \ + return single_release(inode, file); \ +} \ +static const struct file_operations device_ ## name ## _fops = { \ + .owner = THIS_MODULE, \ + .open = device_ ## name ## _open, \ + .read = seq_read, \ + .llseek = seq_lseek, \ + .release = device_ ## name ## _release, \ +}; + +drbd_debugfs_device_attr(oldest_requests) +drbd_debugfs_device_attr(act_log_extents) +drbd_debugfs_device_attr(resync_extents) +drbd_debugfs_device_attr(data_gen_id) + +void drbd_debugfs_device_add(struct drbd_device *device) +{ + struct dentry *vols_dir = device->resource->debugfs_res_volumes; + char minor_buf[8]; /* MINORMASK, MINORBITS == 20; */ + char vnr_buf[8]; /* volume number vnr is even 16 bit only; */ + char *slink_name = NULL; + + struct dentry *dentry; + if (!vols_dir || !drbd_debugfs_minors) + return; + + snprintf(vnr_buf, sizeof(vnr_buf), "%u", device->vnr); + dentry = debugfs_create_dir(vnr_buf, vols_dir); + if (IS_ERR_OR_NULL(dentry)) + goto fail; + device->debugfs_vol = dentry; + + snprintf(minor_buf, sizeof(minor_buf), "%u", device->minor); + slink_name = kasprintf(GFP_KERNEL, "../resources/%s/volumes/%u", + device->resource->name, device->vnr); + if (!slink_name) + goto fail; + dentry = debugfs_create_symlink(minor_buf, drbd_debugfs_minors, slink_name); + kfree(slink_name); + slink_name = NULL; + if (IS_ERR_OR_NULL(dentry)) + goto fail; + device->debugfs_minor = dentry; + +#define DCF(name) do { \ + dentry = debugfs_create_file(#name, S_IRUSR|S_IRGRP, \ + device->debugfs_vol, device, \ + &device_ ## name ## _fops); \ + if (IS_ERR_OR_NULL(dentry)) \ + goto fail; \ + device->debugfs_vol_ ## name = dentry; \ + } while (0) + + DCF(oldest_requests); + DCF(act_log_extents); + DCF(resync_extents); + DCF(data_gen_id); +#undef DCF + return; + +fail: + drbd_debugfs_device_cleanup(device); + drbd_err(device, "failed to create debugfs entries\n"); +} + +void drbd_debugfs_device_cleanup(struct drbd_device *device) +{ + drbd_debugfs_remove(&device->debugfs_minor); + drbd_debugfs_remove(&device->debugfs_vol_oldest_requests); + drbd_debugfs_remove(&device->debugfs_vol_act_log_extents); + drbd_debugfs_remove(&device->debugfs_vol_resync_extents); + drbd_debugfs_remove(&device->debugfs_vol_data_gen_id); + drbd_debugfs_remove(&device->debugfs_vol); +} + +void drbd_debugfs_peer_device_add(struct drbd_peer_device *peer_device) +{ + struct dentry *conn_dir = peer_device->connection->debugfs_conn; + struct dentry *dentry; + char vnr_buf[8]; + + if (!conn_dir) + return; + + snprintf(vnr_buf, sizeof(vnr_buf), "%u", peer_device->device->vnr); + dentry = debugfs_create_dir(vnr_buf, conn_dir); + if (IS_ERR_OR_NULL(dentry)) + goto fail; + peer_device->debugfs_peer_dev = dentry; + return; + +fail: + drbd_debugfs_peer_device_cleanup(peer_device); + drbd_err(peer_device, "failed to create debugfs entries\n"); +} + +void drbd_debugfs_peer_device_cleanup(struct drbd_peer_device *peer_device) +{ + drbd_debugfs_remove(&peer_device->debugfs_peer_dev); +} + +static int drbd_version_show(struct seq_file *m, void *ignored) +{ + seq_printf(m, "# %s\n", drbd_buildtag()); + seq_printf(m, "VERSION=%s\n", REL_VERSION); + seq_printf(m, "API_VERSION=%u\n", API_VERSION); + seq_printf(m, "PRO_VERSION_MIN=%u\n", PRO_VERSION_MIN); + seq_printf(m, "PRO_VERSION_MAX=%u\n", PRO_VERSION_MAX); + return 0; +} + +static int drbd_version_open(struct inode *inode, struct file *file) +{ + return single_open(file, drbd_version_show, NULL); +} + +static struct file_operations drbd_version_fops = { + .owner = THIS_MODULE, + .open = drbd_version_open, + .llseek = seq_lseek, + .read = seq_read, + .release = single_release, +}; + +/* not __exit, may be indirectly called + * from the module-load-failure path as well. */ +void drbd_debugfs_cleanup(void) +{ + drbd_debugfs_remove(&drbd_debugfs_resources); + drbd_debugfs_remove(&drbd_debugfs_minors); + drbd_debugfs_remove(&drbd_debugfs_version); + drbd_debugfs_remove(&drbd_debugfs_root); +} + +int __init drbd_debugfs_init(void) +{ + struct dentry *dentry; + + dentry = debugfs_create_dir("drbd", NULL); + if (IS_ERR_OR_NULL(dentry)) + goto fail; + drbd_debugfs_root = dentry; + + dentry = debugfs_create_file("version", 0444, drbd_debugfs_root, NULL, &drbd_version_fops); + if (IS_ERR_OR_NULL(dentry)) + goto fail; + drbd_debugfs_version = dentry; + + dentry = debugfs_create_dir("resources", drbd_debugfs_root); + if (IS_ERR_OR_NULL(dentry)) + goto fail; + drbd_debugfs_resources = dentry; + + dentry = debugfs_create_dir("minors", drbd_debugfs_root); + if (IS_ERR_OR_NULL(dentry)) + goto fail; + drbd_debugfs_minors = dentry; + return 0; + +fail: + drbd_debugfs_cleanup(); + if (dentry) + return PTR_ERR(dentry); + else + return -EINVAL; +} diff --git a/drivers/block/drbd/drbd_debugfs.h b/drivers/block/drbd/drbd_debugfs.h new file mode 100644 index 000000000000..8bee21340dce --- /dev/null +++ b/drivers/block/drbd/drbd_debugfs.h @@ -0,0 +1,39 @@ +#include <linux/kernel.h> +#include <linux/module.h> +#include <linux/debugfs.h> + +#include "drbd_int.h" + +#ifdef CONFIG_DEBUG_FS +int __init drbd_debugfs_init(void); +void drbd_debugfs_cleanup(void); + +void drbd_debugfs_resource_add(struct drbd_resource *resource); +void drbd_debugfs_resource_cleanup(struct drbd_resource *resource); + +void drbd_debugfs_connection_add(struct drbd_connection *connection); +void drbd_debugfs_connection_cleanup(struct drbd_connection *connection); + +void drbd_debugfs_device_add(struct drbd_device *device); +void drbd_debugfs_device_cleanup(struct drbd_device *device); + +void drbd_debugfs_peer_device_add(struct drbd_peer_device *peer_device); +void drbd_debugfs_peer_device_cleanup(struct drbd_peer_device *peer_device); +#else + +static inline int __init drbd_debugfs_init(void) { return -ENODEV; } +static inline void drbd_debugfs_cleanup(void) { } + +static inline void drbd_debugfs_resource_add(struct drbd_resource *resource) { } +static inline void drbd_debugfs_resource_cleanup(struct drbd_resource *resource) { } + +static inline void drbd_debugfs_connection_add(struct drbd_connection *connection) { } +static inline void drbd_debugfs_connection_cleanup(struct drbd_connection *connection) { } + +static inline void drbd_debugfs_device_add(struct drbd_device *device) { } +static inline void drbd_debugfs_device_cleanup(struct drbd_device *device) { } + +static inline void drbd_debugfs_peer_device_add(struct drbd_peer_device *peer_device) { } +static inline void drbd_debugfs_peer_device_cleanup(struct drbd_peer_device *peer_device) { } + +#endif diff --git a/drivers/block/drbd/drbd_int.h b/drivers/block/drbd/drbd_int.h index a76ceb344d64..1a000016ccdf 100644 --- a/drivers/block/drbd/drbd_int.h +++ b/drivers/block/drbd/drbd_int.h @@ -317,7 +317,63 @@ struct drbd_request { struct list_head tl_requests; /* ring list in the transfer log */ struct bio *master_bio; /* master bio pointer */ - unsigned long start_time; + + /* see struct drbd_device */ + struct list_head req_pending_master_completion; + struct list_head req_pending_local; + + /* for generic IO accounting */ + unsigned long start_jif; + + /* for DRBD internal statistics */ + + /* Minimal set of time stamps to determine if we wait for activity log + * transactions, local disk or peer. 32 bit "jiffies" are good enough, + * we don't expect a DRBD request to be stalled for several month. + */ + + /* before actual request processing */ + unsigned long in_actlog_jif; + + /* local disk */ + unsigned long pre_submit_jif; + + /* per connection */ + unsigned long pre_send_jif; + unsigned long acked_jif; + unsigned long net_done_jif; + + /* Possibly even more detail to track each phase: + * master_completion_jif + * how long did it take to complete the master bio + * (application visible latency) + * allocated_jif + * how long the master bio was blocked until we finally allocated + * a tracking struct + * in_actlog_jif + * how long did we wait for activity log transactions + * + * net_queued_jif + * when did we finally queue it for sending + * pre_send_jif + * when did we start sending it + * post_send_jif + * how long did we block in the network stack trying to send it + * acked_jif + * when did we receive (or fake, in protocol A) a remote ACK + * net_done_jif + * when did we receive final acknowledgement (P_BARRIER_ACK), + * or decide, e.g. on connection loss, that we do no longer expect + * anything from this peer for this request. + * + * pre_submit_jif + * post_sub_jif + * when did we start submiting to the lower level device, + * and how long did we block in that submit function + * local_completion_jif + * how long did it take the lower level device to complete this request + */ + /* once it hits 0, we may complete the master_bio */ atomic_t completion_ref; @@ -366,6 +422,7 @@ struct drbd_peer_request { struct drbd_interval i; /* see comments on ee flag bits below */ unsigned long flags; + unsigned long submit_jif; union { u64 block_id; struct digest_info *digest; @@ -408,6 +465,17 @@ enum { /* Is set when net_conf had two_primaries set while creating this peer_req */ __EE_IN_INTERVAL_TREE, + + /* for debugfs: */ + /* has this been submitted, or does it still wait for something else? */ + __EE_SUBMITTED, + + /* this is/was a write request */ + __EE_WRITE, + + /* this originates from application on peer + * (not some resync or verify or other DRBD internal request) */ + __EE_APPLICATION, }; #define EE_CALL_AL_COMPLETE_IO (1<<__EE_CALL_AL_COMPLETE_IO) #define EE_MAY_SET_IN_SYNC (1<<__EE_MAY_SET_IN_SYNC) @@ -419,6 +487,9 @@ enum { #define EE_RESTART_REQUESTS (1<<__EE_RESTART_REQUESTS) #define EE_SEND_WRITE_ACK (1<<__EE_SEND_WRITE_ACK) #define EE_IN_INTERVAL_TREE (1<<__EE_IN_INTERVAL_TREE) +#define EE_SUBMITTED (1<<__EE_SUBMITTED) +#define EE_WRITE (1<<__EE_WRITE) +#define EE_APPLICATION (1<<__EE_APPLICATION) /* flag bits per device */ enum { @@ -433,11 +504,11 @@ enum { CONSIDER_RESYNC, MD_NO_FUA, /* Users wants us to not use FUA/FLUSH on meta data dev */ + SUSPEND_IO, /* suspend application io */ BITMAP_IO, /* suspend application io; once no more io in flight, start bitmap io */ BITMAP_IO_QUEUED, /* Started bitmap IO */ - GO_DISKLESS, /* Disk is being detached, on io-error or admin request. */ WAS_IO_ERROR, /* Local disk failed, returned IO error */ WAS_READ_ERROR, /* Local disk READ failed (set additionally to the above) */ FORCE_DETACH, /* Force-detach from local disk, aborting any pending local IO */ @@ -450,6 +521,20 @@ enum { B_RS_H_DONE, /* Before resync handler done (already executed) */ DISCARD_MY_DATA, /* discard_my_data flag per volume */ READ_BALANCE_RR, + + FLUSH_PENDING, /* if set, device->flush_jif is when we submitted that flush + * from drbd_flush_after_epoch() */ + + /* cleared only after backing device related structures have been destroyed. */ + GOING_DISKLESS, /* Disk is being detached, because of io-error, or admin request. */ + + /* to be used in drbd_device_post_work() */ + GO_DISKLESS, /* tell worker to schedule cleanup before detach */ + DESTROY_DISK, /* tell worker to close backing devices and destroy related structures. */ + MD_SYNC, /* tell worker to call drbd_md_sync() */ + RS_START, /* tell worker to start resync/OV */ + RS_PROGRESS, /* tell worker that resync made significant progress */ + RS_DONE, /* tell worker that resync is done */ }; struct drbd_bitmap; /* opaque for drbd_device */ @@ -531,6 +616,11 @@ struct drbd_backing_dev { }; struct drbd_md_io { + struct page *page; + unsigned long start_jif; /* last call to drbd_md_get_buffer */ + unsigned long submit_jif; /* last _drbd_md_sync_page_io() submit */ + const char *current_use; + atomic_t in_use; unsigned int done; int error; }; @@ -577,10 +667,18 @@ enum { * and potentially deadlock on, this drbd worker. */ DISCONNECT_SENT, + + DEVICE_WORK_PENDING, /* tell worker that some device has pending work */ }; struct drbd_resource { char *name; +#ifdef CONFIG_DEBUG_FS + struct dentry *debugfs_res; + struct dentry *debugfs_res_volumes; + struct dentry *debugfs_res_connections; + struct dentry *debugfs_res_in_flight_summary; +#endif struct kref kref; struct idr devices; /* volume number to device mapping */ struct list_head connections; @@ -594,12 +692,28 @@ struct drbd_resource { unsigned susp_nod:1; /* IO suspended because no data */ unsigned susp_fen:1; /* IO suspended because fence peer handler runs */ + enum write_ordering_e write_ordering; + cpumask_var_t cpu_mask; }; +struct drbd_thread_timing_details +{ + unsigned long start_jif; + void *cb_addr; + const char *caller_fn; + unsigned int line; + unsigned int cb_nr; +}; + struct drbd_connection { struct list_head connections; struct drbd_resource *resource; +#ifdef CONFIG_DEBUG_FS + struct dentry *debugfs_conn; + struct dentry *debugfs_conn_callback_history; + struct dentry *debugfs_conn_oldest_requests; +#endif struct kref kref; struct idr peer_devices; /* volume number to peer device mapping */ enum drbd_conns cstate; /* Only C_STANDALONE to C_WF_REPORT_PARAMS */ @@ -636,7 +750,6 @@ struct drbd_connection { struct drbd_epoch *current_epoch; spinlock_t epoch_lock; unsigned int epochs; - enum write_ordering_e write_ordering; atomic_t current_tle_nr; /* transfer log epoch number */ unsigned current_tle_writes; /* writes seen within this tl epoch */ @@ -645,9 +758,22 @@ struct drbd_connection { struct drbd_thread worker; struct drbd_thread asender; + /* cached pointers, + * so we can look up the oldest pending requests more quickly. + * protected by resource->req_lock */ + struct drbd_request *req_next; /* DRBD 9: todo.req_next */ + struct drbd_request *req_ack_pending; + struct drbd_request *req_not_net_done; + /* sender side */ struct drbd_work_queue sender_work; +#define DRBD_THREAD_DETAILS_HIST 16 + unsigned int w_cb_nr; /* keeps counting up */ + unsigned int r_cb_nr; /* keeps counting up */ + struct drbd_thread_timing_details w_timing_details[DRBD_THREAD_DETAILS_HIST]; + struct drbd_thread_timing_details r_timing_details[DRBD_THREAD_DETAILS_HIST]; + struct { /* whether this sender thread * has processed a single write yet. */ @@ -663,11 +789,22 @@ struct drbd_connection { } send; }; +void __update_timing_details( + struct drbd_thread_timing_details *tdp, + unsigned int *cb_nr, + void *cb, + const char *fn, const unsigned int line); + +#define update_worker_timing_details(c, cb) \ + __update_timing_details(c->w_timing_details, &c->w_cb_nr, cb, __func__ , __LINE__ ) +#define update_receiver_timing_details(c, cb) \ + __update_timing_details(c->r_timing_details, &c->r_cb_nr, cb, __func__ , __LINE__ ) + struct submit_worker { struct workqueue_struct *wq; struct work_struct worker; - spinlock_t lock; + /* protected by ..->resource->req_lock */ struct list_head writes; }; @@ -675,12 +812,29 @@ struct drbd_peer_device { struct list_head peer_devices; struct drbd_device *device; struct drbd_connection *connection; +#ifdef CONFIG_DEBUG_FS + struct dentry *debugfs_peer_dev; +#endif }; struct drbd_device { struct drbd_resource *resource; struct list_head peer_devices; - int vnr; /* volume number within the connection */ + struct list_head pending_bitmap_io; + + unsigned long flush_jif; +#ifdef CONFIG_DEBUG_FS + struct dentry *debugfs_minor; + struct dentry *debugfs_vol; + struct dentry *debugfs_vol_oldest_requests; + struct dentry *debugfs_vol_act_log_extents; + struct dentry *debugfs_vol_resync_extents; + struct dentry *debugfs_vol_data_gen_id; +#endif + + unsigned int vnr; /* volume number within the connection */ + unsigned int minor; /* device minor number */ + struct kref kref; /* things that are stored as / read from meta data on disk */ @@ -697,19 +851,10 @@ struct drbd_device { unsigned long last_reattach_jif; struct drbd_work resync_work; struct drbd_work unplug_work; - struct drbd_work go_diskless; - struct drbd_work md_sync_work; - struct drbd_work start_resync_work; struct timer_list resync_timer; struct timer_list md_sync_timer; struct timer_list start_resync_timer; struct timer_list request_timer; -#ifdef DRBD_DEBUG_MD_SYNC - struct { - unsigned int line; - const char* func; - } last_md_mark_dirty; -#endif /* Used after attach while negotiating new disk state. */ union drbd_state new_state_tmp; @@ -724,6 +869,7 @@ struct drbd_device { unsigned int al_writ_cnt; unsigned int bm_writ_cnt; atomic_t ap_bio_cnt; /* Requests we need to complete */ + atomic_t ap_actlog_cnt; /* Requests waiting for activity log */ atomic_t ap_pending_cnt; /* AP data packets on the wire, ack expected */ atomic_t rs_pending_cnt; /* RS request/data packets on the wire */ atomic_t unacked_cnt; /* Need to send replies for */ @@ -733,6 +879,13 @@ struct drbd_device { struct rb_root read_requests; struct rb_root write_requests; + /* for statistics and timeouts */ + /* [0] read, [1] write */ + struct list_head pending_master_completion[2]; + struct list_head pending_completion[2]; + + /* use checksums for *this* resync */ + bool use_csums; /* blocks to resync in this run [unit BM_BLOCK_SIZE] */ unsigned long rs_total; /* number of resync blocks that failed in this run */ @@ -788,9 +941,7 @@ struct drbd_device { atomic_t pp_in_use; /* allocated from page pool */ atomic_t pp_in_use_by_net; /* sendpage()d, still referenced by tcp */ wait_queue_head_t ee_wait; - struct page *md_io_page; /* one page buffer for md_io */ struct drbd_md_io md_io; - atomic_t md_io_in_use; /* protects the md_io, md_io_page and md_io_tmpp */ spinlock_t al_lock; wait_queue_head_t al_wait; struct lru_cache *act_log; /* activity log */ @@ -800,7 +951,6 @@ struct drbd_device { atomic_t packet_seq; unsigned int peer_seq; spinlock_t peer_seq_lock; - unsigned int minor; unsigned long comm_bm_set; /* communicated number of set bits. */ struct bm_io_work bm_io_work; u64 ed_uuid; /* UUID of the exposed data */ @@ -824,6 +974,21 @@ struct drbd_device { struct submit_worker submit; }; +struct drbd_bm_aio_ctx { + struct drbd_device *device; + struct list_head list; /* on device->pending_bitmap_io */; + unsigned long start_jif; + atomic_t in_flight; + unsigned int done; + unsigned flags; +#define BM_AIO_COPY_PAGES 1 +#define BM_AIO_WRITE_HINTED 2 +#define BM_AIO_WRITE_ALL_PAGES 4 +#define BM_AIO_READ 8 + int error; + struct kref kref; +}; + struct drbd_config_context { /* assigned from drbd_genlmsghdr */ unsigned int minor; @@ -949,7 +1114,7 @@ extern int drbd_send_ov_request(struct drbd_peer_device *, sector_t sector, int extern int drbd_send_bitmap(struct drbd_device *device); extern void drbd_send_sr_reply(struct drbd_peer_device *, enum drbd_state_rv retcode); extern void conn_send_sr_reply(struct drbd_connection *connection, enum drbd_state_rv retcode); -extern void drbd_free_bc(struct drbd_backing_dev *ldev); +extern void drbd_free_ldev(struct drbd_backing_dev *ldev); extern void drbd_device_cleanup(struct drbd_device *device); void drbd_print_uuids(struct drbd_device *device, const char *text); @@ -966,13 +1131,7 @@ extern void __drbd_uuid_set(struct drbd_device *device, int idx, u64 val) __must extern void drbd_md_set_flag(struct drbd_device *device, int flags) __must_hold(local); extern void drbd_md_clear_flag(struct drbd_device *device, int flags)__must_hold(local); extern int drbd_md_test_flag(struct drbd_backing_dev *, int); -#ifndef DRBD_DEBUG_MD_SYNC extern void drbd_md_mark_dirty(struct drbd_device *device); -#else -#define drbd_md_mark_dirty(m) drbd_md_mark_dirty_(m, __LINE__ , __func__ ) -extern void drbd_md_mark_dirty_(struct drbd_device *device, - unsigned int line, const char *func); -#endif extern void drbd_queue_bitmap_io(struct drbd_device *device, int (*io_fn)(struct drbd_device *), void (*done)(struct drbd_device *, int), @@ -983,9 +1142,8 @@ extern int drbd_bitmap_io(struct drbd_device *device, extern int drbd_bitmap_io_from_worker(struct drbd_device *device, int (*io_fn)(struct drbd_device *), char *why, enum bm_flag flags); -extern int drbd_bmio_set_n_write(struct drbd_device *device); -extern int drbd_bmio_clear_n_write(struct drbd_device *device); -extern void drbd_ldev_destroy(struct drbd_device *device); +extern int drbd_bmio_set_n_write(struct drbd_device *device) __must_hold(local); +extern int drbd_bmio_clear_n_write(struct drbd_device *device) __must_hold(local); /* Meta data layout * @@ -1105,17 +1263,21 @@ struct bm_extent { /* in which _bitmap_ extent (resp. sector) the bit for a certain * _storage_ sector is located in */ #define BM_SECT_TO_EXT(x) ((x)>>(BM_EXT_SHIFT-9)) +#define BM_BIT_TO_EXT(x) ((x) >> (BM_EXT_SHIFT - BM_BLOCK_SHIFT)) -/* how much _storage_ sectors we have per bitmap sector */ +/* first storage sector a bitmap extent corresponds to */ #define BM_EXT_TO_SECT(x) ((sector_t)(x) << (BM_EXT_SHIFT-9)) +/* how much _storage_ sectors we have per bitmap extent */ #define BM_SECT_PER_EXT BM_EXT_TO_SECT(1) +/* how many bits are covered by one bitmap extent (resync extent) */ +#define BM_BITS_PER_EXT (1UL << (BM_EXT_SHIFT - BM_BLOCK_SHIFT)) + +#define BM_BLOCKS_PER_BM_EXT_MASK (BM_BITS_PER_EXT - 1) + /* in one sector of the bitmap, we have this many activity_log extents. */ #define AL_EXT_PER_BM_SECT (1 << (BM_EXT_SHIFT - AL_EXTENT_SHIFT)) -#define BM_BLOCKS_PER_BM_EXT_B (BM_EXT_SHIFT - BM_BLOCK_SHIFT) -#define BM_BLOCKS_PER_BM_EXT_MASK ((1<<BM_BLOCKS_PER_BM_EXT_B) - 1) - /* the extent in "PER_EXTENT" below is an activity log extent * we need that many (long words/bytes) to store the bitmap * of one AL_EXTENT_SIZE chunk of storage. @@ -1195,11 +1357,11 @@ extern void _drbd_bm_set_bits(struct drbd_device *device, const unsigned long s, const unsigned long e); extern int drbd_bm_test_bit(struct drbd_device *device, unsigned long bitnr); extern int drbd_bm_e_weight(struct drbd_device *device, unsigned long enr); -extern int drbd_bm_write_page(struct drbd_device *device, unsigned int idx) __must_hold(local); extern int drbd_bm_read(struct drbd_device *device) __must_hold(local); extern void drbd_bm_mark_for_writeout(struct drbd_device *device, int page_nr); extern int drbd_bm_write(struct drbd_device *device) __must_hold(local); extern int drbd_bm_write_hinted(struct drbd_device *device) __must_hold(local); +extern int drbd_bm_write_lazy(struct drbd_device *device, unsigned upper_idx) __must_hold(local); extern int drbd_bm_write_all(struct drbd_device *device) __must_hold(local); extern int drbd_bm_write_copy_pages(struct drbd_device *device) __must_hold(local); extern size_t drbd_bm_words(struct drbd_device *device); @@ -1213,7 +1375,6 @@ extern unsigned long _drbd_bm_find_next(struct drbd_device *device, unsigned lon extern unsigned long _drbd_bm_find_next_zero(struct drbd_device *device, unsigned long bm_fo); extern unsigned long _drbd_bm_total_weight(struct drbd_device *device); extern unsigned long drbd_bm_total_weight(struct drbd_device *device); -extern int drbd_bm_rs_done(struct drbd_device *device); /* for receive_bitmap */ extern void drbd_bm_merge_lel(struct drbd_device *device, size_t offset, size_t number, unsigned long *buffer); @@ -1312,7 +1473,7 @@ enum determine_dev_size { extern enum determine_dev_size drbd_determine_dev_size(struct drbd_device *, enum dds_flags, struct resize_parms *) __must_hold(local); extern void resync_after_online_grow(struct drbd_device *); -extern void drbd_reconsider_max_bio_size(struct drbd_device *device); +extern void drbd_reconsider_max_bio_size(struct drbd_device *device, struct drbd_backing_dev *bdev); extern enum drbd_state_rv drbd_set_role(struct drbd_device *device, enum drbd_role new_role, int force); @@ -1333,7 +1494,7 @@ extern void resume_next_sg(struct drbd_device *device); extern void suspend_other_sg(struct drbd_device *device); extern int drbd_resync_finished(struct drbd_device *device); /* maybe rather drbd_main.c ? */ -extern void *drbd_md_get_buffer(struct drbd_device *device); +extern void *drbd_md_get_buffer(struct drbd_device *device, const char *intent); extern void drbd_md_put_buffer(struct drbd_device *device); extern int drbd_md_sync_page_io(struct drbd_device *device, struct drbd_backing_dev *bdev, sector_t sector, int rw); @@ -1380,7 +1541,8 @@ extern void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req); extern int drbd_receiver(struct drbd_thread *thi); extern int drbd_asender(struct drbd_thread *thi); extern bool drbd_rs_c_min_rate_throttle(struct drbd_device *device); -extern bool drbd_rs_should_slow_down(struct drbd_device *device, sector_t sector); +extern bool drbd_rs_should_slow_down(struct drbd_device *device, sector_t sector, + bool throttle_if_app_is_waiting); extern int drbd_submit_peer_request(struct drbd_device *, struct drbd_peer_request *, const unsigned, const int); @@ -1464,10 +1626,7 @@ static inline void drbd_generic_make_request(struct drbd_device *device, { __release(local); if (!bio->bi_bdev) { - printk(KERN_ERR "drbd%d: drbd_generic_make_request: " - "bio->bi_bdev == NULL\n", - device_to_minor(device)); - dump_stack(); + drbd_err(device, "drbd_generic_make_request: bio->bi_bdev == NULL\n"); bio_endio(bio, -ENODEV); return; } @@ -1478,7 +1637,8 @@ static inline void drbd_generic_make_request(struct drbd_device *device, generic_make_request(bio); } -void drbd_bump_write_ordering(struct drbd_connection *connection, enum write_ordering_e wo); +void drbd_bump_write_ordering(struct drbd_resource *resource, struct drbd_backing_dev *bdev, + enum write_ordering_e wo); /* drbd_proc.c */ extern struct proc_dir_entry *drbd_proc; @@ -1489,9 +1649,9 @@ extern const char *drbd_role_str(enum drbd_role s); /* drbd_actlog.c */ extern bool drbd_al_begin_io_prepare(struct drbd_device *device, struct drbd_interval *i); extern int drbd_al_begin_io_nonblock(struct drbd_device *device, struct drbd_interval *i); -extern void drbd_al_begin_io_commit(struct drbd_device *device, bool delegate); +extern void drbd_al_begin_io_commit(struct drbd_device *device); extern bool drbd_al_begin_io_fastpath(struct drbd_device *device, struct drbd_interval *i); -extern void drbd_al_begin_io(struct drbd_device *device, struct drbd_interval *i, bool delegate); +extern void drbd_al_begin_io(struct drbd_device *device, struct drbd_interval *i); extern void drbd_al_complete_io(struct drbd_device *device, struct drbd_interval *i); extern void drbd_rs_complete_io(struct drbd_device *device, sector_t sector); extern int drbd_rs_begin_io(struct drbd_device *device, sector_t sector); @@ -1501,14 +1661,17 @@ extern int drbd_rs_del_all(struct drbd_device *device); extern void drbd_rs_failed_io(struct drbd_device *device, sector_t sector, int size); extern void drbd_advance_rs_marks(struct drbd_device *device, unsigned long still_to_go); -extern void __drbd_set_in_sync(struct drbd_device *device, sector_t sector, - int size, const char *file, const unsigned int line); + +enum update_sync_bits_mode { RECORD_RS_FAILED, SET_OUT_OF_SYNC, SET_IN_SYNC }; +extern int __drbd_change_sync(struct drbd_device *device, sector_t sector, int size, + enum update_sync_bits_mode mode, + const char *file, const unsigned int line); #define drbd_set_in_sync(device, sector, size) \ - __drbd_set_in_sync(device, sector, size, __FILE__, __LINE__) -extern int __drbd_set_out_of_sync(struct drbd_device *device, sector_t sector, - int size, const char *file, const unsigned int line); + __drbd_change_sync(device, sector, size, SET_IN_SYNC, __FILE__, __LINE__) #define drbd_set_out_of_sync(device, sector, size) \ - __drbd_set_out_of_sync(device, sector, size, __FILE__, __LINE__) + __drbd_change_sync(device, sector, size, SET_OUT_OF_SYNC, __FILE__, __LINE__) +#define drbd_rs_failed_io(device, sector, size) \ + __drbd_change_sync(device, sector, size, RECORD_RS_FAILED, __FILE__, __LINE__) extern void drbd_al_shrink(struct drbd_device *device); extern int drbd_initialize_al(struct drbd_device *, void *); @@ -1764,25 +1927,38 @@ static inline sector_t drbd_md_ss(struct drbd_backing_dev *bdev) } static inline void -drbd_queue_work_front(struct drbd_work_queue *q, struct drbd_work *w) +drbd_queue_work(struct drbd_work_queue *q, struct drbd_work *w) { unsigned long flags; spin_lock_irqsave(&q->q_lock, flags); - list_add(&w->list, &q->q); + list_add_tail(&w->list, &q->q); spin_unlock_irqrestore(&q->q_lock, flags); wake_up(&q->q_wait); } static inline void -drbd_queue_work(struct drbd_work_queue *q, struct drbd_work *w) +drbd_queue_work_if_unqueued(struct drbd_work_queue *q, struct drbd_work *w) { unsigned long flags; spin_lock_irqsave(&q->q_lock, flags); - list_add_tail(&w->list, &q->q); + if (list_empty_careful(&w->list)) + list_add_tail(&w->list, &q->q); spin_unlock_irqrestore(&q->q_lock, flags); wake_up(&q->q_wait); } +static inline void +drbd_device_post_work(struct drbd_device *device, int work_bit) +{ + if (!test_and_set_bit(work_bit, &device->flags)) { + struct drbd_connection *connection = + first_peer_device(device)->connection; + struct drbd_work_queue *q = &connection->sender_work; + if (!test_and_set_bit(DEVICE_WORK_PENDING, &connection->flags)) + wake_up(&q->q_wait); + } +} + extern void drbd_flush_workqueue(struct drbd_work_queue *work_queue); static inline void wake_asender(struct drbd_connection *connection) @@ -1859,7 +2035,7 @@ static inline void inc_ap_pending(struct drbd_device *device) func, line, \ atomic_read(&device->which)) -#define dec_ap_pending(device) _dec_ap_pending(device, __FUNCTION__, __LINE__) +#define dec_ap_pending(device) _dec_ap_pending(device, __func__, __LINE__) static inline void _dec_ap_pending(struct drbd_device *device, const char *func, int line) { if (atomic_dec_and_test(&device->ap_pending_cnt)) @@ -1878,7 +2054,7 @@ static inline void inc_rs_pending(struct drbd_device *device) atomic_inc(&device->rs_pending_cnt); } -#define dec_rs_pending(device) _dec_rs_pending(device, __FUNCTION__, __LINE__) +#define dec_rs_pending(device) _dec_rs_pending(device, __func__, __LINE__) static inline void _dec_rs_pending(struct drbd_device *device, const char *func, int line) { atomic_dec(&device->rs_pending_cnt); @@ -1899,20 +2075,29 @@ static inline void inc_unacked(struct drbd_device *device) atomic_inc(&device->unacked_cnt); } -#define dec_unacked(device) _dec_unacked(device, __FUNCTION__, __LINE__) +#define dec_unacked(device) _dec_unacked(device, __func__, __LINE__) static inline void _dec_unacked(struct drbd_device *device, const char *func, int line) { atomic_dec(&device->unacked_cnt); ERR_IF_CNT_IS_NEGATIVE(unacked_cnt, func, line); } -#define sub_unacked(device, n) _sub_unacked(device, n, __FUNCTION__, __LINE__) +#define sub_unacked(device, n) _sub_unacked(device, n, __func__, __LINE__) static inline void _sub_unacked(struct drbd_device *device, int n, const char *func, int line) { atomic_sub(n, &device->unacked_cnt); ERR_IF_CNT_IS_NEGATIVE(unacked_cnt, func, line); } +static inline bool is_sync_state(enum drbd_conns connection_state) +{ + return + (connection_state == C_SYNC_SOURCE + || connection_state == C_SYNC_TARGET + || connection_state == C_PAUSED_SYNC_S + || connection_state == C_PAUSED_SYNC_T); +} + /** * get_ldev() - Increase the ref count on device->ldev. Returns 0 if there is no ldev * @M: DRBD device. @@ -1924,6 +2109,11 @@ static inline void _sub_unacked(struct drbd_device *device, int n, const char *f static inline void put_ldev(struct drbd_device *device) { + enum drbd_disk_state ds = device->state.disk; + /* We must check the state *before* the atomic_dec becomes visible, + * or we have a theoretical race where someone hitting zero, + * while state still D_FAILED, will then see D_DISKLESS in the + * condition below and calling into destroy, where he must not, yet. */ int i = atomic_dec_return(&device->local_cnt); /* This may be called from some endio handler, @@ -1932,15 +2122,13 @@ static inline void put_ldev(struct drbd_device *device) __release(local); D_ASSERT(device, i >= 0); if (i == 0) { - if (device->state.disk == D_DISKLESS) + if (ds == D_DISKLESS) /* even internal references gone, safe to destroy */ - drbd_ldev_destroy(device); - if (device->state.disk == D_FAILED) { + drbd_device_post_work(device, DESTROY_DISK); + if (ds == D_FAILED) /* all application IO references gone. */ - if (!test_and_set_bit(GO_DISKLESS, &device->flags)) - drbd_queue_work(&first_peer_device(device)->connection->sender_work, - &device->go_diskless); - } + if (!test_and_set_bit(GOING_DISKLESS, &device->flags)) + drbd_device_post_work(device, GO_DISKLESS); wake_up(&device->misc_wait); } } @@ -1964,54 +2152,6 @@ static inline int _get_ldev_if_state(struct drbd_device *device, enum drbd_disk_ extern int _get_ldev_if_state(struct drbd_device *device, enum drbd_disk_state mins); #endif -/* you must have an "get_ldev" reference */ -static inline void drbd_get_syncer_progress(struct drbd_device *device, - unsigned long *bits_left, unsigned int *per_mil_done) -{ - /* this is to break it at compile time when we change that, in case we - * want to support more than (1<<32) bits on a 32bit arch. */ - typecheck(unsigned long, device->rs_total); - - /* note: both rs_total and rs_left are in bits, i.e. in - * units of BM_BLOCK_SIZE. - * for the percentage, we don't care. */ - - if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T) - *bits_left = device->ov_left; - else - *bits_left = drbd_bm_total_weight(device) - device->rs_failed; - /* >> 10 to prevent overflow, - * +1 to prevent division by zero */ - if (*bits_left > device->rs_total) { - /* doh. maybe a logic bug somewhere. - * may also be just a race condition - * between this and a disconnect during sync. - * for now, just prevent in-kernel buffer overflow. - */ - smp_rmb(); - drbd_warn(device, "cs:%s rs_left=%lu > rs_total=%lu (rs_failed %lu)\n", - drbd_conn_str(device->state.conn), - *bits_left, device->rs_total, device->rs_failed); - *per_mil_done = 0; - } else { - /* Make sure the division happens in long context. - * We allow up to one petabyte storage right now, - * at a granularity of 4k per bit that is 2**38 bits. - * After shift right and multiplication by 1000, - * this should still fit easily into a 32bit long, - * so we don't need a 64bit division on 32bit arch. - * Note: currently we don't support such large bitmaps on 32bit - * arch anyways, but no harm done to be prepared for it here. - */ - unsigned int shift = device->rs_total > UINT_MAX ? 16 : 10; - unsigned long left = *bits_left >> shift; - unsigned long total = 1UL + (device->rs_total >> shift); - unsigned long tmp = 1000UL - left * 1000UL/total; - *per_mil_done = tmp; - } -} - - /* this throttles on-the-fly application requests * according to max_buffers settings; * maybe re-implement using semaphores? */ @@ -2201,25 +2341,6 @@ static inline int drbd_queue_order_type(struct drbd_device *device) return QUEUE_ORDERED_NONE; } -static inline void drbd_md_flush(struct drbd_device *device) -{ - int r; - - if (device->ldev == NULL) { - drbd_warn(device, "device->ldev == NULL in drbd_md_flush\n"); - return; - } - - if (test_bit(MD_NO_FUA, &device->flags)) - return; - - r = blkdev_issue_flush(device->ldev->md_bdev, GFP_NOIO, NULL); - if (r) { - set_bit(MD_NO_FUA, &device->flags); - drbd_err(device, "meta data flush failed with status %d, disabling md-flushes\n", r); - } -} - static inline struct drbd_connection *first_connection(struct drbd_resource *resource) { return list_first_entry_or_null(&resource->connections, diff --git a/drivers/block/drbd/drbd_interval.h b/drivers/block/drbd/drbd_interval.h index f38fcb00c10d..f210543f05f4 100644 --- a/drivers/block/drbd/drbd_interval.h +++ b/drivers/block/drbd/drbd_interval.h @@ -10,7 +10,9 @@ struct drbd_interval { unsigned int size; /* size in bytes */ sector_t end; /* highest interval end in subtree */ int local:1 /* local or remote request? */; - int waiting:1; + int waiting:1; /* someone is waiting for this to complete */ + int completed:1; /* this has been completed already; + * ignore for conflict detection */ }; static inline void drbd_clear_interval(struct drbd_interval *i) diff --git a/drivers/block/drbd/drbd_main.c b/drivers/block/drbd/drbd_main.c index 960645c26e6f..9b465bb68487 100644 --- a/drivers/block/drbd/drbd_main.c +++ b/drivers/block/drbd/drbd_main.c @@ -26,7 +26,10 @@ */ +#define pr_fmt(fmt) KBUILD_MODNAME ": " fmt + #include <linux/module.h> +#include <linux/jiffies.h> #include <linux/drbd.h> #include <asm/uaccess.h> #include <asm/types.h> @@ -54,16 +57,14 @@ #include "drbd_int.h" #include "drbd_protocol.h" #include "drbd_req.h" /* only for _req_mod in tl_release and tl_clear */ - #include "drbd_vli.h" +#include "drbd_debugfs.h" static DEFINE_MUTEX(drbd_main_mutex); static int drbd_open(struct block_device *bdev, fmode_t mode); static void drbd_release(struct gendisk *gd, fmode_t mode); -static int w_md_sync(struct drbd_work *w, int unused); static void md_sync_timer_fn(unsigned long data); static int w_bitmap_io(struct drbd_work *w, int unused); -static int w_go_diskless(struct drbd_work *w, int unused); MODULE_AUTHOR("Philipp Reisner <phil@linbit.com>, " "Lars Ellenberg <lars@linbit.com>"); @@ -264,7 +265,7 @@ bail: /** * _tl_restart() - Walks the transfer log, and applies an action to all requests - * @device: DRBD device. + * @connection: DRBD connection to operate on. * @what: The action/event to perform with all request objects * * @what might be one of CONNECTION_LOST_WHILE_PENDING, RESEND, FAIL_FROZEN_DISK_IO, @@ -662,6 +663,11 @@ static int __send_command(struct drbd_connection *connection, int vnr, msg_flags); if (data && !err) err = drbd_send_all(connection, sock->socket, data, size, 0); + /* DRBD protocol "pings" are latency critical. + * This is supposed to trigger tcp_push_pending_frames() */ + if (!err && (cmd == P_PING || cmd == P_PING_ACK)) + drbd_tcp_nodelay(sock->socket); + return err; } @@ -1636,7 +1642,10 @@ int drbd_send_dblock(struct drbd_peer_device *peer_device, struct drbd_request * if (peer_device->connection->agreed_pro_version >= 100) { if (req->rq_state & RQ_EXP_RECEIVE_ACK) dp_flags |= DP_SEND_RECEIVE_ACK; - if (req->rq_state & RQ_EXP_WRITE_ACK) + /* During resync, request an explicit write ack, + * even in protocol != C */ + if (req->rq_state & RQ_EXP_WRITE_ACK + || (dp_flags & DP_MAY_SET_IN_SYNC)) dp_flags |= DP_SEND_WRITE_ACK; } p->dp_flags = cpu_to_be32(dp_flags); @@ -1900,6 +1909,7 @@ void drbd_init_set_defaults(struct drbd_device *device) drbd_set_defaults(device); atomic_set(&device->ap_bio_cnt, 0); + atomic_set(&device->ap_actlog_cnt, 0); atomic_set(&device->ap_pending_cnt, 0); atomic_set(&device->rs_pending_cnt, 0); atomic_set(&device->unacked_cnt, 0); @@ -1908,7 +1918,7 @@ void drbd_init_set_defaults(struct drbd_device *device) atomic_set(&device->rs_sect_in, 0); atomic_set(&device->rs_sect_ev, 0); atomic_set(&device->ap_in_flight, 0); - atomic_set(&device->md_io_in_use, 0); + atomic_set(&device->md_io.in_use, 0); mutex_init(&device->own_state_mutex); device->state_mutex = &device->own_state_mutex; @@ -1924,17 +1934,15 @@ void drbd_init_set_defaults(struct drbd_device *device) INIT_LIST_HEAD(&device->resync_reads); INIT_LIST_HEAD(&device->resync_work.list); INIT_LIST_HEAD(&device->unplug_work.list); - INIT_LIST_HEAD(&device->go_diskless.list); - INIT_LIST_HEAD(&device->md_sync_work.list); - INIT_LIST_HEAD(&device->start_resync_work.list); INIT_LIST_HEAD(&device->bm_io_work.w.list); + INIT_LIST_HEAD(&device->pending_master_completion[0]); + INIT_LIST_HEAD(&device->pending_master_completion[1]); + INIT_LIST_HEAD(&device->pending_completion[0]); + INIT_LIST_HEAD(&device->pending_completion[1]); device->resync_work.cb = w_resync_timer; device->unplug_work.cb = w_send_write_hint; - device->go_diskless.cb = w_go_diskless; - device->md_sync_work.cb = w_md_sync; device->bm_io_work.w.cb = w_bitmap_io; - device->start_resync_work.cb = w_start_resync; init_timer(&device->resync_timer); init_timer(&device->md_sync_timer); @@ -1992,7 +2000,7 @@ void drbd_device_cleanup(struct drbd_device *device) drbd_bm_cleanup(device); } - drbd_free_bc(device->ldev); + drbd_free_ldev(device->ldev); device->ldev = NULL; clear_bit(AL_SUSPENDED, &device->flags); @@ -2006,7 +2014,6 @@ void drbd_device_cleanup(struct drbd_device *device) D_ASSERT(device, list_empty(&first_peer_device(device)->connection->sender_work.q)); D_ASSERT(device, list_empty(&device->resync_work.list)); D_ASSERT(device, list_empty(&device->unplug_work.list)); - D_ASSERT(device, list_empty(&device->go_diskless.list)); drbd_set_defaults(device); } @@ -2129,20 +2136,6 @@ Enomem: return -ENOMEM; } -static int drbd_notify_sys(struct notifier_block *this, unsigned long code, - void *unused) -{ - /* just so we have it. you never know what interesting things we - * might want to do here some day... - */ - - return NOTIFY_DONE; -} - -static struct notifier_block drbd_notifier = { - .notifier_call = drbd_notify_sys, -}; - static void drbd_release_all_peer_reqs(struct drbd_device *device) { int rr; @@ -2173,7 +2166,7 @@ void drbd_destroy_device(struct kref *kref) { struct drbd_device *device = container_of(kref, struct drbd_device, kref); struct drbd_resource *resource = device->resource; - struct drbd_connection *connection; + struct drbd_peer_device *peer_device, *tmp_peer_device; del_timer_sync(&device->request_timer); @@ -2187,7 +2180,7 @@ void drbd_destroy_device(struct kref *kref) if (device->this_bdev) bdput(device->this_bdev); - drbd_free_bc(device->ldev); + drbd_free_ldev(device->ldev); device->ldev = NULL; drbd_release_all_peer_reqs(device); @@ -2200,15 +2193,20 @@ void drbd_destroy_device(struct kref *kref) if (device->bitmap) /* should no longer be there. */ drbd_bm_cleanup(device); - __free_page(device->md_io_page); + __free_page(device->md_io.page); put_disk(device->vdisk); blk_cleanup_queue(device->rq_queue); kfree(device->rs_plan_s); - kfree(first_peer_device(device)); - kfree(device); - for_each_connection(connection, resource) - kref_put(&connection->kref, drbd_destroy_connection); + /* not for_each_connection(connection, resource): + * those may have been cleaned up and disassociated already. + */ + for_each_peer_device_safe(peer_device, tmp_peer_device, device) { + kref_put(&peer_device->connection->kref, drbd_destroy_connection); + kfree(peer_device); + } + memset(device, 0xfd, sizeof(*device)); + kfree(device); kref_put(&resource->kref, drbd_destroy_resource); } @@ -2236,7 +2234,7 @@ static void do_retry(struct work_struct *ws) list_for_each_entry_safe(req, tmp, &writes, tl_requests) { struct drbd_device *device = req->device; struct bio *bio = req->master_bio; - unsigned long start_time = req->start_time; + unsigned long start_jif = req->start_jif; bool expected; expected = @@ -2271,10 +2269,12 @@ static void do_retry(struct work_struct *ws) /* We are not just doing generic_make_request(), * as we want to keep the start_time information. */ inc_ap_bio(device); - __drbd_make_request(device, bio, start_time); + __drbd_make_request(device, bio, start_jif); } } +/* called via drbd_req_put_completion_ref(), + * holds resource->req_lock */ void drbd_restart_request(struct drbd_request *req) { unsigned long flags; @@ -2298,6 +2298,7 @@ void drbd_destroy_resource(struct kref *kref) idr_destroy(&resource->devices); free_cpumask_var(resource->cpu_mask); kfree(resource->name); + memset(resource, 0xf2, sizeof(*resource)); kfree(resource); } @@ -2307,8 +2308,10 @@ void drbd_free_resource(struct drbd_resource *resource) for_each_connection_safe(connection, tmp, resource) { list_del(&connection->connections); + drbd_debugfs_connection_cleanup(connection); kref_put(&connection->kref, drbd_destroy_connection); } + drbd_debugfs_resource_cleanup(resource); kref_put(&resource->kref, drbd_destroy_resource); } @@ -2318,8 +2321,6 @@ static void drbd_cleanup(void) struct drbd_device *device; struct drbd_resource *resource, *tmp; - unregister_reboot_notifier(&drbd_notifier); - /* first remove proc, * drbdsetup uses it's presence to detect * whether DRBD is loaded. @@ -2335,6 +2336,7 @@ static void drbd_cleanup(void) destroy_workqueue(retry.wq); drbd_genl_unregister(); + drbd_debugfs_cleanup(); idr_for_each_entry(&drbd_devices, device, i) drbd_delete_device(device); @@ -2350,7 +2352,7 @@ static void drbd_cleanup(void) idr_destroy(&drbd_devices); - printk(KERN_INFO "drbd: module cleanup done.\n"); + pr_info("module cleanup done.\n"); } /** @@ -2539,6 +2541,20 @@ int set_resource_options(struct drbd_resource *resource, struct res_opts *res_op if (nr_cpu_ids > 1 && res_opts->cpu_mask[0] != 0) { err = bitmap_parse(res_opts->cpu_mask, DRBD_CPU_MASK_SIZE, cpumask_bits(new_cpu_mask), nr_cpu_ids); + if (err == -EOVERFLOW) { + /* So what. mask it out. */ + cpumask_var_t tmp_cpu_mask; + if (zalloc_cpumask_var(&tmp_cpu_mask, GFP_KERNEL)) { + cpumask_setall(tmp_cpu_mask); + cpumask_and(new_cpu_mask, new_cpu_mask, tmp_cpu_mask); + drbd_warn(resource, "Overflow in bitmap_parse(%.12s%s), truncating to %u bits\n", + res_opts->cpu_mask, + strlen(res_opts->cpu_mask) > 12 ? "..." : "", + nr_cpu_ids); + free_cpumask_var(tmp_cpu_mask); + err = 0; + } + } if (err) { drbd_warn(resource, "bitmap_parse() failed with %d\n", err); /* retcode = ERR_CPU_MASK_PARSE; */ @@ -2579,10 +2595,12 @@ struct drbd_resource *drbd_create_resource(const char *name) kref_init(&resource->kref); idr_init(&resource->devices); INIT_LIST_HEAD(&resource->connections); + resource->write_ordering = WO_bdev_flush; list_add_tail_rcu(&resource->resources, &drbd_resources); mutex_init(&resource->conf_update); mutex_init(&resource->adm_mutex); spin_lock_init(&resource->req_lock); + drbd_debugfs_resource_add(resource); return resource; fail_free_name: @@ -2593,7 +2611,7 @@ fail: return NULL; } -/* caller must be under genl_lock() */ +/* caller must be under adm_mutex */ struct drbd_connection *conn_create(const char *name, struct res_opts *res_opts) { struct drbd_resource *resource; @@ -2617,7 +2635,6 @@ struct drbd_connection *conn_create(const char *name, struct res_opts *res_opts) INIT_LIST_HEAD(&connection->current_epoch->list); connection->epochs = 1; spin_lock_init(&connection->epoch_lock); - connection->write_ordering = WO_bdev_flush; connection->send.seen_any_write_yet = false; connection->send.current_epoch_nr = 0; @@ -2652,6 +2669,7 @@ struct drbd_connection *conn_create(const char *name, struct res_opts *res_opts) kref_get(&resource->kref); list_add_tail_rcu(&connection->connections, &resource->connections); + drbd_debugfs_connection_add(connection); return connection; fail_resource: @@ -2680,6 +2698,7 @@ void drbd_destroy_connection(struct kref *kref) drbd_free_socket(&connection->data); kfree(connection->int_dig_in); kfree(connection->int_dig_vv); + memset(connection, 0xfc, sizeof(*connection)); kfree(connection); kref_put(&resource->kref, drbd_destroy_resource); } @@ -2694,7 +2713,6 @@ static int init_submitter(struct drbd_device *device) return -ENOMEM; INIT_WORK(&device->submit.worker, do_submit); - spin_lock_init(&device->submit.lock); INIT_LIST_HEAD(&device->submit.writes); return 0; } @@ -2764,8 +2782,8 @@ enum drbd_ret_code drbd_create_device(struct drbd_config_context *adm_ctx, unsig blk_queue_merge_bvec(q, drbd_merge_bvec); q->queue_lock = &resource->req_lock; - device->md_io_page = alloc_page(GFP_KERNEL); - if (!device->md_io_page) + device->md_io.page = alloc_page(GFP_KERNEL); + if (!device->md_io.page) goto out_no_io_page; if (drbd_bm_init(device)) @@ -2794,6 +2812,7 @@ enum drbd_ret_code drbd_create_device(struct drbd_config_context *adm_ctx, unsig kref_get(&device->kref); INIT_LIST_HEAD(&device->peer_devices); + INIT_LIST_HEAD(&device->pending_bitmap_io); for_each_connection(connection, resource) { peer_device = kzalloc(sizeof(struct drbd_peer_device), GFP_KERNEL); if (!peer_device) @@ -2829,7 +2848,10 @@ enum drbd_ret_code drbd_create_device(struct drbd_config_context *adm_ctx, unsig for_each_peer_device(peer_device, device) drbd_connected(peer_device); } - + /* move to create_peer_device() */ + for_each_peer_device(peer_device, device) + drbd_debugfs_peer_device_add(peer_device); + drbd_debugfs_device_add(device); return NO_ERROR; out_idr_remove_vol: @@ -2853,7 +2875,7 @@ out_idr_remove_minor: out_no_minor_idr: drbd_bm_cleanup(device); out_no_bitmap: - __free_page(device->md_io_page); + __free_page(device->md_io.page); out_no_io_page: put_disk(disk); out_no_disk: @@ -2868,8 +2890,13 @@ void drbd_delete_device(struct drbd_device *device) { struct drbd_resource *resource = device->resource; struct drbd_connection *connection; + struct drbd_peer_device *peer_device; int refs = 3; + /* move to free_peer_device() */ + for_each_peer_device(peer_device, device) + drbd_debugfs_peer_device_cleanup(peer_device); + drbd_debugfs_device_cleanup(device); for_each_connection(connection, resource) { idr_remove(&connection->peer_devices, device->vnr); refs++; @@ -2881,13 +2908,12 @@ void drbd_delete_device(struct drbd_device *device) kref_sub(&device->kref, refs, drbd_destroy_device); } -int __init drbd_init(void) +static int __init drbd_init(void) { int err; if (minor_count < DRBD_MINOR_COUNT_MIN || minor_count > DRBD_MINOR_COUNT_MAX) { - printk(KERN_ERR - "drbd: invalid minor_count (%d)\n", minor_count); + pr_err("invalid minor_count (%d)\n", minor_count); #ifdef MODULE return -EINVAL; #else @@ -2897,14 +2923,11 @@ int __init drbd_init(void) err = register_blkdev(DRBD_MAJOR, "drbd"); if (err) { - printk(KERN_ERR - "drbd: unable to register block device major %d\n", + pr_err("unable to register block device major %d\n", DRBD_MAJOR); return err; } - register_reboot_notifier(&drbd_notifier); - /* * allocate all necessary structs */ @@ -2918,7 +2941,7 @@ int __init drbd_init(void) err = drbd_genl_register(); if (err) { - printk(KERN_ERR "drbd: unable to register generic netlink family\n"); + pr_err("unable to register generic netlink family\n"); goto fail; } @@ -2929,38 +2952,39 @@ int __init drbd_init(void) err = -ENOMEM; drbd_proc = proc_create_data("drbd", S_IFREG | S_IRUGO , NULL, &drbd_proc_fops, NULL); if (!drbd_proc) { - printk(KERN_ERR "drbd: unable to register proc file\n"); + pr_err("unable to register proc file\n"); goto fail; } retry.wq = create_singlethread_workqueue("drbd-reissue"); if (!retry.wq) { - printk(KERN_ERR "drbd: unable to create retry workqueue\n"); + pr_err("unable to create retry workqueue\n"); goto fail; } INIT_WORK(&retry.worker, do_retry); spin_lock_init(&retry.lock); INIT_LIST_HEAD(&retry.writes); - printk(KERN_INFO "drbd: initialized. " + if (drbd_debugfs_init()) + pr_notice("failed to initialize debugfs -- will not be available\n"); + + pr_info("initialized. " "Version: " REL_VERSION " (api:%d/proto:%d-%d)\n", API_VERSION, PRO_VERSION_MIN, PRO_VERSION_MAX); - printk(KERN_INFO "drbd: %s\n", drbd_buildtag()); - printk(KERN_INFO "drbd: registered as block device major %d\n", - DRBD_MAJOR); - + pr_info("%s\n", drbd_buildtag()); + pr_info("registered as block device major %d\n", DRBD_MAJOR); return 0; /* Success! */ fail: drbd_cleanup(); if (err == -ENOMEM) - printk(KERN_ERR "drbd: ran out of memory\n"); + pr_err("ran out of memory\n"); else - printk(KERN_ERR "drbd: initialization failure\n"); + pr_err("initialization failure\n"); return err; } -void drbd_free_bc(struct drbd_backing_dev *ldev) +void drbd_free_ldev(struct drbd_backing_dev *ldev) { if (ldev == NULL) return; @@ -2972,24 +2996,29 @@ void drbd_free_bc(struct drbd_backing_dev *ldev) kfree(ldev); } -void drbd_free_sock(struct drbd_connection *connection) +static void drbd_free_one_sock(struct drbd_socket *ds) { - if (connection->data.socket) { - mutex_lock(&connection->data.mutex); - kernel_sock_shutdown(connection->data.socket, SHUT_RDWR); - sock_release(connection->data.socket); - connection->data.socket = NULL; - mutex_unlock(&connection->data.mutex); - } - if (connection->meta.socket) { - mutex_lock(&connection->meta.mutex); - kernel_sock_shutdown(connection->meta.socket, SHUT_RDWR); - sock_release(connection->meta.socket); - connection->meta.socket = NULL; - mutex_unlock(&connection->meta.mutex); + struct socket *s; + mutex_lock(&ds->mutex); + s = ds->socket; + ds->socket = NULL; + mutex_unlock(&ds->mutex); + if (s) { + /* so debugfs does not need to mutex_lock() */ + synchronize_rcu(); + kernel_sock_shutdown(s, SHUT_RDWR); + sock_release(s); } } +void drbd_free_sock(struct drbd_connection *connection) +{ + if (connection->data.socket) + drbd_free_one_sock(&connection->data); + if (connection->meta.socket) + drbd_free_one_sock(&connection->meta); +} + /* meta data management */ void conn_md_sync(struct drbd_connection *connection) @@ -3093,7 +3122,7 @@ void drbd_md_sync(struct drbd_device *device) if (!get_ldev_if_state(device, D_FAILED)) return; - buffer = drbd_md_get_buffer(device); + buffer = drbd_md_get_buffer(device, __func__); if (!buffer) goto out; @@ -3253,7 +3282,7 @@ int drbd_md_read(struct drbd_device *device, struct drbd_backing_dev *bdev) if (device->state.disk != D_DISKLESS) return ERR_DISK_CONFIGURED; - buffer = drbd_md_get_buffer(device); + buffer = drbd_md_get_buffer(device, __func__); if (!buffer) return ERR_NOMEM; @@ -3466,23 +3495,19 @@ void drbd_uuid_set_bm(struct drbd_device *device, u64 val) __must_hold(local) * * Sets all bits in the bitmap and writes the whole bitmap to stable storage. */ -int drbd_bmio_set_n_write(struct drbd_device *device) +int drbd_bmio_set_n_write(struct drbd_device *device) __must_hold(local) { int rv = -EIO; - if (get_ldev_if_state(device, D_ATTACHING)) { - drbd_md_set_flag(device, MDF_FULL_SYNC); - drbd_md_sync(device); - drbd_bm_set_all(device); - - rv = drbd_bm_write(device); + drbd_md_set_flag(device, MDF_FULL_SYNC); + drbd_md_sync(device); + drbd_bm_set_all(device); - if (!rv) { - drbd_md_clear_flag(device, MDF_FULL_SYNC); - drbd_md_sync(device); - } + rv = drbd_bm_write(device); - put_ldev(device); + if (!rv) { + drbd_md_clear_flag(device, MDF_FULL_SYNC); + drbd_md_sync(device); } return rv; @@ -3494,18 +3519,11 @@ int drbd_bmio_set_n_write(struct drbd_device *device) * * Clears all bits in the bitmap and writes the whole bitmap to stable storage. */ -int drbd_bmio_clear_n_write(struct drbd_device *device) +int drbd_bmio_clear_n_write(struct drbd_device *device) __must_hold(local) { - int rv = -EIO; - drbd_resume_al(device); - if (get_ldev_if_state(device, D_ATTACHING)) { - drbd_bm_clear_all(device); - rv = drbd_bm_write(device); - put_ldev(device); - } - - return rv; + drbd_bm_clear_all(device); + return drbd_bm_write(device); } static int w_bitmap_io(struct drbd_work *w, int unused) @@ -3537,61 +3555,6 @@ static int w_bitmap_io(struct drbd_work *w, int unused) return 0; } -void drbd_ldev_destroy(struct drbd_device *device) -{ - lc_destroy(device->resync); - device->resync = NULL; - lc_destroy(device->act_log); - device->act_log = NULL; - __no_warn(local, - drbd_free_bc(device->ldev); - device->ldev = NULL;); - - clear_bit(GO_DISKLESS, &device->flags); -} - -static int w_go_diskless(struct drbd_work *w, int unused) -{ - struct drbd_device *device = - container_of(w, struct drbd_device, go_diskless); - - D_ASSERT(device, device->state.disk == D_FAILED); - /* we cannot assert local_cnt == 0 here, as get_ldev_if_state will - * inc/dec it frequently. Once we are D_DISKLESS, no one will touch - * the protected members anymore, though, so once put_ldev reaches zero - * again, it will be safe to free them. */ - - /* Try to write changed bitmap pages, read errors may have just - * set some bits outside the area covered by the activity log. - * - * If we have an IO error during the bitmap writeout, - * we will want a full sync next time, just in case. - * (Do we want a specific meta data flag for this?) - * - * If that does not make it to stable storage either, - * we cannot do anything about that anymore. - * - * We still need to check if both bitmap and ldev are present, we may - * end up here after a failed attach, before ldev was even assigned. - */ - if (device->bitmap && device->ldev) { - /* An interrupted resync or similar is allowed to recounts bits - * while we detach. - * Any modifications would not be expected anymore, though. - */ - if (drbd_bitmap_io_from_worker(device, drbd_bm_write, - "detach", BM_LOCKED_TEST_ALLOWED)) { - if (test_bit(WAS_READ_ERROR, &device->flags)) { - drbd_md_set_flag(device, MDF_FULL_SYNC); - drbd_md_sync(device); - } - } - } - - drbd_force_state(device, NS(disk, D_DISKLESS)); - return 0; -} - /** * drbd_queue_bitmap_io() - Queues an IO operation on the whole bitmap * @device: DRBD device. @@ -3603,6 +3566,9 @@ static int w_go_diskless(struct drbd_work *w, int unused) * that drbd_set_out_of_sync() can not be called. This function MAY ONLY be * called from worker context. It MUST NOT be used while a previous such * work is still pending! + * + * Its worker function encloses the call of io_fn() by get_ldev() and + * put_ldev(). */ void drbd_queue_bitmap_io(struct drbd_device *device, int (*io_fn)(struct drbd_device *), @@ -3685,25 +3651,7 @@ int drbd_md_test_flag(struct drbd_backing_dev *bdev, int flag) static void md_sync_timer_fn(unsigned long data) { struct drbd_device *device = (struct drbd_device *) data; - - /* must not double-queue! */ - if (list_empty(&device->md_sync_work.list)) - drbd_queue_work_front(&first_peer_device(device)->connection->sender_work, - &device->md_sync_work); -} - -static int w_md_sync(struct drbd_work *w, int unused) -{ - struct drbd_device *device = - container_of(w, struct drbd_device, md_sync_work); - - drbd_warn(device, "md_sync_timer expired! Worker calls drbd_md_sync().\n"); -#ifdef DEBUG - drbd_warn(device, "last md_mark_dirty: %s:%u\n", - device->last_md_mark_dirty.func, device->last_md_mark_dirty.line); -#endif - drbd_md_sync(device); - return 0; + drbd_device_post_work(device, MD_SYNC); } const char *cmdname(enum drbd_packet cmd) diff --git a/drivers/block/drbd/drbd_nl.c b/drivers/block/drbd/drbd_nl.c index 3f2e16738080..1cd47df44bda 100644 --- a/drivers/block/drbd/drbd_nl.c +++ b/drivers/block/drbd/drbd_nl.c @@ -23,6 +23,8 @@ */ +#define pr_fmt(fmt) KBUILD_MODNAME ": " fmt + #include <linux/module.h> #include <linux/drbd.h> #include <linux/in.h> @@ -85,7 +87,7 @@ static void drbd_adm_send_reply(struct sk_buff *skb, struct genl_info *info) { genlmsg_end(skb, genlmsg_data(nlmsg_data(nlmsg_hdr(skb)))); if (genlmsg_reply(skb, info)) - printk(KERN_ERR "drbd: error sending genl reply\n"); + pr_err("error sending genl reply\n"); } /* Used on a fresh "drbd_adm_prepare"d reply_skb, this cannot fail: The only @@ -558,8 +560,10 @@ void conn_try_outdate_peer_async(struct drbd_connection *connection) } enum drbd_state_rv -drbd_set_role(struct drbd_device *device, enum drbd_role new_role, int force) +drbd_set_role(struct drbd_device *const device, enum drbd_role new_role, int force) { + struct drbd_peer_device *const peer_device = first_peer_device(device); + struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL; const int max_tries = 4; enum drbd_state_rv rv = SS_UNKNOWN_ERROR; struct net_conf *nc; @@ -607,7 +611,7 @@ drbd_set_role(struct drbd_device *device, enum drbd_role new_role, int force) device->state.disk == D_CONSISTENT && mask.pdsk == 0) { D_ASSERT(device, device->state.pdsk == D_UNKNOWN); - if (conn_try_outdate_peer(first_peer_device(device)->connection)) { + if (conn_try_outdate_peer(connection)) { val.disk = D_UP_TO_DATE; mask.disk = D_MASK; } @@ -617,7 +621,7 @@ drbd_set_role(struct drbd_device *device, enum drbd_role new_role, int force) if (rv == SS_NOTHING_TO_DO) goto out; if (rv == SS_PRIMARY_NOP && mask.pdsk == 0) { - if (!conn_try_outdate_peer(first_peer_device(device)->connection) && force) { + if (!conn_try_outdate_peer(connection) && force) { drbd_warn(device, "Forced into split brain situation!\n"); mask.pdsk = D_MASK; val.pdsk = D_OUTDATED; @@ -630,7 +634,7 @@ drbd_set_role(struct drbd_device *device, enum drbd_role new_role, int force) retry at most once more in this case. */ int timeo; rcu_read_lock(); - nc = rcu_dereference(first_peer_device(device)->connection->net_conf); + nc = rcu_dereference(connection->net_conf); timeo = nc ? (nc->ping_timeo + 1) * HZ / 10 : 1; rcu_read_unlock(); schedule_timeout_interruptible(timeo); @@ -659,19 +663,17 @@ drbd_set_role(struct drbd_device *device, enum drbd_role new_role, int force) /* FIXME also wait for all pending P_BARRIER_ACK? */ if (new_role == R_SECONDARY) { - set_disk_ro(device->vdisk, true); if (get_ldev(device)) { device->ldev->md.uuid[UI_CURRENT] &= ~(u64)1; put_ldev(device); } } else { - /* Called from drbd_adm_set_role only. - * We are still holding the conf_update mutex. */ - nc = first_peer_device(device)->connection->net_conf; + mutex_lock(&device->resource->conf_update); + nc = connection->net_conf; if (nc) nc->discard_my_data = 0; /* without copy; single bit op is atomic */ + mutex_unlock(&device->resource->conf_update); - set_disk_ro(device->vdisk, false); if (get_ldev(device)) { if (((device->state.conn < C_CONNECTED || device->state.pdsk <= D_FAILED) @@ -689,12 +691,12 @@ drbd_set_role(struct drbd_device *device, enum drbd_role new_role, int force) if (device->state.conn >= C_WF_REPORT_PARAMS) { /* if this was forced, we should consider sync */ if (forced) - drbd_send_uuids(first_peer_device(device)); - drbd_send_current_state(first_peer_device(device)); + drbd_send_uuids(peer_device); + drbd_send_current_state(peer_device); } drbd_md_sync(device); - + set_disk_ro(device->vdisk, new_role == R_SECONDARY); kobject_uevent(&disk_to_dev(device->vdisk)->kobj, KOBJ_CHANGE); out: mutex_unlock(device->state_mutex); @@ -891,7 +893,7 @@ drbd_determine_dev_size(struct drbd_device *device, enum dds_flags flags, struct * still lock the act_log to not trigger ASSERTs there. */ drbd_suspend_io(device); - buffer = drbd_md_get_buffer(device); /* Lock meta-data IO */ + buffer = drbd_md_get_buffer(device, __func__); /* Lock meta-data IO */ if (!buffer) { drbd_resume_io(device); return DS_ERROR; @@ -971,6 +973,10 @@ drbd_determine_dev_size(struct drbd_device *device, enum dds_flags flags, struct if (la_size_changed || md_moved || rs) { u32 prev_flags; + /* We do some synchronous IO below, which may take some time. + * Clear the timer, to avoid scary "timer expired!" messages, + * "Superblock" is written out at least twice below, anyways. */ + del_timer(&device->md_sync_timer); drbd_al_shrink(device); /* All extents inactive. */ prev_flags = md->flags; @@ -1116,15 +1122,16 @@ static int drbd_check_al_size(struct drbd_device *device, struct disk_conf *dc) return 0; } -static void drbd_setup_queue_param(struct drbd_device *device, unsigned int max_bio_size) +static void drbd_setup_queue_param(struct drbd_device *device, struct drbd_backing_dev *bdev, + unsigned int max_bio_size) { struct request_queue * const q = device->rq_queue; unsigned int max_hw_sectors = max_bio_size >> 9; unsigned int max_segments = 0; struct request_queue *b = NULL; - if (get_ldev_if_state(device, D_ATTACHING)) { - b = device->ldev->backing_bdev->bd_disk->queue; + if (bdev) { + b = bdev->backing_bdev->bd_disk->queue; max_hw_sectors = min(queue_max_hw_sectors(b), max_bio_size >> 9); rcu_read_lock(); @@ -1169,11 +1176,10 @@ static void drbd_setup_queue_param(struct drbd_device *device, unsigned int max_ b->backing_dev_info.ra_pages); q->backing_dev_info.ra_pages = b->backing_dev_info.ra_pages; } - put_ldev(device); } } -void drbd_reconsider_max_bio_size(struct drbd_device *device) +void drbd_reconsider_max_bio_size(struct drbd_device *device, struct drbd_backing_dev *bdev) { unsigned int now, new, local, peer; @@ -1181,10 +1187,9 @@ void drbd_reconsider_max_bio_size(struct drbd_device *device) local = device->local_max_bio_size; /* Eventually last known value, from volatile memory */ peer = device->peer_max_bio_size; /* Eventually last known value, from meta data */ - if (get_ldev_if_state(device, D_ATTACHING)) { - local = queue_max_hw_sectors(device->ldev->backing_bdev->bd_disk->queue) << 9; + if (bdev) { + local = queue_max_hw_sectors(bdev->backing_bdev->bd_disk->queue) << 9; device->local_max_bio_size = local; - put_ldev(device); } local = min(local, DRBD_MAX_BIO_SIZE); @@ -1217,7 +1222,7 @@ void drbd_reconsider_max_bio_size(struct drbd_device *device) if (new != now) drbd_info(device, "max BIO size = %u\n", new); - drbd_setup_queue_param(device, new); + drbd_setup_queue_param(device, bdev, new); } /* Starts the worker thread */ @@ -1299,6 +1304,13 @@ static unsigned int drbd_al_extents_max(struct drbd_backing_dev *bdev) return (al_size_4k - 1) * AL_CONTEXT_PER_TRANSACTION; } +static bool write_ordering_changed(struct disk_conf *a, struct disk_conf *b) +{ + return a->disk_barrier != b->disk_barrier || + a->disk_flushes != b->disk_flushes || + a->disk_drain != b->disk_drain; +} + int drbd_adm_disk_opts(struct sk_buff *skb, struct genl_info *info) { struct drbd_config_context adm_ctx; @@ -1405,7 +1417,8 @@ int drbd_adm_disk_opts(struct sk_buff *skb, struct genl_info *info) else set_bit(MD_NO_FUA, &device->flags); - drbd_bump_write_ordering(first_peer_device(device)->connection, WO_bdev_flush); + if (write_ordering_changed(old_disk_conf, new_disk_conf)) + drbd_bump_write_ordering(device->resource, NULL, WO_bdev_flush); drbd_md_sync(device); @@ -1440,6 +1453,8 @@ int drbd_adm_attach(struct sk_buff *skb, struct genl_info *info) { struct drbd_config_context adm_ctx; struct drbd_device *device; + struct drbd_peer_device *peer_device; + struct drbd_connection *connection; int err; enum drbd_ret_code retcode; enum determine_dev_size dd; @@ -1462,7 +1477,9 @@ int drbd_adm_attach(struct sk_buff *skb, struct genl_info *info) device = adm_ctx.device; mutex_lock(&adm_ctx.resource->adm_mutex); - conn_reconfig_start(first_peer_device(device)->connection); + peer_device = first_peer_device(device); + connection = peer_device ? peer_device->connection : NULL; + conn_reconfig_start(connection); /* if you want to reconfigure, please tear down first */ if (device->state.disk > D_DISKLESS) { @@ -1473,7 +1490,7 @@ int drbd_adm_attach(struct sk_buff *skb, struct genl_info *info) * drbd_ldev_destroy is done already, we may end up here very fast, * e.g. if someone calls attach from the on-io-error handler, * to realize a "hot spare" feature (not that I'd recommend that) */ - wait_event(device->misc_wait, !atomic_read(&device->local_cnt)); + wait_event(device->misc_wait, !test_bit(GOING_DISKLESS, &device->flags)); /* make sure there is no leftover from previous force-detach attempts */ clear_bit(FORCE_DETACH, &device->flags); @@ -1529,7 +1546,7 @@ int drbd_adm_attach(struct sk_buff *skb, struct genl_info *info) goto fail; rcu_read_lock(); - nc = rcu_dereference(first_peer_device(device)->connection->net_conf); + nc = rcu_dereference(connection->net_conf); if (nc) { if (new_disk_conf->fencing == FP_STONITH && nc->wire_protocol == DRBD_PROT_A) { rcu_read_unlock(); @@ -1649,7 +1666,7 @@ int drbd_adm_attach(struct sk_buff *skb, struct genl_info *info) */ wait_event(device->misc_wait, !atomic_read(&device->ap_pending_cnt) || drbd_suspended(device)); /* and for any other previously queued work */ - drbd_flush_workqueue(&first_peer_device(device)->connection->sender_work); + drbd_flush_workqueue(&connection->sender_work); rv = _drbd_request_state(device, NS(disk, D_ATTACHING), CS_VERBOSE); retcode = rv; /* FIXME: Type mismatch. */ @@ -1710,7 +1727,7 @@ int drbd_adm_attach(struct sk_buff *skb, struct genl_info *info) new_disk_conf = NULL; new_plan = NULL; - drbd_bump_write_ordering(first_peer_device(device)->connection, WO_bdev_flush); + drbd_bump_write_ordering(device->resource, device->ldev, WO_bdev_flush); if (drbd_md_test_flag(device->ldev, MDF_CRASHED_PRIMARY)) set_bit(CRASHED_PRIMARY, &device->flags); @@ -1726,7 +1743,7 @@ int drbd_adm_attach(struct sk_buff *skb, struct genl_info *info) device->read_cnt = 0; device->writ_cnt = 0; - drbd_reconsider_max_bio_size(device); + drbd_reconsider_max_bio_size(device, device->ldev); /* If I am currently not R_PRIMARY, * but meta data primary indicator is set, @@ -1845,7 +1862,7 @@ int drbd_adm_attach(struct sk_buff *skb, struct genl_info *info) kobject_uevent(&disk_to_dev(device->vdisk)->kobj, KOBJ_CHANGE); put_ldev(device); - conn_reconfig_done(first_peer_device(device)->connection); + conn_reconfig_done(connection); mutex_unlock(&adm_ctx.resource->adm_mutex); drbd_adm_finish(&adm_ctx, info, retcode); return 0; @@ -1856,7 +1873,7 @@ int drbd_adm_attach(struct sk_buff *skb, struct genl_info *info) drbd_force_state(device, NS(disk, D_DISKLESS)); drbd_md_sync(device); fail: - conn_reconfig_done(first_peer_device(device)->connection); + conn_reconfig_done(connection); if (nbc) { if (nbc->backing_bdev) blkdev_put(nbc->backing_bdev, @@ -1888,7 +1905,7 @@ static int adm_detach(struct drbd_device *device, int force) } drbd_suspend_io(device); /* so no-one is stuck in drbd_al_begin_io */ - drbd_md_get_buffer(device); /* make sure there is no in-flight meta-data IO */ + drbd_md_get_buffer(device, __func__); /* make sure there is no in-flight meta-data IO */ retcode = drbd_request_state(device, NS(disk, D_FAILED)); drbd_md_put_buffer(device); /* D_FAILED will transition to DISKLESS. */ @@ -2654,8 +2671,13 @@ int drbd_adm_invalidate(struct sk_buff *skb, struct genl_info *info) if (retcode != NO_ERROR) goto out; - mutex_lock(&adm_ctx.resource->adm_mutex); device = adm_ctx.device; + if (!get_ldev(device)) { + retcode = ERR_NO_DISK; + goto out; + } + + mutex_lock(&adm_ctx.resource->adm_mutex); /* If there is still bitmap IO pending, probably because of a previous * resync just being finished, wait for it before requesting a new resync. @@ -2679,6 +2701,7 @@ int drbd_adm_invalidate(struct sk_buff *skb, struct genl_info *info) retcode = drbd_request_state(device, NS(conn, C_STARTING_SYNC_T)); drbd_resume_io(device); mutex_unlock(&adm_ctx.resource->adm_mutex); + put_ldev(device); out: drbd_adm_finish(&adm_ctx, info, retcode); return 0; @@ -2704,7 +2727,7 @@ out: return 0; } -static int drbd_bmio_set_susp_al(struct drbd_device *device) +static int drbd_bmio_set_susp_al(struct drbd_device *device) __must_hold(local) { int rv; @@ -2725,8 +2748,13 @@ int drbd_adm_invalidate_peer(struct sk_buff *skb, struct genl_info *info) if (retcode != NO_ERROR) goto out; - mutex_lock(&adm_ctx.resource->adm_mutex); device = adm_ctx.device; + if (!get_ldev(device)) { + retcode = ERR_NO_DISK; + goto out; + } + + mutex_lock(&adm_ctx.resource->adm_mutex); /* If there is still bitmap IO pending, probably because of a previous * resync just being finished, wait for it before requesting a new resync. @@ -2753,6 +2781,7 @@ int drbd_adm_invalidate_peer(struct sk_buff *skb, struct genl_info *info) retcode = drbd_request_state(device, NS(conn, C_STARTING_SYNC_S)); drbd_resume_io(device); mutex_unlock(&adm_ctx.resource->adm_mutex); + put_ldev(device); out: drbd_adm_finish(&adm_ctx, info, retcode); return 0; @@ -2892,7 +2921,7 @@ static struct drbd_connection *the_only_connection(struct drbd_resource *resourc return list_first_entry(&resource->connections, struct drbd_connection, connections); } -int nla_put_status_info(struct sk_buff *skb, struct drbd_device *device, +static int nla_put_status_info(struct sk_buff *skb, struct drbd_device *device, const struct sib_info *sib) { struct drbd_resource *resource = device->resource; @@ -3622,13 +3651,6 @@ void drbd_bcast_event(struct drbd_device *device, const struct sib_info *sib) unsigned seq; int err = -ENOMEM; - if (sib->sib_reason == SIB_SYNC_PROGRESS) { - if (time_after(jiffies, device->rs_last_bcast + HZ)) - device->rs_last_bcast = jiffies; - else - return; - } - seq = atomic_inc_return(&drbd_genl_seq); msg = genlmsg_new(NLMSG_GOODSIZE, GFP_NOIO); if (!msg) diff --git a/drivers/block/drbd/drbd_proc.c b/drivers/block/drbd/drbd_proc.c index 89736bdbbc70..06e6147c7601 100644 --- a/drivers/block/drbd/drbd_proc.c +++ b/drivers/block/drbd/drbd_proc.c @@ -60,20 +60,65 @@ static void seq_printf_with_thousands_grouping(struct seq_file *seq, long v) seq_printf(seq, "%ld", v); } +static void drbd_get_syncer_progress(struct drbd_device *device, + union drbd_dev_state state, unsigned long *rs_total, + unsigned long *bits_left, unsigned int *per_mil_done) +{ + /* this is to break it at compile time when we change that, in case we + * want to support more than (1<<32) bits on a 32bit arch. */ + typecheck(unsigned long, device->rs_total); + *rs_total = device->rs_total; + + /* note: both rs_total and rs_left are in bits, i.e. in + * units of BM_BLOCK_SIZE. + * for the percentage, we don't care. */ + + if (state.conn == C_VERIFY_S || state.conn == C_VERIFY_T) + *bits_left = device->ov_left; + else + *bits_left = drbd_bm_total_weight(device) - device->rs_failed; + /* >> 10 to prevent overflow, + * +1 to prevent division by zero */ + if (*bits_left > *rs_total) { + /* D'oh. Maybe a logic bug somewhere. More likely just a race + * between state change and reset of rs_total. + */ + *bits_left = *rs_total; + *per_mil_done = *rs_total ? 0 : 1000; + } else { + /* Make sure the division happens in long context. + * We allow up to one petabyte storage right now, + * at a granularity of 4k per bit that is 2**38 bits. + * After shift right and multiplication by 1000, + * this should still fit easily into a 32bit long, + * so we don't need a 64bit division on 32bit arch. + * Note: currently we don't support such large bitmaps on 32bit + * arch anyways, but no harm done to be prepared for it here. + */ + unsigned int shift = *rs_total > UINT_MAX ? 16 : 10; + unsigned long left = *bits_left >> shift; + unsigned long total = 1UL + (*rs_total >> shift); + unsigned long tmp = 1000UL - left * 1000UL/total; + *per_mil_done = tmp; + } +} + + /*lge * progress bars shamelessly adapted from driver/md/md.c * output looks like * [=====>..............] 33.5% (23456/123456) * finish: 2:20:20 speed: 6,345 (6,456) K/sec */ -static void drbd_syncer_progress(struct drbd_device *device, struct seq_file *seq) +static void drbd_syncer_progress(struct drbd_device *device, struct seq_file *seq, + union drbd_dev_state state) { - unsigned long db, dt, dbdt, rt, rs_left; + unsigned long db, dt, dbdt, rt, rs_total, rs_left; unsigned int res; int i, x, y; int stalled = 0; - drbd_get_syncer_progress(device, &rs_left, &res); + drbd_get_syncer_progress(device, state, &rs_total, &rs_left, &res); x = res/50; y = 20-x; @@ -85,21 +130,21 @@ static void drbd_syncer_progress(struct drbd_device *device, struct seq_file *se seq_printf(seq, "."); seq_printf(seq, "] "); - if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T) + if (state.conn == C_VERIFY_S || state.conn == C_VERIFY_T) seq_printf(seq, "verified:"); else seq_printf(seq, "sync'ed:"); seq_printf(seq, "%3u.%u%% ", res / 10, res % 10); /* if more than a few GB, display in MB */ - if (device->rs_total > (4UL << (30 - BM_BLOCK_SHIFT))) + if (rs_total > (4UL << (30 - BM_BLOCK_SHIFT))) seq_printf(seq, "(%lu/%lu)M", (unsigned long) Bit2KB(rs_left >> 10), - (unsigned long) Bit2KB(device->rs_total >> 10)); + (unsigned long) Bit2KB(rs_total >> 10)); else seq_printf(seq, "(%lu/%lu)K\n\t", (unsigned long) Bit2KB(rs_left), - (unsigned long) Bit2KB(device->rs_total)); + (unsigned long) Bit2KB(rs_total)); /* see drivers/md/md.c * We do not want to overflow, so the order of operands and @@ -150,13 +195,13 @@ static void drbd_syncer_progress(struct drbd_device *device, struct seq_file *se dt = (jiffies - device->rs_start - device->rs_paused) / HZ; if (dt == 0) dt = 1; - db = device->rs_total - rs_left; + db = rs_total - rs_left; dbdt = Bit2KB(db/dt); seq_printf_with_thousands_grouping(seq, dbdt); seq_printf(seq, ")"); - if (device->state.conn == C_SYNC_TARGET || - device->state.conn == C_VERIFY_S) { + if (state.conn == C_SYNC_TARGET || + state.conn == C_VERIFY_S) { seq_printf(seq, " want: "); seq_printf_with_thousands_grouping(seq, device->c_sync_rate); } @@ -168,8 +213,8 @@ static void drbd_syncer_progress(struct drbd_device *device, struct seq_file *se unsigned long bm_bits = drbd_bm_bits(device); unsigned long bit_pos; unsigned long long stop_sector = 0; - if (device->state.conn == C_VERIFY_S || - device->state.conn == C_VERIFY_T) { + if (state.conn == C_VERIFY_S || + state.conn == C_VERIFY_T) { bit_pos = bm_bits - device->ov_left; if (verify_can_do_stop_sector(device)) stop_sector = device->ov_stop_sector; @@ -188,22 +233,13 @@ static void drbd_syncer_progress(struct drbd_device *device, struct seq_file *se } } -static void resync_dump_detail(struct seq_file *seq, struct lc_element *e) -{ - struct bm_extent *bme = lc_entry(e, struct bm_extent, lce); - - seq_printf(seq, "%5d %s %s\n", bme->rs_left, - bme->flags & BME_NO_WRITES ? "NO_WRITES" : "---------", - bme->flags & BME_LOCKED ? "LOCKED" : "------" - ); -} - static int drbd_seq_show(struct seq_file *seq, void *v) { int i, prev_i = -1; const char *sn; struct drbd_device *device; struct net_conf *nc; + union drbd_dev_state state; char wp; static char write_ordering_chars[] = { @@ -241,11 +277,12 @@ static int drbd_seq_show(struct seq_file *seq, void *v) seq_printf(seq, "\n"); prev_i = i; - sn = drbd_conn_str(device->state.conn); + state = device->state; + sn = drbd_conn_str(state.conn); - if (device->state.conn == C_STANDALONE && - device->state.disk == D_DISKLESS && - device->state.role == R_SECONDARY) { + if (state.conn == C_STANDALONE && + state.disk == D_DISKLESS && + state.role == R_SECONDARY) { seq_printf(seq, "%2d: cs:Unconfigured\n", i); } else { /* reset device->congestion_reason */ @@ -258,15 +295,15 @@ static int drbd_seq_show(struct seq_file *seq, void *v) " ns:%u nr:%u dw:%u dr:%u al:%u bm:%u " "lo:%d pe:%d ua:%d ap:%d ep:%d wo:%c", i, sn, - drbd_role_str(device->state.role), - drbd_role_str(device->state.peer), - drbd_disk_str(device->state.disk), - drbd_disk_str(device->state.pdsk), + drbd_role_str(state.role), + drbd_role_str(state.peer), + drbd_disk_str(state.disk), + drbd_disk_str(state.pdsk), wp, drbd_suspended(device) ? 's' : 'r', - device->state.aftr_isp ? 'a' : '-', - device->state.peer_isp ? 'p' : '-', - device->state.user_isp ? 'u' : '-', + state.aftr_isp ? 'a' : '-', + state.peer_isp ? 'p' : '-', + state.user_isp ? 'u' : '-', device->congestion_reason ?: '-', test_bit(AL_SUSPENDED, &device->flags) ? 's' : '-', device->send_cnt/2, @@ -281,17 +318,17 @@ static int drbd_seq_show(struct seq_file *seq, void *v) atomic_read(&device->unacked_cnt), atomic_read(&device->ap_bio_cnt), first_peer_device(device)->connection->epochs, - write_ordering_chars[first_peer_device(device)->connection->write_ordering] + write_ordering_chars[device->resource->write_ordering] ); seq_printf(seq, " oos:%llu\n", Bit2KB((unsigned long long) drbd_bm_total_weight(device))); } - if (device->state.conn == C_SYNC_SOURCE || - device->state.conn == C_SYNC_TARGET || - device->state.conn == C_VERIFY_S || - device->state.conn == C_VERIFY_T) - drbd_syncer_progress(device, seq); + if (state.conn == C_SYNC_SOURCE || + state.conn == C_SYNC_TARGET || + state.conn == C_VERIFY_S || + state.conn == C_VERIFY_T) + drbd_syncer_progress(device, seq, state); if (proc_details >= 1 && get_ldev_if_state(device, D_FAILED)) { lc_seq_printf_stats(seq, device->resync); @@ -299,12 +336,8 @@ static int drbd_seq_show(struct seq_file *seq, void *v) put_ldev(device); } - if (proc_details >= 2) { - if (device->resync) { - lc_seq_dump_details(seq, device->resync, "rs_left", - resync_dump_detail); - } - } + if (proc_details >= 2) + seq_printf(seq, "\tblocked on activity log: %d\n", atomic_read(&device->ap_actlog_cnt)); } rcu_read_unlock(); @@ -316,7 +349,7 @@ static int drbd_proc_open(struct inode *inode, struct file *file) int err; if (try_module_get(THIS_MODULE)) { - err = single_open(file, drbd_seq_show, PDE_DATA(inode)); + err = single_open(file, drbd_seq_show, NULL); if (err) module_put(THIS_MODULE); return err; diff --git a/drivers/block/drbd/drbd_receiver.c b/drivers/block/drbd/drbd_receiver.c index 5b17ec88ea05..9342b8da73ab 100644 --- a/drivers/block/drbd/drbd_receiver.c +++ b/drivers/block/drbd/drbd_receiver.c @@ -362,17 +362,14 @@ drbd_alloc_peer_req(struct drbd_peer_device *peer_device, u64 id, sector_t secto goto fail; } + memset(peer_req, 0, sizeof(*peer_req)); + INIT_LIST_HEAD(&peer_req->w.list); drbd_clear_interval(&peer_req->i); peer_req->i.size = data_size; peer_req->i.sector = sector; - peer_req->i.local = false; - peer_req->i.waiting = false; - - peer_req->epoch = NULL; + peer_req->submit_jif = jiffies; peer_req->peer_device = peer_device; peer_req->pages = page; - atomic_set(&peer_req->pending_bios, 0); - peer_req->flags = 0; /* * The block_id is opaque to the receiver. It is not endianness * converted, and sent back to the sender unchanged. @@ -389,11 +386,16 @@ drbd_alloc_peer_req(struct drbd_peer_device *peer_device, u64 id, sector_t secto void __drbd_free_peer_req(struct drbd_device *device, struct drbd_peer_request *peer_req, int is_net) { + might_sleep(); if (peer_req->flags & EE_HAS_DIGEST) kfree(peer_req->digest); drbd_free_pages(device, peer_req->pages, is_net); D_ASSERT(device, atomic_read(&peer_req->pending_bios) == 0); D_ASSERT(device, drbd_interval_empty(&peer_req->i)); + if (!expect(!(peer_req->flags & EE_CALL_AL_COMPLETE_IO))) { + peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO; + drbd_al_complete_io(device, &peer_req->i); + } mempool_free(peer_req, drbd_ee_mempool); } @@ -791,8 +793,18 @@ static int receive_first_packet(struct drbd_connection *connection, struct socke { unsigned int header_size = drbd_header_size(connection); struct packet_info pi; + struct net_conf *nc; int err; + rcu_read_lock(); + nc = rcu_dereference(connection->net_conf); + if (!nc) { + rcu_read_unlock(); + return -EIO; + } + sock->sk->sk_rcvtimeo = nc->ping_timeo * 4 * HZ / 10; + rcu_read_unlock(); + err = drbd_recv_short(sock, connection->data.rbuf, header_size, 0); if (err != header_size) { if (err >= 0) @@ -809,7 +821,7 @@ static int receive_first_packet(struct drbd_connection *connection, struct socke * drbd_socket_okay() - Free the socket if its connection is not okay * @sock: pointer to the pointer to the socket. */ -static int drbd_socket_okay(struct socket **sock) +static bool drbd_socket_okay(struct socket **sock) { int rr; char tb[4]; @@ -827,6 +839,30 @@ static int drbd_socket_okay(struct socket **sock) return false; } } + +static bool connection_established(struct drbd_connection *connection, + struct socket **sock1, + struct socket **sock2) +{ + struct net_conf *nc; + int timeout; + bool ok; + + if (!*sock1 || !*sock2) + return false; + + rcu_read_lock(); + nc = rcu_dereference(connection->net_conf); + timeout = (nc->sock_check_timeo ?: nc->ping_timeo) * HZ / 10; + rcu_read_unlock(); + schedule_timeout_interruptible(timeout); + + ok = drbd_socket_okay(sock1); + ok = drbd_socket_okay(sock2) && ok; + + return ok; +} + /* Gets called if a connection is established, or if a new minor gets created in a connection */ int drbd_connected(struct drbd_peer_device *peer_device) @@ -868,8 +904,8 @@ static int conn_connect(struct drbd_connection *connection) struct drbd_socket sock, msock; struct drbd_peer_device *peer_device; struct net_conf *nc; - int vnr, timeout, h, ok; - bool discard_my_data; + int vnr, timeout, h; + bool discard_my_data, ok; enum drbd_state_rv rv; struct accept_wait_data ad = { .connection = connection, @@ -913,17 +949,8 @@ static int conn_connect(struct drbd_connection *connection) } } - if (sock.socket && msock.socket) { - rcu_read_lock(); - nc = rcu_dereference(connection->net_conf); - timeout = nc->ping_timeo * HZ / 10; - rcu_read_unlock(); - schedule_timeout_interruptible(timeout); - ok = drbd_socket_okay(&sock.socket); - ok = drbd_socket_okay(&msock.socket) && ok; - if (ok) - break; - } + if (connection_established(connection, &sock.socket, &msock.socket)) + break; retry: s = drbd_wait_for_connect(connection, &ad); @@ -969,8 +996,7 @@ randomize: goto out_release_sockets; } - ok = drbd_socket_okay(&sock.socket); - ok = drbd_socket_okay(&msock.socket) && ok; + ok = connection_established(connection, &sock.socket, &msock.socket); } while (!ok); if (ad.s_listen) @@ -1151,7 +1177,7 @@ static void drbd_flush(struct drbd_connection *connection) struct drbd_peer_device *peer_device; int vnr; - if (connection->write_ordering >= WO_bdev_flush) { + if (connection->resource->write_ordering >= WO_bdev_flush) { rcu_read_lock(); idr_for_each_entry(&connection->peer_devices, peer_device, vnr) { struct drbd_device *device = peer_device->device; @@ -1161,14 +1187,22 @@ static void drbd_flush(struct drbd_connection *connection) kref_get(&device->kref); rcu_read_unlock(); + /* Right now, we have only this one synchronous code path + * for flushes between request epochs. + * We may want to make those asynchronous, + * or at least parallelize the flushes to the volume devices. + */ + device->flush_jif = jiffies; + set_bit(FLUSH_PENDING, &device->flags); rv = blkdev_issue_flush(device->ldev->backing_bdev, GFP_NOIO, NULL); + clear_bit(FLUSH_PENDING, &device->flags); if (rv) { drbd_info(device, "local disk flush failed with status %d\n", rv); /* would rather check on EOPNOTSUPP, but that is not reliable. * don't try again for ANY return value != 0 * if (rv == -EOPNOTSUPP) */ - drbd_bump_write_ordering(connection, WO_drain_io); + drbd_bump_write_ordering(connection->resource, NULL, WO_drain_io); } put_ldev(device); kref_put(&device->kref, drbd_destroy_device); @@ -1257,15 +1291,30 @@ static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *connectio return rv; } +static enum write_ordering_e +max_allowed_wo(struct drbd_backing_dev *bdev, enum write_ordering_e wo) +{ + struct disk_conf *dc; + + dc = rcu_dereference(bdev->disk_conf); + + if (wo == WO_bdev_flush && !dc->disk_flushes) + wo = WO_drain_io; + if (wo == WO_drain_io && !dc->disk_drain) + wo = WO_none; + + return wo; +} + /** * drbd_bump_write_ordering() - Fall back to an other write ordering method * @connection: DRBD connection. * @wo: Write ordering method to try. */ -void drbd_bump_write_ordering(struct drbd_connection *connection, enum write_ordering_e wo) +void drbd_bump_write_ordering(struct drbd_resource *resource, struct drbd_backing_dev *bdev, + enum write_ordering_e wo) { - struct disk_conf *dc; - struct drbd_peer_device *peer_device; + struct drbd_device *device; enum write_ordering_e pwo; int vnr; static char *write_ordering_str[] = { @@ -1274,26 +1323,27 @@ void drbd_bump_write_ordering(struct drbd_connection *connection, enum write_ord [WO_bdev_flush] = "flush", }; - pwo = connection->write_ordering; - wo = min(pwo, wo); + pwo = resource->write_ordering; + if (wo != WO_bdev_flush) + wo = min(pwo, wo); rcu_read_lock(); - idr_for_each_entry(&connection->peer_devices, peer_device, vnr) { - struct drbd_device *device = peer_device->device; + idr_for_each_entry(&resource->devices, device, vnr) { + if (get_ldev(device)) { + wo = max_allowed_wo(device->ldev, wo); + if (device->ldev == bdev) + bdev = NULL; + put_ldev(device); + } + } - if (!get_ldev_if_state(device, D_ATTACHING)) - continue; - dc = rcu_dereference(device->ldev->disk_conf); + if (bdev) + wo = max_allowed_wo(bdev, wo); - if (wo == WO_bdev_flush && !dc->disk_flushes) - wo = WO_drain_io; - if (wo == WO_drain_io && !dc->disk_drain) - wo = WO_none; - put_ldev(device); - } rcu_read_unlock(); - connection->write_ordering = wo; - if (pwo != connection->write_ordering || wo == WO_bdev_flush) - drbd_info(connection, "Method to ensure write ordering: %s\n", write_ordering_str[connection->write_ordering]); + + resource->write_ordering = wo; + if (pwo != resource->write_ordering || wo == WO_bdev_flush) + drbd_info(resource, "Method to ensure write ordering: %s\n", write_ordering_str[resource->write_ordering]); } /** @@ -1330,6 +1380,13 @@ int drbd_submit_peer_request(struct drbd_device *device, /* wait for all pending IO completions, before we start * zeroing things out. */ conn_wait_active_ee_empty(first_peer_device(device)->connection); + /* add it to the active list now, + * so we can find it to present it in debugfs */ + peer_req->submit_jif = jiffies; + peer_req->flags |= EE_SUBMITTED; + spin_lock_irq(&device->resource->req_lock); + list_add_tail(&peer_req->w.list, &device->active_ee); + spin_unlock_irq(&device->resource->req_lock); if (blkdev_issue_zeroout(device->ldev->backing_bdev, sector, ds >> 9, GFP_NOIO)) peer_req->flags |= EE_WAS_ERROR; @@ -1398,6 +1455,9 @@ submit: D_ASSERT(device, page == NULL); atomic_set(&peer_req->pending_bios, n_bios); + /* for debugfs: update timestamp, mark as submitted */ + peer_req->submit_jif = jiffies; + peer_req->flags |= EE_SUBMITTED; do { bio = bios; bios = bios->bi_next; @@ -1471,7 +1531,7 @@ static int receive_Barrier(struct drbd_connection *connection, struct packet_inf * R_PRIMARY crashes now. * Therefore we must send the barrier_ack after the barrier request was * completed. */ - switch (connection->write_ordering) { + switch (connection->resource->write_ordering) { case WO_none: if (rv == FE_RECYCLED) return 0; @@ -1498,7 +1558,8 @@ static int receive_Barrier(struct drbd_connection *connection, struct packet_inf return 0; default: - drbd_err(connection, "Strangeness in connection->write_ordering %d\n", connection->write_ordering); + drbd_err(connection, "Strangeness in connection->write_ordering %d\n", + connection->resource->write_ordering); return -EIO; } @@ -1531,7 +1592,7 @@ read_in_block(struct drbd_peer_device *peer_device, u64 id, sector_t sector, struct drbd_peer_request *peer_req; struct page *page; int dgs, ds, err; - int data_size = pi->size; + unsigned int data_size = pi->size; void *dig_in = peer_device->connection->int_dig_in; void *dig_vv = peer_device->connection->int_dig_vv; unsigned long *data; @@ -1578,6 +1639,7 @@ read_in_block(struct drbd_peer_device *peer_device, u64 id, sector_t sector, if (!peer_req) return NULL; + peer_req->flags |= EE_WRITE; if (trim) return peer_req; @@ -1734,9 +1796,10 @@ static int recv_resync_read(struct drbd_peer_device *peer_device, sector_t secto * respective _drbd_clear_done_ee */ peer_req->w.cb = e_end_resync_block; + peer_req->submit_jif = jiffies; spin_lock_irq(&device->resource->req_lock); - list_add(&peer_req->w.list, &device->sync_ee); + list_add_tail(&peer_req->w.list, &device->sync_ee); spin_unlock_irq(&device->resource->req_lock); atomic_add(pi->size >> 9, &device->rs_sect_ev); @@ -1889,6 +1952,7 @@ static int e_end_block(struct drbd_work *w, int cancel) } dec_unacked(device); } + /* we delete from the conflict detection hash _after_ we sent out the * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right. */ if (peer_req->flags & EE_IN_INTERVAL_TREE) { @@ -2115,6 +2179,8 @@ static int handle_write_conflicts(struct drbd_device *device, drbd_for_each_overlap(i, &device->write_requests, sector, size) { if (i == &peer_req->i) continue; + if (i->completed) + continue; if (!i->local) { /* @@ -2147,7 +2213,6 @@ static int handle_write_conflicts(struct drbd_device *device, (unsigned long long)sector, size, superseded ? "local" : "remote"); - inc_unacked(device); peer_req->w.cb = superseded ? e_send_superseded : e_send_retry_write; list_add_tail(&peer_req->w.list, &device->done_ee); @@ -2206,6 +2271,7 @@ static int receive_Data(struct drbd_connection *connection, struct packet_info * { struct drbd_peer_device *peer_device; struct drbd_device *device; + struct net_conf *nc; sector_t sector; struct drbd_peer_request *peer_req; struct p_data *p = pi->data; @@ -2245,6 +2311,8 @@ static int receive_Data(struct drbd_connection *connection, struct packet_info * } peer_req->w.cb = e_end_block; + peer_req->submit_jif = jiffies; + peer_req->flags |= EE_APPLICATION; dp_flags = be32_to_cpu(p->dp_flags); rw |= wire_flags_to_bio(dp_flags); @@ -2271,9 +2339,36 @@ static int receive_Data(struct drbd_connection *connection, struct packet_info * spin_unlock(&connection->epoch_lock); rcu_read_lock(); - tp = rcu_dereference(peer_device->connection->net_conf)->two_primaries; + nc = rcu_dereference(peer_device->connection->net_conf); + tp = nc->two_primaries; + if (peer_device->connection->agreed_pro_version < 100) { + switch (nc->wire_protocol) { + case DRBD_PROT_C: + dp_flags |= DP_SEND_WRITE_ACK; + break; + case DRBD_PROT_B: + dp_flags |= DP_SEND_RECEIVE_ACK; + break; + } + } rcu_read_unlock(); + + if (dp_flags & DP_SEND_WRITE_ACK) { + peer_req->flags |= EE_SEND_WRITE_ACK; + inc_unacked(device); + /* corresponding dec_unacked() in e_end_block() + * respective _drbd_clear_done_ee */ + } + + if (dp_flags & DP_SEND_RECEIVE_ACK) { + /* I really don't like it that the receiver thread + * sends on the msock, but anyways */ + drbd_send_ack(first_peer_device(device), P_RECV_ACK, peer_req); + } + if (tp) { + /* two primaries implies protocol C */ + D_ASSERT(device, dp_flags & DP_SEND_WRITE_ACK); peer_req->flags |= EE_IN_INTERVAL_TREE; err = wait_for_and_update_peer_seq(peer_device, peer_seq); if (err) @@ -2297,44 +2392,18 @@ static int receive_Data(struct drbd_connection *connection, struct packet_info * * active_ee to become empty in drbd_submit_peer_request(); * better not add ourselves here. */ if ((peer_req->flags & EE_IS_TRIM_USE_ZEROOUT) == 0) - list_add(&peer_req->w.list, &device->active_ee); + list_add_tail(&peer_req->w.list, &device->active_ee); spin_unlock_irq(&device->resource->req_lock); if (device->state.conn == C_SYNC_TARGET) wait_event(device->ee_wait, !overlapping_resync_write(device, peer_req)); - if (peer_device->connection->agreed_pro_version < 100) { - rcu_read_lock(); - switch (rcu_dereference(peer_device->connection->net_conf)->wire_protocol) { - case DRBD_PROT_C: - dp_flags |= DP_SEND_WRITE_ACK; - break; - case DRBD_PROT_B: - dp_flags |= DP_SEND_RECEIVE_ACK; - break; - } - rcu_read_unlock(); - } - - if (dp_flags & DP_SEND_WRITE_ACK) { - peer_req->flags |= EE_SEND_WRITE_ACK; - inc_unacked(device); - /* corresponding dec_unacked() in e_end_block() - * respective _drbd_clear_done_ee */ - } - - if (dp_flags & DP_SEND_RECEIVE_ACK) { - /* I really don't like it that the receiver thread - * sends on the msock, but anyways */ - drbd_send_ack(first_peer_device(device), P_RECV_ACK, peer_req); - } - if (device->state.pdsk < D_INCONSISTENT) { /* In case we have the only disk of the cluster, */ drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size); - peer_req->flags |= EE_CALL_AL_COMPLETE_IO; peer_req->flags &= ~EE_MAY_SET_IN_SYNC; - drbd_al_begin_io(device, &peer_req->i, true); + drbd_al_begin_io(device, &peer_req->i); + peer_req->flags |= EE_CALL_AL_COMPLETE_IO; } err = drbd_submit_peer_request(device, peer_req, rw, DRBD_FAULT_DT_WR); @@ -2347,8 +2416,10 @@ static int receive_Data(struct drbd_connection *connection, struct packet_info * list_del(&peer_req->w.list); drbd_remove_epoch_entry_interval(device, peer_req); spin_unlock_irq(&device->resource->req_lock); - if (peer_req->flags & EE_CALL_AL_COMPLETE_IO) + if (peer_req->flags & EE_CALL_AL_COMPLETE_IO) { + peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO; drbd_al_complete_io(device, &peer_req->i); + } out_interrupted: drbd_may_finish_epoch(connection, peer_req->epoch, EV_PUT + EV_CLEANUP); @@ -2368,13 +2439,14 @@ out_interrupted: * The current sync rate used here uses only the most recent two step marks, * to have a short time average so we can react faster. */ -bool drbd_rs_should_slow_down(struct drbd_device *device, sector_t sector) +bool drbd_rs_should_slow_down(struct drbd_device *device, sector_t sector, + bool throttle_if_app_is_waiting) { struct lc_element *tmp; - bool throttle = true; + bool throttle = drbd_rs_c_min_rate_throttle(device); - if (!drbd_rs_c_min_rate_throttle(device)) - return false; + if (!throttle || throttle_if_app_is_waiting) + return throttle; spin_lock_irq(&device->al_lock); tmp = lc_find(device->resync, BM_SECT_TO_EXT(sector)); @@ -2382,7 +2454,8 @@ bool drbd_rs_should_slow_down(struct drbd_device *device, sector_t sector) struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce); if (test_bit(BME_PRIORITY, &bm_ext->flags)) throttle = false; - /* Do not slow down if app IO is already waiting for this extent */ + /* Do not slow down if app IO is already waiting for this extent, + * and our progress is necessary for application IO to complete. */ } spin_unlock_irq(&device->al_lock); @@ -2407,7 +2480,9 @@ bool drbd_rs_c_min_rate_throttle(struct drbd_device *device) curr_events = (int)part_stat_read(&disk->part0, sectors[0]) + (int)part_stat_read(&disk->part0, sectors[1]) - atomic_read(&device->rs_sect_ev); - if (!device->rs_last_events || curr_events - device->rs_last_events > 64) { + + if (atomic_read(&device->ap_actlog_cnt) + || !device->rs_last_events || curr_events - device->rs_last_events > 64) { unsigned long rs_left; int i; @@ -2508,6 +2583,7 @@ static int receive_DataRequest(struct drbd_connection *connection, struct packet peer_req->w.cb = w_e_end_data_req; fault_type = DRBD_FAULT_DT_RD; /* application IO, don't drbd_rs_begin_io */ + peer_req->flags |= EE_APPLICATION; goto submit; case P_RS_DATA_REQUEST: @@ -2538,6 +2614,8 @@ static int receive_DataRequest(struct drbd_connection *connection, struct packet peer_req->w.cb = w_e_end_csum_rs_req; /* used in the sector offset progress display */ device->bm_resync_fo = BM_SECT_TO_BIT(sector); + /* remember to report stats in drbd_resync_finished */ + device->use_csums = true; } else if (pi->cmd == P_OV_REPLY) { /* track progress, we may need to throttle */ atomic_add(size >> 9, &device->rs_sect_in); @@ -2595,8 +2673,20 @@ static int receive_DataRequest(struct drbd_connection *connection, struct packet * we would also throttle its application reads. * In that case, throttling is done on the SyncTarget only. */ - if (device->state.peer != R_PRIMARY && drbd_rs_should_slow_down(device, sector)) + + /* Even though this may be a resync request, we do add to "read_ee"; + * "sync_ee" is only used for resync WRITEs. + * Add to list early, so debugfs can find this request + * even if we have to sleep below. */ + spin_lock_irq(&device->resource->req_lock); + list_add_tail(&peer_req->w.list, &device->read_ee); + spin_unlock_irq(&device->resource->req_lock); + + update_receiver_timing_details(connection, drbd_rs_should_slow_down); + if (device->state.peer != R_PRIMARY + && drbd_rs_should_slow_down(device, sector, false)) schedule_timeout_uninterruptible(HZ/10); + update_receiver_timing_details(connection, drbd_rs_begin_io); if (drbd_rs_begin_io(device, sector)) goto out_free_e; @@ -2604,22 +2694,20 @@ submit_for_resync: atomic_add(size >> 9, &device->rs_sect_ev); submit: + update_receiver_timing_details(connection, drbd_submit_peer_request); inc_unacked(device); - spin_lock_irq(&device->resource->req_lock); - list_add_tail(&peer_req->w.list, &device->read_ee); - spin_unlock_irq(&device->resource->req_lock); - if (drbd_submit_peer_request(device, peer_req, READ, fault_type) == 0) return 0; /* don't care for the reason here */ drbd_err(device, "submit failed, triggering re-connect\n"); + +out_free_e: spin_lock_irq(&device->resource->req_lock); list_del(&peer_req->w.list); spin_unlock_irq(&device->resource->req_lock); /* no drbd_rs_complete_io(), we are dropping the connection anyways */ -out_free_e: put_ldev(device); drbd_free_peer_req(device, peer_req); return -EIO; @@ -2842,8 +2930,10 @@ static void drbd_uuid_dump(struct drbd_device *device, char *text, u64 *uuid, -1091 requires proto 91 -1096 requires proto 96 */ -static int drbd_uuid_compare(struct drbd_device *device, int *rule_nr) __must_hold(local) +static int drbd_uuid_compare(struct drbd_device *const device, int *rule_nr) __must_hold(local) { + struct drbd_peer_device *const peer_device = first_peer_device(device); + struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL; u64 self, peer; int i, j; @@ -2869,7 +2959,7 @@ static int drbd_uuid_compare(struct drbd_device *device, int *rule_nr) __must_ho if (device->p_uuid[UI_BITMAP] == (u64)0 && device->ldev->md.uuid[UI_BITMAP] != (u64)0) { - if (first_peer_device(device)->connection->agreed_pro_version < 91) + if (connection->agreed_pro_version < 91) return -1091; if ((device->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) && @@ -2892,7 +2982,7 @@ static int drbd_uuid_compare(struct drbd_device *device, int *rule_nr) __must_ho if (device->ldev->md.uuid[UI_BITMAP] == (u64)0 && device->p_uuid[UI_BITMAP] != (u64)0) { - if (first_peer_device(device)->connection->agreed_pro_version < 91) + if (connection->agreed_pro_version < 91) return -1091; if ((device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_BITMAP] & ~((u64)1)) && @@ -2925,7 +3015,7 @@ static int drbd_uuid_compare(struct drbd_device *device, int *rule_nr) __must_ho case 1: /* self_pri && !peer_pri */ return 1; case 2: /* !self_pri && peer_pri */ return -1; case 3: /* self_pri && peer_pri */ - dc = test_bit(RESOLVE_CONFLICTS, &first_peer_device(device)->connection->flags); + dc = test_bit(RESOLVE_CONFLICTS, &connection->flags); return dc ? -1 : 1; } } @@ -2938,14 +3028,14 @@ static int drbd_uuid_compare(struct drbd_device *device, int *rule_nr) __must_ho *rule_nr = 51; peer = device->p_uuid[UI_HISTORY_START] & ~((u64)1); if (self == peer) { - if (first_peer_device(device)->connection->agreed_pro_version < 96 ? + if (connection->agreed_pro_version < 96 ? (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) : peer + UUID_NEW_BM_OFFSET == (device->p_uuid[UI_BITMAP] & ~((u64)1))) { /* The last P_SYNC_UUID did not get though. Undo the last start of resync as sync source modifications of the peer's UUIDs. */ - if (first_peer_device(device)->connection->agreed_pro_version < 91) + if (connection->agreed_pro_version < 91) return -1091; device->p_uuid[UI_BITMAP] = device->p_uuid[UI_HISTORY_START]; @@ -2975,14 +3065,14 @@ static int drbd_uuid_compare(struct drbd_device *device, int *rule_nr) __must_ho *rule_nr = 71; self = device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1); if (self == peer) { - if (first_peer_device(device)->connection->agreed_pro_version < 96 ? + if (connection->agreed_pro_version < 96 ? (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) : self + UUID_NEW_BM_OFFSET == (device->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) { /* The last P_SYNC_UUID did not get though. Undo the last start of resync as sync source modifications of our UUIDs. */ - if (first_peer_device(device)->connection->agreed_pro_version < 91) + if (connection->agreed_pro_version < 91) return -1091; __drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_HISTORY_START]); @@ -3352,8 +3442,7 @@ disconnect: * return: NULL (alg name was "") * ERR_PTR(error) if something goes wrong * or the crypto hash ptr, if it worked out ok. */ -static -struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_device *device, +static struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_device *device, const char *alg, const char *name) { struct crypto_hash *tfm; @@ -3639,7 +3728,7 @@ static int receive_sizes(struct drbd_connection *connection, struct packet_info struct drbd_device *device; struct p_sizes *p = pi->data; enum determine_dev_size dd = DS_UNCHANGED; - sector_t p_size, p_usize, my_usize; + sector_t p_size, p_usize, p_csize, my_usize; int ldsc = 0; /* local disk size changed */ enum dds_flags ddsf; @@ -3650,6 +3739,7 @@ static int receive_sizes(struct drbd_connection *connection, struct packet_info p_size = be64_to_cpu(p->d_size); p_usize = be64_to_cpu(p->u_size); + p_csize = be64_to_cpu(p->c_size); /* just store the peer's disk size for now. * we still need to figure out whether we accept that. */ @@ -3710,7 +3800,6 @@ static int receive_sizes(struct drbd_connection *connection, struct packet_info } device->peer_max_bio_size = be32_to_cpu(p->max_bio_size); - drbd_reconsider_max_bio_size(device); /* Leave drbd_reconsider_max_bio_size() before drbd_determine_dev_size(). In case we cleared the QUEUE_FLAG_DISCARD from our queue in drbd_reconsider_max_bio_size(), we can be sure that after @@ -3718,14 +3807,28 @@ static int receive_sizes(struct drbd_connection *connection, struct packet_info ddsf = be16_to_cpu(p->dds_flags); if (get_ldev(device)) { + drbd_reconsider_max_bio_size(device, device->ldev); dd = drbd_determine_dev_size(device, ddsf, NULL); put_ldev(device); if (dd == DS_ERROR) return -EIO; drbd_md_sync(device); } else { - /* I am diskless, need to accept the peer's size. */ - drbd_set_my_capacity(device, p_size); + /* + * I am diskless, need to accept the peer's *current* size. + * I must NOT accept the peers backing disk size, + * it may have been larger than mine all along... + * + * At this point, the peer knows more about my disk, or at + * least about what we last agreed upon, than myself. + * So if his c_size is less than his d_size, the most likely + * reason is that *my* d_size was smaller last time we checked. + * + * However, if he sends a zero current size, + * take his (user-capped or) backing disk size anyways. + */ + drbd_reconsider_max_bio_size(device, NULL); + drbd_set_my_capacity(device, p_csize ?: p_usize ?: p_size); } if (get_ldev(device)) { @@ -4501,6 +4604,7 @@ static void drbdd(struct drbd_connection *connection) struct data_cmd *cmd; drbd_thread_current_set_cpu(&connection->receiver); + update_receiver_timing_details(connection, drbd_recv_header); if (drbd_recv_header(connection, &pi)) goto err_out; @@ -4519,12 +4623,14 @@ static void drbdd(struct drbd_connection *connection) } if (shs) { + update_receiver_timing_details(connection, drbd_recv_all_warn); err = drbd_recv_all_warn(connection, pi.data, shs); if (err) goto err_out; pi.size -= shs; } + update_receiver_timing_details(connection, cmd->fn); err = cmd->fn(connection, &pi); if (err) { drbd_err(connection, "error receiving %s, e: %d l: %d!\n", diff --git a/drivers/block/drbd/drbd_req.c b/drivers/block/drbd/drbd_req.c index 09803d0d5207..c67717d572d1 100644 --- a/drivers/block/drbd/drbd_req.c +++ b/drivers/block/drbd/drbd_req.c @@ -52,7 +52,7 @@ static void _drbd_start_io_acct(struct drbd_device *device, struct drbd_request static void _drbd_end_io_acct(struct drbd_device *device, struct drbd_request *req) { int rw = bio_data_dir(req->master_bio); - unsigned long duration = jiffies - req->start_time; + unsigned long duration = jiffies - req->start_jif; int cpu; cpu = part_stat_lock(); part_stat_add(cpu, &device->vdisk->part0, ticks[rw], duration); @@ -66,7 +66,7 @@ static struct drbd_request *drbd_req_new(struct drbd_device *device, { struct drbd_request *req; - req = mempool_alloc(drbd_request_mempool, GFP_NOIO); + req = mempool_alloc(drbd_request_mempool, GFP_NOIO | __GFP_ZERO); if (!req) return NULL; @@ -84,6 +84,8 @@ static struct drbd_request *drbd_req_new(struct drbd_device *device, INIT_LIST_HEAD(&req->tl_requests); INIT_LIST_HEAD(&req->w.list); + INIT_LIST_HEAD(&req->req_pending_master_completion); + INIT_LIST_HEAD(&req->req_pending_local); /* one reference to be put by __drbd_make_request */ atomic_set(&req->completion_ref, 1); @@ -92,6 +94,19 @@ static struct drbd_request *drbd_req_new(struct drbd_device *device, return req; } +static void drbd_remove_request_interval(struct rb_root *root, + struct drbd_request *req) +{ + struct drbd_device *device = req->device; + struct drbd_interval *i = &req->i; + + drbd_remove_interval(root, i); + + /* Wake up any processes waiting for this request to complete. */ + if (i->waiting) + wake_up(&device->misc_wait); +} + void drbd_req_destroy(struct kref *kref) { struct drbd_request *req = container_of(kref, struct drbd_request, kref); @@ -107,14 +122,30 @@ void drbd_req_destroy(struct kref *kref) return; } - /* remove it from the transfer log. - * well, only if it had been there in the first - * place... if it had not (local only or conflicting - * and never sent), it should still be "empty" as - * initialized in drbd_req_new(), so we can list_del() it - * here unconditionally */ + /* If called from mod_rq_state (expected normal case) or + * drbd_send_and_submit (the less likely normal path), this holds the + * req_lock, and req->tl_requests will typicaly be on ->transfer_log, + * though it may be still empty (never added to the transfer log). + * + * If called from do_retry(), we do NOT hold the req_lock, but we are + * still allowed to unconditionally list_del(&req->tl_requests), + * because it will be on a local on-stack list only. */ list_del_init(&req->tl_requests); + /* finally remove the request from the conflict detection + * respective block_id verification interval tree. */ + if (!drbd_interval_empty(&req->i)) { + struct rb_root *root; + + if (s & RQ_WRITE) + root = &device->write_requests; + else + root = &device->read_requests; + drbd_remove_request_interval(root, req); + } else if (s & (RQ_NET_MASK & ~RQ_NET_DONE) && req->i.size != 0) + drbd_err(device, "drbd_req_destroy: Logic BUG: interval empty, but: rq_state=0x%x, sect=%llu, size=%u\n", + s, (unsigned long long)req->i.sector, req->i.size); + /* if it was a write, we may have to set the corresponding * bit(s) out-of-sync first. If it had a local part, we need to * release the reference to the activity log. */ @@ -188,19 +219,6 @@ void complete_master_bio(struct drbd_device *device, } -static void drbd_remove_request_interval(struct rb_root *root, - struct drbd_request *req) -{ - struct drbd_device *device = req->device; - struct drbd_interval *i = &req->i; - - drbd_remove_interval(root, i); - - /* Wake up any processes waiting for this request to complete. */ - if (i->waiting) - wake_up(&device->misc_wait); -} - /* Helper for __req_mod(). * Set m->bio to the master bio, if it is fit to be completed, * or leave it alone (it is initialized to NULL in __req_mod), @@ -254,18 +272,6 @@ void drbd_req_complete(struct drbd_request *req, struct bio_and_error *m) ok = (s & RQ_LOCAL_OK) || (s & RQ_NET_OK); error = PTR_ERR(req->private_bio); - /* remove the request from the conflict detection - * respective block_id verification hash */ - if (!drbd_interval_empty(&req->i)) { - struct rb_root *root; - - if (rw == WRITE) - root = &device->write_requests; - else - root = &device->read_requests; - drbd_remove_request_interval(root, req); - } - /* Before we can signal completion to the upper layers, * we may need to close the current transfer log epoch. * We are within the request lock, so we can simply compare @@ -301,9 +307,24 @@ void drbd_req_complete(struct drbd_request *req, struct bio_and_error *m) m->error = ok ? 0 : (error ?: -EIO); m->bio = req->master_bio; req->master_bio = NULL; + /* We leave it in the tree, to be able to verify later + * write-acks in protocol != C during resync. + * But we mark it as "complete", so it won't be counted as + * conflict in a multi-primary setup. */ + req->i.completed = true; } + + if (req->i.waiting) + wake_up(&device->misc_wait); + + /* Either we are about to complete to upper layers, + * or we will restart this request. + * In either case, the request object will be destroyed soon, + * so better remove it from all lists. */ + list_del_init(&req->req_pending_master_completion); } +/* still holds resource->req_lock */ static int drbd_req_put_completion_ref(struct drbd_request *req, struct bio_and_error *m, int put) { struct drbd_device *device = req->device; @@ -324,12 +345,91 @@ static int drbd_req_put_completion_ref(struct drbd_request *req, struct bio_and_ return 1; } +static void set_if_null_req_next(struct drbd_peer_device *peer_device, struct drbd_request *req) +{ + struct drbd_connection *connection = peer_device ? peer_device->connection : NULL; + if (!connection) + return; + if (connection->req_next == NULL) + connection->req_next = req; +} + +static void advance_conn_req_next(struct drbd_peer_device *peer_device, struct drbd_request *req) +{ + struct drbd_connection *connection = peer_device ? peer_device->connection : NULL; + if (!connection) + return; + if (connection->req_next != req) + return; + list_for_each_entry_continue(req, &connection->transfer_log, tl_requests) { + const unsigned s = req->rq_state; + if (s & RQ_NET_QUEUED) + break; + } + if (&req->tl_requests == &connection->transfer_log) + req = NULL; + connection->req_next = req; +} + +static void set_if_null_req_ack_pending(struct drbd_peer_device *peer_device, struct drbd_request *req) +{ + struct drbd_connection *connection = peer_device ? peer_device->connection : NULL; + if (!connection) + return; + if (connection->req_ack_pending == NULL) + connection->req_ack_pending = req; +} + +static void advance_conn_req_ack_pending(struct drbd_peer_device *peer_device, struct drbd_request *req) +{ + struct drbd_connection *connection = peer_device ? peer_device->connection : NULL; + if (!connection) + return; + if (connection->req_ack_pending != req) + return; + list_for_each_entry_continue(req, &connection->transfer_log, tl_requests) { + const unsigned s = req->rq_state; + if ((s & RQ_NET_SENT) && (s & RQ_NET_PENDING)) + break; + } + if (&req->tl_requests == &connection->transfer_log) + req = NULL; + connection->req_ack_pending = req; +} + +static void set_if_null_req_not_net_done(struct drbd_peer_device *peer_device, struct drbd_request *req) +{ + struct drbd_connection *connection = peer_device ? peer_device->connection : NULL; + if (!connection) + return; + if (connection->req_not_net_done == NULL) + connection->req_not_net_done = req; +} + +static void advance_conn_req_not_net_done(struct drbd_peer_device *peer_device, struct drbd_request *req) +{ + struct drbd_connection *connection = peer_device ? peer_device->connection : NULL; + if (!connection) + return; + if (connection->req_not_net_done != req) + return; + list_for_each_entry_continue(req, &connection->transfer_log, tl_requests) { + const unsigned s = req->rq_state; + if ((s & RQ_NET_SENT) && !(s & RQ_NET_DONE)) + break; + } + if (&req->tl_requests == &connection->transfer_log) + req = NULL; + connection->req_not_net_done = req; +} + /* I'd like this to be the only place that manipulates * req->completion_ref and req->kref. */ static void mod_rq_state(struct drbd_request *req, struct bio_and_error *m, int clear, int set) { struct drbd_device *device = req->device; + struct drbd_peer_device *peer_device = first_peer_device(device); unsigned s = req->rq_state; int c_put = 0; int k_put = 0; @@ -356,14 +456,23 @@ static void mod_rq_state(struct drbd_request *req, struct bio_and_error *m, atomic_inc(&req->completion_ref); } - if (!(s & RQ_NET_QUEUED) && (set & RQ_NET_QUEUED)) + if (!(s & RQ_NET_QUEUED) && (set & RQ_NET_QUEUED)) { atomic_inc(&req->completion_ref); + set_if_null_req_next(peer_device, req); + } if (!(s & RQ_EXP_BARR_ACK) && (set & RQ_EXP_BARR_ACK)) kref_get(&req->kref); /* wait for the DONE */ - if (!(s & RQ_NET_SENT) && (set & RQ_NET_SENT)) - atomic_add(req->i.size >> 9, &device->ap_in_flight); + if (!(s & RQ_NET_SENT) && (set & RQ_NET_SENT)) { + /* potentially already completed in the asender thread */ + if (!(s & RQ_NET_DONE)) { + atomic_add(req->i.size >> 9, &device->ap_in_flight); + set_if_null_req_not_net_done(peer_device, req); + } + if (s & RQ_NET_PENDING) + set_if_null_req_ack_pending(peer_device, req); + } if (!(s & RQ_COMPLETION_SUSP) && (set & RQ_COMPLETION_SUSP)) atomic_inc(&req->completion_ref); @@ -386,20 +495,34 @@ static void mod_rq_state(struct drbd_request *req, struct bio_and_error *m, ++k_put; else ++c_put; + list_del_init(&req->req_pending_local); } if ((s & RQ_NET_PENDING) && (clear & RQ_NET_PENDING)) { dec_ap_pending(device); ++c_put; + req->acked_jif = jiffies; + advance_conn_req_ack_pending(peer_device, req); } - if ((s & RQ_NET_QUEUED) && (clear & RQ_NET_QUEUED)) + if ((s & RQ_NET_QUEUED) && (clear & RQ_NET_QUEUED)) { ++c_put; + advance_conn_req_next(peer_device, req); + } - if ((s & RQ_EXP_BARR_ACK) && !(s & RQ_NET_DONE) && (set & RQ_NET_DONE)) { - if (req->rq_state & RQ_NET_SENT) + if (!(s & RQ_NET_DONE) && (set & RQ_NET_DONE)) { + if (s & RQ_NET_SENT) atomic_sub(req->i.size >> 9, &device->ap_in_flight); - ++k_put; + if (s & RQ_EXP_BARR_ACK) + ++k_put; + req->net_done_jif = jiffies; + + /* in ahead/behind mode, or just in case, + * before we finally destroy this request, + * the caching pointers must not reference it anymore */ + advance_conn_req_next(peer_device, req); + advance_conn_req_ack_pending(peer_device, req); + advance_conn_req_not_net_done(peer_device, req); } /* potentially complete and destroy */ @@ -439,6 +562,19 @@ static void drbd_report_io_error(struct drbd_device *device, struct drbd_request bdevname(device->ldev->backing_bdev, b)); } +/* Helper for HANDED_OVER_TO_NETWORK. + * Is this a protocol A write (neither WRITE_ACK nor RECEIVE_ACK expected)? + * Is it also still "PENDING"? + * --> If so, clear PENDING and set NET_OK below. + * If it is a protocol A write, but not RQ_PENDING anymore, neg-ack was faster + * (and we must not set RQ_NET_OK) */ +static inline bool is_pending_write_protocol_A(struct drbd_request *req) +{ + return (req->rq_state & + (RQ_WRITE|RQ_NET_PENDING|RQ_EXP_WRITE_ACK|RQ_EXP_RECEIVE_ACK)) + == (RQ_WRITE|RQ_NET_PENDING); +} + /* obviously this could be coded as many single functions * instead of one huge switch, * or by putting the code directly in the respective locations @@ -454,7 +590,9 @@ static void drbd_report_io_error(struct drbd_device *device, struct drbd_request int __req_mod(struct drbd_request *req, enum drbd_req_event what, struct bio_and_error *m) { - struct drbd_device *device = req->device; + struct drbd_device *const device = req->device; + struct drbd_peer_device *const peer_device = first_peer_device(device); + struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL; struct net_conf *nc; int p, rv = 0; @@ -477,7 +615,7 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what, * and from w_read_retry_remote */ D_ASSERT(device, !(req->rq_state & RQ_NET_MASK)); rcu_read_lock(); - nc = rcu_dereference(first_peer_device(device)->connection->net_conf); + nc = rcu_dereference(connection->net_conf); p = nc->wire_protocol; rcu_read_unlock(); req->rq_state |= @@ -549,7 +687,7 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what, D_ASSERT(device, (req->rq_state & RQ_LOCAL_MASK) == 0); mod_rq_state(req, m, 0, RQ_NET_QUEUED); req->w.cb = w_send_read_req; - drbd_queue_work(&first_peer_device(device)->connection->sender_work, + drbd_queue_work(&connection->sender_work, &req->w); break; @@ -585,23 +723,23 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what, D_ASSERT(device, req->rq_state & RQ_NET_PENDING); mod_rq_state(req, m, 0, RQ_NET_QUEUED|RQ_EXP_BARR_ACK); req->w.cb = w_send_dblock; - drbd_queue_work(&first_peer_device(device)->connection->sender_work, + drbd_queue_work(&connection->sender_work, &req->w); /* close the epoch, in case it outgrew the limit */ rcu_read_lock(); - nc = rcu_dereference(first_peer_device(device)->connection->net_conf); + nc = rcu_dereference(connection->net_conf); p = nc->max_epoch_size; rcu_read_unlock(); - if (first_peer_device(device)->connection->current_tle_writes >= p) - start_new_tl_epoch(first_peer_device(device)->connection); + if (connection->current_tle_writes >= p) + start_new_tl_epoch(connection); break; case QUEUE_FOR_SEND_OOS: mod_rq_state(req, m, 0, RQ_NET_QUEUED); req->w.cb = w_send_out_of_sync; - drbd_queue_work(&first_peer_device(device)->connection->sender_work, + drbd_queue_work(&connection->sender_work, &req->w); break; @@ -615,18 +753,16 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what, case HANDED_OVER_TO_NETWORK: /* assert something? */ - if (bio_data_dir(req->master_bio) == WRITE && - !(req->rq_state & (RQ_EXP_RECEIVE_ACK | RQ_EXP_WRITE_ACK))) { + if (is_pending_write_protocol_A(req)) /* this is what is dangerous about protocol A: * pretend it was successfully written on the peer. */ - if (req->rq_state & RQ_NET_PENDING) - mod_rq_state(req, m, RQ_NET_PENDING, RQ_NET_OK); - /* else: neg-ack was faster... */ - /* it is still not yet RQ_NET_DONE until the - * corresponding epoch barrier got acked as well, - * so we know what to dirty on connection loss */ - } - mod_rq_state(req, m, RQ_NET_QUEUED, RQ_NET_SENT); + mod_rq_state(req, m, RQ_NET_QUEUED|RQ_NET_PENDING, + RQ_NET_SENT|RQ_NET_OK); + else + mod_rq_state(req, m, RQ_NET_QUEUED, RQ_NET_SENT); + /* It is still not yet RQ_NET_DONE until the + * corresponding epoch barrier got acked as well, + * so we know what to dirty on connection loss. */ break; case OOS_HANDED_TO_NETWORK: @@ -658,12 +794,13 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what, case WRITE_ACKED_BY_PEER_AND_SIS: req->rq_state |= RQ_NET_SIS; case WRITE_ACKED_BY_PEER: - D_ASSERT(device, req->rq_state & RQ_EXP_WRITE_ACK); - /* protocol C; successfully written on peer. + /* Normal operation protocol C: successfully written on peer. + * During resync, even in protocol != C, + * we requested an explicit write ack anyways. + * Which means we cannot even assert anything here. * Nothing more to do here. * We want to keep the tl in place for all protocols, to cater * for volatile write-back caches on lower level devices. */ - goto ack_common; case RECV_ACKED_BY_PEER: D_ASSERT(device, req->rq_state & RQ_EXP_RECEIVE_ACK); @@ -671,7 +808,6 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what, * see also notes above in HANDED_OVER_TO_NETWORK about * protocol != C */ ack_common: - D_ASSERT(device, req->rq_state & RQ_NET_PENDING); mod_rq_state(req, m, RQ_NET_PENDING, RQ_NET_OK); break; @@ -714,7 +850,7 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what, get_ldev(device); /* always succeeds in this call path */ req->w.cb = w_restart_disk_io; - drbd_queue_work(&first_peer_device(device)->connection->sender_work, + drbd_queue_work(&connection->sender_work, &req->w); break; @@ -736,7 +872,8 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what, mod_rq_state(req, m, RQ_COMPLETION_SUSP, RQ_NET_QUEUED|RQ_NET_PENDING); if (req->w.cb) { - drbd_queue_work(&first_peer_device(device)->connection->sender_work, + /* w.cb expected to be w_send_dblock, or w_send_read_req */ + drbd_queue_work(&connection->sender_work, &req->w); rv = req->rq_state & RQ_WRITE ? MR_WRITE : MR_READ; } /* else: FIXME can this happen? */ @@ -769,7 +906,7 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what, break; case QUEUE_AS_DRBD_BARRIER: - start_new_tl_epoch(first_peer_device(device)->connection); + start_new_tl_epoch(connection); mod_rq_state(req, m, 0, RQ_NET_OK|RQ_NET_DONE); break; }; @@ -886,6 +1023,9 @@ static void maybe_pull_ahead(struct drbd_device *device) connection->agreed_pro_version < 96) return; + if (on_congestion == OC_PULL_AHEAD && device->state.conn == C_AHEAD) + return; /* nothing to do ... */ + /* If I don't even have good local storage, we can not reasonably try * to pull ahead of the peer. We also need the local reference to make * sure device->act_log is there. @@ -1021,6 +1161,7 @@ drbd_submit_req_private_bio(struct drbd_request *req) * stable storage, and this is a WRITE, we may not even submit * this bio. */ if (get_ldev(device)) { + req->pre_submit_jif = jiffies; if (drbd_insert_fault(device, rw == WRITE ? DRBD_FAULT_DT_WR : rw == READ ? DRBD_FAULT_DT_RD @@ -1035,10 +1176,14 @@ drbd_submit_req_private_bio(struct drbd_request *req) static void drbd_queue_write(struct drbd_device *device, struct drbd_request *req) { - spin_lock(&device->submit.lock); + spin_lock_irq(&device->resource->req_lock); list_add_tail(&req->tl_requests, &device->submit.writes); - spin_unlock(&device->submit.lock); + list_add_tail(&req->req_pending_master_completion, + &device->pending_master_completion[1 /* WRITE */]); + spin_unlock_irq(&device->resource->req_lock); queue_work(device->submit.wq, &device->submit.worker); + /* do_submit() may sleep internally on al_wait, too */ + wake_up(&device->al_wait); } /* returns the new drbd_request pointer, if the caller is expected to @@ -1047,7 +1192,7 @@ static void drbd_queue_write(struct drbd_device *device, struct drbd_request *re * Returns ERR_PTR(-ENOMEM) if we cannot allocate a drbd_request. */ static struct drbd_request * -drbd_request_prepare(struct drbd_device *device, struct bio *bio, unsigned long start_time) +drbd_request_prepare(struct drbd_device *device, struct bio *bio, unsigned long start_jif) { const int rw = bio_data_dir(bio); struct drbd_request *req; @@ -1062,7 +1207,7 @@ drbd_request_prepare(struct drbd_device *device, struct bio *bio, unsigned long bio_endio(bio, -ENOMEM); return ERR_PTR(-ENOMEM); } - req->start_time = start_time; + req->start_jif = start_jif; if (!get_ldev(device)) { bio_put(req->private_bio); @@ -1075,10 +1220,12 @@ drbd_request_prepare(struct drbd_device *device, struct bio *bio, unsigned long if (rw == WRITE && req->private_bio && req->i.size && !test_bit(AL_SUSPENDED, &device->flags)) { if (!drbd_al_begin_io_fastpath(device, &req->i)) { + atomic_inc(&device->ap_actlog_cnt); drbd_queue_write(device, req); return NULL; } req->rq_state |= RQ_IN_ACT_LOG; + req->in_actlog_jif = jiffies; } return req; @@ -1086,11 +1233,13 @@ drbd_request_prepare(struct drbd_device *device, struct bio *bio, unsigned long static void drbd_send_and_submit(struct drbd_device *device, struct drbd_request *req) { + struct drbd_resource *resource = device->resource; const int rw = bio_rw(req->master_bio); struct bio_and_error m = { NULL, }; bool no_remote = false; + bool submit_private_bio = false; - spin_lock_irq(&device->resource->req_lock); + spin_lock_irq(&resource->req_lock); if (rw == WRITE) { /* This may temporarily give up the req_lock, * but will re-aquire it before it returns here. @@ -1148,13 +1297,18 @@ static void drbd_send_and_submit(struct drbd_device *device, struct drbd_request no_remote = true; } + /* If it took the fast path in drbd_request_prepare, add it here. + * The slow path has added it already. */ + if (list_empty(&req->req_pending_master_completion)) + list_add_tail(&req->req_pending_master_completion, + &device->pending_master_completion[rw == WRITE]); if (req->private_bio) { /* needs to be marked within the same spinlock */ + list_add_tail(&req->req_pending_local, + &device->pending_completion[rw == WRITE]); _req_mod(req, TO_BE_SUBMITTED); /* but we need to give up the spinlock to submit */ - spin_unlock_irq(&device->resource->req_lock); - drbd_submit_req_private_bio(req); - spin_lock_irq(&device->resource->req_lock); + submit_private_bio = true; } else if (no_remote) { nodata: if (__ratelimit(&drbd_ratelimit_state)) @@ -1167,15 +1321,23 @@ nodata: out: if (drbd_req_put_completion_ref(req, &m, 1)) kref_put(&req->kref, drbd_req_destroy); - spin_unlock_irq(&device->resource->req_lock); - + spin_unlock_irq(&resource->req_lock); + + /* Even though above is a kref_put(), this is safe. + * As long as we still need to submit our private bio, + * we hold a completion ref, and the request cannot disappear. + * If however this request did not even have a private bio to submit + * (e.g. remote read), req may already be invalid now. + * That's why we cannot check on req->private_bio. */ + if (submit_private_bio) + drbd_submit_req_private_bio(req); if (m.bio) complete_master_bio(device, &m); } -void __drbd_make_request(struct drbd_device *device, struct bio *bio, unsigned long start_time) +void __drbd_make_request(struct drbd_device *device, struct bio *bio, unsigned long start_jif) { - struct drbd_request *req = drbd_request_prepare(device, bio, start_time); + struct drbd_request *req = drbd_request_prepare(device, bio, start_jif); if (IS_ERR_OR_NULL(req)) return; drbd_send_and_submit(device, req); @@ -1194,6 +1356,8 @@ static void submit_fast_path(struct drbd_device *device, struct list_head *incom continue; req->rq_state |= RQ_IN_ACT_LOG; + req->in_actlog_jif = jiffies; + atomic_dec(&device->ap_actlog_cnt); } list_del_init(&req->tl_requests); @@ -1203,7 +1367,8 @@ static void submit_fast_path(struct drbd_device *device, struct list_head *incom static bool prepare_al_transaction_nonblock(struct drbd_device *device, struct list_head *incoming, - struct list_head *pending) + struct list_head *pending, + struct list_head *later) { struct drbd_request *req, *tmp; int wake = 0; @@ -1212,45 +1377,105 @@ static bool prepare_al_transaction_nonblock(struct drbd_device *device, spin_lock_irq(&device->al_lock); list_for_each_entry_safe(req, tmp, incoming, tl_requests) { err = drbd_al_begin_io_nonblock(device, &req->i); + if (err == -ENOBUFS) + break; if (err == -EBUSY) wake = 1; if (err) - continue; - req->rq_state |= RQ_IN_ACT_LOG; - list_move_tail(&req->tl_requests, pending); + list_move_tail(&req->tl_requests, later); + else + list_move_tail(&req->tl_requests, pending); } spin_unlock_irq(&device->al_lock); if (wake) wake_up(&device->al_wait); - return !list_empty(pending); } +void send_and_submit_pending(struct drbd_device *device, struct list_head *pending) +{ + struct drbd_request *req, *tmp; + + list_for_each_entry_safe(req, tmp, pending, tl_requests) { + req->rq_state |= RQ_IN_ACT_LOG; + req->in_actlog_jif = jiffies; + atomic_dec(&device->ap_actlog_cnt); + list_del_init(&req->tl_requests); + drbd_send_and_submit(device, req); + } +} + void do_submit(struct work_struct *ws) { struct drbd_device *device = container_of(ws, struct drbd_device, submit.worker); - LIST_HEAD(incoming); - LIST_HEAD(pending); - struct drbd_request *req, *tmp; + LIST_HEAD(incoming); /* from drbd_make_request() */ + LIST_HEAD(pending); /* to be submitted after next AL-transaction commit */ + LIST_HEAD(busy); /* blocked by resync requests */ + + /* grab new incoming requests */ + spin_lock_irq(&device->resource->req_lock); + list_splice_tail_init(&device->submit.writes, &incoming); + spin_unlock_irq(&device->resource->req_lock); for (;;) { - spin_lock(&device->submit.lock); - list_splice_tail_init(&device->submit.writes, &incoming); - spin_unlock(&device->submit.lock); + DEFINE_WAIT(wait); + /* move used-to-be-busy back to front of incoming */ + list_splice_init(&busy, &incoming); submit_fast_path(device, &incoming); if (list_empty(&incoming)) break; -skip_fast_path: - wait_event(device->al_wait, prepare_al_transaction_nonblock(device, &incoming, &pending)); - /* Maybe more was queued, while we prepared the transaction? - * Try to stuff them into this transaction as well. - * Be strictly non-blocking here, no wait_event, we already - * have something to commit. - * Stop if we don't make any more progres. - */ for (;;) { + prepare_to_wait(&device->al_wait, &wait, TASK_UNINTERRUPTIBLE); + + list_splice_init(&busy, &incoming); + prepare_al_transaction_nonblock(device, &incoming, &pending, &busy); + if (!list_empty(&pending)) + break; + + schedule(); + + /* If all currently "hot" activity log extents are kept busy by + * incoming requests, we still must not totally starve new + * requests to "cold" extents. + * Something left on &incoming means there had not been + * enough update slots available, and the activity log + * has been marked as "starving". + * + * Try again now, without looking for new requests, + * effectively blocking all new requests until we made + * at least _some_ progress with what we currently have. + */ + if (!list_empty(&incoming)) + continue; + + /* Nothing moved to pending, but nothing left + * on incoming: all moved to busy! + * Grab new and iterate. */ + spin_lock_irq(&device->resource->req_lock); + list_splice_tail_init(&device->submit.writes, &incoming); + spin_unlock_irq(&device->resource->req_lock); + } + finish_wait(&device->al_wait, &wait); + + /* If the transaction was full, before all incoming requests + * had been processed, skip ahead to commit, and iterate + * without splicing in more incoming requests from upper layers. + * + * Else, if all incoming have been processed, + * they have become either "pending" (to be submitted after + * next transaction commit) or "busy" (blocked by resync). + * + * Maybe more was queued, while we prepared the transaction? + * Try to stuff those into this transaction as well. + * Be strictly non-blocking here, + * we already have something to commit. + * + * Commit if we don't make any more progres. + */ + + while (list_empty(&incoming)) { LIST_HEAD(more_pending); LIST_HEAD(more_incoming); bool made_progress; @@ -1260,55 +1485,32 @@ skip_fast_path: if (list_empty(&device->submit.writes)) break; - spin_lock(&device->submit.lock); + spin_lock_irq(&device->resource->req_lock); list_splice_tail_init(&device->submit.writes, &more_incoming); - spin_unlock(&device->submit.lock); + spin_unlock_irq(&device->resource->req_lock); if (list_empty(&more_incoming)) break; - made_progress = prepare_al_transaction_nonblock(device, &more_incoming, &more_pending); + made_progress = prepare_al_transaction_nonblock(device, &more_incoming, &more_pending, &busy); list_splice_tail_init(&more_pending, &pending); list_splice_tail_init(&more_incoming, &incoming); - if (!made_progress) break; } - drbd_al_begin_io_commit(device, false); - - list_for_each_entry_safe(req, tmp, &pending, tl_requests) { - list_del_init(&req->tl_requests); - drbd_send_and_submit(device, req); - } - /* If all currently hot activity log extents are kept busy by - * incoming requests, we still must not totally starve new - * requests to cold extents. In that case, prepare one request - * in blocking mode. */ - list_for_each_entry_safe(req, tmp, &incoming, tl_requests) { - list_del_init(&req->tl_requests); - req->rq_state |= RQ_IN_ACT_LOG; - if (!drbd_al_begin_io_prepare(device, &req->i)) { - /* Corresponding extent was hot after all? */ - drbd_send_and_submit(device, req); - } else { - /* Found a request to a cold extent. - * Put on "pending" list, - * and try to cumulate with more. */ - list_add(&req->tl_requests, &pending); - goto skip_fast_path; - } - } + drbd_al_begin_io_commit(device); + send_and_submit_pending(device, &pending); } } void drbd_make_request(struct request_queue *q, struct bio *bio) { struct drbd_device *device = (struct drbd_device *) q->queuedata; - unsigned long start_time; + unsigned long start_jif; - start_time = jiffies; + start_jif = jiffies; /* * what we "blindly" assume: @@ -1316,7 +1518,7 @@ void drbd_make_request(struct request_queue *q, struct bio *bio) D_ASSERT(device, IS_ALIGNED(bio->bi_iter.bi_size, 512)); inc_ap_bio(device); - __drbd_make_request(device, bio, start_time); + __drbd_make_request(device, bio, start_jif); } /* This is called by bio_add_page(). @@ -1353,36 +1555,13 @@ int drbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bvm, struct return limit; } -static void find_oldest_requests( - struct drbd_connection *connection, - struct drbd_device *device, - struct drbd_request **oldest_req_waiting_for_peer, - struct drbd_request **oldest_req_waiting_for_disk) -{ - struct drbd_request *r; - *oldest_req_waiting_for_peer = NULL; - *oldest_req_waiting_for_disk = NULL; - list_for_each_entry(r, &connection->transfer_log, tl_requests) { - const unsigned s = r->rq_state; - if (!*oldest_req_waiting_for_peer - && ((s & RQ_NET_MASK) && !(s & RQ_NET_DONE))) - *oldest_req_waiting_for_peer = r; - - if (!*oldest_req_waiting_for_disk - && (s & RQ_LOCAL_PENDING) && r->device == device) - *oldest_req_waiting_for_disk = r; - - if (*oldest_req_waiting_for_peer && *oldest_req_waiting_for_disk) - break; - } -} - void request_timer_fn(unsigned long data) { struct drbd_device *device = (struct drbd_device *) data; struct drbd_connection *connection = first_peer_device(device)->connection; - struct drbd_request *req_disk, *req_peer; /* oldest request */ + struct drbd_request *req_read, *req_write, *req_peer; /* oldest request */ struct net_conf *nc; + unsigned long oldest_submit_jif; unsigned long ent = 0, dt = 0, et, nt; /* effective timeout = ko_count * timeout */ unsigned long now; @@ -1403,14 +1582,31 @@ void request_timer_fn(unsigned long data) return; /* Recurring timer stopped */ now = jiffies; + nt = now + et; spin_lock_irq(&device->resource->req_lock); - find_oldest_requests(connection, device, &req_peer, &req_disk); - if (req_peer == NULL && req_disk == NULL) { - spin_unlock_irq(&device->resource->req_lock); - mod_timer(&device->request_timer, now + et); - return; - } + req_read = list_first_entry_or_null(&device->pending_completion[0], struct drbd_request, req_pending_local); + req_write = list_first_entry_or_null(&device->pending_completion[1], struct drbd_request, req_pending_local); + req_peer = connection->req_not_net_done; + /* maybe the oldest request waiting for the peer is in fact still + * blocking in tcp sendmsg */ + if (!req_peer && connection->req_next && connection->req_next->pre_send_jif) + req_peer = connection->req_next; + + /* evaluate the oldest peer request only in one timer! */ + if (req_peer && req_peer->device != device) + req_peer = NULL; + + /* do we have something to evaluate? */ + if (req_peer == NULL && req_write == NULL && req_read == NULL) + goto out; + + oldest_submit_jif = + (req_write && req_read) + ? ( time_before(req_write->pre_submit_jif, req_read->pre_submit_jif) + ? req_write->pre_submit_jif : req_read->pre_submit_jif ) + : req_write ? req_write->pre_submit_jif + : req_read ? req_read->pre_submit_jif : now; /* The request is considered timed out, if * - we have some effective timeout from the configuration, @@ -1429,13 +1625,13 @@ void request_timer_fn(unsigned long data) * to expire twice (worst case) to become effective. Good enough. */ if (ent && req_peer && - time_after(now, req_peer->start_time + ent) && + time_after(now, req_peer->pre_send_jif + ent) && !time_in_range(now, connection->last_reconnect_jif, connection->last_reconnect_jif + ent)) { drbd_warn(device, "Remote failed to finish a request within ko-count * timeout\n"); _drbd_set_state(_NS(device, conn, C_TIMEOUT), CS_VERBOSE | CS_HARD, NULL); } - if (dt && req_disk && - time_after(now, req_disk->start_time + dt) && + if (dt && oldest_submit_jif != now && + time_after(now, oldest_submit_jif + dt) && !time_in_range(now, device->last_reattach_jif, device->last_reattach_jif + dt)) { drbd_warn(device, "Local backing device failed to meet the disk-timeout\n"); __drbd_chk_io_error(device, DRBD_FORCE_DETACH); @@ -1443,11 +1639,12 @@ void request_timer_fn(unsigned long data) /* Reschedule timer for the nearest not already expired timeout. * Fallback to now + min(effective network timeout, disk timeout). */ - ent = (ent && req_peer && time_before(now, req_peer->start_time + ent)) - ? req_peer->start_time + ent : now + et; - dt = (dt && req_disk && time_before(now, req_disk->start_time + dt)) - ? req_disk->start_time + dt : now + et; + ent = (ent && req_peer && time_before(now, req_peer->pre_send_jif + ent)) + ? req_peer->pre_send_jif + ent : now + et; + dt = (dt && oldest_submit_jif != now && time_before(now, oldest_submit_jif + dt)) + ? oldest_submit_jif + dt : now + et; nt = time_before(ent, dt) ? ent : dt; +out: spin_unlock_irq(&connection->resource->req_lock); mod_timer(&device->request_timer, nt); } diff --git a/drivers/block/drbd/drbd_req.h b/drivers/block/drbd/drbd_req.h index 8566cd5866b4..9f6a04080e9f 100644 --- a/drivers/block/drbd/drbd_req.h +++ b/drivers/block/drbd/drbd_req.h @@ -288,6 +288,7 @@ extern void complete_master_bio(struct drbd_device *device, extern void request_timer_fn(unsigned long data); extern void tl_restart(struct drbd_connection *connection, enum drbd_req_event what); extern void _tl_restart(struct drbd_connection *connection, enum drbd_req_event what); +extern void tl_abort_disk_io(struct drbd_device *device); /* this is in drbd_main.c */ extern void drbd_restart_request(struct drbd_request *req); diff --git a/drivers/block/drbd/drbd_state.c b/drivers/block/drbd/drbd_state.c index a5d8aae00e04..c35c0f001bb7 100644 --- a/drivers/block/drbd/drbd_state.c +++ b/drivers/block/drbd/drbd_state.c @@ -410,7 +410,7 @@ _drbd_request_state(struct drbd_device *device, union drbd_state mask, return rv; } -static void print_st(struct drbd_device *device, char *name, union drbd_state ns) +static void print_st(struct drbd_device *device, const char *name, union drbd_state ns) { drbd_err(device, " %s = { cs:%s ro:%s/%s ds:%s/%s %c%c%c%c%c%c }\n", name, @@ -952,11 +952,12 @@ enum drbd_state_rv __drbd_set_state(struct drbd_device *device, union drbd_state ns, enum chg_state_flags flags, struct completion *done) { + struct drbd_peer_device *peer_device = first_peer_device(device); + struct drbd_connection *connection = peer_device ? peer_device->connection : NULL; union drbd_state os; enum drbd_state_rv rv = SS_SUCCESS; enum sanitize_state_warnings ssw; struct after_state_chg_work *ascw; - bool did_remote, should_do_remote; os = drbd_read_state(device); @@ -978,9 +979,9 @@ __drbd_set_state(struct drbd_device *device, union drbd_state ns, this happen...*/ if (is_valid_state(device, os) == rv) - rv = is_valid_soft_transition(os, ns, first_peer_device(device)->connection); + rv = is_valid_soft_transition(os, ns, connection); } else - rv = is_valid_soft_transition(os, ns, first_peer_device(device)->connection); + rv = is_valid_soft_transition(os, ns, connection); } if (rv < SS_SUCCESS) { @@ -997,7 +998,7 @@ __drbd_set_state(struct drbd_device *device, union drbd_state ns, sanitize_state(). Only display it here if we where not called from _conn_request_state() */ if (!(flags & CS_DC_SUSP)) - conn_pr_state_change(first_peer_device(device)->connection, os, ns, + conn_pr_state_change(connection, os, ns, (flags & ~CS_DC_MASK) | CS_DC_SUSP); /* if we are going -> D_FAILED or D_DISKLESS, grab one extra reference @@ -1008,28 +1009,35 @@ __drbd_set_state(struct drbd_device *device, union drbd_state ns, (os.disk != D_DISKLESS && ns.disk == D_DISKLESS)) atomic_inc(&device->local_cnt); - did_remote = drbd_should_do_remote(device->state); + if (!is_sync_state(os.conn) && is_sync_state(ns.conn)) + clear_bit(RS_DONE, &device->flags); + + /* changes to local_cnt and device flags should be visible before + * changes to state, which again should be visible before anything else + * depending on that change happens. */ + smp_wmb(); device->state.i = ns.i; - should_do_remote = drbd_should_do_remote(device->state); device->resource->susp = ns.susp; device->resource->susp_nod = ns.susp_nod; device->resource->susp_fen = ns.susp_fen; + smp_wmb(); /* put replicated vs not-replicated requests in seperate epochs */ - if (did_remote != should_do_remote) - start_new_tl_epoch(first_peer_device(device)->connection); + if (drbd_should_do_remote((union drbd_dev_state)os.i) != + drbd_should_do_remote((union drbd_dev_state)ns.i)) + start_new_tl_epoch(connection); if (os.disk == D_ATTACHING && ns.disk >= D_NEGOTIATING) drbd_print_uuids(device, "attached to UUIDs"); /* Wake up role changes, that were delayed because of connection establishing */ if (os.conn == C_WF_REPORT_PARAMS && ns.conn != C_WF_REPORT_PARAMS && - no_peer_wf_report_params(first_peer_device(device)->connection)) - clear_bit(STATE_SENT, &first_peer_device(device)->connection->flags); + no_peer_wf_report_params(connection)) + clear_bit(STATE_SENT, &connection->flags); wake_up(&device->misc_wait); wake_up(&device->state_wait); - wake_up(&first_peer_device(device)->connection->ping_wait); + wake_up(&connection->ping_wait); /* Aborted verify run, or we reached the stop sector. * Log the last position, unless end-of-device. */ @@ -1118,21 +1126,21 @@ __drbd_set_state(struct drbd_device *device, union drbd_state ns, /* Receiver should clean up itself */ if (os.conn != C_DISCONNECTING && ns.conn == C_DISCONNECTING) - drbd_thread_stop_nowait(&first_peer_device(device)->connection->receiver); + drbd_thread_stop_nowait(&connection->receiver); /* Now the receiver finished cleaning up itself, it should die */ if (os.conn != C_STANDALONE && ns.conn == C_STANDALONE) - drbd_thread_stop_nowait(&first_peer_device(device)->connection->receiver); + drbd_thread_stop_nowait(&connection->receiver); /* Upon network failure, we need to restart the receiver. */ if (os.conn > C_WF_CONNECTION && ns.conn <= C_TEAR_DOWN && ns.conn >= C_TIMEOUT) - drbd_thread_restart_nowait(&first_peer_device(device)->connection->receiver); + drbd_thread_restart_nowait(&connection->receiver); /* Resume AL writing if we get a connection */ if (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED) { drbd_resume_al(device); - first_peer_device(device)->connection->connect_cnt++; + connection->connect_cnt++; } /* remember last attach time so request_timer_fn() won't @@ -1150,7 +1158,7 @@ __drbd_set_state(struct drbd_device *device, union drbd_state ns, ascw->w.cb = w_after_state_ch; ascw->device = device; ascw->done = done; - drbd_queue_work(&first_peer_device(device)->connection->sender_work, + drbd_queue_work(&connection->sender_work, &ascw->w); } else { drbd_err(device, "Could not kmalloc an ascw\n"); @@ -1222,13 +1230,16 @@ static void after_state_ch(struct drbd_device *device, union drbd_state os, union drbd_state ns, enum chg_state_flags flags) { struct drbd_resource *resource = device->resource; + struct drbd_peer_device *peer_device = first_peer_device(device); + struct drbd_connection *connection = peer_device ? peer_device->connection : NULL; struct sib_info sib; sib.sib_reason = SIB_STATE_CHANGE; sib.os = os; sib.ns = ns; - if (os.conn != C_CONNECTED && ns.conn == C_CONNECTED) { + if ((os.disk != D_UP_TO_DATE || os.pdsk != D_UP_TO_DATE) + && (ns.disk == D_UP_TO_DATE && ns.pdsk == D_UP_TO_DATE)) { clear_bit(CRASHED_PRIMARY, &device->flags); if (device->p_uuid) device->p_uuid[UI_FLAGS] &= ~((u64)2); @@ -1245,7 +1256,6 @@ static void after_state_ch(struct drbd_device *device, union drbd_state os, state change. This function might sleep */ if (ns.susp_nod) { - struct drbd_connection *connection = first_peer_device(device)->connection; enum drbd_req_event what = NOTHING; spin_lock_irq(&device->resource->req_lock); @@ -1267,8 +1277,6 @@ static void after_state_ch(struct drbd_device *device, union drbd_state os, } if (ns.susp_fen) { - struct drbd_connection *connection = first_peer_device(device)->connection; - spin_lock_irq(&device->resource->req_lock); if (resource->susp_fen && conn_lowest_conn(connection) >= C_CONNECTED) { /* case2: The connection was established again: */ @@ -1294,8 +1302,8 @@ static void after_state_ch(struct drbd_device *device, union drbd_state os, * which is unexpected. */ if ((os.conn != C_SYNC_SOURCE && os.conn != C_PAUSED_SYNC_S) && (ns.conn == C_SYNC_SOURCE || ns.conn == C_PAUSED_SYNC_S) && - first_peer_device(device)->connection->agreed_pro_version >= 96 && get_ldev(device)) { - drbd_gen_and_send_sync_uuid(first_peer_device(device)); + connection->agreed_pro_version >= 96 && get_ldev(device)) { + drbd_gen_and_send_sync_uuid(peer_device); put_ldev(device); } @@ -1309,8 +1317,8 @@ static void after_state_ch(struct drbd_device *device, union drbd_state os, atomic_set(&device->rs_pending_cnt, 0); drbd_rs_cancel_all(device); - drbd_send_uuids(first_peer_device(device)); - drbd_send_state(first_peer_device(device), ns); + drbd_send_uuids(peer_device); + drbd_send_state(peer_device, ns); } /* No point in queuing send_bitmap if we don't have a connection * anymore, so check also the _current_ state, not only the new state @@ -1335,7 +1343,7 @@ static void after_state_ch(struct drbd_device *device, union drbd_state os, set_bit(NEW_CUR_UUID, &device->flags); } else { drbd_uuid_new_current(device); - drbd_send_uuids(first_peer_device(device)); + drbd_send_uuids(peer_device); } } put_ldev(device); @@ -1346,7 +1354,7 @@ static void after_state_ch(struct drbd_device *device, union drbd_state os, if (os.peer == R_SECONDARY && ns.peer == R_PRIMARY && device->ldev->md.uuid[UI_BITMAP] == 0 && ns.disk >= D_UP_TO_DATE) { drbd_uuid_new_current(device); - drbd_send_uuids(first_peer_device(device)); + drbd_send_uuids(peer_device); } /* D_DISKLESS Peer becomes secondary */ if (os.peer == R_PRIMARY && ns.peer == R_SECONDARY) @@ -1373,16 +1381,16 @@ static void after_state_ch(struct drbd_device *device, union drbd_state os, /* Last part of the attaching process ... */ if (ns.conn >= C_CONNECTED && os.disk == D_ATTACHING && ns.disk == D_NEGOTIATING) { - drbd_send_sizes(first_peer_device(device), 0, 0); /* to start sync... */ - drbd_send_uuids(first_peer_device(device)); - drbd_send_state(first_peer_device(device), ns); + drbd_send_sizes(peer_device, 0, 0); /* to start sync... */ + drbd_send_uuids(peer_device); + drbd_send_state(peer_device, ns); } /* We want to pause/continue resync, tell peer. */ if (ns.conn >= C_CONNECTED && ((os.aftr_isp != ns.aftr_isp) || (os.user_isp != ns.user_isp))) - drbd_send_state(first_peer_device(device), ns); + drbd_send_state(peer_device, ns); /* In case one of the isp bits got set, suspend other devices. */ if ((!os.aftr_isp && !os.peer_isp && !os.user_isp) && @@ -1392,10 +1400,10 @@ static void after_state_ch(struct drbd_device *device, union drbd_state os, /* Make sure the peer gets informed about eventual state changes (ISP bits) while we were in WFReportParams. */ if (os.conn == C_WF_REPORT_PARAMS && ns.conn >= C_CONNECTED) - drbd_send_state(first_peer_device(device), ns); + drbd_send_state(peer_device, ns); if (os.conn != C_AHEAD && ns.conn == C_AHEAD) - drbd_send_state(first_peer_device(device), ns); + drbd_send_state(peer_device, ns); /* We are in the progress to start a full sync... */ if ((os.conn != C_STARTING_SYNC_T && ns.conn == C_STARTING_SYNC_T) || @@ -1449,7 +1457,7 @@ static void after_state_ch(struct drbd_device *device, union drbd_state os, drbd_disk_str(device->state.disk)); if (ns.conn >= C_CONNECTED) - drbd_send_state(first_peer_device(device), ns); + drbd_send_state(peer_device, ns); drbd_rs_cancel_all(device); @@ -1473,7 +1481,7 @@ static void after_state_ch(struct drbd_device *device, union drbd_state os, drbd_disk_str(device->state.disk)); if (ns.conn >= C_CONNECTED) - drbd_send_state(first_peer_device(device), ns); + drbd_send_state(peer_device, ns); /* corresponding get_ldev in __drbd_set_state * this may finally trigger drbd_ldev_destroy. */ put_ldev(device); @@ -1481,7 +1489,7 @@ static void after_state_ch(struct drbd_device *device, union drbd_state os, /* Notify peer that I had a local IO error, and did not detached.. */ if (os.disk == D_UP_TO_DATE && ns.disk == D_INCONSISTENT && ns.conn >= C_CONNECTED) - drbd_send_state(first_peer_device(device), ns); + drbd_send_state(peer_device, ns); /* Disks got bigger while they were detached */ if (ns.disk > D_NEGOTIATING && ns.pdsk > D_NEGOTIATING && @@ -1499,14 +1507,14 @@ static void after_state_ch(struct drbd_device *device, union drbd_state os, /* sync target done with resync. Explicitly notify peer, even though * it should (at least for non-empty resyncs) already know itself. */ if (os.disk < D_UP_TO_DATE && os.conn >= C_SYNC_SOURCE && ns.conn == C_CONNECTED) - drbd_send_state(first_peer_device(device), ns); + drbd_send_state(peer_device, ns); /* Verify finished, or reached stop sector. Peer did not know about * the stop sector, and we may even have changed the stop sector during * verify to interrupt/stop early. Send the new state. */ if (os.conn == C_VERIFY_S && ns.conn == C_CONNECTED && verify_can_do_stop_sector(device)) - drbd_send_state(first_peer_device(device), ns); + drbd_send_state(peer_device, ns); /* This triggers bitmap writeout of potentially still unwritten pages * if the resync finished cleanly, or aborted because of peer disk @@ -1563,7 +1571,7 @@ static int w_after_conn_state_ch(struct drbd_work *w, int unused) old_conf = connection->net_conf; connection->my_addr_len = 0; connection->peer_addr_len = 0; - rcu_assign_pointer(connection->net_conf, NULL); + RCU_INIT_POINTER(connection->net_conf, NULL); conn_free_crypto(connection); mutex_unlock(&connection->resource->conf_update); @@ -1599,7 +1607,7 @@ static int w_after_conn_state_ch(struct drbd_work *w, int unused) return 0; } -void conn_old_common_state(struct drbd_connection *connection, union drbd_state *pcs, enum chg_state_flags *pf) +static void conn_old_common_state(struct drbd_connection *connection, union drbd_state *pcs, enum chg_state_flags *pf) { enum chg_state_flags flags = ~0; struct drbd_peer_device *peer_device; @@ -1688,7 +1696,7 @@ conn_is_valid_transition(struct drbd_connection *connection, union drbd_state ma return rv; } -void +static void conn_set_state(struct drbd_connection *connection, union drbd_state mask, union drbd_state val, union drbd_state *pns_min, union drbd_state *pns_max, enum chg_state_flags flags) { diff --git a/drivers/block/drbd/drbd_worker.c b/drivers/block/drbd/drbd_worker.c index d8f57b6305cd..50776b362828 100644 --- a/drivers/block/drbd/drbd_worker.c +++ b/drivers/block/drbd/drbd_worker.c @@ -67,13 +67,10 @@ rwlock_t global_state_lock; */ void drbd_md_io_complete(struct bio *bio, int error) { - struct drbd_md_io *md_io; struct drbd_device *device; - md_io = (struct drbd_md_io *)bio->bi_private; - device = container_of(md_io, struct drbd_device, md_io); - - md_io->error = error; + device = bio->bi_private; + device->md_io.error = error; /* We grabbed an extra reference in _drbd_md_sync_page_io() to be able * to timeout on the lower level device, and eventually detach from it. @@ -87,7 +84,7 @@ void drbd_md_io_complete(struct bio *bio, int error) * ASSERT(atomic_read(&device->md_io_in_use) == 1) there. */ drbd_md_put_buffer(device); - md_io->done = 1; + device->md_io.done = 1; wake_up(&device->misc_wait); bio_put(bio); if (device->ldev) /* special case: drbd_md_read() during drbd_adm_attach() */ @@ -135,6 +132,7 @@ void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(l i = peer_req->i; do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO; block_id = peer_req->block_id; + peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO; spin_lock_irqsave(&device->resource->req_lock, flags); device->writ_cnt += peer_req->i.size >> 9; @@ -398,9 +396,6 @@ static int read_for_csum(struct drbd_peer_device *peer_device, sector_t sector, if (!get_ldev(device)) return -EIO; - if (drbd_rs_should_slow_down(device, sector)) - goto defer; - /* GFP_TRY, because if there is no memory available right now, this may * be rescheduled for later. It is "only" background resync, after all. */ peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER /* unused */, sector, @@ -410,7 +405,7 @@ static int read_for_csum(struct drbd_peer_device *peer_device, sector_t sector, peer_req->w.cb = w_e_send_csum; spin_lock_irq(&device->resource->req_lock); - list_add(&peer_req->w.list, &device->read_ee); + list_add_tail(&peer_req->w.list, &device->read_ee); spin_unlock_irq(&device->resource->req_lock); atomic_add(size >> 9, &device->rs_sect_ev); @@ -452,9 +447,9 @@ void resync_timer_fn(unsigned long data) { struct drbd_device *device = (struct drbd_device *) data; - if (list_empty(&device->resync_work.list)) - drbd_queue_work(&first_peer_device(device)->connection->sender_work, - &device->resync_work); + drbd_queue_work_if_unqueued( + &first_peer_device(device)->connection->sender_work, + &device->resync_work); } static void fifo_set(struct fifo_buffer *fb, int value) @@ -504,9 +499,9 @@ struct fifo_buffer *fifo_alloc(int fifo_size) static int drbd_rs_controller(struct drbd_device *device, unsigned int sect_in) { struct disk_conf *dc; - unsigned int want; /* The number of sectors we want in the proxy */ + unsigned int want; /* The number of sectors we want in-flight */ int req_sect; /* Number of sectors to request in this turn */ - int correction; /* Number of sectors more we need in the proxy*/ + int correction; /* Number of sectors more we need in-flight */ int cps; /* correction per invocation of drbd_rs_controller() */ int steps; /* Number of time steps to plan ahead */ int curr_corr; @@ -577,20 +572,27 @@ static int drbd_rs_number_requests(struct drbd_device *device) * potentially causing a distributed deadlock on congestion during * online-verify or (checksum-based) resync, if max-buffers, * socket buffer sizes and resync rate settings are mis-configured. */ - if (mxb - device->rs_in_flight < number) - number = mxb - device->rs_in_flight; + + /* note that "number" is in units of "BM_BLOCK_SIZE" (which is 4k), + * mxb (as used here, and in drbd_alloc_pages on the peer) is + * "number of pages" (typically also 4k), + * but "rs_in_flight" is in "sectors" (512 Byte). */ + if (mxb - device->rs_in_flight/8 < number) + number = mxb - device->rs_in_flight/8; return number; } -static int make_resync_request(struct drbd_device *device, int cancel) +static int make_resync_request(struct drbd_device *const device, int cancel) { + struct drbd_peer_device *const peer_device = first_peer_device(device); + struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL; unsigned long bit; sector_t sector; const sector_t capacity = drbd_get_capacity(device->this_bdev); int max_bio_size; int number, rollback_i, size; - int align, queued, sndbuf; + int align, requeue = 0; int i = 0; if (unlikely(cancel)) @@ -617,17 +619,22 @@ static int make_resync_request(struct drbd_device *device, int cancel) goto requeue; for (i = 0; i < number; i++) { - /* Stop generating RS requests, when half of the send buffer is filled */ - mutex_lock(&first_peer_device(device)->connection->data.mutex); - if (first_peer_device(device)->connection->data.socket) { - queued = first_peer_device(device)->connection->data.socket->sk->sk_wmem_queued; - sndbuf = first_peer_device(device)->connection->data.socket->sk->sk_sndbuf; - } else { - queued = 1; - sndbuf = 0; - } - mutex_unlock(&first_peer_device(device)->connection->data.mutex); - if (queued > sndbuf / 2) + /* Stop generating RS requests when half of the send buffer is filled, + * but notify TCP that we'd like to have more space. */ + mutex_lock(&connection->data.mutex); + if (connection->data.socket) { + struct sock *sk = connection->data.socket->sk; + int queued = sk->sk_wmem_queued; + int sndbuf = sk->sk_sndbuf; + if (queued > sndbuf / 2) { + requeue = 1; + if (sk->sk_socket) + set_bit(SOCK_NOSPACE, &sk->sk_socket->flags); + } + } else + requeue = 1; + mutex_unlock(&connection->data.mutex); + if (requeue) goto requeue; next_sector: @@ -642,8 +649,7 @@ next_sector: sector = BM_BIT_TO_SECT(bit); - if (drbd_rs_should_slow_down(device, sector) || - drbd_try_rs_begin_io(device, sector)) { + if (drbd_try_rs_begin_io(device, sector)) { device->bm_resync_fo = bit; goto requeue; } @@ -696,9 +702,9 @@ next_sector: /* adjust very last sectors, in case we are oddly sized */ if (sector + (size>>9) > capacity) size = (capacity-sector)<<9; - if (first_peer_device(device)->connection->agreed_pro_version >= 89 && - first_peer_device(device)->connection->csums_tfm) { - switch (read_for_csum(first_peer_device(device), sector, size)) { + + if (device->use_csums) { + switch (read_for_csum(peer_device, sector, size)) { case -EIO: /* Disk failure */ put_ldev(device); return -EIO; @@ -717,7 +723,7 @@ next_sector: int err; inc_rs_pending(device); - err = drbd_send_drequest(first_peer_device(device), P_RS_DATA_REQUEST, + err = drbd_send_drequest(peer_device, P_RS_DATA_REQUEST, sector, size, ID_SYNCER); if (err) { drbd_err(device, "drbd_send_drequest() failed, aborting...\n"); @@ -774,8 +780,7 @@ static int make_ov_request(struct drbd_device *device, int cancel) size = BM_BLOCK_SIZE; - if (drbd_rs_should_slow_down(device, sector) || - drbd_try_rs_begin_io(device, sector)) { + if (drbd_try_rs_begin_io(device, sector)) { device->ov_position = sector; goto requeue; } @@ -911,7 +916,7 @@ int drbd_resync_finished(struct drbd_device *device) if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) khelper_cmd = "after-resync-target"; - if (first_peer_device(device)->connection->csums_tfm && device->rs_total) { + if (device->use_csums && device->rs_total) { const unsigned long s = device->rs_same_csum; const unsigned long t = device->rs_total; const int ratio = @@ -1351,13 +1356,15 @@ int w_send_out_of_sync(struct drbd_work *w, int cancel) { struct drbd_request *req = container_of(w, struct drbd_request, w); struct drbd_device *device = req->device; - struct drbd_connection *connection = first_peer_device(device)->connection; + struct drbd_peer_device *const peer_device = first_peer_device(device); + struct drbd_connection *const connection = peer_device->connection; int err; if (unlikely(cancel)) { req_mod(req, SEND_CANCELED); return 0; } + req->pre_send_jif = jiffies; /* this time, no connection->send.current_epoch_writes++; * If it was sent, it was the closing barrier for the last @@ -1365,7 +1372,7 @@ int w_send_out_of_sync(struct drbd_work *w, int cancel) * No more barriers will be sent, until we leave AHEAD mode again. */ maybe_send_barrier(connection, req->epoch); - err = drbd_send_out_of_sync(first_peer_device(device), req); + err = drbd_send_out_of_sync(peer_device, req); req_mod(req, OOS_HANDED_TO_NETWORK); return err; @@ -1380,19 +1387,21 @@ int w_send_dblock(struct drbd_work *w, int cancel) { struct drbd_request *req = container_of(w, struct drbd_request, w); struct drbd_device *device = req->device; - struct drbd_connection *connection = first_peer_device(device)->connection; + struct drbd_peer_device *const peer_device = first_peer_device(device); + struct drbd_connection *connection = peer_device->connection; int err; if (unlikely(cancel)) { req_mod(req, SEND_CANCELED); return 0; } + req->pre_send_jif = jiffies; re_init_if_first_write(connection, req->epoch); maybe_send_barrier(connection, req->epoch); connection->send.current_epoch_writes++; - err = drbd_send_dblock(first_peer_device(device), req); + err = drbd_send_dblock(peer_device, req); req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK); return err; @@ -1407,19 +1416,21 @@ int w_send_read_req(struct drbd_work *w, int cancel) { struct drbd_request *req = container_of(w, struct drbd_request, w); struct drbd_device *device = req->device; - struct drbd_connection *connection = first_peer_device(device)->connection; + struct drbd_peer_device *const peer_device = first_peer_device(device); + struct drbd_connection *connection = peer_device->connection; int err; if (unlikely(cancel)) { req_mod(req, SEND_CANCELED); return 0; } + req->pre_send_jif = jiffies; /* Even read requests may close a write epoch, * if there was any yet. */ maybe_send_barrier(connection, req->epoch); - err = drbd_send_drequest(first_peer_device(device), P_DATA_REQUEST, req->i.sector, req->i.size, + err = drbd_send_drequest(peer_device, P_DATA_REQUEST, req->i.sector, req->i.size, (unsigned long)req); req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK); @@ -1433,7 +1444,7 @@ int w_restart_disk_io(struct drbd_work *w, int cancel) struct drbd_device *device = req->device; if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG) - drbd_al_begin_io(device, &req->i, false); + drbd_al_begin_io(device, &req->i); drbd_req_make_private_bio(req, req->master_bio); req->private_bio->bi_bdev = device->ldev->backing_bdev; @@ -1601,26 +1612,32 @@ void drbd_rs_controller_reset(struct drbd_device *device) void start_resync_timer_fn(unsigned long data) { struct drbd_device *device = (struct drbd_device *) data; - - drbd_queue_work(&first_peer_device(device)->connection->sender_work, - &device->start_resync_work); + drbd_device_post_work(device, RS_START); } -int w_start_resync(struct drbd_work *w, int cancel) +static void do_start_resync(struct drbd_device *device) { - struct drbd_device *device = - container_of(w, struct drbd_device, start_resync_work); - if (atomic_read(&device->unacked_cnt) || atomic_read(&device->rs_pending_cnt)) { - drbd_warn(device, "w_start_resync later...\n"); + drbd_warn(device, "postponing start_resync ...\n"); device->start_resync_timer.expires = jiffies + HZ/10; add_timer(&device->start_resync_timer); - return 0; + return; } drbd_start_resync(device, C_SYNC_SOURCE); clear_bit(AHEAD_TO_SYNC_SOURCE, &device->flags); - return 0; +} + +static bool use_checksum_based_resync(struct drbd_connection *connection, struct drbd_device *device) +{ + bool csums_after_crash_only; + rcu_read_lock(); + csums_after_crash_only = rcu_dereference(connection->net_conf)->csums_after_crash_only; + rcu_read_unlock(); + return connection->agreed_pro_version >= 89 && /* supported? */ + connection->csums_tfm && /* configured? */ + (csums_after_crash_only == 0 /* use for each resync? */ + || test_bit(CRASHED_PRIMARY, &device->flags)); /* or only after Primary crash? */ } /** @@ -1633,6 +1650,8 @@ int w_start_resync(struct drbd_work *w, int cancel) */ void drbd_start_resync(struct drbd_device *device, enum drbd_conns side) { + struct drbd_peer_device *peer_device = first_peer_device(device); + struct drbd_connection *connection = peer_device ? peer_device->connection : NULL; union drbd_state ns; int r; @@ -1651,7 +1670,7 @@ void drbd_start_resync(struct drbd_device *device, enum drbd_conns side) if (r > 0) { drbd_info(device, "before-resync-target handler returned %d, " "dropping connection.\n", r); - conn_request_state(first_peer_device(device)->connection, NS(conn, C_DISCONNECTING), CS_HARD); + conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD); return; } } else /* C_SYNC_SOURCE */ { @@ -1664,7 +1683,7 @@ void drbd_start_resync(struct drbd_device *device, enum drbd_conns side) } else { drbd_info(device, "before-resync-source handler returned %d, " "dropping connection.\n", r); - conn_request_state(first_peer_device(device)->connection, + conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD); return; } @@ -1672,7 +1691,7 @@ void drbd_start_resync(struct drbd_device *device, enum drbd_conns side) } } - if (current == first_peer_device(device)->connection->worker.task) { + if (current == connection->worker.task) { /* The worker should not sleep waiting for state_mutex, that can take long */ if (!mutex_trylock(device->state_mutex)) { @@ -1733,11 +1752,20 @@ void drbd_start_resync(struct drbd_device *device, enum drbd_conns side) device->rs_mark_time[i] = now; } _drbd_pause_after(device); + /* Forget potentially stale cached per resync extent bit-counts. + * Open coded drbd_rs_cancel_all(device), we already have IRQs + * disabled, and know the disk state is ok. */ + spin_lock(&device->al_lock); + lc_reset(device->resync); + device->resync_locked = 0; + device->resync_wenr = LC_FREE; + spin_unlock(&device->al_lock); } write_unlock(&global_state_lock); spin_unlock_irq(&device->resource->req_lock); if (r == SS_SUCCESS) { + wake_up(&device->al_wait); /* for lc_reset() above */ /* reset rs_last_bcast when a resync or verify is started, * to deal with potential jiffies wrap. */ device->rs_last_bcast = jiffies - HZ; @@ -1746,8 +1774,12 @@ void drbd_start_resync(struct drbd_device *device, enum drbd_conns side) drbd_conn_str(ns.conn), (unsigned long) device->rs_total << (BM_BLOCK_SHIFT-10), (unsigned long) device->rs_total); - if (side == C_SYNC_TARGET) + if (side == C_SYNC_TARGET) { device->bm_resync_fo = 0; + device->use_csums = use_checksum_based_resync(connection, device); + } else { + device->use_csums = 0; + } /* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid * with w_send_oos, or the sync target will get confused as to @@ -1756,12 +1788,10 @@ void drbd_start_resync(struct drbd_device *device, enum drbd_conns side) * drbd_resync_finished from here in that case. * We drbd_gen_and_send_sync_uuid here for protocol < 96, * and from after_state_ch otherwise. */ - if (side == C_SYNC_SOURCE && - first_peer_device(device)->connection->agreed_pro_version < 96) - drbd_gen_and_send_sync_uuid(first_peer_device(device)); + if (side == C_SYNC_SOURCE && connection->agreed_pro_version < 96) + drbd_gen_and_send_sync_uuid(peer_device); - if (first_peer_device(device)->connection->agreed_pro_version < 95 && - device->rs_total == 0) { + if (connection->agreed_pro_version < 95 && device->rs_total == 0) { /* This still has a race (about when exactly the peers * detect connection loss) that can lead to a full sync * on next handshake. In 8.3.9 we fixed this with explicit @@ -1777,7 +1807,7 @@ void drbd_start_resync(struct drbd_device *device, enum drbd_conns side) int timeo; rcu_read_lock(); - nc = rcu_dereference(first_peer_device(device)->connection->net_conf); + nc = rcu_dereference(connection->net_conf); timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9; rcu_read_unlock(); schedule_timeout_interruptible(timeo); @@ -1799,10 +1829,165 @@ void drbd_start_resync(struct drbd_device *device, enum drbd_conns side) mutex_unlock(device->state_mutex); } +static void update_on_disk_bitmap(struct drbd_device *device, bool resync_done) +{ + struct sib_info sib = { .sib_reason = SIB_SYNC_PROGRESS, }; + device->rs_last_bcast = jiffies; + + if (!get_ldev(device)) + return; + + drbd_bm_write_lazy(device, 0); + if (resync_done && is_sync_state(device->state.conn)) + drbd_resync_finished(device); + + drbd_bcast_event(device, &sib); + /* update timestamp, in case it took a while to write out stuff */ + device->rs_last_bcast = jiffies; + put_ldev(device); +} + +static void drbd_ldev_destroy(struct drbd_device *device) +{ + lc_destroy(device->resync); + device->resync = NULL; + lc_destroy(device->act_log); + device->act_log = NULL; + __no_warn(local, + drbd_free_ldev(device->ldev); + device->ldev = NULL;); + clear_bit(GOING_DISKLESS, &device->flags); + wake_up(&device->misc_wait); +} + +static void go_diskless(struct drbd_device *device) +{ + D_ASSERT(device, device->state.disk == D_FAILED); + /* we cannot assert local_cnt == 0 here, as get_ldev_if_state will + * inc/dec it frequently. Once we are D_DISKLESS, no one will touch + * the protected members anymore, though, so once put_ldev reaches zero + * again, it will be safe to free them. */ + + /* Try to write changed bitmap pages, read errors may have just + * set some bits outside the area covered by the activity log. + * + * If we have an IO error during the bitmap writeout, + * we will want a full sync next time, just in case. + * (Do we want a specific meta data flag for this?) + * + * If that does not make it to stable storage either, + * we cannot do anything about that anymore. + * + * We still need to check if both bitmap and ldev are present, we may + * end up here after a failed attach, before ldev was even assigned. + */ + if (device->bitmap && device->ldev) { + /* An interrupted resync or similar is allowed to recounts bits + * while we detach. + * Any modifications would not be expected anymore, though. + */ + if (drbd_bitmap_io_from_worker(device, drbd_bm_write, + "detach", BM_LOCKED_TEST_ALLOWED)) { + if (test_bit(WAS_READ_ERROR, &device->flags)) { + drbd_md_set_flag(device, MDF_FULL_SYNC); + drbd_md_sync(device); + } + } + } + + drbd_force_state(device, NS(disk, D_DISKLESS)); +} + +static int do_md_sync(struct drbd_device *device) +{ + drbd_warn(device, "md_sync_timer expired! Worker calls drbd_md_sync().\n"); + drbd_md_sync(device); + return 0; +} + +/* only called from drbd_worker thread, no locking */ +void __update_timing_details( + struct drbd_thread_timing_details *tdp, + unsigned int *cb_nr, + void *cb, + const char *fn, const unsigned int line) +{ + unsigned int i = *cb_nr % DRBD_THREAD_DETAILS_HIST; + struct drbd_thread_timing_details *td = tdp + i; + + td->start_jif = jiffies; + td->cb_addr = cb; + td->caller_fn = fn; + td->line = line; + td->cb_nr = *cb_nr; + + i = (i+1) % DRBD_THREAD_DETAILS_HIST; + td = tdp + i; + memset(td, 0, sizeof(*td)); + + ++(*cb_nr); +} + +#define WORK_PENDING(work_bit, todo) (todo & (1UL << work_bit)) +static void do_device_work(struct drbd_device *device, const unsigned long todo) +{ + if (WORK_PENDING(MD_SYNC, todo)) + do_md_sync(device); + if (WORK_PENDING(RS_DONE, todo) || + WORK_PENDING(RS_PROGRESS, todo)) + update_on_disk_bitmap(device, WORK_PENDING(RS_DONE, todo)); + if (WORK_PENDING(GO_DISKLESS, todo)) + go_diskless(device); + if (WORK_PENDING(DESTROY_DISK, todo)) + drbd_ldev_destroy(device); + if (WORK_PENDING(RS_START, todo)) + do_start_resync(device); +} + +#define DRBD_DEVICE_WORK_MASK \ + ((1UL << GO_DISKLESS) \ + |(1UL << DESTROY_DISK) \ + |(1UL << MD_SYNC) \ + |(1UL << RS_START) \ + |(1UL << RS_PROGRESS) \ + |(1UL << RS_DONE) \ + ) + +static unsigned long get_work_bits(unsigned long *flags) +{ + unsigned long old, new; + do { + old = *flags; + new = old & ~DRBD_DEVICE_WORK_MASK; + } while (cmpxchg(flags, old, new) != old); + return old & DRBD_DEVICE_WORK_MASK; +} + +static void do_unqueued_work(struct drbd_connection *connection) +{ + struct drbd_peer_device *peer_device; + int vnr; + + rcu_read_lock(); + idr_for_each_entry(&connection->peer_devices, peer_device, vnr) { + struct drbd_device *device = peer_device->device; + unsigned long todo = get_work_bits(&device->flags); + if (!todo) + continue; + + kref_get(&device->kref); + rcu_read_unlock(); + do_device_work(device, todo); + kref_put(&device->kref, drbd_destroy_device); + rcu_read_lock(); + } + rcu_read_unlock(); +} + static bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list) { spin_lock_irq(&queue->q_lock); - list_splice_init(&queue->q, work_list); + list_splice_tail_init(&queue->q, work_list); spin_unlock_irq(&queue->q_lock); return !list_empty(work_list); } @@ -1851,7 +2036,7 @@ static void wait_for_work(struct drbd_connection *connection, struct list_head * /* dequeue single item only, * we still use drbd_queue_work_front() in some places */ if (!list_empty(&connection->sender_work.q)) - list_move(connection->sender_work.q.next, work_list); + list_splice_tail_init(&connection->sender_work.q, work_list); spin_unlock(&connection->sender_work.q_lock); /* FIXME get rid of this one? */ if (!list_empty(work_list) || signal_pending(current)) { spin_unlock_irq(&connection->resource->req_lock); @@ -1873,6 +2058,14 @@ static void wait_for_work(struct drbd_connection *connection, struct list_head * if (send_barrier) maybe_send_barrier(connection, connection->send.current_epoch_nr + 1); + + if (test_bit(DEVICE_WORK_PENDING, &connection->flags)) + break; + + /* drbd_send() may have called flush_signals() */ + if (get_t_state(&connection->worker) != RUNNING) + break; + schedule(); /* may be woken up for other things but new work, too, * e.g. if the current epoch got closed. @@ -1906,10 +2099,15 @@ int drbd_worker(struct drbd_thread *thi) while (get_t_state(thi) == RUNNING) { drbd_thread_current_set_cpu(thi); - /* as long as we use drbd_queue_work_front(), - * we may only dequeue single work items here, not batches. */ - if (list_empty(&work_list)) + if (list_empty(&work_list)) { + update_worker_timing_details(connection, wait_for_work); wait_for_work(connection, &work_list); + } + + if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) { + update_worker_timing_details(connection, do_unqueued_work); + do_unqueued_work(connection); + } if (signal_pending(current)) { flush_signals(current); @@ -1926,6 +2124,7 @@ int drbd_worker(struct drbd_thread *thi) while (!list_empty(&work_list)) { w = list_first_entry(&work_list, struct drbd_work, list); list_del_init(&w->list); + update_worker_timing_details(connection, w->cb); if (w->cb(w, connection->cstate < C_WF_REPORT_PARAMS) == 0) continue; if (connection->cstate >= C_WF_REPORT_PARAMS) @@ -1934,13 +2133,18 @@ int drbd_worker(struct drbd_thread *thi) } do { + if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) { + update_worker_timing_details(connection, do_unqueued_work); + do_unqueued_work(connection); + } while (!list_empty(&work_list)) { w = list_first_entry(&work_list, struct drbd_work, list); list_del_init(&w->list); + update_worker_timing_details(connection, w->cb); w->cb(w, 1); } dequeue_work_batch(&connection->sender_work, &work_list); - } while (!list_empty(&work_list)); + } while (!list_empty(&work_list) || test_bit(DEVICE_WORK_PENDING, &connection->flags)); rcu_read_lock(); idr_for_each_entry(&connection->peer_devices, peer_device, vnr) { diff --git a/drivers/block/virtio_blk.c b/drivers/block/virtio_blk.c index f63d358f3d93..0a581400de0f 100644 --- a/drivers/block/virtio_blk.c +++ b/drivers/block/virtio_blk.c @@ -15,17 +15,22 @@ #include <linux/numa.h> #define PART_BITS 4 +#define VQ_NAME_LEN 16 static int major; static DEFINE_IDA(vd_index_ida); static struct workqueue_struct *virtblk_wq; +struct virtio_blk_vq { + struct virtqueue *vq; + spinlock_t lock; + char name[VQ_NAME_LEN]; +} ____cacheline_aligned_in_smp; + struct virtio_blk { struct virtio_device *vdev; - struct virtqueue *vq; - spinlock_t vq_lock; /* The disk structure for the kernel. */ struct gendisk *disk; @@ -47,6 +52,10 @@ struct virtio_blk /* Ida index - used to track minor number allocations. */ int index; + + /* num of vqs */ + int num_vqs; + struct virtio_blk_vq *vqs; }; struct virtblk_req @@ -133,14 +142,15 @@ static void virtblk_done(struct virtqueue *vq) { struct virtio_blk *vblk = vq->vdev->priv; bool req_done = false; + int qid = vq->index; struct virtblk_req *vbr; unsigned long flags; unsigned int len; - spin_lock_irqsave(&vblk->vq_lock, flags); + spin_lock_irqsave(&vblk->vqs[qid].lock, flags); do { virtqueue_disable_cb(vq); - while ((vbr = virtqueue_get_buf(vblk->vq, &len)) != NULL) { + while ((vbr = virtqueue_get_buf(vblk->vqs[qid].vq, &len)) != NULL) { blk_mq_complete_request(vbr->req); req_done = true; } @@ -151,7 +161,7 @@ static void virtblk_done(struct virtqueue *vq) /* In case queue is stopped waiting for more buffers. */ if (req_done) blk_mq_start_stopped_hw_queues(vblk->disk->queue, true); - spin_unlock_irqrestore(&vblk->vq_lock, flags); + spin_unlock_irqrestore(&vblk->vqs[qid].lock, flags); } static int virtio_queue_rq(struct blk_mq_hw_ctx *hctx, struct request *req) @@ -160,6 +170,7 @@ static int virtio_queue_rq(struct blk_mq_hw_ctx *hctx, struct request *req) struct virtblk_req *vbr = blk_mq_rq_to_pdu(req); unsigned long flags; unsigned int num; + int qid = hctx->queue_num; const bool last = (req->cmd_flags & REQ_END) != 0; int err; bool notify = false; @@ -202,12 +213,12 @@ static int virtio_queue_rq(struct blk_mq_hw_ctx *hctx, struct request *req) vbr->out_hdr.type |= VIRTIO_BLK_T_IN; } - spin_lock_irqsave(&vblk->vq_lock, flags); - err = __virtblk_add_req(vblk->vq, vbr, vbr->sg, num); + spin_lock_irqsave(&vblk->vqs[qid].lock, flags); + err = __virtblk_add_req(vblk->vqs[qid].vq, vbr, vbr->sg, num); if (err) { - virtqueue_kick(vblk->vq); + virtqueue_kick(vblk->vqs[qid].vq); blk_mq_stop_hw_queue(hctx); - spin_unlock_irqrestore(&vblk->vq_lock, flags); + spin_unlock_irqrestore(&vblk->vqs[qid].lock, flags); /* Out of mem doesn't actually happen, since we fall back * to direct descriptors */ if (err == -ENOMEM || err == -ENOSPC) @@ -215,12 +226,12 @@ static int virtio_queue_rq(struct blk_mq_hw_ctx *hctx, struct request *req) return BLK_MQ_RQ_QUEUE_ERROR; } - if (last && virtqueue_kick_prepare(vblk->vq)) + if (last && virtqueue_kick_prepare(vblk->vqs[qid].vq)) notify = true; - spin_unlock_irqrestore(&vblk->vq_lock, flags); + spin_unlock_irqrestore(&vblk->vqs[qid].lock, flags); if (notify) - virtqueue_notify(vblk->vq); + virtqueue_notify(vblk->vqs[qid].vq); return BLK_MQ_RQ_QUEUE_OK; } @@ -377,12 +388,64 @@ static void virtblk_config_changed(struct virtio_device *vdev) static int init_vq(struct virtio_blk *vblk) { int err = 0; + int i; + vq_callback_t **callbacks; + const char **names; + struct virtqueue **vqs; + unsigned short num_vqs; + struct virtio_device *vdev = vblk->vdev; + + err = virtio_cread_feature(vdev, VIRTIO_BLK_F_MQ, + struct virtio_blk_config, num_queues, + &num_vqs); + if (err) + num_vqs = 1; + + vblk->vqs = kmalloc(sizeof(*vblk->vqs) * num_vqs, GFP_KERNEL); + if (!vblk->vqs) { + err = -ENOMEM; + goto out; + } + + names = kmalloc(sizeof(*names) * num_vqs, GFP_KERNEL); + if (!names) + goto err_names; + + callbacks = kmalloc(sizeof(*callbacks) * num_vqs, GFP_KERNEL); + if (!callbacks) + goto err_callbacks; + + vqs = kmalloc(sizeof(*vqs) * num_vqs, GFP_KERNEL); + if (!vqs) + goto err_vqs; - /* We expect one virtqueue, for output. */ - vblk->vq = virtio_find_single_vq(vblk->vdev, virtblk_done, "requests"); - if (IS_ERR(vblk->vq)) - err = PTR_ERR(vblk->vq); + for (i = 0; i < num_vqs; i++) { + callbacks[i] = virtblk_done; + snprintf(vblk->vqs[i].name, VQ_NAME_LEN, "req.%d", i); + names[i] = vblk->vqs[i].name; + } + + /* Discover virtqueues and write information to configuration. */ + err = vdev->config->find_vqs(vdev, num_vqs, vqs, callbacks, names); + if (err) + goto err_find_vqs; + for (i = 0; i < num_vqs; i++) { + spin_lock_init(&vblk->vqs[i].lock); + vblk->vqs[i].vq = vqs[i]; + } + vblk->num_vqs = num_vqs; + + err_find_vqs: + kfree(vqs); + err_vqs: + kfree(callbacks); + err_callbacks: + kfree(names); + err_names: + if (err) + kfree(vblk->vqs); + out: return err; } @@ -551,7 +614,6 @@ static int virtblk_probe(struct virtio_device *vdev) err = init_vq(vblk); if (err) goto out_free_vblk; - spin_lock_init(&vblk->vq_lock); /* FIXME: How many partitions? How long is a piece of string? */ vblk->disk = alloc_disk(1 << PART_BITS); @@ -562,7 +624,7 @@ static int virtblk_probe(struct virtio_device *vdev) /* Default queue sizing is to fill the ring. */ if (!virtblk_queue_depth) { - virtblk_queue_depth = vblk->vq->num_free; + virtblk_queue_depth = vblk->vqs[0].vq->num_free; /* ... but without indirect descs, we use 2 descs per req */ if (!virtio_has_feature(vdev, VIRTIO_RING_F_INDIRECT_DESC)) virtblk_queue_depth /= 2; @@ -570,7 +632,6 @@ static int virtblk_probe(struct virtio_device *vdev) memset(&vblk->tag_set, 0, sizeof(vblk->tag_set)); vblk->tag_set.ops = &virtio_mq_ops; - vblk->tag_set.nr_hw_queues = 1; vblk->tag_set.queue_depth = virtblk_queue_depth; vblk->tag_set.numa_node = NUMA_NO_NODE; vblk->tag_set.flags = BLK_MQ_F_SHOULD_MERGE; @@ -578,6 +639,7 @@ static int virtblk_probe(struct virtio_device *vdev) sizeof(struct virtblk_req) + sizeof(struct scatterlist) * sg_elems; vblk->tag_set.driver_data = vblk; + vblk->tag_set.nr_hw_queues = vblk->num_vqs; err = blk_mq_alloc_tag_set(&vblk->tag_set); if (err) @@ -727,6 +789,7 @@ static void virtblk_remove(struct virtio_device *vdev) refc = atomic_read(&disk_to_dev(vblk->disk)->kobj.kref.refcount); put_disk(vblk->disk); vdev->config->del_vqs(vdev); + kfree(vblk->vqs); kfree(vblk); /* Only free device id if we don't have any users */ @@ -777,7 +840,8 @@ static const struct virtio_device_id id_table[] = { static unsigned int features[] = { VIRTIO_BLK_F_SEG_MAX, VIRTIO_BLK_F_SIZE_MAX, VIRTIO_BLK_F_GEOMETRY, VIRTIO_BLK_F_RO, VIRTIO_BLK_F_BLK_SIZE, VIRTIO_BLK_F_SCSI, - VIRTIO_BLK_F_WCE, VIRTIO_BLK_F_TOPOLOGY, VIRTIO_BLK_F_CONFIG_WCE + VIRTIO_BLK_F_WCE, VIRTIO_BLK_F_TOPOLOGY, VIRTIO_BLK_F_CONFIG_WCE, + VIRTIO_BLK_F_MQ, }; static struct virtio_driver virtio_blk = { |