2017-02-09 13:12:12 -05:00
|
|
|
// Copyright (c) Charles J. Cliffe
|
|
|
|
// SPDX-License-Identifier: GPL-2.0+
|
|
|
|
|
|
|
|
#pragma once
|
|
|
|
|
2017-05-20 15:49:17 -04:00
|
|
|
#include <stddef.h>

#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <iostream>
#include <mutex>
#include <thread>
#include <typeinfo>
#include <utility>
#include <vector>
|
2017-02-09 13:12:12 -05:00
|
|
|
|
|
|
|
// Smallest capacity a queue can have (a capacity of 1 makes it an "Exchanger").
#define MIN_ITEM_NB (1)

// Timeouts are expressed in microseconds. Any non-zero timeout lower than or
// equal to this threshold makes pop() / push() behave as non-blocking calls,
// i.e. respectively equivalent to try_pop() and try_push().
#define NON_BLOCKING_TIMEOUT (100)

// Use this timeout constant in either pop() or push() calls to request
// an indefinite wait.
#define BLOCKING_INFINITE_TIMEOUT (0)

/**
 * Empty common base of every ThreadBlockingQueue<T> instantiation.
 * It declares no members; in particular it has no virtual destructor, so a
 * queue must not be deleted through a ThreadQueueBase pointer.
 */
class ThreadQueueBase {
};

/**
 * A thread-safe asynchronous blocking queue with a bounded capacity.
 *
 * Items are stored in a fixed-size circular buffer (a std::vector used with
 * wrapping indices), so T must be default-constructible and assignable.
 * Every public member function takes the internal mutex.
 */
template<typename T>
class ThreadBlockingQueue : public ThreadQueueBase {

    using value_type = typename std::vector<T>::value_type;
    using size_type = typename std::vector<T>::size_type;

public:

    /*! Create a safe blocking queue holding at most MIN_ITEM_NB items. */
    ThreadBlockingQueue() {
        //at least 1 (== Exchanger)
        m_circular_buffer.resize(MIN_ITEM_NB);
    }

    /*! Copy constructor: copies content and capacity of sq while holding its lock. */
    ThreadBlockingQueue(const ThreadBlockingQueue& sq) : ThreadQueueBase(sq) {
        std::lock_guard<std::mutex> lock(sq.m_mutex);

        m_circular_buffer = sq.m_circular_buffer;
        m_head = sq.m_head;
        m_tail = sq.m_tail;
        m_size = sq.m_size;
    }

    /*! Destroy safe queue. */
    ~ThreadBlockingQueue() {
        // Taking the mutex lets an operation that currently holds it finish
        // before the members are destroyed.
        std::lock_guard<std::mutex> lock(m_mutex);
    }

    /**
     * Sets the maximum number of items in the queue. The capacity can only be
     * raised, never reduced; a value lower than or equal to the current
     * capacity is ignored.
     * \param[in] max_num_items the new maximum number of items.
     */
    void set_max_num_items(unsigned int max_num_items) {
        std::lock_guard<std::mutex> lock(m_mutex);

        const size_type new_capacity = static_cast<size_type>(max_num_items);

        if (new_capacity <= m_circular_buffer.size()) {
            //Only raise the existing max size, never reduce it
            //for simplification sake at runtime.
            return;
        }

        // The content may wrap around the end of the old buffer (or fill it
        // completely, in which case m_head == m_tail). Growing in place would
        // insert the new empty slots in the middle of the logical sequence,
        // so the items are re-laid out from index 0 in FIFO order instead.
        std::vector<T> grown(new_capacity);
        size_type source = m_head;

        for (size_type target = 0; target < m_size; ++target) {
            grown[target] = std::move(m_circular_buffer[source]);
            source = nextIndex(source, m_circular_buffer.size());
        }

        m_circular_buffer.swap(grown);
        m_head = 0;
        // m_size <= old capacity < new capacity, so this index is valid.
        m_tail = m_size;

        m_cond_not_full.notify_all();
    }

