diff options
Diffstat (limited to 'fs/dlm/rcom.c')
-rw-r--r-- | fs/dlm/rcom.c | 147 |
1 files changed, 112 insertions, 35 deletions
diff --git a/fs/dlm/rcom.c b/fs/dlm/rcom.c index 64d3e2b958c7..87f1a56eab32 100644 --- a/fs/dlm/rcom.c +++ b/fs/dlm/rcom.c @@ -23,8 +23,6 @@ #include "memory.h" #include "lock.h" #include "util.h" -#include "member.h" - static int rcom_response(struct dlm_ls *ls) { @@ -275,19 +273,9 @@ int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name, int last_len) struct dlm_rcom *rc; struct dlm_mhandle *mh; int error = 0; - int max_size = dlm_config.ci_buffer_size - sizeof(struct dlm_rcom); ls->ls_recover_nodeid = nodeid; - if (nodeid == dlm_our_nodeid()) { - ls->ls_recover_buf->rc_header.h_length = - dlm_config.ci_buffer_size; - dlm_copy_master_names(ls, last_name, last_len, - ls->ls_recover_buf->rc_buf, - max_size, nodeid); - goto out; - } - error = create_rcom(ls, nodeid, DLM_RCOM_NAMES, last_len, &rc, &mh); if (error) goto out; @@ -337,7 +325,26 @@ int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid) if (error) goto out; memcpy(rc->rc_buf, r->res_name, r->res_length); - rc->rc_id = (unsigned long) r; + rc->rc_id = (unsigned long) r->res_id; + + send_rcom(ls, mh, rc); + out: + return error; +} + +int dlm_send_rcom_lookup_dump(struct dlm_rsb *r, int to_nodeid) +{ + struct dlm_rcom *rc; + struct dlm_mhandle *mh; + struct dlm_ls *ls = r->res_ls; + int error; + + error = create_rcom(ls, to_nodeid, DLM_RCOM_LOOKUP, r->res_length, + &rc, &mh); + if (error) + goto out; + memcpy(rc->rc_buf, r->res_name, r->res_length); + rc->rc_id = 0xFFFFFFFF; send_rcom(ls, mh, rc); out: @@ -355,7 +362,14 @@ static void receive_rcom_lookup(struct dlm_ls *ls, struct dlm_rcom *rc_in) if (error) return; - error = dlm_dir_lookup(ls, nodeid, rc_in->rc_buf, len, &ret_nodeid); + if (rc_in->rc_id == 0xFFFFFFFF) { + log_error(ls, "receive_rcom_lookup dump from %d", nodeid); + dlm_dump_rsb_name(ls, rc_in->rc_buf, len); + return; + } + + error = dlm_master_lookup(ls, nodeid, rc_in->rc_buf, len, + DLM_LU_RECOVER_MASTER, &ret_nodeid, NULL); if (error) ret_nodeid = error; rc->rc_result = ret_nodeid; @@ -486,17 +500,76 @@ int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in) return 0; } +/* + * Ignore messages for stage Y before we set + * recover_status bit for stage X: + * + * recover_status = 0 + * + * dlm_recover_members() + * - send nothing + * - recv nothing + * - ignore NAMES, NAMES_REPLY + * - ignore LOOKUP, LOOKUP_REPLY + * - ignore LOCK, LOCK_REPLY + * + * recover_status |= NODES + * + * dlm_recover_members_wait() + * + * dlm_recover_directory() + * - send NAMES + * - recv NAMES_REPLY + * - ignore LOOKUP, LOOKUP_REPLY + * - ignore LOCK, LOCK_REPLY + * + * recover_status |= DIR + * + * dlm_recover_directory_wait() + * + * dlm_recover_masters() + * - send LOOKUP + * - recv LOOKUP_REPLY + * + * dlm_recover_locks() + * - send LOCKS + * - recv LOCKS_REPLY + * + * recover_status |= LOCKS + * + * dlm_recover_locks_wait() + * + * recover_status |= DONE + */ + /* Called by dlm_recv; corresponds to dlm_receive_message() but special recovery-only comms are sent through here. */ void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid) { int lock_size = sizeof(struct dlm_rcom) + sizeof(struct rcom_lock); - int stop, reply = 0, lock = 0; + int stop, reply = 0, names = 0, lookup = 0, lock = 0; uint32_t status; uint64_t seq; switch (rc->rc_type) { + case DLM_RCOM_STATUS_REPLY: + reply = 1; + break; + case DLM_RCOM_NAMES: + names = 1; + break; + case DLM_RCOM_NAMES_REPLY: + names = 1; + reply = 1; + break; + case DLM_RCOM_LOOKUP: + lookup = 1; + break; + case DLM_RCOM_LOOKUP_REPLY: + lookup = 1; + reply = 1; + break; case DLM_RCOM_LOCK: lock = 1; break; @@ -504,10 +577,6 @@ void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid) lock = 1; reply = 1; break; - case DLM_RCOM_STATUS_REPLY: - case DLM_RCOM_NAMES_REPLY: - case DLM_RCOM_LOOKUP_REPLY: - reply = 1; }; spin_lock(&ls->ls_recover_lock); @@ -516,19 +585,17 @@ void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid) seq = ls->ls_recover_seq; spin_unlock(&ls->ls_recover_lock); - if ((stop && (rc->rc_type != DLM_RCOM_STATUS)) || - (reply && (rc->rc_seq_reply != seq)) || - (lock && !(status & DLM_RS_DIR))) { - log_limit(ls, "dlm_receive_rcom ignore msg %d " - "from %d %llu %llu recover seq %llu sts %x gen %u", - rc->rc_type, - nodeid, - (unsigned long long)rc->rc_seq, - (unsigned long long)rc->rc_seq_reply, - (unsigned long long)seq, - status, ls->ls_generation); - goto out; - } + if (stop && (rc->rc_type != DLM_RCOM_STATUS)) + goto ignore; + + if (reply && (rc->rc_seq_reply != seq)) + goto ignore; + + if (!(status & DLM_RS_NODES) && (names || lookup || lock)) + goto ignore; + + if (!(status & DLM_RS_DIR) && (lookup || lock)) + goto ignore; switch (rc->rc_type) { case DLM_RCOM_STATUS: @@ -570,10 +637,20 @@ void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid) default: log_error(ls, "receive_rcom bad type %d", rc->rc_type); } -out: + return; + +ignore: + log_limit(ls, "dlm_receive_rcom ignore msg %d " + "from %d %llu %llu recover seq %llu sts %x gen %u", + rc->rc_type, + nodeid, + (unsigned long long)rc->rc_seq, + (unsigned long long)rc->rc_seq_reply, + (unsigned long long)seq, + status, ls->ls_generation); return; Eshort: - log_error(ls, "recovery message %x from %d is too short", - rc->rc_type, nodeid); + log_error(ls, "recovery message %d from %d is too short", + rc->rc_type, nodeid); } |