From 189905ba89aa24c04f682a8526a9931cfdddc16f Mon Sep 17 00:00:00 2001 From: Jarrett Chisholm Date: Sat, 30 Dec 2017 23:18:08 -0500 Subject: [PATCH] can now query for the number of pending tasks --- ctpl.h | 9 ++++++++- ctpl_stl.h | 7 ++++++- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/ctpl.h b/ctpl.h index 64f650d..5fb84e3 100644 --- a/ctpl.h +++ b/ctpl.h @@ -62,6 +62,7 @@ namespace ctpl { // number of idle threads int n_idle() { return this->nWaiting; } + int n_pending() { return this->nPending; } std::thread & get_thread(int i) { return *this->threads[i]; } // change the number of threads in the pool @@ -157,6 +158,8 @@ namespace ctpl { auto _f = new std::function([pck](int id) { (*pck)(id); }); + + ++this->nPending; this->q.push(_f); std::unique_lock lock(this->mutex); @@ -174,6 +177,8 @@ namespace ctpl { auto _f = new std::function([pck](int id) { (*pck)(id); }); + + ++this->nPending; this->q.push(_f); std::unique_lock lock(this->mutex); @@ -199,6 +204,7 @@ namespace ctpl { bool isPop = this->q.pop(_f); while (true) { while (isPop) { // if there is anything in the queue + --this->nPending; std::unique_ptr> func(_f); // at return, delete the function even if an exception occurred (*_f)(i); @@ -221,7 +227,7 @@ namespace ctpl { this->threads[i].reset(new std::thread(f)); // compiler may not support std::make_unique() } - void init() { this->nWaiting = 0; this->isStop = false; this->isDone = false; } + void init() { this->nWaiting = 0; this->nPending = 0; this->isStop = false; this->isDone = false; } std::vector> threads; std::vector>> flags; @@ -229,6 +235,7 @@ namespace ctpl { std::atomic isDone; std::atomic isStop; std::atomic nWaiting; // how many threads are waiting + std::atomic nPending; // how many tasks are waiting std::mutex mutex; std::condition_variable cv; diff --git a/ctpl_stl.h b/ctpl_stl.h index 5956cf0..c4f0453 100644 --- a/ctpl_stl.h +++ b/ctpl_stl.h @@ -85,6 +85,7 @@ namespace ctpl { // number of idle threads int n_idle() { return this->nWaiting; } + int n_pending() { return this->nPending; } std::thread & get_thread(int i) { return *this->threads[i]; } // change the number of threads in the pool @@ -177,6 +178,7 @@ namespace ctpl { auto _f = new std::function([pck](int id) { (*pck)(id); }); + ++this->nPending; this->q.push(_f); std::unique_lock lock(this->mutex); this->cv.notify_one(); @@ -191,6 +193,7 @@ namespace ctpl { auto _f = new std::function([pck](int id) { (*pck)(id); }); + ++this->nPending; this->q.push(_f); std::unique_lock lock(this->mutex); this->cv.notify_one(); @@ -214,6 +217,7 @@ namespace ctpl { bool isPop = this->q.pop(_f); while (true) { while (isPop) { // if there is anything in the queue + --this->nPending; std::unique_ptr> func(_f); // at return, delete the function even if an exception occurred (*_f)(i); if (_flag) @@ -233,7 +237,7 @@ namespace ctpl { this->threads[i].reset(new std::thread(f)); // compiler may not support std::make_unique() } - void init() { this->nWaiting = 0; this->isStop = false; this->isDone = false; } + void init() { this->nWaiting = 0; this->nPending = 0; this->isStop = false; this->isDone = false; } std::vector> threads; std::vector>> flags; @@ -241,6 +245,7 @@ namespace ctpl { std::atomic isDone; std::atomic isStop; std::atomic nWaiting; // how many threads are waiting + std::atomic nPending; // how many tasks are waiting std::mutex mutex; std::condition_variable cv;