summaryrefslogtreecommitdiff
path: root/drivers/block/rbd.c
diff options
context:
space:
mode:
Diffstat (limited to 'drivers/block/rbd.c')
-rw-r--r--drivers/block/rbd.c689
1 files changed, 365 insertions, 324 deletions
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index b2c98c1bc037..623c84145b79 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -42,6 +42,7 @@
#include <linux/blkdev.h>
#include <linux/slab.h>
#include <linux/idr.h>
+#include <linux/workqueue.h>
#include "rbd_types.h"
@@ -332,7 +333,10 @@ struct rbd_device {
char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
+ struct list_head rq_queue; /* incoming rq queue */
spinlock_t lock; /* queue, flags, open_count */
+ struct workqueue_struct *rq_wq;
+ struct work_struct rq_work;
struct rbd_image_header header;
unsigned long flags; /* possibly lock protected */
@@ -514,7 +518,8 @@ static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);
static int rbd_dev_refresh(struct rbd_device *rbd_dev);
static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
-static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev);
+static int rbd_dev_header_info(struct rbd_device *rbd_dev);
+static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev);
static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
u64 snap_id);
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
@@ -971,12 +976,6 @@ static int rbd_header_from_disk(struct rbd_device *rbd_dev,
header->snap_names = snap_names;
header->snap_sizes = snap_sizes;
- /* Make sure mapping size is consistent with header info */
-
- if (rbd_dev->spec->snap_id == CEPH_NOSNAP || first_time)
- if (rbd_dev->mapping.size != header->image_size)
- rbd_dev->mapping.size = header->image_size;
-
return 0;
out_2big:
ret = -EIO;
@@ -1139,6 +1138,13 @@ static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
rbd_dev->mapping.features = 0;
}
+static void rbd_segment_name_free(const char *name)
+{
+ /* The explicit cast here is needed to drop the const qualifier */
+
+ kmem_cache_free(rbd_segment_name_cache, (void *)name);
+}
+
static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
char *name;
@@ -1158,20 +1164,13 @@ static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
if (ret < 0 || ret > CEPH_MAX_OID_NAME_LEN) {
pr_err("error formatting segment name for #%llu (%d)\n",
segment, ret);
- kfree(name);
+ rbd_segment_name_free(name);
name = NULL;
}
return name;
}
-static void rbd_segment_name_free(const char *name)
-{
- /* The explicit cast here is needed to drop the const qualifier */
-
- kmem_cache_free(rbd_segment_name_cache, (void *)name);
-}
-
static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
@@ -1371,7 +1370,7 @@ static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
struct rbd_device *rbd_dev;
rbd_dev = obj_request->img_request->rbd_dev;
- rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
+ rbd_warn(rbd_dev, "obj_request %p already marked img_data",
obj_request);
}
}
@@ -1389,7 +1388,7 @@ static void obj_request_done_set(struct rbd_obj_request *obj_request)
if (obj_request_img_data_test(obj_request))
rbd_dev = obj_request->img_request->rbd_dev;
- rbd_warn(rbd_dev, "obj_request %p already marked done\n",
+ rbd_warn(rbd_dev, "obj_request %p already marked done",
obj_request);
}
}
@@ -1527,11 +1526,37 @@ static bool obj_request_type_valid(enum obj_request_type type)
static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
struct rbd_obj_request *obj_request)
{
- dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);
-
+ dout("%s %p\n", __func__, obj_request);
return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
}
+static void rbd_obj_request_end(struct rbd_obj_request *obj_request)
+{
+ dout("%s %p\n", __func__, obj_request);
+ ceph_osdc_cancel_request(obj_request->osd_req);
+}
+
+/*
+ * Wait for an object request to complete. If interrupted, cancel the
+ * underlying osd request.
+ */
+static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
+{
+ int ret;
+
+ dout("%s %p\n", __func__, obj_request);
+
+ ret = wait_for_completion_interruptible(&obj_request->completion);
+ if (ret < 0) {
+ dout("%s %p interrupted\n", __func__, obj_request);
+ rbd_obj_request_end(obj_request);
+ return ret;
+ }
+
+ dout("%s %p done\n", __func__, obj_request);
+ return 0;
+}
+
static void rbd_img_request_complete(struct rbd_img_request *img_request)
{
@@ -1558,15 +1583,6 @@ static void rbd_img_request_complete(struct rbd_img_request *img_request)
rbd_img_request_put(img_request);
}
-/* Caller is responsible for rbd_obj_request_destroy(obj_request) */
-
-static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
-{
- dout("%s: obj %p\n", __func__, obj_request);
-
- return wait_for_completion_interruptible(&obj_request->completion);
-}
-
/*
* The default/initial value for all image request flags is 0. Each
* is conditionally set to 1 at image request initialization time
@@ -1763,7 +1779,7 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
rbd_osd_trivial_callback(obj_request);
break;
default:
- rbd_warn(NULL, "%s: unsupported op %hu\n",
+ rbd_warn(NULL, "%s: unsupported op %hu",
obj_request->object_name, (unsigned short) opcode);
break;
}
@@ -1998,7 +2014,7 @@ static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
if (!counter)
rbd_dev_unparent(rbd_dev);
else
- rbd_warn(rbd_dev, "parent reference underflow\n");
+ rbd_warn(rbd_dev, "parent reference underflow");
}
/*
@@ -2028,7 +2044,7 @@ static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
/* Image was flattened, but parent is not yet torn down */
if (counter < 0)
- rbd_warn(rbd_dev, "parent reference overflow\n");
+ rbd_warn(rbd_dev, "parent reference overflow");
return false;
}
@@ -2045,7 +2061,7 @@ static struct rbd_img_request *rbd_img_request_create(
{
struct rbd_img_request *img_request;
- img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_ATOMIC);
+ img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_NOIO);
if (!img_request)
return NULL;
@@ -2161,11 +2177,11 @@ static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
if (result) {
struct rbd_device *rbd_dev = img_request->rbd_dev;
- rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
+ rbd_warn(rbd_dev, "%s %llx at %llx (%llx)",
img_request_write_test(img_request) ? "write" : "read",
obj_request->length, obj_request->img_offset,
obj_request->offset);
- rbd_warn(rbd_dev, " result %d xferred %x\n",
+ rbd_warn(rbd_dev, " result %d xferred %x",
result, xferred);
if (!img_request->result)
img_request->result = result;
@@ -2946,154 +2962,135 @@ static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
rbd_dev->header_name, (unsigned long long)notify_id,
(unsigned int)opcode);
+
+ /*
+ * Until adequate refresh error handling is in place, there is
+ * not much we can do here, except warn.
+ *
+ * See http://tracker.ceph.com/issues/5040
+ */
ret = rbd_dev_refresh(rbd_dev);
if (ret)
- rbd_warn(rbd_dev, "header refresh error (%d)\n", ret);
+ rbd_warn(rbd_dev, "refresh failed: %d", ret);
- rbd_obj_notify_ack_sync(rbd_dev, notify_id);
+ ret = rbd_obj_notify_ack_sync(rbd_dev, notify_id);
+ if (ret)
+ rbd_warn(rbd_dev, "notify_ack ret %d", ret);
}
/*
- * Initiate a watch request, synchronously.
+ * Send a (un)watch request and wait for the ack. Return a request
+ * with a ref held on success or error.
*/
-static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev)
+static struct rbd_obj_request *rbd_obj_watch_request_helper(
+ struct rbd_device *rbd_dev,
+ bool watch)
{
struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
struct rbd_obj_request *obj_request;
int ret;
- rbd_assert(!rbd_dev->watch_event);
- rbd_assert(!rbd_dev->watch_request);
-
- ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
- &rbd_dev->watch_event);
- if (ret < 0)
- return ret;
-
- rbd_assert(rbd_dev->watch_event);
-
obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
OBJ_REQUEST_NODATA);
- if (!obj_request) {
- ret = -ENOMEM;
- goto out_cancel;
- }
+ if (!obj_request)
+ return ERR_PTR(-ENOMEM);
obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, 1,
obj_request);
if (!obj_request->osd_req) {
ret = -ENOMEM;
- goto out_put;
+ goto out;
}
- ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
-
osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
- rbd_dev->watch_event->cookie, 0, 1);
+ rbd_dev->watch_event->cookie, 0, watch);
rbd_osd_req_format_write(obj_request);
+ if (watch)
+ ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
+
ret = rbd_obj_request_submit(osdc, obj_request);
if (ret)
- goto out_linger;
+ goto out;
ret = rbd_obj_request_wait(obj_request);
if (ret)
- goto out_linger;
+ goto out;
ret = obj_request->result;
- if (ret)
- goto out_linger;
-
- /*
- * A watch request is set to linger, so the underlying osd
- * request won't go away until we unregister it. We retain
- * a pointer to the object request during that time (in
- * rbd_dev->watch_request), so we'll keep a reference to
- * it. We'll drop that reference (below) after we've
- * unregistered it.
- */
- rbd_dev->watch_request = obj_request;
+ if (ret) {
+ if (watch)
+ rbd_obj_request_end(obj_request);
+ goto out;
+ }
- return 0;
+ return obj_request;
-out_linger:
- ceph_osdc_unregister_linger_request(osdc, obj_request->osd_req);
-out_put:
+out:
rbd_obj_request_put(obj_request);
-out_cancel:
- ceph_osdc_cancel_event(rbd_dev->watch_event);
- rbd_dev->watch_event = NULL;
-
- return ret;
+ return ERR_PTR(ret);
}
/*
- * Tear down a watch request, synchronously.
+ * Initiate a watch request, synchronously.
*/
-static int __rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev)
+static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev)
{
struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
struct rbd_obj_request *obj_request;
int ret;
- rbd_assert(rbd_dev->watch_event);
- rbd_assert(rbd_dev->watch_request);
+ rbd_assert(!rbd_dev->watch_event);
+ rbd_assert(!rbd_dev->watch_request);
- obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
- OBJ_REQUEST_NODATA);
- if (!obj_request) {
- ret = -ENOMEM;
- goto out_cancel;
- }
+ ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
+ &rbd_dev->watch_event);
+ if (ret < 0)
+ return ret;
- obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, 1,
- obj_request);
- if (!obj_request->osd_req) {
- ret = -ENOMEM;
- goto out_put;
+ obj_request = rbd_obj_watch_request_helper(rbd_dev, true);
+ if (IS_ERR(obj_request)) {
+ ceph_osdc_cancel_event(rbd_dev->watch_event);
+ rbd_dev->watch_event = NULL;
+ return PTR_ERR(obj_request);
}
- osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
- rbd_dev->watch_event->cookie, 0, 0);
- rbd_osd_req_format_write(obj_request);
-
- ret = rbd_obj_request_submit(osdc, obj_request);
- if (ret)
- goto out_put;
+ /*
+ * A watch request is set to linger, so the underlying osd
+ * request won't go away until we unregister it. We retain
+ * a pointer to the object request during that time (in
+ * rbd_dev->watch_request), so we'll keep a reference to it.
+ * We'll drop that reference after we've unregistered it in
+ * rbd_dev_header_unwatch_sync().
+ */
+ rbd_dev->watch_request = obj_request;
- ret = rbd_obj_request_wait(obj_request);
- if (ret)
- goto out_put;
+ return 0;
+}
- ret = obj_request->result;
- if (ret)
- goto out_put;
+/*
+ * Tear down a watch request, synchronously.
+ */
+static void rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev)
+{
+ struct rbd_obj_request *obj_request;
- /* We have successfully torn down the watch request */
+ rbd_assert(rbd_dev->watch_event);
+ rbd_assert(rbd_dev->watch_request);
- ceph_osdc_unregister_linger_request(osdc,
- rbd_dev->watch_request->osd_req);
+ rbd_obj_request_end(rbd_dev->watch_request);
rbd_obj_request_put(rbd_dev->watch_request);
rbd_dev->watch_request = NULL;
-out_put:
- rbd_obj_request_put(obj_request);
-out_cancel:
+ obj_request = rbd_obj_watch_request_helper(rbd_dev, false);
+ if (!IS_ERR(obj_request))
+ rbd_obj_request_put(obj_request);
+ else
+ rbd_warn(rbd_dev, "unable to tear down watch request (%ld)",
+ PTR_ERR(obj_request));
+
ceph_osdc_cancel_event(rbd_dev->watch_event);
rbd_dev->watch_event = NULL;
-
- return ret;
-}
-
-static void rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev)
-{
- int ret;
-
- ret = __rbd_dev_header_unwatch_sync(rbd_dev);
- if (ret) {
- rbd_warn(rbd_dev, "unable to tear down watch request: %d\n",
- ret);
- }
}
/*
@@ -3183,102 +3180,129 @@ out:
return ret;
}
-static void rbd_request_fn(struct request_queue *q)
- __releases(q->queue_lock) __acquires(q->queue_lock)
+static void rbd_handle_request(struct rbd_device *rbd_dev, struct request *rq)
{
- struct rbd_device *rbd_dev = q->queuedata;
- struct request *rq;
+ struct rbd_img_request *img_request;
+ u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT;
+ u64 length = blk_rq_bytes(rq);
+ bool wr = rq_data_dir(rq) == WRITE;
int result;
- while ((rq = blk_fetch_request(q))) {
- bool write_request = rq_data_dir(rq) == WRITE;
- struct rbd_img_request *img_request;
- u64 offset;
- u64 length;
+ /* Ignore/skip any zero-length requests */
- /* Ignore any non-FS requests that filter through. */
+ if (!length) {
+ dout("%s: zero-length request\n", __func__);
+ result = 0;
+ goto err_rq;
+ }
- if (rq->cmd_type != REQ_TYPE_FS) {
- dout("%s: non-fs request type %d\n", __func__,
- (int) rq->cmd_type);
- __blk_end_request_all(rq, 0);
- continue;
+ /* Disallow writes to a read-only device */
+
+ if (wr) {
+ if (rbd_dev->mapping.read_only) {
+ result = -EROFS;
+ goto err_rq;
}
+ rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
+ }
- /* Ignore/skip any zero-length requests */
+ /*
+ * Quit early if the mapped snapshot no longer exists. It's
+ * still possible the snapshot will have disappeared by the
+ * time our request arrives at the osd, but there's no sense in
+ * sending it if we already know.
+ */
+ if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
+ dout("request for non-existent snapshot");
+ rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
+ result = -ENXIO;
+ goto err_rq;
+ }
- offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
- length = (u64) blk_rq_bytes(rq);
+ if (offset && length > U64_MAX - offset + 1) {
+ rbd_warn(rbd_dev, "bad request range (%llu~%llu)", offset,
+ length);
+ result = -EINVAL;
+ goto err_rq; /* Shouldn't happen */
+ }
- if (!length) {
- dout("%s: zero-length request\n", __func__);
- __blk_end_request_all(rq, 0);
- continue;
- }
+ if (offset + length > rbd_dev->mapping.size) {
+ rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)", offset,
+ length, rbd_dev->mapping.size);
+ result = -EIO;
+ goto err_rq;
+ }
- spin_unlock_irq(q->queue_lock);
+ img_request = rbd_img_request_create(rbd_dev, offset, length, wr);
+ if (!img_request) {
+ result = -ENOMEM;
+ goto err_rq;
+ }
+ img_request->rq = rq;
- /* Disallow writes to a read-only device */
+ result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO, rq->bio);
+ if (result)
+ goto err_img_request;
- if (write_request) {
- result = -EROFS;
- if (rbd_dev->mapping.read_only)
- goto end_request;
- rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
- }
+ result = rbd_img_request_submit(img_request);
+ if (result)
+ goto err_img_request;
- /*
- * Quit early if the mapped snapshot no longer
- * exists. It's still possible the snapshot will
- * have disappeared by the time our request arrives
- * at the osd, but there's no sense in sending it if
- * we already know.
- */
- if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
- dout("request for non-existent snapshot");
- rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
- result = -ENXIO;
- goto end_request;
- }
+ return;
- result = -EINVAL;
- if (offset && length > U64_MAX - offset + 1) {
- rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
- offset, length);
- goto end_request; /* Shouldn't happen */
- }
+err_img_request:
+ rbd_img_request_put(img_request);
+err_rq:
+ if (result)
+ rbd_warn(rbd_dev, "%s %llx at %llx result %d",
+ wr ? "write" : "read", length, offset, result);
+ blk_end_request_all(rq, result);
+}
- result = -EIO;
- if (offset + length > rbd_dev->mapping.size) {
- rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)\n",
- offset, length, rbd_dev->mapping.size);
- goto end_request;
- }
+static void rbd_request_workfn(struct work_struct *work)
+{
+ struct rbd_device *rbd_dev =
+ container_of(work, struct rbd_device, rq_work);
+ struct request *rq, *next;
+ LIST_HEAD(requests);
- result = -ENOMEM;
- img_request = rbd_img_request_create(rbd_dev, offset, length,
- write_request);
- if (!img_request)
- goto end_request;
+ spin_lock_irq(&rbd_dev->lock); /* rq->q->queue_lock */
+ list_splice_init(&rbd_dev->rq_queue, &requests);
+ spin_unlock_irq(&rbd_dev->lock);
- img_request->rq = rq;
+ list_for_each_entry_safe(rq, next, &requests, queuelist) {
+ list_del_init(&rq->queuelist);
+ rbd_handle_request(rbd_dev, rq);
+ }
+}
- result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
- rq->bio);
- if (!result)
- result = rbd_img_request_submit(img_request);
- if (result)
- rbd_img_request_put(img_request);
-end_request:
- spin_lock_irq(q->queue_lock);
- if (result < 0) {
- rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
- write_request ? "write" : "read",
- length, offset, result);
-
- __blk_end_request_all(rq, result);
+/*
+ * Called with q->queue_lock held and interrupts disabled, possibly on
+ * the way to schedule(). Do not sleep here!
+ */
+static void rbd_request_fn(struct request_queue *q)
+{
+ struct rbd_device *rbd_dev = q->queuedata;
+ struct request *rq;
+ int queued = 0;
+
+ rbd_assert(rbd_dev);
+
+ while ((rq = blk_fetch_request(q))) {
+ /* Ignore any non-FS requests that filter through. */
+ if (rq->cmd_type != REQ_TYPE_FS) {
+ dout("%s: non-fs request type %d\n", __func__,
+ (int) rq->cmd_type);
+ __blk_end_request_all(rq, 0);
+ continue;
}
+
+ list_add_tail(&rq->queuelist, &rbd_dev->rq_queue);
+ queued++;
}
+
+ if (queued)
+ queue_work(rbd_dev->rq_wq, &rbd_dev->rq_work);
}
/*
@@ -3517,24 +3541,37 @@ static int rbd_dev_refresh(struct rbd_device *rbd_dev)
u64 mapping_size;
int ret;
- rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
down_write(&rbd_dev->header_rwsem);
mapping_size = rbd_dev->mapping.size;
- if (rbd_dev->image_format == 1)
- ret = rbd_dev_v1_header_info(rbd_dev);
- else
- ret = rbd_dev_v2_header_info(rbd_dev);
- /* If it's a mapped snapshot, validate its EXISTS flag */
+ ret = rbd_dev_header_info(rbd_dev);
+ if (ret)
+ return ret;
+
+ /*
+ * If there is a parent, see if it has disappeared due to the
+ * mapped image getting flattened.
+ */
+ if (rbd_dev->parent) {
+ ret = rbd_dev_v2_parent_info(rbd_dev);
+ if (ret)
+ return ret;
+ }
+
+ if (rbd_dev->spec->snap_id == CEPH_NOSNAP) {
+ if (rbd_dev->mapping.size != rbd_dev->header.image_size)
+ rbd_dev->mapping.size = rbd_dev->header.image_size;
+ } else {
+ /* validate mapped snapshot's EXISTS flag */
+ rbd_exists_validate(rbd_dev);
+ }
- rbd_exists_validate(rbd_dev);
up_write(&rbd_dev->header_rwsem);
- if (mapping_size != rbd_dev->mapping.size) {
+ if (mapping_size != rbd_dev->mapping.size)
rbd_dev_update_size(rbd_dev);
- }
- return ret;
+ return 0;
}
static int rbd_init_disk(struct rbd_device *rbd_dev)
@@ -3696,46 +3733,36 @@ static ssize_t rbd_snap_show(struct device *dev,
}
/*
- * For an rbd v2 image, shows the pool id, image id, and snapshot id
- * for the parent image. If there is no parent, simply shows
- * "(no parent image)".
+ * For a v2 image, shows the chain of parent images, separated by empty
+ * lines. For v1 images or if there is no parent, shows "(no parent
+ * image)".
*/
static ssize_t rbd_parent_show(struct device *dev,
- struct device_attribute *attr,
- char *buf)
+ struct device_attribute *attr,
+ char *buf)
{
struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
- struct rbd_spec *spec = rbd_dev->parent_spec;
- int count;
- char *bufp = buf;
+ ssize_t count = 0;
- if (!spec)
+ if (!rbd_dev->parent)
return sprintf(buf, "(no parent image)\n");
- count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
- (unsigned long long) spec->pool_id, spec->pool_name);
- if (count < 0)
- return count;
- bufp += count;
-
- count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
- spec->image_name ? spec->image_name : "(unknown)");
- if (count < 0)
- return count;
- bufp += count;
-
- count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
- (unsigned long long) spec->snap_id, spec->snap_name);
- if (count < 0)
- return count;
- bufp += count;
-
- count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
- if (count < 0)
- return count;
- bufp += count;
+ for ( ; rbd_dev->parent; rbd_dev = rbd_dev->parent) {
+ struct rbd_spec *spec = rbd_dev->parent_spec;
+
+ count += sprintf(&buf[count], "%s"
+ "pool_id %llu\npool_name %s\n"
+ "image_id %s\nimage_name %s\n"
+ "snap_id %llu\nsnap_name %s\n"
+ "overlap %llu\n",
+ !count ? "" : "\n", /* first? */
+ spec->pool_id, spec->pool_name,
+ spec->image_id, spec->image_name ?: "(unknown)",
+ spec->snap_id, spec->snap_name,
+ rbd_dev->parent_overlap);
+ }
- return (ssize_t) (bufp - buf);
+ return count;
}
static ssize_t rbd_image_refresh(struct device *dev,
@@ -3748,9 +3775,9 @@ static ssize_t rbd_image_refresh(struct device *dev,
ret = rbd_dev_refresh(rbd_dev);
if (ret)
- rbd_warn(rbd_dev, ": manual header refresh error (%d)\n", ret);
+ return ret;
- return ret < 0 ? ret : size;
+ return size;
}
static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
@@ -3822,6 +3849,9 @@ static struct rbd_spec *rbd_spec_alloc(void)
spec = kzalloc(sizeof (*spec), GFP_KERNEL);
if (!spec)
return NULL;
+
+ spec->pool_id = CEPH_NOPOOL;
+ spec->snap_id = CEPH_NOSNAP;
kref_init(&spec->kref);
return spec;
@@ -3848,6 +3878,8 @@ static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
return NULL;
spin_lock_init(&rbd_dev->lock);
+ INIT_LIST_HEAD(&rbd_dev->rq_queue);
+ INIT_WORK(&rbd_dev->rq_work, rbd_request_workfn);
rbd_dev->flags = 0;
atomic_set(&rbd_dev->parent_ref, 0);
INIT_LIST_HEAD(&rbd_dev->node);
@@ -4021,7 +4053,7 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
goto out_err;
}
- snapid = cpu_to_le64(CEPH_NOSNAP);
+ snapid = cpu_to_le64(rbd_dev->spec->snap_id);
ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
"rbd", "get_parent",
&snapid, sizeof (snapid),
@@ -4059,7 +4091,7 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
ret = -EIO;
if (pool_id > (u64)U32_MAX) {
- rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
+ rbd_warn(NULL, "parent pool id too large (%llu > %u)",
(unsigned long long)pool_id, U32_MAX);
goto out_err;
}
@@ -4083,6 +4115,8 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
parent_spec->snap_id = snap_id;
rbd_dev->parent_spec = parent_spec;
parent_spec = NULL; /* rbd_dev now owns this */
+ } else {
+ kfree(image_id);
}
/*
@@ -4110,8 +4144,7 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
* overlap is zero we just pretend there was
* no parent image.
*/
- rbd_warn(rbd_dev, "ignoring parent of "
- "clone with overlap 0\n");
+ rbd_warn(rbd_dev, "ignoring parent with overlap 0");
}
}
out:
@@ -4279,18 +4312,38 @@ static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
}
/*
- * When an rbd image has a parent image, it is identified by the
- * pool, image, and snapshot ids (not names). This function fills
- * in the names for those ids. (It's OK if we can't figure out the
- * name for an image id, but the pool and snapshot ids should always
- * exist and have names.) All names in an rbd spec are dynamically
- * allocated.
+ * An image being mapped will have everything but the snap id.
+ */
+static int rbd_spec_fill_snap_id(struct rbd_device *rbd_dev)
+{
+ struct rbd_spec *spec = rbd_dev->spec;
+
+ rbd_assert(spec->pool_id != CEPH_NOPOOL && spec->pool_name);
+ rbd_assert(spec->image_id && spec->image_name);
+ rbd_assert(spec->snap_name);
+
+ if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
+ u64 snap_id;
+
+ snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
+ if (snap_id == CEPH_NOSNAP)
+ return -ENOENT;
+
+ spec->snap_id = snap_id;
+ } else {
+ spec->snap_id = CEPH_NOSNAP;
+ }
+
+ return 0;
+}
+
+/*
+ * A parent image will have all ids but none of the names.
*
- * When an image being mapped (not a parent) is probed, we have the
- * pool name and pool id, image name and image id, and the snapshot
- * name. The only thing we're missing is the snapshot id.
+ * All names in an rbd spec are dynamically allocated. It's OK if we
+ * can't figure out the name for an image id.
*/
-static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
+static int rbd_spec_fill_names(struct rbd_device *rbd_dev)
{
struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
struct rbd_spec *spec = rbd_dev->spec;
@@ -4299,24 +4352,9 @@ static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
const char *snap_name;
int ret;
- /*
- * An image being mapped will have the pool name (etc.), but
- * we need to look up the snapshot id.
- */
- if (spec->pool_name) {
- if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
- u64 snap_id;
-
- snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
- if (snap_id == CEPH_NOSNAP)
- return -ENOENT;
- spec->snap_id = snap_id;
- } else {
- spec->snap_id = CEPH_NOSNAP;
- }
-
- return 0;
- }
+ rbd_assert(spec->pool_id != CEPH_NOPOOL);
+ rbd_assert(spec->image_id);
+ rbd_assert(spec->snap_id != CEPH_NOSNAP);
/* Get the pool name; we have to make our own copy of this */
@@ -4335,7 +4373,7 @@ static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
if (!image_name)
rbd_warn(rbd_dev, "unable to get image name");
- /* Look up the snapshot name, and make a copy */
+ /* Fetch the snapshot name */
snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
if (IS_ERR(snap_name)) {
@@ -4348,10 +4386,10 @@ static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
spec->snap_name = snap_name;
return 0;
+
out_err:
kfree(image_name);
kfree(pool_name);
-
return ret;
}
@@ -4483,43 +4521,22 @@ static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
return ret;
}
- /*
- * If the image supports layering, get the parent info. We
- * need to probe the first time regardless. Thereafter we
- * only need to if there's a parent, to see if it has
- * disappeared due to the mapped image getting flattened.
- */
- if (rbd_dev->header.features & RBD_FEATURE_LAYERING &&
- (first_time || rbd_dev->parent_spec)) {
- bool warn;
-
- ret = rbd_dev_v2_parent_info(rbd_dev);
- if (ret)
- return ret;
-
- /*
- * Print a warning if this is the initial probe and
- * the image has a parent. Don't print it if the
- * image now being probed is itself a parent. We
- * can tell at this point because we won't know its
- * pool name yet (just its pool id).
- */
- warn = rbd_dev->parent_spec && rbd_dev->spec->pool_name;
- if (first_time && warn)
- rbd_warn(rbd_dev, "WARNING: kernel layering "
- "is EXPERIMENTAL!");
- }
-
- if (rbd_dev->spec->snap_id == CEPH_NOSNAP)
- if (rbd_dev->mapping.size != rbd_dev->header.image_size)
- rbd_dev->mapping.size = rbd_dev->header.image_size;
-
ret = rbd_dev_v2_snap_context(rbd_dev);
dout("rbd_dev_v2_snap_context returned %d\n", ret);
return ret;
}
+static int rbd_dev_header_info(struct rbd_device *rbd_dev)
+{
+ rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
+
+ if (rbd_dev->image_format == 1)
+ return rbd_dev_v1_header_info(rbd_dev);
+
+ return rbd_dev_v2_header_info(rbd_dev);
+}
+
static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
{
struct device *dev;
@@ -5066,12 +5083,17 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
ret = rbd_dev_mapping_set(rbd_dev);
if (ret)
goto err_out_disk;
+
set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
set_disk_ro(rbd_dev->disk, rbd_dev->mapping.read_only);
+ rbd_dev->rq_wq = alloc_workqueue(rbd_dev->disk->disk_name, 0, 0);
+ if (!rbd_dev->rq_wq)
+ goto err_out_mapping;
+
ret = rbd_bus_add_dev(rbd_dev);
if (ret)
- goto err_out_mapping;
+ goto err_out_workqueue;
/* Everything's ready. Announce the disk to the world. */
@@ -5083,6 +5105,9 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
return ret;
+err_out_workqueue:
+ destroy_workqueue(rbd_dev->rq_wq);
+ rbd_dev->rq_wq = NULL;
err_out_mapping:
rbd_dev_mapping_clear(rbd_dev);
err_out_disk:
@@ -5155,8 +5180,6 @@ static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping)
ret = rbd_dev_image_id(rbd_dev);
if (ret)
return ret;
- rbd_assert(rbd_dev->spec->image_id);
- rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
ret = rbd_dev_header_name(rbd_dev);
if (ret)
@@ -5168,25 +5191,45 @@ static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping)
goto out_header_name;
}
- if (rbd_dev->image_format == 1)
- ret = rbd_dev_v1_header_info(rbd_dev);
- else
- ret = rbd_dev_v2_header_info(rbd_dev);
+ ret = rbd_dev_header_info(rbd_dev);
if (ret)
goto err_out_watch;
- ret = rbd_dev_spec_update(rbd_dev);
+ /*
+ * If this image is the one being mapped, we have pool name and
+ * id, image name and id, and snap name - need to fill snap id.
+ * Otherwise this is a parent image, identified by pool, image
+ * and snap ids - need to fill in names for those ids.
+ */
+ if (mapping)
+ ret = rbd_spec_fill_snap_id(rbd_dev);
+ else
+ ret = rbd_spec_fill_names(rbd_dev);
if (ret)
goto err_out_probe;
+ if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
+ ret = rbd_dev_v2_parent_info(rbd_dev);
+ if (ret)
+ goto err_out_probe;
+
+ /*
+ * Need to warn users if this image is the one being
+ * mapped and has a parent.
+ */
+ if (mapping && rbd_dev->parent_spec)
+ rbd_warn(rbd_dev,
+ "WARNING: kernel layering is EXPERIMENTAL!");
+ }
+
ret = rbd_dev_probe_parent(rbd_dev);
if (ret)
goto err_out_probe;
dout("discovered format %u image, header name is %s\n",
rbd_dev->image_format, rbd_dev->header_name);
-
return 0;
+
err_out_probe:
rbd_dev_unprobe(rbd_dev);
err_out_watch:
@@ -5199,9 +5242,6 @@ err_out_format:
rbd_dev->image_format = 0;
kfree(rbd_dev->spec->image_id);
rbd_dev->spec->image_id = NULL;
-
- dout("probe failed, returning %d\n", ret);
-
return ret;
}
@@ -5243,7 +5283,7 @@ static ssize_t do_rbd_add(struct bus_type *bus,
/* The ceph file layout needs to fit pool id in 32 bits */
if (spec->pool_id > (u64)U32_MAX) {
- rbd_warn(NULL, "pool id too large (%llu > %u)\n",
+ rbd_warn(NULL, "pool id too large (%llu > %u)",
(unsigned long long)spec->pool_id, U32_MAX);
rc = -EIO;
goto err_out_client;
@@ -5314,6 +5354,7 @@ static void rbd_dev_device_release(struct device *dev)
{
struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
+ destroy_workqueue(rbd_dev->rq_wq);
rbd_free_disk(rbd_dev);
clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
rbd_dev_mapping_clear(rbd_dev);