summaryrefslogtreecommitdiff
path: root/net/rxrpc/input.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/rxrpc/input.c')
-rw-r--r--net/rxrpc/input.c213
1 files changed, 131 insertions, 82 deletions
diff --git a/net/rxrpc/input.c b/net/rxrpc/input.c
index 0e7545ed0128..947e7196e2be 100644
--- a/net/rxrpc/input.c
+++ b/net/rxrpc/input.c
@@ -312,18 +312,43 @@ static bool rxrpc_receiving_reply(struct rxrpc_call *call)
return rxrpc_end_tx_phase(call, true, "ETD");
}
+static void rxrpc_input_update_ack_window(struct rxrpc_call *call,
+ rxrpc_seq_t window, rxrpc_seq_t wtop)
+{
+ atomic64_set_release(&call->ackr_window, ((u64)wtop) << 32 | window);
+}
+
/*
- * Process a DATA packet, adding the packet to the Rx ring. The caller's
- * packet ref must be passed on or discarded.
+ * Push a DATA packet onto the Rx queue.
+ */
+static void rxrpc_input_queue_data(struct rxrpc_call *call, struct sk_buff *skb,
+ rxrpc_seq_t window, rxrpc_seq_t wtop,
+ enum rxrpc_receive_trace why)
+{
+ struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
+ bool last = sp->hdr.flags & RXRPC_LAST_PACKET;
+
+ __skb_queue_tail(&call->recvmsg_queue, skb);
+ rxrpc_input_update_ack_window(call, window, wtop);
+
+ trace_rxrpc_receive(call, last ? why + 1 : why, sp->hdr.serial, sp->hdr.seq);
+}
+
+/*
+ * Process a DATA packet.
*/
static void rxrpc_input_data_one(struct rxrpc_call *call, struct sk_buff *skb)
{
struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
+ struct sk_buff *oos;
rxrpc_serial_t serial = sp->hdr.serial;
- rxrpc_seq_t seq = sp->hdr.seq, hard_ack;
- unsigned int ix = seq & RXRPC_RXTX_BUFF_MASK;
+ u64 win = atomic64_read(&call->ackr_window);
+ rxrpc_seq_t window = lower_32_bits(win);
+ rxrpc_seq_t wtop = upper_32_bits(win);
+ rxrpc_seq_t wlimit = window + call->rx_winsize - 1;
+ rxrpc_seq_t seq = sp->hdr.seq;
bool last = sp->hdr.flags & RXRPC_LAST_PACKET;
- bool acked = false;
+ int ack_reason = -1;
rxrpc_inc_stat(call->rxnet, stat_rx_data);
if (sp->hdr.flags & RXRPC_REQUEST_ACK)
@@ -331,112 +356,135 @@ static void rxrpc_input_data_one(struct rxrpc_call *call, struct sk_buff *skb)
if (sp->hdr.flags & RXRPC_JUMBO_PACKET)
rxrpc_inc_stat(call->rxnet, stat_rx_data_jumbo);
- hard_ack = READ_ONCE(call->rx_hard_ack);
-
- _proto("Rx DATA %%%u { #%x l=%u }", serial, seq, last);
-
if (last) {
- if (test_bit(RXRPC_CALL_RX_LAST, &call->flags) &&
- seq != call->rx_top) {
+ if (test_and_set_bit(RXRPC_CALL_RX_LAST, &call->flags) &&
+ seq + 1 != wtop) {
rxrpc_proto_abort("LSN", call, seq);
- goto out;
+ goto err_free;
}
} else {
if (test_bit(RXRPC_CALL_RX_LAST, &call->flags) &&
- after_eq(seq, call->rx_top)) {
+ after_eq(seq, wtop)) {
+ pr_warn("Packet beyond last: c=%x q=%x window=%x-%x wlimit=%x\n",
+ call->debug_id, seq, window, wtop, wlimit);
rxrpc_proto_abort("LSA", call, seq);
- goto out;
+ goto err_free;
}
}
+ if (after(seq, call->rx_highest_seq))
+ call->rx_highest_seq = seq;
+
trace_rxrpc_rx_data(call->debug_id, seq, serial, sp->hdr.flags);
- if (before_eq(seq, hard_ack)) {
- rxrpc_send_ACK(call, RXRPC_ACK_DUPLICATE, serial,
- rxrpc_propose_ack_input_data);
- goto out;
+ if (before(seq, window)) {
+ ack_reason = RXRPC_ACK_DUPLICATE;
+ goto send_ack;
}
-
- if (call->rxtx_buffer[ix]) {
- rxrpc_send_ACK(call, RXRPC_ACK_DUPLICATE, serial,
- rxrpc_propose_ack_input_data);
- goto out;
+ if (after(seq, wlimit)) {
+ ack_reason = RXRPC_ACK_EXCEEDS_WINDOW;
+ goto send_ack;
}
- if (after(seq, hard_ack + call->rx_winsize)) {
- rxrpc_send_ACK(call, RXRPC_ACK_EXCEEDS_WINDOW, serial,
- rxrpc_propose_ack_input_data);
- goto out;
- }
+ /* Queue the packet. */
+ if (seq == window) {
+ rxrpc_seq_t reset_from;
+ bool reset_sack = false;
- if (sp->hdr.flags & RXRPC_REQUEST_ACK) {
- rxrpc_send_ACK(call, RXRPC_ACK_REQUESTED, serial,
- rxrpc_propose_ack_input_data);
- acked = true;
- }
+ if (sp->hdr.flags & RXRPC_REQUEST_ACK)
+ ack_reason = RXRPC_ACK_REQUESTED;
+ /* Send an immediate ACK if we fill in a hole */
+ else if (!skb_queue_empty(&call->rx_oos_queue))
+ ack_reason = RXRPC_ACK_DELAY;
- if (after(seq, call->ackr_highest_seq))
- call->ackr_highest_seq = seq;
+ window++;
+ if (after(window, wtop))
+ wtop = window;
- /* Queue the packet. We use a couple of memory barriers here as need
- * to make sure that rx_top is perceived to be set after the buffer
- * pointer and that the buffer pointer is set after the annotation and
- * the skb data.
- *
- * Barriers against rxrpc_recvmsg_data() and rxrpc_rotate_rx_window()
- * and also rxrpc_fill_out_ack().
- */
- call->rxtx_annotations[ix] = 1;
- smp_wmb();
- call->rxtx_buffer[ix] = skb;
- if (after(seq, call->rx_top)) {
- smp_store_release(&call->rx_top, seq);
- } else if (before(seq, call->rx_top)) {
- /* Send an immediate ACK if we fill in a hole */
- if (!acked) {
- rxrpc_send_ACK(call, RXRPC_ACK_DELAY, serial,
- rxrpc_propose_ack_input_data_hole);
- acked = true;
+ spin_lock(&call->recvmsg_queue.lock);
+ rxrpc_input_queue_data(call, skb, window, wtop, rxrpc_receive_queue);
+ skb = NULL;
+
+ while ((oos = skb_peek(&call->rx_oos_queue))) {
+ struct rxrpc_skb_priv *osp = rxrpc_skb(oos);
+
+ if (after(osp->hdr.seq, window))
+ break;
+
+ __skb_unlink(oos, &call->rx_oos_queue);
+ last = osp->hdr.flags & RXRPC_LAST_PACKET;
+ seq = osp->hdr.seq;
+ if (!reset_sack) {
+ reset_from = seq;
+ reset_sack = true;
+ }
+
+ window++;
+ rxrpc_input_queue_data(call, oos, window, wtop,
+ rxrpc_receive_queue_oos);
}
- }
- /* From this point on, we're not allowed to touch the packet any longer
- * as its ref now belongs to the Rx ring.
- */
- skb = NULL;
- sp = NULL;
+ spin_unlock(&call->recvmsg_queue.lock);
- if (last) {
- set_bit(RXRPC_CALL_RX_LAST, &call->flags);
- trace_rxrpc_receive(call, rxrpc_receive_queue_last, serial, seq);
+ if (reset_sack) {
+ do {
+ call->ackr_sack_table[reset_from % RXRPC_SACK_SIZE] = 0;
+ } while (reset_from++, before(reset_from, window));
+ }
} else {
- trace_rxrpc_receive(call, rxrpc_receive_queue, serial, seq);
- }
+ bool keep = false;
+
+ ack_reason = RXRPC_ACK_OUT_OF_SEQUENCE;
+
+ if (!call->ackr_sack_table[seq % RXRPC_SACK_SIZE]) {
+ call->ackr_sack_table[seq % RXRPC_SACK_SIZE] = 1;
+ keep = 1;
+ }
+
+ if (after(seq + 1, wtop)) {
+ wtop = seq + 1;
+ rxrpc_input_update_ack_window(call, window, wtop);
+ }
+
+ if (!keep) {
+ ack_reason = RXRPC_ACK_DUPLICATE;
+ goto send_ack;
+ }
+
+ skb_queue_walk(&call->rx_oos_queue, oos) {
+ struct rxrpc_skb_priv *osp = rxrpc_skb(oos);
- if (after_eq(seq, call->rx_expect_next)) {
- if (after(seq, call->rx_expect_next)) {
- _net("OOS %u > %u", seq, call->rx_expect_next);
- rxrpc_send_ACK(call, RXRPC_ACK_OUT_OF_SEQUENCE, serial,
- rxrpc_propose_ack_input_data);
- acked = true;
+ if (after(osp->hdr.seq, seq)) {
+ __skb_queue_before(&call->rx_oos_queue, oos, skb);
+ goto oos_queued;
+ }
}
- call->rx_expect_next = seq + 1;
+
+ __skb_queue_tail(&call->rx_oos_queue, skb);
+ oos_queued:
+ trace_rxrpc_receive(call, last ? rxrpc_receive_oos_last : rxrpc_receive_oos,
+ sp->hdr.serial, sp->hdr.seq);
+ skb = NULL;
}
-out:
- if (!acked &&
- atomic_inc_return(&call->ackr_nr_unacked) > 2)
- rxrpc_send_ACK(call, RXRPC_ACK_IDLE, serial,
+send_ack:
+ if (ack_reason < 0 &&
+ atomic_inc_return(&call->ackr_nr_unacked) > 2 &&
+ test_and_set_bit(RXRPC_CALL_IDLE_ACK_PENDING, &call->flags)) {
+ ack_reason = RXRPC_ACK_IDLE;
+ } else if (ack_reason >= 0) {
+ set_bit(RXRPC_CALL_IDLE_ACK_PENDING, &call->flags);
+ }
+
+ if (ack_reason >= 0)
+ rxrpc_send_ACK(call, ack_reason, serial,
rxrpc_propose_ack_input_data);
else
rxrpc_propose_delay_ACK(call, serial,
rxrpc_propose_ack_input_data);
- trace_rxrpc_notify_socket(call->debug_id, serial);
- rxrpc_notify_socket(call);
-
+err_free:
rxrpc_free_skb(skb, rxrpc_skb_freed);
- _leave(" [queued]");
}
/*
@@ -498,8 +546,9 @@ static void rxrpc_input_data(struct rxrpc_call *call, struct sk_buff *skb)
rxrpc_serial_t serial = sp->hdr.serial;
rxrpc_seq_t seq0 = sp->hdr.seq;
- _enter("{%u,%u},{%u,%u}",
- call->rx_hard_ack, call->rx_top, skb->len, seq0);
+ _enter("{%llx,%x},{%u,%x}",
+ atomic64_read(&call->ackr_window), call->rx_highest_seq,
+ skb->len, seq0);
_proto("Rx DATA %%%u { #%u f=%02x }",
sp->hdr.serial, seq0, sp->hdr.flags);