All the mail mirrored from lore.kernel.org
 help / color / mirror / Atom feed
* [PATCH v2 0/7] erofs-utils: mkfs: introduce multi-threaded compression
@ 2024-02-20  7:55 Yifan Zhao
  2024-02-20  7:55 ` [PATCH v2 1/7] erofs-utils: introduce multi-threading framework Yifan Zhao
  0 siblings, 1 reply; 11+ messages in thread
From: Yifan Zhao @ 2024-02-20  7:55 UTC (permalink / raw)
  To: hsiangkao; +Cc: linux-erofs

Change log since v1:
- Re-implement workqueue API instead of using xfsprogs' workqueue
- Use per-worker tmpfile for multi-threaded mkfs

Gao Xiang (1):
  erofs-utils: add a helper to get available processors

Yifan Zhao (6):
  erofs-utils: introduce multi-threading framework
  erofs-utils: mkfs: add --worker=# parameter
  erofs-utils: mkfs: optionally print warning in erofs_compressor_init
  erofs-utils: mkfs: introduce inner-file multi-threaded compression
  erofs-utils: mkfs: introduce inter-file multi-threaded compression
  erofs-utils: mkfs: use per-worker tmpfile for multi-threaded mkfs

 configure.ac                |  17 +
 include/erofs/compress.h    |  18 +
 include/erofs/config.h      |   5 +
 include/erofs/internal.h    |   6 +
 include/erofs/list.h        |   8 +
 include/erofs/queue.h       |  22 +
 include/erofs/workqueue.h   |  38 ++
 lib/Makefile.am             |   4 +
 lib/compress.c              | 836 ++++++++++++++++++++++++++++++------
 lib/compressor.c            |   7 +-
 lib/compressor.h            |   5 +-
 lib/compressor_deflate.c    |   4 +-
 lib/compressor_libdeflate.c |   4 +-
 lib/compressor_liblzma.c    |   5 +-
 lib/compressor_lz4.c        |   2 +-
 lib/compressor_lz4hc.c      |   2 +-
 lib/config.c                |  16 +
 lib/inode.c                 | 302 ++++++++++---
 lib/queue.c                 |  64 +++
 lib/workqueue.c             | 132 ++++++
 mkfs/main.c                 |  38 ++
 21 files changed, 1342 insertions(+), 193 deletions(-)
 create mode 100644 include/erofs/queue.h
 create mode 100644 include/erofs/workqueue.h
 create mode 100644 lib/queue.c
 create mode 100644 lib/workqueue.c

Interdiff against v1:
diff --git a/include/erofs/workqueue.h b/include/erofs/workqueue.h
index a11a8fc..857947b 100644
--- a/include/erofs/workqueue.h
+++ b/include/erofs/workqueue.h
@@ -1,48 +1,38 @@
-/* SPDX-License-Identifier: GPL-2.0+ */
-/*
- * Copyright (C) 2017 Oracle.  All Rights Reserved.
- * Author: Darrick J. Wong <darrick.wong@oracle.com>
- */
+/* SPDX-License-Identifier: GPL-2.0+ OR Apache-2.0 */
 #ifndef __EROFS_WORKQUEUE_H
 #define __EROFS_WORKQUEUE_H
 
 #include "internal.h"
 
-struct erofs_workqueue;
 struct erofs_work;
 
-typedef void erofs_workqueue_func_t(struct erofs_workqueue *wq,
-				    struct erofs_work *work);
+typedef void erofs_wq_func_t(struct erofs_work *);
 typedef void erofs_wq_priv_fini_t(void *);
 
 struct erofs_work {
-	struct erofs_workqueue	*queue;
+	void (*func)(struct erofs_work *work);
 	struct erofs_work *next;
-	erofs_workqueue_func_t	*function;
-	void 			*private;
+	void *priv;
 };
 
 struct erofs_workqueue {
-	pthread_t		*threads;
-	struct erofs_work	*next_item;
-	struct erofs_work	*last_item;
+	struct erofs_work *head;
+	struct erofs_work *tail;
 	pthread_mutex_t lock;
-	pthread_cond_t		wakeup;
-	unsigned int		item_count;
-	unsigned int		thread_count;
-	bool			terminate;
-	bool			terminated;
-	int			max_queued;
-	pthread_cond_t		queue_full;
-	size_t			private_size;
-	erofs_wq_priv_fini_t	*private_fini;
+	pthread_cond_t cond_empty;
+	pthread_cond_t cond_full;
+	pthread_t *workers;
+	unsigned int nworker;
+	unsigned int max_jobs;
+	unsigned int job_count;
+	bool shutdown;
+	size_t priv_size;
+	erofs_wq_priv_fini_t *priv_fini;
 };
 
-int erofs_workqueue_create(struct erofs_workqueue *wq, size_t private_size,
-			   erofs_wq_priv_fini_t *private_fini,
-			   unsigned int nr_workers, unsigned int max_queue);
-int erofs_workqueue_add(struct erofs_workqueue *wq, struct erofs_work *wi);
-int erofs_workqueue_terminate(struct erofs_workqueue *wq);
-void erofs_workqueue_destroy(struct erofs_workqueue *wq);
-
+int erofs_workqueue_init(struct erofs_workqueue *wq, unsigned int nworker,
+			 unsigned int max_jobs, size_t priv_size,
+			 erofs_wq_priv_fini_t *priv_fini);
+int erofs_workqueue_add(struct erofs_workqueue *wq, struct erofs_work *work);
+int erofs_workqueue_shutdown(struct erofs_workqueue *wq);
 #endif
\ No newline at end of file
diff --git a/lib/compress.c b/lib/compress.c
index d6c59b0..3fae260 100644
--- a/lib/compress.c
+++ b/lib/compress.c
@@ -8,6 +8,9 @@
 #ifndef _LARGEFILE64_SOURCE
 #define _LARGEFILE64_SOURCE
 #endif
+#ifndef _GNU_SOURCE
+#define _GNU_SOURCE
+#endif
 #include <string.h>
 #include <stdlib.h>
 #include <unistd.h>
@@ -23,6 +26,13 @@
 #ifdef EROFS_MT_ENABLED
 #include "erofs/workqueue.h"
 #endif
+#ifdef HAVE_LINUX_FALLOC_H
+#include <linux/falloc.h>
+#endif
+
+#if defined(HAVE_FALLOCATE) && defined(FALLOC_FL_PUNCH_HOLE)
+#define USE_PER_WORKER_TMPFILE 1
+#endif
 
 /* compressing configuration specified by users */
 struct erofs_compress_cfg {
@@ -59,6 +69,7 @@ struct z_erofs_vle_compress_ctx {
 
 	int seg_num, seg_idx;
 	FILE *tmpfile;
+	off_t tmpfile_off;
 };
 
 struct z_erofs_write_index_ctx {
@@ -75,6 +86,7 @@ struct erofs_compress_wq_private {
 	u8 *queue;
 	char *destbuf;
 	struct erofs_compress_cfg *ccfg;
+	FILE* tmpfile;
 };
 
 struct erofs_compress_work {
@@ -402,6 +414,7 @@ static int write_uncompressed_extent(struct z_erofs_vle_compress_ctx *ctx,
 		ret = fwrite(dst, erofs_blksiz(sbi), 1, ctx->tmpfile);
 		if (ret != 1)
 			return -EIO;
+		fflush(ctx->tmpfile);
 	} else {
 		erofs_dbg("Writing %u uncompressed data to block %u", count,
 			  ctx->blkaddr);
@@ -1073,6 +1086,7 @@ void z_erofs_init_ctx(struct z_erofs_vle_compress_ctx *ctx,
 	ctx->tof_chksum = tof_chksum;
 	ctx->fd = fd;
 	ctx->tmpfile = NULL;
+	ctx->tmpfile_off = 0;
 	init_list_head(&ctx->extents);
 }
 
@@ -1169,7 +1183,7 @@ int z_erofs_mt_private_init(struct erofs_sb_info *sbi,
 	struct erofs_compress_cfg *lc;
 	int ret;
 
-	if (!priv->init) {
+	if (unlikely(!priv->init)) {
 		priv->init = true;
 
 		priv->queue = malloc(EROFS_COMPR_QUEUE_SZ);
@@ -1185,6 +1199,16 @@ int z_erofs_mt_private_init(struct erofs_sb_info *sbi,
 				    sizeof(struct erofs_compress_cfg));
 		if (!priv->ccfg)
 			return -ENOMEM;
+
+#ifdef USE_PER_WORKER_TMPFILE
+#ifndef HAVE_TMPFILE64
+		priv->tmpfile = tmpfile();
+#else
+		priv->tmpfile = tmpfile64();
+#endif
+		if (!priv->tmpfile)
+			return -errno;
+#endif
 	}
 
 	lc = &priv->ccfg[alg_id];
@@ -1214,15 +1238,18 @@ void z_erofs_mt_private_fini(void *private)
 		free(priv->ccfg);
 		free(priv->destbuf);
 		free(priv->queue);
+#ifdef USE_PER_WORKER_TMPFILE
+		fclose(priv->tmpfile);
+#endif
 		priv->init = false;
 	}
 }
 
-void z_erofs_mt_work(struct erofs_workqueue *wq, struct erofs_work *work)
+void z_erofs_mt_work(struct erofs_work *work)
 {
 	struct erofs_compress_work *cwork = (struct erofs_compress_work *)work;
 	struct z_erofs_vle_compress_ctx *ctx = &cwork->ctx;
-	struct erofs_compress_wq_private *priv = work->private;
+	struct erofs_compress_wq_private *priv = work->priv;
 	struct erofs_compress_file *cfile = cwork->file;
 	erofs_blk_t blkaddr = ctx->blkaddr;
 	u64 offset = ctx->seg_idx * cfg.c_mt_segment_size;
@@ -1237,7 +1264,14 @@ void z_erofs_mt_work(struct erofs_workqueue *wq, struct erofs_work *work)
 	ctx->queue = priv->queue;
 	ctx->destbuf = priv->destbuf;
 	ctx->chandle = &priv->ccfg[cwork->alg_id].handle;
-
+#ifdef USE_PER_WORKER_TMPFILE
+	ctx->tmpfile = priv->tmpfile;
+	ctx->tmpfile_off = ftell(ctx->tmpfile);
+	if (ctx->tmpfile_off == -1) {
+		ret = -errno;
+		goto out;
+	}
+#else
 #ifdef HAVE_TMPFILE64
 	ctx->tmpfile = tmpfile64();
 #else
@@ -1247,13 +1281,13 @@ void z_erofs_mt_work(struct erofs_workqueue *wq, struct erofs_work *work)
 		ret = -errno;
 		goto out;
 	}
+	ctx->tmpfile_off = 0;
+#endif
 
 	ret = z_erofs_compress_file(ctx, offset, blkaddr);
 	if (ret)
 		goto out;
 
-	fflush(ctx->tmpfile);
-
 	if (ctx->seg_idx == ctx->seg_num - 1)
 		ret = z_erofs_handle_fragments(ctx);
 
@@ -1273,6 +1307,7 @@ int z_erofs_mt_merge(struct erofs_compress_file *cfile, erofs_blk_t blkaddr,
 	struct erofs_sb_info *sbi = cur->ctx.inode->sbi;
 	struct z_erofs_write_index_ctx *ictx = cfile->ictx;
 	char *memblock = NULL;
+	size_t size = 0;
 	int ret = 0, lret;
 
 	while (cur != NULL) {
@@ -1289,28 +1324,32 @@ int z_erofs_mt_merge(struct erofs_compress_file *cfile, erofs_blk_t blkaddr,
 			goto out;
 		}
 
-		memblock = realloc(memblock,
-				   ctx->compressed_blocks * erofs_blksiz(sbi));
+		size = ctx->compressed_blocks * erofs_blksiz(sbi);
+		memblock = realloc(memblock, size);
 		if (!memblock) {
 			if (!ret)
 				ret = -ENOMEM;
 			goto out;
 		}
 
-		lret = fseek(ctx->tmpfile, 0, SEEK_SET);
-		if (lret) {
+		lret = pread(fileno(ctx->tmpfile), memblock, size,
+			     ctx->tmpfile_off);
+		if (lret != size) {
 			if (!ret)
-				ret = lret;
+				ret = -errno;
 			goto out;
 		}
 
-		lret = fread(memblock, erofs_blksiz(sbi),
-			     ctx->compressed_blocks, ctx->tmpfile);
-		if (lret != ctx->compressed_blocks) {
+#ifdef USE_PER_WORKER_TMPFILE
+		lret = fallocate(fileno(ctx->tmpfile),
+				 FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE,
+				 ctx->tmpfile_off, size);
+		if (lret) {
 			if (!ret)
-				ret = -EIO;
+				ret = -errno;
 			goto out;
 		}
+#endif
 
 		lret = blk_write(sbi, memblock, blkaddr + *compressed_blocks,
 				 ctx->compressed_blocks);
@@ -1322,8 +1361,9 @@ int z_erofs_mt_merge(struct erofs_compress_file *cfile, erofs_blk_t blkaddr,
 		*compressed_blocks += ctx->compressed_blocks;
 
 out:
-		if (ctx->tmpfile)
+#ifndef USE_PER_WORKER_TMPFILE
 		fclose(ctx->tmpfile);
+#endif
 
 		tmp = cur->next;
 		pthread_mutex_lock(&work_mutex);
@@ -1405,7 +1445,7 @@ struct erofs_compress_file *z_erofs_mt_do_compress(
 		work->dict_size = ccfg->handle.dict_size;
 
 		work->file = cfile;
-		work->work.function = z_erofs_mt_work;
+		work->work.func = z_erofs_mt_work;
 
 		erofs_workqueue_add(&wq, &work->work);
 	}
@@ -1749,10 +1789,10 @@ int z_erofs_compress_init(struct erofs_sb_info *sbi, struct erofs_buffer_head *s
 	if (cfg.c_mt_worker_num == 1) {
 		mt_enabled = false;
 	} else {
-		ret = erofs_workqueue_create(
-			&wq, sizeof(struct erofs_compress_wq_private),
-			z_erofs_mt_private_fini, cfg.c_mt_worker_num,
-			cfg.c_mt_worker_num << 2);
+		ret = erofs_workqueue_init(
+			&wq, cfg.c_mt_worker_num, cfg.c_mt_worker_num << 2,
+			sizeof(struct erofs_compress_wq_private),
+			z_erofs_mt_private_fini);
 		mt_enabled = !ret;
 	}
 #else
@@ -1777,10 +1817,9 @@ int z_erofs_compress_exit(void)
 
 	if (mt_enabled) {
 #ifdef EROFS_MT_ENABLED
-		ret = erofs_workqueue_terminate(&wq);
+		ret = erofs_workqueue_shutdown(&wq);
 		if (ret)
 			return ret;
-		erofs_workqueue_destroy(&wq);
 		while (work_idle) {
 			struct erofs_compress_work *tmp = work_idle->next;
 			free(work_idle);
diff --git a/lib/workqueue.c b/lib/workqueue.c
index 01d12d9..3ec6142 100644
--- a/lib/workqueue.c
+++ b/lib/workqueue.c
@@ -1,222 +1,132 @@
-// SPDX-License-Identifier: GPL-2.0+
-/*
- * Copyright (C) 2017 Oracle.  All Rights Reserved.
- * Author: Darrick J. Wong <darrick.wong@oracle.com>
- */
+// SPDX-License-Identifier: GPL-2.0+ OR Apache-2.0
 #include <pthread.h>
-#include <signal.h>
 #include <stdlib.h>
-#include <string.h>
-#include <stdint.h>
-#include <stdbool.h>
-#include <errno.h>
-#include <assert.h>
 #include "erofs/workqueue.h"
 
-/* Main processing thread */
-static void *workqueue_thread(void *arg)
+static void *worker_thread(void *arg)
 {
 	struct erofs_workqueue *wq = arg;
-	struct erofs_work		*wi;
-	void				*private = NULL;
+	struct erofs_work *work;
+	void *priv = NULL;
 
-	if (wq->private_size) {
-		private = calloc(1, wq->private_size);
-		assert(private);
+	if (wq->priv_size) {
+		priv = calloc(wq->priv_size, 1);
+		assert(priv);
 	}
 
-	/*
-	 * Loop pulling work from the passed in work queue.
-	 * Check for notification to exit after every chunk of work.
-	 */
-	while (1) {
+	while (true) {
 		pthread_mutex_lock(&wq->lock);
 
-		/*
-		 * Wait for work.
-		 */
-		while (wq->next_item == NULL && !wq->terminate) {
-			assert(wq->item_count == 0);
-			pthread_cond_wait(&wq->wakeup, &wq->lock);
-		}
-		if (wq->next_item == NULL && wq->terminate) {
+		while (wq->job_count == 0 && !wq->shutdown)
+			pthread_cond_wait(&wq->cond_empty, &wq->lock);
+		if (wq->job_count == 0 && wq->shutdown) {
 			pthread_mutex_unlock(&wq->lock);
 			break;
 		}
 
-		/*
-		 *  Dequeue work from the head of the list. If the queue was
-		 *  full then send a wakeup if we're configured to do so.
-		 */
-		assert(wq->item_count > 0);
-		if (wq->max_queued)
-			pthread_cond_broadcast(&wq->queue_full);
-
-		wi = wq->next_item;
-		wq->next_item = wi->next;
-		wq->item_count--;
-
-		if (wq->max_queued && wq->next_item) {
-			/* more work, wake up another worker */
-			pthread_cond_signal(&wq->wakeup);
-		}
+		work = wq->head;
+		wq->head = work->next;
+		if (!wq->head)
+			wq->tail = NULL;
+		wq->job_count--;
+
+		if (wq->job_count == wq->max_jobs - 1)
+			pthread_cond_broadcast(&wq->cond_full);
+
 		pthread_mutex_unlock(&wq->lock);
 
-		wi->private = private;
-		(wi->function)(wq, wi);
+		work->priv = priv;
+		work->func(work);
 	}
 
-	if (private) {
-		assert(wq->private_fini);
-		(wq->private_fini)(private);
-		free(private);
+	if (priv) {
+		assert(wq->priv_fini);
+		(wq->priv_fini)(priv);
+		free(priv);
 	}
 
 	return NULL;
 }
 
-/* Allocate a work queue and threads.  Returns zero or negative error code. */
-int erofs_workqueue_create(struct erofs_workqueue *wq, size_t private_size,
-			   erofs_wq_priv_fini_t *priv_fini,
-			   unsigned int nr_workers, unsigned int max_queue)
+int erofs_workqueue_init(struct erofs_workqueue *wq, unsigned int nworker,
+			 unsigned int max_jobs, size_t priv_size,
+			 erofs_wq_priv_fini_t *priv_fini)
 {
 	unsigned int i;
-	int			err = 0;
-
-	memset(wq, 0, sizeof(*wq));
-	err = -pthread_cond_init(&wq->wakeup, NULL);
-	if (err)
-		return err;
-	err = -pthread_cond_init(&wq->queue_full, NULL);
-	if (err)
-		goto out_wake;
-	err = -pthread_mutex_init(&wq->lock, NULL);
-	if (err)
-		goto out_cond;
-
-	wq->private_size = private_size;
-	wq->private_fini = priv_fini;
-	wq->thread_count = nr_workers;
-	wq->max_queued = max_queue;
-	wq->threads = malloc(nr_workers * sizeof(pthread_t));
-	if (!wq->threads) {
-		err = -errno;
-		goto out_mutex;
-	}
-	wq->terminate = false;
-	wq->terminated = false;
 
-	for (i = 0; i < nr_workers; i++) {
-		err = -pthread_create(&wq->threads[i], NULL, workqueue_thread,
-				wq);
-		if (err)
-			break;
-	}
+	if (!wq || nworker <= 0 || max_jobs <= 0)
+		return -EINVAL;
 
-	/*
-	 * If we encounter errors here, we have to signal and then wait for all
-	 * the threads that may have been started running before we can destroy
-	 * the workqueue.
-	 */
-	if (err)
-		erofs_workqueue_destroy(wq);
-	return err;
-out_mutex:
-	pthread_mutex_destroy(&wq->lock);
-out_cond:
-	pthread_cond_destroy(&wq->queue_full);
-out_wake:
-	pthread_cond_destroy(&wq->wakeup);
-	return err;
-}
+	wq->head = wq->tail = NULL;
+	wq->nworker = nworker;
+	wq->max_jobs = max_jobs;
+	wq->job_count = 0;
+	wq->shutdown = false;
+	wq->priv_size = priv_size;
+	wq->priv_fini = priv_fini;
+	pthread_mutex_init(&wq->lock, NULL);
+	pthread_cond_init(&wq->cond_empty, NULL);
+	pthread_cond_init(&wq->cond_full, NULL);
 
-/*
- * Create a work item consisting of a function and some arguments and schedule
- * the work item to be run via the thread pool.  Returns zero or a negative
- * error code.
- */
-int erofs_workqueue_add(struct erofs_workqueue	*wq,
-			struct erofs_work *wi)
-{
-	int	ret;
+	wq->workers = malloc(nworker * sizeof(pthread_t));
+	if (!wq->workers)
+		return -ENOMEM;
 
-	assert(!wq->terminated);
+	for (i = 0; i < nworker; i++) {
+		if (pthread_create(&wq->workers[i], NULL, worker_thread, wq)) {
+			while (i--)
+				pthread_cancel(wq->workers[i]);
+			free(wq->workers);
+			return -ENOMEM;
+		}
+	}
 
-	if (wq->thread_count == 0) {
-		(wi->function)(wq, wi);
 	return 0;
 }
 
-	wi->queue = wq;
-	wi->next = NULL;
+int erofs_workqueue_add(struct erofs_workqueue *wq, struct erofs_work *work)
+{
+	if (!wq || !work)
+		return -EINVAL;
 
-	/* Now queue the new work structure to the work queue. */
 	pthread_mutex_lock(&wq->lock);
-restart:
-	if (wq->next_item == NULL) {
-		assert(wq->item_count == 0);
-		ret = -pthread_cond_signal(&wq->wakeup);
-		if (ret) {
-			pthread_mutex_unlock(&wq->lock);
-			return ret;
-		}
-		wq->next_item = wi;
-	} else {
-		/* throttle on a full queue if configured */
-		if (wq->max_queued && wq->item_count == wq->max_queued) {
-			pthread_cond_wait(&wq->queue_full, &wq->lock);
-			/*
-			 * Queue might be empty or even still full by the time
-			 * we get the lock back, so restart the lookup so we do
-			 * the right thing with the current state of the queue.
-			 */
-			goto restart;
-		}
-		wq->last_item->next = wi;
-	}
-	wq->last_item = wi;
-	wq->item_count++;
+
+	while (wq->job_count == wq->max_jobs)
+		pthread_cond_wait(&wq->cond_full, &wq->lock);
+
+	work->next = NULL;
+	if (!wq->head)
+		wq->head = work;
+	else
+		wq->tail->next = work;
+	wq->tail = work;
+	wq->job_count++;
+
+	pthread_cond_signal(&wq->cond_empty);
 	pthread_mutex_unlock(&wq->lock);
+
 	return 0;
 }
 
-/*
- * Wait for all pending work items to be processed and tear down the
- * workqueue thread pool.  Returns zero or a negative error code.
- */
-int erofs_workqueue_terminate(struct erofs_workqueue *wq)
+int erofs_workqueue_shutdown(struct erofs_workqueue *wq)
 {
 	unsigned int i;
-	int			ret;
-
-	pthread_mutex_lock(&wq->lock);
-	wq->terminate = true;
-	pthread_mutex_unlock(&wq->lock);
 
-	ret = -pthread_cond_broadcast(&wq->wakeup);
-	if (ret)
-		return ret;
-
-	for (i = 0; i < wq->thread_count; i++) {
-		ret = -pthread_join(wq->threads[i], NULL);
-		if (ret)
-			return ret;
-	}
+	if (!wq)
+		return -EINVAL;
 
 	pthread_mutex_lock(&wq->lock);
-	wq->terminated = true;
+	wq->shutdown = true;
+	pthread_cond_broadcast(&wq->cond_empty);
 	pthread_mutex_unlock(&wq->lock);
-	return 0;
-}
 
-/* Tear down the workqueue. */
-void erofs_workqueue_destroy(struct erofs_workqueue *wq)
-{
-	assert(wq->terminated);
+	for (i = 0; i < wq->nworker; i++)
+		pthread_join(wq->workers[i], NULL);
 
-	free(wq->threads);
+	free(wq->workers);
 	pthread_mutex_destroy(&wq->lock);
-	pthread_cond_destroy(&wq->wakeup);
-	pthread_cond_destroy(&wq->queue_full);
-	memset(wq, 0, sizeof(*wq));
+	pthread_cond_destroy(&wq->cond_empty);
+	pthread_cond_destroy(&wq->cond_full);
+
+	return 0;
 }
\ No newline at end of file
-- 
2.43.2


^ permalink raw reply related	[flat|nested] 11+ messages in thread

* [PATCH v2 1/7] erofs-utils: introduce multi-threading framework
  2024-02-20  7:55 [PATCH v2 0/7] erofs-utils: mkfs: introduce multi-threaded compression Yifan Zhao
@ 2024-02-20  7:55 ` Yifan Zhao
  2024-02-20  7:55   ` [PATCH v2 2/7] erofs-utils: add a helper to get available processors Yifan Zhao
  2024-02-22  2:37   ` [PATCH v2 1/7] erofs-utils: introduce multi-threading framework Gao Xiang
  0 siblings, 2 replies; 11+ messages in thread
