summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/linux/udp.h9
-rw-r--r--include/net/udp.h11
-rw-r--r--net/ipv4/udp.c117
-rw-r--r--net/ipv6/udp.c5
4 files changed, 91 insertions, 51 deletions
diff --git a/include/linux/udp.h b/include/linux/udp.h
index e554890c4415..58795688a186 100644
--- a/include/linux/udp.h
+++ b/include/linux/udp.h
@@ -44,6 +44,12 @@ enum {
UDP_FLAGS_UDPLITE_RECV_CC, /* set via udplite setsockopt */
};
+/* per NUMA structure for lockless producer usage. */
+struct udp_prod_queue {
+ struct llist_head ll_root ____cacheline_aligned_in_smp;
+ atomic_t rmem_alloc;
+};
+
struct udp_sock {
/* inet_sock has to be the first member */
struct inet_sock inet;
@@ -90,6 +96,8 @@ struct udp_sock {
struct sk_buff *skb,
int nhoff);
+ struct udp_prod_queue *udp_prod_queue;
+
/* udp_recvmsg try to use this before splicing sk_receive_queue */
struct sk_buff_head reader_queue ____cacheline_aligned_in_smp;
@@ -109,7 +117,6 @@ struct udp_sock {
*/
struct hlist_node tunnel_list;
struct numa_drop_counters drop_counters;
- spinlock_t busylock ____cacheline_aligned_in_smp;
};
#define udp_test_bit(nr, sk) \
diff --git a/include/net/udp.h b/include/net/udp.h
index 059a0cee5f55..cffedb3e40f2 100644
--- a/include/net/udp.h
+++ b/include/net/udp.h
@@ -284,16 +284,23 @@ INDIRECT_CALLABLE_DECLARE(int udpv6_rcv(struct sk_buff *));
struct sk_buff *__udp_gso_segment(struct sk_buff *gso_skb,
netdev_features_t features, bool is_ipv6);
-static inline void udp_lib_init_sock(struct sock *sk)
+static inline int udp_lib_init_sock(struct sock *sk)
{
struct udp_sock *up = udp_sk(sk);
sk->sk_drop_counters = &up->drop_counters;
- spin_lock_init(&up->busylock);
skb_queue_head_init(&up->reader_queue);
INIT_HLIST_NODE(&up->tunnel_list);
up->forward_threshold = sk->sk_rcvbuf >> 2;
set_bit(SOCK_CUSTOM_SOCKOPT, &sk->sk_socket->flags);
+
+ up->udp_prod_queue = kcalloc(nr_node_ids, sizeof(*up->udp_prod_queue),
+ GFP_KERNEL);
+ if (!up->udp_prod_queue)
+ return -ENOMEM;
+ for (int i = 0; i < nr_node_ids; i++)
+ init_llist_head(&up->udp_prod_queue[i].ll_root);
+ return 0;
}
static inline void udp_drops_inc(struct sock *sk)
diff --git a/net/ipv4/udp.c b/net/ipv4/udp.c
index 85cfc32eb2cc..95241093b7f0 100644
--- a/net/ipv4/udp.c
+++ b/net/ipv4/udp.c
@@ -1685,25 +1685,6 @@ static void udp_skb_dtor_locked(struct sock *sk, struct sk_buff *skb)
udp_rmem_release(sk, udp_skb_truesize(skb), 1, true);
}
-/* Idea of busylocks is to let producers grab an extra spinlock
- * to relieve pressure on the receive_queue spinlock shared by consumer.
- * Under flood, this means that only one producer can be in line
- * trying to acquire the receive_queue spinlock.
- */
-static spinlock_t *busylock_acquire(struct sock *sk)
-{
- spinlock_t *busy = &udp_sk(sk)->busylock;
-
- spin_lock(busy);
- return busy;
-}
-
-static void busylock_release(spinlock_t *busy)
-{
- if (busy)
- spin_unlock(busy);
-}
-
static int udp_rmem_schedule(struct sock *sk, int size)
{
int delta;
@@ -1718,14 +1699,24 @@ static int udp_rmem_schedule(struct sock *sk, int size)
int __udp_enqueue_schedule_skb(struct sock *sk, struct sk_buff *skb)
{
struct sk_buff_head *list = &sk->sk_receive_queue;
+ struct udp_prod_queue *udp_prod_queue;
+ struct sk_buff *next, *to_drop = NULL;
+ struct llist_node *ll_list;
unsigned int rmem, rcvbuf;
- spinlock_t *busy = NULL;
int size, err = -ENOMEM;
+ int total_size = 0;
+ int q_size = 0;
+ int dropcount;
+ int nb = 0;
rmem = atomic_read(&sk->sk_rmem_alloc);
rcvbuf = READ_ONCE(sk->sk_rcvbuf);
size = skb->truesize;
+ udp_prod_queue = &udp_sk(sk)->udp_prod_queue[numa_node_id()];
+
+ rmem += atomic_read(&udp_prod_queue->rmem_alloc);
+
/* Immediately drop when the receive queue is full.
* Cast to unsigned int performs the boundary check for INT_MAX.
*/
@@ -1747,45 +1738,77 @@ int __udp_enqueue_schedule_skb(struct sock *sk, struct sk_buff *skb)
if (rmem > (rcvbuf >> 1)) {
skb_condense(skb);
size = skb->truesize;
- rmem = atomic_add_return(size, &sk->sk_rmem_alloc);
- if (rmem > rcvbuf)
- goto uncharge_drop;
- busy = busylock_acquire(sk);
- } else {
- atomic_add(size, &sk->sk_rmem_alloc);
}
udp_set_dev_scratch(skb);
+ atomic_add(size, &udp_prod_queue->rmem_alloc);
+
+ if (!llist_add(&skb->ll_node, &udp_prod_queue->ll_root))
+ return 0;
+
+ dropcount = sock_flag(sk, SOCK_RXQ_OVFL) ? sk_drops_read(sk) : 0;
+
spin_lock(&list->lock);
- err = udp_rmem_schedule(sk, size);
- if (err) {
- spin_unlock(&list->lock);
- goto uncharge_drop;
- }
- sk_forward_alloc_add(sk, -size);
+ ll_list = llist_del_all(&udp_prod_queue->ll_root);
- /* no need to setup a destructor, we will explicitly release the
- * forward allocated memory on dequeue
- */
- sock_skb_set_dropcount(sk, skb);
+ ll_list = llist_reverse_order(ll_list);
+
+ llist_for_each_entry_safe(skb, next, ll_list, ll_node) {
+ size = udp_skb_truesize(skb);
+ total_size += size;
+ err = udp_rmem_schedule(sk, size);
+ if (unlikely(err)) {
+ /* Free the skbs outside of locked section. */
+ skb->next = to_drop;
+ to_drop = skb;
+ continue;
+ }
+
+ q_size += size;
+ sk_forward_alloc_add(sk, -size);
+
+ /* no need to setup a destructor, we will explicitly release the
+ * forward allocated memory on dequeue
+ */
+ SOCK_SKB_CB(skb)->dropcount = dropcount;
+ nb++;
+ __skb_queue_tail(list, skb);
+ }
+
+ atomic_add(q_size, &sk->sk_rmem_alloc);
- __skb_queue_tail(list, skb);
spin_unlock(&list->lock);
- if (!sock_flag(sk, SOCK_DEAD))
- INDIRECT_CALL_1(sk->sk_data_ready, sock_def_readable, sk);
+ if (!sock_flag(sk, SOCK_DEAD)) {
+ /* Multiple threads might be blocked in recvmsg(),
+ * using prepare_to_wait_exclusive().
+ */
+ while (nb) {
+ INDIRECT_CALL_1(sk->sk_data_ready,
+ sock_def_readable, sk);
+ nb--;
+ }
+ }
+
+ if (unlikely(to_drop)) {
+ for (nb = 0; to_drop != NULL; nb++) {
+ skb = to_drop;
+ to_drop = skb->next;
+ skb_mark_not_on_list(skb);
+ /* TODO: update SNMP values. */
+ sk_skb_reason_drop(sk, skb, SKB_DROP_REASON_PROTO_MEM);
+ }
+ numa_drop_add(&udp_sk(sk)->drop_counters, nb);
+ }
- busylock_release(busy);
- return 0;
+ atomic_sub(total_size, &udp_prod_queue->rmem_alloc);
-uncharge_drop:
- atomic_sub(skb->truesize, &sk->sk_rmem_alloc);
+ return 0;
drop:
udp_drops_inc(sk);
- busylock_release(busy);
return err;
}
EXPORT_IPV6_MOD_GPL(__udp_enqueue_schedule_skb);
@@ -1803,6 +1826,7 @@ void udp_destruct_common(struct sock *sk)
kfree_skb(skb);
}
udp_rmem_release(sk, total, 0, true);
+ kfree(up->udp_prod_queue);
}
EXPORT_IPV6_MOD_GPL(udp_destruct_common);
@@ -1814,10 +1838,11 @@ static void udp_destruct_sock(struct sock *sk)
int udp_init_sock(struct sock *sk)
{
- udp_lib_init_sock(sk);
+ int res = udp_lib_init_sock(sk);
+
sk->sk_destruct = udp_destruct_sock;
set_bit(SOCK_SUPPORT_ZC, &sk->sk_socket->flags);
- return 0;
+ return res;
}
void skb_consume_udp(struct sock *sk, struct sk_buff *skb, int len)
diff --git a/net/ipv6/udp.c b/net/ipv6/udp.c
index 9f4d340d1e3a..813a2ba75824 100644
--- a/net/ipv6/udp.c
+++ b/net/ipv6/udp.c
@@ -67,10 +67,11 @@ static void udpv6_destruct_sock(struct sock *sk)
int udpv6_init_sock(struct sock *sk)
{
- udp_lib_init_sock(sk);
+ int res = udp_lib_init_sock(sk);
+
sk->sk_destruct = udpv6_destruct_sock;
set_bit(SOCK_SUPPORT_ZC, &sk->sk_socket->flags);
- return 0;
+ return res;
}
INDIRECT_CALLABLE_SCOPE