diff options
Diffstat (limited to 'net/ceph/osd_client.c')
-rw-r--r-- | net/ceph/osd_client.c | 125 |
1 files changed, 93 insertions, 32 deletions
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index f3fc54eac09d..53299c7b0ca4 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -292,6 +292,10 @@ static void osd_req_op_data_release(struct ceph_osd_request *osd_req, ceph_osd_data_release(&op->cls.request_data); ceph_osd_data_release(&op->cls.response_data); break; + case CEPH_OSD_OP_SETXATTR: + case CEPH_OSD_OP_CMPXATTR: + ceph_osd_data_release(&op->xattr.osd_data); + break; default: break; } @@ -476,8 +480,7 @@ void osd_req_op_extent_init(struct ceph_osd_request *osd_req, size_t payload_len = 0; BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE && - opcode != CEPH_OSD_OP_DELETE && opcode != CEPH_OSD_OP_ZERO && - opcode != CEPH_OSD_OP_TRUNCATE); + opcode != CEPH_OSD_OP_ZERO && opcode != CEPH_OSD_OP_TRUNCATE); op->extent.offset = offset; op->extent.length = length; @@ -545,6 +548,39 @@ void osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which, } EXPORT_SYMBOL(osd_req_op_cls_init); +int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int which, + u16 opcode, const char *name, const void *value, + size_t size, u8 cmp_op, u8 cmp_mode) +{ + struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, opcode); + struct ceph_pagelist *pagelist; + size_t payload_len; + + BUG_ON(opcode != CEPH_OSD_OP_SETXATTR && opcode != CEPH_OSD_OP_CMPXATTR); + + pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS); + if (!pagelist) + return -ENOMEM; + + ceph_pagelist_init(pagelist); + + payload_len = strlen(name); + op->xattr.name_len = payload_len; + ceph_pagelist_append(pagelist, name, payload_len); + + op->xattr.value_len = size; + ceph_pagelist_append(pagelist, value, size); + payload_len += size; + + op->xattr.cmp_op = cmp_op; + op->xattr.cmp_mode = cmp_mode; + + ceph_osd_data_pagelist_init(&op->xattr.osd_data, pagelist); + op->payload_len = payload_len; + return 0; +} +EXPORT_SYMBOL(osd_req_op_xattr_init); + void osd_req_op_watch_init(struct ceph_osd_request *osd_req, unsigned int which, u16 opcode, u64 cookie, u64 version, int flag) @@ -626,7 +662,6 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req, case CEPH_OSD_OP_READ: case CEPH_OSD_OP_WRITE: case CEPH_OSD_OP_ZERO: - case CEPH_OSD_OP_DELETE: case CEPH_OSD_OP_TRUNCATE: if (src->op == CEPH_OSD_OP_WRITE) request_data_len = src->extent.length; @@ -676,6 +711,19 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req, dst->alloc_hint.expected_write_size = cpu_to_le64(src->alloc_hint.expected_write_size); break; + case CEPH_OSD_OP_SETXATTR: + case CEPH_OSD_OP_CMPXATTR: + dst->xattr.name_len = cpu_to_le32(src->xattr.name_len); + dst->xattr.value_len = cpu_to_le32(src->xattr.value_len); + dst->xattr.cmp_op = src->xattr.cmp_op; + dst->xattr.cmp_mode = src->xattr.cmp_mode; + osd_data = &src->xattr.osd_data; + ceph_osdc_msg_data_add(req->r_request, osd_data); + request_data_len = osd_data->pagelist->length; + break; + case CEPH_OSD_OP_CREATE: + case CEPH_OSD_OP_DELETE: + break; default: pr_err("unsupported osd opcode %s\n", ceph_osd_op_name(src->op)); @@ -705,7 +753,8 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req, struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, struct ceph_file_layout *layout, struct ceph_vino vino, - u64 off, u64 *plen, int num_ops, + u64 off, u64 *plen, + unsigned int which, int num_ops, int opcode, int flags, struct ceph_snap_context *snapc, u32 truncate_seq, @@ -716,13 +765,11 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, u64 objnum = 0; u64 objoff = 0; u64 objlen = 0; - u32 object_size; - u64 object_base; int r; BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE && - opcode != CEPH_OSD_OP_DELETE && opcode != CEPH_OSD_OP_ZERO && - opcode != CEPH_OSD_OP_TRUNCATE); + opcode != CEPH_OSD_OP_ZERO && opcode != CEPH_OSD_OP_TRUNCATE && + opcode != CEPH_OSD_OP_CREATE && opcode != CEPH_OSD_OP_DELETE); req = ceph_osdc_alloc_request(osdc, snapc, num_ops, use_mempool, GFP_NOFS); @@ -738,29 +785,24 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, return ERR_PTR(r); } - object_size = le32_to_cpu(layout->fl_object_size); - object_base = off - objoff; - if (!(truncate_seq == 1 && truncate_size == -1ULL)) { - if (truncate_size <= object_base) { - truncate_size = 0; - } else { - truncate_size -= object_base; - if (truncate_size > object_size) - truncate_size = object_size; + if (opcode == CEPH_OSD_OP_CREATE || opcode == CEPH_OSD_OP_DELETE) { + osd_req_op_init(req, which, opcode); + } else { + u32 object_size = le32_to_cpu(layout->fl_object_size); + u32 object_base = off - objoff; + if (!(truncate_seq == 1 && truncate_size == -1ULL)) { + if (truncate_size <= object_base) { + truncate_size = 0; + } else { + truncate_size -= object_base; + if (truncate_size > object_size) + truncate_size = object_size; + } } + osd_req_op_extent_init(req, which, opcode, objoff, objlen, + truncate_size, truncate_seq); } - osd_req_op_extent_init(req, 0, opcode, objoff, objlen, - truncate_size, truncate_seq); - - /* - * A second op in the ops array means the caller wants to - * also issue a include a 'startsync' command so that the - * osd will flush data quickly. - */ - if (num_ops > 1) - osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC); - req->r_base_oloc.pool = ceph_file_layout_pg_pool(*layout); snprintf(req->r_base_oid.name, sizeof(req->r_base_oid.name), @@ -1007,8 +1049,8 @@ static void put_osd(struct ceph_osd *osd) static void __remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd) { dout("__remove_osd %p\n", osd); - BUG_ON(!list_empty(&osd->o_requests)); - BUG_ON(!list_empty(&osd->o_linger_requests)); + WARN_ON(!list_empty(&osd->o_requests)); + WARN_ON(!list_empty(&osd->o_linger_requests)); rb_erase(&osd->o_node, &osdc->osds); list_del_init(&osd->o_osd_lru); @@ -1254,6 +1296,8 @@ static void __unregister_linger_request(struct ceph_osd_client *osdc, if (list_empty(&req->r_osd_item)) req->r_osd = NULL; } + + list_del_init(&req->r_req_lru_item); /* can be on notarget */ ceph_osdc_put_request(req); } @@ -1395,6 +1439,7 @@ static int __map_request(struct ceph_osd_client *osdc, if (req->r_osd) { __cancel_request(req); list_del_init(&req->r_osd_item); + list_del_init(&req->r_linger_osd_item); req->r_osd = NULL; } @@ -2623,7 +2668,7 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc, dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino, vino.snap, off, *plen); - req = ceph_osdc_new_request(osdc, layout, vino, off, plen, 1, + req = ceph_osdc_new_request(osdc, layout, vino, off, plen, 0, 1, CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ, NULL, truncate_seq, truncate_size, false); @@ -2666,7 +2711,7 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino, int page_align = off & ~PAGE_MASK; BUG_ON(vino.snap != CEPH_NOSNAP); /* snapshots aren't writeable */ - req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 1, + req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 0, 1, CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE, snapc, truncate_seq, truncate_size, @@ -2917,6 +2962,20 @@ static int invalidate_authorizer(struct ceph_connection *con) return ceph_monc_validate_auth(&osdc->client->monc); } +static int sign_message(struct ceph_connection *con, struct ceph_msg *msg) +{ + struct ceph_osd *o = con->private; + struct ceph_auth_handshake *auth = &o->o_auth; + return ceph_auth_sign_message(auth, msg); +} + +static int check_message_signature(struct ceph_connection *con, struct ceph_msg *msg) +{ + struct ceph_osd *o = con->private; + struct ceph_auth_handshake *auth = &o->o_auth; + return ceph_auth_check_message_signature(auth, msg); +} + static const struct ceph_connection_operations osd_con_ops = { .get = get_osd_con, .put = put_osd_con, @@ -2925,5 +2984,7 @@ static const struct ceph_connection_operations osd_con_ops = { .verify_authorizer_reply = verify_authorizer_reply, .invalidate_authorizer = invalidate_authorizer, .alloc_msg = alloc_msg, + .sign_message = sign_message, + .check_message_signature = check_message_signature, .fault = osd_reset, }; |