summaryrefslogtreecommitdiff
path: root/net/ceph/messenger.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/ceph/messenger.c')
-rw-r--r--net/ceph/messenger.c94
1 files changed, 68 insertions, 26 deletions
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index e15a82ccc05f..c340e2e0765b 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -76,7 +76,8 @@ const char *ceph_pr_addr(const struct sockaddr_storage *ss)
break;
default:
- sprintf(s, "(unknown sockaddr family %d)", (int)ss->ss_family);
+ snprintf(s, MAX_ADDR_STR_LEN, "(unknown sockaddr family %d)",
+ (int)ss->ss_family);
}
return s;
@@ -485,13 +486,10 @@ static void prepare_write_message(struct ceph_connection *con)
m = list_first_entry(&con->out_queue,
struct ceph_msg, list_head);
con->out_msg = m;
- if (test_bit(LOSSYTX, &con->state)) {
- list_del_init(&m->list_head);
- } else {
- /* put message on sent list */
- ceph_msg_get(m);
- list_move_tail(&m->list_head, &con->out_sent);
- }
+
+ /* put message on sent list */
+ ceph_msg_get(m);
+ list_move_tail(&m->list_head, &con->out_sent);
/*
* only assign outgoing seq # if we haven't sent this message
@@ -598,7 +596,7 @@ static void prepare_write_keepalive(struct ceph_connection *con)
* Connection negotiation.
*/
-static void prepare_connect_authorizer(struct ceph_connection *con)
+static int prepare_connect_authorizer(struct ceph_connection *con)
{
void *auth_buf;
int auth_len = 0;
@@ -612,13 +610,20 @@ static void prepare_connect_authorizer(struct ceph_connection *con)
con->auth_retry);
mutex_lock(&con->mutex);
+ if (test_bit(CLOSED, &con->state) ||
+ test_bit(OPENING, &con->state))
+ return -EAGAIN;
+
con->out_connect.authorizer_protocol = cpu_to_le32(auth_protocol);
con->out_connect.authorizer_len = cpu_to_le32(auth_len);
- con->out_kvec[con->out_kvec_left].iov_base = auth_buf;
- con->out_kvec[con->out_kvec_left].iov_len = auth_len;
- con->out_kvec_left++;
- con->out_kvec_bytes += auth_len;
+ if (auth_len) {
+ con->out_kvec[con->out_kvec_left].iov_base = auth_buf;
+ con->out_kvec[con->out_kvec_left].iov_len = auth_len;
+ con->out_kvec_left++;
+ con->out_kvec_bytes += auth_len;
+ }
+ return 0;
}
/*
@@ -640,9 +645,9 @@ static void prepare_write_banner(struct ceph_messenger *msgr,
set_bit(WRITE_PENDING, &con->state);
}
-static void prepare_write_connect(struct ceph_messenger *msgr,
- struct ceph_connection *con,
- int after_banner)
+static int prepare_write_connect(struct ceph_messenger *msgr,
+ struct ceph_connection *con,
+ int after_banner)
{
unsigned global_seq = get_global_seq(con->msgr, 0);
int proto;
@@ -683,7 +688,7 @@ static void prepare_write_connect(struct ceph_messenger *msgr,
con->out_more = 0;
set_bit(WRITE_PENDING, &con->state);
- prepare_connect_authorizer(con);
+ return prepare_connect_authorizer(con);
}
@@ -1065,8 +1070,10 @@ static void addr_set_port(struct sockaddr_storage *ss, int p)
switch (ss->ss_family) {
case AF_INET:
((struct sockaddr_in *)ss)->sin_port = htons(p);
+ break;
case AF_INET6:
((struct sockaddr_in6 *)ss)->sin6_port = htons(p);
+ break;
}
}
@@ -1216,6 +1223,7 @@ static int process_connect(struct ceph_connection *con)
u64 sup_feat = con->msgr->supported_features;
u64 req_feat = con->msgr->required_features;
u64 server_feat = le64_to_cpu(con->in_reply.features);
+ int ret;
dout("process_connect on %p tag %d\n", con, (int)con->in_tag);
@@ -1250,7 +1258,9 @@ static int process_connect(struct ceph_connection *con)
return -1;
}
con->auth_retry = 1;
- prepare_write_connect(con->msgr, con, 0);
+ ret = prepare_write_connect(con->msgr, con, 0);
+ if (ret < 0)
+ return ret;
prepare_read_connect(con);
break;
@@ -1277,6 +1287,9 @@ static int process_connect(struct ceph_connection *con)
if (con->ops->peer_reset)
con->ops->peer_reset(con);
mutex_lock(&con->mutex);
+ if (test_bit(CLOSED, &con->state) ||
+ test_bit(OPENING, &con->state))
+ return -EAGAIN;
break;
case CEPH_MSGR_TAG_RETRY_SESSION:
@@ -1341,7 +1354,9 @@ static int process_connect(struct ceph_connection *con)
* to WAIT. This shouldn't happen if we are the
* client.
*/
- pr_err("process_connect peer connecting WAIT\n");
+ pr_err("process_connect got WAIT as client\n");
+ con->error_msg = "protocol error, got WAIT as client";
+ return -1;
default:
pr_err("connect protocol error, will retry\n");
@@ -1381,6 +1396,7 @@ static void process_ack(struct ceph_connection *con)
break;
dout("got ack for seq %llu type %d at %p\n", seq,
le16_to_cpu(m->hdr.type), m);
+ m->ack_stamp = jiffies;
ceph_msg_remove(m);
}
prepare_read_tag(con);
@@ -1810,6 +1826,17 @@ static int try_read(struct ceph_connection *con)
more:
dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag,
con->in_base_pos);
+
+ /*
+ * process_connect and process_message drop and re-take
+ * con->mutex. make sure we handle a racing close or reopen.
+ */
+ if (test_bit(CLOSED, &con->state) ||
+ test_bit(OPENING, &con->state)) {
+ ret = -EAGAIN;
+ goto out;
+ }
+
if (test_bit(CONNECTING, &con->state)) {
if (!test_bit(NEGOTIATING, &con->state)) {
dout("try_read connecting\n");
@@ -1938,8 +1965,10 @@ static void con_work(struct work_struct *work)
{
struct ceph_connection *con = container_of(work, struct ceph_connection,
work.work);
+ int ret;
mutex_lock(&con->mutex);
+restart:
if (test_and_clear_bit(BACKOFF, &con->state)) {
dout("con_work %p backing off\n", con);
if (queue_delayed_work(ceph_msgr_wq, &con->work,
@@ -1969,18 +1998,31 @@ static void con_work(struct work_struct *work)
con_close_socket(con);
}
- if (test_and_clear_bit(SOCK_CLOSED, &con->state) ||
- try_read(con) < 0 ||
- try_write(con) < 0) {
- mutex_unlock(&con->mutex);
- ceph_fault(con); /* error/fault path */
- goto done_unlocked;
- }
+ if (test_and_clear_bit(SOCK_CLOSED, &con->state))
+ goto fault;
+
+ ret = try_read(con);
+ if (ret == -EAGAIN)
+ goto restart;
+ if (ret < 0)
+ goto fault;
+
+ ret = try_write(con);
+ if (ret == -EAGAIN)
+ goto restart;
+ if (ret < 0)
+ goto fault;
done:
mutex_unlock(&con->mutex);
done_unlocked:
con->ops->put(con);
+ return;
+
+fault:
+ mutex_unlock(&con->mutex);
+ ceph_fault(con); /* error/fault path */
+ goto done_unlocked;
}