Merge pull request #545 from cjcliffe/vso_circular_buffer_for_queue

TheadBlockingQueue implemented with circular buffer (zero runtime mem allocs)
==> Well, looks like it works.
This commit is contained in:
Vincent Sonnier 2017-05-21 10:31:22 +02:00 committed by GitHub
commit 313d66e1d6
6 changed files with 219 additions and 485 deletions

View File

@ -3,7 +3,7 @@
#pragma once #pragma once
#include <deque> #include <vector>
#include <mutex> #include <mutex>
#include <thread> #include <thread>
#include <cstdint> #include <cstdint>
@ -29,22 +29,23 @@ class ThreadQueueBase {
template<typename T> template<typename T>
class ThreadBlockingQueue : public ThreadQueueBase { class ThreadBlockingQueue : public ThreadQueueBase {
typedef typename std::deque<T>::value_type value_type; typedef typename std::vector<T>::value_type value_type;
typedef typename std::deque<T>::size_type size_type; typedef typename std::vector<T>::size_type size_type;
public: public:
/*! Create safe blocking queue. */ /*! Create safe blocking queue. */
ThreadBlockingQueue() { ThreadBlockingQueue() {
//at least 1 (== Exchanger) //at least 1 (== Exchanger)
m_max_num_items = MIN_ITEM_NB; m_circular_buffer.resize(MIN_ITEM_NB + 1); //there is one slot more than the size for internal management.
}; };
//Copy constructor //Copy constructor
ThreadBlockingQueue(const ThreadBlockingQueue& sq) { ThreadBlockingQueue(const ThreadBlockingQueue& sq) {
std::lock_guard < std::mutex > lock(sq.m_mutex); std::lock_guard < std::mutex > lock(sq.m_mutex);
m_queue = sq.m_queue; m_circular_buffer = sq.m_circular_buffer;
m_max_num_items = sq.m_max_num_items; m_head = sq.m_head;
m_tail = sq.m_tail;
} }
/*! Destroy safe queue. */ /*! Destroy safe queue. */
@ -60,10 +61,11 @@ public:
void set_max_num_items(unsigned int max_num_items) { void set_max_num_items(unsigned int max_num_items) {
std::lock_guard < std::mutex > lock(m_mutex); std::lock_guard < std::mutex > lock(m_mutex);
if (max_num_items > m_max_num_items) { if (max_num_items > (unsigned int)privateMaxNumElements()) {
//Only raise the existing max size, never reduce it //Only raise the existing max size, never reduce it
//for simplification sake at runtime. //for simplification sake at runtime.
m_max_num_items = max_num_items; m_circular_buffer.resize(max_num_items + 1); // there is 1 extra allocated slot.
//m_head and m_tail stays valid.
m_cond_not_full.notify_all(); m_cond_not_full.notify_all();
} }
} }
@ -83,14 +85,14 @@ public:
if (timeout == BLOCKING_INFINITE_TIMEOUT) { if (timeout == BLOCKING_INFINITE_TIMEOUT) {
m_cond_not_full.wait(lock, [this]() // Lambda funct m_cond_not_full.wait(lock, [this]() // Lambda funct
{ {
return m_queue.size() < m_max_num_items; return privateSize() < privateMaxNumElements();
}); });
} else if (timeout <= NON_BLOCKING_TIMEOUT && m_queue.size() >= m_max_num_items) { } else if (timeout <= NON_BLOCKING_TIMEOUT && privateSize() >= privateMaxNumElements()) {
// if the value is below a threshold, consider it is a try_push() // if the value is below a threshold, consider it is a try_push()
return false; return false;
} }
else if (false == m_cond_not_full.wait_for(lock, std::chrono::microseconds(timeout), else if (false == m_cond_not_full.wait_for(lock, std::chrono::microseconds(timeout),
[this]() { return m_queue.size() < m_max_num_items; })) { [this]() { return privateSize() < privateMaxNumElements(); })) {
std::thread::id currentThreadId = std::this_thread::get_id(); std::thread::id currentThreadId = std::this_thread::get_id();
std::cout << "WARNING: Thread 0x" << std::hex << currentThreadId << std::dec << std::cout << "WARNING: Thread 0x" << std::hex << currentThreadId << std::dec <<
" (" << currentThreadId << ") executing {" << typeid(*this).name() << "}.push() has failed with timeout > " << " (" << currentThreadId << ") executing {" << typeid(*this).name() << "}.push() has failed with timeout > " <<
@ -98,7 +100,10 @@ public:
return false; return false;
} }
m_queue.push_back(item); //m_tail is already the next valid place an item can be put
m_circular_buffer[m_tail] = item;
m_tail = nextIndex(m_tail, (int)m_circular_buffer.size());
m_cond_not_empty.notify_all(); m_cond_not_empty.notify_all();
return true; return true;
} }
@ -111,11 +116,14 @@ public:
bool try_push(const value_type& item) { bool try_push(const value_type& item) {
std::lock_guard < std::mutex > lock(m_mutex); std::lock_guard < std::mutex > lock(m_mutex);
if (m_queue.size() >= m_max_num_items) { if (privateSize() >= privateMaxNumElements()) {
return false; return false;
} }
m_queue.push_back(item); //m_tail is already the next valid place an item can be put
m_circular_buffer[m_tail] = item;
m_tail = nextIndex(m_tail, (int)m_circular_buffer.size());
m_cond_not_empty.notify_all(); m_cond_not_empty.notify_all();
return true; return true;
} }
@ -132,14 +140,14 @@ public:
if (timeout == BLOCKING_INFINITE_TIMEOUT) { if (timeout == BLOCKING_INFINITE_TIMEOUT) {
m_cond_not_empty.wait(lock, [this]() // Lambda funct m_cond_not_empty.wait(lock, [this]() // Lambda funct
{ {
return !m_queue.empty(); return privateSize() > 0;
}); });
} else if (timeout <= NON_BLOCKING_TIMEOUT && m_queue.empty()) { } else if (timeout <= NON_BLOCKING_TIMEOUT && privateSize() == 0) {
// if the value is below a threshold, consider it is try_pop() // if the value is below a threshold, consider it is try_pop()
return false; return false;
} }
else if (false == m_cond_not_empty.wait_for(lock, std::chrono::microseconds(timeout), else if (false == m_cond_not_empty.wait_for(lock, std::chrono::microseconds(timeout),
[this]() { return !m_queue.empty(); })) { [this]() { return privateSize() > 0; })) {
std::thread::id currentThreadId = std::this_thread::get_id(); std::thread::id currentThreadId = std::this_thread::get_id();
std::cout << "WARNING: Thread 0x" << std::hex << currentThreadId << std::dec << std::cout << "WARNING: Thread 0x" << std::hex << currentThreadId << std::dec <<
" (" << currentThreadId << ") executing {" << typeid(*this).name() << "}.pop() has failed with timeout > " << " (" << currentThreadId << ") executing {" << typeid(*this).name() << "}.pop() has failed with timeout > " <<
@ -147,8 +155,9 @@ public:
return false; return false;
} }
item = m_queue.front(); item = m_circular_buffer[m_head];
m_queue.pop_front(); m_head = nextIndex(m_head, (int)m_circular_buffer.size());
m_cond_not_full.notify_all(); m_cond_not_full.notify_all();
return true; return true;
} }
@ -161,12 +170,13 @@ public:
bool try_pop(value_type& item) { bool try_pop(value_type& item) {
std::lock_guard < std::mutex > lock(m_mutex); std::lock_guard < std::mutex > lock(m_mutex);
if (m_queue.empty()) { if (privateSize() == 0) {
return false; return false;
} }
item = m_queue.front(); item = m_circular_buffer[m_head];
m_queue.pop_front(); m_head = nextIndex(m_head, (int)m_circular_buffer.size());
m_cond_not_full.notify_all(); m_cond_not_full.notify_all();
return true; return true;
} }
@ -178,7 +188,7 @@ public:
*/ */
size_type size() const { size_type size() const {
std::lock_guard < std::mutex > lock(m_mutex); std::lock_guard < std::mutex > lock(m_mutex);
return m_queue.size(); return privateSize();
} }
/** /**
@ -187,7 +197,7 @@ public:
*/ */
bool empty() const { bool empty() const {
std::lock_guard < std::mutex > lock(m_mutex); std::lock_guard < std::mutex > lock(m_mutex);
return m_queue.empty(); return privateSize() == 0;
} }
/** /**
@ -196,7 +206,7 @@ public:
*/ */
bool full() const { bool full() const {
std::lock_guard < std::mutex > lock(m_mutex); std::lock_guard < std::mutex > lock(m_mutex);
return (m_queue.size() >= m_max_num_items); return (privateSize() >= privateMaxNumElements());
} }
/** /**
@ -204,7 +214,9 @@ public:
*/ */
void flush() { void flush() {
std::lock_guard < std::mutex > lock(m_mutex); std::lock_guard < std::mutex > lock(m_mutex);
m_queue.clear(); m_head = 0;
m_tail = 0;
m_cond_not_full.notify_all(); m_cond_not_full.notify_all();
} }
@ -216,22 +228,24 @@ public:
if (this != &sq) { if (this != &sq) {
std::lock_guard < std::mutex > lock1(m_mutex); std::lock_guard < std::mutex > lock1(m_mutex);
std::lock_guard < std::mutex > lock2(sq.m_mutex); std::lock_guard < std::mutex > lock2(sq.m_mutex);
m_queue.swap(sq.m_queue); m_circular_buffer.swap(sq.m_circular_buffer);
std::swap(m_max_num_items, sq.m_max_num_items);
if (!m_queue.empty()) { std::swap(m_head, sq.m_head);
std::swap(m_tail, sq.m_tail);
if (privateSize() > 0) {
m_cond_not_empty.notify_all(); m_cond_not_empty.notify_all();
} }
if (!sq.m_queue.empty()) { if (sq.privateSize() > 0) {
sq.m_cond_not_empty.notify_all(); sq.m_cond_not_empty.notify_all();
} }
if (!m_queue.full()) { if (privateSize() < privateMaxNumElements()) {
m_cond_not_full.notify_all(); m_cond_not_full.notify_all();
} }
if (!sq.m_queue.full()) { if (sq.privateSize() < sq.privateMaxNumElements()) {
sq.m_cond_not_full.notify_all(); sq.m_cond_not_full.notify_all();
} }
} }
@ -243,14 +257,16 @@ public:
std::lock_guard < std::mutex > lock1(m_mutex); std::lock_guard < std::mutex > lock1(m_mutex);
std::lock_guard < std::mutex > lock2(sq.m_mutex); std::lock_guard < std::mutex > lock2(sq.m_mutex);
m_queue = sq.m_queue; m_circular_buffer = sq.m_circular_buffer;
m_max_num_items = sq.m_max_num_items;
if (!m_queue.empty()) { m_head = sq.m_head;
m_tail = sq.m_tail;
if (privateSize() > 0) {
m_cond_not_empty.notify_all(); m_cond_not_empty.notify_all();
} }
if (!m_queue.full()) { if (privateSize() < privateMaxNumElements()) {
m_cond_not_full.notify_all(); m_cond_not_full.notify_all();
} }
} }
@ -258,13 +274,38 @@ public:
} }
private: private:
//TODO: use a circular buffer structure ? (fixed array + modulo) /// use a circular buffer structure to prevent allocations / reallocations (fixed array + modulo)
std::deque<T> m_queue; std::vector<T> m_circular_buffer;
/**
* The 'head' index of the element at the head of the deque, 'tail'
* the next (valid !) index at which an element can be pushed.
* m_head == m_tail means empty.
*/
int m_head = 0, m_tail = 0;
//
inline int nextIndex(int index, int modulus) const {
return (index + 1 == modulus) ? 0 : index + 1;
}
//
inline int privateSize() const {
if (m_head <= m_tail) {
return m_tail - m_head;
}
return (m_tail - m_head + (int)m_circular_buffer.size());
}
//
inline int privateMaxNumElements() const {
return (int)m_circular_buffer.size() - 1;
}
mutable std::mutex m_mutex; mutable std::mutex m_mutex;
std::condition_variable m_cond_not_empty; std::condition_variable m_cond_not_empty;
std::condition_variable m_cond_not_full; std::condition_variable m_cond_not_full;
size_t m_max_num_items = MIN_ITEM_NB;
}; };
/*! Swaps the contents of two ThreadBlockingQueue objects. (external operator) */ /*! Swaps the contents of two ThreadBlockingQueue objects. (external operator) */

