summaryrefslogtreecommitdiff
path: root/fs/dlm/ast.c
diff options
context:
space:
mode:
authorAlexander Aring <aahringo@redhat.com>2024-03-28 18:48:41 +0300
committerDavid Teigland <teigland@redhat.com>2024-04-01 21:31:12 +0300
commit986ae3c2a8dfc1e229cabe9cc2e0b01b721c8980 (patch)
tree6c036bb2a5ba03325b2c6c4ecca07d74c85bf958 /fs/dlm/ast.c
parent0175e51b5134b55c89364aae68ec16271c67e472 (diff)
downloadlinux-986ae3c2a8dfc1e229cabe9cc2e0b01b721c8980.tar.xz
dlm: fix race between final callback and remove
This patch fixes the following issue: node 1 is dir node 2 is master node 3 is other 1->2: unlock 2: put final lkb, rsb moved to toss 2->1: unlock_reply 1: queue lkb callback with EUNLOCK 2->1: remove 1: receive_remove ignored (rsb on keep because of queued lkb callback) 1: complete lkb callback, put_lkb, move rsb to toss 3->1: lookup 1->3: lookup_reply master=2 3->2: request 2->3: request_reply EBADR In summary: An unexpected lkb reference causes the rsb to remain on the wrong list. The rsb being on the wrong list causes receive_remove to be ignored. An ignored receive_remove causes inconsistent dir and master state. This sequence requires an unusually long delay in delivering the unlock callback, because the remove message from 2->1 usually happens after some seconds. So, it's not known exactly how frequently this sequence occurs in pratice. It's possible that the same end result could also have another unknown cause. The solution for this issue is to further separate callback state from the lkb, so that an lkb reference (and from that, an rsb ref) are not held while a callback remains queued. Then, within the unlock_reply, the lkb will be freed and the rsb moved to the toss list. So, the receive_remove will not be ignored. Signed-off-by: Alexander Aring <aahringo@redhat.com> Signed-off-by: David Teigland <teigland@redhat.com>
Diffstat (limited to 'fs/dlm/ast.c')
-rw-r--r--fs/dlm/ast.c166
1 files changed, 61 insertions, 105 deletions
diff --git a/fs/dlm/ast.c b/fs/dlm/ast.c
index 5ea0b62f276b..e9da812352b4 100644
--- a/fs/dlm/ast.c
+++ b/fs/dlm/ast.c
@@ -37,12 +37,32 @@ void dlm_callback_set_last_ptr(struct dlm_callback **from,
*from = to;
}
-int dlm_enqueue_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
- int status, uint32_t sbflags)
+static void dlm_callback_work(struct work_struct *work)
{
- struct dlm_ls *ls = lkb->lkb_resource->res_ls;
+ struct dlm_callback *cb = container_of(work, struct dlm_callback, work);
+
+ if (cb->flags & DLM_CB_BAST) {
+ trace_dlm_bast(cb->ls_id, cb->lkb_id, cb->mode, cb->res_name,
+ cb->res_length);
+ cb->bastfn(cb->astparam, cb->mode);
+ } else if (cb->flags & DLM_CB_CAST) {
+ trace_dlm_ast(cb->ls_id, cb->lkb_id, cb->sb_status,
+ cb->sb_flags, cb->res_name, cb->res_length);
+ cb->lkb_lksb->sb_status = cb->sb_status;
+ cb->lkb_lksb->sb_flags = cb->sb_flags;
+ cb->astfn(cb->astparam);
+ }
+
+ kref_put(&cb->ref, dlm_release_callback);
+}
+
+int dlm_queue_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
+ int status, uint32_t sbflags,
+ struct dlm_callback **cb)
+{
+ struct dlm_rsb *rsb = lkb->lkb_resource;
int rv = DLM_ENQUEUE_CALLBACK_SUCCESS;
- struct dlm_callback *cb;
+ struct dlm_ls *ls = rsb->res_ls;
int copy_lvb = 0;
int prev_mode;
@@ -88,57 +108,46 @@ int dlm_enqueue_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
}
}
- cb = dlm_allocate_cb();
- if (!cb) {
+ *cb = dlm_allocate_cb();
+ if (!*cb) {
rv = DLM_ENQUEUE_CALLBACK_FAILURE;
goto out;
}
- cb->flags = flags;
- cb->mode = mode;
- cb->sb_status = status;
- cb->sb_flags = (sbflags & 0x000000FF);
- cb->copy_lvb = copy_lvb;
- kref_init(&cb->ref);
- if (!test_and_set_bit(DLM_IFL_CB_PENDING_BIT, &lkb->lkb_iflags))
- rv = DLM_ENQUEUE_CALLBACK_NEED_SCHED;
+ /* for tracing */
+ (*cb)->lkb_id = lkb->lkb_id;
+ (*cb)->ls_id = ls->ls_global_id;
+ memcpy((*cb)->res_name, rsb->res_name, rsb->res_length);
+ (*cb)->res_length = rsb->res_length;
- list_add_tail(&cb->list, &lkb->lkb_callbacks);
+ (*cb)->flags = flags;
+ (*cb)->mode = mode;
+ (*cb)->sb_status = status;
+ (*cb)->sb_flags = (sbflags & 0x000000FF);
+ (*cb)->copy_lvb = copy_lvb;
+ (*cb)->lkb_lksb = lkb->lkb_lksb;
+ kref_init(&(*cb)->ref);
if (flags & DLM_CB_BAST) {
lkb->lkb_last_bast_time = ktime_get();
- lkb->lkb_last_bast_mode = cb->mode;
+ lkb->lkb_last_bast_mode = mode;
} else if (flags & DLM_CB_CAST) {
- dlm_callback_set_last_ptr(&lkb->lkb_last_cast, cb);
+ dlm_callback_set_last_ptr(&lkb->lkb_last_cast, *cb);
lkb->lkb_last_cast_time = ktime_get();
}
- dlm_callback_set_last_ptr(&lkb->lkb_last_cb, cb);
+ dlm_callback_set_last_ptr(&lkb->lkb_last_cb, *cb);
+ rv = DLM_ENQUEUE_CALLBACK_NEED_SCHED;
- out:
+out:
return rv;
}
-int dlm_dequeue_lkb_callback(struct dlm_lkb *lkb, struct dlm_callback **cb)
-{
- /* oldest undelivered cb is callbacks first entry */
- *cb = list_first_entry_or_null(&lkb->lkb_callbacks,
- struct dlm_callback, list);
- if (!*cb)
- return DLM_DEQUEUE_CALLBACK_EMPTY;
-
- /* remove it from callbacks so shift others down */
- list_del(&(*cb)->list);
- if (list_empty(&lkb->lkb_callbacks))
- return DLM_DEQUEUE_CALLBACK_LAST;
-
- return DLM_DEQUEUE_CALLBACK_SUCCESS;
-}
-
void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status,
- uint32_t sbflags)
+ uint32_t sbflags)
{
struct dlm_ls *ls = lkb->lkb_resource->res_ls;
+ struct dlm_callback *cb;
int rv;
if (test_bit(DLM_DFL_USER_BIT, &lkb->lkb_dflags)) {
@@ -146,18 +155,20 @@ void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status,
return;
}
- spin_lock(&lkb->lkb_cb_lock);
- rv = dlm_enqueue_lkb_callback(lkb, flags, mode, status, sbflags);
+ rv = dlm_queue_lkb_callback(lkb, flags, mode, status, sbflags,
+ &cb);
switch (rv) {
case DLM_ENQUEUE_CALLBACK_NEED_SCHED:
- kref_get(&lkb->lkb_ref);
+ cb->astfn = lkb->lkb_astfn;
+ cb->bastfn = lkb->lkb_bastfn;
+ cb->astparam = lkb->lkb_astparam;
+ INIT_WORK(&cb->work, dlm_callback_work);
spin_lock(&ls->ls_cb_lock);
- if (test_bit(LSFL_CB_DELAY, &ls->ls_flags)) {
- list_add(&lkb->lkb_cb_list, &ls->ls_cb_delay);
- } else {
- queue_work(ls->ls_callback_wq, &lkb->lkb_cb_work);
- }
+ if (test_bit(LSFL_CB_DELAY, &ls->ls_flags))
+ list_add(&cb->list, &ls->ls_cb_delay);
+ else
+ queue_work(ls->ls_callback_wq, &cb->work);
spin_unlock(&ls->ls_cb_lock);
break;
case DLM_ENQUEUE_CALLBACK_SUCCESS:
@@ -168,67 +179,12 @@ void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status,
WARN_ON_ONCE(1);
break;
}
- spin_unlock(&lkb->lkb_cb_lock);
-}
-
-void dlm_callback_work(struct work_struct *work)
-{
- struct dlm_lkb *lkb = container_of(work, struct dlm_lkb, lkb_cb_work);
- struct dlm_ls *ls = lkb->lkb_resource->res_ls;
- struct dlm_rsb *rsb = lkb->lkb_resource;
- void (*castfn) (void *astparam);
- void (*bastfn) (void *astparam, int mode);
- struct dlm_callback *cb;
- int rv;
-
- spin_lock(&lkb->lkb_cb_lock);
- rv = dlm_dequeue_lkb_callback(lkb, &cb);
- if (WARN_ON_ONCE(rv == DLM_DEQUEUE_CALLBACK_EMPTY)) {
- clear_bit(DLM_IFL_CB_PENDING_BIT, &lkb->lkb_iflags);
- spin_unlock(&lkb->lkb_cb_lock);
- goto out;
- }
- spin_unlock(&lkb->lkb_cb_lock);
-
- for (;;) {
- castfn = lkb->lkb_astfn;
- bastfn = lkb->lkb_bastfn;
-
- if (cb->flags & DLM_CB_BAST) {
- trace_dlm_bast(ls->ls_global_id, lkb->lkb_id,
- cb->mode, rsb->res_name,
- rsb->res_length);
- bastfn(lkb->lkb_astparam, cb->mode);
- } else if (cb->flags & DLM_CB_CAST) {
- lkb->lkb_lksb->sb_status = cb->sb_status;
- lkb->lkb_lksb->sb_flags = cb->sb_flags;
- trace_dlm_ast(ls->ls_global_id, lkb->lkb_id,
- cb->sb_flags, cb->sb_status,
- rsb->res_name, rsb->res_length);
- castfn(lkb->lkb_astparam);
- }
-
- kref_put(&cb->ref, dlm_release_callback);
-
- spin_lock(&lkb->lkb_cb_lock);
- rv = dlm_dequeue_lkb_callback(lkb, &cb);
- if (rv == DLM_DEQUEUE_CALLBACK_EMPTY) {
- clear_bit(DLM_IFL_CB_PENDING_BIT, &lkb->lkb_iflags);
- spin_unlock(&lkb->lkb_cb_lock);
- break;
- }
- spin_unlock(&lkb->lkb_cb_lock);
- }
-
-out:
- /* undo kref_get from dlm_add_callback, may cause lkb to be freed */
- dlm_put_lkb(lkb);
}
int dlm_callback_start(struct dlm_ls *ls)
{
- ls->ls_callback_wq = alloc_workqueue("dlm_callback",
- WQ_HIGHPRI | WQ_MEM_RECLAIM, 0);
+ ls->ls_callback_wq = alloc_ordered_workqueue("dlm_callback",
+ WQ_HIGHPRI | WQ_MEM_RECLAIM);
if (!ls->ls_callback_wq) {
log_print("can't start dlm_callback workqueue");
return -ENOMEM;
@@ -257,7 +213,7 @@ void dlm_callback_suspend(struct dlm_ls *ls)
void dlm_callback_resume(struct dlm_ls *ls)
{
- struct dlm_lkb *lkb, *safe;
+ struct dlm_callback *cb, *safe;
int count = 0, sum = 0;
bool empty;
@@ -266,9 +222,9 @@ void dlm_callback_resume(struct dlm_ls *ls)
more:
spin_lock(&ls->ls_cb_lock);
- list_for_each_entry_safe(lkb, safe, &ls->ls_cb_delay, lkb_cb_list) {
- list_del_init(&lkb->lkb_cb_list);
- queue_work(ls->ls_callback_wq, &lkb->lkb_cb_work);
+ list_for_each_entry_safe(cb, safe, &ls->ls_cb_delay, list) {
+ list_del(&cb->list);
+ queue_work(ls->ls_callback_wq, &cb->work);
count++;
if (count == MAX_CB_QUEUE)
break;