diff options
Diffstat (limited to 'drivers/block/drbd/drbd_receiver.c')
-rw-r--r-- | drivers/block/drbd/drbd_receiver.c | 949 |
1 files changed, 538 insertions, 411 deletions
diff --git a/drivers/block/drbd/drbd_receiver.c b/drivers/block/drbd/drbd_receiver.c index 081522d3c742..efd6169acf2f 100644 --- a/drivers/block/drbd/drbd_receiver.c +++ b/drivers/block/drbd/drbd_receiver.c @@ -241,7 +241,7 @@ static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev) spin_unlock_irq(&mdev->req_lock); list_for_each_entry_safe(e, t, &reclaimed, w.list) - drbd_free_ee(mdev, e); + drbd_free_net_ee(mdev, e); } /** @@ -298,9 +298,11 @@ static struct page *drbd_pp_alloc(struct drbd_conf *mdev, unsigned number, bool * Is also used from inside an other spin_lock_irq(&mdev->req_lock); * Either links the page chain back to the global pool, * or returns all pages to the system. */ -static void drbd_pp_free(struct drbd_conf *mdev, struct page *page) +static void drbd_pp_free(struct drbd_conf *mdev, struct page *page, int is_net) { + atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use; int i; + if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count) i = page_chain_free(page); else { @@ -311,10 +313,10 @@ static void drbd_pp_free(struct drbd_conf *mdev, struct page *page) drbd_pp_vacant += i; spin_unlock(&drbd_pp_lock); } - atomic_sub(i, &mdev->pp_in_use); - i = atomic_read(&mdev->pp_in_use); + i = atomic_sub_return(i, a); if (i < 0) - dev_warn(DEV, "ASSERTION FAILED: pp_in_use: %d < 0\n", i); + dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n", + is_net ? "pp_in_use_by_net" : "pp_in_use", i); wake_up(&drbd_pp_wait); } @@ -365,7 +367,6 @@ struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev, e->size = data_size; e->flags = 0; e->sector = sector; - e->sector = sector; e->block_id = id; return e; @@ -375,9 +376,11 @@ struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev, return NULL; } -void drbd_free_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e) +void drbd_free_some_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e, int is_net) { - drbd_pp_free(mdev, e->pages); + if (e->flags & EE_HAS_DIGEST) + kfree(e->digest); + drbd_pp_free(mdev, e->pages, is_net); D_ASSERT(atomic_read(&e->pending_bios) == 0); D_ASSERT(hlist_unhashed(&e->colision)); mempool_free(e, drbd_ee_mempool); @@ -388,13 +391,14 @@ int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list) LIST_HEAD(work_list); struct drbd_epoch_entry *e, *t; int count = 0; + int is_net = list == &mdev->net_ee; spin_lock_irq(&mdev->req_lock); list_splice_init(list, &work_list); spin_unlock_irq(&mdev->req_lock); list_for_each_entry_safe(e, t, &work_list, w.list) { - drbd_free_ee(mdev, e); + drbd_free_some_ee(mdev, e, is_net); count++; } return count; @@ -423,7 +427,7 @@ static int drbd_process_done_ee(struct drbd_conf *mdev) spin_unlock_irq(&mdev->req_lock); list_for_each_entry_safe(e, t, &reclaimed, w.list) - drbd_free_ee(mdev, e); + drbd_free_net_ee(mdev, e); /* possible callbacks here: * e_end_block, and e_end_resync_block, e_send_discard_ack. @@ -719,14 +723,14 @@ out: static int drbd_send_fp(struct drbd_conf *mdev, struct socket *sock, enum drbd_packets cmd) { - struct p_header *h = (struct p_header *) &mdev->data.sbuf.header; + struct p_header80 *h = &mdev->data.sbuf.header.h80; return _drbd_send_cmd(mdev, sock, cmd, h, sizeof(*h), 0); } static enum drbd_packets drbd_recv_fp(struct drbd_conf *mdev, struct socket *sock) { - struct p_header *h = (struct p_header *) &mdev->data.sbuf.header; + struct p_header80 *h = &mdev->data.rbuf.header.h80; int rr; rr = drbd_recv_short(mdev, sock, h, sizeof(*h), 0); @@ -776,9 +780,6 @@ static int drbd_connect(struct drbd_conf *mdev) D_ASSERT(!mdev->data.socket); - if (test_and_clear_bit(CREATE_BARRIER, &mdev->flags)) - dev_err(DEV, "CREATE_BARRIER flag was set in drbd_connect - now cleared!\n"); - if (drbd_request_state(mdev, NS(conn, C_WF_CONNECTION)) < SS_SUCCESS) return -2; @@ -927,6 +928,11 @@ retry: drbd_thread_start(&mdev->asender); + if (mdev->agreed_pro_version < 95 && get_ldev(mdev)) { + drbd_setup_queue_param(mdev, DRBD_MAX_SIZE_H80_PACKET); + put_ldev(mdev); + } + if (!drbd_send_protocol(mdev)) return -1; drbd_send_sync_param(mdev, &mdev->sync_conf); @@ -946,22 +952,28 @@ out_release_sockets: return -1; } -static int drbd_recv_header(struct drbd_conf *mdev, struct p_header *h) +static int drbd_recv_header(struct drbd_conf *mdev, enum drbd_packets *cmd, unsigned int *packet_size) { + union p_header *h = &mdev->data.rbuf.header; int r; r = drbd_recv(mdev, h, sizeof(*h)); - if (unlikely(r != sizeof(*h))) { dev_err(DEV, "short read expecting header on sock: r=%d\n", r); return FALSE; - }; - h->command = be16_to_cpu(h->command); - h->length = be16_to_cpu(h->length); - if (unlikely(h->magic != BE_DRBD_MAGIC)) { - dev_err(DEV, "magic?? on data m: 0x%lx c: %d l: %d\n", - (long)be32_to_cpu(h->magic), - h->command, h->length); + } + + if (likely(h->h80.magic == BE_DRBD_MAGIC)) { + *cmd = be16_to_cpu(h->h80.command); + *packet_size = be16_to_cpu(h->h80.length); + } else if (h->h95.magic == BE_DRBD_MAGIC_BIG) { + *cmd = be16_to_cpu(h->h95.command); + *packet_size = be32_to_cpu(h->h95.length); + } else { + dev_err(DEV, "magic?? on data m: 0x%08x c: %d l: %d\n", + be32_to_cpu(h->h80.magic), + be16_to_cpu(h->h80.command), + be16_to_cpu(h->h80.length)); return FALSE; } mdev->last_received = jiffies; @@ -975,7 +987,7 @@ static enum finish_epoch drbd_flush_after_epoch(struct drbd_conf *mdev, struct d if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) { rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL, - NULL, BLKDEV_IFL_WAIT); + NULL); if (rv) { dev_err(DEV, "local disk flush failed with status %d\n", rv); /* would rather check on EOPNOTSUPP, but that is not reliable. @@ -1268,17 +1280,12 @@ int w_e_reissue(struct drbd_conf *mdev, struct drbd_work *w, int cancel) __relea return 1; } -static int receive_Barrier(struct drbd_conf *mdev, struct p_header *h) +static int receive_Barrier(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size) { int rv, issue_flush; - struct p_barrier *p = (struct p_barrier *)h; + struct p_barrier *p = &mdev->data.rbuf.barrier; struct drbd_epoch *epoch; - ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE; - - rv = drbd_recv(mdev, h->payload, h->length); - ERR_IF(rv != h->length) return FALSE; - inc_unacked(mdev); if (mdev->net_conf->wire_protocol != DRBD_PROT_C) @@ -1457,7 +1464,7 @@ static int drbd_drain_block(struct drbd_conf *mdev, int data_size) data_size -= rr; } kunmap(page); - drbd_pp_free(mdev, page); + drbd_pp_free(mdev, page, 0); return rv; } @@ -1562,30 +1569,29 @@ static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_si list_add(&e->w.list, &mdev->sync_ee); spin_unlock_irq(&mdev->req_lock); + atomic_add(data_size >> 9, &mdev->rs_sect_ev); if (drbd_submit_ee(mdev, e, WRITE, DRBD_FAULT_RS_WR) == 0) return TRUE; + /* drbd_submit_ee currently fails for one reason only: + * not being able to allocate enough bios. + * Is dropping the connection going to help? */ + spin_lock_irq(&mdev->req_lock); + list_del(&e->w.list); + spin_unlock_irq(&mdev->req_lock); + drbd_free_ee(mdev, e); fail: put_ldev(mdev); return FALSE; } -static int receive_DataReply(struct drbd_conf *mdev, struct p_header *h) +static int receive_DataReply(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size) { struct drbd_request *req; sector_t sector; - unsigned int header_size, data_size; int ok; - struct p_data *p = (struct p_data *)h; - - header_size = sizeof(*p) - sizeof(*h); - data_size = h->length - header_size; - - ERR_IF(data_size == 0) return FALSE; - - if (drbd_recv(mdev, h->payload, header_size) != header_size) - return FALSE; + struct p_data *p = &mdev->data.rbuf.data; sector = be64_to_cpu(p->sector); @@ -1611,20 +1617,11 @@ static int receive_DataReply(struct drbd_conf *mdev, struct p_header *h) return ok; } -static int receive_RSDataReply(struct drbd_conf *mdev, struct p_header *h) +static int receive_RSDataReply(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size) { sector_t sector; - unsigned int header_size, data_size; int ok; - struct p_data *p = (struct p_data *)h; - - header_size = sizeof(*p) - sizeof(*h); - data_size = h->length - header_size; - - ERR_IF(data_size == 0) return FALSE; - - if (drbd_recv(mdev, h->payload, header_size) != header_size) - return FALSE; + struct p_data *p = &mdev->data.rbuf.data; sector = be64_to_cpu(p->sector); D_ASSERT(p->block_id == ID_SYNCER); @@ -1640,9 +1637,11 @@ static int receive_RSDataReply(struct drbd_conf *mdev, struct p_header *h) ok = drbd_drain_block(mdev, data_size); - drbd_send_ack_dp(mdev, P_NEG_ACK, p); + drbd_send_ack_dp(mdev, P_NEG_ACK, p, data_size); } + atomic_add(data_size >> 9, &mdev->rs_sect_in); + return ok; } @@ -1765,24 +1764,27 @@ static int drbd_wait_peer_seq(struct drbd_conf *mdev, const u32 packet_seq) return ret; } +static unsigned long write_flags_to_bio(struct drbd_conf *mdev, u32 dpf) +{ + if (mdev->agreed_pro_version >= 95) + return (dpf & DP_RW_SYNC ? REQ_SYNC : 0) | + (dpf & DP_UNPLUG ? REQ_UNPLUG : 0) | + (dpf & DP_FUA ? REQ_FUA : 0) | + (dpf & DP_FLUSH ? REQ_FUA : 0) | + (dpf & DP_DISCARD ? REQ_DISCARD : 0); + else + return dpf & DP_RW_SYNC ? (REQ_SYNC | REQ_UNPLUG) : 0; +} + /* mirrored write */ -static int receive_Data(struct drbd_conf *mdev, struct p_header *h) +static int receive_Data(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size) { sector_t sector; struct drbd_epoch_entry *e; - struct p_data *p = (struct p_data *)h; - int header_size, data_size; + struct p_data *p = &mdev->data.rbuf.data; int rw = WRITE; u32 dp_flags; - header_size = sizeof(*p) - sizeof(*h); - data_size = h->length - header_size; - - ERR_IF(data_size == 0) return FALSE; - - if (drbd_recv(mdev, h->payload, header_size) != header_size) - return FALSE; - if (!get_ldev(mdev)) { if (__ratelimit(&drbd_ratelimit_state)) dev_err(DEV, "Can not write mirrored data block " @@ -1792,7 +1794,7 @@ static int receive_Data(struct drbd_conf *mdev, struct p_header *h) mdev->peer_seq++; spin_unlock(&mdev->peer_seq_lock); - drbd_send_ack_dp(mdev, P_NEG_ACK, p); + drbd_send_ack_dp(mdev, P_NEG_ACK, p, data_size); atomic_inc(&mdev->current_epoch->epoch_size); return drbd_drain_block(mdev, data_size); } @@ -1839,12 +1841,8 @@ static int receive_Data(struct drbd_conf *mdev, struct p_header *h) spin_unlock(&mdev->epoch_lock); dp_flags = be32_to_cpu(p->dp_flags); - if (dp_flags & DP_HARDBARRIER) { - dev_err(DEV, "ASSERT FAILED would have submitted barrier request\n"); - /* rw |= REQ_HARDBARRIER; */ - } - if (dp_flags & DP_RW_SYNC) - rw |= REQ_SYNC | REQ_UNPLUG; + rw |= write_flags_to_bio(mdev, dp_flags); + if (dp_flags & DP_MAY_SET_IN_SYNC) e->flags |= EE_MAY_SET_IN_SYNC; @@ -2007,6 +2005,16 @@ static int receive_Data(struct drbd_conf *mdev, struct p_header *h) if (drbd_submit_ee(mdev, e, rw, DRBD_FAULT_DT_WR) == 0) return TRUE; + /* drbd_submit_ee currently fails for one reason only: + * not being able to allocate enough bios. + * Is dropping the connection going to help? */ + spin_lock_irq(&mdev->req_lock); + list_del(&e->w.list); + hlist_del_init(&e->colision); + spin_unlock_irq(&mdev->req_lock); + if (e->flags & EE_CALL_AL_COMPLETE_IO) + drbd_al_complete_io(mdev, e->sector); + out_interrupted: /* yes, the epoch_size now is imbalanced. * but we drop the connection anyways, so we don't have a chance to @@ -2016,20 +2024,64 @@ out_interrupted: return FALSE; } -static int receive_DataRequest(struct drbd_conf *mdev, struct p_header *h) +/* We may throttle resync, if the lower device seems to be busy, + * and current sync rate is above c_min_rate. + * + * To decide whether or not the lower device is busy, we use a scheme similar + * to MD RAID is_mddev_idle(): if the partition stats reveal "significant" + * (more than 64 sectors) of activity we cannot account for with our own resync + * activity, it obviously is "busy". + * + * The current sync rate used here uses only the most recent two step marks, + * to have a short time average so we can react faster. + */ +int drbd_rs_should_slow_down(struct drbd_conf *mdev) +{ + struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk; + unsigned long db, dt, dbdt; + int curr_events; + int throttle = 0; + + /* feature disabled? */ + if (mdev->sync_conf.c_min_rate == 0) + return 0; + + curr_events = (int)part_stat_read(&disk->part0, sectors[0]) + + (int)part_stat_read(&disk->part0, sectors[1]) - + atomic_read(&mdev->rs_sect_ev); + if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) { + unsigned long rs_left; + int i; + + mdev->rs_last_events = curr_events; + + /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP, + * approx. */ + i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-2) % DRBD_SYNC_MARKS; + rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed; + + dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ; + if (!dt) + dt++; + db = mdev->rs_mark_left[i] - rs_left; + dbdt = Bit2KB(db/dt); + + if (dbdt > mdev->sync_conf.c_min_rate) + throttle = 1; + } + return throttle; +} + + +static int receive_DataRequest(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int digest_size) { sector_t sector; const sector_t capacity = drbd_get_capacity(mdev->this_bdev); struct drbd_epoch_entry *e; struct digest_info *di = NULL; - int size, digest_size; + int size, verb; unsigned int fault_type; - struct p_block_req *p = - (struct p_block_req *)h; - const int brps = sizeof(*p)-sizeof(*h); - - if (drbd_recv(mdev, h->payload, brps) != brps) - return FALSE; + struct p_block_req *p = &mdev->data.rbuf.block_req; sector = be64_to_cpu(p->sector); size = be32_to_cpu(p->blksize); @@ -2046,12 +2098,31 @@ static int receive_DataRequest(struct drbd_conf *mdev, struct p_header *h) } if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) { - if (__ratelimit(&drbd_ratelimit_state)) + verb = 1; + switch (cmd) { + case P_DATA_REQUEST: + drbd_send_ack_rp(mdev, P_NEG_DREPLY, p); + break; + case P_RS_DATA_REQUEST: + case P_CSUM_RS_REQUEST: + case P_OV_REQUEST: + drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY , p); + break; + case P_OV_REPLY: + verb = 0; + dec_rs_pending(mdev); + drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC); + break; + default: + dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n", + cmdname(cmd)); + } + if (verb && __ratelimit(&drbd_ratelimit_state)) dev_err(DEV, "Can not satisfy peer's read request, " "no local data.\n"); - drbd_send_ack_rp(mdev, h->command == P_DATA_REQUEST ? P_NEG_DREPLY : - P_NEG_RS_DREPLY , p); - return drbd_drain_block(mdev, h->length - brps); + + /* drain possibly payload */ + return drbd_drain_block(mdev, digest_size); } /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD @@ -2063,31 +2134,21 @@ static int receive_DataRequest(struct drbd_conf *mdev, struct p_header *h) return FALSE; } - switch (h->command) { + switch (cmd) { case P_DATA_REQUEST: e->w.cb = w_e_end_data_req; fault_type = DRBD_FAULT_DT_RD; - break; + /* application IO, don't drbd_rs_begin_io */ + goto submit; + case P_RS_DATA_REQUEST: e->w.cb = w_e_end_rsdata_req; fault_type = DRBD_FAULT_RS_RD; - /* Eventually this should become asynchronously. Currently it - * blocks the whole receiver just to delay the reading of a - * resync data block. - * the drbd_work_queue mechanism is made for this... - */ - if (!drbd_rs_begin_io(mdev, sector)) { - /* we have been interrupted, - * probably connection lost! */ - D_ASSERT(signal_pending(current)); - goto out_free_e; - } break; case P_OV_REPLY: case P_CSUM_RS_REQUEST: fault_type = DRBD_FAULT_RS_RD; - digest_size = h->length - brps ; di = kmalloc(sizeof(*di) + digest_size, GFP_NOIO); if (!di) goto out_free_e; @@ -2095,31 +2156,25 @@ static int receive_DataRequest(struct drbd_conf *mdev, struct p_header *h) di->digest_size = digest_size; di->digest = (((char *)di)+sizeof(struct digest_info)); + e->digest = di; + e->flags |= EE_HAS_DIGEST; + if (drbd_recv(mdev, di->digest, digest_size) != digest_size) goto out_free_e; - e->block_id = (u64)(unsigned long)di; - if (h->command == P_CSUM_RS_REQUEST) { + if (cmd == P_CSUM_RS_REQUEST) { D_ASSERT(mdev->agreed_pro_version >= 89); e->w.cb = w_e_end_csum_rs_req; - } else if (h->command == P_OV_REPLY) { + } else if (cmd == P_OV_REPLY) { e->w.cb = w_e_end_ov_reply; dec_rs_pending(mdev); - break; - } - - if (!drbd_rs_begin_io(mdev, sector)) { - /* we have been interrupted, probably connection lost! */ - D_ASSERT(signal_pending(current)); - goto out_free_e; + /* drbd_rs_begin_io done when we sent this request, + * but accounting still needs to be done. */ + goto submit_for_resync; } break; case P_OV_REQUEST: - if (mdev->state.conn >= C_CONNECTED && - mdev->state.conn != C_VERIFY_T) - dev_warn(DEV, "ASSERT FAILED: got P_OV_REQUEST while being %s\n", - drbd_conn_str(mdev->state.conn)); if (mdev->ov_start_sector == ~(sector_t)0 && mdev->agreed_pro_version >= 90) { mdev->ov_start_sector = sector; @@ -2130,37 +2185,63 @@ static int receive_DataRequest(struct drbd_conf *mdev, struct p_header *h) } e->w.cb = w_e_end_ov_req; fault_type = DRBD_FAULT_RS_RD; - /* Eventually this should become asynchronous. Currently it - * blocks the whole receiver just to delay the reading of a - * resync data block. - * the drbd_work_queue mechanism is made for this... - */ - if (!drbd_rs_begin_io(mdev, sector)) { - /* we have been interrupted, - * probably connection lost! */ - D_ASSERT(signal_pending(current)); - goto out_free_e; - } break; - default: dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n", - cmdname(h->command)); + cmdname(cmd)); fault_type = DRBD_FAULT_MAX; + goto out_free_e; } - spin_lock_irq(&mdev->req_lock); - list_add(&e->w.list, &mdev->read_ee); - spin_unlock_irq(&mdev->req_lock); + /* Throttle, drbd_rs_begin_io and submit should become asynchronous + * wrt the receiver, but it is not as straightforward as it may seem. + * Various places in the resync start and stop logic assume resync + * requests are processed in order, requeuing this on the worker thread + * introduces a bunch of new code for synchronization between threads. + * + * Unlimited throttling before drbd_rs_begin_io may stall the resync + * "forever", throttling after drbd_rs_begin_io will lock that extent + * for application writes for the same time. For now, just throttle + * here, where the rest of the code expects the receiver to sleep for + * a while, anyways. + */ + + /* Throttle before drbd_rs_begin_io, as that locks out application IO; + * this defers syncer requests for some time, before letting at least + * on request through. The resync controller on the receiving side + * will adapt to the incoming rate accordingly. + * + * We cannot throttle here if remote is Primary/SyncTarget: + * we would also throttle its application reads. + * In that case, throttling is done on the SyncTarget only. + */ + if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev)) + msleep(100); + if (drbd_rs_begin_io(mdev, e->sector)) + goto out_free_e; +submit_for_resync: + atomic_add(size >> 9, &mdev->rs_sect_ev); + +submit: inc_unacked(mdev); + spin_lock_irq(&mdev->req_lock); + list_add_tail(&e->w.list, &mdev->read_ee); + spin_unlock_irq(&mdev->req_lock); if (drbd_submit_ee(mdev, e, READ, fault_type) == 0) return TRUE; + /* drbd_submit_ee currently fails for one reason only: + * not being able to allocate enough bios. + * Is dropping the connection going to help? */ + spin_lock_irq(&mdev->req_lock); + list_del(&e->w.list); + spin_unlock_irq(&mdev->req_lock); + /* no drbd_rs_complete_io(), we are dropping the connection anyways */ + out_free_e: - kfree(di); put_ldev(mdev); drbd_free_ee(mdev, e); return FALSE; @@ -2699,20 +2780,13 @@ static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self) return 1; } -static int receive_protocol(struct drbd_conf *mdev, struct p_header *h) +static int receive_protocol(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size) { - struct p_protocol *p = (struct p_protocol *)h; - int header_size, data_size; + struct p_protocol *p = &mdev->data.rbuf.protocol; int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p; int p_want_lose, p_two_primaries, cf; char p_integrity_alg[SHARED_SECRET_MAX] = ""; - header_size = sizeof(*p) - sizeof(*h); - data_size = h->length - header_size; - - if (drbd_recv(mdev, h->payload, header_size) != header_size) - return FALSE; - p_proto = be32_to_cpu(p->protocol); p_after_sb_0p = be32_to_cpu(p->after_sb_0p); p_after_sb_1p = be32_to_cpu(p->after_sb_1p); @@ -2805,39 +2879,46 @@ struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev, return tfm; } -static int receive_SyncParam(struct drbd_conf *mdev, struct p_header *h) +static int receive_SyncParam(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int packet_size) { int ok = TRUE; - struct p_rs_param_89 *p = (struct p_rs_param_89 *)h; + struct p_rs_param_95 *p = &mdev->data.rbuf.rs_param_95; unsigned int header_size, data_size, exp_max_sz; struct crypto_hash *verify_tfm = NULL; struct crypto_hash *csums_tfm = NULL; const int apv = mdev->agreed_pro_version; + int *rs_plan_s = NULL; + int fifo_size = 0; exp_max_sz = apv <= 87 ? sizeof(struct p_rs_param) : apv == 88 ? sizeof(struct p_rs_param) + SHARED_SECRET_MAX - : /* 89 */ sizeof(struct p_rs_param_89); + : apv <= 94 ? sizeof(struct p_rs_param_89) + : /* apv >= 95 */ sizeof(struct p_rs_param_95); - if (h->length > exp_max_sz) { + if (packet_size > exp_max_sz) { dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n", - h->length, exp_max_sz); + packet_size, exp_max_sz); return FALSE; } if (apv <= 88) { - header_size = sizeof(struct p_rs_param) - sizeof(*h); - data_size = h->length - header_size; - } else /* apv >= 89 */ { - header_size = sizeof(struct p_rs_param_89) - sizeof(*h); - data_size = h->length - header_size; + header_size = sizeof(struct p_rs_param) - sizeof(struct p_header80); + data_size = packet_size - header_size; + } else if (apv <= 94) { + header_size = sizeof(struct p_rs_param_89) - sizeof(struct p_header80); + data_size = packet_size - header_size; + D_ASSERT(data_size == 0); + } else { + header_size = sizeof(struct p_rs_param_95) - sizeof(struct p_header80); + data_size = packet_size - header_size; D_ASSERT(data_size == 0); } /* initialize verify_alg and csums_alg */ memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX); - if (drbd_recv(mdev, h->payload, header_size) != header_size) + if (drbd_recv(mdev, &p->head.payload, header_size) != header_size) return FALSE; mdev->sync_conf.rate = be32_to_cpu(p->rate); @@ -2896,6 +2977,22 @@ static int receive_SyncParam(struct drbd_conf *mdev, struct p_header *h) } } + if (apv > 94) { + mdev->sync_conf.rate = be32_to_cpu(p->rate); + mdev->sync_conf.c_plan_ahead = be32_to_cpu(p->c_plan_ahead); + mdev->sync_conf.c_delay_target = be32_to_cpu(p->c_delay_target); + mdev->sync_conf.c_fill_target = be32_to_cpu(p->c_fill_target); + mdev->sync_conf.c_max_rate = be32_to_cpu(p->c_max_rate); + + fifo_size = (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ; + if (fifo_size != mdev->rs_plan_s.size && fifo_size > 0) { + rs_plan_s = kzalloc(sizeof(int) * fifo_size, GFP_KERNEL); + if (!rs_plan_s) { + dev_err(DEV, "kmalloc of fifo_buffer failed"); + goto disconnect; + } + } + } spin_lock(&mdev->peer_seq_lock); /* lock against drbd_nl_syncer_conf() */ @@ -2913,6 +3010,12 @@ static int receive_SyncParam(struct drbd_conf *mdev, struct p_header *h) mdev->csums_tfm = csums_tfm; dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg); } + if (fifo_size != mdev->rs_plan_s.size) { + kfree(mdev->rs_plan_s.values); + mdev->rs_plan_s.values = rs_plan_s; + mdev->rs_plan_s.size = fifo_size; + mdev->rs_planed = 0; + } spin_unlock(&mdev->peer_seq_lock); } @@ -2946,19 +3049,15 @@ static void warn_if_differ_considerably(struct drbd_conf *mdev, (unsigned long long)a, (unsigned long long)b); } -static int receive_sizes(struct drbd_conf *mdev, struct p_header *h) +static int receive_sizes(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size) { - struct p_sizes *p = (struct p_sizes *)h; + struct p_sizes *p = &mdev->data.rbuf.sizes; enum determine_dev_size dd = unchanged; unsigned int max_seg_s; sector_t p_size, p_usize, my_usize; int ldsc = 0; /* local disk size changed */ enum dds_flags ddsf; - ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE; - if (drbd_recv(mdev, h->payload, h->length) != h->length) - return FALSE; - p_size = be64_to_cpu(p->d_size); p_usize = be64_to_cpu(p->u_size); @@ -2972,7 +3071,6 @@ static int receive_sizes(struct drbd_conf *mdev, struct p_header *h) * we still need to figure out whether we accept that. */ mdev->p_size = p_size; -#define min_not_zero(l, r) (l == 0) ? r : ((r == 0) ? l : min(l, r)) if (get_ldev(mdev)) { warn_if_differ_considerably(mdev, "lower level device sizes", p_size, drbd_get_max_capacity(mdev->ldev)); @@ -3029,6 +3127,8 @@ static int receive_sizes(struct drbd_conf *mdev, struct p_header *h) if (mdev->agreed_pro_version < 94) max_seg_s = be32_to_cpu(p->max_segment_size); + else if (mdev->agreed_pro_version == 94) + max_seg_s = DRBD_MAX_SIZE_H80_PACKET; else /* drbd 8.3.8 onwards */ max_seg_s = DRBD_MAX_SEGMENT_SIZE; @@ -3062,16 +3162,12 @@ static int receive_sizes(struct drbd_conf *mdev, struct p_header *h) return TRUE; } -static int receive_uuids(struct drbd_conf *mdev, struct p_header *h) +static int receive_uuids(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size) { - struct p_uuids *p = (struct p_uuids *)h; + struct p_uuids *p = &mdev->data.rbuf.uuids; u64 *p_uuid; int i; - ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE; - if (drbd_recv(mdev, h->payload, h->length) != h->length) - return FALSE; - p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO); for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++) @@ -3107,6 +3203,11 @@ static int receive_uuids(struct drbd_conf *mdev, struct p_header *h) drbd_md_sync(mdev); } put_ldev(mdev); + } else if (mdev->state.disk < D_INCONSISTENT && + mdev->state.role == R_PRIMARY) { + /* I am a diskless primary, the peer just created a new current UUID + for me. */ + drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]); } /* Before we test for the disk state, we should wait until an eventually @@ -3150,16 +3251,12 @@ static union drbd_state convert_state(union drbd_state ps) return ms; } -static int receive_req_state(struct drbd_conf *mdev, struct p_header *h) +static int receive_req_state(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size) { - struct p_req_state *p = (struct p_req_state *)h; + struct p_req_state *p = &mdev->data.rbuf.req_state; union drbd_state mask, val; int rv; - ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE; - if (drbd_recv(mdev, h->payload, h->length) != h->length) - return FALSE; - mask.i = be32_to_cpu(p->mask); val.i = be32_to_cpu(p->val); @@ -3180,20 +3277,14 @@ static int receive_req_state(struct drbd_conf *mdev, struct p_header *h) return TRUE; } -static int receive_state(struct drbd_conf *mdev, struct p_header *h) +static int receive_state(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size) { - struct p_state *p = (struct p_state *)h; - enum drbd_conns nconn, oconn; - union drbd_state ns, peer_state; + struct p_state *p = &mdev->data.rbuf.state; + union drbd_state os, ns, peer_state; enum drbd_disk_state real_peer_disk; + enum chg_state_flags cs_flags; int rv; - ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) - return FALSE; - - if (drbd_recv(mdev, h->payload, h->length) != h->length) - return FALSE; - peer_state.i = be32_to_cpu(p->state); real_peer_disk = peer_state.disk; @@ -3204,38 +3295,72 @@ static int receive_state(struct drbd_conf *mdev, struct p_header *h) spin_lock_irq(&mdev->req_lock); retry: - oconn = nconn = mdev->state.conn; + os = ns = mdev->state; spin_unlock_irq(&mdev->req_lock); - if (nconn == C_WF_REPORT_PARAMS) - nconn = C_CONNECTED; + /* peer says his disk is uptodate, while we think it is inconsistent, + * and this happens while we think we have a sync going on. */ + if (os.pdsk == D_INCONSISTENT && real_peer_disk == D_UP_TO_DATE && + os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) { + /* If we are (becoming) SyncSource, but peer is still in sync + * preparation, ignore its uptodate-ness to avoid flapping, it + * will change to inconsistent once the peer reaches active + * syncing states. + * It may have changed syncer-paused flags, however, so we + * cannot ignore this completely. */ + if (peer_state.conn > C_CONNECTED && + peer_state.conn < C_SYNC_SOURCE) + real_peer_disk = D_INCONSISTENT; + + /* if peer_state changes to connected at the same time, + * it explicitly notifies us that it finished resync. + * Maybe we should finish it up, too? */ + else if (os.conn >= C_SYNC_SOURCE && + peer_state.conn == C_CONNECTED) { + if (drbd_bm_total_weight(mdev) <= mdev->rs_failed) + drbd_resync_finished(mdev); + return TRUE; + } + } + + /* peer says his disk is inconsistent, while we think it is uptodate, + * and this happens while the peer still thinks we have a sync going on, + * but we think we are already done with the sync. + * We ignore this to avoid flapping pdsk. + * This should not happen, if the peer is a recent version of drbd. */ + if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT && + os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE) + real_peer_disk = D_UP_TO_DATE; + + if (ns.conn == C_WF_REPORT_PARAMS) + ns.conn = C_CONNECTED; if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING && get_ldev_if_state(mdev, D_NEGOTIATING)) { int cr; /* consider resync */ /* if we established a new connection */ - cr = (oconn < C_CONNECTED); + cr = (os.conn < C_CONNECTED); /* if we had an established connection * and one of the nodes newly attaches a disk */ - cr |= (oconn == C_CONNECTED && + cr |= (os.conn == C_CONNECTED && (peer_state.disk == D_NEGOTIATING || - mdev->state.disk == D_NEGOTIATING)); + os.disk == D_NEGOTIATING)); /* if we have both been inconsistent, and the peer has been * forced to be UpToDate with --overwrite-data */ cr |= test_bit(CONSIDER_RESYNC, &mdev->flags); /* if we had been plain connected, and the admin requested to * start a sync by "invalidate" or "invalidate-remote" */ - cr |= (oconn == C_CONNECTED && + cr |= (os.conn == C_CONNECTED && (peer_state.conn >= C_STARTING_SYNC_S && peer_state.conn <= C_WF_BITMAP_T)); if (cr) - nconn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk); + ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk); put_ldev(mdev); - if (nconn == C_MASK) { - nconn = C_CONNECTED; + if (ns.conn == C_MASK) { + ns.conn = C_CONNECTED; if (mdev->state.disk == D_NEGOTIATING) { drbd_force_state(mdev, NS(disk, D_DISKLESS)); } else if (peer_state.disk == D_NEGOTIATING) { @@ -3245,7 +3370,7 @@ static int receive_state(struct drbd_conf *mdev, struct p_header *h) } else { if (test_and_clear_bit(CONN_DRY_RUN, &mdev->flags)) return FALSE; - D_ASSERT(oconn == C_WF_REPORT_PARAMS); + D_ASSERT(os.conn == C_WF_REPORT_PARAMS); drbd_force_state(mdev, NS(conn, C_DISCONNECTING)); return FALSE; } @@ -3253,18 +3378,28 @@ static int receive_state(struct drbd_conf *mdev, struct p_header *h) } spin_lock_irq(&mdev->req_lock); - if (mdev->state.conn != oconn) + if (mdev->state.i != os.i) goto retry; clear_bit(CONSIDER_RESYNC, &mdev->flags); - ns.i = mdev->state.i; - ns.conn = nconn; ns.peer = peer_state.role; ns.pdsk = real_peer_disk; ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp); - if ((nconn == C_CONNECTED || nconn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING) + if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING) ns.disk = mdev->new_state_tmp.disk; - - rv = _drbd_set_state(mdev, ns, CS_VERBOSE | CS_HARD, NULL); + cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD); + if (ns.pdsk == D_CONSISTENT && is_susp(ns) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED && + test_bit(NEW_CUR_UUID, &mdev->flags)) { + /* Do not allow tl_restart(resend) for a rebooted peer. We can only allow this + for temporal network outages! */ + spin_unlock_irq(&mdev->req_lock); + dev_err(DEV, "Aborting Connect, can not thaw IO with an only Consistent peer\n"); + tl_clear(mdev); + drbd_uuid_new_current(mdev); + clear_bit(NEW_CUR_UUID, &mdev->flags); + drbd_force_state(mdev, NS2(conn, C_PROTOCOL_ERROR, susp, 0)); + return FALSE; + } + rv = _drbd_set_state(mdev, ns, cs_flags, NULL); ns = mdev->state; spin_unlock_irq(&mdev->req_lock); @@ -3273,8 +3408,8 @@ static int receive_state(struct drbd_conf *mdev, struct p_header *h) return FALSE; } - if (oconn > C_WF_REPORT_PARAMS) { - if (nconn > C_CONNECTED && peer_state.conn <= C_CONNECTED && + if (os.conn > C_WF_REPORT_PARAMS) { + if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED && peer_state.disk != D_NEGOTIATING ) { /* we want resync, peer has not yet decided to sync... */ /* Nowadays only used when forcing a node into primary role and @@ -3291,9 +3426,9 @@ static int receive_state(struct drbd_conf *mdev, struct p_header *h) return TRUE; } -static int receive_sync_uuid(struct drbd_conf *mdev, struct p_header *h) +static int receive_sync_uuid(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size) { - struct p_rs_uuid *p = (struct p_rs_uuid *)h; + struct p_rs_uuid *p = &mdev->data.rbuf.rs_uuid; wait_event(mdev->misc_wait, mdev->state.conn == C_WF_SYNC_UUID || @@ -3302,10 +3437,6 @@ static int receive_sync_uuid(struct drbd_conf *mdev, struct p_header *h) /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */ - ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE; - if (drbd_recv(mdev, h->payload, h->length) != h->length) - return FALSE; - /* Here the _drbd_uuid_ functions are right, current should _not_ be rotated into the history */ if (get_ldev_if_state(mdev, D_NEGOTIATING)) { @@ -3324,14 +3455,14 @@ static int receive_sync_uuid(struct drbd_conf *mdev, struct p_header *h) enum receive_bitmap_ret { OK, DONE, FAILED }; static enum receive_bitmap_ret -receive_bitmap_plain(struct drbd_conf *mdev, struct p_header *h, - unsigned long *buffer, struct bm_xfer_ctx *c) +receive_bitmap_plain(struct drbd_conf *mdev, unsigned int data_size, + unsigned long *buffer, struct bm_xfer_ctx *c) { unsigned num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset); unsigned want = num_words * sizeof(long); - if (want != h->length) { - dev_err(DEV, "%s:want (%u) != h->length (%u)\n", __func__, want, h->length); + if (want != data_size) { + dev_err(DEV, "%s:want (%u) != data_size (%u)\n", __func__, want, data_size); return FAILED; } if (want == 0) @@ -3360,7 +3491,7 @@ recv_bm_rle_bits(struct drbd_conf *mdev, u64 tmp; unsigned long s = c->bit_offset; unsigned long e; - int len = p->head.length - (sizeof(*p) - sizeof(p->head)); + int len = be16_to_cpu(p->head.length) - (sizeof(*p) - sizeof(p->head)); int toggle = DCBP_get_start(p); int have; int bits; @@ -3429,7 +3560,7 @@ void INFO_bm_xfer_stats(struct drbd_conf *mdev, const char *direction, struct bm_xfer_ctx *c) { /* what would it take to transfer it "plaintext" */ - unsigned plain = sizeof(struct p_header) * + unsigned plain = sizeof(struct p_header80) * ((c->bm_words+BM_PACKET_WORDS-1)/BM_PACKET_WORDS+1) + c->bm_words * sizeof(long); unsigned total = c->bytes[0] + c->bytes[1]; @@ -3467,12 +3598,13 @@ void INFO_bm_xfer_stats(struct drbd_conf *mdev, in order to be agnostic to the 32 vs 64 bits issue. returns 0 on failure, 1 if we successfully received it. */ -static int receive_bitmap(struct drbd_conf *mdev, struct p_header *h) +static int receive_bitmap(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size) { struct bm_xfer_ctx c; void *buffer; enum receive_bitmap_ret ret; int ok = FALSE; + struct p_header80 *h = &mdev->data.rbuf.header.h80; wait_event(mdev->misc_wait, !atomic_read(&mdev->ap_bio_cnt)); @@ -3492,39 +3624,39 @@ static int receive_bitmap(struct drbd_conf *mdev, struct p_header *h) }; do { - if (h->command == P_BITMAP) { - ret = receive_bitmap_plain(mdev, h, buffer, &c); - } else if (h->command == P_COMPRESSED_BITMAP) { + if (cmd == P_BITMAP) { + ret = receive_bitmap_plain(mdev, data_size, buffer, &c); + } else if (cmd == P_COMPRESSED_BITMAP) { /* MAYBE: sanity check that we speak proto >= 90, * and the feature is enabled! */ struct p_compressed_bm *p; - if (h->length > BM_PACKET_PAYLOAD_BYTES) { + if (data_size > BM_PACKET_PAYLOAD_BYTES) { dev_err(DEV, "ReportCBitmap packet too large\n"); goto out; } /* use the page buff */ p = buffer; memcpy(p, h, sizeof(*h)); - if (drbd_recv(mdev, p->head.payload, h->length) != h->length) + if (drbd_recv(mdev, p->head.payload, data_size) != data_size) goto out; - if (p->head.length <= (sizeof(*p) - sizeof(p->head))) { - dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", p->head.length); + if (data_size <= (sizeof(*p) - sizeof(p->head))) { + dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", data_size); return FAILED; } ret = decode_bitmap_c(mdev, p, &c); } else { - dev_warn(DEV, "receive_bitmap: h->command neither ReportBitMap nor ReportCBitMap (is 0x%x)", h->command); + dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", cmd); goto out; } - c.packets[h->command == P_BITMAP]++; - c.bytes[h->command == P_BITMAP] += sizeof(struct p_header) + h->length; + c.packets[cmd == P_BITMAP]++; + c.bytes[cmd == P_BITMAP] += sizeof(struct p_header80) + data_size; if (ret != OK) break; - if (!drbd_recv_header(mdev, h)) + if (!drbd_recv_header(mdev, &cmd, &data_size)) goto out; } while (ret == OK); if (ret == FAILED) @@ -3555,17 +3687,16 @@ static int receive_bitmap(struct drbd_conf *mdev, struct p_header *h) return ok; } -static int receive_skip_(struct drbd_conf *mdev, struct p_header *h, int silent) +static int receive_skip(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size) { /* TODO zero copy sink :) */ static char sink[128]; int size, want, r; - if (!silent) - dev_warn(DEV, "skipping unknown optional packet type %d, l: %d!\n", - h->command, h->length); + dev_warn(DEV, "skipping unknown optional packet type %d, l: %d!\n", + cmd, data_size); - size = h->length; + size = data_size; while (size > 0) { want = min_t(int, size, sizeof(sink)); r = drbd_recv(mdev, sink, want); @@ -3575,17 +3706,7 @@ static int receive_skip_(struct drbd_conf *mdev, struct p_header *h, int silent) return size == 0; } -static int receive_skip(struct drbd_conf *mdev, struct p_header *h) -{ - return receive_skip_(mdev, h, 0); -} - -static int receive_skip_silent(struct drbd_conf *mdev, struct p_header *h) -{ - return receive_skip_(mdev, h, 1); -} - -static int receive_UnplugRemote(struct drbd_conf *mdev, struct p_header *h) +static int receive_UnplugRemote(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size) { if (mdev->state.disk >= D_INCONSISTENT) drbd_kick_lo(mdev); @@ -3597,108 +3718,94 @@ static int receive_UnplugRemote(struct drbd_conf *mdev, struct p_header *h) return TRUE; } -typedef int (*drbd_cmd_handler_f)(struct drbd_conf *, struct p_header *); - -static drbd_cmd_handler_f drbd_default_handler[] = { - [P_DATA] = receive_Data, - [P_DATA_REPLY] = receive_DataReply, - [P_RS_DATA_REPLY] = receive_RSDataReply, - [P_BARRIER] = receive_Barrier, - [P_BITMAP] = receive_bitmap, - [P_COMPRESSED_BITMAP] = receive_bitmap, - [P_UNPLUG_REMOTE] = receive_UnplugRemote, - [P_DATA_REQUEST] = receive_DataRequest, - [P_RS_DATA_REQUEST] = receive_DataRequest, - [P_SYNC_PARAM] = receive_SyncParam, - [P_SYNC_PARAM89] = receive_SyncParam, - [P_PROTOCOL] = receive_protocol, - [P_UUIDS] = receive_uuids, - [P_SIZES] = receive_sizes, - [P_STATE] = receive_state, - [P_STATE_CHG_REQ] = receive_req_state, - [P_SYNC_UUID] = receive_sync_uuid, - [P_OV_REQUEST] = receive_DataRequest, - [P_OV_REPLY] = receive_DataRequest, - [P_CSUM_RS_REQUEST] = receive_DataRequest, - [P_DELAY_PROBE] = receive_skip_silent, +typedef int (*drbd_cmd_handler_f)(struct drbd_conf *, enum drbd_packets cmd, unsigned int to_receive); + +struct data_cmd { + int expect_payload; + size_t pkt_size; + drbd_cmd_handler_f function; +}; + +static struct data_cmd drbd_cmd_handler[] = { + [P_DATA] = { 1, sizeof(struct p_data), receive_Data }, + [P_DATA_REPLY] = { 1, sizeof(struct p_data), receive_DataReply }, + [P_RS_DATA_REPLY] = { 1, sizeof(struct p_data), receive_RSDataReply } , + [P_BARRIER] = { 0, sizeof(struct p_barrier), receive_Barrier } , + [P_BITMAP] = { 1, sizeof(struct p_header80), receive_bitmap } , + [P_COMPRESSED_BITMAP] = { 1, sizeof(struct p_header80), receive_bitmap } , + [P_UNPLUG_REMOTE] = { 0, sizeof(struct p_header80), receive_UnplugRemote }, + [P_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest }, + [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest }, + [P_SYNC_PARAM] = { 1, sizeof(struct p_header80), receive_SyncParam }, + [P_SYNC_PARAM89] = { 1, sizeof(struct p_header80), receive_SyncParam }, + [P_PROTOCOL] = { 1, sizeof(struct p_protocol), receive_protocol }, + [P_UUIDS] = { 0, sizeof(struct p_uuids), receive_uuids }, + [P_SIZES] = { 0, sizeof(struct p_sizes), receive_sizes }, + [P_STATE] = { 0, sizeof(struct p_state), receive_state }, + [P_STATE_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_state }, + [P_SYNC_UUID] = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid }, + [P_OV_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest }, + [P_OV_REPLY] = { 1, sizeof(struct p_block_req), receive_DataRequest }, + [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest }, + [P_DELAY_PROBE] = { 0, sizeof(struct p_delay_probe93), receive_skip }, /* anything missing from this table is in * the asender_tbl, see get_asender_cmd */ - [P_MAX_CMD] = NULL, + [P_MAX_CMD] = { 0, 0, NULL }, }; -static drbd_cmd_handler_f *drbd_cmd_handler = drbd_default_handler; -static drbd_cmd_handler_f *drbd_opt_cmd_handler; +/* All handler functions that expect a sub-header get that sub-heder in + mdev->data.rbuf.header.head.payload. + + Usually in mdev->data.rbuf.header.head the callback can find the usual + p_header, but they may not rely on that. Since there is also p_header95 ! + */ static void drbdd(struct drbd_conf *mdev) { - drbd_cmd_handler_f handler; - struct p_header *header = &mdev->data.rbuf.header; + union p_header *header = &mdev->data.rbuf.header; + unsigned int packet_size; + enum drbd_packets cmd; + size_t shs; /* sub header size */ + int rv; while (get_t_state(&mdev->receiver) == Running) { drbd_thread_current_set_cpu(mdev); - if (!drbd_recv_header(mdev, header)) { - drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR)); - break; - } + if (!drbd_recv_header(mdev, &cmd, &packet_size)) + goto err_out; - if (header->command < P_MAX_CMD) - handler = drbd_cmd_handler[header->command]; - else if (P_MAY_IGNORE < header->command - && header->command < P_MAX_OPT_CMD) - handler = drbd_opt_cmd_handler[header->command-P_MAY_IGNORE]; - else if (header->command > P_MAX_OPT_CMD) - handler = receive_skip; - else - handler = NULL; + if (unlikely(cmd >= P_MAX_CMD || !drbd_cmd_handler[cmd].function)) { + dev_err(DEV, "unknown packet type %d, l: %d!\n", cmd, packet_size); + goto err_out; + } - if (unlikely(!handler)) { - dev_err(DEV, "unknown packet type %d, l: %d!\n", - header->command, header->length); - drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR)); - break; + shs = drbd_cmd_handler[cmd].pkt_size - sizeof(union p_header); + rv = drbd_recv(mdev, &header->h80.payload, shs); + if (unlikely(rv != shs)) { + dev_err(DEV, "short read while reading sub header: rv=%d\n", rv); + goto err_out; } - if (unlikely(!handler(mdev, header))) { - dev_err(DEV, "error receiving %s, l: %d!\n", - cmdname(header->command), header->length); - drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR)); - break; + + if (packet_size - shs > 0 && !drbd_cmd_handler[cmd].expect_payload) { + dev_err(DEV, "No payload expected %s l:%d\n", cmdname(cmd), packet_size); + goto err_out; } - } -} -static void drbd_fail_pending_reads(struct drbd_conf *mdev) -{ - struct hlist_head *slot; - struct hlist_node *pos; - struct hlist_node *tmp; - struct drbd_request *req; - int i; + rv = drbd_cmd_handler[cmd].function(mdev, cmd, packet_size - shs); - /* - * Application READ requests - */ - spin_lock_irq(&mdev->req_lock); - for (i = 0; i < APP_R_HSIZE; i++) { - slot = mdev->app_reads_hash+i; - hlist_for_each_entry_safe(req, pos, tmp, slot, colision) { - /* it may (but should not any longer!) - * be on the work queue; if that assert triggers, - * we need to also grab the - * spin_lock_irq(&mdev->data.work.q_lock); - * and list_del_init here. */ - D_ASSERT(list_empty(&req->w.list)); - /* It would be nice to complete outside of spinlock. - * But this is easier for now. */ - _req_mod(req, connection_lost_while_pending); + if (unlikely(!rv)) { + dev_err(DEV, "error receiving %s, l: %d!\n", + cmdname(cmd), packet_size); + goto err_out; } } - for (i = 0; i < APP_R_HSIZE; i++) - if (!hlist_empty(mdev->app_reads_hash+i)) - dev_warn(DEV, "ASSERT FAILED: app_reads_hash[%d].first: " - "%p, should be NULL\n", i, mdev->app_reads_hash[i].first); - memset(mdev->app_reads_hash, 0, APP_R_HSIZE*sizeof(void *)); - spin_unlock_irq(&mdev->req_lock); + if (0) { + err_out: + drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR)); + } + /* If we leave here, we probably want to update at least the + * "Connected" indicator on stable storage. Do so explicitly here. */ + drbd_md_sync(mdev); } void drbd_flush_workqueue(struct drbd_conf *mdev) @@ -3711,6 +3818,36 @@ void drbd_flush_workqueue(struct drbd_conf *mdev) wait_for_completion(&barr.done); } +void drbd_free_tl_hash(struct drbd_conf *mdev) +{ + struct hlist_head *h; + + spin_lock_irq(&mdev->req_lock); + + if (!mdev->tl_hash || mdev->state.conn != C_STANDALONE) { + spin_unlock_irq(&mdev->req_lock); + return; + } + /* paranoia code */ + for (h = mdev->ee_hash; h < mdev->ee_hash + mdev->ee_hash_s; h++) + if (h->first) + dev_err(DEV, "ASSERT FAILED ee_hash[%u].first == %p, expected NULL\n", + (int)(h - mdev->ee_hash), h->first); + kfree(mdev->ee_hash); + mdev->ee_hash = NULL; + mdev->ee_hash_s = 0; + + /* paranoia code */ + for (h = mdev->tl_hash; h < mdev->tl_hash + mdev->tl_hash_s; h++) + if (h->first) + dev_err(DEV, "ASSERT FAILED tl_hash[%u] == %p, expected NULL\n", + (int)(h - mdev->tl_hash), h->first); + kfree(mdev->tl_hash); + mdev->tl_hash = NULL; + mdev->tl_hash_s = 0; + spin_unlock_irq(&mdev->req_lock); +} + static void drbd_disconnect(struct drbd_conf *mdev) { enum drbd_fencing_p fp; @@ -3728,6 +3865,7 @@ static void drbd_disconnect(struct drbd_conf *mdev) drbd_thread_stop(&mdev->asender); drbd_free_sock(mdev); + /* wait for current activity to cease. */ spin_lock_irq(&mdev->req_lock); _drbd_wait_ee_list_empty(mdev, &mdev->active_ee); _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee); @@ -3752,7 +3890,6 @@ static void drbd_disconnect(struct drbd_conf *mdev) /* make sure syncer is stopped and w_resume_next_sg queued */ del_timer_sync(&mdev->resync_timer); - set_bit(STOP_SYNC_TIMER, &mdev->flags); resync_timer_fn((unsigned long)mdev); /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier, @@ -3767,11 +3904,9 @@ static void drbd_disconnect(struct drbd_conf *mdev) kfree(mdev->p_uuid); mdev->p_uuid = NULL; - if (!mdev->state.susp) + if (!is_susp(mdev->state)) tl_clear(mdev); - drbd_fail_pending_reads(mdev); - dev_info(DEV, "Connection closed\n"); drbd_md_sync(mdev); @@ -3782,12 +3917,8 @@ static void drbd_disconnect(struct drbd_conf *mdev) put_ldev(mdev); } - if (mdev->state.role == R_PRIMARY) { - if (fp >= FP_RESOURCE && mdev->state.pdsk >= D_UNKNOWN) { - enum drbd_disk_state nps = drbd_try_outdate_peer(mdev); - drbd_request_state(mdev, NS(pdsk, nps)); - } - } + if (mdev->state.role == R_PRIMARY && fp >= FP_RESOURCE && mdev->state.pdsk >= D_UNKNOWN) + drbd_try_outdate_peer_async(mdev); spin_lock_irq(&mdev->req_lock); os = mdev->state; @@ -3800,32 +3931,14 @@ static void drbd_disconnect(struct drbd_conf *mdev) spin_unlock_irq(&mdev->req_lock); if (os.conn == C_DISCONNECTING) { - struct hlist_head *h; - wait_event(mdev->misc_wait, atomic_read(&mdev->net_cnt) == 0); + wait_event(mdev->net_cnt_wait, atomic_read(&mdev->net_cnt) == 0); - /* we must not free the tl_hash - * while application io is still on the fly */ - wait_event(mdev->misc_wait, atomic_read(&mdev->ap_bio_cnt) == 0); - - spin_lock_irq(&mdev->req_lock); - /* paranoia code */ - for (h = mdev->ee_hash; h < mdev->ee_hash + mdev->ee_hash_s; h++) - if (h->first) - dev_err(DEV, "ASSERT FAILED ee_hash[%u].first == %p, expected NULL\n", - (int)(h - mdev->ee_hash), h->first); - kfree(mdev->ee_hash); - mdev->ee_hash = NULL; - mdev->ee_hash_s = 0; - - /* paranoia code */ - for (h = mdev->tl_hash; h < mdev->tl_hash + mdev->tl_hash_s; h++) - if (h->first) - dev_err(DEV, "ASSERT FAILED tl_hash[%u] == %p, expected NULL\n", - (int)(h - mdev->tl_hash), h->first); - kfree(mdev->tl_hash); - mdev->tl_hash = NULL; - mdev->tl_hash_s = 0; - spin_unlock_irq(&mdev->req_lock); + if (!is_susp(mdev->state)) { + /* we must not free the tl_hash + * while application io is still on the fly */ + wait_event(mdev->misc_wait, !atomic_read(&mdev->ap_bio_cnt)); + drbd_free_tl_hash(mdev); + } crypto_free_hash(mdev->cram_hmac_tfm); mdev->cram_hmac_tfm = NULL; @@ -3845,6 +3958,9 @@ static void drbd_disconnect(struct drbd_conf *mdev) i = drbd_release_ee(mdev, &mdev->net_ee); if (i) dev_info(DEV, "net_ee not empty, killed %u entries\n", i); + i = atomic_read(&mdev->pp_in_use_by_net); + if (i) + dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i); i = atomic_read(&mdev->pp_in_use); if (i) dev_info(DEV, "pp_in_use = %d, expected 0\n", i); @@ -3888,7 +4004,7 @@ static int drbd_send_handshake(struct drbd_conf *mdev) p->protocol_min = cpu_to_be32(PRO_VERSION_MIN); p->protocol_max = cpu_to_be32(PRO_VERSION_MAX); ok = _drbd_send_cmd( mdev, mdev->data.socket, P_HAND_SHAKE, - (struct p_header *)p, sizeof(*p), 0 ); + (struct p_header80 *)p, sizeof(*p), 0 ); mutex_unlock(&mdev->data.mutex); return ok; } @@ -3904,27 +4020,28 @@ static int drbd_do_handshake(struct drbd_conf *mdev) { /* ASSERT current == mdev->receiver ... */ struct p_handshake *p = &mdev->data.rbuf.handshake; - const int expect = sizeof(struct p_handshake) - -sizeof(struct p_header); + const int expect = sizeof(struct p_handshake) - sizeof(struct p_header80); + unsigned int length; + enum drbd_packets cmd; int rv; rv = drbd_send_handshake(mdev); if (!rv) return 0; - rv = drbd_recv_header(mdev, &p->head); + rv = drbd_recv_header(mdev, &cmd, &length); if (!rv) return 0; - if (p->head.command != P_HAND_SHAKE) { + if (cmd != P_HAND_SHAKE) { dev_err(DEV, "expected HandShake packet, received: %s (0x%04x)\n", - cmdname(p->head.command), p->head.command); + cmdname(cmd), cmd); return -1; } - if (p->head.length != expect) { + if (length != expect) { dev_err(DEV, "expected HandShake length: %u, received: %u\n", - expect, p->head.length); + expect, length); return -1; } @@ -3982,10 +4099,11 @@ static int drbd_do_auth(struct drbd_conf *mdev) char *response = NULL; char *right_response = NULL; char *peers_ch = NULL; - struct p_header p; unsigned int key_len = strlen(mdev->net_conf->shared_secret); unsigned int resp_size; struct hash_desc desc; + enum drbd_packets cmd; + unsigned int length; int rv; desc.tfm = mdev->cram_hmac_tfm; @@ -4005,33 +4123,33 @@ static int drbd_do_auth(struct drbd_conf *mdev) if (!rv) goto fail; - rv = drbd_recv_header(mdev, &p); + rv = drbd_recv_header(mdev, &cmd, &length); if (!rv) goto fail; - if (p.command != P_AUTH_CHALLENGE) { + if (cmd != P_AUTH_CHALLENGE) { dev_err(DEV, "expected AuthChallenge packet, received: %s (0x%04x)\n", - cmdname(p.command), p.command); + cmdname(cmd), cmd); rv = 0; goto fail; } - if (p.length > CHALLENGE_LEN*2) { + if (length > CHALLENGE_LEN * 2) { dev_err(DEV, "expected AuthChallenge payload too big.\n"); rv = -1; goto fail; } - peers_ch = kmalloc(p.length, GFP_NOIO); + peers_ch = kmalloc(length, GFP_NOIO); if (peers_ch == NULL) { dev_err(DEV, "kmalloc of peers_ch failed\n"); rv = -1; goto fail; } - rv = drbd_recv(mdev, peers_ch, p.length); + rv = drbd_recv(mdev, peers_ch, length); - if (rv != p.length) { + if (rv != length) { dev_err(DEV, "short read AuthChallenge: l=%u\n", rv); rv = 0; goto fail; @@ -4046,7 +4164,7 @@ static int drbd_do_auth(struct drbd_conf *mdev) } sg_init_table(&sg, 1); - sg_set_buf(&sg, peers_ch, p.length); + sg_set_buf(&sg, peers_ch, length); rv = crypto_hash_digest(&desc, &sg, sg.length, response); if (rv) { @@ -4059,18 +4177,18 @@ static int drbd_do_auth(struct drbd_conf *mdev) if (!rv) goto fail; - rv = drbd_recv_header(mdev, &p); + rv = drbd_recv_header(mdev, &cmd, &length); if (!rv) goto fail; - if (p.command != P_AUTH_RESPONSE) { + if (cmd != P_AUTH_RESPONSE) { dev_err(DEV, "expected AuthResponse packet, received: %s (0x%04x)\n", - cmdname(p.command), p.command); + cmdname(cmd), cmd); rv = 0; goto fail; } - if (p.length != resp_size) { + if (length != resp_size) { dev_err(DEV, "expected AuthResponse payload of wrong size\n"); rv = 0; goto fail; @@ -4155,7 +4273,7 @@ int drbdd_init(struct drbd_thread *thi) /* ********* acknowledge sender ******** */ -static int got_RqSReply(struct drbd_conf *mdev, struct p_header *h) +static int got_RqSReply(struct drbd_conf *mdev, struct p_header80 *h) { struct p_req_state_reply *p = (struct p_req_state_reply *)h; @@ -4173,13 +4291,13 @@ static int got_RqSReply(struct drbd_conf *mdev, struct p_header *h) return TRUE; } -static int got_Ping(struct drbd_conf *mdev, struct p_header *h) +static int got_Ping(struct drbd_conf *mdev, struct p_header80 *h) { return drbd_send_ping_ack(mdev); } -static int got_PingAck(struct drbd_conf *mdev, struct p_header *h) +static int got_PingAck(struct drbd_conf *mdev, struct p_header80 *h) { /* restore idle timeout */ mdev->meta.socket->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ; @@ -4189,7 +4307,7 @@ static int got_PingAck(struct drbd_conf *mdev, struct p_header *h) return TRUE; } -static int got_IsInSync(struct drbd_conf *mdev, struct p_header *h) +static int got_IsInSync(struct drbd_conf *mdev, struct p_header80 *h) { struct p_block_ack *p = (struct p_block_ack *)h; sector_t sector = be64_to_cpu(p->sector); @@ -4199,11 +4317,15 @@ static int got_IsInSync(struct drbd_conf *mdev, struct p_header *h) update_peer_seq(mdev, be32_to_cpu(p->seq_num)); - drbd_rs_complete_io(mdev, sector); - drbd_set_in_sync(mdev, sector, blksize); - /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */ - mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT); + if (get_ldev(mdev)) { + drbd_rs_complete_io(mdev, sector); + drbd_set_in_sync(mdev, sector, blksize); + /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */ + mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT); + put_ldev(mdev); + } dec_rs_pending(mdev); + atomic_add(blksize >> 9, &mdev->rs_sect_in); return TRUE; } @@ -4259,7 +4381,7 @@ static int validate_req_change_req_state(struct drbd_conf *mdev, return TRUE; } -static int got_BlockAck(struct drbd_conf *mdev, struct p_header *h) +static int got_BlockAck(struct drbd_conf *mdev, struct p_header80 *h) { struct p_block_ack *p = (struct p_block_ack *)h; sector_t sector = be64_to_cpu(p->sector); @@ -4299,7 +4421,7 @@ static int got_BlockAck(struct drbd_conf *mdev, struct p_header *h) _ack_id_to_req, __func__ , what); } -static int got_NegAck(struct drbd_conf *mdev, struct p_header *h) +static int got_NegAck(struct drbd_conf *mdev, struct p_header80 *h) { struct p_block_ack *p = (struct p_block_ack *)h; sector_t sector = be64_to_cpu(p->sector); @@ -4319,7 +4441,7 @@ static int got_NegAck(struct drbd_conf *mdev, struct p_header *h) _ack_id_to_req, __func__ , neg_acked); } -static int got_NegDReply(struct drbd_conf *mdev, struct p_header *h) +static int got_NegDReply(struct drbd_conf *mdev, struct p_header80 *h) { struct p_block_ack *p = (struct p_block_ack *)h; sector_t sector = be64_to_cpu(p->sector); @@ -4332,7 +4454,7 @@ static int got_NegDReply(struct drbd_conf *mdev, struct p_header *h) _ar_id_to_req, __func__ , neg_acked); } -static int got_NegRSDReply(struct drbd_conf *mdev, struct p_header *h) +static int got_NegRSDReply(struct drbd_conf *mdev, struct p_header80 *h) { sector_t sector; int size; @@ -4354,7 +4476,7 @@ static int got_NegRSDReply(struct drbd_conf *mdev, struct p_header *h) return TRUE; } -static int got_BarrierAck(struct drbd_conf *mdev, struct p_header *h) +static int got_BarrierAck(struct drbd_conf *mdev, struct p_header80 *h) { struct p_barrier_ack *p = (struct p_barrier_ack *)h; @@ -4363,7 +4485,7 @@ static int got_BarrierAck(struct drbd_conf *mdev, struct p_header *h) return TRUE; } -static int got_OVResult(struct drbd_conf *mdev, struct p_header *h) +static int got_OVResult(struct drbd_conf *mdev, struct p_header80 *h) { struct p_block_ack *p = (struct p_block_ack *)h; struct drbd_work *w; @@ -4380,6 +4502,9 @@ static int got_OVResult(struct drbd_conf *mdev, struct p_header *h) else ov_oos_print(mdev); + if (!get_ldev(mdev)) + return TRUE; + drbd_rs_complete_io(mdev, sector); dec_rs_pending(mdev); @@ -4394,18 +4519,18 @@ static int got_OVResult(struct drbd_conf *mdev, struct p_header *h) drbd_resync_finished(mdev); } } + put_ldev(mdev); return TRUE; } -static int got_something_to_ignore_m(struct drbd_conf *mdev, struct p_header *h) +static int got_skip(struct drbd_conf *mdev, struct p_header80 *h) { - /* IGNORE */ return TRUE; } struct asender_cmd { size_t pkt_size; - int (*process)(struct drbd_conf *mdev, struct p_header *h); + int (*process)(struct drbd_conf *mdev, struct p_header80 *h); }; static struct asender_cmd *get_asender_cmd(int cmd) @@ -4414,8 +4539,8 @@ static struct asender_cmd *get_asender_cmd(int cmd) /* anything missing from this table is in * the drbd_cmd_handler (drbd_default_handler) table, * see the beginning of drbdd() */ - [P_PING] = { sizeof(struct p_header), got_Ping }, - [P_PING_ACK] = { sizeof(struct p_header), got_PingAck }, + [P_PING] = { sizeof(struct p_header80), got_Ping }, + [P_PING_ACK] = { sizeof(struct p_header80), got_PingAck }, [P_RECV_ACK] = { sizeof(struct p_block_ack), got_BlockAck }, [P_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck }, [P_RS_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck }, @@ -4427,7 +4552,7 @@ static struct asender_cmd *get_asender_cmd(int cmd) [P_BARRIER_ACK] = { sizeof(struct p_barrier_ack), got_BarrierAck }, [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply }, [P_RS_IS_IN_SYNC] = { sizeof(struct p_block_ack), got_IsInSync }, - [P_DELAY_PROBE] = { sizeof(struct p_delay_probe), got_something_to_ignore_m }, + [P_DELAY_PROBE] = { sizeof(struct p_delay_probe93), got_skip }, [P_MAX_CMD] = { 0, NULL }, }; if (cmd > P_MAX_CMD || asender_tbl[cmd].process == NULL) @@ -4438,13 +4563,13 @@ static struct asender_cmd *get_asender_cmd(int cmd) int drbd_asender(struct drbd_thread *thi) { struct drbd_conf *mdev = thi->mdev; - struct p_header *h = &mdev->meta.rbuf.header; + struct p_header80 *h = &mdev->meta.rbuf.header.h80; struct asender_cmd *cmd = NULL; int rv, len; void *buf = h; int received = 0; - int expect = sizeof(struct p_header); + int expect = sizeof(struct p_header80); int empty; sprintf(current->comm, "drbd%d_asender", mdev_to_minor(mdev)); @@ -4468,10 +4593,8 @@ int drbd_asender(struct drbd_thread *thi) while (1) { clear_bit(SIGNAL_ASENDER, &mdev->flags); flush_signals(current); - if (!drbd_process_done_ee(mdev)) { - dev_err(DEV, "process_done_ee() = NOT_OK\n"); + if (!drbd_process_done_ee(mdev)) goto reconnect; - } /* to avoid race with newly queued ACKs */ set_bit(SIGNAL_ASENDER, &mdev->flags); spin_lock_irq(&mdev->req_lock); @@ -4530,21 +4653,23 @@ int drbd_asender(struct drbd_thread *thi) if (received == expect && cmd == NULL) { if (unlikely(h->magic != BE_DRBD_MAGIC)) { - dev_err(DEV, "magic?? on meta m: 0x%lx c: %d l: %d\n", - (long)be32_to_cpu(h->magic), - h->command, h->length); + dev_err(DEV, "magic?? on meta m: 0x%08x c: %d l: %d\n", + be32_to_cpu(h->magic), + be16_to_cpu(h->command), + be16_to_cpu(h->length)); goto reconnect; } cmd = get_asender_cmd(be16_to_cpu(h->command)); len = be16_to_cpu(h->length); if (unlikely(cmd == NULL)) { - dev_err(DEV, "unknown command?? on meta m: 0x%lx c: %d l: %d\n", - (long)be32_to_cpu(h->magic), - h->command, h->length); + dev_err(DEV, "unknown command?? on meta m: 0x%08x c: %d l: %d\n", + be32_to_cpu(h->magic), + be16_to_cpu(h->command), + be16_to_cpu(h->length)); goto disconnect; } expect = cmd->pkt_size; - ERR_IF(len != expect-sizeof(struct p_header)) + ERR_IF(len != expect-sizeof(struct p_header80)) goto reconnect; } if (received == expect) { @@ -4554,7 +4679,7 @@ int drbd_asender(struct drbd_thread *thi) buf = h; received = 0; - expect = sizeof(struct p_header); + expect = sizeof(struct p_header80); cmd = NULL; } } @@ -4562,10 +4687,12 @@ int drbd_asender(struct drbd_thread *thi) if (0) { reconnect: drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE)); + drbd_md_sync(mdev); } if (0) { disconnect: drbd_force_state(mdev, NS(conn, C_DISCONNECTING)); + drbd_md_sync(mdev); } clear_bit(SIGNAL_ASENDER, &mdev->flags); |