View File

@ -1,4 +0,0 @@
// Copyright (c) Charles J. Cliffe
// SPDX-License-Identifier: GPL-2.0+
#include <ThreadQueue.h>

View File

@ -1,302 +0,0 @@
// Copyright (c) Charles J. Cliffe
// SPDX-License-Identifier: GPL-2.0+
#pragma once
/* Credit to Alfredo Pons / https://plus.google.com/109903449837592676231
* Code from http://gnodebian.blogspot.com.es/2013/07/a-thread-safe-asynchronous-queue-in-c11.html
*
* Changes:
* Charles J. Nov-19-2014
* - Renamed SafeQueue -> ThreadQueue
* Sonnier.V Feb-10-2017
* - Simplified, various fixes
*/
#include <deque>
#include <list>
#include <mutex>
#include <thread>
#include <cstdint>
#include <condition_variable>
class ThreadQueueBase {
};
/** A thread-safe asynchronous queue */
template<typename T>
class ThreadQueue : public ThreadQueueBase {
typedef typename std::deque<T>::value_type value_type;
typedef typename std::deque<T>::size_type size_type;
public:
/*! Create safe queue. */
ThreadQueue() {
m_max_num_items = 0;
};
ThreadQueue(ThreadQueue&& sq) {
m_queue = std::move(sq.m_queue);
m_max_num_items = sq.m_max_num_items;
}
ThreadQueue(const ThreadQueue& sq) {
std::lock_guard < std::mutex > lock(sq.m_mutex);
m_queue = sq.m_queue;
m_max_num_items = sq.m_max_num_items;
}
/*! Destroy safe queue. */
~ThreadQueue() {
std::lock_guard < std::mutex > lock(m_mutex);
}
/**
* Sets the maximum number of items in the queue. Defaults is 0: No limit
* \param[in] item An item.
*/
void set_max_num_items(unsigned int max_num_items) {
std::lock_guard < std::mutex > lock(m_mutex);
m_max_num_items = max_num_items;
}
/**
* Pushes the item into the queue.
* \param[in] item An item.
* \return true if an item was pushed into the queue
*/
bool push(const value_type& item) {
std::lock_guard < std::mutex > lock(m_mutex);
if (m_max_num_items > 0 && m_queue.size() > m_max_num_items) {
return false;
}
m_queue.push_back(item);
m_cond_not_empty.notify_all();
return true;
}
/**
* Pushes the item into the queue.
* \param[in] item An item.
* \return true if an item was pushed into the queue
*/
bool push(const value_type&& item) {
std::lock_guard < std::mutex > lock(m_mutex);
if (m_max_num_items > 0 && m_queue.size() > m_max_num_items) {
return false;
}
m_queue.push_back(item);
m_cond_not_empty.notify_all();
return true;
}
/**
* Pops item from the queue. If queue is empty, this function blocks until item becomes available.
* \param[out] item The item.
*/
void pop(value_type& item) {
std::unique_lock < std::mutex > lock(m_mutex);
m_cond_not_empty.wait(lock, [this]() // Lambda funct
{
return !m_queue.empty();
});
item = m_queue.front();
m_queue.pop_front();
}
/**
* Pops item from the queue using the contained type's move assignment operator, if it has one..
* This method is identical to the pop() method if that type has no move assignment operator.
* If queue is empty, this function blocks until item becomes available.
* \param[out] item The item.
*/
void move_pop(value_type& item) {
std::unique_lock < std::mutex > lock(m_mutex);
m_cond_not_empty.wait(lock, [this]() // Lambda funct
{
return !m_queue.empty();
});
item = std::move(m_queue.front());
m_queue.pop_front();
}
/**
* Tries to pop item from the queue.
* \param[out] item The item.
* \return False is returned if no item is available.
*/
bool try_pop(value_type& item) {
std::lock_guard < std::mutex > lock(m_mutex);
if (m_queue.empty())
return false;
item = m_queue.front();
m_queue.pop_front();
return true;
}
/**
* Tries to pop item from the queue using the contained type's move assignment operator, if it has one..
* This method is identical to the try_pop() method if that type has no move assignment operator.
* \param[out] item The item.
* \return False is returned if no item is available.
*/
bool try_move_pop(value_type& item) {
std::lock_guard < std::mutex > lock(m_mutex);
if (m_queue.empty())
return false;
item = std::move(m_queue.front());
m_queue.pop_front();
return true;
}
/**
* Pops item from the queue. If the queue is empty, blocks for timeout microseconds, or until item becomes available.
* \param[out] t An item.
* \param[in] timeout The number of microseconds to wait.
* \return true if get an item from the queue, false if no item is received before the timeout.
*/
bool timeout_pop(value_type& item, std::uint64_t timeout) {
std::unique_lock < std::mutex > lock(m_mutex);
if (m_queue.empty()) {
if (timeout == 0)
return false;
if (m_cond_not_empty.wait_for(lock, std::chrono::microseconds(timeout)) == std::cv_status::timeout)
return false;
}
item = m_queue.front();
m_queue.pop_front();
return true;
}
/**
* Pops item from the queue using the contained type's move assignment operator, if it has one..
* If the queue is empty, blocks for timeout microseconds, or until item becomes available.
* This method is identical to the try_pop() method if that type has no move assignment operator.
* \param[out] t An item.
* \param[in] timeout The number of microseconds to wait.
* \return true if get an item from the queue, false if no item is received before the timeout.
*/
bool timeout_move_pop(value_type& item, std::uint64_t timeout) {
std::unique_lock < std::mutex > lock(m_mutex);
if (m_queue.empty()) {
if (timeout == 0)
return false;
if (m_cond_not_empty.wait_for(lock, std::chrono::microseconds(timeout)) == std::cv_status::timeout)
return false;
}
item = std::move(m_queue.front());
m_queue.pop_front();
return true;
}
/**
* Gets the number of items in the queue.
* \return Number of items in the queue.
*/
size_type size() const {
std::lock_guard < std::mutex > lock(m_mutex);
return m_queue.size();
}
/**
* Check if the queue is empty.
* \return true if queue is empty.
*/
bool empty() const {
std::lock_guard < std::mutex > lock(m_mutex);
return m_queue.empty();
}
/**
* Check if the queue is full.
* \return true if queue is full.
*/
bool full() const {
std::lock_guard < std::mutex > lock(m_mutex);
return (m_max_num_items != 0) && (m_queue.size() >= m_max_num_items);
}
/**
* Remove any items in the queue.
*/
void flush() {
std::lock_guard < std::mutex > lock(m_mutex);
m_queue.clear();
}
/**
* Swaps the contents.
* \param[out] sq The ThreadQueue to swap with 'this'.
*/
void swap(ThreadQueue& sq) {
if (this != &sq) {
std::lock_guard < std::mutex > lock1(m_mutex);
std::lock_guard < std::mutex > lock2(sq.m_mutex);
m_queue.swap(sq.m_queue);
std::swap(m_max_num_items, sq.m_max_num_items);
if (!m_queue.empty())
m_cond_not_empty.notify_all();
if (!sq.m_queue.empty())
sq.m_cond_not_empty.notify_all();
}
}
/*! The copy assignment operator */
ThreadQueue& operator=(const ThreadQueue& sq) {
if (this != &sq) {
std::lock_guard < std::mutex > lock1(m_mutex);
std::lock_guard < std::mutex > lock2(sq.m_mutex);
m_queue = sq.m_queue;
m_max_num_items = sq.m_max_num_items;
if (!m_queue.empty())
m_cond_not_empty.notify_all();
}
return *this;
}
/*! The move assignment operator */
ThreadQueue& operator=(ThreadQueue && sq) {
std::lock_guard < std::mutex > lock(m_mutex);
m_queue = std::move(sq.m_queue);
m_max_num_items = sq.m_max_num_items;
if (!m_queue.empty())
m_cond_not_empty.notify_all();
return *this;
}
private:
std::deque<T> m_queue;
mutable std::mutex m_mutex;
std::condition_variable m_cond_not_empty;
size_t m_max_num_items;
};
/*! Swaps the contents of two ThreadQueue objects. */
template<typename T>
void swap(ThreadQueue<T>& q1, ThreadQueue<T>& q2) {
q1.swap(q2);
}

View File

@ -18,7 +18,6 @@
class Timer { class Timer {
private: private:
//units are microsecs:
unsigned long time_elapsed; unsigned long time_elapsed;
unsigned long system_milliseconds; unsigned long system_milliseconds;
unsigned long start_time; unsigned long start_time;