diff options
author | Linus Torvalds <torvalds@linux-foundation.org> | 2013-05-07 00:11:19 +0400 |
---|---|---|
committer | Linus Torvalds <torvalds@linux-foundation.org> | 2013-05-07 00:11:19 +0400 |
commit | 91f8575685e35f3bd021286bc82d26397458f5a9 (patch) | |
tree | 09de8d889758a12071adb9427ed741e27c907aa6 /drivers/block | |
parent | 2e378f3eebd28feefbb1f9953834a5a19482f053 (diff) | |
parent | b5b09be30cf99f9c699e825629f02e3bce555d44 (diff) | |
download | linux-91f8575685e35f3bd021286bc82d26397458f5a9.tar.xz |
Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client
Pull Ceph changes from Alex Elder:
"This is a big pull.
Most of it is culmination of Alex's work to implement RBD image
layering, which is now complete (yay!).
There is also some work from Yan to fix i_mutex behavior surrounding
writes in cephfs, a sync write fix, a fix for RBD images that get
resized while they are mapped, and a few patches from me that resolve
annoying auth warnings and fix several bugs in the ceph auth code."
* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (254 commits)
rbd: fix image request leak on parent read
libceph: use slab cache for osd client requests
libceph: allocate ceph message data with a slab allocator
libceph: allocate ceph messages with a slab allocator
rbd: allocate image object names with a slab allocator
rbd: allocate object requests with a slab allocator
rbd: allocate name separate from obj_request
rbd: allocate image requests with a slab allocator
rbd: use binary search for snapshot lookup
rbd: clear EXISTS flag if mapped snapshot disappears
rbd: kill off the snapshot list
rbd: define rbd_snap_size() and rbd_snap_features()
rbd: use snap_id not index to look up snap info
rbd: look up snapshot name in names buffer
rbd: drop obj_request->version
rbd: drop rbd_obj_method_sync() version parameter
rbd: more version parameter removal
rbd: get rid of some version parameters
rbd: stop tracking header object version
rbd: snap names are pointer to constant data
...
Diffstat (limited to 'drivers/block')
-rw-r--r-- | drivers/block/rbd.c | 2858 |
1 files changed, 1834 insertions, 1024 deletions
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index b7b7a88d9f68..c2ca1818f335 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -1,3 +1,4 @@ + /* rbd.c -- Export ceph rados objects as a Linux block device @@ -32,12 +33,14 @@ #include <linux/ceph/mon_client.h> #include <linux/ceph/decode.h> #include <linux/parser.h> +#include <linux/bsearch.h> #include <linux/kernel.h> #include <linux/device.h> #include <linux/module.h> #include <linux/fs.h> #include <linux/blkdev.h> +#include <linux/slab.h> #include "rbd_types.h" @@ -52,13 +55,6 @@ #define SECTOR_SHIFT 9 #define SECTOR_SIZE (1ULL << SECTOR_SHIFT) -/* It might be useful to have these defined elsewhere */ - -#define U8_MAX ((u8) (~0U)) -#define U16_MAX ((u16) (~0U)) -#define U32_MAX ((u32) (~0U)) -#define U64_MAX ((u64) (~0ULL)) - #define RBD_DRV_NAME "rbd" #define RBD_DRV_NAME_LONG "rbd (rados block device)" @@ -72,6 +68,8 @@ #define RBD_SNAP_HEAD_NAME "-" +#define BAD_SNAP_INDEX U32_MAX /* invalid index into snap array */ + /* This allows a single page to hold an image name sent by OSD */ #define RBD_IMAGE_NAME_LEN_MAX (PAGE_SIZE - sizeof (__le32) - 1) #define RBD_IMAGE_ID_LEN_MAX 64 @@ -80,11 +78,14 @@ /* Feature bits */ -#define RBD_FEATURE_LAYERING 1 +#define RBD_FEATURE_LAYERING (1<<0) +#define RBD_FEATURE_STRIPINGV2 (1<<1) +#define RBD_FEATURES_ALL \ + (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2) /* Features supported by this (client software) implementation. */ -#define RBD_FEATURES_ALL (0) +#define RBD_FEATURES_SUPPORTED (RBD_FEATURES_ALL) /* * An RBD device name will be "rbd#", where the "rbd" comes from @@ -112,7 +113,8 @@ struct rbd_image_header { char *snap_names; u64 *snap_sizes; - u64 obj_version; + u64 stripe_unit; + u64 stripe_count; }; /* @@ -142,13 +144,13 @@ struct rbd_image_header { */ struct rbd_spec { u64 pool_id; - char *pool_name; + const char *pool_name; - char *image_id; - char *image_name; + const char *image_id; + const char *image_name; u64 snap_id; - char *snap_name; + const char *snap_name; struct kref kref; }; @@ -174,13 +176,44 @@ enum obj_request_type { OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES }; +enum obj_req_flags { + OBJ_REQ_DONE, /* completion flag: not done = 0, done = 1 */ + OBJ_REQ_IMG_DATA, /* object usage: standalone = 0, image = 1 */ + OBJ_REQ_KNOWN, /* EXISTS flag valid: no = 0, yes = 1 */ + OBJ_REQ_EXISTS, /* target exists: no = 0, yes = 1 */ +}; + struct rbd_obj_request { const char *object_name; u64 offset; /* object start byte */ u64 length; /* bytes from offset */ + unsigned long flags; - struct rbd_img_request *img_request; - struct list_head links; /* img_request->obj_requests */ + /* + * An object request associated with an image will have its + * img_data flag set; a standalone object request will not. + * + * A standalone object request will have which == BAD_WHICH + * and a null obj_request pointer. + * + * An object request initiated in support of a layered image + * object (to check for its existence before a write) will + * have which == BAD_WHICH and a non-null obj_request pointer. + * + * Finally, an object request for rbd image data will have + * which != BAD_WHICH, and will have a non-null img_request + * pointer. The value of which will be in the range + * 0..(img_request->obj_request_count-1). + */ + union { + struct rbd_obj_request *obj_request; /* STAT op */ + struct { + struct rbd_img_request *img_request; + u64 img_offset; + /* links for img_request->obj_requests list */ + struct list_head links; + }; + }; u32 which; /* posn image request list */ enum obj_request_type type; @@ -191,13 +224,12 @@ struct rbd_obj_request { u32 page_count; }; }; + struct page **copyup_pages; struct ceph_osd_request *osd_req; u64 xferred; /* bytes transferred */ - u64 version; int result; - atomic_t done; rbd_obj_callback_t callback; struct completion completion; @@ -205,19 +237,31 @@ struct rbd_obj_request { struct kref kref; }; +enum img_req_flags { + IMG_REQ_WRITE, /* I/O direction: read = 0, write = 1 */ + IMG_REQ_CHILD, /* initiator: block = 0, child image = 1 */ + IMG_REQ_LAYERED, /* ENOENT handling: normal = 0, layered = 1 */ +}; + struct rbd_img_request { - struct request *rq; struct rbd_device *rbd_dev; u64 offset; /* starting image byte offset */ u64 length; /* byte count from offset */ - bool write_request; /* false for read */ + unsigned long flags; union { + u64 snap_id; /* for reads */ struct ceph_snap_context *snapc; /* for writes */ - u64 snap_id; /* for reads */ }; + union { + struct request *rq; /* block request */ + struct rbd_obj_request *obj_request; /* obj req initiator */ + }; + struct page **copyup_pages; spinlock_t completion_lock;/* protects next_completion */ u32 next_completion; rbd_img_callback_t callback; + u64 xferred;/* aggregate bytes transferred */ + int result; /* first nonzero obj_request result */ u32 obj_request_count; struct list_head obj_requests; /* rbd_obj_request structs */ @@ -232,15 +276,6 @@ struct rbd_img_request { #define for_each_obj_request_safe(ireq, oreq, n) \ list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links) -struct rbd_snap { - struct device dev; - const char *name; - u64 size; - struct list_head node; - u64 id; - u64 features; -}; - struct rbd_mapping { u64 size; u64 features; @@ -276,6 +311,7 @@ struct rbd_device { struct rbd_spec *parent_spec; u64 parent_overlap; + struct rbd_device *parent; /* protects updating the header */ struct rw_semaphore header_rwsem; @@ -284,9 +320,6 @@ struct rbd_device { struct list_head node; - /* list of snapshots */ - struct list_head snaps; - /* sysfs related */ struct device dev; unsigned long open_count; /* protected by lock */ @@ -312,16 +345,21 @@ static DEFINE_SPINLOCK(rbd_dev_list_lock); static LIST_HEAD(rbd_client_list); /* clients */ static DEFINE_SPINLOCK(rbd_client_list_lock); -static int rbd_dev_snaps_update(struct rbd_device *rbd_dev); -static int rbd_dev_snaps_register(struct rbd_device *rbd_dev); +/* Slab caches for frequently-allocated structures */ + +static struct kmem_cache *rbd_img_request_cache; +static struct kmem_cache *rbd_obj_request_cache; +static struct kmem_cache *rbd_segment_name_cache; -static void rbd_dev_release(struct device *dev); -static void rbd_remove_snap_dev(struct rbd_snap *snap); +static int rbd_img_request_submit(struct rbd_img_request *img_request); + +static void rbd_dev_device_release(struct device *dev); static ssize_t rbd_add(struct bus_type *bus, const char *buf, size_t count); static ssize_t rbd_remove(struct bus_type *bus, const char *buf, size_t count); +static int rbd_dev_image_probe(struct rbd_device *rbd_dev); static struct bus_attribute rbd_bus_attrs[] = { __ATTR(add, S_IWUSR, NULL, rbd_add), @@ -383,8 +421,19 @@ void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...) # define rbd_assert(expr) ((void) 0) #endif /* !RBD_DEBUG */ -static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver); -static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver); +static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request); +static void rbd_img_parent_read(struct rbd_obj_request *obj_request); +static void rbd_dev_remove_parent(struct rbd_device *rbd_dev); + +static int rbd_dev_refresh(struct rbd_device *rbd_dev); +static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev); +static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, + u64 snap_id); +static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id, + u8 *order, u64 *snap_size); +static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id, + u64 *snap_features); +static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name); static int rbd_open(struct block_device *bdev, fmode_t mode) { @@ -484,6 +533,13 @@ out_opt: return ERR_PTR(ret); } +static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc) +{ + kref_get(&rbdc->kref); + + return rbdc; +} + /* * Find a ceph client with specific addr and configuration. If * found, bump its reference count. @@ -499,7 +555,8 @@ static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts) spin_lock(&rbd_client_list_lock); list_for_each_entry(client_node, &rbd_client_list, node) { if (!ceph_compare_options(ceph_opts, client_node->client)) { - kref_get(&client_node->kref); + __rbd_get_client(client_node); + found = true; break; } @@ -722,7 +779,6 @@ static int rbd_header_from_disk(struct rbd_image_header *header, header->snap_sizes[i] = le64_to_cpu(ondisk->snaps[i].image_size); } else { - WARN_ON(ondisk->snap_names_len); header->snap_names = NULL; header->snap_sizes = NULL; } @@ -735,18 +791,13 @@ static int rbd_header_from_disk(struct rbd_image_header *header, /* Allocate and fill in the snapshot context */ header->image_size = le64_to_cpu(ondisk->image_size); - size = sizeof (struct ceph_snap_context); - size += snap_count * sizeof (header->snapc->snaps[0]); - header->snapc = kzalloc(size, GFP_KERNEL); + + header->snapc = ceph_create_snap_context(snap_count, GFP_KERNEL); if (!header->snapc) goto out_err; - - atomic_set(&header->snapc->nref, 1); header->snapc->seq = le64_to_cpu(ondisk->snap_seq); - header->snapc->num_snaps = snap_count; for (i = 0; i < snap_count; i++) - header->snapc->snaps[i] = - le64_to_cpu(ondisk->snaps[i].id); + header->snapc->snaps[i] = le64_to_cpu(ondisk->snaps[i].id); return 0; @@ -761,70 +812,174 @@ out_err: return -ENOMEM; } -static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id) +static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which) { - struct rbd_snap *snap; + const char *snap_name; + rbd_assert(which < rbd_dev->header.snapc->num_snaps); + + /* Skip over names until we find the one we are looking for */ + + snap_name = rbd_dev->header.snap_names; + while (which--) + snap_name += strlen(snap_name) + 1; + + return kstrdup(snap_name, GFP_KERNEL); +} + +/* + * Snapshot id comparison function for use with qsort()/bsearch(). + * Note that result is for snapshots in *descending* order. + */ +static int snapid_compare_reverse(const void *s1, const void *s2) +{ + u64 snap_id1 = *(u64 *)s1; + u64 snap_id2 = *(u64 *)s2; + + if (snap_id1 < snap_id2) + return 1; + return snap_id1 == snap_id2 ? 0 : -1; +} + +/* + * Search a snapshot context to see if the given snapshot id is + * present. + * + * Returns the position of the snapshot id in the array if it's found, + * or BAD_SNAP_INDEX otherwise. + * + * Note: The snapshot array is in kept sorted (by the osd) in + * reverse order, highest snapshot id first. + */ +static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id) +{ + struct ceph_snap_context *snapc = rbd_dev->header.snapc; + u64 *found; + + found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps, + sizeof (snap_id), snapid_compare_reverse); + + return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX; +} + +static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, + u64 snap_id) +{ + u32 which; + + which = rbd_dev_snap_index(rbd_dev, snap_id); + if (which == BAD_SNAP_INDEX) + return NULL; + + return _rbd_dev_v1_snap_name(rbd_dev, which); +} + +static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id) +{ if (snap_id == CEPH_NOSNAP) return RBD_SNAP_HEAD_NAME; - list_for_each_entry(snap, &rbd_dev->snaps, node) - if (snap_id == snap->id) - return snap->name; + rbd_assert(rbd_image_format_valid(rbd_dev->image_format)); + if (rbd_dev->image_format == 1) + return rbd_dev_v1_snap_name(rbd_dev, snap_id); - return NULL; + return rbd_dev_v2_snap_name(rbd_dev, snap_id); } -static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name) +static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id, + u64 *snap_size) { + rbd_assert(rbd_image_format_valid(rbd_dev->image_format)); + if (snap_id == CEPH_NOSNAP) { + *snap_size = rbd_dev->header.image_size; + } else if (rbd_dev->image_format == 1) { + u32 which; - struct rbd_snap *snap; + which = rbd_dev_snap_index(rbd_dev, snap_id); + if (which == BAD_SNAP_INDEX) + return -ENOENT; - list_for_each_entry(snap, &rbd_dev->snaps, node) { - if (!strcmp(snap_name, snap->name)) { - rbd_dev->spec->snap_id = snap->id; - rbd_dev->mapping.size = snap->size; - rbd_dev->mapping.features = snap->features; + *snap_size = rbd_dev->header.snap_sizes[which]; + } else { + u64 size = 0; + int ret; - return 0; - } + ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size); + if (ret) + return ret; + + *snap_size = size; } + return 0; +} + +static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id, + u64 *snap_features) +{ + rbd_assert(rbd_image_format_valid(rbd_dev->image_format)); + if (snap_id == CEPH_NOSNAP) { + *snap_features = rbd_dev->header.features; + } else if (rbd_dev->image_format == 1) { + *snap_features = 0; /* No features for format 1 */ + } else { + u64 features = 0; + int ret; + + ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features); + if (ret) + return ret; - return -ENOENT; + *snap_features = features; + } + return 0; } -static int rbd_dev_set_mapping(struct rbd_device *rbd_dev) +static int rbd_dev_mapping_set(struct rbd_device *rbd_dev) { + const char *snap_name = rbd_dev->spec->snap_name; + u64 snap_id; + u64 size = 0; + u64 features = 0; int ret; - if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME, - sizeof (RBD_SNAP_HEAD_NAME))) { - rbd_dev->spec->snap_id = CEPH_NOSNAP; - rbd_dev->mapping.size = rbd_dev->header.image_size; - rbd_dev->mapping.features = rbd_dev->header.features; - ret = 0; + if (strcmp(snap_name, RBD_SNAP_HEAD_NAME)) { + snap_id = rbd_snap_id_by_name(rbd_dev, snap_name); + if (snap_id == CEPH_NOSNAP) + return -ENOENT; } else { - ret = snap_by_name(rbd_dev, rbd_dev->spec->snap_name); - if (ret < 0) - goto done; - rbd_dev->mapping.read_only = true; + snap_id = CEPH_NOSNAP; } - set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags); -done: - return ret; + ret = rbd_snap_size(rbd_dev, snap_id, &size); + if (ret) + return ret; + ret = rbd_snap_features(rbd_dev, snap_id, &features); + if (ret) + return ret; + + rbd_dev->mapping.size = size; + rbd_dev->mapping.features = features; + + /* If we are mapping a snapshot it must be marked read-only */ + + if (snap_id != CEPH_NOSNAP) + rbd_dev->mapping.read_only = true; + + return 0; } -static void rbd_header_free(struct rbd_image_header *header) +static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev) { - kfree(header->object_prefix); - header->object_prefix = NULL; - kfree(header->snap_sizes); - header->snap_sizes = NULL; - kfree(header->snap_names); - header->snap_names = NULL; - ceph_put_snap_context(header->snapc); - header->snapc = NULL; + rbd_dev->mapping.size = 0; + rbd_dev->mapping.features = 0; + rbd_dev->mapping.read_only = true; +} + +static void rbd_dev_clear_mapping(struct rbd_device *rbd_dev) +{ + rbd_dev->mapping.size = 0; + rbd_dev->mapping.features = 0; + rbd_dev->mapping.read_only = true; } static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset) @@ -833,7 +988,7 @@ static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset) u64 segment; int ret; - name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO); + name = kmem_cache_alloc(rbd_segment_name_cache, GFP_NOIO); if (!name) return NULL; segment = offset >> rbd_dev->header.obj_order; @@ -849,6 +1004,13 @@ static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset) return name; } +static void rbd_segment_name_free(const char *name) +{ + /* The explicit cast here is needed to drop the const qualifier */ + + kmem_cache_free(rbd_segment_name_cache, (void *)name); +} + static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset) { u64 segment_size = (u64) 1 << rbd_dev->header.obj_order; @@ -921,6 +1083,37 @@ static void zero_bio_chain(struct bio *chain, int start_ofs) } /* + * similar to zero_bio_chain(), zeros data defined by a page array, + * starting at the given byte offset from the start of the array and + * continuing up to the given end offset. The pages array is + * assumed to be big enough to hold all bytes up to the end. + */ +static void zero_pages(struct page **pages, u64 offset, u64 end) +{ + struct page **page = &pages[offset >> PAGE_SHIFT]; + + rbd_assert(end > offset); + rbd_assert(end - offset <= (u64)SIZE_MAX); + while (offset < end) { + size_t page_offset; + size_t length; + unsigned long flags; + void *kaddr; + + page_offset = (size_t)(offset & ~PAGE_MASK); + length = min(PAGE_SIZE - page_offset, (size_t)(end - offset)); + local_irq_save(flags); + kaddr = kmap_atomic(*page); + memset(kaddr + page_offset, 0, length); + kunmap_atomic(kaddr); + local_irq_restore(flags); + + offset += length; + page++; + } +} + +/* * Clone a portion of a bio, starting at the given byte offset * and continuing for the number of bytes indicated. */ @@ -1064,6 +1257,77 @@ out_err: return NULL; } +/* + * The default/initial value for all object request flags is 0. For + * each flag, once its value is set to 1 it is never reset to 0 + * again. + */ +static void obj_request_img_data_set(struct rbd_obj_request *obj_request) +{ + if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) { + struct rbd_device *rbd_dev; + + rbd_dev = obj_request->img_request->rbd_dev; + rbd_warn(rbd_dev, "obj_request %p already marked img_data\n", + obj_request); + } +} + +static bool obj_request_img_data_test(struct rbd_obj_request *obj_request) +{ + smp_mb(); + return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0; +} + +static void obj_request_done_set(struct rbd_obj_request *obj_request) +{ + if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) { + struct rbd_device *rbd_dev = NULL; + + if (obj_request_img_data_test(obj_request)) + rbd_dev = obj_request->img_request->rbd_dev; + rbd_warn(rbd_dev, "obj_request %p already marked done\n", + obj_request); + } +} + +static bool obj_request_done_test(struct rbd_obj_request *obj_request) +{ + smp_mb(); + return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0; +} + +/* + * This sets the KNOWN flag after (possibly) setting the EXISTS + * flag. The latter is set based on the "exists" value provided. + * + * Note that for our purposes once an object exists it never goes + * away again. It's possible that the response from two existence + * checks are separated by the creation of the target object, and + * the first ("doesn't exist") response arrives *after* the second + * ("does exist"). In that case we ignore the second one. + */ +static void obj_request_existence_set(struct rbd_obj_request *obj_request, + bool exists) +{ + if (exists) + set_bit(OBJ_REQ_EXISTS, &obj_request->flags); + set_bit(OBJ_REQ_KNOWN, &obj_request->flags); + smp_mb(); +} + +static bool obj_request_known_test(struct rbd_obj_request *obj_request) +{ + smp_mb(); + return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0; +} + +static bool obj_request_exists_test(struct rbd_obj_request *obj_request) +{ + smp_mb(); + return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0; +} + static void rbd_obj_request_get(struct rbd_obj_request *obj_request) { dout("%s: obj %p (was %d)\n", __func__, obj_request, @@ -1101,9 +1365,11 @@ static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request, { rbd_assert(obj_request->img_request == NULL); - rbd_obj_request_get(obj_request); + /* Image request now owns object's original reference */ obj_request->img_request = img_request; obj_request->which = img_request->obj_request_count; + rbd_assert(!obj_request_img_data_test(obj_request)); + obj_request_img_data_set(obj_request); rbd_assert(obj_request->which != BAD_WHICH); img_request->obj_request_count++; list_add_tail(&obj_request->links, &img_request->obj_requests); @@ -1123,6 +1389,7 @@ static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request, img_request->obj_request_count--; rbd_assert(obj_request->which == img_request->obj_request_count); obj_request->which = BAD_WHICH; + rbd_assert(obj_request_img_data_test(obj_request)); rbd_assert(obj_request->img_request == img_request); obj_request->img_request = NULL; obj_request->callback = NULL; @@ -1141,76 +1408,6 @@ static bool obj_request_type_valid(enum obj_request_type type) } } -static struct ceph_osd_req_op *rbd_osd_req_op_create(u16 opcode, ...) -{ - struct ceph_osd_req_op *op; - va_list args; - size_t size; - - op = kzalloc(sizeof (*op), GFP_NOIO); - if (!op) - return NULL; - op->op = opcode; - va_start(args, opcode); - switch (opcode) { - case CEPH_OSD_OP_READ: - case CEPH_OSD_OP_WRITE: - /* rbd_osd_req_op_create(READ, offset, length) */ - /* rbd_osd_req_op_create(WRITE, offset, length) */ - op->extent.offset = va_arg(args, u64); - op->extent.length = va_arg(args, u64); - if (opcode == CEPH_OSD_OP_WRITE) - op->payload_len = op->extent.length; - break; - case CEPH_OSD_OP_STAT: - break; - case CEPH_OSD_OP_CALL: - /* rbd_osd_req_op_create(CALL, class, method, data, datalen) */ - op->cls.class_name = va_arg(args, char *); - size = strlen(op->cls.class_name); - rbd_assert(size <= (size_t) U8_MAX); - op->cls.class_len = size; - op->payload_len = size; - - op->cls.method_name = va_arg(args, char *); - size = strlen(op->cls.method_name); - rbd_assert(size <= (size_t) U8_MAX); - op->cls.method_len = size; - op->payload_len += size; - - op->cls.argc = 0; - op->cls.indata = va_arg(args, void *); - size = va_arg(args, size_t); - rbd_assert(size <= (size_t) U32_MAX); - op->cls.indata_len = (u32) size; - op->payload_len += size; - break; - case CEPH_OSD_OP_NOTIFY_ACK: - case CEPH_OSD_OP_WATCH: - /* rbd_osd_req_op_create(NOTIFY_ACK, cookie, version) */ - /* rbd_osd_req_op_create(WATCH, cookie, version, flag) */ - op->watch.cookie = va_arg(args, u64); - op->watch.ver = va_arg(args, u64); - op->watch.ver = cpu_to_le64(op->watch.ver); - if (opcode == CEPH_OSD_OP_WATCH && va_arg(args, int)) - op->watch.flag = (u8) 1; - break; - default: - rbd_warn(NULL, "unsupported opcode %hu\n", opcode); - kfree(op); - op = NULL; - break; - } - va_end(args); - - return op; -} - -static void rbd_osd_req_op_destroy(struct ceph_osd_req_op *op) -{ - kfree(op); -} - static int rbd_obj_request_submit(struct ceph_osd_client *osdc, struct rbd_obj_request *obj_request) { @@ -1221,7 +1418,24 @@ static int rbd_obj_request_submit(struct ceph_osd_client *osdc, static void rbd_img_request_complete(struct rbd_img_request *img_request) { + dout("%s: img %p\n", __func__, img_request); + + /* + * If no error occurred, compute the aggregate transfer + * count for the image request. We could instead use + * atomic64_cmpxchg() to update it as each object request + * completes; not clear which way is better off hand. + */ + if (!img_request->result) { + struct rbd_obj_request *obj_request; + u64 xferred = 0; + + for_each_obj_request(img_request, obj_request) + xferred += obj_request->xferred; + img_request->xferred = xferred; + } + if (img_request->callback) img_request->callback(img_request); else @@ -1237,39 +1451,56 @@ static int rbd_obj_request_wait(struct rbd_obj_request *obj_request) return wait_for_completion_interruptible(&obj_request->completion); } -static void obj_request_done_init(struct rbd_obj_request *obj_request) +/* + * The default/initial value for all image request flags is 0. Each + * is conditionally set to 1 at image request initialization time + * and currently never change thereafter. + */ +static void img_request_write_set(struct rbd_img_request *img_request) { - atomic_set(&obj_request->done, 0); - smp_wmb(); + set_bit(IMG_REQ_WRITE, &img_request->flags); + smp_mb(); } -static void obj_request_done_set(struct rbd_obj_request *obj_request) +static bool img_request_write_test(struct rbd_img_request *img_request) { - int done; + smp_mb(); + return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0; +} - done = atomic_inc_return(&obj_request->done); - if (done > 1) { - struct rbd_img_request *img_request = obj_request->img_request; - struct rbd_device *rbd_dev; +static void img_request_child_set(struct rbd_img_request *img_request) +{ + set_bit(IMG_REQ_CHILD, &img_request->flags); + smp_mb(); +} - rbd_dev = img_request ? img_request->rbd_dev : NULL; - rbd_warn(rbd_dev, "obj_request %p was already done\n", - obj_request); - } +static bool img_request_child_test(struct rbd_img_request *img_request) +{ + smp_mb(); + return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0; } -static bool obj_request_done_test(struct rbd_obj_request *obj_request) +static void img_request_layered_set(struct rbd_img_request *img_request) +{ + set_bit(IMG_REQ_LAYERED, &img_request->flags); + smp_mb(); +} + +static bool img_request_layered_test(struct rbd_img_request *img_request) { smp_mb(); - return atomic_read(&obj_request->done) != 0; + return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0; } static void rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request) { + u64 xferred = obj_request->xferred; + u64 length = obj_request->length; + dout("%s: obj %p img %p result %d %llu/%llu\n", __func__, obj_request, obj_request->img_request, obj_request->result, - obj_request->xferred, obj_request->length); + xferred, length); /* * ENOENT means a hole in the image. We zero-fill the * entire length of the request. A short read also implies @@ -1277,15 +1508,20 @@ rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request) * update the xferred count to indicate the whole request * was satisfied. */ - BUG_ON(obj_request->type != OBJ_REQUEST_BIO); + rbd_assert(obj_request->type != OBJ_REQUEST_NODATA); if (obj_request->result == -ENOENT) { - zero_bio_chain(obj_request->bio_list, 0); + if (obj_request->type == OBJ_REQUEST_BIO) + zero_bio_chain(obj_request->bio_list, 0); + else + zero_pages(obj_request->pages, 0, length); obj_request->result = 0; - obj_request->xferred = obj_request->length; - } else if (obj_request->xferred < obj_request->length && - !obj_request->result) { - zero_bio_chain(obj_request->bio_list, obj_request->xferred); - obj_request->xferred = obj_request->length; + obj_request->xferred = length; + } else if (xferred < length && !obj_request->result) { + if (obj_request->type == OBJ_REQUEST_BIO) + zero_bio_chain(obj_request->bio_list, xferred); + else + zero_pages(obj_request->pages, xferred, length); + obj_request->xferred = length; } obj_request_done_set(obj_request); } @@ -1308,9 +1544,23 @@ static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request) static void rbd_osd_read_callback(struct rbd_obj_request *obj_request) { - dout("%s: obj %p result %d %llu/%llu\n", __func__, obj_request, - obj_request->result, obj_request->xferred, obj_request->length); - if (obj_request->img_request) + struct rbd_img_request *img_request = NULL; + struct rbd_device *rbd_dev = NULL; + bool layered = false; + + if (obj_request_img_data_test(obj_request)) { + img_request = obj_request->img_request; + layered = img_request && img_request_layered_test(img_request); + rbd_dev = img_request->rbd_dev; + } + + dout("%s: obj %p img %p result %d %llu/%llu\n", __func__, + obj_request, img_request, obj_request->result, + obj_request->xferred, obj_request->length); + if (layered && obj_request->result == -ENOENT && + obj_request->img_offset < rbd_dev->parent_overlap) + rbd_img_parent_read(obj_request); + else if (img_request) rbd_img_obj_request_read_callback(obj_request); else obj_request_done_set(obj_request); @@ -1321,9 +1571,8 @@ static void rbd_osd_write_callback(struct rbd_obj_request *obj_request) dout("%s: obj %p result %d %llu\n", __func__, obj_request, obj_request->result, obj_request->length); /* - * There is no such thing as a successful short write. - * Our xferred value is the number of bytes transferred - * back. Set it to our originally-requested length. + * There is no such thing as a successful short write. Set + * it to our originally-requested length. */ obj_request->xferred = obj_request->length; obj_request_done_set(obj_request); @@ -1347,22 +1596,25 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req, dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg); rbd_assert(osd_req == obj_request->osd_req); - rbd_assert(!!obj_request->img_request ^ - (obj_request->which == BAD_WHICH)); + if (obj_request_img_data_test(obj_request)) { + rbd_assert(obj_request->img_request); + rbd_assert(obj_request->which != BAD_WHICH); + } else { + rbd_assert(obj_request->which == BAD_WHICH); + } if (osd_req->r_result < 0) obj_request->result = osd_req->r_result; - obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version); - WARN_ON(osd_req->r_num_ops != 1); /* For now */ + BUG_ON(osd_req->r_num_ops > 2); /* * We support a 64-bit length, but ultimately it has to be * passed to blk_end_request(), which takes an unsigned int. */ obj_request->xferred = osd_req->r_reply_op_len[0]; - rbd_assert(obj_request->xferred < (u64) UINT_MAX); - opcode = osd_req->r_request_ops[0].op; + rbd_assert(obj_request->xferred < (u64)UINT_MAX); + opcode = osd_req->r_ops[0].op; switch (opcode) { case CEPH_OSD_OP_READ: rbd_osd_read_callback(obj_request); @@ -1388,28 +1640,49 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req, rbd_obj_request_complete(obj_request); } +static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request) +{ + struct rbd_img_request *img_request = obj_request->img_request; + struct ceph_osd_request *osd_req = obj_request->osd_req; + u64 snap_id; + + rbd_assert(osd_req != NULL); + + snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP; + ceph_osdc_build_request(osd_req, obj_request->offset, + NULL, snap_id, NULL); +} + +static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request) +{ + struct rbd_img_request *img_request = obj_request->img_request; + struct ceph_osd_request *osd_req = obj_request->osd_req; + struct ceph_snap_context *snapc; + struct timespec mtime = CURRENT_TIME; + + rbd_assert(osd_req != NULL); + + snapc = img_request ? img_request->snapc : NULL; + ceph_osdc_build_request(osd_req, obj_request->offset, + snapc, CEPH_NOSNAP, &mtime); +} + static struct ceph_osd_request *rbd_osd_req_create( struct rbd_device *rbd_dev, bool write_request, - struct rbd_obj_request *obj_request, - struct ceph_osd_req_op *op) + struct rbd_obj_request *obj_request) { - struct rbd_img_request *img_request = obj_request->img_request; struct ceph_snap_context *snapc = NULL; struct ceph_osd_client *osdc; struct ceph_osd_request *osd_req; - struct timespec now; - struct timespec *mtime; - u64 snap_id = CEPH_NOSNAP; - u64 offset = obj_request->offset; - u64 length = obj_request->length; - if (img_request) { - rbd_assert(img_request->write_request == write_request); - if (img_request->write_request) + if (obj_request_img_data_test(obj_request)) { + struct rbd_img_request *img_request = obj_request->img_request; + + rbd_assert(write_request == + img_request_write_test(img_request)); + if (write_request) snapc = img_request->snapc; - else - snap_id = img_request->snap_id; } /* Allocate and initialize the request, for the single op */ @@ -1419,31 +1692,10 @@ static struct ceph_osd_request *rbd_osd_req_create( if (!osd_req) return NULL; /* ENOMEM */ - rbd_assert(obj_request_type_valid(obj_request->type)); - switch (obj_request->type) { - case OBJ_REQUEST_NODATA: - break; /* Nothing to do */ - case OBJ_REQUEST_BIO: - rbd_assert(obj_request->bio_list != NULL); - osd_req->r_bio = obj_request->bio_list; - break; - case OBJ_REQUEST_PAGES: - osd_req->r_pages = obj_request->pages; - osd_req->r_num_pages = obj_request->page_count; - osd_req->r_page_alignment = offset & ~PAGE_MASK; - break; - } - - if (write_request) { + if (write_request) osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK; - now = CURRENT_TIME; - mtime = &now; - } else { + else osd_req->r_flags = CEPH_OSD_FLAG_READ; - mtime = NULL; /* not needed for reads */ - offset = 0; /* These are not used... */ - length = 0; /* ...for osd read requests */ - } osd_req->r_callback = rbd_osd_req_callback; osd_req->r_priv = obj_request; @@ -1454,14 +1706,51 @@ static struct ceph_osd_request *rbd_osd_req_create( osd_req->r_file_layout = rbd_dev->layout; /* struct */ - /* osd_req will get its own reference to snapc (if non-null) */ + return osd_req; +} - ceph_osdc_build_request(osd_req, offset, length, 1, op, - snapc, snap_id, mtime); +/* + * Create a copyup osd request based on the information in the + * object request supplied. A copyup request has two osd ops, + * a copyup method call, and a "normal" write request. + */ +static struct ceph_osd_request * +rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request) +{ + struct rbd_img_request *img_request; + struct ceph_snap_context *snapc; + struct rbd_device *rbd_dev; + struct ceph_osd_client *osdc; + struct ceph_osd_request *osd_req; + + rbd_assert(obj_request_img_data_test(obj_request)); + img_request = obj_request->img_request; + rbd_assert(img_request); + rbd_assert(img_request_write_test(img_request)); + + /* Allocate and initialize the request, for the two ops */ + + snapc = img_request->snapc; + rbd_dev = img_request->rbd_dev; + osdc = &rbd_dev->rbd_client->client->osdc; + osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC); + if (!osd_req) + return NULL; /* ENOMEM */ + + osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK; + osd_req->r_callback = rbd_osd_req_callback; + osd_req->r_priv = obj_request; + + osd_req->r_oid_len = strlen(obj_request->object_name); + rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid)); + memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len); + + osd_req->r_file_layout = rbd_dev->layout; /* struct */ return osd_req; } + static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req) { ceph_osdc_put_request(osd_req); @@ -1480,18 +1769,23 @@ static struct rbd_obj_request *rbd_obj_request_create(const char *object_name, rbd_assert(obj_request_type_valid(type)); size = strlen(object_name) + 1; - obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL); - if (!obj_request) + name = kmalloc(size, GFP_KERNEL); + if (!name) return NULL; - name = (char *)(obj_request + 1); + obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_KERNEL); + if (!obj_request) { + kfree(name); + return NULL; + } + obj_request->object_name = memcpy(name, object_name, size); obj_request->offset = offset; obj_request->length = length; + obj_request->flags = 0; obj_request->which = BAD_WHICH; obj_request->type = type; INIT_LIST_HEAD(&obj_request->links); - obj_request_done_init(obj_request); init_completion(&obj_request->completion); kref_init(&obj_request->kref); @@ -1530,7 +1824,9 @@ static void rbd_obj_request_destroy(struct kref *kref) break; } - kfree(obj_request); + kfree(obj_request->object_name); + obj_request->object_name = NULL; + kmem_cache_free(rbd_obj_request_cache, obj_request); } /* @@ -1541,37 +1837,40 @@ static void rbd_obj_request_destroy(struct kref *kref) static struct rbd_img_request *rbd_img_request_create( struct rbd_device *rbd_dev, u64 offset, u64 length, - bool write_request) + bool write_request, + bool child_request) { struct rbd_img_request *img_request; - struct ceph_snap_context *snapc = NULL; - img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC); + img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_ATOMIC); if (!img_request) return NULL; if (write_request) { down_read(&rbd_dev->header_rwsem); - snapc = ceph_get_snap_context(rbd_dev->header.snapc); + ceph_get_snap_context(rbd_dev->header.snapc); up_read(&rbd_dev->header_rwsem); - if (WARN_ON(!snapc)) { - kfree(img_request); - return NULL; /* Shouldn't happen */ - } } img_request->rq = NULL; img_request->rbd_dev = rbd_dev; img_request->offset = offset; img_request->length = length; - img_request->write_request = write_request; - if (write_request) - img_request->snapc = snapc; - else + img_request->flags = 0; + if (write_request) { + img_request_write_set(img_request); + img_request->snapc = rbd_dev->header.snapc; + } else { img_request->snap_id = rbd_dev->spec->snap_id; + } + if (child_request) + img_request_child_set(img_request); + if (rbd_dev->parent_spec) + img_request_layered_set(img_request); spin_lock_init(&img_request->completion_lock); img_request->next_completion = 0; img_request->callback = NULL; + img_request->result = 0; img_request->obj_request_count = 0; INIT_LIST_HEAD(&img_request->obj_requests); kref_init(&img_request->kref); @@ -1600,78 +1899,204 @@ static void rbd_img_request_destroy(struct kref *kref) rbd_img_obj_request_del(img_request, obj_request); rbd_assert(img_request->obj_request_count == 0); - if (img_request->write_request) + if (img_request_write_test(img_request)) ceph_put_snap_context(img_request->snapc); - kfree(img_request); + if (img_request_child_test(img_request)) + rbd_obj_request_put(img_request->obj_request); + + kmem_cache_free(rbd_img_request_cache, img_request); +} + +static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request) +{ + struct rbd_img_request *img_request; + unsigned int xferred; + int result; + bool more; + + rbd_assert(obj_request_img_data_test(obj_request)); + img_request = obj_request->img_request; + + rbd_assert(obj_request->xferred <= (u64)UINT_MAX); + xferred = (unsigned int)obj_request->xferred; + result = obj_request->result; + if (result) { + struct rbd_device *rbd_dev = img_request->rbd_dev; + + rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n", + img_request_write_test(img_request) ? "write" : "read", + obj_request->length, obj_request->img_offset, + obj_request->offset); + rbd_warn(rbd_dev, " result %d xferred %x\n", + result, xferred); + if (!img_request->result) + img_request->result = result; + } + + /* Image object requests don't own their page array */ + + if (obj_request->type == OBJ_REQUEST_PAGES) { + obj_request->pages = NULL; + obj_request->page_count = 0; + } + + if (img_request_child_test(img_request)) { + rbd_assert(img_request->obj_request != NULL); + more = obj_request->which < img_request->obj_request_count - 1; + } else { + rbd_assert(img_request->rq != NULL); + more = blk_end_request(img_request->rq, result, xferred); + } + + return more; } -static int rbd_img_request_fill_bio(struct rbd_img_request *img_request, - struct bio *bio_list) +static void rbd_img_obj_callback(struct rbd_obj_request *obj_request) +{ + struct rbd_img_request *img_request; + u32 which = obj_request->which; + bool more = true; + + rbd_assert(obj_request_img_data_test(obj_request)); + img_request = obj_request->img_request; + + dout("%s: img %p obj %p\n", __func__, img_request, obj_request); + rbd_assert(img_request != NULL); + rbd_assert(img_request->obj_request_count > 0); + rbd_assert(which != BAD_WHICH); + rbd_assert(which < img_request->obj_request_count); + rbd_assert(which >= img_request->next_completion); + + spin_lock_irq(&img_request->completion_lock); + if (which != img_request->next_completion) + goto out; + + for_each_obj_request_from(img_request, obj_request) { + rbd_assert(more); + rbd_assert(which < img_request->obj_request_count); + + if (!obj_request_done_test(obj_request)) + break; + more = rbd_img_obj_end_request(obj_request); + which++; + } + + rbd_assert(more ^ (which == img_request->obj_request_count)); + img_request->next_completion = which; +out: + spin_unlock_irq(&img_request->completion_lock); + + if (!more) + rbd_img_request_complete(img_request); +} + +/* + * Split up an image request into one or more object requests, each + * to a different object. The "type" parameter indicates whether + * "data_desc" is the pointer to the head of a list of bio + * structures, or the base of a page array. In either case this + * function assumes data_desc describes memory sufficient to hold + * all data described by the image request. + */ +static int rbd_img_request_fill(struct rbd_img_request *img_request, + enum obj_request_type type, + void *data_desc) { struct rbd_device *rbd_dev = img_request->rbd_dev; struct rbd_obj_request *obj_request = NULL; struct rbd_obj_request *next_obj_request; - unsigned int bio_offset; - u64 image_offset; + bool write_request = img_request_write_test(img_request); + struct bio *bio_list; + unsigned int bio_offset = 0; + struct page **pages; + u64 img_offset; u64 resid; u16 opcode; - dout("%s: img %p bio %p\n", __func__, img_request, bio_list); + dout("%s: img %p type %d data_desc %p\n", __func__, img_request, + (int)type, data_desc); - opcode = img_request->write_request ? CEPH_OSD_OP_WRITE - : CEPH_OSD_OP_READ; - bio_offset = 0; - image_offset = img_request->offset; - rbd_assert(image_offset == bio_list->bi_sector << SECTOR_SHIFT); + opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ; + img_offset = img_request->offset; resid = img_request->length; rbd_assert(resid > 0); + + if (type == OBJ_REQUEST_BIO) { + bio_list = data_desc; + rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT); + } else { + rbd_assert(type == OBJ_REQUEST_PAGES); + pages = data_desc; + } + while (resid) { + struct ceph_osd_request *osd_req; const char *object_name; - unsigned int clone_size; - struct ceph_osd_req_op *op; u64 offset; u64 length; - object_name = rbd_segment_name(rbd_dev, image_offset); + object_name = rbd_segment_name(rbd_dev, img_offset); if (!object_name) goto out_unwind; - offset = rbd_segment_offset(rbd_dev, image_offset); - length = rbd_segment_length(rbd_dev, image_offset, resid); + offset = rbd_segment_offset(rbd_dev, img_offset); + length = rbd_segment_length(rbd_dev, img_offset, resid); obj_request = rbd_obj_request_create(object_name, - offset, length, - OBJ_REQUEST_BIO); - kfree(object_name); /* object request has its own copy */ + offset, length, type); + /* object request has its own copy of the object name */ + rbd_segment_name_free(object_name); if (!obj_request) goto out_unwind; - rbd_assert(length <= (u64) UINT_MAX); - clone_size = (unsigned int) length; - obj_request->bio_list = bio_chain_clone_range(&bio_list, - &bio_offset, clone_size, - GFP_ATOMIC); - if (!obj_request->bio_list) - goto out_partial; + if (type == OBJ_REQUEST_BIO) { + unsigned int clone_size; + + rbd_assert(length <= (u64)UINT_MAX); + clone_size = (unsigned int)length; + obj_request->bio_list = + bio_chain_clone_range(&bio_list, + &bio_offset, + clone_size, + GFP_ATOMIC); + if (!obj_request->bio_list) + goto out_partial; + } else { + unsigned int page_count; + + obj_request->pages = pages; + page_count = (u32)calc_pages_for(offset, length); + obj_request->page_count = page_count; + if ((offset + length) & ~PAGE_MASK) + page_count--; /* more on last page */ + pages += page_count; + } - /* - * Build up the op to use in building the osd - * request. Note that the contents of the op are - * copied by rbd_osd_req_create(). - */ - op = rbd_osd_req_op_create(opcode, offset, length); - if (!op) - goto out_partial; - obj_request->osd_req = rbd_osd_req_create(rbd_dev, - img_request->write_request, - obj_request, op); - rbd_osd_req_op_destroy(op); - if (!obj_request->osd_req) + osd_req = rbd_osd_req_create(rbd_dev, write_request, + obj_request); + if (!osd_req) goto out_partial; - /* status and version are initially zero-filled */ + obj_request->osd_req = osd_req; + obj_request->callback = rbd_img_obj_callback; + osd_req_op_extent_init(osd_req, 0, opcode, offset, length, + 0, 0); + if (type == OBJ_REQUEST_BIO) + osd_req_op_extent_osd_data_bio(osd_req, 0, + obj_request->bio_list, length); + else + osd_req_op_extent_osd_data_pages(osd_req, 0, + obj_request->pages, length, + offset & ~PAGE_MASK, false, false); + + if (write_request) + rbd_osd_req_format_write(obj_request); + else + rbd_osd_req_format_read(obj_request); + + obj_request->img_offset = img_offset; rbd_img_obj_request_add(img_request, obj_request); - image_offset += length; + img_offset += length; resid -= length; } @@ -1686,61 +2111,389 @@ out_unwind: return -ENOMEM; } -static void rbd_img_obj_callback(struct rbd_obj_request *obj_request) +static void +rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request) { struct rbd_img_request *img_request; - u32 which = obj_request->which; - bool more = true; + struct rbd_device *rbd_dev; + u64 length; + u32 page_count; + rbd_assert(obj_request->type == OBJ_REQUEST_BIO); + rbd_assert(obj_request_img_data_test(obj_request)); img_request = obj_request->img_request; + rbd_assert(img_request); - dout("%s: img %p obj %p\n", __func__, img_request, obj_request); + rbd_dev = img_request->rbd_dev; + rbd_assert(rbd_dev); + length = (u64)1 << rbd_dev->header.obj_order; + page_count = (u32)calc_pages_for(0, length); + + rbd_assert(obj_request->copyup_pages); + ceph_release_page_vector(obj_request->copyup_pages, page_count); + obj_request->copyup_pages = NULL; + + /* + * We want the transfer count to reflect the size of the + * original write request. There is no such thing as a + * successful short write, so if the request was successful + * we can just set it to the originally-requested length. + */ + if (!obj_request->result) + obj_request->xferred = obj_request->length; + + /* Finish up with the normal image object callback */ + + rbd_img_obj_callback(obj_request); +} + +static void +rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request) +{ + struct rbd_obj_request *orig_request; + struct ceph_osd_request *osd_req; + struct ceph_osd_client *osdc; + struct rbd_device *rbd_dev; + struct page **pages; + int result; + u64 obj_size; + u64 xferred; + + rbd_assert(img_request_child_test(img_request)); + + /* First get what we need from the image request */ + + pages = img_request->copyup_pages; + rbd_assert(pages != NULL); + img_request->copyup_pages = NULL; + + orig_request = img_request->obj_request; + rbd_assert(orig_request != NULL); + rbd_assert(orig_request->type == OBJ_REQUEST_BIO); + result = img_request->result; + obj_size = img_request->length; + xferred = img_request->xferred; + + rbd_dev = img_request->rbd_dev; + rbd_assert(rbd_dev); + rbd_assert(obj_size == (u64)1 << rbd_dev->header.obj_order); + + rbd_img_request_put(img_request); + + if (result) + goto out_err; + + /* Allocate the new copyup osd request for the original request */ + + result = -ENOMEM; + rbd_assert(!orig_request->osd_req); + osd_req = rbd_osd_req_create_copyup(orig_request); + if (!osd_req) + goto out_err; + orig_request->osd_req = osd_req; + orig_request->copyup_pages = pages; + + /* Initialize the copyup op */ + + osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup"); + osd_req_op_cls_request_data_pages(osd_req, 0, pages, obj_size, 0, + false, false); + + /* Then the original write request op */ + + osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE, + orig_request->offset, + orig_request->length, 0, 0); + osd_req_op_extent_osd_data_bio(osd_req, 1, orig_request->bio_list, + orig_request->length); + + rbd_osd_req_format_write(orig_request); + + /* All set, send it off. */ + + orig_request->callback = rbd_img_obj_copyup_callback; + osdc = &rbd_dev->rbd_client->client->osdc; + result = rbd_obj_request_submit(osdc, orig_request); + if (!result) + return; +out_err: + /* Record the error code and complete the request */ + + orig_request->result = result; + orig_request->xferred = 0; + obj_request_done_set(orig_request); + rbd_obj_request_complete(orig_request); +} + +/* + * Read from the parent image the range of data that covers the + * entire target of the given object request. This is used for + * satisfying a layered image write request when the target of an + * object request from the image request does not exist. + * + * A page array big enough to hold the returned data is allocated + * and supplied to rbd_img_request_fill() as the "data descriptor." + * When the read completes, this page array will be transferred to + * the original object request for the copyup operation. + * + * If an error occurs, record it as the result of the original + * object request and mark it done so it gets completed. + */ +static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request) +{ + struct rbd_img_request *img_request = NULL; + struct rbd_img_request *parent_request = NULL; + struct rbd_device *rbd_dev; + u64 img_offset; + u64 length; + struct page **pages = NULL; + u32 page_count; + int result; + + rbd_assert(obj_request_img_data_test(obj_request)); + rbd_assert(obj_request->type == OBJ_REQUEST_BIO); + + img_request = obj_request->img_request; rbd_assert(img_request != NULL); - rbd_assert(img_request->rq != NULL); - rbd_assert(img_request->obj_request_count > 0); - rbd_assert(which != BAD_WHICH); - rbd_assert(which < img_request->obj_request_count); - rbd_assert(which >= img_request->next_completion); + rbd_dev = img_request->rbd_dev; + rbd_assert(rbd_dev->parent != NULL); - spin_lock_irq(&img_request->completion_lock); - if (which != img_request->next_completion) - goto out; + /* + * First things first. The original osd request is of no + * use to use any more, we'll need a new one that can hold + * the two ops in a copyup request. We'll get that later, + * but for now we can release the old one. + */ + rbd_osd_req_destroy(obj_request->osd_req); + obj_request->osd_req = NULL; - for_each_obj_request_from(img_request, obj_request) { - unsigned int xferred; - int result; + /* + * Determine the byte range covered by the object in the + * child image to which the original request was to be sent. + */ + img_offset = obj_request->img_offset - obj_request->offset; + length = (u64)1 << rbd_dev->header.obj_order; - rbd_assert(more); - rbd_assert(which < img_request->obj_request_count); + /* + * There is no defined parent data beyond the parent + * overlap, so limit what we read at that boundary if + * necessary. + */ + if (img_offset + length > rbd_dev->parent_overlap) { + rbd_assert(img_offset < rbd_dev->parent_overlap); + length = rbd_dev->parent_overlap - img_offset; + } - if (!obj_request_done_test(obj_request)) - break; + /* + * Allocate a page array big enough to receive the data read + * from the parent. + */ + page_count = (u32)calc_pages_for(0, length); + pages = ceph_alloc_page_vector(page_count, GFP_KERNEL); + if (IS_ERR(pages)) { + result = PTR_ERR(pages); + pages = NULL; + goto out_err; + } - rbd_assert(obj_request->xferred <= (u64) UINT_MAX); - xferred = (unsigned int) obj_request->xferred; - result = (int) obj_request->result; - if (result) - rbd_warn(NULL, "obj_request %s result %d xferred %u\n", - img_request->write_request ? "write" : "read", - result, xferred); + result = -ENOMEM; + parent_request = rbd_img_request_create(rbd_dev->parent, + img_offset, length, + false, true); + if (!parent_request) + goto out_err; + rbd_obj_request_get(obj_request); + parent_request->obj_request = obj_request; - more = blk_end_request(img_request->rq, result, xferred); - which++; + result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages); + if (result) + goto out_err; + parent_request->copyup_pages = pages; + + parent_request->callback = rbd_img_obj_parent_read_full_callback; + result = rbd_img_request_submit(parent_request); + if (!result) + return 0; + + parent_request->copyup_pages = NULL; + parent_request->obj_request = NULL; + rbd_obj_request_put(obj_request); +out_err: + if (pages) + ceph_release_page_vector(pages, page_count); + if (parent_request) + rbd_img_request_put(parent_request); + obj_request->result = result; + obj_request->xferred = 0; + obj_request_done_set(obj_request); + + return result; +} + +static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request) +{ + struct rbd_obj_request *orig_request; + int result; + + rbd_assert(!obj_request_img_data_test(obj_request)); + + /* + * All we need from the object request is the original + * request and the result of the STAT op. Grab those, then + * we're done with the request. + */ + orig_request = obj_request->obj_request; + obj_request->obj_request = NULL; + rbd_assert(orig_request); + rbd_assert(orig_request->img_request); + + result = obj_request->result; + obj_request->result = 0; + + dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__, + obj_request, orig_request, result, + obj_request->xferred, obj_request->length); + rbd_obj_request_put(obj_request); + + rbd_assert(orig_request); + rbd_assert(orig_request->img_request); + + /* + * Our only purpose here is to determine whether the object + * exists, and we don't want to treat the non-existence as + * an error. If something else comes back, transfer the + * error to the original request and complete it now. + */ + if (!result) { + obj_request_existence_set(orig_request, true); + } else if (result == -ENOENT) { + obj_request_existence_set(orig_request, false); + } else if (result) { + orig_request->result = result; + goto out; } - rbd_assert(more ^ (which == img_request->obj_request_count)); - img_request->next_completion = which; + /* + * Resubmit the original request now that we have recorded + * whether the target object exists. + */ + orig_request->result = rbd_img_obj_request_submit(orig_request); out: - spin_unlock_irq(&img_request->completion_lock); + if (orig_request->result) + rbd_obj_request_complete(orig_request); + rbd_obj_request_put(orig_request); +} - if (!more) - rbd_img_request_complete(img_request); +static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request) +{ + struct rbd_obj_request *stat_request; + struct rbd_device *rbd_dev; + struct ceph_osd_client *osdc; + struct page **pages = NULL; + u32 page_count; + size_t size; + int ret; + + /* + * The response data for a STAT call consists of: + * le64 length; + * struct { + * le32 tv_sec; + * le32 tv_nsec; + * } mtime; + */ + size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32); + page_count = (u32)calc_pages_for(0, size); + pages = ceph_alloc_page_vector(page_count, GFP_KERNEL); + if (IS_ERR(pages)) + return PTR_ERR(pages); + + ret = -ENOMEM; + stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0, + OBJ_REQUEST_PAGES); + if (!stat_request) + goto out; + + rbd_obj_request_get(obj_request); + stat_request->obj_request = obj_request; + stat_request->pages = pages; + stat_request->page_count = page_count; + + rbd_assert(obj_request->img_request); + rbd_dev = obj_request->img_request->rbd_dev; + stat_request->osd_req = rbd_osd_req_create(rbd_dev, false, + stat_request); + if (!stat_request->osd_req) + goto out; + stat_request->callback = rbd_img_obj_exists_callback; + + osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT); + osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0, + false, false); + rbd_osd_req_format_read(stat_request); + + osdc = &rbd_dev->rbd_client->client->osdc; + ret = rbd_obj_request_submit(osdc, stat_request); +out: + if (ret) + rbd_obj_request_put(obj_request); + + return ret; +} + +static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request) +{ + struct rbd_img_request *img_request; + struct rbd_device *rbd_dev; + bool known; + + rbd_assert(obj_request_img_data_test(obj_request)); + + img_request = obj_request->img_request; + rbd_assert(img_request); + rbd_dev = img_request->rbd_dev; + + /* + * Only writes to layered images need special handling. + * Reads and non-layered writes are simple object requests. + * Layered writes that start beyond the end of the overlap + * with the parent have no parent data, so they too are + * simple object requests. Finally, if the target object is + * known to already exist, its parent data has already been + * copied, so a write to the object can also be handled as a + * simple object request. + */ + if (!img_request_write_test(img_request) || + !img_request_layered_test(img_request) || + rbd_dev->parent_overlap <= obj_request->img_offset || + ((known = obj_request_known_test(obj_request)) && + obj_request_exists_test(obj_request))) { + + struct rbd_device *rbd_dev; + struct ceph_osd_client *osdc; + + rbd_dev = obj_request->img_request->rbd_dev; + osdc = &rbd_dev->rbd_client->client->osdc; + + return rbd_obj_request_submit(osdc, obj_request); + } + + /* + * It's a layered write. The target object might exist but + * we may not know that yet. If we know it doesn't exist, + * start by reading the data for the full target object from + * the parent so we can use it for a copyup to the target. + */ + if (known) + return rbd_img_obj_parent_read_full(obj_request); + + /* We don't know whether the target exists. Go find out. */ + + return rbd_img_obj_exists_submit(obj_request); } static int rbd_img_request_submit(struct rbd_img_request *img_request) { - struct rbd_device *rbd_dev = img_request->rbd_dev; - struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; struct rbd_obj_request *obj_request; struct rbd_obj_request *next_obj_request; @@ -1748,27 +2501,105 @@ static int rbd_img_request_submit(struct rbd_img_request *img_request) for_each_obj_request_safe(img_request, obj_request, next_obj_request) { int ret; - obj_request->callback = rbd_img_obj_callback; - ret = rbd_obj_request_submit(osdc, obj_request); + ret = rbd_img_obj_request_submit(obj_request); if (ret) return ret; - /* - * The image request has its own reference to each - * of its object requests, so we can safely drop the - * initial one here. - */ - rbd_obj_request_put(obj_request); } return 0; } -static int rbd_obj_notify_ack(struct rbd_device *rbd_dev, - u64 ver, u64 notify_id) +static void rbd_img_parent_read_callback(struct rbd_img_request *img_request) { struct rbd_obj_request *obj_request; - struct ceph_osd_req_op *op; - struct ceph_osd_client *osdc; + struct rbd_device *rbd_dev; + u64 obj_end; + + rbd_assert(img_request_child_test(img_request)); + + obj_request = img_request->obj_request; + rbd_assert(obj_request); + rbd_assert(obj_request->img_request); + + obj_request->result = img_request->result; + if (obj_request->result) + goto out; + + /* + * We need to zero anything beyond the parent overlap + * boundary. Since rbd_img_obj_request_read_callback() + * will zero anything beyond the end of a short read, an + * easy way to do this is to pretend the data from the + * parent came up short--ending at the overlap boundary. + */ + rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length); + obj_end = obj_request->img_offset + obj_request->length; + rbd_dev = obj_request->img_request->rbd_dev; + if (obj_end > rbd_dev->parent_overlap) { + u64 xferred = 0; + + if (obj_request->img_offset < rbd_dev->parent_overlap) + xferred = rbd_dev->parent_overlap - + obj_request->img_offset; + + obj_request->xferred = min(img_request->xferred, xferred); + } else { + obj_request->xferred = img_request->xferred; + } +out: + rbd_img_request_put(img_request); + rbd_img_obj_request_read_callback(obj_request); + rbd_obj_request_complete(obj_request); +} + +static void rbd_img_parent_read(struct rbd_obj_request *obj_request) +{ + struct rbd_device *rbd_dev; + struct rbd_img_request *img_request; + int result; + + rbd_assert(obj_request_img_data_test(obj_request)); + rbd_assert(obj_request->img_request != NULL); + rbd_assert(obj_request->result == (s32) -ENOENT); + rbd_assert(obj_request->type == OBJ_REQUEST_BIO); + + rbd_dev = obj_request->img_request->rbd_dev; + rbd_assert(rbd_dev->parent != NULL); + /* rbd_read_finish(obj_request, obj_request->length); */ + img_request = rbd_img_request_create(rbd_dev->parent, + obj_request->img_offset, + obj_request->length, + false, true); + result = -ENOMEM; + if (!img_request) + goto out_err; + + rbd_obj_request_get(obj_request); + img_request->obj_request = obj_request; + + result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO, + obj_request->bio_list); + if (result) + goto out_err; + + img_request->callback = rbd_img_parent_read_callback; + result = rbd_img_request_submit(img_request); + if (result) + goto out_err; + + return; +out_err: + if (img_request) + rbd_img_request_put(img_request); + obj_request->result = result; + obj_request->xferred = 0; + obj_request_done_set(obj_request); +} + +static int rbd_obj_notify_ack(struct rbd_device *rbd_dev, u64 notify_id) +{ + struct rbd_obj_request *obj_request; + struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; int ret; obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0, @@ -1777,17 +2608,15 @@ static int rbd_obj_notify_ack(struct rbd_device *rbd_dev, return -ENOMEM; ret = -ENOMEM; - op = rbd_osd_req_op_create(CEPH_OSD_OP_NOTIFY_ACK, notify_id, ver); - if (!op) - goto out; - obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, - obj_request, op); - rbd_osd_req_op_destroy(op); + obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request); if (!obj_request->osd_req) goto out; - - osdc = &rbd_dev->rbd_client->client->osdc; obj_request->callback = rbd_obj_request_put; + + osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK, + notify_id, 0, 0); + rbd_osd_req_format_read(obj_request); + ret = rbd_obj_request_submit(osdc, obj_request); out: if (ret) @@ -1799,21 +2628,16 @@ out: static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data) { struct rbd_device *rbd_dev = (struct rbd_device *)data; - u64 hver; - int rc; if (!rbd_dev) return; dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__, - rbd_dev->header_name, (unsigned long long) notify_id, - (unsigned int) opcode); - rc = rbd_dev_refresh(rbd_dev, &hver); - if (rc) - rbd_warn(rbd_dev, "got notification but failed to " - " update snaps: %d\n", rc); + rbd_dev->header_name, (unsigned long long)notify_id, + (unsigned int)opcode); + (void)rbd_dev_refresh(rbd_dev); - rbd_obj_notify_ack(rbd_dev, hver, notify_id); + rbd_obj_notify_ack(rbd_dev, notify_id); } /* @@ -1824,7 +2648,6 @@ static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start) { struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; struct rbd_obj_request *obj_request; - struct ceph_osd_req_op *op; int ret; rbd_assert(start ^ !!rbd_dev->watch_event); @@ -1844,14 +2667,7 @@ static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start) if (!obj_request) goto out_cancel; - op = rbd_osd_req_op_create(CEPH_OSD_OP_WATCH, - rbd_dev->watch_event->cookie, - rbd_dev->header.obj_version, start); - if (!op) - goto out_cancel; - obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, - obj_request, op); - rbd_osd_req_op_destroy(op); + obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request); if (!obj_request->osd_req) goto out_cancel; @@ -1860,6 +2676,11 @@ static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start) else ceph_osdc_unregister_linger_request(osdc, rbd_dev->watch_request->osd_req); + + osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH, + rbd_dev->watch_event->cookie, 0, start); + rbd_osd_req_format_write(obj_request); + ret = rbd_obj_request_submit(osdc, obj_request); if (ret) goto out_cancel; @@ -1899,40 +2720,38 @@ out_cancel: } /* - * Synchronous osd object method call + * Synchronous osd object method call. Returns the number of bytes + * returned in the outbound buffer, or a negative error code. */ static int rbd_obj_method_sync(struct rbd_device *rbd_dev, const char *object_name, const char *class_name, const char *method_name, - const char *outbound, + const void *outbound, size_t outbound_size, - char *inbound, - size_t inbound_size, - u64 *version) + void *inbound, + size_t inbound_size) { + struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; struct rbd_obj_request *obj_request; - struct ceph_osd_client *osdc; - struct ceph_osd_req_op *op; struct page **pages; u32 page_count; int ret; /* - * Method calls are ultimately read operations but they - * don't involve object data (so no offset or length). - * The result should placed into the inbound buffer - * provided. They also supply outbound data--parameters for - * the object method. Currently if this is present it will - * be a snapshot id. + * Method calls are ultimately read operations. The result + * should placed into the inbound buffer provided. They + * also supply outbound data--parameters for the object + * method. Currently if this is present it will be a + * snapshot id. */ - page_count = (u32) calc_pages_for(0, inbound_size); + page_count = (u32)calc_pages_for(0, inbound_size); pages = ceph_alloc_page_vector(page_count, GFP_KERNEL); if (IS_ERR(pages)) return PTR_ERR(pages); ret = -ENOMEM; - obj_request = rbd_obj_request_create(object_name, 0, 0, + obj_request = rbd_obj_request_create(object_name, 0, inbound_size, OBJ_REQUEST_PAGES); if (!obj_request) goto out; @@ -1940,17 +2759,29 @@ static int rbd_obj_method_sync(struct rbd_device *rbd_dev, obj_request->pages = pages; obj_request->page_count = page_count; - op = rbd_osd_req_op_create(CEPH_OSD_OP_CALL, class_name, - method_name, outbound, outbound_size); - if (!op) - goto out; - obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, - obj_request, op); - rbd_osd_req_op_destroy(op); + obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request); if (!obj_request->osd_req) goto out; - osdc = &rbd_dev->rbd_client->client->osdc; + osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL, + class_name, method_name); + if (outbound_size) { + struct ceph_pagelist *pagelist; + + pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS); + if (!pagelist) + goto out; + + ceph_pagelist_init(pagelist); + ceph_pagelist_append(pagelist, outbound, outbound_size); + osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0, + pagelist); + } + osd_req_op_cls_response_data_pages(obj_request->osd_req, 0, + obj_request->pages, inbound_size, + 0, false, false); + rbd_osd_req_format_read(obj_request); + ret = rbd_obj_request_submit(osdc, obj_request); if (ret) goto out; @@ -1961,10 +2792,10 @@ static int rbd_obj_method_sync(struct rbd_device *rbd_dev, ret = obj_request->result; if (ret < 0) goto out; - ret = 0; + + rbd_assert(obj_request->xferred < (u64)INT_MAX); + ret = (int)obj_request->xferred; ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred); - if (version) - *version = obj_request->version; out: if (obj_request) rbd_obj_request_put(obj_request); @@ -2034,18 +2865,22 @@ static void rbd_request_fn(struct request_queue *q) } result = -EINVAL; - if (WARN_ON(offset && length > U64_MAX - offset + 1)) + if (offset && length > U64_MAX - offset + 1) { + rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n", + offset, length); goto end_request; /* Shouldn't happen */ + } result = -ENOMEM; img_request = rbd_img_request_create(rbd_dev, offset, length, - write_request); + write_request, false); if (!img_request) goto end_request; img_request->rq = rq; - result = rbd_img_request_fill_bio(img_request, rq->bio); + result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO, + rq->bio); if (!result) result = rbd_img_request_submit(img_request); if (result) @@ -2053,8 +2888,10 @@ static void rbd_request_fn(struct request_queue *q) end_request: spin_lock_irq(q->queue_lock); if (result < 0) { - rbd_warn(rbd_dev, "obj_request %s result %d\n", - write_request ? "write" : "read", result); + rbd_warn(rbd_dev, "%s %llx at %llx result %d\n", + write_request ? "write" : "read", + length, offset, result); + __blk_end_request_all(rq, result); } } @@ -2113,22 +2950,22 @@ static void rbd_free_disk(struct rbd_device *rbd_dev) if (!disk) return; - if (disk->flags & GENHD_FL_UP) + rbd_dev->disk = NULL; + if (disk->flags & GENHD_FL_UP) { del_gendisk(disk); - if (disk->queue) - blk_cleanup_queue(disk->queue); + if (disk->queue) + blk_cleanup_queue(disk->queue); + } put_disk(disk); } static int rbd_obj_read_sync(struct rbd_device *rbd_dev, const char *object_name, - u64 offset, u64 length, - char *buf, u64 *version) + u64 offset, u64 length, void *buf) { - struct ceph_osd_req_op *op; + struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; struct rbd_obj_request *obj_request; - struct ceph_osd_client *osdc; struct page **pages = NULL; u32 page_count; size_t size; @@ -2148,16 +2985,19 @@ static int rbd_obj_read_sync(struct rbd_device *rbd_dev, obj_request->pages = pages; obj_request->page_count = page_count; - op = rbd_osd_req_op_create(CEPH_OSD_OP_READ, offset, length); - if (!op) - goto out; - obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, - obj_request, op); - rbd_osd_req_op_destroy(op); + obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request); if (!obj_request->osd_req) goto out; - osdc = &rbd_dev->rbd_client->client->osdc; + osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ, + offset, length, 0, 0); + osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0, + obj_request->pages, + obj_request->length, + obj_request->offset & ~PAGE_MASK, + false, false); + rbd_osd_req_format_read(obj_request); + ret = rbd_obj_request_submit(osdc, obj_request); if (ret) goto out; @@ -2172,10 +3012,8 @@ static int rbd_obj_read_sync(struct rbd_device *rbd_dev, rbd_assert(obj_request->xferred <= (u64) SIZE_MAX); size = (size_t) obj_request->xferred; ceph_copy_from_page_vector(pages, buf, 0, size); - rbd_assert(size <= (size_t) INT_MAX); - ret = (int) size; - if (version) - *version = obj_request->version; + rbd_assert(size <= (size_t)INT_MAX); + ret = (int)size; out: if (obj_request) rbd_obj_request_put(obj_request); @@ -2196,7 +3034,7 @@ out: * Returns a pointer-coded errno if a failure occurs. */ static struct rbd_image_header_ondisk * -rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version) +rbd_dev_v1_header_read(struct rbd_device *rbd_dev) { struct rbd_image_header_ondisk *ondisk = NULL; u32 snap_count = 0; @@ -2224,11 +3062,10 @@ rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version) return ERR_PTR(-ENOMEM); ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name, - 0, size, - (char *) ondisk, version); + 0, size, ondisk); if (ret < 0) goto out_err; - if (WARN_ON((size_t) ret < size)) { + if ((size_t)ret < size) { ret = -ENXIO; rbd_warn(rbd_dev, "short header read (want %zd got %d)", size, ret); @@ -2260,46 +3097,36 @@ static int rbd_read_header(struct rbd_device *rbd_dev, struct rbd_image_header *header) { struct rbd_image_header_ondisk *ondisk; - u64 ver = 0; int ret; - ondisk = rbd_dev_v1_header_read(rbd_dev, &ver); + ondisk = rbd_dev_v1_header_read(rbd_dev); if (IS_ERR(ondisk)) return PTR_ERR(ondisk); ret = rbd_header_from_disk(header, ondisk); - if (ret >= 0) - header->obj_version = ver; kfree(ondisk); return ret; } -static void rbd_remove_all_snaps(struct rbd_device *rbd_dev) -{ - struct rbd_snap *snap; - struct rbd_snap *next; - - list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node) - rbd_remove_snap_dev(snap); -} - static void rbd_update_mapping_size(struct rbd_device *rbd_dev) { - sector_t size; - if (rbd_dev->spec->snap_id != CEPH_NOSNAP) return; - size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE; - dout("setting size to %llu sectors", (unsigned long long) size); - rbd_dev->mapping.size = (u64) size; - set_capacity(rbd_dev->disk, size); + if (rbd_dev->mapping.size != rbd_dev->header.image_size) { + sector_t size; + + rbd_dev->mapping.size = rbd_dev->header.image_size; + size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE; + dout("setting size to %llu sectors", (unsigned long long)size); + set_capacity(rbd_dev->disk, size); + } } /* * only read the first part of the ondisk header, without the snaps info */ -static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver) +static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev) { int ret; struct rbd_image_header h; @@ -2320,37 +3147,61 @@ static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver) /* osd requests may still refer to snapc */ ceph_put_snap_context(rbd_dev->header.snapc); - if (hver) - *hver = h.obj_version; - rbd_dev->header.obj_version = h.obj_version; rbd_dev->header.image_size = h.image_size; rbd_dev->header.snapc = h.snapc; rbd_dev->header.snap_names = h.snap_names; rbd_dev->header.snap_sizes = h.snap_sizes; /* Free the extra copy of the object prefix */ - WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix)); + if (strcmp(rbd_dev->header.object_prefix, h.object_prefix)) + rbd_warn(rbd_dev, "object prefix changed (ignoring)"); kfree(h.object_prefix); - ret = rbd_dev_snaps_update(rbd_dev); - if (!ret) - ret = rbd_dev_snaps_register(rbd_dev); - up_write(&rbd_dev->header_rwsem); return ret; } -static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver) +/* + * Clear the rbd device's EXISTS flag if the snapshot it's mapped to + * has disappeared from the (just updated) snapshot context. + */ +static void rbd_exists_validate(struct rbd_device *rbd_dev) +{ + u64 snap_id; + + if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) + return; + + snap_id = rbd_dev->spec->snap_id; + if (snap_id == CEPH_NOSNAP) + return; + + if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX) + clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags); +} + +static int rbd_dev_refresh(struct rbd_device *rbd_dev) { + u64 image_size; int ret; rbd_assert(rbd_image_format_valid(rbd_dev->image_format)); + image_size = rbd_dev->header.image_size; mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); if (rbd_dev->image_format == 1) - ret = rbd_dev_v1_refresh(rbd_dev, hver); + ret = rbd_dev_v1_refresh(rbd_dev); else - ret = rbd_dev_v2_refresh(rbd_dev, hver); + ret = rbd_dev_v2_refresh(rbd_dev); + + /* If it's a mapped snapshot, validate its EXISTS flag */ + + rbd_exists_validate(rbd_dev); mutex_unlock(&ctl_mutex); + if (ret) + rbd_warn(rbd_dev, "got notification but failed to " + " update snaps: %d\n", ret); + if (image_size != rbd_dev->header.image_size) + revalidate_disk(rbd_dev->disk); return ret; } @@ -2394,8 +3245,6 @@ static int rbd_init_disk(struct rbd_device *rbd_dev) rbd_dev->disk = disk; - set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE); - return 0; out_disk: put_disk(disk); @@ -2416,13 +3265,9 @@ static ssize_t rbd_size_show(struct device *dev, struct device_attribute *attr, char *buf) { struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); - sector_t size; - down_read(&rbd_dev->header_rwsem); - size = get_capacity(rbd_dev->disk); - up_read(&rbd_dev->header_rwsem); - - return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE); + return sprintf(buf, "%llu\n", + (unsigned long long)rbd_dev->mapping.size); } /* @@ -2435,7 +3280,7 @@ static ssize_t rbd_features_show(struct device *dev, struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); return sprintf(buf, "0x%016llx\n", - (unsigned long long) rbd_dev->mapping.features); + (unsigned long long)rbd_dev->mapping.features); } static ssize_t rbd_major_show(struct device *dev, @@ -2443,7 +3288,11 @@ static ssize_t rbd_major_show(struct device *dev, { struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); - return sprintf(buf, "%d\n", rbd_dev->major); + if (rbd_dev->major) + return sprintf(buf, "%d\n", rbd_dev->major); + + return sprintf(buf, "(none)\n"); + } static ssize_t rbd_client_id_show(struct device *dev, @@ -2469,7 +3318,7 @@ static ssize_t rbd_pool_id_show(struct device *dev, struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); return sprintf(buf, "%llu\n", - (unsigned long long) rbd_dev->spec->pool_id); + (unsigned long long) rbd_dev->spec->pool_id); } static ssize_t rbd_name_show(struct device *dev, @@ -2555,7 +3404,7 @@ static ssize_t rbd_image_refresh(struct device *dev, struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); int ret; - ret = rbd_dev_refresh(rbd_dev, NULL); + ret = rbd_dev_refresh(rbd_dev); return ret < 0 ? ret : size; } @@ -2606,71 +3455,6 @@ static struct device_type rbd_device_type = { .release = rbd_sysfs_dev_release, }; - -/* - sysfs - snapshots -*/ - -static ssize_t rbd_snap_size_show(struct device *dev, - struct device_attribute *attr, - char *buf) -{ - struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev); - - return sprintf(buf, "%llu\n", (unsigned long long)snap->size); -} - -static ssize_t rbd_snap_id_show(struct device *dev, - struct device_attribute *attr, - char *buf) -{ - struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev); - - return sprintf(buf, "%llu\n", (unsigned long long)snap->id); -} - -static ssize_t rbd_snap_features_show(struct device *dev, - struct device_attribute *attr, - char *buf) -{ - struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev); - - return sprintf(buf, "0x%016llx\n", - (unsigned long long) snap->features); -} - -static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL); -static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL); -static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL); - -static struct attribute *rbd_snap_attrs[] = { - &dev_attr_snap_size.attr, - &dev_attr_snap_id.attr, - &dev_attr_snap_features.attr, - NULL, -}; - -static struct attribute_group rbd_snap_attr_group = { - .attrs = rbd_snap_attrs, -}; - -static void rbd_snap_dev_release(struct device *dev) -{ - struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev); - kfree(snap->name); - kfree(snap); -} - -static const struct attribute_group *rbd_snap_attr_groups[] = { - &rbd_snap_attr_group, - NULL -}; - -static struct device_type rbd_snap_device_type = { - .groups = rbd_snap_attr_groups, - .release = rbd_snap_dev_release, -}; - static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec) { kref_get(&spec->kref); @@ -2694,8 +3478,6 @@ static struct rbd_spec *rbd_spec_alloc(void) return NULL; kref_init(&spec->kref); - rbd_spec_put(rbd_spec_get(spec)); /* TEMPORARY */ - return spec; } @@ -2722,7 +3504,6 @@ static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc, spin_lock_init(&rbd_dev->lock); rbd_dev->flags = 0; INIT_LIST_HEAD(&rbd_dev->node); - INIT_LIST_HEAD(&rbd_dev->snaps); init_rwsem(&rbd_dev->header_rwsem); rbd_dev->spec = spec; @@ -2740,96 +3521,11 @@ static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc, static void rbd_dev_destroy(struct rbd_device *rbd_dev) { - rbd_spec_put(rbd_dev->parent_spec); - kfree(rbd_dev->header_name); rbd_put_client(rbd_dev->rbd_client); rbd_spec_put(rbd_dev->spec); kfree(rbd_dev); } -static bool rbd_snap_registered(struct rbd_snap *snap) -{ - bool ret = snap->dev.type == &rbd_snap_device_type; - bool reg = device_is_registered(&snap->dev); - - rbd_assert(!ret ^ reg); - - return ret; -} - -static void rbd_remove_snap_dev(struct rbd_snap *snap) -{ - list_del(&snap->node); - if (device_is_registered(&snap->dev)) - device_unregister(&snap->dev); -} - -static int rbd_register_snap_dev(struct rbd_snap *snap, - struct device *parent) -{ - struct device *dev = &snap->dev; - int ret; - - dev->type = &rbd_snap_device_type; - dev->parent = parent; - dev->release = rbd_snap_dev_release; - dev_set_name(dev, "%s%s", RBD_SNAP_DEV_NAME_PREFIX, snap->name); - dout("%s: registering device for snapshot %s\n", __func__, snap->name); - - ret = device_register(dev); - - return ret; -} - -static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev, - const char *snap_name, - u64 snap_id, u64 snap_size, - u64 snap_features) -{ - struct rbd_snap *snap; - int ret; - - snap = kzalloc(sizeof (*snap), GFP_KERNEL); - if (!snap) - return ERR_PTR(-ENOMEM); - - ret = -ENOMEM; - snap->name = kstrdup(snap_name, GFP_KERNEL); - if (!snap->name) - goto err; - - snap->id = snap_id; - snap->size = snap_size; - snap->features = snap_features; - - return snap; - -err: - kfree(snap->name); - kfree(snap); - - return ERR_PTR(ret); -} - -static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which, - u64 *snap_size, u64 *snap_features) -{ - char *snap_name; - - rbd_assert(which < rbd_dev->header.snapc->num_snaps); - - *snap_size = rbd_dev->header.snap_sizes[which]; - *snap_features = 0; /* No features for v1 */ - - /* Skip over names until we find the one we are looking for */ - - snap_name = rbd_dev->header.snap_names; - while (which--) - snap_name += strlen(snap_name) + 1; - - return snap_name; -} - /* * Get the size and object order for an image snapshot, or if * snap_id is CEPH_NOSNAP, gets this information for the base @@ -2847,18 +3543,21 @@ static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id, ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name, "rbd", "get_size", - (char *) &snapid, sizeof (snapid), - (char *) &size_buf, sizeof (size_buf), NULL); + &snapid, sizeof (snapid), + &size_buf, sizeof (size_buf)); dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); if (ret < 0) return ret; + if (ret < sizeof (size_buf)) + return -ERANGE; - *order = size_buf.order; + if (order) + *order = size_buf.order; *snap_size = le64_to_cpu(size_buf.size); dout(" snap_id 0x%016llx order = %u, snap_size = %llu\n", - (unsigned long long) snap_id, (unsigned int) *order, - (unsigned long long) *snap_size); + (unsigned long long)snap_id, (unsigned int)*order, + (unsigned long long)*snap_size); return 0; } @@ -2881,17 +3580,16 @@ static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev) return -ENOMEM; ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name, - "rbd", "get_object_prefix", - NULL, 0, - reply_buf, RBD_OBJ_PREFIX_LEN_MAX, NULL); + "rbd", "get_object_prefix", NULL, 0, + reply_buf, RBD_OBJ_PREFIX_LEN_MAX); dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); if (ret < 0) goto out; p = reply_buf; rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p, - p + RBD_OBJ_PREFIX_LEN_MAX, - NULL, GFP_NOIO); + p + ret, NULL, GFP_NOIO); + ret = 0; if (IS_ERR(rbd_dev->header.object_prefix)) { ret = PTR_ERR(rbd_dev->header.object_prefix); @@ -2899,7 +3597,6 @@ static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev) } else { dout(" object_prefix = %s\n", rbd_dev->header.object_prefix); } - out: kfree(reply_buf); @@ -2913,29 +3610,30 @@ static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id, struct { __le64 features; __le64 incompat; - } features_buf = { 0 }; + } __attribute__ ((packed)) features_buf = { 0 }; u64 incompat; int ret; ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name, "rbd", "get_features", - (char *) &snapid, sizeof (snapid), - (char *) &features_buf, sizeof (features_buf), - NULL); + &snapid, sizeof (snapid), + &features_buf, sizeof (features_buf)); dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); if (ret < 0) return ret; + if (ret < sizeof (features_buf)) + return -ERANGE; incompat = le64_to_cpu(features_buf.incompat); - if (incompat & ~RBD_FEATURES_ALL) + if (incompat & ~RBD_FEATURES_SUPPORTED) return -ENXIO; *snap_features = le64_to_cpu(features_buf.features); dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n", - (unsigned long long) snap_id, - (unsigned long long) *snap_features, - (unsigned long long) le64_to_cpu(features_buf.incompat)); + (unsigned long long)snap_id, + (unsigned long long)*snap_features, + (unsigned long long)le64_to_cpu(features_buf.incompat)); return 0; } @@ -2975,15 +3673,15 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev) snapid = cpu_to_le64(CEPH_NOSNAP); ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name, "rbd", "get_parent", - (char *) &snapid, sizeof (snapid), - (char *) reply_buf, size, NULL); + &snapid, sizeof (snapid), + reply_buf, size); dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); if (ret < 0) goto out_err; - ret = -ERANGE; p = reply_buf; - end = (char *) reply_buf + size; + end = reply_buf + ret; + ret = -ERANGE; ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err); if (parent_spec->pool_id == CEPH_NOPOOL) goto out; /* No parent? No problem. */ @@ -2991,8 +3689,11 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev) /* The ceph file layout needs to fit pool id in 32 bits */ ret = -EIO; - if (WARN_ON(parent_spec->pool_id > (u64) U32_MAX)) - goto out; + if (parent_spec->pool_id > (u64)U32_MAX) { + rbd_warn(NULL, "parent pool id too large (%llu > %u)\n", + (unsigned long long)parent_spec->pool_id, U32_MAX); + goto out_err; + } image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL); if (IS_ERR(image_id)) { @@ -3015,6 +3716,56 @@ out_err: return ret; } +static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev) +{ + struct { + __le64 stripe_unit; + __le64 stripe_count; + } __attribute__ ((packed)) striping_info_buf = { 0 }; + size_t size = sizeof (striping_info_buf); + void *p; + u64 obj_size; + u64 stripe_unit; + u64 stripe_count; + int ret; + + ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name, + "rbd", "get_stripe_unit_count", NULL, 0, + (char *)&striping_info_buf, size); + dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); + if (ret < 0) + return ret; + if (ret < size) + return -ERANGE; + + /* + * We don't actually support the "fancy striping" feature + * (STRIPINGV2) yet, but if the striping sizes are the + * defaults the behavior is the same as before. So find + * out, and only fail if the image has non-default values. + */ + ret = -EINVAL; + obj_size = (u64)1 << rbd_dev->header.obj_order; + p = &striping_info_buf; + stripe_unit = ceph_decode_64(&p); + if (stripe_unit != obj_size) { + rbd_warn(rbd_dev, "unsupported stripe unit " + "(got %llu want %llu)", + stripe_unit, obj_size); + return -EINVAL; + } + stripe_count = ceph_decode_64(&p); + if (stripe_count != 1) { + rbd_warn(rbd_dev, "unsupported stripe count " + "(got %llu want 1)", stripe_count); + return -EINVAL; + } + rbd_dev->header.stripe_unit = stripe_unit; + rbd_dev->header.stripe_count = stripe_count; + + return 0; +} + static char *rbd_dev_image_name(struct rbd_device *rbd_dev) { size_t image_id_size; @@ -3036,8 +3787,8 @@ static char *rbd_dev_image_name(struct rbd_device *rbd_dev) return NULL; p = image_id; - end = (char *) image_id + image_id_size; - ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32) len); + end = image_id + image_id_size; + ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len); size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX; reply_buf = kmalloc(size, GFP_KERNEL); @@ -3047,11 +3798,12 @@ static char *rbd_dev_image_name(struct rbd_device *rbd_dev) ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY, "rbd", "dir_get_name", image_id, image_id_size, - (char *) reply_buf, size, NULL); + reply_buf, size); if (ret < 0) goto out; p = reply_buf; - end = (char *) reply_buf + size; + end = reply_buf + ret; + image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL); if (IS_ERR(image_name)) image_name = NULL; @@ -3064,69 +3816,134 @@ out: return image_name; } +static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name) +{ + struct ceph_snap_context *snapc = rbd_dev->header.snapc; + const char *snap_name; + u32 which = 0; + + /* Skip over names until we find the one we are looking for */ + + snap_name = rbd_dev->header.snap_names; + while (which < snapc->num_snaps) { + if (!strcmp(name, snap_name)) + return snapc->snaps[which]; + snap_name += strlen(snap_name) + 1; + which++; + } + return CEPH_NOSNAP; +} + +static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name) +{ + struct ceph_snap_context *snapc = rbd_dev->header.snapc; + u32 which; + bool found = false; + u64 snap_id; + + for (which = 0; !found && which < snapc->num_snaps; which++) { + const char *snap_name; + + snap_id = snapc->snaps[which]; + snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id); + if (IS_ERR(snap_name)) + break; + found = !strcmp(name, snap_name); + kfree(snap_name); + } + return found ? snap_id : CEPH_NOSNAP; +} + /* - * When a parent image gets probed, we only have the pool, image, - * and snapshot ids but not the names of any of them. This call - * is made later to fill in those names. It has to be done after - * rbd_dev_snaps_update() has completed because some of the - * information (in particular, snapshot name) is not available - * until then. + * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if + * no snapshot by that name is found, or if an error occurs. */ -static int rbd_dev_probe_update_spec(struct rbd_device *rbd_dev) +static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name) { - struct ceph_osd_client *osdc; - const char *name; - void *reply_buf = NULL; + if (rbd_dev->image_format == 1) + return rbd_v1_snap_id_by_name(rbd_dev, name); + + return rbd_v2_snap_id_by_name(rbd_dev, name); +} + +/* + * When an rbd image has a parent image, it is identified by the + * pool, image, and snapshot ids (not names). This function fills + * in the names for those ids. (It's OK if we can't figure out the + * name for an image id, but the pool and snapshot ids should always + * exist and have names.) All names in an rbd spec are dynamically + * allocated. + * + * When an image being mapped (not a parent) is probed, we have the + * pool name and pool id, image name and image id, and the snapshot + * name. The only thing we're missing is the snapshot id. + */ +static int rbd_dev_spec_update(struct rbd_device *rbd_dev) +{ + struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; + struct rbd_spec *spec = rbd_dev->spec; + const char *pool_name; + const char *image_name; + const char *snap_name; int ret; - if (rbd_dev->spec->pool_name) - return 0; /* Already have the names */ + /* + * An image being mapped will have the pool name (etc.), but + * we need to look up the snapshot id. + */ + if (spec->pool_name) { + if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) { + u64 snap_id; + + snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name); + if (snap_id == CEPH_NOSNAP) + return -ENOENT; + spec->snap_id = snap_id; + } else { + spec->snap_id = CEPH_NOSNAP; + } - /* Look up the pool name */ + return 0; + } - osdc = &rbd_dev->rbd_client->client->osdc; - name = ceph_pg_pool_name_by_id(osdc->osdmap, rbd_dev->spec->pool_id); - if (!name) { - rbd_warn(rbd_dev, "there is no pool with id %llu", - rbd_dev->spec->pool_id); /* Really a BUG() */ + /* Get the pool name; we have to make our own copy of this */ + + pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id); + if (!pool_name) { + rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id); return -EIO; } - - rbd_dev->spec->pool_name = kstrdup(name, GFP_KERNEL); - if (!rbd_dev->spec->pool_name) + pool_name = kstrdup(pool_name, GFP_KERNEL); + if (!pool_name) return -ENOMEM; /* Fetch the image name; tolerate failure here */ - name = rbd_dev_image_name(rbd_dev); - if (name) - rbd_dev->spec->image_name = (char *) name; - else + image_name = rbd_dev_image_name(rbd_dev); + if (!image_name) rbd_warn(rbd_dev, "unable to get image name"); - /* Look up the snapshot name. */ + /* Look up the snapshot name, and make a copy */ - name = rbd_snap_name(rbd_dev, rbd_dev->spec->snap_id); - if (!name) { - rbd_warn(rbd_dev, "no snapshot with id %llu", - rbd_dev->spec->snap_id); /* Really a BUG() */ - ret = -EIO; + snap_name = rbd_snap_name(rbd_dev, spec->snap_id); + if (!snap_name) { + ret = -ENOMEM; goto out_err; } - rbd_dev->spec->snap_name = kstrdup(name, GFP_KERNEL); - if(!rbd_dev->spec->snap_name) - goto out_err; + + spec->pool_name = pool_name; + spec->image_name = image_name; + spec->snap_name = snap_name; return 0; out_err: - kfree(reply_buf); - kfree(rbd_dev->spec->pool_name); - rbd_dev->spec->pool_name = NULL; + kfree(image_name); + kfree(pool_name); return ret; } -static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver) +static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev) { size_t size; int ret; @@ -3151,16 +3968,15 @@ static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver) return -ENOMEM; ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name, - "rbd", "get_snapcontext", - NULL, 0, - reply_buf, size, ver); + "rbd", "get_snapcontext", NULL, 0, + reply_buf, size); dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); if (ret < 0) goto out; - ret = -ERANGE; p = reply_buf; - end = (char *) reply_buf + size; + end = reply_buf + ret; + ret = -ERANGE; ceph_decode_64_safe(&p, end, seq, out); ceph_decode_32_safe(&p, end, snap_count, out); @@ -3177,37 +3993,33 @@ static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver) } if (!ceph_has_room(&p, end, snap_count * sizeof (__le64))) goto out; + ret = 0; - size = sizeof (struct ceph_snap_context) + - snap_count * sizeof (snapc->snaps[0]); - snapc = kmalloc(size, GFP_KERNEL); + snapc = ceph_create_snap_context(snap_count, GFP_KERNEL); if (!snapc) { ret = -ENOMEM; goto out; } - - atomic_set(&snapc->nref, 1); snapc->seq = seq; - snapc->num_snaps = snap_count; for (i = 0; i < snap_count; i++) snapc->snaps[i] = ceph_decode_64(&p); rbd_dev->header.snapc = snapc; dout(" snap context seq = %llu, snap_count = %u\n", - (unsigned long long) seq, (unsigned int) snap_count); - + (unsigned long long)seq, (unsigned int)snap_count); out: kfree(reply_buf); - return 0; + return ret; } -static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which) +static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, + u64 snap_id) { size_t size; void *reply_buf; - __le64 snap_id; + __le64 snapid; int ret; void *p; void *end; @@ -3218,236 +4030,52 @@ static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which) if (!reply_buf) return ERR_PTR(-ENOMEM); - snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]); + snapid = cpu_to_le64(snap_id); ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name, "rbd", "get_snapshot_name", - (char *) &snap_id, sizeof (snap_id), - reply_buf, size, NULL); + &snapid, sizeof (snapid), + reply_buf, size); dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); - if (ret < 0) + if (ret < 0) { + snap_name = ERR_PTR(ret); goto out; + } p = reply_buf; - end = (char *) reply_buf + size; + end = reply_buf + ret; snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL); - if (IS_ERR(snap_name)) { - ret = PTR_ERR(snap_name); + if (IS_ERR(snap_name)) goto out; - } else { - dout(" snap_id 0x%016llx snap_name = %s\n", - (unsigned long long) le64_to_cpu(snap_id), snap_name); - } - kfree(reply_buf); - return snap_name; + dout(" snap_id 0x%016llx snap_name = %s\n", + (unsigned long long)snap_id, snap_name); out: kfree(reply_buf); - return ERR_PTR(ret); -} - -static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which, - u64 *snap_size, u64 *snap_features) -{ - u64 snap_id; - u8 order; - int ret; - - snap_id = rbd_dev->header.snapc->snaps[which]; - ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size); - if (ret) - return ERR_PTR(ret); - ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features); - if (ret) - return ERR_PTR(ret); - - return rbd_dev_v2_snap_name(rbd_dev, which); -} - -static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which, - u64 *snap_size, u64 *snap_features) -{ - if (rbd_dev->image_format == 1) - return rbd_dev_v1_snap_info(rbd_dev, which, - snap_size, snap_features); - if (rbd_dev->image_format == 2) - return rbd_dev_v2_snap_info(rbd_dev, which, - snap_size, snap_features); - return ERR_PTR(-EINVAL); + return snap_name; } -static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver) +static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev) { int ret; - __u8 obj_order; down_write(&rbd_dev->header_rwsem); - /* Grab old order first, to see if it changes */ - - obj_order = rbd_dev->header.obj_order, ret = rbd_dev_v2_image_size(rbd_dev); if (ret) goto out; - if (rbd_dev->header.obj_order != obj_order) { - ret = -EIO; - goto out; - } rbd_update_mapping_size(rbd_dev); - ret = rbd_dev_v2_snap_context(rbd_dev, hver); + ret = rbd_dev_v2_snap_context(rbd_dev); dout("rbd_dev_v2_snap_context returned %d\n", ret); if (ret) goto out; - ret = rbd_dev_snaps_update(rbd_dev); - dout("rbd_dev_snaps_update returned %d\n", ret); - if (ret) - goto out; - ret = rbd_dev_snaps_register(rbd_dev); - dout("rbd_dev_snaps_register returned %d\n", ret); out: up_write(&rbd_dev->header_rwsem); return ret; } -/* - * Scan the rbd device's current snapshot list and compare it to the - * newly-received snapshot context. Remove any existing snapshots - * not present in the new snapshot context. Add a new snapshot for - * any snaphots in the snapshot context not in the current list. - * And verify there are no changes to snapshots we already know - * about. - * - * Assumes the snapshots in the snapshot context are sorted by - * snapshot id, highest id first. (Snapshots in the rbd_dev's list - * are also maintained in that order.) - */ -static int rbd_dev_snaps_update(struct rbd_device *rbd_dev) -{ - struct ceph_snap_context *snapc = rbd_dev->header.snapc; - const u32 snap_count = snapc->num_snaps; - struct list_head *head = &rbd_dev->snaps; - struct list_head *links = head->next; - u32 index = 0; - - dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count); - while (index < snap_count || links != head) { - u64 snap_id; - struct rbd_snap *snap; - char *snap_name; - u64 snap_size = 0; - u64 snap_features = 0; - - snap_id = index < snap_count ? snapc->snaps[index] - : CEPH_NOSNAP; - snap = links != head ? list_entry(links, struct rbd_snap, node) - : NULL; - rbd_assert(!snap || snap->id != CEPH_NOSNAP); - - if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) { - struct list_head *next = links->next; - - /* - * A previously-existing snapshot is not in - * the new snap context. - * - * If the now missing snapshot is the one the - * image is mapped to, clear its exists flag - * so we can avoid sending any more requests - * to it. - */ - if (rbd_dev->spec->snap_id == snap->id) - clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags); - rbd_remove_snap_dev(snap); - dout("%ssnap id %llu has been removed\n", - rbd_dev->spec->snap_id == snap->id ? - "mapped " : "", - (unsigned long long) snap->id); - - /* Done with this list entry; advance */ - - links = next; - continue; - } - - snap_name = rbd_dev_snap_info(rbd_dev, index, - &snap_size, &snap_features); - if (IS_ERR(snap_name)) - return PTR_ERR(snap_name); - - dout("entry %u: snap_id = %llu\n", (unsigned int) snap_count, - (unsigned long long) snap_id); - if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) { - struct rbd_snap *new_snap; - - /* We haven't seen this snapshot before */ - - new_snap = __rbd_add_snap_dev(rbd_dev, snap_name, - snap_id, snap_size, snap_features); - if (IS_ERR(new_snap)) { - int err = PTR_ERR(new_snap); - - dout(" failed to add dev, error %d\n", err); - - return err; - } - - /* New goes before existing, or at end of list */ - - dout(" added dev%s\n", snap ? "" : " at end\n"); - if (snap) - list_add_tail(&new_snap->node, &snap->node); - else - list_add_tail(&new_snap->node, head); - } else { - /* Already have this one */ - - dout(" already present\n"); - - rbd_assert(snap->size == snap_size); - rbd_assert(!strcmp(snap->name, snap_name)); - rbd_assert(snap->features == snap_features); - - /* Done with this list entry; advance */ - - links = links->next; - } - - /* Advance to the next entry in the snapshot context */ - - index++; - } - dout("%s: done\n", __func__); - - return 0; -} - -/* - * Scan the list of snapshots and register the devices for any that - * have not already been registered. - */ -static int rbd_dev_snaps_register(struct rbd_device *rbd_dev) -{ - struct rbd_snap *snap; - int ret = 0; - - dout("%s:\n", __func__); - if (WARN_ON(!device_is_registered(&rbd_dev->dev))) - return -EIO; - - list_for_each_entry(snap, &rbd_dev->snaps, node) { - if (!rbd_snap_registered(snap)) { - ret = rbd_register_snap_dev(snap, &rbd_dev->dev); - if (ret < 0) - break; - } - } - dout("%s: returning %d\n", __func__, ret); - - return ret; -} - static int rbd_bus_add_dev(struct rbd_device *rbd_dev) { struct device *dev; @@ -3459,7 +4087,7 @@ static int rbd_bus_add_dev(struct rbd_device *rbd_dev) dev->bus = &rbd_bus_type; dev->type = &rbd_device_type; dev->parent = &rbd_root_dev; - dev->release = rbd_dev_release; + dev->release = rbd_dev_device_release; dev_set_name(dev, "%d", rbd_dev->dev_id); ret = device_register(dev); @@ -3673,6 +4301,7 @@ static int rbd_add_parse_args(const char *buf, size_t len; char *options; const char *mon_addrs; + char *snap_name; size_t mon_addrs_size; struct rbd_spec *spec = NULL; struct rbd_options *rbd_opts = NULL; @@ -3731,10 +4360,11 @@ static int rbd_add_parse_args(const char *buf, ret = -ENAMETOOLONG; goto out_err; } - spec->snap_name = kmemdup(buf, len + 1, GFP_KERNEL); - if (!spec->snap_name) + snap_name = kmemdup(buf, len + 1, GFP_KERNEL); + if (!snap_name) goto out_mem; - *(spec->snap_name + len) = '\0'; + *(snap_name + len) = '\0'; + spec->snap_name = snap_name; /* Initialize all rbd options to the defaults */ @@ -3788,15 +4418,19 @@ static int rbd_dev_image_id(struct rbd_device *rbd_dev) size_t size; char *object_name; void *response; - void *p; + char *image_id; /* * When probing a parent image, the image id is already * known (and the image name likely is not). There's no - * need to fetch the image id again in this case. + * need to fetch the image id again in this case. We + * do still need to set the image format though. */ - if (rbd_dev->spec->image_id) + if (rbd_dev->spec->image_id) { + rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1; + return 0; + } /* * First, see if the format 2 image id file exists, and if @@ -3818,23 +4452,32 @@ static int rbd_dev_image_id(struct rbd_device *rbd_dev) goto out; } + /* If it doesn't exist we'll assume it's a format 1 image */ + ret = rbd_obj_method_sync(rbd_dev, object_name, - "rbd", "get_id", - NULL, 0, - response, RBD_IMAGE_ID_LEN_MAX, NULL); + "rbd", "get_id", NULL, 0, + response, RBD_IMAGE_ID_LEN_MAX); dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); - if (ret < 0) - goto out; - - p = response; - rbd_dev->spec->image_id = ceph_extract_encoded_string(&p, - p + RBD_IMAGE_ID_LEN_MAX, + if (ret == -ENOENT) { + image_id = kstrdup("", GFP_KERNEL); + ret = image_id ? 0 : -ENOMEM; + if (!ret) + rbd_dev->image_format = 1; + } else if (ret > sizeof (__le32)) { + void *p = response; + + image_id = ceph_extract_encoded_string(&p, p + ret, NULL, GFP_NOIO); - if (IS_ERR(rbd_dev->spec->image_id)) { - ret = PTR_ERR(rbd_dev->spec->image_id); - rbd_dev->spec->image_id = NULL; + ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0; + if (!ret) + rbd_dev->image_format = 2; } else { - dout("image_id is %s\n", rbd_dev->spec->image_id); + ret = -EINVAL; + } + + if (!ret) { + rbd_dev->spec->image_id = image_id; + dout("image_id is %s\n", image_id); } out: kfree(response); @@ -3843,27 +4486,30 @@ out: return ret; } -static int rbd_dev_v1_probe(struct rbd_device *rbd_dev) +/* Undo whatever state changes are made by v1 or v2 image probe */ + +static void rbd_dev_unprobe(struct rbd_device *rbd_dev) { - int ret; - size_t size; + struct rbd_image_header *header; - /* Version 1 images have no id; empty string is used */ + rbd_dev_remove_parent(rbd_dev); + rbd_spec_put(rbd_dev->parent_spec); + rbd_dev->parent_spec = NULL; + rbd_dev->parent_overlap = 0; - rbd_dev->spec->image_id = kstrdup("", GFP_KERNEL); - if (!rbd_dev->spec->image_id) - return -ENOMEM; + /* Free dynamic fields from the header, then zero it out */ - /* Record the header object name for this rbd image. */ + header = &rbd_dev->header; + ceph_put_snap_context(header->snapc); + kfree(header->snap_sizes); + kfree(header->snap_names); + kfree(header->object_prefix); + memset(header, 0, sizeof (*header)); +} - size = strlen(rbd_dev->spec->image_name) + sizeof (RBD_SUFFIX); - rbd_dev->header_name = kmalloc(size, GFP_KERNEL); - if (!rbd_dev->header_name) { - ret = -ENOMEM; - goto out_err; - } - sprintf(rbd_dev->header_name, "%s%s", - rbd_dev->spec->image_name, RBD_SUFFIX); +static int rbd_dev_v1_probe(struct rbd_device *rbd_dev) +{ + int ret; /* Populate rbd image metadata */ @@ -3876,8 +4522,6 @@ static int rbd_dev_v1_probe(struct rbd_device *rbd_dev) rbd_dev->parent_spec = NULL; rbd_dev->parent_overlap = 0; - rbd_dev->image_format = 1; - dout("discovered version 1 image, header name is %s\n", rbd_dev->header_name); @@ -3894,43 +4538,45 @@ out_err: static int rbd_dev_v2_probe(struct rbd_device *rbd_dev) { - size_t size; int ret; - u64 ver = 0; - - /* - * Image id was filled in by the caller. Record the header - * object name for this rbd image. - */ - size = sizeof (RBD_HEADER_PREFIX) + strlen(rbd_dev->spec->image_id); - rbd_dev->header_name = kmalloc(size, GFP_KERNEL); - if (!rbd_dev->header_name) - return -ENOMEM; - sprintf(rbd_dev->header_name, "%s%s", - RBD_HEADER_PREFIX, rbd_dev->spec->image_id); - - /* Get the size and object order for the image */ ret = rbd_dev_v2_image_size(rbd_dev); - if (ret < 0) + if (ret) goto out_err; /* Get the object prefix (a.k.a. block_name) for the image */ ret = rbd_dev_v2_object_prefix(rbd_dev); - if (ret < 0) + if (ret) goto out_err; /* Get the and check features for the image */ ret = rbd_dev_v2_features(rbd_dev); - if (ret < 0) + if (ret) goto out_err; /* If the image supports layering, get the parent info */ if (rbd_dev->header.features & RBD_FEATURE_LAYERING) { ret = rbd_dev_v2_parent_info(rbd_dev); + if (ret) + goto out_err; + + /* + * Don't print a warning for parent images. We can + * tell this point because we won't know its pool + * name yet (just its pool id). + */ + if (rbd_dev->spec->pool_name) + rbd_warn(rbd_dev, "WARNING: kernel layering " + "is EXPERIMENTAL!"); + } + + /* If the image supports fancy striping, get its parameters */ + + if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) { + ret = rbd_dev_v2_striping_info(rbd_dev); if (ret < 0) goto out_err; } @@ -3942,12 +4588,9 @@ static int rbd_dev_v2_probe(struct rbd_device *rbd_dev) /* Get the snapshot context, plus the header version */ - ret = rbd_dev_v2_snap_context(rbd_dev, &ver); + ret = rbd_dev_v2_snap_context(rbd_dev); if (ret) goto out_err; - rbd_dev->header.obj_version = ver; - - rbd_dev->image_format = 2; dout("discovered version 2 image, header name is %s\n", rbd_dev->header_name); @@ -3965,22 +4608,54 @@ out_err: return ret; } -static int rbd_dev_probe_finish(struct rbd_device *rbd_dev) +static int rbd_dev_probe_parent(struct rbd_device *rbd_dev) { + struct rbd_device *parent = NULL; + struct rbd_spec *parent_spec; + struct rbd_client *rbdc; int ret; - /* no need to lock here, as rbd_dev is not registered yet */ - ret = rbd_dev_snaps_update(rbd_dev); - if (ret) - return ret; + if (!rbd_dev->parent_spec) + return 0; + /* + * We need to pass a reference to the client and the parent + * spec when creating the parent rbd_dev. Images related by + * parent/child relationships always share both. + */ + parent_spec = rbd_spec_get(rbd_dev->parent_spec); + rbdc = __rbd_get_client(rbd_dev->rbd_client); - ret = rbd_dev_probe_update_spec(rbd_dev); - if (ret) - goto err_out_snaps; + ret = -ENOMEM; + parent = rbd_dev_create(rbdc, parent_spec); + if (!parent) + goto out_err; + + ret = rbd_dev_image_probe(parent); + if (ret < 0) + goto out_err; + rbd_dev->parent = parent; + + return 0; +out_err: + if (parent) { + rbd_spec_put(rbd_dev->parent_spec); + kfree(rbd_dev->header_name); + rbd_dev_destroy(parent); + } else { + rbd_put_client(rbdc); + rbd_spec_put(parent_spec); + } + + return ret; +} + +static int rbd_dev_device_setup(struct rbd_device *rbd_dev) +{ + int ret; - ret = rbd_dev_set_mapping(rbd_dev); + ret = rbd_dev_mapping_set(rbd_dev); if (ret) - goto err_out_snaps; + return ret; /* generate unique id: find highest unique id, add one */ rbd_dev_id_get(rbd_dev); @@ -4007,54 +4682,81 @@ static int rbd_dev_probe_finish(struct rbd_device *rbd_dev) if (ret) goto err_out_disk; - /* - * At this point cleanup in the event of an error is the job - * of the sysfs code (initiated by rbd_bus_del_dev()). - */ - down_write(&rbd_dev->header_rwsem); - ret = rbd_dev_snaps_register(rbd_dev); - up_write(&rbd_dev->header_rwsem); - if (ret) - goto err_out_bus; - - ret = rbd_dev_header_watch_sync(rbd_dev, 1); - if (ret) - goto err_out_bus; - /* Everything's ready. Announce the disk to the world. */ + set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE); + set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags); add_disk(rbd_dev->disk); pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name, (unsigned long long) rbd_dev->mapping.size); return ret; -err_out_bus: - /* this will also clean up rest of rbd_dev stuff */ - rbd_bus_del_dev(rbd_dev); - - return ret; err_out_disk: rbd_free_disk(rbd_dev); err_out_blkdev: unregister_blkdev(rbd_dev->major, rbd_dev->name); err_out_id: rbd_dev_id_put(rbd_dev); -err_out_snaps: - rbd_remove_all_snaps(rbd_dev); + rbd_dev_mapping_clear(rbd_dev); return ret; } +static int rbd_dev_header_name(struct rbd_device *rbd_dev) +{ + struct rbd_spec *spec = rbd_dev->spec; + size_t size; + + /* Record the header object name for this rbd image. */ + + rbd_assert(rbd_image_format_valid(rbd_dev->image_format)); + + if (rbd_dev->image_format == 1) + size = strlen(spec->image_name) + sizeof (RBD_SUFFIX); + else + size = sizeof (RBD_HEADER_PREFIX) + strlen(spec->image_id); + + rbd_dev->header_name = kmalloc(size, GFP_KERNEL); + if (!rbd_dev->header_name) + return -ENOMEM; + + if (rbd_dev->image_format == 1) + sprintf(rbd_dev->header_name, "%s%s", + spec->image_name, RBD_SUFFIX); + else + sprintf(rbd_dev->header_name, "%s%s", + RBD_HEADER_PREFIX, spec->image_id); + return 0; +} + +static void rbd_dev_image_release(struct rbd_device *rbd_dev) +{ + int ret; + + rbd_dev_unprobe(rbd_dev); + ret = rbd_dev_header_watch_sync(rbd_dev, 0); + if (ret) + rbd_warn(rbd_dev, "failed to cancel watch event (%d)\n", ret); + kfree(rbd_dev->header_name); + rbd_dev->header_name = NULL; + rbd_dev->image_format = 0; + kfree(rbd_dev->spec->image_id); + rbd_dev->spec->image_id = NULL; + + rbd_dev_destroy(rbd_dev); +} + /* * Probe for the existence of the header object for the given rbd * device. For format 2 images this includes determining the image * id. */ -static int rbd_dev_probe(struct rbd_device *rbd_dev) +static int rbd_dev_image_probe(struct rbd_device *rbd_dev) { int ret; + int tmp; /* * Get the id from the image id object. If it's not a @@ -4063,18 +4765,48 @@ static int rbd_dev_probe(struct rbd_device *rbd_dev) */ ret = rbd_dev_image_id(rbd_dev); if (ret) + return ret; + rbd_assert(rbd_dev->spec->image_id); + rbd_assert(rbd_image_format_valid(rbd_dev->image_format)); + + ret = rbd_dev_header_name(rbd_dev); + if (ret) + goto err_out_format; + + ret = rbd_dev_header_watch_sync(rbd_dev, 1); + if (ret) + goto out_header_name; + + if (rbd_dev->image_format == 1) ret = rbd_dev_v1_probe(rbd_dev); else ret = rbd_dev_v2_probe(rbd_dev); - if (ret) { - dout("probe failed, returning %d\n", ret); - - return ret; - } + if (ret) + goto err_out_watch; - ret = rbd_dev_probe_finish(rbd_dev); + ret = rbd_dev_spec_update(rbd_dev); if (ret) - rbd_header_free(&rbd_dev->header); + goto err_out_probe; + + ret = rbd_dev_probe_parent(rbd_dev); + if (!ret) + return 0; + +err_out_probe: + rbd_dev_unprobe(rbd_dev); +err_out_watch: + tmp = rbd_dev_header_watch_sync(rbd_dev, 0); + if (tmp) + rbd_warn(rbd_dev, "unable to tear down watch request\n"); +out_header_name: + kfree(rbd_dev->header_name); + rbd_dev->header_name = NULL; +err_out_format: + rbd_dev->image_format = 0; + kfree(rbd_dev->spec->image_id); + rbd_dev->spec->image_id = NULL; + + dout("probe failed, returning %d\n", ret); return ret; } @@ -4111,11 +4843,13 @@ static ssize_t rbd_add(struct bus_type *bus, rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name); if (rc < 0) goto err_out_client; - spec->pool_id = (u64) rc; + spec->pool_id = (u64)rc; /* The ceph file layout needs to fit pool id in 32 bits */ - if (WARN_ON(spec->pool_id > (u64) U32_MAX)) { + if (spec->pool_id > (u64)U32_MAX) { + rbd_warn(NULL, "pool id too large (%llu > %u)\n", + (unsigned long long)spec->pool_id, U32_MAX); rc = -EIO; goto err_out_client; } @@ -4130,11 +4864,15 @@ static ssize_t rbd_add(struct bus_type *bus, kfree(rbd_opts); rbd_opts = NULL; /* done with this */ - rc = rbd_dev_probe(rbd_dev); + rc = rbd_dev_image_probe(rbd_dev); if (rc < 0) goto err_out_rbd_dev; - return count; + rc = rbd_dev_device_setup(rbd_dev); + if (!rc) + return count; + + rbd_dev_image_release(rbd_dev); err_out_rbd_dev: rbd_dev_destroy(rbd_dev); err_out_client: @@ -4149,7 +4887,7 @@ err_out_module: dout("Error adding device %s\n", buf); - return (ssize_t) rc; + return (ssize_t)rc; } static struct rbd_device *__rbd_get_dev(unsigned long dev_id) @@ -4169,27 +4907,43 @@ static struct rbd_device *__rbd_get_dev(unsigned long dev_id) return NULL; } -static void rbd_dev_release(struct device *dev) +static void rbd_dev_device_release(struct device *dev) { struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); - if (rbd_dev->watch_event) - rbd_dev_header_watch_sync(rbd_dev, 0); - - /* clean up and free blkdev */ rbd_free_disk(rbd_dev); + clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags); + rbd_dev_clear_mapping(rbd_dev); unregister_blkdev(rbd_dev->major, rbd_dev->name); - - /* release allocated disk header fields */ - rbd_header_free(&rbd_dev->header); - - /* done with the id, and with the rbd_dev */ + rbd_dev->major = 0; rbd_dev_id_put(rbd_dev); - rbd_assert(rbd_dev->rbd_client != NULL); - rbd_dev_destroy(rbd_dev); + rbd_dev_mapping_clear(rbd_dev); +} - /* release module ref */ - module_put(THIS_MODULE); +static void rbd_dev_remove_parent(struct rbd_device *rbd_dev) +{ + while (rbd_dev->parent) { + struct rbd_device *first = rbd_dev; + struct rbd_device *second = first->parent; + struct rbd_device *third; + + /* + * Follow to the parent with no grandparent and + * remove it. + */ + while (second && (third = second->parent)) { + first = second; + second = third; + } + rbd_assert(second); + rbd_dev_image_release(second); + first->parent = NULL; + first->parent_overlap = 0; + + rbd_assert(first->parent_spec); + rbd_spec_put(first->parent_spec); + first->parent_spec = NULL; + } } static ssize_t rbd_remove(struct bus_type *bus, @@ -4197,13 +4951,13 @@ static ssize_t rbd_remove(struct bus_type *bus, size_t count) { struct rbd_device *rbd_dev = NULL; - int target_id, rc; + int target_id; unsigned long ul; - int ret = count; + int ret; - rc = strict_strtoul(buf, 10, &ul); - if (rc) - return rc; + ret = strict_strtoul(buf, 10, &ul); + if (ret) + return ret; /* convert to int; abort if we lost anything in the conversion */ target_id = (int) ul; @@ -4226,10 +4980,10 @@ static ssize_t rbd_remove(struct bus_type *bus, spin_unlock_irq(&rbd_dev->lock); if (ret < 0) goto done; - - rbd_remove_all_snaps(rbd_dev); + ret = count; rbd_bus_del_dev(rbd_dev); - + rbd_dev_image_release(rbd_dev); + module_put(THIS_MODULE); done: mutex_unlock(&ctl_mutex); @@ -4261,6 +5015,56 @@ static void rbd_sysfs_cleanup(void) device_unregister(&rbd_root_dev); } +static int rbd_slab_init(void) +{ + rbd_assert(!rbd_img_request_cache); + rbd_img_request_cache = kmem_cache_create("rbd_img_request", + sizeof (struct rbd_img_request), + __alignof__(struct rbd_img_request), + 0, NULL); + if (!rbd_img_request_cache) + return -ENOMEM; + + rbd_assert(!rbd_obj_request_cache); + rbd_obj_request_cache = kmem_cache_create("rbd_obj_request", + sizeof (struct rbd_obj_request), + __alignof__(struct rbd_obj_request), + 0, NULL); + if (!rbd_obj_request_cache) + goto out_err; + + rbd_assert(!rbd_segment_name_cache); + rbd_segment_name_cache = kmem_cache_create("rbd_segment_name", + MAX_OBJ_NAME_SIZE + 1, 1, 0, NULL); + if (rbd_segment_name_cache) + return 0; +out_err: + if (rbd_obj_request_cache) { + kmem_cache_destroy(rbd_obj_request_cache); + rbd_obj_request_cache = NULL; + } + + kmem_cache_destroy(rbd_img_request_cache); + rbd_img_request_cache = NULL; + + return -ENOMEM; +} + +static void rbd_slab_exit(void) +{ + rbd_assert(rbd_segment_name_cache); + kmem_cache_destroy(rbd_segment_name_cache); + rbd_segment_name_cache = NULL; + + rbd_assert(rbd_obj_request_cache); + kmem_cache_destroy(rbd_obj_request_cache); + rbd_obj_request_cache = NULL; + + rbd_assert(rbd_img_request_cache); + kmem_cache_destroy(rbd_img_request_cache); + rbd_img_request_cache = NULL; +} + static int __init rbd_init(void) { int rc; @@ -4270,16 +5074,22 @@ static int __init rbd_init(void) return -EINVAL; } - rc = rbd_sysfs_init(); + rc = rbd_slab_init(); if (rc) return rc; - pr_info("loaded " RBD_DRV_NAME_LONG "\n"); - return 0; + rc = rbd_sysfs_init(); + if (rc) + rbd_slab_exit(); + else + pr_info("loaded " RBD_DRV_NAME_LONG "\n"); + + return rc; } static void __exit rbd_exit(void) { rbd_sysfs_cleanup(); + rbd_slab_exit(); } module_init(rbd_init); |