pre-BLOCKING_QUEUE: Simplified, fixed ThreadQueue, VisualProcessor (use std:deque, cleaner templates, max_size fixes)

This commit is contained in:
vsonnier 2017-02-12 12:53:50 +01:00
parent 8c1d785ff6
commit d3db510643
2 changed files with 70 additions and 69 deletions

View File

@ -7,9 +7,14 @@
#include "ThreadQueue.h" #include "ThreadQueue.h"
#include "IOThread.h" #include "IOThread.h"
#include <algorithm> #include <algorithm>
#include <vector>
template<class InputDataType = ReferenceCounter, class OutputDataType = ReferenceCounter> template<typename InputDataType = ReferenceCounter, typename OutputDataType = ReferenceCounter>
class VisualProcessor { class VisualProcessor {
//
typedef typename ThreadQueue<InputDataType*> VisualInputQueueType;
typedef typename ThreadQueue<OutputDataType*> VisualOutputQueueType;
typedef typename std::vector< VisualOutputQueueType *>::iterator outputs_i;
public: public:
virtual ~VisualProcessor() { virtual ~VisualProcessor() {
@ -24,8 +29,8 @@ public:
bool isOutputEmpty() { bool isOutputEmpty() {
std::lock_guard < std::recursive_mutex > busy_lock(busy_update); std::lock_guard < std::recursive_mutex > busy_lock(busy_update);
for (outputs_i = outputs.begin(); outputs_i != outputs.end(); outputs_i++) { for (outputs_i it = outputs.begin(); it != outputs.end(); it++) {
if ((*outputs_i)->full()) { if ((*it)->full()) {
return false; return false;
} }
} }
@ -35,8 +40,8 @@ public:
bool isAnyOutputEmpty() { bool isAnyOutputEmpty() {
std::lock_guard < std::recursive_mutex > busy_lock(busy_update); std::lock_guard < std::recursive_mutex > busy_lock(busy_update);
for (outputs_i = outputs.begin(); outputs_i != outputs.end(); outputs_i++) { for (outputs_i it = outputs.begin(); it != outputs.end(); it++) {
if (!(*outputs_i)->full()) { if (!(*it)->full()) {
return true; return true;
} }
} }
@ -44,7 +49,7 @@ public:
} }
//Set a (new) 'input' queue for incoming data. //Set a (new) 'input' queue for incoming data.
void setInput(ThreadQueue<InputDataType *> *vis_in) { void setInput(VisualInputQueueType *vis_in) {
std::lock_guard < std::recursive_mutex > busy_lock(busy_update); std::lock_guard < std::recursive_mutex > busy_lock(busy_update);
input = vis_in; input = vis_in;
@ -52,18 +57,18 @@ public:
//Add a vis_out queue where to consumed 'input' data will be //Add a vis_out queue where to consumed 'input' data will be
//dispatched by distribute(). //dispatched by distribute().
void attachOutput(ThreadQueue<OutputDataType *> *vis_out) { void attachOutput(VisualOutputQueueType *vis_out) {
// attach an output queue // attach an output queue
std::lock_guard < std::recursive_mutex > busy_lock(busy_update); std::lock_guard < std::recursive_mutex > busy_lock(busy_update);
outputs.push_back(vis_out); outputs.push_back(vis_out);
} }
//reverse of attachOutput(), removed an existing attached vis_out. //reverse of attachOutput(), removed an existing attached vis_out.
void removeOutput(ThreadQueue<OutputDataType *> *vis_out) { void removeOutput(VisualOutputQueueType *vis_out) {
// remove an output queue // remove an output queue
std::lock_guard < std::recursive_mutex > busy_lock(busy_update); std::lock_guard < std::recursive_mutex > busy_lock(busy_update);
typename std::vector<ThreadQueue<OutputDataType *> *>::iterator i = std::find(outputs.begin(), outputs.end(), vis_out); outputs_i i = std::find(outputs.begin(), outputs.end(), vis_out);
if (i != outputs.end()) { if (i != outputs.end()) {
outputs.erase(i); outputs.erase(i);
} }
@ -96,9 +101,9 @@ protected:
//so 'output' will a-priori be shared among all 'outputs' so set its ref count to this //so 'output' will a-priori be shared among all 'outputs' so set its ref count to this
//amount. //amount.
item->setRefCount((int)outputs.size()); item->setRefCount((int)outputs.size());
for (outputs_i = outputs.begin(); outputs_i != outputs.end(); outputs_i++) { for (outputs_i it = outputs.begin(); it != outputs.end(); it++) {
//if 'output' failed to be given to an outputs_i, dec its ref count accordingly. //if 'output' failed to be given to an outputs_i, dec its ref count accordingly.
if (!(*outputs_i)->push(item)) { if (!(*it)->push(item)) {
item->decRefCount(); item->decRefCount();
} }
} }
@ -108,12 +113,10 @@ protected:
} }
//the incoming data queue //the incoming data queue
ThreadQueue<InputDataType *> *input = nullptr; VisualInputQueueType *input = nullptr;
//the n-outputs where to process()-ed data is distribute()-ed. //the n-outputs where to process()-ed data is distribute()-ed.
std::vector<ThreadQueue<OutputDataType *> *> outputs; std::vector<VisualOutputQueueType *> outputs;
typename std::vector<ThreadQueue<OutputDataType *> *>::iterator outputs_i;
//protects input and outputs, must be recursive because of re-entrance //protects input and outputs, must be recursive because of re-entrance
std::recursive_mutex busy_update; std::recursive_mutex busy_update;
@ -122,7 +125,7 @@ protected:
//Specialization much like VisualDataReDistributor, except //Specialization much like VisualDataReDistributor, except
//the input (pointer) is directly re-dispatched //the input (pointer) is directly re-dispatched
//to outputs, so that all output indeed SHARE the same instance. //to outputs, so that all output indeed SHARE the same instance.
template<class OutputDataType = ReferenceCounter> template<typename OutputDataType = ReferenceCounter>
class VisualDataDistributor : public VisualProcessor<OutputDataType, OutputDataType> { class VisualDataDistributor : public VisualProcessor<OutputDataType, OutputDataType> {
protected: protected:
virtual void process() { virtual void process() {
@ -145,7 +148,7 @@ protected:
//specialization class which process() take an input item and re-dispatch //specialization class which process() take an input item and re-dispatch
//A COPY to every outputs, without further processing. This is a 1-to-n dispatcher. //A COPY to every outputs, without further processing. This is a 1-to-n dispatcher.
template<class OutputDataType = ReferenceCounter> template<typename OutputDataType = ReferenceCounter>
class VisualDataReDistributor : public VisualProcessor<OutputDataType, OutputDataType> { class VisualDataReDistributor : public VisualProcessor<OutputDataType, OutputDataType> {
protected: protected:
virtual void process() { virtual void process() {

View File

@ -9,42 +9,41 @@
* Changes: * Changes:
* Charles J. Nov-19-2014 * Charles J. Nov-19-2014
* - Renamed SafeQueue -> ThreadQueue * - Renamed SafeQueue -> ThreadQueue
* Sonnier.V Feb-10-2017
* - Simplified, various fixes
*/ */
#include <queue> #include <deque>
#include <list> #include <list>
#include <mutex> #include <mutex>
#include <thread> #include <thread>
#include <cstdint> #include <cstdint>
#include <condition_variable> #include <condition_variable>
#include <atomic>
class ThreadQueueBase { class ThreadQueueBase {
}; };
/** A thread-safe asynchronous queue */ /** A thread-safe asynchronous queue */
template<class T, class Container = std::list<T>> template<typename T>
class ThreadQueue : public ThreadQueueBase { class ThreadQueue : public ThreadQueueBase {
typedef typename Container::value_type value_type; typedef typename std::deque<T>::value_type value_type;
typedef typename Container::size_type size_type; typedef typename std::deque<T>::size_type size_type;
typedef Container container_type;
public: public:
/*! Create safe queue. */ /*! Create safe queue. */
ThreadQueue() { ThreadQueue() {
m_max_num_items.store(0); m_max_num_items = 0;
}; };
ThreadQueue(ThreadQueue&& sq) { ThreadQueue(ThreadQueue&& sq) {
m_queue = std::move(sq.m_queue); m_queue = std::move(sq.m_queue);
m_max_num_items.store(0); m_max_num_items = sq.m_max_num_items;
} }
ThreadQueue(const ThreadQueue& sq) { ThreadQueue(const ThreadQueue& sq) {
std::lock_guard < std::mutex > lock(sq.m_mutex); std::lock_guard < std::mutex > lock(sq.m_mutex);
m_queue = sq.m_queue; m_queue = sq.m_queue;
m_max_num_items.store(0); m_max_num_items = sq.m_max_num_items;
} }
/*! Destroy safe queue. */ /*! Destroy safe queue. */
@ -58,9 +57,7 @@ public:
*/ */
void set_max_num_items(unsigned int max_num_items) { void set_max_num_items(unsigned int max_num_items) {
std::lock_guard < std::mutex > lock(m_mutex); std::lock_guard < std::mutex > lock(m_mutex);
if (m_max_num_items.load() != max_num_items) { m_max_num_items = max_num_items;
m_max_num_items.store(max_num_items);
}
} }
/** /**
@ -71,13 +68,12 @@ public:
bool push(const value_type& item) { bool push(const value_type& item) {
std::lock_guard < std::mutex > lock(m_mutex); std::lock_guard < std::mutex > lock(m_mutex);
if (m_max_num_items.load() > 0 && m_queue.size() > m_max_num_items.load()) { if (m_max_num_items > 0 && m_queue.size() > m_max_num_items) {
m_condition.notify_all();
return false; return false;
} }
m_queue.push(item); m_queue.push_back(item);
m_condition.notify_all(); m_cond_not_empty.notify_all();
return true; return true;
} }
@ -89,13 +85,12 @@ public:
bool push(const value_type&& item) { bool push(const value_type&& item) {
std::lock_guard < std::mutex > lock(m_mutex); std::lock_guard < std::mutex > lock(m_mutex);
if (m_max_num_items.load() > 0 && m_queue.size() > m_max_num_items.load()) { if (m_max_num_items > 0 && m_queue.size() > m_max_num_items) {
m_condition.notify_all();
return false; return false;
} }
m_queue.push(item); m_queue.push_back(item);
m_condition.notify_all(); m_cond_not_empty.notify_all();
return true; return true;
} }
@ -105,12 +100,12 @@ public:
*/ */
void pop(value_type& item) { void pop(value_type& item) {
std::unique_lock < std::mutex > lock(m_mutex); std::unique_lock < std::mutex > lock(m_mutex);
m_condition.wait(lock, [this]() // Lambda funct m_cond_not_empty.wait(lock, [this]() // Lambda funct
{ {
return !m_queue.empty(); return !m_queue.empty();
}); });
item = m_queue.front(); item = m_queue.front();
m_queue.pop(); m_queue.pop_front();
} }
/** /**
@ -121,12 +116,12 @@ public:
*/ */
void move_pop(value_type& item) { void move_pop(value_type& item) {
std::unique_lock < std::mutex > lock(m_mutex); std::unique_lock < std::mutex > lock(m_mutex);
m_condition.wait(lock, [this]() // Lambda funct m_cond_not_empty.wait(lock, [this]() // Lambda funct
{ {
return !m_queue.empty(); return !m_queue.empty();
}); });
item = std::move(m_queue.front()); item = std::move(m_queue.front());
m_queue.pop(); m_queue.pop_front();
} }
/** /**
@ -135,13 +130,13 @@ public:
* \return False is returned if no item is available. * \return False is returned if no item is available.
*/ */
bool try_pop(value_type& item) { bool try_pop(value_type& item) {
std::unique_lock < std::mutex > lock(m_mutex); std::lock_guard < std::mutex > lock(m_mutex);
if (m_queue.empty()) if (m_queue.empty())
return false; return false;
item = m_queue.front(); item = m_queue.front();
m_queue.pop(); m_queue.pop_front();
return true; return true;
} }
@ -152,13 +147,13 @@ public:
* \return False is returned if no item is available. * \return False is returned if no item is available.
*/ */
bool try_move_pop(value_type& item) { bool try_move_pop(value_type& item) {
std::unique_lock < std::mutex > lock(m_mutex); std::lock_guard < std::mutex > lock(m_mutex);
if (m_queue.empty()) if (m_queue.empty())
return false; return false;
item = std::move(m_queue.front()); item = std::move(m_queue.front());
m_queue.pop(); m_queue.pop_front();
return true; return true;
} }
@ -175,12 +170,12 @@ public:
if (timeout == 0) if (timeout == 0)
return false; return false;
if (m_condition.wait_for(lock, std::chrono::microseconds(timeout)) == std::cv_status::timeout) if (m_cond_not_empty.wait_for(lock, std::chrono::microseconds(timeout)) == std::cv_status::timeout)
return false; return false;
} }
item = m_queue.front(); item = m_queue.front();
m_queue.pop(); m_queue.pop_front();
return true; return true;
} }
@ -199,12 +194,12 @@ public:
if (timeout == 0) if (timeout == 0)
return false; return false;
if (m_condition.wait_for(lock, std::chrono::microseconds(timeout)) == std::cv_status::timeout) if (m_cond_not_empty.wait_for(lock, std::chrono::microseconds(timeout)) == std::cv_status::timeout)
return false; return false;
} }
item = std::move(m_queue.front()); item = std::move(m_queue.front());
m_queue.pop(); m_queue.pop_front();
return true; return true;
} }
@ -232,7 +227,7 @@ public:
*/ */
bool full() const { bool full() const {
std::lock_guard < std::mutex > lock(m_mutex); std::lock_guard < std::mutex > lock(m_mutex);
return (m_max_num_items.load() != 0) && (m_queue.size() >= m_max_num_items.load()); return (m_max_num_items != 0) && (m_queue.size() >= m_max_num_items);
} }
/** /**
@ -240,9 +235,7 @@ public:
*/ */
void flush() { void flush() {
std::lock_guard < std::mutex > lock(m_mutex); std::lock_guard < std::mutex > lock(m_mutex);
m_queue = std::queue<T, Container>(); m_queue.clear();
std::queue<T, Container> emptyQueue;
std::swap(m_queue, emptyQueue);
} }
/** /**
@ -254,12 +247,14 @@ public:
std::lock_guard < std::mutex > lock1(m_mutex); std::lock_guard < std::mutex > lock1(m_mutex);
std::lock_guard < std::mutex > lock2(sq.m_mutex); std::lock_guard < std::mutex > lock2(sq.m_mutex);
m_queue.swap(sq.m_queue); m_queue.swap(sq.m_queue);
std::swap(m_max_num_items, sq.m_max_num_items);
if (!m_queue.empty()) if (!m_queue.empty())
m_condition.notify_all(); m_cond_not_empty.notify_all();
if (!sq.m_queue.empty()) if (!sq.m_queue.empty())
sq.m_condition.notify_all(); sq.m_cond_not_empty.notify_all();
} }
} }
@ -268,11 +263,12 @@ public:
if (this != &sq) { if (this != &sq) {
std::lock_guard < std::mutex > lock1(m_mutex); std::lock_guard < std::mutex > lock1(m_mutex);
std::lock_guard < std::mutex > lock2(sq.m_mutex); std::lock_guard < std::mutex > lock2(sq.m_mutex);
std::queue<T, Container> temp { sq.m_queue };
m_queue.swap(temp); m_queue = sq.m_queue;
m_max_num_items = sq.m_max_num_items;
if (!m_queue.empty()) if (!m_queue.empty())
m_condition.notify_all(); m_cond_not_empty.notify_all();
} }
return *this; return *this;
@ -282,23 +278,25 @@ public:
ThreadQueue& operator=(ThreadQueue && sq) { ThreadQueue& operator=(ThreadQueue && sq) {
std::lock_guard < std::mutex > lock(m_mutex); std::lock_guard < std::mutex > lock(m_mutex);
m_queue = std::move(sq.m_queue); m_queue = std::move(sq.m_queue);
m_max_num_items = sq.m_max_num_items;
if (!m_queue.empty()) if (!m_queue.empty())
m_condition.notify_all(); m_cond_not_empty.notify_all();
return *this; return *this;
} }
private: private:
std::queue<T, Container> m_queue; std::deque<T> m_queue;
mutable std::mutex m_mutex; mutable std::mutex m_mutex;
std::condition_variable m_condition; std::condition_variable m_cond_not_empty;
std::atomic_uint m_max_num_items; size_t m_max_num_items;
}; };
/*! Swaps the contents of two ThreadQueue objects. */ /*! Swaps the contents of two ThreadQueue objects. */
template<class T, class Container> template<typename T>
void swap(ThreadQueue<T, Container>& q1, ThreadQueue<T, Container>& q2) { void swap(ThreadQueue<T>& q1, ThreadQueue<T>& q2) {
q1.swap(q2); q1.swap(q2);
} }