Linux-EROFS Archive mirror
 help / color / mirror / Atom feed
From: Yifan Zhao <zhaoyifan@sjtu.edu.cn>
To: linux-erofs@lists.ozlabs.org
Cc: hsiangkao@linux.alibaba.com, xin_tong@sjtu.edu.cn
Subject: [PATCH 1/7] erofs-utils: introduce multi-threading framework
Date: Sun,  4 Feb 2024 18:33:31 +0800	[thread overview]
Message-ID: <20240204103331.141298-1-zhaoyifan@sjtu.edu.cn> (raw)

From: Gao Xiang <hsiangkao@linux.alibaba.com>

Add a workqueue implementation based on xfsprogs for multi-threading
support.

Signed-off-by: Gao Xiang <hsiangkao@linux.alibaba.com>
---
 configure.ac              |  16 +++
 include/erofs/internal.h  |   3 +
 include/erofs/workqueue.h |  44 ++++++++
 lib/Makefile.am           |   4 +
 lib/workqueue.c           | 205 ++++++++++++++++++++++++++++++++++++++
 5 files changed, 272 insertions(+)
 create mode 100644 include/erofs/workqueue.h
 create mode 100644 lib/workqueue.c

diff --git a/configure.ac b/configure.ac
index bf6e99f..53306c3 100644
--- a/configure.ac
+++ b/configure.ac
@@ -96,6 +96,14 @@ AC_DEFUN([EROFS_UTILS_PARSE_DIRECTORY],
 
 AC_ARG_VAR([MAX_BLOCK_SIZE], [The maximum block size which erofs-utils supports])
 
+AC_MSG_CHECKING([whether to enable multi-threading support])
+AC_ARG_ENABLE([multithreading],
+    AS_HELP_STRING([--enable-multithreading],
+                   [enable multi-threading support @<:@default=no@:>@]),
+    [enable_multithreading="$enableval"],
+    [enable_multithreading="no"])
+AC_MSG_RESULT([$enable_multithreading])
+
 AC_ARG_ENABLE([debug],
     [AS_HELP_STRING([--enable-debug],
                     [enable debugging mode @<:@default=no@:>@])],
@@ -288,6 +296,13 @@ AS_IF([test "x$MAX_BLOCK_SIZE" = "x"], [
                              [erofs_cv_max_block_size=4096]))
 ], [erofs_cv_max_block_size=$MAX_BLOCK_SIZE])
 
