From e0af60922473209b38cc26f7af6a07134529a53c Mon Sep 17 00:00:00 2001 From: vsonnier Date: Sat, 20 May 2017 21:49:17 +0200 Subject: [PATCH 1/2] Use a circular buffer for ThreadBlockingQueue (allocation-free) --- src/util/ThreadBlockingQueue.h | 121 ++++++++----- src/util/ThreadQueue.cpp | 4 - src/util/ThreadQueue.h | 302 --------------------------------- 3 files changed, 81 insertions(+), 346 deletions(-) delete mode 100644 src/util/ThreadQueue.cpp delete mode 100644 src/util/ThreadQueue.h diff --git a/src/util/ThreadBlockingQueue.h b/src/util/ThreadBlockingQueue.h index 47819d0..323fc34 100644 --- a/src/util/ThreadBlockingQueue.h +++ b/src/util/ThreadBlockingQueue.h @@ -3,7 +3,7 @@ #pragma once -#include +#include #include #include #include @@ -29,22 +29,23 @@ class ThreadQueueBase { template class ThreadBlockingQueue : public ThreadQueueBase { - typedef typename std::deque::value_type value_type; - typedef typename std::deque::size_type size_type; + typedef typename std::vector::value_type value_type; + typedef typename std::vector::size_type size_type; public: /*! Create safe blocking queue. */ ThreadBlockingQueue() { //at least 1 (== Exchanger) - m_max_num_items = MIN_ITEM_NB; + m_circular_buffer.resize(MIN_ITEM_NB + 1); //there is one slot more than the size for internal management. }; //Copy constructor ThreadBlockingQueue(const ThreadBlockingQueue& sq) { std::lock_guard < std::mutex > lock(sq.m_mutex); - m_queue = sq.m_queue; - m_max_num_items = sq.m_max_num_items; + m_circular_buffer = sq.m_circular_buffer; + m_head = sq.m_head; + m_tail = sq.m_tail; } /*! Destroy safe queue. */ @@ -60,10 +61,11 @@ public: void set_max_num_items(unsigned int max_num_items) { std::lock_guard < std::mutex > lock(m_mutex); - if (max_num_items > m_max_num_items) { + if (max_num_items > (unsigned int)privateMaxNumElements()) { //Only raise the existing max size, never reduce it //for simplification sake at runtime. - m_max_num_items = max_num_items; + m_circular_buffer.resize(max_num_items + 1); // there is 1 extra allocated slot. + //m_head and m_tail stays valid. m_cond_not_full.notify_all(); } } @@ -83,14 +85,14 @@ public: if (timeout == BLOCKING_INFINITE_TIMEOUT) { m_cond_not_full.wait(lock, [this]() // Lambda funct { - return m_queue.size() < m_max_num_items; + return privateSize() < privateMaxNumElements(); }); - } else if (timeout <= NON_BLOCKING_TIMEOUT && m_queue.size() >= m_max_num_items) { + } else if (timeout <= NON_BLOCKING_TIMEOUT && privateSize() >= privateMaxNumElements()) { // if the value is below a threshold, consider it is a try_push() return false; } else if (false == m_cond_not_full.wait_for(lock, std::chrono::microseconds(timeout), - [this]() { return m_queue.size() < m_max_num_items; })) { + [this]() { return privateSize() < privateMaxNumElements(); })) { std::thread::id currentThreadId = std::this_thread::get_id(); std::cout << "WARNING: Thread 0x" << std::hex << currentThreadId << std::dec << " (" << currentThreadId << ") executing {" << typeid(*this).name() << "}.push() has failed with timeout > " << @@ -98,8 +100,11 @@ public: return false; } - m_queue.push_back(item); - m_cond_not_empty.notify_all(); + //m_tail is already the next valid place an item can be put + m_circular_buffer[m_tail] = item; + m_tail = nextIndex(m_tail, (int)m_circular_buffer.size()); + + m_cond_not_empty.notify_all(); return true; } @@ -111,11 +116,14 @@ public: bool try_push(const value_type& item) { std::lock_guard < std::mutex > lock(m_mutex); - if (m_queue.size() >= m_max_num_items) { + if (privateSize() >= privateMaxNumElements()) { return false; } - m_queue.push_back(item); + //m_tail is already the next valid place an item can be put + m_circular_buffer[m_tail] = item; + m_tail = nextIndex(m_tail, (int)m_circular_buffer.size()); + m_cond_not_empty.notify_all(); return true; } @@ -132,14 +140,14 @@ public: if (timeout == BLOCKING_INFINITE_TIMEOUT) { m_cond_not_empty.wait(lock, [this]() // Lambda funct { - return !m_queue.empty(); + return privateSize() > 0; }); - } else if (timeout <= NON_BLOCKING_TIMEOUT && m_queue.empty()) { + } else if (timeout <= NON_BLOCKING_TIMEOUT && privateSize() == 0) { // if the value is below a threshold, consider it is try_pop() return false; } else if (false == m_cond_not_empty.wait_for(lock, std::chrono::microseconds(timeout), - [this]() { return !m_queue.empty(); })) { + [this]() { return privateSize() > 0; })) { std::thread::id currentThreadId = std::this_thread::get_id(); std::cout << "WARNING: Thread 0x" << std::hex << currentThreadId << std::dec << " (" << currentThreadId << ") executing {" << typeid(*this).name() << "}.pop() has failed with timeout > " << @@ -147,8 +155,9 @@ public: return false; } - item = m_queue.front(); - m_queue.pop_front(); + item = m_circular_buffer[m_head]; + m_head = nextIndex(m_head, (int)m_circular_buffer.size()); + m_cond_not_full.notify_all(); return true; } @@ -161,12 +170,13 @@ public: bool try_pop(value_type& item) { std::lock_guard < std::mutex > lock(m_mutex); - if (m_queue.empty()) { + if (privateSize() == 0) { return false; } - item = m_queue.front(); - m_queue.pop_front(); + item = m_circular_buffer[m_head]; + m_head = nextIndex(m_head, (int)m_circular_buffer.size()); + m_cond_not_full.notify_all(); return true; } @@ -178,7 +188,7 @@ public: */ size_type size() const { std::lock_guard < std::mutex > lock(m_mutex); - return m_queue.size(); + return privateSize(); } /** @@ -187,7 +197,7 @@ public: */ bool empty() const { std::lock_guard < std::mutex > lock(m_mutex); - return m_queue.empty(); + return privateSize() == 0; } /** @@ -196,7 +206,7 @@ public: */ bool full() const { std::lock_guard < std::mutex > lock(m_mutex); - return (m_queue.size() >= m_max_num_items); + return (privateSize() >= privateMaxNumElements()); } /** @@ -204,7 +214,9 @@ public: */ void flush() { std::lock_guard < std::mutex > lock(m_mutex); - m_queue.clear(); + m_head = 0; + m_tail = 0; + m_cond_not_full.notify_all(); } @@ -216,22 +228,24 @@ public: if (this != &sq) { std::lock_guard < std::mutex > lock1(m_mutex); std::lock_guard < std::mutex > lock2(sq.m_mutex); - m_queue.swap(sq.m_queue); - std::swap(m_max_num_items, sq.m_max_num_items); + m_circular_buffer.swap(sq.m_circular_buffer); - if (!m_queue.empty()) { + std::swap(m_head, sq.m_head); + std::swap(m_tail, sq.m_tail); + + if (privateSize() > 0) { m_cond_not_empty.notify_all(); } - if (!sq.m_queue.empty()) { + if (sq.privateSize() > 0) { sq.m_cond_not_empty.notify_all(); } - if (!m_queue.full()) { + if (privateSize() < privateMaxNumElements()) { m_cond_not_full.notify_all(); } - if (!sq.m_queue.full()) { + if (sq.privateSize() < sq.privateMaxNumElements()) { sq.m_cond_not_full.notify_all(); } } @@ -243,14 +257,16 @@ public: std::lock_guard < std::mutex > lock1(m_mutex); std::lock_guard < std::mutex > lock2(sq.m_mutex); - m_queue = sq.m_queue; - m_max_num_items = sq.m_max_num_items; + m_circular_buffer = sq.m_circular_buffer; - if (!m_queue.empty()) { + m_head = sq.m_head; + m_tail = sq.m_tail; + + if (privateSize() > 0) { m_cond_not_empty.notify_all(); } - if (!m_queue.full()) { + if (privateSize() < privateMaxNumElements()) { m_cond_not_full.notify_all(); } } @@ -258,13 +274,38 @@ public: } private: - //TODO: use a circular buffer structure ? (fixed array + modulo) - std::deque m_queue; + /// use a circular buffer structure to prevent allocations / reallocations (fixed array + modulo) + std::vector m_circular_buffer; + + /** + * The 'head' index of the element at the head of the deque, 'tail' + * the next (valid !) index at which an element can be pushed. + * m_head == m_tail means empty. + */ + int m_head = 0, m_tail = 0; + + // + inline int nextIndex(int index, int modulus) const { + return (index + 1 == modulus) ? 0 : index + 1; + } + + // + inline int privateSize() const { + if (m_head <= m_tail) { + return m_tail - m_head; + } + + return (m_tail - m_head + (int)m_circular_buffer.size()); + } + + // + inline int privateMaxNumElements() const { + return (int)m_circular_buffer.size() - 1; + } mutable std::mutex m_mutex; std::condition_variable m_cond_not_empty; std::condition_variable m_cond_not_full; - size_t m_max_num_items = MIN_ITEM_NB; }; /*! Swaps the contents of two ThreadBlockingQueue objects. (external operator) */ diff --git a/src/util/ThreadQueue.cpp b/src/util/ThreadQueue.cpp deleted file mode 100644 index 3597da5..0000000 --- a/src/util/ThreadQueue.cpp +++ /dev/null @@ -1,4 +0,0 @@ -// Copyright (c) Charles J. Cliffe -// SPDX-License-Identifier: GPL-2.0+ - -#include \ No newline at end of file diff --git a/src/util/ThreadQueue.h b/src/util/ThreadQueue.h deleted file mode 100644 index 1805b4b..0000000 --- a/src/util/ThreadQueue.h +++ /dev/null @@ -1,302 +0,0 @@ -// Copyright (c) Charles J. Cliffe -// SPDX-License-Identifier: GPL-2.0+ - -#pragma once - -/* Credit to Alfredo Pons / https://plus.google.com/109903449837592676231 - * Code from http://gnodebian.blogspot.com.es/2013/07/a-thread-safe-asynchronous-queue-in-c11.html - * - * Changes: - * Charles J. Nov-19-2014 - * - Renamed SafeQueue -> ThreadQueue - * Sonnier.V Feb-10-2017 - * - Simplified, various fixes - */ - -#include -#include -#include -#include -#include -#include - -class ThreadQueueBase { -}; - -/** A thread-safe asynchronous queue */ -template -class ThreadQueue : public ThreadQueueBase { - - typedef typename std::deque::value_type value_type; - typedef typename std::deque::size_type size_type; - -public: - - /*! Create safe queue. */ - ThreadQueue() { - m_max_num_items = 0; - }; - ThreadQueue(ThreadQueue&& sq) { - m_queue = std::move(sq.m_queue); - m_max_num_items = sq.m_max_num_items; - } - ThreadQueue(const ThreadQueue& sq) { - std::lock_guard < std::mutex > lock(sq.m_mutex); - m_queue = sq.m_queue; - m_max_num_items = sq.m_max_num_items; - } - - /*! Destroy safe queue. */ - ~ThreadQueue() { - std::lock_guard < std::mutex > lock(m_mutex); - } - - /** - * Sets the maximum number of items in the queue. Defaults is 0: No limit - * \param[in] item An item. - */ - void set_max_num_items(unsigned int max_num_items) { - std::lock_guard < std::mutex > lock(m_mutex); - m_max_num_items = max_num_items; - } - - /** - * Pushes the item into the queue. - * \param[in] item An item. - * \return true if an item was pushed into the queue - */ - bool push(const value_type& item) { - std::lock_guard < std::mutex > lock(m_mutex); - - if (m_max_num_items > 0 && m_queue.size() > m_max_num_items) { - return false; - } - - m_queue.push_back(item); - m_cond_not_empty.notify_all(); - return true; - } - - /** - * Pushes the item into the queue. - * \param[in] item An item. - * \return true if an item was pushed into the queue - */ - bool push(const value_type&& item) { - std::lock_guard < std::mutex > lock(m_mutex); - - if (m_max_num_items > 0 && m_queue.size() > m_max_num_items) { - return false; - } - - m_queue.push_back(item); - m_cond_not_empty.notify_all(); - return true; - } - - /** - * Pops item from the queue. If queue is empty, this function blocks until item becomes available. - * \param[out] item The item. - */ - void pop(value_type& item) { - std::unique_lock < std::mutex > lock(m_mutex); - m_cond_not_empty.wait(lock, [this]() // Lambda funct - { - return !m_queue.empty(); - }); - item = m_queue.front(); - m_queue.pop_front(); - } - - /** - * Pops item from the queue using the contained type's move assignment operator, if it has one.. - * This method is identical to the pop() method if that type has no move assignment operator. - * If queue is empty, this function blocks until item becomes available. - * \param[out] item The item. - */ - void move_pop(value_type& item) { - std::unique_lock < std::mutex > lock(m_mutex); - m_cond_not_empty.wait(lock, [this]() // Lambda funct - { - return !m_queue.empty(); - }); - item = std::move(m_queue.front()); - m_queue.pop_front(); - } - - /** - * Tries to pop item from the queue. - * \param[out] item The item. - * \return False is returned if no item is available. - */ - bool try_pop(value_type& item) { - std::lock_guard < std::mutex > lock(m_mutex); - - if (m_queue.empty()) - return false; - - item = m_queue.front(); - m_queue.pop_front(); - return true; - } - - /** - * Tries to pop item from the queue using the contained type's move assignment operator, if it has one.. - * This method is identical to the try_pop() method if that type has no move assignment operator. - * \param[out] item The item. - * \return False is returned if no item is available. - */ - bool try_move_pop(value_type& item) { - std::lock_guard < std::mutex > lock(m_mutex); - - if (m_queue.empty()) - return false; - - item = std::move(m_queue.front()); - m_queue.pop_front(); - return true; - } - - /** - * Pops item from the queue. If the queue is empty, blocks for timeout microseconds, or until item becomes available. - * \param[out] t An item. - * \param[in] timeout The number of microseconds to wait. - * \return true if get an item from the queue, false if no item is received before the timeout. - */ - bool timeout_pop(value_type& item, std::uint64_t timeout) { - std::unique_lock < std::mutex > lock(m_mutex); - - if (m_queue.empty()) { - if (timeout == 0) - return false; - - if (m_cond_not_empty.wait_for(lock, std::chrono::microseconds(timeout)) == std::cv_status::timeout) - return false; - } - - item = m_queue.front(); - m_queue.pop_front(); - return true; - } - - /** - * Pops item from the queue using the contained type's move assignment operator, if it has one.. - * If the queue is empty, blocks for timeout microseconds, or until item becomes available. - * This method is identical to the try_pop() method if that type has no move assignment operator. - * \param[out] t An item. - * \param[in] timeout The number of microseconds to wait. - * \return true if get an item from the queue, false if no item is received before the timeout. - */ - bool timeout_move_pop(value_type& item, std::uint64_t timeout) { - std::unique_lock < std::mutex > lock(m_mutex); - - if (m_queue.empty()) { - if (timeout == 0) - return false; - - if (m_cond_not_empty.wait_for(lock, std::chrono::microseconds(timeout)) == std::cv_status::timeout) - return false; - } - - item = std::move(m_queue.front()); - m_queue.pop_front(); - return true; - } - - /** - * Gets the number of items in the queue. - * \return Number of items in the queue. - */ - size_type size() const { - std::lock_guard < std::mutex > lock(m_mutex); - return m_queue.size(); - } - - /** - * Check if the queue is empty. - * \return true if queue is empty. - */ - bool empty() const { - std::lock_guard < std::mutex > lock(m_mutex); - return m_queue.empty(); - } - - /** - * Check if the queue is full. - * \return true if queue is full. - */ - bool full() const { - std::lock_guard < std::mutex > lock(m_mutex); - return (m_max_num_items != 0) && (m_queue.size() >= m_max_num_items); - } - - /** - * Remove any items in the queue. - */ - void flush() { - std::lock_guard < std::mutex > lock(m_mutex); - m_queue.clear(); - } - - /** - * Swaps the contents. - * \param[out] sq The ThreadQueue to swap with 'this'. - */ - void swap(ThreadQueue& sq) { - if (this != &sq) { - std::lock_guard < std::mutex > lock1(m_mutex); - std::lock_guard < std::mutex > lock2(sq.m_mutex); - m_queue.swap(sq.m_queue); - std::swap(m_max_num_items, sq.m_max_num_items); - - if (!m_queue.empty()) - m_cond_not_empty.notify_all(); - - - if (!sq.m_queue.empty()) - sq.m_cond_not_empty.notify_all(); - } - } - - /*! The copy assignment operator */ - ThreadQueue& operator=(const ThreadQueue& sq) { - if (this != &sq) { - std::lock_guard < std::mutex > lock1(m_mutex); - std::lock_guard < std::mutex > lock2(sq.m_mutex); - - m_queue = sq.m_queue; - m_max_num_items = sq.m_max_num_items; - - if (!m_queue.empty()) - m_cond_not_empty.notify_all(); - } - - return *this; - } - - /*! The move assignment operator */ - ThreadQueue& operator=(ThreadQueue && sq) { - std::lock_guard < std::mutex > lock(m_mutex); - m_queue = std::move(sq.m_queue); - m_max_num_items = sq.m_max_num_items; - - if (!m_queue.empty()) - m_cond_not_empty.notify_all(); - - return *this; - } - -private: - - std::deque m_queue; - - mutable std::mutex m_mutex; - std::condition_variable m_cond_not_empty; - size_t m_max_num_items; -}; - -/*! Swaps the contents of two ThreadQueue objects. */ -template -void swap(ThreadQueue& q1, ThreadQueue& q2) { - q1.swap(q2); -} From 41c7dd231b648bc0bbc11c8e42a2a52dc56e92e4 Mon Sep 17 00:00:00 2001 From: vsonnier Date: Sun, 21 May 2017 09:58:45 +0200 Subject: [PATCH 2/2] Fix tabs introduced in latest commits... --- src/sdr/SoapySDRThread.cpp | 152 ++++++++++++++++----------------- src/util/ThreadBlockingQueue.h | 82 +++++++++--------- src/util/Timer.cpp | 124 +++++++++++++-------------- src/util/Timer.h | 3 +- 4 files changed, 180 insertions(+), 181 deletions(-) diff --git a/src/sdr/SoapySDRThread.cpp b/src/sdr/SoapySDRThread.cpp index 4f5adb3..8ccce95 100644 --- a/src/sdr/SoapySDRThread.cpp +++ b/src/sdr/SoapySDRThread.cpp @@ -182,10 +182,10 @@ void SDRThread::deinit() { } void SDRThread::assureBufferMinSize(SDRThreadIQData * dataOut, size_t minSize) { - - if (dataOut->data.size() < minSize) { - dataOut->data.resize(minSize); - } + + if (dataOut->data.size() < minSize) { + dataOut->data.resize(minSize); + } } //Called in an infinite loop, read SaopySDR device to build @@ -199,42 +199,42 @@ void SDRThread::readStream(SDRThreadIQDataQueue* iqDataOutQueue) { int nElems = numElems.load(); int mtElems = mtuElems.load(); - // Warning: if MTU > numElems, i.e if device MTU is too big w.r.t the sample rate, the TARGET_DISPLAY_FPS cannot - //be reached and the CubicSDR displays "slows down". - //To get back a TARGET_DISPLAY_FPS, the user need to adapt - //the SoapySDR Device to use smaller buffer sizes, because - // readStream() is suited to device MTU and cannot be really adapted dynamically. - //TODO: Add in doc the need to reduce SoapySDR device buffer length (if available) to restore higher fps. + // Warning: if MTU > numElems, i.e if device MTU is too big w.r.t the sample rate, the TARGET_DISPLAY_FPS cannot + //be reached and the CubicSDR displays "slows down". + //To get back a TARGET_DISPLAY_FPS, the user need to adapt + //the SoapySDR Device to use smaller buffer sizes, because + // readStream() is suited to device MTU and cannot be really adapted dynamically. + //TODO: Add in doc the need to reduce SoapySDR device buffer length (if available) to restore higher fps. - //0. Retreive a new batch - SDRThreadIQData *dataOut = buffers.getBuffer(); + //0. Retreive a new batch + SDRThreadIQData *dataOut = buffers.getBuffer(); //1.If overflow occured on the previous readStream(), transfer it in dataOut directly. - //Take care of the iq_swap option. + //Take care of the iq_swap option. if (numOverflow > 0) { int n_overflow = std::min(numOverflow, nElems); - assureBufferMinSize(dataOut, n_overflow); + assureBufferMinSize(dataOut, n_overflow); ::memcpy(&dataOut->data[0], &overflowBuffer.data[0], n_overflow * sizeof(liquid_float_complex)); n_read = n_overflow; numOverflow = std::min(0, numOverflow - n_overflow); - // std::cout << "SDRThread::readStream() 1.1 overflowBuffer not empty, collect the remaining " << n_overflow << " samples in it..." << std::endl; + // std::cout << "SDRThread::readStream() 1.1 overflowBuffer not empty, collect the remaining " << n_overflow << " samples in it..." << std::endl; if (numOverflow > 0) { // still some left, shift the remaining samples to the begining.. ::memmove(&overflowBuffer.data[0], &overflowBuffer.data[n_overflow], numOverflow * sizeof(liquid_float_complex)); - std::cout << "SDRThread::readStream() 1.2 overflowBuffer still not empty, compact the remaining " << numOverflow << " samples in it..." << std::endl; + std::cout << "SDRThread::readStream() 1.2 overflowBuffer still not empty, compact the remaining " << numOverflow << " samples in it..." << std::endl; } } //end if numOverflow > 0 //2. attempt readStream() at most nElems, by mtElems-sized chunks, append in dataOut->data directly. while (n_read < nElems && !stopping) { - - //Whatever the number of remaining samples needed to reach nElems, we always try to read a mtElems-size chunk, - //from which SoapySDR effectively returns n_stream_read. + + //Whatever the number of remaining samples needed to reach nElems, we always try to read a mtElems-size chunk, + //from which SoapySDR effectively returns n_stream_read. int n_stream_read = device->readStream(stream, buffs, mtElems, flags, timeNs); //if the n_stream_read <= 0, bail out from reading. @@ -252,7 +252,7 @@ void SDRThread::readStream(SDRThreadIQDataQueue* iqDataOutQueue) { //n_requested is the exact number to reach nElems. int n_requested = nElems-n_read; - + //Copy at most n_requested CF32 into .data liquid_float_complex, //starting at n_read position. //inspired from SoapyRTLSDR code, this mysterious void** is indeed an array of CF32(real/imag) samples, indeed an array of @@ -261,67 +261,67 @@ void SDRThread::readStream(SDRThreadIQDataQueue* iqDataOutQueue) { //nor that the Re/Im layout of fields matches the float array order, assign liquid_float_complex field by field. float *pp = (float *)buffs[0]; - assureBufferMinSize(dataOut, n_read + n_requested); + assureBufferMinSize(dataOut, n_read + n_requested); - if (iq_swap.load()) { - for (int i = 0; i < n_requested; i++) { - dataOut->data[n_read + i].imag = pp[2 * i]; - dataOut->data[n_read + i].real = pp[2 * i + 1]; - } - } else { - for (int i = 0; i < n_requested; i++) { - dataOut->data[n_read + i].real = pp[2 * i]; - dataOut->data[n_read + i].imag = pp[2 * i + 1]; - } - } + if (iq_swap.load()) { + for (int i = 0; i < n_requested; i++) { + dataOut->data[n_read + i].imag = pp[2 * i]; + dataOut->data[n_read + i].real = pp[2 * i + 1]; + } + } else { + for (int i = 0; i < n_requested; i++) { + dataOut->data[n_read + i].real = pp[2 * i]; + dataOut->data[n_read + i].imag = pp[2 * i + 1]; + } + } //shift of n_requested samples, each one made of 2 floats... pp += n_requested * 2; //numNewOverflow are in exess, they have to be added in the existing overflowBuffer. - int numNewOverflow = n_stream_read - n_requested; + int numNewOverflow = n_stream_read - n_requested; //so push the remainder samples to overflowBuffer: - if (numNewOverflow > 0) { - // std::cout << "SDRThread::readStream(): 2. SoapySDR read make nElems overflow by " << numNewOverflow << " samples..." << std::endl; - } - - assureBufferMinSize(&overflowBuffer, numOverflow + numNewOverflow); + if (numNewOverflow > 0) { + // std::cout << "SDRThread::readStream(): 2. SoapySDR read make nElems overflow by " << numNewOverflow << " samples..." << std::endl; + } + + assureBufferMinSize(&overflowBuffer, numOverflow + numNewOverflow); - if (iq_swap.load()) { + if (iq_swap.load()) { - for (int i = 0; i < numNewOverflow; i++) { - overflowBuffer.data[numOverflow + i].imag = pp[2 * i]; - overflowBuffer.data[numOverflow + i].real = pp[2 * i + 1]; - } - } - else { - for (int i = 0; i < numNewOverflow; i++) { - overflowBuffer.data[numOverflow + i].real = pp[2 * i]; - overflowBuffer.data[numOverflow + i].imag = pp[2 * i + 1]; - } - } - numOverflow += numNewOverflow; + for (int i = 0; i < numNewOverflow; i++) { + overflowBuffer.data[numOverflow + i].imag = pp[2 * i]; + overflowBuffer.data[numOverflow + i].real = pp[2 * i + 1]; + } + } + else { + for (int i = 0; i < numNewOverflow; i++) { + overflowBuffer.data[numOverflow + i].real = pp[2 * i]; + overflowBuffer.data[numOverflow + i].imag = pp[2 * i + 1]; + } + } + numOverflow += numNewOverflow; n_read += n_requested; } else if (n_stream_read > 0) { // no overflow, read the whole n_stream_read. float *pp = (float *)buffs[0]; - assureBufferMinSize(dataOut, n_read + n_stream_read); + assureBufferMinSize(dataOut, n_read + n_stream_read); - if (iq_swap.load()) { - for (int i = 0; i < n_stream_read; i++) { - dataOut->data[n_read + i].imag = pp[2 * i]; - dataOut->data[n_read + i].real = pp[2 * i + 1]; - } - } - else { - for (int i = 0; i < n_stream_read; i++) { - dataOut->data[n_read + i].real = pp[2 * i]; - dataOut->data[n_read + i].imag = pp[2 * i + 1]; - } - } + if (iq_swap.load()) { + for (int i = 0; i < n_stream_read; i++) { + dataOut->data[n_read + i].imag = pp[2 * i]; + dataOut->data[n_read + i].real = pp[2 * i + 1]; + } + } + else { + for (int i = 0; i < n_stream_read; i++) { + dataOut->data[n_read + i].real = pp[2 * i]; + dataOut->data[n_read + i].imag = pp[2 * i + 1]; + } + } n_read += n_stream_read; } else { @@ -329,11 +329,11 @@ void SDRThread::readStream(SDRThreadIQDataQueue* iqDataOutQueue) { } } //end while - //3. At that point, dataOut contains nElems (or less if a read has return an error), try to post in queue, else discard. + //3. At that point, dataOut contains nElems (or less if a read has return an error), try to post in queue, else discard. if (n_read > 0 && !stopping && !iqDataOutQueue->full()) { - //clamp result: - dataOut->data.resize(n_read); + //clamp result: + dataOut->data.resize(n_read); dataOut->frequency = frequency.load(); dataOut->sampleRate = sampleRate.load(); @@ -350,11 +350,11 @@ void SDRThread::readStream(SDRThreadIQDataQueue* iqDataOutQueue) { //saturation, let a chance to the other threads to consume the existing samples std::this_thread::yield(); } - } - else { - dataOut->setRefCount(0); - std::cout << "SDRThread::readStream(): 3.1 iqDataOutQueue output queue is full, discard processing of the batch..." << std::endl; - } + } + else { + dataOut->setRefCount(0); + std::cout << "SDRThread::readStream(): 3.1 iqDataOutQueue output queue is full, discard processing of the batch..." << std::endl; + } } void SDRThread::readLoop() { @@ -425,7 +425,7 @@ void SDRThread::updateSettings() { free(buffs[0]); buffs[0] = malloc(mtuElems.load() * 4 * sizeof(float)); //clear overflow buffer - numOverflow = 0; + numOverflow = 0; rate_changed.store(false); doUpdate = true; @@ -675,9 +675,9 @@ void SDRThread::setGain(std::string name, float value) { float SDRThread::getGain(std::string name) { std::lock_guard < std::mutex > lock(gain_busy); - float val = gainValues[name]; - - return val; + float val = gainValues[name]; + + return val; } void SDRThread::writeSetting(std::string name, std::string value) { diff --git a/src/util/ThreadBlockingQueue.h b/src/util/ThreadBlockingQueue.h index 323fc34..52fee41 100644 --- a/src/util/ThreadBlockingQueue.h +++ b/src/util/ThreadBlockingQueue.h @@ -37,15 +37,15 @@ public: /*! Create safe blocking queue. */ ThreadBlockingQueue() { //at least 1 (== Exchanger) - m_circular_buffer.resize(MIN_ITEM_NB + 1); //there is one slot more than the size for internal management. + m_circular_buffer.resize(MIN_ITEM_NB + 1); //there is one slot more than the size for internal management. }; //Copy constructor ThreadBlockingQueue(const ThreadBlockingQueue& sq) { std::lock_guard < std::mutex > lock(sq.m_mutex); m_circular_buffer = sq.m_circular_buffer; - m_head = sq.m_head; - m_tail = sq.m_tail; + m_head = sq.m_head; + m_tail = sq.m_tail; } /*! Destroy safe queue. */ @@ -64,7 +64,7 @@ public: if (max_num_items > (unsigned int)privateMaxNumElements()) { //Only raise the existing max size, never reduce it //for simplification sake at runtime. - m_circular_buffer.resize(max_num_items + 1); // there is 1 extra allocated slot. + m_circular_buffer.resize(max_num_items + 1); // there is 1 extra allocated slot. //m_head and m_tail stays valid. m_cond_not_full.notify_all(); } @@ -100,11 +100,11 @@ public: return false; } - //m_tail is already the next valid place an item can be put - m_circular_buffer[m_tail] = item; - m_tail = nextIndex(m_tail, (int)m_circular_buffer.size()); + //m_tail is already the next valid place an item can be put + m_circular_buffer[m_tail] = item; + m_tail = nextIndex(m_tail, (int)m_circular_buffer.size()); - m_cond_not_empty.notify_all(); + m_cond_not_empty.notify_all(); return true; } @@ -120,9 +120,9 @@ public: return false; } - //m_tail is already the next valid place an item can be put - m_circular_buffer[m_tail] = item; - m_tail = nextIndex(m_tail, (int)m_circular_buffer.size()); + //m_tail is already the next valid place an item can be put + m_circular_buffer[m_tail] = item; + m_tail = nextIndex(m_tail, (int)m_circular_buffer.size()); m_cond_not_empty.notify_all(); return true; @@ -156,7 +156,7 @@ public: } item = m_circular_buffer[m_head]; - m_head = nextIndex(m_head, (int)m_circular_buffer.size()); + m_head = nextIndex(m_head, (int)m_circular_buffer.size()); m_cond_not_full.notify_all(); return true; @@ -174,8 +174,8 @@ public: return false; } - item = m_circular_buffer[m_head]; - m_head = nextIndex(m_head, (int)m_circular_buffer.size()); + item = m_circular_buffer[m_head]; + m_head = nextIndex(m_head, (int)m_circular_buffer.size()); m_cond_not_full.notify_all(); return true; @@ -214,8 +214,8 @@ public: */ void flush() { std::lock_guard < std::mutex > lock(m_mutex); - m_head = 0; - m_tail = 0; + m_head = 0; + m_tail = 0; m_cond_not_full.notify_all(); } @@ -231,7 +231,7 @@ public: m_circular_buffer.swap(sq.m_circular_buffer); std::swap(m_head, sq.m_head); - std::swap(m_tail, sq.m_tail); + std::swap(m_tail, sq.m_tail); if (privateSize() > 0) { m_cond_not_empty.notify_all(); @@ -257,10 +257,10 @@ public: std::lock_guard < std::mutex > lock1(m_mutex); std::lock_guard < std::mutex > lock2(sq.m_mutex); - m_circular_buffer = sq.m_circular_buffer; + m_circular_buffer = sq.m_circular_buffer; - m_head = sq.m_head; - m_tail = sq.m_tail; + m_head = sq.m_head; + m_tail = sq.m_tail; if (privateSize() > 0) { m_cond_not_empty.notify_all(); @@ -277,31 +277,31 @@ private: /// use a circular buffer structure to prevent allocations / reallocations (fixed array + modulo) std::vector m_circular_buffer; - /** - * The 'head' index of the element at the head of the deque, 'tail' - * the next (valid !) index at which an element can be pushed. - * m_head == m_tail means empty. - */ - int m_head = 0, m_tail = 0; + /** + * The 'head' index of the element at the head of the deque, 'tail' + * the next (valid !) index at which an element can be pushed. + * m_head == m_tail means empty. + */ + int m_head = 0, m_tail = 0; - // - inline int nextIndex(int index, int modulus) const { - return (index + 1 == modulus) ? 0 : index + 1; - } + // + inline int nextIndex(int index, int modulus) const { + return (index + 1 == modulus) ? 0 : index + 1; + } - // - inline int privateSize() const { - if (m_head <= m_tail) { - return m_tail - m_head; - } + // + inline int privateSize() const { + if (m_head <= m_tail) { + return m_tail - m_head; + } - return (m_tail - m_head + (int)m_circular_buffer.size()); - } + return (m_tail - m_head + (int)m_circular_buffer.size()); + } - // - inline int privateMaxNumElements() const { - return (int)m_circular_buffer.size() - 1; - } + // + inline int privateMaxNumElements() const { + return (int)m_circular_buffer.size() - 1; + } mutable std::mutex m_mutex; std::condition_variable m_cond_not_empty; diff --git a/src/util/Timer.cpp b/src/util/Timer.cpp index d7c6717..cf3389d 100644 --- a/src/util/Timer.cpp +++ b/src/util/Timer.cpp @@ -4,7 +4,7 @@ #include "Timer.h" #ifdef _WIN32 - #include + #include #endif #include @@ -12,167 +12,167 @@ Timer::Timer(void) : time_elapsed(0), system_milliseconds(0), start_time(0), end_time(0), last_update(0), num_updates(0), paused_time(0), offset(0), paused_state(false), lock_state(false), lock_rate(0) { #ifdef _WIN32 - // According to Microsoft, QueryPerformanceXXX API is perfectly - //fine for Windows 7+ systems, and use the highest appropriate counter. - //this only need to be done once. - ::QueryPerformanceFrequency(&win_frequency); + // According to Microsoft, QueryPerformanceXXX API is perfectly + //fine for Windows 7+ systems, and use the highest appropriate counter. + //this only need to be done once. + ::QueryPerformanceFrequency(&win_frequency); #endif; } void Timer::start(void) { - update(); - num_updates = 0; - start_time = system_milliseconds; - last_update = start_time; - paused_state = false; - lock_state = false; - lock_rate = 0; - paused_time = 0; - offset = 0; + update(); + num_updates = 0; + start_time = system_milliseconds; + last_update = start_time; + paused_state = false; + lock_state = false; + lock_rate = 0; + paused_time = 0; + offset = 0; } void Timer::stop(void) { - end_time = system_milliseconds; + end_time = system_milliseconds; } void Timer::reset(void) { - start(); + start(); } void Timer::lockFramerate(float f_rate) { - lock_rate = 1.0f/f_rate; - lock_state = true; + lock_rate = 1.0f/f_rate; + lock_state = true; } void Timer::unlock() { - unsigned long msec_tmp = system_milliseconds; - - lock_state = false; + unsigned long msec_tmp = system_milliseconds; + + lock_state = false; - update(); - - last_update = system_milliseconds-(unsigned long)lock_rate; - - offset += msec_tmp-system_milliseconds; - - lock_rate = 0; + update(); + + last_update = system_milliseconds-(unsigned long)lock_rate; + + offset += msec_tmp-system_milliseconds; + + lock_rate = 0; } bool Timer::locked() { - return lock_state; + return lock_state; } void Timer::update(void) { - num_updates++; - last_update = system_milliseconds; - - - if (lock_state) - { - system_milliseconds += (unsigned long)(lock_rate*1000.0); - } - else - { + num_updates++; + last_update = system_milliseconds; + + + if (lock_state) + { + system_milliseconds += (unsigned long)(lock_rate*1000.0); + } + else + { #ifdef _WIN32 - //Use QuaryPerformanceCounter, imune to problems sometimes - //multimedia timers have. - LARGE_INTEGER win_current_count; - ::QueryPerformanceCounter(&win_current_count); + //Use QuaryPerformanceCounter, imune to problems sometimes + //multimedia timers have. + LARGE_INTEGER win_current_count; + ::QueryPerformanceCounter(&win_current_count); - system_milliseconds = (unsigned long)(win_current_count.QuadPart * 1000.0 / win_frequency.QuadPart); + system_milliseconds = (unsigned long)(win_current_count.QuadPart * 1000.0 / win_frequency.QuadPart); #else - gettimeofday(&time_val,&time_zone); + gettimeofday(&time_val,&time_zone); - system_milliseconds = (unsigned long)time_val.tv_usec; - system_milliseconds /= 1000; - system_milliseconds += (unsigned long)(time_val.tv_sec*1000); + system_milliseconds = (unsigned long)time_val.tv_usec; + system_milliseconds /= 1000; + system_milliseconds += (unsigned long)(time_val.tv_sec*1000); #endif - } + } - if (paused_state) paused_time += system_milliseconds-last_update; + if (paused_state) paused_time += system_milliseconds-last_update; - time_elapsed = system_milliseconds-start_time-paused_time+offset; + time_elapsed = system_milliseconds-start_time-paused_time+offset; } unsigned long Timer::getMilliseconds(void) { - return time_elapsed; + return time_elapsed; } double Timer::getSeconds(void) { - return ((double)getMilliseconds())/1000.0; + return ((double)getMilliseconds())/1000.0; } void Timer::setMilliseconds(unsigned long milliseconds_in) { - offset -= (system_milliseconds-start_time-paused_time+offset)-milliseconds_in; + offset -= (system_milliseconds-start_time-paused_time+offset)-milliseconds_in; } void Timer::setSeconds(double seconds_in) { - setMilliseconds((long)(seconds_in*1000.0)); + setMilliseconds((long)(seconds_in*1000.0)); } double Timer::lastUpdateSeconds(void) { - return ((double)lastUpdateMilliseconds())/1000.0; + return ((double)lastUpdateMilliseconds())/1000.0; } unsigned long Timer::lastUpdateMilliseconds(void) { - return system_milliseconds-last_update; + return system_milliseconds-last_update; } unsigned long Timer::totalMilliseconds() { - return system_milliseconds-start_time; + return system_milliseconds-start_time; } double Timer::totalSeconds(void) { - return totalMilliseconds()/1000.0; + return totalMilliseconds()/1000.0; } unsigned long Timer::getNumUpdates(void) { - return num_updates; + return num_updates; } void Timer::paused(bool pause_in) { - paused_state = pause_in; + paused_state = pause_in; } bool Timer::paused() { - return paused_state; + return paused_state; } void Timer::timerTestFunc() { diff --git a/src/util/Timer.h b/src/util/Timer.h index 95bafcb..eb935cf 100644 --- a/src/util/Timer.h +++ b/src/util/Timer.h @@ -18,7 +18,6 @@ class Timer { private: - //units are microsecs: unsigned long time_elapsed; unsigned long system_milliseconds; unsigned long start_time; @@ -32,7 +31,7 @@ private: struct timeval time_val; struct timezone time_zone; #else - LARGE_INTEGER win_frequency; + LARGE_INTEGER win_frequency; #endif; bool paused_state;