All the mail mirrored from lore.kernel.org
* [PATCHv3 v6.8-rc6 00/18] dlm: bring message parsing to softirq context
@ 2024-02-27  1:48 Alexander Aring
  2024-02-27  1:48 ` [PATCHv3 v6.8-rc6 01/18] fs: dlm: Simplify the allocation of slab caches in dlm_midcomms_cache_create Alexander Aring
                   ` (17 more replies)
  0 siblings, 18 replies; 19+ messages in thread
From: Alexander Aring @ 2024-02-27  1:48 UTC (permalink / raw)
  To: teigland; +Cc: gfs2, aahringo

Hi,

this is version 3 of my attempt to bring dlm message parsing to softirq
context. The improvement is minimal for now, as messages are still
parsed one after another and a context switch to the dlm callback
workqueue is still involved; the main effect of this series is that we
call queue_work() more often. In the future we will get rid of the dlm
callback workqueue and call the DLM user callback directly in the dlm
message processing softirq context. However, this requires that DLM
users adapt to the change and signal this capability over a new
lockspace flag.

- Alex

changes since v2:
 - add patches that appeared on the mailing list in the meantime, to
   resend them.
 - add "remove schedule in dlm receive path" to remove a schedule()
   in the dlm msg processing path.
 - change the commit message of "dlm: do dlm message processing in
   softirq context" to mention the

changes since v1:

In v2 I split the root_list into a root_list and a per-lockspace
masters list. The root_list can be used as a stack variable in
ls_recover() as it is only used for recovery handling. The masters
list is somewhat special because it is used by other dlm nodes to dump
this node's master rsbs. The current implementation guarantees that
this happens in a very specific part of the recovery handling by using
a kind of distributed cluster barrier. I added more sanity checks for
this handling and a more per-node-based recovery log mechanism. There
is also a TODO describing that we should keep track of all master rsbs
during normal lockspace locking instead of creating the list during
recovery handling, which I think should improve the handling and maybe
let us get rid of those barriers.

Alexander Aring (16):
  dlm: fix off-by-one waiters refcount handling
  dlm: put lkbs instead of force free
  dlm: remove allocation parameter in msg allocation
  dlm: switch to GFP_ATOMIC in dlm allocations
  dlm: move root_list functionality to recover.c
  dlm: move master dir dump to own list
  dlm: move root_list to ls_recover() stack
  dlm: implement directory dump context
  dlm: drop holding waiters mutex in waiters recovery
  dlm: convert ls_waiters_mutex to spinlock
  dlm: convert res_lock to spinlock
  dlm: make requestqueue handling non sleepable
  dlm: ls_recv_active semaphore to rwlock
  dlm: remove schedule in dlm receive path
  dlm: convert message parsing locks to disable bh
  dlm: do dlm message processing in softirq context

Kunwu Chan (2):
  fs: dlm: Simplify the allocation of slab caches in
    dlm_midcomms_cache_create
  fs: dlm: Simplify the allocation of slab caches in
    dlm_lowcomms_msg_cache_create

 fs/dlm/ast.c          |  28 ++---
 fs/dlm/debug_fs.c     |  36 +++---
 fs/dlm/dir.c          | 147 ++++++++++++++++++----
 fs/dlm/dir.h          |   3 +-
 fs/dlm/dlm_internal.h |  20 +--
 fs/dlm/lock.c         | 277 +++++++++++++++++++++++-------------------
 fs/dlm/lock.h         |   8 +-
 fs/dlm/lockspace.c    |  96 ++++++++-------
 fs/dlm/lowcomms.c     |  68 +++++------
 fs/dlm/lowcomms.h     |   5 +-
 fs/dlm/member.c       |  23 ++--
 fs/dlm/memory.c       |  14 +--
 fs/dlm/memory.h       |   4 +-
 fs/dlm/midcomms.c     |  67 +++++-----
 fs/dlm/midcomms.h     |   3 +-
 fs/dlm/rcom.c         |  33 +++--
 fs/dlm/recover.c      | 126 ++++++-------------
 fs/dlm/recover.h      |  10 +-
 fs/dlm/recoverd.c     | 125 ++++++++++++++++---
 fs/dlm/requestqueue.c |  43 ++-----
 fs/dlm/user.c         |  34 +++---
 21 files changed, 657 insertions(+), 513 deletions(-)

-- 
2.43.0



* [PATCHv3 v6.8-rc6 01/18] fs: dlm: Simplify the allocation of slab caches in dlm_midcomms_cache_create
  2024-02-27  1:48 [PATCHv3 v6.8-rc6 00/18] dlm: bring message parsing to softirq context Alexander Aring
@ 2024-02-27  1:48 ` Alexander Aring
  2024-02-27  1:48 ` [PATCHv3 v6.8-rc6 02/18] fs: dlm: Simplify the allocation of slab caches in dlm_lowcomms_msg_cache_create Alexander Aring
                   ` (16 subsequent siblings)
  17 siblings, 0 replies; 19+ messages in thread
From: Alexander Aring @ 2024-02-27  1:48 UTC (permalink / raw)
  To: teigland; +Cc: gfs2, aahringo

From: Kunwu Chan <chentao@kylinos.cn>

Use the new KMEM_CACHE() macro instead of calling kmem_cache_create()
directly to simplify the creation of slab caches.
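
For reference, KMEM_CACHE() from <linux/slab.h> is roughly:

	#define KMEM_CACHE(__struct, __flags)				\
		kmem_cache_create(#__struct, sizeof(struct __struct),	\
				__alignof__(struct __struct), (__flags), NULL)

so besides being shorter, the conversion also passes the natural
alignment of the struct instead of 0.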

Signed-off-by: Kunwu Chan <chentao@kylinos.cn>
Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/midcomms.c | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/fs/dlm/midcomms.c b/fs/dlm/midcomms.c
index 2247ebb61be1..8e9920f1b48b 100644
--- a/fs/dlm/midcomms.c
+++ b/fs/dlm/midcomms.c
@@ -226,8 +226,7 @@ static DEFINE_MUTEX(close_lock);
 
 struct kmem_cache *dlm_midcomms_cache_create(void)
 {
-	return kmem_cache_create("dlm_mhandle", sizeof(struct dlm_mhandle),
-				 0, 0, NULL);
+	return KMEM_CACHE(dlm_mhandle, 0);
 }
 
 static inline const char *dlm_state_str(int state)
-- 
2.43.0



* [PATCHv3 v6.8-rc6 02/18] fs: dlm: Simplify the allocation of slab caches in dlm_lowcomms_msg_cache_create
  2024-02-27  1:48 [PATCHv3 v6.8-rc6 00/18] dlm: bring message parsing to softirq context Alexander Aring
  2024-02-27  1:48 ` [PATCHv3 v6.8-rc6 01/18] fs: dlm: Simplify the allocation of slab caches in dlm_midcomms_cache_create Alexander Aring
@ 2024-02-27  1:48 ` Alexander Aring
  2024-02-27  1:48 ` [PATCHv3 v6.8-rc6 03/18] dlm: fix off-by-one waiters refcount handling Alexander Aring
                   ` (15 subsequent siblings)
  17 siblings, 0 replies; 19+ messages in thread
From: Alexander Aring @ 2024-02-27  1:48 UTC (permalink / raw)
  To: teigland; +Cc: gfs2, aahringo

From: Kunwu Chan <chentao@kylinos.cn>

Use the new KMEM_CACHE() macro instead of calling kmem_cache_create()
directly to simplify the creation of slab caches.

Signed-off-by: Kunwu Chan <chentao@kylinos.cn>
Acked-by: Alexander Aring <aahringo@redhat.com>
Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/lowcomms.c | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index 6296c62c10fa..712165a1e567 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -248,7 +248,7 @@ struct kmem_cache *dlm_lowcomms_writequeue_cache_create(void)
 
 struct kmem_cache *dlm_lowcomms_msg_cache_create(void)
 {
-	return kmem_cache_create("dlm_msg", sizeof(struct dlm_msg), 0, 0, NULL);
+	return KMEM_CACHE(dlm_msg, 0);
 }
 
 /* need to held writequeue_lock */
-- 
2.43.0



* [PATCHv3 v6.8-rc6 03/18] dlm: fix off-by-one waiters refcount handling
  2024-02-27  1:48 [PATCHv3 v6.8-rc6 00/18] dlm: bring message parsing to softirq context Alexander Aring
  2024-02-27  1:48 ` [PATCHv3 v6.8-rc6 01/18] fs: dlm: Simplify the allocation of slab caches in dlm_midcomms_cache_create Alexander Aring
  2024-02-27  1:48 ` [PATCHv3 v6.8-rc6 02/18] fs: dlm: Simplify the allocation of slab caches in dlm_lowcomms_msg_cache_create Alexander Aring
@ 2024-02-27  1:48 ` Alexander Aring
  2024-02-27  1:48 ` [PATCHv3 v6.8-rc6 04/18] dlm: put lkbs instead of force free Alexander Aring
                   ` (14 subsequent siblings)
  17 siblings, 0 replies; 19+ messages in thread
From: Alexander Aring @ 2024-02-27  1:48 UTC (permalink / raw)
  To: teigland; +Cc: gfs2, aahringo

Commit 75a7d60134ce ("fs: dlm: handle lkb wait count as atomic_t")
introduced a wrong conversion to atomic counters: atomic_dec_and_test()
decrements first and then returns true if the counter hit zero, which
means we miss an unhold_lkb() for the last iteration. This patch fixes
the issue so that when the last reference is dropped we also remove the
lkb from the waiters list, as it is supposed to work.
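
As a minimal sketch of the miscount, assume lkb_wait_count is 2 when
entering the old loop:

	/* atomic_dec_and_test(): 2 -> 1, returns false -> unhold_lkb()
	 * atomic_dec_and_test(): 1 -> 0, returns true  -> loop exits
	 *
	 * two counter references were dropped but unhold_lkb() ran
	 * only once, so the hold for the last reference leaks.
	 */
	while (!atomic_dec_and_test(&lkb->lkb_wait_count))
		unhold_lkb(lkb);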

Fixes: 75a7d60134ce ("fs: dlm: handle lkb wait count as atomic_t")
Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/lock.c | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index 652c51fbbf76..c30e9f8d017e 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -5070,11 +5070,13 @@ int dlm_recover_waiters_post(struct dlm_ls *ls)
 		/* drop all wait_count references we still
 		 * hold a reference for this iteration.
 		 */
-		while (!atomic_dec_and_test(&lkb->lkb_wait_count))
-			unhold_lkb(lkb);
-
 		mutex_lock(&ls->ls_waiters_mutex);
-		list_del_init(&lkb->lkb_wait_reply);
+		while (atomic_read(&lkb->lkb_wait_count)) {
+			if (atomic_dec_and_test(&lkb->lkb_wait_count))
+				list_del_init(&lkb->lkb_wait_reply);
+
+			unhold_lkb(lkb);
+		}
 		mutex_unlock(&ls->ls_waiters_mutex);
 
 		if (oc || ou) {
-- 
2.43.0



* [PATCHv3 v6.8-rc6 04/18] dlm: put lkbs instead of force free
  2024-02-27  1:48 [PATCHv3 v6.8-rc6 00/18] dlm: bring message parsing to softirq context Alexander Aring
                   ` (2 preceding siblings ...)
  2024-02-27  1:48 ` [PATCHv3 v6.8-rc6 03/18] dlm: fix off-by-one waiters refcount handling Alexander Aring
@ 2024-02-27  1:48 ` Alexander Aring
  2024-02-27  1:48 ` [PATCHv3 v6.8-rc6 05/18] dlm: remove allocation parameter in msg allocation Alexander Aring
                   ` (13 subsequent siblings)
  17 siblings, 0 replies; 19+ messages in thread
From: Alexander Aring @ 2024-02-27  1:48 UTC (permalink / raw)
  To: teigland; +Cc: gfs2, aahringo

This patch converts the forced freeing of the lkb idr to using the lkb
put functionality. If references are still held due to the lkb
programming logic and its state, they are dropped first. Using the
reference counters instead of force-freeing the lkbs of the idr makes
sure we use the reference counters correctly. If we do that, no rsb
should be left in the lockspace keep hash bucket, which is an
additional check added by this patch. All rsbs on the toss list should
have a reference count of 1.
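
The asymmetric WARN_ON_ONCE() usage relies on dlm_put_lkb() following
the usual kref convention of returning nonzero only when the put
dropped the last reference and freed the lkb (a sketch; see the
lockspace.c hunk below for the real code):

	/* wait references must never be the last ones, the idr
	 * still holds a reference at this point
	 */
	WARN_ON_ONCE(dlm_put_lkb(lkb));

	/* the final put drops the idr reference and must free */
	WARN_ON_ONCE(!dlm_put_lkb(lkb));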

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/lock.c      |  2 +-
 fs/dlm/lock.h      |  1 +
 fs/dlm/lockspace.c | 31 +++++++++++++++++++++----------
 3 files changed, 23 insertions(+), 11 deletions(-)

diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index c30e9f8d017e..f77f479e53b6 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -1368,7 +1368,7 @@ static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
 	}
 }
 
-static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
+void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
 {
 	lkb->lkb_status = 0;
 	list_del(&lkb->lkb_statequeue);
diff --git a/fs/dlm/lock.h b/fs/dlm/lock.h
index b54e2cbbe6e2..853c3d3dc49d 100644
--- a/fs/dlm/lock.h
+++ b/fs/dlm/lock.h
@@ -60,6 +60,7 @@ int dlm_debug_add_lkb(struct dlm_ls *ls, uint32_t lkb_id, char *name, int len,
 		      int lkb_nodeid, unsigned int lkb_flags, int lkb_status);
 int dlm_debug_add_lkb_to_waiters(struct dlm_ls *ls, uint32_t lkb_id,
 				 int mstype, int to_nodeid);
+void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb);
 
 static inline int is_master(struct dlm_rsb *r)
 {
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index 0455dddb0797..c7ab7358422b 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -737,14 +737,28 @@ static int lkb_idr_is_any(int id, void *p, void *data)
 	return 1;
 }
 
-static int lkb_idr_free(int id, void *p, void *data)
+/*
+ * No locking required, lockspace usage should be synchronized
+ * to have any activity anymore.
+ */
+static int lkb_idr_put(int id, void *p, void *data)
 {
 	struct dlm_lkb *lkb = p;
 
-	if (lkb->lkb_lvbptr && test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags))
-		dlm_free_lvb(lkb->lkb_lvbptr);
+	if (lkb->lkb_status)
+		del_lkb(lkb->lkb_resource, lkb);
 
-	dlm_free_lkb(lkb);
+	/* drop all wait_count references we still
+	 * hold a reference for this iteration.
+	 */
+	while (atomic_read(&lkb->lkb_wait_count)) {
+		if (atomic_dec_and_test(&lkb->lkb_wait_count))
+			list_del_init(&lkb->lkb_wait_reply);
+
+		WARN_ON_ONCE(dlm_put_lkb(lkb));
+	}
+
+	WARN_ON_ONCE(!dlm_put_lkb(lkb));
 	return 0;
 }
 
@@ -826,7 +840,7 @@ static int release_lockspace(struct dlm_ls *ls, int force)
 	 * Free all lkb's in idr
 	 */
 
-	idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls);
+	idr_for_each(&ls->ls_lkbidr, lkb_idr_put, ls);
 	idr_destroy(&ls->ls_lkbidr);
 
 	/*
@@ -834,15 +848,12 @@ static int release_lockspace(struct dlm_ls *ls, int force)
 	 */
 
 	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
-		while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
-			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
-			rb_erase(n, &ls->ls_rsbtbl[i].keep);
-			dlm_free_rsb(rsb);
-		}
+		WARN_ON_ONCE(!RB_EMPTY_ROOT(&ls->ls_rsbtbl[i].keep));
 
 		while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
 			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
 			rb_erase(n, &ls->ls_rsbtbl[i].toss);
+			WARN_ON_ONCE(kref_read(&rsb->res_ref) != 1);
 			dlm_free_rsb(rsb);
 		}
 	}
-- 
2.43.0



* [PATCHv3 v6.8-rc6 05/18] dlm: remove allocation parameter in msg allocation
  2024-02-27  1:48 [PATCHv3 v6.8-rc6 00/18] dlm: bring message parsing to softirq context Alexander Aring
                   ` (3 preceding siblings ...)
  2024-02-27  1:48 ` [PATCHv3 v6.8-rc6 04/18] dlm: put lkbs instead of force free Alexander Aring
@ 2024-02-27  1:48 ` Alexander Aring
  2024-02-27  1:48 ` [PATCHv3 v6.8-rc6 06/18] dlm: switch to GFP_ATOMIC in dlm allocations Alexander Aring
                   ` (12 subsequent siblings)
  17 siblings, 0 replies; 19+ messages in thread
From: Alexander Aring @ 2024-02-27  1:48 UTC (permalink / raw)
  To: teigland; +Cc: gfs2, aahringo

This patch removes the context parameter from message allocations and
replaces it with always using GFP_ATOMIC. We are preparing to process
dlm messages in softirq context, where we cannot sleep, so switching
to GFP_ATOMIC allocation is necessary. To simplify the code overall we
drop the allocation flag and hardcode GFP_ATOMIC in the allocation
functions.
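
As a minimal sketch of why the hardcoded flag must be GFP_ATOMIC (the
lock name here is made up for illustration): once the allocation
happens in softirq context or under a bh-disabling spinlock, a
sleeping flag like GFP_NOFS is no longer valid:

	spin_lock_bh(&ls->ls_example_lock);	/* hypothetical lock */
	/* GFP_NOFS may sleep and would be a bug here */
	msg = kmem_cache_alloc(msg_cache, GFP_ATOMIC);
	spin_unlock_bh(&ls->ls_example_lock);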

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/lock.c     | 31 ++++++++++++-------------------
 fs/dlm/lowcomms.c | 16 +++++++---------
 fs/dlm/lowcomms.h |  5 ++---
 fs/dlm/memory.c   |  8 ++++----
 fs/dlm/memory.h   |  4 ++--
 fs/dlm/midcomms.c | 24 ++++++++++--------------
 fs/dlm/midcomms.h |  3 +--
 fs/dlm/rcom.c     |  7 +++----
 8 files changed, 41 insertions(+), 57 deletions(-)

diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index f77f479e53b6..28e882e0dd19 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -3329,8 +3329,7 @@ int dlm_unlock(dlm_lockspace_t *lockspace,
 static int _create_message(struct dlm_ls *ls, int mb_len,
 			   int to_nodeid, int mstype,
 			   struct dlm_message **ms_ret,
-			   struct dlm_mhandle **mh_ret,
-			   gfp_t allocation)
+			   struct dlm_mhandle **mh_ret)
 {
 	struct dlm_message *ms;
 	struct dlm_mhandle *mh;
@@ -3340,7 +3339,7 @@ static int _create_message(struct dlm_ls *ls, int mb_len,
 	   pass into midcomms_commit and a message buffer (mb) that we
 	   write our data into */
 
-	mh = dlm_midcomms_get_mhandle(to_nodeid, mb_len, allocation, &mb);
+	mh = dlm_midcomms_get_mhandle(to_nodeid, mb_len, &mb);
 	if (!mh)
 		return -ENOBUFS;
 
@@ -3362,8 +3361,7 @@ static int _create_message(struct dlm_ls *ls, int mb_len,
 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
 			  int to_nodeid, int mstype,
 			  struct dlm_message **ms_ret,
-			  struct dlm_mhandle **mh_ret,
-			  gfp_t allocation)
+			  struct dlm_mhandle **mh_ret)
 {
 	int mb_len = sizeof(struct dlm_message);
 
@@ -3384,7 +3382,7 @@ static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
 	}
 
 	return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
-			       ms_ret, mh_ret, allocation);
+			       ms_ret, mh_ret);
 }
 
 /* further lowcomms enhancements or alternate implementations may make
@@ -3453,7 +3451,7 @@ static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
 	if (error)
 		return error;
 
-	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh, GFP_NOFS);
+	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
 	if (error)
 		goto fail;
 
@@ -3513,8 +3511,7 @@ static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
 
 	to_nodeid = lkb->lkb_nodeid;
 
-	error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh,
-			       GFP_NOFS);
+	error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
 	if (error)
 		goto out;
 
@@ -3535,8 +3532,7 @@ static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
 
 	to_nodeid = lkb->lkb_nodeid;
 
-	error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh,
-			       GFP_NOFS);
+	error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
 	if (error)
 		goto out;
 
@@ -3561,8 +3557,7 @@ static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
 	if (error)
 		return error;
 
-	error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh,
-			       GFP_NOFS);
+	error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
 	if (error)
 		goto fail;
 
@@ -3586,8 +3581,7 @@ static int send_remove(struct dlm_rsb *r)
 
 	to_nodeid = dlm_dir_nodeid(r);
 
-	error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh,
-			       GFP_ATOMIC);
+	error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
 	if (error)
 		goto out;
 
@@ -3608,7 +3602,7 @@ static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
 
 	to_nodeid = lkb->lkb_nodeid;
 
-	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh, GFP_NOFS);
+	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
 	if (error)
 		goto out;
 
@@ -3650,8 +3644,7 @@ static int send_lookup_reply(struct dlm_ls *ls,
 	struct dlm_mhandle *mh;
 	int error, nodeid = le32_to_cpu(ms_in->m_header.h_nodeid);
 
-	error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh,
-			       GFP_NOFS);
+	error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
 	if (error)
 		goto out;
 
@@ -6065,7 +6058,7 @@ static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
 	int error;
 
 	error = _create_message(ls, sizeof(struct dlm_message), nodeid,
-				DLM_MSG_PURGE, &ms, &mh, GFP_NOFS);
+				DLM_MSG_PURGE, &ms, &mh);
 	if (error)
 		return error;
 	ms->m_nodeid = cpu_to_le32(nodeid);
diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index 712165a1e567..ab2cfbd2ea77 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -1229,14 +1229,13 @@ static struct writequeue_entry *new_wq_entry(struct connection *con, int len,
 };
 
 static struct dlm_msg *dlm_lowcomms_new_msg_con(struct connection *con, int len,
-						gfp_t allocation, char **ppc,
-						void (*cb)(void *data),
+						char **ppc, void (*cb)(void *data),
 						void *data)
 {
 	struct writequeue_entry *e;
 	struct dlm_msg *msg;
 
-	msg = dlm_allocate_msg(allocation);
+	msg = dlm_allocate_msg();
 	if (!msg)
 		return NULL;
 
@@ -1261,9 +1260,8 @@ static struct dlm_msg *dlm_lowcomms_new_msg_con(struct connection *con, int len,
  * dlm_lowcomms_commit_msg which is a must call if success
  */
 #ifndef __CHECKER__
-struct dlm_msg *dlm_lowcomms_new_msg(int nodeid, int len, gfp_t allocation,
-				     char **ppc, void (*cb)(void *data),
-				     void *data)
+struct dlm_msg *dlm_lowcomms_new_msg(int nodeid, int len, char **ppc,
+				     void (*cb)(void *data), void *data)
 {
 	struct connection *con;
 	struct dlm_msg *msg;
@@ -1284,7 +1282,7 @@ struct dlm_msg *dlm_lowcomms_new_msg(int nodeid, int len, gfp_t allocation,
 		return NULL;
 	}
 
-	msg = dlm_lowcomms_new_msg_con(con, len, allocation, ppc, cb, data);
+	msg = dlm_lowcomms_new_msg_con(con, len, ppc, cb, data);
 	if (!msg) {
 		srcu_read_unlock(&connections_srcu, idx);
 		return NULL;
@@ -1348,8 +1346,8 @@ int dlm_lowcomms_resend_msg(struct dlm_msg *msg)
 	if (msg->retransmit)
 		return 1;
 
-	msg_resend = dlm_lowcomms_new_msg_con(msg->entry->con, msg->len,
-					      GFP_ATOMIC, &ppc, NULL, NULL);
+	msg_resend = dlm_lowcomms_new_msg_con(msg->entry->con, msg->len, &ppc,
+					      NULL, NULL);
 	if (!msg_resend)
 		return -ENOMEM;
 
diff --git a/fs/dlm/lowcomms.h b/fs/dlm/lowcomms.h
index 3e8dca66183b..8deb16f8f620 100644
--- a/fs/dlm/lowcomms.h
+++ b/fs/dlm/lowcomms.h
@@ -39,9 +39,8 @@ void dlm_lowcomms_stop(void);
 void dlm_lowcomms_init(void);
 void dlm_lowcomms_exit(void);
 int dlm_lowcomms_close(int nodeid);
-struct dlm_msg *dlm_lowcomms_new_msg(int nodeid, int len, gfp_t allocation,
-				     char **ppc, void (*cb)(void *data),
-				     void *data);
+struct dlm_msg *dlm_lowcomms_new_msg(int nodeid, int len, char **ppc,
+				     void (*cb)(void *data), void *data);
 void dlm_lowcomms_commit_msg(struct dlm_msg *msg);
 void dlm_lowcomms_put_msg(struct dlm_msg *msg);
 int dlm_lowcomms_resend_msg(struct dlm_msg *msg);
diff --git a/fs/dlm/memory.c b/fs/dlm/memory.c
index 64f212a066cf..c0c1a83f6381 100644
--- a/fs/dlm/memory.c
+++ b/fs/dlm/memory.c
@@ -134,9 +134,9 @@ void dlm_free_lkb(struct dlm_lkb *lkb)
 	kmem_cache_free(lkb_cache, lkb);
 }
 
-struct dlm_mhandle *dlm_allocate_mhandle(gfp_t allocation)
+struct dlm_mhandle *dlm_allocate_mhandle(void)
 {
-	return kmem_cache_alloc(mhandle_cache, allocation);
+	return kmem_cache_alloc(mhandle_cache, GFP_ATOMIC);
 }
 
 void dlm_free_mhandle(struct dlm_mhandle *mhandle)
@@ -154,9 +154,9 @@ void dlm_free_writequeue(struct writequeue_entry *writequeue)
 	kmem_cache_free(writequeue_cache, writequeue);
 }
 
-struct dlm_msg *dlm_allocate_msg(gfp_t allocation)
+struct dlm_msg *dlm_allocate_msg(void)
 {
-	return kmem_cache_alloc(msg_cache, allocation);
+	return kmem_cache_alloc(msg_cache, GFP_ATOMIC);
 }
 
 void dlm_free_msg(struct dlm_msg *msg)
diff --git a/fs/dlm/memory.h b/fs/dlm/memory.h
index 6b29563d24f7..15198d46b42a 100644
--- a/fs/dlm/memory.h
+++ b/fs/dlm/memory.h
@@ -20,11 +20,11 @@ struct dlm_lkb *dlm_allocate_lkb(struct dlm_ls *ls);
 void dlm_free_lkb(struct dlm_lkb *l);
 char *dlm_allocate_lvb(struct dlm_ls *ls);
 void dlm_free_lvb(char *l);
-struct dlm_mhandle *dlm_allocate_mhandle(gfp_t allocation);
+struct dlm_mhandle *dlm_allocate_mhandle(void);
 void dlm_free_mhandle(struct dlm_mhandle *mhandle);
 struct writequeue_entry *dlm_allocate_writequeue(void);
 void dlm_free_writequeue(struct writequeue_entry *writequeue);
-struct dlm_msg *dlm_allocate_msg(gfp_t allocation);
+struct dlm_msg *dlm_allocate_msg(void);
 void dlm_free_msg(struct dlm_msg *msg);
 struct dlm_callback *dlm_allocate_cb(void);
 void dlm_free_cb(struct dlm_callback *cb);
diff --git a/fs/dlm/midcomms.c b/fs/dlm/midcomms.c
index 8e9920f1b48b..ed6fb9b9a582 100644
--- a/fs/dlm/midcomms.c
+++ b/fs/dlm/midcomms.c
@@ -379,8 +379,7 @@ static int dlm_send_ack(int nodeid, uint32_t seq)
 	struct dlm_msg *msg;
 	char *ppc;
 
-	msg = dlm_lowcomms_new_msg(nodeid, mb_len, GFP_ATOMIC, &ppc,
-				   NULL, NULL);
+	msg = dlm_lowcomms_new_msg(nodeid, mb_len, &ppc, NULL, NULL);
 	if (!msg)
 		return -ENOMEM;
 
@@ -428,7 +427,7 @@ static int dlm_send_fin(struct midcomms_node *node,
 	struct dlm_mhandle *mh;
 	char *ppc;
 
-	mh = dlm_midcomms_get_mhandle(node->nodeid, mb_len, GFP_ATOMIC, &ppc);
+	mh = dlm_midcomms_get_mhandle(node->nodeid, mb_len, &ppc);
 	if (!mh)
 		return -ENOMEM;
 
@@ -976,13 +975,13 @@ static void midcomms_new_msg_cb(void *data)
 }
 
 static struct dlm_msg *dlm_midcomms_get_msg_3_2(struct dlm_mhandle *mh, int nodeid,
-						int len, gfp_t allocation, char **ppc)
+						int len, char **ppc)
 {
 	struct dlm_opts *opts;
 	struct dlm_msg *msg;
 
 	msg = dlm_lowcomms_new_msg(nodeid, len + DLM_MIDCOMMS_OPT_LEN,
-				   allocation, ppc, midcomms_new_msg_cb, mh);
+				   ppc, midcomms_new_msg_cb, mh);
 	if (!msg)
 		return NULL;
 
@@ -1001,8 +1000,7 @@ static struct dlm_msg *dlm_midcomms_get_msg_3_2(struct dlm_mhandle *mh, int node
  * dlm_midcomms_commit_mhandle which is a must call if success
  */
 #ifndef __CHECKER__
-struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len,
-					     gfp_t allocation, char **ppc)
+struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len, char **ppc)
 {
 	struct midcomms_node *node;
 	struct dlm_mhandle *mh;
@@ -1017,7 +1015,7 @@ struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len,
 	/* this is a bug, however we going on and hope it will be resolved */
 	WARN_ON_ONCE(test_bit(DLM_NODE_FLAG_STOP_TX, &node->flags));
 
-	mh = dlm_allocate_mhandle(allocation);
+	mh = dlm_allocate_mhandle();
 	if (!mh)
 		goto err;
 
@@ -1028,8 +1026,7 @@ struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len,
 
 	switch (node->version) {
 	case DLM_VERSION_3_1:
-		msg = dlm_lowcomms_new_msg(nodeid, len, allocation, ppc,
-					   NULL, NULL);
+		msg = dlm_lowcomms_new_msg(nodeid, len, ppc, NULL, NULL);
 		if (!msg) {
 			dlm_free_mhandle(mh);
 			goto err;
@@ -1040,8 +1037,7 @@ struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len,
 		/* send ack back if necessary */
 		dlm_send_ack_threshold(node, DLM_SEND_ACK_BACK_MSG_THRESHOLD);
 
-		msg = dlm_midcomms_get_msg_3_2(mh, nodeid, len, allocation,
-					       ppc);
+		msg = dlm_midcomms_get_msg_3_2(mh, nodeid, len, ppc);
 		if (!msg) {
 			dlm_free_mhandle(mh);
 			goto err;
@@ -1501,8 +1497,8 @@ int dlm_midcomms_rawmsg_send(struct midcomms_node *node, void *buf,
 	rd.node = node;
 	rd.buf = buf;
 
-	msg = dlm_lowcomms_new_msg(node->nodeid, buflen, GFP_NOFS,
-				   &msgbuf, midcomms_new_rawmsg_cb, &rd);
+	msg = dlm_lowcomms_new_msg(node->nodeid, buflen, &msgbuf,
+				   midcomms_new_rawmsg_cb, &rd);
 	if (!msg)
 		return -ENOMEM;
 
diff --git a/fs/dlm/midcomms.h b/fs/dlm/midcomms.h
index e7246fb3ef57..278d26fdeb2c 100644
--- a/fs/dlm/midcomms.h
+++ b/fs/dlm/midcomms.h
@@ -16,8 +16,7 @@ struct midcomms_node;
 
 int dlm_validate_incoming_buffer(int nodeid, unsigned char *buf, int len);
 int dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int buflen);
-struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len,
-					     gfp_t allocation, char **ppc);
+struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len, char **ppc);
 void dlm_midcomms_commit_mhandle(struct dlm_mhandle *mh, const void *name,
 				 int namelen);
 int dlm_midcomms_addr(int nodeid, struct sockaddr_storage *addr, int len);
diff --git a/fs/dlm/rcom.c b/fs/dlm/rcom.c
index 3b734aed26b5..2e3f529f3ff2 100644
--- a/fs/dlm/rcom.c
+++ b/fs/dlm/rcom.c
@@ -55,7 +55,7 @@ static int create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len,
 	struct dlm_mhandle *mh;
 	char *mb;
 
-	mh = dlm_midcomms_get_mhandle(to_nodeid, mb_len, GFP_NOFS, &mb);
+	mh = dlm_midcomms_get_mhandle(to_nodeid, mb_len, &mb);
 	if (!mh) {
 		log_print("%s to %d type %d len %d ENOBUFS",
 			  __func__, to_nodeid, type, len);
@@ -75,8 +75,7 @@ static int create_rcom_stateless(struct dlm_ls *ls, int to_nodeid, int type,
 	struct dlm_msg *msg;
 	char *mb;
 
-	msg = dlm_lowcomms_new_msg(to_nodeid, mb_len, GFP_NOFS, &mb,
-				   NULL, NULL);
+	msg = dlm_lowcomms_new_msg(to_nodeid, mb_len, &mb, NULL, NULL);
 	if (!msg) {
 		log_print("create_rcom to %d type %d len %d ENOBUFS",
 			  to_nodeid, type, len);
@@ -510,7 +509,7 @@ int dlm_send_ls_not_ready(int nodeid, const struct dlm_rcom *rc_in)
 	char *mb;
 	int mb_len = sizeof(struct dlm_rcom) + sizeof(struct rcom_config);
 
-	mh = dlm_midcomms_get_mhandle(nodeid, mb_len, GFP_NOFS, &mb);
+	mh = dlm_midcomms_get_mhandle(nodeid, mb_len, &mb);
 	if (!mh)
 		return -ENOBUFS;
 
-- 
2.43.0



* [PATCHv3 v6.8-rc6 06/18] dlm: switch to GFP_ATOMIC in dlm allocations
  2024-02-27  1:48 [PATCHv3 v6.8-rc6 00/18] dlm: bring message parsing to softirq context Alexander Aring
                   ` (4 preceding siblings ...)
  2024-02-27  1:48 ` [PATCHv3 v6.8-rc6 05/18] dlm: remove allocation parameter in msg allocation Alexander Aring
@ 2024-02-27  1:48 ` Alexander Aring
  2024-02-27  1:48 ` [PATCHv3 v6.8-rc6 07/18] dlm: move root_list functionality to recover.c Alexander Aring
                   ` (11 subsequent siblings)
  17 siblings, 0 replies; 19+ messages in thread
From: Alexander Aring @ 2024-02-27  1:48 UTC (permalink / raw)
  To: teigland; +Cc: gfs2, aahringo

This patch prepares to process dlm messages in softirq context. Some
code parts will either run inside softirq context or need to run while
a spinlock is held, so this patch switches the allocation context to
GFP_ATOMIC for those places. It is no longer possible to preload idr
allocations, but preloading is only a performance speedup and we might
switch to an xarray implementation with more lockless reader paradigms
anyway.
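
As a rough sketch of the mentioned xarray alternative (not part of
this series), xa_alloc() takes the gfp flags directly so no preload
step is needed, and lookups can run under RCU:

	struct xarray lkb_xa;
	u32 id;
	int rv;

	xa_init_flags(&lkb_xa, XA_FLAGS_ALLOC1);	/* ids start at 1 */

	/* no idr_preload()/idr_preload_end() pair needed */
	rv = xa_alloc(&lkb_xa, &id, lkb, xa_limit_32b, GFP_ATOMIC);

	/* readers can be lockless */
	rcu_read_lock();
	lkb = xa_load(&lkb_xa, id);
	rcu_read_unlock();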

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/lock.c         | 2 --
 fs/dlm/memory.c       | 6 +++---
 fs/dlm/recover.c      | 2 --
 fs/dlm/requestqueue.c | 2 +-
 4 files changed, 4 insertions(+), 8 deletions(-)

diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index 28e882e0dd19..97d57c799032 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -1208,13 +1208,11 @@ static int _create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret,
 	spin_lock_init(&lkb->lkb_cb_lock);
 	INIT_WORK(&lkb->lkb_cb_work, dlm_callback_work);
 
-	idr_preload(GFP_NOFS);
 	spin_lock(&ls->ls_lkbidr_spin);
 	rv = idr_alloc(&ls->ls_lkbidr, lkb, start, end, GFP_NOWAIT);
 	if (rv >= 0)
 		lkb->lkb_id = rv;
 	spin_unlock(&ls->ls_lkbidr_spin);
-	idr_preload_end();
 
 	if (rv < 0) {
 		log_error(ls, "create_lkb idr error %d", rv);
diff --git a/fs/dlm/memory.c b/fs/dlm/memory.c
index c0c1a83f6381..f44532d9f5c8 100644
--- a/fs/dlm/memory.c
+++ b/fs/dlm/memory.c
@@ -84,7 +84,7 @@ char *dlm_allocate_lvb(struct dlm_ls *ls)
 {
 	char *p;
 
-	p = kzalloc(ls->ls_lvblen, GFP_NOFS);
+	p = kzalloc(ls->ls_lvblen, GFP_ATOMIC);
 	return p;
 }
 
@@ -97,7 +97,7 @@ struct dlm_rsb *dlm_allocate_rsb(struct dlm_ls *ls)
 {
 	struct dlm_rsb *r;
 
-	r = kmem_cache_zalloc(rsb_cache, GFP_NOFS);
+	r = kmem_cache_zalloc(rsb_cache, GFP_ATOMIC);
 	return r;
 }
 
@@ -112,7 +112,7 @@ struct dlm_lkb *dlm_allocate_lkb(struct dlm_ls *ls)
 {
 	struct dlm_lkb *lkb;
 
-	lkb = kmem_cache_zalloc(lkb_cache, GFP_NOFS);
+	lkb = kmem_cache_zalloc(lkb_cache, GFP_ATOMIC);
 	return lkb;
 }
 
diff --git a/fs/dlm/recover.c b/fs/dlm/recover.c
index 53917c0aa3c0..ce6dc914cb86 100644
--- a/fs/dlm/recover.c
+++ b/fs/dlm/recover.c
@@ -310,7 +310,6 @@ static int recover_idr_add(struct dlm_rsb *r)
 	struct dlm_ls *ls = r->res_ls;
 	int rv;
 
-	idr_preload(GFP_NOFS);
 	spin_lock(&ls->ls_recover_idr_lock);
 	if (r->res_id) {
 		rv = -1;
@@ -326,7 +325,6 @@ static int recover_idr_add(struct dlm_rsb *r)
 	rv = 0;
 out_unlock:
 	spin_unlock(&ls->ls_recover_idr_lock);
-	idr_preload_end();
 	return rv;
 }
 
diff --git a/fs/dlm/requestqueue.c b/fs/dlm/requestqueue.c
index 892d6ca21e74..c05940afd063 100644
--- a/fs/dlm/requestqueue.c
+++ b/fs/dlm/requestqueue.c
@@ -37,7 +37,7 @@ void dlm_add_requestqueue(struct dlm_ls *ls, int nodeid,
 	int length = le16_to_cpu(ms->m_header.h_length) -
 		sizeof(struct dlm_message);
 
-	e = kmalloc(sizeof(struct rq_entry) + length, GFP_NOFS);
+	e = kmalloc(sizeof(struct rq_entry) + length, GFP_ATOMIC);
 	if (!e) {
 		log_print("dlm_add_requestqueue: out of memory len %d", length);
 		return;
-- 
2.43.0



* [PATCHv3 v6.8-rc6 07/18] dlm: move root_list functionality to recover.c
  2024-02-27  1:48 [PATCHv3 v6.8-rc6 00/18] dlm: bring message parsing to softirq context Alexander Aring
                   ` (5 preceding siblings ...)
  2024-02-27  1:48 ` [PATCHv3 v6.8-rc6 06/18] dlm: switch to GFP_ATOMIC in dlm allocations Alexander Aring
@ 2024-02-27  1:48 ` Alexander Aring
  2024-02-27  1:48 ` [PATCHv3 v6.8-rc6 08/18] dlm: move master dir dump to own list Alexander Aring
                   ` (10 subsequent siblings)
  17 siblings, 0 replies; 19+ messages in thread
From: Alexander Aring @ 2024-02-27  1:48 UTC (permalink / raw)
  To: teigland; +Cc: gfs2, aahringo

This patch moves dlm_create_root_list() and dlm_release_root_list() to
recoverd.c and declares them static because they are only used there.

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/recover.c  | 42 ------------------------------------------
 fs/dlm/recover.h  |  2 --
 fs/dlm/recoverd.c | 39 +++++++++++++++++++++++++++++++++++++++
 3 files changed, 39 insertions(+), 44 deletions(-)

diff --git a/fs/dlm/recover.c b/fs/dlm/recover.c
index ce6dc914cb86..6abc283f8f36 100644
--- a/fs/dlm/recover.c
+++ b/fs/dlm/recover.c
@@ -889,48 +889,6 @@ void dlm_recover_rsbs(struct dlm_ls *ls)
 
 /* Create a single list of all root rsb's to be used during recovery */
 
-int dlm_create_root_list(struct dlm_ls *ls)
-{
-	struct rb_node *n;
-	struct dlm_rsb *r;
-	int i, error = 0;
-
-	down_write(&ls->ls_root_sem);
-	if (!list_empty(&ls->ls_root_list)) {
-		log_error(ls, "root list not empty");
-		error = -EINVAL;
-		goto out;
-	}
-
-	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
-		spin_lock(&ls->ls_rsbtbl[i].lock);
-		for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
-			r = rb_entry(n, struct dlm_rsb, res_hashnode);
-			list_add(&r->res_root_list, &ls->ls_root_list);
-			dlm_hold_rsb(r);
-		}
-
-		if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[i].toss))
-			log_error(ls, "dlm_create_root_list toss not empty");
-		spin_unlock(&ls->ls_rsbtbl[i].lock);
-	}
- out:
-	up_write(&ls->ls_root_sem);
-	return error;
-}
-
-void dlm_release_root_list(struct dlm_ls *ls)
-{
-	struct dlm_rsb *r, *safe;
-
-	down_write(&ls->ls_root_sem);
-	list_for_each_entry_safe(r, safe, &ls->ls_root_list, res_root_list) {
-		list_del_init(&r->res_root_list);
-		dlm_put_rsb(r);
-	}
-	up_write(&ls->ls_root_sem);
-}
-
 void dlm_clear_toss(struct dlm_ls *ls)
 {
 	struct rb_node *n, *next;
diff --git a/fs/dlm/recover.h b/fs/dlm/recover.h
index dbc51013ecad..0b54550ee055 100644
--- a/fs/dlm/recover.h
+++ b/fs/dlm/recover.h
@@ -23,8 +23,6 @@ int dlm_recover_masters(struct dlm_ls *ls, uint64_t seq);
 int dlm_recover_master_reply(struct dlm_ls *ls, const struct dlm_rcom *rc);
 int dlm_recover_locks(struct dlm_ls *ls, uint64_t seq);
 void dlm_recovered_lock(struct dlm_rsb *r);
-int dlm_create_root_list(struct dlm_ls *ls);
-void dlm_release_root_list(struct dlm_ls *ls);
 void dlm_clear_toss(struct dlm_ls *ls);
 void dlm_recover_rsbs(struct dlm_ls *ls);
 
diff --git a/fs/dlm/recoverd.c b/fs/dlm/recoverd.c
index 4d17491dea2f..8eb42554ccb0 100644
--- a/fs/dlm/recoverd.c
+++ b/fs/dlm/recoverd.c
@@ -20,6 +20,45 @@
 #include "requestqueue.h"
 #include "recoverd.h"
 
+static void dlm_create_root_list(struct dlm_ls *ls)
+{
+	struct rb_node *n;
+	struct dlm_rsb *r;
+	int i;
+
+	down_write(&ls->ls_root_sem);
+	if (!list_empty(&ls->ls_root_list)) {
+		log_error(ls, "root list not empty");
+		goto out;
+	}
+
+	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
+		spin_lock_bh(&ls->ls_rsbtbl[i].lock);
+		for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
+			r = rb_entry(n, struct dlm_rsb, res_hashnode);
+			list_add(&r->res_root_list, &ls->ls_root_list);
+			dlm_hold_rsb(r);
+		}
+
+		if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[i].toss))
+			log_error(ls, "%s toss not empty", __func__);
+		spin_unlock_bh(&ls->ls_rsbtbl[i].lock);
+	}
+ out:
+	up_write(&ls->ls_root_sem);
+}
+
+static void dlm_release_root_list(struct dlm_ls *ls)
+{
+	struct dlm_rsb *r, *safe;
+
+	down_write(&ls->ls_root_sem);
+	list_for_each_entry_safe(r, safe, &ls->ls_root_list, res_root_list) {
+		list_del_init(&r->res_root_list);
+		dlm_put_rsb(r);
+	}
+	up_write(&ls->ls_root_sem);
+}
 
 /* If the start for which we're re-enabling locking (seq) has been superseded
    by a newer stop (ls_recover_seq), we need to leave locking disabled.
-- 
2.43.0



* [PATCHv3 v6.8-rc6 08/18] dlm: move master dir dump to own list
  2024-02-27  1:48 [PATCHv3 v6.8-rc6 00/18] dlm: bring message parsing to softirq context Alexander Aring
                   ` (6 preceding siblings ...)
  2024-02-27  1:48 ` [PATCHv3 v6.8-rc6 07/18] dlm: move root_list functionality to recover.c Alexander Aring
@ 2024-02-27  1:48 ` Alexander Aring
  2024-02-27  1:49 ` [PATCHv3 v6.8-rc6 09/18] dlm: move root_list to ls_recover() stack Alexander Aring
                   ` (9 subsequent siblings)
  17 siblings, 0 replies; 19+ messages in thread
From: Alexander Aring @ 2024-02-27  1:48 UTC (permalink / raw)
  To: teigland; +Cc: gfs2, aahringo

This patch moves the master directory dump, i.e. the dlm_rsbs we are
the master of (res_nodeid == 0), to its own list handling. Currently
the only concurrent access to ls->root_list is the master directory
dump. Putting it into its own list handling allows us to take the
root_list out of the global per-lockspace context and make it
lockless. While at it, move from the rw semaphore to a rwlock, as the
context allows it.

Also add a comment that we should keep track of our own master rsbs
while locking occurs instead of having recovery create the list in a
snapshot-like mode.

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/dir.c          | 22 ++++++---------
 fs/dlm/dlm_internal.h |  3 ++
 fs/dlm/lockspace.c    |  2 ++
 fs/dlm/recoverd.c     | 64 +++++++++++++++++++++++++++++++++++++++++++
 4 files changed, 77 insertions(+), 14 deletions(-)

diff --git a/fs/dlm/dir.c b/fs/dlm/dir.c
index f6acba4310a7..10753486049a 100644
--- a/fs/dlm/dir.c
+++ b/fs/dlm/dir.c
@@ -216,16 +216,13 @@ static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, const char *name,
 	if (!rv)
 		return r;
 
-	down_read(&ls->ls_root_sem);
-	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
+	list_for_each_entry(r, &ls->ls_masters_list, res_masters_list) {
 		if (len == r->res_length && !memcmp(name, r->res_name, len)) {
-			up_read(&ls->ls_root_sem);
 			log_debug(ls, "find_rsb_root revert to root_list %s",
 				  r->res_name);
 			return r;
 		}
 	}
-	up_read(&ls->ls_root_sem);
 	return NULL;
 }
 
@@ -241,7 +238,7 @@ void dlm_copy_master_names(struct dlm_ls *ls, const char *inbuf, int inlen,
 	int offset = 0, dir_nodeid;
 	__be16 be_namelen;
 
-	down_read(&ls->ls_root_sem);
+	read_lock(&ls->ls_masters_lock);
 
 	if (inlen > 1) {
 		r = find_rsb_root(ls, inbuf, inlen);
@@ -250,16 +247,13 @@ void dlm_copy_master_names(struct dlm_ls *ls, const char *inbuf, int inlen,
 				  nodeid, inlen, inlen, inbuf);
 			goto out;
 		}
-		list = r->res_root_list.next;
+		list = r->res_masters_list.next;
 	} else {
-		list = ls->ls_root_list.next;
+		list = ls->ls_masters_list.next;
 	}
 
-	for (offset = 0; list != &ls->ls_root_list; list = list->next) {
-		r = list_entry(list, struct dlm_rsb, res_root_list);
-		if (r->res_nodeid)
-			continue;
-
+	for (offset = 0; list != &ls->ls_masters_list; list = list->next) {
+		r = list_entry(list, struct dlm_rsb, res_masters_list);
 		dir_nodeid = dlm_dir_nodeid(r);
 		if (dir_nodeid != nodeid)
 			continue;
@@ -294,7 +288,7 @@ void dlm_copy_master_names(struct dlm_ls *ls, const char *inbuf, int inlen,
 	 * terminating record.
 	 */
 
-	if ((list == &ls->ls_root_list) &&
+	if ((list == &ls->ls_masters_list) &&
 	    (offset + sizeof(uint16_t) <= outlen)) {
 		be_namelen = cpu_to_be16(0xFFFF);
 		memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
@@ -302,6 +296,6 @@ void dlm_copy_master_names(struct dlm_ls *ls, const char *inbuf, int inlen,
 		ls->ls_recover_dir_sent_msg++;
 	}
  out:
-	up_read(&ls->ls_root_sem);
+	read_unlock(&ls->ls_masters_lock);
 }
 
diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index dfc444dad329..cb18f383acff 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -312,6 +312,7 @@ struct dlm_rsb {
 	struct list_head	res_waitqueue;
 
 	struct list_head	res_root_list;	    /* used for recovery */
+	struct list_head	res_masters_list;   /* used for recovery */
 	struct list_head	res_recover_list;   /* used for recovery */
 	int			res_recover_locks_count;
 
@@ -645,6 +646,8 @@ struct dlm_ls {
 
 	struct list_head	ls_root_list;	/* root resources */
 	struct rw_semaphore	ls_root_sem;	/* protect root_list */
+	struct list_head	ls_masters_list;	/* root resources */
+	rwlock_t		ls_masters_lock;	/* protect root_list */
 
 	const struct dlm_lockspace_ops *ls_ops;
 	void			*ls_ops_arg;
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index c7ab7358422b..977a648485ee 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -582,6 +582,8 @@ static int new_lockspace(const char *name, const char *cluster,
 	init_waitqueue_head(&ls->ls_wait_general);
 	INIT_LIST_HEAD(&ls->ls_root_list);
 	init_rwsem(&ls->ls_root_sem);
+	INIT_LIST_HEAD(&ls->ls_masters_list);
+	rwlock_init(&ls->ls_masters_lock);
 
 	spin_lock(&lslist_lock);
 	ls->ls_create_count = 1;
diff --git a/fs/dlm/recoverd.c b/fs/dlm/recoverd.c
index 8eb42554ccb0..dfce8fc6a783 100644
--- a/fs/dlm/recoverd.c
+++ b/fs/dlm/recoverd.c
@@ -20,6 +20,48 @@
 #include "requestqueue.h"
 #include "recoverd.h"
 
+static int dlm_create_masters_list(struct dlm_ls *ls)
+{
+	struct rb_node *n;
+	struct dlm_rsb *r;
+	int i, error = 0;
+
+	write_lock(&ls->ls_masters_lock);
+	if (!list_empty(&ls->ls_masters_list)) {
+		log_error(ls, "root list not empty");
+		error = -EINVAL;
+		goto out;
+	}
+
+	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
+		spin_lock_bh(&ls->ls_rsbtbl[i].lock);
+		for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
+			r = rb_entry(n, struct dlm_rsb, res_hashnode);
+			if (r->res_nodeid)
+				continue;
+
+			list_add(&r->res_masters_list, &ls->ls_masters_list);
+			dlm_hold_rsb(r);
+		}
+		spin_unlock_bh(&ls->ls_rsbtbl[i].lock);
+	}
+ out:
+	write_unlock(&ls->ls_masters_lock);
+	return error;
+}
+
+static void dlm_release_masters_list(struct dlm_ls *ls)
+{
+	struct dlm_rsb *r, *safe;
+
+	write_lock(&ls->ls_masters_lock);
+	list_for_each_entry_safe(r, safe, &ls->ls_masters_list, res_masters_list) {
+		list_del_init(&r->res_masters_list);
+		dlm_put_rsb(r);
+	}
+	write_unlock(&ls->ls_masters_lock);
+}
+
 static void dlm_create_root_list(struct dlm_ls *ls)
 {
 	struct rb_node *n;
@@ -123,6 +165,23 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
 
 	dlm_recover_dir_nodeid(ls);
 
+	/* Create a snapshot of all active rsbs were we are the master of.
+	 * During the barrier between dlm_recover_members_wait() and
+	 * dlm_recover_directory() other nodes can dump their necessary
+	 * directory dlm_rsb (r->res_dir_nodeid == nodeid) in rcom
+	 * communication dlm_copy_master_names() handling.
+	 *
+	 * TODO We should create a per lockspace list that contains rsbs
+	 * that we are the master of. Instead of creating this list while
+	 * recovery we keep track of those rsbs while locking handling and
+	 * recovery can use it when necessary.
+	 */
+	error = dlm_create_masters_list(ls);
+	if (error) {
+		log_rinfo(ls, "dlm_create_masters_list error %d", error);
+		goto fail;
+	}
+
 	ls->ls_recover_dir_sent_res = 0;
 	ls->ls_recover_dir_sent_msg = 0;
 	ls->ls_recover_locks_in = 0;
@@ -132,6 +191,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
 	error = dlm_recover_members_wait(ls, rv->seq);
 	if (error) {
 		log_rinfo(ls, "dlm_recover_members_wait error %d", error);
+		dlm_release_masters_list(ls);
 		goto fail;
 	}
 
@@ -145,6 +205,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
 	error = dlm_recover_directory(ls, rv->seq);
 	if (error) {
 		log_rinfo(ls, "dlm_recover_directory error %d", error);
+		dlm_release_masters_list(ls);
 		goto fail;
 	}
 
@@ -153,9 +214,12 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
 	error = dlm_recover_directory_wait(ls, rv->seq);
 	if (error) {
 		log_rinfo(ls, "dlm_recover_directory_wait error %d", error);
+		dlm_release_masters_list(ls);
 		goto fail;
 	}
 
+	dlm_release_masters_list(ls);
+
 	log_rinfo(ls, "dlm_recover_directory %u out %u messages",
 		  ls->ls_recover_dir_sent_res, ls->ls_recover_dir_sent_msg);
 
-- 
2.43.0



* [PATCHv3 v6.8-rc6 09/18] dlm: move root_list to ls_recover() stack
  2024-02-27  1:48 [PATCHv3 v6.8-rc6 00/18] dlm: bring message parsing to softirq context Alexander Aring
                   ` (7 preceding siblings ...)
  2024-02-27  1:48 ` [PATCHv3 v6.8-rc6 08/18] dlm: move master dir dump to own list Alexander Aring
@ 2024-02-27  1:49 ` Alexander Aring
  2024-02-27  1:49 ` [PATCHv3 v6.8-rc6 10/18] dlm: implement directory dump context Alexander Aring
                   ` (8 subsequent siblings)
  17 siblings, 0 replies; 19+ messages in thread
From: Alexander Aring @ 2024-02-27  1:49 UTC (permalink / raw)
  To: teigland; +Cc: gfs2, aahringo

This patch moves the per-lockspace ls_root_list, which is mainly used
to snapshot all dlm_rsbs from the hash into a list for recovery
handling, into the recovery function ls_recover() as a stack variable.

Doing that shows there is no need to lock the root_list: it is created
at the beginning of ls_recover() and destroyed at the end of
ls_recover(), and in between only functionality doing read-only access
to the root_list stack variable is called.

A special case is assigning the stack variable to the per-lockspace
ls_recover_dir_root_list variable. The ls_recover_dir_root_list gets
accessed by another concurrent process, dlm_copy_master_names(),
during the time between ls_recover_dir_root_list being set and being
set back to NULL. This is covered by a special distributed barrier
functionality between dlm_recover_members_wait() and
dlm_recover_directory_wait(). A comment was added to mention this
handling, which might be changed to a better behaviour in the future.
However, setting ls_recover_dir_root_list to the stack variable and
back to NULL in this specific window will show us potential issues
with the recovery handling if it breaks.

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/dir.c          |  6 ++----
 fs/dlm/dir.h          |  3 ++-
 fs/dlm/dlm_internal.h |  6 ++----
 fs/dlm/lock.c         |  6 ++----
 fs/dlm/lock.h         |  2 +-
 fs/dlm/lockspace.c    |  2 --
 fs/dlm/recover.c      | 30 ++++++++++--------------------
 fs/dlm/recover.h      |  8 +++++---
 fs/dlm/recoverd.c     | 35 +++++++++++++----------------------
 9 files changed, 37 insertions(+), 61 deletions(-)

diff --git a/fs/dlm/dir.c b/fs/dlm/dir.c
index 10753486049a..3da00c46cbb3 100644
--- a/fs/dlm/dir.c
+++ b/fs/dlm/dir.c
@@ -47,15 +47,13 @@ int dlm_dir_nodeid(struct dlm_rsb *r)
 	return r->res_dir_nodeid;
 }
 
-void dlm_recover_dir_nodeid(struct dlm_ls *ls)
+void dlm_recover_dir_nodeid(struct dlm_ls *ls, const struct list_head *root_list)
 {
 	struct dlm_rsb *r;
 
-	down_read(&ls->ls_root_sem);
-	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
+	list_for_each_entry(r, root_list, res_root_list) {
 		r->res_dir_nodeid = dlm_hash2nodeid(ls, r->res_hash);
 	}
-	up_read(&ls->ls_root_sem);
 }
 
 int dlm_recover_directory(struct dlm_ls *ls, uint64_t seq)
diff --git a/fs/dlm/dir.h b/fs/dlm/dir.h
index 39ecb69d7ef3..5b2a7ee3762d 100644
--- a/fs/dlm/dir.h
+++ b/fs/dlm/dir.h
@@ -14,7 +14,8 @@
 
 int dlm_dir_nodeid(struct dlm_rsb *rsb);
 int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash);
-void dlm_recover_dir_nodeid(struct dlm_ls *ls);
+void dlm_recover_dir_nodeid(struct dlm_ls *ls,
+			    const struct list_head *root_list);
 int dlm_recover_directory(struct dlm_ls *ls, uint64_t seq);
 void dlm_copy_master_names(struct dlm_ls *ls, const char *inbuf, int inlen,
 			   char *outbuf, int outlen, int nodeid);
diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index cb18f383acff..959f69fb2a52 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -644,10 +644,8 @@ struct dlm_ls {
 	wait_queue_head_t	ls_recover_lock_wait;
 	spinlock_t		ls_clear_proc_locks;
 
-	struct list_head	ls_root_list;	/* root resources */
-	struct rw_semaphore	ls_root_sem;	/* protect root_list */
-	struct list_head	ls_masters_list;	/* root resources */
-	rwlock_t		ls_masters_lock;	/* protect root_list */
+	struct list_head	ls_masters_list; /* root resources */
+	rwlock_t		ls_masters_lock; /* protect root_list */
 
 	const struct dlm_lockspace_ops *ls_ops;
 	void			*ls_ops_arg;
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index 97d57c799032..113a6b08d68b 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -5187,7 +5187,7 @@ static void purge_dead_list(struct dlm_ls *ls, struct dlm_rsb *r,
 
 /* Get rid of locks held by nodes that are gone. */
 
-void dlm_recover_purge(struct dlm_ls *ls)
+void dlm_recover_purge(struct dlm_ls *ls, const struct list_head *root_list)
 {
 	struct dlm_rsb *r;
 	struct dlm_member *memb;
@@ -5206,8 +5206,7 @@ void dlm_recover_purge(struct dlm_ls *ls)
 	if (!nodes_count)
 		return;
 
-	down_write(&ls->ls_root_sem);
-	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
+	list_for_each_entry(r, root_list, res_root_list) {
 		hold_rsb(r);
 		lock_rsb(r);
 		if (is_master(r)) {
@@ -5222,7 +5221,6 @@ void dlm_recover_purge(struct dlm_ls *ls)
 		unhold_rsb(r);
 		cond_resched();
 	}
-	up_write(&ls->ls_root_sem);
 
 	if (lkb_count)
 		log_rinfo(ls, "dlm_recover_purge %u locks for %u nodes",
diff --git a/fs/dlm/lock.h b/fs/dlm/lock.h
index 853c3d3dc49d..461123d17d67 100644
--- a/fs/dlm/lock.h
+++ b/fs/dlm/lock.h
@@ -31,7 +31,7 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
 int dlm_search_rsb_tree(struct rb_root *tree, const void *name, int len,
 			struct dlm_rsb **r_ret);
 
-void dlm_recover_purge(struct dlm_ls *ls);
+void dlm_recover_purge(struct dlm_ls *ls, const struct list_head *root_list);
 void dlm_purge_mstcpy_locks(struct dlm_rsb *r);
 void dlm_recover_grant(struct dlm_ls *ls);
 int dlm_recover_waiters_post(struct dlm_ls *ls);
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index 977a648485ee..388358aafed4 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -580,8 +580,6 @@ static int new_lockspace(const char *name, const char *cluster,
 	ls->ls_recover_list_count = 0;
 	ls->ls_local_handle = ls;
 	init_waitqueue_head(&ls->ls_wait_general);
-	INIT_LIST_HEAD(&ls->ls_root_list);
-	init_rwsem(&ls->ls_root_sem);
 	INIT_LIST_HEAD(&ls->ls_masters_list);
 	rwlock_init(&ls->ls_masters_lock);
 
diff --git a/fs/dlm/recover.c b/fs/dlm/recover.c
index 6abc283f8f36..172c6b73f37a 100644
--- a/fs/dlm/recover.c
+++ b/fs/dlm/recover.c
@@ -519,7 +519,8 @@ static int recover_master_static(struct dlm_rsb *r, unsigned int *count)
  * the correct dir node.
  */
 
-int dlm_recover_masters(struct dlm_ls *ls, uint64_t seq)
+int dlm_recover_masters(struct dlm_ls *ls, uint64_t seq,
+			const struct list_head *root_list)
 {
 	struct dlm_rsb *r;
 	unsigned int total = 0;
@@ -529,10 +530,8 @@ int dlm_recover_masters(struct dlm_ls *ls, uint64_t seq)
 
 	log_rinfo(ls, "dlm_recover_masters");
 
-	down_read(&ls->ls_root_sem);
-	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
+	list_for_each_entry(r, root_list, res_root_list) {
 		if (dlm_recovery_stopped(ls)) {
-			up_read(&ls->ls_root_sem);
 			error = -EINTR;
 			goto out;
 		}
@@ -546,12 +545,9 @@ int dlm_recover_masters(struct dlm_ls *ls, uint64_t seq)
 		cond_resched();
 		total++;
 
-		if (error) {
-			up_read(&ls->ls_root_sem);
+		if (error)
 			goto out;
-		}
 	}
-	up_read(&ls->ls_root_sem);
 
 	log_rinfo(ls, "dlm_recover_masters %u of %u", count, total);
 
@@ -656,13 +652,13 @@ static int recover_locks(struct dlm_rsb *r, uint64_t seq)
 	return error;
 }
 
-int dlm_recover_locks(struct dlm_ls *ls, uint64_t seq)
+int dlm_recover_locks(struct dlm_ls *ls, uint64_t seq,
+		      const struct list_head *root_list)
 {
 	struct dlm_rsb *r;
 	int error, count = 0;
 
-	down_read(&ls->ls_root_sem);
-	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
+	list_for_each_entry(r, root_list, res_root_list) {
 		if (is_master(r)) {
 			rsb_clear_flag(r, RSB_NEW_MASTER);
 			continue;
@@ -673,19 +669,15 @@ int dlm_recover_locks(struct dlm_ls *ls, uint64_t seq)
 
 		if (dlm_recovery_stopped(ls)) {
 			error = -EINTR;
-			up_read(&ls->ls_root_sem);
 			goto out;
 		}
 
 		error = recover_locks(r, seq);
-		if (error) {
-			up_read(&ls->ls_root_sem);
+		if (error)
 			goto out;
-		}
 
 		count += r->res_recover_locks_count;
 	}
-	up_read(&ls->ls_root_sem);
 
 	log_rinfo(ls, "dlm_recover_locks %d out", count);
 
@@ -854,13 +846,12 @@ static void recover_grant(struct dlm_rsb *r)
 		rsb_set_flag(r, RSB_RECOVER_GRANT);
 }
 
-void dlm_recover_rsbs(struct dlm_ls *ls)
+void dlm_recover_rsbs(struct dlm_ls *ls, const struct list_head *root_list)
 {
 	struct dlm_rsb *r;
 	unsigned int count = 0;
 
-	down_read(&ls->ls_root_sem);
-	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
+	list_for_each_entry(r, root_list, res_root_list) {
 		lock_rsb(r);
 		if (is_master(r)) {
 			if (rsb_flag(r, RSB_RECOVER_CONVERT))
@@ -881,7 +872,6 @@ void dlm_recover_rsbs(struct dlm_ls *ls)
 		rsb_clear_flag(r, RSB_NEW_MASTER2);
 		unlock_rsb(r);
 	}
-	up_read(&ls->ls_root_sem);
 
 	if (count)
 		log_rinfo(ls, "dlm_recover_rsbs %d done", count);
diff --git a/fs/dlm/recover.h b/fs/dlm/recover.h
index 0b54550ee055..efc79a6e577d 100644
--- a/fs/dlm/recover.h
+++ b/fs/dlm/recover.h
@@ -19,12 +19,14 @@ int dlm_recover_members_wait(struct dlm_ls *ls, uint64_t seq);
 int dlm_recover_directory_wait(struct dlm_ls *ls, uint64_t seq);
 int dlm_recover_locks_wait(struct dlm_ls *ls, uint64_t seq);
 int dlm_recover_done_wait(struct dlm_ls *ls, uint64_t seq);
-int dlm_recover_masters(struct dlm_ls *ls, uint64_t seq);
+int dlm_recover_masters(struct dlm_ls *ls, uint64_t seq,
+			const struct list_head *root_list);
 int dlm_recover_master_reply(struct dlm_ls *ls, const struct dlm_rcom *rc);
-int dlm_recover_locks(struct dlm_ls *ls, uint64_t seq);
+int dlm_recover_locks(struct dlm_ls *ls, uint64_t seq,
+		      const struct list_head *root_list);
 void dlm_recovered_lock(struct dlm_rsb *r);
 void dlm_clear_toss(struct dlm_ls *ls);
-void dlm_recover_rsbs(struct dlm_ls *ls);
+void dlm_recover_rsbs(struct dlm_ls *ls, const struct list_head *root_list);
 
 #endif				/* __RECOVER_DOT_H__ */
 
diff --git a/fs/dlm/recoverd.c b/fs/dlm/recoverd.c
index dfce8fc6a783..e5649201ba23 100644
--- a/fs/dlm/recoverd.c
+++ b/fs/dlm/recoverd.c
@@ -62,23 +62,17 @@ static void dlm_release_masters_list(struct dlm_ls *ls)
 	write_unlock(&ls->ls_masters_lock);
 }
 
-static void dlm_create_root_list(struct dlm_ls *ls)
+static void dlm_create_root_list(struct dlm_ls *ls, struct list_head *root_list)
 {
 	struct rb_node *n;
 	struct dlm_rsb *r;
 	int i;
 
-	down_write(&ls->ls_root_sem);
-	if (!list_empty(&ls->ls_root_list)) {
-		log_error(ls, "root list not empty");
-		goto out;
-	}
-
 	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
 		spin_lock_bh(&ls->ls_rsbtbl[i].lock);
 		for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
 			r = rb_entry(n, struct dlm_rsb, res_hashnode);
-			list_add(&r->res_root_list, &ls->ls_root_list);
+			list_add(&r->res_root_list, root_list);
 			dlm_hold_rsb(r);
 		}
 
@@ -86,20 +80,16 @@ static void dlm_create_root_list(struct dlm_ls *ls)
 			log_error(ls, "%s toss not empty", __func__);
 		spin_unlock_bh(&ls->ls_rsbtbl[i].lock);
 	}
- out:
-	up_write(&ls->ls_root_sem);
 }
 
-static void dlm_release_root_list(struct dlm_ls *ls)
+static void dlm_release_root_list(struct list_head *root_list)
 {
 	struct dlm_rsb *r, *safe;
 
-	down_write(&ls->ls_root_sem);
-	list_for_each_entry_safe(r, safe, &ls->ls_root_list, res_root_list) {
+	list_for_each_entry_safe(r, safe, root_list, res_root_list) {
 		list_del_init(&r->res_root_list);
 		dlm_put_rsb(r);
 	}
-	up_write(&ls->ls_root_sem);
 }
 
 /* If the start for which we're re-enabling locking (seq) has been superseded
@@ -131,6 +121,7 @@ static int enable_locking(struct dlm_ls *ls, uint64_t seq)
 
 static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
 {
+	LIST_HEAD(root_list);
 	unsigned long start;
 	int error, neg = 0;
 
@@ -147,7 +138,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
 	 * routines.
 	 */
 
-	dlm_create_root_list(ls);
+	dlm_create_root_list(ls, &root_list);
 
 	/*
 	 * Add or remove nodes from the lockspace's ls_nodes list.
@@ -163,7 +154,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
 		goto fail;
 	}
 
-	dlm_recover_dir_nodeid(ls);
+	dlm_recover_dir_nodeid(ls, &root_list);
 
 	/* Create a snapshot of all active rsbs were we are the master of.
 	 * During the barrier between dlm_recover_members_wait() and
@@ -241,14 +232,14 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
 		 * Clear lkb's for departed nodes.
 		 */
 
-		dlm_recover_purge(ls);
+		dlm_recover_purge(ls, &root_list);
 
 		/*
 		 * Get new master nodeid's for rsb's that were mastered on
 		 * departed nodes.
 		 */
 
-		error = dlm_recover_masters(ls, rv->seq);
+		error = dlm_recover_masters(ls, rv->seq, &root_list);
 		if (error) {
 			log_rinfo(ls, "dlm_recover_masters error %d", error);
 			goto fail;
@@ -258,7 +249,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
 		 * Send our locks on remastered rsb's to the new masters.
 		 */
 
-		error = dlm_recover_locks(ls, rv->seq);
+		error = dlm_recover_locks(ls, rv->seq, &root_list);
 		if (error) {
 			log_rinfo(ls, "dlm_recover_locks error %d", error);
 			goto fail;
@@ -281,7 +272,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
 		 * settings.
 		 */
 
-		dlm_recover_rsbs(ls);
+		dlm_recover_rsbs(ls, &root_list);
 	} else {
 		/*
 		 * Other lockspace members may be going through the "neg" steps
@@ -297,7 +288,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
 		}
 	}
 
-	dlm_release_root_list(ls);
+	dlm_release_root_list(&root_list);
 
 	/*
 	 * Purge directory-related requests that are saved in requestqueue.
@@ -347,7 +338,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
 	return 0;
 
  fail:
-	dlm_release_root_list(ls);
+	dlm_release_root_list(&root_list);
 	mutex_unlock(&ls->ls_recoverd_active);
 
 	return error;
-- 
2.43.0


* [PATCHv3 v6.8-rc6 10/18] dlm: implement directory dump context
  2024-02-27  1:48 [PATCHv3 v6.8-rc6 00/18] dlm: bring message parsing to softirq context Alexander Aring
                   ` (8 preceding siblings ...)
  2024-02-27  1:49 ` [PATCHv3 v6.8-rc6 09/18] dlm: move root_list to ls_recover() stack Alexander Aring
@ 2024-02-27  1:49 ` Alexander Aring
  2024-02-27  1:49 ` [PATCHv3 v6.8-rc6 11/18] dlm: drop holding waiters mutex in waiters recovery Alexander Aring
                   ` (7 subsequent siblings)
  17 siblings, 0 replies; 19+ messages in thread
From: Alexander Aring @ 2024-02-27  1:49 UTC (permalink / raw
  To: teigland; +Cc: gfs2, aahringo

This patch introduces keeping track of directory dumps in DLM. For now
we only add more sanity checks, e.g. whether the recovery sequence
number has changed while dumping the directory. Another change is that
we keep a per-nodeid directory dump context that can later be used to
log how many entries in how many chunks were sent to a specific
nodeid.

That the whole dump depends on the recovery barrier, because the
resource list is not manipulated during this time, may be improved
later. For now we add more sanity checks in the recovery low path to
confirm there is no issue with the current behaviour; e.g. we also
check whether the list entry returned by the last resource lookup
matches the last entry handed out from the resource list.
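
As an illustration, a condensed sketch of the dump context lifecycle
implemented below (names are taken from the diff; locking and error
handling are omitted):

	/* first chunk (inlen <= 1): allocate and register a context */
	dd = init_dir_dump(ls, nodeid);	/* records seq_init, nodeid, last */

	/* every following chunk: look the context up, sanity check it */
	dd = lookup_dir_dump(ls, nodeid);
	if (dd->last != &r->res_masters_list ||
	    dd->seq_init != ls->ls_recover_seq)
		goto out;	/* state drifted, abort the dump */

	/* final chunk (0xFFFF terminator): log stats, drop the context */
	log_rinfo(ls, "dlm_recover_directory nodeid %d sent %u res out %u messages",
		  nodeid, dd->sent_res, dd->sent_msg);
	list_del_init(&dd->list);
	kfree(dd);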

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/dir.c          | 115 ++++++++++++++++++++++++++++++++++++++++--
 fs/dlm/dlm_internal.h |   4 +-
 fs/dlm/lockspace.c    |   2 +
 fs/dlm/recoverd.c     |   5 --
 4 files changed, 116 insertions(+), 10 deletions(-)

diff --git a/fs/dlm/dir.c b/fs/dlm/dir.c
index 3da00c46cbb3..0dc8a1d9e411 100644
--- a/fs/dlm/dir.c
+++ b/fs/dlm/dir.c
@@ -224,6 +224,80 @@ static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, const char *name,
 	return NULL;
 }
 
+struct dlm_dir_dump {
+	/* values stored at init, matched later to check that
+	 * the whole dump fits into one seq. Sanity check only.
+	 */
+	uint64_t seq_init;
+	uint64_t nodeid_init;
+	/* compare local pointer with last lookup,
+	 * just a sanity check.
+	 */
+	struct list_head *last;
+
+	unsigned int sent_res; /* for log info */
+	unsigned int sent_msg; /* for log info */
+
+	struct list_head list;
+};
+
+static void drop_dir_ctx(struct dlm_ls *ls, int nodeid)
+{
+	struct dlm_dir_dump *dd, *safe;
+
+	write_lock(&ls->ls_dir_dump_lock);
+	list_for_each_entry_safe(dd, safe, &ls->ls_dir_dump_list, list) {
+		if (dd->nodeid_init == nodeid) {
+			log_error(ls, "drop dump seq %llu",
+				 (unsigned long long)dd->seq_init);
+			list_del(&dd->list);
+			kfree(dd);
+		}
+	}
+	write_unlock(&ls->ls_dir_dump_lock);
+}
+
+static struct dlm_dir_dump *lookup_dir_dump(struct dlm_ls *ls, int nodeid)
+{
+	struct dlm_dir_dump *iter, *dd = NULL;
+
+	read_lock(&ls->ls_dir_dump_lock);
+	list_for_each_entry(iter, &ls->ls_dir_dump_list, list) {
+		if (iter->nodeid_init == nodeid) {
+			dd = iter;
+			break;
+		}
+	}
+	read_unlock(&ls->ls_dir_dump_lock);
+
+	return dd;
+}
+
+static struct dlm_dir_dump *init_dir_dump(struct dlm_ls *ls, int nodeid)
+{
+	struct dlm_dir_dump *dd;
+
+	dd = lookup_dir_dump(ls, nodeid);
+	if (dd) {
+		log_error(ls, "found ongoing dir dump for node %d, will drop it",
+			  nodeid);
+		drop_dir_ctx(ls, nodeid);
+	}
+
+	dd = kzalloc(sizeof(*dd), GFP_ATOMIC);
+	if (!dd)
+		return NULL;
+
+	dd->seq_init = ls->ls_recover_seq;
+	dd->nodeid_init = nodeid;
+
+	write_lock(&ls->ls_dir_dump_lock);
+	list_add(&dd->list, &ls->ls_dir_dump_list);
+	write_unlock(&ls->ls_dir_dump_lock);
+
+	return dd;
+}
+
 /* Find the rsb where we left off (or start again), then send rsb names
    for rsb's we're master of and whose directory node matches the requesting
    node.  inbuf is the rsb name last sent, inlen is the name's length */
@@ -234,11 +308,20 @@ void dlm_copy_master_names(struct dlm_ls *ls, const char *inbuf, int inlen,
 	struct list_head *list;
 	struct dlm_rsb *r;
 	int offset = 0, dir_nodeid;
+	struct dlm_dir_dump *dd;
 	__be16 be_namelen;
 
 	read_lock(&ls->ls_masters_lock);
 
 	if (inlen > 1) {
+		dd = lookup_dir_dump(ls, nodeid);
+		if (!dd) {
+			log_error(ls, "failed to lookup dir dump context nodeid: %d",
+				  nodeid);
+			goto out;
+		}
+
+		/* next chunk in dump */
 		r = find_rsb_root(ls, inbuf, inlen);
 		if (!r) {
 			log_error(ls, "copy_master_names from %d start %d %.*s",
@@ -246,8 +329,25 @@ void dlm_copy_master_names(struct dlm_ls *ls, const char *inbuf, int inlen,
 			goto out;
 		}
 		list = r->res_masters_list.next;
+
+		/* sanity checks */
+		if (dd->last != &r->res_masters_list ||
+		    dd->seq_init != ls->ls_recover_seq) {
+			log_error(ls, "failed dir dump sanity check seq_init: %llu seq: %llu",
+				  (unsigned long long)dd->seq_init,
+				  (unsigned long long)ls->ls_recover_seq);
+			goto out;
+		}
 	} else {
+		dd = init_dir_dump(ls, nodeid);
+		if (!dd) {
+			log_error(ls, "failed to allocate dir dump context");
+			goto out;
+		}
+
+		/* start dump */
 		list = ls->ls_masters_list.next;
+		dd->last = list;
 	}
 
 	for (offset = 0; list != &ls->ls_masters_list; list = list->next) {
@@ -269,7 +369,7 @@ void dlm_copy_master_names(struct dlm_ls *ls, const char *inbuf, int inlen,
 			be_namelen = cpu_to_be16(0);
 			memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
 			offset += sizeof(__be16);
-			ls->ls_recover_dir_sent_msg++;
+			dd->sent_msg++;
 			goto out;
 		}
 
@@ -278,7 +378,8 @@ void dlm_copy_master_names(struct dlm_ls *ls, const char *inbuf, int inlen,
 		offset += sizeof(__be16);
 		memcpy(outbuf + offset, r->res_name, r->res_length);
 		offset += r->res_length;
-		ls->ls_recover_dir_sent_res++;
+		dd->sent_res++;
+		dd->last = list;
 	}
 
 	/*
@@ -288,10 +389,18 @@ void dlm_copy_master_names(struct dlm_ls *ls, const char *inbuf, int inlen,
 
 	if ((list == &ls->ls_masters_list) &&
 	    (offset + sizeof(uint16_t) <= outlen)) {
+		/* end dump */
 		be_namelen = cpu_to_be16(0xFFFF);
 		memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
 		offset += sizeof(__be16);
-		ls->ls_recover_dir_sent_msg++;
+		dd->sent_msg++;
+		log_rinfo(ls, "dlm_recover_directory nodeid %d sent %u res out %u messages",
+			  nodeid, dd->sent_res, dd->sent_msg);
+
+		write_lock(&ls->ls_dir_dump_lock);
+		list_del_init(&dd->list);
+		write_unlock(&ls->ls_dir_dump_lock);
+		kfree(dd);
 	}
  out:
 	read_unlock(&ls->ls_masters_lock);
diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index 959f69fb2a52..9aa1e3a09e02 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -630,8 +630,6 @@ struct dlm_ls {
 	struct mutex		ls_requestqueue_mutex;
 	struct dlm_rcom		*ls_recover_buf;
 	int			ls_recover_nodeid; /* for debugging */
-	unsigned int		ls_recover_dir_sent_res; /* for log info */
-	unsigned int		ls_recover_dir_sent_msg; /* for log info */
 	unsigned int		ls_recover_locks_in; /* for log info */
 	uint64_t		ls_rcom_seq;
 	spinlock_t		ls_rcom_spin;
@@ -646,6 +644,8 @@ struct dlm_ls {
 
 	struct list_head	ls_masters_list; /* root resources */
 	rwlock_t		ls_masters_lock; /* protect root_list */
+	struct list_head	ls_dir_dump_list; /* directory dump contexts */
+	rwlock_t		ls_dir_dump_lock; /* protect ls_dir_dump_list */
 
 	const struct dlm_lockspace_ops *ls_ops;
 	void			*ls_ops_arg;
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index 388358aafed4..5fe00bd1164d 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -582,6 +582,8 @@ static int new_lockspace(const char *name, const char *cluster,
 	init_waitqueue_head(&ls->ls_wait_general);
 	INIT_LIST_HEAD(&ls->ls_masters_list);
 	rwlock_init(&ls->ls_masters_lock);
+	INIT_LIST_HEAD(&ls->ls_dir_dump_list);
+	rwlock_init(&ls->ls_dir_dump_lock);
 
 	spin_lock(&lslist_lock);
 	ls->ls_create_count = 1;
diff --git a/fs/dlm/recoverd.c b/fs/dlm/recoverd.c
index e5649201ba23..5388db89e22f 100644
--- a/fs/dlm/recoverd.c
+++ b/fs/dlm/recoverd.c
@@ -173,8 +173,6 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
 		goto fail;
 	}
 
-	ls->ls_recover_dir_sent_res = 0;
-	ls->ls_recover_dir_sent_msg = 0;
 	ls->ls_recover_locks_in = 0;
 
 	dlm_set_recover_status(ls, DLM_RS_NODES);
@@ -211,9 +209,6 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
 
 	dlm_release_masters_list(ls);
 
-	log_rinfo(ls, "dlm_recover_directory %u out %u messages",
-		  ls->ls_recover_dir_sent_res, ls->ls_recover_dir_sent_msg);
-
 	/*
 	 * We may have outstanding operations that are waiting for a reply from
 	 * a failed node.  Mark these to be resent after recovery.  Unlock and
-- 
2.43.0


* [PATCHv3 v6.8-rc6 11/18] dlm: drop holding waiters mutex in waiters recovery
  2024-02-27  1:48 [PATCHv3 v6.8-rc6 00/18] dlm: bring message parsing to softirq context Alexander Aring
                   ` (9 preceding siblings ...)
  2024-02-27  1:49 ` [PATCHv3 v6.8-rc6 10/18] dlm: implement directory dump context Alexander Aring
@ 2024-02-27  1:49 ` Alexander Aring
  2024-02-27  1:49 ` [PATCHv3 v6.8-rc6 12/18] dlm: convert ls_waiters_mutex to spinlock Alexander Aring
                   ` (6 subsequent siblings)
  17 siblings, 0 replies; 19+ messages in thread
From: Alexander Aring @ 2024-02-27  1:49 UTC (permalink / raw
  To: teigland; +Cc: gfs2, aahringo

This patch stops holding the ls_waiters_mutex in
dlm_recover_waiters_pre(). The dlm_recover_waiters_pre() function is
only called while recovery handling is being done for the specific
lockspace. During this time no new lock requests can be initiated, and
no dlm messages are processed that could manipulate the lockspace
waiters list.

Only debugfs could access the lockspace waiters list while
dlm_recover_waiters_pre() manipulates it. This is no longer possible
because debugfs now holds the recovery lock while it accesses the
waiters list.

A check was introduced in remove_from_waiters_ms() for local dlm
messaging to verify that the lockspace is really stopped and that no
new lock requests can be initiated.

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/debug_fs.c |  4 ++++
 fs/dlm/lock.c     | 17 +++++++++--------
 fs/dlm/lock.h     |  1 +
 3 files changed, 14 insertions(+), 8 deletions(-)

diff --git a/fs/dlm/debug_fs.c b/fs/dlm/debug_fs.c
index 4fa11d9ddbb6..b9deffeabbd1 100644
--- a/fs/dlm/debug_fs.c
+++ b/fs/dlm/debug_fs.c
@@ -823,6 +823,7 @@ static ssize_t waiters_read(struct file *file, char __user *userbuf,
 	size_t len = DLM_DEBUG_BUF_LEN, pos = 0, ret, rv;
 
 	mutex_lock(&debug_buf_lock);
+	dlm_lock_recovery(ls);
 	mutex_lock(&ls->ls_waiters_mutex);
 	memset(debug_buf, 0, sizeof(debug_buf));
 
@@ -835,6 +836,7 @@ static ssize_t waiters_read(struct file *file, char __user *userbuf,
 		pos += ret;
 	}
 	mutex_unlock(&ls->ls_waiters_mutex);
+	dlm_unlock_recovery(ls);
 
 	rv = simple_read_from_buffer(userbuf, count, ppos, debug_buf, pos);
 	mutex_unlock(&debug_buf_lock);
@@ -858,7 +860,9 @@ static ssize_t waiters_write(struct file *file, const char __user *user_buf,
 	if (n != 3)
 		return -EINVAL;
 
+	dlm_lock_recovery(ls);
 	error = dlm_debug_add_lkb_to_waiters(ls, lkb_id, mstype, to_nodeid);
+	dlm_unlock_recovery(ls);
 	if (error)
 		return error;
 
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index 113a6b08d68b..bd9ff32984c7 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -201,7 +201,7 @@ void dlm_dump_rsb(struct dlm_rsb *r)
 
 /* Threads cannot use the lockspace while it's being recovered */
 
-static inline void dlm_lock_recovery(struct dlm_ls *ls)
+void dlm_lock_recovery(struct dlm_ls *ls)
 {
 	down_read(&ls->ls_in_recovery);
 }
@@ -1553,7 +1553,11 @@ static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
 }
 
 /* Handles situations where we might be processing a "fake" or "local" reply in
-   which we can't try to take waiters_mutex again. */
+ * the recovery context, which stops any locking activity. Only debugfs might
+ * change the lockspace waiters, but it will hold the recovery lock to ensure
+ * that remove_from_waiters_ms() in the local case is the only user manipulating
+ * the lockspace waiters in recovery context.
+ */
 
 static int remove_from_waiters_ms(struct dlm_lkb *lkb,
 				  const struct dlm_message *ms, bool local)
@@ -1563,6 +1567,9 @@ static int remove_from_waiters_ms(struct dlm_lkb *lkb,
 
 	if (!local)
 		mutex_lock(&ls->ls_waiters_mutex);
+	else
+		WARN_ON_ONCE(!rwsem_is_locked(&ls->ls_in_recovery) ||
+			     !dlm_locking_stopped(ls));
 	error = _remove_from_waiters(lkb, le32_to_cpu(ms->m_type), ms);
 	if (!local)
 		mutex_unlock(&ls->ls_waiters_mutex);
@@ -4395,7 +4402,6 @@ static void _receive_convert_reply(struct dlm_lkb *lkb,
 	if (error)
 		goto out;
 
-	/* local reply can happen with waiters_mutex held */
 	error = remove_from_waiters_ms(lkb, ms, local);
 	if (error)
 		goto out;
@@ -4434,7 +4440,6 @@ static void _receive_unlock_reply(struct dlm_lkb *lkb,
 	if (error)
 		goto out;
 
-	/* local reply can happen with waiters_mutex held */
 	error = remove_from_waiters_ms(lkb, ms, local);
 	if (error)
 		goto out;
@@ -4486,7 +4491,6 @@ static void _receive_cancel_reply(struct dlm_lkb *lkb,
 	if (error)
 		goto out;
 
-	/* local reply can happen with waiters_mutex held */
 	error = remove_from_waiters_ms(lkb, ms, local);
 	if (error)
 		goto out;
@@ -4887,8 +4891,6 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
 	if (!ms_local)
 		return;
 
-	mutex_lock(&ls->ls_waiters_mutex);
-
 	list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
 
 		dir_nodeid = dlm_dir_nodeid(lkb->lkb_resource);
@@ -4981,7 +4983,6 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
 		}
 		schedule();
 	}
-	mutex_unlock(&ls->ls_waiters_mutex);
 	kfree(ms_local);
 }
 
diff --git a/fs/dlm/lock.h b/fs/dlm/lock.h
index 461123d17d67..bc787a470632 100644
--- a/fs/dlm/lock.h
+++ b/fs/dlm/lock.h
@@ -23,6 +23,7 @@ void dlm_hold_rsb(struct dlm_rsb *r);
 int dlm_put_lkb(struct dlm_lkb *lkb);
 void dlm_scan_rsbs(struct dlm_ls *ls);
 int dlm_lock_recovery_try(struct dlm_ls *ls);
+void dlm_lock_recovery(struct dlm_ls *ls);
 void dlm_unlock_recovery(struct dlm_ls *ls);
 
 int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
-- 
2.43.0


* [PATCHv3 v6.8-rc6 12/18] dlm: convert ls_waiters_mutex to spinlock
  2024-02-27  1:48 [PATCHv3 v6.8-rc6 00/18] dlm: bring message parsing to softirq context Alexander Aring
                   ` (10 preceding siblings ...)
  2024-02-27  1:49 ` [PATCHv3 v6.8-rc6 11/18] dlm: drop holding waiters mutex in waiters recovery Alexander Aring
@ 2024-02-27  1:49 ` Alexander Aring
  2024-02-27  1:49 ` [PATCHv3 v6.8-rc6 13/18] dlm: convert res_lock " Alexander Aring
                   ` (5 subsequent siblings)
  17 siblings, 0 replies; 19+ messages in thread
From: Alexander Aring @ 2024-02-27  1:49 UTC (permalink / raw
  To: teigland; +Cc: gfs2, aahringo

This patch converts the per-lockspace waiters lock from a mutex to a
spinlock.
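
Background, as a hedged sketch (not part of the patch): a mutex may
sleep on contention, which is forbidden in the softirq context this
series moves message processing into, while a spinlock busy-waits and
remains legal there:

	mutex_lock(&some_mutex);	/* may sleep -> illegal in softirq */
	spin_lock(&some_spinlock);	/* busy-waits -> fine in softirq,
					 * as long as holders never sleep */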

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/debug_fs.c     |  4 ++--
 fs/dlm/dlm_internal.h |  2 +-
 fs/dlm/lock.c         | 20 ++++++++++----------
 fs/dlm/lockspace.c    |  2 +-
 4 files changed, 14 insertions(+), 14 deletions(-)

diff --git a/fs/dlm/debug_fs.c b/fs/dlm/debug_fs.c
index b9deffeabbd1..8e7683840315 100644
--- a/fs/dlm/debug_fs.c
+++ b/fs/dlm/debug_fs.c
@@ -824,7 +824,7 @@ static ssize_t waiters_read(struct file *file, char __user *userbuf,
 
 	mutex_lock(&debug_buf_lock);
 	dlm_lock_recovery(ls);
-	mutex_lock(&ls->ls_waiters_mutex);
+	spin_lock(&ls->ls_waiters_lock);
 	memset(debug_buf, 0, sizeof(debug_buf));
 
 	list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
@@ -835,7 +835,7 @@ static ssize_t waiters_read(struct file *file, char __user *userbuf,
 			break;
 		pos += ret;
 	}
-	mutex_unlock(&ls->ls_waiters_mutex);
+	spin_unlock(&ls->ls_waiters_lock);
 	dlm_unlock_recovery(ls);
 
 	rv = simple_read_from_buffer(userbuf, count, ppos, debug_buf, pos);
diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index 9aa1e3a09e02..5975d52f4061 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -565,7 +565,7 @@ struct dlm_ls {
 	struct dlm_rsbtable	*ls_rsbtbl;
 	uint32_t		ls_rsbtbl_size;
 
-	struct mutex		ls_waiters_mutex;
+	spinlock_t		ls_waiters_lock;
 	struct list_head	ls_waiters;	/* lkbs needing a reply */
 
 	struct mutex		ls_orphans_mutex;
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index bd9ff32984c7..525635f42938 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -1407,7 +1407,7 @@ static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
 	int error = 0;
 	int wc;
 
-	mutex_lock(&ls->ls_waiters_mutex);
+	spin_lock(&ls->ls_waiters_lock);
 
 	if (is_overlap_unlock(lkb) ||
 	    (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
@@ -1447,7 +1447,7 @@ static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
 		log_error(ls, "addwait error %x %d flags %x %d %d %s",
 			  lkb->lkb_id, error, dlm_iflags_val(lkb), mstype,
 			  lkb->lkb_wait_type, lkb->lkb_resource->res_name);
-	mutex_unlock(&ls->ls_waiters_mutex);
+	spin_unlock(&ls->ls_waiters_lock);
 	return error;
 }
 
@@ -1546,9 +1546,9 @@ static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
 	int error;
 
-	mutex_lock(&ls->ls_waiters_mutex);
+	spin_lock(&ls->ls_waiters_lock);
 	error = _remove_from_waiters(lkb, mstype, NULL);
-	mutex_unlock(&ls->ls_waiters_mutex);
+	spin_unlock(&ls->ls_waiters_lock);
 	return error;
 }
 
@@ -1566,13 +1566,13 @@ static int remove_from_waiters_ms(struct dlm_lkb *lkb,
 	int error;
 
 	if (!local)
-		mutex_lock(&ls->ls_waiters_mutex);
+		spin_lock(&ls->ls_waiters_lock);
 	else
 		WARN_ON_ONCE(!rwsem_is_locked(&ls->ls_in_recovery) ||
 			     !dlm_locking_stopped(ls));
 	error = _remove_from_waiters(lkb, le32_to_cpu(ms->m_type), ms);
 	if (!local)
-		mutex_unlock(&ls->ls_waiters_mutex);
+		spin_unlock(&ls->ls_waiters_lock);
 	return error;
 }
 
@@ -4990,7 +4990,7 @@ static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
 {
 	struct dlm_lkb *lkb = NULL, *iter;
 
-	mutex_lock(&ls->ls_waiters_mutex);
+	spin_lock(&ls->ls_waiters_lock);
 	list_for_each_entry(iter, &ls->ls_waiters, lkb_wait_reply) {
 		if (test_bit(DLM_IFL_RESEND_BIT, &iter->lkb_iflags)) {
 			hold_lkb(iter);
@@ -4998,7 +4998,7 @@ static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
 			break;
 		}
 	}
-	mutex_unlock(&ls->ls_waiters_mutex);
+	spin_unlock(&ls->ls_waiters_lock);
 
 	return lkb;
 }
@@ -5062,14 +5062,14 @@ int dlm_recover_waiters_post(struct dlm_ls *ls)
 		/* drop all wait_count references we still
 		 * hold a reference for this iteration.
 		 */
-		mutex_lock(&ls->ls_waiters_mutex);
+		spin_lock(&ls->ls_waiters_lock);
 		while (atomic_read(&lkb->lkb_wait_count)) {
 			if (atomic_dec_and_test(&lkb->lkb_wait_count))
 				list_del_init(&lkb->lkb_wait_reply);
 
 			unhold_lkb(lkb);
 		}
-		mutex_unlock(&ls->ls_waiters_mutex);
+		spin_unlock(&ls->ls_waiters_lock);
 
 		if (oc || ou) {
 			/* do an unlock or cancel instead of resending */
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index 5fe00bd1164d..42866190af38 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -515,7 +515,7 @@ static int new_lockspace(const char *name, const char *cluster,
 	spin_lock_init(&ls->ls_lkbidr_spin);
 
 	INIT_LIST_HEAD(&ls->ls_waiters);
-	mutex_init(&ls->ls_waiters_mutex);
+	spin_lock_init(&ls->ls_waiters_lock);
 	INIT_LIST_HEAD(&ls->ls_orphans);
 	mutex_init(&ls->ls_orphans_mutex);
 
-- 
2.43.0


* [PATCHv3 v6.8-rc6 13/18] dlm: convert res_lock to spinlock
  2024-02-27  1:48 [PATCHv3 v6.8-rc6 00/18] dlm: bring message parsing to softirq context Alexander Aring
                   ` (11 preceding siblings ...)
  2024-02-27  1:49 ` [PATCHv3 v6.8-rc6 12/18] dlm: convert ls_waiters_mutex to spinlock Alexander Aring
@ 2024-02-27  1:49 ` Alexander Aring
  2024-02-27  1:49 ` [PATCHv3 v6.8-rc6 14/18] dlm: make requestqueue handling non sleepable Alexander Aring
                   ` (4 subsequent siblings)
  17 siblings, 0 replies; 19+ messages in thread
From: Alexander Aring @ 2024-02-27  1:49 UTC (permalink / raw
  To: teigland; +Cc: gfs2, aahringo

This patch converts the per-rsb lock from a mutex (res_mutex) to a
spinlock (res_lock).

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/dlm_internal.h | 2 +-
 fs/dlm/lock.c         | 2 +-
 fs/dlm/lock.h         | 4 ++--
 3 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index 5975d52f4061..13225b28fa98 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -290,7 +290,7 @@ struct dlm_lkb {
 struct dlm_rsb {
 	struct dlm_ls		*res_ls;	/* the lockspace */
 	struct kref		res_ref;
-	struct mutex		res_mutex;
+	spinlock_t		res_lock;
 	unsigned long		res_flags;
 	int			res_length;	/* length of rsb name */
 	int			res_nodeid;
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index 525635f42938..267a18bcf2c6 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -415,7 +415,7 @@ static int get_rsb_struct(struct dlm_ls *ls, const void *name, int len,
 	r->res_ls = ls;
 	r->res_length = len;
 	memcpy(r->res_name, name, len);
-	mutex_init(&r->res_mutex);
+	spin_lock_init(&r->res_lock);
 
 	INIT_LIST_HEAD(&r->res_lookup);
 	INIT_LIST_HEAD(&r->res_grantqueue);
diff --git a/fs/dlm/lock.h b/fs/dlm/lock.h
index bc787a470632..ca71f1dc1b06 100644
--- a/fs/dlm/lock.h
+++ b/fs/dlm/lock.h
@@ -70,12 +70,12 @@ static inline int is_master(struct dlm_rsb *r)
 
 static inline void lock_rsb(struct dlm_rsb *r)
 {
-	mutex_lock(&r->res_mutex);
+	spin_lock(&r->res_lock);
 }
 
 static inline void unlock_rsb(struct dlm_rsb *r)
 {
-	mutex_unlock(&r->res_mutex);
+	spin_unlock(&r->res_lock);
 }
 
 #endif
-- 
2.43.0


* [PATCHv3 v6.8-rc6 14/18] dlm: make requestqueue handling non sleepable
  2024-02-27  1:48 [PATCHv3 v6.8-rc6 00/18] dlm: bring message parsing to softirq context Alexander Aring
                   ` (12 preceding siblings ...)
  2024-02-27  1:49 ` [PATCHv3 v6.8-rc6 13/18] dlm: convert res_lock " Alexander Aring
@ 2024-02-27  1:49 ` Alexander Aring
  2024-02-27  1:49 ` [PATCHv3 v6.8-rc6 15/18] dlm: ls_recv_active semaphore to rwlock Alexander Aring
                   ` (3 subsequent siblings)
  17 siblings, 0 replies; 19+ messages in thread
From: Alexander Aring @ 2024-02-27  1:49 UTC (permalink / raw
  To: teigland; +Cc: gfs2, aahringo

This patch removes the ls_requestqueue_wait waitqueue and converts the
ls_requestqueue_mutex to an rwlock. Instead of calling wait_event() in
dlm message processing, which waits until all saved messages are
processed before new message processing is allowed after recovery is
done, this patch uses a bit flag to signal whether a message should be
saved for later. When recovery has processed all saved messages we
clear this bit again and allow new messages to be processed directly.
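
Condensed from the diff below, the bit flag and rwlock interact like
this:

	/* dlm_ls_stop(): stop direct processing, start saving */
	write_lock(&ls->ls_requestqueue_lock);
	set_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags);
	write_unlock(&ls->ls_requestqueue_lock);

	/* receive path: bit set -> save the message, else process it */
	read_lock(&ls->ls_requestqueue_lock);
	if (test_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags)) {
		/* drop the read lock, retake as write lock, recheck
		 * the bit, then dlm_add_requestqueue(ls, nodeid, ms) */
	} else {
		_receive_message(ls, ms, 0);
		read_unlock(&ls->ls_requestqueue_lock);
	}

	/* recovery: replay saved messages under the write lock and
	 * clear LSFL_RECV_MSG_BLOCKED once the queue is empty */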

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/dlm_internal.h |  5 ++---
 fs/dlm/lock.c         | 16 ++++++++++++++--
 fs/dlm/lockspace.c    |  4 +---
 fs/dlm/member.c       |  5 +++++
 fs/dlm/requestqueue.c | 41 ++++++++---------------------------------
 5 files changed, 30 insertions(+), 41 deletions(-)

diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index 13225b28fa98..4cb1f38067d3 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -625,9 +625,7 @@ struct dlm_ls {
 	struct rw_semaphore	ls_in_recovery;	/* block local requests */
 	struct rw_semaphore	ls_recv_active;	/* block dlm_recv */
 	struct list_head	ls_requestqueue;/* queue remote requests */
-	atomic_t		ls_requestqueue_cnt;
-	wait_queue_head_t	ls_requestqueue_wait;
-	struct mutex		ls_requestqueue_mutex;
+	rwlock_t		ls_requestqueue_lock;
 	struct dlm_rcom		*ls_recover_buf;
 	int			ls_recover_nodeid; /* for debugging */
 	unsigned int		ls_recover_locks_in; /* for log info */
@@ -687,6 +685,7 @@ struct dlm_ls {
 #define LSFL_UEVENT_WAIT	7
 #define LSFL_CB_DELAY		9
 #define LSFL_NODIR		10
+#define LSFL_RECV_MSG_BLOCKED	11
 
 /* much of this is just saving user space pointers associated with the
    lock that we pass back to the user lib with an ast */
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index 267a18bcf2c6..10cebb418931 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -4749,20 +4749,32 @@ static void _receive_message(struct dlm_ls *ls, const struct dlm_message *ms,
 static void dlm_receive_message(struct dlm_ls *ls, const struct dlm_message *ms,
 				int nodeid)
 {
-	if (dlm_locking_stopped(ls)) {
+try_again:
+	read_lock(&ls->ls_requestqueue_lock);
+	if (test_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags)) {
 		/* If we were a member of this lockspace, left, and rejoined,
 		   other nodes may still be sending us messages from the
 		   lockspace generation before we left. */
 		if (WARN_ON_ONCE(!ls->ls_generation)) {
+			read_unlock(&ls->ls_requestqueue_lock);
 			log_limit(ls, "receive %d from %d ignore old gen",
 				  le32_to_cpu(ms->m_type), nodeid);
 			return;
 		}
 
+		read_unlock(&ls->ls_requestqueue_lock);
+		write_lock(&ls->ls_requestqueue_lock);
+		/* recheck because we hold the write lock now */
+		if (!test_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags)) {
+			write_unlock(&ls->ls_requestqueue_lock);
+			goto try_again;
+		}
+
 		dlm_add_requestqueue(ls, nodeid, ms);
+		write_unlock(&ls->ls_requestqueue_lock);
 	} else {
-		dlm_wait_requestqueue(ls);
 		_receive_message(ls, ms, 0);
+		read_unlock(&ls->ls_requestqueue_lock);
 	}
 }
 
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index 42866190af38..0df0f07fa092 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -554,9 +554,7 @@ static int new_lockspace(const char *name, const char *cluster,
 	init_rwsem(&ls->ls_in_recovery);
 	init_rwsem(&ls->ls_recv_active);
 	INIT_LIST_HEAD(&ls->ls_requestqueue);
-	atomic_set(&ls->ls_requestqueue_cnt, 0);
-	init_waitqueue_head(&ls->ls_requestqueue_wait);
-	mutex_init(&ls->ls_requestqueue_mutex);
+	rwlock_init(&ls->ls_requestqueue_lock);
 	spin_lock_init(&ls->ls_clear_proc_locks);
 
 	/* Due backwards compatibility with 3.1 we need to use maximum
diff --git a/fs/dlm/member.c b/fs/dlm/member.c
index be7909ead71b..707cebcdc533 100644
--- a/fs/dlm/member.c
+++ b/fs/dlm/member.c
@@ -642,6 +642,11 @@ int dlm_ls_stop(struct dlm_ls *ls)
 	set_bit(LSFL_RECOVER_STOP, &ls->ls_flags);
 	new = test_and_clear_bit(LSFL_RUNNING, &ls->ls_flags);
 	ls->ls_recover_seq++;
+
+	/* activate requestqueue and stop processing */
+	write_lock(&ls->ls_requestqueue_lock);
+	set_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags);
+	write_unlock(&ls->ls_requestqueue_lock);
 	spin_unlock(&ls->ls_recover_lock);
 
 	/*
diff --git a/fs/dlm/requestqueue.c b/fs/dlm/requestqueue.c
index c05940afd063..9b646026df46 100644
--- a/fs/dlm/requestqueue.c
+++ b/fs/dlm/requestqueue.c
@@ -48,10 +48,7 @@ void dlm_add_requestqueue(struct dlm_ls *ls, int nodeid,
 	memcpy(&e->request, ms, sizeof(*ms));
 	memcpy(&e->request.m_extra, ms->m_extra, length);
 
-	atomic_inc(&ls->ls_requestqueue_cnt);
-	mutex_lock(&ls->ls_requestqueue_mutex);
 	list_add_tail(&e->list, &ls->ls_requestqueue);
-	mutex_unlock(&ls->ls_requestqueue_mutex);
 }
 
 /*
@@ -71,16 +68,14 @@ int dlm_process_requestqueue(struct dlm_ls *ls)
 	struct dlm_message *ms;
 	int error = 0;
 
-	mutex_lock(&ls->ls_requestqueue_mutex);
-
+	write_lock(&ls->ls_requestqueue_lock);
 	for (;;) {
 		if (list_empty(&ls->ls_requestqueue)) {
-			mutex_unlock(&ls->ls_requestqueue_mutex);
+			clear_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags);
 			error = 0;
 			break;
 		}
-		e = list_entry(ls->ls_requestqueue.next, struct rq_entry, list);
-		mutex_unlock(&ls->ls_requestqueue_mutex);
+		e = list_first_entry(&ls->ls_requestqueue, struct rq_entry, list);
 
 		ms = &e->request;
 
@@ -93,41 +88,23 @@ int dlm_process_requestqueue(struct dlm_ls *ls)
 			  e->recover_seq);
 
 		dlm_receive_message_saved(ls, &e->request, e->recover_seq);
-
-		mutex_lock(&ls->ls_requestqueue_mutex);
 		list_del(&e->list);
-		if (atomic_dec_and_test(&ls->ls_requestqueue_cnt))
-			wake_up(&ls->ls_requestqueue_wait);
 		kfree(e);
 
 		if (dlm_locking_stopped(ls)) {
 			log_debug(ls, "process_requestqueue abort running");
-			mutex_unlock(&ls->ls_requestqueue_mutex);
 			error = -EINTR;
 			break;
 		}
+		write_unlock(&ls->ls_requestqueue_lock);
 		schedule();
+		write_lock(&ls->ls_requestqueue_lock);
 	}
+	write_unlock(&ls->ls_requestqueue_lock);
 
 	return error;
 }
 
-/*
- * After recovery is done, locking is resumed and dlm_recoverd takes all the
- * saved requests and processes them as they would have been by dlm_recv.  At
- * the same time, dlm_recv will start receiving new requests from remote nodes.
- * We want to delay dlm_recv processing new requests until dlm_recoverd has
- * finished processing the old saved requests.  We don't check for locking
- * stopped here because dlm_ls_stop won't stop locking until it's suspended us
- * (dlm_recv).
- */
-
-void dlm_wait_requestqueue(struct dlm_ls *ls)
-{
-	wait_event(ls->ls_requestqueue_wait,
-		   atomic_read(&ls->ls_requestqueue_cnt) == 0);
-}
-
 static int purge_request(struct dlm_ls *ls, struct dlm_message *ms, int nodeid)
 {
 	__le32 type = ms->m_type;
@@ -158,17 +135,15 @@ void dlm_purge_requestqueue(struct dlm_ls *ls)
 	struct dlm_message *ms;
 	struct rq_entry *e, *safe;
 
-	mutex_lock(&ls->ls_requestqueue_mutex);
+	write_lock(&ls->ls_requestqueue_lock);
 	list_for_each_entry_safe(e, safe, &ls->ls_requestqueue, list) {
 		ms =  &e->request;
 
 		if (purge_request(ls, ms, e->nodeid)) {
 			list_del(&e->list);
-			if (atomic_dec_and_test(&ls->ls_requestqueue_cnt))
-				wake_up(&ls->ls_requestqueue_wait);
 			kfree(e);
 		}
 	}
-	mutex_unlock(&ls->ls_requestqueue_mutex);
+	write_unlock(&ls->ls_requestqueue_lock);
 }
 
-- 
2.43.0


* [PATCHv3 v6.8-rc6 15/18] dlm: ls_recv_active semaphore to rwlock
  2024-02-27  1:48 [PATCHv3 v6.8-rc6 00/18] dlm: bring message parsing to softirq context Alexander Aring
                   ` (13 preceding siblings ...)
  2024-02-27  1:49 ` [PATCHv3 v6.8-rc6 14/18] dlm: make requestqueue handling non sleepable Alexander Aring
@ 2024-02-27  1:49 ` Alexander Aring
  2024-02-27  1:49 ` [PATCHv3 v6.8-rc6 16/18] dlm: remove schedule in dlm receive path Alexander Aring
                   ` (2 subsequent siblings)
  17 siblings, 0 replies; 19+ messages in thread
From: Alexander Aring @ 2024-02-27  1:49 UTC (permalink / raw
  To: teigland; +Cc: gfs2, aahringo

This patch converts the ls_recv_active rw_semaphore to an rwlock so
that dlm message processing does not sleep on it.

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/dlm_internal.h | 2 +-
 fs/dlm/lock.c         | 4 ++--
 fs/dlm/lockspace.c    | 2 +-
 fs/dlm/member.c       | 4 ++--
 fs/dlm/recoverd.c     | 4 ++--
 5 files changed, 8 insertions(+), 8 deletions(-)

diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index 4cb1f38067d3..70de068bcf2b 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -623,7 +623,7 @@ struct dlm_ls {
 	uint64_t		ls_recover_seq;
 	struct dlm_recover	*ls_recover_args;
 	struct rw_semaphore	ls_in_recovery;	/* block local requests */
-	struct rw_semaphore	ls_recv_active;	/* block dlm_recv */
+	rwlock_t		ls_recv_active;	/* block dlm_recv */
 	struct list_head	ls_requestqueue;/* queue remote requests */
 	rwlock_t		ls_requestqueue_lock;
 	struct dlm_rcom		*ls_recover_buf;
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index 10cebb418931..73bf81eb88d9 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -4834,7 +4834,7 @@ void dlm_receive_buffer(const union dlm_packet *p, int nodeid)
 	/* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
 	   be inactive (in this ls) before transitioning to recovery mode */
 
-	down_read(&ls->ls_recv_active);
+	read_lock(&ls->ls_recv_active);
 	if (hd->h_cmd == DLM_MSG)
 		dlm_receive_message(ls, &p->message, nodeid);
 	else if (hd->h_cmd == DLM_RCOM)
@@ -4842,7 +4842,7 @@ void dlm_receive_buffer(const union dlm_packet *p, int nodeid)
 	else
 		log_error(ls, "invalid h_cmd %d from %d lockspace %x",
 			  hd->h_cmd, nodeid, le32_to_cpu(hd->u.h_lockspace));
-	up_read(&ls->ls_recv_active);
+	read_unlock(&ls->ls_recv_active);
 
 	dlm_put_lockspace(ls);
 }
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index 0df0f07fa092..3384e0004700 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -552,7 +552,7 @@ static int new_lockspace(const char *name, const char *cluster,
 	ls->ls_recover_seq = get_random_u64();
 	ls->ls_recover_args = NULL;
 	init_rwsem(&ls->ls_in_recovery);
-	init_rwsem(&ls->ls_recv_active);
+	rwlock_init(&ls->ls_recv_active);
 	INIT_LIST_HEAD(&ls->ls_requestqueue);
 	rwlock_init(&ls->ls_requestqueue_lock);
 	spin_lock_init(&ls->ls_clear_proc_locks);
diff --git a/fs/dlm/member.c b/fs/dlm/member.c
index 707cebcdc533..ac1b555af9d6 100644
--- a/fs/dlm/member.c
+++ b/fs/dlm/member.c
@@ -630,7 +630,7 @@ int dlm_ls_stop(struct dlm_ls *ls)
 	 * message to the requestqueue without races.
 	 */
 
-	down_write(&ls->ls_recv_active);
+	write_lock(&ls->ls_recv_active);
 
 	/*
 	 * Abort any recovery that's in progress (see RECOVER_STOP,
@@ -654,7 +654,7 @@ int dlm_ls_stop(struct dlm_ls *ls)
 	 * requestqueue for later.
 	 */
 
-	up_write(&ls->ls_recv_active);
+	write_unlock(&ls->ls_recv_active);
 
 	/*
 	 * This in_recovery lock does two things:
diff --git a/fs/dlm/recoverd.c b/fs/dlm/recoverd.c
index 5388db89e22f..361327762c1b 100644
--- a/fs/dlm/recoverd.c
+++ b/fs/dlm/recoverd.c
@@ -103,7 +103,7 @@ static int enable_locking(struct dlm_ls *ls, uint64_t seq)
 {
 	int error = -EINTR;
 
-	down_write(&ls->ls_recv_active);
+	write_lock(&ls->ls_recv_active);
 
 	spin_lock(&ls->ls_recover_lock);
 	if (ls->ls_recover_seq == seq) {
@@ -115,7 +115,7 @@ static int enable_locking(struct dlm_ls *ls, uint64_t seq)
 	}
 	spin_unlock(&ls->ls_recover_lock);
 
-	up_write(&ls->ls_recv_active);
+	write_unlock(&ls->ls_recv_active);
 	return error;
 }
 
-- 
2.43.0


* [PATCHv3 v6.8-rc6 16/18] dlm: remove schedule in dlm receive path
  2024-02-27  1:48 [PATCHv3 v6.8-rc6 00/18] dlm: bring message parsing to softirq context Alexander Aring
                   ` (14 preceding siblings ...)
  2024-02-27  1:49 ` [PATCHv3 v6.8-rc6 15/18] dlm: ls_recv_active semaphore to rwlock Alexander Aring
@ 2024-02-27  1:49 ` Alexander Aring
  2024-02-27  1:49 ` [PATCHv3 v6.8-rc6 17/18] dlm: convert message parsing locks to disable bh Alexander Aring
  2024-02-27  1:49 ` [PATCHv3 v6.8-rc6 18/18] dlm: do dlm message processing in softirq context Alexander Aring
  17 siblings, 0 replies; 19+ messages in thread
From: Alexander Aring @ 2024-02-27  1:49 UTC (permalink / raw
  To: teigland; +Cc: gfs2, aahringo

This patch removes an explicit schedule() call from the receive path
of dlm message processing. The goal in DLM is to not trigger any
additional scheduling while processing DLM messages, yet this
schedule() is called directly inside the dlm message processing path.
Once we handle dlm message processing in softirq context we cannot
call schedule() in that context anymore. This patch prepares for the
transition by simply removing the schedule() call.
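
As background, a hypothetical helper illustrating the constraint (a
sketch, not code from this patch; dlm_softirq_sketch is an invented
name):

	static void dlm_softirq_sketch(void)
	{
		/* softirq handlers run with the preempt count raised,
		 * so in_atomic() is true here; calling schedule() in
		 * this state splats "BUG: scheduling while atomic" */
		WARN_ON_ONCE(!in_atomic());
	}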

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/lock.c | 1 -
 1 file changed, 1 deletion(-)

diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index 73bf81eb88d9..a733dff09ac4 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -2540,7 +2540,6 @@ static void process_lookup_list(struct dlm_rsb *r)
 	list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
 		list_del_init(&lkb->lkb_rsb_lookup);
 		_request_lock(r, lkb);
-		schedule();
 	}
 }
 
-- 
2.43.0


* [PATCHv3 v6.8-rc6 17/18] dlm: convert message parsing locks to disable bh
  2024-02-27  1:48 [PATCHv3 v6.8-rc6 00/18] dlm: bring message parsing to softirq context Alexander Aring
                   ` (15 preceding siblings ...)
  2024-02-27  1:49 ` [PATCHv3 v6.8-rc6 16/18] dlm: remove schedule in dlm receive path Alexander Aring
@ 2024-02-27  1:49 ` Alexander Aring
  2024-02-27  1:49 ` [PATCHv3 v6.8-rc6 18/18] dlm: do dlm message processing in softirq context Alexander Aring
  17 siblings, 0 replies; 19+ messages in thread
From: Alexander Aring @ 2024-02-27  1:49 UTC (permalink / raw
  To: teigland; +Cc: gfs2, aahringo

This patch converts all spinlocks involved in message parsing to
their _bh versions. The reason for doing this is that message parsing
will be moved into softirq context, and we need to prevent these
locks from being interrupted by a softirq while they are held.
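
The failure mode being prevented, as a generic sketch (not taken from
the patch): without the _bh variant, a softirq can preempt a lock
holder on the same CPU and self-deadlock:

	spin_lock(&lock);	/* process context takes the lock...  */
				/* ...a softirq fires on this CPU,
				 * the message parser runs and spins
				 * on the same lock: self-deadlock    */

	spin_lock_bh(&lock);	/* disables bottom halves locally, so */
	/* ... */		/* the softirq cannot run on this CPU */
	spin_unlock_bh(&lock);	/* while the lock is held             */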

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/ast.c          |  28 +++---
 fs/dlm/debug_fs.c     |  32 +++----
 fs/dlm/dir.c          |  24 ++---
 fs/dlm/lock.c         | 204 ++++++++++++++++++++++++------------------
 fs/dlm/lock.h         |   4 +-
 fs/dlm/lockspace.c    |  51 ++++++-----
 fs/dlm/lowcomms.c     |  16 ++--
 fs/dlm/member.c       |  22 ++---
 fs/dlm/midcomms.c     |  40 ++++-----
 fs/dlm/rcom.c         |  26 +++---
 fs/dlm/recover.c      |  52 +++++------
 fs/dlm/recoverd.c     |  20 ++---
 fs/dlm/requestqueue.c |  12 +--
 fs/dlm/user.c         |  34 +++----
 14 files changed, 296 insertions(+), 269 deletions(-)

diff --git a/fs/dlm/ast.c b/fs/dlm/ast.c
index 1f2f70a1b824..e3c0903aaa6f 100644
--- a/fs/dlm/ast.c
+++ b/fs/dlm/ast.c
@@ -127,19 +127,19 @@ void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status,
 		return;
 	}
 
-	spin_lock(&lkb->lkb_cb_lock);
+	spin_lock_bh(&lkb->lkb_cb_lock);
 	rv = dlm_enqueue_lkb_callback(lkb, flags, mode, status, sbflags);
 	switch (rv) {
 	case DLM_ENQUEUE_CALLBACK_NEED_SCHED:
 		kref_get(&lkb->lkb_ref);
 
-		spin_lock(&ls->ls_cb_lock);
+		spin_lock_bh(&ls->ls_cb_lock);
 		if (test_bit(LSFL_CB_DELAY, &ls->ls_flags)) {
 			list_add(&lkb->lkb_cb_list, &ls->ls_cb_delay);
 		} else {
 			queue_work(ls->ls_callback_wq, &lkb->lkb_cb_work);
 		}
-		spin_unlock(&ls->ls_cb_lock);
+		spin_unlock_bh(&ls->ls_cb_lock);
 		break;
 	case DLM_ENQUEUE_CALLBACK_FAILURE:
 		WARN_ON_ONCE(1);
@@ -150,7 +150,7 @@ void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status,
 		WARN_ON_ONCE(1);
 		break;
 	}
-	spin_unlock(&lkb->lkb_cb_lock);
+	spin_unlock_bh(&lkb->lkb_cb_lock);
 }
 
 void dlm_callback_work(struct work_struct *work)
@@ -162,14 +162,14 @@ void dlm_callback_work(struct work_struct *work)
 	struct dlm_callback *cb;
 	int rv;
 
-	spin_lock(&lkb->lkb_cb_lock);
+	spin_lock_bh(&lkb->lkb_cb_lock);
 	rv = dlm_dequeue_lkb_callback(lkb, &cb);
 	if (WARN_ON_ONCE(rv == DLM_DEQUEUE_CALLBACK_EMPTY)) {
 		clear_bit(DLM_IFL_CB_PENDING_BIT, &lkb->lkb_iflags);
-		spin_unlock(&lkb->lkb_cb_lock);
+		spin_unlock_bh(&lkb->lkb_cb_lock);
 		goto out;
 	}
-	spin_unlock(&lkb->lkb_cb_lock);
+	spin_unlock_bh(&lkb->lkb_cb_lock);
 
 	for (;;) {
 		castfn = lkb->lkb_astfn;
@@ -190,14 +190,14 @@ void dlm_callback_work(struct work_struct *work)
 
 		kref_put(&cb->ref, dlm_release_callback);
 
-		spin_lock(&lkb->lkb_cb_lock);
+		spin_lock_bh(&lkb->lkb_cb_lock);
 		rv = dlm_dequeue_lkb_callback(lkb, &cb);
 		if (rv == DLM_DEQUEUE_CALLBACK_EMPTY) {
 			clear_bit(DLM_IFL_CB_PENDING_BIT, &lkb->lkb_iflags);
-			spin_unlock(&lkb->lkb_cb_lock);
+			spin_unlock_bh(&lkb->lkb_cb_lock);
 			break;
 		}
-		spin_unlock(&lkb->lkb_cb_lock);
+		spin_unlock_bh(&lkb->lkb_cb_lock);
 	}
 
 out:
@@ -225,9 +225,9 @@ void dlm_callback_stop(struct dlm_ls *ls)
 void dlm_callback_suspend(struct dlm_ls *ls)
 {
 	if (ls->ls_callback_wq) {
-		spin_lock(&ls->ls_cb_lock);
+		spin_lock_bh(&ls->ls_cb_lock);
 		set_bit(LSFL_CB_DELAY, &ls->ls_flags);
-		spin_unlock(&ls->ls_cb_lock);
+		spin_unlock_bh(&ls->ls_cb_lock);
 
 		flush_workqueue(ls->ls_callback_wq);
 	}
@@ -245,7 +245,7 @@ void dlm_callback_resume(struct dlm_ls *ls)
 		return;
 
 more:
-	spin_lock(&ls->ls_cb_lock);
+	spin_lock_bh(&ls->ls_cb_lock);
 	list_for_each_entry_safe(lkb, safe, &ls->ls_cb_delay, lkb_cb_list) {
 		list_del_init(&lkb->lkb_cb_list);
 		queue_work(ls->ls_callback_wq, &lkb->lkb_cb_work);
@@ -256,7 +256,7 @@ void dlm_callback_resume(struct dlm_ls *ls)
 	empty = list_empty(&ls->ls_cb_delay);
 	if (empty)
 		clear_bit(LSFL_CB_DELAY, &ls->ls_flags);
-	spin_unlock(&ls->ls_cb_lock);
+	spin_unlock_bh(&ls->ls_cb_lock);
 
 	sum += count;
 	if (!empty) {
diff --git a/fs/dlm/debug_fs.c b/fs/dlm/debug_fs.c
index 8e7683840315..72bd72d07d66 100644
--- a/fs/dlm/debug_fs.c
+++ b/fs/dlm/debug_fs.c
@@ -372,7 +372,7 @@ static void print_format5_lock(struct seq_file *s, struct dlm_lkb *lkb)
 
 	/* lkb_id lkb_flags mode flags sb_status sb_flags */
 
-	spin_lock(&lkb->lkb_cb_lock);
+	spin_lock_bh(&lkb->lkb_cb_lock);
 	list_for_each_entry(cb, &lkb->lkb_callbacks, list) {
 		seq_printf(s, "%x %x %d %x %d %x\n",
 			   lkb->lkb_id,
@@ -382,7 +382,7 @@ static void print_format5_lock(struct seq_file *s, struct dlm_lkb *lkb)
 			   cb->sb_status,
 			   cb->sb_flags);
 	}
-	spin_unlock(&lkb->lkb_cb_lock);
+	spin_unlock_bh(&lkb->lkb_cb_lock);
 }
 
 static void print_format5(struct dlm_rsb *r, struct seq_file *s)
@@ -508,7 +508,7 @@ static void *table_seq_start(struct seq_file *seq, loff_t *pos)
 
 	tree = toss ? &ls->ls_rsbtbl[bucket].toss : &ls->ls_rsbtbl[bucket].keep;
 
-	spin_lock(&ls->ls_rsbtbl[bucket].lock);
+	spin_lock_bh(&ls->ls_rsbtbl[bucket].lock);
 	if (!RB_EMPTY_ROOT(tree)) {
 		for (node = rb_first(tree); node; node = rb_next(node)) {
 			r = rb_entry(node, struct dlm_rsb, res_hashnode);
@@ -516,12 +516,12 @@ static void *table_seq_start(struct seq_file *seq, loff_t *pos)
 				dlm_hold_rsb(r);
 				ri->rsb = r;
 				ri->bucket = bucket;
-				spin_unlock(&ls->ls_rsbtbl[bucket].lock);
+				spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock);
 				return ri;
 			}
 		}
 	}
-	spin_unlock(&ls->ls_rsbtbl[bucket].lock);
+	spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock);
 
 	/*
 	 * move to the first rsb in the next non-empty bucket
@@ -540,18 +540,18 @@ static void *table_seq_start(struct seq_file *seq, loff_t *pos)
 		}
 		tree = toss ? &ls->ls_rsbtbl[bucket].toss : &ls->ls_rsbtbl[bucket].keep;
 
-		spin_lock(&ls->ls_rsbtbl[bucket].lock);
+		spin_lock_bh(&ls->ls_rsbtbl[bucket].lock);
 		if (!RB_EMPTY_ROOT(tree)) {
 			node = rb_first(tree);
 			r = rb_entry(node, struct dlm_rsb, res_hashnode);
 			dlm_hold_rsb(r);
 			ri->rsb = r;
 			ri->bucket = bucket;
-			spin_unlock(&ls->ls_rsbtbl[bucket].lock);
+			spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock);
 			*pos = n;
 			return ri;
 		}
-		spin_unlock(&ls->ls_rsbtbl[bucket].lock);
+		spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock);
 	}
 }
 
@@ -572,7 +572,7 @@ static void *table_seq_next(struct seq_file *seq, void *iter_ptr, loff_t *pos)
 	 * move to the next rsb in the same bucket
 	 */
 
-	spin_lock(&ls->ls_rsbtbl[bucket].lock);
+	spin_lock_bh(&ls->ls_rsbtbl[bucket].lock);
 	rp = ri->rsb;
 	next = rb_next(&rp->res_hashnode);
 
@@ -580,12 +580,12 @@ static void *table_seq_next(struct seq_file *seq, void *iter_ptr, loff_t *pos)
 		r = rb_entry(next, struct dlm_rsb, res_hashnode);
 		dlm_hold_rsb(r);
 		ri->rsb = r;
-		spin_unlock(&ls->ls_rsbtbl[bucket].lock);
+		spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock);
 		dlm_put_rsb(rp);
 		++*pos;
 		return ri;
 	}
-	spin_unlock(&ls->ls_rsbtbl[bucket].lock);
+	spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock);
 	dlm_put_rsb(rp);
 
 	/*
@@ -606,18 +606,18 @@ static void *table_seq_next(struct seq_file *seq, void *iter_ptr, loff_t *pos)
 		}
 		tree = toss ? &ls->ls_rsbtbl[bucket].toss : &ls->ls_rsbtbl[bucket].keep;
 
-		spin_lock(&ls->ls_rsbtbl[bucket].lock);
+		spin_lock_bh(&ls->ls_rsbtbl[bucket].lock);
 		if (!RB_EMPTY_ROOT(tree)) {
 			next = rb_first(tree);
 			r = rb_entry(next, struct dlm_rsb, res_hashnode);
 			dlm_hold_rsb(r);
 			ri->rsb = r;
 			ri->bucket = bucket;
-			spin_unlock(&ls->ls_rsbtbl[bucket].lock);
+			spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock);
 			*pos = n;
 			return ri;
 		}
-		spin_unlock(&ls->ls_rsbtbl[bucket].lock);
+		spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock);
 	}
 }
 
@@ -824,7 +824,7 @@ static ssize_t waiters_read(struct file *file, char __user *userbuf,
 
 	mutex_lock(&debug_buf_lock);
 	dlm_lock_recovery(ls);
-	spin_lock(&ls->ls_waiters_lock);
+	spin_lock_bh(&ls->ls_waiters_lock);
 	memset(debug_buf, 0, sizeof(debug_buf));
 
 	list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
@@ -835,7 +835,7 @@ static ssize_t waiters_read(struct file *file, char __user *userbuf,
 			break;
 		pos += ret;
 	}
-	spin_unlock(&ls->ls_waiters_lock);
+	spin_unlock_bh(&ls->ls_waiters_lock);
 	dlm_unlock_recovery(ls);
 
 	rv = simple_read_from_buffer(userbuf, count, ppos, debug_buf, pos);
diff --git a/fs/dlm/dir.c b/fs/dlm/dir.c
index 0dc8a1d9e411..ff3a51c759b5 100644
--- a/fs/dlm/dir.c
+++ b/fs/dlm/dir.c
@@ -204,12 +204,12 @@ static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, const char *name,
 	hash = jhash(name, len, 0);
 	bucket = hash & (ls->ls_rsbtbl_size - 1);
 
-	spin_lock(&ls->ls_rsbtbl[bucket].lock);
+	spin_lock_bh(&ls->ls_rsbtbl[bucket].lock);
 	rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].keep, name, len, &r);
 	if (rv)
 		rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].toss,
 					 name, len, &r);
-	spin_unlock(&ls->ls_rsbtbl[bucket].lock);
+	spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock);
 
 	if (!rv)
 		return r;
@@ -245,7 +245,7 @@ static void drop_dir_ctx(struct dlm_ls *ls, int nodeid)
 {
 	struct dlm_dir_dump *dd, *safe;
 
-	write_lock(&ls->ls_dir_dump_lock);
+	write_lock_bh(&ls->ls_dir_dump_lock);
 	list_for_each_entry_safe(dd, safe, &ls->ls_dir_dump_list, list) {
 		if (dd->nodeid_init == nodeid) {
 			log_error(ls, "drop dump seq %llu",
@@ -254,21 +254,21 @@ static void drop_dir_ctx(struct dlm_ls *ls, int nodeid)
 			kfree(dd);
 		}
 	}
-	write_unlock(&ls->ls_dir_dump_lock);
+	write_unlock_bh(&ls->ls_dir_dump_lock);
 }
 
 static struct dlm_dir_dump *lookup_dir_dump(struct dlm_ls *ls, int nodeid)
 {
 	struct dlm_dir_dump *iter, *dd = NULL;
 
-	read_lock(&ls->ls_dir_dump_lock);
+	read_lock_bh(&ls->ls_dir_dump_lock);
 	list_for_each_entry(iter, &ls->ls_dir_dump_list, list) {
 		if (iter->nodeid_init == nodeid) {
 			dd = iter;
 			break;
 		}
 	}
-	read_unlock(&ls->ls_dir_dump_lock);
+	read_unlock_bh(&ls->ls_dir_dump_lock);
 
 	return dd;
 }
@@ -291,9 +291,9 @@ static struct dlm_dir_dump *init_dir_dump(struct dlm_ls *ls, int nodeid)
 	dd->seq_init = ls->ls_recover_seq;
 	dd->nodeid_init = nodeid;
 
-	write_lock(&ls->ls_dir_dump_lock);
+	write_lock_bh(&ls->ls_dir_dump_lock);
 	list_add(&dd->list, &ls->ls_dir_dump_list);
-	write_unlock(&ls->ls_dir_dump_lock);
+	write_unlock_bh(&ls->ls_dir_dump_lock);
 
 	return dd;
 }
@@ -311,7 +311,7 @@ void dlm_copy_master_names(struct dlm_ls *ls, const char *inbuf, int inlen,
 	struct dlm_dir_dump *dd;
 	__be16 be_namelen;
 
-	read_lock(&ls->ls_masters_lock);
+	read_lock_bh(&ls->ls_masters_lock);
 
 	if (inlen > 1) {
 		dd = lookup_dir_dump(ls, nodeid);
@@ -397,12 +397,12 @@ void dlm_copy_master_names(struct dlm_ls *ls, const char *inbuf, int inlen,
 		log_rinfo(ls, "dlm_recover_directory nodeid %d sent %u res out %u messages",
 			  nodeid, dd->sent_res, dd->sent_msg);
 
-		write_lock(&ls->ls_dir_dump_lock);
+		write_lock_bh(&ls->ls_dir_dump_lock);
 		list_del_init(&dd->list);
-		write_unlock(&ls->ls_dir_dump_lock);
+		write_unlock_bh(&ls->ls_dir_dump_lock);
 		kfree(dd);
 	}
  out:
-	read_unlock(&ls->ls_masters_lock);
+	read_unlock_bh(&ls->ls_masters_lock);
 }
 
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index a733dff09ac4..2c60d3a8fab2 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -333,6 +333,34 @@ void dlm_hold_rsb(struct dlm_rsb *r)
 	hold_rsb(r);
 }
 
+/* TODO move this to lib/refcount.c */
+static bool dlm_refcount_dec_and_lock_bh(refcount_t *r, spinlock_t *lock)
+{
+	if (refcount_dec_not_one(r))
+		return false;
+
+	spin_lock_bh(lock);
+	if (!refcount_dec_and_test(r)) {
+		spin_unlock_bh(lock);
+		return false;
+	}
+
+	return true;
+}
+
+/* TODO move this to include/linux/kref.h */
+static inline int dlm_kref_put_lock_bh(struct kref *kref,
+				       void (*release)(struct kref *kref),
+				       spinlock_t *lock)
+{
+	if (dlm_refcount_dec_and_lock_bh(&kref->refcount, lock)) {
+		release(kref);
+		return 1;
+	}
+
+	return 0;
+}
+
 /* When all references to the rsb are gone it's transferred to
    the tossed list for later disposal. */
 
@@ -342,10 +370,10 @@ static void put_rsb(struct dlm_rsb *r)
 	uint32_t bucket = r->res_bucket;
 	int rv;
 
-	rv = kref_put_lock(&r->res_ref, toss_rsb,
-			   &ls->ls_rsbtbl[bucket].lock);
+	rv = dlm_kref_put_lock_bh(&r->res_ref, toss_rsb,
+				  &ls->ls_rsbtbl[bucket].lock);
 	if (rv)
-		spin_unlock(&ls->ls_rsbtbl[bucket].lock);
+		spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock);
 }
 
 void dlm_put_rsb(struct dlm_rsb *r)
@@ -358,17 +386,17 @@ static int pre_rsb_struct(struct dlm_ls *ls)
 	struct dlm_rsb *r1, *r2;
 	int count = 0;
 
-	spin_lock(&ls->ls_new_rsb_spin);
+	spin_lock_bh(&ls->ls_new_rsb_spin);
 	if (ls->ls_new_rsb_count > dlm_config.ci_new_rsb_count / 2) {
-		spin_unlock(&ls->ls_new_rsb_spin);
+		spin_unlock_bh(&ls->ls_new_rsb_spin);
 		return 0;
 	}
-	spin_unlock(&ls->ls_new_rsb_spin);
+	spin_unlock_bh(&ls->ls_new_rsb_spin);
 
 	r1 = dlm_allocate_rsb(ls);
 	r2 = dlm_allocate_rsb(ls);
 
-	spin_lock(&ls->ls_new_rsb_spin);
+	spin_lock_bh(&ls->ls_new_rsb_spin);
 	if (r1) {
 		list_add(&r1->res_hashchain, &ls->ls_new_rsb);
 		ls->ls_new_rsb_count++;
@@ -378,7 +406,7 @@ static int pre_rsb_struct(struct dlm_ls *ls)
 		ls->ls_new_rsb_count++;
 	}
 	count = ls->ls_new_rsb_count;
-	spin_unlock(&ls->ls_new_rsb_spin);
+	spin_unlock_bh(&ls->ls_new_rsb_spin);
 
 	if (!count)
 		return -ENOMEM;
@@ -395,10 +423,10 @@ static int get_rsb_struct(struct dlm_ls *ls, const void *name, int len,
 	struct dlm_rsb *r;
 	int count;
 
-	spin_lock(&ls->ls_new_rsb_spin);
+	spin_lock_bh(&ls->ls_new_rsb_spin);
 	if (list_empty(&ls->ls_new_rsb)) {
 		count = ls->ls_new_rsb_count;
-		spin_unlock(&ls->ls_new_rsb_spin);
+		spin_unlock_bh(&ls->ls_new_rsb_spin);
 		log_debug(ls, "find_rsb retry %d %d %s",
 			  count, dlm_config.ci_new_rsb_count,
 			  (const char *)name);
@@ -410,7 +438,7 @@ static int get_rsb_struct(struct dlm_ls *ls, const void *name, int len,
 	/* Convert the empty list_head to a NULL rb_node for tree usage: */
 	memset(&r->res_hashnode, 0, sizeof(struct rb_node));
 	ls->ls_new_rsb_count--;
-	spin_unlock(&ls->ls_new_rsb_spin);
+	spin_unlock_bh(&ls->ls_new_rsb_spin);
 
 	r->res_ls = ls;
 	r->res_length = len;
@@ -584,7 +612,7 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
 			goto out;
 	}
 
-	spin_lock(&ls->ls_rsbtbl[b].lock);
+	spin_lock_bh(&ls->ls_rsbtbl[b].lock);
 
 	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
 	if (error)
@@ -654,7 +682,7 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
 
 	error = get_rsb_struct(ls, name, len, &r);
 	if (error == -EAGAIN) {
-		spin_unlock(&ls->ls_rsbtbl[b].lock);
+		spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
 		goto retry;
 	}
 	if (error)
@@ -703,7 +731,7 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
  out_add:
 	error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
  out_unlock:
-	spin_unlock(&ls->ls_rsbtbl[b].lock);
+	spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
  out:
 	*r_ret = r;
 	return error;
@@ -728,7 +756,7 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
 	if (error < 0)
 		goto out;
 
-	spin_lock(&ls->ls_rsbtbl[b].lock);
+	spin_lock_bh(&ls->ls_rsbtbl[b].lock);
 
 	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
 	if (error)
@@ -786,7 +814,7 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
 
 	error = get_rsb_struct(ls, name, len, &r);
 	if (error == -EAGAIN) {
-		spin_unlock(&ls->ls_rsbtbl[b].lock);
+		spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
 		goto retry;
 	}
 	if (error)
@@ -801,7 +829,7 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
 
 	error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
  out_unlock:
-	spin_unlock(&ls->ls_rsbtbl[b].lock);
+	spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
  out:
 	*r_ret = r;
 	return error;
@@ -1018,7 +1046,7 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
 	if (error < 0)
 		return error;
 
-	spin_lock(&ls->ls_rsbtbl[b].lock);
+	spin_lock_bh(&ls->ls_rsbtbl[b].lock);
 	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
 	if (!error) {
 		/* because the rsb is active, we need to lock_rsb before
@@ -1026,7 +1054,7 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
 		 */
 
 		hold_rsb(r);
-		spin_unlock(&ls->ls_rsbtbl[b].lock);
+		spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
 		lock_rsb(r);
 
 		__dlm_master_lookup(ls, r, our_nodeid, from_nodeid, false,
@@ -1052,14 +1080,14 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
 
 	r->res_toss_time = jiffies;
 	/* the rsb was inactive (on toss list) */
-	spin_unlock(&ls->ls_rsbtbl[b].lock);
+	spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
 
 	return 0;
 
  not_found:
 	error = get_rsb_struct(ls, name, len, &r);
 	if (error == -EAGAIN) {
-		spin_unlock(&ls->ls_rsbtbl[b].lock);
+		spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
 		goto retry;
 	}
 	if (error)
@@ -1077,7 +1105,7 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
 	if (error) {
 		/* should never happen */
 		dlm_free_rsb(r);
-		spin_unlock(&ls->ls_rsbtbl[b].lock);
+		spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
 		goto retry;
 	}
 
@@ -1085,7 +1113,7 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
 		*result = DLM_LU_ADD;
 	*r_nodeid = from_nodeid;
  out_unlock:
-	spin_unlock(&ls->ls_rsbtbl[b].lock);
+	spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
 	return error;
 }
 
@@ -1096,13 +1124,13 @@ static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
 	int i;
 
 	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
-		spin_lock(&ls->ls_rsbtbl[i].lock);
+		spin_lock_bh(&ls->ls_rsbtbl[i].lock);
 		for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
 			r = rb_entry(n, struct dlm_rsb, res_hashnode);
 			if (r->res_hash == hash)
 				dlm_dump_rsb(r);
 		}
-		spin_unlock(&ls->ls_rsbtbl[i].lock);
+		spin_unlock_bh(&ls->ls_rsbtbl[i].lock);
 	}
 }
 
@@ -1115,7 +1143,7 @@ void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len)
 	hash = jhash(name, len, 0);
 	b = hash & (ls->ls_rsbtbl_size - 1);
 
-	spin_lock(&ls->ls_rsbtbl[b].lock);
+	spin_lock_bh(&ls->ls_rsbtbl[b].lock);
 	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
 	if (!error)
 		goto out_dump;
@@ -1126,7 +1154,7 @@ void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len)
  out_dump:
 	dlm_dump_rsb(r);
  out:
-	spin_unlock(&ls->ls_rsbtbl[b].lock);
+	spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
 }
 
 static void toss_rsb(struct kref *kref)
@@ -1208,11 +1236,11 @@ static int _create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret,
 	spin_lock_init(&lkb->lkb_cb_lock);
 	INIT_WORK(&lkb->lkb_cb_work, dlm_callback_work);
 
-	spin_lock(&ls->ls_lkbidr_spin);
+	spin_lock_bh(&ls->ls_lkbidr_spin);
 	rv = idr_alloc(&ls->ls_lkbidr, lkb, start, end, GFP_NOWAIT);
 	if (rv >= 0)
 		lkb->lkb_id = rv;
-	spin_unlock(&ls->ls_lkbidr_spin);
+	spin_unlock_bh(&ls->ls_lkbidr_spin);
 
 	if (rv < 0) {
 		log_error(ls, "create_lkb idr error %d", rv);
@@ -1233,11 +1261,11 @@ static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
 {
 	struct dlm_lkb *lkb;
 
-	spin_lock(&ls->ls_lkbidr_spin);
+	spin_lock_bh(&ls->ls_lkbidr_spin);
 	lkb = idr_find(&ls->ls_lkbidr, lkid);
 	if (lkb)
 		kref_get(&lkb->lkb_ref);
-	spin_unlock(&ls->ls_lkbidr_spin);
+	spin_unlock_bh(&ls->ls_lkbidr_spin);
 
 	*lkb_ret = lkb;
 	return lkb ? 0 : -ENOENT;
@@ -1261,11 +1289,11 @@ static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
 	uint32_t lkid = lkb->lkb_id;
 	int rv;
 
-	rv = kref_put_lock(&lkb->lkb_ref, kill_lkb,
-			   &ls->ls_lkbidr_spin);
+	rv = dlm_kref_put_lock_bh(&lkb->lkb_ref, kill_lkb,
+				  &ls->ls_lkbidr_spin);
 	if (rv) {
 		idr_remove(&ls->ls_lkbidr, lkid);
-		spin_unlock(&ls->ls_lkbidr_spin);
+		spin_unlock_bh(&ls->ls_lkbidr_spin);
 
 		detach_lkb(lkb);
 
@@ -1407,7 +1435,7 @@ static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
 	int error = 0;
 	int wc;
 
-	spin_lock(&ls->ls_waiters_lock);
+	spin_lock_bh(&ls->ls_waiters_lock);
 
 	if (is_overlap_unlock(lkb) ||
 	    (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
@@ -1447,7 +1475,7 @@ static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
 		log_error(ls, "addwait error %x %d flags %x %d %d %s",
 			  lkb->lkb_id, error, dlm_iflags_val(lkb), mstype,
 			  lkb->lkb_wait_type, lkb->lkb_resource->res_name);
-	spin_unlock(&ls->ls_waiters_lock);
+	spin_unlock_bh(&ls->ls_waiters_lock);
 	return error;
 }
 
@@ -1546,9 +1574,9 @@ static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
 	int error;
 
-	spin_lock(&ls->ls_waiters_lock);
+	spin_lock_bh(&ls->ls_waiters_lock);
 	error = _remove_from_waiters(lkb, mstype, NULL);
-	spin_unlock(&ls->ls_waiters_lock);
+	spin_unlock_bh(&ls->ls_waiters_lock);
 	return error;
 }
 
@@ -1566,13 +1594,13 @@ static int remove_from_waiters_ms(struct dlm_lkb *lkb,
 	int error;
 
 	if (!local)
-		spin_lock(&ls->ls_waiters_lock);
+		spin_lock_bh(&ls->ls_waiters_lock);
 	else
 		WARN_ON_ONCE(!rwsem_is_locked(&ls->ls_in_recovery) ||
 			     !dlm_locking_stopped(ls));
 	error = _remove_from_waiters(lkb, le32_to_cpu(ms->m_type), ms);
 	if (!local)
-		spin_unlock(&ls->ls_waiters_lock);
+		spin_unlock_bh(&ls->ls_waiters_lock);
 	return error;
 }
 
@@ -1588,10 +1616,10 @@ static void shrink_bucket(struct dlm_ls *ls, int b)
 
 	memset(&ls->ls_remove_lens, 0, sizeof(int) * DLM_REMOVE_NAMES_MAX);
 
-	spin_lock(&ls->ls_rsbtbl[b].lock);
+	spin_lock_bh(&ls->ls_rsbtbl[b].lock);
 
 	if (!test_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[b].flags)) {
-		spin_unlock(&ls->ls_rsbtbl[b].lock);
+		spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
 		return;
 	}
 
@@ -1648,7 +1676,7 @@ static void shrink_bucket(struct dlm_ls *ls, int b)
 		set_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[b].flags);
 	else
 		clear_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[b].flags);
-	spin_unlock(&ls->ls_rsbtbl[b].lock);
+	spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
 
 	/*
 	 * While searching for rsb's to free, we found some that require
@@ -1663,16 +1691,16 @@ static void shrink_bucket(struct dlm_ls *ls, int b)
 		name = ls->ls_remove_names[i];
 		len = ls->ls_remove_lens[i];
 
-		spin_lock(&ls->ls_rsbtbl[b].lock);
+		spin_lock_bh(&ls->ls_rsbtbl[b].lock);
 		rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
 		if (rv) {
-			spin_unlock(&ls->ls_rsbtbl[b].lock);
+			spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
 			log_debug(ls, "remove_name not toss %s", name);
 			continue;
 		}
 
 		if (r->res_master_nodeid != our_nodeid) {
-			spin_unlock(&ls->ls_rsbtbl[b].lock);
+			spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
 			log_debug(ls, "remove_name master %d dir %d our %d %s",
 				  r->res_master_nodeid, r->res_dir_nodeid,
 				  our_nodeid, name);
@@ -1681,7 +1709,7 @@ static void shrink_bucket(struct dlm_ls *ls, int b)
 
 		if (r->res_dir_nodeid == our_nodeid) {
 			/* should never happen */
-			spin_unlock(&ls->ls_rsbtbl[b].lock);
+			spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
 			log_error(ls, "remove_name dir %d master %d our %d %s",
 				  r->res_dir_nodeid, r->res_master_nodeid,
 				  our_nodeid, name);
@@ -1690,21 +1718,21 @@ static void shrink_bucket(struct dlm_ls *ls, int b)
 
 		if (!time_after_eq(jiffies, r->res_toss_time +
 				   dlm_config.ci_toss_secs * HZ)) {
-			spin_unlock(&ls->ls_rsbtbl[b].lock);
+			spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
 			log_debug(ls, "remove_name toss_time %lu now %lu %s",
 				  r->res_toss_time, jiffies, name);
 			continue;
 		}
 
 		if (!kref_put(&r->res_ref, kill_rsb)) {
-			spin_unlock(&ls->ls_rsbtbl[b].lock);
+			spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
 			log_error(ls, "remove_name in use %s", name);
 			continue;
 		}
 
 		rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
 		send_remove(r);
-		spin_unlock(&ls->ls_rsbtbl[b].lock);
+		spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
 
 		dlm_free_rsb(r);
 	}
@@ -4168,7 +4196,7 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
 	hash = jhash(name, len, 0);
 	b = hash & (ls->ls_rsbtbl_size - 1);
 
-	spin_lock(&ls->ls_rsbtbl[b].lock);
+	spin_lock_bh(&ls->ls_rsbtbl[b].lock);
 
 	rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
 	if (rv) {
@@ -4178,7 +4206,7 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
 			/* should not happen */
 			log_error(ls, "receive_remove from %d not found %s",
 				  from_nodeid, name);
-			spin_unlock(&ls->ls_rsbtbl[b].lock);
+			spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
 			return;
 		}
 		if (r->res_master_nodeid != from_nodeid) {
@@ -4186,14 +4214,14 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
 			log_error(ls, "receive_remove keep from %d master %d",
 				  from_nodeid, r->res_master_nodeid);
 			dlm_print_rsb(r);
-			spin_unlock(&ls->ls_rsbtbl[b].lock);
+			spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
 			return;
 		}
 
 		log_debug(ls, "receive_remove from %d master %d first %x %s",
 			  from_nodeid, r->res_master_nodeid, r->res_first_lkid,
 			  name);
-		spin_unlock(&ls->ls_rsbtbl[b].lock);
+		spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
 		return;
 	}
 
@@ -4201,19 +4229,19 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
 		log_error(ls, "receive_remove toss from %d master %d",
 			  from_nodeid, r->res_master_nodeid);
 		dlm_print_rsb(r);
-		spin_unlock(&ls->ls_rsbtbl[b].lock);
+		spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
 		return;
 	}
 
 	if (kref_put(&r->res_ref, kill_rsb)) {
 		rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
-		spin_unlock(&ls->ls_rsbtbl[b].lock);
+		spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
 		dlm_free_rsb(r);
 	} else {
 		log_error(ls, "receive_remove from %d rsb ref error",
 			  from_nodeid);
 		dlm_print_rsb(r);
-		spin_unlock(&ls->ls_rsbtbl[b].lock);
+		spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
 	}
 }
 
@@ -4749,20 +4777,20 @@ static void dlm_receive_message(struct dlm_ls *ls, const struct dlm_message *ms,
 				int nodeid)
 {
 try_again:
-	read_lock(&ls->ls_requestqueue_lock);
+	read_lock_bh(&ls->ls_requestqueue_lock);
 	if (test_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags)) {
 		/* If we were a member of this lockspace, left, and rejoined,
 		   other nodes may still be sending us messages from the
 		   lockspace generation before we left. */
 		if (WARN_ON_ONCE(!ls->ls_generation)) {
-			read_unlock(&ls->ls_requestqueue_lock);
+			read_unlock_bh(&ls->ls_requestqueue_lock);
 			log_limit(ls, "receive %d from %d ignore old gen",
 				  le32_to_cpu(ms->m_type), nodeid);
 			return;
 		}
 
-		read_unlock(&ls->ls_requestqueue_lock);
-		write_lock(&ls->ls_requestqueue_lock);
+		read_unlock_bh(&ls->ls_requestqueue_lock);
+		write_lock_bh(&ls->ls_requestqueue_lock);
 		/* recheck because we hold writelock now */
 		if (!test_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags)) {
 			write_unlock_bh(&ls->ls_requestqueue_lock);
@@ -4770,10 +4798,10 @@ static void dlm_receive_message(struct dlm_ls *ls, const struct dlm_message *ms,
 		}
 
 		dlm_add_requestqueue(ls, nodeid, ms);
-		write_unlock(&ls->ls_requestqueue_lock);
+		write_unlock_bh(&ls->ls_requestqueue_lock);
 	} else {
 		_receive_message(ls, ms, 0);
-		read_unlock(&ls->ls_requestqueue_lock);
+		read_unlock_bh(&ls->ls_requestqueue_lock);
 	}
 }
 
@@ -4833,7 +4861,7 @@ void dlm_receive_buffer(const union dlm_packet *p, int nodeid)
 	/* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
 	   be inactive (in this ls) before transitioning to recovery mode */
 
-	read_lock(&ls->ls_recv_active);
+	read_lock_bh(&ls->ls_recv_active);
 	if (hd->h_cmd == DLM_MSG)
 		dlm_receive_message(ls, &p->message, nodeid);
 	else if (hd->h_cmd == DLM_RCOM)
@@ -4841,7 +4869,7 @@ void dlm_receive_buffer(const union dlm_packet *p, int nodeid)
 	else
 		log_error(ls, "invalid h_cmd %d from %d lockspace %x",
 			  hd->h_cmd, nodeid, le32_to_cpu(hd->u.h_lockspace));
-	read_unlock(&ls->ls_recv_active);
+	read_unlock_bh(&ls->ls_recv_active);
 
 	dlm_put_lockspace(ls);
 }
@@ -5001,7 +5029,7 @@ static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
 {
 	struct dlm_lkb *lkb = NULL, *iter;
 
-	spin_lock(&ls->ls_waiters_lock);
+	spin_lock_bh(&ls->ls_waiters_lock);
 	list_for_each_entry(iter, &ls->ls_waiters, lkb_wait_reply) {
 		if (test_bit(DLM_IFL_RESEND_BIT, &iter->lkb_iflags)) {
 			hold_lkb(iter);
@@ -5009,7 +5037,7 @@ static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
 			break;
 		}
 	}
-	spin_unlock(&ls->ls_waiters_lock);
+	spin_unlock_bh(&ls->ls_waiters_lock);
 
 	return lkb;
 }
@@ -5073,14 +5101,14 @@ int dlm_recover_waiters_post(struct dlm_ls *ls)
 		/* drop all wait_count references we still
 		 * hold a reference for this iteration.
 		 */
-		spin_lock(&ls->ls_waiters_lock);
+		spin_lock_bh(&ls->ls_waiters_lock);
 		while (atomic_read(&lkb->lkb_wait_count)) {
 			if (atomic_dec_and_test(&lkb->lkb_wait_count))
 				list_del_init(&lkb->lkb_wait_reply);
 
 			unhold_lkb(lkb);
 		}
-		spin_unlock(&ls->ls_waiters_lock);
+		spin_unlock_bh(&ls->ls_waiters_lock);
 
 		if (oc || ou) {
 			/* do an unlock or cancel instead of resending */
@@ -5244,7 +5272,7 @@ static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket)
 	struct rb_node *n;
 	struct dlm_rsb *r;
 
-	spin_lock(&ls->ls_rsbtbl[bucket].lock);
+	spin_lock_bh(&ls->ls_rsbtbl[bucket].lock);
 	for (n = rb_first(&ls->ls_rsbtbl[bucket].keep); n; n = rb_next(n)) {
 		r = rb_entry(n, struct dlm_rsb, res_hashnode);
 
@@ -5255,10 +5283,10 @@ static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket)
 			continue;
 		}
 		hold_rsb(r);
-		spin_unlock(&ls->ls_rsbtbl[bucket].lock);
+		spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock);
 		return r;
 	}
-	spin_unlock(&ls->ls_rsbtbl[bucket].lock);
+	spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock);
 	return NULL;
 }
 
@@ -5602,10 +5630,10 @@ int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
 	}
 
 	/* add this new lkb to the per-process list of locks */
-	spin_lock(&ua->proc->locks_spin);
+	spin_lock_bh(&ua->proc->locks_spin);
 	hold_lkb(lkb);
 	list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
-	spin_unlock(&ua->proc->locks_spin);
+	spin_unlock_bh(&ua->proc->locks_spin);
 	do_put = false;
  out_put:
 	trace_dlm_lock_end(ls, lkb, name, namelen, mode, flags, error, false);
@@ -5735,9 +5763,9 @@ int dlm_user_adopt_orphan(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
 	 * for the proc locks list.
 	 */
 
-	spin_lock(&ua->proc->locks_spin);
+	spin_lock_bh(&ua->proc->locks_spin);
 	list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
-	spin_unlock(&ua->proc->locks_spin);
+	spin_unlock_bh(&ua->proc->locks_spin);
  out:
 	kfree(ua_tmp);
 	return rv;
@@ -5781,11 +5809,11 @@ int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
 	if (error)
 		goto out_put;
 
-	spin_lock(&ua->proc->locks_spin);
+	spin_lock_bh(&ua->proc->locks_spin);
 	/* dlm_user_add_cb() may have already taken lkb off the proc list */
 	if (!list_empty(&lkb->lkb_ownqueue))
 		list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
-	spin_unlock(&ua->proc->locks_spin);
+	spin_unlock_bh(&ua->proc->locks_spin);
  out_put:
 	trace_dlm_unlock_end(ls, lkb, flags, error);
 	dlm_put_lkb(lkb);
@@ -5936,7 +5964,7 @@ static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
 {
 	struct dlm_lkb *lkb = NULL;
 
-	spin_lock(&ls->ls_clear_proc_locks);
+	spin_lock_bh(&ls->ls_clear_proc_locks);
 	if (list_empty(&proc->locks))
 		goto out;
 
@@ -5948,7 +5976,7 @@ static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
 	else
 		set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
  out:
-	spin_unlock(&ls->ls_clear_proc_locks);
+	spin_unlock_bh(&ls->ls_clear_proc_locks);
 	return lkb;
 }
 
@@ -5984,7 +6012,7 @@ void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
 		dlm_put_lkb(lkb);
 	}
 
-	spin_lock(&ls->ls_clear_proc_locks);
+	spin_lock_bh(&ls->ls_clear_proc_locks);
 
 	/* in-progress unlocks */
 	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
@@ -5999,7 +6027,7 @@ void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
 		dlm_put_lkb(lkb);
 	}
 
-	spin_unlock(&ls->ls_clear_proc_locks);
+	spin_unlock_bh(&ls->ls_clear_proc_locks);
 	dlm_unlock_recovery(ls);
 }
 
@@ -6009,13 +6037,13 @@ static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
 
 	while (1) {
 		lkb = NULL;
-		spin_lock(&proc->locks_spin);
+		spin_lock_bh(&proc->locks_spin);
 		if (!list_empty(&proc->locks)) {
 			lkb = list_entry(proc->locks.next, struct dlm_lkb,
 					 lkb_ownqueue);
 			list_del_init(&lkb->lkb_ownqueue);
 		}
-		spin_unlock(&proc->locks_spin);
+		spin_unlock_bh(&proc->locks_spin);
 
 		if (!lkb)
 			break;
@@ -6025,21 +6053,21 @@ static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
 		dlm_put_lkb(lkb); /* ref from proc->locks list */
 	}
 
-	spin_lock(&proc->locks_spin);
+	spin_lock_bh(&proc->locks_spin);
 	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
 		list_del_init(&lkb->lkb_ownqueue);
 		set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
 		dlm_put_lkb(lkb);
 	}
-	spin_unlock(&proc->locks_spin);
+	spin_unlock_bh(&proc->locks_spin);
 
-	spin_lock(&proc->asts_spin);
+	spin_lock_bh(&proc->asts_spin);
 	list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
 		dlm_purge_lkb_callbacks(lkb);
 		list_del_init(&lkb->lkb_cb_list);
 		dlm_put_lkb(lkb);
 	}
-	spin_unlock(&proc->asts_spin);
+	spin_unlock_bh(&proc->asts_spin);
 }
 
 /* pid of 0 means purge all orphans */
diff --git a/fs/dlm/lock.h b/fs/dlm/lock.h
index ca71f1dc1b06..c67c1ede8035 100644
--- a/fs/dlm/lock.h
+++ b/fs/dlm/lock.h
@@ -70,12 +70,12 @@ static inline int is_master(struct dlm_rsb *r)
 
 static inline void lock_rsb(struct dlm_rsb *r)
 {
-	spin_lock(&r->res_lock);
+	spin_lock_bh(&r->res_lock);
 }
 
 static inline void unlock_rsb(struct dlm_rsb *r)
 {
-	spin_unlock(&r->res_lock);
+	spin_unlock_bh(&r->res_lock);
 }
 
 #endif
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index 3384e0004700..74435527d558 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -251,15 +251,15 @@ static struct dlm_ls *find_ls_to_scan(void)
 {
 	struct dlm_ls *ls;
 
-	spin_lock(&lslist_lock);
+	spin_lock_bh(&lslist_lock);
 	list_for_each_entry(ls, &lslist, ls_list) {
 		if (time_after_eq(jiffies, ls->ls_scan_time +
 					    dlm_config.ci_scan_secs * HZ)) {
-			spin_unlock(&lslist_lock);
+			spin_unlock_bh(&lslist_lock);
 			return ls;
 		}
 	}
-	spin_unlock(&lslist_lock);
+	spin_unlock_bh(&lslist_lock);
 	return NULL;
 }
 
@@ -306,7 +306,7 @@ struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
 {
 	struct dlm_ls *ls;
 
-	spin_lock(&lslist_lock);
+	spin_lock_bh(&lslist_lock);
 
 	list_for_each_entry(ls, &lslist, ls_list) {
 		if (ls->ls_global_id == id) {
@@ -316,7 +316,7 @@ struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
 	}
 	ls = NULL;
  out:
-	spin_unlock(&lslist_lock);
+	spin_unlock_bh(&lslist_lock);
 	return ls;
 }
 
@@ -324,7 +324,7 @@ struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
 {
 	struct dlm_ls *ls;
 
-	spin_lock(&lslist_lock);
+	spin_lock_bh(&lslist_lock);
 	list_for_each_entry(ls, &lslist, ls_list) {
 		if (ls->ls_local_handle == lockspace) {
 			atomic_inc(&ls->ls_count);
@@ -333,7 +333,7 @@ struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
 	}
 	ls = NULL;
  out:
-	spin_unlock(&lslist_lock);
+	spin_unlock_bh(&lslist_lock);
 	return ls;
 }
 
@@ -341,7 +341,7 @@ struct dlm_ls *dlm_find_lockspace_device(int minor)
 {
 	struct dlm_ls *ls;
 
-	spin_lock(&lslist_lock);
+	spin_lock_bh(&lslist_lock);
 	list_for_each_entry(ls, &lslist, ls_list) {
 		if (ls->ls_device.minor == minor) {
 			atomic_inc(&ls->ls_count);
@@ -350,7 +350,7 @@ struct dlm_ls *dlm_find_lockspace_device(int minor)
 	}
 	ls = NULL;
  out:
-	spin_unlock(&lslist_lock);
+	spin_unlock_bh(&lslist_lock);
 	return ls;
 }
 
@@ -365,15 +365,15 @@ static void remove_lockspace(struct dlm_ls *ls)
 retry:
 	wait_event(ls->ls_count_wait, atomic_read(&ls->ls_count) == 0);
 
-	spin_lock(&lslist_lock);
+	spin_lock_bh(&lslist_lock);
 	if (atomic_read(&ls->ls_count) != 0) {
-		spin_unlock(&lslist_lock);
+		spin_unlock_bh(&lslist_lock);
 		goto retry;
 	}
 
 	WARN_ON(ls->ls_create_count != 0);
 	list_del(&ls->ls_list);
-	spin_unlock(&lslist_lock);
+	spin_unlock_bh(&lslist_lock);
 }
 
 static int threads_start(void)
@@ -448,7 +448,7 @@ static int new_lockspace(const char *name, const char *cluster,
 
 	error = 0;
 
-	spin_lock(&lslist_lock);
+	spin_lock_bh(&lslist_lock);
 	list_for_each_entry(ls, &lslist, ls_list) {
 		WARN_ON(ls->ls_create_count <= 0);
 		if (ls->ls_namelen != namelen)
@@ -464,7 +464,7 @@ static int new_lockspace(const char *name, const char *cluster,
 		error = 1;
 		break;
 	}
-	spin_unlock(&lslist_lock);
+	spin_unlock_bh(&lslist_lock);
 
 	if (error)
 		goto out;
@@ -583,10 +583,10 @@ static int new_lockspace(const char *name, const char *cluster,
 	INIT_LIST_HEAD(&ls->ls_dir_dump_list);
 	rwlock_init(&ls->ls_dir_dump_lock);
 
-	spin_lock(&lslist_lock);
+	spin_lock_bh(&lslist_lock);
 	ls->ls_create_count = 1;
 	list_add(&ls->ls_list, &lslist);
-	spin_unlock(&lslist_lock);
+	spin_unlock_bh(&lslist_lock);
 
 	if (flags & DLM_LSFL_FS) {
 		error = dlm_callback_start(ls);
@@ -655,9 +655,9 @@ static int new_lockspace(const char *name, const char *cluster,
  out_callback:
 	dlm_callback_stop(ls);
  out_delist:
-	spin_lock(&lslist_lock);
+	spin_lock_bh(&lslist_lock);
 	list_del(&ls->ls_list);
-	spin_unlock(&lslist_lock);
+	spin_unlock_bh(&lslist_lock);
 	idr_destroy(&ls->ls_recover_idr);
 	kfree(ls->ls_recover_buf);
  out_lkbidr:
@@ -770,7 +770,7 @@ static int lockspace_busy(struct dlm_ls *ls, int force)
 {
 	int rv;
 
-	spin_lock(&ls->ls_lkbidr_spin);
+	spin_lock_bh(&ls->ls_lkbidr_spin);
 	if (force == 0) {
 		rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
 	} else if (force == 1) {
@@ -778,7 +778,7 @@ static int lockspace_busy(struct dlm_ls *ls, int force)
 	} else {
 		rv = 0;
 	}
-	spin_unlock(&ls->ls_lkbidr_spin);
+	spin_unlock_bh(&ls->ls_lkbidr_spin);
 	return rv;
 }
 
@@ -790,7 +790,7 @@ static int release_lockspace(struct dlm_ls *ls, int force)
 
 	busy = lockspace_busy(ls, force);
 
-	spin_lock(&lslist_lock);
+	spin_lock_bh(&lslist_lock);
 	if (ls->ls_create_count == 1) {
 		if (busy) {
 			rv = -EBUSY;
@@ -804,7 +804,7 @@ static int release_lockspace(struct dlm_ls *ls, int force)
 	} else {
 		rv = -EINVAL;
 	}
-	spin_unlock(&lslist_lock);
+	spin_unlock_bh(&lslist_lock);
 
 	if (rv) {
 		log_debug(ls, "release_lockspace no remove %d", rv);
@@ -929,20 +929,19 @@ void dlm_stop_lockspaces(void)
 
  restart:
 	count = 0;
-	spin_lock(&lslist_lock);
+	spin_lock_bh(&lslist_lock);
 	list_for_each_entry(ls, &lslist, ls_list) {
 		if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) {
 			count++;
 			continue;
 		}
-		spin_unlock(&lslist_lock);
+		spin_unlock_bh(&lslist_lock);
 		log_error(ls, "no userland control daemon, stopping lockspace");
 		dlm_ls_stop(ls);
 		goto restart;
 	}
-	spin_unlock(&lslist_lock);
+	spin_unlock_bh(&lslist_lock);
 
 	if (count)
 		log_print("dlm user daemon left %d lockspaces", count);
 }
-
diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index ab2cfbd2ea77..444dc858c4a4 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -867,36 +867,36 @@ static void process_dlm_messages(struct work_struct *work)
 {
 	struct processqueue_entry *pentry;
 
-	spin_lock(&processqueue_lock);
+	spin_lock_bh(&processqueue_lock);
 	pentry = list_first_entry_or_null(&processqueue,
 					  struct processqueue_entry, list);
 	if (WARN_ON_ONCE(!pentry)) {
 		process_dlm_messages_pending = false;
-		spin_unlock(&processqueue_lock);
+		spin_unlock_bh(&processqueue_lock);
 		return;
 	}
 
 	list_del(&pentry->list);
 	atomic_dec(&processqueue_count);
-	spin_unlock(&processqueue_lock);
+	spin_unlock_bh(&processqueue_lock);
 
 	for (;;) {
 		dlm_process_incoming_buffer(pentry->nodeid, pentry->buf,
 					    pentry->buflen);
 		free_processqueue_entry(pentry);
 
-		spin_lock(&processqueue_lock);
+		spin_lock_bh(&processqueue_lock);
 		pentry = list_first_entry_or_null(&processqueue,
 						  struct processqueue_entry, list);
 		if (!pentry) {
 			process_dlm_messages_pending = false;
-			spin_unlock(&processqueue_lock);
+			spin_unlock_bh(&processqueue_lock);
 			break;
 		}
 
 		list_del(&pentry->list);
 		atomic_dec(&processqueue_count);
-		spin_unlock(&processqueue_lock);
+		spin_unlock_bh(&processqueue_lock);
 	}
 }
 
@@ -966,14 +966,14 @@ static int receive_from_sock(struct connection *con, int buflen)
 	memmove(con->rx_leftover_buf, pentry->buf + ret,
 		con->rx_leftover);
 
-	spin_lock(&processqueue_lock);
+	spin_lock_bh(&processqueue_lock);
 	ret = atomic_inc_return(&processqueue_count);
 	list_add_tail(&pentry->list, &processqueue);
 	if (!process_dlm_messages_pending) {
 		process_dlm_messages_pending = true;
 		queue_work(process_workqueue, &process_work);
 	}
-	spin_unlock(&processqueue_lock);
+	spin_unlock_bh(&processqueue_lock);
 
 	if (ret > DLM_MAX_PROCESS_BUFFERS)
 		return DLM_IO_FLUSH;
diff --git a/fs/dlm/member.c b/fs/dlm/member.c
index ac1b555af9d6..6401916a97ef 100644
--- a/fs/dlm/member.c
+++ b/fs/dlm/member.c
@@ -630,7 +630,7 @@ int dlm_ls_stop(struct dlm_ls *ls)
 	 * message to the requestqueue without races.
 	 */
 
-	write_lock(&ls->ls_recv_active);
+	write_lock_bh(&ls->ls_recv_active);
 
 	/*
 	 * Abort any recovery that's in progress (see RECOVER_STOP,
@@ -638,23 +638,23 @@ int dlm_ls_stop(struct dlm_ls *ls)
 	 * dlm to quit any processing (see RUNNING, dlm_locking_stopped()).
 	 */
 
-	spin_lock(&ls->ls_recover_lock);
+	spin_lock_bh(&ls->ls_recover_lock);
 	set_bit(LSFL_RECOVER_STOP, &ls->ls_flags);
 	new = test_and_clear_bit(LSFL_RUNNING, &ls->ls_flags);
 	ls->ls_recover_seq++;
 
 	/* activate requestqueue and stop processing */
-	write_lock(&ls->ls_requestqueue_lock);
+	write_lock_bh(&ls->ls_requestqueue_lock);
 	set_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags);
-	write_unlock(&ls->ls_requestqueue_lock);
-	spin_unlock(&ls->ls_recover_lock);
+	write_unlock_bh(&ls->ls_requestqueue_lock);
+	spin_unlock_bh(&ls->ls_recover_lock);
 
 	/*
 	 * Let dlm_recv run again, now any normal messages will be saved on the
 	 * requestqueue for later.
 	 */
 
-	write_unlock(&ls->ls_recv_active);
+	write_unlock_bh(&ls->ls_recv_active);
 
 	/*
 	 * This in_recovery lock does two things:
@@ -679,13 +679,13 @@ int dlm_ls_stop(struct dlm_ls *ls)
 
 	dlm_recoverd_suspend(ls);
 
-	spin_lock(&ls->ls_recover_lock);
+	spin_lock_bh(&ls->ls_recover_lock);
 	kfree(ls->ls_slots);
 	ls->ls_slots = NULL;
 	ls->ls_num_slots = 0;
 	ls->ls_slots_size = 0;
 	ls->ls_recover_status = 0;
-	spin_unlock(&ls->ls_recover_lock);
+	spin_unlock_bh(&ls->ls_recover_lock);
 
 	dlm_recoverd_resume(ls);
 
@@ -719,12 +719,12 @@ int dlm_ls_start(struct dlm_ls *ls)
 	if (error < 0)
 		goto fail_rv;
 
-	spin_lock(&ls->ls_recover_lock);
+	spin_lock_bh(&ls->ls_recover_lock);
 
 	/* the lockspace needs to be stopped before it can be started */
 
 	if (!dlm_locking_stopped(ls)) {
-		spin_unlock(&ls->ls_recover_lock);
+		spin_unlock_bh(&ls->ls_recover_lock);
 		log_error(ls, "start ignored: lockspace running");
 		error = -EINVAL;
 		goto fail;
@@ -735,7 +735,7 @@ int dlm_ls_start(struct dlm_ls *ls)
 	rv->seq = ++ls->ls_recover_seq;
 	rv_old = ls->ls_recover_args;
 	ls->ls_recover_args = rv;
-	spin_unlock(&ls->ls_recover_lock);
+	spin_unlock_bh(&ls->ls_recover_lock);
 
 	if (rv_old) {
 		log_error(ls, "unused recovery %llx %d",
diff --git a/fs/dlm/midcomms.c b/fs/dlm/midcomms.c
index ed6fb9b9a582..c34f38e9ee5c 100644
--- a/fs/dlm/midcomms.c
+++ b/fs/dlm/midcomms.c
@@ -364,9 +364,9 @@ int dlm_midcomms_addr(int nodeid, struct sockaddr_storage *addr, int len)
 	node->users = 0;
 	midcomms_node_reset(node);
 
-	spin_lock(&nodes_lock);
+	spin_lock_bh(&nodes_lock);
 	hlist_add_head_rcu(&node->hlist, &node_hash[r]);
-	spin_unlock(&nodes_lock);
+	spin_unlock_bh(&nodes_lock);
 
 	node->debugfs = dlm_create_debug_comms_file(nodeid, node);
 	return 0;
@@ -477,7 +477,7 @@ static void dlm_receive_ack(struct midcomms_node *node, uint32_t seq)
 
 static void dlm_pas_fin_ack_rcv(struct midcomms_node *node)
 {
-	spin_lock(&node->state_lock);
+	spin_lock_bh(&node->state_lock);
 	pr_debug("receive passive fin ack from node %d with state %s\n",
 		 node->nodeid, dlm_state_str(node->state));
 
@@ -491,13 +491,13 @@ static void dlm_pas_fin_ack_rcv(struct midcomms_node *node)
 		wake_up(&node->shutdown_wait);
 		break;
 	default:
-		spin_unlock(&node->state_lock);
+		spin_unlock_bh(&node->state_lock);
 		log_print("%s: unexpected state: %d",
 			  __func__, node->state);
 		WARN_ON_ONCE(1);
 		return;
 	}
-	spin_unlock(&node->state_lock);
+	spin_unlock_bh(&node->state_lock);
 }
 
 static void dlm_receive_buffer_3_2_trace(uint32_t seq,
@@ -534,7 +534,7 @@ static void dlm_midcomms_receive_buffer(const union dlm_packet *p,
 	if (is_expected_seq) {
 		switch (p->header.h_cmd) {
 		case DLM_FIN:
-			spin_lock(&node->state_lock);
+			spin_lock_bh(&node->state_lock);
 			pr_debug("receive fin msg from node %d with state %s\n",
 				 node->nodeid, dlm_state_str(node->state));
 
@@ -575,13 +575,13 @@ static void dlm_midcomms_receive_buffer(const union dlm_packet *p,
 				/* probably remove_member caught it, do nothing */
 				break;
 			default:
-				spin_unlock(&node->state_lock);
+				spin_unlock_bh(&node->state_lock);
 				log_print("%s: unexpected state: %d",
 					  __func__, node->state);
 				WARN_ON_ONCE(1);
 				return;
 			}
-			spin_unlock(&node->state_lock);
+			spin_unlock_bh(&node->state_lock);
 			break;
 		default:
 			WARN_ON_ONCE(test_bit(DLM_NODE_FLAG_STOP_RX, &node->flags));
@@ -1182,7 +1182,7 @@ void dlm_midcomms_exit(void)
 
 static void dlm_act_fin_ack_rcv(struct midcomms_node *node)
 {
-	spin_lock(&node->state_lock);
+	spin_lock_bh(&node->state_lock);
 	pr_debug("receive active fin ack from node %d with state %s\n",
 		 node->nodeid, dlm_state_str(node->state));
 
@@ -1202,13 +1202,13 @@ static void dlm_act_fin_ack_rcv(struct midcomms_node *node)
 		wake_up(&node->shutdown_wait);
 		break;
 	default:
-		spin_unlock(&node->state_lock);
+		spin_unlock_bh(&node->state_lock);
 		log_print("%s: unexpected state: %d",
 			  __func__, node->state);
 		WARN_ON_ONCE(1);
 		return;
 	}
-	spin_unlock(&node->state_lock);
+	spin_unlock_bh(&node->state_lock);
 }
 
 void dlm_midcomms_add_member(int nodeid)
@@ -1223,7 +1223,7 @@ void dlm_midcomms_add_member(int nodeid)
 		return;
 	}
 
-	spin_lock(&node->state_lock);
+	spin_lock_bh(&node->state_lock);
 	if (!node->users) {
 		pr_debug("receive add member from node %d with state %s\n",
 			 node->nodeid, dlm_state_str(node->state));
@@ -1251,7 +1251,7 @@ void dlm_midcomms_add_member(int nodeid)
 
 	node->users++;
 	pr_debug("node %d users inc count %d\n", nodeid, node->users);
-	spin_unlock(&node->state_lock);
+	spin_unlock_bh(&node->state_lock);
 
 	srcu_read_unlock(&nodes_srcu, idx);
 }
@@ -1269,13 +1269,13 @@ void dlm_midcomms_remove_member(int nodeid)
 		return;
 	}
 
-	spin_lock(&node->state_lock);
+	spin_lock_bh(&node->state_lock);
 	/* case of dlm_midcomms_addr() created node but
 	 * was not added before because dlm_midcomms_close()
 	 * removed the node
 	 */
 	if (!node->users) {
-		spin_unlock(&node->state_lock);
+		spin_unlock_bh(&node->state_lock);
 		srcu_read_unlock(&nodes_srcu, idx);
 		return;
 	}
@@ -1313,7 +1313,7 @@ void dlm_midcomms_remove_member(int nodeid)
 			break;
 		}
 	}
-	spin_unlock(&node->state_lock);
+	spin_unlock_bh(&node->state_lock);
 
 	srcu_read_unlock(&nodes_srcu, idx);
 }
@@ -1351,7 +1351,7 @@ static void midcomms_shutdown(struct midcomms_node *node)
 		return;
 	}
 
-	spin_lock(&node->state_lock);
+	spin_lock_bh(&node->state_lock);
 	pr_debug("receive active shutdown for node %d with state %s\n",
 		 node->nodeid, dlm_state_str(node->state));
 	switch (node->state) {
@@ -1370,7 +1370,7 @@ static void midcomms_shutdown(struct midcomms_node *node)
 		 */
 		break;
 	}
-	spin_unlock(&node->state_lock);
+	spin_unlock_bh(&node->state_lock);
 
 	if (DLM_DEBUG_FENCE_TERMINATION)
 		msleep(5000);
@@ -1441,9 +1441,9 @@ int dlm_midcomms_close(int nodeid)
 	ret = dlm_lowcomms_close(nodeid);
 	dlm_delete_debug_comms_file(node->debugfs);
 
-	spin_lock(&nodes_lock);
+	spin_lock_bh(&nodes_lock);
 	hlist_del_rcu(&node->hlist);
-	spin_unlock(&nodes_lock);
+	spin_unlock_bh(&nodes_lock);
 	srcu_read_unlock(&nodes_srcu, idx);
 
 	/* wait that all readers left until flush send queue */
diff --git a/fs/dlm/rcom.c b/fs/dlm/rcom.c
index 2e3f529f3ff2..be1a71a6303a 100644
--- a/fs/dlm/rcom.c
+++ b/fs/dlm/rcom.c
@@ -143,18 +143,18 @@ static int check_rcom_config(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
 
 static void allow_sync_reply(struct dlm_ls *ls, __le64 *new_seq)
 {
-	spin_lock(&ls->ls_rcom_spin);
+	spin_lock_bh(&ls->ls_rcom_spin);
 	*new_seq = cpu_to_le64(++ls->ls_rcom_seq);
 	set_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
-	spin_unlock(&ls->ls_rcom_spin);
+	spin_unlock_bh(&ls->ls_rcom_spin);
 }
 
 static void disallow_sync_reply(struct dlm_ls *ls)
 {
-	spin_lock(&ls->ls_rcom_spin);
+	spin_lock_bh(&ls->ls_rcom_spin);
 	clear_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
 	clear_bit(LSFL_RCOM_READY, &ls->ls_flags);
-	spin_unlock(&ls->ls_rcom_spin);
+	spin_unlock_bh(&ls->ls_rcom_spin);
 }
 
 /*
@@ -245,10 +245,10 @@ static void receive_rcom_status(struct dlm_ls *ls,
 		goto do_create;
 	}
 
-	spin_lock(&ls->ls_recover_lock);
+	spin_lock_bh(&ls->ls_recover_lock);
 	status = ls->ls_recover_status;
 	num_slots = ls->ls_num_slots;
-	spin_unlock(&ls->ls_recover_lock);
+	spin_unlock_bh(&ls->ls_recover_lock);
 	len += num_slots * sizeof(struct rcom_slot);
 
  do_create:
@@ -266,9 +266,9 @@ static void receive_rcom_status(struct dlm_ls *ls,
 	if (!num_slots)
 		goto do_send;
 
-	spin_lock(&ls->ls_recover_lock);
+	spin_lock_bh(&ls->ls_recover_lock);
 	if (ls->ls_num_slots != num_slots) {
-		spin_unlock(&ls->ls_recover_lock);
+		spin_unlock_bh(&ls->ls_recover_lock);
 		log_debug(ls, "receive_rcom_status num_slots %d to %d",
 			  num_slots, ls->ls_num_slots);
 		rc->rc_result = 0;
@@ -277,7 +277,7 @@ static void receive_rcom_status(struct dlm_ls *ls,
 	}
 
 	dlm_slots_copy_out(ls, rc);
-	spin_unlock(&ls->ls_recover_lock);
+	spin_unlock_bh(&ls->ls_recover_lock);
 
  do_send:
 	send_rcom_stateless(msg, rc);
@@ -285,7 +285,7 @@ static void receive_rcom_status(struct dlm_ls *ls,
 
 static void receive_sync_reply(struct dlm_ls *ls, const struct dlm_rcom *rc_in)
 {
-	spin_lock(&ls->ls_rcom_spin);
+	spin_lock_bh(&ls->ls_rcom_spin);
 	if (!test_bit(LSFL_RCOM_WAIT, &ls->ls_flags) ||
 	    le64_to_cpu(rc_in->rc_id) != ls->ls_rcom_seq) {
 		log_debug(ls, "reject reply %d from %d seq %llx expect %llx",
@@ -301,7 +301,7 @@ static void receive_sync_reply(struct dlm_ls *ls, const struct dlm_rcom *rc_in)
 	clear_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
 	wake_up(&ls->ls_wait_general);
  out:
-	spin_unlock(&ls->ls_rcom_spin);
+	spin_unlock_bh(&ls->ls_rcom_spin);
 }
 
 int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name,
@@ -613,11 +613,11 @@ void dlm_receive_rcom(struct dlm_ls *ls, const struct dlm_rcom *rc, int nodeid)
 		break;
 	}
 
-	spin_lock(&ls->ls_recover_lock);
+	spin_lock_bh(&ls->ls_recover_lock);
 	status = ls->ls_recover_status;
 	stop = dlm_recovery_stopped(ls);
 	seq = ls->ls_recover_seq;
-	spin_unlock(&ls->ls_recover_lock);
+	spin_unlock_bh(&ls->ls_recover_lock);
 
 	if (stop && (rc->rc_type != cpu_to_le32(DLM_RCOM_STATUS)))
 		goto ignore;
diff --git a/fs/dlm/recover.c b/fs/dlm/recover.c
index 172c6b73f37a..13bc845fa305 100644
--- a/fs/dlm/recover.c
+++ b/fs/dlm/recover.c
@@ -74,9 +74,9 @@ int dlm_wait_function(struct dlm_ls *ls, int (*testfn) (struct dlm_ls *ls))
 uint32_t dlm_recover_status(struct dlm_ls *ls)
 {
 	uint32_t status;
-	spin_lock(&ls->ls_recover_lock);
+	spin_lock_bh(&ls->ls_recover_lock);
 	status = ls->ls_recover_status;
-	spin_unlock(&ls->ls_recover_lock);
+	spin_unlock_bh(&ls->ls_recover_lock);
 	return status;
 }
 
@@ -87,9 +87,9 @@ static void _set_recover_status(struct dlm_ls *ls, uint32_t status)
 
 void dlm_set_recover_status(struct dlm_ls *ls, uint32_t status)
 {
-	spin_lock(&ls->ls_recover_lock);
+	spin_lock_bh(&ls->ls_recover_lock);
 	_set_recover_status(ls, status);
-	spin_unlock(&ls->ls_recover_lock);
+	spin_unlock_bh(&ls->ls_recover_lock);
 }
 
 static int wait_status_all(struct dlm_ls *ls, uint32_t wait_status,
@@ -188,13 +188,13 @@ int dlm_recover_members_wait(struct dlm_ls *ls, uint64_t seq)
 
 		rv = dlm_slots_assign(ls, &num_slots, &slots_size, &slots, &gen);
 		if (!rv) {
-			spin_lock(&ls->ls_recover_lock);
+			spin_lock_bh(&ls->ls_recover_lock);
 			_set_recover_status(ls, DLM_RS_NODES_ALL);
 			ls->ls_num_slots = num_slots;
 			ls->ls_slots_size = slots_size;
 			ls->ls_slots = slots;
 			ls->ls_generation = gen;
-			spin_unlock(&ls->ls_recover_lock);
+			spin_unlock_bh(&ls->ls_recover_lock);
 		} else {
 			dlm_set_recover_status(ls, DLM_RS_NODES_ALL);
 		}
@@ -241,9 +241,9 @@ static int recover_list_empty(struct dlm_ls *ls)
 {
 	int empty;
 
-	spin_lock(&ls->ls_recover_list_lock);
+	spin_lock_bh(&ls->ls_recover_list_lock);
 	empty = list_empty(&ls->ls_recover_list);
-	spin_unlock(&ls->ls_recover_list_lock);
+	spin_unlock_bh(&ls->ls_recover_list_lock);
 
 	return empty;
 }
@@ -252,23 +252,23 @@ static void recover_list_add(struct dlm_rsb *r)
 {
 	struct dlm_ls *ls = r->res_ls;
 
-	spin_lock(&ls->ls_recover_list_lock);
+	spin_lock_bh(&ls->ls_recover_list_lock);
 	if (list_empty(&r->res_recover_list)) {
 		list_add_tail(&r->res_recover_list, &ls->ls_recover_list);
 		ls->ls_recover_list_count++;
 		dlm_hold_rsb(r);
 	}
-	spin_unlock(&ls->ls_recover_list_lock);
+	spin_unlock_bh(&ls->ls_recover_list_lock);
 }
 
 static void recover_list_del(struct dlm_rsb *r)
 {
 	struct dlm_ls *ls = r->res_ls;
 
-	spin_lock(&ls->ls_recover_list_lock);
+	spin_lock_bh(&ls->ls_recover_list_lock);
 	list_del_init(&r->res_recover_list);
 	ls->ls_recover_list_count--;
-	spin_unlock(&ls->ls_recover_list_lock);
+	spin_unlock_bh(&ls->ls_recover_list_lock);
 
 	dlm_put_rsb(r);
 }
@@ -277,7 +277,7 @@ static void recover_list_clear(struct dlm_ls *ls)
 {
 	struct dlm_rsb *r, *s;
 
-	spin_lock(&ls->ls_recover_list_lock);
+	spin_lock_bh(&ls->ls_recover_list_lock);
 	list_for_each_entry_safe(r, s, &ls->ls_recover_list, res_recover_list) {
 		list_del_init(&r->res_recover_list);
 		r->res_recover_locks_count = 0;
@@ -290,17 +290,17 @@ static void recover_list_clear(struct dlm_ls *ls)
 			  ls->ls_recover_list_count);
 		ls->ls_recover_list_count = 0;
 	}
-	spin_unlock(&ls->ls_recover_list_lock);
+	spin_unlock_bh(&ls->ls_recover_list_lock);
 }
 
 static int recover_idr_empty(struct dlm_ls *ls)
 {
 	int empty = 1;
 
-	spin_lock(&ls->ls_recover_idr_lock);
+	spin_lock_bh(&ls->ls_recover_idr_lock);
 	if (ls->ls_recover_list_count)
 		empty = 0;
-	spin_unlock(&ls->ls_recover_idr_lock);
+	spin_unlock_bh(&ls->ls_recover_idr_lock);
 
 	return empty;
 }
@@ -310,7 +310,7 @@ static int recover_idr_add(struct dlm_rsb *r)
 	struct dlm_ls *ls = r->res_ls;
 	int rv;
 
-	spin_lock(&ls->ls_recover_idr_lock);
+	spin_lock_bh(&ls->ls_recover_idr_lock);
 	if (r->res_id) {
 		rv = -1;
 		goto out_unlock;
@@ -324,7 +324,7 @@ static int recover_idr_add(struct dlm_rsb *r)
 	dlm_hold_rsb(r);
 	rv = 0;
 out_unlock:
-	spin_unlock(&ls->ls_recover_idr_lock);
+	spin_unlock_bh(&ls->ls_recover_idr_lock);
 	return rv;
 }
 
@@ -332,11 +332,11 @@ static void recover_idr_del(struct dlm_rsb *r)
 {
 	struct dlm_ls *ls = r->res_ls;
 
-	spin_lock(&ls->ls_recover_idr_lock);
+	spin_lock_bh(&ls->ls_recover_idr_lock);
 	idr_remove(&ls->ls_recover_idr, r->res_id);
 	r->res_id = 0;
 	ls->ls_recover_list_count--;
-	spin_unlock(&ls->ls_recover_idr_lock);
+	spin_unlock_bh(&ls->ls_recover_idr_lock);
 
 	dlm_put_rsb(r);
 }
@@ -345,9 +345,9 @@ static struct dlm_rsb *recover_idr_find(struct dlm_ls *ls, uint64_t id)
 {
 	struct dlm_rsb *r;
 
-	spin_lock(&ls->ls_recover_idr_lock);
+	spin_lock_bh(&ls->ls_recover_idr_lock);
 	r = idr_find(&ls->ls_recover_idr, (int)id);
-	spin_unlock(&ls->ls_recover_idr_lock);
+	spin_unlock_bh(&ls->ls_recover_idr_lock);
 	return r;
 }
 
@@ -356,7 +356,7 @@ static void recover_idr_clear(struct dlm_ls *ls)
 	struct dlm_rsb *r;
 	int id;
 
-	spin_lock(&ls->ls_recover_idr_lock);
+	spin_lock_bh(&ls->ls_recover_idr_lock);
 
 	idr_for_each_entry(&ls->ls_recover_idr, r, id) {
 		idr_remove(&ls->ls_recover_idr, id);
@@ -372,7 +372,7 @@ static void recover_idr_clear(struct dlm_ls *ls)
 			  ls->ls_recover_list_count);
 		ls->ls_recover_list_count = 0;
 	}
-	spin_unlock(&ls->ls_recover_idr_lock);
+	spin_unlock_bh(&ls->ls_recover_idr_lock);
 }
 
 
@@ -887,7 +887,7 @@ void dlm_clear_toss(struct dlm_ls *ls)
 	int i;
 
 	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
-		spin_lock(&ls->ls_rsbtbl[i].lock);
+		spin_lock_bh(&ls->ls_rsbtbl[i].lock);
 		for (n = rb_first(&ls->ls_rsbtbl[i].toss); n; n = next) {
 			next = rb_next(n);
 			r = rb_entry(n, struct dlm_rsb, res_hashnode);
@@ -895,7 +895,7 @@ void dlm_clear_toss(struct dlm_ls *ls)
 			dlm_free_rsb(r);
 			count++;
 		}
-		spin_unlock(&ls->ls_rsbtbl[i].lock);
+		spin_unlock_bh(&ls->ls_rsbtbl[i].lock);
 	}
 
 	if (count)
diff --git a/fs/dlm/recoverd.c b/fs/dlm/recoverd.c
index 361327762c1b..a18738b74261 100644
--- a/fs/dlm/recoverd.c
+++ b/fs/dlm/recoverd.c
@@ -26,7 +26,7 @@ static int dlm_create_masters_list(struct dlm_ls *ls)
 	struct dlm_rsb *r;
 	int i, error = 0;
 
-	write_lock(&ls->ls_masters_lock);
+	write_lock_bh(&ls->ls_masters_lock);
 	if (!list_empty(&ls->ls_masters_list)) {
 		log_error(ls, "root list not empty");
 		error = -EINVAL;
@@ -46,7 +46,7 @@ static int dlm_create_masters_list(struct dlm_ls *ls)
 		spin_unlock_bh(&ls->ls_rsbtbl[i].lock);
 	}
  out:
-	write_unlock(&ls->ls_masters_lock);
+	write_unlock_bh(&ls->ls_masters_lock);
 	return error;
 }
 
@@ -54,12 +54,12 @@ static void dlm_release_masters_list(struct dlm_ls *ls)
 {
 	struct dlm_rsb *r, *safe;
 
-	write_lock(&ls->ls_masters_lock);
+	write_lock_bh(&ls->ls_masters_lock);
 	list_for_each_entry_safe(r, safe, &ls->ls_masters_list, res_masters_list) {
 		list_del_init(&r->res_masters_list);
 		dlm_put_rsb(r);
 	}
-	write_unlock(&ls->ls_masters_lock);
+	write_unlock_bh(&ls->ls_masters_lock);
 }
 
 static void dlm_create_root_list(struct dlm_ls *ls, struct list_head *root_list)
@@ -103,9 +103,9 @@ static int enable_locking(struct dlm_ls *ls, uint64_t seq)
 {
 	int error = -EINTR;
 
-	write_lock(&ls->ls_recv_active);
+	write_lock_bh(&ls->ls_recv_active);
 
-	spin_lock(&ls->ls_recover_lock);
+	spin_lock_bh(&ls->ls_recover_lock);
 	if (ls->ls_recover_seq == seq) {
 		set_bit(LSFL_RUNNING, &ls->ls_flags);
 		/* unblocks processes waiting to enter the dlm */
@@ -113,9 +113,9 @@ static int enable_locking(struct dlm_ls *ls, uint64_t seq)
 		clear_bit(LSFL_RECOVER_LOCK, &ls->ls_flags);
 		error = 0;
 	}
-	spin_unlock(&ls->ls_recover_lock);
+	spin_unlock_bh(&ls->ls_recover_lock);
 
-	write_unlock(&ls->ls_recv_active);
+	write_unlock_bh(&ls->ls_recv_active);
 	return error;
 }
 
@@ -348,12 +348,12 @@ static void do_ls_recovery(struct dlm_ls *ls)
 	struct dlm_recover *rv = NULL;
 	int error;
 
-	spin_lock(&ls->ls_recover_lock);
+	spin_lock_bh(&ls->ls_recover_lock);
 	rv = ls->ls_recover_args;
 	ls->ls_recover_args = NULL;
 	if (rv && ls->ls_recover_seq == rv->seq)
 		clear_bit(LSFL_RECOVER_STOP, &ls->ls_flags);
-	spin_unlock(&ls->ls_recover_lock);
+	spin_unlock_bh(&ls->ls_recover_lock);
 
 	if (rv) {
 		error = ls_recover(ls, rv);
diff --git a/fs/dlm/requestqueue.c b/fs/dlm/requestqueue.c
index 9b646026df46..719a5243a069 100644
--- a/fs/dlm/requestqueue.c
+++ b/fs/dlm/requestqueue.c
@@ -68,7 +68,7 @@ int dlm_process_requestqueue(struct dlm_ls *ls)
 	struct dlm_message *ms;
 	int error = 0;
 
-	write_lock(&ls->ls_requestqueue_lock);
+	write_lock_bh(&ls->ls_requestqueue_lock);
 	for (;;) {
 		if (list_empty(&ls->ls_requestqueue)) {
 			clear_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags);
@@ -96,11 +96,11 @@ int dlm_process_requestqueue(struct dlm_ls *ls)
 			error = -EINTR;
 			break;
 		}
-		write_unlock(&ls->ls_requestqueue_lock);
+		write_unlock_bh(&ls->ls_requestqueue_lock);
 		schedule();
-		write_lock(&ls->ls_requestqueue_lock);
+		write_lock_bh(&ls->ls_requestqueue_lock);
 	}
-	write_unlock(&ls->ls_requestqueue_lock);
+	write_unlock_bh(&ls->ls_requestqueue_lock);
 
 	return error;
 }
@@ -135,7 +135,7 @@ void dlm_purge_requestqueue(struct dlm_ls *ls)
 	struct dlm_message *ms;
 	struct rq_entry *e, *safe;
 
-	write_lock(&ls->ls_requestqueue_lock);
+	write_lock_bh(&ls->ls_requestqueue_lock);
 	list_for_each_entry_safe(e, safe, &ls->ls_requestqueue, list) {
 		ms =  &e->request;
 
@@ -144,6 +144,6 @@ void dlm_purge_requestqueue(struct dlm_ls *ls)
 			kfree(e);
 		}
 	}
-	write_unlock(&ls->ls_requestqueue_lock);
+	write_unlock_bh(&ls->ls_requestqueue_lock);
 }
 
diff --git a/fs/dlm/user.c b/fs/dlm/user.c
index 695e691b38b3..c0d35678ee54 100644
--- a/fs/dlm/user.c
+++ b/fs/dlm/user.c
@@ -206,7 +206,7 @@ void dlm_user_add_ast(struct dlm_lkb *lkb, uint32_t flags, int mode,
 		return;
 
 	ls = lkb->lkb_resource->res_ls;
-	spin_lock(&ls->ls_clear_proc_locks);
+	spin_lock_bh(&ls->ls_clear_proc_locks);
 
 	/* If ORPHAN/DEAD flag is set, it means the process is dead so an ast
 	   can't be delivered.  For ORPHAN's, dlm_clear_proc_locks() freed
@@ -228,12 +228,12 @@ void dlm_user_add_ast(struct dlm_lkb *lkb, uint32_t flags, int mode,
 	if ((flags & DLM_CB_CAST) && lkb_is_endoflife(mode, status))
 		set_bit(DLM_IFL_ENDOFLIFE_BIT, &lkb->lkb_iflags);
 
-	spin_lock(&proc->asts_spin);
+	spin_lock_bh(&proc->asts_spin);
 
 	rv = dlm_enqueue_lkb_callback(lkb, flags, mode, status, sbflags);
 	switch (rv) {
 	case DLM_ENQUEUE_CALLBACK_FAILURE:
-		spin_unlock(&proc->asts_spin);
+		spin_unlock_bh(&proc->asts_spin);
 		WARN_ON_ONCE(1);
 		goto out;
 	case DLM_ENQUEUE_CALLBACK_NEED_SCHED:
@@ -247,19 +247,19 @@ void dlm_user_add_ast(struct dlm_lkb *lkb, uint32_t flags, int mode,
 		WARN_ON_ONCE(1);
 		break;
 	}
-	spin_unlock(&proc->asts_spin);
+	spin_unlock_bh(&proc->asts_spin);
 
 	if (test_bit(DLM_IFL_ENDOFLIFE_BIT, &lkb->lkb_iflags)) {
 		/* N.B. spin_lock locks_spin, not asts_spin */
-		spin_lock(&proc->locks_spin);
+		spin_lock_bh(&proc->locks_spin);
 		if (!list_empty(&lkb->lkb_ownqueue)) {
 			list_del_init(&lkb->lkb_ownqueue);
 			dlm_put_lkb(lkb);
 		}
-		spin_unlock(&proc->locks_spin);
+		spin_unlock_bh(&proc->locks_spin);
 	}
  out:
-	spin_unlock(&ls->ls_clear_proc_locks);
+	spin_unlock_bh(&ls->ls_clear_proc_locks);
 }
 
 static int device_user_lock(struct dlm_user_proc *proc,
@@ -832,10 +832,10 @@ static ssize_t device_read(struct file *file, char __user *buf, size_t count,
 	if (test_bit(DLM_PROC_FLAGS_CLOSING, &proc->flags))
 		return -EINVAL;
 
-	spin_lock(&proc->asts_spin);
+	spin_lock_bh(&proc->asts_spin);
 	if (list_empty(&proc->asts)) {
 		if (file->f_flags & O_NONBLOCK) {
-			spin_unlock(&proc->asts_spin);
+			spin_unlock_bh(&proc->asts_spin);
 			return -EAGAIN;
 		}
 
@@ -844,16 +844,16 @@ static ssize_t device_read(struct file *file, char __user *buf, size_t count,
 	repeat:
 		set_current_state(TASK_INTERRUPTIBLE);
 		if (list_empty(&proc->asts) && !signal_pending(current)) {
-			spin_unlock(&proc->asts_spin);
+			spin_unlock_bh(&proc->asts_spin);
 			schedule();
-			spin_lock(&proc->asts_spin);
+			spin_lock_bh(&proc->asts_spin);
 			goto repeat;
 		}
 		set_current_state(TASK_RUNNING);
 		remove_wait_queue(&proc->wait, &wait);
 
 		if (signal_pending(current)) {
-			spin_unlock(&proc->asts_spin);
+			spin_unlock_bh(&proc->asts_spin);
 			return -ERESTARTSYS;
 		}
 	}
@@ -875,7 +875,7 @@ static ssize_t device_read(struct file *file, char __user *buf, size_t count,
 		 */
 		log_print("dlm_rem_lkb_callback empty %x", lkb->lkb_id);
 		list_del_init(&lkb->lkb_cb_list);
-		spin_unlock(&proc->asts_spin);
+		spin_unlock_bh(&proc->asts_spin);
 		/* removes ref for proc->asts, may cause lkb to be freed */
 		dlm_put_lkb(lkb);
 		WARN_ON_ONCE(1);
@@ -890,7 +890,7 @@ static ssize_t device_read(struct file *file, char __user *buf, size_t count,
 		WARN_ON_ONCE(1);
 		break;
 	}
-	spin_unlock(&proc->asts_spin);
+	spin_unlock_bh(&proc->asts_spin);
 
 	if (cb->flags & DLM_CB_BAST) {
 		trace_dlm_bast(lkb->lkb_resource->res_ls, lkb, cb->mode);
@@ -925,12 +925,12 @@ static __poll_t device_poll(struct file *file, poll_table *wait)
 
 	poll_wait(file, &proc->wait, wait);
 
-	spin_lock(&proc->asts_spin);
+	spin_lock_bh(&proc->asts_spin);
 	if (!list_empty(&proc->asts)) {
-		spin_unlock(&proc->asts_spin);
+		spin_unlock_bh(&proc->asts_spin);
 		return EPOLLIN | EPOLLRDNORM;
 	}
-	spin_unlock(&proc->asts_spin);
+	spin_unlock_bh(&proc->asts_spin);
 	return 0;
 }
 
-- 
2.43.0



* [PATCHv3 v6.8-rc6 18/18] dlm: do dlm message processing in softirq context
  2024-02-27  1:48 [PATCHv3 v6.8-rc6 00/18] dlm: bring message parsing to softirq context Alexander Aring
                   ` (16 preceding siblings ...)
  2024-02-27  1:49 ` [PATCHv3 v6.8-rc6 17/18] dlm: convert message parsing locks to disable bh Alexander Aring
@ 2024-02-27  1:49 ` Alexander Aring
  17 siblings, 0 replies; 19+ messages in thread
From: Alexander Aring @ 2024-02-27  1:49 UTC (permalink / raw
  To: teigland; +Cc: gfs2, aahringo

This patch moves the dlm message processing from an ordered workqueue
context to an ordered softirq context. Later we want to call the user
defined ast/bast callbacks directly inside the dlm message processing
context instead of doing an additional context switch to the existing
callback workqueue. This should slightly improve the dlm message parsing
behaviour. There are three main reasons for changing to this behaviour:

1. Allow fewer scheduling possibilities in the dlm message parsing
   context. This should deliver faster DLM user responses to ast/bast
   callbacks. Interrupting the processing of lock requests less often
   avoids situations in which a lock request that triggers a new lock
   request never finishes. In future the DLM callback workqueue can be
   disabled by a kernel lockspace flag to signal that the DLM kernel
   user is capable of executing the callbacks in softirq context. If
   this flag is set, the dlm processing gets rid of an additional
   queue_work() context switch and takes full advantage of the new
   softirq context, because the last preemption possibility is removed
   from the message processing context.

2. Bringing the ast/bast callbacks into softirq context makes the user
   aware that they must not block in this context. Later patches will
   introduce a per lockspace flag to signal that the user is capable of
   handling these callbacks in softirq context, to preserve backwards
   compatibility.

3. We can easily switch to concurrent dlm message parsing per dlm
   instance when DLM is ready to handle it. This instance could be
   e.g. a per lockspace instance, or a more fine-grained instance such
   as a per lock instance.

Further patches will unveil more improvements that switch to a per
message softirq parsing context, especially once DLM is in a state
that allows concurrent message parsing. A minimal sketch of the
workqueue-to-tasklet pattern used here is shown below.
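
Below is a minimal, self-contained sketch of that pattern, assuming a
modern kernel with DECLARE_TASKLET_DISABLED() and the
struct tasklet_struct callback signature; struct entry, process(),
enqueue() and flush() are illustrative placeholders for this sketch,
not actual dlm symbols:

#include <linux/interrupt.h>
#include <linux/list.h>
#include <linux/slab.h>
#include <linux/spinlock.h>

struct entry {
	struct list_head list;
};

static LIST_HEAD(queue);
static DEFINE_SPINLOCK(queue_lock);
static bool pending;

/* runs in softirq context, so it must never sleep */
static void process(struct tasklet_struct *t)
{
	struct entry *e;

	spin_lock_bh(&queue_lock);
	while ((e = list_first_entry_or_null(&queue, struct entry, list))) {
		list_del(&e->list);
		spin_unlock_bh(&queue_lock);

		/* parse one message here, then drop the entry */
		kfree(e);

		spin_lock_bh(&queue_lock);
	}
	pending = false;
	spin_unlock_bh(&queue_lock);
}

/* starts disabled, enabled once the subsystem is up */
static DECLARE_TASKLET_DISABLED(process_tasklet, process);

/* producer side, e.g. called from the receive path */
static void enqueue(struct entry *e)
{
	spin_lock_bh(&queue_lock);	/* _bh: lock is shared with softirq */
	list_add_tail(&e->list, &queue);
	if (!pending) {
		pending = true;
		tasklet_schedule(&process_tasklet);
	}
	spin_unlock_bh(&queue_lock);
}

/* stand-in for the removed flush_workqueue() call */
static void flush(void)
{
	tasklet_disable(&process_tasklet);
	tasklet_enable(&process_tasklet);
}

One subtlety of the flush() stand-in: tasklet_disable() waits only for
a callback instance that is already running; entries still sitting on
the queue are processed after tasklet_enable(), while the replaced
flush_workqueue() also drained queued work.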

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/lowcomms.c | 34 ++++++++++++----------------------
 1 file changed, 12 insertions(+), 22 deletions(-)

diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index 444dc858c4a4..303d2837f68b 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -183,7 +183,6 @@ static int dlm_local_count;
 
 /* Work queues */
 static struct workqueue_struct *io_workqueue;
-static struct workqueue_struct *process_workqueue;
 
 static struct hlist_head connection_hash[CONN_HASH_SIZE];
 static DEFINE_SPINLOCK(connections_lock);
@@ -199,9 +198,9 @@ static const struct dlm_proto_ops *dlm_proto_ops;
 
 static void process_recv_sockets(struct work_struct *work);
 static void process_send_sockets(struct work_struct *work);
-static void process_dlm_messages(struct work_struct *work);
+static void process_dlm_messages(struct tasklet_struct *tasklet);
 
-static DECLARE_WORK(process_work, process_dlm_messages);
+static DECLARE_TASKLET_DISABLED(process_tasklet, process_dlm_messages);
 static DEFINE_SPINLOCK(processqueue_lock);
 static bool process_dlm_messages_pending;
 static atomic_t processqueue_count;
@@ -863,7 +862,7 @@ struct dlm_processed_nodes {
 	struct list_head list;
 };
 
-static void process_dlm_messages(struct work_struct *work)
+static void process_dlm_messages(struct tasklet_struct *tasklet)
 {
 	struct processqueue_entry *pentry;
 
@@ -971,7 +970,7 @@ static int receive_from_sock(struct connection *con, int buflen)
 	list_add_tail(&pentry->list, &processqueue);
 	if (!process_dlm_messages_pending) {
 		process_dlm_messages_pending = true;
-		queue_work(process_workqueue, &process_work);
+		tasklet_schedule(&process_tasklet);
 	}
 	spin_unlock_bh(&processqueue_lock);
 
@@ -1511,7 +1510,8 @@ static void process_recv_sockets(struct work_struct *work)
 		/* CF_RECV_PENDING cleared */
 		break;
 	case DLM_IO_FLUSH:
-		flush_workqueue(process_workqueue);
+		tasklet_disable(&process_tasklet);
+		tasklet_enable(&process_tasklet);
 		fallthrough;
 	case DLM_IO_RESCHED:
 		cond_resched();
@@ -1686,10 +1686,7 @@ static void work_stop(void)
 		io_workqueue = NULL;
 	}
 
-	if (process_workqueue) {
-		destroy_workqueue(process_workqueue);
-		process_workqueue = NULL;
-	}
+	tasklet_disable(&process_tasklet);
 }
 
 static int work_start(void)
@@ -1701,17 +1698,7 @@ static int work_start(void)
 		return -ENOMEM;
 	}
 
-	/* ordered dlm message process queue,
-	 * should be converted to a tasklet
-	 */
-	process_workqueue = alloc_ordered_workqueue("dlm_process",
-						    WQ_HIGHPRI | WQ_MEM_RECLAIM);
-	if (!process_workqueue) {
-		log_print("can't start dlm_process");
-		destroy_workqueue(io_workqueue);
-		io_workqueue = NULL;
-		return -ENOMEM;
-	}
+	tasklet_enable(&process_tasklet);
 
 	return 0;
 }
@@ -1734,7 +1721,10 @@ void dlm_lowcomms_shutdown(void)
 		hlist_for_each_entry_rcu(con, &connection_hash[i], list) {
 			shutdown_connection(con, true);
 			stop_connection_io(con);
-			flush_workqueue(process_workqueue);
+
+			tasklet_disable(&process_tasklet);
+			tasklet_enable(&process_tasklet);
+
 			close_connection(con, true);
 
 			clean_one_writequeue(con);
-- 
2.43.0



Thread overview: 19+ messages
2024-02-27  1:48 [PATCHv3 v6.8-rc6 00/18] dlm: bring message parsing to softirq context Alexander Aring
2024-02-27  1:48 ` [PATCHv3 v6.8-rc6 01/18] fs: dlm: Simplify the allocation of slab caches in dlm_midcomms_cache_create Alexander Aring
2024-02-27  1:48 ` [PATCHv3 v6.8-rc6 02/18] fs: dlm: Simplify the allocation of slab caches in dlm_lowcomms_msg_cache_create Alexander Aring
2024-02-27  1:48 ` [PATCHv3 v6.8-rc6 03/18] dlm: fix off-by-one waiters refcount handling Alexander Aring
2024-02-27  1:48 ` [PATCHv3 v6.8-rc6 04/18] dlm: put lkbs instead of force free Alexander Aring
2024-02-27  1:48 ` [PATCHv3 v6.8-rc6 05/18] dlm: remove allocation parameter in msg allocation Alexander Aring
2024-02-27  1:48 ` [PATCHv3 v6.8-rc6 06/18] dlm: switch to GFP_ATOMIC in dlm allocations Alexander Aring
2024-02-27  1:48 ` [PATCHv3 v6.8-rc6 07/18] dlm: move root_list functionality to recover.c Alexander Aring
2024-02-27  1:48 ` [PATCHv3 v6.8-rc6 08/18] dlm: move master dir dump to own list Alexander Aring
2024-02-27  1:49 ` [PATCHv3 v6.8-rc6 09/18] dlm: move root_list to ls_recover() stack Alexander Aring
2024-02-27  1:49 ` [PATCHv3 v6.8-rc6 10/18] dlm: implement directory dump context Alexander Aring
2024-02-27  1:49 ` [PATCHv3 v6.8-rc6 11/18] dlm: drop holding waiters mutex in waiters recovery Alexander Aring
2024-02-27  1:49 ` [PATCHv3 v6.8-rc6 12/18] dlm: convert ls_waiters_mutex to spinlock Alexander Aring
2024-02-27  1:49 ` [PATCHv3 v6.8-rc6 13/18] dlm: convert res_lock " Alexander Aring
2024-02-27  1:49 ` [PATCHv3 v6.8-rc6 14/18] dlm: make requestqueue handling non sleepable Alexander Aring
2024-02-27  1:49 ` [PATCHv3 v6.8-rc6 15/18] dlm: ls_recv_active semaphore to rwlock Alexander Aring
2024-02-27  1:49 ` [PATCHv3 v6.8-rc6 16/18] dlm: remove schedule in dlm receive path Alexander Aring
2024-02-27  1:49 ` [PATCHv3 v6.8-rc6 17/18] dlm: convert message parsing locks to disable bh Alexander Aring
2024-02-27  1:49 ` [PATCHv3 v6.8-rc6 18/18] dlm: do dlm message processing in softirq context Alexander Aring
