Skip to content

Commit

Permalink
fixed a bug - crash
Browse files Browse the repository at this point in the history
  • Loading branch information
vit-vit committed Jun 12, 2015
1 parent efec987 commit 437e135
Showing 1 changed file with 73 additions and 66 deletions.
139 changes: 73 additions & 66 deletions ctpl_stl.h
Original file line number Diff line number Diff line change
@@ -1,21 +1,20 @@

/*********************************************************
*
* Copyright (C) 2014 by Vitaliy Vitsentiy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*********************************************************/
*
* Copyright (C) 2014 by Vitaliy Vitsentiy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*********************************************************/


#ifndef __ctpl_stl_thread_pool_H__
Expand All @@ -41,6 +40,34 @@

namespace ctpl {

namespace detail {
template <typename T>
class Queue {
public:
bool push(T const & value) {
std::unique_lock<std::mutex> lock(this->mutex);
this->q.push(value);
return true;
}
// deletes the retrieved element, do not use for non integral types
bool pop(T & v) {
std::unique_lock<std::mutex> lock(this->mutex);
if (this->q.empty())
return false;
v = this->q.front();
this->q.pop();
return true;
}
bool empty() {
std::unique_lock<std::mutex> lock(this->mutex);
return this->q.empty();
}
private:
std::queue<T> q;
std::mutex mutex;
};
}

class thread_pool {

public:
Expand Down Expand Up @@ -93,24 +120,24 @@ namespace ctpl {

// empty the queue
void clear_queue() {
std::unique_lock<std::mutex> lock(this->mutex);
while (!this->q.empty())
this->q.pop(); // empty the queue
std::function<void(int id)> * _f;
while (this->q.pop(_f))
delete _f; // empty the queue
}

// pops a functional wraper to the original function
// pops a functional wrapper to the original function
std::function<void(int)> pop() {
std::function<void(int id)> * _f = nullptr;
this->q.pop(_f);
std::unique_ptr<std::function<void(int id)>> func(_f); // at return, delete the function even if an exception occurred
std::function<void(int)> f;
std::unique_lock<std::mutex> lock(this->mutex);
if (!this->q.empty()) {
f = this->q.front();
this->q.pop();
}
if (_f)
f = *_f;
return f;
}

// wait for all computing threads to finish and stop all threads
// may be called asyncronously to not pause the calling thread while waiting
// may be called asynchronously to not pause the calling thread while waiting
// if isWait == true, all the functions in the queue are run, otherwise the queue is cleared without running the functions
void stop(bool isWait = false) {
if (!isWait) {
Expand All @@ -132,8 +159,8 @@ namespace ctpl {
this->cv.notify_all(); // stop all waiting threads
}
for (int i = 0; i < static_cast<int>(this->threads.size()); ++i) { // wait for the computing threads to finish
if (this->threads[i]->joinable())
this->threads[i]->join();
if (this->threads[i]->joinable())
this->threads[i]->join();
}
// if there were no threads in the pool but some functors in the queue, the functors are not deleted by the threads
// therefore delete them here
Expand All @@ -146,15 +173,13 @@ namespace ctpl {
auto push(F && f, Rest&&... rest) ->std::future<decltype(f(0, rest...))> {
auto pck = std::make_shared<std::packaged_task<decltype(f(0, rest...))(int)>>(
std::bind(std::forward<F>(f), std::placeholders::_1, std::forward<Rest>(rest)...)
);

this->q.push([pck](int id) {
);
auto _f = new std::function<void(int id)>([pck](int id) {
(*pck)(id);
});

this->q.push(_f);
std::unique_lock<std::mutex> lock(this->mutex);
this->cv.notify_one();

return pck->get_future();
}

Expand All @@ -163,14 +188,12 @@ namespace ctpl {
template<typename F>
auto push(F && f) ->std::future<decltype(f(0))> {
auto pck = std::make_shared<std::packaged_task<decltype(f(0))(int)>>(std::forward<F>(f));

this->q.push([pck](int id) {
auto _f = new std::function<void(int id)>([pck](int id) {
(*pck)(id);
});

this->q.push(_f);
std::unique_lock<std::mutex> lock(this->mutex);
this->cv.notify_one();

return pck->get_future();
}

Expand All @@ -184,52 +207,37 @@ namespace ctpl {
thread_pool & operator=(thread_pool &&);// = delete;

void set_thread(int i) {
std::shared_ptr<std::atomic<bool>> flag(this->flags[i]); // a copy of the shared ptr to the flag
std::shared_ptr<std::atomic<bool>> flag(this->flags[i]); // a copy of the shared ptr to the flag
auto f = [this, i, flag/* a copy of the shared ptr to the flag */]() {
std::atomic<bool> & _flag = *flag;
std::function<void(int id)> _f;
bool isPopped = false;
std::function<void(int id)> * _f;
bool isPop = this->q.pop(_f);
while (true) {
while (true) { // if there is anything in the queue
if (!isPopped) {
std::unique_lock<std::mutex> lock(this->mutex);
if (this->q.empty())
break;
_f = this->q.front();
this->q.pop();
}
_f(i);
isPopped = false;

while (isPop) { // if there is anything in the queue
std::unique_ptr<std::function<void(int id)>> func(_f); // at return, delete the function even if an exception occurred
(*_f)(i);
if (_flag)
return; // the thread is wanted to stop, return even if the queue is not empty yet
else
isPop = this->q.pop(_f);
}

// the queue is empty here, wait for the next command
std::unique_lock<std::mutex> lock(this->mutex);
++this->nWaiting;
this->cv.wait(lock, [this, &_f, &isPopped, &_flag](){
isPopped = !this->q.empty();
if (isPopped) {
_f = this->q.front();
this->q.pop();
}
return isPopped || this->isDone || _flag;
});
this->cv.wait(lock, [this, &_f, &isPop, &_flag](){ isPop = this->q.pop(_f); return isPop || this->isDone || _flag; });
--this->nWaiting;

if (!isPopped)
if (!isPop)
return; // if the queue is empty and this->isDone == true or *flag then return
}
};
this->threads[i].reset(new std::thread(f)); // compiler may not support std::make_unique()
this->threads[i].reset(new std::thread(f)); // compiler may not support std::make_unique()
}

void init() { this->nWaiting = 0; this->isStop = false; this->isDone = false; }

std::vector<std::unique_ptr<std::thread>> threads;
std::vector<std::shared_ptr<std::atomic<bool>>> flags;
mutable std::queue<std::function<void(int id)>> q;
detail::Queue<std::function<void(int id)> *> q;
std::atomic<bool> isDone;
std::atomic<bool> isStop;
std::atomic<int> nWaiting; // how many threads are waiting
Expand All @@ -241,4 +249,3 @@ namespace ctpl {
}

#endif // __ctpl_stl_thread_pool_H__

0 comments on commit 437e135

Please sign in to comment.