From mboxrd@z Thu Jan 1 00:00:00 1970
From: Eric Wong <e@80x24.org>
To: spew@80x24.org
Subject: [PATCH r66072 1/2] unify sync_waiter, waitpid_state, waiting_fd w/ rb_sched_waiter
Date: Wed, 28 Nov 2018 10:52:24 +0000
Message-Id: <20181128105225.1211-1-e@80x24.org>

This will make upcoming "auto-fiber" changes easier-to-review
since Process.wait and Queue/SizedQueue are expected to become
scheduling points.
---
 process.c     | 92 ++++++++++++++++++++++++++++++-----------------------
 thread.c      | 29 ++++++++++---------
 thread_sync.c | 72 +++++++++++++++++++++++-----------------------
 vm_core.h     |  5 ++++
 4 files changed, 103 insertions(+), 95 deletions(-)

diff --git a/process.c b/process.c
index 56a90e770b..ff46289178 100644
--- a/process.c
+++ b/process.c
@@ -917,8 +917,7 @@ do_waitpid(rb_pid_t pid, int *st, int flags)
 }

 struct waitpid_state {
-    struct list_node wnode;
-    rb_execution_context_t *ec;
+    struct rb_sched_waiter sw;
     rb_nativethread_cond_t *cond;
     rb_pid_t ret;
     rb_pid_t pid;
@@ -937,13 +936,15 @@ void rb_sigwait_fd_put(const rb_thread_t *, int fd);
 void rb_thread_sleep_interruptible(void);

 static int
-waitpid_signal(struct waitpid_state *w)
+waitpid_signal(struct rb_sched_waiter *sw)
 {
-    if (w->ec) { /* rb_waitpid */
-        rb_threadptr_interrupt(rb_ec_thread_ptr(w->ec));
+    if (sw->ec) { /* rb_waitpid */
+        rb_threadptr_interrupt(rb_ec_thread_ptr(sw->ec));
         return TRUE;
     }
     else { /* ruby_waitpid_locked */
+        struct waitpid_state *w = container_of(sw, struct waitpid_state, sw);
+
         if (w->cond) {
             rb_native_cond_signal(w->cond);
             return TRUE;
@@ -960,13 +961,13 @@
 static void
 sigwait_fd_migrate_sleeper(rb_vm_t *vm)
 {
-    struct waitpid_state *w = 0;
+    struct rb_sched_waiter *sw = 0;

-    list_for_each(&vm->waiting_pids, w, wnode) {
-        if (waitpid_signal(w)) return;
+    list_for_each(&vm->waiting_pids, sw, wnode) {
+        if (waitpid_signal(sw)) return;
     }
-    list_for_each(&vm->waiting_grps, w, wnode) {
-        if (waitpid_signal(w)) return;
+    list_for_each(&vm->waiting_grps, sw, wnode) {
+        if (waitpid_signal(sw)) return;
     }
 }

@@ -984,17 +985,18 @@ extern volatile unsigned int ruby_nocldwait; /* signal.c */
 static void
 waitpid_each(struct list_head *head)
 {
-    struct waitpid_state *w = 0, *next;
+    struct rb_sched_waiter *sw = 0, *next;

-    list_for_each_safe(head, w, next, wnode) {
+    list_for_each_safe(head, sw, next, wnode) {
+        struct waitpid_state *w = container_of(sw, struct waitpid_state, sw);
         rb_pid_t ret = do_waitpid(w->pid, &w->status, w->options | WNOHANG);

         if (!ret) continue;
         if (ret == -1) w->errnum = errno;

         w->ret = ret;
-        list_del_init(&w->wnode);
-        waitpid_signal(w);
+        list_del_init(&sw->wnode);
+        waitpid_signal(sw);
     }
 }
 #else
@@ -1038,6 +1040,20 @@ sigwait_sleep_time(void)
     return 0;
 }

+static struct list_head *
+waitpid_sleep_prepare(struct waitpid_state *w, rb_vm_t *vm)
+{
+    if (w->pid > 0 || list_empty(&vm->waiting_pids))
+        w->ret = do_waitpid(w->pid, &w->status, w->options | WNOHANG);
+
+    if (w->ret || (w->options & WNOHANG)) {
+        if (w->ret == -1) w->errnum = errno;
+        return 0; /* no need to sleep */
+    }
+
+    return w->pid > 0 ? &vm->waiting_pids : &vm->waiting_grps;
+}
+
 /*
  * must be called with vm->waitpid_lock held, this is not interruptible
  */
@@ -1046,20 +1062,17 @@ ruby_waitpid_locked(rb_vm_t *vm, rb_pid_t pid, int *status, int options,
                     rb_nativethread_cond_t *cond)
 {
     struct waitpid_state w;
+    struct list_head *head;

     assert(!ruby_thread_has_gvl_p() && "must not have GVL");

     waitpid_state_init(&w, pid, options);
-    if (w.pid > 0 || list_empty(&vm->waiting_pids))
-        w.ret = do_waitpid(w.pid, &w.status, w.options | WNOHANG);
-    if (w.ret) {
-        if (w.ret == -1) w.errnum = errno;
-    }
-    else {
+    head = waitpid_sleep_prepare(&w, vm);
+    if (head) {
         int sigwait_fd = -1;

-        w.ec = 0;
-        list_add(w.pid > 0 ? &vm->waiting_pids : &vm->waiting_grps, &w.wnode);
+        w.sw.ec = 0;
+        list_add(head, &w.sw.wnode);
         do {
             if (sigwait_fd < 0)
                 sigwait_fd = rb_sigwait_fd_get(0);
@@ -1075,7 +1088,7 @@ ruby_waitpid_locked(rb_vm_t *vm, rb_pid_t pid, int *status, int options,
                 rb_native_cond_wait(w.cond, &vm->waitpid_lock);
             }
         } while (!w.ret);
-        list_del(&w.wnode);
+        list_del(&w.sw.wnode);

         /* we're done, maybe other waitpid callers are not: */
         if (sigwait_fd >= 0) {
@@ -1112,10 +1125,10 @@ waitpid_cleanup(VALUE x)
      * Not sure why, so we unconditionally do list_del here:
      */
     if (TRUE || w->ret == 0) {
-        rb_vm_t *vm = rb_ec_vm_ptr(w->ec);
+        rb_vm_t *vm = rb_ec_vm_ptr(w->sw.ec);

         rb_native_mutex_lock(&vm->waitpid_lock);
-        list_del(&w->wnode);
+        list_del(&w->sw.wnode);
         rb_native_mutex_unlock(&vm->waitpid_lock);
     }

@@ -1125,8 +1138,8 @@
 static void
 waitpid_wait(struct waitpid_state *w)
 {
-    rb_vm_t *vm = rb_ec_vm_ptr(w->ec);
-    int need_sleep = FALSE;
+    rb_vm_t *vm = rb_ec_vm_ptr(w->sw.ec);
+    struct list_head *head;

     /*
      * Lock here to prevent do_waitpid from stealing work from the
@@ -1134,27 +1147,15 @@ waitpid_wait(struct waitpid_state *w)
      * outside of GVL
      */
     rb_native_mutex_lock(&vm->waitpid_lock);
-
-    if (w->pid > 0 || list_empty(&vm->waiting_pids))
-        w->ret = do_waitpid(w->pid, &w->status, w->options | WNOHANG);
-    if (w->ret) {
-        if (w->ret == -1) w->errnum = errno;
-    }
-    else if (w->options & WNOHANG) {
-    }
-    else {
-        need_sleep = TRUE;
-    }
-
-    if (need_sleep) {
+    head = waitpid_sleep_prepare(w, vm);
+    if (head) {
         w->cond = 0;
         /* order matters, favor specified PIDs rather than -1 or 0 */
-        list_add(w->pid > 0 ? &vm->waiting_pids : &vm->waiting_grps, &w->wnode);
+        list_add(head, &w->sw.wnode);
     }
-
     rb_native_mutex_unlock(&vm->waitpid_lock);
-    if (need_sleep) {
+    if (head) {
         rb_ensure(waitpid_sleep, (VALUE)w, waitpid_cleanup, (VALUE)w);
     }
 }

@@ -1179,7 +1180,8 @@ waitpid_no_SIGCHLD(struct waitpid_state *w)
         do {
             rb_thread_call_without_gvl(waitpid_blocking_no_SIGCHLD, w,
                                        RUBY_UBF_PROCESS, 0);
-        } while (w->ret < 0 && errno == EINTR && (RUBY_VM_CHECK_INTS(w->ec),1));
+        } while (w->ret < 0 && errno == EINTR &&
+                 (RUBY_VM_CHECK_INTS(w->sw.ec),1));
     }

     if (w->ret == -1) w->errnum = errno;
@@ -1191,7 +1193,7 @@ rb_waitpid(rb_pid_t pid, int *st, int flags)
     struct waitpid_state w;

     waitpid_state_init(&w, pid, flags);
-    w.ec = GET_EC();
+    w.sw.ec = GET_EC();

     if (WAITPID_USE_SIGCHLD) {
         waitpid_wait(&w);
diff --git a/thread.c b/thread.c
index d02b560ac1..f683b5eb7a 100644
--- a/thread.c
+++ b/thread.c
@@ -117,8 +117,7 @@ void rb_sigwait_fd_migrate(rb_vm_t *); /* process.c */
 static volatile int system_working = 1;

 struct waiting_fd {
-    struct list_node wfd_node; /* <=> vm.waiting_fds */
-    rb_thread_t *th;
+    struct rb_sched_waiter sw; /* .wnode <=> vm->waiting_fds */
     int fd;
 };

@@ -1552,21 +1551,23 @@
 VALUE
 rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd)
 {
     volatile VALUE val = Qundef; /* shouldn't be used */
-    rb_execution_context_t * volatile ec = GET_EC();
+    rb_execution_context_t * volatile ec;
     volatile int saved_errno = 0;
     enum ruby_tag_type state;
     struct waiting_fd wfd;

     wfd.fd = fd;
-    wfd.th = rb_ec_thread_ptr(ec);
-    list_add(&rb_ec_vm_ptr(ec)->waiting_fds, &wfd.wfd_node);
+    wfd.sw.ec = ec = GET_EC();
+    list_add(&rb_ec_vm_ptr(wfd.sw.ec)->waiting_fds, &wfd.sw.wnode);
     EC_PUSH_TAG(ec);
     if ((state = EC_EXEC_TAG()) == TAG_NONE) {
-        BLOCKING_REGION(wfd.th, {
+        rb_thread_t *th = rb_ec_thread_ptr(wfd.sw.ec);
+
+        BLOCKING_REGION(th, {
             val = func(data1);
             saved_errno = errno;
-        }, ubf_select, wfd.th, FALSE);
+        }, ubf_select, th, FALSE);
     }
     EC_POP_TAG();

@@ -1574,7 +1575,7 @@ rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd)
      * must be deleted before jump
      * this will delete either from waiting_fds or on-stack LIST_HEAD(busy)
      */
-    list_del(&wfd.wfd_node);
+    list_del(&wfd.sw.wnode);

     if (state) {
         EC_JUMP_TAG(ec, state);
@@ -2320,15 +2321,17 @@
 int
 rb_notify_fd_close(int fd, struct list_head *busy)
 {
     rb_vm_t *vm = GET_THREAD()->vm;
-    struct waiting_fd *wfd = 0, *next;
+    struct rb_sched_waiter *sw = 0, *next;
+
+    list_for_each_safe(&vm->waiting_fds, sw, next, wnode) {
+        struct waiting_fd *wfd = container_of(sw, struct waiting_fd, sw);

-    list_for_each_safe(&vm->waiting_fds, wfd, next, wfd_node) {
         if (wfd->fd == fd) {
-            rb_thread_t *th = wfd->th;
+            rb_thread_t *th = rb_ec_thread_ptr(sw->ec);
             VALUE err;
-            list_del(&wfd->wfd_node);
-            list_add(busy, &wfd->wfd_node);
+            list_del(&sw->wnode);
+            list_add(busy, &sw->wnode);

             err = th->vm->special_exceptions[ruby_error_stream_closed];
             rb_threadptr_pending_interrupt_enque(th, err);
diff --git a/thread_sync.c b/thread_sync.c
index b79db1fee3..000812556d 100644
--- a/thread_sync.c
+++ b/thread_sync.c
@@ -4,26 +4,22 @@
 static VALUE rb_cMutex, rb_cQueue, rb_cSizedQueue, rb_cConditionVariable;
 static VALUE rb_eClosedQueueError;

-/* sync_waiter is always on-stack */
-struct sync_waiter {
-    rb_thread_t *th;
-    struct list_node node;
-};
-
 #define MUTEX_ALLOW_TRAP FL_USER1

 static void
 sync_wakeup(struct list_head *head, long max)
 {
-    struct sync_waiter *cur = 0, *next;
+    struct rb_sched_waiter *cur = 0, *next;

-    list_for_each_safe(head, cur, next, node) {
-        list_del_init(&cur->node);
-        if (cur->th->status != THREAD_KILLED) {
-            rb_threadptr_interrupt(cur->th);
-            cur->th->status = THREAD_RUNNABLE;
-            if (--max == 0) return;
-        }
+    list_for_each_safe(head, cur, next, wnode) {
+        rb_thread_t *th = rb_ec_thread_ptr(cur->ec);
+
+        list_del_init(&cur->wnode);
+        if (th->status != THREAD_KILLED) {
+            rb_threadptr_interrupt(th);
+            th->status = THREAD_RUNNABLE;
+            if (--max == 0) return;
+        }
     }
 }

@@ -83,10 +79,10 @@ static const char* rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t *th);
 static size_t
 rb_mutex_num_waiting(rb_mutex_t *mutex)
 {
-    struct sync_waiter *w = 0;
+    struct rb_sched_waiter *w = 0;
     size_t n = 0;

-    list_for_each(&mutex->waitq, w, node) {
+    list_for_each(&mutex->waitq, w, wnode) {
         n++;
     }

@@ -235,13 +231,13 @@ do_mutex_lock(VALUE self, int interruptible_p)
     }

     if (rb_mutex_trylock(self) == Qfalse) {
-        struct sync_waiter w;
+        struct rb_sched_waiter w;

         if (mutex->th == th) {
             rb_raise(rb_eThreadError, "deadlock; recursive locking");
         }

-        w.th = th;
+        w.ec = th->ec;

         while (mutex->th != th) {
             enum rb_thread_status prev_status = th->status;
@@ -262,9 +258,9 @@ do_mutex_lock(VALUE self, int interruptible_p)
                 patrol_thread = th;
             }

-            list_add_tail(&mutex->waitq, &w.node);
+            list_add_tail(&mutex->waitq, &w.wnode);
             native_sleep(th, timeout); /* release GVL */
-            list_del(&w.node);
+            list_del(&w.wnode);

             if (!mutex->th) {
                 mutex->th = th;
@@ -345,16 +341,18 @@ rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t *th)
         err = "Attempt to unlock a mutex which is locked by another thread";
     }
     else {
-        struct sync_waiter *cur = 0, *next;
+        struct rb_sched_waiter *cur = 0, *next;
         rb_mutex_t **th_mutex = &th->keeping_mutexes;

         mutex->th = 0;
-        list_for_each_safe(&mutex->waitq, cur, next, node) {
-            list_del_init(&cur->node);
-            switch (cur->th->status) {
+        list_for_each_safe(&mutex->waitq, cur, next, wnode) {
+            rb_thread_t *th = rb_ec_thread_ptr(cur->ec);
+
+            list_del_init(&cur->wnode);
+            switch (th->status) {
               case THREAD_RUNNABLE: /* from someone else calling Thread#run */
               case THREAD_STOPPED_FOREVER: /* likely (rb_mutex_lock) */
-                rb_threadptr_interrupt(cur->th);
+                rb_threadptr_interrupt(th);
                 goto found;
               case THREAD_STOPPED: /* probably impossible */
                 rb_bug("unexpected THREAD_STOPPED");
@@ -870,7 +868,7 @@ queue_sleep(VALUE arg)
 }

 struct queue_waiter {
-    struct sync_waiter w;
+    struct rb_sched_waiter w;
     union {
         struct rb_queue *q;
         struct rb_szqueue *sq;
@@ -882,7 +880,7 @@ queue_sleep_done(VALUE p)
 {
     struct queue_waiter *qw = (struct queue_waiter *)p;

-    list_del(&qw->w.node);
+    list_del(&qw->w.wnode);
     qw->as.q->num_waiting--;

     return Qfalse;
@@ -893,7 +891,7 @@ szqueue_sleep_done(VALUE p)
 {
     struct queue_waiter *qw = (struct queue_waiter *)p;

-    list_del(&qw->w.node);
+    list_del(&qw->w.wnode);
     qw->as.sq->num_waiting_push--;

     return Qfalse;
@@ -917,9 +915,9 @@ queue_do_pop(VALUE self, struct rb_queue *q, int should_block)
             assert(RARRAY_LEN(q->que) == 0);
             assert(queue_closed_p(self) == 0);

-            qw.w.th = GET_THREAD();
+            qw.w.ec = GET_EC();
             qw.as.q = q;
-            list_add_tail(&qw.as.q->waitq, &qw.w.node);
+            list_add_tail(&qw.as.q->waitq, &qw.w.wnode);
             qw.as.q->num_waiting++;

             rb_ensure(queue_sleep, self, queue_sleep_done, (VALUE)&qw);
@@ -1157,9 +1155,9 @@ rb_szqueue_push(int argc, VALUE *argv, VALUE self)
             struct queue_waiter qw;
             struct list_head *pushq = szqueue_pushq(sq);

-            qw.w.th = GET_THREAD();
+            qw.w.ec = GET_EC();
             qw.as.sq = sq;
-            list_add_tail(pushq, &qw.w.node);
+            list_add_tail(pushq, &qw.w.wnode);
             sq->num_waiting_push++;

             rb_ensure(queue_sleep, self, szqueue_sleep_done, (VALUE)&qw);
@@ -1375,9 +1373,9 @@ do_sleep(VALUE args)
 }

 static VALUE
-delete_from_waitq(struct sync_waiter *w)
+delete_from_waitq(struct rb_sched_waiter *w)
 {
-    list_del(&w->node);
+    list_del(&w->wnode);

     return Qnil;
 }
@@ -1397,12 +1395,12 @@ rb_condvar_wait(int argc, VALUE *argv, VALUE self)
 {
     struct rb_condvar *cv = condvar_ptr(self);
     struct sleep_call args;
-    struct sync_waiter w;
+    struct rb_sched_waiter w;

     rb_scan_args(argc, argv, "11", &args.mutex, &args.timeout);

-    w.th = GET_THREAD();
-    list_add_tail(&cv->waitq, &w.node);
+    w.ec = GET_EC();
+    list_add_tail(&cv->waitq, &w.wnode);
     rb_ensure(do_sleep, (VALUE)&args, delete_from_waitq, (VALUE)&w);

     return self;
diff --git a/vm_core.h b/vm_core.h
index c557562e17..b4894c406a 100644
--- a/vm_core.h
+++ b/vm_core.h
@@ -881,6 +881,11 @@ typedef struct rb_execution_context_struct {

 void rb_ec_set_vm_stack(rb_execution_context_t *ec, VALUE *stack, size_t size);

+struct rb_sched_waiter {
+    struct list_node wnode;
+    rb_execution_context_t *ec;
+};
+
 typedef struct rb_thread_struct {
     struct list_node vmlt_node;
     VALUE self;
-- 
EW
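
Not part of the patch above: a stand-alone sketch of the idiom the series relies
on. Each waiter type (waitpid_state, waiting_fd, queue_waiter) embeds a
struct rb_sched_waiter, wait queues link only the embedded member, and code
such as waitpid_signal() and rb_notify_fd_close() recovers the enclosing struct
with container_of(). The names below (sched_waiter, example_waiter) and the
trimmed list_node are made up for illustration; only the embed-and-recover
pattern mirrors what the diff does.

#include <stddef.h>
#include <stdio.h>

/* trimmed stand-ins; the real code uses ccan/list and rb_execution_context_t */
struct list_node { struct list_node *next, *prev; };

struct sched_waiter {              /* plays the role of struct rb_sched_waiter */
    struct list_node wnode;        /* linked into a wait queue */
    void *ec;                      /* owning execution context */
};

struct example_waiter {            /* plays the role of waitpid_state / waiting_fd */
    struct sched_waiter sw;        /* embedded common part */
    int fd;                        /* type-specific payload */
};

/* recover the enclosing struct from a pointer to its embedded member */
#define container_of(ptr, type, member) \
    ((type *)((char *)(ptr) - offsetof(type, member)))

int main(void)
{
    struct example_waiter w = { .fd = 9 };
    struct sched_waiter *sw = &w.sw;   /* what a wait-queue walk would hand back */
    struct example_waiter *outer = container_of(sw, struct example_waiter, sw);

    /* prints "fd=9 same=1": the enclosing struct is recovered intact */
    printf("fd=%d same=%d\n", outer->fd, outer == &w);
    return 0;
}

The payoff in the patch is that sync_wakeup(), waitpid_each() and friends can
all traverse a single list type, while each call site keeps its own payload
next to the embedded waiter on its stack.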