38 files changed, 2606 insertions, 1127 deletions
diff --git a/MAINTAINERS b/MAINTAINERS index d51808468713..f63e9d1468f6 100644 --- a/MAINTAINERS +++ b/MAINTAINERS @@ -3765,7 +3765,7 @@ F: arch/powerpc/platforms/cell/ CEPH COMMON CODE (LIBCEPH) M: Ilya Dryomov <idryomov@gmail.com> -M: "Yan, Zheng" <zyan@redhat.com> +M: Jeff Layton <jlayton@kernel.org> M: Sage Weil <sage@redhat.com> L: ceph-devel@vger.kernel.org W: http://ceph.com/ @@ -3777,7 +3777,7 @@ F: include/linux/ceph/ F: include/linux/crush/ CEPH DISTRIBUTED FILE SYSTEM CLIENT (CEPH) -M: "Yan, Zheng" <zyan@redhat.com> +M: Jeff Layton <jlayton@kernel.org> M: Sage Weil <sage@redhat.com> M: Ilya Dryomov <idryomov@gmail.com> L: ceph-devel@vger.kernel.org diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index e5009a34f9c2..3327192bb71f 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -115,6 +115,8 @@ static int atomic_dec_return_safe(atomic_t *v) #define RBD_FEATURE_LAYERING (1ULL<<0) #define RBD_FEATURE_STRIPINGV2 (1ULL<<1) #define RBD_FEATURE_EXCLUSIVE_LOCK (1ULL<<2) +#define RBD_FEATURE_OBJECT_MAP (1ULL<<3) +#define RBD_FEATURE_FAST_DIFF (1ULL<<4) #define RBD_FEATURE_DEEP_FLATTEN (1ULL<<5) #define RBD_FEATURE_DATA_POOL (1ULL<<7) #define RBD_FEATURE_OPERATIONS (1ULL<<8) @@ -122,6 +124,8 @@ static int atomic_dec_return_safe(atomic_t *v) #define RBD_FEATURES_ALL (RBD_FEATURE_LAYERING | \ RBD_FEATURE_STRIPINGV2 | \ RBD_FEATURE_EXCLUSIVE_LOCK | \ + RBD_FEATURE_OBJECT_MAP | \ + RBD_FEATURE_FAST_DIFF | \ RBD_FEATURE_DEEP_FLATTEN | \ RBD_FEATURE_DATA_POOL | \ RBD_FEATURE_OPERATIONS) @@ -203,6 +207,11 @@ struct rbd_client { struct list_head node; }; +struct pending_result { + int result; /* first nonzero result */ + int num_pending; +}; + struct rbd_img_request; enum obj_request_type { @@ -219,6 +228,18 @@ enum obj_operation_type { OBJ_OP_ZEROOUT, }; +#define RBD_OBJ_FLAG_DELETION (1U << 0) +#define RBD_OBJ_FLAG_COPYUP_ENABLED (1U << 1) +#define RBD_OBJ_FLAG_COPYUP_ZEROS (1U << 2) +#define RBD_OBJ_FLAG_MAY_EXIST (1U << 3) +#define RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT (1U << 4) + +enum rbd_obj_read_state { + RBD_OBJ_READ_START = 1, + RBD_OBJ_READ_OBJECT, + RBD_OBJ_READ_PARENT, +}; + /* * Writes go through the following state machine to deal with * layering: @@ -245,17 +266,28 @@ enum obj_operation_type { * even if there is a parent). 
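 *
 * (Illustrative summary, based on rbd_obj_advance_write() further
 * down in this patch: with the object map feature, a guarded write
 * now typically advances RBD_OBJ_WRITE_START ->
 * RBD_OBJ_WRITE_PRE_OBJECT_MAP -> RBD_OBJ_WRITE_OBJECT ->
 * [-ENOENT -> RBD_OBJ_WRITE_COPYUP] -> RBD_OBJ_WRITE_POST_OBJECT_MAP.)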
*/ enum rbd_obj_write_state { - RBD_OBJ_WRITE_FLAT = 1, - RBD_OBJ_WRITE_GUARD, - RBD_OBJ_WRITE_READ_FROM_PARENT, - RBD_OBJ_WRITE_COPYUP_EMPTY_SNAPC, - RBD_OBJ_WRITE_COPYUP_OPS, + RBD_OBJ_WRITE_START = 1, + RBD_OBJ_WRITE_PRE_OBJECT_MAP, + RBD_OBJ_WRITE_OBJECT, + __RBD_OBJ_WRITE_COPYUP, + RBD_OBJ_WRITE_COPYUP, + RBD_OBJ_WRITE_POST_OBJECT_MAP, +}; + +enum rbd_obj_copyup_state { + RBD_OBJ_COPYUP_START = 1, + RBD_OBJ_COPYUP_READ_PARENT, + __RBD_OBJ_COPYUP_OBJECT_MAPS, + RBD_OBJ_COPYUP_OBJECT_MAPS, + __RBD_OBJ_COPYUP_WRITE_OBJECT, + RBD_OBJ_COPYUP_WRITE_OBJECT, }; struct rbd_obj_request { struct ceph_object_extent ex; + unsigned int flags; /* RBD_OBJ_FLAG_* */ union { - bool tried_parent; /* for reads */ + enum rbd_obj_read_state read_state; /* for reads */ enum rbd_obj_write_state write_state; /* for writes */ }; @@ -271,14 +303,15 @@ struct rbd_obj_request { u32 bvec_idx; }; }; + + enum rbd_obj_copyup_state copyup_state; struct bio_vec *copyup_bvecs; u32 copyup_bvec_count; - struct ceph_osd_request *osd_req; - - u64 xferred; /* bytes transferred */ - int result; + struct list_head osd_reqs; /* w/ r_private_item */ + struct mutex state_mutex; + struct pending_result pending; struct kref kref; }; @@ -287,11 +320,19 @@ enum img_req_flags { IMG_REQ_LAYERED, /* ENOENT handling: normal = 0, layered = 1 */ }; +enum rbd_img_state { + RBD_IMG_START = 1, + RBD_IMG_EXCLUSIVE_LOCK, + __RBD_IMG_OBJECT_REQUESTS, + RBD_IMG_OBJECT_REQUESTS, +}; + struct rbd_img_request { struct rbd_device *rbd_dev; enum obj_operation_type op_type; enum obj_request_type data_type; unsigned long flags; + enum rbd_img_state state; union { u64 snap_id; /* for reads */ struct ceph_snap_context *snapc; /* for writes */ @@ -300,13 +341,14 @@ struct rbd_img_request { struct request *rq; /* block request */ struct rbd_obj_request *obj_request; /* obj req initiator */ }; - spinlock_t completion_lock; - u64 xferred;/* aggregate bytes transferred */ - int result; /* first nonzero obj_request result */ + struct list_head lock_item; struct list_head object_extents; /* obj_req.ex structs */ - u32 pending_count; + struct mutex state_mutex; + struct pending_result pending; + struct work_struct work; + int work_result; struct kref kref; }; @@ -380,7 +422,17 @@ struct rbd_device { struct work_struct released_lock_work; struct delayed_work lock_dwork; struct work_struct unlock_work; - wait_queue_head_t lock_waitq; + spinlock_t lock_lists_lock; + struct list_head acquiring_list; + struct list_head running_list; + struct completion acquire_wait; + int acquire_err; + struct completion releasing_wait; + + spinlock_t object_map_lock; + u8 *object_map; + u64 object_map_size; /* in objects */ + u64 object_map_flags; struct workqueue_struct *task_wq; @@ -408,12 +460,10 @@ struct rbd_device { * Flag bits for rbd_dev->flags: * - REMOVING (which is coupled with rbd_dev->open_count) is protected * by rbd_dev->lock - * - BLACKLISTED is protected by rbd_dev->lock_rwsem */ enum rbd_dev_flags { RBD_DEV_FLAG_EXISTS, /* mapped snapshot has not been deleted */ RBD_DEV_FLAG_REMOVING, /* this mapping is being removed */ - RBD_DEV_FLAG_BLACKLISTED, /* our ceph_client is blacklisted */ }; static DEFINE_MUTEX(client_mutex); /* Serialize client creation */ @@ -466,6 +516,8 @@ static int minor_to_rbd_dev_id(int minor) static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev) { + lockdep_assert_held(&rbd_dev->lock_rwsem); + return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED || rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING; } @@ -583,6 +635,26 @@ static int 
_rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id, u8 *order, u64 *snap_size); static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id, u64 *snap_features); +static int rbd_dev_v2_get_flags(struct rbd_device *rbd_dev); + +static void rbd_obj_handle_request(struct rbd_obj_request *obj_req, int result); +static void rbd_img_handle_request(struct rbd_img_request *img_req, int result); + +/* + * Return true if nothing else is pending. + */ +static bool pending_result_dec(struct pending_result *pending, int *result) +{ + rbd_assert(pending->num_pending > 0); + + if (*result && !pending->result) + pending->result = *result; + if (--pending->num_pending) + return false; + + *result = pending->result; + return true; +} static int rbd_open(struct block_device *bdev, fmode_t mode) { @@ -1317,6 +1389,8 @@ static void zero_bvecs(struct ceph_bvec_iter *bvec_pos, u32 off, u32 bytes) static void rbd_obj_zero_range(struct rbd_obj_request *obj_req, u32 off, u32 bytes) { + dout("%s %p data buf %u~%u\n", __func__, obj_req, off, bytes); + switch (obj_req->img_request->data_type) { case OBJ_REQUEST_BIO: zero_bios(&obj_req->bio_pos, off, bytes); @@ -1339,13 +1413,6 @@ static void rbd_obj_request_put(struct rbd_obj_request *obj_request) kref_put(&obj_request->kref, rbd_obj_request_destroy); } -static void rbd_img_request_get(struct rbd_img_request *img_request) -{ - dout("%s: img %p (was %d)\n", __func__, img_request, - kref_read(&img_request->kref)); - kref_get(&img_request->kref); -} - static void rbd_img_request_destroy(struct kref *kref); static void rbd_img_request_put(struct rbd_img_request *img_request) { @@ -1362,7 +1429,6 @@ static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request, /* Image request now owns object's original reference */ obj_request->img_request = img_request; - img_request->pending_count++; dout("%s: img %p obj %p\n", __func__, img_request, obj_request); } @@ -1375,13 +1441,13 @@ static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request, rbd_obj_request_put(obj_request); } -static void rbd_obj_request_submit(struct rbd_obj_request *obj_request) +static void rbd_osd_submit(struct ceph_osd_request *osd_req) { - struct ceph_osd_request *osd_req = obj_request->osd_req; + struct rbd_obj_request *obj_req = osd_req->r_priv; - dout("%s %p object_no %016llx %llu~%llu osd_req %p\n", __func__, - obj_request, obj_request->ex.oe_objno, obj_request->ex.oe_off, - obj_request->ex.oe_len, osd_req); + dout("%s osd_req %p for obj_req %p objno %llu %llu~%llu\n", + __func__, osd_req, obj_req, obj_req->ex.oe_objno, + obj_req->ex.oe_off, obj_req->ex.oe_len); ceph_osdc_start_request(osd_req->r_osdc, osd_req, false); } @@ -1457,41 +1523,38 @@ static bool rbd_img_is_write(struct rbd_img_request *img_req) } } -static void rbd_obj_handle_request(struct rbd_obj_request *obj_req); - static void rbd_osd_req_callback(struct ceph_osd_request *osd_req) { struct rbd_obj_request *obj_req = osd_req->r_priv; + int result; dout("%s osd_req %p result %d for obj_req %p\n", __func__, osd_req, osd_req->r_result, obj_req); - rbd_assert(osd_req == obj_req->osd_req); - obj_req->result = osd_req->r_result < 0 ? osd_req->r_result : 0; - if (!obj_req->result && !rbd_img_is_write(obj_req->img_request)) - obj_req->xferred = osd_req->r_result; + /* + * Writes aren't allowed to return a data payload. In some + * guarded write cases (e.g. stat + zero on an empty object) + * a stat response makes it through, but we don't care. 
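 *
 * (For example, a 4096-byte read that hits a 512-byte object
 * completes with r_result == 512 and the remainder is zero-filled
 * later in rbd_obj_advance_read(), while a write's positive
 * r_result is collapsed to 0 below.)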
+ */ + if (osd_req->r_result > 0 && rbd_img_is_write(obj_req->img_request)) + result = 0; else - /* - * Writes aren't allowed to return a data payload. In some - * guarded write cases (e.g. stat + zero on an empty object) - * a stat response makes it through, but we don't care. - */ - obj_req->xferred = 0; + result = osd_req->r_result; - rbd_obj_handle_request(obj_req); + rbd_obj_handle_request(obj_req, result); } -static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request) +static void rbd_osd_format_read(struct ceph_osd_request *osd_req) { - struct ceph_osd_request *osd_req = obj_request->osd_req; + struct rbd_obj_request *obj_request = osd_req->r_priv; osd_req->r_flags = CEPH_OSD_FLAG_READ; osd_req->r_snapid = obj_request->img_request->snap_id; } -static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request) +static void rbd_osd_format_write(struct ceph_osd_request *osd_req) { - struct ceph_osd_request *osd_req = obj_request->osd_req; + struct rbd_obj_request *obj_request = osd_req->r_priv; osd_req->r_flags = CEPH_OSD_FLAG_WRITE; ktime_get_real_ts64(&osd_req->r_mtime); @@ -1499,19 +1562,21 @@ static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request) } static struct ceph_osd_request * -__rbd_osd_req_create(struct rbd_obj_request *obj_req, - struct ceph_snap_context *snapc, unsigned int num_ops) +__rbd_obj_add_osd_request(struct rbd_obj_request *obj_req, + struct ceph_snap_context *snapc, int num_ops) { struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; struct ceph_osd_request *req; const char *name_format = rbd_dev->image_format == 1 ? RBD_V1_DATA_FORMAT : RBD_V2_DATA_FORMAT; + int ret; req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false, GFP_NOIO); if (!req) - return NULL; + return ERR_PTR(-ENOMEM); + list_add_tail(&req->r_private_item, &obj_req->osd_reqs); req->r_callback = rbd_osd_req_callback; req->r_priv = obj_req; @@ -1522,27 +1587,20 @@ __rbd_osd_req_create(struct rbd_obj_request *obj_req, ceph_oloc_copy(&req->r_base_oloc, &rbd_dev->header_oloc); req->r_base_oloc.pool = rbd_dev->layout.pool_id; - if (ceph_oid_aprintf(&req->r_base_oid, GFP_NOIO, name_format, - rbd_dev->header.object_prefix, obj_req->ex.oe_objno)) - goto err_req; + ret = ceph_oid_aprintf(&req->r_base_oid, GFP_NOIO, name_format, + rbd_dev->header.object_prefix, + obj_req->ex.oe_objno); + if (ret) + return ERR_PTR(ret); return req; - -err_req: - ceph_osdc_put_request(req); - return NULL; } static struct ceph_osd_request * -rbd_osd_req_create(struct rbd_obj_request *obj_req, unsigned int num_ops) +rbd_obj_add_osd_request(struct rbd_obj_request *obj_req, int num_ops) { - return __rbd_osd_req_create(obj_req, obj_req->img_request->snapc, - num_ops); -} - -static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req) -{ - ceph_osdc_put_request(osd_req); + return __rbd_obj_add_osd_request(obj_req, obj_req->img_request->snapc, + num_ops); } static struct rbd_obj_request *rbd_obj_request_create(void) @@ -1554,6 +1612,8 @@ static struct rbd_obj_request *rbd_obj_request_create(void) return NULL; ceph_object_extent_init(&obj_request->ex); + INIT_LIST_HEAD(&obj_request->osd_reqs); + mutex_init(&obj_request->state_mutex); kref_init(&obj_request->kref); dout("%s %p\n", __func__, obj_request); @@ -1563,14 +1623,19 @@ static struct rbd_obj_request *rbd_obj_request_create(void) static void rbd_obj_request_destroy(struct kref *kref) { struct rbd_obj_request *obj_request; + struct ceph_osd_request *osd_req; u32 
i; obj_request = container_of(kref, struct rbd_obj_request, kref); dout("%s: obj %p\n", __func__, obj_request); - if (obj_request->osd_req) - rbd_osd_req_destroy(obj_request->osd_req); + while (!list_empty(&obj_request->osd_reqs)) { + osd_req = list_first_entry(&obj_request->osd_reqs, + struct ceph_osd_request, r_private_item); + list_del_init(&osd_req->r_private_item); + ceph_osdc_put_request(osd_req); + } switch (obj_request->img_request->data_type) { case OBJ_REQUEST_NODATA: @@ -1684,8 +1749,9 @@ static struct rbd_img_request *rbd_img_request_create( if (rbd_dev_parent_get(rbd_dev)) img_request_layered_set(img_request); - spin_lock_init(&img_request->completion_lock); + INIT_LIST_HEAD(&img_request->lock_item); INIT_LIST_HEAD(&img_request->object_extents); + mutex_init(&img_request->state_mutex); kref_init(&img_request->kref); dout("%s: rbd_dev %p %s -> img %p\n", __func__, rbd_dev, @@ -1703,6 +1769,7 @@ static void rbd_img_request_destroy(struct kref *kref) dout("%s: img %p\n", __func__, img_request); + WARN_ON(!list_empty(&img_request->lock_item)); for_each_obj_request_safe(img_request, obj_request, next_obj_request) rbd_img_obj_request_del(img_request, obj_request); @@ -1717,6 +1784,466 @@ static void rbd_img_request_destroy(struct kref *kref) kmem_cache_free(rbd_img_request_cache, img_request); } +#define BITS_PER_OBJ 2 +#define OBJS_PER_BYTE (BITS_PER_BYTE / BITS_PER_OBJ) +#define OBJ_MASK ((1 << BITS_PER_OBJ) - 1) + +static void __rbd_object_map_index(struct rbd_device *rbd_dev, u64 objno, + u64 *index, u8 *shift) +{ + u32 off; + + rbd_assert(objno < rbd_dev->object_map_size); + *index = div_u64_rem(objno, OBJS_PER_BYTE, &off); + *shift = (OBJS_PER_BYTE - off - 1) * BITS_PER_OBJ; +} + +static u8 __rbd_object_map_get(struct rbd_device *rbd_dev, u64 objno) +{ + u64 index; + u8 shift; + + lockdep_assert_held(&rbd_dev->object_map_lock); + __rbd_object_map_index(rbd_dev, objno, &index, &shift); + return (rbd_dev->object_map[index] >> shift) & OBJ_MASK; +} + +static void __rbd_object_map_set(struct rbd_device *rbd_dev, u64 objno, u8 val) +{ + u64 index; + u8 shift; + u8 *p; + + lockdep_assert_held(&rbd_dev->object_map_lock); + rbd_assert(!(val & ~OBJ_MASK)); + + __rbd_object_map_index(rbd_dev, objno, &index, &shift); + p = &rbd_dev->object_map[index]; + *p = (*p & ~(OBJ_MASK << shift)) | (val << shift); +} + +static u8 rbd_object_map_get(struct rbd_device *rbd_dev, u64 objno) +{ + u8 state; + + spin_lock(&rbd_dev->object_map_lock); + state = __rbd_object_map_get(rbd_dev, objno); + spin_unlock(&rbd_dev->object_map_lock); + return state; +} + +static bool use_object_map(struct rbd_device *rbd_dev) +{ + return ((rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP) && + !(rbd_dev->object_map_flags & RBD_FLAG_OBJECT_MAP_INVALID)); +} + +static bool rbd_object_map_may_exist(struct rbd_device *rbd_dev, u64 objno) +{ + u8 state; + + /* fall back to default logic if object map is disabled or invalid */ + if (!use_object_map(rbd_dev)) + return true; + + state = rbd_object_map_get(rbd_dev, objno); + return state != OBJECT_NONEXISTENT; +} + +static void rbd_object_map_name(struct rbd_device *rbd_dev, u64 snap_id, + struct ceph_object_id *oid) +{ + if (snap_id == CEPH_NOSNAP) + ceph_oid_printf(oid, "%s%s", RBD_OBJECT_MAP_PREFIX, + rbd_dev->spec->image_id); + else + ceph_oid_printf(oid, "%s%s.%016llx", RBD_OBJECT_MAP_PREFIX, + rbd_dev->spec->image_id, snap_id); +} + +static int rbd_object_map_lock(struct rbd_device *rbd_dev) +{ + struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; + 
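/*
 * Worked example for the __rbd_object_map_index() helpers above
 * (illustrative): BITS_PER_OBJ == 2 packs four object states per
 * byte, most significant bits first.  For objno 5: index = 5 / 4 = 1,
 * off = 5 % 4 = 1, shift = (4 - 1 - 1) * 2 = 4, so its state is
 * (object_map[1] >> 4) & OBJ_MASK.
 */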
CEPH_DEFINE_OID_ONSTACK(oid); + u8 lock_type; + char *lock_tag; + struct ceph_locker *lockers; + u32 num_lockers; + bool broke_lock = false; + int ret; + + rbd_object_map_name(rbd_dev, CEPH_NOSNAP, &oid); + +again: + ret = ceph_cls_lock(osdc, &oid, &rbd_dev->header_oloc, RBD_LOCK_NAME, + CEPH_CLS_LOCK_EXCLUSIVE, "", "", "", 0); + if (ret != -EBUSY || broke_lock) { + if (ret == -EEXIST) + ret = 0; /* already locked by myself */ + if (ret) + rbd_warn(rbd_dev, "failed to lock object map: %d", ret); + return ret; + } + + ret = ceph_cls_lock_info(osdc, &oid, &rbd_dev->header_oloc, + RBD_LOCK_NAME, &lock_type, &lock_tag, + &lockers, &num_lockers); + if (ret) { + if (ret == -ENOENT) + goto again; + + rbd_warn(rbd_dev, "failed to get object map lockers: %d", ret); + return ret; + } + + kfree(lock_tag); + if (num_lockers == 0) + goto again; + + rbd_warn(rbd_dev, "breaking object map lock owned by %s%llu", + ENTITY_NAME(lockers[0].id.name)); + + ret = ceph_cls_break_lock(osdc, &oid, &rbd_dev->header_oloc, + RBD_LOCK_NAME, lockers[0].id.cookie, + &lockers[0].id.name); + ceph_free_lockers(lockers, num_lockers); + if (ret) { + if (ret == -ENOENT) + goto again; + + rbd_warn(rbd_dev, "failed to break object map lock: %d", ret); + return ret; + } + + broke_lock = true; + goto again; +} + +static void rbd_object_map_unlock(struct rbd_device *rbd_dev) +{ + struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; + CEPH_DEFINE_OID_ONSTACK(oid); + int ret; + + rbd_object_map_name(rbd_dev, CEPH_NOSNAP, &oid); + + ret = ceph_cls_unlock(osdc, &oid, &rbd_dev->header_oloc, RBD_LOCK_NAME, + ""); + if (ret && ret != -ENOENT) + rbd_warn(rbd_dev, "failed to unlock object map: %d", ret); +} + +static int decode_object_map_header(void **p, void *end, u64 *object_map_size) +{ + u8 struct_v; + u32 struct_len; + u32 header_len; + void *header_end; + int ret; + + ceph_decode_32_safe(p, end, header_len, e_inval); + header_end = *p + header_len; + + ret = ceph_start_decoding(p, end, 1, "BitVector header", &struct_v, + &struct_len); + if (ret) + return ret; + + ceph_decode_64_safe(p, end, *object_map_size, e_inval); + + *p = header_end; + return 0; + +e_inval: + return -EINVAL; +} + +static int __rbd_object_map_load(struct rbd_device *rbd_dev) +{ + struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; + CEPH_DEFINE_OID_ONSTACK(oid); + struct page **pages; + void *p, *end; + size_t reply_len; + u64 num_objects; + u64 object_map_bytes; + u64 object_map_size; + int num_pages; + int ret; + + rbd_assert(!rbd_dev->object_map && !rbd_dev->object_map_size); + + num_objects = ceph_get_num_objects(&rbd_dev->layout, + rbd_dev->mapping.size); + object_map_bytes = DIV_ROUND_UP_ULL(num_objects * BITS_PER_OBJ, + BITS_PER_BYTE); + num_pages = calc_pages_for(0, object_map_bytes) + 1; + pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL); + if (IS_ERR(pages)) + return PTR_ERR(pages); + + reply_len = num_pages * PAGE_SIZE; + rbd_object_map_name(rbd_dev, rbd_dev->spec->snap_id, &oid); + ret = ceph_osdc_call(osdc, &oid, &rbd_dev->header_oloc, + "rbd", "object_map_load", CEPH_OSD_FLAG_READ, + NULL, 0, pages, &reply_len); + if (ret) + goto out; + + p = page_address(pages[0]); + end = p + min(reply_len, (size_t)PAGE_SIZE); + ret = decode_object_map_header(&p, end, &object_map_size); + if (ret) + goto out; + + if (object_map_size != num_objects) { + rbd_warn(rbd_dev, "object map size mismatch: %llu vs %llu", + object_map_size, num_objects); + ret = -EINVAL; + goto out; + } + + if (offset_in_page(p) + object_map_bytes > 
reply_len) { + ret = -EINVAL; + goto out; + } + + rbd_dev->object_map = kvmalloc(object_map_bytes, GFP_KERNEL); + if (!rbd_dev->object_map) { + ret = -ENOMEM; + goto out; + } + + rbd_dev->object_map_size = object_map_size; + ceph_copy_from_page_vector(pages, rbd_dev->object_map, + offset_in_page(p), object_map_bytes); + +out: + ceph_release_page_vector(pages, num_pages); + return ret; +} + +static void rbd_object_map_free(struct rbd_device *rbd_dev) +{ + kvfree(rbd_dev->object_map); + rbd_dev->object_map = NULL; + rbd_dev->object_map_size = 0; +} + +static int rbd_object_map_load(struct rbd_device *rbd_dev) +{ + int ret; + + ret = __rbd_object_map_load(rbd_dev); + if (ret) + return ret; + + ret = rbd_dev_v2_get_flags(rbd_dev); + if (ret) { + rbd_object_map_free(rbd_dev); + return ret; + } + + if (rbd_dev->object_map_flags & RBD_FLAG_OBJECT_MAP_INVALID) + rbd_warn(rbd_dev, "object map is invalid"); + + return 0; +} + +static int rbd_object_map_open(struct rbd_device *rbd_dev) +{ + int ret; + + ret = rbd_object_map_lock(rbd_dev); + if (ret) + return ret; + + ret = rbd_object_map_load(rbd_dev); + if (ret) { + rbd_object_map_unlock(rbd_dev); + return ret; + } + + return 0; +} + +static void rbd_object_map_close(struct rbd_device *rbd_dev) +{ + rbd_object_map_free(rbd_dev); + rbd_object_map_unlock(rbd_dev); +} + +/* + * This function needs snap_id (or more precisely just something to + * distinguish between HEAD and snapshot object maps), new_state and + * current_state that were passed to rbd_object_map_update(). + * + * To avoid allocating and stashing a context we piggyback on the OSD + * request. A HEAD update has two ops (assert_locked). For new_state + * and current_state we decode our own object_map_update op, encoded in + * rbd_cls_object_map_update(). + */ +static int rbd_object_map_update_finish(struct rbd_obj_request *obj_req, + struct ceph_osd_request *osd_req) +{ + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; + struct ceph_osd_data *osd_data; + u64 objno; + u8 state, new_state, current_state; + bool has_current_state; + void *p; + + if (osd_req->r_result) + return osd_req->r_result; + + /* + * Nothing to do for a snapshot object map. + */ + if (osd_req->r_num_ops == 1) + return 0; + + /* + * Update in-memory HEAD object map. 
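 *
 * (The piggybacked request_data payload decoded below has the
 * little-endian layout produced by rbd_cls_object_map_update():
 * start objno (le64), end objno (le64), new_state (u8),
 * has_current_state (u8) and, if present, current_state (u8).)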
+ */ + rbd_assert(osd_req->r_num_ops == 2); + osd_data = osd_req_op_data(osd_req, 1, cls, request_data); + rbd_assert(osd_data->type == CEPH_OSD_DATA_TYPE_PAGES); + + p = page_address(osd_data->pages[0]); + objno = ceph_decode_64(&p); + rbd_assert(objno == obj_req->ex.oe_objno); + rbd_assert(ceph_decode_64(&p) == objno + 1); + new_state = ceph_decode_8(&p); + has_current_state = ceph_decode_8(&p); + if (has_current_state) + current_state = ceph_decode_8(&p); + + spin_lock(&rbd_dev->object_map_lock); + state = __rbd_object_map_get(rbd_dev, objno); + if (!has_current_state || current_state == state || + (current_state == OBJECT_EXISTS && state == OBJECT_EXISTS_CLEAN)) + __rbd_object_map_set(rbd_dev, objno, new_state); + spin_unlock(&rbd_dev->object_map_lock); + + return 0; +} + +static void rbd_object_map_callback(struct ceph_osd_request *osd_req) +{ + struct rbd_obj_request *obj_req = osd_req->r_priv; + int result; + + dout("%s osd_req %p result %d for obj_req %p\n", __func__, osd_req, + osd_req->r_result, obj_req); + + result = rbd_object_map_update_finish(obj_req, osd_req); + rbd_obj_handle_request(obj_req, result); +} + +static bool update_needed(struct rbd_device *rbd_dev, u64 objno, u8 new_state) +{ + u8 state = rbd_object_map_get(rbd_dev, objno); + + if (state == new_state || + (new_state == OBJECT_PENDING && state == OBJECT_NONEXISTENT) || + (new_state == OBJECT_NONEXISTENT && state != OBJECT_PENDING)) + return false; + + return true; +} + +static int rbd_cls_object_map_update(struct ceph_osd_request *req, + int which, u64 objno, u8 new_state, + const u8 *current_state) +{ + struct page **pages; + void *p, *start; + int ret; + + ret = osd_req_op_cls_init(req, which, "rbd", "object_map_update"); + if (ret) + return ret; + + pages = ceph_alloc_page_vector(1, GFP_NOIO); + if (IS_ERR(pages)) + return PTR_ERR(pages); + + p = start = page_address(pages[0]); + ceph_encode_64(&p, objno); + ceph_encode_64(&p, objno + 1); + ceph_encode_8(&p, new_state); + if (current_state) { + ceph_encode_8(&p, 1); + ceph_encode_8(&p, *current_state); + } else { + ceph_encode_8(&p, 0); + } + + osd_req_op_cls_request_data_pages(req, which, pages, p - start, 0, + false, true); + return 0; +} + +/* + * Return: + * 0 - object map update sent + * 1 - object map update isn't needed + * <0 - error + */ +static int rbd_object_map_update(struct rbd_obj_request *obj_req, u64 snap_id, + u8 new_state, const u8 *current_state) +{ + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; + struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; + struct ceph_osd_request *req; + int num_ops = 1; + int which = 0; + int ret; + + if (snap_id == CEPH_NOSNAP) { + if (!update_needed(rbd_dev, obj_req->ex.oe_objno, new_state)) + return 1; + + num_ops++; /* assert_locked */ + } + + req = ceph_osdc_alloc_request(osdc, NULL, num_ops, false, GFP_NOIO); + if (!req) + return -ENOMEM; + + list_add_tail(&req->r_private_item, &obj_req->osd_reqs); + req->r_callback = rbd_object_map_callback; + req->r_priv = obj_req; + + rbd_object_map_name(rbd_dev, snap_id, &req->r_base_oid); + ceph_oloc_copy(&req->r_base_oloc, &rbd_dev->header_oloc); + req->r_flags = CEPH_OSD_FLAG_WRITE; + ktime_get_real_ts64(&req->r_mtime); + + if (snap_id == CEPH_NOSNAP) { + /* + * Protect against possible race conditions during lock + * ownership transitions. 
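 *
 * (The resulting HEAD request thus carries two ops: op 0 asserts
 * that the exclusive lock is still held, op 1 is the
 * object_map_update call -- which is why
 * rbd_object_map_update_finish() treats r_num_ops == 2 as a HEAD
 * update and r_num_ops == 1 as a snapshot update.)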
+ */ + ret = ceph_cls_assert_locked(req, which++, RBD_LOCK_NAME, + CEPH_CLS_LOCK_EXCLUSIVE, "", ""); + if (ret) + return ret; + } + + ret = rbd_cls_object_map_update(req, which, obj_req->ex.oe_objno, + new_state, current_state); + if (ret) + return ret; + + ret = ceph_osdc_alloc_messages(req, GFP_NOIO); + if (ret) + return ret; + + ceph_osdc_start_request(osdc, req, false); + return 0; +} + static void prune_extents(struct ceph_file_extent *img_extents, u32 *num_img_extents, u64 overlap) { @@ -1764,11 +2291,13 @@ static int rbd_obj_calc_img_extents(struct rbd_obj_request *obj_req, return 0; } -static void rbd_osd_req_setup_data(struct rbd_obj_request *obj_req, u32 which) +static void rbd_osd_setup_data(struct ceph_osd_request *osd_req, int which) { + struct rbd_obj_request *obj_req = osd_req->r_priv; + switch (obj_req->img_request->data_type) { case OBJ_REQUEST_BIO: - osd_req_op_extent_osd_data_bio(obj_req->osd_req, which, + osd_req_op_extent_osd_data_bio(osd_req, which, &obj_req->bio_pos, obj_req->ex.oe_len); break; @@ -1777,7 +2306,7 @@ static void rbd_osd_req_setup_data(struct rbd_obj_request *obj_req, u32 which) rbd_assert(obj_req->bvec_pos.iter.bi_size == obj_req->ex.oe_len); rbd_assert(obj_req->bvec_idx == obj_req->bvec_count); - osd_req_op_extent_osd_data_bvec_pos(obj_req->osd_req, which, + osd_req_op_extent_osd_data_bvec_pos(osd_req, which, &obj_req->bvec_pos); break; default: @@ -1785,22 +2314,7 @@ static void rbd_osd_req_setup_data(struct rbd_obj_request *obj_req, u32 which) } } -static int rbd_obj_setup_read(struct rbd_obj_request *obj_req) -{ - obj_req->osd_req = __rbd_osd_req_create(obj_req, NULL, 1); - if (!obj_req->osd_req) - return -ENOMEM; - - osd_req_op_extent_init(obj_req->osd_req, 0, CEPH_OSD_OP_READ, - obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0); - rbd_osd_req_setup_data(obj_req, 0); - - rbd_osd_req_format_read(obj_req); - return 0; -} - -static int __rbd_obj_setup_stat(struct rbd_obj_request *obj_req, - unsigned int which) +static int rbd_osd_setup_stat(struct ceph_osd_request *osd_req, int which) { struct page **pages; @@ -1816,45 +2330,60 @@ static int __rbd_obj_setup_stat(struct rbd_obj_request *obj_req, if (IS_ERR(pages)) return PTR_ERR(pages); - osd_req_op_init(obj_req->osd_req, which, CEPH_OSD_OP_STAT, 0); - osd_req_op_raw_data_in_pages(obj_req->osd_req, which, pages, + osd_req_op_init(osd_req, which, CEPH_OSD_OP_STAT, 0); + osd_req_op_raw_data_in_pages(osd_req, which, pages, 8 + sizeof(struct ceph_timespec), 0, false, true); return 0; } -static int count_write_ops(struct rbd_obj_request *obj_req) +static int rbd_osd_setup_copyup(struct ceph_osd_request *osd_req, int which, + u32 bytes) +{ + struct rbd_obj_request *obj_req = osd_req->r_priv; + int ret; + + ret = osd_req_op_cls_init(osd_req, which, "rbd", "copyup"); + if (ret) + return ret; + + osd_req_op_cls_request_data_bvecs(osd_req, which, obj_req->copyup_bvecs, + obj_req->copyup_bvec_count, bytes); + return 0; +} + +static int rbd_obj_init_read(struct rbd_obj_request *obj_req) { - return 2; /* setallochint + write/writefull */ + obj_req->read_state = RBD_OBJ_READ_START; + return 0; } -static void __rbd_obj_setup_write(struct rbd_obj_request *obj_req, - unsigned int which) +static void __rbd_osd_setup_write_ops(struct ceph_osd_request *osd_req, + int which) { + struct rbd_obj_request *obj_req = osd_req->r_priv; struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; u16 opcode; - osd_req_op_alloc_hint_init(obj_req->osd_req, which++, - rbd_dev->layout.object_size, - rbd_dev->layout.object_size); + if 
(!use_object_map(rbd_dev) || + !(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST)) { + osd_req_op_alloc_hint_init(osd_req, which++, + rbd_dev->layout.object_size, + rbd_dev->layout.object_size); + } if (rbd_obj_is_entire(obj_req)) opcode = CEPH_OSD_OP_WRITEFULL; else opcode = CEPH_OSD_OP_WRITE; - osd_req_op_extent_init(obj_req->osd_req, which, opcode, + osd_req_op_extent_init(osd_req, which, opcode, obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0); - rbd_osd_req_setup_data(obj_req, which++); - - rbd_assert(which == obj_req->osd_req->r_num_ops); - rbd_osd_req_format_write(obj_req); + rbd_osd_setup_data(osd_req, which); } -static int rbd_obj_setup_write(struct rbd_obj_request *obj_req) +static int rbd_obj_init_write(struct rbd_obj_request *obj_req) { - unsigned int num_osd_ops, which = 0; - bool need_guard; int ret; /* reverse map the entire object onto the parent */ @@ -1862,24 +2391,10 @@ static int rbd_obj_setup_write(struct rbd_obj_request *obj_req) if (ret) return ret; - need_guard = rbd_obj_copyup_enabled(obj_req); - num_osd_ops = need_guard + count_write_ops(obj_req); - - obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops); - if (!obj_req->osd_req) - return -ENOMEM; - - if (need_guard) { - ret = __rbd_obj_setup_stat(obj_req, which++); - if (ret) - return ret; + if (rbd_obj_copyup_enabled(obj_req)) + obj_req->flags |= RBD_OBJ_FLAG_COPYUP_ENABLED; - obj_req->write_state = RBD_OBJ_WRITE_GUARD; - } else { - obj_req->write_state = RBD_OBJ_WRITE_FLAT; - } - - __rbd_obj_setup_write(obj_req, which); + obj_req->write_state = RBD_OBJ_WRITE_START; return 0; } @@ -1889,11 +2404,26 @@ static u16 truncate_or_zero_opcode(struct rbd_obj_request *obj_req) CEPH_OSD_OP_ZERO; } -static int rbd_obj_setup_discard(struct rbd_obj_request *obj_req) +static void __rbd_osd_setup_discard_ops(struct ceph_osd_request *osd_req, + int which) +{ + struct rbd_obj_request *obj_req = osd_req->r_priv; + + if (rbd_obj_is_entire(obj_req) && !obj_req->num_img_extents) { + rbd_assert(obj_req->flags & RBD_OBJ_FLAG_DELETION); + osd_req_op_init(osd_req, which, CEPH_OSD_OP_DELETE, 0); + } else { + osd_req_op_extent_init(osd_req, which, + truncate_or_zero_opcode(obj_req), + obj_req->ex.oe_off, obj_req->ex.oe_len, + 0, 0); + } +} + +static int rbd_obj_init_discard(struct rbd_obj_request *obj_req) { struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; - u64 off = obj_req->ex.oe_off; - u64 next_off = obj_req->ex.oe_off + obj_req->ex.oe_len; + u64 off, next_off; int ret; /* @@ -1906,10 +2436,17 @@ static int rbd_obj_setup_discard(struct rbd_obj_request *obj_req) */ if (rbd_dev->opts->alloc_size != rbd_dev->layout.object_size || !rbd_obj_is_tail(obj_req)) { - off = round_up(off, rbd_dev->opts->alloc_size); - next_off = round_down(next_off, rbd_dev->opts->alloc_size); + off = round_up(obj_req->ex.oe_off, rbd_dev->opts->alloc_size); + next_off = round_down(obj_req->ex.oe_off + obj_req->ex.oe_len, + rbd_dev->opts->alloc_size); if (off >= next_off) return 1; + + dout("%s %p %llu~%llu -> %llu~%llu\n", __func__, + obj_req, obj_req->ex.oe_off, obj_req->ex.oe_len, + off, next_off - off); + obj_req->ex.oe_off = off; + obj_req->ex.oe_len = next_off - off; } /* reverse map the entire object onto the parent */ @@ -1917,52 +2454,29 @@ static int rbd_obj_setup_discard(struct rbd_obj_request *obj_req) if (ret) return ret; - obj_req->osd_req = rbd_osd_req_create(obj_req, 1); - if (!obj_req->osd_req) - return -ENOMEM; - - if (rbd_obj_is_entire(obj_req) && !obj_req->num_img_extents) { - osd_req_op_init(obj_req->osd_req, 0, CEPH_OSD_OP_DELETE, 0); - 
} else { - dout("%s %p %llu~%llu -> %llu~%llu\n", __func__, - obj_req, obj_req->ex.oe_off, obj_req->ex.oe_len, - off, next_off - off); - osd_req_op_extent_init(obj_req->osd_req, 0, - truncate_or_zero_opcode(obj_req), - off, next_off - off, 0, 0); - } + obj_req->flags |= RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT; + if (rbd_obj_is_entire(obj_req) && !obj_req->num_img_extents) + obj_req->flags |= RBD_OBJ_FLAG_DELETION; - obj_req->write_state = RBD_OBJ_WRITE_FLAT; - rbd_osd_req_format_write(obj_req); + obj_req->write_state = RBD_OBJ_WRITE_START; return 0; } -static int count_zeroout_ops(struct rbd_obj_request *obj_req) -{ - int num_osd_ops; - - if (rbd_obj_is_entire(obj_req) && obj_req->num_img_extents && - !rbd_obj_copyup_enabled(obj_req)) - num_osd_ops = 2; /* create + truncate */ - else - num_osd_ops = 1; /* delete/truncate/zero */ - - return num_osd_ops; -} - -static void __rbd_obj_setup_zeroout(struct rbd_obj_request *obj_req, - unsigned int which) +static void __rbd_osd_setup_zeroout_ops(struct ceph_osd_request *osd_req, + int which) { + struct rbd_obj_request *obj_req = osd_req->r_priv; u16 opcode; if (rbd_obj_is_entire(obj_req)) { if (obj_req->num_img_extents) { - if (!rbd_obj_copyup_enabled(obj_req)) - osd_req_op_init(obj_req->osd_req, which++, + if (!(obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED)) + osd_req_op_init(osd_req, which++, CEPH_OSD_OP_CREATE, 0); opcode = CEPH_OSD_OP_TRUNCATE; } else { - osd_req_op_init(obj_req->osd_req, which++, + rbd_assert(obj_req->flags & RBD_OBJ_FLAG_DELETION); + osd_req_op_init(osd_req, which++, CEPH_OSD_OP_DELETE, 0); opcode = 0; } @@ -1971,18 +2485,13 @@ static void __rbd_obj_setup_zeroout(struct rbd_obj_request *obj_req, } if (opcode) - osd_req_op_extent_init(obj_req->osd_req, which++, opcode, + osd_req_op_extent_init(osd_req, which, opcode, obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0); - - rbd_assert(which == obj_req->osd_req->r_num_ops); - rbd_osd_req_format_write(obj_req); } -static int rbd_obj_setup_zeroout(struct rbd_obj_request *obj_req) +static int rbd_obj_init_zeroout(struct rbd_obj_request *obj_req) { - unsigned int num_osd_ops, which = 0; - bool need_guard; int ret; /* reverse map the entire object onto the parent */ @@ -1990,31 +2499,66 @@ static int rbd_obj_setup_zeroout(struct rbd_obj_request *obj_req) if (ret) return ret; - need_guard = rbd_obj_copyup_enabled(obj_req); - num_osd_ops = need_guard + count_zeroout_ops(obj_req); + if (rbd_obj_copyup_enabled(obj_req)) + obj_req->flags |= RBD_OBJ_FLAG_COPYUP_ENABLED; + if (!obj_req->num_img_extents) { + obj_req->flags |= RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT; + if (rbd_obj_is_entire(obj_req)) + obj_req->flags |= RBD_OBJ_FLAG_DELETION; + } - obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops); - if (!obj_req->osd_req) - return -ENOMEM; + obj_req->write_state = RBD_OBJ_WRITE_START; + return 0; +} - if (need_guard) { - ret = __rbd_obj_setup_stat(obj_req, which++); - if (ret) - return ret; +static int count_write_ops(struct rbd_obj_request *obj_req) +{ + struct rbd_img_request *img_req = obj_req->img_request; - obj_req->write_state = RBD_OBJ_WRITE_GUARD; - } else { - obj_req->write_state = RBD_OBJ_WRITE_FLAT; + switch (img_req->op_type) { + case OBJ_OP_WRITE: + if (!use_object_map(img_req->rbd_dev) || + !(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST)) + return 2; /* setallochint + write/writefull */ + + return 1; /* write/writefull */ + case OBJ_OP_DISCARD: + return 1; /* delete/truncate/zero */ + case OBJ_OP_ZEROOUT: + if (rbd_obj_is_entire(obj_req) && obj_req->num_img_extents && + !(obj_req->flags & 
RBD_OBJ_FLAG_COPYUP_ENABLED)) + return 2; /* create + truncate */ + + return 1; /* delete/truncate/zero */ + default: + BUG(); } +} - __rbd_obj_setup_zeroout(obj_req, which); - return 0; +static void rbd_osd_setup_write_ops(struct ceph_osd_request *osd_req, + int which) +{ + struct rbd_obj_request *obj_req = osd_req->r_priv; + + switch (obj_req->img_request->op_type) { + case OBJ_OP_WRITE: + __rbd_osd_setup_write_ops(osd_req, which); + break; + case OBJ_OP_DISCARD: + __rbd_osd_setup_discard_ops(osd_req, which); + break; + case OBJ_OP_ZEROOUT: + __rbd_osd_setup_zeroout_ops(osd_req, which); + break; + default: + BUG(); + } } /* - * For each object request in @img_req, allocate an OSD request, add - * individual OSD ops and prepare them for submission. The number of - * OSD ops depends on op_type and the overlap point (if any). + * Prune the list of object requests (adjust offset and/or length, drop + * redundant requests). Prepare object request state machines and image + * request state machine for execution. */ static int __rbd_img_fill_request(struct rbd_img_request *img_req) { @@ -2024,16 +2568,16 @@ static int __rbd_img_fill_request(struct rbd_img_request *img_req) for_each_obj_request_safe(img_req, obj_req, next_obj_req) { switch (img_req->op_type) { case OBJ_OP_READ: - ret = rbd_obj_setup_read(obj_req); + ret = rbd_obj_init_read(obj_req); break; case OBJ_OP_WRITE: - ret = rbd_obj_setup_write(obj_req); + ret = rbd_obj_init_write(obj_req); break; case OBJ_OP_DISCARD: - ret = rbd_obj_setup_discard(obj_req); + ret = rbd_obj_init_discard(obj_req); break; case OBJ_OP_ZEROOUT: - ret = rbd_obj_setup_zeroout(obj_req); + ret = rbd_obj_init_zeroout(obj_req); break; default: BUG(); @@ -2041,17 +2585,12 @@ static int __rbd_img_fill_request(struct rbd_img_request *img_req) if (ret < 0) return ret; if (ret > 0) { - img_req->xferred += obj_req->ex.oe_len; - img_req->pending_count--; rbd_img_obj_request_del(img_req, obj_req); continue; } - - ret = ceph_osdc_alloc_messages(obj_req->osd_req, GFP_NOIO); - if (ret) - return ret; } + img_req->state = RBD_IMG_START; return 0; } @@ -2340,17 +2879,55 @@ static int rbd_img_fill_from_bvecs(struct rbd_img_request *img_req, &it); } -static void rbd_img_request_submit(struct rbd_img_request *img_request) +static void rbd_img_handle_request_work(struct work_struct *work) { - struct rbd_obj_request *obj_request; + struct rbd_img_request *img_req = + container_of(work, struct rbd_img_request, work); - dout("%s: img %p\n", __func__, img_request); + rbd_img_handle_request(img_req, img_req->work_result); +} - rbd_img_request_get(img_request); - for_each_obj_request(img_request, obj_request) - rbd_obj_request_submit(obj_request); +static void rbd_img_schedule(struct rbd_img_request *img_req, int result) +{ + INIT_WORK(&img_req->work, rbd_img_handle_request_work); + img_req->work_result = result; + queue_work(rbd_wq, &img_req->work); +} - rbd_img_request_put(img_request); +static bool rbd_obj_may_exist(struct rbd_obj_request *obj_req) +{ + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; + + if (rbd_object_map_may_exist(rbd_dev, obj_req->ex.oe_objno)) { + obj_req->flags |= RBD_OBJ_FLAG_MAY_EXIST; + return true; + } + + dout("%s %p objno %llu assuming dne\n", __func__, obj_req, + obj_req->ex.oe_objno); + return false; +} + +static int rbd_obj_read_object(struct rbd_obj_request *obj_req) +{ + struct ceph_osd_request *osd_req; + int ret; + + osd_req = __rbd_obj_add_osd_request(obj_req, NULL, 1); + if (IS_ERR(osd_req)) + return PTR_ERR(osd_req); + + 
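	/*
	 * A single CEPH_OSD_OP_READ covers the object extent; an
	 * -ENOENT reply is handled in rbd_obj_advance_read() by
	 * falling back to the parent image or zero-filling.
	 */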
osd_req_op_extent_init(osd_req, 0, CEPH_OSD_OP_READ, + obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0); + rbd_osd_setup_data(osd_req, 0); + rbd_osd_format_read(osd_req); + + ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO); + if (ret) + return ret; + + rbd_osd_submit(osd_req); + return 0; } static int rbd_obj_read_from_parent(struct rbd_obj_request *obj_req) @@ -2396,51 +2973,144 @@ static int rbd_obj_read_from_parent(struct rbd_obj_request *obj_req) return ret; } - rbd_img_request_submit(child_img_req); + /* avoid parent chain recursion */ + rbd_img_schedule(child_img_req, 0); return 0; } -static bool rbd_obj_handle_read(struct rbd_obj_request *obj_req) +static bool rbd_obj_advance_read(struct rbd_obj_request *obj_req, int *result) { struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; int ret; - if (obj_req->result == -ENOENT && - rbd_dev->parent_overlap && !obj_req->tried_parent) { - /* reverse map this object extent onto the parent */ - ret = rbd_obj_calc_img_extents(obj_req, false); +again: + switch (obj_req->read_state) { + case RBD_OBJ_READ_START: + rbd_assert(!*result); + + if (!rbd_obj_may_exist(obj_req)) { + *result = -ENOENT; + obj_req->read_state = RBD_OBJ_READ_OBJECT; + goto again; + } + + ret = rbd_obj_read_object(obj_req); if (ret) { - obj_req->result = ret; + *result = ret; return true; } - - if (obj_req->num_img_extents) { - obj_req->tried_parent = true; - ret = rbd_obj_read_from_parent(obj_req); + obj_req->read_state = RBD_OBJ_READ_OBJECT; + return false; + case RBD_OBJ_READ_OBJECT: + if (*result == -ENOENT && rbd_dev->parent_overlap) { + /* reverse map this object extent onto the parent */ + ret = rbd_obj_calc_img_extents(obj_req, false); if (ret) { - obj_req->result = ret; + *result = ret; return true; } - return false; + if (obj_req->num_img_extents) { + ret = rbd_obj_read_from_parent(obj_req); + if (ret) { + *result = ret; + return true; + } + obj_req->read_state = RBD_OBJ_READ_PARENT; + return false; + } + } + + /* + * -ENOENT means a hole in the image -- zero-fill the entire + * length of the request. A short read also implies zero-fill + * to the end of the request. + */ + if (*result == -ENOENT) { + rbd_obj_zero_range(obj_req, 0, obj_req->ex.oe_len); + *result = 0; + } else if (*result >= 0) { + if (*result < obj_req->ex.oe_len) + rbd_obj_zero_range(obj_req, *result, + obj_req->ex.oe_len - *result); + else + rbd_assert(*result == obj_req->ex.oe_len); + *result = 0; } + return true; + case RBD_OBJ_READ_PARENT: + return true; + default: + BUG(); } +} - /* - * -ENOENT means a hole in the image -- zero-fill the entire - * length of the request. A short read also implies zero-fill - * to the end of the request. In both cases we update xferred - * count to indicate the whole request was satisfied. 
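 * (E.g. a 65536-byte object extent read that returns 4096 is
 * zero-filled over the remaining 61440 bytes; -ENOENT zero-fills
 * all 65536.)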
- */ - if (obj_req->result == -ENOENT || - (!obj_req->result && obj_req->xferred < obj_req->ex.oe_len)) { - rbd_assert(!obj_req->xferred || !obj_req->result); - rbd_obj_zero_range(obj_req, obj_req->xferred, - obj_req->ex.oe_len - obj_req->xferred); - obj_req->result = 0; - obj_req->xferred = obj_req->ex.oe_len; +static bool rbd_obj_write_is_noop(struct rbd_obj_request *obj_req) +{ + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; + + if (rbd_object_map_may_exist(rbd_dev, obj_req->ex.oe_objno)) + obj_req->flags |= RBD_OBJ_FLAG_MAY_EXIST; + + if (!(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST) && + (obj_req->flags & RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT)) { + dout("%s %p noop for nonexistent\n", __func__, obj_req); + return true; } - return true; + return false; +} + +/* + * Return: + * 0 - object map update sent + * 1 - object map update isn't needed + * <0 - error + */ +static int rbd_obj_write_pre_object_map(struct rbd_obj_request *obj_req) +{ + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; + u8 new_state; + + if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP)) + return 1; + + if (obj_req->flags & RBD_OBJ_FLAG_DELETION) + new_state = OBJECT_PENDING; + else + new_state = OBJECT_EXISTS; + + return rbd_object_map_update(obj_req, CEPH_NOSNAP, new_state, NULL); +} + +static int rbd_obj_write_object(struct rbd_obj_request *obj_req) +{ + struct ceph_osd_request *osd_req; + int num_ops = count_write_ops(obj_req); + int which = 0; + int ret; + + if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED) + num_ops++; /* stat */ + + osd_req = rbd_obj_add_osd_request(obj_req, num_ops); + if (IS_ERR(osd_req)) + return PTR_ERR(osd_req); + + if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED) { + ret = rbd_osd_setup_stat(osd_req, which++); + if (ret) + return ret; + } + + rbd_osd_setup_write_ops(osd_req, which); + rbd_osd_format_write(osd_req); + + ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO); + if (ret) + return ret; + + rbd_osd_submit(osd_req); + return 0; } /* @@ -2463,123 +3133,67 @@ static bool is_zero_bvecs(struct bio_vec *bvecs, u32 bytes) #define MODS_ONLY U32_MAX -static int rbd_obj_issue_copyup_empty_snapc(struct rbd_obj_request *obj_req, - u32 bytes) +static int rbd_obj_copyup_empty_snapc(struct rbd_obj_request *obj_req, + u32 bytes) { + struct ceph_osd_request *osd_req; int ret; dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes); - rbd_assert(obj_req->osd_req->r_ops[0].op == CEPH_OSD_OP_STAT); rbd_assert(bytes > 0 && bytes != MODS_ONLY); - rbd_osd_req_destroy(obj_req->osd_req); - obj_req->osd_req = __rbd_osd_req_create(obj_req, &rbd_empty_snapc, 1); - if (!obj_req->osd_req) - return -ENOMEM; + osd_req = __rbd_obj_add_osd_request(obj_req, &rbd_empty_snapc, 1); + if (IS_ERR(osd_req)) + return PTR_ERR(osd_req); - ret = osd_req_op_cls_init(obj_req->osd_req, 0, "rbd", "copyup"); + ret = rbd_osd_setup_copyup(osd_req, 0, bytes); if (ret) return ret; - osd_req_op_cls_request_data_bvecs(obj_req->osd_req, 0, - obj_req->copyup_bvecs, - obj_req->copyup_bvec_count, - bytes); - rbd_osd_req_format_write(obj_req); + rbd_osd_format_write(osd_req); - ret = ceph_osdc_alloc_messages(obj_req->osd_req, GFP_NOIO); + ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO); if (ret) return ret; - rbd_obj_request_submit(obj_req); + rbd_osd_submit(osd_req); return 0; } -static int rbd_obj_issue_copyup_ops(struct rbd_obj_request *obj_req, u32 bytes) +static int rbd_obj_copyup_current_snapc(struct rbd_obj_request *obj_req, + u32 bytes) { - struct rbd_img_request *img_req = obj_req->img_request; - 
unsigned int num_osd_ops = (bytes != MODS_ONLY); - unsigned int which = 0; + struct ceph_osd_request *osd_req; + int num_ops = count_write_ops(obj_req); + int which = 0; int ret; dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes); - rbd_assert(obj_req->osd_req->r_ops[0].op == CEPH_OSD_OP_STAT || - obj_req->osd_req->r_ops[0].op == CEPH_OSD_OP_CALL); - rbd_osd_req_destroy(obj_req->osd_req); - switch (img_req->op_type) { - case OBJ_OP_WRITE: - num_osd_ops += count_write_ops(obj_req); - break; - case OBJ_OP_ZEROOUT: - num_osd_ops += count_zeroout_ops(obj_req); - break; - default: - BUG(); - } + if (bytes != MODS_ONLY) + num_ops++; /* copyup */ - obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops); - if (!obj_req->osd_req) - return -ENOMEM; + osd_req = rbd_obj_add_osd_request(obj_req, num_ops); + if (IS_ERR(osd_req)) + return PTR_ERR(osd_req); if (bytes != MODS_ONLY) { - ret = osd_req_op_cls_init(obj_req->osd_req, which, "rbd", - "copyup"); + ret = rbd_osd_setup_copyup(osd_req, which++, bytes); if (ret) return ret; - - osd_req_op_cls_request_data_bvecs(obj_req->osd_req, which++, - obj_req->copyup_bvecs, - obj_req->copyup_bvec_count, - bytes); } - switch (img_req->op_type) { - case OBJ_OP_WRITE: - __rbd_obj_setup_write(obj_req, which); - break; - case OBJ_OP_ZEROOUT: - __rbd_obj_setup_zeroout(obj_req, which); - break; - default: - BUG(); - } + rbd_osd_setup_write_ops(osd_req, which); + rbd_osd_format_write(osd_req); - ret = ceph_osdc_alloc_messages(obj_req->osd_req, GFP_NOIO); + ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO); if (ret) return ret; - rbd_obj_request_submit(obj_req); + rbd_osd_submit(osd_req); return 0; } -static int rbd_obj_issue_copyup(struct rbd_obj_request *obj_req, u32 bytes) -{ - /* - * Only send non-zero copyup data to save some I/O and network - * bandwidth -- zero copyup data is equivalent to the object not - * existing. - */ - if (is_zero_bvecs(obj_req->copyup_bvecs, bytes)) { - dout("%s obj_req %p detected zeroes\n", __func__, obj_req); - bytes = 0; - } - - if (obj_req->img_request->snapc->num_snaps && bytes > 0) { - /* - * Send a copyup request with an empty snapshot context to - * deep-copyup the object through all existing snapshots. - * A second request with the current snapshot context will be - * sent for the actual modification. - */ - obj_req->write_state = RBD_OBJ_WRITE_COPYUP_EMPTY_SNAPC; - return rbd_obj_issue_copyup_empty_snapc(obj_req, bytes); - } - - obj_req->write_state = RBD_OBJ_WRITE_COPYUP_OPS; - return rbd_obj_issue_copyup_ops(obj_req, bytes); -} - static int setup_copyup_bvecs(struct rbd_obj_request *obj_req, u64 obj_overlap) { u32 i; @@ -2608,7 +3222,12 @@ static int setup_copyup_bvecs(struct rbd_obj_request *obj_req, u64 obj_overlap) return 0; } -static int rbd_obj_handle_write_guard(struct rbd_obj_request *obj_req) +/* + * The target object doesn't exist. Read the data for the entire + * target object up to the overlap point (if any) from the parent, + * so we can use it for a copyup. + */ +static int rbd_obj_copyup_read_parent(struct rbd_obj_request *obj_req) { struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; int ret; @@ -2623,178 +3242,492 @@ static int rbd_obj_handle_write_guard(struct rbd_obj_request *obj_req) * request -- pass MODS_ONLY since the copyup isn't needed * anymore. 
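 * (MODS_ONLY is just U32_MAX and means "send only the guarded
 * write ops, without a copyup data op" -- the copyup op is skipped
 * in rbd_obj_copyup_current_snapc() when bytes == MODS_ONLY.)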
*/ - obj_req->write_state = RBD_OBJ_WRITE_COPYUP_OPS; - return rbd_obj_issue_copyup_ops(obj_req, MODS_ONLY); + return rbd_obj_copyup_current_snapc(obj_req, MODS_ONLY); } ret = setup_copyup_bvecs(obj_req, rbd_obj_img_extents_bytes(obj_req)); if (ret) return ret; - obj_req->write_state = RBD_OBJ_WRITE_READ_FROM_PARENT; return rbd_obj_read_from_parent(obj_req); } -static bool rbd_obj_handle_write(struct rbd_obj_request *obj_req) +static void rbd_obj_copyup_object_maps(struct rbd_obj_request *obj_req) { + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; + struct ceph_snap_context *snapc = obj_req->img_request->snapc; + u8 new_state; + u32 i; int ret; - switch (obj_req->write_state) { - case RBD_OBJ_WRITE_GUARD: - rbd_assert(!obj_req->xferred); - if (obj_req->result == -ENOENT) { - /* - * The target object doesn't exist. Read the data for - * the entire target object up to the overlap point (if - * any) from the parent, so we can use it for a copyup. - */ - ret = rbd_obj_handle_write_guard(obj_req); - if (ret) { - obj_req->result = ret; - return true; - } - return false; + rbd_assert(!obj_req->pending.result && !obj_req->pending.num_pending); + + if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP)) + return; + + if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ZEROS) + return; + + for (i = 0; i < snapc->num_snaps; i++) { + if ((rbd_dev->header.features & RBD_FEATURE_FAST_DIFF) && + i + 1 < snapc->num_snaps) + new_state = OBJECT_EXISTS_CLEAN; + else + new_state = OBJECT_EXISTS; + + ret = rbd_object_map_update(obj_req, snapc->snaps[i], + new_state, NULL); + if (ret < 0) { + obj_req->pending.result = ret; + return; } - /* fall through */ - case RBD_OBJ_WRITE_FLAT: - case RBD_OBJ_WRITE_COPYUP_OPS: - if (!obj_req->result) - /* - * There is no such thing as a successful short - * write -- indicate the whole request was satisfied. - */ - obj_req->xferred = obj_req->ex.oe_len; - return true; - case RBD_OBJ_WRITE_READ_FROM_PARENT: - if (obj_req->result) - return true; - rbd_assert(obj_req->xferred); - ret = rbd_obj_issue_copyup(obj_req, obj_req->xferred); + rbd_assert(!ret); + obj_req->pending.num_pending++; + } +} + +static void rbd_obj_copyup_write_object(struct rbd_obj_request *obj_req) +{ + u32 bytes = rbd_obj_img_extents_bytes(obj_req); + int ret; + + rbd_assert(!obj_req->pending.result && !obj_req->pending.num_pending); + + /* + * Only send non-zero copyup data to save some I/O and network + * bandwidth -- zero copyup data is equivalent to the object not + * existing. + */ + if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ZEROS) + bytes = 0; + + if (obj_req->img_request->snapc->num_snaps && bytes > 0) { + /* + * Send a copyup request with an empty snapshot context to + * deep-copyup the object through all existing snapshots. + * A second request with the current snapshot context will be + * sent for the actual modification. 
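 *
 * (With at least one snapshot and non-zero copyup data,
 * rbd_obj_copyup_write_object() below therefore issues both
 * requests and bumps pending.num_pending to 2: the empty-snapc
 * request carries the data, the current-snapc request is sent
 * with MODS_ONLY.)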
+ */ + ret = rbd_obj_copyup_empty_snapc(obj_req, bytes); + if (ret) { + obj_req->pending.result = ret; + return; + } + + obj_req->pending.num_pending++; + bytes = MODS_ONLY; + } + + ret = rbd_obj_copyup_current_snapc(obj_req, bytes); + if (ret) { + obj_req->pending.result = ret; + return; + } + + obj_req->pending.num_pending++; +} + +static bool rbd_obj_advance_copyup(struct rbd_obj_request *obj_req, int *result) +{ + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; + int ret; + +again: + switch (obj_req->copyup_state) { + case RBD_OBJ_COPYUP_START: + rbd_assert(!*result); + + ret = rbd_obj_copyup_read_parent(obj_req); if (ret) { - obj_req->result = ret; - obj_req->xferred = 0; + *result = ret; return true; } + if (obj_req->num_img_extents) + obj_req->copyup_state = RBD_OBJ_COPYUP_READ_PARENT; + else + obj_req->copyup_state = RBD_OBJ_COPYUP_WRITE_OBJECT; return false; - case RBD_OBJ_WRITE_COPYUP_EMPTY_SNAPC: - if (obj_req->result) + case RBD_OBJ_COPYUP_READ_PARENT: + if (*result) return true; - obj_req->write_state = RBD_OBJ_WRITE_COPYUP_OPS; - ret = rbd_obj_issue_copyup_ops(obj_req, MODS_ONLY); - if (ret) { - obj_req->result = ret; + if (is_zero_bvecs(obj_req->copyup_bvecs, + rbd_obj_img_extents_bytes(obj_req))) { + dout("%s %p detected zeros\n", __func__, obj_req); + obj_req->flags |= RBD_OBJ_FLAG_COPYUP_ZEROS; + } + + rbd_obj_copyup_object_maps(obj_req); + if (!obj_req->pending.num_pending) { + *result = obj_req->pending.result; + obj_req->copyup_state = RBD_OBJ_COPYUP_OBJECT_MAPS; + goto again; + } + obj_req->copyup_state = __RBD_OBJ_COPYUP_OBJECT_MAPS; + return false; + case __RBD_OBJ_COPYUP_OBJECT_MAPS: + if (!pending_result_dec(&obj_req->pending, result)) + return false; + /* fall through */ + case RBD_OBJ_COPYUP_OBJECT_MAPS: + if (*result) { + rbd_warn(rbd_dev, "snap object map update failed: %d", + *result); return true; } + + rbd_obj_copyup_write_object(obj_req); + if (!obj_req->pending.num_pending) { + *result = obj_req->pending.result; + obj_req->copyup_state = RBD_OBJ_COPYUP_WRITE_OBJECT; + goto again; + } + obj_req->copyup_state = __RBD_OBJ_COPYUP_WRITE_OBJECT; return false; + case __RBD_OBJ_COPYUP_WRITE_OBJECT: + if (!pending_result_dec(&obj_req->pending, result)) + return false; + /* fall through */ + case RBD_OBJ_COPYUP_WRITE_OBJECT: + return true; default: BUG(); } } /* - * Returns true if @obj_req is completed, or false otherwise. 
+ * Return: + * 0 - object map update sent + * 1 - object map update isn't needed + * <0 - error */ -static bool __rbd_obj_handle_request(struct rbd_obj_request *obj_req) +static int rbd_obj_write_post_object_map(struct rbd_obj_request *obj_req) { - switch (obj_req->img_request->op_type) { - case OBJ_OP_READ: - return rbd_obj_handle_read(obj_req); - case OBJ_OP_WRITE: - return rbd_obj_handle_write(obj_req); - case OBJ_OP_DISCARD: - case OBJ_OP_ZEROOUT: - if (rbd_obj_handle_write(obj_req)) { + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; + u8 current_state = OBJECT_PENDING; + + if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP)) + return 1; + + if (!(obj_req->flags & RBD_OBJ_FLAG_DELETION)) + return 1; + + return rbd_object_map_update(obj_req, CEPH_NOSNAP, OBJECT_NONEXISTENT, + &current_state); +} + +static bool rbd_obj_advance_write(struct rbd_obj_request *obj_req, int *result) +{ + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; + int ret; + +again: + switch (obj_req->write_state) { + case RBD_OBJ_WRITE_START: + rbd_assert(!*result); + + if (rbd_obj_write_is_noop(obj_req)) + return true; + + ret = rbd_obj_write_pre_object_map(obj_req); + if (ret < 0) { + *result = ret; + return true; + } + obj_req->write_state = RBD_OBJ_WRITE_PRE_OBJECT_MAP; + if (ret > 0) + goto again; + return false; + case RBD_OBJ_WRITE_PRE_OBJECT_MAP: + if (*result) { + rbd_warn(rbd_dev, "pre object map update failed: %d", + *result); + return true; + } + ret = rbd_obj_write_object(obj_req); + if (ret) { + *result = ret; + return true; + } + obj_req->write_state = RBD_OBJ_WRITE_OBJECT; + return false; + case RBD_OBJ_WRITE_OBJECT: + if (*result == -ENOENT) { + if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED) { + *result = 0; + obj_req->copyup_state = RBD_OBJ_COPYUP_START; + obj_req->write_state = __RBD_OBJ_WRITE_COPYUP; + goto again; + } /* - * Hide -ENOENT from delete/truncate/zero -- discarding - * a non-existent object is not a problem. + * On a non-existent object: + * delete - -ENOENT, truncate/zero - 0 */ - if (obj_req->result == -ENOENT) { - obj_req->result = 0; - obj_req->xferred = obj_req->ex.oe_len; - } + if (obj_req->flags & RBD_OBJ_FLAG_DELETION) + *result = 0; + } + if (*result) + return true; + + obj_req->write_state = RBD_OBJ_WRITE_COPYUP; + goto again; + case __RBD_OBJ_WRITE_COPYUP: + if (!rbd_obj_advance_copyup(obj_req, result)) + return false; + /* fall through */ + case RBD_OBJ_WRITE_COPYUP: + if (*result) { + rbd_warn(rbd_dev, "copyup failed: %d", *result); + return true; + } + ret = rbd_obj_write_post_object_map(obj_req); + if (ret < 0) { + *result = ret; + return true; + } + obj_req->write_state = RBD_OBJ_WRITE_POST_OBJECT_MAP; + if (ret > 0) + goto again; + return false; + case RBD_OBJ_WRITE_POST_OBJECT_MAP: + if (*result) + rbd_warn(rbd_dev, "post object map update failed: %d", + *result); + return true; + default: + BUG(); + } } -static void rbd_obj_end_request(struct rbd_obj_request *obj_req) +/* + * Return true if @obj_req is completed.
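 *
 * (Completions fan back in through pending_result_dec(): a copyup,
 * for example, may fan out one object map update per snapshot and
 * then up to two copyup write requests, and in each stage only the
 * last completion advances the state machine, with the first
 * nonzero result preserved.)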
+ */ +static bool __rbd_obj_handle_request(struct rbd_obj_request *obj_req, + int *result) { struct rbd_img_request *img_req = obj_req->img_request; + struct rbd_device *rbd_dev = img_req->rbd_dev; + bool done; - rbd_assert((!obj_req->result && - obj_req->xferred == obj_req->ex.oe_len) || - (obj_req->result < 0 && !obj_req->xferred)); - if (!obj_req->result) { - img_req->xferred += obj_req->xferred; - return; - } + mutex_lock(&obj_req->state_mutex); + if (!rbd_img_is_write(img_req)) + done = rbd_obj_advance_read(obj_req, result); + else + done = rbd_obj_advance_write(obj_req, result); + mutex_unlock(&obj_req->state_mutex); - rbd_warn(img_req->rbd_dev, - "%s at objno %llu %llu~%llu result %d xferred %llu", - obj_op_name(img_req->op_type), obj_req->ex.oe_objno, - obj_req->ex.oe_off, obj_req->ex.oe_len, obj_req->result, - obj_req->xferred); - if (!img_req->result) { - img_req->result = obj_req->result; - img_req->xferred = 0; + if (done && *result) { + rbd_assert(*result < 0); + rbd_warn(rbd_dev, "%s at objno %llu %llu~%llu result %d", + obj_op_name(img_req->op_type), obj_req->ex.oe_objno, + obj_req->ex.oe_off, obj_req->ex.oe_len, *result); } + return done; } -static void rbd_img_end_child_request(struct rbd_img_request *img_req) +/* + * This is open-coded in rbd_img_handle_request() to avoid parent chain + * recursion. + */ +static void rbd_obj_handle_request(struct rbd_obj_request *obj_req, int result) { - struct rbd_obj_request *obj_req = img_req->obj_request; + if (__rbd_obj_handle_request(obj_req, &result)) + rbd_img_handle_request(obj_req->img_request, result); +} - rbd_assert(test_bit(IMG_REQ_CHILD, &img_req->flags)); - rbd_assert((!img_req->result && - img_req->xferred == rbd_obj_img_extents_bytes(obj_req)) || - (img_req->result < 0 && !img_req->xferred)); +static bool need_exclusive_lock(struct rbd_img_request *img_req) +{ + struct rbd_device *rbd_dev = img_req->rbd_dev; - obj_req->result = img_req->result; - obj_req->xferred = img_req->xferred; - rbd_img_request_put(img_req); + if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK)) + return false; + + if (rbd_dev->spec->snap_id != CEPH_NOSNAP) + return false; + + rbd_assert(!test_bit(IMG_REQ_CHILD, &img_req->flags)); + if (rbd_dev->opts->lock_on_read || + (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP)) + return true; + + return rbd_img_is_write(img_req); } -static void rbd_img_end_request(struct rbd_img_request *img_req) +static bool rbd_lock_add_request(struct rbd_img_request *img_req) { - rbd_assert(!test_bit(IMG_REQ_CHILD, &img_req->flags)); - rbd_assert((!img_req->result && - img_req->xferred == blk_rq_bytes(img_req->rq)) || - (img_req->result < 0 && !img_req->xferred)); + struct rbd_device *rbd_dev = img_req->rbd_dev; + bool locked; + + lockdep_assert_held(&rbd_dev->lock_rwsem); + locked = rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED; + spin_lock(&rbd_dev->lock_lists_lock); + rbd_assert(list_empty(&img_req->lock_item)); + if (!locked) + list_add_tail(&img_req->lock_item, &rbd_dev->acquiring_list); + else + list_add_tail(&img_req->lock_item, &rbd_dev->running_list); + spin_unlock(&rbd_dev->lock_lists_lock); + return locked; +} + +static void rbd_lock_del_request(struct rbd_img_request *img_req) +{ + struct rbd_device *rbd_dev = img_req->rbd_dev; + bool need_wakeup; - blk_mq_end_request(img_req->rq, - errno_to_blk_status(img_req->result)); - rbd_img_request_put(img_req); + lockdep_assert_held(&rbd_dev->lock_rwsem); + spin_lock(&rbd_dev->lock_lists_lock); + rbd_assert(!list_empty(&img_req->lock_item)); + 
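/*
 * Aside: the pending/num_pending accounting threaded through the object,
 * copyup and image state machines above follows one pattern.  A minimal
 * stand-alone sketch of its semantics (a hypothetical re-implementation
 * for illustration, not the kernel helper itself): latch the first
 * nonzero result, count outstanding sub-requests, and report completion
 * when the count reaches zero.
 */
#include <assert.h>
#include <stdbool.h>

struct pending_result_sketch {
	int result;		/* first nonzero result wins */
	int num_pending;	/* outstanding sub-requests */
};

static bool pending_dec_sketch(struct pending_result_sketch *pending,
			       int *result)
{
	assert(pending->num_pending > 0);

	if (*result && !pending->result)
		pending->result = *result;	/* latch the first error */
	if (--pending->num_pending)
		return false;			/* more completions to come */

	*result = pending->result;		/* surface the final result */
	return true;
}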
list_del_init(&img_req->lock_item); + need_wakeup = (rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING && + list_empty(&rbd_dev->running_list)); + spin_unlock(&rbd_dev->lock_lists_lock); + if (need_wakeup) + complete(&rbd_dev->releasing_wait); } -static void rbd_obj_handle_request(struct rbd_obj_request *obj_req) +static int rbd_img_exclusive_lock(struct rbd_img_request *img_req) { - struct rbd_img_request *img_req; + struct rbd_device *rbd_dev = img_req->rbd_dev; + + if (!need_exclusive_lock(img_req)) + return 1; + + if (rbd_lock_add_request(img_req)) + return 1; + + if (rbd_dev->opts->exclusive) { + WARN_ON(1); /* lock got released? */ + return -EROFS; + } + + /* + * Note the use of mod_delayed_work() in rbd_acquire_lock() + * and cancel_delayed_work() in wake_lock_waiters(). + */ + dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev); + queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0); + return 0; +} + +static void rbd_img_object_requests(struct rbd_img_request *img_req) +{ + struct rbd_obj_request *obj_req; + + rbd_assert(!img_req->pending.result && !img_req->pending.num_pending); + + for_each_obj_request(img_req, obj_req) { + int result = 0; + + if (__rbd_obj_handle_request(obj_req, &result)) { + if (result) { + img_req->pending.result = result; + return; + } + } else { + img_req->pending.num_pending++; + } + } +} + +static bool rbd_img_advance(struct rbd_img_request *img_req, int *result) +{ + struct rbd_device *rbd_dev = img_req->rbd_dev; + int ret; again: - if (!__rbd_obj_handle_request(obj_req)) - return; + switch (img_req->state) { + case RBD_IMG_START: + rbd_assert(!*result); - img_req = obj_req->img_request; - spin_lock(&img_req->completion_lock); - rbd_obj_end_request(obj_req); - rbd_assert(img_req->pending_count); - if (--img_req->pending_count) { - spin_unlock(&img_req->completion_lock); - return; + ret = rbd_img_exclusive_lock(img_req); + if (ret < 0) { + *result = ret; + return true; + } + img_req->state = RBD_IMG_EXCLUSIVE_LOCK; + if (ret > 0) + goto again; + return false; + case RBD_IMG_EXCLUSIVE_LOCK: + if (*result) + return true; + + rbd_assert(!need_exclusive_lock(img_req) || + __rbd_is_lock_owner(rbd_dev)); + + rbd_img_object_requests(img_req); + if (!img_req->pending.num_pending) { + *result = img_req->pending.result; + img_req->state = RBD_IMG_OBJECT_REQUESTS; + goto again; + } + img_req->state = __RBD_IMG_OBJECT_REQUESTS; + return false; + case __RBD_IMG_OBJECT_REQUESTS: + if (!pending_result_dec(&img_req->pending, result)) + return false; + /* fall through */ + case RBD_IMG_OBJECT_REQUESTS: + return true; + default: + BUG(); + } +} + +/* + * Return true if @img_req is completed. + */ +static bool __rbd_img_handle_request(struct rbd_img_request *img_req, + int *result) +{ + struct rbd_device *rbd_dev = img_req->rbd_dev; + bool done; + + if (need_exclusive_lock(img_req)) { + down_read(&rbd_dev->lock_rwsem); + mutex_lock(&img_req->state_mutex); + done = rbd_img_advance(img_req, result); + if (done) + rbd_lock_del_request(img_req); + mutex_unlock(&img_req->state_mutex); + up_read(&rbd_dev->lock_rwsem); + } else { + mutex_lock(&img_req->state_mutex); + done = rbd_img_advance(img_req, result); + mutex_unlock(&img_req->state_mutex); + } + + if (done && *result) { + rbd_assert(*result < 0); + rbd_warn(rbd_dev, "%s%s result %d", + test_bit(IMG_REQ_CHILD, &img_req->flags) ? 
"child " : "", + obj_op_name(img_req->op_type), *result); } + return done; +} + +static void rbd_img_handle_request(struct rbd_img_request *img_req, int result) +{ +again: + if (!__rbd_img_handle_request(img_req, &result)) + return; - spin_unlock(&img_req->completion_lock); if (test_bit(IMG_REQ_CHILD, &img_req->flags)) { - obj_req = img_req->obj_request; - rbd_img_end_child_request(img_req); - goto again; + struct rbd_obj_request *obj_req = img_req->obj_request; + + rbd_img_request_put(img_req); + if (__rbd_obj_handle_request(obj_req, &result)) { + img_req = obj_req->img_request; + goto again; + } + } else { + struct request *rq = img_req->rq; + + rbd_img_request_put(img_req); + blk_mq_end_request(rq, errno_to_blk_status(result)); } - rbd_img_end_request(img_req); } static const struct rbd_client_id rbd_empty_cid; @@ -2839,6 +3772,7 @@ static void __rbd_lock(struct rbd_device *rbd_dev, const char *cookie) { struct rbd_client_id cid = rbd_get_cid(rbd_dev); + rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED; strcpy(rbd_dev->lock_cookie, cookie); rbd_set_owner_cid(rbd_dev, &cid); queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work); @@ -2863,7 +3797,6 @@ static int rbd_lock(struct rbd_device *rbd_dev) if (ret) return ret; - rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED; __rbd_lock(rbd_dev, cookie); return 0; } @@ -2882,7 +3815,7 @@ static void rbd_unlock(struct rbd_device *rbd_dev) ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc, RBD_LOCK_NAME, rbd_dev->lock_cookie); if (ret && ret != -ENOENT) - rbd_warn(rbd_dev, "failed to unlock: %d", ret); + rbd_warn(rbd_dev, "failed to unlock header: %d", ret); /* treat errors as the image is unlocked */ rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED; @@ -3009,15 +3942,34 @@ e_inval: goto out; } -static void wake_requests(struct rbd_device *rbd_dev, bool wake_all) +/* + * Either image request state machine(s) or rbd_add_acquire_lock() + * (i.e. "rbd map"). 
+ */ +static void wake_lock_waiters(struct rbd_device *rbd_dev, int result) { - dout("%s rbd_dev %p wake_all %d\n", __func__, rbd_dev, wake_all); + struct rbd_img_request *img_req; + + dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result); + lockdep_assert_held_write(&rbd_dev->lock_rwsem); cancel_delayed_work(&rbd_dev->lock_dwork); - if (wake_all) - wake_up_all(&rbd_dev->lock_waitq); - else - wake_up(&rbd_dev->lock_waitq); + if (!completion_done(&rbd_dev->acquire_wait)) { + rbd_assert(list_empty(&rbd_dev->acquiring_list) && + list_empty(&rbd_dev->running_list)); + rbd_dev->acquire_err = result; + complete_all(&rbd_dev->acquire_wait); + return; + } + + list_for_each_entry(img_req, &rbd_dev->acquiring_list, lock_item) { + mutex_lock(&img_req->state_mutex); + rbd_assert(img_req->state == RBD_IMG_EXCLUSIVE_LOCK); + rbd_img_schedule(img_req, result); + mutex_unlock(&img_req->state_mutex); + } + + list_splice_tail_init(&rbd_dev->acquiring_list, &rbd_dev->running_list); } static int get_lock_owner_info(struct rbd_device *rbd_dev, @@ -3132,13 +4084,10 @@ static int rbd_try_lock(struct rbd_device *rbd_dev) goto again; ret = find_watcher(rbd_dev, lockers); - if (ret) { - if (ret > 0) - ret = 0; /* have to request lock */ - goto out; - } + if (ret) + goto out; /* request lock or error */ - rbd_warn(rbd_dev, "%s%llu seems dead, breaking lock", + rbd_warn(rbd_dev, "breaking header lock owned by %s%llu", ENTITY_NAME(lockers[0].id.name)); ret = ceph_monc_blacklist_add(&client->monc, @@ -3165,53 +4114,90 @@ out: return ret; } +static int rbd_post_acquire_action(struct rbd_device *rbd_dev) +{ + int ret; + + if (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP) { + ret = rbd_object_map_open(rbd_dev); + if (ret) + return ret; + } + + return 0; +} + /* - * ret is set only if lock_state is RBD_LOCK_STATE_UNLOCKED + * Return: + * 0 - lock acquired + * 1 - caller should call rbd_request_lock() + * <0 - error */ -static enum rbd_lock_state rbd_try_acquire_lock(struct rbd_device *rbd_dev, - int *pret) +static int rbd_try_acquire_lock(struct rbd_device *rbd_dev) { - enum rbd_lock_state lock_state; + int ret; down_read(&rbd_dev->lock_rwsem); dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev, rbd_dev->lock_state); if (__rbd_is_lock_owner(rbd_dev)) { - lock_state = rbd_dev->lock_state; up_read(&rbd_dev->lock_rwsem); - return lock_state; + return 0; } up_read(&rbd_dev->lock_rwsem); down_write(&rbd_dev->lock_rwsem); dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev, rbd_dev->lock_state); - if (!__rbd_is_lock_owner(rbd_dev)) { - *pret = rbd_try_lock(rbd_dev); - if (*pret) - rbd_warn(rbd_dev, "failed to acquire lock: %d", *pret); + if (__rbd_is_lock_owner(rbd_dev)) { + up_write(&rbd_dev->lock_rwsem); + return 0; + } + + ret = rbd_try_lock(rbd_dev); + if (ret < 0) { + rbd_warn(rbd_dev, "failed to lock header: %d", ret); + if (ret == -EBLACKLISTED) + goto out; + + ret = 1; /* request lock anyway */ + } + if (ret > 0) { + up_write(&rbd_dev->lock_rwsem); + return ret; + } + + rbd_assert(rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED); + rbd_assert(list_empty(&rbd_dev->running_list)); + + ret = rbd_post_acquire_action(rbd_dev); + if (ret) { + rbd_warn(rbd_dev, "post-acquire action failed: %d", ret); + /* + * Can't stay in RBD_LOCK_STATE_LOCKED because + * rbd_lock_add_request() would let the request through, + * assuming that e.g. object map is locked and loaded. 
+ */ + rbd_unlock(rbd_dev); } - lock_state = rbd_dev->lock_state; +out: + wake_lock_waiters(rbd_dev, ret); up_write(&rbd_dev->lock_rwsem); - return lock_state; + return ret; } static void rbd_acquire_lock(struct work_struct *work) { struct rbd_device *rbd_dev = container_of(to_delayed_work(work), struct rbd_device, lock_dwork); - enum rbd_lock_state lock_state; - int ret = 0; + int ret; dout("%s rbd_dev %p\n", __func__, rbd_dev); again: - lock_state = rbd_try_acquire_lock(rbd_dev, &ret); - if (lock_state != RBD_LOCK_STATE_UNLOCKED || ret == -EBLACKLISTED) { - if (lock_state == RBD_LOCK_STATE_LOCKED) - wake_requests(rbd_dev, true); - dout("%s rbd_dev %p lock_state %d ret %d - done\n", __func__, - rbd_dev, lock_state, ret); + ret = rbd_try_acquire_lock(rbd_dev); + if (ret <= 0) { + dout("%s rbd_dev %p ret %d - done\n", __func__, rbd_dev, ret); return; } @@ -3220,16 +4206,9 @@ again: goto again; /* treat this as a dead client */ } else if (ret == -EROFS) { rbd_warn(rbd_dev, "peer will not release lock"); - /* - * If this is rbd_add_acquire_lock(), we want to fail - * immediately -- reuse BLACKLISTED flag. Otherwise we - * want to block. - */ - if (!(rbd_dev->disk->flags & GENHD_FL_UP)) { - set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags); - /* wake "rbd map --exclusive" process */ - wake_requests(rbd_dev, false); - } + down_write(&rbd_dev->lock_rwsem); + wake_lock_waiters(rbd_dev, ret); + up_write(&rbd_dev->lock_rwsem); } else if (ret < 0) { rbd_warn(rbd_dev, "error requesting lock: %d", ret); mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, @@ -3246,43 +4225,67 @@ again: } } -/* - * lock_rwsem must be held for write - */ -static bool rbd_release_lock(struct rbd_device *rbd_dev) +static bool rbd_quiesce_lock(struct rbd_device *rbd_dev) { - dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev, - rbd_dev->lock_state); + bool need_wait; + + dout("%s rbd_dev %p\n", __func__, rbd_dev); + lockdep_assert_held_write(&rbd_dev->lock_rwsem); + if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED) return false; - rbd_dev->lock_state = RBD_LOCK_STATE_RELEASING; - downgrade_write(&rbd_dev->lock_rwsem); /* * Ensure that all in-flight IO is flushed. - * - * FIXME: ceph_osdc_sync() flushes the entire OSD client, which - * may be shared with other devices. 
*/ - ceph_osdc_sync(&rbd_dev->rbd_client->client->osdc); + rbd_dev->lock_state = RBD_LOCK_STATE_RELEASING; + rbd_assert(!completion_done(&rbd_dev->releasing_wait)); + need_wait = !list_empty(&rbd_dev->running_list); + downgrade_write(&rbd_dev->lock_rwsem); + if (need_wait) + wait_for_completion(&rbd_dev->releasing_wait); up_read(&rbd_dev->lock_rwsem); down_write(&rbd_dev->lock_rwsem); - dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev, - rbd_dev->lock_state); if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING) return false; + rbd_assert(list_empty(&rbd_dev->running_list)); + return true; +} + +static void rbd_pre_release_action(struct rbd_device *rbd_dev) +{ + if (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP) + rbd_object_map_close(rbd_dev); +} + +static void __rbd_release_lock(struct rbd_device *rbd_dev) +{ + rbd_assert(list_empty(&rbd_dev->running_list)); + + rbd_pre_release_action(rbd_dev); rbd_unlock(rbd_dev); +} + +/* + * lock_rwsem must be held for write + */ +static void rbd_release_lock(struct rbd_device *rbd_dev) +{ + if (!rbd_quiesce_lock(rbd_dev)) + return; + + __rbd_release_lock(rbd_dev); + /* * Give others a chance to grab the lock - we would re-acquire - * almost immediately if we got new IO during ceph_osdc_sync() - * otherwise. We need to ack our own notifications, so this - * lock_dwork will be requeued from rbd_wait_state_locked() - * after wake_requests() in rbd_handle_released_lock(). + * almost immediately if we got new IO while draining the running + * list otherwise. We need to ack our own notifications, so this + * lock_dwork will be requeued from rbd_handle_released_lock() by + * way of maybe_kick_acquire(). */ cancel_delayed_work(&rbd_dev->lock_dwork); - return true; } static void rbd_release_lock_work(struct work_struct *work) @@ -3295,6 +4298,23 @@ static void rbd_release_lock_work(struct work_struct *work) up_write(&rbd_dev->lock_rwsem); } +static void maybe_kick_acquire(struct rbd_device *rbd_dev) +{ + bool have_requests; + + dout("%s rbd_dev %p\n", __func__, rbd_dev); + if (__rbd_is_lock_owner(rbd_dev)) + return; + + spin_lock(&rbd_dev->lock_lists_lock); + have_requests = !list_empty(&rbd_dev->acquiring_list); + spin_unlock(&rbd_dev->lock_lists_lock); + if (have_requests || delayed_work_pending(&rbd_dev->lock_dwork)) { + dout("%s rbd_dev %p kicking lock_dwork\n", __func__, rbd_dev); + mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0); + } +} + static void rbd_handle_acquired_lock(struct rbd_device *rbd_dev, u8 struct_v, void **p) { @@ -3324,8 +4344,7 @@ static void rbd_handle_acquired_lock(struct rbd_device *rbd_dev, u8 struct_v, down_read(&rbd_dev->lock_rwsem); } - if (!__rbd_is_lock_owner(rbd_dev)) - wake_requests(rbd_dev, false); + maybe_kick_acquire(rbd_dev); up_read(&rbd_dev->lock_rwsem); } @@ -3357,8 +4376,7 @@ static void rbd_handle_released_lock(struct rbd_device *rbd_dev, u8 struct_v, down_read(&rbd_dev->lock_rwsem); } - if (!__rbd_is_lock_owner(rbd_dev)) - wake_requests(rbd_dev, false); + maybe_kick_acquire(rbd_dev); up_read(&rbd_dev->lock_rwsem); } @@ -3608,7 +4626,6 @@ static void cancel_tasks_sync(struct rbd_device *rbd_dev) static void rbd_unregister_watch(struct rbd_device *rbd_dev) { - WARN_ON(waitqueue_active(&rbd_dev->lock_waitq)); cancel_tasks_sync(rbd_dev); mutex_lock(&rbd_dev->watch_mutex); @@ -3630,7 +4647,8 @@ static void rbd_reacquire_lock(struct rbd_device *rbd_dev) char cookie[32]; int ret; - WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED); + if (!rbd_quiesce_lock(rbd_dev)) + return; 
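/*
 * Aside: the quiesce step above is a drain-then-proceed pattern -- flip
 * the state to RELEASING so no new requests are admitted, then block
 * until the last in-flight request empties the running list.  A
 * stand-alone sketch with pthread primitives standing in for the
 * kernel's spinlock and completion (all names here are hypothetical):
 */
#include <pthread.h>

enum sketch_lock_state { SK_LOCKED, SK_RELEASING, SK_UNLOCKED };

struct sketch_dev {
	pthread_mutex_t lock;
	pthread_cond_t releasing_wait;
	enum sketch_lock_state state;
	int num_running;	/* stands in for the running_list */
};

/* called when an in-flight request completes */
static void sketch_request_done(struct sketch_dev *d)
{
	pthread_mutex_lock(&d->lock);
	if (--d->num_running == 0 && d->state == SK_RELEASING)
		pthread_cond_signal(&d->releasing_wait);  /* wake releaser */
	pthread_mutex_unlock(&d->lock);
}

/* stop admitting requests, then wait for in-flight ones to drain */
static void sketch_quiesce(struct sketch_dev *d)
{
	pthread_mutex_lock(&d->lock);
	d->state = SK_RELEASING;
	while (d->num_running > 0)
		pthread_cond_wait(&d->releasing_wait, &d->lock);
	pthread_mutex_unlock(&d->lock);
}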
format_lock_cookie(rbd_dev, cookie); ret = ceph_cls_set_cookie(osdc, &rbd_dev->header_oid, @@ -3646,11 +4664,11 @@ static void rbd_reacquire_lock(struct rbd_device *rbd_dev) * Lock cookie cannot be updated on older OSDs, so do * a manual release and queue an acquire. */ - if (rbd_release_lock(rbd_dev)) - queue_delayed_work(rbd_dev->task_wq, - &rbd_dev->lock_dwork, 0); + __rbd_release_lock(rbd_dev); + queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0); } else { __rbd_lock(rbd_dev, cookie); + wake_lock_waiters(rbd_dev, 0); } } @@ -3671,15 +4689,18 @@ static void rbd_reregister_watch(struct work_struct *work) ret = __rbd_register_watch(rbd_dev); if (ret) { rbd_warn(rbd_dev, "failed to reregister watch: %d", ret); - if (ret == -EBLACKLISTED || ret == -ENOENT) { - set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags); - wake_requests(rbd_dev, true); - } else { + if (ret != -EBLACKLISTED && ret != -ENOENT) { queue_delayed_work(rbd_dev->task_wq, &rbd_dev->watch_dwork, RBD_RETRY_DELAY); + mutex_unlock(&rbd_dev->watch_mutex); + return; } + mutex_unlock(&rbd_dev->watch_mutex); + down_write(&rbd_dev->lock_rwsem); + wake_lock_waiters(rbd_dev, ret); + up_write(&rbd_dev->lock_rwsem); return; } @@ -3742,7 +4763,7 @@ static int rbd_obj_method_sync(struct rbd_device *rbd_dev, ret = ceph_osdc_call(osdc, oid, oloc, RBD_DRV_NAME, method_name, CEPH_OSD_FLAG_READ, req_page, outbound_size, - reply_page, &inbound_size); + &reply_page, &inbound_size); if (!ret) { memcpy(inbound, page_address(reply_page), inbound_size); ret = inbound_size; @@ -3754,54 +4775,6 @@ static int rbd_obj_method_sync(struct rbd_device *rbd_dev, return ret; } -/* - * lock_rwsem must be held for read - */ -static int rbd_wait_state_locked(struct rbd_device *rbd_dev, bool may_acquire) -{ - DEFINE_WAIT(wait); - unsigned long timeout; - int ret = 0; - - if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) - return -EBLACKLISTED; - - if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) - return 0; - - if (!may_acquire) { - rbd_warn(rbd_dev, "exclusive lock required"); - return -EROFS; - } - - do { - /* - * Note the use of mod_delayed_work() in rbd_acquire_lock() - * and cancel_delayed_work() in wake_requests(). 
- */ - dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev); - queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0); - prepare_to_wait_exclusive(&rbd_dev->lock_waitq, &wait, - TASK_UNINTERRUPTIBLE); - up_read(&rbd_dev->lock_rwsem); - timeout = schedule_timeout(ceph_timeout_jiffies( - rbd_dev->opts->lock_timeout)); - down_read(&rbd_dev->lock_rwsem); - if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) { - ret = -EBLACKLISTED; - break; - } - if (!timeout) { - rbd_warn(rbd_dev, "timed out waiting for lock"); - ret = -ETIMEDOUT; - break; - } - } while (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED); - - finish_wait(&rbd_dev->lock_waitq, &wait); - return ret; -} - static void rbd_queue_workfn(struct work_struct *work) { struct request *rq = blk_mq_rq_from_pdu(work); @@ -3812,7 +4785,6 @@ static void rbd_queue_workfn(struct work_struct *work) u64 length = blk_rq_bytes(rq); enum obj_operation_type op_type; u64 mapping_size; - bool must_be_locked; int result; switch (req_op(rq)) { @@ -3886,21 +4858,10 @@ static void rbd_queue_workfn(struct work_struct *work) goto err_rq; } - must_be_locked = - (rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK) && - (op_type != OBJ_OP_READ || rbd_dev->opts->lock_on_read); - if (must_be_locked) { - down_read(&rbd_dev->lock_rwsem); - result = rbd_wait_state_locked(rbd_dev, - !rbd_dev->opts->exclusive); - if (result) - goto err_unlock; - } - img_request = rbd_img_request_create(rbd_dev, op_type, snapc); if (!img_request) { result = -ENOMEM; - goto err_unlock; + goto err_rq; } img_request->rq = rq; snapc = NULL; /* img_request consumes a ref */ @@ -3910,19 +4871,14 @@ static void rbd_queue_workfn(struct work_struct *work) else result = rbd_img_fill_from_bio(img_request, offset, length, rq->bio); - if (result || !img_request->pending_count) + if (result) goto err_img_request; - rbd_img_request_submit(img_request); - if (must_be_locked) - up_read(&rbd_dev->lock_rwsem); + rbd_img_handle_request(img_request, 0); return; err_img_request: rbd_img_request_put(img_request); -err_unlock: - if (must_be_locked) - up_read(&rbd_dev->lock_rwsem); err_rq: if (result) rbd_warn(rbd_dev, "%s %llx at %llx result %d", @@ -4589,7 +5545,13 @@ static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc, INIT_WORK(&rbd_dev->released_lock_work, rbd_notify_released_lock); INIT_DELAYED_WORK(&rbd_dev->lock_dwork, rbd_acquire_lock); INIT_WORK(&rbd_dev->unlock_work, rbd_release_lock_work); - init_waitqueue_head(&rbd_dev->lock_waitq); + spin_lock_init(&rbd_dev->lock_lists_lock); + INIT_LIST_HEAD(&rbd_dev->acquiring_list); + INIT_LIST_HEAD(&rbd_dev->running_list); + init_completion(&rbd_dev->acquire_wait); + init_completion(&rbd_dev->releasing_wait); + + spin_lock_init(&rbd_dev->object_map_lock); rbd_dev->dev.bus = &rbd_bus_type; rbd_dev->dev.type = &rbd_device_type; @@ -4772,6 +5734,32 @@ static int rbd_dev_v2_features(struct rbd_device *rbd_dev) &rbd_dev->header.features); } +/* + * These are generic image flags, but since they are used only for + * object map, store them in rbd_dev->object_map_flags. + * + * For the same reason, this function is called only on object map + * (re)load and not on header refresh. 
+ */ +static int rbd_dev_v2_get_flags(struct rbd_device *rbd_dev) +{ + __le64 snapid = cpu_to_le64(rbd_dev->spec->snap_id); + __le64 flags; + int ret; + + ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid, + &rbd_dev->header_oloc, "get_flags", + &snapid, sizeof(snapid), + &flags, sizeof(flags)); + if (ret < 0) + return ret; + if (ret < sizeof(flags)) + return -EBADMSG; + + rbd_dev->object_map_flags = le64_to_cpu(flags); + return 0; +} + struct parent_image_info { u64 pool_id; const char *pool_ns; @@ -4829,7 +5817,7 @@ static int __get_parent_info(struct rbd_device *rbd_dev, ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc, "rbd", "parent_get", CEPH_OSD_FLAG_READ, - req_page, sizeof(u64), reply_page, &reply_len); + req_page, sizeof(u64), &reply_page, &reply_len); if (ret) return ret == -EOPNOTSUPP ? 1 : ret; @@ -4841,7 +5829,7 @@ static int __get_parent_info(struct rbd_device *rbd_dev, ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc, "rbd", "parent_overlap_get", CEPH_OSD_FLAG_READ, - req_page, sizeof(u64), reply_page, &reply_len); + req_page, sizeof(u64), &reply_page, &reply_len); if (ret) return ret; @@ -4872,7 +5860,7 @@ static int __get_parent_info_legacy(struct rbd_device *rbd_dev, ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc, "rbd", "get_parent", CEPH_OSD_FLAG_READ, - req_page, sizeof(u64), reply_page, &reply_len); + req_page, sizeof(u64), &reply_page, &reply_len); if (ret) return ret; @@ -5605,28 +6593,49 @@ static void rbd_dev_image_unlock(struct rbd_device *rbd_dev) { down_write(&rbd_dev->lock_rwsem); if (__rbd_is_lock_owner(rbd_dev)) - rbd_unlock(rbd_dev); + __rbd_release_lock(rbd_dev); up_write(&rbd_dev->lock_rwsem); } +/* + * If the wait is interrupted, an error is returned even if the lock + * was successfully acquired. rbd_dev_image_unlock() will release it + * if needed. + */ static int rbd_add_acquire_lock(struct rbd_device *rbd_dev) { - int ret; + long ret; if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK)) { + if (!rbd_dev->opts->exclusive && !rbd_dev->opts->lock_on_read) + return 0; + rbd_warn(rbd_dev, "exclusive-lock feature is not enabled"); return -EINVAL; } - /* FIXME: "rbd map --exclusive" should be in interruptible */ - down_read(&rbd_dev->lock_rwsem); - ret = rbd_wait_state_locked(rbd_dev, true); - up_read(&rbd_dev->lock_rwsem); + if (rbd_dev->spec->snap_id != CEPH_NOSNAP) + return 0; + + rbd_assert(!rbd_is_lock_owner(rbd_dev)); + queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0); + ret = wait_for_completion_killable_timeout(&rbd_dev->acquire_wait, + ceph_timeout_jiffies(rbd_dev->opts->lock_timeout)); + if (ret > 0) + ret = rbd_dev->acquire_err; + else if (!ret) + ret = -ETIMEDOUT; + if (ret) { - rbd_warn(rbd_dev, "failed to acquire exclusive lock"); - return -EROFS; + rbd_warn(rbd_dev, "failed to acquire exclusive lock: %ld", ret); + return ret; } + /* + * The lock may have been released by now, unless automatic lock + * transitions are disabled. 
+ */ + rbd_assert(!rbd_dev->opts->exclusive || rbd_is_lock_owner(rbd_dev)); return 0; } @@ -5724,6 +6733,8 @@ static void rbd_dev_unprobe(struct rbd_device *rbd_dev) struct rbd_image_header *header; rbd_dev_parent_put(rbd_dev); + rbd_object_map_free(rbd_dev); + rbd_dev_mapping_clear(rbd_dev); /* Free dynamic fields from the header, then zero it out */ @@ -5824,7 +6835,6 @@ out_err: static void rbd_dev_device_release(struct rbd_device *rbd_dev) { clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags); - rbd_dev_mapping_clear(rbd_dev); rbd_free_disk(rbd_dev); if (!single_major) unregister_blkdev(rbd_dev->major, rbd_dev->name); @@ -5858,23 +6868,17 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev) if (ret) goto err_out_blkdev; - ret = rbd_dev_mapping_set(rbd_dev); - if (ret) - goto err_out_disk; - set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE); set_disk_ro(rbd_dev->disk, rbd_dev->opts->read_only); ret = dev_set_name(&rbd_dev->dev, "%d", rbd_dev->dev_id); if (ret) - goto err_out_mapping; + goto err_out_disk; set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags); up_write(&rbd_dev->header_rwsem); return 0; -err_out_mapping: - rbd_dev_mapping_clear(rbd_dev); err_out_disk: rbd_free_disk(rbd_dev); err_out_blkdev: @@ -5975,6 +6979,17 @@ static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth) goto err_out_probe; } + ret = rbd_dev_mapping_set(rbd_dev); + if (ret) + goto err_out_probe; + + if (rbd_dev->spec->snap_id != CEPH_NOSNAP && + (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP)) { + ret = rbd_object_map_load(rbd_dev); + if (ret) + goto err_out_probe; + } + if (rbd_dev->header.features & RBD_FEATURE_LAYERING) { ret = rbd_dev_v2_parent_info(rbd_dev); if (ret) @@ -6071,11 +7086,9 @@ static ssize_t do_rbd_add(struct bus_type *bus, if (rc) goto err_out_image_probe; - if (rbd_dev->opts->exclusive) { - rc = rbd_add_acquire_lock(rbd_dev); - if (rc) - goto err_out_device_setup; - } + rc = rbd_add_acquire_lock(rbd_dev); + if (rc) + goto err_out_image_lock; /* Everything's ready. Announce the disk to the world. */ @@ -6101,7 +7114,6 @@ out: err_out_image_lock: rbd_dev_image_unlock(rbd_dev); -err_out_device_setup: rbd_dev_device_release(rbd_dev); err_out_image_probe: rbd_dev_image_release(rbd_dev); diff --git a/drivers/block/rbd_types.h b/drivers/block/rbd_types.h index 62ff50d3e7a6..ac98ab6ccd3b 100644 --- a/drivers/block/rbd_types.h +++ b/drivers/block/rbd_types.h @@ -18,6 +18,7 @@ /* For format version 2, rbd image 'foo' consists of objects * rbd_id.foo - id of image * rbd_header.<id> - image metadata + * rbd_object_map.<id> - optional image object map * rbd_data.<id>.0000000000000000 * rbd_data.<id>.0000000000000001 * ... - data @@ -25,6 +26,7 @@ */ #define RBD_HEADER_PREFIX "rbd_header." +#define RBD_OBJECT_MAP_PREFIX "rbd_object_map." #define RBD_ID_PREFIX "rbd_id." #define RBD_V2_DATA_FORMAT "%s.%016llx" @@ -39,6 +41,14 @@ enum rbd_notify_op { RBD_NOTIFY_OP_HEADER_UPDATE = 3, }; +#define OBJECT_NONEXISTENT 0 +#define OBJECT_EXISTS 1 +#define OBJECT_PENDING 2 +#define OBJECT_EXISTS_CLEAN 3 + +#define RBD_FLAG_OBJECT_MAP_INVALID (1ULL << 0) +#define RBD_FLAG_FAST_DIFF_INVALID (1ULL << 1) + /* * For format version 1, rbd image 'foo' consists of objects * foo.rbd - image metadata diff --git a/fs/ceph/Kconfig b/fs/ceph/Kconfig index 7f7d92d6b024..cf235f6eacf9 100644 --- a/fs/ceph/Kconfig +++ b/fs/ceph/Kconfig @@ -36,3 +36,15 @@ config CEPH_FS_POSIX_ACL groups beyond the owner/group/world scheme. 
If you don't know what Access Control Lists are, say N + +config CEPH_FS_SECURITY_LABEL + bool "CephFS Security Labels" + depends on CEPH_FS && SECURITY + help + Security labels support alternative access control models + implemented by security modules like SELinux. This option + enables an extended attribute handler for file security + labels in the Ceph filesystem. + + If you are not using a security module that requires using + extended attributes for file security labels, say N. diff --git a/fs/ceph/acl.c b/fs/ceph/acl.c index 8a19c249036c..aa55f412a6e3 100644 --- a/fs/ceph/acl.c +++ b/fs/ceph/acl.c @@ -159,7 +159,7 @@ out: } int ceph_pre_init_acls(struct inode *dir, umode_t *mode, - struct ceph_acls_info *info) + struct ceph_acl_sec_ctx *as_ctx) { struct posix_acl *acl, *default_acl; size_t val_size1 = 0, val_size2 = 0; @@ -234,9 +234,9 @@ int ceph_pre_init_acls(struct inode *dir, umode_t *mode, kfree(tmp_buf); - info->acl = acl; - info->default_acl = default_acl; - info->pagelist = pagelist; + as_ctx->acl = acl; + as_ctx->default_acl = default_acl; + as_ctx->pagelist = pagelist; return 0; out_err: @@ -248,18 +248,10 @@ out_err: return err; } -void ceph_init_inode_acls(struct inode* inode, struct ceph_acls_info *info) +void ceph_init_inode_acls(struct inode *inode, struct ceph_acl_sec_ctx *as_ctx) { if (!inode) return; - ceph_set_cached_acl(inode, ACL_TYPE_ACCESS, info->acl); - ceph_set_cached_acl(inode, ACL_TYPE_DEFAULT, info->default_acl); -} - -void ceph_release_acls_info(struct ceph_acls_info *info) -{ - posix_acl_release(info->acl); - posix_acl_release(info->default_acl); - if (info->pagelist) - ceph_pagelist_release(info->pagelist); + ceph_set_cached_acl(inode, ACL_TYPE_ACCESS, as_ctx->acl); + ceph_set_cached_acl(inode, ACL_TYPE_DEFAULT, as_ctx->default_acl); } diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index a47c541f8006..e078cc55b989 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -10,6 +10,7 @@ #include <linux/pagevec.h> #include <linux/task_io_accounting_ops.h> #include <linux/signal.h> +#include <linux/iversion.h> #include "super.h" #include "mds_client.h" @@ -1576,6 +1577,7 @@ static vm_fault_t ceph_page_mkwrite(struct vm_fault *vmf) /* Update time before taking page lock */ file_update_time(vma->vm_file); + inode_inc_iversion_raw(inode); do { lock_page(page); diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index 0176241eaea7..d98dcd976c80 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -8,6 +8,7 @@ #include <linux/vmalloc.h> #include <linux/wait.h> #include <linux/writeback.h> +#include <linux/iversion.h> #include "super.h" #include "mds_client.h" @@ -1138,8 +1139,9 @@ struct cap_msg_args { u64 ino, cid, follows; u64 flush_tid, oldest_flush_tid, size, max_size; u64 xattr_version; + u64 change_attr; struct ceph_buffer *xattr_buf; - struct timespec64 atime, mtime, ctime; + struct timespec64 atime, mtime, ctime, btime; int op, caps, wanted, dirty; u32 seq, issue_seq, mseq, time_warp_seq; u32 flags; @@ -1160,7 +1162,6 @@ static int send_cap_msg(struct cap_msg_args *arg) struct ceph_msg *msg; void *p; size_t extra_len; - struct timespec64 zerotime = {0}; struct ceph_osd_client *osdc = &arg->session->s_mdsc->fsc->client->osdc; dout("send_cap_msg %s %llx %llx caps %s wanted %s dirty %s" @@ -1245,15 +1246,10 @@ static int send_cap_msg(struct cap_msg_args *arg) /* pool namespace (version 8) (mds always ignores this) */ ceph_encode_32(&p, 0); - /* - * btime and change_attr (version 9) - * - * We just zero these out for now, as the MDS ignores them unless - * the requisite 
feature flags are set (which we don't do yet). - */ - ceph_encode_timespec64(p, &zerotime); + /* btime and change_attr (version 9) */ + ceph_encode_timespec64(p, &arg->btime); p += sizeof(struct ceph_timespec); - ceph_encode_64(&p, 0); + ceph_encode_64(&p, arg->change_attr); /* Advisory flags (version 10) */ ceph_encode_32(&p, arg->flags); @@ -1263,20 +1259,22 @@ static int send_cap_msg(struct cap_msg_args *arg) } /* - * Queue cap releases when an inode is dropped from our cache. Since - * inode is about to be destroyed, there is no need for i_ceph_lock. + * Queue cap releases when an inode is dropped from our cache. */ -void __ceph_remove_caps(struct inode *inode) +void __ceph_remove_caps(struct ceph_inode_info *ci) { - struct ceph_inode_info *ci = ceph_inode(inode); struct rb_node *p; + /* lock i_ceph_lock, because ceph_d_revalidate(..., LOOKUP_RCU) + * may call __ceph_caps_issued_mask() on a freeing inode. */ + spin_lock(&ci->i_ceph_lock); p = rb_first(&ci->i_caps); while (p) { struct ceph_cap *cap = rb_entry(p, struct ceph_cap, ci_node); p = rb_next(p); __ceph_remove_cap(cap, true); } + spin_unlock(&ci->i_ceph_lock); } /* @@ -1297,7 +1295,7 @@ void __ceph_remove_caps(struct inode *inode) * caller should hold snap_rwsem (read), s_mutex. */ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap, - int op, bool sync, int used, int want, int retain, + int op, int flags, int used, int want, int retain, int flushing, u64 flush_tid, u64 oldest_flush_tid) __releases(cap->ci->i_ceph_lock) { @@ -1377,6 +1375,8 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap, arg.mtime = inode->i_mtime; arg.atime = inode->i_atime; arg.ctime = inode->i_ctime; + arg.btime = ci->i_btime; + arg.change_attr = inode_peek_iversion_raw(inode); arg.op = op; arg.caps = cap->implemented; @@ -1393,12 +1393,19 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap, arg.mode = inode->i_mode; arg.inline_data = ci->i_inline_version != CEPH_INLINE_NONE; - if (list_empty(&ci->i_cap_snaps)) - arg.flags = CEPH_CLIENT_CAPS_NO_CAPSNAP; - else - arg.flags = CEPH_CLIENT_CAPS_PENDING_CAPSNAP; - if (sync) - arg.flags |= CEPH_CLIENT_CAPS_SYNC; + if (!(flags & CEPH_CLIENT_CAPS_PENDING_CAPSNAP) && + !list_empty(&ci->i_cap_snaps)) { + struct ceph_cap_snap *capsnap; + list_for_each_entry_reverse(capsnap, &ci->i_cap_snaps, ci_item) { + if (capsnap->cap_flush.tid) + break; + if (capsnap->need_flush) { + flags |= CEPH_CLIENT_CAPS_PENDING_CAPSNAP; + break; + } + } + } + arg.flags = flags; spin_unlock(&ci->i_ceph_lock); @@ -1436,6 +1443,8 @@ static inline int __send_flush_snap(struct inode *inode, arg.atime = capsnap->atime; arg.mtime = capsnap->mtime; arg.ctime = capsnap->ctime; + arg.btime = capsnap->btime; + arg.change_attr = capsnap->change_attr; arg.op = CEPH_CAP_OP_FLUSHSNAP; arg.caps = capsnap->issued; @@ -1603,10 +1612,8 @@ retry: } // make sure flushsnap messages are sent in proper order. 
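/*
 * Aside: the change_attr now encoded in the cap message is the inode's
 * raw i_version.  A simplified model of the raw iversion helpers this
 * series relies on (the real ones in include/linux/iversion.h operate
 * atomically on the inode; this sketch only shows the intended
 * semantics):
 */
#include <stdint.h>

struct iversion_sketch { uint64_t i_version; };

/* seed an initial value, e.g. 0 when the inode is first set up */
static void sketch_set_raw(struct iversion_sketch *i, uint64_t val)
{
	i->i_version = val;
}

/* bump on every local modification (write, page_mkwrite, copy range) */
static void sketch_inc_raw(struct iversion_sketch *i)
{
	i->i_version++;
}

/* fold in an MDS-supplied change_attr without ever going backwards */
static void sketch_set_max_raw(struct iversion_sketch *i, uint64_t val)
{
	if (val > i->i_version)
		i->i_version = val;
}

/* read back the value to send with a cap flush */
static uint64_t sketch_peek_raw(const struct iversion_sketch *i)
{
	return i->i_version;
}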
- if (ci->i_ceph_flags & CEPH_I_KICK_FLUSH) { + if (ci->i_ceph_flags & CEPH_I_KICK_FLUSH) __kick_flushing_caps(mdsc, session, ci, 0); - ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH; - } __ceph_flush_snaps(ci, session); out: @@ -2048,10 +2055,8 @@ ack: if (cap == ci->i_auth_cap && (ci->i_ceph_flags & (CEPH_I_KICK_FLUSH | CEPH_I_FLUSH_SNAPS))) { - if (ci->i_ceph_flags & CEPH_I_KICK_FLUSH) { + if (ci->i_ceph_flags & CEPH_I_KICK_FLUSH) __kick_flushing_caps(mdsc, session, ci, 0); - ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH; - } if (ci->i_ceph_flags & CEPH_I_FLUSH_SNAPS) __ceph_flush_snaps(ci, session); @@ -2087,7 +2092,7 @@ ack: sent++; /* __send_cap drops i_ceph_lock */ - delayed += __send_cap(mdsc, cap, CEPH_CAP_OP_UPDATE, false, + delayed += __send_cap(mdsc, cap, CEPH_CAP_OP_UPDATE, 0, cap_used, want, retain, flushing, flush_tid, oldest_flush_tid); goto retry; /* retake i_ceph_lock and restart our cap scan. */ @@ -2121,6 +2126,7 @@ static int try_flush_caps(struct inode *inode, u64 *ptid) retry: spin_lock(&ci->i_ceph_lock); +retry_locked: if (ci->i_ceph_flags & CEPH_I_NOFLUSH) { spin_unlock(&ci->i_ceph_lock); dout("try_flush_caps skipping %p I_NOFLUSH set\n", inode); @@ -2128,8 +2134,6 @@ retry: } if (ci->i_dirty_caps && ci->i_auth_cap) { struct ceph_cap *cap = ci->i_auth_cap; - int used = __ceph_caps_used(ci); - int want = __ceph_caps_wanted(ci); int delayed; if (!session || session != cap->session) { @@ -2145,13 +2149,25 @@ retry: goto out; } + if (ci->i_ceph_flags & + (CEPH_I_KICK_FLUSH | CEPH_I_FLUSH_SNAPS)) { + if (ci->i_ceph_flags & CEPH_I_KICK_FLUSH) + __kick_flushing_caps(mdsc, session, ci, 0); + if (ci->i_ceph_flags & CEPH_I_FLUSH_SNAPS) + __ceph_flush_snaps(ci, session); + goto retry_locked; + } + flushing = __mark_caps_flushing(inode, session, true, &flush_tid, &oldest_flush_tid); /* __send_cap drops i_ceph_lock */ - delayed = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH, true, - used, want, (cap->issued | cap->implemented), - flushing, flush_tid, oldest_flush_tid); + delayed = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH, + CEPH_CLIENT_CAPS_SYNC, + __ceph_caps_used(ci), + __ceph_caps_wanted(ci), + (cap->issued | cap->implemented), + flushing, flush_tid, oldest_flush_tid); if (delayed) { spin_lock(&ci->i_ceph_lock); @@ -2320,6 +2336,16 @@ static void __kick_flushing_caps(struct ceph_mds_client *mdsc, struct ceph_cap_flush *cf; int ret; u64 first_tid = 0; + u64 last_snap_flush = 0; + + ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH; + + list_for_each_entry_reverse(cf, &ci->i_cap_flush_list, i_list) { + if (!cf->caps) { + last_snap_flush = cf->tid; + break; + } + } list_for_each_entry(cf, &ci->i_cap_flush_list, i_list) { if (cf->tid < first_tid) @@ -2338,10 +2364,13 @@ static void __kick_flushing_caps(struct ceph_mds_client *mdsc, dout("kick_flushing_caps %p cap %p tid %llu %s\n", inode, cap, cf->tid, ceph_cap_string(cf->caps)); ci->i_ceph_flags |= CEPH_I_NODELAY; + ret = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH, - false, __ceph_caps_used(ci), + (cf->tid < last_snap_flush ? + CEPH_CLIENT_CAPS_PENDING_CAPSNAP : 0), + __ceph_caps_used(ci), __ceph_caps_wanted(ci), - cap->issued | cap->implemented, + (cap->issued | cap->implemented), cf->caps, cf->tid, oldest_flush_tid); if (ret) { pr_err("kick_flushing_caps: error sending " @@ -2410,7 +2439,6 @@ void ceph_early_kick_flushing_caps(struct ceph_mds_client *mdsc, */ if ((cap->issued & ci->i_flushing_caps) != ci->i_flushing_caps) { - ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH; /* encode_caps_cb() also will reset these sequence * numbers. 
make sure sequence numbers in cap flush * message match later reconnect message */ @@ -2450,7 +2478,6 @@ void ceph_kick_flushing_caps(struct ceph_mds_client *mdsc, continue; } if (ci->i_ceph_flags & CEPH_I_KICK_FLUSH) { - ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH; __kick_flushing_caps(mdsc, session, ci, oldest_flush_tid); } @@ -2478,7 +2505,6 @@ static void kick_flushing_inode_caps(struct ceph_mds_client *mdsc, oldest_flush_tid = __get_oldest_flush_tid(mdsc); spin_unlock(&mdsc->cap_dirty_lock); - ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH; __kick_flushing_caps(mdsc, session, ci, oldest_flush_tid); spin_unlock(&ci->i_ceph_lock); } else { @@ -3040,8 +3066,10 @@ struct cap_extra_info { bool dirstat_valid; u64 nfiles; u64 nsubdirs; + u64 change_attr; /* currently issued */ int issued; + struct timespec64 btime; }; /* @@ -3123,11 +3151,14 @@ static void handle_cap_grant(struct inode *inode, __check_cap_issue(ci, cap, newcaps); + inode_set_max_iversion_raw(inode, extra_info->change_attr); + if ((newcaps & CEPH_CAP_AUTH_SHARED) && (extra_info->issued & CEPH_CAP_AUTH_EXCL) == 0) { inode->i_mode = le32_to_cpu(grant->mode); inode->i_uid = make_kuid(&init_user_ns, le32_to_cpu(grant->uid)); inode->i_gid = make_kgid(&init_user_ns, le32_to_cpu(grant->gid)); + ci->i_btime = extra_info->btime; dout("%p mode 0%o uid.gid %d.%d\n", inode, inode->i_mode, from_kuid(&init_user_ns, inode->i_uid), from_kgid(&init_user_ns, inode->i_gid)); @@ -3154,6 +3185,7 @@ static void handle_cap_grant(struct inode *inode, ci->i_xattrs.blob = ceph_buffer_get(xattr_buf); ci->i_xattrs.version = version; ceph_forget_all_cached_acls(inode); + ceph_security_invalidate_secctx(inode); } } @@ -3848,17 +3880,19 @@ void ceph_handle_caps(struct ceph_mds_session *session, } } - if (msg_version >= 11) { + if (msg_version >= 9) { struct ceph_timespec *btime; - u64 change_attr; - u32 flags; - /* version >= 9 */ if (p + sizeof(*btime) > end) goto bad; btime = p; + ceph_decode_timespec64(&extra_info.btime, btime); p += sizeof(*btime); - ceph_decode_64_safe(&p, end, change_attr, bad); + ceph_decode_64_safe(&p, end, extra_info.change_attr, bad); + } + + if (msg_version >= 11) { + u32 flags; /* version >= 10 */ ceph_decode_32_safe(&p, end, flags, bad); /* version >= 11 */ diff --git a/fs/ceph/debugfs.c b/fs/ceph/debugfs.c index 83cd41fa2b01..2eb88ed22993 100644 --- a/fs/ceph/debugfs.c +++ b/fs/ceph/debugfs.c @@ -52,7 +52,7 @@ static int mdsc_show(struct seq_file *s, void *p) struct ceph_mds_client *mdsc = fsc->mdsc; struct ceph_mds_request *req; struct rb_node *rp; - int pathlen; + int pathlen = 0; u64 pathbase; char *path; diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c index 0637149fb9f9..aab29f48c62d 100644 --- a/fs/ceph/dir.c +++ b/fs/ceph/dir.c @@ -825,7 +825,7 @@ static int ceph_mknod(struct inode *dir, struct dentry *dentry, struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb); struct ceph_mds_client *mdsc = fsc->mdsc; struct ceph_mds_request *req; - struct ceph_acls_info acls = {}; + struct ceph_acl_sec_ctx as_ctx = {}; int err; if (ceph_snap(dir) != CEPH_NOSNAP) @@ -836,7 +836,10 @@ static int ceph_mknod(struct inode *dir, struct dentry *dentry, goto out; } - err = ceph_pre_init_acls(dir, &mode, &acls); + err = ceph_pre_init_acls(dir, &mode, &as_ctx); + if (err < 0) + goto out; + err = ceph_security_init_secctx(dentry, mode, &as_ctx); if (err < 0) goto out; @@ -855,9 +858,9 @@ static int ceph_mknod(struct inode *dir, struct dentry *dentry, req->r_args.mknod.rdev = cpu_to_le32(rdev); req->r_dentry_drop = CEPH_CAP_FILE_SHARED | CEPH_CAP_AUTH_EXCL; 
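/*
 * Aside: mknod, mkdir and atomic_open now share one setup shape (symlink
 * performs only the security-context step, having no ACLs to inherit).
 * A condensed, hypothetical outline of that pattern -- the real call
 * sites are inline and this wrapper exists only for illustration:
 */
static int sketch_prepare_as_ctx(struct inode *dir, struct dentry *dentry,
				 umode_t *mode,
				 struct ceph_acl_sec_ctx *as_ctx)
{
	int err;

	err = ceph_pre_init_acls(dir, mode, as_ctx);	/* POSIX ACLs */
	if (err < 0)
		return err;
	/* LSM security label (CONFIG_CEPH_FS_SECURITY_LABEL) */
	return ceph_security_init_secctx(dentry, *mode, as_ctx);
}

/*
 * The caller then hands any resulting xattr blob to the MDS request:
 *
 *	if (as_ctx.pagelist) {
 *		req->r_pagelist = as_ctx.pagelist;
 *		as_ctx.pagelist = NULL;		// request consumes it
 *	}
 *
 * and unconditionally calls ceph_release_acl_sec_ctx(&as_ctx) at the
 * end, on both the success and error paths.
 */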
req->r_dentry_unless = CEPH_CAP_FILE_EXCL; - if (acls.pagelist) { - req->r_pagelist = acls.pagelist; - acls.pagelist = NULL; + if (as_ctx.pagelist) { + req->r_pagelist = as_ctx.pagelist; + as_ctx.pagelist = NULL; } err = ceph_mdsc_do_request(mdsc, dir, req); if (!err && !req->r_reply_info.head->is_dentry) @@ -865,10 +868,10 @@ static int ceph_mknod(struct inode *dir, struct dentry *dentry, ceph_mdsc_put_request(req); out: if (!err) - ceph_init_inode_acls(d_inode(dentry), &acls); + ceph_init_inode_acls(d_inode(dentry), &as_ctx); else d_drop(dentry); - ceph_release_acls_info(&acls); + ceph_release_acl_sec_ctx(&as_ctx); return err; } @@ -884,6 +887,7 @@ static int ceph_symlink(struct inode *dir, struct dentry *dentry, struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb); struct ceph_mds_client *mdsc = fsc->mdsc; struct ceph_mds_request *req; + struct ceph_acl_sec_ctx as_ctx = {}; int err; if (ceph_snap(dir) != CEPH_NOSNAP) @@ -894,6 +898,10 @@ static int ceph_symlink(struct inode *dir, struct dentry *dentry, goto out; } + err = ceph_security_init_secctx(dentry, S_IFLNK | 0777, &as_ctx); + if (err < 0) + goto out; + dout("symlink in dir %p dentry %p to '%s'\n", dir, dentry, dest); req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_SYMLINK, USE_AUTH_MDS); if (IS_ERR(req)) { @@ -919,6 +927,7 @@ static int ceph_symlink(struct inode *dir, struct dentry *dentry, out: if (err) d_drop(dentry); + ceph_release_acl_sec_ctx(&as_ctx); return err; } @@ -927,7 +936,7 @@ static int ceph_mkdir(struct inode *dir, struct dentry *dentry, umode_t mode) struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb); struct ceph_mds_client *mdsc = fsc->mdsc; struct ceph_mds_request *req; - struct ceph_acls_info acls = {}; + struct ceph_acl_sec_ctx as_ctx = {}; int err = -EROFS; int op; @@ -950,7 +959,10 @@ static int ceph_mkdir(struct inode *dir, struct dentry *dentry, umode_t mode) } mode |= S_IFDIR; - err = ceph_pre_init_acls(dir, &mode, &acls); + err = ceph_pre_init_acls(dir, &mode, &as_ctx); + if (err < 0) + goto out; + err = ceph_security_init_secctx(dentry, mode, &as_ctx); if (err < 0) goto out; @@ -967,9 +979,9 @@ static int ceph_mkdir(struct inode *dir, struct dentry *dentry, umode_t mode) req->r_args.mkdir.mode = cpu_to_le32(mode); req->r_dentry_drop = CEPH_CAP_FILE_SHARED | CEPH_CAP_AUTH_EXCL; req->r_dentry_unless = CEPH_CAP_FILE_EXCL; - if (acls.pagelist) { - req->r_pagelist = acls.pagelist; - acls.pagelist = NULL; + if (as_ctx.pagelist) { + req->r_pagelist = as_ctx.pagelist; + as_ctx.pagelist = NULL; } err = ceph_mdsc_do_request(mdsc, dir, req); if (!err && @@ -979,10 +991,10 @@ static int ceph_mkdir(struct inode *dir, struct dentry *dentry, umode_t mode) ceph_mdsc_put_request(req); out: if (!err) - ceph_init_inode_acls(d_inode(dentry), &acls); + ceph_init_inode_acls(d_inode(dentry), &as_ctx); else d_drop(dentry); - ceph_release_acls_info(&acls); + ceph_release_acl_sec_ctx(&as_ctx); return err; } @@ -1433,8 +1445,7 @@ static bool __dentry_lease_is_valid(struct ceph_dentry_info *di) return false; } -static int dentry_lease_is_valid(struct dentry *dentry, unsigned int flags, - struct inode *dir) +static int dentry_lease_is_valid(struct dentry *dentry, unsigned int flags) { struct ceph_dentry_info *di; struct ceph_mds_session *session = NULL; @@ -1466,7 +1477,7 @@ static int dentry_lease_is_valid(struct dentry *dentry, unsigned int flags, spin_unlock(&dentry->d_lock); if (session) { - ceph_mdsc_lease_send_msg(session, dir, dentry, + ceph_mdsc_lease_send_msg(session, dentry, CEPH_MDS_LEASE_RENEW, seq); 
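/*
 * Aside: dentry lease lifetimes come from the MDS as a duration in
 * milliseconds and are converted to jiffies.  A stand-alone sketch of
 * the ttl/half-ttl arithmetic used by __update_dentry_lease() (the HZ
 * value below is an example; it is config dependent in the kernel):
 */
#include <stdbool.h>

#define SKETCH_HZ 250UL		/* example tick rate */

struct sketch_lease {
	unsigned long ttl;	/* lease fully expires at this jiffy */
	unsigned long half_ttl;	/* renew opportunistically after this */
};

static struct sketch_lease sketch_lease_times(unsigned long from_time,
					      unsigned long duration_ms)
{
	struct sketch_lease t;

	t.ttl = from_time + (duration_ms * SKETCH_HZ) / 1000;
	t.half_ttl = from_time + (duration_ms * SKETCH_HZ / 2) / 1000;
	return t;
}

/* the kernel uses wrap-safe time_before(); plain compare for brevity */
static bool sketch_lease_expired(unsigned long now,
				 const struct sketch_lease *t)
{
	return now >= t->ttl;
}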
ceph_put_mds_session(session); } @@ -1512,18 +1523,26 @@ static int __dir_lease_try_check(const struct dentry *dentry) static int dir_lease_is_valid(struct inode *dir, struct dentry *dentry) { struct ceph_inode_info *ci = ceph_inode(dir); - struct ceph_dentry_info *di = ceph_dentry(dentry); - int valid = 0; + int valid; + int shared_gen; spin_lock(&ci->i_ceph_lock); - if (atomic_read(&ci->i_shared_gen) == di->lease_shared_gen) - valid = __ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1); + valid = __ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1); + shared_gen = atomic_read(&ci->i_shared_gen); spin_unlock(&ci->i_ceph_lock); - if (valid) - __ceph_dentry_dir_lease_touch(di); - dout("dir_lease_is_valid dir %p v%u dentry %p v%u = %d\n", - dir, (unsigned)atomic_read(&ci->i_shared_gen), - dentry, (unsigned)di->lease_shared_gen, valid); + if (valid) { + struct ceph_dentry_info *di; + spin_lock(&dentry->d_lock); + di = ceph_dentry(dentry); + if (dir == d_inode(dentry->d_parent) && + di && di->lease_shared_gen == shared_gen) + __ceph_dentry_dir_lease_touch(di); + else + valid = 0; + spin_unlock(&dentry->d_lock); + } + dout("dir_lease_is_valid dir %p v%u dentry %p = %d\n", + dir, (unsigned)atomic_read(&ci->i_shared_gen), dentry, valid); return valid; } @@ -1558,7 +1577,7 @@ static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags) ceph_snap(d_inode(dentry)) == CEPH_SNAPDIR) { valid = 1; } else { - valid = dentry_lease_is_valid(dentry, flags, dir); + valid = dentry_lease_is_valid(dentry, flags); if (valid == -ECHILD) return valid; if (valid || dir_lease_is_valid(dir, dentry)) { diff --git a/fs/ceph/export.c b/fs/ceph/export.c index d3ef7ee429ec..15ff1b09cfa2 100644 --- a/fs/ceph/export.c +++ b/fs/ceph/export.c @@ -368,7 +368,7 @@ static struct dentry *ceph_get_parent(struct dentry *child) } out: dout("get_parent %p ino %llx.%llx err=%ld\n", - child, ceph_vinop(inode), (IS_ERR(dn) ? 
PTR_ERR(dn) : 0)); + child, ceph_vinop(inode), (long)PTR_ERR_OR_ZERO(dn)); return dn; } diff --git a/fs/ceph/file.c b/fs/ceph/file.c index c5517ffeb11c..685a03cc4b77 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -10,6 +10,7 @@ #include <linux/namei.h> #include <linux/writeback.h> #include <linux/falloc.h> +#include <linux/iversion.h> #include "super.h" #include "mds_client.h" @@ -437,7 +438,7 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry, struct ceph_mds_client *mdsc = fsc->mdsc; struct ceph_mds_request *req; struct dentry *dn; - struct ceph_acls_info acls = {}; + struct ceph_acl_sec_ctx as_ctx = {}; int mask; int err; @@ -451,25 +452,28 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry, if (flags & O_CREAT) { if (ceph_quota_is_max_files_exceeded(dir)) return -EDQUOT; - err = ceph_pre_init_acls(dir, &mode, &acls); + err = ceph_pre_init_acls(dir, &mode, &as_ctx); if (err < 0) return err; + err = ceph_security_init_secctx(dentry, mode, &as_ctx); + if (err < 0) + goto out_ctx; } /* do the open */ req = prepare_open_request(dir->i_sb, flags, mode); if (IS_ERR(req)) { err = PTR_ERR(req); - goto out_acl; + goto out_ctx; } req->r_dentry = dget(dentry); req->r_num_caps = 2; if (flags & O_CREAT) { req->r_dentry_drop = CEPH_CAP_FILE_SHARED | CEPH_CAP_AUTH_EXCL; req->r_dentry_unless = CEPH_CAP_FILE_EXCL; - if (acls.pagelist) { - req->r_pagelist = acls.pagelist; - acls.pagelist = NULL; + if (as_ctx.pagelist) { + req->r_pagelist = as_ctx.pagelist; + as_ctx.pagelist = NULL; } } @@ -507,7 +511,7 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry, } else { dout("atomic_open finish_open on dn %p\n", dn); if (req->r_op == CEPH_MDS_OP_CREATE && req->r_reply_info.has_create_ino) { - ceph_init_inode_acls(d_inode(dentry), &acls); + ceph_init_inode_acls(d_inode(dentry), &as_ctx); file->f_mode |= FMODE_CREATED; } err = finish_open(file, dentry, ceph_open); @@ -516,8 +520,8 @@ out_req: if (!req->r_err && req->r_target_inode) ceph_put_fmode(ceph_inode(req->r_target_inode), req->r_fmode); ceph_mdsc_put_request(req); -out_acl: - ceph_release_acls_info(&acls); +out_ctx: + ceph_release_acl_sec_ctx(&as_ctx); dout("atomic_open result=%d\n", err); return err; } @@ -1007,7 +1011,7 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter, * may block. 
*/ truncate_inode_pages_range(inode->i_mapping, pos, - (pos+len) | (PAGE_SIZE - 1)); + PAGE_ALIGN(pos + len) - 1); req->r_mtime = mtime; } @@ -1022,7 +1026,7 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter, req->r_callback = ceph_aio_complete_req; req->r_inode = inode; req->r_priv = aio_req; - list_add_tail(&req->r_unsafe_item, &aio_req->osd_reqs); + list_add_tail(&req->r_private_item, &aio_req->osd_reqs); pos += len; continue; @@ -1082,8 +1086,8 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter, while (!list_empty(&osd_reqs)) { req = list_first_entry(&osd_reqs, struct ceph_osd_request, - r_unsafe_item); - list_del_init(&req->r_unsafe_item); + r_private_item); + list_del_init(&req->r_private_item); if (ret >= 0) ret = ceph_osdc_start_request(req->r_osdc, req, false); @@ -1432,6 +1436,8 @@ retry_snap: if (err) goto out; + inode_inc_iversion_raw(inode); + if (ci->i_inline_version != CEPH_INLINE_NONE) { err = ceph_uninline_data(file, NULL); if (err < 0) @@ -2063,6 +2069,8 @@ static ssize_t __ceph_copy_file_range(struct file *src_file, loff_t src_off, do_final_copy = true; file_update_time(dst_file); + inode_inc_iversion_raw(dst_inode); + if (endoff > size) { int caps_flags = 0; diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index 761451f36e2d..791f84a13bb8 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -13,6 +13,7 @@ #include <linux/posix_acl.h> #include <linux/random.h> #include <linux/sort.h> +#include <linux/iversion.h> #include "super.h" #include "mds_client.h" @@ -42,6 +43,7 @@ static int ceph_set_ino_cb(struct inode *inode, void *data) { ceph_inode(inode)->i_vino = *(struct ceph_vino *)data; inode->i_ino = ceph_vino_to_ino(*(struct ceph_vino *)data); + inode_set_iversion_raw(inode, 0); return 0; } @@ -509,6 +511,7 @@ struct inode *ceph_alloc_inode(struct super_block *sb) INIT_WORK(&ci->i_work, ceph_inode_work); ci->i_work_mask = 0; + memset(&ci->i_btime, '\0', sizeof(ci->i_btime)); ceph_fscache_inode_init(ci); @@ -523,17 +526,20 @@ void ceph_free_inode(struct inode *inode) kmem_cache_free(ceph_inode_cachep, ci); } -void ceph_destroy_inode(struct inode *inode) +void ceph_evict_inode(struct inode *inode) { struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_inode_frag *frag; struct rb_node *n; - dout("destroy_inode %p ino %llx.%llx\n", inode, ceph_vinop(inode)); + dout("evict_inode %p ino %llx.%llx\n", inode, ceph_vinop(inode)); + + truncate_inode_pages_final(&inode->i_data); + clear_inode(inode); ceph_fscache_unregister_inode_cookie(ci); - __ceph_remove_caps(inode); + __ceph_remove_caps(ci); if (__ceph_has_any_quota(ci)) ceph_adjust_quota_realms_count(inode, false); @@ -578,16 +584,6 @@ void ceph_destroy_inode(struct inode *inode) ceph_put_string(rcu_dereference_raw(ci->i_layout.pool_ns)); } -int ceph_drop_inode(struct inode *inode) -{ - /* - * Positve dentry and corresponding inode are always accompanied - * in MDS reply. So no need to keep inode in the cache after - * dropping all its aliases. 
- */ - return 1; -} - static inline blkcnt_t calc_inode_blocks(u64 size) { return (size + (1<<9) - 1) >> 9; @@ -795,6 +791,9 @@ static int fill_inode(struct inode *inode, struct page *locked_page, le64_to_cpu(info->version) > (ci->i_version & ~1))) new_version = true; + /* Update change_attribute */ + inode_set_max_iversion_raw(inode, iinfo->change_attr); + __ceph_caps_issued(ci, &issued); issued |= __ceph_caps_dirty(ci); new_issued = ~issued & info_caps; @@ -813,6 +812,8 @@ static int fill_inode(struct inode *inode, struct page *locked_page, dout("%p mode 0%o uid.gid %d.%d\n", inode, inode->i_mode, from_kuid(&init_user_ns, inode->i_uid), from_kgid(&init_user_ns, inode->i_gid)); + ceph_decode_timespec64(&ci->i_btime, &iinfo->btime); + ceph_decode_timespec64(&ci->i_snap_btime, &iinfo->snap_btime); } if ((new_version || (new_issued & CEPH_CAP_LINK_SHARED)) && @@ -887,6 +888,7 @@ static int fill_inode(struct inode *inode, struct page *locked_page, iinfo->xattr_data, iinfo->xattr_len); ci->i_xattrs.version = le64_to_cpu(info->xattr_version); ceph_forget_all_cached_acls(inode); + ceph_security_invalidate_secctx(inode); xattr_blob = NULL; } @@ -1027,59 +1029,38 @@ out: } /* - * caller should hold session s_mutex. + * caller should hold session s_mutex and dentry->d_lock. */ -static void update_dentry_lease(struct dentry *dentry, - struct ceph_mds_reply_lease *lease, - struct ceph_mds_session *session, - unsigned long from_time, - struct ceph_vino *tgt_vino, - struct ceph_vino *dir_vino) +static void __update_dentry_lease(struct inode *dir, struct dentry *dentry, + struct ceph_mds_reply_lease *lease, + struct ceph_mds_session *session, + unsigned long from_time, + struct ceph_mds_session **old_lease_session) { struct ceph_dentry_info *di = ceph_dentry(dentry); long unsigned duration = le32_to_cpu(lease->duration_ms); long unsigned ttl = from_time + (duration * HZ) / 1000; long unsigned half_ttl = from_time + (duration * HZ / 2) / 1000; - struct inode *dir; - struct ceph_mds_session *old_lease_session = NULL; - /* - * Make sure dentry's inode matches tgt_vino. NULL tgt_vino means that - * we expect a negative dentry. - */ - if (!tgt_vino && d_really_is_positive(dentry)) - return; - - if (tgt_vino && (d_really_is_negative(dentry) || - !ceph_ino_compare(d_inode(dentry), tgt_vino))) - return; - - spin_lock(&dentry->d_lock); dout("update_dentry_lease %p duration %lu ms ttl %lu\n", dentry, duration, ttl); - dir = d_inode(dentry->d_parent); - - /* make sure parent matches dir_vino */ - if (!ceph_ino_compare(dir, dir_vino)) - goto out_unlock; - /* only track leases on regular dentries */ if (ceph_snap(dir) != CEPH_NOSNAP) - goto out_unlock; + return; di->lease_shared_gen = atomic_read(&ceph_inode(dir)->i_shared_gen); if (duration == 0) { __ceph_dentry_dir_lease_touch(di); - goto out_unlock; + return; } if (di->lease_gen == session->s_cap_gen && time_before(ttl, di->time)) - goto out_unlock; /* we already have a newer lease. */ + return; /* we already have a newer lease. 
*/ if (di->lease_session && di->lease_session != session) { - old_lease_session = di->lease_session; + *old_lease_session = di->lease_session; di->lease_session = NULL; } @@ -1092,6 +1073,62 @@ static void update_dentry_lease(struct dentry *dentry, di->time = ttl; __ceph_dentry_lease_touch(di); +} + +static inline void update_dentry_lease(struct inode *dir, struct dentry *dentry, + struct ceph_mds_reply_lease *lease, + struct ceph_mds_session *session, + unsigned long from_time) +{ + struct ceph_mds_session *old_lease_session = NULL; + spin_lock(&dentry->d_lock); + __update_dentry_lease(dir, dentry, lease, session, from_time, + &old_lease_session); + spin_unlock(&dentry->d_lock); + if (old_lease_session) + ceph_put_mds_session(old_lease_session); +} + +/* + * update dentry lease without having parent inode locked + */ +static void update_dentry_lease_careful(struct dentry *dentry, + struct ceph_mds_reply_lease *lease, + struct ceph_mds_session *session, + unsigned long from_time, + char *dname, u32 dname_len, + struct ceph_vino *pdvino, + struct ceph_vino *ptvino) + +{ + struct inode *dir; + struct ceph_mds_session *old_lease_session = NULL; + + spin_lock(&dentry->d_lock); + /* make sure dentry's name matches target */ + if (dentry->d_name.len != dname_len || + memcmp(dentry->d_name.name, dname, dname_len)) + goto out_unlock; + + dir = d_inode(dentry->d_parent); + /* make sure parent matches dvino */ + if (!ceph_ino_compare(dir, pdvino)) + goto out_unlock; + + /* make sure dentry's inode matches target. NULL ptvino means that + * we expect a negative dentry */ + if (ptvino) { + if (d_really_is_negative(dentry)) + goto out_unlock; + if (!ceph_ino_compare(d_inode(dentry), ptvino)) + goto out_unlock; + } else { + if (d_really_is_positive(dentry)) + goto out_unlock; + } + + __update_dentry_lease(dir, dentry, lease, session, + from_time, &old_lease_session); out_unlock: spin_unlock(&dentry->d_lock); if (old_lease_session) @@ -1156,19 +1193,6 @@ static int splice_dentry(struct dentry **pdn, struct inode *in) return 0; } -static int d_name_cmp(struct dentry *dentry, const char *name, size_t len) -{ - int ret; - - /* take d_lock to ensure dentry->d_name stability */ - spin_lock(&dentry->d_lock); - ret = dentry->d_name.len - len; - if (!ret) - ret = memcmp(dentry->d_name.name, name, len); - spin_unlock(&dentry->d_lock); - return ret; -} - /* * Incorporate results into the local cache. 
This is either just * one inode, or a directory, dentry, and possibly linked-to inode (e.g., @@ -1371,10 +1395,9 @@ retry_lookup: } else if (have_lease) { if (d_unhashed(dn)) d_add(dn, NULL); - update_dentry_lease(dn, rinfo->dlease, - session, - req->r_request_started, - NULL, &dvino); + update_dentry_lease(dir, dn, + rinfo->dlease, session, + req->r_request_started); } goto done; } @@ -1396,11 +1419,9 @@ retry_lookup: } if (have_lease) { - tvino.ino = le64_to_cpu(rinfo->targeti.in->ino); - tvino.snap = le64_to_cpu(rinfo->targeti.in->snapid); - update_dentry_lease(dn, rinfo->dlease, session, - req->r_request_started, - &tvino, &dvino); + update_dentry_lease(dir, dn, + rinfo->dlease, session, + req->r_request_started); } dout(" final dn %p\n", dn); } else if ((req->r_op == CEPH_MDS_OP_LOOKUPSNAP || @@ -1418,27 +1439,20 @@ retry_lookup: err = splice_dentry(&req->r_dentry, in); if (err < 0) goto done; - } else if (rinfo->head->is_dentry && - !d_name_cmp(req->r_dentry, rinfo->dname, rinfo->dname_len)) { + } else if (rinfo->head->is_dentry && req->r_dentry) { + /* parent inode is not locked, be careful */ struct ceph_vino *ptvino = NULL; - - if ((le32_to_cpu(rinfo->diri.in->cap.caps) & CEPH_CAP_FILE_SHARED) || - le32_to_cpu(rinfo->dlease->duration_ms)) { - dvino.ino = le64_to_cpu(rinfo->diri.in->ino); - dvino.snap = le64_to_cpu(rinfo->diri.in->snapid); - - if (rinfo->head->is_target) { - tvino.ino = le64_to_cpu(rinfo->targeti.in->ino); - tvino.snap = le64_to_cpu(rinfo->targeti.in->snapid); - ptvino = &tvino; - } - - update_dentry_lease(req->r_dentry, rinfo->dlease, - session, req->r_request_started, ptvino, - &dvino); - } else { - dout("%s: no dentry lease or dir cap\n", __func__); + dvino.ino = le64_to_cpu(rinfo->diri.in->ino); + dvino.snap = le64_to_cpu(rinfo->diri.in->snapid); + if (rinfo->head->is_target) { + tvino.ino = le64_to_cpu(rinfo->targeti.in->ino); + tvino.snap = le64_to_cpu(rinfo->targeti.in->snapid); + ptvino = &tvino; } + update_dentry_lease_careful(req->r_dentry, rinfo->dlease, + session, req->r_request_started, + rinfo->dname, rinfo->dname_len, + &dvino, ptvino); } done: dout("fill_trace done err=%d\n", err); @@ -1600,7 +1614,7 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req, /* FIXME: release caps/leases if error occurs */ for (i = 0; i < rinfo->dir_nr; i++) { struct ceph_mds_reply_dir_entry *rde = rinfo->dir_entries + i; - struct ceph_vino tvino, dvino; + struct ceph_vino tvino; dname.name = rde->name; dname.len = rde->name_len; @@ -1701,9 +1715,9 @@ retry_lookup: ceph_dentry(dn)->offset = rde->offset; - dvino = ceph_vino(d_inode(parent)); - update_dentry_lease(dn, rde->lease, req->r_session, - req->r_request_started, &tvino, &dvino); + update_dentry_lease(d_inode(parent), dn, + rde->lease, req->r_session, + req->r_request_started); if (err == 0 && skipped == 0 && cache_ctl.index >= 0) { ret = fill_readdir_cache(d_inode(parent), dn, @@ -2282,7 +2296,7 @@ static int statx_to_caps(u32 want) { int mask = 0; - if (want & (STATX_MODE|STATX_UID|STATX_GID|STATX_CTIME)) + if (want & (STATX_MODE|STATX_UID|STATX_GID|STATX_CTIME|STATX_BTIME)) mask |= CEPH_CAP_AUTH_SHARED; if (want & (STATX_NLINK|STATX_CTIME)) @@ -2307,6 +2321,7 @@ int ceph_getattr(const struct path *path, struct kstat *stat, { struct inode *inode = d_inode(path->dentry); struct ceph_inode_info *ci = ceph_inode(inode); + u32 valid_mask = STATX_BASIC_STATS; int err = 0; /* Skip the getattr altogether if we're asked not to sync */ @@ -2319,6 +2334,16 @@ int ceph_getattr(const struct path *path, struct kstat *stat, generic_fillattr(inode, stat); stat->ino = ceph_translate_ino(inode->i_sb, inode->i_ino); + + /* + * btime on newly-allocated inodes is 0, so if this is still set to + * that, then assume that it's not valid. + */ + if (ci->i_btime.tv_sec || ci->i_btime.tv_nsec) { + stat->btime = ci->i_btime; + valid_mask |= STATX_BTIME; + } + if (ceph_snap(inode) == CEPH_NOSNAP) stat->dev = inode->i_sb->s_dev; else @@ -2342,7 +2367,6 @@ int ceph_getattr(const struct path *path, struct kstat *stat, stat->nlink = 1 + 1 + ci->i_subdirs; } - /* Mask off any higher bits (e.g. btime) until we have support */ - stat->result_mask = request_mask & STATX_BASIC_STATS; + stat->result_mask = request_mask & valid_mask; return err; } diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index c8a9b89b922d..920e9f048bd8 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -150,14 +150,13 @@ static int parse_reply_info_in(void **p, void *end, info->pool_ns_data = *p; *p += info->pool_ns_len; } - /* btime, change_attr */ - { - struct ceph_timespec btime; - u64 change_attr; - ceph_decode_need(p, end, sizeof(btime), bad); - ceph_decode_copy(p, &btime, sizeof(btime)); - ceph_decode_64_safe(p, end, change_attr, bad); - } + + /* btime */ + ceph_decode_need(p, end, sizeof(info->btime), bad); + ceph_decode_copy(p, &info->btime, sizeof(info->btime)); + + /* change attribute */ + ceph_decode_64_safe(p, end, info->change_attr, bad); /* dir pin */ if (struct_v >= 2) { @@ -166,6 +165,15 @@ static int parse_reply_info_in(void **p, void *end, info->dir_pin = -ENODATA; } + /* snapshot birth time, remains zero for v<=2 */ + if (struct_v >= 3) { + ceph_decode_need(p, end, sizeof(info->snap_btime), bad); + ceph_decode_copy(p, &info->snap_btime, + sizeof(info->snap_btime)); + } else { + memset(&info->snap_btime, 0, sizeof(info->snap_btime)); + } + *p = end; } else { if (features & CEPH_FEATURE_MDS_INLINE_DATA) { @@ -197,7 +205,14 @@ static int parse_reply_info_in(void **p, void *end, } } + if (features & CEPH_FEATURE_FS_BTIME) { + ceph_decode_need(p, end, sizeof(info->btime), bad); + ceph_decode_copy(p, &info->btime, sizeof(info->btime)); + ceph_decode_64_safe(p, end, info->change_attr, bad); + } + info->dir_pin = -ENODATA; + /* info->snap_btime remains zero */ } return 0; bad: @@ -717,6 +732,7 @@ void ceph_mdsc_release_request(struct kref *kref) ceph_pagelist_release(req->r_pagelist); put_request_session(req); ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation); + WARN_ON_ONCE(!list_empty(&req->r_wait)); kfree(req); } @@ -903,7 +919,7 @@ static int __choose_mds(struct ceph_mds_client *mdsc, struct inode *dir; rcu_read_lock(); - parent = req->r_dentry->d_parent; + parent = READ_ONCE(req->r_dentry->d_parent); dir = req->r_parent ? : d_inode_rcu(parent); if (!dir || dir->i_sb != mdsc->fsc->sb) { @@ -2135,7 +2151,7 @@ retry: memcpy(path + pos, temp->d_name.name, temp->d_name.len); } spin_unlock(&temp->d_lock); - temp = temp->d_parent; + temp = READ_ONCE(temp->d_parent); /* Are we at the root? */ if (IS_ROOT(temp)) @@ -3727,42 +3743,35 @@ static void check_new_map(struct ceph_mds_client *mdsc, ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "", ceph_session_state_name(s->s_state)); - if (i >= newmap->m_num_mds || - memcmp(ceph_mdsmap_get_addr(oldmap, i), - ceph_mdsmap_get_addr(newmap, i), - sizeof(struct ceph_entity_addr))) { - if (s->s_state == CEPH_MDS_SESSION_OPENING) { - /* the session never opened, just close it - * out now */ - get_session(s); - __unregister_session(mdsc, s); - __wake_requests(mdsc, &s->s_waiting); - ceph_put_mds_session(s); - } else if (i >= newmap->m_num_mds) { - /* force close session for stopped mds */ - get_session(s); - __unregister_session(mdsc, s); - __wake_requests(mdsc, &s->s_waiting); - kick_requests(mdsc, i); - mutex_unlock(&mdsc->mutex); + if (i >= newmap->m_num_mds) { + /* force close session for stopped mds */ + get_session(s); + __unregister_session(mdsc, s); + __wake_requests(mdsc, &s->s_waiting); + mutex_unlock(&mdsc->mutex); - mutex_lock(&s->s_mutex); - cleanup_session_requests(mdsc, s); - remove_session_caps(s); - mutex_unlock(&s->s_mutex); + mutex_lock(&s->s_mutex); + cleanup_session_requests(mdsc, s); + remove_session_caps(s); + mutex_unlock(&s->s_mutex); - ceph_put_mds_session(s); + ceph_put_mds_session(s); - mutex_lock(&mdsc->mutex); - } else { - /* just close it */ - mutex_unlock(&mdsc->mutex); - mutex_lock(&s->s_mutex); - mutex_lock(&mdsc->mutex); - ceph_con_close(&s->s_con); - mutex_unlock(&s->s_mutex); - s->s_state = CEPH_MDS_SESSION_RESTARTING; - } + mutex_lock(&mdsc->mutex); + kick_requests(mdsc, i); + continue; + } + + if (memcmp(ceph_mdsmap_get_addr(oldmap, i), + ceph_mdsmap_get_addr(newmap, i), + sizeof(struct ceph_entity_addr))) { + /* just close it */ + mutex_unlock(&mdsc->mutex); + mutex_lock(&s->s_mutex); + mutex_lock(&mdsc->mutex); + ceph_con_close(&s->s_con); + mutex_unlock(&s->s_mutex); + s->s_state = CEPH_MDS_SESSION_RESTARTING; } else if (oldstate == newstate) { continue; /* nothing new with this mds */ } @@ -3931,31 +3940,33 @@ bad: } void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session, - struct inode *inode, struct dentry *dentry, char action, u32 seq) { struct ceph_msg *msg; struct ceph_mds_lease *lease; - int len = sizeof(*lease) + sizeof(u32); - int dnamelen = 0; + struct inode *dir; + int len = sizeof(*lease) + sizeof(u32) + NAME_MAX; - dout("lease_send_msg inode %p dentry %p %s to mds%d\n", - inode, dentry, ceph_lease_op_name(action), session->s_mds); - dnamelen = dentry->d_name.len; - len += dnamelen; + dout("lease_send_msg dentry %p %s to mds%d\n", + dentry, ceph_lease_op_name(action), session->s_mds); msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false); if (!msg) return; lease = msg->front.iov_base; lease->action = action; - lease->ino = cpu_to_le64(ceph_vino(inode).ino); - lease->first = lease->last = cpu_to_le64(ceph_vino(inode).snap); lease->seq = cpu_to_le32(seq); - put_unaligned_le32(dnamelen, lease + 1); - memcpy((void *)(lease + 1) + 4, dentry->d_name.name, dnamelen); + spin_lock(&dentry->d_lock); + dir = d_inode(dentry->d_parent); + lease->ino = cpu_to_le64(ceph_ino(dir)); + lease->first = lease->last = cpu_to_le64(ceph_snap(dir)); + + put_unaligned_le32(dentry->d_name.len, lease + 1); + memcpy((void *)(lease + 1) + 4, + dentry->d_name.name, dentry->d_name.len); + spin_unlock(&dentry->d_lock); /* * if this is a preemptive lease RELEASE, no need to * flush request stream, since the actual request will @@ -4157,6 +4168,7 @@ static void wait_requests(struct ceph_mds_client *mdsc) while ((req = __get_oldest_req(mdsc))) { dout("wait_requests timed out on tid %llu\n", req->r_tid); + list_del_init(&req->r_wait);
__unregister_request(mdsc, req); } } diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h index a83f28bc2387..f7c8603484fe 100644 --- a/fs/ceph/mds_client.h +++ b/fs/ceph/mds_client.h @@ -69,6 +69,9 @@ struct ceph_mds_reply_info_in { u64 max_bytes; u64 max_files; s32 dir_pin; + struct ceph_timespec btime; + struct ceph_timespec snap_btime; + u64 change_attr; }; struct ceph_mds_reply_dir_entry { @@ -504,7 +507,6 @@ extern char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *base, extern void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry); extern void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session, - struct inode *inode, struct dentry *dentry, char action, u32 seq); diff --git a/fs/ceph/mdsmap.c b/fs/ceph/mdsmap.c index 701b4fb0fb5a..ce2d00da5096 100644 --- a/fs/ceph/mdsmap.c +++ b/fs/ceph/mdsmap.c @@ -107,7 +107,7 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end) struct ceph_mdsmap *m; const void *start = *p; int i, j, n; - int err = -EINVAL; + int err; u8 mdsmap_v, mdsmap_cv; u16 mdsmap_ev; @@ -183,8 +183,9 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end) inc = ceph_decode_32(p); state = ceph_decode_32(p); state_seq = ceph_decode_64(p); - ceph_decode_copy(p, &addr, sizeof(addr)); - ceph_decode_addr(&addr); + err = ceph_decode_entity_addr(p, end, &addr); + if (err) + goto corrupt; ceph_decode_copy(p, &laggy_since, sizeof(laggy_since)); *p += sizeof(u32); ceph_decode_32_safe(p, end, namelen, bad); @@ -357,7 +358,7 @@ bad_ext: nomem: err = -ENOMEM; goto out_err; -bad: +corrupt: pr_err("corrupt mdsmap\n"); print_hex_dump(KERN_DEBUG, "mdsmap: ", DUMP_PREFIX_OFFSET, 16, 1, @@ -365,6 +366,9 @@ bad: out_err: ceph_mdsmap_destroy(m); return ERR_PTR(err); +bad: + err = -EINVAL; + goto corrupt; } void ceph_mdsmap_destroy(struct ceph_mdsmap *m) diff --git a/fs/ceph/quota.c b/fs/ceph/quota.c index d629fc857450..de56dee60540 100644 --- a/fs/ceph/quota.c +++ b/fs/ceph/quota.c @@ -135,7 +135,7 @@ static struct inode *lookup_quotarealm_inode(struct ceph_mds_client *mdsc, return NULL; mutex_lock(&qri->mutex); - if (qri->inode) { + if (qri->inode && ceph_is_any_caps(qri->inode)) { /* A request has already returned the inode */ mutex_unlock(&qri->mutex); return qri->inode; @@ -146,7 +146,18 @@ static struct inode *lookup_quotarealm_inode(struct ceph_mds_client *mdsc, mutex_unlock(&qri->mutex); return NULL; } - in = ceph_lookup_inode(sb, realm->ino); + if (qri->inode) { + /* get caps */ + int ret = __ceph_do_getattr(qri->inode, NULL, + CEPH_STAT_CAP_INODE, true); + if (ret >= 0) + in = qri->inode; + else + in = ERR_PTR(ret); + } else { + in = ceph_lookup_inode(sb, realm->ino); + } + if (IS_ERR(in)) { pr_warn("Can't lookup inode %llx (err: %ld)\n", realm->ino, PTR_ERR(in)); diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c index 72c6c022f02b..4c6494eb02b5 100644 --- a/fs/ceph/snap.c +++ b/fs/ceph/snap.c @@ -3,6 +3,7 @@ #include <linux/sort.h> #include <linux/slab.h> +#include <linux/iversion.h> #include "super.h" #include "mds_client.h" #include <linux/ceph/decode.h> @@ -606,6 +607,8 @@ int __ceph_finish_cap_snap(struct ceph_inode_info *ci, capsnap->mtime = inode->i_mtime; capsnap->atime = inode->i_atime; capsnap->ctime = inode->i_ctime; + capsnap->btime = ci->i_btime; + capsnap->change_attr = inode_peek_iversion_raw(inode); capsnap->time_warp_seq = ci->i_time_warp_seq; capsnap->truncate_size = ci->i_truncate_size; capsnap->truncate_seq = ci->i_truncate_seq; diff --git a/fs/ceph/super.c b/fs/ceph/super.c index ed1b65a6c2c3..ab4868c7308e 100644 --- 
a/fs/ceph/super.c +++ b/fs/ceph/super.c @@ -840,10 +840,10 @@ static int ceph_remount(struct super_block *sb, int *flags, char *data) static const struct super_operations ceph_super_ops = { .alloc_inode = ceph_alloc_inode, - .destroy_inode = ceph_destroy_inode, .free_inode = ceph_free_inode, .write_inode = ceph_write_inode, - .drop_inode = ceph_drop_inode, + .drop_inode = generic_delete_inode, + .evict_inode = ceph_evict_inode, .sync_fs = ceph_sync_fs, .put_super = ceph_put_super, .remount_fs = ceph_remount, @@ -978,7 +978,7 @@ static int ceph_set_super(struct super_block *s, void *data) s->s_d_op = &ceph_dentry_ops; s->s_export_op = &ceph_export_ops; - s->s_time_gran = 1000; /* 1000 ns == 1 us */ + s->s_time_gran = 1; ret = set_anon_super(s, NULL); /* what is that second arg for? */ if (ret != 0) @@ -1159,17 +1159,15 @@ static int __init init_ceph(void) goto out; ceph_flock_init(); - ceph_xattr_init(); ret = register_filesystem(&ceph_fs_type); if (ret) - goto out_xattr; + goto out_caches; pr_info("loaded (mds proto %d)\n", CEPH_MDSC_PROTOCOL); return 0; -out_xattr: - ceph_xattr_exit(); +out_caches: destroy_caches(); out: return ret; @@ -1179,7 +1177,6 @@ static void __exit exit_ceph(void) { dout("exit_ceph\n"); unregister_filesystem(&ceph_fs_type); - ceph_xattr_exit(); destroy_caches(); } diff --git a/fs/ceph/super.h b/fs/ceph/super.h index fbe6869a3f95..d2352fd95dbc 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -197,7 +197,8 @@ struct ceph_cap_snap { u64 xattr_version; u64 size; - struct timespec64 mtime, atime, ctime; + u64 change_attr; + struct timespec64 mtime, atime, ctime, btime; u64 time_warp_seq; u64 truncate_size; u32 truncate_seq; @@ -384,6 +385,8 @@ struct ceph_inode_info { int i_snap_realm_counter; /* snap realm (if caps) */ struct list_head i_snap_realm_item; struct list_head i_snap_flush_item; + struct timespec64 i_btime; + struct timespec64 i_snap_btime; struct work_struct i_work; unsigned long i_work_mask; @@ -544,7 +547,12 @@ static inline void __ceph_dir_set_complete(struct ceph_inode_info *ci, long long release_count, long long ordered_count) { - smp_mb__before_atomic(); + /* + * Makes sure operations that set up the readdir cache (update page + * cache and i_size) are strongly ordered w.r.t. the following
+ */ + smp_mb(); atomic64_set(&ci->i_complete_seq[0], release_count); atomic64_set(&ci->i_complete_seq[1], ordered_count); } @@ -876,9 +884,8 @@ static inline bool __ceph_have_pending_cap_snap(struct ceph_inode_info *ci) extern const struct inode_operations ceph_file_iops; extern struct inode *ceph_alloc_inode(struct super_block *sb); -extern void ceph_destroy_inode(struct inode *inode); +extern void ceph_evict_inode(struct inode *inode); extern void ceph_free_inode(struct inode *inode); -extern int ceph_drop_inode(struct inode *inode); extern struct inode *ceph_get_inode(struct super_block *sb, struct ceph_vino vino); @@ -921,10 +928,20 @@ ssize_t __ceph_getxattr(struct inode *, const char *, void *, size_t); extern ssize_t ceph_listxattr(struct dentry *, char *, size_t); extern void __ceph_build_xattrs_blob(struct ceph_inode_info *ci); extern void __ceph_destroy_xattrs(struct ceph_inode_info *ci); -extern void __init ceph_xattr_init(void); -extern void ceph_xattr_exit(void); extern const struct xattr_handler *ceph_xattr_handlers[]; +struct ceph_acl_sec_ctx { +#ifdef CONFIG_CEPH_FS_POSIX_ACL + void *default_acl; + void *acl; +#endif +#ifdef CONFIG_CEPH_FS_SECURITY_LABEL + void *sec_ctx; + u32 sec_ctxlen; +#endif + struct ceph_pagelist *pagelist; +}; + #ifdef CONFIG_SECURITY extern bool ceph_security_xattr_deadlock(struct inode *in); extern bool ceph_security_xattr_wanted(struct inode *in); @@ -939,21 +956,32 @@ static inline bool ceph_security_xattr_wanted(struct inode *in) } #endif -/* acl.c */ -struct ceph_acls_info { - void *default_acl; - void *acl; - struct ceph_pagelist *pagelist; -}; +#ifdef CONFIG_CEPH_FS_SECURITY_LABEL +extern int ceph_security_init_secctx(struct dentry *dentry, umode_t mode, + struct ceph_acl_sec_ctx *ctx); +extern void ceph_security_invalidate_secctx(struct inode *inode); +#else +static inline int ceph_security_init_secctx(struct dentry *dentry, umode_t mode, + struct ceph_acl_sec_ctx *ctx) +{ + return 0; +} +static inline void ceph_security_invalidate_secctx(struct inode *inode) +{ +} +#endif + +void ceph_release_acl_sec_ctx(struct ceph_acl_sec_ctx *as_ctx); +/* acl.c */ #ifdef CONFIG_CEPH_FS_POSIX_ACL struct posix_acl *ceph_get_acl(struct inode *, int); int ceph_set_acl(struct inode *inode, struct posix_acl *acl, int type); int ceph_pre_init_acls(struct inode *dir, umode_t *mode, - struct ceph_acls_info *info); -void ceph_init_inode_acls(struct inode *inode, struct ceph_acls_info *info); -void ceph_release_acls_info(struct ceph_acls_info *info); + struct ceph_acl_sec_ctx *as_ctx); +void ceph_init_inode_acls(struct inode *inode, + struct ceph_acl_sec_ctx *as_ctx); static inline void ceph_forget_all_cached_acls(struct inode *inode) { @@ -966,15 +994,12 @@ static inline void ceph_forget_all_cached_acls(struct inode *inode) #define ceph_set_acl NULL static inline int ceph_pre_init_acls(struct inode *dir, umode_t *mode, - struct ceph_acls_info *info) + struct ceph_acl_sec_ctx *as_ctx) { return 0; } static inline void ceph_init_inode_acls(struct inode *inode, - struct ceph_acls_info *info) -{ -} -static inline void ceph_release_acls_info(struct ceph_acls_info *info) + struct ceph_acl_sec_ctx *as_ctx) { } static inline int ceph_acl_chmod(struct dentry *dentry, struct inode *inode) @@ -1000,7 +1025,7 @@ extern void ceph_add_cap(struct inode *inode, unsigned cap, unsigned seq, u64 realmino, int flags, struct ceph_cap **new_cap); extern void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release); -extern void __ceph_remove_caps(struct inode* inode); +extern void 
__ceph_remove_caps(struct ceph_inode_info *ci); extern void ceph_put_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap); extern int ceph_is_any_caps(struct inode *inode); diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c index 0cc42c8879e9..37b458a9af3a 100644 --- a/fs/ceph/xattr.c +++ b/fs/ceph/xattr.c @@ -8,6 +8,7 @@ #include <linux/ceph/decode.h> #include <linux/xattr.h> +#include <linux/security.h> #include <linux/posix_acl_xattr.h> #include <linux/slab.h> @@ -17,26 +18,9 @@ static int __remove_xattr(struct ceph_inode_info *ci, struct ceph_inode_xattr *xattr); -static const struct xattr_handler ceph_other_xattr_handler; - -/* - * List of handlers for synthetic system.* attributes. Other - * attributes are handled directly. - */ -const struct xattr_handler *ceph_xattr_handlers[] = { -#ifdef CONFIG_CEPH_FS_POSIX_ACL - &posix_acl_access_xattr_handler, - &posix_acl_default_xattr_handler, -#endif - &ceph_other_xattr_handler, - NULL, -}; - static bool ceph_is_valid_xattr(const char *name) { return !strncmp(name, XATTR_CEPH_PREFIX, XATTR_CEPH_PREFIX_LEN) || - !strncmp(name, XATTR_SECURITY_PREFIX, - XATTR_SECURITY_PREFIX_LEN) || !strncmp(name, XATTR_TRUSTED_PREFIX, XATTR_TRUSTED_PREFIX_LEN) || !strncmp(name, XATTR_USER_PREFIX, XATTR_USER_PREFIX_LEN); } @@ -48,8 +32,8 @@ static bool ceph_is_valid_xattr(const char *name) struct ceph_vxattr { char *name; size_t name_size; /* strlen(name) + 1 (for '\0') */ - size_t (*getxattr_cb)(struct ceph_inode_info *ci, char *val, - size_t size); + ssize_t (*getxattr_cb)(struct ceph_inode_info *ci, char *val, + size_t size); bool (*exists_cb)(struct ceph_inode_info *ci); unsigned int flags; }; @@ -68,8 +52,8 @@ static bool ceph_vxattrcb_layout_exists(struct ceph_inode_info *ci) rcu_dereference_raw(fl->pool_ns) != NULL); } -static size_t ceph_vxattrcb_layout(struct ceph_inode_info *ci, char *val, - size_t size) +static ssize_t ceph_vxattrcb_layout(struct ceph_inode_info *ci, char *val, + size_t size) { struct ceph_fs_client *fsc = ceph_sb_to_client(ci->vfs_inode.i_sb); struct ceph_osd_client *osdc = &fsc->client->osdc; @@ -79,7 +63,7 @@ static size_t ceph_vxattrcb_layout(struct ceph_inode_info *ci, char *val, const char *ns_field = " pool_namespace="; char buf[128]; size_t len, total_len = 0; - int ret; + ssize_t ret; pool_ns = ceph_try_get_string(ci->i_layout.pool_ns); @@ -96,18 +80,15 @@ static size_t ceph_vxattrcb_layout(struct ceph_inode_info *ci, char *val, len = snprintf(buf, sizeof(buf), "stripe_unit=%u stripe_count=%u object_size=%u pool=%lld", ci->i_layout.stripe_unit, ci->i_layout.stripe_count, - ci->i_layout.object_size, (unsigned long long)pool); + ci->i_layout.object_size, pool); total_len = len; } if (pool_ns) total_len += strlen(ns_field) + pool_ns->len; - if (!size) { - ret = total_len; - } else if (total_len > size) { - ret = -ERANGE; - } else { + ret = total_len; + if (size >= total_len) { memcpy(val, buf, len); ret = len; if (pool_name) { @@ -128,28 +109,55 @@ static size_t ceph_vxattrcb_layout(struct ceph_inode_info *ci, char *val, return ret; } -static size_t ceph_vxattrcb_layout_stripe_unit(struct ceph_inode_info *ci, - char *val, size_t size) +/* + * The convention with strings in xattrs is that they should not be NULL + * terminated, since we're returning the length with them. snprintf always + * NULL terminates however, so call it on a temporary buffer and then memcpy + * the result into place. + */ +static int ceph_fmt_xattr(char *val, size_t size, const char *fmt, ...) 
{ - return snprintf(val, size, "%u", ci->i_layout.stripe_unit); + int ret; + va_list args; + char buf[96]; /* NB: reevaluate size if new vxattrs are added */ + + va_start(args, fmt); + ret = vsnprintf(buf, size ? sizeof(buf) : 0, fmt, args); + va_end(args); + + /* Sanity check */ + if (size && ret + 1 > sizeof(buf)) { + WARN_ONCE(true, "Returned length too big (%d)", ret); + return -E2BIG; + } + + if (ret <= size) + memcpy(val, buf, ret); + return ret; } -static size_t ceph_vxattrcb_layout_stripe_count(struct ceph_inode_info *ci, +static ssize_t ceph_vxattrcb_layout_stripe_unit(struct ceph_inode_info *ci, char *val, size_t size) { - return snprintf(val, size, "%u", ci->i_layout.stripe_count); + return ceph_fmt_xattr(val, size, "%u", ci->i_layout.stripe_unit); +} + +static ssize_t ceph_vxattrcb_layout_stripe_count(struct ceph_inode_info *ci, + char *val, size_t size) +{ + return ceph_fmt_xattr(val, size, "%u", ci->i_layout.stripe_count); } -static size_t ceph_vxattrcb_layout_object_size(struct ceph_inode_info *ci, - char *val, size_t size) +static ssize_t ceph_vxattrcb_layout_object_size(struct ceph_inode_info *ci, + char *val, size_t size) { - return snprintf(val, size, "%u", ci->i_layout.object_size); + return ceph_fmt_xattr(val, size, "%u", ci->i_layout.object_size); } -static size_t ceph_vxattrcb_layout_pool(struct ceph_inode_info *ci, - char *val, size_t size) +static ssize_t ceph_vxattrcb_layout_pool(struct ceph_inode_info *ci, + char *val, size_t size) { - int ret; + ssize_t ret; struct ceph_fs_client *fsc = ceph_sb_to_client(ci->vfs_inode.i_sb); struct ceph_osd_client *osdc = &fsc->client->osdc; s64 pool = ci->i_layout.pool_id; @@ -157,21 +165,27 @@ static size_t ceph_vxattrcb_layout_pool(struct ceph_inode_info *ci, down_read(&osdc->lock); pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, pool); - if (pool_name) - ret = snprintf(val, size, "%s", pool_name); - else - ret = snprintf(val, size, "%lld", (unsigned long long)pool); + if (pool_name) { + ret = strlen(pool_name); + if (ret <= size) + memcpy(val, pool_name, ret); + } else { + ret = ceph_fmt_xattr(val, size, "%lld", pool); + } up_read(&osdc->lock); return ret; } -static size_t ceph_vxattrcb_layout_pool_namespace(struct ceph_inode_info *ci, - char *val, size_t size) +static ssize_t ceph_vxattrcb_layout_pool_namespace(struct ceph_inode_info *ci, + char *val, size_t size) { - int ret = 0; + ssize_t ret = 0; struct ceph_string *ns = ceph_try_get_string(ci->i_layout.pool_ns); + if (ns) { - ret = snprintf(val, size, "%.*s", (int)ns->len, ns->str); + ret = ns->len; + if (ret <= size) + memcpy(val, ns->str, ret); ceph_put_string(ns); } return ret; @@ -179,53 +193,54 @@ static size_t ceph_vxattrcb_layout_pool_namespace(struct ceph_inode_info *ci, /* directories */ -static size_t ceph_vxattrcb_dir_entries(struct ceph_inode_info *ci, char *val, - size_t size) +static ssize_t ceph_vxattrcb_dir_entries(struct ceph_inode_info *ci, char *val, + size_t size) { - return snprintf(val, size, "%lld", ci->i_files + ci->i_subdirs); + return ceph_fmt_xattr(val, size, "%lld", ci->i_files + ci->i_subdirs); } -static size_t ceph_vxattrcb_dir_files(struct ceph_inode_info *ci, char *val, - size_t size) +static ssize_t ceph_vxattrcb_dir_files(struct ceph_inode_info *ci, char *val, + size_t size) { - return snprintf(val, size, "%lld", ci->i_files); + return ceph_fmt_xattr(val, size, "%lld", ci->i_files); } -static size_t ceph_vxattrcb_dir_subdirs(struct ceph_inode_info *ci, char *val, - size_t size) +static ssize_t ceph_vxattrcb_dir_subdirs(struct 
ceph_inode_info *ci, char *val, + size_t size) { - return snprintf(val, size, "%lld", ci->i_subdirs); + return ceph_fmt_xattr(val, size, "%lld", ci->i_subdirs); } -static size_t ceph_vxattrcb_dir_rentries(struct ceph_inode_info *ci, char *val, - size_t size) +static ssize_t ceph_vxattrcb_dir_rentries(struct ceph_inode_info *ci, char *val, + size_t size) { - return snprintf(val, size, "%lld", ci->i_rfiles + ci->i_rsubdirs); + return ceph_fmt_xattr(val, size, "%lld", + ci->i_rfiles + ci->i_rsubdirs); } -static size_t ceph_vxattrcb_dir_rfiles(struct ceph_inode_info *ci, char *val, - size_t size) +static ssize_t ceph_vxattrcb_dir_rfiles(struct ceph_inode_info *ci, char *val, + size_t size) { - return snprintf(val, size, "%lld", ci->i_rfiles); + return ceph_fmt_xattr(val, size, "%lld", ci->i_rfiles); } -static size_t ceph_vxattrcb_dir_rsubdirs(struct ceph_inode_info *ci, char *val, - size_t size) +static ssize_t ceph_vxattrcb_dir_rsubdirs(struct ceph_inode_info *ci, char *val, + size_t size) { - return snprintf(val, size, "%lld", ci->i_rsubdirs); + return ceph_fmt_xattr(val, size, "%lld", ci->i_rsubdirs); } -static size_t ceph_vxattrcb_dir_rbytes(struct ceph_inode_info *ci, char *val, - size_t size) +static ssize_t ceph_vxattrcb_dir_rbytes(struct ceph_inode_info *ci, char *val, + size_t size) { - return snprintf(val, size, "%lld", ci->i_rbytes); + return ceph_fmt_xattr(val, size, "%lld", ci->i_rbytes); } -static size_t ceph_vxattrcb_dir_rctime(struct ceph_inode_info *ci, char *val, - size_t size) +static ssize_t ceph_vxattrcb_dir_rctime(struct ceph_inode_info *ci, char *val, + size_t size) { - return snprintf(val, size, "%lld.09%ld", ci->i_rctime.tv_sec, - ci->i_rctime.tv_nsec); + return ceph_fmt_xattr(val, size, "%lld.%09ld", ci->i_rctime.tv_sec, + ci->i_rctime.tv_nsec); } /* dir pin */ @@ -234,10 +249,10 @@ static bool ceph_vxattrcb_dir_pin_exists(struct ceph_inode_info *ci) return ci->i_dir_pin != -ENODATA; } -static size_t ceph_vxattrcb_dir_pin(struct ceph_inode_info *ci, char *val, - size_t size) +static ssize_t ceph_vxattrcb_dir_pin(struct ceph_inode_info *ci, char *val, + size_t size) { - return snprintf(val, size, "%d", (int)ci->i_dir_pin); + return ceph_fmt_xattr(val, size, "%d", (int)ci->i_dir_pin); } /* quotas */ @@ -254,23 +269,36 @@ static bool ceph_vxattrcb_quota_exists(struct ceph_inode_info *ci) return ret; } -static size_t ceph_vxattrcb_quota(struct ceph_inode_info *ci, char *val, - size_t size) +static ssize_t ceph_vxattrcb_quota(struct ceph_inode_info *ci, char *val, + size_t size) +{ + return ceph_fmt_xattr(val, size, "max_bytes=%llu max_files=%llu", + ci->i_max_bytes, ci->i_max_files); +} + +static ssize_t ceph_vxattrcb_quota_max_bytes(struct ceph_inode_info *ci, + char *val, size_t size) { - return snprintf(val, size, "max_bytes=%llu max_files=%llu", - ci->i_max_bytes, ci->i_max_files); + return ceph_fmt_xattr(val, size, "%llu", ci->i_max_bytes); } -static size_t ceph_vxattrcb_quota_max_bytes(struct ceph_inode_info *ci, - char *val, size_t size) +static ssize_t ceph_vxattrcb_quota_max_files(struct ceph_inode_info *ci, + char *val, size_t size) { - return snprintf(val, size, "%llu", ci->i_max_bytes); + return ceph_fmt_xattr(val, size, "%llu", ci->i_max_files); } -static size_t ceph_vxattrcb_quota_max_files(struct ceph_inode_info *ci, - char *val, size_t size) +/* snapshots */ +static bool ceph_vxattrcb_snap_btime_exists(struct ceph_inode_info *ci) { - return snprintf(val, size, "%llu", ci->i_max_files); + return (ci->i_snap_btime.tv_sec != 0 || ci->i_snap_btime.tv_nsec != 
0); +} + +static ssize_t ceph_vxattrcb_snap_btime(struct ceph_inode_info *ci, char *val, + size_t size) +{ + return ceph_fmt_xattr(val, size, "%lld.%09ld", ci->i_snap_btime.tv_sec, + ci->i_snap_btime.tv_nsec); } #define CEPH_XATTR_NAME(_type, _name) XATTR_CEPH_PREFIX #_type "." #_name @@ -327,7 +355,7 @@ static struct ceph_vxattr ceph_dir_vxattrs[] = { XATTR_RSTAT_FIELD(dir, rctime), { .name = "ceph.dir.pin", - .name_size = sizeof("ceph.dir_pin"), + .name_size = sizeof("ceph.dir.pin"), .getxattr_cb = ceph_vxattrcb_dir_pin, .exists_cb = ceph_vxattrcb_dir_pin_exists, .flags = VXATTR_FLAG_HIDDEN, @@ -341,9 +369,15 @@ static struct ceph_vxattr ceph_dir_vxattrs[] = { }, XATTR_QUOTA_FIELD(quota, max_bytes), XATTR_QUOTA_FIELD(quota, max_files), + { + .name = "ceph.snap.btime", + .name_size = sizeof("ceph.snap.btime"), + .getxattr_cb = ceph_vxattrcb_snap_btime, + .exists_cb = ceph_vxattrcb_snap_btime_exists, + .flags = VXATTR_FLAG_READONLY, + }, { .name = NULL, 0 } /* Required table terminator */ }; -static size_t ceph_dir_vxattrs_name_size; /* total size of all names */ /* files */ @@ -360,9 +394,15 @@ static struct ceph_vxattr ceph_file_vxattrs[] = { XATTR_LAYOUT_FIELD(file, layout, object_size), XATTR_LAYOUT_FIELD(file, layout, pool), XATTR_LAYOUT_FIELD(file, layout, pool_namespace), + { + .name = "ceph.snap.btime", + .name_size = sizeof("ceph.snap.btime"), + .getxattr_cb = ceph_vxattrcb_snap_btime, + .exists_cb = ceph_vxattrcb_snap_btime_exists, + .flags = VXATTR_FLAG_READONLY, + }, { .name = NULL, 0 } /* Required table terminator */ }; -static size_t ceph_file_vxattrs_name_size; /* total size of all names */ static struct ceph_vxattr *ceph_inode_vxattrs(struct inode *inode) { @@ -373,47 +413,6 @@ static struct ceph_vxattr *ceph_inode_vxattrs(struct inode *inode) return NULL; } -static size_t ceph_vxattrs_name_size(struct ceph_vxattr *vxattrs) -{ - if (vxattrs == ceph_dir_vxattrs) - return ceph_dir_vxattrs_name_size; - if (vxattrs == ceph_file_vxattrs) - return ceph_file_vxattrs_name_size; - BUG_ON(vxattrs); - return 0; -} - -/* - * Compute the aggregate size (including terminating '\0') of all - * virtual extended attribute names in the given vxattr table. 
- */ -static size_t __init vxattrs_name_size(struct ceph_vxattr *vxattrs) -{ - struct ceph_vxattr *vxattr; - size_t size = 0; - - for (vxattr = vxattrs; vxattr->name; vxattr++) { - if (!(vxattr->flags & VXATTR_FLAG_HIDDEN)) - size += vxattr->name_size; - } - - return size; -} - -/* Routines called at initialization and exit time */ - -void __init ceph_xattr_init(void) -{ - ceph_dir_vxattrs_name_size = vxattrs_name_size(ceph_dir_vxattrs); - ceph_file_vxattrs_name_size = vxattrs_name_size(ceph_file_vxattrs); -} - -void ceph_xattr_exit(void) -{ - ceph_dir_vxattrs_name_size = 0; - ceph_file_vxattrs_name_size = 0; -} - static struct ceph_vxattr *ceph_match_vxattr(struct inode *inode, const char *name) { @@ -523,8 +522,8 @@ static int __set_xattr(struct ceph_inode_info *ci, dout("__set_xattr_val p=%p\n", p); } - dout("__set_xattr_val added %llx.%llx xattr %p %s=%.*s\n", - ceph_vinop(&ci->vfs_inode), xattr, name, val_len, val); + dout("__set_xattr_val added %llx.%llx xattr %p %.*s=%.*s\n", + ceph_vinop(&ci->vfs_inode), xattr, name_len, name, val_len, val); return 0; } @@ -823,7 +822,7 @@ ssize_t __ceph_getxattr(struct inode *inode, const char *name, void *value, struct ceph_inode_xattr *xattr; struct ceph_vxattr *vxattr = NULL; int req_mask; - int err; + ssize_t err; /* let's see if a virtual xattr was requested */ vxattr = ceph_match_vxattr(inode, name); @@ -835,8 +834,11 @@ ssize_t __ceph_getxattr(struct inode *inode, const char *name, void *value, if (err) return err; err = -ENODATA; - if (!(vxattr->exists_cb && !vxattr->exists_cb(ci))) + if (!(vxattr->exists_cb && !vxattr->exists_cb(ci))) { err = vxattr->getxattr_cb(ci, value, size); + if (size && size < err) + err = -ERANGE; + } return err; } @@ -897,10 +899,9 @@ ssize_t ceph_listxattr(struct dentry *dentry, char *names, size_t size) struct inode *inode = d_inode(dentry); struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_vxattr *vxattrs = ceph_inode_vxattrs(inode); - u32 vir_namelen = 0; + bool len_only = (size == 0); u32 namelen; int err; - u32 len; int i; spin_lock(&ci->i_ceph_lock); @@ -919,38 +920,45 @@ ssize_t ceph_listxattr(struct dentry *dentry, char *names, size_t size) err = __build_xattrs(inode); if (err < 0) goto out; - /* - * Start with virtual dir xattr names (if any) (including - * terminating '\0' characters for each). 
- */ - vir_namelen = ceph_vxattrs_name_size(vxattrs); - /* adding 1 byte per each variable due to the null termination */ + /* add 1 byte for each xattr due to the null termination */ namelen = ci->i_xattrs.names_size + ci->i_xattrs.count; - err = -ERANGE; - if (size && vir_namelen + namelen > size) - goto out; - - err = namelen + vir_namelen; - if (size == 0) - goto out; + if (!len_only) { + if (namelen > size) { + err = -ERANGE; + goto out; + } + names = __copy_xattr_names(ci, names); + size -= namelen; + } - names = __copy_xattr_names(ci, names); /* virtual xattr names, too */ - err = namelen; if (vxattrs) { for (i = 0; vxattrs[i].name; i++) { - if (!(vxattrs[i].flags & VXATTR_FLAG_HIDDEN) && - !(vxattrs[i].exists_cb && - !vxattrs[i].exists_cb(ci))) { - len = sprintf(names, "%s", vxattrs[i].name); - names += len + 1; - err += len + 1; + size_t this_len; + + if (vxattrs[i].flags & VXATTR_FLAG_HIDDEN) + continue; + if (vxattrs[i].exists_cb && !vxattrs[i].exists_cb(ci)) + continue; + + this_len = strlen(vxattrs[i].name) + 1; + namelen += this_len; + if (len_only) + continue; + + if (this_len > size) { + err = -ERANGE; + goto out; } + + memcpy(names, vxattrs[i].name, this_len); + names += this_len; + size -= this_len; } } - + err = namelen; out: spin_unlock(&ci->i_ceph_lock); return err; @@ -1206,4 +1214,138 @@ bool ceph_security_xattr_deadlock(struct inode *in) spin_unlock(&ci->i_ceph_lock); return ret; } + +#ifdef CONFIG_CEPH_FS_SECURITY_LABEL +int ceph_security_init_secctx(struct dentry *dentry, umode_t mode, + struct ceph_acl_sec_ctx *as_ctx) +{ + struct ceph_pagelist *pagelist = as_ctx->pagelist; + const char *name; + size_t name_len; + int err; + + err = security_dentry_init_security(dentry, mode, &dentry->d_name, + &as_ctx->sec_ctx, + &as_ctx->sec_ctxlen); + if (err < 0) { + WARN_ON_ONCE(err != -EOPNOTSUPP); + err = 0; /* do nothing */ + goto out; + } + + err = -ENOMEM; + if (!pagelist) { + pagelist = ceph_pagelist_alloc(GFP_KERNEL); + if (!pagelist) + goto out; + err = ceph_pagelist_reserve(pagelist, PAGE_SIZE); + if (err) + goto out; + ceph_pagelist_encode_32(pagelist, 1); + } + + /* + * FIXME: Make security_dentry_init_security() generic. Currently + * it only supports a single security module and only selinux has a + * dentry_init_security hook.
+ */ + name = XATTR_NAME_SELINUX; + name_len = strlen(name); + err = ceph_pagelist_reserve(pagelist, + 4 * 2 + name_len + as_ctx->sec_ctxlen); + if (err) + goto out; + + if (as_ctx->pagelist) { + /* update count of KV pairs */ + BUG_ON(pagelist->length <= sizeof(__le32)); + if (list_is_singular(&pagelist->head)) { + le32_add_cpu((__le32*)pagelist->mapped_tail, 1); + } else { + struct page *page = list_first_entry(&pagelist->head, + struct page, lru); + void *addr = kmap_atomic(page); + le32_add_cpu((__le32*)addr, 1); + kunmap_atomic(addr); + } + } else { + as_ctx->pagelist = pagelist; + } + + ceph_pagelist_encode_32(pagelist, name_len); + ceph_pagelist_append(pagelist, name, name_len); + + ceph_pagelist_encode_32(pagelist, as_ctx->sec_ctxlen); + ceph_pagelist_append(pagelist, as_ctx->sec_ctx, as_ctx->sec_ctxlen); + + err = 0; +out: + if (pagelist && !as_ctx->pagelist) + ceph_pagelist_release(pagelist); + return err; +} + +void ceph_security_invalidate_secctx(struct inode *inode) +{ + security_inode_invalidate_secctx(inode); +} + +static int ceph_xattr_set_security_label(const struct xattr_handler *handler, + struct dentry *unused, struct inode *inode, + const char *key, const void *buf, + size_t buflen, int flags) +{ + if (security_ismaclabel(key)) { + const char *name = xattr_full_name(handler, key); + return __ceph_setxattr(inode, name, buf, buflen, flags); + } + return -EOPNOTSUPP; +} + +static int ceph_xattr_get_security_label(const struct xattr_handler *handler, + struct dentry *unused, struct inode *inode, + const char *key, void *buf, size_t buflen) +{ + if (security_ismaclabel(key)) { + const char *name = xattr_full_name(handler, key); + return __ceph_getxattr(inode, name, buf, buflen); + } + return -EOPNOTSUPP; +} + +static const struct xattr_handler ceph_security_label_handler = { + .prefix = XATTR_SECURITY_PREFIX, + .get = ceph_xattr_get_security_label, + .set = ceph_xattr_set_security_label, +}; +#endif #endif + +void ceph_release_acl_sec_ctx(struct ceph_acl_sec_ctx *as_ctx) +{ +#ifdef CONFIG_CEPH_FS_POSIX_ACL + posix_acl_release(as_ctx->acl); + posix_acl_release(as_ctx->default_acl); +#endif +#ifdef CONFIG_CEPH_FS_SECURITY_LABEL + security_release_secctx(as_ctx->sec_ctx, as_ctx->sec_ctxlen); +#endif + if (as_ctx->pagelist) + ceph_pagelist_release(as_ctx->pagelist); +} + +/* + * List of handlers for synthetic system.* attributes. Other + * attributes are handled directly. 
+ */ +const struct xattr_handler *ceph_xattr_handlers[] = { +#ifdef CONFIG_CEPH_FS_POSIX_ACL + &posix_acl_access_xattr_handler, + &posix_acl_default_xattr_handler, +#endif +#ifdef CONFIG_CEPH_FS_SECURITY_LABEL + &ceph_security_label_handler, +#endif + &ceph_other_xattr_handler, + NULL, +}; diff --git a/include/linux/ceph/ceph_features.h b/include/linux/ceph/ceph_features.h index 65a38c4a02a1..39e6f4c57580 100644 --- a/include/linux/ceph/ceph_features.h +++ b/include/linux/ceph/ceph_features.h @@ -211,6 +211,7 @@ DEFINE_CEPH_FEATURE_DEPRECATED(63, 1, RESERVED_BROKEN, LUMINOUS) // client-facin CEPH_FEATURE_MON_STATEFUL_SUB | \ CEPH_FEATURE_CRUSH_TUNABLES5 | \ CEPH_FEATURE_NEW_OSDOPREPLY_ENCODING | \ + CEPH_FEATURE_MSG_ADDR2 | \ CEPH_FEATURE_CEPHX_V2) #define CEPH_FEATURES_REQUIRED_DEFAULT 0 diff --git a/include/linux/ceph/ceph_fs.h b/include/linux/ceph/ceph_fs.h index 3ac0feaf2b5e..cb21c5cf12c3 100644 --- a/include/linux/ceph/ceph_fs.h +++ b/include/linux/ceph/ceph_fs.h @@ -682,7 +682,7 @@ extern const char *ceph_cap_op_name(int op); /* flags field in client cap messages (version >= 10) */ #define CEPH_CLIENT_CAPS_SYNC (1<<0) #define CEPH_CLIENT_CAPS_NO_CAPSNAP (1<<1) -#define CEPH_CLIENT_CAPS_PENDING_CAPSNAP (1<<2); +#define CEPH_CLIENT_CAPS_PENDING_CAPSNAP (1<<2) /* * caps message, used for capability callbacks, acks, requests, etc. diff --git a/include/linux/ceph/cls_lock_client.h b/include/linux/ceph/cls_lock_client.h index bea6c77d2093..17bc7584d1fe 100644 --- a/include/linux/ceph/cls_lock_client.h +++ b/include/linux/ceph/cls_lock_client.h @@ -52,4 +52,7 @@ int ceph_cls_lock_info(struct ceph_osd_client *osdc, char *lock_name, u8 *type, char **tag, struct ceph_locker **lockers, u32 *num_lockers); +int ceph_cls_assert_locked(struct ceph_osd_request *req, int which, + char *lock_name, u8 type, char *cookie, char *tag); + #endif diff --git a/include/linux/ceph/decode.h b/include/linux/ceph/decode.h index a6c2a48d42e0..450384fe487c 100644 --- a/include/linux/ceph/decode.h +++ b/include/linux/ceph/decode.h @@ -218,18 +218,27 @@ static inline void ceph_encode_timespec64(struct ceph_timespec *tv, /* * sockaddr_storage <-> ceph_sockaddr */ -static inline void ceph_encode_addr(struct ceph_entity_addr *a) +#define CEPH_ENTITY_ADDR_TYPE_NONE 0 +#define CEPH_ENTITY_ADDR_TYPE_LEGACY __cpu_to_le32(1) + +static inline void ceph_encode_banner_addr(struct ceph_entity_addr *a) { __be16 ss_family = htons(a->in_addr.ss_family); a->in_addr.ss_family = *(__u16 *)&ss_family; + + /* Banner addresses require TYPE_NONE */ + a->type = CEPH_ENTITY_ADDR_TYPE_NONE; } -static inline void ceph_decode_addr(struct ceph_entity_addr *a) +static inline void ceph_decode_banner_addr(struct ceph_entity_addr *a) { __be16 ss_family = *(__be16 *)&a->in_addr.ss_family; a->in_addr.ss_family = ntohs(ss_family); WARN_ON(a->in_addr.ss_family == 512); + a->type = CEPH_ENTITY_ADDR_TYPE_LEGACY; } +extern int ceph_decode_entity_addr(void **p, void *end, + struct ceph_entity_addr *addr); /* * encoders */ diff --git a/include/linux/ceph/libceph.h b/include/linux/ceph/libceph.h index 337d5049ff93..82156da3c650 100644 --- a/include/linux/ceph/libceph.h +++ b/include/linux/ceph/libceph.h @@ -84,11 +84,13 @@ struct ceph_options { #define CEPH_MSG_MAX_MIDDLE_LEN (16*1024*1024) /* - * Handle the largest possible rbd object in one message. + * The largest possible rbd data object is 32M. + * The largest possible rbd object map object is 64M. 
+ * * There is no limit on the size of cephfs objects, but it has to obey * rsize and wsize mount options anyway. */ -#define CEPH_MSG_MAX_DATA_LEN (32*1024*1024) +#define CEPH_MSG_MAX_DATA_LEN (64*1024*1024) #define CEPH_AUTH_NAME_DEFAULT "guest" @@ -299,10 +301,6 @@ int ceph_wait_for_latest_osdmap(struct ceph_client *client, /* pagevec.c */ extern void ceph_release_page_vector(struct page **pages, int num_pages); - -extern struct page **ceph_get_direct_page_vector(const void __user *data, - int num_pages, - bool write_page); extern void ceph_put_page_vector(struct page **pages, int num_pages, bool dirty); extern struct page **ceph_alloc_page_vector(int num_pages, gfp_t flags); diff --git a/include/linux/ceph/mon_client.h b/include/linux/ceph/mon_client.h index 3a4688af7455..b4d134d3312a 100644 --- a/include/linux/ceph/mon_client.h +++ b/include/linux/ceph/mon_client.h @@ -104,7 +104,6 @@ struct ceph_mon_client { #endif }; -extern struct ceph_monmap *ceph_monmap_decode(void *p, void *end); extern int ceph_monmap_contains(struct ceph_monmap *m, struct ceph_entity_addr *addr); diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h index 2294f963dab7..ad7fe5d10dcd 100644 --- a/include/linux/ceph/osd_client.h +++ b/include/linux/ceph/osd_client.h @@ -198,9 +198,9 @@ struct ceph_osd_request { bool r_mempool; struct completion r_completion; /* private to osd_client.c */ ceph_osdc_callback_t r_callback; - struct list_head r_unsafe_item; struct inode *r_inode; /* for use by callbacks */ + struct list_head r_private_item; /* ditto */ void *r_priv; /* ditto */ /* set by submitter */ @@ -389,6 +389,14 @@ extern void ceph_osdc_handle_map(struct ceph_osd_client *osdc, void ceph_osdc_update_epoch_barrier(struct ceph_osd_client *osdc, u32 eb); void ceph_osdc_abort_requests(struct ceph_osd_client *osdc, int err); +#define osd_req_op_data(oreq, whch, typ, fld) \ +({ \ + struct ceph_osd_request *__oreq = (oreq); \ + unsigned int __whch = (whch); \ + BUG_ON(__whch >= __oreq->r_num_ops); \ + &__oreq->r_ops[__whch].typ.fld; \ +}) + extern void osd_req_op_init(struct ceph_osd_request *osd_req, unsigned int which, u16 opcode, u32 flags); @@ -497,7 +505,7 @@ int ceph_osdc_call(struct ceph_osd_client *osdc, const char *class, const char *method, unsigned int flags, struct page *req_page, size_t req_len, - struct page *resp_page, size_t *resp_len); + struct page **resp_pages, size_t *resp_len); extern int ceph_osdc_readpages(struct ceph_osd_client *osdc, struct ceph_vino vino, diff --git a/include/linux/ceph/striper.h b/include/linux/ceph/striper.h index cbd0d24b7148..3486636c0e6e 100644 --- a/include/linux/ceph/striper.h +++ b/include/linux/ceph/striper.h @@ -66,4 +66,6 @@ int ceph_extent_to_file(struct ceph_file_layout *l, struct ceph_file_extent **file_extents, u32 *num_file_extents); +u64 ceph_get_num_objects(struct ceph_file_layout *l, u64 size); + #endif diff --git a/include/linux/iversion.h b/include/linux/iversion.h index be50ef7cedab..2917ef990d43 100644 --- a/include/linux/iversion.h +++ b/include/linux/iversion.h @@ -113,6 +113,30 @@ inode_peek_iversion_raw(const struct inode *inode) } /** + * inode_set_max_iversion_raw - update i_version if the new value is larger + * @inode: inode to set + * @val: new i_version to set + * + * Some self-managed filesystems (e.g. Ceph) will only update the i_version + * value if the new value is larger than the one we already have.
+ */ +static inline void +inode_set_max_iversion_raw(struct inode *inode, u64 val) +{ + u64 cur, old; + + cur = inode_peek_iversion_raw(inode); + for (;;) { + if (cur > val) + break; + old = atomic64_cmpxchg(&inode->i_version, cur, val); + if (likely(old == cur)) + break; + cur = old; + } +} + +/** * inode_set_iversion - set i_version to a particular value * @inode: inode to set * @val: new i_version value to set diff --git a/net/ceph/Makefile b/net/ceph/Makefile index db09defe27d0..59d0ba2072de 100644 --- a/net/ceph/Makefile +++ b/net/ceph/Makefile @@ -5,7 +5,7 @@ obj-$(CONFIG_CEPH_LIB) += libceph.o libceph-y := ceph_common.o messenger.o msgpool.o buffer.o pagelist.o \ - mon_client.o \ + mon_client.o decode.o \ cls_lock_client.o \ osd_client.o osdmap.o crush/crush.o crush/mapper.o crush/hash.o \ striper.o \ diff --git a/net/ceph/cls_lock_client.c b/net/ceph/cls_lock_client.c index 4cc28541281b..17447c19d937 100644 --- a/net/ceph/cls_lock_client.c +++ b/net/ceph/cls_lock_client.c @@ -6,6 +6,7 @@ #include <linux/ceph/cls_lock_client.h> #include <linux/ceph/decode.h> +#include <linux/ceph/libceph.h> /** * ceph_cls_lock - grab rados lock for object @@ -264,8 +265,11 @@ static int decode_locker(void **p, void *end, struct ceph_locker *locker) return ret; *p += sizeof(struct ceph_timespec); /* skip expiration */ - ceph_decode_copy(p, &locker->info.addr, sizeof(locker->info.addr)); - ceph_decode_addr(&locker->info.addr); + + ret = ceph_decode_entity_addr(p, end, &locker->info.addr); + if (ret) + return ret; + len = ceph_decode_32(p); *p += len; /* skip description */ @@ -360,7 +364,7 @@ int ceph_cls_lock_info(struct ceph_osd_client *osdc, dout("%s lock_name %s\n", __func__, lock_name); ret = ceph_osdc_call(osdc, oid, oloc, "lock", "get_info", CEPH_OSD_FLAG_READ, get_info_op_page, - get_info_op_buf_size, reply_page, &reply_len); + get_info_op_buf_size, &reply_page, &reply_len); dout("%s: status %d\n", __func__, ret); if (ret >= 0) { @@ -375,3 +379,47 @@ int ceph_cls_lock_info(struct ceph_osd_client *osdc, return ret; } EXPORT_SYMBOL(ceph_cls_lock_info); + +int ceph_cls_assert_locked(struct ceph_osd_request *req, int which, + char *lock_name, u8 type, char *cookie, char *tag) +{ + int assert_op_buf_size; + int name_len = strlen(lock_name); + int cookie_len = strlen(cookie); + int tag_len = strlen(tag); + struct page **pages; + void *p, *end; + int ret; + + assert_op_buf_size = name_len + sizeof(__le32) + + cookie_len + sizeof(__le32) + + tag_len + sizeof(__le32) + + sizeof(u8) + CEPH_ENCODING_START_BLK_LEN; + if (assert_op_buf_size > PAGE_SIZE) + return -E2BIG; + + ret = osd_req_op_cls_init(req, which, "lock", "assert_locked"); + if (ret) + return ret; + + pages = ceph_alloc_page_vector(1, GFP_NOIO); + if (IS_ERR(pages)) + return PTR_ERR(pages); + + p = page_address(pages[0]); + end = p + assert_op_buf_size; + + /* encode cls_lock_assert_op struct */ + ceph_start_encoding(&p, 1, 1, + assert_op_buf_size - CEPH_ENCODING_START_BLK_LEN); + ceph_encode_string(&p, end, lock_name, name_len); + ceph_encode_8(&p, type); + ceph_encode_string(&p, end, cookie, cookie_len); + ceph_encode_string(&p, end, tag, tag_len); + WARN_ON(p != end); + + osd_req_op_cls_request_data_pages(req, which, pages, assert_op_buf_size, + 0, false, true); + return 0; +} +EXPORT_SYMBOL(ceph_cls_assert_locked); diff --git a/net/ceph/decode.c b/net/ceph/decode.c new file mode 100644 index 000000000000..eea529595a7a --- /dev/null +++ b/net/ceph/decode.c @@ -0,0 +1,84 @@ +// SPDX-License-Identifier: GPL-2.0 + +#include 
<linux/ceph/decode.h> + +static int +ceph_decode_entity_addr_versioned(void **p, void *end, + struct ceph_entity_addr *addr) +{ + int ret; + u8 struct_v; + u32 struct_len, addr_len; + void *struct_end; + + ret = ceph_start_decoding(p, end, 1, "entity_addr_t", &struct_v, + &struct_len); + if (ret) + goto bad; + + ret = -EINVAL; + struct_end = *p + struct_len; + + ceph_decode_copy_safe(p, end, &addr->type, sizeof(addr->type), bad); + + ceph_decode_copy_safe(p, end, &addr->nonce, sizeof(addr->nonce), bad); + + ceph_decode_32_safe(p, end, addr_len, bad); + if (addr_len > sizeof(addr->in_addr)) + goto bad; + + memset(&addr->in_addr, 0, sizeof(addr->in_addr)); + if (addr_len) { + ceph_decode_copy_safe(p, end, &addr->in_addr, addr_len, bad); + + addr->in_addr.ss_family = + le16_to_cpu((__force __le16)addr->in_addr.ss_family); + } + + /* Advance past anything the client doesn't yet understand */ + *p = struct_end; + ret = 0; +bad: + return ret; +} + +static int +ceph_decode_entity_addr_legacy(void **p, void *end, + struct ceph_entity_addr *addr) +{ + int ret = -EINVAL; + + /* Skip rest of type field */ + ceph_decode_skip_n(p, end, 3, bad); + + /* + * Clients that don't support ADDR2 always send TYPE_NONE, change it + * to TYPE_LEGACY for forward compatibility. + */ + addr->type = CEPH_ENTITY_ADDR_TYPE_LEGACY; + ceph_decode_copy_safe(p, end, &addr->nonce, sizeof(addr->nonce), bad); + memset(&addr->in_addr, 0, sizeof(addr->in_addr)); + ceph_decode_copy_safe(p, end, &addr->in_addr, + sizeof(addr->in_addr), bad); + addr->in_addr.ss_family = + be16_to_cpu((__force __be16)addr->in_addr.ss_family); + ret = 0; +bad: + return ret; +} + +int +ceph_decode_entity_addr(void **p, void *end, struct ceph_entity_addr *addr) +{ + u8 marker; + + ceph_decode_8_safe(p, end, marker, bad); + if (marker == 1) + return ceph_decode_entity_addr_versioned(p, end, addr); + else if (marker == 0) + return ceph_decode_entity_addr_legacy(p, end, addr); +bad: + return -EINVAL; +} +EXPORT_SYMBOL(ceph_decode_entity_addr); + diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index a33402c99321..962f521c863e 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -199,12 +199,14 @@ const char *ceph_pr_addr(const struct ceph_entity_addr *addr) switch (ss.ss_family) { case AF_INET: - snprintf(s, MAX_ADDR_STR_LEN, "%pI4:%hu", &in4->sin_addr, + snprintf(s, MAX_ADDR_STR_LEN, "(%d)%pI4:%hu", + le32_to_cpu(addr->type), &in4->sin_addr, ntohs(in4->sin_port)); break; case AF_INET6: - snprintf(s, MAX_ADDR_STR_LEN, "[%pI6c]:%hu", &in6->sin6_addr, + snprintf(s, MAX_ADDR_STR_LEN, "(%d)[%pI6c]:%hu", + le32_to_cpu(addr->type), &in6->sin6_addr, ntohs(in6->sin6_port)); break; @@ -220,7 +222,7 @@ EXPORT_SYMBOL(ceph_pr_addr); static void encode_my_addr(struct ceph_messenger *msgr) { memcpy(&msgr->my_enc_addr, &msgr->inst.addr, sizeof(msgr->my_enc_addr)); - ceph_encode_addr(&msgr->my_enc_addr); + ceph_encode_banner_addr(&msgr->my_enc_addr); } /* @@ -1732,12 +1734,14 @@ static int read_partial_banner(struct ceph_connection *con) ret = read_partial(con, end, size, &con->actual_peer_addr); if (ret <= 0) goto out; + ceph_decode_banner_addr(&con->actual_peer_addr); size = sizeof (con->peer_addr_for_me); end += size; ret = read_partial(con, end, size, &con->peer_addr_for_me); if (ret <= 0) goto out; + ceph_decode_banner_addr(&con->peer_addr_for_me); out: return ret; @@ -1981,6 +1985,7 @@ int ceph_parse_ips(const char *c, const char *end, } addr_set_port(&addr[i], port); + addr[i].type = CEPH_ENTITY_ADDR_TYPE_LEGACY; dout("parse_ips got %s\n", 
ceph_pr_addr(&addr[i])); @@ -2011,9 +2016,6 @@ static int process_banner(struct ceph_connection *con) if (verify_hello(con) < 0) return -1; - ceph_decode_addr(&con->actual_peer_addr); - ceph_decode_addr(&con->peer_addr_for_me); - /* * Make sure the other end is who we wanted. note that the other * end may not yet know their ip address, so if it's 0.0.0.0, give diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c index 895679d3529b..0520bf9825aa 100644 --- a/net/ceph/mon_client.c +++ b/net/ceph/mon_client.c @@ -39,7 +39,7 @@ static int __validate_auth(struct ceph_mon_client *monc); /* * Decode a monmap blob (e.g., during mount). */ -struct ceph_monmap *ceph_monmap_decode(void *p, void *end) +static struct ceph_monmap *ceph_monmap_decode(void *p, void *end) { struct ceph_monmap *m = NULL; int i, err = -EINVAL; @@ -50,7 +50,7 @@ struct ceph_monmap *ceph_monmap_decode(void *p, void *end) ceph_decode_32_safe(&p, end, len, bad); ceph_decode_need(&p, end, len, bad); - dout("monmap_decode %p %p len %d\n", p, end, (int)(end-p)); + dout("monmap_decode %p %p len %d (%d)\n", p, end, len, (int)(end-p)); p += sizeof(u16); /* skip version */ ceph_decode_need(&p, end, sizeof(fsid) + 2*sizeof(u32), bad); @@ -58,7 +58,6 @@ struct ceph_monmap *ceph_monmap_decode(void *p, void *end) epoch = ceph_decode_32(&p); num_mon = ceph_decode_32(&p); - ceph_decode_need(&p, end, num_mon*sizeof(m->mon_inst[0]), bad); if (num_mon > CEPH_MAX_MON) goto bad; @@ -68,17 +67,22 @@ struct ceph_monmap *ceph_monmap_decode(void *p, void *end) m->fsid = fsid; m->epoch = epoch; m->num_mon = num_mon; - ceph_decode_copy(&p, m->mon_inst, num_mon*sizeof(m->mon_inst[0])); - for (i = 0; i < num_mon; i++) - ceph_decode_addr(&m->mon_inst[i].addr); - + for (i = 0; i < num_mon; ++i) { + struct ceph_entity_inst *inst = &m->mon_inst[i]; + + /* copy name portion */ + ceph_decode_copy_safe(&p, end, &inst->name, + sizeof(inst->name), bad); + err = ceph_decode_entity_addr(&p, end, &inst->addr); + if (err) + goto bad; + } dout("monmap_decode epoch %d, num_mon %d\n", m->epoch, m->num_mon); for (i = 0; i < m->num_mon; i++) dout("monmap_decode mon%d is %s\n", i, ceph_pr_addr(&m->mon_inst[i].addr)); return m; - bad: dout("monmap_decode failed with %d\n", err); kfree(m); @@ -469,6 +473,7 @@ static void ceph_monc_handle_map(struct ceph_mon_client *monc, if (IS_ERR(monmap)) { pr_err("problem decoding monmap, %d\n", (int)PTR_ERR(monmap)); + ceph_msg_dump(msg); goto out; } diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 9a8eca5eda65..0b2df09b2554 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -171,14 +171,6 @@ static void ceph_osd_data_bvecs_init(struct ceph_osd_data *osd_data, osd_data->num_bvecs = num_bvecs; } -#define osd_req_op_data(oreq, whch, typ, fld) \ -({ \ - struct ceph_osd_request *__oreq = (oreq); \ - unsigned int __whch = (whch); \ - BUG_ON(__whch >= __oreq->r_num_ops); \ - &__oreq->r_ops[__whch].typ.fld; \ -}) - static struct ceph_osd_data * osd_req_op_raw_data_in(struct ceph_osd_request *osd_req, unsigned int which) { @@ -478,7 +470,7 @@ static void request_release_checks(struct ceph_osd_request *req) { WARN_ON(!RB_EMPTY_NODE(&req->r_node)); WARN_ON(!RB_EMPTY_NODE(&req->r_mc_node)); - WARN_ON(!list_empty(&req->r_unsafe_item)); + WARN_ON(!list_empty(&req->r_private_item)); WARN_ON(req->r_osd); } @@ -538,7 +530,7 @@ static void request_init(struct ceph_osd_request *req) init_completion(&req->r_completion); RB_CLEAR_NODE(&req->r_node); RB_CLEAR_NODE(&req->r_mc_node); - INIT_LIST_HEAD(&req->r_unsafe_item); + 
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 9a8eca5eda65..0b2df09b2554 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -171,14 +171,6 @@ static void ceph_osd_data_bvecs_init(struct ceph_osd_data *osd_data,
 	osd_data->num_bvecs = num_bvecs;
 }
 
-#define osd_req_op_data(oreq, whch, typ, fld)				\
-({									\
-	struct ceph_osd_request *__oreq = (oreq);			\
-	unsigned int __whch = (whch);					\
-	BUG_ON(__whch >= __oreq->r_num_ops);				\
-	&__oreq->r_ops[__whch].typ.fld;					\
-})
-
 static struct ceph_osd_data *
 osd_req_op_raw_data_in(struct ceph_osd_request *osd_req, unsigned int which)
 {
@@ -478,7 +470,7 @@ static void request_release_checks(struct ceph_osd_request *req)
 {
 	WARN_ON(!RB_EMPTY_NODE(&req->r_node));
 	WARN_ON(!RB_EMPTY_NODE(&req->r_mc_node));
-	WARN_ON(!list_empty(&req->r_unsafe_item));
+	WARN_ON(!list_empty(&req->r_private_item));
 	WARN_ON(req->r_osd);
 }
@@ -538,7 +530,7 @@ static void request_init(struct ceph_osd_request *req)
 	init_completion(&req->r_completion);
 	RB_CLEAR_NODE(&req->r_node);
 	RB_CLEAR_NODE(&req->r_mc_node);
-	INIT_LIST_HEAD(&req->r_unsafe_item);
+	INIT_LIST_HEAD(&req->r_private_item);
 
 	target_init(&req->r_t);
 }
@@ -4914,20 +4906,26 @@ static int decode_watcher(void **p, void *end, struct ceph_watch_item *item)
 	ret = ceph_start_decoding(p, end, 2, "watch_item_t",
 				  &struct_v, &struct_len);
 	if (ret)
-		return ret;
+		goto bad;
+
+	ret = -EINVAL;
+	ceph_decode_copy_safe(p, end, &item->name, sizeof(item->name), bad);
+	ceph_decode_64_safe(p, end, item->cookie, bad);
+	ceph_decode_skip_32(p, end, bad); /* skip timeout seconds */
 
-	ceph_decode_copy(p, &item->name, sizeof(item->name));
-	item->cookie = ceph_decode_64(p);
-	*p += 4; /* skip timeout_seconds */
 	if (struct_v >= 2) {
-		ceph_decode_copy(p, &item->addr, sizeof(item->addr));
-		ceph_decode_addr(&item->addr);
+		ret = ceph_decode_entity_addr(p, end, &item->addr);
+		if (ret)
+			goto bad;
+	} else {
+		ret = 0;
 	}
 
 	dout("%s %s%llu cookie %llu addr %s\n", __func__,
 	     ENTITY_NAME(item->name), item->cookie,
 	     ceph_pr_addr(&item->addr));
-	return 0;
+bad:
+	return ret;
 }
 
 static int decode_watchers(void **p, void *end,
@@ -5044,12 +5042,12 @@ int ceph_osdc_call(struct ceph_osd_client *osdc,
 		   const char *class, const char *method,
 		   unsigned int flags,
 		   struct page *req_page, size_t req_len,
-		   struct page *resp_page, size_t *resp_len)
+		   struct page **resp_pages, size_t *resp_len)
 {
 	struct ceph_osd_request *req;
 	int ret;
 
-	if (req_len > PAGE_SIZE || (resp_page && *resp_len > PAGE_SIZE))
+	if (req_len > PAGE_SIZE)
 		return -E2BIG;
 
 	req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
@@ -5067,8 +5065,8 @@ int ceph_osdc_call(struct ceph_osd_client *osdc,
 	if (req_page)
 		osd_req_op_cls_request_data_pages(req, 0, &req_page, req_len,
 						  0, false, false);
-	if (resp_page)
-		osd_req_op_cls_response_data_pages(req, 0, &resp_page,
+	if (resp_pages)
+		osd_req_op_cls_response_data_pages(req, 0, resp_pages,
 						   *resp_len, 0, false, false);
 
 	ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
@@ -5079,7 +5077,7 @@ int ceph_osdc_call(struct ceph_osd_client *osdc,
 	ret = ceph_osdc_wait_request(osdc, req);
 	if (ret >= 0) {
 		ret = req->r_ops[0].rval;
-		if (resp_page)
+		if (resp_pages)
 			*resp_len = req->r_ops[0].outdata_len;
 	}
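With ceph_osdc_call() now taking a page vector for the response, class method replies are no longer capped at PAGE_SIZE; only the request payload still is. A hedged sketch of a caller, not part of the patch: example_cls_call() is hypothetical, the oid/oloc parameters follow the full upstream signature that the hunk context above elides, and the "rbd"/"object_map_load" class method is borrowed from the object map work this series enables.

/*
 * Sketch: issue a class method call whose reply may span several pages.
 * ceph_alloc_page_vector()/ceph_release_page_vector() and
 * calc_pages_for() are existing libceph helpers.
 */
static int example_cls_call(struct ceph_osd_client *osdc,
			    struct ceph_object_id *oid,
			    struct ceph_object_locator *oloc)
{
	size_t resp_len = 4 * PAGE_SIZE;	/* > PAGE_SIZE is fine now */
	int num_pages = calc_pages_for(0, resp_len);
	struct page **resp_pages;
	int ret;

	resp_pages = ceph_alloc_page_vector(num_pages, GFP_NOIO);
	if (IS_ERR(resp_pages))
		return PTR_ERR(resp_pages);

	/* no request payload; *resp_len is updated to the reply size */
	ret = ceph_osdc_call(osdc, oid, oloc, "rbd", "object_map_load",
			     CEPH_OSD_FLAG_READ, NULL, 0,
			     resp_pages, &resp_len);
	if (ret >= 0)
		pr_info("class method reply: %zu bytes\n", resp_len);

	ceph_release_page_vector(resp_pages, num_pages);
	return ret;
}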
diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c
index 48a31dc9161c..90437906b7bc 100644
--- a/net/ceph/osdmap.c
+++ b/net/ceph/osdmap.c
@@ -1489,11 +1489,9 @@ static int osdmap_decode(void **p, void *end, struct ceph_osdmap *map)
 
 	/* osd_state, osd_weight, osd_addrs->client_addr */
 	ceph_decode_need(p, end, 3*sizeof(u32) +
-			 map->max_osd*((struct_v >= 5 ? sizeof(u32) :
-					sizeof(u8)) +
-				       sizeof(*map->osd_weight) +
-				       sizeof(*map->osd_addr)), e_inval);
-
+			 map->max_osd*(struct_v >= 5 ? sizeof(u32) :
+						       sizeof(u8)) +
+				       sizeof(*map->osd_weight), e_inval);
 	if (ceph_decode_32(p) != map->max_osd)
 		goto e_inval;
@@ -1514,9 +1512,11 @@ static int osdmap_decode(void **p, void *end, struct ceph_osdmap *map)
 	if (ceph_decode_32(p) != map->max_osd)
 		goto e_inval;
 
-	ceph_decode_copy(p, map->osd_addr, map->max_osd*sizeof(*map->osd_addr));
-	for (i = 0; i < map->max_osd; i++)
-		ceph_decode_addr(&map->osd_addr[i]);
+	for (i = 0; i < map->max_osd; i++) {
+		err = ceph_decode_entity_addr(p, end, &map->osd_addr[i]);
+		if (err)
+			goto bad;
+	}
 
 	/* pg_temp */
 	err = decode_pg_temp(p, end, map);
@@ -1618,12 +1618,17 @@ static int decode_new_up_state_weight(void **p, void *end, u8 struct_v,
 	void *new_state;
 	void *new_weight_end;
 	u32 len;
+	int i;
 
 	new_up_client = *p;
 	ceph_decode_32_safe(p, end, len, e_inval);
-	len *= sizeof(u32) + sizeof(struct ceph_entity_addr);
-	ceph_decode_need(p, end, len, e_inval);
-	*p += len;
+	for (i = 0; i < len; ++i) {
+		struct ceph_entity_addr addr;
+
+		ceph_decode_skip_32(p, end, e_inval);
+		if (ceph_decode_entity_addr(p, end, &addr))
+			goto e_inval;
+	}
 
 	new_state = *p;
 	ceph_decode_32_safe(p, end, len, e_inval);
@@ -1699,9 +1704,9 @@ static int decode_new_up_state_weight(void **p, void *end, u8 struct_v,
 		struct ceph_entity_addr addr;
 
 		osd = ceph_decode_32(p);
-		ceph_decode_copy(p, &addr, sizeof(addr));
-		ceph_decode_addr(&addr);
 		BUG_ON(osd >= map->max_osd);
+		if (ceph_decode_entity_addr(p, end, &addr))
+			goto e_inval;
 		pr_info("osd%d up\n", osd);
 		map->osd_state[osd] |= CEPH_OSD_EXISTS | CEPH_OSD_UP;
 		map->osd_addr[osd] = addr;
diff --git a/net/ceph/pagevec.c b/net/ceph/pagevec.c
index 74cafc0142ea..64305e7056a1 100644
--- a/net/ceph/pagevec.c
+++ b/net/ceph/pagevec.c
@@ -10,39 +10,6 @@
 
 #include <linux/ceph/libceph.h>
 
-/*
- * build a vector of user pages
- */
-struct page **ceph_get_direct_page_vector(const void __user *data,
-					  int num_pages, bool write_page)
-{
-	struct page **pages;
-	int got = 0;
-	int rc = 0;
-
-	pages = kmalloc_array(num_pages, sizeof(*pages), GFP_NOFS);
-	if (!pages)
-		return ERR_PTR(-ENOMEM);
-
-	while (got < num_pages) {
-		rc = get_user_pages_fast(
-		    (unsigned long)data + ((unsigned long)got * PAGE_SIZE),
-		    num_pages - got, write_page ? FOLL_WRITE : 0, pages + got);
-		if (rc < 0)
-			break;
-		BUG_ON(rc == 0);
-		got += rc;
-	}
-	if (rc < 0)
-		goto fail;
-	return pages;
-
-fail:
-	ceph_put_page_vector(pages, got, false);
-	return ERR_PTR(rc);
-}
-EXPORT_SYMBOL(ceph_get_direct_page_vector);
-
 void ceph_put_page_vector(struct page **pages, int num_pages, bool dirty)
 {
 	int i;
diff --git a/net/ceph/striper.c b/net/ceph/striper.c
index c36462dc86b7..3b3fa75d1189 100644
--- a/net/ceph/striper.c
+++ b/net/ceph/striper.c
@@ -259,3 +259,20 @@ int ceph_extent_to_file(struct ceph_file_layout *l,
 	return 0;
 }
 EXPORT_SYMBOL(ceph_extent_to_file);
+
+u64 ceph_get_num_objects(struct ceph_file_layout *l, u64 size)
+{
+	u64 period = (u64)l->stripe_count * l->object_size;
+	u64 num_periods = DIV64_U64_ROUND_UP(size, period);
+	u64 remainder_bytes;
+	u64 remainder_objs = 0;
+
+	div64_u64_rem(size, period, &remainder_bytes);
+	if (remainder_bytes > 0 &&
+	    remainder_bytes < (u64)l->stripe_count * l->stripe_unit)
+		remainder_objs = l->stripe_count -
+			    DIV_ROUND_UP_ULL(remainder_bytes, l->stripe_unit);
+
+	return num_periods * l->stripe_count - remainder_objs;
+}
+EXPORT_SYMBOL(ceph_get_num_objects);
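The new ceph_get_num_objects() helper computes how many objects back a striped image of a given size — rbd needs this count to size the per-object bitmap of the object map feature added above. A full stripe period spans stripe_count objects; only a final partial period that ends inside its first stripe leaves trailing objects untouched, which is what the remainder term subtracts. A userspace replica of the same arithmetic for sanity-checking, assuming made-up layout values and with the kernel div64 helpers replaced by plain 64-bit operations:

/*
 * Replica of the ceph_get_num_objects() formula with ordinary u64 math.
 */
#include <inttypes.h>
#include <stdio.h>

struct layout {
	uint64_t stripe_unit;	/* bytes per stripe unit */
	uint64_t stripe_count;	/* objects striped across */
	uint64_t object_size;	/* bytes per object */
};

static uint64_t num_objects(const struct layout *l, uint64_t size)
{
	uint64_t period = l->stripe_count * l->object_size;
	uint64_t num_periods = (size + period - 1) / period;
	uint64_t remainder = size % period;
	uint64_t remainder_objs = 0;

	/* a short final stripe leaves trailing objects untouched */
	if (remainder > 0 && remainder < l->stripe_count * l->stripe_unit)
		remainder_objs = l->stripe_count -
			(remainder + l->stripe_unit - 1) / l->stripe_unit;

	return num_periods * l->stripe_count - remainder_objs;
}

int main(void)
{
	/* 1M stripe unit, 4-wide stripes, 4M objects -> 16M period */
	struct layout l = { 1 << 20, 4, 4 << 20 };

	/* 2.5M image: units land in objects 0,1,2 only -> 3 objects */
	printf("%" PRIu64 "\n", num_objects(&l, (5ULL << 20) / 2));
	/* 10M image: every object in the period is touched -> 4 */
	printf("%" PRIu64 "\n", num_objects(&l, 10ULL << 20));
	return 0;
}

For the 2.5M case: one period, remainder 2.5M is below the 4M first-stripe span, so ceil(2.5M / 1M) = 3 units are written and 4 - 3 = 1 trailing object is subtracted, giving 3.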