diff options
Diffstat (limited to 'fs/ceph/mds_client.c')
-rw-r--r-- | fs/ceph/mds_client.c | 676 |
1 files changed, 520 insertions, 156 deletions
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 5fb367b1d4b0..615db141b6c4 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -15,6 +15,7 @@ #include "super.h" #include "mds_client.h" +#include "crypto.h" #include <linux/ceph/ceph_features.h> #include <linux/ceph/messenger.h> @@ -184,8 +185,54 @@ static int parse_reply_info_in(void **p, void *end, info->rsnaps = 0; } + if (struct_v >= 5) { + u32 alen; + + ceph_decode_32_safe(p, end, alen, bad); + + while (alen--) { + u32 len; + + /* key */ + ceph_decode_32_safe(p, end, len, bad); + ceph_decode_skip_n(p, end, len, bad); + /* value */ + ceph_decode_32_safe(p, end, len, bad); + ceph_decode_skip_n(p, end, len, bad); + } + } + + /* fscrypt flag -- ignore */ + if (struct_v >= 6) + ceph_decode_skip_8(p, end, bad); + + info->fscrypt_auth = NULL; + info->fscrypt_auth_len = 0; + info->fscrypt_file = NULL; + info->fscrypt_file_len = 0; + if (struct_v >= 7) { + ceph_decode_32_safe(p, end, info->fscrypt_auth_len, bad); + if (info->fscrypt_auth_len) { + info->fscrypt_auth = kmalloc(info->fscrypt_auth_len, + GFP_KERNEL); + if (!info->fscrypt_auth) + return -ENOMEM; + ceph_decode_copy_safe(p, end, info->fscrypt_auth, + info->fscrypt_auth_len, bad); + } + ceph_decode_32_safe(p, end, info->fscrypt_file_len, bad); + if (info->fscrypt_file_len) { + info->fscrypt_file = kmalloc(info->fscrypt_file_len, + GFP_KERNEL); + if (!info->fscrypt_file) + return -ENOMEM; + ceph_decode_copy_safe(p, end, info->fscrypt_file, + info->fscrypt_file_len, bad); + } + } *p = end; } else { + /* legacy (unversioned) struct */ if (features & CEPH_FEATURE_MDS_INLINE_DATA) { ceph_decode_64_safe(p, end, info->inline_version, bad); ceph_decode_32_safe(p, end, info->inline_len, bad); @@ -263,27 +310,47 @@ bad: static int parse_reply_info_lease(void **p, void *end, struct ceph_mds_reply_lease **lease, - u64 features) + u64 features, u32 *altname_len, u8 **altname) { + u8 struct_v; + u32 struct_len; + void *lend; + if (features == (u64)-1) { - u8 struct_v, struct_compat; - u32 struct_len; + u8 struct_compat; + ceph_decode_8_safe(p, end, struct_v, bad); ceph_decode_8_safe(p, end, struct_compat, bad); + /* struct_v is expected to be >= 1. we only understand * encoding whose struct_compat == 1. */ if (!struct_v || struct_compat != 1) goto bad; + ceph_decode_32_safe(p, end, struct_len, bad); - ceph_decode_need(p, end, struct_len, bad); - end = *p + struct_len; + } else { + struct_len = sizeof(**lease); + *altname_len = 0; + *altname = NULL; } - ceph_decode_need(p, end, sizeof(**lease), bad); + lend = *p + struct_len; + ceph_decode_need(p, end, struct_len, bad); *lease = *p; *p += sizeof(**lease); - if (features == (u64)-1) - *p = end; + + if (features == (u64)-1) { + if (struct_v >= 2) { + ceph_decode_32_safe(p, end, *altname_len, bad); + ceph_decode_need(p, end, *altname_len, bad); + *altname = *p; + *p += *altname_len; + } else { + *altname = NULL; + *altname_len = 0; + } + } + *p = lend; return 0; bad: return -EIO; @@ -313,7 +380,8 @@ static int parse_reply_info_trace(void **p, void *end, info->dname = *p; *p += info->dname_len; - err = parse_reply_info_lease(p, end, &info->dlease, features); + err = parse_reply_info_lease(p, end, &info->dlease, features, + &info->altname_len, &info->altname); if (err < 0) goto out_bad; } @@ -339,9 +407,10 @@ out_bad: * parse readdir results */ static int parse_reply_info_readdir(void **p, void *end, - struct ceph_mds_reply_info_parsed *info, - u64 features) + struct ceph_mds_request *req, + u64 features) { + struct ceph_mds_reply_info_parsed *info = &req->r_reply_info; u32 num, i = 0; int err; @@ -371,18 +440,87 @@ static int parse_reply_info_readdir(void **p, void *end, info->dir_nr = num; while (num) { + struct inode *inode = d_inode(req->r_dentry); + struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i; + struct fscrypt_str tname = FSTR_INIT(NULL, 0); + struct fscrypt_str oname = FSTR_INIT(NULL, 0); + struct ceph_fname fname; + u32 altname_len, _name_len; + u8 *altname, *_name; + /* dentry */ - ceph_decode_32_safe(p, end, rde->name_len, bad); - ceph_decode_need(p, end, rde->name_len, bad); - rde->name = *p; - *p += rde->name_len; - dout("parsed dir dname '%.*s'\n", rde->name_len, rde->name); + ceph_decode_32_safe(p, end, _name_len, bad); + ceph_decode_need(p, end, _name_len, bad); + _name = *p; + *p += _name_len; + dout("parsed dir dname '%.*s'\n", _name_len, _name); + + if (info->hash_order) + rde->raw_hash = ceph_str_hash(ci->i_dir_layout.dl_dir_hash, + _name, _name_len); /* dentry lease */ - err = parse_reply_info_lease(p, end, &rde->lease, features); + err = parse_reply_info_lease(p, end, &rde->lease, features, + &altname_len, &altname); if (err) goto out_bad; + + /* + * Try to dencrypt the dentry names and update them + * in the ceph_mds_reply_dir_entry struct. + */ + fname.dir = inode; + fname.name = _name; + fname.name_len = _name_len; + fname.ctext = altname; + fname.ctext_len = altname_len; + /* + * The _name_len maybe larger than altname_len, such as + * when the human readable name length is in range of + * (CEPH_NOHASH_NAME_MAX, CEPH_NOHASH_NAME_MAX + SHA256_DIGEST_SIZE), + * then the copy in ceph_fname_to_usr will corrupt the + * data if there has no encryption key. + * + * Just set the no_copy flag and then if there has no + * encryption key the oname.name will be assigned to + * _name always. + */ + fname.no_copy = true; + if (altname_len == 0) { + /* + * Set tname to _name, and this will be used + * to do the base64_decode in-place. It's + * safe because the decoded string should + * always be shorter, which is 3/4 of origin + * string. + */ + tname.name = _name; + + /* + * Set oname to _name too, and this will be + * used to do the dencryption in-place. + */ + oname.name = _name; + oname.len = _name_len; + } else { + /* + * This will do the decryption only in-place + * from altname cryptext directly. + */ + oname.name = altname; + oname.len = altname_len; + } + rde->is_nokey = false; + err = ceph_fname_to_usr(&fname, &tname, &oname, &rde->is_nokey); + if (err) { + pr_err("%s unable to decode %.*s, got %d\n", __func__, + _name_len, _name, err); + goto out_bad; + } + rde->name = oname.name; + rde->name_len = oname.len; + /* inode */ err = parse_reply_info_in(p, end, &rde->inode, features); if (err < 0) @@ -581,15 +719,16 @@ bad: * parse extra results */ static int parse_reply_info_extra(void **p, void *end, - struct ceph_mds_reply_info_parsed *info, + struct ceph_mds_request *req, u64 features, struct ceph_mds_session *s) { + struct ceph_mds_reply_info_parsed *info = &req->r_reply_info; u32 op = le32_to_cpu(info->head->op); if (op == CEPH_MDS_OP_GETFILELOCK) return parse_reply_info_filelock(p, end, info, features); else if (op == CEPH_MDS_OP_READDIR || op == CEPH_MDS_OP_LSSNAP) - return parse_reply_info_readdir(p, end, info, features); + return parse_reply_info_readdir(p, end, req, features); else if (op == CEPH_MDS_OP_CREATE) return parse_reply_info_create(p, end, info, features, s); else if (op == CEPH_MDS_OP_GETVXATTR) @@ -602,9 +741,9 @@ static int parse_reply_info_extra(void **p, void *end, * parse entire mds reply */ static int parse_reply_info(struct ceph_mds_session *s, struct ceph_msg *msg, - struct ceph_mds_reply_info_parsed *info, - u64 features) + struct ceph_mds_request *req, u64 features) { + struct ceph_mds_reply_info_parsed *info = &req->r_reply_info; void *p, *end; u32 len; int err; @@ -626,7 +765,7 @@ static int parse_reply_info(struct ceph_mds_session *s, struct ceph_msg *msg, ceph_decode_32_safe(&p, end, len, bad); if (len > 0) { ceph_decode_need(&p, end, len, bad); - err = parse_reply_info_extra(&p, p+len, info, features, s); + err = parse_reply_info_extra(&p, p+len, req, features, s); if (err < 0) goto out_bad; } @@ -651,8 +790,21 @@ out_bad: static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info) { + int i; + + kfree(info->diri.fscrypt_auth); + kfree(info->diri.fscrypt_file); + kfree(info->targeti.fscrypt_auth); + kfree(info->targeti.fscrypt_file); if (!info->dir_entries) return; + + for (i = 0; i < info->dir_nr; i++) { + struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i; + + kfree(rde->inode.fscrypt_auth); + kfree(rde->inode.fscrypt_file); + } free_pages((unsigned long)info->dir_entries, get_order(info->dir_buf_size)); } @@ -945,6 +1097,7 @@ void ceph_mdsc_release_request(struct kref *kref) iput(req->r_parent); } iput(req->r_target_inode); + iput(req->r_new_inode); if (req->r_dentry) dput(req->r_dentry); if (req->r_old_dentry) @@ -965,6 +1118,8 @@ void ceph_mdsc_release_request(struct kref *kref) put_cred(req->r_cred); if (req->r_pagelist) ceph_pagelist_release(req->r_pagelist); + kfree(req->r_fscrypt_auth); + kfree(req->r_altname); put_request_session(req); ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation); WARN_ON_ONCE(!list_empty(&req->r_wait)); @@ -2373,20 +2528,90 @@ static inline u64 __get_oldest_tid(struct ceph_mds_client *mdsc) return mdsc->oldest_tid; } -/* - * Build a dentry's path. Allocate on heap; caller must kfree. Based - * on build_path_from_dentry in fs/cifs/dir.c. +#if IS_ENABLED(CONFIG_FS_ENCRYPTION) +static u8 *get_fscrypt_altname(const struct ceph_mds_request *req, u32 *plen) +{ + struct inode *dir = req->r_parent; + struct dentry *dentry = req->r_dentry; + u8 *cryptbuf = NULL; + u32 len = 0; + int ret = 0; + + /* only encode if we have parent and dentry */ + if (!dir || !dentry) + goto success; + + /* No-op unless this is encrypted */ + if (!IS_ENCRYPTED(dir)) + goto success; + + ret = ceph_fscrypt_prepare_readdir(dir); + if (ret < 0) + return ERR_PTR(ret); + + /* No key? Just ignore it. */ + if (!fscrypt_has_encryption_key(dir)) + goto success; + + if (!fscrypt_fname_encrypted_size(dir, dentry->d_name.len, NAME_MAX, + &len)) { + WARN_ON_ONCE(1); + return ERR_PTR(-ENAMETOOLONG); + } + + /* No need to append altname if name is short enough */ + if (len <= CEPH_NOHASH_NAME_MAX) { + len = 0; + goto success; + } + + cryptbuf = kmalloc(len, GFP_KERNEL); + if (!cryptbuf) + return ERR_PTR(-ENOMEM); + + ret = fscrypt_fname_encrypt(dir, &dentry->d_name, cryptbuf, len); + if (ret) { + kfree(cryptbuf); + return ERR_PTR(ret); + } +success: + *plen = len; + return cryptbuf; +} +#else +static u8 *get_fscrypt_altname(const struct ceph_mds_request *req, u32 *plen) +{ + *plen = 0; + return NULL; +} +#endif + +/** + * ceph_mdsc_build_path - build a path string to a given dentry + * @dentry: dentry to which path should be built + * @plen: returned length of string + * @pbase: returned base inode number + * @for_wire: is this path going to be sent to the MDS? + * + * Build a string that represents the path to the dentry. This is mostly called + * for two different purposes: + * + * 1) we need to build a path string to send to the MDS (for_wire == true) + * 2) we need a path string for local presentation (e.g. debugfs) + * (for_wire == false) * - * If @stop_on_nosnap, generate path relative to the first non-snapped - * inode. + * The path is built in reverse, starting with the dentry. Walk back up toward + * the root, building the path until the first non-snapped inode is reached + * (for_wire) or the root inode is reached (!for_wire). * * Encode hidden .snap dirs as a double /, i.e. * foo/.snap/bar -> foo//bar */ char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *pbase, - int stop_on_nosnap) + int for_wire) { - struct dentry *temp; + struct dentry *cur; + struct inode *inode; char *path; int pos; unsigned seq; @@ -2403,34 +2628,72 @@ retry: path[pos] = '\0'; seq = read_seqbegin(&rename_lock); - rcu_read_lock(); - temp = dentry; + cur = dget(dentry); for (;;) { - struct inode *inode; + struct dentry *parent; - spin_lock(&temp->d_lock); - inode = d_inode(temp); + spin_lock(&cur->d_lock); + inode = d_inode(cur); if (inode && ceph_snap(inode) == CEPH_SNAPDIR) { dout("build_path path+%d: %p SNAPDIR\n", - pos, temp); - } else if (stop_on_nosnap && inode && dentry != temp && + pos, cur); + spin_unlock(&cur->d_lock); + parent = dget_parent(cur); + } else if (for_wire && inode && dentry != cur && ceph_snap(inode) == CEPH_NOSNAP) { - spin_unlock(&temp->d_lock); + spin_unlock(&cur->d_lock); pos++; /* get rid of any prepended '/' */ break; + } else if (!for_wire || !IS_ENCRYPTED(d_inode(cur->d_parent))) { + pos -= cur->d_name.len; + if (pos < 0) { + spin_unlock(&cur->d_lock); + break; + } + memcpy(path + pos, cur->d_name.name, cur->d_name.len); + spin_unlock(&cur->d_lock); + parent = dget_parent(cur); } else { - pos -= temp->d_name.len; + int len, ret; + char buf[NAME_MAX]; + + /* + * Proactively copy name into buf, in case we need to + * present it as-is. + */ + memcpy(buf, cur->d_name.name, cur->d_name.len); + len = cur->d_name.len; + spin_unlock(&cur->d_lock); + parent = dget_parent(cur); + + ret = ceph_fscrypt_prepare_readdir(d_inode(parent)); + if (ret < 0) { + dput(parent); + dput(cur); + return ERR_PTR(ret); + } + + if (fscrypt_has_encryption_key(d_inode(parent))) { + len = ceph_encode_encrypted_fname(d_inode(parent), + cur, buf); + if (len < 0) { + dput(parent); + dput(cur); + return ERR_PTR(len); + } + } + pos -= len; if (pos < 0) { - spin_unlock(&temp->d_lock); + dput(parent); break; } - memcpy(path + pos, temp->d_name.name, temp->d_name.len); + memcpy(path + pos, buf, len); } - spin_unlock(&temp->d_lock); - temp = READ_ONCE(temp->d_parent); + dput(cur); + cur = parent; /* Are we at the root? */ - if (IS_ROOT(temp)) + if (IS_ROOT(cur)) break; /* Are we out of buffer? */ @@ -2439,8 +2702,9 @@ retry: path[pos] = '/'; } - base = ceph_ino(d_inode(temp)); - rcu_read_unlock(); + inode = d_inode(cur); + base = inode ? ceph_ino(inode) : 0; + dput(cur); if (read_seqretry(&rename_lock, seq)) goto retry; @@ -2450,8 +2714,8 @@ retry: * A rename didn't occur, but somehow we didn't end up where * we thought we would. Throw a warning and try again. */ - pr_warn("build_path did not end path lookup where " - "expected, pos is %d\n", pos); + pr_warn("build_path did not end path lookup where expected (pos = %d)\n", + pos); goto retry; } @@ -2471,7 +2735,8 @@ static int build_dentry_path(struct dentry *dentry, struct inode *dir, rcu_read_lock(); if (!dir) dir = d_inode_rcu(dentry->d_parent); - if (dir && parent_locked && ceph_snap(dir) == CEPH_NOSNAP) { + if (dir && parent_locked && ceph_snap(dir) == CEPH_NOSNAP && + !IS_ENCRYPTED(dir)) { *pino = ceph_ino(dir); rcu_read_unlock(); *ppath = dentry->d_name.name; @@ -2539,8 +2804,8 @@ static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry, return r; } -static void encode_timestamp_and_gids(void **p, - const struct ceph_mds_request *req) +static void encode_mclientrequest_tail(void **p, + const struct ceph_mds_request *req) { struct ceph_timespec ts; int i; @@ -2548,11 +2813,43 @@ static void encode_timestamp_and_gids(void **p, ceph_encode_timespec64(&ts, &req->r_stamp); ceph_encode_copy(p, &ts, sizeof(ts)); - /* gid_list */ + /* v4: gid_list */ ceph_encode_32(p, req->r_cred->group_info->ngroups); for (i = 0; i < req->r_cred->group_info->ngroups; i++) ceph_encode_64(p, from_kgid(&init_user_ns, req->r_cred->group_info->gid[i])); + + /* v5: altname */ + ceph_encode_32(p, req->r_altname_len); + ceph_encode_copy(p, req->r_altname, req->r_altname_len); + + /* v6: fscrypt_auth and fscrypt_file */ + if (req->r_fscrypt_auth) { + u32 authlen = ceph_fscrypt_auth_len(req->r_fscrypt_auth); + + ceph_encode_32(p, authlen); + ceph_encode_copy(p, req->r_fscrypt_auth, authlen); + } else { + ceph_encode_32(p, 0); + } + if (test_bit(CEPH_MDS_R_FSCRYPT_FILE, &req->r_req_flags)) { + ceph_encode_32(p, sizeof(__le64)); + ceph_encode_64(p, req->r_fscrypt_file); + } else { + ceph_encode_32(p, 0); + } +} + +static struct ceph_mds_request_head_legacy * +find_legacy_request_head(void *p, u64 features) +{ + bool legacy = !(features & CEPH_FEATURE_FS_BTIME); + struct ceph_mds_request_head_old *ohead; + + if (legacy) + return (struct ceph_mds_request_head_legacy *)p; + ohead = (struct ceph_mds_request_head_old *)p; + return (struct ceph_mds_request_head_legacy *)&ohead->oldest_client_tid; } /* @@ -2565,7 +2862,7 @@ static struct ceph_msg *create_request_message(struct ceph_mds_session *session, int mds = session->s_mds; struct ceph_mds_client *mdsc = session->s_mdsc; struct ceph_msg *msg; - struct ceph_mds_request_head_old *head; + struct ceph_mds_request_head_legacy *lhead; const char *path1 = NULL; const char *path2 = NULL; u64 ino1 = 0, ino2 = 0; @@ -2577,6 +2874,8 @@ static struct ceph_msg *create_request_message(struct ceph_mds_session *session, void *p, *end; int ret; bool legacy = !(session->s_con.peer_features & CEPH_FEATURE_FS_BTIME); + bool old_version = !test_bit(CEPHFS_FEATURE_32BITS_RETRY_FWD, + &session->s_features); ret = set_request_path_attr(req->r_inode, req->r_dentry, req->r_parent, req->r_path1, req->r_ino1.ino, @@ -2601,12 +2900,32 @@ static struct ceph_msg *create_request_message(struct ceph_mds_session *session, goto out_free1; } - len = legacy ? sizeof(*head) : sizeof(struct ceph_mds_request_head); - len += pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64)) + - sizeof(struct ceph_timespec); - len += sizeof(u32) + (sizeof(u64) * req->r_cred->group_info->ngroups); + req->r_altname = get_fscrypt_altname(req, &req->r_altname_len); + if (IS_ERR(req->r_altname)) { + msg = ERR_CAST(req->r_altname); + req->r_altname = NULL; + goto out_free2; + } + + /* + * For old cephs without supporting the 32bit retry/fwd feature + * it will copy the raw memories directly when decoding the + * requests. While new cephs will decode the head depending the + * version member, so we need to make sure it will be compatible + * with them both. + */ + if (legacy) + len = sizeof(struct ceph_mds_request_head_legacy); + else if (old_version) + len = sizeof(struct ceph_mds_request_head_old); + else + len = sizeof(struct ceph_mds_request_head); - /* calculate (max) length for cap releases */ + /* filepaths */ + len += 2 * (1 + sizeof(u32) + sizeof(u64)); + len += pathlen1 + pathlen2; + + /* cap releases */ len += sizeof(struct ceph_mds_request_release) * (!!req->r_inode_drop + !!req->r_dentry_drop + !!req->r_old_inode_drop + !!req->r_old_dentry_drop); @@ -2616,6 +2935,27 @@ static struct ceph_msg *create_request_message(struct ceph_mds_session *session, if (req->r_old_dentry_drop) len += pathlen2; + /* MClientRequest tail */ + + /* req->r_stamp */ + len += sizeof(struct ceph_timespec); + + /* gid list */ + len += sizeof(u32) + (sizeof(u64) * req->r_cred->group_info->ngroups); + + /* alternate name */ + len += sizeof(u32) + req->r_altname_len; + + /* fscrypt_auth */ + len += sizeof(u32); // fscrypt_auth + if (req->r_fscrypt_auth) + len += ceph_fscrypt_auth_len(req->r_fscrypt_auth); + + /* fscrypt_file */ + len += sizeof(u32); + if (test_bit(CEPH_MDS_R_FSCRYPT_FILE, &req->r_req_flags)) + len += sizeof(__le64); + msg = ceph_msg_new2(CEPH_MSG_CLIENT_REQUEST, len, 1, GFP_NOFS, false); if (!msg) { msg = ERR_PTR(-ENOMEM); @@ -2624,33 +2964,40 @@ static struct ceph_msg *create_request_message(struct ceph_mds_session *session, msg->hdr.tid = cpu_to_le64(req->r_tid); + lhead = find_legacy_request_head(msg->front.iov_base, + session->s_con.peer_features); + /* - * The old ceph_mds_request_head didn't contain a version field, and + * The ceph_mds_request_head_legacy didn't contain a version field, and * one was added when we moved the message version from 3->4. */ if (legacy) { msg->hdr.version = cpu_to_le16(3); - head = msg->front.iov_base; - p = msg->front.iov_base + sizeof(*head); - } else { - struct ceph_mds_request_head *new_head = msg->front.iov_base; + p = msg->front.iov_base + sizeof(*lhead); + } else if (old_version) { + struct ceph_mds_request_head_old *ohead = msg->front.iov_base; msg->hdr.version = cpu_to_le16(4); - new_head->version = cpu_to_le16(CEPH_MDS_REQUEST_HEAD_VERSION); - head = (struct ceph_mds_request_head_old *)&new_head->oldest_client_tid; - p = msg->front.iov_base + sizeof(*new_head); + ohead->version = cpu_to_le16(1); + p = msg->front.iov_base + sizeof(*ohead); + } else { + struct ceph_mds_request_head *nhead = msg->front.iov_base; + + msg->hdr.version = cpu_to_le16(6); + nhead->version = cpu_to_le16(CEPH_MDS_REQUEST_HEAD_VERSION); + p = msg->front.iov_base + sizeof(*nhead); } end = msg->front.iov_base + msg->front.iov_len; - head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch); - head->op = cpu_to_le32(req->r_op); - head->caller_uid = cpu_to_le32(from_kuid(&init_user_ns, - req->r_cred->fsuid)); - head->caller_gid = cpu_to_le32(from_kgid(&init_user_ns, - req->r_cred->fsgid)); - head->ino = cpu_to_le64(req->r_deleg_ino); - head->args = req->r_args; + lhead->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch); + lhead->op = cpu_to_le32(req->r_op); + lhead->caller_uid = cpu_to_le32(from_kuid(&init_user_ns, + req->r_cred->fsuid)); + lhead->caller_gid = cpu_to_le32(from_kgid(&init_user_ns, + req->r_cred->fsgid)); + lhead->ino = cpu_to_le64(req->r_deleg_ino); + lhead->args = req->r_args; ceph_encode_filepath(&p, end, ino1, path1); ceph_encode_filepath(&p, end, ino2, path2); @@ -2665,15 +3012,23 @@ static struct ceph_msg *create_request_message(struct ceph_mds_session *session, req->r_inode ? req->r_inode : d_inode(req->r_dentry), mds, req->r_inode_drop, req->r_inode_unless, req->r_op == CEPH_MDS_OP_READDIR); - if (req->r_dentry_drop) - releases += ceph_encode_dentry_release(&p, req->r_dentry, + if (req->r_dentry_drop) { + ret = ceph_encode_dentry_release(&p, req->r_dentry, req->r_parent, mds, req->r_dentry_drop, req->r_dentry_unless); - if (req->r_old_dentry_drop) - releases += ceph_encode_dentry_release(&p, req->r_old_dentry, + if (ret < 0) + goto out_err; + releases += ret; + } + if (req->r_old_dentry_drop) { + ret = ceph_encode_dentry_release(&p, req->r_old_dentry, req->r_old_dentry_dir, mds, req->r_old_dentry_drop, req->r_old_dentry_unless); + if (ret < 0) + goto out_err; + releases += ret; + } if (req->r_old_inode_drop) releases += ceph_encode_inode_release(&p, d_inode(req->r_old_dentry), @@ -2684,9 +3039,9 @@ static struct ceph_msg *create_request_message(struct ceph_mds_session *session, p = msg->front.iov_base + req->r_request_release_offset; } - head->num_releases = cpu_to_le16(releases); + lhead->num_releases = cpu_to_le16(releases); - encode_timestamp_and_gids(&p, req); + encode_mclientrequest_tail(&p, req); if (WARN_ON_ONCE(p > end)) { ceph_msg_put(msg); @@ -2715,6 +3070,10 @@ out_free1: ceph_mdsc_free_path((char *)path1, pathlen1); out: return msg; +out_err: + ceph_msg_put(msg); + msg = ERR_PTR(ret); + goto out_free2; } /* @@ -2731,18 +3090,6 @@ static void complete_request(struct ceph_mds_client *mdsc, complete_all(&req->r_completion); } -static struct ceph_mds_request_head_old * -find_old_request_head(void *p, u64 features) -{ - bool legacy = !(features & CEPH_FEATURE_FS_BTIME); - struct ceph_mds_request_head *new_head; - - if (legacy) - return (struct ceph_mds_request_head_old *)p; - new_head = (struct ceph_mds_request_head *)p; - return (struct ceph_mds_request_head_old *)&new_head->oldest_client_tid; -} - /* * called under mdsc->mutex */ @@ -2752,29 +3099,28 @@ static int __prepare_send_request(struct ceph_mds_session *session, { int mds = session->s_mds; struct ceph_mds_client *mdsc = session->s_mdsc; - struct ceph_mds_request_head_old *rhead; + struct ceph_mds_request_head_legacy *lhead; + struct ceph_mds_request_head *nhead; struct ceph_msg *msg; - int flags = 0, max_retry; + int flags = 0, old_max_retry; + bool old_version = !test_bit(CEPHFS_FEATURE_32BITS_RETRY_FWD, + &session->s_features); /* - * The type of 'r_attempts' in kernel 'ceph_mds_request' - * is 'int', while in 'ceph_mds_request_head' the type of - * 'num_retry' is '__u8'. So in case the request retries - * exceeding 256 times, the MDS will receive a incorrect - * retry seq. - * - * In this case it's ususally a bug in MDS and continue - * retrying the request makes no sense. - * - * In future this could be fixed in ceph code, so avoid - * using the hardcode here. + * Avoid inifinite retrying after overflow. The client will + * increase the retry count and if the MDS is old version, + * so we limit to retry at most 256 times. */ - max_retry = sizeof_field(struct ceph_mds_request_head, num_retry); - max_retry = 1 << (max_retry * BITS_PER_BYTE); - if (req->r_attempts >= max_retry) { - pr_warn_ratelimited("%s request tid %llu seq overflow\n", - __func__, req->r_tid); - return -EMULTIHOP; + if (req->r_attempts) { + old_max_retry = sizeof_field(struct ceph_mds_request_head_old, + num_retry); + old_max_retry = 1 << (old_max_retry * BITS_PER_BYTE); + if ((old_version && req->r_attempts >= old_max_retry) || + ((uint32_t)req->r_attempts >= U32_MAX)) { + pr_warn_ratelimited("%s request tid %llu seq overflow\n", + __func__, req->r_tid); + return -EMULTIHOP; + } } req->r_attempts++; @@ -2800,23 +3146,27 @@ static int __prepare_send_request(struct ceph_mds_session *session, * d_move mangles the src name. */ msg = req->r_request; - rhead = find_old_request_head(msg->front.iov_base, - session->s_con.peer_features); + lhead = find_legacy_request_head(msg->front.iov_base, + session->s_con.peer_features); - flags = le32_to_cpu(rhead->flags); + flags = le32_to_cpu(lhead->flags); flags |= CEPH_MDS_FLAG_REPLAY; - rhead->flags = cpu_to_le32(flags); + lhead->flags = cpu_to_le32(flags); if (req->r_target_inode) - rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode)); + lhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode)); - rhead->num_retry = req->r_attempts - 1; + lhead->num_retry = req->r_attempts - 1; + if (!old_version) { + nhead = (struct ceph_mds_request_head*)msg->front.iov_base; + nhead->ext_num_retry = cpu_to_le32(req->r_attempts - 1); + } /* remove cap/dentry releases from message */ - rhead->num_releases = 0; + lhead->num_releases = 0; p = msg->front.iov_base + req->r_request_release_offset; - encode_timestamp_and_gids(&p, req); + encode_mclientrequest_tail(&p, req); msg->front.iov_len = p - msg->front.iov_base; msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); @@ -2834,18 +3184,23 @@ static int __prepare_send_request(struct ceph_mds_session *session, } req->r_request = msg; - rhead = find_old_request_head(msg->front.iov_base, - session->s_con.peer_features); - rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc)); + lhead = find_legacy_request_head(msg->front.iov_base, + session->s_con.peer_features); + lhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc)); if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) flags |= CEPH_MDS_FLAG_REPLAY; if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) flags |= CEPH_MDS_FLAG_ASYNC; if (req->r_parent) flags |= CEPH_MDS_FLAG_WANT_DENTRY; - rhead->flags = cpu_to_le32(flags); - rhead->num_fwd = req->r_num_fwd; - rhead->num_retry = req->r_attempts - 1; + lhead->flags = cpu_to_le32(flags); + lhead->num_fwd = req->r_num_fwd; + lhead->num_retry = req->r_attempts - 1; + if (!old_version) { + nhead = (struct ceph_mds_request_head*)msg->front.iov_base; + nhead->ext_num_fwd = cpu_to_le32(req->r_num_fwd); + nhead->ext_num_retry = cpu_to_le32(req->r_attempts - 1); + } dout(" r_parent = %p\n", req->r_parent); return 0; @@ -3348,22 +3703,35 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) } dout("handle_reply tid %lld result %d\n", tid, result); - rinfo = &req->r_reply_info; if (test_bit(CEPHFS_FEATURE_REPLY_ENCODING, &session->s_features)) - err = parse_reply_info(session, msg, rinfo, (u64)-1); + err = parse_reply_info(session, msg, req, (u64)-1); else - err = parse_reply_info(session, msg, rinfo, session->s_con.peer_features); + err = parse_reply_info(session, msg, req, + session->s_con.peer_features); mutex_unlock(&mdsc->mutex); /* Must find target inode outside of mutexes to avoid deadlocks */ + rinfo = &req->r_reply_info; if ((err >= 0) && rinfo->head->is_target) { - struct inode *in; + struct inode *in = xchg(&req->r_new_inode, NULL); struct ceph_vino tvino = { .ino = le64_to_cpu(rinfo->targeti.in->ino), .snap = le64_to_cpu(rinfo->targeti.in->snapid) }; - in = ceph_get_inode(mdsc->fsc->sb, tvino); + /* + * If we ended up opening an existing inode, discard + * r_new_inode + */ + if (req->r_op == CEPH_MDS_OP_CREATE && + !req->r_reply_info.has_create_ino) { + /* This should never happen on an async create */ + WARN_ON_ONCE(req->r_deleg_ino); + iput(in); + in = NULL; + } + + in = ceph_get_inode(mdsc->fsc->sb, tvino, in); if (IS_ERR(in)) { err = PTR_ERR(in); mutex_lock(&session->s_mutex); @@ -3406,7 +3774,7 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) if (err == 0) { if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR || req->r_op == CEPH_MDS_OP_LSSNAP)) - ceph_readdir_prepopulate(req, req->r_session); + err = ceph_readdir_prepopulate(req, req->r_session); } current->journal_info = NULL; mutex_unlock(&req->r_fill_mutex); @@ -3491,33 +3859,21 @@ static void handle_forward(struct ceph_mds_client *mdsc, if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) { dout("forward tid %llu aborted, unregistering\n", tid); __unregister_request(mdsc, req); - } else if (fwd_seq <= req->r_num_fwd) { + } else if (fwd_seq <= req->r_num_fwd || (uint32_t)fwd_seq >= U32_MAX) { /* - * The type of 'num_fwd' in ceph 'MClientRequestForward' - * is 'int32_t', while in 'ceph_mds_request_head' the - * type is '__u8'. So in case the request bounces between - * MDSes exceeding 256 times, the client will get stuck. - * - * In this case it's ususally a bug in MDS and continue - * bouncing the request makes no sense. + * Avoid inifinite retrying after overflow. * - * In future this could be fixed in ceph code, so avoid - * using the hardcode here. + * The MDS will increase the fwd count and in client side + * if the num_fwd is less than the one saved in request + * that means the MDS is an old version and overflowed of + * 8 bits. */ - int max = sizeof_field(struct ceph_mds_request_head, num_fwd); - max = 1 << (max * BITS_PER_BYTE); - if (req->r_num_fwd >= max) { - mutex_lock(&req->r_fill_mutex); - req->r_err = -EMULTIHOP; - set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags); - mutex_unlock(&req->r_fill_mutex); - aborted = true; - pr_warn_ratelimited("forward tid %llu seq overflow\n", - tid); - } else { - dout("forward tid %llu to mds%d - old seq %d <= %d\n", - tid, next_mds, req->r_num_fwd, fwd_seq); - } + mutex_lock(&req->r_fill_mutex); + req->r_err = -EMULTIHOP; + set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags); + mutex_unlock(&req->r_fill_mutex); + aborted = true; + pr_warn_ratelimited("forward tid %llu seq overflow\n", tid); } else { /* resend. forward race not possible; mds would drop */ dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds); @@ -4550,6 +4906,9 @@ static void handle_lease(struct ceph_mds_client *mdsc, dout("handle_lease from mds%d\n", mds); + if (!ceph_inc_mds_stopping_blocker(mdsc, session)) + return; + /* decode */ if (msg->front.iov_len < sizeof(*h) + sizeof(u32)) goto bad; @@ -4568,8 +4927,6 @@ static void handle_lease(struct ceph_mds_client *mdsc, dname.len, dname.name); mutex_lock(&session->s_mutex); - inc_session_sequence(session); - if (!inode) { dout("handle_lease no inode %llx\n", vino.ino); goto release; @@ -4631,9 +4988,13 @@ release: out: mutex_unlock(&session->s_mutex); iput(inode); + + ceph_dec_mds_stopping_blocker(mdsc); return; bad: + ceph_dec_mds_stopping_blocker(mdsc); + pr_err("corrupt lease message\n"); ceph_msg_dump(msg); } @@ -4829,6 +5190,9 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc) } init_completion(&mdsc->safe_umount_waiters); + spin_lock_init(&mdsc->stopping_lock); + atomic_set(&mdsc->stopping_blockers, 0); + init_completion(&mdsc->stopping_waiter); init_waitqueue_head(&mdsc->session_close_wq); INIT_LIST_HEAD(&mdsc->waiting_for_map); mdsc->quotarealms_inodes = RB_ROOT; |