summaryrefslogtreecommitdiff
path: root/net/rxrpc
diff options
context:
space:
mode:
Diffstat (limited to 'net/rxrpc')
-rw-r--r--net/rxrpc/af_rxrpc.c29
-rw-r--r--net/rxrpc/ar-internal.h23
-rw-r--r--net/rxrpc/call_accept.c13
-rw-r--r--net/rxrpc/call_object.c5
-rw-r--r--net/rxrpc/conn_event.c1
-rw-r--r--net/rxrpc/input.c10
-rw-r--r--net/rxrpc/output.c2
-rw-r--r--net/rxrpc/recvmsg.c191
-rw-r--r--net/rxrpc/skbuff.c1
9 files changed, 214 insertions, 61 deletions
diff --git a/net/rxrpc/af_rxrpc.c b/net/rxrpc/af_rxrpc.c
index e07c91acd904..32d544995dda 100644
--- a/net/rxrpc/af_rxrpc.c
+++ b/net/rxrpc/af_rxrpc.c
@@ -231,6 +231,8 @@ static int rxrpc_listen(struct socket *sock, int backlog)
* @srx: The address of the peer to contact
* @key: The security context to use (defaults to socket setting)
* @user_call_ID: The ID to use
+ * @gfp: The allocation constraints
+ * @notify_rx: Where to send notifications instead of socket queue
*
* Allow a kernel service to begin a call on the nominated socket. This just
* sets up all the internal tracking structures and allocates connection and
@@ -243,7 +245,8 @@ struct rxrpc_call *rxrpc_kernel_begin_call(struct socket *sock,
struct sockaddr_rxrpc *srx,
struct key *key,
unsigned long user_call_ID,
- gfp_t gfp)
+ gfp_t gfp,
+ rxrpc_notify_rx_t notify_rx)
{
struct rxrpc_conn_parameters cp;
struct rxrpc_call *call;
@@ -270,6 +273,8 @@ struct rxrpc_call *rxrpc_kernel_begin_call(struct socket *sock,
cp.exclusive = false;
cp.service_id = srx->srx_service;
call = rxrpc_new_client_call(rx, &cp, srx, user_call_ID, gfp);
+ if (!IS_ERR(call))
+ call->notify_rx = notify_rx;
release_sock(&rx->sk);
_leave(" = %p", call);
@@ -289,31 +294,27 @@ void rxrpc_kernel_end_call(struct socket *sock, struct rxrpc_call *call)
{
_enter("%d{%d}", call->debug_id, atomic_read(&call->usage));
rxrpc_remove_user_ID(rxrpc_sk(sock->sk), call);
+ rxrpc_purge_queue(&call->knlrecv_queue);
rxrpc_put_call(call);
}
EXPORT_SYMBOL(rxrpc_kernel_end_call);
/**
- * rxrpc_kernel_intercept_rx_messages - Intercept received RxRPC messages
+ * rxrpc_kernel_new_call_notification - Get notifications of new calls
* @sock: The socket to intercept received messages on
- * @interceptor: The function to pass the messages to
+ * @notify_new_call: Function to be called when new calls appear
*
- * Allow a kernel service to intercept messages heading for the Rx queue on an
- * RxRPC socket. They get passed to the specified function instead.
- * @interceptor should free the socket buffers it is given. @interceptor is
- * called with the socket receive queue spinlock held and softirqs disabled -
- * this ensures that the messages will be delivered in the right order.
+ * Allow a kernel service to be given notifications about new calls.
*/
-void rxrpc_kernel_intercept_rx_messages(struct socket *sock,
- rxrpc_interceptor_t interceptor)
+void rxrpc_kernel_new_call_notification(
+ struct socket *sock,
+ rxrpc_notify_new_call_t notify_new_call)
{
struct rxrpc_sock *rx = rxrpc_sk(sock->sk);
- _enter("");
- rx->interceptor = interceptor;
+ rx->notify_new_call = notify_new_call;
}
-
-EXPORT_SYMBOL(rxrpc_kernel_intercept_rx_messages);
+EXPORT_SYMBOL(rxrpc_kernel_new_call_notification);
/*
* connect an RxRPC socket
diff --git a/net/rxrpc/ar-internal.h b/net/rxrpc/ar-internal.h
index 0c320b2b7b43..4e86d248dc5e 100644
--- a/net/rxrpc/ar-internal.h
+++ b/net/rxrpc/ar-internal.h
@@ -40,6 +40,20 @@ struct rxrpc_crypt {
struct rxrpc_connection;
/*
+ * Mark applied to socket buffers.
+ */
+enum rxrpc_skb_mark {
+ RXRPC_SKB_MARK_DATA, /* data message */
+ RXRPC_SKB_MARK_FINAL_ACK, /* final ACK received message */
+ RXRPC_SKB_MARK_BUSY, /* server busy message */
+ RXRPC_SKB_MARK_REMOTE_ABORT, /* remote abort message */
+ RXRPC_SKB_MARK_LOCAL_ABORT, /* local abort message */
+ RXRPC_SKB_MARK_NET_ERROR, /* network error message */
+ RXRPC_SKB_MARK_LOCAL_ERROR, /* local error message */
+ RXRPC_SKB_MARK_NEW_CALL, /* local error message */
+};
+
+/*
* sk_state for RxRPC sockets
*/
enum {
@@ -57,7 +71,7 @@ enum {
struct rxrpc_sock {
/* WARNING: sk has to be the first member */
struct sock sk;
- rxrpc_interceptor_t interceptor; /* kernel service Rx interceptor function */
+ rxrpc_notify_new_call_t notify_new_call; /* Func to notify of new call */
struct rxrpc_local *local; /* local endpoint */
struct list_head listen_link; /* link in the local endpoint's listen list */
struct list_head secureq; /* calls awaiting connection security clearance */
@@ -367,6 +381,7 @@ enum rxrpc_call_flag {
RXRPC_CALL_EXPECT_OOS, /* expect out of sequence packets */
RXRPC_CALL_IS_SERVICE, /* Call is service call */
RXRPC_CALL_EXPOSED, /* The call was exposed to the world */
+ RXRPC_CALL_RX_NO_MORE, /* Don't indicate MSG_MORE from recvmsg() */
};
/*
@@ -441,6 +456,7 @@ struct rxrpc_call {
struct timer_list resend_timer; /* Tx resend timer */
struct work_struct destroyer; /* call destroyer */
struct work_struct processor; /* packet processor and ACK generator */
+ rxrpc_notify_rx_t notify_rx; /* kernel service Rx notification function */
struct list_head link; /* link in master call list */
struct list_head chan_wait_link; /* Link in conn->waiting_calls */
struct hlist_node error_link; /* link in error distribution list */
@@ -448,6 +464,7 @@ struct rxrpc_call {
struct rb_node sock_node; /* node in socket call tree */
struct sk_buff_head rx_queue; /* received packets */
struct sk_buff_head rx_oos_queue; /* packets received out of sequence */
+ struct sk_buff_head knlrecv_queue; /* Queue for kernel_recv [TODO: replace this] */
struct sk_buff *tx_pending; /* Tx socket buffer being filled */
wait_queue_head_t waitq; /* Wait queue for channel or Tx */
__be32 crypto_buf[2]; /* Temporary packet crypto buffer */
@@ -512,7 +529,8 @@ extern struct workqueue_struct *rxrpc_workqueue;
* call_accept.c
*/
void rxrpc_accept_incoming_calls(struct rxrpc_local *);
-struct rxrpc_call *rxrpc_accept_call(struct rxrpc_sock *, unsigned long);
+struct rxrpc_call *rxrpc_accept_call(struct rxrpc_sock *, unsigned long,
+ rxrpc_notify_rx_t);
int rxrpc_reject_call(struct rxrpc_sock *);
/*
@@ -874,6 +892,7 @@ int rxrpc_init_server_conn_security(struct rxrpc_connection *);
/*
* skbuff.c
*/
+void rxrpc_kernel_data_consumed(struct rxrpc_call *, struct sk_buff *);
void rxrpc_packet_destructor(struct sk_buff *);
void rxrpc_new_skb(struct sk_buff *);
void rxrpc_see_skb(struct sk_buff *);
diff --git a/net/rxrpc/call_accept.c b/net/rxrpc/call_accept.c
index 03af88fe798b..68a439e30df1 100644
--- a/net/rxrpc/call_accept.c
+++ b/net/rxrpc/call_accept.c
@@ -286,7 +286,8 @@ security_mismatch:
* - assign the user call ID to the call at the front of the queue
*/
struct rxrpc_call *rxrpc_accept_call(struct rxrpc_sock *rx,
- unsigned long user_call_ID)
+ unsigned long user_call_ID,
+ rxrpc_notify_rx_t notify_rx)
{
struct rxrpc_call *call;
struct rb_node *parent, **pp;
@@ -340,6 +341,7 @@ struct rxrpc_call *rxrpc_accept_call(struct rxrpc_sock *rx,
}
/* formalise the acceptance */
+ call->notify_rx = notify_rx;
call->user_call_ID = user_call_ID;
rb_link_node(&call->sock_node, parent, pp);
rb_insert_color(&call->sock_node, &rx->calls);
@@ -437,17 +439,20 @@ out:
* rxrpc_kernel_accept_call - Allow a kernel service to accept an incoming call
* @sock: The socket on which the impending call is waiting
* @user_call_ID: The tag to attach to the call
+ * @notify_rx: Where to send notifications instead of socket queue
*
* Allow a kernel service to accept an incoming call, assuming the incoming
- * call is still valid.
+ * call is still valid. The caller should immediately trigger their own
+ * notification as there must be data waiting.
*/
struct rxrpc_call *rxrpc_kernel_accept_call(struct socket *sock,
- unsigned long user_call_ID)
+ unsigned long user_call_ID,
+ rxrpc_notify_rx_t notify_rx)
{
struct rxrpc_call *call;
_enter(",%lx", user_call_ID);
- call = rxrpc_accept_call(rxrpc_sk(sock->sk), user_call_ID);
+ call = rxrpc_accept_call(rxrpc_sk(sock->sk), user_call_ID, notify_rx);
_leave(" = %p", call);
return call;
}
diff --git a/net/rxrpc/call_object.c b/net/rxrpc/call_object.c
index 104ee8b1de06..516d8ea82f02 100644
--- a/net/rxrpc/call_object.c
+++ b/net/rxrpc/call_object.c
@@ -136,6 +136,7 @@ static struct rxrpc_call *rxrpc_alloc_call(gfp_t gfp)
INIT_LIST_HEAD(&call->accept_link);
skb_queue_head_init(&call->rx_queue);
skb_queue_head_init(&call->rx_oos_queue);
+ skb_queue_head_init(&call->knlrecv_queue);
init_waitqueue_head(&call->waitq);
spin_lock_init(&call->lock);
rwlock_init(&call->state_lock);
@@ -552,8 +553,6 @@ void rxrpc_release_call(struct rxrpc_call *call)
spin_lock_bh(&call->lock);
}
spin_unlock_bh(&call->lock);
-
- ASSERTCMP(call->state, !=, RXRPC_CALL_COMPLETE);
}
del_timer_sync(&call->resend_timer);
@@ -682,6 +681,7 @@ static void rxrpc_rcu_destroy_call(struct rcu_head *rcu)
struct rxrpc_call *call = container_of(rcu, struct rxrpc_call, rcu);
rxrpc_purge_queue(&call->rx_queue);
+ rxrpc_purge_queue(&call->knlrecv_queue);
rxrpc_put_peer(call->peer);
kmem_cache_free(rxrpc_call_jar, call);
}
@@ -737,6 +737,7 @@ static void rxrpc_cleanup_call(struct rxrpc_call *call)
rxrpc_purge_queue(&call->rx_queue);
ASSERT(skb_queue_empty(&call->rx_oos_queue));
+ rxrpc_purge_queue(&call->knlrecv_queue);
sock_put(&call->socket->sk);
call_rcu(&call->rcu, rxrpc_rcu_destroy_call);
}
diff --git a/net/rxrpc/conn_event.c b/net/rxrpc/conn_event.c
index bc9b05938ff5..9db90f4f768d 100644
--- a/net/rxrpc/conn_event.c
+++ b/net/rxrpc/conn_event.c
@@ -282,7 +282,6 @@ static int rxrpc_process_event(struct rxrpc_connection *conn,
case RXRPC_PACKET_TYPE_DATA:
case RXRPC_PACKET_TYPE_ACK:
rxrpc_conn_retransmit_call(conn, skb);
- rxrpc_free_skb(skb);
return 0;
case RXRPC_PACKET_TYPE_ABORT:
diff --git a/net/rxrpc/input.c b/net/rxrpc/input.c
index 86bea9ad6c3d..72f016cfaaf5 100644
--- a/net/rxrpc/input.c
+++ b/net/rxrpc/input.c
@@ -90,9 +90,15 @@ int rxrpc_queue_rcv_skb(struct rxrpc_call *call, struct sk_buff *skb,
}
/* allow interception by a kernel service */
- if (rx->interceptor) {
- rx->interceptor(sk, call->user_call_ID, skb);
+ if (skb->mark == RXRPC_SKB_MARK_NEW_CALL &&
+ rx->notify_new_call) {
spin_unlock_bh(&sk->sk_receive_queue.lock);
+ skb_queue_tail(&call->knlrecv_queue, skb);
+ rx->notify_new_call(&rx->sk);
+ } else if (call->notify_rx) {
+ spin_unlock_bh(&sk->sk_receive_queue.lock);
+ skb_queue_tail(&call->knlrecv_queue, skb);
+ call->notify_rx(&rx->sk, call, call->user_call_ID);
} else {
_net("post skb %p", skb);
__skb_queue_tail(&sk->sk_receive_queue, skb);
diff --git a/net/rxrpc/output.c b/net/rxrpc/output.c
index b1e708a12151..817ae801e769 100644
--- a/net/rxrpc/output.c
+++ b/net/rxrpc/output.c
@@ -190,7 +190,7 @@ int rxrpc_do_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg, size_t len)
if (cmd == RXRPC_CMD_ACCEPT) {
if (rx->sk.sk_state != RXRPC_SERVER_LISTENING)
return -EINVAL;
- call = rxrpc_accept_call(rx, user_call_ID);
+ call = rxrpc_accept_call(rx, user_call_ID, NULL);
if (IS_ERR(call))
return PTR_ERR(call);
rxrpc_put_call(call);
diff --git a/net/rxrpc/recvmsg.c b/net/rxrpc/recvmsg.c
index c9b38c7fb448..0ab7b334bab1 100644
--- a/net/rxrpc/recvmsg.c
+++ b/net/rxrpc/recvmsg.c
@@ -369,55 +369,178 @@ wait_error:
}
-/**
- * rxrpc_kernel_is_data_last - Determine if data message is last one
- * @skb: Message holding data
+/*
+ * Deliver messages to a call. This keeps processing packets until the buffer
+ * is filled and we find either more DATA (returns 0) or the end of the DATA
+ * (returns 1). If more packets are required, it returns -EAGAIN.
*
- * Determine if data message is last one for the parent call.
+ * TODO: Note that this is hacked in at the moment and will be replaced.
*/
-bool rxrpc_kernel_is_data_last(struct sk_buff *skb)
+static int temp_deliver_data(struct socket *sock, struct rxrpc_call *call,
+ struct iov_iter *iter, size_t size,
+ size_t *_offset)
{
- struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
+ struct rxrpc_skb_priv *sp;
+ struct sk_buff *skb;
+ size_t remain;
+ int ret, copy;
+
+ _enter("%d", call->debug_id);
+
+next:
+ local_bh_disable();
+ skb = skb_dequeue(&call->knlrecv_queue);
+ local_bh_enable();
+ if (!skb) {
+ if (test_bit(RXRPC_CALL_RX_NO_MORE, &call->flags))
+ return 1;
+ _leave(" = -EAGAIN [empty]");
+ return -EAGAIN;
+ }
- ASSERTCMP(skb->mark, ==, RXRPC_SKB_MARK_DATA);
+ sp = rxrpc_skb(skb);
+ _debug("dequeued %p %u/%zu", skb, sp->offset, size);
- return sp->hdr.flags & RXRPC_LAST_PACKET;
-}
+ switch (skb->mark) {
+ case RXRPC_SKB_MARK_DATA:
+ remain = size - *_offset;
+ if (remain > 0) {
+ copy = skb->len - sp->offset;
+ if (copy > remain)
+ copy = remain;
+ ret = skb_copy_datagram_iter(skb, sp->offset, iter,
+ copy);
+ if (ret < 0)
+ goto requeue_and_leave;
-EXPORT_SYMBOL(rxrpc_kernel_is_data_last);
+ /* handle piecemeal consumption of data packets */
+ sp->offset += copy;
+ *_offset += copy;
+ }
-/**
- * rxrpc_kernel_get_abort_code - Get the abort code from an RxRPC abort message
- * @skb: Message indicating an abort
- *
- * Get the abort code from an RxRPC abort message.
- */
-u32 rxrpc_kernel_get_abort_code(struct sk_buff *skb)
-{
- struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
+ if (sp->offset < skb->len)
+ goto partially_used_skb;
+
+ /* We consumed the whole packet */
+ ASSERTCMP(sp->offset, ==, skb->len);
+ if (sp->hdr.flags & RXRPC_LAST_PACKET)
+ set_bit(RXRPC_CALL_RX_NO_MORE, &call->flags);
+ rxrpc_kernel_data_consumed(call, skb);
+ rxrpc_free_skb(skb);
+ goto next;
- switch (skb->mark) {
- case RXRPC_SKB_MARK_REMOTE_ABORT:
- case RXRPC_SKB_MARK_LOCAL_ABORT:
- return sp->call->abort_code;
default:
- BUG();
+ rxrpc_free_skb(skb);
+ goto next;
}
-}
-EXPORT_SYMBOL(rxrpc_kernel_get_abort_code);
+partially_used_skb:
+ ASSERTCMP(*_offset, ==, size);
+ ret = 0;
+requeue_and_leave:
+ skb_queue_head(&call->knlrecv_queue, skb);
+ return ret;
+}
/**
- * rxrpc_kernel_get_error - Get the error number from an RxRPC error message
- * @skb: Message indicating an error
+ * rxrpc_kernel_recv_data - Allow a kernel service to receive data/info
+ * @sock: The socket that the call exists on
+ * @call: The call to send data through
+ * @buf: The buffer to receive into
+ * @size: The size of the buffer, including data already read
+ * @_offset: The running offset into the buffer.
+ * @want_more: True if more data is expected to be read
+ * @_abort: Where the abort code is stored if -ECONNABORTED is returned
+ *
+ * Allow a kernel service to receive data and pick up information about the
+ * state of a call. Returns 0 if got what was asked for and there's more
+ * available, 1 if we got what was asked for and we're at the end of the data
+ * and -EAGAIN if we need more data.
+ *
+ * Note that we may return -EAGAIN to drain empty packets at the end of the
+ * data, even if we've already copied over the requested data.
*
- * Get the error number from an RxRPC error message.
+ * This function adds the amount it transfers to *_offset, so this should be
+ * precleared as appropriate. Note that the amount remaining in the buffer is
+ * taken to be size - *_offset.
+ *
+ * *_abort should also be initialised to 0.
*/
-int rxrpc_kernel_get_error_number(struct sk_buff *skb)
+int rxrpc_kernel_recv_data(struct socket *sock, struct rxrpc_call *call,
+ void *buf, size_t size, size_t *_offset,
+ bool want_more, u32 *_abort)
{
- struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
+ struct iov_iter iter;
+ struct kvec iov;
+ int ret;
- return sp->error;
-}
+ _enter("{%d,%s},%zu,%d",
+ call->debug_id, rxrpc_call_states[call->state], size, want_more);
+
+ ASSERTCMP(*_offset, <=, size);
+ ASSERTCMP(call->state, !=, RXRPC_CALL_SERVER_ACCEPTING);
-EXPORT_SYMBOL(rxrpc_kernel_get_error_number);
+ iov.iov_base = buf + *_offset;
+ iov.iov_len = size - *_offset;
+ iov_iter_kvec(&iter, ITER_KVEC | READ, &iov, 1, size - *_offset);
+
+ lock_sock(sock->sk);
+
+ switch (call->state) {
+ case RXRPC_CALL_CLIENT_RECV_REPLY:
+ case RXRPC_CALL_SERVER_RECV_REQUEST:
+ case RXRPC_CALL_SERVER_ACK_REQUEST:
+ ret = temp_deliver_data(sock, call, &iter, size, _offset);
+ if (ret < 0)
+ goto out;
+
+ /* We can only reach here with a partially full buffer if we
+ * have reached the end of the data. We must otherwise have a
+ * full buffer or have been given -EAGAIN.
+ */
+ if (ret == 1) {
+ if (*_offset < size)
+ goto short_data;
+ if (!want_more)
+ goto read_phase_complete;
+ ret = 0;
+ goto out;
+ }
+
+ if (!want_more)
+ goto excess_data;
+ goto out;
+
+ case RXRPC_CALL_COMPLETE:
+ goto call_complete;
+
+ default:
+ *_offset = 0;
+ ret = -EINPROGRESS;
+ goto out;
+ }
+
+read_phase_complete:
+ ret = 1;
+out:
+ release_sock(sock->sk);
+ _leave(" = %d [%zu,%d]", ret, *_offset, *_abort);
+ return ret;
+
+short_data:
+ ret = -EBADMSG;
+ goto out;
+excess_data:
+ ret = -EMSGSIZE;
+ goto out;
+call_complete:
+ *_abort = call->abort_code;
+ ret = call->error;
+ if (call->completion == RXRPC_CALL_SUCCEEDED) {
+ ret = 1;
+ if (size > 0)
+ ret = -ECONNRESET;
+ }
+ goto out;
+}
+EXPORT_SYMBOL(rxrpc_kernel_recv_data);
diff --git a/net/rxrpc/skbuff.c b/net/rxrpc/skbuff.c
index 20529205bb8c..9752f8b1fdd0 100644
--- a/net/rxrpc/skbuff.c
+++ b/net/rxrpc/skbuff.c
@@ -127,7 +127,6 @@ void rxrpc_kernel_data_consumed(struct rxrpc_call *call, struct sk_buff *skb)
call->rx_data_recv = sp->hdr.seq;
rxrpc_hard_ACK_data(call, skb);
}
-EXPORT_SYMBOL(rxrpc_kernel_data_consumed);
/*
* Destroy a packet that has an RxRPC control buffer