diff options
Diffstat (limited to 'drivers/block/rbd.c')
-rw-r--r-- | drivers/block/rbd.c | 689 |
1 files changed, 365 insertions, 324 deletions
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index b2c98c1bc037..623c84145b79 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -42,6 +42,7 @@ #include <linux/blkdev.h> #include <linux/slab.h> #include <linux/idr.h> +#include <linux/workqueue.h> #include "rbd_types.h" @@ -332,7 +333,10 @@ struct rbd_device { char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */ + struct list_head rq_queue; /* incoming rq queue */ spinlock_t lock; /* queue, flags, open_count */ + struct workqueue_struct *rq_wq; + struct work_struct rq_work; struct rbd_image_header header; unsigned long flags; /* possibly lock protected */ @@ -514,7 +518,8 @@ static void rbd_dev_remove_parent(struct rbd_device *rbd_dev); static int rbd_dev_refresh(struct rbd_device *rbd_dev); static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev); -static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev); +static int rbd_dev_header_info(struct rbd_device *rbd_dev); +static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev); static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u64 snap_id); static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id, @@ -971,12 +976,6 @@ static int rbd_header_from_disk(struct rbd_device *rbd_dev, header->snap_names = snap_names; header->snap_sizes = snap_sizes; - /* Make sure mapping size is consistent with header info */ - - if (rbd_dev->spec->snap_id == CEPH_NOSNAP || first_time) - if (rbd_dev->mapping.size != header->image_size) - rbd_dev->mapping.size = header->image_size; - return 0; out_2big: ret = -EIO; @@ -1139,6 +1138,13 @@ static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev) rbd_dev->mapping.features = 0; } +static void rbd_segment_name_free(const char *name) +{ + /* The explicit cast here is needed to drop the const qualifier */ + + kmem_cache_free(rbd_segment_name_cache, (void *)name); +} + static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset) { char *name; @@ -1158,20 +1164,13 @@ static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset) if (ret < 0 || ret > CEPH_MAX_OID_NAME_LEN) { pr_err("error formatting segment name for #%llu (%d)\n", segment, ret); - kfree(name); + rbd_segment_name_free(name); name = NULL; } return name; } -static void rbd_segment_name_free(const char *name) -{ - /* The explicit cast here is needed to drop the const qualifier */ - - kmem_cache_free(rbd_segment_name_cache, (void *)name); -} - static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset) { u64 segment_size = (u64) 1 << rbd_dev->header.obj_order; @@ -1371,7 +1370,7 @@ static void obj_request_img_data_set(struct rbd_obj_request *obj_request) struct rbd_device *rbd_dev; rbd_dev = obj_request->img_request->rbd_dev; - rbd_warn(rbd_dev, "obj_request %p already marked img_data\n", + rbd_warn(rbd_dev, "obj_request %p already marked img_data", obj_request); } } @@ -1389,7 +1388,7 @@ static void obj_request_done_set(struct rbd_obj_request *obj_request) if (obj_request_img_data_test(obj_request)) rbd_dev = obj_request->img_request->rbd_dev; - rbd_warn(rbd_dev, "obj_request %p already marked done\n", + rbd_warn(rbd_dev, "obj_request %p already marked done", obj_request); } } @@ -1527,11 +1526,37 @@ static bool obj_request_type_valid(enum obj_request_type type) static int rbd_obj_request_submit(struct ceph_osd_client *osdc, struct rbd_obj_request *obj_request) { - dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request); - + dout("%s %p\n", __func__, obj_request); return ceph_osdc_start_request(osdc, obj_request->osd_req, false); } +static void rbd_obj_request_end(struct rbd_obj_request *obj_request) +{ + dout("%s %p\n", __func__, obj_request); + ceph_osdc_cancel_request(obj_request->osd_req); +} + +/* + * Wait for an object request to complete. If interrupted, cancel the + * underlying osd request. + */ +static int rbd_obj_request_wait(struct rbd_obj_request *obj_request) +{ + int ret; + + dout("%s %p\n", __func__, obj_request); + + ret = wait_for_completion_interruptible(&obj_request->completion); + if (ret < 0) { + dout("%s %p interrupted\n", __func__, obj_request); + rbd_obj_request_end(obj_request); + return ret; + } + + dout("%s %p done\n", __func__, obj_request); + return 0; +} + static void rbd_img_request_complete(struct rbd_img_request *img_request) { @@ -1558,15 +1583,6 @@ static void rbd_img_request_complete(struct rbd_img_request *img_request) rbd_img_request_put(img_request); } -/* Caller is responsible for rbd_obj_request_destroy(obj_request) */ - -static int rbd_obj_request_wait(struct rbd_obj_request *obj_request) -{ - dout("%s: obj %p\n", __func__, obj_request); - - return wait_for_completion_interruptible(&obj_request->completion); -} - /* * The default/initial value for all image request flags is 0. Each * is conditionally set to 1 at image request initialization time @@ -1763,7 +1779,7 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req, rbd_osd_trivial_callback(obj_request); break; default: - rbd_warn(NULL, "%s: unsupported op %hu\n", + rbd_warn(NULL, "%s: unsupported op %hu", obj_request->object_name, (unsigned short) opcode); break; } @@ -1998,7 +2014,7 @@ static void rbd_dev_parent_put(struct rbd_device *rbd_dev) if (!counter) rbd_dev_unparent(rbd_dev); else - rbd_warn(rbd_dev, "parent reference underflow\n"); + rbd_warn(rbd_dev, "parent reference underflow"); } /* @@ -2028,7 +2044,7 @@ static bool rbd_dev_parent_get(struct rbd_device *rbd_dev) /* Image was flattened, but parent is not yet torn down */ if (counter < 0) - rbd_warn(rbd_dev, "parent reference overflow\n"); + rbd_warn(rbd_dev, "parent reference overflow"); return false; } @@ -2045,7 +2061,7 @@ static struct rbd_img_request *rbd_img_request_create( { struct rbd_img_request *img_request; - img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_ATOMIC); + img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_NOIO); if (!img_request) return NULL; @@ -2161,11 +2177,11 @@ static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request) if (result) { struct rbd_device *rbd_dev = img_request->rbd_dev; - rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n", + rbd_warn(rbd_dev, "%s %llx at %llx (%llx)", img_request_write_test(img_request) ? "write" : "read", obj_request->length, obj_request->img_offset, obj_request->offset); - rbd_warn(rbd_dev, " result %d xferred %x\n", + rbd_warn(rbd_dev, " result %d xferred %x", result, xferred); if (!img_request->result) img_request->result = result; @@ -2946,154 +2962,135 @@ static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data) dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__, rbd_dev->header_name, (unsigned long long)notify_id, (unsigned int)opcode); + + /* + * Until adequate refresh error handling is in place, there is + * not much we can do here, except warn. + * + * See http://tracker.ceph.com/issues/5040 + */ ret = rbd_dev_refresh(rbd_dev); if (ret) - rbd_warn(rbd_dev, "header refresh error (%d)\n", ret); + rbd_warn(rbd_dev, "refresh failed: %d", ret); - rbd_obj_notify_ack_sync(rbd_dev, notify_id); + ret = rbd_obj_notify_ack_sync(rbd_dev, notify_id); + if (ret) + rbd_warn(rbd_dev, "notify_ack ret %d", ret); } /* - * Initiate a watch request, synchronously. + * Send a (un)watch request and wait for the ack. Return a request + * with a ref held on success or error. */ -static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev) +static struct rbd_obj_request *rbd_obj_watch_request_helper( + struct rbd_device *rbd_dev, + bool watch) { struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; struct rbd_obj_request *obj_request; int ret; - rbd_assert(!rbd_dev->watch_event); - rbd_assert(!rbd_dev->watch_request); - - ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev, - &rbd_dev->watch_event); - if (ret < 0) - return ret; - - rbd_assert(rbd_dev->watch_event); - obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0, OBJ_REQUEST_NODATA); - if (!obj_request) { - ret = -ENOMEM; - goto out_cancel; - } + if (!obj_request) + return ERR_PTR(-ENOMEM); obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, 1, obj_request); if (!obj_request->osd_req) { ret = -ENOMEM; - goto out_put; + goto out; } - ceph_osdc_set_request_linger(osdc, obj_request->osd_req); - osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH, - rbd_dev->watch_event->cookie, 0, 1); + rbd_dev->watch_event->cookie, 0, watch); rbd_osd_req_format_write(obj_request); + if (watch) + ceph_osdc_set_request_linger(osdc, obj_request->osd_req); + ret = rbd_obj_request_submit(osdc, obj_request); if (ret) - goto out_linger; + goto out; ret = rbd_obj_request_wait(obj_request); if (ret) - goto out_linger; + goto out; ret = obj_request->result; - if (ret) - goto out_linger; - - /* - * A watch request is set to linger, so the underlying osd - * request won't go away until we unregister it. We retain - * a pointer to the object request during that time (in - * rbd_dev->watch_request), so we'll keep a reference to - * it. We'll drop that reference (below) after we've - * unregistered it. - */ - rbd_dev->watch_request = obj_request; + if (ret) { + if (watch) + rbd_obj_request_end(obj_request); + goto out; + } - return 0; + return obj_request; -out_linger: - ceph_osdc_unregister_linger_request(osdc, obj_request->osd_req); -out_put: +out: rbd_obj_request_put(obj_request); -out_cancel: - ceph_osdc_cancel_event(rbd_dev->watch_event); - rbd_dev->watch_event = NULL; - - return ret; + return ERR_PTR(ret); } /* - * Tear down a watch request, synchronously. + * Initiate a watch request, synchronously. */ -static int __rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev) +static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev) { struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; struct rbd_obj_request *obj_request; int ret; - rbd_assert(rbd_dev->watch_event); - rbd_assert(rbd_dev->watch_request); + rbd_assert(!rbd_dev->watch_event); + rbd_assert(!rbd_dev->watch_request); - obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0, - OBJ_REQUEST_NODATA); - if (!obj_request) { - ret = -ENOMEM; - goto out_cancel; - } + ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev, + &rbd_dev->watch_event); + if (ret < 0) + return ret; - obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, 1, - obj_request); - if (!obj_request->osd_req) { - ret = -ENOMEM; - goto out_put; + obj_request = rbd_obj_watch_request_helper(rbd_dev, true); + if (IS_ERR(obj_request)) { + ceph_osdc_cancel_event(rbd_dev->watch_event); + rbd_dev->watch_event = NULL; + return PTR_ERR(obj_request); } - osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH, - rbd_dev->watch_event->cookie, 0, 0); - rbd_osd_req_format_write(obj_request); - - ret = rbd_obj_request_submit(osdc, obj_request); - if (ret) - goto out_put; + /* + * A watch request is set to linger, so the underlying osd + * request won't go away until we unregister it. We retain + * a pointer to the object request during that time (in + * rbd_dev->watch_request), so we'll keep a reference to it. + * We'll drop that reference after we've unregistered it in + * rbd_dev_header_unwatch_sync(). + */ + rbd_dev->watch_request = obj_request; - ret = rbd_obj_request_wait(obj_request); - if (ret) - goto out_put; + return 0; +} - ret = obj_request->result; - if (ret) - goto out_put; +/* + * Tear down a watch request, synchronously. + */ +static void rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev) +{ + struct rbd_obj_request *obj_request; - /* We have successfully torn down the watch request */ + rbd_assert(rbd_dev->watch_event); + rbd_assert(rbd_dev->watch_request); - ceph_osdc_unregister_linger_request(osdc, - rbd_dev->watch_request->osd_req); + rbd_obj_request_end(rbd_dev->watch_request); rbd_obj_request_put(rbd_dev->watch_request); rbd_dev->watch_request = NULL; -out_put: - rbd_obj_request_put(obj_request); -out_cancel: + obj_request = rbd_obj_watch_request_helper(rbd_dev, false); + if (!IS_ERR(obj_request)) + rbd_obj_request_put(obj_request); + else + rbd_warn(rbd_dev, "unable to tear down watch request (%ld)", + PTR_ERR(obj_request)); + ceph_osdc_cancel_event(rbd_dev->watch_event); rbd_dev->watch_event = NULL; - - return ret; -} - -static void rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev) -{ - int ret; - - ret = __rbd_dev_header_unwatch_sync(rbd_dev); - if (ret) { - rbd_warn(rbd_dev, "unable to tear down watch request: %d\n", - ret); - } } /* @@ -3183,102 +3180,129 @@ out: return ret; } -static void rbd_request_fn(struct request_queue *q) - __releases(q->queue_lock) __acquires(q->queue_lock) +static void rbd_handle_request(struct rbd_device *rbd_dev, struct request *rq) { - struct rbd_device *rbd_dev = q->queuedata; - struct request *rq; + struct rbd_img_request *img_request; + u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT; + u64 length = blk_rq_bytes(rq); + bool wr = rq_data_dir(rq) == WRITE; int result; - while ((rq = blk_fetch_request(q))) { - bool write_request = rq_data_dir(rq) == WRITE; - struct rbd_img_request *img_request; - u64 offset; - u64 length; + /* Ignore/skip any zero-length requests */ - /* Ignore any non-FS requests that filter through. */ + if (!length) { + dout("%s: zero-length request\n", __func__); + result = 0; + goto err_rq; + } - if (rq->cmd_type != REQ_TYPE_FS) { - dout("%s: non-fs request type %d\n", __func__, - (int) rq->cmd_type); - __blk_end_request_all(rq, 0); - continue; + /* Disallow writes to a read-only device */ + + if (wr) { + if (rbd_dev->mapping.read_only) { + result = -EROFS; + goto err_rq; } + rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP); + } - /* Ignore/skip any zero-length requests */ + /* + * Quit early if the mapped snapshot no longer exists. It's + * still possible the snapshot will have disappeared by the + * time our request arrives at the osd, but there's no sense in + * sending it if we already know. + */ + if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) { + dout("request for non-existent snapshot"); + rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP); + result = -ENXIO; + goto err_rq; + } - offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT; - length = (u64) blk_rq_bytes(rq); + if (offset && length > U64_MAX - offset + 1) { + rbd_warn(rbd_dev, "bad request range (%llu~%llu)", offset, + length); + result = -EINVAL; + goto err_rq; /* Shouldn't happen */ + } - if (!length) { - dout("%s: zero-length request\n", __func__); - __blk_end_request_all(rq, 0); - continue; - } + if (offset + length > rbd_dev->mapping.size) { + rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)", offset, + length, rbd_dev->mapping.size); + result = -EIO; + goto err_rq; + } - spin_unlock_irq(q->queue_lock); + img_request = rbd_img_request_create(rbd_dev, offset, length, wr); + if (!img_request) { + result = -ENOMEM; + goto err_rq; + } + img_request->rq = rq; - /* Disallow writes to a read-only device */ + result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO, rq->bio); + if (result) + goto err_img_request; - if (write_request) { - result = -EROFS; - if (rbd_dev->mapping.read_only) - goto end_request; - rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP); - } + result = rbd_img_request_submit(img_request); + if (result) + goto err_img_request; - /* - * Quit early if the mapped snapshot no longer - * exists. It's still possible the snapshot will - * have disappeared by the time our request arrives - * at the osd, but there's no sense in sending it if - * we already know. - */ - if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) { - dout("request for non-existent snapshot"); - rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP); - result = -ENXIO; - goto end_request; - } + return; - result = -EINVAL; - if (offset && length > U64_MAX - offset + 1) { - rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n", - offset, length); - goto end_request; /* Shouldn't happen */ - } +err_img_request: + rbd_img_request_put(img_request); +err_rq: + if (result) + rbd_warn(rbd_dev, "%s %llx at %llx result %d", + wr ? "write" : "read", length, offset, result); + blk_end_request_all(rq, result); +} - result = -EIO; - if (offset + length > rbd_dev->mapping.size) { - rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)\n", - offset, length, rbd_dev->mapping.size); - goto end_request; - } +static void rbd_request_workfn(struct work_struct *work) +{ + struct rbd_device *rbd_dev = + container_of(work, struct rbd_device, rq_work); + struct request *rq, *next; + LIST_HEAD(requests); - result = -ENOMEM; - img_request = rbd_img_request_create(rbd_dev, offset, length, - write_request); - if (!img_request) - goto end_request; + spin_lock_irq(&rbd_dev->lock); /* rq->q->queue_lock */ + list_splice_init(&rbd_dev->rq_queue, &requests); + spin_unlock_irq(&rbd_dev->lock); - img_request->rq = rq; + list_for_each_entry_safe(rq, next, &requests, queuelist) { + list_del_init(&rq->queuelist); + rbd_handle_request(rbd_dev, rq); + } +} - result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO, - rq->bio); - if (!result) - result = rbd_img_request_submit(img_request); - if (result) - rbd_img_request_put(img_request); -end_request: - spin_lock_irq(q->queue_lock); - if (result < 0) { - rbd_warn(rbd_dev, "%s %llx at %llx result %d\n", - write_request ? "write" : "read", - length, offset, result); - - __blk_end_request_all(rq, result); +/* + * Called with q->queue_lock held and interrupts disabled, possibly on + * the way to schedule(). Do not sleep here! + */ +static void rbd_request_fn(struct request_queue *q) +{ + struct rbd_device *rbd_dev = q->queuedata; + struct request *rq; + int queued = 0; + + rbd_assert(rbd_dev); + + while ((rq = blk_fetch_request(q))) { + /* Ignore any non-FS requests that filter through. */ + if (rq->cmd_type != REQ_TYPE_FS) { + dout("%s: non-fs request type %d\n", __func__, + (int) rq->cmd_type); + __blk_end_request_all(rq, 0); + continue; } + + list_add_tail(&rq->queuelist, &rbd_dev->rq_queue); + queued++; } + + if (queued) + queue_work(rbd_dev->rq_wq, &rbd_dev->rq_work); } /* @@ -3517,24 +3541,37 @@ static int rbd_dev_refresh(struct rbd_device *rbd_dev) u64 mapping_size; int ret; - rbd_assert(rbd_image_format_valid(rbd_dev->image_format)); down_write(&rbd_dev->header_rwsem); mapping_size = rbd_dev->mapping.size; - if (rbd_dev->image_format == 1) - ret = rbd_dev_v1_header_info(rbd_dev); - else - ret = rbd_dev_v2_header_info(rbd_dev); - /* If it's a mapped snapshot, validate its EXISTS flag */ + ret = rbd_dev_header_info(rbd_dev); + if (ret) + return ret; + + /* + * If there is a parent, see if it has disappeared due to the + * mapped image getting flattened. + */ + if (rbd_dev->parent) { + ret = rbd_dev_v2_parent_info(rbd_dev); + if (ret) + return ret; + } + + if (rbd_dev->spec->snap_id == CEPH_NOSNAP) { + if (rbd_dev->mapping.size != rbd_dev->header.image_size) + rbd_dev->mapping.size = rbd_dev->header.image_size; + } else { + /* validate mapped snapshot's EXISTS flag */ + rbd_exists_validate(rbd_dev); + } - rbd_exists_validate(rbd_dev); up_write(&rbd_dev->header_rwsem); - if (mapping_size != rbd_dev->mapping.size) { + if (mapping_size != rbd_dev->mapping.size) rbd_dev_update_size(rbd_dev); - } - return ret; + return 0; } static int rbd_init_disk(struct rbd_device *rbd_dev) @@ -3696,46 +3733,36 @@ static ssize_t rbd_snap_show(struct device *dev, } /* - * For an rbd v2 image, shows the pool id, image id, and snapshot id - * for the parent image. If there is no parent, simply shows - * "(no parent image)". + * For a v2 image, shows the chain of parent images, separated by empty + * lines. For v1 images or if there is no parent, shows "(no parent + * image)". */ static ssize_t rbd_parent_show(struct device *dev, - struct device_attribute *attr, - char *buf) + struct device_attribute *attr, + char *buf) { struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); - struct rbd_spec *spec = rbd_dev->parent_spec; - int count; - char *bufp = buf; + ssize_t count = 0; - if (!spec) + if (!rbd_dev->parent) return sprintf(buf, "(no parent image)\n"); - count = sprintf(bufp, "pool_id %llu\npool_name %s\n", - (unsigned long long) spec->pool_id, spec->pool_name); - if (count < 0) - return count; - bufp += count; - - count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id, - spec->image_name ? spec->image_name : "(unknown)"); - if (count < 0) - return count; - bufp += count; - - count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n", - (unsigned long long) spec->snap_id, spec->snap_name); - if (count < 0) - return count; - bufp += count; - - count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap); - if (count < 0) - return count; - bufp += count; + for ( ; rbd_dev->parent; rbd_dev = rbd_dev->parent) { + struct rbd_spec *spec = rbd_dev->parent_spec; + + count += sprintf(&buf[count], "%s" + "pool_id %llu\npool_name %s\n" + "image_id %s\nimage_name %s\n" + "snap_id %llu\nsnap_name %s\n" + "overlap %llu\n", + !count ? "" : "\n", /* first? */ + spec->pool_id, spec->pool_name, + spec->image_id, spec->image_name ?: "(unknown)", + spec->snap_id, spec->snap_name, + rbd_dev->parent_overlap); + } - return (ssize_t) (bufp - buf); + return count; } static ssize_t rbd_image_refresh(struct device *dev, @@ -3748,9 +3775,9 @@ static ssize_t rbd_image_refresh(struct device *dev, ret = rbd_dev_refresh(rbd_dev); if (ret) - rbd_warn(rbd_dev, ": manual header refresh error (%d)\n", ret); + return ret; - return ret < 0 ? ret : size; + return size; } static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL); @@ -3822,6 +3849,9 @@ static struct rbd_spec *rbd_spec_alloc(void) spec = kzalloc(sizeof (*spec), GFP_KERNEL); if (!spec) return NULL; + + spec->pool_id = CEPH_NOPOOL; + spec->snap_id = CEPH_NOSNAP; kref_init(&spec->kref); return spec; @@ -3848,6 +3878,8 @@ static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc, return NULL; spin_lock_init(&rbd_dev->lock); + INIT_LIST_HEAD(&rbd_dev->rq_queue); + INIT_WORK(&rbd_dev->rq_work, rbd_request_workfn); rbd_dev->flags = 0; atomic_set(&rbd_dev->parent_ref, 0); INIT_LIST_HEAD(&rbd_dev->node); @@ -4021,7 +4053,7 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev) goto out_err; } - snapid = cpu_to_le64(CEPH_NOSNAP); + snapid = cpu_to_le64(rbd_dev->spec->snap_id); ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name, "rbd", "get_parent", &snapid, sizeof (snapid), @@ -4059,7 +4091,7 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev) ret = -EIO; if (pool_id > (u64)U32_MAX) { - rbd_warn(NULL, "parent pool id too large (%llu > %u)\n", + rbd_warn(NULL, "parent pool id too large (%llu > %u)", (unsigned long long)pool_id, U32_MAX); goto out_err; } @@ -4083,6 +4115,8 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev) parent_spec->snap_id = snap_id; rbd_dev->parent_spec = parent_spec; parent_spec = NULL; /* rbd_dev now owns this */ + } else { + kfree(image_id); } /* @@ -4110,8 +4144,7 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev) * overlap is zero we just pretend there was * no parent image. */ - rbd_warn(rbd_dev, "ignoring parent of " - "clone with overlap 0\n"); + rbd_warn(rbd_dev, "ignoring parent with overlap 0"); } } out: @@ -4279,18 +4312,38 @@ static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name) } /* - * When an rbd image has a parent image, it is identified by the - * pool, image, and snapshot ids (not names). This function fills - * in the names for those ids. (It's OK if we can't figure out the - * name for an image id, but the pool and snapshot ids should always - * exist and have names.) All names in an rbd spec are dynamically - * allocated. + * An image being mapped will have everything but the snap id. + */ +static int rbd_spec_fill_snap_id(struct rbd_device *rbd_dev) +{ + struct rbd_spec *spec = rbd_dev->spec; + + rbd_assert(spec->pool_id != CEPH_NOPOOL && spec->pool_name); + rbd_assert(spec->image_id && spec->image_name); + rbd_assert(spec->snap_name); + + if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) { + u64 snap_id; + + snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name); + if (snap_id == CEPH_NOSNAP) + return -ENOENT; + + spec->snap_id = snap_id; + } else { + spec->snap_id = CEPH_NOSNAP; + } + + return 0; +} + +/* + * A parent image will have all ids but none of the names. * - * When an image being mapped (not a parent) is probed, we have the - * pool name and pool id, image name and image id, and the snapshot - * name. The only thing we're missing is the snapshot id. + * All names in an rbd spec are dynamically allocated. It's OK if we + * can't figure out the name for an image id. */ -static int rbd_dev_spec_update(struct rbd_device *rbd_dev) +static int rbd_spec_fill_names(struct rbd_device *rbd_dev) { struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; struct rbd_spec *spec = rbd_dev->spec; @@ -4299,24 +4352,9 @@ static int rbd_dev_spec_update(struct rbd_device *rbd_dev) const char *snap_name; int ret; - /* - * An image being mapped will have the pool name (etc.), but - * we need to look up the snapshot id. - */ - if (spec->pool_name) { - if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) { - u64 snap_id; - - snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name); - if (snap_id == CEPH_NOSNAP) - return -ENOENT; - spec->snap_id = snap_id; - } else { - spec->snap_id = CEPH_NOSNAP; - } - - return 0; - } + rbd_assert(spec->pool_id != CEPH_NOPOOL); + rbd_assert(spec->image_id); + rbd_assert(spec->snap_id != CEPH_NOSNAP); /* Get the pool name; we have to make our own copy of this */ @@ -4335,7 +4373,7 @@ static int rbd_dev_spec_update(struct rbd_device *rbd_dev) if (!image_name) rbd_warn(rbd_dev, "unable to get image name"); - /* Look up the snapshot name, and make a copy */ + /* Fetch the snapshot name */ snap_name = rbd_snap_name(rbd_dev, spec->snap_id); if (IS_ERR(snap_name)) { @@ -4348,10 +4386,10 @@ static int rbd_dev_spec_update(struct rbd_device *rbd_dev) spec->snap_name = snap_name; return 0; + out_err: kfree(image_name); kfree(pool_name); - return ret; } @@ -4483,43 +4521,22 @@ static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev) return ret; } - /* - * If the image supports layering, get the parent info. We - * need to probe the first time regardless. Thereafter we - * only need to if there's a parent, to see if it has - * disappeared due to the mapped image getting flattened. - */ - if (rbd_dev->header.features & RBD_FEATURE_LAYERING && - (first_time || rbd_dev->parent_spec)) { - bool warn; - - ret = rbd_dev_v2_parent_info(rbd_dev); - if (ret) - return ret; - - /* - * Print a warning if this is the initial probe and - * the image has a parent. Don't print it if the - * image now being probed is itself a parent. We - * can tell at this point because we won't know its - * pool name yet (just its pool id). - */ - warn = rbd_dev->parent_spec && rbd_dev->spec->pool_name; - if (first_time && warn) - rbd_warn(rbd_dev, "WARNING: kernel layering " - "is EXPERIMENTAL!"); - } - - if (rbd_dev->spec->snap_id == CEPH_NOSNAP) - if (rbd_dev->mapping.size != rbd_dev->header.image_size) - rbd_dev->mapping.size = rbd_dev->header.image_size; - ret = rbd_dev_v2_snap_context(rbd_dev); dout("rbd_dev_v2_snap_context returned %d\n", ret); return ret; } +static int rbd_dev_header_info(struct rbd_device *rbd_dev) +{ + rbd_assert(rbd_image_format_valid(rbd_dev->image_format)); + + if (rbd_dev->image_format == 1) + return rbd_dev_v1_header_info(rbd_dev); + + return rbd_dev_v2_header_info(rbd_dev); +} + static int rbd_bus_add_dev(struct rbd_device *rbd_dev) { struct device *dev; @@ -5066,12 +5083,17 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev) ret = rbd_dev_mapping_set(rbd_dev); if (ret) goto err_out_disk; + set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE); set_disk_ro(rbd_dev->disk, rbd_dev->mapping.read_only); + rbd_dev->rq_wq = alloc_workqueue(rbd_dev->disk->disk_name, 0, 0); + if (!rbd_dev->rq_wq) + goto err_out_mapping; + ret = rbd_bus_add_dev(rbd_dev); if (ret) - goto err_out_mapping; + goto err_out_workqueue; /* Everything's ready. Announce the disk to the world. */ @@ -5083,6 +5105,9 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev) return ret; +err_out_workqueue: + destroy_workqueue(rbd_dev->rq_wq); + rbd_dev->rq_wq = NULL; err_out_mapping: rbd_dev_mapping_clear(rbd_dev); err_out_disk: @@ -5155,8 +5180,6 @@ static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping) ret = rbd_dev_image_id(rbd_dev); if (ret) return ret; - rbd_assert(rbd_dev->spec->image_id); - rbd_assert(rbd_image_format_valid(rbd_dev->image_format)); ret = rbd_dev_header_name(rbd_dev); if (ret) @@ -5168,25 +5191,45 @@ static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping) goto out_header_name; } - if (rbd_dev->image_format == 1) - ret = rbd_dev_v1_header_info(rbd_dev); - else - ret = rbd_dev_v2_header_info(rbd_dev); + ret = rbd_dev_header_info(rbd_dev); if (ret) goto err_out_watch; - ret = rbd_dev_spec_update(rbd_dev); + /* + * If this image is the one being mapped, we have pool name and + * id, image name and id, and snap name - need to fill snap id. + * Otherwise this is a parent image, identified by pool, image + * and snap ids - need to fill in names for those ids. + */ + if (mapping) + ret = rbd_spec_fill_snap_id(rbd_dev); + else + ret = rbd_spec_fill_names(rbd_dev); if (ret) goto err_out_probe; + if (rbd_dev->header.features & RBD_FEATURE_LAYERING) { + ret = rbd_dev_v2_parent_info(rbd_dev); + if (ret) + goto err_out_probe; + + /* + * Need to warn users if this image is the one being + * mapped and has a parent. + */ + if (mapping && rbd_dev->parent_spec) + rbd_warn(rbd_dev, + "WARNING: kernel layering is EXPERIMENTAL!"); + } + ret = rbd_dev_probe_parent(rbd_dev); if (ret) goto err_out_probe; dout("discovered format %u image, header name is %s\n", rbd_dev->image_format, rbd_dev->header_name); - return 0; + err_out_probe: rbd_dev_unprobe(rbd_dev); err_out_watch: @@ -5199,9 +5242,6 @@ err_out_format: rbd_dev->image_format = 0; kfree(rbd_dev->spec->image_id); rbd_dev->spec->image_id = NULL; - - dout("probe failed, returning %d\n", ret); - return ret; } @@ -5243,7 +5283,7 @@ static ssize_t do_rbd_add(struct bus_type *bus, /* The ceph file layout needs to fit pool id in 32 bits */ if (spec->pool_id > (u64)U32_MAX) { - rbd_warn(NULL, "pool id too large (%llu > %u)\n", + rbd_warn(NULL, "pool id too large (%llu > %u)", (unsigned long long)spec->pool_id, U32_MAX); rc = -EIO; goto err_out_client; @@ -5314,6 +5354,7 @@ static void rbd_dev_device_release(struct device *dev) { struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); + destroy_workqueue(rbd_dev->rq_wq); rbd_free_disk(rbd_dev); clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags); rbd_dev_mapping_clear(rbd_dev); |