summaryrefslogtreecommitdiff
path: root/fs/ocfs2/dlm
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ocfs2/dlm')
-rw-r--r--fs/ocfs2/dlm/dlmcommon.h2
-rw-r--r--fs/ocfs2/dlm/dlmmaster.c53
-rw-r--r--fs/ocfs2/dlm/dlmrecovery.c29
-rw-r--r--fs/ocfs2/dlm/dlmthread.c57
4 files changed, 95 insertions, 46 deletions
diff --git a/fs/ocfs2/dlm/dlmcommon.h b/fs/ocfs2/dlm/dlmcommon.h
index 8107d0d0c3f6..e9f3705c4c9f 100644
--- a/fs/ocfs2/dlm/dlmcommon.h
+++ b/fs/ocfs2/dlm/dlmcommon.h
@@ -1004,6 +1004,8 @@ int dlm_finalize_reco_handler(struct o2net_msg *msg, u32 len, void *data,
int dlm_do_master_requery(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
u8 nodenum, u8 *real_master);
+void __dlm_do_purge_lockres(struct dlm_ctxt *dlm,
+ struct dlm_lock_resource *res);
int dlm_dispatch_assert_master(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res,
diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c
index 13719d3f35f8..6ea06f8a7d29 100644
--- a/fs/ocfs2/dlm/dlmmaster.c
+++ b/fs/ocfs2/dlm/dlmmaster.c
@@ -2276,9 +2276,12 @@ int dlm_drop_lockres_ref(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
mlog(ML_ERROR, "%s: res %.*s, DEREF to node %u got %d\n",
dlm->name, namelen, lockname, res->owner, r);
dlm_print_one_lock_resource(res);
- BUG();
- }
- return ret ? ret : r;
+ if (r == -ENOMEM)
+ BUG();
+ } else
+ ret = r;
+
+ return ret;
}
int dlm_deref_lockres_handler(struct o2net_msg *msg, u32 len, void *data,
@@ -2416,48 +2419,26 @@ int dlm_deref_lockres_done_handler(struct o2net_msg *msg, u32 len, void *data,
}
spin_lock(&res->spinlock);
- BUG_ON(!(res->state & DLM_LOCK_RES_DROPPING_REF));
- if (!list_empty(&res->purge)) {
- mlog(0, "%s: Removing res %.*s from purgelist\n",
- dlm->name, res->lockname.len, res->lockname.name);
- list_del_init(&res->purge);
- dlm_lockres_put(res);
- dlm->purge_count--;
- }
-
- if (!__dlm_lockres_unused(res)) {
- mlog(ML_ERROR, "%s: res %.*s in use after deref\n",
- dlm->name, res->lockname.len, res->lockname.name);
- __dlm_print_one_lock_resource(res);
- BUG();
- }
-
- __dlm_unhash_lockres(dlm, res);
-
- spin_lock(&dlm->track_lock);
- if (!list_empty(&res->tracking))
- list_del_init(&res->tracking);
- else {
- mlog(ML_ERROR, "%s: Resource %.*s not on the Tracking list\n",
- dlm->name, res->lockname.len, res->lockname.name);
- __dlm_print_one_lock_resource(res);
+ if (!(res->state & DLM_LOCK_RES_DROPPING_REF)) {
+ spin_unlock(&res->spinlock);
+ spin_unlock(&dlm->spinlock);
+ mlog(ML_NOTICE, "%s:%.*s: node %u sends deref done "
+ "but it is already derefed!\n", dlm->name,
+ res->lockname.len, res->lockname.name, node);
+ ret = 0;
+ goto done;
}
- spin_unlock(&dlm->track_lock);
- /* lockres is not in the hash now. drop the flag and wake up
- * any processes waiting in dlm_get_lock_resource.
- */
- res->state &= ~DLM_LOCK_RES_DROPPING_REF;
+ __dlm_do_purge_lockres(dlm, res);
spin_unlock(&res->spinlock);
wake_up(&res->wq);
- dlm_lockres_put(res);
-
spin_unlock(&dlm->spinlock);
ret = 0;
-
done:
+ if (res)
+ dlm_lockres_put(res);
dlm_put(dlm);
return ret;
}
diff --git a/fs/ocfs2/dlm/dlmrecovery.c b/fs/ocfs2/dlm/dlmrecovery.c
index f6b313898763..dd5cb8bcefd1 100644
--- a/fs/ocfs2/dlm/dlmrecovery.c
+++ b/fs/ocfs2/dlm/dlmrecovery.c
@@ -2343,6 +2343,7 @@ static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node)
struct dlm_lock_resource *res;
int i;
struct hlist_head *bucket;
+ struct hlist_node *tmp;
struct dlm_lock *lock;
@@ -2365,7 +2366,7 @@ static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node)
*/
for (i = 0; i < DLM_HASH_BUCKETS; i++) {
bucket = dlm_lockres_hash(dlm, i);
- hlist_for_each_entry(res, bucket, hash_node) {
+ hlist_for_each_entry_safe(res, tmp, bucket, hash_node) {
/* always prune any $RECOVERY entries for dead nodes,
* otherwise hangs can occur during later recovery */
if (dlm_is_recovery_lock(res->lockname.name,
@@ -2386,8 +2387,17 @@ static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node)
break;
}
}
- dlm_lockres_clear_refmap_bit(dlm, res,
- dead_node);
+
+ if ((res->owner == dead_node) &&
+ (res->state & DLM_LOCK_RES_DROPPING_REF)) {
+ dlm_lockres_get(res);
+ __dlm_do_purge_lockres(dlm, res);
+ spin_unlock(&res->spinlock);
+ wake_up(&res->wq);
+ dlm_lockres_put(res);
+ continue;
+ } else if (res->owner == dlm->node_num)
+ dlm_lockres_clear_refmap_bit(dlm, res, dead_node);
spin_unlock(&res->spinlock);
continue;
}
@@ -2398,14 +2408,17 @@ static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node)
if (res->state & DLM_LOCK_RES_DROPPING_REF) {
mlog(0, "%s:%.*s: owned by "
"dead node %u, this node was "
- "dropping its ref when it died. "
- "continue, dropping the flag.\n",
+ "dropping its ref when master died. "
+ "continue, purging the lockres.\n",
dlm->name, res->lockname.len,
res->lockname.name, dead_node);
+ dlm_lockres_get(res);
+ __dlm_do_purge_lockres(dlm, res);
+ spin_unlock(&res->spinlock);
+ wake_up(&res->wq);
+ dlm_lockres_put(res);
+ continue;
}
- res->state &= ~DLM_LOCK_RES_DROPPING_REF;
- dlm_move_lockres_to_recovery_list(dlm,
- res);
} else if (res->owner == dlm->node_num) {
dlm_free_dead_locks(dlm, res, dead_node);
__dlm_lockres_calc_usage(dlm, res);
diff --git a/fs/ocfs2/dlm/dlmthread.c b/fs/ocfs2/dlm/dlmthread.c
index 68d239ba0c63..838a06d4066a 100644
--- a/fs/ocfs2/dlm/dlmthread.c
+++ b/fs/ocfs2/dlm/dlmthread.c
@@ -160,6 +160,52 @@ void dlm_lockres_calc_usage(struct dlm_ctxt *dlm,
spin_unlock(&dlm->spinlock);
}
+/*
+ * Do the real purge work:
+ * unhash the lockres, and
+ * clear flag DLM_LOCK_RES_DROPPING_REF.
+ * It requires dlm and lockres spinlock to be taken.
+ */
+void __dlm_do_purge_lockres(struct dlm_ctxt *dlm,
+ struct dlm_lock_resource *res)
+{
+ assert_spin_locked(&dlm->spinlock);
+ assert_spin_locked(&res->spinlock);
+
+ if (!list_empty(&res->purge)) {
+ mlog(0, "%s: Removing res %.*s from purgelist\n",
+ dlm->name, res->lockname.len, res->lockname.name);
+ list_del_init(&res->purge);
+ dlm_lockres_put(res);
+ dlm->purge_count--;
+ }
+
+ if (!__dlm_lockres_unused(res)) {
+ mlog(ML_ERROR, "%s: res %.*s in use after deref\n",
+ dlm->name, res->lockname.len, res->lockname.name);
+ __dlm_print_one_lock_resource(res);
+ BUG();
+ }
+
+ __dlm_unhash_lockres(dlm, res);
+
+ spin_lock(&dlm->track_lock);
+ if (!list_empty(&res->tracking))
+ list_del_init(&res->tracking);
+ else {
+ mlog(ML_ERROR, "%s: Resource %.*s not on the Tracking list\n",
+ dlm->name, res->lockname.len, res->lockname.name);
+ __dlm_print_one_lock_resource(res);
+ }
+ spin_unlock(&dlm->track_lock);
+
+ /*
+ * lockres is not in the hash now. drop the flag and wake up
+ * any processes waiting in dlm_get_lock_resource.
+ */
+ res->state &= ~DLM_LOCK_RES_DROPPING_REF;
+}
+
static void dlm_purge_lockres(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res)
{
@@ -175,6 +221,13 @@ static void dlm_purge_lockres(struct dlm_ctxt *dlm,
res->lockname.len, res->lockname.name, master);
if (!master) {
+ if (res->state & DLM_LOCK_RES_DROPPING_REF) {
+ mlog(ML_NOTICE, "%s: res %.*s already in DLM_LOCK_RES_DROPPING_REF state\n",
+ dlm->name, res->lockname.len, res->lockname.name);
+ spin_unlock(&res->spinlock);
+ return;
+ }
+
res->state |= DLM_LOCK_RES_DROPPING_REF;
/* drop spinlock... retake below */
spin_unlock(&res->spinlock);
@@ -203,8 +256,8 @@ static void dlm_purge_lockres(struct dlm_ctxt *dlm,
dlm->purge_count--;
}
- if (!master && ret != 0) {
- mlog(0, "%s: deref %.*s in progress or master goes down\n",
+ if (!master && ret == DLM_DEREF_RESPONSE_INPROG) {
+ mlog(0, "%s: deref %.*s in progress\n",
dlm->name, res->lockname.len, res->lockname.name);
spin_unlock(&res->spinlock);
return;