summaryrefslogtreecommitdiff
path: root/net/ceph/messenger.c
diff options
context:
space:
mode:
authorIlya Dryomov <idryomov@gmail.com>2020-11-18 18:37:14 +0300
committerIlya Dryomov <idryomov@gmail.com>2020-12-15 01:21:49 +0300
commit771294fe0724d92157048650f3585e7be606d0f8 (patch)
tree777cf2905c10b7518530b4b3ae25c6324448abf4 /net/ceph/messenger.c
parentfc4c128e15b50c73466dcd7234dde02f6fd9e4f8 (diff)
downloadlinux-771294fe0724d92157048650f3585e7be606d0f8.tar.xz
libceph: factor out ceph_con_get_out_msg()
Move the logic of grabbing the next message from the queue into its own function. Like ceph_con_in_msg_alloc(), this is protocol independent. Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
Diffstat (limited to 'net/ceph/messenger.c')
-rw-r--r--net/ceph/messenger.c59
1 files changed, 39 insertions, 20 deletions
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index d161878bc342..a912f2df9a2e 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -1304,6 +1304,8 @@ static void prepare_write_message_footer(struct ceph_connection *con)
con->out_msg_done = true;
}
+static void ceph_con_get_out_msg(struct ceph_connection *con);
+
/*
* Prepare headers for the next outgoing message.
*/
@@ -1325,26 +1327,8 @@ static void prepare_write_message(struct ceph_connection *con)
&con->out_temp_ack);
}
- BUG_ON(list_empty(&con->out_queue));
- m = list_first_entry(&con->out_queue, struct ceph_msg, list_head);
- con->out_msg = m;
- BUG_ON(m->con != con);
-
- /* put message on sent list */
- ceph_msg_get(m);
- list_move_tail(&m->list_head, &con->out_sent);
-
- /*
- * only assign outgoing seq # if we haven't sent this message
- * yet. if it is requeued, resend with it's original seq.
- */
- if (m->needs_out_seq) {
- m->hdr.seq = cpu_to_le64(++con->out_seq);
- m->needs_out_seq = false;
-
- if (con->ops->reencode_message)
- con->ops->reencode_message(m);
- }
+ ceph_con_get_out_msg(con);
+ m = con->out_msg;
dout("prepare_write_message %p seq %lld type %d len %d+%d+%zd\n",
m, con->out_seq, le16_to_cpu(m->hdr.type),
@@ -3118,6 +3102,8 @@ static void clear_standby(struct ceph_connection *con)
/*
* Queue up an outgoing message on the given connection.
+ *
+ * Consumes a ref on @msg.
*/
void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
{
@@ -3503,6 +3489,39 @@ static int ceph_con_in_msg_alloc(struct ceph_connection *con,
return ret;
}
+static void ceph_con_get_out_msg(struct ceph_connection *con)
+{
+ struct ceph_msg *msg;
+
+ BUG_ON(list_empty(&con->out_queue));
+ msg = list_first_entry(&con->out_queue, struct ceph_msg, list_head);
+ WARN_ON(msg->con != con);
+
+ /*
+ * Put the message on "sent" list using a ref from ceph_con_send().
+ * It is put when the message is acked or revoked.
+ */
+ list_move_tail(&msg->list_head, &con->out_sent);
+
+ /*
+ * Only assign outgoing seq # if we haven't sent this message
+ * yet. If it is requeued, resend with it's original seq.
+ */
+ if (msg->needs_out_seq) {
+ msg->hdr.seq = cpu_to_le64(++con->out_seq);
+ msg->needs_out_seq = false;
+
+ if (con->ops->reencode_message)
+ con->ops->reencode_message(msg);
+ }
+
+ /*
+ * Get a ref for out_msg. It is put when we are done sending the
+ * message or in case of a fault.
+ */
+ WARN_ON(con->out_msg);
+ con->out_msg = ceph_msg_get(msg);
+}
/*
* Free a generically kmalloc'd message.