summaryrefslogtreecommitdiff
path: root/fs/ceph
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ceph')
-rw-r--r--fs/ceph/Kconfig2
-rw-r--r--fs/ceph/Makefile2
-rw-r--r--fs/ceph/acl.c2
-rw-r--r--fs/ceph/addr.c133
-rw-r--r--fs/ceph/cache.c2
-rw-r--r--fs/ceph/caps.c954
-rw-r--r--fs/ceph/debugfs.c112
-rw-r--r--fs/ceph/dir.c193
-rw-r--r--fs/ceph/export.c14
-rw-r--r--fs/ceph/file.c538
-rw-r--r--fs/ceph/inode.c107
-rw-r--r--fs/ceph/ioctl.c2
-rw-r--r--fs/ceph/locks.c31
-rw-r--r--fs/ceph/mds_client.c472
-rw-r--r--fs/ceph/mds_client.h54
-rw-r--r--fs/ceph/mdsmap.c10
-rw-r--r--fs/ceph/metric.c297
-rw-r--r--fs/ceph/metric.h153
-rw-r--r--fs/ceph/quota.c70
-rw-r--r--fs/ceph/snap.c1
-rw-r--r--fs/ceph/super.c92
-rw-r--r--fs/ceph/super.h181
-rw-r--r--fs/ceph/xattr.c16
23 files changed, 2614 insertions, 824 deletions
diff --git a/fs/ceph/Kconfig b/fs/ceph/Kconfig
index cf235f6eacf9..471e40156065 100644
--- a/fs/ceph/Kconfig
+++ b/fs/ceph/Kconfig
@@ -13,7 +13,7 @@ config CEPH_FS
scalable file system designed to provide high performance,
reliable access to petabytes of storage.
- More information at http://ceph.newdream.net/.
+ More information at https://ceph.io/.
If unsure, say N.
diff --git a/fs/ceph/Makefile b/fs/ceph/Makefile
index 0a0823d378db..50c635dc7f71 100644
--- a/fs/ceph/Makefile
+++ b/fs/ceph/Makefile
@@ -8,7 +8,7 @@ obj-$(CONFIG_CEPH_FS) += ceph.o
ceph-y := super.o inode.o dir.o file.o locks.o addr.o ioctl.o \
export.o caps.o snap.o xattr.o quota.o io.o \
mds_client.o mdsmap.o strings.o ceph_frag.o \
- debugfs.o util.o
+ debugfs.o util.o metric.o
ceph-$(CONFIG_CEPH_FSCACHE) += cache.o
ceph-$(CONFIG_CEPH_FS_POSIX_ACL) += acl.o
diff --git a/fs/ceph/acl.c b/fs/ceph/acl.c
index 26be6520d3fb..e0465741c591 100644
--- a/fs/ceph/acl.c
+++ b/fs/ceph/acl.c
@@ -22,7 +22,7 @@ static inline void ceph_set_cached_acl(struct inode *inode,
struct ceph_inode_info *ci = ceph_inode(inode);
spin_lock(&ci->i_ceph_lock);
- if (__ceph_caps_issued_mask(ci, CEPH_CAP_XATTR_SHARED, 0))
+ if (__ceph_caps_issued_mask_metric(ci, CEPH_CAP_XATTR_SHARED, 0))
set_cached_acl(inode, type, acl);
else
forget_cached_acl(inode, type);
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index 7ab616601141..6ea761c84494 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -11,10 +11,12 @@
#include <linux/task_io_accounting_ops.h>
#include <linux/signal.h>
#include <linux/iversion.h>
+#include <linux/ktime.h>
#include "super.h"
#include "mds_client.h"
#include "cache.h"
+#include "metric.h"
#include <linux/ceph/osd_client.h>
#include <linux/ceph/striper.h>
@@ -159,8 +161,6 @@ static void ceph_invalidatepage(struct page *page, unsigned int offset,
if (!PagePrivate(page))
return;
- ClearPageChecked(page);
-
dout("%p invalidatepage %p idx %lu full dirty page\n",
inode, page, page->index);
@@ -183,6 +183,50 @@ static int ceph_releasepage(struct page *page, gfp_t g)
}
/*
+ * Read some contiguous pages. If we cross a stripe boundary, shorten
+ * *plen. Return number of bytes read, or error.
+ */
+static int ceph_sync_readpages(struct ceph_fs_client *fsc,
+ struct ceph_vino vino,
+ struct ceph_file_layout *layout,
+ u64 off, u64 *plen,
+ u32 truncate_seq, u64 truncate_size,
+ struct page **pages, int num_pages,
+ int page_align)
+{
+ struct ceph_osd_client *osdc = &fsc->client->osdc;
+ struct ceph_osd_request *req;
+ int rc = 0;
+
+ dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino,
+ vino.snap, off, *plen);
+ req = ceph_osdc_new_request(osdc, layout, vino, off, plen, 0, 1,
+ CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
+ NULL, truncate_seq, truncate_size,
+ false);
+ if (IS_ERR(req))
+ return PTR_ERR(req);
+
+ /* it may be a short read due to an object boundary */
+ osd_req_op_extent_osd_data_pages(req, 0,
+ pages, *plen, page_align, false, false);
+
+ dout("readpages final extent is %llu~%llu (%llu bytes align %d)\n",
+ off, *plen, *plen, page_align);
+
+ rc = ceph_osdc_start_request(osdc, req, false);
+ if (!rc)
+ rc = ceph_osdc_wait_request(osdc, req);
+
+ ceph_update_read_latency(&fsc->mdsc->metric, req->r_start_latency,
+ req->r_end_latency, rc);
+
+ ceph_osdc_put_request(req);
+ dout("readpages result %d\n", rc);
+ return rc;
+}
+
+/*
* read a single page, without unlocking it.
*/
static int ceph_do_readpage(struct file *filp, struct page *page)
@@ -218,7 +262,7 @@ static int ceph_do_readpage(struct file *filp, struct page *page)
dout("readpage inode %p file %p page %p index %lu\n",
inode, filp, page, page->index);
- err = ceph_osdc_readpages(&fsc->client->osdc, ceph_vino(inode),
+ err = ceph_sync_readpages(fsc, ceph_vino(inode),
&ci->i_layout, off, &len,
ci->i_truncate_seq, ci->i_truncate_size,
&page, 1, 0);
@@ -260,6 +304,7 @@ static int ceph_readpage(struct file *filp, struct page *page)
static void finish_read(struct ceph_osd_request *req)
{
struct inode *inode = req->r_inode;
+ struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
struct ceph_osd_data *osd_data;
int rc = req->r_result <= 0 ? req->r_result : 0;
int bytes = req->r_result >= 0 ? req->r_result : 0;
@@ -297,6 +342,10 @@ unlock:
put_page(page);
bytes -= PAGE_SIZE;
}
+
+ ceph_update_read_latency(&fsc->mdsc->metric, req->r_start_latency,
+ req->r_end_latency, rc);
+
kfree(osd_data->pages);
}
@@ -571,6 +620,50 @@ static u64 get_writepages_data_length(struct inode *inode,
}
/*
+ * do a synchronous write on N pages
+ */
+static int ceph_sync_writepages(struct ceph_fs_client *fsc,
+ struct ceph_vino vino,
+ struct ceph_file_layout *layout,
+ struct ceph_snap_context *snapc,
+ u64 off, u64 len,
+ u32 truncate_seq, u64 truncate_size,
+ struct timespec64 *mtime,
+ struct page **pages, int num_pages)
+{
+ struct ceph_osd_client *osdc = &fsc->client->osdc;
+ struct ceph_osd_request *req;
+ int rc = 0;
+ int page_align = off & ~PAGE_MASK;
+
+ req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 0, 1,
+ CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_WRITE,
+ snapc, truncate_seq, truncate_size,
+ true);
+ if (IS_ERR(req))
+ return PTR_ERR(req);
+
+ /* it may be a short write due to an object boundary */
+ osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align,
+ false, false);
+ dout("writepages %llu~%llu (%llu bytes)\n", off, len, len);
+
+ req->r_mtime = *mtime;
+ rc = ceph_osdc_start_request(osdc, req, true);
+ if (!rc)
+ rc = ceph_osdc_wait_request(osdc, req);
+
+ ceph_update_write_latency(&fsc->mdsc->metric, req->r_start_latency,
+ req->r_end_latency, rc);
+
+ ceph_osdc_put_request(req);
+ if (rc == 0)
+ rc = len;
+ dout("writepages result %d\n", rc);
+ return rc;
+}
+
+/*
* Write a single page, but leave the page locked.
*
* If we get a write error, mark the mapping for error, but still adjust the
@@ -628,7 +721,7 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
set_bdi_congested(inode_to_bdi(inode), BLK_RW_ASYNC);
set_page_writeback(page);
- err = ceph_osdc_writepages(&fsc->client->osdc, ceph_vino(inode),
+ err = ceph_sync_writepages(fsc, ceph_vino(inode),
&ci->i_layout, snapc, page_off, len,
ceph_wbc.truncate_seq,
ceph_wbc.truncate_size,
@@ -714,6 +807,9 @@ static void writepages_finish(struct ceph_osd_request *req)
ceph_clear_error_write(ci);
}
+ ceph_update_write_latency(&fsc->mdsc->metric, req->r_start_latency,
+ req->r_end_latency, rc);
+
/*
* We lost the cache cap, need to truncate the page before
* it is unlocked, otherwise we'd truncate it later in the
@@ -766,8 +862,7 @@ static void writepages_finish(struct ceph_osd_request *req)
osd_data = osd_req_op_extent_osd_data(req, 0);
if (osd_data->pages_from_pool)
- mempool_free(osd_data->pages,
- ceph_sb_to_client(inode->i_sb)->wb_pagevec_pool);
+ mempool_free(osd_data->pages, ceph_wb_pagevec_pool);
else
kfree(osd_data->pages);
ceph_osdc_put_request(req);
@@ -859,10 +954,10 @@ retry:
int num_ops = 0, op_idx;
unsigned i, pvec_pages, max_pages, locked_pages = 0;
struct page **pages = NULL, **data_pages;
- mempool_t *pool = NULL; /* Becomes non-null if mempool used */
struct page *page;
pgoff_t strip_unit_end = 0;
u64 offset = 0, len = 0;
+ bool from_pool = false;
max_pages = wsize >> PAGE_SHIFT;
@@ -961,16 +1056,16 @@ get_more_pages:
sizeof(*pages),
GFP_NOFS);
if (!pages) {
- pool = fsc->wb_pagevec_pool;
- pages = mempool_alloc(pool, GFP_NOFS);
+ from_pool = true;
+ pages = mempool_alloc(ceph_wb_pagevec_pool, GFP_NOFS);
BUG_ON(!pages);
}
len = 0;
} else if (page->index !=
(offset + len) >> PAGE_SHIFT) {
- if (num_ops >= (pool ? CEPH_OSD_SLAB_OPS :
- CEPH_OSD_MAX_OPS)) {
+ if (num_ops >= (from_pool ? CEPH_OSD_SLAB_OPS :
+ CEPH_OSD_MAX_OPS)) {
redirty_page_for_writepage(wbc, page);
unlock_page(page);
break;
@@ -1065,7 +1160,7 @@ new_request:
offset, len);
osd_req_op_extent_osd_data_pages(req, op_idx,
data_pages, len, 0,
- !!pool, false);
+ from_pool, false);
osd_req_op_extent_update(req, op_idx, len);
len = 0;
@@ -1092,12 +1187,12 @@ new_request:
dout("writepages got pages at %llu~%llu\n", offset, len);
osd_req_op_extent_osd_data_pages(req, op_idx, data_pages, len,
- 0, !!pool, false);
+ 0, from_pool, false);
osd_req_op_extent_update(req, op_idx, len);
BUG_ON(op_idx + 1 != req->r_num_ops);
- pool = NULL;
+ from_pool = false;
if (i < locked_pages) {
BUG_ON(num_ops <= req->r_num_ops);
num_ops -= req->r_num_ops;
@@ -1108,8 +1203,8 @@ new_request:
pages = kmalloc_array(locked_pages, sizeof(*pages),
GFP_NOFS);
if (!pages) {
- pool = fsc->wb_pagevec_pool;
- pages = mempool_alloc(pool, GFP_NOFS);
+ from_pool = true;
+ pages = mempool_alloc(ceph_wb_pagevec_pool, GFP_NOFS);
BUG_ON(!pages);
}
memcpy(pages, data_pages + i,
@@ -1575,7 +1670,7 @@ static vm_fault_t ceph_page_mkwrite(struct vm_fault *vmf)
do {
lock_page(page);
- if ((off > size) || (page->mapping != inode->i_mapping)) {
+ if (page_mkwrite_check_truncate(page, inode) < 0) {
unlock_page(page);
ret = VM_FAULT_NOPAGE;
break;
@@ -1772,6 +1867,10 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page)
err = ceph_osdc_start_request(&fsc->client->osdc, req, false);
if (!err)
err = ceph_osdc_wait_request(&fsc->client->osdc, req);
+
+ ceph_update_write_latency(&fsc->mdsc->metric, req->r_start_latency,
+ req->r_end_latency, err);
+
out_put:
ceph_osdc_put_request(req);
if (err == -ECANCELED)
diff --git a/fs/ceph/cache.c b/fs/ceph/cache.c
index 270b769607a2..2f5cb6bc78e1 100644
--- a/fs/ceph/cache.c
+++ b/fs/ceph/cache.c
@@ -32,7 +32,7 @@ struct ceph_fscache_entry {
size_t uniq_len;
/* The following members must be last */
struct ceph_fsid fsid;
- char uniquifier[0];
+ char uniquifier[];
};
static const struct fscache_cookie_def ceph_fscache_fsid_object_def = {
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index 28ae0c134700..034b3f4fdd3a 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -490,13 +490,10 @@ static void __cap_set_timeouts(struct ceph_mds_client *mdsc,
struct ceph_inode_info *ci)
{
struct ceph_mount_options *opt = mdsc->fsc->mount_options;
-
- ci->i_hold_caps_min = round_jiffies(jiffies +
- opt->caps_wanted_delay_min * HZ);
ci->i_hold_caps_max = round_jiffies(jiffies +
opt->caps_wanted_delay_max * HZ);
- dout("__cap_set_timeouts %p min %lu max %lu\n", &ci->vfs_inode,
- ci->i_hold_caps_min - jiffies, ci->i_hold_caps_max - jiffies);
+ dout("__cap_set_timeouts %p %lu\n", &ci->vfs_inode,
+ ci->i_hold_caps_max - jiffies);
}
/*
@@ -508,10 +505,9 @@ static void __cap_set_timeouts(struct ceph_mds_client *mdsc,
* -> we take mdsc->cap_delay_lock
*/
static void __cap_delay_requeue(struct ceph_mds_client *mdsc,
- struct ceph_inode_info *ci,
- bool set_timeout)
+ struct ceph_inode_info *ci)
{
- dout("__cap_delay_requeue %p flags %d at %lu\n", &ci->vfs_inode,
+ dout("__cap_delay_requeue %p flags 0x%lx at %lu\n", &ci->vfs_inode,
ci->i_ceph_flags, ci->i_hold_caps_max);
if (!mdsc->stopping) {
spin_lock(&mdsc->cap_delay_lock);
@@ -520,8 +516,7 @@ static void __cap_delay_requeue(struct ceph_mds_client *mdsc,
goto no_change;
list_del_init(&ci->i_cap_delay_list);
}
- if (set_timeout)
- __cap_set_timeouts(mdsc, ci);
+ __cap_set_timeouts(mdsc, ci);
list_add_tail(&ci->i_cap_delay_list, &mdsc->cap_delay_list);
no_change:
spin_unlock(&mdsc->cap_delay_lock);
@@ -561,19 +556,20 @@ static void __cap_delay_cancel(struct ceph_mds_client *mdsc,
spin_unlock(&mdsc->cap_delay_lock);
}
-/*
- * Common issue checks for add_cap, handle_cap_grant.
- */
+/* Common issue checks for add_cap, handle_cap_grant. */
static void __check_cap_issue(struct ceph_inode_info *ci, struct ceph_cap *cap,
unsigned issued)
{
unsigned had = __ceph_caps_issued(ci, NULL);
+ lockdep_assert_held(&ci->i_ceph_lock);
+
/*
* Each time we receive FILE_CACHE anew, we increment
* i_rdcache_gen.
*/
- if ((issued & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) &&
+ if (S_ISREG(ci->vfs_inode.i_mode) &&
+ (issued & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) &&
(had & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0) {
ci->i_rdcache_gen++;
}
@@ -592,6 +588,34 @@ static void __check_cap_issue(struct ceph_inode_info *ci, struct ceph_cap *cap,
__ceph_dir_clear_complete(ci);
}
}
+
+ /* Wipe saved layout if we're losing DIR_CREATE caps */
+ if (S_ISDIR(ci->vfs_inode.i_mode) && (had & CEPH_CAP_DIR_CREATE) &&
+ !(issued & CEPH_CAP_DIR_CREATE)) {
+ ceph_put_string(rcu_dereference_raw(ci->i_cached_layout.pool_ns));
+ memset(&ci->i_cached_layout, 0, sizeof(ci->i_cached_layout));
+ }
+}
+
+/**
+ * change_auth_cap_ses - move inode to appropriate lists when auth caps change
+ * @ci: inode to be moved
+ * @session: new auth caps session
+ */
+static void change_auth_cap_ses(struct ceph_inode_info *ci,
+ struct ceph_mds_session *session)
+{
+ lockdep_assert_held(&ci->i_ceph_lock);
+
+ if (list_empty(&ci->i_dirty_item) && list_empty(&ci->i_flushing_item))
+ return;
+
+ spin_lock(&session->s_mdsc->cap_dirty_lock);
+ if (!list_empty(&ci->i_dirty_item))
+ list_move(&ci->i_dirty_item, &session->s_cap_dirty);
+ if (!list_empty(&ci->i_flushing_item))
+ list_move_tail(&ci->i_flushing_item, &session->s_cap_flushing);
+ spin_unlock(&session->s_mdsc->cap_dirty_lock);
}
/*
@@ -605,7 +629,7 @@ static void __check_cap_issue(struct ceph_inode_info *ci, struct ceph_cap *cap,
*/
void ceph_add_cap(struct inode *inode,
struct ceph_mds_session *session, u64 cap_id,
- int fmode, unsigned issued, unsigned wanted,
+ unsigned issued, unsigned wanted,
unsigned seq, unsigned mseq, u64 realmino, int flags,
struct ceph_cap **new_cap)
{
@@ -621,13 +645,6 @@ void ceph_add_cap(struct inode *inode,
dout("add_cap %p mds%d cap %llx %s seq %d\n", inode,
session->s_mds, cap_id, ceph_cap_string(issued), seq);
- /*
- * If we are opening the file, include file mode wanted bits
- * in wanted.
- */
- if (fmode >= 0)
- wanted |= ceph_caps_for_mode(fmode);
-
spin_lock(&session->s_gen_ttl_lock);
gen = session->s_cap_gen;
spin_unlock(&session->s_gen_ttl_lock);
@@ -651,6 +668,7 @@ void ceph_add_cap(struct inode *inode,
spin_lock(&session->s_cap_lock);
list_add_tail(&cap->session_caps, &session->s_caps);
session->s_nr_caps++;
+ atomic64_inc(&mdsc->metric.total_caps);
spin_unlock(&session->s_cap_lock);
} else {
spin_lock(&session->s_cap_lock);
@@ -725,12 +743,15 @@ void ceph_add_cap(struct inode *inode,
dout(" issued %s, mds wanted %s, actual %s, queueing\n",
ceph_cap_string(issued), ceph_cap_string(wanted),
ceph_cap_string(actual_wanted));
- __cap_delay_requeue(mdsc, ci, true);
+ __cap_delay_requeue(mdsc, ci);
}
if (flags & CEPH_CAP_FLAG_AUTH) {
if (!ci->i_auth_cap ||
ceph_seq_cmp(ci->i_auth_cap->mseq, mseq) < 0) {
+ if (ci->i_auth_cap &&
+ ci->i_auth_cap->session != cap->session)
+ change_auth_cap_ses(ci, cap->session);
ci->i_auth_cap = cap;
cap->mds_wanted = wanted;
}
@@ -752,9 +773,6 @@ void ceph_add_cap(struct inode *inode,
cap->issue_seq = seq;
cap->mseq = mseq;
cap->cap_gen = gen;
-
- if (fmode >= 0)
- __ceph_get_fmode(ci, fmode);
}
/*
@@ -869,8 +887,8 @@ int __ceph_caps_issued_mask(struct ceph_inode_info *ci, int mask, int touch)
int have = ci->i_snap_caps;
if ((have & mask) == mask) {
- dout("__ceph_caps_issued_mask ino 0x%lx snap issued %s"
- " (mask %s)\n", ci->vfs_inode.i_ino,
+ dout("__ceph_caps_issued_mask ino 0x%llx snap issued %s"
+ " (mask %s)\n", ceph_ino(&ci->vfs_inode),
ceph_cap_string(have),
ceph_cap_string(mask));
return 1;
@@ -881,8 +899,8 @@ int __ceph_caps_issued_mask(struct ceph_inode_info *ci, int mask, int touch)
if (!__cap_is_valid(cap))
continue;
if ((cap->issued & mask) == mask) {
- dout("__ceph_caps_issued_mask ino 0x%lx cap %p issued %s"
- " (mask %s)\n", ci->vfs_inode.i_ino, cap,
+ dout("__ceph_caps_issued_mask ino 0x%llx cap %p issued %s"
+ " (mask %s)\n", ceph_ino(&ci->vfs_inode), cap,
ceph_cap_string(cap->issued),
ceph_cap_string(mask));
if (touch)
@@ -893,8 +911,8 @@ int __ceph_caps_issued_mask(struct ceph_inode_info *ci, int mask, int touch)
/* does a combination of caps satisfy mask? */
have |= cap->issued;
if ((have & mask) == mask) {
- dout("__ceph_caps_issued_mask ino 0x%lx combo issued %s"
- " (mask %s)\n", ci->vfs_inode.i_ino,
+ dout("__ceph_caps_issued_mask ino 0x%llx combo issued %s"
+ " (mask %s)\n", ceph_ino(&ci->vfs_inode),
ceph_cap_string(cap->issued),
ceph_cap_string(mask));
if (touch) {
@@ -919,6 +937,20 @@ int __ceph_caps_issued_mask(struct ceph_inode_info *ci, int mask, int touch)
return 0;
}
+int __ceph_caps_issued_mask_metric(struct ceph_inode_info *ci, int mask,
+ int touch)
+{
+ struct ceph_fs_client *fsc = ceph_sb_to_client(ci->vfs_inode.i_sb);
+ int r;
+
+ r = __ceph_caps_issued_mask(ci, mask, touch);
+ if (r)
+ ceph_update_cap_hit(&fsc->mdsc->metric);
+ else
+ ceph_update_cap_mis(&fsc->mdsc->metric);
+ return r;
+}
+
/*
* Return true if mask caps are currently being revoked by an MDS.
*/
@@ -958,29 +990,97 @@ int __ceph_caps_used(struct ceph_inode_info *ci)
if (ci->i_rd_ref)
used |= CEPH_CAP_FILE_RD;
if (ci->i_rdcache_ref ||
- (!S_ISDIR(ci->vfs_inode.i_mode) && /* ignore readdir cache */
+ (S_ISREG(ci->vfs_inode.i_mode) &&
ci->vfs_inode.i_data.nrpages))
used |= CEPH_CAP_FILE_CACHE;
if (ci->i_wr_ref)
used |= CEPH_CAP_FILE_WR;
if (ci->i_wb_ref || ci->i_wrbuffer_ref)
used |= CEPH_CAP_FILE_BUFFER;
+ if (ci->i_fx_ref)
+ used |= CEPH_CAP_FILE_EXCL;
return used;
}
+#define FMODE_WAIT_BIAS 1000
+
/*
* wanted, by virtue of open file modes
*/
int __ceph_caps_file_wanted(struct ceph_inode_info *ci)
{
- int i, bits = 0;
- for (i = 0; i < CEPH_FILE_MODE_BITS; i++) {
- if (ci->i_nr_by_mode[i])
- bits |= 1 << i;
+ const int PIN_SHIFT = ffs(CEPH_FILE_MODE_PIN);
+ const int RD_SHIFT = ffs(CEPH_FILE_MODE_RD);
+ const int WR_SHIFT = ffs(CEPH_FILE_MODE_WR);
+ const int LAZY_SHIFT = ffs(CEPH_FILE_MODE_LAZY);
+ struct ceph_mount_options *opt =
+ ceph_inode_to_client(&ci->vfs_inode)->mount_options;
+ unsigned long used_cutoff = jiffies - opt->caps_wanted_delay_max * HZ;
+ unsigned long idle_cutoff = jiffies - opt->caps_wanted_delay_min * HZ;
+
+ if (S_ISDIR(ci->vfs_inode.i_mode)) {
+ int want = 0;
+
+ /* use used_cutoff here, to keep dir's wanted caps longer */
+ if (ci->i_nr_by_mode[RD_SHIFT] > 0 ||
+ time_after(ci->i_last_rd, used_cutoff))
+ want |= CEPH_CAP_ANY_SHARED;
+
+ if (ci->i_nr_by_mode[WR_SHIFT] > 0 ||
+ time_after(ci->i_last_wr, used_cutoff)) {
+ want |= CEPH_CAP_ANY_SHARED | CEPH_CAP_FILE_EXCL;
+ if (opt->flags & CEPH_MOUNT_OPT_ASYNC_DIROPS)
+ want |= CEPH_CAP_ANY_DIR_OPS;
+ }
+
+ if (want || ci->i_nr_by_mode[PIN_SHIFT] > 0)
+ want |= CEPH_CAP_PIN;
+
+ return want;
+ } else {
+ int bits = 0;
+
+ if (ci->i_nr_by_mode[RD_SHIFT] > 0) {
+ if (ci->i_nr_by_mode[RD_SHIFT] >= FMODE_WAIT_BIAS ||
+ time_after(ci->i_last_rd, used_cutoff))
+ bits |= 1 << RD_SHIFT;
+ } else if (time_after(ci->i_last_rd, idle_cutoff)) {
+ bits |= 1 << RD_SHIFT;
+ }
+
+ if (ci->i_nr_by_mode[WR_SHIFT] > 0) {
+ if (ci->i_nr_by_mode[WR_SHIFT] >= FMODE_WAIT_BIAS ||
+ time_after(ci->i_last_wr, used_cutoff))
+ bits |= 1 << WR_SHIFT;
+ } else if (time_after(ci->i_last_wr, idle_cutoff)) {
+ bits |= 1 << WR_SHIFT;
+ }
+
+ /* check lazyio only when read/write is wanted */
+ if ((bits & (CEPH_FILE_MODE_RDWR << 1)) &&
+ ci->i_nr_by_mode[LAZY_SHIFT] > 0)
+ bits |= 1 << LAZY_SHIFT;
+
+ return bits ? ceph_caps_for_mode(bits >> 1) : 0;
}
- if (bits == 0)
- return 0;
- return ceph_caps_for_mode(bits >> 1);
+}
+
+/*
+ * wanted, by virtue of open file modes AND cap refs (buffered/cached data)
+ */
+int __ceph_caps_wanted(struct ceph_inode_info *ci)
+{
+ int w = __ceph_caps_file_wanted(ci) | __ceph_caps_used(ci);
+ if (S_ISDIR(ci->vfs_inode.i_mode)) {
+ /* we want EXCL if holding caps of dir ops */
+ if (w & CEPH_CAP_ANY_DIR_OPS)
+ w |= CEPH_CAP_FILE_EXCL;
+ } else {
+ /* we want EXCL if dirty data */
+ if (w & CEPH_CAP_FILE_BUFFER)
+ w |= CEPH_CAP_FILE_EXCL;
+ }
+ return w;
}
/*
@@ -1004,14 +1104,6 @@ int __ceph_caps_mds_wanted(struct ceph_inode_info *ci, bool check)
return mds_wanted;
}
-/*
- * called under i_ceph_lock
- */
-static int __ceph_is_single_caps(struct ceph_inode_info *ci)
-{
- return rb_first(&ci->i_caps) == rb_last(&ci->i_caps);
-}
-
int ceph_is_any_caps(struct inode *inode)
{
struct ceph_inode_info *ci = ceph_inode(inode);
@@ -1056,8 +1148,10 @@ void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release)
/* remove from inode's cap rbtree, and clear auth cap */
rb_erase(&cap->ci_node, &ci->i_caps);
- if (ci->i_auth_cap == cap)
+ if (ci->i_auth_cap == cap) {
+ WARN_ON_ONCE(!list_empty(&ci->i_dirty_item));
ci->i_auth_cap = NULL;
+ }
/* remove from session list */
spin_lock(&session->s_cap_lock);
@@ -1068,6 +1162,7 @@ void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release)
} else {
list_del_init(&cap->session_caps);
session->s_nr_caps--;
+ atomic64_dec(&mdsc->metric.total_caps);
cap->session = NULL;
removed = 1;
}
@@ -1114,6 +1209,7 @@ struct cap_msg_args {
u64 xattr_version;
u64 change_attr;
struct ceph_buffer *xattr_buf;
+ struct ceph_buffer *old_xattr_buf;
struct timespec64 atime, mtime, ctime, btime;
int op, caps, wanted, dirty;
u32 seq, issue_seq, mseq, time_warp_seq;
@@ -1122,6 +1218,7 @@ struct cap_msg_args {
kgid_t gid;
umode_t mode;
bool inline_data;
+ bool wake;
};
/*
@@ -1251,118 +1348,92 @@ void __ceph_remove_caps(struct ceph_inode_info *ci)
}
/*
- * Send a cap msg on the given inode. Update our caps state, then
- * drop i_ceph_lock and send the message.
+ * Prepare to send a cap message to an MDS. Update the cap state, and populate
+ * the arg struct with the parameters that will need to be sent. This should
+ * be done under the i_ceph_lock to guard against changes to cap state.
*
* Make note of max_size reported/requested from mds, revoked caps
* that have now been implemented.
- *
- * Return non-zero if delayed release, or we experienced an error
- * such that the caller should requeue + retry later.
- *
- * called with i_ceph_lock, then drops it.
- * caller should hold snap_rwsem (read), s_mutex.
*/
-static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
- int op, int flags, int used, int want, int retain,
- int flushing, u64 flush_tid, u64 oldest_flush_tid)
- __releases(cap->ci->i_ceph_lock)
+static void __prep_cap(struct cap_msg_args *arg, struct ceph_cap *cap,
+ int op, int flags, int used, int want, int retain,
+ int flushing, u64 flush_tid, u64 oldest_flush_tid)
{
struct ceph_inode_info *ci = cap->ci;
struct inode *inode = &ci->vfs_inode;
- struct ceph_buffer *old_blob = NULL;
- struct cap_msg_args arg;
int held, revoking;
- int wake = 0;
- int delayed = 0;
- int ret;
+
+ lockdep_assert_held(&ci->i_ceph_lock);
held = cap->issued | cap->implemented;
revoking = cap->implemented & ~cap->issued;
retain &= ~revoking;
- dout("__send_cap %p cap %p session %p %s -> %s (revoking %s)\n",
- inode, cap, cap->session,
+ dout("%s %p cap %p session %p %s -> %s (revoking %s)\n",
+ __func__, inode, cap, cap->session,
ceph_cap_string(held), ceph_cap_string(held & retain),
ceph_cap_string(revoking));
BUG_ON((retain & CEPH_CAP_PIN) == 0);
- arg.session = cap->session;
-
- /* don't release wanted unless we've waited a bit. */
- if ((ci->i_ceph_flags & CEPH_I_NODELAY) == 0 &&
- time_before(jiffies, ci->i_hold_caps_min)) {
- dout(" delaying issued %s -> %s, wanted %s -> %s on send\n",
- ceph_cap_string(cap->issued),
- ceph_cap_string(cap->issued & retain),
- ceph_cap_string(cap->mds_wanted),
- ceph_cap_string(want));
- want |= cap->mds_wanted;
- retain |= cap->issued;
- delayed = 1;
- }
- ci->i_ceph_flags &= ~(CEPH_I_NODELAY | CEPH_I_FLUSH);
- if (want & ~cap->mds_wanted) {
- /* user space may open/close single file frequently.
- * This avoids droping mds_wanted immediately after
- * requesting new mds_wanted.
- */
- __cap_set_timeouts(mdsc, ci);
- }
+ ci->i_ceph_flags &= ~CEPH_I_FLUSH;
cap->issued &= retain; /* drop bits we don't want */
- if (cap->implemented & ~cap->issued) {
- /*
- * Wake up any waiters on wanted -> needed transition.
- * This is due to the weird transition from buffered
- * to sync IO... we need to flush dirty pages _before_
- * allowing sync writes to avoid reordering.
- */
- wake = 1;
- }
+ /*
+ * Wake up any waiters on wanted -> needed transition. This is due to
+ * the weird transition from buffered to sync IO... we need to flush
+ * dirty pages _before_ allowing sync writes to avoid reordering.
+ */
+ arg->wake = cap->implemented & ~cap->issued;
cap->implemented &= cap->issued | used;
cap->mds_wanted = want;
- arg.ino = ceph_vino(inode).ino;
- arg.cid = cap->cap_id;
- arg.follows = flushing ? ci->i_head_snapc->seq : 0;
- arg.flush_tid = flush_tid;
- arg.oldest_flush_tid = oldest_flush_tid;
-
- arg.size = inode->i_size;
- ci->i_reported_size = arg.size;
- arg.max_size = ci->i_wanted_max_size;
- ci->i_requested_max_size = arg.max_size;
+ arg->session = cap->session;
+ arg->ino = ceph_vino(inode).ino;
+ arg->cid = cap->cap_id;
+ arg->follows = flushing ? ci->i_head_snapc->seq : 0;
+ arg->flush_tid = flush_tid;
+ arg->oldest_flush_tid = oldest_flush_tid;
+
+ arg->size = inode->i_size;
+ ci->i_reported_size = arg->size;
+ arg->max_size = ci->i_wanted_max_size;
+ if (cap == ci->i_auth_cap) {
+ if (want & CEPH_CAP_ANY_FILE_WR)
+ ci->i_requested_max_size = arg->max_size;
+ else
+ ci->i_requested_max_size = 0;
+ }
if (flushing & CEPH_CAP_XATTR_EXCL) {
- old_blob = __ceph_build_xattrs_blob(ci);
- arg.xattr_version = ci->i_xattrs.version;
- arg.xattr_buf = ci->i_xattrs.blob;
+ arg->old_xattr_buf = __ceph_build_xattrs_blob(ci);
+ arg->xattr_version = ci->i_xattrs.version;
+ arg->xattr_buf = ci->i_xattrs.blob;
} else {
- arg.xattr_buf = NULL;
+ arg->xattr_buf = NULL;
+ arg->old_xattr_buf = NULL;
}
- arg.mtime = inode->i_mtime;
- arg.atime = inode->i_atime;
- arg.ctime = inode->i_ctime;
- arg.btime = ci->i_btime;
- arg.change_attr = inode_peek_iversion_raw(inode);
+ arg->mtime = inode->i_mtime;
+ arg->atime = inode->i_atime;
+ arg->ctime = inode->i_ctime;
+ arg->btime = ci->i_btime;
+ arg->change_attr = inode_peek_iversion_raw(inode);
- arg.op = op;
- arg.caps = cap->implemented;
- arg.wanted = want;
- arg.dirty = flushing;
+ arg->op = op;
+ arg->caps = cap->implemented;
+ arg->wanted = want;
+ arg->dirty = flushing;
- arg.seq = cap->seq;
- arg.issue_seq = cap->issue_seq;
- arg.mseq = cap->mseq;
- arg.time_warp_seq = ci->i_time_warp_seq;
+ arg->seq = cap->seq;
+ arg->issue_seq = cap->issue_seq;
+ arg->mseq = cap->mseq;
+ arg->time_warp_seq = ci->i_time_warp_seq;
- arg.uid = inode->i_uid;
- arg.gid = inode->i_gid;
- arg.mode = inode->i_mode;
+ arg->uid = inode->i_uid;
+ arg->gid = inode->i_gid;
+ arg->mode = inode->i_mode;
- arg.inline_data = ci->i_inline_version != CEPH_INLINE_NONE;
+ arg->inline_data = ci->i_inline_version != CEPH_INLINE_NONE;
if (!(flags & CEPH_CLIENT_CAPS_PENDING_CAPSNAP) &&
!list_empty(&ci->i_cap_snaps)) {
struct ceph_cap_snap *capsnap;
@@ -1375,22 +1446,35 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
}
}
}
- arg.flags = flags;
-
- spin_unlock(&ci->i_ceph_lock);
+ arg->flags = flags;
+}
- ceph_buffer_put(old_blob);
+/*
+ * Send a cap msg on the given inode.
+ *
+ * Caller should hold snap_rwsem (read), s_mutex.
+ */
+static void __send_cap(struct ceph_mds_client *mdsc, struct cap_msg_args *arg,
+ struct ceph_inode_info *ci)
+{
+ struct inode *inode = &ci->vfs_inode;
+ int ret;
- ret = send_cap_msg(&arg);
+ ret = send_cap_msg(arg);
if (ret < 0) {
- dout("error sending cap msg, must requeue %p\n", inode);
- delayed = 1;
+ pr_err("error sending cap msg, ino (%llx.%llx) "
+ "flushing %s tid %llu, requeue\n",
+ ceph_vinop(inode), ceph_cap_string(arg->dirty),
+ arg->flush_tid);
+ spin_lock(&ci->i_ceph_lock);
+ __cap_delay_requeue(mdsc, ci);
+ spin_unlock(&ci->i_ceph_lock);
}
- if (wake)
- wake_up_all(&ci->i_cap_wq);
+ ceph_buffer_put(arg->old_xattr_buf);
- return delayed;
+ if (arg->wake)
+ wake_up_all(&ci->i_cap_wq);
}
static inline int __send_flush_snap(struct inode *inode,
@@ -1411,6 +1495,7 @@ static inline int __send_flush_snap(struct inode *inode,
arg.max_size = 0;
arg.xattr_version = capsnap->xattr_version;
arg.xattr_buf = capsnap->xattr_blob;
+ arg.old_xattr_buf = NULL;
arg.atime = capsnap->atime;
arg.mtime = capsnap->mtime;
@@ -1434,6 +1519,7 @@ static inline int __send_flush_snap(struct inode *inode,
arg.inline_data = capsnap->inline_data;
arg.flags = 0;
+ arg.wake = false;
return send_cap_msg(&arg);
}
@@ -1617,6 +1703,8 @@ int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask,
int was = ci->i_dirty_caps;
int dirty = 0;
+ lockdep_assert_held(&ci->i_ceph_lock);
+
if (!ci->i_auth_cap) {
pr_warn("__mark_dirty_caps %p %llx mask %s, "
"but no auth cap (session was closed?)\n",
@@ -1629,6 +1717,8 @@ int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask,
ceph_cap_string(was | mask));
ci->i_dirty_caps |= mask;
if (was == 0) {
+ struct ceph_mds_session *session = ci->i_auth_cap->session;
+
WARN_ON_ONCE(ci->i_prealloc_cap_flush);
swap(ci->i_prealloc_cap_flush, *pcf);
@@ -1641,7 +1731,7 @@ int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask,
&ci->vfs_inode, ci->i_head_snapc, ci->i_auth_cap);
BUG_ON(!list_empty(&ci->i_dirty_item));
spin_lock(&mdsc->cap_dirty_lock);
- list_add(&ci->i_dirty_item, &mdsc->cap_dirty);
+ list_add(&ci->i_dirty_item, &session->s_cap_dirty);
spin_unlock(&mdsc->cap_dirty_lock);
if (ci->i_flushing_caps == 0) {
ihold(inode);
@@ -1654,7 +1744,7 @@ int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask,
if (((was | ci->i_flushing_caps) & CEPH_CAP_FILE_BUFFER) &&
(mask & CEPH_CAP_FILE_BUFFER))
dirty |= I_DIRTY_DATASYNC;
- __cap_delay_requeue(mdsc, ci, true);
+ __cap_delay_requeue(mdsc, ci);
return dirty;
}
@@ -1684,30 +1774,33 @@ static u64 __get_oldest_flush_tid(struct ceph_mds_client *mdsc)
* Remove cap_flush from the mdsc's or inode's flushing cap list.
* Return true if caller needs to wake up flush waiters.
*/
-static bool __finish_cap_flush(struct ceph_mds_client *mdsc,
- struct ceph_inode_info *ci,
- struct ceph_cap_flush *cf)
+static bool __detach_cap_flush_from_mdsc(struct ceph_mds_client *mdsc,
+ struct ceph_cap_flush *cf)
{
struct ceph_cap_flush *prev;
bool wake = cf->wake;
- if (mdsc) {
- /* are there older pending cap flushes? */
- if (wake && cf->g_list.prev != &mdsc->cap_flush_list) {
- prev = list_prev_entry(cf, g_list);
- prev->wake = true;
- wake = false;
- }
- list_del(&cf->g_list);
- } else if (ci) {
- if (wake && cf->i_list.prev != &ci->i_cap_flush_list) {
- prev = list_prev_entry(cf, i_list);
- prev->wake = true;
- wake = false;
- }
- list_del(&cf->i_list);
- } else {
- BUG_ON(1);
+
+ if (wake && cf->g_list.prev != &mdsc->cap_flush_list) {
+ prev = list_prev_entry(cf, g_list);
+ prev->wake = true;
+ wake = false;
}
+ list_del(&cf->g_list);
+ return wake;
+}
+
+static bool __detach_cap_flush_from_ci(struct ceph_inode_info *ci,
+ struct ceph_cap_flush *cf)
+{
+ struct ceph_cap_flush *prev;
+ bool wake = cf->wake;
+
+ if (wake && cf->i_list.prev != &ci->i_cap_flush_list) {
+ prev = list_prev_entry(cf, i_list);
+ prev->wake = true;
+ wake = false;
+ }
+ list_del(&cf->i_list);
return wake;
}
@@ -1726,6 +1819,7 @@ static u64 __mark_caps_flushing(struct inode *inode,
struct ceph_cap_flush *cf = NULL;
int flushing;
+ lockdep_assert_held(&ci->i_ceph_lock);
BUG_ON(ci->i_dirty_caps == 0);
BUG_ON(list_empty(&ci->i_dirty_item));
BUG_ON(!ci->i_prealloc_cap_flush);
@@ -1805,8 +1899,6 @@ bool __ceph_should_report_size(struct ceph_inode_info *ci)
* versus held caps. Release, flush, ack revoked caps to mds as
* appropriate.
*
- * CHECK_CAPS_NODELAY - caller is delayed work and we should not delay
- * cap release further.
* CHECK_CAPS_AUTHONLY - we should only check the auth cap
* CHECK_CAPS_FLUSH - we should flush any dirty caps immediately, without
* further delay.
@@ -1825,24 +1917,13 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags,
int mds = -1; /* keep track of how far we've gone through i_caps list
to avoid an infinite loop on retry */
struct rb_node *p;
- int delayed = 0, sent = 0;
- bool no_delay = flags & CHECK_CAPS_NODELAY;
bool queue_invalidate = false;
bool tried_invalidate = false;
- /* if we are unmounting, flush any unused caps immediately. */
- if (mdsc->stopping)
- no_delay = true;
-
spin_lock(&ci->i_ceph_lock);
-
if (ci->i_ceph_flags & CEPH_I_FLUSH)
flags |= CHECK_CAPS_FLUSH;
- if (!(flags & CHECK_CAPS_AUTHONLY) ||
- (ci->i_auth_cap && __ceph_is_single_caps(ci)))
- __cap_delay_cancel(mdsc, ci);
-
goto retry_locked;
retry:
spin_lock(&ci->i_ceph_lock);
@@ -1866,10 +1947,11 @@ retry_locked:
* revoking the shared cap on every create/unlink
* operation.
*/
- if (IS_RDONLY(inode))
+ if (IS_RDONLY(inode)) {
want = CEPH_CAP_ANY_SHARED;
- else
- want = CEPH_CAP_ANY_SHARED | CEPH_CAP_FILE_EXCL;
+ } else {
+ want |= CEPH_CAP_ANY_SHARED | CEPH_CAP_FILE_EXCL;
+ }
retain |= want;
} else {
@@ -1885,14 +1967,13 @@ retry_locked:
}
dout("check_caps %p file_want %s used %s dirty %s flushing %s"
- " issued %s revoking %s retain %s %s%s%s\n", inode,
+ " issued %s revoking %s retain %s %s%s\n", inode,
ceph_cap_string(file_wanted),
ceph_cap_string(used), ceph_cap_string(ci->i_dirty_caps),
ceph_cap_string(ci->i_flushing_caps),
ceph_cap_string(issued), ceph_cap_string(revoking),
ceph_cap_string(retain),
(flags & CHECK_CAPS_AUTHONLY) ? " AUTHONLY" : "",
- (flags & CHECK_CAPS_NODELAY) ? " NODELAY" : "",
(flags & CHECK_CAPS_FLUSH) ? " FLUSH" : "");
/*
@@ -1900,8 +1981,8 @@ retry_locked:
* have cached pages, but don't want them, then try to invalidate.
* If we fail, it's because pages are locked.... try again later.
*/
- if ((!no_delay || mdsc->stopping) &&
- !S_ISDIR(inode->i_mode) && /* ignore readdir cache */
+ if ((!(flags & CHECK_CAPS_NOINVAL) || mdsc->stopping) &&
+ S_ISREG(inode->i_mode) &&
!(ci->i_wb_ref || ci->i_wrbuffer_ref) && /* no dirty pages... */
inode->i_data.nrpages && /* have cached pages */
(revoking & (CEPH_CAP_FILE_CACHE|
@@ -1918,6 +1999,9 @@ retry_locked:
}
for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
+ int mflags = 0;
+ struct cap_msg_args arg;
+
cap = rb_entry(p, struct ceph_cap, ci_node);
/* avoid looping forever */
@@ -1973,28 +2057,17 @@ retry_locked:
}
/* want more caps from mds? */
- if (want & ~(cap->mds_wanted | cap->issued))
- goto ack;
+ if (want & ~cap->mds_wanted) {
+ if (want & ~(cap->mds_wanted | cap->issued))
+ goto ack;
+ if (!__cap_is_valid(cap))
+ goto ack;
+ }
/* things we might delay */
if ((cap->issued & ~retain) == 0)
continue; /* nope, all good */
- if (no_delay)
- goto ack;
-
- /* delay? */
- if ((ci->i_ceph_flags & CEPH_I_NODELAY) == 0 &&
- time_before(jiffies, ci->i_hold_caps_max)) {
- dout(" delaying issued %s -> %s, wanted %s -> %s\n",
- ceph_cap_string(cap->issued),
- ceph_cap_string(cap->issued & retain),
- ceph_cap_string(cap->mds_wanted),
- ceph_cap_string(want));
- delayed++;
- continue;
- }
-
ack:
if (session && session != cap->session) {
dout("oops, wrong session %p mutex\n", session);
@@ -2006,12 +2079,24 @@ ack:
if (mutex_trylock(&session->s_mutex) == 0) {
dout("inverting session/ino locks on %p\n",
session);
+ session = ceph_get_mds_session(session);
spin_unlock(&ci->i_ceph_lock);
if (took_snap_rwsem) {
up_read(&mdsc->snap_rwsem);
took_snap_rwsem = 0;
}
- mutex_lock(&session->s_mutex);
+ if (session) {
+ mutex_lock(&session->s_mutex);
+ ceph_put_mds_session(session);
+ } else {
+ /*
+ * Because we take the reference while
+ * holding the i_ceph_lock, it should
+ * never be NULL. Throw a warning if it
+ * ever is.
+ */
+ WARN_ON_ONCE(true);
+ }
goto retry;
}
}
@@ -2046,6 +2131,9 @@ ack:
flushing = ci->i_dirty_caps;
flush_tid = __mark_caps_flushing(inode, session, false,
&oldest_flush_tid);
+ if (flags & CHECK_CAPS_FLUSH &&
+ list_empty(&session->s_cap_dirty))
+ mflags |= CEPH_CLIENT_CAPS_SYNC;
} else {
flushing = 0;
flush_tid = 0;
@@ -2055,18 +2143,23 @@ ack:
}
mds = cap->mds; /* remember mds, so we don't repeat */
- sent++;
- /* __send_cap drops i_ceph_lock */
- delayed += __send_cap(mdsc, cap, CEPH_CAP_OP_UPDATE, 0,
- cap_used, want, retain, flushing,
- flush_tid, oldest_flush_tid);
+ __prep_cap(&arg, cap, CEPH_CAP_OP_UPDATE, mflags, cap_used,
+ want, retain, flushing, flush_tid, oldest_flush_tid);
+ spin_unlock(&ci->i_ceph_lock);
+
+ __send_cap(mdsc, &arg, ci);
+
goto retry; /* retake i_ceph_lock and restart our cap scan. */
}
- /* Reschedule delayed caps release if we delayed anything */
- if (delayed)
- __cap_delay_requeue(mdsc, ci, false);
+ /* periodically re-calculate caps wanted by open files */
+ if (__ceph_is_any_real_caps(ci) &&
+ list_empty(&ci->i_cap_delay_list) &&
+ (file_wanted & ~CEPH_CAP_PIN) &&
+ !(used & (CEPH_CAP_FILE_RD | CEPH_CAP_ANY_FILE_WR))) {
+ __cap_delay_requeue(mdsc, ci);
+ }
spin_unlock(&ci->i_ceph_lock);
@@ -2095,7 +2188,7 @@ retry:
retry_locked:
if (ci->i_dirty_caps && ci->i_auth_cap) {
struct ceph_cap *cap = ci->i_auth_cap;
- int delayed;
+ struct cap_msg_args arg;
if (session != cap->session) {
spin_unlock(&ci->i_ceph_lock);
@@ -2123,19 +2216,13 @@ retry_locked:
flush_tid = __mark_caps_flushing(inode, session, true,
&oldest_flush_tid);
- /* __send_cap drops i_ceph_lock */
- delayed = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH,
- CEPH_CLIENT_CAPS_SYNC,
- __ceph_caps_used(ci),
- __ceph_caps_wanted(ci),
- (cap->issued | cap->implemented),
- flushing, flush_tid, oldest_flush_tid);
+ __prep_cap(&arg, cap, CEPH_CAP_OP_FLUSH, CEPH_CLIENT_CAPS_SYNC,
+ __ceph_caps_used(ci), __ceph_caps_wanted(ci),
+ (cap->issued | cap->implemented),
+ flushing, flush_tid, oldest_flush_tid);
+ spin_unlock(&ci->i_ceph_lock);
- if (delayed) {
- spin_lock(&ci->i_ceph_lock);
- __cap_delay_requeue(mdsc, ci, true);
- spin_unlock(&ci->i_ceph_lock);
- }
+ __send_cap(mdsc, &arg, ci);
} else {
if (!list_empty(&ci->i_cap_flush_list)) {
struct ceph_cap_flush *cf =
@@ -2233,6 +2320,10 @@ int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync)
if (datasync)
goto out;
+ ret = ceph_wait_on_async_create(inode);
+ if (ret)
+ goto out;
+
dirty = try_flush_caps(inode, &flush_tid);
dout("fsync dirty caps are %s\n", ceph_cap_string(dirty));
@@ -2333,24 +2424,19 @@ static void __kick_flushing_caps(struct ceph_mds_client *mdsc,
first_tid = cf->tid + 1;
if (cf->caps) {
+ struct cap_msg_args arg;
+
dout("kick_flushing_caps %p cap %p tid %llu %s\n",
inode, cap, cf->tid, ceph_cap_string(cf->caps));
- ci->i_ceph_flags |= CEPH_I_NODELAY;
-
- ret = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH,
+ __prep_cap(&arg, cap, CEPH_CAP_OP_FLUSH,
(cf->tid < last_snap_flush ?
CEPH_CLIENT_CAPS_PENDING_CAPSNAP : 0),
__ceph_caps_used(ci),
__ceph_caps_wanted(ci),
(cap->issued | cap->implemented),
cf->caps, cf->tid, oldest_flush_tid);
- if (ret) {
- pr_err("kick_flushing_caps: error sending "
- "cap flush, ino (%llx.%llx) "
- "tid %llu flushing %s\n",
- ceph_vinop(inode), cf->tid,
- ceph_cap_string(cf->caps));
- }
+ spin_unlock(&ci->i_ceph_lock);
+ __send_cap(mdsc, &arg, ci);
} else {
struct ceph_cap_snap *capsnap =
container_of(cf, struct ceph_cap_snap,
@@ -2434,6 +2520,8 @@ void ceph_kick_flushing_caps(struct ceph_mds_client *mdsc,
struct ceph_cap *cap;
u64 oldest_flush_tid;
+ lockdep_assert_held(&session->s_mutex);
+
dout("kick_flushing_caps mds%d\n", session->s_mds);
spin_lock(&mdsc->cap_dirty_lock);
@@ -2457,16 +2545,15 @@ void ceph_kick_flushing_caps(struct ceph_mds_client *mdsc,
}
}
-static void kick_flushing_inode_caps(struct ceph_mds_client *mdsc,
- struct ceph_mds_session *session,
- struct inode *inode)
- __releases(ci->i_ceph_lock)
+void ceph_kick_flushing_inode_caps(struct ceph_mds_session *session,
+ struct ceph_inode_info *ci)
{
- struct ceph_inode_info *ci = ceph_inode(inode);
- struct ceph_cap *cap;
+ struct ceph_mds_client *mdsc = session->s_mdsc;
+ struct ceph_cap *cap = ci->i_auth_cap;
+
+ lockdep_assert_held(&ci->i_ceph_lock);
- cap = ci->i_auth_cap;
- dout("kick_flushing_inode_caps %p flushing %s\n", inode,
+ dout("%s %p flushing %s\n", __func__, &ci->vfs_inode,
ceph_cap_string(ci->i_flushing_caps));
if (!list_empty(&ci->i_cap_flush_list)) {
@@ -2478,9 +2565,6 @@ static void kick_flushing_inode_caps(struct ceph_mds_client *mdsc,
spin_unlock(&mdsc->cap_dirty_lock);
__kick_flushing_caps(mdsc, session, ci, oldest_flush_tid);
- spin_unlock(&ci->i_ceph_lock);
- } else {
- spin_unlock(&ci->i_ceph_lock);
}
}
@@ -2488,18 +2572,20 @@ static void kick_flushing_inode_caps(struct ceph_mds_client *mdsc,
/*
* Take references to capabilities we hold, so that we don't release
* them to the MDS prematurely.
- *
- * Protected by i_ceph_lock.
*/
-static void __take_cap_refs(struct ceph_inode_info *ci, int got,
+void ceph_take_cap_refs(struct ceph_inode_info *ci, int got,
bool snap_rwsem_locked)
{
+ lockdep_assert_held(&ci->i_ceph_lock);
+
if (got & CEPH_CAP_PIN)
ci->i_pin_ref++;
if (got & CEPH_CAP_FILE_RD)
ci->i_rd_ref++;
if (got & CEPH_CAP_FILE_CACHE)
ci->i_rdcache_ref++;
+ if (got & CEPH_CAP_FILE_EXCL)
+ ci->i_fx_ref++;
if (got & CEPH_CAP_FILE_WR) {
if (ci->i_wr_ref == 0 && !ci->i_head_snapc) {
BUG_ON(!snap_rwsem_locked);
@@ -2512,7 +2598,7 @@ static void __take_cap_refs(struct ceph_inode_info *ci, int got,
if (ci->i_wb_ref == 0)
ihold(&ci->vfs_inode);
ci->i_wb_ref++;
- dout("__take_cap_refs %p wb %d -> %d (?)\n",
+ dout("%s %p wb %d -> %d (?)\n", __func__,
&ci->vfs_inode, ci->i_wb_ref-1, ci->i_wb_ref);
}
}
@@ -2524,14 +2610,16 @@ static void __take_cap_refs(struct ceph_inode_info *ci, int got,
* Note that caller is responsible for ensuring max_size increases are
* requested from the MDS.
*
- * Returns 0 if caps were not able to be acquired (yet), a 1 if they were,
- * or a negative error code.
- *
- * FIXME: how does a 0 return differ from -EAGAIN?
+ * Returns 0 if caps were not able to be acquired (yet), 1 if succeed,
+ * or a negative error code. There are 3 speical error codes:
+ * -EAGAIN: need to sleep but non-blocking is specified
+ * -EFBIG: ask caller to call check_max_size() and try again.
+ * -ESTALE: ask caller to call ceph_renew_caps() and try again.
*/
enum {
- NON_BLOCKING = 1,
- CHECK_FILELOCK = 2,
+ /* first 8 bits are reserved for CEPH_FILE_MODE_FOO */
+ NON_BLOCKING = (1 << 8),
+ CHECK_FILELOCK = (1 << 9),
};
static int try_get_cap_refs(struct inode *inode, int need, int want,
@@ -2541,7 +2629,6 @@ static int try_get_cap_refs(struct inode *inode, int need, int want,
struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
int ret = 0;
int have, implemented;
- int file_wanted;
bool snap_rwsem_locked = false;
dout("get_cap_refs %p need %s want %s\n", inode,
@@ -2557,15 +2644,6 @@ again:
goto out_unlock;
}
- /* make sure file is actually open */
- file_wanted = __ceph_caps_file_wanted(ci);
- if ((file_wanted & need) != need) {
- dout("try_get_cap_refs need %s file_wanted %s, EBADF\n",
- ceph_cap_string(need), ceph_cap_string(file_wanted));
- ret = -EBADF;
- goto out_unlock;
- }
-
/* finish pending truncate */
while (ci->i_truncate_pending) {
spin_unlock(&ci->i_ceph_lock);
@@ -2584,7 +2662,7 @@ again:
dout("get_cap_refs %p endoff %llu > maxsize %llu\n",
inode, endoff, ci->i_max_size);
if (endoff > ci->i_requested_max_size)
- ret = -EAGAIN;
+ ret = ci->i_auth_cap ? -EFBIG : -ESTALE;
goto out_unlock;
}
/*
@@ -2630,55 +2708,64 @@ again:
}
snap_rwsem_locked = true;
}
- *got = need | (have & want);
- if ((need & CEPH_CAP_FILE_RD) &&
+ if ((have & want) == want)
+ *got = need | want;
+ else
+ *got = need;
+ if (S_ISREG(inode->i_mode) &&
+ (need & CEPH_CAP_FILE_RD) &&
!(*got & CEPH_CAP_FILE_CACHE))
ceph_disable_fscache_readpage(ci);
- __take_cap_refs(ci, *got, true);
+ ceph_take_cap_refs(ci, *got, true);
ret = 1;
}
} else {
int session_readonly = false;
- if ((need & CEPH_CAP_FILE_WR) && ci->i_auth_cap) {
+ int mds_wanted;
+ if (ci->i_auth_cap &&
+ (need & (CEPH_CAP_FILE_WR | CEPH_CAP_FILE_EXCL))) {
struct ceph_mds_session *s = ci->i_auth_cap->session;
spin_lock(&s->s_cap_lock);
session_readonly = s->s_readonly;
spin_unlock(&s->s_cap_lock);
}
if (session_readonly) {
- dout("get_cap_refs %p needed %s but mds%d readonly\n",
+ dout("get_cap_refs %p need %s but mds%d readonly\n",
inode, ceph_cap_string(need), ci->i_auth_cap->mds);
ret = -EROFS;
goto out_unlock;
}
- if (ci->i_ceph_flags & CEPH_I_CAP_DROPPED) {
- int mds_wanted;
- if (READ_ONCE(mdsc->fsc->mount_state) ==
- CEPH_MOUNT_SHUTDOWN) {
- dout("get_cap_refs %p forced umount\n", inode);
- ret = -EIO;
- goto out_unlock;
- }
- mds_wanted = __ceph_caps_mds_wanted(ci, false);
- if (need & ~(mds_wanted & need)) {
- dout("get_cap_refs %p caps were dropped"
- " (session killed?)\n", inode);
- ret = -ESTALE;
- goto out_unlock;
- }
- if (!(file_wanted & ~mds_wanted))
- ci->i_ceph_flags &= ~CEPH_I_CAP_DROPPED;
+ if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
+ dout("get_cap_refs %p forced umount\n", inode);
+ ret = -EIO;
+ goto out_unlock;
+ }
+ mds_wanted = __ceph_caps_mds_wanted(ci, false);
+ if (need & ~mds_wanted) {
+ dout("get_cap_refs %p need %s > mds_wanted %s\n",
+ inode, ceph_cap_string(need),
+ ceph_cap_string(mds_wanted));
+ ret = -ESTALE;
+ goto out_unlock;
}
- dout("get_cap_refs %p have %s needed %s\n", inode,
+ dout("get_cap_refs %p have %s need %s\n", inode,
ceph_cap_string(have), ceph_cap_string(need));
}
out_unlock:
+
+ __ceph_touch_fmode(ci, mdsc, flags);
+
spin_unlock(&ci->i_ceph_lock);
if (snap_rwsem_locked)
up_read(&mdsc->snap_rwsem);
+ if (!ret)
+ ceph_update_cap_mis(&mdsc->metric);
+ else if (ret == 1)
+ ceph_update_cap_hit(&mdsc->metric);
+
dout("get_cap_refs %p ret %d got %s\n", inode,
ret, ceph_cap_string(*got));
return ret;
@@ -2712,20 +2799,40 @@ static void check_max_size(struct inode *inode, loff_t endoff)
ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL);
}
+static inline int get_used_fmode(int caps)
+{
+ int fmode = 0;
+ if (caps & CEPH_CAP_FILE_RD)
+ fmode |= CEPH_FILE_MODE_RD;
+ if (caps & CEPH_CAP_FILE_WR)
+ fmode |= CEPH_FILE_MODE_WR;
+ return fmode;
+}
+
int ceph_try_get_caps(struct inode *inode, int need, int want,
bool nonblock, int *got)
{
- int ret;
+ int ret, flags;
BUG_ON(need & ~CEPH_CAP_FILE_RD);
- BUG_ON(want & ~(CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO|CEPH_CAP_FILE_SHARED));
- ret = ceph_pool_perm_check(inode, need);
- if (ret < 0)
- return ret;
+ BUG_ON(want & ~(CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO |
+ CEPH_CAP_FILE_SHARED | CEPH_CAP_FILE_EXCL |
+ CEPH_CAP_ANY_DIR_OPS));
+ if (need) {
+ ret = ceph_pool_perm_check(inode, need);
+ if (ret < 0)
+ return ret;
+ }
+
+ flags = get_used_fmode(need | want);
+ if (nonblock)
+ flags |= NON_BLOCKING;
- ret = try_get_cap_refs(inode, need, want, 0,
- (nonblock ? NON_BLOCKING : 0), got);
- return ret == -EAGAIN ? 0 : ret;
+ ret = try_get_cap_refs(inode, need, want, 0, flags, got);
+ /* three special error codes */
+ if (ret == -EAGAIN || ret == -EFBIG || ret == -ESTALE)
+ ret = 0;
+ return ret;
}
/*
@@ -2750,22 +2857,22 @@ int ceph_get_caps(struct file *filp, int need, int want,
fi->filp_gen != READ_ONCE(fsc->filp_gen))
return -EBADF;
- while (true) {
- if (endoff > 0)
- check_max_size(inode, endoff);
+ flags = get_used_fmode(need | want);
- flags = atomic_read(&fi->num_locks) ? CHECK_FILELOCK : 0;
+ while (true) {
+ flags &= CEPH_FILE_MODE_MASK;
+ if (atomic_read(&fi->num_locks))
+ flags |= CHECK_FILELOCK;
_got = 0;
ret = try_get_cap_refs(inode, need, want, endoff,
flags, &_got);
- if (ret == -EAGAIN)
- continue;
+ WARN_ON_ONCE(ret == -EAGAIN);
if (!ret) {
struct ceph_mds_client *mdsc = fsc->mdsc;
struct cap_wait cw;
DEFINE_WAIT_FUNC(wait, woken_wake_function);
- cw.ino = inode->i_ino;
+ cw.ino = ceph_ino(inode);
cw.tgid = current->tgid;
cw.need = need;
cw.want = want;
@@ -2774,6 +2881,8 @@ int ceph_get_caps(struct file *filp, int need, int want,
list_add(&cw.list, &mdsc->cap_wait_list);
spin_unlock(&mdsc->caps_list_lock);
+ /* make sure used fmode not timeout */
+ ceph_get_fmode(ci, flags, FMODE_WAIT_BIAS);
add_wait_queue(&ci->i_cap_wq, &wait);
flags |= NON_BLOCKING;
@@ -2787,6 +2896,7 @@ int ceph_get_caps(struct file *filp, int need, int want,
}
remove_wait_queue(&ci->i_cap_wq, &wait);
+ ceph_put_fmode(ci, flags, FMODE_WAIT_BIAS);
spin_lock(&mdsc->caps_list_lock);
list_del(&cw.list);
@@ -2804,16 +2914,26 @@ int ceph_get_caps(struct file *filp, int need, int want,
}
if (ret < 0) {
+ if (ret == -EFBIG || ret == -ESTALE) {
+ int ret2 = ceph_wait_on_async_create(inode);
+ if (ret2 < 0)
+ return ret2;
+ }
+ if (ret == -EFBIG) {
+ check_max_size(inode, endoff);
+ continue;
+ }
if (ret == -ESTALE) {
/* session was killed, try renew caps */
- ret = ceph_renew_caps(inode);
+ ret = ceph_renew_caps(inode, flags);
if (ret == 0)
continue;
}
return ret;
}
- if (ci->i_inline_version != CEPH_INLINE_NONE &&
+ if (S_ISREG(ci->vfs_inode.i_mode) &&
+ ci->i_inline_version != CEPH_INLINE_NONE &&
(_got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) &&
i_size_read(inode) > 0) {
struct page *page =
@@ -2846,7 +2966,8 @@ int ceph_get_caps(struct file *filp, int need, int want,
break;
}
- if ((_got & CEPH_CAP_FILE_RD) && (_got & CEPH_CAP_FILE_CACHE))
+ if (S_ISREG(ci->vfs_inode.i_mode) &&
+ (_got & CEPH_CAP_FILE_RD) && (_got & CEPH_CAP_FILE_CACHE))
ceph_fscache_revalidate_cookie(ci);
*got = _got;
@@ -2860,7 +2981,7 @@ int ceph_get_caps(struct file *filp, int need, int want,
void ceph_get_cap_refs(struct ceph_inode_info *ci, int caps)
{
spin_lock(&ci->i_ceph_lock);
- __take_cap_refs(ci, caps, false);
+ ceph_take_cap_refs(ci, caps, false);
spin_unlock(&ci->i_ceph_lock);
}
@@ -2897,7 +3018,8 @@ static int ceph_try_drop_cap_snap(struct ceph_inode_info *ci,
* If we are releasing a WR cap (from a sync write), finalize any affected
* cap_snap, and wake up any waiters.
*/
-void ceph_put_cap_refs(struct ceph_inode_info *ci, int had)
+static void __ceph_put_cap_refs(struct ceph_inode_info *ci, int had,
+ bool skip_checking_caps)
{
struct inode *inode = &ci->vfs_inode;
int last = 0, put = 0, flushsnaps = 0, wake = 0;
@@ -2911,6 +3033,9 @@ void ceph_put_cap_refs(struct ceph_inode_info *ci, int had)
if (had & CEPH_CAP_FILE_CACHE)
if (--ci->i_rdcache_ref == 0)
last++;
+ if (had & CEPH_CAP_FILE_EXCL)
+ if (--ci->i_fx_ref == 0)
+ last++;
if (had & CEPH_CAP_FILE_BUFFER) {
if (--ci->i_wb_ref == 0) {
last++;
@@ -2950,7 +3075,7 @@ void ceph_put_cap_refs(struct ceph_inode_info *ci, int had)
dout("put_cap_refs %p had %s%s%s\n", inode, ceph_cap_string(had),
last ? " last" : "", put ? " put" : "");
- if (last && !flushsnaps)
+ if (last && !skip_checking_caps)
ceph_check_caps(ci, 0, NULL);
else if (flushsnaps)
ceph_flush_snaps(ci, NULL);
@@ -2960,6 +3085,16 @@ void ceph_put_cap_refs(struct ceph_inode_info *ci, int had)
iput(inode);
}
+void ceph_put_cap_refs(struct ceph_inode_info *ci, int had)
+{
+ __ceph_put_cap_refs(ci, had, false);
+}
+
+void ceph_put_cap_refs_no_check_caps(struct ceph_inode_info *ci, int had)
+{
+ __ceph_put_cap_refs(ci, had, true);
+}
+
/*
* Release @nr WRBUFFER refs on dirty pages for the given @snapc snap
* context. Adjust per-snap dirty page accounting as appropriate.
@@ -3032,7 +3167,7 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr,
spin_unlock(&ci->i_ceph_lock);
if (last) {
- ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL);
+ ceph_check_caps(ci, 0, NULL);
} else if (flush_snaps) {
ceph_flush_snaps(ci, NULL);
}
@@ -3133,7 +3268,7 @@ static void handle_cap_grant(struct inode *inode,
* try to invalidate (once). (If there are dirty buffers, we
* will invalidate _after_ writeback.)
*/
- if (!S_ISDIR(inode->i_mode) && /* don't invalidate readdir cache */
+ if (S_ISREG(inode->i_mode) && /* don't invalidate readdir cache */
((cap->issued & ~newcaps) & CEPH_CAP_FILE_CACHE) &&
(newcaps & CEPH_CAP_FILE_LAZYIO) == 0 &&
!(ci->i_wrbuffer_ref || ci->i_wb_ref)) {
@@ -3258,10 +3393,6 @@ static void handle_cap_grant(struct inode *inode,
ci->i_requested_max_size = 0;
}
wake = true;
- } else if (ci->i_wanted_max_size > ci->i_max_size &&
- ci->i_wanted_max_size > ci->i_requested_max_size) {
- /* CEPH_CAP_OP_IMPORT */
- wake = true;
}
}
@@ -3297,11 +3428,12 @@ static void handle_cap_grant(struct inode *inode,
ceph_cap_string(cap->issued),
ceph_cap_string(newcaps),
ceph_cap_string(revoking));
- if (revoking & used & CEPH_CAP_FILE_BUFFER)
+ if (S_ISREG(inode->i_mode) &&
+ (revoking & used & CEPH_CAP_FILE_BUFFER))
writeback = true; /* initiate writeback; will delay ack */
- else if (revoking == CEPH_CAP_FILE_CACHE &&
- (newcaps & CEPH_CAP_FILE_LAZYIO) == 0 &&
- queue_invalidate)
+ else if (queue_invalidate &&
+ revoking == CEPH_CAP_FILE_CACHE &&
+ (newcaps & CEPH_CAP_FILE_LAZYIO) == 0)
; /* do nothing yet, invalidation will be queued */
else if (cap == ci->i_auth_cap)
check_caps = 1; /* check auth cap only */
@@ -3336,10 +3468,20 @@ static void handle_cap_grant(struct inode *inode,
fill_inline = true;
}
- if (le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT) {
+ if (ci->i_auth_cap == cap &&
+ le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT) {
if (newcaps & ~extra_info->issued)
wake = true;
- kick_flushing_inode_caps(session->s_mdsc, session, inode);
+
+ if (ci->i_requested_max_size > max_size ||
+ !(le32_to_cpu(grant->wanted) & CEPH_CAP_ANY_FILE_WR)) {
+ /* re-request max_size if necessary */
+ ci->i_requested_max_size = 0;
+ wake = true;
+ }
+
+ ceph_kick_flushing_inode_caps(session, ci);
+ spin_unlock(&ci->i_ceph_lock);
up_read(&session->s_mdsc->snap_rwsem);
} else {
spin_unlock(&ci->i_ceph_lock);
@@ -3367,10 +3509,10 @@ static void handle_cap_grant(struct inode *inode,
wake_up_all(&ci->i_cap_wq);
if (check_caps == 1)
- ceph_check_caps(ci, CHECK_CAPS_NODELAY|CHECK_CAPS_AUTHONLY,
+ ceph_check_caps(ci, CHECK_CAPS_AUTHONLY | CHECK_CAPS_NOINVAL,
session);
else if (check_caps == 2)
- ceph_check_caps(ci, CHECK_CAPS_NODELAY, session);
+ ceph_check_caps(ci, CHECK_CAPS_NOINVAL, session);
else
mutex_unlock(&session->s_mutex);
}
@@ -3397,15 +3539,26 @@ static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid,
bool wake_mdsc = false;
list_for_each_entry_safe(cf, tmp_cf, &ci->i_cap_flush_list, i_list) {
+ /* Is this the one that was flushed? */
if (cf->tid == flush_tid)
cleaned = cf->caps;
- if (cf->caps == 0) /* capsnap */
+
+ /* Is this a capsnap? */
+ if (cf->caps == 0)
continue;
+
if (cf->tid <= flush_tid) {
- if (__finish_cap_flush(NULL, ci, cf))
- wake_ci = true;
+ /*
+ * An earlier or current tid. The FLUSH_ACK should
+ * represent a superset of this flush's caps.
+ */
+ wake_ci |= __detach_cap_flush_from_ci(ci, cf);
list_add_tail(&cf->i_list, &to_remove);
} else {
+ /*
+ * This is a later one. Any caps in it are still dirty
+ * so don't count them as cleaned.
+ */
cleaned &= ~cf->caps;
if (!cleaned)
break;
@@ -3425,10 +3578,8 @@ static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid,
spin_lock(&mdsc->cap_dirty_lock);
- list_for_each_entry(cf, &to_remove, i_list) {
- if (__finish_cap_flush(mdsc, NULL, cf))
- wake_mdsc = true;
- }
+ list_for_each_entry(cf, &to_remove, i_list)
+ wake_mdsc |= __detach_cap_flush_from_mdsc(mdsc, cf);
if (ci->i_flushing_caps == 0) {
if (list_empty(&ci->i_cap_flush_list)) {
@@ -3520,17 +3671,15 @@ static void handle_cap_flushsnap_ack(struct inode *inode, u64 flush_tid,
dout(" removing %p cap_snap %p follows %lld\n",
inode, capsnap, follows);
list_del(&capsnap->ci_item);
- if (__finish_cap_flush(NULL, ci, &capsnap->cap_flush))
- wake_ci = true;
+ wake_ci |= __detach_cap_flush_from_ci(ci, &capsnap->cap_flush);
spin_lock(&mdsc->cap_dirty_lock);
if (list_empty(&ci->i_cap_flush_list))
list_del_init(&ci->i_flushing_item);
- if (__finish_cap_flush(mdsc, NULL, &capsnap->cap_flush))
- wake_mdsc = true;
-
+ wake_mdsc |= __detach_cap_flush_from_mdsc(mdsc,
+ &capsnap->cap_flush);
spin_unlock(&mdsc->cap_dirty_lock);
}
spin_unlock(&ci->i_ceph_lock);
@@ -3550,10 +3699,9 @@ static void handle_cap_flushsnap_ack(struct inode *inode, u64 flush_tid,
*
* caller hold s_mutex.
*/
-static void handle_cap_trunc(struct inode *inode,
+static bool handle_cap_trunc(struct inode *inode,
struct ceph_mds_caps *trunc,
struct ceph_mds_session *session)
- __releases(ci->i_ceph_lock)
{
struct ceph_inode_info *ci = ceph_inode(inode);
int mds = session->s_mds;
@@ -3564,7 +3712,9 @@ static void handle_cap_trunc(struct inode *inode,
int implemented = 0;
int dirty = __ceph_caps_dirty(ci);
int issued = __ceph_caps_issued(ceph_inode(inode), &implemented);
- int queue_trunc = 0;
+ bool queue_trunc = false;
+
+ lockdep_assert_held(&ci->i_ceph_lock);
issued |= implemented | dirty;
@@ -3572,10 +3722,7 @@ static void handle_cap_trunc(struct inode *inode,
inode, mds, seq, truncate_size, truncate_seq);
queue_trunc = ceph_fill_file_size(inode, issued,
truncate_seq, truncate_size, size);
- spin_unlock(&ci->i_ceph_lock);
-
- if (queue_trunc)
- ceph_queue_vmtruncate(inode);
+ return queue_trunc;
}
/*
@@ -3619,8 +3766,6 @@ retry:
goto out_unlock;
if (target < 0) {
- if (cap->mds_wanted | cap->issued)
- ci->i_ceph_flags |= CEPH_I_CAP_DROPPED;
__ceph_remove_cap(cap, false);
goto out_unlock;
}
@@ -3651,15 +3796,9 @@ retry:
tcap->issue_seq = t_seq - 1;
tcap->issued |= issued;
tcap->implemented |= issued;
- if (cap == ci->i_auth_cap)
+ if (cap == ci->i_auth_cap) {
ci->i_auth_cap = tcap;
-
- if (!list_empty(&ci->i_cap_flush_list) &&
- ci->i_auth_cap == tcap) {
- spin_lock(&mdsc->cap_dirty_lock);
- list_move_tail(&ci->i_flushing_item,
- &tcap->session->s_cap_flushing);
- spin_unlock(&mdsc->cap_dirty_lock);
+ change_auth_cap_ses(ci, tcap->session);
}
}
__ceph_remove_cap(cap, false);
@@ -3668,7 +3807,7 @@ retry:
/* add placeholder for the export tagert */
int flag = (cap == ci->i_auth_cap) ? CEPH_CAP_FLAG_AUTH : 0;
tcap = new_cap;
- ceph_add_cap(inode, tsession, t_cap_id, -1, issued, 0,
+ ceph_add_cap(inode, tsession, t_cap_id, issued, 0,
t_seq - 1, t_mseq, (u64)-1, flag, &new_cap);
if (!list_empty(&ci->i_cap_flush_list) &&
@@ -3703,6 +3842,7 @@ retry:
WARN_ON(1);
tsession = NULL;
target = -1;
+ mutex_lock(&session->s_mutex);
}
goto retry;
@@ -3727,7 +3867,6 @@ static void handle_cap_import(struct ceph_mds_client *mdsc,
struct ceph_mds_cap_peer *ph,
struct ceph_mds_session *session,
struct ceph_cap **target_cap, int *old_issued)
- __acquires(ci->i_ceph_lock)
{
struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_cap *cap, *ocap, *new_cap = NULL;
@@ -3752,14 +3891,13 @@ static void handle_cap_import(struct ceph_mds_client *mdsc,
dout("handle_cap_import inode %p ci %p mds%d mseq %d peer %d\n",
inode, ci, mds, mseq, peer);
-
retry:
- spin_lock(&ci->i_ceph_lock);
cap = __get_cap_for_mds(ci, mds);
if (!cap) {
if (!new_cap) {
spin_unlock(&ci->i_ceph_lock);
new_cap = ceph_get_cap(mdsc, NULL);
+ spin_lock(&ci->i_ceph_lock);
goto retry;
}
cap = new_cap;
@@ -3773,7 +3911,7 @@ retry:
__ceph_caps_issued(ci, &issued);
issued |= __ceph_caps_dirty(ci);
- ceph_add_cap(inode, session, cap_id, -1, caps, wanted, seq, mseq,
+ ceph_add_cap(inode, session, cap_id, caps, wanted, seq, mseq,
realmino, CEPH_CAP_FLAG_AUTH, &new_cap);
ocap = peer >= 0 ? __get_cap_for_mds(ci, peer) : NULL;
@@ -3794,9 +3932,6 @@ retry:
__ceph_remove_cap(ocap, (ph->flags & CEPH_CAP_FLAG_RELEASE));
}
- /* make sure we re-request max_size, if necessary */
- ci->i_requested_max_size = 0;
-
*old_issued = issued;
*target_cap = cap;
}
@@ -3825,6 +3960,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
size_t snaptrace_len;
void *p, *end;
struct cap_extra_info extra_info = {};
+ bool queue_trunc;
dout("handle_caps from mds%d\n", session->s_mds);
@@ -3947,7 +4083,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
__ceph_queue_cap_release(session, cap);
spin_unlock(&session->s_cap_lock);
}
- goto done;
+ goto flush_cap_releases;
}
/* these will work even if we don't have a cap yet */
@@ -3972,6 +4108,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
} else {
down_read(&mdsc->snap_rwsem);
}
+ spin_lock(&ci->i_ceph_lock);
handle_cap_import(mdsc, inode, h, peer, session,
&cap, &extra_info.issued);
handle_cap_grant(inode, session, cap,
@@ -4008,7 +4145,10 @@ void ceph_handle_caps(struct ceph_mds_session *session,
break;
case CEPH_CAP_OP_TRUNC:
- handle_cap_trunc(inode, h, session);
+ queue_trunc = handle_cap_trunc(inode, h, session);
+ spin_unlock(&ci->i_ceph_lock);
+ if (queue_trunc)
+ ceph_queue_vmtruncate(inode);
break;
default:
@@ -4047,13 +4187,10 @@ void ceph_check_delayed_caps(struct ceph_mds_client *mdsc)
{
struct inode *inode;
struct ceph_inode_info *ci;
- int flags = CHECK_CAPS_NODELAY;
dout("check_delayed_caps\n");
- while (1) {
- spin_lock(&mdsc->cap_delay_lock);
- if (list_empty(&mdsc->cap_delay_list))
- break;
+ spin_lock(&mdsc->cap_delay_lock);
+ while (!list_empty(&mdsc->cap_delay_list)) {
ci = list_first_entry(&mdsc->cap_delay_list,
struct ceph_inode_info,
i_cap_delay_list);
@@ -4063,13 +4200,13 @@ void ceph_check_delayed_caps(struct ceph_mds_client *mdsc)
list_del_init(&ci->i_cap_delay_list);
inode = igrab(&ci->vfs_inode);
- spin_unlock(&mdsc->cap_delay_lock);
-
if (inode) {
+ spin_unlock(&mdsc->cap_delay_lock);
dout("check_delayed_caps on %p\n", inode);
- ceph_check_caps(ci, flags, NULL);
+ ceph_check_caps(ci, 0, NULL);
/* avoid calling iput_final() in tick thread */
ceph_async_iput(inode);
+ spin_lock(&mdsc->cap_delay_lock);
}
}
spin_unlock(&mdsc->cap_delay_lock);
@@ -4078,21 +4215,22 @@ void ceph_check_delayed_caps(struct ceph_mds_client *mdsc)
/*
* Flush all dirty caps to the mds
*/
-void ceph_flush_dirty_caps(struct ceph_mds_client *mdsc)
+static void flush_dirty_session_caps(struct ceph_mds_session *s)
{
+ struct ceph_mds_client *mdsc = s->s_mdsc;
struct ceph_inode_info *ci;
struct inode *inode;
dout("flush_dirty_caps\n");
spin_lock(&mdsc->cap_dirty_lock);
- while (!list_empty(&mdsc->cap_dirty)) {
- ci = list_first_entry(&mdsc->cap_dirty, struct ceph_inode_info,
+ while (!list_empty(&s->s_cap_dirty)) {
+ ci = list_first_entry(&s->s_cap_dirty, struct ceph_inode_info,
i_dirty_item);
inode = &ci->vfs_inode;
ihold(inode);
dout("flush_dirty_caps %p\n", inode);
spin_unlock(&mdsc->cap_dirty_lock);
- ceph_check_caps(ci, CHECK_CAPS_NODELAY|CHECK_CAPS_FLUSH, NULL);
+ ceph_check_caps(ci, CHECK_CAPS_FLUSH, NULL);
iput(inode);
spin_lock(&mdsc->cap_dirty_lock);
}
@@ -4100,14 +4238,60 @@ void ceph_flush_dirty_caps(struct ceph_mds_client *mdsc)
dout("flush_dirty_caps done\n");
}
-void __ceph_get_fmode(struct ceph_inode_info *ci, int fmode)
+static void iterate_sessions(struct ceph_mds_client *mdsc,
+ void (*cb)(struct ceph_mds_session *))
+{
+ int mds;
+
+ mutex_lock(&mdsc->mutex);
+ for (mds = 0; mds < mdsc->max_sessions; ++mds) {
+ struct ceph_mds_session *s;
+
+ if (!mdsc->sessions[mds])
+ continue;
+
+ s = ceph_get_mds_session(mdsc->sessions[mds]);
+ if (!s)
+ continue;
+
+ mutex_unlock(&mdsc->mutex);
+ cb(s);
+ ceph_put_mds_session(s);
+ mutex_lock(&mdsc->mutex);
+ }
+ mutex_unlock(&mdsc->mutex);
+}
+
+void ceph_flush_dirty_caps(struct ceph_mds_client *mdsc)
+{
+ iterate_sessions(mdsc, flush_dirty_session_caps);
+}
+
+void __ceph_touch_fmode(struct ceph_inode_info *ci,
+ struct ceph_mds_client *mdsc, int fmode)
+{
+ unsigned long now = jiffies;
+ if (fmode & CEPH_FILE_MODE_RD)
+ ci->i_last_rd = now;
+ if (fmode & CEPH_FILE_MODE_WR)
+ ci->i_last_wr = now;
+ /* queue periodic check */
+ if (fmode &&
+ __ceph_is_any_real_caps(ci) &&
+ list_empty(&ci->i_cap_delay_list))
+ __cap_delay_requeue(mdsc, ci);
+}
+
+void ceph_get_fmode(struct ceph_inode_info *ci, int fmode, int count)
{
int i;
int bits = (fmode << 1) | 1;
+ spin_lock(&ci->i_ceph_lock);
for (i = 0; i < CEPH_FILE_MODE_BITS; i++) {
if (bits & (1 << i))
- ci->i_nr_by_mode[i]++;
+ ci->i_nr_by_mode[i] += count;
}
+ spin_unlock(&ci->i_ceph_lock);
}
/*
@@ -4115,26 +4299,18 @@ void __ceph_get_fmode(struct ceph_inode_info *ci, int fmode)
* we may need to release capabilities to the MDS (or schedule
* their delayed release).
*/
-void ceph_put_fmode(struct ceph_inode_info *ci, int fmode)
+void ceph_put_fmode(struct ceph_inode_info *ci, int fmode, int count)
{
- int i, last = 0;
+ int i;
int bits = (fmode << 1) | 1;
spin_lock(&ci->i_ceph_lock);
for (i = 0; i < CEPH_FILE_MODE_BITS; i++) {
if (bits & (1 << i)) {
- BUG_ON(ci->i_nr_by_mode[i] == 0);
- if (--ci->i_nr_by_mode[i] == 0)
- last++;
+ BUG_ON(ci->i_nr_by_mode[i] < count);
+ ci->i_nr_by_mode[i] -= count;
}
}
- dout("put_fmode %p fmode %d {%d,%d,%d,%d}\n",
- &ci->vfs_inode, fmode,
- ci->i_nr_by_mode[0], ci->i_nr_by_mode[1],
- ci->i_nr_by_mode[2], ci->i_nr_by_mode[3]);
spin_unlock(&ci->i_ceph_lock);
-
- if (last && ci->i_vino.snap == CEPH_NOSNAP)
- ceph_check_caps(ci, 0, NULL);
}
/*
@@ -4152,7 +4328,6 @@ int ceph_drop_caps_for_unlink(struct inode *inode)
if (inode->i_nlink == 1) {
drop |= ~(__ceph_caps_wanted(ci) | CEPH_CAP_PIN);
- ci->i_ceph_flags |= CEPH_I_NODELAY;
if (__ceph_caps_dirty(ci)) {
struct ceph_mds_client *mdsc =
ceph_inode_to_client(inode)->mdsc;
@@ -4208,8 +4383,6 @@ int ceph_encode_inode_release(void **p, struct inode *inode,
if (force || (cap->issued & drop)) {
if (cap->issued & drop) {
int wanted = __ceph_caps_wanted(ci);
- if ((ci->i_ceph_flags & CEPH_I_NODELAY) == 0)
- wanted |= cap->mds_wanted;
dout("encode_inode_release %p cap %p "
"%s -> %s, wanted %s -> %s\n", inode, cap,
ceph_cap_string(cap->issued),
@@ -4220,6 +4393,9 @@ int ceph_encode_inode_release(void **p, struct inode *inode,
cap->issued &= ~drop;
cap->implemented &= ~drop;
cap->mds_wanted = wanted;
+ if (cap == ci->i_auth_cap &&
+ !(wanted & CEPH_CAP_ANY_FILE_WR))
+ ci->i_requested_max_size = 0;
} else {
dout("encode_inode_release %p cap %p %s"
" (force)\n", inode, cap,
diff --git a/fs/ceph/debugfs.c b/fs/ceph/debugfs.c
index fb7cabd98e7b..3e3fcda9b276 100644
--- a/fs/ceph/debugfs.c
+++ b/fs/ceph/debugfs.c
@@ -7,6 +7,8 @@
#include <linux/ctype.h>
#include <linux/debugfs.h>
#include <linux/seq_file.h>
+#include <linux/math64.h>
+#include <linux/ktime.h>
#include <linux/ceph/libceph.h>
#include <linux/ceph/mon_client.h>
@@ -18,6 +20,7 @@
#ifdef CONFIG_DEBUG_FS
#include "mds_client.h"
+#include "metric.h"
static int mdsmap_show(struct seq_file *s, void *p)
{
@@ -124,11 +127,82 @@ static int mdsc_show(struct seq_file *s, void *p)
return 0;
}
+#define CEPH_METRIC_SHOW(name, total, avg, min, max, sq) { \
+ s64 _total, _avg, _min, _max, _sq, _st; \
+ _avg = ktime_to_us(avg); \
+ _min = ktime_to_us(min == KTIME_MAX ? 0 : min); \
+ _max = ktime_to_us(max); \
+ _total = total - 1; \
+ _sq = _total > 0 ? DIV64_U64_ROUND_CLOSEST(sq, _total) : 0; \
+ _st = int_sqrt64(_sq); \
+ _st = ktime_to_us(_st); \
+ seq_printf(s, "%-14s%-12lld%-16lld%-16lld%-16lld%lld\n", \
+ name, total, _avg, _min, _max, _st); \
+}
+
+static int metric_show(struct seq_file *s, void *p)
+{
+ struct ceph_fs_client *fsc = s->private;
+ struct ceph_mds_client *mdsc = fsc->mdsc;
+ struct ceph_client_metric *m = &mdsc->metric;
+ int nr_caps = 0;
+ s64 total, sum, avg, min, max, sq;
+
+ seq_printf(s, "item total avg_lat(us) min_lat(us) max_lat(us) stdev(us)\n");
+ seq_printf(s, "-----------------------------------------------------------------------------------\n");
+
+ spin_lock(&m->read_latency_lock);
+ total = m->total_reads;
+ sum = m->read_latency_sum;
+ avg = total > 0 ? DIV64_U64_ROUND_CLOSEST(sum, total) : 0;
+ min = m->read_latency_min;
+ max = m->read_latency_max;
+ sq = m->read_latency_sq_sum;
+ spin_unlock(&m->read_latency_lock);
+ CEPH_METRIC_SHOW("read", total, avg, min, max, sq);
+
+ spin_lock(&m->write_latency_lock);
+ total = m->total_writes;
+ sum = m->write_latency_sum;
+ avg = total > 0 ? DIV64_U64_ROUND_CLOSEST(sum, total) : 0;
+ min = m->write_latency_min;
+ max = m->write_latency_max;
+ sq = m->write_latency_sq_sum;
+ spin_unlock(&m->write_latency_lock);
+ CEPH_METRIC_SHOW("write", total, avg, min, max, sq);
+
+ spin_lock(&m->metadata_latency_lock);
+ total = m->total_metadatas;
+ sum = m->metadata_latency_sum;
+ avg = total > 0 ? DIV64_U64_ROUND_CLOSEST(sum, total) : 0;
+ min = m->metadata_latency_min;
+ max = m->metadata_latency_max;
+ sq = m->metadata_latency_sq_sum;
+ spin_unlock(&m->metadata_latency_lock);
+ CEPH_METRIC_SHOW("metadata", total, avg, min, max, sq);
+
+ seq_printf(s, "\n");
+ seq_printf(s, "item total miss hit\n");
+ seq_printf(s, "-------------------------------------------------\n");
+
+ seq_printf(s, "%-14s%-16lld%-16lld%lld\n", "d_lease",
+ atomic64_read(&m->total_dentries),
+ percpu_counter_sum(&m->d_lease_mis),
+ percpu_counter_sum(&m->d_lease_hit));
+
+ nr_caps = atomic64_read(&m->total_caps);
+ seq_printf(s, "%-14s%-16d%-16lld%lld\n", "caps", nr_caps,
+ percpu_counter_sum(&m->i_caps_mis),
+ percpu_counter_sum(&m->i_caps_hit));
+
+ return 0;
+}
+
static int caps_show_cb(struct inode *inode, struct ceph_cap *cap, void *p)
{
struct seq_file *s = p;
- seq_printf(s, "0x%-17lx%-17s%-17s\n", inode->i_ino,
+ seq_printf(s, "0x%-17llx%-17s%-17s\n", ceph_ino(inode),
ceph_cap_string(cap->issued),
ceph_cap_string(cap->implemented));
return 0;
@@ -173,7 +247,7 @@ static int caps_show(struct seq_file *s, void *p)
spin_lock(&mdsc->caps_list_lock);
list_for_each_entry(cw, &mdsc->cap_wait_list, list) {
- seq_printf(s, "%-13d0x%-17lx%-17s%-17s\n", cw->tgid, cw->ino,
+ seq_printf(s, "%-13d0x%-17llx%-17s%-17s\n", cw->tgid, cw->ino,
ceph_cap_string(cw->need),
ceph_cap_string(cw->want));
}
@@ -188,7 +262,7 @@ static int mds_sessions_show(struct seq_file *s, void *ptr)
struct ceph_mds_client *mdsc = fsc->mdsc;
struct ceph_auth_client *ac = fsc->client->monc.auth;
struct ceph_options *opt = fsc->client->options;
- int mds = -1;
+ int mds;
mutex_lock(&mdsc->mutex);
@@ -218,10 +292,11 @@ static int mds_sessions_show(struct seq_file *s, void *ptr)
return 0;
}
-CEPH_DEFINE_SHOW_FUNC(mdsmap_show)
-CEPH_DEFINE_SHOW_FUNC(mdsc_show)
-CEPH_DEFINE_SHOW_FUNC(caps_show)
-CEPH_DEFINE_SHOW_FUNC(mds_sessions_show)
+DEFINE_SHOW_ATTRIBUTE(mdsmap);
+DEFINE_SHOW_ATTRIBUTE(mdsc);
+DEFINE_SHOW_ATTRIBUTE(caps);
+DEFINE_SHOW_ATTRIBUTE(mds_sessions);
+DEFINE_SHOW_ATTRIBUTE(metric);
/*
@@ -255,6 +330,7 @@ void ceph_fs_debugfs_cleanup(struct ceph_fs_client *fsc)
debugfs_remove(fsc->debugfs_mdsmap);
debugfs_remove(fsc->debugfs_mds_sessions);
debugfs_remove(fsc->debugfs_caps);
+ debugfs_remove(fsc->debugfs_metric);
debugfs_remove(fsc->debugfs_mdsc);
}
@@ -271,7 +347,7 @@ void ceph_fs_debugfs_init(struct ceph_fs_client *fsc)
&congestion_kb_fops);
snprintf(name, sizeof(name), "../../bdi/%s",
- dev_name(fsc->sb->s_bdi->dev));
+ bdi_dev_name(fsc->sb->s_bdi));
fsc->debugfs_bdi =
debugfs_create_symlink("bdi",
fsc->client->debugfs_dir,
@@ -281,25 +357,31 @@ void ceph_fs_debugfs_init(struct ceph_fs_client *fsc)
0400,
fsc->client->debugfs_dir,
fsc,
- &mdsmap_show_fops);
+ &mdsmap_fops);
fsc->debugfs_mds_sessions = debugfs_create_file("mds_sessions",
0400,
fsc->client->debugfs_dir,
fsc,
- &mds_sessions_show_fops);
+ &mds_sessions_fops);
fsc->debugfs_mdsc = debugfs_create_file("mdsc",
0400,
fsc->client->debugfs_dir,
fsc,
- &mdsc_show_fops);
+ &mdsc_fops);
+
+ fsc->debugfs_metric = debugfs_create_file("metrics",
+ 0400,
+ fsc->client->debugfs_dir,
+ fsc,
+ &metric_fops);
fsc->debugfs_caps = debugfs_create_file("caps",
- 0400,
- fsc->client->debugfs_dir,
- fsc,
- &caps_show_fops);
+ 0400,
+ fsc->client->debugfs_dir,
+ fsc,
+ &caps_fops);
}
diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
index d0cd0aba5843..d72e4a12bb69 100644
--- a/fs/ceph/dir.c
+++ b/fs/ceph/dir.c
@@ -38,6 +38,8 @@ static int __dir_lease_try_check(const struct dentry *dentry);
static int ceph_d_init(struct dentry *dentry)
{
struct ceph_dentry_info *di;
+ struct ceph_fs_client *fsc = ceph_sb_to_client(dentry->d_sb);
+ struct ceph_mds_client *mdsc = fsc->mdsc;
di = kmem_cache_zalloc(ceph_dentry_cachep, GFP_KERNEL);
if (!di)
@@ -48,6 +50,9 @@ static int ceph_d_init(struct dentry *dentry)
di->time = jiffies;
dentry->d_fsdata = di;
INIT_LIST_HEAD(&di->lease_list);
+
+ atomic64_inc(&mdsc->metric.total_dentries);
+
return 0;
}
@@ -254,9 +259,7 @@ static int __dcache_readdir(struct file *file, struct dir_context *ctx,
dentry, dentry, d_inode(dentry));
ctx->pos = di->offset;
if (!dir_emit(ctx, dentry->d_name.name,
- dentry->d_name.len,
- ceph_translate_ino(dentry->d_sb,
- d_inode(dentry)->i_ino),
+ dentry->d_name.len, ceph_present_inode(d_inode(dentry)),
d_inode(dentry)->i_mode >> 12)) {
dput(dentry);
err = 0;
@@ -319,30 +322,37 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
/* always start with . and .. */
if (ctx->pos == 0) {
dout("readdir off 0 -> '.'\n");
- if (!dir_emit(ctx, ".", 1,
- ceph_translate_ino(inode->i_sb, inode->i_ino),
+ if (!dir_emit(ctx, ".", 1, ceph_present_inode(inode),
inode->i_mode >> 12))
return 0;
ctx->pos = 1;
}
if (ctx->pos == 1) {
- ino_t ino = parent_ino(file->f_path.dentry);
+ u64 ino;
+ struct dentry *dentry = file->f_path.dentry;
+
+ spin_lock(&dentry->d_lock);
+ ino = ceph_present_inode(dentry->d_parent->d_inode);
+ spin_unlock(&dentry->d_lock);
+
dout("readdir off 1 -> '..'\n");
- if (!dir_emit(ctx, "..", 2,
- ceph_translate_ino(inode->i_sb, ino),
- inode->i_mode >> 12))
+ if (!dir_emit(ctx, "..", 2, ino, inode->i_mode >> 12))
return 0;
ctx->pos = 2;
}
- /* can we use the dcache? */
spin_lock(&ci->i_ceph_lock);
+ /* request Fx cap. if have Fx, we don't need to release Fs cap
+ * for later create/unlink. */
+ __ceph_touch_fmode(ci, mdsc, CEPH_FILE_MODE_WR);
+ /* can we use the dcache? */
if (ceph_test_mount_opt(fsc, DCACHE) &&
!ceph_test_mount_opt(fsc, NOASYNCREADDIR) &&
ceph_snap(inode) != CEPH_SNAPDIR &&
__ceph_dir_is_complete_ordered(ci) &&
- __ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1)) {
+ __ceph_caps_issued_mask_metric(ci, CEPH_CAP_FILE_SHARED, 1)) {
int shared_gen = atomic_read(&ci->i_shared_gen);
+
spin_unlock(&ci->i_ceph_lock);
err = __dcache_readdir(file, ctx, shared_gen);
if (err != -EAGAIN)
@@ -498,9 +508,6 @@ more:
}
for (; i < rinfo->dir_nr; i++) {
struct ceph_mds_reply_dir_entry *rde = rinfo->dir_entries + i;
- struct ceph_vino vino;
- ino_t ino;
- u32 ftype;
BUG_ON(rde->offset < ctx->pos);
@@ -510,13 +517,10 @@ more:
rde->name_len, rde->name, &rde->inode.in);
BUG_ON(!rde->inode.in);
- ftype = le32_to_cpu(rde->inode.in->mode) >> 12;
- vino.ino = le64_to_cpu(rde->inode.in->ino);
- vino.snap = le64_to_cpu(rde->inode.in->snapid);
- ino = ceph_vino_to_ino(vino);
if (!dir_emit(ctx, rde->name, rde->name_len,
- ceph_translate_ino(inode->i_sb, ino), ftype)) {
+ ceph_present_ino(inode->i_sb, le64_to_cpu(rde->inode.in->ino)),
+ le32_to_cpu(rde->inode.in->mode) >> 12)) {
dout("filldir stopping us...\n");
return 0;
}
@@ -752,14 +756,15 @@ static struct dentry *ceph_lookup(struct inode *dir, struct dentry *dentry,
struct ceph_dentry_info *di = ceph_dentry(dentry);
spin_lock(&ci->i_ceph_lock);
- dout(" dir %p flags are %d\n", dir, ci->i_ceph_flags);
+ dout(" dir %p flags are 0x%lx\n", dir, ci->i_ceph_flags);
if (strncmp(dentry->d_name.name,
fsc->mount_options->snapdir_name,
dentry->d_name.len) &&
!is_root_ceph_dentry(dir, dentry) &&
ceph_test_mount_opt(fsc, DCACHE) &&
__ceph_dir_is_complete(ci) &&
- (__ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1))) {
+ __ceph_caps_issued_mask_metric(ci, CEPH_CAP_FILE_SHARED, 1)) {
+ __ceph_touch_fmode(ci, mdsc, CEPH_FILE_MODE_RD);
spin_unlock(&ci->i_ceph_lock);
dout(" dir %p complete, -ENOENT\n", dir);
d_add(dentry, NULL);
@@ -920,6 +925,10 @@ static int ceph_symlink(struct inode *dir, struct dentry *dentry,
req->r_num_caps = 2;
req->r_dentry_drop = CEPH_CAP_FILE_SHARED | CEPH_CAP_AUTH_EXCL;
req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
+ if (as_ctx.pagelist) {
+ req->r_pagelist = as_ctx.pagelist;
+ as_ctx.pagelist = NULL;
+ }
err = ceph_mdsc_do_request(mdsc, dir, req);
if (!err && !req->r_reply_info.head->is_dentry)
err = ceph_handle_notrace_create(dir, dentry);
@@ -1036,6 +1045,78 @@ static int ceph_link(struct dentry *old_dentry, struct inode *dir,
return err;
}
+static void ceph_async_unlink_cb(struct ceph_mds_client *mdsc,
+ struct ceph_mds_request *req)
+{
+ int result = req->r_err ? req->r_err :
+ le32_to_cpu(req->r_reply_info.head->result);
+
+ if (result == -EJUKEBOX)
+ goto out;
+
+ /* If op failed, mark everyone involved for errors */
+ if (result) {
+ int pathlen = 0;
+ u64 base = 0;
+ char *path = ceph_mdsc_build_path(req->r_dentry, &pathlen,
+ &base, 0);
+
+ /* mark error on parent + clear complete */
+ mapping_set_error(req->r_parent->i_mapping, result);
+ ceph_dir_clear_complete(req->r_parent);
+
+ /* drop the dentry -- we don't know its status */
+ if (!d_unhashed(req->r_dentry))
+ d_drop(req->r_dentry);
+
+ /* mark inode itself for an error (since metadata is bogus) */
+ mapping_set_error(req->r_old_inode->i_mapping, result);
+
+ pr_warn("ceph: async unlink failure path=(%llx)%s result=%d!\n",
+ base, IS_ERR(path) ? "<<bad>>" : path, result);
+ ceph_mdsc_free_path(path, pathlen);
+ }
+out:
+ iput(req->r_old_inode);
+ ceph_mdsc_release_dir_caps(req);
+}
+
+static int get_caps_for_async_unlink(struct inode *dir, struct dentry *dentry)
+{
+ struct ceph_inode_info *ci = ceph_inode(dir);
+ struct ceph_dentry_info *di;
+ int got = 0, want = CEPH_CAP_FILE_EXCL | CEPH_CAP_DIR_UNLINK;
+
+ spin_lock(&ci->i_ceph_lock);
+ if ((__ceph_caps_issued(ci, NULL) & want) == want) {
+ ceph_take_cap_refs(ci, want, false);
+ got = want;
+ }
+ spin_unlock(&ci->i_ceph_lock);
+
+ /* If we didn't get anything, return 0 */
+ if (!got)
+ return 0;
+
+ spin_lock(&dentry->d_lock);
+ di = ceph_dentry(dentry);
+ /*
+ * - We are holding Fx, which implies Fs caps.
+ * - Only support async unlink for primary linkage
+ */
+ if (atomic_read(&ci->i_shared_gen) != di->lease_shared_gen ||
+ !(di->flags & CEPH_DENTRY_PRIMARY_LINK))
+ want = 0;
+ spin_unlock(&dentry->d_lock);
+
+ /* Do we still want what we've got? */
+ if (want == got)
+ return got;
+
+ ceph_put_cap_refs(ci, got);
+ return 0;
+}
+
/*
* rmdir and unlink are differ only by the metadata op code
*/
@@ -1045,6 +1126,7 @@ static int ceph_unlink(struct inode *dir, struct dentry *dentry)
struct ceph_mds_client *mdsc = fsc->mdsc;
struct inode *inode = d_inode(dentry);
struct ceph_mds_request *req;
+ bool try_async = ceph_test_mount_opt(fsc, ASYNC_DIROPS);
int err = -EROFS;
int op;
@@ -1059,6 +1141,7 @@ static int ceph_unlink(struct inode *dir, struct dentry *dentry)
CEPH_MDS_OP_RMDIR : CEPH_MDS_OP_UNLINK;
} else
goto out;
+retry:
req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS);
if (IS_ERR(req)) {
err = PTR_ERR(req);
@@ -1067,13 +1150,39 @@ static int ceph_unlink(struct inode *dir, struct dentry *dentry)
req->r_dentry = dget(dentry);
req->r_num_caps = 2;
req->r_parent = dir;
- set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
req->r_inode_drop = ceph_drop_caps_for_unlink(inode);
- err = ceph_mdsc_do_request(mdsc, dir, req);
- if (!err && !req->r_reply_info.head->is_dentry)
- d_delete(dentry);
+
+ if (try_async && op == CEPH_MDS_OP_UNLINK &&
+ (req->r_dir_caps = get_caps_for_async_unlink(dir, dentry))) {
+ dout("async unlink on %llu/%.*s caps=%s", ceph_ino(dir),
+ dentry->d_name.len, dentry->d_name.name,
+ ceph_cap_string(req->r_dir_caps));
+ set_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags);
+ req->r_callback = ceph_async_unlink_cb;
+ req->r_old_inode = d_inode(dentry);
+ ihold(req->r_old_inode);
+ err = ceph_mdsc_submit_request(mdsc, dir, req);
+ if (!err) {
+ /*
+ * We have enough caps, so we assume that the unlink
+ * will succeed. Fix up the target inode and dcache.
+ */
+ drop_nlink(inode);
+ d_delete(dentry);
+ } else if (err == -EJUKEBOX) {
+ try_async = false;
+ ceph_mdsc_put_request(req);
+ goto retry;
+ }
+ } else {
+ set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
+ err = ceph_mdsc_do_request(mdsc, dir, req);
+ if (!err && !req->r_reply_info.head->is_dentry)
+ d_delete(dentry);
+ }
+
ceph_mdsc_put_request(req);
out:
return err;
@@ -1099,11 +1208,12 @@ static int ceph_rename(struct inode *old_dir, struct dentry *old_dentry,
op = CEPH_MDS_OP_RENAMESNAP;
else
return -EROFS;
+ } else if (old_dir != new_dir) {
+ err = ceph_quota_check_rename(mdsc, d_inode(old_dentry),
+ new_dir);
+ if (err)
+ return err;
}
- /* don't allow cross-quota renames */
- if ((old_dir != new_dir) &&
- (!ceph_quota_is_same_realm(old_dir, new_dir)))
- return -EXDEV;
dout("rename dir %p dentry %p to dir %p dentry %p\n",
old_dir, old_dentry, new_dir, new_dentry);
@@ -1411,6 +1521,7 @@ void ceph_invalidate_dentry_lease(struct dentry *dentry)
spin_lock(&dentry->d_lock);
di->time = jiffies;
di->lease_shared_gen = 0;
+ di->flags &= ~CEPH_DENTRY_PRIMARY_LINK;
__dentry_lease_unlist(di);
spin_unlock(&dentry->d_lock);
}
@@ -1520,7 +1631,8 @@ static int __dir_lease_try_check(const struct dentry *dentry)
/*
* Check if directory-wide content lease/cap is valid.
*/
-static int dir_lease_is_valid(struct inode *dir, struct dentry *dentry)
+static int dir_lease_is_valid(struct inode *dir, struct dentry *dentry,
+ struct ceph_mds_client *mdsc)
{
struct ceph_inode_info *ci = ceph_inode(dir);
int valid;
@@ -1528,7 +1640,10 @@ static int dir_lease_is_valid(struct inode *dir, struct dentry *dentry)
spin_lock(&ci->i_ceph_lock);
valid = __ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1);
- shared_gen = atomic_read(&ci->i_shared_gen);
+ if (valid) {
+ __ceph_touch_fmode(ci, mdsc, CEPH_FILE_MODE_RD);
+ shared_gen = atomic_read(&ci->i_shared_gen);
+ }
spin_unlock(&ci->i_ceph_lock);
if (valid) {
struct ceph_dentry_info *di;
@@ -1554,6 +1669,7 @@ static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags)
int valid = 0;
struct dentry *parent;
struct inode *dir, *inode;
+ struct ceph_mds_client *mdsc;
if (flags & LOOKUP_RCU) {
parent = READ_ONCE(dentry->d_parent);
@@ -1570,6 +1686,8 @@ static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags)
dout("d_revalidate %p '%pd' inode %p offset 0x%llx\n", dentry,
dentry, inode, ceph_dentry(dentry)->offset);
+ mdsc = ceph_sb_to_client(dir->i_sb)->mdsc;
+
/* always trust cached snapped dentries, snapdir dentry */
if (ceph_snap(dir) != CEPH_NOSNAP) {
dout("d_revalidate %p '%pd' inode %p is SNAPPED\n", dentry,
@@ -1581,7 +1699,7 @@ static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags)
valid = dentry_lease_is_valid(dentry, flags);
if (valid == -ECHILD)
return valid;
- if (valid || dir_lease_is_valid(dir, dentry)) {
+ if (valid || dir_lease_is_valid(dir, dentry, mdsc)) {
if (inode)
valid = ceph_is_any_caps(inode);
else
@@ -1590,8 +1708,6 @@ static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags)
}
if (!valid) {
- struct ceph_mds_client *mdsc =
- ceph_sb_to_client(dir->i_sb)->mdsc;
struct ceph_mds_request *req;
int op, err;
u32 mask;
@@ -1599,6 +1715,8 @@ static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags)
if (flags & LOOKUP_RCU)
return -ECHILD;
+ percpu_counter_inc(&mdsc->metric.d_lease_mis);
+
op = ceph_snap(dir) == CEPH_SNAPDIR ?
CEPH_MDS_OP_LOOKUPSNAP : CEPH_MDS_OP_LOOKUP;
req = ceph_mdsc_create_request(mdsc, op, USE_ANY_MDS);
@@ -1622,7 +1740,7 @@ static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags)
case -ENOENT:
if (d_really_is_negative(dentry))
valid = 1;
- /* Fallthrough */
+ fallthrough;
default:
break;
}
@@ -1630,6 +1748,8 @@ static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags)
dout("d_revalidate %p lookup result=%d\n",
dentry, err);
}
+ } else {
+ percpu_counter_inc(&mdsc->metric.d_lease_hit);
}
dout("d_revalidate %p %s\n", dentry, valid ? "valid" : "invalid");
@@ -1672,9 +1792,12 @@ static int ceph_d_delete(const struct dentry *dentry)
static void ceph_d_release(struct dentry *dentry)
{
struct ceph_dentry_info *di = ceph_dentry(dentry);
+ struct ceph_fs_client *fsc = ceph_sb_to_client(dentry->d_sb);
dout("d_release %p\n", dentry);
+ atomic64_dec(&fsc->mdsc->metric.total_dentries);
+
spin_lock(&dentry->d_lock);
__dentry_lease_unlist(di);
dentry->d_fsdata = NULL;
diff --git a/fs/ceph/export.c b/fs/ceph/export.c
index b6bfa94332c3..e088843a7734 100644
--- a/fs/ceph/export.c
+++ b/fs/ceph/export.c
@@ -172,9 +172,16 @@ struct inode *ceph_lookup_inode(struct super_block *sb, u64 ino)
static struct dentry *__fh_to_dentry(struct super_block *sb, u64 ino)
{
struct inode *inode = __lookup_inode(sb, ino);
+ int err;
+
if (IS_ERR(inode))
return ERR_CAST(inode);
- if (inode->i_nlink == 0) {
+ /* We need LINK caps to reliably check i_nlink */
+ err = ceph_do_getattr(inode, CEPH_CAP_LINK_SHARED, false);
+ if (err)
+ return ERR_PTR(err);
+ /* -ESTALE if inode as been unlinked and no file is open */
+ if ((inode->i_nlink == 0) && (atomic_read(&inode->i_count) == 1)) {
iput(inode);
return ERR_PTR(-ESTALE);
}
@@ -315,6 +322,11 @@ static struct dentry *__get_parent(struct super_block *sb,
req->r_num_caps = 1;
err = ceph_mdsc_do_request(mdsc, NULL, req);
+ if (err) {
+ ceph_mdsc_put_request(req);
+ return ERR_PTR(err);
+ }
+
inode = req->r_target_inode;
if (inode)
ihold(inode);
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 7e0190b1f821..3f4c993dfc6f 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -11,11 +11,13 @@
#include <linux/writeback.h>
#include <linux/falloc.h>
#include <linux/iversion.h>
+#include <linux/ktime.h>
#include "super.h"
#include "mds_client.h"
#include "cache.h"
#include "io.h"
+#include "metric.h"
static __le32 ceph_flags_sys2wire(u32 flags)
{
@@ -212,10 +214,8 @@ static int ceph_init_file_info(struct inode *inode, struct file *file,
if (isdir) {
struct ceph_dir_file_info *dfi =
kmem_cache_zalloc(ceph_dir_file_cachep, GFP_KERNEL);
- if (!dfi) {
- ceph_put_fmode(ci, fmode); /* clean up */
+ if (!dfi)
return -ENOMEM;
- }
file->private_data = dfi;
fi = &dfi->file_info;
@@ -223,15 +223,15 @@ static int ceph_init_file_info(struct inode *inode, struct file *file,
dfi->readdir_cache_idx = -1;
} else {
fi = kmem_cache_zalloc(ceph_file_cachep, GFP_KERNEL);
- if (!fi) {
- ceph_put_fmode(ci, fmode); /* clean up */
+ if (!fi)
return -ENOMEM;
- }
file->private_data = fi;
}
+ ceph_get_fmode(ci, fmode, 1);
fi->fmode = fmode;
+
spin_lock_init(&fi->rw_contexts_lock);
INIT_LIST_HEAD(&fi->rw_contexts);
fi->meta_err = errseq_sample(&ci->i_meta_err);
@@ -252,7 +252,7 @@ static int ceph_init_file(struct inode *inode, struct file *file, int fmode)
case S_IFREG:
ceph_fscache_register_inode_cookie(inode);
ceph_fscache_file_set_cookie(inode, file);
- /* fall through */
+ fallthrough;
case S_IFDIR:
ret = ceph_init_file_info(inode, file, fmode,
S_ISDIR(inode->i_mode));
@@ -263,7 +263,6 @@ static int ceph_init_file(struct inode *inode, struct file *file, int fmode)
case S_IFLNK:
dout("init_file %p %p 0%o (symlink)\n", inode, file,
inode->i_mode);
- ceph_put_fmode(ceph_inode(inode), fmode); /* clean up */
break;
default:
@@ -273,7 +272,6 @@ static int ceph_init_file(struct inode *inode, struct file *file, int fmode)
* we need to drop the open ref now, since we don't
* have .release set to ceph_release.
*/
- ceph_put_fmode(ceph_inode(inode), fmode); /* clean up */
BUG_ON(inode->i_fop->release == ceph_release);
/* call the proper open fop */
@@ -285,14 +283,15 @@ static int ceph_init_file(struct inode *inode, struct file *file, int fmode)
/*
* try renew caps after session gets killed.
*/
-int ceph_renew_caps(struct inode *inode)
+int ceph_renew_caps(struct inode *inode, int fmode)
{
- struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
+ struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_mds_request *req;
int err, flags, wanted;
spin_lock(&ci->i_ceph_lock);
+ __ceph_touch_fmode(ci, mdsc, fmode);
wanted = __ceph_caps_file_wanted(ci);
if (__ceph_is_any_real_caps(ci) &&
(!(wanted & CEPH_CAP_ANY_WR) || ci->i_auth_cap)) {
@@ -326,7 +325,6 @@ int ceph_renew_caps(struct inode *inode)
req->r_inode = inode;
ihold(inode);
req->r_num_caps = 1;
- req->r_fmode = -1;
err = ceph_mdsc_do_request(mdsc, NULL, req);
ceph_mdsc_put_request(req);
@@ -372,9 +370,6 @@ int ceph_open(struct inode *inode, struct file *file)
/* trivially open snapdir */
if (ceph_snap(inode) == CEPH_SNAPDIR) {
- spin_lock(&ci->i_ceph_lock);
- __ceph_get_fmode(ci, fmode);
- spin_unlock(&ci->i_ceph_lock);
return ceph_init_file(inode, file, fmode);
}
@@ -392,7 +387,7 @@ int ceph_open(struct inode *inode, struct file *file)
dout("open %p fmode %d want %s issued %s using existing\n",
inode, fmode, ceph_cap_string(wanted),
ceph_cap_string(issued));
- __ceph_get_fmode(ci, fmode);
+ __ceph_touch_fmode(ci, mdsc, fmode);
spin_unlock(&ci->i_ceph_lock);
/* adjust wanted? */
@@ -404,7 +399,7 @@ int ceph_open(struct inode *inode, struct file *file)
return ceph_init_file(inode, file, fmode);
} else if (ceph_snap(inode) != CEPH_NOSNAP &&
(ci->i_snap_caps & wanted) == wanted) {
- __ceph_get_fmode(ci, fmode);
+ __ceph_touch_fmode(ci, mdsc, fmode);
spin_unlock(&ci->i_ceph_lock);
return ceph_init_file(inode, file, fmode);
}
@@ -430,6 +425,236 @@ out:
return err;
}
+/* Clone the layout from a synchronous create, if the dir now has Dc caps */
+static void
+cache_file_layout(struct inode *dst, struct inode *src)
+{
+ struct ceph_inode_info *cdst = ceph_inode(dst);
+ struct ceph_inode_info *csrc = ceph_inode(src);
+
+ spin_lock(&cdst->i_ceph_lock);
+ if ((__ceph_caps_issued(cdst, NULL) & CEPH_CAP_DIR_CREATE) &&
+ !ceph_file_layout_is_valid(&cdst->i_cached_layout)) {
+ memcpy(&cdst->i_cached_layout, &csrc->i_layout,
+ sizeof(cdst->i_cached_layout));
+ rcu_assign_pointer(cdst->i_cached_layout.pool_ns,
+ ceph_try_get_string(csrc->i_layout.pool_ns));
+ }
+ spin_unlock(&cdst->i_ceph_lock);
+}
+
+/*
+ * Try to set up an async create. We need caps, a file layout, and inode number,
+ * and either a lease on the dentry or complete dir info. If any of those
+ * criteria are not satisfied, then return false and the caller can go
+ * synchronous.
+ */
+static int try_prep_async_create(struct inode *dir, struct dentry *dentry,
+ struct ceph_file_layout *lo, u64 *pino)
+{
+ struct ceph_inode_info *ci = ceph_inode(dir);
+ struct ceph_dentry_info *di = ceph_dentry(dentry);
+ int got = 0, want = CEPH_CAP_FILE_EXCL | CEPH_CAP_DIR_CREATE;
+ u64 ino;
+
+ spin_lock(&ci->i_ceph_lock);
+ /* No auth cap means no chance for Dc caps */
+ if (!ci->i_auth_cap)
+ goto no_async;
+
+ /* Any delegated inos? */
+ if (xa_empty(&ci->i_auth_cap->session->s_delegated_inos))
+ goto no_async;
+
+ if (!ceph_file_layout_is_valid(&ci->i_cached_layout))
+ goto no_async;
+
+ if ((__ceph_caps_issued(ci, NULL) & want) != want)
+ goto no_async;
+
+ if (d_in_lookup(dentry)) {
+ if (!__ceph_dir_is_complete(ci))
+ goto no_async;
+ spin_lock(&dentry->d_lock);
+ di->lease_shared_gen = atomic_read(&ci->i_shared_gen);
+ spin_unlock(&dentry->d_lock);
+ } else if (atomic_read(&ci->i_shared_gen) !=
+ READ_ONCE(di->lease_shared_gen)) {
+ goto no_async;
+ }
+
+ ino = ceph_get_deleg_ino(ci->i_auth_cap->session);
+ if (!ino)
+ goto no_async;
+
+ *pino = ino;
+ ceph_take_cap_refs(ci, want, false);
+ memcpy(lo, &ci->i_cached_layout, sizeof(*lo));
+ rcu_assign_pointer(lo->pool_ns,
+ ceph_try_get_string(ci->i_cached_layout.pool_ns));
+ got = want;
+no_async:
+ spin_unlock(&ci->i_ceph_lock);
+ return got;
+}
+
+static void restore_deleg_ino(struct inode *dir, u64 ino)
+{
+ struct ceph_inode_info *ci = ceph_inode(dir);
+ struct ceph_mds_session *s = NULL;
+
+ spin_lock(&ci->i_ceph_lock);
+ if (ci->i_auth_cap)
+ s = ceph_get_mds_session(ci->i_auth_cap->session);
+ spin_unlock(&ci->i_ceph_lock);
+ if (s) {
+ int err = ceph_restore_deleg_ino(s, ino);
+ if (err)
+ pr_warn("ceph: unable to restore delegated ino 0x%llx to session: %d\n",
+ ino, err);
+ ceph_put_mds_session(s);
+ }
+}
+
+static void ceph_async_create_cb(struct ceph_mds_client *mdsc,
+ struct ceph_mds_request *req)
+{
+ int result = req->r_err ? req->r_err :
+ le32_to_cpu(req->r_reply_info.head->result);
+
+ if (result == -EJUKEBOX)
+ goto out;
+
+ mapping_set_error(req->r_parent->i_mapping, result);
+
+ if (result) {
+ struct dentry *dentry = req->r_dentry;
+ int pathlen = 0;
+ u64 base = 0;
+ char *path = ceph_mdsc_build_path(req->r_dentry, &pathlen,
+ &base, 0);
+
+ ceph_dir_clear_complete(req->r_parent);
+ if (!d_unhashed(dentry))
+ d_drop(dentry);
+
+ /* FIXME: start returning I/O errors on all accesses? */
+ pr_warn("ceph: async create failure path=(%llx)%s result=%d!\n",
+ base, IS_ERR(path) ? "<<bad>>" : path, result);
+ ceph_mdsc_free_path(path, pathlen);
+ }
+
+ if (req->r_target_inode) {
+ struct ceph_inode_info *ci = ceph_inode(req->r_target_inode);
+ u64 ino = ceph_vino(req->r_target_inode).ino;
+
+ if (req->r_deleg_ino != ino)
+ pr_warn("%s: inode number mismatch! err=%d deleg_ino=0x%llx target=0x%llx\n",
+ __func__, req->r_err, req->r_deleg_ino, ino);
+ mapping_set_error(req->r_target_inode->i_mapping, result);
+
+ spin_lock(&ci->i_ceph_lock);
+ if (ci->i_ceph_flags & CEPH_I_ASYNC_CREATE) {
+ ci->i_ceph_flags &= ~CEPH_I_ASYNC_CREATE;
+ wake_up_bit(&ci->i_ceph_flags, CEPH_ASYNC_CREATE_BIT);
+ }
+ ceph_kick_flushing_inode_caps(req->r_session, ci);
+ spin_unlock(&ci->i_ceph_lock);
+ } else {
+ pr_warn("%s: no req->r_target_inode for 0x%llx\n", __func__,
+ req->r_deleg_ino);
+ }
+out:
+ ceph_mdsc_release_dir_caps(req);
+}
+
+static int ceph_finish_async_create(struct inode *dir, struct dentry *dentry,
+ struct file *file, umode_t mode,
+ struct ceph_mds_request *req,
+ struct ceph_acl_sec_ctx *as_ctx,
+ struct ceph_file_layout *lo)
+{
+ int ret;
+ char xattr_buf[4];
+ struct ceph_mds_reply_inode in = { };
+ struct ceph_mds_reply_info_in iinfo = { .in = &in };
+ struct ceph_inode_info *ci = ceph_inode(dir);
+ struct inode *inode;
+ struct timespec64 now;
+ struct ceph_vino vino = { .ino = req->r_deleg_ino,
+ .snap = CEPH_NOSNAP };
+
+ ktime_get_real_ts64(&now);
+
+ inode = ceph_get_inode(dentry->d_sb, vino);
+ if (IS_ERR(inode))
+ return PTR_ERR(inode);
+
+ iinfo.inline_version = CEPH_INLINE_NONE;
+ iinfo.change_attr = 1;
+ ceph_encode_timespec64(&iinfo.btime, &now);
+
+ iinfo.xattr_len = ARRAY_SIZE(xattr_buf);
+ iinfo.xattr_data = xattr_buf;
+ memset(iinfo.xattr_data, 0, iinfo.xattr_len);
+
+ in.ino = cpu_to_le64(vino.ino);
+ in.snapid = cpu_to_le64(CEPH_NOSNAP);
+ in.version = cpu_to_le64(1); // ???
+ in.cap.caps = in.cap.wanted = cpu_to_le32(CEPH_CAP_ALL_FILE);
+ in.cap.cap_id = cpu_to_le64(1);
+ in.cap.realm = cpu_to_le64(ci->i_snap_realm->ino);
+ in.cap.flags = CEPH_CAP_FLAG_AUTH;
+ in.ctime = in.mtime = in.atime = iinfo.btime;
+ in.mode = cpu_to_le32((u32)mode);
+ in.truncate_seq = cpu_to_le32(1);
+ in.truncate_size = cpu_to_le64(-1ULL);
+ in.xattr_version = cpu_to_le64(1);
+ in.uid = cpu_to_le32(from_kuid(&init_user_ns, current_fsuid()));
+ in.gid = cpu_to_le32(from_kgid(&init_user_ns, dir->i_mode & S_ISGID ?
+ dir->i_gid : current_fsgid()));
+ in.nlink = cpu_to_le32(1);
+ in.max_size = cpu_to_le64(lo->stripe_unit);
+
+ ceph_file_layout_to_legacy(lo, &in.layout);
+
+ ret = ceph_fill_inode(inode, NULL, &iinfo, NULL, req->r_session,
+ req->r_fmode, NULL);
+ if (ret) {
+ dout("%s failed to fill inode: %d\n", __func__, ret);
+ ceph_dir_clear_complete(dir);
+ if (!d_unhashed(dentry))
+ d_drop(dentry);
+ if (inode->i_state & I_NEW)
+ discard_new_inode(inode);
+ } else {
+ struct dentry *dn;
+
+ dout("%s d_adding new inode 0x%llx to 0x%llx/%s\n", __func__,
+ vino.ino, ceph_ino(dir), dentry->d_name.name);
+ ceph_dir_clear_ordered(dir);
+ ceph_init_inode_acls(inode, as_ctx);
+ if (inode->i_state & I_NEW) {
+ /*
+ * If it's not I_NEW, then someone created this before
+ * we got here. Assume the server is aware of it at
+ * that point and don't worry about setting
+ * CEPH_I_ASYNC_CREATE.
+ */
+ ceph_inode(inode)->i_ceph_flags = CEPH_I_ASYNC_CREATE;
+ unlock_new_inode(inode);
+ }
+ if (d_in_lookup(dentry) || d_really_is_negative(dentry)) {
+ if (!d_unhashed(dentry))
+ d_drop(dentry);
+ dn = d_splice_alias(inode, dentry);
+ WARN_ON_ONCE(dn && dn != dentry);
+ }
+ file->f_mode |= FMODE_CREATED;
+ ret = finish_open(file, dentry, ceph_open);
+ }
+ return ret;
+}
/*
* Do a lookup + open with a single request. If we get a non-existent
@@ -443,6 +668,7 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
struct ceph_mds_request *req;
struct dentry *dn;
struct ceph_acl_sec_ctx as_ctx = {};
+ bool try_async = ceph_test_mount_opt(fsc, ASYNC_DIROPS);
int mask;
int err;
@@ -466,7 +692,7 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
/* If it's not being looked up, it's negative */
return -ENOENT;
}
-
+retry:
/* do the open */
req = prepare_open_request(dir->i_sb, flags, mode);
if (IS_ERR(req)) {
@@ -475,21 +701,43 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
}
req->r_dentry = dget(dentry);
req->r_num_caps = 2;
+ mask = CEPH_STAT_CAP_INODE | CEPH_CAP_AUTH_SHARED;
+ if (ceph_security_xattr_wanted(dir))
+ mask |= CEPH_CAP_XATTR_SHARED;
+ req->r_args.open.mask = cpu_to_le32(mask);
+ req->r_parent = dir;
+
if (flags & O_CREAT) {
+ struct ceph_file_layout lo;
+
req->r_dentry_drop = CEPH_CAP_FILE_SHARED | CEPH_CAP_AUTH_EXCL;
req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
if (as_ctx.pagelist) {
req->r_pagelist = as_ctx.pagelist;
as_ctx.pagelist = NULL;
}
+ if (try_async &&
+ (req->r_dir_caps =
+ try_prep_async_create(dir, dentry, &lo,
+ &req->r_deleg_ino))) {
+ set_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags);
+ req->r_args.open.flags |= cpu_to_le32(CEPH_O_EXCL);
+ req->r_callback = ceph_async_create_cb;
+ err = ceph_mdsc_submit_request(mdsc, dir, req);
+ if (!err) {
+ err = ceph_finish_async_create(dir, dentry,
+ file, mode, req,
+ &as_ctx, &lo);
+ } else if (err == -EJUKEBOX) {
+ restore_deleg_ino(dir, req->r_deleg_ino);
+ ceph_mdsc_put_request(req);
+ try_async = false;
+ goto retry;
+ }
+ goto out_req;
+ }
}
- mask = CEPH_STAT_CAP_INODE | CEPH_CAP_AUTH_SHARED;
- if (ceph_security_xattr_wanted(dir))
- mask |= CEPH_CAP_XATTR_SHARED;
- req->r_args.open.mask = cpu_to_le32(mask);
-
- req->r_parent = dir;
set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
err = ceph_mdsc_do_request(mdsc,
(flags & (O_CREAT|O_TRUNC)) ? dir : NULL,
@@ -518,14 +766,15 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
} else {
dout("atomic_open finish_open on dn %p\n", dn);
if (req->r_op == CEPH_MDS_OP_CREATE && req->r_reply_info.has_create_ino) {
- ceph_init_inode_acls(d_inode(dentry), &as_ctx);
+ struct inode *newino = d_inode(dentry);
+
+ cache_file_layout(dir, newino);
+ ceph_init_inode_acls(newino, &as_ctx);
file->f_mode |= FMODE_CREATED;
}
err = finish_open(file, dentry, ceph_open);
}
out_req:
- if (!req->r_err && req->r_target_inode)
- ceph_put_fmode(ceph_inode(req->r_target_inode), req->r_fmode);
ceph_mdsc_put_request(req);
out_ctx:
ceph_release_acl_sec_ctx(&as_ctx);
@@ -542,7 +791,7 @@ int ceph_release(struct inode *inode, struct file *file)
dout("release inode %p dir file %p\n", inode, file);
WARN_ON(!list_empty(&dfi->file_info.rw_contexts));
- ceph_put_fmode(ci, dfi->file_info.fmode);
+ ceph_put_fmode(ci, dfi->file_info.fmode, 1);
if (dfi->last_readdir)
ceph_mdsc_put_request(dfi->last_readdir);
@@ -554,7 +803,8 @@ int ceph_release(struct inode *inode, struct file *file)
dout("release inode %p regular file %p\n", inode, file);
WARN_ON(!list_empty(&fi->rw_contexts));
- ceph_put_fmode(ci, fi->fmode);
+ ceph_put_fmode(ci, fi->fmode, 1);
+
kmem_cache_free(ceph_file_cachep, fi);
}
@@ -658,6 +908,12 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to,
ret = ceph_osdc_start_request(osdc, req, false);
if (!ret)
ret = ceph_osdc_wait_request(osdc, req);
+
+ ceph_update_read_latency(&fsc->mdsc->metric,
+ req->r_start_latency,
+ req->r_end_latency,
+ ret);
+
ceph_osdc_put_request(req);
i_size = i_size_read(inode);
@@ -796,6 +1052,8 @@ static void ceph_aio_complete_req(struct ceph_osd_request *req)
struct inode *inode = req->r_inode;
struct ceph_aio_request *aio_req = req->r_priv;
struct ceph_osd_data *osd_data = osd_req_op_extent_osd_data(req, 0);
+ struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
+ struct ceph_client_metric *metric = &fsc->mdsc->metric;
BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_BVECS);
BUG_ON(!osd_data->num_bvecs);
@@ -803,6 +1061,16 @@ static void ceph_aio_complete_req(struct ceph_osd_request *req)
dout("ceph_aio_complete_req %p rc %d bytes %u\n",
inode, rc, osd_data->bvec_pos.iter.bi_size);
+ /* r_start_latency == 0 means the request was not submitted */
+ if (req->r_start_latency) {
+ if (aio_req->write)
+ ceph_update_write_latency(metric, req->r_start_latency,
+ req->r_end_latency, rc);
+ else
+ ceph_update_read_latency(metric, req->r_start_latency,
+ req->r_end_latency, rc);
+ }
+
if (rc == -EOLDSNAPC) {
struct ceph_aio_work *aio_work;
BUG_ON(!aio_req->write);
@@ -931,6 +1199,7 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
struct inode *inode = file_inode(file);
struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
+ struct ceph_client_metric *metric = &fsc->mdsc->metric;
struct ceph_vino vino;
struct ceph_osd_request *req;
struct bio_vec *bvecs;
@@ -1047,6 +1316,13 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
if (!ret)
ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
+ if (write)
+ ceph_update_write_latency(metric, req->r_start_latency,
+ req->r_end_latency, ret);
+ else
+ ceph_update_read_latency(metric, req->r_start_latency,
+ req->r_end_latency, ret);
+
size = i_size_read(inode);
if (!write) {
if (ret == -ENOENT)
@@ -1218,6 +1494,8 @@ ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
if (!ret)
ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
+ ceph_update_write_latency(&fsc->mdsc->metric, req->r_start_latency,
+ req->r_end_latency, ret);
out:
ceph_osdc_put_request(req);
if (ret != 0) {
@@ -1260,6 +1538,7 @@ static ssize_t ceph_read_iter(struct kiocb *iocb, struct iov_iter *to)
struct inode *inode = file_inode(filp);
struct ceph_inode_info *ci = ceph_inode(inode);
struct page *pinned_page = NULL;
+ bool direct_lock = iocb->ki_flags & IOCB_DIRECT;
ssize_t ret;
int want, got = 0;
int retry_op = 0, read = 0;
@@ -1268,7 +1547,7 @@ again:
dout("aio_read %p %llx.%llx %llu~%u trying to get caps on %p\n",
inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len, inode);
- if (iocb->ki_flags & IOCB_DIRECT)
+ if (direct_lock)
ceph_start_io_direct(inode);
else
ceph_start_io_read(inode);
@@ -1325,7 +1604,7 @@ again:
}
ceph_put_cap_refs(ci, got);
- if (iocb->ki_flags & IOCB_DIRECT)
+ if (direct_lock)
ceph_end_io_direct(inode);
else
ceph_end_io_read(inode);
@@ -1415,10 +1694,13 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
struct inode *inode = file_inode(file);
struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
+ struct ceph_osd_client *osdc = &fsc->client->osdc;
struct ceph_cap_flush *prealloc_cf;
ssize_t count, written = 0;
int err, want, got;
bool direct_lock = false;
+ u32 map_flags;
+ u64 pool_flags;
loff_t pos;
loff_t limit = max(i_size_read(inode), fsc->max_file_size);
@@ -1481,8 +1763,12 @@ retry_snap:
goto out;
}
- /* FIXME: not complete since it doesn't account for being at quota */
- if (ceph_osdmap_flag(&fsc->client->osdc, CEPH_OSDMAP_FULL)) {
+ down_read(&osdc->lock);
+ map_flags = osdc->osdmap->flags;
+ pool_flags = ceph_pg_pool_flags(osdc->osdmap, ci->i_layout.pool_id);
+ up_read(&osdc->lock);
+ if ((map_flags & CEPH_OSDMAP_FULL) ||
+ (pool_flags & CEPH_POOL_FLAG_FULL)) {
err = -ENOSPC;
goto out;
}
@@ -1560,7 +1846,7 @@ retry_snap:
if (dirty)
__mark_inode_dirty(inode, dirty);
if (ceph_quota_is_max_bytes_approaching(inode, iocb->ki_pos))
- ceph_check_caps(ci, CHECK_CAPS_NODELAY, NULL);
+ ceph_check_caps(ci, 0, NULL);
}
dout("aio_write %p %llx.%llx %llu~%u dropping cap refs on %s\n",
@@ -1575,7 +1861,8 @@ retry_snap:
}
if (written >= 0) {
- if (ceph_osdmap_flag(&fsc->client->osdc, CEPH_OSDMAP_NEARFULL))
+ if ((map_flags & CEPH_OSDMAP_NEARFULL) ||
+ (pool_flags & CEPH_POOL_FLAG_NEARFULL))
iocb->ki_flags |= IOCB_DSYNC;
written = generic_write_sync(iocb, written);
}
@@ -1936,6 +2223,71 @@ static int is_file_size_ok(struct inode *src_inode, struct inode *dst_inode,
return 0;
}
+static ssize_t ceph_do_objects_copy(struct ceph_inode_info *src_ci, u64 *src_off,
+ struct ceph_inode_info *dst_ci, u64 *dst_off,
+ struct ceph_fs_client *fsc,
+ size_t len, unsigned int flags)
+{
+ struct ceph_object_locator src_oloc, dst_oloc;
+ struct ceph_object_id src_oid, dst_oid;
+ size_t bytes = 0;
+ u64 src_objnum, src_objoff, dst_objnum, dst_objoff;
+ u32 src_objlen, dst_objlen;
+ u32 object_size = src_ci->i_layout.object_size;
+ int ret;
+
+ src_oloc.pool = src_ci->i_layout.pool_id;
+ src_oloc.pool_ns = ceph_try_get_string(src_ci->i_layout.pool_ns);
+ dst_oloc.pool = dst_ci->i_layout.pool_id;
+ dst_oloc.pool_ns = ceph_try_get_string(dst_ci->i_layout.pool_ns);
+
+ while (len >= object_size) {
+ ceph_calc_file_object_mapping(&src_ci->i_layout, *src_off,
+ object_size, &src_objnum,
+ &src_objoff, &src_objlen);
+ ceph_calc_file_object_mapping(&dst_ci->i_layout, *dst_off,
+ object_size, &dst_objnum,
+ &dst_objoff, &dst_objlen);
+ ceph_oid_init(&src_oid);
+ ceph_oid_printf(&src_oid, "%llx.%08llx",
+ src_ci->i_vino.ino, src_objnum);
+ ceph_oid_init(&dst_oid);
+ ceph_oid_printf(&dst_oid, "%llx.%08llx",
+ dst_ci->i_vino.ino, dst_objnum);
+ /* Do an object remote copy */
+ ret = ceph_osdc_copy_from(&fsc->client->osdc,
+ src_ci->i_vino.snap, 0,
+ &src_oid, &src_oloc,
+ CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL |
+ CEPH_OSD_OP_FLAG_FADVISE_NOCACHE,
+ &dst_oid, &dst_oloc,
+ CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL |
+ CEPH_OSD_OP_FLAG_FADVISE_DONTNEED,
+ dst_ci->i_truncate_seq,
+ dst_ci->i_truncate_size,
+ CEPH_OSD_COPY_FROM_FLAG_TRUNCATE_SEQ);
+ if (ret) {
+ if (ret == -EOPNOTSUPP) {
+ fsc->have_copy_from2 = false;
+ pr_notice("OSDs don't support copy-from2; disabling copy offload\n");
+ }
+ dout("ceph_osdc_copy_from returned %d\n", ret);
+ if (!bytes)
+ bytes = ret;
+ goto out;
+ }
+ len -= object_size;
+ bytes += object_size;
+ *src_off += object_size;
+ *dst_off += object_size;
+ }
+
+out:
+ ceph_oloc_destroy(&src_oloc);
+ ceph_oloc_destroy(&dst_oloc);
+ return bytes;
+}
+
static ssize_t __ceph_copy_file_range(struct file *src_file, loff_t src_off,
struct file *dst_file, loff_t dst_off,
size_t len, unsigned int flags)
@@ -1946,14 +2298,11 @@ static ssize_t __ceph_copy_file_range(struct file *src_file, loff_t src_off,
struct ceph_inode_info *dst_ci = ceph_inode(dst_inode);
struct ceph_cap_flush *prealloc_cf;
struct ceph_fs_client *src_fsc = ceph_inode_to_client(src_inode);
- struct ceph_object_locator src_oloc, dst_oloc;
- struct ceph_object_id src_oid, dst_oid;
- loff_t endoff = 0, size;
- ssize_t ret = -EIO;
+ loff_t size;
+ ssize_t ret = -EIO, bytes;
u64 src_objnum, dst_objnum, src_objoff, dst_objoff;
- u32 src_objlen, dst_objlen, object_size;
+ u32 src_objlen, dst_objlen;
int src_got = 0, dst_got = 0, err, dirty;
- bool do_final_copy = false;
if (src_inode->i_sb != dst_inode->i_sb) {
struct ceph_fs_client *dst_fsc = ceph_inode_to_client(dst_inode);
@@ -2031,22 +2380,14 @@ static ssize_t __ceph_copy_file_range(struct file *src_file, loff_t src_off,
if (ret < 0)
goto out_caps;
- size = i_size_read(dst_inode);
- endoff = dst_off + len;
-
/* Drop dst file cached pages */
ret = invalidate_inode_pages2_range(dst_inode->i_mapping,
dst_off >> PAGE_SHIFT,
- endoff >> PAGE_SHIFT);
+ (dst_off + len) >> PAGE_SHIFT);
if (ret < 0) {
dout("Failed to invalidate inode pages (%zd)\n", ret);
ret = 0; /* XXX */
}
- src_oloc.pool = src_ci->i_layout.pool_id;
- src_oloc.pool_ns = ceph_try_get_string(src_ci->i_layout.pool_ns);
- dst_oloc.pool = dst_ci->i_layout.pool_id;
- dst_oloc.pool_ns = ceph_try_get_string(dst_ci->i_layout.pool_ns);
-
ceph_calc_file_object_mapping(&src_ci->i_layout, src_off,
src_ci->i_layout.object_size,
&src_objnum, &src_objoff, &src_objlen);
@@ -2065,6 +2406,8 @@ static ssize_t __ceph_copy_file_range(struct file *src_file, loff_t src_off,
* starting at the src_off
*/
if (src_objoff) {
+ dout("Initial partial copy of %u bytes\n", src_objlen);
+
/*
* we need to temporarily drop all caps as we'll be calling
* {read,write}_iter, which will get caps again.
@@ -2072,8 +2415,9 @@ static ssize_t __ceph_copy_file_range(struct file *src_file, loff_t src_off,
put_rd_wr_caps(src_ci, src_got, dst_ci, dst_got);
ret = do_splice_direct(src_file, &src_off, dst_file,
&dst_off, src_objlen, flags);
- if (ret < 0) {
- dout("do_splice_direct returned %d\n", err);
+ /* Abort on short copies or on error */
+ if (ret < src_objlen) {
+ dout("Failed partial copy (%zd)\n", ret);
goto out;
}
len -= ret;
@@ -2086,65 +2430,27 @@ static ssize_t __ceph_copy_file_range(struct file *src_file, loff_t src_off,
if (err < 0)
goto out_caps;
}
- object_size = src_ci->i_layout.object_size;
- while (len >= object_size) {
- ceph_calc_file_object_mapping(&src_ci->i_layout, src_off,
- object_size, &src_objnum,
- &src_objoff, &src_objlen);
- ceph_calc_file_object_mapping(&dst_ci->i_layout, dst_off,
- object_size, &dst_objnum,
- &dst_objoff, &dst_objlen);
- ceph_oid_init(&src_oid);
- ceph_oid_printf(&src_oid, "%llx.%08llx",
- src_ci->i_vino.ino, src_objnum);
- ceph_oid_init(&dst_oid);
- ceph_oid_printf(&dst_oid, "%llx.%08llx",
- dst_ci->i_vino.ino, dst_objnum);
- /* Do an object remote copy */
- err = ceph_osdc_copy_from(
- &src_fsc->client->osdc,
- src_ci->i_vino.snap, 0,
- &src_oid, &src_oloc,
- CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL |
- CEPH_OSD_OP_FLAG_FADVISE_NOCACHE,
- &dst_oid, &dst_oloc,
- CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL |
- CEPH_OSD_OP_FLAG_FADVISE_DONTNEED,
- dst_ci->i_truncate_seq, dst_ci->i_truncate_size,
- CEPH_OSD_COPY_FROM_FLAG_TRUNCATE_SEQ);
- if (err) {
- if (err == -EOPNOTSUPP) {
- src_fsc->have_copy_from2 = false;
- pr_notice("OSDs don't support copy-from2; disabling copy offload\n");
- }
- dout("ceph_osdc_copy_from returned %d\n", err);
- if (!ret)
- ret = err;
- goto out_caps;
- }
- len -= object_size;
- src_off += object_size;
- dst_off += object_size;
- ret += object_size;
- }
- if (len)
- /* We still need one final local copy */
- do_final_copy = true;
+ size = i_size_read(dst_inode);
+ bytes = ceph_do_objects_copy(src_ci, &src_off, dst_ci, &dst_off,
+ src_fsc, len, flags);
+ if (bytes <= 0) {
+ if (!ret)
+ ret = bytes;
+ goto out_caps;
+ }
+ dout("Copied %zu bytes out of %zu\n", bytes, len);
+ len -= bytes;
+ ret += bytes;
file_update_time(dst_file);
inode_inc_iversion_raw(dst_inode);
- if (endoff > size) {
- int caps_flags = 0;
-
+ if (dst_off > size) {
/* Let the MDS know about dst file size change */
- if (ceph_quota_is_max_bytes_approaching(dst_inode, endoff))
- caps_flags |= CHECK_CAPS_NODELAY;
- if (ceph_inode_set_size(dst_inode, endoff))
- caps_flags |= CHECK_CAPS_AUTHONLY;
- if (caps_flags)
- ceph_check_caps(dst_ci, caps_flags, NULL);
+ if (ceph_inode_set_size(dst_inode, dst_off) ||
+ ceph_quota_is_max_bytes_approaching(dst_inode, dst_off))
+ ceph_check_caps(dst_ci, CHECK_CAPS_AUTHONLY, NULL);
}
/* Mark Fw dirty */
spin_lock(&dst_ci->i_ceph_lock);
@@ -2157,15 +2463,18 @@ static ssize_t __ceph_copy_file_range(struct file *src_file, loff_t src_off,
out_caps:
put_rd_wr_caps(src_ci, src_got, dst_ci, dst_got);
- if (do_final_copy) {
- err = do_splice_direct(src_file, &src_off, dst_file,
- &dst_off, len, flags);
- if (err < 0) {
- dout("do_splice_direct returned %d\n", err);
- goto out;
- }
- len -= err;
- ret += err;
+ /*
+ * Do the final manual copy if we still have some bytes left, unless
+ * there were errors in remote object copies (len >= object_size).
+ */
+ if (len && (len < src_ci->i_layout.object_size)) {
+ dout("Final partial copy of %zu bytes\n", len);
+ bytes = do_splice_direct(src_file, &src_off, dst_file,
+ &dst_off, len, flags);
+ if (bytes > 0)
+ ret += bytes;
+ else
+ dout("Failed partial copy (%zd)\n", bytes);
}
out:
@@ -2198,6 +2507,7 @@ const struct file_operations ceph_file_fops = {
.mmap = ceph_mmap,
.fsync = ceph_fsync,
.lock = ceph_lock,
+ .setlease = simple_nosetlease,
.flock = ceph_flock,
.splice_read = generic_file_splice_read,
.splice_write = iter_file_splice_write,
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index d01710a16a4a..d163fa96cb40 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -41,8 +41,10 @@ static void ceph_inode_work(struct work_struct *work);
*/
static int ceph_set_ino_cb(struct inode *inode, void *data)
{
- ceph_inode(inode)->i_vino = *(struct ceph_vino *)data;
- inode->i_ino = ceph_vino_to_ino(*(struct ceph_vino *)data);
+ struct ceph_inode_info *ci = ceph_inode(inode);
+
+ ci->i_vino = *(struct ceph_vino *)data;
+ inode->i_ino = ceph_vino_to_ino_t(ci->i_vino);
inode_set_iversion_raw(inode, 0);
return 0;
}
@@ -50,17 +52,14 @@ static int ceph_set_ino_cb(struct inode *inode, void *data)
struct inode *ceph_get_inode(struct super_block *sb, struct ceph_vino vino)
{
struct inode *inode;
- ino_t t = ceph_vino_to_ino(vino);
- inode = iget5_locked(sb, t, ceph_ino_compare, ceph_set_ino_cb, &vino);
+ inode = iget5_locked(sb, (unsigned long)vino.ino, ceph_ino_compare,
+ ceph_set_ino_cb, &vino);
if (!inode)
return ERR_PTR(-ENOMEM);
- if (inode->i_state & I_NEW)
- dout("get_inode created new inode %p %llx.%llx ino %llx\n",
- inode, ceph_vinop(inode), (u64)inode->i_ino);
- dout("get_inode on %lu=%llx.%llx got %p\n", inode->i_ino, vino.ino,
- vino.snap, inode);
+ dout("get_inode on %llu=%llx.%llx got %p new %d\n", ceph_present_inode(inode),
+ ceph_vinop(inode), inode, !!(inode->i_state & I_NEW));
return inode;
}
@@ -82,10 +81,14 @@ struct inode *ceph_get_snapdir(struct inode *parent)
inode->i_mode = parent->i_mode;
inode->i_uid = parent->i_uid;
inode->i_gid = parent->i_gid;
+ inode->i_mtime = parent->i_mtime;
+ inode->i_ctime = parent->i_ctime;
+ inode->i_atime = parent->i_atime;
inode->i_op = &ceph_snapdir_iops;
inode->i_fop = &ceph_snapdir_fops;
ci->i_snap_caps = CEPH_CAP_PIN; /* so we can open */
ci->i_rbytes = 0;
+ ci->i_btime = ceph_inode(parent)->i_btime;
if (inode->i_state & I_NEW)
unlock_new_inode(inode);
@@ -447,6 +450,7 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
ci->i_max_files = 0;
memset(&ci->i_dir_layout, 0, sizeof(ci->i_dir_layout));
+ memset(&ci->i_cached_layout, 0, sizeof(ci->i_cached_layout));
RCU_INIT_POINTER(ci->i_layout.pool_ns, NULL);
ci->i_fragtree = RB_ROOT;
@@ -471,13 +475,13 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
ci->i_prealloc_cap_flush = NULL;
INIT_LIST_HEAD(&ci->i_cap_flush_list);
init_waitqueue_head(&ci->i_cap_wq);
- ci->i_hold_caps_min = 0;
ci->i_hold_caps_max = 0;
INIT_LIST_HEAD(&ci->i_cap_delay_list);
INIT_LIST_HEAD(&ci->i_cap_snaps);
ci->i_head_snapc = NULL;
ci->i_snap_caps = 0;
+ ci->i_last_rd = ci->i_last_wr = jiffies - 3600 * HZ;
for (i = 0; i < CEPH_FILE_MODE_BITS; i++)
ci->i_nr_by_mode[i] = 0;
@@ -496,6 +500,7 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
ci->i_rdcache_ref = 0;
ci->i_wr_ref = 0;
ci->i_wb_ref = 0;
+ ci->i_fx_ref = 0;
ci->i_wrbuffer_ref = 0;
ci->i_wrbuffer_ref_head = 0;
atomic_set(&ci->i_filelock_ref, 0);
@@ -586,6 +591,7 @@ void ceph_evict_inode(struct inode *inode)
ceph_buffer_put(ci->i_xattrs.prealloc_blob);
ceph_put_string(rcu_dereference_raw(ci->i_layout.pool_ns));
+ ceph_put_string(rcu_dereference_raw(ci->i_cached_layout.pool_ns));
}
static inline blkcnt_t calc_inode_blocks(u64 size)
@@ -636,7 +642,7 @@ int ceph_fill_file_size(struct inode *inode, int issued,
if ((issued & (CEPH_CAP_FILE_CACHE|
CEPH_CAP_FILE_BUFFER)) ||
mapping_mapped(inode->i_mapping) ||
- __ceph_caps_file_wanted(ci)) {
+ __ceph_is_file_opened(ci)) {
ci->i_truncate_pending++;
queue_trunc = 1;
}
@@ -727,11 +733,11 @@ void ceph_fill_file_time(struct inode *inode, int issued,
* Populate an inode based on info from mds. May be called on new or
* existing inodes.
*/
-static int fill_inode(struct inode *inode, struct page *locked_page,
- struct ceph_mds_reply_info_in *iinfo,
- struct ceph_mds_reply_dirfrag *dirinfo,
- struct ceph_mds_session *session, int cap_fmode,
- struct ceph_cap_reservation *caps_reservation)
+int ceph_fill_inode(struct inode *inode, struct page *locked_page,
+ struct ceph_mds_reply_info_in *iinfo,
+ struct ceph_mds_reply_dirfrag *dirinfo,
+ struct ceph_mds_session *session, int cap_fmode,
+ struct ceph_cap_reservation *caps_reservation)
{
struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
struct ceph_mds_reply_inode *info = iinfo->in;
@@ -748,7 +754,7 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
bool new_version = false;
bool fill_inline = false;
- dout("fill_inode %p ino %llx.%llx v %llu had %llu\n",
+ dout("%s %p ino %llx.%llx v %llu had %llu\n", __func__,
inode, ceph_vinop(inode), le64_to_cpu(info->version),
ci->i_version);
@@ -769,7 +775,7 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
if (iinfo->xattr_len > 4) {
xattr_blob = ceph_buffer_new(iinfo->xattr_len, GFP_NOFS);
if (!xattr_blob)
- pr_err("fill_inode ENOMEM xattr blob %d bytes\n",
+ pr_err("%s ENOMEM xattr blob %d bytes\n", __func__,
iinfo->xattr_len);
}
@@ -932,8 +938,9 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
spin_unlock(&ci->i_ceph_lock);
if (symlen != i_size_read(inode)) {
- pr_err("fill_inode %llx.%llx BAD symlink "
- "size %lld\n", ceph_vinop(inode),
+ pr_err("%s %llx.%llx BAD symlink "
+ "size %lld\n", __func__,
+ ceph_vinop(inode),
i_size_read(inode));
i_size_write(inode, symlen);
inode->i_blocks = calc_inode_blocks(symlen);
@@ -957,7 +964,7 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
inode->i_fop = &ceph_dir_fops;
break;
default:
- pr_err("fill_inode %llx.%llx BAD mode 0%o\n",
+ pr_err("%s %llx.%llx BAD mode 0%o\n", __func__,
ceph_vinop(inode), inode->i_mode);
}
@@ -966,7 +973,7 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
if (ceph_snap(inode) == CEPH_NOSNAP) {
ceph_add_cap(inode, session,
le64_to_cpu(info->cap.cap_id),
- cap_fmode, info_caps,
+ info_caps,
le32_to_cpu(info->cap.wanted),
le32_to_cpu(info->cap.seq),
le32_to_cpu(info->cap.mseq),
@@ -991,13 +998,7 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
dout(" %p got snap_caps %s\n", inode,
ceph_cap_string(info_caps));
ci->i_snap_caps |= info_caps;
- if (cap_fmode >= 0)
- __ceph_get_fmode(ci, cap_fmode);
}
- } else if (cap_fmode >= 0) {
- pr_warn("mds issued no caps on %llx.%llx\n",
- ceph_vinop(inode));
- __ceph_get_fmode(ci, cap_fmode);
}
if (iinfo->inline_version > 0 &&
@@ -1009,6 +1010,13 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
fill_inline = true;
}
+ if (cap_fmode >= 0) {
+ if (!info_caps)
+ pr_warn("mds issued no caps on %llx.%llx\n",
+ ceph_vinop(inode));
+ __ceph_touch_fmode(ci, mdsc, cap_fmode);
+ }
+
spin_unlock(&ci->i_ceph_lock);
if (fill_inline)
@@ -1050,6 +1058,7 @@ static void __update_dentry_lease(struct inode *dir, struct dentry *dentry,
struct ceph_mds_session **old_lease_session)
{
struct ceph_dentry_info *di = ceph_dentry(dentry);
+ unsigned mask = le16_to_cpu(lease->mask);
long unsigned duration = le32_to_cpu(lease->duration_ms);
long unsigned ttl = from_time + (duration * HZ) / 1000;
long unsigned half_ttl = from_time + (duration * HZ / 2) / 1000;
@@ -1061,8 +1070,13 @@ static void __update_dentry_lease(struct inode *dir, struct dentry *dentry,
if (ceph_snap(dir) != CEPH_NOSNAP)
return;
+ if (mask & CEPH_LEASE_PRIMARY_LINK)
+ di->flags |= CEPH_DENTRY_PRIMARY_LINK;
+ else
+ di->flags &= ~CEPH_DENTRY_PRIMARY_LINK;
+
di->lease_shared_gen = atomic_read(&ceph_inode(dir)->i_shared_gen);
- if (duration == 0) {
+ if (!(mask & CEPH_LEASE_VALID)) {
__ceph_dentry_dir_lease_touch(di);
return;
}
@@ -1239,10 +1253,9 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req)
struct inode *dir = req->r_parent;
if (dir) {
- err = fill_inode(dir, NULL,
- &rinfo->diri, rinfo->dirfrag,
- session, -1,
- &req->r_caps_reservation);
+ err = ceph_fill_inode(dir, NULL, &rinfo->diri,
+ rinfo->dirfrag, session, -1,
+ &req->r_caps_reservation);
if (err < 0)
goto done;
} else {
@@ -1307,13 +1320,14 @@ retry_lookup:
goto done;
}
- err = fill_inode(in, req->r_locked_page, &rinfo->targeti, NULL,
- session,
+ err = ceph_fill_inode(in, req->r_locked_page, &rinfo->targeti,
+ NULL, session,
(!test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags) &&
+ !test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags) &&
rinfo->head->result == 0) ? req->r_fmode : -1,
&req->r_caps_reservation);
if (err < 0) {
- pr_err("fill_inode badness %p %llx.%llx\n",
+ pr_err("ceph_fill_inode badness %p %llx.%llx\n",
in, ceph_vinop(in));
if (in->i_state & I_NEW)
discard_new_inode(in);
@@ -1500,10 +1514,11 @@ static int readdir_prepopulate_inodes_only(struct ceph_mds_request *req,
dout("new_inode badness got %d\n", err);
continue;
}
- rc = fill_inode(in, NULL, &rde->inode, NULL, session,
- -1, &req->r_caps_reservation);
+ rc = ceph_fill_inode(in, NULL, &rde->inode, NULL, session,
+ -1, &req->r_caps_reservation);
if (rc < 0) {
- pr_err("fill_inode badness on %p got %d\n", in, rc);
+ pr_err("ceph_fill_inode badness on %p got %d\n",
+ in, rc);
err = rc;
if (in->i_state & I_NEW) {
ihold(in);
@@ -1707,10 +1722,10 @@ retry_lookup:
}
}
- ret = fill_inode(in, NULL, &rde->inode, NULL, session,
- -1, &req->r_caps_reservation);
+ ret = ceph_fill_inode(in, NULL, &rde->inode, NULL, session,
+ -1, &req->r_caps_reservation);
if (ret < 0) {
- pr_err("fill_inode badness on %p\n", in);
+ pr_err("ceph_fill_inode badness on %p\n", in);
if (d_really_is_negative(dn)) {
/* avoid calling iput_final() in mds
* dispatch threads */
@@ -1972,7 +1987,7 @@ retry:
mutex_unlock(&ci->i_truncate_mutex);
if (wrbuffer_refs == 0)
- ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL);
+ ceph_check_caps(ci, 0, NULL);
wake_up_all(&ci->i_cap_wq);
}
@@ -2272,8 +2287,8 @@ int __ceph_do_getattr(struct inode *inode, struct page *locked_page,
dout("do_getattr inode %p mask %s mode 0%o\n",
inode, ceph_cap_string(mask), inode->i_mode);
- if (!force && ceph_caps_issued_mask(ceph_inode(inode), mask, 1))
- return 0;
+ if (!force && ceph_caps_issued_mask_metric(ceph_inode(inode), mask, 1))
+ return 0;
mode = (mask & CEPH_STAT_RSTAT) ? USE_AUTH_MDS : USE_ANY_MDS;
req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_GETATTR, mode);
@@ -2362,7 +2377,7 @@ int ceph_getattr(const struct path *path, struct kstat *stat,
}
generic_fillattr(inode, stat);
- stat->ino = ceph_translate_ino(inode->i_sb, inode->i_ino);
+ stat->ino = ceph_present_inode(inode);
/*
* btime on newly-allocated inodes is 0, so if this is still set to
diff --git a/fs/ceph/ioctl.c b/fs/ceph/ioctl.c
index c90f03beb15d..6e061bf62ad4 100644
--- a/fs/ceph/ioctl.c
+++ b/fs/ceph/ioctl.c
@@ -243,11 +243,13 @@ static long ceph_ioctl_lazyio(struct file *file)
struct ceph_file_info *fi = file->private_data;
struct inode *inode = file_inode(file);
struct ceph_inode_info *ci = ceph_inode(inode);
+ struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
if ((fi->fmode & CEPH_FILE_MODE_LAZY) == 0) {
spin_lock(&ci->i_ceph_lock);
fi->fmode |= CEPH_FILE_MODE_LAZY;
ci->i_nr_by_mode[ffs(CEPH_FILE_MODE_LAZY)]++;
+ __ceph_touch_fmode(ci, mdsc, fi->fmode);
spin_unlock(&ci->i_ceph_lock);
dout("ioctl_layzio: file %p marked lazy\n", file);
diff --git a/fs/ceph/locks.c b/fs/ceph/locks.c
index 544e9e85b120..d6b9166e71e4 100644
--- a/fs/ceph/locks.c
+++ b/fs/ceph/locks.c
@@ -210,6 +210,21 @@ static int ceph_lock_wait_for_completion(struct ceph_mds_client *mdsc,
return 0;
}
+static int try_unlock_file(struct file *file, struct file_lock *fl)
+{
+ int err;
+ unsigned int orig_flags = fl->fl_flags;
+ fl->fl_flags |= FL_EXISTS;
+ err = locks_lock_file_wait(file, fl);
+ fl->fl_flags = orig_flags;
+ if (err == -ENOENT) {
+ if (!(orig_flags & FL_EXISTS))
+ err = 0;
+ return err;
+ }
+ return 1;
+}
+
/**
* Attempt to set an fcntl lock.
* For now, this just goes away to the server. Later it may be more awesome.
@@ -255,9 +270,15 @@ int ceph_lock(struct file *file, int cmd, struct file_lock *fl)
else
lock_cmd = CEPH_LOCK_UNLOCK;
+ if (op == CEPH_MDS_OP_SETFILELOCK && F_UNLCK == fl->fl_type) {
+ err = try_unlock_file(file, fl);
+ if (err <= 0)
+ return err;
+ }
+
err = ceph_lock_message(CEPH_LOCK_FCNTL, op, inode, lock_cmd, wait, fl);
if (!err) {
- if (op == CEPH_MDS_OP_SETFILELOCK) {
+ if (op == CEPH_MDS_OP_SETFILELOCK && F_UNLCK != fl->fl_type) {
dout("mds locked, locking locally\n");
err = posix_lock_file(file, fl, NULL);
if (err) {
@@ -311,9 +332,15 @@ int ceph_flock(struct file *file, int cmd, struct file_lock *fl)
else
lock_cmd = CEPH_LOCK_UNLOCK;
+ if (F_UNLCK == fl->fl_type) {
+ err = try_unlock_file(file, fl);
+ if (err <= 0)
+ return err;
+ }
+
err = ceph_lock_message(CEPH_LOCK_FLOCK, CEPH_MDS_OP_SETFILELOCK,
inode, lock_cmd, wait, fl);
- if (!err) {
+ if (!err && F_UNLCK != fl->fl_type) {
err = locks_lock_file_wait(file, fl);
if (err) {
ceph_lock_message(CEPH_LOCK_FLOCK,
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index bbbbddf71326..4a26862d7667 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -10,6 +10,7 @@
#include <linux/seq_file.h>
#include <linux/ratelimit.h>
#include <linux/bits.h>
+#include <linux/ktime.h>
#include "super.h"
#include "mds_client.h"
@@ -415,21 +416,121 @@ bad:
return -EIO;
}
+
+#if BITS_PER_LONG == 64
+
+#define DELEGATED_INO_AVAILABLE xa_mk_value(1)
+
+static int ceph_parse_deleg_inos(void **p, void *end,
+ struct ceph_mds_session *s)
+{
+ u32 sets;
+
+ ceph_decode_32_safe(p, end, sets, bad);
+ dout("got %u sets of delegated inodes\n", sets);
+ while (sets--) {
+ u64 start, len, ino;
+
+ ceph_decode_64_safe(p, end, start, bad);
+ ceph_decode_64_safe(p, end, len, bad);
+ while (len--) {
+ int err = xa_insert(&s->s_delegated_inos, ino = start++,
+ DELEGATED_INO_AVAILABLE,
+ GFP_KERNEL);
+ if (!err) {
+ dout("added delegated inode 0x%llx\n",
+ start - 1);
+ } else if (err == -EBUSY) {
+ pr_warn("ceph: MDS delegated inode 0x%llx more than once.\n",
+ start - 1);
+ } else {
+ return err;
+ }
+ }
+ }
+ return 0;
+bad:
+ return -EIO;
+}
+
+u64 ceph_get_deleg_ino(struct ceph_mds_session *s)
+{
+ unsigned long ino;
+ void *val;
+
+ xa_for_each(&s->s_delegated_inos, ino, val) {
+ val = xa_erase(&s->s_delegated_inos, ino);
+ if (val == DELEGATED_INO_AVAILABLE)
+ return ino;
+ }
+ return 0;
+}
+
+int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino)
+{
+ return xa_insert(&s->s_delegated_inos, ino, DELEGATED_INO_AVAILABLE,
+ GFP_KERNEL);
+}
+#else /* BITS_PER_LONG == 64 */
+/*
+ * FIXME: xarrays can't handle 64-bit indexes on a 32-bit arch. For now, just
+ * ignore delegated_inos on 32 bit arch. Maybe eventually add xarrays for top
+ * and bottom words?
+ */
+static int ceph_parse_deleg_inos(void **p, void *end,
+ struct ceph_mds_session *s)
+{
+ u32 sets;
+
+ ceph_decode_32_safe(p, end, sets, bad);
+ if (sets)
+ ceph_decode_skip_n(p, end, sets * 2 * sizeof(__le64), bad);
+ return 0;
+bad:
+ return -EIO;
+}
+
+u64 ceph_get_deleg_ino(struct ceph_mds_session *s)
+{
+ return 0;
+}
+
+int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino)
+{
+ return 0;
+}
+#endif /* BITS_PER_LONG == 64 */
+
/*
* parse create results
*/
static int parse_reply_info_create(void **p, void *end,
struct ceph_mds_reply_info_parsed *info,
- u64 features)
+ u64 features, struct ceph_mds_session *s)
{
+ int ret;
+
if (features == (u64)-1 ||
(features & CEPH_FEATURE_REPLY_CREATE_INODE)) {
- /* Malformed reply? */
if (*p == end) {
+ /* Malformed reply? */
info->has_create_ino = false;
- } else {
+ } else if (test_bit(CEPHFS_FEATURE_DELEG_INO, &s->s_features)) {
+ u8 struct_v, struct_compat;
+ u32 len;
+
info->has_create_ino = true;
+ ceph_decode_8_safe(p, end, struct_v, bad);
+ ceph_decode_8_safe(p, end, struct_compat, bad);
+ ceph_decode_32_safe(p, end, len, bad);
+ ceph_decode_64_safe(p, end, info->ino, bad);
+ ret = ceph_parse_deleg_inos(p, end, s);
+ if (ret)
+ return ret;
+ } else {
+ /* legacy */
ceph_decode_64_safe(p, end, info->ino, bad);
+ info->has_create_ino = true;
}
} else {
if (*p != end)
@@ -448,7 +549,7 @@ bad:
*/
static int parse_reply_info_extra(void **p, void *end,
struct ceph_mds_reply_info_parsed *info,
- u64 features)
+ u64 features, struct ceph_mds_session *s)
{
u32 op = le32_to_cpu(info->head->op);
@@ -457,7 +558,7 @@ static int parse_reply_info_extra(void **p, void *end,
else if (op == CEPH_MDS_OP_READDIR || op == CEPH_MDS_OP_LSSNAP)
return parse_reply_info_readdir(p, end, info, features);
else if (op == CEPH_MDS_OP_CREATE)
- return parse_reply_info_create(p, end, info, features);
+ return parse_reply_info_create(p, end, info, features, s);
else
return -EIO;
}
@@ -465,7 +566,7 @@ static int parse_reply_info_extra(void **p, void *end,
/*
* parse entire mds reply
*/
-static int parse_reply_info(struct ceph_msg *msg,
+static int parse_reply_info(struct ceph_mds_session *s, struct ceph_msg *msg,
struct ceph_mds_reply_info_parsed *info,
u64 features)
{
@@ -490,7 +591,7 @@ static int parse_reply_info(struct ceph_msg *msg,
ceph_decode_32_safe(&p, end, len, bad);
if (len > 0) {
ceph_decode_need(&p, end, len, bad);
- err = parse_reply_info_extra(&p, p+len, info, features);
+ err = parse_reply_info_extra(&p, p+len, info, features, s);
if (err < 0)
goto out_bad;
}
@@ -558,6 +659,8 @@ void ceph_put_mds_session(struct ceph_mds_session *s)
if (refcount_dec_and_test(&s->s_ref)) {
if (s->s_auth.authorizer)
ceph_auth_destroy_authorizer(s->s_auth.authorizer);
+ WARN_ON(mutex_is_locked(&s->s_mutex));
+ xa_destroy(&s->s_delegated_inos);
kfree(s);
}
}
@@ -645,12 +748,14 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
refcount_set(&s->s_ref, 1);
INIT_LIST_HEAD(&s->s_waiting);
INIT_LIST_HEAD(&s->s_unsafe);
+ xa_init(&s->s_delegated_inos);
s->s_num_cap_releases = 0;
s->s_cap_reconnect = 0;
s->s_cap_iterator = NULL;
INIT_LIST_HEAD(&s->s_cap_releases);
INIT_WORK(&s->s_cap_release_work, ceph_cap_release_work);
+ INIT_LIST_HEAD(&s->s_cap_dirty);
INIT_LIST_HEAD(&s->s_cap_flushing);
mdsc->sessions[mds] = s;
@@ -699,6 +804,7 @@ void ceph_mdsc_release_request(struct kref *kref)
struct ceph_mds_request *req = container_of(kref,
struct ceph_mds_request,
r_kref);
+ ceph_mdsc_release_dir_caps_no_check(req);
destroy_reply_info(&req->r_reply_info);
if (req->r_request)
ceph_msg_put(req->r_request);
@@ -736,7 +842,7 @@ void ceph_mdsc_release_request(struct kref *kref)
put_request_session(req);
ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
WARN_ON_ONCE(!list_empty(&req->r_wait));
- kfree(req);
+ kmem_cache_free(ceph_mds_request_cachep, req);
}
DEFINE_RB_FUNCS(request, struct ceph_mds_request, r_tid, r_node)
@@ -793,8 +899,13 @@ static void __register_request(struct ceph_mds_client *mdsc,
mdsc->oldest_tid = req->r_tid;
if (dir) {
+ struct ceph_inode_info *ci = ceph_inode(dir);
+
ihold(dir);
req->r_unsafe_dir = dir;
+ spin_lock(&ci->i_unsafe_lock);
+ list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops);
+ spin_unlock(&ci->i_unsafe_lock);
}
}
@@ -822,8 +933,7 @@ static void __unregister_request(struct ceph_mds_client *mdsc,
erase_request(&mdsc->request_tree, req);
- if (req->r_unsafe_dir &&
- test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
+ if (req->r_unsafe_dir) {
struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir);
spin_lock(&ci->i_unsafe_lock);
list_del_init(&req->r_unsafe_dir_item);
@@ -993,8 +1103,7 @@ static int __choose_mds(struct ceph_mds_client *mdsc,
frag.frag, mds);
if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
CEPH_MDS_STATE_ACTIVE) {
- if (mode == USE_ANY_MDS &&
- !ceph_mdsmap_is_laggy(mdsc->mdsmap,
+ if (!ceph_mdsmap_is_laggy(mdsc->mdsmap,
mds))
goto out;
}
@@ -1058,7 +1167,7 @@ static struct ceph_msg *create_session_msg(u32 op, u64 seq)
static const unsigned char feature_bits[] = CEPHFS_FEATURES_CLIENT_SUPPORTED;
#define FEATURE_BYTES(c) (DIV_ROUND_UP((size_t)feature_bits[c - 1] + 1, 64) * 8)
-static void encode_supported_features(void **p, void *end)
+static int encode_supported_features(void **p, void *end)
{
static const size_t count = ARRAY_SIZE(feature_bits);
@@ -1066,16 +1175,64 @@ static void encode_supported_features(void **p, void *end)
size_t i;
size_t size = FEATURE_BYTES(count);
- BUG_ON(*p + 4 + size > end);
+ if (WARN_ON_ONCE(*p + 4 + size > end))
+ return -ERANGE;
+
ceph_encode_32(p, size);
memset(*p, 0, size);
for (i = 0; i < count; i++)
((unsigned char*)(*p))[i / 8] |= BIT(feature_bits[i] % 8);
*p += size;
} else {
- BUG_ON(*p + 4 > end);
+ if (WARN_ON_ONCE(*p + 4 > end))
+ return -ERANGE;
+
ceph_encode_32(p, 0);
}
+
+ return 0;
+}
+
+static const unsigned char metric_bits[] = CEPHFS_METRIC_SPEC_CLIENT_SUPPORTED;
+#define METRIC_BYTES(cnt) (DIV_ROUND_UP((size_t)metric_bits[cnt - 1] + 1, 64) * 8)
+static int encode_metric_spec(void **p, void *end)
+{
+ static const size_t count = ARRAY_SIZE(metric_bits);
+
+ /* header */
+ if (WARN_ON_ONCE(*p + 2 > end))
+ return -ERANGE;
+
+ ceph_encode_8(p, 1); /* version */
+ ceph_encode_8(p, 1); /* compat */
+
+ if (count > 0) {
+ size_t i;
+ size_t size = METRIC_BYTES(count);
+
+ if (WARN_ON_ONCE(*p + 4 + 4 + size > end))
+ return -ERANGE;
+
+ /* metric spec info length */
+ ceph_encode_32(p, 4 + size);
+
+ /* metric spec */
+ ceph_encode_32(p, size);
+ memset(*p, 0, size);
+ for (i = 0; i < count; i++)
+ ((unsigned char *)(*p))[i / 8] |= BIT(metric_bits[i] % 8);
+ *p += size;
+ } else {
+ if (WARN_ON_ONCE(*p + 4 + 4 > end))
+ return -ERANGE;
+
+ /* metric spec info length */
+ ceph_encode_32(p, 4);
+ /* metric spec */
+ ceph_encode_32(p, 0);
+ }
+
+ return 0;
}
/*
@@ -1093,6 +1250,7 @@ static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u6
struct ceph_mount_options *fsopt = mdsc->fsc->mount_options;
size_t size, count;
void *p, *end;
+ int ret;
const char* metadata[][2] = {
{"hostname", mdsc->nodename},
@@ -1117,12 +1275,19 @@ static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u6
size = FEATURE_BYTES(count);
extra_bytes += 4 + size;
+ /* metric spec */
+ size = 0;
+ count = ARRAY_SIZE(metric_bits);
+ if (count > 0)
+ size = METRIC_BYTES(count);
+ extra_bytes += 2 + 4 + 4 + size;
+
/* Allocate the message */
msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h) + extra_bytes,
GFP_NOFS, false);
if (!msg) {
pr_err("create_session_msg ENOMEM creating msg\n");
- return NULL;
+ return ERR_PTR(-ENOMEM);
}
p = msg->front.iov_base;
end = p + msg->front.iov_len;
@@ -1135,9 +1300,9 @@ static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u6
* Serialize client metadata into waiting buffer space, using
* the format that userspace expects for map<string, string>
*
- * ClientSession messages with metadata are v3
+ * ClientSession messages with metadata are v4
*/
- msg->hdr.version = cpu_to_le16(3);
+ msg->hdr.version = cpu_to_le16(4);
msg->hdr.compat_version = cpu_to_le16(1);
/* The write pointer, following the session_head structure */
@@ -1159,7 +1324,20 @@ static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u6
p += val_len;
}
- encode_supported_features(&p, end);
+ ret = encode_supported_features(&p, end);
+ if (ret) {
+ pr_err("encode_supported_features failed!\n");
+ ceph_msg_put(msg);
+ return ERR_PTR(ret);
+ }
+
+ ret = encode_metric_spec(&p, end);
+ if (ret) {
+ pr_err("encode_metric_spec failed!\n");
+ ceph_msg_put(msg);
+ return ERR_PTR(ret);
+ }
+
msg->front.iov_len = p - msg->front.iov_base;
msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
@@ -1187,8 +1365,8 @@ static int __open_session(struct ceph_mds_client *mdsc,
/* send connect message */
msg = create_session_open_msg(mdsc, session->s_seq);
- if (!msg)
- return -ENOMEM;
+ if (IS_ERR(msg))
+ return PTR_ERR(msg);
ceph_con_send(&session->s_con, msg);
return 0;
}
@@ -1202,6 +1380,7 @@ static struct ceph_mds_session *
__open_export_target_session(struct ceph_mds_client *mdsc, int target)
{
struct ceph_mds_session *session;
+ int ret;
session = __ceph_lookup_mds_session(mdsc, target);
if (!session) {
@@ -1210,8 +1389,11 @@ __open_export_target_session(struct ceph_mds_client *mdsc, int target)
return session;
}
if (session->s_state == CEPH_MDS_SESSION_NEW ||
- session->s_state == CEPH_MDS_SESSION_CLOSING)
- __open_session(mdsc, session);
+ session->s_state == CEPH_MDS_SESSION_CLOSING) {
+ ret = __open_session(mdsc, session);
+ if (ret)
+ return ERR_PTR(ret);
+ }
return session;
}
@@ -1375,6 +1557,7 @@ int ceph_iterate_session_caps(struct ceph_mds_session *session,
cap->session = NULL;
list_del_init(&cap->session_caps);
session->s_nr_caps--;
+ atomic64_dec(&session->s_mdsc->metric.total_caps);
if (cap->queue_release)
__ceph_queue_cap_release(session, cap);
else
@@ -1407,8 +1590,6 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
dout("removing cap %p, ci is %p, inode is %p\n",
cap, ci, &ci->vfs_inode);
spin_lock(&ci->i_ceph_lock);
- if (cap->mds_wanted | cap->issued)
- ci->i_ceph_flags |= CEPH_I_CAP_DROPPED;
__ceph_remove_cap(cap, false);
if (!ci->i_auth_cap) {
struct ceph_cap_flush *cf;
@@ -1574,9 +1755,6 @@ static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap,
/* mds did not re-issue stale cap */
spin_lock(&ci->i_ceph_lock);
cap->issued = cap->implemented = CEPH_CAP_PIN;
- /* make sure mds knows what we want */
- if (__ceph_caps_file_wanted(ci) & ~cap->mds_wanted)
- ci->i_ceph_flags |= CEPH_I_CAP_DROPPED;
spin_unlock(&ci->i_ceph_lock);
}
} else if (ev == FORCE_RO) {
@@ -1680,8 +1858,7 @@ static void renewed_caps(struct ceph_mds_client *mdsc,
/*
* send a session close request
*/
-static int request_close_session(struct ceph_mds_client *mdsc,
- struct ceph_mds_session *session)
+static int request_close_session(struct ceph_mds_session *session)
{
struct ceph_msg *msg;
@@ -1704,7 +1881,7 @@ static int __close_session(struct ceph_mds_client *mdsc,
if (session->s_state >= CEPH_MDS_SESSION_CLOSING)
return 0;
session->s_state = CEPH_MDS_SESSION_CLOSING;
- return request_close_session(mdsc, session);
+ return request_close_session(session);
}
static bool drop_negative_children(struct dentry *dentry)
@@ -1772,7 +1949,8 @@ static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
}
/* The inode has cached pages, but it's no longer used.
* we can safely drop it */
- if (wanted == 0 && used == CEPH_CAP_FILE_CACHE &&
+ if (S_ISREG(inode->i_mode) &&
+ wanted == 0 && used == CEPH_CAP_FILE_CACHE &&
!(oissued & CEPH_CAP_FILE_CACHE)) {
used = 0;
oissued = 0;
@@ -2089,14 +2267,16 @@ int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req,
struct ceph_mds_request *
ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
{
- struct ceph_mds_request *req = kzalloc(sizeof(*req), GFP_NOFS);
+ struct ceph_mds_request *req;
+ req = kmem_cache_zalloc(ceph_mds_request_cachep, GFP_NOFS);
if (!req)
return ERR_PTR(-ENOMEM);
mutex_init(&req->r_fill_mutex);
req->r_mdsc = mdsc;
req->r_started = jiffies;
+ req->r_start_latency = ktime_get();
req->r_resend_mds = -1;
INIT_LIST_HEAD(&req->r_unsafe_dir_item);
INIT_LIST_HEAD(&req->r_unsafe_target_item);
@@ -2368,7 +2548,7 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
head->op = cpu_to_le32(req->r_op);
head->caller_uid = cpu_to_le32(from_kuid(&init_user_ns, req->r_uid));
head->caller_gid = cpu_to_le32(from_kgid(&init_user_ns, req->r_gid));
- head->ino = 0;
+ head->ino = cpu_to_le64(req->r_deleg_ino);
head->args = req->r_args;
ceph_encode_filepath(&p, end, ino1, path1);
@@ -2382,7 +2562,8 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
if (req->r_inode_drop)
releases += ceph_encode_inode_release(&p,
req->r_inode ? req->r_inode : d_inode(req->r_dentry),
- mds, req->r_inode_drop, req->r_inode_unless, 0);
+ mds, req->r_inode_drop, req->r_inode_unless,
+ req->r_op == CEPH_MDS_OP_READDIR);
if (req->r_dentry_drop)
releases += ceph_encode_dentry_release(&p, req->r_dentry,
req->r_parent, mds, req->r_dentry_drop,
@@ -2411,7 +2592,12 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
ceph_encode_copy(&p, &ts, sizeof(ts));
}
- BUG_ON(p > end);
+ if (WARN_ON_ONCE(p > end)) {
+ ceph_msg_put(msg);
+ msg = ERR_PTR(-ERANGE);
+ goto out_free2;
+ }
+
msg->front.iov_len = p - msg->front.iov_base;
msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
@@ -2442,6 +2628,8 @@ out:
static void complete_request(struct ceph_mds_client *mdsc,
struct ceph_mds_request *req)
{
+ req->r_end_latency = ktime_get();
+
if (req->r_callback)
req->r_callback(mdsc, req);
complete_all(&req->r_completion);
@@ -2522,12 +2710,13 @@ static int __prepare_send_request(struct ceph_mds_client *mdsc,
rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc));
if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
flags |= CEPH_MDS_FLAG_REPLAY;
+ if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags))
+ flags |= CEPH_MDS_FLAG_ASYNC;
if (req->r_parent)
flags |= CEPH_MDS_FLAG_WANT_DENTRY;
rhead->flags = cpu_to_le32(flags);
rhead->num_fwd = req->r_num_fwd;
rhead->num_retry = req->r_attempts - 1;
- rhead->ino = 0;
dout(" r_parent = %p\n", req->r_parent);
return 0;
@@ -2573,7 +2762,7 @@ static void __do_request(struct ceph_mds_client *mdsc,
if (req->r_timeout &&
time_after_eq(jiffies, req->r_started + req->r_timeout)) {
dout("do_request timed out\n");
- err = -EIO;
+ err = -ETIMEDOUT;
goto finish;
}
if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
@@ -2605,6 +2794,10 @@ static void __do_request(struct ceph_mds_client *mdsc,
mds = __choose_mds(mdsc, req, &random);
if (mds < 0 ||
ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
+ if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) {
+ err = -EJUKEBOX;
+ goto finish;
+ }
dout("do_request no mds or not active, waiting for map\n");
list_add(&req->r_wait, &mdsc->waiting_for_map);
return;
@@ -2629,9 +2822,20 @@ static void __do_request(struct ceph_mds_client *mdsc,
err = -EACCES;
goto out_session;
}
+ /*
+ * We cannot queue async requests since the caps and delegated
+ * inodes are bound to the session. Just return -EJUKEBOX and
+ * let the caller retry a sync request in that case.
+ */
+ if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) {
+ err = -EJUKEBOX;
+ goto out_session;
+ }
if (session->s_state == CEPH_MDS_SESSION_NEW ||
session->s_state == CEPH_MDS_SESSION_CLOSING) {
- __open_session(mdsc, session);
+ err = __open_session(mdsc, session);
+ if (err)
+ goto out_session;
/* retry the same mds later */
if (random)
req->r_resend_mds = mds;
@@ -2709,19 +2913,43 @@ static void kick_requests(struct ceph_mds_client *mdsc, int mds)
int ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, struct inode *dir,
struct ceph_mds_request *req)
{
- int err;
+ int err = 0;
/* take CAP_PIN refs for r_inode, r_parent, r_old_dentry */
if (req->r_inode)
ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
if (req->r_parent) {
- ceph_get_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
+ struct ceph_inode_info *ci = ceph_inode(req->r_parent);
+ int fmode = (req->r_op & CEPH_MDS_OP_WRITE) ?
+ CEPH_FILE_MODE_WR : CEPH_FILE_MODE_RD;
+ spin_lock(&ci->i_ceph_lock);
+ ceph_take_cap_refs(ci, CEPH_CAP_PIN, false);
+ __ceph_touch_fmode(ci, mdsc, fmode);
+ spin_unlock(&ci->i_ceph_lock);
ihold(req->r_parent);
}
if (req->r_old_dentry_dir)
ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
CEPH_CAP_PIN);
+ if (req->r_inode) {
+ err = ceph_wait_on_async_create(req->r_inode);
+ if (err) {
+ dout("%s: wait for async create returned: %d\n",
+ __func__, err);
+ return err;
+ }
+ }
+
+ if (!err && req->r_old_inode) {
+ err = ceph_wait_on_async_create(req->r_old_inode);
+ if (err) {
+ dout("%s: wait for async create returned: %d\n",
+ __func__, err);
+ return err;
+ }
+ }
+
dout("submit_request on %p for inode %p\n", req, dir);
mutex_lock(&mdsc->mutex);
__register_request(mdsc, req, dir);
@@ -2747,7 +2975,7 @@ static int ceph_mdsc_wait_request(struct ceph_mds_client *mdsc,
if (timeleft > 0)
err = 0;
else if (!timeleft)
- err = -EIO; /* timed out */
+ err = -ETIMEDOUT; /* timed out */
else
err = timeleft; /* killed */
}
@@ -2935,22 +3163,14 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
} else {
set_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags);
list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe);
- if (req->r_unsafe_dir) {
- struct ceph_inode_info *ci =
- ceph_inode(req->r_unsafe_dir);
- spin_lock(&ci->i_unsafe_lock);
- list_add_tail(&req->r_unsafe_dir_item,
- &ci->i_unsafe_dirops);
- spin_unlock(&ci->i_unsafe_lock);
- }
}
dout("handle_reply tid %lld result %d\n", tid, result);
rinfo = &req->r_reply_info;
if (test_bit(CEPHFS_FEATURE_REPLY_ENCODING, &session->s_features))
- err = parse_reply_info(msg, rinfo, (u64)-1);
+ err = parse_reply_info(session, msg, rinfo, (u64)-1);
else
- err = parse_reply_info(msg, rinfo, session->s_con.peer_features);
+ err = parse_reply_info(session, msg, rinfo, session->s_con.peer_features);
mutex_unlock(&mdsc->mutex);
mutex_lock(&session->s_mutex);
@@ -3020,6 +3240,9 @@ out_err:
/* kick calling process */
complete_request(mdsc, req);
+
+ ceph_update_metadata_latency(&mdsc->metric, req->r_start_latency,
+ req->r_end_latency, err);
out:
ceph_mdsc_put_request(req);
return;
@@ -3116,8 +3339,7 @@ static void handle_session(struct ceph_mds_session *session,
void *end = p + msg->front.iov_len;
struct ceph_mds_session_head *h;
u32 op;
- u64 seq;
- unsigned long features = 0;
+ u64 seq, features = 0;
int wake = 0;
bool blacklisted = false;
@@ -3136,9 +3358,10 @@ static void handle_session(struct ceph_mds_session *session,
goto bad;
/* version >= 3, feature bits */
ceph_decode_32_safe(&p, end, len, bad);
- ceph_decode_need(&p, end, len, bad);
- memcpy(&features, p, min_t(size_t, len, sizeof(features)));
- p += len;
+ if (len) {
+ ceph_decode_64_safe(&p, end, features, bad);
+ p += len - sizeof(features);
+ }
}
mutex_lock(&mdsc->mutex);
@@ -3168,6 +3391,8 @@ static void handle_session(struct ceph_mds_session *session,
session->s_state = CEPH_MDS_SESSION_OPEN;
session->s_features = features;
renewed_caps(mdsc, session, 0);
+ if (test_bit(CEPHFS_FEATURE_METRIC_COLLECT, &session->s_features))
+ metric_schedule_delayed(&mdsc->metric);
wake = 1;
if (mdsc->stopping)
__close_session(mdsc, session);
@@ -3249,6 +3474,29 @@ bad:
return;
}
+void ceph_mdsc_release_dir_caps(struct ceph_mds_request *req)
+{
+ int dcaps;
+
+ dcaps = xchg(&req->r_dir_caps, 0);
+ if (dcaps) {
+ dout("releasing r_dir_caps=%s\n", ceph_cap_string(dcaps));
+ ceph_put_cap_refs(ceph_inode(req->r_parent), dcaps);
+ }
+}
+
+void ceph_mdsc_release_dir_caps_no_check(struct ceph_mds_request *req)
+{
+ int dcaps;
+
+ dcaps = xchg(&req->r_dir_caps, 0);
+ if (dcaps) {
+ dout("releasing r_dir_caps=%s\n", ceph_cap_string(dcaps));
+ ceph_put_cap_refs_no_check_caps(ceph_inode(req->r_parent),
+ dcaps);
+ }
+}
+
/*
* called under session->mutex.
*/
@@ -3276,9 +3524,14 @@ static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
continue;
if (req->r_attempts == 0)
continue; /* only old requests */
- if (req->r_session &&
- req->r_session->s_mds == session->s_mds)
- __send_request(mdsc, session, req, true);
+ if (!req->r_session)
+ continue;
+ if (req->r_session->s_mds != session->s_mds)
+ continue;
+
+ ceph_mdsc_release_dir_caps_no_check(req);
+
+ __send_request(mdsc, session, req, true);
}
mutex_unlock(&mdsc->mutex);
}
@@ -3362,7 +3615,7 @@ fail_msg:
/*
* Encode information about a cap for a reconnect with the MDS.
*/
-static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
+static int reconnect_caps_cb(struct inode *inode, struct ceph_cap *cap,
void *arg)
{
union {
@@ -3385,6 +3638,15 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
cap->mseq = 0; /* and migrate_seq */
cap->cap_gen = cap->session->s_cap_gen;
+ /* These are lost when the session goes away */
+ if (S_ISDIR(inode->i_mode)) {
+ if (cap->issued & CEPH_CAP_DIR_CREATE) {
+ ceph_put_string(rcu_dereference_raw(ci->i_cached_layout.pool_ns));
+ memset(&ci->i_cached_layout, 0, sizeof(ci->i_cached_layout));
+ }
+ cap->issued &= ~CEPH_CAP_ANY_DIR_OPS;
+ }
+
if (recon_state->msg_version >= 2) {
rec.v2.cap_id = cpu_to_le64(cap->cap_id);
rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
@@ -3602,8 +3864,6 @@ fail:
* recovering MDS might have.
*
* This is a relatively heavyweight operation, but it's rare.
- *
- * called with mdsc->mutex held.
*/
static void send_mds_reconnect(struct ceph_mds_client *mdsc,
struct ceph_mds_session *session)
@@ -3626,6 +3886,8 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
if (!reply)
goto fail_nomsg;
+ xa_destroy(&session->s_delegated_inos);
+
mutex_lock(&session->s_mutex);
session->s_state = CEPH_MDS_SESSION_RECONNECTING;
session->s_seq = 0;
@@ -3681,7 +3943,7 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
recon_state.msg_version = 2;
}
/* trsaverse this session's caps */
- err = ceph_iterate_session_caps(session, encode_caps_cb, &recon_state);
+ err = ceph_iterate_session_caps(session, reconnect_caps_cb, &recon_state);
spin_lock(&session->s_cap_lock);
session->s_cap_reconnect = 0;
@@ -3855,7 +4117,11 @@ static void check_new_map(struct ceph_mds_client *mdsc,
oldstate != CEPH_MDS_STATE_STARTING)
pr_info("mds%d recovery completed\n", s->s_mds);
kick_requests(mdsc, i);
+ mutex_unlock(&mdsc->mutex);
+ mutex_lock(&s->s_mutex);
+ mutex_lock(&mdsc->mutex);
ceph_kick_flushing_caps(mdsc, s);
+ mutex_unlock(&s->s_mutex);
wake_up_session_caps(s, RECONNECT);
}
}
@@ -4080,6 +4346,30 @@ static void maybe_recover_session(struct ceph_mds_client *mdsc)
ceph_force_reconnect(fsc->sb);
}
+bool check_session_state(struct ceph_mds_session *s)
+{
+ if (s->s_state == CEPH_MDS_SESSION_CLOSING) {
+ dout("resending session close request for mds%d\n",
+ s->s_mds);
+ request_close_session(s);
+ return false;
+ }
+ if (s->s_ttl && time_after(jiffies, s->s_ttl)) {
+ if (s->s_state == CEPH_MDS_SESSION_OPEN) {
+ s->s_state = CEPH_MDS_SESSION_HUNG;
+ pr_info("mds%d hung\n", s->s_mds);
+ }
+ }
+ if (s->s_state == CEPH_MDS_SESSION_NEW ||
+ s->s_state == CEPH_MDS_SESSION_RESTARTING ||
+ s->s_state == CEPH_MDS_SESSION_CLOSED ||
+ s->s_state == CEPH_MDS_SESSION_REJECTED)
+ /* this mds is failed or recovering, just wait */
+ return false;
+
+ return true;
+}
+
/*
* delayed work -- periodically trim expired leases, renew caps with mds
*/
@@ -4100,6 +4390,9 @@ static void delayed_work(struct work_struct *work)
dout("mdsc delayed_work\n");
+ if (mdsc->stopping)
+ return;
+
mutex_lock(&mdsc->mutex);
renew_interval = mdsc->mdsmap->m_session_timeout >> 2;
renew_caps = time_after_eq(jiffies, HZ*renew_interval +
@@ -4111,23 +4404,8 @@ static void delayed_work(struct work_struct *work)
struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
if (!s)
continue;
- if (s->s_state == CEPH_MDS_SESSION_CLOSING) {
- dout("resending session close request for mds%d\n",
- s->s_mds);
- request_close_session(mdsc, s);
- ceph_put_mds_session(s);
- continue;
- }
- if (s->s_ttl && time_after(jiffies, s->s_ttl)) {
- if (s->s_state == CEPH_MDS_SESSION_OPEN) {
- s->s_state = CEPH_MDS_SESSION_HUNG;
- pr_info("mds%d hung\n", s->s_mds);
- }
- }
- if (s->s_state == CEPH_MDS_SESSION_NEW ||
- s->s_state == CEPH_MDS_SESSION_RESTARTING ||
- s->s_state == CEPH_MDS_SESSION_REJECTED) {
- /* this mds is failed or recovering, just wait */
+
+ if (!check_session_state(s)) {
ceph_put_mds_session(s);
continue;
}
@@ -4163,6 +4441,7 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
{
struct ceph_mds_client *mdsc;
+ int err;
mdsc = kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS);
if (!mdsc)
@@ -4171,11 +4450,10 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
mutex_init(&mdsc->mutex);
mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS);
if (!mdsc->mdsmap) {
- kfree(mdsc);
- return -ENOMEM;
+ err = -ENOMEM;
+ goto err_mdsc;
}
- fsc->mdsc = mdsc;
init_completion(&mdsc->safe_umount_waiters);
init_waitqueue_head(&mdsc->session_close_wq);
INIT_LIST_HEAD(&mdsc->waiting_for_map);
@@ -4204,13 +4482,15 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
spin_lock_init(&mdsc->snap_flush_lock);
mdsc->last_cap_flush_tid = 1;
INIT_LIST_HEAD(&mdsc->cap_flush_list);
- INIT_LIST_HEAD(&mdsc->cap_dirty);
INIT_LIST_HEAD(&mdsc->cap_dirty_migrating);
mdsc->num_cap_flushing = 0;
spin_lock_init(&mdsc->cap_dirty_lock);
init_waitqueue_head(&mdsc->cap_flushing_wq);
INIT_WORK(&mdsc->cap_reclaim_work, ceph_cap_reclaim_work);
atomic_set(&mdsc->cap_reclaim_pending, 0);
+ err = ceph_metric_init(&mdsc->metric);
+ if (err)
+ goto err_mdsmap;
spin_lock_init(&mdsc->dentry_list_lock);
INIT_LIST_HEAD(&mdsc->dentry_leases);
@@ -4228,7 +4508,15 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
strscpy(mdsc->nodename, utsname()->nodename,
sizeof(mdsc->nodename));
+
+ fsc->mdsc = mdsc;
return 0;
+
+err_mdsmap:
+ kfree(mdsc->mdsmap);
+err_mdsc:
+ kfree(mdsc);
+ return err;
}
/*
@@ -4465,7 +4753,16 @@ void ceph_mdsc_force_umount(struct ceph_mds_client *mdsc)
static void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
{
dout("stop\n");
- cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
+ /*
+ * Make sure the delayed work stopped before releasing
+ * the resources.
+ *
+ * Because the cancel_delayed_work_sync() will only
+ * guarantee that the work finishes executing. But the
+ * delayed work will re-arm itself again after that.
+ */
+ flush_delayed_work(&mdsc->delayed_work);
+
if (mdsc->mdsmap)
ceph_mdsmap_destroy(mdsc->mdsmap);
kfree(mdsc->sessions);
@@ -4486,6 +4783,9 @@ void ceph_mdsc_destroy(struct ceph_fs_client *fsc)
ceph_mdsc_stop(mdsc);
+ ceph_metric_destroy(&mdsc->metric);
+
+ flush_delayed_work(&mdsc->metric.delayed_work);
fsc->mdsc = NULL;
kfree(mdsc);
dout("mdsc_destroy %p done\n", mdsc);
diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h
index 27a7446e10d3..658800605bfb 100644
--- a/fs/ceph/mds_client.h
+++ b/fs/ceph/mds_client.h
@@ -10,12 +10,16 @@
#include <linux/spinlock.h>
#include <linux/refcount.h>
#include <linux/utsname.h>
+#include <linux/ktime.h>
#include <linux/ceph/types.h>
#include <linux/ceph/messenger.h>
#include <linux/ceph/mdsmap.h>
#include <linux/ceph/auth.h>
+#include "metric.h"
+#include "super.h"
+
/* The first 8 bits are reserved for old ceph releases */
enum ceph_feature_type {
CEPHFS_FEATURE_MIMIC = 8,
@@ -23,8 +27,10 @@ enum ceph_feature_type {
CEPHFS_FEATURE_RECLAIM_CLIENT,
CEPHFS_FEATURE_LAZY_CAP_WANTED,
CEPHFS_FEATURE_MULTI_RECONNECT,
+ CEPHFS_FEATURE_DELEG_INO,
+ CEPHFS_FEATURE_METRIC_COLLECT,
- CEPHFS_FEATURE_MAX = CEPHFS_FEATURE_MULTI_RECONNECT,
+ CEPHFS_FEATURE_MAX = CEPHFS_FEATURE_METRIC_COLLECT,
};
/*
@@ -37,6 +43,8 @@ enum ceph_feature_type {
CEPHFS_FEATURE_REPLY_ENCODING, \
CEPHFS_FEATURE_LAZY_CAP_WANTED, \
CEPHFS_FEATURE_MULTI_RECONNECT, \
+ CEPHFS_FEATURE_DELEG_INO, \
+ CEPHFS_FEATURE_METRIC_COLLECT, \
\
CEPHFS_FEATURE_MAX, \
}
@@ -194,13 +202,18 @@ struct ceph_mds_session {
struct list_head s_cap_releases; /* waiting cap_release messages */
struct work_struct s_cap_release_work;
- /* protected by mutex */
+ /* See ceph_inode_info->i_dirty_item. */
+ struct list_head s_cap_dirty; /* inodes w/ dirty caps */
+
+ /* See ceph_inode_info->i_flushing_item. */
struct list_head s_cap_flushing; /* inodes w/ flushing caps */
+
unsigned long s_renew_requested; /* last time we sent a renew req */
u64 s_renew_seq;
struct list_head s_waiting; /* waiting requests */
struct list_head s_unsafe; /* unsafe requests */
+ struct xarray s_delegated_inos;
};
/*
@@ -255,6 +268,7 @@ struct ceph_mds_request {
#define CEPH_MDS_R_GOT_RESULT (5) /* got a result */
#define CEPH_MDS_R_DID_PREPOPULATE (6) /* prepopulated readdir */
#define CEPH_MDS_R_PARENT_LOCKED (7) /* is r_parent->i_rwsem wlocked? */
+#define CEPH_MDS_R_ASYNC (8) /* async request */
unsigned long r_req_flags;
struct mutex r_fill_mutex;
@@ -263,6 +277,7 @@ struct ceph_mds_request {
int r_fmode; /* file mode, if expecting cap */
kuid_t r_uid;
kgid_t r_gid;
+ int r_request_release_offset;
struct timespec64 r_stamp;
/* for choosing which mds to send this request to */
@@ -280,14 +295,20 @@ struct ceph_mds_request {
int r_old_inode_drop, r_old_inode_unless;
struct ceph_msg *r_request; /* original request */
- int r_request_release_offset;
struct ceph_msg *r_reply;
struct ceph_mds_reply_info_parsed r_reply_info;
- struct page *r_locked_page;
int r_err;
+
+ struct page *r_locked_page;
+ int r_dir_caps;
+ int r_num_caps;
+ u32 r_readdir_offset;
+
unsigned long r_timeout; /* optional. jiffies, 0 is "wait forever" */
unsigned long r_started; /* start time to measure timeout against */
+ unsigned long r_start_latency; /* start time to measure latency */
+ unsigned long r_end_latency; /* finish time to measure latency */
unsigned long r_request_started; /* start time for mds request only,
used to measure lease durations */
@@ -304,6 +325,7 @@ struct ceph_mds_request {
int r_num_fwd; /* number of forward attempts */
int r_resend_mds; /* mds to resend to next, if any*/
u32 r_sent_on_mseq; /* cap mseq request was sent at*/
+ u64 r_deleg_ino;
struct list_head r_wait;
struct completion r_completion;
@@ -315,10 +337,8 @@ struct ceph_mds_request {
long long r_dir_release_cnt;
long long r_dir_ordered_cnt;
int r_readdir_cache_idx;
- u32 r_readdir_offset;
struct ceph_cap_reservation r_caps_reservation;
- int r_num_caps;
};
struct ceph_pool_perm {
@@ -352,7 +372,7 @@ struct ceph_quotarealm_inode {
struct cap_wait {
struct list_head list;
- unsigned long ino;
+ u64 ino;
pid_t tgid;
int need;
int want;
@@ -411,7 +431,6 @@ struct ceph_mds_client {
u64 last_cap_flush_tid;
struct list_head cap_flush_list;
- struct list_head cap_dirty; /* inodes with dirty caps */
struct list_head cap_dirty_migrating; /* ...that are migration... */
int num_cap_flushing; /* # caps we are flushing */
spinlock_t cap_dirty_lock; /* protects above items */
@@ -446,6 +465,8 @@ struct ceph_mds_client {
struct list_head dentry_leases; /* fifo list */
struct list_head dentry_dir_leases; /* lru list */
+ struct ceph_client_metric metric;
+
spinlock_t snapid_map_lock;
struct rb_root snapid_map_tree;
struct list_head snapid_map_lru;
@@ -458,6 +479,8 @@ struct ceph_mds_client {
extern const char *ceph_mds_op_name(int op);
+extern bool check_session_state(struct ceph_mds_session *s);
+
extern struct ceph_mds_session *
__ceph_lookup_mds_session(struct ceph_mds_client *, int mds);
@@ -488,6 +511,8 @@ extern int ceph_mdsc_submit_request(struct ceph_mds_client *mdsc,
extern int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
struct inode *dir,
struct ceph_mds_request *req);
+extern void ceph_mdsc_release_dir_caps(struct ceph_mds_request *req);
+extern void ceph_mdsc_release_dir_caps_no_check(struct ceph_mds_request *req);
static inline void ceph_mdsc_get_request(struct ceph_mds_request *req)
{
kref_get(&req->r_kref);
@@ -512,7 +537,7 @@ extern void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc);
static inline void ceph_mdsc_free_path(char *path, int len)
{
- if (path)
+ if (!IS_ERR_OR_NULL(path))
__putname(path - (PATH_MAX - 1 - len));
}
@@ -537,4 +562,15 @@ extern void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc,
extern int ceph_trim_caps(struct ceph_mds_client *mdsc,
struct ceph_mds_session *session,
int max_caps);
+
+static inline int ceph_wait_on_async_create(struct inode *inode)
+{
+ struct ceph_inode_info *ci = ceph_inode(inode);
+
+ return wait_on_bit(&ci->i_ceph_flags, CEPH_ASYNC_CREATE_BIT,
+ TASK_INTERRUPTIBLE);
+}
+
+extern u64 ceph_get_deleg_ino(struct ceph_mds_session *session);
+extern int ceph_restore_deleg_ino(struct ceph_mds_session *session, u64 ino);
#endif
diff --git a/fs/ceph/mdsmap.c b/fs/ceph/mdsmap.c
index 889627817e52..e4aba6c6d3b5 100644
--- a/fs/ceph/mdsmap.c
+++ b/fs/ceph/mdsmap.c
@@ -120,7 +120,7 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)
const void *start = *p;
int i, j, n;
int err;
- u8 mdsmap_v, mdsmap_cv;
+ u8 mdsmap_v;
u16 mdsmap_ev;
m = kzalloc(sizeof(*m), GFP_NOFS);
@@ -129,7 +129,7 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)
ceph_decode_need(p, end, 1 + 1, bad);
mdsmap_v = ceph_decode_8(p);
- mdsmap_cv = ceph_decode_8(p);
+ *p += sizeof(u8); /* mdsmap_cv */
if (mdsmap_v >= 4) {
u32 mdsmap_len;
ceph_decode_32_safe(p, end, mdsmap_len, bad);
@@ -174,7 +174,6 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)
u64 global_id;
u32 namelen;
s32 mds, inc, state;
- u64 state_seq;
u8 info_v;
void *info_end = NULL;
struct ceph_entity_addr addr;
@@ -189,9 +188,8 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)
info_v= ceph_decode_8(p);
if (info_v >= 4) {
u32 info_len;
- u8 info_cv;
ceph_decode_need(p, end, 1 + sizeof(u32), bad);
- info_cv = ceph_decode_8(p);
+ *p += sizeof(u8); /* info_cv */
info_len = ceph_decode_32(p);
info_end = *p + info_len;
if (info_end > end)
@@ -210,7 +208,7 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)
mds = ceph_decode_32(p);
inc = ceph_decode_32(p);
state = ceph_decode_32(p);
- state_seq = ceph_decode_64(p);
+ *p += sizeof(u64); /* state_seq */
err = ceph_decode_entity_addr(p, end, &addr);
if (err)
goto corrupt;
diff --git a/fs/ceph/metric.c b/fs/ceph/metric.c
new file mode 100644
index 000000000000..2466b261fba2
--- /dev/null
+++ b/fs/ceph/metric.c
@@ -0,0 +1,297 @@
+/* SPDX-License-Identifier: GPL-2.0 */
+#include <linux/ceph/ceph_debug.h>
+
+#include <linux/types.h>
+#include <linux/percpu_counter.h>
+#include <linux/math64.h>
+
+#include "metric.h"
+#include "mds_client.h"
+
+static bool ceph_mdsc_send_metrics(struct ceph_mds_client *mdsc,
+ struct ceph_mds_session *s)
+{
+ struct ceph_metric_head *head;
+ struct ceph_metric_cap *cap;
+ struct ceph_metric_read_latency *read;
+ struct ceph_metric_write_latency *write;
+ struct ceph_metric_metadata_latency *meta;
+ struct ceph_client_metric *m = &mdsc->metric;
+ u64 nr_caps = atomic64_read(&m->total_caps);
+ struct ceph_msg *msg;
+ struct timespec64 ts;
+ s64 sum;
+ s32 items = 0;
+ s32 len;
+
+ len = sizeof(*head) + sizeof(*cap) + sizeof(*read) + sizeof(*write)
+ + sizeof(*meta);
+
+ msg = ceph_msg_new(CEPH_MSG_CLIENT_METRICS, len, GFP_NOFS, true);
+ if (!msg) {
+ pr_err("send metrics to mds%d, failed to allocate message\n",
+ s->s_mds);
+ return false;
+ }
+
+ head = msg->front.iov_base;
+
+ /* encode the cap metric */
+ cap = (struct ceph_metric_cap *)(head + 1);
+ cap->type = cpu_to_le32(CLIENT_METRIC_TYPE_CAP_INFO);
+ cap->ver = 1;
+ cap->compat = 1;
+ cap->data_len = cpu_to_le32(sizeof(*cap) - 10);
+ cap->hit = cpu_to_le64(percpu_counter_sum(&mdsc->metric.i_caps_hit));
+ cap->mis = cpu_to_le64(percpu_counter_sum(&mdsc->metric.i_caps_mis));
+ cap->total = cpu_to_le64(nr_caps);
+ items++;
+
+ /* encode the read latency metric */
+ read = (struct ceph_metric_read_latency *)(cap + 1);
+ read->type = cpu_to_le32(CLIENT_METRIC_TYPE_READ_LATENCY);
+ read->ver = 1;
+ read->compat = 1;
+ read->data_len = cpu_to_le32(sizeof(*read) - 10);
+ sum = m->read_latency_sum;
+ jiffies_to_timespec64(sum, &ts);
+ read->sec = cpu_to_le32(ts.tv_sec);
+ read->nsec = cpu_to_le32(ts.tv_nsec);
+ items++;
+
+ /* encode the write latency metric */
+ write = (struct ceph_metric_write_latency *)(read + 1);
+ write->type = cpu_to_le32(CLIENT_METRIC_TYPE_WRITE_LATENCY);
+ write->ver = 1;
+ write->compat = 1;
+ write->data_len = cpu_to_le32(sizeof(*write) - 10);
+ sum = m->write_latency_sum;
+ jiffies_to_timespec64(sum, &ts);
+ write->sec = cpu_to_le32(ts.tv_sec);
+ write->nsec = cpu_to_le32(ts.tv_nsec);
+ items++;
+
+ /* encode the metadata latency metric */
+ meta = (struct ceph_metric_metadata_latency *)(write + 1);
+ meta->type = cpu_to_le32(CLIENT_METRIC_TYPE_METADATA_LATENCY);
+ meta->ver = 1;
+ meta->compat = 1;
+ meta->data_len = cpu_to_le32(sizeof(*meta) - 10);
+ sum = m->metadata_latency_sum;
+ jiffies_to_timespec64(sum, &ts);
+ meta->sec = cpu_to_le32(ts.tv_sec);
+ meta->nsec = cpu_to_le32(ts.tv_nsec);
+ items++;
+
+ put_unaligned_le32(items, &head->num);
+ msg->front.iov_len = len;
+ msg->hdr.version = cpu_to_le16(1);
+ msg->hdr.compat_version = cpu_to_le16(1);
+ msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
+ dout("client%llu send metrics to mds%d\n",
+ ceph_client_gid(mdsc->fsc->client), s->s_mds);
+ ceph_con_send(&s->s_con, msg);
+
+ return true;
+}
+
+
+static void metric_get_session(struct ceph_mds_client *mdsc)
+{
+ struct ceph_mds_session *s;
+ int i;
+
+ mutex_lock(&mdsc->mutex);
+ for (i = 0; i < mdsc->max_sessions; i++) {
+ s = __ceph_lookup_mds_session(mdsc, i);
+ if (!s)
+ continue;
+
+ /*
+ * Skip it if MDS doesn't support the metric collection,
+ * or the MDS will close the session's socket connection
+ * directly when it get this message.
+ */
+ if (check_session_state(s) &&
+ test_bit(CEPHFS_FEATURE_METRIC_COLLECT, &s->s_features)) {
+ mdsc->metric.session = s;
+ break;
+ }
+
+ ceph_put_mds_session(s);
+ }
+ mutex_unlock(&mdsc->mutex);
+}
+
+static void metric_delayed_work(struct work_struct *work)
+{
+ struct ceph_client_metric *m =
+ container_of(work, struct ceph_client_metric, delayed_work.work);
+ struct ceph_mds_client *mdsc =
+ container_of(m, struct ceph_mds_client, metric);
+
+ if (mdsc->stopping)
+ return;
+
+ if (!m->session || !check_session_state(m->session)) {
+ if (m->session) {
+ ceph_put_mds_session(m->session);
+ m->session = NULL;
+ }
+ metric_get_session(mdsc);
+ }
+ if (m->session) {
+ ceph_mdsc_send_metrics(mdsc, m->session);
+ metric_schedule_delayed(m);
+ }
+}
+
+int ceph_metric_init(struct ceph_client_metric *m)
+{
+ int ret;
+
+ if (!m)
+ return -EINVAL;
+
+ atomic64_set(&m->total_dentries, 0);
+ ret = percpu_counter_init(&m->d_lease_hit, 0, GFP_KERNEL);
+ if (ret)
+ return ret;
+
+ ret = percpu_counter_init(&m->d_lease_mis, 0, GFP_KERNEL);
+ if (ret)
+ goto err_d_lease_mis;
+
+ atomic64_set(&m->total_caps, 0);
+ ret = percpu_counter_init(&m->i_caps_hit, 0, GFP_KERNEL);
+ if (ret)
+ goto err_i_caps_hit;
+
+ ret = percpu_counter_init(&m->i_caps_mis, 0, GFP_KERNEL);
+ if (ret)
+ goto err_i_caps_mis;
+
+ spin_lock_init(&m->read_latency_lock);
+ m->read_latency_sq_sum = 0;
+ m->read_latency_min = KTIME_MAX;
+ m->read_latency_max = 0;
+ m->total_reads = 0;
+ m->read_latency_sum = 0;
+
+ spin_lock_init(&m->write_latency_lock);
+ m->write_latency_sq_sum = 0;
+ m->write_latency_min = KTIME_MAX;
+ m->write_latency_max = 0;
+ m->total_writes = 0;
+ m->write_latency_sum = 0;
+
+ spin_lock_init(&m->metadata_latency_lock);
+ m->metadata_latency_sq_sum = 0;
+ m->metadata_latency_min = KTIME_MAX;
+ m->metadata_latency_max = 0;
+ m->total_metadatas = 0;
+ m->metadata_latency_sum = 0;
+
+ m->session = NULL;
+ INIT_DELAYED_WORK(&m->delayed_work, metric_delayed_work);
+
+ return 0;
+
+err_i_caps_mis:
+ percpu_counter_destroy(&m->i_caps_hit);
+err_i_caps_hit:
+ percpu_counter_destroy(&m->d_lease_mis);
+err_d_lease_mis:
+ percpu_counter_destroy(&m->d_lease_hit);
+
+ return ret;
+}
+
+void ceph_metric_destroy(struct ceph_client_metric *m)
+{
+ if (!m)
+ return;
+
+ percpu_counter_destroy(&m->i_caps_mis);
+ percpu_counter_destroy(&m->i_caps_hit);
+ percpu_counter_destroy(&m->d_lease_mis);
+ percpu_counter_destroy(&m->d_lease_hit);
+
+ cancel_delayed_work_sync(&m->delayed_work);
+
+ if (m->session)
+ ceph_put_mds_session(m->session);
+}
+
+static inline void __update_latency(ktime_t *totalp, ktime_t *lsump,
+ ktime_t *min, ktime_t *max,
+ ktime_t *sq_sump, ktime_t lat)
+{
+ ktime_t total, avg, sq, lsum;
+
+ total = ++(*totalp);
+ lsum = (*lsump += lat);
+
+ if (unlikely(lat < *min))
+ *min = lat;
+ if (unlikely(lat > *max))
+ *max = lat;
+
+ if (unlikely(total == 1))
+ return;
+
+ /* the sq is (lat - old_avg) * (lat - new_avg) */
+ avg = DIV64_U64_ROUND_CLOSEST((lsum - lat), (total - 1));
+ sq = lat - avg;
+ avg = DIV64_U64_ROUND_CLOSEST(lsum, total);
+ sq = sq * (lat - avg);
+ *sq_sump += sq;
+}
+
+void ceph_update_read_latency(struct ceph_client_metric *m,
+ ktime_t r_start, ktime_t r_end,
+ int rc)
+{
+ ktime_t lat = ktime_sub(r_end, r_start);
+
+ if (unlikely(rc < 0 && rc != -ENOENT && rc != -ETIMEDOUT))
+ return;
+
+ spin_lock(&m->read_latency_lock);
+ __update_latency(&m->total_reads, &m->read_latency_sum,
+ &m->read_latency_min, &m->read_latency_max,
+ &m->read_latency_sq_sum, lat);
+ spin_unlock(&m->read_latency_lock);
+}
+
+void ceph_update_write_latency(struct ceph_client_metric *m,
+ ktime_t r_start, ktime_t r_end,
+ int rc)
+{
+ ktime_t lat = ktime_sub(r_end, r_start);
+
+ if (unlikely(rc && rc != -ETIMEDOUT))
+ return;
+
+ spin_lock(&m->write_latency_lock);
+ __update_latency(&m->total_writes, &m->write_latency_sum,
+ &m->write_latency_min, &m->write_latency_max,
+ &m->write_latency_sq_sum, lat);
+ spin_unlock(&m->write_latency_lock);
+}
+
+void ceph_update_metadata_latency(struct ceph_client_metric *m,
+ ktime_t r_start, ktime_t r_end,
+ int rc)
+{
+ ktime_t lat = ktime_sub(r_end, r_start);
+
+ if (unlikely(rc && rc != -ENOENT))
+ return;
+
+ spin_lock(&m->metadata_latency_lock);
+ __update_latency(&m->total_metadatas, &m->metadata_latency_sum,
+ &m->metadata_latency_min, &m->metadata_latency_max,
+ &m->metadata_latency_sq_sum, lat);
+ spin_unlock(&m->metadata_latency_lock);
+}
diff --git a/fs/ceph/metric.h b/fs/ceph/metric.h
new file mode 100644
index 000000000000..1d0959d669d7
--- /dev/null
+++ b/fs/ceph/metric.h
@@ -0,0 +1,153 @@
+/* SPDX-License-Identifier: GPL-2.0 */
+#ifndef _FS_CEPH_MDS_METRIC_H
+#define _FS_CEPH_MDS_METRIC_H
+
+#include <linux/types.h>
+#include <linux/percpu_counter.h>
+#include <linux/ktime.h>
+
+extern bool disable_send_metrics;
+
+enum ceph_metric_type {
+ CLIENT_METRIC_TYPE_CAP_INFO,
+ CLIENT_METRIC_TYPE_READ_LATENCY,
+ CLIENT_METRIC_TYPE_WRITE_LATENCY,
+ CLIENT_METRIC_TYPE_METADATA_LATENCY,
+ CLIENT_METRIC_TYPE_DENTRY_LEASE,
+
+ CLIENT_METRIC_TYPE_MAX = CLIENT_METRIC_TYPE_DENTRY_LEASE,
+};
+
+/*
+ * This will always have the highest metric bit value
+ * as the last element of the array.
+ */
+#define CEPHFS_METRIC_SPEC_CLIENT_SUPPORTED { \
+ CLIENT_METRIC_TYPE_CAP_INFO, \
+ CLIENT_METRIC_TYPE_READ_LATENCY, \
+ CLIENT_METRIC_TYPE_WRITE_LATENCY, \
+ CLIENT_METRIC_TYPE_METADATA_LATENCY, \
+ \
+ CLIENT_METRIC_TYPE_MAX, \
+}
+
+/* metric caps header */
+struct ceph_metric_cap {
+ __le32 type; /* ceph metric type */
+
+ __u8 ver;
+ __u8 compat;
+
+ __le32 data_len; /* length of sizeof(hit + mis + total) */
+ __le64 hit;
+ __le64 mis;
+ __le64 total;
+} __packed;
+
+/* metric read latency header */
+struct ceph_metric_read_latency {
+ __le32 type; /* ceph metric type */
+
+ __u8 ver;
+ __u8 compat;
+
+ __le32 data_len; /* length of sizeof(sec + nsec) */
+ __le32 sec;
+ __le32 nsec;
+} __packed;
+
+/* metric write latency header */
+struct ceph_metric_write_latency {
+ __le32 type; /* ceph metric type */
+
+ __u8 ver;
+ __u8 compat;
+
+ __le32 data_len; /* length of sizeof(sec + nsec) */
+ __le32 sec;
+ __le32 nsec;
+} __packed;
+
+/* metric metadata latency header */
+struct ceph_metric_metadata_latency {
+ __le32 type; /* ceph metric type */
+
+ __u8 ver;
+ __u8 compat;
+
+ __le32 data_len; /* length of sizeof(sec + nsec) */
+ __le32 sec;
+ __le32 nsec;
+} __packed;
+
+struct ceph_metric_head {
+ __le32 num; /* the number of metrics that will be sent */
+} __packed;
+
+/* This is the global metrics */
+struct ceph_client_metric {
+ atomic64_t total_dentries;
+ struct percpu_counter d_lease_hit;
+ struct percpu_counter d_lease_mis;
+
+ atomic64_t total_caps;
+ struct percpu_counter i_caps_hit;
+ struct percpu_counter i_caps_mis;
+
+ spinlock_t read_latency_lock;
+ u64 total_reads;
+ ktime_t read_latency_sum;
+ ktime_t read_latency_sq_sum;
+ ktime_t read_latency_min;
+ ktime_t read_latency_max;
+
+ spinlock_t write_latency_lock;
+ u64 total_writes;
+ ktime_t write_latency_sum;
+ ktime_t write_latency_sq_sum;
+ ktime_t write_latency_min;
+ ktime_t write_latency_max;
+
+ spinlock_t metadata_latency_lock;
+ u64 total_metadatas;
+ ktime_t metadata_latency_sum;
+ ktime_t metadata_latency_sq_sum;
+ ktime_t metadata_latency_min;
+ ktime_t metadata_latency_max;
+
+ struct ceph_mds_session *session;
+ struct delayed_work delayed_work; /* delayed work */
+};
+
+static inline void metric_schedule_delayed(struct ceph_client_metric *m)
+{
+ if (disable_send_metrics)
+ return;
+
+ /* per second */
+ schedule_delayed_work(&m->delayed_work, round_jiffies_relative(HZ));
+}
+
+extern int ceph_metric_init(struct ceph_client_metric *m);
+extern void ceph_metric_destroy(struct ceph_client_metric *m);
+
+static inline void ceph_update_cap_hit(struct ceph_client_metric *m)
+{
+ percpu_counter_inc(&m->i_caps_hit);
+}
+
+static inline void ceph_update_cap_mis(struct ceph_client_metric *m)
+{
+ percpu_counter_inc(&m->i_caps_mis);
+}
+
+extern void ceph_update_read_latency(struct ceph_client_metric *m,
+ ktime_t r_start, ktime_t r_end,
+ int rc);
+extern void ceph_update_write_latency(struct ceph_client_metric *m,
+ ktime_t r_start, ktime_t r_end,
+ int rc);
+extern void ceph_update_metadata_latency(struct ceph_client_metric *m,
+ ktime_t r_start, ktime_t r_end,
+ int rc);
+#endif /* _FS_CEPH_MDS_METRIC_H */
diff --git a/fs/ceph/quota.c b/fs/ceph/quota.c
index de56dee60540..cc2c4d40b022 100644
--- a/fs/ceph/quota.c
+++ b/fs/ceph/quota.c
@@ -23,12 +23,12 @@ static inline bool ceph_has_realms_with_quotas(struct inode *inode)
{
struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
struct super_block *sb = mdsc->fsc->sb;
+ struct inode *root = d_inode(sb->s_root);
if (atomic64_read(&mdsc->quotarealms_count) > 0)
return true;
/* if root is the real CephFS root, we don't have quota realms */
- if (sb->s_root->d_inode &&
- (sb->s_root->d_inode->i_ino == CEPH_INO_ROOT))
+ if (root && ceph_ino(root) == CEPH_INO_ROOT)
return false;
/* otherwise, we can't know for sure */
return true;
@@ -159,8 +159,8 @@ static struct inode *lookup_quotarealm_inode(struct ceph_mds_client *mdsc,
}
if (IS_ERR(in)) {
- pr_warn("Can't lookup inode %llx (err: %ld)\n",
- realm->ino, PTR_ERR(in));
+ dout("Can't lookup inode %llx (err: %ld)\n",
+ realm->ino, PTR_ERR(in));
qri->timeout = jiffies + msecs_to_jiffies(60 * 1000); /* XXX */
} else {
qri->timeout = 0;
@@ -264,7 +264,7 @@ restart:
return NULL;
}
-bool ceph_quota_is_same_realm(struct inode *old, struct inode *new)
+static bool ceph_quota_is_same_realm(struct inode *old, struct inode *new)
{
struct ceph_mds_client *mdsc = ceph_inode_to_client(old)->mdsc;
struct ceph_snap_realm *old_realm, *new_realm;
@@ -361,8 +361,6 @@ restart:
spin_unlock(&ci->i_ceph_lock);
switch (op) {
case QUOTA_CHECK_MAX_FILES_OP:
- exceeded = (max && (rvalue >= max));
- break;
case QUOTA_CHECK_MAX_BYTES_OP:
exceeded = (max && (rvalue + delta > max));
break;
@@ -417,7 +415,7 @@ bool ceph_quota_is_max_files_exceeded(struct inode *inode)
WARN_ON(!S_ISDIR(inode->i_mode));
- return check_quota_exceeded(inode, QUOTA_CHECK_MAX_FILES_OP, 0);
+ return check_quota_exceeded(inode, QUOTA_CHECK_MAX_FILES_OP, 1);
}
/*
@@ -518,3 +516,59 @@ bool ceph_quota_update_statfs(struct ceph_fs_client *fsc, struct kstatfs *buf)
return is_updated;
}
+/*
+ * ceph_quota_check_rename - check if a rename can be executed
+ * @mdsc: MDS client instance
+ * @old: inode to be copied
+ * @new: destination inode (directory)
+ *
+ * This function verifies if a rename (e.g. moving a file or directory) can be
+ * executed. It forces an rstat update in the @new target directory (and in the
+ * source @old as well, if it's a directory). The actual check is done both for
+ * max_files and max_bytes.
+ *
+ * This function returns 0 if it's OK to do the rename, or, if quotas are
+ * exceeded, -EXDEV (if @old is a directory) or -EDQUOT.
+ */
+int ceph_quota_check_rename(struct ceph_mds_client *mdsc,
+ struct inode *old, struct inode *new)
+{
+ struct ceph_inode_info *ci_old = ceph_inode(old);
+ int ret = 0;
+
+ if (ceph_quota_is_same_realm(old, new))
+ return 0;
+
+ /*
+ * Get the latest rstat for target directory (and for source, if a
+ * directory)
+ */
+ ret = ceph_do_getattr(new, CEPH_STAT_RSTAT, false);
+ if (ret)
+ return ret;
+
+ if (S_ISDIR(old->i_mode)) {
+ ret = ceph_do_getattr(old, CEPH_STAT_RSTAT, false);
+ if (ret)
+ return ret;
+ ret = check_quota_exceeded(new, QUOTA_CHECK_MAX_BYTES_OP,
+ ci_old->i_rbytes);
+ if (!ret)
+ ret = check_quota_exceeded(new,
+ QUOTA_CHECK_MAX_FILES_OP,
+ ci_old->i_rfiles +
+ ci_old->i_rsubdirs);
+ if (ret)
+ ret = -EXDEV;
+ } else {
+ ret = check_quota_exceeded(new, QUOTA_CHECK_MAX_BYTES_OP,
+ i_size_read(old));
+ if (!ret)
+ ret = check_quota_exceeded(new,
+ QUOTA_CHECK_MAX_FILES_OP, 1);
+ if (ret)
+ ret = -EDQUOT;
+ }
+
+ return ret;
+}
diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c
index ccfcc66aaf44..923be9399b21 100644
--- a/fs/ceph/snap.c
+++ b/fs/ceph/snap.c
@@ -1155,5 +1155,6 @@ void ceph_cleanup_snapid_map(struct ceph_mds_client *mdsc)
pr_err("snapid map %llx -> %x still in use\n",
sm->snap, sm->dev);
}
+ kfree(sm);
}
}
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index c7f150686a53..7ec0e6d03d10 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -27,6 +27,9 @@
#include <linux/ceph/auth.h>
#include <linux/ceph/debugfs.h>
+static DEFINE_SPINLOCK(ceph_fsc_lock);
+static LIST_HEAD(ceph_fsc_list);
+
/*
* Ceph superblock operations
*
@@ -155,6 +158,7 @@ enum {
Opt_acl,
Opt_quotadf,
Opt_copyfrom,
+ Opt_wsync,
};
enum ceph_recover_session_mode {
@@ -194,6 +198,7 @@ static const struct fs_parameter_spec ceph_mount_parameters[] = {
fsparam_string ("snapdirname", Opt_snapdirname),
fsparam_string ("source", Opt_source),
fsparam_u32 ("wsize", Opt_wsize),
+ fsparam_flag_no ("wsync", Opt_wsync),
{}
};
@@ -444,6 +449,12 @@ static int ceph_parse_mount_param(struct fs_context *fc,
fc->sb_flags &= ~SB_POSIXACL;
}
break;
+ case Opt_wsync:
+ if (!result.negated)
+ fsopt->flags &= ~CEPH_MOUNT_OPT_ASYNC_DIROPS;
+ else
+ fsopt->flags |= CEPH_MOUNT_OPT_ASYNC_DIROPS;
+ break;
default:
BUG();
}
@@ -567,6 +578,9 @@ static int ceph_show_options(struct seq_file *m, struct dentry *root)
if (fsopt->flags & CEPH_MOUNT_OPT_CLEANRECOVER)
seq_show_option(m, "recover_session", "clean");
+ if (fsopt->flags & CEPH_MOUNT_OPT_ASYNC_DIROPS)
+ seq_puts(m, ",nowsync");
+
if (fsopt->wsize != CEPH_MAX_WRITE_SIZE)
seq_printf(m, ",wsize=%u", fsopt->wsize);
if (fsopt->rsize != CEPH_MAX_READ_SIZE)
@@ -623,8 +637,6 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
struct ceph_options *opt)
{
struct ceph_fs_client *fsc;
- int page_count;
- size_t size;
int err;
fsc = kzalloc(sizeof(*fsc), GFP_KERNEL);
@@ -672,18 +684,12 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
if (!fsc->cap_wq)
goto fail_inode_wq;
- /* set up mempools */
- err = -ENOMEM;
- page_count = fsc->mount_options->wsize >> PAGE_SHIFT;
- size = sizeof (struct page *) * (page_count ? page_count : 1);
- fsc->wb_pagevec_pool = mempool_create_kmalloc_pool(10, size);
- if (!fsc->wb_pagevec_pool)
- goto fail_cap_wq;
+ spin_lock(&ceph_fsc_lock);
+ list_add_tail(&fsc->metric_wakeup, &ceph_fsc_list);
+ spin_unlock(&ceph_fsc_lock);
return fsc;
-fail_cap_wq:
- destroy_workqueue(fsc->cap_wq);
fail_inode_wq:
destroy_workqueue(fsc->inode_wq);
fail_client:
@@ -706,12 +712,14 @@ static void destroy_fs_client(struct ceph_fs_client *fsc)
{
dout("destroy_fs_client %p\n", fsc);
+ spin_lock(&ceph_fsc_lock);
+ list_del(&fsc->metric_wakeup);
+ spin_unlock(&ceph_fsc_lock);
+
ceph_mdsc_destroy(fsc);
destroy_workqueue(fsc->inode_wq);
destroy_workqueue(fsc->cap_wq);
- mempool_destroy(fsc->wb_pagevec_pool);
-
destroy_mount_options(fsc->mount_options);
ceph_destroy_client(fsc->client);
@@ -729,6 +737,8 @@ struct kmem_cache *ceph_cap_flush_cachep;
struct kmem_cache *ceph_dentry_cachep;
struct kmem_cache *ceph_file_cachep;
struct kmem_cache *ceph_dir_file_cachep;
+struct kmem_cache *ceph_mds_request_cachep;
+mempool_t *ceph_wb_pagevec_pool;
static void ceph_inode_init_once(void *foo)
{
@@ -769,6 +779,14 @@ static int __init init_caches(void)
if (!ceph_dir_file_cachep)
goto bad_dir_file;
+ ceph_mds_request_cachep = KMEM_CACHE(ceph_mds_request, SLAB_MEM_SPREAD);
+ if (!ceph_mds_request_cachep)
+ goto bad_mds_req;
+
+ ceph_wb_pagevec_pool = mempool_create_kmalloc_pool(10, CEPH_MAX_WRITE_SIZE >> PAGE_SHIFT);
+ if (!ceph_wb_pagevec_pool)
+ goto bad_pagevec_pool;
+
error = ceph_fscache_register();
if (error)
goto bad_fscache;
@@ -776,6 +794,10 @@ static int __init init_caches(void)
return 0;
bad_fscache:
+ kmem_cache_destroy(ceph_mds_request_cachep);
+bad_pagevec_pool:
+ mempool_destroy(ceph_wb_pagevec_pool);
+bad_mds_req:
kmem_cache_destroy(ceph_dir_file_cachep);
bad_dir_file:
kmem_cache_destroy(ceph_file_cachep);
@@ -804,12 +826,14 @@ static void destroy_caches(void)
kmem_cache_destroy(ceph_dentry_cachep);
kmem_cache_destroy(ceph_file_cachep);
kmem_cache_destroy(ceph_dir_file_cachep);
+ kmem_cache_destroy(ceph_mds_request_cachep);
+ mempool_destroy(ceph_wb_pagevec_pool);
ceph_fscache_unregister();
}
/*
- * ceph_umount_begin - initiate forced umount. Tear down down the
+ * ceph_umount_begin - initiate forced umount. Tear down the
* mount, skipping steps that may hang while waiting for server(s).
*/
static void ceph_umount_begin(struct super_block *sb)
@@ -1107,6 +1131,15 @@ static void ceph_free_fc(struct fs_context *fc)
static int ceph_reconfigure_fc(struct fs_context *fc)
{
+ struct ceph_parse_opts_ctx *pctx = fc->fs_private;
+ struct ceph_mount_options *fsopt = pctx->opts;
+ struct ceph_fs_client *fsc = ceph_sb_to_client(fc->root->d_sb);
+
+ if (fsopt->flags & CEPH_MOUNT_OPT_ASYNC_DIROPS)
+ ceph_set_mount_opt(fsc, ASYNC_DIROPS);
+ else
+ ceph_clear_mount_opt(fsc, ASYNC_DIROPS);
+
sync_filesystem(fc->root->d_sb);
return 0;
}
@@ -1254,6 +1287,37 @@ static void __exit exit_ceph(void)
destroy_caches();
}
+static int param_set_metrics(const char *val, const struct kernel_param *kp)
+{
+ struct ceph_fs_client *fsc;
+ int ret;
+
+ ret = param_set_bool(val, kp);
+ if (ret) {
+ pr_err("Failed to parse sending metrics switch value '%s'\n",
+ val);
+ return ret;
+ } else if (!disable_send_metrics) {
+ // wake up all the mds clients
+ spin_lock(&ceph_fsc_lock);
+ list_for_each_entry(fsc, &ceph_fsc_list, metric_wakeup) {
+ metric_schedule_delayed(&fsc->mdsc->metric);
+ }
+ spin_unlock(&ceph_fsc_lock);
+ }
+
+ return 0;
+}
+
+static const struct kernel_param_ops param_ops_metrics = {
+ .set = param_set_metrics,
+ .get = param_get_bool,
+};
+
+bool disable_send_metrics = false;
+module_param_cb(disable_send_metrics, &param_ops_metrics, &disable_send_metrics, 0644);
+MODULE_PARM_DESC(disable_send_metrics, "Enable sending perf metrics to ceph cluster (default: on)");
+
module_init(init_ceph);
module_exit(exit_ceph);
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index 037cdfb2ad4f..a3995ebe0623 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -43,13 +43,16 @@
#define CEPH_MOUNT_OPT_MOUNTWAIT (1<<12) /* mount waits if no mds is up */
#define CEPH_MOUNT_OPT_NOQUOTADF (1<<13) /* no root dir quota in statfs */
#define CEPH_MOUNT_OPT_NOCOPYFROM (1<<14) /* don't use RADOS 'copy-from' op */
+#define CEPH_MOUNT_OPT_ASYNC_DIROPS (1<<15) /* allow async directory ops */
#define CEPH_MOUNT_OPT_DEFAULT \
(CEPH_MOUNT_OPT_DCACHE | \
CEPH_MOUNT_OPT_NOCOPYFROM)
#define ceph_set_mount_opt(fsc, opt) \
- (fsc)->mount_options->flags |= CEPH_MOUNT_OPT_##opt;
+ (fsc)->mount_options->flags |= CEPH_MOUNT_OPT_##opt
+#define ceph_clear_mount_opt(fsc, opt) \
+ (fsc)->mount_options->flags &= ~CEPH_MOUNT_OPT_##opt
#define ceph_test_mount_opt(fsc, opt) \
(!!((fsc)->mount_options->flags & CEPH_MOUNT_OPT_##opt))
@@ -98,6 +101,8 @@ struct ceph_mount_options {
struct ceph_fs_client {
struct super_block *sb;
+ struct list_head metric_wakeup;
+
struct ceph_mount_options *mount_options;
struct ceph_client *client;
@@ -113,8 +118,6 @@ struct ceph_fs_client {
struct ceph_mds_client *mdsc;
- /* writeback */
- mempool_t *wb_pagevec_pool;
atomic_long_t writeback_count;
struct workqueue_struct *inode_wq;
@@ -125,6 +128,7 @@ struct ceph_fs_client {
struct dentry *debugfs_congestion_kb;
struct dentry *debugfs_bdi;
struct dentry *debugfs_mdsc, *debugfs_mdsmap;
+ struct dentry *debugfs_metric;
struct dentry *debugfs_mds_sessions;
#endif
@@ -170,9 +174,9 @@ struct ceph_cap {
struct list_head caps_item;
};
-#define CHECK_CAPS_NODELAY 1 /* do not delay any further */
-#define CHECK_CAPS_AUTHONLY 2 /* only check auth cap */
-#define CHECK_CAPS_FLUSH 4 /* flush any dirty caps */
+#define CHECK_CAPS_AUTHONLY 1 /* only check auth cap */
+#define CHECK_CAPS_FLUSH 2 /* flush any dirty caps */
+#define CHECK_CAPS_NOINVAL 4 /* don't invalidate pagecache */
struct ceph_cap_flush {
u64 tid;
@@ -284,6 +288,7 @@ struct ceph_dentry_info {
#define CEPH_DENTRY_REFERENCED 1
#define CEPH_DENTRY_LEASE_LIST 2
#define CEPH_DENTRY_SHRINK_LIST 4
+#define CEPH_DENTRY_PRIMARY_LINK 8
struct ceph_inode_xattrs_info {
/*
@@ -315,13 +320,14 @@ struct ceph_inode_info {
u64 i_inline_version;
u32 i_time_warp_seq;
- unsigned i_ceph_flags;
+ unsigned long i_ceph_flags;
atomic64_t i_release_count;
atomic64_t i_ordered_count;
atomic64_t i_complete_seq[2];
struct ceph_dir_layout i_dir_layout;
struct ceph_file_layout i_layout;
+ struct ceph_file_layout i_cached_layout; // for async creates
char *i_symlink;
/* for dirs */
@@ -345,14 +351,31 @@ struct ceph_inode_info {
struct rb_root i_caps; /* cap list */
struct ceph_cap *i_auth_cap; /* authoritative cap, if any */
unsigned i_dirty_caps, i_flushing_caps; /* mask of dirtied fields */
- struct list_head i_dirty_item, i_flushing_item;
+
+ /*
+ * Link to the auth cap's session's s_cap_dirty list. s_cap_dirty
+ * is protected by the mdsc->cap_dirty_lock, but each individual item
+ * is also protected by the inode's i_ceph_lock. Walking s_cap_dirty
+ * requires the mdsc->cap_dirty_lock. List presence for an item can
+ * be tested under the i_ceph_lock. Changing anything requires both.
+ */
+ struct list_head i_dirty_item;
+
+ /*
+ * Link to session's s_cap_flushing list. Protected in a similar
+ * fashion to i_dirty_item, but also by the s_mutex for changes. The
+ * s_cap_flushing list can be walked while holding either the s_mutex
+ * or msdc->cap_dirty_lock. List presence can also be checked while
+ * holding the i_ceph_lock for this inode.
+ */
+ struct list_head i_flushing_item;
+
/* we need to track cap writeback on a per-cap-bit basis, to allow
* overlapping, pipelined cap flushes to the mds. we can probably
* reduce the tid to 8 bits if we're concerned about inode size. */
struct ceph_cap_flush *i_prealloc_cap_flush;
struct list_head i_cap_flush_list;
wait_queue_head_t i_cap_wq; /* threads waiting on a capability */
- unsigned long i_hold_caps_min; /* jiffies */
unsigned long i_hold_caps_max; /* jiffies */
struct list_head i_cap_delay_list; /* for delayed cap release to mds */
struct ceph_cap_reservation i_cap_migration_resv;
@@ -361,6 +384,8 @@ struct ceph_inode_info {
dirty|flushing caps */
unsigned i_snap_caps; /* cap bits for snapped files */
+ unsigned long i_last_rd;
+ unsigned long i_last_wr;
int i_nr_by_mode[CEPH_FILE_MODE_BITS]; /* open file counts */
struct mutex i_truncate_mutex;
@@ -375,7 +400,7 @@ struct ceph_inode_info {
/* held references to caps */
int i_pin_ref;
- int i_rd_ref, i_rdcache_ref, i_wr_ref, i_wb_ref;
+ int i_rd_ref, i_rdcache_ref, i_wr_ref, i_wb_ref, i_fx_ref;
int i_wrbuffer_ref, i_wrbuffer_ref_head;
atomic_t i_filelock_ref;
atomic_t i_shared_gen; /* increment each time we get FILE_SHARED */
@@ -432,15 +457,7 @@ ceph_vino(const struct inode *inode)
return ceph_inode(inode)->i_vino;
}
-/*
- * ino_t is <64 bits on many architectures, blech.
- *
- * i_ino (kernel inode) st_ino (userspace)
- * i386 32 32
- * x86_64+ino32 64 32
- * x86_64 64 64
- */
-static inline u32 ceph_ino_to_ino32(__u64 vino)
+static inline u32 ceph_ino_to_ino32(u64 vino)
{
u32 ino = vino & 0xffffffff;
ino ^= vino >> 32;
@@ -450,34 +467,17 @@ static inline u32 ceph_ino_to_ino32(__u64 vino)
}
/*
- * kernel i_ino value
+ * Inode numbers in cephfs are 64 bits, but inode->i_ino is 32-bits on
+ * some arches. We generally do not use this value inside the ceph driver, but
+ * we do want to set it to something, so that generic vfs code has an
+ * appropriate value for tracepoints and the like.
*/
-static inline ino_t ceph_vino_to_ino(struct ceph_vino vino)
+static inline ino_t ceph_vino_to_ino_t(struct ceph_vino vino)
{
-#if BITS_PER_LONG == 32
- return ceph_ino_to_ino32(vino.ino);
-#else
+ if (sizeof(ino_t) == sizeof(u32))
+ return ceph_ino_to_ino32(vino.ino);
return (ino_t)vino.ino;
-#endif
-}
-
-/*
- * user-visible ino (stat, filldir)
- */
-#if BITS_PER_LONG == 32
-static inline ino_t ceph_translate_ino(struct super_block *sb, ino_t ino)
-{
- return ino;
-}
-#else
-static inline ino_t ceph_translate_ino(struct super_block *sb, ino_t ino)
-{
- if (ceph_test_mount_opt(ceph_sb_to_client(sb), INO32))
- ino = ceph_ino_to_ino32(ino);
- return ino;
}
-#endif
-
/* for printf-style formatting */
#define ceph_vinop(i) ceph_inode(i)->i_vino.ino, ceph_inode(i)->i_vino.snap
@@ -486,11 +486,34 @@ static inline u64 ceph_ino(struct inode *inode)
{
return ceph_inode(inode)->i_vino.ino;
}
+
static inline u64 ceph_snap(struct inode *inode)
{
return ceph_inode(inode)->i_vino.snap;
}
+/**
+ * ceph_present_ino - format an inode number for presentation to userland
+ * @sb: superblock where the inode lives
+ * @ino: inode number to (possibly) convert
+ *
+ * If the user mounted with the ino32 option, then the 64-bit value needs
+ * to be converted to something that can fit inside 32 bits. Note that
+ * internal kernel code never uses this value, so this is entirely for
+ * userland consumption.
+ */
+static inline u64 ceph_present_ino(struct super_block *sb, u64 ino)
+{
+ if (unlikely(ceph_test_mount_opt(ceph_sb_to_client(sb), INO32)))
+ return ceph_ino_to_ino32(ino);
+ return ino;
+}
+
+static inline u64 ceph_present_inode(struct inode *inode)
+{
+ return ceph_present_ino(inode->i_sb, ceph_ino(inode));
+}
+
static inline int ceph_ino_compare(struct inode *inode, void *data)
{
struct ceph_vino *pvino = (struct ceph_vino *)data;
@@ -499,11 +522,16 @@ static inline int ceph_ino_compare(struct inode *inode, void *data)
ci->i_vino.snap == pvino->snap;
}
+
static inline struct inode *ceph_find_inode(struct super_block *sb,
struct ceph_vino vino)
{
- ino_t t = ceph_vino_to_ino(vino);
- return ilookup5(sb, t, ceph_ino_compare, &vino);
+ /*
+ * NB: The hashval will be run through the fs/inode.c hash function
+ * anyway, so there is no need to squash the inode number down to
+ * 32-bits first. Just use low-order bits on arches with 32-bit long.
+ */
+ return ilookup5(sb, (unsigned long)vino.ino, ceph_ino_compare, &vino);
}
@@ -511,18 +539,18 @@ static inline struct inode *ceph_find_inode(struct super_block *sb,
* Ceph inode.
*/
#define CEPH_I_DIR_ORDERED (1 << 0) /* dentries in dir are ordered */
-#define CEPH_I_NODELAY (1 << 1) /* do not delay cap release */
#define CEPH_I_FLUSH (1 << 2) /* do not delay flush of dirty metadata */
#define CEPH_I_POOL_PERM (1 << 3) /* pool rd/wr bits are valid */
#define CEPH_I_POOL_RD (1 << 4) /* can read from pool */
#define CEPH_I_POOL_WR (1 << 5) /* can write to pool */
#define CEPH_I_SEC_INITED (1 << 6) /* security initialized */
-#define CEPH_I_CAP_DROPPED (1 << 7) /* caps were forcibly dropped */
-#define CEPH_I_KICK_FLUSH (1 << 8) /* kick flushing caps */
-#define CEPH_I_FLUSH_SNAPS (1 << 9) /* need flush snapss */
-#define CEPH_I_ERROR_WRITE (1 << 10) /* have seen write errors */
-#define CEPH_I_ERROR_FILELOCK (1 << 11) /* have seen file lock errors */
-#define CEPH_I_ODIRECT (1 << 12) /* inode in direct I/O mode */
+#define CEPH_I_KICK_FLUSH (1 << 7) /* kick flushing caps */
+#define CEPH_I_FLUSH_SNAPS (1 << 8) /* need flush snapss */
+#define CEPH_I_ERROR_WRITE (1 << 9) /* have seen write errors */
+#define CEPH_I_ERROR_FILELOCK (1 << 10) /* have seen file lock errors */
+#define CEPH_I_ODIRECT (1 << 11) /* inode in direct I/O mode */
+#define CEPH_ASYNC_CREATE_BIT (12) /* async create in flight for this */
+#define CEPH_I_ASYNC_CREATE (1 << CEPH_ASYNC_CREATE_BIT)
/*
* Masks of ceph inode work.
@@ -638,6 +666,8 @@ static inline bool __ceph_is_any_real_caps(struct ceph_inode_info *ci)
extern int __ceph_caps_issued(struct ceph_inode_info *ci, int *implemented);
extern int __ceph_caps_issued_mask(struct ceph_inode_info *ci, int mask, int t);
+extern int __ceph_caps_issued_mask_metric(struct ceph_inode_info *ci, int mask,
+ int t);
extern int __ceph_caps_issued_other(struct ceph_inode_info *ci,
struct ceph_cap *cap);
@@ -650,12 +680,12 @@ static inline int ceph_caps_issued(struct ceph_inode_info *ci)
return issued;
}
-static inline int ceph_caps_issued_mask(struct ceph_inode_info *ci, int mask,
- int touch)
+static inline int ceph_caps_issued_mask_metric(struct ceph_inode_info *ci,
+ int mask, int touch)
{
int r;
spin_lock(&ci->i_ceph_lock);
- r = __ceph_caps_issued_mask(ci, mask, touch);
+ r = __ceph_caps_issued_mask_metric(ci, mask, touch);
spin_unlock(&ci->i_ceph_lock);
return r;
}
@@ -674,18 +704,12 @@ extern int __ceph_caps_revoking_other(struct ceph_inode_info *ci,
extern int ceph_caps_revoking(struct ceph_inode_info *ci, int mask);
extern int __ceph_caps_used(struct ceph_inode_info *ci);
-extern int __ceph_caps_file_wanted(struct ceph_inode_info *ci);
-
-/*
- * wanted, by virtue of open file modes AND cap refs (buffered/cached data)
- */
-static inline int __ceph_caps_wanted(struct ceph_inode_info *ci)
+static inline bool __ceph_is_file_opened(struct ceph_inode_info *ci)
{
- int w = __ceph_caps_file_wanted(ci) | __ceph_caps_used(ci);
- if (w & CEPH_CAP_FILE_BUFFER)
- w |= CEPH_CAP_FILE_EXCL; /* we want EXCL if dirty data */
- return w;
+ return ci->i_nr_by_mode[0];
}
+extern int __ceph_caps_file_wanted(struct ceph_inode_info *ci);
+extern int __ceph_caps_wanted(struct ceph_inode_info *ci);
/* what the mds thinks we want */
extern int __ceph_caps_mds_wanted(struct ceph_inode_info *ci, bool check);
@@ -899,6 +923,9 @@ static inline bool __ceph_have_pending_cap_snap(struct ceph_inode_info *ci)
}
/* inode.c */
+struct ceph_mds_reply_info_in;
+struct ceph_mds_reply_dirfrag;
+
extern const struct inode_operations ceph_file_iops;
extern struct inode *ceph_alloc_inode(struct super_block *sb);
@@ -914,6 +941,11 @@ extern void ceph_fill_file_time(struct inode *inode, int issued,
u64 time_warp_seq, struct timespec64 *ctime,
struct timespec64 *mtime,
struct timespec64 *atime);
+extern int ceph_fill_inode(struct inode *inode, struct page *locked_page,
+ struct ceph_mds_reply_info_in *iinfo,
+ struct ceph_mds_reply_dirfrag *dirinfo,
+ struct ceph_mds_session *session, int cap_fmode,
+ struct ceph_cap_reservation *caps_reservation);
extern int ceph_fill_trace(struct super_block *sb,
struct ceph_mds_request *req);
extern int ceph_readdir_prepopulate(struct ceph_mds_request *req,
@@ -1042,7 +1074,7 @@ extern struct ceph_cap *ceph_get_cap(struct ceph_mds_client *mdsc,
struct ceph_cap_reservation *ctx);
extern void ceph_add_cap(struct inode *inode,
struct ceph_mds_session *session, u64 cap_id,
- int fmode, unsigned issued, unsigned wanted,
+ unsigned issued, unsigned wanted,
unsigned cap, unsigned seq, u64 realmino, int flags,
struct ceph_cap **new_cap);
extern void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release);
@@ -1058,10 +1090,16 @@ extern void ceph_early_kick_flushing_caps(struct ceph_mds_client *mdsc,
struct ceph_mds_session *session);
extern void ceph_kick_flushing_caps(struct ceph_mds_client *mdsc,
struct ceph_mds_session *session);
+void ceph_kick_flushing_inode_caps(struct ceph_mds_session *session,
+ struct ceph_inode_info *ci);
extern struct ceph_cap *ceph_get_cap_for_mds(struct ceph_inode_info *ci,
int mds);
+extern void ceph_take_cap_refs(struct ceph_inode_info *ci, int caps,
+ bool snap_rwsem_locked);
extern void ceph_get_cap_refs(struct ceph_inode_info *ci, int caps);
extern void ceph_put_cap_refs(struct ceph_inode_info *ci, int had);
+extern void ceph_put_cap_refs_no_check_caps(struct ceph_inode_info *ci,
+ int had);
extern void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr,
struct ceph_snap_context *snapc);
extern void ceph_flush_snaps(struct ceph_inode_info *ci,
@@ -1084,8 +1122,10 @@ extern int ceph_try_get_caps(struct inode *inode,
int need, int want, bool nonblock, int *got);
/* for counting open files by mode */
-extern void __ceph_get_fmode(struct ceph_inode_info *ci, int mode);
-extern void ceph_put_fmode(struct ceph_inode_info *ci, int mode);
+extern void ceph_get_fmode(struct ceph_inode_info *ci, int mode, int count);
+extern void ceph_put_fmode(struct ceph_inode_info *ci, int mode, int count);
+extern void __ceph_touch_fmode(struct ceph_inode_info *ci,
+ struct ceph_mds_client *mdsc, int fmode);
/* addr.c */
extern const struct address_space_operations ceph_aops;
@@ -1097,7 +1137,7 @@ extern void ceph_pool_perm_destroy(struct ceph_mds_client* mdsc);
/* file.c */
extern const struct file_operations ceph_file_fops;
-extern int ceph_renew_caps(struct inode *inode);
+extern int ceph_renew_caps(struct inode *inode, int fmode);
extern int ceph_open(struct inode *inode, struct file *file);
extern int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
struct file *file, unsigned flags, umode_t mode);
@@ -1175,13 +1215,14 @@ extern void ceph_handle_quota(struct ceph_mds_client *mdsc,
struct ceph_mds_session *session,
struct ceph_msg *msg);
extern bool ceph_quota_is_max_files_exceeded(struct inode *inode);
-extern bool ceph_quota_is_same_realm(struct inode *old, struct inode *new);
extern bool ceph_quota_is_max_bytes_exceeded(struct inode *inode,
loff_t newlen);
extern bool ceph_quota_is_max_bytes_approaching(struct inode *inode,
loff_t newlen);
extern bool ceph_quota_update_statfs(struct ceph_fs_client *fsc,
struct kstatfs *buf);
+extern int ceph_quota_check_rename(struct ceph_mds_client *mdsc,
+ struct inode *old, struct inode *new);
extern void ceph_cleanup_quotarealms_inodes(struct ceph_mds_client *mdsc);
#endif /* _FS_CEPH_SUPER_H */
diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c
index 7b8a070a782d..3a733ac33d9b 100644
--- a/fs/ceph/xattr.c
+++ b/fs/ceph/xattr.c
@@ -497,10 +497,10 @@ static int __set_xattr(struct ceph_inode_info *ci,
kfree(*newxattr);
*newxattr = NULL;
if (xattr->should_free_val)
- kfree((void *)xattr->val);
+ kfree(xattr->val);
if (update_xattr) {
- kfree((void *)name);
+ kfree(name);
name = xattr->name;
}
ci->i_xattrs.names_size -= xattr->name_len;
@@ -566,9 +566,9 @@ static void __free_xattr(struct ceph_inode_xattr *xattr)
BUG_ON(!xattr);
if (xattr->should_free_name)
- kfree((void *)xattr->name);
+ kfree(xattr->name);
if (xattr->should_free_val)
- kfree((void *)xattr->val);
+ kfree(xattr->val);
kfree(xattr);
}
@@ -582,9 +582,9 @@ static int __remove_xattr(struct ceph_inode_info *ci,
rb_erase(&xattr->node, &ci->i_xattrs.index);
if (xattr->should_free_name)
- kfree((void *)xattr->name);
+ kfree(xattr->name);
if (xattr->should_free_val)
- kfree((void *)xattr->val);
+ kfree(xattr->val);
ci->i_xattrs.names_size -= xattr->name_len;
ci->i_xattrs.vals_size -= xattr->val_len;
@@ -856,7 +856,7 @@ ssize_t __ceph_getxattr(struct inode *inode, const char *name, void *value,
if (ci->i_xattrs.version == 0 ||
!((req_mask & CEPH_CAP_XATTR_SHARED) ||
- __ceph_caps_issued_mask(ci, CEPH_CAP_XATTR_SHARED, 1))) {
+ __ceph_caps_issued_mask_metric(ci, CEPH_CAP_XATTR_SHARED, 1))) {
spin_unlock(&ci->i_ceph_lock);
/* security module gets xattr while filling trace */
@@ -914,7 +914,7 @@ ssize_t ceph_listxattr(struct dentry *dentry, char *names, size_t size)
ci->i_xattrs.version, ci->i_xattrs.index_version);
if (ci->i_xattrs.version == 0 ||
- !__ceph_caps_issued_mask(ci, CEPH_CAP_XATTR_SHARED, 1)) {
+ !__ceph_caps_issued_mask_metric(ci, CEPH_CAP_XATTR_SHARED, 1)) {
spin_unlock(&ci->i_ceph_lock);
err = ceph_do_getattr(inode, CEPH_STAT_CAP_XATTR, true);
if (err)