所以我有一个共享的并发队列。它似乎运行得很好,除了破坏。
The way the queue is implemented is that it contains a condition variable and mutex pair. Several worker threads are started up that wait on this condition variable. When new objects are available to be worked on they are pushed into the queue and the condition variable signaled.
问题是,当主线程退出,破坏队列时,条件变量会被破坏,但由于条件变量正在使用,这会失败。这引发了一个例外,一切都糟糕透顶。
我想给工人们发信号,叫醒他们,让他们离开,等待他们完成,然后继续主线程。我的问题是,当这些线程完成时,我会需要一个额外的同步原语吗?
无论如何,以下是队列的代码:
// Based on code from http://www.justsoftwaresolutions.co.uk/threading/implementing-a-thread-safe-queue-using-condition-variables.html
// Original version by Anthony Williams
// Modifications by Michael Anderson
#include "boost/thread.hpp"
#include <deque>
template<typename Data>
class concurrent_queue
{
private:
std::deque<Data> the_queue;
mutable boost::mutex the_mutex;
boost::condition_variable the_condition_variable;
bool is_canceled;
public:
concurrent_queue() : the_queue(), the_mutex(), the_condition_variable(), is_canceled(false) {}
struct Canceled{};
void push(Data const& data)
{
boost::mutex::scoped_lock lock(the_mutex);
if (is_canceled) throw Canceled();
the_queue.push_back(data);
lock.unlock();
the_condition_variable.notify_one();
}
bool empty() const
{
boost::mutex::scoped_lock lock(the_mutex);
if (is_canceled) throw Canceled();
return the_queue.empty();
}
bool try_pop(Data& popped_value)
{
boost::mutex::scoped_lock lock(the_mutex);
if (is_canceled) throw Canceled();
if(the_queue.empty())
{
return false;
}
popped_value=the_queue.front();
the_queue.pop_front();
return true;
}
void wait_and_pop(Data& popped_value)
{
boost::mutex::scoped_lock lock(the_mutex);
while(the_queue.empty() && !is_canceled)
{
the_condition_variable.wait(lock);
}
if (is_canceled) throw Canceled();
popped_value=the_queue.front();
the_queue.pop_front();
}
std::deque<Data> wait_and_take_all()
{
boost::mutex::scoped_lock lock(the_mutex);
while(the_queue.empty() && !is_canceled)
{
the_condition_variable.wait(lock);
}
if (is_canceled) throw Canceled();
std::deque<Data> retval;
std::swap(retval, the_queue);
return retval;
}
void cancel()
{
boost::mutex::scoped_lock lock(the_mutex);
if (is_canceled) throw Canceled();
is_canceled = true;
lock.unlock();
the_condition_variable.notify_all();
}
};