diff options
Diffstat (limited to 'net/rxrpc')
-rw-r--r-- | net/rxrpc/af_rxrpc.c | 29 | ||||
-rw-r--r-- | net/rxrpc/ar-internal.h | 23 | ||||
-rw-r--r-- | net/rxrpc/call_accept.c | 13 | ||||
-rw-r--r-- | net/rxrpc/call_object.c | 5 | ||||
-rw-r--r-- | net/rxrpc/conn_event.c | 1 | ||||
-rw-r--r-- | net/rxrpc/input.c | 10 | ||||
-rw-r--r-- | net/rxrpc/output.c | 2 | ||||
-rw-r--r-- | net/rxrpc/recvmsg.c | 191 | ||||
-rw-r--r-- | net/rxrpc/skbuff.c | 1 |
9 files changed, 214 insertions, 61 deletions
diff --git a/net/rxrpc/af_rxrpc.c b/net/rxrpc/af_rxrpc.c index e07c91acd904..32d544995dda 100644 --- a/net/rxrpc/af_rxrpc.c +++ b/net/rxrpc/af_rxrpc.c @@ -231,6 +231,8 @@ static int rxrpc_listen(struct socket *sock, int backlog) * @srx: The address of the peer to contact * @key: The security context to use (defaults to socket setting) * @user_call_ID: The ID to use + * @gfp: The allocation constraints + * @notify_rx: Where to send notifications instead of socket queue * * Allow a kernel service to begin a call on the nominated socket. This just * sets up all the internal tracking structures and allocates connection and @@ -243,7 +245,8 @@ struct rxrpc_call *rxrpc_kernel_begin_call(struct socket *sock, struct sockaddr_rxrpc *srx, struct key *key, unsigned long user_call_ID, - gfp_t gfp) + gfp_t gfp, + rxrpc_notify_rx_t notify_rx) { struct rxrpc_conn_parameters cp; struct rxrpc_call *call; @@ -270,6 +273,8 @@ struct rxrpc_call *rxrpc_kernel_begin_call(struct socket *sock, cp.exclusive = false; cp.service_id = srx->srx_service; call = rxrpc_new_client_call(rx, &cp, srx, user_call_ID, gfp); + if (!IS_ERR(call)) + call->notify_rx = notify_rx; release_sock(&rx->sk); _leave(" = %p", call); @@ -289,31 +294,27 @@ void rxrpc_kernel_end_call(struct socket *sock, struct rxrpc_call *call) { _enter("%d{%d}", call->debug_id, atomic_read(&call->usage)); rxrpc_remove_user_ID(rxrpc_sk(sock->sk), call); + rxrpc_purge_queue(&call->knlrecv_queue); rxrpc_put_call(call); } EXPORT_SYMBOL(rxrpc_kernel_end_call); /** - * rxrpc_kernel_intercept_rx_messages - Intercept received RxRPC messages + * rxrpc_kernel_new_call_notification - Get notifications of new calls * @sock: The socket to intercept received messages on - * @interceptor: The function to pass the messages to + * @notify_new_call: Function to be called when new calls appear * - * Allow a kernel service to intercept messages heading for the Rx queue on an - * RxRPC socket. They get passed to the specified function instead. - * @interceptor should free the socket buffers it is given. @interceptor is - * called with the socket receive queue spinlock held and softirqs disabled - - * this ensures that the messages will be delivered in the right order. + * Allow a kernel service to be given notifications about new calls. */ -void rxrpc_kernel_intercept_rx_messages(struct socket *sock, - rxrpc_interceptor_t interceptor) +void rxrpc_kernel_new_call_notification( + struct socket *sock, + rxrpc_notify_new_call_t notify_new_call) { struct rxrpc_sock *rx = rxrpc_sk(sock->sk); - _enter(""); - rx->interceptor = interceptor; + rx->notify_new_call = notify_new_call; } - -EXPORT_SYMBOL(rxrpc_kernel_intercept_rx_messages); +EXPORT_SYMBOL(rxrpc_kernel_new_call_notification); /* * connect an RxRPC socket diff --git a/net/rxrpc/ar-internal.h b/net/rxrpc/ar-internal.h index 0c320b2b7b43..4e86d248dc5e 100644 --- a/net/rxrpc/ar-internal.h +++ b/net/rxrpc/ar-internal.h @@ -40,6 +40,20 @@ struct rxrpc_crypt { struct rxrpc_connection; /* + * Mark applied to socket buffers. + */ +enum rxrpc_skb_mark { + RXRPC_SKB_MARK_DATA, /* data message */ + RXRPC_SKB_MARK_FINAL_ACK, /* final ACK received message */ + RXRPC_SKB_MARK_BUSY, /* server busy message */ + RXRPC_SKB_MARK_REMOTE_ABORT, /* remote abort message */ + RXRPC_SKB_MARK_LOCAL_ABORT, /* local abort message */ + RXRPC_SKB_MARK_NET_ERROR, /* network error message */ + RXRPC_SKB_MARK_LOCAL_ERROR, /* local error message */ + RXRPC_SKB_MARK_NEW_CALL, /* local error message */ +}; + +/* * sk_state for RxRPC sockets */ enum { @@ -57,7 +71,7 @@ enum { struct rxrpc_sock { /* WARNING: sk has to be the first member */ struct sock sk; - rxrpc_interceptor_t interceptor; /* kernel service Rx interceptor function */ + rxrpc_notify_new_call_t notify_new_call; /* Func to notify of new call */ struct rxrpc_local *local; /* local endpoint */ struct list_head listen_link; /* link in the local endpoint's listen list */ struct list_head secureq; /* calls awaiting connection security clearance */ @@ -367,6 +381,7 @@ enum rxrpc_call_flag { RXRPC_CALL_EXPECT_OOS, /* expect out of sequence packets */ RXRPC_CALL_IS_SERVICE, /* Call is service call */ RXRPC_CALL_EXPOSED, /* The call was exposed to the world */ + RXRPC_CALL_RX_NO_MORE, /* Don't indicate MSG_MORE from recvmsg() */ }; /* @@ -441,6 +456,7 @@ struct rxrpc_call { struct timer_list resend_timer; /* Tx resend timer */ struct work_struct destroyer; /* call destroyer */ struct work_struct processor; /* packet processor and ACK generator */ + rxrpc_notify_rx_t notify_rx; /* kernel service Rx notification function */ struct list_head link; /* link in master call list */ struct list_head chan_wait_link; /* Link in conn->waiting_calls */ struct hlist_node error_link; /* link in error distribution list */ @@ -448,6 +464,7 @@ struct rxrpc_call { struct rb_node sock_node; /* node in socket call tree */ struct sk_buff_head rx_queue; /* received packets */ struct sk_buff_head rx_oos_queue; /* packets received out of sequence */ + struct sk_buff_head knlrecv_queue; /* Queue for kernel_recv [TODO: replace this] */ struct sk_buff *tx_pending; /* Tx socket buffer being filled */ wait_queue_head_t waitq; /* Wait queue for channel or Tx */ __be32 crypto_buf[2]; /* Temporary packet crypto buffer */ @@ -512,7 +529,8 @@ extern struct workqueue_struct *rxrpc_workqueue; * call_accept.c */ void rxrpc_accept_incoming_calls(struct rxrpc_local *); -struct rxrpc_call *rxrpc_accept_call(struct rxrpc_sock *, unsigned long); +struct rxrpc_call *rxrpc_accept_call(struct rxrpc_sock *, unsigned long, + rxrpc_notify_rx_t); int rxrpc_reject_call(struct rxrpc_sock *); /* @@ -874,6 +892,7 @@ int rxrpc_init_server_conn_security(struct rxrpc_connection *); /* * skbuff.c */ +void rxrpc_kernel_data_consumed(struct rxrpc_call *, struct sk_buff *); void rxrpc_packet_destructor(struct sk_buff *); void rxrpc_new_skb(struct sk_buff *); void rxrpc_see_skb(struct sk_buff *); diff --git a/net/rxrpc/call_accept.c b/net/rxrpc/call_accept.c index 03af88fe798b..68a439e30df1 100644 --- a/net/rxrpc/call_accept.c +++ b/net/rxrpc/call_accept.c @@ -286,7 +286,8 @@ security_mismatch: * - assign the user call ID to the call at the front of the queue */ struct rxrpc_call *rxrpc_accept_call(struct rxrpc_sock *rx, - unsigned long user_call_ID) + unsigned long user_call_ID, + rxrpc_notify_rx_t notify_rx) { struct rxrpc_call *call; struct rb_node *parent, **pp; @@ -340,6 +341,7 @@ struct rxrpc_call *rxrpc_accept_call(struct rxrpc_sock *rx, } /* formalise the acceptance */ + call->notify_rx = notify_rx; call->user_call_ID = user_call_ID; rb_link_node(&call->sock_node, parent, pp); rb_insert_color(&call->sock_node, &rx->calls); @@ -437,17 +439,20 @@ out: * rxrpc_kernel_accept_call - Allow a kernel service to accept an incoming call * @sock: The socket on which the impending call is waiting * @user_call_ID: The tag to attach to the call + * @notify_rx: Where to send notifications instead of socket queue * * Allow a kernel service to accept an incoming call, assuming the incoming - * call is still valid. + * call is still valid. The caller should immediately trigger their own + * notification as there must be data waiting. */ struct rxrpc_call *rxrpc_kernel_accept_call(struct socket *sock, - unsigned long user_call_ID) + unsigned long user_call_ID, + rxrpc_notify_rx_t notify_rx) { struct rxrpc_call *call; _enter(",%lx", user_call_ID); - call = rxrpc_accept_call(rxrpc_sk(sock->sk), user_call_ID); + call = rxrpc_accept_call(rxrpc_sk(sock->sk), user_call_ID, notify_rx); _leave(" = %p", call); return call; } diff --git a/net/rxrpc/call_object.c b/net/rxrpc/call_object.c index 104ee8b1de06..516d8ea82f02 100644 --- a/net/rxrpc/call_object.c +++ b/net/rxrpc/call_object.c @@ -136,6 +136,7 @@ static struct rxrpc_call *rxrpc_alloc_call(gfp_t gfp) INIT_LIST_HEAD(&call->accept_link); skb_queue_head_init(&call->rx_queue); skb_queue_head_init(&call->rx_oos_queue); + skb_queue_head_init(&call->knlrecv_queue); init_waitqueue_head(&call->waitq); spin_lock_init(&call->lock); rwlock_init(&call->state_lock); @@ -552,8 +553,6 @@ void rxrpc_release_call(struct rxrpc_call *call) spin_lock_bh(&call->lock); } spin_unlock_bh(&call->lock); - - ASSERTCMP(call->state, !=, RXRPC_CALL_COMPLETE); } del_timer_sync(&call->resend_timer); @@ -682,6 +681,7 @@ static void rxrpc_rcu_destroy_call(struct rcu_head *rcu) struct rxrpc_call *call = container_of(rcu, struct rxrpc_call, rcu); rxrpc_purge_queue(&call->rx_queue); + rxrpc_purge_queue(&call->knlrecv_queue); rxrpc_put_peer(call->peer); kmem_cache_free(rxrpc_call_jar, call); } @@ -737,6 +737,7 @@ static void rxrpc_cleanup_call(struct rxrpc_call *call) rxrpc_purge_queue(&call->rx_queue); ASSERT(skb_queue_empty(&call->rx_oos_queue)); + rxrpc_purge_queue(&call->knlrecv_queue); sock_put(&call->socket->sk); call_rcu(&call->rcu, rxrpc_rcu_destroy_call); } diff --git a/net/rxrpc/conn_event.c b/net/rxrpc/conn_event.c index bc9b05938ff5..9db90f4f768d 100644 --- a/net/rxrpc/conn_event.c +++ b/net/rxrpc/conn_event.c @@ -282,7 +282,6 @@ static int rxrpc_process_event(struct rxrpc_connection *conn, case RXRPC_PACKET_TYPE_DATA: case RXRPC_PACKET_TYPE_ACK: rxrpc_conn_retransmit_call(conn, skb); - rxrpc_free_skb(skb); return 0; case RXRPC_PACKET_TYPE_ABORT: diff --git a/net/rxrpc/input.c b/net/rxrpc/input.c index 86bea9ad6c3d..72f016cfaaf5 100644 --- a/net/rxrpc/input.c +++ b/net/rxrpc/input.c @@ -90,9 +90,15 @@ int rxrpc_queue_rcv_skb(struct rxrpc_call *call, struct sk_buff *skb, } /* allow interception by a kernel service */ - if (rx->interceptor) { - rx->interceptor(sk, call->user_call_ID, skb); + if (skb->mark == RXRPC_SKB_MARK_NEW_CALL && + rx->notify_new_call) { spin_unlock_bh(&sk->sk_receive_queue.lock); + skb_queue_tail(&call->knlrecv_queue, skb); + rx->notify_new_call(&rx->sk); + } else if (call->notify_rx) { + spin_unlock_bh(&sk->sk_receive_queue.lock); + skb_queue_tail(&call->knlrecv_queue, skb); + call->notify_rx(&rx->sk, call, call->user_call_ID); } else { _net("post skb %p", skb); __skb_queue_tail(&sk->sk_receive_queue, skb); diff --git a/net/rxrpc/output.c b/net/rxrpc/output.c index b1e708a12151..817ae801e769 100644 --- a/net/rxrpc/output.c +++ b/net/rxrpc/output.c @@ -190,7 +190,7 @@ int rxrpc_do_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg, size_t len) if (cmd == RXRPC_CMD_ACCEPT) { if (rx->sk.sk_state != RXRPC_SERVER_LISTENING) return -EINVAL; - call = rxrpc_accept_call(rx, user_call_ID); + call = rxrpc_accept_call(rx, user_call_ID, NULL); if (IS_ERR(call)) return PTR_ERR(call); rxrpc_put_call(call); diff --git a/net/rxrpc/recvmsg.c b/net/rxrpc/recvmsg.c index c9b38c7fb448..0ab7b334bab1 100644 --- a/net/rxrpc/recvmsg.c +++ b/net/rxrpc/recvmsg.c @@ -369,55 +369,178 @@ wait_error: } -/** - * rxrpc_kernel_is_data_last - Determine if data message is last one - * @skb: Message holding data +/* + * Deliver messages to a call. This keeps processing packets until the buffer + * is filled and we find either more DATA (returns 0) or the end of the DATA + * (returns 1). If more packets are required, it returns -EAGAIN. * - * Determine if data message is last one for the parent call. + * TODO: Note that this is hacked in at the moment and will be replaced. */ -bool rxrpc_kernel_is_data_last(struct sk_buff *skb) +static int temp_deliver_data(struct socket *sock, struct rxrpc_call *call, + struct iov_iter *iter, size_t size, + size_t *_offset) { - struct rxrpc_skb_priv *sp = rxrpc_skb(skb); + struct rxrpc_skb_priv *sp; + struct sk_buff *skb; + size_t remain; + int ret, copy; + + _enter("%d", call->debug_id); + +next: + local_bh_disable(); + skb = skb_dequeue(&call->knlrecv_queue); + local_bh_enable(); + if (!skb) { + if (test_bit(RXRPC_CALL_RX_NO_MORE, &call->flags)) + return 1; + _leave(" = -EAGAIN [empty]"); + return -EAGAIN; + } - ASSERTCMP(skb->mark, ==, RXRPC_SKB_MARK_DATA); + sp = rxrpc_skb(skb); + _debug("dequeued %p %u/%zu", skb, sp->offset, size); - return sp->hdr.flags & RXRPC_LAST_PACKET; -} + switch (skb->mark) { + case RXRPC_SKB_MARK_DATA: + remain = size - *_offset; + if (remain > 0) { + copy = skb->len - sp->offset; + if (copy > remain) + copy = remain; + ret = skb_copy_datagram_iter(skb, sp->offset, iter, + copy); + if (ret < 0) + goto requeue_and_leave; -EXPORT_SYMBOL(rxrpc_kernel_is_data_last); + /* handle piecemeal consumption of data packets */ + sp->offset += copy; + *_offset += copy; + } -/** - * rxrpc_kernel_get_abort_code - Get the abort code from an RxRPC abort message - * @skb: Message indicating an abort - * - * Get the abort code from an RxRPC abort message. - */ -u32 rxrpc_kernel_get_abort_code(struct sk_buff *skb) -{ - struct rxrpc_skb_priv *sp = rxrpc_skb(skb); + if (sp->offset < skb->len) + goto partially_used_skb; + + /* We consumed the whole packet */ + ASSERTCMP(sp->offset, ==, skb->len); + if (sp->hdr.flags & RXRPC_LAST_PACKET) + set_bit(RXRPC_CALL_RX_NO_MORE, &call->flags); + rxrpc_kernel_data_consumed(call, skb); + rxrpc_free_skb(skb); + goto next; - switch (skb->mark) { - case RXRPC_SKB_MARK_REMOTE_ABORT: - case RXRPC_SKB_MARK_LOCAL_ABORT: - return sp->call->abort_code; default: - BUG(); + rxrpc_free_skb(skb); + goto next; } -} -EXPORT_SYMBOL(rxrpc_kernel_get_abort_code); +partially_used_skb: + ASSERTCMP(*_offset, ==, size); + ret = 0; +requeue_and_leave: + skb_queue_head(&call->knlrecv_queue, skb); + return ret; +} /** - * rxrpc_kernel_get_error - Get the error number from an RxRPC error message - * @skb: Message indicating an error + * rxrpc_kernel_recv_data - Allow a kernel service to receive data/info + * @sock: The socket that the call exists on + * @call: The call to send data through + * @buf: The buffer to receive into + * @size: The size of the buffer, including data already read + * @_offset: The running offset into the buffer. + * @want_more: True if more data is expected to be read + * @_abort: Where the abort code is stored if -ECONNABORTED is returned + * + * Allow a kernel service to receive data and pick up information about the + * state of a call. Returns 0 if got what was asked for and there's more + * available, 1 if we got what was asked for and we're at the end of the data + * and -EAGAIN if we need more data. + * + * Note that we may return -EAGAIN to drain empty packets at the end of the + * data, even if we've already copied over the requested data. * - * Get the error number from an RxRPC error message. + * This function adds the amount it transfers to *_offset, so this should be + * precleared as appropriate. Note that the amount remaining in the buffer is + * taken to be size - *_offset. + * + * *_abort should also be initialised to 0. */ -int rxrpc_kernel_get_error_number(struct sk_buff *skb) +int rxrpc_kernel_recv_data(struct socket *sock, struct rxrpc_call *call, + void *buf, size_t size, size_t *_offset, + bool want_more, u32 *_abort) { - struct rxrpc_skb_priv *sp = rxrpc_skb(skb); + struct iov_iter iter; + struct kvec iov; + int ret; - return sp->error; -} + _enter("{%d,%s},%zu,%d", + call->debug_id, rxrpc_call_states[call->state], size, want_more); + + ASSERTCMP(*_offset, <=, size); + ASSERTCMP(call->state, !=, RXRPC_CALL_SERVER_ACCEPTING); -EXPORT_SYMBOL(rxrpc_kernel_get_error_number); + iov.iov_base = buf + *_offset; + iov.iov_len = size - *_offset; + iov_iter_kvec(&iter, ITER_KVEC | READ, &iov, 1, size - *_offset); + + lock_sock(sock->sk); + + switch (call->state) { + case RXRPC_CALL_CLIENT_RECV_REPLY: + case RXRPC_CALL_SERVER_RECV_REQUEST: + case RXRPC_CALL_SERVER_ACK_REQUEST: + ret = temp_deliver_data(sock, call, &iter, size, _offset); + if (ret < 0) + goto out; + + /* We can only reach here with a partially full buffer if we + * have reached the end of the data. We must otherwise have a + * full buffer or have been given -EAGAIN. + */ + if (ret == 1) { + if (*_offset < size) + goto short_data; + if (!want_more) + goto read_phase_complete; + ret = 0; + goto out; + } + + if (!want_more) + goto excess_data; + goto out; + + case RXRPC_CALL_COMPLETE: + goto call_complete; + + default: + *_offset = 0; + ret = -EINPROGRESS; + goto out; + } + +read_phase_complete: + ret = 1; +out: + release_sock(sock->sk); + _leave(" = %d [%zu,%d]", ret, *_offset, *_abort); + return ret; + +short_data: + ret = -EBADMSG; + goto out; +excess_data: + ret = -EMSGSIZE; + goto out; +call_complete: + *_abort = call->abort_code; + ret = call->error; + if (call->completion == RXRPC_CALL_SUCCEEDED) { + ret = 1; + if (size > 0) + ret = -ECONNRESET; + } + goto out; +} +EXPORT_SYMBOL(rxrpc_kernel_recv_data); diff --git a/net/rxrpc/skbuff.c b/net/rxrpc/skbuff.c index 20529205bb8c..9752f8b1fdd0 100644 --- a/net/rxrpc/skbuff.c +++ b/net/rxrpc/skbuff.c @@ -127,7 +127,6 @@ void rxrpc_kernel_data_consumed(struct rxrpc_call *call, struct sk_buff *skb) call->rx_data_recv = sp->hdr.seq; rxrpc_hard_ACK_data(call, skb); } -EXPORT_SYMBOL(rxrpc_kernel_data_consumed); /* * Destroy a packet that has an RxRPC control buffer |