diff options
author | Linus Torvalds <torvalds@linux-foundation.org> | 2014-06-13 10:06:23 +0400 |
---|---|---|
committer | Linus Torvalds <torvalds@linux-foundation.org> | 2014-06-13 10:06:23 +0400 |
commit | 6d87c225f5d82d29243dc124f1ffcbb0e14ec358 (patch) | |
tree | 7d72e2e6a77ec0911e86911d2ddae62c1b4161cf /drivers | |
parent | 338c09a94b14c449dd53227e9bea44816668c6a5 (diff) | |
parent | 22001f619f29ddf66582d834223dcff4c0b74595 (diff) | |
download | linux-6d87c225f5d82d29243dc124f1ffcbb0e14ec358.tar.xz |
Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client
Pull Ceph updates from Sage Weil:
"This has a mix of bug fixes and cleanups.
Alex's patch fixes a rare race in RBD. Ilya's patches fix an ENOENT
check when a second rbd image is mapped and a couple of memory leaks.
Zheng fixes several issues with fragmented directories and multiple
MDSs. Josh fixes a spin/sleep issue, and Josh and Guangliang's
patches fix setting and unsetting RBD images read-only.
Naturally there are several other cleanups mixed in for good measure."
* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (23 commits)
rbd: only set disk to read-only once
rbd: move calls that may sleep out of spin lock range
rbd: add ioctl for rbd
ceph: use truncate_pagecache() instead of truncate_inode_pages()
ceph: include time stamp in every MDS request
rbd: fix ida/idr memory leak
rbd: use reference counts for image requests
rbd: fix osd_request memory leak in __rbd_dev_header_watch_sync()
rbd: make sure we have latest osdmap on 'rbd map'
libceph: add ceph_monc_wait_osdmap()
libceph: mon_get_version request infrastructure
libceph: recognize poolop requests in debugfs
ceph: refactor readpage_nounlock() to make the logic clearer
mds: check cap ID when handling cap export message
ceph: remember subtree root dirfrag's auth MDS
ceph: introduce ceph_fill_fragtree()
ceph: handle cap import atomically
ceph: pre-allocate ceph_cap struct for ceph_add_cap()
ceph: update inode fields according to issued caps
rbd: replace IS_ERR and PTR_ERR with PTR_ERR_OR_ZERO
...
Diffstat (limited to 'drivers')
-rw-r--r-- | drivers/block/rbd.c | 242 |
1 files changed, 197 insertions, 45 deletions
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 4c95b503b09e..bbeb404b3a07 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -541,7 +541,6 @@ static int rbd_open(struct block_device *bdev, fmode_t mode) return -ENOENT; (void) get_device(&rbd_dev->dev); - set_device_ro(bdev, rbd_dev->mapping.read_only); return 0; } @@ -559,10 +558,76 @@ static void rbd_release(struct gendisk *disk, fmode_t mode) put_device(&rbd_dev->dev); } +static int rbd_ioctl_set_ro(struct rbd_device *rbd_dev, unsigned long arg) +{ + int ret = 0; + int val; + bool ro; + bool ro_changed = false; + + /* get_user() may sleep, so call it before taking rbd_dev->lock */ + if (get_user(val, (int __user *)(arg))) + return -EFAULT; + + ro = val ? true : false; + /* Snapshot doesn't allow to write*/ + if (rbd_dev->spec->snap_id != CEPH_NOSNAP && !ro) + return -EROFS; + + spin_lock_irq(&rbd_dev->lock); + /* prevent others open this device */ + if (rbd_dev->open_count > 1) { + ret = -EBUSY; + goto out; + } + + if (rbd_dev->mapping.read_only != ro) { + rbd_dev->mapping.read_only = ro; + ro_changed = true; + } + +out: + spin_unlock_irq(&rbd_dev->lock); + /* set_disk_ro() may sleep, so call it after releasing rbd_dev->lock */ + if (ret == 0 && ro_changed) + set_disk_ro(rbd_dev->disk, ro ? 
1 : 0); + + return ret; +} + +static int rbd_ioctl(struct block_device *bdev, fmode_t mode, + unsigned int cmd, unsigned long arg) +{ + struct rbd_device *rbd_dev = bdev->bd_disk->private_data; + int ret = 0; + + switch (cmd) { + case BLKROSET: + ret = rbd_ioctl_set_ro(rbd_dev, arg); + break; + default: + ret = -ENOTTY; + } + + return ret; +} + +#ifdef CONFIG_COMPAT +static int rbd_compat_ioctl(struct block_device *bdev, fmode_t mode, + unsigned int cmd, unsigned long arg) +{ + return rbd_ioctl(bdev, mode, cmd, arg); +} +#endif /* CONFIG_COMPAT */ + static const struct block_device_operations rbd_bd_ops = { .owner = THIS_MODULE, .open = rbd_open, .release = rbd_release, + .ioctl = rbd_ioctl, +#ifdef CONFIG_COMPAT + .compat_ioctl = rbd_compat_ioctl, +#endif }; /* @@ -1382,6 +1447,13 @@ static void rbd_obj_request_put(struct rbd_obj_request *obj_request) kref_put(&obj_request->kref, rbd_obj_request_destroy); } +static void rbd_img_request_get(struct rbd_img_request *img_request) +{ + dout("%s: img %p (was %d)\n", __func__, img_request, + atomic_read(&img_request->kref.refcount)); + kref_get(&img_request->kref); +} + static bool img_request_child_test(struct rbd_img_request *img_request); static void rbd_parent_request_destroy(struct kref *kref); static void rbd_img_request_destroy(struct kref *kref); @@ -2142,6 +2214,7 @@ static void rbd_img_obj_callback(struct rbd_obj_request *obj_request) img_request->next_completion = which; out: spin_unlock_irq(&img_request->completion_lock); + rbd_img_request_put(img_request); if (!more) rbd_img_request_complete(img_request); @@ -2242,6 +2315,7 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request, goto out_unwind; obj_request->osd_req = osd_req; obj_request->callback = rbd_img_obj_callback; + rbd_img_request_get(img_request); if (write_request) { osd_req_op_alloc_hint_init(osd_req, which, @@ -2872,56 +2946,55 @@ static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data) } /* - * Request sync osd 
watch/unwatch. The value of "start" determines - * whether a watch request is being initiated or torn down. + * Initiate a watch request, synchronously. */ -static int __rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, bool start) +static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev) { struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; struct rbd_obj_request *obj_request; int ret; - rbd_assert(start ^ !!rbd_dev->watch_event); - rbd_assert(start ^ !!rbd_dev->watch_request); + rbd_assert(!rbd_dev->watch_event); + rbd_assert(!rbd_dev->watch_request); - if (start) { - ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev, - &rbd_dev->watch_event); - if (ret < 0) - return ret; - rbd_assert(rbd_dev->watch_event != NULL); - } + ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev, + &rbd_dev->watch_event); + if (ret < 0) + return ret; + + rbd_assert(rbd_dev->watch_event); - ret = -ENOMEM; obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0, - OBJ_REQUEST_NODATA); - if (!obj_request) + OBJ_REQUEST_NODATA); + if (!obj_request) { + ret = -ENOMEM; goto out_cancel; + } obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, 1, obj_request); - if (!obj_request->osd_req) - goto out_cancel; + if (!obj_request->osd_req) { + ret = -ENOMEM; + goto out_put; + } - if (start) - ceph_osdc_set_request_linger(osdc, obj_request->osd_req); - else - ceph_osdc_unregister_linger_request(osdc, - rbd_dev->watch_request->osd_req); + ceph_osdc_set_request_linger(osdc, obj_request->osd_req); osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH, - rbd_dev->watch_event->cookie, 0, start ? 
1 : 0); + rbd_dev->watch_event->cookie, 0, 1); rbd_osd_req_format_write(obj_request); ret = rbd_obj_request_submit(osdc, obj_request); if (ret) - goto out_cancel; + goto out_linger; + ret = rbd_obj_request_wait(obj_request); if (ret) - goto out_cancel; + goto out_linger; + ret = obj_request->result; if (ret) - goto out_cancel; + goto out_linger; /* * A watch request is set to linger, so the underlying osd @@ -2931,36 +3004,84 @@ static int __rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, bool start) * it. We'll drop that reference (below) after we've * unregistered it. */ - if (start) { - rbd_dev->watch_request = obj_request; + rbd_dev->watch_request = obj_request; - return 0; + return 0; + +out_linger: + ceph_osdc_unregister_linger_request(osdc, obj_request->osd_req); +out_put: + rbd_obj_request_put(obj_request); +out_cancel: + ceph_osdc_cancel_event(rbd_dev->watch_event); + rbd_dev->watch_event = NULL; + + return ret; +} + +/* + * Tear down a watch request, synchronously. + */ +static int __rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev) +{ + struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; + struct rbd_obj_request *obj_request; + int ret; + + rbd_assert(rbd_dev->watch_event); + rbd_assert(rbd_dev->watch_request); + + obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0, + OBJ_REQUEST_NODATA); + if (!obj_request) { + ret = -ENOMEM; + goto out_cancel; + } + + obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, 1, + obj_request); + if (!obj_request->osd_req) { + ret = -ENOMEM; + goto out_put; } + osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH, + rbd_dev->watch_event->cookie, 0, 0); + rbd_osd_req_format_write(obj_request); + + ret = rbd_obj_request_submit(osdc, obj_request); + if (ret) + goto out_put; + + ret = rbd_obj_request_wait(obj_request); + if (ret) + goto out_put; + + ret = obj_request->result; + if (ret) + goto out_put; + /* We have successfully torn down the watch request */ + 
ceph_osdc_unregister_linger_request(osdc, + rbd_dev->watch_request->osd_req); rbd_obj_request_put(rbd_dev->watch_request); rbd_dev->watch_request = NULL; + +out_put: + rbd_obj_request_put(obj_request); out_cancel: - /* Cancel the event if we're tearing down, or on error */ ceph_osdc_cancel_event(rbd_dev->watch_event); rbd_dev->watch_event = NULL; - if (obj_request) - rbd_obj_request_put(obj_request); return ret; } -static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev) -{ - return __rbd_dev_header_watch_sync(rbd_dev, true); -} - static void rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev) { int ret; - ret = __rbd_dev_header_watch_sync(rbd_dev, false); + ret = __rbd_dev_header_unwatch_sync(rbd_dev); if (ret) { rbd_warn(rbd_dev, "unable to tear down watch request: %d\n", ret); @@ -3058,7 +3179,6 @@ static void rbd_request_fn(struct request_queue *q) __releases(q->queue_lock) __acquires(q->queue_lock) { struct rbd_device *rbd_dev = q->queuedata; - bool read_only = rbd_dev->mapping.read_only; struct request *rq; int result; @@ -3094,7 +3214,7 @@ static void rbd_request_fn(struct request_queue *q) if (write_request) { result = -EROFS; - if (read_only) + if (rbd_dev->mapping.read_only) goto end_request; rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP); } @@ -4683,6 +4803,38 @@ out_err: } /* + * Return pool id (>= 0) or a negative error code. 
+ */ +static int rbd_add_get_pool_id(struct rbd_client *rbdc, const char *pool_name) +{ + u64 newest_epoch; + unsigned long timeout = rbdc->client->options->mount_timeout * HZ; + int tries = 0; + int ret; + +again: + ret = ceph_pg_poolid_by_name(rbdc->client->osdc.osdmap, pool_name); + if (ret == -ENOENT && tries++ < 1) { + ret = ceph_monc_do_get_version(&rbdc->client->monc, "osdmap", + &newest_epoch); + if (ret < 0) + return ret; + + if (rbdc->client->osdc.osdmap->epoch < newest_epoch) { + ceph_monc_request_next_osdmap(&rbdc->client->monc); + (void) ceph_monc_wait_osdmap(&rbdc->client->monc, + newest_epoch, timeout); + goto again; + } else { + /* the osdmap we have is new enough */ + return -ENOENT; + } + } + + return ret; +} + +/* * An rbd format 2 image has a unique identifier, distinct from the * name given to it by the user. Internally, that identifier is * what's used to specify the names of objects related to the image. @@ -4752,7 +4904,7 @@ static int rbd_dev_image_id(struct rbd_device *rbd_dev) image_id = ceph_extract_encoded_string(&p, p + ret, NULL, GFP_NOIO); - ret = IS_ERR(image_id) ? 
PTR_ERR(image_id) : 0; + ret = PTR_ERR_OR_ZERO(image_id); if (!ret) rbd_dev->image_format = 2; } else { @@ -4907,6 +5059,7 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev) if (ret) goto err_out_disk; set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE); + set_disk_ro(rbd_dev->disk, rbd_dev->mapping.read_only); ret = rbd_bus_add_dev(rbd_dev); if (ret) @@ -5053,7 +5206,6 @@ static ssize_t do_rbd_add(struct bus_type *bus, struct rbd_options *rbd_opts = NULL; struct rbd_spec *spec = NULL; struct rbd_client *rbdc; - struct ceph_osd_client *osdc; bool read_only; int rc = -ENOMEM; @@ -5075,8 +5227,7 @@ static ssize_t do_rbd_add(struct bus_type *bus, } /* pick the pool */ - osdc = &rbdc->client->osdc; - rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name); + rc = rbd_add_get_pool_id(rbdc, spec->pool_name); if (rc < 0) goto err_out_client; spec->pool_id = (u64)rc; @@ -5387,6 +5538,7 @@ err_out_slab: static void __exit rbd_exit(void) { + ida_destroy(&rbd_dev_id_ida); rbd_sysfs_cleanup(); if (single_major) unregister_blkdev(rbd_major, RBD_DRV_NAME); |