    /**
     * Pushes the item into the queue. If the queue is full, waits until room
     * is available, for at most timeout microseconds.
     * \param[in] item An item.
     * \param[in] timeout a max waiting timeout in microseconds for an item to be pushed.
     * BLOCKING_INFINITE_TIMEOUT (0, the default) means indefinite wait; any other value
     * <= NON_BLOCKING_TIMEOUT means no wait at all, like try_push().
     * \param[in] errorMessage an error message written on std::cout in case of the timeout wait
     * \return true if an item was pushed into the queue, else a timeout has occurred.
     */
    bool push(const value_type& item, std::uint64_t timeout = BLOCKING_INFINITE_TIMEOUT,const char* errorMessage = "") {
        std::unique_lock<std::mutex> lock(m_mutex);

        const auto has_room = [this]() { return m_size < m_circular_buffer.size(); };

        if (timeout == BLOCKING_INFINITE_TIMEOUT) {
            m_cond_not_full.wait(lock, has_room);
        } else if (timeout <= NON_BLOCKING_TIMEOUT) {
            // if the value is below a threshold, consider it is a try_push()
            if (!has_room()) {
                return false;
            }
        } else if (!m_cond_not_full.wait_for(lock, std::chrono::microseconds(timeout), has_room)) {
            // Do not keep the other threads out of the queue while writing the warning.
            lock.unlock();
            reportTimeout("push", timeout, errorMessage);
            return false;
        }

        enqueueLocked(item);
        return true;
    }

    /**
     * Tries to push the item into the queue, immediately, without waiting. If the queue is full, the item
     * is not inserted and the function returns false.
     * \param[in] item An item.
     * \return true if the item was inserted.
     */
    bool try_push(const value_type& item) {
        std::lock_guard<std::mutex> lock(m_mutex);

        if (m_size >= m_circular_buffer.size()) {
            return false;
        }

        enqueueLocked(item);
        return true;
    }

    /**
     * Pops item from the queue. If the queue is empty, blocks for timeout microseconds, or until item becomes available.
     * \param[out] item receives the popped item; left untouched on failure.
     * \param[in] timeout The number of microseconds to wait. BLOCKING_INFINITE_TIMEOUT (0, the default)
     * means indefinite wait; any other value <= NON_BLOCKING_TIMEOUT means no wait at all, like try_pop().
     * \param[in] errorMessage an error message written on std::cout in case of the timeout wait
     * \return true if get an item from the queue, false if no item is received before the timeout.
     */
    bool pop(value_type& item, std::uint64_t timeout = BLOCKING_INFINITE_TIMEOUT, const char* errorMessage = "") {
        std::unique_lock<std::mutex> lock(m_mutex);

        const auto has_item = [this]() { return m_size > 0; };

        if (timeout == BLOCKING_INFINITE_TIMEOUT) {
            m_cond_not_empty.wait(lock, has_item);
        } else if (timeout <= NON_BLOCKING_TIMEOUT) {
            // if the value is below a threshold, consider it is try_pop()
            if (!has_item()) {
                return false;
            }
        } else if (!m_cond_not_empty.wait_for(lock, std::chrono::microseconds(timeout), has_item)) {
            // Do not keep the other threads out of the queue while writing the warning.
            lock.unlock();
            reportTimeout("pop", timeout, errorMessage);
            return false;
        }

        dequeueLocked(item);
        return true;
    }

    /**
     * Tries to pop item from the queue.
     * \param[out] item The item.
     * \return False is returned if no item is available.
     */
    bool try_pop(value_type& item) {
        std::lock_guard<std::mutex> lock(m_mutex);

        if (m_size == 0) {
            return false;
        }

        dequeueLocked(item);
        return true;
    }

    /**
     * Gets the number of items in the queue.
     * \return Number of items in the queue.
     */
    size_type size() const {
        std::lock_guard<std::mutex> lock(m_mutex);
        return m_size;
    }

    /**
     * Check if the queue is empty.
     * \return true if queue is empty.
     */
    bool empty() const {
        std::lock_guard<std::mutex> lock(m_mutex);
        return m_size == 0;
    }

    /**
     * Check if the queue is full.
     * \return true if queue is full.
     */
    bool full() const {
        std::lock_guard<std::mutex> lock(m_mutex);
        return (m_size >= m_circular_buffer.size());
    }

    /**
     * Remove any items in the queue. The capacity is unchanged.
     */
    void flush() {
        std::lock_guard<std::mutex> lock(m_mutex);

        // Overwrite the pending items so that whatever they own is released
        // now rather than when their slot happens to be reused.
        for (; m_size > 0; --m_size) {
            m_circular_buffer[m_head] = value_type();
            m_head = nextIndex(m_head, m_circular_buffer.size());
        }

        m_head = 0;
        m_tail = 0;

        m_cond_not_full.notify_all();
    }

