diff options
Diffstat (limited to 'drivers/block/rbd.c')
-rw-r--r-- | drivers/block/rbd.c | 190 |
1 files changed, 102 insertions, 88 deletions
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 3063452e55da..4ad2ad9a5bb0 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -372,7 +372,7 @@ enum rbd_dev_flags { RBD_DEV_FLAG_REMOVING, /* this mapping is being removed */ }; -static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */ +static DEFINE_MUTEX(client_mutex); /* Serialize client creation */ static LIST_HEAD(rbd_dev_list); /* devices */ static DEFINE_SPINLOCK(rbd_dev_list_lock); @@ -489,10 +489,8 @@ static int rbd_open(struct block_device *bdev, fmode_t mode) if (removing) return -ENOENT; - mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); (void) get_device(&rbd_dev->dev); set_device_ro(bdev, rbd_dev->mapping.read_only); - mutex_unlock(&ctl_mutex); return 0; } @@ -507,9 +505,7 @@ static void rbd_release(struct gendisk *disk, fmode_t mode) spin_unlock_irq(&rbd_dev->lock); rbd_assert(open_count_before > 0); - mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); put_device(&rbd_dev->dev); - mutex_unlock(&ctl_mutex); } static const struct block_device_operations rbd_bd_ops = { @@ -520,7 +516,7 @@ static const struct block_device_operations rbd_bd_ops = { /* * Initialize an rbd client instance. Success or not, this function - * consumes ceph_opts. + * consumes ceph_opts. Caller holds client_mutex. */ static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts) { @@ -535,30 +531,25 @@ static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts) kref_init(&rbdc->kref); INIT_LIST_HEAD(&rbdc->node); - mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); - rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0); if (IS_ERR(rbdc->client)) - goto out_mutex; + goto out_rbdc; ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */ ret = ceph_open_session(rbdc->client); if (ret < 0) - goto out_err; + goto out_client; spin_lock(&rbd_client_list_lock); list_add_tail(&rbdc->node, &rbd_client_list); spin_unlock(&rbd_client_list_lock); - mutex_unlock(&ctl_mutex); dout("%s: rbdc %p\n", __func__, rbdc); return rbdc; - -out_err: +out_client: ceph_destroy_client(rbdc->client); -out_mutex: - mutex_unlock(&ctl_mutex); +out_rbdc: kfree(rbdc); out_opt: if (ceph_opts) @@ -682,11 +673,13 @@ static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts) { struct rbd_client *rbdc; + mutex_lock_nested(&client_mutex, SINGLE_DEPTH_NESTING); rbdc = rbd_client_find(ceph_opts); if (rbdc) /* using an existing client */ ceph_destroy_options(ceph_opts); else rbdc = rbd_client_create(ceph_opts); + mutex_unlock(&client_mutex); return rbdc; } @@ -840,7 +833,6 @@ static int rbd_header_from_disk(struct rbd_device *rbd_dev, /* We won't fail any more, fill in the header */ - down_write(&rbd_dev->header_rwsem); if (first_time) { header->object_prefix = object_prefix; header->obj_order = ondisk->options.order; @@ -869,8 +861,6 @@ static int rbd_header_from_disk(struct rbd_device *rbd_dev, if (rbd_dev->mapping.size != header->image_size) rbd_dev->mapping.size = header->image_size; - up_write(&rbd_dev->header_rwsem); - return 0; out_2big: ret = -EIO; @@ -1036,12 +1026,16 @@ static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset) char *name; u64 segment; int ret; + char *name_format; name = kmem_cache_alloc(rbd_segment_name_cache, GFP_NOIO); if (!name) return NULL; segment = offset >> rbd_dev->header.obj_order; - ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx", + name_format = "%s.%012llx"; + if (rbd_dev->image_format == 2) + name_format = "%s.%016llx"; + ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, name_format, rbd_dev->header.object_prefix, segment); if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) { pr_err("error formatting segment name for #%llu (%d)\n", @@ -1122,6 +1116,7 @@ static void zero_bio_chain(struct bio *chain, int start_ofs) buf = bvec_kmap_irq(bv, &flags); memset(buf + remainder, 0, bv->bv_len - remainder); + flush_dcache_page(bv->bv_page); bvec_kunmap_irq(buf, &flags); } pos += bv->bv_len; @@ -1149,11 +1144,12 @@ static void zero_pages(struct page **pages, u64 offset, u64 end) unsigned long flags; void *kaddr; - page_offset = (size_t)(offset & ~PAGE_MASK); - length = min(PAGE_SIZE - page_offset, (size_t)(end - offset)); + page_offset = offset & ~PAGE_MASK; + length = min_t(size_t, PAGE_SIZE - page_offset, end - offset); local_irq_save(flags); kaddr = kmap_atomic(*page); memset(kaddr + page_offset, 0, length); + flush_dcache_page(*page); kunmap_atomic(kaddr); local_irq_restore(flags); @@ -2167,9 +2163,9 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request, struct rbd_obj_request *obj_request = NULL; struct rbd_obj_request *next_obj_request; bool write_request = img_request_write_test(img_request); - struct bio *bio_list; + struct bio *bio_list = 0; unsigned int bio_offset = 0; - struct page **pages; + struct page **pages = 0; u64 img_offset; u64 resid; u16 opcode; @@ -2248,13 +2244,17 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request, obj_request->pages, length, offset & ~PAGE_MASK, false, false); + /* + * set obj_request->img_request before formatting + * the osd_request so that it gets the right snapc + */ + rbd_img_obj_request_add(img_request, obj_request); if (write_request) rbd_osd_req_format_write(obj_request); else rbd_osd_req_format_read(obj_request); obj_request->img_offset = img_offset; - rbd_img_obj_request_add(img_request, obj_request); img_offset += length; resid -= length; @@ -2527,6 +2527,7 @@ static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request) */ orig_request = obj_request->obj_request; obj_request->obj_request = NULL; + rbd_obj_request_put(orig_request); rbd_assert(orig_request); rbd_assert(orig_request->img_request); @@ -2547,7 +2548,6 @@ static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request) if (!rbd_dev->parent_overlap) { struct ceph_osd_client *osdc; - rbd_obj_request_put(orig_request); osdc = &rbd_dev->rbd_client->client->osdc; result = rbd_obj_request_submit(osdc, orig_request); if (!result) @@ -2577,7 +2577,6 @@ static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request) out: if (orig_request->result) rbd_obj_request_complete(orig_request); - rbd_obj_request_put(orig_request); } static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request) @@ -2851,7 +2850,7 @@ static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data) (unsigned int)opcode); ret = rbd_dev_refresh(rbd_dev); if (ret) - rbd_warn(rbd_dev, ": header refresh error (%d)\n", ret); + rbd_warn(rbd_dev, "header refresh error (%d)\n", ret); rbd_obj_notify_ack(rbd_dev, notify_id); } @@ -3331,8 +3330,8 @@ static int rbd_dev_refresh(struct rbd_device *rbd_dev) int ret; rbd_assert(rbd_image_format_valid(rbd_dev->image_format)); + down_write(&rbd_dev->header_rwsem); mapping_size = rbd_dev->mapping.size; - mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); if (rbd_dev->image_format == 1) ret = rbd_dev_v1_header_info(rbd_dev); else @@ -3341,7 +3340,8 @@ static int rbd_dev_refresh(struct rbd_device *rbd_dev) /* If it's a mapped snapshot, validate its EXISTS flag */ rbd_exists_validate(rbd_dev); - mutex_unlock(&ctl_mutex); + up_write(&rbd_dev->header_rwsem); + if (mapping_size != rbd_dev->mapping.size) { sector_t size; @@ -3805,6 +3805,7 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev) void *end; u64 pool_id; char *image_id; + u64 snap_id; u64 overlap; int ret; @@ -3864,24 +3865,56 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev) (unsigned long long)pool_id, U32_MAX); goto out_err; } - parent_spec->pool_id = pool_id; image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL); if (IS_ERR(image_id)) { ret = PTR_ERR(image_id); goto out_err; } - parent_spec->image_id = image_id; - ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err); + ceph_decode_64_safe(&p, end, snap_id, out_err); ceph_decode_64_safe(&p, end, overlap, out_err); - if (overlap) { - rbd_spec_put(rbd_dev->parent_spec); + /* + * The parent won't change (except when the clone is + * flattened, already handled that). So we only need to + * record the parent spec we have not already done so. + */ + if (!rbd_dev->parent_spec) { + parent_spec->pool_id = pool_id; + parent_spec->image_id = image_id; + parent_spec->snap_id = snap_id; rbd_dev->parent_spec = parent_spec; parent_spec = NULL; /* rbd_dev now owns this */ - rbd_dev->parent_overlap = overlap; - } else { - rbd_warn(rbd_dev, "ignoring parent of clone with overlap 0\n"); + } + + /* + * We always update the parent overlap. If it's zero we + * treat it specially. + */ + rbd_dev->parent_overlap = overlap; + smp_mb(); + if (!overlap) { + + /* A null parent_spec indicates it's the initial probe */ + + if (parent_spec) { + /* + * The overlap has become zero, so the clone + * must have been resized down to 0 at some + * point. Treat this the same as a flatten. + */ + rbd_dev_parent_put(rbd_dev); + pr_info("%s: clone image now standalone\n", + rbd_dev->disk->disk_name); + } else { + /* + * For the initial probe, if we find the + * overlap is zero we just pretend there was + * no parent image. + */ + rbd_warn(rbd_dev, "ignoring parent of " + "clone with overlap 0\n"); + } } out: ret = 0; @@ -4237,12 +4270,14 @@ static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev) bool first_time = rbd_dev->header.object_prefix == NULL; int ret; - down_write(&rbd_dev->header_rwsem); + ret = rbd_dev_v2_image_size(rbd_dev); + if (ret) + return ret; if (first_time) { ret = rbd_dev_v2_header_onetime(rbd_dev); if (ret) - goto out; + return ret; } /* @@ -4257,7 +4292,7 @@ static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev) ret = rbd_dev_v2_parent_info(rbd_dev); if (ret) - goto out; + return ret; /* * Print a warning if this is the initial probe and @@ -4272,18 +4307,12 @@ static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev) "is EXPERIMENTAL!"); } - ret = rbd_dev_v2_image_size(rbd_dev); - if (ret) - goto out; - if (rbd_dev->spec->snap_id == CEPH_NOSNAP) if (rbd_dev->mapping.size != rbd_dev->header.image_size) rbd_dev->mapping.size = rbd_dev->header.image_size; ret = rbd_dev_v2_snap_context(rbd_dev); dout("rbd_dev_v2_snap_context returned %d\n", ret); -out: - up_write(&rbd_dev->header_rwsem); return ret; } @@ -4293,8 +4322,6 @@ static int rbd_bus_add_dev(struct rbd_device *rbd_dev) struct device *dev; int ret; - mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); - dev = &rbd_dev->dev; dev->bus = &rbd_bus_type; dev->type = &rbd_device_type; @@ -4303,8 +4330,6 @@ static int rbd_bus_add_dev(struct rbd_device *rbd_dev) dev_set_name(dev, "%d", rbd_dev->dev_id); ret = device_register(dev); - mutex_unlock(&ctl_mutex); - return ret; } @@ -5051,23 +5076,6 @@ err_out_module: return (ssize_t)rc; } -static struct rbd_device *__rbd_get_dev(unsigned long dev_id) -{ - struct list_head *tmp; - struct rbd_device *rbd_dev; - - spin_lock(&rbd_dev_list_lock); - list_for_each(tmp, &rbd_dev_list) { - rbd_dev = list_entry(tmp, struct rbd_device, node); - if (rbd_dev->dev_id == dev_id) { - spin_unlock(&rbd_dev_list_lock); - return rbd_dev; - } - } - spin_unlock(&rbd_dev_list_lock); - return NULL; -} - static void rbd_dev_device_release(struct device *dev) { struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); @@ -5112,8 +5120,10 @@ static ssize_t rbd_remove(struct bus_type *bus, size_t count) { struct rbd_device *rbd_dev = NULL; - int target_id; + struct list_head *tmp; + int dev_id; unsigned long ul; + bool already = false; int ret; ret = strict_strtoul(buf, 10, &ul); @@ -5121,37 +5131,40 @@ static ssize_t rbd_remove(struct bus_type *bus, return ret; /* convert to int; abort if we lost anything in the conversion */ - target_id = (int) ul; - if (target_id != ul) + dev_id = (int)ul; + if (dev_id != ul) return -EINVAL; - mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); - - rbd_dev = __rbd_get_dev(target_id); - if (!rbd_dev) { - ret = -ENOENT; - goto done; + ret = -ENOENT; + spin_lock(&rbd_dev_list_lock); + list_for_each(tmp, &rbd_dev_list) { + rbd_dev = list_entry(tmp, struct rbd_device, node); + if (rbd_dev->dev_id == dev_id) { + ret = 0; + break; + } + } + if (!ret) { + spin_lock_irq(&rbd_dev->lock); + if (rbd_dev->open_count) + ret = -EBUSY; + else + already = test_and_set_bit(RBD_DEV_FLAG_REMOVING, + &rbd_dev->flags); + spin_unlock_irq(&rbd_dev->lock); } + spin_unlock(&rbd_dev_list_lock); + if (ret < 0 || already) + return ret; - spin_lock_irq(&rbd_dev->lock); - if (rbd_dev->open_count) - ret = -EBUSY; - else - set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags); - spin_unlock_irq(&rbd_dev->lock); - if (ret < 0) - goto done; rbd_bus_del_dev(rbd_dev); ret = rbd_dev_header_watch_sync(rbd_dev, false); if (ret) rbd_warn(rbd_dev, "failed to cancel watch event (%d)\n", ret); rbd_dev_image_release(rbd_dev); module_put(THIS_MODULE); - ret = count; -done: - mutex_unlock(&ctl_mutex); - return ret; + return count; } /* @@ -5259,6 +5272,7 @@ static void __exit rbd_exit(void) module_init(rbd_init); module_exit(rbd_exit); +MODULE_AUTHOR("Alex Elder <elder@inktank.com>"); MODULE_AUTHOR("Sage Weil <sage@newdream.net>"); MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>"); MODULE_DESCRIPTION("rados block device"); |