diff --git a/src/IOThread.cpp b/src/IOThread.cpp index 29fd80b..074af53 100644 --- a/src/IOThread.cpp +++ b/src/IOThread.cpp @@ -127,7 +127,7 @@ bool IOThread::isTerminated(int waitMs) { } } - std::cout << "ERROR: thread '" << typeid(*this).name() << "' has not terminated in time ! (> " << waitMs << " ms)" << std::endl; + std::cout << "ERROR: thread '" << typeid(*this).name() << "' has not terminated in time ! (> " << waitMs << " ms)" << std::endl << std::flush; return terminated.load(); } diff --git a/src/IOThread.h b/src/IOThread.h index 3ca3675..3449ae2 100644 --- a/src/IOThread.h +++ b/src/IOThread.h @@ -99,7 +99,7 @@ public: } if (outputBuffers.size() > REBUFFER_WARNING_THRESHOLD) { - std::cout << "Warning: ReBuffer '" << bufferId << "' count '" << outputBuffers.size() << "' exceeds threshold of '" << REBUFFER_WARNING_THRESHOLD << "'" << std::endl; + std::cout << "Warning: ReBuffer '" << bufferId << "' count '" << outputBuffers.size() << "' exceeds threshold of '" << REBUFFER_WARNING_THRESHOLD << "'" << std::endl << std::flush; } //3.We need to allocate a new buffer. diff --git a/src/audio/AudioThread.cpp b/src/audio/AudioThread.cpp index 47ec0e9..a96cf34 100644 --- a/src/audio/AudioThread.cpp +++ b/src/audio/AudioThread.cpp @@ -11,6 +11,8 @@ #include #include +//50 ms +#define HEARTBEAT_CHECK_PERIOD_MICROS (50 * 1000) std::map AudioThread::deviceController; std::map AudioThread::deviceSampleRate; @@ -429,7 +431,9 @@ void AudioThread::run() { while (!stopping) { AudioThreadCommand command; - cmdQueue.pop(command); + if (!cmdQueue.pop(command, HEARTBEAT_CHECK_PERIOD_MICROS)) { + continue; + } if (command.cmd == AudioThreadCommand::AUDIO_THREAD_CMD_SET_DEVICE) { setupDevice(command.int_value); diff --git a/src/demod/DemodulatorPreThread.cpp b/src/demod/DemodulatorPreThread.cpp index e6ac58c..f7a6217 100644 --- a/src/demod/DemodulatorPreThread.cpp +++ b/src/demod/DemodulatorPreThread.cpp @@ -12,6 +12,9 @@ #include "CubicSDR.h" #include "DemodulatorInstance.h" +//50 ms +#define HEARTBEAT_CHECK_PERIOD_MICROS (50 * 1000) + DemodulatorPreThread::DemodulatorPreThread(DemodulatorInstance *parent) : IOThread(), iqResampler(NULL), iqResampleRatio(1), cModem(nullptr), cModemKit(nullptr), iqInputQueue(NULL), iqOutputQueue(NULL) { initialized.store(false); @@ -73,7 +76,9 @@ void DemodulatorPreThread::run() { while (!stopping) { DemodulatorThreadIQDataPtr inp; - iqInputQueue->pop(inp); + if (!iqInputQueue->pop(inp, HEARTBEAT_CHECK_PERIOD_MICROS)) { + continue; + } if (frequencyChanged.load()) { currentFrequency.store(newFrequency); diff --git a/src/demod/DemodulatorThread.cpp b/src/demod/DemodulatorThread.cpp index faa4d09..5767c08 100644 --- a/src/demod/DemodulatorThread.cpp +++ b/src/demod/DemodulatorThread.cpp @@ -12,6 +12,9 @@ #define M_PI 3.14159265358979323846 #endif +//50 ms +#define HEARTBEAT_CHECK_PERIOD_MICROS (50 * 1000) + #ifdef __APPLE__ #include #endif @@ -81,7 +84,9 @@ void DemodulatorThread::run() { while (!stopping) { DemodulatorThreadPostIQDataPtr inp; - iqInputQueue->pop(inp); + if (!iqInputQueue->pop(inp, HEARTBEAT_CHECK_PERIOD_MICROS)) { + continue; + } size_t bufSize = inp->data.size(); diff --git a/src/demod/DemodulatorWorkerThread.cpp b/src/demod/DemodulatorWorkerThread.cpp index b1304a7..b7e6daa 100644 --- a/src/demod/DemodulatorWorkerThread.cpp +++ b/src/demod/DemodulatorWorkerThread.cpp @@ -6,6 +6,9 @@ #include "CubicSDR.h" #include +//50 ms +#define HEARTBEAT_CHECK_PERIOD_MICROS (50 * 1000) + DemodulatorWorkerThread::DemodulatorWorkerThread() : IOThread(), commandQueue(NULL), resultQueue(NULL), cModem(nullptr), cModemKit(nullptr) { } @@ -31,7 +34,9 @@ void DemodulatorWorkerThread::run() { //we are waiting for the first command to show up (blocking!) //then consuming the commands until done. while (!done) { - commandQueue->pop(command); + if (!commandQueue->pop(command, HEARTBEAT_CHECK_PERIOD_MICROS)) { + continue; + } switch (command.cmd) { case DemodulatorWorkerThreadCommand::DEMOD_WORKER_THREAD_CMD_BUILD_FILTERS: diff --git a/src/forms/Bookmark/BookmarkView.cpp b/src/forms/Bookmark/BookmarkView.cpp index 883b813..699ef9d 100644 --- a/src/forms/Bookmark/BookmarkView.cpp +++ b/src/forms/Bookmark/BookmarkView.cpp @@ -1411,7 +1411,7 @@ void BookmarkView::onEnterWindow( wxMouseEvent& event ) { } #endif - setStatusText("You can mouse-drag a bookmark entry from one category to the next..etc. TODO: add more Bookmarks descriptions"); + setStatusText("Drag & Drop to create / move bookmarks, Group and arrange bookmarks, quick Search by keywords."); } diff --git a/src/process/FFTDataDistributor.cpp b/src/process/FFTDataDistributor.cpp index 6ad2ba2..07a6a3d 100644 --- a/src/process/FFTDataDistributor.cpp +++ b/src/process/FFTDataDistributor.cpp @@ -5,6 +5,9 @@ #include #include +//50 ms +#define HEARTBEAT_CHECK_PERIOD_MICROS (50 * 1000) + FFTDataDistributor::FFTDataDistributor() : outputBuffers("FFTDataDistributorBuffers"), fftSize(DEFAULT_FFT_SIZE), linesPerSecond(DEFAULT_WATERFALL_LPS), lineRateAccum(0.0) { } @@ -29,7 +32,10 @@ void FFTDataDistributor::process() { return; } DemodulatorThreadIQDataPtr inp; - input->pop(inp); + + if (!input->pop(inp, HEARTBEAT_CHECK_PERIOD_MICROS)) { + continue; + } if (inp) { //Settings have changed, set new values and dump all previous samples stored in inputBuffer: diff --git a/src/process/SpectrumVisualProcessor.cpp b/src/process/SpectrumVisualProcessor.cpp index 582eb6e..2e69fda 100644 --- a/src/process/SpectrumVisualProcessor.cpp +++ b/src/process/SpectrumVisualProcessor.cpp @@ -4,6 +4,8 @@ #include "SpectrumVisualProcessor.h" #include "CubicSDR.h" +//50 ms +#define HEARTBEAT_CHECK_PERIOD_MICROS (50 * 1000) SpectrumVisualProcessor::SpectrumVisualProcessor() : outputBuffers("SpectrumVisualProcessorBuffers") { lastInputBandwidth = 0; @@ -194,7 +196,9 @@ void SpectrumVisualProcessor::process() { DemodulatorThreadIQDataPtr iqData; - input->pop(iqData); + if (!input->pop(iqData, HEARTBEAT_CHECK_PERIOD_MICROS)) { + return; + } if (!iqData) { return; diff --git a/src/process/VisualProcessor.h b/src/process/VisualProcessor.h index c7f77be..df1810a 100644 --- a/src/process/VisualProcessor.h +++ b/src/process/VisualProcessor.h @@ -101,7 +101,7 @@ protected: //available outputs, previously set by attachOutput(). //* \param[in] timeout The number of microseconds to wait to push an item in each one of the outputs, 0(default) means indefinite wait. //* \param[in] errorMessage an error message written on std::cout in case pf push timeout. - void distribute(OutputDataTypePtr item, std::uint64_t timeout = BLOCKING_INFINITE_TIMEOUT, const char* errorMessage = "") { + void distribute(OutputDataTypePtr item, std::uint64_t timeout = BLOCKING_INFINITE_TIMEOUT, const char* errorMessage = nullptr) { std::lock_guard < std::recursive_mutex > busy_lock(busy_update); //We will try to distribute 'output' among all 'outputs', diff --git a/src/sdr/SDRPostThread.cpp b/src/sdr/SDRPostThread.cpp index 01298e2..fa9b38a 100644 --- a/src/sdr/SDRPostThread.cpp +++ b/src/sdr/SDRPostThread.cpp @@ -8,6 +8,9 @@ #include #include +//50 ms +#define HEARTBEAT_CHECK_PERIOD_MICROS (50 * 1000) + SDRPostThread::SDRPostThread() : IOThread(), buffers("SDRPostThreadBuffers"), visualDataBuffers("SDRPostThreadVisualDataBuffers"), frequency(0) { iqDataInQueue = NULL; iqDataOutQueue = NULL; @@ -185,7 +188,9 @@ void SDRPostThread::run() { while (!stopping) { SDRThreadIQDataPtr data_in; - iqDataInQueue->pop(data_in); + if (!iqDataInQueue->pop(data_in, HEARTBEAT_CHECK_PERIOD_MICROS)) { + continue; + } // std::lock_guard < std::mutex > lock(data_in->m_mutex); std::lock_guard < std::mutex > lock(busy_demod); diff --git a/src/util/ThreadBlockingQueue.h b/src/util/ThreadBlockingQueue.h index d4b7c1e..cf8134c 100644 --- a/src/util/ThreadBlockingQueue.h +++ b/src/util/ThreadBlockingQueue.h @@ -74,10 +74,10 @@ public: * \param[in] item An item. * \param[in] timeout a max waiting timeout in microseconds for an item to be pushed. * by default, = 0 means indefinite wait. - * \param[in] errorMessage an error message written on std::cout in case of the timeout wait + * \param[in] errorMessage if != nullptr (is nullptr by default) an error message written on std::cout in case of the timeout wait * \return true if an item was pushed into the queue, else a timeout has occured. */ - bool push(const value_type& item, std::uint64_t timeout = BLOCKING_INFINITE_TIMEOUT,const char* errorMessage = "") { + bool push(const value_type& item, std::uint64_t timeout = BLOCKING_INFINITE_TIMEOUT,const char* errorMessage = nullptr) { std::unique_lock < std::mutex > lock(m_mutex); if (timeout == BLOCKING_INFINITE_TIMEOUT) { @@ -90,12 +90,15 @@ public: return false; } else if (false == m_cond_not_full.wait_for(lock, std::chrono::microseconds(timeout), - [this]() { return m_queue.size() < m_max_num_items; })) { - std::thread::id currentThreadId = std::this_thread::get_id(); - std::cout << "WARNING: Thread 0x" << std::hex << currentThreadId << std::dec << - " (" << currentThreadId << ") executing {" << typeid(*this).name() << "}.push() has failed with timeout > " << - (timeout * 0.001) << " ms, message: " << errorMessage << std::endl; - return false; + [this]() { return m_queue.size() < m_max_num_items; })) { + + if (errorMessage != nullptr) { + std::thread::id currentThreadId = std::this_thread::get_id(); + std::cout << "WARNING: Thread 0x" << std::hex << currentThreadId << std::dec << + " (" << currentThreadId << ") executing {" << typeid(*this).name() << "}.push() has failed with timeout > " << + (timeout * 0.001) << " ms, message: " << errorMessage << std::endl << std::flush; + } + return false; } m_queue.push_back(item); @@ -123,10 +126,10 @@ public: /** * Pops item from the queue. If the queue is empty, blocks for timeout microseconds, or until item becomes available. * \param[in] timeout The number of microseconds to wait. O (default) means indefinite wait. - * \param[in] errorMessage an error message written on std::cout in case of the timeout wait + * \param[in] errorMessage if != nullptr (is nullptr by default) an error message written on std::cout in case of the timeout wait * \return true if get an item from the queue, false if no item is received before the timeout. */ - bool pop(value_type& item, std::uint64_t timeout = BLOCKING_INFINITE_TIMEOUT, const char* errorMessage = "") { + bool pop(value_type& item, std::uint64_t timeout = BLOCKING_INFINITE_TIMEOUT, const char* errorMessage = nullptr) { std::unique_lock < std::mutex > lock(m_mutex); if (timeout == BLOCKING_INFINITE_TIMEOUT) { @@ -140,10 +143,13 @@ public: } else if (false == m_cond_not_empty.wait_for(lock, std::chrono::microseconds(timeout), [this]() { return !m_queue.empty(); })) { - std::thread::id currentThreadId = std::this_thread::get_id(); - std::cout << "WARNING: Thread 0x" << std::hex << currentThreadId << std::dec << - " (" << currentThreadId << ") executing {" << typeid(*this).name() << "}.pop() has failed with timeout > " << - (timeout * 0.001) << " ms, message: " << errorMessage << std::endl; + + if (errorMessage != nullptr) { + std::thread::id currentThreadId = std::this_thread::get_id(); + std::cout << "WARNING: Thread 0x" << std::hex << currentThreadId << std::dec << + " (" << currentThreadId << ") executing {" << typeid(*this).name() << "}.pop() has failed with timeout > " << + (timeout * 0.001) << " ms, message: " << errorMessage << std::endl << std::flush; + } return false; }