From: Yifan Zhao @ 2024-02-20  7:55 UTC (permalink / raw)
  To: hsiangkao; +Cc: linux-erofs

Add a workqueue implementation for multi-threading support inspired by
xfsprogs.

Signed-off-by: Yifan Zhao <zhaoyifan@sjtu.edu.cn>
Suggested-by: Gao Xiang <hsiangkao@linux.alibaba.com>
---
 configure.ac              |  16 +++++
 include/erofs/internal.h  |   3 +
 include/erofs/workqueue.h |  38 +++++++++++
 lib/Makefile.am           |   4 ++
 lib/workqueue.c           | 132 ++++++++++++++++++++++++++++++++++++++
 5 files changed, 193 insertions(+)
 create mode 100644 include/erofs/workqueue.h
 create mode 100644 lib/workqueue.c

diff --git a/configure.ac b/configure.ac
index bf6e99f..53306c3 100644
--- a/configure.ac
+++ b/configure.ac
@@ -96,6 +96,14 @@ AC_DEFUN([EROFS_UTILS_PARSE_DIRECTORY],
 
 AC_ARG_VAR([MAX_BLOCK_SIZE], [The maximum block size which erofs-utils supports])
 
+AC_MSG_CHECKING([whether to enable multi-threading support])
+AC_ARG_ENABLE([multithreading],
+    AS_HELP_STRING([--enable-multithreading],
+                   [enable multi-threading support @<:@default=no@:>@]),
+    [enable_multithreading="$enableval"],
+    [enable_multithreading="no"])
+AC_MSG_RESULT([$enable_multithreading])
+
 AC_ARG_ENABLE([debug],
     [AS_HELP_STRING([--enable-debug],
                     [enable debugging mode @<:@default=no@:>@])],
@@ -288,6 +296,13 @@ AS_IF([test "x$MAX_BLOCK_SIZE" = "x"], [
                              [erofs_cv_max_block_size=4096]))
 ], [erofs_cv_max_block_size=$MAX_BLOCK_SIZE])
 
+# Configure multi-threading support
+AS_IF([test "x$enable_multithreading" != "xno"], [
+    AC_CHECK_HEADERS([pthread.h])
+    AC_CHECK_LIB([pthread], [pthread_mutex_lock], [], AC_MSG_ERROR([libpthread is required for multi-threaded build]))
+    AC_DEFINE(EROFS_MT_ENABLED, 1, [Enable multi-threading support])
+], [])
+
 # Configure debug mode
 AS_IF([test "x$enable_debug" != "xno"], [], [
   dnl Turn off all assert checking.
@@ -467,6 +482,7 @@ AS_IF([test "x$enable_fuzzing" != "xyes"], [], [
 AM_CONDITIONAL([ENABLE_FUZZING], [test "x${enable_fuzzing}" = "xyes"])
 
 # Set up needed symbols, conditionals and compiler/linker flags
+AM_CONDITIONAL([ENABLE_EROFS_MT], [test "x${enable_multithreading}" != "xno"])
 AM_CONDITIONAL([ENABLE_LZ4], [test "x${have_lz4}" = "xyes"])
 AM_CONDITIONAL([ENABLE_LZ4HC], [test "x${have_lz4hc}" = "xyes"])
 AM_CONDITIONAL([ENABLE_FUSE], [test "x${have_fuse}" = "xyes"])
diff --git a/include/erofs/internal.h b/include/erofs/internal.h
index 82797e1..954aef4 100644
--- a/include/erofs/internal.h
+++ b/include/erofs/internal.h
@@ -22,6 +22,9 @@ typedef unsigned short umode_t;
 #include <sys/types.h> /* for off_t definition */
 #include <sys/stat.h> /* for S_ISCHR definition */
 #include <stdio.h>
+#ifdef HAVE_PTHREAD_H
+#include <pthread.h>
+#endif
 
 #ifndef PATH_MAX
 #define PATH_MAX        4096    /* # chars in a path name including nul */
diff --git a/include/erofs/workqueue.h b/include/erofs/workqueue.h
new file mode 100644
index 0000000..857947b
--- /dev/null
+++ b/include/erofs/workqueue.h
@@ -0,0 +1,38 @@
+/* SPDX-License-Identifier: GPL-2.0+ OR Apache-2.0 */
+#ifndef __EROFS_WORKQUEUE_H
+#define __EROFS_WORKQUEUE_H
+
+#include "internal.h"
+
+struct erofs_work;
+
+typedef void erofs_wq_func_t(struct erofs_work *);
+typedef void erofs_wq_priv_fini_t(void *);
+
+struct erofs_work {
+	void (*func)(struct erofs_work *work);
+	struct erofs_work *next;
+	void *priv;
+};
+
+struct erofs_workqueue {
+	struct erofs_work *head;
+	struct erofs_work *tail;
+	pthread_mutex_t lock;
+	pthread_cond_t cond_empty;
+	pthread_cond_t cond_full;
+	pthread_t *workers;
+	unsigned int nworker;
+	unsigned int max_jobs;
+	unsigned int job_count;
+	bool shutdown;
+	size_t priv_size;
+	erofs_wq_priv_fini_t *priv_fini;
+};
+
+int erofs_workqueue_init(struct erofs_workqueue *wq, unsigned int nworker,
+			 unsigned int max_jobs, size_t priv_size,
+			 erofs_wq_priv_fini_t *priv_fini);
+int erofs_workqueue_add(struct erofs_workqueue *wq, struct erofs_work *work);
+int erofs_workqueue_shutdown(struct erofs_workqueue *wq);
+#endif
\ No newline at end of file
diff --git a/lib/Makefile.am b/lib/Makefile.am
index 54b9c9c..7307f7b 100644
--- a/lib/Makefile.am
+++ b/lib/Makefile.am
@@ -53,3 +53,7 @@ liberofs_la_SOURCES += kite_deflate.c compressor_deflate.c
 if ENABLE_LIBDEFLATE
 liberofs_la_SOURCES += compressor_libdeflate.c
 endif
+if ENABLE_EROFS_MT
+liberofs_la_CFLAGS += -lpthread
+liberofs_la_SOURCES += workqueue.c
+endif
diff --git a/lib/workqueue.c b/lib/workqueue.c
new file mode 100644
index 0000000..3ec6142
--- /dev/null
+++ b/lib/workqueue.c
@@ -0,0 +1,132 @@
+// SPDX-License-Identifier: GPL-2.0+ OR Apache-2.0
+#include <pthread.h>
+#include <stdlib.h>
+#include "erofs/workqueue.h"
+
+static void *worker_thread(void *arg)
+{
+	struct erofs_workqueue *wq = arg;
+	struct erofs_work *work;
+	void *priv = NULL;
+
+	if (wq->priv_size) {
+		priv = calloc(wq->priv_size, 1);
+		assert(priv);
+	}
+
+	while (true) {
+		pthread_mutex_lock(&wq->lock);
+
+		while (wq->job_count == 0 && !wq->shutdown)
+			pthread_cond_wait(&wq->cond_empty, &wq->lock);
+		if (wq->job_count == 0 && wq->shutdown) {
+			pthread_mutex_unlock(&wq->lock);
+			break;
+		}
+
+		work = wq->head;
+		wq->head = work->next;
+		if (!wq->head)
+			wq->tail = NULL;
+		wq->job_count--;
+
+		if (wq->job_count == wq->max_jobs - 1)
+			pthread_cond_broadcast(&wq->cond_full);
+
+		pthread_mutex_unlock(&wq->lock);
+
+		work->priv = priv;
+		work->func(work);
+	}
+
+	if (priv) {
+		assert(wq->priv_fini);
+		(wq->priv_fini)(priv);
+		free(priv);
+	}
+
+	return NULL;
+}
+
+int erofs_workqueue_init(struct erofs_workqueue *wq, unsigned int nworker,
+			 unsigned int max_jobs, size_t priv_size,
+			 erofs_wq_priv_fini_t *priv_fini)
+{
+	unsigned int i;
+
+	if (!wq || nworker <= 0 || max_jobs <= 0)
+		return -EINVAL;
+
+	wq->head = wq->tail = NULL;
+	wq->nworker = nworker;
+	wq->max_jobs = max_jobs;
+	wq->job_count = 0;
+	wq->shutdown = false;
+	wq->priv_size = priv_size;
+	wq->priv_fini = priv_fini;
+	pthread_mutex_init(&wq->lock, NULL);
+	pthread_cond_init(&wq->cond_empty, NULL);
+	pthread_cond_init(&wq->cond_full, NULL);
+
+	wq->workers = malloc(nworker * sizeof(pthread_t));
+	if (!wq->workers)
+		return -ENOMEM;
+
+	for (i = 0; i < nworker; i++) {
+		if (pthread_create(&wq->workers[i], NULL, worker_thread, wq)) {
+			while (i--)
+				pthread_cancel(wq->workers[i]);
+			free(wq->workers);
+			return -ENOMEM;
+		}
+	}
+
+	return 0;
+}
+
+int erofs_workqueue_add(struct erofs_workqueue *wq, struct erofs_work *work)
+{
+	if (!wq || !work)
+		return -EINVAL;
+
+	pthread_mutex_lock(&wq->lock);
+
+	while (wq->job_count == wq->max_jobs)
+		pthread_cond_wait(&wq->cond_full, &wq->lock);
+
+	work->next = NULL;
+	if (!wq->head)
+		wq->head = work;
+	else
+		wq->tail->next = work;
+	wq->tail = work;
+	wq->job_count++;
+
+	pthread_cond_signal(&wq->cond_empty);
+	pthread_mutex_unlock(&wq->lock);
+
+	return 0;
+}
+
+int erofs_workqueue_shutdown(struct erofs_workqueue *wq)
+{
+	unsigned int i;
+
+	if (!wq)
+		return -EINVAL;
+
+	pthread_mutex_lock(&wq->lock);
+	wq->shutdown = true;
+	pthread_cond_broadcast(&wq->cond_empty);
+	pthread_mutex_unlock(&wq->lock);
+
+	for (i = 0; i < wq->nworker; i++)
+		pthread_join(wq->workers[i], NULL);
+
+	free(wq->workers);
+	pthread_mutex_destroy(&wq->lock);
+	pthread_cond_destroy(&wq->cond_empty);
+	pthread_cond_destroy(&wq->cond_full);
+
+	return 0;
+}
\ No newline at end of file
-- 
2.43.2


^ permalink raw reply related	[flat|nested] 11+ messages in thread

* [PATCH v2 2/7] erofs-utils: add a helper to get available processors
  2024-02-20  7:55 ` [PATCH v2 1/7] erofs-utils: introduce multi-threading framework Yifan Zhao
@ 2024-02-20  7:55   ` Yifan Zhao
  2024-02-20  7:55     ` [PATCH v2 3/7] erofs-utils: mkfs: add --worker=# parameter Yifan Zhao
  2024-02-22  2:37   ` [PATCH v2 1/7] erofs-utils: introduce multi-threading framework Gao Xiang
  1 sibling, 1 reply; 11+ messages in thread
From: Yifan Zhao @ 2024-02-20  7:55 UTC (permalink / raw)
  To: hsiangkao; +Cc: linux-erofs

From: Gao Xiang <hsiangkao@linux.alibaba.com>

Add a helper that returns the number of online processors, in order to
prepare for multi-threaded decompression.

Signed-off-by: Gao Xiang <hsiangkao@linux.alibaba.com>
---
 configure.ac           |  1 +
 include/erofs/config.h |  1 +
 lib/config.c           | 12 ++++++++++++
 3 files changed, 14 insertions(+)

diff --git a/configure.ac b/configure.ac
index 53306c3..52b4010 100644
--- a/configure.ac
+++ b/configure.ac
@@ -264,6 +264,7 @@ AC_CHECK_FUNCS(m4_flatten([
 	strerror
 	strrchr
 	strtoull
+	sysconf
 	tmpfile64
 	utimensat]))
 
diff --git a/include/erofs/config.h b/include/erofs/config.h
index eecf575..73e3ac2 100644
--- a/include/erofs/config.h
+++ b/include/erofs/config.h
@@ -109,6 +109,7 @@ static inline int erofs_selabel_open(const char *file_contexts)
 
 void erofs_update_progressinfo(const char *fmt, ...);
 char *erofs_trim_for_progressinfo(const char *str, int placeholder);
+unsigned int erofs_get_available_processors(void);
 
 #ifdef __cplusplus
 }
diff --git a/lib/config.c b/lib/config.c
index 1096cd1..947a183 100644
--- a/lib/config.c
+++ b/lib/config.c
@@ -14,6 +14,9 @@
 #ifdef HAVE_SYS_IOCTL_H
 #include <sys/ioctl.h>
 #endif
+#ifdef HAVE_UNISTD_H
+#include <unistd.h>
+#endif
 
 struct erofs_configure cfg;
 struct erofs_sb_info sbi;
@@ -177,3 +180,12 @@ void erofs_update_progressinfo(const char *fmt, ...)
 	fputs(msg, stdout);
 	fputc('\n', stdout);
 }
+
+unsigned int erofs_get_available_processors(void)
+{
+#if defined(HAVE_UNISTD_H) && defined(HAVE_SYSCONF)
+	return sysconf(_SC_NPROCESSORS_ONLN);
+#else
+	return 0;
+#endif
+}
-- 
2.43.2


^ permalink raw reply related	[flat|nested] 11+ messages in thread

* [PATCH v2 3/7] erofs-utils: mkfs: add --worker=# parameter
  2024-02-20  7:55   ` [PATCH v2 2/7] erofs-utils: add a helper to get available processors Yifan Zhao
@ 2024-02-20  7:55     ` Yifan Zhao
  2024-02-20  7:55       ` [PATCH v2 4/7] erofs-utils: mkfs: optionally print warning in erofs_compressor_init Yifan Zhao
  2024-02-22 16:40       ` [PATCH v2 3/7] erofs-utils: mkfs: add --worker=# parameter Gao Xiang
  0 siblings, 2 replies; 11+ messages in thread
From: Yifan Zhao @ 2024-02-20  7:55 UTC (permalink / raw)
  To: hsiangkao; +Cc: linux-erofs

This patch introduces a --worker=# parameter for the upcoming
multi-threaded compression support. It also introduces the segment size
used in multi-threaded compression, which is currently fixed at 16MiB
and is not user-configurable.

Signed-off-by: Yifan Zhao <zhaoyifan@sjtu.edu.cn>
---
 include/erofs/config.h |  4 ++++
 lib/config.c           |  4 ++++
 mkfs/main.c            | 38 ++++++++++++++++++++++++++++++++++++++
 3 files changed, 46 insertions(+)

diff --git a/include/erofs/config.h b/include/erofs/config.h
index 73e3ac2..d19094e 100644
--- a/include/erofs/config.h
+++ b/include/erofs/config.h
@@ -75,6 +75,10 @@ struct erofs_configure {
 	char c_force_chunkformat;
 	/* < 0, xattr disabled and INT_MAX, always use inline xattrs */
 	int c_inline_xattr_tolerance;
+#ifdef EROFS_MT_ENABLED
+	u64 c_mt_segment_size;
+	u32 c_mt_worker_num;
+#endif
 
 	u32 c_pclusterblks_max, c_pclusterblks_def, c_pclusterblks_packed;
 	u32 c_max_decompressed_extent_bytes;
diff --git a/lib/config.c b/lib/config.c
index 947a183..8add06d 100644
--- a/lib/config.c
+++ b/lib/config.c
@@ -38,6 +38,10 @@ void erofs_init_configure(void)
 	cfg.c_pclusterblks_max = 1;
 	cfg.c_pclusterblks_def = 1;
 	cfg.c_max_decompressed_extent_bytes = -1;
+#ifdef EROFS_MT_ENABLED
+	cfg.c_mt_segment_size = 16ULL * 1024 * 1024;
+	cfg.c_mt_worker_num = 1;
+#endif
 
 	erofs_stdout_tty = isatty(STDOUT_FILENO);
 }
diff --git a/mkfs/main.c b/mkfs/main.c
index 7aea64a..3882533 100644
--- a/mkfs/main.c
+++ b/mkfs/main.c
@@ -73,6 +73,9 @@ static struct option long_options[] = {
 	{"gzip", no_argument, NULL, 517},
 #endif
 	{"offset", required_argument, NULL, 518},
+#ifdef EROFS_MT_ENABLED
+	{"worker", required_argument, NULL, 519},
+#endif
 	{0, 0, 0, 0},
 };
 
@@ -175,6 +178,9 @@ static void usage(int argc, char **argv)
 		" --product-out=X       X=product_out directory\n"
 		" --fs-config-file=X    X=fs_config file\n"
 		" --block-list-file=X   X=block_list file\n"
+#endif
+#ifdef EROFS_MT_ENABLED
+		" --worker=#            set the number of worker threads to # (default=1)\n"
 #endif
 		);
 }
