diff options
Diffstat (limited to 'net')
-rw-r--r-- | net/9p/client.c | 12 | ||||
-rw-r--r-- | net/ceph/ceph_common.c | 66 | ||||
-rw-r--r-- | net/ceph/crush/crush.c | 13 | ||||
-rw-r--r-- | net/ceph/crush/crush_ln_table.h | 32 | ||||
-rw-r--r-- | net/ceph/crush/hash.c | 8 | ||||
-rw-r--r-- | net/ceph/crush/mapper.c | 148 | ||||
-rw-r--r-- | net/ceph/messenger.c | 27 | ||||
-rw-r--r-- | net/ceph/mon_client.c | 13 | ||||
-rw-r--r-- | net/ceph/osd_client.c | 42 | ||||
-rw-r--r-- | net/ceph/osdmap.c | 2 | ||||
-rw-r--r-- | net/ceph/pagevec.c | 5 | ||||
-rw-r--r-- | net/sunrpc/Makefile | 2 | ||||
-rw-r--r-- | net/sunrpc/backchannel_rqst.c | 134 | ||||
-rw-r--r-- | net/sunrpc/bc_svc.c | 63 | ||||
-rw-r--r-- | net/sunrpc/clnt.c | 109 | ||||
-rw-r--r-- | net/sunrpc/debugfs.c | 78 | ||||
-rw-r--r-- | net/sunrpc/svc.c | 36 | ||||
-rw-r--r-- | net/sunrpc/xprt.c | 7 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/fmr_ops.c | 120 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/frwr_ops.c | 227 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/physical_ops.c | 14 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/rpc_rdma.c | 8 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/transport.c | 43 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/verbs.c | 253 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/xprt_rdma.h | 38 | ||||
-rw-r--r-- | net/sunrpc/xprtsock.c | 151 |
26 files changed, 985 insertions, 666 deletions
diff --git a/net/9p/client.c b/net/9p/client.c index 6f4c4c88db84..498454b3c06c 100644 --- a/net/9p/client.c +++ b/net/9p/client.c @@ -843,7 +843,8 @@ static struct p9_req_t *p9_client_zc_rpc(struct p9_client *c, int8_t type, if (err < 0) { if (err == -EIO) c->status = Disconnected; - goto reterr; + if (err != -ERESTARTSYS) + goto reterr; } if (req->status == REQ_STATUS_ERROR) { p9_debug(P9_DEBUG_ERROR, "req_status error %d\n", req->t_err); @@ -1582,6 +1583,10 @@ p9_client_read(struct p9_fid *fid, u64 offset, struct iov_iter *to, int *err) p9_free_req(clnt, req); break; } + if (rsize < count) { + pr_err("bogus RREAD count (%d > %d)\n", count, rsize); + count = rsize; + } p9_debug(P9_DEBUG_9P, "<<< RREAD count %d\n", count); if (!count) { @@ -1647,6 +1652,11 @@ p9_client_write(struct p9_fid *fid, u64 offset, struct iov_iter *from, int *err) if (*err) { trace_9p_protocol_dump(clnt, req->rc); p9_free_req(clnt, req); + break; + } + if (rsize < count) { + pr_err("bogus RWRITE count (%d > %d)\n", count, rsize); + count = rsize; } p9_debug(P9_DEBUG_9P, "<<< RWRITE count %d\n", count); diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c index 79e8f71aef5b..f30329f72641 100644 --- a/net/ceph/ceph_common.c +++ b/net/ceph/ceph_common.c @@ -9,6 +9,7 @@ #include <keys/ceph-type.h> #include <linux/module.h> #include <linux/mount.h> +#include <linux/nsproxy.h> #include <linux/parser.h> #include <linux/sched.h> #include <linux/seq_file.h> @@ -16,8 +17,6 @@ #include <linux/statfs.h> #include <linux/string.h> #include <linux/vmalloc.h> -#include <linux/nsproxy.h> -#include <net/net_namespace.h> #include <linux/ceph/ceph_features.h> @@ -131,6 +130,13 @@ int ceph_compare_options(struct ceph_options *new_opt, int i; int ret; + /* + * Don't bother comparing options if network namespaces don't + * match. + */ + if (!net_eq(current->nsproxy->net_ns, read_pnet(&client->msgr.net))) + return -1; + ret = memcmp(opt1, opt2, ofs); if (ret) return ret; @@ -335,9 +341,6 @@ ceph_parse_options(char *options, const char *dev_name, int err = -ENOMEM; substring_t argstr[MAX_OPT_ARGS]; - if (current->nsproxy->net_ns != &init_net) - return ERR_PTR(-EINVAL); - opt = kzalloc(sizeof(*opt), GFP_KERNEL); if (!opt) return ERR_PTR(-ENOMEM); @@ -352,8 +355,8 @@ ceph_parse_options(char *options, const char *dev_name, /* start with defaults */ opt->flags = CEPH_OPT_DEFAULT; opt->osd_keepalive_timeout = CEPH_OSD_KEEPALIVE_DEFAULT; - opt->mount_timeout = CEPH_MOUNT_TIMEOUT_DEFAULT; /* seconds */ - opt->osd_idle_ttl = CEPH_OSD_IDLE_TTL_DEFAULT; /* seconds */ + opt->mount_timeout = CEPH_MOUNT_TIMEOUT_DEFAULT; + opt->osd_idle_ttl = CEPH_OSD_IDLE_TTL_DEFAULT; /* get mon ip(s) */ /* ip1[:port1][,ip2[:port2]...] */ @@ -439,13 +442,32 @@ ceph_parse_options(char *options, const char *dev_name, pr_warn("ignoring deprecated osdtimeout option\n"); break; case Opt_osdkeepalivetimeout: - opt->osd_keepalive_timeout = intval; + /* 0 isn't well defined right now, reject it */ + if (intval < 1 || intval > INT_MAX / 1000) { + pr_err("osdkeepalive out of range\n"); + err = -EINVAL; + goto out; + } + opt->osd_keepalive_timeout = + msecs_to_jiffies(intval * 1000); break; case Opt_osd_idle_ttl: - opt->osd_idle_ttl = intval; + /* 0 isn't well defined right now, reject it */ + if (intval < 1 || intval > INT_MAX / 1000) { + pr_err("osd_idle_ttl out of range\n"); + err = -EINVAL; + goto out; + } + opt->osd_idle_ttl = msecs_to_jiffies(intval * 1000); break; case Opt_mount_timeout: - opt->mount_timeout = intval; + /* 0 is "wait forever" (i.e. infinite timeout) */ + if (intval < 0 || intval > INT_MAX / 1000) { + pr_err("mount_timeout out of range\n"); + err = -EINVAL; + goto out; + } + opt->mount_timeout = msecs_to_jiffies(intval * 1000); break; case Opt_share: @@ -512,12 +534,14 @@ int ceph_print_client_options(struct seq_file *m, struct ceph_client *client) seq_puts(m, "notcp_nodelay,"); if (opt->mount_timeout != CEPH_MOUNT_TIMEOUT_DEFAULT) - seq_printf(m, "mount_timeout=%d,", opt->mount_timeout); + seq_printf(m, "mount_timeout=%d,", + jiffies_to_msecs(opt->mount_timeout) / 1000); if (opt->osd_idle_ttl != CEPH_OSD_IDLE_TTL_DEFAULT) - seq_printf(m, "osd_idle_ttl=%d,", opt->osd_idle_ttl); + seq_printf(m, "osd_idle_ttl=%d,", + jiffies_to_msecs(opt->osd_idle_ttl) / 1000); if (opt->osd_keepalive_timeout != CEPH_OSD_KEEPALIVE_DEFAULT) seq_printf(m, "osdkeepalivetimeout=%d,", - opt->osd_keepalive_timeout); + jiffies_to_msecs(opt->osd_keepalive_timeout) / 1000); /* drop redundant comma */ if (m->count != pos) @@ -587,6 +611,7 @@ struct ceph_client *ceph_create_client(struct ceph_options *opt, void *private, fail_monc: ceph_monc_stop(&client->monc); fail: + ceph_messenger_fini(&client->msgr); kfree(client); return ERR_PTR(err); } @@ -600,8 +625,8 @@ void ceph_destroy_client(struct ceph_client *client) /* unmount */ ceph_osdc_stop(&client->osdc); - ceph_monc_stop(&client->monc); + ceph_messenger_fini(&client->msgr); ceph_debugfs_client_cleanup(client); @@ -626,8 +651,8 @@ static int have_mon_and_osd_map(struct ceph_client *client) */ int __ceph_open_session(struct ceph_client *client, unsigned long started) { - int err; - unsigned long timeout = client->options->mount_timeout * HZ; + unsigned long timeout = client->options->mount_timeout; + long err; /* open session, and wait for mon and osd maps */ err = ceph_monc_open_session(&client->monc); @@ -635,16 +660,15 @@ int __ceph_open_session(struct ceph_client *client, unsigned long started) return err; while (!have_mon_and_osd_map(client)) { - err = -EIO; if (timeout && time_after_eq(jiffies, started + timeout)) - return err; + return -ETIMEDOUT; /* wait */ dout("mount waiting for mon_map\n"); err = wait_event_interruptible_timeout(client->auth_wq, have_mon_and_osd_map(client) || (client->auth_err < 0), - timeout); - if (err == -EINTR || err == -ERESTARTSYS) + ceph_timeout_jiffies(timeout)); + if (err < 0) return err; if (client->auth_err < 0) return client->auth_err; @@ -721,5 +745,5 @@ module_exit(exit_ceph_lib); MODULE_AUTHOR("Sage Weil <sage@newdream.net>"); MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>"); MODULE_AUTHOR("Patience Warnick <patience@newdream.net>"); -MODULE_DESCRIPTION("Ceph filesystem for Linux"); +MODULE_DESCRIPTION("Ceph core library"); MODULE_LICENSE("GPL"); diff --git a/net/ceph/crush/crush.c b/net/ceph/crush/crush.c index 9d84ce4ea0df..80d7c3a97cb8 100644 --- a/net/ceph/crush/crush.c +++ b/net/ceph/crush/crush.c @@ -1,15 +1,11 @@ - #ifdef __KERNEL__ # include <linux/slab.h> +# include <linux/crush/crush.h> #else -# include <stdlib.h> -# include <assert.h> -# define kfree(x) do { if (x) free(x); } while (0) -# define BUG_ON(x) assert(!(x)) +# include "crush_compat.h" +# include "crush.h" #endif -#include <linux/crush/crush.h> - const char *crush_bucket_alg_name(int alg) { switch (alg) { @@ -134,6 +130,9 @@ void crush_destroy(struct crush_map *map) kfree(map->rules); } +#ifndef __KERNEL__ + kfree(map->choose_tries); +#endif kfree(map); } diff --git a/net/ceph/crush/crush_ln_table.h b/net/ceph/crush/crush_ln_table.h index 6192c7fc958c..aae534c901a4 100644 --- a/net/ceph/crush/crush_ln_table.h +++ b/net/ceph/crush/crush_ln_table.h @@ -10,20 +10,20 @@ * */ -#if defined(__linux__) -#include <linux/types.h> -#elif defined(__FreeBSD__) -#include <sys/types.h> -#endif - #ifndef CEPH_CRUSH_LN_H #define CEPH_CRUSH_LN_H +#ifdef __KERNEL__ +# include <linux/types.h> +#else +# include "crush_compat.h" +#endif -// RH_LH_tbl[2*k] = 2^48/(1.0+k/128.0) -// RH_LH_tbl[2*k+1] = 2^48*log2(1.0+k/128.0) - -static int64_t __RH_LH_tbl[128*2+2] = { +/* + * RH_LH_tbl[2*k] = 2^48/(1.0+k/128.0) + * RH_LH_tbl[2*k+1] = 2^48*log2(1.0+k/128.0) + */ +static __s64 __RH_LH_tbl[128*2+2] = { 0x0001000000000000ll, 0x0000000000000000ll, 0x0000fe03f80fe040ll, 0x000002dfca16dde1ll, 0x0000fc0fc0fc0fc1ll, 0x000005b9e5a170b4ll, 0x0000fa232cf25214ll, 0x0000088e68ea899all, 0x0000f83e0f83e0f9ll, 0x00000b5d69bac77ell, 0x0000f6603d980f67ll, 0x00000e26fd5c8555ll, @@ -89,11 +89,12 @@ static int64_t __RH_LH_tbl[128*2+2] = { 0x0000820820820821ll, 0x0000fa2f045e7832ll, 0x000081848da8faf1ll, 0x0000fba577877d7dll, 0x0000810204081021ll, 0x0000fd1a708bbe11ll, 0x0000808080808081ll, 0x0000fe8df263f957ll, 0x0000800000000000ll, 0x0000ffff00000000ll, - }; - +}; - // LL_tbl[k] = 2^48*log2(1.0+k/2^15); -static int64_t __LL_tbl[256] = { +/* + * LL_tbl[k] = 2^48*log2(1.0+k/2^15) + */ +static __s64 __LL_tbl[256] = { 0x0000000000000000ull, 0x00000002e2a60a00ull, 0x000000070cb64ec5ull, 0x00000009ef50ce67ull, 0x0000000cd1e588fdull, 0x0000000fb4747e9cull, 0x0000001296fdaf5eull, 0x0000001579811b58ull, 0x000000185bfec2a1ull, 0x0000001b3e76a552ull, 0x0000001e20e8c380ull, 0x0000002103551d43ull, @@ -160,7 +161,4 @@ static int64_t __LL_tbl[256] = { 0x000002d4562d2ec6ull, 0x000002d73330209dull, 0x000002da102d63b0ull, 0x000002dced24f814ull, }; - - - #endif diff --git a/net/ceph/crush/hash.c b/net/ceph/crush/hash.c index 5bb63e37a8a1..ed123af49eba 100644 --- a/net/ceph/crush/hash.c +++ b/net/ceph/crush/hash.c @@ -1,6 +1,8 @@ - -#include <linux/types.h> -#include <linux/crush/hash.h> +#ifdef __KERNEL__ +# include <linux/crush/hash.h> +#else +# include "hash.h" +#endif /* * Robert Jenkins' function for mixing 32-bit values diff --git a/net/ceph/crush/mapper.c b/net/ceph/crush/mapper.c index 5b47736d27d9..393bfb22d5bb 100644 --- a/net/ceph/crush/mapper.c +++ b/net/ceph/crush/mapper.c @@ -1,27 +1,31 @@ +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2015 Intel Corporation All Rights Reserved + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ #ifdef __KERNEL__ # include <linux/string.h> # include <linux/slab.h> # include <linux/bug.h> # include <linux/kernel.h> -# ifndef dprintk -# define dprintk(args...) -# endif +# include <linux/crush/crush.h> +# include <linux/crush/hash.h> #else -# include <string.h> -# include <stdio.h> -# include <stdlib.h> -# include <assert.h> -# define BUG_ON(x) assert(!(x)) -# define dprintk(args...) /* printf(args) */ -# define kmalloc(x, f) malloc(x) -# define kfree(x) free(x) +# include "crush_compat.h" +# include "crush.h" +# include "hash.h" #endif - -#include <linux/crush/crush.h> -#include <linux/crush/hash.h> #include "crush_ln_table.h" +#define dprintk(args...) /* printf(args) */ + /* * Implement the core CRUSH mapping algorithm. */ @@ -139,7 +143,7 @@ static int bucket_list_choose(struct crush_bucket_list *bucket, int i; for (i = bucket->h.size-1; i >= 0; i--) { - __u64 w = crush_hash32_4(bucket->h.hash,x, bucket->h.items[i], + __u64 w = crush_hash32_4(bucket->h.hash, x, bucket->h.items[i], r, bucket->h.id); w &= 0xffff; dprintk("list_choose i=%d x=%d r=%d item %d weight %x " @@ -238,43 +242,46 @@ static int bucket_straw_choose(struct crush_bucket_straw *bucket, return bucket->h.items[high]; } -// compute 2^44*log2(input+1) -uint64_t crush_ln(unsigned xin) +/* compute 2^44*log2(input+1) */ +static __u64 crush_ln(unsigned int xin) { - unsigned x=xin, x1; - int iexpon, index1, index2; - uint64_t RH, LH, LL, xl64, result; + unsigned int x = xin, x1; + int iexpon, index1, index2; + __u64 RH, LH, LL, xl64, result; - x++; + x++; - // normalize input - iexpon = 15; - while(!(x&0x18000)) { x<<=1; iexpon--; } + /* normalize input */ + iexpon = 15; + while (!(x & 0x18000)) { + x <<= 1; + iexpon--; + } - index1 = (x>>8)<<1; - // RH ~ 2^56/index1 - RH = __RH_LH_tbl[index1 - 256]; - // LH ~ 2^48 * log2(index1/256) - LH = __RH_LH_tbl[index1 + 1 - 256]; + index1 = (x >> 8) << 1; + /* RH ~ 2^56/index1 */ + RH = __RH_LH_tbl[index1 - 256]; + /* LH ~ 2^48 * log2(index1/256) */ + LH = __RH_LH_tbl[index1 + 1 - 256]; - // RH*x ~ 2^48 * (2^15 + xf), xf<2^8 - xl64 = (int64_t)x * RH; - xl64 >>= 48; - x1 = xl64; + /* RH*x ~ 2^48 * (2^15 + xf), xf<2^8 */ + xl64 = (__s64)x * RH; + xl64 >>= 48; + x1 = xl64; - result = iexpon; - result <<= (12 + 32); + result = iexpon; + result <<= (12 + 32); - index2 = x1 & 0xff; - // LL ~ 2^48*log2(1.0+index2/2^15) - LL = __LL_tbl[index2]; + index2 = x1 & 0xff; + /* LL ~ 2^48*log2(1.0+index2/2^15) */ + LL = __LL_tbl[index2]; - LH = LH + LL; + LH = LH + LL; - LH >>= (48-12 - 32); - result += LH; + LH >>= (48 - 12 - 32); + result += LH; - return result; + return result; } @@ -290,9 +297,9 @@ uint64_t crush_ln(unsigned xin) static int bucket_straw2_choose(struct crush_bucket_straw2 *bucket, int x, int r) { - unsigned i, high = 0; - unsigned u; - unsigned w; + unsigned int i, high = 0; + unsigned int u; + unsigned int w; __s64 ln, draw, high_draw = 0; for (i = 0; i < bucket->h.size; i++) { @@ -567,6 +574,10 @@ reject: out[outpos] = item; outpos++; count--; +#ifndef __KERNEL__ + if (map->choose_tries && ftotal <= map->choose_total_tries) + map->choose_tries[ftotal]++; +#endif } dprintk("CHOOSE returns %d\n", outpos); @@ -610,6 +621,20 @@ static void crush_choose_indep(const struct crush_map *map, } for (ftotal = 0; left > 0 && ftotal < tries; ftotal++) { +#ifdef DEBUG_INDEP + if (out2 && ftotal) { + dprintk("%u %d a: ", ftotal, left); + for (rep = outpos; rep < endpos; rep++) { + dprintk(" %d", out[rep]); + } + dprintk("\n"); + dprintk("%u %d b: ", ftotal, left); + for (rep = outpos; rep < endpos; rep++) { + dprintk(" %d", out2[rep]); + } + dprintk("\n"); + } +#endif for (rep = outpos; rep < endpos; rep++) { if (out[rep] != CRUSH_ITEM_UNDEF) continue; @@ -726,6 +751,24 @@ static void crush_choose_indep(const struct crush_map *map, out2[rep] = CRUSH_ITEM_NONE; } } +#ifndef __KERNEL__ + if (map->choose_tries && ftotal <= map->choose_total_tries) + map->choose_tries[ftotal]++; +#endif +#ifdef DEBUG_INDEP + if (out2) { + dprintk("%u %d a: ", ftotal, left); + for (rep = outpos; rep < endpos; rep++) { + dprintk(" %d", out[rep]); + } + dprintk("\n"); + dprintk("%u %d b: ", ftotal, left); + for (rep = outpos; rep < endpos; rep++) { + dprintk(" %d", out2[rep]); + } + dprintk("\n"); + } +#endif } /** @@ -790,8 +833,15 @@ int crush_do_rule(const struct crush_map *map, switch (curstep->op) { case CRUSH_RULE_TAKE: - w[0] = curstep->arg1; - wsize = 1; + if ((curstep->arg1 >= 0 && + curstep->arg1 < map->max_devices) || + (-1-curstep->arg1 < map->max_buckets && + map->buckets[-1-curstep->arg1])) { + w[0] = curstep->arg1; + wsize = 1; + } else { + dprintk(" bad take value %d\n", curstep->arg1); + } break; case CRUSH_RULE_SET_CHOOSE_TRIES: @@ -877,7 +927,7 @@ int crush_do_rule(const struct crush_map *map, 0); } else { out_size = ((numrep < (result_max-osize)) ? - numrep : (result_max-osize)); + numrep : (result_max-osize)); crush_choose_indep( map, map->buckets[-1-w[i]], @@ -923,5 +973,3 @@ int crush_do_rule(const struct crush_map *map, } return result_len; } - - diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 073262fea6dd..e3be1d22a247 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -6,6 +6,7 @@ #include <linux/inet.h> #include <linux/kthread.h> #include <linux/net.h> +#include <linux/nsproxy.h> #include <linux/slab.h> #include <linux/socket.h> #include <linux/string.h> @@ -278,7 +279,6 @@ static void _ceph_msgr_exit(void) ceph_msgr_slab_exit(); BUG_ON(zero_page == NULL); - kunmap(zero_page); page_cache_release(zero_page); zero_page = NULL; } @@ -480,7 +480,7 @@ static int ceph_tcp_connect(struct ceph_connection *con) int ret; BUG_ON(con->sock); - ret = sock_create_kern(&init_net, con->peer_addr.in_addr.ss_family, + ret = sock_create_kern(read_pnet(&con->msgr->net), paddr->ss_family, SOCK_STREAM, IPPROTO_TCP, &sock); if (ret) return ret; @@ -1545,7 +1545,7 @@ static int write_partial_message_data(struct ceph_connection *con) page = ceph_msg_data_next(&msg->cursor, &page_offset, &length, &last_piece); ret = ceph_tcp_sendpage(con->sock, page, page_offset, - length, last_piece); + length, !last_piece); if (ret <= 0) { if (do_datacrc) msg->footer.data_crc = cpu_to_le32(crc); @@ -1732,17 +1732,17 @@ static int verify_hello(struct ceph_connection *con) static bool addr_is_blank(struct sockaddr_storage *ss) { + struct in_addr *addr = &((struct sockaddr_in *)ss)->sin_addr; + struct in6_addr *addr6 = &((struct sockaddr_in6 *)ss)->sin6_addr; + switch (ss->ss_family) { case AF_INET: - return ((struct sockaddr_in *)ss)->sin_addr.s_addr == 0; + return addr->s_addr == htonl(INADDR_ANY); case AF_INET6: - return - ((struct sockaddr_in6 *)ss)->sin6_addr.s6_addr32[0] == 0 && - ((struct sockaddr_in6 *)ss)->sin6_addr.s6_addr32[1] == 0 && - ((struct sockaddr_in6 *)ss)->sin6_addr.s6_addr32[2] == 0 && - ((struct sockaddr_in6 *)ss)->sin6_addr.s6_addr32[3] == 0; + return ipv6_addr_any(addr6); + default: + return true; } - return false; } static int addr_port(struct sockaddr_storage *ss) @@ -2945,11 +2945,18 @@ void ceph_messenger_init(struct ceph_messenger *msgr, msgr->tcp_nodelay = tcp_nodelay; atomic_set(&msgr->stopping, 0); + write_pnet(&msgr->net, get_net(current->nsproxy->net_ns)); dout("%s %p\n", __func__, msgr); } EXPORT_SYMBOL(ceph_messenger_init); +void ceph_messenger_fini(struct ceph_messenger *msgr) +{ + put_net(read_pnet(&msgr->net)); +} +EXPORT_SYMBOL(ceph_messenger_fini); + static void clear_standby(struct ceph_connection *con) { /* come back from STANDBY? */ diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c index 2b3cf05e87b0..9d6ff1215928 100644 --- a/net/ceph/mon_client.c +++ b/net/ceph/mon_client.c @@ -298,21 +298,28 @@ void ceph_monc_request_next_osdmap(struct ceph_mon_client *monc) } EXPORT_SYMBOL(ceph_monc_request_next_osdmap); +/* + * Wait for an osdmap with a given epoch. + * + * @epoch: epoch to wait for + * @timeout: in jiffies, 0 means "wait forever" + */ int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch, unsigned long timeout) { unsigned long started = jiffies; - int ret; + long ret; mutex_lock(&monc->mutex); while (monc->have_osdmap < epoch) { mutex_unlock(&monc->mutex); - if (timeout != 0 && time_after_eq(jiffies, started + timeout)) + if (timeout && time_after_eq(jiffies, started + timeout)) return -ETIMEDOUT; ret = wait_event_interruptible_timeout(monc->client->auth_wq, - monc->have_osdmap >= epoch, timeout); + monc->have_osdmap >= epoch, + ceph_timeout_jiffies(timeout)); if (ret < 0) return ret; diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index c4ec9239249a..50033677c0fa 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -296,6 +296,9 @@ static void osd_req_op_data_release(struct ceph_osd_request *osd_req, case CEPH_OSD_OP_CMPXATTR: ceph_osd_data_release(&op->xattr.osd_data); break; + case CEPH_OSD_OP_STAT: + ceph_osd_data_release(&op->raw_data_in); + break; default: break; } @@ -450,7 +453,7 @@ __CEPH_FORALL_OSD_OPS(GENERATE_CASE) */ static struct ceph_osd_req_op * _osd_req_op_init(struct ceph_osd_request *osd_req, unsigned int which, - u16 opcode) + u16 opcode, u32 flags) { struct ceph_osd_req_op *op; @@ -460,14 +463,15 @@ _osd_req_op_init(struct ceph_osd_request *osd_req, unsigned int which, op = &osd_req->r_ops[which]; memset(op, 0, sizeof (*op)); op->op = opcode; + op->flags = flags; return op; } void osd_req_op_init(struct ceph_osd_request *osd_req, - unsigned int which, u16 opcode) + unsigned int which, u16 opcode, u32 flags) { - (void)_osd_req_op_init(osd_req, which, opcode); + (void)_osd_req_op_init(osd_req, which, opcode, flags); } EXPORT_SYMBOL(osd_req_op_init); @@ -476,7 +480,8 @@ void osd_req_op_extent_init(struct ceph_osd_request *osd_req, u64 offset, u64 length, u64 truncate_size, u32 truncate_seq) { - struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, opcode); + struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, + opcode, 0); size_t payload_len = 0; BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE && @@ -515,7 +520,8 @@ EXPORT_SYMBOL(osd_req_op_extent_update); void osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which, u16 opcode, const char *class, const char *method) { - struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, opcode); + struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, + opcode, 0); struct ceph_pagelist *pagelist; size_t payload_len = 0; size_t size; @@ -552,7 +558,8 @@ int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int which, u16 opcode, const char *name, const void *value, size_t size, u8 cmp_op, u8 cmp_mode) { - struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, opcode); + struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, + opcode, 0); struct ceph_pagelist *pagelist; size_t payload_len; @@ -585,7 +592,8 @@ void osd_req_op_watch_init(struct ceph_osd_request *osd_req, unsigned int which, u16 opcode, u64 cookie, u64 version, int flag) { - struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, opcode); + struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, + opcode, 0); BUG_ON(opcode != CEPH_OSD_OP_NOTIFY_ACK && opcode != CEPH_OSD_OP_WATCH); @@ -602,7 +610,8 @@ void osd_req_op_alloc_hint_init(struct ceph_osd_request *osd_req, u64 expected_write_size) { struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, - CEPH_OSD_OP_SETALLOCHINT); + CEPH_OSD_OP_SETALLOCHINT, + 0); op->alloc_hint.expected_object_size = expected_object_size; op->alloc_hint.expected_write_size = expected_write_size; @@ -786,7 +795,7 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, } if (opcode == CEPH_OSD_OP_CREATE || opcode == CEPH_OSD_OP_DELETE) { - osd_req_op_init(req, which, opcode); + osd_req_op_init(req, which, opcode, 0); } else { u32 object_size = le32_to_cpu(layout->fl_object_size); u32 object_base = off - objoff; @@ -1088,7 +1097,7 @@ static void __move_osd_to_lru(struct ceph_osd_client *osdc, BUG_ON(!list_empty(&osd->o_osd_lru)); list_add_tail(&osd->o_osd_lru, &osdc->osd_lru); - osd->lru_ttl = jiffies + osdc->client->options->osd_idle_ttl * HZ; + osd->lru_ttl = jiffies + osdc->client->options->osd_idle_ttl; } static void maybe_move_osd_to_lru(struct ceph_osd_client *osdc, @@ -1199,7 +1208,7 @@ static struct ceph_osd *__lookup_osd(struct ceph_osd_client *osdc, int o) static void __schedule_osd_timeout(struct ceph_osd_client *osdc) { schedule_delayed_work(&osdc->timeout_work, - osdc->client->options->osd_keepalive_timeout * HZ); + osdc->client->options->osd_keepalive_timeout); } static void __cancel_osd_timeout(struct ceph_osd_client *osdc) @@ -1567,10 +1576,9 @@ static void handle_timeout(struct work_struct *work) { struct ceph_osd_client *osdc = container_of(work, struct ceph_osd_client, timeout_work.work); + struct ceph_options *opts = osdc->client->options; struct ceph_osd_request *req; struct ceph_osd *osd; - unsigned long keepalive = - osdc->client->options->osd_keepalive_timeout * HZ; struct list_head slow_osds; dout("timeout\n"); down_read(&osdc->map_sem); @@ -1586,7 +1594,8 @@ static void handle_timeout(struct work_struct *work) */ INIT_LIST_HEAD(&slow_osds); list_for_each_entry(req, &osdc->req_lru, r_req_lru_item) { - if (time_before(jiffies, req->r_stamp + keepalive)) + if (time_before(jiffies, + req->r_stamp + opts->osd_keepalive_timeout)) break; osd = req->r_osd; @@ -1613,8 +1622,7 @@ static void handle_osds_timeout(struct work_struct *work) struct ceph_osd_client *osdc = container_of(work, struct ceph_osd_client, osds_timeout_work.work); - unsigned long delay = - osdc->client->options->osd_idle_ttl * HZ >> 2; + unsigned long delay = osdc->client->options->osd_idle_ttl / 4; dout("osds timeout\n"); down_read(&osdc->map_sem); @@ -2619,7 +2627,7 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client) osdc->event_count = 0; schedule_delayed_work(&osdc->osds_timeout_work, - round_jiffies_relative(osdc->client->options->osd_idle_ttl * HZ)); + round_jiffies_relative(osdc->client->options->osd_idle_ttl)); err = -ENOMEM; osdc->req_mempool = mempool_create_kmalloc_pool(10, diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c index 15796696d64e..4a3125836b64 100644 --- a/net/ceph/osdmap.c +++ b/net/ceph/osdmap.c @@ -89,7 +89,7 @@ static int crush_decode_tree_bucket(void **p, void *end, { int j; dout("crush_decode_tree_bucket %p to %p\n", *p, end); - ceph_decode_32_safe(p, end, b->num_nodes, bad); + ceph_decode_8_safe(p, end, b->num_nodes, bad); b->node_weights = kcalloc(b->num_nodes, sizeof(u32), GFP_NOFS); if (b->node_weights == NULL) return -ENOMEM; diff --git a/net/ceph/pagevec.c b/net/ceph/pagevec.c index 096d91447e06..d4f5f220a8e5 100644 --- a/net/ceph/pagevec.c +++ b/net/ceph/pagevec.c @@ -51,10 +51,7 @@ void ceph_put_page_vector(struct page **pages, int num_pages, bool dirty) set_page_dirty_lock(pages[i]); put_page(pages[i]); } - if (is_vmalloc_addr(pages)) - vfree(pages); - else - kfree(pages); + kvfree(pages); } EXPORT_SYMBOL(ceph_put_page_vector); diff --git a/net/sunrpc/Makefile b/net/sunrpc/Makefile index 936ad0a15371..b512fbd9d79a 100644 --- a/net/sunrpc/Makefile +++ b/net/sunrpc/Makefile @@ -14,6 +14,6 @@ sunrpc-y := clnt.o xprt.o socklib.o xprtsock.o sched.o \ sunrpc_syms.o cache.o rpc_pipe.o \ svc_xprt.o sunrpc-$(CONFIG_SUNRPC_DEBUG) += debugfs.o -sunrpc-$(CONFIG_SUNRPC_BACKCHANNEL) += backchannel_rqst.o bc_svc.o +sunrpc-$(CONFIG_SUNRPC_BACKCHANNEL) += backchannel_rqst.o sunrpc-$(CONFIG_PROC_FS) += stats.o sunrpc-$(CONFIG_SYSCTL) += sysctl.o diff --git a/net/sunrpc/backchannel_rqst.c b/net/sunrpc/backchannel_rqst.c index 9dd0ea8db463..9825ff0f91d6 100644 --- a/net/sunrpc/backchannel_rqst.c +++ b/net/sunrpc/backchannel_rqst.c @@ -37,16 +37,18 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ static inline int xprt_need_to_requeue(struct rpc_xprt *xprt) { - return xprt->bc_alloc_count > 0; + return xprt->bc_alloc_count < atomic_read(&xprt->bc_free_slots); } static inline void xprt_inc_alloc_count(struct rpc_xprt *xprt, unsigned int n) { + atomic_add(n, &xprt->bc_free_slots); xprt->bc_alloc_count += n; } static inline int xprt_dec_alloc_count(struct rpc_xprt *xprt, unsigned int n) { + atomic_sub(n, &xprt->bc_free_slots); return xprt->bc_alloc_count -= n; } @@ -60,13 +62,62 @@ static void xprt_free_allocation(struct rpc_rqst *req) dprintk("RPC: free allocations for req= %p\n", req); WARN_ON_ONCE(test_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state)); - xbufp = &req->rq_private_buf; + xbufp = &req->rq_rcv_buf; free_page((unsigned long)xbufp->head[0].iov_base); xbufp = &req->rq_snd_buf; free_page((unsigned long)xbufp->head[0].iov_base); kfree(req); } +static int xprt_alloc_xdr_buf(struct xdr_buf *buf, gfp_t gfp_flags) +{ + struct page *page; + /* Preallocate one XDR receive buffer */ + page = alloc_page(gfp_flags); + if (page == NULL) + return -ENOMEM; + buf->head[0].iov_base = page_address(page); + buf->head[0].iov_len = PAGE_SIZE; + buf->tail[0].iov_base = NULL; + buf->tail[0].iov_len = 0; + buf->page_len = 0; + buf->len = 0; + buf->buflen = PAGE_SIZE; + return 0; +} + +static +struct rpc_rqst *xprt_alloc_bc_req(struct rpc_xprt *xprt, gfp_t gfp_flags) +{ + struct rpc_rqst *req; + + /* Pre-allocate one backchannel rpc_rqst */ + req = kzalloc(sizeof(*req), gfp_flags); + if (req == NULL) + return NULL; + + req->rq_xprt = xprt; + INIT_LIST_HEAD(&req->rq_list); + INIT_LIST_HEAD(&req->rq_bc_list); + + /* Preallocate one XDR receive buffer */ + if (xprt_alloc_xdr_buf(&req->rq_rcv_buf, gfp_flags) < 0) { + printk(KERN_ERR "Failed to create bc receive xbuf\n"); + goto out_free; + } + req->rq_rcv_buf.len = PAGE_SIZE; + + /* Preallocate one XDR send buffer */ + if (xprt_alloc_xdr_buf(&req->rq_snd_buf, gfp_flags) < 0) { + printk(KERN_ERR "Failed to create bc snd xbuf\n"); + goto out_free; + } + return req; +out_free: + xprt_free_allocation(req); + return NULL; +} + /* * Preallocate up to min_reqs structures and related buffers for use * by the backchannel. This function can be called multiple times @@ -87,9 +138,7 @@ static void xprt_free_allocation(struct rpc_rqst *req) */ int xprt_setup_backchannel(struct rpc_xprt *xprt, unsigned int min_reqs) { - struct page *page_rcv = NULL, *page_snd = NULL; - struct xdr_buf *xbufp = NULL; - struct rpc_rqst *req, *tmp; + struct rpc_rqst *req; struct list_head tmp_list; int i; @@ -106,7 +155,7 @@ int xprt_setup_backchannel(struct rpc_xprt *xprt, unsigned int min_reqs) INIT_LIST_HEAD(&tmp_list); for (i = 0; i < min_reqs; i++) { /* Pre-allocate one backchannel rpc_rqst */ - req = kzalloc(sizeof(struct rpc_rqst), GFP_KERNEL); + req = xprt_alloc_bc_req(xprt, GFP_KERNEL); if (req == NULL) { printk(KERN_ERR "Failed to create bc rpc_rqst\n"); goto out_free; @@ -115,41 +164,6 @@ int xprt_setup_backchannel(struct rpc_xprt *xprt, unsigned int min_reqs) /* Add the allocated buffer to the tmp list */ dprintk("RPC: adding req= %p\n", req); list_add(&req->rq_bc_pa_list, &tmp_list); - - req->rq_xprt = xprt; - INIT_LIST_HEAD(&req->rq_list); - INIT_LIST_HEAD(&req->rq_bc_list); - - /* Preallocate one XDR receive buffer */ - page_rcv = alloc_page(GFP_KERNEL); - if (page_rcv == NULL) { - printk(KERN_ERR "Failed to create bc receive xbuf\n"); - goto out_free; - } - xbufp = &req->rq_rcv_buf; - xbufp->head[0].iov_base = page_address(page_rcv); - xbufp->head[0].iov_len = PAGE_SIZE; - xbufp->tail[0].iov_base = NULL; - xbufp->tail[0].iov_len = 0; - xbufp->page_len = 0; - xbufp->len = PAGE_SIZE; - xbufp->buflen = PAGE_SIZE; - - /* Preallocate one XDR send buffer */ - page_snd = alloc_page(GFP_KERNEL); - if (page_snd == NULL) { - printk(KERN_ERR "Failed to create bc snd xbuf\n"); - goto out_free; - } - - xbufp = &req->rq_snd_buf; - xbufp->head[0].iov_base = page_address(page_snd); - xbufp->head[0].iov_len = 0; - xbufp->tail[0].iov_base = NULL; - xbufp->tail[0].iov_len = 0; - xbufp->page_len = 0; - xbufp->len = 0; - xbufp->buflen = PAGE_SIZE; } /* @@ -167,7 +181,10 @@ out_free: /* * Memory allocation failed, free the temporary list */ - list_for_each_entry_safe(req, tmp, &tmp_list, rq_bc_pa_list) { + while (!list_empty(&tmp_list)) { + req = list_first_entry(&tmp_list, + struct rpc_rqst, + rq_bc_pa_list); list_del(&req->rq_bc_pa_list); xprt_free_allocation(req); } @@ -217,9 +234,15 @@ static struct rpc_rqst *xprt_alloc_bc_request(struct rpc_xprt *xprt, __be32 xid) struct rpc_rqst *req = NULL; dprintk("RPC: allocate a backchannel request\n"); - if (list_empty(&xprt->bc_pa_list)) + if (atomic_read(&xprt->bc_free_slots) <= 0) goto not_found; - + if (list_empty(&xprt->bc_pa_list)) { + req = xprt_alloc_bc_req(xprt, GFP_ATOMIC); + if (!req) + goto not_found; + /* Note: this 'free' request adds it to xprt->bc_pa_list */ + xprt_free_bc_request(req); + } req = list_first_entry(&xprt->bc_pa_list, struct rpc_rqst, rq_bc_pa_list); req->rq_reply_bytes_recvd = 0; @@ -245,11 +268,21 @@ void xprt_free_bc_request(struct rpc_rqst *req) req->rq_connect_cookie = xprt->connect_cookie - 1; smp_mb__before_atomic(); - WARN_ON_ONCE(!test_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state)); clear_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state); smp_mb__after_atomic(); - if (!xprt_need_to_requeue(xprt)) { + /* + * Return it to the list of preallocations so that it + * may be reused by a new callback request. + */ + spin_lock_bh(&xprt->bc_pa_lock); + if (xprt_need_to_requeue(xprt)) { + list_add_tail(&req->rq_bc_pa_list, &xprt->bc_pa_list); + xprt->bc_alloc_count++; + req = NULL; + } + spin_unlock_bh(&xprt->bc_pa_lock); + if (req != NULL) { /* * The last remaining session was destroyed while this * entry was in use. Free the entry and don't attempt @@ -260,14 +293,6 @@ void xprt_free_bc_request(struct rpc_rqst *req) xprt_free_allocation(req); return; } - - /* - * Return it to the list of preallocations so that it - * may be reused by a new callback request. - */ - spin_lock_bh(&xprt->bc_pa_lock); - list_add_tail(&req->rq_bc_pa_list, &xprt->bc_pa_list); - spin_unlock_bh(&xprt->bc_pa_lock); } /* @@ -311,6 +336,7 @@ void xprt_complete_bc_request(struct rpc_rqst *req, uint32_t copied) spin_lock(&xprt->bc_pa_lock); list_del(&req->rq_bc_pa_list); + xprt->bc_alloc_count--; spin_unlock(&xprt->bc_pa_lock); req->rq_private_buf.len = copied; diff --git a/net/sunrpc/bc_svc.c b/net/sunrpc/bc_svc.c deleted file mode 100644 index 15c7a8a1c24f..000000000000 --- a/net/sunrpc/bc_svc.c +++ /dev/null @@ -1,63 +0,0 @@ -/****************************************************************************** - -(c) 2007 Network Appliance, Inc. All Rights Reserved. -(c) 2009 NetApp. All Rights Reserved. - -NetApp provides this source code under the GPL v2 License. -The GPL v2 license is available at -http://opensource.org/licenses/gpl-license.php. - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR -CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, -EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, -PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR -PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF -LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING -NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS -SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -******************************************************************************/ - -/* - * The NFSv4.1 callback service helper routines. - * They implement the transport level processing required to send the - * reply over an existing open connection previously established by the client. - */ - -#include <linux/module.h> - -#include <linux/sunrpc/xprt.h> -#include <linux/sunrpc/sched.h> -#include <linux/sunrpc/bc_xprt.h> - -#define RPCDBG_FACILITY RPCDBG_SVCDSP - -/* Empty callback ops */ -static const struct rpc_call_ops nfs41_callback_ops = { -}; - - -/* - * Send the callback reply - */ -int bc_send(struct rpc_rqst *req) -{ - struct rpc_task *task; - int ret; - - dprintk("RPC: bc_send req= %p\n", req); - task = rpc_run_bc_task(req, &nfs41_callback_ops); - if (IS_ERR(task)) - ret = PTR_ERR(task); - else { - WARN_ON_ONCE(atomic_read(&task->tk_count) != 1); - ret = task->tk_status; - rpc_put_task(task); - } - dprintk("RPC: bc_send ret= %d\n", ret); - return ret; -} - diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c index e6ce1517367f..cbc6af923dd1 100644 --- a/net/sunrpc/clnt.c +++ b/net/sunrpc/clnt.c @@ -891,15 +891,8 @@ void rpc_task_set_client(struct rpc_task *task, struct rpc_clnt *clnt) task->tk_flags |= RPC_TASK_SOFT; if (clnt->cl_noretranstimeo) task->tk_flags |= RPC_TASK_NO_RETRANS_TIMEOUT; - if (sk_memalloc_socks()) { - struct rpc_xprt *xprt; - - rcu_read_lock(); - xprt = rcu_dereference(clnt->cl_xprt); - if (xprt->swapper) - task->tk_flags |= RPC_TASK_SWAPPER; - rcu_read_unlock(); - } + if (atomic_read(&clnt->cl_swapper)) + task->tk_flags |= RPC_TASK_SWAPPER; /* Add to the client's list of all tasks */ spin_lock(&clnt->cl_lock); list_add_tail(&task->tk_task, &clnt->cl_tasks); @@ -1031,15 +1024,14 @@ EXPORT_SYMBOL_GPL(rpc_call_async); * rpc_run_bc_task - Allocate a new RPC task for backchannel use, then run * rpc_execute against it * @req: RPC request - * @tk_ops: RPC call ops */ -struct rpc_task *rpc_run_bc_task(struct rpc_rqst *req, - const struct rpc_call_ops *tk_ops) +struct rpc_task *rpc_run_bc_task(struct rpc_rqst *req) { struct rpc_task *task; struct xdr_buf *xbufp = &req->rq_snd_buf; struct rpc_task_setup task_setup_data = { - .callback_ops = tk_ops, + .callback_ops = &rpc_default_ops, + .flags = RPC_TASK_SOFTCONN, }; dprintk("RPC: rpc_run_bc_task req= %p\n", req); @@ -1614,6 +1606,7 @@ call_allocate(struct rpc_task *task) req->rq_callsize + req->rq_rcvsize); if (req->rq_buffer != NULL) return; + xprt_inject_disconnect(xprt); dprintk("RPC: %5u rpc_buffer allocation failed\n", task->tk_pid); @@ -1951,33 +1944,36 @@ call_bc_transmit(struct rpc_task *task) { struct rpc_rqst *req = task->tk_rqstp; - if (!xprt_prepare_transmit(task)) { - /* - * Could not reserve the transport. Try again after the - * transport is released. - */ - task->tk_status = 0; - task->tk_action = call_bc_transmit; - return; - } + if (!xprt_prepare_transmit(task)) + goto out_retry; - task->tk_action = rpc_exit_task; if (task->tk_status < 0) { printk(KERN_NOTICE "RPC: Could not send backchannel reply " "error: %d\n", task->tk_status); - return; + goto out_done; } + if (req->rq_connect_cookie != req->rq_xprt->connect_cookie) + req->rq_bytes_sent = 0; xprt_transmit(task); + + if (task->tk_status == -EAGAIN) + goto out_nospace; + xprt_end_transmit(task); dprint_status(task); switch (task->tk_status) { case 0: /* Success */ - break; case -EHOSTDOWN: case -EHOSTUNREACH: case -ENETUNREACH: + case -ECONNRESET: + case -ECONNREFUSED: + case -EADDRINUSE: + case -ENOTCONN: + case -EPIPE: + break; case -ETIMEDOUT: /* * Problem reaching the server. Disconnect and let the @@ -2002,6 +1998,13 @@ call_bc_transmit(struct rpc_task *task) break; } rpc_wake_up_queued_task(&req->rq_xprt->pending, task); +out_done: + task->tk_action = rpc_exit_task; + return; +out_nospace: + req->rq_connect_cookie = req->rq_xprt->connect_cookie; +out_retry: + task->tk_status = 0; } #endif /* CONFIG_SUNRPC_BACKCHANNEL */ @@ -2476,3 +2479,59 @@ void rpc_show_tasks(struct net *net) spin_unlock(&sn->rpc_client_lock); } #endif + +#if IS_ENABLED(CONFIG_SUNRPC_SWAP) +int +rpc_clnt_swap_activate(struct rpc_clnt *clnt) +{ + int ret = 0; + struct rpc_xprt *xprt; + + if (atomic_inc_return(&clnt->cl_swapper) == 1) { +retry: + rcu_read_lock(); + xprt = xprt_get(rcu_dereference(clnt->cl_xprt)); + rcu_read_unlock(); + if (!xprt) { + /* + * If we didn't get a reference, then we likely are + * racing with a migration event. Wait for a grace + * period and try again. + */ + synchronize_rcu(); + goto retry; + } + + ret = xprt_enable_swap(xprt); + xprt_put(xprt); + } + return ret; +} +EXPORT_SYMBOL_GPL(rpc_clnt_swap_activate); + +void +rpc_clnt_swap_deactivate(struct rpc_clnt *clnt) +{ + struct rpc_xprt *xprt; + + if (atomic_dec_if_positive(&clnt->cl_swapper) == 0) { +retry: + rcu_read_lock(); + xprt = xprt_get(rcu_dereference(clnt->cl_xprt)); + rcu_read_unlock(); + if (!xprt) { + /* + * If we didn't get a reference, then we likely are + * racing with a migration event. Wait for a grace + * period and try again. + */ + synchronize_rcu(); + goto retry; + } + + xprt_disable_swap(xprt); + xprt_put(xprt); + } +} +EXPORT_SYMBOL_GPL(rpc_clnt_swap_deactivate); +#endif /* CONFIG_SUNRPC_SWAP */ diff --git a/net/sunrpc/debugfs.c b/net/sunrpc/debugfs.c index 82962f7e6e88..e7b4d93566df 100644 --- a/net/sunrpc/debugfs.c +++ b/net/sunrpc/debugfs.c @@ -10,9 +10,12 @@ #include "netns.h" static struct dentry *topdir; +static struct dentry *rpc_fault_dir; static struct dentry *rpc_clnt_dir; static struct dentry *rpc_xprt_dir; +unsigned int rpc_inject_disconnect; + struct rpc_clnt_iter { struct rpc_clnt *clnt; loff_t pos; @@ -257,6 +260,8 @@ rpc_xprt_debugfs_register(struct rpc_xprt *xprt) debugfs_remove_recursive(xprt->debugfs); xprt->debugfs = NULL; } + + atomic_set(&xprt->inject_disconnect, rpc_inject_disconnect); } void @@ -266,11 +271,79 @@ rpc_xprt_debugfs_unregister(struct rpc_xprt *xprt) xprt->debugfs = NULL; } +static int +fault_open(struct inode *inode, struct file *filp) +{ + filp->private_data = kmalloc(128, GFP_KERNEL); + if (!filp->private_data) + return -ENOMEM; + return 0; +} + +static int +fault_release(struct inode *inode, struct file *filp) +{ + kfree(filp->private_data); + return 0; +} + +static ssize_t +fault_disconnect_read(struct file *filp, char __user *user_buf, + size_t len, loff_t *offset) +{ + char *buffer = (char *)filp->private_data; + size_t size; + + size = sprintf(buffer, "%u\n", rpc_inject_disconnect); + return simple_read_from_buffer(user_buf, len, offset, buffer, size); +} + +static ssize_t +fault_disconnect_write(struct file *filp, const char __user *user_buf, + size_t len, loff_t *offset) +{ + char buffer[16]; + + if (len >= sizeof(buffer)) + len = sizeof(buffer) - 1; + if (copy_from_user(buffer, user_buf, len)) + return -EFAULT; + buffer[len] = '\0'; + if (kstrtouint(buffer, 10, &rpc_inject_disconnect)) + return -EINVAL; + return len; +} + +static const struct file_operations fault_disconnect_fops = { + .owner = THIS_MODULE, + .open = fault_open, + .read = fault_disconnect_read, + .write = fault_disconnect_write, + .release = fault_release, +}; + +static struct dentry * +inject_fault_dir(struct dentry *topdir) +{ + struct dentry *faultdir; + + faultdir = debugfs_create_dir("inject_fault", topdir); + if (!faultdir) + return NULL; + + if (!debugfs_create_file("disconnect", S_IFREG | S_IRUSR, faultdir, + NULL, &fault_disconnect_fops)) + return NULL; + + return faultdir; +} + void __exit sunrpc_debugfs_exit(void) { debugfs_remove_recursive(topdir); topdir = NULL; + rpc_fault_dir = NULL; rpc_clnt_dir = NULL; rpc_xprt_dir = NULL; } @@ -282,6 +355,10 @@ sunrpc_debugfs_init(void) if (!topdir) return; + rpc_fault_dir = inject_fault_dir(topdir); + if (!rpc_fault_dir) + goto out_remove; + rpc_clnt_dir = debugfs_create_dir("rpc_clnt", topdir); if (!rpc_clnt_dir) goto out_remove; @@ -294,5 +371,6 @@ sunrpc_debugfs_init(void) out_remove: debugfs_remove_recursive(topdir); topdir = NULL; + rpc_fault_dir = NULL; rpc_clnt_dir = NULL; } diff --git a/net/sunrpc/svc.c b/net/sunrpc/svc.c index 852ae606b02a..5a16d8d8c831 100644 --- a/net/sunrpc/svc.c +++ b/net/sunrpc/svc.c @@ -1350,6 +1350,11 @@ bc_svc_process(struct svc_serv *serv, struct rpc_rqst *req, { struct kvec *argv = &rqstp->rq_arg.head[0]; struct kvec *resv = &rqstp->rq_res.head[0]; + struct rpc_task *task; + int proc_error; + int error; + + dprintk("svc: %s(%p)\n", __func__, req); /* Build the svc_rqst used by the common processing routine */ rqstp->rq_xprt = serv->sv_bc_xprt; @@ -1372,21 +1377,36 @@ bc_svc_process(struct svc_serv *serv, struct rpc_rqst *req, /* * Skip the next two words because they've already been - * processed in the trasport + * processed in the transport */ svc_getu32(argv); /* XID */ svc_getnl(argv); /* CALLDIR */ - /* Returns 1 for send, 0 for drop */ - if (svc_process_common(rqstp, argv, resv)) { - memcpy(&req->rq_snd_buf, &rqstp->rq_res, - sizeof(req->rq_snd_buf)); - return bc_send(req); - } else { - /* drop request */ + /* Parse and execute the bc call */ + proc_error = svc_process_common(rqstp, argv, resv); + + atomic_inc(&req->rq_xprt->bc_free_slots); + if (!proc_error) { + /* Processing error: drop the request */ xprt_free_bc_request(req); return 0; } + + /* Finally, send the reply synchronously */ + memcpy(&req->rq_snd_buf, &rqstp->rq_res, sizeof(req->rq_snd_buf)); + task = rpc_run_bc_task(req); + if (IS_ERR(task)) { + error = PTR_ERR(task); + goto out; + } + + WARN_ON_ONCE(atomic_read(&task->tk_count) != 1); + error = task->tk_status; + rpc_put_task(task); + +out: + dprintk("svc: %s(), error=%d\n", __func__, error); + return error; } EXPORT_SYMBOL_GPL(bc_svc_process); #endif /* CONFIG_SUNRPC_BACKCHANNEL */ diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c index 1d4fe24af06a..ab5dd621ae0c 100644 --- a/net/sunrpc/xprt.c +++ b/net/sunrpc/xprt.c @@ -68,6 +68,7 @@ static void xprt_init(struct rpc_xprt *xprt, struct net *net); static void xprt_request_init(struct rpc_task *, struct rpc_xprt *); static void xprt_connect_status(struct rpc_task *task); static int __xprt_get_cong(struct rpc_xprt *, struct rpc_task *); +static void __xprt_put_cong(struct rpc_xprt *, struct rpc_rqst *); static void xprt_destroy(struct rpc_xprt *xprt); static DEFINE_SPINLOCK(xprt_list_lock); @@ -250,6 +251,8 @@ int xprt_reserve_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task) } xprt_clear_locked(xprt); out_sleep: + if (req) + __xprt_put_cong(xprt, req); dprintk("RPC: %5u failed to lock transport %p\n", task->tk_pid, xprt); task->tk_timeout = 0; task->tk_status = -EAGAIN; @@ -608,8 +611,8 @@ static void xprt_autoclose(struct work_struct *work) struct rpc_xprt *xprt = container_of(work, struct rpc_xprt, task_cleanup); - xprt->ops->close(xprt); clear_bit(XPRT_CLOSE_WAIT, &xprt->state); + xprt->ops->close(xprt); xprt_release_write(xprt, NULL); } @@ -967,6 +970,7 @@ void xprt_transmit(struct rpc_task *task) task->tk_status = status; return; } + xprt_inject_disconnect(xprt); dprintk("RPC: %5u xmit complete\n", task->tk_pid); task->tk_flags |= RPC_TASK_SENT; @@ -1285,6 +1289,7 @@ void xprt_release(struct rpc_task *task) spin_unlock_bh(&xprt->transport_lock); if (req->rq_buffer) xprt->ops->buf_free(req->rq_buffer); + xprt_inject_disconnect(xprt); if (req->rq_cred != NULL) put_rpccred(req->rq_cred); task->tk_rqstp = NULL; diff --git a/net/sunrpc/xprtrdma/fmr_ops.c b/net/sunrpc/xprtrdma/fmr_ops.c index 302d4ebf6fbf..f1e8dafbd507 100644 --- a/net/sunrpc/xprtrdma/fmr_ops.c +++ b/net/sunrpc/xprtrdma/fmr_ops.c @@ -11,6 +11,21 @@ * can take tens of usecs to complete. */ +/* Normal operation + * + * A Memory Region is prepared for RDMA READ or WRITE using the + * ib_map_phys_fmr verb (fmr_op_map). When the RDMA operation is + * finished, the Memory Region is unmapped using the ib_unmap_fmr + * verb (fmr_op_unmap). + */ + +/* Transport recovery + * + * After a transport reconnect, fmr_op_map re-uses the MR already + * allocated for the RPC, but generates a fresh rkey then maps the + * MR again. This process is synchronous. + */ + #include "xprt_rdma.h" #if IS_ENABLED(CONFIG_SUNRPC_DEBUG) @@ -50,19 +65,28 @@ fmr_op_init(struct rpcrdma_xprt *r_xprt) struct rpcrdma_mw *r; int i, rc; + spin_lock_init(&buf->rb_mwlock); INIT_LIST_HEAD(&buf->rb_mws); INIT_LIST_HEAD(&buf->rb_all); - i = (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS; - dprintk("RPC: %s: initializing %d FMRs\n", __func__, i); + i = max_t(int, RPCRDMA_MAX_DATA_SEGS / RPCRDMA_MAX_FMR_SGES, 1); + i += 2; /* head + tail */ + i *= buf->rb_max_requests; /* one set for each RPC slot */ + dprintk("RPC: %s: initalizing %d FMRs\n", __func__, i); + rc = -ENOMEM; while (i--) { r = kzalloc(sizeof(*r), GFP_KERNEL); if (!r) - return -ENOMEM; + goto out; - r->r.fmr = ib_alloc_fmr(pd, mr_access_flags, &fmr_attr); - if (IS_ERR(r->r.fmr)) + r->r.fmr.physaddrs = kmalloc(RPCRDMA_MAX_FMR_SGES * + sizeof(u64), GFP_KERNEL); + if (!r->r.fmr.physaddrs) + goto out_free; + + r->r.fmr.fmr = ib_alloc_fmr(pd, mr_access_flags, &fmr_attr); + if (IS_ERR(r->r.fmr.fmr)) goto out_fmr_err; list_add(&r->mw_list, &buf->rb_mws); @@ -71,12 +95,24 @@ fmr_op_init(struct rpcrdma_xprt *r_xprt) return 0; out_fmr_err: - rc = PTR_ERR(r->r.fmr); + rc = PTR_ERR(r->r.fmr.fmr); dprintk("RPC: %s: ib_alloc_fmr status %i\n", __func__, rc); + kfree(r->r.fmr.physaddrs); +out_free: kfree(r); +out: return rc; } +static int +__fmr_unmap(struct rpcrdma_mw *r) +{ + LIST_HEAD(l); + + list_add(&r->r.fmr.fmr->list, &l); + return ib_unmap_fmr(&l); +} + /* Use the ib_map_phys_fmr() verb to register a memory region * for remote access via RDMA READ or RDMA WRITE. */ @@ -85,12 +121,24 @@ fmr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, int nsegs, bool writing) { struct rpcrdma_ia *ia = &r_xprt->rx_ia; - struct ib_device *device = ia->ri_id->device; + struct ib_device *device = ia->ri_device; enum dma_data_direction direction = rpcrdma_data_dir(writing); struct rpcrdma_mr_seg *seg1 = seg; - struct rpcrdma_mw *mw = seg1->rl_mw; - u64 physaddrs[RPCRDMA_MAX_DATA_SEGS]; int len, pageoff, i, rc; + struct rpcrdma_mw *mw; + + mw = seg1->rl_mw; + seg1->rl_mw = NULL; + if (!mw) { + mw = rpcrdma_get_mw(r_xprt); + if (!mw) + return -ENOMEM; + } else { + /* this is a retransmit; generate a fresh rkey */ + rc = __fmr_unmap(mw); + if (rc) + return rc; + } pageoff = offset_in_page(seg1->mr_offset); seg1->mr_offset -= pageoff; /* start of page */ @@ -100,7 +148,7 @@ fmr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, nsegs = RPCRDMA_MAX_FMR_SGES; for (i = 0; i < nsegs;) { rpcrdma_map_one(device, seg, direction); - physaddrs[i] = seg->mr_dma; + mw->r.fmr.physaddrs[i] = seg->mr_dma; len += seg->mr_len; ++seg; ++i; @@ -110,11 +158,13 @@ fmr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, break; } - rc = ib_map_phys_fmr(mw->r.fmr, physaddrs, i, seg1->mr_dma); + rc = ib_map_phys_fmr(mw->r.fmr.fmr, mw->r.fmr.physaddrs, + i, seg1->mr_dma); if (rc) goto out_maperr; - seg1->mr_rkey = mw->r.fmr->rkey; + seg1->rl_mw = mw; + seg1->mr_rkey = mw->r.fmr.fmr->rkey; seg1->mr_base = seg1->mr_dma + pageoff; seg1->mr_nsegs = i; seg1->mr_len = len; @@ -137,48 +187,28 @@ fmr_op_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg) { struct rpcrdma_ia *ia = &r_xprt->rx_ia; struct rpcrdma_mr_seg *seg1 = seg; - struct ib_device *device; + struct rpcrdma_mw *mw = seg1->rl_mw; int rc, nsegs = seg->mr_nsegs; - LIST_HEAD(l); - list_add(&seg1->rl_mw->r.fmr->list, &l); - rc = ib_unmap_fmr(&l); - read_lock(&ia->ri_qplock); - device = ia->ri_id->device; + dprintk("RPC: %s: FMR %p\n", __func__, mw); + + seg1->rl_mw = NULL; while (seg1->mr_nsegs--) - rpcrdma_unmap_one(device, seg++); - read_unlock(&ia->ri_qplock); + rpcrdma_unmap_one(ia->ri_device, seg++); + rc = __fmr_unmap(mw); if (rc) goto out_err; + rpcrdma_put_mw(r_xprt, mw); return nsegs; out_err: + /* The FMR is abandoned, but remains in rb_all. fmr_op_destroy + * will attempt to release it when the transport is destroyed. + */ dprintk("RPC: %s: ib_unmap_fmr status %i\n", __func__, rc); return nsegs; } -/* After a disconnect, unmap all FMRs. - * - * This is invoked only in the transport connect worker in order - * to serialize with rpcrdma_register_fmr_external(). - */ -static void -fmr_op_reset(struct rpcrdma_xprt *r_xprt) -{ - struct rpcrdma_buffer *buf = &r_xprt->rx_buf; - struct rpcrdma_mw *r; - LIST_HEAD(list); - int rc; - - list_for_each_entry(r, &buf->rb_all, mw_all) - list_add(&r->r.fmr->list, &list); - - rc = ib_unmap_fmr(&list); - if (rc) - dprintk("RPC: %s: ib_unmap_fmr failed %i\n", - __func__, rc); -} - static void fmr_op_destroy(struct rpcrdma_buffer *buf) { @@ -188,10 +218,13 @@ fmr_op_destroy(struct rpcrdma_buffer *buf) while (!list_empty(&buf->rb_all)) { r = list_entry(buf->rb_all.next, struct rpcrdma_mw, mw_all); list_del(&r->mw_all); - rc = ib_dealloc_fmr(r->r.fmr); + kfree(r->r.fmr.physaddrs); + + rc = ib_dealloc_fmr(r->r.fmr.fmr); if (rc) dprintk("RPC: %s: ib_dealloc_fmr failed %i\n", __func__, rc); + kfree(r); } } @@ -202,7 +235,6 @@ const struct rpcrdma_memreg_ops rpcrdma_fmr_memreg_ops = { .ro_open = fmr_op_open, .ro_maxpages = fmr_op_maxpages, .ro_init = fmr_op_init, - .ro_reset = fmr_op_reset, .ro_destroy = fmr_op_destroy, .ro_displayname = "fmr", }; diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c index d234521320a4..04ea914201b2 100644 --- a/net/sunrpc/xprtrdma/frwr_ops.c +++ b/net/sunrpc/xprtrdma/frwr_ops.c @@ -11,12 +11,136 @@ * but most complex memory registration mode. */ +/* Normal operation + * + * A Memory Region is prepared for RDMA READ or WRITE using a FAST_REG + * Work Request (frmr_op_map). When the RDMA operation is finished, this + * Memory Region is invalidated using a LOCAL_INV Work Request + * (frmr_op_unmap). + * + * Typically these Work Requests are not signaled, and neither are RDMA + * SEND Work Requests (with the exception of signaling occasionally to + * prevent provider work queue overflows). This greatly reduces HCA + * interrupt workload. + * + * As an optimization, frwr_op_unmap marks MRs INVALID before the + * LOCAL_INV WR is posted. If posting succeeds, the MR is placed on + * rb_mws immediately so that no work (like managing a linked list + * under a spinlock) is needed in the completion upcall. + * + * But this means that frwr_op_map() can occasionally encounter an MR + * that is INVALID but the LOCAL_INV WR has not completed. Work Queue + * ordering prevents a subsequent FAST_REG WR from executing against + * that MR while it is still being invalidated. + */ + +/* Transport recovery + * + * ->op_map and the transport connect worker cannot run at the same + * time, but ->op_unmap can fire while the transport connect worker + * is running. Thus MR recovery is handled in ->op_map, to guarantee + * that recovered MRs are owned by a sending RPC, and not one where + * ->op_unmap could fire at the same time transport reconnect is + * being done. + * + * When the underlying transport disconnects, MRs are left in one of + * three states: + * + * INVALID: The MR was not in use before the QP entered ERROR state. + * (Or, the LOCAL_INV WR has not completed or flushed yet). + * + * STALE: The MR was being registered or unregistered when the QP + * entered ERROR state, and the pending WR was flushed. + * + * VALID: The MR was registered before the QP entered ERROR state. + * + * When frwr_op_map encounters STALE and VALID MRs, they are recovered + * with ib_dereg_mr and then are re-initialized. Beause MR recovery + * allocates fresh resources, it is deferred to a workqueue, and the + * recovered MRs are placed back on the rb_mws list when recovery is + * complete. frwr_op_map allocates another MR for the current RPC while + * the broken MR is reset. + * + * To ensure that frwr_op_map doesn't encounter an MR that is marked + * INVALID but that is about to be flushed due to a previous transport + * disconnect, the transport connect worker attempts to drain all + * pending send queue WRs before the transport is reconnected. + */ + #include "xprt_rdma.h" #if IS_ENABLED(CONFIG_SUNRPC_DEBUG) # define RPCDBG_FACILITY RPCDBG_TRANS #endif +static struct workqueue_struct *frwr_recovery_wq; + +#define FRWR_RECOVERY_WQ_FLAGS (WQ_UNBOUND | WQ_MEM_RECLAIM) + +int +frwr_alloc_recovery_wq(void) +{ + frwr_recovery_wq = alloc_workqueue("frwr_recovery", + FRWR_RECOVERY_WQ_FLAGS, 0); + return !frwr_recovery_wq ? -ENOMEM : 0; +} + +void +frwr_destroy_recovery_wq(void) +{ + struct workqueue_struct *wq; + + if (!frwr_recovery_wq) + return; + + wq = frwr_recovery_wq; + frwr_recovery_wq = NULL; + destroy_workqueue(wq); +} + +/* Deferred reset of a single FRMR. Generate a fresh rkey by + * replacing the MR. + * + * There's no recovery if this fails. The FRMR is abandoned, but + * remains in rb_all. It will be cleaned up when the transport is + * destroyed. + */ +static void +__frwr_recovery_worker(struct work_struct *work) +{ + struct rpcrdma_mw *r = container_of(work, struct rpcrdma_mw, + r.frmr.fr_work); + struct rpcrdma_xprt *r_xprt = r->r.frmr.fr_xprt; + unsigned int depth = r_xprt->rx_ia.ri_max_frmr_depth; + struct ib_pd *pd = r_xprt->rx_ia.ri_pd; + + if (ib_dereg_mr(r->r.frmr.fr_mr)) + goto out_fail; + + r->r.frmr.fr_mr = ib_alloc_fast_reg_mr(pd, depth); + if (IS_ERR(r->r.frmr.fr_mr)) + goto out_fail; + + dprintk("RPC: %s: recovered FRMR %p\n", __func__, r); + r->r.frmr.fr_state = FRMR_IS_INVALID; + rpcrdma_put_mw(r_xprt, r); + return; + +out_fail: + pr_warn("RPC: %s: FRMR %p unrecovered\n", + __func__, r); +} + +/* A broken MR was discovered in a context that can't sleep. + * Defer recovery to the recovery worker. + */ +static void +__frwr_queue_recovery(struct rpcrdma_mw *r) +{ + INIT_WORK(&r->r.frmr.fr_work, __frwr_recovery_worker); + queue_work(frwr_recovery_wq, &r->r.frmr.fr_work); +} + static int __frwr_init(struct rpcrdma_mw *r, struct ib_pd *pd, struct ib_device *device, unsigned int depth) @@ -128,7 +252,7 @@ frwr_sendcompletion(struct ib_wc *wc) /* WARNING: Only wr_id and status are reliable at this point */ r = (struct rpcrdma_mw *)(unsigned long)wc->wr_id; - dprintk("RPC: %s: frmr %p (stale), status %s (%d)\n", + pr_warn("RPC: %s: frmr %p flushed, status %s (%d)\n", __func__, r, ib_wc_status_msg(wc->status), wc->status); r->r.frmr.fr_state = FRMR_IS_STALE; } @@ -137,16 +261,19 @@ static int frwr_op_init(struct rpcrdma_xprt *r_xprt) { struct rpcrdma_buffer *buf = &r_xprt->rx_buf; - struct ib_device *device = r_xprt->rx_ia.ri_id->device; + struct ib_device *device = r_xprt->rx_ia.ri_device; unsigned int depth = r_xprt->rx_ia.ri_max_frmr_depth; struct ib_pd *pd = r_xprt->rx_ia.ri_pd; int i; + spin_lock_init(&buf->rb_mwlock); INIT_LIST_HEAD(&buf->rb_mws); INIT_LIST_HEAD(&buf->rb_all); - i = (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS; - dprintk("RPC: %s: initializing %d FRMRs\n", __func__, i); + i = max_t(int, RPCRDMA_MAX_DATA_SEGS / depth, 1); + i += 2; /* head + tail */ + i *= buf->rb_max_requests; /* one set for each RPC slot */ + dprintk("RPC: %s: initalizing %d FRMRs\n", __func__, i); while (i--) { struct rpcrdma_mw *r; @@ -165,6 +292,7 @@ frwr_op_init(struct rpcrdma_xprt *r_xprt) list_add(&r->mw_list, &buf->rb_mws); list_add(&r->mw_all, &buf->rb_all); r->mw_sendcompletion = frwr_sendcompletion; + r->r.frmr.fr_xprt = r_xprt; } return 0; @@ -178,12 +306,12 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, int nsegs, bool writing) { struct rpcrdma_ia *ia = &r_xprt->rx_ia; - struct ib_device *device = ia->ri_id->device; + struct ib_device *device = ia->ri_device; enum dma_data_direction direction = rpcrdma_data_dir(writing); struct rpcrdma_mr_seg *seg1 = seg; - struct rpcrdma_mw *mw = seg1->rl_mw; - struct rpcrdma_frmr *frmr = &mw->r.frmr; - struct ib_mr *mr = frmr->fr_mr; + struct rpcrdma_mw *mw; + struct rpcrdma_frmr *frmr; + struct ib_mr *mr; struct ib_send_wr fastreg_wr, *bad_wr; u8 key; int len, pageoff; @@ -192,12 +320,25 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, u64 pa; int page_no; + mw = seg1->rl_mw; + seg1->rl_mw = NULL; + do { + if (mw) + __frwr_queue_recovery(mw); + mw = rpcrdma_get_mw(r_xprt); + if (!mw) + return -ENOMEM; + } while (mw->r.frmr.fr_state != FRMR_IS_INVALID); + frmr = &mw->r.frmr; + frmr->fr_state = FRMR_IS_VALID; + pageoff = offset_in_page(seg1->mr_offset); seg1->mr_offset -= pageoff; /* start of page */ seg1->mr_len += pageoff; len = -pageoff; if (nsegs > ia->ri_max_frmr_depth) nsegs = ia->ri_max_frmr_depth; + for (page_no = i = 0; i < nsegs;) { rpcrdma_map_one(device, seg, direction); pa = seg->mr_dma; @@ -216,8 +357,6 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, dprintk("RPC: %s: Using frmr %p to map %d segments (%d bytes)\n", __func__, mw, i, len); - frmr->fr_state = FRMR_IS_VALID; - memset(&fastreg_wr, 0, sizeof(fastreg_wr)); fastreg_wr.wr_id = (unsigned long)(void *)mw; fastreg_wr.opcode = IB_WR_FAST_REG_MR; @@ -229,6 +368,7 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, fastreg_wr.wr.fast_reg.access_flags = writing ? IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE : IB_ACCESS_REMOTE_READ; + mr = frmr->fr_mr; key = (u8)(mr->rkey & 0x000000FF); ib_update_fast_reg_key(mr, ++key); fastreg_wr.wr.fast_reg.rkey = mr->rkey; @@ -238,6 +378,7 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, if (rc) goto out_senderr; + seg1->rl_mw = mw; seg1->mr_rkey = mr->rkey; seg1->mr_base = seg1->mr_dma + pageoff; seg1->mr_nsegs = i; @@ -246,10 +387,9 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, out_senderr: dprintk("RPC: %s: ib_post_send status %i\n", __func__, rc); - ib_update_fast_reg_key(mr, --key); - frmr->fr_state = FRMR_IS_INVALID; while (i--) rpcrdma_unmap_one(device, --seg); + __frwr_queue_recovery(mw); return rc; } @@ -261,78 +401,46 @@ frwr_op_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg) { struct rpcrdma_mr_seg *seg1 = seg; struct rpcrdma_ia *ia = &r_xprt->rx_ia; + struct rpcrdma_mw *mw = seg1->rl_mw; struct ib_send_wr invalidate_wr, *bad_wr; int rc, nsegs = seg->mr_nsegs; - struct ib_device *device; - seg1->rl_mw->r.frmr.fr_state = FRMR_IS_INVALID; + dprintk("RPC: %s: FRMR %p\n", __func__, mw); + + seg1->rl_mw = NULL; + mw->r.frmr.fr_state = FRMR_IS_INVALID; memset(&invalidate_wr, 0, sizeof(invalidate_wr)); - invalidate_wr.wr_id = (unsigned long)(void *)seg1->rl_mw; + invalidate_wr.wr_id = (unsigned long)(void *)mw; invalidate_wr.opcode = IB_WR_LOCAL_INV; - invalidate_wr.ex.invalidate_rkey = seg1->rl_mw->r.frmr.fr_mr->rkey; + invalidate_wr.ex.invalidate_rkey = mw->r.frmr.fr_mr->rkey; DECR_CQCOUNT(&r_xprt->rx_ep); - read_lock(&ia->ri_qplock); - device = ia->ri_id->device; while (seg1->mr_nsegs--) - rpcrdma_unmap_one(device, seg++); + rpcrdma_unmap_one(ia->ri_device, seg++); + read_lock(&ia->ri_qplock); rc = ib_post_send(ia->ri_id->qp, &invalidate_wr, &bad_wr); read_unlock(&ia->ri_qplock); if (rc) goto out_err; + + rpcrdma_put_mw(r_xprt, mw); return nsegs; out_err: - /* Force rpcrdma_buffer_get() to retry */ - seg1->rl_mw->r.frmr.fr_state = FRMR_IS_STALE; dprintk("RPC: %s: ib_post_send status %i\n", __func__, rc); + __frwr_queue_recovery(mw); return nsegs; } -/* After a disconnect, a flushed FAST_REG_MR can leave an FRMR in - * an unusable state. Find FRMRs in this state and dereg / reg - * each. FRMRs that are VALID and attached to an rpcrdma_req are - * also torn down. - * - * This gives all in-use FRMRs a fresh rkey and leaves them INVALID. - * - * This is invoked only in the transport connect worker in order - * to serialize with rpcrdma_register_frmr_external(). - */ -static void -frwr_op_reset(struct rpcrdma_xprt *r_xprt) -{ - struct rpcrdma_buffer *buf = &r_xprt->rx_buf; - struct ib_device *device = r_xprt->rx_ia.ri_id->device; - unsigned int depth = r_xprt->rx_ia.ri_max_frmr_depth; - struct ib_pd *pd = r_xprt->rx_ia.ri_pd; - struct rpcrdma_mw *r; - int rc; - - list_for_each_entry(r, &buf->rb_all, mw_all) { - if (r->r.frmr.fr_state == FRMR_IS_INVALID) - continue; - - __frwr_release(r); - rc = __frwr_init(r, pd, device, depth); - if (rc) { - dprintk("RPC: %s: mw %p left %s\n", - __func__, r, - (r->r.frmr.fr_state == FRMR_IS_STALE ? - "stale" : "valid")); - continue; - } - - r->r.frmr.fr_state = FRMR_IS_INVALID; - } -} - static void frwr_op_destroy(struct rpcrdma_buffer *buf) { struct rpcrdma_mw *r; + /* Ensure stale MWs for "buf" are no longer in flight */ + flush_workqueue(frwr_recovery_wq); + while (!list_empty(&buf->rb_all)) { r = list_entry(buf->rb_all.next, struct rpcrdma_mw, mw_all); list_del(&r->mw_all); @@ -347,7 +455,6 @@ const struct rpcrdma_memreg_ops rpcrdma_frwr_memreg_ops = { .ro_open = frwr_op_open, .ro_maxpages = frwr_op_maxpages, .ro_init = frwr_op_init, - .ro_reset = frwr_op_reset, .ro_destroy = frwr_op_destroy, .ro_displayname = "frwr", }; diff --git a/net/sunrpc/xprtrdma/physical_ops.c b/net/sunrpc/xprtrdma/physical_ops.c index ba518af16787..41985d07fdb7 100644 --- a/net/sunrpc/xprtrdma/physical_ops.c +++ b/net/sunrpc/xprtrdma/physical_ops.c @@ -50,8 +50,7 @@ physical_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, { struct rpcrdma_ia *ia = &r_xprt->rx_ia; - rpcrdma_map_one(ia->ri_id->device, seg, - rpcrdma_data_dir(writing)); + rpcrdma_map_one(ia->ri_device, seg, rpcrdma_data_dir(writing)); seg->mr_rkey = ia->ri_bind_mem->rkey; seg->mr_base = seg->mr_dma; seg->mr_nsegs = 1; @@ -65,19 +64,11 @@ physical_op_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg) { struct rpcrdma_ia *ia = &r_xprt->rx_ia; - read_lock(&ia->ri_qplock); - rpcrdma_unmap_one(ia->ri_id->device, seg); - read_unlock(&ia->ri_qplock); - + rpcrdma_unmap_one(ia->ri_device, seg); return 1; } static void -physical_op_reset(struct rpcrdma_xprt *r_xprt) -{ -} - -static void physical_op_destroy(struct rpcrdma_buffer *buf) { } @@ -88,7 +79,6 @@ const struct rpcrdma_memreg_ops rpcrdma_physical_memreg_ops = { .ro_open = physical_op_open, .ro_maxpages = physical_op_maxpages, .ro_init = physical_op_init, - .ro_reset = physical_op_reset, .ro_destroy = physical_op_destroy, .ro_displayname = "physical", }; diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c index 2c53ea9e1b83..84ea37daef36 100644 --- a/net/sunrpc/xprtrdma/rpc_rdma.c +++ b/net/sunrpc/xprtrdma/rpc_rdma.c @@ -284,9 +284,6 @@ rpcrdma_create_chunks(struct rpc_rqst *rqst, struct xdr_buf *target, return (unsigned char *)iptr - (unsigned char *)headerp; out: - if (r_xprt->rx_ia.ri_memreg_strategy == RPCRDMA_FRMR) - return n; - for (pos = 0; nchunks--;) pos += r_xprt->rx_ia.ri_ops->ro_unmap(r_xprt, &req->rl_segments[pos]); @@ -732,8 +729,8 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep) struct rpcrdma_msg *headerp; struct rpcrdma_req *req; struct rpc_rqst *rqst; - struct rpc_xprt *xprt = rep->rr_xprt; - struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); + struct rpcrdma_xprt *r_xprt = rep->rr_rxprt; + struct rpc_xprt *xprt = &r_xprt->rx_xprt; __be32 *iptr; int rdmalen, status; unsigned long cwnd; @@ -770,7 +767,6 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep) rep->rr_len); repost: r_xprt->rx_stats.bad_reply_count++; - rep->rr_func = rpcrdma_reply_handler; if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, &r_xprt->rx_ep, rep)) rpcrdma_recv_buffer_put(rep); diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c index 436da2caec95..680f888a9ddd 100644 --- a/net/sunrpc/xprtrdma/transport.c +++ b/net/sunrpc/xprtrdma/transport.c @@ -240,6 +240,16 @@ xprt_rdma_connect_worker(struct work_struct *work) xprt_clear_connecting(xprt); } +static void +xprt_rdma_inject_disconnect(struct rpc_xprt *xprt) +{ + struct rpcrdma_xprt *r_xprt = container_of(xprt, struct rpcrdma_xprt, + rx_xprt); + + pr_info("rpcrdma: injecting transport disconnect on xprt=%p\n", xprt); + rdma_disconnect(r_xprt->rx_ia.ri_id); +} + /* * xprt_rdma_destroy * @@ -612,12 +622,6 @@ xprt_rdma_send_request(struct rpc_task *task) if (req->rl_reply == NULL) /* e.g. reconnection */ rpcrdma_recv_buffer_get(req); - if (req->rl_reply) { - req->rl_reply->rr_func = rpcrdma_reply_handler; - /* this need only be done once, but... */ - req->rl_reply->rr_xprt = xprt; - } - /* Must suppress retransmit to maintain credits */ if (req->rl_connect_cookie == xprt->connect_cookie) goto drop_connection; @@ -676,6 +680,17 @@ static void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq) r_xprt->rx_stats.bad_reply_count); } +static int +xprt_rdma_enable_swap(struct rpc_xprt *xprt) +{ + return -EINVAL; +} + +static void +xprt_rdma_disable_swap(struct rpc_xprt *xprt) +{ +} + /* * Plumbing for rpc transport switch and kernel module */ @@ -694,7 +709,10 @@ static struct rpc_xprt_ops xprt_rdma_procs = { .send_request = xprt_rdma_send_request, .close = xprt_rdma_close, .destroy = xprt_rdma_destroy, - .print_stats = xprt_rdma_print_stats + .print_stats = xprt_rdma_print_stats, + .enable_swap = xprt_rdma_enable_swap, + .disable_swap = xprt_rdma_disable_swap, + .inject_disconnect = xprt_rdma_inject_disconnect }; static struct xprt_class xprt_rdma = { @@ -720,17 +738,24 @@ void xprt_rdma_cleanup(void) if (rc) dprintk("RPC: %s: xprt_unregister returned %i\n", __func__, rc); + + frwr_destroy_recovery_wq(); } int xprt_rdma_init(void) { int rc; - rc = xprt_register_transport(&xprt_rdma); - + rc = frwr_alloc_recovery_wq(); if (rc) return rc; + rc = xprt_register_transport(&xprt_rdma); + if (rc) { + frwr_destroy_recovery_wq(); + return rc; + } + dprintk("RPCRDMA Module Init, register RPC RDMA transport\n"); dprintk("Defaults:\n"); diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c index 52df265b472a..891c4ede2c20 100644 --- a/net/sunrpc/xprtrdma/verbs.c +++ b/net/sunrpc/xprtrdma/verbs.c @@ -80,7 +80,6 @@ static void rpcrdma_run_tasklet(unsigned long data) { struct rpcrdma_rep *rep; - void (*func)(struct rpcrdma_rep *); unsigned long flags; data = data; @@ -89,14 +88,9 @@ rpcrdma_run_tasklet(unsigned long data) rep = list_entry(rpcrdma_tasklets_g.next, struct rpcrdma_rep, rr_list); list_del(&rep->rr_list); - func = rep->rr_func; - rep->rr_func = NULL; spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags); - if (func) - func(rep); - else - rpcrdma_recv_buffer_put(rep); + rpcrdma_reply_handler(rep); spin_lock_irqsave(&rpcrdma_tk_lock_g, flags); } @@ -236,7 +230,7 @@ rpcrdma_recvcq_process_wc(struct ib_wc *wc, struct list_head *sched_list) __func__, rep, wc->byte_len); rep->rr_len = wc->byte_len; - ib_dma_sync_single_for_cpu(rdmab_to_ia(rep->rr_buffer)->ri_id->device, + ib_dma_sync_single_for_cpu(rep->rr_device, rdmab_addr(rep->rr_rdmabuf), rep->rr_len, DMA_FROM_DEVICE); prefetch(rdmab_to_msg(rep->rr_rdmabuf)); @@ -407,7 +401,7 @@ connected: pr_info("rpcrdma: connection to %pIS:%u on %s, memreg '%s', %d credits, %d responders%s\n", sap, rpc_get_port(sap), - ia->ri_id->device->name, + ia->ri_device->name, ia->ri_ops->ro_displayname, xprt->rx_buf.rb_max_requests, ird, ird < 4 && ird < tird / 2 ? " (low!)" : ""); @@ -508,8 +502,9 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg) rc = PTR_ERR(ia->ri_id); goto out1; } + ia->ri_device = ia->ri_id->device; - ia->ri_pd = ib_alloc_pd(ia->ri_id->device); + ia->ri_pd = ib_alloc_pd(ia->ri_device); if (IS_ERR(ia->ri_pd)) { rc = PTR_ERR(ia->ri_pd); dprintk("RPC: %s: ib_alloc_pd() failed %i\n", @@ -517,7 +512,7 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg) goto out2; } - rc = ib_query_device(ia->ri_id->device, devattr); + rc = ib_query_device(ia->ri_device, devattr); if (rc) { dprintk("RPC: %s: ib_query_device failed %d\n", __func__, rc); @@ -526,7 +521,7 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg) if (devattr->device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY) { ia->ri_have_dma_lkey = 1; - ia->ri_dma_lkey = ia->ri_id->device->local_dma_lkey; + ia->ri_dma_lkey = ia->ri_device->local_dma_lkey; } if (memreg == RPCRDMA_FRMR) { @@ -541,7 +536,7 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg) } } if (memreg == RPCRDMA_MTHCAFMR) { - if (!ia->ri_id->device->alloc_fmr) { + if (!ia->ri_device->alloc_fmr) { dprintk("RPC: %s: MTHCAFMR registration " "not supported by HCA\n", __func__); memreg = RPCRDMA_ALLPHYSICAL; @@ -590,9 +585,6 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg) dprintk("RPC: %s: memory registration strategy is '%s'\n", __func__, ia->ri_ops->ro_displayname); - /* Else will do memory reg/dereg for each chunk */ - ia->ri_memreg_strategy = memreg; - rwlock_init(&ia->ri_qplock); return 0; @@ -622,17 +614,17 @@ rpcrdma_ia_close(struct rpcrdma_ia *ia) dprintk("RPC: %s: ib_dereg_mr returned %i\n", __func__, rc); } + if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) { if (ia->ri_id->qp) rdma_destroy_qp(ia->ri_id); rdma_destroy_id(ia->ri_id); ia->ri_id = NULL; } - if (ia->ri_pd != NULL && !IS_ERR(ia->ri_pd)) { - rc = ib_dealloc_pd(ia->ri_pd); - dprintk("RPC: %s: ib_dealloc_pd returned %i\n", - __func__, rc); - } + + /* If the pd is still busy, xprtrdma missed freeing a resource */ + if (ia->ri_pd && !IS_ERR(ia->ri_pd)) + WARN_ON(ib_dealloc_pd(ia->ri_pd)); } /* @@ -693,8 +685,8 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia, INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker); cq_attr.cqe = ep->rep_attr.cap.max_send_wr + 1; - sendcq = ib_create_cq(ia->ri_id->device, rpcrdma_sendcq_upcall, - rpcrdma_cq_async_error_upcall, ep, &cq_attr); + sendcq = ib_create_cq(ia->ri_device, rpcrdma_sendcq_upcall, + rpcrdma_cq_async_error_upcall, ep, &cq_attr); if (IS_ERR(sendcq)) { rc = PTR_ERR(sendcq); dprintk("RPC: %s: failed to create send CQ: %i\n", @@ -710,8 +702,8 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia, } cq_attr.cqe = ep->rep_attr.cap.max_recv_wr + 1; - recvcq = ib_create_cq(ia->ri_id->device, rpcrdma_recvcq_upcall, - rpcrdma_cq_async_error_upcall, ep, &cq_attr); + recvcq = ib_create_cq(ia->ri_device, rpcrdma_recvcq_upcall, + rpcrdma_cq_async_error_upcall, ep, &cq_attr); if (IS_ERR(recvcq)) { rc = PTR_ERR(recvcq); dprintk("RPC: %s: failed to create recv CQ: %i\n", @@ -817,8 +809,6 @@ retry: rpcrdma_flush_cqs(ep); xprt = container_of(ia, struct rpcrdma_xprt, rx_ia); - ia->ri_ops->ro_reset(xprt); - id = rpcrdma_create_id(xprt, ia, (struct sockaddr *)&xprt->rx_data.addr); if (IS_ERR(id)) { @@ -832,7 +822,7 @@ retry: * More stuff I haven't thought of! * Rrrgh! */ - if (ia->ri_id->device != id->device) { + if (ia->ri_device != id->device) { printk("RPC: %s: can't reconnect on " "different device!\n", __func__); rdma_destroy_id(id); @@ -974,7 +964,8 @@ rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt) goto out_free; } - rep->rr_buffer = &r_xprt->rx_buf; + rep->rr_device = ia->ri_device; + rep->rr_rxprt = r_xprt; return rep; out_free: @@ -1098,31 +1089,33 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf) kfree(buf->rb_pool); } -/* "*mw" can be NULL when rpcrdma_buffer_get_mrs() fails, leaving - * some req segments uninitialized. - */ -static void -rpcrdma_buffer_put_mr(struct rpcrdma_mw **mw, struct rpcrdma_buffer *buf) +struct rpcrdma_mw * +rpcrdma_get_mw(struct rpcrdma_xprt *r_xprt) { - if (*mw) { - list_add_tail(&(*mw)->mw_list, &buf->rb_mws); - *mw = NULL; + struct rpcrdma_buffer *buf = &r_xprt->rx_buf; + struct rpcrdma_mw *mw = NULL; + + spin_lock(&buf->rb_mwlock); + if (!list_empty(&buf->rb_mws)) { + mw = list_first_entry(&buf->rb_mws, + struct rpcrdma_mw, mw_list); + list_del_init(&mw->mw_list); } + spin_unlock(&buf->rb_mwlock); + + if (!mw) + pr_err("RPC: %s: no MWs available\n", __func__); + return mw; } -/* Cycle mw's back in reverse order, and "spin" them. - * This delays and scrambles reuse as much as possible. - */ -static void -rpcrdma_buffer_put_mrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf) +void +rpcrdma_put_mw(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mw *mw) { - struct rpcrdma_mr_seg *seg = req->rl_segments; - struct rpcrdma_mr_seg *seg1 = seg; - int i; + struct rpcrdma_buffer *buf = &r_xprt->rx_buf; - for (i = 1, seg++; i < RPCRDMA_MAX_SEGS; seg++, i++) - rpcrdma_buffer_put_mr(&seg->rl_mw, buf); - rpcrdma_buffer_put_mr(&seg1->rl_mw, buf); + spin_lock(&buf->rb_mwlock); + list_add_tail(&mw->mw_list, &buf->rb_mws); + spin_unlock(&buf->rb_mwlock); } static void @@ -1132,115 +1125,10 @@ rpcrdma_buffer_put_sendbuf(struct rpcrdma_req *req, struct rpcrdma_buffer *buf) req->rl_niovs = 0; if (req->rl_reply) { buf->rb_recv_bufs[--buf->rb_recv_index] = req->rl_reply; - req->rl_reply->rr_func = NULL; req->rl_reply = NULL; } } -/* rpcrdma_unmap_one() was already done during deregistration. - * Redo only the ib_post_send(). - */ -static void -rpcrdma_retry_local_inv(struct rpcrdma_mw *r, struct rpcrdma_ia *ia) -{ - struct rpcrdma_xprt *r_xprt = - container_of(ia, struct rpcrdma_xprt, rx_ia); - struct ib_send_wr invalidate_wr, *bad_wr; - int rc; - - dprintk("RPC: %s: FRMR %p is stale\n", __func__, r); - - /* When this FRMR is re-inserted into rb_mws, it is no longer stale */ - r->r.frmr.fr_state = FRMR_IS_INVALID; - - memset(&invalidate_wr, 0, sizeof(invalidate_wr)); - invalidate_wr.wr_id = (unsigned long)(void *)r; - invalidate_wr.opcode = IB_WR_LOCAL_INV; - invalidate_wr.ex.invalidate_rkey = r->r.frmr.fr_mr->rkey; - DECR_CQCOUNT(&r_xprt->rx_ep); - - dprintk("RPC: %s: frmr %p invalidating rkey %08x\n", - __func__, r, r->r.frmr.fr_mr->rkey); - - read_lock(&ia->ri_qplock); - rc = ib_post_send(ia->ri_id->qp, &invalidate_wr, &bad_wr); - read_unlock(&ia->ri_qplock); - if (rc) { - /* Force rpcrdma_buffer_get() to retry */ - r->r.frmr.fr_state = FRMR_IS_STALE; - dprintk("RPC: %s: ib_post_send failed, %i\n", - __func__, rc); - } -} - -static void -rpcrdma_retry_flushed_linv(struct list_head *stale, - struct rpcrdma_buffer *buf) -{ - struct rpcrdma_ia *ia = rdmab_to_ia(buf); - struct list_head *pos; - struct rpcrdma_mw *r; - unsigned long flags; - - list_for_each(pos, stale) { - r = list_entry(pos, struct rpcrdma_mw, mw_list); - rpcrdma_retry_local_inv(r, ia); - } - - spin_lock_irqsave(&buf->rb_lock, flags); - list_splice_tail(stale, &buf->rb_mws); - spin_unlock_irqrestore(&buf->rb_lock, flags); -} - -static struct rpcrdma_req * -rpcrdma_buffer_get_frmrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf, - struct list_head *stale) -{ - struct rpcrdma_mw *r; - int i; - - i = RPCRDMA_MAX_SEGS - 1; - while (!list_empty(&buf->rb_mws)) { - r = list_entry(buf->rb_mws.next, - struct rpcrdma_mw, mw_list); - list_del(&r->mw_list); - if (r->r.frmr.fr_state == FRMR_IS_STALE) { - list_add(&r->mw_list, stale); - continue; - } - req->rl_segments[i].rl_mw = r; - if (unlikely(i-- == 0)) - return req; /* Success */ - } - - /* Not enough entries on rb_mws for this req */ - rpcrdma_buffer_put_sendbuf(req, buf); - rpcrdma_buffer_put_mrs(req, buf); - return NULL; -} - -static struct rpcrdma_req * -rpcrdma_buffer_get_fmrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf) -{ - struct rpcrdma_mw *r; - int i; - - i = RPCRDMA_MAX_SEGS - 1; - while (!list_empty(&buf->rb_mws)) { - r = list_entry(buf->rb_mws.next, - struct rpcrdma_mw, mw_list); - list_del(&r->mw_list); - req->rl_segments[i].rl_mw = r; - if (unlikely(i-- == 0)) - return req; /* Success */ - } - - /* Not enough entries on rb_mws for this req */ - rpcrdma_buffer_put_sendbuf(req, buf); - rpcrdma_buffer_put_mrs(req, buf); - return NULL; -} - /* * Get a set of request/reply buffers. * @@ -1253,12 +1141,11 @@ rpcrdma_buffer_get_fmrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf) struct rpcrdma_req * rpcrdma_buffer_get(struct rpcrdma_buffer *buffers) { - struct rpcrdma_ia *ia = rdmab_to_ia(buffers); - struct list_head stale; struct rpcrdma_req *req; unsigned long flags; spin_lock_irqsave(&buffers->rb_lock, flags); + if (buffers->rb_send_index == buffers->rb_max_requests) { spin_unlock_irqrestore(&buffers->rb_lock, flags); dprintk("RPC: %s: out of request buffers\n", __func__); @@ -1277,20 +1164,7 @@ rpcrdma_buffer_get(struct rpcrdma_buffer *buffers) } buffers->rb_send_bufs[buffers->rb_send_index++] = NULL; - INIT_LIST_HEAD(&stale); - switch (ia->ri_memreg_strategy) { - case RPCRDMA_FRMR: - req = rpcrdma_buffer_get_frmrs(req, buffers, &stale); - break; - case RPCRDMA_MTHCAFMR: - req = rpcrdma_buffer_get_fmrs(req, buffers); - break; - default: - break; - } spin_unlock_irqrestore(&buffers->rb_lock, flags); - if (!list_empty(&stale)) - rpcrdma_retry_flushed_linv(&stale, buffers); return req; } @@ -1302,19 +1176,10 @@ void rpcrdma_buffer_put(struct rpcrdma_req *req) { struct rpcrdma_buffer *buffers = req->rl_buffer; - struct rpcrdma_ia *ia = rdmab_to_ia(buffers); unsigned long flags; spin_lock_irqsave(&buffers->rb_lock, flags); rpcrdma_buffer_put_sendbuf(req, buffers); - switch (ia->ri_memreg_strategy) { - case RPCRDMA_FRMR: - case RPCRDMA_MTHCAFMR: - rpcrdma_buffer_put_mrs(req, buffers); - break; - default: - break; - } spin_unlock_irqrestore(&buffers->rb_lock, flags); } @@ -1344,10 +1209,9 @@ rpcrdma_recv_buffer_get(struct rpcrdma_req *req) void rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep) { - struct rpcrdma_buffer *buffers = rep->rr_buffer; + struct rpcrdma_buffer *buffers = &rep->rr_rxprt->rx_buf; unsigned long flags; - rep->rr_func = NULL; spin_lock_irqsave(&buffers->rb_lock, flags); buffers->rb_recv_bufs[--buffers->rb_recv_index] = rep; spin_unlock_irqrestore(&buffers->rb_lock, flags); @@ -1376,9 +1240,9 @@ rpcrdma_register_internal(struct rpcrdma_ia *ia, void *va, int len, /* * All memory passed here was kmalloc'ed, therefore phys-contiguous. */ - iov->addr = ib_dma_map_single(ia->ri_id->device, + iov->addr = ib_dma_map_single(ia->ri_device, va, len, DMA_BIDIRECTIONAL); - if (ib_dma_mapping_error(ia->ri_id->device, iov->addr)) + if (ib_dma_mapping_error(ia->ri_device, iov->addr)) return -ENOMEM; iov->length = len; @@ -1422,8 +1286,8 @@ rpcrdma_deregister_internal(struct rpcrdma_ia *ia, { int rc; - ib_dma_unmap_single(ia->ri_id->device, - iov->addr, iov->length, DMA_BIDIRECTIONAL); + ib_dma_unmap_single(ia->ri_device, + iov->addr, iov->length, DMA_BIDIRECTIONAL); if (NULL == mr) return 0; @@ -1516,15 +1380,18 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia, send_wr.num_sge = req->rl_niovs; send_wr.opcode = IB_WR_SEND; if (send_wr.num_sge == 4) /* no need to sync any pad (constant) */ - ib_dma_sync_single_for_device(ia->ri_id->device, - req->rl_send_iov[3].addr, req->rl_send_iov[3].length, - DMA_TO_DEVICE); - ib_dma_sync_single_for_device(ia->ri_id->device, - req->rl_send_iov[1].addr, req->rl_send_iov[1].length, - DMA_TO_DEVICE); - ib_dma_sync_single_for_device(ia->ri_id->device, - req->rl_send_iov[0].addr, req->rl_send_iov[0].length, - DMA_TO_DEVICE); + ib_dma_sync_single_for_device(ia->ri_device, + req->rl_send_iov[3].addr, + req->rl_send_iov[3].length, + DMA_TO_DEVICE); + ib_dma_sync_single_for_device(ia->ri_device, + req->rl_send_iov[1].addr, + req->rl_send_iov[1].length, + DMA_TO_DEVICE); + ib_dma_sync_single_for_device(ia->ri_device, + req->rl_send_iov[0].addr, + req->rl_send_iov[0].length, + DMA_TO_DEVICE); if (DECR_CQCOUNT(ep) > 0) send_wr.send_flags = 0; @@ -1557,7 +1424,7 @@ rpcrdma_ep_post_recv(struct rpcrdma_ia *ia, recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov; recv_wr.num_sge = 1; - ib_dma_sync_single_for_cpu(ia->ri_id->device, + ib_dma_sync_single_for_cpu(ia->ri_device, rdmab_addr(rep->rr_rdmabuf), rdmab_length(rep->rr_rdmabuf), DMA_BIDIRECTIONAL); diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h index 58163b88738c..f49dd8b38122 100644 --- a/net/sunrpc/xprtrdma/xprt_rdma.h +++ b/net/sunrpc/xprtrdma/xprt_rdma.h @@ -62,6 +62,7 @@ struct rpcrdma_ia { const struct rpcrdma_memreg_ops *ri_ops; rwlock_t ri_qplock; + struct ib_device *ri_device; struct rdma_cm_id *ri_id; struct ib_pd *ri_pd; struct ib_mr *ri_bind_mem; @@ -69,7 +70,6 @@ struct rpcrdma_ia { int ri_have_dma_lkey; struct completion ri_done; int ri_async_rc; - enum rpcrdma_memreg ri_memreg_strategy; unsigned int ri_max_frmr_depth; struct ib_device_attr ri_devattr; struct ib_qp_attr ri_qp_attr; @@ -173,9 +173,8 @@ struct rpcrdma_buffer; struct rpcrdma_rep { unsigned int rr_len; - struct rpcrdma_buffer *rr_buffer; - struct rpc_xprt *rr_xprt; - void (*rr_func)(struct rpcrdma_rep *); + struct ib_device *rr_device; + struct rpcrdma_xprt *rr_rxprt; struct list_head rr_list; struct rpcrdma_regbuf *rr_rdmabuf; }; @@ -203,11 +202,18 @@ struct rpcrdma_frmr { struct ib_fast_reg_page_list *fr_pgl; struct ib_mr *fr_mr; enum rpcrdma_frmr_state fr_state; + struct work_struct fr_work; + struct rpcrdma_xprt *fr_xprt; +}; + +struct rpcrdma_fmr { + struct ib_fmr *fmr; + u64 *physaddrs; }; struct rpcrdma_mw { union { - struct ib_fmr *fmr; + struct rpcrdma_fmr fmr; struct rpcrdma_frmr frmr; } r; void (*mw_sendcompletion)(struct ib_wc *); @@ -281,15 +287,17 @@ rpcr_to_rdmar(struct rpc_rqst *rqst) * One of these is associated with a transport instance */ struct rpcrdma_buffer { - spinlock_t rb_lock; /* protects indexes */ - u32 rb_max_requests;/* client max requests */ - struct list_head rb_mws; /* optional memory windows/fmrs/frmrs */ - struct list_head rb_all; - int rb_send_index; + spinlock_t rb_mwlock; /* protect rb_mws list */ + struct list_head rb_mws; + struct list_head rb_all; + char *rb_pool; + + spinlock_t rb_lock; /* protect buf arrays */ + u32 rb_max_requests; + int rb_send_index; + int rb_recv_index; struct rpcrdma_req **rb_send_bufs; - int rb_recv_index; struct rpcrdma_rep **rb_recv_bufs; - char *rb_pool; }; #define rdmab_to_ia(b) (&container_of((b), struct rpcrdma_xprt, rx_buf)->rx_ia) @@ -350,7 +358,6 @@ struct rpcrdma_memreg_ops { struct rpcrdma_create_data_internal *); size_t (*ro_maxpages)(struct rpcrdma_xprt *); int (*ro_init)(struct rpcrdma_xprt *); - void (*ro_reset)(struct rpcrdma_xprt *); void (*ro_destroy)(struct rpcrdma_buffer *); const char *ro_displayname; }; @@ -413,6 +420,8 @@ int rpcrdma_ep_post_recv(struct rpcrdma_ia *, struct rpcrdma_ep *, int rpcrdma_buffer_create(struct rpcrdma_xprt *); void rpcrdma_buffer_destroy(struct rpcrdma_buffer *); +struct rpcrdma_mw *rpcrdma_get_mw(struct rpcrdma_xprt *); +void rpcrdma_put_mw(struct rpcrdma_xprt *, struct rpcrdma_mw *); struct rpcrdma_req *rpcrdma_buffer_get(struct rpcrdma_buffer *); void rpcrdma_buffer_put(struct rpcrdma_req *); void rpcrdma_recv_buffer_get(struct rpcrdma_req *); @@ -425,6 +434,9 @@ void rpcrdma_free_regbuf(struct rpcrdma_ia *, unsigned int rpcrdma_max_segments(struct rpcrdma_xprt *); +int frwr_alloc_recovery_wq(void); +void frwr_destroy_recovery_wq(void); + /* * Wrappers for chunk registration, shared by read/write chunk code. */ diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index b0517287075b..e193c2b5476b 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -623,24 +623,6 @@ process_status: } /** - * xs_tcp_shutdown - gracefully shut down a TCP socket - * @xprt: transport - * - * Initiates a graceful shutdown of the TCP socket by calling the - * equivalent of shutdown(SHUT_RDWR); - */ -static void xs_tcp_shutdown(struct rpc_xprt *xprt) -{ - struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); - struct socket *sock = transport->sock; - - if (sock != NULL) { - kernel_sock_shutdown(sock, SHUT_RDWR); - trace_rpc_socket_shutdown(xprt, sock); - } -} - -/** * xs_tcp_send_request - write an RPC request to a TCP socket * @task: address of RPC task that manages the state of an RPC request * @@ -786,6 +768,7 @@ static void xs_sock_mark_closed(struct rpc_xprt *xprt) xs_sock_reset_connection_flags(xprt); /* Mark transport as closed and wake up all pending tasks */ xprt_disconnect_done(xprt); + xprt_force_disconnect(xprt); } /** @@ -827,6 +810,9 @@ static void xs_reset_transport(struct sock_xprt *transport) if (sk == NULL) return; + if (atomic_read(&transport->xprt.swapper)) + sk_clear_memalloc(sk); + write_lock_bh(&sk->sk_callback_lock); transport->inet = NULL; transport->sock = NULL; @@ -863,6 +849,13 @@ static void xs_close(struct rpc_xprt *xprt) xprt_disconnect_done(xprt); } +static void xs_inject_disconnect(struct rpc_xprt *xprt) +{ + dprintk("RPC: injecting transport disconnect on xprt=%p\n", + xprt); + xprt_disconnect_done(xprt); +} + static void xs_xprt_free(struct rpc_xprt *xprt) { xs_free_peer_addresses(xprt); @@ -901,7 +894,6 @@ static int xs_local_copy_to_xdr(struct xdr_buf *xdr, struct sk_buff *skb) /** * xs_local_data_ready - "data ready" callback for AF_LOCAL sockets * @sk: socket with data to read - * @len: how much data to read * * Currently this assumes we can read the whole reply in a single gulp. */ @@ -965,7 +957,6 @@ static void xs_local_data_ready(struct sock *sk) /** * xs_udp_data_ready - "data ready" callback for UDP sockets * @sk: socket with data to read - * @len: how much data to read * */ static void xs_udp_data_ready(struct sock *sk) @@ -1389,7 +1380,6 @@ static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, uns /** * xs_tcp_data_ready - "data ready" callback for TCP sockets * @sk: socket with data to read - * @bytes: how much data to read * */ static void xs_tcp_data_ready(struct sock *sk) @@ -1886,9 +1876,7 @@ static int xs_local_finish_connecting(struct rpc_xprt *xprt, /** * xs_local_setup_socket - create AF_LOCAL socket, connect to a local endpoint - * @xprt: RPC transport to connect * @transport: socket transport to connect - * @create_sock: function to create a socket of the correct type */ static int xs_local_setup_socket(struct sock_xprt *transport) { @@ -1960,43 +1948,84 @@ static void xs_local_connect(struct rpc_xprt *xprt, struct rpc_task *task) msleep_interruptible(15000); } -#ifdef CONFIG_SUNRPC_SWAP +#if IS_ENABLED(CONFIG_SUNRPC_SWAP) +/* + * Note that this should be called with XPRT_LOCKED held (or when we otherwise + * know that we have exclusive access to the socket), to guard against + * races with xs_reset_transport. + */ static void xs_set_memalloc(struct rpc_xprt *xprt) { struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); - if (xprt->swapper) + /* + * If there's no sock, then we have nothing to set. The + * reconnecting process will get it for us. + */ + if (!transport->inet) + return; + if (atomic_read(&xprt->swapper)) sk_set_memalloc(transport->inet); } /** - * xs_swapper - Tag this transport as being used for swap. + * xs_enable_swap - Tag this transport as being used for swap. * @xprt: transport to tag - * @enable: enable/disable * + * Take a reference to this transport on behalf of the rpc_clnt, and + * optionally mark it for swapping if it wasn't already. */ -int xs_swapper(struct rpc_xprt *xprt, int enable) +static int +xs_enable_swap(struct rpc_xprt *xprt) { - struct sock_xprt *transport = container_of(xprt, struct sock_xprt, - xprt); - int err = 0; + struct sock_xprt *xs = container_of(xprt, struct sock_xprt, xprt); - if (enable) { - xprt->swapper++; - xs_set_memalloc(xprt); - } else if (xprt->swapper) { - xprt->swapper--; - sk_clear_memalloc(transport->inet); - } + if (atomic_inc_return(&xprt->swapper) != 1) + return 0; + if (wait_on_bit_lock(&xprt->state, XPRT_LOCKED, TASK_KILLABLE)) + return -ERESTARTSYS; + if (xs->inet) + sk_set_memalloc(xs->inet); + xprt_release_xprt(xprt, NULL); + return 0; +} - return err; +/** + * xs_disable_swap - Untag this transport as being used for swap. + * @xprt: transport to tag + * + * Drop a "swapper" reference to this xprt on behalf of the rpc_clnt. If the + * swapper refcount goes to 0, untag the socket as a memalloc socket. + */ +static void +xs_disable_swap(struct rpc_xprt *xprt) +{ + struct sock_xprt *xs = container_of(xprt, struct sock_xprt, xprt); + + if (!atomic_dec_and_test(&xprt->swapper)) + return; + if (wait_on_bit_lock(&xprt->state, XPRT_LOCKED, TASK_KILLABLE)) + return; + if (xs->inet) + sk_clear_memalloc(xs->inet); + xprt_release_xprt(xprt, NULL); } -EXPORT_SYMBOL_GPL(xs_swapper); #else static void xs_set_memalloc(struct rpc_xprt *xprt) { } + +static int +xs_enable_swap(struct rpc_xprt *xprt) +{ + return -EINVAL; +} + +static void +xs_disable_swap(struct rpc_xprt *xprt) +{ +} #endif static void xs_udp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock) @@ -2057,6 +2086,27 @@ out: xprt_wake_pending_tasks(xprt, status); } +/** + * xs_tcp_shutdown - gracefully shut down a TCP socket + * @xprt: transport + * + * Initiates a graceful shutdown of the TCP socket by calling the + * equivalent of shutdown(SHUT_RDWR); + */ +static void xs_tcp_shutdown(struct rpc_xprt *xprt) +{ + struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); + struct socket *sock = transport->sock; + + if (sock == NULL) + return; + if (xprt_connected(xprt)) { + kernel_sock_shutdown(sock, SHUT_RDWR); + trace_rpc_socket_shutdown(xprt, sock); + } else + xs_reset_transport(transport); +} + static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock) { struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); @@ -2067,6 +2117,7 @@ static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock) unsigned int keepidle = xprt->timeout->to_initval / HZ; unsigned int keepcnt = xprt->timeout->to_retries + 1; unsigned int opt_on = 1; + unsigned int timeo; /* TCP Keepalive options */ kernel_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, @@ -2078,6 +2129,12 @@ static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock) kernel_setsockopt(sock, SOL_TCP, TCP_KEEPCNT, (char *)&keepcnt, sizeof(keepcnt)); + /* TCP user timeout (see RFC5482) */ + timeo = jiffies_to_msecs(xprt->timeout->to_initval) * + (xprt->timeout->to_retries + 1); + kernel_setsockopt(sock, SOL_TCP, TCP_USER_TIMEOUT, + (char *)&timeo, sizeof(timeo)); + write_lock_bh(&sk->sk_callback_lock); xs_save_old_callbacks(transport, sk); @@ -2125,9 +2182,6 @@ out: /** * xs_tcp_setup_socket - create a TCP socket and connect to a remote endpoint - * @xprt: RPC transport to connect - * @transport: socket transport to connect - * @create_sock: function to create a socket of the correct type * * Invoked by a work queue tasklet. */ @@ -2463,6 +2517,8 @@ static struct rpc_xprt_ops xs_local_ops = { .close = xs_close, .destroy = xs_destroy, .print_stats = xs_local_print_stats, + .enable_swap = xs_enable_swap, + .disable_swap = xs_disable_swap, }; static struct rpc_xprt_ops xs_udp_ops = { @@ -2482,6 +2538,9 @@ static struct rpc_xprt_ops xs_udp_ops = { .close = xs_close, .destroy = xs_destroy, .print_stats = xs_udp_print_stats, + .enable_swap = xs_enable_swap, + .disable_swap = xs_disable_swap, + .inject_disconnect = xs_inject_disconnect, }; static struct rpc_xprt_ops xs_tcp_ops = { @@ -2498,6 +2557,9 @@ static struct rpc_xprt_ops xs_tcp_ops = { .close = xs_tcp_shutdown, .destroy = xs_destroy, .print_stats = xs_tcp_print_stats, + .enable_swap = xs_enable_swap, + .disable_swap = xs_disable_swap, + .inject_disconnect = xs_inject_disconnect, }; /* @@ -2515,6 +2577,9 @@ static struct rpc_xprt_ops bc_tcp_ops = { .close = bc_close, .destroy = bc_destroy, .print_stats = xs_tcp_print_stats, + .enable_swap = xs_enable_swap, + .disable_swap = xs_disable_swap, + .inject_disconnect = xs_inject_disconnect, }; static int xs_init_anyaddr(const int family, struct sockaddr *sap) |