diff options
Diffstat (limited to 'net/rxrpc/input.c')
-rw-r--r-- | net/rxrpc/input.c | 1399 |
1 files changed, 922 insertions, 477 deletions
diff --git a/net/rxrpc/input.c b/net/rxrpc/input.c index 70bb77818dea..3ad9f75031e3 100644 --- a/net/rxrpc/input.c +++ b/net/rxrpc/input.c @@ -1,6 +1,6 @@ /* RxRPC packet reception * - * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved. + * Copyright (C) 2007, 2016 Red Hat, Inc. All Rights Reserved. * Written by David Howells (dhowells@redhat.com) * * This program is free software; you can redistribute it and/or @@ -27,550 +27,920 @@ #include <net/net_namespace.h> #include "ar-internal.h" +static void rxrpc_proto_abort(const char *why, + struct rxrpc_call *call, rxrpc_seq_t seq) +{ + if (rxrpc_abort_call(why, call, seq, RX_PROTOCOL_ERROR, EBADMSG)) { + set_bit(RXRPC_CALL_EV_ABORT, &call->events); + rxrpc_queue_call(call); + } +} + /* - * queue a packet for recvmsg to pass to userspace - * - the caller must hold a lock on call->lock - * - must not be called with interrupts disabled (sk_filter() disables BH's) - * - eats the packet whether successful or not - * - there must be just one reference to the packet, which the caller passes to - * this function + * Do TCP-style congestion management [RFC 5681]. */ -int rxrpc_queue_rcv_skb(struct rxrpc_call *call, struct sk_buff *skb, - bool force, bool terminal) +static void rxrpc_congestion_management(struct rxrpc_call *call, + struct sk_buff *skb, + struct rxrpc_ack_summary *summary, + rxrpc_serial_t acked_serial) { - struct rxrpc_skb_priv *sp; - struct rxrpc_sock *rx = call->socket; - struct sock *sk; - int ret; + enum rxrpc_congest_change change = rxrpc_cong_no_change; + unsigned int cumulative_acks = call->cong_cumul_acks; + unsigned int cwnd = call->cong_cwnd; + bool resend = false; + + summary->flight_size = + (call->tx_top - call->tx_hard_ack) - summary->nr_acks; + + if (test_and_clear_bit(RXRPC_CALL_RETRANS_TIMEOUT, &call->flags)) { + summary->retrans_timeo = true; + call->cong_ssthresh = max_t(unsigned int, + summary->flight_size / 2, 2); + cwnd = 1; + if (cwnd >= call->cong_ssthresh && + call->cong_mode == RXRPC_CALL_SLOW_START) { + call->cong_mode = RXRPC_CALL_CONGEST_AVOIDANCE; + call->cong_tstamp = skb->tstamp; + cumulative_acks = 0; + } + } - _enter(",,%d,%d", force, terminal); + cumulative_acks += summary->nr_new_acks; + cumulative_acks += summary->nr_rot_new_acks; + if (cumulative_acks > 255) + cumulative_acks = 255; + + summary->mode = call->cong_mode; + summary->cwnd = call->cong_cwnd; + summary->ssthresh = call->cong_ssthresh; + summary->cumulative_acks = cumulative_acks; + summary->dup_acks = call->cong_dup_acks; + + switch (call->cong_mode) { + case RXRPC_CALL_SLOW_START: + if (summary->nr_nacks > 0) + goto packet_loss_detected; + if (summary->cumulative_acks > 0) + cwnd += 1; + if (cwnd >= call->cong_ssthresh) { + call->cong_mode = RXRPC_CALL_CONGEST_AVOIDANCE; + call->cong_tstamp = skb->tstamp; + } + goto out; - ASSERT(!irqs_disabled()); + case RXRPC_CALL_CONGEST_AVOIDANCE: + if (summary->nr_nacks > 0) + goto packet_loss_detected; - sp = rxrpc_skb(skb); - ASSERTCMP(sp->call, ==, call); - - /* if we've already posted the terminal message for a call, then we - * don't post any more */ - if (test_bit(RXRPC_CALL_TERMINAL_MSG, &call->flags)) { - _debug("already terminated"); - ASSERTCMP(call->state, >=, RXRPC_CALL_COMPLETE); - rxrpc_free_skb(skb); - return 0; - } - - sk = &rx->sk; - - if (!force) { - /* cast skb->rcvbuf to unsigned... It's pointless, but - * reduces number of warnings when compiling with -W - * --ANK */ -// ret = -ENOBUFS; -// if (atomic_read(&sk->sk_rmem_alloc) + skb->truesize >= -// (unsigned int) sk->sk_rcvbuf) -// goto out; - - ret = sk_filter(sk, skb); - if (ret < 0) + /* We analyse the number of packets that get ACK'd per RTT + * period and increase the window if we managed to fill it. + */ + if (call->peer->rtt_usage == 0) goto out; - } + if (ktime_before(skb->tstamp, + ktime_add_ns(call->cong_tstamp, + call->peer->rtt))) + goto out_no_clear_ca; + change = rxrpc_cong_rtt_window_end; + call->cong_tstamp = skb->tstamp; + if (cumulative_acks >= cwnd) + cwnd++; + goto out; - spin_lock_bh(&sk->sk_receive_queue.lock); - if (!test_bit(RXRPC_CALL_TERMINAL_MSG, &call->flags) && - !test_bit(RXRPC_CALL_RELEASED, &call->flags) && - call->socket->sk.sk_state != RXRPC_CLOSE) { - skb->destructor = rxrpc_packet_destructor; - skb->dev = NULL; - skb->sk = sk; - atomic_add(skb->truesize, &sk->sk_rmem_alloc); + case RXRPC_CALL_PACKET_LOSS: + if (summary->nr_nacks == 0) + goto resume_normality; - if (terminal) { - _debug("<<<< TERMINAL MESSAGE >>>>"); - set_bit(RXRPC_CALL_TERMINAL_MSG, &call->flags); + if (summary->new_low_nack) { + change = rxrpc_cong_new_low_nack; + call->cong_dup_acks = 1; + if (call->cong_extra > 1) + call->cong_extra = 1; + goto send_extra_data; } - /* allow interception by a kernel service */ - if (rx->interceptor) { - rx->interceptor(sk, call->user_call_ID, skb); - spin_unlock_bh(&sk->sk_receive_queue.lock); - } else { - _net("post skb %p", skb); - __skb_queue_tail(&sk->sk_receive_queue, skb); - spin_unlock_bh(&sk->sk_receive_queue.lock); + call->cong_dup_acks++; + if (call->cong_dup_acks < 3) + goto send_extra_data; + + change = rxrpc_cong_begin_retransmission; + call->cong_mode = RXRPC_CALL_FAST_RETRANSMIT; + call->cong_ssthresh = max_t(unsigned int, + summary->flight_size / 2, 2); + cwnd = call->cong_ssthresh + 3; + call->cong_extra = 0; + call->cong_dup_acks = 0; + resend = true; + goto out; - if (!sock_flag(sk, SOCK_DEAD)) - sk->sk_data_ready(sk); + case RXRPC_CALL_FAST_RETRANSMIT: + if (!summary->new_low_nack) { + if (summary->nr_new_acks == 0) + cwnd += 1; + call->cong_dup_acks++; + if (call->cong_dup_acks == 2) { + change = rxrpc_cong_retransmit_again; + call->cong_dup_acks = 0; + resend = true; + } + } else { + change = rxrpc_cong_progress; + cwnd = call->cong_ssthresh; + if (summary->nr_nacks == 0) + goto resume_normality; } - skb = NULL; - } else { - spin_unlock_bh(&sk->sk_receive_queue.lock); + goto out; + + default: + BUG(); + goto out; } - ret = 0; +resume_normality: + change = rxrpc_cong_cleared_nacks; + call->cong_dup_acks = 0; + call->cong_extra = 0; + call->cong_tstamp = skb->tstamp; + if (cwnd < call->cong_ssthresh) + call->cong_mode = RXRPC_CALL_SLOW_START; + else + call->cong_mode = RXRPC_CALL_CONGEST_AVOIDANCE; out: - rxrpc_free_skb(skb); + cumulative_acks = 0; +out_no_clear_ca: + if (cwnd >= RXRPC_RXTX_BUFF_SIZE - 1) + cwnd = RXRPC_RXTX_BUFF_SIZE - 1; + call->cong_cwnd = cwnd; + call->cong_cumul_acks = cumulative_acks; + trace_rxrpc_congest(call, summary, acked_serial, change); + if (resend && !test_and_set_bit(RXRPC_CALL_EV_RESEND, &call->events)) + rxrpc_queue_call(call); + return; - _leave(" = %d", ret); - return ret; +packet_loss_detected: + change = rxrpc_cong_saw_nack; + call->cong_mode = RXRPC_CALL_PACKET_LOSS; + call->cong_dup_acks = 0; + goto send_extra_data; + +send_extra_data: + /* Send some previously unsent DATA if we have some to advance the ACK + * state. + */ + if (call->rxtx_annotations[call->tx_top & RXRPC_RXTX_BUFF_MASK] & + RXRPC_TX_ANNO_LAST || + summary->nr_acks != call->tx_top - call->tx_hard_ack) { + call->cong_extra++; + wake_up(&call->waitq); + } + goto out_no_clear_ca; } /* - * process a DATA packet, posting the packet to the appropriate queue - * - eats the packet if successful + * Ping the other end to fill our RTT cache and to retrieve the rwind + * and MTU parameters. */ -static int rxrpc_fast_process_data(struct rxrpc_call *call, - struct sk_buff *skb, u32 seq) +static void rxrpc_send_ping(struct rxrpc_call *call, struct sk_buff *skb, + int skew) { - struct rxrpc_skb_priv *sp; - bool terminal; - int ret, ackbit, ack; - u32 serial; - u8 flags; + struct rxrpc_skb_priv *sp = rxrpc_skb(skb); + ktime_t now = skb->tstamp; - _enter("{%u,%u},,{%u}", call->rx_data_post, call->rx_first_oos, seq); + if (call->peer->rtt_usage < 3 || + ktime_before(ktime_add_ms(call->peer->rtt_last_req, 1000), now)) + rxrpc_propose_ACK(call, RXRPC_ACK_PING, skew, sp->hdr.serial, + true, true, + rxrpc_propose_ack_ping_for_params); +} - sp = rxrpc_skb(skb); - ASSERTCMP(sp->call, ==, NULL); - flags = sp->hdr.flags; - serial = sp->hdr.serial; +/* + * Apply a hard ACK by advancing the Tx window. + */ +static void rxrpc_rotate_tx_window(struct rxrpc_call *call, rxrpc_seq_t to, + struct rxrpc_ack_summary *summary) +{ + struct sk_buff *skb, *list = NULL; + int ix; + u8 annotation; + + if (call->acks_lowest_nak == call->tx_hard_ack) { + call->acks_lowest_nak = to; + } else if (before_eq(call->acks_lowest_nak, to)) { + summary->new_low_nack = true; + call->acks_lowest_nak = to; + } spin_lock(&call->lock); - if (call->state > RXRPC_CALL_COMPLETE) - goto discard; + while (before(call->tx_hard_ack, to)) { + call->tx_hard_ack++; + ix = call->tx_hard_ack & RXRPC_RXTX_BUFF_MASK; + skb = call->rxtx_buffer[ix]; + annotation = call->rxtx_annotations[ix]; + rxrpc_see_skb(skb, rxrpc_skb_tx_rotated); + call->rxtx_buffer[ix] = NULL; + call->rxtx_annotations[ix] = 0; + skb->next = list; + list = skb; + + if (annotation & RXRPC_TX_ANNO_LAST) + set_bit(RXRPC_CALL_TX_LAST, &call->flags); + if ((annotation & RXRPC_TX_ANNO_MASK) != RXRPC_TX_ANNO_ACK) + summary->nr_rot_new_acks++; + } - ASSERTCMP(call->rx_data_expect, >=, call->rx_data_post); - ASSERTCMP(call->rx_data_post, >=, call->rx_data_recv); - ASSERTCMP(call->rx_data_recv, >=, call->rx_data_eaten); + spin_unlock(&call->lock); - if (seq < call->rx_data_post) { - _debug("dup #%u [-%u]", seq, call->rx_data_post); - ack = RXRPC_ACK_DUPLICATE; - ret = -ENOBUFS; - goto discard_and_ack; - } + trace_rxrpc_transmit(call, (test_bit(RXRPC_CALL_TX_LAST, &call->flags) ? + rxrpc_transmit_rotate_last : + rxrpc_transmit_rotate)); + wake_up(&call->waitq); - /* we may already have the packet in the out of sequence queue */ - ackbit = seq - (call->rx_data_eaten + 1); - ASSERTCMP(ackbit, >=, 0); - if (__test_and_set_bit(ackbit, call->ackr_window)) { - _debug("dup oos #%u [%u,%u]", - seq, call->rx_data_eaten, call->rx_data_post); - ack = RXRPC_ACK_DUPLICATE; - goto discard_and_ack; + while (list) { + skb = list; + list = skb->next; + skb->next = NULL; + rxrpc_free_skb(skb, rxrpc_skb_tx_freed); } +} - if (seq >= call->ackr_win_top) { - _debug("exceed #%u [%u]", seq, call->ackr_win_top); - __clear_bit(ackbit, call->ackr_window); - ack = RXRPC_ACK_EXCEEDS_WINDOW; - goto discard_and_ack; - } +/* + * End the transmission phase of a call. + * + * This occurs when we get an ACKALL packet, the first DATA packet of a reply, + * or a final ACK packet. + */ +static bool rxrpc_end_tx_phase(struct rxrpc_call *call, bool reply_begun, + const char *abort_why) +{ - if (seq == call->rx_data_expect) { - clear_bit(RXRPC_CALL_EXPECT_OOS, &call->flags); - call->rx_data_expect++; - } else if (seq > call->rx_data_expect) { - _debug("oos #%u [%u]", seq, call->rx_data_expect); - call->rx_data_expect = seq + 1; - if (test_and_set_bit(RXRPC_CALL_EXPECT_OOS, &call->flags)) { - ack = RXRPC_ACK_OUT_OF_SEQUENCE; - goto enqueue_and_ack; - } - goto enqueue_packet; - } + ASSERT(test_bit(RXRPC_CALL_TX_LAST, &call->flags)); - if (seq != call->rx_data_post) { - _debug("ahead #%u [%u]", seq, call->rx_data_post); - goto enqueue_packet; - } + write_lock(&call->state_lock); - if (test_bit(RXRPC_CALL_RCVD_LAST, &call->flags)) - goto protocol_error; + switch (call->state) { + case RXRPC_CALL_CLIENT_SEND_REQUEST: + case RXRPC_CALL_CLIENT_AWAIT_REPLY: + if (reply_begun) + call->state = RXRPC_CALL_CLIENT_RECV_REPLY; + else + call->state = RXRPC_CALL_CLIENT_AWAIT_REPLY; + break; - /* if the packet need security things doing to it, then it goes down - * the slow path */ - if (call->conn->security_ix) - goto enqueue_packet; + case RXRPC_CALL_SERVER_AWAIT_ACK: + __rxrpc_call_completed(call); + rxrpc_notify_socket(call); + break; - sp->call = call; - rxrpc_get_call(call); - atomic_inc(&call->skb_count); - terminal = ((flags & RXRPC_LAST_PACKET) && - !(flags & RXRPC_CLIENT_INITIATED)); - ret = rxrpc_queue_rcv_skb(call, skb, false, terminal); - if (ret < 0) { - if (ret == -ENOMEM || ret == -ENOBUFS) { - __clear_bit(ackbit, call->ackr_window); - ack = RXRPC_ACK_NOSPACE; - goto discard_and_ack; - } - goto out; + default: + goto bad_state; } - skb = NULL; - sp = NULL; - - _debug("post #%u", seq); - ASSERTCMP(call->rx_data_post, ==, seq); - call->rx_data_post++; + write_unlock(&call->state_lock); + if (call->state == RXRPC_CALL_CLIENT_AWAIT_REPLY) { + rxrpc_propose_ACK(call, RXRPC_ACK_IDLE, 0, 0, false, true, + rxrpc_propose_ack_client_tx_end); + trace_rxrpc_transmit(call, rxrpc_transmit_await_reply); + } else { + trace_rxrpc_transmit(call, rxrpc_transmit_end); + } + _leave(" = ok"); + return true; + +bad_state: + write_unlock(&call->state_lock); + kdebug("end_tx %s", rxrpc_call_states[call->state]); + rxrpc_proto_abort(abort_why, call, call->tx_top); + return false; +} - if (flags & RXRPC_LAST_PACKET) - set_bit(RXRPC_CALL_RCVD_LAST, &call->flags); +/* + * Begin the reply reception phase of a call. + */ +static bool rxrpc_receiving_reply(struct rxrpc_call *call) +{ + struct rxrpc_ack_summary summary = { 0 }; + rxrpc_seq_t top = READ_ONCE(call->tx_top); + + if (call->ackr_reason) { + spin_lock_bh(&call->lock); + call->ackr_reason = 0; + call->resend_at = call->expire_at; + call->ack_at = call->expire_at; + spin_unlock_bh(&call->lock); + rxrpc_set_timer(call, rxrpc_timer_init_for_reply, + ktime_get_real()); + } - /* if we've reached an out of sequence packet then we need to drain - * that queue into the socket Rx queue now */ - if (call->rx_data_post == call->rx_first_oos) { - _debug("drain rx oos now"); - read_lock(&call->state_lock); - if (call->state < RXRPC_CALL_COMPLETE && - !test_and_set_bit(RXRPC_CALL_EV_DRAIN_RX_OOS, &call->events)) - rxrpc_queue_call(call); - read_unlock(&call->state_lock); + if (!test_bit(RXRPC_CALL_TX_LAST, &call->flags)) + rxrpc_rotate_tx_window(call, top, &summary); + if (!test_bit(RXRPC_CALL_TX_LAST, &call->flags)) { + rxrpc_proto_abort("TXL", call, top); + return false; } + if (!rxrpc_end_tx_phase(call, true, "ETD")) + return false; + call->tx_phase = false; + return true; +} - spin_unlock(&call->lock); - atomic_inc(&call->ackr_not_idle); - rxrpc_propose_ACK(call, RXRPC_ACK_DELAY, serial, false); - _leave(" = 0 [posted]"); - return 0; +/* + * Scan a jumbo packet to validate its structure and to work out how many + * subpackets it contains. + * + * A jumbo packet is a collection of consecutive packets glued together with + * little headers between that indicate how to change the initial header for + * each subpacket. + * + * RXRPC_JUMBO_PACKET must be set on all but the last subpacket - and all but + * the last are RXRPC_JUMBO_DATALEN in size. The last subpacket may be of any + * size. + */ +static bool rxrpc_validate_jumbo(struct sk_buff *skb) +{ + struct rxrpc_skb_priv *sp = rxrpc_skb(skb); + unsigned int offset = sizeof(struct rxrpc_wire_header); + unsigned int len = skb->len; + int nr_jumbo = 1; + u8 flags = sp->hdr.flags; -protocol_error: - ret = -EBADMSG; -out: - spin_unlock(&call->lock); - _leave(" = %d", ret); - return ret; + do { + nr_jumbo++; + if (len - offset < RXRPC_JUMBO_SUBPKTLEN) + goto protocol_error; + if (flags & RXRPC_LAST_PACKET) + goto protocol_error; + offset += RXRPC_JUMBO_DATALEN; + if (skb_copy_bits(skb, offset, &flags, 1) < 0) + goto protocol_error; + offset += sizeof(struct rxrpc_jumbo_header); + } while (flags & RXRPC_JUMBO_PACKET); -discard_and_ack: - _debug("discard and ACK packet %p", skb); - __rxrpc_propose_ACK(call, ack, serial, true); -discard: - spin_unlock(&call->lock); - rxrpc_free_skb(skb); - _leave(" = 0 [discarded]"); - return 0; + sp->nr_jumbo = nr_jumbo; + return true; -enqueue_and_ack: - __rxrpc_propose_ACK(call, ack, serial, true); -enqueue_packet: - _net("defer skb %p", skb); - spin_unlock(&call->lock); - skb_queue_tail(&call->rx_queue, skb); - atomic_inc(&call->ackr_not_idle); - read_lock(&call->state_lock); - if (call->state < RXRPC_CALL_DEAD) - rxrpc_queue_call(call); - read_unlock(&call->state_lock); - _leave(" = 0 [queued]"); - return 0; +protocol_error: + return false; } /* - * assume an implicit ACKALL of the transmission phase of a client socket upon - * reception of the first reply packet + * Handle reception of a duplicate packet. + * + * We have to take care to avoid an attack here whereby we're given a series of + * jumbograms, each with a sequence number one before the preceding one and + * filled up to maximum UDP size. If they never send us the first packet in + * the sequence, they can cause us to have to hold on to around 2MiB of kernel + * space until the call times out. + * + * We limit the space usage by only accepting three duplicate jumbo packets per + * call. After that, we tell the other side we're no longer accepting jumbos + * (that information is encoded in the ACK packet). */ -static void rxrpc_assume_implicit_ackall(struct rxrpc_call *call, u32 serial) +static void rxrpc_input_dup_data(struct rxrpc_call *call, rxrpc_seq_t seq, + u8 annotation, bool *_jumbo_bad) { - write_lock_bh(&call->state_lock); - - switch (call->state) { - case RXRPC_CALL_CLIENT_AWAIT_REPLY: - call->state = RXRPC_CALL_CLIENT_RECV_REPLY; - call->acks_latest = serial; - - _debug("implicit ACKALL %%%u", call->acks_latest); - set_bit(RXRPC_CALL_EV_RCVD_ACKALL, &call->events); - write_unlock_bh(&call->state_lock); - - if (try_to_del_timer_sync(&call->resend_timer) >= 0) { - clear_bit(RXRPC_CALL_EV_RESEND_TIMER, &call->events); - clear_bit(RXRPC_CALL_EV_RESEND, &call->events); - clear_bit(RXRPC_CALL_RUN_RTIMER, &call->flags); - } - break; + /* Discard normal packets that are duplicates. */ + if (annotation == 0) + return; - default: - write_unlock_bh(&call->state_lock); - break; + /* Skip jumbo subpackets that are duplicates. When we've had three or + * more partially duplicate jumbo packets, we refuse to take any more + * jumbos for this call. + */ + if (!*_jumbo_bad) { + call->nr_jumbo_bad++; + *_jumbo_bad = true; } } /* - * post an incoming packet to the nominated call to deal with - * - must get rid of the sk_buff, either by freeing it or by queuing it + * Process a DATA packet, adding the packet to the Rx ring. */ -void rxrpc_fast_process_packet(struct rxrpc_call *call, struct sk_buff *skb) +static void rxrpc_input_data(struct rxrpc_call *call, struct sk_buff *skb, + u16 skew) { struct rxrpc_skb_priv *sp = rxrpc_skb(skb); - __be32 wtmp; - u32 hi_serial, abort_code; + unsigned int offset = sizeof(struct rxrpc_wire_header); + unsigned int ix; + rxrpc_serial_t serial = sp->hdr.serial, ack_serial = 0; + rxrpc_seq_t seq = sp->hdr.seq, hard_ack; + bool immediate_ack = false, jumbo_bad = false, queued; + u16 len; + u8 ack = 0, flags, annotation = 0; - _enter("%p,%p", call, skb); + _enter("{%u,%u},{%u,%u}", + call->rx_hard_ack, call->rx_top, skb->len, seq); - ASSERT(!irqs_disabled()); + _proto("Rx DATA %%%u { #%u f=%02x }", + sp->hdr.serial, seq, sp->hdr.flags); -#if 0 // INJECT RX ERROR - if (sp->hdr.type == RXRPC_PACKET_TYPE_DATA) { - static int skip = 0; - if (++skip == 3) { - printk("DROPPED 3RD PACKET!!!!!!!!!!!!!\n"); - skip = 0; - goto free_packet; - } + if (call->state >= RXRPC_CALL_COMPLETE) + return; + + /* Received data implicitly ACKs all of the request packets we sent + * when we're acting as a client. + */ + if ((call->state == RXRPC_CALL_CLIENT_SEND_REQUEST || + call->state == RXRPC_CALL_CLIENT_AWAIT_REPLY) && + !rxrpc_receiving_reply(call)) + return; + + call->ackr_prev_seq = seq; + + hard_ack = READ_ONCE(call->rx_hard_ack); + if (after(seq, hard_ack + call->rx_winsize)) { + ack = RXRPC_ACK_EXCEEDS_WINDOW; + ack_serial = serial; + goto ack; } -#endif - /* track the latest serial number on this connection for ACK packet - * information */ - hi_serial = atomic_read(&call->conn->hi_serial); - while (sp->hdr.serial > hi_serial) - hi_serial = atomic_cmpxchg(&call->conn->hi_serial, hi_serial, - sp->hdr.serial); + flags = sp->hdr.flags; + if (flags & RXRPC_JUMBO_PACKET) { + if (call->nr_jumbo_bad > 3) { + ack = RXRPC_ACK_NOSPACE; + ack_serial = serial; + goto ack; + } + annotation = 1; + } - /* request ACK generation for any ACK or DATA packet that requests - * it */ - if (sp->hdr.flags & RXRPC_REQUEST_ACK) { - _proto("ACK Requested on %%%u", sp->hdr.serial); - rxrpc_propose_ACK(call, RXRPC_ACK_REQUESTED, sp->hdr.serial, false); +next_subpacket: + queued = false; + ix = seq & RXRPC_RXTX_BUFF_MASK; + len = skb->len; + if (flags & RXRPC_JUMBO_PACKET) + len = RXRPC_JUMBO_DATALEN; + + if (flags & RXRPC_LAST_PACKET) { + if (test_bit(RXRPC_CALL_RX_LAST, &call->flags) && + seq != call->rx_top) + return rxrpc_proto_abort("LSN", call, seq); + } else { + if (test_bit(RXRPC_CALL_RX_LAST, &call->flags) && + after_eq(seq, call->rx_top)) + return rxrpc_proto_abort("LSA", call, seq); } - switch (sp->hdr.type) { - case RXRPC_PACKET_TYPE_ABORT: - _debug("abort"); + if (before_eq(seq, hard_ack)) { + ack = RXRPC_ACK_DUPLICATE; + ack_serial = serial; + goto skip; + } - if (skb_copy_bits(skb, 0, &wtmp, sizeof(wtmp)) < 0) - goto protocol_error; + if (flags & RXRPC_REQUEST_ACK && !ack) { + ack = RXRPC_ACK_REQUESTED; + ack_serial = serial; + } - abort_code = ntohl(wtmp); - _proto("Rx ABORT %%%u { %x }", sp->hdr.serial, abort_code); - - write_lock_bh(&call->state_lock); - if (call->state < RXRPC_CALL_COMPLETE) { - call->state = RXRPC_CALL_REMOTELY_ABORTED; - call->remote_abort = abort_code; - set_bit(RXRPC_CALL_EV_RCVD_ABORT, &call->events); - rxrpc_queue_call(call); + if (call->rxtx_buffer[ix]) { + rxrpc_input_dup_data(call, seq, annotation, &jumbo_bad); + if (ack != RXRPC_ACK_DUPLICATE) { + ack = RXRPC_ACK_DUPLICATE; + ack_serial = serial; } - goto free_packet_unlock; + immediate_ack = true; + goto skip; + } - case RXRPC_PACKET_TYPE_BUSY: - _proto("Rx BUSY %%%u", sp->hdr.serial); + /* Queue the packet. We use a couple of memory barriers here as need + * to make sure that rx_top is perceived to be set after the buffer + * pointer and that the buffer pointer is set after the annotation and + * the skb data. + * + * Barriers against rxrpc_recvmsg_data() and rxrpc_rotate_rx_window() + * and also rxrpc_fill_out_ack(). + */ + rxrpc_get_skb(skb, rxrpc_skb_rx_got); + call->rxtx_annotations[ix] = annotation; + smp_wmb(); + call->rxtx_buffer[ix] = skb; + if (after(seq, call->rx_top)) { + smp_store_release(&call->rx_top, seq); + } else if (before(seq, call->rx_top)) { + /* Send an immediate ACK if we fill in a hole */ + if (!ack) { + ack = RXRPC_ACK_DELAY; + ack_serial = serial; + } + immediate_ack = true; + } + if (flags & RXRPC_LAST_PACKET) { + set_bit(RXRPC_CALL_RX_LAST, &call->flags); + trace_rxrpc_receive(call, rxrpc_receive_queue_last, serial, seq); + } else { + trace_rxrpc_receive(call, rxrpc_receive_queue, serial, seq); + } + queued = true; - if (rxrpc_conn_is_service(call->conn)) - goto protocol_error; + if (after_eq(seq, call->rx_expect_next)) { + if (after(seq, call->rx_expect_next)) { + _net("OOS %u > %u", seq, call->rx_expect_next); + ack = RXRPC_ACK_OUT_OF_SEQUENCE; + ack_serial = serial; + } + call->rx_expect_next = seq + 1; + } - write_lock_bh(&call->state_lock); - switch (call->state) { - case RXRPC_CALL_CLIENT_SEND_REQUEST: - call->state = RXRPC_CALL_SERVER_BUSY; - set_bit(RXRPC_CALL_EV_RCVD_BUSY, &call->events); - rxrpc_queue_call(call); - case RXRPC_CALL_SERVER_BUSY: - goto free_packet_unlock; - default: - goto protocol_error_locked; +skip: + offset += len; + if (flags & RXRPC_JUMBO_PACKET) { + if (skb_copy_bits(skb, offset, &flags, 1) < 0) + return rxrpc_proto_abort("XJF", call, seq); + offset += sizeof(struct rxrpc_jumbo_header); + seq++; + serial++; + annotation++; + if (flags & RXRPC_JUMBO_PACKET) + annotation |= RXRPC_RX_ANNO_JLAST; + if (after(seq, hard_ack + call->rx_winsize)) { + ack = RXRPC_ACK_EXCEEDS_WINDOW; + ack_serial = serial; + if (!jumbo_bad) { + call->nr_jumbo_bad++; + jumbo_bad = true; + } + goto ack; } - default: - _proto("Rx %s %%%u", rxrpc_pkts[sp->hdr.type], sp->hdr.serial); - goto protocol_error; + _proto("Rx DATA Jumbo %%%u", serial); + goto next_subpacket; + } - case RXRPC_PACKET_TYPE_DATA: - _proto("Rx DATA %%%u { #%u }", sp->hdr.serial, sp->hdr.seq); + if (queued && flags & RXRPC_LAST_PACKET && !ack) { + ack = RXRPC_ACK_DELAY; + ack_serial = serial; + } - if (sp->hdr.seq == 0) - goto protocol_error; +ack: + if (ack) + rxrpc_propose_ACK(call, ack, skew, ack_serial, + immediate_ack, true, + rxrpc_propose_ack_input_data); - call->ackr_prev_seq = sp->hdr.seq; + if (sp->hdr.seq == READ_ONCE(call->rx_hard_ack) + 1) + rxrpc_notify_socket(call); + _leave(" [queued]"); +} - /* received data implicitly ACKs all of the request packets we - * sent when we're acting as a client */ - if (call->state == RXRPC_CALL_CLIENT_AWAIT_REPLY) - rxrpc_assume_implicit_ackall(call, sp->hdr.serial); +/* + * Process a requested ACK. + */ +static void rxrpc_input_requested_ack(struct rxrpc_call *call, + ktime_t resp_time, + rxrpc_serial_t orig_serial, + rxrpc_serial_t ack_serial) +{ + struct rxrpc_skb_priv *sp; + struct sk_buff *skb; + ktime_t sent_at; + int ix; + + for (ix = 0; ix < RXRPC_RXTX_BUFF_SIZE; ix++) { + skb = call->rxtx_buffer[ix]; + if (!skb) + continue; + + sp = rxrpc_skb(skb); + if (sp->hdr.serial != orig_serial) + continue; + smp_rmb(); + sent_at = skb->tstamp; + goto found; + } + return; - switch (rxrpc_fast_process_data(call, skb, sp->hdr.seq)) { - case 0: - skb = NULL; - goto done; +found: + rxrpc_peer_add_rtt(call, rxrpc_rtt_rx_requested_ack, + orig_serial, ack_serial, sent_at, resp_time); +} - default: - BUG(); +/* + * Process a ping response. + */ +static void rxrpc_input_ping_response(struct rxrpc_call *call, + ktime_t resp_time, + rxrpc_serial_t orig_serial, + rxrpc_serial_t ack_serial) +{ + rxrpc_serial_t ping_serial; + ktime_t ping_time; - /* data packet received beyond the last packet */ - case -EBADMSG: - goto protocol_error; - } + ping_time = call->ackr_ping_time; + smp_rmb(); + ping_serial = call->ackr_ping; - case RXRPC_PACKET_TYPE_ACKALL: - case RXRPC_PACKET_TYPE_ACK: - /* ACK processing is done in process context */ - read_lock_bh(&call->state_lock); - if (call->state < RXRPC_CALL_DEAD) { - skb_queue_tail(&call->rx_queue, skb); - rxrpc_queue_call(call); - skb = NULL; - } - read_unlock_bh(&call->state_lock); - goto free_packet; + if (!test_bit(RXRPC_CALL_PINGING, &call->flags) || + before(orig_serial, ping_serial)) + return; + clear_bit(RXRPC_CALL_PINGING, &call->flags); + if (after(orig_serial, ping_serial)) + return; + + rxrpc_peer_add_rtt(call, rxrpc_rtt_rx_ping_response, + orig_serial, ack_serial, ping_time, resp_time); +} + +/* + * Process the extra information that may be appended to an ACK packet + */ +static void rxrpc_input_ackinfo(struct rxrpc_call *call, struct sk_buff *skb, + struct rxrpc_ackinfo *ackinfo) +{ + struct rxrpc_skb_priv *sp = rxrpc_skb(skb); + struct rxrpc_peer *peer; + unsigned int mtu; + u32 rwind = ntohl(ackinfo->rwind); + + _proto("Rx ACK %%%u Info { rx=%u max=%u rwin=%u jm=%u }", + sp->hdr.serial, + ntohl(ackinfo->rxMTU), ntohl(ackinfo->maxMTU), + rwind, ntohl(ackinfo->jumbo_max)); + + if (rwind > RXRPC_RXTX_BUFF_SIZE - 1) + rwind = RXRPC_RXTX_BUFF_SIZE - 1; + call->tx_winsize = rwind; + if (call->cong_ssthresh > rwind) + call->cong_ssthresh = rwind; + + mtu = min(ntohl(ackinfo->rxMTU), ntohl(ackinfo->maxMTU)); + + peer = call->peer; + if (mtu < peer->maxdata) { + spin_lock_bh(&peer->lock); + peer->maxdata = mtu; + peer->mtu = mtu + peer->hdrsize; + spin_unlock_bh(&peer->lock); + _net("Net MTU %u (maxdata %u)", peer->mtu, peer->maxdata); } +} -protocol_error: - _debug("protocol error"); - write_lock_bh(&call->state_lock); -protocol_error_locked: - if (call->state <= RXRPC_CALL_COMPLETE) { - call->state = RXRPC_CALL_LOCALLY_ABORTED; - call->local_abort = RX_PROTOCOL_ERROR; - set_bit(RXRPC_CALL_EV_ABORT, &call->events); - rxrpc_queue_call(call); +/* + * Process individual soft ACKs. + * + * Each ACK in the array corresponds to one packet and can be either an ACK or + * a NAK. If we get find an explicitly NAK'd packet we resend immediately; + * packets that lie beyond the end of the ACK list are scheduled for resend by + * the timer on the basis that the peer might just not have processed them at + * the time the ACK was sent. + */ +static void rxrpc_input_soft_acks(struct rxrpc_call *call, u8 *acks, + rxrpc_seq_t seq, int nr_acks, + struct rxrpc_ack_summary *summary) +{ + int ix; + u8 annotation, anno_type; + + for (; nr_acks > 0; nr_acks--, seq++) { + ix = seq & RXRPC_RXTX_BUFF_MASK; + annotation = call->rxtx_annotations[ix]; + anno_type = annotation & RXRPC_TX_ANNO_MASK; + annotation &= ~RXRPC_TX_ANNO_MASK; + switch (*acks++) { + case RXRPC_ACK_TYPE_ACK: + summary->nr_acks++; + if (anno_type == RXRPC_TX_ANNO_ACK) + continue; + summary->nr_new_acks++; + call->rxtx_annotations[ix] = + RXRPC_TX_ANNO_ACK | annotation; + break; + case RXRPC_ACK_TYPE_NACK: + if (!summary->nr_nacks && + call->acks_lowest_nak != seq) { + call->acks_lowest_nak = seq; + summary->new_low_nack = true; + } + summary->nr_nacks++; + if (anno_type == RXRPC_TX_ANNO_NAK) + continue; + summary->nr_new_nacks++; + if (anno_type == RXRPC_TX_ANNO_RETRANS) + continue; + call->rxtx_annotations[ix] = + RXRPC_TX_ANNO_NAK | annotation; + break; + default: + return rxrpc_proto_abort("SFT", call, 0); + } } -free_packet_unlock: - write_unlock_bh(&call->state_lock); -free_packet: - rxrpc_free_skb(skb); -done: - _leave(""); } /* - * split up a jumbo data packet + * Process an ACK packet. + * + * ack.firstPacket is the sequence number of the first soft-ACK'd/NAK'd packet + * in the ACK array. Anything before that is hard-ACK'd and may be discarded. + * + * A hard-ACK means that a packet has been processed and may be discarded; a + * soft-ACK means that the packet may be discarded and retransmission + * requested. A phase is complete when all packets are hard-ACK'd. */ -static void rxrpc_process_jumbo_packet(struct rxrpc_call *call, - struct sk_buff *jumbo) +static void rxrpc_input_ack(struct rxrpc_call *call, struct sk_buff *skb, + u16 skew) { - struct rxrpc_jumbo_header jhdr; - struct rxrpc_skb_priv *sp; - struct sk_buff *part; + struct rxrpc_ack_summary summary = { 0 }; + struct rxrpc_skb_priv *sp = rxrpc_skb(skb); + union { + struct rxrpc_ackpacket ack; + struct rxrpc_ackinfo info; + u8 acks[RXRPC_MAXACKS]; + } buf; + rxrpc_serial_t acked_serial; + rxrpc_seq_t first_soft_ack, hard_ack; + int nr_acks, offset, ioffset; + + _enter(""); + + offset = sizeof(struct rxrpc_wire_header); + if (skb_copy_bits(skb, offset, &buf.ack, sizeof(buf.ack)) < 0) { + _debug("extraction failure"); + return rxrpc_proto_abort("XAK", call, 0); + } + offset += sizeof(buf.ack); + + acked_serial = ntohl(buf.ack.serial); + first_soft_ack = ntohl(buf.ack.firstPacket); + hard_ack = first_soft_ack - 1; + nr_acks = buf.ack.nAcks; + summary.ack_reason = (buf.ack.reason < RXRPC_ACK__INVALID ? + buf.ack.reason : RXRPC_ACK__INVALID); + + trace_rxrpc_rx_ack(call, first_soft_ack, summary.ack_reason, nr_acks); + + _proto("Rx ACK %%%u { m=%hu f=#%u p=#%u s=%%%u r=%s n=%u }", + sp->hdr.serial, + ntohs(buf.ack.maxSkew), + first_soft_ack, + ntohl(buf.ack.previousPacket), + acked_serial, + rxrpc_ack_names[summary.ack_reason], + buf.ack.nAcks); + + if (buf.ack.reason == RXRPC_ACK_PING_RESPONSE) + rxrpc_input_ping_response(call, skb->tstamp, acked_serial, + sp->hdr.serial); + if (buf.ack.reason == RXRPC_ACK_REQUESTED) + rxrpc_input_requested_ack(call, skb->tstamp, acked_serial, + sp->hdr.serial); + + if (buf.ack.reason == RXRPC_ACK_PING) { + _proto("Rx ACK %%%u PING Request", sp->hdr.serial); + rxrpc_propose_ACK(call, RXRPC_ACK_PING_RESPONSE, + skew, sp->hdr.serial, true, true, + rxrpc_propose_ack_respond_to_ping); + } else if (sp->hdr.flags & RXRPC_REQUEST_ACK) { + rxrpc_propose_ACK(call, RXRPC_ACK_REQUESTED, + skew, sp->hdr.serial, true, true, + rxrpc_propose_ack_respond_to_ack); + } - _enter(",{%u,%u}", jumbo->data_len, jumbo->len); + ioffset = offset + nr_acks + 3; + if (skb->len >= ioffset + sizeof(buf.info)) { + if (skb_copy_bits(skb, ioffset, &buf.info, sizeof(buf.info)) < 0) + return rxrpc_proto_abort("XAI", call, 0); + rxrpc_input_ackinfo(call, skb, &buf.info); + } - sp = rxrpc_skb(jumbo); + if (first_soft_ack == 0) + return rxrpc_proto_abort("AK0", call, 0); - do { - sp->hdr.flags &= ~RXRPC_JUMBO_PACKET; - - /* make a clone to represent the first subpacket in what's left - * of the jumbo packet */ - part = skb_clone(jumbo, GFP_ATOMIC); - if (!part) { - /* simply ditch the tail in the event of ENOMEM */ - pskb_trim(jumbo, RXRPC_JUMBO_DATALEN); - break; - } - rxrpc_new_skb(part); + /* Ignore ACKs unless we are or have just been transmitting. */ + switch (call->state) { + case RXRPC_CALL_CLIENT_SEND_REQUEST: + case RXRPC_CALL_CLIENT_AWAIT_REPLY: + case RXRPC_CALL_SERVER_SEND_REPLY: + case RXRPC_CALL_SERVER_AWAIT_ACK: + break; + default: + return; + } - pskb_trim(part, RXRPC_JUMBO_DATALEN); + /* Discard any out-of-order or duplicate ACKs. */ + if (before_eq(sp->hdr.serial, call->acks_latest)) { + _debug("discard ACK %d <= %d", + sp->hdr.serial, call->acks_latest); + return; + } + call->acks_latest_ts = skb->tstamp; + call->acks_latest = sp->hdr.serial; + + if (before(hard_ack, call->tx_hard_ack) || + after(hard_ack, call->tx_top)) + return rxrpc_proto_abort("AKW", call, 0); + if (nr_acks > call->tx_top - hard_ack) + return rxrpc_proto_abort("AKN", call, 0); + + if (after(hard_ack, call->tx_hard_ack)) + rxrpc_rotate_tx_window(call, hard_ack, &summary); + + if (nr_acks > 0) { + if (skb_copy_bits(skb, offset, buf.acks, nr_acks) < 0) + return rxrpc_proto_abort("XSA", call, 0); + rxrpc_input_soft_acks(call, buf.acks, first_soft_ack, nr_acks, + &summary); + } - if (!pskb_pull(jumbo, RXRPC_JUMBO_DATALEN)) - goto protocol_error; + if (test_bit(RXRPC_CALL_TX_LAST, &call->flags)) { + rxrpc_end_tx_phase(call, false, "ETA"); + return; + } - if (skb_copy_bits(jumbo, 0, &jhdr, sizeof(jhdr)) < 0) - goto protocol_error; - if (!pskb_pull(jumbo, sizeof(jhdr))) - BUG(); + if (call->rxtx_annotations[call->tx_top & RXRPC_RXTX_BUFF_MASK] & + RXRPC_TX_ANNO_LAST && + summary.nr_acks == call->tx_top - hard_ack) + rxrpc_propose_ACK(call, RXRPC_ACK_PING, skew, sp->hdr.serial, + false, true, + rxrpc_propose_ack_ping_for_lost_reply); - sp->hdr.seq += 1; - sp->hdr.serial += 1; - sp->hdr.flags = jhdr.flags; - sp->hdr._rsvd = ntohs(jhdr._rsvd); + return rxrpc_congestion_management(call, skb, &summary, acked_serial); +} - _proto("Rx DATA Jumbo %%%u", sp->hdr.serial - 1); +/* + * Process an ACKALL packet. + */ +static void rxrpc_input_ackall(struct rxrpc_call *call, struct sk_buff *skb) +{ + struct rxrpc_ack_summary summary = { 0 }; + struct rxrpc_skb_priv *sp = rxrpc_skb(skb); - rxrpc_fast_process_packet(call, part); - part = NULL; + _proto("Rx ACKALL %%%u", sp->hdr.serial); - } while (sp->hdr.flags & RXRPC_JUMBO_PACKET); + rxrpc_rotate_tx_window(call, call->tx_top, &summary); + if (test_bit(RXRPC_CALL_TX_LAST, &call->flags)) + rxrpc_end_tx_phase(call, false, "ETL"); +} - rxrpc_fast_process_packet(call, jumbo); - _leave(""); - return; +/* + * Process an ABORT packet. + */ +static void rxrpc_input_abort(struct rxrpc_call *call, struct sk_buff *skb) +{ + struct rxrpc_skb_priv *sp = rxrpc_skb(skb); + __be32 wtmp; + u32 abort_code = RX_CALL_DEAD; -protocol_error: - _debug("protocol error"); - rxrpc_free_skb(part); - rxrpc_free_skb(jumbo); - write_lock_bh(&call->state_lock); - if (call->state <= RXRPC_CALL_COMPLETE) { - call->state = RXRPC_CALL_LOCALLY_ABORTED; - call->local_abort = RX_PROTOCOL_ERROR; - set_bit(RXRPC_CALL_EV_ABORT, &call->events); - rxrpc_queue_call(call); - } - write_unlock_bh(&call->state_lock); - _leave(""); + _enter(""); + + if (skb->len >= 4 && + skb_copy_bits(skb, sizeof(struct rxrpc_wire_header), + &wtmp, sizeof(wtmp)) >= 0) + abort_code = ntohl(wtmp); + + _proto("Rx ABORT %%%u { %x }", sp->hdr.serial, abort_code); + + if (rxrpc_set_call_completion(call, RXRPC_CALL_REMOTELY_ABORTED, + abort_code, ECONNABORTED)) + rxrpc_notify_socket(call); } /* - * post an incoming packet to the appropriate call/socket to deal with - * - must get rid of the sk_buff, either by freeing it or by queuing it + * Process an incoming call packet. */ -static void rxrpc_post_packet_to_call(struct rxrpc_call *call, - struct sk_buff *skb) +static void rxrpc_input_call_packet(struct rxrpc_call *call, + struct sk_buff *skb, u16 skew) { - struct rxrpc_skb_priv *sp; + struct rxrpc_skb_priv *sp = rxrpc_skb(skb); _enter("%p,%p", call, skb); - sp = rxrpc_skb(skb); + switch (sp->hdr.type) { + case RXRPC_PACKET_TYPE_DATA: + rxrpc_input_data(call, skb, skew); + break; - _debug("extant call [%d]", call->state); + case RXRPC_PACKET_TYPE_ACK: + rxrpc_input_ack(call, skb, skew); + break; + + case RXRPC_PACKET_TYPE_BUSY: + _proto("Rx BUSY %%%u", sp->hdr.serial); + + /* Just ignore BUSY packets from the server; the retry and + * lifespan timers will take care of business. BUSY packets + * from the client don't make sense. + */ + break; + + case RXRPC_PACKET_TYPE_ABORT: + rxrpc_input_abort(call, skb); + break; + + case RXRPC_PACKET_TYPE_ACKALL: + rxrpc_input_ackall(call, skb); + break; - read_lock(&call->state_lock); - switch (call->state) { - case RXRPC_CALL_LOCALLY_ABORTED: - if (!test_and_set_bit(RXRPC_CALL_EV_ABORT, &call->events)) { - rxrpc_queue_call(call); - goto free_unlock; - } - case RXRPC_CALL_REMOTELY_ABORTED: - case RXRPC_CALL_NETWORK_ERROR: - case RXRPC_CALL_DEAD: - goto dead_call; - case RXRPC_CALL_COMPLETE: - case RXRPC_CALL_CLIENT_FINAL_ACK: - /* complete server call */ - if (rxrpc_conn_is_service(call->conn)) - goto dead_call; - /* resend last packet of a completed call */ - _debug("final ack again"); - rxrpc_get_call(call); - set_bit(RXRPC_CALL_EV_ACK_FINAL, &call->events); - rxrpc_queue_call(call); - goto free_unlock; default: + _proto("Rx %s %%%u", rxrpc_pkts[sp->hdr.type], sp->hdr.serial); break; } - read_unlock(&call->state_lock); - rxrpc_get_call(call); - - if (sp->hdr.type == RXRPC_PACKET_TYPE_DATA && - sp->hdr.flags & RXRPC_JUMBO_PACKET) - rxrpc_process_jumbo_packet(call, skb); - else - rxrpc_fast_process_packet(call, skb); - - rxrpc_put_call(call); - goto done; - -dead_call: - if (sp->hdr.type != RXRPC_PACKET_TYPE_ABORT) { - skb->priority = RX_CALL_DEAD; - rxrpc_reject_packet(call->conn->params.local, skb); - goto unlock; - } -free_unlock: - rxrpc_free_skb(skb); -unlock: - read_unlock(&call->state_lock); -done: _leave(""); } /* * post connection-level events to the connection - * - this includes challenges, responses and some aborts + * - this includes challenges, responses, some aborts and call terminal packet + * retransmission. */ static void rxrpc_post_packet_to_conn(struct rxrpc_connection *conn, struct sk_buff *skb) @@ -595,6 +965,17 @@ static void rxrpc_post_packet_to_local(struct rxrpc_local *local, } /* + * put a packet up for transport-level abort + */ +static void rxrpc_reject_packet(struct rxrpc_local *local, struct sk_buff *skb) +{ + CHECK_SLAB_OKAY(&local->usage); + + skb_queue_tail(&local->reject_queue, skb); + rxrpc_queue_local(local); +} + +/* * Extract the wire header from a packet and translate the byte order. */ static noinline @@ -605,8 +986,6 @@ int rxrpc_extract_header(struct rxrpc_skb_priv *sp, struct sk_buff *skb) /* dig out the RxRPC connection details */ if (skb_copy_bits(skb, 0, &whdr, sizeof(whdr)) < 0) return -EBADMSG; - if (!pskb_pull(skb, sizeof(whdr))) - BUG(); memset(sp, 0, sizeof(*sp)); sp->hdr.epoch = ntohl(whdr.epoch); @@ -631,19 +1010,22 @@ int rxrpc_extract_header(struct rxrpc_skb_priv *sp, struct sk_buff *skb) * shut down and the local endpoint from going away, thus sk_user_data will not * be cleared until this function returns. */ -void rxrpc_data_ready(struct sock *sk) +void rxrpc_data_ready(struct sock *udp_sk) { struct rxrpc_connection *conn; + struct rxrpc_channel *chan; + struct rxrpc_call *call; struct rxrpc_skb_priv *sp; - struct rxrpc_local *local = sk->sk_user_data; + struct rxrpc_local *local = udp_sk->sk_user_data; struct sk_buff *skb; - int ret; + unsigned int channel; + int ret, skew; - _enter("%p", sk); + _enter("%p", udp_sk); ASSERT(!irqs_disabled()); - skb = skb_recv_datagram(sk, 0, 1, &ret); + skb = skb_recv_datagram(udp_sk, 0, 1, &ret); if (!skb) { if (ret == -EAGAIN) return; @@ -651,13 +1033,13 @@ void rxrpc_data_ready(struct sock *sk) return; } - rxrpc_new_skb(skb); + rxrpc_new_skb(skb, rxrpc_skb_rx_received); _net("recv skb %p", skb); /* we'll probably need to checksum it (didn't call sock_recvmsg) */ if (skb_checksum_complete(skb)) { - rxrpc_free_skb(skb); + rxrpc_free_skb(skb, rxrpc_skb_rx_freed); __UDP_INC_STATS(&init_net, UDP_MIB_INERRORS, 0); _leave(" [CSUM failed]"); return; @@ -671,13 +1053,21 @@ void rxrpc_data_ready(struct sock *sk) skb_orphan(skb); sp = rxrpc_skb(skb); - _net("Rx UDP packet from %08x:%04hu", - ntohl(ip_hdr(skb)->saddr), ntohs(udp_hdr(skb)->source)); - /* dig out the RxRPC connection details */ if (rxrpc_extract_header(sp, skb) < 0) goto bad_message; + if (IS_ENABLED(CONFIG_AF_RXRPC_INJECT_LOSS)) { + static int lose; + if ((lose++ & 7) == 7) { + trace_rxrpc_rx_lose(sp); + rxrpc_lose_skb(skb, rxrpc_skb_rx_lost); + return; + } + } + + trace_rxrpc_rx_packet(sp); + _net("Rx RxRPC %s ep=%x call=%x:%x", sp->hdr.flags & RXRPC_CLIENT_INITIATED ? "ToServer" : "ToClient", sp->hdr.epoch, sp->hdr.cid, sp->hdr.callNumber); @@ -688,70 +1078,125 @@ void rxrpc_data_ready(struct sock *sk) goto bad_message; } - if (sp->hdr.type == RXRPC_PACKET_TYPE_VERSION) { + switch (sp->hdr.type) { + case RXRPC_PACKET_TYPE_VERSION: rxrpc_post_packet_to_local(local, skb); goto out; - } - if (sp->hdr.type == RXRPC_PACKET_TYPE_DATA && - (sp->hdr.callNumber == 0 || sp->hdr.seq == 0)) - goto bad_message; + case RXRPC_PACKET_TYPE_BUSY: + if (sp->hdr.flags & RXRPC_CLIENT_INITIATED) + goto discard; + + case RXRPC_PACKET_TYPE_DATA: + if (sp->hdr.callNumber == 0) + goto bad_message; + if (sp->hdr.flags & RXRPC_JUMBO_PACKET && + !rxrpc_validate_jumbo(skb)) + goto bad_message; + break; + } rcu_read_lock(); conn = rxrpc_find_connection_rcu(local, skb); - if (!conn) - goto cant_route_call; + if (conn) { + if (sp->hdr.securityIndex != conn->security_ix) + goto wrong_security; + + if (sp->hdr.callNumber == 0) { + /* Connection-level packet */ + _debug("CONN %p {%d}", conn, conn->debug_id); + rxrpc_post_packet_to_conn(conn, skb); + goto out_unlock; + } + + /* Note the serial number skew here */ + skew = (int)sp->hdr.serial - (int)conn->hi_serial; + if (skew >= 0) { + if (skew > 0) + conn->hi_serial = sp->hdr.serial; + } else { + skew = -skew; + skew = min(skew, 65535); + } - if (sp->hdr.callNumber == 0) { - /* Connection-level packet */ - _debug("CONN %p {%d}", conn, conn->debug_id); - rxrpc_post_packet_to_conn(conn, skb); - } else { /* Call-bound packets are routed by connection channel. */ - unsigned int channel = sp->hdr.cid & RXRPC_CHANNELMASK; - struct rxrpc_channel *chan = &conn->channels[channel]; - struct rxrpc_call *call = rcu_dereference(chan->call); + channel = sp->hdr.cid & RXRPC_CHANNELMASK; + chan = &conn->channels[channel]; + + /* Ignore really old calls */ + if (sp->hdr.callNumber < chan->last_call) + goto discard_unlock; + + if (sp->hdr.callNumber == chan->last_call) { + /* For the previous service call, if completed successfully, we + * discard all further packets. + */ + if (rxrpc_conn_is_service(conn) && + (chan->last_type == RXRPC_PACKET_TYPE_ACK || + sp->hdr.type == RXRPC_PACKET_TYPE_ABORT)) + goto discard_unlock; + + /* But otherwise we need to retransmit the final packet from + * data cached in the connection record. + */ + rxrpc_post_packet_to_conn(conn, skb); + goto out_unlock; + } - if (!call || atomic_read(&call->usage) == 0) - goto cant_route_call; + call = rcu_dereference(chan->call); + } else { + skew = 0; + call = NULL; + } - rxrpc_post_packet_to_call(call, skb); + if (!call || atomic_read(&call->usage) == 0) { + if (!(sp->hdr.type & RXRPC_CLIENT_INITIATED) || + sp->hdr.callNumber == 0 || + sp->hdr.type != RXRPC_PACKET_TYPE_DATA) + goto bad_message_unlock; + if (sp->hdr.seq != 1) + goto discard_unlock; + call = rxrpc_new_incoming_call(local, conn, skb); + if (!call) { + rcu_read_unlock(); + goto reject_packet; + } + rxrpc_send_ping(call, skb, skew); } + rxrpc_input_call_packet(call, skb, skew); + goto discard_unlock; + +discard_unlock: rcu_read_unlock(); +discard: + rxrpc_free_skb(skb, rxrpc_skb_rx_freed); out: + trace_rxrpc_rx_done(0, 0); return; -cant_route_call: +out_unlock: rcu_read_unlock(); + goto out; - _debug("can't route call"); - if (sp->hdr.flags & RXRPC_CLIENT_INITIATED && - sp->hdr.type == RXRPC_PACKET_TYPE_DATA) { - if (sp->hdr.seq == 1) { - _debug("first packet"); - skb_queue_tail(&local->accept_queue, skb); - rxrpc_queue_work(&local->processor); - _leave(" [incoming]"); - return; - } - skb->priority = RX_INVALID_OPERATION; - } else { - skb->priority = RX_CALL_DEAD; - } - - if (sp->hdr.type != RXRPC_PACKET_TYPE_ABORT) { - _debug("reject type %d",sp->hdr.type); - rxrpc_reject_packet(local, skb); - } else { - rxrpc_free_skb(skb); - } - _leave(" [no call]"); - return; +wrong_security: + rcu_read_unlock(); + trace_rxrpc_abort("SEC", sp->hdr.cid, sp->hdr.callNumber, sp->hdr.seq, + RXKADINCONSISTENCY, EBADMSG); + skb->priority = RXKADINCONSISTENCY; + goto post_abort; +bad_message_unlock: + rcu_read_unlock(); bad_message: + trace_rxrpc_abort("BAD", sp->hdr.cid, sp->hdr.callNumber, sp->hdr.seq, + RX_PROTOCOL_ERROR, EBADMSG); skb->priority = RX_PROTOCOL_ERROR; +post_abort: + skb->mark = RXRPC_SKB_MARK_LOCAL_ABORT; +reject_packet: + trace_rxrpc_rx_done(skb->mark, skb->priority); rxrpc_reject_packet(local, skb); _leave(" [badmsg]"); } |