diff options
Diffstat (limited to 'drivers/block/drbd/drbd_receiver.c')
-rw-r--r-- | drivers/block/drbd/drbd_receiver.c | 270 |
1 files changed, 37 insertions, 233 deletions
diff --git a/drivers/block/drbd/drbd_receiver.c b/drivers/block/drbd/drbd_receiver.c index e5a2e5f7887b..caaf2781136d 100644 --- a/drivers/block/drbd/drbd_receiver.c +++ b/drivers/block/drbd/drbd_receiver.c @@ -33,6 +33,7 @@ #include <linux/string.h> #include <linux/scatterlist.h> #include <linux/part_stat.h> +#include <linux/mempool.h> #include "drbd_int.h" #include "drbd_protocol.h" #include "drbd_req.h" @@ -63,182 +64,31 @@ static int e_end_block(struct drbd_work *, int); #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN) -/* - * some helper functions to deal with single linked page lists, - * page->private being our "next" pointer. - */ - -/* If at least n pages are linked at head, get n pages off. - * Otherwise, don't modify head, and return NULL. - * Locking is the responsibility of the caller. - */ -static struct page *page_chain_del(struct page **head, int n) -{ - struct page *page; - struct page *tmp; - - BUG_ON(!n); - BUG_ON(!head); - - page = *head; - - if (!page) - return NULL; - - while (page) { - tmp = page_chain_next(page); - if (--n == 0) - break; /* found sufficient pages */ - if (tmp == NULL) - /* insufficient pages, don't use any of them. */ - return NULL; - page = tmp; - } - - /* add end of list marker for the returned list */ - set_page_private(page, 0); - /* actual return value, and adjustment of head */ - page = *head; - *head = tmp; - return page; -} - -/* may be used outside of locks to find the tail of a (usually short) - * "private" page chain, before adding it back to a global chain head - * with page_chain_add() under a spinlock. */ -static struct page *page_chain_tail(struct page *page, int *len) -{ - struct page *tmp; - int i = 1; - while ((tmp = page_chain_next(page))) { - ++i; - page = tmp; - } - if (len) - *len = i; - return page; -} - -static int page_chain_free(struct page *page) -{ - struct page *tmp; - int i = 0; - page_chain_for_each_safe(page, tmp) { - put_page(page); - ++i; - } - return i; -} - -static void page_chain_add(struct page **head, - struct page *chain_first, struct page *chain_last) -{ -#if 1 - struct page *tmp; - tmp = page_chain_tail(chain_first, NULL); - BUG_ON(tmp != chain_last); -#endif - - /* add chain to head */ - set_page_private(chain_last, (unsigned long)*head); - *head = chain_first; -} - -static struct page *__drbd_alloc_pages(struct drbd_device *device, - unsigned int number) +static struct page *__drbd_alloc_pages(unsigned int number) { struct page *page = NULL; struct page *tmp = NULL; unsigned int i = 0; - /* Yes, testing drbd_pp_vacant outside the lock is racy. - * So what. It saves a spin_lock. */ - if (drbd_pp_vacant >= number) { - spin_lock(&drbd_pp_lock); - page = page_chain_del(&drbd_pp_pool, number); - if (page) - drbd_pp_vacant -= number; - spin_unlock(&drbd_pp_lock); - if (page) - return page; - } - /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD * "criss-cross" setup, that might cause write-out on some other DRBD, * which in turn might block on the other node at this very place. */ for (i = 0; i < number; i++) { - tmp = alloc_page(GFP_TRY); + tmp = mempool_alloc(&drbd_buffer_page_pool, GFP_TRY); if (!tmp) - break; + goto fail; set_page_private(tmp, (unsigned long)page); page = tmp; } - - if (i == number) - return page; - - /* Not enough pages immediately available this time. - * No need to jump around here, drbd_alloc_pages will retry this - * function "soon". */ - if (page) { - tmp = page_chain_tail(page, NULL); - spin_lock(&drbd_pp_lock); - page_chain_add(&drbd_pp_pool, page, tmp); - drbd_pp_vacant += i; - spin_unlock(&drbd_pp_lock); + return page; +fail: + page_chain_for_each_safe(page, tmp) { + set_page_private(page, 0); + mempool_free(page, &drbd_buffer_page_pool); } return NULL; } -static void reclaim_finished_net_peer_reqs(struct drbd_device *device, - struct list_head *to_be_freed) -{ - struct drbd_peer_request *peer_req, *tmp; - - /* The EEs are always appended to the end of the list. Since - they are sent in order over the wire, they have to finish - in order. As soon as we see the first not finished we can - stop to examine the list... */ - - list_for_each_entry_safe(peer_req, tmp, &device->net_ee, w.list) { - if (drbd_peer_req_has_active_page(peer_req)) - break; - list_move(&peer_req->w.list, to_be_freed); - } -} - -static void drbd_reclaim_net_peer_reqs(struct drbd_device *device) -{ - LIST_HEAD(reclaimed); - struct drbd_peer_request *peer_req, *t; - - spin_lock_irq(&device->resource->req_lock); - reclaim_finished_net_peer_reqs(device, &reclaimed); - spin_unlock_irq(&device->resource->req_lock); - list_for_each_entry_safe(peer_req, t, &reclaimed, w.list) - drbd_free_net_peer_req(device, peer_req); -} - -static void conn_reclaim_net_peer_reqs(struct drbd_connection *connection) -{ - struct drbd_peer_device *peer_device; - int vnr; - - rcu_read_lock(); - idr_for_each_entry(&connection->peer_devices, peer_device, vnr) { - struct drbd_device *device = peer_device->device; - if (!atomic_read(&device->pp_in_use_by_net)) - continue; - - kref_get(&device->kref); - rcu_read_unlock(); - drbd_reclaim_net_peer_reqs(device); - kref_put(&device->kref, drbd_destroy_device); - rcu_read_lock(); - } - rcu_read_unlock(); -} - /** * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled) * @peer_device: DRBD device. @@ -263,9 +113,8 @@ struct page *drbd_alloc_pages(struct drbd_peer_device *peer_device, unsigned int bool retry) { struct drbd_device *device = peer_device->device; - struct page *page = NULL; + struct page *page; struct net_conf *nc; - DEFINE_WAIT(wait); unsigned int mxb; rcu_read_lock(); @@ -273,37 +122,9 @@ struct page *drbd_alloc_pages(struct drbd_peer_device *peer_device, unsigned int mxb = nc ? nc->max_buffers : 1000000; rcu_read_unlock(); - if (atomic_read(&device->pp_in_use) < mxb) - page = __drbd_alloc_pages(device, number); - - /* Try to keep the fast path fast, but occasionally we need - * to reclaim the pages we lended to the network stack. */ - if (page && atomic_read(&device->pp_in_use_by_net) > 512) - drbd_reclaim_net_peer_reqs(device); - - while (page == NULL) { - prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE); - - drbd_reclaim_net_peer_reqs(device); - - if (atomic_read(&device->pp_in_use) < mxb) { - page = __drbd_alloc_pages(device, number); - if (page) - break; - } - - if (!retry) - break; - - if (signal_pending(current)) { - drbd_warn(device, "drbd_alloc_pages interrupted!\n"); - break; - } - - if (schedule_timeout(HZ/10) == 0) - mxb = UINT_MAX; - } - finish_wait(&drbd_pp_wait, &wait); + if (atomic_read(&device->pp_in_use) >= mxb) + schedule_timeout_interruptible(HZ / 10); + page = __drbd_alloc_pages(number); if (page) atomic_add(number, &device->pp_in_use); @@ -314,29 +135,25 @@ struct page *drbd_alloc_pages(struct drbd_peer_device *peer_device, unsigned int * Is also used from inside an other spin_lock_irq(&resource->req_lock); * Either links the page chain back to the global pool, * or returns all pages to the system. */ -static void drbd_free_pages(struct drbd_device *device, struct page *page, int is_net) +static void drbd_free_pages(struct drbd_device *device, struct page *page) { - atomic_t *a = is_net ? &device->pp_in_use_by_net : &device->pp_in_use; - int i; + struct page *tmp; + int i = 0; if (page == NULL) return; - if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * drbd_minor_count) - i = page_chain_free(page); - else { - struct page *tmp; - tmp = page_chain_tail(page, &i); - spin_lock(&drbd_pp_lock); - page_chain_add(&drbd_pp_pool, page, tmp); - drbd_pp_vacant += i; - spin_unlock(&drbd_pp_lock); - } - i = atomic_sub_return(i, a); + page_chain_for_each_safe(page, tmp) { + set_page_private(page, 0); + if (page_count(page) == 1) + mempool_free(page, &drbd_buffer_page_pool); + else + put_page(page); + i++; + } + i = atomic_sub_return(i, &device->pp_in_use); if (i < 0) - drbd_warn(device, "ASSERTION FAILED: %s: %d < 0\n", - is_net ? "pp_in_use_by_net" : "pp_in_use", i); - wake_up(&drbd_pp_wait); + drbd_warn(device, "ASSERTION FAILED: pp_in_use: %d < 0\n", i); } /* @@ -380,6 +197,8 @@ drbd_alloc_peer_req(struct drbd_peer_device *peer_device, u64 id, sector_t secto gfpflags_allow_blocking(gfp_mask)); if (!page) goto fail; + if (!mempool_is_saturated(&drbd_buffer_page_pool)) + peer_req->flags |= EE_RELEASE_TO_MEMPOOL; } memset(peer_req, 0, sizeof(*peer_req)); @@ -403,13 +222,12 @@ drbd_alloc_peer_req(struct drbd_peer_device *peer_device, u64 id, sector_t secto return NULL; } -void __drbd_free_peer_req(struct drbd_device *device, struct drbd_peer_request *peer_req, - int is_net) +void drbd_free_peer_req(struct drbd_device *device, struct drbd_peer_request *peer_req) { might_sleep(); if (peer_req->flags & EE_HAS_DIGEST) kfree(peer_req->digest); - drbd_free_pages(device, peer_req->pages, is_net); + drbd_free_pages(device, peer_req->pages); D_ASSERT(device, atomic_read(&peer_req->pending_bios) == 0); D_ASSERT(device, drbd_interval_empty(&peer_req->i)); if (!expect(device, !(peer_req->flags & EE_CALL_AL_COMPLETE_IO))) { @@ -424,14 +242,13 @@ int drbd_free_peer_reqs(struct drbd_device *device, struct list_head *list) LIST_HEAD(work_list); struct drbd_peer_request *peer_req, *t; int count = 0; - int is_net = list == &device->net_ee; spin_lock_irq(&device->resource->req_lock); list_splice_init(list, &work_list); spin_unlock_irq(&device->resource->req_lock); list_for_each_entry_safe(peer_req, t, &work_list, w.list) { - __drbd_free_peer_req(device, peer_req, is_net); + drbd_free_peer_req(device, peer_req); count++; } return count; @@ -443,18 +260,13 @@ int drbd_free_peer_reqs(struct drbd_device *device, struct list_head *list) static int drbd_finish_peer_reqs(struct drbd_device *device) { LIST_HEAD(work_list); - LIST_HEAD(reclaimed); struct drbd_peer_request *peer_req, *t; int err = 0; spin_lock_irq(&device->resource->req_lock); - reclaim_finished_net_peer_reqs(device, &reclaimed); list_splice_init(&device->done_ee, &work_list); spin_unlock_irq(&device->resource->req_lock); - list_for_each_entry_safe(peer_req, t, &reclaimed, w.list) - drbd_free_net_peer_req(device, peer_req); - /* possible callbacks here: * e_end_block, and e_end_resync_block, e_send_superseded. * all ignore the last argument. @@ -1975,7 +1787,7 @@ static int drbd_drain_block(struct drbd_peer_device *peer_device, int data_size) data_size -= len; } kunmap(page); - drbd_free_pages(peer_device->device, page, 0); + drbd_free_pages(peer_device->device, page); return err; } @@ -2500,7 +2312,11 @@ static int handle_write_conflicts(struct drbd_device *device, peer_req->w.cb = superseded ? e_send_superseded : e_send_retry_write; list_add_tail(&peer_req->w.list, &device->done_ee); - queue_work(connection->ack_sender, &peer_req->peer_device->send_acks_work); + /* put is in drbd_send_acks_wf() */ + kref_get(&device->kref); + if (!queue_work(connection->ack_sender, + &peer_req->peer_device->send_acks_work)) + kref_put(&device->kref, drbd_destroy_device); err = -ENOENT; goto out; @@ -5220,16 +5036,6 @@ static int drbd_disconnected(struct drbd_peer_device *peer_device) put_ldev(device); } - /* tcp_close and release of sendpage pages can be deferred. I don't - * want to use SO_LINGER, because apparently it can be deferred for - * more than 20 seconds (longest time I checked). - * - * Actually we don't care for exactly when the network stack does its - * put_page(), but release our reference on these pages right here. - */ - i = drbd_free_peer_reqs(device, &device->net_ee); - if (i) - drbd_info(device, "net_ee not empty, killed %u entries\n", i); i = atomic_read(&device->pp_in_use_by_net); if (i) drbd_info(device, "pp_in_use_by_net = %d, expected 0\n", i); @@ -5976,8 +5782,6 @@ int drbd_ack_receiver(struct drbd_thread *thi) while (get_t_state(thi) == RUNNING) { drbd_thread_current_set_cpu(thi); - conn_reclaim_net_peer_reqs(connection); - if (test_and_clear_bit(SEND_PING, &connection->flags)) { if (drbd_send_ping(connection)) { drbd_err(connection, "drbd_send_ping has failed\n"); |