    /**
     * Swaps the contents (items and capacity).
     * \param[in,out] sq The ThreadBlockingQueue to swap with 'this'.
     */
    void swap(ThreadBlockingQueue& sq) {
        if (this == &sq) {
            return;
        }

        // std::lock acquires both mutexes without deadlock, even if another
        // thread is running sq.swap(*this) at the same time.
        std::lock(m_mutex, sq.m_mutex);
        std::lock_guard<std::mutex> lock1(m_mutex, std::adopt_lock);
        std::lock_guard<std::mutex> lock2(sq.m_mutex, std::adopt_lock);

        m_circular_buffer.swap(sq.m_circular_buffer);

        std::swap(m_head, sq.m_head);
        std::swap(m_tail, sq.m_tail);
        std::swap(m_size, sq.m_size);

        notifyWaitersLocked();
        sq.notifyWaitersLocked();
    }

    /*! The copy assignment operator: copies content and capacity of sq. */
    ThreadBlockingQueue& operator=(const ThreadBlockingQueue& sq) {
        if (this != &sq) {
            // Same deadlock-free double acquisition as in swap().
            std::lock(m_mutex, sq.m_mutex);
            std::lock_guard<std::mutex> lock1(m_mutex, std::adopt_lock);
            std::lock_guard<std::mutex> lock2(sq.m_mutex, std::adopt_lock);

            m_circular_buffer = sq.m_circular_buffer;

            m_head = sq.m_head;
            m_tail = sq.m_tail;
            m_size = sq.m_size;

            notifyWaitersLocked();
        }
        return *this;
    }

private:

    /// Index following 'index' in a ring of 'mod' slots.
    static size_type nextIndex(size_type index, size_type mod) {
        return (index + 1 == mod) ? 0 : index + 1;
    }

    /// Stores item at the tail and wakes up the consumers. m_mutex must be held and the queue not full.
    void enqueueLocked(const value_type& item) {
        //m_tail is already the next valid place an item can be put
        m_circular_buffer[m_tail] = item;
        m_tail = nextIndex(m_tail, m_circular_buffer.size());
        ++m_size;

        m_cond_not_empty.notify_all();
    }

    /// Extracts the head into item and wakes up the producers. m_mutex must be held and the queue not empty.
    void dequeueLocked(value_type& item) {
        // Moving out (instead of copying) leaves no live copy behind in the slot.
        item = std::move(m_circular_buffer[m_head]);
        m_head = nextIndex(m_head, m_circular_buffer.size());
        --m_size;

        m_cond_not_full.notify_all();
    }

    /// Wakes up the waiters whose condition currently holds. m_mutex must be held.
    void notifyWaitersLocked() {
        if (m_size > 0) {
            m_cond_not_empty.notify_all();
        }

        if (m_size < m_circular_buffer.size()) {
            m_cond_not_full.notify_all();
        }
    }

    /// Writes the timeout warning of push() / pop() on std::cout.
    void reportTimeout(const char* operation, std::uint64_t timeout, const char* errorMessage) const {
        const std::thread::id currentThreadId = std::this_thread::get_id();

        std::cout << "WARNING: Thread 0x" << std::hex << currentThreadId << std::dec <<
            " (" << currentThreadId << ") executing {" << typeid(*this).name() << "}." << operation <<
            "() has failed with timeout > " << (timeout * 0.001) << " ms, message: " <<
            // streaming a null char pointer is undefined behaviour
            (errorMessage != nullptr ? errorMessage : "") << std::endl;
    }

    /// use a circular buffer structure to prevent allocations / reallocations (fixed array + wrapping indices)
    std::vector<T> m_circular_buffer;

    /**
     * 'm_head' is the index of the oldest element, 'm_tail' the next (valid !)
     * index at which an element can be pushed.
     * m_head == m_tail happens both when the queue is empty and when it is
     * full, which is why m_size is tracked separately.
     */
    size_type m_head = 0;
    size_type m_tail = 0;

    //hold the current number of elements.
    size_type m_size = 0;

    mutable std::mutex m_mutex;
    std::condition_variable m_cond_not_empty;
    std::condition_variable m_cond_not_full;
};
|
|
|
|
|
|
|
|
/*! Swaps the contents of two ThreadBlockingQueue objects. (external operator) */
|
|
|
|
template<typename T>
|
|
|
|
void swap(ThreadBlockingQueue<T>& q1, ThreadBlockingQueue<T>& q2) {
|
|
|
|
q1.swap(q2);
|
|
|
|
}
|