diff options
Diffstat (limited to 'fs/ocfs2/dlm/dlmrecovery.c')
-rw-r--r-- | fs/ocfs2/dlm/dlmrecovery.c | 56 |
1 files changed, 41 insertions, 15 deletions
diff --git a/fs/ocfs2/dlm/dlmrecovery.c b/fs/ocfs2/dlm/dlmrecovery.c index 9e4f862d20fe..f6b313898763 100644 --- a/fs/ocfs2/dlm/dlmrecovery.c +++ b/fs/ocfs2/dlm/dlmrecovery.c @@ -1373,6 +1373,7 @@ int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data, char *buf = NULL; struct dlm_work_item *item = NULL; struct dlm_lock_resource *res = NULL; + unsigned int hash; if (!dlm_grab(dlm)) return -EINVAL; @@ -1400,11 +1401,26 @@ int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data, /* lookup the lock to see if we have a secondary queue for this * already... just add the locks in and this will have its owner * and RECOVERY flag changed when it completes. */ - res = dlm_lookup_lockres(dlm, mres->lockname, mres->lockname_len); + hash = dlm_lockid_hash(mres->lockname, mres->lockname_len); + spin_lock(&dlm->spinlock); + res = __dlm_lookup_lockres_full(dlm, mres->lockname, mres->lockname_len, + hash); if (res) { /* this will get a ref on res */ /* mark it as recovering/migrating and hash it */ spin_lock(&res->spinlock); + if (res->state & DLM_LOCK_RES_DROPPING_REF) { + mlog(0, "%s: node is attempting to migrate " + "lockres %.*s, but marked as dropping " + " ref!\n", dlm->name, + mres->lockname_len, mres->lockname); + ret = -EINVAL; + spin_unlock(&res->spinlock); + spin_unlock(&dlm->spinlock); + dlm_lockres_put(res); + goto leave; + } + if (mres->flags & DLM_MRES_RECOVERY) { res->state |= DLM_LOCK_RES_RECOVERING; } else { @@ -1421,13 +1437,16 @@ int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data, mres->lockname_len, mres->lockname); ret = -EFAULT; spin_unlock(&res->spinlock); + spin_unlock(&dlm->spinlock); dlm_lockres_put(res); goto leave; } res->state |= DLM_LOCK_RES_MIGRATING; } spin_unlock(&res->spinlock); + spin_unlock(&dlm->spinlock); } else { + spin_unlock(&dlm->spinlock); /* need to allocate, just like if it was * mastered here normally */ res = dlm_new_lockres(dlm, mres->lockname, mres->lockname_len); @@ -2064,7 +2083,6 @@ void dlm_move_lockres_to_recovery_list(struct dlm_ctxt *dlm, dlm_lock_get(lock); if (lock->convert_pending) { /* move converting lock back to granted */ - BUG_ON(i != DLM_CONVERTING_LIST); mlog(0, "node died with convert pending " "on %.*s. move back to granted list.\n", res->lockname.len, res->lockname.name); @@ -2156,6 +2174,13 @@ static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm, for (i = 0; i < DLM_HASH_BUCKETS; i++) { bucket = dlm_lockres_hash(dlm, i); hlist_for_each_entry(res, bucket, hash_node) { + if (res->state & DLM_LOCK_RES_RECOVERY_WAITING) { + spin_lock(&res->spinlock); + res->state &= ~DLM_LOCK_RES_RECOVERY_WAITING; + spin_unlock(&res->spinlock); + wake_up(&res->wq); + } + if (!(res->state & DLM_LOCK_RES_RECOVERING)) continue; @@ -2293,6 +2318,7 @@ static void dlm_free_dead_locks(struct dlm_ctxt *dlm, res->lockname.len, res->lockname.name, freed, dead_node); __dlm_print_one_lock_resource(res); } + res->state |= DLM_LOCK_RES_RECOVERY_WAITING; dlm_lockres_clear_refmap_bit(dlm, res, dead_node); } else if (test_bit(dead_node, res->refmap)) { mlog(0, "%s:%.*s: dead node %u had a ref, but had " @@ -2360,6 +2386,8 @@ static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node) break; } } + dlm_lockres_clear_refmap_bit(dlm, res, + dead_node); spin_unlock(&res->spinlock); continue; } @@ -2368,14 +2396,16 @@ static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node) dlm_revalidate_lvb(dlm, res, dead_node); if (res->owner == dead_node) { if (res->state & DLM_LOCK_RES_DROPPING_REF) { - mlog(ML_NOTICE, "%s: res %.*s, Skip " - "recovery as it is being freed\n", - dlm->name, res->lockname.len, - res->lockname.name); - } else - dlm_move_lockres_to_recovery_list(dlm, - res); - + mlog(0, "%s:%.*s: owned by " + "dead node %u, this node was " + "dropping its ref when it died. " + "continue, dropping the flag.\n", + dlm->name, res->lockname.len, + res->lockname.name, dead_node); + } + res->state &= ~DLM_LOCK_RES_DROPPING_REF; + dlm_move_lockres_to_recovery_list(dlm, + res); } else if (res->owner == dlm->node_num) { dlm_free_dead_locks(dlm, res, dead_node); __dlm_lockres_calc_usage(dlm, res); @@ -2450,11 +2480,7 @@ static void __dlm_hb_node_down(struct dlm_ctxt *dlm, int idx) * perhaps later we can genericize this for other waiters. */ wake_up(&dlm->migration_wq); - if (test_bit(idx, dlm->recovery_map)) - mlog(0, "domain %s, node %u already added " - "to recovery map!\n", dlm->name, idx); - else - set_bit(idx, dlm->recovery_map); + set_bit(idx, dlm->recovery_map); } void dlm_hb_node_down_cb(struct o2nm_node *node, int idx, void *data) |