summaryrefslogtreecommitdiff
path: root/net/rds
diff options
context:
space:
mode:
Diffstat (limited to 'net/rds')
-rw-r--r--net/rds/af_rds.c14
-rw-r--r--net/rds/connection.c7
-rw-r--r--net/rds/ib.c3
-rw-r--r--net/rds/message.c163
-rw-r--r--net/rds/rds.h31
-rw-r--r--net/rds/recv.c42
-rw-r--r--net/rds/send.c54
-rw-r--r--net/rds/tcp.c115
8 files changed, 319 insertions, 110 deletions
diff --git a/net/rds/af_rds.c b/net/rds/af_rds.c
index 744c637c86b0..ab751a150f70 100644
--- a/net/rds/af_rds.c
+++ b/net/rds/af_rds.c
@@ -77,6 +77,7 @@ static int rds_release(struct socket *sock)
rds_send_drop_to(rs, NULL);
rds_rdma_drop_keys(rs);
rds_notify_queue_get(rs, NULL);
+ rds_notify_msg_zcopy_purge(&rs->rs_zcookie_queue);
spin_lock_bh(&rds_sock_lock);
list_del_init(&rs->rs_item);
@@ -110,7 +111,7 @@ void rds_wake_sk_sleep(struct rds_sock *rs)
}
static int rds_getname(struct socket *sock, struct sockaddr *uaddr,
- int *uaddr_len, int peer)
+ int peer)
{
struct sockaddr_in *sin = (struct sockaddr_in *)uaddr;
struct rds_sock *rs = rds_sk_to_rs(sock->sk);
@@ -131,8 +132,7 @@ static int rds_getname(struct socket *sock, struct sockaddr *uaddr,
sin->sin_family = AF_INET;
- *uaddr_len = sizeof(*sin);
- return 0;
+ return sizeof(*sin);
}
/*
@@ -145,7 +145,7 @@ static int rds_getname(struct socket *sock, struct sockaddr *uaddr,
* - to signal that a previously congested destination may have become
* uncongested
* - A notification has been queued to the socket (this can be a congestion
- * update, or a RDMA completion).
+ * update, or a RDMA completion, or a MSG_ZEROCOPY completion).
*
* EPOLLOUT is asserted if there is room on the send queue. This does not mean
* however, that the next sendmsg() call will succeed. If the application tries
@@ -179,10 +179,13 @@ static __poll_t rds_poll(struct file *file, struct socket *sock,
spin_unlock(&rs->rs_lock);
}
if (!list_empty(&rs->rs_recv_queue) ||
- !list_empty(&rs->rs_notify_queue))
+ !list_empty(&rs->rs_notify_queue) ||
+ !list_empty(&rs->rs_zcookie_queue.zcookie_head))
mask |= (EPOLLIN | EPOLLRDNORM);
if (rs->rs_snd_bytes < rds_sk_sndbuf(rs))
mask |= (EPOLLOUT | EPOLLWRNORM);
+ if (sk->sk_err || !skb_queue_empty(&sk->sk_error_queue))
+ mask |= POLLERR;
read_unlock_irqrestore(&rs->rs_recv_lock, flags);
/* clear state any time we wake a seen-congested socket */
@@ -512,6 +515,7 @@ static int __rds_create(struct socket *sock, struct sock *sk, int protocol)
INIT_LIST_HEAD(&rs->rs_recv_queue);
INIT_LIST_HEAD(&rs->rs_notify_queue);
INIT_LIST_HEAD(&rs->rs_cong_list);
+ rds_message_zcopy_queue_init(&rs->rs_zcookie_queue);
spin_lock_init(&rs->rs_rdma_lock);
rs->rs_rdma_keys = RB_ROOT;
rs->rs_rx_traces = 0;
diff --git a/net/rds/connection.c b/net/rds/connection.c
index 2da3176bf792..abef75da89a7 100644
--- a/net/rds/connection.c
+++ b/net/rds/connection.c
@@ -540,9 +540,9 @@ void rds_for_each_conn_info(struct socket *sock, unsigned int len,
struct rds_info_iterator *iter,
struct rds_info_lengths *lens,
int (*visitor)(struct rds_connection *, void *),
+ u64 *buffer,
size_t item_len)
{
- uint64_t buffer[(item_len + 7) / 8];
struct hlist_head *head;
struct rds_connection *conn;
size_t i;
@@ -578,9 +578,9 @@ static void rds_walk_conn_path_info(struct socket *sock, unsigned int len,
struct rds_info_iterator *iter,
struct rds_info_lengths *lens,
int (*visitor)(struct rds_conn_path *, void *),
+ u64 *buffer,
size_t item_len)
{
- u64 buffer[(item_len + 7) / 8];
struct hlist_head *head;
struct rds_connection *conn;
size_t i;
@@ -649,8 +649,11 @@ static void rds_conn_info(struct socket *sock, unsigned int len,
struct rds_info_iterator *iter,
struct rds_info_lengths *lens)
{
+ u64 buffer[(sizeof(struct rds_info_connection) + 7) / 8];
+
rds_walk_conn_path_info(sock, len, iter, lens,
rds_conn_info_visitor,
+ buffer,
sizeof(struct rds_info_connection));
}
diff --git a/net/rds/ib.c b/net/rds/ib.c
index 50a88f3e7e39..02deee29e7f1 100644
--- a/net/rds/ib.c
+++ b/net/rds/ib.c
@@ -321,8 +321,11 @@ static void rds_ib_ic_info(struct socket *sock, unsigned int len,
struct rds_info_iterator *iter,
struct rds_info_lengths *lens)
{
+ u64 buffer[(sizeof(struct rds_info_rdma_connection) + 7) / 8];
+
rds_for_each_conn_info(sock, len, iter, lens,
rds_ib_conn_info_visitor,
+ buffer,
sizeof(struct rds_info_rdma_connection));
}
diff --git a/net/rds/message.c b/net/rds/message.c
index 4318cc9b78f7..a35f76971984 100644
--- a/net/rds/message.c
+++ b/net/rds/message.c
@@ -33,6 +33,9 @@
#include <linux/kernel.h>
#include <linux/slab.h>
#include <linux/export.h>
+#include <linux/skbuff.h>
+#include <linux/list.h>
+#include <linux/errqueue.h>
#include "rds.h"
@@ -45,7 +48,6 @@ static unsigned int rds_exthdr_size[__RDS_EXTHDR_MAX] = {
[RDS_EXTHDR_GEN_NUM] = sizeof(u32),
};
-
void rds_message_addref(struct rds_message *rm)
{
rdsdebug("addref rm %p ref %d\n", rm, refcount_read(&rm->m_refcount));
@@ -53,20 +55,107 @@ void rds_message_addref(struct rds_message *rm)
}
EXPORT_SYMBOL_GPL(rds_message_addref);
+static inline bool rds_zcookie_add(struct rds_msg_zcopy_info *info, u32 cookie)
+{
+ struct rds_zcopy_cookies *ck = &info->zcookies;
+ int ncookies = ck->num;
+
+ if (ncookies == RDS_MAX_ZCOOKIES)
+ return false;
+ ck->cookies[ncookies] = cookie;
+ ck->num = ++ncookies;
+ return true;
+}
+
+static struct rds_msg_zcopy_info *rds_info_from_znotifier(struct rds_znotifier *znotif)
+{
+ return container_of(znotif, struct rds_msg_zcopy_info, znotif);
+}
+
+void rds_notify_msg_zcopy_purge(struct rds_msg_zcopy_queue *q)
+{
+ unsigned long flags;
+ LIST_HEAD(copy);
+ struct rds_msg_zcopy_info *info, *tmp;
+
+ spin_lock_irqsave(&q->lock, flags);
+ list_splice(&q->zcookie_head, &copy);
+ INIT_LIST_HEAD(&q->zcookie_head);
+ spin_unlock_irqrestore(&q->lock, flags);
+
+ list_for_each_entry_safe(info, tmp, &copy, rs_zcookie_next) {
+ list_del(&info->rs_zcookie_next);
+ kfree(info);
+ }
+}
+
+static void rds_rm_zerocopy_callback(struct rds_sock *rs,
+ struct rds_znotifier *znotif)
+{
+ struct rds_msg_zcopy_info *info;
+ struct rds_msg_zcopy_queue *q;
+ u32 cookie = znotif->z_cookie;
+ struct rds_zcopy_cookies *ck;
+ struct list_head *head;
+ unsigned long flags;
+
+ mm_unaccount_pinned_pages(&znotif->z_mmp);
+ q = &rs->rs_zcookie_queue;
+ spin_lock_irqsave(&q->lock, flags);
+ head = &q->zcookie_head;
+ if (!list_empty(head)) {
+ info = list_entry(head, struct rds_msg_zcopy_info,
+ rs_zcookie_next);
+ if (info && rds_zcookie_add(info, cookie)) {
+ spin_unlock_irqrestore(&q->lock, flags);
+ kfree(rds_info_from_znotifier(znotif));
+ /* caller invokes rds_wake_sk_sleep() */
+ return;
+ }
+ }
+
+ info = rds_info_from_znotifier(znotif);
+ ck = &info->zcookies;
+ memset(ck, 0, sizeof(*ck));
+ WARN_ON(!rds_zcookie_add(info, cookie));
+ list_add_tail(&q->zcookie_head, &info->rs_zcookie_next);
+
+ spin_unlock_irqrestore(&q->lock, flags);
+ /* caller invokes rds_wake_sk_sleep() */
+}
+
/*
* This relies on dma_map_sg() not touching sg[].page during merging.
*/
static void rds_message_purge(struct rds_message *rm)
{
- unsigned long i;
+ unsigned long i, flags;
+ bool zcopy = false;
if (unlikely(test_bit(RDS_MSG_PAGEVEC, &rm->m_flags)))
return;
+ spin_lock_irqsave(&rm->m_rs_lock, flags);
+ if (rm->m_rs) {
+ struct rds_sock *rs = rm->m_rs;
+
+ if (rm->data.op_mmp_znotifier) {
+ zcopy = true;
+ rds_rm_zerocopy_callback(rs, rm->data.op_mmp_znotifier);
+ rds_wake_sk_sleep(rs);
+ rm->data.op_mmp_znotifier = NULL;
+ }
+ sock_put(rds_rs_to_sk(rs));
+ rm->m_rs = NULL;
+ }
+ spin_unlock_irqrestore(&rm->m_rs_lock, flags);
+
for (i = 0; i < rm->data.op_nents; i++) {
- rdsdebug("putting data page %p\n", (void *)sg_page(&rm->data.op_sg[i]));
/* XXX will have to put_page for page refs */
- __free_page(sg_page(&rm->data.op_sg[i]));
+ if (!zcopy)
+ __free_page(sg_page(&rm->data.op_sg[i]));
+ else
+ put_page(sg_page(&rm->data.op_sg[i]));
}
rm->data.op_nents = 0;
@@ -266,12 +355,13 @@ struct rds_message *rds_message_map_pages(unsigned long *page_addrs, unsigned in
return rm;
}
-int rds_message_copy_from_user(struct rds_message *rm, struct iov_iter *from)
+static int rds_message_zcopy_from_user(struct rds_message *rm, struct iov_iter *from)
{
- unsigned long to_copy, nbytes;
- unsigned long sg_off;
struct scatterlist *sg;
int ret = 0;
+ int length = iov_iter_count(from);
+ int total_copied = 0;
+ struct rds_msg_zcopy_info *info;
rm->m_inc.i_hdr.h_len = cpu_to_be32(iov_iter_count(from));
@@ -279,8 +369,67 @@ int rds_message_copy_from_user(struct rds_message *rm, struct iov_iter *from)
* now allocate and copy in the data payload.
*/
sg = rm->data.op_sg;
+
+ info = kzalloc(sizeof(*info), GFP_KERNEL);
+ if (!info)
+ return -ENOMEM;
+ INIT_LIST_HEAD(&info->rs_zcookie_next);
+ rm->data.op_mmp_znotifier = &info->znotif;
+ if (mm_account_pinned_pages(&rm->data.op_mmp_znotifier->z_mmp,
+ length)) {
+ ret = -ENOMEM;
+ goto err;
+ }
+ while (iov_iter_count(from)) {
+ struct page *pages;
+ size_t start;
+ ssize_t copied;
+
+ copied = iov_iter_get_pages(from, &pages, PAGE_SIZE,
+ 1, &start);
+ if (copied < 0) {
+ struct mmpin *mmp;
+ int i;
+
+ for (i = 0; i < rm->data.op_nents; i++)
+ put_page(sg_page(&rm->data.op_sg[i]));
+ mmp = &rm->data.op_mmp_znotifier->z_mmp;
+ mm_unaccount_pinned_pages(mmp);
+ ret = -EFAULT;
+ goto err;
+ }
+ total_copied += copied;
+ iov_iter_advance(from, copied);
+ length -= copied;
+ sg_set_page(sg, pages, copied, start);
+ rm->data.op_nents++;
+ sg++;
+ }
+ WARN_ON_ONCE(length != 0);
+ return ret;
+err:
+ kfree(info);
+ rm->data.op_mmp_znotifier = NULL;
+ return ret;
+}
+
+int rds_message_copy_from_user(struct rds_message *rm, struct iov_iter *from,
+ bool zcopy)
+{
+ unsigned long to_copy, nbytes;
+ unsigned long sg_off;
+ struct scatterlist *sg;
+ int ret = 0;
+
+ rm->m_inc.i_hdr.h_len = cpu_to_be32(iov_iter_count(from));
+
+ /* now allocate and copy in the data payload. */
+ sg = rm->data.op_sg;
sg_off = 0; /* Dear gcc, sg->page will be null from kzalloc. */
+ if (zcopy)
+ return rds_message_zcopy_from_user(rm, from);
+
while (iov_iter_count(from)) {
if (!sg_page(sg)) {
ret = rds_page_remainder_alloc(sg, iov_iter_count(from),
diff --git a/net/rds/rds.h b/net/rds/rds.h
index 7301b9b01890..b04c333d9d1c 100644
--- a/net/rds/rds.h
+++ b/net/rds/rds.h
@@ -356,6 +356,30 @@ static inline u32 rds_rdma_cookie_offset(rds_rdma_cookie_t cookie)
#define RDS_MSG_PAGEVEC 7
#define RDS_MSG_FLUSH 8
+struct rds_znotifier {
+ struct mmpin z_mmp;
+ u32 z_cookie;
+};
+
+struct rds_msg_zcopy_info {
+ struct list_head rs_zcookie_next;
+ union {
+ struct rds_znotifier znotif;
+ struct rds_zcopy_cookies zcookies;
+ };
+};
+
+struct rds_msg_zcopy_queue {
+ struct list_head zcookie_head;
+ spinlock_t lock; /* protects zcookie_head queue */
+};
+
+static inline void rds_message_zcopy_queue_init(struct rds_msg_zcopy_queue *q)
+{
+ spin_lock_init(&q->lock);
+ INIT_LIST_HEAD(&q->zcookie_head);
+}
+
struct rds_message {
refcount_t m_refcount;
struct list_head m_sock_item;
@@ -436,6 +460,7 @@ struct rds_message {
unsigned int op_count;
unsigned int op_dmasg;
unsigned int op_dmaoff;
+ struct rds_znotifier *op_mmp_znotifier;
struct scatterlist *op_sg;
} data;
};
@@ -589,6 +614,7 @@ struct rds_sock {
/* Socket receive path trace points*/
u8 rs_rx_traces;
u8 rs_rx_trace[RDS_MSG_RX_DGRAM_TRACE_MAX];
+ struct rds_msg_zcopy_queue rs_zcookie_queue;
};
static inline struct rds_sock *rds_sk_to_rs(const struct sock *sk)
@@ -709,6 +735,7 @@ void rds_for_each_conn_info(struct socket *sock, unsigned int len,
struct rds_info_iterator *iter,
struct rds_info_lengths *lens,
int (*visitor)(struct rds_connection *, void *),
+ u64 *buffer,
size_t item_len);
__printf(2, 3)
@@ -771,7 +798,8 @@ rds_conn_connecting(struct rds_connection *conn)
/* message.c */
struct rds_message *rds_message_alloc(unsigned int nents, gfp_t gfp);
struct scatterlist *rds_message_alloc_sgs(struct rds_message *rm, int nents);
-int rds_message_copy_from_user(struct rds_message *rm, struct iov_iter *from);
+int rds_message_copy_from_user(struct rds_message *rm, struct iov_iter *from,
+ bool zcopy);
struct rds_message *rds_message_map_pages(unsigned long *page_addrs, unsigned int total_len);
void rds_message_populate_header(struct rds_header *hdr, __be16 sport,
__be16 dport, u64 seq);
@@ -786,6 +814,7 @@ void rds_message_addref(struct rds_message *rm);
void rds_message_put(struct rds_message *rm);
void rds_message_wait(struct rds_message *rm);
void rds_message_unmapped(struct rds_message *rm);
+void rds_notify_msg_zcopy_purge(struct rds_msg_zcopy_queue *info);
static inline void rds_message_make_checksum(struct rds_header *hdr)
{
diff --git a/net/rds/recv.c b/net/rds/recv.c
index b25bcfe411ca..de50e2126e40 100644
--- a/net/rds/recv.c
+++ b/net/rds/recv.c
@@ -577,6 +577,41 @@ out:
return ret;
}
+static bool rds_recvmsg_zcookie(struct rds_sock *rs, struct msghdr *msg)
+{
+ struct rds_msg_zcopy_queue *q = &rs->rs_zcookie_queue;
+ struct rds_msg_zcopy_info *info = NULL;
+ struct rds_zcopy_cookies *done;
+ unsigned long flags;
+
+ if (!msg->msg_control)
+ return false;
+
+ if (!sock_flag(rds_rs_to_sk(rs), SOCK_ZEROCOPY) ||
+ msg->msg_controllen < CMSG_SPACE(sizeof(*done)))
+ return false;
+
+ spin_lock_irqsave(&q->lock, flags);
+ if (!list_empty(&q->zcookie_head)) {
+ info = list_entry(q->zcookie_head.next,
+ struct rds_msg_zcopy_info, rs_zcookie_next);
+ list_del(&info->rs_zcookie_next);
+ }
+ spin_unlock_irqrestore(&q->lock, flags);
+ if (!info)
+ return false;
+ done = &info->zcookies;
+ if (put_cmsg(msg, SOL_RDS, RDS_CMSG_ZCOPY_COMPLETION, sizeof(*done),
+ done)) {
+ spin_lock_irqsave(&q->lock, flags);
+ list_add(&info->rs_zcookie_next, &q->zcookie_head);
+ spin_unlock_irqrestore(&q->lock, flags);
+ return false;
+ }
+ kfree(info);
+ return true;
+}
+
int rds_recvmsg(struct socket *sock, struct msghdr *msg, size_t size,
int msg_flags)
{
@@ -594,6 +629,8 @@ int rds_recvmsg(struct socket *sock, struct msghdr *msg, size_t size,
if (msg_flags & MSG_OOB)
goto out;
+ if (msg_flags & MSG_ERRQUEUE)
+ return sock_recv_errqueue(sk, msg, size, SOL_IP, IP_RECVERR);
while (1) {
/* If there are pending notifications, do those - and nothing else */
@@ -609,7 +646,9 @@ int rds_recvmsg(struct socket *sock, struct msghdr *msg, size_t size,
if (!rds_next_incoming(rs, &inc)) {
if (nonblock) {
- ret = -EAGAIN;
+ bool reaped = rds_recvmsg_zcookie(rs, msg);
+
+ ret = reaped ? 0 : -EAGAIN;
break;
}
@@ -658,6 +697,7 @@ int rds_recvmsg(struct socket *sock, struct msghdr *msg, size_t size,
ret = -EFAULT;
goto out;
}
+ rds_recvmsg_zcookie(rs, msg);
rds_stats_inc(s_recv_delivered);
diff --git a/net/rds/send.c b/net/rds/send.c
index b1b0022b8370..acad04243b41 100644
--- a/net/rds/send.c
+++ b/net/rds/send.c
@@ -649,7 +649,6 @@ static void rds_send_remove_from_sock(struct list_head *messages, int status)
rm->rdma.op_notifier = NULL;
}
was_on_sock = 1;
- rm->m_rs = NULL;
}
spin_unlock(&rs->rs_lock);
@@ -756,9 +755,6 @@ void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in *dest)
*/
if (!test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags)) {
spin_unlock_irqrestore(&cp->cp_lock, flags);
- spin_lock_irqsave(&rm->m_rs_lock, flags);
- rm->m_rs = NULL;
- spin_unlock_irqrestore(&rm->m_rs_lock, flags);
continue;
}
list_del_init(&rm->m_conn_item);
@@ -774,7 +770,6 @@ void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in *dest)
__rds_send_complete(rs, rm, RDS_RDMA_CANCELED);
spin_unlock(&rs->rs_lock);
- rm->m_rs = NULL;
spin_unlock_irqrestore(&rm->m_rs_lock, flags);
rds_message_put(rm);
@@ -798,7 +793,6 @@ void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in *dest)
__rds_send_complete(rs, rm, RDS_RDMA_CANCELED);
spin_unlock(&rs->rs_lock);
- rm->m_rs = NULL;
spin_unlock_irqrestore(&rm->m_rs_lock, flags);
rds_message_put(rm);
@@ -849,6 +843,7 @@ static int rds_send_queue_rm(struct rds_sock *rs, struct rds_connection *conn,
list_add_tail(&rm->m_sock_item, &rs->rs_send_queue);
set_bit(RDS_MSG_ON_SOCK, &rm->m_flags);
rds_message_addref(rm);
+ sock_hold(rds_rs_to_sk(rs));
rm->m_rs = rs;
/* The code ordering is a little weird, but we're
@@ -880,12 +875,13 @@ out:
* rds_message is getting to be quite complicated, and we'd like to allocate
* it all in one go. This figures out how big it needs to be up front.
*/
-static int rds_rm_size(struct msghdr *msg, int data_len)
+static int rds_rm_size(struct msghdr *msg, int num_sgs)
{
struct cmsghdr *cmsg;
int size = 0;
int cmsg_groups = 0;
int retval;
+ bool zcopy_cookie = false;
for_each_cmsghdr(cmsg, msg) {
if (!CMSG_OK(msg, cmsg))
@@ -904,6 +900,10 @@ static int rds_rm_size(struct msghdr *msg, int data_len)
break;
+ case RDS_CMSG_ZCOPY_COOKIE:
+ zcopy_cookie = true;
+ /* fall through */
+
case RDS_CMSG_RDMA_DEST:
case RDS_CMSG_RDMA_MAP:
cmsg_groups |= 2;
@@ -924,7 +924,10 @@ static int rds_rm_size(struct msghdr *msg, int data_len)
}
- size += ceil(data_len, PAGE_SIZE) * sizeof(struct scatterlist);
+ if ((msg->msg_flags & MSG_ZEROCOPY) && !zcopy_cookie)
+ return -EINVAL;
+
+ size += num_sgs * sizeof(struct scatterlist);
/* Ensure (DEST, MAP) are never used with (ARGS, ATOMIC) */
if (cmsg_groups == 3)
@@ -933,6 +936,19 @@ static int rds_rm_size(struct msghdr *msg, int data_len)
return size;
}
+static int rds_cmsg_zcopy(struct rds_sock *rs, struct rds_message *rm,
+ struct cmsghdr *cmsg)
+{
+ u32 *cookie;
+
+ if (cmsg->cmsg_len < CMSG_LEN(sizeof(*cookie)) ||
+ !rm->data.op_mmp_znotifier)
+ return -EINVAL;
+ cookie = CMSG_DATA(cmsg);
+ rm->data.op_mmp_znotifier->z_cookie = *cookie;
+ return 0;
+}
+
static int rds_cmsg_send(struct rds_sock *rs, struct rds_message *rm,
struct msghdr *msg, int *allocated_mr)
{
@@ -975,6 +991,10 @@ static int rds_cmsg_send(struct rds_sock *rs, struct rds_message *rm,
ret = rds_cmsg_atomic(rs, rm, cmsg);
break;
+ case RDS_CMSG_ZCOPY_COOKIE:
+ ret = rds_cmsg_zcopy(rs, rm, cmsg);
+ break;
+
default:
return -EINVAL;
}
@@ -1045,10 +1065,13 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
long timeo = sock_sndtimeo(sk, nonblock);
struct rds_conn_path *cpath;
size_t total_payload_len = payload_len, rdma_payload_len = 0;
+ bool zcopy = ((msg->msg_flags & MSG_ZEROCOPY) &&
+ sock_flag(rds_rs_to_sk(rs), SOCK_ZEROCOPY));
+ int num_sgs = ceil(payload_len, PAGE_SIZE);
/* Mirror Linux UDP mirror of BSD error message compatibility */
/* XXX: Perhaps MSG_MORE someday */
- if (msg->msg_flags & ~(MSG_DONTWAIT | MSG_CMSG_COMPAT)) {
+ if (msg->msg_flags & ~(MSG_DONTWAIT | MSG_CMSG_COMPAT | MSG_ZEROCOPY)) {
ret = -EOPNOTSUPP;
goto out;
}
@@ -1092,8 +1115,15 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
goto out;
}
+ if (zcopy) {
+ if (rs->rs_transport->t_type != RDS_TRANS_TCP) {
+ ret = -EOPNOTSUPP;
+ goto out;
+ }
+ num_sgs = iov_iter_npages(&msg->msg_iter, INT_MAX);
+ }
/* size of rm including all sgs */
- ret = rds_rm_size(msg, payload_len);
+ ret = rds_rm_size(msg, num_sgs);
if (ret < 0)
goto out;
@@ -1105,12 +1135,12 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
/* Attach data to the rm */
if (payload_len) {
- rm->data.op_sg = rds_message_alloc_sgs(rm, ceil(payload_len, PAGE_SIZE));
+ rm->data.op_sg = rds_message_alloc_sgs(rm, num_sgs);
if (!rm->data.op_sg) {
ret = -ENOMEM;
goto out;
}
- ret = rds_message_copy_from_user(rm, &msg->msg_iter);
+ ret = rds_message_copy_from_user(rm, &msg->msg_iter, zcopy);
if (ret)
goto out;
}
diff --git a/net/rds/tcp.c b/net/rds/tcp.c
index 44c4652721af..351a28474667 100644
--- a/net/rds/tcp.c
+++ b/net/rds/tcp.c
@@ -227,7 +227,6 @@ static void rds_tcp_tc_info(struct socket *rds_sock, unsigned int len,
struct rds_tcp_connection *tc;
unsigned long flags;
struct sockaddr_in sin;
- int sinlen;
struct socket *sock;
spin_lock_irqsave(&rds_tcp_tc_list_lock, flags);
@@ -239,12 +238,10 @@ static void rds_tcp_tc_info(struct socket *rds_sock, unsigned int len,
sock = tc->t_sock;
if (sock) {
- sock->ops->getname(sock, (struct sockaddr *)&sin,
- &sinlen, 0);
+ sock->ops->getname(sock, (struct sockaddr *)&sin, 0);
tsinfo.local_addr = sin.sin_addr.s_addr;
tsinfo.local_port = sin.sin_port;
- sock->ops->getname(sock, (struct sockaddr *)&sin,
- &sinlen, 1);
+ sock->ops->getname(sock, (struct sockaddr *)&sin, 1);
tsinfo.peer_addr = sin.sin_addr.s_addr;
tsinfo.peer_port = sin.sin_port;
}
@@ -275,13 +272,14 @@ static int rds_tcp_laddr_check(struct net *net, __be32 addr)
static void rds_tcp_conn_free(void *arg)
{
struct rds_tcp_connection *tc = arg;
+ unsigned long flags;
rdsdebug("freeing tc %p\n", tc);
- spin_lock_bh(&rds_tcp_conn_lock);
+ spin_lock_irqsave(&rds_tcp_conn_lock, flags);
if (!tc->t_tcp_node_detached)
list_del(&tc->t_tcp_node);
- spin_unlock_bh(&rds_tcp_conn_lock);
+ spin_unlock_irqrestore(&rds_tcp_conn_lock, flags);
kmem_cache_free(rds_tcp_conn_slab, tc);
}
@@ -311,13 +309,13 @@ static int rds_tcp_conn_alloc(struct rds_connection *conn, gfp_t gfp)
rdsdebug("rds_conn_path [%d] tc %p\n", i,
conn->c_path[i].cp_transport_data);
}
- spin_lock_bh(&rds_tcp_conn_lock);
+ spin_lock_irq(&rds_tcp_conn_lock);
for (i = 0; i < RDS_MPATH_WORKERS; i++) {
tc = conn->c_path[i].cp_transport_data;
tc->t_tcp_node_detached = false;
list_add_tail(&tc->t_tcp_node, &rds_tcp_conn_list);
}
- spin_unlock_bh(&rds_tcp_conn_lock);
+ spin_unlock_irq(&rds_tcp_conn_lock);
fail:
if (ret) {
for (j = 0; j < i; j++)
@@ -487,39 +485,6 @@ fail:
return err;
}
-static void __net_exit rds_tcp_exit_net(struct net *net)
-{
- struct rds_tcp_net *rtn = net_generic(net, rds_tcp_netid);
-
- if (rtn->rds_tcp_sysctl)
- unregister_net_sysctl_table(rtn->rds_tcp_sysctl);
-
- if (net != &init_net && rtn->ctl_table)
- kfree(rtn->ctl_table);
-
- /* If rds_tcp_exit_net() is called as a result of netns deletion,
- * the rds_tcp_kill_sock() device notifier would already have cleaned
- * up the listen socket, thus there is no work to do in this function.
- *
- * If rds_tcp_exit_net() is called as a result of module unload,
- * i.e., due to rds_tcp_exit() -> unregister_pernet_subsys(), then
- * we do need to clean up the listen socket here.
- */
- if (rtn->rds_tcp_listen_sock) {
- struct socket *lsock = rtn->rds_tcp_listen_sock;
-
- rtn->rds_tcp_listen_sock = NULL;
- rds_tcp_listen_stop(lsock, &rtn->rds_tcp_accept_w);
- }
-}
-
-static struct pernet_operations rds_tcp_net_ops = {
- .init = rds_tcp_init_net,
- .exit = rds_tcp_exit_net,
- .id = &rds_tcp_netid,
- .size = sizeof(struct rds_tcp_net),
-};
-
static void rds_tcp_kill_sock(struct net *net)
{
struct rds_tcp_connection *tc, *_tc;
@@ -529,7 +494,7 @@ static void rds_tcp_kill_sock(struct net *net)
rtn->rds_tcp_listen_sock = NULL;
rds_tcp_listen_stop(lsock, &rtn->rds_tcp_accept_w);
- spin_lock_bh(&rds_tcp_conn_lock);
+ spin_lock_irq(&rds_tcp_conn_lock);
list_for_each_entry_safe(tc, _tc, &rds_tcp_conn_list, t_tcp_node) {
struct net *c_net = read_pnet(&tc->t_cpath->cp_conn->c_net);
@@ -542,45 +507,42 @@ static void rds_tcp_kill_sock(struct net *net)
tc->t_tcp_node_detached = true;
}
}
- spin_unlock_bh(&rds_tcp_conn_lock);
+ spin_unlock_irq(&rds_tcp_conn_lock);
list_for_each_entry_safe(tc, _tc, &tmp_list, t_tcp_node)
rds_conn_destroy(tc->t_cpath->cp_conn);
}
-void *rds_tcp_listen_sock_def_readable(struct net *net)
+static void __net_exit rds_tcp_exit_net(struct net *net)
{
struct rds_tcp_net *rtn = net_generic(net, rds_tcp_netid);
- struct socket *lsock = rtn->rds_tcp_listen_sock;
- if (!lsock)
- return NULL;
+ rds_tcp_kill_sock(net);
- return lsock->sk->sk_user_data;
+ if (rtn->rds_tcp_sysctl)
+ unregister_net_sysctl_table(rtn->rds_tcp_sysctl);
+
+ if (net != &init_net && rtn->ctl_table)
+ kfree(rtn->ctl_table);
}
-static int rds_tcp_dev_event(struct notifier_block *this,
- unsigned long event, void *ptr)
+static struct pernet_operations rds_tcp_net_ops = {
+ .init = rds_tcp_init_net,
+ .exit = rds_tcp_exit_net,
+ .id = &rds_tcp_netid,
+ .size = sizeof(struct rds_tcp_net),
+};
+
+void *rds_tcp_listen_sock_def_readable(struct net *net)
{
- struct net_device *dev = netdev_notifier_info_to_dev(ptr);
+ struct rds_tcp_net *rtn = net_generic(net, rds_tcp_netid);
+ struct socket *lsock = rtn->rds_tcp_listen_sock;
- /* rds-tcp registers as a pernet subys, so the ->exit will only
- * get invoked after network acitivity has quiesced. We need to
- * clean up all sockets to quiesce network activity, and use
- * the unregistration of the per-net loopback device as a trigger
- * to start that cleanup.
- */
- if (event == NETDEV_UNREGISTER_FINAL &&
- dev->ifindex == LOOPBACK_IFINDEX)
- rds_tcp_kill_sock(dev_net(dev));
+ if (!lsock)
+ return NULL;
- return NOTIFY_DONE;
+ return lsock->sk->sk_user_data;
}
-static struct notifier_block rds_tcp_dev_notifier = {
- .notifier_call = rds_tcp_dev_event,
- .priority = -10, /* must be called after other network notifiers */
-};
-
/* when sysctl is used to modify some kernel socket parameters,this
* function resets the RDS connections in that netns so that we can
* restart with new parameters. The assumption is that such reset
@@ -590,7 +552,7 @@ static void rds_tcp_sysctl_reset(struct net *net)
{
struct rds_tcp_connection *tc, *_tc;
- spin_lock_bh(&rds_tcp_conn_lock);
+ spin_lock_irq(&rds_tcp_conn_lock);
list_for_each_entry_safe(tc, _tc, &rds_tcp_conn_list, t_tcp_node) {
struct net *c_net = read_pnet(&tc->t_cpath->cp_conn->c_net);
@@ -600,7 +562,7 @@ static void rds_tcp_sysctl_reset(struct net *net)
/* reconnect with new parameters */
rds_conn_path_drop(tc->t_cpath, false);
}
- spin_unlock_bh(&rds_tcp_conn_lock);
+ spin_unlock_irq(&rds_tcp_conn_lock);
}
static int rds_tcp_skbuf_handler(struct ctl_table *ctl, int write,
@@ -626,9 +588,7 @@ static void rds_tcp_exit(void)
rds_tcp_set_unloading();
synchronize_rcu();
rds_info_deregister_func(RDS_INFO_TCP_SOCKETS, rds_tcp_tc_info);
- unregister_pernet_subsys(&rds_tcp_net_ops);
- if (unregister_netdevice_notifier(&rds_tcp_dev_notifier))
- pr_warn("could not unregister rds_tcp_dev_notifier\n");
+ unregister_pernet_device(&rds_tcp_net_ops);
rds_tcp_destroy_conns();
rds_trans_unregister(&rds_tcp_transport);
rds_tcp_recv_exit();
@@ -652,24 +612,15 @@ static int rds_tcp_init(void)
if (ret)
goto out_slab;
- ret = register_pernet_subsys(&rds_tcp_net_ops);
+ ret = register_pernet_device(&rds_tcp_net_ops);
if (ret)
goto out_recv;
- ret = register_netdevice_notifier(&rds_tcp_dev_notifier);
- if (ret) {
- pr_warn("could not register rds_tcp_dev_notifier\n");
- goto out_pernet;
- }
-
rds_trans_register(&rds_tcp_transport);
rds_info_register_func(RDS_INFO_TCP_SOCKETS, rds_tcp_tc_info);
goto out;
-
-out_pernet:
- unregister_pernet_subsys(&rds_tcp_net_ops);
out_recv:
rds_tcp_recv_exit();
out_slab: