summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/linux/ceph/messenger.h5
-rw-r--r--net/ceph/messenger.c48
2 files changed, 36 insertions, 17 deletions
diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h
index 318da0170a1e..de1d2e1ecce2 100644
--- a/include/linux/ceph/messenger.h
+++ b/include/linux/ceph/messenger.h
@@ -108,7 +108,10 @@ struct ceph_msg_data {
};
struct ceph_msg_data_cursor {
- struct ceph_msg_data *data; /* data item this describes */
+ size_t total_resid; /* across all data items */
+ struct list_head *data_head; /* = &ceph_msg->data */
+
+ struct ceph_msg_data *data; /* current data item */
size_t resid; /* bytes not yet consumed */
bool last_piece; /* current is last piece */
bool need_crc; /* crc update needed */
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 8bfe7d34bc85..84703e550c26 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -734,7 +734,7 @@ static void ceph_msg_data_bio_cursor_init(struct ceph_msg_data_cursor *cursor,
BUG_ON(!bio);
BUG_ON(!bio->bi_vcnt);
- cursor->resid = length;
+ cursor->resid = min(length, data->bio_length);
cursor->bio = bio;
cursor->vector_index = 0;
cursor->vector_offset = 0;
@@ -833,9 +833,8 @@ static void ceph_msg_data_pages_cursor_init(struct ceph_msg_data_cursor *cursor,
BUG_ON(!data->pages);
BUG_ON(!data->length);
- BUG_ON(length > data->length); /* short reads are OK */
- cursor->resid = length;
+ cursor->resid = min(length, data->length);
page_count = calc_pages_for(data->alignment, (u64)data->length);
cursor->page_offset = data->alignment & ~PAGE_MASK;
cursor->page_index = 0;
@@ -904,7 +903,6 @@ ceph_msg_data_pagelist_cursor_init(struct ceph_msg_data_cursor *cursor,
pagelist = data->pagelist;
BUG_ON(!pagelist);
- BUG_ON(length > pagelist->length); /* short reads are OK */
if (!length)
return; /* pagelist can be assigned but empty */
@@ -912,7 +910,7 @@ ceph_msg_data_pagelist_cursor_init(struct ceph_msg_data_cursor *cursor,
BUG_ON(list_empty(&pagelist->head));
page = list_first_entry(&pagelist->head, struct page, lru);
- cursor->resid = length;
+ cursor->resid = min(length, pagelist->length);
cursor->page = page;
cursor->offset = 0;
cursor->last_piece = length <= PAGE_SIZE;
@@ -982,13 +980,10 @@ static bool ceph_msg_data_pagelist_advance(struct ceph_msg_data_cursor *cursor,
* be processed in that piece. It also tracks whether the current
* piece is the last one in the data item.
*/
-static void ceph_msg_data_cursor_init(struct ceph_msg *msg, size_t length)
+static void __ceph_msg_data_cursor_init(struct ceph_msg_data_cursor *cursor)
{
- struct ceph_msg_data_cursor *cursor = &msg->cursor;
- struct ceph_msg_data *data;
+ size_t length = cursor->total_resid;
- data = list_first_entry(&msg->data, struct ceph_msg_data, links);
- cursor->data = data;
switch (cursor->data->type) {
case CEPH_MSG_DATA_PAGELIST:
ceph_msg_data_pagelist_cursor_init(cursor, length);
@@ -1009,6 +1004,25 @@ static void ceph_msg_data_cursor_init(struct ceph_msg *msg, size_t length)
cursor->need_crc = true;
}
+static void ceph_msg_data_cursor_init(struct ceph_msg *msg, size_t length)
+{
+ struct ceph_msg_data_cursor *cursor = &msg->cursor;
+ struct ceph_msg_data *data;
+
+ BUG_ON(!length);
+ BUG_ON(length > msg->data_length);
+ BUG_ON(list_empty(&msg->data));
+
+ data = list_first_entry(&msg->data, struct ceph_msg_data, links);
+
+ cursor->data_head = &msg->data;
+ cursor->total_resid = length;
+ data = list_first_entry(&msg->data, struct ceph_msg_data, links);
+ cursor->data = data;
+
+ __ceph_msg_data_cursor_init(cursor);
+}
+
/*
* Return the page containing the next piece to process for a given
* data item, and supply the page offset and length of that piece.
@@ -1073,8 +1087,16 @@ static bool ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor,
BUG();
break;
}
+ cursor->total_resid -= bytes;
cursor->need_crc = new_piece;
+ if (!cursor->resid && cursor->total_resid) {
+ WARN_ON(!cursor->last_piece);
+ BUG_ON(list_is_last(&cursor->data->links, cursor->data_head));
+ cursor->data = list_entry_next(cursor->data, links);
+ __ceph_msg_data_cursor_init(cursor);
+ }
+
return new_piece;
}
@@ -2990,8 +3012,6 @@ void ceph_msg_data_set_pages(struct ceph_msg *msg, struct page **pages,
BUG_ON(!pages);
BUG_ON(!length);
- BUG_ON(msg->data_length);
- BUG_ON(!list_empty(&msg->data));
data = ceph_msg_data_create(CEPH_MSG_DATA_PAGES);
BUG_ON(!data);
@@ -3012,8 +3032,6 @@ void ceph_msg_data_set_pagelist(struct ceph_msg *msg,
BUG_ON(!pagelist);
BUG_ON(!pagelist->length);
- BUG_ON(msg->data_length);
- BUG_ON(!list_empty(&msg->data));
data = ceph_msg_data_create(CEPH_MSG_DATA_PAGELIST);
BUG_ON(!data);
@@ -3031,8 +3049,6 @@ void ceph_msg_data_set_bio(struct ceph_msg *msg, struct bio *bio,
struct ceph_msg_data *data;
BUG_ON(!bio);
- BUG_ON(msg->data_length);
- BUG_ON(!list_empty(&msg->data));
data = ceph_msg_data_create(CEPH_MSG_DATA_BIO);
BUG_ON(!data);