Clearer ThreadBlockingQueue impl by using a field to hold size...

This commit is contained in:
vsonnier 2017-05-21 21:04:12 +02:00
parent 313d66e1d6
commit af20f680a4
1 changed files with 36 additions and 43 deletions

View File

@ -37,7 +37,7 @@ public:
/*! Create safe blocking queue. */
ThreadBlockingQueue() {
//at least 1 (== Exchanger)
m_circular_buffer.resize(MIN_ITEM_NB + 1); //there is one slot more than the size for internal management.
m_circular_buffer.resize(MIN_ITEM_NB);
};
//Copy constructor
@ -46,6 +46,7 @@ public:
m_circular_buffer = sq.m_circular_buffer;
m_head = sq.m_head;
m_tail = sq.m_tail;
m_size = sq.m_size;
}
/*! Destroy safe queue. */
@ -61,11 +62,11 @@ public:
void set_max_num_items(unsigned int max_num_items) {
std::lock_guard < std::mutex > lock(m_mutex);
if (max_num_items > (unsigned int)privateMaxNumElements()) {
if (max_num_items > (unsigned int)m_circular_buffer.size()) {
//Only raise the existing max size, never reduce it
//for simplification sake at runtime.
m_circular_buffer.resize(max_num_items + 1); // there is 1 extra allocated slot.
//m_head and m_tail stays valid.
m_circular_buffer.resize(max_num_items);
//m_head and m_tail stays valid for the new size.
m_cond_not_full.notify_all();
}
}
@ -85,14 +86,14 @@ public:
if (timeout == BLOCKING_INFINITE_TIMEOUT) {
m_cond_not_full.wait(lock, [this]() // Lambda funct
{
return privateSize() < privateMaxNumElements();
return m_size < m_circular_buffer.size();
});
} else if (timeout <= NON_BLOCKING_TIMEOUT && privateSize() >= privateMaxNumElements()) {
} else if (timeout <= NON_BLOCKING_TIMEOUT && m_size >= m_circular_buffer.size()) {
// if the value is below a threshold, consider it is a try_push()
return false;
}
else if (false == m_cond_not_full.wait_for(lock, std::chrono::microseconds(timeout),
[this]() { return privateSize() < privateMaxNumElements(); })) {
[this]() { return m_size < m_circular_buffer.size(); })) {
std::thread::id currentThreadId = std::this_thread::get_id();
std::cout << "WARNING: Thread 0x" << std::hex << currentThreadId << std::dec <<
" (" << currentThreadId << ") executing {" << typeid(*this).name() << "}.push() has failed with timeout > " <<
@ -103,7 +104,8 @@ public:
//m_tail is already the next valid place an item can be put
m_circular_buffer[m_tail] = item;
m_tail = nextIndex(m_tail, (int)m_circular_buffer.size());
m_size++;
m_cond_not_empty.notify_all();
return true;
}
@ -116,14 +118,14 @@ public:
bool try_push(const value_type& item) {
std::lock_guard < std::mutex > lock(m_mutex);
if (privateSize() >= privateMaxNumElements()) {
if (m_size >= m_circular_buffer.size()) {
return false;
}
//m_tail is already the next valid place an item can be put
m_circular_buffer[m_tail] = item;
m_tail = nextIndex(m_tail, (int)m_circular_buffer.size());
m_size++;
m_cond_not_empty.notify_all();
return true;
}
@ -140,14 +142,14 @@ public:
if (timeout == BLOCKING_INFINITE_TIMEOUT) {
m_cond_not_empty.wait(lock, [this]() // Lambda funct
{
return privateSize() > 0;
return m_size > 0;
});
} else if (timeout <= NON_BLOCKING_TIMEOUT && privateSize() == 0) {
} else if (timeout <= NON_BLOCKING_TIMEOUT && m_size == 0) {
// if the value is below a threshold, consider it is try_pop()
return false;
}
else if (false == m_cond_not_empty.wait_for(lock, std::chrono::microseconds(timeout),
[this]() { return privateSize() > 0; })) {
[this]() { return m_size > 0; })) {
std::thread::id currentThreadId = std::this_thread::get_id();
std::cout << "WARNING: Thread 0x" << std::hex << currentThreadId << std::dec <<
" (" << currentThreadId << ") executing {" << typeid(*this).name() << "}.pop() has failed with timeout > " <<
@ -157,7 +159,7 @@ public:
item = m_circular_buffer[m_head];
m_head = nextIndex(m_head, (int)m_circular_buffer.size());
m_size--;
m_cond_not_full.notify_all();
return true;
}
@ -170,13 +172,13 @@ public:
bool try_pop(value_type& item) {
std::lock_guard < std::mutex > lock(m_mutex);
if (privateSize() == 0) {
if (m_size == 0) {
return false;
}
item = m_circular_buffer[m_head];
m_head = nextIndex(m_head, (int)m_circular_buffer.size());
m_size--;
m_cond_not_full.notify_all();
return true;
}
@ -188,7 +190,7 @@ public:
*/
size_type size() const {
std::lock_guard < std::mutex > lock(m_mutex);
return privateSize();
return m_size;
}
/**
@ -197,7 +199,7 @@ public:
*/
bool empty() const {
std::lock_guard < std::mutex > lock(m_mutex);
return privateSize() == 0;
return m_size == 0;
}
/**
@ -206,7 +208,7 @@ public:
*/
bool full() const {
std::lock_guard < std::mutex > lock(m_mutex);
return (privateSize() >= privateMaxNumElements());
return (m_size >= m_circular_buffer.size());
}
/**
@ -216,7 +218,7 @@ public:
std::lock_guard < std::mutex > lock(m_mutex);
m_head = 0;
m_tail = 0;
m_size = 0;
m_cond_not_full.notify_all();
}
@ -232,20 +234,21 @@ public:
std::swap(m_head, sq.m_head);
std::swap(m_tail, sq.m_tail);
std::swap(m_size, sq.m_size);
if (privateSize() > 0) {
if (m_size > 0) {
m_cond_not_empty.notify_all();
}
if (sq.privateSize() > 0) {
if (sq.m_size > 0) {
sq.m_cond_not_empty.notify_all();
}
if (privateSize() < privateMaxNumElements()) {
if (m_size < m_circular_buffer.size()) {
m_cond_not_full.notify_all();
}
if (sq.privateSize() < sq.privateMaxNumElements()) {
if (sq.m_size < sq.m_circular_buffer.size()) {
sq.m_cond_not_full.notify_all();
}
}
@ -261,12 +264,13 @@ public:
m_head = sq.m_head;
m_tail = sq.m_tail;
m_size = sq.m_size;
if (privateSize() > 0) {
if (m_size > 0) {
m_cond_not_empty.notify_all();
}
if (privateSize() < privateMaxNumElements()) {
if (m_size < m_circular_buffer.size()) {
m_cond_not_full.notify_all();
}
}
@ -282,25 +286,14 @@ private:
* the next (valid !) index at which an element can be pushed.
* m_head == m_tail means empty.
*/
int m_head = 0, m_tail = 0;
int m_head = 0, m_tail = 0;
//hold the current number of elements.
size_type m_size = 0;
//
inline int nextIndex(int index, int modulus) const {
return (index + 1 == modulus) ? 0 : index + 1;
}
//
inline int privateSize() const {
if (m_head <= m_tail) {
return m_tail - m_head;
}
return (m_tail - m_head + (int)m_circular_buffer.size());
}
//
inline int privateMaxNumElements() const {
return (int)m_circular_buffer.size() - 1;
inline int nextIndex(int index, int mod) const {
return (index + 1 == mod) ? 0 : index + 1;
}
mutable std::mutex m_mutex;