summaryrefslogtreecommitdiff
path: root/drivers/block/rbd.c
diff options
context:
space:
mode:
authorLinus Torvalds <torvalds@linux-foundation.org>2017-05-10 18:42:33 +0300
committerLinus Torvalds <torvalds@linux-foundation.org>2017-05-10 18:42:33 +0300
commit26c5eaa1326e9703effd01e7cc3cc0d4ad4b3c19 (patch)
tree070c518340ae308dce62695a06a118a1df78be15 /drivers/block/rbd.c
parent1176032cb12bb89ad558a3e57e82f2f25b817eff (diff)
parenteeca958dce0a9231d1969f86196653eb50fcc9b3 (diff)
downloadlinux-26c5eaa1326e9703effd01e7cc3cc0d4ad4b3c19.tar.xz
Merge tag 'ceph-for-4.12-rc1' of git://github.com/ceph/ceph-client
Pull ceph updates from Ilya Dryomov: "The two main items are support for disabling automatic rbd exclusive lock transfers from myself and the long awaited -ENOSPC handling series from Jeff. The former will allow rbd users to take advantage of exclusive lock's built-in blacklist/break-lock functionality while staying in control of who owns the lock. With the latter in place, we will abort filesystem writes on -ENOSPC instead of having them block indefinitely. Beyond that we've got the usual pile of filesystem fixes from Zheng, some refcount_t conversion patches from Elena and a patch for an ancient open() flags handling bug from Alexander" * tag 'ceph-for-4.12-rc1' of git://github.com/ceph/ceph-client: (31 commits) ceph: fix memory leak in __ceph_setxattr() ceph: fix file open flags on ppc64 ceph: choose readdir frag based on previous readdir reply rbd: exclusive map option rbd: return ResponseMessage result from rbd_handle_request_lock() rbd: kill rbd_is_lock_supported() rbd: support updating the lock cookie without releasing the lock rbd: store lock cookie rbd: ignore unlock errors rbd: fix error handling around rbd_init_disk() rbd: move rbd_unregister_watch() call into rbd_dev_image_release() rbd: move rbd_dev_destroy() call out of rbd_dev_image_release() ceph: when seeing write errors on an inode, switch to sync writes Revert "ceph: SetPageError() for writeback pages if writepages fails" ceph: handle epoch barriers in cap messages libceph: add an epoch_barrier field to struct ceph_osd_client libceph: abort already submitted but abortable requests when map or pool goes full libceph: allow requests to return immediately on full conditions if caller wishes libceph: remove req->r_replay_version ceph: make seeky readdir more efficient ...
Diffstat (limited to 'drivers/block/rbd.c')
-rw-r--r--drivers/block/rbd.c359
1 files changed, 215 insertions, 144 deletions
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 26812c1ed0cf..454bf9c34882 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -387,6 +387,7 @@ struct rbd_device {
struct rw_semaphore lock_rwsem;
enum rbd_lock_state lock_state;
+ char lock_cookie[32];
struct rbd_client_id owner_cid;
struct work_struct acquired_lock_work;
struct work_struct released_lock_work;
@@ -477,13 +478,6 @@ static int minor_to_rbd_dev_id(int minor)
return minor >> RBD_SINGLE_MAJOR_PART_SHIFT;
}
-static bool rbd_is_lock_supported(struct rbd_device *rbd_dev)
-{
- return (rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK) &&
- rbd_dev->spec->snap_id == CEPH_NOSNAP &&
- !rbd_dev->mapping.read_only;
-}
-
static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev)
{
return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED ||
@@ -731,7 +725,7 @@ static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
kref_init(&rbdc->kref);
INIT_LIST_HEAD(&rbdc->node);
- rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
+ rbdc->client = ceph_create_client(ceph_opts, rbdc);
if (IS_ERR(rbdc->client))
goto out_rbdc;
ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
@@ -804,6 +798,7 @@ enum {
Opt_read_only,
Opt_read_write,
Opt_lock_on_read,
+ Opt_exclusive,
Opt_err
};
@@ -816,6 +811,7 @@ static match_table_t rbd_opts_tokens = {
{Opt_read_write, "read_write"},
{Opt_read_write, "rw"}, /* Alternate spelling */
{Opt_lock_on_read, "lock_on_read"},
+ {Opt_exclusive, "exclusive"},
{Opt_err, NULL}
};
@@ -823,11 +819,13 @@ struct rbd_options {
int queue_depth;
bool read_only;
bool lock_on_read;
+ bool exclusive;
};
#define RBD_QUEUE_DEPTH_DEFAULT BLKDEV_MAX_RQ
#define RBD_READ_ONLY_DEFAULT false
#define RBD_LOCK_ON_READ_DEFAULT false
+#define RBD_EXCLUSIVE_DEFAULT false
static int parse_rbd_opts_token(char *c, void *private)
{
@@ -866,6 +864,9 @@ static int parse_rbd_opts_token(char *c, void *private)
case Opt_lock_on_read:
rbd_opts->lock_on_read = true;
break;
+ case Opt_exclusive:
+ rbd_opts->exclusive = true;
+ break;
default:
/* libceph prints "bad option" msg */
return -EINVAL;
@@ -3079,7 +3080,8 @@ static int rbd_lock(struct rbd_device *rbd_dev)
char cookie[32];
int ret;
- WARN_ON(__rbd_is_lock_owner(rbd_dev));
+ WARN_ON(__rbd_is_lock_owner(rbd_dev) ||
+ rbd_dev->lock_cookie[0] != '\0');
format_lock_cookie(rbd_dev, cookie);
ret = ceph_cls_lock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
@@ -3089,6 +3091,7 @@ static int rbd_lock(struct rbd_device *rbd_dev)
return ret;
rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED;
+ strcpy(rbd_dev->lock_cookie, cookie);
rbd_set_owner_cid(rbd_dev, &cid);
queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work);
return 0;
@@ -3097,27 +3100,24 @@ static int rbd_lock(struct rbd_device *rbd_dev)
/*
* lock_rwsem must be held for write
*/
-static int rbd_unlock(struct rbd_device *rbd_dev)
+static void rbd_unlock(struct rbd_device *rbd_dev)
{
struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
- char cookie[32];
int ret;
- WARN_ON(!__rbd_is_lock_owner(rbd_dev));
-
- rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
+ WARN_ON(!__rbd_is_lock_owner(rbd_dev) ||
+ rbd_dev->lock_cookie[0] == '\0');
- format_lock_cookie(rbd_dev, cookie);
ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
- RBD_LOCK_NAME, cookie);
- if (ret && ret != -ENOENT) {
- rbd_warn(rbd_dev, "cls_unlock failed: %d", ret);
- return ret;
- }
+ RBD_LOCK_NAME, rbd_dev->lock_cookie);
+ if (ret && ret != -ENOENT)
+ rbd_warn(rbd_dev, "failed to unlock: %d", ret);
+ /* treat errors as the image is unlocked */
+ rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
+ rbd_dev->lock_cookie[0] = '\0';
rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
queue_work(rbd_dev->task_wq, &rbd_dev->released_lock_work);
- return 0;
}
static int __rbd_notify_op_lock(struct rbd_device *rbd_dev,
@@ -3447,6 +3447,18 @@ again:
ret = rbd_request_lock(rbd_dev);
if (ret == -ETIMEDOUT) {
goto again; /* treat this as a dead client */
+ } else if (ret == -EROFS) {
+ rbd_warn(rbd_dev, "peer will not release lock");
+ /*
+ * If this is rbd_add_acquire_lock(), we want to fail
+ * immediately -- reuse BLACKLISTED flag. Otherwise we
+ * want to block.
+ */
+ if (!(rbd_dev->disk->flags & GENHD_FL_UP)) {
+ set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags);
+ /* wake "rbd map --exclusive" process */
+ wake_requests(rbd_dev, false);
+ }
} else if (ret < 0) {
rbd_warn(rbd_dev, "error requesting lock: %d", ret);
mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
@@ -3490,16 +3502,15 @@ static bool rbd_release_lock(struct rbd_device *rbd_dev)
if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING)
return false;
- if (!rbd_unlock(rbd_dev))
- /*
- * Give others a chance to grab the lock - we would re-acquire
- * almost immediately if we got new IO during ceph_osdc_sync()
- * otherwise. We need to ack our own notifications, so this
- * lock_dwork will be requeued from rbd_wait_state_locked()
- * after wake_requests() in rbd_handle_released_lock().
- */
- cancel_delayed_work(&rbd_dev->lock_dwork);
-
+ rbd_unlock(rbd_dev);
+ /*
+ * Give others a chance to grab the lock - we would re-acquire
+ * almost immediately if we got new IO during ceph_osdc_sync()
+ * otherwise. We need to ack our own notifications, so this
+ * lock_dwork will be requeued from rbd_wait_state_locked()
+ * after wake_requests() in rbd_handle_released_lock().
+ */
+ cancel_delayed_work(&rbd_dev->lock_dwork);
return true;
}
@@ -3580,12 +3591,16 @@ static void rbd_handle_released_lock(struct rbd_device *rbd_dev, u8 struct_v,
up_read(&rbd_dev->lock_rwsem);
}
-static bool rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v,
- void **p)
+/*
+ * Returns result for ResponseMessage to be encoded (<= 0), or 1 if no
+ * ResponseMessage is needed.
+ */
+static int rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v,
+ void **p)
{
struct rbd_client_id my_cid = rbd_get_cid(rbd_dev);
struct rbd_client_id cid = { 0 };
- bool need_to_send;
+ int result = 1;
if (struct_v >= 2) {
cid.gid = ceph_decode_64(p);
@@ -3595,19 +3610,36 @@ static bool rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v,
dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
cid.handle);
if (rbd_cid_equal(&cid, &my_cid))
- return false;
+ return result;
down_read(&rbd_dev->lock_rwsem);
- need_to_send = __rbd_is_lock_owner(rbd_dev);
- if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) {
- if (!rbd_cid_equal(&rbd_dev->owner_cid, &rbd_empty_cid)) {
- dout("%s rbd_dev %p queueing unlock_work\n", __func__,
- rbd_dev);
- queue_work(rbd_dev->task_wq, &rbd_dev->unlock_work);
+ if (__rbd_is_lock_owner(rbd_dev)) {
+ if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED &&
+ rbd_cid_equal(&rbd_dev->owner_cid, &rbd_empty_cid))
+ goto out_unlock;
+
+ /*
+ * encode ResponseMessage(0) so the peer can detect
+ * a missing owner
+ */
+ result = 0;
+
+ if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) {
+ if (!rbd_dev->opts->exclusive) {
+ dout("%s rbd_dev %p queueing unlock_work\n",
+ __func__, rbd_dev);
+ queue_work(rbd_dev->task_wq,
+ &rbd_dev->unlock_work);
+ } else {
+ /* refuse to release the lock */
+ result = -EROFS;
+ }
}
}
+
+out_unlock:
up_read(&rbd_dev->lock_rwsem);
- return need_to_send;
+ return result;
}
static void __rbd_acknowledge_notify(struct rbd_device *rbd_dev,
@@ -3690,13 +3722,10 @@ static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie,
rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
break;
case RBD_NOTIFY_OP_REQUEST_LOCK:
- if (rbd_handle_request_lock(rbd_dev, struct_v, &p))
- /*
- * send ResponseMessage(0) back so the client
- * can detect a missing owner
- */
+ ret = rbd_handle_request_lock(rbd_dev, struct_v, &p);
+ if (ret <= 0)
rbd_acknowledge_notify_result(rbd_dev, notify_id,
- cookie, 0);
+ cookie, ret);
else
rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
break;
@@ -3821,24 +3850,51 @@ static void rbd_unregister_watch(struct rbd_device *rbd_dev)
ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
}
+/*
+ * lock_rwsem must be held for write
+ */
+static void rbd_reacquire_lock(struct rbd_device *rbd_dev)
+{
+ struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+ char cookie[32];
+ int ret;
+
+ WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED);
+
+ format_lock_cookie(rbd_dev, cookie);
+ ret = ceph_cls_set_cookie(osdc, &rbd_dev->header_oid,
+ &rbd_dev->header_oloc, RBD_LOCK_NAME,
+ CEPH_CLS_LOCK_EXCLUSIVE, rbd_dev->lock_cookie,
+ RBD_LOCK_TAG, cookie);
+ if (ret) {
+ if (ret != -EOPNOTSUPP)
+ rbd_warn(rbd_dev, "failed to update lock cookie: %d",
+ ret);
+
+ /*
+ * Lock cookie cannot be updated on older OSDs, so do
+ * a manual release and queue an acquire.
+ */
+ if (rbd_release_lock(rbd_dev))
+ queue_delayed_work(rbd_dev->task_wq,
+ &rbd_dev->lock_dwork, 0);
+ } else {
+ strcpy(rbd_dev->lock_cookie, cookie);
+ }
+}
+
static void rbd_reregister_watch(struct work_struct *work)
{
struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
struct rbd_device, watch_dwork);
- bool was_lock_owner = false;
- bool need_to_wake = false;
int ret;
dout("%s rbd_dev %p\n", __func__, rbd_dev);
- down_write(&rbd_dev->lock_rwsem);
- if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
- was_lock_owner = rbd_release_lock(rbd_dev);
-
mutex_lock(&rbd_dev->watch_mutex);
if (rbd_dev->watch_state != RBD_WATCH_STATE_ERROR) {
mutex_unlock(&rbd_dev->watch_mutex);
- goto out;
+ return;
}
ret = __rbd_register_watch(rbd_dev);
@@ -3846,36 +3902,28 @@ static void rbd_reregister_watch(struct work_struct *work)
rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
if (ret == -EBLACKLISTED || ret == -ENOENT) {
set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags);
- need_to_wake = true;
+ wake_requests(rbd_dev, true);
} else {
queue_delayed_work(rbd_dev->task_wq,
&rbd_dev->watch_dwork,
RBD_RETRY_DELAY);
}
mutex_unlock(&rbd_dev->watch_mutex);
- goto out;
+ return;
}
- need_to_wake = true;
rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
mutex_unlock(&rbd_dev->watch_mutex);
+ down_write(&rbd_dev->lock_rwsem);
+ if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
+ rbd_reacquire_lock(rbd_dev);
+ up_write(&rbd_dev->lock_rwsem);
+
ret = rbd_dev_refresh(rbd_dev);
if (ret)
rbd_warn(rbd_dev, "reregisteration refresh failed: %d", ret);
-
- if (was_lock_owner) {
- ret = rbd_try_lock(rbd_dev);
- if (ret)
- rbd_warn(rbd_dev, "reregisteration lock failed: %d",
- ret);
- }
-
-out:
- up_write(&rbd_dev->lock_rwsem);
- if (need_to_wake)
- wake_requests(rbd_dev, true);
}
/*
@@ -4034,10 +4082,6 @@ static void rbd_queue_workfn(struct work_struct *work)
if (op_type != OBJ_OP_READ) {
snapc = rbd_dev->header.snapc;
ceph_get_snap_context(snapc);
- must_be_locked = rbd_is_lock_supported(rbd_dev);
- } else {
- must_be_locked = rbd_dev->opts->lock_on_read &&
- rbd_is_lock_supported(rbd_dev);
}
up_read(&rbd_dev->header_rwsem);
@@ -4048,14 +4092,20 @@ static void rbd_queue_workfn(struct work_struct *work)
goto err_rq;
}
+ must_be_locked =
+ (rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK) &&
+ (op_type != OBJ_OP_READ || rbd_dev->opts->lock_on_read);
if (must_be_locked) {
down_read(&rbd_dev->lock_rwsem);
if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED &&
- !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags))
+ !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
+ if (rbd_dev->opts->exclusive) {
+ rbd_warn(rbd_dev, "exclusive lock required");
+ result = -EROFS;
+ goto err_unlock;
+ }
rbd_wait_state_locked(rbd_dev);
-
- WARN_ON((rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) ^
- !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags));
+ }
if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
result = -EBLACKLISTED;
goto err_unlock;
@@ -4114,19 +4164,10 @@ static int rbd_queue_rq(struct blk_mq_hw_ctx *hctx,
static void rbd_free_disk(struct rbd_device *rbd_dev)
{
- struct gendisk *disk = rbd_dev->disk;
-
- if (!disk)
- return;
-
+ blk_cleanup_queue(rbd_dev->disk->queue);
+ blk_mq_free_tag_set(&rbd_dev->tag_set);
+ put_disk(rbd_dev->disk);
rbd_dev->disk = NULL;
- if (disk->flags & GENHD_FL_UP) {
- del_gendisk(disk);
- if (disk->queue)
- blk_cleanup_queue(disk->queue);
- blk_mq_free_tag_set(&rbd_dev->tag_set);
- }
- put_disk(disk);
}
static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
@@ -4383,8 +4424,12 @@ static int rbd_init_disk(struct rbd_device *rbd_dev)
if (!ceph_test_opt(rbd_dev->rbd_client->client, NOCRC))
q->backing_dev_info->capabilities |= BDI_CAP_STABLE_WRITES;
+ /*
+ * disk_release() expects a queue ref from add_disk() and will
+ * put it. Hold an extra ref until add_disk() is called.
+ */
+ WARN_ON(!blk_get_queue(q));
disk->queue = q;
-
q->queuedata = rbd_dev;
rbd_dev->disk = disk;
@@ -5624,6 +5669,7 @@ static int rbd_add_parse_args(const char *buf,
rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
rbd_opts->queue_depth = RBD_QUEUE_DEPTH_DEFAULT;
rbd_opts->lock_on_read = RBD_LOCK_ON_READ_DEFAULT;
+ rbd_opts->exclusive = RBD_EXCLUSIVE_DEFAULT;
copts = ceph_parse_options(options, mon_addrs,
mon_addrs + mon_addrs_size - 1,
@@ -5682,6 +5728,33 @@ again:
return ret;
}
+static void rbd_dev_image_unlock(struct rbd_device *rbd_dev)
+{
+ down_write(&rbd_dev->lock_rwsem);
+ if (__rbd_is_lock_owner(rbd_dev))
+ rbd_unlock(rbd_dev);
+ up_write(&rbd_dev->lock_rwsem);
+}
+
+static int rbd_add_acquire_lock(struct rbd_device *rbd_dev)
+{
+ if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK)) {
+ rbd_warn(rbd_dev, "exclusive-lock feature is not enabled");
+ return -EINVAL;
+ }
+
+ /* FIXME: "rbd map --exclusive" should be in interruptible */
+ down_read(&rbd_dev->lock_rwsem);
+ rbd_wait_state_locked(rbd_dev);
+ up_read(&rbd_dev->lock_rwsem);
+ if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
+ rbd_warn(rbd_dev, "failed to acquire exclusive lock");
+ return -EROFS;
+ }
+
+ return 0;
+}
+
/*
* An rbd format 2 image has a unique identifier, distinct from the
* name given to it by the user. Internally, that identifier is
@@ -5873,6 +5946,15 @@ out_err:
return ret;
}
+static void rbd_dev_device_release(struct rbd_device *rbd_dev)
+{
+ clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
+ rbd_dev_mapping_clear(rbd_dev);
+ rbd_free_disk(rbd_dev);
+ if (!single_major)
+ unregister_blkdev(rbd_dev->major, rbd_dev->name);
+}
+
/*
* rbd_dev->header_rwsem must be locked for write and will be unlocked
* upon return.
@@ -5908,26 +5990,13 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
set_disk_ro(rbd_dev->disk, rbd_dev->mapping.read_only);
- dev_set_name(&rbd_dev->dev, "%d", rbd_dev->dev_id);
- ret = device_add(&rbd_dev->dev);
+ ret = dev_set_name(&rbd_dev->dev, "%d", rbd_dev->dev_id);
if (ret)
goto err_out_mapping;
- /* Everything's ready. Announce the disk to the world. */
-
set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
up_write(&rbd_dev->header_rwsem);
-
- spin_lock(&rbd_dev_list_lock);
- list_add_tail(&rbd_dev->node, &rbd_dev_list);
- spin_unlock(&rbd_dev_list_lock);
-
- add_disk(rbd_dev->disk);
- pr_info("%s: capacity %llu features 0x%llx\n", rbd_dev->disk->disk_name,
- (unsigned long long)get_capacity(rbd_dev->disk) << SECTOR_SHIFT,
- rbd_dev->header.features);
-
- return ret;
+ return 0;
err_out_mapping:
rbd_dev_mapping_clear(rbd_dev);
@@ -5962,11 +6031,11 @@ static int rbd_dev_header_name(struct rbd_device *rbd_dev)
static void rbd_dev_image_release(struct rbd_device *rbd_dev)
{
rbd_dev_unprobe(rbd_dev);
+ if (rbd_dev->opts)
+ rbd_unregister_watch(rbd_dev);
rbd_dev->image_format = 0;
kfree(rbd_dev->spec->image_id);
rbd_dev->spec->image_id = NULL;
-
- rbd_dev_destroy(rbd_dev);
}
/*
@@ -6126,22 +6195,43 @@ static ssize_t do_rbd_add(struct bus_type *bus,
rbd_dev->mapping.read_only = read_only;
rc = rbd_dev_device_setup(rbd_dev);
- if (rc) {
- /*
- * rbd_unregister_watch() can't be moved into
- * rbd_dev_image_release() without refactoring, see
- * commit 1f3ef78861ac.
- */
- rbd_unregister_watch(rbd_dev);
- rbd_dev_image_release(rbd_dev);
- goto out;
+ if (rc)
+ goto err_out_image_probe;
+
+ if (rbd_dev->opts->exclusive) {
+ rc = rbd_add_acquire_lock(rbd_dev);
+ if (rc)
+ goto err_out_device_setup;
}
+ /* Everything's ready. Announce the disk to the world. */
+
+ rc = device_add(&rbd_dev->dev);
+ if (rc)
+ goto err_out_image_lock;
+
+ add_disk(rbd_dev->disk);
+ /* see rbd_init_disk() */
+ blk_put_queue(rbd_dev->disk->queue);
+
+ spin_lock(&rbd_dev_list_lock);
+ list_add_tail(&rbd_dev->node, &rbd_dev_list);
+ spin_unlock(&rbd_dev_list_lock);
+
+ pr_info("%s: capacity %llu features 0x%llx\n", rbd_dev->disk->disk_name,
+ (unsigned long long)get_capacity(rbd_dev->disk) << SECTOR_SHIFT,
+ rbd_dev->header.features);
rc = count;
out:
module_put(THIS_MODULE);
return rc;
+err_out_image_lock:
+ rbd_dev_image_unlock(rbd_dev);
+err_out_device_setup:
+ rbd_dev_device_release(rbd_dev);
+err_out_image_probe:
+ rbd_dev_image_release(rbd_dev);
err_out_rbd_dev:
rbd_dev_destroy(rbd_dev);
err_out_client:
@@ -6169,21 +6259,6 @@ static ssize_t rbd_add_single_major(struct bus_type *bus,
return do_rbd_add(bus, buf, count);
}
-static void rbd_dev_device_release(struct rbd_device *rbd_dev)
-{
- rbd_free_disk(rbd_dev);
-
- spin_lock(&rbd_dev_list_lock);
- list_del_init(&rbd_dev->node);
- spin_unlock(&rbd_dev_list_lock);
-
- clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
- device_del(&rbd_dev->dev);
- rbd_dev_mapping_clear(rbd_dev);
- if (!single_major)
- unregister_blkdev(rbd_dev->major, rbd_dev->name);
-}
-
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
{
while (rbd_dev->parent) {
@@ -6201,6 +6276,7 @@ static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
}
rbd_assert(second);
rbd_dev_image_release(second);
+ rbd_dev_destroy(second);
first->parent = NULL;
first->parent_overlap = 0;
@@ -6269,21 +6345,16 @@ static ssize_t do_rbd_remove(struct bus_type *bus,
blk_set_queue_dying(rbd_dev->disk->queue);
}
- down_write(&rbd_dev->lock_rwsem);
- if (__rbd_is_lock_owner(rbd_dev))
- rbd_unlock(rbd_dev);
- up_write(&rbd_dev->lock_rwsem);
- rbd_unregister_watch(rbd_dev);
+ del_gendisk(rbd_dev->disk);
+ spin_lock(&rbd_dev_list_lock);
+ list_del_init(&rbd_dev->node);
+ spin_unlock(&rbd_dev_list_lock);
+ device_del(&rbd_dev->dev);
- /*
- * Don't free anything from rbd_dev->disk until after all
- * notifies are completely processed. Otherwise
- * rbd_bus_del_dev() will race with rbd_watch_cb(), resulting
- * in a potential use after free of rbd_dev->disk or rbd_dev.
- */
+ rbd_dev_image_unlock(rbd_dev);
rbd_dev_device_release(rbd_dev);
rbd_dev_image_release(rbd_dev);
-
+ rbd_dev_destroy(rbd_dev);
return count;
}