diff options
author | Linus Torvalds <torvalds@linux-foundation.org> | 2019-07-18 21:05:25 +0300 |
---|---|---|
committer | Linus Torvalds <torvalds@linux-foundation.org> | 2019-07-18 21:05:25 +0300 |
commit | d9b9c893048e9d308a833619f0866f1f52778cf5 (patch) | |
tree | 29090d6871a39fdf35b6e5b22fe49750e9cf7bb3 /drivers/block/rbd.c | |
parent | 0fe49f70a08d7d25acee3b066a88c654fea26121 (diff) | |
parent | d31d07b97a5e76f41e00eb81dcca740e84aa7782 (diff) | |
download | linux-d9b9c893048e9d308a833619f0866f1f52778cf5.tar.xz |
Merge tag 'ceph-for-5.3-rc1' of git://github.com/ceph/ceph-client
Pull ceph updates from Ilya Dryomov:
"Lots of exciting things this time!
- support for rbd object-map and fast-diff features (myself). This
will speed up reads, discards and things like snap diffs on sparse
images.
- ceph.snap.btime vxattr to expose snapshot creation time (David
Disseldorp). This will be used to integrate with "Restore Previous
Versions" feature added in Windows 7 for folks who reexport ceph
through SMB.
- security xattrs for ceph (Zheng Yan). Only selinux is supported for
now due to the limitations of ->dentry_init_security().
- support for MSG_ADDR2, FS_BTIME and FS_CHANGE_ATTR features (Jeff
Layton). This is actually a single feature bit which was missing
because of the filesystem pieces. With this in, the kernel client
will finally be reported as "luminous" by "ceph features" -- it is
still being reported as "jewel" even though all required Luminous
features were implemented in 4.13.
- stop NULL-terminating ceph vxattrs (Jeff Layton). The convention
with xattrs is to not terminate and this was causing
inconsistencies with ceph-fuse.
- change filesystem time granularity from 1 us to 1 ns, again fixing
an inconsistency with ceph-fuse (Luis Henriques).
On top of this there are some additional dentry name handling and cap
flushing fixes from Zheng. Finally, Jeff is formally taking over for
Zheng as the filesystem maintainer"
* tag 'ceph-for-5.3-rc1' of git://github.com/ceph/ceph-client: (71 commits)
ceph: fix end offset in truncate_inode_pages_range call
ceph: use generic_delete_inode() for ->drop_inode
ceph: use ceph_evict_inode to cleanup inode's resource
ceph: initialize superblock s_time_gran to 1
MAINTAINERS: take over for Zheng as CephFS kernel client maintainer
rbd: setallochint only if object doesn't exist
rbd: support for object-map and fast-diff
rbd: call rbd_dev_mapping_set() from rbd_dev_image_probe()
libceph: export osd_req_op_data() macro
libceph: change ceph_osdc_call() to take page vector for response
libceph: bump CEPH_MSG_MAX_DATA_LEN (again)
rbd: new exclusive lock wait/wake code
rbd: quiescing lock should wait for image requests
rbd: lock should be quiesced on reacquire
rbd: introduce copyup state machine
rbd: rename rbd_obj_setup_*() to rbd_obj_init_*()
rbd: move OSD request allocation into object request state machines
rbd: factor out __rbd_osd_setup_discard_ops()
rbd: factor out rbd_osd_setup_copyup()
rbd: introduce obj_req->osd_reqs list
...
Diffstat (limited to 'drivers/block/rbd.c')
-rw-r--r-- | drivers/block/rbd.c | 2188 |
1 files changed, 1600 insertions, 588 deletions
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index e5009a34f9c2..3327192bb71f 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -115,6 +115,8 @@ static int atomic_dec_return_safe(atomic_t *v) #define RBD_FEATURE_LAYERING (1ULL<<0) #define RBD_FEATURE_STRIPINGV2 (1ULL<<1) #define RBD_FEATURE_EXCLUSIVE_LOCK (1ULL<<2) +#define RBD_FEATURE_OBJECT_MAP (1ULL<<3) +#define RBD_FEATURE_FAST_DIFF (1ULL<<4) #define RBD_FEATURE_DEEP_FLATTEN (1ULL<<5) #define RBD_FEATURE_DATA_POOL (1ULL<<7) #define RBD_FEATURE_OPERATIONS (1ULL<<8) @@ -122,6 +124,8 @@ static int atomic_dec_return_safe(atomic_t *v) #define RBD_FEATURES_ALL (RBD_FEATURE_LAYERING | \ RBD_FEATURE_STRIPINGV2 | \ RBD_FEATURE_EXCLUSIVE_LOCK | \ + RBD_FEATURE_OBJECT_MAP | \ + RBD_FEATURE_FAST_DIFF | \ RBD_FEATURE_DEEP_FLATTEN | \ RBD_FEATURE_DATA_POOL | \ RBD_FEATURE_OPERATIONS) @@ -203,6 +207,11 @@ struct rbd_client { struct list_head node; }; +struct pending_result { + int result; /* first nonzero result */ + int num_pending; +}; + struct rbd_img_request; enum obj_request_type { @@ -219,6 +228,18 @@ enum obj_operation_type { OBJ_OP_ZEROOUT, }; +#define RBD_OBJ_FLAG_DELETION (1U << 0) +#define RBD_OBJ_FLAG_COPYUP_ENABLED (1U << 1) +#define RBD_OBJ_FLAG_COPYUP_ZEROS (1U << 2) +#define RBD_OBJ_FLAG_MAY_EXIST (1U << 3) +#define RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT (1U << 4) + +enum rbd_obj_read_state { + RBD_OBJ_READ_START = 1, + RBD_OBJ_READ_OBJECT, + RBD_OBJ_READ_PARENT, +}; + /* * Writes go through the following state machine to deal with * layering: @@ -245,17 +266,28 @@ enum obj_operation_type { * even if there is a parent). */ enum rbd_obj_write_state { - RBD_OBJ_WRITE_FLAT = 1, - RBD_OBJ_WRITE_GUARD, - RBD_OBJ_WRITE_READ_FROM_PARENT, - RBD_OBJ_WRITE_COPYUP_EMPTY_SNAPC, - RBD_OBJ_WRITE_COPYUP_OPS, + RBD_OBJ_WRITE_START = 1, + RBD_OBJ_WRITE_PRE_OBJECT_MAP, + RBD_OBJ_WRITE_OBJECT, + __RBD_OBJ_WRITE_COPYUP, + RBD_OBJ_WRITE_COPYUP, + RBD_OBJ_WRITE_POST_OBJECT_MAP, +}; + +enum rbd_obj_copyup_state { + RBD_OBJ_COPYUP_START = 1, + RBD_OBJ_COPYUP_READ_PARENT, + __RBD_OBJ_COPYUP_OBJECT_MAPS, + RBD_OBJ_COPYUP_OBJECT_MAPS, + __RBD_OBJ_COPYUP_WRITE_OBJECT, + RBD_OBJ_COPYUP_WRITE_OBJECT, }; struct rbd_obj_request { struct ceph_object_extent ex; + unsigned int flags; /* RBD_OBJ_FLAG_* */ union { - bool tried_parent; /* for reads */ + enum rbd_obj_read_state read_state; /* for reads */ enum rbd_obj_write_state write_state; /* for writes */ }; @@ -271,14 +303,15 @@ struct rbd_obj_request { u32 bvec_idx; }; }; + + enum rbd_obj_copyup_state copyup_state; struct bio_vec *copyup_bvecs; u32 copyup_bvec_count; - struct ceph_osd_request *osd_req; - - u64 xferred; /* bytes transferred */ - int result; + struct list_head osd_reqs; /* w/ r_private_item */ + struct mutex state_mutex; + struct pending_result pending; struct kref kref; }; @@ -287,11 +320,19 @@ enum img_req_flags { IMG_REQ_LAYERED, /* ENOENT handling: normal = 0, layered = 1 */ }; +enum rbd_img_state { + RBD_IMG_START = 1, + RBD_IMG_EXCLUSIVE_LOCK, + __RBD_IMG_OBJECT_REQUESTS, + RBD_IMG_OBJECT_REQUESTS, +}; + struct rbd_img_request { struct rbd_device *rbd_dev; enum obj_operation_type op_type; enum obj_request_type data_type; unsigned long flags; + enum rbd_img_state state; union { u64 snap_id; /* for reads */ struct ceph_snap_context *snapc; /* for writes */ @@ -300,13 +341,14 @@ struct rbd_img_request { struct request *rq; /* block request */ struct rbd_obj_request *obj_request; /* obj req initiator */ }; - spinlock_t completion_lock; - u64 xferred;/* aggregate bytes transferred */ - int result; /* first nonzero obj_request result */ + struct list_head lock_item; struct list_head object_extents; /* obj_req.ex structs */ - u32 pending_count; + struct mutex state_mutex; + struct pending_result pending; + struct work_struct work; + int work_result; struct kref kref; }; @@ -380,7 +422,17 @@ struct rbd_device { struct work_struct released_lock_work; struct delayed_work lock_dwork; struct work_struct unlock_work; - wait_queue_head_t lock_waitq; + spinlock_t lock_lists_lock; + struct list_head acquiring_list; + struct list_head running_list; + struct completion acquire_wait; + int acquire_err; + struct completion releasing_wait; + + spinlock_t object_map_lock; + u8 *object_map; + u64 object_map_size; /* in objects */ + u64 object_map_flags; struct workqueue_struct *task_wq; @@ -408,12 +460,10 @@ struct rbd_device { * Flag bits for rbd_dev->flags: * - REMOVING (which is coupled with rbd_dev->open_count) is protected * by rbd_dev->lock - * - BLACKLISTED is protected by rbd_dev->lock_rwsem */ enum rbd_dev_flags { RBD_DEV_FLAG_EXISTS, /* mapped snapshot has not been deleted */ RBD_DEV_FLAG_REMOVING, /* this mapping is being removed */ - RBD_DEV_FLAG_BLACKLISTED, /* our ceph_client is blacklisted */ }; static DEFINE_MUTEX(client_mutex); /* Serialize client creation */ @@ -466,6 +516,8 @@ static int minor_to_rbd_dev_id(int minor) static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev) { + lockdep_assert_held(&rbd_dev->lock_rwsem); + return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED || rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING; } @@ -583,6 +635,26 @@ static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id, u8 *order, u64 *snap_size); static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id, u64 *snap_features); +static int rbd_dev_v2_get_flags(struct rbd_device *rbd_dev); + +static void rbd_obj_handle_request(struct rbd_obj_request *obj_req, int result); +static void rbd_img_handle_request(struct rbd_img_request *img_req, int result); + +/* + * Return true if nothing else is pending. + */ +static bool pending_result_dec(struct pending_result *pending, int *result) +{ + rbd_assert(pending->num_pending > 0); + + if (*result && !pending->result) + pending->result = *result; + if (--pending->num_pending) + return false; + + *result = pending->result; + return true; +} static int rbd_open(struct block_device *bdev, fmode_t mode) { @@ -1317,6 +1389,8 @@ static void zero_bvecs(struct ceph_bvec_iter *bvec_pos, u32 off, u32 bytes) static void rbd_obj_zero_range(struct rbd_obj_request *obj_req, u32 off, u32 bytes) { + dout("%s %p data buf %u~%u\n", __func__, obj_req, off, bytes); + switch (obj_req->img_request->data_type) { case OBJ_REQUEST_BIO: zero_bios(&obj_req->bio_pos, off, bytes); @@ -1339,13 +1413,6 @@ static void rbd_obj_request_put(struct rbd_obj_request *obj_request) kref_put(&obj_request->kref, rbd_obj_request_destroy); } -static void rbd_img_request_get(struct rbd_img_request *img_request) -{ - dout("%s: img %p (was %d)\n", __func__, img_request, - kref_read(&img_request->kref)); - kref_get(&img_request->kref); -} - static void rbd_img_request_destroy(struct kref *kref); static void rbd_img_request_put(struct rbd_img_request *img_request) { @@ -1362,7 +1429,6 @@ static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request, /* Image request now owns object's original reference */ obj_request->img_request = img_request; - img_request->pending_count++; dout("%s: img %p obj %p\n", __func__, img_request, obj_request); } @@ -1375,13 +1441,13 @@ static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request, rbd_obj_request_put(obj_request); } -static void rbd_obj_request_submit(struct rbd_obj_request *obj_request) +static void rbd_osd_submit(struct ceph_osd_request *osd_req) { - struct ceph_osd_request *osd_req = obj_request->osd_req; + struct rbd_obj_request *obj_req = osd_req->r_priv; - dout("%s %p object_no %016llx %llu~%llu osd_req %p\n", __func__, - obj_request, obj_request->ex.oe_objno, obj_request->ex.oe_off, - obj_request->ex.oe_len, osd_req); + dout("%s osd_req %p for obj_req %p objno %llu %llu~%llu\n", + __func__, osd_req, obj_req, obj_req->ex.oe_objno, + obj_req->ex.oe_off, obj_req->ex.oe_len); ceph_osdc_start_request(osd_req->r_osdc, osd_req, false); } @@ -1457,41 +1523,38 @@ static bool rbd_img_is_write(struct rbd_img_request *img_req) } } -static void rbd_obj_handle_request(struct rbd_obj_request *obj_req); - static void rbd_osd_req_callback(struct ceph_osd_request *osd_req) { struct rbd_obj_request *obj_req = osd_req->r_priv; + int result; dout("%s osd_req %p result %d for obj_req %p\n", __func__, osd_req, osd_req->r_result, obj_req); - rbd_assert(osd_req == obj_req->osd_req); - obj_req->result = osd_req->r_result < 0 ? osd_req->r_result : 0; - if (!obj_req->result && !rbd_img_is_write(obj_req->img_request)) - obj_req->xferred = osd_req->r_result; + /* + * Writes aren't allowed to return a data payload. In some + * guarded write cases (e.g. stat + zero on an empty object) + * a stat response makes it through, but we don't care. + */ + if (osd_req->r_result > 0 && rbd_img_is_write(obj_req->img_request)) + result = 0; else - /* - * Writes aren't allowed to return a data payload. In some - * guarded write cases (e.g. stat + zero on an empty object) - * a stat response makes it through, but we don't care. - */ - obj_req->xferred = 0; + result = osd_req->r_result; - rbd_obj_handle_request(obj_req); + rbd_obj_handle_request(obj_req, result); } -static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request) +static void rbd_osd_format_read(struct ceph_osd_request *osd_req) { - struct ceph_osd_request *osd_req = obj_request->osd_req; + struct rbd_obj_request *obj_request = osd_req->r_priv; osd_req->r_flags = CEPH_OSD_FLAG_READ; osd_req->r_snapid = obj_request->img_request->snap_id; } -static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request) +static void rbd_osd_format_write(struct ceph_osd_request *osd_req) { - struct ceph_osd_request *osd_req = obj_request->osd_req; + struct rbd_obj_request *obj_request = osd_req->r_priv; osd_req->r_flags = CEPH_OSD_FLAG_WRITE; ktime_get_real_ts64(&osd_req->r_mtime); @@ -1499,19 +1562,21 @@ static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request) } static struct ceph_osd_request * -__rbd_osd_req_create(struct rbd_obj_request *obj_req, - struct ceph_snap_context *snapc, unsigned int num_ops) +__rbd_obj_add_osd_request(struct rbd_obj_request *obj_req, + struct ceph_snap_context *snapc, int num_ops) { struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; struct ceph_osd_request *req; const char *name_format = rbd_dev->image_format == 1 ? RBD_V1_DATA_FORMAT : RBD_V2_DATA_FORMAT; + int ret; req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false, GFP_NOIO); if (!req) - return NULL; + return ERR_PTR(-ENOMEM); + list_add_tail(&req->r_private_item, &obj_req->osd_reqs); req->r_callback = rbd_osd_req_callback; req->r_priv = obj_req; @@ -1522,27 +1587,20 @@ __rbd_osd_req_create(struct rbd_obj_request *obj_req, ceph_oloc_copy(&req->r_base_oloc, &rbd_dev->header_oloc); req->r_base_oloc.pool = rbd_dev->layout.pool_id; - if (ceph_oid_aprintf(&req->r_base_oid, GFP_NOIO, name_format, - rbd_dev->header.object_prefix, obj_req->ex.oe_objno)) - goto err_req; + ret = ceph_oid_aprintf(&req->r_base_oid, GFP_NOIO, name_format, + rbd_dev->header.object_prefix, + obj_req->ex.oe_objno); + if (ret) + return ERR_PTR(ret); return req; - -err_req: - ceph_osdc_put_request(req); - return NULL; } static struct ceph_osd_request * -rbd_osd_req_create(struct rbd_obj_request *obj_req, unsigned int num_ops) +rbd_obj_add_osd_request(struct rbd_obj_request *obj_req, int num_ops) { - return __rbd_osd_req_create(obj_req, obj_req->img_request->snapc, - num_ops); -} - -static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req) -{ - ceph_osdc_put_request(osd_req); + return __rbd_obj_add_osd_request(obj_req, obj_req->img_request->snapc, + num_ops); } static struct rbd_obj_request *rbd_obj_request_create(void) @@ -1554,6 +1612,8 @@ static struct rbd_obj_request *rbd_obj_request_create(void) return NULL; ceph_object_extent_init(&obj_request->ex); + INIT_LIST_HEAD(&obj_request->osd_reqs); + mutex_init(&obj_request->state_mutex); kref_init(&obj_request->kref); dout("%s %p\n", __func__, obj_request); @@ -1563,14 +1623,19 @@ static struct rbd_obj_request *rbd_obj_request_create(void) static void rbd_obj_request_destroy(struct kref *kref) { struct rbd_obj_request *obj_request; + struct ceph_osd_request *osd_req; u32 i; obj_request = container_of(kref, struct rbd_obj_request, kref); dout("%s: obj %p\n", __func__, obj_request); - if (obj_request->osd_req) - rbd_osd_req_destroy(obj_request->osd_req); + while (!list_empty(&obj_request->osd_reqs)) { + osd_req = list_first_entry(&obj_request->osd_reqs, + struct ceph_osd_request, r_private_item); + list_del_init(&osd_req->r_private_item); + ceph_osdc_put_request(osd_req); + } switch (obj_request->img_request->data_type) { case OBJ_REQUEST_NODATA: @@ -1684,8 +1749,9 @@ static struct rbd_img_request *rbd_img_request_create( if (rbd_dev_parent_get(rbd_dev)) img_request_layered_set(img_request); - spin_lock_init(&img_request->completion_lock); + INIT_LIST_HEAD(&img_request->lock_item); INIT_LIST_HEAD(&img_request->object_extents); + mutex_init(&img_request->state_mutex); kref_init(&img_request->kref); dout("%s: rbd_dev %p %s -> img %p\n", __func__, rbd_dev, @@ -1703,6 +1769,7 @@ static void rbd_img_request_destroy(struct kref *kref) dout("%s: img %p\n", __func__, img_request); + WARN_ON(!list_empty(&img_request->lock_item)); for_each_obj_request_safe(img_request, obj_request, next_obj_request) rbd_img_obj_request_del(img_request, obj_request); @@ -1717,6 +1784,466 @@ static void rbd_img_request_destroy(struct kref *kref) kmem_cache_free(rbd_img_request_cache, img_request); } +#define BITS_PER_OBJ 2 +#define OBJS_PER_BYTE (BITS_PER_BYTE / BITS_PER_OBJ) +#define OBJ_MASK ((1 << BITS_PER_OBJ) - 1) + +static void __rbd_object_map_index(struct rbd_device *rbd_dev, u64 objno, + u64 *index, u8 *shift) +{ + u32 off; + + rbd_assert(objno < rbd_dev->object_map_size); + *index = div_u64_rem(objno, OBJS_PER_BYTE, &off); + *shift = (OBJS_PER_BYTE - off - 1) * BITS_PER_OBJ; +} + +static u8 __rbd_object_map_get(struct rbd_device *rbd_dev, u64 objno) +{ + u64 index; + u8 shift; + + lockdep_assert_held(&rbd_dev->object_map_lock); + __rbd_object_map_index(rbd_dev, objno, &index, &shift); + return (rbd_dev->object_map[index] >> shift) & OBJ_MASK; +} + +static void __rbd_object_map_set(struct rbd_device *rbd_dev, u64 objno, u8 val) +{ + u64 index; + u8 shift; + u8 *p; + + lockdep_assert_held(&rbd_dev->object_map_lock); + rbd_assert(!(val & ~OBJ_MASK)); + + __rbd_object_map_index(rbd_dev, objno, &index, &shift); + p = &rbd_dev->object_map[index]; + *p = (*p & ~(OBJ_MASK << shift)) | (val << shift); +} + +static u8 rbd_object_map_get(struct rbd_device *rbd_dev, u64 objno) +{ + u8 state; + + spin_lock(&rbd_dev->object_map_lock); + state = __rbd_object_map_get(rbd_dev, objno); + spin_unlock(&rbd_dev->object_map_lock); + return state; +} + +static bool use_object_map(struct rbd_device *rbd_dev) +{ + return ((rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP) && + !(rbd_dev->object_map_flags & RBD_FLAG_OBJECT_MAP_INVALID)); +} + +static bool rbd_object_map_may_exist(struct rbd_device *rbd_dev, u64 objno) +{ + u8 state; + + /* fall back to default logic if object map is disabled or invalid */ + if (!use_object_map(rbd_dev)) + return true; + + state = rbd_object_map_get(rbd_dev, objno); + return state != OBJECT_NONEXISTENT; +} + +static void rbd_object_map_name(struct rbd_device *rbd_dev, u64 snap_id, + struct ceph_object_id *oid) +{ + if (snap_id == CEPH_NOSNAP) + ceph_oid_printf(oid, "%s%s", RBD_OBJECT_MAP_PREFIX, + rbd_dev->spec->image_id); + else + ceph_oid_printf(oid, "%s%s.%016llx", RBD_OBJECT_MAP_PREFIX, + rbd_dev->spec->image_id, snap_id); +} + +static int rbd_object_map_lock(struct rbd_device *rbd_dev) +{ + struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; + CEPH_DEFINE_OID_ONSTACK(oid); + u8 lock_type; + char *lock_tag; + struct ceph_locker *lockers; + u32 num_lockers; + bool broke_lock = false; + int ret; + + rbd_object_map_name(rbd_dev, CEPH_NOSNAP, &oid); + +again: + ret = ceph_cls_lock(osdc, &oid, &rbd_dev->header_oloc, RBD_LOCK_NAME, + CEPH_CLS_LOCK_EXCLUSIVE, "", "", "", 0); + if (ret != -EBUSY || broke_lock) { + if (ret == -EEXIST) + ret = 0; /* already locked by myself */ + if (ret) + rbd_warn(rbd_dev, "failed to lock object map: %d", ret); + return ret; + } + + ret = ceph_cls_lock_info(osdc, &oid, &rbd_dev->header_oloc, + RBD_LOCK_NAME, &lock_type, &lock_tag, + &lockers, &num_lockers); + if (ret) { + if (ret == -ENOENT) + goto again; + + rbd_warn(rbd_dev, "failed to get object map lockers: %d", ret); + return ret; + } + + kfree(lock_tag); + if (num_lockers == 0) + goto again; + + rbd_warn(rbd_dev, "breaking object map lock owned by %s%llu", + ENTITY_NAME(lockers[0].id.name)); + + ret = ceph_cls_break_lock(osdc, &oid, &rbd_dev->header_oloc, + RBD_LOCK_NAME, lockers[0].id.cookie, + &lockers[0].id.name); + ceph_free_lockers(lockers, num_lockers); + if (ret) { + if (ret == -ENOENT) + goto again; + + rbd_warn(rbd_dev, "failed to break object map lock: %d", ret); + return ret; + } + + broke_lock = true; + goto again; +} + +static void rbd_object_map_unlock(struct rbd_device *rbd_dev) +{ + struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; + CEPH_DEFINE_OID_ONSTACK(oid); + int ret; + + rbd_object_map_name(rbd_dev, CEPH_NOSNAP, &oid); + + ret = ceph_cls_unlock(osdc, &oid, &rbd_dev->header_oloc, RBD_LOCK_NAME, + ""); + if (ret && ret != -ENOENT) + rbd_warn(rbd_dev, "failed to unlock object map: %d", ret); +} + +static int decode_object_map_header(void **p, void *end, u64 *object_map_size) +{ + u8 struct_v; + u32 struct_len; + u32 header_len; + void *header_end; + int ret; + + ceph_decode_32_safe(p, end, header_len, e_inval); + header_end = *p + header_len; + + ret = ceph_start_decoding(p, end, 1, "BitVector header", &struct_v, + &struct_len); + if (ret) + return ret; + + ceph_decode_64_safe(p, end, *object_map_size, e_inval); + + *p = header_end; + return 0; + +e_inval: + return -EINVAL; +} + +static int __rbd_object_map_load(struct rbd_device *rbd_dev) +{ + struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; + CEPH_DEFINE_OID_ONSTACK(oid); + struct page **pages; + void *p, *end; + size_t reply_len; + u64 num_objects; + u64 object_map_bytes; + u64 object_map_size; + int num_pages; + int ret; + + rbd_assert(!rbd_dev->object_map && !rbd_dev->object_map_size); + + num_objects = ceph_get_num_objects(&rbd_dev->layout, + rbd_dev->mapping.size); + object_map_bytes = DIV_ROUND_UP_ULL(num_objects * BITS_PER_OBJ, + BITS_PER_BYTE); + num_pages = calc_pages_for(0, object_map_bytes) + 1; + pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL); + if (IS_ERR(pages)) + return PTR_ERR(pages); + + reply_len = num_pages * PAGE_SIZE; + rbd_object_map_name(rbd_dev, rbd_dev->spec->snap_id, &oid); + ret = ceph_osdc_call(osdc, &oid, &rbd_dev->header_oloc, + "rbd", "object_map_load", CEPH_OSD_FLAG_READ, + NULL, 0, pages, &reply_len); + if (ret) + goto out; + + p = page_address(pages[0]); + end = p + min(reply_len, (size_t)PAGE_SIZE); + ret = decode_object_map_header(&p, end, &object_map_size); + if (ret) + goto out; + + if (object_map_size != num_objects) { + rbd_warn(rbd_dev, "object map size mismatch: %llu vs %llu", + object_map_size, num_objects); + ret = -EINVAL; + goto out; + } + + if (offset_in_page(p) + object_map_bytes > reply_len) { + ret = -EINVAL; + goto out; + } + + rbd_dev->object_map = kvmalloc(object_map_bytes, GFP_KERNEL); + if (!rbd_dev->object_map) { + ret = -ENOMEM; + goto out; + } + + rbd_dev->object_map_size = object_map_size; + ceph_copy_from_page_vector(pages, rbd_dev->object_map, + offset_in_page(p), object_map_bytes); + +out: + ceph_release_page_vector(pages, num_pages); + return ret; +} + +static void rbd_object_map_free(struct rbd_device *rbd_dev) +{ + kvfree(rbd_dev->object_map); + rbd_dev->object_map = NULL; + rbd_dev->object_map_size = 0; +} + +static int rbd_object_map_load(struct rbd_device *rbd_dev) +{ + int ret; + + ret = __rbd_object_map_load(rbd_dev); + if (ret) + return ret; + + ret = rbd_dev_v2_get_flags(rbd_dev); + if (ret) { + rbd_object_map_free(rbd_dev); + return ret; + } + + if (rbd_dev->object_map_flags & RBD_FLAG_OBJECT_MAP_INVALID) + rbd_warn(rbd_dev, "object map is invalid"); + + return 0; +} + +static int rbd_object_map_open(struct rbd_device *rbd_dev) +{ + int ret; + + ret = rbd_object_map_lock(rbd_dev); + if (ret) + return ret; + + ret = rbd_object_map_load(rbd_dev); + if (ret) { + rbd_object_map_unlock(rbd_dev); + return ret; + } + + return 0; +} + +static void rbd_object_map_close(struct rbd_device *rbd_dev) +{ + rbd_object_map_free(rbd_dev); + rbd_object_map_unlock(rbd_dev); +} + +/* + * This function needs snap_id (or more precisely just something to + * distinguish between HEAD and snapshot object maps), new_state and + * current_state that were passed to rbd_object_map_update(). + * + * To avoid allocating and stashing a context we piggyback on the OSD + * request. A HEAD update has two ops (assert_locked). For new_state + * and current_state we decode our own object_map_update op, encoded in + * rbd_cls_object_map_update(). + */ +static int rbd_object_map_update_finish(struct rbd_obj_request *obj_req, + struct ceph_osd_request *osd_req) +{ + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; + struct ceph_osd_data *osd_data; + u64 objno; + u8 state, new_state, current_state; + bool has_current_state; + void *p; + + if (osd_req->r_result) + return osd_req->r_result; + + /* + * Nothing to do for a snapshot object map. + */ + if (osd_req->r_num_ops == 1) + return 0; + + /* + * Update in-memory HEAD object map. + */ + rbd_assert(osd_req->r_num_ops == 2); + osd_data = osd_req_op_data(osd_req, 1, cls, request_data); + rbd_assert(osd_data->type == CEPH_OSD_DATA_TYPE_PAGES); + + p = page_address(osd_data->pages[0]); + objno = ceph_decode_64(&p); + rbd_assert(objno == obj_req->ex.oe_objno); + rbd_assert(ceph_decode_64(&p) == objno + 1); + new_state = ceph_decode_8(&p); + has_current_state = ceph_decode_8(&p); + if (has_current_state) + current_state = ceph_decode_8(&p); + + spin_lock(&rbd_dev->object_map_lock); + state = __rbd_object_map_get(rbd_dev, objno); + if (!has_current_state || current_state == state || + (current_state == OBJECT_EXISTS && state == OBJECT_EXISTS_CLEAN)) + __rbd_object_map_set(rbd_dev, objno, new_state); + spin_unlock(&rbd_dev->object_map_lock); + + return 0; +} + +static void rbd_object_map_callback(struct ceph_osd_request *osd_req) +{ + struct rbd_obj_request *obj_req = osd_req->r_priv; + int result; + + dout("%s osd_req %p result %d for obj_req %p\n", __func__, osd_req, + osd_req->r_result, obj_req); + + result = rbd_object_map_update_finish(obj_req, osd_req); + rbd_obj_handle_request(obj_req, result); +} + +static bool update_needed(struct rbd_device *rbd_dev, u64 objno, u8 new_state) +{ + u8 state = rbd_object_map_get(rbd_dev, objno); + + if (state == new_state || + (new_state == OBJECT_PENDING && state == OBJECT_NONEXISTENT) || + (new_state == OBJECT_NONEXISTENT && state != OBJECT_PENDING)) + return false; + + return true; +} + +static int rbd_cls_object_map_update(struct ceph_osd_request *req, + int which, u64 objno, u8 new_state, + const u8 *current_state) +{ + struct page **pages; + void *p, *start; + int ret; + + ret = osd_req_op_cls_init(req, which, "rbd", "object_map_update"); + if (ret) + return ret; + + pages = ceph_alloc_page_vector(1, GFP_NOIO); + if (IS_ERR(pages)) + return PTR_ERR(pages); + + p = start = page_address(pages[0]); + ceph_encode_64(&p, objno); + ceph_encode_64(&p, objno + 1); + ceph_encode_8(&p, new_state); + if (current_state) { + ceph_encode_8(&p, 1); + ceph_encode_8(&p, *current_state); + } else { + ceph_encode_8(&p, 0); + } + + osd_req_op_cls_request_data_pages(req, which, pages, p - start, 0, + false, true); + return 0; +} + +/* + * Return: + * 0 - object map update sent + * 1 - object map update isn't needed + * <0 - error + */ +static int rbd_object_map_update(struct rbd_obj_request *obj_req, u64 snap_id, + u8 new_state, const u8 *current_state) +{ + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; + struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; + struct ceph_osd_request *req; + int num_ops = 1; + int which = 0; + int ret; + + if (snap_id == CEPH_NOSNAP) { + if (!update_needed(rbd_dev, obj_req->ex.oe_objno, new_state)) + return 1; + + num_ops++; /* assert_locked */ + } + + req = ceph_osdc_alloc_request(osdc, NULL, num_ops, false, GFP_NOIO); + if (!req) + return -ENOMEM; + + list_add_tail(&req->r_private_item, &obj_req->osd_reqs); + req->r_callback = rbd_object_map_callback; + req->r_priv = obj_req; + + rbd_object_map_name(rbd_dev, snap_id, &req->r_base_oid); + ceph_oloc_copy(&req->r_base_oloc, &rbd_dev->header_oloc); + req->r_flags = CEPH_OSD_FLAG_WRITE; + ktime_get_real_ts64(&req->r_mtime); + + if (snap_id == CEPH_NOSNAP) { + /* + * Protect against possible race conditions during lock + * ownership transitions. + */ + ret = ceph_cls_assert_locked(req, which++, RBD_LOCK_NAME, + CEPH_CLS_LOCK_EXCLUSIVE, "", ""); + if (ret) + return ret; + } + + ret = rbd_cls_object_map_update(req, which, obj_req->ex.oe_objno, + new_state, current_state); + if (ret) + return ret; + + ret = ceph_osdc_alloc_messages(req, GFP_NOIO); + if (ret) + return ret; + + ceph_osdc_start_request(osdc, req, false); + return 0; +} + static void prune_extents(struct ceph_file_extent *img_extents, u32 *num_img_extents, u64 overlap) { @@ -1764,11 +2291,13 @@ static int rbd_obj_calc_img_extents(struct rbd_obj_request *obj_req, return 0; } -static void rbd_osd_req_setup_data(struct rbd_obj_request *obj_req, u32 which) +static void rbd_osd_setup_data(struct ceph_osd_request *osd_req, int which) { + struct rbd_obj_request *obj_req = osd_req->r_priv; + switch (obj_req->img_request->data_type) { case OBJ_REQUEST_BIO: - osd_req_op_extent_osd_data_bio(obj_req->osd_req, which, + osd_req_op_extent_osd_data_bio(osd_req, which, &obj_req->bio_pos, obj_req->ex.oe_len); break; @@ -1777,7 +2306,7 @@ static void rbd_osd_req_setup_data(struct rbd_obj_request *obj_req, u32 which) rbd_assert(obj_req->bvec_pos.iter.bi_size == obj_req->ex.oe_len); rbd_assert(obj_req->bvec_idx == obj_req->bvec_count); - osd_req_op_extent_osd_data_bvec_pos(obj_req->osd_req, which, + osd_req_op_extent_osd_data_bvec_pos(osd_req, which, &obj_req->bvec_pos); break; default: @@ -1785,22 +2314,7 @@ static void rbd_osd_req_setup_data(struct rbd_obj_request *obj_req, u32 which) } } -static int rbd_obj_setup_read(struct rbd_obj_request *obj_req) -{ - obj_req->osd_req = __rbd_osd_req_create(obj_req, NULL, 1); - if (!obj_req->osd_req) - return -ENOMEM; - - osd_req_op_extent_init(obj_req->osd_req, 0, CEPH_OSD_OP_READ, - obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0); - rbd_osd_req_setup_data(obj_req, 0); - - rbd_osd_req_format_read(obj_req); - return 0; -} - -static int __rbd_obj_setup_stat(struct rbd_obj_request *obj_req, - unsigned int which) +static int rbd_osd_setup_stat(struct ceph_osd_request *osd_req, int which) { struct page **pages; @@ -1816,45 +2330,60 @@ static int __rbd_obj_setup_stat(struct rbd_obj_request *obj_req, if (IS_ERR(pages)) return PTR_ERR(pages); - osd_req_op_init(obj_req->osd_req, which, CEPH_OSD_OP_STAT, 0); - osd_req_op_raw_data_in_pages(obj_req->osd_req, which, pages, + osd_req_op_init(osd_req, which, CEPH_OSD_OP_STAT, 0); + osd_req_op_raw_data_in_pages(osd_req, which, pages, 8 + sizeof(struct ceph_timespec), 0, false, true); return 0; } -static int count_write_ops(struct rbd_obj_request *obj_req) +static int rbd_osd_setup_copyup(struct ceph_osd_request *osd_req, int which, + u32 bytes) +{ + struct rbd_obj_request *obj_req = osd_req->r_priv; + int ret; + + ret = osd_req_op_cls_init(osd_req, which, "rbd", "copyup"); + if (ret) + return ret; + + osd_req_op_cls_request_data_bvecs(osd_req, which, obj_req->copyup_bvecs, + obj_req->copyup_bvec_count, bytes); + return 0; +} + +static int rbd_obj_init_read(struct rbd_obj_request *obj_req) { - return 2; /* setallochint + write/writefull */ + obj_req->read_state = RBD_OBJ_READ_START; + return 0; } -static void __rbd_obj_setup_write(struct rbd_obj_request *obj_req, - unsigned int which) +static void __rbd_osd_setup_write_ops(struct ceph_osd_request *osd_req, + int which) { + struct rbd_obj_request *obj_req = osd_req->r_priv; struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; u16 opcode; - osd_req_op_alloc_hint_init(obj_req->osd_req, which++, - rbd_dev->layout.object_size, - rbd_dev->layout.object_size); + if (!use_object_map(rbd_dev) || + !(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST)) { + osd_req_op_alloc_hint_init(osd_req, which++, + rbd_dev->layout.object_size, + rbd_dev->layout.object_size); + } if (rbd_obj_is_entire(obj_req)) opcode = CEPH_OSD_OP_WRITEFULL; else opcode = CEPH_OSD_OP_WRITE; - osd_req_op_extent_init(obj_req->osd_req, which, opcode, + osd_req_op_extent_init(osd_req, which, opcode, obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0); - rbd_osd_req_setup_data(obj_req, which++); - - rbd_assert(which == obj_req->osd_req->r_num_ops); - rbd_osd_req_format_write(obj_req); + rbd_osd_setup_data(osd_req, which); } -static int rbd_obj_setup_write(struct rbd_obj_request *obj_req) +static int rbd_obj_init_write(struct rbd_obj_request *obj_req) { - unsigned int num_osd_ops, which = 0; - bool need_guard; int ret; /* reverse map the entire object onto the parent */ @@ -1862,24 +2391,10 @@ static int rbd_obj_setup_write(struct rbd_obj_request *obj_req) if (ret) return ret; - need_guard = rbd_obj_copyup_enabled(obj_req); - num_osd_ops = need_guard + count_write_ops(obj_req); - - obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops); - if (!obj_req->osd_req) - return -ENOMEM; - - if (need_guard) { - ret = __rbd_obj_setup_stat(obj_req, which++); - if (ret) - return ret; + if (rbd_obj_copyup_enabled(obj_req)) + obj_req->flags |= RBD_OBJ_FLAG_COPYUP_ENABLED; - obj_req->write_state = RBD_OBJ_WRITE_GUARD; - } else { - obj_req->write_state = RBD_OBJ_WRITE_FLAT; - } - - __rbd_obj_setup_write(obj_req, which); + obj_req->write_state = RBD_OBJ_WRITE_START; return 0; } @@ -1889,11 +2404,26 @@ static u16 truncate_or_zero_opcode(struct rbd_obj_request *obj_req) CEPH_OSD_OP_ZERO; } -static int rbd_obj_setup_discard(struct rbd_obj_request *obj_req) +static void __rbd_osd_setup_discard_ops(struct ceph_osd_request *osd_req, + int which) +{ + struct rbd_obj_request *obj_req = osd_req->r_priv; + + if (rbd_obj_is_entire(obj_req) && !obj_req->num_img_extents) { + rbd_assert(obj_req->flags & RBD_OBJ_FLAG_DELETION); + osd_req_op_init(osd_req, which, CEPH_OSD_OP_DELETE, 0); + } else { + osd_req_op_extent_init(osd_req, which, + truncate_or_zero_opcode(obj_req), + obj_req->ex.oe_off, obj_req->ex.oe_len, + 0, 0); + } +} + +static int rbd_obj_init_discard(struct rbd_obj_request *obj_req) { struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; - u64 off = obj_req->ex.oe_off; - u64 next_off = obj_req->ex.oe_off + obj_req->ex.oe_len; + u64 off, next_off; int ret; /* @@ -1906,10 +2436,17 @@ static int rbd_obj_setup_discard(struct rbd_obj_request *obj_req) */ if (rbd_dev->opts->alloc_size != rbd_dev->layout.object_size || !rbd_obj_is_tail(obj_req)) { - off = round_up(off, rbd_dev->opts->alloc_size); - next_off = round_down(next_off, rbd_dev->opts->alloc_size); + off = round_up(obj_req->ex.oe_off, rbd_dev->opts->alloc_size); + next_off = round_down(obj_req->ex.oe_off + obj_req->ex.oe_len, + rbd_dev->opts->alloc_size); if (off >= next_off) return 1; + + dout("%s %p %llu~%llu -> %llu~%llu\n", __func__, + obj_req, obj_req->ex.oe_off, obj_req->ex.oe_len, + off, next_off - off); + obj_req->ex.oe_off = off; + obj_req->ex.oe_len = next_off - off; } /* reverse map the entire object onto the parent */ @@ -1917,52 +2454,29 @@ static int rbd_obj_setup_discard(struct rbd_obj_request *obj_req) if (ret) return ret; - obj_req->osd_req = rbd_osd_req_create(obj_req, 1); - if (!obj_req->osd_req) - return -ENOMEM; - - if (rbd_obj_is_entire(obj_req) && !obj_req->num_img_extents) { - osd_req_op_init(obj_req->osd_req, 0, CEPH_OSD_OP_DELETE, 0); - } else { - dout("%s %p %llu~%llu -> %llu~%llu\n", __func__, - obj_req, obj_req->ex.oe_off, obj_req->ex.oe_len, - off, next_off - off); - osd_req_op_extent_init(obj_req->osd_req, 0, - truncate_or_zero_opcode(obj_req), - off, next_off - off, 0, 0); - } + obj_req->flags |= RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT; + if (rbd_obj_is_entire(obj_req) && !obj_req->num_img_extents) + obj_req->flags |= RBD_OBJ_FLAG_DELETION; - obj_req->write_state = RBD_OBJ_WRITE_FLAT; - rbd_osd_req_format_write(obj_req); + obj_req->write_state = RBD_OBJ_WRITE_START; return 0; } -static int count_zeroout_ops(struct rbd_obj_request *obj_req) -{ - int num_osd_ops; - - if (rbd_obj_is_entire(obj_req) && obj_req->num_img_extents && - !rbd_obj_copyup_enabled(obj_req)) - num_osd_ops = 2; /* create + truncate */ - else - num_osd_ops = 1; /* delete/truncate/zero */ - - return num_osd_ops; -} - -static void __rbd_obj_setup_zeroout(struct rbd_obj_request *obj_req, - unsigned int which) +static void __rbd_osd_setup_zeroout_ops(struct ceph_osd_request *osd_req, + int which) { + struct rbd_obj_request *obj_req = osd_req->r_priv; u16 opcode; if (rbd_obj_is_entire(obj_req)) { if (obj_req->num_img_extents) { - if (!rbd_obj_copyup_enabled(obj_req)) - osd_req_op_init(obj_req->osd_req, which++, + if (!(obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED)) + osd_req_op_init(osd_req, which++, CEPH_OSD_OP_CREATE, 0); opcode = CEPH_OSD_OP_TRUNCATE; } else { - osd_req_op_init(obj_req->osd_req, which++, + rbd_assert(obj_req->flags & RBD_OBJ_FLAG_DELETION); + osd_req_op_init(osd_req, which++, CEPH_OSD_OP_DELETE, 0); opcode = 0; } @@ -1971,18 +2485,13 @@ static void __rbd_obj_setup_zeroout(struct rbd_obj_request *obj_req, } if (opcode) - osd_req_op_extent_init(obj_req->osd_req, which++, opcode, + osd_req_op_extent_init(osd_req, which, opcode, obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0); - - rbd_assert(which == obj_req->osd_req->r_num_ops); - rbd_osd_req_format_write(obj_req); } -static int rbd_obj_setup_zeroout(struct rbd_obj_request *obj_req) +static int rbd_obj_init_zeroout(struct rbd_obj_request *obj_req) { - unsigned int num_osd_ops, which = 0; - bool need_guard; int ret; /* reverse map the entire object onto the parent */ @@ -1990,31 +2499,66 @@ static int rbd_obj_setup_zeroout(struct rbd_obj_request *obj_req) if (ret) return ret; - need_guard = rbd_obj_copyup_enabled(obj_req); - num_osd_ops = need_guard + count_zeroout_ops(obj_req); + if (rbd_obj_copyup_enabled(obj_req)) + obj_req->flags |= RBD_OBJ_FLAG_COPYUP_ENABLED; + if (!obj_req->num_img_extents) { + obj_req->flags |= RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT; + if (rbd_obj_is_entire(obj_req)) + obj_req->flags |= RBD_OBJ_FLAG_DELETION; + } - obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops); - if (!obj_req->osd_req) - return -ENOMEM; + obj_req->write_state = RBD_OBJ_WRITE_START; + return 0; +} - if (need_guard) { - ret = __rbd_obj_setup_stat(obj_req, which++); - if (ret) - return ret; +static int count_write_ops(struct rbd_obj_request *obj_req) +{ + struct rbd_img_request *img_req = obj_req->img_request; - obj_req->write_state = RBD_OBJ_WRITE_GUARD; - } else { - obj_req->write_state = RBD_OBJ_WRITE_FLAT; + switch (img_req->op_type) { + case OBJ_OP_WRITE: + if (!use_object_map(img_req->rbd_dev) || + !(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST)) + return 2; /* setallochint + write/writefull */ + + return 1; /* write/writefull */ + case OBJ_OP_DISCARD: + return 1; /* delete/truncate/zero */ + case OBJ_OP_ZEROOUT: + if (rbd_obj_is_entire(obj_req) && obj_req->num_img_extents && + !(obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED)) + return 2; /* create + truncate */ + + return 1; /* delete/truncate/zero */ + default: + BUG(); } +} - __rbd_obj_setup_zeroout(obj_req, which); - return 0; +static void rbd_osd_setup_write_ops(struct ceph_osd_request *osd_req, + int which) +{ + struct rbd_obj_request *obj_req = osd_req->r_priv; + + switch (obj_req->img_request->op_type) { + case OBJ_OP_WRITE: + __rbd_osd_setup_write_ops(osd_req, which); + break; + case OBJ_OP_DISCARD: + __rbd_osd_setup_discard_ops(osd_req, which); + break; + case OBJ_OP_ZEROOUT: + __rbd_osd_setup_zeroout_ops(osd_req, which); + break; + default: + BUG(); + } } /* - * For each object request in @img_req, allocate an OSD request, add - * individual OSD ops and prepare them for submission. The number of - * OSD ops depends on op_type and the overlap point (if any). + * Prune the list of object requests (adjust offset and/or length, drop + * redundant requests). Prepare object request state machines and image + * request state machine for execution. */ static int __rbd_img_fill_request(struct rbd_img_request *img_req) { @@ -2024,16 +2568,16 @@ static int __rbd_img_fill_request(struct rbd_img_request *img_req) for_each_obj_request_safe(img_req, obj_req, next_obj_req) { switch (img_req->op_type) { case OBJ_OP_READ: - ret = rbd_obj_setup_read(obj_req); + ret = rbd_obj_init_read(obj_req); break; case OBJ_OP_WRITE: - ret = rbd_obj_setup_write(obj_req); + ret = rbd_obj_init_write(obj_req); break; case OBJ_OP_DISCARD: - ret = rbd_obj_setup_discard(obj_req); + ret = rbd_obj_init_discard(obj_req); break; case OBJ_OP_ZEROOUT: - ret = rbd_obj_setup_zeroout(obj_req); + ret = rbd_obj_init_zeroout(obj_req); break; default: BUG(); @@ -2041,17 +2585,12 @@ static int __rbd_img_fill_request(struct rbd_img_request *img_req) if (ret < 0) return ret; if (ret > 0) { - img_req->xferred += obj_req->ex.oe_len; - img_req->pending_count--; rbd_img_obj_request_del(img_req, obj_req); continue; } - - ret = ceph_osdc_alloc_messages(obj_req->osd_req, GFP_NOIO); - if (ret) - return ret; } + img_req->state = RBD_IMG_START; return 0; } @@ -2340,17 +2879,55 @@ static int rbd_img_fill_from_bvecs(struct rbd_img_request *img_req, &it); } -static void rbd_img_request_submit(struct rbd_img_request *img_request) +static void rbd_img_handle_request_work(struct work_struct *work) { - struct rbd_obj_request *obj_request; + struct rbd_img_request *img_req = + container_of(work, struct rbd_img_request, work); - dout("%s: img %p\n", __func__, img_request); + rbd_img_handle_request(img_req, img_req->work_result); +} - rbd_img_request_get(img_request); - for_each_obj_request(img_request, obj_request) - rbd_obj_request_submit(obj_request); +static void rbd_img_schedule(struct rbd_img_request *img_req, int result) +{ + INIT_WORK(&img_req->work, rbd_img_handle_request_work); + img_req->work_result = result; + queue_work(rbd_wq, &img_req->work); +} - rbd_img_request_put(img_request); +static bool rbd_obj_may_exist(struct rbd_obj_request *obj_req) +{ + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; + + if (rbd_object_map_may_exist(rbd_dev, obj_req->ex.oe_objno)) { + obj_req->flags |= RBD_OBJ_FLAG_MAY_EXIST; + return true; + } + + dout("%s %p objno %llu assuming dne\n", __func__, obj_req, + obj_req->ex.oe_objno); + return false; +} + +static int rbd_obj_read_object(struct rbd_obj_request *obj_req) +{ + struct ceph_osd_request *osd_req; + int ret; + + osd_req = __rbd_obj_add_osd_request(obj_req, NULL, 1); + if (IS_ERR(osd_req)) + return PTR_ERR(osd_req); + + osd_req_op_extent_init(osd_req, 0, CEPH_OSD_OP_READ, + obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0); + rbd_osd_setup_data(osd_req, 0); + rbd_osd_format_read(osd_req); + + ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO); + if (ret) + return ret; + + rbd_osd_submit(osd_req); + return 0; } static int rbd_obj_read_from_parent(struct rbd_obj_request *obj_req) @@ -2396,51 +2973,144 @@ static int rbd_obj_read_from_parent(struct rbd_obj_request *obj_req) return ret; } - rbd_img_request_submit(child_img_req); + /* avoid parent chain recursion */ + rbd_img_schedule(child_img_req, 0); return 0; } -static bool rbd_obj_handle_read(struct rbd_obj_request *obj_req) +static bool rbd_obj_advance_read(struct rbd_obj_request *obj_req, int *result) { struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; int ret; - if (obj_req->result == -ENOENT && - rbd_dev->parent_overlap && !obj_req->tried_parent) { - /* reverse map this object extent onto the parent */ - ret = rbd_obj_calc_img_extents(obj_req, false); +again: + switch (obj_req->read_state) { + case RBD_OBJ_READ_START: + rbd_assert(!*result); + + if (!rbd_obj_may_exist(obj_req)) { + *result = -ENOENT; + obj_req->read_state = RBD_OBJ_READ_OBJECT; + goto again; + } + + ret = rbd_obj_read_object(obj_req); if (ret) { - obj_req->result = ret; + *result = ret; return true; } - - if (obj_req->num_img_extents) { - obj_req->tried_parent = true; - ret = rbd_obj_read_from_parent(obj_req); + obj_req->read_state = RBD_OBJ_READ_OBJECT; + return false; + case RBD_OBJ_READ_OBJECT: + if (*result == -ENOENT && rbd_dev->parent_overlap) { + /* reverse map this object extent onto the parent */ + ret = rbd_obj_calc_img_extents(obj_req, false); if (ret) { - obj_req->result = ret; + *result = ret; return true; } - return false; + if (obj_req->num_img_extents) { + ret = rbd_obj_read_from_parent(obj_req); + if (ret) { + *result = ret; + return true; + } + obj_req->read_state = RBD_OBJ_READ_PARENT; + return false; + } + } + + /* + * -ENOENT means a hole in the image -- zero-fill the entire + * length of the request. A short read also implies zero-fill + * to the end of the request. + */ + if (*result == -ENOENT) { + rbd_obj_zero_range(obj_req, 0, obj_req->ex.oe_len); + *result = 0; + } else if (*result >= 0) { + if (*result < obj_req->ex.oe_len) + rbd_obj_zero_range(obj_req, *result, + obj_req->ex.oe_len - *result); + else + rbd_assert(*result == obj_req->ex.oe_len); + *result = 0; } + return true; + case RBD_OBJ_READ_PARENT: + return true; + default: + BUG(); } +} - /* - * -ENOENT means a hole in the image -- zero-fill the entire - * length of the request. A short read also implies zero-fill - * to the end of the request. In both cases we update xferred - * count to indicate the whole request was satisfied. - */ - if (obj_req->result == -ENOENT || - (!obj_req->result && obj_req->xferred < obj_req->ex.oe_len)) { - rbd_assert(!obj_req->xferred || !obj_req->result); - rbd_obj_zero_range(obj_req, obj_req->xferred, - obj_req->ex.oe_len - obj_req->xferred); - obj_req->result = 0; - obj_req->xferred = obj_req->ex.oe_len; +static bool rbd_obj_write_is_noop(struct rbd_obj_request *obj_req) +{ + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; + + if (rbd_object_map_may_exist(rbd_dev, obj_req->ex.oe_objno)) + obj_req->flags |= RBD_OBJ_FLAG_MAY_EXIST; + + if (!(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST) && + (obj_req->flags & RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT)) { + dout("%s %p noop for nonexistent\n", __func__, obj_req); + return true; } - return true; + return false; +} + +/* + * Return: + * 0 - object map update sent + * 1 - object map update isn't needed + * <0 - error + */ +static int rbd_obj_write_pre_object_map(struct rbd_obj_request *obj_req) +{ + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; + u8 new_state; + + if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP)) + return 1; + + if (obj_req->flags & RBD_OBJ_FLAG_DELETION) + new_state = OBJECT_PENDING; + else + new_state = OBJECT_EXISTS; + + return rbd_object_map_update(obj_req, CEPH_NOSNAP, new_state, NULL); +} + +static int rbd_obj_write_object(struct rbd_obj_request *obj_req) +{ + struct ceph_osd_request *osd_req; + int num_ops = count_write_ops(obj_req); + int which = 0; + int ret; + + if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED) + num_ops++; /* stat */ + + osd_req = rbd_obj_add_osd_request(obj_req, num_ops); + if (IS_ERR(osd_req)) + return PTR_ERR(osd_req); + + if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED) { + ret = rbd_osd_setup_stat(osd_req, which++); + if (ret) + return ret; + } + + rbd_osd_setup_write_ops(osd_req, which); + rbd_osd_format_write(osd_req); + + ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO); + if (ret) + return ret; + + rbd_osd_submit(osd_req); + return 0; } /* @@ -2463,123 +3133,67 @@ static bool is_zero_bvecs(struct bio_vec *bvecs, u32 bytes) #define MODS_ONLY U32_MAX -static int rbd_obj_issue_copyup_empty_snapc(struct rbd_obj_request *obj_req, - u32 bytes) +static int rbd_obj_copyup_empty_snapc(struct rbd_obj_request *obj_req, + u32 bytes) { + struct ceph_osd_request *osd_req; int ret; dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes); - rbd_assert(obj_req->osd_req->r_ops[0].op == CEPH_OSD_OP_STAT); rbd_assert(bytes > 0 && bytes != MODS_ONLY); - rbd_osd_req_destroy(obj_req->osd_req); - obj_req->osd_req = __rbd_osd_req_create(obj_req, &rbd_empty_snapc, 1); - if (!obj_req->osd_req) - return -ENOMEM; + osd_req = __rbd_obj_add_osd_request(obj_req, &rbd_empty_snapc, 1); + if (IS_ERR(osd_req)) + return PTR_ERR(osd_req); - ret = osd_req_op_cls_init(obj_req->osd_req, 0, "rbd", "copyup"); + ret = rbd_osd_setup_copyup(osd_req, 0, bytes); if (ret) return ret; - osd_req_op_cls_request_data_bvecs(obj_req->osd_req, 0, - obj_req->copyup_bvecs, - obj_req->copyup_bvec_count, - bytes); - rbd_osd_req_format_write(obj_req); + rbd_osd_format_write(osd_req); - ret = ceph_osdc_alloc_messages(obj_req->osd_req, GFP_NOIO); + ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO); if (ret) return ret; - rbd_obj_request_submit(obj_req); + rbd_osd_submit(osd_req); return 0; } -static int rbd_obj_issue_copyup_ops(struct rbd_obj_request *obj_req, u32 bytes) +static int rbd_obj_copyup_current_snapc(struct rbd_obj_request *obj_req, + u32 bytes) { - struct rbd_img_request *img_req = obj_req->img_request; - unsigned int num_osd_ops = (bytes != MODS_ONLY); - unsigned int which = 0; + struct ceph_osd_request *osd_req; + int num_ops = count_write_ops(obj_req); + int which = 0; int ret; dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes); - rbd_assert(obj_req->osd_req->r_ops[0].op == CEPH_OSD_OP_STAT || - obj_req->osd_req->r_ops[0].op == CEPH_OSD_OP_CALL); - rbd_osd_req_destroy(obj_req->osd_req); - switch (img_req->op_type) { - case OBJ_OP_WRITE: - num_osd_ops += count_write_ops(obj_req); - break; - case OBJ_OP_ZEROOUT: - num_osd_ops += count_zeroout_ops(obj_req); - break; - default: - BUG(); - } + if (bytes != MODS_ONLY) + num_ops++; /* copyup */ - obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops); - if (!obj_req->osd_req) - return -ENOMEM; + osd_req = rbd_obj_add_osd_request(obj_req, num_ops); + if (IS_ERR(osd_req)) + return PTR_ERR(osd_req); if (bytes != MODS_ONLY) { - ret = osd_req_op_cls_init(obj_req->osd_req, which, "rbd", - "copyup"); + ret = rbd_osd_setup_copyup(osd_req, which++, bytes); if (ret) return ret; - - osd_req_op_cls_request_data_bvecs(obj_req->osd_req, which++, - obj_req->copyup_bvecs, - obj_req->copyup_bvec_count, - bytes); } - switch (img_req->op_type) { - case OBJ_OP_WRITE: - __rbd_obj_setup_write(obj_req, which); - break; - case OBJ_OP_ZEROOUT: - __rbd_obj_setup_zeroout(obj_req, which); - break; - default: - BUG(); - } + rbd_osd_setup_write_ops(osd_req, which); + rbd_osd_format_write(osd_req); - ret = ceph_osdc_alloc_messages(obj_req->osd_req, GFP_NOIO); + ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO); if (ret) return ret; - rbd_obj_request_submit(obj_req); + rbd_osd_submit(osd_req); return 0; } -static int rbd_obj_issue_copyup(struct rbd_obj_request *obj_req, u32 bytes) -{ - /* - * Only send non-zero copyup data to save some I/O and network - * bandwidth -- zero copyup data is equivalent to the object not - * existing. - */ - if (is_zero_bvecs(obj_req->copyup_bvecs, bytes)) { - dout("%s obj_req %p detected zeroes\n", __func__, obj_req); - bytes = 0; - } - - if (obj_req->img_request->snapc->num_snaps && bytes > 0) { - /* - * Send a copyup request with an empty snapshot context to - * deep-copyup the object through all existing snapshots. - * A second request with the current snapshot context will be - * sent for the actual modification. - */ - obj_req->write_state = RBD_OBJ_WRITE_COPYUP_EMPTY_SNAPC; - return rbd_obj_issue_copyup_empty_snapc(obj_req, bytes); - } - - obj_req->write_state = RBD_OBJ_WRITE_COPYUP_OPS; - return rbd_obj_issue_copyup_ops(obj_req, bytes); -} - static int setup_copyup_bvecs(struct rbd_obj_request *obj_req, u64 obj_overlap) { u32 i; @@ -2608,7 +3222,12 @@ static int setup_copyup_bvecs(struct rbd_obj_request *obj_req, u64 obj_overlap) return 0; } -static int rbd_obj_handle_write_guard(struct rbd_obj_request *obj_req) +/* + * The target object doesn't exist. Read the data for the entire + * target object up to the overlap point (if any) from the parent, + * so we can use it for a copyup. + */ +static int rbd_obj_copyup_read_parent(struct rbd_obj_request *obj_req) { struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; int ret; @@ -2623,178 +3242,492 @@ static int rbd_obj_handle_write_guard(struct rbd_obj_request *obj_req) * request -- pass MODS_ONLY since the copyup isn't needed * anymore. */ - obj_req->write_state = RBD_OBJ_WRITE_COPYUP_OPS; - return rbd_obj_issue_copyup_ops(obj_req, MODS_ONLY); + return rbd_obj_copyup_current_snapc(obj_req, MODS_ONLY); } ret = setup_copyup_bvecs(obj_req, rbd_obj_img_extents_bytes(obj_req)); if (ret) return ret; - obj_req->write_state = RBD_OBJ_WRITE_READ_FROM_PARENT; return rbd_obj_read_from_parent(obj_req); } -static bool rbd_obj_handle_write(struct rbd_obj_request *obj_req) +static void rbd_obj_copyup_object_maps(struct rbd_obj_request *obj_req) { + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; + struct ceph_snap_context *snapc = obj_req->img_request->snapc; + u8 new_state; + u32 i; int ret; - switch (obj_req->write_state) { - case RBD_OBJ_WRITE_GUARD: - rbd_assert(!obj_req->xferred); - if (obj_req->result == -ENOENT) { - /* - * The target object doesn't exist. Read the data for - * the entire target object up to the overlap point (if - * any) from the parent, so we can use it for a copyup. - */ - ret = rbd_obj_handle_write_guard(obj_req); - if (ret) { - obj_req->result = ret; - return true; - } - return false; + rbd_assert(!obj_req->pending.result && !obj_req->pending.num_pending); + + if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP)) + return; + + if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ZEROS) + return; + + for (i = 0; i < snapc->num_snaps; i++) { + if ((rbd_dev->header.features & RBD_FEATURE_FAST_DIFF) && + i + 1 < snapc->num_snaps) + new_state = OBJECT_EXISTS_CLEAN; + else + new_state = OBJECT_EXISTS; + + ret = rbd_object_map_update(obj_req, snapc->snaps[i], + new_state, NULL); + if (ret < 0) { + obj_req->pending.result = ret; + return; } - /* fall through */ - case RBD_OBJ_WRITE_FLAT: - case RBD_OBJ_WRITE_COPYUP_OPS: - if (!obj_req->result) - /* - * There is no such thing as a successful short - * write -- indicate the whole request was satisfied. - */ - obj_req->xferred = obj_req->ex.oe_len; - return true; - case RBD_OBJ_WRITE_READ_FROM_PARENT: - if (obj_req->result) - return true; - rbd_assert(obj_req->xferred); - ret = rbd_obj_issue_copyup(obj_req, obj_req->xferred); + rbd_assert(!ret); + obj_req->pending.num_pending++; + } +} + +static void rbd_obj_copyup_write_object(struct rbd_obj_request *obj_req) +{ + u32 bytes = rbd_obj_img_extents_bytes(obj_req); + int ret; + + rbd_assert(!obj_req->pending.result && !obj_req->pending.num_pending); + + /* + * Only send non-zero copyup data to save some I/O and network + * bandwidth -- zero copyup data is equivalent to the object not + * existing. + */ + if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ZEROS) + bytes = 0; + + if (obj_req->img_request->snapc->num_snaps && bytes > 0) { + /* + * Send a copyup request with an empty snapshot context to + * deep-copyup the object through all existing snapshots. + * A second request with the current snapshot context will be + * sent for the actual modification. + */ + ret = rbd_obj_copyup_empty_snapc(obj_req, bytes); + if (ret) { + obj_req->pending.result = ret; + return; + } + + obj_req->pending.num_pending++; + bytes = MODS_ONLY; + } + + ret = rbd_obj_copyup_current_snapc(obj_req, bytes); + if (ret) { + obj_req->pending.result = ret; + return; + } + + obj_req->pending.num_pending++; +} + +static bool rbd_obj_advance_copyup(struct rbd_obj_request *obj_req, int *result) +{ + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; + int ret; + +again: + switch (obj_req->copyup_state) { + case RBD_OBJ_COPYUP_START: + rbd_assert(!*result); + + ret = rbd_obj_copyup_read_parent(obj_req); if (ret) { - obj_req->result = ret; - obj_req->xferred = 0; + *result = ret; return true; } + if (obj_req->num_img_extents) + obj_req->copyup_state = RBD_OBJ_COPYUP_READ_PARENT; + else + obj_req->copyup_state = RBD_OBJ_COPYUP_WRITE_OBJECT; return false; - case RBD_OBJ_WRITE_COPYUP_EMPTY_SNAPC: - if (obj_req->result) + case RBD_OBJ_COPYUP_READ_PARENT: + if (*result) return true; - obj_req->write_state = RBD_OBJ_WRITE_COPYUP_OPS; - ret = rbd_obj_issue_copyup_ops(obj_req, MODS_ONLY); - if (ret) { - obj_req->result = ret; + if (is_zero_bvecs(obj_req->copyup_bvecs, + rbd_obj_img_extents_bytes(obj_req))) { + dout("%s %p detected zeros\n", __func__, obj_req); + obj_req->flags |= RBD_OBJ_FLAG_COPYUP_ZEROS; + } + + rbd_obj_copyup_object_maps(obj_req); + if (!obj_req->pending.num_pending) { + *result = obj_req->pending.result; + obj_req->copyup_state = RBD_OBJ_COPYUP_OBJECT_MAPS; + goto again; + } + obj_req->copyup_state = __RBD_OBJ_COPYUP_OBJECT_MAPS; + return false; + case __RBD_OBJ_COPYUP_OBJECT_MAPS: + if (!pending_result_dec(&obj_req->pending, result)) + return false; + /* fall through */ + case RBD_OBJ_COPYUP_OBJECT_MAPS: + if (*result) { + rbd_warn(rbd_dev, "snap object map update failed: %d", + *result); return true; } + + rbd_obj_copyup_write_object(obj_req); + if (!obj_req->pending.num_pending) { + *result = obj_req->pending.result; + obj_req->copyup_state = RBD_OBJ_COPYUP_WRITE_OBJECT; + goto again; + } + obj_req->copyup_state = __RBD_OBJ_COPYUP_WRITE_OBJECT; return false; + case __RBD_OBJ_COPYUP_WRITE_OBJECT: + if (!pending_result_dec(&obj_req->pending, result)) + return false; + /* fall through */ + case RBD_OBJ_COPYUP_WRITE_OBJECT: + return true; default: BUG(); } } /* - * Returns true if @obj_req is completed, or false otherwise. + * Return: + * 0 - object map update sent + * 1 - object map update isn't needed + * <0 - error */ -static bool __rbd_obj_handle_request(struct rbd_obj_request *obj_req) +static int rbd_obj_write_post_object_map(struct rbd_obj_request *obj_req) { - switch (obj_req->img_request->op_type) { - case OBJ_OP_READ: - return rbd_obj_handle_read(obj_req); - case OBJ_OP_WRITE: - return rbd_obj_handle_write(obj_req); - case OBJ_OP_DISCARD: - case OBJ_OP_ZEROOUT: - if (rbd_obj_handle_write(obj_req)) { + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; + u8 current_state = OBJECT_PENDING; + + if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP)) + return 1; + + if (!(obj_req->flags & RBD_OBJ_FLAG_DELETION)) + return 1; + + return rbd_object_map_update(obj_req, CEPH_NOSNAP, OBJECT_NONEXISTENT, + ¤t_state); +} + +static bool rbd_obj_advance_write(struct rbd_obj_request *obj_req, int *result) +{ + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; + int ret; + +again: + switch (obj_req->write_state) { + case RBD_OBJ_WRITE_START: + rbd_assert(!*result); + + if (rbd_obj_write_is_noop(obj_req)) + return true; + + ret = rbd_obj_write_pre_object_map(obj_req); + if (ret < 0) { + *result = ret; + return true; + } + obj_req->write_state = RBD_OBJ_WRITE_PRE_OBJECT_MAP; + if (ret > 0) + goto again; + return false; + case RBD_OBJ_WRITE_PRE_OBJECT_MAP: + if (*result) { + rbd_warn(rbd_dev, "pre object map update failed: %d", + *result); + return true; + } + ret = rbd_obj_write_object(obj_req); + if (ret) { + *result = ret; + return true; + } + obj_req->write_state = RBD_OBJ_WRITE_OBJECT; + return false; + case RBD_OBJ_WRITE_OBJECT: + if (*result == -ENOENT) { + if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED) { + *result = 0; + obj_req->copyup_state = RBD_OBJ_COPYUP_START; + obj_req->write_state = __RBD_OBJ_WRITE_COPYUP; + goto again; + } /* - * Hide -ENOENT from delete/truncate/zero -- discarding - * a non-existent object is not a problem. + * On a non-existent object: + * delete - -ENOENT, truncate/zero - 0 */ - if (obj_req->result == -ENOENT) { - obj_req->result = 0; - obj_req->xferred = obj_req->ex.oe_len; - } + if (obj_req->flags & RBD_OBJ_FLAG_DELETION) + *result = 0; + } + if (*result) + return true; + + obj_req->write_state = RBD_OBJ_WRITE_COPYUP; + goto again; + case __RBD_OBJ_WRITE_COPYUP: + if (!rbd_obj_advance_copyup(obj_req, result)) + return false; + /* fall through */ + case RBD_OBJ_WRITE_COPYUP: + if (*result) { + rbd_warn(rbd_dev, "copyup failed: %d", *result); + return true; + } + ret = rbd_obj_write_post_object_map(obj_req); + if (ret < 0) { + *result = ret; return true; } + obj_req->write_state = RBD_OBJ_WRITE_POST_OBJECT_MAP; + if (ret > 0) + goto again; return false; + case RBD_OBJ_WRITE_POST_OBJECT_MAP: + if (*result) + rbd_warn(rbd_dev, "post object map update failed: %d", + *result); + return true; default: BUG(); } } -static void rbd_obj_end_request(struct rbd_obj_request *obj_req) +/* + * Return true if @obj_req is completed. + */ +static bool __rbd_obj_handle_request(struct rbd_obj_request *obj_req, + int *result) { struct rbd_img_request *img_req = obj_req->img_request; + struct rbd_device *rbd_dev = img_req->rbd_dev; + bool done; - rbd_assert((!obj_req->result && - obj_req->xferred == obj_req->ex.oe_len) || - (obj_req->result < 0 && !obj_req->xferred)); - if (!obj_req->result) { - img_req->xferred += obj_req->xferred; - return; - } + mutex_lock(&obj_req->state_mutex); + if (!rbd_img_is_write(img_req)) + done = rbd_obj_advance_read(obj_req, result); + else + done = rbd_obj_advance_write(obj_req, result); + mutex_unlock(&obj_req->state_mutex); - rbd_warn(img_req->rbd_dev, - "%s at objno %llu %llu~%llu result %d xferred %llu", - obj_op_name(img_req->op_type), obj_req->ex.oe_objno, - obj_req->ex.oe_off, obj_req->ex.oe_len, obj_req->result, - obj_req->xferred); - if (!img_req->result) { - img_req->result = obj_req->result; - img_req->xferred = 0; + if (done && *result) { + rbd_assert(*result < 0); + rbd_warn(rbd_dev, "%s at objno %llu %llu~%llu result %d", + obj_op_name(img_req->op_type), obj_req->ex.oe_objno, + obj_req->ex.oe_off, obj_req->ex.oe_len, *result); } + return done; } -static void rbd_img_end_child_request(struct rbd_img_request *img_req) +/* + * This is open-coded in rbd_img_handle_request() to avoid parent chain + * recursion. + */ +static void rbd_obj_handle_request(struct rbd_obj_request *obj_req, int result) { - struct rbd_obj_request *obj_req = img_req->obj_request; + if (__rbd_obj_handle_request(obj_req, &result)) + rbd_img_handle_request(obj_req->img_request, result); +} - rbd_assert(test_bit(IMG_REQ_CHILD, &img_req->flags)); - rbd_assert((!img_req->result && - img_req->xferred == rbd_obj_img_extents_bytes(obj_req)) || - (img_req->result < 0 && !img_req->xferred)); +static bool need_exclusive_lock(struct rbd_img_request *img_req) +{ + struct rbd_device *rbd_dev = img_req->rbd_dev; - obj_req->result = img_req->result; - obj_req->xferred = img_req->xferred; - rbd_img_request_put(img_req); + if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK)) + return false; + + if (rbd_dev->spec->snap_id != CEPH_NOSNAP) + return false; + + rbd_assert(!test_bit(IMG_REQ_CHILD, &img_req->flags)); + if (rbd_dev->opts->lock_on_read || + (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP)) + return true; + + return rbd_img_is_write(img_req); } -static void rbd_img_end_request(struct rbd_img_request *img_req) +static bool rbd_lock_add_request(struct rbd_img_request *img_req) { - rbd_assert(!test_bit(IMG_REQ_CHILD, &img_req->flags)); - rbd_assert((!img_req->result && - img_req->xferred == blk_rq_bytes(img_req->rq)) || - (img_req->result < 0 && !img_req->xferred)); + struct rbd_device *rbd_dev = img_req->rbd_dev; + bool locked; + + lockdep_assert_held(&rbd_dev->lock_rwsem); + locked = rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED; + spin_lock(&rbd_dev->lock_lists_lock); + rbd_assert(list_empty(&img_req->lock_item)); + if (!locked) + list_add_tail(&img_req->lock_item, &rbd_dev->acquiring_list); + else + list_add_tail(&img_req->lock_item, &rbd_dev->running_list); + spin_unlock(&rbd_dev->lock_lists_lock); + return locked; +} + +static void rbd_lock_del_request(struct rbd_img_request *img_req) +{ + struct rbd_device *rbd_dev = img_req->rbd_dev; + bool need_wakeup; - blk_mq_end_request(img_req->rq, - errno_to_blk_status(img_req->result)); - rbd_img_request_put(img_req); + lockdep_assert_held(&rbd_dev->lock_rwsem); + spin_lock(&rbd_dev->lock_lists_lock); + rbd_assert(!list_empty(&img_req->lock_item)); + list_del_init(&img_req->lock_item); + need_wakeup = (rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING && + list_empty(&rbd_dev->running_list)); + spin_unlock(&rbd_dev->lock_lists_lock); + if (need_wakeup) + complete(&rbd_dev->releasing_wait); } -static void rbd_obj_handle_request(struct rbd_obj_request *obj_req) +static int rbd_img_exclusive_lock(struct rbd_img_request *img_req) { - struct rbd_img_request *img_req; + struct rbd_device *rbd_dev = img_req->rbd_dev; + + if (!need_exclusive_lock(img_req)) + return 1; + + if (rbd_lock_add_request(img_req)) + return 1; + + if (rbd_dev->opts->exclusive) { + WARN_ON(1); /* lock got released? */ + return -EROFS; + } + + /* + * Note the use of mod_delayed_work() in rbd_acquire_lock() + * and cancel_delayed_work() in wake_lock_waiters(). + */ + dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev); + queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0); + return 0; +} + +static void rbd_img_object_requests(struct rbd_img_request *img_req) +{ + struct rbd_obj_request *obj_req; + + rbd_assert(!img_req->pending.result && !img_req->pending.num_pending); + + for_each_obj_request(img_req, obj_req) { + int result = 0; + + if (__rbd_obj_handle_request(obj_req, &result)) { + if (result) { + img_req->pending.result = result; + return; + } + } else { + img_req->pending.num_pending++; + } + } +} + +static bool rbd_img_advance(struct rbd_img_request *img_req, int *result) +{ + struct rbd_device *rbd_dev = img_req->rbd_dev; + int ret; again: - if (!__rbd_obj_handle_request(obj_req)) - return; + switch (img_req->state) { + case RBD_IMG_START: + rbd_assert(!*result); - img_req = obj_req->img_request; - spin_lock(&img_req->completion_lock); - rbd_obj_end_request(obj_req); - rbd_assert(img_req->pending_count); - if (--img_req->pending_count) { - spin_unlock(&img_req->completion_lock); - return; + ret = rbd_img_exclusive_lock(img_req); + if (ret < 0) { + *result = ret; + return true; + } + img_req->state = RBD_IMG_EXCLUSIVE_LOCK; + if (ret > 0) + goto again; + return false; + case RBD_IMG_EXCLUSIVE_LOCK: + if (*result) + return true; + + rbd_assert(!need_exclusive_lock(img_req) || + __rbd_is_lock_owner(rbd_dev)); + + rbd_img_object_requests(img_req); + if (!img_req->pending.num_pending) { + *result = img_req->pending.result; + img_req->state = RBD_IMG_OBJECT_REQUESTS; + goto again; + } + img_req->state = __RBD_IMG_OBJECT_REQUESTS; + return false; + case __RBD_IMG_OBJECT_REQUESTS: + if (!pending_result_dec(&img_req->pending, result)) + return false; + /* fall through */ + case RBD_IMG_OBJECT_REQUESTS: + return true; + default: + BUG(); + } +} + +/* + * Return true if @img_req is completed. + */ +static bool __rbd_img_handle_request(struct rbd_img_request *img_req, + int *result) +{ + struct rbd_device *rbd_dev = img_req->rbd_dev; + bool done; + + if (need_exclusive_lock(img_req)) { + down_read(&rbd_dev->lock_rwsem); + mutex_lock(&img_req->state_mutex); + done = rbd_img_advance(img_req, result); + if (done) + rbd_lock_del_request(img_req); + mutex_unlock(&img_req->state_mutex); + up_read(&rbd_dev->lock_rwsem); + } else { + mutex_lock(&img_req->state_mutex); + done = rbd_img_advance(img_req, result); + mutex_unlock(&img_req->state_mutex); + } + + if (done && *result) { + rbd_assert(*result < 0); + rbd_warn(rbd_dev, "%s%s result %d", + test_bit(IMG_REQ_CHILD, &img_req->flags) ? "child " : "", + obj_op_name(img_req->op_type), *result); } + return done; +} + +static void rbd_img_handle_request(struct rbd_img_request *img_req, int result) +{ +again: + if (!__rbd_img_handle_request(img_req, &result)) + return; - spin_unlock(&img_req->completion_lock); if (test_bit(IMG_REQ_CHILD, &img_req->flags)) { - obj_req = img_req->obj_request; - rbd_img_end_child_request(img_req); - goto again; + struct rbd_obj_request *obj_req = img_req->obj_request; + + rbd_img_request_put(img_req); + if (__rbd_obj_handle_request(obj_req, &result)) { + img_req = obj_req->img_request; + goto again; + } + } else { + struct request *rq = img_req->rq; + + rbd_img_request_put(img_req); + blk_mq_end_request(rq, errno_to_blk_status(result)); } - rbd_img_end_request(img_req); } static const struct rbd_client_id rbd_empty_cid; @@ -2839,6 +3772,7 @@ static void __rbd_lock(struct rbd_device *rbd_dev, const char *cookie) { struct rbd_client_id cid = rbd_get_cid(rbd_dev); + rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED; strcpy(rbd_dev->lock_cookie, cookie); rbd_set_owner_cid(rbd_dev, &cid); queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work); @@ -2863,7 +3797,6 @@ static int rbd_lock(struct rbd_device *rbd_dev) if (ret) return ret; - rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED; __rbd_lock(rbd_dev, cookie); return 0; } @@ -2882,7 +3815,7 @@ static void rbd_unlock(struct rbd_device *rbd_dev) ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc, RBD_LOCK_NAME, rbd_dev->lock_cookie); if (ret && ret != -ENOENT) - rbd_warn(rbd_dev, "failed to unlock: %d", ret); + rbd_warn(rbd_dev, "failed to unlock header: %d", ret); /* treat errors as the image is unlocked */ rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED; @@ -3009,15 +3942,34 @@ e_inval: goto out; } -static void wake_requests(struct rbd_device *rbd_dev, bool wake_all) +/* + * Either image request state machine(s) or rbd_add_acquire_lock() + * (i.e. "rbd map"). + */ +static void wake_lock_waiters(struct rbd_device *rbd_dev, int result) { - dout("%s rbd_dev %p wake_all %d\n", __func__, rbd_dev, wake_all); + struct rbd_img_request *img_req; + + dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result); + lockdep_assert_held_write(&rbd_dev->lock_rwsem); cancel_delayed_work(&rbd_dev->lock_dwork); - if (wake_all) - wake_up_all(&rbd_dev->lock_waitq); - else - wake_up(&rbd_dev->lock_waitq); + if (!completion_done(&rbd_dev->acquire_wait)) { + rbd_assert(list_empty(&rbd_dev->acquiring_list) && + list_empty(&rbd_dev->running_list)); + rbd_dev->acquire_err = result; + complete_all(&rbd_dev->acquire_wait); + return; + } + + list_for_each_entry(img_req, &rbd_dev->acquiring_list, lock_item) { + mutex_lock(&img_req->state_mutex); + rbd_assert(img_req->state == RBD_IMG_EXCLUSIVE_LOCK); + rbd_img_schedule(img_req, result); + mutex_unlock(&img_req->state_mutex); + } + + list_splice_tail_init(&rbd_dev->acquiring_list, &rbd_dev->running_list); } static int get_lock_owner_info(struct rbd_device *rbd_dev, @@ -3132,13 +4084,10 @@ static int rbd_try_lock(struct rbd_device *rbd_dev) goto again; ret = find_watcher(rbd_dev, lockers); - if (ret) { - if (ret > 0) - ret = 0; /* have to request lock */ - goto out; - } + if (ret) + goto out; /* request lock or error */ - rbd_warn(rbd_dev, "%s%llu seems dead, breaking lock", + rbd_warn(rbd_dev, "breaking header lock owned by %s%llu", ENTITY_NAME(lockers[0].id.name)); ret = ceph_monc_blacklist_add(&client->monc, @@ -3165,53 +4114,90 @@ out: return ret; } +static int rbd_post_acquire_action(struct rbd_device *rbd_dev) +{ + int ret; + + if (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP) { + ret = rbd_object_map_open(rbd_dev); + if (ret) + return ret; + } + + return 0; +} + /* - * ret is set only if lock_state is RBD_LOCK_STATE_UNLOCKED + * Return: + * 0 - lock acquired + * 1 - caller should call rbd_request_lock() + * <0 - error */ -static enum rbd_lock_state rbd_try_acquire_lock(struct rbd_device *rbd_dev, - int *pret) +static int rbd_try_acquire_lock(struct rbd_device *rbd_dev) { - enum rbd_lock_state lock_state; + int ret; down_read(&rbd_dev->lock_rwsem); dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev, rbd_dev->lock_state); if (__rbd_is_lock_owner(rbd_dev)) { - lock_state = rbd_dev->lock_state; up_read(&rbd_dev->lock_rwsem); - return lock_state; + return 0; } up_read(&rbd_dev->lock_rwsem); down_write(&rbd_dev->lock_rwsem); dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev, rbd_dev->lock_state); - if (!__rbd_is_lock_owner(rbd_dev)) { - *pret = rbd_try_lock(rbd_dev); - if (*pret) - rbd_warn(rbd_dev, "failed to acquire lock: %d", *pret); + if (__rbd_is_lock_owner(rbd_dev)) { + up_write(&rbd_dev->lock_rwsem); + return 0; + } + + ret = rbd_try_lock(rbd_dev); + if (ret < 0) { + rbd_warn(rbd_dev, "failed to lock header: %d", ret); + if (ret == -EBLACKLISTED) + goto out; + + ret = 1; /* request lock anyway */ + } + if (ret > 0) { + up_write(&rbd_dev->lock_rwsem); + return ret; + } + + rbd_assert(rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED); + rbd_assert(list_empty(&rbd_dev->running_list)); + + ret = rbd_post_acquire_action(rbd_dev); + if (ret) { + rbd_warn(rbd_dev, "post-acquire action failed: %d", ret); + /* + * Can't stay in RBD_LOCK_STATE_LOCKED because + * rbd_lock_add_request() would let the request through, + * assuming that e.g. object map is locked and loaded. + */ + rbd_unlock(rbd_dev); } - lock_state = rbd_dev->lock_state; +out: + wake_lock_waiters(rbd_dev, ret); up_write(&rbd_dev->lock_rwsem); - return lock_state; + return ret; } static void rbd_acquire_lock(struct work_struct *work) { struct rbd_device *rbd_dev = container_of(to_delayed_work(work), struct rbd_device, lock_dwork); - enum rbd_lock_state lock_state; - int ret = 0; + int ret; dout("%s rbd_dev %p\n", __func__, rbd_dev); again: - lock_state = rbd_try_acquire_lock(rbd_dev, &ret); - if (lock_state != RBD_LOCK_STATE_UNLOCKED || ret == -EBLACKLISTED) { - if (lock_state == RBD_LOCK_STATE_LOCKED) - wake_requests(rbd_dev, true); - dout("%s rbd_dev %p lock_state %d ret %d - done\n", __func__, - rbd_dev, lock_state, ret); + ret = rbd_try_acquire_lock(rbd_dev); + if (ret <= 0) { + dout("%s rbd_dev %p ret %d - done\n", __func__, rbd_dev, ret); return; } @@ -3220,16 +4206,9 @@ again: goto again; /* treat this as a dead client */ } else if (ret == -EROFS) { rbd_warn(rbd_dev, "peer will not release lock"); - /* - * If this is rbd_add_acquire_lock(), we want to fail - * immediately -- reuse BLACKLISTED flag. Otherwise we - * want to block. - */ - if (!(rbd_dev->disk->flags & GENHD_FL_UP)) { - set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags); - /* wake "rbd map --exclusive" process */ - wake_requests(rbd_dev, false); - } + down_write(&rbd_dev->lock_rwsem); + wake_lock_waiters(rbd_dev, ret); + up_write(&rbd_dev->lock_rwsem); } else if (ret < 0) { rbd_warn(rbd_dev, "error requesting lock: %d", ret); mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, @@ -3246,43 +4225,67 @@ again: } } -/* - * lock_rwsem must be held for write - */ -static bool rbd_release_lock(struct rbd_device *rbd_dev) +static bool rbd_quiesce_lock(struct rbd_device *rbd_dev) { - dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev, - rbd_dev->lock_state); + bool need_wait; + + dout("%s rbd_dev %p\n", __func__, rbd_dev); + lockdep_assert_held_write(&rbd_dev->lock_rwsem); + if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED) return false; - rbd_dev->lock_state = RBD_LOCK_STATE_RELEASING; - downgrade_write(&rbd_dev->lock_rwsem); /* * Ensure that all in-flight IO is flushed. - * - * FIXME: ceph_osdc_sync() flushes the entire OSD client, which - * may be shared with other devices. */ - ceph_osdc_sync(&rbd_dev->rbd_client->client->osdc); + rbd_dev->lock_state = RBD_LOCK_STATE_RELEASING; + rbd_assert(!completion_done(&rbd_dev->releasing_wait)); + need_wait = !list_empty(&rbd_dev->running_list); + downgrade_write(&rbd_dev->lock_rwsem); + if (need_wait) + wait_for_completion(&rbd_dev->releasing_wait); up_read(&rbd_dev->lock_rwsem); down_write(&rbd_dev->lock_rwsem); - dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev, - rbd_dev->lock_state); if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING) return false; + rbd_assert(list_empty(&rbd_dev->running_list)); + return true; +} + +static void rbd_pre_release_action(struct rbd_device *rbd_dev) +{ + if (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP) + rbd_object_map_close(rbd_dev); +} + +static void __rbd_release_lock(struct rbd_device *rbd_dev) +{ + rbd_assert(list_empty(&rbd_dev->running_list)); + + rbd_pre_release_action(rbd_dev); rbd_unlock(rbd_dev); +} + +/* + * lock_rwsem must be held for write + */ +static void rbd_release_lock(struct rbd_device *rbd_dev) +{ + if (!rbd_quiesce_lock(rbd_dev)) + return; + + __rbd_release_lock(rbd_dev); + /* * Give others a chance to grab the lock - we would re-acquire - * almost immediately if we got new IO during ceph_osdc_sync() - * otherwise. We need to ack our own notifications, so this - * lock_dwork will be requeued from rbd_wait_state_locked() - * after wake_requests() in rbd_handle_released_lock(). + * almost immediately if we got new IO while draining the running + * list otherwise. We need to ack our own notifications, so this + * lock_dwork will be requeued from rbd_handle_released_lock() by + * way of maybe_kick_acquire(). */ cancel_delayed_work(&rbd_dev->lock_dwork); - return true; } static void rbd_release_lock_work(struct work_struct *work) @@ -3295,6 +4298,23 @@ static void rbd_release_lock_work(struct work_struct *work) up_write(&rbd_dev->lock_rwsem); } +static void maybe_kick_acquire(struct rbd_device *rbd_dev) +{ + bool have_requests; + + dout("%s rbd_dev %p\n", __func__, rbd_dev); + if (__rbd_is_lock_owner(rbd_dev)) + return; + + spin_lock(&rbd_dev->lock_lists_lock); + have_requests = !list_empty(&rbd_dev->acquiring_list); + spin_unlock(&rbd_dev->lock_lists_lock); + if (have_requests || delayed_work_pending(&rbd_dev->lock_dwork)) { + dout("%s rbd_dev %p kicking lock_dwork\n", __func__, rbd_dev); + mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0); + } +} + static void rbd_handle_acquired_lock(struct rbd_device *rbd_dev, u8 struct_v, void **p) { @@ -3324,8 +4344,7 @@ static void rbd_handle_acquired_lock(struct rbd_device *rbd_dev, u8 struct_v, down_read(&rbd_dev->lock_rwsem); } - if (!__rbd_is_lock_owner(rbd_dev)) - wake_requests(rbd_dev, false); + maybe_kick_acquire(rbd_dev); up_read(&rbd_dev->lock_rwsem); } @@ -3357,8 +4376,7 @@ static void rbd_handle_released_lock(struct rbd_device *rbd_dev, u8 struct_v, down_read(&rbd_dev->lock_rwsem); } - if (!__rbd_is_lock_owner(rbd_dev)) - wake_requests(rbd_dev, false); + maybe_kick_acquire(rbd_dev); up_read(&rbd_dev->lock_rwsem); } @@ -3608,7 +4626,6 @@ static void cancel_tasks_sync(struct rbd_device *rbd_dev) static void rbd_unregister_watch(struct rbd_device *rbd_dev) { - WARN_ON(waitqueue_active(&rbd_dev->lock_waitq)); cancel_tasks_sync(rbd_dev); mutex_lock(&rbd_dev->watch_mutex); @@ -3630,7 +4647,8 @@ static void rbd_reacquire_lock(struct rbd_device *rbd_dev) char cookie[32]; int ret; - WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED); + if (!rbd_quiesce_lock(rbd_dev)) + return; format_lock_cookie(rbd_dev, cookie); ret = ceph_cls_set_cookie(osdc, &rbd_dev->header_oid, @@ -3646,11 +4664,11 @@ static void rbd_reacquire_lock(struct rbd_device *rbd_dev) * Lock cookie cannot be updated on older OSDs, so do * a manual release and queue an acquire. */ - if (rbd_release_lock(rbd_dev)) - queue_delayed_work(rbd_dev->task_wq, - &rbd_dev->lock_dwork, 0); + __rbd_release_lock(rbd_dev); + queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0); } else { __rbd_lock(rbd_dev, cookie); + wake_lock_waiters(rbd_dev, 0); } } @@ -3671,15 +4689,18 @@ static void rbd_reregister_watch(struct work_struct *work) ret = __rbd_register_watch(rbd_dev); if (ret) { rbd_warn(rbd_dev, "failed to reregister watch: %d", ret); - if (ret == -EBLACKLISTED || ret == -ENOENT) { - set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags); - wake_requests(rbd_dev, true); - } else { + if (ret != -EBLACKLISTED && ret != -ENOENT) { queue_delayed_work(rbd_dev->task_wq, &rbd_dev->watch_dwork, RBD_RETRY_DELAY); + mutex_unlock(&rbd_dev->watch_mutex); + return; } + mutex_unlock(&rbd_dev->watch_mutex); + down_write(&rbd_dev->lock_rwsem); + wake_lock_waiters(rbd_dev, ret); + up_write(&rbd_dev->lock_rwsem); return; } @@ -3742,7 +4763,7 @@ static int rbd_obj_method_sync(struct rbd_device *rbd_dev, ret = ceph_osdc_call(osdc, oid, oloc, RBD_DRV_NAME, method_name, CEPH_OSD_FLAG_READ, req_page, outbound_size, - reply_page, &inbound_size); + &reply_page, &inbound_size); if (!ret) { memcpy(inbound, page_address(reply_page), inbound_size); ret = inbound_size; @@ -3754,54 +4775,6 @@ static int rbd_obj_method_sync(struct rbd_device *rbd_dev, return ret; } -/* - * lock_rwsem must be held for read - */ -static int rbd_wait_state_locked(struct rbd_device *rbd_dev, bool may_acquire) -{ - DEFINE_WAIT(wait); - unsigned long timeout; - int ret = 0; - - if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) - return -EBLACKLISTED; - - if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) - return 0; - - if (!may_acquire) { - rbd_warn(rbd_dev, "exclusive lock required"); - return -EROFS; - } - - do { - /* - * Note the use of mod_delayed_work() in rbd_acquire_lock() - * and cancel_delayed_work() in wake_requests(). - */ - dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev); - queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0); - prepare_to_wait_exclusive(&rbd_dev->lock_waitq, &wait, - TASK_UNINTERRUPTIBLE); - up_read(&rbd_dev->lock_rwsem); - timeout = schedule_timeout(ceph_timeout_jiffies( - rbd_dev->opts->lock_timeout)); - down_read(&rbd_dev->lock_rwsem); - if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) { - ret = -EBLACKLISTED; - break; - } - if (!timeout) { - rbd_warn(rbd_dev, "timed out waiting for lock"); - ret = -ETIMEDOUT; - break; - } - } while (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED); - - finish_wait(&rbd_dev->lock_waitq, &wait); - return ret; -} - static void rbd_queue_workfn(struct work_struct *work) { struct request *rq = blk_mq_rq_from_pdu(work); @@ -3812,7 +4785,6 @@ static void rbd_queue_workfn(struct work_struct *work) u64 length = blk_rq_bytes(rq); enum obj_operation_type op_type; u64 mapping_size; - bool must_be_locked; int result; switch (req_op(rq)) { @@ -3886,21 +4858,10 @@ static void rbd_queue_workfn(struct work_struct *work) goto err_rq; } - must_be_locked = - (rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK) && - (op_type != OBJ_OP_READ || rbd_dev->opts->lock_on_read); - if (must_be_locked) { - down_read(&rbd_dev->lock_rwsem); - result = rbd_wait_state_locked(rbd_dev, - !rbd_dev->opts->exclusive); - if (result) - goto err_unlock; - } - img_request = rbd_img_request_create(rbd_dev, op_type, snapc); if (!img_request) { result = -ENOMEM; - goto err_unlock; + goto err_rq; } img_request->rq = rq; snapc = NULL; /* img_request consumes a ref */ @@ -3910,19 +4871,14 @@ static void rbd_queue_workfn(struct work_struct *work) else result = rbd_img_fill_from_bio(img_request, offset, length, rq->bio); - if (result || !img_request->pending_count) + if (result) goto err_img_request; - rbd_img_request_submit(img_request); - if (must_be_locked) - up_read(&rbd_dev->lock_rwsem); + rbd_img_handle_request(img_request, 0); return; err_img_request: rbd_img_request_put(img_request); -err_unlock: - if (must_be_locked) - up_read(&rbd_dev->lock_rwsem); err_rq: if (result) rbd_warn(rbd_dev, "%s %llx at %llx result %d", @@ -4589,7 +5545,13 @@ static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc, INIT_WORK(&rbd_dev->released_lock_work, rbd_notify_released_lock); INIT_DELAYED_WORK(&rbd_dev->lock_dwork, rbd_acquire_lock); INIT_WORK(&rbd_dev->unlock_work, rbd_release_lock_work); - init_waitqueue_head(&rbd_dev->lock_waitq); + spin_lock_init(&rbd_dev->lock_lists_lock); + INIT_LIST_HEAD(&rbd_dev->acquiring_list); + INIT_LIST_HEAD(&rbd_dev->running_list); + init_completion(&rbd_dev->acquire_wait); + init_completion(&rbd_dev->releasing_wait); + + spin_lock_init(&rbd_dev->object_map_lock); rbd_dev->dev.bus = &rbd_bus_type; rbd_dev->dev.type = &rbd_device_type; @@ -4772,6 +5734,32 @@ static int rbd_dev_v2_features(struct rbd_device *rbd_dev) &rbd_dev->header.features); } +/* + * These are generic image flags, but since they are used only for + * object map, store them in rbd_dev->object_map_flags. + * + * For the same reason, this function is called only on object map + * (re)load and not on header refresh. + */ +static int rbd_dev_v2_get_flags(struct rbd_device *rbd_dev) +{ + __le64 snapid = cpu_to_le64(rbd_dev->spec->snap_id); + __le64 flags; + int ret; + + ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid, + &rbd_dev->header_oloc, "get_flags", + &snapid, sizeof(snapid), + &flags, sizeof(flags)); + if (ret < 0) + return ret; + if (ret < sizeof(flags)) + return -EBADMSG; + + rbd_dev->object_map_flags = le64_to_cpu(flags); + return 0; +} + struct parent_image_info { u64 pool_id; const char *pool_ns; @@ -4829,7 +5817,7 @@ static int __get_parent_info(struct rbd_device *rbd_dev, ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc, "rbd", "parent_get", CEPH_OSD_FLAG_READ, - req_page, sizeof(u64), reply_page, &reply_len); + req_page, sizeof(u64), &reply_page, &reply_len); if (ret) return ret == -EOPNOTSUPP ? 1 : ret; @@ -4841,7 +5829,7 @@ static int __get_parent_info(struct rbd_device *rbd_dev, ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc, "rbd", "parent_overlap_get", CEPH_OSD_FLAG_READ, - req_page, sizeof(u64), reply_page, &reply_len); + req_page, sizeof(u64), &reply_page, &reply_len); if (ret) return ret; @@ -4872,7 +5860,7 @@ static int __get_parent_info_legacy(struct rbd_device *rbd_dev, ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc, "rbd", "get_parent", CEPH_OSD_FLAG_READ, - req_page, sizeof(u64), reply_page, &reply_len); + req_page, sizeof(u64), &reply_page, &reply_len); if (ret) return ret; @@ -5605,28 +6593,49 @@ static void rbd_dev_image_unlock(struct rbd_device *rbd_dev) { down_write(&rbd_dev->lock_rwsem); if (__rbd_is_lock_owner(rbd_dev)) - rbd_unlock(rbd_dev); + __rbd_release_lock(rbd_dev); up_write(&rbd_dev->lock_rwsem); } +/* + * If the wait is interrupted, an error is returned even if the lock + * was successfully acquired. rbd_dev_image_unlock() will release it + * if needed. + */ static int rbd_add_acquire_lock(struct rbd_device *rbd_dev) { - int ret; + long ret; if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK)) { + if (!rbd_dev->opts->exclusive && !rbd_dev->opts->lock_on_read) + return 0; + rbd_warn(rbd_dev, "exclusive-lock feature is not enabled"); return -EINVAL; } - /* FIXME: "rbd map --exclusive" should be in interruptible */ - down_read(&rbd_dev->lock_rwsem); - ret = rbd_wait_state_locked(rbd_dev, true); - up_read(&rbd_dev->lock_rwsem); + if (rbd_dev->spec->snap_id != CEPH_NOSNAP) + return 0; + + rbd_assert(!rbd_is_lock_owner(rbd_dev)); + queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0); + ret = wait_for_completion_killable_timeout(&rbd_dev->acquire_wait, + ceph_timeout_jiffies(rbd_dev->opts->lock_timeout)); + if (ret > 0) + ret = rbd_dev->acquire_err; + else if (!ret) + ret = -ETIMEDOUT; + if (ret) { - rbd_warn(rbd_dev, "failed to acquire exclusive lock"); - return -EROFS; + rbd_warn(rbd_dev, "failed to acquire exclusive lock: %ld", ret); + return ret; } + /* + * The lock may have been released by now, unless automatic lock + * transitions are disabled. + */ + rbd_assert(!rbd_dev->opts->exclusive || rbd_is_lock_owner(rbd_dev)); return 0; } @@ -5724,6 +6733,8 @@ static void rbd_dev_unprobe(struct rbd_device *rbd_dev) struct rbd_image_header *header; rbd_dev_parent_put(rbd_dev); + rbd_object_map_free(rbd_dev); + rbd_dev_mapping_clear(rbd_dev); /* Free dynamic fields from the header, then zero it out */ @@ -5824,7 +6835,6 @@ out_err: static void rbd_dev_device_release(struct rbd_device *rbd_dev) { clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags); - rbd_dev_mapping_clear(rbd_dev); rbd_free_disk(rbd_dev); if (!single_major) unregister_blkdev(rbd_dev->major, rbd_dev->name); @@ -5858,23 +6868,17 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev) if (ret) goto err_out_blkdev; - ret = rbd_dev_mapping_set(rbd_dev); - if (ret) - goto err_out_disk; - set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE); set_disk_ro(rbd_dev->disk, rbd_dev->opts->read_only); ret = dev_set_name(&rbd_dev->dev, "%d", rbd_dev->dev_id); if (ret) - goto err_out_mapping; + goto err_out_disk; set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags); up_write(&rbd_dev->header_rwsem); return 0; -err_out_mapping: - rbd_dev_mapping_clear(rbd_dev); err_out_disk: rbd_free_disk(rbd_dev); err_out_blkdev: @@ -5975,6 +6979,17 @@ static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth) goto err_out_probe; } + ret = rbd_dev_mapping_set(rbd_dev); + if (ret) + goto err_out_probe; + + if (rbd_dev->spec->snap_id != CEPH_NOSNAP && + (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP)) { + ret = rbd_object_map_load(rbd_dev); + if (ret) + goto err_out_probe; + } + if (rbd_dev->header.features & RBD_FEATURE_LAYERING) { ret = rbd_dev_v2_parent_info(rbd_dev); if (ret) @@ -6071,11 +7086,9 @@ static ssize_t do_rbd_add(struct bus_type *bus, if (rc) goto err_out_image_probe; - if (rbd_dev->opts->exclusive) { - rc = rbd_add_acquire_lock(rbd_dev); - if (rc) - goto err_out_device_setup; - } + rc = rbd_add_acquire_lock(rbd_dev); + if (rc) + goto err_out_image_lock; /* Everything's ready. Announce the disk to the world. */ @@ -6101,7 +7114,6 @@ out: err_out_image_lock: rbd_dev_image_unlock(rbd_dev); -err_out_device_setup: rbd_dev_device_release(rbd_dev); err_out_image_probe: rbd_dev_image_release(rbd_dev); |