diff options
Diffstat (limited to 'fs/ceph')
-rw-r--r-- | fs/ceph/caps.c | 72 | ||||
-rw-r--r-- | fs/ceph/debugfs.c | 27 | ||||
-rw-r--r-- | fs/ceph/dir.c | 461 | ||||
-rw-r--r-- | fs/ceph/file.c | 13 | ||||
-rw-r--r-- | fs/ceph/inode.c | 70 | ||||
-rw-r--r-- | fs/ceph/mds_client.c | 768 | ||||
-rw-r--r-- | fs/ceph/mds_client.h | 43 | ||||
-rw-r--r-- | fs/ceph/snap.c | 166 | ||||
-rw-r--r-- | fs/ceph/super.c | 21 | ||||
-rw-r--r-- | fs/ceph/super.h | 43 | ||||
-rw-r--r-- | fs/ceph/xattr.c | 20 |
11 files changed, 1375 insertions, 329 deletions
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index bba28a5034ba..36a8dc699448 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -148,11 +148,17 @@ void ceph_caps_finalize(struct ceph_mds_client *mdsc) spin_unlock(&mdsc->caps_list_lock); } -void ceph_adjust_min_caps(struct ceph_mds_client *mdsc, int delta) +void ceph_adjust_caps_max_min(struct ceph_mds_client *mdsc, + struct ceph_mount_options *fsopt) { spin_lock(&mdsc->caps_list_lock); - mdsc->caps_min_count += delta; - BUG_ON(mdsc->caps_min_count < 0); + mdsc->caps_min_count = fsopt->max_readdir; + if (mdsc->caps_min_count < 1024) + mdsc->caps_min_count = 1024; + mdsc->caps_use_max = fsopt->caps_max; + if (mdsc->caps_use_max > 0 && + mdsc->caps_use_max < mdsc->caps_min_count) + mdsc->caps_use_max = mdsc->caps_min_count; spin_unlock(&mdsc->caps_list_lock); } @@ -272,6 +278,7 @@ int ceph_reserve_caps(struct ceph_mds_client *mdsc, if (!err) { BUG_ON(have + alloc != need); ctx->count = need; + ctx->used = 0; } spin_lock(&mdsc->caps_list_lock); @@ -295,13 +302,24 @@ int ceph_reserve_caps(struct ceph_mds_client *mdsc, } void ceph_unreserve_caps(struct ceph_mds_client *mdsc, - struct ceph_cap_reservation *ctx) + struct ceph_cap_reservation *ctx) { + bool reclaim = false; + if (!ctx->count) + return; + dout("unreserve caps ctx=%p count=%d\n", ctx, ctx->count); spin_lock(&mdsc->caps_list_lock); __ceph_unreserve_caps(mdsc, ctx->count); ctx->count = 0; + + if (mdsc->caps_use_max > 0 && + mdsc->caps_use_count > mdsc->caps_use_max) + reclaim = true; spin_unlock(&mdsc->caps_list_lock); + + if (reclaim) + ceph_reclaim_caps_nr(mdsc, ctx->used); } struct ceph_cap *ceph_get_cap(struct ceph_mds_client *mdsc, @@ -346,6 +364,7 @@ struct ceph_cap *ceph_get_cap(struct ceph_mds_client *mdsc, BUG_ON(list_empty(&mdsc->caps_list)); ctx->count--; + ctx->used++; mdsc->caps_reserve_count--; mdsc->caps_use_count++; @@ -500,12 +519,12 @@ static void __insert_cap_node(struct ceph_inode_info *ci, static void __cap_set_timeouts(struct ceph_mds_client *mdsc, struct ceph_inode_info *ci) { - struct ceph_mount_options *ma = mdsc->fsc->mount_options; + struct ceph_mount_options *opt = mdsc->fsc->mount_options; ci->i_hold_caps_min = round_jiffies(jiffies + - ma->caps_wanted_delay_min * HZ); + opt->caps_wanted_delay_min * HZ); ci->i_hold_caps_max = round_jiffies(jiffies + - ma->caps_wanted_delay_max * HZ); + opt->caps_wanted_delay_max * HZ); dout("__cap_set_timeouts %p min %lu max %lu\n", &ci->vfs_inode, ci->i_hold_caps_min - jiffies, ci->i_hold_caps_max - jiffies); } @@ -657,6 +676,10 @@ void ceph_add_cap(struct inode *inode, session->s_nr_caps++; spin_unlock(&session->s_cap_lock); } else { + spin_lock(&session->s_cap_lock); + list_move_tail(&cap->session_caps, &session->s_caps); + spin_unlock(&session->s_cap_lock); + if (cap->cap_gen < session->s_cap_gen) cap->issued = cap->implemented = CEPH_CAP_PIN; @@ -1081,9 +1104,7 @@ void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release) (!session->s_cap_reconnect || cap->cap_gen == session->s_cap_gen)) { cap->queue_release = 1; if (removed) { - list_add_tail(&cap->session_caps, - &session->s_cap_releases); - session->s_num_cap_releases++; + __ceph_queue_cap_release(session, cap); removed = 0; } } else { @@ -1245,7 +1266,7 @@ static int send_cap_msg(struct cap_msg_args *arg) * Queue cap releases when an inode is dropped from our cache. Since * inode is about to be destroyed, there is no need for i_ceph_lock. */ -void ceph_queue_caps_release(struct inode *inode) +void __ceph_remove_caps(struct inode *inode) { struct ceph_inode_info *ci = ceph_inode(inode); struct rb_node *p; @@ -2393,6 +2414,12 @@ void ceph_early_kick_flushing_caps(struct ceph_mds_client *mdsc, if ((cap->issued & ci->i_flushing_caps) != ci->i_flushing_caps) { ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH; + /* encode_caps_cb() also will reset these sequence + * numbers. make sure sequence numbers in cap flush + * message match later reconnect message */ + cap->seq = 0; + cap->issue_seq = 0; + cap->mseq = 0; __kick_flushing_caps(mdsc, session, ci, oldest_flush_tid); } else { @@ -3880,12 +3907,10 @@ void ceph_handle_caps(struct ceph_mds_session *session, cap->seq = seq; cap->issue_seq = seq; spin_lock(&session->s_cap_lock); - list_add_tail(&cap->session_caps, - &session->s_cap_releases); - session->s_num_cap_releases++; + __ceph_queue_cap_release(session, cap); spin_unlock(&session->s_cap_lock); } - goto flush_cap_releases; + goto done; } /* these will work even if we don't have a cap yet */ @@ -3955,7 +3980,12 @@ void ceph_handle_caps(struct ceph_mds_session *session, ceph_cap_op_name(op)); } - goto done; +done: + mutex_unlock(&session->s_mutex); +done_unlocked: + iput(inode); + ceph_put_string(extra_info.pool_ns); + return; flush_cap_releases: /* @@ -3963,14 +3993,8 @@ flush_cap_releases: * along for the mds (who clearly thinks we still have this * cap). */ - ceph_send_cap_releases(mdsc, session); - -done: - mutex_unlock(&session->s_mutex); -done_unlocked: - iput(inode); - ceph_put_string(extra_info.pool_ns); - return; + ceph_flush_cap_releases(mdsc, session); + goto done; bad: pr_err("ceph_handle_caps: corrupt message\n"); diff --git a/fs/ceph/debugfs.c b/fs/ceph/debugfs.c index abdf98deeec4..98365e74cb4a 100644 --- a/fs/ceph/debugfs.c +++ b/fs/ceph/debugfs.c @@ -139,23 +139,6 @@ static int caps_show(struct seq_file *s, void *p) return 0; } -static int dentry_lru_show(struct seq_file *s, void *ptr) -{ - struct ceph_fs_client *fsc = s->private; - struct ceph_mds_client *mdsc = fsc->mdsc; - struct ceph_dentry_info *di; - - spin_lock(&mdsc->dentry_lru_lock); - list_for_each_entry(di, &mdsc->dentry_lru, lru) { - struct dentry *dentry = di->dentry; - seq_printf(s, "%p %p\t%pd\n", - di, dentry, dentry); - } - spin_unlock(&mdsc->dentry_lru_lock); - - return 0; -} - static int mds_sessions_show(struct seq_file *s, void *ptr) { struct ceph_fs_client *fsc = s->private; @@ -195,7 +178,6 @@ static int mds_sessions_show(struct seq_file *s, void *ptr) CEPH_DEFINE_SHOW_FUNC(mdsmap_show) CEPH_DEFINE_SHOW_FUNC(mdsc_show) CEPH_DEFINE_SHOW_FUNC(caps_show) -CEPH_DEFINE_SHOW_FUNC(dentry_lru_show) CEPH_DEFINE_SHOW_FUNC(mds_sessions_show) @@ -231,7 +213,6 @@ void ceph_fs_debugfs_cleanup(struct ceph_fs_client *fsc) debugfs_remove(fsc->debugfs_mds_sessions); debugfs_remove(fsc->debugfs_caps); debugfs_remove(fsc->debugfs_mdsc); - debugfs_remove(fsc->debugfs_dentry_lru); } int ceph_fs_debugfs_init(struct ceph_fs_client *fsc) @@ -291,14 +272,6 @@ int ceph_fs_debugfs_init(struct ceph_fs_client *fsc) if (!fsc->debugfs_caps) goto out; - fsc->debugfs_dentry_lru = debugfs_create_file("dentry_lru", - 0400, - fsc->client->debugfs_dir, - fsc, - &dentry_lru_show_fops); - if (!fsc->debugfs_dentry_lru) - goto out; - return 0; out: diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c index 82928cea0209..0637149fb9f9 100644 --- a/fs/ceph/dir.c +++ b/fs/ceph/dir.c @@ -29,6 +29,9 @@ const struct dentry_operations ceph_dentry_ops; +static bool __dentry_lease_is_valid(struct ceph_dentry_info *di); +static int __dir_lease_try_check(const struct dentry *dentry); + /* * Initialize ceph dentry state. */ @@ -44,7 +47,7 @@ static int ceph_d_init(struct dentry *dentry) di->lease_session = NULL; di->time = jiffies; dentry->d_fsdata = di; - ceph_dentry_lru_add(dentry); + INIT_LIST_HEAD(&di->lease_list); return 0; } @@ -241,6 +244,7 @@ static int __dcache_readdir(struct file *file, struct dir_context *ctx, goto out; } if (fpos_cmp(ctx->pos, di->offset) <= 0) { + __ceph_dentry_dir_lease_touch(di); emit_dentry = true; } spin_unlock(&dentry->d_lock); @@ -1125,13 +1129,277 @@ static int ceph_rename(struct inode *old_dir, struct dentry *old_dentry, } /* + * Move dentry to tail of mdsc->dentry_leases list when lease is updated. + * Leases at front of the list will expire first. (Assume all leases have + * similar duration) + * + * Called under dentry->d_lock. + */ +void __ceph_dentry_lease_touch(struct ceph_dentry_info *di) +{ + struct dentry *dn = di->dentry; + struct ceph_mds_client *mdsc; + + dout("dentry_lease_touch %p %p '%pd'\n", di, dn, dn); + + di->flags |= CEPH_DENTRY_LEASE_LIST; + if (di->flags & CEPH_DENTRY_SHRINK_LIST) { + di->flags |= CEPH_DENTRY_REFERENCED; + return; + } + + mdsc = ceph_sb_to_client(dn->d_sb)->mdsc; + spin_lock(&mdsc->dentry_list_lock); + list_move_tail(&di->lease_list, &mdsc->dentry_leases); + spin_unlock(&mdsc->dentry_list_lock); +} + +static void __dentry_dir_lease_touch(struct ceph_mds_client* mdsc, + struct ceph_dentry_info *di) +{ + di->flags &= ~(CEPH_DENTRY_LEASE_LIST | CEPH_DENTRY_REFERENCED); + di->lease_gen = 0; + di->time = jiffies; + list_move_tail(&di->lease_list, &mdsc->dentry_dir_leases); +} + +/* + * When dir lease is used, add dentry to tail of mdsc->dentry_dir_leases + * list if it's not in the list, otherwise set 'referenced' flag. + * + * Called under dentry->d_lock. + */ +void __ceph_dentry_dir_lease_touch(struct ceph_dentry_info *di) +{ + struct dentry *dn = di->dentry; + struct ceph_mds_client *mdsc; + + dout("dentry_dir_lease_touch %p %p '%pd' (offset %lld)\n", + di, dn, dn, di->offset); + + if (!list_empty(&di->lease_list)) { + if (di->flags & CEPH_DENTRY_LEASE_LIST) { + /* don't remove dentry from dentry lease list + * if its lease is valid */ + if (__dentry_lease_is_valid(di)) + return; + } else { + di->flags |= CEPH_DENTRY_REFERENCED; + return; + } + } + + if (di->flags & CEPH_DENTRY_SHRINK_LIST) { + di->flags |= CEPH_DENTRY_REFERENCED; + di->flags &= ~CEPH_DENTRY_LEASE_LIST; + return; + } + + mdsc = ceph_sb_to_client(dn->d_sb)->mdsc; + spin_lock(&mdsc->dentry_list_lock); + __dentry_dir_lease_touch(mdsc, di), + spin_unlock(&mdsc->dentry_list_lock); +} + +static void __dentry_lease_unlist(struct ceph_dentry_info *di) +{ + struct ceph_mds_client *mdsc; + if (di->flags & CEPH_DENTRY_SHRINK_LIST) + return; + if (list_empty(&di->lease_list)) + return; + + mdsc = ceph_sb_to_client(di->dentry->d_sb)->mdsc; + spin_lock(&mdsc->dentry_list_lock); + list_del_init(&di->lease_list); + spin_unlock(&mdsc->dentry_list_lock); +} + +enum { + KEEP = 0, + DELETE = 1, + TOUCH = 2, + STOP = 4, +}; + +struct ceph_lease_walk_control { + bool dir_lease; + bool expire_dir_lease; + unsigned long nr_to_scan; + unsigned long dir_lease_ttl; +}; + +static unsigned long +__dentry_leases_walk(struct ceph_mds_client *mdsc, + struct ceph_lease_walk_control *lwc, + int (*check)(struct dentry*, void*)) +{ + struct ceph_dentry_info *di, *tmp; + struct dentry *dentry, *last = NULL; + struct list_head* list; + LIST_HEAD(dispose); + unsigned long freed = 0; + int ret = 0; + + list = lwc->dir_lease ? &mdsc->dentry_dir_leases : &mdsc->dentry_leases; + spin_lock(&mdsc->dentry_list_lock); + list_for_each_entry_safe(di, tmp, list, lease_list) { + if (!lwc->nr_to_scan) + break; + --lwc->nr_to_scan; + + dentry = di->dentry; + if (last == dentry) + break; + + if (!spin_trylock(&dentry->d_lock)) + continue; + + if (dentry->d_lockref.count < 0) { + list_del_init(&di->lease_list); + goto next; + } + + ret = check(dentry, lwc); + if (ret & TOUCH) { + /* move it into tail of dir lease list */ + __dentry_dir_lease_touch(mdsc, di); + if (!last) + last = dentry; + } + if (ret & DELETE) { + /* stale lease */ + di->flags &= ~CEPH_DENTRY_REFERENCED; + if (dentry->d_lockref.count > 0) { + /* update_dentry_lease() will re-add + * it to lease list, or + * ceph_d_delete() will return 1 when + * last reference is dropped */ + list_del_init(&di->lease_list); + } else { + di->flags |= CEPH_DENTRY_SHRINK_LIST; + list_move_tail(&di->lease_list, &dispose); + dget_dlock(dentry); + } + } +next: + spin_unlock(&dentry->d_lock); + if (ret & STOP) + break; + } + spin_unlock(&mdsc->dentry_list_lock); + + while (!list_empty(&dispose)) { + di = list_first_entry(&dispose, struct ceph_dentry_info, + lease_list); + dentry = di->dentry; + spin_lock(&dentry->d_lock); + + list_del_init(&di->lease_list); + di->flags &= ~CEPH_DENTRY_SHRINK_LIST; + if (di->flags & CEPH_DENTRY_REFERENCED) { + spin_lock(&mdsc->dentry_list_lock); + if (di->flags & CEPH_DENTRY_LEASE_LIST) { + list_add_tail(&di->lease_list, + &mdsc->dentry_leases); + } else { + __dentry_dir_lease_touch(mdsc, di); + } + spin_unlock(&mdsc->dentry_list_lock); + } else { + freed++; + } + + spin_unlock(&dentry->d_lock); + /* ceph_d_delete() does the trick */ + dput(dentry); + } + return freed; +} + +static int __dentry_lease_check(struct dentry *dentry, void *arg) +{ + struct ceph_dentry_info *di = ceph_dentry(dentry); + int ret; + + if (__dentry_lease_is_valid(di)) + return STOP; + ret = __dir_lease_try_check(dentry); + if (ret == -EBUSY) + return KEEP; + if (ret > 0) + return TOUCH; + return DELETE; +} + +static int __dir_lease_check(struct dentry *dentry, void *arg) +{ + struct ceph_lease_walk_control *lwc = arg; + struct ceph_dentry_info *di = ceph_dentry(dentry); + + int ret = __dir_lease_try_check(dentry); + if (ret == -EBUSY) + return KEEP; + if (ret > 0) { + if (time_before(jiffies, di->time + lwc->dir_lease_ttl)) + return STOP; + /* Move dentry to tail of dir lease list if we don't want + * to delete it. So dentries in the list are checked in a + * round robin manner */ + if (!lwc->expire_dir_lease) + return TOUCH; + if (dentry->d_lockref.count > 0 || + (di->flags & CEPH_DENTRY_REFERENCED)) + return TOUCH; + /* invalidate dir lease */ + di->lease_shared_gen = 0; + } + return DELETE; +} + +int ceph_trim_dentries(struct ceph_mds_client *mdsc) +{ + struct ceph_lease_walk_control lwc; + unsigned long count; + unsigned long freed; + + spin_lock(&mdsc->caps_list_lock); + if (mdsc->caps_use_max > 0 && + mdsc->caps_use_count > mdsc->caps_use_max) + count = mdsc->caps_use_count - mdsc->caps_use_max; + else + count = 0; + spin_unlock(&mdsc->caps_list_lock); + + lwc.dir_lease = false; + lwc.nr_to_scan = CEPH_CAPS_PER_RELEASE * 2; + freed = __dentry_leases_walk(mdsc, &lwc, __dentry_lease_check); + if (!lwc.nr_to_scan) /* more invalid leases */ + return -EAGAIN; + + if (lwc.nr_to_scan < CEPH_CAPS_PER_RELEASE) + lwc.nr_to_scan = CEPH_CAPS_PER_RELEASE; + + lwc.dir_lease = true; + lwc.expire_dir_lease = freed < count; + lwc.dir_lease_ttl = mdsc->fsc->mount_options->caps_wanted_delay_max * HZ; + freed +=__dentry_leases_walk(mdsc, &lwc, __dir_lease_check); + if (!lwc.nr_to_scan) /* more to check */ + return -EAGAIN; + + return freed > 0 ? 1 : 0; +} + +/* * Ensure a dentry lease will no longer revalidate. */ void ceph_invalidate_dentry_lease(struct dentry *dentry) { + struct ceph_dentry_info *di = ceph_dentry(dentry); spin_lock(&dentry->d_lock); - ceph_dentry(dentry)->time = jiffies; - ceph_dentry(dentry)->lease_shared_gen = 0; + di->time = jiffies; + di->lease_shared_gen = 0; + __dentry_lease_unlist(di); spin_unlock(&dentry->d_lock); } @@ -1139,45 +1407,59 @@ void ceph_invalidate_dentry_lease(struct dentry *dentry) * Check if dentry lease is valid. If not, delete the lease. Try to * renew if the least is more than half up. */ +static bool __dentry_lease_is_valid(struct ceph_dentry_info *di) +{ + struct ceph_mds_session *session; + + if (!di->lease_gen) + return false; + + session = di->lease_session; + if (session) { + u32 gen; + unsigned long ttl; + + spin_lock(&session->s_gen_ttl_lock); + gen = session->s_cap_gen; + ttl = session->s_cap_ttl; + spin_unlock(&session->s_gen_ttl_lock); + + if (di->lease_gen == gen && + time_before(jiffies, ttl) && + time_before(jiffies, di->time)) + return true; + } + di->lease_gen = 0; + return false; +} + static int dentry_lease_is_valid(struct dentry *dentry, unsigned int flags, struct inode *dir) { struct ceph_dentry_info *di; - struct ceph_mds_session *s; - int valid = 0; - u32 gen; - unsigned long ttl; struct ceph_mds_session *session = NULL; u32 seq = 0; + int valid = 0; spin_lock(&dentry->d_lock); di = ceph_dentry(dentry); - if (di && di->lease_session) { - s = di->lease_session; - spin_lock(&s->s_gen_ttl_lock); - gen = s->s_cap_gen; - ttl = s->s_cap_ttl; - spin_unlock(&s->s_gen_ttl_lock); + if (di && __dentry_lease_is_valid(di)) { + valid = 1; - if (di->lease_gen == gen && - time_before(jiffies, di->time) && - time_before(jiffies, ttl)) { - valid = 1; - if (di->lease_renew_after && - time_after(jiffies, di->lease_renew_after)) { - /* - * We should renew. If we're in RCU walk mode - * though, we can't do that so just return - * -ECHILD. - */ - if (flags & LOOKUP_RCU) { - valid = -ECHILD; - } else { - session = ceph_get_mds_session(s); - seq = di->lease_seq; - di->lease_renew_after = 0; - di->lease_renew_from = jiffies; - } + if (di->lease_renew_after && + time_after(jiffies, di->lease_renew_after)) { + /* + * We should renew. If we're in RCU walk mode + * though, we can't do that so just return + * -ECHILD. + */ + if (flags & LOOKUP_RCU) { + valid = -ECHILD; + } else { + session = ceph_get_mds_session(di->lease_session); + seq = di->lease_seq; + di->lease_renew_after = 0; + di->lease_renew_from = jiffies; } } } @@ -1193,6 +1475,38 @@ static int dentry_lease_is_valid(struct dentry *dentry, unsigned int flags, } /* + * Called under dentry->d_lock. + */ +static int __dir_lease_try_check(const struct dentry *dentry) +{ + struct ceph_dentry_info *di = ceph_dentry(dentry); + struct inode *dir; + struct ceph_inode_info *ci; + int valid = 0; + + if (!di->lease_shared_gen) + return 0; + if (IS_ROOT(dentry)) + return 0; + + dir = d_inode(dentry->d_parent); + ci = ceph_inode(dir); + + if (spin_trylock(&ci->i_ceph_lock)) { + if (atomic_read(&ci->i_shared_gen) == di->lease_shared_gen && + __ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 0)) + valid = 1; + spin_unlock(&ci->i_ceph_lock); + } else { + valid = -EBUSY; + } + + if (!valid) + di->lease_shared_gen = 0; + return valid; +} + +/* * Check if directory-wide content lease/cap is valid. */ static int dir_lease_is_valid(struct inode *dir, struct dentry *dentry) @@ -1205,6 +1519,8 @@ static int dir_lease_is_valid(struct inode *dir, struct dentry *dentry) if (atomic_read(&ci->i_shared_gen) == di->lease_shared_gen) valid = __ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1); spin_unlock(&ci->i_ceph_lock); + if (valid) + __ceph_dentry_dir_lease_touch(di); dout("dir_lease_is_valid dir %p v%u dentry %p v%u = %d\n", dir, (unsigned)atomic_read(&ci->i_shared_gen), dentry, (unsigned)di->lease_shared_gen, valid); @@ -1297,11 +1613,8 @@ static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags) } dout("d_revalidate %p %s\n", dentry, valid ? "valid" : "invalid"); - if (valid) { - ceph_dentry_lru_touch(dentry); - } else { + if (!valid) ceph_dir_clear_complete(dir); - } if (!(flags & LOOKUP_RCU)) dput(parent); @@ -1309,6 +1622,31 @@ static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags) } /* + * Delete unused dentry that doesn't have valid lease + * + * Called under dentry->d_lock. + */ +static int ceph_d_delete(const struct dentry *dentry) +{ + struct ceph_dentry_info *di; + + /* won't release caps */ + if (d_really_is_negative(dentry)) + return 0; + if (ceph_snap(d_inode(dentry)) != CEPH_NOSNAP) + return 0; + /* vaild lease? */ + di = ceph_dentry(dentry); + if (di) { + if (__dentry_lease_is_valid(di)) + return 0; + if (__dir_lease_try_check(dentry)) + return 0; + } + return 1; +} + +/* * Release our ceph_dentry_info. */ static void ceph_d_release(struct dentry *dentry) @@ -1316,9 +1654,9 @@ static void ceph_d_release(struct dentry *dentry) struct ceph_dentry_info *di = ceph_dentry(dentry); dout("d_release %p\n", dentry); - ceph_dentry_lru_del(dentry); spin_lock(&dentry->d_lock); + __dentry_lease_unlist(di); dentry->d_fsdata = NULL; spin_unlock(&dentry->d_lock); @@ -1419,49 +1757,7 @@ static ssize_t ceph_read_dir(struct file *file, char __user *buf, size_t size, return size - left; } -/* - * We maintain a private dentry LRU. - * - * FIXME: this needs to be changed to a per-mds lru to be useful. - */ -void ceph_dentry_lru_add(struct dentry *dn) -{ - struct ceph_dentry_info *di = ceph_dentry(dn); - struct ceph_mds_client *mdsc; - - dout("dentry_lru_add %p %p '%pd'\n", di, dn, dn); - mdsc = ceph_sb_to_client(dn->d_sb)->mdsc; - spin_lock(&mdsc->dentry_lru_lock); - list_add_tail(&di->lru, &mdsc->dentry_lru); - mdsc->num_dentry++; - spin_unlock(&mdsc->dentry_lru_lock); -} - -void ceph_dentry_lru_touch(struct dentry *dn) -{ - struct ceph_dentry_info *di = ceph_dentry(dn); - struct ceph_mds_client *mdsc; - - dout("dentry_lru_touch %p %p '%pd' (offset %lld)\n", di, dn, dn, - di->offset); - mdsc = ceph_sb_to_client(dn->d_sb)->mdsc; - spin_lock(&mdsc->dentry_lru_lock); - list_move_tail(&di->lru, &mdsc->dentry_lru); - spin_unlock(&mdsc->dentry_lru_lock); -} -void ceph_dentry_lru_del(struct dentry *dn) -{ - struct ceph_dentry_info *di = ceph_dentry(dn); - struct ceph_mds_client *mdsc; - - dout("dentry_lru_del %p %p '%pd'\n", di, dn, dn); - mdsc = ceph_sb_to_client(dn->d_sb)->mdsc; - spin_lock(&mdsc->dentry_lru_lock); - list_del_init(&di->lru); - mdsc->num_dentry--; - spin_unlock(&mdsc->dentry_lru_lock); -} /* * Return name hash for a given dentry. This is dependent on @@ -1470,6 +1766,7 @@ void ceph_dentry_lru_del(struct dentry *dn) unsigned ceph_dentry_hash(struct inode *dir, struct dentry *dn) { struct ceph_inode_info *dci = ceph_inode(dir); + unsigned hash; switch (dci->i_dir_layout.dl_dir_hash) { case 0: /* for backward compat */ @@ -1477,8 +1774,11 @@ unsigned ceph_dentry_hash(struct inode *dir, struct dentry *dn) return dn->d_name.hash; default: - return ceph_str_hash(dci->i_dir_layout.dl_dir_hash, + spin_lock(&dn->d_lock); + hash = ceph_str_hash(dci->i_dir_layout.dl_dir_hash, dn->d_name.name, dn->d_name.len); + spin_unlock(&dn->d_lock); + return hash; } } @@ -1531,6 +1831,7 @@ const struct inode_operations ceph_snapdir_iops = { const struct dentry_operations ceph_dentry_ops = { .d_revalidate = ceph_d_revalidate, + .d_delete = ceph_d_delete, .d_release = ceph_d_release, .d_prune = ceph_d_prune, .d_init = ceph_d_init, diff --git a/fs/ceph/file.c b/fs/ceph/file.c index 189df668b6a0..9f53c3d99304 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -590,7 +590,8 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to, * but it will at least behave sensibly when they are * in sequence. */ - ret = filemap_write_and_wait_range(inode->i_mapping, off, off + len); + ret = filemap_write_and_wait_range(inode->i_mapping, + off, off + len - 1); if (ret < 0) return ret; @@ -929,14 +930,15 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter, (write ? "write" : "read"), file, pos, (unsigned)count, snapc, snapc->seq); - ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + count); + ret = filemap_write_and_wait_range(inode->i_mapping, + pos, pos + count - 1); if (ret < 0) return ret; if (write) { int ret2 = invalidate_inode_pages2_range(inode->i_mapping, pos >> PAGE_SHIFT, - (pos + count) >> PAGE_SHIFT); + (pos + count - 1) >> PAGE_SHIFT); if (ret2 < 0) dout("invalidate_inode_pages2_range returned %d\n", ret2); @@ -1132,13 +1134,14 @@ ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos, dout("sync_write on file %p %lld~%u snapc %p seq %lld\n", file, pos, (unsigned)count, snapc, snapc->seq); - ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + count); + ret = filemap_write_and_wait_range(inode->i_mapping, + pos, pos + count - 1); if (ret < 0) return ret; ret = invalidate_inode_pages2_range(inode->i_mapping, pos >> PAGE_SHIFT, - (pos + count) >> PAGE_SHIFT); + (pos + count - 1) >> PAGE_SHIFT); if (ret < 0) dout("invalidate_inode_pages2_range returned %d\n", ret); diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index 9d1f34d46627..c2feb310ac1e 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -497,7 +497,7 @@ struct inode *ceph_alloc_inode(struct super_block *sb) ci->i_wrbuffer_ref = 0; ci->i_wrbuffer_ref_head = 0; atomic_set(&ci->i_filelock_ref, 0); - atomic_set(&ci->i_shared_gen, 0); + atomic_set(&ci->i_shared_gen, 1); ci->i_rdcache_gen = 0; ci->i_rdcache_revoking = 0; @@ -524,6 +524,7 @@ static void ceph_i_callback(struct rcu_head *head) struct inode *inode = container_of(head, struct inode, i_rcu); struct ceph_inode_info *ci = ceph_inode(inode); + kfree(ci->i_symlink); kmem_cache_free(ceph_inode_cachep, ci); } @@ -537,7 +538,7 @@ void ceph_destroy_inode(struct inode *inode) ceph_fscache_unregister_inode_cookie(ci); - ceph_queue_caps_release(inode); + __ceph_remove_caps(inode); if (__ceph_has_any_quota(ci)) ceph_adjust_quota_realms_count(inode, false); @@ -548,20 +549,24 @@ void ceph_destroy_inode(struct inode *inode) */ if (ci->i_snap_realm) { struct ceph_mds_client *mdsc = - ceph_sb_to_client(ci->vfs_inode.i_sb)->mdsc; - struct ceph_snap_realm *realm = ci->i_snap_realm; - - dout(" dropping residual ref to snap realm %p\n", realm); - spin_lock(&realm->inodes_with_caps_lock); - list_del_init(&ci->i_snap_realm_item); - ci->i_snap_realm = NULL; - if (realm->ino == ci->i_vino.ino) - realm->inode = NULL; - spin_unlock(&realm->inodes_with_caps_lock); - ceph_put_snap_realm(mdsc, realm); + ceph_inode_to_client(inode)->mdsc; + if (ceph_snap(inode) == CEPH_NOSNAP) { + struct ceph_snap_realm *realm = ci->i_snap_realm; + dout(" dropping residual ref to snap realm %p\n", + realm); + spin_lock(&realm->inodes_with_caps_lock); + list_del_init(&ci->i_snap_realm_item); + ci->i_snap_realm = NULL; + if (realm->ino == ci->i_vino.ino) + realm->inode = NULL; + spin_unlock(&realm->inodes_with_caps_lock); + ceph_put_snap_realm(mdsc, realm); + } else { + ceph_put_snapid_map(mdsc, ci->i_snapid_map); + ci->i_snap_realm = NULL; + } } - kfree(ci->i_symlink); while ((n = rb_first(&ci->i_fragtree)) != NULL) { frag = rb_entry(n, struct ceph_inode_frag, node); rb_erase(n, &ci->i_fragtree); @@ -776,6 +781,9 @@ static int fill_inode(struct inode *inode, struct page *locked_page, pool_ns = ceph_find_or_create_string(iinfo->pool_ns_data, iinfo->pool_ns_len); + if (ceph_snap(inode) != CEPH_NOSNAP && !ci->i_snapid_map) + ci->i_snapid_map = ceph_get_snapid_map(mdsc, ceph_snap(inode)); + spin_lock(&ci->i_ceph_lock); /* @@ -869,6 +877,7 @@ static int fill_inode(struct inode *inode, struct page *locked_page, ci->i_rbytes = le64_to_cpu(info->rbytes); ci->i_rfiles = le64_to_cpu(info->rfiles); ci->i_rsubdirs = le64_to_cpu(info->rsubdirs); + ci->i_dir_pin = iinfo->dir_pin; ceph_decode_timespec64(&ci->i_rctime, &info->rctime); } } @@ -899,6 +908,7 @@ static int fill_inode(struct inode *inode, struct page *locked_page, case S_IFBLK: case S_IFCHR: case S_IFSOCK: + inode->i_blkbits = PAGE_SHIFT; init_special_inode(inode, inode->i_mode, inode->i_rdev); inode->i_op = &ceph_file_iops; break; @@ -1066,9 +1076,10 @@ static void update_dentry_lease(struct dentry *dentry, goto out_unlock; di->lease_shared_gen = atomic_read(&ceph_inode(dir)->i_shared_gen); - - if (duration == 0) + if (duration == 0) { + __ceph_dentry_dir_lease_touch(di); goto out_unlock; + } if (di->lease_gen == session->s_cap_gen && time_before(ttl, di->time)) @@ -1079,8 +1090,6 @@ static void update_dentry_lease(struct dentry *dentry, di->lease_session = NULL; } - ceph_dentry_lru_touch(dentry); - if (!di->lease_session) di->lease_session = ceph_get_mds_session(session); di->lease_gen = session->s_cap_gen; @@ -1088,6 +1097,8 @@ static void update_dentry_lease(struct dentry *dentry, di->lease_renew_after = half_ttl; di->lease_renew_from = 0; di->time = ttl; + + __ceph_dentry_lease_touch(di); out_unlock: spin_unlock(&dentry->d_lock); if (old_lease_session) @@ -1152,6 +1163,19 @@ static int splice_dentry(struct dentry **pdn, struct inode *in) return 0; } +static int d_name_cmp(struct dentry *dentry, const char *name, size_t len) +{ + int ret; + + /* take d_lock to ensure dentry->d_name stability */ + spin_lock(&dentry->d_lock); + ret = dentry->d_name.len - len; + if (!ret) + ret = memcmp(dentry->d_name.name, name, len); + spin_unlock(&dentry->d_lock); + return ret; +} + /* * Incorporate results into the local cache. This is either just * one inode, or a directory, dentry, and possibly linked-to inode (e.g., @@ -1401,7 +1425,8 @@ retry_lookup: err = splice_dentry(&req->r_dentry, in); if (err < 0) goto done; - } else if (rinfo->head->is_dentry) { + } else if (rinfo->head->is_dentry && + !d_name_cmp(req->r_dentry, rinfo->dname, rinfo->dname_len)) { struct ceph_vino *ptvino = NULL; if ((le32_to_cpu(rinfo->diri.in->cap.caps) & CEPH_CAP_FILE_SHARED) || @@ -2259,10 +2284,11 @@ int ceph_getattr(const struct path *path, struct kstat *stat, if (!err) { generic_fillattr(inode, stat); stat->ino = ceph_translate_ino(inode->i_sb, inode->i_ino); - if (ceph_snap(inode) != CEPH_NOSNAP) - stat->dev = ceph_snap(inode); + if (ceph_snap(inode) == CEPH_NOSNAP) + stat->dev = inode->i_sb->s_dev; else - stat->dev = 0; + stat->dev = ci->i_snapid_map ? ci->i_snapid_map->dev : 0; + if (S_ISDIR(inode->i_mode)) { if (ceph_test_mount_opt(ceph_sb_to_client(inode->i_sb), RBYTES)) diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 163fc74bf221..9049c2a3e972 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -20,6 +20,8 @@ #include <linux/ceph/auth.h> #include <linux/ceph/debugfs.h> +#define RECONNECT_MAX_SIZE (INT_MAX - PAGE_SIZE) + /* * A cluster of MDS (metadata server) daemons is responsible for * managing the file system namespace (the directory hierarchy and @@ -46,13 +48,17 @@ */ struct ceph_reconnect_state { - int nr_caps; + struct ceph_mds_session *session; + int nr_caps, nr_realms; struct ceph_pagelist *pagelist; unsigned msg_version; + bool allow_multi; }; static void __wake_requests(struct ceph_mds_client *mdsc, struct list_head *head); +static void ceph_cap_release_work(struct work_struct *work); +static void ceph_cap_reclaim_work(struct work_struct *work); static const struct ceph_connection_operations mds_con_ops; @@ -61,6 +67,29 @@ static const struct ceph_connection_operations mds_con_ops; * mds reply parsing */ +static int parse_reply_info_quota(void **p, void *end, + struct ceph_mds_reply_info_in *info) +{ + u8 struct_v, struct_compat; + u32 struct_len; + + ceph_decode_8_safe(p, end, struct_v, bad); + ceph_decode_8_safe(p, end, struct_compat, bad); + /* struct_v is expected to be >= 1. we only + * understand encoding with struct_compat == 1. */ + if (!struct_v || struct_compat != 1) + goto bad; + ceph_decode_32_safe(p, end, struct_len, bad); + ceph_decode_need(p, end, struct_len, bad); + end = *p + struct_len; + ceph_decode_64_safe(p, end, info->max_bytes, bad); + ceph_decode_64_safe(p, end, info->max_files, bad); + *p = end; + return 0; +bad: + return -EIO; +} + /* * parse individual inode info */ @@ -68,8 +97,24 @@ static int parse_reply_info_in(void **p, void *end, struct ceph_mds_reply_info_in *info, u64 features) { - int err = -EIO; + int err = 0; + u8 struct_v = 0; + if (features == (u64)-1) { + u32 struct_len; + u8 struct_compat; + ceph_decode_8_safe(p, end, struct_v, bad); + ceph_decode_8_safe(p, end, struct_compat, bad); + /* struct_v is expected to be >= 1. we only understand + * encoding with struct_compat == 1. */ + if (!struct_v || struct_compat != 1) + goto bad; + ceph_decode_32_safe(p, end, struct_len, bad); + ceph_decode_need(p, end, struct_len, bad); + end = *p + struct_len; + } + + ceph_decode_need(p, end, sizeof(struct ceph_mds_reply_inode), bad); info->in = *p; *p += sizeof(struct ceph_mds_reply_inode) + sizeof(*info->in->fragtree.splits) * @@ -87,49 +132,136 @@ static int parse_reply_info_in(void **p, void *end, info->xattr_data = *p; *p += info->xattr_len; - if (features & CEPH_FEATURE_MDS_INLINE_DATA) { + if (features == (u64)-1) { + /* inline data */ ceph_decode_64_safe(p, end, info->inline_version, bad); ceph_decode_32_safe(p, end, info->inline_len, bad); ceph_decode_need(p, end, info->inline_len, bad); info->inline_data = *p; *p += info->inline_len; - } else - info->inline_version = CEPH_INLINE_NONE; + /* quota */ + err = parse_reply_info_quota(p, end, info); + if (err < 0) + goto out_bad; + /* pool namespace */ + ceph_decode_32_safe(p, end, info->pool_ns_len, bad); + if (info->pool_ns_len > 0) { + ceph_decode_need(p, end, info->pool_ns_len, bad); + info->pool_ns_data = *p; + *p += info->pool_ns_len; + } + /* btime, change_attr */ + { + struct ceph_timespec btime; + u64 change_attr; + ceph_decode_need(p, end, sizeof(btime), bad); + ceph_decode_copy(p, &btime, sizeof(btime)); + ceph_decode_64_safe(p, end, change_attr, bad); + } + + /* dir pin */ + if (struct_v >= 2) { + ceph_decode_32_safe(p, end, info->dir_pin, bad); + } else { + info->dir_pin = -ENODATA; + } + + *p = end; + } else { + if (features & CEPH_FEATURE_MDS_INLINE_DATA) { + ceph_decode_64_safe(p, end, info->inline_version, bad); + ceph_decode_32_safe(p, end, info->inline_len, bad); + ceph_decode_need(p, end, info->inline_len, bad); + info->inline_data = *p; + *p += info->inline_len; + } else + info->inline_version = CEPH_INLINE_NONE; + + if (features & CEPH_FEATURE_MDS_QUOTA) { + err = parse_reply_info_quota(p, end, info); + if (err < 0) + goto out_bad; + } else { + info->max_bytes = 0; + info->max_files = 0; + } + + info->pool_ns_len = 0; + info->pool_ns_data = NULL; + if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) { + ceph_decode_32_safe(p, end, info->pool_ns_len, bad); + if (info->pool_ns_len > 0) { + ceph_decode_need(p, end, info->pool_ns_len, bad); + info->pool_ns_data = *p; + *p += info->pool_ns_len; + } + } + + info->dir_pin = -ENODATA; + } + return 0; +bad: + err = -EIO; +out_bad: + return err; +} - if (features & CEPH_FEATURE_MDS_QUOTA) { +static int parse_reply_info_dir(void **p, void *end, + struct ceph_mds_reply_dirfrag **dirfrag, + u64 features) +{ + if (features == (u64)-1) { u8 struct_v, struct_compat; u32 struct_len; - - /* - * both struct_v and struct_compat are expected to be >= 1 - */ ceph_decode_8_safe(p, end, struct_v, bad); ceph_decode_8_safe(p, end, struct_compat, bad); - if (!struct_v || !struct_compat) + /* struct_v is expected to be >= 1. we only understand + * encoding whose struct_compat == 1. */ + if (!struct_v || struct_compat != 1) goto bad; ceph_decode_32_safe(p, end, struct_len, bad); ceph_decode_need(p, end, struct_len, bad); - ceph_decode_64_safe(p, end, info->max_bytes, bad); - ceph_decode_64_safe(p, end, info->max_files, bad); - } else { - info->max_bytes = 0; - info->max_files = 0; + end = *p + struct_len; } - info->pool_ns_len = 0; - info->pool_ns_data = NULL; - if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) { - ceph_decode_32_safe(p, end, info->pool_ns_len, bad); - if (info->pool_ns_len > 0) { - ceph_decode_need(p, end, info->pool_ns_len, bad); - info->pool_ns_data = *p; - *p += info->pool_ns_len; - } + ceph_decode_need(p, end, sizeof(**dirfrag), bad); + *dirfrag = *p; + *p += sizeof(**dirfrag) + sizeof(u32) * le32_to_cpu((*dirfrag)->ndist); + if (unlikely(*p > end)) + goto bad; + if (features == (u64)-1) + *p = end; + return 0; +bad: + return -EIO; +} + +static int parse_reply_info_lease(void **p, void *end, + struct ceph_mds_reply_lease **lease, + u64 features) +{ + if (features == (u64)-1) { + u8 struct_v, struct_compat; + u32 struct_len; + ceph_decode_8_safe(p, end, struct_v, bad); + ceph_decode_8_safe(p, end, struct_compat, bad); + /* struct_v is expected to be >= 1. we only understand + * encoding whose struct_compat == 1. */ + if (!struct_v || struct_compat != 1) + goto bad; + ceph_decode_32_safe(p, end, struct_len, bad); + ceph_decode_need(p, end, struct_len, bad); + end = *p + struct_len; } + ceph_decode_need(p, end, sizeof(**lease), bad); + *lease = *p; + *p += sizeof(**lease); + if (features == (u64)-1) + *p = end; return 0; bad: - return err; + return -EIO; } /* @@ -147,20 +279,18 @@ static int parse_reply_info_trace(void **p, void *end, if (err < 0) goto out_bad; - if (unlikely(*p + sizeof(*info->dirfrag) > end)) - goto bad; - info->dirfrag = *p; - *p += sizeof(*info->dirfrag) + - sizeof(u32)*le32_to_cpu(info->dirfrag->ndist); - if (unlikely(*p > end)) - goto bad; + err = parse_reply_info_dir(p, end, &info->dirfrag, features); + if (err < 0) + goto out_bad; ceph_decode_32_safe(p, end, info->dname_len, bad); ceph_decode_need(p, end, info->dname_len, bad); info->dname = *p; *p += info->dname_len; - info->dlease = *p; - *p += sizeof(*info->dlease); + + err = parse_reply_info_lease(p, end, &info->dlease, features); + if (err < 0) + goto out_bad; } if (info->head->is_target) { @@ -183,20 +313,16 @@ out_bad: /* * parse readdir results */ -static int parse_reply_info_dir(void **p, void *end, +static int parse_reply_info_readdir(void **p, void *end, struct ceph_mds_reply_info_parsed *info, u64 features) { u32 num, i = 0; int err; - info->dir_dir = *p; - if (*p + sizeof(*info->dir_dir) > end) - goto bad; - *p += sizeof(*info->dir_dir) + - sizeof(u32)*le32_to_cpu(info->dir_dir->ndist); - if (*p > end) - goto bad; + err = parse_reply_info_dir(p, end, &info->dir_dir, features); + if (err < 0) + goto out_bad; ceph_decode_need(p, end, sizeof(num) + 2, bad); num = ceph_decode_32(p); @@ -222,15 +348,16 @@ static int parse_reply_info_dir(void **p, void *end, while (num) { struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i; /* dentry */ - ceph_decode_need(p, end, sizeof(u32)*2, bad); - rde->name_len = ceph_decode_32(p); + ceph_decode_32_safe(p, end, rde->name_len, bad); ceph_decode_need(p, end, rde->name_len, bad); rde->name = *p; *p += rde->name_len; dout("parsed dir dname '%.*s'\n", rde->name_len, rde->name); - rde->lease = *p; - *p += sizeof(struct ceph_mds_reply_lease); + /* dentry lease */ + err = parse_reply_info_lease(p, end, &rde->lease, features); + if (err) + goto out_bad; /* inode */ err = parse_reply_info_in(p, end, &rde->inode, features); if (err < 0) @@ -281,7 +408,8 @@ static int parse_reply_info_create(void **p, void *end, struct ceph_mds_reply_info_parsed *info, u64 features) { - if (features & CEPH_FEATURE_REPLY_CREATE_INODE) { + if (features == (u64)-1 || + (features & CEPH_FEATURE_REPLY_CREATE_INODE)) { if (*p == end) { info->has_create_ino = false; } else { @@ -310,7 +438,7 @@ static int parse_reply_info_extra(void **p, void *end, if (op == CEPH_MDS_OP_GETFILELOCK) return parse_reply_info_filelock(p, end, info, features); else if (op == CEPH_MDS_OP_READDIR || op == CEPH_MDS_OP_LSSNAP) - return parse_reply_info_dir(p, end, info, features); + return parse_reply_info_readdir(p, end, info, features); else if (op == CEPH_MDS_OP_CREATE) return parse_reply_info_create(p, end, info, features); else @@ -494,7 +622,7 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc, ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr); spin_lock_init(&s->s_gen_ttl_lock); - s->s_cap_gen = 0; + s->s_cap_gen = 1; s->s_cap_ttl = jiffies - 1; spin_lock_init(&s->s_cap_lock); @@ -510,6 +638,8 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc, s->s_cap_reconnect = 0; s->s_cap_iterator = NULL; INIT_LIST_HEAD(&s->s_cap_releases); + INIT_WORK(&s->s_cap_release_work, ceph_cap_release_work); + INIT_LIST_HEAD(&s->s_cap_flushing); mdsc->sessions[mds] = s; @@ -535,6 +665,7 @@ static void __unregister_session(struct ceph_mds_client *mdsc, dout("__unregister_session mds%d %p\n", s->s_mds, s); BUG_ON(mdsc->sessions[s->s_mds] != s); mdsc->sessions[s->s_mds] = NULL; + s->s_state = 0; ceph_con_close(&s->s_con); ceph_put_mds_session(s); atomic_dec(&mdsc->num_sessions); @@ -1197,13 +1328,10 @@ static int iterate_session_caps(struct ceph_mds_session *session, cap->session = NULL; list_del_init(&cap->session_caps); session->s_nr_caps--; - if (cap->queue_release) { - list_add_tail(&cap->session_caps, - &session->s_cap_releases); - session->s_num_cap_releases++; - } else { + if (cap->queue_release) + __ceph_queue_cap_release(session, cap); + else old_cap = cap; /* put_cap it w/o locks held */ - } } if (ret < 0) goto out; @@ -1286,6 +1414,15 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap, list_add(&ci->i_prealloc_cap_flush->i_list, &to_remove); ci->i_prealloc_cap_flush = NULL; } + + if (drop && + ci->i_wrbuffer_ref_head == 0 && + ci->i_wr_ref == 0 && + ci->i_dirty_caps == 0 && + ci->i_flushing_caps == 0) { + ceph_put_snap_context(ci->i_head_snapc); + ci->i_head_snapc = NULL; + } } spin_unlock(&ci->i_ceph_lock); while (!list_empty(&to_remove)) { @@ -1638,7 +1775,7 @@ int ceph_trim_caps(struct ceph_mds_client *mdsc, session->s_trim_caps = 0; } - ceph_send_cap_releases(mdsc, session); + ceph_flush_cap_releases(mdsc, session); return 0; } @@ -1681,8 +1818,8 @@ static void wait_caps_flush(struct ceph_mds_client *mdsc, /* * called under s_mutex */ -void ceph_send_cap_releases(struct ceph_mds_client *mdsc, - struct ceph_mds_session *session) +static void ceph_send_cap_releases(struct ceph_mds_client *mdsc, + struct ceph_mds_session *session) { struct ceph_msg *msg = NULL; struct ceph_mds_cap_release *head; @@ -1774,6 +1911,81 @@ out_err: spin_unlock(&session->s_cap_lock); } +static void ceph_cap_release_work(struct work_struct *work) +{ + struct ceph_mds_session *session = + container_of(work, struct ceph_mds_session, s_cap_release_work); + + mutex_lock(&session->s_mutex); + if (session->s_state == CEPH_MDS_SESSION_OPEN || + session->s_state == CEPH_MDS_SESSION_HUNG) + ceph_send_cap_releases(session->s_mdsc, session); + mutex_unlock(&session->s_mutex); + ceph_put_mds_session(session); +} + +void ceph_flush_cap_releases(struct ceph_mds_client *mdsc, + struct ceph_mds_session *session) +{ + if (mdsc->stopping) + return; + + get_session(session); + if (queue_work(mdsc->fsc->cap_wq, + &session->s_cap_release_work)) { + dout("cap release work queued\n"); + } else { + ceph_put_mds_session(session); + dout("failed to queue cap release work\n"); + } +} + +/* + * caller holds session->s_cap_lock + */ +void __ceph_queue_cap_release(struct ceph_mds_session *session, + struct ceph_cap *cap) +{ + list_add_tail(&cap->session_caps, &session->s_cap_releases); + session->s_num_cap_releases++; + + if (!(session->s_num_cap_releases % CEPH_CAPS_PER_RELEASE)) + ceph_flush_cap_releases(session->s_mdsc, session); +} + +static void ceph_cap_reclaim_work(struct work_struct *work) +{ + struct ceph_mds_client *mdsc = + container_of(work, struct ceph_mds_client, cap_reclaim_work); + int ret = ceph_trim_dentries(mdsc); + if (ret == -EAGAIN) + ceph_queue_cap_reclaim_work(mdsc); +} + +void ceph_queue_cap_reclaim_work(struct ceph_mds_client *mdsc) +{ + if (mdsc->stopping) + return; + + if (queue_work(mdsc->fsc->cap_wq, &mdsc->cap_reclaim_work)) { + dout("caps reclaim work queued\n"); + } else { + dout("failed to queue caps release work\n"); + } +} + +void ceph_reclaim_caps_nr(struct ceph_mds_client *mdsc, int nr) +{ + int val; + if (!nr) + return; + val = atomic_add_return(nr, &mdsc->cap_reclaim_pending); + if (!(val % CEPH_CAPS_PER_RELEASE)) { + atomic_set(&mdsc->cap_reclaim_pending, 0); + ceph_queue_cap_reclaim_work(mdsc); + } +} + /* * requests */ @@ -1958,10 +2170,39 @@ retry: return path; } +/* Duplicate the dentry->d_name.name safely */ +static int clone_dentry_name(struct dentry *dentry, const char **ppath, + int *ppathlen) +{ + u32 len; + char *name; + +retry: + len = READ_ONCE(dentry->d_name.len); + name = kmalloc(len + 1, GFP_NOFS); + if (!name) + return -ENOMEM; + + spin_lock(&dentry->d_lock); + if (dentry->d_name.len != len) { + spin_unlock(&dentry->d_lock); + kfree(name); + goto retry; + } + memcpy(name, dentry->d_name.name, len); + spin_unlock(&dentry->d_lock); + + name[len] = '\0'; + *ppath = name; + *ppathlen = len; + return 0; +} + static int build_dentry_path(struct dentry *dentry, struct inode *dir, const char **ppath, int *ppathlen, u64 *pino, - int *pfreepath) + bool *pfreepath, bool parent_locked) { + int ret; char *path; rcu_read_lock(); @@ -1970,8 +2211,15 @@ static int build_dentry_path(struct dentry *dentry, struct inode *dir, if (dir && ceph_snap(dir) == CEPH_NOSNAP) { *pino = ceph_ino(dir); rcu_read_unlock(); - *ppath = dentry->d_name.name; - *ppathlen = dentry->d_name.len; + if (parent_locked) { + *ppath = dentry->d_name.name; + *ppathlen = dentry->d_name.len; + } else { + ret = clone_dentry_name(dentry, ppath, ppathlen); + if (ret) + return ret; + *pfreepath = true; + } return 0; } rcu_read_unlock(); @@ -1979,13 +2227,13 @@ static int build_dentry_path(struct dentry *dentry, struct inode *dir, if (IS_ERR(path)) return PTR_ERR(path); *ppath = path; - *pfreepath = 1; + *pfreepath = true; return 0; } static int build_inode_path(struct inode *inode, const char **ppath, int *ppathlen, u64 *pino, - int *pfreepath) + bool *pfreepath) { struct dentry *dentry; char *path; @@ -2001,7 +2249,7 @@ static int build_inode_path(struct inode *inode, if (IS_ERR(path)) return PTR_ERR(path); *ppath = path; - *pfreepath = 1; + *pfreepath = true; return 0; } @@ -2012,7 +2260,7 @@ static int build_inode_path(struct inode *inode, static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry, struct inode *rdiri, const char *rpath, u64 rino, const char **ppath, int *pathlen, - u64 *ino, int *freepath) + u64 *ino, bool *freepath, bool parent_locked) { int r = 0; @@ -2022,7 +2270,7 @@ static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry, ceph_snap(rinode)); } else if (rdentry) { r = build_dentry_path(rdentry, rdiri, ppath, pathlen, ino, - freepath); + freepath, parent_locked); dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen, *ppath); } else if (rpath || rino) { @@ -2048,7 +2296,7 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc, const char *path2 = NULL; u64 ino1 = 0, ino2 = 0; int pathlen1 = 0, pathlen2 = 0; - int freepath1 = 0, freepath2 = 0; + bool freepath1 = false, freepath2 = false; int len; u16 releases; void *p, *end; @@ -2056,16 +2304,19 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc, ret = set_request_path_attr(req->r_inode, req->r_dentry, req->r_parent, req->r_path1, req->r_ino1.ino, - &path1, &pathlen1, &ino1, &freepath1); + &path1, &pathlen1, &ino1, &freepath1, + test_bit(CEPH_MDS_R_PARENT_LOCKED, + &req->r_req_flags)); if (ret < 0) { msg = ERR_PTR(ret); goto out; } + /* If r_old_dentry is set, then assume that its parent is locked */ ret = set_request_path_attr(NULL, req->r_old_dentry, req->r_old_dentry_dir, req->r_path2, req->r_ino2.ino, - &path2, &pathlen2, &ino2, &freepath2); + &path2, &pathlen2, &ino2, &freepath2, true); if (ret < 0) { msg = ERR_PTR(ret); goto out_free1; @@ -2653,7 +2904,10 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) dout("handle_reply tid %lld result %d\n", tid, result); rinfo = &req->r_reply_info; - err = parse_reply_info(msg, rinfo, session->s_con.peer_features); + if (test_bit(CEPHFS_FEATURE_REPLY_ENCODING, &session->s_features)) + err = parse_reply_info(msg, rinfo, (u64)-1); + else + err = parse_reply_info(msg, rinfo, session->s_con.peer_features); mutex_unlock(&mdsc->mutex); mutex_lock(&session->s_mutex); @@ -2684,7 +2938,6 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR || req->r_op == CEPH_MDS_OP_LSSNAP)) ceph_readdir_prepopulate(req, req->r_session); - ceph_unreserve_caps(mdsc, &req->r_caps_reservation); } current->journal_info = NULL; mutex_unlock(&req->r_fill_mutex); @@ -2693,12 +2946,18 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) if (realm) ceph_put_snap_realm(mdsc, realm); - if (err == 0 && req->r_target_inode && - test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { - struct ceph_inode_info *ci = ceph_inode(req->r_target_inode); - spin_lock(&ci->i_unsafe_lock); - list_add_tail(&req->r_unsafe_target_item, &ci->i_unsafe_iops); - spin_unlock(&ci->i_unsafe_lock); + if (err == 0) { + if (req->r_target_inode && + test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { + struct ceph_inode_info *ci = + ceph_inode(req->r_target_inode); + spin_lock(&ci->i_unsafe_lock); + list_add_tail(&req->r_unsafe_target_item, + &ci->i_unsafe_iops); + spin_unlock(&ci->i_unsafe_lock); + } + + ceph_unreserve_caps(mdsc, &req->r_caps_reservation); } out_err: mutex_lock(&mdsc->mutex); @@ -2777,6 +3036,25 @@ bad: pr_err("mdsc_handle_forward decode error err=%d\n", err); } +static int __decode_and_drop_session_metadata(void **p, void *end) +{ + /* map<string,string> */ + u32 n; + ceph_decode_32_safe(p, end, n, bad); + while (n-- > 0) { + u32 len; + ceph_decode_32_safe(p, end, len, bad); + ceph_decode_need(p, end, len, bad); + *p += len; + ceph_decode_32_safe(p, end, len, bad); + ceph_decode_need(p, end, len, bad); + *p += len; + } + return 0; +bad: + return -1; +} + /* * handle a mds session control message */ @@ -2784,18 +3062,36 @@ static void handle_session(struct ceph_mds_session *session, struct ceph_msg *msg) { struct ceph_mds_client *mdsc = session->s_mdsc; + int mds = session->s_mds; + int msg_version = le16_to_cpu(msg->hdr.version); + void *p = msg->front.iov_base; + void *end = p + msg->front.iov_len; + struct ceph_mds_session_head *h; u32 op; u64 seq; - int mds = session->s_mds; - struct ceph_mds_session_head *h = msg->front.iov_base; + unsigned long features = 0; int wake = 0; /* decode */ - if (msg->front.iov_len < sizeof(*h)) - goto bad; + ceph_decode_need(&p, end, sizeof(*h), bad); + h = p; + p += sizeof(*h); + op = le32_to_cpu(h->op); seq = le64_to_cpu(h->seq); + if (msg_version >= 3) { + u32 len; + /* version >= 2, metadata */ + if (__decode_and_drop_session_metadata(&p, end) < 0) + goto bad; + /* version >= 3, feature bits */ + ceph_decode_32_safe(&p, end, len, bad); + ceph_decode_need(&p, end, len, bad); + memcpy(&features, p, min_t(size_t, len, sizeof(features))); + p += len; + } + mutex_lock(&mdsc->mutex); if (op == CEPH_SESSION_CLOSE) { get_session(session); @@ -2821,6 +3117,7 @@ static void handle_session(struct ceph_mds_session *session, if (session->s_state == CEPH_MDS_SESSION_RECONNECTING) pr_info("mds%d reconnect success\n", session->s_mds); session->s_state = CEPH_MDS_SESSION_OPEN; + session->s_features = features; renewed_caps(mdsc, session, 0); wake = 1; if (mdsc->stopping) @@ -2947,6 +3244,82 @@ static void replay_unsafe_requests(struct ceph_mds_client *mdsc, mutex_unlock(&mdsc->mutex); } +static int send_reconnect_partial(struct ceph_reconnect_state *recon_state) +{ + struct ceph_msg *reply; + struct ceph_pagelist *_pagelist; + struct page *page; + __le32 *addr; + int err = -ENOMEM; + + if (!recon_state->allow_multi) + return -ENOSPC; + + /* can't handle message that contains both caps and realm */ + BUG_ON(!recon_state->nr_caps == !recon_state->nr_realms); + + /* pre-allocate new pagelist */ + _pagelist = ceph_pagelist_alloc(GFP_NOFS); + if (!_pagelist) + return -ENOMEM; + + reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false); + if (!reply) + goto fail_msg; + + /* placeholder for nr_caps */ + err = ceph_pagelist_encode_32(_pagelist, 0); + if (err < 0) + goto fail; + + if (recon_state->nr_caps) { + /* currently encoding caps */ + err = ceph_pagelist_encode_32(recon_state->pagelist, 0); + if (err) + goto fail; + } else { + /* placeholder for nr_realms (currently encoding relams) */ + err = ceph_pagelist_encode_32(_pagelist, 0); + if (err < 0) + goto fail; + } + + err = ceph_pagelist_encode_8(recon_state->pagelist, 1); + if (err) + goto fail; + + page = list_first_entry(&recon_state->pagelist->head, struct page, lru); + addr = kmap_atomic(page); + if (recon_state->nr_caps) { + /* currently encoding caps */ + *addr = cpu_to_le32(recon_state->nr_caps); + } else { + /* currently encoding relams */ + *(addr + 1) = cpu_to_le32(recon_state->nr_realms); + } + kunmap_atomic(addr); + + reply->hdr.version = cpu_to_le16(5); + reply->hdr.compat_version = cpu_to_le16(4); + + reply->hdr.data_len = cpu_to_le32(recon_state->pagelist->length); + ceph_msg_data_add_pagelist(reply, recon_state->pagelist); + + ceph_con_send(&recon_state->session->s_con, reply); + ceph_pagelist_release(recon_state->pagelist); + + recon_state->pagelist = _pagelist; + recon_state->nr_caps = 0; + recon_state->nr_realms = 0; + recon_state->msg_version = 5; + return 0; +fail: + ceph_msg_put(reply); +fail_msg: + ceph_pagelist_release(_pagelist); + return err; +} + /* * Encode information about a cap for a reconnect with the MDS. */ @@ -2966,9 +3339,6 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap, dout(" adding %p ino %llx.%llx cap %p %lld %s\n", inode, ceph_vinop(inode), cap, cap->cap_id, ceph_cap_string(cap->issued)); - err = ceph_pagelist_encode_64(pagelist, ceph_ino(inode)); - if (err) - return err; spin_lock(&ci->i_ceph_lock); cap->seq = 0; /* reset cap seq */ @@ -3008,7 +3378,7 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap, if (recon_state->msg_version >= 2) { int num_fcntl_locks, num_flock_locks; struct ceph_filelock *flocks = NULL; - size_t struct_len, total_len = 0; + size_t struct_len, total_len = sizeof(u64); u8 struct_v = 0; encode_again: @@ -3043,7 +3413,7 @@ encode_again: if (recon_state->msg_version >= 3) { /* version, compat_version and struct_len */ - total_len = 2 * sizeof(u8) + sizeof(u32); + total_len += 2 * sizeof(u8) + sizeof(u32); struct_v = 2; } /* @@ -3060,12 +3430,19 @@ encode_again: struct_len += sizeof(u64); /* snap_follows */ total_len += struct_len; - err = ceph_pagelist_reserve(pagelist, total_len); - if (err) { - kfree(flocks); - goto out_err; + + if (pagelist->length + total_len > RECONNECT_MAX_SIZE) { + err = send_reconnect_partial(recon_state); + if (err) + goto out_freeflocks; + pagelist = recon_state->pagelist; } + err = ceph_pagelist_reserve(pagelist, total_len); + if (err) + goto out_freeflocks; + + ceph_pagelist_encode_64(pagelist, ceph_ino(inode)); if (recon_state->msg_version >= 3) { ceph_pagelist_encode_8(pagelist, struct_v); ceph_pagelist_encode_8(pagelist, 1); @@ -3077,7 +3454,7 @@ encode_again: num_fcntl_locks, num_flock_locks); if (struct_v >= 2) ceph_pagelist_encode_64(pagelist, snap_follows); - +out_freeflocks: kfree(flocks); } else { u64 pathbase = 0; @@ -3098,20 +3475,81 @@ encode_again: } err = ceph_pagelist_reserve(pagelist, - pathlen + sizeof(u32) + sizeof(rec.v1)); + sizeof(u64) + sizeof(u32) + + pathlen + sizeof(rec.v1)); if (err) { - kfree(path); - goto out_err; + goto out_freepath; } + ceph_pagelist_encode_64(pagelist, ceph_ino(inode)); ceph_pagelist_encode_string(pagelist, path, pathlen); ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1)); - +out_freepath: kfree(path); } - recon_state->nr_caps++; out_err: + if (err >= 0) + recon_state->nr_caps++; + return err; +} + +static int encode_snap_realms(struct ceph_mds_client *mdsc, + struct ceph_reconnect_state *recon_state) +{ + struct rb_node *p; + struct ceph_pagelist *pagelist = recon_state->pagelist; + int err = 0; + + if (recon_state->msg_version >= 4) { + err = ceph_pagelist_encode_32(pagelist, mdsc->num_snap_realms); + if (err < 0) + goto fail; + } + + /* + * snaprealms. we provide mds with the ino, seq (version), and + * parent for all of our realms. If the mds has any newer info, + * it will tell us. + */ + for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) { + struct ceph_snap_realm *realm = + rb_entry(p, struct ceph_snap_realm, node); + struct ceph_mds_snaprealm_reconnect sr_rec; + + if (recon_state->msg_version >= 4) { + size_t need = sizeof(u8) * 2 + sizeof(u32) + + sizeof(sr_rec); + + if (pagelist->length + need > RECONNECT_MAX_SIZE) { + err = send_reconnect_partial(recon_state); + if (err) + goto fail; + pagelist = recon_state->pagelist; + } + + err = ceph_pagelist_reserve(pagelist, need); + if (err) + goto fail; + + ceph_pagelist_encode_8(pagelist, 1); + ceph_pagelist_encode_8(pagelist, 1); + ceph_pagelist_encode_32(pagelist, sizeof(sr_rec)); + } + + dout(" adding snap realm %llx seq %lld parent %llx\n", + realm->ino, realm->seq, realm->parent_ino); + sr_rec.ino = cpu_to_le64(realm->ino); + sr_rec.seq = cpu_to_le64(realm->seq); + sr_rec.parent = cpu_to_le64(realm->parent_ino); + + err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec)); + if (err) + goto fail; + + recon_state->nr_realms++; + } +fail: return err; } @@ -3132,18 +3570,17 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc, struct ceph_mds_session *session) { struct ceph_msg *reply; - struct rb_node *p; int mds = session->s_mds; int err = -ENOMEM; - int s_nr_caps; - struct ceph_pagelist *pagelist; - struct ceph_reconnect_state recon_state; + struct ceph_reconnect_state recon_state = { + .session = session, + }; LIST_HEAD(dispose); pr_info("mds%d reconnect start\n", mds); - pagelist = ceph_pagelist_alloc(GFP_NOFS); - if (!pagelist) + recon_state.pagelist = ceph_pagelist_alloc(GFP_NOFS); + if (!recon_state.pagelist) goto fail_nopagelist; reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false); @@ -3187,63 +3624,90 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc, /* replay unsafe requests */ replay_unsafe_requests(mdsc, session); + ceph_early_kick_flushing_caps(mdsc, session); + down_read(&mdsc->snap_rwsem); - /* traverse this session's caps */ - s_nr_caps = session->s_nr_caps; - err = ceph_pagelist_encode_32(pagelist, s_nr_caps); + /* placeholder for nr_caps */ + err = ceph_pagelist_encode_32(recon_state.pagelist, 0); if (err) goto fail; - recon_state.nr_caps = 0; - recon_state.pagelist = pagelist; - if (session->s_con.peer_features & CEPH_FEATURE_MDSENC) + if (test_bit(CEPHFS_FEATURE_MULTI_RECONNECT, &session->s_features)) { recon_state.msg_version = 3; - else + recon_state.allow_multi = true; + } else if (session->s_con.peer_features & CEPH_FEATURE_MDSENC) { + recon_state.msg_version = 3; + } else { recon_state.msg_version = 2; + } + /* trsaverse this session's caps */ err = iterate_session_caps(session, encode_caps_cb, &recon_state); - if (err < 0) - goto fail; spin_lock(&session->s_cap_lock); session->s_cap_reconnect = 0; spin_unlock(&session->s_cap_lock); - /* - * snaprealms. we provide mds with the ino, seq (version), and - * parent for all of our realms. If the mds has any newer info, - * it will tell us. - */ - for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) { - struct ceph_snap_realm *realm = - rb_entry(p, struct ceph_snap_realm, node); - struct ceph_mds_snaprealm_reconnect sr_rec; + if (err < 0) + goto fail; - dout(" adding snap realm %llx seq %lld parent %llx\n", - realm->ino, realm->seq, realm->parent_ino); - sr_rec.ino = cpu_to_le64(realm->ino); - sr_rec.seq = cpu_to_le64(realm->seq); - sr_rec.parent = cpu_to_le64(realm->parent_ino); - err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec)); - if (err) - goto fail; + /* check if all realms can be encoded into current message */ + if (mdsc->num_snap_realms) { + size_t total_len = + recon_state.pagelist->length + + mdsc->num_snap_realms * + sizeof(struct ceph_mds_snaprealm_reconnect); + if (recon_state.msg_version >= 4) { + /* number of realms */ + total_len += sizeof(u32); + /* version, compat_version and struct_len */ + total_len += mdsc->num_snap_realms * + (2 * sizeof(u8) + sizeof(u32)); + } + if (total_len > RECONNECT_MAX_SIZE) { + if (!recon_state.allow_multi) { + err = -ENOSPC; + goto fail; + } + if (recon_state.nr_caps) { + err = send_reconnect_partial(&recon_state); + if (err) + goto fail; + } + recon_state.msg_version = 5; + } } - reply->hdr.version = cpu_to_le16(recon_state.msg_version); + err = encode_snap_realms(mdsc, &recon_state); + if (err < 0) + goto fail; - /* raced with cap release? */ - if (s_nr_caps != recon_state.nr_caps) { - struct page *page = list_first_entry(&pagelist->head, - struct page, lru); + if (recon_state.msg_version >= 5) { + err = ceph_pagelist_encode_8(recon_state.pagelist, 0); + if (err < 0) + goto fail; + } + + if (recon_state.nr_caps || recon_state.nr_realms) { + struct page *page = + list_first_entry(&recon_state.pagelist->head, + struct page, lru); __le32 *addr = kmap_atomic(page); - *addr = cpu_to_le32(recon_state.nr_caps); + if (recon_state.nr_caps) { + WARN_ON(recon_state.nr_realms != mdsc->num_snap_realms); + *addr = cpu_to_le32(recon_state.nr_caps); + } else if (recon_state.msg_version >= 4) { + *(addr + 1) = cpu_to_le32(recon_state.nr_realms); + } kunmap_atomic(addr); } - reply->hdr.data_len = cpu_to_le32(pagelist->length); - ceph_msg_data_add_pagelist(reply, pagelist); + reply->hdr.version = cpu_to_le16(recon_state.msg_version); + if (recon_state.msg_version >= 4) + reply->hdr.compat_version = cpu_to_le16(4); - ceph_early_kick_flushing_caps(mdsc, session); + reply->hdr.data_len = cpu_to_le32(recon_state.pagelist->length); + ceph_msg_data_add_pagelist(reply, recon_state.pagelist); ceph_con_send(&session->s_con, reply); @@ -3254,7 +3718,7 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc, mutex_unlock(&mdsc->mutex); up_read(&mdsc->snap_rwsem); - ceph_pagelist_release(pagelist); + ceph_pagelist_release(recon_state.pagelist); return; fail: @@ -3262,7 +3726,7 @@ fail: up_read(&mdsc->snap_rwsem); mutex_unlock(&session->s_mutex); fail_nomsg: - ceph_pagelist_release(pagelist); + ceph_pagelist_release(recon_state.pagelist); fail_nopagelist: pr_err("error %d preparing reconnect for mds%d\n", err, mds); return; @@ -3580,7 +4044,6 @@ static void delayed_work(struct work_struct *work) int renew_caps; dout("mdsc delayed_work\n"); - ceph_check_delayed_caps(mdsc); mutex_lock(&mdsc->mutex); renew_interval = mdsc->mdsmap->m_session_timeout >> 2; @@ -3628,6 +4091,12 @@ static void delayed_work(struct work_struct *work) } mutex_unlock(&mdsc->mutex); + ceph_check_delayed_caps(mdsc); + + ceph_queue_cap_reclaim_work(mdsc); + + ceph_trim_snapid_map(mdsc); + schedule_delayed(mdsc); } @@ -3660,6 +4129,7 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc) init_rwsem(&mdsc->snap_rwsem); mdsc->snap_realms = RB_ROOT; INIT_LIST_HEAD(&mdsc->snap_empty); + mdsc->num_snap_realms = 0; spin_lock_init(&mdsc->snap_empty_lock); mdsc->last_tid = 0; mdsc->oldest_tid = 0; @@ -3677,11 +4147,19 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc) mdsc->num_cap_flushing = 0; spin_lock_init(&mdsc->cap_dirty_lock); init_waitqueue_head(&mdsc->cap_flushing_wq); - spin_lock_init(&mdsc->dentry_lru_lock); - INIT_LIST_HEAD(&mdsc->dentry_lru); + INIT_WORK(&mdsc->cap_reclaim_work, ceph_cap_reclaim_work); + atomic_set(&mdsc->cap_reclaim_pending, 0); + + spin_lock_init(&mdsc->dentry_list_lock); + INIT_LIST_HEAD(&mdsc->dentry_leases); + INIT_LIST_HEAD(&mdsc->dentry_dir_leases); ceph_caps_init(mdsc); - ceph_adjust_min_caps(mdsc, fsc->min_caps); + ceph_adjust_caps_max_min(mdsc, fsc->mount_options); + + spin_lock_init(&mdsc->snapid_map_lock); + mdsc->snapid_map_tree = RB_ROOT; + INIT_LIST_HEAD(&mdsc->snapid_map_lru); init_rwsem(&mdsc->pool_perm_rwsem); mdsc->pool_perm_tree = RB_ROOT; @@ -3876,8 +4354,10 @@ void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc) WARN_ON(!list_empty(&mdsc->cap_delay_list)); mutex_unlock(&mdsc->mutex); + ceph_cleanup_snapid_map(mdsc); ceph_cleanup_empty_realms(mdsc); + cancel_work_sync(&mdsc->cap_reclaim_work); cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */ dout("stopped\n"); diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h index 729da155ebf0..50385a481fdb 100644 --- a/fs/ceph/mds_client.h +++ b/fs/ceph/mds_client.h @@ -21,11 +21,14 @@ #define CEPHFS_FEATURE_REPLY_ENCODING 9 #define CEPHFS_FEATURE_RECLAIM_CLIENT 10 #define CEPHFS_FEATURE_LAZY_CAP_WANTED 11 +#define CEPHFS_FEATURE_MULTI_RECONNECT 12 #define CEPHFS_FEATURES_CLIENT_SUPPORTED { \ 0, 1, 2, 3, 4, 5, 6, 7, \ CEPHFS_FEATURE_MIMIC, \ + CEPHFS_FEATURE_REPLY_ENCODING, \ CEPHFS_FEATURE_LAZY_CAP_WANTED, \ + CEPHFS_FEATURE_MULTI_RECONNECT, \ } #define CEPHFS_FEATURES_CLIENT_REQUIRED {} @@ -65,6 +68,7 @@ struct ceph_mds_reply_info_in { char *pool_ns_data; u64 max_bytes; u64 max_files; + s32 dir_pin; }; struct ceph_mds_reply_dir_entry { @@ -152,6 +156,7 @@ struct ceph_mds_session { int s_mds; int s_state; unsigned long s_ttl; /* time until mds kills us */ + unsigned long s_features; u64 s_seq; /* incoming msg seq # */ struct mutex s_mutex; /* serialize session messages */ @@ -167,19 +172,20 @@ struct ceph_mds_session { /* protected by s_cap_lock */ spinlock_t s_cap_lock; struct list_head s_caps; /* all caps issued by this session */ + struct ceph_cap *s_cap_iterator; int s_nr_caps, s_trim_caps; int s_num_cap_releases; int s_cap_reconnect; int s_readonly; struct list_head s_cap_releases; /* waiting cap_release messages */ - struct ceph_cap *s_cap_iterator; + struct work_struct s_cap_release_work; /* protected by mutex */ struct list_head s_cap_flushing; /* inodes w/ flushing caps */ unsigned long s_renew_requested; /* last time we sent a renew req */ u64 s_renew_seq; - refcount_t s_ref; + refcount_t s_ref; struct list_head s_waiting; /* waiting requests */ struct list_head s_unsafe; /* unsafe requests */ }; @@ -310,6 +316,15 @@ struct ceph_pool_perm { char pool_ns[]; }; +struct ceph_snapid_map { + struct rb_node node; + struct list_head lru; + atomic_t ref; + u64 snap; + dev_t dev; + unsigned long last_used; +}; + /* * mds client state */ @@ -341,6 +356,7 @@ struct ceph_mds_client { struct rw_semaphore snap_rwsem; struct rb_root snap_realms; struct list_head snap_empty; + int num_snap_realms; spinlock_t snap_empty_lock; /* protect snap_empty */ u64 last_tid; /* most recent mds request */ @@ -362,6 +378,9 @@ struct ceph_mds_client { spinlock_t cap_dirty_lock; /* protects above items */ wait_queue_head_t cap_flushing_wq; + struct work_struct cap_reclaim_work; + atomic_t cap_reclaim_pending; + /* * Cap reservations * @@ -378,13 +397,18 @@ struct ceph_mds_client { unreserved) */ int caps_total_count; /* total caps allocated */ int caps_use_count; /* in use */ + int caps_use_max; /* max used caps */ int caps_reserve_count; /* unused, reserved */ int caps_avail_count; /* unused, unreserved */ int caps_min_count; /* keep at least this many (unreserved) */ - spinlock_t dentry_lru_lock; - struct list_head dentry_lru; - int num_dentry; + spinlock_t dentry_list_lock; + struct list_head dentry_leases; /* fifo list */ + struct list_head dentry_dir_leases; /* lru list */ + + spinlock_t snapid_map_lock; + struct rb_root snapid_map_tree; + struct list_head snapid_map_lru; struct rw_semaphore pool_perm_rwsem; struct rb_root pool_perm_tree; @@ -438,9 +462,12 @@ static inline void ceph_mdsc_put_request(struct ceph_mds_request *req) kref_put(&req->r_kref, ceph_mdsc_release_request); } -extern void ceph_send_cap_releases(struct ceph_mds_client *mdsc, - struct ceph_mds_session *session); - +extern void __ceph_queue_cap_release(struct ceph_mds_session *session, + struct ceph_cap *cap); +extern void ceph_flush_cap_releases(struct ceph_mds_client *mdsc, + struct ceph_mds_session *session); +extern void ceph_queue_cap_reclaim_work(struct ceph_mds_client *mdsc); +extern void ceph_reclaim_caps_nr(struct ceph_mds_client *mdsc, int nr); extern void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc); extern char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *base, diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c index f74193da0e09..b26e12cd8ec3 100644 --- a/fs/ceph/snap.c +++ b/fs/ceph/snap.c @@ -3,12 +3,13 @@ #include <linux/sort.h> #include <linux/slab.h> - #include "super.h" #include "mds_client.h" - #include <linux/ceph/decode.h> +/* unused map expires after 5 minutes */ +#define CEPH_SNAPID_MAP_TIMEOUT (5 * 60 * HZ) + /* * Snapshots in ceph are driven in large part by cooperation from the * client. In contrast to local file systems or file servers that @@ -124,6 +125,8 @@ static struct ceph_snap_realm *ceph_create_snap_realm( INIT_LIST_HEAD(&realm->inodes_with_caps); spin_lock_init(&realm->inodes_with_caps_lock); __insert_snap_realm(&mdsc->snap_realms, realm); + mdsc->num_snap_realms++; + dout("create_snap_realm %llx %p\n", realm->ino, realm); return realm; } @@ -175,6 +178,7 @@ static void __destroy_snap_realm(struct ceph_mds_client *mdsc, dout("__destroy_snap_realm %p %llx\n", realm, realm->ino); rb_erase(&realm->node, &mdsc->snap_realms); + mdsc->num_snap_realms--; if (realm->parent) { list_del_init(&realm->child_item); @@ -568,7 +572,12 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci) old_snapc = NULL; update_snapc: - if (ci->i_head_snapc) { + if (ci->i_wrbuffer_ref_head == 0 && + ci->i_wr_ref == 0 && + ci->i_dirty_caps == 0 && + ci->i_flushing_caps == 0) { + ci->i_head_snapc = NULL; + } else { ci->i_head_snapc = ceph_get_snap_context(new_snapc); dout(" new snapc is %p\n", new_snapc); } @@ -986,3 +995,154 @@ out: up_write(&mdsc->snap_rwsem); return; } + +struct ceph_snapid_map* ceph_get_snapid_map(struct ceph_mds_client *mdsc, + u64 snap) +{ + struct ceph_snapid_map *sm, *exist; + struct rb_node **p, *parent; + int ret; + + exist = NULL; + spin_lock(&mdsc->snapid_map_lock); + p = &mdsc->snapid_map_tree.rb_node; + while (*p) { + exist = rb_entry(*p, struct ceph_snapid_map, node); + if (snap > exist->snap) { + p = &(*p)->rb_left; + } else if (snap < exist->snap) { + p = &(*p)->rb_right; + } else { + if (atomic_inc_return(&exist->ref) == 1) + list_del_init(&exist->lru); + break; + } + exist = NULL; + } + spin_unlock(&mdsc->snapid_map_lock); + if (exist) { + dout("found snapid map %llx -> %x\n", exist->snap, exist->dev); + return exist; + } + + sm = kmalloc(sizeof(*sm), GFP_NOFS); + if (!sm) + return NULL; + + ret = get_anon_bdev(&sm->dev); + if (ret < 0) { + kfree(sm); + return NULL; + } + + INIT_LIST_HEAD(&sm->lru); + atomic_set(&sm->ref, 1); + sm->snap = snap; + + exist = NULL; + parent = NULL; + p = &mdsc->snapid_map_tree.rb_node; + spin_lock(&mdsc->snapid_map_lock); + while (*p) { + parent = *p; + exist = rb_entry(*p, struct ceph_snapid_map, node); + if (snap > exist->snap) + p = &(*p)->rb_left; + else if (snap < exist->snap) + p = &(*p)->rb_right; + else + break; + exist = NULL; + } + if (exist) { + if (atomic_inc_return(&exist->ref) == 1) + list_del_init(&exist->lru); + } else { + rb_link_node(&sm->node, parent, p); + rb_insert_color(&sm->node, &mdsc->snapid_map_tree); + } + spin_unlock(&mdsc->snapid_map_lock); + if (exist) { + free_anon_bdev(sm->dev); + kfree(sm); + dout("found snapid map %llx -> %x\n", exist->snap, exist->dev); + return exist; + } + + dout("create snapid map %llx -> %x\n", sm->snap, sm->dev); + return sm; +} + +void ceph_put_snapid_map(struct ceph_mds_client* mdsc, + struct ceph_snapid_map *sm) +{ + if (!sm) + return; + if (atomic_dec_and_lock(&sm->ref, &mdsc->snapid_map_lock)) { + if (!RB_EMPTY_NODE(&sm->node)) { + sm->last_used = jiffies; + list_add_tail(&sm->lru, &mdsc->snapid_map_lru); + spin_unlock(&mdsc->snapid_map_lock); + } else { + /* already cleaned up by + * ceph_cleanup_snapid_map() */ + spin_unlock(&mdsc->snapid_map_lock); + kfree(sm); + } + } +} + +void ceph_trim_snapid_map(struct ceph_mds_client *mdsc) +{ + struct ceph_snapid_map *sm; + unsigned long now; + LIST_HEAD(to_free); + + spin_lock(&mdsc->snapid_map_lock); + now = jiffies; + + while (!list_empty(&mdsc->snapid_map_lru)) { + sm = list_first_entry(&mdsc->snapid_map_lru, + struct ceph_snapid_map, lru); + if (time_after(sm->last_used + CEPH_SNAPID_MAP_TIMEOUT, now)) + break; + + rb_erase(&sm->node, &mdsc->snapid_map_tree); + list_move(&sm->lru, &to_free); + } + spin_unlock(&mdsc->snapid_map_lock); + + while (!list_empty(&to_free)) { + sm = list_first_entry(&to_free, struct ceph_snapid_map, lru); + list_del(&sm->lru); + dout("trim snapid map %llx -> %x\n", sm->snap, sm->dev); + free_anon_bdev(sm->dev); + kfree(sm); + } +} + +void ceph_cleanup_snapid_map(struct ceph_mds_client *mdsc) +{ + struct ceph_snapid_map *sm; + struct rb_node *p; + LIST_HEAD(to_free); + + spin_lock(&mdsc->snapid_map_lock); + while ((p = rb_first(&mdsc->snapid_map_tree))) { + sm = rb_entry(p, struct ceph_snapid_map, node); + rb_erase(p, &mdsc->snapid_map_tree); + RB_CLEAR_NODE(p); + list_move(&sm->lru, &to_free); + } + spin_unlock(&mdsc->snapid_map_lock); + + while (!list_empty(&to_free)) { + sm = list_first_entry(&to_free, struct ceph_snapid_map, lru); + list_del(&sm->lru); + free_anon_bdev(sm->dev); + if (WARN_ON_ONCE(atomic_read(&sm->ref))) { + pr_err("snapid map %llx -> %x still in use\n", + sm->snap, sm->dev); + } + } +} diff --git a/fs/ceph/super.c b/fs/ceph/super.c index da2cd8e89062..6d5bb2f74612 100644 --- a/fs/ceph/super.c +++ b/fs/ceph/super.c @@ -133,6 +133,7 @@ enum { Opt_rasize, Opt_caps_wanted_delay_min, Opt_caps_wanted_delay_max, + Opt_caps_max, Opt_readdir_max_entries, Opt_readdir_max_bytes, Opt_congestion_kb, @@ -175,6 +176,7 @@ static match_table_t fsopt_tokens = { {Opt_rasize, "rasize=%d"}, {Opt_caps_wanted_delay_min, "caps_wanted_delay_min=%d"}, {Opt_caps_wanted_delay_max, "caps_wanted_delay_max=%d"}, + {Opt_caps_max, "caps_max=%d"}, {Opt_readdir_max_entries, "readdir_max_entries=%d"}, {Opt_readdir_max_bytes, "readdir_max_bytes=%d"}, {Opt_congestion_kb, "write_congestion_kb=%d"}, @@ -286,6 +288,11 @@ static int parse_fsopt_token(char *c, void *private) return -EINVAL; fsopt->caps_wanted_delay_max = intval; break; + case Opt_caps_max: + if (intval < 0) + return -EINVAL; + fsopt->caps_max = intval; + break; case Opt_readdir_max_entries: if (intval < 1) return -EINVAL; @@ -576,6 +583,8 @@ static int ceph_show_options(struct seq_file *m, struct dentry *root) seq_printf(m, ",rasize=%d", fsopt->rasize); if (fsopt->congestion_kb != default_congestion_kb()) seq_printf(m, ",write_congestion_kb=%d", fsopt->congestion_kb); + if (fsopt->caps_max) + seq_printf(m, ",caps_max=%d", fsopt->caps_max); if (fsopt->caps_wanted_delay_min != CEPH_CAPS_WANTED_DELAY_MIN_DEFAULT) seq_printf(m, ",caps_wanted_delay_min=%d", fsopt->caps_wanted_delay_min); @@ -671,6 +680,9 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt, fsc->trunc_wq = alloc_workqueue("ceph-trunc", 0, 1); if (!fsc->trunc_wq) goto fail_pg_inv_wq; + fsc->cap_wq = alloc_workqueue("ceph-cap", 0, 1); + if (!fsc->cap_wq) + goto fail_trunc_wq; /* set up mempools */ err = -ENOMEM; @@ -678,13 +690,12 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt, size = sizeof (struct page *) * (page_count ? page_count : 1); fsc->wb_pagevec_pool = mempool_create_kmalloc_pool(10, size); if (!fsc->wb_pagevec_pool) - goto fail_trunc_wq; - - /* caps */ - fsc->min_caps = fsopt->max_readdir; + goto fail_cap_wq; return fsc; +fail_cap_wq: + destroy_workqueue(fsc->cap_wq); fail_trunc_wq: destroy_workqueue(fsc->trunc_wq); fail_pg_inv_wq: @@ -706,6 +717,7 @@ static void flush_fs_workqueues(struct ceph_fs_client *fsc) flush_workqueue(fsc->wb_wq); flush_workqueue(fsc->pg_inv_wq); flush_workqueue(fsc->trunc_wq); + flush_workqueue(fsc->cap_wq); } static void destroy_fs_client(struct ceph_fs_client *fsc) @@ -715,6 +727,7 @@ static void destroy_fs_client(struct ceph_fs_client *fsc) destroy_workqueue(fsc->wb_wq); destroy_workqueue(fsc->pg_inv_wq); destroy_workqueue(fsc->trunc_wq); + destroy_workqueue(fsc->cap_wq); mempool_destroy(fsc->wb_pagevec_pool); diff --git a/fs/ceph/super.h b/fs/ceph/super.h index dfb64a5211b6..16c03188578e 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -79,6 +79,7 @@ struct ceph_mount_options { int rasize; /* max readahead */ int congestion_kb; /* max writeback in flight */ int caps_wanted_delay_min, caps_wanted_delay_max; + int caps_max; int max_readdir; /* max readdir result (entires) */ int max_readdir_bytes; /* max readdir result (bytes) */ @@ -100,17 +101,18 @@ struct ceph_fs_client { struct ceph_client *client; unsigned long mount_state; - int min_caps; /* min caps i added */ loff_t max_file_size; struct ceph_mds_client *mdsc; /* writeback */ mempool_t *wb_pagevec_pool; + atomic_long_t writeback_count; + struct workqueue_struct *wb_wq; struct workqueue_struct *pg_inv_wq; struct workqueue_struct *trunc_wq; - atomic_long_t writeback_count; + struct workqueue_struct *cap_wq; #ifdef CONFIG_DEBUG_FS struct dentry *debugfs_dentry_lru, *debugfs_caps; @@ -260,17 +262,22 @@ struct ceph_inode_xattr { * Ceph dentry state */ struct ceph_dentry_info { + struct dentry *dentry; struct ceph_mds_session *lease_session; + struct list_head lease_list; + unsigned flags; int lease_shared_gen; u32 lease_gen; u32 lease_seq; unsigned long lease_renew_after, lease_renew_from; - struct list_head lru; - struct dentry *dentry; unsigned long time; u64 offset; }; +#define CEPH_DENTRY_REFERENCED 1 +#define CEPH_DENTRY_LEASE_LIST 2 +#define CEPH_DENTRY_SHRINK_LIST 4 + struct ceph_inode_xattrs_info { /* * (still encoded) xattr blob. we avoid the overhead of parsing @@ -318,6 +325,8 @@ struct ceph_inode_info { /* quotas */ u64 i_max_bytes, i_max_files; + s32 i_dir_pin; + struct rb_root i_fragtree; int i_fragtree_nsplits; struct mutex i_fragtree_mutex; @@ -370,7 +379,10 @@ struct ceph_inode_info { struct list_head i_unsafe_iops; /* uncommitted mds inode ops */ spinlock_t i_unsafe_lock; - struct ceph_snap_realm *i_snap_realm; /* snap realm (if caps) */ + union { + struct ceph_snap_realm *i_snap_realm; /* snap realm (if caps) */ + struct ceph_snapid_map *i_snapid_map; /* snapid -> dev_t */ + }; int i_snap_realm_counter; /* snap realm (if caps) */ struct list_head i_snap_realm_item; struct list_head i_snap_flush_item; @@ -587,7 +599,7 @@ extern u32 ceph_choose_frag(struct ceph_inode_info *ci, u32 v, struct ceph_inode_frag *pfrag, int *found); -static inline struct ceph_dentry_info *ceph_dentry(struct dentry *dentry) +static inline struct ceph_dentry_info *ceph_dentry(const struct dentry *dentry) { return (struct ceph_dentry_info *)dentry->d_fsdata; } @@ -656,7 +668,8 @@ extern int __ceph_caps_mds_wanted(struct ceph_inode_info *ci, bool check); extern void ceph_caps_init(struct ceph_mds_client *mdsc); extern void ceph_caps_finalize(struct ceph_mds_client *mdsc); -extern void ceph_adjust_min_caps(struct ceph_mds_client *mdsc, int delta); +extern void ceph_adjust_caps_max_min(struct ceph_mds_client *mdsc, + struct ceph_mount_options *fsopt); extern int ceph_reserve_caps(struct ceph_mds_client *mdsc, struct ceph_cap_reservation *ctx, int need); extern void ceph_unreserve_caps(struct ceph_mds_client *mdsc, @@ -837,6 +850,14 @@ extern int __ceph_finish_cap_snap(struct ceph_inode_info *ci, struct ceph_cap_snap *capsnap); extern void ceph_cleanup_empty_realms(struct ceph_mds_client *mdsc); +extern struct ceph_snapid_map *ceph_get_snapid_map(struct ceph_mds_client *mdsc, + u64 snap); +extern void ceph_put_snapid_map(struct ceph_mds_client* mdsc, + struct ceph_snapid_map *sm); +extern void ceph_trim_snapid_map(struct ceph_mds_client *mdsc); +extern void ceph_cleanup_snapid_map(struct ceph_mds_client *mdsc); + + /* * a cap_snap is "pending" if it is still awaiting an in-progress * sync write (that may/may not still update size, mtime, etc.). @@ -975,11 +996,11 @@ extern void ceph_add_cap(struct inode *inode, unsigned cap, unsigned seq, u64 realmino, int flags, struct ceph_cap **new_cap); extern void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release); +extern void __ceph_remove_caps(struct inode* inode); extern void ceph_put_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap); extern int ceph_is_any_caps(struct inode *inode); -extern void ceph_queue_caps_release(struct inode *inode); extern int ceph_write_inode(struct inode *inode, struct writeback_control *wbc); extern int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync); @@ -1049,10 +1070,10 @@ extern int ceph_handle_snapdir(struct ceph_mds_request *req, extern struct dentry *ceph_finish_lookup(struct ceph_mds_request *req, struct dentry *dentry, int err); -extern void ceph_dentry_lru_add(struct dentry *dn); -extern void ceph_dentry_lru_touch(struct dentry *dn); -extern void ceph_dentry_lru_del(struct dentry *dn); +extern void __ceph_dentry_lease_touch(struct ceph_dentry_info *di); +extern void __ceph_dentry_dir_lease_touch(struct ceph_dentry_info *di); extern void ceph_invalidate_dentry_lease(struct dentry *dentry); +extern int ceph_trim_dentries(struct ceph_mds_client *mdsc); extern unsigned ceph_dentry_hash(struct inode *dir, struct dentry *dn); extern void ceph_readdir_cache_release(struct ceph_readdir_cache_control *ctl); diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c index 316f6ad10644..0cc42c8879e9 100644 --- a/fs/ceph/xattr.c +++ b/fs/ceph/xattr.c @@ -228,8 +228,19 @@ static size_t ceph_vxattrcb_dir_rctime(struct ceph_inode_info *ci, char *val, ci->i_rctime.tv_nsec); } -/* quotas */ +/* dir pin */ +static bool ceph_vxattrcb_dir_pin_exists(struct ceph_inode_info *ci) +{ + return ci->i_dir_pin != -ENODATA; +} + +static size_t ceph_vxattrcb_dir_pin(struct ceph_inode_info *ci, char *val, + size_t size) +{ + return snprintf(val, size, "%d", (int)ci->i_dir_pin); +} +/* quotas */ static bool ceph_vxattrcb_quota_exists(struct ceph_inode_info *ci) { bool ret = false; @@ -315,6 +326,13 @@ static struct ceph_vxattr ceph_dir_vxattrs[] = { XATTR_RSTAT_FIELD(dir, rbytes), XATTR_RSTAT_FIELD(dir, rctime), { + .name = "ceph.dir.pin", + .name_size = sizeof("ceph.dir_pin"), + .getxattr_cb = ceph_vxattrcb_dir_pin, + .exists_cb = ceph_vxattrcb_dir_pin_exists, + .flags = VXATTR_FLAG_HIDDEN, + }, + { .name = "ceph.quota", .name_size = sizeof("ceph.quota"), .getxattr_cb = ceph_vxattrcb_quota, |