From 01883eda72bd3f0a6c81447e4f223de14033fd9d Mon Sep 17 00:00:00 2001 From: Sowmini Varadhan Date: Thu, 15 Feb 2018 10:49:35 -0800 Subject: rds: support for zcopy completion notification RDS removes a datagram (rds_message) from the retransmit queue when an ACK is received. The ACK indicates that the receiver has queued the RDS datagram, so that the sender can safely forget the datagram. When all references to the rds_message are quiesced, rds_message_purge is called to release resources used by the rds_message If the datagram to be removed had pinned pages set up, add an entry to the rs->rs_znotify_queue so that the notifcation will be sent up via rds_rm_zerocopy_callback() when the rds_message is eventually freed by rds_message_purge. rds_rm_zerocopy_callback() attempts to batch the number of cookies sent with each notification to a max of SO_EE_ORIGIN_MAX_ZCOOKIES. This is achieved by checking the tail skb in the sk_error_queue: if this has room for one more cookie, the cookie from the current notification is added; else a new skb is added to the sk_error_queue. Every invocation of rds_rm_zerocopy_callback() will trigger a ->sk_error_report to notify the application. Signed-off-by: Sowmini Varadhan Acked-by: Santosh Shilimkar Acked-by: Willem de Bruijn Signed-off-by: David S. Miller --- net/rds/recv.c | 2 ++ 1 file changed, 2 insertions(+) (limited to 'net/rds/recv.c') diff --git a/net/rds/recv.c b/net/rds/recv.c index b25bcfe411ca..b080961464df 100644 --- a/net/rds/recv.c +++ b/net/rds/recv.c @@ -594,6 +594,8 @@ int rds_recvmsg(struct socket *sock, struct msghdr *msg, size_t size, if (msg_flags & MSG_OOB) goto out; + if (msg_flags & MSG_ERRQUEUE) + return sock_recv_errqueue(sk, msg, size, SOL_IP, IP_RECVERR); while (1) { /* If there are pending notifications, do those - and nothing else */ -- cgit v1.2.3 From 401910db4cd425899832a093539222b6174f92a2 Mon Sep 17 00:00:00 2001 From: Sowmini Varadhan Date: Tue, 27 Feb 2018 09:52:43 -0800 Subject: rds: deliver zerocopy completion notification with data This commit is an optimization over commit 01883eda72bd ("rds: support for zcopy completion notification") for PF_RDS sockets. RDS applications are predominantly request-response transactions, so it is more efficient to reduce the number of system calls and have zerocopy completion notification delivered as ancillary data on the POLLIN channel. Cookies are passed up as ancillary data (at level SOL_RDS) in a struct rds_zcopy_cookies when the returned value of recvmsg() is greater than, or equal to, 0. A max of RDS_MAX_ZCOOKIES may be passed with each message. This commit removes support for zerocopy completion notification on MSG_ERRQUEUE for PF_RDS sockets. Signed-off-by: Sowmini Varadhan Acked-by: Willem de Bruijn Acked-by: Santosh Shilimkar Signed-off-by: David S. Miller --- include/uapi/linux/errqueue.h | 2 -- include/uapi/linux/rds.h | 7 +++++++ net/rds/af_rds.c | 7 +++++-- net/rds/message.c | 38 ++++++++++++++++---------------------- net/rds/rds.h | 2 ++ net/rds/recv.c | 31 ++++++++++++++++++++++++++++++- 6 files changed, 60 insertions(+), 27 deletions(-) (limited to 'net/rds/recv.c') diff --git a/include/uapi/linux/errqueue.h b/include/uapi/linux/errqueue.h index 28812eda4209..dc64cfaf13da 100644 --- a/include/uapi/linux/errqueue.h +++ b/include/uapi/linux/errqueue.h @@ -20,13 +20,11 @@ struct sock_extended_err { #define SO_EE_ORIGIN_ICMP6 3 #define SO_EE_ORIGIN_TXSTATUS 4 #define SO_EE_ORIGIN_ZEROCOPY 5 -#define SO_EE_ORIGIN_ZCOOKIE 6 #define SO_EE_ORIGIN_TIMESTAMPING SO_EE_ORIGIN_TXSTATUS #define SO_EE_OFFENDER(ee) ((struct sockaddr*)((ee)+1)) #define SO_EE_CODE_ZEROCOPY_COPIED 1 -#define SO_EE_ORIGIN_MAX_ZCOOKIES 8 /** * struct scm_timestamping - timestamps exposed through cmsg diff --git a/include/uapi/linux/rds.h b/include/uapi/linux/rds.h index 12e3bca32cad..a66b213de3d7 100644 --- a/include/uapi/linux/rds.h +++ b/include/uapi/linux/rds.h @@ -104,6 +104,7 @@ #define RDS_CMSG_MASKED_ATOMIC_CSWP 9 #define RDS_CMSG_RXPATH_LATENCY 11 #define RDS_CMSG_ZCOPY_COOKIE 12 +#define RDS_CMSG_ZCOPY_COMPLETION 13 #define RDS_INFO_FIRST 10000 #define RDS_INFO_COUNTERS 10000 @@ -317,6 +318,12 @@ struct rds_rdma_notify { #define RDS_RDMA_DROPPED 3 #define RDS_RDMA_OTHER_ERROR 4 +#define RDS_MAX_ZCOOKIES 8 +struct rds_zcopy_cookies { + __u32 num; + __u32 cookies[RDS_MAX_ZCOOKIES]; +}; + /* * Common set of flags for all RDMA related structs */ diff --git a/net/rds/af_rds.c b/net/rds/af_rds.c index a937f18896ae..f7126108a811 100644 --- a/net/rds/af_rds.c +++ b/net/rds/af_rds.c @@ -77,6 +77,7 @@ static int rds_release(struct socket *sock) rds_send_drop_to(rs, NULL); rds_rdma_drop_keys(rs); rds_notify_queue_get(rs, NULL); + __skb_queue_purge(&rs->rs_zcookie_queue); spin_lock_bh(&rds_sock_lock); list_del_init(&rs->rs_item); @@ -144,7 +145,7 @@ static int rds_getname(struct socket *sock, struct sockaddr *uaddr, * - to signal that a previously congested destination may have become * uncongested * - A notification has been queued to the socket (this can be a congestion - * update, or a RDMA completion). + * update, or a RDMA completion, or a MSG_ZEROCOPY completion). * * EPOLLOUT is asserted if there is room on the send queue. This does not mean * however, that the next sendmsg() call will succeed. If the application tries @@ -178,7 +179,8 @@ static __poll_t rds_poll(struct file *file, struct socket *sock, spin_unlock(&rs->rs_lock); } if (!list_empty(&rs->rs_recv_queue) || - !list_empty(&rs->rs_notify_queue)) + !list_empty(&rs->rs_notify_queue) || + !skb_queue_empty(&rs->rs_zcookie_queue)) mask |= (EPOLLIN | EPOLLRDNORM); if (rs->rs_snd_bytes < rds_sk_sndbuf(rs)) mask |= (EPOLLOUT | EPOLLWRNORM); @@ -513,6 +515,7 @@ static int __rds_create(struct socket *sock, struct sock *sk, int protocol) INIT_LIST_HEAD(&rs->rs_recv_queue); INIT_LIST_HEAD(&rs->rs_notify_queue); INIT_LIST_HEAD(&rs->rs_cong_list); + skb_queue_head_init(&rs->rs_zcookie_queue); spin_lock_init(&rs->rs_rdma_lock); rs->rs_rdma_keys = RB_ROOT; rs->rs_rx_traces = 0; diff --git a/net/rds/message.c b/net/rds/message.c index 651834513481..116cf87ccb89 100644 --- a/net/rds/message.c +++ b/net/rds/message.c @@ -58,32 +58,26 @@ EXPORT_SYMBOL_GPL(rds_message_addref); static inline bool skb_zcookie_add(struct sk_buff *skb, u32 cookie) { - struct sock_exterr_skb *serr = SKB_EXT_ERR(skb); - int ncookies; - u32 *ptr; + struct rds_zcopy_cookies *ck = (struct rds_zcopy_cookies *)skb->cb; + int ncookies = ck->num; - if (serr->ee.ee_origin != SO_EE_ORIGIN_ZCOOKIE) + if (ncookies == RDS_MAX_ZCOOKIES) return false; - ncookies = serr->ee.ee_data; - if (ncookies == SO_EE_ORIGIN_MAX_ZCOOKIES) - return false; - ptr = skb_put(skb, sizeof(u32)); - *ptr = cookie; - serr->ee.ee_data = ++ncookies; + ck->cookies[ncookies] = cookie; + ck->num = ++ncookies; return true; } static void rds_rm_zerocopy_callback(struct rds_sock *rs, struct rds_znotifier *znotif) { - struct sock *sk = rds_rs_to_sk(rs); struct sk_buff *skb, *tail; - struct sock_exterr_skb *serr; unsigned long flags; struct sk_buff_head *q; u32 cookie = znotif->z_cookie; + struct rds_zcopy_cookies *ck; - q = &sk->sk_error_queue; + q = &rs->rs_zcookie_queue; spin_lock_irqsave(&q->lock, flags); tail = skb_peek_tail(q); @@ -91,22 +85,19 @@ static void rds_rm_zerocopy_callback(struct rds_sock *rs, spin_unlock_irqrestore(&q->lock, flags); mm_unaccount_pinned_pages(&znotif->z_mmp); consume_skb(rds_skb_from_znotifier(znotif)); - sk->sk_error_report(sk); + /* caller invokes rds_wake_sk_sleep() */ return; } skb = rds_skb_from_znotifier(znotif); - serr = SKB_EXT_ERR(skb); - memset(&serr->ee, 0, sizeof(serr->ee)); - serr->ee.ee_errno = 0; - serr->ee.ee_origin = SO_EE_ORIGIN_ZCOOKIE; - serr->ee.ee_info = 0; + ck = (struct rds_zcopy_cookies *)skb->cb; + memset(ck, 0, sizeof(*ck)); WARN_ON(!skb_zcookie_add(skb, cookie)); __skb_queue_tail(q, skb); spin_unlock_irqrestore(&q->lock, flags); - sk->sk_error_report(sk); + /* caller invokes rds_wake_sk_sleep() */ mm_unaccount_pinned_pages(&znotif->z_mmp); } @@ -129,6 +120,7 @@ static void rds_message_purge(struct rds_message *rm) if (rm->data.op_mmp_znotifier) { zcopy = true; rds_rm_zerocopy_callback(rs, rm->data.op_mmp_znotifier); + rds_wake_sk_sleep(rs); rm->data.op_mmp_znotifier = NULL; } sock_put(rds_rs_to_sk(rs)); @@ -362,10 +354,12 @@ int rds_message_copy_from_user(struct rds_message *rm, struct iov_iter *from, int total_copied = 0; struct sk_buff *skb; - skb = alloc_skb(SO_EE_ORIGIN_MAX_ZCOOKIES * sizeof(u32), - GFP_KERNEL); + skb = alloc_skb(0, GFP_KERNEL); if (!skb) return -ENOMEM; + BUILD_BUG_ON(sizeof(skb->cb) < + max_t(int, sizeof(struct rds_znotifier), + sizeof(struct rds_zcopy_cookies))); rm->data.op_mmp_znotifier = RDS_ZCOPY_SKB(skb); if (mm_account_pinned_pages(&rm->data.op_mmp_znotifier->z_mmp, length)) { diff --git a/net/rds/rds.h b/net/rds/rds.h index 31cd38852050..33b16353d8f3 100644 --- a/net/rds/rds.h +++ b/net/rds/rds.h @@ -603,6 +603,8 @@ struct rds_sock { /* Socket receive path trace points*/ u8 rs_rx_traces; u8 rs_rx_trace[RDS_MSG_RX_DGRAM_TRACE_MAX]; + + struct sk_buff_head rs_zcookie_queue; }; static inline struct rds_sock *rds_sk_to_rs(const struct sock *sk) diff --git a/net/rds/recv.c b/net/rds/recv.c index b080961464df..d50747725221 100644 --- a/net/rds/recv.c +++ b/net/rds/recv.c @@ -577,6 +577,32 @@ out: return ret; } +static bool rds_recvmsg_zcookie(struct rds_sock *rs, struct msghdr *msg) +{ + struct sk_buff *skb; + struct sk_buff_head *q = &rs->rs_zcookie_queue; + struct rds_zcopy_cookies *done; + + if (!msg->msg_control) + return false; + + if (!sock_flag(rds_rs_to_sk(rs), SOCK_ZEROCOPY) || + msg->msg_controllen < CMSG_SPACE(sizeof(*done))) + return false; + + skb = skb_dequeue(q); + if (!skb) + return false; + done = (struct rds_zcopy_cookies *)skb->cb; + if (put_cmsg(msg, SOL_RDS, RDS_CMSG_ZCOPY_COMPLETION, sizeof(*done), + done)) { + skb_queue_head(q, skb); + return false; + } + consume_skb(skb); + return true; +} + int rds_recvmsg(struct socket *sock, struct msghdr *msg, size_t size, int msg_flags) { @@ -611,7 +637,9 @@ int rds_recvmsg(struct socket *sock, struct msghdr *msg, size_t size, if (!rds_next_incoming(rs, &inc)) { if (nonblock) { - ret = -EAGAIN; + bool reaped = rds_recvmsg_zcookie(rs, msg); + + ret = reaped ? 0 : -EAGAIN; break; } @@ -660,6 +688,7 @@ int rds_recvmsg(struct socket *sock, struct msghdr *msg, size_t size, ret = -EFAULT; goto out; } + rds_recvmsg_zcookie(rs, msg); rds_stats_inc(s_recv_delivered); -- cgit v1.2.3 From 9426bbc6de99b8649d897b94e8f5916b58195643 Mon Sep 17 00:00:00 2001 From: Sowmini Varadhan Date: Tue, 6 Mar 2018 07:22:34 -0800 Subject: rds: use list structure to track information for zerocopy completion notification Commit 401910db4cd4 ("rds: deliver zerocopy completion notification with data") removes support fo r zerocopy completion notification on the sk_error_queue, thus we no longer need to track the cookie information in sk_buff structures. This commit removes the struct sk_buff_head rs_zcookie_queue by a simpler list that results in a smaller memory footprint as well as more efficient memory_allocation time. Signed-off-by: Sowmini Varadhan Acked-by: Willem de Bruijn Signed-off-by: David S. Miller --- net/rds/af_rds.c | 6 ++--- net/rds/message.c | 77 +++++++++++++++++++++++++++++++++++-------------------- net/rds/rds.h | 23 ++++++++++++----- net/rds/recv.c | 23 ++++++++++++----- 4 files changed, 85 insertions(+), 44 deletions(-) (limited to 'net/rds/recv.c') diff --git a/net/rds/af_rds.c b/net/rds/af_rds.c index f7126108a811..ab751a150f70 100644 --- a/net/rds/af_rds.c +++ b/net/rds/af_rds.c @@ -77,7 +77,7 @@ static int rds_release(struct socket *sock) rds_send_drop_to(rs, NULL); rds_rdma_drop_keys(rs); rds_notify_queue_get(rs, NULL); - __skb_queue_purge(&rs->rs_zcookie_queue); + rds_notify_msg_zcopy_purge(&rs->rs_zcookie_queue); spin_lock_bh(&rds_sock_lock); list_del_init(&rs->rs_item); @@ -180,7 +180,7 @@ static __poll_t rds_poll(struct file *file, struct socket *sock, } if (!list_empty(&rs->rs_recv_queue) || !list_empty(&rs->rs_notify_queue) || - !skb_queue_empty(&rs->rs_zcookie_queue)) + !list_empty(&rs->rs_zcookie_queue.zcookie_head)) mask |= (EPOLLIN | EPOLLRDNORM); if (rs->rs_snd_bytes < rds_sk_sndbuf(rs)) mask |= (EPOLLOUT | EPOLLWRNORM); @@ -515,7 +515,7 @@ static int __rds_create(struct socket *sock, struct sock *sk, int protocol) INIT_LIST_HEAD(&rs->rs_recv_queue); INIT_LIST_HEAD(&rs->rs_notify_queue); INIT_LIST_HEAD(&rs->rs_cong_list); - skb_queue_head_init(&rs->rs_zcookie_queue); + rds_message_zcopy_queue_init(&rs->rs_zcookie_queue); spin_lock_init(&rs->rs_rdma_lock); rs->rs_rdma_keys = RB_ROOT; rs->rs_rx_traces = 0; diff --git a/net/rds/message.c b/net/rds/message.c index c36edbb92c34..90dcdcfe9f62 100644 --- a/net/rds/message.c +++ b/net/rds/message.c @@ -48,7 +48,6 @@ static unsigned int rds_exthdr_size[__RDS_EXTHDR_MAX] = { [RDS_EXTHDR_GEN_NUM] = sizeof(u32), }; - void rds_message_addref(struct rds_message *rm) { rdsdebug("addref rm %p ref %d\n", rm, refcount_read(&rm->m_refcount)); @@ -56,9 +55,9 @@ void rds_message_addref(struct rds_message *rm) } EXPORT_SYMBOL_GPL(rds_message_addref); -static inline bool skb_zcookie_add(struct sk_buff *skb, u32 cookie) +static inline bool rds_zcookie_add(struct rds_msg_zcopy_info *info, u32 cookie) { - struct rds_zcopy_cookies *ck = (struct rds_zcopy_cookies *)skb->cb; + struct rds_zcopy_cookies *ck = &info->zcookies; int ncookies = ck->num; if (ncookies == RDS_MAX_ZCOOKIES) @@ -68,38 +67,61 @@ static inline bool skb_zcookie_add(struct sk_buff *skb, u32 cookie) return true; } +struct rds_msg_zcopy_info *rds_info_from_znotifier(struct rds_znotifier *znotif) +{ + return container_of(znotif, struct rds_msg_zcopy_info, znotif); +} + +void rds_notify_msg_zcopy_purge(struct rds_msg_zcopy_queue *q) +{ + unsigned long flags; + LIST_HEAD(copy); + struct rds_msg_zcopy_info *info, *tmp; + + spin_lock_irqsave(&q->lock, flags); + list_splice(&q->zcookie_head, ©); + INIT_LIST_HEAD(&q->zcookie_head); + spin_unlock_irqrestore(&q->lock, flags); + + list_for_each_entry_safe(info, tmp, ©, rs_zcookie_next) { + list_del(&info->rs_zcookie_next); + kfree(info); + } +} + static void rds_rm_zerocopy_callback(struct rds_sock *rs, struct rds_znotifier *znotif) { - struct sk_buff *skb, *tail; - unsigned long flags; - struct sk_buff_head *q; + struct rds_msg_zcopy_info *info; + struct rds_msg_zcopy_queue *q; u32 cookie = znotif->z_cookie; struct rds_zcopy_cookies *ck; + struct list_head *head; + unsigned long flags; + mm_unaccount_pinned_pages(&znotif->z_mmp); q = &rs->rs_zcookie_queue; spin_lock_irqsave(&q->lock, flags); - tail = skb_peek_tail(q); - - if (tail && skb_zcookie_add(tail, cookie)) { - spin_unlock_irqrestore(&q->lock, flags); - mm_unaccount_pinned_pages(&znotif->z_mmp); - consume_skb(rds_skb_from_znotifier(znotif)); - /* caller invokes rds_wake_sk_sleep() */ - return; + head = &q->zcookie_head; + if (!list_empty(head)) { + info = list_entry(head, struct rds_msg_zcopy_info, + rs_zcookie_next); + if (info && rds_zcookie_add(info, cookie)) { + spin_unlock_irqrestore(&q->lock, flags); + kfree(rds_info_from_znotifier(znotif)); + /* caller invokes rds_wake_sk_sleep() */ + return; + } } - skb = rds_skb_from_znotifier(znotif); - ck = (struct rds_zcopy_cookies *)skb->cb; + info = rds_info_from_znotifier(znotif); + ck = &info->zcookies; memset(ck, 0, sizeof(*ck)); - WARN_ON(!skb_zcookie_add(skb, cookie)); - - __skb_queue_tail(q, skb); + WARN_ON(!rds_zcookie_add(info, cookie)); + list_add_tail(&q->zcookie_head, &info->rs_zcookie_next); spin_unlock_irqrestore(&q->lock, flags); /* caller invokes rds_wake_sk_sleep() */ - - mm_unaccount_pinned_pages(&znotif->z_mmp); } /* @@ -340,7 +362,7 @@ int rds_message_zcopy_from_user(struct rds_message *rm, struct iov_iter *from) int ret = 0; int length = iov_iter_count(from); int total_copied = 0; - struct sk_buff *skb; + struct rds_msg_zcopy_info *info; rm->m_inc.i_hdr.h_len = cpu_to_be32(iov_iter_count(from)); @@ -350,12 +372,11 @@ int rds_message_zcopy_from_user(struct rds_message *rm, struct iov_iter *from) sg = rm->data.op_sg; sg_off = 0; /* Dear gcc, sg->page will be null from kzalloc. */ - skb = alloc_skb(0, GFP_KERNEL); - if (!skb) + info = kzalloc(sizeof(*info), GFP_KERNEL); + if (!info) return -ENOMEM; - BUILD_BUG_ON(sizeof(skb->cb) < max_t(int, sizeof(struct rds_znotifier), - sizeof(struct rds_zcopy_cookies))); - rm->data.op_mmp_znotifier = RDS_ZCOPY_SKB(skb); + INIT_LIST_HEAD(&info->rs_zcookie_next); + rm->data.op_mmp_znotifier = &info->znotif; if (mm_account_pinned_pages(&rm->data.op_mmp_znotifier->z_mmp, length)) { ret = -ENOMEM; @@ -389,7 +410,7 @@ int rds_message_zcopy_from_user(struct rds_message *rm, struct iov_iter *from) WARN_ON_ONCE(length != 0); return ret; err: - consume_skb(skb); + kfree(info); rm->data.op_mmp_znotifier = NULL; return ret; } diff --git a/net/rds/rds.h b/net/rds/rds.h index 33b16353d8f3..74cd27c661de 100644 --- a/net/rds/rds.h +++ b/net/rds/rds.h @@ -357,16 +357,27 @@ static inline u32 rds_rdma_cookie_offset(rds_rdma_cookie_t cookie) #define RDS_MSG_FLUSH 8 struct rds_znotifier { - struct list_head z_list; struct mmpin z_mmp; u32 z_cookie; }; -#define RDS_ZCOPY_SKB(__skb) ((struct rds_znotifier *)&((__skb)->cb[0])) +struct rds_msg_zcopy_info { + struct list_head rs_zcookie_next; + union { + struct rds_znotifier znotif; + struct rds_zcopy_cookies zcookies; + }; +}; -static inline struct sk_buff *rds_skb_from_znotifier(struct rds_znotifier *z) +struct rds_msg_zcopy_queue { + struct list_head zcookie_head; + spinlock_t lock; /* protects zcookie_head queue */ +}; + +static inline void rds_message_zcopy_queue_init(struct rds_msg_zcopy_queue *q) { - return container_of((void *)z, struct sk_buff, cb); + spin_lock_init(&q->lock); + INIT_LIST_HEAD(&q->zcookie_head); } struct rds_message { @@ -603,8 +614,7 @@ struct rds_sock { /* Socket receive path trace points*/ u8 rs_rx_traces; u8 rs_rx_trace[RDS_MSG_RX_DGRAM_TRACE_MAX]; - - struct sk_buff_head rs_zcookie_queue; + struct rds_msg_zcopy_queue rs_zcookie_queue; }; static inline struct rds_sock *rds_sk_to_rs(const struct sock *sk) @@ -803,6 +813,7 @@ void rds_message_addref(struct rds_message *rm); void rds_message_put(struct rds_message *rm); void rds_message_wait(struct rds_message *rm); void rds_message_unmapped(struct rds_message *rm); +void rds_notify_msg_zcopy_purge(struct rds_msg_zcopy_queue *info); static inline void rds_message_make_checksum(struct rds_header *hdr) { diff --git a/net/rds/recv.c b/net/rds/recv.c index d50747725221..de50e2126e40 100644 --- a/net/rds/recv.c +++ b/net/rds/recv.c @@ -579,9 +579,10 @@ out: static bool rds_recvmsg_zcookie(struct rds_sock *rs, struct msghdr *msg) { - struct sk_buff *skb; - struct sk_buff_head *q = &rs->rs_zcookie_queue; + struct rds_msg_zcopy_queue *q = &rs->rs_zcookie_queue; + struct rds_msg_zcopy_info *info = NULL; struct rds_zcopy_cookies *done; + unsigned long flags; if (!msg->msg_control) return false; @@ -590,16 +591,24 @@ static bool rds_recvmsg_zcookie(struct rds_sock *rs, struct msghdr *msg) msg->msg_controllen < CMSG_SPACE(sizeof(*done))) return false; - skb = skb_dequeue(q); - if (!skb) + spin_lock_irqsave(&q->lock, flags); + if (!list_empty(&q->zcookie_head)) { + info = list_entry(q->zcookie_head.next, + struct rds_msg_zcopy_info, rs_zcookie_next); + list_del(&info->rs_zcookie_next); + } + spin_unlock_irqrestore(&q->lock, flags); + if (!info) return false; - done = (struct rds_zcopy_cookies *)skb->cb; + done = &info->zcookies; if (put_cmsg(msg, SOL_RDS, RDS_CMSG_ZCOPY_COMPLETION, sizeof(*done), done)) { - skb_queue_head(q, skb); + spin_lock_irqsave(&q->lock, flags); + list_add(&info->rs_zcookie_next, &q->zcookie_head); + spin_unlock_irqrestore(&q->lock, flags); return false; } - consume_skb(skb); + kfree(info); return true; } -- cgit v1.2.3