Diffstat (limited to 'drivers')
-rw-r--r-- | drivers/block/rbd.c | 2452
1 file changed, 980 insertions, 1472 deletions
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 1e03b04819c8..07dc5419bd63 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -32,6 +32,7 @@
 #include <linux/ceph/osd_client.h>
 #include <linux/ceph/mon_client.h>
 #include <linux/ceph/cls_lock_client.h>
+#include <linux/ceph/striper.h>
 #include <linux/ceph/decode.h>
 #include <linux/parser.h>
 #include <linux/bsearch.h>
@@ -200,95 +201,81 @@ struct rbd_client {
 };
 
 struct rbd_img_request;
-typedef void (*rbd_img_callback_t)(struct rbd_img_request *);
-
-#define BAD_WHICH	U32_MAX		/* Good which or bad which, which? */
-
-struct rbd_obj_request;
-typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);
 
 enum obj_request_type {
-	OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
+	OBJ_REQUEST_NODATA = 1,
+	OBJ_REQUEST_BIO,	/* pointer into provided bio (list) */
+	OBJ_REQUEST_BVECS,	/* pointer into provided bio_vec array */
+	OBJ_REQUEST_OWN_BVECS,	/* private bio_vec array, doesn't own pages */
 };
 
 enum obj_operation_type {
+	OBJ_OP_READ = 1,
 	OBJ_OP_WRITE,
-	OBJ_OP_READ,
 	OBJ_OP_DISCARD,
 };
 
-enum obj_req_flags {
-	OBJ_REQ_DONE,		/* completion flag: not done = 0, done = 1 */
-	OBJ_REQ_IMG_DATA,	/* object usage: standalone = 0, image = 1 */
-	OBJ_REQ_KNOWN,		/* EXISTS flag valid: no = 0, yes = 1 */
-	OBJ_REQ_EXISTS,		/* target exists: no = 0, yes = 1 */
+/*
+ * Writes go through the following state machine to deal with
+ * layering:
+ *
+ *                       need copyup
+ * RBD_OBJ_WRITE_GUARD ---------------> RBD_OBJ_WRITE_COPYUP
+ *        |     ^                              |
+ *        v     \------------------------------/
+ *      done
+ *        ^
+ *        |
+ * RBD_OBJ_WRITE_FLAT
+ *
+ * Writes start in RBD_OBJ_WRITE_GUARD or _FLAT, depending on whether
+ * there is a parent or not.
+ */
+enum rbd_obj_write_state {
+	RBD_OBJ_WRITE_FLAT = 1,
+	RBD_OBJ_WRITE_GUARD,
+	RBD_OBJ_WRITE_COPYUP,
 };
 
 struct rbd_obj_request {
-	u64			object_no;
-	u64			offset;		/* object start byte */
-	u64			length;		/* bytes from offset */
-	unsigned long		flags;
-
-	/*
-	 * An object request associated with an image will have its
-	 * img_data flag set; a standalone object request will not.
-	 *
-	 * A standalone object request will have which == BAD_WHICH
-	 * and a null obj_request pointer.
-	 *
-	 * An object request initiated in support of a layered image
-	 * object (to check for its existence before a write) will
-	 * have which == BAD_WHICH and a non-null obj_request pointer.
-	 *
-	 * Finally, an object request for rbd image data will have
-	 * which != BAD_WHICH, and will have a non-null img_request
	 * pointer.  The value of which will be in the range
-	 * 0..(img_request->obj_request_count-1).
-	 */
+	struct ceph_object_extent ex;
 	union {
-		struct rbd_obj_request	*obj_request;	/* STAT op */
-		struct {
-			struct rbd_img_request	*img_request;
-			u64			img_offset;
-			/* links for img_request->obj_requests list */
-			struct list_head	links;
-		};
+		bool			tried_parent;	/* for reads */
+		enum rbd_obj_write_state write_state;	/* for writes */
 	};
-	u32			which;		/* posn image request list */
 
-	enum obj_request_type	type;
+	struct rbd_img_request	*img_request;
+	struct ceph_file_extent	*img_extents;
+	u32			num_img_extents;
+
 	union {
-		struct bio	*bio_list;
+		struct ceph_bio_iter	bio_pos;
 		struct {
-			struct page	**pages;
-			u32		page_count;
+			struct ceph_bvec_iter	bvec_pos;
+			u32			bvec_count;
+			u32			bvec_idx;
 		};
 	};
-	struct page		**copyup_pages;
-	u32			copyup_page_count;
+	struct bio_vec		*copyup_bvecs;
+	u32			copyup_bvec_count;
 
 	struct ceph_osd_request	*osd_req;
 
 	u64			xferred;	/* bytes transferred */
 	int			result;
-	rbd_obj_callback_t	callback;
-
 	struct kref		kref;
 };
 
 enum img_req_flags {
-	IMG_REQ_WRITE,		/* I/O direction: read = 0, write = 1 */
 	IMG_REQ_CHILD,		/* initiator: block = 0, child image = 1 */
 	IMG_REQ_LAYERED,	/* ENOENT handling: normal = 0, layered = 1 */
-	IMG_REQ_DISCARD,	/* discard: normal = 0, discard request = 1 */
 };
 
 struct rbd_img_request {
 	struct rbd_device	*rbd_dev;
-	u64			offset;	/* starting image byte offset */
-	u64			length;	/* byte count from offset */
+	enum obj_operation_type	op_type;
+	enum obj_request_type	data_type;
 	unsigned long		flags;
 	union {
 		u64			snap_id;	/* for reads */
@@ -298,26 +285,21 @@ struct rbd_img_request {
 		struct request		*rq;		/* block request */
 		struct rbd_obj_request	*obj_request;	/* obj req initiator */
 	};
-	struct page		**copyup_pages;
-	u32			copyup_page_count;
-	spinlock_t		completion_lock;/* protects next_completion */
-	u32			next_completion;
-	rbd_img_callback_t	callback;
+	spinlock_t		completion_lock;
 	u64			xferred;/* aggregate bytes transferred */
 	int			result;	/* first nonzero obj_request result */
 
+	struct list_head	object_extents;	/* obj_req.ex structs */
 	u32			obj_request_count;
-	struct list_head	obj_requests;	/* rbd_obj_request structs */
+	u32			pending_count;
 
 	struct kref		kref;
 };
 
 #define for_each_obj_request(ireq, oreq) \
-	list_for_each_entry(oreq, &(ireq)->obj_requests, links)
-#define for_each_obj_request_from(ireq, oreq) \
-	list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
+	list_for_each_entry(oreq, &(ireq)->object_extents, ex.oe_item)
 #define for_each_obj_request_safe(ireq, oreq, n) \
-	list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
+	list_for_each_entry_safe(oreq, n, &(ireq)->object_extents, ex.oe_item)
 
 enum rbd_watch_state {
 	RBD_WATCH_STATE_UNREGISTERED,
@@ -433,8 +415,6 @@ static DEFINE_SPINLOCK(rbd_client_list_lock);
 static struct kmem_cache	*rbd_img_request_cache;
 static struct kmem_cache	*rbd_obj_request_cache;
 
-static struct bio_set		*rbd_bio_clone;
-
 static int rbd_major;
 static DEFINE_IDA(rbd_dev_id_ida);
 
@@ -447,8 +427,6 @@ static bool single_major = true;
 module_param(single_major, bool, S_IRUGO);
 MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: true)");
 
-static int rbd_img_request_submit(struct rbd_img_request *img_request);
-
 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
 		       size_t count);
 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
@@ -458,7 +436,6 @@ static ssize_t rbd_add_single_major(struct bus_type *bus, const char *buf,
 static ssize_t rbd_remove_single_major(struct bus_type *bus, const char *buf,
 				       size_t count);
 static int
rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth); -static void rbd_spec_put(struct rbd_spec *spec); static int rbd_dev_id_to_minor(int dev_id) { @@ -577,9 +554,6 @@ void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...) # define rbd_assert(expr) ((void) 0) #endif /* !RBD_DEBUG */ -static void rbd_osd_copyup_callback(struct rbd_obj_request *obj_request); -static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request); -static void rbd_img_parent_read(struct rbd_obj_request *obj_request); static void rbd_dev_remove_parent(struct rbd_device *rbd_dev); static int rbd_dev_refresh(struct rbd_device *rbd_dev); @@ -857,26 +831,6 @@ static char* obj_op_name(enum obj_operation_type op_type) } /* - * Get a ceph client with specific addr and configuration, if one does - * not exist create it. Either way, ceph_opts is consumed by this - * function. - */ -static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts) -{ - struct rbd_client *rbdc; - - mutex_lock_nested(&client_mutex, SINGLE_DEPTH_NESTING); - rbdc = rbd_client_find(ceph_opts); - if (rbdc) /* using an existing client */ - ceph_destroy_options(ceph_opts); - else - rbdc = rbd_client_create(ceph_opts); - mutex_unlock(&client_mutex); - - return rbdc; -} - -/* * Destroy ceph client * * Caller must hold rbd_client_list_lock. @@ -904,6 +858,56 @@ static void rbd_put_client(struct rbd_client *rbdc) kref_put(&rbdc->kref, rbd_client_release); } +static int wait_for_latest_osdmap(struct ceph_client *client) +{ + u64 newest_epoch; + int ret; + + ret = ceph_monc_get_version(&client->monc, "osdmap", &newest_epoch); + if (ret) + return ret; + + if (client->osdc.osdmap->epoch >= newest_epoch) + return 0; + + ceph_osdc_maybe_request_map(&client->osdc); + return ceph_monc_wait_osdmap(&client->monc, newest_epoch, + client->options->mount_timeout); +} + +/* + * Get a ceph client with specific addr and configuration, if one does + * not exist create it. Either way, ceph_opts is consumed by this + * function. + */ +static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts) +{ + struct rbd_client *rbdc; + int ret; + + mutex_lock_nested(&client_mutex, SINGLE_DEPTH_NESTING); + rbdc = rbd_client_find(ceph_opts); + if (rbdc) { + ceph_destroy_options(ceph_opts); + + /* + * Using an existing client. Make sure ->pg_pools is up to + * date before we look up the pool id in do_rbd_add(). 
+ */ + ret = wait_for_latest_osdmap(rbdc->client); + if (ret) { + rbd_warn(NULL, "failed to get latest osdmap: %d", ret); + rbd_put_client(rbdc); + rbdc = ERR_PTR(ret); + } + } else { + rbdc = rbd_client_create(ceph_opts); + } + mutex_unlock(&client_mutex); + + return rbdc; +} + static bool rbd_image_format_valid(u32 image_format) { return image_format == 1 || image_format == 2; @@ -1223,272 +1227,59 @@ static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev) rbd_dev->mapping.features = 0; } -static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset) -{ - u64 segment_size = rbd_obj_bytes(&rbd_dev->header); - - return offset & (segment_size - 1); -} - -static u64 rbd_segment_length(struct rbd_device *rbd_dev, - u64 offset, u64 length) -{ - u64 segment_size = rbd_obj_bytes(&rbd_dev->header); - - offset &= segment_size - 1; - - rbd_assert(length <= U64_MAX - offset); - if (offset + length > segment_size) - length = segment_size - offset; - - return length; -} - -/* - * bio helpers - */ - -static void bio_chain_put(struct bio *chain) -{ - struct bio *tmp; - - while (chain) { - tmp = chain; - chain = chain->bi_next; - bio_put(tmp); - } -} - -/* - * zeros a bio chain, starting at specific offset - */ -static void zero_bio_chain(struct bio *chain, int start_ofs) +static void zero_bvec(struct bio_vec *bv) { - struct bio_vec bv; - struct bvec_iter iter; - unsigned long flags; void *buf; - int pos = 0; - - while (chain) { - bio_for_each_segment(bv, chain, iter) { - if (pos + bv.bv_len > start_ofs) { - int remainder = max(start_ofs - pos, 0); - buf = bvec_kmap_irq(&bv, &flags); - memset(buf + remainder, 0, - bv.bv_len - remainder); - flush_dcache_page(bv.bv_page); - bvec_kunmap_irq(buf, &flags); - } - pos += bv.bv_len; - } + unsigned long flags; - chain = chain->bi_next; - } + buf = bvec_kmap_irq(bv, &flags); + memset(buf, 0, bv->bv_len); + flush_dcache_page(bv->bv_page); + bvec_kunmap_irq(buf, &flags); } -/* - * similar to zero_bio_chain(), zeros data defined by a page array, - * starting at the given byte offset from the start of the array and - * continuing up to the given end offset. The pages array is - * assumed to be big enough to hold all bytes up to the end. - */ -static void zero_pages(struct page **pages, u64 offset, u64 end) +static void zero_bios(struct ceph_bio_iter *bio_pos, u32 off, u32 bytes) { - struct page **page = &pages[offset >> PAGE_SHIFT]; + struct ceph_bio_iter it = *bio_pos; - rbd_assert(end > offset); - rbd_assert(end - offset <= (u64)SIZE_MAX); - while (offset < end) { - size_t page_offset; - size_t length; - unsigned long flags; - void *kaddr; - - page_offset = offset & ~PAGE_MASK; - length = min_t(size_t, PAGE_SIZE - page_offset, end - offset); - local_irq_save(flags); - kaddr = kmap_atomic(*page); - memset(kaddr + page_offset, 0, length); - flush_dcache_page(*page); - kunmap_atomic(kaddr); - local_irq_restore(flags); - - offset += length; - page++; - } + ceph_bio_iter_advance(&it, off); + ceph_bio_iter_advance_step(&it, bytes, ({ + zero_bvec(&bv); + })); } -/* - * Clone a portion of a bio, starting at the given byte offset - * and continuing for the number of bytes indicated. 
- */ -static struct bio *bio_clone_range(struct bio *bio_src, - unsigned int offset, - unsigned int len, - gfp_t gfpmask) +static void zero_bvecs(struct ceph_bvec_iter *bvec_pos, u32 off, u32 bytes) { - struct bio *bio; - - bio = bio_clone_fast(bio_src, gfpmask, rbd_bio_clone); - if (!bio) - return NULL; /* ENOMEM */ + struct ceph_bvec_iter it = *bvec_pos; - bio_advance(bio, offset); - bio->bi_iter.bi_size = len; - - return bio; + ceph_bvec_iter_advance(&it, off); + ceph_bvec_iter_advance_step(&it, bytes, ({ + zero_bvec(&bv); + })); } /* - * Clone a portion of a bio chain, starting at the given byte offset - * into the first bio in the source chain and continuing for the - * number of bytes indicated. The result is another bio chain of - * exactly the given length, or a null pointer on error. - * - * The bio_src and offset parameters are both in-out. On entry they - * refer to the first source bio and the offset into that bio where - * the start of data to be cloned is located. + * Zero a range in @obj_req data buffer defined by a bio (list) or + * (private) bio_vec array. * - * On return, bio_src is updated to refer to the bio in the source - * chain that contains first un-cloned byte, and *offset will - * contain the offset of that byte within that bio. + * @off is relative to the start of the data buffer. */ -static struct bio *bio_chain_clone_range(struct bio **bio_src, - unsigned int *offset, - unsigned int len, - gfp_t gfpmask) +static void rbd_obj_zero_range(struct rbd_obj_request *obj_req, u32 off, + u32 bytes) { - struct bio *bi = *bio_src; - unsigned int off = *offset; - struct bio *chain = NULL; - struct bio **end; - - /* Build up a chain of clone bios up to the limit */ - - if (!bi || off >= bi->bi_iter.bi_size || !len) - return NULL; /* Nothing to clone */ - - end = &chain; - while (len) { - unsigned int bi_size; - struct bio *bio; - - if (!bi) { - rbd_warn(NULL, "bio_chain exhausted with %u left", len); - goto out_err; /* EINVAL; ran out of bio's */ - } - bi_size = min_t(unsigned int, bi->bi_iter.bi_size - off, len); - bio = bio_clone_range(bi, off, bi_size, gfpmask); - if (!bio) - goto out_err; /* ENOMEM */ - - *end = bio; - end = &bio->bi_next; - - off += bi_size; - if (off == bi->bi_iter.bi_size) { - bi = bi->bi_next; - off = 0; - } - len -= bi_size; - } - *bio_src = bi; - *offset = off; - - return chain; -out_err: - bio_chain_put(chain); - - return NULL; -} - -/* - * The default/initial value for all object request flags is 0. For - * each flag, once its value is set to 1 it is never reset to 0 - * again. 
- */ -static void obj_request_img_data_set(struct rbd_obj_request *obj_request) -{ - if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) { - struct rbd_device *rbd_dev; - - rbd_dev = obj_request->img_request->rbd_dev; - rbd_warn(rbd_dev, "obj_request %p already marked img_data", - obj_request); - } -} - -static bool obj_request_img_data_test(struct rbd_obj_request *obj_request) -{ - smp_mb(); - return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0; -} - -static void obj_request_done_set(struct rbd_obj_request *obj_request) -{ - if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) { - struct rbd_device *rbd_dev = NULL; - - if (obj_request_img_data_test(obj_request)) - rbd_dev = obj_request->img_request->rbd_dev; - rbd_warn(rbd_dev, "obj_request %p already marked done", - obj_request); + switch (obj_req->img_request->data_type) { + case OBJ_REQUEST_BIO: + zero_bios(&obj_req->bio_pos, off, bytes); + break; + case OBJ_REQUEST_BVECS: + case OBJ_REQUEST_OWN_BVECS: + zero_bvecs(&obj_req->bvec_pos, off, bytes); + break; + default: + rbd_assert(0); } } -static bool obj_request_done_test(struct rbd_obj_request *obj_request) -{ - smp_mb(); - return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0; -} - -/* - * This sets the KNOWN flag after (possibly) setting the EXISTS - * flag. The latter is set based on the "exists" value provided. - * - * Note that for our purposes once an object exists it never goes - * away again. It's possible that the response from two existence - * checks are separated by the creation of the target object, and - * the first ("doesn't exist") response arrives *after* the second - * ("does exist"). In that case we ignore the second one. - */ -static void obj_request_existence_set(struct rbd_obj_request *obj_request, - bool exists) -{ - if (exists) - set_bit(OBJ_REQ_EXISTS, &obj_request->flags); - set_bit(OBJ_REQ_KNOWN, &obj_request->flags); - smp_mb(); -} - -static bool obj_request_known_test(struct rbd_obj_request *obj_request) -{ - smp_mb(); - return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0; -} - -static bool obj_request_exists_test(struct rbd_obj_request *obj_request) -{ - smp_mb(); - return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0; -} - -static bool obj_request_overlaps_parent(struct rbd_obj_request *obj_request) -{ - struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev; - - return obj_request->img_offset < - round_up(rbd_dev->parent_overlap, rbd_obj_bytes(&rbd_dev->header)); -} - -static void rbd_obj_request_get(struct rbd_obj_request *obj_request) -{ - dout("%s: obj %p (was %d)\n", __func__, obj_request, - kref_read(&obj_request->kref)); - kref_get(&obj_request->kref); -} - static void rbd_obj_request_destroy(struct kref *kref); static void rbd_obj_request_put(struct rbd_obj_request *obj_request) { @@ -1505,18 +1296,13 @@ static void rbd_img_request_get(struct rbd_img_request *img_request) kref_get(&img_request->kref); } -static bool img_request_child_test(struct rbd_img_request *img_request); -static void rbd_parent_request_destroy(struct kref *kref); static void rbd_img_request_destroy(struct kref *kref); static void rbd_img_request_put(struct rbd_img_request *img_request) { rbd_assert(img_request != NULL); dout("%s: img %p (was %d)\n", __func__, img_request, kref_read(&img_request->kref)); - if (img_request_child_test(img_request)) - kref_put(&img_request->kref, rbd_parent_request_destroy); - else - kref_put(&img_request->kref, rbd_img_request_destroy); + kref_put(&img_request->kref, rbd_img_request_destroy); } static 
inline void rbd_img_obj_request_add(struct rbd_img_request *img_request, @@ -1526,139 +1312,37 @@ static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request, /* Image request now owns object's original reference */ obj_request->img_request = img_request; - obj_request->which = img_request->obj_request_count; - rbd_assert(!obj_request_img_data_test(obj_request)); - obj_request_img_data_set(obj_request); - rbd_assert(obj_request->which != BAD_WHICH); img_request->obj_request_count++; - list_add_tail(&obj_request->links, &img_request->obj_requests); - dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request, - obj_request->which); + img_request->pending_count++; + dout("%s: img %p obj %p\n", __func__, img_request, obj_request); } static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request, struct rbd_obj_request *obj_request) { - rbd_assert(obj_request->which != BAD_WHICH); - - dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request, - obj_request->which); - list_del(&obj_request->links); + dout("%s: img %p obj %p\n", __func__, img_request, obj_request); + list_del(&obj_request->ex.oe_item); rbd_assert(img_request->obj_request_count > 0); img_request->obj_request_count--; - rbd_assert(obj_request->which == img_request->obj_request_count); - obj_request->which = BAD_WHICH; - rbd_assert(obj_request_img_data_test(obj_request)); rbd_assert(obj_request->img_request == img_request); - obj_request->img_request = NULL; - obj_request->callback = NULL; rbd_obj_request_put(obj_request); } -static bool obj_request_type_valid(enum obj_request_type type) -{ - switch (type) { - case OBJ_REQUEST_NODATA: - case OBJ_REQUEST_BIO: - case OBJ_REQUEST_PAGES: - return true; - default: - return false; - } -} - -static void rbd_img_obj_callback(struct rbd_obj_request *obj_request); - static void rbd_obj_request_submit(struct rbd_obj_request *obj_request) { struct ceph_osd_request *osd_req = obj_request->osd_req; dout("%s %p object_no %016llx %llu~%llu osd_req %p\n", __func__, - obj_request, obj_request->object_no, obj_request->offset, - obj_request->length, osd_req); - if (obj_request_img_data_test(obj_request)) { - WARN_ON(obj_request->callback != rbd_img_obj_callback); - rbd_img_request_get(obj_request->img_request); - } + obj_request, obj_request->ex.oe_objno, obj_request->ex.oe_off, + obj_request->ex.oe_len, osd_req); ceph_osdc_start_request(osd_req->r_osdc, osd_req, false); } -static void rbd_img_request_complete(struct rbd_img_request *img_request) -{ - - dout("%s: img %p\n", __func__, img_request); - - /* - * If no error occurred, compute the aggregate transfer - * count for the image request. We could instead use - * atomic64_cmpxchg() to update it as each object request - * completes; not clear which way is better off hand. - */ - if (!img_request->result) { - struct rbd_obj_request *obj_request; - u64 xferred = 0; - - for_each_obj_request(img_request, obj_request) - xferred += obj_request->xferred; - img_request->xferred = xferred; - } - - if (img_request->callback) - img_request->callback(img_request); - else - rbd_img_request_put(img_request); -} - /* * The default/initial value for all image request flags is 0. Each * is conditionally set to 1 at image request initialization time * and currently never change thereafter. 
*/ -static void img_request_write_set(struct rbd_img_request *img_request) -{ - set_bit(IMG_REQ_WRITE, &img_request->flags); - smp_mb(); -} - -static bool img_request_write_test(struct rbd_img_request *img_request) -{ - smp_mb(); - return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0; -} - -/* - * Set the discard flag when the img_request is an discard request - */ -static void img_request_discard_set(struct rbd_img_request *img_request) -{ - set_bit(IMG_REQ_DISCARD, &img_request->flags); - smp_mb(); -} - -static bool img_request_discard_test(struct rbd_img_request *img_request) -{ - smp_mb(); - return test_bit(IMG_REQ_DISCARD, &img_request->flags) != 0; -} - -static void img_request_child_set(struct rbd_img_request *img_request) -{ - set_bit(IMG_REQ_CHILD, &img_request->flags); - smp_mb(); -} - -static void img_request_child_clear(struct rbd_img_request *img_request) -{ - clear_bit(IMG_REQ_CHILD, &img_request->flags); - smp_mb(); -} - -static bool img_request_child_test(struct rbd_img_request *img_request) -{ - smp_mb(); - return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0; -} - static void img_request_layered_set(struct rbd_img_request *img_request) { set_bit(IMG_REQ_LAYERED, &img_request->flags); @@ -1677,209 +1361,70 @@ static bool img_request_layered_test(struct rbd_img_request *img_request) return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0; } -static enum obj_operation_type -rbd_img_request_op_type(struct rbd_img_request *img_request) -{ - if (img_request_write_test(img_request)) - return OBJ_OP_WRITE; - else if (img_request_discard_test(img_request)) - return OBJ_OP_DISCARD; - else - return OBJ_OP_READ; -} - -static void -rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request) -{ - u64 xferred = obj_request->xferred; - u64 length = obj_request->length; - - dout("%s: obj %p img %p result %d %llu/%llu\n", __func__, - obj_request, obj_request->img_request, obj_request->result, - xferred, length); - /* - * ENOENT means a hole in the image. We zero-fill the entire - * length of the request. A short read also implies zero-fill - * to the end of the request. An error requires the whole - * length of the request to be reported finished with an error - * to the block layer. In each case we update the xferred - * count to indicate the whole request was satisfied. 
- */ - rbd_assert(obj_request->type != OBJ_REQUEST_NODATA); - if (obj_request->result == -ENOENT) { - if (obj_request->type == OBJ_REQUEST_BIO) - zero_bio_chain(obj_request->bio_list, 0); - else - zero_pages(obj_request->pages, 0, length); - obj_request->result = 0; - } else if (xferred < length && !obj_request->result) { - if (obj_request->type == OBJ_REQUEST_BIO) - zero_bio_chain(obj_request->bio_list, xferred); - else - zero_pages(obj_request->pages, xferred, length); - } - obj_request->xferred = length; - obj_request_done_set(obj_request); -} - -static void rbd_obj_request_complete(struct rbd_obj_request *obj_request) +static bool rbd_obj_is_entire(struct rbd_obj_request *obj_req) { - dout("%s: obj %p cb %p\n", __func__, obj_request, - obj_request->callback); - obj_request->callback(obj_request); -} + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; -static void rbd_obj_request_error(struct rbd_obj_request *obj_request, int err) -{ - obj_request->result = err; - obj_request->xferred = 0; - /* - * kludge - mirror rbd_obj_request_submit() to match a put in - * rbd_img_obj_callback() - */ - if (obj_request_img_data_test(obj_request)) { - WARN_ON(obj_request->callback != rbd_img_obj_callback); - rbd_img_request_get(obj_request->img_request); - } - obj_request_done_set(obj_request); - rbd_obj_request_complete(obj_request); + return !obj_req->ex.oe_off && + obj_req->ex.oe_len == rbd_dev->layout.object_size; } -static void rbd_osd_read_callback(struct rbd_obj_request *obj_request) +static bool rbd_obj_is_tail(struct rbd_obj_request *obj_req) { - struct rbd_img_request *img_request = NULL; - struct rbd_device *rbd_dev = NULL; - bool layered = false; - - if (obj_request_img_data_test(obj_request)) { - img_request = obj_request->img_request; - layered = img_request && img_request_layered_test(img_request); - rbd_dev = img_request->rbd_dev; - } - - dout("%s: obj %p img %p result %d %llu/%llu\n", __func__, - obj_request, img_request, obj_request->result, - obj_request->xferred, obj_request->length); - if (layered && obj_request->result == -ENOENT && - obj_request->img_offset < rbd_dev->parent_overlap) - rbd_img_parent_read(obj_request); - else if (img_request) - rbd_img_obj_request_read_callback(obj_request); - else - obj_request_done_set(obj_request); -} + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; -static void rbd_osd_write_callback(struct rbd_obj_request *obj_request) -{ - dout("%s: obj %p result %d %llu\n", __func__, obj_request, - obj_request->result, obj_request->length); - /* - * There is no such thing as a successful short write. Set - * it to our originally-requested length. - */ - obj_request->xferred = obj_request->length; - obj_request_done_set(obj_request); + return obj_req->ex.oe_off + obj_req->ex.oe_len == + rbd_dev->layout.object_size; } -static void rbd_osd_discard_callback(struct rbd_obj_request *obj_request) +static u64 rbd_obj_img_extents_bytes(struct rbd_obj_request *obj_req) { - dout("%s: obj %p result %d %llu\n", __func__, obj_request, - obj_request->result, obj_request->length); - /* - * There is no such thing as a successful short discard. Set - * it to our originally-requested length. - */ - obj_request->xferred = obj_request->length; - /* discarding a non-existent object is not a problem */ - if (obj_request->result == -ENOENT) - obj_request->result = 0; - obj_request_done_set(obj_request); + return ceph_file_extents_bytes(obj_req->img_extents, + obj_req->num_img_extents); } -/* - * For a simple stat call there's nothing to do. 
We'll do more if - * this is part of a write sequence for a layered image. - */ -static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request) +static bool rbd_img_is_write(struct rbd_img_request *img_req) { - dout("%s: obj %p\n", __func__, obj_request); - obj_request_done_set(obj_request); + switch (img_req->op_type) { + case OBJ_OP_READ: + return false; + case OBJ_OP_WRITE: + case OBJ_OP_DISCARD: + return true; + default: + rbd_assert(0); + } } -static void rbd_osd_call_callback(struct rbd_obj_request *obj_request) -{ - dout("%s: obj %p\n", __func__, obj_request); - - if (obj_request_img_data_test(obj_request)) - rbd_osd_copyup_callback(obj_request); - else - obj_request_done_set(obj_request); -} +static void rbd_obj_handle_request(struct rbd_obj_request *obj_req); static void rbd_osd_req_callback(struct ceph_osd_request *osd_req) { - struct rbd_obj_request *obj_request = osd_req->r_priv; - u16 opcode; + struct rbd_obj_request *obj_req = osd_req->r_priv; - dout("%s: osd_req %p\n", __func__, osd_req); - rbd_assert(osd_req == obj_request->osd_req); - if (obj_request_img_data_test(obj_request)) { - rbd_assert(obj_request->img_request); - rbd_assert(obj_request->which != BAD_WHICH); - } else { - rbd_assert(obj_request->which == BAD_WHICH); - } - - if (osd_req->r_result < 0) - obj_request->result = osd_req->r_result; - - /* - * We support a 64-bit length, but ultimately it has to be - * passed to the block layer, which just supports a 32-bit - * length field. - */ - obj_request->xferred = osd_req->r_ops[0].outdata_len; - rbd_assert(obj_request->xferred < (u64)UINT_MAX); + dout("%s osd_req %p result %d for obj_req %p\n", __func__, osd_req, + osd_req->r_result, obj_req); + rbd_assert(osd_req == obj_req->osd_req); - opcode = osd_req->r_ops[0].op; - switch (opcode) { - case CEPH_OSD_OP_READ: - rbd_osd_read_callback(obj_request); - break; - case CEPH_OSD_OP_SETALLOCHINT: - rbd_assert(osd_req->r_ops[1].op == CEPH_OSD_OP_WRITE || - osd_req->r_ops[1].op == CEPH_OSD_OP_WRITEFULL); - /* fall through */ - case CEPH_OSD_OP_WRITE: - case CEPH_OSD_OP_WRITEFULL: - rbd_osd_write_callback(obj_request); - break; - case CEPH_OSD_OP_STAT: - rbd_osd_stat_callback(obj_request); - break; - case CEPH_OSD_OP_DELETE: - case CEPH_OSD_OP_TRUNCATE: - case CEPH_OSD_OP_ZERO: - rbd_osd_discard_callback(obj_request); - break; - case CEPH_OSD_OP_CALL: - rbd_osd_call_callback(obj_request); - break; - default: - rbd_warn(NULL, "unexpected OSD op: object_no %016llx opcode %d", - obj_request->object_no, opcode); - break; - } + obj_req->result = osd_req->r_result < 0 ? osd_req->r_result : 0; + if (!obj_req->result && !rbd_img_is_write(obj_req->img_request)) + obj_req->xferred = osd_req->r_result; + else + /* + * Writes aren't allowed to return a data payload. In some + * guarded write cases (e.g. stat + zero on an empty object) + * a stat response makes it through, but we don't care. 
+ */ + obj_req->xferred = 0; - if (obj_request_done_test(obj_request)) - rbd_obj_request_complete(obj_request); + rbd_obj_handle_request(obj_req); } static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request) { struct ceph_osd_request *osd_req = obj_request->osd_req; - rbd_assert(obj_request_img_data_test(obj_request)); + osd_req->r_flags = CEPH_OSD_FLAG_READ; osd_req->r_snapid = obj_request->img_request->snap_id; } @@ -1887,32 +1432,33 @@ static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request) { struct ceph_osd_request *osd_req = obj_request->osd_req; + osd_req->r_flags = CEPH_OSD_FLAG_WRITE; ktime_get_real_ts(&osd_req->r_mtime); - osd_req->r_data_offset = obj_request->offset; + osd_req->r_data_offset = obj_request->ex.oe_off; } static struct ceph_osd_request * -__rbd_osd_req_create(struct rbd_device *rbd_dev, - struct ceph_snap_context *snapc, - int num_ops, unsigned int flags, - struct rbd_obj_request *obj_request) +rbd_osd_req_create(struct rbd_obj_request *obj_req, unsigned int num_ops) { + struct rbd_img_request *img_req = obj_req->img_request; + struct rbd_device *rbd_dev = img_req->rbd_dev; struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; struct ceph_osd_request *req; const char *name_format = rbd_dev->image_format == 1 ? RBD_V1_DATA_FORMAT : RBD_V2_DATA_FORMAT; - req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false, GFP_NOIO); + req = ceph_osdc_alloc_request(osdc, + (rbd_img_is_write(img_req) ? img_req->snapc : NULL), + num_ops, false, GFP_NOIO); if (!req) return NULL; - req->r_flags = flags; req->r_callback = rbd_osd_req_callback; - req->r_priv = obj_request; + req->r_priv = obj_req; req->r_base_oloc.pool = rbd_dev->layout.pool_id; if (ceph_oid_aprintf(&req->r_base_oid, GFP_NOIO, name_format, - rbd_dev->header.object_prefix, obj_request->object_no)) + rbd_dev->header.object_prefix, obj_req->ex.oe_objno)) goto err_req; if (ceph_osdc_alloc_messages(req, GFP_NOIO)) @@ -1925,83 +1471,20 @@ err_req: return NULL; } -/* - * Create an osd request. A read request has one osd op (read). - * A write request has either one (watch) or two (hint+write) osd ops. - * (All rbd data writes are prefixed with an allocation hint op, but - * technically osd watch is a write request, hence this distinction.) - */ -static struct ceph_osd_request *rbd_osd_req_create( - struct rbd_device *rbd_dev, - enum obj_operation_type op_type, - unsigned int num_ops, - struct rbd_obj_request *obj_request) -{ - struct ceph_snap_context *snapc = NULL; - - if (obj_request_img_data_test(obj_request) && - (op_type == OBJ_OP_DISCARD || op_type == OBJ_OP_WRITE)) { - struct rbd_img_request *img_request = obj_request->img_request; - if (op_type == OBJ_OP_WRITE) { - rbd_assert(img_request_write_test(img_request)); - } else { - rbd_assert(img_request_discard_test(img_request)); - } - snapc = img_request->snapc; - } - - rbd_assert(num_ops == 1 || ((op_type == OBJ_OP_WRITE) && num_ops == 2)); - - return __rbd_osd_req_create(rbd_dev, snapc, num_ops, - (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD) ? - CEPH_OSD_FLAG_WRITE : CEPH_OSD_FLAG_READ, obj_request); -} - -/* - * Create a copyup osd request based on the information in the object - * request supplied. A copyup request has two or three osd ops, a - * copyup method call, potentially a hint op, and a write or truncate - * or zero op. 
- */ -static struct ceph_osd_request * -rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request) -{ - struct rbd_img_request *img_request; - int num_osd_ops = 3; - - rbd_assert(obj_request_img_data_test(obj_request)); - img_request = obj_request->img_request; - rbd_assert(img_request); - rbd_assert(img_request_write_test(img_request) || - img_request_discard_test(img_request)); - - if (img_request_discard_test(img_request)) - num_osd_ops = 2; - - return __rbd_osd_req_create(img_request->rbd_dev, - img_request->snapc, num_osd_ops, - CEPH_OSD_FLAG_WRITE, obj_request); -} - static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req) { ceph_osdc_put_request(osd_req); } -static struct rbd_obj_request * -rbd_obj_request_create(enum obj_request_type type) +static struct rbd_obj_request *rbd_obj_request_create(void) { struct rbd_obj_request *obj_request; - rbd_assert(obj_request_type_valid(type)); - obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_NOIO); if (!obj_request) return NULL; - obj_request->which = BAD_WHICH; - obj_request->type = type; - INIT_LIST_HEAD(&obj_request->links); + ceph_object_extent_init(&obj_request->ex); kref_init(&obj_request->kref); dout("%s %p\n", __func__, obj_request); @@ -2011,32 +1494,34 @@ rbd_obj_request_create(enum obj_request_type type) static void rbd_obj_request_destroy(struct kref *kref) { struct rbd_obj_request *obj_request; + u32 i; obj_request = container_of(kref, struct rbd_obj_request, kref); dout("%s: obj %p\n", __func__, obj_request); - rbd_assert(obj_request->img_request == NULL); - rbd_assert(obj_request->which == BAD_WHICH); - if (obj_request->osd_req) rbd_osd_req_destroy(obj_request->osd_req); - rbd_assert(obj_request_type_valid(obj_request->type)); - switch (obj_request->type) { + switch (obj_request->img_request->data_type) { case OBJ_REQUEST_NODATA: - break; /* Nothing to do */ case OBJ_REQUEST_BIO: - if (obj_request->bio_list) - bio_chain_put(obj_request->bio_list); - break; - case OBJ_REQUEST_PAGES: - /* img_data requests don't own their page array */ - if (obj_request->pages && - !obj_request_img_data_test(obj_request)) - ceph_release_page_vector(obj_request->pages, - obj_request->page_count); + case OBJ_REQUEST_BVECS: + break; /* Nothing to do */ + case OBJ_REQUEST_OWN_BVECS: + kfree(obj_request->bvec_pos.bvecs); break; + default: + rbd_assert(0); + } + + kfree(obj_request->img_extents); + if (obj_request->copyup_bvecs) { + for (i = 0; i < obj_request->copyup_bvec_count; i++) { + if (obj_request->copyup_bvecs[i].bv_page) + __free_page(obj_request->copyup_bvecs[i].bv_page); + } + kfree(obj_request->copyup_bvecs); } kmem_cache_free(rbd_obj_request_cache, obj_request); @@ -2111,7 +1596,6 @@ static bool rbd_dev_parent_get(struct rbd_device *rbd_dev) */ static struct rbd_img_request *rbd_img_request_create( struct rbd_device *rbd_dev, - u64 offset, u64 length, enum obj_operation_type op_type, struct ceph_snap_context *snapc) { @@ -2122,27 +1606,21 @@ static struct rbd_img_request *rbd_img_request_create( return NULL; img_request->rbd_dev = rbd_dev; - img_request->offset = offset; - img_request->length = length; - if (op_type == OBJ_OP_DISCARD) { - img_request_discard_set(img_request); - img_request->snapc = snapc; - } else if (op_type == OBJ_OP_WRITE) { - img_request_write_set(img_request); - img_request->snapc = snapc; - } else { + img_request->op_type = op_type; + if (!rbd_img_is_write(img_request)) img_request->snap_id = rbd_dev->spec->snap_id; - } + else + img_request->snapc = snapc; + if (rbd_dev_parent_get(rbd_dev)) 
img_request_layered_set(img_request); spin_lock_init(&img_request->completion_lock); - INIT_LIST_HEAD(&img_request->obj_requests); + INIT_LIST_HEAD(&img_request->object_extents); kref_init(&img_request->kref); - dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev, - obj_op_name(op_type), offset, length, img_request); - + dout("%s: rbd_dev %p %s -> img %p\n", __func__, rbd_dev, + obj_op_name(op_type), img_request); return img_request; } @@ -2165,829 +1643,934 @@ static void rbd_img_request_destroy(struct kref *kref) rbd_dev_parent_put(img_request->rbd_dev); } - if (img_request_write_test(img_request) || - img_request_discard_test(img_request)) + if (rbd_img_is_write(img_request)) ceph_put_snap_context(img_request->snapc); kmem_cache_free(rbd_img_request_cache, img_request); } -static struct rbd_img_request *rbd_parent_request_create( - struct rbd_obj_request *obj_request, - u64 img_offset, u64 length) +static void prune_extents(struct ceph_file_extent *img_extents, + u32 *num_img_extents, u64 overlap) { - struct rbd_img_request *parent_request; - struct rbd_device *rbd_dev; + u32 cnt = *num_img_extents; - rbd_assert(obj_request->img_request); - rbd_dev = obj_request->img_request->rbd_dev; + /* drop extents completely beyond the overlap */ + while (cnt && img_extents[cnt - 1].fe_off >= overlap) + cnt--; - parent_request = rbd_img_request_create(rbd_dev->parent, img_offset, - length, OBJ_OP_READ, NULL); - if (!parent_request) - return NULL; + if (cnt) { + struct ceph_file_extent *ex = &img_extents[cnt - 1]; - img_request_child_set(parent_request); - rbd_obj_request_get(obj_request); - parent_request->obj_request = obj_request; + /* trim final overlapping extent */ + if (ex->fe_off + ex->fe_len > overlap) + ex->fe_len = overlap - ex->fe_off; + } - return parent_request; + *num_img_extents = cnt; } -static void rbd_parent_request_destroy(struct kref *kref) +/* + * Determine the byte range(s) covered by either just the object extent + * or the entire object in the parent image. + */ +static int rbd_obj_calc_img_extents(struct rbd_obj_request *obj_req, + bool entire) { - struct rbd_img_request *parent_request; - struct rbd_obj_request *orig_request; + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; + int ret; - parent_request = container_of(kref, struct rbd_img_request, kref); - orig_request = parent_request->obj_request; + if (!rbd_dev->parent_overlap) + return 0; - parent_request->obj_request = NULL; - rbd_obj_request_put(orig_request); - img_request_child_clear(parent_request); + ret = ceph_extent_to_file(&rbd_dev->layout, obj_req->ex.oe_objno, + entire ? 0 : obj_req->ex.oe_off, + entire ? 
rbd_dev->layout.object_size : + obj_req->ex.oe_len, + &obj_req->img_extents, + &obj_req->num_img_extents); + if (ret) + return ret; - rbd_img_request_destroy(kref); + prune_extents(obj_req->img_extents, &obj_req->num_img_extents, + rbd_dev->parent_overlap); + return 0; } -static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request) +static void rbd_osd_req_setup_data(struct rbd_obj_request *obj_req, u32 which) { - struct rbd_img_request *img_request; - unsigned int xferred; - int result; - bool more; - - rbd_assert(obj_request_img_data_test(obj_request)); - img_request = obj_request->img_request; - - rbd_assert(obj_request->xferred <= (u64)UINT_MAX); - xferred = (unsigned int)obj_request->xferred; - result = obj_request->result; - if (result) { - struct rbd_device *rbd_dev = img_request->rbd_dev; - enum obj_operation_type op_type; - - if (img_request_discard_test(img_request)) - op_type = OBJ_OP_DISCARD; - else if (img_request_write_test(img_request)) - op_type = OBJ_OP_WRITE; - else - op_type = OBJ_OP_READ; - - rbd_warn(rbd_dev, "%s %llx at %llx (%llx)", - obj_op_name(op_type), obj_request->length, - obj_request->img_offset, obj_request->offset); - rbd_warn(rbd_dev, " result %d xferred %x", - result, xferred); - if (!img_request->result) - img_request->result = result; - /* - * Need to end I/O on the entire obj_request worth of - * bytes in case of error. - */ - xferred = obj_request->length; + switch (obj_req->img_request->data_type) { + case OBJ_REQUEST_BIO: + osd_req_op_extent_osd_data_bio(obj_req->osd_req, which, + &obj_req->bio_pos, + obj_req->ex.oe_len); + break; + case OBJ_REQUEST_BVECS: + case OBJ_REQUEST_OWN_BVECS: + rbd_assert(obj_req->bvec_pos.iter.bi_size == + obj_req->ex.oe_len); + rbd_assert(obj_req->bvec_idx == obj_req->bvec_count); + osd_req_op_extent_osd_data_bvec_pos(obj_req->osd_req, which, + &obj_req->bvec_pos); + break; + default: + rbd_assert(0); } +} - if (img_request_child_test(img_request)) { - rbd_assert(img_request->obj_request != NULL); - more = obj_request->which < img_request->obj_request_count - 1; - } else { - blk_status_t status = errno_to_blk_status(result); +static int rbd_obj_setup_read(struct rbd_obj_request *obj_req) +{ + obj_req->osd_req = rbd_osd_req_create(obj_req, 1); + if (!obj_req->osd_req) + return -ENOMEM; - rbd_assert(img_request->rq != NULL); + osd_req_op_extent_init(obj_req->osd_req, 0, CEPH_OSD_OP_READ, + obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0); + rbd_osd_req_setup_data(obj_req, 0); - more = blk_update_request(img_request->rq, status, xferred); - if (!more) - __blk_mq_end_request(img_request->rq, status); - } + rbd_osd_req_format_read(obj_req); + return 0; +} + +static int __rbd_obj_setup_stat(struct rbd_obj_request *obj_req, + unsigned int which) +{ + struct page **pages; - return more; + /* + * The response data for a STAT call consists of: + * le64 length; + * struct { + * le32 tv_sec; + * le32 tv_nsec; + * } mtime; + */ + pages = ceph_alloc_page_vector(1, GFP_NOIO); + if (IS_ERR(pages)) + return PTR_ERR(pages); + + osd_req_op_init(obj_req->osd_req, which, CEPH_OSD_OP_STAT, 0); + osd_req_op_raw_data_in_pages(obj_req->osd_req, which, pages, + 8 + sizeof(struct ceph_timespec), + 0, false, true); + return 0; } -static void rbd_img_obj_callback(struct rbd_obj_request *obj_request) +static void __rbd_obj_setup_write(struct rbd_obj_request *obj_req, + unsigned int which) { - struct rbd_img_request *img_request; - u32 which = obj_request->which; - bool more = true; + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; 
+ u16 opcode; - rbd_assert(obj_request_img_data_test(obj_request)); - img_request = obj_request->img_request; + osd_req_op_alloc_hint_init(obj_req->osd_req, which++, + rbd_dev->layout.object_size, + rbd_dev->layout.object_size); - dout("%s: img %p obj %p\n", __func__, img_request, obj_request); - rbd_assert(img_request != NULL); - rbd_assert(img_request->obj_request_count > 0); - rbd_assert(which != BAD_WHICH); - rbd_assert(which < img_request->obj_request_count); + if (rbd_obj_is_entire(obj_req)) + opcode = CEPH_OSD_OP_WRITEFULL; + else + opcode = CEPH_OSD_OP_WRITE; - spin_lock_irq(&img_request->completion_lock); - if (which != img_request->next_completion) - goto out; + osd_req_op_extent_init(obj_req->osd_req, which, opcode, + obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0); + rbd_osd_req_setup_data(obj_req, which++); + + rbd_assert(which == obj_req->osd_req->r_num_ops); + rbd_osd_req_format_write(obj_req); +} - for_each_obj_request_from(img_request, obj_request) { - rbd_assert(more); - rbd_assert(which < img_request->obj_request_count); +static int rbd_obj_setup_write(struct rbd_obj_request *obj_req) +{ + unsigned int num_osd_ops, which = 0; + int ret; - if (!obj_request_done_test(obj_request)) - break; - more = rbd_img_obj_end_request(obj_request); - which++; + /* reverse map the entire object onto the parent */ + ret = rbd_obj_calc_img_extents(obj_req, true); + if (ret) + return ret; + + if (obj_req->num_img_extents) { + obj_req->write_state = RBD_OBJ_WRITE_GUARD; + num_osd_ops = 3; /* stat + setallochint + write/writefull */ + } else { + obj_req->write_state = RBD_OBJ_WRITE_FLAT; + num_osd_ops = 2; /* setallochint + write/writefull */ } - rbd_assert(more ^ (which == img_request->obj_request_count)); - img_request->next_completion = which; -out: - spin_unlock_irq(&img_request->completion_lock); - rbd_img_request_put(img_request); + obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops); + if (!obj_req->osd_req) + return -ENOMEM; - if (!more) - rbd_img_request_complete(img_request); + if (obj_req->num_img_extents) { + ret = __rbd_obj_setup_stat(obj_req, which++); + if (ret) + return ret; + } + + __rbd_obj_setup_write(obj_req, which); + return 0; } -/* - * Add individual osd ops to the given ceph_osd_request and prepare - * them for submission. num_ops is the current number of - * osd operations already to the object request. 
- */ -static void rbd_img_obj_request_fill(struct rbd_obj_request *obj_request, - struct ceph_osd_request *osd_request, - enum obj_operation_type op_type, - unsigned int num_ops) -{ - struct rbd_img_request *img_request = obj_request->img_request; - struct rbd_device *rbd_dev = img_request->rbd_dev; - u64 object_size = rbd_obj_bytes(&rbd_dev->header); - u64 offset = obj_request->offset; - u64 length = obj_request->length; - u64 img_end; +static void __rbd_obj_setup_discard(struct rbd_obj_request *obj_req, + unsigned int which) +{ u16 opcode; - if (op_type == OBJ_OP_DISCARD) { - if (!offset && length == object_size && - (!img_request_layered_test(img_request) || - !obj_request_overlaps_parent(obj_request))) { - opcode = CEPH_OSD_OP_DELETE; - } else if ((offset + length == object_size)) { + if (rbd_obj_is_entire(obj_req)) { + if (obj_req->num_img_extents) { + osd_req_op_init(obj_req->osd_req, which++, + CEPH_OSD_OP_CREATE, 0); opcode = CEPH_OSD_OP_TRUNCATE; } else { - down_read(&rbd_dev->header_rwsem); - img_end = rbd_dev->header.image_size; - up_read(&rbd_dev->header_rwsem); - - if (obj_request->img_offset + length == img_end) - opcode = CEPH_OSD_OP_TRUNCATE; - else - opcode = CEPH_OSD_OP_ZERO; + osd_req_op_init(obj_req->osd_req, which++, + CEPH_OSD_OP_DELETE, 0); + opcode = 0; } - } else if (op_type == OBJ_OP_WRITE) { - if (!offset && length == object_size) - opcode = CEPH_OSD_OP_WRITEFULL; - else - opcode = CEPH_OSD_OP_WRITE; - osd_req_op_alloc_hint_init(osd_request, num_ops, - object_size, object_size); - num_ops++; + } else if (rbd_obj_is_tail(obj_req)) { + opcode = CEPH_OSD_OP_TRUNCATE; } else { - opcode = CEPH_OSD_OP_READ; + opcode = CEPH_OSD_OP_ZERO; } - if (opcode == CEPH_OSD_OP_DELETE) - osd_req_op_init(osd_request, num_ops, opcode, 0); - else - osd_req_op_extent_init(osd_request, num_ops, opcode, - offset, length, 0, 0); - - if (obj_request->type == OBJ_REQUEST_BIO) - osd_req_op_extent_osd_data_bio(osd_request, num_ops, - obj_request->bio_list, length); - else if (obj_request->type == OBJ_REQUEST_PAGES) - osd_req_op_extent_osd_data_pages(osd_request, num_ops, - obj_request->pages, length, - offset & ~PAGE_MASK, false, false); - - /* Discards are also writes */ - if (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD) - rbd_osd_req_format_write(obj_request); - else - rbd_osd_req_format_read(obj_request); + if (opcode) + osd_req_op_extent_init(obj_req->osd_req, which++, opcode, + obj_req->ex.oe_off, obj_req->ex.oe_len, + 0, 0); + + rbd_assert(which == obj_req->osd_req->r_num_ops); + rbd_osd_req_format_write(obj_req); } -/* - * Split up an image request into one or more object requests, each - * to a different object. The "type" parameter indicates whether - * "data_desc" is the pointer to the head of a list of bio - * structures, or the base of a page array. In either case this - * function assumes data_desc describes memory sufficient to hold - * all data described by the image request. 
- */ -static int rbd_img_request_fill(struct rbd_img_request *img_request, - enum obj_request_type type, - void *data_desc) +static int rbd_obj_setup_discard(struct rbd_obj_request *obj_req) { - struct rbd_device *rbd_dev = img_request->rbd_dev; - struct rbd_obj_request *obj_request = NULL; - struct rbd_obj_request *next_obj_request; - struct bio *bio_list = NULL; - unsigned int bio_offset = 0; - struct page **pages = NULL; - enum obj_operation_type op_type; - u64 img_offset; - u64 resid; - - dout("%s: img %p type %d data_desc %p\n", __func__, img_request, - (int)type, data_desc); + unsigned int num_osd_ops, which = 0; + int ret; - img_offset = img_request->offset; - resid = img_request->length; - rbd_assert(resid > 0); - op_type = rbd_img_request_op_type(img_request); + /* reverse map the entire object onto the parent */ + ret = rbd_obj_calc_img_extents(obj_req, true); + if (ret) + return ret; - if (type == OBJ_REQUEST_BIO) { - bio_list = data_desc; - rbd_assert(img_offset == - bio_list->bi_iter.bi_sector << SECTOR_SHIFT); - } else if (type == OBJ_REQUEST_PAGES) { - pages = data_desc; + if (rbd_obj_is_entire(obj_req)) { + obj_req->write_state = RBD_OBJ_WRITE_FLAT; + if (obj_req->num_img_extents) + num_osd_ops = 2; /* create + truncate */ + else + num_osd_ops = 1; /* delete */ + } else { + if (obj_req->num_img_extents) { + obj_req->write_state = RBD_OBJ_WRITE_GUARD; + num_osd_ops = 2; /* stat + truncate/zero */ + } else { + obj_req->write_state = RBD_OBJ_WRITE_FLAT; + num_osd_ops = 1; /* truncate/zero */ + } } - while (resid) { - struct ceph_osd_request *osd_req; - u64 object_no = img_offset >> rbd_dev->header.obj_order; - u64 offset = rbd_segment_offset(rbd_dev, img_offset); - u64 length = rbd_segment_length(rbd_dev, img_offset, resid); - - obj_request = rbd_obj_request_create(type); - if (!obj_request) - goto out_unwind; - - obj_request->object_no = object_no; - obj_request->offset = offset; - obj_request->length = length; - - /* - * set obj_request->img_request before creating the - * osd_request so that it gets the right snapc - */ - rbd_img_obj_request_add(img_request, obj_request); - - if (type == OBJ_REQUEST_BIO) { - unsigned int clone_size; - - rbd_assert(length <= (u64)UINT_MAX); - clone_size = (unsigned int)length; - obj_request->bio_list = - bio_chain_clone_range(&bio_list, - &bio_offset, - clone_size, - GFP_NOIO); - if (!obj_request->bio_list) - goto out_unwind; - } else if (type == OBJ_REQUEST_PAGES) { - unsigned int page_count; - - obj_request->pages = pages; - page_count = (u32)calc_pages_for(offset, length); - obj_request->page_count = page_count; - if ((offset + length) & ~PAGE_MASK) - page_count--; /* more on last page */ - pages += page_count; - } + obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops); + if (!obj_req->osd_req) + return -ENOMEM; - osd_req = rbd_osd_req_create(rbd_dev, op_type, - (op_type == OBJ_OP_WRITE) ? 2 : 1, - obj_request); - if (!osd_req) - goto out_unwind; + if (!rbd_obj_is_entire(obj_req) && obj_req->num_img_extents) { + ret = __rbd_obj_setup_stat(obj_req, which++); + if (ret) + return ret; + } - obj_request->osd_req = osd_req; - obj_request->callback = rbd_img_obj_callback; - obj_request->img_offset = img_offset; + __rbd_obj_setup_discard(obj_req, which); + return 0; +} - rbd_img_obj_request_fill(obj_request, osd_req, op_type, 0); +/* + * For each object request in @img_req, allocate an OSD request, add + * individual OSD ops and prepare them for submission. The number of + * OSD ops depends on op_type and the overlap point (if any). 
+ */ +static int __rbd_img_fill_request(struct rbd_img_request *img_req) +{ + struct rbd_obj_request *obj_req; + int ret; - img_offset += length; - resid -= length; + for_each_obj_request(img_req, obj_req) { + switch (img_req->op_type) { + case OBJ_OP_READ: + ret = rbd_obj_setup_read(obj_req); + break; + case OBJ_OP_WRITE: + ret = rbd_obj_setup_write(obj_req); + break; + case OBJ_OP_DISCARD: + ret = rbd_obj_setup_discard(obj_req); + break; + default: + rbd_assert(0); + } + if (ret) + return ret; } return 0; +} -out_unwind: - for_each_obj_request_safe(img_request, obj_request, next_obj_request) - rbd_img_obj_request_del(img_request, obj_request); +union rbd_img_fill_iter { + struct ceph_bio_iter bio_iter; + struct ceph_bvec_iter bvec_iter; +}; - return -ENOMEM; -} +struct rbd_img_fill_ctx { + enum obj_request_type pos_type; + union rbd_img_fill_iter *pos; + union rbd_img_fill_iter iter; + ceph_object_extent_fn_t set_pos_fn; + ceph_object_extent_fn_t count_fn; + ceph_object_extent_fn_t copy_fn; +}; -static void -rbd_osd_copyup_callback(struct rbd_obj_request *obj_request) +static struct ceph_object_extent *alloc_object_extent(void *arg) { - struct rbd_img_request *img_request; - struct rbd_device *rbd_dev; - struct page **pages; - u32 page_count; + struct rbd_img_request *img_req = arg; + struct rbd_obj_request *obj_req; - dout("%s: obj %p\n", __func__, obj_request); + obj_req = rbd_obj_request_create(); + if (!obj_req) + return NULL; - rbd_assert(obj_request->type == OBJ_REQUEST_BIO || - obj_request->type == OBJ_REQUEST_NODATA); - rbd_assert(obj_request_img_data_test(obj_request)); - img_request = obj_request->img_request; - rbd_assert(img_request); + rbd_img_obj_request_add(img_req, obj_req); + return &obj_req->ex; +} - rbd_dev = img_request->rbd_dev; - rbd_assert(rbd_dev); +/* + * While su != os && sc == 1 is technically not fancy (it's the same + * layout as su == os && sc == 1), we can't use the nocopy path for it + * because ->set_pos_fn() should be called only once per object. + * ceph_file_to_extents() invokes action_fn once per stripe unit, so + * treat su != os && sc == 1 as fancy. + */ +static bool rbd_layout_is_fancy(struct ceph_file_layout *l) +{ + return l->stripe_unit != l->object_size; +} - pages = obj_request->copyup_pages; - rbd_assert(pages != NULL); - obj_request->copyup_pages = NULL; - page_count = obj_request->copyup_page_count; - rbd_assert(page_count); - obj_request->copyup_page_count = 0; - ceph_release_page_vector(pages, page_count); +static int rbd_img_fill_request_nocopy(struct rbd_img_request *img_req, + struct ceph_file_extent *img_extents, + u32 num_img_extents, + struct rbd_img_fill_ctx *fctx) +{ + u32 i; + int ret; + + img_req->data_type = fctx->pos_type; /* - * We want the transfer count to reflect the size of the - * original write request. There is no such thing as a - * successful short write, so if the request was successful - * we can just set it to the originally-requested length. + * Create object requests and set each object request's starting + * position in the provided bio (list) or bio_vec array. 
*/ - if (!obj_request->result) - obj_request->xferred = obj_request->length; + fctx->iter = *fctx->pos; + for (i = 0; i < num_img_extents; i++) { + ret = ceph_file_to_extents(&img_req->rbd_dev->layout, + img_extents[i].fe_off, + img_extents[i].fe_len, + &img_req->object_extents, + alloc_object_extent, img_req, + fctx->set_pos_fn, &fctx->iter); + if (ret) + return ret; + } - obj_request_done_set(obj_request); + return __rbd_img_fill_request(img_req); } -static void -rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request) +/* + * Map a list of image extents to a list of object extents, create the + * corresponding object requests (normally each to a different object, + * but not always) and add them to @img_req. For each object request, + * set up its data descriptor to point to the corresponding chunk(s) of + * @fctx->pos data buffer. + * + * Because ceph_file_to_extents() will merge adjacent object extents + * together, each object request's data descriptor may point to multiple + * different chunks of @fctx->pos data buffer. + * + * @fctx->pos data buffer is assumed to be large enough. + */ +static int rbd_img_fill_request(struct rbd_img_request *img_req, + struct ceph_file_extent *img_extents, + u32 num_img_extents, + struct rbd_img_fill_ctx *fctx) { - struct rbd_obj_request *orig_request; - struct ceph_osd_request *osd_req; - struct rbd_device *rbd_dev; - struct page **pages; - enum obj_operation_type op_type; - u32 page_count; - int img_result; - u64 parent_length; - - rbd_assert(img_request_child_test(img_request)); - - /* First get what we need from the image request */ - - pages = img_request->copyup_pages; - rbd_assert(pages != NULL); - img_request->copyup_pages = NULL; - page_count = img_request->copyup_page_count; - rbd_assert(page_count); - img_request->copyup_page_count = 0; - - orig_request = img_request->obj_request; - rbd_assert(orig_request != NULL); - rbd_assert(obj_request_type_valid(orig_request->type)); - img_result = img_request->result; - parent_length = img_request->length; - rbd_assert(img_result || parent_length == img_request->xferred); - rbd_img_request_put(img_request); + struct rbd_device *rbd_dev = img_req->rbd_dev; + struct rbd_obj_request *obj_req; + u32 i; + int ret; - rbd_assert(orig_request->img_request); - rbd_dev = orig_request->img_request->rbd_dev; - rbd_assert(rbd_dev); + if (fctx->pos_type == OBJ_REQUEST_NODATA || + !rbd_layout_is_fancy(&rbd_dev->layout)) + return rbd_img_fill_request_nocopy(img_req, img_extents, + num_img_extents, fctx); + + img_req->data_type = OBJ_REQUEST_OWN_BVECS; /* - * If the overlap has become 0 (most likely because the - * image has been flattened) we need to free the pages - * and re-submit the original write request. + * Create object requests and determine ->bvec_count for each object + * request. Note that ->bvec_count sum over all object requests may + * be greater than the number of bio_vecs in the provided bio (list) + * or bio_vec array because when mapped, those bio_vecs can straddle + * stripe unit boundaries. 
*/ - if (!rbd_dev->parent_overlap) { - ceph_release_page_vector(pages, page_count); - rbd_obj_request_submit(orig_request); - return; + fctx->iter = *fctx->pos; + for (i = 0; i < num_img_extents; i++) { + ret = ceph_file_to_extents(&rbd_dev->layout, + img_extents[i].fe_off, + img_extents[i].fe_len, + &img_req->object_extents, + alloc_object_extent, img_req, + fctx->count_fn, &fctx->iter); + if (ret) + return ret; } - if (img_result) - goto out_err; + for_each_obj_request(img_req, obj_req) { + obj_req->bvec_pos.bvecs = kmalloc_array(obj_req->bvec_count, + sizeof(*obj_req->bvec_pos.bvecs), + GFP_NOIO); + if (!obj_req->bvec_pos.bvecs) + return -ENOMEM; + } /* - * The original osd request is of no use to use any more. - * We need a new one that can hold the three ops in a copyup - * request. Allocate the new copyup osd request for the - * original request, and release the old one. + * Fill in each object request's private bio_vec array, splitting and + * rearranging the provided bio_vecs in stripe unit chunks as needed. */ - img_result = -ENOMEM; - osd_req = rbd_osd_req_create_copyup(orig_request); - if (!osd_req) - goto out_err; - rbd_osd_req_destroy(orig_request->osd_req); - orig_request->osd_req = osd_req; - orig_request->copyup_pages = pages; - orig_request->copyup_page_count = page_count; + fctx->iter = *fctx->pos; + for (i = 0; i < num_img_extents; i++) { + ret = ceph_iterate_extents(&rbd_dev->layout, + img_extents[i].fe_off, + img_extents[i].fe_len, + &img_req->object_extents, + fctx->copy_fn, &fctx->iter); + if (ret) + return ret; + } - /* Initialize the copyup op */ + return __rbd_img_fill_request(img_req); +} - osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup"); - osd_req_op_cls_request_data_pages(osd_req, 0, pages, parent_length, 0, - false, false); +static int rbd_img_fill_nodata(struct rbd_img_request *img_req, + u64 off, u64 len) +{ + struct ceph_file_extent ex = { off, len }; + union rbd_img_fill_iter dummy; + struct rbd_img_fill_ctx fctx = { + .pos_type = OBJ_REQUEST_NODATA, + .pos = &dummy, + }; - /* Add the other op(s) */ + return rbd_img_fill_request(img_req, &ex, 1, &fctx); +} - op_type = rbd_img_request_op_type(orig_request->img_request); - rbd_img_obj_request_fill(orig_request, osd_req, op_type, 1); +static void set_bio_pos(struct ceph_object_extent *ex, u32 bytes, void *arg) +{ + struct rbd_obj_request *obj_req = + container_of(ex, struct rbd_obj_request, ex); + struct ceph_bio_iter *it = arg; - /* All set, send it off. */ + dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes); + obj_req->bio_pos = *it; + ceph_bio_iter_advance(it, bytes); +} - rbd_obj_request_submit(orig_request); - return; +static void count_bio_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg) +{ + struct rbd_obj_request *obj_req = + container_of(ex, struct rbd_obj_request, ex); + struct ceph_bio_iter *it = arg; + + dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes); + ceph_bio_iter_advance_step(it, bytes, ({ + obj_req->bvec_count++; + })); -out_err: - ceph_release_page_vector(pages, page_count); - rbd_obj_request_error(orig_request, img_result); } -/* - * Read from the parent image the range of data that covers the - * entire target of the given object request. This is used for - * satisfying a layered image write request when the target of an - * object request from the image request does not exist. - * - * A page array big enough to hold the returned data is allocated - * and supplied to rbd_img_request_fill() as the "data descriptor." 
- * When the read completes, this page array will be transferred to - * the original object request for the copyup operation. - * - * If an error occurs, it is recorded as the result of the original - * object request in rbd_img_obj_exists_callback(). - */ -static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request) -{ - struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev; - struct rbd_img_request *parent_request = NULL; - u64 img_offset; - u64 length; - struct page **pages = NULL; - u32 page_count; - int result; +static void copy_bio_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg) +{ + struct rbd_obj_request *obj_req = + container_of(ex, struct rbd_obj_request, ex); + struct ceph_bio_iter *it = arg; - rbd_assert(rbd_dev->parent != NULL); + dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes); + ceph_bio_iter_advance_step(it, bytes, ({ + obj_req->bvec_pos.bvecs[obj_req->bvec_idx++] = bv; + obj_req->bvec_pos.iter.bi_size += bv.bv_len; + })); +} - /* - * Determine the byte range covered by the object in the - * child image to which the original request was to be sent. - */ - img_offset = obj_request->img_offset - obj_request->offset; - length = rbd_obj_bytes(&rbd_dev->header); +static int __rbd_img_fill_from_bio(struct rbd_img_request *img_req, + struct ceph_file_extent *img_extents, + u32 num_img_extents, + struct ceph_bio_iter *bio_pos) +{ + struct rbd_img_fill_ctx fctx = { + .pos_type = OBJ_REQUEST_BIO, + .pos = (union rbd_img_fill_iter *)bio_pos, + .set_pos_fn = set_bio_pos, + .count_fn = count_bio_bvecs, + .copy_fn = copy_bio_bvecs, + }; - /* - * There is no defined parent data beyond the parent - * overlap, so limit what we read at that boundary if - * necessary. - */ - if (img_offset + length > rbd_dev->parent_overlap) { - rbd_assert(img_offset < rbd_dev->parent_overlap); - length = rbd_dev->parent_overlap - img_offset; - } + return rbd_img_fill_request(img_req, img_extents, num_img_extents, + &fctx); +} - /* - * Allocate a page array big enough to receive the data read - * from the parent. 
- */ - page_count = (u32)calc_pages_for(0, length); - pages = ceph_alloc_page_vector(page_count, GFP_NOIO); - if (IS_ERR(pages)) { - result = PTR_ERR(pages); - pages = NULL; - goto out_err; - } +static int rbd_img_fill_from_bio(struct rbd_img_request *img_req, + u64 off, u64 len, struct bio *bio) +{ + struct ceph_file_extent ex = { off, len }; + struct ceph_bio_iter it = { .bio = bio, .iter = bio->bi_iter }; - result = -ENOMEM; - parent_request = rbd_parent_request_create(obj_request, - img_offset, length); - if (!parent_request) - goto out_err; + return __rbd_img_fill_from_bio(img_req, &ex, 1, &it); +} - result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages); - if (result) - goto out_err; +static void set_bvec_pos(struct ceph_object_extent *ex, u32 bytes, void *arg) +{ + struct rbd_obj_request *obj_req = + container_of(ex, struct rbd_obj_request, ex); + struct ceph_bvec_iter *it = arg; - parent_request->copyup_pages = pages; - parent_request->copyup_page_count = page_count; - parent_request->callback = rbd_img_obj_parent_read_full_callback; + obj_req->bvec_pos = *it; + ceph_bvec_iter_shorten(&obj_req->bvec_pos, bytes); + ceph_bvec_iter_advance(it, bytes); +} - result = rbd_img_request_submit(parent_request); - if (!result) - return 0; +static void count_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg) +{ + struct rbd_obj_request *obj_req = + container_of(ex, struct rbd_obj_request, ex); + struct ceph_bvec_iter *it = arg; - parent_request->copyup_pages = NULL; - parent_request->copyup_page_count = 0; -out_err: - if (pages) - ceph_release_page_vector(pages, page_count); - if (parent_request) - rbd_img_request_put(parent_request); - return result; + ceph_bvec_iter_advance_step(it, bytes, ({ + obj_req->bvec_count++; + })); } -static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request) +static void copy_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg) { - struct rbd_obj_request *orig_request; - struct rbd_device *rbd_dev; - int result; + struct rbd_obj_request *obj_req = + container_of(ex, struct rbd_obj_request, ex); + struct ceph_bvec_iter *it = arg; - rbd_assert(!obj_request_img_data_test(obj_request)); + ceph_bvec_iter_advance_step(it, bytes, ({ + obj_req->bvec_pos.bvecs[obj_req->bvec_idx++] = bv; + obj_req->bvec_pos.iter.bi_size += bv.bv_len; + })); +} - /* - * All we need from the object request is the original - * request and the result of the STAT op. Grab those, then - * we're done with the request. - */ - orig_request = obj_request->obj_request; - obj_request->obj_request = NULL; - rbd_obj_request_put(orig_request); - rbd_assert(orig_request); - rbd_assert(orig_request->img_request); - - result = obj_request->result; - obj_request->result = 0; - - dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__, - obj_request, orig_request, result, - obj_request->xferred, obj_request->length); - rbd_obj_request_put(obj_request); +static int __rbd_img_fill_from_bvecs(struct rbd_img_request *img_req, + struct ceph_file_extent *img_extents, + u32 num_img_extents, + struct ceph_bvec_iter *bvec_pos) +{ + struct rbd_img_fill_ctx fctx = { + .pos_type = OBJ_REQUEST_BVECS, + .pos = (union rbd_img_fill_iter *)bvec_pos, + .set_pos_fn = set_bvec_pos, + .count_fn = count_bvecs, + .copy_fn = copy_bvecs, + }; - /* - * If the overlap has become 0 (most likely because the - * image has been flattened) we need to re-submit the - * original request. 
- */ - rbd_dev = orig_request->img_request->rbd_dev; - if (!rbd_dev->parent_overlap) { - rbd_obj_request_submit(orig_request); - return; - } + return rbd_img_fill_request(img_req, img_extents, num_img_extents, + &fctx); +} - /* - * Our only purpose here is to determine whether the object - * exists, and we don't want to treat the non-existence as - * an error. If something else comes back, transfer the - * error to the original request and complete it now. - */ - if (!result) { - obj_request_existence_set(orig_request, true); - } else if (result == -ENOENT) { - obj_request_existence_set(orig_request, false); - } else { - goto fail_orig_request; - } +static int rbd_img_fill_from_bvecs(struct rbd_img_request *img_req, + struct ceph_file_extent *img_extents, + u32 num_img_extents, + struct bio_vec *bvecs) +{ + struct ceph_bvec_iter it = { + .bvecs = bvecs, + .iter = { .bi_size = ceph_file_extents_bytes(img_extents, + num_img_extents) }, + }; - /* - * Resubmit the original request now that we have recorded - * whether the target object exists. - */ - result = rbd_img_obj_request_submit(orig_request); - if (result) - goto fail_orig_request; + return __rbd_img_fill_from_bvecs(img_req, img_extents, num_img_extents, + &it); +} - return; +static void rbd_img_request_submit(struct rbd_img_request *img_request) +{ + struct rbd_obj_request *obj_request; + + dout("%s: img %p\n", __func__, img_request); + + rbd_img_request_get(img_request); + for_each_obj_request(img_request, obj_request) + rbd_obj_request_submit(obj_request); -fail_orig_request: - rbd_obj_request_error(orig_request, result); + rbd_img_request_put(img_request); } -static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request) +static int rbd_obj_read_from_parent(struct rbd_obj_request *obj_req) { - struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev; - struct rbd_obj_request *stat_request; - struct page **pages; - u32 page_count; - size_t size; + struct rbd_img_request *img_req = obj_req->img_request; + struct rbd_img_request *child_img_req; int ret; - stat_request = rbd_obj_request_create(OBJ_REQUEST_PAGES); - if (!stat_request) + child_img_req = rbd_img_request_create(img_req->rbd_dev->parent, + OBJ_OP_READ, NULL); + if (!child_img_req) return -ENOMEM; - stat_request->object_no = obj_request->object_no; + __set_bit(IMG_REQ_CHILD, &child_img_req->flags); + child_img_req->obj_request = obj_req; - stat_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1, - stat_request); - if (!stat_request->osd_req) { - ret = -ENOMEM; - goto fail_stat_request; + if (!rbd_img_is_write(img_req)) { + switch (img_req->data_type) { + case OBJ_REQUEST_BIO: + ret = __rbd_img_fill_from_bio(child_img_req, + obj_req->img_extents, + obj_req->num_img_extents, + &obj_req->bio_pos); + break; + case OBJ_REQUEST_BVECS: + case OBJ_REQUEST_OWN_BVECS: + ret = __rbd_img_fill_from_bvecs(child_img_req, + obj_req->img_extents, + obj_req->num_img_extents, + &obj_req->bvec_pos); + break; + default: + rbd_assert(0); + } + } else { + ret = rbd_img_fill_from_bvecs(child_img_req, + obj_req->img_extents, + obj_req->num_img_extents, + obj_req->copyup_bvecs); + } + if (ret) { + rbd_img_request_put(child_img_req); + return ret; + } + + rbd_img_request_submit(child_img_req); + return 0; +} + +static bool rbd_obj_handle_read(struct rbd_obj_request *obj_req) +{ + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; + int ret; + + if (obj_req->result == -ENOENT && + rbd_dev->parent_overlap && !obj_req->tried_parent) { + /* reverse map this object 
extent onto the parent */ + ret = rbd_obj_calc_img_extents(obj_req, false); + if (ret) { + obj_req->result = ret; + return true; + } + + if (obj_req->num_img_extents) { + obj_req->tried_parent = true; + ret = rbd_obj_read_from_parent(obj_req); + if (ret) { + obj_req->result = ret; + return true; + } + return false; + } } /* - * The response data for a STAT call consists of: - * le64 length; - * struct { - * le32 tv_sec; - * le32 tv_nsec; - * } mtime; + * -ENOENT means a hole in the image -- zero-fill the entire + * length of the request. A short read also implies zero-fill + * to the end of the request. In both cases we update xferred + * count to indicate the whole request was satisfied. */ - size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32); - page_count = (u32)calc_pages_for(0, size); - pages = ceph_alloc_page_vector(page_count, GFP_NOIO); - if (IS_ERR(pages)) { - ret = PTR_ERR(pages); - goto fail_stat_request; + if (obj_req->result == -ENOENT || + (!obj_req->result && obj_req->xferred < obj_req->ex.oe_len)) { + rbd_assert(!obj_req->xferred || !obj_req->result); + rbd_obj_zero_range(obj_req, obj_req->xferred, + obj_req->ex.oe_len - obj_req->xferred); + obj_req->result = 0; + obj_req->xferred = obj_req->ex.oe_len; } - osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT, 0); - osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0, - false, false); - - rbd_obj_request_get(obj_request); - stat_request->obj_request = obj_request; - stat_request->pages = pages; - stat_request->page_count = page_count; - stat_request->callback = rbd_img_obj_exists_callback; + return true; +} - rbd_obj_request_submit(stat_request); - return 0; +/* + * copyup_bvecs pages are never highmem pages + */ +static bool is_zero_bvecs(struct bio_vec *bvecs, u32 bytes) +{ + struct ceph_bvec_iter it = { + .bvecs = bvecs, + .iter = { .bi_size = bytes }, + }; -fail_stat_request: - rbd_obj_request_put(stat_request); - return ret; + ceph_bvec_iter_advance_step(&it, bytes, ({ + if (memchr_inv(page_address(bv.bv_page) + bv.bv_offset, 0, + bv.bv_len)) + return false; + })); + return true; } -static bool img_obj_request_simple(struct rbd_obj_request *obj_request) +static int rbd_obj_issue_copyup(struct rbd_obj_request *obj_req, u32 bytes) { - struct rbd_img_request *img_request = obj_request->img_request; - struct rbd_device *rbd_dev = img_request->rbd_dev; + unsigned int num_osd_ops = obj_req->osd_req->r_num_ops; - /* Reads */ - if (!img_request_write_test(img_request) && - !img_request_discard_test(img_request)) - return true; - - /* Non-layered writes */ - if (!img_request_layered_test(img_request)) - return true; + dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes); + rbd_assert(obj_req->osd_req->r_ops[0].op == CEPH_OSD_OP_STAT); + rbd_osd_req_destroy(obj_req->osd_req); /* - * Layered writes outside of the parent overlap range don't - * share any data with the parent. + * Create a copyup request with the same number of OSD ops as + * the original request. The original request was stat + op(s), + * the new copyup request will be copyup + the same op(s). */ - if (!obj_request_overlaps_parent(obj_request)) - return true; + obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops); + if (!obj_req->osd_req) + return -ENOMEM; /* - * Entire-object layered writes - we will overwrite whatever - * parent data there is anyway. + * Only send non-zero copyup data to save some I/O and network + * bandwidth -- zero copyup data is equivalent to the object not + * existing. 
*/ - if (!obj_request->offset && - obj_request->length == rbd_obj_bytes(&rbd_dev->header)) - return true; + if (is_zero_bvecs(obj_req->copyup_bvecs, bytes)) { + dout("%s obj_req %p detected zeroes\n", __func__, obj_req); + bytes = 0; + } - /* - * If the object is known to already exist, its parent data has - * already been copied. - */ - if (obj_request_known_test(obj_request) && - obj_request_exists_test(obj_request)) - return true; + osd_req_op_cls_init(obj_req->osd_req, 0, CEPH_OSD_OP_CALL, "rbd", + "copyup"); + osd_req_op_cls_request_data_bvecs(obj_req->osd_req, 0, + obj_req->copyup_bvecs, bytes); + + switch (obj_req->img_request->op_type) { + case OBJ_OP_WRITE: + __rbd_obj_setup_write(obj_req, 1); + break; + case OBJ_OP_DISCARD: + rbd_assert(!rbd_obj_is_entire(obj_req)); + __rbd_obj_setup_discard(obj_req, 1); + break; + default: + rbd_assert(0); + } - return false; + rbd_obj_request_submit(obj_req); + return 0; } -static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request) +static int setup_copyup_bvecs(struct rbd_obj_request *obj_req, u64 obj_overlap) { - rbd_assert(obj_request_img_data_test(obj_request)); - rbd_assert(obj_request_type_valid(obj_request->type)); - rbd_assert(obj_request->img_request); + u32 i; - if (img_obj_request_simple(obj_request)) { - rbd_obj_request_submit(obj_request); - return 0; - } + rbd_assert(!obj_req->copyup_bvecs); + obj_req->copyup_bvec_count = calc_pages_for(0, obj_overlap); + obj_req->copyup_bvecs = kcalloc(obj_req->copyup_bvec_count, + sizeof(*obj_req->copyup_bvecs), + GFP_NOIO); + if (!obj_req->copyup_bvecs) + return -ENOMEM; - /* - * It's a layered write. The target object might exist but - * we may not know that yet. If we know it doesn't exist, - * start by reading the data for the full target object from - * the parent so we can use it for a copyup to the target. - */ - if (obj_request_known_test(obj_request)) - return rbd_img_obj_parent_read_full(obj_request); + for (i = 0; i < obj_req->copyup_bvec_count; i++) { + unsigned int len = min(obj_overlap, (u64)PAGE_SIZE); - /* We don't know whether the target exists. Go find out. */ + obj_req->copyup_bvecs[i].bv_page = alloc_page(GFP_NOIO); + if (!obj_req->copyup_bvecs[i].bv_page) + return -ENOMEM; + + obj_req->copyup_bvecs[i].bv_offset = 0; + obj_req->copyup_bvecs[i].bv_len = len; + obj_overlap -= len; + } - return rbd_img_obj_exists_submit(obj_request); + rbd_assert(!obj_overlap); + return 0; } -static int rbd_img_request_submit(struct rbd_img_request *img_request) +static int rbd_obj_handle_write_guard(struct rbd_obj_request *obj_req) { - struct rbd_obj_request *obj_request; - struct rbd_obj_request *next_obj_request; - int ret = 0; - - dout("%s: img %p\n", __func__, img_request); + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; + int ret; - rbd_img_request_get(img_request); - for_each_obj_request_safe(img_request, obj_request, next_obj_request) { - ret = rbd_img_obj_request_submit(obj_request); - if (ret) - goto out_put_ireq; + rbd_assert(obj_req->num_img_extents); + prune_extents(obj_req->img_extents, &obj_req->num_img_extents, + rbd_dev->parent_overlap); + if (!obj_req->num_img_extents) { + /* + * The overlap has become 0 (most likely because the + * image has been flattened). Use rbd_obj_issue_copyup() + * to re-submit the original write request -- the copyup + * operation itself will be a no-op, since someone must + * have populated the child object while we weren't + * looking. 
Move to WRITE_FLAT state as we'll be done + * with the operation once the null copyup completes. + */ + obj_req->write_state = RBD_OBJ_WRITE_FLAT; + return rbd_obj_issue_copyup(obj_req, 0); } -out_put_ireq: - rbd_img_request_put(img_request); - return ret; + ret = setup_copyup_bvecs(obj_req, rbd_obj_img_extents_bytes(obj_req)); + if (ret) + return ret; + + obj_req->write_state = RBD_OBJ_WRITE_COPYUP; + return rbd_obj_read_from_parent(obj_req); } -static void rbd_img_parent_read_callback(struct rbd_img_request *img_request) +static bool rbd_obj_handle_write(struct rbd_obj_request *obj_req) { - struct rbd_obj_request *obj_request; - struct rbd_device *rbd_dev; - u64 obj_end; - u64 img_xferred; - int img_result; + int ret; - rbd_assert(img_request_child_test(img_request)); +again: + switch (obj_req->write_state) { + case RBD_OBJ_WRITE_GUARD: + rbd_assert(!obj_req->xferred); + if (obj_req->result == -ENOENT) { + /* + * The target object doesn't exist. Read the data for + * the entire target object up to the overlap point (if + * any) from the parent, so we can use it for a copyup. + */ + ret = rbd_obj_handle_write_guard(obj_req); + if (ret) { + obj_req->result = ret; + return true; + } + return false; + } + /* fall through */ + case RBD_OBJ_WRITE_FLAT: + if (!obj_req->result) + /* + * There is no such thing as a successful short + * write -- indicate the whole request was satisfied. + */ + obj_req->xferred = obj_req->ex.oe_len; + return true; + case RBD_OBJ_WRITE_COPYUP: + obj_req->write_state = RBD_OBJ_WRITE_GUARD; + if (obj_req->result) + goto again; - /* First get what we need from the image request and release it */ + rbd_assert(obj_req->xferred); + ret = rbd_obj_issue_copyup(obj_req, obj_req->xferred); + if (ret) { + obj_req->result = ret; + return true; + } + return false; + default: + rbd_assert(0); + } +} - obj_request = img_request->obj_request; - img_xferred = img_request->xferred; - img_result = img_request->result; - rbd_img_request_put(img_request); +/* + * Returns true if @obj_req is completed, or false otherwise. + */ +static bool __rbd_obj_handle_request(struct rbd_obj_request *obj_req) +{ + switch (obj_req->img_request->op_type) { + case OBJ_OP_READ: + return rbd_obj_handle_read(obj_req); + case OBJ_OP_WRITE: + return rbd_obj_handle_write(obj_req); + case OBJ_OP_DISCARD: + if (rbd_obj_handle_write(obj_req)) { + /* + * Hide -ENOENT from delete/truncate/zero -- discarding + * a non-existent object is not a problem. + */ + if (obj_req->result == -ENOENT) { + obj_req->result = 0; + obj_req->xferred = obj_req->ex.oe_len; + } + return true; + } + return false; + default: + rbd_assert(0); + } +} - /* - * If the overlap has become 0 (most likely because the - * image has been flattened) we need to re-submit the - * original request. 
- */ - rbd_assert(obj_request); - rbd_assert(obj_request->img_request); - rbd_dev = obj_request->img_request->rbd_dev; - if (!rbd_dev->parent_overlap) { - rbd_obj_request_submit(obj_request); +static void rbd_obj_end_request(struct rbd_obj_request *obj_req) +{ + struct rbd_img_request *img_req = obj_req->img_request; + + rbd_assert((!obj_req->result && + obj_req->xferred == obj_req->ex.oe_len) || + (obj_req->result < 0 && !obj_req->xferred)); + if (!obj_req->result) { + img_req->xferred += obj_req->xferred; return; } - obj_request->result = img_result; - if (obj_request->result) - goto out; + rbd_warn(img_req->rbd_dev, + "%s at objno %llu %llu~%llu result %d xferred %llu", + obj_op_name(img_req->op_type), obj_req->ex.oe_objno, + obj_req->ex.oe_off, obj_req->ex.oe_len, obj_req->result, + obj_req->xferred); + if (!img_req->result) { + img_req->result = obj_req->result; + img_req->xferred = 0; + } +} - /* - * We need to zero anything beyond the parent overlap - * boundary. Since rbd_img_obj_request_read_callback() - * will zero anything beyond the end of a short read, an - * easy way to do this is to pretend the data from the - * parent came up short--ending at the overlap boundary. - */ - rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length); - obj_end = obj_request->img_offset + obj_request->length; - if (obj_end > rbd_dev->parent_overlap) { - u64 xferred = 0; +static void rbd_img_end_child_request(struct rbd_img_request *img_req) +{ + struct rbd_obj_request *obj_req = img_req->obj_request; - if (obj_request->img_offset < rbd_dev->parent_overlap) - xferred = rbd_dev->parent_overlap - - obj_request->img_offset; + rbd_assert(test_bit(IMG_REQ_CHILD, &img_req->flags)); + rbd_assert((!img_req->result && + img_req->xferred == rbd_obj_img_extents_bytes(obj_req)) || + (img_req->result < 0 && !img_req->xferred)); - obj_request->xferred = min(img_xferred, xferred); - } else { - obj_request->xferred = img_xferred; - } -out: - rbd_img_obj_request_read_callback(obj_request); - rbd_obj_request_complete(obj_request); + obj_req->result = img_req->result; + obj_req->xferred = img_req->xferred; + rbd_img_request_put(img_req); } -static void rbd_img_parent_read(struct rbd_obj_request *obj_request) +static void rbd_img_end_request(struct rbd_img_request *img_req) { - struct rbd_img_request *img_request; - int result; + rbd_assert(!test_bit(IMG_REQ_CHILD, &img_req->flags)); + rbd_assert((!img_req->result && + img_req->xferred == blk_rq_bytes(img_req->rq)) || + (img_req->result < 0 && !img_req->xferred)); - rbd_assert(obj_request_img_data_test(obj_request)); - rbd_assert(obj_request->img_request != NULL); - rbd_assert(obj_request->result == (s32) -ENOENT); - rbd_assert(obj_request_type_valid(obj_request->type)); + blk_mq_end_request(img_req->rq, + errno_to_blk_status(img_req->result)); + rbd_img_request_put(img_req); +} - /* rbd_read_finish(obj_request, obj_request->length); */ - img_request = rbd_parent_request_create(obj_request, - obj_request->img_offset, - obj_request->length); - result = -ENOMEM; - if (!img_request) - goto out_err; +static void rbd_obj_handle_request(struct rbd_obj_request *obj_req) +{ + struct rbd_img_request *img_req; - if (obj_request->type == OBJ_REQUEST_BIO) - result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO, - obj_request->bio_list); - else - result = rbd_img_request_fill(img_request, OBJ_REQUEST_PAGES, - obj_request->pages); - if (result) - goto out_err; +again: + if (!__rbd_obj_handle_request(obj_req)) + return; - img_request->callback = 
rbd_img_parent_read_callback; - result = rbd_img_request_submit(img_request); - if (result) - goto out_err; + img_req = obj_req->img_request; + spin_lock(&img_req->completion_lock); + rbd_obj_end_request(obj_req); + rbd_assert(img_req->pending_count); + if (--img_req->pending_count) { + spin_unlock(&img_req->completion_lock); + return; + } - return; -out_err: - if (img_request) - rbd_img_request_put(img_request); - obj_request->result = result; - obj_request->xferred = 0; - obj_request_done_set(obj_request); + spin_unlock(&img_req->completion_lock); + if (test_bit(IMG_REQ_CHILD, &img_req->flags)) { + obj_req = img_req->obj_request; + rbd_img_end_child_request(img_req); + goto again; + } + rbd_img_end_request(img_req); } static const struct rbd_client_id rbd_empty_cid; @@ -3091,8 +2674,8 @@ static int __rbd_notify_op_lock(struct rbd_device *rbd_dev, { struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; struct rbd_client_id cid = rbd_get_cid(rbd_dev); - int buf_size = 4 + 8 + 8 + CEPH_ENCODING_START_BLK_LEN; - char buf[buf_size]; + char buf[4 + 8 + 8 + CEPH_ENCODING_START_BLK_LEN]; + int buf_size = sizeof(buf); void *p = buf; dout("%s rbd_dev %p notify_op %d\n", __func__, rbd_dev, notify_op); @@ -3610,8 +3193,8 @@ static void __rbd_acknowledge_notify(struct rbd_device *rbd_dev, u64 notify_id, u64 cookie, s32 *result) { struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; - int buf_size = 4 + CEPH_ENCODING_START_BLK_LEN; - char buf[buf_size]; + char buf[4 + CEPH_ENCODING_START_BLK_LEN]; + int buf_size = sizeof(buf); int ret; if (result) { @@ -3887,7 +3470,7 @@ static void rbd_reregister_watch(struct work_struct *work) ret = rbd_dev_refresh(rbd_dev); if (ret) - rbd_warn(rbd_dev, "reregisteration refresh failed: %d", ret); + rbd_warn(rbd_dev, "reregistration refresh failed: %d", ret); } /* @@ -4070,8 +3653,7 @@ static void rbd_queue_workfn(struct work_struct *work) } } - img_request = rbd_img_request_create(rbd_dev, offset, length, op_type, - snapc); + img_request = rbd_img_request_create(rbd_dev, op_type, snapc); if (!img_request) { result = -ENOMEM; goto err_unlock; @@ -4080,18 +3662,14 @@ static void rbd_queue_workfn(struct work_struct *work) snapc = NULL; /* img_request consumes a ref */ if (op_type == OBJ_OP_DISCARD) - result = rbd_img_request_fill(img_request, OBJ_REQUEST_NODATA, - NULL); + result = rbd_img_fill_nodata(img_request, offset, length); else - result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO, - rq->bio); - if (result) - goto err_img_request; - - result = rbd_img_request_submit(img_request); + result = rbd_img_fill_from_bio(img_request, offset, length, + rq->bio); if (result) goto err_img_request; + rbd_img_request_submit(img_request); if (must_be_locked) up_read(&rbd_dev->lock_rwsem); return; @@ -4369,7 +3947,7 @@ static int rbd_init_disk(struct rbd_device *rbd_dev) blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE); q->limits.max_sectors = queue_max_hw_sectors(q); blk_queue_max_segments(q, USHRT_MAX); - blk_queue_max_segment_size(q, segment_size); + blk_queue_max_segment_size(q, UINT_MAX); blk_queue_io_min(q, segment_size); blk_queue_io_opt(q, segment_size); @@ -5057,9 +4635,6 @@ static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev) } __attribute__ ((packed)) striping_info_buf = { 0 }; size_t size = sizeof (striping_info_buf); void *p; - u64 obj_size; - u64 stripe_unit; - u64 stripe_count; int ret; ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid, @@ -5071,31 +4646,9 @@ static int rbd_dev_v2_striping_info(struct 
rbd_device *rbd_dev) if (ret < size) return -ERANGE; - /* - * We don't actually support the "fancy striping" feature - * (STRIPINGV2) yet, but if the striping sizes are the - * defaults the behavior is the same as before. So find - * out, and only fail if the image has non-default values. - */ - ret = -EINVAL; - obj_size = rbd_obj_bytes(&rbd_dev->header); p = &striping_info_buf; - stripe_unit = ceph_decode_64(&p); - if (stripe_unit != obj_size) { - rbd_warn(rbd_dev, "unsupported stripe unit " - "(got %llu want %llu)", - stripe_unit, obj_size); - return -EINVAL; - } - stripe_count = ceph_decode_64(&p); - if (stripe_count != 1) { - rbd_warn(rbd_dev, "unsupported stripe count " - "(got %llu want 1)", stripe_count); - return -EINVAL; - } - rbd_dev->header.stripe_unit = stripe_unit; - rbd_dev->header.stripe_count = stripe_count; - + rbd_dev->header.stripe_unit = ceph_decode_64(&p); + rbd_dev->header.stripe_count = ceph_decode_64(&p); return 0; } @@ -5653,39 +5206,6 @@ out_err: return ret; } -/* - * Return pool id (>= 0) or a negative error code. - */ -static int rbd_add_get_pool_id(struct rbd_client *rbdc, const char *pool_name) -{ - struct ceph_options *opts = rbdc->client->options; - u64 newest_epoch; - int tries = 0; - int ret; - -again: - ret = ceph_pg_poolid_by_name(rbdc->client->osdc.osdmap, pool_name); - if (ret == -ENOENT && tries++ < 1) { - ret = ceph_monc_get_version(&rbdc->client->monc, "osdmap", - &newest_epoch); - if (ret < 0) - return ret; - - if (rbdc->client->osdc.osdmap->epoch < newest_epoch) { - ceph_osdc_maybe_request_map(&rbdc->client->osdc); - (void) ceph_monc_wait_osdmap(&rbdc->client->monc, - newest_epoch, - opts->mount_timeout); - goto again; - } else { - /* the osdmap we have is new enough */ - return -ENOENT; - } - } - - return ret; -} - static void rbd_dev_image_unlock(struct rbd_device *rbd_dev) { down_write(&rbd_dev->lock_rwsem); @@ -6114,7 +5634,7 @@ static ssize_t do_rbd_add(struct bus_type *bus, } /* pick the pool */ - rc = rbd_add_get_pool_id(rbdc, spec->pool_name); + rc = ceph_pg_poolid_by_name(rbdc->client->osdc.osdmap, spec->pool_name); if (rc < 0) { if (rc == -ENOENT) pr_info("pool %s does not exist\n", spec->pool_name); @@ -6366,16 +5886,8 @@ static int rbd_slab_init(void) if (!rbd_obj_request_cache) goto out_err; - rbd_assert(!rbd_bio_clone); - rbd_bio_clone = bioset_create(BIO_POOL_SIZE, 0, 0); - if (!rbd_bio_clone) - goto out_err_clone; - return 0; -out_err_clone: - kmem_cache_destroy(rbd_obj_request_cache); - rbd_obj_request_cache = NULL; out_err: kmem_cache_destroy(rbd_img_request_cache); rbd_img_request_cache = NULL; @@ -6391,10 +5903,6 @@ static void rbd_slab_exit(void) rbd_assert(rbd_img_request_cache); kmem_cache_destroy(rbd_img_request_cache); rbd_img_request_cache = NULL; - - rbd_assert(rbd_bio_clone); - bioset_free(rbd_bio_clone); - rbd_bio_clone = NULL; } static int __init rbd_init(void) |
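
To make the extent mapping used above concrete: for a non-fancy layout (rbd_layout_is_fancy() returns false, i.e. stripe_unit == object_size with a single stripe), ceph_file_to_extents() effectively chops an image extent at object-size boundaries and hands each piece to the per-object callback. The following is a minimal stand-alone userspace sketch of that default-layout split only; it is not part of the patch and not the kernel implementation, and all names in it are hypothetical.

/*
 * Illustrative sketch: split one image extent into per-object extents
 * for the default layout (stripe_unit == object_size, stripe_count == 1).
 */
#include <stdio.h>
#include <stdint.h>

struct obj_extent {
	uint64_t objno;	/* which backing object */
	uint64_t off;	/* offset within that object */
	uint64_t len;	/* bytes in that object */
};

static void image_extent_to_objects(uint64_t off, uint64_t len,
				    uint64_t object_size)
{
	while (len) {
		struct obj_extent ex = {
			.objno = off / object_size,
			.off = off % object_size,
		};
		uint64_t left_in_obj = object_size - ex.off;

		ex.len = len < left_in_obj ? len : left_in_obj;
		printf("objno %llu %llu~%llu\n",
		       (unsigned long long)ex.objno,
		       (unsigned long long)ex.off,
		       (unsigned long long)ex.len);

		off += ex.len;
		len -= ex.len;
	}
}

int main(void)
{
	/* a 10M request starting at 3M with 4M objects spans objects 0..3 */
	image_extent_to_objects(3 << 20, 10 << 20, 4 << 20);
	return 0;
}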
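
The layered-write handling in rbd_obj_handle_write() can be summarized as: a guarded write that fails with -ENOENT reads the relevant range from the parent and comes back as a copyup, while everything else completes like a flat write. Below is a condensed stand-alone C sketch of those transitions under that assumption; it deliberately omits error-propagation details and is an illustration, not the driver code.

/*
 * Condensed illustration of the guarded-write / copyup transitions.
 */
#include <errno.h>
#include <stdio.h>

enum write_state { WRITE_FLAT = 1, WRITE_GUARD, WRITE_COPYUP };
enum action { DONE, READ_PARENT, SEND_COPYUP };

static enum action next_action(enum write_state *state, int result)
{
	switch (*state) {
	case WRITE_GUARD:
		if (result == -ENOENT) {
			/* target object missing: fetch parent data first */
			*state = WRITE_COPYUP;
			return READ_PARENT;
		}
		/* object exists -- behaves like a flat write */
		return DONE;
	case WRITE_FLAT:
		return DONE;
	case WRITE_COPYUP:
		/* parent read finished: send copyup + the original op(s) */
		*state = WRITE_GUARD;
		return result ? DONE : SEND_COPYUP;
	}
	return DONE;
}

int main(void)
{
	enum write_state st = WRITE_GUARD;

	/* guarded write to a missing object: read parent, then copyup */
	printf("%d\n", next_action(&st, -ENOENT));	/* READ_PARENT */
	printf("%d\n", next_action(&st, 0));		/* SEND_COPYUP */
	return 0;
}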
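
The "only send non-zero copyup data" optimization from rbd_obj_issue_copyup()/is_zero_bvecs() boils down to: if every byte read from the parent is zero, drop the copyup payload (bytes = 0), since writing zeroes is equivalent to the object not existing. A trivial userspace stand-in for that check is shown below; the helper names are hypothetical and this is only a sketch of the idea, not the bio_vec-based kernel code.

/*
 * Illustrative sketch of the zero-copyup payload check.
 */
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>

static bool buffer_is_zero(const unsigned char *buf, size_t len)
{
	size_t i;

	for (i = 0; i < len; i++)
		if (buf[i])
			return false;
	return true;
}

/* number of copyup payload bytes actually worth sending */
static size_t copyup_payload_bytes(const unsigned char *parent_data,
				   size_t bytes)
{
	return buffer_is_zero(parent_data, bytes) ? 0 : bytes;
}

int main(void)
{
	unsigned char hole[4096] = { 0 };

	/* an all-zero parent read degenerates to an empty copyup */
	printf("%zu\n", copyup_payload_bytes(hole, sizeof(hole)));
	return 0;
}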