From d3db5106437d273a0bdfdf17b4f97b82e699f919 Mon Sep 17 00:00:00 2001 From: vsonnier Date: Sun, 12 Feb 2017 12:53:50 +0100 Subject: [PATCH] pre-BLOCKING_QUEUE: Simplified, fixed ThreadQueue, VisualProcessor (use std:deque, cleaner templates, max_size fixes) --- src/process/VisualProcessor.h | 37 ++++++------ src/util/ThreadQueue.h | 102 +++++++++++++++++----------------- 2 files changed, 70 insertions(+), 69 deletions(-) diff --git a/src/process/VisualProcessor.h b/src/process/VisualProcessor.h index bd41d40..a385f4c 100644 --- a/src/process/VisualProcessor.h +++ b/src/process/VisualProcessor.h @@ -7,9 +7,14 @@ #include "ThreadQueue.h" #include "IOThread.h" #include +#include -template +template class VisualProcessor { + // + typedef typename ThreadQueue VisualInputQueueType; + typedef typename ThreadQueue VisualOutputQueueType; + typedef typename std::vector< VisualOutputQueueType *>::iterator outputs_i; public: virtual ~VisualProcessor() { @@ -24,8 +29,8 @@ public: bool isOutputEmpty() { std::lock_guard < std::recursive_mutex > busy_lock(busy_update); - for (outputs_i = outputs.begin(); outputs_i != outputs.end(); outputs_i++) { - if ((*outputs_i)->full()) { + for (outputs_i it = outputs.begin(); it != outputs.end(); it++) { + if ((*it)->full()) { return false; } } @@ -35,8 +40,8 @@ public: bool isAnyOutputEmpty() { std::lock_guard < std::recursive_mutex > busy_lock(busy_update); - for (outputs_i = outputs.begin(); outputs_i != outputs.end(); outputs_i++) { - if (!(*outputs_i)->full()) { + for (outputs_i it = outputs.begin(); it != outputs.end(); it++) { + if (!(*it)->full()) { return true; } } @@ -44,7 +49,7 @@ public: } //Set a (new) 'input' queue for incoming data. - void setInput(ThreadQueue *vis_in) { + void setInput(VisualInputQueueType *vis_in) { std::lock_guard < std::recursive_mutex > busy_lock(busy_update); input = vis_in; @@ -52,18 +57,18 @@ public: //Add a vis_out queue where to consumed 'input' data will be //dispatched by distribute(). - void attachOutput(ThreadQueue *vis_out) { + void attachOutput(VisualOutputQueueType *vis_out) { // attach an output queue std::lock_guard < std::recursive_mutex > busy_lock(busy_update); outputs.push_back(vis_out); } //reverse of attachOutput(), removed an existing attached vis_out. - void removeOutput(ThreadQueue *vis_out) { + void removeOutput(VisualOutputQueueType *vis_out) { // remove an output queue std::lock_guard < std::recursive_mutex > busy_lock(busy_update); - typename std::vector *>::iterator i = std::find(outputs.begin(), outputs.end(), vis_out); + outputs_i i = std::find(outputs.begin(), outputs.end(), vis_out); if (i != outputs.end()) { outputs.erase(i); } @@ -96,9 +101,9 @@ protected: //so 'output' will a-priori be shared among all 'outputs' so set its ref count to this //amount. item->setRefCount((int)outputs.size()); - for (outputs_i = outputs.begin(); outputs_i != outputs.end(); outputs_i++) { + for (outputs_i it = outputs.begin(); it != outputs.end(); it++) { //if 'output' failed to be given to an outputs_i, dec its ref count accordingly. - if (!(*outputs_i)->push(item)) { + if (!(*it)->push(item)) { item->decRefCount(); } } @@ -108,12 +113,10 @@ protected: } //the incoming data queue - ThreadQueue *input = nullptr; + VisualInputQueueType *input = nullptr; //the n-outputs where to process()-ed data is distribute()-ed. - std::vector *> outputs; - - typename std::vector *>::iterator outputs_i; + std::vector outputs; //protects input and outputs, must be recursive because of re-entrance std::recursive_mutex busy_update; @@ -122,7 +125,7 @@ protected: //Specialization much like VisualDataReDistributor, except //the input (pointer) is directly re-dispatched //to outputs, so that all output indeed SHARE the same instance. -template +template class VisualDataDistributor : public VisualProcessor { protected: virtual void process() { @@ -145,7 +148,7 @@ protected: //specialization class which process() take an input item and re-dispatch //A COPY to every outputs, without further processing. This is a 1-to-n dispatcher. -template +template class VisualDataReDistributor : public VisualProcessor { protected: virtual void process() { diff --git a/src/util/ThreadQueue.h b/src/util/ThreadQueue.h index 8545e82..1805b4b 100644 --- a/src/util/ThreadQueue.h +++ b/src/util/ThreadQueue.h @@ -9,42 +9,41 @@ * Changes: * Charles J. Nov-19-2014 * - Renamed SafeQueue -> ThreadQueue + * Sonnier.V Feb-10-2017 + * - Simplified, various fixes */ -#include +#include #include #include #include #include #include -#include -class ThreadQueueBase { - +class ThreadQueueBase { }; /** A thread-safe asynchronous queue */ -template> +template class ThreadQueue : public ThreadQueueBase { - typedef typename Container::value_type value_type; - typedef typename Container::size_type size_type; - typedef Container container_type; + typedef typename std::deque::value_type value_type; + typedef typename std::deque::size_type size_type; public: /*! Create safe queue. */ ThreadQueue() { - m_max_num_items.store(0); + m_max_num_items = 0; }; ThreadQueue(ThreadQueue&& sq) { m_queue = std::move(sq.m_queue); - m_max_num_items.store(0); + m_max_num_items = sq.m_max_num_items; } ThreadQueue(const ThreadQueue& sq) { std::lock_guard < std::mutex > lock(sq.m_mutex); m_queue = sq.m_queue; - m_max_num_items.store(0); + m_max_num_items = sq.m_max_num_items; } /*! Destroy safe queue. */ @@ -58,9 +57,7 @@ public: */ void set_max_num_items(unsigned int max_num_items) { std::lock_guard < std::mutex > lock(m_mutex); - if (m_max_num_items.load() != max_num_items) { - m_max_num_items.store(max_num_items); - } + m_max_num_items = max_num_items; } /** @@ -71,13 +68,12 @@ public: bool push(const value_type& item) { std::lock_guard < std::mutex > lock(m_mutex); - if (m_max_num_items.load() > 0 && m_queue.size() > m_max_num_items.load()) { - m_condition.notify_all(); + if (m_max_num_items > 0 && m_queue.size() > m_max_num_items) { return false; } - m_queue.push(item); - m_condition.notify_all(); + m_queue.push_back(item); + m_cond_not_empty.notify_all(); return true; } @@ -89,13 +85,12 @@ public: bool push(const value_type&& item) { std::lock_guard < std::mutex > lock(m_mutex); - if (m_max_num_items.load() > 0 && m_queue.size() > m_max_num_items.load()) { - m_condition.notify_all(); + if (m_max_num_items > 0 && m_queue.size() > m_max_num_items) { return false; } - m_queue.push(item); - m_condition.notify_all(); + m_queue.push_back(item); + m_cond_not_empty.notify_all(); return true; } @@ -105,12 +100,12 @@ public: */ void pop(value_type& item) { std::unique_lock < std::mutex > lock(m_mutex); - m_condition.wait(lock, [this]() // Lambda funct + m_cond_not_empty.wait(lock, [this]() // Lambda funct { return !m_queue.empty(); }); item = m_queue.front(); - m_queue.pop(); + m_queue.pop_front(); } /** @@ -121,12 +116,12 @@ public: */ void move_pop(value_type& item) { std::unique_lock < std::mutex > lock(m_mutex); - m_condition.wait(lock, [this]() // Lambda funct + m_cond_not_empty.wait(lock, [this]() // Lambda funct { return !m_queue.empty(); }); item = std::move(m_queue.front()); - m_queue.pop(); + m_queue.pop_front(); } /** @@ -135,13 +130,13 @@ public: * \return False is returned if no item is available. */ bool try_pop(value_type& item) { - std::unique_lock < std::mutex > lock(m_mutex); + std::lock_guard < std::mutex > lock(m_mutex); if (m_queue.empty()) return false; item = m_queue.front(); - m_queue.pop(); + m_queue.pop_front(); return true; } @@ -152,13 +147,13 @@ public: * \return False is returned if no item is available. */ bool try_move_pop(value_type& item) { - std::unique_lock < std::mutex > lock(m_mutex); + std::lock_guard < std::mutex > lock(m_mutex); if (m_queue.empty()) return false; item = std::move(m_queue.front()); - m_queue.pop(); + m_queue.pop_front(); return true; } @@ -175,12 +170,12 @@ public: if (timeout == 0) return false; - if (m_condition.wait_for(lock, std::chrono::microseconds(timeout)) == std::cv_status::timeout) + if (m_cond_not_empty.wait_for(lock, std::chrono::microseconds(timeout)) == std::cv_status::timeout) return false; } item = m_queue.front(); - m_queue.pop(); + m_queue.pop_front(); return true; } @@ -199,12 +194,12 @@ public: if (timeout == 0) return false; - if (m_condition.wait_for(lock, std::chrono::microseconds(timeout)) == std::cv_status::timeout) + if (m_cond_not_empty.wait_for(lock, std::chrono::microseconds(timeout)) == std::cv_status::timeout) return false; } item = std::move(m_queue.front()); - m_queue.pop(); + m_queue.pop_front(); return true; } @@ -232,7 +227,7 @@ public: */ bool full() const { std::lock_guard < std::mutex > lock(m_mutex); - return (m_max_num_items.load() != 0) && (m_queue.size() >= m_max_num_items.load()); + return (m_max_num_items != 0) && (m_queue.size() >= m_max_num_items); } /** @@ -240,9 +235,7 @@ public: */ void flush() { std::lock_guard < std::mutex > lock(m_mutex); - m_queue = std::queue(); - std::queue emptyQueue; - std::swap(m_queue, emptyQueue); + m_queue.clear(); } /** @@ -254,12 +247,14 @@ public: std::lock_guard < std::mutex > lock1(m_mutex); std::lock_guard < std::mutex > lock2(sq.m_mutex); m_queue.swap(sq.m_queue); + std::swap(m_max_num_items, sq.m_max_num_items); if (!m_queue.empty()) - m_condition.notify_all(); + m_cond_not_empty.notify_all(); + if (!sq.m_queue.empty()) - sq.m_condition.notify_all(); + sq.m_cond_not_empty.notify_all(); } } @@ -268,11 +263,12 @@ public: if (this != &sq) { std::lock_guard < std::mutex > lock1(m_mutex); std::lock_guard < std::mutex > lock2(sq.m_mutex); - std::queue temp { sq.m_queue }; - m_queue.swap(temp); + + m_queue = sq.m_queue; + m_max_num_items = sq.m_max_num_items; - if (!m_queue.empty()) - m_condition.notify_all(); + if (!m_queue.empty()) + m_cond_not_empty.notify_all(); } return *this; @@ -282,23 +278,25 @@ public: ThreadQueue& operator=(ThreadQueue && sq) { std::lock_guard < std::mutex > lock(m_mutex); m_queue = std::move(sq.m_queue); + m_max_num_items = sq.m_max_num_items; - if (!m_queue.empty()) - m_condition.notify_all(); - + if (!m_queue.empty()) + m_cond_not_empty.notify_all(); + return *this; } private: - std::queue m_queue; + std::deque m_queue; + mutable std::mutex m_mutex; - std::condition_variable m_condition; - std::atomic_uint m_max_num_items; + std::condition_variable m_cond_not_empty; + size_t m_max_num_items; }; /*! Swaps the contents of two ThreadQueue objects. */ -template -void swap(ThreadQueue& q1, ThreadQueue& q2) { +template +void swap(ThreadQueue& q1, ThreadQueue& q2) { q1.swap(q2); }