dumping ground for random patches and texts
 help / color / mirror / Atom feed
* [PATCH] wtf
@ 2015-12-29 10:35 Eric Wong
  0 siblings, 0 replies; only message in thread
From: Eric Wong @ 2015-12-29 10:35 UTC (permalink / raw)
  To: spew

---
 thread_pthread.c | 56 ++++++++++++++++++++++++++++++++++----------------------
 1 file changed, 34 insertions(+), 22 deletions(-)

diff --git a/thread_pthread.c b/thread_pthread.c
index 2a15816..3954eb6 100644
--- a/thread_pthread.c
+++ b/thread_pthread.c
@@ -1241,12 +1241,12 @@ static struct {
     int normal[2];
     int low[2];
 
-    /* volatile for signal handler use: */
-    volatile rb_pid_t owner_process;
+    /* atomic for both signal handler and thread-safety */
+    rb_atomic_t owner_process;
     rb_atomic_t writing;
 } timer_thread_pipe = {
-    {-1, -1},
-    {-1, -1}, /* low priority */
+    {-99, -99},
+    {-99, -99}, /* low priority */
 };
 
 NORETURN(static void async_bug_fd(const char *mesg, int errno_arg, int fd));
@@ -1261,15 +1261,20 @@ async_bug_fd(const char *mesg, int errno_arg, int fd)
     rb_async_bug_errno(buff, errno_arg);
 }
 
+static int timer_thread_owner_p(void)
+{
+    return ATOMIC_OR(timer_thread_pipe.owner_process, 0) ==
+	    (rb_atomic_t)getpid();
+}
+
 /* only use signal-safe system calls here */
 static void
-rb_thread_wakeup_timer_thread_fd(volatile int *fdp)
+rb_thread_wakeup_timer_thread_fd(int fd)
 {
     ssize_t result;
-    int fd = *fdp; /* access fdp exactly once here and do not reread fdp */
 
     /* already opened */
-    if (fd >= 0 && timer_thread_pipe.owner_process == getpid()) {
+    if (fd >= 0 && timer_thread_owner_p()) {
 	static const char buff[1] = {'!'};
       retry:
 	if ((result = write(fd, buff, 1)) <= 0) {
@@ -1297,7 +1302,7 @@ rb_thread_wakeup_timer_thread(void)
 {
     /* must be safe inside sighandler, so no mutex */
     ATOMIC_INC(timer_thread_pipe.writing);
-    rb_thread_wakeup_timer_thread_fd(&timer_thread_pipe.normal[1]);
+    rb_thread_wakeup_timer_thread_fd(ATOMIC_OR(timer_thread_pipe.normal[1], 0));
     ATOMIC_DEC(timer_thread_pipe.writing);
 }
 
@@ -1305,7 +1310,7 @@ static void
 rb_thread_wakeup_timer_thread_low(void)
 {
     ATOMIC_INC(timer_thread_pipe.writing);
-    rb_thread_wakeup_timer_thread_fd(&timer_thread_pipe.low[1]);
+    rb_thread_wakeup_timer_thread_fd(ATOMIC_OR(timer_thread_pipe.low[1], 0));
     ATOMIC_DEC(timer_thread_pipe.writing);
 }
 
@@ -1390,11 +1395,11 @@ setup_communication_pipe_internal(int pipes[2])
 static int
 setup_communication_pipe(void)
 {
-    VM_ASSERT(timer_thread_pipe.owner_process == 0);
-    VM_ASSERT(timer_thread_pipe.normal[0] == -1);
-    VM_ASSERT(timer_thread_pipe.normal[1] == -1);
-    VM_ASSERT(timer_thread_pipe.low[0] == -1);
-    VM_ASSERT(timer_thread_pipe.low[1] == -1);
+    VM_ASSERT(ATOMIC_OR(timer_thread_pipe.owner_process, 0) == 0);
+    VM_ASSERT(timer_thread_pipe.normal[0] < 0);
+    VM_ASSERT(timer_thread_pipe.normal[1] < 0);
+    VM_ASSERT(timer_thread_pipe.low[0] < 0);
+    VM_ASSERT(timer_thread_pipe.low[1] < 0);
 
     if (setup_communication_pipe_internal(timer_thread_pipe.normal) < 0) {
 	return errno;
@@ -1407,7 +1412,7 @@ setup_communication_pipe(void)
     }
 
     /* validate pipe on this process */
-    timer_thread_pipe.owner_process = getpid();
+    ATOMIC_SET(timer_thread_pipe.owner_process, (rb_atomic_t)getpid());
     return 0;
 }
 
@@ -1631,8 +1636,16 @@ native_stop_timer_thread(void)
     if (TT_DEBUG) fprintf(stderr, "stop timer thread\n");
 #if USE_SLEEPY_TIMER_THREAD
     if (stopped) {
+	rb_atomic_t cur = (rb_atomic_t)getpid();
+	int normal_fd, low_fd;
+
 	/* prevent wakeups from signal handler ASAP */
-	timer_thread_pipe.owner_process = 0;
+	if (ATOMIC_CAS(timer_thread_pipe.owner_process, cur, 0) != cur) {
+	    rb_async_bug_errno("stopping timer thread from wrong process", 0);
+	}
+
+	normal_fd = ATOMIC_EXCHANGE(timer_thread_pipe.normal[1], -2);
+	low_fd = ATOMIC_EXCHANGE(timer_thread_pipe.low[1], -2);
 
 	/*
 	 * however, the above was not enough: the FD may already be
@@ -1643,16 +1656,15 @@ native_stop_timer_thread(void)
 	    native_thread_yield();
 	}
 
-	/* stop writing ends of pipes so timer thread notices EOF */
-	CLOSE_INVALIDATE(normal[1]);
-	CLOSE_INVALIDATE(low[1]);
+	close_invalidate(&normal_fd, "close_invalidate: normal_fd");
+	close_invalidate(&low_fd, "close_invalidate: low_fd");
 
 	/* timer thread will stop looping when system_working <= 0: */
 	native_thread_join(timer_thread.id);
 
 	/* timer thread will close the read end on exit: */
-	VM_ASSERT(timer_thread_pipe.normal[0] == -1);
-	VM_ASSERT(timer_thread_pipe.low[0] == -1);
+	VM_ASSERT(timer_thread_pipe.normal[0] < 0);
+	VM_ASSERT(timer_thread_pipe.low[0] < 0);
 
 	if (TT_DEBUG) fprintf(stderr, "joined timer thread\n");
 	timer_thread.created = 0;
@@ -1719,7 +1731,7 @@ rb_reserved_fd_p(int fd)
 	 fd == timer_thread_pipe.normal[1] ||
 	 fd == timer_thread_pipe.low[0] ||
 	 fd == timer_thread_pipe.low[1]) &&
-	timer_thread_pipe.owner_process == getpid()) { /* async-signal-safe */
+	timer_thread_owner_p()) { /* async-signal-safe */
 	return 1;
     }
     else {
-- 
EW


^ permalink raw reply related	[flat|nested] only message in thread

only message in thread, other threads:[~2015-12-29 10:35 UTC | newest]

Thread overview: (only message) (download: mbox.gz follow: Atom feed
-- links below jump to the message on this page --
2015-12-29 10:35 [PATCH] wtf Eric Wong

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).