+# Configure multi-threading support
+AS_IF([test "x$enable_multithreading" != "xno"], [
+    AC_CHECK_HEADERS([pthread.h])
+    AC_CHECK_LIB([pthread], [pthread_mutex_lock], [], AC_MSG_ERROR([libpthread is required for multi-threaded build]))
+    AC_DEFINE(EROFS_MT_ENABLED, 1, [Enable multi-threading support])
+], [])
+
 # Configure debug mode
 AS_IF([test "x$enable_debug" != "xno"], [], [
   dnl Turn off all assert checking.
@@ -467,6 +482,7 @@ AS_IF([test "x$enable_fuzzing" != "xyes"], [], [
 AM_CONDITIONAL([ENABLE_FUZZING], [test "x${enable_fuzzing}" = "xyes"])
 
 # Set up needed symbols, conditionals and compiler/linker flags
+AM_CONDITIONAL([ENABLE_EROFS_MT], [test "x${enable_multithreading}" != "xno"])
 AM_CONDITIONAL([ENABLE_LZ4], [test "x${have_lz4}" = "xyes"])
 AM_CONDITIONAL([ENABLE_LZ4HC], [test "x${have_lz4hc}" = "xyes"])
 AM_CONDITIONAL([ENABLE_FUSE], [test "x${have_fuse}" = "xyes"])
diff --git a/include/erofs/internal.h b/include/erofs/internal.h
index 82797e1..954aef4 100644
--- a/include/erofs/internal.h
+++ b/include/erofs/internal.h
@@ -22,6 +22,9 @@ typedef unsigned short umode_t;
 #include <sys/types.h> /* for off_t definition */
 #include <sys/stat.h> /* for S_ISCHR definition */
 #include <stdio.h>
+#ifdef HAVE_PTHREAD_H
+#include <pthread.h>
+#endif
 
 #ifndef PATH_MAX
 #define PATH_MAX        4096    /* # chars in a path name including nul */
diff --git a/include/erofs/workqueue.h b/include/erofs/workqueue.h
new file mode 100644
index 0000000..d0ebc5a
--- /dev/null
+++ b/include/erofs/workqueue.h
@@ -0,0 +1,44 @@
+/* SPDX-License-Identifier: GPL-2.0+ */
+/*
+ * Copyright (C) 2017 Oracle.  All Rights Reserved.
+ * Author: Darrick J. Wong <darrick.wong@oracle.com>
+ */
+#ifndef __EROFS_WORKQUEUE_H
+#define __EROFS_WORKQUEUE_H
+
+#include "internal.h"
+
+struct erofs_workqueue;
+struct erofs_work;
+
+typedef void erofs_workqueue_func_t(struct erofs_workqueue *wq,
+				    struct erofs_work *work);
+
+struct erofs_work {
+	struct erofs_workqueue	*queue;
+	struct erofs_work	*next;
+	erofs_workqueue_func_t	*function;
+};
+
+struct erofs_workqueue {
+	pthread_t		*threads;
+	struct erofs_work	*next_item;
+	struct erofs_work	*last_item;
+	pthread_mutex_t		lock;
+	pthread_cond_t		wakeup;
+	unsigned int		item_count;
+	unsigned int		thread_count;
+	bool			terminate;
+	bool			terminated;
+	int			max_queued;
+	pthread_cond_t		queue_full;
+};
+
+int erofs_workqueue_create(struct erofs_workqueue *wq,
+			   unsigned int nr_workers, unsigned int max_queue);
+int erofs_workqueue_add(struct erofs_workqueue	*wq,
+			struct erofs_work *wi);
+int erofs_workqueue_terminate(struct erofs_workqueue *wq);
+void erofs_workqueue_destroy(struct erofs_workqueue *wq);
+
+#endif
diff --git a/lib/Makefile.am b/lib/Makefile.am
index 54b9c9c..7307f7b 100644
--- a/lib/Makefile.am
+++ b/lib/Makefile.am
@@ -53,3 +53,7 @@ liberofs_la_SOURCES += kite_deflate.c compressor_deflate.c
 if ENABLE_LIBDEFLATE
 liberofs_la_SOURCES += compressor_libdeflate.c
 endif
+if ENABLE_EROFS_MT
+liberofs_la_CFLAGS += -lpthread
+liberofs_la_SOURCES += workqueue.c
+endif
diff --git a/lib/workqueue.c b/lib/workqueue.c
new file mode 100644
index 0000000..6573821
--- /dev/null
+++ b/lib/workqueue.c
@@ -0,0 +1,205 @@
+// SPDX-License-Identifier: GPL-2.0+
+/*
+ * Copyright (C) 2017 Oracle.  All Rights Reserved.
+ * Author: Darrick J. Wong <darrick.wong@oracle.com>
+ */
+#include <pthread.h>
+#include <signal.h>
+#include <stdlib.h>
+#include <string.h>
+#include <stdint.h>
+#include <stdbool.h>
+#include <errno.h>
+#include <assert.h>
+#include "erofs/workqueue.h"
+
+/* Main processing thread */
+static void *workqueue_thread(void *arg)
+{
+	struct erofs_workqueue		*wq = arg;
+	struct erofs_work		*wi;
+
+	/*
+	 * Loop pulling work from the passed in work queue.
+	 * Check for notification to exit after every chunk of work.
+	 */
+	while (1) {
+		pthread_mutex_lock(&wq->lock);
+
+		/*
+		 * Wait for work.
+		 */
+		while (wq->next_item == NULL && !wq->terminate) {
+			assert(wq->item_count == 0);
+			pthread_cond_wait(&wq->wakeup, &wq->lock);
+		}
+		if (wq->next_item == NULL && wq->terminate) {
+			pthread_mutex_unlock(&wq->lock);
+			break;
+		}
+
+		/*
+		 *  Dequeue work from the head of the list. If the queue was
+		 *  full then send a wakeup if we're configured to do so.
+		 */
+		assert(wq->item_count > 0);
+		if (wq->max_queued)
+			pthread_cond_broadcast(&wq->queue_full);
+
+		wi = wq->next_item;
+		wq->next_item = wi->next;
+		wq->item_count--;
+
+		if (wq->max_queued && wq->next_item) {
+			/* more work, wake up another worker */
+			pthread_cond_signal(&wq->wakeup);
+		}
+		pthread_mutex_unlock(&wq->lock);
+
+		(wi->function)(wq, wi);
+	}
+	return NULL;
+}
+
+/* Allocate a work queue and threads.  Returns zero or negative error code. */
+int erofs_workqueue_create(struct erofs_workqueue *wq,
+			   unsigned int nr_workers, unsigned int max_queue)
+{
+	unsigned int		i;
+	int			err = 0;
+
+	memset(wq, 0, sizeof(*wq));
+	err = -pthread_cond_init(&wq->wakeup, NULL);
+	if (err)
+		return err;
+	err = -pthread_cond_init(&wq->queue_full, NULL);
+	if (err)
+		goto out_wake;
+	err = -pthread_mutex_init(&wq->lock, NULL);
+	if (err)
+		goto out_cond;
+
+	wq->thread_count = nr_workers;
+	wq->max_queued = max_queue;
+	wq->threads = malloc(nr_workers * sizeof(pthread_t));
+	if (!wq->threads) {
+		err = -errno;
+		goto out_mutex;
+	}
+	wq->terminate = false;
+	wq->terminated = false;
+
+	for (i = 0; i < nr_workers; i++) {
+		err = -pthread_create(&wq->threads[i], NULL, workqueue_thread,
+				wq);
+		if (err)
+			break;
+	}
+
+	/*
+	 * If we encounter errors here, we have to signal and then wait for all
+	 * the threads that may have been started running before we can destroy
+	 * the workqueue.
+	 */
+	if (err)
+		erofs_workqueue_destroy(wq);
+	return err;
+out_mutex:
+	pthread_mutex_destroy(&wq->lock);
+out_cond:
+	pthread_cond_destroy(&wq->queue_full);
+out_wake:
+	pthread_cond_destroy(&wq->wakeup);
+	return err;
+}
+
+/*
+ * Create a work item consisting of a function and some arguments and schedule
+ * the work item to be run via the thread pool.  Returns zero or a negative
+ * error code.
+ */
+int erofs_workqueue_add(struct erofs_workqueue	*wq,
+			struct erofs_work *wi)
+{
+	int	ret;
+
+	assert(!wq->terminated);
+
+	if (wq->thread_count == 0) {
+		(wi->function)(wq, wi);
+		return 0;
+	}
+
+	wi->queue = wq;
+	wi->next = NULL;
+
+	/* Now queue the new work structure to the work queue. */
+	pthread_mutex_lock(&wq->lock);
+restart:
+	if (wq->next_item == NULL) {
+		assert(wq->item_count == 0);
+		ret = -pthread_cond_signal(&wq->wakeup);
+		if (ret) {
+			pthread_mutex_unlock(&wq->lock);
+			return ret;
+		}
+		wq->next_item = wi;
+	} else {
+		/* throttle on a full queue if configured */
+		if (wq->max_queued && wq->item_count == wq->max_queued) {
+			pthread_cond_wait(&wq->queue_full, &wq->lock);
+			/*
+			 * Queue might be empty or even still full by the time
+			 * we get the lock back, so restart the lookup so we do
+			 * the right thing with the current state of the queue.
+			 */
+			goto restart;
+		}
+		wq->last_item->next = wi;
+	}
+	wq->last_item = wi;
+	wq->item_count++;
+	pthread_mutex_unlock(&wq->lock);
+	return 0;
+}
+
+/*
+ * Wait for all pending work items to be processed and tear down the
+ * workqueue thread pool.  Returns zero or a negative error code.
+ */
+int erofs_workqueue_terminate(struct erofs_workqueue *wq)
+{
+	unsigned int		i;
+	int			ret;
+
+	pthread_mutex_lock(&wq->lock);
+	wq->terminate = true;
+	pthread_mutex_unlock(&wq->lock);
+
+	ret = -pthread_cond_broadcast(&wq->wakeup);
+	if (ret)
+		return ret;
+
+	for (i = 0; i < wq->thread_count; i++) {
+		ret = -pthread_join(wq->threads[i], NULL);
+		if (ret)
+			return ret;
+	}
+
+	pthread_mutex_lock(&wq->lock);
+	wq->terminated = true;
+	pthread_mutex_unlock(&wq->lock);
+	return 0;
+}
+
+/* Tear down the workqueue. */
+void erofs_workqueue_destroy(struct erofs_workqueue *wq)
+{
+	assert(wq->terminated);
+
+	free(wq->threads);
+	pthread_mutex_destroy(&wq->lock);
+	pthread_cond_destroy(&wq->wakeup);
+	pthread_cond_destroy(&wq->queue_full);
+	memset(wq, 0, sizeof(*wq));
+}
-- 
2.43.0


             reply	other threads:[~2024-02-04 10:33 UTC|newest]

Thread overview: 2+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2024-02-04 10:33 Yifan Zhao [this message]
2024-02-07  3:55 ` [PATCH 1/7] erofs-utils: introduce multi-threading framework Gao Xiang

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20240204103331.141298-1-zhaoyifan@sjtu.edu.cn \
    --to=zhaoyifan@sjtu.edu.cn \
    --cc=hsiangkao@linux.alibaba.com \
    --cc=linux-erofs@lists.ozlabs.org \
    --cc=xin_tong@sjtu.edu.cn \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).