summaryrefslogtreecommitdiff
path: root/net/kcm
diff options
context:
space:
mode:
Diffstat (limited to 'net/kcm')
-rw-r--r--net/kcm/Kconfig1
-rw-r--r--net/kcm/kcmproc.c58
-rw-r--r--net/kcm/kcmsock.c483
3 files changed, 135 insertions, 407 deletions
diff --git a/net/kcm/Kconfig b/net/kcm/Kconfig
index 5db94d940ecc..87fca36e6c47 100644
--- a/net/kcm/Kconfig
+++ b/net/kcm/Kconfig
@@ -3,6 +3,7 @@ config AF_KCM
tristate "KCM sockets"
depends on INET
select BPF_SYSCALL
+ select STREAM_PARSER
---help---
KCM (Kernel Connection Multiplexor) sockets provide a method
for multiplexing messages of a message based application
diff --git a/net/kcm/kcmproc.c b/net/kcm/kcmproc.c
index 16c2e03bd388..bf75c9231cca 100644
--- a/net/kcm/kcmproc.c
+++ b/net/kcm/kcmproc.c
@@ -155,8 +155,8 @@ static void kcm_format_psock(struct kcm_psock *psock, struct seq_file *seq,
seq_printf(seq,
" psock-%-5u %-10llu %-16llu %-10llu %-16llu %-8d %-8d %-8d %-8d ",
psock->index,
- psock->stats.rx_msgs,
- psock->stats.rx_bytes,
+ psock->strp.stats.rx_msgs,
+ psock->strp.stats.rx_bytes,
psock->stats.tx_msgs,
psock->stats.tx_bytes,
psock->sk->sk_receive_queue.qlen,
@@ -170,14 +170,27 @@ static void kcm_format_psock(struct kcm_psock *psock, struct seq_file *seq,
if (psock->tx_stopped)
seq_puts(seq, "TxStop ");
- if (psock->rx_stopped)
+ if (psock->strp.rx_stopped)
seq_puts(seq, "RxStop ");
if (psock->tx_kcm)
seq_printf(seq, "Rsvd-%d ", psock->tx_kcm->index);
- if (psock->ready_rx_msg)
- seq_puts(seq, "RdyRx ");
+ if (!psock->strp.rx_paused && !psock->ready_rx_msg) {
+ if (psock->sk->sk_receive_queue.qlen) {
+ if (psock->strp.rx_need_bytes)
+ seq_printf(seq, "RxWait=%u ",
+ psock->strp.rx_need_bytes);
+ else
+ seq_printf(seq, "RxWait ");
+ }
+ } else {
+ if (psock->strp.rx_paused)
+ seq_puts(seq, "RxPause ");
+
+ if (psock->ready_rx_msg)
+ seq_puts(seq, "RdyRx ");
+ }
seq_puts(seq, "\n");
}
@@ -275,6 +288,7 @@ static int kcm_stats_seq_show(struct seq_file *seq, void *v)
{
struct kcm_psock_stats psock_stats;
struct kcm_mux_stats mux_stats;
+ struct strp_aggr_stats strp_stats;
struct kcm_mux *mux;
struct kcm_psock *psock;
struct net *net = seq->private;
@@ -282,20 +296,28 @@ static int kcm_stats_seq_show(struct seq_file *seq, void *v)
memset(&mux_stats, 0, sizeof(mux_stats));
memset(&psock_stats, 0, sizeof(psock_stats));
+ memset(&strp_stats, 0, sizeof(strp_stats));
mutex_lock(&knet->mutex);
aggregate_mux_stats(&knet->aggregate_mux_stats, &mux_stats);
aggregate_psock_stats(&knet->aggregate_psock_stats,
&psock_stats);
+ aggregate_strp_stats(&knet->aggregate_strp_stats,
+ &strp_stats);
list_for_each_entry_rcu(mux, &knet->mux_list, kcm_mux_list) {
spin_lock_bh(&mux->lock);
aggregate_mux_stats(&mux->stats, &mux_stats);
aggregate_psock_stats(&mux->aggregate_psock_stats,
&psock_stats);
- list_for_each_entry(psock, &mux->psocks, psock_list)
+ aggregate_strp_stats(&mux->aggregate_strp_stats,
+ &strp_stats);
+ list_for_each_entry(psock, &mux->psocks, psock_list) {
aggregate_psock_stats(&psock->stats, &psock_stats);
+ save_strp_stats(&psock->strp, &strp_stats);
+ }
+
spin_unlock_bh(&mux->lock);
}
@@ -328,7 +350,7 @@ static int kcm_stats_seq_show(struct seq_file *seq, void *v)
mux_stats.rx_ready_drops);
seq_printf(seq,
- "%-8s %-10s %-16s %-10s %-16s %-10s %-10s %-10s %-10s %-10s %-10s %-10s %-10s %-10s\n",
+ "%-8s %-10s %-16s %-10s %-16s %-10s %-10s %-10s %-10s %-10s %-10s %-10s %-10s %-10s %-10s %-10s\n",
"Psock",
"RX-Msgs",
"RX-Bytes",
@@ -337,6 +359,8 @@ static int kcm_stats_seq_show(struct seq_file *seq, void *v)
"Reserved",
"Unreserved",
"RX-Aborts",
+ "RX-Intr",
+ "RX-Unrecov",
"RX-MemFail",
"RX-NeedMor",
"RX-BadLen",
@@ -345,20 +369,22 @@ static int kcm_stats_seq_show(struct seq_file *seq, void *v)
"TX-Aborts");
seq_printf(seq,
- "%-8s %-10llu %-16llu %-10llu %-16llu %-10llu %-10llu %-10u %-10u %-10u %-10u %-10u %-10u %-10u\n",
+ "%-8s %-10llu %-16llu %-10llu %-16llu %-10llu %-10llu %-10u %-10u %-10u %-10u %-10u %-10u %-10u %-10u %-10u\n",
"",
- psock_stats.rx_msgs,
- psock_stats.rx_bytes,
+ strp_stats.rx_msgs,
+ strp_stats.rx_bytes,
psock_stats.tx_msgs,
psock_stats.tx_bytes,
psock_stats.reserved,
psock_stats.unreserved,
- psock_stats.rx_aborts,
- psock_stats.rx_mem_fail,
- psock_stats.rx_need_more_hdr,
- psock_stats.rx_bad_hdr_len,
- psock_stats.rx_msg_too_big,
- psock_stats.rx_msg_timeouts,
+ strp_stats.rx_aborts,
+ strp_stats.rx_interrupted,
+ strp_stats.rx_unrecov_intr,
+ strp_stats.rx_mem_fail,
+ strp_stats.rx_need_more_hdr,
+ strp_stats.rx_bad_hdr_len,
+ strp_stats.rx_msg_too_big,
+ strp_stats.rx_msg_timeouts,
psock_stats.tx_aborts);
return 0;
diff --git a/net/kcm/kcmsock.c b/net/kcm/kcmsock.c
index 8a720ba50061..7e08a4d3d77d 100644
--- a/net/kcm/kcmsock.c
+++ b/net/kcm/kcmsock.c
@@ -1,3 +1,13 @@
+/*
+ * Kernel Connection Multiplexor
+ *
+ * Copyright (c) 2016 Tom Herbert <tom@herbertland.com>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2
+ * as published by the Free Software Foundation.
+ */
+
#include <linux/bpf.h>
#include <linux/errno.h>
#include <linux/errqueue.h>
@@ -17,7 +27,6 @@
#include <net/kcm.h>
#include <net/netns/generic.h>
#include <net/sock.h>
-#include <net/tcp.h>
#include <uapi/linux/kcm.h>
unsigned int kcm_net_id;
@@ -36,38 +45,12 @@ static inline struct kcm_tx_msg *kcm_tx_msg(struct sk_buff *skb)
return (struct kcm_tx_msg *)skb->cb;
}
-static inline struct kcm_rx_msg *kcm_rx_msg(struct sk_buff *skb)
-{
- return (struct kcm_rx_msg *)((void *)skb->cb +
- offsetof(struct qdisc_skb_cb, data));
-}
-
static void report_csk_error(struct sock *csk, int err)
{
csk->sk_err = EPIPE;
csk->sk_error_report(csk);
}
-/* Callback lock held */
-static void kcm_abort_rx_psock(struct kcm_psock *psock, int err,
- struct sk_buff *skb)
-{
- struct sock *csk = psock->sk;
-
- /* Unrecoverable error in receive */
-
- del_timer(&psock->rx_msg_timer);
-
- if (psock->rx_stopped)
- return;
-
- psock->rx_stopped = 1;
- KCM_STATS_INCR(psock->stats.rx_aborts);
-
- /* Report an error on the lower socket */
- report_csk_error(csk, err);
-}
-
static void kcm_abort_tx_psock(struct kcm_psock *psock, int err,
bool wakeup_kcm)
{
@@ -110,12 +93,13 @@ static void kcm_abort_tx_psock(struct kcm_psock *psock, int err,
static void kcm_update_rx_mux_stats(struct kcm_mux *mux,
struct kcm_psock *psock)
{
- KCM_STATS_ADD(mux->stats.rx_bytes,
- psock->stats.rx_bytes - psock->saved_rx_bytes);
+ STRP_STATS_ADD(mux->stats.rx_bytes,
+ psock->strp.stats.rx_bytes -
+ psock->saved_rx_bytes);
mux->stats.rx_msgs +=
- psock->stats.rx_msgs - psock->saved_rx_msgs;
- psock->saved_rx_msgs = psock->stats.rx_msgs;
- psock->saved_rx_bytes = psock->stats.rx_bytes;
+ psock->strp.stats.rx_msgs - psock->saved_rx_msgs;
+ psock->saved_rx_msgs = psock->strp.stats.rx_msgs;
+ psock->saved_rx_bytes = psock->strp.stats.rx_bytes;
}
static void kcm_update_tx_mux_stats(struct kcm_mux *mux,
@@ -168,11 +152,11 @@ static void kcm_rcv_ready(struct kcm_sock *kcm)
*/
list_del(&psock->psock_ready_list);
psock->ready_rx_msg = NULL;
-
/* Commit clearing of ready_rx_msg for queuing work */
smp_mb();
- queue_work(kcm_wq, &psock->rx_work);
+ strp_unpause(&psock->strp);
+ strp_check_rcv(&psock->strp);
}
/* Buffer limit is okay now, add to ready list */
@@ -286,6 +270,7 @@ static struct kcm_sock *reserve_rx_kcm(struct kcm_psock *psock,
if (list_empty(&mux->kcm_rx_waiters)) {
psock->ready_rx_msg = head;
+ strp_pause(&psock->strp);
list_add_tail(&psock->psock_ready_list,
&mux->psocks_ready);
spin_unlock_bh(&mux->rx_lock);
@@ -354,346 +339,60 @@ static void unreserve_rx_kcm(struct kcm_psock *psock,
spin_unlock_bh(&mux->rx_lock);
}
-static void kcm_start_rx_timer(struct kcm_psock *psock)
-{
- if (psock->sk->sk_rcvtimeo)
- mod_timer(&psock->rx_msg_timer, psock->sk->sk_rcvtimeo);
-}
-
-/* Macro to invoke filter function. */
-#define KCM_RUN_FILTER(prog, ctx) \
- (*prog->bpf_func)(ctx, prog->insnsi)
-
-/* Lower socket lock held */
-static int kcm_tcp_recv(read_descriptor_t *desc, struct sk_buff *orig_skb,
- unsigned int orig_offset, size_t orig_len)
-{
- struct kcm_psock *psock = (struct kcm_psock *)desc->arg.data;
- struct kcm_rx_msg *rxm;
- struct kcm_sock *kcm;
- struct sk_buff *head, *skb;
- size_t eaten = 0, cand_len;
- ssize_t extra;
- int err;
- bool cloned_orig = false;
-
- if (psock->ready_rx_msg)
- return 0;
-
- head = psock->rx_skb_head;
- if (head) {
- /* Message already in progress */
-
- rxm = kcm_rx_msg(head);
- if (unlikely(rxm->early_eaten)) {
- /* Already some number of bytes on the receive sock
- * data saved in rx_skb_head, just indicate they
- * are consumed.
- */
- eaten = orig_len <= rxm->early_eaten ?
- orig_len : rxm->early_eaten;
- rxm->early_eaten -= eaten;
-
- return eaten;
- }
-
- if (unlikely(orig_offset)) {
- /* Getting data with a non-zero offset when a message is
- * in progress is not expected. If it does happen, we
- * need to clone and pull since we can't deal with
- * offsets in the skbs for a message expect in the head.
- */
- orig_skb = skb_clone(orig_skb, GFP_ATOMIC);
- if (!orig_skb) {
- KCM_STATS_INCR(psock->stats.rx_mem_fail);
- desc->error = -ENOMEM;
- return 0;
- }
- if (!pskb_pull(orig_skb, orig_offset)) {
- KCM_STATS_INCR(psock->stats.rx_mem_fail);
- kfree_skb(orig_skb);
- desc->error = -ENOMEM;
- return 0;
- }
- cloned_orig = true;
- orig_offset = 0;
- }
-
- if (!psock->rx_skb_nextp) {
- /* We are going to append to the frags_list of head.
- * Need to unshare the frag_list.
- */
- err = skb_unclone(head, GFP_ATOMIC);
- if (err) {
- KCM_STATS_INCR(psock->stats.rx_mem_fail);
- desc->error = err;
- return 0;
- }
-
- if (unlikely(skb_shinfo(head)->frag_list)) {
- /* We can't append to an sk_buff that already
- * has a frag_list. We create a new head, point
- * the frag_list of that to the old head, and
- * then are able to use the old head->next for
- * appending to the message.
- */
- if (WARN_ON(head->next)) {
- desc->error = -EINVAL;
- return 0;
- }
-
- skb = alloc_skb(0, GFP_ATOMIC);
- if (!skb) {
- KCM_STATS_INCR(psock->stats.rx_mem_fail);
- desc->error = -ENOMEM;
- return 0;
- }
- skb->len = head->len;
- skb->data_len = head->len;
- skb->truesize = head->truesize;
- *kcm_rx_msg(skb) = *kcm_rx_msg(head);
- psock->rx_skb_nextp = &head->next;
- skb_shinfo(skb)->frag_list = head;
- psock->rx_skb_head = skb;
- head = skb;
- } else {
- psock->rx_skb_nextp =
- &skb_shinfo(head)->frag_list;
- }
- }
- }
-
- while (eaten < orig_len) {
- /* Always clone since we will consume something */
- skb = skb_clone(orig_skb, GFP_ATOMIC);
- if (!skb) {
- KCM_STATS_INCR(psock->stats.rx_mem_fail);
- desc->error = -ENOMEM;
- break;
- }
-
- cand_len = orig_len - eaten;
-
- head = psock->rx_skb_head;
- if (!head) {
- head = skb;
- psock->rx_skb_head = head;
- /* Will set rx_skb_nextp on next packet if needed */
- psock->rx_skb_nextp = NULL;
- rxm = kcm_rx_msg(head);
- memset(rxm, 0, sizeof(*rxm));
- rxm->offset = orig_offset + eaten;
- } else {
- /* Unclone since we may be appending to an skb that we
- * already share a frag_list with.
- */
- err = skb_unclone(skb, GFP_ATOMIC);
- if (err) {
- KCM_STATS_INCR(psock->stats.rx_mem_fail);
- desc->error = err;
- break;
- }
-
- rxm = kcm_rx_msg(head);
- *psock->rx_skb_nextp = skb;
- psock->rx_skb_nextp = &skb->next;
- head->data_len += skb->len;
- head->len += skb->len;
- head->truesize += skb->truesize;
- }
-
- if (!rxm->full_len) {
- ssize_t len;
-
- len = KCM_RUN_FILTER(psock->bpf_prog, head);
-
- if (!len) {
- /* Need more header to determine length */
- if (!rxm->accum_len) {
- /* Start RX timer for new message */
- kcm_start_rx_timer(psock);
- }
- rxm->accum_len += cand_len;
- eaten += cand_len;
- KCM_STATS_INCR(psock->stats.rx_need_more_hdr);
- WARN_ON(eaten != orig_len);
- break;
- } else if (len > psock->sk->sk_rcvbuf) {
- /* Message length exceeds maximum allowed */
- KCM_STATS_INCR(psock->stats.rx_msg_too_big);
- desc->error = -EMSGSIZE;
- psock->rx_skb_head = NULL;
- kcm_abort_rx_psock(psock, EMSGSIZE, head);
- break;
- } else if (len <= (ssize_t)head->len -
- skb->len - rxm->offset) {
- /* Length must be into new skb (and also
- * greater than zero)
- */
- KCM_STATS_INCR(psock->stats.rx_bad_hdr_len);
- desc->error = -EPROTO;
- psock->rx_skb_head = NULL;
- kcm_abort_rx_psock(psock, EPROTO, head);
- break;
- }
-
- rxm->full_len = len;
- }
-
- extra = (ssize_t)(rxm->accum_len + cand_len) - rxm->full_len;
-
- if (extra < 0) {
- /* Message not complete yet. */
- if (rxm->full_len - rxm->accum_len >
- tcp_inq(psock->sk)) {
- /* Don't have the whole messages in the socket
- * buffer. Set psock->rx_need_bytes to wait for
- * the rest of the message. Also, set "early
- * eaten" since we've already buffered the skb
- * but don't consume yet per tcp_read_sock.
- */
-
- if (!rxm->accum_len) {
- /* Start RX timer for new message */
- kcm_start_rx_timer(psock);
- }
-
- psock->rx_need_bytes = rxm->full_len -
- rxm->accum_len;
- rxm->accum_len += cand_len;
- rxm->early_eaten = cand_len;
- KCM_STATS_ADD(psock->stats.rx_bytes, cand_len);
- desc->count = 0; /* Stop reading socket */
- break;
- }
- rxm->accum_len += cand_len;
- eaten += cand_len;
- WARN_ON(eaten != orig_len);
- break;
- }
-
- /* Positive extra indicates ore bytes than needed for the
- * message
- */
-
- WARN_ON(extra > cand_len);
-
- eaten += (cand_len - extra);
-
- /* Hurray, we have a new message! */
- del_timer(&psock->rx_msg_timer);
- psock->rx_skb_head = NULL;
- KCM_STATS_INCR(psock->stats.rx_msgs);
-
-try_queue:
- kcm = reserve_rx_kcm(psock, head);
- if (!kcm) {
- /* Unable to reserve a KCM, message is held in psock. */
- break;
- }
-
- if (kcm_queue_rcv_skb(&kcm->sk, head)) {
- /* Should mean socket buffer full */
- unreserve_rx_kcm(psock, false);
- goto try_queue;
- }
- }
-
- if (cloned_orig)
- kfree_skb(orig_skb);
-
- KCM_STATS_ADD(psock->stats.rx_bytes, eaten);
-
- return eaten;
-}
-
-/* Called with lock held on lower socket */
-static int psock_tcp_read_sock(struct kcm_psock *psock)
-{
- read_descriptor_t desc;
-
- desc.arg.data = psock;
- desc.error = 0;
- desc.count = 1; /* give more than one skb per call */
-
- /* sk should be locked here, so okay to do tcp_read_sock */
- tcp_read_sock(psock->sk, &desc, kcm_tcp_recv);
-
- unreserve_rx_kcm(psock, true);
-
- return desc.error;
-}
-
/* Lower sock lock held */
-static void psock_tcp_data_ready(struct sock *sk)
+static void psock_data_ready(struct sock *sk)
{
struct kcm_psock *psock;
read_lock_bh(&sk->sk_callback_lock);
psock = (struct kcm_psock *)sk->sk_user_data;
- if (unlikely(!psock || psock->rx_stopped))
- goto out;
+ if (likely(psock))
+ strp_data_ready(&psock->strp);
- if (psock->ready_rx_msg)
- goto out;
-
- if (psock->rx_need_bytes) {
- if (tcp_inq(sk) >= psock->rx_need_bytes)
- psock->rx_need_bytes = 0;
- else
- goto out;
- }
-
- if (psock_tcp_read_sock(psock) == -ENOMEM)
- queue_delayed_work(kcm_wq, &psock->rx_delayed_work, 0);
-
-out:
read_unlock_bh(&sk->sk_callback_lock);
}
-static void do_psock_rx_work(struct kcm_psock *psock)
+/* Called with lower sock held */
+static void kcm_rcv_strparser(struct strparser *strp, struct sk_buff *skb)
{
- read_descriptor_t rd_desc;
- struct sock *csk = psock->sk;
-
- /* We need the read lock to synchronize with psock_tcp_data_ready. We
- * need the socket lock for calling tcp_read_sock.
- */
- lock_sock(csk);
- read_lock_bh(&csk->sk_callback_lock);
-
- if (unlikely(csk->sk_user_data != psock))
- goto out;
-
- if (unlikely(psock->rx_stopped))
- goto out;
-
- if (psock->ready_rx_msg)
- goto out;
-
- rd_desc.arg.data = psock;
+ struct kcm_psock *psock = container_of(strp, struct kcm_psock, strp);
+ struct kcm_sock *kcm;
- if (psock_tcp_read_sock(psock) == -ENOMEM)
- queue_delayed_work(kcm_wq, &psock->rx_delayed_work, 0);
+try_queue:
+ kcm = reserve_rx_kcm(psock, skb);
+ if (!kcm) {
+ /* Unable to reserve a KCM, message is held in psock and strp
+ * is paused.
+ */
+ return;
+ }
-out:
- read_unlock_bh(&csk->sk_callback_lock);
- release_sock(csk);
+ if (kcm_queue_rcv_skb(&kcm->sk, skb)) {
+ /* Should mean socket buffer full */
+ unreserve_rx_kcm(psock, false);
+ goto try_queue;
+ }
}
-static void psock_rx_work(struct work_struct *w)
+static int kcm_parse_func_strparser(struct strparser *strp, struct sk_buff *skb)
{
- do_psock_rx_work(container_of(w, struct kcm_psock, rx_work));
+ struct kcm_psock *psock = container_of(strp, struct kcm_psock, strp);
+ struct bpf_prog *prog = psock->bpf_prog;
+
+ return (*prog->bpf_func)(skb, prog->insnsi);
}
-static void psock_rx_delayed_work(struct work_struct *w)
+static int kcm_read_sock_done(struct strparser *strp, int err)
{
- do_psock_rx_work(container_of(w, struct kcm_psock,
- rx_delayed_work.work));
+ struct kcm_psock *psock = container_of(strp, struct kcm_psock, strp);
+
+ unreserve_rx_kcm(psock, true);
+
+ return err;
}
-static void psock_tcp_state_change(struct sock *sk)
+static void psock_state_change(struct sock *sk)
{
/* TCP only does a POLLIN for a half close. Do a POLLHUP here
* since application will normally not poll with POLLIN
@@ -703,7 +402,7 @@ static void psock_tcp_state_change(struct sock *sk)
report_csk_error(sk, EPIPE);
}
-static void psock_tcp_write_space(struct sock *sk)
+static void psock_write_space(struct sock *sk)
{
struct kcm_psock *psock;
struct kcm_mux *mux;
@@ -714,14 +413,13 @@ static void psock_tcp_write_space(struct sock *sk)
psock = (struct kcm_psock *)sk->sk_user_data;
if (unlikely(!psock))
goto out;
-
mux = psock->mux;
spin_lock_bh(&mux->lock);
/* Check if the socket is reserved so someone is waiting for sending. */
kcm = psock->tx_kcm;
- if (kcm)
+ if (kcm && !unlikely(kcm->tx_stopped))
queue_work(kcm_wq, &kcm->tx_work);
spin_unlock_bh(&mux->lock);
@@ -1412,7 +1110,7 @@ static int kcm_recvmsg(struct socket *sock, struct msghdr *msg,
struct kcm_sock *kcm = kcm_sk(sk);
int err = 0;
long timeo;
- struct kcm_rx_msg *rxm;
+ struct strp_rx_msg *rxm;
int copied = 0;
struct sk_buff *skb;
@@ -1426,7 +1124,7 @@ static int kcm_recvmsg(struct socket *sock, struct msghdr *msg,
/* Okay, have a message on the receive queue */
- rxm = kcm_rx_msg(skb);
+ rxm = strp_rx_msg(skb);
if (len > rxm->full_len)
len = rxm->full_len;
@@ -1469,7 +1167,7 @@ static ssize_t kcm_splice_read(struct socket *sock, loff_t *ppos,
struct sock *sk = sock->sk;
struct kcm_sock *kcm = kcm_sk(sk);
long timeo;
- struct kcm_rx_msg *rxm;
+ struct strp_rx_msg *rxm;
int err = 0;
ssize_t copied;
struct sk_buff *skb;
@@ -1486,7 +1184,7 @@ static ssize_t kcm_splice_read(struct socket *sock, loff_t *ppos,
/* Okay, have a message on the receive queue */
- rxm = kcm_rx_msg(skb);
+ rxm = strp_rx_msg(skb);
if (len > rxm->full_len)
len = rxm->full_len;
@@ -1661,15 +1359,6 @@ static void init_kcm_sock(struct kcm_sock *kcm, struct kcm_mux *mux)
spin_unlock_bh(&mux->rx_lock);
}
-static void kcm_rx_msg_timeout(unsigned long arg)
-{
- struct kcm_psock *psock = (struct kcm_psock *)arg;
-
- /* Message assembly timed out */
- KCM_STATS_INCR(psock->stats.rx_msg_timeouts);
- kcm_abort_rx_psock(psock, ETIMEDOUT, NULL);
-}
-
static int kcm_attach(struct socket *sock, struct socket *csock,
struct bpf_prog *prog)
{
@@ -1679,19 +1368,13 @@ static int kcm_attach(struct socket *sock, struct socket *csock,
struct kcm_psock *psock = NULL, *tpsock;
struct list_head *head;
int index = 0;
-
- if (csock->ops->family != PF_INET &&
- csock->ops->family != PF_INET6)
- return -EINVAL;
+ struct strp_callbacks cb;
+ int err;
csk = csock->sk;
if (!csk)
return -EINVAL;
- /* Only support TCP for now */
- if (csk->sk_protocol != IPPROTO_TCP)
- return -EINVAL;
-
psock = kmem_cache_zalloc(kcm_psockp, GFP_KERNEL);
if (!psock)
return -ENOMEM;
@@ -1700,11 +1383,16 @@ static int kcm_attach(struct socket *sock, struct socket *csock,
psock->sk = csk;
psock->bpf_prog = prog;
- setup_timer(&psock->rx_msg_timer, kcm_rx_msg_timeout,
- (unsigned long)psock);
+ cb.rcv_msg = kcm_rcv_strparser;
+ cb.abort_parser = NULL;
+ cb.parse_msg = kcm_parse_func_strparser;
+ cb.read_sock_done = kcm_read_sock_done;
- INIT_WORK(&psock->rx_work, psock_rx_work);
- INIT_DELAYED_WORK(&psock->rx_delayed_work, psock_rx_delayed_work);
+ err = strp_init(&psock->strp, csk, &cb);
+ if (err) {
+ kmem_cache_free(kcm_psockp, psock);
+ return err;
+ }
sock_hold(csk);
@@ -1713,9 +1401,9 @@ static int kcm_attach(struct socket *sock, struct socket *csock,
psock->save_write_space = csk->sk_write_space;
psock->save_state_change = csk->sk_state_change;
csk->sk_user_data = psock;
- csk->sk_data_ready = psock_tcp_data_ready;
- csk->sk_write_space = psock_tcp_write_space;
- csk->sk_state_change = psock_tcp_state_change;
+ csk->sk_data_ready = psock_data_ready;
+ csk->sk_write_space = psock_write_space;
+ csk->sk_state_change = psock_state_change;
write_unlock_bh(&csk->sk_callback_lock);
/* Finished initialization, now add the psock to the MUX. */
@@ -1737,7 +1425,7 @@ static int kcm_attach(struct socket *sock, struct socket *csock,
spin_unlock_bh(&mux->lock);
/* Schedule RX work in case there are already bytes queued */
- queue_work(kcm_wq, &psock->rx_work);
+ strp_check_rcv(&psock->strp);
return 0;
}
@@ -1777,6 +1465,8 @@ static void kcm_unattach(struct kcm_psock *psock)
struct sock *csk = psock->sk;
struct kcm_mux *mux = psock->mux;
+ lock_sock(csk);
+
/* Stop getting callbacks from TCP socket. After this there should
* be no way to reserve a kcm for this psock.
*/
@@ -1785,7 +1475,7 @@ static void kcm_unattach(struct kcm_psock *psock)
csk->sk_data_ready = psock->save_data_ready;
csk->sk_write_space = psock->save_write_space;
csk->sk_state_change = psock->save_state_change;
- psock->rx_stopped = 1;
+ strp_stop(&psock->strp);
if (WARN_ON(psock->rx_kcm)) {
write_unlock_bh(&csk->sk_callback_lock);
@@ -1808,18 +1498,17 @@ static void kcm_unattach(struct kcm_psock *psock)
write_unlock_bh(&csk->sk_callback_lock);
- del_timer_sync(&psock->rx_msg_timer);
- cancel_work_sync(&psock->rx_work);
- cancel_delayed_work_sync(&psock->rx_delayed_work);
+ /* Call strp_done without sock lock */
+ release_sock(csk);
+ strp_done(&psock->strp);
+ lock_sock(csk);
bpf_prog_put(psock->bpf_prog);
- kfree_skb(psock->rx_skb_head);
- psock->rx_skb_head = NULL;
-
spin_lock_bh(&mux->lock);
aggregate_psock_stats(&psock->stats, &mux->aggregate_psock_stats);
+ save_strp_stats(&psock->strp, &mux->aggregate_strp_stats);
KCM_STATS_INCR(mux->stats.psock_unattach);
@@ -1862,6 +1551,8 @@ no_reserved:
fput(csk->sk_socket->file);
kmem_cache_free(kcm_psockp, psock);
}
+
+ release_sock(csk);
}
static int kcm_unattach_ioctl(struct socket *sock, struct kcm_unattach *info)
@@ -1902,6 +1593,7 @@ static int kcm_unattach_ioctl(struct socket *sock, struct kcm_unattach *info)
spin_unlock_bh(&mux->lock);
+ /* Lower socket lock should already be held */
kcm_unattach(psock);
err = 0;
@@ -2059,6 +1751,8 @@ static void release_mux(struct kcm_mux *mux)
aggregate_mux_stats(&mux->stats, &knet->aggregate_mux_stats);
aggregate_psock_stats(&mux->aggregate_psock_stats,
&knet->aggregate_psock_stats);
+ aggregate_strp_stats(&mux->aggregate_strp_stats,
+ &knet->aggregate_strp_stats);
list_del_rcu(&mux->kcm_mux_list);
knet->count--;
mutex_unlock(&knet->mutex);
@@ -2138,6 +1832,13 @@ static int kcm_release(struct socket *sock)
* it will just return.
*/
__skb_queue_purge(&sk->sk_write_queue);
+
+ /* Set tx_stopped. This is checked when psock is bound to a kcm and we
+ * get a writespace callback. This prevents further work being queued
+ * from the callback (unbinding the psock occurs after canceling work.
+ */
+ kcm->tx_stopped = 1;
+
release_sock(sk);
spin_lock_bh(&mux->lock);