summaryrefslogtreecommitdiff
path: root/fs/dlm
diff options
context:
space:
mode:
Diffstat (limited to 'fs/dlm')
-rw-r--r--fs/dlm/ast.c15
-rw-r--r--fs/dlm/ast.h1
-rw-r--r--fs/dlm/dlm_internal.h2
-rw-r--r--fs/dlm/lock.c167
-rw-r--r--fs/dlm/lock.h2
-rw-r--r--fs/dlm/lockspace.c32
-rw-r--r--fs/dlm/lockspace.h13
-rw-r--r--fs/dlm/lowcomms.c4
-rw-r--r--fs/dlm/netlink.c1
-rw-r--r--fs/dlm/user.c17
10 files changed, 175 insertions, 79 deletions
diff --git a/fs/dlm/ast.c b/fs/dlm/ast.c
index 19ef136f9e4f..d60a8d8f109d 100644
--- a/fs/dlm/ast.c
+++ b/fs/dlm/ast.c
@@ -200,13 +200,13 @@ void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status,
if (!prev_seq) {
kref_get(&lkb->lkb_ref);
+ mutex_lock(&ls->ls_cb_mutex);
if (test_bit(LSFL_CB_DELAY, &ls->ls_flags)) {
- mutex_lock(&ls->ls_cb_mutex);
list_add(&lkb->lkb_cb_list, &ls->ls_cb_delay);
- mutex_unlock(&ls->ls_cb_mutex);
} else {
queue_work(ls->ls_callback_wq, &lkb->lkb_cb_work);
}
+ mutex_unlock(&ls->ls_cb_mutex);
}
out:
mutex_unlock(&lkb->lkb_cb_mutex);
@@ -288,10 +288,13 @@ void dlm_callback_stop(struct dlm_ls *ls)
void dlm_callback_suspend(struct dlm_ls *ls)
{
- set_bit(LSFL_CB_DELAY, &ls->ls_flags);
+ if (ls->ls_callback_wq) {
+ mutex_lock(&ls->ls_cb_mutex);
+ set_bit(LSFL_CB_DELAY, &ls->ls_flags);
+ mutex_unlock(&ls->ls_cb_mutex);
- if (ls->ls_callback_wq)
flush_workqueue(ls->ls_callback_wq);
+ }
}
#define MAX_CB_QUEUE 25
@@ -302,11 +305,11 @@ void dlm_callback_resume(struct dlm_ls *ls)
int count = 0, sum = 0;
bool empty;
- clear_bit(LSFL_CB_DELAY, &ls->ls_flags);
-
if (!ls->ls_callback_wq)
return;
+ clear_bit(LSFL_CB_DELAY, &ls->ls_flags);
+
more:
mutex_lock(&ls->ls_cb_mutex);
list_for_each_entry_safe(lkb, safe, &ls->ls_cb_delay, lkb_cb_list) {
diff --git a/fs/dlm/ast.h b/fs/dlm/ast.h
index 181ad7d20c4d..e5e05fcc5813 100644
--- a/fs/dlm/ast.h
+++ b/fs/dlm/ast.h
@@ -11,7 +11,6 @@
#ifndef __ASTD_DOT_H__
#define __ASTD_DOT_H__
-void dlm_del_ast(struct dlm_lkb *lkb);
int dlm_add_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
int status, uint32_t sbflags, uint64_t seq);
int dlm_rem_lkb_callback(struct dlm_ls *ls, struct dlm_lkb *lkb,
diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index 8aca8085d24e..e34c3d2639a5 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -661,7 +661,7 @@ struct dlm_ls {
spinlock_t ls_recover_idr_lock;
wait_queue_head_t ls_wait_general;
wait_queue_head_t ls_recover_lock_wait;
- struct mutex ls_clear_proc_locks;
+ spinlock_t ls_clear_proc_locks;
struct list_head ls_root_list; /* root resources */
struct rw_semaphore ls_root_sem; /* protect root_list */
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index dac7eb75dba9..94a72ede5764 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -401,7 +401,7 @@ static int pre_rsb_struct(struct dlm_ls *ls)
unlock any spinlocks, go back and call pre_rsb_struct again.
Otherwise, take an rsb off the list and return it. */
-static int get_rsb_struct(struct dlm_ls *ls, char *name, int len,
+static int get_rsb_struct(struct dlm_ls *ls, const void *name, int len,
struct dlm_rsb **r_ret)
{
struct dlm_rsb *r;
@@ -412,7 +412,8 @@ static int get_rsb_struct(struct dlm_ls *ls, char *name, int len,
count = ls->ls_new_rsb_count;
spin_unlock(&ls->ls_new_rsb_spin);
log_debug(ls, "find_rsb retry %d %d %s",
- count, dlm_config.ci_new_rsb_count, name);
+ count, dlm_config.ci_new_rsb_count,
+ (const char *)name);
return -EAGAIN;
}
@@ -448,7 +449,7 @@ static int rsb_cmp(struct dlm_rsb *r, const char *name, int nlen)
return memcmp(r->res_name, maxname, DLM_RESNAME_MAXLEN);
}
-int dlm_search_rsb_tree(struct rb_root *tree, char *name, int len,
+int dlm_search_rsb_tree(struct rb_root *tree, const void *name, int len,
struct dlm_rsb **r_ret)
{
struct rb_node *node = tree->rb_node;
@@ -546,7 +547,7 @@ static int rsb_insert(struct dlm_rsb *rsb, struct rb_root *tree)
* while that rsb has a potentially stale master.)
*/
-static int find_rsb_dir(struct dlm_ls *ls, char *name, int len,
+static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
uint32_t hash, uint32_t b,
int dir_nodeid, int from_nodeid,
unsigned int flags, struct dlm_rsb **r_ret)
@@ -724,7 +725,7 @@ static int find_rsb_dir(struct dlm_ls *ls, char *name, int len,
dlm_recover_locks) before we've made ourself master (in
dlm_recover_masters). */
-static int find_rsb_nodir(struct dlm_ls *ls, char *name, int len,
+static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
uint32_t hash, uint32_t b,
int dir_nodeid, int from_nodeid,
unsigned int flags, struct dlm_rsb **r_ret)
@@ -818,8 +819,9 @@ static int find_rsb_nodir(struct dlm_ls *ls, char *name, int len,
return error;
}
-static int find_rsb(struct dlm_ls *ls, char *name, int len, int from_nodeid,
- unsigned int flags, struct dlm_rsb **r_ret)
+static int find_rsb(struct dlm_ls *ls, const void *name, int len,
+ int from_nodeid, unsigned int flags,
+ struct dlm_rsb **r_ret)
{
uint32_t hash, b;
int dir_nodeid;
@@ -2864,17 +2866,9 @@ static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
struct dlm_args *args)
{
- int rv = -EINVAL;
+ int rv = -EBUSY;
if (args->flags & DLM_LKF_CONVERT) {
- if (lkb->lkb_flags & DLM_IFL_MSTCPY)
- goto out;
-
- if (args->flags & DLM_LKF_QUECVT &&
- !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
- goto out;
-
- rv = -EBUSY;
if (lkb->lkb_status != DLM_LKSTS_GRANTED)
goto out;
@@ -2884,6 +2878,14 @@ static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
if (is_overlap(lkb))
goto out;
+
+ rv = -EINVAL;
+ if (lkb->lkb_flags & DLM_IFL_MSTCPY)
+ goto out;
+
+ if (args->flags & DLM_LKF_QUECVT &&
+ !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
+ goto out;
}
lkb->lkb_exflags = args->flags;
@@ -2900,11 +2902,25 @@ static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
#endif
rv = 0;
out:
- if (rv)
- log_debug(ls, "validate_lock_args %d %x %x %x %d %d %s",
+ switch (rv) {
+ case 0:
+ break;
+ case -EINVAL:
+ /* annoy the user because dlm usage is wrong */
+ WARN_ON(1);
+ log_error(ls, "%s %d %x %x %x %d %d %s", __func__,
+ rv, lkb->lkb_id, lkb->lkb_flags, args->flags,
+ lkb->lkb_status, lkb->lkb_wait_type,
+ lkb->lkb_resource->res_name);
+ break;
+ default:
+ log_debug(ls, "%s %d %x %x %x %d %d %s", __func__,
rv, lkb->lkb_id, lkb->lkb_flags, args->flags,
lkb->lkb_status, lkb->lkb_wait_type,
lkb->lkb_resource->res_name);
+ break;
+ }
+
return rv;
}
@@ -2918,23 +2934,12 @@ static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
{
struct dlm_ls *ls = lkb->lkb_resource->res_ls;
- int rv = -EINVAL;
-
- if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
- log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
- dlm_print_lkb(lkb);
- goto out;
- }
-
- /* an lkb may still exist even though the lock is EOL'ed due to a
- cancel, unlock or failed noqueue request; an app can't use these
- locks; return same error as if the lkid had not been found at all */
+ int rv = -EBUSY;
- if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
- log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
- rv = -ENOENT;
+ /* normal unlock not allowed if there's any op in progress */
+ if (!(args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) &&
+ (lkb->lkb_wait_type || lkb->lkb_wait_count))
goto out;
- }
/* an lkb may be waiting for an rsb lookup to complete where the
lookup was initiated by another lock */
@@ -2949,7 +2954,24 @@ static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
unhold_lkb(lkb); /* undoes create_lkb() */
}
/* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
- rv = -EBUSY;
+ goto out;
+ }
+
+ rv = -EINVAL;
+ if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
+ log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
+ dlm_print_lkb(lkb);
+ goto out;
+ }
+
+ /* an lkb may still exist even though the lock is EOL'ed due to a
+ * cancel, unlock or failed noqueue request; an app can't use these
+ * locks; return same error as if the lkid had not been found at all
+ */
+
+ if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
+ log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
+ rv = -ENOENT;
goto out;
}
@@ -3022,14 +3044,8 @@ static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
goto out;
}
/* add_to_waiters() will set OVERLAP_UNLOCK */
- goto out_ok;
}
- /* normal unlock not allowed if there's any op in progress */
- rv = -EBUSY;
- if (lkb->lkb_wait_type || lkb->lkb_wait_count)
- goto out;
-
out_ok:
/* an overlapping op shouldn't blow away exflags from other op */
lkb->lkb_exflags |= args->flags;
@@ -3037,11 +3053,25 @@ static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
lkb->lkb_astparam = args->astparam;
rv = 0;
out:
- if (rv)
- log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
+ switch (rv) {
+ case 0:
+ break;
+ case -EINVAL:
+ /* annoy the user because dlm usage is wrong */
+ WARN_ON(1);
+ log_error(ls, "%s %d %x %x %x %x %d %s", __func__, rv,
+ lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
+ args->flags, lkb->lkb_wait_type,
+ lkb->lkb_resource->res_name);
+ break;
+ default:
+ log_debug(ls, "%s %d %x %x %x %x %d %s", __func__, rv,
lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
args->flags, lkb->lkb_wait_type,
lkb->lkb_resource->res_name);
+ break;
+ }
+
return rv;
}
@@ -3292,8 +3322,9 @@ static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
* request_lock(), convert_lock(), unlock_lock(), cancel_lock()
*/
-static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
- int len, struct dlm_args *args)
+static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
+ const void *name, int len,
+ struct dlm_args *args)
{
struct dlm_rsb *r;
int error;
@@ -3392,7 +3423,7 @@ int dlm_lock(dlm_lockspace_t *lockspace,
int mode,
struct dlm_lksb *lksb,
uint32_t flags,
- void *name,
+ const void *name,
unsigned int namelen,
uint32_t parent_lkid,
void (*ast) (void *astarg),
@@ -3438,7 +3469,7 @@ int dlm_lock(dlm_lockspace_t *lockspace,
if (error == -EINPROGRESS)
error = 0;
out_put:
- trace_dlm_lock_end(ls, lkb, name, namelen, mode, flags, error);
+ trace_dlm_lock_end(ls, lkb, name, namelen, mode, flags, error, true);
if (convert || error)
__put_lkb(ls, lkb);
@@ -3623,7 +3654,7 @@ static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
case cpu_to_le32(DLM_MSG_CONVERT_REPLY):
case cpu_to_le32(DLM_MSG_GRANT):
- if (!lkb->lkb_lvbptr)
+ if (!lkb->lkb_lvbptr || !(lkb->lkb_exflags & DLM_LKF_VALBLK))
break;
memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
break;
@@ -5080,8 +5111,11 @@ void dlm_receive_buffer(union dlm_packet *p, int nodeid)
down_read(&ls->ls_recv_active);
if (hd->h_cmd == DLM_MSG)
dlm_receive_message(ls, &p->message, nodeid);
- else
+ else if (hd->h_cmd == DLM_RCOM)
dlm_receive_rcom(ls, &p->rcom, nodeid);
+ else
+ log_error(ls, "invalid h_cmd %d from %d lockspace %x",
+ hd->h_cmd, nodeid, le32_to_cpu(hd->u.h_lockspace));
up_read(&ls->ls_recv_active);
dlm_put_lockspace(ls);
@@ -5801,6 +5835,7 @@ int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
{
struct dlm_lkb *lkb;
struct dlm_args args;
+ bool do_put = true;
int error;
dlm_lock_recovery(ls);
@@ -5811,13 +5846,14 @@ int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
goto out;
}
+ trace_dlm_lock_start(ls, lkb, name, namelen, mode, flags);
+
if (flags & DLM_LKF_VALBLK) {
ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
if (!ua->lksb.sb_lvbptr) {
kfree(ua);
- __put_lkb(ls, lkb);
error = -ENOMEM;
- goto out;
+ goto out_put;
}
}
#ifdef CONFIG_DLM_DEPRECATED_API
@@ -5831,8 +5867,7 @@ int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
kfree(ua->lksb.sb_lvbptr);
ua->lksb.sb_lvbptr = NULL;
kfree(ua);
- __put_lkb(ls, lkb);
- goto out;
+ goto out_put;
}
/* After ua is attached to lkb it will be freed by dlm_free_lkb().
@@ -5851,8 +5886,7 @@ int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
error = 0;
fallthrough;
default:
- __put_lkb(ls, lkb);
- goto out;
+ goto out_put;
}
/* add this new lkb to the per-process list of locks */
@@ -5860,6 +5894,11 @@ int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
hold_lkb(lkb);
list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
spin_unlock(&ua->proc->locks_spin);
+ do_put = false;
+ out_put:
+ trace_dlm_lock_end(ls, lkb, name, namelen, mode, flags, error, false);
+ if (do_put)
+ __put_lkb(ls, lkb);
out:
dlm_unlock_recovery(ls);
return error;
@@ -5885,6 +5924,8 @@ int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
if (error)
goto out;
+ trace_dlm_lock_start(ls, lkb, NULL, 0, mode, flags);
+
/* user can change the params on its lock when it converts it, or
add an lvb that didn't exist before */
@@ -5922,6 +5963,7 @@ int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
error = 0;
out_put:
+ trace_dlm_lock_end(ls, lkb, NULL, 0, mode, flags, error, false);
dlm_put_lkb(lkb);
out:
dlm_unlock_recovery(ls);
@@ -6014,6 +6056,8 @@ int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
if (error)
goto out;
+ trace_dlm_unlock_start(ls, lkb, flags);
+
ua = lkb->lkb_ua;
if (lvb_in && ua->lksb.sb_lvbptr)
@@ -6042,6 +6086,7 @@ int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
spin_unlock(&ua->proc->locks_spin);
out_put:
+ trace_dlm_unlock_end(ls, lkb, flags, error);
dlm_put_lkb(lkb);
out:
dlm_unlock_recovery(ls);
@@ -6063,6 +6108,8 @@ int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
if (error)
goto out;
+ trace_dlm_unlock_start(ls, lkb, flags);
+
ua = lkb->lkb_ua;
if (ua_tmp->castparam)
ua->castparam = ua_tmp->castparam;
@@ -6080,6 +6127,7 @@ int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
if (error == -EBUSY)
error = 0;
out_put:
+ trace_dlm_unlock_end(ls, lkb, flags, error);
dlm_put_lkb(lkb);
out:
dlm_unlock_recovery(ls);
@@ -6101,6 +6149,8 @@ int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
if (error)
goto out;
+ trace_dlm_unlock_start(ls, lkb, flags);
+
ua = lkb->lkb_ua;
error = set_unlock_args(flags, ua, &args);
@@ -6129,6 +6179,7 @@ int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
if (error == -EBUSY)
error = 0;
out_put:
+ trace_dlm_unlock_end(ls, lkb, flags, error);
dlm_put_lkb(lkb);
out:
dlm_unlock_recovery(ls);
@@ -6184,7 +6235,7 @@ static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
{
struct dlm_lkb *lkb = NULL;
- mutex_lock(&ls->ls_clear_proc_locks);
+ spin_lock(&ls->ls_clear_proc_locks);
if (list_empty(&proc->locks))
goto out;
@@ -6196,7 +6247,7 @@ static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
else
lkb->lkb_flags |= DLM_IFL_DEAD;
out:
- mutex_unlock(&ls->ls_clear_proc_locks);
+ spin_unlock(&ls->ls_clear_proc_locks);
return lkb;
}
@@ -6233,7 +6284,7 @@ void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
dlm_put_lkb(lkb);
}
- mutex_lock(&ls->ls_clear_proc_locks);
+ spin_lock(&ls->ls_clear_proc_locks);
/* in-progress unlocks */
list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
@@ -6249,7 +6300,7 @@ void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
dlm_put_lkb(lkb);
}
- mutex_unlock(&ls->ls_clear_proc_locks);
+ spin_unlock(&ls->ls_clear_proc_locks);
dlm_unlock_recovery(ls);
}
diff --git a/fs/dlm/lock.h b/fs/dlm/lock.h
index a7b6474f009d..40c76b5544da 100644
--- a/fs/dlm/lock.h
+++ b/fs/dlm/lock.h
@@ -36,7 +36,7 @@ static inline void dlm_adjust_timeouts(struct dlm_ls *ls) { }
int dlm_master_lookup(struct dlm_ls *ls, int nodeid, char *name, int len,
unsigned int flags, int *r_nodeid, int *result);
-int dlm_search_rsb_tree(struct rb_root *tree, char *name, int len,
+int dlm_search_rsb_tree(struct rb_root *tree, const void *name, int len,
struct dlm_rsb **r_ret);
void dlm_recover_purge(struct dlm_ls *ls);
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index 3972f4d86c75..bae050df7abf 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -416,7 +416,7 @@ static int new_lockspace(const char *name, const char *cluster,
if (namelen > DLM_LOCKSPACE_LEN || namelen == 0)
return -EINVAL;
- if (!lvblen || (lvblen % 8))
+ if (lvblen % 8)
return -EINVAL;
if (!try_module_get(THIS_MODULE))
@@ -584,7 +584,7 @@ static int new_lockspace(const char *name, const char *cluster,
atomic_set(&ls->ls_requestqueue_cnt, 0);
init_waitqueue_head(&ls->ls_requestqueue_wait);
mutex_init(&ls->ls_requestqueue_mutex);
- mutex_init(&ls->ls_clear_proc_locks);
+ spin_lock_init(&ls->ls_clear_proc_locks);
/* Due backwards compatibility with 3.1 we need to use maximum
* possible dlm message size to be sure the message will fit and
@@ -703,10 +703,11 @@ static int new_lockspace(const char *name, const char *cluster,
return error;
}
-int dlm_new_lockspace(const char *name, const char *cluster,
- uint32_t flags, int lvblen,
- const struct dlm_lockspace_ops *ops, void *ops_arg,
- int *ops_result, dlm_lockspace_t **lockspace)
+static int __dlm_new_lockspace(const char *name, const char *cluster,
+ uint32_t flags, int lvblen,
+ const struct dlm_lockspace_ops *ops,
+ void *ops_arg, int *ops_result,
+ dlm_lockspace_t **lockspace)
{
int error = 0;
@@ -732,6 +733,25 @@ int dlm_new_lockspace(const char *name, const char *cluster,
return error;
}
+int dlm_new_lockspace(const char *name, const char *cluster, uint32_t flags,
+ int lvblen, const struct dlm_lockspace_ops *ops,
+ void *ops_arg, int *ops_result,
+ dlm_lockspace_t **lockspace)
+{
+ return __dlm_new_lockspace(name, cluster, flags | DLM_LSFL_FS, lvblen,
+ ops, ops_arg, ops_result, lockspace);
+}
+
+int dlm_new_user_lockspace(const char *name, const char *cluster,
+ uint32_t flags, int lvblen,
+ const struct dlm_lockspace_ops *ops,
+ void *ops_arg, int *ops_result,
+ dlm_lockspace_t **lockspace)
+{
+ return __dlm_new_lockspace(name, cluster, flags, lvblen, ops,
+ ops_arg, ops_result, lockspace);
+}
+
static int lkb_idr_is_local(int id, void *p, void *data)
{
struct dlm_lkb *lkb = p;
diff --git a/fs/dlm/lockspace.h b/fs/dlm/lockspace.h
index 306fc4f4ea15..03f4a4a3a871 100644
--- a/fs/dlm/lockspace.h
+++ b/fs/dlm/lockspace.h
@@ -12,6 +12,14 @@
#ifndef __LOCKSPACE_DOT_H__
#define __LOCKSPACE_DOT_H__
+/* DLM_LSFL_FS
+ * The lockspace user is in the kernel (i.e. filesystem). Enables
+ * direct bast/cast callbacks.
+ *
+ * internal lockspace flag - will be removed in future
+ */
+#define DLM_LSFL_FS 0x00000004
+
int dlm_lockspace_init(void);
void dlm_lockspace_exit(void);
struct dlm_ls *dlm_find_lockspace_global(uint32_t id);
@@ -20,6 +28,11 @@ struct dlm_ls *dlm_find_lockspace_device(int minor);
void dlm_put_lockspace(struct dlm_ls *ls);
void dlm_stop_lockspaces(void);
void dlm_stop_lockspaces_check(void);
+int dlm_new_user_lockspace(const char *name, const char *cluster,
+ uint32_t flags, int lvblen,
+ const struct dlm_lockspace_ops *ops,
+ void *ops_arg, int *ops_result,
+ dlm_lockspace_t **lockspace);
#endif /* __LOCKSPACE_DOT_H__ */
diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index a4e84e8d94c8..59f64c596233 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -1336,6 +1336,8 @@ struct dlm_msg *dlm_lowcomms_new_msg(int nodeid, int len, gfp_t allocation,
return NULL;
}
+ /* for dlm_lowcomms_commit_msg() */
+ kref_get(&msg->ref);
/* we assume if successful commit must called */
msg->idx = idx;
return msg;
@@ -1375,6 +1377,8 @@ void dlm_lowcomms_commit_msg(struct dlm_msg *msg)
{
_dlm_lowcomms_commit_msg(msg);
srcu_read_unlock(&connections_srcu, msg->idx);
+ /* because dlm_lowcomms_new_msg() */
+ kref_put(&msg->ref, dlm_msg_release);
}
#endif
diff --git a/fs/dlm/netlink.c b/fs/dlm/netlink.c
index 67f68d48d60c..4de4b8651c6c 100644
--- a/fs/dlm/netlink.c
+++ b/fs/dlm/netlink.c
@@ -75,6 +75,7 @@ static struct genl_family family __ro_after_init = {
.version = DLM_GENL_VERSION,
.small_ops = dlm_nl_ops,
.n_small_ops = ARRAY_SIZE(dlm_nl_ops),
+ .resv_start_op = DLM_CMD_HELLO + 1,
.module = THIS_MODULE,
};
diff --git a/fs/dlm/user.c b/fs/dlm/user.c
index 99e8f0744513..c5d27bccc3dc 100644
--- a/fs/dlm/user.c
+++ b/fs/dlm/user.c
@@ -16,6 +16,8 @@
#include <linux/slab.h>
#include <linux/sched/signal.h>
+#include <trace/events/dlm.h>
+
#include "dlm_internal.h"
#include "lockspace.h"
#include "lock.h"
@@ -184,7 +186,7 @@ void dlm_user_add_ast(struct dlm_lkb *lkb, uint32_t flags, int mode,
return;
ls = lkb->lkb_resource->res_ls;
- mutex_lock(&ls->ls_clear_proc_locks);
+ spin_lock(&ls->ls_clear_proc_locks);
/* If ORPHAN/DEAD flag is set, it means the process is dead so an ast
can't be delivered. For ORPHAN's, dlm_clear_proc_locks() freed
@@ -230,7 +232,7 @@ void dlm_user_add_ast(struct dlm_lkb *lkb, uint32_t flags, int mode,
spin_unlock(&proc->locks_spin);
}
out:
- mutex_unlock(&ls->ls_clear_proc_locks);
+ spin_unlock(&ls->ls_clear_proc_locks);
}
static int device_user_lock(struct dlm_user_proc *proc,
@@ -421,9 +423,9 @@ static int device_create_lockspace(struct dlm_lspace_params *params)
if (!capable(CAP_SYS_ADMIN))
return -EPERM;
- error = dlm_new_lockspace(params->name, dlm_config.ci_cluster_name, params->flags,
- DLM_USER_LVB_LEN, NULL, NULL, NULL,
- &lockspace);
+ error = dlm_new_user_lockspace(params->name, dlm_config.ci_cluster_name,
+ params->flags, DLM_USER_LVB_LEN, NULL,
+ NULL, NULL, &lockspace);
if (error)
return error;
@@ -882,7 +884,9 @@ static ssize_t device_read(struct file *file, char __user *buf, size_t count,
goto try_another;
}
- if (cb.flags & DLM_CB_CAST) {
+ if (cb.flags & DLM_CB_BAST) {
+ trace_dlm_bast(lkb->lkb_resource->res_ls, lkb, cb.mode);
+ } else if (cb.flags & DLM_CB_CAST) {
new_mode = cb.mode;
if (!cb.sb_status && lkb->lkb_lksb->sb_lvbptr &&
@@ -891,6 +895,7 @@ static ssize_t device_read(struct file *file, char __user *buf, size_t count,
lkb->lkb_lksb->sb_status = cb.sb_status;
lkb->lkb_lksb->sb_flags = cb.sb_flags;
+ trace_dlm_ast(lkb->lkb_resource->res_ls, lkb);
}
rv = copy_result_to_user(lkb->lkb_ua,