diff options
Diffstat (limited to 'fs/ceph/mds_client.c')
-rw-r--r-- | fs/ceph/mds_client.c | 768 |
1 files changed, 624 insertions, 144 deletions
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 163fc74bf221..9049c2a3e972 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -20,6 +20,8 @@ #include <linux/ceph/auth.h> #include <linux/ceph/debugfs.h> +#define RECONNECT_MAX_SIZE (INT_MAX - PAGE_SIZE) + /* * A cluster of MDS (metadata server) daemons is responsible for * managing the file system namespace (the directory hierarchy and @@ -46,13 +48,17 @@ */ struct ceph_reconnect_state { - int nr_caps; + struct ceph_mds_session *session; + int nr_caps, nr_realms; struct ceph_pagelist *pagelist; unsigned msg_version; + bool allow_multi; }; static void __wake_requests(struct ceph_mds_client *mdsc, struct list_head *head); +static void ceph_cap_release_work(struct work_struct *work); +static void ceph_cap_reclaim_work(struct work_struct *work); static const struct ceph_connection_operations mds_con_ops; @@ -61,6 +67,29 @@ static const struct ceph_connection_operations mds_con_ops; * mds reply parsing */ +static int parse_reply_info_quota(void **p, void *end, + struct ceph_mds_reply_info_in *info) +{ + u8 struct_v, struct_compat; + u32 struct_len; + + ceph_decode_8_safe(p, end, struct_v, bad); + ceph_decode_8_safe(p, end, struct_compat, bad); + /* struct_v is expected to be >= 1. we only + * understand encoding with struct_compat == 1. */ + if (!struct_v || struct_compat != 1) + goto bad; + ceph_decode_32_safe(p, end, struct_len, bad); + ceph_decode_need(p, end, struct_len, bad); + end = *p + struct_len; + ceph_decode_64_safe(p, end, info->max_bytes, bad); + ceph_decode_64_safe(p, end, info->max_files, bad); + *p = end; + return 0; +bad: + return -EIO; +} + /* * parse individual inode info */ @@ -68,8 +97,24 @@ static int parse_reply_info_in(void **p, void *end, struct ceph_mds_reply_info_in *info, u64 features) { - int err = -EIO; + int err = 0; + u8 struct_v = 0; + if (features == (u64)-1) { + u32 struct_len; + u8 struct_compat; + ceph_decode_8_safe(p, end, struct_v, bad); + ceph_decode_8_safe(p, end, struct_compat, bad); + /* struct_v is expected to be >= 1. we only understand + * encoding with struct_compat == 1. */ + if (!struct_v || struct_compat != 1) + goto bad; + ceph_decode_32_safe(p, end, struct_len, bad); + ceph_decode_need(p, end, struct_len, bad); + end = *p + struct_len; + } + + ceph_decode_need(p, end, sizeof(struct ceph_mds_reply_inode), bad); info->in = *p; *p += sizeof(struct ceph_mds_reply_inode) + sizeof(*info->in->fragtree.splits) * @@ -87,49 +132,136 @@ static int parse_reply_info_in(void **p, void *end, info->xattr_data = *p; *p += info->xattr_len; - if (features & CEPH_FEATURE_MDS_INLINE_DATA) { + if (features == (u64)-1) { + /* inline data */ ceph_decode_64_safe(p, end, info->inline_version, bad); ceph_decode_32_safe(p, end, info->inline_len, bad); ceph_decode_need(p, end, info->inline_len, bad); info->inline_data = *p; *p += info->inline_len; - } else - info->inline_version = CEPH_INLINE_NONE; + /* quota */ + err = parse_reply_info_quota(p, end, info); + if (err < 0) + goto out_bad; + /* pool namespace */ + ceph_decode_32_safe(p, end, info->pool_ns_len, bad); + if (info->pool_ns_len > 0) { + ceph_decode_need(p, end, info->pool_ns_len, bad); + info->pool_ns_data = *p; + *p += info->pool_ns_len; + } + /* btime, change_attr */ + { + struct ceph_timespec btime; + u64 change_attr; + ceph_decode_need(p, end, sizeof(btime), bad); + ceph_decode_copy(p, &btime, sizeof(btime)); + ceph_decode_64_safe(p, end, change_attr, bad); + } + + /* dir pin */ + if (struct_v >= 2) { + ceph_decode_32_safe(p, end, info->dir_pin, bad); + } else { + info->dir_pin = -ENODATA; + } + + *p = end; + } else { + if (features & CEPH_FEATURE_MDS_INLINE_DATA) { + ceph_decode_64_safe(p, end, info->inline_version, bad); + ceph_decode_32_safe(p, end, info->inline_len, bad); + ceph_decode_need(p, end, info->inline_len, bad); + info->inline_data = *p; + *p += info->inline_len; + } else + info->inline_version = CEPH_INLINE_NONE; + + if (features & CEPH_FEATURE_MDS_QUOTA) { + err = parse_reply_info_quota(p, end, info); + if (err < 0) + goto out_bad; + } else { + info->max_bytes = 0; + info->max_files = 0; + } + + info->pool_ns_len = 0; + info->pool_ns_data = NULL; + if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) { + ceph_decode_32_safe(p, end, info->pool_ns_len, bad); + if (info->pool_ns_len > 0) { + ceph_decode_need(p, end, info->pool_ns_len, bad); + info->pool_ns_data = *p; + *p += info->pool_ns_len; + } + } + + info->dir_pin = -ENODATA; + } + return 0; +bad: + err = -EIO; +out_bad: + return err; +} - if (features & CEPH_FEATURE_MDS_QUOTA) { +static int parse_reply_info_dir(void **p, void *end, + struct ceph_mds_reply_dirfrag **dirfrag, + u64 features) +{ + if (features == (u64)-1) { u8 struct_v, struct_compat; u32 struct_len; - - /* - * both struct_v and struct_compat are expected to be >= 1 - */ ceph_decode_8_safe(p, end, struct_v, bad); ceph_decode_8_safe(p, end, struct_compat, bad); - if (!struct_v || !struct_compat) + /* struct_v is expected to be >= 1. we only understand + * encoding whose struct_compat == 1. */ + if (!struct_v || struct_compat != 1) goto bad; ceph_decode_32_safe(p, end, struct_len, bad); ceph_decode_need(p, end, struct_len, bad); - ceph_decode_64_safe(p, end, info->max_bytes, bad); - ceph_decode_64_safe(p, end, info->max_files, bad); - } else { - info->max_bytes = 0; - info->max_files = 0; + end = *p + struct_len; } - info->pool_ns_len = 0; - info->pool_ns_data = NULL; - if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) { - ceph_decode_32_safe(p, end, info->pool_ns_len, bad); - if (info->pool_ns_len > 0) { - ceph_decode_need(p, end, info->pool_ns_len, bad); - info->pool_ns_data = *p; - *p += info->pool_ns_len; - } + ceph_decode_need(p, end, sizeof(**dirfrag), bad); + *dirfrag = *p; + *p += sizeof(**dirfrag) + sizeof(u32) * le32_to_cpu((*dirfrag)->ndist); + if (unlikely(*p > end)) + goto bad; + if (features == (u64)-1) + *p = end; + return 0; +bad: + return -EIO; +} + +static int parse_reply_info_lease(void **p, void *end, + struct ceph_mds_reply_lease **lease, + u64 features) +{ + if (features == (u64)-1) { + u8 struct_v, struct_compat; + u32 struct_len; + ceph_decode_8_safe(p, end, struct_v, bad); + ceph_decode_8_safe(p, end, struct_compat, bad); + /* struct_v is expected to be >= 1. we only understand + * encoding whose struct_compat == 1. */ + if (!struct_v || struct_compat != 1) + goto bad; + ceph_decode_32_safe(p, end, struct_len, bad); + ceph_decode_need(p, end, struct_len, bad); + end = *p + struct_len; } + ceph_decode_need(p, end, sizeof(**lease), bad); + *lease = *p; + *p += sizeof(**lease); + if (features == (u64)-1) + *p = end; return 0; bad: - return err; + return -EIO; } /* @@ -147,20 +279,18 @@ static int parse_reply_info_trace(void **p, void *end, if (err < 0) goto out_bad; - if (unlikely(*p + sizeof(*info->dirfrag) > end)) - goto bad; - info->dirfrag = *p; - *p += sizeof(*info->dirfrag) + - sizeof(u32)*le32_to_cpu(info->dirfrag->ndist); - if (unlikely(*p > end)) - goto bad; + err = parse_reply_info_dir(p, end, &info->dirfrag, features); + if (err < 0) + goto out_bad; ceph_decode_32_safe(p, end, info->dname_len, bad); ceph_decode_need(p, end, info->dname_len, bad); info->dname = *p; *p += info->dname_len; - info->dlease = *p; - *p += sizeof(*info->dlease); + + err = parse_reply_info_lease(p, end, &info->dlease, features); + if (err < 0) + goto out_bad; } if (info->head->is_target) { @@ -183,20 +313,16 @@ out_bad: /* * parse readdir results */ -static int parse_reply_info_dir(void **p, void *end, +static int parse_reply_info_readdir(void **p, void *end, struct ceph_mds_reply_info_parsed *info, u64 features) { u32 num, i = 0; int err; - info->dir_dir = *p; - if (*p + sizeof(*info->dir_dir) > end) - goto bad; - *p += sizeof(*info->dir_dir) + - sizeof(u32)*le32_to_cpu(info->dir_dir->ndist); - if (*p > end) - goto bad; + err = parse_reply_info_dir(p, end, &info->dir_dir, features); + if (err < 0) + goto out_bad; ceph_decode_need(p, end, sizeof(num) + 2, bad); num = ceph_decode_32(p); @@ -222,15 +348,16 @@ static int parse_reply_info_dir(void **p, void *end, while (num) { struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i; /* dentry */ - ceph_decode_need(p, end, sizeof(u32)*2, bad); - rde->name_len = ceph_decode_32(p); + ceph_decode_32_safe(p, end, rde->name_len, bad); ceph_decode_need(p, end, rde->name_len, bad); rde->name = *p; *p += rde->name_len; dout("parsed dir dname '%.*s'\n", rde->name_len, rde->name); - rde->lease = *p; - *p += sizeof(struct ceph_mds_reply_lease); + /* dentry lease */ + err = parse_reply_info_lease(p, end, &rde->lease, features); + if (err) + goto out_bad; /* inode */ err = parse_reply_info_in(p, end, &rde->inode, features); if (err < 0) @@ -281,7 +408,8 @@ static int parse_reply_info_create(void **p, void *end, struct ceph_mds_reply_info_parsed *info, u64 features) { - if (features & CEPH_FEATURE_REPLY_CREATE_INODE) { + if (features == (u64)-1 || + (features & CEPH_FEATURE_REPLY_CREATE_INODE)) { if (*p == end) { info->has_create_ino = false; } else { @@ -310,7 +438,7 @@ static int parse_reply_info_extra(void **p, void *end, if (op == CEPH_MDS_OP_GETFILELOCK) return parse_reply_info_filelock(p, end, info, features); else if (op == CEPH_MDS_OP_READDIR || op == CEPH_MDS_OP_LSSNAP) - return parse_reply_info_dir(p, end, info, features); + return parse_reply_info_readdir(p, end, info, features); else if (op == CEPH_MDS_OP_CREATE) return parse_reply_info_create(p, end, info, features); else @@ -494,7 +622,7 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc, ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr); spin_lock_init(&s->s_gen_ttl_lock); - s->s_cap_gen = 0; + s->s_cap_gen = 1; s->s_cap_ttl = jiffies - 1; spin_lock_init(&s->s_cap_lock); @@ -510,6 +638,8 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc, s->s_cap_reconnect = 0; s->s_cap_iterator = NULL; INIT_LIST_HEAD(&s->s_cap_releases); + INIT_WORK(&s->s_cap_release_work, ceph_cap_release_work); + INIT_LIST_HEAD(&s->s_cap_flushing); mdsc->sessions[mds] = s; @@ -535,6 +665,7 @@ static void __unregister_session(struct ceph_mds_client *mdsc, dout("__unregister_session mds%d %p\n", s->s_mds, s); BUG_ON(mdsc->sessions[s->s_mds] != s); mdsc->sessions[s->s_mds] = NULL; + s->s_state = 0; ceph_con_close(&s->s_con); ceph_put_mds_session(s); atomic_dec(&mdsc->num_sessions); @@ -1197,13 +1328,10 @@ static int iterate_session_caps(struct ceph_mds_session *session, cap->session = NULL; list_del_init(&cap->session_caps); session->s_nr_caps--; - if (cap->queue_release) { - list_add_tail(&cap->session_caps, - &session->s_cap_releases); - session->s_num_cap_releases++; - } else { + if (cap->queue_release) + __ceph_queue_cap_release(session, cap); + else old_cap = cap; /* put_cap it w/o locks held */ - } } if (ret < 0) goto out; @@ -1286,6 +1414,15 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap, list_add(&ci->i_prealloc_cap_flush->i_list, &to_remove); ci->i_prealloc_cap_flush = NULL; } + + if (drop && + ci->i_wrbuffer_ref_head == 0 && + ci->i_wr_ref == 0 && + ci->i_dirty_caps == 0 && + ci->i_flushing_caps == 0) { + ceph_put_snap_context(ci->i_head_snapc); + ci->i_head_snapc = NULL; + } } spin_unlock(&ci->i_ceph_lock); while (!list_empty(&to_remove)) { @@ -1638,7 +1775,7 @@ int ceph_trim_caps(struct ceph_mds_client *mdsc, session->s_trim_caps = 0; } - ceph_send_cap_releases(mdsc, session); + ceph_flush_cap_releases(mdsc, session); return 0; } @@ -1681,8 +1818,8 @@ static void wait_caps_flush(struct ceph_mds_client *mdsc, /* * called under s_mutex */ -void ceph_send_cap_releases(struct ceph_mds_client *mdsc, - struct ceph_mds_session *session) +static void ceph_send_cap_releases(struct ceph_mds_client *mdsc, + struct ceph_mds_session *session) { struct ceph_msg *msg = NULL; struct ceph_mds_cap_release *head; @@ -1774,6 +1911,81 @@ out_err: spin_unlock(&session->s_cap_lock); } +static void ceph_cap_release_work(struct work_struct *work) +{ + struct ceph_mds_session *session = + container_of(work, struct ceph_mds_session, s_cap_release_work); + + mutex_lock(&session->s_mutex); + if (session->s_state == CEPH_MDS_SESSION_OPEN || + session->s_state == CEPH_MDS_SESSION_HUNG) + ceph_send_cap_releases(session->s_mdsc, session); + mutex_unlock(&session->s_mutex); + ceph_put_mds_session(session); +} + +void ceph_flush_cap_releases(struct ceph_mds_client *mdsc, + struct ceph_mds_session *session) +{ + if (mdsc->stopping) + return; + + get_session(session); + if (queue_work(mdsc->fsc->cap_wq, + &session->s_cap_release_work)) { + dout("cap release work queued\n"); + } else { + ceph_put_mds_session(session); + dout("failed to queue cap release work\n"); + } +} + +/* + * caller holds session->s_cap_lock + */ +void __ceph_queue_cap_release(struct ceph_mds_session *session, + struct ceph_cap *cap) +{ + list_add_tail(&cap->session_caps, &session->s_cap_releases); + session->s_num_cap_releases++; + + if (!(session->s_num_cap_releases % CEPH_CAPS_PER_RELEASE)) + ceph_flush_cap_releases(session->s_mdsc, session); +} + +static void ceph_cap_reclaim_work(struct work_struct *work) +{ + struct ceph_mds_client *mdsc = + container_of(work, struct ceph_mds_client, cap_reclaim_work); + int ret = ceph_trim_dentries(mdsc); + if (ret == -EAGAIN) + ceph_queue_cap_reclaim_work(mdsc); +} + +void ceph_queue_cap_reclaim_work(struct ceph_mds_client *mdsc) +{ + if (mdsc->stopping) + return; + + if (queue_work(mdsc->fsc->cap_wq, &mdsc->cap_reclaim_work)) { + dout("caps reclaim work queued\n"); + } else { + dout("failed to queue caps release work\n"); + } +} + +void ceph_reclaim_caps_nr(struct ceph_mds_client *mdsc, int nr) +{ + int val; + if (!nr) + return; + val = atomic_add_return(nr, &mdsc->cap_reclaim_pending); + if (!(val % CEPH_CAPS_PER_RELEASE)) { + atomic_set(&mdsc->cap_reclaim_pending, 0); + ceph_queue_cap_reclaim_work(mdsc); + } +} + /* * requests */ @@ -1958,10 +2170,39 @@ retry: return path; } +/* Duplicate the dentry->d_name.name safely */ +static int clone_dentry_name(struct dentry *dentry, const char **ppath, + int *ppathlen) +{ + u32 len; + char *name; + +retry: + len = READ_ONCE(dentry->d_name.len); + name = kmalloc(len + 1, GFP_NOFS); + if (!name) + return -ENOMEM; + + spin_lock(&dentry->d_lock); + if (dentry->d_name.len != len) { + spin_unlock(&dentry->d_lock); + kfree(name); + goto retry; + } + memcpy(name, dentry->d_name.name, len); + spin_unlock(&dentry->d_lock); + + name[len] = '\0'; + *ppath = name; + *ppathlen = len; + return 0; +} + static int build_dentry_path(struct dentry *dentry, struct inode *dir, const char **ppath, int *ppathlen, u64 *pino, - int *pfreepath) + bool *pfreepath, bool parent_locked) { + int ret; char *path; rcu_read_lock(); @@ -1970,8 +2211,15 @@ static int build_dentry_path(struct dentry *dentry, struct inode *dir, if (dir && ceph_snap(dir) == CEPH_NOSNAP) { *pino = ceph_ino(dir); rcu_read_unlock(); - *ppath = dentry->d_name.name; - *ppathlen = dentry->d_name.len; + if (parent_locked) { + *ppath = dentry->d_name.name; + *ppathlen = dentry->d_name.len; + } else { + ret = clone_dentry_name(dentry, ppath, ppathlen); + if (ret) + return ret; + *pfreepath = true; + } return 0; } rcu_read_unlock(); @@ -1979,13 +2227,13 @@ static int build_dentry_path(struct dentry *dentry, struct inode *dir, if (IS_ERR(path)) return PTR_ERR(path); *ppath = path; - *pfreepath = 1; + *pfreepath = true; return 0; } static int build_inode_path(struct inode *inode, const char **ppath, int *ppathlen, u64 *pino, - int *pfreepath) + bool *pfreepath) { struct dentry *dentry; char *path; @@ -2001,7 +2249,7 @@ static int build_inode_path(struct inode *inode, if (IS_ERR(path)) return PTR_ERR(path); *ppath = path; - *pfreepath = 1; + *pfreepath = true; return 0; } @@ -2012,7 +2260,7 @@ static int build_inode_path(struct inode *inode, static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry, struct inode *rdiri, const char *rpath, u64 rino, const char **ppath, int *pathlen, - u64 *ino, int *freepath) + u64 *ino, bool *freepath, bool parent_locked) { int r = 0; @@ -2022,7 +2270,7 @@ static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry, ceph_snap(rinode)); } else if (rdentry) { r = build_dentry_path(rdentry, rdiri, ppath, pathlen, ino, - freepath); + freepath, parent_locked); dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen, *ppath); } else if (rpath || rino) { @@ -2048,7 +2296,7 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc, const char *path2 = NULL; u64 ino1 = 0, ino2 = 0; int pathlen1 = 0, pathlen2 = 0; - int freepath1 = 0, freepath2 = 0; + bool freepath1 = false, freepath2 = false; int len; u16 releases; void *p, *end; @@ -2056,16 +2304,19 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc, ret = set_request_path_attr(req->r_inode, req->r_dentry, req->r_parent, req->r_path1, req->r_ino1.ino, - &path1, &pathlen1, &ino1, &freepath1); + &path1, &pathlen1, &ino1, &freepath1, + test_bit(CEPH_MDS_R_PARENT_LOCKED, + &req->r_req_flags)); if (ret < 0) { msg = ERR_PTR(ret); goto out; } + /* If r_old_dentry is set, then assume that its parent is locked */ ret = set_request_path_attr(NULL, req->r_old_dentry, req->r_old_dentry_dir, req->r_path2, req->r_ino2.ino, - &path2, &pathlen2, &ino2, &freepath2); + &path2, &pathlen2, &ino2, &freepath2, true); if (ret < 0) { msg = ERR_PTR(ret); goto out_free1; @@ -2653,7 +2904,10 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) dout("handle_reply tid %lld result %d\n", tid, result); rinfo = &req->r_reply_info; - err = parse_reply_info(msg, rinfo, session->s_con.peer_features); + if (test_bit(CEPHFS_FEATURE_REPLY_ENCODING, &session->s_features)) + err = parse_reply_info(msg, rinfo, (u64)-1); + else + err = parse_reply_info(msg, rinfo, session->s_con.peer_features); mutex_unlock(&mdsc->mutex); mutex_lock(&session->s_mutex); @@ -2684,7 +2938,6 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR || req->r_op == CEPH_MDS_OP_LSSNAP)) ceph_readdir_prepopulate(req, req->r_session); - ceph_unreserve_caps(mdsc, &req->r_caps_reservation); } current->journal_info = NULL; mutex_unlock(&req->r_fill_mutex); @@ -2693,12 +2946,18 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) if (realm) ceph_put_snap_realm(mdsc, realm); - if (err == 0 && req->r_target_inode && - test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { - struct ceph_inode_info *ci = ceph_inode(req->r_target_inode); - spin_lock(&ci->i_unsafe_lock); - list_add_tail(&req->r_unsafe_target_item, &ci->i_unsafe_iops); - spin_unlock(&ci->i_unsafe_lock); + if (err == 0) { + if (req->r_target_inode && + test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { + struct ceph_inode_info *ci = + ceph_inode(req->r_target_inode); + spin_lock(&ci->i_unsafe_lock); + list_add_tail(&req->r_unsafe_target_item, + &ci->i_unsafe_iops); + spin_unlock(&ci->i_unsafe_lock); + } + + ceph_unreserve_caps(mdsc, &req->r_caps_reservation); } out_err: mutex_lock(&mdsc->mutex); @@ -2777,6 +3036,25 @@ bad: pr_err("mdsc_handle_forward decode error err=%d\n", err); } +static int __decode_and_drop_session_metadata(void **p, void *end) +{ + /* map<string,string> */ + u32 n; + ceph_decode_32_safe(p, end, n, bad); + while (n-- > 0) { + u32 len; + ceph_decode_32_safe(p, end, len, bad); + ceph_decode_need(p, end, len, bad); + *p += len; + ceph_decode_32_safe(p, end, len, bad); + ceph_decode_need(p, end, len, bad); + *p += len; + } + return 0; +bad: + return -1; +} + /* * handle a mds session control message */ @@ -2784,18 +3062,36 @@ static void handle_session(struct ceph_mds_session *session, struct ceph_msg *msg) { struct ceph_mds_client *mdsc = session->s_mdsc; + int mds = session->s_mds; + int msg_version = le16_to_cpu(msg->hdr.version); + void *p = msg->front.iov_base; + void *end = p + msg->front.iov_len; + struct ceph_mds_session_head *h; u32 op; u64 seq; - int mds = session->s_mds; - struct ceph_mds_session_head *h = msg->front.iov_base; + unsigned long features = 0; int wake = 0; /* decode */ - if (msg->front.iov_len < sizeof(*h)) - goto bad; + ceph_decode_need(&p, end, sizeof(*h), bad); + h = p; + p += sizeof(*h); + op = le32_to_cpu(h->op); seq = le64_to_cpu(h->seq); + if (msg_version >= 3) { + u32 len; + /* version >= 2, metadata */ + if (__decode_and_drop_session_metadata(&p, end) < 0) + goto bad; + /* version >= 3, feature bits */ + ceph_decode_32_safe(&p, end, len, bad); + ceph_decode_need(&p, end, len, bad); + memcpy(&features, p, min_t(size_t, len, sizeof(features))); + p += len; + } + mutex_lock(&mdsc->mutex); if (op == CEPH_SESSION_CLOSE) { get_session(session); @@ -2821,6 +3117,7 @@ static void handle_session(struct ceph_mds_session *session, if (session->s_state == CEPH_MDS_SESSION_RECONNECTING) pr_info("mds%d reconnect success\n", session->s_mds); session->s_state = CEPH_MDS_SESSION_OPEN; + session->s_features = features; renewed_caps(mdsc, session, 0); wake = 1; if (mdsc->stopping) @@ -2947,6 +3244,82 @@ static void replay_unsafe_requests(struct ceph_mds_client *mdsc, mutex_unlock(&mdsc->mutex); } +static int send_reconnect_partial(struct ceph_reconnect_state *recon_state) +{ + struct ceph_msg *reply; + struct ceph_pagelist *_pagelist; + struct page *page; + __le32 *addr; + int err = -ENOMEM; + + if (!recon_state->allow_multi) + return -ENOSPC; + + /* can't handle message that contains both caps and realm */ + BUG_ON(!recon_state->nr_caps == !recon_state->nr_realms); + + /* pre-allocate new pagelist */ + _pagelist = ceph_pagelist_alloc(GFP_NOFS); + if (!_pagelist) + return -ENOMEM; + + reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false); + if (!reply) + goto fail_msg; + + /* placeholder for nr_caps */ + err = ceph_pagelist_encode_32(_pagelist, 0); + if (err < 0) + goto fail; + + if (recon_state->nr_caps) { + /* currently encoding caps */ + err = ceph_pagelist_encode_32(recon_state->pagelist, 0); + if (err) + goto fail; + } else { + /* placeholder for nr_realms (currently encoding relams) */ + err = ceph_pagelist_encode_32(_pagelist, 0); + if (err < 0) + goto fail; + } + + err = ceph_pagelist_encode_8(recon_state->pagelist, 1); + if (err) + goto fail; + + page = list_first_entry(&recon_state->pagelist->head, struct page, lru); + addr = kmap_atomic(page); + if (recon_state->nr_caps) { + /* currently encoding caps */ + *addr = cpu_to_le32(recon_state->nr_caps); + } else { + /* currently encoding relams */ + *(addr + 1) = cpu_to_le32(recon_state->nr_realms); + } + kunmap_atomic(addr); + + reply->hdr.version = cpu_to_le16(5); + reply->hdr.compat_version = cpu_to_le16(4); + + reply->hdr.data_len = cpu_to_le32(recon_state->pagelist->length); + ceph_msg_data_add_pagelist(reply, recon_state->pagelist); + + ceph_con_send(&recon_state->session->s_con, reply); + ceph_pagelist_release(recon_state->pagelist); + + recon_state->pagelist = _pagelist; + recon_state->nr_caps = 0; + recon_state->nr_realms = 0; + recon_state->msg_version = 5; + return 0; +fail: + ceph_msg_put(reply); +fail_msg: + ceph_pagelist_release(_pagelist); + return err; +} + /* * Encode information about a cap for a reconnect with the MDS. */ @@ -2966,9 +3339,6 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap, dout(" adding %p ino %llx.%llx cap %p %lld %s\n", inode, ceph_vinop(inode), cap, cap->cap_id, ceph_cap_string(cap->issued)); - err = ceph_pagelist_encode_64(pagelist, ceph_ino(inode)); - if (err) - return err; spin_lock(&ci->i_ceph_lock); cap->seq = 0; /* reset cap seq */ @@ -3008,7 +3378,7 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap, if (recon_state->msg_version >= 2) { int num_fcntl_locks, num_flock_locks; struct ceph_filelock *flocks = NULL; - size_t struct_len, total_len = 0; + size_t struct_len, total_len = sizeof(u64); u8 struct_v = 0; encode_again: @@ -3043,7 +3413,7 @@ encode_again: if (recon_state->msg_version >= 3) { /* version, compat_version and struct_len */ - total_len = 2 * sizeof(u8) + sizeof(u32); + total_len += 2 * sizeof(u8) + sizeof(u32); struct_v = 2; } /* @@ -3060,12 +3430,19 @@ encode_again: struct_len += sizeof(u64); /* snap_follows */ total_len += struct_len; - err = ceph_pagelist_reserve(pagelist, total_len); - if (err) { - kfree(flocks); - goto out_err; + + if (pagelist->length + total_len > RECONNECT_MAX_SIZE) { + err = send_reconnect_partial(recon_state); + if (err) + goto out_freeflocks; + pagelist = recon_state->pagelist; } + err = ceph_pagelist_reserve(pagelist, total_len); + if (err) + goto out_freeflocks; + + ceph_pagelist_encode_64(pagelist, ceph_ino(inode)); if (recon_state->msg_version >= 3) { ceph_pagelist_encode_8(pagelist, struct_v); ceph_pagelist_encode_8(pagelist, 1); @@ -3077,7 +3454,7 @@ encode_again: num_fcntl_locks, num_flock_locks); if (struct_v >= 2) ceph_pagelist_encode_64(pagelist, snap_follows); - +out_freeflocks: kfree(flocks); } else { u64 pathbase = 0; @@ -3098,20 +3475,81 @@ encode_again: } err = ceph_pagelist_reserve(pagelist, - pathlen + sizeof(u32) + sizeof(rec.v1)); + sizeof(u64) + sizeof(u32) + + pathlen + sizeof(rec.v1)); if (err) { - kfree(path); - goto out_err; + goto out_freepath; } + ceph_pagelist_encode_64(pagelist, ceph_ino(inode)); ceph_pagelist_encode_string(pagelist, path, pathlen); ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1)); - +out_freepath: kfree(path); } - recon_state->nr_caps++; out_err: + if (err >= 0) + recon_state->nr_caps++; + return err; +} + +static int encode_snap_realms(struct ceph_mds_client *mdsc, + struct ceph_reconnect_state *recon_state) +{ + struct rb_node *p; + struct ceph_pagelist *pagelist = recon_state->pagelist; + int err = 0; + + if (recon_state->msg_version >= 4) { + err = ceph_pagelist_encode_32(pagelist, mdsc->num_snap_realms); + if (err < 0) + goto fail; + } + + /* + * snaprealms. we provide mds with the ino, seq (version), and + * parent for all of our realms. If the mds has any newer info, + * it will tell us. + */ + for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) { + struct ceph_snap_realm *realm = + rb_entry(p, struct ceph_snap_realm, node); + struct ceph_mds_snaprealm_reconnect sr_rec; + + if (recon_state->msg_version >= 4) { + size_t need = sizeof(u8) * 2 + sizeof(u32) + + sizeof(sr_rec); + + if (pagelist->length + need > RECONNECT_MAX_SIZE) { + err = send_reconnect_partial(recon_state); + if (err) + goto fail; + pagelist = recon_state->pagelist; + } + + err = ceph_pagelist_reserve(pagelist, need); + if (err) + goto fail; + + ceph_pagelist_encode_8(pagelist, 1); + ceph_pagelist_encode_8(pagelist, 1); + ceph_pagelist_encode_32(pagelist, sizeof(sr_rec)); + } + + dout(" adding snap realm %llx seq %lld parent %llx\n", + realm->ino, realm->seq, realm->parent_ino); + sr_rec.ino = cpu_to_le64(realm->ino); + sr_rec.seq = cpu_to_le64(realm->seq); + sr_rec.parent = cpu_to_le64(realm->parent_ino); + + err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec)); + if (err) + goto fail; + + recon_state->nr_realms++; + } +fail: return err; } @@ -3132,18 +3570,17 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc, struct ceph_mds_session *session) { struct ceph_msg *reply; - struct rb_node *p; int mds = session->s_mds; int err = -ENOMEM; - int s_nr_caps; - struct ceph_pagelist *pagelist; - struct ceph_reconnect_state recon_state; + struct ceph_reconnect_state recon_state = { + .session = session, + }; LIST_HEAD(dispose); pr_info("mds%d reconnect start\n", mds); - pagelist = ceph_pagelist_alloc(GFP_NOFS); - if (!pagelist) + recon_state.pagelist = ceph_pagelist_alloc(GFP_NOFS); + if (!recon_state.pagelist) goto fail_nopagelist; reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false); @@ -3187,63 +3624,90 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc, /* replay unsafe requests */ replay_unsafe_requests(mdsc, session); + ceph_early_kick_flushing_caps(mdsc, session); + down_read(&mdsc->snap_rwsem); - /* traverse this session's caps */ - s_nr_caps = session->s_nr_caps; - err = ceph_pagelist_encode_32(pagelist, s_nr_caps); + /* placeholder for nr_caps */ + err = ceph_pagelist_encode_32(recon_state.pagelist, 0); if (err) goto fail; - recon_state.nr_caps = 0; - recon_state.pagelist = pagelist; - if (session->s_con.peer_features & CEPH_FEATURE_MDSENC) + if (test_bit(CEPHFS_FEATURE_MULTI_RECONNECT, &session->s_features)) { recon_state.msg_version = 3; - else + recon_state.allow_multi = true; + } else if (session->s_con.peer_features & CEPH_FEATURE_MDSENC) { + recon_state.msg_version = 3; + } else { recon_state.msg_version = 2; + } + /* trsaverse this session's caps */ err = iterate_session_caps(session, encode_caps_cb, &recon_state); - if (err < 0) - goto fail; spin_lock(&session->s_cap_lock); session->s_cap_reconnect = 0; spin_unlock(&session->s_cap_lock); - /* - * snaprealms. we provide mds with the ino, seq (version), and - * parent for all of our realms. If the mds has any newer info, - * it will tell us. - */ - for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) { - struct ceph_snap_realm *realm = - rb_entry(p, struct ceph_snap_realm, node); - struct ceph_mds_snaprealm_reconnect sr_rec; + if (err < 0) + goto fail; - dout(" adding snap realm %llx seq %lld parent %llx\n", - realm->ino, realm->seq, realm->parent_ino); - sr_rec.ino = cpu_to_le64(realm->ino); - sr_rec.seq = cpu_to_le64(realm->seq); - sr_rec.parent = cpu_to_le64(realm->parent_ino); - err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec)); - if (err) - goto fail; + /* check if all realms can be encoded into current message */ + if (mdsc->num_snap_realms) { + size_t total_len = + recon_state.pagelist->length + + mdsc->num_snap_realms * + sizeof(struct ceph_mds_snaprealm_reconnect); + if (recon_state.msg_version >= 4) { + /* number of realms */ + total_len += sizeof(u32); + /* version, compat_version and struct_len */ + total_len += mdsc->num_snap_realms * + (2 * sizeof(u8) + sizeof(u32)); + } + if (total_len > RECONNECT_MAX_SIZE) { + if (!recon_state.allow_multi) { + err = -ENOSPC; + goto fail; + } + if (recon_state.nr_caps) { + err = send_reconnect_partial(&recon_state); + if (err) + goto fail; + } + recon_state.msg_version = 5; + } } - reply->hdr.version = cpu_to_le16(recon_state.msg_version); + err = encode_snap_realms(mdsc, &recon_state); + if (err < 0) + goto fail; - /* raced with cap release? */ - if (s_nr_caps != recon_state.nr_caps) { - struct page *page = list_first_entry(&pagelist->head, - struct page, lru); + if (recon_state.msg_version >= 5) { + err = ceph_pagelist_encode_8(recon_state.pagelist, 0); + if (err < 0) + goto fail; + } + + if (recon_state.nr_caps || recon_state.nr_realms) { + struct page *page = + list_first_entry(&recon_state.pagelist->head, + struct page, lru); __le32 *addr = kmap_atomic(page); - *addr = cpu_to_le32(recon_state.nr_caps); + if (recon_state.nr_caps) { + WARN_ON(recon_state.nr_realms != mdsc->num_snap_realms); + *addr = cpu_to_le32(recon_state.nr_caps); + } else if (recon_state.msg_version >= 4) { + *(addr + 1) = cpu_to_le32(recon_state.nr_realms); + } kunmap_atomic(addr); } - reply->hdr.data_len = cpu_to_le32(pagelist->length); - ceph_msg_data_add_pagelist(reply, pagelist); + reply->hdr.version = cpu_to_le16(recon_state.msg_version); + if (recon_state.msg_version >= 4) + reply->hdr.compat_version = cpu_to_le16(4); - ceph_early_kick_flushing_caps(mdsc, session); + reply->hdr.data_len = cpu_to_le32(recon_state.pagelist->length); + ceph_msg_data_add_pagelist(reply, recon_state.pagelist); ceph_con_send(&session->s_con, reply); @@ -3254,7 +3718,7 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc, mutex_unlock(&mdsc->mutex); up_read(&mdsc->snap_rwsem); - ceph_pagelist_release(pagelist); + ceph_pagelist_release(recon_state.pagelist); return; fail: @@ -3262,7 +3726,7 @@ fail: up_read(&mdsc->snap_rwsem); mutex_unlock(&session->s_mutex); fail_nomsg: - ceph_pagelist_release(pagelist); + ceph_pagelist_release(recon_state.pagelist); fail_nopagelist: pr_err("error %d preparing reconnect for mds%d\n", err, mds); return; @@ -3580,7 +4044,6 @@ static void delayed_work(struct work_struct *work) int renew_caps; dout("mdsc delayed_work\n"); - ceph_check_delayed_caps(mdsc); mutex_lock(&mdsc->mutex); renew_interval = mdsc->mdsmap->m_session_timeout >> 2; @@ -3628,6 +4091,12 @@ static void delayed_work(struct work_struct *work) } mutex_unlock(&mdsc->mutex); + ceph_check_delayed_caps(mdsc); + + ceph_queue_cap_reclaim_work(mdsc); + + ceph_trim_snapid_map(mdsc); + schedule_delayed(mdsc); } @@ -3660,6 +4129,7 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc) init_rwsem(&mdsc->snap_rwsem); mdsc->snap_realms = RB_ROOT; INIT_LIST_HEAD(&mdsc->snap_empty); + mdsc->num_snap_realms = 0; spin_lock_init(&mdsc->snap_empty_lock); mdsc->last_tid = 0; mdsc->oldest_tid = 0; @@ -3677,11 +4147,19 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc) mdsc->num_cap_flushing = 0; spin_lock_init(&mdsc->cap_dirty_lock); init_waitqueue_head(&mdsc->cap_flushing_wq); - spin_lock_init(&mdsc->dentry_lru_lock); - INIT_LIST_HEAD(&mdsc->dentry_lru); + INIT_WORK(&mdsc->cap_reclaim_work, ceph_cap_reclaim_work); + atomic_set(&mdsc->cap_reclaim_pending, 0); + + spin_lock_init(&mdsc->dentry_list_lock); + INIT_LIST_HEAD(&mdsc->dentry_leases); + INIT_LIST_HEAD(&mdsc->dentry_dir_leases); ceph_caps_init(mdsc); - ceph_adjust_min_caps(mdsc, fsc->min_caps); + ceph_adjust_caps_max_min(mdsc, fsc->mount_options); + + spin_lock_init(&mdsc->snapid_map_lock); + mdsc->snapid_map_tree = RB_ROOT; + INIT_LIST_HEAD(&mdsc->snapid_map_lru); init_rwsem(&mdsc->pool_perm_rwsem); mdsc->pool_perm_tree = RB_ROOT; @@ -3876,8 +4354,10 @@ void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc) WARN_ON(!list_empty(&mdsc->cap_delay_list)); mutex_unlock(&mdsc->mutex); + ceph_cleanup_snapid_map(mdsc); ceph_cleanup_empty_realms(mdsc); + cancel_work_sync(&mdsc->cap_reclaim_work); cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */ dout("stopped\n"); |