summaryrefslogtreecommitdiff
path: root/fs/ceph/file.c
diff options
context:
space:
mode:
authorLuis Henriques <lhenriques@suse.com>2018-10-15 18:45:59 +0300
committerIlya Dryomov <idryomov@gmail.com>2018-10-22 11:28:24 +0300
commit503f82a9932d311c12430779627f7a130bbe462a (patch)
treec8040ee6fa052eeec74ca8a95f4c357b43584be8 /fs/ceph/file.c
parent23ddf9bea900b4ce39e5707e1ddf66062cfe6d91 (diff)
downloadlinux-503f82a9932d311c12430779627f7a130bbe462a.tar.xz
ceph: support copy_file_range file operation
This commit implements support for the copy_file_range syscall in cephfs. It is implemented using the RADOS 'copy-from' operation, which allows to do a remote object copy, without the need to download/upload data from/to the OSDs. Some manual copy may however be required if the source/destination file offsets aren't object aligned or if the copy length is smaller than the object size. Signed-off-by: Luis Henriques <lhenriques@suse.com> Reviewed-by: "Yan, Zheng" <zyan@redhat.com> Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
Diffstat (limited to 'fs/ceph/file.c')
-rw-r--r--fs/ceph/file.c294
1 files changed, 293 insertions, 1 deletions
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 0fa6b6b6ccbc..5557ec6760ea 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -1,5 +1,6 @@
// SPDX-License-Identifier: GPL-2.0
#include <linux/ceph/ceph_debug.h>
+#include <linux/ceph/striper.h>
#include <linux/module.h>
#include <linux/sched.h>
@@ -1795,6 +1796,297 @@ unlock:
return ret;
}
+/*
+ * This function tries to get FILE_WR capabilities for dst_ci and FILE_RD for
+ * src_ci. Two attempts are made to obtain both caps, and an error is return if
+ * this fails; zero is returned on success.
+ */
+static int get_rd_wr_caps(struct ceph_inode_info *src_ci,
+ loff_t src_endoff, int *src_got,
+ struct ceph_inode_info *dst_ci,
+ loff_t dst_endoff, int *dst_got)
+{
+ int ret = 0;
+ bool retrying = false;
+
+retry_caps:
+ ret = ceph_get_caps(dst_ci, CEPH_CAP_FILE_WR, CEPH_CAP_FILE_BUFFER,
+ dst_endoff, dst_got, NULL);
+ if (ret < 0)
+ return ret;
+
+ /*
+ * Since we're already holding the FILE_WR capability for the dst file,
+ * we would risk a deadlock by using ceph_get_caps. Thus, we'll do some
+ * retry dance instead to try to get both capabilities.
+ */
+ ret = ceph_try_get_caps(src_ci, CEPH_CAP_FILE_RD, CEPH_CAP_FILE_SHARED,
+ false, src_got);
+ if (ret <= 0) {
+ /* Start by dropping dst_ci caps and getting src_ci caps */
+ ceph_put_cap_refs(dst_ci, *dst_got);
+ if (retrying) {
+ if (!ret)
+ /* ceph_try_get_caps masks EAGAIN */
+ ret = -EAGAIN;
+ return ret;
+ }
+ ret = ceph_get_caps(src_ci, CEPH_CAP_FILE_RD,
+ CEPH_CAP_FILE_SHARED, src_endoff,
+ src_got, NULL);
+ if (ret < 0)
+ return ret;
+ /*... drop src_ci caps too, and retry */
+ ceph_put_cap_refs(src_ci, *src_got);
+ retrying = true;
+ goto retry_caps;
+ }
+ return ret;
+}
+
+static void put_rd_wr_caps(struct ceph_inode_info *src_ci, int src_got,
+ struct ceph_inode_info *dst_ci, int dst_got)
+{
+ ceph_put_cap_refs(src_ci, src_got);
+ ceph_put_cap_refs(dst_ci, dst_got);
+}
+
+/*
+ * This function does several size-related checks, returning an error if:
+ * - source file is smaller than off+len
+ * - destination file size is not OK (inode_newsize_ok())
+ * - max bytes quotas is exceeded
+ */
+static int is_file_size_ok(struct inode *src_inode, struct inode *dst_inode,
+ loff_t src_off, loff_t dst_off, size_t len)
+{
+ loff_t size, endoff;
+
+ size = i_size_read(src_inode);
+ /*
+ * Don't copy beyond source file EOF. Instead of simply setting length
+ * to (size - src_off), just drop to VFS default implementation, as the
+ * local i_size may be stale due to other clients writing to the source
+ * inode.
+ */
+ if (src_off + len > size) {
+ dout("Copy beyond EOF (%llu + %zu > %llu)\n",
+ src_off, len, size);
+ return -EOPNOTSUPP;
+ }
+ size = i_size_read(dst_inode);
+
+ endoff = dst_off + len;
+ if (inode_newsize_ok(dst_inode, endoff))
+ return -EOPNOTSUPP;
+
+ if (ceph_quota_is_max_bytes_exceeded(dst_inode, endoff))
+ return -EDQUOT;
+
+ return 0;
+}
+
+static ssize_t ceph_copy_file_range(struct file *src_file, loff_t src_off,
+ struct file *dst_file, loff_t dst_off,
+ size_t len, unsigned int flags)
+{
+ struct inode *src_inode = file_inode(src_file);
+ struct inode *dst_inode = file_inode(dst_file);
+ struct ceph_inode_info *src_ci = ceph_inode(src_inode);
+ struct ceph_inode_info *dst_ci = ceph_inode(dst_inode);
+ struct ceph_cap_flush *prealloc_cf;
+ struct ceph_object_locator src_oloc, dst_oloc;
+ struct ceph_object_id src_oid, dst_oid;
+ loff_t endoff = 0, size;
+ ssize_t ret = -EIO;
+ u64 src_objnum, dst_objnum, src_objoff, dst_objoff;
+ u32 src_objlen, dst_objlen, object_size;
+ int src_got = 0, dst_got = 0, err, dirty;
+ bool do_final_copy = false;
+
+ if (src_inode == dst_inode)
+ return -EINVAL;
+ if (ceph_snap(dst_inode) != CEPH_NOSNAP)
+ return -EROFS;
+
+ /*
+ * Some of the checks below will return -EOPNOTSUPP, which will force a
+ * fallback to the default VFS copy_file_range implementation. This is
+ * desirable in several cases (for ex, the 'len' is smaller than the
+ * size of the objects, or in cases where that would be more
+ * efficient).
+ */
+
+ if ((src_ci->i_layout.stripe_unit != dst_ci->i_layout.stripe_unit) ||
+ (src_ci->i_layout.stripe_count != dst_ci->i_layout.stripe_count) ||
+ (src_ci->i_layout.object_size != dst_ci->i_layout.object_size))
+ return -EOPNOTSUPP;
+
+ if (len < src_ci->i_layout.object_size)
+ return -EOPNOTSUPP; /* no remote copy will be done */
+
+ prealloc_cf = ceph_alloc_cap_flush();
+ if (!prealloc_cf)
+ return -ENOMEM;
+
+ /* Start by sync'ing the source file */
+ ret = file_write_and_wait_range(src_file, src_off, (src_off + len));
+ if (ret < 0)
+ goto out;
+
+ /*
+ * We need FILE_WR caps for dst_ci and FILE_RD for src_ci as other
+ * clients may have dirty data in their caches. And OSDs know nothing
+ * about caps, so they can't safely do the remote object copies.
+ */
+ err = get_rd_wr_caps(src_ci, (src_off + len), &src_got,
+ dst_ci, (dst_off + len), &dst_got);
+ if (err < 0) {
+ dout("get_rd_wr_caps returned %d\n", err);
+ ret = -EOPNOTSUPP;
+ goto out;
+ }
+
+ ret = is_file_size_ok(src_inode, dst_inode, src_off, dst_off, len);
+ if (ret < 0)
+ goto out_caps;
+
+ size = i_size_read(dst_inode);
+ endoff = dst_off + len;
+
+ /* Drop dst file cached pages */
+ ret = invalidate_inode_pages2_range(dst_inode->i_mapping,
+ dst_off >> PAGE_SHIFT,
+ endoff >> PAGE_SHIFT);
+ if (ret < 0) {
+ dout("Failed to invalidate inode pages (%zd)\n", ret);
+ ret = 0; /* XXX */
+ }
+ src_oloc.pool = src_ci->i_layout.pool_id;
+ src_oloc.pool_ns = ceph_try_get_string(src_ci->i_layout.pool_ns);
+ dst_oloc.pool = dst_ci->i_layout.pool_id;
+ dst_oloc.pool_ns = ceph_try_get_string(dst_ci->i_layout.pool_ns);
+
+ ceph_calc_file_object_mapping(&src_ci->i_layout, src_off,
+ src_ci->i_layout.object_size,
+ &src_objnum, &src_objoff, &src_objlen);
+ ceph_calc_file_object_mapping(&dst_ci->i_layout, dst_off,
+ dst_ci->i_layout.object_size,
+ &dst_objnum, &dst_objoff, &dst_objlen);
+ /* object-level offsets need to the same */
+ if (src_objoff != dst_objoff) {
+ ret = -EOPNOTSUPP;
+ goto out_caps;
+ }
+
+ /*
+ * Do a manual copy if the object offset isn't object aligned.
+ * 'src_objlen' contains the bytes left until the end of the object,
+ * starting at the src_off
+ */
+ if (src_objoff) {
+ /*
+ * we need to temporarily drop all caps as we'll be calling
+ * {read,write}_iter, which will get caps again.
+ */
+ put_rd_wr_caps(src_ci, src_got, dst_ci, dst_got);
+ ret = do_splice_direct(src_file, &src_off, dst_file,
+ &dst_off, src_objlen, flags);
+ if (ret < 0) {
+ dout("do_splice_direct returned %d\n", err);
+ goto out;
+ }
+ len -= ret;
+ err = get_rd_wr_caps(src_ci, (src_off + len),
+ &src_got, dst_ci,
+ (dst_off + len), &dst_got);
+ if (err < 0)
+ goto out;
+ err = is_file_size_ok(src_inode, dst_inode,
+ src_off, dst_off, len);
+ if (err < 0)
+ goto out_caps;
+ }
+ object_size = src_ci->i_layout.object_size;
+ while (len >= object_size) {
+ ceph_calc_file_object_mapping(&src_ci->i_layout, src_off,
+ object_size, &src_objnum,
+ &src_objoff, &src_objlen);
+ ceph_calc_file_object_mapping(&dst_ci->i_layout, dst_off,
+ object_size, &dst_objnum,
+ &dst_objoff, &dst_objlen);
+ ceph_oid_init(&src_oid);
+ ceph_oid_printf(&src_oid, "%llx.%08llx",
+ src_ci->i_vino.ino, src_objnum);
+ ceph_oid_init(&dst_oid);
+ ceph_oid_printf(&dst_oid, "%llx.%08llx",
+ dst_ci->i_vino.ino, dst_objnum);
+ /* Do an object remote copy */
+ err = ceph_osdc_copy_from(
+ &ceph_inode_to_client(src_inode)->client->osdc,
+ src_ci->i_vino.snap, 0,
+ &src_oid, &src_oloc,
+ CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL |
+ CEPH_OSD_OP_FLAG_FADVISE_NOCACHE,
+ &dst_oid, &dst_oloc,
+ CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL |
+ CEPH_OSD_OP_FLAG_FADVISE_DONTNEED, 0);
+ if (err) {
+ dout("ceph_osdc_copy_from returned %d\n", err);
+ if (!ret)
+ ret = err;
+ goto out_caps;
+ }
+ len -= object_size;
+ src_off += object_size;
+ dst_off += object_size;
+ ret += object_size;
+ }
+
+ if (len)
+ /* We still need one final local copy */
+ do_final_copy = true;
+
+ file_update_time(dst_file);
+ if (endoff > size) {
+ int caps_flags = 0;
+
+ /* Let the MDS know about dst file size change */
+ if (ceph_quota_is_max_bytes_approaching(dst_inode, endoff))
+ caps_flags |= CHECK_CAPS_NODELAY;
+ if (ceph_inode_set_size(dst_inode, endoff))
+ caps_flags |= CHECK_CAPS_AUTHONLY;
+ if (caps_flags)
+ ceph_check_caps(dst_ci, caps_flags, NULL);
+ }
+ /* Mark Fw dirty */
+ spin_lock(&dst_ci->i_ceph_lock);
+ dst_ci->i_inline_version = CEPH_INLINE_NONE;
+ dirty = __ceph_mark_dirty_caps(dst_ci, CEPH_CAP_FILE_WR, &prealloc_cf);
+ spin_unlock(&dst_ci->i_ceph_lock);
+ if (dirty)
+ __mark_inode_dirty(dst_inode, dirty);
+
+out_caps:
+ put_rd_wr_caps(src_ci, src_got, dst_ci, dst_got);
+
+ if (do_final_copy) {
+ err = do_splice_direct(src_file, &src_off, dst_file,
+ &dst_off, len, flags);
+ if (err < 0) {
+ dout("do_splice_direct returned %d\n", err);
+ goto out;
+ }
+ len -= err;
+ ret += err;
+ }
+
+out:
+ ceph_free_cap_flush(prealloc_cf);
+
+ return ret;
+}
+
const struct file_operations ceph_file_fops = {
.open = ceph_open,
.release = ceph_release,
@@ -1810,5 +2102,5 @@ const struct file_operations ceph_file_fops = {
.unlocked_ioctl = ceph_ioctl,
.compat_ioctl = ceph_ioctl,
.fallocate = ceph_fallocate,
+ .copy_file_range = ceph_copy_file_range,
};
-