summaryrefslogtreecommitdiff
path: root/net/sunrpc
diff options
context:
space:
mode:
Diffstat (limited to 'net/sunrpc')
-rw-r--r--net/sunrpc/auth_gss/auth_gss.c28
-rw-r--r--net/sunrpc/auth_gss/gss_krb5_crypto.c3
-rw-r--r--net/sunrpc/auth_gss/gss_rpc_upcall.c9
-rw-r--r--net/sunrpc/auth_gss/gss_rpc_xdr.c14
-rw-r--r--net/sunrpc/auth_gss/gss_rpc_xdr.h4
-rw-r--r--net/sunrpc/auth_gss/svcauth_gss.c8
-rw-r--r--net/sunrpc/clnt.c16
-rw-r--r--net/sunrpc/rpcb_clnt.c82
-rw-r--r--net/sunrpc/stats.c16
-rw-r--r--net/sunrpc/svc.c35
-rw-r--r--net/sunrpc/svc_xprt.c10
-rw-r--r--net/sunrpc/xprt.c8
-rw-r--r--net/sunrpc/xprtrdma/Makefile4
-rw-r--r--net/sunrpc/xprtrdma/fmr_ops.c47
-rw-r--r--net/sunrpc/xprtrdma/frwr_ops.c69
-rw-r--r--net/sunrpc/xprtrdma/rpc_rdma.c125
-rw-r--r--net/sunrpc/xprtrdma/svc_rdma_marshal.c168
-rw-r--r--net/sunrpc/xprtrdma/svc_rdma_recvfrom.c734
-rw-r--r--net/sunrpc/xprtrdma/svc_rdma_rw.c449
-rw-r--r--net/sunrpc/xprtrdma/svc_rdma_sendto.c15
-rw-r--r--net/sunrpc/xprtrdma/svc_rdma_transport.c250
-rw-r--r--net/sunrpc/xprtrdma/transport.c3
-rw-r--r--net/sunrpc/xprtrdma/verbs.c55
-rw-r--r--net/sunrpc/xprtrdma/xprt_rdma.h40
-rw-r--r--net/sunrpc/xprtsock.c2
25 files changed, 1086 insertions, 1108 deletions
diff --git a/net/sunrpc/auth_gss/auth_gss.c b/net/sunrpc/auth_gss/auth_gss.c
index 4f16953e4954..9463af4b32e8 100644
--- a/net/sunrpc/auth_gss/auth_gss.c
+++ b/net/sunrpc/auth_gss/auth_gss.c
@@ -117,14 +117,14 @@ static const struct rpc_pipe_ops gss_upcall_ops_v1;
static inline struct gss_cl_ctx *
gss_get_ctx(struct gss_cl_ctx *ctx)
{
- atomic_inc(&ctx->count);
+ refcount_inc(&ctx->count);
return ctx;
}
static inline void
gss_put_ctx(struct gss_cl_ctx *ctx)
{
- if (atomic_dec_and_test(&ctx->count))
+ if (refcount_dec_and_test(&ctx->count))
gss_free_ctx(ctx);
}
@@ -200,7 +200,7 @@ gss_alloc_context(void)
ctx->gc_proc = RPC_GSS_PROC_DATA;
ctx->gc_seq = 1; /* NetApp 6.4R1 doesn't accept seq. no. 0 */
spin_lock_init(&ctx->gc_seq_lock);
- atomic_set(&ctx->count,1);
+ refcount_set(&ctx->count,1);
}
return ctx;
}
@@ -287,7 +287,7 @@ err:
#define UPCALL_BUF_LEN 128
struct gss_upcall_msg {
- atomic_t count;
+ refcount_t count;
kuid_t uid;
struct rpc_pipe_msg msg;
struct list_head list;
@@ -328,7 +328,7 @@ static void
gss_release_msg(struct gss_upcall_msg *gss_msg)
{
struct net *net = gss_msg->auth->net;
- if (!atomic_dec_and_test(&gss_msg->count))
+ if (!refcount_dec_and_test(&gss_msg->count))
return;
put_pipe_version(net);
BUG_ON(!list_empty(&gss_msg->list));
@@ -348,7 +348,7 @@ __gss_find_upcall(struct rpc_pipe *pipe, kuid_t uid, const struct gss_auth *auth
continue;
if (auth && pos->auth->service != auth->service)
continue;
- atomic_inc(&pos->count);
+ refcount_inc(&pos->count);
dprintk("RPC: %s found msg %p\n", __func__, pos);
return pos;
}
@@ -369,7 +369,7 @@ gss_add_msg(struct gss_upcall_msg *gss_msg)
spin_lock(&pipe->lock);
old = __gss_find_upcall(pipe, gss_msg->uid, gss_msg->auth);
if (old == NULL) {
- atomic_inc(&gss_msg->count);
+ refcount_inc(&gss_msg->count);
list_add(&gss_msg->list, &pipe->in_downcall);
} else
gss_msg = old;
@@ -383,7 +383,7 @@ __gss_unhash_msg(struct gss_upcall_msg *gss_msg)
list_del_init(&gss_msg->list);
rpc_wake_up_status(&gss_msg->rpc_waitqueue, gss_msg->msg.errno);
wake_up_all(&gss_msg->waitqueue);
- atomic_dec(&gss_msg->count);
+ refcount_dec(&gss_msg->count);
}
static void
@@ -506,7 +506,7 @@ gss_alloc_msg(struct gss_auth *gss_auth,
INIT_LIST_HEAD(&gss_msg->list);
rpc_init_wait_queue(&gss_msg->rpc_waitqueue, "RPCSEC_GSS upcall waitq");
init_waitqueue_head(&gss_msg->waitqueue);
- atomic_set(&gss_msg->count, 1);
+ refcount_set(&gss_msg->count, 1);
gss_msg->uid = uid;
gss_msg->auth = gss_auth;
switch (vers) {
@@ -542,11 +542,11 @@ gss_setup_upcall(struct gss_auth *gss_auth, struct rpc_cred *cred)
gss_msg = gss_add_msg(gss_new);
if (gss_msg == gss_new) {
int res;
- atomic_inc(&gss_msg->count);
+ refcount_inc(&gss_msg->count);
res = rpc_queue_upcall(gss_new->pipe, &gss_new->msg);
if (res) {
gss_unhash_msg(gss_new);
- atomic_dec(&gss_msg->count);
+ refcount_dec(&gss_msg->count);
gss_release_msg(gss_new);
gss_msg = ERR_PTR(res);
}
@@ -595,7 +595,7 @@ gss_refresh_upcall(struct rpc_task *task)
task->tk_timeout = 0;
gss_cred->gc_upcall = gss_msg;
/* gss_upcall_callback will release the reference to gss_upcall_msg */
- atomic_inc(&gss_msg->count);
+ refcount_inc(&gss_msg->count);
rpc_sleep_on(&gss_msg->rpc_waitqueue, task, gss_upcall_callback);
} else {
gss_handle_downcall_result(gss_cred, gss_msg);
@@ -815,7 +815,7 @@ restart:
if (!list_empty(&gss_msg->msg.list))
continue;
gss_msg->msg.errno = -EPIPE;
- atomic_inc(&gss_msg->count);
+ refcount_inc(&gss_msg->count);
__gss_unhash_msg(gss_msg);
spin_unlock(&pipe->lock);
gss_release_msg(gss_msg);
@@ -834,7 +834,7 @@ gss_pipe_destroy_msg(struct rpc_pipe_msg *msg)
if (msg->errno < 0) {
dprintk("RPC: %s releasing msg %p\n",
__func__, gss_msg);
- atomic_inc(&gss_msg->count);
+ refcount_inc(&gss_msg->count);
gss_unhash_msg(gss_msg);
if (msg->errno == -ETIMEDOUT)
warn_gssd();
diff --git a/net/sunrpc/auth_gss/gss_krb5_crypto.c b/net/sunrpc/auth_gss/gss_krb5_crypto.c
index fb39284ec174..12649c9fedab 100644
--- a/net/sunrpc/auth_gss/gss_krb5_crypto.c
+++ b/net/sunrpc/auth_gss/gss_krb5_crypto.c
@@ -34,6 +34,7 @@
* WARRANTIES OF MERCHANTIBILITY AND FITNESS FOR A PARTICULAR PURPOSE.
*/
+#include <crypto/algapi.h>
#include <crypto/hash.h>
#include <crypto/skcipher.h>
#include <linux/err.h>
@@ -927,7 +928,7 @@ gss_krb5_aes_decrypt(struct krb5_ctx *kctx, u32 offset, struct xdr_buf *buf,
if (ret)
goto out_err;
- if (memcmp(pkt_hmac, our_hmac, kctx->gk5e->cksumlength) != 0) {
+ if (crypto_memneq(pkt_hmac, our_hmac, kctx->gk5e->cksumlength) != 0) {
ret = GSS_S_BAD_SIG;
goto out_err;
}
diff --git a/net/sunrpc/auth_gss/gss_rpc_upcall.c b/net/sunrpc/auth_gss/gss_rpc_upcall.c
index f0c6a8c78a56..46b295e4f2b8 100644
--- a/net/sunrpc/auth_gss/gss_rpc_upcall.c
+++ b/net/sunrpc/auth_gss/gss_rpc_upcall.c
@@ -55,15 +55,15 @@ enum {
#define PROC(proc, name) \
[GSSX_##proc] = { \
.p_proc = GSSX_##proc, \
- .p_encode = (kxdreproc_t)gssx_enc_##name, \
- .p_decode = (kxdrdproc_t)gssx_dec_##name, \
+ .p_encode = gssx_enc_##name, \
+ .p_decode = gssx_dec_##name, \
.p_arglen = GSSX_ARG_##name##_sz, \
.p_replen = GSSX_RES_##name##_sz, \
.p_statidx = GSSX_##proc, \
.p_name = #proc, \
}
-static struct rpc_procinfo gssp_procedures[] = {
+static const struct rpc_procinfo gssp_procedures[] = {
PROC(INDICATE_MECHS, indicate_mechs),
PROC(GET_CALL_CONTEXT, get_call_context),
PROC(IMPORT_AND_CANON_NAME, import_and_canon_name),
@@ -364,11 +364,12 @@ void gssp_free_upcall_data(struct gssp_upcall_data *data)
/*
* Initialization stuff
*/
-
+static unsigned int gssp_version1_counts[ARRAY_SIZE(gssp_procedures)];
static const struct rpc_version gssp_version1 = {
.number = GSSPROXY_VERS_1,
.nrprocs = ARRAY_SIZE(gssp_procedures),
.procs = gssp_procedures,
+ .counts = gssp_version1_counts,
};
static const struct rpc_version *gssp_version[] = {
diff --git a/net/sunrpc/auth_gss/gss_rpc_xdr.c b/net/sunrpc/auth_gss/gss_rpc_xdr.c
index 25d9a9cf7b66..c4778cae58ef 100644
--- a/net/sunrpc/auth_gss/gss_rpc_xdr.c
+++ b/net/sunrpc/auth_gss/gss_rpc_xdr.c
@@ -44,7 +44,7 @@ static int gssx_dec_bool(struct xdr_stream *xdr, u32 *v)
}
static int gssx_enc_buffer(struct xdr_stream *xdr,
- gssx_buffer *buf)
+ const gssx_buffer *buf)
{
__be32 *p;
@@ -56,7 +56,7 @@ static int gssx_enc_buffer(struct xdr_stream *xdr,
}
static int gssx_enc_in_token(struct xdr_stream *xdr,
- struct gssp_in_token *in)
+ const struct gssp_in_token *in)
{
__be32 *p;
@@ -130,7 +130,7 @@ static int gssx_dec_option(struct xdr_stream *xdr,
}
static int dummy_enc_opt_array(struct xdr_stream *xdr,
- struct gssx_option_array *oa)
+ const struct gssx_option_array *oa)
{
__be32 *p;
@@ -348,7 +348,7 @@ static int gssx_dec_status(struct xdr_stream *xdr,
}
static int gssx_enc_call_ctx(struct xdr_stream *xdr,
- struct gssx_call_ctx *ctx)
+ const struct gssx_call_ctx *ctx)
{
struct gssx_option opt;
__be32 *p;
@@ -733,8 +733,9 @@ static int gssx_enc_cb(struct xdr_stream *xdr, struct gssx_cb *cb)
void gssx_enc_accept_sec_context(struct rpc_rqst *req,
struct xdr_stream *xdr,
- struct gssx_arg_accept_sec_context *arg)
+ const void *data)
{
+ const struct gssx_arg_accept_sec_context *arg = data;
int err;
err = gssx_enc_call_ctx(xdr, &arg->call_ctx);
@@ -789,8 +790,9 @@ done:
int gssx_dec_accept_sec_context(struct rpc_rqst *rqstp,
struct xdr_stream *xdr,
- struct gssx_res_accept_sec_context *res)
+ void *data)
{
+ struct gssx_res_accept_sec_context *res = data;
u32 value_follows;
int err;
struct page *scratch;
diff --git a/net/sunrpc/auth_gss/gss_rpc_xdr.h b/net/sunrpc/auth_gss/gss_rpc_xdr.h
index 9d88c6239f01..146c31032917 100644
--- a/net/sunrpc/auth_gss/gss_rpc_xdr.h
+++ b/net/sunrpc/auth_gss/gss_rpc_xdr.h
@@ -179,10 +179,10 @@ struct gssx_res_accept_sec_context {
#define gssx_dec_init_sec_context NULL
void gssx_enc_accept_sec_context(struct rpc_rqst *req,
struct xdr_stream *xdr,
- struct gssx_arg_accept_sec_context *args);
+ const void *data);
int gssx_dec_accept_sec_context(struct rpc_rqst *rqstp,
struct xdr_stream *xdr,
- struct gssx_res_accept_sec_context *res);
+ void *data);
#define gssx_enc_release_handle NULL
#define gssx_dec_release_handle NULL
#define gssx_enc_get_mic NULL
diff --git a/net/sunrpc/auth_gss/svcauth_gss.c b/net/sunrpc/auth_gss/svcauth_gss.c
index a54a7a3d28f5..7b1ee5a0b03c 100644
--- a/net/sunrpc/auth_gss/svcauth_gss.c
+++ b/net/sunrpc/auth_gss/svcauth_gss.c
@@ -838,6 +838,14 @@ unwrap_integ_data(struct svc_rqst *rqstp, struct xdr_buf *buf, u32 seq, struct g
struct xdr_netobj mic;
struct xdr_buf integ_buf;
+ /* NFS READ normally uses splice to send data in-place. However
+ * the data in cache can change after the reply's MIC is computed
+ * but before the RPC reply is sent. To prevent the client from
+ * rejecting the server-computed MIC in this somewhat rare case,
+ * do not use splice with the GSS integrity service.
+ */
+ clear_bit(RQ_SPLICE_OK, &rqstp->rq_flags);
+
/* Did we already verify the signature on the original pass through? */
if (rqstp->rq_deferred)
return 0;
diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
index b5cb921775a0..2e49d1f892b7 100644
--- a/net/sunrpc/clnt.c
+++ b/net/sunrpc/clnt.c
@@ -1517,14 +1517,16 @@ static void
call_start(struct rpc_task *task)
{
struct rpc_clnt *clnt = task->tk_client;
+ int idx = task->tk_msg.rpc_proc->p_statidx;
dprintk("RPC: %5u call_start %s%d proc %s (%s)\n", task->tk_pid,
clnt->cl_program->name, clnt->cl_vers,
rpc_proc_name(task),
(RPC_IS_ASYNC(task) ? "async" : "sync"));
- /* Increment call count */
- task->tk_msg.rpc_proc->p_count++;
+ /* Increment call count (version might not be valid for ping) */
+ if (clnt->cl_program->version[clnt->cl_vers])
+ clnt->cl_program->version[clnt->cl_vers]->counts[idx]++;
clnt->cl_stats->rpccnt++;
task->tk_action = call_reserve;
}
@@ -1672,7 +1674,7 @@ call_allocate(struct rpc_task *task)
unsigned int slack = task->tk_rqstp->rq_cred->cr_auth->au_cslack;
struct rpc_rqst *req = task->tk_rqstp;
struct rpc_xprt *xprt = req->rq_xprt;
- struct rpc_procinfo *proc = task->tk_msg.rpc_proc;
+ const struct rpc_procinfo *proc = task->tk_msg.rpc_proc;
int status;
dprint_status(task);
@@ -2476,16 +2478,18 @@ out_overflow:
goto out_garbage;
}
-static void rpcproc_encode_null(void *rqstp, struct xdr_stream *xdr, void *obj)
+static void rpcproc_encode_null(struct rpc_rqst *rqstp, struct xdr_stream *xdr,
+ const void *obj)
{
}
-static int rpcproc_decode_null(void *rqstp, struct xdr_stream *xdr, void *obj)
+static int rpcproc_decode_null(struct rpc_rqst *rqstp, struct xdr_stream *xdr,
+ void *obj)
{
return 0;
}
-static struct rpc_procinfo rpcproc_null = {
+static const struct rpc_procinfo rpcproc_null = {
.p_encode = rpcproc_encode_null,
.p_decode = rpcproc_decode_null,
};
diff --git a/net/sunrpc/rpcb_clnt.c b/net/sunrpc/rpcb_clnt.c
index 5b30603596d0..ea0676f199c8 100644
--- a/net/sunrpc/rpcb_clnt.c
+++ b/net/sunrpc/rpcb_clnt.c
@@ -128,13 +128,13 @@ struct rpcbind_args {
int r_status;
};
-static struct rpc_procinfo rpcb_procedures2[];
-static struct rpc_procinfo rpcb_procedures3[];
-static struct rpc_procinfo rpcb_procedures4[];
+static const struct rpc_procinfo rpcb_procedures2[];
+static const struct rpc_procinfo rpcb_procedures3[];
+static const struct rpc_procinfo rpcb_procedures4[];
struct rpcb_info {
u32 rpc_vers;
- struct rpc_procinfo * rpc_proc;
+ const struct rpc_procinfo *rpc_proc;
};
static const struct rpcb_info rpcb_next_version[];
@@ -620,7 +620,8 @@ int rpcb_v4_register(struct net *net, const u32 program, const u32 version,
return -EAFNOSUPPORT;
}
-static struct rpc_task *rpcb_call_async(struct rpc_clnt *rpcb_clnt, struct rpcbind_args *map, struct rpc_procinfo *proc)
+static struct rpc_task *rpcb_call_async(struct rpc_clnt *rpcb_clnt,
+ struct rpcbind_args *map, const struct rpc_procinfo *proc)
{
struct rpc_message msg = {
.rpc_proc = proc,
@@ -671,7 +672,7 @@ static struct rpc_clnt *rpcb_find_transport_owner(struct rpc_clnt *clnt)
void rpcb_getport_async(struct rpc_task *task)
{
struct rpc_clnt *clnt;
- struct rpc_procinfo *proc;
+ const struct rpc_procinfo *proc;
u32 bind_version;
struct rpc_xprt *xprt;
struct rpc_clnt *rpcb_clnt;
@@ -843,8 +844,9 @@ static void rpcb_getport_done(struct rpc_task *child, void *data)
*/
static void rpcb_enc_mapping(struct rpc_rqst *req, struct xdr_stream *xdr,
- const struct rpcbind_args *rpcb)
+ const void *data)
{
+ const struct rpcbind_args *rpcb = data;
__be32 *p;
dprintk("RPC: %5u encoding PMAP_%s call (%u, %u, %d, %u)\n",
@@ -860,8 +862,9 @@ static void rpcb_enc_mapping(struct rpc_rqst *req, struct xdr_stream *xdr,
}
static int rpcb_dec_getport(struct rpc_rqst *req, struct xdr_stream *xdr,
- struct rpcbind_args *rpcb)
+ void *data)
{
+ struct rpcbind_args *rpcb = data;
unsigned long port;
__be32 *p;
@@ -882,8 +885,9 @@ static int rpcb_dec_getport(struct rpc_rqst *req, struct xdr_stream *xdr,
}
static int rpcb_dec_set(struct rpc_rqst *req, struct xdr_stream *xdr,
- unsigned int *boolp)
+ void *data)
{
+ unsigned int *boolp = data;
__be32 *p;
p = xdr_inline_decode(xdr, 4);
@@ -917,8 +921,9 @@ static void encode_rpcb_string(struct xdr_stream *xdr, const char *string,
}
static void rpcb_enc_getaddr(struct rpc_rqst *req, struct xdr_stream *xdr,
- const struct rpcbind_args *rpcb)
+ const void *data)
{
+ const struct rpcbind_args *rpcb = data;
__be32 *p;
dprintk("RPC: %5u encoding RPCB_%s call (%u, %u, '%s', '%s')\n",
@@ -937,8 +942,9 @@ static void rpcb_enc_getaddr(struct rpc_rqst *req, struct xdr_stream *xdr,
}
static int rpcb_dec_getaddr(struct rpc_rqst *req, struct xdr_stream *xdr,
- struct rpcbind_args *rpcb)
+ void *data)
{
+ struct rpcbind_args *rpcb = data;
struct sockaddr_storage address;
struct sockaddr *sap = (struct sockaddr *)&address;
__be32 *p;
@@ -989,11 +995,11 @@ out_fail:
* since the Linux kernel RPC code requires only these.
*/
-static struct rpc_procinfo rpcb_procedures2[] = {
+static const struct rpc_procinfo rpcb_procedures2[] = {
[RPCBPROC_SET] = {
.p_proc = RPCBPROC_SET,
- .p_encode = (kxdreproc_t)rpcb_enc_mapping,
- .p_decode = (kxdrdproc_t)rpcb_dec_set,
+ .p_encode = rpcb_enc_mapping,
+ .p_decode = rpcb_dec_set,
.p_arglen = RPCB_mappingargs_sz,
.p_replen = RPCB_setres_sz,
.p_statidx = RPCBPROC_SET,
@@ -1002,8 +1008,8 @@ static struct rpc_procinfo rpcb_procedures2[] = {
},
[RPCBPROC_UNSET] = {
.p_proc = RPCBPROC_UNSET,
- .p_encode = (kxdreproc_t)rpcb_enc_mapping,
- .p_decode = (kxdrdproc_t)rpcb_dec_set,
+ .p_encode = rpcb_enc_mapping,
+ .p_decode = rpcb_dec_set,
.p_arglen = RPCB_mappingargs_sz,
.p_replen = RPCB_setres_sz,
.p_statidx = RPCBPROC_UNSET,
@@ -1012,8 +1018,8 @@ static struct rpc_procinfo rpcb_procedures2[] = {
},
[RPCBPROC_GETPORT] = {
.p_proc = RPCBPROC_GETPORT,
- .p_encode = (kxdreproc_t)rpcb_enc_mapping,
- .p_decode = (kxdrdproc_t)rpcb_dec_getport,
+ .p_encode = rpcb_enc_mapping,
+ .p_decode = rpcb_dec_getport,
.p_arglen = RPCB_mappingargs_sz,
.p_replen = RPCB_getportres_sz,
.p_statidx = RPCBPROC_GETPORT,
@@ -1022,11 +1028,11 @@ static struct rpc_procinfo rpcb_procedures2[] = {
},
};
-static struct rpc_procinfo rpcb_procedures3[] = {
+static const struct rpc_procinfo rpcb_procedures3[] = {
[RPCBPROC_SET] = {
.p_proc = RPCBPROC_SET,
- .p_encode = (kxdreproc_t)rpcb_enc_getaddr,
- .p_decode = (kxdrdproc_t)rpcb_dec_set,
+ .p_encode = rpcb_enc_getaddr,
+ .p_decode = rpcb_dec_set,
.p_arglen = RPCB_getaddrargs_sz,
.p_replen = RPCB_setres_sz,
.p_statidx = RPCBPROC_SET,
@@ -1035,8 +1041,8 @@ static struct rpc_procinfo rpcb_procedures3[] = {
},
[RPCBPROC_UNSET] = {
.p_proc = RPCBPROC_UNSET,
- .p_encode = (kxdreproc_t)rpcb_enc_getaddr,
- .p_decode = (kxdrdproc_t)rpcb_dec_set,
+ .p_encode = rpcb_enc_getaddr,
+ .p_decode = rpcb_dec_set,
.p_arglen = RPCB_getaddrargs_sz,
.p_replen = RPCB_setres_sz,
.p_statidx = RPCBPROC_UNSET,
@@ -1045,8 +1051,8 @@ static struct rpc_procinfo rpcb_procedures3[] = {
},
[RPCBPROC_GETADDR] = {
.p_proc = RPCBPROC_GETADDR,
- .p_encode = (kxdreproc_t)rpcb_enc_getaddr,
- .p_decode = (kxdrdproc_t)rpcb_dec_getaddr,
+ .p_encode = rpcb_enc_getaddr,
+ .p_decode = rpcb_dec_getaddr,
.p_arglen = RPCB_getaddrargs_sz,
.p_replen = RPCB_getaddrres_sz,
.p_statidx = RPCBPROC_GETADDR,
@@ -1055,11 +1061,11 @@ static struct rpc_procinfo rpcb_procedures3[] = {
},
};
-static struct rpc_procinfo rpcb_procedures4[] = {
+static const struct rpc_procinfo rpcb_procedures4[] = {
[RPCBPROC_SET] = {
.p_proc = RPCBPROC_SET,
- .p_encode = (kxdreproc_t)rpcb_enc_getaddr,
- .p_decode = (kxdrdproc_t)rpcb_dec_set,
+ .p_encode = rpcb_enc_getaddr,
+ .p_decode = rpcb_dec_set,
.p_arglen = RPCB_getaddrargs_sz,
.p_replen = RPCB_setres_sz,
.p_statidx = RPCBPROC_SET,
@@ -1068,8 +1074,8 @@ static struct rpc_procinfo rpcb_procedures4[] = {
},
[RPCBPROC_UNSET] = {
.p_proc = RPCBPROC_UNSET,
- .p_encode = (kxdreproc_t)rpcb_enc_getaddr,
- .p_decode = (kxdrdproc_t)rpcb_dec_set,
+ .p_encode = rpcb_enc_getaddr,
+ .p_decode = rpcb_dec_set,
.p_arglen = RPCB_getaddrargs_sz,
.p_replen = RPCB_setres_sz,
.p_statidx = RPCBPROC_UNSET,
@@ -1078,8 +1084,8 @@ static struct rpc_procinfo rpcb_procedures4[] = {
},
[RPCBPROC_GETADDR] = {
.p_proc = RPCBPROC_GETADDR,
- .p_encode = (kxdreproc_t)rpcb_enc_getaddr,
- .p_decode = (kxdrdproc_t)rpcb_dec_getaddr,
+ .p_encode = rpcb_enc_getaddr,
+ .p_decode = rpcb_dec_getaddr,
.p_arglen = RPCB_getaddrargs_sz,
.p_replen = RPCB_getaddrres_sz,
.p_statidx = RPCBPROC_GETADDR,
@@ -1112,22 +1118,28 @@ static const struct rpcb_info rpcb_next_version6[] = {
},
};
+static unsigned int rpcb_version2_counts[ARRAY_SIZE(rpcb_procedures2)];
static const struct rpc_version rpcb_version2 = {
.number = RPCBVERS_2,
.nrprocs = ARRAY_SIZE(rpcb_procedures2),
- .procs = rpcb_procedures2
+ .procs = rpcb_procedures2,
+ .counts = rpcb_version2_counts,
};
+static unsigned int rpcb_version3_counts[ARRAY_SIZE(rpcb_procedures3)];
static const struct rpc_version rpcb_version3 = {
.number = RPCBVERS_3,
.nrprocs = ARRAY_SIZE(rpcb_procedures3),
- .procs = rpcb_procedures3
+ .procs = rpcb_procedures3,
+ .counts = rpcb_version3_counts,
};
+static unsigned int rpcb_version4_counts[ARRAY_SIZE(rpcb_procedures4)];
static const struct rpc_version rpcb_version4 = {
.number = RPCBVERS_4,
.nrprocs = ARRAY_SIZE(rpcb_procedures4),
- .procs = rpcb_procedures4
+ .procs = rpcb_procedures4,
+ .counts = rpcb_version4_counts,
};
static const struct rpc_version *rpcb_version[] = {
diff --git a/net/sunrpc/stats.c b/net/sunrpc/stats.c
index caeb01ad2b5a..1e671333c3d5 100644
--- a/net/sunrpc/stats.c
+++ b/net/sunrpc/stats.c
@@ -55,8 +55,7 @@ static int rpc_proc_show(struct seq_file *seq, void *v) {
seq_printf(seq, "proc%u %u",
vers->number, vers->nrprocs);
for (j = 0; j < vers->nrprocs; j++)
- seq_printf(seq, " %u",
- vers->procs[j].p_count);
+ seq_printf(seq, " %u", vers->counts[j]);
seq_putc(seq, '\n');
}
return 0;
@@ -78,9 +77,9 @@ static const struct file_operations rpc_proc_fops = {
/*
* Get RPC server stats
*/
-void svc_seq_show(struct seq_file *seq, const struct svc_stat *statp) {
+void svc_seq_show(struct seq_file *seq, const struct svc_stat *statp)
+{
const struct svc_program *prog = statp->program;
- const struct svc_procedure *proc;
const struct svc_version *vers;
unsigned int i, j;
@@ -99,11 +98,12 @@ void svc_seq_show(struct seq_file *seq, const struct svc_stat *statp) {
statp->rpcbadclnt);
for (i = 0; i < prog->pg_nvers; i++) {
- if (!(vers = prog->pg_vers[i]) || !(proc = vers->vs_proc))
+ vers = prog->pg_vers[i];
+ if (!vers)
continue;
seq_printf(seq, "proc%d %u", i, vers->vs_nproc);
- for (j = 0; j < vers->vs_nproc; j++, proc++)
- seq_printf(seq, " %u", proc->pc_count);
+ for (j = 0; j < vers->vs_nproc; j++)
+ seq_printf(seq, " %u", vers->vs_count[j]);
seq_putc(seq, '\n');
}
}
@@ -192,7 +192,7 @@ void rpc_count_iostats(const struct rpc_task *task, struct rpc_iostats *stats)
EXPORT_SYMBOL_GPL(rpc_count_iostats);
static void _print_name(struct seq_file *seq, unsigned int op,
- struct rpc_procinfo *procs)
+ const struct rpc_procinfo *procs)
{
if (procs[op].p_name)
seq_printf(seq, "\t%12s: ", procs[op].p_name);
diff --git a/net/sunrpc/svc.c b/net/sunrpc/svc.c
index bc0f5a0ecbdc..85ce0db5b0a6 100644
--- a/net/sunrpc/svc.c
+++ b/net/sunrpc/svc.c
@@ -1008,7 +1008,7 @@ int svc_register(const struct svc_serv *serv, struct net *net,
const unsigned short port)
{
struct svc_program *progp;
- struct svc_version *vers;
+ const struct svc_version *vers;
unsigned int i;
int error = 0;
@@ -1151,10 +1151,9 @@ static int
svc_process_common(struct svc_rqst *rqstp, struct kvec *argv, struct kvec *resv)
{
struct svc_program *progp;
- struct svc_version *versp = NULL; /* compiler food */
- struct svc_procedure *procp = NULL;
+ const struct svc_version *versp = NULL; /* compiler food */
+ const struct svc_procedure *procp = NULL;
struct svc_serv *serv = rqstp->rq_server;
- kxdrproc_t xdr;
__be32 *statp;
u32 prog, vers, proc;
__be32 auth_stat, rpc_stat;
@@ -1166,7 +1165,7 @@ svc_process_common(struct svc_rqst *rqstp, struct kvec *argv, struct kvec *resv)
if (argv->iov_len < 6*4)
goto err_short_len;
- /* Will be turned off only in gss privacy case: */
+ /* Will be turned off by GSS integrity and privacy services */
set_bit(RQ_SPLICE_OK, &rqstp->rq_flags);
/* Will be turned off only when NFSv4 Sessions are used */
set_bit(RQ_USEDEFERRAL, &rqstp->rq_flags);
@@ -1262,7 +1261,7 @@ svc_process_common(struct svc_rqst *rqstp, struct kvec *argv, struct kvec *resv)
svc_putnl(resv, RPC_SUCCESS);
/* Bump per-procedure stats counter */
- procp->pc_count++;
+ versp->vs_count[proc]++;
/* Initialize storage for argp and resp */
memset(rqstp->rq_argp, 0, procp->pc_argsize);
@@ -1276,28 +1275,30 @@ svc_process_common(struct svc_rqst *rqstp, struct kvec *argv, struct kvec *resv)
/* Call the function that processes the request. */
if (!versp->vs_dispatch) {
- /* Decode arguments */
- xdr = procp->pc_decode;
- if (xdr && !xdr(rqstp, argv->iov_base, rqstp->rq_argp))
+ /*
+ * Decode arguments
+ * XXX: why do we ignore the return value?
+ */
+ if (procp->pc_decode &&
+ !procp->pc_decode(rqstp, argv->iov_base))
goto err_garbage;
- *statp = procp->pc_func(rqstp, rqstp->rq_argp, rqstp->rq_resp);
+ *statp = procp->pc_func(rqstp);
/* Encode reply */
if (*statp == rpc_drop_reply ||
test_bit(RQ_DROPME, &rqstp->rq_flags)) {
if (procp->pc_release)
- procp->pc_release(rqstp, NULL, rqstp->rq_resp);
+ procp->pc_release(rqstp);
goto dropit;
}
if (*statp == rpc_autherr_badcred) {
if (procp->pc_release)
- procp->pc_release(rqstp, NULL, rqstp->rq_resp);
+ procp->pc_release(rqstp);
goto err_bad_auth;
}
- if (*statp == rpc_success &&
- (xdr = procp->pc_encode) &&
- !xdr(rqstp, resv->iov_base+resv->iov_len, rqstp->rq_resp)) {
+ if (*statp == rpc_success && procp->pc_encode &&
+ !procp->pc_encode(rqstp, resv->iov_base + resv->iov_len)) {
dprintk("svc: failed to encode reply\n");
/* serv->sv_stats->rpcsystemerr++; */
*statp = rpc_system_err;
@@ -1307,7 +1308,7 @@ svc_process_common(struct svc_rqst *rqstp, struct kvec *argv, struct kvec *resv)
if (!versp->vs_dispatch(rqstp, statp)) {
/* Release reply info */
if (procp->pc_release)
- procp->pc_release(rqstp, NULL, rqstp->rq_resp);
+ procp->pc_release(rqstp);
goto dropit;
}
}
@@ -1318,7 +1319,7 @@ svc_process_common(struct svc_rqst *rqstp, struct kvec *argv, struct kvec *resv)
/* Release reply info */
if (procp->pc_release)
- procp->pc_release(rqstp, NULL, rqstp->rq_resp);
+ procp->pc_release(rqstp);
if (procp->pc_encode == NULL)
goto dropit;
diff --git a/net/sunrpc/svc_xprt.c b/net/sunrpc/svc_xprt.c
index 7bfe1fb42add..d16a8b423c20 100644
--- a/net/sunrpc/svc_xprt.c
+++ b/net/sunrpc/svc_xprt.c
@@ -659,11 +659,13 @@ static int svc_alloc_arg(struct svc_rqst *rqstp)
int i;
/* now allocate needed pages. If we get a failure, sleep briefly */
- pages = (serv->sv_max_mesg + PAGE_SIZE) / PAGE_SIZE;
- WARN_ON_ONCE(pages >= RPCSVC_MAXPAGES);
- if (pages >= RPCSVC_MAXPAGES)
+ pages = (serv->sv_max_mesg + 2 * PAGE_SIZE) >> PAGE_SHIFT;
+ if (pages > RPCSVC_MAXPAGES) {
+ pr_warn_once("svc: warning: pages=%u > RPCSVC_MAXPAGES=%lu\n",
+ pages, RPCSVC_MAXPAGES);
/* use as many pages as possible */
- pages = RPCSVC_MAXPAGES - 1;
+ pages = RPCSVC_MAXPAGES;
+ }
for (i = 0; i < pages ; i++)
while (rqstp->rq_pages[i] == NULL) {
struct page *p = alloc_page(GFP_KERNEL);
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index 3e63c5e97ebe..4654a9934269 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -1047,13 +1047,15 @@ out:
return ret;
}
-static struct rpc_rqst *xprt_dynamic_alloc_slot(struct rpc_xprt *xprt, gfp_t gfp_flags)
+static struct rpc_rqst *xprt_dynamic_alloc_slot(struct rpc_xprt *xprt)
{
struct rpc_rqst *req = ERR_PTR(-EAGAIN);
if (!atomic_add_unless(&xprt->num_reqs, 1, xprt->max_reqs))
goto out;
- req = kzalloc(sizeof(struct rpc_rqst), gfp_flags);
+ spin_unlock(&xprt->reserve_lock);
+ req = kzalloc(sizeof(struct rpc_rqst), GFP_NOFS);
+ spin_lock(&xprt->reserve_lock);
if (req != NULL)
goto out;
atomic_dec(&xprt->num_reqs);
@@ -1081,7 +1083,7 @@ void xprt_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task)
list_del(&req->rq_list);
goto out_init_req;
}
- req = xprt_dynamic_alloc_slot(xprt, GFP_NOWAIT|__GFP_NOWARN);
+ req = xprt_dynamic_alloc_slot(xprt);
if (!IS_ERR(req))
goto out_init_req;
switch (PTR_ERR(req)) {
diff --git a/net/sunrpc/xprtrdma/Makefile b/net/sunrpc/xprtrdma/Makefile
index c1ae8142ab73..b8213ddce2f2 100644
--- a/net/sunrpc/xprtrdma/Makefile
+++ b/net/sunrpc/xprtrdma/Makefile
@@ -3,6 +3,6 @@ obj-$(CONFIG_SUNRPC_XPRT_RDMA) += rpcrdma.o
rpcrdma-y := transport.o rpc_rdma.o verbs.o \
fmr_ops.o frwr_ops.o \
svc_rdma.o svc_rdma_backchannel.o svc_rdma_transport.o \
- svc_rdma_marshal.o svc_rdma_sendto.o svc_rdma_recvfrom.o \
- svc_rdma_rw.o module.o
+ svc_rdma_sendto.o svc_rdma_recvfrom.o svc_rdma_rw.o \
+ module.o
rpcrdma-$(CONFIG_SUNRPC_BACKCHANNEL) += backchannel.o
diff --git a/net/sunrpc/xprtrdma/fmr_ops.c b/net/sunrpc/xprtrdma/fmr_ops.c
index 59e64025ed96..d3f84bb1d443 100644
--- a/net/sunrpc/xprtrdma/fmr_ops.c
+++ b/net/sunrpc/xprtrdma/fmr_ops.c
@@ -91,7 +91,7 @@ __fmr_unmap(struct rpcrdma_mw *mw)
list_add(&mw->fmr.fm_mr->list, &l);
rc = ib_unmap_fmr(&l);
- list_del_init(&mw->fmr.fm_mr->list);
+ list_del(&mw->fmr.fm_mr->list);
return rc;
}
@@ -213,13 +213,11 @@ fmr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
break;
}
- mw->mw_nents = i;
mw->mw_dir = rpcrdma_data_dir(writing);
- if (i == 0)
- goto out_dmamap_err;
- if (!ib_dma_map_sg(r_xprt->rx_ia.ri_device,
- mw->mw_sg, mw->mw_nents, mw->mw_dir))
+ mw->mw_nents = ib_dma_map_sg(r_xprt->rx_ia.ri_device,
+ mw->mw_sg, i, mw->mw_dir);
+ if (!mw->mw_nents)
goto out_dmamap_err;
for (i = 0, dma_pages = mw->fmr.fm_physaddrs; i < mw->mw_nents; i++)
@@ -237,16 +235,18 @@ fmr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
return mw->mw_nents;
out_dmamap_err:
- pr_err("rpcrdma: failed to dma map sg %p sg_nents %u\n",
- mw->mw_sg, mw->mw_nents);
- rpcrdma_defer_mr_recovery(mw);
+ pr_err("rpcrdma: failed to DMA map sg %p sg_nents %d\n",
+ mw->mw_sg, i);
+ rpcrdma_put_mw(r_xprt, mw);
return -EIO;
out_maperr:
pr_err("rpcrdma: ib_map_phys_fmr %u@0x%llx+%i (%d) status %i\n",
len, (unsigned long long)dma_pages[0],
pageoff, mw->mw_nents, rc);
- rpcrdma_defer_mr_recovery(mw);
+ ib_dma_unmap_sg(r_xprt->rx_ia.ri_device,
+ mw->mw_sg, mw->mw_nents, mw->mw_dir);
+ rpcrdma_put_mw(r_xprt, mw);
return -EIO;
}
@@ -255,24 +255,26 @@ out_maperr:
* Sleeps until it is safe for the host CPU to access the
* previously mapped memory regions.
*
- * Caller ensures that req->rl_registered is not empty.
+ * Caller ensures that @mws is not empty before the call. This
+ * function empties the list.
*/
static void
-fmr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
+fmr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct list_head *mws)
{
- struct rpcrdma_mw *mw, *tmp;
+ struct rpcrdma_mw *mw;
LIST_HEAD(unmap_list);
int rc;
- dprintk("RPC: %s: req %p\n", __func__, req);
-
/* ORDER: Invalidate all of the req's MRs first
*
* ib_unmap_fmr() is slow, so use a single call instead
* of one call per mapped FMR.
*/
- list_for_each_entry(mw, &req->rl_registered, mw_list)
+ list_for_each_entry(mw, mws, mw_list) {
+ dprintk("RPC: %s: unmapping fmr %p\n",
+ __func__, &mw->fmr);
list_add_tail(&mw->fmr.fm_mr->list, &unmap_list);
+ }
r_xprt->rx_stats.local_inv_needed++;
rc = ib_unmap_fmr(&unmap_list);
if (rc)
@@ -281,9 +283,11 @@ fmr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
/* ORDER: Now DMA unmap all of the req's MRs, and return
* them to the free MW list.
*/
- list_for_each_entry_safe(mw, tmp, &req->rl_registered, mw_list) {
- list_del_init(&mw->mw_list);
- list_del_init(&mw->fmr.fm_mr->list);
+ while (!list_empty(mws)) {
+ mw = rpcrdma_pop_mw(mws);
+ dprintk("RPC: %s: DMA unmapping fmr %p\n",
+ __func__, &mw->fmr);
+ list_del(&mw->fmr.fm_mr->list);
ib_dma_unmap_sg(r_xprt->rx_ia.ri_device,
mw->mw_sg, mw->mw_nents, mw->mw_dir);
rpcrdma_put_mw(r_xprt, mw);
@@ -294,8 +298,9 @@ fmr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
out_reset:
pr_err("rpcrdma: ib_unmap_fmr failed (%i)\n", rc);
- list_for_each_entry_safe(mw, tmp, &req->rl_registered, mw_list) {
- list_del_init(&mw->fmr.fm_mr->list);
+ while (!list_empty(mws)) {
+ mw = rpcrdma_pop_mw(mws);
+ list_del(&mw->fmr.fm_mr->list);
fmr_op_recover_mr(mw);
}
}
diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c
index f81dd93176c0..6aea36a38bfd 100644
--- a/net/sunrpc/xprtrdma/frwr_ops.c
+++ b/net/sunrpc/xprtrdma/frwr_ops.c
@@ -277,7 +277,7 @@ __frwr_sendcompletion_flush(struct ib_wc *wc, const char *wr)
}
/**
- * frwr_wc_fastreg - Invoked by RDMA provider for each polled FastReg WC
+ * frwr_wc_fastreg - Invoked by RDMA provider for a flushed FastReg WC
* @cq: completion queue (ignored)
* @wc: completed WR
*
@@ -298,7 +298,7 @@ frwr_wc_fastreg(struct ib_cq *cq, struct ib_wc *wc)
}
/**
- * frwr_wc_localinv - Invoked by RDMA provider for each polled LocalInv WC
+ * frwr_wc_localinv - Invoked by RDMA provider for a flushed LocalInv WC
* @cq: completion queue (ignored)
* @wc: completed WR
*
@@ -319,7 +319,7 @@ frwr_wc_localinv(struct ib_cq *cq, struct ib_wc *wc)
}
/**
- * frwr_wc_localinv - Invoked by RDMA provider for each polled LocalInv WC
+ * frwr_wc_localinv_wake - Invoked by RDMA provider for a signaled LocalInv WC
* @cq: completion queue (ignored)
* @wc: completed WR
*
@@ -355,7 +355,7 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
struct ib_mr *mr;
struct ib_reg_wr *reg_wr;
struct ib_send_wr *bad_wr;
- int rc, i, n, dma_nents;
+ int rc, i, n;
u8 key;
mw = NULL;
@@ -391,14 +391,10 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
break;
}
- mw->mw_nents = i;
mw->mw_dir = rpcrdma_data_dir(writing);
- if (i == 0)
- goto out_dmamap_err;
- dma_nents = ib_dma_map_sg(ia->ri_device,
- mw->mw_sg, mw->mw_nents, mw->mw_dir);
- if (!dma_nents)
+ mw->mw_nents = ib_dma_map_sg(ia->ri_device, mw->mw_sg, i, mw->mw_dir);
+ if (!mw->mw_nents)
goto out_dmamap_err;
n = ib_map_mr_sg(mr, mw->mw_sg, mw->mw_nents, NULL, PAGE_SIZE);
@@ -436,13 +432,14 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
return mw->mw_nents;
out_dmamap_err:
- pr_err("rpcrdma: failed to dma map sg %p sg_nents %u\n",
- mw->mw_sg, mw->mw_nents);
- rpcrdma_defer_mr_recovery(mw);
+ pr_err("rpcrdma: failed to DMA map sg %p sg_nents %d\n",
+ mw->mw_sg, i);
+ frmr->fr_state = FRMR_IS_INVALID;
+ rpcrdma_put_mw(r_xprt, mw);
return -EIO;
out_mapmr_err:
- pr_err("rpcrdma: failed to map mr %p (%u/%u)\n",
+ pr_err("rpcrdma: failed to map mr %p (%d/%d)\n",
frmr->fr_mr, n, mw->mw_nents);
rpcrdma_defer_mr_recovery(mw);
return -EIO;
@@ -458,21 +455,19 @@ out_senderr:
* Sleeps until it is safe for the host CPU to access the
* previously mapped memory regions.
*
- * Caller ensures that req->rl_registered is not empty.
+ * Caller ensures that @mws is not empty before the call. This
+ * function empties the list.
*/
static void
-frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
+frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct list_head *mws)
{
struct ib_send_wr *first, **prev, *last, *bad_wr;
- struct rpcrdma_rep *rep = req->rl_reply;
struct rpcrdma_ia *ia = &r_xprt->rx_ia;
struct rpcrdma_frmr *f;
struct rpcrdma_mw *mw;
int count, rc;
- dprintk("RPC: %s: req %p\n", __func__, req);
-
- /* ORDER: Invalidate all of the req's MRs first
+ /* ORDER: Invalidate all of the MRs first
*
* Chain the LOCAL_INV Work Requests and post them with
* a single ib_post_send() call.
@@ -480,11 +475,10 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
f = NULL;
count = 0;
prev = &first;
- list_for_each_entry(mw, &req->rl_registered, mw_list) {
+ list_for_each_entry(mw, mws, mw_list) {
mw->frmr.fr_state = FRMR_IS_INVALID;
- if ((rep->rr_wc_flags & IB_WC_WITH_INVALIDATE) &&
- (mw->mw_handle == rep->rr_inv_rkey))
+ if (mw->mw_flags & RPCRDMA_MW_F_RI)
continue;
f = &mw->frmr;
@@ -524,18 +518,19 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
* unless ri_id->qp is a valid pointer.
*/
r_xprt->rx_stats.local_inv_needed++;
+ bad_wr = NULL;
rc = ib_post_send(ia->ri_id->qp, first, &bad_wr);
+ if (bad_wr != first)
+ wait_for_completion(&f->fr_linv_done);
if (rc)
goto reset_mrs;
- wait_for_completion(&f->fr_linv_done);
-
- /* ORDER: Now DMA unmap all of the req's MRs, and return
+ /* ORDER: Now DMA unmap all of the MRs, and return
* them to the free MW list.
*/
unmap:
- while (!list_empty(&req->rl_registered)) {
- mw = rpcrdma_pop_mw(&req->rl_registered);
+ while (!list_empty(mws)) {
+ mw = rpcrdma_pop_mw(mws);
dprintk("RPC: %s: DMA unmapping frmr %p\n",
__func__, &mw->frmr);
ib_dma_unmap_sg(ia->ri_device,
@@ -546,17 +541,19 @@ unmap:
reset_mrs:
pr_err("rpcrdma: FRMR invalidate ib_post_send returned %i\n", rc);
- rdma_disconnect(ia->ri_id);
/* Find and reset the MRs in the LOCAL_INV WRs that did not
- * get posted. This is synchronous, and slow.
+ * get posted.
*/
- list_for_each_entry(mw, &req->rl_registered, mw_list) {
- f = &mw->frmr;
- if (mw->mw_handle == bad_wr->ex.invalidate_rkey) {
- __frwr_reset_mr(ia, mw);
- bad_wr = bad_wr->next;
- }
+ rpcrdma_init_cqcount(&r_xprt->rx_ep, -count);
+ while (bad_wr) {
+ f = container_of(bad_wr, struct rpcrdma_frmr,
+ fr_invwr);
+ mw = container_of(f, struct rpcrdma_mw, frmr);
+
+ __frwr_reset_mr(ia, mw);
+
+ bad_wr = bad_wr->next;
}
goto unmap;
}
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index 694e9b13ecf0..ca4d6e4528f3 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -141,7 +141,7 @@ static bool rpcrdma_args_inline(struct rpcrdma_xprt *r_xprt,
if (xdr->page_len) {
remaining = xdr->page_len;
- offset = xdr->page_base & ~PAGE_MASK;
+ offset = offset_in_page(xdr->page_base);
count = 0;
while (remaining) {
remaining -= min_t(unsigned int,
@@ -222,7 +222,7 @@ rpcrdma_convert_iovs(struct rpcrdma_xprt *r_xprt, struct xdr_buf *xdrbuf,
len = xdrbuf->page_len;
ppages = xdrbuf->pages + (xdrbuf->page_base >> PAGE_SHIFT);
- page_base = xdrbuf->page_base & ~PAGE_MASK;
+ page_base = offset_in_page(xdrbuf->page_base);
p = 0;
while (len && n < RPCRDMA_MAX_SEGS) {
if (!ppages[p]) {
@@ -540,7 +540,7 @@ rpcrdma_prepare_msg_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
goto out;
page = virt_to_page(xdr->tail[0].iov_base);
- page_base = (unsigned long)xdr->tail[0].iov_base & ~PAGE_MASK;
+ page_base = offset_in_page(xdr->tail[0].iov_base);
/* If the content in the page list is an odd length,
* xdr_write_pages() has added a pad at the beginning
@@ -557,7 +557,7 @@ rpcrdma_prepare_msg_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
*/
if (xdr->page_len) {
ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
- page_base = xdr->page_base & ~PAGE_MASK;
+ page_base = offset_in_page(xdr->page_base);
remaining = xdr->page_len;
while (remaining) {
sge_no++;
@@ -587,7 +587,7 @@ rpcrdma_prepare_msg_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
*/
if (xdr->tail[0].iov_len) {
page = virt_to_page(xdr->tail[0].iov_base);
- page_base = (unsigned long)xdr->tail[0].iov_base & ~PAGE_MASK;
+ page_base = offset_in_page(xdr->tail[0].iov_base);
len = xdr->tail[0].iov_len;
map_tail:
@@ -734,6 +734,9 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
rpclen = 0;
}
+ req->rl_xid = rqst->rq_xid;
+ rpcrdma_insert_req(&r_xprt->rx_buf, req);
+
/* This implementation supports the following combinations
* of chunk lists in one RPC-over-RDMA Call message:
*
@@ -875,9 +878,9 @@ rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad)
srcp += curlen;
copy_len -= curlen;
- page_base = rqst->rq_rcv_buf.page_base;
- ppages = rqst->rq_rcv_buf.pages + (page_base >> PAGE_SHIFT);
- page_base &= ~PAGE_MASK;
+ ppages = rqst->rq_rcv_buf.pages +
+ (rqst->rq_rcv_buf.page_base >> PAGE_SHIFT);
+ page_base = offset_in_page(rqst->rq_rcv_buf.page_base);
fixup_copy_count = 0;
if (copy_len && rqst->rq_rcv_buf.page_len) {
int pagelist_len;
@@ -928,6 +931,24 @@ rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad)
return fixup_copy_count;
}
+/* Caller must guarantee @rep remains stable during this call.
+ */
+static void
+rpcrdma_mark_remote_invalidation(struct list_head *mws,
+ struct rpcrdma_rep *rep)
+{
+ struct rpcrdma_mw *mw;
+
+ if (!(rep->rr_wc_flags & IB_WC_WITH_INVALIDATE))
+ return;
+
+ list_for_each_entry(mw, mws, mw_list)
+ if (mw->mw_handle == rep->rr_inv_rkey) {
+ mw->mw_flags = RPCRDMA_MW_F_RI;
+ break; /* only one invalidated MR per RPC */
+ }
+}
+
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
/* By convention, backchannel calls arrive via rdma_msg type
* messages, and never populate the chunk lists. This makes
@@ -969,14 +990,16 @@ rpcrdma_reply_handler(struct work_struct *work)
{
struct rpcrdma_rep *rep =
container_of(work, struct rpcrdma_rep, rr_work);
+ struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
+ struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
+ struct rpc_xprt *xprt = &r_xprt->rx_xprt;
struct rpcrdma_msg *headerp;
struct rpcrdma_req *req;
struct rpc_rqst *rqst;
- struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
- struct rpc_xprt *xprt = &r_xprt->rx_xprt;
__be32 *iptr;
int rdmalen, status, rmerr;
unsigned long cwnd;
+ struct list_head mws;
dprintk("RPC: %s: incoming rep %p\n", __func__, rep);
@@ -994,27 +1017,45 @@ rpcrdma_reply_handler(struct work_struct *work)
/* Match incoming rpcrdma_rep to an rpcrdma_req to
* get context for handling any incoming chunks.
*/
- spin_lock_bh(&xprt->transport_lock);
- rqst = xprt_lookup_rqst(xprt, headerp->rm_xid);
- if (!rqst)
+ spin_lock(&buf->rb_lock);
+ req = rpcrdma_lookup_req_locked(&r_xprt->rx_buf,
+ headerp->rm_xid);
+ if (!req)
goto out_nomatch;
-
- req = rpcr_to_rdmar(rqst);
if (req->rl_reply)
goto out_duplicate;
- /* Sanity checking has passed. We are now committed
- * to complete this transaction.
+ list_replace_init(&req->rl_registered, &mws);
+ rpcrdma_mark_remote_invalidation(&mws, rep);
+
+ /* Avoid races with signals and duplicate replies
+ * by marking this req as matched.
*/
- list_del_init(&rqst->rq_list);
- spin_unlock_bh(&xprt->transport_lock);
+ req->rl_reply = rep;
+ spin_unlock(&buf->rb_lock);
+
dprintk("RPC: %s: reply %p completes request %p (xid 0x%08x)\n",
__func__, rep, req, be32_to_cpu(headerp->rm_xid));
- /* from here on, the reply is no longer an orphan */
- req->rl_reply = rep;
- xprt->reestablish_timeout = 0;
+ /* Invalidate and unmap the data payloads before waking the
+ * waiting application. This guarantees the memory regions
+ * are properly fenced from the server before the application
+ * accesses the data. It also ensures proper send flow control:
+ * waking the next RPC waits until this RPC has relinquished
+ * all its Send Queue entries.
+ */
+ if (!list_empty(&mws))
+ r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt, &mws);
+ /* Perform XID lookup, reconstruction of the RPC reply, and
+ * RPC completion while holding the transport lock to ensure
+ * the rep, rqst, and rq_task pointers remain stable.
+ */
+ spin_lock_bh(&xprt->transport_lock);
+ rqst = xprt_lookup_rqst(xprt, headerp->rm_xid);
+ if (!rqst)
+ goto out_norqst;
+ xprt->reestablish_timeout = 0;
if (headerp->rm_vers != rpcrdma_version)
goto out_badversion;
@@ -1024,12 +1065,9 @@ rpcrdma_reply_handler(struct work_struct *work)
case rdma_msg:
/* never expect read chunks */
/* never expect reply chunks (two ways to check) */
- /* never expect write chunks without having offered RDMA */
if (headerp->rm_body.rm_chunks[0] != xdr_zero ||
(headerp->rm_body.rm_chunks[1] == xdr_zero &&
- headerp->rm_body.rm_chunks[2] != xdr_zero) ||
- (headerp->rm_body.rm_chunks[1] != xdr_zero &&
- list_empty(&req->rl_registered)))
+ headerp->rm_body.rm_chunks[2] != xdr_zero))
goto badheader;
if (headerp->rm_body.rm_chunks[1] != xdr_zero) {
/* count any expected write chunks in read reply */
@@ -1066,8 +1104,7 @@ rpcrdma_reply_handler(struct work_struct *work)
/* never expect read or write chunks, always reply chunks */
if (headerp->rm_body.rm_chunks[0] != xdr_zero ||
headerp->rm_body.rm_chunks[1] != xdr_zero ||
- headerp->rm_body.rm_chunks[2] != xdr_one ||
- list_empty(&req->rl_registered))
+ headerp->rm_body.rm_chunks[2] != xdr_one)
goto badheader;
iptr = (__be32 *)((unsigned char *)headerp +
RPCRDMA_HDRLEN_MIN);
@@ -1093,17 +1130,6 @@ badheader:
}
out:
- /* Invalidate and flush the data payloads before waking the
- * waiting application. This guarantees the memory region is
- * properly fenced from the server before the application
- * accesses the data. It also ensures proper send flow
- * control: waking the next RPC waits until this RPC has
- * relinquished all its Send Queue entries.
- */
- if (!list_empty(&req->rl_registered))
- r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt, req);
-
- spin_lock_bh(&xprt->transport_lock);
cwnd = xprt->cwnd;
xprt->cwnd = atomic_read(&r_xprt->rx_buf.rb_credits) << RPC_CWNDSHIFT;
if (xprt->cwnd > cwnd)
@@ -1112,7 +1138,7 @@ out:
xprt_complete_rqst(rqst->rq_task, status);
spin_unlock_bh(&xprt->transport_lock);
dprintk("RPC: %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n",
- __func__, xprt, rqst, status);
+ __func__, xprt, rqst, status);
return;
out_badstatus:
@@ -1161,26 +1187,37 @@ out_rdmaerr:
r_xprt->rx_stats.bad_reply_count++;
goto out;
-/* If no pending RPC transaction was matched, post a replacement
- * receive buffer before returning.
+/* The req was still available, but by the time the transport_lock
+ * was acquired, the rqst and task had been released. Thus the RPC
+ * has already been terminated.
*/
+out_norqst:
+ spin_unlock_bh(&xprt->transport_lock);
+ rpcrdma_buffer_put(req);
+ dprintk("RPC: %s: race, no rqst left for req %p\n",
+ __func__, req);
+ return;
+
out_shortreply:
dprintk("RPC: %s: short/invalid reply\n", __func__);
goto repost;
out_nomatch:
- spin_unlock_bh(&xprt->transport_lock);
+ spin_unlock(&buf->rb_lock);
dprintk("RPC: %s: no match for incoming xid 0x%08x len %d\n",
__func__, be32_to_cpu(headerp->rm_xid),
rep->rr_len);
goto repost;
out_duplicate:
- spin_unlock_bh(&xprt->transport_lock);
+ spin_unlock(&buf->rb_lock);
dprintk("RPC: %s: "
"duplicate reply %p to RPC request %p: xid 0x%08x\n",
__func__, rep, req, be32_to_cpu(headerp->rm_xid));
+/* If no pending RPC transaction was matched, post a replacement
+ * receive buffer before returning.
+ */
repost:
r_xprt->rx_stats.bad_reply_count++;
if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, rep))
diff --git a/net/sunrpc/xprtrdma/svc_rdma_marshal.c b/net/sunrpc/xprtrdma/svc_rdma_marshal.c
deleted file mode 100644
index bdcf7d85a3dc..000000000000
--- a/net/sunrpc/xprtrdma/svc_rdma_marshal.c
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Copyright (c) 2016 Oracle. All rights reserved.
- * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
- *
- * This software is available to you under a choice of one of two
- * licenses. You may choose to be licensed under the terms of the GNU
- * General Public License (GPL) Version 2, available from the file
- * COPYING in the main directory of this source tree, or the BSD-type
- * license below:
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions
- * are met:
- *
- * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- *
- * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following
- * disclaimer in the documentation and/or other materials provided
- * with the distribution.
- *
- * Neither the name of the Network Appliance, Inc. nor the names of
- * its contributors may be used to endorse or promote products
- * derived from this software without specific prior written
- * permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- * Author: Tom Tucker <tom@opengridcomputing.com>
- */
-
-#include <linux/sunrpc/xdr.h>
-#include <linux/sunrpc/debug.h>
-#include <asm/unaligned.h>
-#include <linux/sunrpc/rpc_rdma.h>
-#include <linux/sunrpc/svc_rdma.h>
-
-#define RPCDBG_FACILITY RPCDBG_SVCXPRT
-
-static __be32 *xdr_check_read_list(__be32 *p, __be32 *end)
-{
- __be32 *next;
-
- while (*p++ != xdr_zero) {
- next = p + rpcrdma_readchunk_maxsz - 1;
- if (next > end)
- return NULL;
- p = next;
- }
- return p;
-}
-
-static __be32 *xdr_check_write_list(__be32 *p, __be32 *end)
-{
- __be32 *next;
-
- while (*p++ != xdr_zero) {
- next = p + 1 + be32_to_cpup(p) * rpcrdma_segment_maxsz;
- if (next > end)
- return NULL;
- p = next;
- }
- return p;
-}
-
-static __be32 *xdr_check_reply_chunk(__be32 *p, __be32 *end)
-{
- __be32 *next;
-
- if (*p++ != xdr_zero) {
- next = p + 1 + be32_to_cpup(p) * rpcrdma_segment_maxsz;
- if (next > end)
- return NULL;
- p = next;
- }
- return p;
-}
-
-/**
- * svc_rdma_xdr_decode_req - Parse incoming RPC-over-RDMA header
- * @rq_arg: Receive buffer
- *
- * On entry, xdr->head[0].iov_base points to first byte in the
- * RPC-over-RDMA header.
- *
- * On successful exit, head[0] points to first byte past the
- * RPC-over-RDMA header. For RDMA_MSG, this is the RPC message.
- * The length of the RPC-over-RDMA header is returned.
- */
-int svc_rdma_xdr_decode_req(struct xdr_buf *rq_arg)
-{
- __be32 *p, *end, *rdma_argp;
- unsigned int hdr_len;
-
- /* Verify that there's enough bytes for header + something */
- if (rq_arg->len <= RPCRDMA_HDRLEN_ERR)
- goto out_short;
-
- rdma_argp = rq_arg->head[0].iov_base;
- if (*(rdma_argp + 1) != rpcrdma_version)
- goto out_version;
-
- switch (*(rdma_argp + 3)) {
- case rdma_msg:
- case rdma_nomsg:
- break;
-
- case rdma_done:
- goto out_drop;
-
- case rdma_error:
- goto out_drop;
-
- default:
- goto out_proc;
- }
-
- end = (__be32 *)((unsigned long)rdma_argp + rq_arg->len);
- p = xdr_check_read_list(rdma_argp + 4, end);
- if (!p)
- goto out_inval;
- p = xdr_check_write_list(p, end);
- if (!p)
- goto out_inval;
- p = xdr_check_reply_chunk(p, end);
- if (!p)
- goto out_inval;
- if (p > end)
- goto out_inval;
-
- rq_arg->head[0].iov_base = p;
- hdr_len = (unsigned long)p - (unsigned long)rdma_argp;
- rq_arg->head[0].iov_len -= hdr_len;
- return hdr_len;
-
-out_short:
- dprintk("svcrdma: header too short = %d\n", rq_arg->len);
- return -EINVAL;
-
-out_version:
- dprintk("svcrdma: bad xprt version: %u\n",
- be32_to_cpup(rdma_argp + 1));
- return -EPROTONOSUPPORT;
-
-out_drop:
- dprintk("svcrdma: dropping RDMA_DONE/ERROR message\n");
- return 0;
-
-out_proc:
- dprintk("svcrdma: bad rdma procedure (%u)\n",
- be32_to_cpup(rdma_argp + 3));
- return -EINVAL;
-
-out_inval:
- dprintk("svcrdma: failed to parse transport header\n");
- return -EINVAL;
-}
diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
index 27a99bf5b1a6..ad4bd62eebf1 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -1,4 +1,5 @@
/*
+ * Copyright (c) 2016, 2017 Oracle. All rights reserved.
* Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.
* Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
*
@@ -40,12 +41,66 @@
* Author: Tom Tucker <tom@opengridcomputing.com>
*/
-#include <linux/sunrpc/debug.h>
-#include <linux/sunrpc/rpc_rdma.h>
-#include <linux/spinlock.h>
+/* Operation
+ *
+ * The main entry point is svc_rdma_recvfrom. This is called from
+ * svc_recv when the transport indicates there is incoming data to
+ * be read. "Data Ready" is signaled when an RDMA Receive completes,
+ * or when a set of RDMA Reads complete.
+ *
+ * An svc_rqst is passed in. This structure contains an array of
+ * free pages (rq_pages) that will contain the incoming RPC message.
+ *
+ * Short messages are moved directly into svc_rqst::rq_arg, and
+ * the RPC Call is ready to be processed by the Upper Layer.
+ * svc_rdma_recvfrom returns the length of the RPC Call message,
+ * completing the reception of the RPC Call.
+ *
+ * However, when an incoming message has Read chunks,
+ * svc_rdma_recvfrom must post RDMA Reads to pull the RPC Call's
+ * data payload from the client. svc_rdma_recvfrom sets up the
+ * RDMA Reads using pages in svc_rqst::rq_pages, which are
+ * transferred to an svc_rdma_op_ctxt for the duration of the
+ * I/O. svc_rdma_recvfrom then returns zero, since the RPC message
+ * is still not yet ready.
+ *
+ * When the Read chunk payloads have become available on the
+ * server, "Data Ready" is raised again, and svc_recv calls
+ * svc_rdma_recvfrom again. This second call may use a different
+ * svc_rqst than the first one, thus any information that needs
+ * to be preserved across these two calls is kept in an
+ * svc_rdma_op_ctxt.
+ *
+ * The second call to svc_rdma_recvfrom performs final assembly
+ * of the RPC Call message, using the RDMA Read sink pages kept in
+ * the svc_rdma_op_ctxt. The xdr_buf is copied from the
+ * svc_rdma_op_ctxt to the second svc_rqst. The second call returns
+ * the length of the completed RPC Call message.
+ *
+ * Page Management
+ *
+ * Pages under I/O must be transferred from the first svc_rqst to an
+ * svc_rdma_op_ctxt before the first svc_rdma_recvfrom call returns.
+ *
+ * The first svc_rqst supplies pages for RDMA Reads. These are moved
+ * from rqstp::rq_pages into ctxt::pages. The consumed elements of
+ * the rq_pages array are set to NULL and refilled with the first
+ * svc_rdma_recvfrom call returns.
+ *
+ * During the second svc_rdma_recvfrom call, RDMA Read sink pages
+ * are transferred from the svc_rdma_op_ctxt to the second svc_rqst
+ * (see rdma_read_complete() below).
+ */
+
#include <asm/unaligned.h>
#include <rdma/ib_verbs.h>
#include <rdma/rdma_cm.h>
+
+#include <linux/spinlock.h>
+
+#include <linux/sunrpc/xdr.h>
+#include <linux/sunrpc/debug.h>
+#include <linux/sunrpc/rpc_rdma.h>
#include <linux/sunrpc/svc_rdma.h>
#define RPCDBG_FACILITY RPCDBG_SVCXPRT
@@ -59,7 +114,6 @@ static void rdma_build_arg_xdr(struct svc_rqst *rqstp,
struct svc_rdma_op_ctxt *ctxt,
u32 byte_count)
{
- struct rpcrdma_msg *rmsgp;
struct page *page;
u32 bc;
int sge_no;
@@ -83,20 +137,12 @@ static void rdma_build_arg_xdr(struct svc_rqst *rqstp,
rqstp->rq_arg.page_len = bc;
rqstp->rq_arg.page_base = 0;
- /* RDMA_NOMSG: RDMA READ data should land just after RDMA RECV data */
- rmsgp = (struct rpcrdma_msg *)rqstp->rq_arg.head[0].iov_base;
- if (rmsgp->rm_type == rdma_nomsg)
- rqstp->rq_arg.pages = &rqstp->rq_pages[0];
- else
- rqstp->rq_arg.pages = &rqstp->rq_pages[1];
-
sge_no = 1;
while (bc && sge_no < ctxt->count) {
page = ctxt->pages[sge_no];
put_page(rqstp->rq_pages[sge_no]);
rqstp->rq_pages[sge_no] = page;
bc -= min_t(u32, bc, ctxt->sge[sge_no].length);
- rqstp->rq_arg.buflen += ctxt->sge[sge_no].length;
sge_no++;
}
rqstp->rq_respages = &rqstp->rq_pages[sge_no];
@@ -115,406 +161,208 @@ static void rdma_build_arg_xdr(struct svc_rqst *rqstp,
rqstp->rq_arg.tail[0].iov_len = 0;
}
-/* Issue an RDMA_READ using the local lkey to map the data sink */
-int rdma_read_chunk_lcl(struct svcxprt_rdma *xprt,
- struct svc_rqst *rqstp,
- struct svc_rdma_op_ctxt *head,
- int *page_no,
- u32 *page_offset,
- u32 rs_handle,
- u32 rs_length,
- u64 rs_offset,
- bool last)
-{
- struct ib_rdma_wr read_wr;
- int pages_needed = PAGE_ALIGN(*page_offset + rs_length) >> PAGE_SHIFT;
- struct svc_rdma_op_ctxt *ctxt = svc_rdma_get_context(xprt);
- int ret, read, pno;
- u32 pg_off = *page_offset;
- u32 pg_no = *page_no;
-
- ctxt->direction = DMA_FROM_DEVICE;
- ctxt->read_hdr = head;
- pages_needed = min_t(int, pages_needed, xprt->sc_max_sge_rd);
- read = min_t(int, (pages_needed << PAGE_SHIFT) - *page_offset,
- rs_length);
-
- for (pno = 0; pno < pages_needed; pno++) {
- int len = min_t(int, rs_length, PAGE_SIZE - pg_off);
-
- head->arg.pages[pg_no] = rqstp->rq_arg.pages[pg_no];
- head->arg.page_len += len;
-
- head->arg.len += len;
- if (!pg_off)
- head->count++;
- rqstp->rq_respages = &rqstp->rq_arg.pages[pg_no+1];
- rqstp->rq_next_page = rqstp->rq_respages + 1;
- ctxt->sge[pno].addr =
- ib_dma_map_page(xprt->sc_cm_id->device,
- head->arg.pages[pg_no], pg_off,
- PAGE_SIZE - pg_off,
- DMA_FROM_DEVICE);
- ret = ib_dma_mapping_error(xprt->sc_cm_id->device,
- ctxt->sge[pno].addr);
- if (ret)
- goto err;
- svc_rdma_count_mappings(xprt, ctxt);
-
- ctxt->sge[pno].lkey = xprt->sc_pd->local_dma_lkey;
- ctxt->sge[pno].length = len;
- ctxt->count++;
-
- /* adjust offset and wrap to next page if needed */
- pg_off += len;
- if (pg_off == PAGE_SIZE) {
- pg_off = 0;
- pg_no++;
- }
- rs_length -= len;
- }
-
- if (last && rs_length == 0)
- set_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
- else
- clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
-
- memset(&read_wr, 0, sizeof(read_wr));
- ctxt->cqe.done = svc_rdma_wc_read;
- read_wr.wr.wr_cqe = &ctxt->cqe;
- read_wr.wr.opcode = IB_WR_RDMA_READ;
- read_wr.wr.send_flags = IB_SEND_SIGNALED;
- read_wr.rkey = rs_handle;
- read_wr.remote_addr = rs_offset;
- read_wr.wr.sg_list = ctxt->sge;
- read_wr.wr.num_sge = pages_needed;
-
- ret = svc_rdma_send(xprt, &read_wr.wr);
- if (ret) {
- pr_err("svcrdma: Error %d posting RDMA_READ\n", ret);
- set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
- goto err;
- }
+/* This accommodates the largest possible Write chunk,
+ * in one segment.
+ */
+#define MAX_BYTES_WRITE_SEG ((u32)(RPCSVC_MAXPAGES << PAGE_SHIFT))
- /* return current location in page array */
- *page_no = pg_no;
- *page_offset = pg_off;
- ret = read;
- atomic_inc(&rdma_stat_read);
- return ret;
- err:
- svc_rdma_unmap_dma(ctxt);
- svc_rdma_put_context(ctxt, 0);
- return ret;
-}
+/* This accommodates the largest possible Position-Zero
+ * Read chunk or Reply chunk, in one segment.
+ */
+#define MAX_BYTES_SPECIAL_SEG ((u32)((RPCSVC_MAXPAGES + 2) << PAGE_SHIFT))
-/* Issue an RDMA_READ using an FRMR to map the data sink */
-int rdma_read_chunk_frmr(struct svcxprt_rdma *xprt,
- struct svc_rqst *rqstp,
- struct svc_rdma_op_ctxt *head,
- int *page_no,
- u32 *page_offset,
- u32 rs_handle,
- u32 rs_length,
- u64 rs_offset,
- bool last)
+/* Sanity check the Read list.
+ *
+ * Implementation limits:
+ * - This implementation supports only one Read chunk.
+ *
+ * Sanity checks:
+ * - Read list does not overflow buffer.
+ * - Segment size limited by largest NFS data payload.
+ *
+ * The segment count is limited to how many segments can
+ * fit in the transport header without overflowing the
+ * buffer. That's about 40 Read segments for a 1KB inline
+ * threshold.
+ *
+ * Returns pointer to the following Write list.
+ */
+static __be32 *xdr_check_read_list(__be32 *p, const __be32 *end)
{
- struct ib_rdma_wr read_wr;
- struct ib_send_wr inv_wr;
- struct ib_reg_wr reg_wr;
- u8 key;
- int nents = PAGE_ALIGN(*page_offset + rs_length) >> PAGE_SHIFT;
- struct svc_rdma_op_ctxt *ctxt = svc_rdma_get_context(xprt);
- struct svc_rdma_fastreg_mr *frmr = svc_rdma_get_frmr(xprt);
- int ret, read, pno, dma_nents, n;
- u32 pg_off = *page_offset;
- u32 pg_no = *page_no;
-
- if (IS_ERR(frmr))
- return -ENOMEM;
-
- ctxt->direction = DMA_FROM_DEVICE;
- ctxt->frmr = frmr;
- nents = min_t(unsigned int, nents, xprt->sc_frmr_pg_list_len);
- read = min_t(int, (nents << PAGE_SHIFT) - *page_offset, rs_length);
-
- frmr->direction = DMA_FROM_DEVICE;
- frmr->access_flags = (IB_ACCESS_LOCAL_WRITE|IB_ACCESS_REMOTE_WRITE);
- frmr->sg_nents = nents;
-
- for (pno = 0; pno < nents; pno++) {
- int len = min_t(int, rs_length, PAGE_SIZE - pg_off);
-
- head->arg.pages[pg_no] = rqstp->rq_arg.pages[pg_no];
- head->arg.page_len += len;
- head->arg.len += len;
- if (!pg_off)
- head->count++;
-
- sg_set_page(&frmr->sg[pno], rqstp->rq_arg.pages[pg_no],
- len, pg_off);
-
- rqstp->rq_respages = &rqstp->rq_arg.pages[pg_no+1];
- rqstp->rq_next_page = rqstp->rq_respages + 1;
-
- /* adjust offset and wrap to next page if needed */
- pg_off += len;
- if (pg_off == PAGE_SIZE) {
- pg_off = 0;
- pg_no++;
+ u32 position;
+ bool first;
+
+ first = true;
+ while (*p++ != xdr_zero) {
+ if (first) {
+ position = be32_to_cpup(p++);
+ first = false;
+ } else if (be32_to_cpup(p++) != position) {
+ return NULL;
}
- rs_length -= len;
- }
+ p++; /* handle */
+ if (be32_to_cpup(p++) > MAX_BYTES_SPECIAL_SEG)
+ return NULL;
+ p += 2; /* offset */
- if (last && rs_length == 0)
- set_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
- else
- clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
-
- dma_nents = ib_dma_map_sg(xprt->sc_cm_id->device,
- frmr->sg, frmr->sg_nents,
- frmr->direction);
- if (!dma_nents) {
- pr_err("svcrdma: failed to dma map sg %p\n",
- frmr->sg);
- return -ENOMEM;
+ if (p > end)
+ return NULL;
}
+ return p;
+}
- n = ib_map_mr_sg(frmr->mr, frmr->sg, frmr->sg_nents, NULL, PAGE_SIZE);
- if (unlikely(n != frmr->sg_nents)) {
- pr_err("svcrdma: failed to map mr %p (%d/%d elements)\n",
- frmr->mr, n, frmr->sg_nents);
- return n < 0 ? n : -EINVAL;
- }
+/* The segment count is limited to how many segments can
+ * fit in the transport header without overflowing the
+ * buffer. That's about 60 Write segments for a 1KB inline
+ * threshold.
+ */
+static __be32 *xdr_check_write_chunk(__be32 *p, const __be32 *end,
+ u32 maxlen)
+{
+ u32 i, segcount;
- /* Bump the key */
- key = (u8)(frmr->mr->lkey & 0x000000FF);
- ib_update_fast_reg_key(frmr->mr, ++key);
-
- ctxt->sge[0].addr = frmr->mr->iova;
- ctxt->sge[0].lkey = frmr->mr->lkey;
- ctxt->sge[0].length = frmr->mr->length;
- ctxt->count = 1;
- ctxt->read_hdr = head;
-
- /* Prepare REG WR */
- ctxt->reg_cqe.done = svc_rdma_wc_reg;
- reg_wr.wr.wr_cqe = &ctxt->reg_cqe;
- reg_wr.wr.opcode = IB_WR_REG_MR;
- reg_wr.wr.send_flags = IB_SEND_SIGNALED;
- reg_wr.wr.num_sge = 0;
- reg_wr.mr = frmr->mr;
- reg_wr.key = frmr->mr->lkey;
- reg_wr.access = frmr->access_flags;
- reg_wr.wr.next = &read_wr.wr;
-
- /* Prepare RDMA_READ */
- memset(&read_wr, 0, sizeof(read_wr));
- ctxt->cqe.done = svc_rdma_wc_read;
- read_wr.wr.wr_cqe = &ctxt->cqe;
- read_wr.wr.send_flags = IB_SEND_SIGNALED;
- read_wr.rkey = rs_handle;
- read_wr.remote_addr = rs_offset;
- read_wr.wr.sg_list = ctxt->sge;
- read_wr.wr.num_sge = 1;
- if (xprt->sc_dev_caps & SVCRDMA_DEVCAP_READ_W_INV) {
- read_wr.wr.opcode = IB_WR_RDMA_READ_WITH_INV;
- read_wr.wr.ex.invalidate_rkey = ctxt->frmr->mr->lkey;
- } else {
- read_wr.wr.opcode = IB_WR_RDMA_READ;
- read_wr.wr.next = &inv_wr;
- /* Prepare invalidate */
- memset(&inv_wr, 0, sizeof(inv_wr));
- ctxt->inv_cqe.done = svc_rdma_wc_inv;
- inv_wr.wr_cqe = &ctxt->inv_cqe;
- inv_wr.opcode = IB_WR_LOCAL_INV;
- inv_wr.send_flags = IB_SEND_SIGNALED | IB_SEND_FENCE;
- inv_wr.ex.invalidate_rkey = frmr->mr->lkey;
- }
+ segcount = be32_to_cpup(p++);
+ for (i = 0; i < segcount; i++) {
+ p++; /* handle */
+ if (be32_to_cpup(p++) > maxlen)
+ return NULL;
+ p += 2; /* offset */
- /* Post the chain */
- ret = svc_rdma_send(xprt, &reg_wr.wr);
- if (ret) {
- pr_err("svcrdma: Error %d posting RDMA_READ\n", ret);
- set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
- goto err;
+ if (p > end)
+ return NULL;
}
- /* return current location in page array */
- *page_no = pg_no;
- *page_offset = pg_off;
- ret = read;
- atomic_inc(&rdma_stat_read);
- return ret;
- err:
- svc_rdma_put_context(ctxt, 0);
- svc_rdma_put_frmr(xprt, frmr);
- return ret;
-}
-
-static unsigned int
-rdma_rcl_chunk_count(struct rpcrdma_read_chunk *ch)
-{
- unsigned int count;
-
- for (count = 0; ch->rc_discrim != xdr_zero; ch++)
- count++;
- return count;
+ return p;
}
-/* If there was additional inline content, append it to the end of arg.pages.
- * Tail copy has to be done after the reader function has determined how many
- * pages are needed for RDMA READ.
+/* Sanity check the Write list.
+ *
+ * Implementation limits:
+ * - This implementation supports only one Write chunk.
+ *
+ * Sanity checks:
+ * - Write list does not overflow buffer.
+ * - Segment size limited by largest NFS data payload.
+ *
+ * Returns pointer to the following Reply chunk.
*/
-static int
-rdma_copy_tail(struct svc_rqst *rqstp, struct svc_rdma_op_ctxt *head,
- u32 position, u32 byte_count, u32 page_offset, int page_no)
+static __be32 *xdr_check_write_list(__be32 *p, const __be32 *end)
{
- char *srcp, *destp;
-
- srcp = head->arg.head[0].iov_base + position;
- byte_count = head->arg.head[0].iov_len - position;
- if (byte_count > PAGE_SIZE) {
- dprintk("svcrdma: large tail unsupported\n");
- return 0;
- }
-
- /* Fit as much of the tail on the current page as possible */
- if (page_offset != PAGE_SIZE) {
- destp = page_address(rqstp->rq_arg.pages[page_no]);
- destp += page_offset;
- while (byte_count--) {
- *destp++ = *srcp++;
- page_offset++;
- if (page_offset == PAGE_SIZE && byte_count)
- goto more;
- }
- goto done;
+ u32 chcount;
+
+ chcount = 0;
+ while (*p++ != xdr_zero) {
+ p = xdr_check_write_chunk(p, end, MAX_BYTES_WRITE_SEG);
+ if (!p)
+ return NULL;
+ if (chcount++ > 1)
+ return NULL;
}
-
-more:
- /* Fit the rest on the next page */
- page_no++;
- destp = page_address(rqstp->rq_arg.pages[page_no]);
- while (byte_count--)
- *destp++ = *srcp++;
-
- rqstp->rq_respages = &rqstp->rq_arg.pages[page_no+1];
- rqstp->rq_next_page = rqstp->rq_respages + 1;
-
-done:
- byte_count = head->arg.head[0].iov_len - position;
- head->arg.page_len += byte_count;
- head->arg.len += byte_count;
- head->arg.buflen += byte_count;
- return 1;
+ return p;
}
-/* Returns the address of the first read chunk or <nul> if no read chunk
- * is present
+/* Sanity check the Reply chunk.
+ *
+ * Sanity checks:
+ * - Reply chunk does not overflow buffer.
+ * - Segment size limited by largest NFS data payload.
+ *
+ * Returns pointer to the following RPC header.
*/
-static struct rpcrdma_read_chunk *
-svc_rdma_get_read_chunk(struct rpcrdma_msg *rmsgp)
+static __be32 *xdr_check_reply_chunk(__be32 *p, const __be32 *end)
{
- struct rpcrdma_read_chunk *ch =
- (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0];
-
- if (ch->rc_discrim == xdr_zero)
- return NULL;
- return ch;
+ if (*p++ != xdr_zero) {
+ p = xdr_check_write_chunk(p, end, MAX_BYTES_SPECIAL_SEG);
+ if (!p)
+ return NULL;
+ }
+ return p;
}
-static int rdma_read_chunks(struct svcxprt_rdma *xprt,
- struct rpcrdma_msg *rmsgp,
- struct svc_rqst *rqstp,
- struct svc_rdma_op_ctxt *head)
+/* On entry, xdr->head[0].iov_base points to first byte in the
+ * RPC-over-RDMA header.
+ *
+ * On successful exit, head[0] points to first byte past the
+ * RPC-over-RDMA header. For RDMA_MSG, this is the RPC message.
+ * The length of the RPC-over-RDMA header is returned.
+ *
+ * Assumptions:
+ * - The transport header is entirely contained in the head iovec.
+ */
+static int svc_rdma_xdr_decode_req(struct xdr_buf *rq_arg)
{
- int page_no, ret;
- struct rpcrdma_read_chunk *ch;
- u32 handle, page_offset, byte_count;
- u32 position;
- u64 rs_offset;
- bool last;
-
- /* If no read list is present, return 0 */
- ch = svc_rdma_get_read_chunk(rmsgp);
- if (!ch)
- return 0;
+ __be32 *p, *end, *rdma_argp;
+ unsigned int hdr_len;
+ char *proc;
+
+ /* Verify that there's enough bytes for header + something */
+ if (rq_arg->len <= RPCRDMA_HDRLEN_ERR)
+ goto out_short;
+
+ rdma_argp = rq_arg->head[0].iov_base;
+ if (*(rdma_argp + 1) != rpcrdma_version)
+ goto out_version;
+
+ switch (*(rdma_argp + 3)) {
+ case rdma_msg:
+ proc = "RDMA_MSG";
+ break;
+ case rdma_nomsg:
+ proc = "RDMA_NOMSG";
+ break;
+
+ case rdma_done:
+ goto out_drop;
- if (rdma_rcl_chunk_count(ch) > RPCSVC_MAXPAGES)
- return -EINVAL;
-
- /* The request is completed when the RDMA_READs complete. The
- * head context keeps all the pages that comprise the
- * request.
- */
- head->arg.head[0] = rqstp->rq_arg.head[0];
- head->arg.tail[0] = rqstp->rq_arg.tail[0];
- head->hdr_count = head->count;
- head->arg.page_base = 0;
- head->arg.page_len = 0;
- head->arg.len = rqstp->rq_arg.len;
- head->arg.buflen = rqstp->rq_arg.buflen;
-
- /* RDMA_NOMSG: RDMA READ data should land just after RDMA RECV data */
- position = be32_to_cpu(ch->rc_position);
- if (position == 0) {
- head->arg.pages = &head->pages[0];
- page_offset = head->byte_len;
- } else {
- head->arg.pages = &head->pages[head->count];
- page_offset = 0;
- }
+ case rdma_error:
+ goto out_drop;
- ret = 0;
- page_no = 0;
- for (; ch->rc_discrim != xdr_zero; ch++) {
- if (be32_to_cpu(ch->rc_position) != position)
- goto err;
-
- handle = be32_to_cpu(ch->rc_target.rs_handle),
- byte_count = be32_to_cpu(ch->rc_target.rs_length);
- xdr_decode_hyper((__be32 *)&ch->rc_target.rs_offset,
- &rs_offset);
-
- while (byte_count > 0) {
- last = (ch + 1)->rc_discrim == xdr_zero;
- ret = xprt->sc_reader(xprt, rqstp, head,
- &page_no, &page_offset,
- handle, byte_count,
- rs_offset, last);
- if (ret < 0)
- goto err;
- byte_count -= ret;
- rs_offset += ret;
- head->arg.buflen += ret;
- }
+ default:
+ goto out_proc;
}
- /* Read list may need XDR round-up (see RFC 5666, s. 3.7) */
- if (page_offset & 3) {
- u32 pad = 4 - (page_offset & 3);
-
- head->arg.tail[0].iov_len += pad;
- head->arg.len += pad;
- head->arg.buflen += pad;
- page_offset += pad;
- }
+ end = (__be32 *)((unsigned long)rdma_argp + rq_arg->len);
+ p = xdr_check_read_list(rdma_argp + 4, end);
+ if (!p)
+ goto out_inval;
+ p = xdr_check_write_list(p, end);
+ if (!p)
+ goto out_inval;
+ p = xdr_check_reply_chunk(p, end);
+ if (!p)
+ goto out_inval;
+ if (p > end)
+ goto out_inval;
+
+ rq_arg->head[0].iov_base = p;
+ hdr_len = (unsigned long)p - (unsigned long)rdma_argp;
+ rq_arg->head[0].iov_len -= hdr_len;
+ rq_arg->len -= hdr_len;
+ dprintk("svcrdma: received %s request for XID 0x%08x, hdr_len=%u\n",
+ proc, be32_to_cpup(rdma_argp), hdr_len);
+ return hdr_len;
+
+out_short:
+ dprintk("svcrdma: header too short = %d\n", rq_arg->len);
+ return -EINVAL;
+
+out_version:
+ dprintk("svcrdma: bad xprt version: %u\n",
+ be32_to_cpup(rdma_argp + 1));
+ return -EPROTONOSUPPORT;
- ret = 1;
- if (position && position < head->arg.head[0].iov_len)
- ret = rdma_copy_tail(rqstp, head, position,
- byte_count, page_offset, page_no);
- head->arg.head[0].iov_len = position;
- head->position = position;
+out_drop:
+ dprintk("svcrdma: dropping RDMA_DONE/ERROR message\n");
+ return 0;
- err:
- /* Detach arg pages. svc_recv will replenish them */
- for (page_no = 0;
- &rqstp->rq_pages[page_no] < rqstp->rq_respages; page_no++)
- rqstp->rq_pages[page_no] = NULL;
+out_proc:
+ dprintk("svcrdma: bad rdma procedure (%u)\n",
+ be32_to_cpup(rdma_argp + 3));
+ return -EINVAL;
- return ret;
+out_inval:
+ dprintk("svcrdma: failed to parse transport header\n");
+ return -EINVAL;
}
static void rdma_read_complete(struct svc_rqst *rqstp,
@@ -528,24 +376,9 @@ static void rdma_read_complete(struct svc_rqst *rqstp,
rqstp->rq_pages[page_no] = head->pages[page_no];
}
- /* Adjustments made for RDMA_NOMSG type requests */
- if (head->position == 0) {
- if (head->arg.len <= head->sge[0].length) {
- head->arg.head[0].iov_len = head->arg.len -
- head->byte_len;
- head->arg.page_len = 0;
- } else {
- head->arg.head[0].iov_len = head->sge[0].length -
- head->byte_len;
- head->arg.page_len = head->arg.len -
- head->sge[0].length;
- }
- }
-
/* Point rq_arg.pages past header */
rqstp->rq_arg.pages = &rqstp->rq_pages[head->hdr_count];
rqstp->rq_arg.page_len = head->arg.page_len;
- rqstp->rq_arg.page_base = head->arg.page_base;
/* rq_respages starts after the last arg page */
rqstp->rq_respages = &rqstp->rq_pages[page_no];
@@ -642,21 +475,44 @@ static bool svc_rdma_is_backchannel_reply(struct svc_xprt *xprt,
return true;
}
-/*
- * Set up the rqstp thread context to point to the RQ buffer. If
- * necessary, pull additional data from the client with an RDMA_READ
- * request.
+/**
+ * svc_rdma_recvfrom - Receive an RPC call
+ * @rqstp: request structure into which to receive an RPC Call
+ *
+ * Returns:
+ * The positive number of bytes in the RPC Call message,
+ * %0 if there were no Calls ready to return,
+ * %-EINVAL if the Read chunk data is too large,
+ * %-ENOMEM if rdma_rw context pool was exhausted,
+ * %-ENOTCONN if posting failed (connection is lost),
+ * %-EIO if rdma_rw initialization failed (DMA mapping, etc).
+ *
+ * Called in a loop when XPT_DATA is set. XPT_DATA is cleared only
+ * when there are no remaining ctxt's to process.
+ *
+ * The next ctxt is removed from the "receive" lists.
+ *
+ * - If the ctxt completes a Read, then finish assembling the Call
+ * message and return the number of bytes in the message.
+ *
+ * - If the ctxt completes a Receive, then construct the Call
+ * message from the contents of the Receive buffer.
+ *
+ * - If there are no Read chunks in this message, then finish
+ * assembling the Call message and return the number of bytes
+ * in the message.
+ *
+ * - If there are Read chunks in this message, post Read WRs to
+ * pull that payload and return 0.
*/
int svc_rdma_recvfrom(struct svc_rqst *rqstp)
{
struct svc_xprt *xprt = rqstp->rq_xprt;
struct svcxprt_rdma *rdma_xprt =
container_of(xprt, struct svcxprt_rdma, sc_xprt);
- struct svc_rdma_op_ctxt *ctxt = NULL;
- struct rpcrdma_msg *rmsgp;
- int ret = 0;
-
- dprintk("svcrdma: rqstp=%p\n", rqstp);
+ struct svc_rdma_op_ctxt *ctxt;
+ __be32 *p;
+ int ret;
spin_lock(&rdma_xprt->sc_rq_dto_lock);
if (!list_empty(&rdma_xprt->sc_read_complete_q)) {
@@ -671,22 +527,14 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
struct svc_rdma_op_ctxt, list);
list_del(&ctxt->list);
} else {
- atomic_inc(&rdma_stat_rq_starve);
+ /* No new incoming requests, terminate the loop */
clear_bit(XPT_DATA, &xprt->xpt_flags);
- ctxt = NULL;
+ spin_unlock(&rdma_xprt->sc_rq_dto_lock);
+ return 0;
}
spin_unlock(&rdma_xprt->sc_rq_dto_lock);
- if (!ctxt) {
- /* This is the EAGAIN path. The svc_recv routine will
- * return -EAGAIN, the nfsd thread will go to call into
- * svc_recv again and we shouldn't be on the active
- * transport list
- */
- if (test_bit(XPT_CLOSE, &xprt->xpt_flags))
- goto defer;
- goto out;
- }
- dprintk("svcrdma: processing ctxt=%p on xprt=%p, rqstp=%p\n",
+
+ dprintk("svcrdma: recvfrom: ctxt=%p on xprt=%p, rqstp=%p\n",
ctxt, rdma_xprt, rqstp);
atomic_inc(&rdma_stat_recv);
@@ -694,7 +542,7 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
rdma_build_arg_xdr(rqstp, ctxt, ctxt->byte_len);
/* Decode the RDMA header. */
- rmsgp = (struct rpcrdma_msg *)rqstp->rq_arg.head[0].iov_base;
+ p = (__be32 *)rqstp->rq_arg.head[0].iov_base;
ret = svc_rdma_xdr_decode_req(&rqstp->rq_arg);
if (ret < 0)
goto out_err;
@@ -702,9 +550,8 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
goto out_drop;
rqstp->rq_xprt_hlen = ret;
- if (svc_rdma_is_backchannel_reply(xprt, &rmsgp->rm_xid)) {
- ret = svc_rdma_handle_bc_reply(xprt->xpt_bc_xprt,
- &rmsgp->rm_xid,
+ if (svc_rdma_is_backchannel_reply(xprt, p)) {
+ ret = svc_rdma_handle_bc_reply(xprt->xpt_bc_xprt, p,
&rqstp->rq_arg);
svc_rdma_put_context(ctxt, 0);
if (ret)
@@ -712,39 +559,34 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
return ret;
}
- /* Read read-list data. */
- ret = rdma_read_chunks(rdma_xprt, rmsgp, rqstp, ctxt);
- if (ret > 0) {
- /* read-list posted, defer until data received from client. */
- goto defer;
- } else if (ret < 0) {
- /* Post of read-list failed, free context. */
- svc_rdma_put_context(ctxt, 1);
- return 0;
- }
+ p += rpcrdma_fixed_maxsz;
+ if (*p != xdr_zero)
+ goto out_readchunk;
complete:
- ret = rqstp->rq_arg.head[0].iov_len
- + rqstp->rq_arg.page_len
- + rqstp->rq_arg.tail[0].iov_len;
svc_rdma_put_context(ctxt, 0);
- out:
- dprintk("svcrdma: ret=%d, rq_arg.len=%u, "
- "rq_arg.head[0].iov_base=%p, rq_arg.head[0].iov_len=%zd\n",
- ret, rqstp->rq_arg.len,
- rqstp->rq_arg.head[0].iov_base,
- rqstp->rq_arg.head[0].iov_len);
+ dprintk("svcrdma: recvfrom: xprt=%p, rqstp=%p, rq_arg.len=%u\n",
+ rdma_xprt, rqstp, rqstp->rq_arg.len);
rqstp->rq_prot = IPPROTO_MAX;
svc_xprt_copy_addrs(rqstp, xprt);
- return ret;
+ return rqstp->rq_arg.len;
+
+out_readchunk:
+ ret = svc_rdma_recv_read_chunk(rdma_xprt, rqstp, ctxt, p);
+ if (ret < 0)
+ goto out_postfail;
+ return 0;
out_err:
- svc_rdma_send_error(rdma_xprt, &rmsgp->rm_xid, ret);
+ svc_rdma_send_error(rdma_xprt, p, ret);
svc_rdma_put_context(ctxt, 0);
return 0;
-defer:
- return 0;
+out_postfail:
+ if (ret == -EINVAL)
+ svc_rdma_send_error(rdma_xprt, p, ret);
+ svc_rdma_put_context(ctxt, 1);
+ return ret;
out_drop:
svc_rdma_put_context(ctxt, 1);
diff --git a/net/sunrpc/xprtrdma/svc_rdma_rw.c b/net/sunrpc/xprtrdma/svc_rdma_rw.c
index 0cf620277693..933f79bed270 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_rw.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_rw.c
@@ -12,6 +12,9 @@
#define RPCDBG_FACILITY RPCDBG_SVCXPRT
+static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc);
+static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc);
+
/* Each R/W context contains state for one chain of RDMA Read or
* Write Work Requests.
*
@@ -113,22 +116,20 @@ struct svc_rdma_chunk_ctxt {
struct svcxprt_rdma *cc_rdma;
struct list_head cc_rwctxts;
int cc_sqecount;
- enum dma_data_direction cc_dir;
};
static void svc_rdma_cc_init(struct svcxprt_rdma *rdma,
- struct svc_rdma_chunk_ctxt *cc,
- enum dma_data_direction dir)
+ struct svc_rdma_chunk_ctxt *cc)
{
cc->cc_rdma = rdma;
svc_xprt_get(&rdma->sc_xprt);
INIT_LIST_HEAD(&cc->cc_rwctxts);
cc->cc_sqecount = 0;
- cc->cc_dir = dir;
}
-static void svc_rdma_cc_release(struct svc_rdma_chunk_ctxt *cc)
+static void svc_rdma_cc_release(struct svc_rdma_chunk_ctxt *cc,
+ enum dma_data_direction dir)
{
struct svcxprt_rdma *rdma = cc->cc_rdma;
struct svc_rdma_rw_ctxt *ctxt;
@@ -138,7 +139,7 @@ static void svc_rdma_cc_release(struct svc_rdma_chunk_ctxt *cc)
rdma_rw_ctx_destroy(&ctxt->rw_ctx, rdma->sc_qp,
rdma->sc_port_num, ctxt->rw_sg_table.sgl,
- ctxt->rw_nents, cc->cc_dir);
+ ctxt->rw_nents, dir);
svc_rdma_put_rw_ctxt(rdma, ctxt);
}
svc_xprt_put(&rdma->sc_xprt);
@@ -176,13 +177,14 @@ svc_rdma_write_info_alloc(struct svcxprt_rdma *rdma, __be32 *chunk)
info->wi_seg_no = 0;
info->wi_nsegs = be32_to_cpup(++chunk);
info->wi_segs = ++chunk;
- svc_rdma_cc_init(rdma, &info->wi_cc, DMA_TO_DEVICE);
+ svc_rdma_cc_init(rdma, &info->wi_cc);
+ info->wi_cc.cc_cqe.done = svc_rdma_write_done;
return info;
}
static void svc_rdma_write_info_free(struct svc_rdma_write_info *info)
{
- svc_rdma_cc_release(&info->wi_cc);
+ svc_rdma_cc_release(&info->wi_cc, DMA_TO_DEVICE);
kfree(info);
}
@@ -216,6 +218,76 @@ static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc)
svc_rdma_write_info_free(info);
}
+/* State for pulling a Read chunk.
+ */
+struct svc_rdma_read_info {
+ struct svc_rdma_op_ctxt *ri_readctxt;
+ unsigned int ri_position;
+ unsigned int ri_pageno;
+ unsigned int ri_pageoff;
+ unsigned int ri_chunklen;
+
+ struct svc_rdma_chunk_ctxt ri_cc;
+};
+
+static struct svc_rdma_read_info *
+svc_rdma_read_info_alloc(struct svcxprt_rdma *rdma)
+{
+ struct svc_rdma_read_info *info;
+
+ info = kmalloc(sizeof(*info), GFP_KERNEL);
+ if (!info)
+ return info;
+
+ svc_rdma_cc_init(rdma, &info->ri_cc);
+ info->ri_cc.cc_cqe.done = svc_rdma_wc_read_done;
+ return info;
+}
+
+static void svc_rdma_read_info_free(struct svc_rdma_read_info *info)
+{
+ svc_rdma_cc_release(&info->ri_cc, DMA_FROM_DEVICE);
+ kfree(info);
+}
+
+/**
+ * svc_rdma_wc_read_done - Handle completion of an RDMA Read ctx
+ * @cq: controlling Completion Queue
+ * @wc: Work Completion
+ *
+ */
+static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc)
+{
+ struct ib_cqe *cqe = wc->wr_cqe;
+ struct svc_rdma_chunk_ctxt *cc =
+ container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe);
+ struct svcxprt_rdma *rdma = cc->cc_rdma;
+ struct svc_rdma_read_info *info =
+ container_of(cc, struct svc_rdma_read_info, ri_cc);
+
+ atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
+ wake_up(&rdma->sc_send_wait);
+
+ if (unlikely(wc->status != IB_WC_SUCCESS)) {
+ set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
+ if (wc->status != IB_WC_WR_FLUSH_ERR)
+ pr_err("svcrdma: read ctx: %s (%u/0x%x)\n",
+ ib_wc_status_msg(wc->status),
+ wc->status, wc->vendor_err);
+ svc_rdma_put_context(info->ri_readctxt, 1);
+ } else {
+ spin_lock(&rdma->sc_rq_dto_lock);
+ list_add_tail(&info->ri_readctxt->list,
+ &rdma->sc_read_complete_q);
+ spin_unlock(&rdma->sc_rq_dto_lock);
+
+ set_bit(XPT_DATA, &rdma->sc_xprt.xpt_flags);
+ svc_xprt_enqueue(&rdma->sc_xprt);
+ }
+
+ svc_rdma_read_info_free(info);
+}
+
/* This function sleeps when the transport's Send Queue is congested.
*
* Assumptions:
@@ -232,6 +304,9 @@ static int svc_rdma_post_chunk_ctxt(struct svc_rdma_chunk_ctxt *cc)
struct ib_cqe *cqe;
int ret;
+ if (cc->cc_sqecount > rdma->sc_sq_depth)
+ return -EINVAL;
+
first_wr = NULL;
cqe = &cc->cc_cqe;
list_for_each(tmp, &cc->cc_rwctxts) {
@@ -295,8 +370,9 @@ static void svc_rdma_pagelist_to_sg(struct svc_rdma_write_info *info,
struct scatterlist *sg;
struct page **page;
- page_off = (info->wi_next_off + xdr->page_base) & ~PAGE_MASK;
- page_no = (info->wi_next_off + xdr->page_base) >> PAGE_SHIFT;
+ page_off = info->wi_next_off + xdr->page_base;
+ page_no = page_off >> PAGE_SHIFT;
+ page_off = offset_in_page(page_off);
page = xdr->pages + page_no;
info->wi_next_off += remaining;
sg = ctxt->rw_sg_table.sgl;
@@ -332,7 +408,6 @@ svc_rdma_build_writes(struct svc_rdma_write_info *info,
__be32 *seg;
int ret;
- cc->cc_cqe.done = svc_rdma_write_done;
seg = info->wi_segs + info->wi_seg_no * rpcrdma_segment_maxsz;
do {
unsigned int write_len;
@@ -425,6 +500,7 @@ static int svc_rdma_send_xdr_pagelist(struct svc_rdma_write_info *info,
*
* Returns a non-negative number of bytes the chunk consumed, or
* %-E2BIG if the payload was larger than the Write chunk,
+ * %-EINVAL if client provided too many segments,
* %-ENOMEM if rdma_rw context pool was exhausted,
* %-ENOTCONN if posting failed (connection is lost),
* %-EIO if rdma_rw initialization failed (DMA mapping, etc).
@@ -465,6 +541,7 @@ out_err:
*
* Returns a non-negative number of bytes the chunk consumed, or
* %-E2BIG if the payload was larger than the Reply chunk,
+ * %-EINVAL if client provided too many segments,
* %-ENOMEM if rdma_rw context pool was exhausted,
* %-ENOTCONN if posting failed (connection is lost),
* %-EIO if rdma_rw initialization failed (DMA mapping, etc).
@@ -510,3 +587,353 @@ out_err:
svc_rdma_write_info_free(info);
return ret;
}
+
+static int svc_rdma_build_read_segment(struct svc_rdma_read_info *info,
+ struct svc_rqst *rqstp,
+ u32 rkey, u32 len, u64 offset)
+{
+ struct svc_rdma_op_ctxt *head = info->ri_readctxt;
+ struct svc_rdma_chunk_ctxt *cc = &info->ri_cc;
+ struct svc_rdma_rw_ctxt *ctxt;
+ unsigned int sge_no, seg_len;
+ struct scatterlist *sg;
+ int ret;
+
+ sge_no = PAGE_ALIGN(info->ri_pageoff + len) >> PAGE_SHIFT;
+ ctxt = svc_rdma_get_rw_ctxt(cc->cc_rdma, sge_no);
+ if (!ctxt)
+ goto out_noctx;
+ ctxt->rw_nents = sge_no;
+
+ dprintk("svcrdma: reading segment %u@0x%016llx:0x%08x (%u sges)\n",
+ len, offset, rkey, sge_no);
+
+ sg = ctxt->rw_sg_table.sgl;
+ for (sge_no = 0; sge_no < ctxt->rw_nents; sge_no++) {
+ seg_len = min_t(unsigned int, len,
+ PAGE_SIZE - info->ri_pageoff);
+
+ head->arg.pages[info->ri_pageno] =
+ rqstp->rq_pages[info->ri_pageno];
+ if (!info->ri_pageoff)
+ head->count++;
+
+ sg_set_page(sg, rqstp->rq_pages[info->ri_pageno],
+ seg_len, info->ri_pageoff);
+ sg = sg_next(sg);
+
+ info->ri_pageoff += seg_len;
+ if (info->ri_pageoff == PAGE_SIZE) {
+ info->ri_pageno++;
+ info->ri_pageoff = 0;
+ }
+ len -= seg_len;
+
+ /* Safety check */
+ if (len &&
+ &rqstp->rq_pages[info->ri_pageno + 1] > rqstp->rq_page_end)
+ goto out_overrun;
+ }
+
+ ret = rdma_rw_ctx_init(&ctxt->rw_ctx, cc->cc_rdma->sc_qp,
+ cc->cc_rdma->sc_port_num,
+ ctxt->rw_sg_table.sgl, ctxt->rw_nents,
+ 0, offset, rkey, DMA_FROM_DEVICE);
+ if (ret < 0)
+ goto out_initerr;
+
+ list_add(&ctxt->rw_list, &cc->cc_rwctxts);
+ cc->cc_sqecount += ret;
+ return 0;
+
+out_noctx:
+ dprintk("svcrdma: no R/W ctxs available\n");
+ return -ENOMEM;
+
+out_overrun:
+ dprintk("svcrdma: request overruns rq_pages\n");
+ return -EINVAL;
+
+out_initerr:
+ svc_rdma_put_rw_ctxt(cc->cc_rdma, ctxt);
+ pr_err("svcrdma: failed to map pagelist (%d)\n", ret);
+ return -EIO;
+}
+
+static int svc_rdma_build_read_chunk(struct svc_rqst *rqstp,
+ struct svc_rdma_read_info *info,
+ __be32 *p)
+{
+ int ret;
+
+ info->ri_chunklen = 0;
+ while (*p++ != xdr_zero) {
+ u32 rs_handle, rs_length;
+ u64 rs_offset;
+
+ if (be32_to_cpup(p++) != info->ri_position)
+ break;
+ rs_handle = be32_to_cpup(p++);
+ rs_length = be32_to_cpup(p++);
+ p = xdr_decode_hyper(p, &rs_offset);
+
+ ret = svc_rdma_build_read_segment(info, rqstp,
+ rs_handle, rs_length,
+ rs_offset);
+ if (ret < 0)
+ break;
+
+ info->ri_chunklen += rs_length;
+ }
+
+ return ret;
+}
+
+/* If there is inline content following the Read chunk, append it to
+ * the page list immediately following the data payload. This has to
+ * be done after the reader function has determined how many pages
+ * were consumed for RDMA Read.
+ *
+ * On entry, ri_pageno and ri_pageoff point directly to the end of the
+ * page list. On exit, both have been updated to the new "next byte".
+ *
+ * Assumptions:
+ * - Inline content fits entirely in rq_pages[0]
+ * - Trailing content is only a handful of bytes
+ */
+static int svc_rdma_copy_tail(struct svc_rqst *rqstp,
+ struct svc_rdma_read_info *info)
+{
+ struct svc_rdma_op_ctxt *head = info->ri_readctxt;
+ unsigned int tail_length, remaining;
+ u8 *srcp, *destp;
+
+ /* Assert that all inline content fits in page 0. This is an
+ * implementation limit, not a protocol limit.
+ */
+ if (head->arg.head[0].iov_len > PAGE_SIZE) {
+ pr_warn_once("svcrdma: too much trailing inline content\n");
+ return -EINVAL;
+ }
+
+ srcp = head->arg.head[0].iov_base;
+ srcp += info->ri_position;
+ tail_length = head->arg.head[0].iov_len - info->ri_position;
+ remaining = tail_length;
+
+ /* If there is room on the last page in the page list, try to
+ * fit the trailing content there.
+ */
+ if (info->ri_pageoff > 0) {
+ unsigned int len;
+
+ len = min_t(unsigned int, remaining,
+ PAGE_SIZE - info->ri_pageoff);
+ destp = page_address(rqstp->rq_pages[info->ri_pageno]);
+ destp += info->ri_pageoff;
+
+ memcpy(destp, srcp, len);
+ srcp += len;
+ destp += len;
+ info->ri_pageoff += len;
+ remaining -= len;
+
+ if (info->ri_pageoff == PAGE_SIZE) {
+ info->ri_pageno++;
+ info->ri_pageoff = 0;
+ }
+ }
+
+ /* Otherwise, a fresh page is needed. */
+ if (remaining) {
+ head->arg.pages[info->ri_pageno] =
+ rqstp->rq_pages[info->ri_pageno];
+ head->count++;
+
+ destp = page_address(rqstp->rq_pages[info->ri_pageno]);
+ memcpy(destp, srcp, remaining);
+ info->ri_pageoff += remaining;
+ }
+
+ head->arg.page_len += tail_length;
+ head->arg.len += tail_length;
+ head->arg.buflen += tail_length;
+ return 0;
+}
+
+/* Construct RDMA Reads to pull over a normal Read chunk. The chunk
+ * data lands in the page list of head->arg.pages.
+ *
+ * Currently NFSD does not look at the head->arg.tail[0] iovec.
+ * Therefore, XDR round-up of the Read chunk and trailing
+ * inline content must both be added at the end of the pagelist.
+ */
+static int svc_rdma_build_normal_read_chunk(struct svc_rqst *rqstp,
+ struct svc_rdma_read_info *info,
+ __be32 *p)
+{
+ struct svc_rdma_op_ctxt *head = info->ri_readctxt;
+ int ret;
+
+ dprintk("svcrdma: Reading Read chunk at position %u\n",
+ info->ri_position);
+
+ info->ri_pageno = head->hdr_count;
+ info->ri_pageoff = 0;
+
+ ret = svc_rdma_build_read_chunk(rqstp, info, p);
+ if (ret < 0)
+ goto out;
+
+ /* Read chunk may need XDR round-up (see RFC 5666, s. 3.7).
+ */
+ if (info->ri_chunklen & 3) {
+ u32 padlen = 4 - (info->ri_chunklen & 3);
+
+ info->ri_chunklen += padlen;
+
+ /* NB: data payload always starts on XDR alignment,
+ * thus the pad can never contain a page boundary.
+ */
+ info->ri_pageoff += padlen;
+ if (info->ri_pageoff == PAGE_SIZE) {
+ info->ri_pageno++;
+ info->ri_pageoff = 0;
+ }
+ }
+
+ head->arg.page_len = info->ri_chunklen;
+ head->arg.len += info->ri_chunklen;
+ head->arg.buflen += info->ri_chunklen;
+
+ if (info->ri_position < head->arg.head[0].iov_len) {
+ ret = svc_rdma_copy_tail(rqstp, info);
+ if (ret < 0)
+ goto out;
+ }
+ head->arg.head[0].iov_len = info->ri_position;
+
+out:
+ return ret;
+}
+
+/* Construct RDMA Reads to pull over a Position Zero Read chunk.
+ * The start of the data lands in the first page just after
+ * the Transport header, and the rest lands in the page list of
+ * head->arg.pages.
+ *
+ * Assumptions:
+ * - A PZRC has an XDR-aligned length (no implicit round-up).
+ * - There can be no trailing inline content (IOW, we assume
+ * a PZRC is never sent in an RDMA_MSG message, though it's
+ * allowed by spec).
+ */
+static int svc_rdma_build_pz_read_chunk(struct svc_rqst *rqstp,
+ struct svc_rdma_read_info *info,
+ __be32 *p)
+{
+ struct svc_rdma_op_ctxt *head = info->ri_readctxt;
+ int ret;
+
+ dprintk("svcrdma: Reading Position Zero Read chunk\n");
+
+ info->ri_pageno = head->hdr_count - 1;
+ info->ri_pageoff = offset_in_page(head->byte_len);
+
+ ret = svc_rdma_build_read_chunk(rqstp, info, p);
+ if (ret < 0)
+ goto out;
+
+ head->arg.len += info->ri_chunklen;
+ head->arg.buflen += info->ri_chunklen;
+
+ if (head->arg.buflen <= head->sge[0].length) {
+ /* Transport header and RPC message fit entirely
+ * in page where head iovec resides.
+ */
+ head->arg.head[0].iov_len = info->ri_chunklen;
+ } else {
+ /* Transport header and part of RPC message reside
+ * in the head iovec's page.
+ */
+ head->arg.head[0].iov_len =
+ head->sge[0].length - head->byte_len;
+ head->arg.page_len =
+ info->ri_chunklen - head->arg.head[0].iov_len;
+ }
+
+out:
+ return ret;
+}
+
+/**
+ * svc_rdma_recv_read_chunk - Pull a Read chunk from the client
+ * @rdma: controlling RDMA transport
+ * @rqstp: set of pages to use as Read sink buffers
+ * @head: pages under I/O collect here
+ * @p: pointer to start of Read chunk
+ *
+ * Returns:
+ * %0 if all needed RDMA Reads were posted successfully,
+ * %-EINVAL if client provided too many segments,
+ * %-ENOMEM if rdma_rw context pool was exhausted,
+ * %-ENOTCONN if posting failed (connection is lost),
+ * %-EIO if rdma_rw initialization failed (DMA mapping, etc).
+ *
+ * Assumptions:
+ * - All Read segments in @p have the same Position value.
+ */
+int svc_rdma_recv_read_chunk(struct svcxprt_rdma *rdma, struct svc_rqst *rqstp,
+ struct svc_rdma_op_ctxt *head, __be32 *p)
+{
+ struct svc_rdma_read_info *info;
+ struct page **page;
+ int ret;
+
+ /* The request (with page list) is constructed in
+ * head->arg. Pages involved with RDMA Read I/O are
+ * transferred there.
+ */
+ head->hdr_count = head->count;
+ head->arg.head[0] = rqstp->rq_arg.head[0];
+ head->arg.tail[0] = rqstp->rq_arg.tail[0];
+ head->arg.pages = head->pages;
+ head->arg.page_base = 0;
+ head->arg.page_len = 0;
+ head->arg.len = rqstp->rq_arg.len;
+ head->arg.buflen = rqstp->rq_arg.buflen;
+
+ info = svc_rdma_read_info_alloc(rdma);
+ if (!info)
+ return -ENOMEM;
+ info->ri_readctxt = head;
+
+ info->ri_position = be32_to_cpup(p + 1);
+ if (info->ri_position)
+ ret = svc_rdma_build_normal_read_chunk(rqstp, info, p);
+ else
+ ret = svc_rdma_build_pz_read_chunk(rqstp, info, p);
+
+ /* Mark the start of the pages that can be used for the reply */
+ if (info->ri_pageoff > 0)
+ info->ri_pageno++;
+ rqstp->rq_respages = &rqstp->rq_pages[info->ri_pageno];
+ rqstp->rq_next_page = rqstp->rq_respages + 1;
+
+ if (ret < 0)
+ goto out;
+
+ ret = svc_rdma_post_chunk_ctxt(&info->ri_cc);
+
+out:
+ /* Read sink pages have been moved from rqstp->rq_pages to
+ * head->arg.pages. Force svc_recv to refill those slots
+ * in rq_pages.
+ */
+ for (page = rqstp->rq_pages; page < rqstp->rq_respages; page++)
+ *page = NULL;
+
+ if (ret < 0)
+ svc_rdma_read_info_free(info);
+ return ret;
+}
diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
index 1736337f3a55..7c3a211e0e9a 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
@@ -313,13 +313,17 @@ static int svc_rdma_dma_map_buf(struct svcxprt_rdma *rdma,
dma_addr = ib_dma_map_page(dev, virt_to_page(base),
offset, len, DMA_TO_DEVICE);
if (ib_dma_mapping_error(dev, dma_addr))
- return -EIO;
+ goto out_maperr;
ctxt->sge[sge_no].addr = dma_addr;
ctxt->sge[sge_no].length = len;
ctxt->sge[sge_no].lkey = rdma->sc_pd->local_dma_lkey;
svc_rdma_count_mappings(rdma, ctxt);
return 0;
+
+out_maperr:
+ pr_err("svcrdma: failed to map buffer\n");
+ return -EIO;
}
static int svc_rdma_dma_map_page(struct svcxprt_rdma *rdma,
@@ -334,13 +338,17 @@ static int svc_rdma_dma_map_page(struct svcxprt_rdma *rdma,
dma_addr = ib_dma_map_page(dev, page, offset, len, DMA_TO_DEVICE);
if (ib_dma_mapping_error(dev, dma_addr))
- return -EIO;
+ goto out_maperr;
ctxt->sge[sge_no].addr = dma_addr;
ctxt->sge[sge_no].length = len;
ctxt->sge[sge_no].lkey = rdma->sc_pd->local_dma_lkey;
svc_rdma_count_mappings(rdma, ctxt);
return 0;
+
+out_maperr:
+ pr_err("svcrdma: failed to map page\n");
+ return -EIO;
}
/**
@@ -547,7 +555,6 @@ static int svc_rdma_send_reply_msg(struct svcxprt_rdma *rdma,
return 0;
err:
- pr_err("svcrdma: failed to post Send WR (%d)\n", ret);
svc_rdma_unmap_dma(ctxt);
svc_rdma_put_context(ctxt, 1);
return ret;
@@ -677,7 +684,7 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
return 0;
err2:
- if (ret != -E2BIG)
+ if (ret != -E2BIG && ret != -EINVAL)
goto err1;
ret = svc_rdma_post_recv(rdma, GFP_KERNEL);
diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
index a9d9cb1ba4c6..e660d4965b18 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
@@ -202,7 +202,6 @@ struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *xprt)
out:
ctxt->count = 0;
ctxt->mapped_sges = 0;
- ctxt->frmr = NULL;
return ctxt;
out_empty:
@@ -226,22 +225,13 @@ void svc_rdma_unmap_dma(struct svc_rdma_op_ctxt *ctxt)
{
struct svcxprt_rdma *xprt = ctxt->xprt;
struct ib_device *device = xprt->sc_cm_id->device;
- u32 lkey = xprt->sc_pd->local_dma_lkey;
unsigned int i;
- for (i = 0; i < ctxt->mapped_sges; i++) {
- /*
- * Unmap the DMA addr in the SGE if the lkey matches
- * the local_dma_lkey, otherwise, ignore it since it is
- * an FRMR lkey and will be unmapped later when the
- * last WR that uses it completes.
- */
- if (ctxt->sge[i].lkey == lkey)
- ib_dma_unmap_page(device,
- ctxt->sge[i].addr,
- ctxt->sge[i].length,
- ctxt->direction);
- }
+ for (i = 0; i < ctxt->mapped_sges; i++)
+ ib_dma_unmap_page(device,
+ ctxt->sge[i].addr,
+ ctxt->sge[i].length,
+ ctxt->direction);
ctxt->mapped_sges = 0;
}
@@ -346,36 +336,6 @@ out:
svc_xprt_put(&xprt->sc_xprt);
}
-static void svc_rdma_send_wc_common(struct svcxprt_rdma *xprt,
- struct ib_wc *wc,
- const char *opname)
-{
- if (wc->status != IB_WC_SUCCESS)
- goto err;
-
-out:
- atomic_inc(&xprt->sc_sq_avail);
- wake_up(&xprt->sc_send_wait);
- return;
-
-err:
- set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
- if (wc->status != IB_WC_WR_FLUSH_ERR)
- pr_err("svcrdma: %s: %s (%u/0x%x)\n",
- opname, ib_wc_status_msg(wc->status),
- wc->status, wc->vendor_err);
- goto out;
-}
-
-static void svc_rdma_send_wc_common_put(struct ib_cq *cq, struct ib_wc *wc,
- const char *opname)
-{
- struct svcxprt_rdma *xprt = cq->cq_context;
-
- svc_rdma_send_wc_common(xprt, wc, opname);
- svc_xprt_put(&xprt->sc_xprt);
-}
-
/**
* svc_rdma_wc_send - Invoked by RDMA provider for each polled Send WC
* @cq: completion queue
@@ -384,73 +344,28 @@ static void svc_rdma_send_wc_common_put(struct ib_cq *cq, struct ib_wc *wc,
*/
void svc_rdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
{
- struct ib_cqe *cqe = wc->wr_cqe;
- struct svc_rdma_op_ctxt *ctxt;
-
- svc_rdma_send_wc_common_put(cq, wc, "send");
-
- ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe);
- svc_rdma_unmap_dma(ctxt);
- svc_rdma_put_context(ctxt, 1);
-}
-
-/**
- * svc_rdma_wc_reg - Invoked by RDMA provider for each polled FASTREG WC
- * @cq: completion queue
- * @wc: completed WR
- *
- */
-void svc_rdma_wc_reg(struct ib_cq *cq, struct ib_wc *wc)
-{
- svc_rdma_send_wc_common_put(cq, wc, "fastreg");
-}
-
-/**
- * svc_rdma_wc_read - Invoked by RDMA provider for each polled Read WC
- * @cq: completion queue
- * @wc: completed WR
- *
- */
-void svc_rdma_wc_read(struct ib_cq *cq, struct ib_wc *wc)
-{
struct svcxprt_rdma *xprt = cq->cq_context;
struct ib_cqe *cqe = wc->wr_cqe;
struct svc_rdma_op_ctxt *ctxt;
- svc_rdma_send_wc_common(xprt, wc, "read");
+ atomic_inc(&xprt->sc_sq_avail);
+ wake_up(&xprt->sc_send_wait);
ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe);
svc_rdma_unmap_dma(ctxt);
- svc_rdma_put_frmr(xprt, ctxt->frmr);
-
- if (test_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags)) {
- struct svc_rdma_op_ctxt *read_hdr;
-
- read_hdr = ctxt->read_hdr;
- spin_lock(&xprt->sc_rq_dto_lock);
- list_add_tail(&read_hdr->list,
- &xprt->sc_read_complete_q);
- spin_unlock(&xprt->sc_rq_dto_lock);
+ svc_rdma_put_context(ctxt, 1);
- set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
- svc_xprt_enqueue(&xprt->sc_xprt);
+ if (unlikely(wc->status != IB_WC_SUCCESS)) {
+ set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
+ if (wc->status != IB_WC_WR_FLUSH_ERR)
+ pr_err("svcrdma: Send: %s (%u/0x%x)\n",
+ ib_wc_status_msg(wc->status),
+ wc->status, wc->vendor_err);
}
- svc_rdma_put_context(ctxt, 0);
svc_xprt_put(&xprt->sc_xprt);
}
-/**
- * svc_rdma_wc_inv - Invoked by RDMA provider for each polled LOCAL_INV WC
- * @cq: completion queue
- * @wc: completed WR
- *
- */
-void svc_rdma_wc_inv(struct ib_cq *cq, struct ib_wc *wc)
-{
- svc_rdma_send_wc_common_put(cq, wc, "localInv");
-}
-
static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *serv,
int listener)
{
@@ -462,14 +377,12 @@ static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *serv,
INIT_LIST_HEAD(&cma_xprt->sc_accept_q);
INIT_LIST_HEAD(&cma_xprt->sc_rq_dto_q);
INIT_LIST_HEAD(&cma_xprt->sc_read_complete_q);
- INIT_LIST_HEAD(&cma_xprt->sc_frmr_q);
INIT_LIST_HEAD(&cma_xprt->sc_ctxts);
INIT_LIST_HEAD(&cma_xprt->sc_rw_ctxts);
init_waitqueue_head(&cma_xprt->sc_send_wait);
spin_lock_init(&cma_xprt->sc_lock);
spin_lock_init(&cma_xprt->sc_rq_dto_lock);
- spin_lock_init(&cma_xprt->sc_frmr_q_lock);
spin_lock_init(&cma_xprt->sc_ctxt_lock);
spin_lock_init(&cma_xprt->sc_rw_ctxt_lock);
@@ -780,86 +693,6 @@ static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
return ERR_PTR(ret);
}
-static struct svc_rdma_fastreg_mr *rdma_alloc_frmr(struct svcxprt_rdma *xprt)
-{
- struct ib_mr *mr;
- struct scatterlist *sg;
- struct svc_rdma_fastreg_mr *frmr;
- u32 num_sg;
-
- frmr = kmalloc(sizeof(*frmr), GFP_KERNEL);
- if (!frmr)
- goto err;
-
- num_sg = min_t(u32, RPCSVC_MAXPAGES, xprt->sc_frmr_pg_list_len);
- mr = ib_alloc_mr(xprt->sc_pd, IB_MR_TYPE_MEM_REG, num_sg);
- if (IS_ERR(mr))
- goto err_free_frmr;
-
- sg = kcalloc(RPCSVC_MAXPAGES, sizeof(*sg), GFP_KERNEL);
- if (!sg)
- goto err_free_mr;
-
- sg_init_table(sg, RPCSVC_MAXPAGES);
-
- frmr->mr = mr;
- frmr->sg = sg;
- INIT_LIST_HEAD(&frmr->frmr_list);
- return frmr;
-
- err_free_mr:
- ib_dereg_mr(mr);
- err_free_frmr:
- kfree(frmr);
- err:
- return ERR_PTR(-ENOMEM);
-}
-
-static void rdma_dealloc_frmr_q(struct svcxprt_rdma *xprt)
-{
- struct svc_rdma_fastreg_mr *frmr;
-
- while (!list_empty(&xprt->sc_frmr_q)) {
- frmr = list_entry(xprt->sc_frmr_q.next,
- struct svc_rdma_fastreg_mr, frmr_list);
- list_del_init(&frmr->frmr_list);
- kfree(frmr->sg);
- ib_dereg_mr(frmr->mr);
- kfree(frmr);
- }
-}
-
-struct svc_rdma_fastreg_mr *svc_rdma_get_frmr(struct svcxprt_rdma *rdma)
-{
- struct svc_rdma_fastreg_mr *frmr = NULL;
-
- spin_lock(&rdma->sc_frmr_q_lock);
- if (!list_empty(&rdma->sc_frmr_q)) {
- frmr = list_entry(rdma->sc_frmr_q.next,
- struct svc_rdma_fastreg_mr, frmr_list);
- list_del_init(&frmr->frmr_list);
- frmr->sg_nents = 0;
- }
- spin_unlock(&rdma->sc_frmr_q_lock);
- if (frmr)
- return frmr;
-
- return rdma_alloc_frmr(rdma);
-}
-
-void svc_rdma_put_frmr(struct svcxprt_rdma *rdma,
- struct svc_rdma_fastreg_mr *frmr)
-{
- if (frmr) {
- ib_dma_unmap_sg(rdma->sc_cm_id->device,
- frmr->sg, frmr->sg_nents, frmr->direction);
- spin_lock(&rdma->sc_frmr_q_lock);
- WARN_ON_ONCE(!list_empty(&frmr->frmr_list));
- list_add(&frmr->frmr_list, &rdma->sc_frmr_q);
- spin_unlock(&rdma->sc_frmr_q_lock);
- }
-}
-
/*
* This is the xpo_recvfrom function for listening endpoints. Its
* purpose is to accept incoming connections. The CMA callback handler
@@ -908,8 +741,6 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
* capabilities of this particular device */
newxprt->sc_max_sge = min((size_t)dev->attrs.max_sge,
(size_t)RPCSVC_MAXPAGES);
- newxprt->sc_max_sge_rd = min_t(size_t, dev->attrs.max_sge_rd,
- RPCSVC_MAXPAGES);
newxprt->sc_max_req_size = svcrdma_max_req_size;
newxprt->sc_max_requests = min_t(u32, dev->attrs.max_qp_wr,
svcrdma_max_requests);
@@ -952,7 +783,7 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
memset(&qp_attr, 0, sizeof qp_attr);
qp_attr.event_handler = qp_event_handler;
qp_attr.qp_context = &newxprt->sc_xprt;
- qp_attr.port_num = newxprt->sc_cm_id->port_num;
+ qp_attr.port_num = newxprt->sc_port_num;
qp_attr.cap.max_rdma_ctxs = newxprt->sc_max_requests;
qp_attr.cap.max_send_wr = newxprt->sc_sq_depth;
qp_attr.cap.max_recv_wr = newxprt->sc_rq_depth;
@@ -976,47 +807,12 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
}
newxprt->sc_qp = newxprt->sc_cm_id->qp;
- /*
- * Use the most secure set of MR resources based on the
- * transport type and available memory management features in
- * the device. Here's the table implemented below:
- *
- * Fast Global DMA Remote WR
- * Reg LKEY MR Access
- * Sup'd Sup'd Needed Needed
- *
- * IWARP N N Y Y
- * N Y Y Y
- * Y N Y N
- * Y Y N -
- *
- * IB N N Y N
- * N Y N -
- * Y N Y N
- * Y Y N -
- *
- * NB: iWARP requires remote write access for the data sink
- * of an RDMA_READ. IB does not.
- */
- newxprt->sc_reader = rdma_read_chunk_lcl;
- if (dev->attrs.device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS) {
- newxprt->sc_frmr_pg_list_len =
- dev->attrs.max_fast_reg_page_list_len;
- newxprt->sc_dev_caps |= SVCRDMA_DEVCAP_FAST_REG;
- newxprt->sc_reader = rdma_read_chunk_frmr;
- } else
+ if (!(dev->attrs.device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS))
newxprt->sc_snd_w_inv = false;
-
- /*
- * Determine if a DMA MR is required and if so, what privs are required
- */
- if (!rdma_protocol_iwarp(dev, newxprt->sc_cm_id->port_num) &&
- !rdma_ib_or_roce(dev, newxprt->sc_cm_id->port_num))
+ if (!rdma_protocol_iwarp(dev, newxprt->sc_port_num) &&
+ !rdma_ib_or_roce(dev, newxprt->sc_port_num))
goto errout;
- if (rdma_protocol_iwarp(dev, newxprt->sc_cm_id->port_num))
- newxprt->sc_dev_caps |= SVCRDMA_DEVCAP_READ_W_INV;
-
/* Post receive buffers */
for (i = 0; i < newxprt->sc_max_requests; i++) {
ret = svc_rdma_post_recv(newxprt, GFP_KERNEL);
@@ -1056,7 +852,6 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
sap = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.dst_addr;
dprintk(" remote address : %pIS:%u\n", sap, rpc_get_port(sap));
dprintk(" max_sge : %d\n", newxprt->sc_max_sge);
- dprintk(" max_sge_rd : %d\n", newxprt->sc_max_sge_rd);
dprintk(" sq_depth : %d\n", newxprt->sc_sq_depth);
dprintk(" max_requests : %d\n", newxprt->sc_max_requests);
dprintk(" ord : %d\n", newxprt->sc_ord);
@@ -1117,12 +912,6 @@ static void __svc_rdma_free(struct work_struct *work)
pr_err("svcrdma: sc_xprt still in use? (%d)\n",
kref_read(&xprt->xpt_ref));
- /*
- * Destroy queued, but not processed read completions. Note
- * that this cleanup has to be done before destroying the
- * cm_id because the device ptr is needed to unmap the dma in
- * svc_rdma_put_context.
- */
while (!list_empty(&rdma->sc_read_complete_q)) {
struct svc_rdma_op_ctxt *ctxt;
ctxt = list_first_entry(&rdma->sc_read_complete_q,
@@ -1130,8 +919,6 @@ static void __svc_rdma_free(struct work_struct *work)
list_del(&ctxt->list);
svc_rdma_put_context(ctxt, 1);
}
-
- /* Destroy queued, but not processed recv completions */
while (!list_empty(&rdma->sc_rq_dto_q)) {
struct svc_rdma_op_ctxt *ctxt;
ctxt = list_first_entry(&rdma->sc_rq_dto_q,
@@ -1151,7 +938,6 @@ static void __svc_rdma_free(struct work_struct *work)
xprt->xpt_bc_xprt = NULL;
}
- rdma_dealloc_frmr_q(rdma);
svc_rdma_destroy_rw_ctxts(rdma);
svc_rdma_destroy_ctxts(rdma);
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index 62ecbccd9748..d1c458e5ec4d 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -684,7 +684,8 @@ xprt_rdma_free(struct rpc_task *task)
dprintk("RPC: %s: called on 0x%p\n", __func__, req->rl_reply);
- if (unlikely(!list_empty(&req->rl_registered)))
+ rpcrdma_remove_req(&r_xprt->rx_buf, req);
+ if (!list_empty(&req->rl_registered))
ia->ri_ops->ro_unmap_safe(r_xprt, req, !RPC_IS_ASYNC(task));
rpcrdma_unmap_sges(ia, req);
rpcrdma_buffer_put(req);
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 3dbce9ac4327..e4171f2abe37 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -243,8 +243,6 @@ rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
struct sockaddr *sap = (struct sockaddr *)&ep->rep_remote_addr;
#endif
- struct ib_qp_attr *attr = &ia->ri_qp_attr;
- struct ib_qp_init_attr *iattr = &ia->ri_qp_init_attr;
int connstate = 0;
switch (event->event) {
@@ -267,7 +265,8 @@ rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
break;
case RDMA_CM_EVENT_DEVICE_REMOVAL:
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
- pr_info("rpcrdma: removing device for %pIS:%u\n",
+ pr_info("rpcrdma: removing device %s for %pIS:%u\n",
+ ia->ri_device->name,
sap, rpc_get_port(sap));
#endif
set_bit(RPCRDMA_IAF_REMOVING, &ia->ri_flags);
@@ -282,13 +281,6 @@ rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
return 1;
case RDMA_CM_EVENT_ESTABLISHED:
connstate = 1;
- ib_query_qp(ia->ri_id->qp, attr,
- IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC,
- iattr);
- dprintk("RPC: %s: %d responder resources"
- " (%d initiator)\n",
- __func__, attr->max_dest_rd_atomic,
- attr->max_rd_atomic);
rpcrdma_update_connect_private(xprt, &event->param.conn);
goto connected;
case RDMA_CM_EVENT_CONNECT_ERROR:
@@ -298,11 +290,9 @@ rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
connstate = -ENETDOWN;
goto connected;
case RDMA_CM_EVENT_REJECTED:
-#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
- pr_info("rpcrdma: connection to %pIS:%u on %s rejected: %s\n",
- sap, rpc_get_port(sap), ia->ri_device->name,
+ dprintk("rpcrdma: connection to %pIS:%u rejected: %s\n",
+ sap, rpc_get_port(sap),
rdma_reject_msg(id, event->status));
-#endif
connstate = -ECONNREFUSED;
if (event->status == IB_CM_REJ_STALE_CONN)
connstate = -EAGAIN;
@@ -310,37 +300,19 @@ rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
case RDMA_CM_EVENT_DISCONNECTED:
connstate = -ECONNABORTED;
connected:
- dprintk("RPC: %s: %sconnected\n",
- __func__, connstate > 0 ? "" : "dis");
atomic_set(&xprt->rx_buf.rb_credits, 1);
ep->rep_connected = connstate;
rpcrdma_conn_func(ep);
wake_up_all(&ep->rep_connect_wait);
/*FALLTHROUGH*/
default:
- dprintk("RPC: %s: %pIS:%u (ep 0x%p): %s\n",
- __func__, sap, rpc_get_port(sap), ep,
- rdma_event_msg(event->event));
+ dprintk("RPC: %s: %pIS:%u on %s/%s (ep 0x%p): %s\n",
+ __func__, sap, rpc_get_port(sap),
+ ia->ri_device->name, ia->ri_ops->ro_displayname,
+ ep, rdma_event_msg(event->event));
break;
}
-#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
- if (connstate == 1) {
- int ird = attr->max_dest_rd_atomic;
- int tird = ep->rep_remote_cma.responder_resources;
-
- pr_info("rpcrdma: connection to %pIS:%u on %s, memreg '%s', %d credits, %d responders%s\n",
- sap, rpc_get_port(sap),
- ia->ri_device->name,
- ia->ri_ops->ro_displayname,
- xprt->rx_buf.rb_max_requests,
- ird, ird < 4 && ird < tird / 2 ? " (low!)" : "");
- } else if (connstate < 0) {
- pr_info("rpcrdma: connection to %pIS:%u closed (%d)\n",
- sap, rpc_get_port(sap), connstate);
- }
-#endif
-
return 0;
}
@@ -971,7 +943,6 @@ rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
if (req == NULL)
return ERR_PTR(-ENOMEM);
- INIT_LIST_HEAD(&req->rl_free);
spin_lock(&buffer->rb_reqslock);
list_add(&req->rl_all, &buffer->rb_allreqs);
spin_unlock(&buffer->rb_reqslock);
@@ -1033,6 +1004,7 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
spin_lock_init(&buf->rb_recovery_lock);
INIT_LIST_HEAD(&buf->rb_mws);
INIT_LIST_HEAD(&buf->rb_all);
+ INIT_LIST_HEAD(&buf->rb_pending);
INIT_LIST_HEAD(&buf->rb_stale_mrs);
INIT_DELAYED_WORK(&buf->rb_refresh_worker,
rpcrdma_mr_refresh_worker);
@@ -1055,7 +1027,7 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
goto out;
}
req->rl_backchannel = false;
- list_add(&req->rl_free, &buf->rb_send_bufs);
+ list_add(&req->rl_list, &buf->rb_send_bufs);
}
INIT_LIST_HEAD(&buf->rb_recv_bufs);
@@ -1084,8 +1056,8 @@ rpcrdma_buffer_get_req_locked(struct rpcrdma_buffer *buf)
struct rpcrdma_req *req;
req = list_first_entry(&buf->rb_send_bufs,
- struct rpcrdma_req, rl_free);
- list_del(&req->rl_free);
+ struct rpcrdma_req, rl_list);
+ list_del_init(&req->rl_list);
return req;
}
@@ -1187,6 +1159,7 @@ rpcrdma_get_mw(struct rpcrdma_xprt *r_xprt)
if (!mw)
goto out_nomws;
+ mw->mw_flags = 0;
return mw;
out_nomws:
@@ -1267,7 +1240,7 @@ rpcrdma_buffer_put(struct rpcrdma_req *req)
spin_lock(&buffers->rb_lock);
buffers->rb_send_count--;
- list_add_tail(&req->rl_free, &buffers->rb_send_bufs);
+ list_add_tail(&req->rl_list, &buffers->rb_send_bufs);
if (rep) {
buffers->rb_recv_count--;
list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 1d66acf1a723..b282d3f8cdd8 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -271,6 +271,7 @@ struct rpcrdma_mw {
struct scatterlist *mw_sg;
int mw_nents;
enum dma_data_direction mw_dir;
+ unsigned long mw_flags;
union {
struct rpcrdma_fmr fmr;
struct rpcrdma_frmr frmr;
@@ -282,6 +283,11 @@ struct rpcrdma_mw {
struct list_head mw_all;
};
+/* mw_flags */
+enum {
+ RPCRDMA_MW_F_RI = 1,
+};
+
/*
* struct rpcrdma_req -- structure central to the request/reply sequence.
*
@@ -334,7 +340,8 @@ enum {
struct rpcrdma_buffer;
struct rpcrdma_req {
- struct list_head rl_free;
+ struct list_head rl_list;
+ __be32 rl_xid;
unsigned int rl_mapped_sges;
unsigned int rl_connect_cookie;
struct rpcrdma_buffer *rl_buffer;
@@ -396,6 +403,7 @@ struct rpcrdma_buffer {
int rb_send_count, rb_recv_count;
struct list_head rb_send_bufs;
struct list_head rb_recv_bufs;
+ struct list_head rb_pending;
u32 rb_max_requests;
atomic_t rb_credits; /* most recent credit grant */
@@ -461,7 +469,7 @@ struct rpcrdma_memreg_ops {
struct rpcrdma_mr_seg *, int, bool,
struct rpcrdma_mw **);
void (*ro_unmap_sync)(struct rpcrdma_xprt *,
- struct rpcrdma_req *);
+ struct list_head *);
void (*ro_unmap_safe)(struct rpcrdma_xprt *,
struct rpcrdma_req *, bool);
void (*ro_recover_mr)(struct rpcrdma_mw *);
@@ -544,6 +552,34 @@ void rpcrdma_destroy_req(struct rpcrdma_req *);
int rpcrdma_buffer_create(struct rpcrdma_xprt *);
void rpcrdma_buffer_destroy(struct rpcrdma_buffer *);
+static inline void
+rpcrdma_insert_req(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req)
+{
+ spin_lock(&buffers->rb_lock);
+ if (list_empty(&req->rl_list))
+ list_add_tail(&req->rl_list, &buffers->rb_pending);
+ spin_unlock(&buffers->rb_lock);
+}
+
+static inline struct rpcrdma_req *
+rpcrdma_lookup_req_locked(struct rpcrdma_buffer *buffers, __be32 xid)
+{
+ struct rpcrdma_req *pos;
+
+ list_for_each_entry(pos, &buffers->rb_pending, rl_list)
+ if (pos->rl_xid == xid)
+ return pos;
+ return NULL;
+}
+
+static inline void
+rpcrdma_remove_req(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req)
+{
+ spin_lock(&buffers->rb_lock);
+ list_del(&req->rl_list);
+ spin_unlock(&buffers->rb_lock);
+}
+
struct rpcrdma_mw *rpcrdma_get_mw(struct rpcrdma_xprt *);
void rpcrdma_put_mw(struct rpcrdma_xprt *, struct rpcrdma_mw *);
struct rpcrdma_req *rpcrdma_buffer_get(struct rpcrdma_buffer *);
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index d5b54c020dec..4f154d388748 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -1624,6 +1624,8 @@ static void xs_tcp_state_change(struct sock *sk)
if (test_and_clear_bit(XPRT_SOCK_CONNECTING,
&transport->sock_state))
xprt_clear_connecting(xprt);
+ if (sk->sk_err)
+ xprt_wake_pending_tasks(xprt, -sk->sk_err);
xs_sock_mark_closed(xprt);
}
out: