summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/linux/ceph/messenger.h2
-rw-r--r--net/ceph/messenger.c111
2 files changed, 63 insertions, 50 deletions
diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h
index 0e4536cc46f0..459e55280bf8 100644
--- a/include/linux/ceph/messenger.h
+++ b/include/linux/ceph/messenger.h
@@ -95,6 +95,7 @@ static __inline__ bool ceph_msg_data_type_valid(enum ceph_msg_data_type type)
}
struct ceph_msg_data_cursor {
+ size_t resid; /* bytes not yet consumed */
bool last_piece; /* now at last piece of data item */
union {
#ifdef CONFIG_BLOCK
@@ -105,7 +106,6 @@ struct ceph_msg_data_cursor {
};
#endif /* CONFIG_BLOCK */
struct { /* pages */
- size_t resid; /* bytes from array */
unsigned int page_offset; /* offset in page */
unsigned short page_index; /* index in array */
unsigned short page_count; /* pages in array */
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 95f90b01f753..0ac4f6cb7339 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -745,7 +745,8 @@ static void iter_bio_next(struct bio **bio_iter, unsigned int *seg)
* entry in the current bio iovec, or the first entry in the next
* bio in the list.
*/
-static void ceph_msg_data_bio_cursor_init(struct ceph_msg_data *data)
+static void ceph_msg_data_bio_cursor_init(struct ceph_msg_data *data,
+ size_t length)
{
struct ceph_msg_data_cursor *cursor = &data->cursor;
struct bio *bio;
@@ -755,12 +756,12 @@ static void ceph_msg_data_bio_cursor_init(struct ceph_msg_data *data)
bio = data->bio;
BUG_ON(!bio);
BUG_ON(!bio->bi_vcnt);
- /* resid = bio->bi_size */
+ cursor->resid = length;
cursor->bio = bio;
cursor->vector_index = 0;
cursor->vector_offset = 0;
- cursor->last_piece = !bio->bi_next && bio->bi_vcnt == 1;
+ cursor->last_piece = length <= bio->bi_io_vec[0].bv_len;
}
static struct page *ceph_msg_data_bio_next(struct ceph_msg_data *data,
@@ -784,8 +785,12 @@ static struct page *ceph_msg_data_bio_next(struct ceph_msg_data *data,
BUG_ON(cursor->vector_offset >= bio_vec->bv_len);
*page_offset = (size_t) (bio_vec->bv_offset + cursor->vector_offset);
BUG_ON(*page_offset >= PAGE_SIZE);
- *length = (size_t) (bio_vec->bv_len - cursor->vector_offset);
+ if (cursor->last_piece) /* pagelist offset is always 0 */
+ *length = cursor->resid;
+ else
+ *length = (size_t) (bio_vec->bv_len - cursor->vector_offset);
BUG_ON(*length > PAGE_SIZE);
+ BUG_ON(*length > cursor->resid);
return bio_vec->bv_page;
}
@@ -805,26 +810,33 @@ static bool ceph_msg_data_bio_advance(struct ceph_msg_data *data, size_t bytes)
index = cursor->vector_index;
BUG_ON(index >= (unsigned int) bio->bi_vcnt);
bio_vec = &bio->bi_io_vec[index];
- BUG_ON(cursor->vector_offset + bytes > bio_vec->bv_len);
/* Advance the cursor offset */
+ BUG_ON(cursor->resid < bytes);
+ cursor->resid -= bytes;
cursor->vector_offset += bytes;
if (cursor->vector_offset < bio_vec->bv_len)
return false; /* more bytes to process in this segment */
+ BUG_ON(cursor->vector_offset != bio_vec->bv_len);
/* Move on to the next segment, and possibly the next bio */
- if (++cursor->vector_index == (unsigned int) bio->bi_vcnt) {
+ if (++index == (unsigned int) bio->bi_vcnt) {
bio = bio->bi_next;
- cursor->bio = bio;
- cursor->vector_index = 0;
+ index = 0;
}
+ cursor->bio = bio;
+ cursor->vector_index = index;
cursor->vector_offset = 0;
- if (!cursor->last_piece && bio && !bio->bi_next)
- if (cursor->vector_index == (unsigned int) bio->bi_vcnt - 1)
+ if (!cursor->last_piece) {
+ BUG_ON(!cursor->resid);
+ BUG_ON(!bio);
+ /* A short read is OK, so use <= rather than == */
+ if (cursor->resid <= bio->bi_io_vec[index].bv_len)
cursor->last_piece = true;
+ }
return true;
}
@@ -834,7 +846,8 @@ static bool ceph_msg_data_bio_advance(struct ceph_msg_data *data, size_t bytes)
* For a page array, a piece comes from the first page in the array
* that has not already been fully consumed.
*/
-static void ceph_msg_data_pages_cursor_init(struct ceph_msg_data *data)
+static void ceph_msg_data_pages_cursor_init(struct ceph_msg_data *data,
+ size_t length)
{
struct ceph_msg_data_cursor *cursor = &data->cursor;
int page_count;
@@ -843,14 +856,15 @@ static void ceph_msg_data_pages_cursor_init(struct ceph_msg_data *data)
BUG_ON(!data->pages);
BUG_ON(!data->length);
+ BUG_ON(length != data->length);
+ cursor->resid = length;
page_count = calc_pages_for(data->alignment, (u64)data->length);
- BUG_ON(page_count > (int) USHRT_MAX);
- cursor->resid = data->length;
cursor->page_offset = data->alignment & ~PAGE_MASK;
cursor->page_index = 0;
+ BUG_ON(page_count > (int) USHRT_MAX);
cursor->page_count = (unsigned short) page_count;
- cursor->last_piece = cursor->page_count == 1;
+ cursor->last_piece = length <= PAGE_SIZE;
}
static struct page *ceph_msg_data_pages_next(struct ceph_msg_data *data,
@@ -863,15 +877,12 @@ static struct page *ceph_msg_data_pages_next(struct ceph_msg_data *data,
BUG_ON(cursor->page_index >= cursor->page_count);
BUG_ON(cursor->page_offset >= PAGE_SIZE);
- BUG_ON(!cursor->resid);
*page_offset = cursor->page_offset;
- if (cursor->last_piece) {
- BUG_ON(*page_offset + cursor->resid > PAGE_SIZE);
+ if (cursor->last_piece)
*length = cursor->resid;
- } else {
+ else
*length = PAGE_SIZE - *page_offset;
- }
return data->pages[cursor->page_index];
}
@@ -884,7 +895,6 @@ static bool ceph_msg_data_pages_advance(struct ceph_msg_data *data,
BUG_ON(data->type != CEPH_MSG_DATA_PAGES);
BUG_ON(cursor->page_offset + bytes > PAGE_SIZE);
- BUG_ON(bytes > cursor->resid);
/* Advance the cursor page offset */
@@ -898,7 +908,7 @@ static bool ceph_msg_data_pages_advance(struct ceph_msg_data *data,
BUG_ON(cursor->page_index >= cursor->page_count);
cursor->page_offset = 0;
cursor->page_index++;
- cursor->last_piece = cursor->page_index == cursor->page_count - 1;
+ cursor->last_piece = cursor->resid <= PAGE_SIZE;
return true;
}
@@ -907,7 +917,8 @@ static bool ceph_msg_data_pages_advance(struct ceph_msg_data *data,
* For a pagelist, a piece is whatever remains to be consumed in the
* first page in the list, or the front of the next page.
*/
-static void ceph_msg_data_pagelist_cursor_init(struct ceph_msg_data *data)
+static void ceph_msg_data_pagelist_cursor_init(struct ceph_msg_data *data,
+ size_t length)
{
struct ceph_msg_data_cursor *cursor = &data->cursor;
struct ceph_pagelist *pagelist;
@@ -917,15 +928,18 @@ static void ceph_msg_data_pagelist_cursor_init(struct ceph_msg_data *data)
pagelist = data->pagelist;
BUG_ON(!pagelist);
- if (!pagelist->length)
+ BUG_ON(length != pagelist->length);
+
+ if (!length)
return; /* pagelist can be assigned but empty */
BUG_ON(list_empty(&pagelist->head));
page = list_first_entry(&pagelist->head, struct page, lru);
+ cursor->resid = length;
cursor->page = page;
cursor->offset = 0;
- cursor->last_piece = pagelist->length <= PAGE_SIZE;
+ cursor->last_piece = length <= PAGE_SIZE;
}
static struct page *ceph_msg_data_pagelist_next(struct ceph_msg_data *data,
@@ -934,7 +948,6 @@ static struct page *ceph_msg_data_pagelist_next(struct ceph_msg_data *data,
{
struct ceph_msg_data_cursor *cursor = &data->cursor;
struct ceph_pagelist *pagelist;
- size_t piece_end;
BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);
@@ -942,18 +955,13 @@ static struct page *ceph_msg_data_pagelist_next(struct ceph_msg_data *data,
BUG_ON(!pagelist);
BUG_ON(!cursor->page);
- BUG_ON(cursor->offset >= pagelist->length);
+ BUG_ON(cursor->offset + cursor->resid != pagelist->length);
- if (cursor->last_piece) {
- /* pagelist offset is always 0 */
- piece_end = pagelist->length & ~PAGE_MASK;
- if (!piece_end)
- piece_end = PAGE_SIZE;
- } else {
- piece_end = PAGE_SIZE;
- }
*page_offset = cursor->offset & ~PAGE_MASK;
- *length = piece_end - *page_offset;
+ if (cursor->last_piece) /* pagelist offset is always 0 */
+ *length = cursor->resid;
+ else
+ *length = PAGE_SIZE - *page_offset;
return data->cursor.page;
}
@@ -968,12 +976,13 @@ static bool ceph_msg_data_pagelist_advance(struct ceph_msg_data *data,
pagelist = data->pagelist;
BUG_ON(!pagelist);
- BUG_ON(!cursor->page);
- BUG_ON(cursor->offset + bytes > pagelist->length);
+
+ BUG_ON(cursor->offset + cursor->resid != pagelist->length);
BUG_ON((cursor->offset & ~PAGE_MASK) + bytes > PAGE_SIZE);
/* Advance the cursor offset */
+ cursor->resid -= bytes;
cursor->offset += bytes;
/* pagelist offset is always 0 */
if (!bytes || cursor->offset & ~PAGE_MASK)
@@ -983,10 +992,7 @@ static bool ceph_msg_data_pagelist_advance(struct ceph_msg_data *data,
BUG_ON(list_is_last(&cursor->page->lru, &pagelist->head));
cursor->page = list_entry_next(cursor->page, lru);
-
- /* cursor offset is at page boundary; pagelist offset is always 0 */
- if (pagelist->length - cursor->offset <= PAGE_SIZE)
- cursor->last_piece = true;
+ cursor->last_piece = cursor->resid <= PAGE_SIZE;
return true;
}
@@ -999,18 +1005,19 @@ static bool ceph_msg_data_pagelist_advance(struct ceph_msg_data *data,
* be processed in that piece. It also tracks whether the current
* piece is the last one in the data item.
*/
-static void ceph_msg_data_cursor_init(struct ceph_msg_data *data)
+static void ceph_msg_data_cursor_init(struct ceph_msg_data *data,
+ size_t length)
{
switch (data->type) {
case CEPH_MSG_DATA_PAGELIST:
- ceph_msg_data_pagelist_cursor_init(data);
+ ceph_msg_data_pagelist_cursor_init(data, length);
break;
case CEPH_MSG_DATA_PAGES:
- ceph_msg_data_pages_cursor_init(data);
+ ceph_msg_data_pages_cursor_init(data, length);
break;
#ifdef CONFIG_BLOCK
case CEPH_MSG_DATA_BIO:
- ceph_msg_data_bio_cursor_init(data);
+ ceph_msg_data_bio_cursor_init(data, length);
break;
#endif /* CONFIG_BLOCK */
case CEPH_MSG_DATA_NONE:
@@ -1064,8 +1071,10 @@ static struct page *ceph_msg_data_next(struct ceph_msg_data *data,
*/
static bool ceph_msg_data_advance(struct ceph_msg_data *data, size_t bytes)
{
+ struct ceph_msg_data_cursor *cursor = &data->cursor;
bool new_piece;
+ BUG_ON(bytes > cursor->resid);
switch (data->type) {
case CEPH_MSG_DATA_PAGELIST:
new_piece = ceph_msg_data_pagelist_advance(data, bytes);
@@ -1090,8 +1099,12 @@ static bool ceph_msg_data_advance(struct ceph_msg_data *data, size_t bytes)
static void prepare_message_data(struct ceph_msg *msg,
struct ceph_msg_pos *msg_pos)
{
+ size_t data_len;
+
BUG_ON(!msg);
- BUG_ON(!msg->hdr.data_len);
+
+ data_len = le32_to_cpu(msg->hdr.data_len);
+ BUG_ON(!data_len);
/* initialize page iterator */
msg_pos->page = 0;
@@ -1109,12 +1122,12 @@ static void prepare_message_data(struct ceph_msg *msg,
#ifdef CONFIG_BLOCK
if (ceph_msg_has_bio(msg))
- ceph_msg_data_cursor_init(&msg->b);
+ ceph_msg_data_cursor_init(&msg->b, data_len);
#endif /* CONFIG_BLOCK */
if (ceph_msg_has_pages(msg))
- ceph_msg_data_cursor_init(&msg->p);
+ ceph_msg_data_cursor_init(&msg->p, data_len);
if (ceph_msg_has_pagelist(msg))
- ceph_msg_data_cursor_init(&msg->l);
+ ceph_msg_data_cursor_init(&msg->l, data_len);
msg_pos->did_page_crc = false;
}