diff options
Diffstat (limited to 'net/sunrpc')
-rw-r--r-- | net/sunrpc/auth_unix.c | 2 | ||||
-rw-r--r-- | net/sunrpc/cache.c | 103 | ||||
-rw-r--r-- | net/sunrpc/sched.c | 2 | ||||
-rw-r--r-- | net/sunrpc/svc.c | 113 | ||||
-rw-r--r-- | net/sunrpc/svc_xprt.c | 10 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/fmr_ops.c | 19 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/frwr_ops.c | 11 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/physical_ops.c | 25 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/rpc_rdma.c | 197 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/svc_rdma_recvfrom.c | 12 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/svc_rdma_sendto.c | 83 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/svc_rdma_transport.c | 47 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/transport.c | 77 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/verbs.c | 236 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/xprt_rdma.h | 28 | ||||
-rw-r--r-- | net/sunrpc/xprtsock.c | 18 |
16 files changed, 499 insertions, 484 deletions
diff --git a/net/sunrpc/auth_unix.c b/net/sunrpc/auth_unix.c index 4feda2d0a833..548240dd15fc 100644 --- a/net/sunrpc/auth_unix.c +++ b/net/sunrpc/auth_unix.c @@ -23,7 +23,7 @@ struct unx_cred { }; #define uc_uid uc_base.cr_uid -#define UNX_WRITESLACK (21 + (UNX_MAXNODENAME >> 2)) +#define UNX_WRITESLACK (21 + XDR_QUADLEN(UNX_MAXNODENAME)) #if IS_ENABLED(CONFIG_SUNRPC_DEBUG) # define RPCDBG_FACILITY RPCDBG_AUTH diff --git a/net/sunrpc/cache.c b/net/sunrpc/cache.c index 2928afffbb81..4a2340a54401 100644 --- a/net/sunrpc/cache.c +++ b/net/sunrpc/cache.c @@ -44,7 +44,7 @@ static void cache_revisit_request(struct cache_head *item); static void cache_init(struct cache_head *h) { time_t now = seconds_since_boot(); - h->next = NULL; + INIT_HLIST_NODE(&h->cache_list); h->flags = 0; kref_init(&h->ref); h->expiry_time = now + CACHE_NEW_EXPIRY; @@ -54,15 +54,14 @@ static void cache_init(struct cache_head *h) struct cache_head *sunrpc_cache_lookup(struct cache_detail *detail, struct cache_head *key, int hash) { - struct cache_head **head, **hp; - struct cache_head *new = NULL, *freeme = NULL; + struct cache_head *new = NULL, *freeme = NULL, *tmp = NULL; + struct hlist_head *head; head = &detail->hash_table[hash]; read_lock(&detail->hash_lock); - for (hp=head; *hp != NULL ; hp = &(*hp)->next) { - struct cache_head *tmp = *hp; + hlist_for_each_entry(tmp, head, cache_list) { if (detail->match(tmp, key)) { if (cache_is_expired(detail, tmp)) /* This entry is expired, we will discard it. */ @@ -88,12 +87,10 @@ struct cache_head *sunrpc_cache_lookup(struct cache_detail *detail, write_lock(&detail->hash_lock); /* check if entry appeared while we slept */ - for (hp=head; *hp != NULL ; hp = &(*hp)->next) { - struct cache_head *tmp = *hp; + hlist_for_each_entry(tmp, head, cache_list) { if (detail->match(tmp, key)) { if (cache_is_expired(detail, tmp)) { - *hp = tmp->next; - tmp->next = NULL; + hlist_del_init(&tmp->cache_list); detail->entries --; freeme = tmp; break; @@ -104,8 +101,8 @@ struct cache_head *sunrpc_cache_lookup(struct cache_detail *detail, return tmp; } } - new->next = *head; - *head = new; + + hlist_add_head(&new->cache_list, head); detail->entries++; cache_get(new); write_unlock(&detail->hash_lock); @@ -143,7 +140,6 @@ struct cache_head *sunrpc_cache_update(struct cache_detail *detail, * If 'old' is not VALID, we update it directly, * otherwise we need to replace it */ - struct cache_head **head; struct cache_head *tmp; if (!test_bit(CACHE_VALID, &old->flags)) { @@ -168,15 +164,13 @@ struct cache_head *sunrpc_cache_update(struct cache_detail *detail, } cache_init(tmp); detail->init(tmp, old); - head = &detail->hash_table[hash]; write_lock(&detail->hash_lock); if (test_bit(CACHE_NEGATIVE, &new->flags)) set_bit(CACHE_NEGATIVE, &tmp->flags); else detail->update(tmp, new); - tmp->next = *head; - *head = tmp; + hlist_add_head(&tmp->cache_list, &detail->hash_table[hash]); detail->entries++; cache_get(tmp); cache_fresh_locked(tmp, new->expiry_time); @@ -416,28 +410,29 @@ static int cache_clean(void) /* find a non-empty bucket in the table */ while (current_detail && current_index < current_detail->hash_size && - current_detail->hash_table[current_index] == NULL) + hlist_empty(¤t_detail->hash_table[current_index])) current_index++; /* find a cleanable entry in the bucket and clean it, or set to next bucket */ if (current_detail && current_index < current_detail->hash_size) { - struct cache_head *ch, **cp; + struct cache_head *ch = NULL; struct cache_detail *d; + struct hlist_head *head; + struct hlist_node *tmp; write_lock(¤t_detail->hash_lock); /* Ok, now to clean this strand */ - cp = & current_detail->hash_table[current_index]; - for (ch = *cp ; ch ; cp = & ch->next, ch = *cp) { + head = ¤t_detail->hash_table[current_index]; + hlist_for_each_entry_safe(ch, tmp, head, cache_list) { if (current_detail->nextcheck > ch->expiry_time) current_detail->nextcheck = ch->expiry_time+1; if (!cache_is_expired(current_detail, ch)) continue; - *cp = ch->next; - ch->next = NULL; + hlist_del_init(&ch->cache_list); current_detail->entries--; rv = 1; break; @@ -1270,18 +1265,13 @@ EXPORT_SYMBOL_GPL(qword_get); * get a header, then pass each real item in the cache */ -struct handle { - struct cache_detail *cd; -}; - -static void *c_start(struct seq_file *m, loff_t *pos) +void *cache_seq_start(struct seq_file *m, loff_t *pos) __acquires(cd->hash_lock) { loff_t n = *pos; unsigned int hash, entry; struct cache_head *ch; - struct cache_detail *cd = ((struct handle*)m->private)->cd; - + struct cache_detail *cd = m->private; read_lock(&cd->hash_lock); if (!n--) @@ -1289,7 +1279,7 @@ static void *c_start(struct seq_file *m, loff_t *pos) hash = n >> 32; entry = n & ((1LL<<32) - 1); - for (ch=cd->hash_table[hash]; ch; ch=ch->next) + hlist_for_each_entry(ch, &cd->hash_table[hash], cache_list) if (!entry--) return ch; n &= ~((1LL<<32) - 1); @@ -1297,51 +1287,57 @@ static void *c_start(struct seq_file *m, loff_t *pos) hash++; n += 1LL<<32; } while(hash < cd->hash_size && - cd->hash_table[hash]==NULL); + hlist_empty(&cd->hash_table[hash])); if (hash >= cd->hash_size) return NULL; *pos = n+1; - return cd->hash_table[hash]; + return hlist_entry_safe(cd->hash_table[hash].first, + struct cache_head, cache_list); } +EXPORT_SYMBOL_GPL(cache_seq_start); -static void *c_next(struct seq_file *m, void *p, loff_t *pos) +void *cache_seq_next(struct seq_file *m, void *p, loff_t *pos) { struct cache_head *ch = p; int hash = (*pos >> 32); - struct cache_detail *cd = ((struct handle*)m->private)->cd; + struct cache_detail *cd = m->private; if (p == SEQ_START_TOKEN) hash = 0; - else if (ch->next == NULL) { + else if (ch->cache_list.next == NULL) { hash++; *pos += 1LL<<32; } else { ++*pos; - return ch->next; + return hlist_entry_safe(ch->cache_list.next, + struct cache_head, cache_list); } *pos &= ~((1LL<<32) - 1); while (hash < cd->hash_size && - cd->hash_table[hash] == NULL) { + hlist_empty(&cd->hash_table[hash])) { hash++; *pos += 1LL<<32; } if (hash >= cd->hash_size) return NULL; ++*pos; - return cd->hash_table[hash]; + return hlist_entry_safe(cd->hash_table[hash].first, + struct cache_head, cache_list); } +EXPORT_SYMBOL_GPL(cache_seq_next); -static void c_stop(struct seq_file *m, void *p) +void cache_seq_stop(struct seq_file *m, void *p) __releases(cd->hash_lock) { - struct cache_detail *cd = ((struct handle*)m->private)->cd; + struct cache_detail *cd = m->private; read_unlock(&cd->hash_lock); } +EXPORT_SYMBOL_GPL(cache_seq_stop); static int c_show(struct seq_file *m, void *p) { struct cache_head *cp = p; - struct cache_detail *cd = ((struct handle*)m->private)->cd; + struct cache_detail *cd = m->private; if (p == SEQ_START_TOKEN) return cd->cache_show(m, cd, NULL); @@ -1364,33 +1360,36 @@ static int c_show(struct seq_file *m, void *p) } static const struct seq_operations cache_content_op = { - .start = c_start, - .next = c_next, - .stop = c_stop, + .start = cache_seq_start, + .next = cache_seq_next, + .stop = cache_seq_stop, .show = c_show, }; static int content_open(struct inode *inode, struct file *file, struct cache_detail *cd) { - struct handle *han; + struct seq_file *seq; + int err; if (!cd || !try_module_get(cd->owner)) return -EACCES; - han = __seq_open_private(file, &cache_content_op, sizeof(*han)); - if (han == NULL) { + + err = seq_open(file, &cache_content_op); + if (err) { module_put(cd->owner); - return -ENOMEM; + return err; } - han->cd = cd; + seq = file->private_data; + seq->private = cd; return 0; } static int content_release(struct inode *inode, struct file *file, struct cache_detail *cd) { - int ret = seq_release_private(inode, file); + int ret = seq_release(inode, file); module_put(cd->owner); return ret; } @@ -1665,17 +1664,21 @@ EXPORT_SYMBOL_GPL(cache_unregister_net); struct cache_detail *cache_create_net(struct cache_detail *tmpl, struct net *net) { struct cache_detail *cd; + int i; cd = kmemdup(tmpl, sizeof(struct cache_detail), GFP_KERNEL); if (cd == NULL) return ERR_PTR(-ENOMEM); - cd->hash_table = kzalloc(cd->hash_size * sizeof(struct cache_head *), + cd->hash_table = kzalloc(cd->hash_size * sizeof(struct hlist_head), GFP_KERNEL); if (cd->hash_table == NULL) { kfree(cd); return ERR_PTR(-ENOMEM); } + + for (i = 0; i < cd->hash_size; i++) + INIT_HLIST_HEAD(&cd->hash_table[i]); cd->net = net; return cd; } diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c index 337ca851a350..b140c092d226 100644 --- a/net/sunrpc/sched.c +++ b/net/sunrpc/sched.c @@ -297,7 +297,7 @@ static int rpc_complete_task(struct rpc_task *task) clear_bit(RPC_TASK_ACTIVE, &task->tk_runstate); ret = atomic_dec_and_test(&task->tk_count); if (waitqueue_active(wq)) - __wake_up_locked_key(wq, TASK_NORMAL, &k); + __wake_up_locked_key(wq, TASK_NORMAL, 1, &k); spin_unlock_irqrestore(&wq->lock, flags); return ret; } diff --git a/net/sunrpc/svc.c b/net/sunrpc/svc.c index 5a16d8d8c831..a8f579df14d8 100644 --- a/net/sunrpc/svc.c +++ b/net/sunrpc/svc.c @@ -34,36 +34,19 @@ static void svc_unregister(const struct svc_serv *serv, struct net *net); -#define svc_serv_is_pooled(serv) ((serv)->sv_function) +#define svc_serv_is_pooled(serv) ((serv)->sv_ops->svo_function) -/* - * Mode for mapping cpus to pools. - */ -enum { - SVC_POOL_AUTO = -1, /* choose one of the others */ - SVC_POOL_GLOBAL, /* no mapping, just a single global pool - * (legacy & UP mode) */ - SVC_POOL_PERCPU, /* one pool per cpu */ - SVC_POOL_PERNODE /* one pool per numa node */ -}; #define SVC_POOL_DEFAULT SVC_POOL_GLOBAL /* * Structure for mapping cpus to pools and vice versa. * Setup once during sunrpc initialisation. */ -static struct svc_pool_map { - int count; /* How many svc_servs use us */ - int mode; /* Note: int not enum to avoid - * warnings about "enumeration value - * not handled in switch" */ - unsigned int npools; - unsigned int *pool_to; /* maps pool id to cpu or node */ - unsigned int *to_pool; /* maps cpu or node to pool id */ -} svc_pool_map = { - .count = 0, +struct svc_pool_map svc_pool_map = { .mode = SVC_POOL_DEFAULT }; +EXPORT_SYMBOL_GPL(svc_pool_map); + static DEFINE_MUTEX(svc_pool_map_mutex);/* protects svc_pool_map.count only */ static int @@ -236,7 +219,7 @@ svc_pool_map_init_pernode(struct svc_pool_map *m) * vice versa). Initialise the map if we're the first user. * Returns the number of pools. */ -static unsigned int +unsigned int svc_pool_map_get(void) { struct svc_pool_map *m = &svc_pool_map; @@ -271,7 +254,7 @@ svc_pool_map_get(void) mutex_unlock(&svc_pool_map_mutex); return m->npools; } - +EXPORT_SYMBOL_GPL(svc_pool_map_get); /* * Drop a reference to the global map of cpus to pools. @@ -280,7 +263,7 @@ svc_pool_map_get(void) * mode using the pool_mode module option without * rebooting or re-loading sunrpc.ko. */ -static void +void svc_pool_map_put(void) { struct svc_pool_map *m = &svc_pool_map; @@ -297,7 +280,7 @@ svc_pool_map_put(void) mutex_unlock(&svc_pool_map_mutex); } - +EXPORT_SYMBOL_GPL(svc_pool_map_put); static int svc_pool_map_get_node(unsigned int pidx) { @@ -423,7 +406,7 @@ EXPORT_SYMBOL_GPL(svc_bind); */ static struct svc_serv * __svc_create(struct svc_program *prog, unsigned int bufsize, int npools, - void (*shutdown)(struct svc_serv *serv, struct net *net)) + struct svc_serv_ops *ops) { struct svc_serv *serv; unsigned int vers; @@ -440,7 +423,7 @@ __svc_create(struct svc_program *prog, unsigned int bufsize, int npools, bufsize = RPCSVC_MAXPAYLOAD; serv->sv_max_payload = bufsize? bufsize : 4096; serv->sv_max_mesg = roundup(serv->sv_max_payload + PAGE_SIZE, PAGE_SIZE); - serv->sv_shutdown = shutdown; + serv->sv_ops = ops; xdrsize = 0; while (prog) { prog->pg_lovers = prog->pg_nvers-1; @@ -486,26 +469,22 @@ __svc_create(struct svc_program *prog, unsigned int bufsize, int npools, struct svc_serv * svc_create(struct svc_program *prog, unsigned int bufsize, - void (*shutdown)(struct svc_serv *serv, struct net *net)) + struct svc_serv_ops *ops) { - return __svc_create(prog, bufsize, /*npools*/1, shutdown); + return __svc_create(prog, bufsize, /*npools*/1, ops); } EXPORT_SYMBOL_GPL(svc_create); struct svc_serv * svc_create_pooled(struct svc_program *prog, unsigned int bufsize, - void (*shutdown)(struct svc_serv *serv, struct net *net), - svc_thread_fn func, struct module *mod) + struct svc_serv_ops *ops) { struct svc_serv *serv; unsigned int npools = svc_pool_map_get(); - serv = __svc_create(prog, bufsize, npools, shutdown); + serv = __svc_create(prog, bufsize, npools, ops); if (!serv) goto out_err; - - serv->sv_function = func; - serv->sv_module = mod; return serv; out_err: svc_pool_map_put(); @@ -517,8 +496,8 @@ void svc_shutdown_net(struct svc_serv *serv, struct net *net) { svc_close_net(serv, net); - if (serv->sv_shutdown) - serv->sv_shutdown(serv, net); + if (serv->sv_ops->svo_shutdown) + serv->sv_ops->svo_shutdown(serv, net); } EXPORT_SYMBOL_GPL(svc_shutdown_net); @@ -604,40 +583,52 @@ svc_release_buffer(struct svc_rqst *rqstp) } struct svc_rqst * -svc_prepare_thread(struct svc_serv *serv, struct svc_pool *pool, int node) +svc_rqst_alloc(struct svc_serv *serv, struct svc_pool *pool, int node) { struct svc_rqst *rqstp; rqstp = kzalloc_node(sizeof(*rqstp), GFP_KERNEL, node); if (!rqstp) - goto out_enomem; + return rqstp; - serv->sv_nrthreads++; __set_bit(RQ_BUSY, &rqstp->rq_flags); spin_lock_init(&rqstp->rq_lock); rqstp->rq_server = serv; rqstp->rq_pool = pool; - spin_lock_bh(&pool->sp_lock); - pool->sp_nrthreads++; - list_add_rcu(&rqstp->rq_all, &pool->sp_all_threads); - spin_unlock_bh(&pool->sp_lock); rqstp->rq_argp = kmalloc_node(serv->sv_xdrsize, GFP_KERNEL, node); if (!rqstp->rq_argp) - goto out_thread; + goto out_enomem; rqstp->rq_resp = kmalloc_node(serv->sv_xdrsize, GFP_KERNEL, node); if (!rqstp->rq_resp) - goto out_thread; + goto out_enomem; if (!svc_init_buffer(rqstp, serv->sv_max_mesg, node)) - goto out_thread; + goto out_enomem; return rqstp; -out_thread: - svc_exit_thread(rqstp); out_enomem: - return ERR_PTR(-ENOMEM); + svc_rqst_free(rqstp); + return NULL; +} +EXPORT_SYMBOL_GPL(svc_rqst_alloc); + +struct svc_rqst * +svc_prepare_thread(struct svc_serv *serv, struct svc_pool *pool, int node) +{ + struct svc_rqst *rqstp; + + rqstp = svc_rqst_alloc(serv, pool, node); + if (!rqstp) + return ERR_PTR(-ENOMEM); + + serv->sv_nrthreads++; + spin_lock_bh(&pool->sp_lock); + pool->sp_nrthreads++; + list_add_rcu(&rqstp->rq_all, &pool->sp_all_threads); + spin_unlock_bh(&pool->sp_lock); + return rqstp; } EXPORT_SYMBOL_GPL(svc_prepare_thread); @@ -739,12 +730,12 @@ svc_set_num_threads(struct svc_serv *serv, struct svc_pool *pool, int nrservs) break; } - __module_get(serv->sv_module); - task = kthread_create_on_node(serv->sv_function, rqstp, + __module_get(serv->sv_ops->svo_module); + task = kthread_create_on_node(serv->sv_ops->svo_function, rqstp, node, "%s", serv->sv_name); if (IS_ERR(task)) { error = PTR_ERR(task); - module_put(serv->sv_module); + module_put(serv->sv_ops->svo_module); svc_exit_thread(rqstp); break; } @@ -772,15 +763,21 @@ EXPORT_SYMBOL_GPL(svc_set_num_threads); * mutex" for the service. */ void -svc_exit_thread(struct svc_rqst *rqstp) +svc_rqst_free(struct svc_rqst *rqstp) { - struct svc_serv *serv = rqstp->rq_server; - struct svc_pool *pool = rqstp->rq_pool; - svc_release_buffer(rqstp); kfree(rqstp->rq_resp); kfree(rqstp->rq_argp); kfree(rqstp->rq_auth_data); + kfree_rcu(rqstp, rq_rcu_head); +} +EXPORT_SYMBOL_GPL(svc_rqst_free); + +void +svc_exit_thread(struct svc_rqst *rqstp) +{ + struct svc_serv *serv = rqstp->rq_server; + struct svc_pool *pool = rqstp->rq_pool; spin_lock_bh(&pool->sp_lock); pool->sp_nrthreads--; @@ -788,7 +785,7 @@ svc_exit_thread(struct svc_rqst *rqstp) list_del_rcu(&rqstp->rq_all); spin_unlock_bh(&pool->sp_lock); - kfree_rcu(rqstp, rq_rcu_head); + svc_rqst_free(rqstp); /* Release the server */ if (serv) diff --git a/net/sunrpc/svc_xprt.c b/net/sunrpc/svc_xprt.c index 163ac45c3639..a6cbb2104667 100644 --- a/net/sunrpc/svc_xprt.c +++ b/net/sunrpc/svc_xprt.c @@ -24,7 +24,6 @@ static int svc_deferred_recv(struct svc_rqst *rqstp); static struct cache_deferred_req *svc_defer(struct cache_req *req); static void svc_age_temp_xprts(unsigned long closure); static void svc_delete_xprt(struct svc_xprt *xprt); -static void svc_xprt_do_enqueue(struct svc_xprt *xprt); /* apparently the "standard" is that clients close * idle connections after 5 minutes, servers after @@ -225,12 +224,12 @@ static void svc_xprt_received(struct svc_xprt *xprt) } /* As soon as we clear busy, the xprt could be closed and - * 'put', so we need a reference to call svc_xprt_do_enqueue with: + * 'put', so we need a reference to call svc_enqueue_xprt with: */ svc_xprt_get(xprt); smp_mb__before_atomic(); clear_bit(XPT_BUSY, &xprt->xpt_flags); - svc_xprt_do_enqueue(xprt); + xprt->xpt_server->sv_ops->svo_enqueue_xprt(xprt); svc_xprt_put(xprt); } @@ -320,7 +319,7 @@ static bool svc_xprt_has_something_to_do(struct svc_xprt *xprt) return false; } -static void svc_xprt_do_enqueue(struct svc_xprt *xprt) +void svc_xprt_do_enqueue(struct svc_xprt *xprt) { struct svc_pool *pool; struct svc_rqst *rqstp = NULL; @@ -402,6 +401,7 @@ redo_search: out: trace_svc_xprt_do_enqueue(xprt, rqstp); } +EXPORT_SYMBOL_GPL(svc_xprt_do_enqueue); /* * Queue up a transport with data pending. If there are idle nfsd @@ -412,7 +412,7 @@ void svc_xprt_enqueue(struct svc_xprt *xprt) { if (test_bit(XPT_BUSY, &xprt->xpt_flags)) return; - svc_xprt_do_enqueue(xprt); + xprt->xpt_server->sv_ops->svo_enqueue_xprt(xprt); } EXPORT_SYMBOL_GPL(svc_xprt_enqueue); diff --git a/net/sunrpc/xprtrdma/fmr_ops.c b/net/sunrpc/xprtrdma/fmr_ops.c index f1e8dafbd507..cb25c89da623 100644 --- a/net/sunrpc/xprtrdma/fmr_ops.c +++ b/net/sunrpc/xprtrdma/fmr_ops.c @@ -39,6 +39,25 @@ static int fmr_op_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep, struct rpcrdma_create_data_internal *cdata) { + struct ib_device_attr *devattr = &ia->ri_devattr; + struct ib_mr *mr; + + /* Obtain an lkey to use for the regbufs, which are + * protected from remote access. + */ + if (devattr->device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY) { + ia->ri_dma_lkey = ia->ri_device->local_dma_lkey; + } else { + mr = ib_get_dma_mr(ia->ri_pd, IB_ACCESS_LOCAL_WRITE); + if (IS_ERR(mr)) { + pr_err("%s: ib_get_dma_mr for failed with %lX\n", + __func__, PTR_ERR(mr)); + return -ENOMEM; + } + ia->ri_dma_lkey = ia->ri_dma_mr->lkey; + ia->ri_dma_mr = mr; + } + return 0; } diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c index 04ea914201b2..d6653f5d0830 100644 --- a/net/sunrpc/xprtrdma/frwr_ops.c +++ b/net/sunrpc/xprtrdma/frwr_ops.c @@ -117,7 +117,7 @@ __frwr_recovery_worker(struct work_struct *work) if (ib_dereg_mr(r->r.frmr.fr_mr)) goto out_fail; - r->r.frmr.fr_mr = ib_alloc_fast_reg_mr(pd, depth); + r->r.frmr.fr_mr = ib_alloc_mr(pd, IB_MR_TYPE_MEM_REG, depth); if (IS_ERR(r->r.frmr.fr_mr)) goto out_fail; @@ -148,7 +148,7 @@ __frwr_init(struct rpcrdma_mw *r, struct ib_pd *pd, struct ib_device *device, struct rpcrdma_frmr *f = &r->r.frmr; int rc; - f->fr_mr = ib_alloc_fast_reg_mr(pd, depth); + f->fr_mr = ib_alloc_mr(pd, IB_MR_TYPE_MEM_REG, depth); if (IS_ERR(f->fr_mr)) goto out_mr_err; f->fr_pgl = ib_alloc_fast_reg_page_list(device, depth); @@ -158,7 +158,7 @@ __frwr_init(struct rpcrdma_mw *r, struct ib_pd *pd, struct ib_device *device, out_mr_err: rc = PTR_ERR(f->fr_mr); - dprintk("RPC: %s: ib_alloc_fast_reg_mr status %i\n", + dprintk("RPC: %s: ib_alloc_mr status %i\n", __func__, rc); return rc; @@ -189,6 +189,11 @@ frwr_op_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep, struct ib_device_attr *devattr = &ia->ri_devattr; int depth, delta; + /* Obtain an lkey to use for the regbufs, which are + * protected from remote access. + */ + ia->ri_dma_lkey = ia->ri_device->local_dma_lkey; + ia->ri_max_frmr_depth = min_t(unsigned int, RPCRDMA_MAX_DATA_SEGS, devattr->max_fast_reg_page_list_len); diff --git a/net/sunrpc/xprtrdma/physical_ops.c b/net/sunrpc/xprtrdma/physical_ops.c index 41985d07fdb7..72cf8b15bbb4 100644 --- a/net/sunrpc/xprtrdma/physical_ops.c +++ b/net/sunrpc/xprtrdma/physical_ops.c @@ -23,6 +23,29 @@ static int physical_op_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep, struct rpcrdma_create_data_internal *cdata) { + struct ib_device_attr *devattr = &ia->ri_devattr; + struct ib_mr *mr; + + /* Obtain an rkey to use for RPC data payloads. + */ + mr = ib_get_dma_mr(ia->ri_pd, + IB_ACCESS_LOCAL_WRITE | + IB_ACCESS_REMOTE_WRITE | + IB_ACCESS_REMOTE_READ); + if (IS_ERR(mr)) { + pr_err("%s: ib_get_dma_mr for failed with %lX\n", + __func__, PTR_ERR(mr)); + return -ENOMEM; + } + ia->ri_dma_mr = mr; + + /* Obtain an lkey to use for regbufs. + */ + if (devattr->device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY) + ia->ri_dma_lkey = ia->ri_device->local_dma_lkey; + else + ia->ri_dma_lkey = ia->ri_dma_mr->lkey; + return 0; } @@ -51,7 +74,7 @@ physical_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, struct rpcrdma_ia *ia = &r_xprt->rx_ia; rpcrdma_map_one(ia->ri_device, seg, rpcrdma_data_dir(writing)); - seg->mr_rkey = ia->ri_bind_mem->rkey; + seg->mr_rkey = ia->ri_dma_mr->rkey; seg->mr_base = seg->mr_dma; seg->mr_nsegs = 1; return 1; diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c index 84ea37daef36..bc8bd6577467 100644 --- a/net/sunrpc/xprtrdma/rpc_rdma.c +++ b/net/sunrpc/xprtrdma/rpc_rdma.c @@ -71,6 +71,67 @@ static const char transfertypes[][12] = { }; #endif +/* The client can send a request inline as long as the RPCRDMA header + * plus the RPC call fit under the transport's inline limit. If the + * combined call message size exceeds that limit, the client must use + * the read chunk list for this operation. + */ +static bool rpcrdma_args_inline(struct rpc_rqst *rqst) +{ + unsigned int callsize = RPCRDMA_HDRLEN_MIN + rqst->rq_snd_buf.len; + + return callsize <= RPCRDMA_INLINE_WRITE_THRESHOLD(rqst); +} + +/* The client can't know how large the actual reply will be. Thus it + * plans for the largest possible reply for that particular ULP + * operation. If the maximum combined reply message size exceeds that + * limit, the client must provide a write list or a reply chunk for + * this request. + */ +static bool rpcrdma_results_inline(struct rpc_rqst *rqst) +{ + unsigned int repsize = RPCRDMA_HDRLEN_MIN + rqst->rq_rcv_buf.buflen; + + return repsize <= RPCRDMA_INLINE_READ_THRESHOLD(rqst); +} + +static int +rpcrdma_tail_pullup(struct xdr_buf *buf) +{ + size_t tlen = buf->tail[0].iov_len; + size_t skip = tlen & 3; + + /* Do not include the tail if it is only an XDR pad */ + if (tlen < 4) + return 0; + + /* xdr_write_pages() adds a pad at the beginning of the tail + * if the content in "buf->pages" is unaligned. Force the + * tail's actual content to land at the next XDR position + * after the head instead. + */ + if (skip) { + unsigned char *src, *dst; + unsigned int count; + + src = buf->tail[0].iov_base; + dst = buf->head[0].iov_base; + dst += buf->head[0].iov_len; + + src += skip; + tlen -= skip; + + dprintk("RPC: %s: skip=%zu, memmove(%p, %p, %zu)\n", + __func__, skip, dst, src, tlen); + + for (count = tlen; count; count--) + *dst++ = *src++; + } + + return tlen; +} + /* * Chunk assembly from upper layer xdr_buf. * @@ -122,6 +183,10 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos, if (len && n == nsegs) return -EIO; + /* When encoding the read list, the tail is always sent inline */ + if (type == rpcrdma_readch) + return n; + if (xdrbuf->tail[0].iov_len) { /* the rpcrdma protocol allows us to omit any trailing * xdr pad bytes, saving the server an RDMA operation. */ @@ -297,8 +362,7 @@ out: * pre-registered memory buffer for this request. For small amounts * of data, this is efficient. The cutoff value is tunable. */ -static int -rpcrdma_inline_pullup(struct rpc_rqst *rqst, int pad) +static void rpcrdma_inline_pullup(struct rpc_rqst *rqst) { int i, npages, curlen; int copy_len; @@ -310,16 +374,9 @@ rpcrdma_inline_pullup(struct rpc_rqst *rqst, int pad) destp = rqst->rq_svec[0].iov_base; curlen = rqst->rq_svec[0].iov_len; destp += curlen; - /* - * Do optional padding where it makes sense. Alignment of write - * payload can help the server, if our setting is accurate. - */ - pad -= (curlen + 36/*sizeof(struct rpcrdma_msg_padded)*/); - if (pad < 0 || rqst->rq_slen - curlen < RPCRDMA_INLINE_PAD_THRESH) - pad = 0; /* don't pad this request */ - dprintk("RPC: %s: pad %d destp 0x%p len %d hdrlen %d\n", - __func__, pad, destp, rqst->rq_slen, curlen); + dprintk("RPC: %s: destp 0x%p len %d hdrlen %d\n", + __func__, destp, rqst->rq_slen, curlen); copy_len = rqst->rq_snd_buf.page_len; @@ -355,7 +412,6 @@ rpcrdma_inline_pullup(struct rpc_rqst *rqst, int pad) page_base = 0; } /* header now contains entire send message */ - return pad; } /* @@ -380,7 +436,7 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst) struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); struct rpcrdma_req *req = rpcr_to_rdmar(rqst); char *base; - size_t rpclen, padlen; + size_t rpclen; ssize_t hdrlen; enum rpcrdma_chunktype rtype, wtype; struct rpcrdma_msg *headerp; @@ -402,28 +458,15 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst) /* * Chunks needed for results? * + * o Read ops return data as write chunk(s), header as inline. * o If the expected result is under the inline threshold, all ops - * return as inline (but see later). + * return as inline. * o Large non-read ops return as a single reply chunk. - * o Large read ops return data as write chunk(s), header as inline. - * - * Note: the NFS code sending down multiple result segments implies - * the op is one of read, readdir[plus], readlink or NFSv4 getacl. - */ - - /* - * This code can handle read chunks, write chunks OR reply - * chunks -- only one type. If the request is too big to fit - * inline, then we will choose read chunks. If the request is - * a READ, then use write chunks to separate the file data - * into pages; otherwise use reply chunks. */ - if (rqst->rq_rcv_buf.buflen <= RPCRDMA_INLINE_READ_THRESHOLD(rqst)) - wtype = rpcrdma_noch; - else if (rqst->rq_rcv_buf.page_len == 0) - wtype = rpcrdma_replych; - else if (rqst->rq_rcv_buf.flags & XDRBUF_READ) + if (rqst->rq_rcv_buf.flags & XDRBUF_READ) wtype = rpcrdma_writech; + else if (rpcrdma_results_inline(rqst)) + wtype = rpcrdma_noch; else wtype = rpcrdma_replych; @@ -432,21 +475,25 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst) * * o If the total request is under the inline threshold, all ops * are sent as inline. - * o Large non-write ops are sent with the entire message as a - * single read chunk (protocol 0-position special case). * o Large write ops transmit data as read chunk(s), header as * inline. + * o Large non-write ops are sent with the entire message as a + * single read chunk (protocol 0-position special case). * - * Note: the NFS code sending down multiple argument segments - * implies the op is a write. - * TBD check NFSv4 setacl + * This assumes that the upper layer does not present a request + * that both has a data payload, and whose non-data arguments + * by themselves are larger than the inline threshold. */ - if (rqst->rq_snd_buf.len <= RPCRDMA_INLINE_WRITE_THRESHOLD(rqst)) + if (rpcrdma_args_inline(rqst)) { rtype = rpcrdma_noch; - else if (rqst->rq_snd_buf.page_len == 0) - rtype = rpcrdma_areadch; - else + } else if (rqst->rq_snd_buf.flags & XDRBUF_WRITE) { rtype = rpcrdma_readch; + } else { + r_xprt->rx_stats.nomsg_call_count++; + headerp->rm_type = htonl(RDMA_NOMSG); + rtype = rpcrdma_areadch; + rpclen = 0; + } /* The following simplification is not true forever */ if (rtype != rpcrdma_noch && wtype == rpcrdma_replych) @@ -458,7 +505,6 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst) } hdrlen = RPCRDMA_HDRLEN_MIN; - padlen = 0; /* * Pull up any extra send data into the preregistered buffer. @@ -467,45 +513,15 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst) */ if (rtype == rpcrdma_noch) { - padlen = rpcrdma_inline_pullup(rqst, - RPCRDMA_INLINE_PAD_VALUE(rqst)); - - if (padlen) { - headerp->rm_type = rdma_msgp; - headerp->rm_body.rm_padded.rm_align = - cpu_to_be32(RPCRDMA_INLINE_PAD_VALUE(rqst)); - headerp->rm_body.rm_padded.rm_thresh = - cpu_to_be32(RPCRDMA_INLINE_PAD_THRESH); - headerp->rm_body.rm_padded.rm_pempty[0] = xdr_zero; - headerp->rm_body.rm_padded.rm_pempty[1] = xdr_zero; - headerp->rm_body.rm_padded.rm_pempty[2] = xdr_zero; - hdrlen += 2 * sizeof(u32); /* extra words in padhdr */ - if (wtype != rpcrdma_noch) { - dprintk("RPC: %s: invalid chunk list\n", - __func__); - return -EIO; - } - } else { - headerp->rm_body.rm_nochunks.rm_empty[0] = xdr_zero; - headerp->rm_body.rm_nochunks.rm_empty[1] = xdr_zero; - headerp->rm_body.rm_nochunks.rm_empty[2] = xdr_zero; - /* new length after pullup */ - rpclen = rqst->rq_svec[0].iov_len; - /* - * Currently we try to not actually use read inline. - * Reply chunks have the desirable property that - * they land, packed, directly in the target buffers - * without headers, so they require no fixup. The - * additional RDMA Write op sends the same amount - * of data, streams on-the-wire and adds no overhead - * on receive. Therefore, we request a reply chunk - * for non-writes wherever feasible and efficient. - */ - if (wtype == rpcrdma_noch) - wtype = rpcrdma_replych; - } - } + rpcrdma_inline_pullup(rqst); + headerp->rm_body.rm_nochunks.rm_empty[0] = xdr_zero; + headerp->rm_body.rm_nochunks.rm_empty[1] = xdr_zero; + headerp->rm_body.rm_nochunks.rm_empty[2] = xdr_zero; + /* new length after pullup */ + rpclen = rqst->rq_svec[0].iov_len; + } else if (rtype == rpcrdma_readch) + rpclen += rpcrdma_tail_pullup(&rqst->rq_snd_buf); if (rtype != rpcrdma_noch) { hdrlen = rpcrdma_create_chunks(rqst, &rqst->rq_snd_buf, headerp, rtype); @@ -518,9 +534,9 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst) if (hdrlen < 0) return hdrlen; - dprintk("RPC: %s: %s: hdrlen %zd rpclen %zd padlen %zd" + dprintk("RPC: %s: %s: hdrlen %zd rpclen %zd" " headerp 0x%p base 0x%p lkey 0x%x\n", - __func__, transfertypes[wtype], hdrlen, rpclen, padlen, + __func__, transfertypes[wtype], hdrlen, rpclen, headerp, base, rdmab_lkey(req->rl_rdmabuf)); /* @@ -534,26 +550,15 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst) req->rl_send_iov[0].length = hdrlen; req->rl_send_iov[0].lkey = rdmab_lkey(req->rl_rdmabuf); + req->rl_niovs = 1; + if (rtype == rpcrdma_areadch) + return 0; + req->rl_send_iov[1].addr = rdmab_addr(req->rl_sendbuf); req->rl_send_iov[1].length = rpclen; req->rl_send_iov[1].lkey = rdmab_lkey(req->rl_sendbuf); req->rl_niovs = 2; - - if (padlen) { - struct rpcrdma_ep *ep = &r_xprt->rx_ep; - - req->rl_send_iov[2].addr = rdmab_addr(ep->rep_padbuf); - req->rl_send_iov[2].length = padlen; - req->rl_send_iov[2].lkey = rdmab_lkey(ep->rep_padbuf); - - req->rl_send_iov[3].addr = req->rl_send_iov[1].addr + rpclen; - req->rl_send_iov[3].length = rqst->rq_slen - rpclen; - req->rl_send_iov[3].lkey = rdmab_lkey(req->rl_sendbuf); - - req->rl_niovs = 4; - } - return 0; } diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c index 2e1348bde325..cb5174284074 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c +++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c @@ -115,15 +115,6 @@ static void rdma_build_arg_xdr(struct svc_rqst *rqstp, rqstp->rq_arg.tail[0].iov_len = 0; } -static int rdma_read_max_sge(struct svcxprt_rdma *xprt, int sge_count) -{ - if (!rdma_cap_read_multi_sge(xprt->sc_cm_id->device, - xprt->sc_cm_id->port_num)) - return 1; - else - return min_t(int, sge_count, xprt->sc_max_sge); -} - /* Issue an RDMA_READ using the local lkey to map the data sink */ int rdma_read_chunk_lcl(struct svcxprt_rdma *xprt, struct svc_rqst *rqstp, @@ -144,8 +135,7 @@ int rdma_read_chunk_lcl(struct svcxprt_rdma *xprt, ctxt->direction = DMA_FROM_DEVICE; ctxt->read_hdr = head; - pages_needed = - min_t(int, pages_needed, rdma_read_max_sge(xprt, pages_needed)); + pages_needed = min_t(int, pages_needed, xprt->sc_max_sge_rd); read = min_t(int, pages_needed << PAGE_SHIFT, rs_length); for (pno = 0; pno < pages_needed; pno++) { diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c index d25cd430f9ff..1dfae8317065 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c +++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c @@ -136,6 +136,79 @@ static dma_addr_t dma_map_xdr(struct svcxprt_rdma *xprt, return dma_addr; } +/* Returns the address of the first read chunk or <nul> if no read chunk + * is present + */ +struct rpcrdma_read_chunk * +svc_rdma_get_read_chunk(struct rpcrdma_msg *rmsgp) +{ + struct rpcrdma_read_chunk *ch = + (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0]; + + if (ch->rc_discrim == xdr_zero) + return NULL; + return ch; +} + +/* Returns the address of the first read write array element or <nul> + * if no write array list is present + */ +static struct rpcrdma_write_array * +svc_rdma_get_write_array(struct rpcrdma_msg *rmsgp) +{ + if (rmsgp->rm_body.rm_chunks[0] != xdr_zero || + rmsgp->rm_body.rm_chunks[1] == xdr_zero) + return NULL; + return (struct rpcrdma_write_array *)&rmsgp->rm_body.rm_chunks[1]; +} + +/* Returns the address of the first reply array element or <nul> if no + * reply array is present + */ +static struct rpcrdma_write_array * +svc_rdma_get_reply_array(struct rpcrdma_msg *rmsgp) +{ + struct rpcrdma_read_chunk *rch; + struct rpcrdma_write_array *wr_ary; + struct rpcrdma_write_array *rp_ary; + + /* XXX: Need to fix when reply chunk may occur with read list + * and/or write list. + */ + if (rmsgp->rm_body.rm_chunks[0] != xdr_zero || + rmsgp->rm_body.rm_chunks[1] != xdr_zero) + return NULL; + + rch = svc_rdma_get_read_chunk(rmsgp); + if (rch) { + while (rch->rc_discrim != xdr_zero) + rch++; + + /* The reply chunk follows an empty write array located + * at 'rc_position' here. The reply array is at rc_target. + */ + rp_ary = (struct rpcrdma_write_array *)&rch->rc_target; + goto found_it; + } + + wr_ary = svc_rdma_get_write_array(rmsgp); + if (wr_ary) { + int chunk = be32_to_cpu(wr_ary->wc_nchunks); + + rp_ary = (struct rpcrdma_write_array *) + &wr_ary->wc_array[chunk].wc_target.rs_length; + goto found_it; + } + + /* No read list, no write list */ + rp_ary = (struct rpcrdma_write_array *)&rmsgp->rm_body.rm_chunks[2]; + + found_it: + if (rp_ary->wc_discrim == xdr_zero) + return NULL; + return rp_ary; +} + /* Assumptions: * - The specified write_len can be represented in sc_max_sge * PAGE_SIZE */ @@ -384,6 +457,7 @@ static int send_reply(struct svcxprt_rdma *rdma, int byte_count) { struct ib_send_wr send_wr; + u32 xdr_off; int sge_no; int sge_bytes; int page_no; @@ -418,8 +492,8 @@ static int send_reply(struct svcxprt_rdma *rdma, ctxt->direction = DMA_TO_DEVICE; /* Map the payload indicated by 'byte_count' */ + xdr_off = 0; for (sge_no = 1; byte_count && sge_no < vec->count; sge_no++) { - int xdr_off = 0; sge_bytes = min_t(size_t, vec->sge[sge_no].iov_len, byte_count); byte_count -= sge_bytes; ctxt->sge[sge_no].addr = @@ -457,6 +531,13 @@ static int send_reply(struct svcxprt_rdma *rdma, } rqstp->rq_next_page = rqstp->rq_respages + 1; + /* The loop above bumps sc_dma_used for each sge. The + * xdr_buf.tail gets a separate sge, but resides in the + * same page as xdr_buf.head. Don't count it twice. + */ + if (sge_no > ctxt->count) + atomic_dec(&rdma->sc_dma_used); + if (sge_no > rdma->sc_max_sge) { pr_err("svcrdma: Too many sges (%d)\n", sge_no); goto err; diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c index 6b36279e4288..fcc3eb80c265 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_transport.c +++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c @@ -91,7 +91,7 @@ struct svc_xprt_class svc_rdma_class = { .xcl_name = "rdma", .xcl_owner = THIS_MODULE, .xcl_ops = &svc_rdma_ops, - .xcl_max_payload = RPCRDMA_MAXPAYLOAD, + .xcl_max_payload = RPCSVC_MAXPAYLOAD_RDMA, .xcl_ident = XPRT_TRANSPORT_RDMA, }; @@ -659,6 +659,7 @@ static int rdma_cma_handler(struct rdma_cm_id *cma_id, if (xprt) { set_bit(XPT_CLOSE, &xprt->xpt_flags); svc_xprt_enqueue(xprt); + svc_xprt_put(xprt); } break; default: @@ -733,17 +734,19 @@ static struct svc_rdma_fastreg_mr *rdma_alloc_frmr(struct svcxprt_rdma *xprt) struct ib_mr *mr; struct ib_fast_reg_page_list *pl; struct svc_rdma_fastreg_mr *frmr; + u32 num_sg; frmr = kmalloc(sizeof(*frmr), GFP_KERNEL); if (!frmr) goto err; - mr = ib_alloc_fast_reg_mr(xprt->sc_pd, RPCSVC_MAXPAGES); + num_sg = min_t(u32, RPCSVC_MAXPAGES, xprt->sc_frmr_pg_list_len); + mr = ib_alloc_mr(xprt->sc_pd, IB_MR_TYPE_MEM_REG, num_sg); if (IS_ERR(mr)) goto err_free_frmr; pl = ib_alloc_fast_reg_page_list(xprt->sc_cm_id->device, - RPCSVC_MAXPAGES); + num_sg); if (IS_ERR(pl)) goto err_free_mr; @@ -872,6 +875,8 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt) * capabilities of this particular device */ newxprt->sc_max_sge = min((size_t)devattr.max_sge, (size_t)RPCSVC_MAXPAGES); + newxprt->sc_max_sge_rd = min_t(size_t, devattr.max_sge_rd, + RPCSVC_MAXPAGES); newxprt->sc_max_requests = min((size_t)devattr.max_qp_wr, (size_t)svcrdma_max_requests); newxprt->sc_sq_depth = RPCRDMA_SQ_DEPTH_MULT * newxprt->sc_max_requests; @@ -1046,6 +1051,7 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt) " remote_ip : %pI4\n" " remote_port : %d\n" " max_sge : %d\n" + " max_sge_rd : %d\n" " sq_depth : %d\n" " max_requests : %d\n" " ord : %d\n", @@ -1059,6 +1065,7 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt) ntohs(((struct sockaddr_in *)&newxprt->sc_cm_id-> route.addr.dst_addr)->sin_port), newxprt->sc_max_sge, + newxprt->sc_max_sge_rd, newxprt->sc_sq_depth, newxprt->sc_max_requests, newxprt->sc_ord); @@ -1201,40 +1208,6 @@ static int svc_rdma_secure_port(struct svc_rqst *rqstp) return 1; } -/* - * Attempt to register the kvec representing the RPC memory with the - * device. - * - * Returns: - * NULL : The device does not support fastreg or there were no more - * fastreg mr. - * frmr : The kvec register request was successfully posted. - * <0 : An error was encountered attempting to register the kvec. - */ -int svc_rdma_fastreg(struct svcxprt_rdma *xprt, - struct svc_rdma_fastreg_mr *frmr) -{ - struct ib_send_wr fastreg_wr; - u8 key; - - /* Bump the key */ - key = (u8)(frmr->mr->lkey & 0x000000FF); - ib_update_fast_reg_key(frmr->mr, ++key); - - /* Prepare FASTREG WR */ - memset(&fastreg_wr, 0, sizeof fastreg_wr); - fastreg_wr.opcode = IB_WR_FAST_REG_MR; - fastreg_wr.send_flags = IB_SEND_SIGNALED; - fastreg_wr.wr.fast_reg.iova_start = (unsigned long)frmr->kva; - fastreg_wr.wr.fast_reg.page_list = frmr->page_list; - fastreg_wr.wr.fast_reg.page_list_len = frmr->page_list_len; - fastreg_wr.wr.fast_reg.page_shift = PAGE_SHIFT; - fastreg_wr.wr.fast_reg.length = frmr->map_len; - fastreg_wr.wr.fast_reg.access_flags = frmr->access_flags; - fastreg_wr.wr.fast_reg.rkey = frmr->mr->lkey; - return svc_rdma_send(xprt, &fastreg_wr); -} - int svc_rdma_send(struct svcxprt_rdma *xprt, struct ib_send_wr *wr) { struct ib_send_wr *bad_wr, *n_wr; diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c index 680f888a9ddd..64443eb754ad 100644 --- a/net/sunrpc/xprtrdma/transport.c +++ b/net/sunrpc/xprtrdma/transport.c @@ -175,10 +175,8 @@ xprt_rdma_format_addresses6(struct rpc_xprt *xprt, struct sockaddr *sap) } static void -xprt_rdma_format_addresses(struct rpc_xprt *xprt) +xprt_rdma_format_addresses(struct rpc_xprt *xprt, struct sockaddr *sap) { - struct sockaddr *sap = (struct sockaddr *) - &rpcx_to_rdmad(xprt).addr; char buf[128]; switch (sap->sa_family) { @@ -302,7 +300,7 @@ xprt_setup_rdma(struct xprt_create *args) struct rpc_xprt *xprt; struct rpcrdma_xprt *new_xprt; struct rpcrdma_ep *new_ep; - struct sockaddr_in *sin; + struct sockaddr *sap; int rc; if (args->addrlen > sizeof(xprt->addr)) { @@ -333,26 +331,20 @@ xprt_setup_rdma(struct xprt_create *args) * Set up RDMA-specific connect data. */ - /* Put server RDMA address in local cdata */ - memcpy(&cdata.addr, args->dstaddr, args->addrlen); + sap = (struct sockaddr *)&cdata.addr; + memcpy(sap, args->dstaddr, args->addrlen); /* Ensure xprt->addr holds valid server TCP (not RDMA) * address, for any side protocols which peek at it */ xprt->prot = IPPROTO_TCP; xprt->addrlen = args->addrlen; - memcpy(&xprt->addr, &cdata.addr, xprt->addrlen); + memcpy(&xprt->addr, sap, xprt->addrlen); - sin = (struct sockaddr_in *)&cdata.addr; - if (ntohs(sin->sin_port) != 0) + if (rpc_get_port(sap)) xprt_set_bound(xprt); - dprintk("RPC: %s: %pI4:%u\n", - __func__, &sin->sin_addr.s_addr, ntohs(sin->sin_port)); - - /* Set max requests */ cdata.max_requests = xprt->max_reqs; - /* Set some length limits */ cdata.rsize = RPCRDMA_MAX_SEGS * PAGE_SIZE; /* RDMA write max */ cdata.wsize = RPCRDMA_MAX_SEGS * PAGE_SIZE; /* RDMA read max */ @@ -375,8 +367,7 @@ xprt_setup_rdma(struct xprt_create *args) new_xprt = rpcx_to_rdmax(xprt); - rc = rpcrdma_ia_open(new_xprt, (struct sockaddr *) &cdata.addr, - xprt_rdma_memreg_strategy); + rc = rpcrdma_ia_open(new_xprt, sap, xprt_rdma_memreg_strategy); if (rc) goto out1; @@ -409,7 +400,7 @@ xprt_setup_rdma(struct xprt_create *args) INIT_DELAYED_WORK(&new_xprt->rx_connect_worker, xprt_rdma_connect_worker); - xprt_rdma_format_addresses(xprt); + xprt_rdma_format_addresses(xprt, sap); xprt->max_payload = new_xprt->rx_ia.ri_ops->ro_maxpages(new_xprt); if (xprt->max_payload == 0) goto out4; @@ -420,6 +411,9 @@ xprt_setup_rdma(struct xprt_create *args) if (!try_module_get(THIS_MODULE)) goto out4; + dprintk("RPC: %s: %s:%s\n", __func__, + xprt->address_strings[RPC_DISPLAY_ADDR], + xprt->address_strings[RPC_DISPLAY_PORT]); return xprt; out4: @@ -653,31 +647,30 @@ static void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq) if (xprt_connected(xprt)) idle_time = (long)(jiffies - xprt->last_used) / HZ; - seq_printf(seq, - "\txprt:\trdma %u %lu %lu %lu %ld %lu %lu %lu %Lu %Lu " - "%lu %lu %lu %Lu %Lu %Lu %Lu %lu %lu %lu\n", - - 0, /* need a local port? */ - xprt->stat.bind_count, - xprt->stat.connect_count, - xprt->stat.connect_time, - idle_time, - xprt->stat.sends, - xprt->stat.recvs, - xprt->stat.bad_xids, - xprt->stat.req_u, - xprt->stat.bklog_u, - - r_xprt->rx_stats.read_chunk_count, - r_xprt->rx_stats.write_chunk_count, - r_xprt->rx_stats.reply_chunk_count, - r_xprt->rx_stats.total_rdma_request, - r_xprt->rx_stats.total_rdma_reply, - r_xprt->rx_stats.pullup_copy_count, - r_xprt->rx_stats.fixup_copy_count, - r_xprt->rx_stats.hardway_register_count, - r_xprt->rx_stats.failed_marshal_count, - r_xprt->rx_stats.bad_reply_count); + seq_puts(seq, "\txprt:\trdma "); + seq_printf(seq, "%u %lu %lu %lu %ld %lu %lu %lu %llu %llu ", + 0, /* need a local port? */ + xprt->stat.bind_count, + xprt->stat.connect_count, + xprt->stat.connect_time, + idle_time, + xprt->stat.sends, + xprt->stat.recvs, + xprt->stat.bad_xids, + xprt->stat.req_u, + xprt->stat.bklog_u); + seq_printf(seq, "%lu %lu %lu %llu %llu %llu %llu %lu %lu %lu %lu\n", + r_xprt->rx_stats.read_chunk_count, + r_xprt->rx_stats.write_chunk_count, + r_xprt->rx_stats.reply_chunk_count, + r_xprt->rx_stats.total_rdma_request, + r_xprt->rx_stats.total_rdma_reply, + r_xprt->rx_stats.pullup_copy_count, + r_xprt->rx_stats.fixup_copy_count, + r_xprt->rx_stats.hardway_register_count, + r_xprt->rx_stats.failed_marshal_count, + r_xprt->rx_stats.bad_reply_count, + r_xprt->rx_stats.nomsg_call_count); } static int diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c index 891c4ede2c20..682996779970 100644 --- a/net/sunrpc/xprtrdma/verbs.c +++ b/net/sunrpc/xprtrdma/verbs.c @@ -52,6 +52,7 @@ #include <linux/prefetch.h> #include <linux/sunrpc/addr.h> #include <asm/bitops.h> +#include <linux/module.h> /* try_module_get()/module_put() */ #include "xprt_rdma.h" @@ -414,6 +415,14 @@ connected: return 0; } +static void rpcrdma_destroy_id(struct rdma_cm_id *id) +{ + if (id) { + module_put(id->device->owner); + rdma_destroy_id(id); + } +} + static struct rdma_cm_id * rpcrdma_create_id(struct rpcrdma_xprt *xprt, struct rpcrdma_ia *ia, struct sockaddr *addr) @@ -440,6 +449,17 @@ rpcrdma_create_id(struct rpcrdma_xprt *xprt, } wait_for_completion_interruptible_timeout(&ia->ri_done, msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1); + + /* FIXME: + * Until xprtrdma supports DEVICE_REMOVAL, the provider must + * be pinned while there are active NFS/RDMA mounts to prevent + * hangs and crashes at umount time. + */ + if (!ia->ri_async_rc && !try_module_get(id->device->owner)) { + dprintk("RPC: %s: Failed to get device module\n", + __func__); + ia->ri_async_rc = -ENODEV; + } rc = ia->ri_async_rc; if (rc) goto out; @@ -449,16 +469,17 @@ rpcrdma_create_id(struct rpcrdma_xprt *xprt, if (rc) { dprintk("RPC: %s: rdma_resolve_route() failed %i\n", __func__, rc); - goto out; + goto put; } wait_for_completion_interruptible_timeout(&ia->ri_done, msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1); rc = ia->ri_async_rc; if (rc) - goto out; + goto put; return id; - +put: + module_put(id->device->owner); out: rdma_destroy_id(id); return ERR_PTR(rc); @@ -493,9 +514,11 @@ rpcrdma_clean_cq(struct ib_cq *cq) int rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg) { - int rc, mem_priv; struct rpcrdma_ia *ia = &xprt->rx_ia; struct ib_device_attr *devattr = &ia->ri_devattr; + int rc; + + ia->ri_dma_mr = NULL; ia->ri_id = rpcrdma_create_id(xprt, ia, addr); if (IS_ERR(ia->ri_id)) { @@ -519,11 +542,6 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg) goto out3; } - if (devattr->device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY) { - ia->ri_have_dma_lkey = 1; - ia->ri_dma_lkey = ia->ri_device->local_dma_lkey; - } - if (memreg == RPCRDMA_FRMR) { /* Requires both frmr reg and local dma lkey */ if (((devattr->device_cap_flags & @@ -539,42 +557,19 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg) if (!ia->ri_device->alloc_fmr) { dprintk("RPC: %s: MTHCAFMR registration " "not supported by HCA\n", __func__); - memreg = RPCRDMA_ALLPHYSICAL; + goto out3; } } - /* - * Optionally obtain an underlying physical identity mapping in - * order to do a memory window-based bind. This base registration - * is protected from remote access - that is enabled only by binding - * for the specific bytes targeted during each RPC operation, and - * revoked after the corresponding completion similar to a storage - * adapter. - */ switch (memreg) { case RPCRDMA_FRMR: ia->ri_ops = &rpcrdma_frwr_memreg_ops; break; case RPCRDMA_ALLPHYSICAL: ia->ri_ops = &rpcrdma_physical_memreg_ops; - mem_priv = IB_ACCESS_LOCAL_WRITE | - IB_ACCESS_REMOTE_WRITE | - IB_ACCESS_REMOTE_READ; - goto register_setup; + break; case RPCRDMA_MTHCAFMR: ia->ri_ops = &rpcrdma_fmr_memreg_ops; - if (ia->ri_have_dma_lkey) - break; - mem_priv = IB_ACCESS_LOCAL_WRITE; - register_setup: - ia->ri_bind_mem = ib_get_dma_mr(ia->ri_pd, mem_priv); - if (IS_ERR(ia->ri_bind_mem)) { - printk(KERN_ALERT "%s: ib_get_dma_mr for " - "phys register failed with %lX\n", - __func__, PTR_ERR(ia->ri_bind_mem)); - rc = -ENOMEM; - goto out3; - } break; default: printk(KERN_ERR "RPC: Unsupported memory " @@ -592,7 +587,7 @@ out3: ib_dealloc_pd(ia->ri_pd); ia->ri_pd = NULL; out2: - rdma_destroy_id(ia->ri_id); + rpcrdma_destroy_id(ia->ri_id); ia->ri_id = NULL; out1: return rc; @@ -606,25 +601,17 @@ out1: void rpcrdma_ia_close(struct rpcrdma_ia *ia) { - int rc; - dprintk("RPC: %s: entering\n", __func__); - if (ia->ri_bind_mem != NULL) { - rc = ib_dereg_mr(ia->ri_bind_mem); - dprintk("RPC: %s: ib_dereg_mr returned %i\n", - __func__, rc); - } - if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) { if (ia->ri_id->qp) rdma_destroy_qp(ia->ri_id); - rdma_destroy_id(ia->ri_id); + rpcrdma_destroy_id(ia->ri_id); ia->ri_id = NULL; } /* If the pd is still busy, xprtrdma missed freeing a resource */ if (ia->ri_pd && !IS_ERR(ia->ri_pd)) - WARN_ON(ib_dealloc_pd(ia->ri_pd)); + ib_dealloc_pd(ia->ri_pd); } /* @@ -639,6 +626,12 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia, struct ib_cq_init_attr cq_attr = {}; int rc, err; + if (devattr->max_sge < RPCRDMA_MAX_IOVS) { + dprintk("RPC: %s: insufficient sge's available\n", + __func__); + return -ENOMEM; + } + /* check provider's send/recv wr limits */ if (cdata->max_requests > devattr->max_qp_wr) cdata->max_requests = devattr->max_qp_wr; @@ -651,21 +644,13 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia, if (rc) return rc; ep->rep_attr.cap.max_recv_wr = cdata->max_requests; - ep->rep_attr.cap.max_send_sge = (cdata->padding ? 4 : 2); + ep->rep_attr.cap.max_send_sge = RPCRDMA_MAX_IOVS; ep->rep_attr.cap.max_recv_sge = 1; ep->rep_attr.cap.max_inline_data = 0; ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR; ep->rep_attr.qp_type = IB_QPT_RC; ep->rep_attr.port_num = ~0; - if (cdata->padding) { - ep->rep_padbuf = rpcrdma_alloc_regbuf(ia, cdata->padding, - GFP_KERNEL); - if (IS_ERR(ep->rep_padbuf)) - return PTR_ERR(ep->rep_padbuf); - } else - ep->rep_padbuf = NULL; - dprintk("RPC: %s: requested max: dtos: send %d recv %d; " "iovs: send %d recv %d\n", __func__, @@ -748,7 +733,8 @@ out2: dprintk("RPC: %s: ib_destroy_cq returned %i\n", __func__, err); out1: - rpcrdma_free_regbuf(ia, ep->rep_padbuf); + if (ia->ri_dma_mr) + ib_dereg_mr(ia->ri_dma_mr); return rc; } @@ -775,8 +761,6 @@ rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia) ia->ri_id->qp = NULL; } - rpcrdma_free_regbuf(ia, ep->rep_padbuf); - rpcrdma_clean_cq(ep->rep_attr.recv_cq); rc = ib_destroy_cq(ep->rep_attr.recv_cq); if (rc) @@ -788,6 +772,12 @@ rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia) if (rc) dprintk("RPC: %s: ib_destroy_cq returned %i\n", __func__, rc); + + if (ia->ri_dma_mr) { + rc = ib_dereg_mr(ia->ri_dma_mr); + dprintk("RPC: %s: ib_dereg_mr returned %i\n", + __func__, rc); + } } /* @@ -825,7 +815,7 @@ retry: if (ia->ri_device != id->device) { printk("RPC: %s: can't reconnect on " "different device!\n", __func__); - rdma_destroy_id(id); + rpcrdma_destroy_id(id); rc = -ENETUNREACH; goto out; } @@ -834,7 +824,7 @@ retry: if (rc) { dprintk("RPC: %s: rdma_create_qp failed %i\n", __func__, rc); - rdma_destroy_id(id); + rpcrdma_destroy_id(id); rc = -ENETUNREACH; goto out; } @@ -845,7 +835,7 @@ retry: write_unlock(&ia->ri_qplock); rdma_destroy_qp(old); - rdma_destroy_id(old); + rpcrdma_destroy_id(old); } else { dprintk("RPC: %s: connecting...\n", __func__); rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr); @@ -1229,75 +1219,6 @@ rpcrdma_mapping_error(struct rpcrdma_mr_seg *seg) (unsigned long long)seg->mr_dma, seg->mr_dmalen); } -static int -rpcrdma_register_internal(struct rpcrdma_ia *ia, void *va, int len, - struct ib_mr **mrp, struct ib_sge *iov) -{ - struct ib_phys_buf ipb; - struct ib_mr *mr; - int rc; - - /* - * All memory passed here was kmalloc'ed, therefore phys-contiguous. - */ - iov->addr = ib_dma_map_single(ia->ri_device, - va, len, DMA_BIDIRECTIONAL); - if (ib_dma_mapping_error(ia->ri_device, iov->addr)) - return -ENOMEM; - - iov->length = len; - - if (ia->ri_have_dma_lkey) { - *mrp = NULL; - iov->lkey = ia->ri_dma_lkey; - return 0; - } else if (ia->ri_bind_mem != NULL) { - *mrp = NULL; - iov->lkey = ia->ri_bind_mem->lkey; - return 0; - } - - ipb.addr = iov->addr; - ipb.size = iov->length; - mr = ib_reg_phys_mr(ia->ri_pd, &ipb, 1, - IB_ACCESS_LOCAL_WRITE, &iov->addr); - - dprintk("RPC: %s: phys convert: 0x%llx " - "registered 0x%llx length %d\n", - __func__, (unsigned long long)ipb.addr, - (unsigned long long)iov->addr, len); - - if (IS_ERR(mr)) { - *mrp = NULL; - rc = PTR_ERR(mr); - dprintk("RPC: %s: failed with %i\n", __func__, rc); - } else { - *mrp = mr; - iov->lkey = mr->lkey; - rc = 0; - } - - return rc; -} - -static int -rpcrdma_deregister_internal(struct rpcrdma_ia *ia, - struct ib_mr *mr, struct ib_sge *iov) -{ - int rc; - - ib_dma_unmap_single(ia->ri_device, - iov->addr, iov->length, DMA_BIDIRECTIONAL); - - if (NULL == mr) - return 0; - - rc = ib_dereg_mr(mr); - if (rc) - dprintk("RPC: %s: ib_dereg_mr failed %i\n", __func__, rc); - return rc; -} - /** * rpcrdma_alloc_regbuf - kmalloc and register memory for SEND/RECV buffers * @ia: controlling rpcrdma_ia @@ -1317,26 +1238,29 @@ struct rpcrdma_regbuf * rpcrdma_alloc_regbuf(struct rpcrdma_ia *ia, size_t size, gfp_t flags) { struct rpcrdma_regbuf *rb; - int rc; + struct ib_sge *iov; - rc = -ENOMEM; rb = kmalloc(sizeof(*rb) + size, flags); if (rb == NULL) goto out; - rb->rg_size = size; - rb->rg_owner = NULL; - rc = rpcrdma_register_internal(ia, rb->rg_base, size, - &rb->rg_mr, &rb->rg_iov); - if (rc) + iov = &rb->rg_iov; + iov->addr = ib_dma_map_single(ia->ri_device, + (void *)rb->rg_base, size, + DMA_BIDIRECTIONAL); + if (ib_dma_mapping_error(ia->ri_device, iov->addr)) goto out_free; + iov->length = size; + iov->lkey = ia->ri_dma_lkey; + rb->rg_size = size; + rb->rg_owner = NULL; return rb; out_free: kfree(rb); out: - return ERR_PTR(rc); + return ERR_PTR(-ENOMEM); } /** @@ -1347,10 +1271,15 @@ out: void rpcrdma_free_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb) { - if (rb) { - rpcrdma_deregister_internal(ia, rb->rg_mr, &rb->rg_iov); - kfree(rb); - } + struct ib_sge *iov; + + if (!rb) + return; + + iov = &rb->rg_iov; + ib_dma_unmap_single(ia->ri_device, + iov->addr, iov->length, DMA_BIDIRECTIONAL); + kfree(rb); } /* @@ -1363,9 +1292,11 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep, struct rpcrdma_req *req) { + struct ib_device *device = ia->ri_device; struct ib_send_wr send_wr, *send_wr_fail; struct rpcrdma_rep *rep = req->rl_reply; - int rc; + struct ib_sge *iov = req->rl_send_iov; + int i, rc; if (rep) { rc = rpcrdma_ep_post_recv(ia, ep, rep); @@ -1376,22 +1307,15 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia, send_wr.next = NULL; send_wr.wr_id = RPCRDMA_IGNORE_COMPLETION; - send_wr.sg_list = req->rl_send_iov; + send_wr.sg_list = iov; send_wr.num_sge = req->rl_niovs; send_wr.opcode = IB_WR_SEND; - if (send_wr.num_sge == 4) /* no need to sync any pad (constant) */ - ib_dma_sync_single_for_device(ia->ri_device, - req->rl_send_iov[3].addr, - req->rl_send_iov[3].length, - DMA_TO_DEVICE); - ib_dma_sync_single_for_device(ia->ri_device, - req->rl_send_iov[1].addr, - req->rl_send_iov[1].length, - DMA_TO_DEVICE); - ib_dma_sync_single_for_device(ia->ri_device, - req->rl_send_iov[0].addr, - req->rl_send_iov[0].length, - DMA_TO_DEVICE); + + for (i = 0; i < send_wr.num_sge; i++) + ib_dma_sync_single_for_device(device, iov[i].addr, + iov[i].length, DMA_TO_DEVICE); + dprintk("RPC: %s: posting %d s/g entries\n", + __func__, send_wr.num_sge); if (DECR_CQCOUNT(ep) > 0) send_wr.send_flags = 0; diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h index f49dd8b38122..02512221b8bc 100644 --- a/net/sunrpc/xprtrdma/xprt_rdma.h +++ b/net/sunrpc/xprtrdma/xprt_rdma.h @@ -51,7 +51,6 @@ #include <linux/sunrpc/clnt.h> /* rpc_xprt */ #include <linux/sunrpc/rpc_rdma.h> /* RPC/RDMA protocol */ #include <linux/sunrpc/xprtrdma.h> /* xprt parameters */ -#include <linux/sunrpc/svc.h> /* RPCSVC_MAXPAYLOAD */ #define RDMA_RESOLVE_TIMEOUT (5000) /* 5 seconds */ #define RDMA_CONNECT_RETRY_MAX (2) /* retries if no listener backlog */ @@ -65,9 +64,8 @@ struct rpcrdma_ia { struct ib_device *ri_device; struct rdma_cm_id *ri_id; struct ib_pd *ri_pd; - struct ib_mr *ri_bind_mem; + struct ib_mr *ri_dma_mr; u32 ri_dma_lkey; - int ri_have_dma_lkey; struct completion ri_done; int ri_async_rc; unsigned int ri_max_frmr_depth; @@ -89,7 +87,6 @@ struct rpcrdma_ep { int rep_connected; struct ib_qp_init_attr rep_attr; wait_queue_head_t rep_connect_wait; - struct rpcrdma_regbuf *rep_padbuf; struct rdma_conn_param rep_remote_cma; struct sockaddr_storage rep_remote_addr; struct delayed_work rep_connect_worker; @@ -119,7 +116,6 @@ struct rpcrdma_ep { struct rpcrdma_regbuf { size_t rg_size; struct rpcrdma_req *rg_owner; - struct ib_mr *rg_mr; struct ib_sge rg_iov; __be32 rg_base[0] __attribute__ ((aligned(256))); }; @@ -165,8 +161,7 @@ rdmab_to_msg(struct rpcrdma_regbuf *rb) * struct rpcrdma_buffer. N is the max number of outstanding requests. */ -/* temporary static scatter/gather max */ -#define RPCRDMA_MAX_DATA_SEGS (64) /* max scatter/gather */ +#define RPCRDMA_MAX_DATA_SEGS ((1 * 1024 * 1024) / PAGE_SIZE) #define RPCRDMA_MAX_SEGS (RPCRDMA_MAX_DATA_SEGS + 2) /* head+tail = 2 */ struct rpcrdma_buffer; @@ -258,16 +253,18 @@ struct rpcrdma_mr_seg { /* chunk descriptors */ char *mr_offset; /* kva if no page, else offset */ }; +#define RPCRDMA_MAX_IOVS (2) + struct rpcrdma_req { - unsigned int rl_niovs; /* 0, 2 or 4 */ - unsigned int rl_nchunks; /* non-zero if chunks */ - unsigned int rl_connect_cookie; /* retry detection */ - struct rpcrdma_buffer *rl_buffer; /* home base for this structure */ + unsigned int rl_niovs; + unsigned int rl_nchunks; + unsigned int rl_connect_cookie; + struct rpcrdma_buffer *rl_buffer; struct rpcrdma_rep *rl_reply;/* holder for reply buffer */ - struct ib_sge rl_send_iov[4]; /* for active requests */ - struct rpcrdma_regbuf *rl_rdmabuf; - struct rpcrdma_regbuf *rl_sendbuf; - struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS]; + struct ib_sge rl_send_iov[RPCRDMA_MAX_IOVS]; + struct rpcrdma_regbuf *rl_rdmabuf; + struct rpcrdma_regbuf *rl_sendbuf; + struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS]; }; static inline struct rpcrdma_req * @@ -342,6 +339,7 @@ struct rpcrdma_stats { unsigned long hardway_register_count; unsigned long failed_marshal_count; unsigned long bad_reply_count; + unsigned long nomsg_call_count; }; /* diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index 0030376327b7..7be90bc1a7c2 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -822,6 +822,8 @@ static void xs_reset_transport(struct sock_xprt *transport) if (atomic_read(&transport->xprt.swapper)) sk_clear_memalloc(sk); + kernel_sock_shutdown(sock, SHUT_RDWR); + write_lock_bh(&sk->sk_callback_lock); transport->inet = NULL; transport->sock = NULL; @@ -829,6 +831,7 @@ static void xs_reset_transport(struct sock_xprt *transport) sk->sk_user_data = NULL; xs_restore_old_callbacks(transport, sk); + xprt_clear_connected(xprt); write_unlock_bh(&sk->sk_callback_lock); xs_sock_reset_connection_flags(xprt); @@ -1866,7 +1869,7 @@ static int xs_local_finish_connecting(struct rpc_xprt *xprt, sk->sk_data_ready = xs_local_data_ready; sk->sk_write_space = xs_udp_write_space; sk->sk_error_report = xs_error_report; - sk->sk_allocation = GFP_ATOMIC; + sk->sk_allocation = GFP_NOIO; xprt_clear_connected(xprt); @@ -2051,7 +2054,7 @@ static void xs_udp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock) sk->sk_user_data = xprt; sk->sk_data_ready = xs_udp_data_ready; sk->sk_write_space = xs_udp_write_space; - sk->sk_allocation = GFP_ATOMIC; + sk->sk_allocation = GFP_NOIO; xprt_set_connected(xprt); @@ -2153,7 +2156,7 @@ static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock) sk->sk_state_change = xs_tcp_state_change; sk->sk_write_space = xs_tcp_write_space; sk->sk_error_report = xs_error_report; - sk->sk_allocation = GFP_ATOMIC; + sk->sk_allocation = GFP_NOIO; /* socket options */ sock_reset_flag(sk, SOCK_LINGER); @@ -2279,13 +2282,14 @@ static void xs_connect(struct rpc_xprt *xprt, struct rpc_task *task) WARN_ON_ONCE(!xprt_lock_connect(xprt, task, transport)); - /* Start by resetting any existing state */ - xs_reset_transport(transport); - - if (transport->sock != NULL && !RPC_IS_SOFTCONN(task)) { + if (transport->sock != NULL) { dprintk("RPC: xs_connect delayed xprt %p for %lu " "seconds\n", xprt, xprt->reestablish_timeout / HZ); + + /* Start by resetting any existing state */ + xs_reset_transport(transport); + queue_delayed_work(rpciod_workqueue, &transport->connect_worker, xprt->reestablish_timeout); |