diff options
Diffstat (limited to 'fs/nfs/localio.c')
-rw-r--r-- | fs/nfs/localio.c | 225 |
1 files changed, 164 insertions, 61 deletions
diff --git a/fs/nfs/localio.c b/fs/nfs/localio.c index 4b8618cf114c..510d0a16cfe9 100644 --- a/fs/nfs/localio.c +++ b/fs/nfs/localio.c @@ -35,6 +35,7 @@ struct nfs_local_kiocb { struct bio_vec *bvec; struct nfs_pgio_header *hdr; struct work_struct work; + void (*aio_complete_work)(struct work_struct *); struct nfsd_file *localio; }; @@ -48,9 +49,14 @@ struct nfs_local_fsync_ctx { static bool localio_enabled __read_mostly = true; module_param(localio_enabled, bool, 0644); +static bool localio_O_DIRECT_semantics __read_mostly = false; +module_param(localio_O_DIRECT_semantics, bool, 0644); +MODULE_PARM_DESC(localio_O_DIRECT_semantics, + "LOCALIO will use O_DIRECT semantics to filesystem."); + static inline bool nfs_client_is_local(const struct nfs_client *clp) { - return !!test_bit(NFS_CS_LOCAL_IO, &clp->cl_flags); + return !!rcu_access_pointer(clp->cl_uuid.net); } bool nfs_server_is_local(const struct nfs_client *clp) @@ -116,30 +122,6 @@ const struct rpc_program nfslocalio_program = { }; /* - * nfs_local_enable - enable local i/o for an nfs_client - */ -static void nfs_local_enable(struct nfs_client *clp) -{ - spin_lock(&clp->cl_localio_lock); - set_bit(NFS_CS_LOCAL_IO, &clp->cl_flags); - trace_nfs_local_enable(clp); - spin_unlock(&clp->cl_localio_lock); -} - -/* - * nfs_local_disable - disable local i/o for an nfs_client - */ -void nfs_local_disable(struct nfs_client *clp) -{ - spin_lock(&clp->cl_localio_lock); - if (test_and_clear_bit(NFS_CS_LOCAL_IO, &clp->cl_flags)) { - trace_nfs_local_disable(clp); - nfs_uuid_invalidate_one_client(&clp->cl_uuid); - } - spin_unlock(&clp->cl_localio_lock); -} - -/* * nfs_init_localioclient - Initialise an NFS localio client connection */ static struct rpc_clnt *nfs_init_localioclient(struct nfs_client *clp) @@ -178,7 +160,7 @@ static bool nfs_server_uuid_is_local(struct nfs_client *clp) rpc_shutdown_client(rpcclient_localio); /* Server is only local if it initialized required struct members */ - if (status || !clp->cl_uuid.net || !clp->cl_uuid.dom) + if (status || !rcu_access_pointer(clp->cl_uuid.net) || !clp->cl_uuid.dom) return false; return true; @@ -189,49 +171,74 @@ static bool nfs_server_uuid_is_local(struct nfs_client *clp) * - called after alloc_client and init_client (so cl_rpcclient exists) * - this function is idempotent, it can be called for old or new clients */ -void nfs_local_probe(struct nfs_client *clp) +static void nfs_local_probe(struct nfs_client *clp) { /* Disallow localio if disabled via sysfs or AUTH_SYS isn't used */ if (!localio_enabled || clp->cl_rpcclient->cl_auth->au_flavor != RPC_AUTH_UNIX) { - nfs_local_disable(clp); + nfs_localio_disable_client(clp); return; } if (nfs_client_is_local(clp)) { /* If already enabled, disable and re-enable */ - nfs_local_disable(clp); + nfs_localio_disable_client(clp); } if (!nfs_uuid_begin(&clp->cl_uuid)) return; if (nfs_server_uuid_is_local(clp)) - nfs_local_enable(clp); + nfs_localio_enable_client(clp); nfs_uuid_end(&clp->cl_uuid); } -EXPORT_SYMBOL_GPL(nfs_local_probe); + +void nfs_local_probe_async_work(struct work_struct *work) +{ + struct nfs_client *clp = + container_of(work, struct nfs_client, cl_local_probe_work); + + if (!refcount_inc_not_zero(&clp->cl_count)) + return; + nfs_local_probe(clp); + nfs_put_client(clp); +} + +void nfs_local_probe_async(struct nfs_client *clp) +{ + queue_work(nfsiod_workqueue, &clp->cl_local_probe_work); +} +EXPORT_SYMBOL_GPL(nfs_local_probe_async); + +static inline void nfs_local_file_put(struct nfsd_file *localio) +{ + /* nfs_to_nfsd_file_put_local() expects an __rcu pointer + * but we have a __kernel pointer. It is always safe + * to cast a __kernel pointer to an __rcu pointer + * because the cast only weakens what is known about the pointer. + */ + struct nfsd_file __rcu *nf = (struct nfsd_file __rcu*) localio; + + nfs_to_nfsd_file_put_local(&nf); +} /* - * nfs_local_open_fh - open a local filehandle in terms of nfsd_file + * __nfs_local_open_fh - open a local filehandle in terms of nfsd_file. * - * Returns a pointer to a struct nfsd_file or NULL + * Returns a pointer to a struct nfsd_file or ERR_PTR. + * Caller must release returned nfsd_file with nfs_to_nfsd_file_put_local(). */ -struct nfsd_file * -nfs_local_open_fh(struct nfs_client *clp, const struct cred *cred, - struct nfs_fh *fh, const fmode_t mode) +static struct nfsd_file * +__nfs_local_open_fh(struct nfs_client *clp, const struct cred *cred, + struct nfs_fh *fh, struct nfs_file_localio *nfl, + struct nfsd_file __rcu **pnf, + const fmode_t mode) { struct nfsd_file *localio; - int status; - - if (!nfs_server_is_local(clp)) - return NULL; - if (mode & ~(FMODE_READ | FMODE_WRITE)) - return NULL; localio = nfs_open_local_fh(&clp->cl_uuid, clp->cl_rpcclient, - cred, fh, mode); + cred, fh, nfl, pnf, mode); if (IS_ERR(localio)) { - status = PTR_ERR(localio); + int status = PTR_ERR(localio); trace_nfs_local_open_fh(fh, mode, status); switch (status) { case -ENOMEM: @@ -240,10 +247,39 @@ nfs_local_open_fh(struct nfs_client *clp, const struct cred *cred, /* Revalidate localio, will disable if unsupported */ nfs_local_probe(clp); } - return NULL; } return localio; } + +/* + * nfs_local_open_fh - open a local filehandle in terms of nfsd_file. + * First checking if the open nfsd_file is already cached, otherwise + * must __nfs_local_open_fh and insert the nfsd_file in nfs_file_localio. + * + * Returns a pointer to a struct nfsd_file or NULL. + */ +struct nfsd_file * +nfs_local_open_fh(struct nfs_client *clp, const struct cred *cred, + struct nfs_fh *fh, struct nfs_file_localio *nfl, + const fmode_t mode) +{ + struct nfsd_file *nf, __rcu **pnf; + + if (!nfs_server_is_local(clp)) + return NULL; + if (mode & ~(FMODE_READ | FMODE_WRITE)) + return NULL; + + if (mode & FMODE_WRITE) + pnf = &nfl->rw_file; + else + pnf = &nfl->ro_file; + + nf = __nfs_local_open_fh(clp, cred, fh, nfl, pnf, mode); + if (IS_ERR(nf)) + return NULL; + return nf; +} EXPORT_SYMBOL_GPL(nfs_local_open_fh); static struct bio_vec * @@ -285,10 +321,19 @@ nfs_local_iocb_alloc(struct nfs_pgio_header *hdr, kfree(iocb); return NULL; } - init_sync_kiocb(&iocb->kiocb, file); + + if (localio_O_DIRECT_semantics && + test_bit(NFS_IOHDR_ODIRECT, &hdr->flags)) { + iocb->kiocb.ki_filp = file; + iocb->kiocb.ki_flags = IOCB_DIRECT; + } else + init_sync_kiocb(&iocb->kiocb, file); + iocb->kiocb.ki_pos = hdr->args.offset; iocb->hdr = hdr; iocb->kiocb.ki_flags &= ~IOCB_APPEND; + iocb->aio_complete_work = NULL; + return iocb; } @@ -328,7 +373,7 @@ nfs_local_pgio_done(struct nfs_pgio_header *hdr, long status) hdr->res.op_status = NFS4_OK; hdr->task.tk_status = 0; } else { - hdr->res.op_status = nfs4_stat_to_errno(status); + hdr->res.op_status = nfs_localio_errno_to_nfs4_stat(status); hdr->task.tk_status = status; } } @@ -338,11 +383,23 @@ nfs_local_pgio_release(struct nfs_local_kiocb *iocb) { struct nfs_pgio_header *hdr = iocb->hdr; - nfs_to_nfsd_file_put_local(iocb->localio); + nfs_local_file_put(iocb->localio); nfs_local_iocb_free(iocb); nfs_local_hdr_release(hdr, hdr->task.tk_ops); } +/* + * Complete the I/O from iocb->kiocb.ki_complete() + * + * Note that this function can be called from a bottom half context, + * hence we need to queue the rpc_call_done() etc to a workqueue + */ +static inline void nfs_local_pgio_aio_complete(struct nfs_local_kiocb *iocb) +{ + INIT_WORK(&iocb->work, iocb->aio_complete_work); + queue_work(nfsiod_workqueue, &iocb->work); +} + static void nfs_local_read_done(struct nfs_local_kiocb *iocb, long status) { @@ -365,6 +422,23 @@ nfs_local_read_done(struct nfs_local_kiocb *iocb, long status) status > 0 ? status : 0, hdr->res.eof); } +static void nfs_local_read_aio_complete_work(struct work_struct *work) +{ + struct nfs_local_kiocb *iocb = + container_of(work, struct nfs_local_kiocb, work); + + nfs_local_pgio_release(iocb); +} + +static void nfs_local_read_aio_complete(struct kiocb *kiocb, long ret) +{ + struct nfs_local_kiocb *iocb = + container_of(kiocb, struct nfs_local_kiocb, kiocb); + + nfs_local_read_done(iocb, ret); + nfs_local_pgio_aio_complete(iocb); /* Calls nfs_local_read_aio_complete_work */ +} + static void nfs_local_call_read(struct work_struct *work) { struct nfs_local_kiocb *iocb = @@ -379,10 +453,10 @@ static void nfs_local_call_read(struct work_struct *work) nfs_local_iter_init(&iter, iocb, READ); status = filp->f_op->read_iter(&iocb->kiocb, &iter); - WARN_ON_ONCE(status == -EIOCBQUEUED); - - nfs_local_read_done(iocb, status); - nfs_local_pgio_release(iocb); + if (status != -EIOCBQUEUED) { + nfs_local_read_done(iocb, status); + nfs_local_pgio_release(iocb); + } revert_creds(save_cred); } @@ -410,6 +484,11 @@ nfs_do_local_read(struct nfs_pgio_header *hdr, nfs_local_pgio_init(hdr, call_ops); hdr->res.eof = false; + if (iocb->kiocb.ki_flags & IOCB_DIRECT) { + iocb->kiocb.ki_complete = nfs_local_read_aio_complete; + iocb->aio_complete_work = nfs_local_read_aio_complete_work; + } + INIT_WORK(&iocb->work, nfs_local_call_read); queue_work(nfslocaliod_workqueue, &iocb->work); @@ -534,6 +613,24 @@ nfs_local_write_done(struct nfs_local_kiocb *iocb, long status) nfs_local_pgio_done(hdr, status); } +static void nfs_local_write_aio_complete_work(struct work_struct *work) +{ + struct nfs_local_kiocb *iocb = + container_of(work, struct nfs_local_kiocb, work); + + nfs_local_vfs_getattr(iocb); + nfs_local_pgio_release(iocb); +} + +static void nfs_local_write_aio_complete(struct kiocb *kiocb, long ret) +{ + struct nfs_local_kiocb *iocb = + container_of(kiocb, struct nfs_local_kiocb, kiocb); + + nfs_local_write_done(iocb, ret); + nfs_local_pgio_aio_complete(iocb); /* Calls nfs_local_write_aio_complete_work */ +} + static void nfs_local_call_write(struct work_struct *work) { struct nfs_local_kiocb *iocb = @@ -552,11 +649,11 @@ static void nfs_local_call_write(struct work_struct *work) file_start_write(filp); status = filp->f_op->write_iter(&iocb->kiocb, &iter); file_end_write(filp); - WARN_ON_ONCE(status == -EIOCBQUEUED); - - nfs_local_write_done(iocb, status); - nfs_local_vfs_getattr(iocb); - nfs_local_pgio_release(iocb); + if (status != -EIOCBQUEUED) { + nfs_local_write_done(iocb, status); + nfs_local_vfs_getattr(iocb); + nfs_local_pgio_release(iocb); + } revert_creds(save_cred); current->flags = old_flags; @@ -592,10 +689,16 @@ nfs_do_local_write(struct nfs_pgio_header *hdr, case NFS_FILE_SYNC: iocb->kiocb.ki_flags |= IOCB_DSYNC|IOCB_SYNC; } + nfs_local_pgio_init(hdr, call_ops); nfs_set_local_verifier(hdr->inode, hdr->res.verf, hdr->args.stable); + if (iocb->kiocb.ki_flags & IOCB_DIRECT) { + iocb->kiocb.ki_complete = nfs_local_write_aio_complete; + iocb->aio_complete_work = nfs_local_write_aio_complete_work; + } + INIT_WORK(&iocb->work, nfs_local_call_write); queue_work(nfslocaliod_workqueue, &iocb->work); @@ -626,8 +729,8 @@ int nfs_local_doio(struct nfs_client *clp, struct nfsd_file *localio, if (status != 0) { if (status == -EAGAIN) - nfs_local_disable(clp); - nfs_to_nfsd_file_put_local(localio); + nfs_localio_disable_client(clp); + nfs_local_file_put(localio); hdr->task.tk_status = status; nfs_local_hdr_release(hdr, call_ops); } @@ -668,7 +771,7 @@ nfs_local_commit_done(struct nfs_commit_data *data, int status) data->task.tk_status = 0; } else { nfs_reset_boot_verifier(data->inode); - data->res.op_status = nfs4_stat_to_errno(status); + data->res.op_status = nfs_localio_errno_to_nfs4_stat(status); data->task.tk_status = status; } } @@ -678,7 +781,7 @@ nfs_local_release_commit_data(struct nfsd_file *localio, struct nfs_commit_data *data, const struct rpc_call_ops *call_ops) { - nfs_to_nfsd_file_put_local(localio); + nfs_local_file_put(localio); call_ops->rpc_call_done(&data->task, data); call_ops->rpc_release(data); } |