diff --git a/Makefile.am b/Makefile.am
index 581c21826a..89ba8177f9 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -191,6 +191,7 @@ test_run_tests_SOURCES = test/blackhole-server.c \
                          test/test-thread.c \
                          test/test-threadpool-cancel.c \
                          test/test-threadpool.c \
+                         test/test-threadpool-fork.c \
                          test/test-timer-again.c \
                          test/test-timer-from-check.c \
                          test/test-timer.c \
diff --git a/src/unix/threadpool.c b/src/unix/threadpool.c
index 7923250a09..dc31678f69 100644
--- a/src/unix/threadpool.c
+++ b/src/unix/threadpool.c
@@ -34,12 +34,17 @@ static QUEUE exit_message;
 static QUEUE wq;
 static volatile int initialized;
 
+static uv_barrier_t fork_barrier;
+static uv_mutex_t fork_lock;
+static int do_fork_guard;
+
+static void fork_guard(void);
+static void init_fork_protection(int);
 
 static void uv__cancelled(struct uv__work* w) {
   abort();
 }
 
-
 /* To avoid deadlock with uv_cancel() it's crucial that the worker
  * never holds the global mutex and the loop-local mutex at the same time.
  */
@@ -50,6 +55,10 @@ static void worker(void* arg) {
   (void) arg;
 
   for (;;) {
+    if (do_fork_guard) {
+      fork_guard();
+    }
+
     uv_mutex_lock(&mutex);
 
     while (QUEUE_EMPTY(&wq))
@@ -82,6 +91,84 @@ static void worker(void* arg) {
   }
 }
 
+static void fork_guard(void) {
+  // give all the worker threads time to get to the same place
+  uv_barrier_wait(&fork_barrier);
+
+  // wait for the fork to occur
+  uv_mutex_lock(&fork_lock);
+  // since we were just waiting for the fork now release the resource and go
+  uv_mutex_unlock(&fork_lock);
+}
+
+static void seed_work_cb(uv_work_t *data) {
+}
+
+static void after_work_cb(uv_work_t *data, int status) {
+  free(data);
+}
+
+static void prepare_fork(void) {
+  assert(1 == initialized);
+  assert(0 == do_fork_guard);
+
+  // tell the threads to start getting ready to fork
+  do_fork_guard = 1;
+
+  if (QUEUE_EMPTY(&wq)) {
+    // seed nthreads jobs to spin up threads
+    int i = 0;
+    for(; i < nthreads; ++i) {
+      uv_work_t *req = malloc(sizeof(uv_work_t));
+      uv_queue_work(uv_default_loop(), req, &seed_work_cb, &after_work_cb);
+    }
+  }
+  // lock now so when the barrier hits we know the threads will halt
+  uv_mutex_lock(&fork_lock);
+  // wait for the threads to get here
+  uv_barrier_wait(&fork_barrier);
+}
+
+static void parent_process(void) {
+  assert(1 == initialized);
+  assert(1 == do_fork_guard);
+
+  // the parent threads can go and do what they want to do
+  do_fork_guard = 0;
+  // let them move on with life
+  uv_mutex_unlock(&fork_lock);
+}
+
+static void child_process(void) {
+  assert(1 == initialized);
+  assert(1 == do_fork_guard);
+
+  // set the guard to 0 so the newly spun up threads don't wait in the barrier
+  // and deadlock
+  do_fork_guard = 0;
+  uv_mutex_unlock(&fork_lock);
+
+  // our state will say we're initialized, so let's make it a reality for the
+  // child
+  int i;
+  for (i = 0; i < nthreads; i++) {
+    if (uv_thread_create(threads + i, worker, NULL)) {
+      abort();
+    }
+  }
+}
+
+void init_fork_protection(int num_threads) {
+  do_fork_guard = 0;
+  // init the fork to num_threads, plus the main thread
+  if (uv_barrier_init(&fork_barrier, num_threads + 1))
+    abort();
+  if (uv_mutex_init(&fork_lock))
+    abort();
+  // register the fork functions
+  if (pthread_atfork(&prepare_fork, &parent_process, &child_process))
+    abort();
+}
 
 static void post(QUEUE* q) {
   uv_mutex_lock(&mutex);
@@ -121,6 +208,8 @@ static void init_once(void) {
 
   QUEUE_INIT(&wq);
 
+  init_fork_protection(nthreads);
+
   for (i = 0; i < nthreads; i++)
     if (uv_thread_create(threads + i, worker, NULL))
       abort();
diff --git a/test/test-list.h b/test/test-list.h
index a6e692f631..60c0a1f78d 100644
--- a/test/test-list.h
+++ b/test/test-list.h
@@ -215,6 +215,7 @@ TEST_DECLARE (threadpool_cancel_getaddrinfo)
 TEST_DECLARE (threadpool_cancel_work)
 TEST_DECLARE (threadpool_cancel_fs)
 TEST_DECLARE (threadpool_cancel_single)
+TEST_DECLARE (threadpool_queue_work_after_fork)
 TEST_DECLARE (thread_local_storage)
 TEST_DECLARE (thread_mutex)
 TEST_DECLARE (thread_rwlock)
diff --git a/test/test-threadpool-fork.c b/test/test-threadpool-fork.c
new file mode 100644
index 0000000000..77e70e9304
--- /dev/null
+++ b/test/test-threadpool-fork.c
@@ -0,0 +1,69 @@
+/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to
+ * deal in the Software without restriction, including without limitation the
+ * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
+ * sell copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+ * IN THE SOFTWARE.
+ */
+
+#include "uv.h"
+#include "task.h"
+
+static int work_cb_count;
+static int after_work_cb_count;
+static uv_work_t work_req;
+static char data;
+
+
+static void work_cb(uv_work_t* req) {
+  ASSERT(req == &work_req);
+  ASSERT(req->data == &data);
+  work_cb_count++;
+}
+
+
+static void after_work_cb(uv_work_t* req, int status) {
+  ASSERT(status == 0);
+  ASSERT(req == &work_req);
+  ASSERT(req->data == &data);
+  after_work_cb_count++;
+}
+
+
+TEST_IMPL(threadpool_queue_work_after_fork) {
+  int r;
+
+  work_req.data = &data;
+  r = uv_queue_work(uv_default_loop(), &work_req, work_cb, after_work_cb);
+  ASSERT(r == 0);
+  uv_run(uv_default_loop(), UV_RUN_DEFAULT);
+
+  ASSERT(work_cb_count == 1);
+  ASSERT(after_work_cb_count == 1);
+
+  pid_t p = fork();
+
+  // both sides should be able to run jobs
+  r = uv_queue_work(uv_default_loop(), &work_req, work_cb, after_work_cb);
+  ASSERT(r == 0);
+  uv_run(uv_default_loop(), UV_RUN_DEFAULT);
+
+  ASSERT(work_cb_count == 2);
+  ASSERT(after_work_cb_count == 2);
+
+  MAKE_VALGRIND_HAPPY();
+  return 0;
+}
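
For reference, the mechanism in the threadpool.c hunks is the standard pthread_atfork(3) prepare/parent/child pattern: before fork() every worker is parked at a barrier and then blocked on a mutex held by the forking thread, so no pool thread is mid-operation when the address space is duplicated. Below is a minimal standalone sketch of that pattern, independent of libuv internals; it assumes a platform that provides pthread_barrier_t (the patch itself uses the portable uv_barrier_t/uv_mutex_t wrappers), and the names NUM_WORKERS, pool_prepare, pool_parent and pool_child are illustrative, not part of the patch.

/* Minimal sketch of the atfork + barrier pattern used by the patch above.
 * Assumes pthread_barrier_t is available; names and the fixed worker count
 * are illustrative only. */
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/wait.h>
#include <unistd.h>

#define NUM_WORKERS 2

static pthread_barrier_t fork_barrier;  /* workers + the forking thread */
static pthread_mutex_t fork_lock = PTHREAD_MUTEX_INITIALIZER;
static volatile int do_fork_guard;

static void* worker(void* arg) {
  (void) arg;
  for (;;) {
    if (do_fork_guard) {
      /* rendezvous with the forking thread, then block until fork() is done */
      pthread_barrier_wait(&fork_barrier);
      pthread_mutex_lock(&fork_lock);
      pthread_mutex_unlock(&fork_lock);
    }
    usleep(1000);  /* stand-in for pulling real work off a queue */
  }
  return NULL;
}

static void pool_prepare(void) {     /* parent, immediately before fork() */
  do_fork_guard = 1;
  pthread_mutex_lock(&fork_lock);    /* workers block here after the barrier */
  pthread_barrier_wait(&fork_barrier);
}

static void pool_parent(void) {      /* parent, immediately after fork() */
  do_fork_guard = 0;
  pthread_mutex_unlock(&fork_lock);  /* workers resume */
}

static void pool_child(void) {       /* child, immediately after fork() */
  do_fork_guard = 0;
  pthread_mutex_unlock(&fork_lock);
  /* only the forking thread survives in the child; a real pool would
   * respawn its workers here, as child_process() does in the patch */
}

int main(void) {
  pthread_t tids[NUM_WORKERS];
  pid_t pid;
  int i;

  if (pthread_barrier_init(&fork_barrier, NULL, NUM_WORKERS + 1))
    abort();
  if (pthread_atfork(pool_prepare, pool_parent, pool_child))
    abort();

  for (i = 0; i < NUM_WORKERS; i++)
    if (pthread_create(&tids[i], NULL, worker, NULL))
      abort();

  pid = fork();
  if (pid == -1)
    abort();
  if (pid == 0) {
    printf("child: forked while the pool was quiesced\n");
    _exit(0);
  }

  wait(NULL);  /* reap the child */
  printf("parent: workers resumed after fork\n");
  return 0;    /* exiting main tears the demo threads down */
}

These are the same three hooks that init_fork_protection() registers; the extra wrinkle in the patch is that idle libuv workers sleep on a condition variable rather than polling, so prepare_fork() seeds no-op jobs to wake them before they can reach the barrier.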