diff options
author | Linus Torvalds <torvalds@linux-foundation.org> | 2017-03-01 02:36:09 +0300 |
---|---|---|
committer | Linus Torvalds <torvalds@linux-foundation.org> | 2017-03-01 02:36:09 +0300 |
commit | b2deee2dc06db7cdf99b84346e69bdb9db9baa85 (patch) | |
tree | ceb073fa12c1a9804761ec8ce8911a517b007ed6 /drivers/block/rbd.c | |
parent | d4f4cf77b37eaea58ef863a4cbc95dad3880b524 (diff) | |
parent | 54ea0046b6fe36ec18e82d282a29a18da6cdea0f (diff) | |
download | linux-b2deee2dc06db7cdf99b84346e69bdb9db9baa85.tar.xz |
Merge tag 'ceph-for-4.11-rc1' of git://github.com/ceph/ceph-client
Pull ceph updates from Ilya Dryomov:
"This time around we have:
- support for rbd data-pool feature, which enables rbd images on
erasure-coded pools (myself). CEPH_PG_MAX_SIZE has been bumped to
allow erasure-coded profiles with k+m up to 32.
- a patch for ceph_d_revalidate() performance regression introduced
in 4.9, along with some cleanups in the area (Jeff Layton)
- a set of fixes for unsafe ->d_parent accesses in CephFS (Jeff
Layton)
- buffered reads are now processed in rsize windows instead of rasize
windows (Andreas Gerstmayr). The new default for rsize mount option
is 64M.
- ack vs commit distinction is gone, greatly simplifying ->fsync()
and MOSDOpReply handling code (myself)
... also a few filesystem bug fixes from Zheng, a CRUSH sync up (CRUSH
computations are still serialized though) and several minor fixes and
cleanups all over"
* tag 'ceph-for-4.11-rc1' of git://github.com/ceph/ceph-client: (52 commits)
libceph, rbd, ceph: WRITE | ONDISK -> WRITE
libceph: get rid of ack vs commit
ceph: remove special ack vs commit behavior
ceph: tidy some white space in get_nonsnap_parent()
crush: fix dprintk compilation
crush: do is_out test only if we do not collide
ceph: remove req from unsafe list when unregistering it
rbd: constify device_type structure
rbd: kill obj_request->object_name and rbd_segment_name_cache
rbd: store and use obj_request->object_no
rbd: RBD_V{1,2}_DATA_FORMAT macros
rbd: factor out __rbd_osd_req_create()
rbd: set offset and length outside of rbd_obj_request_create()
rbd: support for data-pool feature
rbd: introduce rbd_init_layout()
rbd: use rbd_obj_bytes() more
rbd: remove now unused rbd_obj_request_wait() and helpers
rbd: switch rbd_obj_method_sync() to ceph_osdc_call()
libceph: pass reply buffer length through ceph_osdc_call()
rbd: do away with obj_request in rbd_obj_read_sync()
...
Diffstat (limited to 'drivers/block/rbd.c')
-rw-r--r-- | drivers/block/rbd.c | 601 |
1 files changed, 227 insertions, 374 deletions
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 362cecc77130..4d6807723798 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -123,9 +123,11 @@ static int atomic_dec_return_safe(atomic_t *v) #define RBD_FEATURE_LAYERING (1<<0) #define RBD_FEATURE_STRIPINGV2 (1<<1) #define RBD_FEATURE_EXCLUSIVE_LOCK (1<<2) +#define RBD_FEATURE_DATA_POOL (1<<7) #define RBD_FEATURES_ALL (RBD_FEATURE_LAYERING | \ RBD_FEATURE_STRIPINGV2 | \ - RBD_FEATURE_EXCLUSIVE_LOCK) + RBD_FEATURE_EXCLUSIVE_LOCK | \ + RBD_FEATURE_DATA_POOL) /* Features supported by this (client software) implementation. */ @@ -144,10 +146,9 @@ struct rbd_image_header { /* These six fields never change for a given rbd image */ char *object_prefix; __u8 obj_order; - __u8 crypt_type; - __u8 comp_type; u64 stripe_unit; u64 stripe_count; + s64 data_pool_id; u64 features; /* Might be changeable someday? */ /* The remaining fields need to be updated occasionally */ @@ -230,7 +231,7 @@ enum obj_req_flags { }; struct rbd_obj_request { - const char *object_name; + u64 object_no; u64 offset; /* object start byte */ u64 length; /* bytes from offset */ unsigned long flags; @@ -438,7 +439,6 @@ static DEFINE_SPINLOCK(rbd_client_list_lock); static struct kmem_cache *rbd_img_request_cache; static struct kmem_cache *rbd_obj_request_cache; -static struct kmem_cache *rbd_segment_name_cache; static int rbd_major; static DEFINE_IDA(rbd_dev_id_ida); @@ -973,6 +973,30 @@ static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk) } /* + * returns the size of an object in the image + */ +static u32 rbd_obj_bytes(struct rbd_image_header *header) +{ + return 1U << header->obj_order; +} + +static void rbd_init_layout(struct rbd_device *rbd_dev) +{ + if (rbd_dev->header.stripe_unit == 0 || + rbd_dev->header.stripe_count == 0) { + rbd_dev->header.stripe_unit = rbd_obj_bytes(&rbd_dev->header); + rbd_dev->header.stripe_count = 1; + } + + rbd_dev->layout.stripe_unit = rbd_dev->header.stripe_unit; + rbd_dev->layout.stripe_count = rbd_dev->header.stripe_count; + rbd_dev->layout.object_size = rbd_obj_bytes(&rbd_dev->header); + rbd_dev->layout.pool_id = rbd_dev->header.data_pool_id == CEPH_NOPOOL ? + rbd_dev->spec->pool_id : rbd_dev->header.data_pool_id; + RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL); +} + +/* * Fill an rbd image header with information from the given format 1 * on-disk header. */ @@ -992,15 +1016,11 @@ static int rbd_header_from_disk(struct rbd_device *rbd_dev, /* Allocate this now to avoid having to handle failure below */ if (first_time) { - size_t len; - - len = strnlen(ondisk->object_prefix, - sizeof (ondisk->object_prefix)); - object_prefix = kmalloc(len + 1, GFP_KERNEL); + object_prefix = kstrndup(ondisk->object_prefix, + sizeof(ondisk->object_prefix), + GFP_KERNEL); if (!object_prefix) return -ENOMEM; - memcpy(object_prefix, ondisk->object_prefix, len); - object_prefix[len] = '\0'; } /* Allocate the snapshot context and fill it in */ @@ -1051,12 +1071,7 @@ static int rbd_header_from_disk(struct rbd_device *rbd_dev, if (first_time) { header->object_prefix = object_prefix; header->obj_order = ondisk->options.order; - header->crypt_type = ondisk->options.crypt_type; - header->comp_type = ondisk->options.comp_type; - /* The rest aren't used for format 1 images */ - header->stripe_unit = 0; - header->stripe_count = 0; - header->features = 0; + rbd_init_layout(rbd_dev); } else { ceph_put_snap_context(header->snapc); kfree(header->snap_names); @@ -1232,42 +1247,9 @@ static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev) rbd_dev->mapping.features = 0; } -static void rbd_segment_name_free(const char *name) -{ - /* The explicit cast here is needed to drop the const qualifier */ - - kmem_cache_free(rbd_segment_name_cache, (void *)name); -} - -static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset) -{ - char *name; - u64 segment; - int ret; - char *name_format; - - name = kmem_cache_alloc(rbd_segment_name_cache, GFP_NOIO); - if (!name) - return NULL; - segment = offset >> rbd_dev->header.obj_order; - name_format = "%s.%012llx"; - if (rbd_dev->image_format == 2) - name_format = "%s.%016llx"; - ret = snprintf(name, CEPH_MAX_OID_NAME_LEN + 1, name_format, - rbd_dev->header.object_prefix, segment); - if (ret < 0 || ret > CEPH_MAX_OID_NAME_LEN) { - pr_err("error formatting segment name for #%llu (%d)\n", - segment, ret); - rbd_segment_name_free(name); - name = NULL; - } - - return name; -} - static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset) { - u64 segment_size = (u64) 1 << rbd_dev->header.obj_order; + u64 segment_size = rbd_obj_bytes(&rbd_dev->header); return offset & (segment_size - 1); } @@ -1275,7 +1257,7 @@ static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset) static u64 rbd_segment_length(struct rbd_device *rbd_dev, u64 offset, u64 length) { - u64 segment_size = (u64) 1 << rbd_dev->header.obj_order; + u64 segment_size = rbd_obj_bytes(&rbd_dev->header); offset &= segment_size - 1; @@ -1287,14 +1269,6 @@ static u64 rbd_segment_length(struct rbd_device *rbd_dev, } /* - * returns the size of an object in the image - */ -static u64 rbd_obj_bytes(struct rbd_image_header *header) -{ - return 1 << header->obj_order; -} - -/* * bio helpers */ @@ -1623,7 +1597,9 @@ static void rbd_obj_request_submit(struct rbd_obj_request *obj_request) { struct ceph_osd_request *osd_req = obj_request->osd_req; - dout("%s %p osd_req %p\n", __func__, obj_request, osd_req); + dout("%s %p object_no %016llx %llu~%llu osd_req %p\n", __func__, + obj_request, obj_request->object_no, obj_request->offset, + obj_request->length, osd_req); if (obj_request_img_data_test(obj_request)) { WARN_ON(obj_request->callback != rbd_img_obj_callback); rbd_img_request_get(obj_request->img_request); @@ -1631,44 +1607,6 @@ static void rbd_obj_request_submit(struct rbd_obj_request *obj_request) ceph_osdc_start_request(osd_req->r_osdc, osd_req, false); } -static void rbd_obj_request_end(struct rbd_obj_request *obj_request) -{ - dout("%s %p\n", __func__, obj_request); - ceph_osdc_cancel_request(obj_request->osd_req); -} - -/* - * Wait for an object request to complete. If interrupted, cancel the - * underlying osd request. - * - * @timeout: in jiffies, 0 means "wait forever" - */ -static int __rbd_obj_request_wait(struct rbd_obj_request *obj_request, - unsigned long timeout) -{ - long ret; - - dout("%s %p\n", __func__, obj_request); - ret = wait_for_completion_interruptible_timeout( - &obj_request->completion, - ceph_timeout_jiffies(timeout)); - if (ret <= 0) { - if (ret == 0) - ret = -ETIMEDOUT; - rbd_obj_request_end(obj_request); - } else { - ret = 0; - } - - dout("%s %p ret %d\n", __func__, obj_request, (int)ret); - return ret; -} - -static int rbd_obj_request_wait(struct rbd_obj_request *obj_request) -{ - return __rbd_obj_request_wait(obj_request, 0); -} - static void rbd_img_request_complete(struct rbd_img_request *img_request) { @@ -1955,8 +1893,8 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req) rbd_osd_call_callback(obj_request); break; default: - rbd_warn(NULL, "%s: unsupported op %hu", - obj_request->object_name, (unsigned short) opcode); + rbd_warn(NULL, "unexpected OSD op: object_no %016llx opcode %d", + obj_request->object_no, opcode); break; } @@ -1980,6 +1918,40 @@ static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request) osd_req->r_data_offset = obj_request->offset; } +static struct ceph_osd_request * +__rbd_osd_req_create(struct rbd_device *rbd_dev, + struct ceph_snap_context *snapc, + int num_ops, unsigned int flags, + struct rbd_obj_request *obj_request) +{ + struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; + struct ceph_osd_request *req; + const char *name_format = rbd_dev->image_format == 1 ? + RBD_V1_DATA_FORMAT : RBD_V2_DATA_FORMAT; + + req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false, GFP_NOIO); + if (!req) + return NULL; + + req->r_flags = flags; + req->r_callback = rbd_osd_req_callback; + req->r_priv = obj_request; + + req->r_base_oloc.pool = rbd_dev->layout.pool_id; + if (ceph_oid_aprintf(&req->r_base_oid, GFP_NOIO, name_format, + rbd_dev->header.object_prefix, obj_request->object_no)) + goto err_req; + + if (ceph_osdc_alloc_messages(req, GFP_NOIO)) + goto err_req; + + return req; + +err_req: + ceph_osdc_put_request(req); + return NULL; +} + /* * Create an osd request. A read request has one osd op (read). * A write request has either one (watch) or two (hint+write) osd ops. @@ -1993,8 +1965,6 @@ static struct ceph_osd_request *rbd_osd_req_create( struct rbd_obj_request *obj_request) { struct ceph_snap_context *snapc = NULL; - struct ceph_osd_client *osdc; - struct ceph_osd_request *osd_req; if (obj_request_img_data_test(obj_request) && (op_type == OBJ_OP_DISCARD || op_type == OBJ_OP_WRITE)) { @@ -2009,35 +1979,9 @@ static struct ceph_osd_request *rbd_osd_req_create( rbd_assert(num_ops == 1 || ((op_type == OBJ_OP_WRITE) && num_ops == 2)); - /* Allocate and initialize the request, for the num_ops ops */ - - osdc = &rbd_dev->rbd_client->client->osdc; - osd_req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false, - GFP_NOIO); - if (!osd_req) - goto fail; - - if (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD) - osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK; - else - osd_req->r_flags = CEPH_OSD_FLAG_READ; - - osd_req->r_callback = rbd_osd_req_callback; - osd_req->r_priv = obj_request; - - osd_req->r_base_oloc.pool = rbd_dev->layout.pool_id; - if (ceph_oid_aprintf(&osd_req->r_base_oid, GFP_NOIO, "%s", - obj_request->object_name)) - goto fail; - - if (ceph_osdc_alloc_messages(osd_req, GFP_NOIO)) - goto fail; - - return osd_req; - -fail: - ceph_osdc_put_request(osd_req); - return NULL; + return __rbd_osd_req_create(rbd_dev, snapc, num_ops, + (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD) ? + CEPH_OSD_FLAG_WRITE : CEPH_OSD_FLAG_READ, obj_request); } /* @@ -2050,10 +1994,6 @@ static struct ceph_osd_request * rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request) { struct rbd_img_request *img_request; - struct ceph_snap_context *snapc; - struct rbd_device *rbd_dev; - struct ceph_osd_client *osdc; - struct ceph_osd_request *osd_req; int num_osd_ops = 3; rbd_assert(obj_request_img_data_test(obj_request)); @@ -2065,77 +2005,34 @@ rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request) if (img_request_discard_test(img_request)) num_osd_ops = 2; - /* Allocate and initialize the request, for all the ops */ - - snapc = img_request->snapc; - rbd_dev = img_request->rbd_dev; - osdc = &rbd_dev->rbd_client->client->osdc; - osd_req = ceph_osdc_alloc_request(osdc, snapc, num_osd_ops, - false, GFP_NOIO); - if (!osd_req) - goto fail; - - osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK; - osd_req->r_callback = rbd_osd_req_callback; - osd_req->r_priv = obj_request; - - osd_req->r_base_oloc.pool = rbd_dev->layout.pool_id; - if (ceph_oid_aprintf(&osd_req->r_base_oid, GFP_NOIO, "%s", - obj_request->object_name)) - goto fail; - - if (ceph_osdc_alloc_messages(osd_req, GFP_NOIO)) - goto fail; - - return osd_req; - -fail: - ceph_osdc_put_request(osd_req); - return NULL; + return __rbd_osd_req_create(img_request->rbd_dev, + img_request->snapc, num_osd_ops, + CEPH_OSD_FLAG_WRITE, obj_request); } - static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req) { ceph_osdc_put_request(osd_req); } -/* object_name is assumed to be a non-null pointer and NUL-terminated */ - -static struct rbd_obj_request *rbd_obj_request_create(const char *object_name, - u64 offset, u64 length, - enum obj_request_type type) +static struct rbd_obj_request * +rbd_obj_request_create(enum obj_request_type type) { struct rbd_obj_request *obj_request; - size_t size; - char *name; rbd_assert(obj_request_type_valid(type)); - size = strlen(object_name) + 1; - name = kmalloc(size, GFP_NOIO); - if (!name) - return NULL; - obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_NOIO); - if (!obj_request) { - kfree(name); + if (!obj_request) return NULL; - } - obj_request->object_name = memcpy(name, object_name, size); - obj_request->offset = offset; - obj_request->length = length; - obj_request->flags = 0; obj_request->which = BAD_WHICH; obj_request->type = type; INIT_LIST_HEAD(&obj_request->links); init_completion(&obj_request->completion); kref_init(&obj_request->kref); - dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name, - offset, length, (int)type, obj_request); - + dout("%s %p\n", __func__, obj_request); return obj_request; } @@ -2170,8 +2067,6 @@ static void rbd_obj_request_destroy(struct kref *kref) break; } - kfree(obj_request->object_name); - obj_request->object_name = NULL; kmem_cache_free(rbd_obj_request_cache, obj_request); } @@ -2546,22 +2441,18 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request, while (resid) { struct ceph_osd_request *osd_req; - const char *object_name; - u64 offset; - u64 length; + u64 object_no = img_offset >> rbd_dev->header.obj_order; + u64 offset = rbd_segment_offset(rbd_dev, img_offset); + u64 length = rbd_segment_length(rbd_dev, img_offset, resid); - object_name = rbd_segment_name(rbd_dev, img_offset); - if (!object_name) - goto out_unwind; - offset = rbd_segment_offset(rbd_dev, img_offset); - length = rbd_segment_length(rbd_dev, img_offset, resid); - obj_request = rbd_obj_request_create(object_name, - offset, length, type); - /* object request has its own copy of the object name */ - rbd_segment_name_free(object_name); + obj_request = rbd_obj_request_create(type); if (!obj_request) goto out_unwind; + obj_request->object_no = object_no; + obj_request->offset = offset; + obj_request->length = length; + /* * set obj_request->img_request before creating the * osd_request so that it gets the right snapc @@ -2771,7 +2662,7 @@ static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request) * child image to which the original request was to be sent. */ img_offset = obj_request->img_offset - obj_request->offset; - length = (u64)1 << rbd_dev->header.obj_order; + length = rbd_obj_bytes(&rbd_dev->header); /* * There is no defined parent data beyond the parent @@ -2900,11 +2791,12 @@ static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request) size_t size; int ret; - stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0, - OBJ_REQUEST_PAGES); + stat_request = rbd_obj_request_create(OBJ_REQUEST_PAGES); if (!stat_request) return -ENOMEM; + stat_request->object_no = obj_request->object_no; + stat_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1, stat_request); if (!stat_request->osd_req) { @@ -3983,17 +3875,17 @@ out: * returned in the outbound buffer, or a negative error code. */ static int rbd_obj_method_sync(struct rbd_device *rbd_dev, - const char *object_name, - const char *class_name, + struct ceph_object_id *oid, + struct ceph_object_locator *oloc, const char *method_name, const void *outbound, size_t outbound_size, void *inbound, size_t inbound_size) { - struct rbd_obj_request *obj_request; - struct page **pages; - u32 page_count; + struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; + struct page *req_page = NULL; + struct page *reply_page; int ret; /* @@ -4003,61 +3895,35 @@ static int rbd_obj_method_sync(struct rbd_device *rbd_dev, * method. Currently if this is present it will be a * snapshot id. */ - page_count = (u32)calc_pages_for(0, inbound_size); - pages = ceph_alloc_page_vector(page_count, GFP_KERNEL); - if (IS_ERR(pages)) - return PTR_ERR(pages); - - ret = -ENOMEM; - obj_request = rbd_obj_request_create(object_name, 0, inbound_size, - OBJ_REQUEST_PAGES); - if (!obj_request) - goto out; + if (outbound) { + if (outbound_size > PAGE_SIZE) + return -E2BIG; - obj_request->pages = pages; - obj_request->page_count = page_count; - - obj_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1, - obj_request); - if (!obj_request->osd_req) - goto out; - - osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL, - class_name, method_name); - if (outbound_size) { - struct ceph_pagelist *pagelist; - - pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS); - if (!pagelist) - goto out; + req_page = alloc_page(GFP_KERNEL); + if (!req_page) + return -ENOMEM; - ceph_pagelist_init(pagelist); - ceph_pagelist_append(pagelist, outbound, outbound_size); - osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0, - pagelist); + memcpy(page_address(req_page), outbound, outbound_size); } - osd_req_op_cls_response_data_pages(obj_request->osd_req, 0, - obj_request->pages, inbound_size, - 0, false, false); - - rbd_obj_request_submit(obj_request); - ret = rbd_obj_request_wait(obj_request); - if (ret) - goto out; - ret = obj_request->result; - if (ret < 0) - goto out; + reply_page = alloc_page(GFP_KERNEL); + if (!reply_page) { + if (req_page) + __free_page(req_page); + return -ENOMEM; + } - rbd_assert(obj_request->xferred < (u64)INT_MAX); - ret = (int)obj_request->xferred; - ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred); -out: - if (obj_request) - rbd_obj_request_put(obj_request); - else - ceph_release_page_vector(pages, page_count); + ret = ceph_osdc_call(osdc, oid, oloc, RBD_DRV_NAME, method_name, + CEPH_OSD_FLAG_READ, req_page, outbound_size, + reply_page, &inbound_size); + if (!ret) { + memcpy(inbound, page_address(reply_page), inbound_size); + ret = inbound_size; + } + if (req_page) + __free_page(req_page); + __free_page(reply_page); return ret; } @@ -4256,63 +4122,46 @@ static void rbd_free_disk(struct rbd_device *rbd_dev) } static int rbd_obj_read_sync(struct rbd_device *rbd_dev, - const char *object_name, - u64 offset, u64 length, void *buf) + struct ceph_object_id *oid, + struct ceph_object_locator *oloc, + void *buf, int buf_len) { - struct rbd_obj_request *obj_request; - struct page **pages = NULL; - u32 page_count; - size_t size; + struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; + struct ceph_osd_request *req; + struct page **pages; + int num_pages = calc_pages_for(0, buf_len); int ret; - page_count = (u32) calc_pages_for(offset, length); - pages = ceph_alloc_page_vector(page_count, GFP_KERNEL); - if (IS_ERR(pages)) - return PTR_ERR(pages); - - ret = -ENOMEM; - obj_request = rbd_obj_request_create(object_name, offset, length, - OBJ_REQUEST_PAGES); - if (!obj_request) - goto out; - - obj_request->pages = pages; - obj_request->page_count = page_count; - - obj_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1, - obj_request); - if (!obj_request->osd_req) - goto out; + req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_KERNEL); + if (!req) + return -ENOMEM; - osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ, - offset, length, 0, 0); - osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0, - obj_request->pages, - obj_request->length, - obj_request->offset & ~PAGE_MASK, - false, false); + ceph_oid_copy(&req->r_base_oid, oid); + ceph_oloc_copy(&req->r_base_oloc, oloc); + req->r_flags = CEPH_OSD_FLAG_READ; - rbd_obj_request_submit(obj_request); - ret = rbd_obj_request_wait(obj_request); + ret = ceph_osdc_alloc_messages(req, GFP_KERNEL); if (ret) - goto out; + goto out_req; - ret = obj_request->result; - if (ret < 0) - goto out; + pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL); + if (IS_ERR(pages)) { + ret = PTR_ERR(pages); + goto out_req; + } - rbd_assert(obj_request->xferred <= (u64) SIZE_MAX); - size = (size_t) obj_request->xferred; - ceph_copy_from_page_vector(pages, buf, 0, size); - rbd_assert(size <= (size_t)INT_MAX); - ret = (int)size; -out: - if (obj_request) - rbd_obj_request_put(obj_request); - else - ceph_release_page_vector(pages, page_count); + osd_req_op_extent_init(req, 0, CEPH_OSD_OP_READ, 0, buf_len, 0, 0); + osd_req_op_extent_osd_data_pages(req, 0, pages, buf_len, 0, false, + true); + + ceph_osdc_start_request(osdc, req, false); + ret = ceph_osdc_wait_request(osdc, req); + if (ret >= 0) + ceph_copy_from_page_vector(pages, buf, 0, ret); +out_req: + ceph_osdc_put_request(req); return ret; } @@ -4348,8 +4197,8 @@ static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev) if (!ondisk) return -ENOMEM; - ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_oid.name, - 0, size, ondisk); + ret = rbd_obj_read_sync(rbd_dev, &rbd_dev->header_oid, + &rbd_dev->header_oloc, ondisk, size); if (ret < 0) goto out; if ((size_t)ret < size) { @@ -4781,7 +4630,7 @@ static const struct attribute_group *rbd_attr_groups[] = { static void rbd_dev_release(struct device *dev); -static struct device_type rbd_device_type = { +static const struct device_type rbd_device_type = { .name = "rbd", .groups = rbd_attr_groups, .release = rbd_dev_release, @@ -4876,8 +4725,9 @@ static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc, INIT_LIST_HEAD(&rbd_dev->node); init_rwsem(&rbd_dev->header_rwsem); + rbd_dev->header.data_pool_id = CEPH_NOPOOL; ceph_oid_init(&rbd_dev->header_oid); - ceph_oloc_init(&rbd_dev->header_oloc); + rbd_dev->header_oloc.pool = spec->pool_id; mutex_init(&rbd_dev->watch_mutex); rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED; @@ -4899,12 +4749,6 @@ static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc, rbd_dev->rbd_client = rbdc; rbd_dev->spec = spec; - rbd_dev->layout.stripe_unit = 1 << RBD_MAX_OBJ_ORDER; - rbd_dev->layout.stripe_count = 1; - rbd_dev->layout.object_size = 1 << RBD_MAX_OBJ_ORDER; - rbd_dev->layout.pool_id = spec->pool_id; - RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL); - return rbd_dev; } @@ -4970,10 +4814,10 @@ static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id, __le64 size; } __attribute__ ((packed)) size_buf = { 0 }; - ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name, - "rbd", "get_size", - &snapid, sizeof (snapid), - &size_buf, sizeof (size_buf)); + ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid, + &rbd_dev->header_oloc, "get_size", + &snapid, sizeof(snapid), + &size_buf, sizeof(size_buf)); dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); if (ret < 0) return ret; @@ -5010,9 +4854,9 @@ static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev) if (!reply_buf) return -ENOMEM; - ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name, - "rbd", "get_object_prefix", NULL, 0, - reply_buf, RBD_OBJ_PREFIX_LEN_MAX); + ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid, + &rbd_dev->header_oloc, "get_object_prefix", + NULL, 0, reply_buf, RBD_OBJ_PREFIX_LEN_MAX); dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); if (ret < 0) goto out; @@ -5045,10 +4889,10 @@ static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id, u64 unsup; int ret; - ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name, - "rbd", "get_features", - &snapid, sizeof (snapid), - &features_buf, sizeof (features_buf)); + ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid, + &rbd_dev->header_oloc, "get_features", + &snapid, sizeof(snapid), + &features_buf, sizeof(features_buf)); dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); if (ret < 0) return ret; @@ -5107,10 +4951,9 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev) } snapid = cpu_to_le64(rbd_dev->spec->snap_id); - ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name, - "rbd", "get_parent", - &snapid, sizeof (snapid), - reply_buf, size); + ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid, + &rbd_dev->header_oloc, "get_parent", + &snapid, sizeof(snapid), reply_buf, size); dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); if (ret < 0) goto out_err; @@ -5210,9 +5053,9 @@ static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev) u64 stripe_count; int ret; - ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name, - "rbd", "get_stripe_unit_count", NULL, 0, - (char *)&striping_info_buf, size); + ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid, + &rbd_dev->header_oloc, "get_stripe_unit_count", + NULL, 0, &striping_info_buf, size); dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); if (ret < 0) return ret; @@ -5226,7 +5069,7 @@ static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev) * out, and only fail if the image has non-default values. */ ret = -EINVAL; - obj_size = (u64)1 << rbd_dev->header.obj_order; + obj_size = rbd_obj_bytes(&rbd_dev->header); p = &striping_info_buf; stripe_unit = ceph_decode_64(&p); if (stripe_unit != obj_size) { @@ -5247,8 +5090,27 @@ static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev) return 0; } +static int rbd_dev_v2_data_pool(struct rbd_device *rbd_dev) +{ + __le64 data_pool_id; + int ret; + + ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid, + &rbd_dev->header_oloc, "get_data_pool", + NULL, 0, &data_pool_id, sizeof(data_pool_id)); + if (ret < 0) + return ret; + if (ret < sizeof(data_pool_id)) + return -EBADMSG; + + rbd_dev->header.data_pool_id = le64_to_cpu(data_pool_id); + WARN_ON(rbd_dev->header.data_pool_id == CEPH_NOPOOL); + return 0; +} + static char *rbd_dev_image_name(struct rbd_device *rbd_dev) { + CEPH_DEFINE_OID_ONSTACK(oid); size_t image_id_size; char *image_id; void *p; @@ -5276,10 +5138,10 @@ static char *rbd_dev_image_name(struct rbd_device *rbd_dev) if (!reply_buf) goto out; - ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY, - "rbd", "dir_get_name", - image_id, image_id_size, - reply_buf, size); + ceph_oid_printf(&oid, "%s", RBD_DIRECTORY); + ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc, + "dir_get_name", image_id, image_id_size, + reply_buf, size); if (ret < 0) goto out; p = reply_buf; @@ -5458,9 +5320,9 @@ static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev) if (!reply_buf) return -ENOMEM; - ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name, - "rbd", "get_snapcontext", NULL, 0, - reply_buf, size); + ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid, + &rbd_dev->header_oloc, "get_snapcontext", + NULL, 0, reply_buf, size); dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); if (ret < 0) goto out; @@ -5523,10 +5385,9 @@ static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, return ERR_PTR(-ENOMEM); snapid = cpu_to_le64(snap_id); - ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name, - "rbd", "get_snapshot_name", - &snapid, sizeof (snapid), - reply_buf, size); + ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid, + &rbd_dev->header_oloc, "get_snapshot_name", + &snapid, sizeof(snapid), reply_buf, size); dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); if (ret < 0) { snap_name = ERR_PTR(ret); @@ -5833,7 +5694,7 @@ static int rbd_dev_image_id(struct rbd_device *rbd_dev) { int ret; size_t size; - char *object_name; + CEPH_DEFINE_OID_ONSTACK(oid); void *response; char *image_id; @@ -5853,12 +5714,12 @@ static int rbd_dev_image_id(struct rbd_device *rbd_dev) * First, see if the format 2 image id file exists, and if * so, get the image's persistent id from it. */ - size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name); - object_name = kmalloc(size, GFP_NOIO); - if (!object_name) - return -ENOMEM; - sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name); - dout("rbd id object name is %s\n", object_name); + ret = ceph_oid_aprintf(&oid, GFP_KERNEL, "%s%s", RBD_ID_PREFIX, + rbd_dev->spec->image_name); + if (ret) + return ret; + + dout("rbd id object name is %s\n", oid.name); /* Response will be an encoded string, which includes a length */ @@ -5871,9 +5732,9 @@ static int rbd_dev_image_id(struct rbd_device *rbd_dev) /* If it doesn't exist we'll assume it's a format 1 image */ - ret = rbd_obj_method_sync(rbd_dev, object_name, - "rbd", "get_id", NULL, 0, - response, RBD_IMAGE_ID_LEN_MAX); + ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc, + "get_id", NULL, 0, + response, RBD_IMAGE_ID_LEN_MAX); dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); if (ret == -ENOENT) { image_id = kstrdup("", GFP_KERNEL); @@ -5896,8 +5757,7 @@ static int rbd_dev_image_id(struct rbd_device *rbd_dev) } out: kfree(response); - kfree(object_name); - + ceph_oid_destroy(&oid); return ret; } @@ -5944,14 +5804,20 @@ static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev) if (ret < 0) goto out_err; } - /* No support for crypto and compression type format 2 images */ + if (rbd_dev->header.features & RBD_FEATURE_DATA_POOL) { + ret = rbd_dev_v2_data_pool(rbd_dev); + if (ret) + goto out_err; + } + + rbd_init_layout(rbd_dev); return 0; + out_err: rbd_dev->header.features = 0; kfree(rbd_dev->header.object_prefix); rbd_dev->header.object_prefix = NULL; - return ret; } @@ -6077,8 +5943,6 @@ static int rbd_dev_header_name(struct rbd_device *rbd_dev) /* Record the header object name for this rbd image. */ rbd_assert(rbd_image_format_valid(rbd_dev->image_format)); - - rbd_dev->header_oloc.pool = rbd_dev->layout.pool_id; if (rbd_dev->image_format == 1) ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s", spec->image_name, RBD_SUFFIX); @@ -6471,27 +6335,16 @@ static int rbd_slab_init(void) if (!rbd_obj_request_cache) goto out_err; - rbd_assert(!rbd_segment_name_cache); - rbd_segment_name_cache = kmem_cache_create("rbd_segment_name", - CEPH_MAX_OID_NAME_LEN + 1, 1, 0, NULL); - if (rbd_segment_name_cache) - return 0; -out_err: - kmem_cache_destroy(rbd_obj_request_cache); - rbd_obj_request_cache = NULL; + return 0; +out_err: kmem_cache_destroy(rbd_img_request_cache); rbd_img_request_cache = NULL; - return -ENOMEM; } static void rbd_slab_exit(void) { - rbd_assert(rbd_segment_name_cache); - kmem_cache_destroy(rbd_segment_name_cache); - rbd_segment_name_cache = NULL; - rbd_assert(rbd_obj_request_cache); kmem_cache_destroy(rbd_obj_request_cache); rbd_obj_request_cache = NULL; |