From mboxrd@z Thu Jan 1 00:00:00 1970
Return-Path: 
X-Spam-Checker-Version: SpamAssassin 3.4.1 (2015-04-28) on dcvr.yhbt.net
X-Spam-Level: 
X-Spam-ASN: AS198093 171.25.193.0/24
X-Spam-Status: No, score=-2.6 required=3.0 tests=BAYES_00,RCVD_IN_XBL,SPF_FAIL,
	SPF_HELO_FAIL,TO_EQ_FM_DOM_SPF_FAIL shortcircuit=no autolearn=no
	autolearn_force=no version=3.4.1
Received: from 80x24.org (tor-exit3-readme.dfri.se [171.25.193.235])
	by dcvr.yhbt.net (Postfix) with ESMTP id 6EE081F404
	for ; Sat, 1 Sep 2018 13:10:15 +0000 (UTC)
From: Eric Wong 
To: spew@80x24.org
Subject: [PATCH 1/2] unify sync_waiter, waitpid_state, waiting_fd w/ rb_sched_waiter
Date: Sat, 1 Sep 2018 13:10:11 +0000
Message-Id: <20180901131012.22138-1-e@80x24.org>
List-Id: 

This will make upcoming "auto-fiber" changes easier to review, since
Process.wait and Queue/SizedQueue are expected to become scheduling
points.
---
 process.c     | 92 ++++++++++++++++++++++++++-------------------------
 thread.c      | 29 ++++++++--------
 thread_sync.c | 72 ++++++++++++++++++++--------------------
 vm_core.h     |  5 +++
 4 files changed, 103 insertions(+), 95 deletions(-)

diff --git a/process.c b/process.c
index 94060de148..70934e0841 100644
--- a/process.c
+++ b/process.c
@@ -916,8 +916,7 @@ do_waitpid(rb_pid_t pid, int *st, int flags)
 }
 
 struct waitpid_state {
-    struct list_node wnode;
-    rb_execution_context_t *ec;
+    struct rb_sched_waiter sw;
     rb_nativethread_cond_t *cond;
     rb_pid_t ret;
     rb_pid_t pid;
@@ -936,13 +935,15 @@ void rb_sigwait_fd_put(const rb_thread_t *, int fd);
 void rb_thread_sleep_interruptible(void);
 
 static int
-waitpid_signal(struct waitpid_state *w)
+waitpid_signal(struct rb_sched_waiter *sw)
 {
-    if (w->ec) { /* rb_waitpid */
-        rb_threadptr_interrupt(rb_ec_thread_ptr(w->ec));
+    if (sw->ec) { /* rb_waitpid */
+        rb_threadptr_interrupt(rb_ec_thread_ptr(sw->ec));
         return TRUE;
     }
     else { /* ruby_waitpid_locked */
+        struct waitpid_state *w = container_of(sw, struct waitpid_state, sw);
+
         if (w->cond) {
             rb_native_cond_signal(w->cond);
             return TRUE;
@@ -959,13 +960,13 @@ static void
 sigwait_fd_migrate_sleeper(rb_vm_t *vm)
 {
-    struct waitpid_state *w = 0;
+    struct rb_sched_waiter *sw = 0;
 
-    list_for_each(&vm->waiting_pids, w, wnode) {
-        if (waitpid_signal(w)) return;
+    list_for_each(&vm->waiting_pids, sw, wnode) {
+        if (waitpid_signal(sw)) return;
     }
-    list_for_each(&vm->waiting_grps, w, wnode) {
-        if (waitpid_signal(w)) return;
+    list_for_each(&vm->waiting_grps, sw, wnode) {
+        if (waitpid_signal(sw)) return;
     }
 }
 
@@ -983,17 +984,18 @@ extern volatile unsigned int ruby_nocldwait; /* signal.c */
 static void
 waitpid_each(struct list_head *head)
 {
-    struct waitpid_state *w = 0, *next;
-    list_for_each_safe(head, w, next, wnode) {
+    struct rb_sched_waiter *sw = 0, *next;
+    list_for_each_safe(head, sw, next, wnode) {
+        struct waitpid_state *w = container_of(sw, struct waitpid_state, sw);
         rb_pid_t ret = do_waitpid(w->pid, &w->status, w->options | WNOHANG);
 
         if (!ret) continue;
         if (ret == -1) w->errnum = errno;
 
         w->ret = ret;
-        list_del_init(&w->wnode);
-        waitpid_signal(w);
+        list_del_init(&sw->wnode);
+        waitpid_signal(sw);
     }
 }
 #else
@@ -1037,6 +1039,20 @@ sigwait_sleep_time(void)
     return 0;
 }
 
+static struct list_head *
+waitpid_sleep_prepare(struct waitpid_state *w, rb_vm_t *vm)
+{
+    if (w->pid > 0 || list_empty(&vm->waiting_pids))
+        w->ret = do_waitpid(w->pid, &w->status, w->options | WNOHANG);
+
+    if (w->ret || (w->options & WNOHANG)) {
+        if (w->ret == -1) w->errnum = errno;
+        return 0; /* no need to sleep */
+    }
+
+    return w->pid > 0 ? &vm->waiting_pids : &vm->waiting_grps;
+}
+
 /*
  * must be called with vm->waitpid_lock held, this is not interruptible
  */
@@ -1045,20 +1061,17 @@ ruby_waitpid_locked(rb_vm_t *vm, rb_pid_t pid, int *status, int options,
                     rb_nativethread_cond_t *cond)
 {
     struct waitpid_state w;
+    struct list_head *head;
 
     assert(!ruby_thread_has_gvl_p() && "must not have GVL");
 
     waitpid_state_init(&w, pid, options);
-    if (w.pid > 0 || list_empty(&vm->waiting_pids))
-        w.ret = do_waitpid(w.pid, &w.status, w.options | WNOHANG);
-    if (w.ret) {
-        if (w.ret == -1) w.errnum = errno;
-    }
-    else {
+    head = waitpid_sleep_prepare(&w, vm);
+    if (head) {
         int sigwait_fd = -1;
 
-        w.ec = 0;
-        list_add(w.pid > 0 ? &vm->waiting_pids : &vm->waiting_grps, &w.wnode);
+        w.sw.ec = 0;
+        list_add(head, &w.sw.wnode);
         do {
             if (sigwait_fd < 0)
                 sigwait_fd = rb_sigwait_fd_get(0);
@@ -1074,7 +1087,7 @@ ruby_waitpid_locked(rb_vm_t *vm, rb_pid_t pid, int *status, int options,
                 rb_native_cond_wait(w.cond, &vm->waitpid_lock);
             }
         } while (!w.ret);
-        list_del(&w.wnode);
+        list_del(&w.sw.wnode);
 
         /* we're done, maybe other waitpid callers are not: */
         if (sigwait_fd >= 0) {
@@ -1107,10 +1120,10 @@ waitpid_cleanup(VALUE x)
     struct waitpid_state *w = (struct waitpid_state *)x;
 
     if (w->ret == 0) {
-        rb_vm_t *vm = rb_ec_vm_ptr(w->ec);
+        rb_vm_t *vm = rb_ec_vm_ptr(w->sw.ec);
 
         rb_native_mutex_lock(&vm->waitpid_lock);
-        list_del(&w->wnode);
+        list_del(&w->sw.wnode);
         rb_native_mutex_unlock(&vm->waitpid_lock);
     }
@@ -1120,8 +1133,8 @@ static void
 waitpid_wait(struct waitpid_state *w)
 {
-    rb_vm_t *vm = rb_ec_vm_ptr(w->ec);
-    int need_sleep = FALSE;
+    rb_vm_t *vm = rb_ec_vm_ptr(w->sw.ec);
+    struct list_head *head;
 
     /*
      * Lock here to prevent do_waitpid from stealing work from the
@@ -1129,27 +1142,15 @@ waitpid_wait(struct waitpid_state *w)
      * outside of GVL
      */
     rb_native_mutex_lock(&vm->waitpid_lock);
-
-    if (w->pid > 0 || list_empty(&vm->waiting_pids))
-        w->ret = do_waitpid(w->pid, &w->status, w->options | WNOHANG);
-    if (w->ret) {
-        if (w->ret == -1) w->errnum = errno;
-    }
-    else if (w->options & WNOHANG) {
-    }
-    else {
-        need_sleep = TRUE;
-    }
-
-    if (need_sleep) {
+    head = waitpid_sleep_prepare(w, vm);
+    if (head) {
         w->cond = 0;
         /* order matters, favor specified PIDs rather than -1 or 0 */
-        list_add(w->pid > 0 ? &vm->waiting_pids : &vm->waiting_grps, &w->wnode);
+        list_add(head, &w->sw.wnode);
     }
-
     rb_native_mutex_unlock(&vm->waitpid_lock);
 
-    if (need_sleep) {
+    if (head) {
         rb_ensure(waitpid_sleep, (VALUE)w, waitpid_cleanup, (VALUE)w);
     }
 }
@@ -1174,7 +1175,8 @@ waitpid_no_SIGCHLD(struct waitpid_state *w)
         do {
             rb_thread_call_without_gvl(waitpid_blocking_no_SIGCHLD, w,
                                        RUBY_UBF_PROCESS, 0);
-        } while (w->ret < 0 && errno == EINTR && (RUBY_VM_CHECK_INTS(w->ec),1));
+        } while (w->ret < 0 && errno == EINTR &&
+                 (RUBY_VM_CHECK_INTS(w->sw.ec),1));
     }
     if (w->ret == -1) w->errnum = errno;
@@ -1186,7 +1188,7 @@ rb_waitpid(rb_pid_t pid, int *st, int flags)
     struct waitpid_state w;
 
     waitpid_state_init(&w, pid, flags);
-    w.ec = GET_EC();
+    w.sw.ec = GET_EC();
 
     if (WAITPID_USE_SIGCHLD) {
         waitpid_wait(&w);
diff --git a/thread.c b/thread.c
index e40b120f72..30c37835ca 100644
--- a/thread.c
+++ b/thread.c
@@ -116,8 +116,7 @@ void rb_sigwait_fd_migrate(rb_vm_t *); /* process.c */
 
 static volatile int system_working = 1;
 
 struct waiting_fd {
-    struct list_node wfd_node; /* <=> vm.waiting_fds */
-    rb_thread_t *th;
+    struct rb_sched_waiter sw; /* .wnode <=> vm->waiting_fds */
     int fd;
 };
@@ -1511,21 +1510,23 @@ VALUE
 rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd)
 {
     volatile VALUE val = Qundef; /* shouldn't be used */
-    rb_execution_context_t * volatile ec = GET_EC();
+    rb_execution_context_t * volatile ec;
     volatile int saved_errno = 0;
     enum ruby_tag_type state;
     struct waiting_fd wfd;
 
     wfd.fd = fd;
-    wfd.th = rb_ec_thread_ptr(ec);
-    list_add(&rb_ec_vm_ptr(ec)->waiting_fds, &wfd.wfd_node);
+    wfd.sw.ec = ec = GET_EC();
+    list_add(&rb_ec_vm_ptr(wfd.sw.ec)->waiting_fds, &wfd.sw.wnode);
 
     EC_PUSH_TAG(ec);
     if ((state = EC_EXEC_TAG()) == TAG_NONE) {
-        BLOCKING_REGION(wfd.th, {
+        rb_thread_t *th = rb_ec_thread_ptr(wfd.sw.ec);
+
+        BLOCKING_REGION(th, {
             val = func(data1);
             saved_errno = errno;
-        }, ubf_select, wfd.th, FALSE);
+        }, ubf_select, th, FALSE);
     }
     EC_POP_TAG();
@@ -1533,7 +1534,7 @@ rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd)
      * must be deleted before jump
      * this will delete either from waiting_fds or on-stack LIST_HEAD(busy)
      */
-    list_del(&wfd.wfd_node);
+    list_del(&wfd.sw.wnode);
 
     if (state) {
         EC_JUMP_TAG(ec, state);
@@ -2279,15 +2280,17 @@ int
 rb_notify_fd_close(int fd, struct list_head *busy)
 {
     rb_vm_t *vm = GET_THREAD()->vm;
-    struct waiting_fd *wfd = 0, *next;
+    struct rb_sched_waiter *sw = 0, *next;
+
+    list_for_each_safe(&vm->waiting_fds, sw, next, wnode) {
+        struct waiting_fd *wfd = container_of(sw, struct waiting_fd, sw);
 
-    list_for_each_safe(&vm->waiting_fds, wfd, next, wfd_node) {
         if (wfd->fd == fd) {
-            rb_thread_t *th = wfd->th;
+            rb_thread_t *th = rb_ec_thread_ptr(sw->ec);
             VALUE err;
 
-            list_del(&wfd->wfd_node);
-            list_add(busy, &wfd->wfd_node);
+            list_del(&sw->wnode);
+            list_add(busy, &sw->wnode);
             err = th->vm->special_exceptions[ruby_error_stream_closed];
             rb_threadptr_pending_interrupt_enque(th, err);
diff --git a/thread_sync.c b/thread_sync.c
index 5244d9f3c9..01ad8e096b 100644
--- a/thread_sync.c
+++ b/thread_sync.c
@@ -4,26 +4,22 @@
 static VALUE rb_cMutex, rb_cQueue, rb_cSizedQueue, rb_cConditionVariable;
 static VALUE rb_eClosedQueueError;
 
-/* sync_waiter is always on-stack */
-struct sync_waiter {
-    rb_thread_t *th;
-    struct list_node node;
-};
-
 #define MUTEX_ALLOW_TRAP FL_USER1
 
 static void
 sync_wakeup(struct list_head *head, long max)
 {
-    struct sync_waiter *cur = 0, *next;
+    struct rb_sched_waiter *cur = 0, *next;
 
-    list_for_each_safe(head, cur, next, node) {
-        list_del_init(&cur->node);
-        if (cur->th->status != THREAD_KILLED) {
-            rb_threadptr_interrupt(cur->th);
-            cur->th->status = THREAD_RUNNABLE;
-            if (--max == 0) return;
-        }
+    list_for_each_safe(head, cur, next, wnode) {
+        rb_thread_t *th = rb_ec_thread_ptr(cur->ec);
+
+        list_del_init(&cur->wnode);
+        if (th->status != THREAD_KILLED) {
+            rb_threadptr_interrupt(th);
+            th->status = THREAD_RUNNABLE;
+            if (--max == 0) return;
+        }
     }
 }
@@ -83,10 +79,10 @@ static const char* rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t *th);
 static size_t
 rb_mutex_num_waiting(rb_mutex_t *mutex)
 {
-    struct sync_waiter *w = 0;
+    struct rb_sched_waiter *w = 0;
     size_t n = 0;
 
-    list_for_each(&mutex->waitq, w, node) {
+    list_for_each(&mutex->waitq, w, wnode) {
         n++;
     }
 
     return n;
@@ -235,13 +231,13 @@ do_mutex_lock(VALUE self, int interruptible_p)
     }
 
     if (rb_mutex_trylock(self) == Qfalse) {
-        struct sync_waiter w;
+        struct rb_sched_waiter w;
 
         if (mutex->th == th) {
             rb_raise(rb_eThreadError, "deadlock; recursive locking");
         }
 
-        w.th = th;
+        w.ec = th->ec;
 
         while (mutex->th != th) {
             enum rb_thread_status prev_status = th->status;
@@ -262,9 +258,9 @@ do_mutex_lock(VALUE self, int interruptible_p)
                 patrol_thread = th;
             }
 
-            list_add_tail(&mutex->waitq, &w.node);
+            list_add_tail(&mutex->waitq, &w.wnode);
             native_sleep(th, timeout); /* release GVL */
-            list_del(&w.node);
+            list_del(&w.wnode);
 
             if (!mutex->th) {
                 mutex->th = th;
@@ -345,16 +341,18 @@ rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t *th)
         err = "Attempt to unlock a mutex which is locked by another thread";
     }
     else {
-        struct sync_waiter *cur = 0, *next;
+        struct rb_sched_waiter *cur = 0, *next;
         rb_mutex_t **th_mutex = &th->keeping_mutexes;
 
         mutex->th = 0;
-        list_for_each_safe(&mutex->waitq, cur, next, node) {
-            list_del_init(&cur->node);
-            switch (cur->th->status) {
+        list_for_each_safe(&mutex->waitq, cur, next, wnode) {
+            rb_thread_t *th = rb_ec_thread_ptr(cur->ec);
+
+            list_del_init(&cur->wnode);
+            switch (th->status) {
              case THREAD_RUNNABLE: /* from someone else calling Thread#run */
              case THREAD_STOPPED_FOREVER: /* likely (rb_mutex_lock) */
-                rb_threadptr_interrupt(cur->th);
+                rb_threadptr_interrupt(th);
                goto found;
              case THREAD_STOPPED: /* probably impossible */
                rb_bug("unexpected THREAD_STOPPED");
@@ -868,7 +866,7 @@ queue_sleep(VALUE arg)
 }
 
 struct queue_waiter {
-    struct sync_waiter w;
+    struct rb_sched_waiter w;
     union {
         struct rb_queue *q;
         struct rb_szqueue *sq;
@@ -880,7 +878,7 @@ queue_sleep_done(VALUE p)
 {
     struct queue_waiter *qw = (struct queue_waiter *)p;
 
-    list_del(&qw->w.node);
+    list_del(&qw->w.wnode);
     qw->as.q->num_waiting--;
 
     return Qfalse;
@@ -891,7 +889,7 @@ szqueue_sleep_done(VALUE p)
 {
     struct queue_waiter *qw = (struct queue_waiter *)p;
 
-    list_del(&qw->w.node);
+    list_del(&qw->w.wnode);
     qw->as.sq->num_waiting_push--;
 
     return Qfalse;
@@ -915,9 +913,9 @@ queue_do_pop(VALUE self, struct rb_queue *q, int should_block)
             assert(RARRAY_LEN(q->que) == 0);
             assert(queue_closed_p(self) == 0);
 
-            qw.w.th = GET_THREAD();
+            qw.w.ec = GET_EC();
             qw.as.q = q;
-            list_add_tail(&qw.as.q->waitq, &qw.w.node);
+            list_add_tail(&qw.as.q->waitq, &qw.w.wnode);
             qw.as.q->num_waiting++;
 
             rb_ensure(queue_sleep, self, queue_sleep_done, (VALUE)&qw);
@@ -1155,9 +1153,9 @@ rb_szqueue_push(int argc, VALUE *argv, VALUE self)
             struct queue_waiter qw;
             struct list_head *pushq = szqueue_pushq(sq);
 
-            qw.w.th = GET_THREAD();
+            qw.w.ec = GET_EC();
             qw.as.sq = sq;
-            list_add_tail(pushq, &qw.w.node);
+            list_add_tail(pushq, &qw.w.wnode);
             sq->num_waiting_push++;
 
             rb_ensure(queue_sleep, self, szqueue_sleep_done, (VALUE)&qw);
@@ -1373,9 +1371,9 @@ do_sleep(VALUE args)
 }
 
 static VALUE
-delete_from_waitq(struct sync_waiter *w)
+delete_from_waitq(struct rb_sched_waiter *w)
 {
-    list_del(&w->node);
+    list_del(&w->wnode);
 
     return Qnil;
 }
@@ -1395,12 +1393,12 @@ rb_condvar_wait(int argc, VALUE *argv, VALUE self)
 {
     struct rb_condvar *cv = condvar_ptr(self);
     struct sleep_call args;
-    struct sync_waiter w;
+    struct rb_sched_waiter w;
 
     rb_scan_args(argc, argv, "11", &args.mutex, &args.timeout);
 
-    w.th = GET_THREAD();
-    list_add_tail(&cv->waitq, &w.node);
+    w.ec = GET_EC();
+    list_add_tail(&cv->waitq, &w.wnode);
     rb_ensure(do_sleep, (VALUE)&args, delete_from_waitq, (VALUE)&w);
 
     return self;
diff --git a/vm_core.h b/vm_core.h
index bb6409719b..59108dafb8 100644
--- a/vm_core.h
+++ b/vm_core.h
@@ -854,6 +854,11 @@ typedef struct rb_execution_context_struct {
 
 void ec_set_vm_stack(rb_execution_context_t *ec, VALUE *stack, size_t size);
 
+struct rb_sched_waiter {
+    struct list_node wnode;
+    rb_execution_context_t *ec;
+};
+
 typedef struct rb_thread_struct {
     struct list_node vmlt_node;
     VALUE self;
-- 
EW
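
The vm_core.h hunk above is the whole shared interface: every waiter now
carries only a list node plus the execution context of whoever is sleeping,
while each subsystem keeps its extra fields (pid/status/options for waitpid,
the queue pointers for Queue/SizedQueue) in its own struct that embeds
rb_sched_waiter and recovers itself with container_of() when needed.  Below
is a minimal, self-contained sketch of that embed-and-recover pattern; the
list helpers, fake_ec type, and the simplified waitpid_state are stand-ins
invented for this example (Ruby itself uses ccan/list and
rb_execution_context_t), not code from the patch.

  #include <stddef.h>
  #include <stdio.h>

  /* stand-in for rb_execution_context_t */
  struct fake_ec { const char *name; };

  /* tiny intrusive doubly-linked list, loosely modeled on ccan/list */
  struct list_node { struct list_node *next, *prev; };
  #define LIST_HEAD_INIT(h) { &(h), &(h) }

  static void list_add_tail(struct list_node *head, struct list_node *n)
  {
      n->prev = head->prev;
      n->next = head;
      head->prev->next = n;
      head->prev = n;
  }

  static void list_del(struct list_node *n)
  {
      n->prev->next = n->next;
      n->next->prev = n->prev;
  }

  /* the common waiter record the patch adds to vm_core.h */
  struct rb_sched_waiter {
      struct list_node wnode;
      struct fake_ec *ec;
  };

  /* a user of the common record, analogous to struct waitpid_state */
  struct waitpid_state {
      struct rb_sched_waiter sw;
      int pid;
  };

  #define container_of(ptr, type, member) \
      ((type *)((char *)(ptr) - offsetof(type, member)))

  int main(void)
  {
      struct list_node waitq = LIST_HEAD_INIT(waitq);
      struct fake_ec ec = { "main" };
      struct waitpid_state w = { .sw = { .ec = &ec }, .pid = 42 };

      /* register the waiter: only the embedded rb_sched_waiter is queued */
      list_add_tail(&waitq, &w.sw.wnode);

      /* generic wakeup loop: walk rb_sched_waiter nodes, then recover the
       * embedding struct with container_of when type-specific data matters */
      for (struct list_node *n = waitq.next; n != &waitq; n = n->next) {
          struct rb_sched_waiter *sw =
              container_of(n, struct rb_sched_waiter, wnode);
          struct waitpid_state *ws =
              container_of(sw, struct waitpid_state, sw);
          printf("would wake ec=%s waiting on pid=%d\n", sw->ec->name, ws->pid);
      }

      list_del(&w.sw.wnode);
      return 0;
  }

Keeping only (wnode, ec) in the shared struct is what lets a waitpid caller,
a Queue/SizedQueue sleeper, or a Mutex/ConditionVariable waiter all be woken
through the same rb_ec_thread_ptr(ec) path once those become scheduling
points.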