@@ -404,6 +410,13 @@ static void erofs_rebuild_cleanup(void)
 	rebuild_src_count = 0;
 }
 
+#ifdef EROFS_MT_ENABLED
+static u32 mkfs_max_worker_num() {
+	u32 ncpu = erofs_get_available_processors();
+	return ncpu ? ncpu : 16;
+}
+#endif
+
 static int mkfs_parse_options_cfg(int argc, char *argv[])
 {
 	char *endptr;
@@ -642,6 +655,21 @@ static int mkfs_parse_options_cfg(int argc, char *argv[])
 				return -EINVAL;
 			}
 			break;
+#ifdef EROFS_MT_ENABLED
+		case 519:
+			cfg.c_mt_worker_num = strtoul(optarg, &endptr, 0);
+			if (errno || *endptr != '\0') {
+				erofs_err("invalid worker number %s", optarg);
+				return -EINVAL;
+			}
+			if (cfg.c_mt_worker_num > mkfs_max_worker_num()) {
+				erofs_warn(
+					"worker number %s is too large, setting to %ud",
+					optarg, mkfs_max_worker_num());
+				cfg.c_mt_worker_num = mkfs_max_worker_num();
+			}
+			break;
+#endif
 		case 'V':
 			version();
 			exit(0);
@@ -784,6 +812,16 @@ static int mkfs_parse_options_cfg(int argc, char *argv[])
 		}
 		cfg.c_pclusterblks_packed = pclustersize_packed >> sbi.blkszbits;
 	}
+
+#ifdef EROFS_MT_ENABLED
+	if (cfg.c_mt_worker_num > 1 &&
+	    (cfg.c_dedupe || cfg.c_fragments || cfg.c_ztailpacking)) {
+		cfg.c_mt_worker_num = 1;
+		erofs_warn("Please note that dedupe/fragments/ztailpacking"
+			   "is NOT supported in multi-threaded mode now, using worker=1.");
+	}
+#endif
+
 	return 0;
 }
 
-- 
2.43.2


^ permalink raw reply related	[flat|nested] 11+ messages in thread

* [PATCH v2 4/7] erofs-utils: mkfs: optionally print warning in erofs_compressor_init
  2024-02-20  7:55     ` [PATCH v2 3/7] erofs-utils: mkfs: add --worker=# parameter Yifan Zhao
@ 2024-02-20  7:55       ` Yifan Zhao
  2024-02-20  7:55         ` [PATCH v2 5/7] erofs-utils: mkfs: introduce inner-file multi-threaded compression Yifan Zhao
  2024-02-22  2:54         ` [PATCH v2 4/7] erofs-utils: mkfs: optionally print warning in erofs_compressor_init Gao Xiang
  2024-02-22 16:40       ` [PATCH v2 3/7] erofs-utils: mkfs: add --worker=# parameter Gao Xiang
  1 sibling, 2 replies; 11+ messages in thread
From: Yifan Zhao @ 2024-02-20  7:55 UTC (permalink / raw
  To: hsiangkao; +Cc: linux-erofs

In the upcoming multi-threaded compression support, a compressor may be
initialized more than once in different worker threads, resulting in
noisy warning output. This patch makes sure that each warning message is
printed only once by adding a print_warning option to the
erofs_compressor_init() interface.

Signed-off-by: Yifan Zhao <zhaoyifan@sjtu.edu.cn>
---
 lib/compress.c              | 3 ++-
 lib/compressor.c            | 5 +++--
 lib/compressor.h            | 5 +++--
 lib/compressor_deflate.c    | 4 +++-
 lib/compressor_libdeflate.c | 4 +++-
 lib/compressor_liblzma.c    | 5 ++++-
 lib/compressor_lz4.c        | 2 +-
 lib/compressor_lz4hc.c      | 2 +-
 8 files changed, 20 insertions(+), 10 deletions(-)

diff --git a/lib/compress.c b/lib/compress.c
index 9611102..41cb6e5 100644
--- a/lib/compress.c
+++ b/lib/compress.c
@@ -1213,7 +1213,8 @@ int z_erofs_compress_init(struct erofs_sb_info *sbi, struct erofs_buffer_head *s
 
 		ret = erofs_compressor_init(sbi, c, cfg.c_compr_opts[i].alg,
 					    cfg.c_compr_opts[i].level,
-					    cfg.c_compr_opts[i].dict_size);
+					    cfg.c_compr_opts[i].dict_size,
+					    true);
 		if (ret)
 			return ret;
 
diff --git a/lib/compressor.c b/lib/compressor.c
index 4720e72..9b3794b 100644
--- a/lib/compressor.c
+++ b/lib/compressor.c
@@ -78,7 +78,8 @@ int erofs_compress_destsize(const struct erofs_compress *c,
 }
 
 int erofs_compressor_init(struct erofs_sb_info *sbi, struct erofs_compress *c,
-			  char *alg_name, int compression_level, u32 dict_size)
+			  char *alg_name, int compression_level, u32 dict_size,
+			  bool print_warning)
 {
 	int ret, i;
 
@@ -126,7 +127,7 @@ int erofs_compressor_init(struct erofs_sb_info *sbi, struct erofs_compress *c,
 			return -EINVAL;
 		}
 
-		ret = erofs_algs[i].c->init(c);
+		ret = erofs_algs[i].c->init(c, print_warning);
 		if (ret)
 			return ret;
 
diff --git a/lib/compressor.h b/lib/compressor.h
index d8ccf2e..522fde0 100644
--- a/lib/compressor.h
+++ b/lib/compressor.h
@@ -17,7 +17,7 @@ struct erofs_compressor {
 	u32 default_dictsize;
 	u32 max_dictsize;
 
-	int (*init)(struct erofs_compress *c);
+	int (*init)(struct erofs_compress *c, bool print_warning);
 	int (*exit)(struct erofs_compress *c);
 	int (*setlevel)(struct erofs_compress *c, int compression_level);
 	int (*setdictsize)(struct erofs_compress *c, u32 dict_size);
@@ -60,7 +60,8 @@ int erofs_compress_destsize(const struct erofs_compress *c,
 			    void *dst, unsigned int dstsize);
 
 int erofs_compressor_init(struct erofs_sb_info *sbi, struct erofs_compress *c,
-			  char *alg_name, int compression_level, u32 dict_size);
+			  char *alg_name, int compression_level, u32 dict_size,
+			  bool print_warning);
 int erofs_compressor_exit(struct erofs_compress *c);
 
 #endif
diff --git a/lib/compressor_deflate.c b/lib/compressor_deflate.c
index 8629415..9fe067f 100644
--- a/lib/compressor_deflate.c
+++ b/lib/compressor_deflate.c
@@ -34,7 +34,7 @@ static int compressor_deflate_exit(struct erofs_compress *c)
 	return 0;
 }
 
-static int compressor_deflate_init(struct erofs_compress *c)
+static int compressor_deflate_init(struct erofs_compress *c, bool print_warning)
 {
 	if (c->private_data) {
 		kite_deflate_end(c->private_data);
@@ -44,9 +44,11 @@ static int compressor_deflate_init(struct erofs_compress *c)
 	if (IS_ERR_VALUE(c->private_data))
 		return PTR_ERR(c->private_data);
 
+	if (print_warning) {
 		erofs_warn("EXPERIMENTAL DEFLATE algorithm in use. Use at your own risk!");
 		erofs_warn("*Carefully* check filesystem data correctness to avoid corruption!");
 		erofs_warn("Please send a report to <linux-erofs@lists.ozlabs.org> if something is wrong.");
+	}
 	return 0;
 }
 
diff --git a/lib/compressor_libdeflate.c b/lib/compressor_libdeflate.c
index 62d93f7..0583868 100644
--- a/lib/compressor_libdeflate.c
+++ b/lib/compressor_libdeflate.c
@@ -80,13 +80,15 @@ static int compressor_libdeflate_exit(struct erofs_compress *c)
 	return 0;
 }
 
-static int compressor_libdeflate_init(struct erofs_compress *c)
+static int compressor_libdeflate_init(struct erofs_compress *c,
+				      bool print_warning)
 {
 	libdeflate_free_compressor(c->private_data);
 	c->private_data = libdeflate_alloc_compressor(c->compression_level);
 	if (!c->private_data)
 		return -ENOMEM;
 
+	if (print_warning)
 		erofs_warn("EXPERIMENTAL libdeflate compressor in use. Use at your own risk!");
 	return 0;
 }
diff --git a/lib/compressor_liblzma.c b/lib/compressor_liblzma.c
index 7183b0b..b048e57 100644
--- a/lib/compressor_liblzma.c
+++ b/lib/compressor_liblzma.c
@@ -81,7 +81,8 @@ static int erofs_compressor_liblzma_setdictsize(struct erofs_compress *c,
 	return 0;
 }
 
-static int erofs_compressor_liblzma_init(struct erofs_compress *c)
+static int erofs_compressor_liblzma_init(struct erofs_compress *c,
+					 bool print_warning)
 {
 	struct erofs_liblzma_context *ctx;
 	u32 preset;
@@ -103,8 +104,10 @@ static int erofs_compressor_liblzma_init(struct erofs_compress *c)
 	ctx->opt.dict_size = c->dict_size;
 
 	c->private_data = ctx;
+	if (print_warning) {
 		erofs_warn("EXPERIMENTAL MicroLZMA feature in use. Use at your own risk!");
 		erofs_warn("Note that it may take more time since the compressor is still single-threaded for now.");
+	}
 	return 0;
 }
 
diff --git a/lib/compressor_lz4.c b/lib/compressor_lz4.c
index f4e72c3..6aed213 100644
--- a/lib/compressor_lz4.c
+++ b/lib/compressor_lz4.c
@@ -30,7 +30,7 @@ static int compressor_lz4_exit(struct erofs_compress *c)
 	return 0;
 }
 
-static int compressor_lz4_init(struct erofs_compress *c)
+static int compressor_lz4_init(struct erofs_compress *c, bool print_warning)
 {
 	c->sbi->lz4_max_distance = LZ4_DISTANCE_MAX;
 	return 0;
diff --git a/lib/compressor_lz4hc.c b/lib/compressor_lz4hc.c
index 6fc8847..3d10aa8 100644
--- a/lib/compressor_lz4hc.c
+++ b/lib/compressor_lz4hc.c
@@ -37,7 +37,7 @@ static int compressor_lz4hc_exit(struct erofs_compress *c)
 	return 0;
 }
 
-static int compressor_lz4hc_init(struct erofs_compress *c)
+static int compressor_lz4hc_init(struct erofs_compress *c, bool print_warning)
 {
 	c->private_data = LZ4_createStreamHC();
 	if (!c->private_data)
-- 
2.43.2


^ permalink raw reply related	[flat|nested] 11+ messages in thread

* [PATCH v2 5/7] erofs-utils: mkfs: introduce inner-file multi-threaded compression
  2024-02-20  7:55       ` [PATCH v2 4/7] erofs-utils: mkfs: optionally print warning in erofs_compressor_init Yifan Zhao
