diff options
Diffstat (limited to 'net/rxrpc/io_thread.c')
-rw-r--r-- | net/rxrpc/io_thread.c | 319 |
1 files changed, 146 insertions, 173 deletions
diff --git a/net/rxrpc/io_thread.c b/net/rxrpc/io_thread.c index bc65d83fab88..19aa315eddf5 100644 --- a/net/rxrpc/io_thread.c +++ b/net/rxrpc/io_thread.c @@ -9,6 +9,10 @@ #include "ar-internal.h" +static int rxrpc_input_packet_on_conn(struct rxrpc_connection *conn, + struct sockaddr_rxrpc *peer_srx, + struct sk_buff *skb); + /* * handle data received on the local endpoint * - may be called in interrupt context @@ -63,45 +67,19 @@ void rxrpc_error_report(struct sock *sk) } /* - * post connection-level events to the connection - * - this includes challenges, responses, some aborts and call terminal packet - * retransmission. + * Process event packets targeted at a local endpoint. */ -static void rxrpc_post_packet_to_conn(struct rxrpc_connection *conn, - struct sk_buff *skb) +static void rxrpc_input_version(struct rxrpc_local *local, struct sk_buff *skb) { - _enter("%p,%p", conn, skb); - - rxrpc_get_skb(skb, rxrpc_skb_get_conn_work); - skb_queue_tail(&conn->rx_queue, skb); - rxrpc_queue_conn(conn, rxrpc_conn_queue_rx_work); -} + struct rxrpc_skb_priv *sp = rxrpc_skb(skb); + char v; -/* - * post endpoint-level events to the local endpoint - * - this includes debug and version messages - */ -static void rxrpc_post_packet_to_local(struct rxrpc_local *local, - struct sk_buff *skb) -{ - _enter("%p,%p", local, skb); + _enter(""); - if (rxrpc_get_local_maybe(local, rxrpc_local_get_queue)) { - rxrpc_get_skb(skb, rxrpc_skb_get_local_work); - skb_queue_tail(&local->event_queue, skb); - rxrpc_queue_local(local); - } -} - -/* - * put a packet up for transport-level abort - */ -static void rxrpc_reject_packet(struct rxrpc_local *local, struct sk_buff *skb) -{ - if (rxrpc_get_local_maybe(local, rxrpc_local_get_queue)) { - rxrpc_get_skb(skb, rxrpc_skb_get_reject_work); - skb_queue_tail(&local->reject_queue, skb); - rxrpc_queue_local(local); + rxrpc_see_skb(skb, rxrpc_skb_see_version); + if (skb_copy_bits(skb, sizeof(struct rxrpc_wire_header), &v, 1) >= 0) { + if (v == 0) + rxrpc_send_version_request(local, &sp->hdr, skb); } } @@ -156,22 +134,13 @@ static int rxrpc_input_packet(struct rxrpc_local *local, struct sk_buff **_skb) { struct rxrpc_connection *conn; struct sockaddr_rxrpc peer_srx; - struct rxrpc_channel *chan; - struct rxrpc_call *call = NULL; struct rxrpc_skb_priv *sp; struct rxrpc_peer *peer = NULL; - struct rxrpc_sock *rx = NULL; struct sk_buff *skb = *_skb; - unsigned int channel; - - if (skb->tstamp == 0) - skb->tstamp = ktime_get_real(); + int ret = 0; skb_pull(skb, sizeof(struct udphdr)); - /* The UDP protocol already released all skb resources; - * we are free to add our own data there. - */ sp = rxrpc_skb(skb); /* dig out the RxRPC connection details */ @@ -186,15 +155,13 @@ static int rxrpc_input_packet(struct rxrpc_local *local, struct sk_buff **_skb) } } - if (skb->tstamp == 0) - skb->tstamp = ktime_get_real(); trace_rxrpc_rx_packet(sp); switch (sp->hdr.type) { case RXRPC_PACKET_TYPE_VERSION: if (rxrpc_to_client(sp)) return 0; - rxrpc_post_packet_to_local(local, skb); + rxrpc_input_version(local, skb); return 0; case RXRPC_PACKET_TYPE_BUSY: @@ -259,7 +226,7 @@ static int rxrpc_input_packet(struct rxrpc_local *local, struct sk_buff **_skb) goto bad_message; if (WARN_ON_ONCE(rxrpc_extract_addr_from_skb(&peer_srx, skb) < 0)) - return 0; /* Unsupported address type - discard. */ + return true; /* Unsupported address type - discard. */ if (peer_srx.transport.family != local->srx.transport.family && (peer_srx.transport.family == AF_INET && @@ -267,171 +234,172 @@ static int rxrpc_input_packet(struct rxrpc_local *local, struct sk_buff **_skb) pr_warn_ratelimited("AF_RXRPC: Protocol mismatch %u not %u\n", peer_srx.transport.family, local->srx.transport.family); - return 0; /* Wrong address type - discard. */ + return true; /* Wrong address type - discard. */ + } + + if (rxrpc_to_client(sp)) { + rcu_read_lock(); + conn = rxrpc_find_client_connection_rcu(local, &peer_srx, skb); + conn = rxrpc_get_connection_maybe(conn, rxrpc_conn_get_call_input); + rcu_read_unlock(); + if (!conn) { + trace_rxrpc_abort(0, "NCC", sp->hdr.cid, + sp->hdr.callNumber, sp->hdr.seq, + RXKADINCONSISTENCY, EBADMSG); + goto protocol_error; + } + + ret = rxrpc_input_packet_on_conn(conn, &peer_srx, skb); + rxrpc_put_connection(conn, rxrpc_conn_put_call_input); + return ret; } + /* We need to look up service connections by the full protocol + * parameter set. We look up the peer first as an intermediate step + * and then the connection from the peer's tree. + */ rcu_read_lock(); - if (rxrpc_to_server(sp)) { - /* Weed out packets to services we're not offering. Packets - * that would begin a call are explicitly rejected and the rest - * are just discarded. - */ - rx = rcu_dereference(local->service); - if (!rx || (sp->hdr.serviceId != rx->srx.srx_service && - sp->hdr.serviceId != rx->second_service) - ) { - rcu_read_unlock(); - if (sp->hdr.type == RXRPC_PACKET_TYPE_DATA && - sp->hdr.seq == 1) - goto unsupported_service; - return 0; - } + peer = rxrpc_lookup_peer_rcu(local, &peer_srx); + if (!peer) { + rcu_read_unlock(); + return rxrpc_new_incoming_call(local, NULL, NULL, &peer_srx, skb); } - conn = rxrpc_find_connection_rcu(local, &peer_srx, skb, &peer); + conn = rxrpc_find_service_conn_rcu(peer, skb); + conn = rxrpc_get_connection_maybe(conn, rxrpc_conn_get_call_input); if (conn) { - if (sp->hdr.securityIndex != conn->security_ix) - goto wrong_security; + rcu_read_unlock(); + ret = rxrpc_input_packet_on_conn(conn, &peer_srx, skb); + rxrpc_put_connection(conn, rxrpc_conn_put_call_input); + return ret; + } - if (sp->hdr.serviceId != conn->service_id) { - int old_id; + peer = rxrpc_get_peer_maybe(peer, rxrpc_peer_get_input); + rcu_read_unlock(); - if (!test_bit(RXRPC_CONN_PROBING_FOR_UPGRADE, &conn->flags)) - goto reupgrade; - old_id = cmpxchg(&conn->service_id, conn->orig_service_id, - sp->hdr.serviceId); + ret = rxrpc_new_incoming_call(local, peer, NULL, &peer_srx, skb); + rxrpc_put_peer(peer, rxrpc_peer_put_input); + if (ret < 0) + goto reject_packet; + return 0; - if (old_id != conn->orig_service_id && - old_id != sp->hdr.serviceId) - goto reupgrade; - } +bad_message: + trace_rxrpc_abort(0, "BAD", sp->hdr.cid, sp->hdr.callNumber, sp->hdr.seq, + RX_PROTOCOL_ERROR, EBADMSG); +protocol_error: + skb->priority = RX_PROTOCOL_ERROR; + skb->mark = RXRPC_SKB_MARK_REJECT_ABORT; +reject_packet: + rxrpc_reject_packet(local, skb); + return ret; +} - if (sp->hdr.callNumber == 0) { - /* Connection-level packet */ - _debug("CONN %p {%d}", conn, conn->debug_id); - conn = rxrpc_get_connection_maybe(conn, rxrpc_conn_get_conn_input); - rcu_read_unlock(); - if (conn) { - rxrpc_post_packet_to_conn(conn, skb); - rxrpc_put_connection(conn, rxrpc_conn_put_conn_input); - } - return 0; - } +/* + * Deal with a packet that's associated with an extant connection. + */ +static int rxrpc_input_packet_on_conn(struct rxrpc_connection *conn, + struct sockaddr_rxrpc *peer_srx, + struct sk_buff *skb) +{ + struct rxrpc_skb_priv *sp = rxrpc_skb(skb); + struct rxrpc_channel *chan; + struct rxrpc_call *call = NULL; + unsigned int channel; - if ((int)sp->hdr.serial - (int)conn->hi_serial > 0) - conn->hi_serial = sp->hdr.serial; + if (sp->hdr.securityIndex != conn->security_ix) + goto wrong_security; - /* Call-bound packets are routed by connection channel. */ - channel = sp->hdr.cid & RXRPC_CHANNELMASK; - chan = &conn->channels[channel]; + if (sp->hdr.serviceId != conn->service_id) { + int old_id; - /* Ignore really old calls */ - if (sp->hdr.callNumber < chan->last_call) { - rcu_read_unlock(); - return 0; - } + if (!test_bit(RXRPC_CONN_PROBING_FOR_UPGRADE, &conn->flags)) + goto reupgrade; + old_id = cmpxchg(&conn->service_id, conn->orig_service_id, + sp->hdr.serviceId); - if (sp->hdr.callNumber == chan->last_call) { - if (chan->call || - sp->hdr.type == RXRPC_PACKET_TYPE_ABORT) { - rcu_read_unlock(); - return 0; - } + if (old_id != conn->orig_service_id && + old_id != sp->hdr.serviceId) + goto reupgrade; + } - /* For the previous service call, if completed - * successfully, we discard all further packets. - */ - if (rxrpc_conn_is_service(conn) && - chan->last_type == RXRPC_PACKET_TYPE_ACK) { - rcu_read_unlock(); - return 0; - } + if (after(sp->hdr.serial, conn->hi_serial)) + conn->hi_serial = sp->hdr.serial; - /* But otherwise we need to retransmit the final packet - * from data cached in the connection record. - */ - if (sp->hdr.type == RXRPC_PACKET_TYPE_DATA) - trace_rxrpc_rx_data(chan->call_debug_id, - sp->hdr.seq, - sp->hdr.serial, - sp->hdr.flags); - conn = rxrpc_get_connection_maybe(conn, rxrpc_conn_get_call_input); - rcu_read_unlock(); - if (conn) { - rxrpc_post_packet_to_conn(conn, skb); - rxrpc_put_connection(conn, rxrpc_conn_put_call_input); - } + /* It's a connection-level packet if the call number is 0. */ + if (sp->hdr.callNumber == 0) + return rxrpc_input_conn_packet(conn, skb); + + /* Call-bound packets are routed by connection channel. */ + channel = sp->hdr.cid & RXRPC_CHANNELMASK; + chan = &conn->channels[channel]; + + /* Ignore really old calls */ + if (sp->hdr.callNumber < chan->last_call) + return 0; + + if (sp->hdr.callNumber == chan->last_call) { + if (chan->call || + sp->hdr.type == RXRPC_PACKET_TYPE_ABORT) return 0; - } - call = rcu_dereference(chan->call); + /* For the previous service call, if completed successfully, we + * discard all further packets. + */ + if (rxrpc_conn_is_service(conn) && + chan->last_type == RXRPC_PACKET_TYPE_ACK) + return 0; - if (sp->hdr.callNumber > chan->call_id) { - if (rxrpc_to_client(sp)) { - rcu_read_unlock(); - goto reject_packet; - } - if (call) { - rxrpc_input_implicit_end_call(conn, call); - chan->call = NULL; - call = NULL; - } - } + /* But otherwise we need to retransmit the final packet from + * data cached in the connection record. + */ + if (sp->hdr.type == RXRPC_PACKET_TYPE_DATA) + trace_rxrpc_rx_data(chan->call_debug_id, + sp->hdr.seq, + sp->hdr.serial, + sp->hdr.flags); + rxrpc_input_conn_packet(conn, skb); + return 0; + } - if (call && !rxrpc_try_get_call(call, rxrpc_call_get_input)) - call = NULL; + rcu_read_lock(); + call = rxrpc_try_get_call(rcu_dereference(chan->call), + rxrpc_call_get_input); + rcu_read_unlock(); + + if (sp->hdr.callNumber > chan->call_id) { + if (rxrpc_to_client(sp)) { + rxrpc_put_call(call, rxrpc_call_put_input); + goto reject_packet; + } if (call) { - if (sp->hdr.serviceId != call->dest_srx.srx_service) - call->dest_srx.srx_service = sp->hdr.serviceId; - if ((int)sp->hdr.serial - (int)call->rx_serial > 0) - call->rx_serial = sp->hdr.serial; - if (!test_bit(RXRPC_CALL_RX_HEARD, &call->flags)) - set_bit(RXRPC_CALL_RX_HEARD, &call->flags); + rxrpc_implicit_end_call(call, skb); + rxrpc_put_call(call, rxrpc_call_put_input); + call = NULL; } } if (!call) { - if (rxrpc_to_client(sp) || - sp->hdr.type != RXRPC_PACKET_TYPE_DATA) { - rcu_read_unlock(); + if (rxrpc_to_client(sp)) goto bad_message; - } - if (sp->hdr.seq != 1) { - rcu_read_unlock(); + if (rxrpc_new_incoming_call(conn->local, conn->peer, conn, + peer_srx, skb)) return 0; - } - call = rxrpc_new_incoming_call(local, rx, &peer_srx, skb); - if (!call) { - rcu_read_unlock(); - goto reject_packet; - } + goto reject_packet; } - rcu_read_unlock(); - - /* Process a call packet. */ rxrpc_input_call_event(call, skb); rxrpc_put_call(call, rxrpc_call_put_input); - trace_rxrpc_rx_done(0, 0); return 0; wrong_security: - rcu_read_unlock(); trace_rxrpc_abort(0, "SEC", sp->hdr.cid, sp->hdr.callNumber, sp->hdr.seq, RXKADINCONSISTENCY, EBADMSG); skb->priority = RXKADINCONSISTENCY; goto post_abort; -unsupported_service: - trace_rxrpc_abort(0, "INV", sp->hdr.cid, sp->hdr.callNumber, sp->hdr.seq, - RX_INVALID_OPERATION, EOPNOTSUPP); - skb->priority = RX_INVALID_OPERATION; - goto post_abort; - reupgrade: - rcu_read_unlock(); trace_rxrpc_abort(0, "UPG", sp->hdr.cid, sp->hdr.callNumber, sp->hdr.seq, RX_PROTOCOL_ERROR, EBADMSG); goto protocol_error; @@ -444,7 +412,7 @@ protocol_error: post_abort: skb->mark = RXRPC_SKB_MARK_REJECT_ABORT; reject_packet: - rxrpc_reject_packet(local, skb); + rxrpc_reject_packet(conn->local, skb); return 0; } @@ -479,6 +447,11 @@ int rxrpc_io_thread(void *data) continue; } + if (!list_empty(&local->ack_tx_queue)) { + rxrpc_transmit_ack_packets(local); + continue; + } + /* Process received packets and errors. */ if ((skb = __skb_dequeue(&rx_queue))) { switch (skb->mark) { |