Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion ctpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ namespace ctpl {

// number of idle threads
int n_idle() { return this->nWaiting; }
int n_pending() { return this->nPending; }
std::thread & get_thread(int i) { return *this->threads[i]; }

// change the number of threads in the pool
Expand Down Expand Up @@ -157,6 +158,8 @@ namespace ctpl {
auto _f = new std::function<void(int id)>([pck](int id) {
(*pck)(id);
});

++this->nPending;
this->q.push(_f);

std::unique_lock<std::mutex> lock(this->mutex);
Expand All @@ -174,6 +177,8 @@ namespace ctpl {
auto _f = new std::function<void(int id)>([pck](int id) {
(*pck)(id);
});

++this->nPending;
this->q.push(_f);

std::unique_lock<std::mutex> lock(this->mutex);
Expand All @@ -199,6 +204,7 @@ namespace ctpl {
bool isPop = this->q.pop(_f);
while (true) {
while (isPop) { // if there is anything in the queue
--this->nPending;
std::unique_ptr<std::function<void(int id)>> func(_f); // at return, delete the function even if an exception occurred
(*_f)(i);

Expand All @@ -221,14 +227,15 @@ namespace ctpl {
this->threads[i].reset(new std::thread(f)); // compiler may not support std::make_unique()
}

void init() { this->nWaiting = 0; this->isStop = false; this->isDone = false; }
void init() { this->nWaiting = 0; this->nPending = 0; this->isStop = false; this->isDone = false; }

std::vector<std::unique_ptr<std::thread>> threads;
std::vector<std::shared_ptr<std::atomic<bool>>> flags;
mutable boost::lockfree::queue<std::function<void(int id)> *> q;
std::atomic<bool> isDone;
std::atomic<bool> isStop;
std::atomic<int> nWaiting; // how many threads are waiting
std::atomic<int> nPending; // how many tasks are waiting

std::mutex mutex;
std::condition_variable cv;
Expand Down
7 changes: 6 additions & 1 deletion ctpl_stl.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ namespace ctpl {

// number of idle threads
int n_idle() { return this->nWaiting; }
int n_pending() { return this->nPending; }
std::thread & get_thread(int i) { return *this->threads[i]; }

// change the number of threads in the pool
Expand Down Expand Up @@ -177,6 +178,7 @@ namespace ctpl {
auto _f = new std::function<void(int id)>([pck](int id) {
(*pck)(id);
});
++this->nPending;
this->q.push(_f);
std::unique_lock<std::mutex> lock(this->mutex);
this->cv.notify_one();
Expand All @@ -191,6 +193,7 @@ namespace ctpl {
auto _f = new std::function<void(int id)>([pck](int id) {
(*pck)(id);
});
++this->nPending;
this->q.push(_f);
std::unique_lock<std::mutex> lock(this->mutex);
this->cv.notify_one();
Expand All @@ -214,6 +217,7 @@ namespace ctpl {
bool isPop = this->q.pop(_f);
while (true) {
while (isPop) { // if there is anything in the queue
--this->nPending;
std::unique_ptr<std::function<void(int id)>> func(_f); // at return, delete the function even if an exception occurred
(*_f)(i);
if (_flag)
Expand All @@ -233,14 +237,15 @@ namespace ctpl {
this->threads[i].reset(new std::thread(f)); // compiler may not support std::make_unique()
}

void init() { this->nWaiting = 0; this->isStop = false; this->isDone = false; }
void init() { this->nWaiting = 0; this->nPending = 0; this->isStop = false; this->isDone = false; }

std::vector<std::unique_ptr<std::thread>> threads;
std::vector<std::shared_ptr<std::atomic<bool>>> flags;
detail::Queue<std::function<void(int id)> *> q;
std::atomic<bool> isDone;
std::atomic<bool> isStop;
std::atomic<int> nWaiting; // how many threads are waiting
std::atomic<int> nPending; // how many tasks are waiting

std::mutex mutex;
std::condition_variable cv;
Expand Down