@ 2024-02-20  7:55         ` Yifan Zhao
  2024-02-20  7:55           ` [PATCH v2 6/7] erofs-utils: mkfs: introduce inter-file " Yifan Zhao
  2024-02-22  2:54         ` [PATCH v2 4/7] erofs-utils: mkfs: optionally print warning in erofs_compressor_init Gao Xiang
  1 sibling, 1 reply; 11+ messages in thread
From: Yifan Zhao @ 2024-02-20  7:55 UTC (permalink / raw
  To: hsiangkao; +Cc: linux-erofs

Currently, the creation of EROFS compressed images is
single-threaded, which suffers from performance issues. This patch
attempts to address it by compressing large files in parallel.

Specifically, each input file larger than 16MB is split into segments,
and each worker thread compresses a segment as if it were a separate
file. Finally, the main thread merges all the compressed segments.

Multi-threaded compression is not compatible with -Ededupe,
-E(all-)fragments and -Eztailpacking for now.

Signed-off-by: Yifan Zhao <zhaoyifan@sjtu.edu.cn>
Co-authored-by: Tong Xin <xin_tong@sjtu.edu.cn>
---
 include/erofs/compress.h |   1 +
 lib/compress.c           | 585 +++++++++++++++++++++++++++++++++------
 lib/compressor.c         |   2 +
 3 files changed, 505 insertions(+), 83 deletions(-)

diff --git a/include/erofs/compress.h b/include/erofs/compress.h
index 046640b..2699334 100644
--- a/include/erofs/compress.h
+++ b/include/erofs/compress.h
@@ -15,6 +15,7 @@ extern "C"
 #include "internal.h"
 
 #define EROFS_CONFIG_COMPR_MAX_SZ           (4000 * 1024)
+#define EROFS_COMPR_QUEUE_SZ (EROFS_CONFIG_COMPR_MAX_SZ * 2)
 
 void z_erofs_drop_inline_pcluster(struct erofs_inode *inode);
 int erofs_write_compressed_file(struct erofs_inode *inode, int fd);
diff --git a/lib/compress.c b/lib/compress.c
index 41cb6e5..41de8b9 100644
--- a/lib/compress.c
+++ b/lib/compress.c
@@ -20,6 +20,9 @@
 #include "erofs/block_list.h"
 #include "erofs/compress_hints.h"
 #include "erofs/fragments.h"
+#ifdef EROFS_MT_ENABLED
+#include "erofs/workqueue.h"
+#endif
 
 /* compressing configuration specified by users */
 struct erofs_compress_cfg {
@@ -34,28 +37,74 @@ struct z_erofs_extent_item {
 };
 
 struct z_erofs_vle_compress_ctx {
-	u8 queue[EROFS_CONFIG_COMPR_MAX_SZ * 2];
+	u8 *queue;
 	struct list_head extents;
 	struct z_erofs_extent_item *pivot;
 
 	struct erofs_inode *inode;
-	struct erofs_compress_cfg *ccfg;
+	struct erofs_compress *chandle;
+	char *destbuf;
 
-	u8 *metacur;
+	int fd;
 	unsigned int head, tail;
 	erofs_off_t remaining;
 	unsigned int pclustersize;
 	erofs_blk_t blkaddr;		/* pointing to the next blkaddr */
+	erofs_blk_t compressed_blocks;
 	u16 clusterofs;
 
 	u32 tof_chksum;
 	bool fix_dedupedfrag;
 	bool fragemitted;
+
+	int seg_num, seg_idx;
+	FILE *tmpfile;
+};
+
+struct z_erofs_write_index_ctx {
+	struct erofs_inode *inode;
+	struct list_head *extents;
+	u16 clusterofs;
+	erofs_blk_t blkaddr, blkoff;
+	u8 *metacur;
+};
+
+#ifdef EROFS_MT_ENABLED
+struct erofs_compress_wq_private {
+	bool init;
+	u8 *queue;
+	char *destbuf;
+	struct erofs_compress_cfg *ccfg;
+};
+
+struct erofs_compress_work {
+	/* Note: struct erofs_work must be the first member */
+	struct erofs_work work;
+	struct z_erofs_vle_compress_ctx ctx;
+
+	unsigned int alg_id;
+	char *alg_name;
+	unsigned int comp_level;
+	unsigned int dict_size;
+
+	int ret;
+
+	struct erofs_compress_work *next;
 };
 
+static struct erofs_workqueue wq;
+static struct erofs_compress_work *idle;
+static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
+static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
+static int nfini;
+#endif
+
+static bool mt_enabled;
+static u8 *queue;
+
 #define Z_EROFS_LEGACY_MAP_HEADER_SIZE	Z_EROFS_FULL_INDEX_ALIGN(0)
 
-static void z_erofs_write_indexes_final(struct z_erofs_vle_compress_ctx *ctx)
+static void z_erofs_write_indexes_final(struct z_erofs_write_index_ctx *ctx)
 {
 	const unsigned int type = Z_EROFS_LCLUSTER_TYPE_PLAIN;
 	struct z_erofs_lcluster_index di;
@@ -71,7 +120,7 @@ static void z_erofs_write_indexes_final(struct z_erofs_vle_compress_ctx *ctx)
 	ctx->metacur += sizeof(di);
 }
 
-static void z_erofs_write_extent(struct z_erofs_vle_compress_ctx *ctx,
+static void z_erofs_write_extent(struct z_erofs_write_index_ctx *ctx,
 				 struct z_erofs_inmem_extent *e)
 {
 	struct erofs_inode *inode = ctx->inode;
@@ -99,10 +148,15 @@ static void z_erofs_write_extent(struct z_erofs_vle_compress_ctx *ctx,
 		di.di_advise = cpu_to_le16(advise);
 
 		if (inode->datalayout == EROFS_INODE_COMPRESSED_FULL &&
-		    !e->compressedblks)
+		    !e->compressedblks) {
 			di.di_u.blkaddr = cpu_to_le32(inode->fragmentoff >> 32);
-		else
+		} else if (mt_enabled) {
+			di.di_u.blkaddr =
+				cpu_to_le32(ctx->blkaddr + ctx->blkoff);
+			ctx->blkoff += e->compressedblks;
+		} else {
 			di.di_u.blkaddr = cpu_to_le32(e->blkaddr);
+		}
 		memcpy(ctx->metacur, &di, sizeof(di));
 		ctx->metacur += sizeof(di);
 
@@ -144,10 +198,15 @@ static void z_erofs_write_extent(struct z_erofs_vle_compress_ctx *ctx,
 				Z_EROFS_LCLUSTER_TYPE_HEAD1;
 
 			if (inode->datalayout == EROFS_INODE_COMPRESSED_FULL &&
-			    !e->compressedblks)
+			    !e->compressedblks) {
 				di.di_u.blkaddr = cpu_to_le32(inode->fragmentoff >> 32);
-			else
+			} else if (mt_enabled) {
+				di.di_u.blkaddr =
+					cpu_to_le32(ctx->blkaddr + ctx->blkoff);
+				ctx->blkoff += e->compressedblks;
+			} else {
 				di.di_u.blkaddr = cpu_to_le32(e->blkaddr);
+			}
 
 			if (e->partial) {
 				DBG_BUGON(e->raw);
@@ -170,12 +229,12 @@ static void z_erofs_write_extent(struct z_erofs_vle_compress_ctx *ctx,
 	ctx->clusterofs = clusterofs + count;
 }
 
-static void z_erofs_write_indexes(struct z_erofs_vle_compress_ctx *ctx)
+static void z_erofs_write_indexes(struct z_erofs_write_index_ctx *ctx)
 {
 	struct z_erofs_extent_item *ei, *n;
 
 	ctx->clusterofs = 0;
-	list_for_each_entry_safe(ei, n, &ctx->extents, list) {
+	list_for_each_entry_safe(ei, n, ctx->extents, list) {
 		z_erofs_write_extent(ctx, &ei->e);
 
 		list_del(&ei->list);
@@ -335,11 +394,18 @@ static int write_uncompressed_extent(struct z_erofs_vle_compress_ctx *ctx,
 	memcpy(dst + interlaced_offset, ctx->queue + ctx->head, rightpart);
 	memcpy(dst, ctx->queue + ctx->head + rightpart, count - rightpart);
 
-	erofs_dbg("Writing %u uncompressed data to block %u",
-		  count, ctx->blkaddr);
+	if (ctx->tmpfile) {
+		erofs_dbg("Writing %u uncompressed data to tmpfile", count);
+		ret = fwrite(dst, erofs_blksiz(sbi), 1, ctx->tmpfile);
+		if (ret != 1)
+			return -EIO;
+	} else {
+		erofs_dbg("Writing %u uncompressed data to block %u", count,
+			  ctx->blkaddr);
 		ret = blk_write(sbi, dst, ctx->blkaddr, 1);
 		if (ret)
 			return ret;
+	}
 	return count;
 }
 
@@ -385,7 +451,7 @@ static void tryrecompress_trailing(struct z_erofs_vle_compress_ctx *ctx,
 				   void *out, unsigned int *compressedsize)
 {
 	struct erofs_sb_info *sbi = ctx->inode->sbi;
-	static char tmp[Z_EROFS_PCLUSTER_MAX_SIZE];
+	char tmp[Z_EROFS_PCLUSTER_MAX_SIZE];
 	unsigned int count;
 	int ret = *compressedsize;
 
@@ -439,17 +505,22 @@ static bool z_erofs_fixup_deduped_fragment(struct z_erofs_vle_compress_ctx *ctx,
 static int __z_erofs_compress_one(struct z_erofs_vle_compress_ctx *ctx,
 				  struct z_erofs_inmem_extent *e)
 {
-	static char dstbuf[EROFS_CONFIG_COMPR_MAX_SZ + EROFS_MAX_BLOCK_SIZE];
+	static char
+		global_dstbuf[EROFS_CONFIG_COMPR_MAX_SZ + EROFS_MAX_BLOCK_SIZE];
+	char *dstbuf = ctx->destbuf ? ctx->destbuf : global_dstbuf;
 	struct erofs_inode *inode = ctx->inode;
 	struct erofs_sb_info *sbi = inode->sbi;
 	unsigned int blksz = erofs_blksiz(sbi);
 	char *const dst = dstbuf + blksz;
-	struct erofs_compress *const h = &ctx->ccfg->handle;
+	struct erofs_compress *const h = ctx->chandle;
 	unsigned int len = ctx->tail - ctx->head;
 	bool is_packed_inode = erofs_is_packed_inode(inode);
 	bool final = !ctx->remaining;
-	bool may_packing = (cfg.c_fragments && final && !is_packed_inode);
-	bool may_inline = (cfg.c_ztailpacking && final && !may_packing);
+	bool is_last_seg = (ctx->seg_idx == ctx->seg_num - 1);
+	bool may_packing =
+		(cfg.c_fragments && final && !is_packed_inode && is_last_seg);
+	bool may_inline =
+		(cfg.c_ztailpacking && final && !may_packing && is_last_seg);
 	unsigned int compressedsize;
 	int ret;
 
@@ -545,7 +616,7 @@ frag_packing:
 		 */
 		if (may_packing && len == e->length &&
 		    (compressedsize & (blksz - 1)) &&
-		    ctx->tail < sizeof(ctx->queue)) {
+		    ctx->tail < EROFS_COMPR_QUEUE_SZ) {
 			ctx->pclustersize = roundup(compressedsize, blksz);
 			goto fix_dedupedfrag;
 		}
@@ -569,6 +640,16 @@ frag_packing:
 		}
 
 		/* write compressed data */
+		if (ctx->tmpfile) {
+			erofs_dbg("Writing %u compressed data to tmpfile of %u blocks",
+				  e->length, e->compressedblks);
+
+			ret = fwrite(dst - padding, erofs_blksiz(sbi),
+				     e->compressedblks, ctx->tmpfile);
+			if (ret != e->compressedblks)
+				return -EIO;
+			fflush(ctx->tmpfile);
+		} else {
 			erofs_dbg("Writing %u compressed data to %u of %u blocks",
 				  e->length, ctx->blkaddr, e->compressedblks);
 
@@ -576,6 +657,7 @@ frag_packing:
 					e->compressedblks);
 			if (ret)
 				return ret;
+		}
 		e->raw = false;
 		may_inline = false;
 		may_packing = false;
@@ -912,12 +994,355 @@ void z_erofs_drop_inline_pcluster(struct erofs_inode *inode)
 	inode->eof_tailraw = NULL;
 }
 
+int z_erofs_compress_file(struct z_erofs_vle_compress_ctx *ctx, u64 offset,
+			  erofs_blk_t blkaddr)
+{
+	struct erofs_inode *inode = ctx->inode;
+	int ret = 0;
+
+	while (ctx->remaining) {
+		const u64 rx = min_t(u64, ctx->remaining,
+				     EROFS_COMPR_QUEUE_SZ - ctx->tail);
+
+		ret = pread(ctx->fd, ctx->queue + ctx->tail, rx, offset);
+		if (ret != rx)
+			return -errno;
+		ctx->remaining -= rx;
+		ctx->tail += rx;
+		offset += rx;
+
+		ret = z_erofs_compress_one(ctx);
+		if (ret)
+			return ret;
+	}
+	DBG_BUGON(ctx->head != ctx->tail);
+
+	ctx->compressed_blocks = ctx->blkaddr - blkaddr;
+	DBG_BUGON(ctx->compressed_blocks < !!inode->idata_size);
+	ctx->compressed_blocks -= !!inode->idata_size;
+
+	if (ctx->pivot) {
+		z_erofs_commit_extent(ctx, ctx->pivot);
+		ctx->pivot = NULL;
+	}
+
+	return 0;
+}
+
+int z_erofs_handle_fragments(struct z_erofs_vle_compress_ctx *ctx)
+{
+	struct erofs_inode *inode = ctx->inode;
+
+	/* generate an extent for the deduplicated fragment */
+	if (inode->fragment_size && !ctx->fragemitted) {
+		struct z_erofs_extent_item *ei;
+
+		ei = malloc(sizeof(*ei));
+		if (!ei)
+			return -ENOMEM;
+
+		ei->e = (struct z_erofs_inmem_extent) {
+			.length = inode->fragment_size,
+			.compressedblks = 0,
+			.raw = false,
+			.partial = false,
+			.blkaddr = ctx->blkaddr,
+		};
+		init_list_head(&ei->list);
+		z_erofs_commit_extent(ctx, ei);
+	}
+	z_erofs_fragments_commit(inode);
+
+	return 0;
+}
+
+#ifdef EROFS_MT_ENABLED
+int z_erofs_mt_private_init(struct erofs_sb_info *sbi,
+			    struct erofs_compress_wq_private *priv,
+			    unsigned int alg_id, char *alg_name,
+			    unsigned int comp_level, unsigned int dict_size)
+{
+	struct erofs_compress_cfg *lc;
+	int ret;
+
+	if (!priv->init) {
+		priv->init = true;
+
+		priv->queue = malloc(EROFS_COMPR_QUEUE_SZ);
+		if (!priv->queue)
+			return -ENOMEM;
+
+		priv->destbuf = calloc(1, EROFS_CONFIG_COMPR_MAX_SZ +
+						  EROFS_MAX_BLOCK_SIZE);
+		if (!priv->destbuf)
+			return -ENOMEM;
+
+		priv->ccfg = calloc(EROFS_MAX_COMPR_CFGS,
+				    sizeof(struct erofs_compress_cfg));
+		if (!priv->ccfg)
+			return -ENOMEM;
+	}
+
+	lc = &priv->ccfg[alg_id];
+	if (!lc->enable) {
+		lc->enable = true;
+		lc->algorithmtype = alg_id;
+
+		ret = erofs_compressor_init(sbi, &lc->handle, alg_name,
+					    comp_level, dict_size, false);
+		if (ret)
+			return ret;
+	}
+
+	return 0;
+}
+
+void z_erofs_mt_private_fini(void *private)
+{
+	struct erofs_compress_wq_private *priv = private;
+	int i;
+
+	if (priv->init) {
+		for (i = 0; i < EROFS_MAX_COMPR_CFGS; i++) {
+			if (priv->ccfg[i].enable)
+				erofs_compressor_exit(&priv->ccfg[i].handle);
+		}
+		free(priv->ccfg);
+		free(priv->destbuf);
+		free(priv->queue);
+		priv->init = false;
+	}
+}
+
+void z_erofs_mt_work(struct erofs_work *work)
+{
+	struct erofs_compress_work *cwork = (struct erofs_compress_work *)work;
+	struct z_erofs_vle_compress_ctx *ctx = &cwork->ctx;
+	struct erofs_compress_wq_private *priv = work->priv;
+	erofs_blk_t blkaddr = ctx->blkaddr;
+	u64 offset = ctx->seg_idx * cfg.c_mt_segment_size;
+	int ret = 0;
+
+	ret = z_erofs_mt_private_init(ctx->inode->sbi, priv, cwork->alg_id,
+				      cwork->alg_name, cwork->comp_level,
+				      cwork->dict_size);
+	if (ret)
+		goto out;
+
+	ctx->queue = priv->queue;
+	ctx->destbuf = priv->destbuf;
+	ctx->chandle = &priv->ccfg[cwork->alg_id].handle;
+
+#ifdef HAVE_TMPFILE64
+	ctx->tmpfile = tmpfile64();
+#else
+	ctx->tmpfile = tmpfile();
+#endif
+
+	ret = z_erofs_compress_file(ctx, offset, blkaddr);
+	if (ret)
+		goto out;
+
+	fflush(ctx->tmpfile);
+
+	if (ctx->seg_idx == ctx->seg_num - 1)
+		ret = z_erofs_handle_fragments(ctx);
+
+out:
+	cwork->ret = ret;
+	pthread_mutex_lock(&mutex);
+	++nfini;
+	pthread_cond_signal(&cond);
+	pthread_mutex_unlock(&mutex);
+}
+
+int z_erofs_mt_merge(struct erofs_compress_work *cur, erofs_blk_t blkaddr,
+		     erofs_blk_t *compressed_blocks)
+{
+	struct z_erofs_vle_compress_ctx *ctx, *listhead = NULL;
+	struct erofs_sb_info *sbi = cur->ctx.inode->sbi;
+	struct erofs_compress_work *tmp;
+	char *memblock = NULL;
+	int ret = 0, lret;
+
+	while (cur != NULL) {
+		ctx = &cur->ctx;
+
+		if (!listhead)
+			listhead = ctx;
+		else
+			list_splice_tail(&ctx->extents, &listhead->extents);
+
+		if (cur->ret != 0) {
+			if (!ret)
+				ret = cur->ret;
+			goto out;
+		}
+
+		memblock = realloc(memblock,
+				   ctx->compressed_blocks * erofs_blksiz(sbi));
+		if (!memblock) {
+			if (!ret)
+				ret = -ENOMEM;
+			goto out;
+		}
+
+		lret = fseek(ctx->tmpfile, 0, SEEK_SET);
+		if (lret) {
+			if (!ret)
+				ret = lret;
+			goto out;
+		}
+
+		lret = fread(memblock, erofs_blksiz(sbi),
+			     ctx->compressed_blocks, ctx->tmpfile);
+		if (lret != ctx->compressed_blocks) {
+			if (!ret)
+				ret = -EIO;
+			goto out;
+		}
+
+		lret = blk_write(sbi, memblock, blkaddr + *compressed_blocks,
+				 ctx->compressed_blocks);
+		if (lret) {
+			if (!ret)
+				ret = lret;
+			goto out;
+		}
+		*compressed_blocks += ctx->compressed_blocks;
+
+out:
+		fclose(ctx->tmpfile);
+
+		tmp = cur->next;
+		cur->next = idle;
+		idle = cur;
+		cur = tmp;
+	}
+
+	free(memblock);
+
+	return ret;
+}
+#endif
+
+void z_erofs_init_ctx(struct z_erofs_vle_compress_ctx *ctx,
+		      struct erofs_inode *inode, erofs_blk_t blkaddr,
+		      u32 tof_chksum, int fd)
+{
+	ctx->inode = inode;
+	ctx->pclustersize = z_erofs_get_max_pclustersize(inode);
+	ctx->blkaddr = blkaddr;
+	ctx->head = ctx->tail = 0;
+	ctx->clusterofs = 0;
+	ctx->fix_dedupedfrag = false;
+	ctx->fragemitted = false;
+	ctx->tof_chksum = tof_chksum;
+	ctx->fd = fd;
+	ctx->tmpfile = NULL;
+	init_list_head(&ctx->extents);
+}
+
+int z_erofs_do_compress(struct z_erofs_vle_compress_ctx *ctx,
+			struct z_erofs_write_index_ctx *ictx,
+			struct erofs_compress_cfg *ccfg,
+			erofs_blk_t *compressed_blocks)
+{
+#ifdef EROFS_MT_ENABLED
+	struct erofs_compress_work *work, *head = NULL, **last = &head;
+#endif
+	struct erofs_inode *inode = ctx->inode;
+	erofs_blk_t blkaddr = ctx->blkaddr;
+	int ret = 0;
+
+	if (mt_enabled) {
+#ifdef EROFS_MT_ENABLED
+		if (inode->i_size <= cfg.c_mt_segment_size)
+			goto single_thread;
+
+		int nsegs = DIV_ROUND_UP(inode->i_size, cfg.c_mt_segment_size);
+		nfini = 0;
+
+		for (int i = 0; i < nsegs; i++) {
+			if (idle) {
+				work = idle;
+				idle = work->next;
+				work->next = NULL;
+			} else {
+				work = calloc(1, sizeof(*work));
+				if (!work)
+					return -ENOMEM;
+			}
+			*last = work;
+			last = &work->next;
+
+			z_erofs_init_ctx(&work->ctx, inode, blkaddr,
+					 ctx->tof_chksum, ctx->fd);
+			if (i == nsegs - 1)
+				work->ctx.remaining = inode->i_size -
+						      inode->fragment_size -
+						      i * cfg.c_mt_segment_size;
+			else
+			 	work->ctx.remaining = cfg.c_mt_segment_size;
+			work->ctx.seg_num = nsegs;
+			work->ctx.seg_idx = i;
+
+			work->alg_id = ccfg->handle.alg->id;
+			work->alg_name = ccfg->handle.alg->name;
+			work->comp_level = ccfg->handle.compression_level;
+			work->dict_size = ccfg->handle.dict_size;
+
+			work->work.func = z_erofs_mt_work;
+
+			erofs_workqueue_add(&wq, &work->work);
+		}
+
+		pthread_mutex_lock(&mutex);
+		while (nfini != nsegs)
+			pthread_cond_wait(&cond, &mutex);
+		pthread_mutex_unlock(&mutex);
+
+		ictx->extents = &head->ctx.extents;
+
+		ret = z_erofs_mt_merge(head, blkaddr, compressed_blocks);
+		if (ret)
+			return ret;
+#endif
+	} else {
+#ifdef EROFS_MT_ENABLED
+single_thread:
+#endif
+		ctx->queue = queue;
+		ctx->destbuf = NULL;
+		ctx->chandle = &ccfg->handle;
+		ctx->remaining = inode->i_size - inode->fragment_size;
+		ctx->seg_num = 1;
+		ctx->seg_idx = 0;
+
+		ret = z_erofs_compress_file(ctx, 0, blkaddr);
+		if (ret)
+			return ret;
+
+		ret = z_erofs_handle_fragments(ctx);
+		if (ret)
+			return ret;
+
+		*compressed_blocks = ctx->compressed_blocks;
+		ictx->extents = &ctx->extents;
+	}
+
+	return 0;
+}
+
 int erofs_write_compressed_file(struct erofs_inode *inode, int fd)
 {
 	struct erofs_buffer_head *bh;
 	static struct z_erofs_vle_compress_ctx ctx;
-	erofs_blk_t blkaddr, compressed_blocks;
+	static struct z_erofs_write_index_ctx ictx;
+	struct erofs_compress_cfg *ccfg;
+	erofs_blk_t blkaddr, compressed_blocks = 0;
 	unsigned int legacymetasize;
+	u32 tof_chksum = 0;
 	int ret;
 	struct erofs_sb_info *sbi = inode->sbi;
 	u8 *compressmeta = malloc(BLK_ROUND_UP(sbi, inode->i_size) *
@@ -963,8 +1388,8 @@ int erofs_write_compressed_file(struct erofs_inode *inode, int fd)
 		}
 	}
 #endif
-	ctx.ccfg = &erofs_ccfg[inode->z_algorithmtype[0]];
-	inode->z_algorithmtype[0] = ctx.ccfg[0].algorithmtype;
+	ccfg = &erofs_ccfg[inode->z_algorithmtype[0]];
+	inode->z_algorithmtype[0] = ccfg[0].algorithmtype;
 	inode->z_algorithmtype[1] = 0;
 
 	inode->idata_size = 0;
@@ -975,82 +1400,39 @@ int erofs_write_compressed_file(struct erofs_inode *inode, int fd)
 	 * parts into the packed inode.
 	 */
 	if (cfg.c_fragments && !erofs_is_packed_inode(inode)) {
-		ret = z_erofs_fragments_dedupe(inode, fd, &ctx.tof_chksum);
+		ret = z_erofs_fragments_dedupe(inode, fd, &tof_chksum);
 		if (ret < 0)
 			goto err_bdrop;
 	}
 
 	blkaddr = erofs_mapbh(bh->block);	/* start_blkaddr */
-	ctx.inode = inode;
-	ctx.pclustersize = z_erofs_get_max_pclustersize(inode);
-	ctx.blkaddr = blkaddr;
-	ctx.metacur = compressmeta + Z_EROFS_LEGACY_MAP_HEADER_SIZE;
-	ctx.head = ctx.tail = 0;
-	ctx.clusterofs = 0;
-	ctx.pivot = &dummy_pivot;
-	init_list_head(&ctx.extents);
-	ctx.remaining = inode->i_size - inode->fragment_size;
-	ctx.fix_dedupedfrag = false;
-	ctx.fragemitted = false;
+	z_erofs_init_ctx(&ctx, inode, blkaddr, tof_chksum, fd);
 	if (cfg.c_all_fragments && !erofs_is_packed_inode(inode) &&
 	    !inode->fragment_size) {
-		ret = z_erofs_pack_file_from_fd(inode, fd, ctx.tof_chksum);
+		ret = z_erofs_pack_file_from_fd(inode, fd, tof_chksum);
 		if (ret)
 			goto err_free_idata;
-	} else {
-		while (ctx.remaining) {
-			const u64 rx = min_t(u64, ctx.remaining,
-					     sizeof(ctx.queue) - ctx.tail);
-
-			ret = read(fd, ctx.queue + ctx.tail, rx);
-			if (ret != rx) {
-				ret = -errno;
-				goto err_bdrop;
-			}
-			ctx.remaining -= rx;
-			ctx.tail += rx;
 
-			ret = z_erofs_compress_one(&ctx);
+		ret = z_erofs_handle_fragments(&ctx);
 		if (ret)
 			goto err_free_idata;
-		}
-	}
-	DBG_BUGON(ctx.head != ctx.tail);
-
-	/* fall back to no compression mode */
-	compressed_blocks = ctx.blkaddr - blkaddr;
-	DBG_BUGON(compressed_blocks < !!inode->idata_size);
-	compressed_blocks -= !!inode->idata_size;
 
-	if (ctx.pivot) {
-		z_erofs_commit_extent(&ctx, ctx.pivot);
-		ctx.pivot = NULL;
-	}
-
-	/* generate an extent for the deduplicated fragment */
-	if (inode->fragment_size && !ctx.fragemitted) {
-		struct z_erofs_extent_item *ei;
-
-		ei = malloc(sizeof(*ei));
-		if (!ei) {
-			ret = -ENOMEM;
+		ictx.extents = &ctx.extents;
+	} else {
+			ret = z_erofs_do_compress(&ctx, &ictx, ccfg,
+						  &compressed_blocks);
+			if (ret)
 				goto err_free_idata;
 	}
 
-		ei->e = (struct z_erofs_inmem_extent) {
-			.length = inode->fragment_size,
-			.compressedblks = 0,
-			.raw = false,
-			.partial = false,
-			.blkaddr = ctx.blkaddr,
-		};
-		init_list_head(&ei->list);
-		z_erofs_commit_extent(&ctx, ei);
-	}
-	z_erofs_fragments_commit(inode);
+	ictx.inode = inode;
+	ictx.blkaddr = blkaddr;
+	ictx.blkoff = 0;
+	ictx.clusterofs = 0;
+	ictx.metacur = compressmeta + Z_EROFS_LEGACY_MAP_HEADER_SIZE;
 
-	z_erofs_write_indexes(&ctx);
-	legacymetasize = ctx.metacur - compressmeta;
+	z_erofs_write_indexes(&ictx);
+	legacymetasize = ictx.metacur - compressmeta;
 	/* estimate if data compression saves space or not */
 	if (!inode->fragment_size &&
 	    compressed_blocks * erofs_blksiz(sbi) + inode->idata_size +
@@ -1258,8 +1640,29 @@ int z_erofs_compress_init(struct erofs_sb_info *sbi, struct erofs_buffer_head *s
 		return -EINVAL;
 	}
 
-	if (erofs_sb_has_compr_cfgs(sbi))
-		return z_erofs_build_compr_cfgs(sbi, sb_bh, max_dict_size);
+	if (erofs_sb_has_compr_cfgs(sbi)) {
+		ret = z_erofs_build_compr_cfgs(sbi, sb_bh, max_dict_size);
+		if (ret)
+			return ret;		
+	}
+
+#ifdef EROFS_MT_ENABLED
+	if (cfg.c_mt_worker_num == 1) {
+		mt_enabled = false;
+	} else {
+		ret = erofs_workqueue_init(
+			&wq, cfg.c_mt_worker_num, cfg.c_mt_worker_num << 2,
+			sizeof(struct erofs_compress_wq_private),
+			z_erofs_mt_private_fini);
+		mt_enabled = !ret;
+	}
+#else
+	mt_enabled = false;
+#endif
+	queue = malloc(EROFS_COMPR_QUEUE_SZ);
+	if (!queue)
+		return -ENOMEM;
+
 	return 0;
 }
 
@@ -1272,5 +1675,21 @@ int z_erofs_compress_exit(void)
 		if (ret)
 			return ret;
 	}
+
+	if (mt_enabled) {
+#ifdef EROFS_MT_ENABLED
+		ret = erofs_workqueue_shutdown(&wq);
+		if (ret)
+			return ret;
+		while (idle) {
+			struct erofs_compress_work *tmp = idle->next;
+			free(idle);
+			idle = tmp;
+		}
+#endif
+	}
+
+	free(queue);
+
 	return 0;
 }
diff --git a/lib/compressor.c b/lib/compressor.c
index 9b3794b..d59e00d 100644
--- a/lib/compressor.c
+++ b/lib/compressor.c
@@ -87,6 +87,8 @@ int erofs_compressor_init(struct erofs_sb_info *sbi, struct erofs_compress *c,
 
 	/* should be written in "minimum compression ratio * 100" */
 	c->compress_threshold = 100;
+	c->compression_level = -1;
+	c->dict_size = 0;
 
 	if (!alg_name) {
 		c->alg = NULL;
-- 
2.43.2


^ permalink raw reply related	[flat|nested] 11+ messages in thread

* [PATCH v2 6/7] erofs-utils: mkfs: introduce inter-file multi-threaded compression
  2024-02-20  7:55         ` [PATCH v2 5/7] erofs-utils: mkfs: introduce inner-file multi-threaded compression Yifan Zhao
