From af20f680a402763f51edddfbb306c7587ab3d939 Mon Sep 17 00:00:00 2001 From: vsonnier Date: Sun, 21 May 2017 21:04:12 +0200 Subject: [PATCH] Clearer ThreadBlockingQueue impl by using a field to hold size... --- src/util/ThreadBlockingQueue.h | 79 ++++++++++++++++------------------ 1 file changed, 36 insertions(+), 43 deletions(-) diff --git a/src/util/ThreadBlockingQueue.h b/src/util/ThreadBlockingQueue.h index 52fee41..d4e5588 100644 --- a/src/util/ThreadBlockingQueue.h +++ b/src/util/ThreadBlockingQueue.h @@ -37,7 +37,7 @@ public: /*! Create safe blocking queue. */ ThreadBlockingQueue() { //at least 1 (== Exchanger) - m_circular_buffer.resize(MIN_ITEM_NB + 1); //there is one slot more than the size for internal management. + m_circular_buffer.resize(MIN_ITEM_NB); }; //Copy constructor @@ -46,6 +46,7 @@ public: m_circular_buffer = sq.m_circular_buffer; m_head = sq.m_head; m_tail = sq.m_tail; + m_size = sq.m_size; } /*! Destroy safe queue. */ @@ -61,11 +62,11 @@ public: void set_max_num_items(unsigned int max_num_items) { std::lock_guard < std::mutex > lock(m_mutex); - if (max_num_items > (unsigned int)privateMaxNumElements()) { + if (max_num_items > (unsigned int)m_circular_buffer.size()) { //Only raise the existing max size, never reduce it //for simplification sake at runtime. - m_circular_buffer.resize(max_num_items + 1); // there is 1 extra allocated slot. - //m_head and m_tail stays valid. + m_circular_buffer.resize(max_num_items); + //m_head and m_tail stays valid for the new size. m_cond_not_full.notify_all(); } } @@ -85,14 +86,14 @@ public: if (timeout == BLOCKING_INFINITE_TIMEOUT) { m_cond_not_full.wait(lock, [this]() // Lambda funct { - return privateSize() < privateMaxNumElements(); + return m_size < m_circular_buffer.size(); }); - } else if (timeout <= NON_BLOCKING_TIMEOUT && privateSize() >= privateMaxNumElements()) { + } else if (timeout <= NON_BLOCKING_TIMEOUT && m_size >= m_circular_buffer.size()) { // if the value is below a threshold, consider it is a try_push() return false; } else if (false == m_cond_not_full.wait_for(lock, std::chrono::microseconds(timeout), - [this]() { return privateSize() < privateMaxNumElements(); })) { + [this]() { return m_size < m_circular_buffer.size(); })) { std::thread::id currentThreadId = std::this_thread::get_id(); std::cout << "WARNING: Thread 0x" << std::hex << currentThreadId << std::dec << " (" << currentThreadId << ") executing {" << typeid(*this).name() << "}.push() has failed with timeout > " << @@ -103,7 +104,8 @@ public: //m_tail is already the next valid place an item can be put m_circular_buffer[m_tail] = item; m_tail = nextIndex(m_tail, (int)m_circular_buffer.size()); - + m_size++; + m_cond_not_empty.notify_all(); return true; } @@ -116,14 +118,14 @@ public: bool try_push(const value_type& item) { std::lock_guard < std::mutex > lock(m_mutex); - if (privateSize() >= privateMaxNumElements()) { + if (m_size >= m_circular_buffer.size()) { return false; } //m_tail is already the next valid place an item can be put m_circular_buffer[m_tail] = item; m_tail = nextIndex(m_tail, (int)m_circular_buffer.size()); - + m_size++; m_cond_not_empty.notify_all(); return true; } @@ -140,14 +142,14 @@ public: if (timeout == BLOCKING_INFINITE_TIMEOUT) { m_cond_not_empty.wait(lock, [this]() // Lambda funct { - return privateSize() > 0; + return m_size > 0; }); - } else if (timeout <= NON_BLOCKING_TIMEOUT && privateSize() == 0) { + } else if (timeout <= NON_BLOCKING_TIMEOUT && m_size == 0) { // if the value is below a threshold, consider it is try_pop() return false; } else if (false == m_cond_not_empty.wait_for(lock, std::chrono::microseconds(timeout), - [this]() { return privateSize() > 0; })) { + [this]() { return m_size > 0; })) { std::thread::id currentThreadId = std::this_thread::get_id(); std::cout << "WARNING: Thread 0x" << std::hex << currentThreadId << std::dec << " (" << currentThreadId << ") executing {" << typeid(*this).name() << "}.pop() has failed with timeout > " << @@ -157,7 +159,7 @@ public: item = m_circular_buffer[m_head]; m_head = nextIndex(m_head, (int)m_circular_buffer.size()); - + m_size--; m_cond_not_full.notify_all(); return true; } @@ -170,13 +172,13 @@ public: bool try_pop(value_type& item) { std::lock_guard < std::mutex > lock(m_mutex); - if (privateSize() == 0) { + if (m_size == 0) { return false; } item = m_circular_buffer[m_head]; m_head = nextIndex(m_head, (int)m_circular_buffer.size()); - + m_size--; m_cond_not_full.notify_all(); return true; } @@ -188,7 +190,7 @@ public: */ size_type size() const { std::lock_guard < std::mutex > lock(m_mutex); - return privateSize(); + return m_size; } /** @@ -197,7 +199,7 @@ public: */ bool empty() const { std::lock_guard < std::mutex > lock(m_mutex); - return privateSize() == 0; + return m_size == 0; } /** @@ -206,7 +208,7 @@ public: */ bool full() const { std::lock_guard < std::mutex > lock(m_mutex); - return (privateSize() >= privateMaxNumElements()); + return (m_size >= m_circular_buffer.size()); } /** @@ -216,7 +218,7 @@ public: std::lock_guard < std::mutex > lock(m_mutex); m_head = 0; m_tail = 0; - + m_size = 0; m_cond_not_full.notify_all(); } @@ -232,20 +234,21 @@ public: std::swap(m_head, sq.m_head); std::swap(m_tail, sq.m_tail); + std::swap(m_size, sq.m_size); - if (privateSize() > 0) { + if (m_size > 0) { m_cond_not_empty.notify_all(); } - if (sq.privateSize() > 0) { + if (sq.m_size > 0) { sq.m_cond_not_empty.notify_all(); } - if (privateSize() < privateMaxNumElements()) { + if (m_size < m_circular_buffer.size()) { m_cond_not_full.notify_all(); } - if (sq.privateSize() < sq.privateMaxNumElements()) { + if (sq.m_size < sq.m_circular_buffer.size()) { sq.m_cond_not_full.notify_all(); } } @@ -261,12 +264,13 @@ public: m_head = sq.m_head; m_tail = sq.m_tail; + m_size = sq.m_size; - if (privateSize() > 0) { + if (m_size > 0) { m_cond_not_empty.notify_all(); } - if (privateSize() < privateMaxNumElements()) { + if (m_size < m_circular_buffer.size()) { m_cond_not_full.notify_all(); } } @@ -282,25 +286,14 @@ private: * the next (valid !) index at which an element can be pushed. * m_head == m_tail means empty. */ - int m_head = 0, m_tail = 0; + int m_head = 0, m_tail = 0; + + //hold the current number of elements. + size_type m_size = 0; // - inline int nextIndex(int index, int modulus) const { - return (index + 1 == modulus) ? 0 : index + 1; - } - - // - inline int privateSize() const { - if (m_head <= m_tail) { - return m_tail - m_head; - } - - return (m_tail - m_head + (int)m_circular_buffer.size()); - } - - // - inline int privateMaxNumElements() const { - return (int)m_circular_buffer.size() - 1; + inline int nextIndex(int index, int mod) const { + return (index + 1 == mod) ? 0 : index + 1; } mutable std::mutex m_mutex;