summaryrefslogtreecommitdiff
path: root/fs/ceph
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ceph')
-rw-r--r--fs/ceph/addr.c201
-rw-r--r--fs/ceph/caps.c199
-rw-r--r--fs/ceph/dir.c88
-rw-r--r--fs/ceph/file.c23
-rw-r--r--fs/ceph/inode.c118
-rw-r--r--fs/ceph/ioctl.c38
-rw-r--r--fs/ceph/ioctl.h55
-rw-r--r--fs/ceph/mds_client.c54
-rw-r--r--fs/ceph/mds_client.h2
-rw-r--r--fs/ceph/snap.c16
-rw-r--r--fs/ceph/super.c71
-rw-r--r--fs/ceph/super.h71
-rw-r--r--fs/ceph/xattr.c42
13 files changed, 576 insertions, 402 deletions
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index 5a3953db8118..173b1d22e59b 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -87,7 +87,7 @@ static int ceph_set_page_dirty(struct page *page)
snapc = ceph_get_snap_context(ci->i_snap_realm->cached_context);
/* dirty the head */
- spin_lock(&inode->i_lock);
+ spin_lock(&ci->i_ceph_lock);
if (ci->i_head_snapc == NULL)
ci->i_head_snapc = ceph_get_snap_context(snapc);
++ci->i_wrbuffer_ref_head;
@@ -100,7 +100,7 @@ static int ceph_set_page_dirty(struct page *page)
ci->i_wrbuffer_ref-1, ci->i_wrbuffer_ref_head-1,
ci->i_wrbuffer_ref, ci->i_wrbuffer_ref_head,
snapc, snapc->seq, snapc->num_snaps);
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
/* now adjust page */
spin_lock_irq(&mapping->tree_lock);
@@ -228,102 +228,155 @@ static int ceph_readpage(struct file *filp, struct page *page)
}
/*
- * Build a vector of contiguous pages from the provided page list.
+ * Finish an async read(ahead) op.
*/
-static struct page **page_vector_from_list(struct list_head *page_list,
- unsigned *nr_pages)
+static void finish_read(struct ceph_osd_request *req, struct ceph_msg *msg)
{
- struct page **pages;
- struct page *page;
- int next_index, contig_pages = 0;
+ struct inode *inode = req->r_inode;
+ struct ceph_osd_reply_head *replyhead;
+ int rc, bytes;
+ int i;
- /* build page vector */
- pages = kmalloc(sizeof(*pages) * *nr_pages, GFP_NOFS);
- if (!pages)
- return ERR_PTR(-ENOMEM);
+ /* parse reply */
+ replyhead = msg->front.iov_base;
+ WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
+ rc = le32_to_cpu(replyhead->result);
+ bytes = le32_to_cpu(msg->hdr.data_len);
- BUG_ON(list_empty(page_list));
- next_index = list_entry(page_list->prev, struct page, lru)->index;
- list_for_each_entry_reverse(page, page_list, lru) {
- if (page->index == next_index) {
- dout("readpages page %d %p\n", contig_pages, page);
- pages[contig_pages] = page;
- contig_pages++;
- next_index++;
- } else {
- break;
+ dout("finish_read %p req %p rc %d bytes %d\n", inode, req, rc, bytes);
+
+ /* unlock all pages, zeroing any data we didn't read */
+ for (i = 0; i < req->r_num_pages; i++, bytes -= PAGE_CACHE_SIZE) {
+ struct page *page = req->r_pages[i];
+
+ if (bytes < (int)PAGE_CACHE_SIZE) {
+ /* zero (remainder of) page */
+ int s = bytes < 0 ? 0 : bytes;
+ zero_user_segment(page, s, PAGE_CACHE_SIZE);
}
+ dout("finish_read %p uptodate %p idx %lu\n", inode, page,
+ page->index);
+ flush_dcache_page(page);
+ SetPageUptodate(page);
+ unlock_page(page);
+ page_cache_release(page);
}
- *nr_pages = contig_pages;
- return pages;
+ kfree(req->r_pages);
}
/*
- * Read multiple pages. Leave pages we don't read + unlock in page_list;
- * the caller (VM) cleans them up.
+ * start an async read(ahead) operation. return nr_pages we submitted
+ * a read for on success, or negative error code.
*/
-static int ceph_readpages(struct file *file, struct address_space *mapping,
- struct list_head *page_list, unsigned nr_pages)
+static int start_read(struct inode *inode, struct list_head *page_list, int max)
{
- struct inode *inode = file->f_dentry->d_inode;
- struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_osd_client *osdc =
&ceph_inode_to_client(inode)->client->osdc;
- int rc = 0;
- struct page **pages;
- loff_t offset;
+ struct ceph_inode_info *ci = ceph_inode(inode);
+ struct page *page = list_entry(page_list->prev, struct page, lru);
+ struct ceph_osd_request *req;
+ u64 off;
u64 len;
+ int i;
+ struct page **pages;
+ pgoff_t next_index;
+ int nr_pages = 0;
+ int ret;
- dout("readpages %p file %p nr_pages %d\n",
- inode, file, nr_pages);
-
- pages = page_vector_from_list(page_list, &nr_pages);
- if (IS_ERR(pages))
- return PTR_ERR(pages);
+ off = page->index << PAGE_CACHE_SHIFT;
- /* guess read extent */
- offset = pages[0]->index << PAGE_CACHE_SHIFT;
+ /* count pages */
+ next_index = page->index;
+ list_for_each_entry_reverse(page, page_list, lru) {
+ if (page->index != next_index)
+ break;
+ nr_pages++;
+ next_index++;
+ if (max && nr_pages == max)
+ break;
+ }
len = nr_pages << PAGE_CACHE_SHIFT;
- rc = ceph_osdc_readpages(osdc, ceph_vino(inode), &ci->i_layout,
- offset, &len,
- ci->i_truncate_seq, ci->i_truncate_size,
- pages, nr_pages, 0);
- if (rc == -ENOENT)
- rc = 0;
- if (rc < 0)
- goto out;
-
- for (; !list_empty(page_list) && len > 0;
- rc -= PAGE_CACHE_SIZE, len -= PAGE_CACHE_SIZE) {
- struct page *page =
- list_entry(page_list->prev, struct page, lru);
+ dout("start_read %p nr_pages %d is %lld~%lld\n", inode, nr_pages,
+ off, len);
+
+ req = ceph_osdc_new_request(osdc, &ci->i_layout, ceph_vino(inode),
+ off, &len,
+ CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
+ NULL, 0,
+ ci->i_truncate_seq, ci->i_truncate_size,
+ NULL, false, 1, 0);
+ if (!req)
+ return -ENOMEM;
+ /* build page vector */
+ nr_pages = len >> PAGE_CACHE_SHIFT;
+ pages = kmalloc(sizeof(*pages) * nr_pages, GFP_NOFS);
+ ret = -ENOMEM;
+ if (!pages)
+ goto out;
+ for (i = 0; i < nr_pages; ++i) {
+ page = list_entry(page_list->prev, struct page, lru);
+ BUG_ON(PageLocked(page));
list_del(&page->lru);
-
- if (rc < (int)PAGE_CACHE_SIZE) {
- /* zero (remainder of) page */
- int s = rc < 0 ? 0 : rc;
- zero_user_segment(page, s, PAGE_CACHE_SIZE);
- }
-
- if (add_to_page_cache_lru(page, mapping, page->index,
+
+ dout("start_read %p adding %p idx %lu\n", inode, page,
+ page->index);
+ if (add_to_page_cache_lru(page, &inode->i_data, page->index,
GFP_NOFS)) {
page_cache_release(page);
- dout("readpages %p add_to_page_cache failed %p\n",
+ dout("start_read %p add_to_page_cache failed %p\n",
inode, page);
- continue;
+ nr_pages = i;
+ goto out_pages;
}
- dout("readpages %p adding %p idx %lu\n", inode, page,
- page->index);
- flush_dcache_page(page);
- SetPageUptodate(page);
- unlock_page(page);
- page_cache_release(page);
+ pages[i] = page;
}
- rc = 0;
+ req->r_pages = pages;
+ req->r_num_pages = nr_pages;
+ req->r_callback = finish_read;
+ req->r_inode = inode;
+
+ dout("start_read %p starting %p %lld~%lld\n", inode, req, off, len);
+ ret = ceph_osdc_start_request(osdc, req, false);
+ if (ret < 0)
+ goto out_pages;
+ ceph_osdc_put_request(req);
+ return nr_pages;
+out_pages:
+ ceph_release_page_vector(pages, nr_pages);
+out:
+ ceph_osdc_put_request(req);
+ return ret;
+}
+
+
+/*
+ * Read multiple pages. Leave pages we don't read + unlock in page_list;
+ * the caller (VM) cleans them up.
+ */
+static int ceph_readpages(struct file *file, struct address_space *mapping,
+ struct list_head *page_list, unsigned nr_pages)
+{
+ struct inode *inode = file->f_dentry->d_inode;
+ struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
+ int rc = 0;
+ int max = 0;
+
+ if (fsc->mount_options->rsize >= PAGE_CACHE_SIZE)
+ max = (fsc->mount_options->rsize + PAGE_CACHE_SIZE - 1)
+ >> PAGE_SHIFT;
+
+ dout("readpages %p file %p nr_pages %d max %d\n", inode, file, nr_pages,
+ max);
+ while (!list_empty(page_list)) {
+ rc = start_read(inode, page_list, max);
+ if (rc < 0)
+ goto out;
+ BUG_ON(rc == 0);
+ }
out:
- kfree(pages);
+ dout("readpages %p file %p ret %d\n", inode, file, rc);
return rc;
}
@@ -338,7 +391,7 @@ static struct ceph_snap_context *get_oldest_context(struct inode *inode,
struct ceph_snap_context *snapc = NULL;
struct ceph_cap_snap *capsnap = NULL;
- spin_lock(&inode->i_lock);
+ spin_lock(&ci->i_ceph_lock);
list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) {
dout(" cap_snap %p snapc %p has %d dirty pages\n", capsnap,
capsnap->context, capsnap->dirty_pages);
@@ -354,7 +407,7 @@ static struct ceph_snap_context *get_oldest_context(struct inode *inode,
dout(" head snapc %p has %d dirty pages\n",
snapc, ci->i_wrbuffer_ref_head);
}
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
return snapc;
}
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index 8d74ad7ba556..8b53193e4f7c 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -309,7 +309,7 @@ void ceph_reservation_status(struct ceph_fs_client *fsc,
/*
* Find ceph_cap for given mds, if any.
*
- * Called with i_lock held.
+ * Called with i_ceph_lock held.
*/
static struct ceph_cap *__get_cap_for_mds(struct ceph_inode_info *ci, int mds)
{
@@ -332,9 +332,9 @@ struct ceph_cap *ceph_get_cap_for_mds(struct ceph_inode_info *ci, int mds)
{
struct ceph_cap *cap;
- spin_lock(&ci->vfs_inode.i_lock);
+ spin_lock(&ci->i_ceph_lock);
cap = __get_cap_for_mds(ci, mds);
- spin_unlock(&ci->vfs_inode.i_lock);
+ spin_unlock(&ci->i_ceph_lock);
return cap;
}
@@ -361,15 +361,16 @@ static int __ceph_get_cap_mds(struct ceph_inode_info *ci)
int ceph_get_cap_mds(struct inode *inode)
{
+ struct ceph_inode_info *ci = ceph_inode(inode);
int mds;
- spin_lock(&inode->i_lock);
+ spin_lock(&ci->i_ceph_lock);
mds = __ceph_get_cap_mds(ceph_inode(inode));
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
return mds;
}
/*
- * Called under i_lock.
+ * Called under i_ceph_lock.
*/
static void __insert_cap_node(struct ceph_inode_info *ci,
struct ceph_cap *new)
@@ -415,7 +416,7 @@ static void __cap_set_timeouts(struct ceph_mds_client *mdsc,
*
* If I_FLUSH is set, leave the inode at the front of the list.
*
- * Caller holds i_lock
+ * Caller holds i_ceph_lock
* -> we take mdsc->cap_delay_lock
*/
static void __cap_delay_requeue(struct ceph_mds_client *mdsc,
@@ -457,7 +458,7 @@ static void __cap_delay_requeue_front(struct ceph_mds_client *mdsc,
/*
* Cancel delayed work on cap.
*
- * Caller must hold i_lock.
+ * Caller must hold i_ceph_lock.
*/
static void __cap_delay_cancel(struct ceph_mds_client *mdsc,
struct ceph_inode_info *ci)
@@ -487,17 +488,15 @@ static void __check_cap_issue(struct ceph_inode_info *ci, struct ceph_cap *cap,
ci->i_rdcache_gen++;
/*
- * if we are newly issued FILE_SHARED, clear I_COMPLETE; we
+ * if we are newly issued FILE_SHARED, clear D_COMPLETE; we
* don't know what happened to this directory while we didn't
* have the cap.
*/
if ((issued & CEPH_CAP_FILE_SHARED) &&
(had & CEPH_CAP_FILE_SHARED) == 0) {
ci->i_shared_gen++;
- if (S_ISDIR(ci->vfs_inode.i_mode)) {
- dout(" marking %p NOT complete\n", &ci->vfs_inode);
- ci->i_ceph_flags &= ~CEPH_I_COMPLETE;
- }
+ if (S_ISDIR(ci->vfs_inode.i_mode))
+ ceph_dir_clear_complete(&ci->vfs_inode);
}
}
@@ -534,14 +533,14 @@ int ceph_add_cap(struct inode *inode,
wanted |= ceph_caps_for_mode(fmode);
retry:
- spin_lock(&inode->i_lock);
+ spin_lock(&ci->i_ceph_lock);
cap = __get_cap_for_mds(ci, mds);
if (!cap) {
if (new_cap) {
cap = new_cap;
new_cap = NULL;
} else {
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
new_cap = get_cap(mdsc, caps_reservation);
if (new_cap == NULL)
return -ENOMEM;
@@ -627,7 +626,7 @@ retry:
if (fmode >= 0)
__ceph_get_fmode(ci, fmode);
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
wake_up_all(&ci->i_cap_wq);
return 0;
}
@@ -794,7 +793,7 @@ int ceph_caps_revoking(struct ceph_inode_info *ci, int mask)
struct rb_node *p;
int ret = 0;
- spin_lock(&inode->i_lock);
+ spin_lock(&ci->i_ceph_lock);
for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
cap = rb_entry(p, struct ceph_cap, ci_node);
if (__cap_is_valid(cap) &&
@@ -803,7 +802,7 @@ int ceph_caps_revoking(struct ceph_inode_info *ci, int mask)
break;
}
}
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
dout("ceph_caps_revoking %p %s = %d\n", inode,
ceph_cap_string(mask), ret);
return ret;
@@ -857,7 +856,7 @@ int __ceph_caps_mds_wanted(struct ceph_inode_info *ci)
}
/*
- * called under i_lock
+ * called under i_ceph_lock
*/
static int __ceph_is_any_caps(struct ceph_inode_info *ci)
{
@@ -867,7 +866,7 @@ static int __ceph_is_any_caps(struct ceph_inode_info *ci)
/*
* Remove a cap. Take steps to deal with a racing iterate_session_caps.
*
- * caller should hold i_lock.
+ * caller should hold i_ceph_lock.
* caller will not hold session s_mutex if called from destroy_inode.
*/
void __ceph_remove_cap(struct ceph_cap *cap)
@@ -945,7 +944,7 @@ static int send_cap_msg(struct ceph_mds_session *session,
seq, issue_seq, mseq, follows, size, max_size,
xattr_version, xattrs_buf ? (int)xattrs_buf->vec.iov_len : 0);
- msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, sizeof(*fc), GFP_NOFS);
+ msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, sizeof(*fc), GFP_NOFS, false);
if (!msg)
return -ENOMEM;
@@ -1030,7 +1029,7 @@ static void __queue_cap_release(struct ceph_mds_session *session,
/*
* Queue cap releases when an inode is dropped from our cache. Since
- * inode is about to be destroyed, there is no need for i_lock.
+ * inode is about to be destroyed, there is no need for i_ceph_lock.
*/
void ceph_queue_caps_release(struct inode *inode)
{
@@ -1051,7 +1050,7 @@ void ceph_queue_caps_release(struct inode *inode)
/*
* Send a cap msg on the given inode. Update our caps state, then
- * drop i_lock and send the message.
+ * drop i_ceph_lock and send the message.
*
* Make note of max_size reported/requested from mds, revoked caps
* that have now been implemented.
@@ -1063,13 +1062,13 @@ void ceph_queue_caps_release(struct inode *inode)
* Return non-zero if delayed release, or we experienced an error
* such that the caller should requeue + retry later.
*
- * called with i_lock, then drops it.
+ * called with i_ceph_lock, then drops it.
* caller should hold snap_rwsem (read), s_mutex.
*/
static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
int op, int used, int want, int retain, int flushing,
unsigned *pflush_tid)
- __releases(cap->ci->vfs_inode->i_lock)
+ __releases(cap->ci->i_ceph_lock)
{
struct ceph_inode_info *ci = cap->ci;
struct inode *inode = &ci->vfs_inode;
@@ -1172,7 +1171,7 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
xattr_version = ci->i_xattrs.version;
}
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
ret = send_cap_msg(session, ceph_vino(inode).ino, cap_id,
op, keep, want, flushing, seq, flush_tid, issue_seq, mseq,
@@ -1200,13 +1199,13 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
* Unless @again is true, skip cap_snaps that were already sent to
* the MDS (i.e., during this session).
*
- * Called under i_lock. Takes s_mutex as needed.
+ * Called under i_ceph_lock. Takes s_mutex as needed.
*/
void __ceph_flush_snaps(struct ceph_inode_info *ci,
struct ceph_mds_session **psession,
int again)
- __releases(ci->vfs_inode->i_lock)
- __acquires(ci->vfs_inode->i_lock)
+ __releases(ci->i_ceph_lock)
+ __acquires(ci->i_ceph_lock)
{
struct inode *inode = &ci->vfs_inode;
int mds;
@@ -1263,7 +1262,7 @@ retry:
session = NULL;
}
if (!session) {
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
mutex_lock(&mdsc->mutex);
session = __ceph_lookup_mds_session(mdsc, mds);
mutex_unlock(&mdsc->mutex);
@@ -1277,7 +1276,7 @@ retry:
* deletion or migration. retry, and we'll
* get a better @mds value next time.
*/
- spin_lock(&inode->i_lock);
+ spin_lock(&ci->i_ceph_lock);
goto retry;
}
@@ -1287,7 +1286,7 @@ retry:
list_del_init(&capsnap->flushing_item);
list_add_tail(&capsnap->flushing_item,
&session->s_cap_snaps_flushing);
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
dout("flush_snaps %p cap_snap %p follows %lld tid %llu\n",
inode, capsnap, capsnap->follows, capsnap->flush_tid);
@@ -1304,7 +1303,7 @@ retry:
next_follows = capsnap->follows + 1;
ceph_put_cap_snap(capsnap);
- spin_lock(&inode->i_lock);
+ spin_lock(&ci->i_ceph_lock);
goto retry;
}
@@ -1324,11 +1323,9 @@ out:
static void ceph_flush_snaps(struct ceph_inode_info *ci)
{
- struct inode *inode = &ci->vfs_inode;
-
- spin_lock(&inode->i_lock);
+ spin_lock(&ci->i_ceph_lock);
__ceph_flush_snaps(ci, NULL, 0);
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
}
/*
@@ -1375,7 +1372,7 @@ int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask)
* Add dirty inode to the flushing list. Assigned a seq number so we
* can wait for caps to flush without starving.
*
- * Called under i_lock.
+ * Called under i_ceph_lock.
*/
static int __mark_caps_flushing(struct inode *inode,
struct ceph_mds_session *session)
@@ -1423,9 +1420,9 @@ static int try_nonblocking_invalidate(struct inode *inode)
struct ceph_inode_info *ci = ceph_inode(inode);
u32 invalidating_gen = ci->i_rdcache_gen;
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
invalidate_mapping_pages(&inode->i_data, 0, -1);
- spin_lock(&inode->i_lock);
+ spin_lock(&ci->i_ceph_lock);
if (inode->i_data.nrpages == 0 &&
invalidating_gen == ci->i_rdcache_gen) {
@@ -1472,7 +1469,7 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags,
if (mdsc->stopping)
is_delayed = 1;
- spin_lock(&inode->i_lock);
+ spin_lock(&ci->i_ceph_lock);
if (ci->i_ceph_flags & CEPH_I_FLUSH)
flags |= CHECK_CAPS_FLUSH;
@@ -1482,7 +1479,7 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags,
__ceph_flush_snaps(ci, &session, 0);
goto retry_locked;
retry:
- spin_lock(&inode->i_lock);
+ spin_lock(&ci->i_ceph_lock);
retry_locked:
file_wanted = __ceph_caps_file_wanted(ci);
used = __ceph_caps_used(ci);
@@ -1636,7 +1633,7 @@ ack:
if (mutex_trylock(&session->s_mutex) == 0) {
dout("inverting session/ino locks on %p\n",
session);
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
if (took_snap_rwsem) {
up_read(&mdsc->snap_rwsem);
took_snap_rwsem = 0;
@@ -1650,7 +1647,7 @@ ack:
if (down_read_trylock(&mdsc->snap_rwsem) == 0) {
dout("inverting snap/in locks on %p\n",
inode);
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
down_read(&mdsc->snap_rwsem);
took_snap_rwsem = 1;
goto retry;
@@ -1666,10 +1663,10 @@ ack:
mds = cap->mds; /* remember mds, so we don't repeat */
sent++;
- /* __send_cap drops i_lock */
+ /* __send_cap drops i_ceph_lock */
delayed += __send_cap(mdsc, cap, CEPH_CAP_OP_UPDATE, used, want,
retain, flushing, NULL);
- goto retry; /* retake i_lock and restart our cap scan. */
+ goto retry; /* retake i_ceph_lock and restart our cap scan. */
}
/*
@@ -1683,7 +1680,7 @@ ack:
else if (!is_delayed || force_requeue)
__cap_delay_requeue(mdsc, ci);
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
if (queue_invalidate)
ceph_queue_invalidate(inode);
@@ -1706,7 +1703,7 @@ static int try_flush_caps(struct inode *inode, struct ceph_mds_session *session,
int flushing = 0;
retry:
- spin_lock(&inode->i_lock);
+ spin_lock(&ci->i_ceph_lock);
if (ci->i_ceph_flags & CEPH_I_NOFLUSH) {
dout("try_flush_caps skipping %p I_NOFLUSH set\n", inode);
goto out;
@@ -1718,7 +1715,7 @@ retry:
int delayed;
if (!session) {
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
session = cap->session;
mutex_lock(&session->s_mutex);
goto retry;
@@ -1729,18 +1726,18 @@ retry:
flushing = __mark_caps_flushing(inode, session);
- /* __send_cap drops i_lock */
+ /* __send_cap drops i_ceph_lock */
delayed = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH, used, want,
cap->issued | cap->implemented, flushing,
flush_tid);
if (!delayed)
goto out_unlocked;
- spin_lock(&inode->i_lock);
+ spin_lock(&ci->i_ceph_lock);
__cap_delay_requeue(mdsc, ci);
}
out:
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
out_unlocked:
if (session && unlock_session)
mutex_unlock(&session->s_mutex);
@@ -1755,7 +1752,7 @@ static int caps_are_flushed(struct inode *inode, unsigned tid)
struct ceph_inode_info *ci = ceph_inode(inode);
int i, ret = 1;
- spin_lock(&inode->i_lock);
+ spin_lock(&ci->i_ceph_lock);
for (i = 0; i < CEPH_CAP_BITS; i++)
if ((ci->i_flushing_caps & (1 << i)) &&
ci->i_cap_flush_tid[i] <= tid) {
@@ -1763,7 +1760,7 @@ static int caps_are_flushed(struct inode *inode, unsigned tid)
ret = 0;
break;
}
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
return ret;
}
@@ -1870,10 +1867,10 @@ int ceph_write_inode(struct inode *inode, struct writeback_control *wbc)
struct ceph_mds_client *mdsc =
ceph_sb_to_client(inode->i_sb)->mdsc;
- spin_lock(&inode->i_lock);
+ spin_lock(&ci->i_ceph_lock);
if (__ceph_caps_dirty(ci))
__cap_delay_requeue_front(mdsc, ci);
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
}
return err;
}
@@ -1896,7 +1893,7 @@ static void kick_flushing_capsnaps(struct ceph_mds_client *mdsc,
struct inode *inode = &ci->vfs_inode;
struct ceph_cap *cap;
- spin_lock(&inode->i_lock);
+ spin_lock(&ci->i_ceph_lock);
cap = ci->i_auth_cap;
if (cap && cap->session == session) {
dout("kick_flushing_caps %p cap %p capsnap %p\n", inode,
@@ -1906,7 +1903,7 @@ static void kick_flushing_capsnaps(struct ceph_mds_client *mdsc,
pr_err("%p auth cap %p not mds%d ???\n", inode,
cap, session->s_mds);
}
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
}
}
@@ -1923,7 +1920,7 @@ void ceph_kick_flushing_caps(struct ceph_mds_client *mdsc,
struct ceph_cap *cap;
int delayed = 0;
- spin_lock(&inode->i_lock);
+ spin_lock(&ci->i_ceph_lock);
cap = ci->i_auth_cap;
if (cap && cap->session == session) {
dout("kick_flushing_caps %p cap %p %s\n", inode,
@@ -1934,14 +1931,14 @@ void ceph_kick_flushing_caps(struct ceph_mds_client *mdsc,
cap->issued | cap->implemented,
ci->i_flushing_caps, NULL);
if (delayed) {
- spin_lock(&inode->i_lock);
+ spin_lock(&ci->i_ceph_lock);
__cap_delay_requeue(mdsc, ci);
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
}
} else {
pr_err("%p auth cap %p not mds%d ???\n", inode,
cap, session->s_mds);
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
}
}
}
@@ -1954,7 +1951,7 @@ static void kick_flushing_inode_caps(struct ceph_mds_client *mdsc,
struct ceph_cap *cap;
int delayed = 0;
- spin_lock(&inode->i_lock);
+ spin_lock(&ci->i_ceph_lock);
cap = ci->i_auth_cap;
dout("kick_flushing_inode_caps %p flushing %s flush_seq %lld\n", inode,
ceph_cap_string(ci->i_flushing_caps), ci->i_cap_flush_seq);
@@ -1966,12 +1963,12 @@ static void kick_flushing_inode_caps(struct ceph_mds_client *mdsc,
cap->issued | cap->implemented,
ci->i_flushing_caps, NULL);
if (delayed) {
- spin_lock(&inode->i_lock);
+ spin_lock(&ci->i_ceph_lock);
__cap_delay_requeue(mdsc, ci);
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
}
} else {
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
}
}
@@ -1980,7 +1977,7 @@ static void kick_flushing_inode_caps(struct ceph_mds_client *mdsc,
* Take references to capabilities we hold, so that we don't release
* them to the MDS prematurely.
*
- * Protected by i_lock.
+ * Protected by i_ceph_lock.
*/
static void __take_cap_refs(struct ceph_inode_info *ci, int got)
{
@@ -2018,7 +2015,7 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
dout("get_cap_refs %p need %s want %s\n", inode,
ceph_cap_string(need), ceph_cap_string(want));
- spin_lock(&inode->i_lock);
+ spin_lock(&ci->i_ceph_lock);
/* make sure file is actually open */
file_wanted = __ceph_caps_file_wanted(ci);
@@ -2079,7 +2076,7 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
ceph_cap_string(have), ceph_cap_string(need));
}
out:
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
dout("get_cap_refs %p ret %d got %s\n", inode,
ret, ceph_cap_string(*got));
return ret;
@@ -2096,7 +2093,7 @@ static void check_max_size(struct inode *inode, loff_t endoff)
int check = 0;
/* do we need to explicitly request a larger max_size? */
- spin_lock(&inode->i_lock);
+ spin_lock(&ci->i_ceph_lock);
if ((endoff >= ci->i_max_size ||
endoff > (inode->i_size << 1)) &&
endoff > ci->i_wanted_max_size) {
@@ -2105,7 +2102,7 @@ static void check_max_size(struct inode *inode, loff_t endoff)
ci->i_wanted_max_size = endoff;
check = 1;
}
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
if (check)
ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL);
}
@@ -2142,9 +2139,9 @@ retry:
*/
void ceph_get_cap_refs(struct ceph_inode_info *ci, int caps)
{
- spin_lock(&ci->vfs_inode.i_lock);
+ spin_lock(&ci->i_ceph_lock);
__take_cap_refs(ci, caps);
- spin_unlock(&ci->vfs_inode.i_lock);
+ spin_unlock(&ci->i_ceph_lock);
}
/*
@@ -2162,7 +2159,7 @@ void ceph_put_cap_refs(struct ceph_inode_info *ci, int had)
int last = 0, put = 0, flushsnaps = 0, wake = 0;
struct ceph_cap_snap *capsnap;
- spin_lock(&inode->i_lock);
+ spin_lock(&ci->i_ceph_lock);
if (had & CEPH_CAP_PIN)
--ci->i_pin_ref;
if (had & CEPH_CAP_FILE_RD)
@@ -2195,7 +2192,7 @@ void ceph_put_cap_refs(struct ceph_inode_info *ci, int had)
}
}
}
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
dout("put_cap_refs %p had %s%s%s\n", inode, ceph_cap_string(had),
last ? " last" : "", put ? " put" : "");
@@ -2227,7 +2224,7 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr,
int found = 0;
struct ceph_cap_snap *capsnap = NULL;
- spin_lock(&inode->i_lock);
+ spin_lock(&ci->i_ceph_lock);
ci->i_wrbuffer_ref -= nr;
last = !ci->i_wrbuffer_ref;
@@ -2276,7 +2273,7 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr,
}
}
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
if (last) {
ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL);
@@ -2293,7 +2290,7 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr,
* Handle a cap GRANT message from the MDS. (Note that a GRANT may
* actually be a revocation if it specifies a smaller cap set.)
*
- * caller holds s_mutex and i_lock, we drop both.
+ * caller holds s_mutex and i_ceph_lock, we drop both.
*
* return value:
* 0 - ok
@@ -2304,7 +2301,7 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
struct ceph_mds_session *session,
struct ceph_cap *cap,
struct ceph_buffer *xattr_buf)
- __releases(inode->i_lock)
+ __releases(ci->i_ceph_lock)
{
struct ceph_inode_info *ci = ceph_inode(inode);
int mds = session->s_mds;
@@ -2363,7 +2360,7 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
}
if ((issued & CEPH_CAP_LINK_EXCL) == 0)
- inode->i_nlink = le32_to_cpu(grant->nlink);
+ set_nlink(inode, le32_to_cpu(grant->nlink));
if ((issued & CEPH_CAP_XATTR_EXCL) == 0 && grant->xattr_len) {
int len = le32_to_cpu(grant->xattr_len);
@@ -2455,7 +2452,7 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
}
BUG_ON(cap->issued & ~cap->implemented);
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
if (writeback)
/*
* queue inode for writeback: we can't actually call
@@ -2485,7 +2482,7 @@ static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid,
struct ceph_mds_caps *m,
struct ceph_mds_session *session,
struct ceph_cap *cap)
- __releases(inode->i_lock)
+ __releases(ci->i_ceph_lock)
{
struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
@@ -2541,7 +2538,7 @@ static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid,
wake_up_all(&ci->i_cap_wq);
out:
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
if (drop)
iput(inode);
}
@@ -2564,7 +2561,7 @@ static void handle_cap_flushsnap_ack(struct inode *inode, u64 flush_tid,
dout("handle_cap_flushsnap_ack inode %p ci %p mds%d follows %lld\n",
inode, ci, session->s_mds, follows);
- spin_lock(&inode->i_lock);
+ spin_lock(&ci->i_ceph_lock);
list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) {
if (capsnap->follows == follows) {
if (capsnap->flush_tid != flush_tid) {
@@ -2587,7 +2584,7 @@ static void handle_cap_flushsnap_ack(struct inode *inode, u64 flush_tid,
capsnap, capsnap->follows);
}
}
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
if (drop)
iput(inode);
}
@@ -2600,7 +2597,7 @@ static void handle_cap_flushsnap_ack(struct inode *inode, u64 flush_tid,
static void handle_cap_trunc(struct inode *inode,
struct ceph_mds_caps *trunc,
struct ceph_mds_session *session)
- __releases(inode->i_lock)
+ __releases(ci->i_ceph_lock)
{
struct ceph_inode_info *ci = ceph_inode(inode);
int mds = session->s_mds;
@@ -2619,7 +2616,7 @@ static void handle_cap_trunc(struct inode *inode,
inode, mds, seq, truncate_size, truncate_seq);
queue_trunc = ceph_fill_file_size(inode, issued,
truncate_seq, truncate_size, size);
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
if (queue_trunc)
ceph_queue_vmtruncate(inode);
@@ -2648,7 +2645,7 @@ static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex,
dout("handle_cap_export inode %p ci %p mds%d mseq %d\n",
inode, ci, mds, mseq);
- spin_lock(&inode->i_lock);
+ spin_lock(&ci->i_ceph_lock);
/* make sure we haven't seen a higher mseq */
for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
@@ -2692,7 +2689,7 @@ static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex,
}
/* else, we already released it */
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
}
/*
@@ -2747,9 +2744,9 @@ static void handle_cap_import(struct ceph_mds_client *mdsc,
up_read(&mdsc->snap_rwsem);
/* make sure we re-request max_size, if necessary */
- spin_lock(&inode->i_lock);
+ spin_lock(&ci->i_ceph_lock);
ci->i_requested_max_size = 0;
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
}
/*
@@ -2764,6 +2761,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
struct ceph_mds_client *mdsc = session->s_mdsc;
struct super_block *sb = mdsc->fsc->sb;
struct inode *inode;
+ struct ceph_inode_info *ci;
struct ceph_cap *cap;
struct ceph_mds_caps *h;
int mds = session->s_mds;
@@ -2817,6 +2815,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
/* lookup ino */
inode = ceph_find_inode(sb, vino);
+ ci = ceph_inode(inode);
dout(" op %s ino %llx.%llx inode %p\n", ceph_cap_op_name(op), vino.ino,
vino.snap, inode);
if (!inode) {
@@ -2846,16 +2845,16 @@ void ceph_handle_caps(struct ceph_mds_session *session,
}
/* the rest require a cap */
- spin_lock(&inode->i_lock);
+ spin_lock(&ci->i_ceph_lock);
cap = __get_cap_for_mds(ceph_inode(inode), mds);
if (!cap) {
dout(" no cap on %p ino %llx.%llx from mds%d\n",
inode, ceph_ino(inode), ceph_snap(inode), mds);
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
goto flush_cap_releases;
}
- /* note that each of these drops i_lock for us */
+ /* note that each of these drops i_ceph_lock for us */
switch (op) {
case CEPH_CAP_OP_REVOKE:
case CEPH_CAP_OP_GRANT:
@@ -2871,7 +2870,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
break;
default:
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
pr_err("ceph_handle_caps: unknown cap op %d %s\n", op,
ceph_cap_op_name(op));
}
@@ -2964,13 +2963,13 @@ void ceph_put_fmode(struct ceph_inode_info *ci, int fmode)
struct inode *inode = &ci->vfs_inode;
int last = 0;
- spin_lock(&inode->i_lock);
+ spin_lock(&ci->i_ceph_lock);
dout("put_fmode %p fmode %d %d -> %d\n", inode, fmode,
ci->i_nr_by_mode[fmode], ci->i_nr_by_mode[fmode]-1);
BUG_ON(ci->i_nr_by_mode[fmode] == 0);
if (--ci->i_nr_by_mode[fmode] == 0)
last++;
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
if (last && ci->i_vino.snap == CEPH_NOSNAP)
ceph_check_caps(ci, 0, NULL);
@@ -2993,7 +2992,7 @@ int ceph_encode_inode_release(void **p, struct inode *inode,
int used, dirty;
int ret = 0;
- spin_lock(&inode->i_lock);
+ spin_lock(&ci->i_ceph_lock);
used = __ceph_caps_used(ci);
dirty = __ceph_caps_dirty(ci);
@@ -3048,7 +3047,7 @@ int ceph_encode_inode_release(void **p, struct inode *inode,
inode, cap, ceph_cap_string(cap->issued));
}
}
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
return ret;
}
@@ -3063,7 +3062,7 @@ int ceph_encode_dentry_release(void **p, struct dentry *dentry,
/*
* force an record for the directory caps if we have a dentry lease.
- * this is racy (can't take i_lock and d_lock together), but it
+ * this is racy (can't take i_ceph_lock and d_lock together), but it
* doesn't have to be perfect; the mds will revoke anything we don't
* release.
*/
diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
index 382abc9a6a54..98954003a8d3 100644
--- a/fs/ceph/dir.c
+++ b/fs/ceph/dir.c
@@ -108,7 +108,7 @@ static unsigned fpos_off(loff_t p)
* falling back to a "normal" sync readdir if any dentries in the dir
* are dropped.
*
- * I_COMPLETE tells indicates we have all dentries in the dir. It is
+ * D_COMPLETE tells indicates we have all dentries in the dir. It is
* defined IFF we hold CEPH_CAP_FILE_SHARED (which will be revoked by
* the MDS if/when the directory is modified).
*/
@@ -199,8 +199,8 @@ more:
filp->f_pos++;
/* make sure a dentry wasn't dropped while we didn't have parent lock */
- if (!ceph_i_test(dir, CEPH_I_COMPLETE)) {
- dout(" lost I_COMPLETE on %p; falling back to mds\n", dir);
+ if (!ceph_dir_test_complete(dir)) {
+ dout(" lost D_COMPLETE on %p; falling back to mds\n", dir);
err = -EAGAIN;
goto out;
}
@@ -281,18 +281,18 @@ static int ceph_readdir(struct file *filp, void *dirent, filldir_t filldir)
}
/* can we use the dcache? */
- spin_lock(&inode->i_lock);
+ spin_lock(&ci->i_ceph_lock);
if ((filp->f_pos == 2 || fi->dentry) &&
!ceph_test_mount_opt(fsc, NOASYNCREADDIR) &&
ceph_snap(inode) != CEPH_SNAPDIR &&
- (ci->i_ceph_flags & CEPH_I_COMPLETE) &&
+ ceph_dir_test_complete(inode) &&
__ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1)) {
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
err = __dcache_readdir(filp, dirent, filldir);
if (err != -EAGAIN)
return err;
} else {
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
}
if (fi->dentry) {
err = note_last_dentry(fi, fi->dentry->d_name.name,
@@ -351,7 +351,7 @@ more:
if (!req->r_did_prepopulate) {
dout("readdir !did_prepopulate");
- fi->dir_release_count--; /* preclude I_COMPLETE */
+ fi->dir_release_count--; /* preclude D_COMPLETE */
}
/* note next offset and last dentry name */
@@ -428,13 +428,12 @@ more:
* were released during the whole readdir, and we should have
* the complete dir contents in our cache.
*/
- spin_lock(&inode->i_lock);
+ spin_lock(&ci->i_ceph_lock);
if (ci->i_release_count == fi->dir_release_count) {
- dout(" marking %p complete\n", inode);
- /* ci->i_ceph_flags |= CEPH_I_COMPLETE; */
+ ceph_dir_set_complete(inode);
ci->i_max_offset = filp->f_pos;
}
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
dout("readdir %p filp %p done.\n", inode, filp);
return 0;
@@ -608,21 +607,21 @@ static struct dentry *ceph_lookup(struct inode *dir, struct dentry *dentry,
struct ceph_inode_info *ci = ceph_inode(dir);
struct ceph_dentry_info *di = ceph_dentry(dentry);
- spin_lock(&dir->i_lock);
+ spin_lock(&ci->i_ceph_lock);
dout(" dir %p flags are %d\n", dir, ci->i_ceph_flags);
if (strncmp(dentry->d_name.name,
fsc->mount_options->snapdir_name,
dentry->d_name.len) &&
!is_root_ceph_dentry(dir, dentry) &&
- (ci->i_ceph_flags & CEPH_I_COMPLETE) &&
+ ceph_dir_test_complete(dir) &&
(__ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1))) {
- spin_unlock(&dir->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
dout(" dir %p complete, -ENOENT\n", dir);
d_add(dentry, NULL);
di->lease_shared_gen = ci->i_shared_gen;
return NULL;
}
- spin_unlock(&dir->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
}
op = ceph_snap(dir) == CEPH_SNAPDIR ?
@@ -842,12 +841,12 @@ static int drop_caps_for_unlink(struct inode *inode)
struct ceph_inode_info *ci = ceph_inode(inode);
int drop = CEPH_CAP_LINK_SHARED | CEPH_CAP_LINK_EXCL;
- spin_lock(&inode->i_lock);
+ spin_lock(&ci->i_ceph_lock);
if (inode->i_nlink == 1) {
drop |= ~(__ceph_caps_wanted(ci) | CEPH_CAP_PIN);
ci->i_ceph_flags |= CEPH_I_NODELAY;
}
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
return drop;
}
@@ -934,7 +933,7 @@ static int ceph_rename(struct inode *old_dir, struct dentry *old_dentry,
*/
/* d_move screws up d_subdirs order */
- ceph_i_clear(new_dir, CEPH_I_COMPLETE);
+ ceph_dir_clear_complete(new_dir);
d_move(old_dentry, new_dentry);
@@ -1016,10 +1015,10 @@ static int dir_lease_is_valid(struct inode *dir, struct dentry *dentry)
struct ceph_dentry_info *di = ceph_dentry(dentry);
int valid = 0;
- spin_lock(&dir->i_lock);
+ spin_lock(&ci->i_ceph_lock);
if (ci->i_shared_gen == di->lease_shared_gen)
valid = __ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1);
- spin_unlock(&dir->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
dout("dir_lease_is_valid dir %p v%u dentry %p v%u = %d\n",
dir, (unsigned)ci->i_shared_gen, dentry,
(unsigned)di->lease_shared_gen, valid);
@@ -1092,7 +1091,52 @@ static int ceph_snapdir_d_revalidate(struct dentry *dentry,
return 1;
}
+/*
+ * Set/clear/test dir complete flag on the dir's dentry.
+ */
+void ceph_dir_set_complete(struct inode *inode)
+{
+ /* not yet implemented */
+}
+
+void ceph_dir_clear_complete(struct inode *inode)
+{
+ /* not yet implemented */
+}
+
+bool ceph_dir_test_complete(struct inode *inode)
+{
+ /* not yet implemented */
+ return false;
+}
+
+/*
+ * When the VFS prunes a dentry from the cache, we need to clear the
+ * complete flag on the parent directory.
+ *
+ * Called under dentry->d_lock.
+ */
+static void ceph_d_prune(struct dentry *dentry)
+{
+ struct ceph_dentry_info *di;
+
+ dout("ceph_d_prune %p\n", dentry);
+ /* do we have a valid parent? */
+ if (!dentry->d_parent || IS_ROOT(dentry))
+ return;
+
+ /* if we are not hashed, we don't affect D_COMPLETE */
+ if (d_unhashed(dentry))
+ return;
+
+ /*
+ * we hold d_lock, so d_parent is stable, and d_fsdata is never
+ * cleared until d_release
+ */
+ di = ceph_dentry(dentry->d_parent);
+ clear_bit(CEPH_D_COMPLETE, &di->flags);
+}
/*
* read() on a dir. This weird interface hack only works if mounted
@@ -1306,6 +1350,7 @@ const struct inode_operations ceph_dir_iops = {
const struct dentry_operations ceph_dentry_ops = {
.d_revalidate = ceph_d_revalidate,
.d_release = ceph_d_release,
+ .d_prune = ceph_d_prune,
};
const struct dentry_operations ceph_snapdir_dentry_ops = {
@@ -1315,4 +1360,5 @@ const struct dentry_operations ceph_snapdir_dentry_ops = {
const struct dentry_operations ceph_snap_dentry_ops = {
.d_release = ceph_d_release,
+ .d_prune = ceph_d_prune,
};
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index ce549d31eeb7..ed72428d9c75 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -147,9 +147,9 @@ int ceph_open(struct inode *inode, struct file *file)
/* trivially open snapdir */
if (ceph_snap(inode) == CEPH_SNAPDIR) {
- spin_lock(&inode->i_lock);
+ spin_lock(&ci->i_ceph_lock);
__ceph_get_fmode(ci, fmode);
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
return ceph_init_file(inode, file, fmode);
}
@@ -158,7 +158,7 @@ int ceph_open(struct inode *inode, struct file *file)
* write) or any MDS (for read). Update wanted set
* asynchronously.
*/
- spin_lock(&inode->i_lock);
+ spin_lock(&ci->i_ceph_lock);
if (__ceph_is_any_real_caps(ci) &&
(((fmode & CEPH_FILE_MODE_WR) == 0) || ci->i_auth_cap)) {
int mds_wanted = __ceph_caps_mds_wanted(ci);
@@ -168,7 +168,7 @@ int ceph_open(struct inode *inode, struct file *file)
inode, fmode, ceph_cap_string(wanted),
ceph_cap_string(issued));
__ceph_get_fmode(ci, fmode);
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
/* adjust wanted? */
if ((issued & wanted) != wanted &&
@@ -180,10 +180,10 @@ int ceph_open(struct inode *inode, struct file *file)
} else if (ceph_snap(inode) != CEPH_NOSNAP &&
(ci->i_snap_caps & wanted) == wanted) {
__ceph_get_fmode(ci, fmode);
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
return ceph_init_file(inode, file, fmode);
}
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
dout("open fmode %d wants %s\n", fmode, ceph_cap_string(wanted));
req = prepare_open_request(inode->i_sb, flags, 0);
@@ -743,9 +743,9 @@ retry_snap:
*/
int dirty;
- spin_lock(&inode->i_lock);
+ spin_lock(&ci->i_ceph_lock);
dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR);
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
ceph_put_cap_refs(ci, got);
ret = generic_file_aio_write(iocb, iov, nr_segs, pos);
@@ -764,9 +764,9 @@ retry_snap:
if (ret >= 0) {
int dirty;
- spin_lock(&inode->i_lock);
+ spin_lock(&ci->i_ceph_lock);
dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR);
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
if (dirty)
__mark_inode_dirty(inode, dirty);
}
@@ -797,7 +797,8 @@ static loff_t ceph_llseek(struct file *file, loff_t offset, int origin)
mutex_lock(&inode->i_mutex);
__ceph_do_pending_vmtruncate(inode);
- if (origin != SEEK_CUR || origin != SEEK_SET) {
+
+ if (origin == SEEK_END || origin == SEEK_DATA || origin == SEEK_HOLE) {
ret = ceph_do_getattr(inode, CEPH_STAT_CAP_SIZE);
if (ret < 0) {
offset = ret;
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index 095799ba9dd1..87fb132fb330 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -9,7 +9,6 @@
#include <linux/namei.h>
#include <linux/writeback.h>
#include <linux/vmalloc.h>
-#include <linux/pagevec.h>
#include "super.h"
#include "mds_client.h"
@@ -298,6 +297,8 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
dout("alloc_inode %p\n", &ci->vfs_inode);
+ spin_lock_init(&ci->i_ceph_lock);
+
ci->i_version = 0;
ci->i_time_warp_seq = 0;
ci->i_ceph_flags = 0;
@@ -584,7 +585,7 @@ static int fill_inode(struct inode *inode,
iinfo->xattr_len);
}
- spin_lock(&inode->i_lock);
+ spin_lock(&ci->i_ceph_lock);
/*
* provided version will be odd if inode value is projected,
@@ -619,7 +620,7 @@ static int fill_inode(struct inode *inode,
}
if ((issued & CEPH_CAP_LINK_EXCL) == 0)
- inode->i_nlink = le32_to_cpu(info->nlink);
+ set_nlink(inode, le32_to_cpu(info->nlink));
/* be careful with mtime, atime, size */
ceph_decode_timespec(&atime, &info->atime);
@@ -681,7 +682,7 @@ static int fill_inode(struct inode *inode,
char *sym;
BUG_ON(symlen != inode->i_size);
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
err = -ENOMEM;
sym = kmalloc(symlen+1, GFP_NOFS);
@@ -690,7 +691,7 @@ static int fill_inode(struct inode *inode,
memcpy(sym, iinfo->symlink, symlen);
sym[symlen] = 0;
- spin_lock(&inode->i_lock);
+ spin_lock(&ci->i_ceph_lock);
if (!ci->i_symlink)
ci->i_symlink = sym;
else
@@ -716,7 +717,7 @@ static int fill_inode(struct inode *inode,
}
no_change:
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
/* queue truncate if we saw i_size decrease */
if (queue_trunc)
@@ -751,13 +752,13 @@ no_change:
info->cap.flags,
caps_reservation);
} else {
- spin_lock(&inode->i_lock);
+ spin_lock(&ci->i_ceph_lock);
dout(" %p got snap_caps %s\n", inode,
ceph_cap_string(le32_to_cpu(info->cap.caps)));
ci->i_snap_caps |= le32_to_cpu(info->cap.caps);
if (cap_fmode >= 0)
__ceph_get_fmode(ci, cap_fmode);
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
}
} else if (cap_fmode >= 0) {
pr_warning("mds issued no caps on %llx.%llx\n",
@@ -772,9 +773,9 @@ no_change:
ceph_snap(inode) == CEPH_NOSNAP &&
(le32_to_cpu(info->cap.caps) & CEPH_CAP_FILE_SHARED) &&
(issued & CEPH_CAP_FILE_EXCL) == 0 &&
- (ci->i_ceph_flags & CEPH_I_COMPLETE) == 0) {
+ !ceph_dir_test_complete(inode)) {
dout(" marking %p complete (empty)\n", inode);
- /* ci->i_ceph_flags |= CEPH_I_COMPLETE; */
+ ceph_dir_set_complete(inode);
ci->i_max_offset = 2;
}
@@ -850,19 +851,20 @@ static void ceph_set_dentry_offset(struct dentry *dn)
{
struct dentry *dir = dn->d_parent;
struct inode *inode = dir->d_inode;
+ struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_dentry_info *di;
BUG_ON(!inode);
di = ceph_dentry(dn);
- spin_lock(&inode->i_lock);
- if ((ceph_inode(inode)->i_ceph_flags & CEPH_I_COMPLETE) == 0) {
- spin_unlock(&inode->i_lock);
+ spin_lock(&ci->i_ceph_lock);
+ if (!ceph_dir_test_complete(inode)) {
+ spin_unlock(&ci->i_ceph_lock);
return;
}
di->offset = ceph_inode(inode)->i_max_offset++;
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
spin_lock(&dir->d_lock);
spin_lock_nested(&dn->d_lock, DENTRY_D_LOCK_NESTED);
@@ -1057,7 +1059,7 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
* d_move() puts the renamed dentry at the end of
* d_subdirs. We need to assign it an appropriate
* directory offset so we can behave when holding
- * I_COMPLETE.
+ * D_COMPLETE.
*/
ceph_set_dentry_offset(req->r_old_dentry);
dout("dn %p gets new offset %lld\n", req->r_old_dentry,
@@ -1309,7 +1311,7 @@ int ceph_inode_set_size(struct inode *inode, loff_t size)
struct ceph_inode_info *ci = ceph_inode(inode);
int ret = 0;
- spin_lock(&inode->i_lock);
+ spin_lock(&ci->i_ceph_lock);
dout("set_size %p %llu -> %llu\n", inode, inode->i_size, size);
inode->i_size = size;
inode->i_blocks = (size + (1 << 9) - 1) >> 9;
@@ -1319,7 +1321,7 @@ int ceph_inode_set_size(struct inode *inode, loff_t size)
(ci->i_reported_size << 1) < ci->i_max_size)
ret = 1;
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
return ret;
}
@@ -1329,12 +1331,13 @@ int ceph_inode_set_size(struct inode *inode, loff_t size)
*/
void ceph_queue_writeback(struct inode *inode)
{
+ ihold(inode);
if (queue_work(ceph_inode_to_client(inode)->wb_wq,
&ceph_inode(inode)->i_wb_work)) {
dout("ceph_queue_writeback %p\n", inode);
- ihold(inode);
} else {
dout("ceph_queue_writeback %p failed\n", inode);
+ iput(inode);
}
}
@@ -1354,55 +1357,13 @@ static void ceph_writeback_work(struct work_struct *work)
*/
void ceph_queue_invalidate(struct inode *inode)
{
+ ihold(inode);
if (queue_work(ceph_inode_to_client(inode)->pg_inv_wq,
&ceph_inode(inode)->i_pg_inv_work)) {
dout("ceph_queue_invalidate %p\n", inode);
- ihold(inode);
} else {
dout("ceph_queue_invalidate %p failed\n", inode);
- }
-}
-
-/*
- * invalidate any pages that are not dirty or under writeback. this
- * includes pages that are clean and mapped.
- */
-static void ceph_invalidate_nondirty_pages(struct address_space *mapping)
-{
- struct pagevec pvec;
- pgoff_t next = 0;
- int i;
-
- pagevec_init(&pvec, 0);
- while (pagevec_lookup(&pvec, mapping, next, PAGEVEC_SIZE)) {
- for (i = 0; i < pagevec_count(&pvec); i++) {
- struct page *page = pvec.pages[i];
- pgoff_t index;
- int skip_page =
- (PageDirty(page) || PageWriteback(page));
-
- if (!skip_page)
- skip_page = !trylock_page(page);
-
- /*
- * We really shouldn't be looking at the ->index of an
- * unlocked page. But we're not allowed to lock these
- * pages. So we rely upon nobody altering the ->index
- * of this (pinned-by-us) page.
- */
- index = page->index;
- if (index > next)
- next = index;
- next++;
-
- if (skip_page)
- continue;
-
- generic_error_remove_page(mapping, page);
- unlock_page(page);
- }
- pagevec_release(&pvec);
- cond_resched();
+ iput(inode);
}
}
@@ -1418,20 +1379,20 @@ static void ceph_invalidate_work(struct work_struct *work)
u32 orig_gen;
int check = 0;
- spin_lock(&inode->i_lock);
+ spin_lock(&ci->i_ceph_lock);
dout("invalidate_pages %p gen %d revoking %d\n", inode,
ci->i_rdcache_gen, ci->i_rdcache_revoking);
if (ci->i_rdcache_revoking != ci->i_rdcache_gen) {
/* nevermind! */
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
goto out;
}
orig_gen = ci->i_rdcache_gen;
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
- ceph_invalidate_nondirty_pages(inode->i_mapping);
+ truncate_inode_pages(&inode->i_data, 0);
- spin_lock(&inode->i_lock);
+ spin_lock(&ci->i_ceph_lock);
if (orig_gen == ci->i_rdcache_gen &&
orig_gen == ci->i_rdcache_revoking) {
dout("invalidate_pages %p gen %d successful\n", inode,
@@ -1443,7 +1404,7 @@ static void ceph_invalidate_work(struct work_struct *work)
inode, orig_gen, ci->i_rdcache_gen,
ci->i_rdcache_revoking);
}
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
if (check)
ceph_check_caps(ci, 0, NULL);
@@ -1478,13 +1439,14 @@ void ceph_queue_vmtruncate(struct inode *inode)
{
struct ceph_inode_info *ci = ceph_inode(inode);
+ ihold(inode);
if (queue_work(ceph_sb_to_client(inode->i_sb)->trunc_wq,
&ci->i_vmtruncate_work)) {
dout("ceph_queue_vmtruncate %p\n", inode);
- ihold(inode);
} else {
dout("ceph_queue_vmtruncate %p failed, pending=%d\n",
inode, ci->i_truncate_pending);
+ iput(inode);
}
}
@@ -1501,10 +1463,10 @@ void __ceph_do_pending_vmtruncate(struct inode *inode)
int wrbuffer_refs, wake = 0;
retry:
- spin_lock(&inode->i_lock);
+ spin_lock(&ci->i_ceph_lock);
if (ci->i_truncate_pending == 0) {
dout("__do_pending_vmtruncate %p none pending\n", inode);
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
return;
}
@@ -1515,7 +1477,7 @@ retry:
if (ci->i_wrbuffer_ref_head < ci->i_wrbuffer_ref) {
dout("__do_pending_vmtruncate %p flushing snaps first\n",
inode);
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
filemap_write_and_wait_range(&inode->i_data, 0,
inode->i_sb->s_maxbytes);
goto retry;
@@ -1525,15 +1487,15 @@ retry:
wrbuffer_refs = ci->i_wrbuffer_ref;
dout("__do_pending_vmtruncate %p (%d) to %lld\n", inode,
ci->i_truncate_pending, to);
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
truncate_inode_pages(inode->i_mapping, to);
- spin_lock(&inode->i_lock);
+ spin_lock(&ci->i_ceph_lock);
ci->i_truncate_pending--;
if (ci->i_truncate_pending == 0)
wake = 1;
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
if (wrbuffer_refs == 0)
ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL);
@@ -1588,7 +1550,7 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr)
if (IS_ERR(req))
return PTR_ERR(req);
- spin_lock(&inode->i_lock);
+ spin_lock(&ci->i_ceph_lock);
issued = __ceph_caps_issued(ci, NULL);
dout("setattr %p issued %s\n", inode, ceph_cap_string(issued));
@@ -1736,7 +1698,7 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr)
}
release &= issued;
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
if (inode_dirty_flags)
__mark_inode_dirty(inode, inode_dirty_flags);
@@ -1758,7 +1720,7 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr)
__ceph_do_pending_vmtruncate(inode);
return err;
out:
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
ceph_mdsc_put_request(req);
return err;
}
diff --git a/fs/ceph/ioctl.c b/fs/ceph/ioctl.c
index 3b256b50f7d8..790914a598dd 100644
--- a/fs/ceph/ioctl.c
+++ b/fs/ceph/ioctl.c
@@ -42,17 +42,39 @@ static long ceph_ioctl_set_layout(struct file *file, void __user *arg)
struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
struct ceph_mds_request *req;
struct ceph_ioctl_layout l;
+ struct ceph_inode_info *ci = ceph_inode(file->f_dentry->d_inode);
+ struct ceph_ioctl_layout nl;
int err, i;
- /* copy and validate */
if (copy_from_user(&l, arg, sizeof(l)))
return -EFAULT;
- if ((l.object_size & ~PAGE_MASK) ||
- (l.stripe_unit & ~PAGE_MASK) ||
- !l.stripe_unit ||
- (l.object_size &&
- (unsigned)l.object_size % (unsigned)l.stripe_unit))
+ /* validate changed params against current layout */
+ err = ceph_do_getattr(file->f_dentry->d_inode, CEPH_STAT_CAP_LAYOUT);
+ if (!err) {
+ nl.stripe_unit = ceph_file_layout_su(ci->i_layout);
+ nl.stripe_count = ceph_file_layout_stripe_count(ci->i_layout);
+ nl.object_size = ceph_file_layout_object_size(ci->i_layout);
+ nl.data_pool = le32_to_cpu(ci->i_layout.fl_pg_pool);
+ nl.preferred_osd =
+ (s32)le32_to_cpu(ci->i_layout.fl_pg_preferred);
+ } else
+ return err;
+
+ if (l.stripe_count)
+ nl.stripe_count = l.stripe_count;
+ if (l.stripe_unit)
+ nl.stripe_unit = l.stripe_unit;
+ if (l.object_size)
+ nl.object_size = l.object_size;
+ if (l.data_pool)
+ nl.data_pool = l.data_pool;
+ if (l.preferred_osd)
+ nl.preferred_osd = l.preferred_osd;
+
+ if ((nl.object_size & ~PAGE_MASK) ||
+ (nl.stripe_unit & ~PAGE_MASK) ||
+ ((unsigned)nl.object_size % (unsigned)nl.stripe_unit))
return -EINVAL;
/* make sure it's a valid data pool */
@@ -219,11 +241,11 @@ static long ceph_ioctl_lazyio(struct file *file)
struct ceph_inode_info *ci = ceph_inode(inode);
if ((fi->fmode & CEPH_FILE_MODE_LAZY) == 0) {
- spin_lock(&inode->i_lock);
+ spin_lock(&ci->i_ceph_lock);
ci->i_nr_by_mode[fi->fmode]--;
fi->fmode |= CEPH_FILE_MODE_LAZY;
ci->i_nr_by_mode[fi->fmode]++;
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
dout("ioctl_layzio: file %p marked lazy\n", file);
ceph_check_caps(ci, 0, NULL);
diff --git a/fs/ceph/ioctl.h b/fs/ceph/ioctl.h
index 0c5167e43180..be4a60487333 100644
--- a/fs/ceph/ioctl.h
+++ b/fs/ceph/ioctl.h
@@ -6,7 +6,31 @@
#define CEPH_IOCTL_MAGIC 0x97
-/* just use u64 to align sanely on all archs */
+/*
+ * CEPH_IOC_GET_LAYOUT - get file layout or dir layout policy
+ * CEPH_IOC_SET_LAYOUT - set file layout
+ * CEPH_IOC_SET_LAYOUT_POLICY - set dir layout policy
+ *
+ * The file layout specifies how file data is striped over objects in
+ * the distributed object store, which object pool they belong to (if
+ * it differs from the default), and an optional 'preferred osd' to
+ * store them on.
+ *
+ * Files get a new layout based on the policy set on the containing
+ * directory or one of its ancestors. The GET_LAYOUT ioctl will let
+ * you examine the layout for a file or the policy on a directory.
+ *
+ * SET_LAYOUT will let you set a layout on a newly created file. This
+ * only works immediately after the file is created and before any
+ * data is written to it.
+ *
+ * SET_LAYOUT_POLICY will let you set a layout policy (default layout)
+ * on a directory that will apply to any new files created in that
+ * directory (or any child directory that doesn't specify a layout of
+ * its own).
+ */
+
+/* use u64 to align sanely on all archs */
struct ceph_ioctl_layout {
__u64 stripe_unit, stripe_count, object_size;
__u64 data_pool;
@@ -21,6 +45,8 @@ struct ceph_ioctl_layout {
struct ceph_ioctl_layout)
/*
+ * CEPH_IOC_GET_DATALOC - get location of file data in the cluster
+ *
* Extract identity, address of the OSD and object storing a given
* file offset.
*/
@@ -39,7 +65,34 @@ struct ceph_ioctl_dataloc {
#define CEPH_IOC_GET_DATALOC _IOWR(CEPH_IOCTL_MAGIC, 3, \
struct ceph_ioctl_dataloc)
+/*
+ * CEPH_IOC_LAZYIO - relax consistency
+ *
+ * Normally Ceph switches to synchronous IO when multiple clients have
+ * the file open (and or more for write). Reads and writes bypass the
+ * page cache and go directly to the OSD. Setting this flag on a file
+ * descriptor will allow buffered IO for this file in cases where the
+ * application knows it won't interfere with other nodes (or doesn't
+ * care).
+ */
#define CEPH_IOC_LAZYIO _IO(CEPH_IOCTL_MAGIC, 4)
+
+/*
+ * CEPH_IOC_SYNCIO - force synchronous IO
+ *
+ * This ioctl sets a file flag that forces the synchronous IO that
+ * bypasses the page cache, even if it is not necessary. This is
+ * essentially the opposite behavior of IOC_LAZYIO. This forces the
+ * same read/write path as a file opened by multiple clients when one
+ * or more of those clients is opened for write.
+ *
+ * Note that this type of sync IO takes a different path than a file
+ * opened with O_SYNC/D_SYNC (writes hit the page cache and are
+ * immediately flushed on page boundaries). It is very similar to
+ * O_DIRECT (writes bypass the page cache) excep that O_DIRECT writes
+ * are not copied (user page must remain stable) and O_DIRECT writes
+ * have alignment restrictions (on the buffer and file offset).
+ */
#define CEPH_IOC_SYNCIO _IO(CEPH_IOCTL_MAGIC, 5)
#endif
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 86c59e16ba74..6203d805eb45 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -619,7 +619,7 @@ static void __unregister_request(struct ceph_mds_client *mdsc,
*
* Called under mdsc->mutex.
*/
-struct dentry *get_nonsnap_parent(struct dentry *dentry)
+static struct dentry *get_nonsnap_parent(struct dentry *dentry)
{
/*
* we don't need to worry about protecting the d_parent access
@@ -732,21 +732,21 @@ static int __choose_mds(struct ceph_mds_client *mdsc,
}
}
- spin_lock(&inode->i_lock);
+ spin_lock(&ci->i_ceph_lock);
cap = NULL;
if (mode == USE_AUTH_MDS)
cap = ci->i_auth_cap;
if (!cap && !RB_EMPTY_ROOT(&ci->i_caps))
cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node);
if (!cap) {
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
goto random;
}
mds = cap->session->s_mds;
dout("choose_mds %p %llx.%llx mds%d (%scap %p)\n",
inode, ceph_vinop(inode), mds,
cap == ci->i_auth_cap ? "auth " : "", cap);
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
return mds;
random:
@@ -764,7 +764,8 @@ static struct ceph_msg *create_session_msg(u32 op, u64 seq)
struct ceph_msg *msg;
struct ceph_mds_session_head *h;
- msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS);
+ msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS,
+ false);
if (!msg) {
pr_err("create_session_msg ENOMEM creating msg\n");
return NULL;
@@ -950,7 +951,7 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
dout("removing cap %p, ci is %p, inode is %p\n",
cap, ci, &ci->vfs_inode);
- spin_lock(&inode->i_lock);
+ spin_lock(&ci->i_ceph_lock);
__ceph_remove_cap(cap);
if (!__ceph_is_any_real_caps(ci)) {
struct ceph_mds_client *mdsc =
@@ -983,7 +984,7 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
}
spin_unlock(&mdsc->cap_dirty_lock);
}
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
while (drop--)
iput(inode);
return 0;
@@ -1014,10 +1015,10 @@ static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap,
wake_up_all(&ci->i_cap_wq);
if (arg) {
- spin_lock(&inode->i_lock);
+ spin_lock(&ci->i_ceph_lock);
ci->i_wanted_max_size = 0;
ci->i_requested_max_size = 0;
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
}
return 0;
}
@@ -1150,7 +1151,7 @@ static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
if (session->s_trim_caps <= 0)
return -1;
- spin_lock(&inode->i_lock);
+ spin_lock(&ci->i_ceph_lock);
mine = cap->issued | cap->implemented;
used = __ceph_caps_used(ci);
oissued = __ceph_caps_issued_other(ci, cap);
@@ -1169,7 +1170,7 @@ static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
__ceph_remove_cap(cap);
} else {
/* try to drop referring dentries */
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
d_prune_aliases(inode);
dout("trim_caps_cb %p cap %p pruned, count now %d\n",
inode, cap, atomic_read(&inode->i_count));
@@ -1177,7 +1178,7 @@ static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
}
out:
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
return 0;
}
@@ -1240,7 +1241,7 @@ int ceph_add_cap_releases(struct ceph_mds_client *mdsc,
while (session->s_num_cap_releases < session->s_nr_caps + extra) {
spin_unlock(&session->s_cap_lock);
msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, PAGE_CACHE_SIZE,
- GFP_NOFS);
+ GFP_NOFS, false);
if (!msg)
goto out_unlocked;
dout("add_cap_releases %p msg %p now %d\n", session, msg,
@@ -1295,7 +1296,7 @@ static int check_cap_flush(struct ceph_mds_client *mdsc, u64 want_flush_seq)
i_flushing_item);
struct inode *inode = &ci->vfs_inode;
- spin_lock(&inode->i_lock);
+ spin_lock(&ci->i_ceph_lock);
if (ci->i_cap_flush_seq <= want_flush_seq) {
dout("check_cap_flush still flushing %p "
"seq %lld <= %lld to mds%d\n", inode,
@@ -1303,7 +1304,7 @@ static int check_cap_flush(struct ceph_mds_client *mdsc, u64 want_flush_seq)
session->s_mds);
ret = 0;
}
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
}
mutex_unlock(&session->s_mutex);
ceph_put_mds_session(session);
@@ -1494,6 +1495,7 @@ retry:
pos, temp);
} else if (stop_on_nosnap && inode &&
ceph_snap(inode) == CEPH_NOSNAP) {
+ spin_unlock(&temp->d_lock);
break;
} else {
pos -= temp->d_name.len;
@@ -1652,7 +1654,7 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
if (req->r_old_dentry_drop)
len += req->r_old_dentry->d_name.len;
- msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, len, GFP_NOFS);
+ msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, len, GFP_NOFS, false);
if (!msg) {
msg = ERR_PTR(-ENOMEM);
goto out_free2;
@@ -2001,7 +2003,7 @@ out:
}
/*
- * Invalidate dir I_COMPLETE, dentry lease state on an aborted MDS
+ * Invalidate dir D_COMPLETE, dentry lease state on an aborted MDS
* namespace request.
*/
void ceph_invalidate_dir_request(struct ceph_mds_request *req)
@@ -2009,11 +2011,11 @@ void ceph_invalidate_dir_request(struct ceph_mds_request *req)
struct inode *inode = req->r_locked_dir;
struct ceph_inode_info *ci = ceph_inode(inode);
- dout("invalidate_dir_request %p (I_COMPLETE, lease(s))\n", inode);
- spin_lock(&inode->i_lock);
- ci->i_ceph_flags &= ~CEPH_I_COMPLETE;
+ dout("invalidate_dir_request %p (D_COMPLETE, lease(s))\n", inode);
+ spin_lock(&ci->i_ceph_lock);
+ ceph_dir_clear_complete(inode);
ci->i_release_count++;
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
if (req->r_dentry)
ceph_invalidate_dentry_lease(req->r_dentry);
@@ -2421,7 +2423,7 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
if (err)
goto out_free;
- spin_lock(&inode->i_lock);
+ spin_lock(&ci->i_ceph_lock);
cap->seq = 0; /* reset cap seq */
cap->issue_seq = 0; /* and issue_seq */
@@ -2444,7 +2446,7 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
rec.v1.pathbase = cpu_to_le64(pathbase);
reclen = sizeof(rec.v1);
}
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
if (recon_state->flock) {
int num_fcntl_locks, num_flock_locks;
@@ -2518,7 +2520,7 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
goto fail_nopagelist;
ceph_pagelist_init(pagelist);
- reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, 0, GFP_NOFS);
+ reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, 0, GFP_NOFS, false);
if (!reply)
goto fail_nomsg;
@@ -2831,7 +2833,7 @@ void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
dnamelen = dentry->d_name.len;
len += dnamelen;
- msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS);
+ msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false);
if (!msg)
return;
lease = msg->front.iov_base;
@@ -3153,7 +3155,7 @@ void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
/*
* true if all sessions are closed, or we force unmount
*/
-bool done_closing_sessions(struct ceph_mds_client *mdsc)
+static bool done_closing_sessions(struct ceph_mds_client *mdsc)
{
int i, n = 0;
diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h
index 4bb239921dbd..a50ca0e39475 100644
--- a/fs/ceph/mds_client.h
+++ b/fs/ceph/mds_client.h
@@ -20,7 +20,7 @@
*
* mdsc->snap_rwsem
*
- * inode->i_lock
+ * ci->i_ceph_lock
* mdsc->snap_flush_lock
* mdsc->cap_delay_lock
*
diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c
index e26437191333..a559c80f127a 100644
--- a/fs/ceph/snap.c
+++ b/fs/ceph/snap.c
@@ -446,7 +446,7 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci)
return;
}
- spin_lock(&inode->i_lock);
+ spin_lock(&ci->i_ceph_lock);
used = __ceph_caps_used(ci);
dirty = __ceph_caps_dirty(ci);
@@ -528,7 +528,7 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci)
kfree(capsnap);
}
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
}
/*
@@ -537,7 +537,7 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci)
*
* If capsnap can now be flushed, add to snap_flush list, and return 1.
*
- * Caller must hold i_lock.
+ * Caller must hold i_ceph_lock.
*/
int __ceph_finish_cap_snap(struct ceph_inode_info *ci,
struct ceph_cap_snap *capsnap)
@@ -739,9 +739,9 @@ static void flush_snaps(struct ceph_mds_client *mdsc)
inode = &ci->vfs_inode;
ihold(inode);
spin_unlock(&mdsc->snap_flush_lock);
- spin_lock(&inode->i_lock);
+ spin_lock(&ci->i_ceph_lock);
__ceph_flush_snaps(ci, &session, 0);
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
iput(inode);
spin_lock(&mdsc->snap_flush_lock);
}
@@ -847,7 +847,7 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc,
continue;
ci = ceph_inode(inode);
- spin_lock(&inode->i_lock);
+ spin_lock(&ci->i_ceph_lock);
if (!ci->i_snap_realm)
goto skip_inode;
/*
@@ -876,7 +876,7 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc,
oldrealm = ci->i_snap_realm;
ci->i_snap_realm = realm;
spin_unlock(&realm->inodes_with_caps_lock);
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
ceph_get_snap_realm(mdsc, realm);
ceph_put_snap_realm(mdsc, oldrealm);
@@ -885,7 +885,7 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc,
continue;
skip_inode:
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
iput(inode);
}
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index 88bacaf385d9..b48f15f101a0 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -114,6 +114,7 @@ static int ceph_sync_fs(struct super_block *sb, int wait)
enum {
Opt_wsize,
Opt_rsize,
+ Opt_rasize,
Opt_caps_wanted_delay_min,
Opt_caps_wanted_delay_max,
Opt_cap_release_safety,
@@ -136,6 +137,7 @@ enum {
static match_table_t fsopt_tokens = {
{Opt_wsize, "wsize=%d"},
{Opt_rsize, "rsize=%d"},
+ {Opt_rasize, "rasize=%d"},
{Opt_caps_wanted_delay_min, "caps_wanted_delay_min=%d"},
{Opt_caps_wanted_delay_max, "caps_wanted_delay_max=%d"},
{Opt_cap_release_safety, "cap_release_safety=%d"},
@@ -196,6 +198,9 @@ static int parse_fsopt_token(char *c, void *private)
case Opt_rsize:
fsopt->rsize = intval;
break;
+ case Opt_rasize:
+ fsopt->rasize = intval;
+ break;
case Opt_caps_wanted_delay_min:
fsopt->caps_wanted_delay_min = intval;
break;
@@ -289,28 +294,29 @@ static int parse_mount_options(struct ceph_mount_options **pfsopt,
dout("parse_mount_options %p, dev_name '%s'\n", fsopt, dev_name);
- fsopt->sb_flags = flags;
- fsopt->flags = CEPH_MOUNT_OPT_DEFAULT;
+ fsopt->sb_flags = flags;
+ fsopt->flags = CEPH_MOUNT_OPT_DEFAULT;
- fsopt->rsize = CEPH_RSIZE_DEFAULT;
- fsopt->snapdir_name = kstrdup(CEPH_SNAPDIRNAME_DEFAULT, GFP_KERNEL);
+ fsopt->rsize = CEPH_RSIZE_DEFAULT;
+ fsopt->rasize = CEPH_RASIZE_DEFAULT;
+ fsopt->snapdir_name = kstrdup(CEPH_SNAPDIRNAME_DEFAULT, GFP_KERNEL);
fsopt->caps_wanted_delay_min = CEPH_CAPS_WANTED_DELAY_MIN_DEFAULT;
fsopt->caps_wanted_delay_max = CEPH_CAPS_WANTED_DELAY_MAX_DEFAULT;
- fsopt->cap_release_safety = CEPH_CAP_RELEASE_SAFETY_DEFAULT;
- fsopt->max_readdir = CEPH_MAX_READDIR_DEFAULT;
- fsopt->max_readdir_bytes = CEPH_MAX_READDIR_BYTES_DEFAULT;
- fsopt->congestion_kb = default_congestion_kb();
-
- /* ip1[:port1][,ip2[:port2]...]:/subdir/in/fs */
- err = -EINVAL;
- if (!dev_name)
- goto out;
- *path = strstr(dev_name, ":/");
- if (*path == NULL) {
- pr_err("device name is missing path (no :/ in %s)\n",
- dev_name);
- goto out;
- }
+ fsopt->cap_release_safety = CEPH_CAP_RELEASE_SAFETY_DEFAULT;
+ fsopt->max_readdir = CEPH_MAX_READDIR_DEFAULT;
+ fsopt->max_readdir_bytes = CEPH_MAX_READDIR_BYTES_DEFAULT;
+ fsopt->congestion_kb = default_congestion_kb();
+
+ /* ip1[:port1][,ip2[:port2]...]:/subdir/in/fs */
+ err = -EINVAL;
+ if (!dev_name)
+ goto out;
+ *path = strstr(dev_name, ":/");
+ if (*path == NULL) {
+ pr_err("device name is missing path (no :/ in %s)\n",
+ dev_name);
+ goto out;
+ }
dev_name_end = *path;
dout("device name '%.*s'\n", (int)(dev_name_end - dev_name), dev_name);
@@ -376,6 +382,8 @@ static int ceph_show_options(struct seq_file *m, struct vfsmount *mnt)
seq_printf(m, ",wsize=%d", fsopt->wsize);
if (fsopt->rsize != CEPH_RSIZE_DEFAULT)
seq_printf(m, ",rsize=%d", fsopt->rsize);
+ if (fsopt->rasize != CEPH_RASIZE_DEFAULT)
+ seq_printf(m, ",rasize=%d", fsopt->rasize);
if (fsopt->congestion_kb != default_congestion_kb())
seq_printf(m, ",write_congestion_kb=%d", fsopt->congestion_kb);
if (fsopt->caps_wanted_delay_min != CEPH_CAPS_WANTED_DELAY_MIN_DEFAULT)
@@ -418,24 +426,27 @@ static int extra_mon_dispatch(struct ceph_client *client, struct ceph_msg *msg)
/*
* create a new fs client
*/
-struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
+static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
struct ceph_options *opt)
{
struct ceph_fs_client *fsc;
+ const unsigned supported_features =
+ CEPH_FEATURE_FLOCK |
+ CEPH_FEATURE_DIRLAYOUTHASH;
+ const unsigned required_features = 0;
int err = -ENOMEM;
fsc = kzalloc(sizeof(*fsc), GFP_KERNEL);
if (!fsc)
return ERR_PTR(-ENOMEM);
- fsc->client = ceph_create_client(opt, fsc);
+ fsc->client = ceph_create_client(opt, fsc, supported_features,
+ required_features);
if (IS_ERR(fsc->client)) {
err = PTR_ERR(fsc->client);
goto fail;
}
fsc->client->extra_mon_dispatch = extra_mon_dispatch;
- fsc->client->supported_features |= CEPH_FEATURE_FLOCK |
- CEPH_FEATURE_DIRLAYOUTHASH;
fsc->client->monc.want_mdsmap = 1;
fsc->mount_options = fsopt;
@@ -491,7 +502,7 @@ fail:
return ERR_PTR(err);
}
-void destroy_fs_client(struct ceph_fs_client *fsc)
+static void destroy_fs_client(struct ceph_fs_client *fsc)
{
dout("destroy_fs_client %p\n", fsc);
@@ -627,10 +638,12 @@ static struct dentry *open_root_dentry(struct ceph_fs_client *fsc,
if (err == 0) {
dout("open_root_inode success\n");
if (ceph_ino(req->r_target_inode) == CEPH_INO_ROOT &&
- fsc->sb->s_root == NULL)
+ fsc->sb->s_root == NULL) {
root = d_alloc_root(req->r_target_inode);
- else
+ ceph_init_dentry(root);
+ } else {
root = d_obtain_alias(req->r_target_inode);
+ }
req->r_target_inode = NULL;
dout("open_root_inode success, root dentry is %p\n", root);
} else {
@@ -774,10 +787,10 @@ static int ceph_register_bdi(struct super_block *sb,
{
int err;
- /* set ra_pages based on rsize mount option? */
- if (fsc->mount_options->rsize >= PAGE_CACHE_SIZE)
+ /* set ra_pages based on rasize mount option? */
+ if (fsc->mount_options->rasize >= PAGE_CACHE_SIZE)
fsc->backing_dev_info.ra_pages =
- (fsc->mount_options->rsize + PAGE_CACHE_SIZE - 1)
+ (fsc->mount_options->rasize + PAGE_CACHE_SIZE - 1)
>> PAGE_SHIFT;
else
fsc->backing_dev_info.ra_pages =
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index a23eed526f05..edcbf3774a56 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -36,7 +36,8 @@
#define ceph_test_mount_opt(fsc, opt) \
(!!((fsc)->mount_options->flags & CEPH_MOUNT_OPT_##opt))
-#define CEPH_RSIZE_DEFAULT (512*1024) /* readahead */
+#define CEPH_RSIZE_DEFAULT 0 /* max read size */
+#define CEPH_RASIZE_DEFAULT (8192*1024) /* readahead */
#define CEPH_MAX_READDIR_DEFAULT 1024
#define CEPH_MAX_READDIR_BYTES_DEFAULT (512*1024)
#define CEPH_SNAPDIRNAME_DEFAULT ".snap"
@@ -45,8 +46,9 @@ struct ceph_mount_options {
int flags;
int sb_flags;
- int wsize;
- int rsize; /* max readahead */
+ int wsize; /* max write size */
+ int rsize; /* max read size */
+ int rasize; /* max readahead */
int congestion_kb; /* max writeback in flight */
int caps_wanted_delay_min, caps_wanted_delay_max;
int cap_release_safety;
@@ -201,6 +203,7 @@ struct ceph_inode_xattr {
* Ceph dentry state
*/
struct ceph_dentry_info {
+ unsigned long flags;
struct ceph_mds_session *lease_session;
u32 lease_gen, lease_shared_gen;
u32 lease_seq;
@@ -211,6 +214,18 @@ struct ceph_dentry_info {
u64 offset;
};
+/*
+ * dentry flags
+ *
+ * The locking for D_COMPLETE is a bit odd:
+ * - we can clear it at almost any time (see ceph_d_prune)
+ * - it is only meaningful if:
+ * - we hold dir inode i_ceph_lock
+ * - we hold dir FILE_SHARED caps
+ * - the dentry D_COMPLETE is set
+ */
+#define CEPH_D_COMPLETE 1 /* if set, d_u.d_subdirs is complete directory */
+
struct ceph_inode_xattrs_info {
/*
* (still encoded) xattr blob. we avoid the overhead of parsing
@@ -235,6 +250,8 @@ struct ceph_inode_xattrs_info {
struct ceph_inode_info {
struct ceph_vino i_vino; /* ceph ino + snap */
+ spinlock_t i_ceph_lock;
+
u64 i_version;
u32 i_time_warp_seq;
@@ -249,14 +266,14 @@ struct ceph_inode_info {
struct timespec i_rctime;
u64 i_rbytes, i_rfiles, i_rsubdirs;
u64 i_files, i_subdirs;
- u64 i_max_offset; /* largest readdir offset, set with I_COMPLETE */
+ u64 i_max_offset; /* largest readdir offset, set with D_COMPLETE */
struct rb_root i_fragtree;
struct mutex i_fragtree_mutex;
struct ceph_inode_xattrs_info i_xattrs;
- /* capabilities. protected _both_ by i_lock and cap->session's
+ /* capabilities. protected _both_ by i_ceph_lock and cap->session's
* s_mutex. */
struct rb_root i_caps; /* cap list */
struct ceph_cap *i_auth_cap; /* authoritative cap, if any */
@@ -344,9 +361,10 @@ static inline struct ceph_vino ceph_vino(struct inode *inode)
* x86_64+ino32 64 32
* x86_64 64 64
*/
-static inline u32 ceph_ino_to_ino32(ino_t ino)
+static inline u32 ceph_ino_to_ino32(__u64 vino)
{
- ino ^= ino >> (sizeof(ino) * 8 - 32);
+ u32 ino = vino & 0xffffffff;
+ ino ^= vino >> 32;
if (!ino)
ino = 1;
return ino;
@@ -357,11 +375,11 @@ static inline u32 ceph_ino_to_ino32(ino_t ino)
*/
static inline ino_t ceph_vino_to_ino(struct ceph_vino vino)
{
- ino_t ino = (ino_t)vino.ino; /* ^ (vino.snap << 20); */
#if BITS_PER_LONG == 32
- ino = ceph_ino_to_ino32(ino);
+ return ceph_ino_to_ino32(vino.ino);
+#else
+ return (ino_t)vino.ino;
#endif
- return ino;
}
/*
@@ -413,7 +431,6 @@ static inline struct inode *ceph_find_inode(struct super_block *sb,
/*
* Ceph inode.
*/
-#define CEPH_I_COMPLETE 1 /* we have complete directory cached */
#define CEPH_I_NODELAY 4 /* do not delay cap release */
#define CEPH_I_FLUSH 8 /* do not delay flush of dirty metadata */
#define CEPH_I_NOFLUSH 16 /* do not flush dirty caps */
@@ -422,18 +439,18 @@ static inline void ceph_i_clear(struct inode *inode, unsigned mask)
{
struct ceph_inode_info *ci = ceph_inode(inode);
- spin_lock(&inode->i_lock);
+ spin_lock(&ci->i_ceph_lock);
ci->i_ceph_flags &= ~mask;
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
}
static inline void ceph_i_set(struct inode *inode, unsigned mask)
{
struct ceph_inode_info *ci = ceph_inode(inode);
- spin_lock(&inode->i_lock);
+ spin_lock(&ci->i_ceph_lock);
ci->i_ceph_flags |= mask;
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
}
static inline bool ceph_i_test(struct inode *inode, unsigned mask)
@@ -441,9 +458,9 @@ static inline bool ceph_i_test(struct inode *inode, unsigned mask)
struct ceph_inode_info *ci = ceph_inode(inode);
bool r;
- spin_lock(&inode->i_lock);
+ spin_lock(&ci->i_ceph_lock);
r = (ci->i_ceph_flags & mask) == mask;
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
return r;
}
@@ -471,6 +488,13 @@ static inline loff_t ceph_make_fpos(unsigned frag, unsigned off)
}
/*
+ * set/clear directory D_COMPLETE flag
+ */
+void ceph_dir_set_complete(struct inode *inode);
+void ceph_dir_clear_complete(struct inode *inode);
+bool ceph_dir_test_complete(struct inode *inode);
+
+/*
* caps helpers
*/
static inline bool __ceph_is_any_real_caps(struct ceph_inode_info *ci)
@@ -486,9 +510,9 @@ extern int __ceph_caps_issued_other(struct ceph_inode_info *ci,
static inline int ceph_caps_issued(struct ceph_inode_info *ci)
{
int issued;
- spin_lock(&ci->vfs_inode.i_lock);
+ spin_lock(&ci->i_ceph_lock);
issued = __ceph_caps_issued(ci, NULL);
- spin_unlock(&ci->vfs_inode.i_lock);
+ spin_unlock(&ci->i_ceph_lock);
return issued;
}
@@ -496,9 +520,9 @@ static inline int ceph_caps_issued_mask(struct ceph_inode_info *ci, int mask,
int touch)
{
int r;
- spin_lock(&ci->vfs_inode.i_lock);
+ spin_lock(&ci->i_ceph_lock);
r = __ceph_caps_issued_mask(ci, mask, touch);
- spin_unlock(&ci->vfs_inode.i_lock);
+ spin_unlock(&ci->i_ceph_lock);
return r;
}
@@ -721,10 +745,9 @@ extern int ceph_add_cap(struct inode *inode,
extern void __ceph_remove_cap(struct ceph_cap *cap);
static inline void ceph_remove_cap(struct ceph_cap *cap)
{
- struct inode *inode = &cap->ci->vfs_inode;
- spin_lock(&inode->i_lock);
+ spin_lock(&cap->ci->i_ceph_lock);
__ceph_remove_cap(cap);
- spin_unlock(&inode->i_lock);
+ spin_unlock(&cap->ci->i_ceph_lock);
}
extern void ceph_put_cap(struct ceph_mds_client *mdsc,
struct ceph_cap *cap);
diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c
index 96c6739a0280..a5e36e4488a7 100644
--- a/fs/ceph/xattr.c
+++ b/fs/ceph/xattr.c
@@ -343,8 +343,8 @@ void __ceph_destroy_xattrs(struct ceph_inode_info *ci)
}
static int __build_xattrs(struct inode *inode)
- __releases(inode->i_lock)
- __acquires(inode->i_lock)
+ __releases(ci->i_ceph_lock)
+ __acquires(ci->i_ceph_lock)
{
u32 namelen;
u32 numattr = 0;
@@ -372,7 +372,7 @@ start:
end = p + ci->i_xattrs.blob->vec.iov_len;
ceph_decode_32_safe(&p, end, numattr, bad);
xattr_version = ci->i_xattrs.version;
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
xattrs = kcalloc(numattr, sizeof(struct ceph_xattr *),
GFP_NOFS);
@@ -387,7 +387,7 @@ start:
goto bad_lock;
}
- spin_lock(&inode->i_lock);
+ spin_lock(&ci->i_ceph_lock);
if (ci->i_xattrs.version != xattr_version) {
/* lost a race, retry */
for (i = 0; i < numattr; i++)
@@ -418,7 +418,7 @@ start:
return err;
bad_lock:
- spin_lock(&inode->i_lock);
+ spin_lock(&ci->i_ceph_lock);
bad:
if (xattrs) {
for (i = 0; i < numattr; i++)
@@ -512,7 +512,7 @@ ssize_t ceph_getxattr(struct dentry *dentry, const char *name, void *value,
if (vxattrs)
vxattr = ceph_match_vxattr(vxattrs, name);
- spin_lock(&inode->i_lock);
+ spin_lock(&ci->i_ceph_lock);
dout("getxattr %p ver=%lld index_ver=%lld\n", inode,
ci->i_xattrs.version, ci->i_xattrs.index_version);
@@ -520,14 +520,14 @@ ssize_t ceph_getxattr(struct dentry *dentry, const char *name, void *value,
(ci->i_xattrs.index_version >= ci->i_xattrs.version)) {
goto get_xattr;
} else {
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
/* get xattrs from mds (if we don't already have them) */
err = ceph_do_getattr(inode, CEPH_STAT_CAP_XATTR);
if (err)
return err;
}
- spin_lock(&inode->i_lock);
+ spin_lock(&ci->i_ceph_lock);
if (vxattr && vxattr->readonly) {
err = vxattr->getxattr_cb(ci, value, size);
@@ -558,7 +558,7 @@ get_xattr:
memcpy(value, xattr->val, xattr->val_len);
out:
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
return err;
}
@@ -573,7 +573,7 @@ ssize_t ceph_listxattr(struct dentry *dentry, char *names, size_t size)
u32 len;
int i;
- spin_lock(&inode->i_lock);
+ spin_lock(&ci->i_ceph_lock);
dout("listxattr %p ver=%lld index_ver=%lld\n", inode,
ci->i_xattrs.version, ci->i_xattrs.index_version);
@@ -581,13 +581,13 @@ ssize_t ceph_listxattr(struct dentry *dentry, char *names, size_t size)
(ci->i_xattrs.index_version >= ci->i_xattrs.version)) {
goto list_xattr;
} else {
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
err = ceph_do_getattr(inode, CEPH_STAT_CAP_XATTR);
if (err)
return err;
}
- spin_lock(&inode->i_lock);
+ spin_lock(&ci->i_ceph_lock);
err = __build_xattrs(inode);
if (err < 0)
@@ -619,7 +619,7 @@ list_xattr:
}
out:
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
return err;
}
@@ -739,7 +739,7 @@ int ceph_setxattr(struct dentry *dentry, const char *name,
if (!xattr)
goto out;
- spin_lock(&inode->i_lock);
+ spin_lock(&ci->i_ceph_lock);
retry:
issued = __ceph_caps_issued(ci, NULL);
if (!(issued & CEPH_CAP_XATTR_EXCL))
@@ -752,12 +752,12 @@ retry:
required_blob_size > ci->i_xattrs.prealloc_blob->alloc_len) {
struct ceph_buffer *blob = NULL;
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
dout(" preaallocating new blob size=%d\n", required_blob_size);
blob = ceph_buffer_new(required_blob_size, GFP_NOFS);
if (!blob)
goto out;
- spin_lock(&inode->i_lock);
+ spin_lock(&ci->i_ceph_lock);
if (ci->i_xattrs.prealloc_blob)
ceph_buffer_put(ci->i_xattrs.prealloc_blob);
ci->i_xattrs.prealloc_blob = blob;
@@ -770,13 +770,13 @@ retry:
dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_XATTR_EXCL);
ci->i_xattrs.dirty = true;
inode->i_ctime = CURRENT_TIME;
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
if (dirty)
__mark_inode_dirty(inode, dirty);
return err;
do_sync:
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
err = ceph_sync_setxattr(dentry, name, value, size, flags);
out:
kfree(newname);
@@ -833,7 +833,7 @@ int ceph_removexattr(struct dentry *dentry, const char *name)
return -EOPNOTSUPP;
}
- spin_lock(&inode->i_lock);
+ spin_lock(&ci->i_ceph_lock);
__build_xattrs(inode);
issued = __ceph_caps_issued(ci, NULL);
dout("removexattr %p issued %s\n", inode, ceph_cap_string(issued));
@@ -846,12 +846,12 @@ int ceph_removexattr(struct dentry *dentry, const char *name)
ci->i_xattrs.dirty = true;
inode->i_ctime = CURRENT_TIME;
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
if (dirty)
__mark_inode_dirty(inode, dirty);
return err;
do_sync:
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
err = ceph_send_removexattr(dentry, name);
return err;
}