diff options
Diffstat (limited to 'fs/ceph/messenger.c')
-rw-r--r-- | fs/ceph/messenger.c | 142 |
1 files changed, 82 insertions, 60 deletions
diff --git a/fs/ceph/messenger.c b/fs/ceph/messenger.c index 1360708d7505..25de15c006b1 100644 --- a/fs/ceph/messenger.c +++ b/fs/ceph/messenger.c @@ -1279,8 +1279,34 @@ static void process_ack(struct ceph_connection *con) +static int read_partial_message_section(struct ceph_connection *con, + struct kvec *section, unsigned int sec_len, + u32 *crc) +{ + int left; + int ret; + + BUG_ON(!section); + + while (section->iov_len < sec_len) { + BUG_ON(section->iov_base == NULL); + left = sec_len - section->iov_len; + ret = ceph_tcp_recvmsg(con->sock, (char *)section->iov_base + + section->iov_len, left); + if (ret <= 0) + return ret; + section->iov_len += ret; + if (section->iov_len == sec_len) + *crc = crc32c(0, section->iov_base, + section->iov_len); + } + return 1; +} +static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con, + struct ceph_msg_header *hdr, + int *skip); /* * read (part of) a message. */ @@ -1292,6 +1318,7 @@ static int read_partial_message(struct ceph_connection *con) int to, want, left; unsigned front_len, middle_len, data_len, data_off; int datacrc = con->msgr->nocrc; + int skip; dout("read_partial_message con %p msg %p\n", con, m); @@ -1315,7 +1342,6 @@ static int read_partial_message(struct ceph_connection *con) } } } - front_len = le32_to_cpu(con->in_hdr.front_len); if (front_len > CEPH_MSG_MAX_FRONT_LEN) return -EIO; @@ -1330,8 +1356,8 @@ static int read_partial_message(struct ceph_connection *con) if (!con->in_msg) { dout("got hdr type %d front %d data %d\n", con->in_hdr.type, con->in_hdr.front_len, con->in_hdr.data_len); - con->in_msg = con->ops->alloc_msg(con, &con->in_hdr); - if (!con->in_msg) { + con->in_msg = ceph_alloc_msg(con, &con->in_hdr, &skip); + if (skip) { /* skip this message */ pr_err("alloc_msg returned NULL, skipping message\n"); con->in_base_pos = -front_len - middle_len - data_len - @@ -1342,56 +1368,28 @@ static int read_partial_message(struct ceph_connection *con) if (IS_ERR(con->in_msg)) { ret = PTR_ERR(con->in_msg); con->in_msg = NULL; - con->error_msg = "out of memory for incoming message"; + con->error_msg = "error allocating memory for incoming message"; return ret; } m = con->in_msg; m->front.iov_len = 0; /* haven't read it yet */ + if (m->middle) + m->middle->vec.iov_len = 0; memcpy(&m->hdr, &con->in_hdr, sizeof(con->in_hdr)); } /* front */ - while (m->front.iov_len < front_len) { - BUG_ON(m->front.iov_base == NULL); - left = front_len - m->front.iov_len; - ret = ceph_tcp_recvmsg(con->sock, (char *)m->front.iov_base + - m->front.iov_len, left); - if (ret <= 0) - return ret; - m->front.iov_len += ret; - if (m->front.iov_len == front_len) - con->in_front_crc = crc32c(0, m->front.iov_base, - m->front.iov_len); - } + ret = read_partial_message_section(con, &m->front, front_len, + &con->in_front_crc); + if (ret <= 0) + return ret; /* middle */ - while (middle_len > 0 && (!m->middle || - m->middle->vec.iov_len < middle_len)) { - if (m->middle == NULL) { - ret = -EOPNOTSUPP; - if (con->ops->alloc_middle) - ret = con->ops->alloc_middle(con, m); - if (ret < 0) { - pr_err("alloc_middle fail skipping payload\n"); - con->in_base_pos = -middle_len - data_len - - sizeof(m->footer); - ceph_msg_put(con->in_msg); - con->in_msg = NULL; - con->in_tag = CEPH_MSGR_TAG_READY; - return 0; - } - m->middle->vec.iov_len = 0; - } - left = middle_len - m->middle->vec.iov_len; - ret = ceph_tcp_recvmsg(con->sock, - (char *)m->middle->vec.iov_base + - m->middle->vec.iov_len, left); + if (m->middle) { + ret = read_partial_message_section(con, &m->middle->vec, middle_len, + &con->in_middle_crc); if (ret <= 0) return ret; - m->middle->vec.iov_len += ret; - if (m->middle->vec.iov_len == middle_len) - con->in_middle_crc = crc32c(0, m->middle->vec.iov_base, - m->middle->vec.iov_len); } /* (page) data */ @@ -2116,31 +2114,13 @@ out: } /* - * Generic message allocator, for incoming messages. - */ -struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con, - struct ceph_msg_header *hdr) -{ - int type = le16_to_cpu(hdr->type); - int front_len = le32_to_cpu(hdr->front_len); - struct ceph_msg *msg = ceph_msg_new(type, front_len, 0, 0, NULL); - - if (!msg) { - pr_err("unable to allocate msg type %d len %d\n", - type, front_len); - return ERR_PTR(-ENOMEM); - } - return msg; -} - -/* * Allocate "middle" portion of a message, if it is needed and wasn't * allocated by alloc_msg. This allows us to read a small fixed-size * per-type header in the front and then gracefully fail (i.e., * propagate the error to the caller based on info in the front) when * the middle is too large. */ -int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg) +static int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg) { int type = le16_to_cpu(msg->hdr.type); int middle_len = le32_to_cpu(msg->hdr.middle_len); @@ -2156,6 +2136,48 @@ int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg) return 0; } +/* + * Generic message allocator, for incoming messages. + */ +static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con, + struct ceph_msg_header *hdr, + int *skip) +{ + int type = le16_to_cpu(hdr->type); + int front_len = le32_to_cpu(hdr->front_len); + int middle_len = le32_to_cpu(hdr->middle_len); + struct ceph_msg *msg = NULL; + int ret; + + if (con->ops->alloc_msg) { + msg = con->ops->alloc_msg(con, hdr, skip); + if (IS_ERR(msg)) + return msg; + + if (*skip) + return NULL; + } + if (!msg) { + *skip = 0; + msg = ceph_msg_new(type, front_len, 0, 0, NULL); + if (!msg) { + pr_err("unable to allocate msg type %d len %d\n", + type, front_len); + return ERR_PTR(-ENOMEM); + } + } + + if (middle_len) { + ret = ceph_alloc_middle(con, msg); + + if (ret < 0) { + ceph_msg_put(msg); + return msg; + } + } + return msg; +} + /* * Free a generically kmalloc'd message. |