@ 2024-02-20  7:55           ` Yifan Zhao
  2024-02-20  7:55             ` [PATCH v2 7/7] erofs-utils: mkfs: use per-worker tmpfile for multi-threaded mkfs Yifan Zhao
  0 siblings, 1 reply; 11+ messages in thread
From: Yifan Zhao @ 2024-02-20  7:55 UTC (permalink / raw
  To: hsiangkao; +Cc: linux-erofs

This patch allows parallelizing the compression process of different
files in mkfs. Specifically, a traverser thread traverses the files and
produces compression tasks. Then, the main thread consumes them and
writes the compressed data to the device.

To this end, the logic of erofs_write_compressed_file() has been
modified to split the creation and completion logic of the compression
task.

Signed-off-by: Yifan Zhao <zhaoyifan@sjtu.edu.cn>
Co-authored-by: Tong Xin <xin_tong@sjtu.edu.cn>
---
 include/erofs/compress.h |  17 ++
 include/erofs/internal.h |   3 +
 include/erofs/list.h     |   8 +
 include/erofs/queue.h    |  22 ++
 lib/Makefile.am          |   2 +-
 lib/compress.c           | 430 ++++++++++++++++++++++++---------------
 lib/inode.c              | 302 ++++++++++++++++++++++-----
 lib/queue.c              |  64 ++++++
 8 files changed, 636 insertions(+), 212 deletions(-)
 create mode 100644 include/erofs/queue.h
 create mode 100644 lib/queue.c

diff --git a/include/erofs/compress.h b/include/erofs/compress.h
index 2699334..36a3fba 100644
--- a/include/erofs/compress.h
+++ b/include/erofs/compress.h
@@ -17,6 +17,23 @@ extern "C"
 #define EROFS_CONFIG_COMPR_MAX_SZ           (4000 * 1024)
 #define EROFS_COMPR_QUEUE_SZ (EROFS_CONFIG_COMPR_MAX_SZ * 2)
 
+#ifdef EROFS_MT_ENABLED
+struct erofs_compress_file {
+	pthread_mutex_t mutex;
+	pthread_cond_t cond;
+	int total;
+	int nfini;
+
+	struct z_erofs_write_index_ctx *ictx;
+	struct erofs_compress_work *head;
+	int fd;
+
+	struct erofs_compress_file *next;
+};
+
+int z_erofs_mt_reap(struct erofs_compress_file *cfile);
+#endif
+
 void z_erofs_drop_inline_pcluster(struct erofs_inode *inode);
 int erofs_write_compressed_file(struct erofs_inode *inode, int fd);
 
diff --git a/include/erofs/internal.h b/include/erofs/internal.h
index 954aef4..edfa187 100644
--- a/include/erofs/internal.h
+++ b/include/erofs/internal.h
@@ -250,6 +250,9 @@ struct erofs_inode {
 #ifdef WITH_ANDROID
 	uint64_t capabilities;
 #endif
+#ifdef EROFS_MT_ENABLED
+	struct erofs_compress_file* cfile;
+#endif
 };
 
 static inline erofs_off_t erofs_iloc(struct erofs_inode *inode)
diff --git a/include/erofs/list.h b/include/erofs/list.h
index d7a9fee..55383ac 100644
--- a/include/erofs/list.h
+++ b/include/erofs/list.h
@@ -90,6 +90,14 @@ static inline void list_splice_tail(struct list_head *list,
 		__list_splice(list, head->prev, head);
 }
 
+static inline void list_replace(struct list_head *old, struct list_head *new)
+{
+	new->next = old->next;
+	new->next->prev = new;
+	new->prev = old->prev;
+	new->prev->next = new;
+}
+
 #define list_entry(ptr, type, member) container_of(ptr, type, member)
 
 #define list_first_entry(ptr, type, member)                                    \
diff --git a/include/erofs/queue.h b/include/erofs/queue.h
new file mode 100644
index 0000000..ddc45ff
--- /dev/null
+++ b/include/erofs/queue.h
@@ -0,0 +1,22 @@
+/* SPDX-License-Identifier: GPL-2.0+ */
+#ifndef __EROFS_QUEUE_H
+#define __EROFS_QUEUE_H
+
+#include "internal.h"
+
+struct erofs_queue {
+    pthread_mutex_t lock;
+    pthread_cond_t full, empty;
+
+    void *buf;
+
+    size_t size, elem_size;
+    size_t head, tail;
+};
+
+struct erofs_queue* erofs_queue_create(size_t size, size_t elem_size);
+void erofs_queue_push(struct erofs_queue *q, void *elem);
+void *erofs_queue_pop(struct erofs_queue *q);
+void erofs_queue_destroy(struct erofs_queue *q);
+
+#endif
\ No newline at end of file
diff --git a/lib/Makefile.am b/lib/Makefile.am
index 7307f7b..777330b 100644
--- a/lib/Makefile.am
+++ b/lib/Makefile.am
@@ -55,5 +55,5 @@ liberofs_la_SOURCES += compressor_libdeflate.c
 endif
 if ENABLE_EROFS_MT
 liberofs_la_CFLAGS += -lpthread
-liberofs_la_SOURCES += workqueue.c
+liberofs_la_SOURCES += workqueue.c queue.c
 endif
diff --git a/lib/compress.c b/lib/compress.c
index 41de8b9..d5a5f16 100644
--- a/lib/compress.c
+++ b/lib/compress.c
@@ -63,7 +63,7 @@ struct z_erofs_vle_compress_ctx {
 
 struct z_erofs_write_index_ctx {
 	struct erofs_inode *inode;
-	struct list_head *extents;
+	struct list_head extents;
 	u16 clusterofs;
 	erofs_blk_t blkaddr, blkoff;
 	u8 *metacur;
@@ -81,6 +81,7 @@ struct erofs_compress_work {
 	/* Note: struct erofs_work must be the first member */
 	struct erofs_work work;
 	struct z_erofs_vle_compress_ctx ctx;
+	struct erofs_compress_file *file;
 
 	unsigned int alg_id;
 	char *alg_name;
@@ -93,13 +94,15 @@ struct erofs_compress_work {
 };
 
 static struct erofs_workqueue wq;
-static struct erofs_compress_work *idle;
-static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
-static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
-static int nfini;
+
+static struct erofs_compress_work *work_idle;
+static pthread_mutex_t work_mutex = PTHREAD_MUTEX_INITIALIZER;
+
+static struct erofs_compress_file *cfile_idle;
+static pthread_mutex_t cfile_mutex = PTHREAD_MUTEX_INITIALIZER;
 #endif
 
-static bool mt_enabled;
+bool mt_enabled;
 static u8 *queue;
 
 #define Z_EROFS_LEGACY_MAP_HEADER_SIZE	Z_EROFS_FULL_INDEX_ALIGN(0)
@@ -234,7 +237,7 @@ static void z_erofs_write_indexes(struct z_erofs_write_index_ctx *ctx)
 	struct z_erofs_extent_item *ei, *n;
 
 	ctx->clusterofs = 0;
-	list_for_each_entry_safe(ei, n, ctx->extents, list) {
+	list_for_each_entry_safe(ei, n, &ctx->extents, list) {
 		z_erofs_write_extent(ctx, &ei->e);
 
 		list_del(&ei->list);
@@ -1056,6 +1059,107 @@ int z_erofs_handle_fragments(struct z_erofs_vle_compress_ctx *ctx)
 	return 0;
 }
 
+void z_erofs_init_ctx(struct z_erofs_vle_compress_ctx *ctx,
+		      struct erofs_inode *inode, erofs_blk_t blkaddr,
+		      u32 tof_chksum, int fd)
+{
+	ctx->inode = inode;
+	ctx->pclustersize = z_erofs_get_max_pclustersize(inode);
+	ctx->blkaddr = blkaddr;
+	ctx->head = ctx->tail = 0;
+	ctx->clusterofs = 0;
+	ctx->fix_dedupedfrag = false;
+	ctx->fragemitted = false;
+	ctx->tof_chksum = tof_chksum;
+	ctx->fd = fd;
+	ctx->tmpfile = NULL;
+	init_list_head(&ctx->extents);
+}
+
+int z_erofs_finish_compress(struct z_erofs_write_index_ctx *ictx,
+			    struct erofs_buffer_head *bh,
+			    erofs_blk_t compressed_blocks, erofs_blk_t blkaddr,
+			    bool fragemitted)
+{
+	struct erofs_inode *inode = ictx->inode;
+	struct erofs_sb_info *sbi = inode->sbi;
+	u8 *compressmeta = ictx->metacur - Z_EROFS_LEGACY_MAP_HEADER_SIZE;
+	unsigned int legacymetasize;
+	int ret = 0;
+
+	ictx->blkaddr = blkaddr;
+	z_erofs_write_indexes(ictx);
+	legacymetasize = ictx->metacur - compressmeta;
+	/* estimate if data compression saves space or not */
+	if (!inode->fragment_size &&
+	    compressed_blocks * erofs_blksiz(sbi) + inode->idata_size +
+	    legacymetasize >= inode->i_size) {
+		z_erofs_dedupe_commit(true);
+
+		if (inode->idata) {
+			free(inode->idata);
+			inode->idata = NULL;
+		}
+		erofs_bdrop(bh, true); /* revoke buffer */
+		free(ictx);
+		free(compressmeta);
+		inode->compressmeta = NULL;
+
+		return -ENOSPC;
+	}
+	z_erofs_dedupe_commit(false);
+	z_erofs_write_mapheader(inode, compressmeta);
+
+	if (!fragemitted)
+		sbi->saved_by_deduplication += inode->fragment_size;
+
+	/* if the entire file is a fragment, a simplified form is used. */
+	if (inode->i_size <= inode->fragment_size) {
+		DBG_BUGON(inode->i_size < inode->fragment_size);
+		DBG_BUGON(inode->fragmentoff >> 63);
+		*(__le64 *)compressmeta =
+			cpu_to_le64(inode->fragmentoff | 1ULL << 63);
+		inode->datalayout = EROFS_INODE_COMPRESSED_FULL;
+		legacymetasize = Z_EROFS_LEGACY_MAP_HEADER_SIZE;
+	}
+
+	if (compressed_blocks) {
+		ret = erofs_bh_balloon(bh, erofs_pos(sbi, compressed_blocks));
+		DBG_BUGON(ret != erofs_blksiz(sbi));
+	} else {
+		if (!cfg.c_fragments && !cfg.c_dedupe)
+			DBG_BUGON(!inode->idata_size);
+	}
+
+	erofs_info("compressed %s (%llu bytes) into %u blocks",
+		   inode->i_srcpath, (unsigned long long)inode->i_size,
+		   compressed_blocks);
+
+	if (inode->idata_size) {
+		bh->op = &erofs_skip_write_bhops;
+		inode->bh_data = bh;
+	} else {
+		erofs_bdrop(bh, false);
+	}
+
+	inode->u.i_blocks = compressed_blocks;
+
+	if (inode->datalayout == EROFS_INODE_COMPRESSED_FULL) {
+		inode->extent_isize = legacymetasize;
+	} else {
+		ret = z_erofs_convert_to_compacted_format(inode, blkaddr,
+							  legacymetasize,
+							  compressmeta);
+		DBG_BUGON(ret);
+	}
+	inode->compressmeta = compressmeta;
+	if (!erofs_is_packed_inode(inode))
+		erofs_droid_blocklist_write(inode, blkaddr, compressed_blocks);
+
+	free(ictx);
+	return 0;
+}
+
 #ifdef EROFS_MT_ENABLED
 int z_erofs_mt_private_init(struct erofs_sb_info *sbi,
 			    struct erofs_compress_wq_private *priv,
@@ -1119,6 +1223,7 @@ void z_erofs_mt_work(struct erofs_work *work)
 	struct erofs_compress_work *cwork = (struct erofs_compress_work *)work;
 	struct z_erofs_vle_compress_ctx *ctx = &cwork->ctx;
 	struct erofs_compress_wq_private *priv = work->priv;
+	struct erofs_compress_file *cfile = cwork->file;
 	erofs_blk_t blkaddr = ctx->blkaddr;
 	u64 offset = ctx->seg_idx * cfg.c_mt_segment_size;
 	int ret = 0;
@@ -1139,6 +1244,11 @@ void z_erofs_mt_work(struct erofs_work *work)
 	ctx->tmpfile = tmpfile();
 #endif
 
+	if (!ctx->tmpfile) {
+		ret = -errno;
+		goto out;
+	}
+
 	ret = z_erofs_compress_file(ctx, offset, blkaddr);
 	if (ret)
 		goto out;
@@ -1150,28 +1260,29 @@ void z_erofs_mt_work(struct erofs_work *work)
 
 out:
 	cwork->ret = ret;
-	pthread_mutex_lock(&mutex);
-	++nfini;
-	pthread_cond_signal(&cond);
-	pthread_mutex_unlock(&mutex);
+	pthread_mutex_lock(&cfile->mutex);
+	++cfile->nfini;
+	pthread_cond_signal(&cfile->cond);
+	pthread_mutex_unlock(&cfile->mutex);
 }
 
-int z_erofs_mt_merge(struct erofs_compress_work *cur, erofs_blk_t blkaddr,
+int z_erofs_mt_merge(struct erofs_compress_file *cfile, erofs_blk_t blkaddr,
 		     erofs_blk_t *compressed_blocks)
 {
-	struct z_erofs_vle_compress_ctx *ctx, *listhead = NULL;
+	struct z_erofs_vle_compress_ctx *ctx;
+	struct erofs_compress_work *cur = cfile->head, *tmp;
 	struct erofs_sb_info *sbi = cur->ctx.inode->sbi;
-	struct erofs_compress_work *tmp;
+	struct z_erofs_write_index_ctx *ictx = cfile->ictx;
 	char *memblock = NULL;
 	int ret = 0, lret;
 
 	while (cur != NULL) {
 		ctx = &cur->ctx;
 
-		if (!listhead)
-			listhead = ctx;
+		if (cur == cfile->head)
+			list_replace(&ctx->extents, &ictx->extents);
 		else
-			list_splice_tail(&ctx->extents, &listhead->extents);
+			list_splice_tail(&ctx->extents, &ictx->extents);
 
 		if (cur->ret != 0) {
 			if (!ret)
@@ -1215,8 +1326,10 @@ out:
 		fclose(ctx->tmpfile);
 
 		tmp = cur->next;
-		cur->next = idle;
-		idle = cur;
+		pthread_mutex_lock(&work_mutex);
+		cur->next = work_idle;
+		work_idle = cur;
+		pthread_mutex_unlock(&work_mutex);
 		cur = tmp;
 	}
 
@@ -1224,60 +1337,59 @@ out:
 
 	return ret;
 }
-#endif
 
-void z_erofs_init_ctx(struct z_erofs_vle_compress_ctx *ctx,
-		      struct erofs_inode *inode, erofs_blk_t blkaddr,
-		      u32 tof_chksum, int fd)
+struct erofs_compress_file *z_erofs_mt_do_compress(
+	struct erofs_inode *inode, int fd, u32 tof_chksum, erofs_blk_t blkaddr,
+	struct z_erofs_write_index_ctx *ictx, struct erofs_compress_cfg *ccfg)
 {
-	ctx->inode = inode;
-	ctx->pclustersize = z_erofs_get_max_pclustersize(inode);
-	ctx->blkaddr = blkaddr;
-	ctx->head = ctx->tail = 0;
-	ctx->clusterofs = 0;
-	ctx->fix_dedupedfrag = false;
-	ctx->fragemitted = false;
-	ctx->tof_chksum = tof_chksum;
-	ctx->fd = fd;
-	ctx->tmpfile = NULL;
-	init_list_head(&ctx->extents);
-}
-
-int z_erofs_do_compress(struct z_erofs_vle_compress_ctx *ctx,
-			struct z_erofs_write_index_ctx *ictx,
-			struct erofs_compress_cfg *ccfg,
-			erofs_blk_t *compressed_blocks)
-{
-#ifdef EROFS_MT_ENABLED
 	struct erofs_compress_work *work, *head = NULL, **last = &head;
-#endif
-	struct erofs_inode *inode = ctx->inode;
-	erofs_blk_t blkaddr = ctx->blkaddr;
-	int ret = 0;
-
-	if (mt_enabled) {
-#ifdef EROFS_MT_ENABLED
-		if (inode->i_size <= cfg.c_mt_segment_size)
-			goto single_thread;
+	struct erofs_compress_file *cfile;
 
 	int nsegs = DIV_ROUND_UP(inode->i_size, cfg.c_mt_segment_size);
-		nfini = 0;
+
+	pthread_mutex_lock(&cfile_mutex);
+	if (cfile_idle) {
+		cfile = cfile_idle;
+		cfile_idle = cfile->next;
+		cfile->next = NULL;
+		pthread_mutex_unlock(&cfile_mutex);
+	} else {
+		pthread_mutex_unlock(&cfile_mutex);
+		cfile = calloc(1, sizeof(*cfile));
+		if (!cfile)
+			return ERR_PTR(-ENOMEM);
+	}
+
+	inode->cfile = cfile;
+
+	cfile->ictx = ictx;
+	cfile->total = nsegs;
+	cfile->nfini = 0;
+	cfile->fd = fd;
+	pthread_mutex_init(&cfile->mutex, NULL);
+	pthread_cond_init(&cfile->cond, NULL);
 
 	for (int i = 0; i < nsegs; i++) {
-			if (idle) {
-				work = idle;
-				idle = work->next;
+		pthread_mutex_lock(&work_mutex);
+		if (work_idle) {
+			work = work_idle;
+			work_idle = work->next;
 			work->next = NULL;
+			pthread_mutex_unlock(&work_mutex);
 		} else {
+			pthread_mutex_unlock(&work_mutex);
 			work = calloc(1, sizeof(*work));
-				if (!work)
-					return -ENOMEM;
+			if (!work) {
+				free(cfile);
+				return ERR_PTR(-ENOMEM);
+			}
 		}
+		if (i == 0)
+			cfile->head = work;
 		*last = work;
 		last = &work->next;
 
-			z_erofs_init_ctx(&work->ctx, inode, blkaddr,
-					 ctx->tof_chksum, ctx->fd);
+		z_erofs_init_ctx(&work->ctx, inode, blkaddr, tof_chksum, fd);
 		if (i == nsegs - 1)
 			work->ctx.remaining = inode->i_size -
 					      inode->fragment_size -
@@ -1292,71 +1404,81 @@ int z_erofs_do_compress(struct z_erofs_vle_compress_ctx *ctx,
 		work->comp_level = ccfg->handle.compression_level;
 		work->dict_size = ccfg->handle.dict_size;
 
+		work->file = cfile;
 		work->work.func = z_erofs_mt_work;
 
 		erofs_workqueue_add(&wq, &work->work);
 	}
 
-		pthread_mutex_lock(&mutex);
-		while (nfini != nsegs)
-			pthread_cond_wait(&cond, &mutex);
-		pthread_mutex_unlock(&mutex);
+	return cfile;
+}
 
-		ictx->extents = &head->ctx.extents;
+int z_erofs_mt_reap(struct erofs_compress_file *cfile)
+{
+	struct erofs_buffer_head *bh = NULL;
+	erofs_blk_t blkaddr, compressed_blocks = 0;
+	int ret = 0;
 
-		ret = z_erofs_mt_merge(head, blkaddr, compressed_blocks);
-		if (ret)
-			return ret;
-#endif
-	} else {
-#ifdef EROFS_MT_ENABLED
-single_thread:
-#endif
-		ctx->queue = queue;
-		ctx->destbuf = NULL;
-		ctx->chandle = &ccfg->handle;
-		ctx->remaining = inode->i_size - inode->fragment_size;
-		ctx->seg_num = 1;
-		ctx->seg_idx = 0;
-
-		ret = z_erofs_compress_file(ctx, 0, blkaddr);
-		if (ret)
-			return ret;
+	bh = erofs_balloc(DATA, 0, 0, 0);
+	if (IS_ERR(bh)) {
+		ret = PTR_ERR(bh);
+		goto out;
+	}
+	blkaddr = erofs_mapbh(bh->block);
 
-		ret = z_erofs_handle_fragments(ctx);
+	ret = z_erofs_mt_merge(cfile, blkaddr, &compressed_blocks);
 	if (ret)
-			return ret;
+		goto out;
 
-		*compressed_blocks = ctx->compressed_blocks;
-		ictx->extents = &ctx->extents;
-	}
+	// multi-threaded compression doesn't support fragments for now
+	ret = z_erofs_finish_compress(cfile->ictx, bh, compressed_blocks,
+				      blkaddr, false);
 
-	return 0;
+out:
+	pthread_mutex_lock(&cfile_mutex);
+	cfile->next = cfile_idle;
+	cfile_idle = cfile;
+	pthread_mutex_unlock(&cfile_mutex);
+
+	return ret;
 }
+#endif
 
 int erofs_write_compressed_file(struct erofs_inode *inode, int fd)
 {
 	struct erofs_buffer_head *bh;
-	static struct z_erofs_vle_compress_ctx ctx;
-	static struct z_erofs_write_index_ctx ictx;
 	struct erofs_compress_cfg *ccfg;
-	erofs_blk_t blkaddr, compressed_blocks = 0;
-	unsigned int legacymetasize;
+	erofs_blk_t blkaddr;
 	u32 tof_chksum = 0;
 	int ret;
 	struct erofs_sb_info *sbi = inode->sbi;
-	u8 *compressmeta = malloc(BLK_ROUND_UP(sbi, inode->i_size) *
+	u8 *compressmeta;
+	struct z_erofs_write_index_ctx *ictx;
+	static struct z_erofs_vle_compress_ctx ctx;
+
+	compressmeta = malloc(BLK_ROUND_UP(sbi, inode->i_size) *
 				      sizeof(struct z_erofs_lcluster_index) +
 			      Z_EROFS_LEGACY_MAP_HEADER_SIZE);
-
 	if (!compressmeta)
 		return -ENOMEM;
 
+	ictx = malloc(sizeof(*ictx));
+	if (!ictx) {
+		ret = -ENOMEM;
+		goto err_free_meta;
+	}
+
+	if (!mt_enabled) {
 		/* allocate main data buffer */
 		bh = erofs_balloc(DATA, 0, 0, 0);
 		if (IS_ERR(bh)) {
 			ret = PTR_ERR(bh);
-		goto err_free_meta;
+			goto err_free_ictx;
+		}
+		blkaddr = erofs_mapbh(bh->block);
+	} else {
+		bh = NULL;
+		blkaddr = 0;
 	}
 
 	/* initialize per-file compression setting */
@@ -1405,10 +1527,19 @@ int erofs_write_compressed_file(struct erofs_inode *inode, int fd)
 			goto err_bdrop;
 	}
 
-	blkaddr = erofs_mapbh(bh->block);	/* start_blkaddr */
-	z_erofs_init_ctx(&ctx, inode, blkaddr, tof_chksum, fd);
+	ictx->inode = inode;
+	ictx->blkoff = 0;
+	ictx->clusterofs = 0;
+	ictx->metacur = compressmeta + Z_EROFS_LEGACY_MAP_HEADER_SIZE;
+	init_list_head(&ictx->extents);
+
 	if (cfg.c_all_fragments && !erofs_is_packed_inode(inode) &&
 	    !inode->fragment_size) {
+		// TODO: support multi-threaded compression for fragments
+		DBG_BUGON(mt_enabled);
+
+		z_erofs_init_ctx(&ctx, inode, blkaddr, tof_chksum, fd);
+
 		ret = z_erofs_pack_file_from_fd(inode, fd, tof_chksum);
 		if (ret)
 			goto err_free_idata;
@@ -1417,78 +1548,43 @@ int erofs_write_compressed_file(struct erofs_inode *inode, int fd)
 		if (ret)
 			goto err_free_idata;
 
-		ictx.extents = &ctx.extents;
-	} else {
-			ret = z_erofs_do_compress(&ctx, &ictx, ccfg,
-						  &compressed_blocks);
+		list_replace(&ctx.extents, &ictx->extents);
+
+		return z_erofs_finish_compress(ictx, bh, 0, blkaddr, false);
+	} else if (!mt_enabled) {
+		z_erofs_init_ctx(&ctx, inode, blkaddr, tof_chksum, fd);
+		ctx.queue = queue;
+		ctx.destbuf = NULL;
+		ctx.chandle = &ccfg->handle;
+		ctx.remaining = inode->i_size - inode->fragment_size;
+		ctx.seg_num = 1;
+		ctx.seg_idx = 0;
+
+		ret = z_erofs_compress_file(&ctx, 0, blkaddr);
 		if (ret)
 			goto err_free_idata;
-	}
-
-	ictx.inode = inode;
-	ictx.blkaddr = blkaddr;
-	ictx.blkoff = 0;
-	ictx.clusterofs = 0;
-	ictx.metacur = compressmeta + Z_EROFS_LEGACY_MAP_HEADER_SIZE;
 
-	z_erofs_write_indexes(&ictx);
-	legacymetasize = ictx.metacur - compressmeta;
-	/* estimate if data compression saves space or not */
-	if (!inode->fragment_size &&
-	    compressed_blocks * erofs_blksiz(sbi) + inode->idata_size +
-	    legacymetasize >= inode->i_size) {
-		z_erofs_dedupe_commit(true);
-		ret = -ENOSPC;
+		ret = z_erofs_handle_fragments(&ctx);
+		if (ret)
 			goto err_free_idata;
-	}
-	z_erofs_dedupe_commit(false);
-	z_erofs_write_mapheader(inode, compressmeta);
 
-	if (!ctx.fragemitted)
-		sbi->saved_by_deduplication += inode->fragment_size;
+		list_replace(&ctx.extents, &ictx->extents);
 
-	/* if the entire file is a fragment, a simplified form is used. */
-	if (inode->i_size <= inode->fragment_size) {
-		DBG_BUGON(inode->i_size < inode->fragment_size);
-		DBG_BUGON(inode->fragmentoff >> 63);
-		*(__le64 *)compressmeta =
-			cpu_to_le64(inode->fragmentoff | 1ULL << 63);
-		inode->datalayout = EROFS_INODE_COMPRESSED_FULL;
-		legacymetasize = Z_EROFS_LEGACY_MAP_HEADER_SIZE;
-	}
-
-	if (compressed_blocks) {
-		ret = erofs_bh_balloon(bh, erofs_pos(sbi, compressed_blocks));
-		DBG_BUGON(ret != erofs_blksiz(sbi));
+		return z_erofs_finish_compress(ictx, bh, ctx.compressed_blocks,
+					       blkaddr, ctx.fragemitted);
 	} else {
-		if (!cfg.c_fragments && !cfg.c_dedupe)
-			DBG_BUGON(!inode->idata_size);
-	}
-
-	erofs_info("compressed %s (%llu bytes) into %u blocks",
-		   inode->i_srcpath, (unsigned long long)inode->i_size,
-		   compressed_blocks);
+#ifdef EROFS_MT_ENABLED
+		struct erofs_compress_file *cfile;
 
-	if (inode->idata_size) {
-		bh->op = &erofs_skip_write_bhops;
-		inode->bh_data = bh;
-	} else {
-		erofs_bdrop(bh, false);
+		cfile = z_erofs_mt_do_compress(inode, fd, tof_chksum, blkaddr,
+					       ictx, ccfg);
+		if (IS_ERR(cfile)) {
+			ret = PTR_ERR(cfile);
+			goto err_free_idata;
 		}
-
-	inode->u.i_blocks = compressed_blocks;
-
-	if (inode->datalayout == EROFS_INODE_COMPRESSED_FULL) {
-		inode->extent_isize = legacymetasize;
-	} else {
-		ret = z_erofs_convert_to_compacted_format(inode, blkaddr,
-							  legacymetasize,
-							  compressmeta);
-		DBG_BUGON(ret);
+#endif
 	}
-	inode->compressmeta = compressmeta;
-	if (!erofs_is_packed_inode(inode))
-		erofs_droid_blocklist_write(inode, blkaddr, compressed_blocks);
+
 	return 0;
 
 err_free_idata:
@@ -1497,7 +1593,10 @@ err_free_idata:
 		inode->idata = NULL;
 	}
 err_bdrop:
+	if (bh)
 		erofs_bdrop(bh, true);	/* revoke buffer */
+err_free_ictx:
+	free(ictx);
 err_free_meta:
 	free(compressmeta);
 	inode->compressmeta = NULL;
@@ -1681,10 +1780,15 @@ int z_erofs_compress_exit(void)
 		ret = erofs_workqueue_shutdown(&wq);
 		if (ret)
 			return ret;
-		while (idle) {
-			struct erofs_compress_work *tmp = idle->next;
-			free(idle);
-			idle = tmp;
+		while (work_idle) {
+			struct erofs_compress_work *tmp = work_idle->next;
+			free(work_idle);
+			work_idle = tmp;
+		}
+		while (cfile_idle) {
+			struct erofs_compress_file *tmp = cfile_idle->next;
+			free(cfile_idle);
+			cfile_idle = tmp;
 		}
 #endif
 	}
diff --git a/lib/inode.c b/lib/inode.c
index c6424c0..43ee23c 100644
--- a/lib/inode.c
+++ b/lib/inode.c
@@ -27,8 +27,13 @@
 #include "erofs/compress_hints.h"
 #include "erofs/blobchunk.h"
 #include "erofs/fragments.h"
+#ifdef EROFS_MT_ENABLED
+#include "erofs/queue.h"
+#endif
 #include "liberofs_private.h"
 
+extern bool mt_enabled;
+
 #define S_SHIFT                 12
 static unsigned char erofs_ftype_by_mode[S_IFMT >> S_SHIFT] = {
 	[S_IFREG >> S_SHIFT]  = EROFS_FT_REG_FILE,
@@ -477,13 +482,8 @@ static int write_uncompressed_file_from_fd(struct erofs_inode *inode, int fd)
 	return 0;
 }
 
-int erofs_write_file(struct erofs_inode *inode, int fd, u64 fpos)
+static int erofs_write_chunked_file(struct erofs_inode *inode, int fd, u64 fpos)
 {
-	int ret;
-
-	DBG_BUGON(!inode->i_size);
-
-	if (cfg.c_chunkbits) {
 	inode->u.chunkbits = cfg.c_chunkbits;
 	/* chunk indexes when explicitly specified */
 	inode->u.chunkformat = 0;
@@ -492,6 +492,15 @@ int erofs_write_file(struct erofs_inode *inode, int fd, u64 fpos)
 	return erofs_blob_write_chunked_file(inode, fd, fpos);
 }
 
+int erofs_write_file(struct erofs_inode *inode, int fd, u64 fpos)
+{
+	int ret;
+
+	DBG_BUGON(!inode->i_size);
+
+	if (cfg.c_chunkbits)
+		return erofs_write_chunked_file(inode, fd, fpos);
+
 	if (cfg.c_compr_opts[0].alg && erofs_file_is_compressible(inode)) {
 		ret = erofs_write_compressed_file(inode, fd);
 		if (!ret || ret != -ENOSPC)
@@ -1032,6 +1041,9 @@ struct erofs_inode *erofs_new_inode(void)
 	inode->i_ino[0] = sbi.inos++;	/* inode serial number */
 	inode->i_count = 1;
 	inode->datalayout = EROFS_INODE_FLAT_PLAIN;
+#ifdef EROFS_MT_ENABLED
+	inode->cfile = NULL;
+#endif
 
 	init_list_head(&inode->i_hash);
 	init_list_head(&inode->i_subdirs);
@@ -1096,41 +1108,56 @@ static void erofs_fixup_meta_blkaddr(struct erofs_inode *rootdir)
 	rootdir->nid = (off - meta_offset) >> EROFS_ISLOTBITS;
 }
 
-static int erofs_mkfs_build_tree(struct erofs_inode *dir, struct list_head *dirs)
-{
-	int ret;
-	DIR *_dir;
-	struct dirent *dp;
-	struct erofs_dentry *d;
-	unsigned int nr_subdirs, i_nlink;
-
-	ret = erofs_scan_file_xattrs(dir);
-	if (ret < 0)
-		return ret;
-
-	ret = erofs_prepare_xattr_ibody(dir);
-	if (ret < 0)
-		return ret;
+#ifdef EROFS_MT_ENABLED
+#define EROFS_MT_QUEUE_SIZE 256
+struct erofs_queue *erofs_mt_queue;
+#endif
 
-	if (!S_ISDIR(dir->i_mode)) {
-		if (S_ISLNK(dir->i_mode)) {
-			char *const symlink = malloc(dir->i_size);
+static int erofs_mkfs_handle_symlink(struct erofs_inode *inode)
+{
+	int ret = 0;
+	char *const symlink = malloc(inode->i_size);
 
 	if (!symlink)
 		return -ENOMEM;
-			ret = readlink(dir->i_srcpath, symlink, dir->i_size);
+	ret = readlink(inode->i_srcpath, symlink, inode->i_size);
 	if (ret < 0) {
 		free(symlink);
 		return -errno;
 	}
-			ret = erofs_write_file_from_buffer(dir, symlink);
+	ret = erofs_write_file_from_buffer(inode, symlink);
 	free(symlink);
-		} else if (dir->i_size) {
-			int fd = open(dir->i_srcpath, O_RDONLY | O_BINARY);
+
+	return ret;
+}
+
+static int erofs_mkfs_handle_file(struct erofs_inode *inode, bool alloc_buf)
+{
+	int ret = 0;
+
+	if (!alloc_buf) {
+		if (!inode->i_size)
+			return 0;
+
+		if (!S_ISLNK(inode->i_mode) && cfg.c_compr_opts[0].alg &&
+		    erofs_file_is_compressible(inode)) {
+			int fd = open(inode->i_srcpath, O_RDONLY | O_BINARY);
+			if (fd < 0)
+				return -errno;
+			ret = erofs_write_compressed_file(inode, fd);
+		}
+
+		return ret;
+	}
+
+	if (S_ISLNK(inode->i_mode)) {
+		ret = erofs_mkfs_handle_symlink(inode);
+	} else if (inode->i_size) {
+		int fd = open(inode->i_srcpath, O_RDONLY | O_BINARY);
 		if (fd < 0)
 			return -errno;
 
-			ret = erofs_write_file(dir, fd, 0);
+		ret = erofs_write_file(inode, fd, 0);
 		close(fd);
 	} else {
 		ret = 0;
@@ -1138,11 +1165,21 @@ static int erofs_mkfs_build_tree(struct erofs_inode *dir, struct list_head *dirs
 	if (ret)
 		return ret;
 
-		erofs_prepare_inode_buffer(dir);
-		erofs_write_tail_end(dir);
+	erofs_prepare_inode_buffer(inode);
+	erofs_write_tail_end(inode);
 	return 0;
 }
 
+static int erofs_mkfs_handle_dir(struct erofs_inode *dir,
+				 struct list_head *dirs,
+				 bool alloc_buf)
+{
+	int ret;
+	DIR *_dir;
+	struct dirent *dp;
+	struct erofs_dentry *d;
+	unsigned int nr_subdirs = 0, i_nlink;
+
 	_dir = opendir(dir->i_srcpath);
 	if (!_dir) {
 		erofs_err("failed to opendir at %s: %s",
@@ -1186,6 +1223,7 @@ static int erofs_mkfs_build_tree(struct erofs_inode *dir, struct list_head *dirs
 	if (ret)
 		return ret;
 
+	if (alloc_buf) {
 		ret = erofs_prepare_inode_buffer(dir);
 		if (ret)
 			return ret;
@@ -1193,6 +1231,7 @@ static int erofs_mkfs_build_tree(struct erofs_inode *dir, struct list_head *dirs
 
 		if (IS_ROOT(dir))
 			erofs_fixup_meta_blkaddr(dir);
+	}
 
 	i_nlink = 0;
 	list_for_each_entry(d, &dir->i_subdirs, d_child) {
@@ -1205,8 +1244,7 @@ static int erofs_mkfs_build_tree(struct erofs_inode *dir, struct list_head *dirs
 			continue;
 		}
 
-		ret = snprintf(buf, PATH_MAX, "%s/%s",
-			       dir->i_srcpath, d->name);
+		ret = snprintf(buf, PATH_MAX, "%s/%s", dir->i_srcpath, d->name);
 		if (ret < 0 || ret >= PATH_MAX) {
 			/* ignore the too long path */
 			goto fail;
@@ -1253,10 +1291,52 @@ err_closedir:
 	return ret;
 }
 
-struct erofs_inode *erofs_mkfs_build_tree_from_path(const char *path)
+static void erofs_mkfs_print_progessinfo(struct erofs_inode *inode)
+{
+	char *trimmed;
+	trimmed = erofs_trim_for_progressinfo(erofs_fspath(inode->i_srcpath),
+					      sizeof("Processing  ...") - 1);
+	erofs_update_progressinfo("Processing %s ...", trimmed);
+	free(trimmed);
+}
+
+static void erofs_mkfs_dumpdir(struct erofs_inode *dumpdir)
+{
+	struct erofs_inode *inode;
+	while (dumpdir) {
+		inode = dumpdir;
+		erofs_write_dir_file(inode);
+		erofs_write_tail_end(inode);
+		inode->bh->op = &erofs_write_inode_bhops;
+		dumpdir = inode->next_dirwrite;
+		erofs_iput(inode);
+	}	
+}
+
+static int erofs_mkfs_build_tree(struct erofs_inode *dir,
+				 struct list_head *dirs, bool alloc_buf)
+{
+	int ret;
+
+	ret = erofs_scan_file_xattrs(dir);
+	if (ret < 0)
+		return ret;
+
+	ret = erofs_prepare_xattr_ibody(dir);
+	if (ret < 0)
+		return ret;
+
+	if (!S_ISDIR(dir->i_mode))
+		return erofs_mkfs_handle_file(dir, alloc_buf);
+
+	return erofs_mkfs_handle_dir(dir, dirs, alloc_buf);
+}
+
+struct erofs_inode *__erofs_mkfs_build_tree_from_path(const char *path,
+						      bool mt_enabled)
 {
 	LIST_HEAD(dirs);
-	struct erofs_inode *inode, *root, *dumpdir;
+	struct erofs_inode *inode, *root, *dumpdir = NULL;
 
 	root = erofs_iget_from_path(path, true);
 	if (IS_ERR(root))
@@ -1266,43 +1346,169 @@ struct erofs_inode *erofs_mkfs_build_tree_from_path(const char *path)
 	root->i_parent = root;	/* rootdir mark */
 	list_add(&root->i_subdirs, &dirs);
 
-	dumpdir = NULL;
 	do {
 		int err;
-		char *trimmed;
 
 		inode = list_first_entry(&dirs, struct erofs_inode, i_subdirs);
 		list_del(&inode->i_subdirs);
 		init_list_head(&inode->i_subdirs);
 
-		trimmed = erofs_trim_for_progressinfo(
-				erofs_fspath(inode->i_srcpath),
-				sizeof("Processing  ...") - 1);
-		erofs_update_progressinfo("Processing %s ...", trimmed);
-		free(trimmed);
+		if (!mt_enabled)
+			erofs_mkfs_print_progessinfo(inode);
 
-		err = erofs_mkfs_build_tree(inode, &dirs);
+		err = erofs_mkfs_build_tree(inode, &dirs, !mt_enabled);
 		if (err) {
 			root = ERR_PTR(err);
 			break;
 		}
 
+		if (!mt_enabled) {
 			if (S_ISDIR(inode->i_mode)) {
 				inode->next_dirwrite = dumpdir;
 				dumpdir = inode;
 			} else {
 				erofs_iput(inode);
 			}
+		} else {
+#ifdef EROFS_MT_ENABLED
+			erofs_queue_push(erofs_mt_queue, &inode);
+#endif
+		}
 	} while (!list_empty(&dirs));
 
-	while (dumpdir) {
-		inode = dumpdir;
-		erofs_write_dir_file(inode);
+	if (!mt_enabled)
+		erofs_mkfs_dumpdir(dumpdir);
+#ifdef EROFS_MT_ENABLED
+	else
+		erofs_queue_push(erofs_mt_queue, &dumpdir);
+#endif
+	return root;
+}
+
+#ifdef EROFS_MT_ENABLED
+pthread_t erofs_mt_traverser;
+
+void *erofs_mkfs_mt_traverse_task(void *path)
+{
+	pthread_exit((void *)__erofs_mkfs_build_tree_from_path(path, true));
+}
+
+static int erofs_mkfs_reap_compressed_file(struct erofs_inode *inode)
+{
+	struct erofs_compress_file *cfile = inode->cfile;
+	int fd = cfile->fd;
+	int ret = 0;
+
+	pthread_mutex_lock(&cfile->mutex);
+	while (cfile->nfini != cfile->total)
+		pthread_cond_wait(&cfile->cond, &cfile->mutex);
+	pthread_mutex_unlock(&cfile->mutex);
+
+	ret = z_erofs_mt_reap(cfile);
+	if (ret == -ENOSPC) {
+		ret = lseek(fd, 0, SEEK_SET);
+		if (ret < 0)
+			return -errno;
+
+		ret = write_uncompressed_file_from_fd(inode, fd);
+	}
+
+	close(fd);
+	return ret;
+}
+
+static int erofs_mkfs_reap_inodes()
+{
+	struct erofs_inode *inode, *dumpdir;
+	int ret = 0;
+
+	dumpdir = NULL;
+	while (true) {
+		inode = *(struct erofs_inode **)erofs_queue_pop(erofs_mt_queue);
+		if (!inode)
+			break;
+
+		erofs_mkfs_print_progessinfo(inode);
+
+		if (S_ISDIR(inode->i_mode)) {
+			ret = erofs_prepare_inode_buffer(inode);
+			if (ret)
+				goto out;
+			inode->bh->op = &erofs_skip_write_bhops;
+
+			if (IS_ROOT(inode))
+				erofs_fixup_meta_blkaddr(inode);
+
+			inode->next_dirwrite = dumpdir;
+			dumpdir = inode;
+			continue;
+		}
+
+		if (inode->cfile) {
+			ret = erofs_mkfs_reap_compressed_file(inode);
+		} else if (S_ISLNK(inode->i_mode)) {
+			ret = erofs_mkfs_handle_symlink(inode);
+		} else if (!inode->i_size) {
+			ret = 0;
+		} else {
+			int fd = open(inode->i_srcpath, O_RDONLY | O_BINARY);
+			if (fd < 0)
+				return -errno;
+
+			if (cfg.c_chunkbits)
+				ret = erofs_write_chunked_file(inode, fd, 0);
+			else
+				ret = write_uncompressed_file_from_fd(inode,
+								      fd);
+			close(fd);
+		}
+		if (ret)
+			goto out;
+
+		erofs_prepare_inode_buffer(inode);
 		erofs_write_tail_end(inode);
-		inode->bh->op = &erofs_write_inode_bhops;
-		dumpdir = inode->next_dirwrite;
 		erofs_iput(inode);
 	}
+
+	erofs_mkfs_dumpdir(dumpdir);
+
+out:
+	return ret;
+}
+#endif
+
+struct erofs_inode *erofs_mkfs_build_tree_from_path(const char *path)
+{
+#ifdef EROFS_MT_ENABLED
+	int err;
+#endif
+	struct erofs_inode *root = NULL;
+
+	if (!mt_enabled)
+		return __erofs_mkfs_build_tree_from_path(path, false);
+
+#ifdef EROFS_MT_ENABLED
+	erofs_mt_queue = erofs_queue_create(EROFS_MT_QUEUE_SIZE,
+					    sizeof(struct erofs_inode *));
+	if (IS_ERR(erofs_mt_queue))
+		return ERR_CAST(erofs_mt_queue);
+
+	err = pthread_create(&erofs_mt_traverser, NULL,
+			     erofs_mkfs_mt_traverse_task, (void *)path);
+	if (err)
+		return ERR_PTR(err);
+
+	err = erofs_mkfs_reap_inodes();
+	if (err)
+		return ERR_PTR(err);
+
+	err = pthread_join(erofs_mt_traverser, (void *)&root);
+	if (err)
+		return ERR_PTR(err);
+
+	erofs_queue_destroy(erofs_mt_queue);
+#endif
+
 	return root;
 }
 
diff --git a/lib/queue.c b/lib/queue.c
new file mode 100644
index 0000000..b69ac26
--- /dev/null
+++ b/lib/queue.c
@@ -0,0 +1,64 @@
+// SPDX-License-Identifier: GPL-2.0+
+#include "erofs/err.h"
+#include <stdlib.h>
+#include "erofs/queue.h"
+
+struct erofs_queue *erofs_queue_create(size_t size, size_t elem_size)
+{
+	struct erofs_queue *q = malloc(sizeof(*q));
+
+	pthread_mutex_init(&q->lock, NULL);
+	pthread_cond_init(&q->empty, NULL);
+	pthread_cond_init(&q->full, NULL);
+
+	q->size = size;
+	q->elem_size = elem_size;
+	q->head = 0;
+	q->tail = 0;
+	q->buf = calloc(size, elem_size);
+	if (!q->buf)
+		return ERR_PTR(-ENOMEM);
+
+	return q;
+}
+
+void erofs_queue_push(struct erofs_queue *q, void *elem)
+{
+	pthread_mutex_lock(&q->lock);
+
+	while ((q->tail + 1) % q->size == q->head)
+		pthread_cond_wait(&q->full, &q->lock);
+
+	memcpy(q->buf + q->tail * q->elem_size, elem, q->elem_size);
+	q->tail = (q->tail + 1) % q->size;
+
+	pthread_cond_signal(&q->empty);
+	pthread_mutex_unlock(&q->lock);
+}
+
+void *erofs_queue_pop(struct erofs_queue *q)
+{
+    void *elem;
+
+    pthread_mutex_lock(&q->lock);
+
+    while (q->head == q->tail)
+        pthread_cond_wait(&q->empty, &q->lock);
+
+    elem = q->buf + q->head * q->elem_size;
+    q->head = (q->head + 1) % q->size;
+
+    pthread_cond_signal(&q->full);
+    pthread_mutex_unlock(&q->lock);
+
+    return elem;
+}
+
+void erofs_queue_destroy(struct erofs_queue *q)
+{
+	pthread_mutex_destroy(&q->lock);
+	pthread_cond_destroy(&q->empty);
+	pthread_cond_destroy(&q->full);
+	free(q->buf);
+	free(q);
+}
\ No newline at end of file
-- 
2.43.2


^ permalink raw reply related	[flat|nested] 11+ messages in thread

* [PATCH v2 7/7] erofs-utils: mkfs: use per-worker tmpfile for multi-threaded mkfs
  2024-02-20  7:55           ` [PATCH v2 6/7] erofs-utils: mkfs: introduce inter-file " Yifan Zhao
@ 2024-02-20  7:55             ` Yifan Zhao
  0 siblings, 0 replies; 11+ messages in thread
From: Yifan Zhao @ 2024-02-20  7:55 UTC (permalink / raw
  To: hsiangkao; +Cc: linux-erofs

Currently, multi-threaded mkfs.erofs creates a tmpfile for each segment
to store the intermediate compression result, which can reach the limit
of open files when the number of segments is large.

This patch uses per-worker tmpfiles to avoid this problem when possible,
i.e., when the environment supports the fallocate() syscall and the
FALLOC_FL_PUNCH_HOLE flag.

Signed-off-by: Yifan Zhao <zhaoyifan@sjtu.edu.cn>
---
 lib/compress.c | 68 +++++++++++++++++++++++++++++++++++++++-----------
 1 file changed, 54 insertions(+), 14 deletions(-)

diff --git a/lib/compress.c b/lib/compress.c
index d5a5f16..3fae260 100644
--- a/lib/compress.c
+++ b/lib/compress.c
@@ -8,6 +8,9 @@
 #ifndef _LARGEFILE64_SOURCE
 #define _LARGEFILE64_SOURCE
 #endif
+#ifndef _GNU_SOURCE
+#define _GNU_SOURCE
+#endif
 #include <string.h>
 #include <stdlib.h>
 #include <unistd.h>
@@ -23,6 +26,13 @@
 #ifdef EROFS_MT_ENABLED
 #include "erofs/workqueue.h"
 #endif
+#ifdef HAVE_LINUX_FALLOC_H
+#include <linux/falloc.h>
+#endif
+
+#if defined(HAVE_FALLOCATE) && defined(FALLOC_FL_PUNCH_HOLE)
+#define USE_PER_WORKER_TMPFILE 1
+#endif
 
 /* compressing configuration specified by users */
 struct erofs_compress_cfg {
@@ -59,6 +69,7 @@ struct z_erofs_vle_compress_ctx {
 
 	int seg_num, seg_idx;
 	FILE *tmpfile;
+	off_t tmpfile_off;
 };
 
 struct z_erofs_write_index_ctx {
@@ -75,6 +86,7 @@ struct erofs_compress_wq_private {
 	u8 *queue;
 	char *destbuf;
 	struct erofs_compress_cfg *ccfg;
+	FILE* tmpfile;
 };
 
 struct erofs_compress_work {
@@ -402,6 +414,7 @@ static int write_uncompressed_extent(struct z_erofs_vle_compress_ctx *ctx,
 		ret = fwrite(dst, erofs_blksiz(sbi), 1, ctx->tmpfile);
 		if (ret != 1)
 			return -EIO;
+		fflush(ctx->tmpfile);
 	} else {
 		erofs_dbg("Writing %u uncompressed data to block %u", count,
 			  ctx->blkaddr);
@@ -1073,6 +1086,7 @@ void z_erofs_init_ctx(struct z_erofs_vle_compress_ctx *ctx,
 	ctx->tof_chksum = tof_chksum;
 	ctx->fd = fd;
 	ctx->tmpfile = NULL;
+	ctx->tmpfile_off = 0;
 	init_list_head(&ctx->extents);
 }
 
@@ -1169,7 +1183,7 @@ int z_erofs_mt_private_init(struct erofs_sb_info *sbi,
 	struct erofs_compress_cfg *lc;
 	int ret;
 
-	if (!priv->init) {
+	if (unlikely(!priv->init)) {
 		priv->init = true;
 
 		priv->queue = malloc(EROFS_COMPR_QUEUE_SZ);
@@ -1185,6 +1199,16 @@ int z_erofs_mt_private_init(struct erofs_sb_info *sbi,
 				    sizeof(struct erofs_compress_cfg));
 		if (!priv->ccfg)
 			return -ENOMEM;
+
+#ifdef USE_PER_WORKER_TMPFILE
+#ifndef HAVE_TMPFILE64
+		priv->tmpfile = tmpfile();
+#else
+		priv->tmpfile = tmpfile64();
+#endif
+		if (!priv->tmpfile)
+			return -errno;
+#endif
 	}
 
 	lc = &priv->ccfg[alg_id];
@@ -1214,6 +1238,9 @@ void z_erofs_mt_private_fini(void *private)
 		free(priv->ccfg);
 		free(priv->destbuf);
 		free(priv->queue);
+#ifdef USE_PER_WORKER_TMPFILE
+		fclose(priv->tmpfile);
+#endif
 		priv->init = false;
 	}
 }
@@ -1237,24 +1264,30 @@ void z_erofs_mt_work(struct erofs_work *work)
 	ctx->queue = priv->queue;
 	ctx->destbuf = priv->destbuf;
 	ctx->chandle = &priv->ccfg[cwork->alg_id].handle;
-
+#ifdef USE_PER_WORKER_TMPFILE
+	ctx->tmpfile = priv->tmpfile;
+	ctx->tmpfile_off = ftell(ctx->tmpfile);
+	if (ctx->tmpfile_off == -1) {
+		ret = -errno;
+		goto out;
+	}
+#else
 #ifdef HAVE_TMPFILE64
 	ctx->tmpfile = tmpfile64();
 #else
 	ctx->tmpfile = tmpfile();
 #endif
-
 	if (!ctx->tmpfile) {
 		ret = -errno;
 		goto out;
 	}
+	ctx->tmpfile_off = 0;
+#endif
 
 	ret = z_erofs_compress_file(ctx, offset, blkaddr);
 	if (ret)
 		goto out;
 
-	fflush(ctx->tmpfile);
-
 	if (ctx->seg_idx == ctx->seg_num - 1)
 		ret = z_erofs_handle_fragments(ctx);
 
@@ -1274,6 +1307,7 @@ int z_erofs_mt_merge(struct erofs_compress_file *cfile, erofs_blk_t blkaddr,
 	struct erofs_sb_info *sbi = cur->ctx.inode->sbi;
 	struct z_erofs_write_index_ctx *ictx = cfile->ictx;
 	char *memblock = NULL;
+	size_t size = 0;
 	int ret = 0, lret;
 
 	while (cur != NULL) {
@@ -1290,28 +1324,32 @@ int z_erofs_mt_merge(struct erofs_compress_file *cfile, erofs_blk_t blkaddr,
 			goto out;
 		}
 
-		memblock = realloc(memblock,
-				   ctx->compressed_blocks * erofs_blksiz(sbi));
+		size = ctx->compressed_blocks * erofs_blksiz(sbi);
+		memblock = realloc(memblock, size);
 		if (!memblock) {
 			if (!ret)
 				ret = -ENOMEM;
 			goto out;
 		}
 
-		lret = fseek(ctx->tmpfile, 0, SEEK_SET);
-		if (lret) {
+		lret = pread(fileno(ctx->tmpfile), memblock, size,
+			     ctx->tmpfile_off);
+		if (lret != size) {
 			if (!ret)
-				ret = lret;
+				ret = -errno;
 			goto out;
 		}
 
-		lret = fread(memblock, erofs_blksiz(sbi),
-			     ctx->compressed_blocks, ctx->tmpfile);
-		if (lret != ctx->compressed_blocks) {
+#ifdef USE_PER_WORKER_TMPFILE
+		lret = fallocate(fileno(ctx->tmpfile),
+				 FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE,
+				 ctx->tmpfile_off, size);
+		if (lret) {
 			if (!ret)
-				ret = -EIO;
+				ret = -errno;
 			goto out;
 		}
+#endif
 
 		lret = blk_write(sbi, memblock, blkaddr + *compressed_blocks,
 				 ctx->compressed_blocks);
@@ -1323,7 +1361,9 @@ int z_erofs_mt_merge(struct erofs_compress_file *cfile, erofs_blk_t blkaddr,
 		*compressed_blocks += ctx->compressed_blocks;
 
 out:
+#ifndef USE_PER_WORKER_TMPFILE
 		fclose(ctx->tmpfile);
+#endif
 
 		tmp = cur->next;
 		pthread_mutex_lock(&work_mutex);
-- 
2.43.2


^ permalink raw reply related	[flat|nested] 11+ messages in thread

* Re: [PATCH v2 1/7] erofs-utils: introduce multi-threading framework
  2024-02-20  7:55 ` [PATCH v2 1/7] erofs-utils: introduce multi-threading framework Yifan Zhao
  2024-02-20  7:55   ` [PATCH v2 2/7] erofs-utils: add a helper to get available processors Yifan Zhao
@ 2024-02-22  2:37   ` Gao Xiang
  1 sibling, 0 replies; 11+ messages in thread
From: Gao Xiang @ 2024-02-22  2:37 UTC (permalink / raw
  To: Yifan Zhao; +Cc: linux-erofs



On 2024/2/20 15:55, Yifan Zhao wrote:
> Add a workqueue implementation for multi-threading support inspired by
> xfsprogs.
> 
> Signed-off-by: Yifan Zhao <zhaoyifan@sjtu.edu.cn>
> Suggested-by: Gao Xiang <hsiangkao@linux.alibaba.com>
> ---
>   configure.ac              |  16 +++++
>   include/erofs/internal.h  |   3 +
>   include/erofs/workqueue.h |  38 +++++++++++
>   lib/Makefile.am           |   4 ++
>   lib/workqueue.c           | 132 ++++++++++++++++++++++++++++++++++++++
>   5 files changed, 193 insertions(+)
>   create mode 100644 include/erofs/workqueue.h
>   create mode 100644 lib/workqueue.c
> 
> diff --git a/configure.ac b/configure.ac
> index bf6e99f..53306c3 100644
> --- a/configure.ac
> +++ b/configure.ac
> @@ -96,6 +96,14 @@ AC_DEFUN([EROFS_UTILS_PARSE_DIRECTORY],
>   
>   AC_ARG_VAR([MAX_BLOCK_SIZE], [The maximum block size which erofs-utils supports])
>   
> +AC_MSG_CHECKING([whether to enable multi-threading support])
> +AC_ARG_ENABLE([multithreading],
> +    AS_HELP_STRING([--enable-multithreading],
> +                   [enable multi-threading support @<:@default=no@:>@]),
> +    [enable_multithreading="$enableval"],
> +    [enable_multithreading="no"])
> +AC_MSG_RESULT([$enable_multithreading])
> +
>   AC_ARG_ENABLE([debug],
>       [AS_HELP_STRING([--enable-debug],
>                       [enable debugging mode @<:@default=no@:>@])],
> @@ -288,6 +296,13 @@ AS_IF([test "x$MAX_BLOCK_SIZE" = "x"], [
>                                [erofs_cv_max_block_size=4096]))
>   ], [erofs_cv_max_block_size=$MAX_BLOCK_SIZE])
>   
> +# Configure multi-threading support
> +AS_IF([test "x$enable_multithreading" != "xno"], [
> +    AC_CHECK_HEADERS([pthread.h])
> +    AC_CHECK_LIB([pthread], [pthread_mutex_lock], [], AC_MSG_ERROR([libpthread is required for multi-threaded build]))
> +    AC_DEFINE(EROFS_MT_ENABLED, 1, [Enable multi-threading support])
> +], [])
> +
>   # Configure debug mode
>   AS_IF([test "x$enable_debug" != "xno"], [], [
>     dnl Turn off all assert checking.
> @@ -467,6 +482,7 @@ AS_IF([test "x$enable_fuzzing" != "xyes"], [], [
>   AM_CONDITIONAL([ENABLE_FUZZING], [test "x${enable_fuzzing}" = "xyes"])
>   
>   # Set up needed symbols, conditionals and compiler/linker flags
> +AM_CONDITIONAL([ENABLE_EROFS_MT], [test "x${enable_multithreading}" != "xno"])
>   AM_CONDITIONAL([ENABLE_LZ4], [test "x${have_lz4}" = "xyes"])
>   AM_CONDITIONAL([ENABLE_LZ4HC], [test "x${have_lz4hc}" = "xyes"])
>   AM_CONDITIONAL([ENABLE_FUSE], [test "x${have_fuse}" = "xyes"])
> diff --git a/include/erofs/internal.h b/include/erofs/internal.h
> index 82797e1..954aef4 100644
> --- a/include/erofs/internal.h
> +++ b/include/erofs/internal.h
> @@ -22,6 +22,9 @@ typedef unsigned short umode_t;
>   #include <sys/types.h> /* for off_t definition */
>   #include <sys/stat.h> /* for S_ISCHR definition */
>   #include <stdio.h>
> +#ifdef HAVE_PTHREAD_H
> +#include <pthread.h>
> +#endif
>   
>   #ifndef PATH_MAX
>   #define PATH_MAX        4096    /* # chars in a path name including nul */
> diff --git a/include/erofs/workqueue.h b/include/erofs/workqueue.h
> new file mode 100644
> index 0000000..857947b
> --- /dev/null
> +++ b/include/erofs/workqueue.h
> @@ -0,0 +1,38 @@
> +/* SPDX-License-Identifier: GPL-2.0+ OR Apache-2.0 */
> +#ifndef __EROFS_WORKQUEUE_H
> +#define __EROFS_WORKQUEUE_H
> +
> +#include "internal.h"
> +
> +struct erofs_work;
> +
> +typedef void erofs_wq_func_t(struct erofs_work *);
> +typedef void erofs_wq_priv_fini_t(void *);
> +
> +struct erofs_work {
> +	void (*func)(struct erofs_work *work);
> +	struct erofs_work *next;
> +	void *priv;
> +};
> +
> +struct erofs_workqueue {
> +	struct erofs_work *head;
> +	struct erofs_work *tail;

I'd suggest
struct erofs_work *head, *tail;

since they serve the same purpose.

> +	pthread_mutex_t lock;
> +	pthread_cond_t cond_empty;
> +	pthread_cond_t cond_full;
> +	pthread_t *workers;
> +	unsigned int nworker;
> +	unsigned int max_jobs;
> +	unsigned int job_count;
> +	bool shutdown;
> +	size_t priv_size;
> +	erofs_wq_priv_fini_t *priv_fini;
> +};
> +
> +int erofs_workqueue_init(struct erofs_workqueue *wq, unsigned int nworker,
> +			 unsigned int max_jobs, size_t priv_size,
> +			 erofs_wq_priv_fini_t *priv_fini);
> +int erofs_workqueue_add(struct erofs_workqueue *wq, struct erofs_work *work);
> +int erofs_workqueue_shutdown(struct erofs_workqueue *wq);
> +#endif
> \ No newline at end of file
> diff --git a/lib/Makefile.am b/lib/Makefile.am
> index 54b9c9c..7307f7b 100644
> --- a/lib/Makefile.am
> +++ b/lib/Makefile.am
> @@ -53,3 +53,7 @@ liberofs_la_SOURCES += kite_deflate.c compressor_deflate.c
>   if ENABLE_LIBDEFLATE
>   liberofs_la_SOURCES += compressor_libdeflate.c
>   endif
> +if ENABLE_EROFS_MT
> +liberofs_la_CFLAGS += -lpthread
> +liberofs_la_SOURCES += workqueue.c
> +endif
> diff --git a/lib/workqueue.c b/lib/workqueue.c
> new file mode 100644
> index 0000000..3ec6142
> --- /dev/null
> +++ b/lib/workqueue.c
> @@ -0,0 +1,132 @@
> +// SPDX-License-Identifier: GPL-2.0+ OR Apache-2.0
> +#include <pthread.h>
> +#include <stdlib.h>
> +#include "erofs/workqueue.h"
> +
> +static void *worker_thread(void *arg)
> +{
> +	struct erofs_workqueue *wq = arg;
> +	struct erofs_work *work;
> +	void *priv = NULL;
> +
> +	if (wq->priv_size) {
> +		priv = calloc(wq->priv_size, 1);
> +		assert(priv);
> +	}
> +
> +	while (true) {
> +		pthread_mutex_lock(&wq->lock);
> +
> +		while (wq->job_count == 0 && !wq->shutdown)
> +			pthread_cond_wait(&wq->cond_empty, &wq->lock);
> +		if (wq->job_count == 0 && wq->shutdown) {
> +			pthread_mutex_unlock(&wq->lock);
> +			break;
> +		}
> +
> +		work = wq->head;
> +		wq->head = work->next;
> +		if (!wq->head)
> +			wq->tail = NULL;
> +		wq->job_count--;
> +
> +		if (wq->job_count == wq->max_jobs - 1)
> +			pthread_cond_broadcast(&wq->cond_full);
> +
> +		pthread_mutex_unlock(&wq->lock);
> +
> +		work->priv = priv;
> +		work->func(work);
> +	}
> +
> +	if (priv) {
> +		assert(wq->priv_fini);
> +		(wq->priv_fini)(priv);
> +		free(priv);
> +	}
> +
> +	return NULL;
> +}
> +
> +int erofs_workqueue_init(struct erofs_workqueue *wq, unsigned int nworker,
> +			 unsigned int max_jobs, size_t priv_size,
> +			 erofs_wq_priv_fini_t *priv_fini)

Let's match kernel workqueue naming...

erofs_alloc_workqueue...

> +{
> +	unsigned int i;
> +
> +	if (!wq || nworker <= 0 || max_jobs <= 0)
> +		return -EINVAL;
> +
> +	wq->head = wq->tail = NULL;
> +	wq->nworker = nworker;
> +	wq->max_jobs = max_jobs;
> +	wq->job_count = 0;
> +	wq->shutdown = false;
> +	wq->priv_size = priv_size;
> +	wq->priv_fini = priv_fini;
> +	pthread_mutex_init(&wq->lock, NULL);
> +	pthread_cond_init(&wq->cond_empty, NULL);
> +	pthread_cond_init(&wq->cond_full, NULL);
> +
> +	wq->workers = malloc(nworker * sizeof(pthread_t));
> +	if (!wq->workers)
> +		return -ENOMEM;
> +
> +	for (i = 0; i < nworker; i++) {
> +		if (pthread_create(&wq->workers[i], NULL, worker_thread, wq)) {
> +			while (i--)
> +				pthread_cancel(wq->workers[i]);
> +			free(wq->workers);
> +			return -ENOMEM;
> +		}
> +	}
> +
> +	return 0;
> +}
> +
> +int erofs_workqueue_add(struct erofs_workqueue *wq, struct erofs_work *work)

erofs_queue_work


> +{
> +	if (!wq || !work)
> +		return -EINVAL;
> +
> +	pthread_mutex_lock(&wq->lock);
> +
> +	while (wq->job_count == wq->max_jobs)
> +		pthread_cond_wait(&wq->cond_full, &wq->lock);
> +
> +	work->next = NULL;
> +	if (!wq->head)
> +		wq->head = work;
> +	else
> +		wq->tail->next = work;
> +	wq->tail = work;
> +	wq->job_count++;
> +
> +	pthread_cond_signal(&wq->cond_empty);
> +	pthread_mutex_unlock(&wq->lock);
> +
> +	return 0;
> +}
> +
> +int erofs_workqueue_shutdown(struct erofs_workqueue *wq)

erofs_destroy_workqueue

Thanks,
Gao Xiang

> +{
> +	unsigned int i;
> +
> +	if (!wq)
> +		return -EINVAL;
> +
> +	pthread_mutex_lock(&wq->lock);
> +	wq->shutdown = true;
> +	pthread_cond_broadcast(&wq->cond_empty);
> +	pthread_mutex_unlock(&wq->lock);
> +
> +	for (i = 0; i < wq->nworker; i++)
> +		pthread_join(wq->workers[i], NULL);
> +
> +	free(wq->workers);
> +	pthread_mutex_destroy(&wq->lock);
> +	pthread_cond_destroy(&wq->cond_empty);
> +	pthread_cond_destroy(&wq->cond_full);
> +
> +	return 0;
> +}
> \ No newline at end of file

^ permalink raw reply	[flat|nested] 11+ messages in thread

* Re: [PATCH v2 4/7] erofs-utils: mkfs: optionally print warning in erofs_compressor_init
  2024-02-20  7:55       ` [PATCH v2 4/7] erofs-utils: mkfs: optionally print warning in erofs_compressor_init Yifan Zhao
  2024-02-20  7:55         ` [PATCH v2 5/7] erofs-utils: mkfs: introduce inner-file multi-threaded compression Yifan Zhao
@ 2024-02-22  2:54         ` Gao Xiang
  1 sibling, 0 replies; 11+ messages in thread
From: Gao Xiang @ 2024-02-22  2:54 UTC (permalink / raw
  To: Yifan Zhao; +Cc: linux-erofs



On 2024/2/20 15:55, Yifan Zhao wrote:
> In the incoming multi-threaded compression support, compressor may be
> initialized more than once in different worker threads, resulting in
> noisy warning output. This patch makes sure that each warning message is
> printed only once by adding a print_warning option to the
> erofs_compressor_init() interface.
> 
> Signed-off-by: Yifan Zhao <zhaoyifan@sjtu.edu.cn>
> ---
>   lib/compress.c              | 3 ++-
>   lib/compressor.c            | 5 +++--
>   lib/compressor.h            | 5 +++--
>   lib/compressor_deflate.c    | 4 +++-
>   lib/compressor_libdeflate.c | 4 +++-
>   lib/compressor_liblzma.c    | 5 ++++-
>   lib/compressor_lz4.c        | 2 +-
>   lib/compressor_lz4hc.c      | 2 +-
>   8 files changed, 20 insertions(+), 10 deletions(-)
> 
> diff --git a/lib/compress.c b/lib/compress.c
> index 9611102..41cb6e5 100644
> --- a/lib/compress.c
> +++ b/lib/compress.c
> @@ -1213,7 +1213,8 @@ int z_erofs_compress_init(struct erofs_sb_info *sbi, struct erofs_buffer_head *s
>   
>   		ret = erofs_compressor_init(sbi, c, cfg.c_compr_opts[i].alg,
>   					    cfg.c_compr_opts[i].level,
> -					    cfg.c_compr_opts[i].dict_size);
> +					    cfg.c_compr_opts[i].dict_size,
> +					    true);
>   		if (ret)
>   			return ret;
>   
> diff --git a/lib/compressor.c b/lib/compressor.c
> index 4720e72..9b3794b 100644
> --- a/lib/compressor.c
> +++ b/lib/compressor.c
> @@ -78,7 +78,8 @@ int erofs_compress_destsize(const struct erofs_compress *c,
>   }
>   
>   int erofs_compressor_init(struct erofs_sb_info *sbi, struct erofs_compress *c,
> -			  char *alg_name, int compression_level, u32 dict_size)
> +			  char *alg_name, int compression_level, u32 dict_size,
> +			  bool print_warning)

No need to add another variable just for this.

>   {
>   	int ret, i;
>   
> @@ -126,7 +127,7 @@ int erofs_compressor_init(struct erofs_sb_info *sbi, struct erofs_compress *c,
>   			return -EINVAL;
>   		}
>   
> -		ret = erofs_algs[i].c->init(c);
> +		ret = erofs_algs[i].c->init(c, print_warning);
>   		if (ret)
>   			return ret;
>   
> diff --git a/lib/compressor.h b/lib/compressor.h
> index d8ccf2e..522fde0 100644
> --- a/lib/compressor.h
> +++ b/lib/compressor.h
> @@ -17,7 +17,7 @@ struct erofs_compressor {
>   	u32 default_dictsize;
>   	u32 max_dictsize;
>   
> -	int (*init)(struct erofs_compress *c);
> +	int (*init)(struct erofs_compress *c, bool print_warning);
>   	int (*exit)(struct erofs_compress *c);
>   	int (*setlevel)(struct erofs_compress *c, int compression_level);
>   	int (*setdictsize)(struct erofs_compress *c, u32 dict_size);
> @@ -60,7 +60,8 @@ int erofs_compress_destsize(const struct erofs_compress *c,
>   			    void *dst, unsigned int dstsize);
>   
>   int erofs_compressor_init(struct erofs_sb_info *sbi, struct erofs_compress *c,
> -			  char *alg_name, int compression_level, u32 dict_size);
> +			  char *alg_name, int compression_level, u32 dict_size,
> +			  bool print_warning);
>   int erofs_compressor_exit(struct erofs_compress *c);
>   
>   #endif
> diff --git a/lib/compressor_deflate.c b/lib/compressor_deflate.c
> index 8629415..9fe067f 100644
> --- a/lib/compressor_deflate.c
> +++ b/lib/compressor_deflate.c
> @@ -34,7 +34,7 @@ static int compressor_deflate_exit(struct erofs_compress *c)
>   	return 0;
>   }
>   
> -static int compressor_deflate_init(struct erofs_compress *c)
> +static int compressor_deflate_init(struct erofs_compress *c, bool print_warning)
>   {

I'd suggest just using a static atomic variable "workers" here, see:

https://gcc.gnu.org/onlinedocs/gcc/_005f_005fatomic-Builtins.html

>   	if (c->private_data) {
>   		kite_deflate_end(c->private_data);
> @@ -44,9 +44,11 @@ static int compressor_deflate_init(struct erofs_compress *c)
>   	if (IS_ERR_VALUE(c->private_data))
>   		return PTR_ERR(c->private_data);
>   
> +	if (print_warning) {

and if (__atomic_add_fetch(1) == 1), print warning:

>   		erofs_warn("EXPERIMENTAL DEFLATE algorithm in use. Use at your own risk!");
>   		erofs_warn("*Carefully* check filesystem data correctness to avoid corruption!");
>   		erofs_warn("Please send a report to <linux-erofs@lists.ozlabs.org> if something is wrong.");
> +	}
>   	return 0;
>   }
>   
> diff --git a/lib/compressor_libdeflate.c b/lib/compressor_libdeflate.c
> index 62d93f7..0583868 100644
> --- a/lib/compressor_libdeflate.c
> +++ b/lib/compressor_libdeflate.c
> @@ -80,13 +80,15 @@ static int compressor_libdeflate_exit(struct erofs_compress *c)
>   	return 0;
>   }
>   
> -static int compressor_libdeflate_init(struct erofs_compress *c)
> +static int compressor_libdeflate_init(struct erofs_compress *c,
> +				      bool print_warning)
>   {
>   	libdeflate_free_compressor(c->private_data);
>   	c->private_data = libdeflate_alloc_compressor(c->compression_level);
>   	if (!c->private_data)
>   		return -ENOMEM;
>   
> +	if (print_warning)

same here.

>   		erofs_warn("EXPERIMENTAL libdeflate compressor in use. Use at your own risk!");
>   	return 0;
>   }
> diff --git a/lib/compressor_liblzma.c b/lib/compressor_liblzma.c
> index 7183b0b..b048e57 100644
> --- a/lib/compressor_liblzma.c
> +++ b/lib/compressor_liblzma.c
> @@ -81,7 +81,8 @@ static int erofs_compressor_liblzma_setdictsize(struct erofs_compress *c,
>   	return 0;
>   }
>   
> -static int erofs_compressor_liblzma_init(struct erofs_compress *c)
> +static int erofs_compressor_liblzma_init(struct erofs_compress *c,
> +					 bool print_warning)
>   {
>   	struct erofs_liblzma_context *ctx;
>   	u32 preset;
> @@ -103,8 +104,10 @@ static int erofs_compressor_liblzma_init(struct erofs_compress *c)
>   	ctx->opt.dict_size = c->dict_size;
>   
>   	c->private_data = ctx;
> +	if (print_warning) {

same here.

Thanks,
Gao Xiang

>   		erofs_warn("EXPERIMENTAL MicroLZMA feature in use. Use at your own risk!");
>   		erofs_warn("Note that it may take more time since the compressor is still single-threaded for now.");
> +	}
>   	return 0;
>   }
>   
> diff --git a/lib/compressor_lz4.c b/lib/compressor_lz4.c
> index f4e72c3..6aed213 100644
> --- a/lib/compressor_lz4.c
> +++ b/lib/compressor_lz4.c
> @@ -30,7 +30,7 @@ static int compressor_lz4_exit(struct erofs_compress *c)
>   	return 0;
>   }
>   
> -static int compressor_lz4_init(struct erofs_compress *c)
> +static int compressor_lz4_init(struct erofs_compress *c, bool print_warning)
>   {
>   	c->sbi->lz4_max_distance = LZ4_DISTANCE_MAX;
>   	return 0;
> diff --git a/lib/compressor_lz4hc.c b/lib/compressor_lz4hc.c
> index 6fc8847..3d10aa8 100644
> --- a/lib/compressor_lz4hc.c
> +++ b/lib/compressor_lz4hc.c
> @@ -37,7 +37,7 @@ static int compressor_lz4hc_exit(struct erofs_compress *c)
>   	return 0;
>   }
>   
> -static int compressor_lz4hc_init(struct erofs_compress *c)
> +static int compressor_lz4hc_init(struct erofs_compress *c, bool print_warning)
>   {
>   	c->private_data = LZ4_createStreamHC();
>   	if (!c->private_data)

^ permalink raw reply	[flat|nested] 11+ messages in thread

* Re: [PATCH v2 3/7] erofs-utils: mkfs: add --worker=# parameter
  2024-02-20  7:55     ` [PATCH v2 3/7] erofs-utils: mkfs: add --worker=# parameter Yifan Zhao
  2024-02-20  7:55       ` [PATCH v2 4/7] erofs-utils: mkfs: optionally print warning in erofs_compressor_init Yifan Zhao
@ 2024-02-22 16:40       ` Gao Xiang
  1 sibling, 0 replies; 11+ messages in thread
From: Gao Xiang @ 2024-02-22 16:40 UTC (permalink / raw
  To: Yifan Zhao; +Cc: linux-erofs



On 2024/2/20 15:55, Yifan Zhao wrote:
> This patch introduces a --worker=# parameter for the incoming
> multi-threaded compression support. It also introduces a segment size
> used in multi-threaded compression, which has the default value 16MB
> and cannot be modified.

It also introduces a concept called `segment size` to split large files
for multi-threading, which has the default value 16MB for now.

> 
> Signed-off-by: Yifan Zhao <zhaoyifan@sjtu.edu.cn>
> ---
>   include/erofs/config.h |  4 ++++
>   lib/config.c           |  4 ++++
>   mkfs/main.c            | 38 ++++++++++++++++++++++++++++++++++++++
>   3 files changed, 46 insertions(+)
> 
> diff --git a/include/erofs/config.h b/include/erofs/config.h
> index 73e3ac2..d19094e 100644
> --- a/include/erofs/config.h
> +++ b/include/erofs/config.h
> @@ -75,6 +75,10 @@ struct erofs_configure {
>   	char c_force_chunkformat;
>   	/* < 0, xattr disabled and INT_MAX, always use inline xattrs */
>   	int c_inline_xattr_tolerance;
> +#ifdef EROFS_MT_ENABLED
> +	u64 c_mt_segment_size;


I think the mt_ prefix is not needed: c_segment_size;


> +	u32 c_mt_worker_num;
c_mt_workers;

> +#endif
>   
>   	u32 c_pclusterblks_max, c_pclusterblks_def, c_pclusterblks_packed;
>   	u32 c_max_decompressed_extent_bytes;
> diff --git a/lib/config.c b/lib/config.c
> index 947a183..8add06d 100644
> --- a/lib/config.c
> +++ b/lib/config.c
> @@ -38,6 +38,10 @@ void erofs_init_configure(void)
>   	cfg.c_pclusterblks_max = 1;
>   	cfg.c_pclusterblks_def = 1;
>   	cfg.c_max_decompressed_extent_bytes = -1;
> +#ifdef EROFS_MT_ENABLED
> +	cfg.c_mt_segment_size = 16ULL * 1024 * 1024;
> +	cfg.c_mt_worker_num = 1;
> +#endif
>   
>   	erofs_stdout_tty = isatty(STDOUT_FILENO);
>   }
> diff --git a/mkfs/main.c b/mkfs/main.c
> index 7aea64a..3882533 100644
> --- a/mkfs/main.c
> +++ b/mkfs/main.c
> @@ -73,6 +73,9 @@ static struct option long_options[] = {
>   	{"gzip", no_argument, NULL, 517},
>   #endif
>   	{"offset", required_argument, NULL, 518},
> +#ifdef EROFS_MT_ENABLED
> +	{"worker", required_argument, NULL, 519},

Let's use `--workers=#` instead of `--worker=#`.

> +#endif
>   	{0, 0, 0, 0},
>   };
>   
> @@ -175,6 +178,9 @@ static void usage(int argc, char **argv)
>   		" --product-out=X       X=product_out directory\n"
>   		" --fs-config-file=X    X=fs_config file\n"
>   		" --block-list-file=X   X=block_list file\n"
> +#endif
> +#ifdef EROFS_MT_ENABLED
> +		" --worker=#            set the number of worker threads to # (default=1)\n"

--workers=#

Thanks,
Gao Xiang

^ permalink raw reply	[flat|nested] 11+ messages in thread

end of thread, other threads:[~2024-02-22 16:40 UTC | newest]

Thread overview: 11+ messages (download: mbox.gz follow: Atom feed
-- links below jump to the message on this page --
2024-02-20  7:55 [PATCH v2 0/7] erofs-utils: mkfs: introduce multi-threaded compression Yifan Zhao
2024-02-20  7:55 ` [PATCH v2 1/7] erofs-utils: introduce multi-threading framework Yifan Zhao
2024-02-20  7:55   ` [PATCH v2 2/7] erofs-utils: add a helper to get available processors Yifan Zhao
2024-02-20  7:55     ` [PATCH v2 3/7] erofs-utils: mkfs: add --worker=# parameter Yifan Zhao
2024-02-20  7:55       ` [PATCH v2 4/7] erofs-utils: mkfs: optionally print warning in erofs_compressor_init Yifan Zhao
2024-02-20  7:55         ` [PATCH v2 5/7] erofs-utils: mkfs: introduce inner-file multi-threaded compression Yifan Zhao
2024-02-20  7:55           ` [PATCH v2 6/7] erofs-utils: mkfs: introduce inter-file " Yifan Zhao
2024-02-20  7:55             ` [PATCH v2 7/7] erofs-utils: mkfs: use per-worker tmpfile for multi-threaded mkfs Yifan Zhao
2024-02-22  2:54         ` [PATCH v2 4/7] erofs-utils: mkfs: optionally print warning in erofs_compressor_init Gao Xiang
2024-02-22 16:40       ` [PATCH v2 3/7] erofs-utils: mkfs: add --worker=# parameter Gao Xiang
2024-02-22  2:37   ` [PATCH v2 1/7] erofs-utils: introduce multi-threading framework Gao Xiang

This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.