diff options
Diffstat (limited to 'fs/ceph')
-rw-r--r-- | fs/ceph/Kconfig | 9 | ||||
-rw-r--r-- | fs/ceph/Makefile | 1 | ||||
-rw-r--r-- | fs/ceph/addr.c | 207 | ||||
-rw-r--r-- | fs/ceph/cache.c | 398 | ||||
-rw-r--r-- | fs/ceph/cache.h | 159 | ||||
-rw-r--r-- | fs/ceph/caps.c | 181 | ||||
-rw-r--r-- | fs/ceph/dir.c | 101 | ||||
-rw-r--r-- | fs/ceph/file.c | 306 | ||||
-rw-r--r-- | fs/ceph/inode.c | 56 | ||||
-rw-r--r-- | fs/ceph/ioctl.c | 12 | ||||
-rw-r--r-- | fs/ceph/locks.c | 4 | ||||
-rw-r--r-- | fs/ceph/mds_client.c | 50 | ||||
-rw-r--r-- | fs/ceph/mdsmap.c | 42 | ||||
-rw-r--r-- | fs/ceph/super.c | 37 | ||||
-rw-r--r-- | fs/ceph/super.h | 21 | ||||
-rw-r--r-- | fs/ceph/xattr.c | 9 |
16 files changed, 1280 insertions, 313 deletions
diff --git a/fs/ceph/Kconfig b/fs/ceph/Kconfig index 49bc78243db9..ac9a2ef5bb9b 100644 --- a/fs/ceph/Kconfig +++ b/fs/ceph/Kconfig @@ -16,3 +16,12 @@ config CEPH_FS If unsure, say N. +if CEPH_FS +config CEPH_FSCACHE + bool "Enable Ceph client caching support" + depends on CEPH_FS=m && FSCACHE || CEPH_FS=y && FSCACHE=y + help + Choose Y here to enable persistent, read-only local + caching support for Ceph clients using FS-Cache + +endif diff --git a/fs/ceph/Makefile b/fs/ceph/Makefile index bd352125e829..32e30106a2f0 100644 --- a/fs/ceph/Makefile +++ b/fs/ceph/Makefile @@ -9,3 +9,4 @@ ceph-y := super.o inode.o dir.o file.o locks.o addr.o ioctl.o \ mds_client.o mdsmap.o strings.o ceph_frag.o \ debugfs.o +ceph-$(CONFIG_CEPH_FSCACHE) += cache.o diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index 3e68ac101040..6df8bd481425 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -11,6 +11,7 @@ #include "super.h" #include "mds_client.h" +#include "cache.h" #include <linux/ceph/osd_client.h> /* @@ -70,15 +71,16 @@ static int ceph_set_page_dirty(struct page *page) struct address_space *mapping = page->mapping; struct inode *inode; struct ceph_inode_info *ci; - int undo = 0; struct ceph_snap_context *snapc; + int ret; if (unlikely(!mapping)) return !TestSetPageDirty(page); - if (TestSetPageDirty(page)) { + if (PageDirty(page)) { dout("%p set_page_dirty %p idx %lu -- already dirty\n", mapping->host, page, page->index); + BUG_ON(!PagePrivate(page)); return 0; } @@ -107,35 +109,19 @@ static int ceph_set_page_dirty(struct page *page) snapc, snapc->seq, snapc->num_snaps); spin_unlock(&ci->i_ceph_lock); - /* now adjust page */ - spin_lock_irq(&mapping->tree_lock); - if (page->mapping) { /* Race with truncate? */ - WARN_ON_ONCE(!PageUptodate(page)); - account_page_dirtied(page, page->mapping); - radix_tree_tag_set(&mapping->page_tree, - page_index(page), PAGECACHE_TAG_DIRTY); - - /* - * Reference snap context in page->private. Also set - * PagePrivate so that we get invalidatepage callback. - */ - page->private = (unsigned long)snapc; - SetPagePrivate(page); - } else { - dout("ANON set_page_dirty %p (raced truncate?)\n", page); - undo = 1; - } - - spin_unlock_irq(&mapping->tree_lock); - - if (undo) - /* whoops, we failed to dirty the page */ - ceph_put_wrbuffer_cap_refs(ci, 1, snapc); + /* + * Reference snap context in page->private. Also set + * PagePrivate so that we get invalidatepage callback. + */ + BUG_ON(PagePrivate(page)); + page->private = (unsigned long)snapc; + SetPagePrivate(page); - __mark_inode_dirty(mapping->host, I_DIRTY_PAGES); + ret = __set_page_dirty_nobuffers(page); + WARN_ON(!PageLocked(page)); + WARN_ON(!page->mapping); - BUG_ON(!PageDirty(page)); - return 1; + return ret; } /* @@ -143,17 +129,26 @@ static int ceph_set_page_dirty(struct page *page) * dirty page counters appropriately. Only called if there is private * data on the page. */ -static void ceph_invalidatepage(struct page *page, unsigned long offset) +static void ceph_invalidatepage(struct page *page, unsigned int offset, + unsigned int length) { struct inode *inode; struct ceph_inode_info *ci; struct ceph_snap_context *snapc = page_snap_context(page); - BUG_ON(!PageLocked(page)); - BUG_ON(!PagePrivate(page)); - BUG_ON(!page->mapping); - inode = page->mapping->host; + ci = ceph_inode(inode); + + if (offset != 0 || length != PAGE_CACHE_SIZE) { + dout("%p invalidatepage %p idx %lu partial dirty page %u~%u\n", + inode, page, page->index, offset, length); + return; + } + + ceph_invalidate_fscache_page(inode, page); + + if (!PagePrivate(page)) + return; /* * We can get non-dirty pages here due to races between @@ -163,31 +158,28 @@ static void ceph_invalidatepage(struct page *page, unsigned long offset) if (!PageDirty(page)) pr_err("%p invalidatepage %p page not dirty\n", inode, page); - if (offset == 0) - ClearPageChecked(page); + ClearPageChecked(page); - ci = ceph_inode(inode); - if (offset == 0) { - dout("%p invalidatepage %p idx %lu full dirty page %lu\n", - inode, page, page->index, offset); - ceph_put_wrbuffer_cap_refs(ci, 1, snapc); - ceph_put_snap_context(snapc); - page->private = 0; - ClearPagePrivate(page); - } else { - dout("%p invalidatepage %p idx %lu partial dirty page\n", - inode, page, page->index); - } + dout("%p invalidatepage %p idx %lu full dirty page\n", + inode, page, page->index); + + ceph_put_wrbuffer_cap_refs(ci, 1, snapc); + ceph_put_snap_context(snapc); + page->private = 0; + ClearPagePrivate(page); } -/* just a sanity check */ static int ceph_releasepage(struct page *page, gfp_t g) { struct inode *inode = page->mapping ? page->mapping->host : NULL; dout("%p releasepage %p idx %lu\n", inode, page, page->index); WARN_ON(PageDirty(page)); - WARN_ON(PagePrivate(page)); - return 0; + + /* Can we release the page from the cache? */ + if (!ceph_release_fscache_page(page, g)) + return 0; + + return !PagePrivate(page); } /* @@ -197,11 +189,16 @@ static int readpage_nounlock(struct file *filp, struct page *page) { struct inode *inode = file_inode(filp); struct ceph_inode_info *ci = ceph_inode(inode); - struct ceph_osd_client *osdc = + struct ceph_osd_client *osdc = &ceph_inode_to_client(inode)->client->osdc; int err = 0; u64 len = PAGE_CACHE_SIZE; + err = ceph_readpage_from_fscache(inode, page); + + if (err == 0) + goto out; + dout("readpage inode %p file %p page %p index %lu\n", inode, filp, page, page->index); err = ceph_osdc_readpages(osdc, ceph_vino(inode), &ci->i_layout, @@ -219,6 +216,9 @@ static int readpage_nounlock(struct file *filp, struct page *page) } SetPageUptodate(page); + if (err == 0) + ceph_readpage_to_fscache(inode, page); + out: return err < 0 ? err : 0; } @@ -261,6 +261,7 @@ static void finish_read(struct ceph_osd_request *req, struct ceph_msg *msg) page->index); flush_dcache_page(page); SetPageUptodate(page); + ceph_readpage_to_fscache(inode, page); unlock_page(page); page_cache_release(page); bytes -= PAGE_CACHE_SIZE; @@ -330,11 +331,12 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max) page = list_entry(page_list->prev, struct page, lru); BUG_ON(PageLocked(page)); list_del(&page->lru); - + dout("start_read %p adding %p idx %lu\n", inode, page, page->index); if (add_to_page_cache_lru(page, &inode->i_data, page->index, GFP_NOFS)) { + ceph_fscache_uncache_page(inode, page); page_cache_release(page); dout("start_read %p add_to_page_cache failed %p\n", inode, page); @@ -377,6 +379,12 @@ static int ceph_readpages(struct file *file, struct address_space *mapping, int rc = 0; int max = 0; + rc = ceph_readpages_from_fscache(mapping->host, mapping, page_list, + &nr_pages); + + if (rc == 0) + goto out; + if (fsc->mount_options->rsize >= PAGE_CACHE_SIZE) max = (fsc->mount_options->rsize + PAGE_CACHE_SIZE - 1) >> PAGE_SHIFT; @@ -391,6 +399,8 @@ static int ceph_readpages(struct file *file, struct address_space *mapping, BUG_ON(rc == 0); } out: + ceph_fscache_readpages_cancel(inode, page_list); + dout("readpages %p file %p ret %d\n", inode, file, rc); return rc; } @@ -438,13 +448,12 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc) struct ceph_inode_info *ci; struct ceph_fs_client *fsc; struct ceph_osd_client *osdc; - loff_t page_off = page_offset(page); - int len = PAGE_CACHE_SIZE; - loff_t i_size; - int err = 0; struct ceph_snap_context *snapc, *oldest; - u64 snap_size = 0; + loff_t page_off = page_offset(page); long writeback_stat; + u64 truncate_size, snap_size = 0; + u32 truncate_seq; + int err = 0, len = PAGE_CACHE_SIZE; dout("writepage %p idx %lu\n", page, page->index); @@ -474,13 +483,20 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc) } ceph_put_snap_context(oldest); + spin_lock(&ci->i_ceph_lock); + truncate_seq = ci->i_truncate_seq; + truncate_size = ci->i_truncate_size; + if (!snap_size) + snap_size = i_size_read(inode); + spin_unlock(&ci->i_ceph_lock); + /* is this a partial page at end of file? */ - if (snap_size) - i_size = snap_size; - else - i_size = i_size_read(inode); - if (i_size < page_off + len) - len = i_size - page_off; + if (page_off >= snap_size) { + dout("%p page eof %llu\n", page, snap_size); + goto out; + } + if (snap_size < page_off + len) + len = snap_size - page_off; dout("writepage %p page %p index %lu on %llu~%u snapc %p\n", inode, page, page->index, page_off, len, snapc); @@ -490,11 +506,13 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc) CONGESTION_ON_THRESH(fsc->mount_options->congestion_kb)) set_bdi_congested(&fsc->backing_dev_info, BLK_RW_ASYNC); + ceph_readpage_to_fscache(inode, page); + set_page_writeback(page); err = ceph_osdc_writepages(osdc, ceph_vino(inode), &ci->i_layout, snapc, page_off, len, - ci->i_truncate_seq, ci->i_truncate_size, + truncate_seq, truncate_size, &inode->i_mtime, &page, 1); if (err < 0) { dout("writepage setting page/mapping error %d %p\n", err, page); @@ -545,7 +563,6 @@ static void ceph_release_pages(struct page **pages, int num) pagevec_release(&pvec); } - /* * async writeback completion handler. * @@ -631,25 +648,6 @@ static void writepages_finish(struct ceph_osd_request *req, ceph_osdc_put_request(req); } -static struct ceph_osd_request * -ceph_writepages_osd_request(struct inode *inode, u64 offset, u64 *len, - struct ceph_snap_context *snapc, int num_ops) -{ - struct ceph_fs_client *fsc; - struct ceph_inode_info *ci; - struct ceph_vino vino; - - fsc = ceph_inode_to_client(inode); - ci = ceph_inode(inode); - vino = ceph_vino(inode); - /* BUG_ON(vino.snap != CEPH_NOSNAP); */ - - return ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, - vino, offset, len, num_ops, CEPH_OSD_OP_WRITE, - CEPH_OSD_FLAG_WRITE|CEPH_OSD_FLAG_ONDISK, - snapc, ci->i_truncate_seq, ci->i_truncate_size, true); -} - /* * initiate async writeback */ @@ -658,7 +656,8 @@ static int ceph_writepages_start(struct address_space *mapping, { struct inode *inode = mapping->host; struct ceph_inode_info *ci = ceph_inode(inode); - struct ceph_fs_client *fsc; + struct ceph_fs_client *fsc = ceph_inode_to_client(inode); + struct ceph_vino vino = ceph_vino(inode); pgoff_t index, start, end; int range_whole = 0; int should_loop = 1; @@ -670,22 +669,22 @@ static int ceph_writepages_start(struct address_space *mapping, unsigned wsize = 1 << inode->i_blkbits; struct ceph_osd_request *req = NULL; int do_sync; - u64 snap_size; + u64 truncate_size, snap_size; + u32 truncate_seq; /* * Include a 'sync' in the OSD request if this is a data * integrity write (e.g., O_SYNC write or fsync()), or if our * cap is being revoked. */ - do_sync = wbc->sync_mode == WB_SYNC_ALL; - if (ceph_caps_revoking(ci, CEPH_CAP_FILE_BUFFER)) + if ((wbc->sync_mode == WB_SYNC_ALL) || + ceph_caps_revoking(ci, CEPH_CAP_FILE_BUFFER)) do_sync = 1; dout("writepages_start %p dosync=%d (mode=%s)\n", inode, do_sync, wbc->sync_mode == WB_SYNC_NONE ? "NONE" : (wbc->sync_mode == WB_SYNC_ALL ? "ALL" : "HOLD")); - fsc = ceph_inode_to_client(inode); if (fsc->mount_state == CEPH_MOUNT_SHUTDOWN) { pr_warning("writepage_start %p on forced umount\n", inode); return -EIO; /* we're in a forced umount, don't write! */ @@ -728,6 +727,14 @@ retry: snap_size = i_size_read(inode); dout(" oldest snapc is %p seq %lld (%d snaps)\n", snapc, snapc->seq, snapc->num_snaps); + + spin_lock(&ci->i_ceph_lock); + truncate_seq = ci->i_truncate_seq; + truncate_size = ci->i_truncate_size; + if (!snap_size) + snap_size = i_size_read(inode); + spin_unlock(&ci->i_ceph_lock); + if (last_snapc && snapc != last_snapc) { /* if we switched to a newer snapc, restart our scan at the * start of the original file range. */ @@ -739,7 +746,6 @@ retry: while (!done && index <= end) { int num_ops = do_sync ? 2 : 1; - struct ceph_vino vino; unsigned i; int first; pgoff_t next; @@ -833,17 +839,18 @@ get_more_pages: * that it will use. */ if (locked_pages == 0) { - size_t size; - BUG_ON(pages); - /* prepare async write request */ offset = (u64)page_offset(page); len = wsize; - req = ceph_writepages_osd_request(inode, - offset, &len, snapc, - num_ops); - + req = ceph_osdc_new_request(&fsc->client->osdc, + &ci->i_layout, vino, + offset, &len, num_ops, + CEPH_OSD_OP_WRITE, + CEPH_OSD_FLAG_WRITE | + CEPH_OSD_FLAG_ONDISK, + snapc, truncate_seq, + truncate_size, true); if (IS_ERR(req)) { rc = PTR_ERR(req); unlock_page(page); @@ -854,8 +861,8 @@ get_more_pages: req->r_inode = inode; max_pages = calc_pages_for(0, (u64)len); - size = max_pages * sizeof (*pages); - pages = kmalloc(size, GFP_NOFS); + pages = kmalloc(max_pages * sizeof (*pages), + GFP_NOFS); if (!pages) { pool = fsc->wb_pagevec_pool; pages = mempool_alloc(pool, GFP_NOFS); diff --git a/fs/ceph/cache.c b/fs/ceph/cache.c new file mode 100644 index 000000000000..6bfe65e0b038 --- /dev/null +++ b/fs/ceph/cache.c @@ -0,0 +1,398 @@ +/* + * Ceph cache definitions. + * + * Copyright (C) 2013 by Adfin Solutions, Inc. All Rights Reserved. + * Written by Milosz Tanski (milosz@adfin.com) + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to: + * Free Software Foundation + * 51 Franklin Street, Fifth Floor + * Boston, MA 02111-1301 USA + * + */ + +#include "super.h" +#include "cache.h" + +struct ceph_aux_inode { + struct timespec mtime; + loff_t size; +}; + +struct fscache_netfs ceph_cache_netfs = { + .name = "ceph", + .version = 0, +}; + +static uint16_t ceph_fscache_session_get_key(const void *cookie_netfs_data, + void *buffer, uint16_t maxbuf) +{ + const struct ceph_fs_client* fsc = cookie_netfs_data; + uint16_t klen; + + klen = sizeof(fsc->client->fsid); + if (klen > maxbuf) + return 0; + + memcpy(buffer, &fsc->client->fsid, klen); + return klen; +} + +static const struct fscache_cookie_def ceph_fscache_fsid_object_def = { + .name = "CEPH.fsid", + .type = FSCACHE_COOKIE_TYPE_INDEX, + .get_key = ceph_fscache_session_get_key, +}; + +int ceph_fscache_register(void) +{ + return fscache_register_netfs(&ceph_cache_netfs); +} + +void ceph_fscache_unregister(void) +{ + fscache_unregister_netfs(&ceph_cache_netfs); +} + +int ceph_fscache_register_fs(struct ceph_fs_client* fsc) +{ + fsc->fscache = fscache_acquire_cookie(ceph_cache_netfs.primary_index, + &ceph_fscache_fsid_object_def, + fsc); + + if (fsc->fscache == NULL) { + pr_err("Unable to resgister fsid: %p fscache cookie", fsc); + return 0; + } + + fsc->revalidate_wq = alloc_workqueue("ceph-revalidate", 0, 1); + if (fsc->revalidate_wq == NULL) + return -ENOMEM; + + return 0; +} + +static uint16_t ceph_fscache_inode_get_key(const void *cookie_netfs_data, + void *buffer, uint16_t maxbuf) +{ + const struct ceph_inode_info* ci = cookie_netfs_data; + uint16_t klen; + + /* use ceph virtual inode (id + snaphot) */ + klen = sizeof(ci->i_vino); + if (klen > maxbuf) + return 0; + + memcpy(buffer, &ci->i_vino, klen); + return klen; +} + +static uint16_t ceph_fscache_inode_get_aux(const void *cookie_netfs_data, + void *buffer, uint16_t bufmax) +{ + struct ceph_aux_inode aux; + const struct ceph_inode_info* ci = cookie_netfs_data; + const struct inode* inode = &ci->vfs_inode; + + memset(&aux, 0, sizeof(aux)); + aux.mtime = inode->i_mtime; + aux.size = inode->i_size; + + memcpy(buffer, &aux, sizeof(aux)); + + return sizeof(aux); +} + +static void ceph_fscache_inode_get_attr(const void *cookie_netfs_data, + uint64_t *size) +{ + const struct ceph_inode_info* ci = cookie_netfs_data; + const struct inode* inode = &ci->vfs_inode; + + *size = inode->i_size; +} + +static enum fscache_checkaux ceph_fscache_inode_check_aux( + void *cookie_netfs_data, const void *data, uint16_t dlen) +{ + struct ceph_aux_inode aux; + struct ceph_inode_info* ci = cookie_netfs_data; + struct inode* inode = &ci->vfs_inode; + + if (dlen != sizeof(aux)) + return FSCACHE_CHECKAUX_OBSOLETE; + + memset(&aux, 0, sizeof(aux)); + aux.mtime = inode->i_mtime; + aux.size = inode->i_size; + + if (memcmp(data, &aux, sizeof(aux)) != 0) + return FSCACHE_CHECKAUX_OBSOLETE; + + dout("ceph inode 0x%p cached okay", ci); + return FSCACHE_CHECKAUX_OKAY; +} + +static void ceph_fscache_inode_now_uncached(void* cookie_netfs_data) +{ + struct ceph_inode_info* ci = cookie_netfs_data; + struct pagevec pvec; + pgoff_t first; + int loop, nr_pages; + + pagevec_init(&pvec, 0); + first = 0; + + dout("ceph inode 0x%p now uncached", ci); + + while (1) { + nr_pages = pagevec_lookup(&pvec, ci->vfs_inode.i_mapping, first, + PAGEVEC_SIZE - pagevec_count(&pvec)); + + if (!nr_pages) + break; + + for (loop = 0; loop < nr_pages; loop++) + ClearPageFsCache(pvec.pages[loop]); + + first = pvec.pages[nr_pages - 1]->index + 1; + + pvec.nr = nr_pages; + pagevec_release(&pvec); + cond_resched(); + } +} + +static const struct fscache_cookie_def ceph_fscache_inode_object_def = { + .name = "CEPH.inode", + .type = FSCACHE_COOKIE_TYPE_DATAFILE, + .get_key = ceph_fscache_inode_get_key, + .get_attr = ceph_fscache_inode_get_attr, + .get_aux = ceph_fscache_inode_get_aux, + .check_aux = ceph_fscache_inode_check_aux, + .now_uncached = ceph_fscache_inode_now_uncached, +}; + +void ceph_fscache_register_inode_cookie(struct ceph_fs_client* fsc, + struct ceph_inode_info* ci) +{ + struct inode* inode = &ci->vfs_inode; + + /* No caching for filesystem */ + if (fsc->fscache == NULL) + return; + + /* Only cache for regular files that are read only */ + if ((ci->vfs_inode.i_mode & S_IFREG) == 0) + return; + + /* Avoid multiple racing open requests */ + mutex_lock(&inode->i_mutex); + + if (ci->fscache) + goto done; + + ci->fscache = fscache_acquire_cookie(fsc->fscache, + &ceph_fscache_inode_object_def, + ci); +done: + mutex_unlock(&inode->i_mutex); + +} + +void ceph_fscache_unregister_inode_cookie(struct ceph_inode_info* ci) +{ + struct fscache_cookie* cookie; + + if ((cookie = ci->fscache) == NULL) + return; + + ci->fscache = NULL; + + fscache_uncache_all_inode_pages(cookie, &ci->vfs_inode); + fscache_relinquish_cookie(cookie, 0); +} + +static void ceph_vfs_readpage_complete(struct page *page, void *data, int error) +{ + if (!error) + SetPageUptodate(page); +} + +static void ceph_vfs_readpage_complete_unlock(struct page *page, void *data, int error) +{ + if (!error) + SetPageUptodate(page); + + unlock_page(page); +} + +static inline int cache_valid(struct ceph_inode_info *ci) +{ + return ((ceph_caps_issued(ci) & CEPH_CAP_FILE_CACHE) && + (ci->i_fscache_gen == ci->i_rdcache_gen)); +} + + +/* Atempt to read from the fscache, + * + * This function is called from the readpage_nounlock context. DO NOT attempt to + * unlock the page here (or in the callback). + */ +int ceph_readpage_from_fscache(struct inode *inode, struct page *page) +{ + struct ceph_inode_info *ci = ceph_inode(inode); + int ret; + + if (!cache_valid(ci)) + return -ENOBUFS; + + ret = fscache_read_or_alloc_page(ci->fscache, page, + ceph_vfs_readpage_complete, NULL, + GFP_KERNEL); + + switch (ret) { + case 0: /* Page found */ + dout("page read submitted\n"); + return 0; + case -ENOBUFS: /* Pages were not found, and can't be */ + case -ENODATA: /* Pages were not found */ + dout("page/inode not in cache\n"); + return ret; + default: + dout("%s: unknown error ret = %i\n", __func__, ret); + return ret; + } +} + +int ceph_readpages_from_fscache(struct inode *inode, + struct address_space *mapping, + struct list_head *pages, + unsigned *nr_pages) +{ + struct ceph_inode_info *ci = ceph_inode(inode); + int ret; + + if (!cache_valid(ci)) + return -ENOBUFS; + + ret = fscache_read_or_alloc_pages(ci->fscache, mapping, pages, nr_pages, + ceph_vfs_readpage_complete_unlock, + NULL, mapping_gfp_mask(mapping)); + + switch (ret) { + case 0: /* All pages found */ + dout("all-page read submitted\n"); + return 0; + case -ENOBUFS: /* Some pages were not found, and can't be */ + case -ENODATA: /* some pages were not found */ + dout("page/inode not in cache\n"); + return ret; + default: + dout("%s: unknown error ret = %i\n", __func__, ret); + return ret; + } +} + +void ceph_readpage_to_fscache(struct inode *inode, struct page *page) +{ + struct ceph_inode_info *ci = ceph_inode(inode); + int ret; + + if (!PageFsCache(page)) + return; + + if (!cache_valid(ci)) + return; + + ret = fscache_write_page(ci->fscache, page, GFP_KERNEL); + if (ret) + fscache_uncache_page(ci->fscache, page); +} + +void ceph_invalidate_fscache_page(struct inode* inode, struct page *page) +{ + struct ceph_inode_info *ci = ceph_inode(inode); + + fscache_wait_on_page_write(ci->fscache, page); + fscache_uncache_page(ci->fscache, page); +} + +void ceph_fscache_unregister_fs(struct ceph_fs_client* fsc) +{ + if (fsc->revalidate_wq) + destroy_workqueue(fsc->revalidate_wq); + + fscache_relinquish_cookie(fsc->fscache, 0); + fsc->fscache = NULL; +} + +static void ceph_revalidate_work(struct work_struct *work) +{ + int issued; + u32 orig_gen; + struct ceph_inode_info *ci = container_of(work, struct ceph_inode_info, + i_revalidate_work); + struct inode *inode = &ci->vfs_inode; + + spin_lock(&ci->i_ceph_lock); + issued = __ceph_caps_issued(ci, NULL); + orig_gen = ci->i_rdcache_gen; + spin_unlock(&ci->i_ceph_lock); + + if (!(issued & CEPH_CAP_FILE_CACHE)) { + dout("revalidate_work lost cache before validation %p\n", + inode); + goto out; + } + + if (!fscache_check_consistency(ci->fscache)) + fscache_invalidate(ci->fscache); + + spin_lock(&ci->i_ceph_lock); + /* Update the new valid generation (backwards sanity check too) */ + if (orig_gen > ci->i_fscache_gen) { + ci->i_fscache_gen = orig_gen; + } + spin_unlock(&ci->i_ceph_lock); + +out: + iput(&ci->vfs_inode); +} + +void ceph_queue_revalidate(struct inode *inode) +{ + struct ceph_fs_client *fsc = ceph_sb_to_client(inode->i_sb); + struct ceph_inode_info *ci = ceph_inode(inode); + + if (fsc->revalidate_wq == NULL || ci->fscache == NULL) + return; + + ihold(inode); + + if (queue_work(ceph_sb_to_client(inode->i_sb)->revalidate_wq, + &ci->i_revalidate_work)) { + dout("ceph_queue_revalidate %p\n", inode); + } else { + dout("ceph_queue_revalidate %p failed\n)", inode); + iput(inode); + } +} + +void ceph_fscache_inode_init(struct ceph_inode_info *ci) +{ + ci->fscache = NULL; + /* The first load is verifed cookie open time */ + ci->i_fscache_gen = 1; + INIT_WORK(&ci->i_revalidate_work, ceph_revalidate_work); +} diff --git a/fs/ceph/cache.h b/fs/ceph/cache.h new file mode 100644 index 000000000000..ba949408a336 --- /dev/null +++ b/fs/ceph/cache.h @@ -0,0 +1,159 @@ +/* + * Ceph cache definitions. + * + * Copyright (C) 2013 by Adfin Solutions, Inc. All Rights Reserved. + * Written by Milosz Tanski (milosz@adfin.com) + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to: + * Free Software Foundation + * 51 Franklin Street, Fifth Floor + * Boston, MA 02111-1301 USA + * + */ + +#ifndef _CEPH_CACHE_H +#define _CEPH_CACHE_H + +#ifdef CONFIG_CEPH_FSCACHE + +extern struct fscache_netfs ceph_cache_netfs; + +int ceph_fscache_register(void); +void ceph_fscache_unregister(void); + +int ceph_fscache_register_fs(struct ceph_fs_client* fsc); +void ceph_fscache_unregister_fs(struct ceph_fs_client* fsc); + +void ceph_fscache_inode_init(struct ceph_inode_info *ci); +void ceph_fscache_register_inode_cookie(struct ceph_fs_client* fsc, + struct ceph_inode_info* ci); +void ceph_fscache_unregister_inode_cookie(struct ceph_inode_info* ci); + +int ceph_readpage_from_fscache(struct inode *inode, struct page *page); +int ceph_readpages_from_fscache(struct inode *inode, + struct address_space *mapping, + struct list_head *pages, + unsigned *nr_pages); +void ceph_readpage_to_fscache(struct inode *inode, struct page *page); +void ceph_invalidate_fscache_page(struct inode* inode, struct page *page); +void ceph_queue_revalidate(struct inode *inode); + +static inline void ceph_fscache_invalidate(struct inode *inode) +{ + fscache_invalidate(ceph_inode(inode)->fscache); +} + +static inline void ceph_fscache_uncache_page(struct inode *inode, + struct page *page) +{ + struct ceph_inode_info *ci = ceph_inode(inode); + return fscache_uncache_page(ci->fscache, page); +} + +static inline int ceph_release_fscache_page(struct page *page, gfp_t gfp) +{ + struct inode* inode = page->mapping->host; + struct ceph_inode_info *ci = ceph_inode(inode); + return fscache_maybe_release_page(ci->fscache, page, gfp); +} + +static inline void ceph_fscache_readpages_cancel(struct inode *inode, + struct list_head *pages) +{ + struct ceph_inode_info *ci = ceph_inode(inode); + return fscache_readpages_cancel(ci->fscache, pages); +} + +#else + +static inline int ceph_fscache_register(void) +{ + return 0; +} + +static inline void ceph_fscache_unregister(void) +{ +} + +static inline int ceph_fscache_register_fs(struct ceph_fs_client* fsc) +{ + return 0; +} + +static inline void ceph_fscache_unregister_fs(struct ceph_fs_client* fsc) +{ +} + +static inline void ceph_fscache_inode_init(struct ceph_inode_info *ci) +{ +} + +static inline void ceph_fscache_register_inode_cookie(struct ceph_fs_client* parent_fsc, + struct ceph_inode_info* ci) +{ +} + +static inline void ceph_fscache_uncache_page(struct inode *inode, + struct page *pages) +{ +} + +static inline int ceph_readpage_from_fscache(struct inode* inode, + struct page *page) +{ + return -ENOBUFS; +} + +static inline int ceph_readpages_from_fscache(struct inode *inode, + struct address_space *mapping, + struct list_head *pages, + unsigned *nr_pages) +{ + return -ENOBUFS; +} + +static inline void ceph_readpage_to_fscache(struct inode *inode, + struct page *page) +{ +} + +static inline void ceph_fscache_invalidate(struct inode *inode) +{ +} + +static inline void ceph_invalidate_fscache_page(struct inode *inode, + struct page *page) +{ +} + +static inline void ceph_fscache_unregister_inode_cookie(struct ceph_inode_info* ci) +{ +} + +static inline int ceph_release_fscache_page(struct page *page, gfp_t gfp) +{ + return 1; +} + +static inline void ceph_fscache_readpages_cancel(struct inode *inode, + struct list_head *pages) +{ +} + +static inline void ceph_queue_revalidate(struct inode *inode) +{ +} + +#endif + +#endif diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index da0f9b8a3bcb..13976c33332e 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -10,6 +10,7 @@ #include "super.h" #include "mds_client.h" +#include "cache.h" #include <linux/ceph/decode.h> #include <linux/ceph/messenger.h> @@ -147,7 +148,7 @@ void ceph_adjust_min_caps(struct ceph_mds_client *mdsc, int delta) spin_unlock(&mdsc->caps_list_lock); } -int ceph_reserve_caps(struct ceph_mds_client *mdsc, +void ceph_reserve_caps(struct ceph_mds_client *mdsc, struct ceph_cap_reservation *ctx, int need) { int i; @@ -155,7 +156,6 @@ int ceph_reserve_caps(struct ceph_mds_client *mdsc, int have; int alloc = 0; LIST_HEAD(newcaps); - int ret = 0; dout("reserve caps ctx=%p need=%d\n", ctx, need); @@ -174,14 +174,15 @@ int ceph_reserve_caps(struct ceph_mds_client *mdsc, for (i = have; i < need; i++) { cap = kmem_cache_alloc(ceph_cap_cachep, GFP_NOFS); - if (!cap) { - ret = -ENOMEM; - goto out_alloc_count; - } + if (!cap) + break; list_add(&cap->caps_item, &newcaps); alloc++; } - BUG_ON(have + alloc != need); + /* we didn't manage to reserve as much as we needed */ + if (have + alloc != need) + pr_warn("reserve caps ctx=%p ENOMEM need=%d got=%d\n", + ctx, need, have + alloc); spin_lock(&mdsc->caps_list_lock); mdsc->caps_total_count += alloc; @@ -197,13 +198,6 @@ int ceph_reserve_caps(struct ceph_mds_client *mdsc, dout("reserve caps ctx=%p %d = %d used + %d resv + %d avail\n", ctx, mdsc->caps_total_count, mdsc->caps_use_count, mdsc->caps_reserve_count, mdsc->caps_avail_count); - return 0; - -out_alloc_count: - /* we didn't manage to reserve as much as we needed */ - pr_warning("reserve caps ctx=%p ENOMEM need=%d got=%d\n", - ctx, need, have); - return ret; } int ceph_unreserve_caps(struct ceph_mds_client *mdsc, @@ -486,8 +480,9 @@ static void __check_cap_issue(struct ceph_inode_info *ci, struct ceph_cap *cap, * i_rdcache_gen. */ if ((issued & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) && - (had & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0) + (had & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0) { ci->i_rdcache_gen++; + } /* * if we are newly issued FILE_SHARED, mark dir not complete; we @@ -612,9 +607,11 @@ retry: __cap_delay_requeue(mdsc, ci); } - if (flags & CEPH_CAP_FLAG_AUTH) - ci->i_auth_cap = cap; - else if (ci->i_auth_cap == cap) { + if (flags & CEPH_CAP_FLAG_AUTH) { + if (ci->i_auth_cap == NULL || + ceph_seq_cmp(ci->i_auth_cap->mseq, mseq) < 0) + ci->i_auth_cap = cap; + } else if (ci->i_auth_cap == cap) { ci->i_auth_cap = NULL; spin_lock(&mdsc->cap_dirty_lock); if (!list_empty(&ci->i_dirty_item)) { @@ -695,6 +692,15 @@ int __ceph_caps_issued(struct ceph_inode_info *ci, int *implemented) if (implemented) *implemented |= cap->implemented; } + /* + * exclude caps issued by non-auth MDS, but are been revoking + * by the auth MDS. The non-auth MDS should be revoking/exporting + * these caps, but the message is delayed. + */ + if (ci->i_auth_cap) { + cap = ci->i_auth_cap; + have &= ~cap->implemented | cap->issued; + } return have; } @@ -802,22 +808,28 @@ int __ceph_caps_issued_mask(struct ceph_inode_info *ci, int mask, int touch) /* * Return true if mask caps are currently being revoked by an MDS. */ -int ceph_caps_revoking(struct ceph_inode_info *ci, int mask) +int __ceph_caps_revoking_other(struct ceph_inode_info *ci, + struct ceph_cap *ocap, int mask) { - struct inode *inode = &ci->vfs_inode; struct ceph_cap *cap; struct rb_node *p; - int ret = 0; - spin_lock(&ci->i_ceph_lock); for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) { cap = rb_entry(p, struct ceph_cap, ci_node); - if (__cap_is_valid(cap) && - (cap->implemented & ~cap->issued & mask)) { - ret = 1; - break; - } + if (cap != ocap && __cap_is_valid(cap) && + (cap->implemented & ~cap->issued & mask)) + return 1; } + return 0; +} + +int ceph_caps_revoking(struct ceph_inode_info *ci, int mask) +{ + struct inode *inode = &ci->vfs_inode; + int ret; + + spin_lock(&ci->i_ceph_lock); + ret = __ceph_caps_revoking_other(ci, NULL, mask); spin_unlock(&ci->i_ceph_lock); dout("ceph_caps_revoking %p %s = %d\n", inode, ceph_cap_string(mask), ret); @@ -1980,8 +1992,15 @@ static void kick_flushing_inode_caps(struct ceph_mds_client *mdsc, cap = ci->i_auth_cap; dout("kick_flushing_inode_caps %p flushing %s flush_seq %lld\n", inode, ceph_cap_string(ci->i_flushing_caps), ci->i_cap_flush_seq); + __ceph_flush_snaps(ci, &session, 1); + if (ci->i_flushing_caps) { + spin_lock(&mdsc->cap_dirty_lock); + list_move_tail(&ci->i_flushing_item, + &cap->session->s_cap_flushing); + spin_unlock(&mdsc->cap_dirty_lock); + delayed = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH, __ceph_caps_used(ci), __ceph_caps_wanted(ci), @@ -2055,15 +2074,17 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want, /* finish pending truncate */ while (ci->i_truncate_pending) { spin_unlock(&ci->i_ceph_lock); - __ceph_do_pending_vmtruncate(inode, !(need & CEPH_CAP_FILE_WR)); + __ceph_do_pending_vmtruncate(inode); spin_lock(&ci->i_ceph_lock); } - if (need & CEPH_CAP_FILE_WR) { + have = __ceph_caps_issued(ci, &implemented); + + if (have & need & CEPH_CAP_FILE_WR) { if (endoff >= 0 && endoff > (loff_t)ci->i_max_size) { dout("get_cap_refs %p endoff %llu > maxsize %llu\n", inode, endoff, ci->i_max_size); - if (endoff > ci->i_wanted_max_size) { + if (endoff > ci->i_requested_max_size) { *check_max = 1; ret = 1; } @@ -2078,7 +2099,6 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want, goto out; } } - have = __ceph_caps_issued(ci, &implemented); if ((have & need) == need) { /* @@ -2120,14 +2140,17 @@ static void check_max_size(struct inode *inode, loff_t endoff) /* do we need to explicitly request a larger max_size? */ spin_lock(&ci->i_ceph_lock); - if ((endoff >= ci->i_max_size || - endoff > (inode->i_size << 1)) && - endoff > ci->i_wanted_max_size) { + if (endoff >= ci->i_max_size && endoff > ci->i_wanted_max_size) { dout("write %p at large endoff %llu, req max_size\n", inode, endoff); ci->i_wanted_max_size = endoff; - check = 1; } + /* duplicate ceph_check_caps()'s logic */ + if (ci->i_auth_cap && + (ci->i_auth_cap->issued & CEPH_CAP_FILE_WR) && + ci->i_wanted_max_size > ci->i_max_size && + ci->i_wanted_max_size > ci->i_requested_max_size) + check = 1; spin_unlock(&ci->i_ceph_lock); if (check) ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL); @@ -2313,6 +2336,38 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr, } /* + * Invalidate unlinked inode's aliases, so we can drop the inode ASAP. + */ +static void invalidate_aliases(struct inode *inode) +{ + struct dentry *dn, *prev = NULL; + + dout("invalidate_aliases inode %p\n", inode); + d_prune_aliases(inode); + /* + * For non-directory inode, d_find_alias() only returns + * connected dentry. After calling d_invalidate(), the + * dentry become disconnected. + * + * For directory inode, d_find_alias() can return + * disconnected dentry. But directory inode should have + * one alias at most. + */ + while ((dn = d_find_alias(inode))) { + if (dn == prev) { + dput(dn); + break; + } + d_invalidate(dn); + if (prev) + dput(prev); + prev = dn; + } + if (prev) + dput(prev); +} + +/* * Handle a cap GRANT message from the MDS. (Note that a GRANT may * actually be a revocation if it specifies a smaller cap set.) * @@ -2340,8 +2395,9 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant, int check_caps = 0; int wake = 0; int writeback = 0; - int revoked_rdcache = 0; int queue_invalidate = 0; + int deleted_inode = 0; + int queue_revalidate = 0; dout("handle_cap_grant inode %p cap %p mds%d seq %d %s\n", inode, cap, mds, seq, ceph_cap_string(newcaps)); @@ -2356,9 +2412,7 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant, if (((cap->issued & ~newcaps) & CEPH_CAP_FILE_CACHE) && (newcaps & CEPH_CAP_FILE_LAZYIO) == 0 && !ci->i_wrbuffer_ref) { - if (try_nonblocking_invalidate(inode) == 0) { - revoked_rdcache = 1; - } else { + if (try_nonblocking_invalidate(inode)) { /* there were locked pages.. invalidate later in a separate thread. */ if (ci->i_rdcache_revoking != ci->i_rdcache_gen) { @@ -2366,6 +2420,8 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant, ci->i_rdcache_revoking = ci->i_rdcache_gen; } } + + ceph_fscache_invalidate(inode); } /* side effects now are allowed */ @@ -2386,8 +2442,12 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant, from_kgid(&init_user_ns, inode->i_gid)); } - if ((issued & CEPH_CAP_LINK_EXCL) == 0) + if ((issued & CEPH_CAP_LINK_EXCL) == 0) { set_nlink(inode, le32_to_cpu(grant->nlink)); + if (inode->i_nlink == 0 && + (newcaps & (CEPH_CAP_LINK_SHARED | CEPH_CAP_LINK_EXCL))) + deleted_inode = 1; + } if ((issued & CEPH_CAP_XATTR_EXCL) == 0 && grant->xattr_len) { int len = le32_to_cpu(grant->xattr_len); @@ -2403,6 +2463,11 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant, } } + /* Do we need to revalidate our fscache cookie. Don't bother on the + * first cache cap as we already validate at cookie creation time. */ + if ((issued & CEPH_CAP_FILE_CACHE) && ci->i_rdcache_gen > 1) + queue_revalidate = 1; + /* size/ctime/mtime/atime? */ ceph_fill_file_size(inode, issued, le32_to_cpu(grant->truncate_seq), @@ -2473,6 +2538,11 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant, } else { dout("grant: %s -> %s\n", ceph_cap_string(cap->issued), ceph_cap_string(newcaps)); + /* non-auth MDS is revoking the newly grant caps ? */ + if (cap == ci->i_auth_cap && + __ceph_caps_revoking_other(ci, cap, newcaps)) + check_caps = 2; + cap->issued = newcaps; cap->implemented |= newcaps; /* add bits only, to * avoid stepping on a @@ -2482,6 +2552,7 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant, BUG_ON(cap->issued & ~cap->implemented); spin_unlock(&ci->i_ceph_lock); + if (writeback) /* * queue inode for writeback: we can't actually call @@ -2491,6 +2562,10 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant, ceph_queue_writeback(inode); if (queue_invalidate) ceph_queue_invalidate(inode); + if (deleted_inode) + invalidate_aliases(inode); + if (queue_revalidate) + ceph_queue_revalidate(inode); if (wake) wake_up_all(&ci->i_cap_wq); @@ -2647,8 +2722,10 @@ static void handle_cap_trunc(struct inode *inode, truncate_seq, truncate_size, size); spin_unlock(&ci->i_ceph_lock); - if (queue_trunc) + if (queue_trunc) { ceph_queue_vmtruncate(inode); + ceph_fscache_invalidate(inode); + } } /* @@ -3042,21 +3119,19 @@ int ceph_encode_inode_release(void **p, struct inode *inode, (cap->issued & unless) == 0)) { if ((cap->issued & drop) && (cap->issued & unless) == 0) { - dout("encode_inode_release %p cap %p %s -> " - "%s\n", inode, cap, + int wanted = __ceph_caps_wanted(ci); + if ((ci->i_ceph_flags & CEPH_I_NODELAY) == 0) + wanted |= cap->mds_wanted; + dout("encode_inode_release %p cap %p " + "%s -> %s, wanted %s -> %s\n", inode, cap, ceph_cap_string(cap->issued), - ceph_cap_string(cap->issued & ~drop)); + ceph_cap_string(cap->issued & ~drop), + ceph_cap_string(cap->mds_wanted), + ceph_cap_string(wanted)); + cap->issued &= ~drop; cap->implemented &= ~drop; - if (ci->i_ceph_flags & CEPH_I_NODELAY) { - int wanted = __ceph_caps_wanted(ci); - dout(" wanted %s -> %s (act %s)\n", - ceph_cap_string(cap->mds_wanted), - ceph_cap_string(cap->mds_wanted & - ~wanted), - ceph_cap_string(wanted)); - cap->mds_wanted &= wanted; - } + cap->mds_wanted = wanted; } else { dout("encode_inode_release %p cap %p %s" " (force)\n", inode, cap, diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c index f02d82b7933e..868b61d56cac 100644 --- a/fs/ceph/dir.c +++ b/fs/ceph/dir.c @@ -111,11 +111,10 @@ static unsigned fpos_off(loff_t p) * defined IFF we hold CEPH_CAP_FILE_SHARED (which will be revoked by * the MDS if/when the directory is modified). */ -static int __dcache_readdir(struct file *filp, - void *dirent, filldir_t filldir) +static int __dcache_readdir(struct file *file, struct dir_context *ctx) { - struct ceph_file_info *fi = filp->private_data; - struct dentry *parent = filp->f_dentry; + struct ceph_file_info *fi = file->private_data; + struct dentry *parent = file->f_dentry; struct inode *dir = parent->d_inode; struct list_head *p; struct dentry *dentry, *last; @@ -126,14 +125,14 @@ static int __dcache_readdir(struct file *filp, last = fi->dentry; fi->dentry = NULL; - dout("__dcache_readdir %p at %llu (last %p)\n", dir, filp->f_pos, + dout("__dcache_readdir %p at %llu (last %p)\n", dir, ctx->pos, last); spin_lock(&parent->d_lock); /* start at beginning? */ - if (filp->f_pos == 2 || last == NULL || - filp->f_pos < ceph_dentry(last)->offset) { + if (ctx->pos == 2 || last == NULL || + ctx->pos < ceph_dentry(last)->offset) { if (list_empty(&parent->d_subdirs)) goto out_unlock; p = parent->d_subdirs.prev; @@ -157,11 +156,11 @@ more: if (!d_unhashed(dentry) && dentry->d_inode && ceph_snap(dentry->d_inode) != CEPH_SNAPDIR && ceph_ino(dentry->d_inode) != CEPH_INO_CEPH && - filp->f_pos <= di->offset) + ctx->pos <= di->offset) break; dout(" skipping %p %.*s at %llu (%llu)%s%s\n", dentry, dentry->d_name.len, dentry->d_name.name, di->offset, - filp->f_pos, d_unhashed(dentry) ? " unhashed" : "", + ctx->pos, d_unhashed(dentry) ? " unhashed" : "", !dentry->d_inode ? " null" : ""); spin_unlock(&dentry->d_lock); p = p->prev; @@ -173,29 +172,27 @@ more: spin_unlock(&dentry->d_lock); spin_unlock(&parent->d_lock); - dout(" %llu (%llu) dentry %p %.*s %p\n", di->offset, filp->f_pos, + dout(" %llu (%llu) dentry %p %.*s %p\n", di->offset, ctx->pos, dentry, dentry->d_name.len, dentry->d_name.name, dentry->d_inode); - filp->f_pos = di->offset; - err = filldir(dirent, dentry->d_name.name, - dentry->d_name.len, di->offset, + ctx->pos = di->offset; + if (!dir_emit(ctx, dentry->d_name.name, + dentry->d_name.len, ceph_translate_ino(dentry->d_sb, dentry->d_inode->i_ino), - dentry->d_inode->i_mode >> 12); - - if (last) { - if (err < 0) { + dentry->d_inode->i_mode >> 12)) { + if (last) { /* remember our position */ fi->dentry = last; fi->next_offset = di->offset; - } else { - dput(last); } + dput(dentry); + return 0; } - last = dentry; - if (err < 0) - goto out; + if (last) + dput(last); + last = dentry; - filp->f_pos++; + ctx->pos++; /* make sure a dentry wasn't dropped while we didn't have parent lock */ if (!ceph_dir_is_complete(dir)) { @@ -235,59 +232,59 @@ static int note_last_dentry(struct ceph_file_info *fi, const char *name, return 0; } -static int ceph_readdir(struct file *filp, void *dirent, filldir_t filldir) +static int ceph_readdir(struct file *file, struct dir_context *ctx) { - struct ceph_file_info *fi = filp->private_data; - struct inode *inode = file_inode(filp); + struct ceph_file_info *fi = file->private_data; + struct inode *inode = file_inode(file); struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_fs_client *fsc = ceph_inode_to_client(inode); struct ceph_mds_client *mdsc = fsc->mdsc; - unsigned frag = fpos_frag(filp->f_pos); - int off = fpos_off(filp->f_pos); + unsigned frag = fpos_frag(ctx->pos); + int off = fpos_off(ctx->pos); int err; u32 ftype; struct ceph_mds_reply_info_parsed *rinfo; const int max_entries = fsc->mount_options->max_readdir; const int max_bytes = fsc->mount_options->max_readdir_bytes; - dout("readdir %p filp %p frag %u off %u\n", inode, filp, frag, off); + dout("readdir %p file %p frag %u off %u\n", inode, file, frag, off); if (fi->flags & CEPH_F_ATEND) return 0; /* always start with . and .. */ - if (filp->f_pos == 0) { + if (ctx->pos == 0) { /* note dir version at start of readdir so we can tell * if any dentries get dropped */ fi->dir_release_count = atomic_read(&ci->i_release_count); dout("readdir off 0 -> '.'\n"); - if (filldir(dirent, ".", 1, ceph_make_fpos(0, 0), + if (!dir_emit(ctx, ".", 1, ceph_translate_ino(inode->i_sb, inode->i_ino), - inode->i_mode >> 12) < 0) + inode->i_mode >> 12)) return 0; - filp->f_pos = 1; + ctx->pos = 1; off = 1; } - if (filp->f_pos == 1) { - ino_t ino = parent_ino(filp->f_dentry); + if (ctx->pos == 1) { + ino_t ino = parent_ino(file->f_dentry); dout("readdir off 1 -> '..'\n"); - if (filldir(dirent, "..", 2, ceph_make_fpos(0, 1), + if (!dir_emit(ctx, "..", 2, ceph_translate_ino(inode->i_sb, ino), - inode->i_mode >> 12) < 0) + inode->i_mode >> 12)) return 0; - filp->f_pos = 2; + ctx->pos = 2; off = 2; } /* can we use the dcache? */ spin_lock(&ci->i_ceph_lock); - if ((filp->f_pos == 2 || fi->dentry) && + if ((ctx->pos == 2 || fi->dentry) && !ceph_test_mount_opt(fsc, NOASYNCREADDIR) && ceph_snap(inode) != CEPH_SNAPDIR && __ceph_dir_is_complete(ci) && __ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1)) { spin_unlock(&ci->i_ceph_lock); - err = __dcache_readdir(filp, dirent, filldir); + err = __dcache_readdir(file, ctx); if (err != -EAGAIN) return err; } else { @@ -327,7 +324,7 @@ more: return PTR_ERR(req); req->r_inode = inode; ihold(inode); - req->r_dentry = dget(filp->f_dentry); + req->r_dentry = dget(file->f_dentry); /* hints to request -> mds selection code */ req->r_direct_mode = USE_AUTH_MDS; req->r_direct_hash = ceph_frag_value(frag); @@ -379,15 +376,16 @@ more: rinfo = &fi->last_readdir->r_reply_info; dout("readdir frag %x num %d off %d chunkoff %d\n", frag, rinfo->dir_nr, off, fi->offset); + + ctx->pos = ceph_make_fpos(frag, off); while (off >= fi->offset && off - fi->offset < rinfo->dir_nr) { - u64 pos = ceph_make_fpos(frag, off); struct ceph_mds_reply_inode *in = rinfo->dir_in[off - fi->offset].in; struct ceph_vino vino; ino_t ino; dout("readdir off %d (%d/%d) -> %lld '%.*s' %p\n", - off, off - fi->offset, rinfo->dir_nr, pos, + off, off - fi->offset, rinfo->dir_nr, ctx->pos, rinfo->dir_dname_len[off - fi->offset], rinfo->dir_dname[off - fi->offset], in); BUG_ON(!in); @@ -395,16 +393,15 @@ more: vino.ino = le64_to_cpu(in->ino); vino.snap = le64_to_cpu(in->snapid); ino = ceph_vino_to_ino(vino); - if (filldir(dirent, + if (!dir_emit(ctx, rinfo->dir_dname[off - fi->offset], rinfo->dir_dname_len[off - fi->offset], - pos, - ceph_translate_ino(inode->i_sb, ino), ftype) < 0) { + ceph_translate_ino(inode->i_sb, ino), ftype)) { dout("filldir stopping us...\n"); return 0; } off++; - filp->f_pos = pos + 1; + ctx->pos++; } if (fi->last_name) { @@ -417,7 +414,7 @@ more: if (!ceph_frag_is_rightmost(frag)) { frag = ceph_frag_next(frag); off = 0; - filp->f_pos = ceph_make_fpos(frag, off); + ctx->pos = ceph_make_fpos(frag, off); dout("readdir next frag is %x\n", frag); goto more; } @@ -432,11 +429,11 @@ more: if (atomic_read(&ci->i_release_count) == fi->dir_release_count) { dout(" marking %p complete\n", inode); __ceph_dir_set_complete(ci, fi->dir_release_count); - ci->i_max_offset = filp->f_pos; + ci->i_max_offset = ctx->pos; } spin_unlock(&ci->i_ceph_lock); - dout("readdir %p filp %p done.\n", inode, filp); + dout("readdir %p file %p done.\n", inode, file); return 0; } @@ -796,6 +793,8 @@ static int ceph_link(struct dentry *old_dentry, struct inode *dir, req->r_locked_dir = dir; req->r_dentry_drop = CEPH_CAP_FILE_SHARED; req->r_dentry_unless = CEPH_CAP_FILE_EXCL; + /* release LINK_SHARED on source inode (mds will lock it) */ + req->r_old_inode_drop = CEPH_CAP_LINK_SHARED; err = ceph_mdsc_do_request(mdsc, dir, req); if (err) { d_drop(dentry); @@ -1268,7 +1267,7 @@ unsigned ceph_dentry_hash(struct inode *dir, struct dentry *dn) const struct file_operations ceph_dir_fops = { .read = ceph_read_dir, - .readdir = ceph_readdir, + .iterate = ceph_readdir, .llseek = ceph_dir_llseek, .open = ceph_open, .release = ceph_release, diff --git a/fs/ceph/file.c b/fs/ceph/file.c index 656e16907430..3de89829e2a1 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -8,9 +8,11 @@ #include <linux/namei.h> #include <linux/writeback.h> #include <linux/aio.h> +#include <linux/falloc.h> #include "super.h" #include "mds_client.h" +#include "cache.h" /* * Ceph file operations @@ -68,9 +70,23 @@ static int ceph_init_file(struct inode *inode, struct file *file, int fmode) { struct ceph_file_info *cf; int ret = 0; + struct ceph_inode_info *ci = ceph_inode(inode); + struct ceph_fs_client *fsc = ceph_sb_to_client(inode->i_sb); + struct ceph_mds_client *mdsc = fsc->mdsc; switch (inode->i_mode & S_IFMT) { case S_IFREG: + /* First file open request creates the cookie, we want to keep + * this cookie around for the filetime of the inode as not to + * have to worry about fscache register / revoke / operation + * races. + * + * Also, if we know the operation is going to invalidate data + * (non readonly) just nuke the cache right away. + */ + ceph_fscache_register_inode_cookie(mdsc->fsc, ci); + if ((fmode & CEPH_FILE_MODE_WR)) + ceph_fscache_invalidate(inode); case S_IFDIR: dout("init_file %p %p 0%o (regular)\n", inode, file, inode->i_mode); @@ -181,6 +197,7 @@ int ceph_open(struct inode *inode, struct file *file) spin_unlock(&ci->i_ceph_lock); return ceph_init_file(inode, file, fmode); } + spin_unlock(&ci->i_ceph_lock); dout("open fmode %d wants %s\n", fmode, ceph_cap_string(wanted)); @@ -191,6 +208,7 @@ int ceph_open(struct inode *inode, struct file *file) } req->r_inode = inode; ihold(inode); + req->r_num_caps = 1; if (flags & (O_CREAT|O_TRUNC)) parent_inode = ceph_get_dentry_parent_inode(file->f_dentry); @@ -313,9 +331,9 @@ static int striped_read(struct inode *inode, { struct ceph_fs_client *fsc = ceph_inode_to_client(inode); struct ceph_inode_info *ci = ceph_inode(inode); - u64 pos, this_len; + u64 pos, this_len, left; int io_align, page_align; - int left, pages_left; + int pages_left; int read; struct page **page_pos; int ret; @@ -346,47 +364,40 @@ more: ret = 0; hit_stripe = this_len < left; was_short = ret >= 0 && ret < this_len; - dout("striped_read %llu~%u (read %u) got %d%s%s\n", pos, left, read, + dout("striped_read %llu~%llu (read %u) got %d%s%s\n", pos, left, read, ret, hit_stripe ? " HITSTRIPE" : "", was_short ? " SHORT" : ""); - if (ret > 0) { - int didpages = (page_align + ret) >> PAGE_CACHE_SHIFT; - - if (read < pos - off) { - dout(" zero gap %llu to %llu\n", off + read, pos); - ceph_zero_page_vector_range(page_align + read, - pos - off - read, pages); + if (ret >= 0) { + int didpages; + if (was_short && (pos + ret < inode->i_size)) { + u64 tmp = min(this_len - ret, + inode->i_size - pos - ret); + dout(" zero gap %llu to %llu\n", + pos + ret, pos + ret + tmp); + ceph_zero_page_vector_range(page_align + read + ret, + tmp, pages); + ret += tmp; } + + didpages = (page_align + ret) >> PAGE_CACHE_SHIFT; pos += ret; read = pos - off; left -= ret; page_pos += didpages; pages_left -= didpages; - /* hit stripe? */ - if (left && hit_stripe) + /* hit stripe and need continue*/ + if (left && hit_stripe && pos < inode->i_size) goto more; } - if (was_short) { + if (read > 0) { + ret = read; /* did we bounce off eof? */ if (pos + left > inode->i_size) *checkeof = 1; - - /* zero trailing bytes (inside i_size) */ - if (left > 0 && pos < inode->i_size) { - if (pos + left > inode->i_size) - left = inode->i_size - pos; - - dout("zero tail %d\n", left); - ceph_zero_page_vector_range(page_align + read, left, - pages); - read += left; - } } - if (ret >= 0) - ret = read; dout("striped_read returns %d\n", ret); return ret; } @@ -618,6 +629,8 @@ out: if (check_caps) ceph_check_caps(ceph_inode(inode), CHECK_CAPS_AUTHONLY, NULL); + } else if (ret != -EOLDSNAPC && written > 0) { + ret = written; } return ret; } @@ -659,7 +672,6 @@ again: if ((got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0 || (iocb->ki_filp->f_flags & O_DIRECT) || - (inode->i_sb->s_flags & MS_SYNCHRONOUS) || (fi->flags & CEPH_F_SYNC)) /* hmm, this isn't really async... */ ret = ceph_sync_read(filp, base, len, ppos, &checkeof); @@ -711,14 +723,11 @@ static ssize_t ceph_aio_write(struct kiocb *iocb, const struct iovec *iov, &ceph_sb_to_client(inode->i_sb)->client->osdc; ssize_t count, written = 0; int err, want, got; - bool hold_mutex; if (ceph_snap(inode) != CEPH_NOSNAP) return -EROFS; - sb_start_write(inode->i_sb); mutex_lock(&inode->i_mutex); - hold_mutex = true; err = generic_segment_checks(iov, &nr_segs, &count, VERIFY_READ); if (err) @@ -764,18 +773,31 @@ retry_snap: if ((got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO)) == 0 || (iocb->ki_filp->f_flags & O_DIRECT) || - (inode->i_sb->s_flags & MS_SYNCHRONOUS) || (fi->flags & CEPH_F_SYNC)) { mutex_unlock(&inode->i_mutex); written = ceph_sync_write(file, iov->iov_base, count, pos, &iocb->ki_pos); + if (written == -EOLDSNAPC) { + dout("aio_write %p %llx.%llx %llu~%u" + "got EOLDSNAPC, retrying\n", + inode, ceph_vinop(inode), + pos, (unsigned)iov->iov_len); + mutex_lock(&inode->i_mutex); + goto retry_snap; + } } else { + /* + * No need to acquire the i_truncate_mutex. Because + * the MDS revokes Fwb caps before sending truncate + * message to us. We can't get Fwb cap while there + * are pending vmtruncate. So write and vmtruncate + * can not run at the same time + */ written = generic_file_buffered_write(iocb, iov, nr_segs, pos, &iocb->ki_pos, count, 0); mutex_unlock(&inode->i_mutex); } - hold_mutex = false; if (written >= 0) { int dirty; @@ -799,19 +821,12 @@ retry_snap: written = err; } - if (written == -EOLDSNAPC) { - dout("aio_write %p %llx.%llx %llu~%u got EOLDSNAPC, retrying\n", - inode, ceph_vinop(inode), pos, (unsigned)iov->iov_len); - mutex_lock(&inode->i_mutex); - hold_mutex = true; - goto retry_snap; - } + goto out_unlocked; + out: - if (hold_mutex) - mutex_unlock(&inode->i_mutex); - sb_end_write(inode->i_sb); + mutex_unlock(&inode->i_mutex); +out_unlocked: current->backing_dev_info = NULL; - return written ? written : err; } @@ -824,7 +839,6 @@ static loff_t ceph_llseek(struct file *file, loff_t offset, int whence) int ret; mutex_lock(&inode->i_mutex); - __ceph_do_pending_vmtruncate(inode, false); if (whence == SEEK_END || whence == SEEK_DATA || whence == SEEK_HOLE) { ret = ceph_do_getattr(inode, CEPH_STAT_CAP_SIZE); @@ -866,20 +880,209 @@ static loff_t ceph_llseek(struct file *file, loff_t offset, int whence) break; } - if (offset < 0 || offset > inode->i_sb->s_maxbytes) { - offset = -EINVAL; + offset = vfs_setpos(file, offset, inode->i_sb->s_maxbytes); + +out: + mutex_unlock(&inode->i_mutex); + return offset; +} + +static inline void ceph_zero_partial_page( + struct inode *inode, loff_t offset, unsigned size) +{ + struct page *page; + pgoff_t index = offset >> PAGE_CACHE_SHIFT; + + page = find_lock_page(inode->i_mapping, index); + if (page) { + wait_on_page_writeback(page); + zero_user(page, offset & (PAGE_CACHE_SIZE - 1), size); + unlock_page(page); + page_cache_release(page); + } +} + +static void ceph_zero_pagecache_range(struct inode *inode, loff_t offset, + loff_t length) +{ + loff_t nearly = round_up(offset, PAGE_CACHE_SIZE); + if (offset < nearly) { + loff_t size = nearly - offset; + if (length < size) + size = length; + ceph_zero_partial_page(inode, offset, size); + offset += size; + length -= size; + } + if (length >= PAGE_CACHE_SIZE) { + loff_t size = round_down(length, PAGE_CACHE_SIZE); + truncate_pagecache_range(inode, offset, offset + size - 1); + offset += size; + length -= size; + } + if (length) + ceph_zero_partial_page(inode, offset, length); +} + +static int ceph_zero_partial_object(struct inode *inode, + loff_t offset, loff_t *length) +{ + struct ceph_inode_info *ci = ceph_inode(inode); + struct ceph_fs_client *fsc = ceph_inode_to_client(inode); + struct ceph_osd_request *req; + int ret = 0; + loff_t zero = 0; + int op; + + if (!length) { + op = offset ? CEPH_OSD_OP_DELETE : CEPH_OSD_OP_TRUNCATE; + length = &zero; + } else { + op = CEPH_OSD_OP_ZERO; + } + + req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, + ceph_vino(inode), + offset, length, + 1, op, + CEPH_OSD_FLAG_WRITE | + CEPH_OSD_FLAG_ONDISK, + NULL, 0, 0, false); + if (IS_ERR(req)) { + ret = PTR_ERR(req); goto out; } - /* Special lock needed here? */ - if (offset != file->f_pos) { - file->f_pos = offset; - file->f_version = 0; + ceph_osdc_build_request(req, offset, NULL, ceph_vino(inode).snap, + &inode->i_mtime); + + ret = ceph_osdc_start_request(&fsc->client->osdc, req, false); + if (!ret) { + ret = ceph_osdc_wait_request(&fsc->client->osdc, req); + if (ret == -ENOENT) + ret = 0; } + ceph_osdc_put_request(req); out: + return ret; +} + +static int ceph_zero_objects(struct inode *inode, loff_t offset, loff_t length) +{ + int ret = 0; + struct ceph_inode_info *ci = ceph_inode(inode); + s32 stripe_unit = ceph_file_layout_su(ci->i_layout); + s32 stripe_count = ceph_file_layout_stripe_count(ci->i_layout); + s32 object_size = ceph_file_layout_object_size(ci->i_layout); + u64 object_set_size = object_size * stripe_count; + u64 nearly, t; + + /* round offset up to next period boundary */ + nearly = offset + object_set_size - 1; + t = nearly; + nearly -= do_div(t, object_set_size); + + while (length && offset < nearly) { + loff_t size = length; + ret = ceph_zero_partial_object(inode, offset, &size); + if (ret < 0) + return ret; + offset += size; + length -= size; + } + while (length >= object_set_size) { + int i; + loff_t pos = offset; + for (i = 0; i < stripe_count; ++i) { + ret = ceph_zero_partial_object(inode, pos, NULL); + if (ret < 0) + return ret; + pos += stripe_unit; + } + offset += object_set_size; + length -= object_set_size; + } + while (length) { + loff_t size = length; + ret = ceph_zero_partial_object(inode, offset, &size); + if (ret < 0) + return ret; + offset += size; + length -= size; + } + return ret; +} + +static long ceph_fallocate(struct file *file, int mode, + loff_t offset, loff_t length) +{ + struct ceph_file_info *fi = file->private_data; + struct inode *inode = file->f_dentry->d_inode; + struct ceph_inode_info *ci = ceph_inode(inode); + struct ceph_osd_client *osdc = + &ceph_inode_to_client(inode)->client->osdc; + int want, got = 0; + int dirty; + int ret = 0; + loff_t endoff = 0; + loff_t size; + + if (!S_ISREG(inode->i_mode)) + return -EOPNOTSUPP; + + if (IS_SWAPFILE(inode)) + return -ETXTBSY; + + mutex_lock(&inode->i_mutex); + + if (ceph_snap(inode) != CEPH_NOSNAP) { + ret = -EROFS; + goto unlock; + } + + if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL) && + !(mode & FALLOC_FL_PUNCH_HOLE)) { + ret = -ENOSPC; + goto unlock; + } + + size = i_size_read(inode); + if (!(mode & FALLOC_FL_KEEP_SIZE)) + endoff = offset + length; + + if (fi->fmode & CEPH_FILE_MODE_LAZY) + want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO; + else + want = CEPH_CAP_FILE_BUFFER; + + ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, endoff); + if (ret < 0) + goto unlock; + + if (mode & FALLOC_FL_PUNCH_HOLE) { + if (offset < size) + ceph_zero_pagecache_range(inode, offset, length); + ret = ceph_zero_objects(inode, offset, length); + } else if (endoff > size) { + truncate_pagecache_range(inode, size, -1); + if (ceph_inode_set_size(inode, endoff)) + ceph_check_caps(ceph_inode(inode), + CHECK_CAPS_AUTHONLY, NULL); + } + + if (!ret) { + spin_lock(&ci->i_ceph_lock); + dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR); + spin_unlock(&ci->i_ceph_lock); + if (dirty) + __mark_inode_dirty(inode, dirty); + } + + ceph_put_cap_refs(ci, got); +unlock: mutex_unlock(&inode->i_mutex); - return offset; + return ret; } const struct file_operations ceph_file_fops = { @@ -898,5 +1101,6 @@ const struct file_operations ceph_file_fops = { .splice_write = generic_file_splice_write, .unlocked_ioctl = ceph_ioctl, .compat_ioctl = ceph_ioctl, + .fallocate = ceph_fallocate, }; diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index be0f7e20d62e..8549a48115f7 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -12,6 +12,7 @@ #include "super.h" #include "mds_client.h" +#include "cache.h" #include <linux/ceph/decode.h> /* @@ -344,6 +345,7 @@ struct inode *ceph_alloc_inode(struct super_block *sb) for (i = 0; i < CEPH_FILE_MODE_NUM; i++) ci->i_nr_by_mode[i] = 0; + mutex_init(&ci->i_truncate_mutex); ci->i_truncate_seq = 0; ci->i_truncate_size = 0; ci->i_truncate_pending = 0; @@ -377,6 +379,8 @@ struct inode *ceph_alloc_inode(struct super_block *sb) INIT_WORK(&ci->i_vmtruncate_work, ceph_vmtruncate_work); + ceph_fscache_inode_init(ci); + return &ci->vfs_inode; } @@ -396,6 +400,8 @@ void ceph_destroy_inode(struct inode *inode) dout("destroy_inode %p ino %llx.%llx\n", inode, ceph_vinop(inode)); + ceph_fscache_unregister_inode_cookie(ci); + ceph_queue_caps_release(inode); /* @@ -430,7 +436,6 @@ void ceph_destroy_inode(struct inode *inode) call_rcu(&inode->i_rcu, ceph_i_callback); } - /* * Helpers to fill in size, ctime, mtime, and atime. We have to be * careful because either the client or MDS may have more up to date @@ -455,16 +460,20 @@ int ceph_fill_file_size(struct inode *inode, int issued, dout("truncate_seq %u -> %u\n", ci->i_truncate_seq, truncate_seq); ci->i_truncate_seq = truncate_seq; + + /* the MDS should have revoked these caps */ + WARN_ON_ONCE(issued & (CEPH_CAP_FILE_EXCL | + CEPH_CAP_FILE_RD | + CEPH_CAP_FILE_WR | + CEPH_CAP_FILE_LAZYIO)); /* * If we hold relevant caps, or in the case where we're * not the only client referencing this file and we * don't hold those caps, then we need to check whether * the file is either opened or mmaped */ - if ((issued & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_RD| - CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER| - CEPH_CAP_FILE_EXCL| - CEPH_CAP_FILE_LAZYIO)) || + if ((issued & (CEPH_CAP_FILE_CACHE| + CEPH_CAP_FILE_BUFFER)) || mapping_mapped(inode->i_mapping) || __ceph_caps_file_wanted(ci)) { ci->i_truncate_pending++; @@ -478,6 +487,10 @@ int ceph_fill_file_size(struct inode *inode, int issued, truncate_size); ci->i_truncate_size = truncate_size; } + + if (queue_trunc) + ceph_fscache_invalidate(inode); + return queue_trunc; } @@ -903,8 +916,8 @@ static struct dentry *splice_dentry(struct dentry *dn, struct inode *in, } else if (realdn) { dout("dn %p (%d) spliced with %p (%d) " "inode %p ino %llx.%llx\n", - dn, dn->d_count, - realdn, realdn->d_count, + dn, d_count(dn), + realdn, d_count(realdn), realdn->d_inode, ceph_vinop(realdn->d_inode)); dput(dn); dn = realdn; @@ -1066,7 +1079,7 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req, * complete. */ ceph_set_dentry_offset(req->r_old_dentry); - dout("dn %p gets new offset %lld\n", req->r_old_dentry, + dout("dn %p gets new offset %lld\n", req->r_old_dentry, ceph_dentry(req->r_old_dentry)->offset); dn = req->r_old_dentry; /* use old_dentry */ @@ -1419,18 +1432,20 @@ static void ceph_invalidate_work(struct work_struct *work) u32 orig_gen; int check = 0; + mutex_lock(&ci->i_truncate_mutex); spin_lock(&ci->i_ceph_lock); dout("invalidate_pages %p gen %d revoking %d\n", inode, ci->i_rdcache_gen, ci->i_rdcache_revoking); if (ci->i_rdcache_revoking != ci->i_rdcache_gen) { /* nevermind! */ spin_unlock(&ci->i_ceph_lock); + mutex_unlock(&ci->i_truncate_mutex); goto out; } orig_gen = ci->i_rdcache_gen; spin_unlock(&ci->i_ceph_lock); - truncate_inode_pages(&inode->i_data, 0); + truncate_inode_pages(inode->i_mapping, 0); spin_lock(&ci->i_ceph_lock); if (orig_gen == ci->i_rdcache_gen && @@ -1445,6 +1460,7 @@ static void ceph_invalidate_work(struct work_struct *work) ci->i_rdcache_revoking); } spin_unlock(&ci->i_ceph_lock); + mutex_unlock(&ci->i_truncate_mutex); if (check) ceph_check_caps(ci, 0, NULL); @@ -1465,7 +1481,7 @@ static void ceph_vmtruncate_work(struct work_struct *work) struct inode *inode = &ci->vfs_inode; dout("vmtruncate_work %p\n", inode); - __ceph_do_pending_vmtruncate(inode, true); + __ceph_do_pending_vmtruncate(inode); iput(inode); } @@ -1478,6 +1494,7 @@ void ceph_queue_vmtruncate(struct inode *inode) struct ceph_inode_info *ci = ceph_inode(inode); ihold(inode); + if (queue_work(ceph_sb_to_client(inode->i_sb)->trunc_wq, &ci->i_vmtruncate_work)) { dout("ceph_queue_vmtruncate %p\n", inode); @@ -1492,17 +1509,19 @@ void ceph_queue_vmtruncate(struct inode *inode) * Make sure any pending truncation is applied before doing anything * that may depend on it. */ -void __ceph_do_pending_vmtruncate(struct inode *inode, bool needlock) +void __ceph_do_pending_vmtruncate(struct inode *inode) { struct ceph_inode_info *ci = ceph_inode(inode); u64 to; int wrbuffer_refs, finish = 0; + mutex_lock(&ci->i_truncate_mutex); retry: spin_lock(&ci->i_ceph_lock); if (ci->i_truncate_pending == 0) { dout("__do_pending_vmtruncate %p none pending\n", inode); spin_unlock(&ci->i_ceph_lock); + mutex_unlock(&ci->i_truncate_mutex); return; } @@ -1519,17 +1538,16 @@ retry: goto retry; } + /* there should be no reader or writer */ + WARN_ON_ONCE(ci->i_rd_ref || ci->i_wr_ref); + to = ci->i_truncate_size; wrbuffer_refs = ci->i_wrbuffer_ref; dout("__do_pending_vmtruncate %p (%d) to %lld\n", inode, ci->i_truncate_pending, to); spin_unlock(&ci->i_ceph_lock); - if (needlock) - mutex_lock(&inode->i_mutex); truncate_inode_pages(inode->i_mapping, to); - if (needlock) - mutex_unlock(&inode->i_mutex); spin_lock(&ci->i_ceph_lock); if (to == ci->i_truncate_size) { @@ -1540,13 +1558,14 @@ retry: if (!finish) goto retry; + mutex_unlock(&ci->i_truncate_mutex); + if (wrbuffer_refs == 0) ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL); wake_up_all(&ci->i_cap_wq); } - /* * symlinks */ @@ -1588,8 +1607,6 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr) if (ceph_snap(inode) != CEPH_NOSNAP) return -EROFS; - __ceph_do_pending_vmtruncate(inode, false); - err = inode_change_ok(inode, attr); if (err != 0) return err; @@ -1770,7 +1787,8 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr) ceph_cap_string(dirtied), mask); ceph_mdsc_put_request(req); - __ceph_do_pending_vmtruncate(inode, false); + if (mask & CEPH_SETATTR_SIZE) + __ceph_do_pending_vmtruncate(inode); return err; out: spin_unlock(&ci->i_ceph_lock); diff --git a/fs/ceph/ioctl.c b/fs/ceph/ioctl.c index e0b4ef31d3c8..669622fd1ae3 100644 --- a/fs/ceph/ioctl.c +++ b/fs/ceph/ioctl.c @@ -196,8 +196,10 @@ static long ceph_ioctl_get_dataloc(struct file *file, void __user *arg) r = ceph_calc_file_object_mapping(&ci->i_layout, dl.file_offset, len, &dl.object_no, &dl.object_offset, &olen); - if (r < 0) + if (r < 0) { + up_read(&osdc->map_sem); return -EIO; + } dl.file_offset -= dl.object_offset; dl.object_size = ceph_file_layout_object_size(ci->i_layout); dl.block_size = ceph_file_layout_su(ci->i_layout); @@ -209,8 +211,12 @@ static long ceph_ioctl_get_dataloc(struct file *file, void __user *arg) snprintf(dl.object_name, sizeof(dl.object_name), "%llx.%08llx", ceph_ino(inode), dl.object_no); - ceph_calc_ceph_pg(&pgid, dl.object_name, osdc->osdmap, - ceph_file_layout_pg_pool(ci->i_layout)); + r = ceph_calc_ceph_pg(&pgid, dl.object_name, osdc->osdmap, + ceph_file_layout_pg_pool(ci->i_layout)); + if (r < 0) { + up_read(&osdc->map_sem); + return r; + } dl.osd = ceph_calc_pg_primary(osdc->osdmap, pgid); if (dl.osd >= 0) { diff --git a/fs/ceph/locks.c b/fs/ceph/locks.c index ebbf680378e2..ae6d14e82b0f 100644 --- a/fs/ceph/locks.c +++ b/fs/ceph/locks.c @@ -169,7 +169,7 @@ int ceph_flock(struct file *file, int cmd, struct file_lock *fl) } /** - * Must be called with BKL already held. Fills in the passed + * Must be called with lock_flocks() already held. Fills in the passed * counter variables, so you can prepare pagelist metadata before calling * ceph_encode_locks. */ @@ -192,7 +192,7 @@ void ceph_count_locks(struct inode *inode, int *fcntl_count, int *flock_count) /** * Encode the flock and fcntl locks for the given inode into the ceph_filelock - * array. Must be called with lock_flocks() already held. + * array. Must be called with inode->i_lock already held. * If we encounter more of a specific lock type than expected, return -ENOSPC. */ int ceph_encode_locks_to_buffer(struct inode *inode, diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 4d2920304be8..b7bda5d9611d 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -414,6 +414,9 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc, { struct ceph_mds_session *s; + if (mds >= mdsc->mdsmap->m_max_mds) + return ERR_PTR(-EINVAL); + s = kzalloc(sizeof(*s), GFP_NOFS); if (!s) return ERR_PTR(-ENOMEM); @@ -1028,6 +1031,37 @@ static void remove_session_caps(struct ceph_mds_session *session) { dout("remove_session_caps on %p\n", session); iterate_session_caps(session, remove_session_caps_cb, NULL); + + spin_lock(&session->s_cap_lock); + if (session->s_nr_caps > 0) { + struct super_block *sb = session->s_mdsc->fsc->sb; + struct inode *inode; + struct ceph_cap *cap, *prev = NULL; + struct ceph_vino vino; + /* + * iterate_session_caps() skips inodes that are being + * deleted, we need to wait until deletions are complete. + * __wait_on_freeing_inode() is designed for the job, + * but it is not exported, so use lookup inode function + * to access it. + */ + while (!list_empty(&session->s_caps)) { + cap = list_entry(session->s_caps.next, + struct ceph_cap, session_caps); + if (cap == prev) + break; + prev = cap; + vino = cap->ci->i_vino; + spin_unlock(&session->s_cap_lock); + + inode = ceph_find_inode(sb, vino); + iput(inode); + + spin_lock(&session->s_cap_lock); + } + } + spin_unlock(&session->s_cap_lock); + BUG_ON(session->s_nr_caps > 0); BUG_ON(!list_empty(&session->s_cap_flushing)); cleanup_cap_releases(session); @@ -1391,6 +1425,7 @@ static void discard_cap_releases(struct ceph_mds_client *mdsc, num = le32_to_cpu(head->num); dout("discard_cap_releases mds%d %p %u\n", session->s_mds, msg, num); head->num = cpu_to_le32(0); + msg->front.iov_len = sizeof(*head); session->s_num_cap_releases += num; /* requeue completed messages */ @@ -1553,7 +1588,7 @@ retry: *base = ceph_ino(temp->d_inode); *plen = len; dout("build_path on %p %d built %llx '%.*s'\n", - dentry, dentry->d_count, *base, len, path); + dentry, d_count(dentry), *base, len, path); return path; } @@ -2454,6 +2489,7 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap, spin_lock(&ci->i_ceph_lock); cap->seq = 0; /* reset cap seq */ cap->issue_seq = 0; /* and issue_seq */ + cap->mseq = 0; /* and migrate_seq */ if (recon_state->flock) { rec.v2.cap_id = cpu_to_le64(cap->cap_id); @@ -2481,20 +2517,20 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap, struct ceph_filelock *flocks; encode_again: - lock_flocks(); + spin_lock(&inode->i_lock); ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks); - unlock_flocks(); + spin_unlock(&inode->i_lock); flocks = kmalloc((num_fcntl_locks+num_flock_locks) * sizeof(struct ceph_filelock), GFP_NOFS); if (!flocks) { err = -ENOMEM; goto out_free; } - lock_flocks(); + spin_lock(&inode->i_lock); err = ceph_encode_locks_to_buffer(inode, flocks, num_fcntl_locks, num_flock_locks); - unlock_flocks(); + spin_unlock(&inode->i_lock); if (err) { kfree(flocks); if (err == -ENOSPC) @@ -3040,8 +3076,10 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc) fsc->mdsc = mdsc; mutex_init(&mdsc->mutex); mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS); - if (mdsc->mdsmap == NULL) + if (mdsc->mdsmap == NULL) { + kfree(mdsc); return -ENOMEM; + } init_completion(&mdsc->safe_umount_waiters); init_waitqueue_head(&mdsc->session_close_wq); diff --git a/fs/ceph/mdsmap.c b/fs/ceph/mdsmap.c index 9278dec9e940..132b64eeecd4 100644 --- a/fs/ceph/mdsmap.c +++ b/fs/ceph/mdsmap.c @@ -92,6 +92,7 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end) u32 num_export_targets; void *pexport_targets = NULL; struct ceph_timespec laggy_since; + struct ceph_mds_info *info; ceph_decode_need(p, end, sizeof(u64)*2 + 1 + sizeof(u32), bad); global_id = ceph_decode_64(p); @@ -126,24 +127,27 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end) i+1, n, global_id, mds, inc, ceph_pr_addr(&addr.in_addr), ceph_mds_state_name(state)); - if (mds >= 0 && mds < m->m_max_mds && state > 0) { - m->m_info[mds].global_id = global_id; - m->m_info[mds].state = state; - m->m_info[mds].addr = addr; - m->m_info[mds].laggy = - (laggy_since.tv_sec != 0 || - laggy_since.tv_nsec != 0); - m->m_info[mds].num_export_targets = num_export_targets; - if (num_export_targets) { - m->m_info[mds].export_targets = - kcalloc(num_export_targets, sizeof(u32), - GFP_NOFS); - for (j = 0; j < num_export_targets; j++) - m->m_info[mds].export_targets[j] = - ceph_decode_32(&pexport_targets); - } else { - m->m_info[mds].export_targets = NULL; - } + + if (mds < 0 || mds >= m->m_max_mds || state <= 0) + continue; + + info = &m->m_info[mds]; + info->global_id = global_id; + info->state = state; + info->addr = addr; + info->laggy = (laggy_since.tv_sec != 0 || + laggy_since.tv_nsec != 0); + info->num_export_targets = num_export_targets; + if (num_export_targets) { + info->export_targets = kcalloc(num_export_targets, + sizeof(u32), GFP_NOFS); + if (info->export_targets == NULL) + goto badmem; + for (j = 0; j < num_export_targets; j++) + info->export_targets[j] = + ceph_decode_32(&pexport_targets); + } else { + info->export_targets = NULL; } } @@ -170,7 +174,7 @@ bad: DUMP_PREFIX_OFFSET, 16, 1, start, end - start, true); ceph_mdsmap_destroy(m); - return ERR_PTR(-EINVAL); + return ERR_PTR(err); } void ceph_mdsmap_destroy(struct ceph_mdsmap *m) diff --git a/fs/ceph/super.c b/fs/ceph/super.c index 7d377c9a5e35..6a0951e43044 100644 --- a/fs/ceph/super.c +++ b/fs/ceph/super.c @@ -17,6 +17,7 @@ #include "super.h" #include "mds_client.h" +#include "cache.h" #include <linux/ceph/ceph_features.h> #include <linux/ceph/decode.h> @@ -142,6 +143,8 @@ enum { Opt_nodcache, Opt_ino32, Opt_noino32, + Opt_fscache, + Opt_nofscache }; static match_table_t fsopt_tokens = { @@ -167,6 +170,8 @@ static match_table_t fsopt_tokens = { {Opt_nodcache, "nodcache"}, {Opt_ino32, "ino32"}, {Opt_noino32, "noino32"}, + {Opt_fscache, "fsc"}, + {Opt_nofscache, "nofsc"}, {-1, NULL} }; @@ -260,6 +265,12 @@ static int parse_fsopt_token(char *c, void *private) case Opt_noino32: fsopt->flags &= ~CEPH_MOUNT_OPT_INO32; break; + case Opt_fscache: + fsopt->flags |= CEPH_MOUNT_OPT_FSCACHE; + break; + case Opt_nofscache: + fsopt->flags &= ~CEPH_MOUNT_OPT_FSCACHE; + break; default: BUG_ON(token); } @@ -357,7 +368,7 @@ static int parse_mount_options(struct ceph_mount_options **pfsopt, } err = -EINVAL; dev_name_end--; /* back up to ':' separator */ - if (*dev_name_end != ':') { + if (dev_name_end < dev_name || *dev_name_end != ':') { pr_err("device name is missing path (no : separator in %s)\n", dev_name); goto out; @@ -422,6 +433,10 @@ static int ceph_show_options(struct seq_file *m, struct dentry *root) seq_puts(m, ",dcache"); else seq_puts(m, ",nodcache"); + if (fsopt->flags & CEPH_MOUNT_OPT_FSCACHE) + seq_puts(m, ",fsc"); + else + seq_puts(m, ",nofsc"); if (fsopt->wsize) seq_printf(m, ",wsize=%d", fsopt->wsize); @@ -530,11 +545,18 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt, if (!fsc->wb_pagevec_pool) goto fail_trunc_wq; + /* setup fscache */ + if ((fsopt->flags & CEPH_MOUNT_OPT_FSCACHE) && + (ceph_fscache_register_fs(fsc) != 0)) + goto fail_fscache; + /* caps */ fsc->min_caps = fsopt->max_readdir; return fsc; +fail_fscache: + ceph_fscache_unregister_fs(fsc); fail_trunc_wq: destroy_workqueue(fsc->trunc_wq); fail_pg_inv_wq: @@ -554,6 +576,8 @@ static void destroy_fs_client(struct ceph_fs_client *fsc) { dout("destroy_fs_client %p\n", fsc); + ceph_fscache_unregister_fs(fsc); + destroy_workqueue(fsc->wb_wq); destroy_workqueue(fsc->pg_inv_wq); destroy_workqueue(fsc->trunc_wq); @@ -588,6 +612,8 @@ static void ceph_inode_init_once(void *foo) static int __init init_caches(void) { + int error = -ENOMEM; + ceph_inode_cachep = kmem_cache_create("ceph_inode_info", sizeof(struct ceph_inode_info), __alignof__(struct ceph_inode_info), @@ -611,15 +637,17 @@ static int __init init_caches(void) if (ceph_file_cachep == NULL) goto bad_file; - return 0; + if ((error = ceph_fscache_register())) + goto bad_file; + return 0; bad_file: kmem_cache_destroy(ceph_dentry_cachep); bad_dentry: kmem_cache_destroy(ceph_cap_cachep); bad_cap: kmem_cache_destroy(ceph_inode_cachep); - return -ENOMEM; + return error; } static void destroy_caches(void) @@ -629,10 +657,13 @@ static void destroy_caches(void) * destroy cache. */ rcu_barrier(); + kmem_cache_destroy(ceph_inode_cachep); kmem_cache_destroy(ceph_cap_cachep); kmem_cache_destroy(ceph_dentry_cachep); kmem_cache_destroy(ceph_file_cachep); + + ceph_fscache_unregister(); } diff --git a/fs/ceph/super.h b/fs/ceph/super.h index 7ccfdb4aea2e..6014b0a3c405 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -16,6 +16,10 @@ #include <linux/ceph/libceph.h> +#ifdef CONFIG_CEPH_FSCACHE +#include <linux/fscache.h> +#endif + /* f_type in struct statfs */ #define CEPH_SUPER_MAGIC 0x00c36400 @@ -29,6 +33,7 @@ #define CEPH_MOUNT_OPT_NOASYNCREADDIR (1<<7) /* no dcache readdir */ #define CEPH_MOUNT_OPT_INO32 (1<<8) /* 32 bit inos */ #define CEPH_MOUNT_OPT_DCACHE (1<<9) /* use dcache for readdir etc */ +#define CEPH_MOUNT_OPT_FSCACHE (1<<10) /* use fscache */ #define CEPH_MOUNT_OPT_DEFAULT (CEPH_MOUNT_OPT_RBYTES) @@ -90,6 +95,11 @@ struct ceph_fs_client { struct dentry *debugfs_bdi; struct dentry *debugfs_mdsc, *debugfs_mdsmap; #endif + +#ifdef CONFIG_CEPH_FSCACHE + struct fscache_cookie *fscache; + struct workqueue_struct *revalidate_wq; +#endif }; @@ -288,6 +298,7 @@ struct ceph_inode_info { int i_nr_by_mode[CEPH_FILE_MODE_NUM]; /* open file counts */ + struct mutex i_truncate_mutex; u32 i_truncate_seq; /* last truncate to smaller size */ u64 i_truncate_size; /* and the size we last truncated down to */ int i_truncate_pending; /* still need to call vmtruncate */ @@ -319,6 +330,12 @@ struct ceph_inode_info { struct work_struct i_vmtruncate_work; +#ifdef CONFIG_CEPH_FSCACHE + struct fscache_cookie *fscache; + u32 i_fscache_gen; /* sequence, for delayed fscache validate */ + struct work_struct i_revalidate_work; +#endif + struct inode vfs_inode; /* at end */ }; @@ -534,7 +551,7 @@ extern int __ceph_caps_mds_wanted(struct ceph_inode_info *ci); extern void ceph_caps_init(struct ceph_mds_client *mdsc); extern void ceph_caps_finalize(struct ceph_mds_client *mdsc); extern void ceph_adjust_min_caps(struct ceph_mds_client *mdsc, int delta); -extern int ceph_reserve_caps(struct ceph_mds_client *mdsc, +extern void ceph_reserve_caps(struct ceph_mds_client *mdsc, struct ceph_cap_reservation *ctx, int need); extern int ceph_unreserve_caps(struct ceph_mds_client *mdsc, struct ceph_cap_reservation *ctx); @@ -692,7 +709,7 @@ extern int ceph_readdir_prepopulate(struct ceph_mds_request *req, extern int ceph_inode_holds_cap(struct inode *inode, int mask); extern int ceph_inode_set_size(struct inode *inode, loff_t size); -extern void __ceph_do_pending_vmtruncate(struct inode *inode, bool needlock); +extern void __ceph_do_pending_vmtruncate(struct inode *inode); extern void ceph_queue_vmtruncate(struct inode *inode); extern void ceph_queue_invalidate(struct inode *inode); diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c index 9b6b2b6dd164..be661d8f532a 100644 --- a/fs/ceph/xattr.c +++ b/fs/ceph/xattr.c @@ -675,17 +675,18 @@ ssize_t ceph_getxattr(struct dentry *dentry, const char *name, void *value, if (!ceph_is_valid_xattr(name)) return -ENODATA; - spin_lock(&ci->i_ceph_lock); - dout("getxattr %p ver=%lld index_ver=%lld\n", inode, - ci->i_xattrs.version, ci->i_xattrs.index_version); /* let's see if a virtual xattr was requested */ vxattr = ceph_match_vxattr(inode, name); if (vxattr && !(vxattr->exists_cb && !vxattr->exists_cb(ci))) { err = vxattr->getxattr_cb(ci, value, size); - goto out; + return err; } + spin_lock(&ci->i_ceph_lock); + dout("getxattr %p ver=%lld index_ver=%lld\n", inode, + ci->i_xattrs.version, ci->i_xattrs.index_version); + if (__ceph_caps_issued_mask(ci, CEPH_CAP_XATTR_SHARED, 1) && (ci->i_xattrs.index_version >= ci->i_xattrs.version)) { goto get_xattr; |